RibbitMQ-原理使用
本文主要介绍RibbitMQ的基本架构,以及交换机Exchange在Direct、Fanout、Topic、Headers模式下的生产者、消费者应该如何发送、接收消息。
基本架构
RibbitMQ是一款基于AMQP(高级消息队列协议)用于软件之间通信的中间件,由Rabbit公司开发,服务器端用Erlang语言编写,支持多种语言的客户端。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
Rabbitmq四大核心:生产者、消费者、队列、交换机。
AMQP协议是一种二进制协议,它定义了一组规则和标准,以确保消息可以在不同的应用程序和平台之间传递和解释,AMQP协议包含四个核心组件:消息、交换机、队列、绑定
Virtual host: Virtual host是一个虚拟主机的概念,一个RibbitMQ服务端中可以有多个Virtual host,每个Virtual host都有一套自己的Exchange和Queue,同一个Virtual host中的Exchange和Queue不能重名,不同的Virtua host中的Exchange和Queue名字可以一样。这样,不同的用户在访问同一个RabbitMQ服务端时,可以创建自己单独的Virtual host,然后在自己的Virtual host中创建Exchange和Queue,很好地做到了不同用户之间消息的相互隔性。
Connection:producer/consumer和RibbitMQ服务端之间的TCP连接。
Channel:发送消息的通道,如果每一次访问RabbitMQ都建立一个Connection,在消息量大的时候建立TCP Connection 的开销将是巨大的,效率也较低。Channel 是在connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的channel进行通讯。
Exchange:message到达RabbitMQ的第一站,根据分发规则,匹配査询表中的routing key,分发消息到queue中去。常用的类型有:direct、 topic、 fanout、 headers。
Queue:Queue是一个用来存放消息的队列,生产者发送的消息会被放到Queue中,消费者消费消息时也是从Queue中取走消息。
Message:Message是储存消息的最小单位。
基本使用
maven依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.22.0</version>
</dependency>
producer
/**
* 发送消息
*/
public static void sendMessage() throws IOException, TimeoutException {
String queueName = "xiaoyuan-queue";
String exchangeName = "xiaoyuan-exchange";
// 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// rabbitmq 服务地址
connectionFactory.setHost("8.140.224.210");
// rabbitmq 服务端口
connectionFactory.setPort(5672);
// rabbitmq 账号
connectionFactory.setUsername("xiaoyuan");
// rabbitmq 密码
connectionFactory.setPassword("xiaoyuan");
// 创建连接
Connection connection = connectionFactory.newConnection();
// 创建信息通道
Channel channel = connection.createChannel();
try {
/**
* 声明交换机
* arg1:交换机名称
* arg2:交换机类型 direct、fanout、topic、headers
* arg3:交换机是否要持久化 如果true,则交换机元数据持久化
* arg4: 交换机没有队列绑定时,是否删除;false不删除
* arg5: Map<String,Object>类型 设定交换机其他一些结构化参数,可为null
*/
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true, false, null);
/**
* 声明队列
* arg1:队列名称
* arg2:队列是否持久化;仅仅是队列信息的持久化,而不是队列中消息的持久化
* arg3:队列是否私有化,若是私有的,只有创建它的应用程序才能消费
* arg4:队列在没有消费者订阅的情况下,是否自动删除
* arg5: 队列的一些结构化信息,如声明死信队列,磁盘队列
*/
channel.queueDeclare(queueName, true, false, false, null);
/**
* 将队列与交换机绑定
* arg1: 队列名称
* arg2:交换机名称
* arg3: 路由键,直连模式下为队列名称
*/
channel.queueBind(queueName, exchangeName, queueName);
// 创建消息
String message = "hello world";
/**
* 发送消息
* arg1:交换机名称
* arg2:路由键,直连模式下为队列名称
* arg3:其他参数信息
* arg4:消息
*/
channel.basicPublish(exchangeName, queueName, null, message.getBytes());
} finally {
// 释放管道资源
channel.close();
// 释放连接资源
connection.close();
}
}
consumer
/**
* 发送消息
*/
public static void consumerMessage() throws IOException, TimeoutException {
String queueName = "xiaoyuan-queue";
String exchangeName = "xiaoyuan-exchange";
// 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// rabbitmq 服务地址
connectionFactory.setHost("8.140.224.210");
// rabbitmq 服务端口
connectionFactory.setPort(5672);
// rabbitmq 账号
connectionFactory.setUsername("xiaoyuan");
// rabbitmq 密码
connectionFactory.setPassword("xiaoyuan");
// 创建连接
Connection connection = connectionFactory.newConnection();
// 创建信息通道
Channel channel = connection.createChannel();
try {
// 接收消息的回调函数
DeliverCallback deliverCallback = (consumerTage,messgae) -> {
System.out.println("接收到的消息: " + new String(messgae.getBody()));
};
// 取消消息的回调函数
CancelCallback cancelCallback = consumerTage -> {
System.out.println("接收消息失败");
};
/**
* 消费消息
* arg1:消费队列名称
* arg2:消费成功后是否自动提交 ack机制; false-不进行提交;true-提交,消息从队列中移除
* arg3: 接收消息的回调函数
* arg4;取消消息的回调函数
*/
channel.basicConsume(queueName,false,deliverCallback,cancelCallback);
} finally {
// 释放管道资源
channel.close();
// 释放连接资源
connection.close();
}
}
交换机
Direct
原理
路由键与队列名完全匹配交换机,此种类型交换机,通过RoutingKey路由键将交换机和队列进行绑定,消息被发送到exchange时,需要根据消息的RoutingKey,来进行匹配,只将消息发送到完全匹配到此RoutingKey的队列。
例如:如果一个队列绑定到交换机要求路由键为“key”,则只转发RoutingKey标记为“key”的消息,不会转发”key1"。它是完全匹配、单播的模式
使用
producer
/**
* 发送消息 指定交换机 direct
*/
public static void sendMessageByExchangeTypeDirect() throws IOException, TimeoutException {
String queueName_1 = "xiaoyuan-queue-1";
String queueName_2 = "xiaoyuan-queue-2";
String queueName_3 = "xiaoyuan-queue-3";
String queueName_4 = "xiaoyuan-queue-4";
// 路由键
String key_1 = "key-1";
String key_3 = "key-3";
String key_4 = "key-4";
String exchangeName = "xiaoyuan-exchange";
// 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// rabbitmq 服务地址
connectionFactory.setHost("8.140.224.210");
// rabbitmq 服务端口
connectionFactory.setPort(5672);
// rabbitmq 账号
connectionFactory.setUsername("xiaoyuan");
// rabbitmq 密码
connectionFactory.setPassword("xiaoyuan");
// 创建连接
Connection connection = connectionFactory.newConnection();
// 创建信息通道
Channel channel = connection.createChannel();
try {
/**
* 声明交换机
* arg1:交换机名称
* arg2:交换机类型 direct、fanout、topic、headers
* arg3:交换机是否要持久化 如果true,则交换机元数据持久化
* arg4: 交换机没有队列绑定时,是否删除;false不删除
* arg5: Map<String,Object>类型 设定交换机其他一些结构化参数,可为null
*/
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true, false, null);
/**
* 声明队列
* arg1:队列名称
* arg2:队列是否持久化;仅仅是队列信息的持久化,而不是队列中消息的持久化
* arg3:队列是否私有化,若是私有的,只有创建它的应用程序才能消费
* arg4:队列在没有消费者订阅的情况下,是否自动删除
* arg5: 队列的一些结构化信息,如声明死信队列,磁盘队列
*/
channel.queueDeclare(queueName_1, true, false, false, null);
channel.queueDeclare(queueName_2, true, false, false, null);
channel.queueDeclare(queueName_3, true, false, false, null);
channel.queueDeclare(queueName_4, true, false, false, null);
/**
* 将队列与交换机绑定
* arg1: 队列名称
* arg2:交换机名称
* arg3: 路由键,直连模式下为队列名称
*/
channel.queueBind(queueName_1, exchangeName, key_1);
channel.queueBind(queueName_2, exchangeName, key_1);
channel.queueBind(queueName_3, exchangeName, key_3);
channel.queueBind(queueName_4, exchangeName, key_4);
// 创建消息
String message = "hello world";
/**
* 发送消息
* arg1:交换机名称
* arg2:路由键,直连模式下为队列名称
* arg3:其他参数信息
* arg4:消息
*/
channel.basicPublish(exchangeName, key_1, null, ("key_1:" + message).getBytes());
channel.basicPublish(exchangeName, key_1, null, ("key_1:" + message).getBytes());
channel.basicPublish(exchangeName, key_3, null, ("key_3:" + message).getBytes());
channel.basicPublish(exchangeName, key_4, null, ("key_4:" + message).getBytes());
} finally {
// 释放管道资源
channel.close();
// 释放连接资源
connection.close();
}
}
consumer
/**
* 发送消息
*/
public static void consumerMessageByExchangeTypeDirect() throws IOException, TimeoutException {
String queueName = "xiaoyuan-queue";
String exchangeName = "xiaoyuan-exchange";
// 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// rabbitmq 服务地址
connectionFactory.setHost("8.140.224.210");
// rabbitmq 服务端口
connectionFactory.setPort(5672);
// rabbitmq 账号
connectionFactory.setUsername("xiaoyuan");
// rabbitmq 密码
connectionFactory.setPassword("xiaoyuan");
// 创建连接
Connection connection = connectionFactory.newConnection();
// 创建信息通道
Channel channel = connection.createChannel();
try {
// 接收消息的回调函数
DeliverCallback deliverCallback = (consumerTage,messgae) -> {
System.out.println("接收到的消息: " + new String(messgae.getBody()));
};
// 取消消息的回调函数
CancelCallback cancelCallback = consumerTage -> {
System.out.println("接收消息失败");
};
/**
* 消费消息
* arg1:消费队列名称
* arg2:消费成功后是否自动提交 ack机制; false-不进行提交;true-提交,消息从队列中移除
* arg3: 接收消息的回调函数
* arg4;取消消息的回调函数
*/
channel.basicConsume("xiaoyuan-queue-1",true,deliverCallback,cancelCallback);
channel.basicConsume("xiaoyuan-queue-2",true,deliverCallback,cancelCallback);
channel.basicConsume("xiaoyuan-queue-3",true,deliverCallback,cancelCallback);
channel.basicConsume("xiaoyuan-queue-4",true,deliverCallback,cancelCallback);
} finally {
// 释放管道资源
channel.close();
// 释放连接资源
connection.close();
}
}
Fanout
原理
Fanout,扇出类型交换机,此种交换机,会将消息分发给所有绑定了此交换机的队列,此时RoutingKey参数无效。
fanout类型交换机下发送消息一条,无论RoutingKey是什么,queue1,queue2,queue3,queue4都可以收到消息
使用
producer
/**
* 发送消息 指定交换机 fanout
*/
public static void sendMessageByExchangeTypeFanout() throws IOException, TimeoutException {
String queueName_1 = "xiaoyuan-queue-fanout-1";
String queueName_2 = "xiaoyuan-queue-fanout-2";
String queueName_3 = "xiaoyuan-queue-fanout-3";
String queueName_4 = "xiaoyuan-queue-fanout-4";
String key_1 = "key-1";
String key_2 = "key-2";
String key_3 = "key-3";
String key_4 = "key-4";
String exchangeName = "xiaoyuan-exchange-fanout";
// 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// rabbitmq 服务地址
connectionFactory.setHost("8.140.224.210");
// rabbitmq 服务端口
connectionFactory.setPort(5672);
// rabbitmq 账号
connectionFactory.setUsername("xiaoyuan");
// rabbitmq 密码
connectionFactory.setPassword("xiaoyuan");
// 创建连接
Connection connection = connectionFactory.newConnection();
// 创建信息通道
Channel channel = connection.createChannel();
try {
/**
* 声明交换机
* arg1:交换机名称
* arg2:交换机类型 direct、fanout、topic、headers
* arg3:交换机是否要持久化 如果true,则交换机元数据持久化
* arg4: 交换机没有队列绑定时,是否删除;false不删除
* arg5: Map<String,Object>类型 设定交换机其他一些结构化参数,可为null
*/
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT, true, false, null);
/**
* 声明队列
* arg1:队列名称
* arg2:队列是否持久化;仅仅是队列信息的持久化,而不是队列中消息的持久化
* arg3:队列是否私有化,若是私有的,只有创建它的应用程序才能消费
* arg4:队列在没有消费者订阅的情况下,是否自动删除
* arg5: 队列的一些结构化信息,如声明死信队列,磁盘队列
*/
channel.queueDeclare(queueName_1, true, false, false, null);
channel.queueDeclare(queueName_2, true, false, false, null);
channel.queueDeclare(queueName_3, true, false, false, null);
channel.queueDeclare(queueName_4, true, false, false, null);
/**
* 将队列与交换机绑定
* arg1: 队列名称
* arg2:交换机名称
* arg3: 路由键,直连模式下为队列名称
*/
channel.queueBind(queueName_1, exchangeName, key_1);
channel.queueBind(queueName_2, exchangeName, key_2);
channel.queueBind(queueName_3, exchangeName, key_3);
channel.queueBind(queueName_4, exchangeName, key_4);
// 创建消息
String message = "hello world";
/**
* 发送消息
* arg1:交换机名称
* arg2:路由键,直连模式下为队列名称
* arg3:其他参数信息
* arg4:消息
*/
channel.basicPublish(exchangeName, key_1, null, ("fanout key_1:" + message).getBytes());
} finally {
// 释放管道资源
channel.close();
// 释放连接资源
connection.close();
}
}
consumer
/**
* 发送消息
*/
public static void consumerMessageByExchangeTypeFanout() throws IOException, TimeoutException {
String queueName_1 = "xiaoyuan-queue-fanout-1";
String queueName_2 = "xiaoyuan-queue-fanout-2";
String queueName_3 = "xiaoyuan-queue-fanout-3";
String queueName_4 = "xiaoyuan-queue-fanout-4";
String exchangeName = "xiaoyuan-exchange-fanout";
// 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// rabbitmq 服务地址
connectionFactory.setHost("8.140.224.210");
// rabbitmq 服务端口
connectionFactory.setPort(5672);
// rabbitmq 账号
connectionFactory.setUsername("xiaoyuan");
// rabbitmq 密码
connectionFactory.setPassword("xiaoyuan");
// 创建连接
Connection connection = connectionFactory.newConnection();
// 创建信息通道
Channel channel = connection.createChannel();
try {
// 接收消息的回调函数
DeliverCallback deliverCallback = (consumerTage,messgae) -> {
System.out.println("接收到的消息: " + new String(messgae.getBody()));
};
// 取消消息的回调函数
CancelCallback cancelCallback = consumerTage -> {
System.out.println("接收消息失败");
};
/**
* 消费消息
* arg1:消费队列名称
* arg2:消费成功后是否自动提交 ack机制; false-不进行提交;true-提交,消息从队列中移除
* arg3: 接收消息的回调函数
* arg4;取消消息的回调函数
*/
channel.basicConsume(queueName_1,true,deliverCallback,cancelCallback);
channel.basicConsume(queueName_2,true,deliverCallback,cancelCallback);
channel.basicConsume(queueName_3,true,deliverCallback,cancelCallback);
channel.basicConsume(queueName_4,true,deliverCallback,cancelCallback);
} finally {
// 释放管道资源
channel.close();
// 释放连接资源
connection.close();
}
}
Topic
原理
Topic主题类型交换机,此种交换机与Direct类似,也是需要通过routingkey路由键进行匹配分发,区别在于Topic可以进行模糊匹配,Direct是完全匹配。
- “.”:来分为多个部分(类似于逗号分隔)
- “*”:代表一个部分
- “#”:代表0个或多个部分(如果绑定的路由键为"#"时,则接受所有消息,因为路由键所有都匹配)
当发送一条信息,routingkey为”key1.key2.key3.key4",那么根据"."将这个路由键分为了4个部分,此条路由键将会匹配:
- key1.key2.key3.* :模糊补充上一个部分,key4;
- key1.# :模糊补充上多个部分,key2,key3,key4;
- .key2..key4 :模糊补充上一部分,key1和key3;
- #.key3.key4 :模糊补充上多部分,key1,key2;
使用
producer
/**
* 发送消息 指定交换机 topic
*/
public static void sendMessageByExchangeTypeTopic() throws IOException, TimeoutException {
String queueName_1 = "xiaoyuan-queue-topic-1";
String queueName_2 = "xiaoyuan-queue-topic-2";
String queueName_3 = "xiaoyuan-queue-topic-3";
String queueName_4 = "xiaoyuan-queue-topic-4";
String key_1 = "key1.key2.key3.*";
String key_2 = "key1.#";
String key_3 = "*.key2.*.key4";
String key_4 = "#.key3.key4";
String exchangeName = "xiaoyuan-exchange-topic";
// 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// rabbitmq 服务地址
connectionFactory.setHost("8.140.224.210");
// rabbitmq 服务端口
connectionFactory.setPort(5672);
// rabbitmq 账号
connectionFactory.setUsername("xiaoyuan");
// rabbitmq 密码
connectionFactory.setPassword("xiaoyuan");
// 创建连接
Connection connection = connectionFactory.newConnection();
// 创建信息通道
Channel channel = connection.createChannel();
try {
/**
* 声明交换机
* arg1:交换机名称
* arg2:交换机类型 direct、fanout、topic、headers
* arg3:交换机是否要持久化 如果true,则交换机元数据持久化
* arg4: 交换机没有队列绑定时,是否删除;false不删除
* arg5: Map<String,Object>类型 设定交换机其他一些结构化参数,可为null
*/
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC, true, false, null);
/**
* 声明队列
* arg1:队列名称
* arg2:队列是否持久化;仅仅是队列信息的持久化,而不是队列中消息的持久化
* arg3:队列是否私有化,若是私有的,只有创建它的应用程序才能消费
* arg4:队列在没有消费者订阅的情况下,是否自动删除
* arg5: 队列的一些结构化信息,如声明死信队列,磁盘队列
*/
channel.queueDeclare(queueName_1, true, false, false, null);
channel.queueDeclare(queueName_2, true, false, false, null);
channel.queueDeclare(queueName_3, true, false, false, null);
channel.queueDeclare(queueName_4, true, false, false, null);
/**
* 将队列与交换机绑定
* arg1: 队列名称
* arg2:交换机名称
* arg3: 路由键,直连模式下为队列名称
*/
channel.queueBind(queueName_1, exchangeName, key_1);
channel.queueBind(queueName_2, exchangeName, key_2);
channel.queueBind(queueName_3, exchangeName, key_3);
channel.queueBind(queueName_4, exchangeName, key_4);
// 创建消息
String message = "hello world";
/**
* 发送消息
* arg1:交换机名称
* arg2:路由键,直连模式下为队列名称
* arg3:其他参数信息
* arg4:消息
*/
channel.basicPublish(exchangeName, "key1.key2.key3.key4", null, ("topic key_1:" + message).getBytes());
} finally {
// 释放管道资源
channel.close();
// 释放连接资源
connection.close();
}
}
consumer
/**
* 发送消息
*/
public static void consumerMessageByExchangeTypeTopic() throws IOException, TimeoutException {
String queueName_1 = "xiaoyuan-queue-topic-1";
String queueName_2 = "xiaoyuan-queue-topic-2";
String queueName_3 = "xiaoyuan-queue-topic-3";
String queueName_4 = "xiaoyuan-queue-topic-4";
String exchangeName = "xiaoyuan-exchange-topic";
// 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// rabbitmq 服务地址
connectionFactory.setHost("8.140.224.210");
// rabbitmq 服务端口
connectionFactory.setPort(5672);
// rabbitmq 账号
connectionFactory.setUsername("xiaoyuan");
// rabbitmq 密码
connectionFactory.setPassword("xiaoyuan");
// 创建连接
Connection connection = connectionFactory.newConnection();
// 创建信息通道
Channel channel = connection.createChannel();
try {
// 接收消息的回调函数
DeliverCallback deliverCallback = (consumerTage,messgae) -> {
System.out.println("接收到的消息: " + new String(messgae.getBody()));
};
// 取消消息的回调函数
CancelCallback cancelCallback = consumerTage -> {
System.out.println("接收消息失败");
};
/**
* 消费消息
* arg1:消费队列名称
* arg2:消费成功后是否自动提交 ack机制; false-不进行提交;true-提交,消息从队列中移除
* arg3: 接收消息的回调函数
* arg4;取消消息的回调函数
*/
channel.basicConsume(queueName_1,true,deliverCallback,cancelCallback);
channel.basicConsume(queueName_2,true,deliverCallback,cancelCallback);
channel.basicConsume(queueName_3,true,deliverCallback,cancelCallback);
channel.basicConsume(queueName_4,true,deliverCallback,cancelCallback);
} finally {
// 释放管道资源
channel.close();
// 释放连接资源
connection.close();
}
}
Headers
原理
headers 匹配 AMQP 消息的 header 而不是路由键,此外 headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了
消费方指定的headers中必须包含一个“x-match"的键。
键"x-match"的值有2个
- x-match= all :表示所有的键值对都匹配才能接受到消息
- x-match =any:表示只要有键值对匹配就能接受到消息
注:在消费者测进行交换机与队列的绑定,先启动生产者,创建交换机&队列,再启动消费者绑定
当发送消息的其它参数为{“name”:“小明123”,“age”:10}时,queue-1绑定的是匹配方式是all,全匹配,无法发送到queue-1中;但是queue-2绑定的是非全匹配,则可以发送到queue-2中。
使用
producer
/**
* 发送消息 指定交换机 headers
*/
public static void sendMessageByExchangeTypeHeaders() throws IOException, TimeoutException {
String queueName_1 = "xiaoyuan-queue-headers-1";
String exchangeName = "xiaoyuan-exchange-headers";
// 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// rabbitmq 服务地址
connectionFactory.setHost("8.140.224.210");
// rabbitmq 服务端口
connectionFactory.setPort(5672);
// rabbitmq 账号
connectionFactory.setUsername("xiaoyuan");
// rabbitmq 密码
connectionFactory.setPassword("xiaoyuan");
// 创建连接
Connection connection = connectionFactory.newConnection();
// 创建信息通道
Channel channel = connection.createChannel();
try {
/**
* 声明交换机
* arg1:交换机名称
* arg2:交换机类型 direct、fanout、topic、headers
* arg3:交换机是否要持久化 如果true,则交换机元数据持久化
* arg4: 交换机没有队列绑定时,是否删除;false不删除
* arg5: Map<String,Object>类型 设定交换机其他一些结构化参数,可为null
*/
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.HEADERS, true, false, null);
/**
* 声明队列
* arg1:队列名称
* arg2:队列是否持久化;仅仅是队列信息的持久化,而不是队列中消息的持久化
* arg3:队列是否私有化,若是私有的,只有创建它的应用程序才能消费
* arg4:队列在没有消费者订阅的情况下,是否自动删除
* arg5: 队列的一些结构化信息,如声明死信队列,磁盘队列
*/
channel.queueDeclare(queueName_1, true, false, false, null);
Map<String,Object> headerMap = new HashMap<>();
headerMap.put("name","小明");
headerMap.put("age",10);
AMQP.BasicProperties.Builder props = new AMQP.BasicProperties().builder().headers(headerMap);
// 创建消息
String message = "hello world";
/**
* 发送消息
* arg1:交换机名称
* arg2:路由键,直连模式下为队列名称
* arg3:其他参数信息
* arg4:消息
*/
channel.basicPublish(exchangeName, "", props.build(), ("header:" + message).getBytes());
} finally {
// 释放管道资源
channel.close();
// 释放连接资源
connection.close();
}
}
consumer
/**
* 发送消息
*/
public static void consumerMessageByExchangeTypeHeaders() throws IOException, TimeoutException {
String queueName_1 = "xiaoyuan-queue-headers-1";
String exchangeName = "xiaoyuan-exchange-headers";
// 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// rabbitmq 服务地址
connectionFactory.setHost("8.140.224.210");
// rabbitmq 服务端口
connectionFactory.setPort(5672);
// rabbitmq 账号
connectionFactory.setUsername("xiaoyuan");
// rabbitmq 密码
connectionFactory.setPassword("xiaoyuan");
// 创建连接
Connection connection = connectionFactory.newConnection();
// 创建信息通道
Channel channel = connection.createChannel();
try {
// 接收消息的回调函数
DeliverCallback deliverCallback = (consumerTage,messgae) -> {
System.out.println("接收到的消息: " + new String(messgae.getBody()));
};
// 取消消息的回调函数
CancelCallback cancelCallback = consumerTage -> {
System.out.println("接收消息失败");
};
Map<String,Object> headerMap = new HashMap<>();
headerMap.put("x-match","any");
headerMap.put("name","小明");
headerMap.put("age",10);
/**
* 将队列与交换机绑定
* arg1: 队列名称
* arg2:交换机名称
* arg3: 路由键,直连模式下为队列名称
* arg4:headeraMap
*/
channel.queueBind(queueName_1, exchangeName,"",headerMap);
/**
* 消费消息
* arg1:消费队列名称
* arg2:消费成功后是否自动提交 ack机制; false-不进行提交;true-提交,消息从队列中移除
* arg3: 接收消息的回调函数
* arg4;取消消息的回调函数
*/
channel.basicConsume(queueName_1,true,deliverCallback,cancelCallback);
} finally {
// 释放管道资源
channel.close();
// 释放连接资源
connection.close();
}
}