当前位置: 首页 > article >正文

RabbitMQ Work Queues (工作队列模式) 使用案例

Hi~!这里是奋斗的明志,很荣幸您能阅读我的文章,诚请评论指点,欢迎欢迎 ~~
🌱🌱个人主页:奋斗的明志
🌱🌱所属专栏:RabbitMQ

📚本系列文章为个人学习笔记,在这里撰写成文一为巩固知识,二为展示我的学习过程及理解。文笔、排版拙劣,望见谅。

在这里插入图片描述

Work Queues 工作队列

  • 前言
  • Work Queues (工作队列)
    • 1、引入依赖
    • 2、编写生产者代码
    • 3、编写消费者代码
    • 4、运行程序, 观察结果

前言

在前面学习了简单模式的写法, 接下来学习另外几种工作模式的写法
简单模式
快速入门程序就是简单模式.
在这里插入图片描述

默契之舞 之 生产者消费者模式(RabbitMQ)

Work Queues (工作队列)

简单模式的增强版, 和简单模式的区别就是: 简单模式有一个消费者, 工作队列模式支持多个消费者接收消息, 消费者之间是竞争关系, 每个消息只能被一个消费者接收

在这里插入图片描述

1、引入依赖


<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.20.0</version>
</dependency>

2、编写生产者代码

工作队列模式和简单模式区别是有多个消费者, 所以生产者消费者代码差异不大
相比简单模式, 生产者的代码基本⼀样, 为了能看到多个消费者竞争的关系, 我们一次发送10条消息
我们把发送消息的地方, 改为一发送10条消息


for (int i = 0; i < 10; i++) {
    String mag = "hello work queue......" + i;
    channel.basicPublish("", Constants.WORK_QUEUE, null, mag.getBytes());
}

Constant 包下 Constants 类

package rabbitmq.constant;

public class Constants {
    public static final String HOST = "123.57.16.61";
    public static final Integer PORT = 5672;
    public static final String USERNAME = "study";
    public static final String PASSWORD = "study";
    public static final String VIRTUAL_HOST = "bite";
    //声明一个工作队列
    public static final String WORK_QUEUE = "work.queue";
}

完整代码:

package rabbitmq.work;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import rabbitmq.constant.Constants;

import java.io.IOException;
import java.util.concurrent.TimeoutException;


/**
 * 工作队列模式
 */
public class Prooducer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //创建一个连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT);
        connectionFactory.setUsername(Constants.USERNAME);
        connectionFactory.setPassword(Constants.PASSWORD);
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
        //创建一个新的连接
        Connection connection = connectionFactory.newConnection();
        //开启一个通信
        Channel channel = connection.createChannel();
        //声明交换机,使用内置的交换机
        //声明一个队列
        channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);
        //发送消息
        for (int i = 0; i < 10; i++) {
            String mag = "hello work queue......" + i;
            channel.basicPublish("", Constants.WORK_QUEUE, null, mag.getBytes());
        }
        System.out.println("消息发送成功~");
        //资源释放
        channel.close();
        connection.close();
    }
}

在这里插入图片描述

3、编写消费者代码

消费者代码和简单模式⼀样, 只是复制两份. 两个消费者代码可以是⼀样的

消费者1:

package rabbitmq.work;

import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //创建一个连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT);
        connectionFactory.setUsername(Constants.USERNAME);
        connectionFactory.setPassword(Constants.PASSWORD);
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
        //创建一个新的连接
        Connection connection = connectionFactory.newConnection();
        //开启一个通信
        Channel channel = connection.createChannel();
        //声明一个队列,如果队列不存在,则创建,如果队列存在,则不创建
        channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);
        //消费消息
        //消费的逻辑
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接受到消息:" + new String(body));
            }
        };
        channel.basicConsume(Constants.WORK_QUEUE, true, consumer);
//        channel.close();
//        connection.close();

    }
}

在这里插入图片描述

消费者2:

package rabbitmq.work;

import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //创建一个连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT);
        connectionFactory.setUsername(Constants.USERNAME);
        connectionFactory.setPassword(Constants.PASSWORD);
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
        //创建一个新的连接
        Connection connection = connectionFactory.newConnection();
        //开启一个通信
        Channel channel = connection.createChannel();
        //声明一个队列,如果队列不存在,则创建,如果队列存在,则不创建
        channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);
        //消费消息
        //消费的逻辑
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接受到消息:" + new String(body));
            }
        };
        channel.basicConsume(Constants.WORK_QUEUE, true, consumer);
//        channel.close();
//        connection.close();
    }
}

4、运行程序, 观察结果

先启动两个消费者运行, 再启动生产者
如果先启动⽣产者, 在启动消费者, 由于消息较少, 处理较快, 那么第⼀个启动的消费者就会瞬间把10条
消息消费掉, 所以我们先启动两个消费者, 再启动生产者

在这里插入图片描述


在这里插入图片描述


观察RabbitMQ客户端:

在这里插入图片描述


在这里插入图片描述


在这里插入图片描述

启动生产者:

在这里插入图片描述


在这里插入图片描述


在这里插入图片描述

在这里插入图片描述

在这里插入图片描述


http://www.kler.cn/a/441819.html

相关文章:

  • 基础jjj
  • 【Linux系统编程】—— 从零开始实现一个简单的自定义Shell
  • HarmonyOS NEXT:华为分享-碰一碰开发分享
  • Dockerfile另一种使用普通用户启动的方式
  • k8s集群换IP
  • WGAN - 瓦萨斯坦生成对抗网络
  • leetcode--字符串
  • Git工具
  • 【人工智能-中级】卷积神经网络(CNN)的中阶应用:从图像分类到目标检测
  • Win7上安装node.js(v18.16.0),并创建vue3项目
  • [STM32]从零开始的cube IDE安装与配置教程
  • Android AOSP 源码中批量替换“phone“为“tablet“的命令详解
  • stable diffusion学习01
  • nodeJS——Mongoose基础操作
  • SM4笔记整理
  • 深度学习之 Deep Video Super-Resolution (VSRNet)
  • 黑神话无缘TGA2024年度游戏的原因
  • Redis经典面试题
  • [免费]SpringBoot+Vue校园社团管理系统(优质版)【论文+源码+SQL脚本】
  • centos7扩容ext4文件系统类型硬盘
  • element中input框添加@keyup.enter.native,按enter后刷新页面
  • MySQL数据库备份,恢复
  • EasyExcel 动态设置表格的背景颜色和排列
  • Ubuntu 上cutecom使用指南
  • 【Mysql优化】EXPLAIN 返回列详解:深入 SQL 查询优化的工具
  • 复习打卡MySQL篇03