2.7日学习打卡----初学RabbitMQ(二)
2.7日学习打卡
JMS
由于MQ产品很多,操作方式各有不同,于是JAVA提供了一套规则
——JMS,用于操作消息中间件。JMS即Java消息服务
(JavaMessage Service)应用程序接口,是一个Java平台中关于面
向消息中间件的API。JMS是JavaEE规范中的一种,类比JDBC。很多
MQ产品都实现了JMS规范,例如ActiveMQ。RabbitMQ官方并没
有实现JMS规范,但是开源社区有JMS的实现包。
创建项目
# 开启管控台插件
rabbitmq-plugins enable
rabbitmq_management
# 启动rabbitmq
rabbitmq-server -detached
创建普通maven项目,添加RabbitMQ依赖:
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqpclient</artifactId>
<version>5.14.0</version>
</dependency>
</dependencies>
一. RabbitMQ 简单模式
P:生产者,也就是要发送消息的程序
C:消费者:消息的接收者,会一直等待消息到来
queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息
特点:
- 一个生产者对应一个消费者,通过队列进行消息传递。
- 该模式使用direct交换机,direct交换机是RabbitMQ默认交换机
生产者代码实现
步骤:
- 创建连接工厂ConnectionFactory
- 设置工厂的参数
- 创建连接 Connection
- 创建管道 Channel
- 简单模式中没有交换机exchange,所以不用创建(RabbitMQ会使用默认的交换机!)
- 创建队列 queue
- 设置发送内容,使用channal.basicPublish()发送
- 释放资源
代码实现
package com.jjy.mq.simple;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
//生产者
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//使用自己的服务器ip地址
connectionFactory.setHost("192.168.66.100");
//rabbitmq的默认端口5672
connectionFactory.setPort(5672);
//用户名
connectionFactory.setUsername("jjy");
//密码
connectionFactory.setPassword("jjy");
//虚拟机
connectionFactory.setVirtualHost("/");
//2.创建连接
Connection connection = connectionFactory.newConnection();
//3.建立信道
Channel channel = connection.createChannel();
//4.创建队列,如果队列已存在,则使用该队列
/**
// * 参数1:队列名
// * 参数2:是否持久化,true表示MQ重启后队列还在。
// * 参数3:是否私有化,false表示所有消费者都可以访问,true表示只有第一次拥有它的消费者才能访问
// * 参数4:是否自动删除,true表示不再使用队列时自动删除队列
// * 参数5:其他额外参数
// */
channel.queueDeclare("simple_queue",false,false,false,null);
//5.发送消息
String mesg="hello rabbitmq";
/**
* 参数1:交换机名,""表示默认交换机
* 参数2:路由键,简单模式就是队列名
* 参数3:其他额外参数
* 参数4:要传递的消息字节数组
*/
channel.basicPublish("","simple_queue",null,mesg.getBytes());
//6.关闭资源(信道和连接)
channel.close();
connection.close();
System.out.println("发送成功");
}
}
消费者代码实现
步骤:
1.创建连接工厂ConnectionFactory
2.设置工厂参数
3.创建连接
4.创建信道
前四步代码基本是一致的,需要注意的是生产者与消费者的Channel是不同Connection中的!不是同一个对象.
5. 最简单的模型没有交换机exchange,所以此处RabbitMQ会使用默认的交换机
6. 接收消息,有一个回调方法 channel.basicConsume()
代码实现
package com.jjy.mq.simple;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Customer {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.66.100");
connectionFactory.setPort(5672);
connectionFactory.setUsername("jjy");
connectionFactory.setPassword("jjy");
connectionFactory.setVirtualHost("/");
//2.创建连接
Connection connection = connectionFactory.newConnection();
//3.建立信道
Channel channel = connection.createChannel();
//4.监听队列
/**
* 参数1:监听的队列名
* 参数2:是否自动签收,如果设置为false,则需要手动确认消息已收到,否则MQ会一直发送消息
* 参数3:Consumer的实现类,重写该类方法表示接受到消息后如何消费
*/
channel.basicConsume("simple_queue",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body,"UTF-8");
System.out.println("接受消息,消息为:"+message);
}
});
//
}
}
二. RabbitMQ 工作队列模式
与简单模式相比,工作队列模式(Work Queue)多了一些消费者,该
模式也使用direct交换机,应用于处理消息较多的情况。特点如
下:
- 一个队列对应多个消费者。
- 一条消息只会被一个消费者消费。
- 消息队列默认采用轮询的方式将消息平均发送给消费者
应用场景:对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度
生产者代码实现
代码实现
package com.jjy.mq.work;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.66.100");
connectionFactory.setPort(5672);
connectionFactory.setUsername("jjy");
connectionFactory.setPassword("jjy");
connectionFactory.setVirtualHost("/");
// 2.创建连接
Connection connection = connectionFactory.newConnection();
// 3.建立信道
Channel channel = connection.createChannel();
// 4.创建队列,持久化队列
channel.queueDeclare("work_queue",true,false,false,null);
// 5.发送大量消息,参数3表示该消息为持久化消息,即除了保存到内存还会保存到磁盘中
for(int i=0;i<100;i++){
channel.basicPublish("","work_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, ("你好,这是今天的第"+i+"条消息").getBytes());
}
// 6.关闭资源
channel.close();
connection.close();
}
}
消费者代码实现
消费者1:
package com.jjy.mq.work;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Customer1 {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.66.100");
connectionFactory.setPort(5672);
connectionFactory.setUsername("jjy");
connectionFactory.setPassword("jjy");
connectionFactory.setVirtualHost("/");
//2.创建连接
Connection connection = connectionFactory.newConnection();
//3.建立信道
Channel channel = connection.createChannel();
// 4.监听队列,处理消息
channel.basicConsume("work_queue",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("消费者1消费消息,消息为:" + message);
}
});
}
}
消费者2
package com.jjy.mq.work;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Customer2 {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.66.100");
connectionFactory.setPort(5672);
connectionFactory.setUsername("jjy");
connectionFactory.setPassword("jjy");
connectionFactory.setVirtualHost("/");
//2.创建连接
Connection connection = connectionFactory.newConnection();
//3.建立信道
Channel channel = connection.createChannel();
// 4.监听队列,处理消息
channel.basicConsume("work_queue",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("消费者2消费消息,消息为:" + message);
}
});
}
}
消费者3
package com.jjy.mq.work;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Customer3 {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.66.100");
connectionFactory.setPort(5672);
connectionFactory.setUsername("jjy");
connectionFactory.setPassword("jjy");
connectionFactory.setVirtualHost("/");
//2.创建连接
Connection connection = connectionFactory.newConnection();
//3.建立信道
Channel channel = connection.createChannel();
// 4.监听队列,处理消息
channel.basicConsume("work_queue",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("消费者3消费消息,消息为:" + message);
}
});
}
}
三. RabbitMQ 发布订阅模式
P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机
C:消费者,消息的接收者,会一直等待消息到来
Queue:消息队列,接收消息、缓存消息
在开发过程中,有一些消息需要不同消费者进行不同的处理,如电
商网站的同一条促销信息需要短信发送、邮件发送、站内信发送
等。此时可以使用发布订阅模式(Publish/Subscribe)
特点:
- 生产者将消息发送给交换机,交换机将消息转发到绑定此交换机的每个队列中。
- 工作队列模式的交换机只能将消息发送给一个队列,发布订阅模式的交换机能将消息发送给多个队列。发布订阅模式使用fanout交换机。
Exchange:交换机(X)一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、 递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:
➢ Fanout:广播,将消息交给所有绑定到交换机的队列
➢ Direct:定向,把消息交给符合指定routing key 的队列
➢ Topic(常用):通配符,把消息交给符合routing pattern(路由模式)的队列
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange 绑定,或者没有符合路由规则的队列,那么消息会丢失!
生产者代码实现
与之前的步骤相比,多了创建交换机和绑定交换机与队列的操作
代码实现
package com.jjy.mq.publish;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
public class produce {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.66.100");
connectionFactory.setPort(5672);
connectionFactory.setUsername("jjy");
connectionFactory.setPassword("jjy");
connectionFactory.setVirtualHost("/");
//2.创建连接
Connection connection = connectionFactory.newConnection();
//3.建立信道
Channel channel = connection.createChannel();
//4.创建交换机
/*
exchangeDeclare(String exchange, -- 交换机的名称
String type, -- 交换机的类型,4种
枚举(direct,fanout,topic,headers)
boolean durable, -- 持久化
boolean autoDelete, -- 自动删除
boolean internal, -- 内部使用,基本是false
Map<String, Object> arguments) -- 参数
*
/**
* 参数1:交换机名
* 参数2:交换机类型
* 参数3:交换机持久化
*/
channel.exchangeDeclare("exchange_fanout", BuiltinExchangeType.FANOUT,true);
//5.创建队列
//短信队列
channel.queueDeclare("SEND_MAIL",true,false,false,null);
//消息队列
channel.queueDeclare("SEND_MESSAGE",true,false,false,null);
//站内信息
channel.queueDeclare("SEND_STATION",true,false,false,null);
//6.交换机绑定队列
/**
* 参数1:队列名
* 参数2:交换机名
* 参数3:路由关键字,发布订阅模式写""即可
*/
channel.queueBind("SEND_MAIL","exchange_fanout","");
channel.queueBind("SEND_MESSAGE","exchange_fanout","");
channel.queueBind("SEND_STATION","exchange_fanout","");
//7.发送消息
for (int i = 1; i <= 10 ; i++) {
channel.basicPublish("exchange_fanout","",null,
("你好,尊敬的用户,秒杀商品开抢了!"+i).getBytes(StandardCharsets.UTF_8));
}
//8.关闭资源
channel.close();
connection.close();
}
}
消费者代码实现
接下来编写三个消费者,分别监听各自的队列。
//站内信消费者
package com.jjy.mq.publish;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
// 站内信消费者
public class CustomerStation {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.66.100");
connectionFactory.setPort(5672);
connectionFactory.setUsername("jjy");
connectionFactory.setPassword("jjy");
connectionFactory.setVirtualHost("/");// 默认虚拟机
//2.创建连接
Connection conn = connectionFactory.newConnection();
//3.建立信道
Channel channel = conn.createChannel();
// 4.监听队列
channel.basicConsume("SEND_STATION", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "utf-8");
System.out.println("发送站内信:"+message);
}
});
}
}
邮件消费者
package com.jjy.mq.publish;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class CustomerMail {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.66.100");
connectionFactory.setPort(5672);
connectionFactory.setUsername("jjy");
connectionFactory.setPassword("jjy");
connectionFactory.setVirtualHost("/");// 默认虚拟机
//2.创建连接
Connection conn = connectionFactory.newConnection();
//3.建立信道
Channel channel = conn.createChannel();
// 4.监听队列
channel.basicConsume("SEND_MAIL", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "utf-8");
System.out.println("发送邮件:"+message);
}
});
}
}
短信消费者
package com.jjy.mq.publish;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class CustomerMessage {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.66.100");
connectionFactory.setPort(5672);
connectionFactory.setUsername("jjy");
connectionFactory.setPassword("jjy");
connectionFactory.setVirtualHost("/");// 默认虚拟机
//2.创建连接
Connection conn = connectionFactory.newConnection();
//3.建立信道
Channel channel = conn.createChannel();
// 4.监听队列
channel.basicConsume("SEND_MESSAGE", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "utf-8");
System.out.println("发送短信:"+message);
}
});
}
}
也可以使用工作队列+发布订阅模式同时使用,两个消费者同时监听
一个队列:
// 短信消费者2
public class CustomerMessage2 {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.0.162");
connectionFactory.setPort(5672);
connectionFactory.setUsername("itbaizhan");
connectionFactory.setPassword("itbaizhan");
connectionFactory.setVirtualHost("/");// 默认虚拟机
//2.创建连接
Connection conn = connectionFactory.newConnection();
//3.建立信道
Channel channel = conn.createChannel();
// 4.监听队列
channel.basicConsume("SEND_MESSAGE", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "utf-8");
System.out.println("发送短信2:"+message);
}
});
}
}
两个不一样的系统,对同一条消息做不一样的处理
发布订阅模式与工作队列模式的区别
(1)工作队列模式不用定义交换机,而发布/订阅模式需要定义交换机
(2)发布/订阅模式的生产方是面向交换机发送消息,工作队列模式的生产方是面向队列发送消息(底层使用 默认交换机)
(3)发布/订阅模式需要设置队列和交换机的绑定,工作队列模式不需要设置,实际上工作队列模式会将队列绑定到默认的交换机
四. RabbitMQ 路由模式
使用发布订阅模式时,所有消息都会发送到绑定的队列中,但很多
时候,不是所有消息都无差别的发布到所有队列中。比如电商网站
的促销活动,双十一大促可能会发布到所有队列;而一些小的促销
活动为了节约成本,只发布到站内信队列。此时需要使用路由模式
(Routing)完成这一需求。
特点:
- 每个队列绑定路由关键字RoutingKey
- 生产者将带有RoutingKey的消息发送给交换机,交换机根据RoutingKey转发到指定队列。路由模
式使用direct交换机。
队列与交换机的绑定,不能是任意绑定了,而是要指定一个 RoutingKey(路由key)
消息的发送方在向 Exchange 发送消息时,也必须指定消息的 RoutingKey
Exchange 不再把消息交给每一个绑定的队列,而是根据消息的 Routing Key 进行判断,只有队列的 Routingkey 与消息的 Routing key 完全一致,才会接收到消息
生产者代码实现
package com.jjy.mq.routing;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
public class produce {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.66.100");
connectionFactory.setPort(5672);
connectionFactory.setUsername("jjy");
connectionFactory.setPassword("jjy");
connectionFactory.setVirtualHost("/");
//2.创建连接
Connection connection = connectionFactory.newConnection();
//3.建立信道
Channel channel = connection.createChannel();
//4.创建交换机
/**
* 参数1:交换机名
* 参数2:交换机类型
* 参数3:交换机持久化
*/
channel.exchangeDeclare("exchange_routing", BuiltinExchangeType.DIRECT,true);
// 5.创建队列
channel.queueDeclare("SEND_MAIL2",true,false,false,null);
channel.queueDeclare("SEND_MESSAGE2",true,false,false,null);
channel.queueDeclare("SEND_STATION2",true,false,false,null);
//6.交换机绑定队列
/**
* 参数1:队列名
* 参数2:交换机名
* 参数3:路由关键字,发布订阅模式写""即可
*/
channel.queueBind("SEND_MAIL2","exchange_routing","import");
channel.queueBind("SEND_MESSAGE2","exchange_routing","import");
channel.queueBind("SEND_STATION2","exchange_routing","import");
channel.queueBind("SEND_STATION2","exchange_routing","normal");
//7.发送消息
channel.basicPublish("exchange_routing","import",null,
"双十一大促活动".getBytes());
channel.basicPublish("exchange_routing","normal",null,
"小型促销活动".getBytes());
//8.关闭资源
channel.close();
connection.close();
}
}
消费者代码实现
站内信消费者
package com.jjy.mq.routing;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
// 站内信消费者
public class CustomerStation {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.66.100");
connectionFactory.setPort(5672);
connectionFactory.setUsername("jjy");
connectionFactory.setPassword("jjy");
connectionFactory.setVirtualHost("/");// 默认虚拟机
//2.创建连接
Connection conn = connectionFactory.newConnection();
//3.建立信道
Channel channel = conn.createChannel();
// 4.监听队列
channel.basicConsume("SEND_STATION2", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "utf-8");
System.out.println("发送站内信:"+message);
}
});
}
}
短信消费者
package com.jjy.mq.routing;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class CustomerMessage {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.66.100");
connectionFactory.setPort(5672);
connectionFactory.setUsername("jjy");
connectionFactory.setPassword("jjy");
connectionFactory.setVirtualHost("/");// 默认虚拟机
//2.创建连接
Connection conn = connectionFactory.newConnection();
//3.建立信道
Channel channel = conn.createChannel();
// 4.监听队列
channel.basicConsume("SEND_MESSAGE2", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "utf-8");
System.out.println("发送短信:"+message);
}
});
}
}
邮件消费者
package com.jjy.mq.routing;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class CustomerMail {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.66.100");
connectionFactory.setPort(5672);
connectionFactory.setUsername("jjy");
connectionFactory.setPassword("jjy");
connectionFactory.setVirtualHost("/");// 默认虚拟机
//2.创建连接
Connection conn = connectionFactory.newConnection();
//3.建立信道
Channel channel = conn.createChannel();
// 4.监听队列
channel.basicConsume("SEND_MAIL2", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "utf-8");
System.out.println("发送邮件:"+message);
}
});
}
}
总的来说就一句话:
Routing 模式要求队列在绑定交换机时要指定 routing key,消息会转发到符合 routing key 的队列。
五. RabbitMQ 通配符模式
通配符模式(Topic)是在路由模式的基础上,给队列绑定带通配符的
路由关键字,只要消息的RoutingKey能实现通配符匹配,就会将消
息转发到该队列。通配符模式比路由模式更灵活,使用topic交换
机.
通配符规则:
- 消息设置RoutingKey时,RoutingKey由多个单词构成,中间以 . 分割。
- 队列设置RoutingKey时, # 可以匹配任意多个单词, * 可以匹配任意一个单词。
生产者代码实现
代码实现
package com.jjy.mq.topic;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class produce {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.66.100");
connectionFactory.setPort(5672);
connectionFactory.setUsername("jjy");
connectionFactory.setPassword("jjy");
connectionFactory.setVirtualHost("/");
//2.创建连接
Connection connection = connectionFactory.newConnection();
//3.建立信道
Channel channel = connection.createChannel();
//4.创建交换机
/**
* 参数1:交换机名
* 参数2:交换机类型
* 参数3:交换机持久化
*/
channel.exchangeDeclare("exchange_topic", BuiltinExchangeType.TOPIC,true);
// 5.创建队列
channel.queueDeclare("SEND_MAIL3",true,false,false,null);
channel.queueDeclare("SEND_MESSAGE3",true,false,false,null);
channel.queueDeclare("SEND_STATION3",true,false,false,null);
//6.交换机绑定队列
channel.queueBind("SEND_MAIL3","exchange_topic","#.mail.#");
channel.queueBind("SEND_MESSAGE3","exchange_topic","#.message.#");
channel.queueBind("SEND_STATION3","exchange_topic","#.station.#");
//7.发送消息
channel.basicPublish("exchange_topic","mail.message.station",null,
"双十一大促活动".getBytes());
channel.basicPublish("exchange_topic","station",null,
"小型促销活动".getBytes());
//8.关闭资源
channel.close();
connection.close();
}
}
消费者代码实现
站内信消费者
package com.jjy.mq.topic;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
// 站内信消费者
public class CustomerStation {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.66.100");
connectionFactory.setPort(5672);
connectionFactory.setUsername("jjy");
connectionFactory.setPassword("jjy");
connectionFactory.setVirtualHost("/");// 默认虚拟机
//2.创建连接
Connection conn = connectionFactory.newConnection();
//3.建立信道
Channel channel = conn.createChannel();
// 4.监听队列
channel.basicConsume("SEND_STATION3", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "utf-8");
System.out.println("发送站内信:"+message);
}
});
}
}
短信消费者
package com.jjy.mq.topic;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class CustomerMessage {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.66.100");
connectionFactory.setPort(5672);
connectionFactory.setUsername("jjy");
connectionFactory.setPassword("jjy");
connectionFactory.setVirtualHost("/");// 默认虚拟机
//2.创建连接
Connection conn = connectionFactory.newConnection();
//3.建立信道
Channel channel = conn.createChannel();
// 4.监听队列
channel.basicConsume("SEND_MESSAGE3", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "utf-8");
System.out.println("发送短信:"+message);
}
});
}
}
邮件消费者
package com.jjy.mq.topic;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class CustomerMail {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.66.100");
connectionFactory.setPort(5672);
connectionFactory.setUsername("jjy");
connectionFactory.setPassword("jjy");
connectionFactory.setVirtualHost("/");// 默认虚拟机
//2.创建连接
Connection conn = connectionFactory.newConnection();
//3.建立信道
Channel channel = conn.createChannel();
// 4.监听队列
channel.basicConsume("SEND_MAIL3", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "utf-8");
System.out.println("发送邮件:"+message);
}
});
}
}
总述:topics模式比routing模式要更加灵活,笼统的说就是routing模式加上通配符
如果我的内容对你有帮助,请点赞,评论,收藏。创作不易,大家的支持就是我坚持下去的动力!