RabbitMQ实现生产者消费者
一.启动MQ
注意管理员身份进入cmd才行,我这里是在本地安装的MQ,推荐使用虚拟机安装
二.思路
官方解释RabbitMQ结构:
自我理解RabbitMQ结构:
其实RabbitMQ的服务器就像邮局一样,我们的生产者和消费者对于这个服务器来说都是消费者,因为服务器都可以向两者发送消息
环境准备
导入依赖
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.3</version>
</dependency>
</dependencies>
建立生产者消费者
三.生产者代码
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
//1.建立连接
ConnectionFactory connectionFactory=new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");//MQ服务器地址
connectionFactory.setPort(5672);//端口
connectionFactory.setUsername("guest");//账号
connectionFactory.setPassword("guest");//密码
connectionFactory.setVirtualHost("/");//虚拟主机名称
Connection connection= connectionFactory.newConnection();
//2开启信道
Channel channel =connection.createChannel();
//3生明队列
channel.queueDeclare("hello",true,false,false,null);
//4发送消息
String message="hello,my name is RabbitMQ";
channel.basicPublish("","hello",null,message.getBytes());
System.out.println("成功发送消息");
//5资源释放
channel.close();
connection.close();
}
}
代码中方法解读解读:
四.消费者代码
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//1.建立连接
ConnectionFactory connectionFactory=new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");//MQ服务器地址
connectionFactory.setPort(5672);//端口
connectionFactory.setUsername("guest");//账号
connectionFactory.setPassword("guest");//密码
connectionFactory.setVirtualHost("/");//虚拟主机名称
Connection connection= connectionFactory.newConnection();
//2开启信道
Channel channel =connection.createChannel();
//3生明队列
channel.queueDeclare("hello",true,false,false,null);
//4消费消息
DefaultConsumer consumer =new DefaultConsumer(channel){
//队列收到消息,执行该方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收到了消息"+new String(body));
}
};
channel.basicConsume("hello",true,consumer);
//等待程序接受完毕大部分消息
Thread.sleep(3000);//没有这条代码,将只接受MQ中一条消息
//5资源释放
channel.close();
connection.close();
}
}
读取了MQ中全部消息