2.rabbitMQ之交换机
1.交换机的作用
1.默认交换机会自动指定队列
2.之前一个信息必须被消费1次,现在的是一个消息可以被消费多次(发送到不同队列的前提下,正常情况下一个队列只能消费一次)
3.消息先发给交换机,然后交换机发给多个队列,可以达到多次消费的效果
如图mq3
2.交换机的类型
- 默认交换机 无名 ""指定
1.直接类型 direct(一个routingKey绑定一个队列,一个交换机绑定一个队列)
2.主题 topic(一个交换机绑定多个队列,主要是通过表达式来实现)
3.标题 headers(不常用)
4.扇出 fanout(一个信息被交换机全部队列接收,相当于QQ聊天)
3.临时队列 没有D的队列 一旦断开连接,队列会被自动删除
//获得临时队列,features有 AD和 Excl
String QName=channel.quequeDeclare().getQueue();
4.交换机和队列绑定binding
//在界面, 添加queue然后添加exchange,然后在交换机 添加队列,
rountingkey代表想要发给哪个队列,后面可以指定哪个可以接收特定的信息
5.fanout(相当于qq群的广播,一条消息被多台计算机 接收) ,队列名可以写为空
1. 2个消费者 声明交换机的名字和类型 主要代码如下
channel.exchangeDeclare(name,"fanout");
String QName=channel.queueDeclare().getQueue();
//队列名,交换机名,routingKey
channel.queueBind(QName,exName,"");
-------完整代码-------
public class exchangeConsumer1 {
public static final String EXCHANGE_NAME="log";
public static void main(String[] args) throws Exception {
Channel channel = MQRabbitUtil.getChannel();
//得到临时队列
String QName=channel.queueDeclare().getQueue();
//交换机的名字和类型
// channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
channel.queueBind(QName,EXCHANGE_NAME,"");
DeliverCallback deliverCallback=(tag, delivery)->{
System.out.println("consumer1"+new String(delivery.getBody(),"UTF-8"));
};
CancelCallback nCallback=(tag)->{
System.out.println("失败应答");
};
boolean IsAck=false;
channel.basicConsume(QName,IsAck,deliverCallback,(tag)->{});
}
}
2.生产者 也要不用再次声明交换机,不用队列名了,用交换机名就可以接收消息
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
-------完整代码-------
public class exchangeProducer {
public static final String EXCHANGE_NAME="log";
public static void main(String[] args) throws Exception {
Channel channel = MQRabbitUtil.getChannel();
//交换机的名字和类型
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
Scanner scanner = new Scanner(System.in);
//模拟生产者不停发消息
while (scanner.hasNext()){
String next = scanner.next();
//交换机
//队列名
//设置消息持久化
//二进制
channel.basicPublish(EXCHANGE_NAME,"", MessageProperties.PERSISTENT_TEXT_PLAIN,next.getBytes());
// boolean flag=channel.waitForConfirms();
// if(flag){
// System.out.println("消息已经写入磁盘的确认");
//
// }
}
}
}
6.直接交换机 direct(rountingKey相等就是fanout交换机,不相等就是direct)(路由)
//可以绑定队列
1.提供者修改交换机类型和routingKey
channel.exchangeDeclare(EXCHANGE_NAME,"direct");
channel.basicPublish(EXCHANGE_NAME,"wrong", MessageProperties.PERSISTENT_TEXT_PLAIN,next.getBytes());
2.消费者修改交换机类型和routingKey
channel.exchangeDeclare(EXCHANGE_NAME,"direct");
channel.queueBind(QName,EXCHANGE_NAME,"error");
7.topic主题交换机,direct只能一个交换机绑定一个队列,这个可以路由多个队列(生产者不声明队列,只声明交换机,消费者声明队列和交换机)
1.routingKey的写法单词不能全部是字母(就成direct交换机了) 如aa.bb.mq *代替1个单词 #代替0个或多个单词
如 lazy.#可以匹配 lazy.ngs.me
*.*.rabbit 匹配 ngs.me.rabbit
2.注意 绑定是 #相当于fanout交换机 #和*都没有出现就是direct交换机
3.代码 交换机为topic
#消费者.queueBind()可以写多次
-------代码------
public class TopicProducer {
public static final String EXCHANGE_NAME="log2";
public static void main(String[] args) throws Exception {
Channel channel = MQRabbitUtil.getChannel();
channel.confirmSelect();
//交换机的名字和类型
// channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
channel.exchangeDeclare(EXCHANGE_NAME,"topic");
// channel.queueDeclare("Q1",false,false,false,null);
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()){
String next = scanner.next();
//交换机
//队列名
//设置消息持久化
//二进制
//fanout交换机模式
// channel.basicPublish(EXCHANGE_NAME,"", MessageProperties.PERSISTENT_TEXT_PLAIN,next.getBytes());
//direct交换机,这里发送的routingKey可以自行修改测试
channel.basicPublish(EXCHANGE_NAME,"quick.orange.rabbit", MessageProperties.PERSISTENT_TEXT_PLAIN,next.getBytes());
// boolean flag=channel.waitForConfirms();
// if(flag){
// System.out.println("消息已经写入磁盘的确认");
//
// }
}
}
}
----消费者-----
public class TopicConsumer1 {
public static final String EXCHANGE_NAME="log2";
public static void main(String[] args) throws Exception {
Channel channel = MQRabbitUtil.getChannel();
//得到临时队列
//交换机的名字和类型
// channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
channel.exchangeDeclare(EXCHANGE_NAME,"topic");
String queueName="Q1";
channel.queueDeclare(queueName, false, false, false, null);
//!!!核心代码,就是发送到的rountingKey和队列绑定
channel.queueBind("Q1",EXCHANGE_NAME,"*.orange.*");
channel.queueBind("Q1",EXCHANGE_NAME,"*.orange.aa");
DeliverCallback deliverCallback=(tag, delivery)->{
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
System.out.println("consumer1"+new String(delivery.getBody(),"UTF-8"));
};
CancelCallback nCallback=(tag)->{
System.out.println("失败应答");
};
boolean IsAck=false;
//消费信息
channel.basicConsume("Q1",IsAck,deliverCallback,(tag)->{});
}
}
8.死信队列(不能被消费的信息放到死信队列,防止消息丢失) 管理界面队列有DLK代表开启了死信
1.来源
—1.消息TTL过期(信息指定时间会过期)
—2.队列达到最大长度(队列满了)
—3.信息被拒绝(basic.reject或basic.nack不应答) requeue=false(信息不放回队列,丢失?)
------2.实现(绑定普通队列和死信队列) 普通和死信交换机都为 dirrect
如图mq4
//普通队列要声明的时候加入arg才会转发到死信队列 !!!注意设置的普通队列的args,不是死信的
-----消费者-----
Map<String,Object> args=new HashMap();
args.put("x-dead-letter-exchange","DEAD_EXCHANGE_NAME");
//设置死信 routingKey
args.put("x-dead-letter-routing-key","lisi");
channe.queueDeclare(...,args);
//过期时间这里可以设置,也可producer声明
//声明队列
--------生产者--------
//设置ttl time to live过期时间
channel.basicPublish(...,prop,...);
9.死信队列之队列达到最大长度
//消费者设置最大正常队列的长度
10.死信队列之信息被拒绝
//消费者
channel.basicReject(msg.getEnvelope().getDeliveryTag(),false);//不放回队列
//一定要开启手动应答
11.延迟队列.是死信队列的过期时间(企业上班案例)
//整合springboot 选2.3.1,复制依赖,复制配置
//整合后不用自己声明队列和交换机,由专门的配置类写
1.配置文件类
@Configuration //配置类上面写
//声明交换机
@Bean("xExchange")
public DirectExchange xExchange(){
return new DirectExchange("name");
}
//声明队列
@Bean("queueA")
public Queue xExchange(){
//指定死信 xxx参数
return QueueBuilder.durable(Queue_A).withArguments(xxx).build();
}
//死信队列
@Bean("queueA")
public Queue xExchange(){
//没有参数
return QueueBuilder.durable(DEAD_QUEUE).build();
}
//绑定,名字必须要有语义化 semantic
@Bean
public Binding queueABindingX(@Qualifier("queueA") Queue queueA,@Qualifier("xExchange") DirectExchange xExchange){
//,队列和交换机绑定,并指定routingKey
return BindingBuilder.bind(queueA).to(xExchange).with("XB");
}
2.写controller接收消息
@Autowired
private RabbitTemplate rabbitTemplate;
log.info("xxxx{},xxx{}",new Date().toString(),msg);
//发送消息来自spring公司的工具,转发到交换机和发送
rabbitTemplate.convertAndSend("X","routingKey","xxmsg")
3.死信消费者接收消息 需要监听器
@Slf4j
@Component
xx class
@RabbitListener(queues="QD")
//注意!!!msg是spring的类,Channel是mq的,刚开始导错包
public void xx(Message msg,Channel channel){
sout(new String(msg.getBody()));
}
12.延迟优化 不在队列声明写延迟时间,而在生产者的声明,就不用一直更新队列代码
图mq5
----1.增加一个不设置时间的队列
//controller当生产者
rabbitTemplate.convertAndSend("exchange_x", "XB", "消息来自 ttl 为 xS 的队列: "+message, correlationData ->{
correlationData.getMessageProperties().setExpiration(ttl);
return correlationData;
});
2.死信做延迟的缺陷,因为是排队的,所以发送多条消息不同延迟时间,按第一条的时间延迟(mq只会检查第一条消息是否过期)(导致先发送时间长的数据一直等待,其他后发送的数据在等待完成用同一时间送达)
3.解决方法(使用插件) 将我们的插件 复制到 mq的plugin的文件夹下(到exchange页面会多一个x-delayed-messeage选项)由交换机延迟了
//文件夹的路径
/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins
//插件放到文件夹后,enable开启插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
//只能配置类写自定义交换机
@Bean
public CustomExchange delayedExchange(){
}
//绑定交换机和队列
//生产者设置时间,注意之前是 setExpiration现在是setDelay交换机延迟
//代码如下
-----配置类-------
@Configuration
public class TtlQueueConfig{
private static String Exchange_X= "exchange_x";
private static String Exchange_Y_DEAD= "exchange_y_dead";
private static String QUEUE_A= "queue_a";
private static String QUEUE_B= "queue_b";
private static String QUEUE_D= "queue_dead";
public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
@Bean("xExchange")
public DirectExchange xExchange(){
return new DirectExchange(Exchange_X);
}
@Bean("yExchange")
public DirectExchange yExchange(){
return new DirectExchange(Exchange_Y_DEAD);
}
//自定义交换机 我们在这里定义的是一个延迟交换机
@Bean
public CustomExchange delayedExchange() {
Map<String, Object> args = new HashMap<>();
//自定义交换机的类型 !!!
args.put("x-delayed-type", "direct");
return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false,
args);
}
@Bean("queueA")
public Queue queueA(){
// Map<String, Object> args = new HashMap<>();
// //声明当前队列绑定的死信交换机
// args.put("x-dead-letter-exchange",Exchange_Y_DEAD);
// //声明当前队列的死信路由 key
// args.put("x-dead-letter-routing-key", "YD");
// //声明队列的 TTL
// args.put("x-message-ttl", 10000);
// return QueueBuilder.durable(QUEUE_A).withArguments(args).build();
return QueueBuilder.durable(QUEUE_A).build();
}
@Bean("queueB")
public Queue queueB(){
// Map<String, Object> args = new HashMap<>();
// //声明当前队列绑定的死信交换机
// args.put("x-dead-letter-exchange",Exchange_Y_DEAD);
// //声明当前队列的死信路由 key
// args.put("x-dead-letter-routing-key", "YD");
// //声明队列的 TTL
// args.put("x-message-ttl", 40000);
// return QueueBuilder.durable(QUEUE_B).withArguments(args).build();
return QueueBuilder.durable(QUEUE_B).build();
}
@Bean("queueDead")
public Queue queueDead(){
return QueueBuilder.durable(QUEUE_D).build();
}
@Bean("xBindingQueueA")
public Binding xBindingQueueA(@Qualifier("queueA") Queue queueA, @Qualifier("xExchange") DirectExchange xExchange){
return BindingBuilder.bind(queueA).to(xExchange).with("XA");
}
@Bean("xBindingQueueB")
public Binding xBindingQueueB(@Qualifier("queueB") Queue queueB,@Qualifier("xExchange") DirectExchange xExchange){
return BindingBuilder.bind(queueB).to(xExchange).with("XB");
}
@Bean("yBindingQueueDead")
public Binding yBindingQueueDead(@Qualifier("queueDead") Queue queueDead,@Qualifier("yExchange") DirectExchange yExchange){
return BindingBuilder.bind(queueDead).to(yExchange).with("YD");
}
@Bean
public Binding bindingDelayedQueue(@Qualifier("queueDead") Queue queue,
@Qualifier("delayedExchange") CustomExchange
delayedExchange) {
return BindingBuilder.bind(queue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
}
}
---------死信消费者-------
@Slf4j
@Component
public class DeadLetterQueueConsumer {
@RabbitListener(queues="queue_dead")
public void deadMeg(Message msg, Channel channel){
String s = new String(msg.getBody());
log.info("时间,{}消息{}",new Date(),s);
}
}