当前位置: 首页 > article >正文

RabbitMQ系列学习笔记(三)--工作队列模式

文章目录

  • 一、工作队列模式原理
  • 二、工作队列模式实战
    • 1、抽取工具类
    • 2、消费者代码
    • 3、生产者代码
    • 4、查看运行结果

本文参考
尚硅谷RabbitMQ教程丨快速掌握MQ消息中间件rabbitmq
RabbitMQ 详解
Centos7环境安装Erlang、RabbitMQ详细过程(配图)

一、工作队列模式原理

image.png
与简单模式相比,工作队列模式(Work Queue)多了一些消费者,该模式也是使用 direct 交换机,应用于处理消息较多的情况,对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度
工作队列模式实际上是一种竞争关系的模式,多个消费者之间是竞争关系,即一条消息如果被某个消费者消费了,那么其他的消费者就获取不到了。特点如下:

  • 一个队列对应多个消费者。
  • 一条消息只会被一个消费者消费。
  • 消息队列默认采用 轮询 的方式将消息平均发送给消费者。

二、工作队列模式实战

1、抽取工具类

通过前边简单模式的实践发现,其实无论是生产者还是消费者,在代码实现中是有共通点的,我们可以将前期建立连接获取信道的代码片段抽取出来,形成一个工具类。

public class RabbitMqUtil {
	//得到一个连接的 channel
	public static Channel getChannel() throws Exception {
    	//创建一个连接工厂并建立连接 
        ConnectionFactory factory = new ConnectionFactory(); 
        factory.setHost("192.168.222.139"); 
    	factory.setUsername("admin"); 
        factory.setPassword("scr1pt_yang.");
    	Connection connection = factory.newConnection(); 
        Channel channel = connection.createChannel(); 
    	return channel;
	}
}

2、消费者代码

工作队列模式的消费者与简单模式的消费者代码实现上没有区别,只是消费者数量增加了。

/**
 * Description: 工作队列模式消费者01
 */
public class Consumer01 {
    //队列名称
    private static final String QUEUE_NAME = "work_queue";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtil.getChannel();
        // 创建队列
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        System.out.println("C1等待接收消息.....");
        channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("接收到消息: " + message);
            }
        });
    }
}
/**
 * Description: 工作队列模式消费者02
 */
public class Consumer01 {
    //队列名称
    private static final String QUEUE_NAME = "work_queue";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtil.getChannel();
        System.out.println("C2等待接收消息.....");
        channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("接收到消息: " + message);
            }
        });
    }
}

3、生产者代码

生产者代码逻辑比较简单,获取到channel以后直接通过channel.basicPublish()发送消息即可。

/**
 * Description: 工作队列模式生产者
 */
public class Producer {
    //队列名称
    private static final String QUEUE_NAME = "work_queue";

    public static void main(String[] args) throws Exception {
        // 1.通过工具类建立信道
        Channel channel = RabbitMqUtil.getChannel();
        // 2.从控制台获取输入,并将其作为消息发送消息队列
        Scanner scanner = new Scanner(System.in);
		while (scanner.hasNext()){
			String message = scanner.next(); 
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes()); 
			System.out.println("生产者发出消息:"+message);
		}
        // 3.关闭资源
        channel.close();
    }
}

4、查看运行结果

先将消费者启动,因为消费者1的代码中负责声明了work_queue队列,再启动生产者进行消息的发送。image.pngimage.pngimage.png
通过上述执行结果可以发现在工作队列工作模式下,消息队列是通过轮询的方式将消息轮流发送给所有消费者,当第一条消息到达消费者1时,消费者2则不能接收到这条消息,同理反之亦然。两者接受到的消息是互斥的。


http://www.kler.cn/news/359921.html

相关文章:

  • 基于卷积神经网络和 Swin Transformer 的图像处理模型
  • Docker设置日志滚动
  • restrict是如何限定指针访问的?C语言必须用.c为扩展名吗?为什么C系语言很流行?哪些语言可以称为C系语言?
  • GLSL(OpenGL Shading Language)学习路线
  • Python | Leetcode Python题解之第498题对角线遍历
  • Ratkins Army Pack 死亡军团骑士战士游戏角色
  • 2024年10月21日可以使用的微信小程序官方获取头像和姓名
  • 函数的力量:掌握C语言的基石
  • CommonJS 和 ES modules
  • vector和list
  • 基于AIACC加速器快速实现LLaMA-7B指令微调
  • 写好英文邮件的技巧,如何结构化表达?
  • OpenLayers:用于在 web 应用程序中创建互动地图
  • 阿里巴巴达摩院|Chain of Ideas: 利用大型语言模型代理革新新颖创意开发的研究
  • Python Flask 接收前端上传的图片
  • C++11 thread,mutex,condition_variable,atomic,原子操作CAS,智能指针线程安全,单例模式最简单实现方式
  • Redis 数据类型Streams
  • performance.timing
  • OpenCV坐标系统与图像处理案例
  • ActivationType, Pool, ModelType(helpers文件中的classes.py)