RabbitMQ的路由模式
路由模式
队列与交换机的绑定,不能是任意绑定了,而是要指定一个 RoutingKey(路由key)
消息的发送方在向 Exchange 发送消息时,也必须指定消息的 RoutingKey
Exchange 不再把消息交给每一个绑定的队列,而是根据消息的 Routing Key 进行判断,只有队列的 Routingkey 与消息的 Routing key 完全一致,才会接收到消息
生产者代码
public class RouteProducer {
public static void main(String[] args) throws Exception {
//1.创建连接
ConnectionFactory cf = new ConnectionFactory();
cf.setHost("192.168.64.140");
cf.setPort(5672);
cf.setUsername("guest");
cf.setPassword("guest");
Connection nc = cf.newConnection();
Channel cc = nc.createChannel();
//2.定义交换机
cc.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT);
//3.发送数据,携带路由键
while(true) {
Scanner scanner = new Scanner(System.in);
System.out.print("消息:");
String s = scanner.nextLine();
System.out.print("路由键:");
String key =scanner.nextLine();
cc.basicPublish("direct_logs", key, null, s.getBytes())
System.out.println("-----------------------------------------------------");
}
}
}
消费者代码
public class RouteConsumer {
public static void main(String[] args) throws Exception {
//1.创建连接
ConnectionFactory cf = new ConnectionFactory();
cf.setHost("192.168.64.140");
cf.setPort(5672);
cf.setUsername("guest");
cf.setPassword("guest");
Connection nc = cf.newConnection();
Channel cc = nc.createChannel();
//2.定义交换机
cc.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT);
/3.定义队列
String queue = cc.queueDeclare().getQueue();
//4.绑定队列和交换机(重复绑定多次)
System.out.print("输入绑定键,用空格隔开:");
String s = new Scanner(System.in).nextLine();//aaa bbb ccc
String[] a = s.split("\\s+");
for (String key : a) {
ch.queueBind(queue, "direct_logs", key);
}
//5.处理数据
DeliverCallback deliverCallback = new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
//从message中取出消息和路由键
String s=new String(message.getBody());
String key = message.getEnvelope().getRoutingKey();
System.out.println(s+"--"+key);
System.out.println("=======================================");
}
};
CancelCallback cancelCallback = new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
}
};
cc.basicConsume(queue, true,deliverCallback,cancelCallback);
}
}
上一篇文章:https://blog.csdn.net/Z0412_J0103/article/details/143354922https://blog.csdn.net/Z0412_J0103/article/details/143354922下一篇文章: