RabbitMQ深度探索:五种消息模式
-
RabbitMQ 工作队列:
- 默认的传统队列是为均摊消费,存在不公平性;如果每个消费者速度不一样的情况下,均摊消费是不公平的,应该是能者多劳
- 采用工作队列模式:
- 在通道中只需要设置 baseicQos 的值即可
- 表示 MQ 服务器每次只会给消费者推送 n 条消息
- 必须手动应答之后才会继续发送
- 在通道中只需要设置 baseicQos 的值即可
- 生产者:
public class ProducerFanout { private static final String QUEUE_NAME = "BoyatopMamber"; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { // 1.创建新的连接 Connection connection = RabbitMQConnection.getConnection(); // 2.设置 channel Channel channel = connection.createChannel(); for (int i = 0; i < 10; i++) { // 3.发送消息 String msg = "Hello my Bro" + i; channel.confirmSelect(); channel.basicPublish("",QUEUE_NAME,null,msg.getBytes()); boolean result = channel.waitForConfirms(); } // 4.关闭资源 channel.close(); connection.close(); } }
- 消费者1:
public class Consumer1 { //定义队列 private static final String QUEUE_NAME = "BoyatopMamber"; public static void main(String[] args) throws IOException, TimeoutException { //创建连接 Connection connection = RabbitMQConnection.getConnection(); //创建通道 final Channel channel = connection.createChannel(); channel.basicQos(1); DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body,"UTF-8"); System.out.println("用户1:" + msg); channel.basicAck(envelope.getDeliveryTag(),false); } }; //监听消息 channel.basicConsume(QUEUE_NAME,false,defaultConsumer); } }
- 消费者2:
public class Consumer2 { //定义队列 private static final String QUEUE_NAME = "BoyatopMamber"; public static void main(String[] args) throws IOException, TimeoutException { //创建连接 Connection connection = RabbitMQConnection.getConnection(); //创建通道 final Channel channel = connection.createChannel(); channel.basicQos(3); DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body,"UTF-8"); System.out.println("消费者2:" + msg); channel.basicAck(envelope.getDeliveryTag(),false); } }; //监听消息 channel.basicConsume(QUEUE_NAME,false,defaultConsumer); } }
- 默认的传统队列是为均摊消费,存在不公平性;如果每个消费者速度不一样的情况下,均摊消费是不公平的,应该是能者多劳
-
RabbitMQ 交换机类型:
- Direct exchange:直连交换机
- Fanout exchange:扇形交换机
- Topic exchange:主体交换机
- Headers exchange:头交换机
- Virtual Hostos:区分不同的团队
- 队列:存放消息
- 交换机:路由消息存放在那个队列中,类似于 Nginx
- 路由:key 分发规则
-
RabbitMQ Fanout 发布订阅:
- 生产者发送一条消息,经过交换机转发到多个不同的队列,多个不同的队列就多个不同的消费者
- 步骤:
- 需要创建两个队列,每个队列都对应一个消费者
- 队列需要绑定我们交换机
- 生产者投递消息到交换机中,交换机再将消息分配给两个队列中都存放起来
- 消费者从队列中获取消息
- 生产者:
public class OrderConsumer { //定义队列 private static final String QUEUE_NAME = "BoyatopOrder"; //定义交换机 private static final String EXCHANGE_NAME = "fanout_exchange"; public static void main(String[] args) throws IOException, TimeoutException { ///创建连接 Connection connection = RabbitMQConnection.getConnection(); //创建通道 Channel channel = connection.createChannel(); //关联队列 channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,""); DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body,"UTF-8"); System.out.println("订单接收:" + msg); } }; //监听消息 channel.basicConsume(QUEUE_NAME,true,defaultConsumer); } }
- 数据消费者:
public class MamConsumer { //定义队列 private static final String QUEUE_NAME = "BoyatopMamber"; //定义交换机的名称 private static final String EXCHANGE_NAME = "fanout_exchange"; public static void main(String[] args) throws IOException, TimeoutException { //创建连接 Connection connection = RabbitMQConnection.getConnection(); //创建通道 final Channel channel = connection.createChannel(); //关联队列 channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,""); DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body,"UTF-8"); System.out.println("消费者接收:" + msg); } }; //开启监听 channel.basicConsume(QUEUE_NAME,true,defaultConsumer); } }
- 订单消费者:
public class OrderConsumer { //定义队列 private static final String QUEUE_NAME = "BoyatopOrder"; //定义交换机 private static final String EXCHANGE_NAME = "fanout_exchange"; public static void main(String[] args) throws IOException, TimeoutException { ///创建连接 Connection connection = RabbitMQConnection.getConnection(); //创建通道 Channel channel = connection.createChannel(); //关联队列 channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,""); DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body,"UTF-8"); System.out.println("订单接收:" + msg); } }; //监听消息 channel.basicConsume(QUEUE_NAME,true,defaultConsumer); } }
- 生产者发送一条消息,经过交换机转发到多个不同的队列,多个不同的队列就多个不同的消费者
-
Direct 路由模式:
- 当交换机类型为 direct 类型时,根据队列绑定的路由转发到具体的队列中存放消息
- 生产者:
public class Producer { //定义交换机 private static final String EXCHANGE_NAME = "newDirect_exchange"; public static void main(String[] args) throws IOException, TimeoutException { //创建连接 Connection connection = RabbitMQConnection.getConnection(); //创建 channel Channel channel = connection.createChannel(); //通道关联交换机 channel.exchangeDeclare(EXCHANGE_NAME,"direct",true); //发送消息 for (int i = 0; i < 10; i++) { String msg = "生产消息 --- 路由模式"; System.out.println(msg + i); channel.basicPublish(EXCHANGE_NAME,"mail",null,msg.getBytes()); channel.basicPublish(EXCHANGE_NAME,"sms",null,msg.getBytes()); } channel.close(); connection.close(); } }
- 邮件消费者:
public class MailConsumer { //定义交换机 private static final String EXCHANGE_NAME = "newDirect_exchange"; //定义队列 private static final String QUEUE_NAME = "newDirectqueueOne"; public static void main(String[] args) throws IOException, TimeoutException { //创建连接 Connection connection = RabbitMQConnection.getConnection(); //创建 channel Channel channel = connection.createChannel(); //关联队列 channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"mail"); DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body,"UTF-8"); System.out.println("邮件消费者接收信息:" + msg); } }; //监听队列 channel.basicConsume(QUEUE_NAME,true,defaultConsumer); } }
- 短信消费者:
public class SmsConsumer { //定义交换机 private static final String EXCHANGE_NAME = "newDirect_exchange"; //定义队列 private static final String QUEUE_NAME = "newDirectqueueTwo"; public static void main(String[] args) throws IOException, TimeoutException { //创建连接 Connection connection = RabbitMQConnection.getConnection(); //创建 channel Channel channel = connection.createChannel(); //关联队列 channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"sms"); DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body,"UTF-8"); System.out.println("短信消费者接收消息:" + msg); } }; //监听队列 channel.basicConsume(QUEUE_NAME,true,defaultConsumer); } }
- 当交换机类型为 direct 类型时,根据队列绑定的路由转发到具体的队列中存放消息
-
Topic 主体模式:
- 当交换机类型为 topic 类型时,根据队列绑定的路由键模糊转发到具体队列中存放
- #:表示支持匹配多个词
- *:表示只能匹配一个词