当前位置: 首页 > article >正文

RabbitMQ(三)

RabbitMQ中的各模式及其用法

  • 工作队列模式
    • 一、生产者代码
      • 1、封装工具类
      • 2、编写代码
      • 3、发送消息效果
    • 二、消费者代码
      • 1、编写代码
      • 2、运行效果
  • 发布订阅模式
    • 一、生产者代码
    • 二、消费者代码
      • 1、消费者1号
      • 2、消费者2号
    • 三、运行效果
    • 四、小结
  • 路由模式
    • 一、生产者代码
    • 二、消费者代码
      • 1、消费者1号
      • 2、消费者2号
    • 三、运行结果
      • 1、绑定关系
      • 2、消费消息
  • 主题模式
    • 一、生产者代码
    • 二、消费者代码
    • 1、消费者1号
    • 2、消费者2号
    • 三、运行效果
  • 总结

工作队列模式

一、生产者代码

新建一个module,在module下创建属于自己的包,并且创建一个名为“work”的子包,以及工具类包“util”。结构如图所示:
在这里插入图片描述
在pom文件中添加图中所示依赖:

<dependencies>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.20.0</version>
        </dependency>
    </dependencies>

此时准备工作基本完成。

1、封装工具类

修改rabbitMQ地址,替换为自己的。

package com.xxx.rabbitmq.util;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * @ClassName: ConnectionUtil
 * @Package: com.xxx.rabbitmq.util
 * @Author: 
 * @CreateDate: 
 * @Version: V1.0.0
 * @Description:
 */

public class ConnectionUtil {
    public static final String HOST_ADDRESS = "192.168.xxx.xxx";

    public static Connection getConnection() throws Exception {

        // 定义连接工厂
        ConnectionFactory factory = new ConnectionFactory();

        // 设置服务地址
        factory.setHost(HOST_ADDRESS);

        // 端口
        factory.setPort(5672);

        //设置账号信息,用户名、密码、vhost
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("123456");

        // 通过工程获取连接
        Connection connection = factory.newConnection();

        return connection;
    }



    public static void main(String[] args) throws Exception {

        Connection con = ConnectionUtil.getConnection();

        // amqp://guest@192.168.xxx.xxx:5672/
        System.out.println(con);

        con.close();

    }

}

2、编写代码

新建生产者类Producer:

package com.xxx.rabbitmq.work;

import com.xxx.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

/**
 * @ClassName: Producer
 * @Package: com.xxx.rabbitmq.work
 * @Author: 
 * @CreateDate: 
 * @Version: V1.0.0
 * @Description:
 */

public class Producer {
    public static final String QUEUE_NAME = "work_queue";

    public static void main(String[] args) throws Exception {

        Connection connection = ConnectionUtil.getConnection();

        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME,true,false,false,null);

        for (int i = 1; i <= 10; i++) {

            String body = i+"hello rabbitmq~~~";

            channel.basicPublish("",QUEUE_NAME,null,body.getBytes());

        }

        channel.close();

        connection.close();

    }
}

3、发送消息效果

在这里插入图片描述

二、消费者代码

1、编写代码

创建Consumer1和Consumer2。Consumer2只是类名和打印提示不同,代码完全一样。
Consumer1:

package com.xxx.rabbitmq.work;

import com.xxx.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @ClassName: Consumer1
 * @Package: com.xxx.rabbitmq.work
 * @Author: 
 * @CreateDate: 
 * @Version: V1.0.0
 * @Description:
 */

public class Consumer1 {
    static final String QUEUE_NAME = "work_queue";

    public static void main(String[] args) throws Exception {

        Connection connection = ConnectionUtil.getConnection();

        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME,true,false,false,null);

        Consumer consumer = new DefaultConsumer(channel){

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                System.out.println("Consumer1 body:"+new String(body));

            }

        };

        channel.basicConsume(QUEUE_NAME,true,consumer);

    }
}

Consumer2:

package com.xxx.rabbitmq.work;

import com.xxx.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @ClassName: Consumer2
 * @Package: com.xxx.rabbitmq.work
 * @Author: 
 * @CreateDate: 
 * @Version: V1.0.0
 * @Description:
 */

public class Consumer2 {
    static final String QUEUE_NAME = "work_queue";

    public static void main(String[] args) throws Exception {

        Connection connection = ConnectionUtil.getConnection();

        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME,true,false,false,null);

        Consumer consumer = new DefaultConsumer(channel){

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                System.out.println("Consumer2 body:"+new String(body));

            }

        };

        channel.basicConsume(QUEUE_NAME,true,consumer);

    }
}

** 注意:**
运行的时候先启动两个消费端程序,然后再启动生产者端程序。
如果已经运行过生产者程序,则手动把work_queue队列删掉。

2、运行效果

最终两个消费端程序竞争结果如下:
在这里插入图片描述
在这里插入图片描述
这样就完成了工作队列模式的演示。

发布订阅模式

一、生产者代码

还是在上面的module内,新建一个名为fanout的子包,在包内创建Producer类:

package com.xxx.rabbitmq.fanout;

import com.xxx.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

/**
 * @ClassName: Producer
 * @Package: com.xxx.rabbitmq.fanout
 * @Author: 
 * @CreateDate: 
 * @Version: V1.0.0
 * @Description:
 */

public class Producer {
    public static void main(String[] args) throws Exception {

        // 1、获取连接
        Connection connection = ConnectionUtil.getConnection();

        // 2、创建频道
        Channel channel = connection.createChannel();

        // 参数1. exchange:交换机名称
        // 参数2. type:交换机类型
        //     DIRECT("direct"):定向
        //     FANOUT("fanout"):扇形(广播),发送消息到每一个与之绑定队列。
        //     TOPIC("topic"):通配符的方式
        //     HEADERS("headers"):参数匹配
        // 参数3. durable:是否持久化
        // 参数4. autoDelete:自动删除
        // 参数5. internal:内部使用。一般false
        // 参数6. arguments:其它参数
        String exchangeName = "test_fanout";

        // 3、创建交换机
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);

        // 4、创建队列
        String queue1Name = "test_fanout_queue1";
        String queue2Name = "test_fanout_queue2";

        channel.queueDeclare(queue1Name,true,false,false,null);
        channel.queueDeclare(queue2Name,true,false,false,null);

        // 5、绑定队列和交换机
        // 参数1. queue:队列名称
        // 参数2. exchange:交换机名称
        // 参数3. routingKey:路由键,绑定规则
        //     如果交换机的类型为fanout,routingKey设置为""
        channel.queueBind(queue1Name,exchangeName,"");
        channel.queueBind(queue2Name,exchangeName,"");

        String body = "日志信息:张三调用了findAll方法...日志级别:info...";

        // 6、发送消息
        channel.basicPublish(exchangeName,"",null,body.getBytes());

        // 7、释放资源
        channel.close();
        connection.close();

    }
}

二、消费者代码

1、消费者1号

package com.xxx.rabbitmq.fanout;

import com.xxx.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @ClassName: Consumer1
 * @Package: com.xxx.rabbitmq.fanout
 * @Author: 
 * @CreateDate: 
 * @Version: V1.0.0
 * @Description:
 */

public class Consumer1 {
    public static void main(String[] args) throws Exception {

        Connection connection = ConnectionUtil.getConnection();

        Channel channel = connection.createChannel();

        String queue1Name = "test_fanout_queue1";

        channel.queueDeclare(queue1Name,true,false,false,null);

        Consumer consumer = new DefaultConsumer(channel){

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                System.out.println("body:"+new String(body));
                System.out.println("队列 1 消费者 1 将日志信息打印到控制台.....");

            }

        };

        channel.basicConsume(queue1Name,true,consumer);

    }
}

2、消费者2号

package com.xxx.rabbitmq.fanout;

import com.xxx.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @ClassName: Consumer2
 * @Package: com.xxx.rabbitmq.fanout
 * @Author: 
 * @CreateDate: 
 * @Version: V1.0.0
 * @Description:
 */

public class Consumer2 {
    public static void main(String[] args) throws Exception {

        Connection connection = ConnectionUtil.getConnection();

        Channel channel = connection.createChannel();

        String queue2Name = "test_fanout_queue2";

        channel.queueDeclare(queue2Name,true,false,false,null);

        Consumer consumer = new DefaultConsumer(channel){

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                System.out.println("body:"+new String(body));
                System.out.println("队列 2 消费者 2 将日志信息打印到控制台.....");

            }

        };

        channel.basicConsume(queue2Name,true,consumer);

    }
}

三、运行效果

先启动消费者,然后再运行生产者程序发送消息:
在这里插入图片描述
在这里插入图片描述

四、小结

交换机和队列的绑定关系如下图所示:
在这里插入图片描述
交换机需要与队列进行绑定,绑定之后;一个消息可以被多个消费者都收到。
发布订阅模式与工作队列模式的区别:

  • 工作队列模式本质上是绑定默认交换机
  • 发布订阅模式绑定指定交换机
  • 监听同一个队列的消费端程序彼此之间是竞争关系
  • 绑定同一个交换机的多个队列在发布订阅模式下,消息是广播的,每个队列都能接收到消息

路由模式

一、生产者代码

新建子包routing,并新建Producer类:

package com.xxx.rabbitmq.routing;

import com.xxx.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

/**
 * @ClassName: Producer
 * @Package: com.xxx.rabbitmq.routing
 * @Author: 
 * @CreateDate: 
 * @Version: V1.0.0
 * @Description:
 */

public class Producer {
    public static void main(String[] args) throws Exception {

        Connection connection = ConnectionUtil.getConnection();

        Channel channel = connection.createChannel();

        String exchangeName = "test_direct";

        // 创建交换机
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,true,false,false,null);

        // 创建队列
        String queue1Name = "test_direct_queue1";
        String queue2Name = "test_direct_queue2";

        // 声明(创建)队列
        channel.queueDeclare(queue1Name,true,false,false,null);
        channel.queueDeclare(queue2Name,true,false,false,null);

        // 队列绑定交换机
        // 队列1绑定error
        channel.queueBind(queue1Name,exchangeName,"error");

        // 队列2绑定info error warning
        channel.queueBind(queue2Name,exchangeName,"info");
        channel.queueBind(queue2Name,exchangeName,"error");
        channel.queueBind(queue2Name,exchangeName,"warning");

        String message = "日志信息:张三调用了delete方法.错误了,日志级别error";

        // 发送消息
        channel.basicPublish(exchangeName,"error",null,message.getBytes());
        System.out.println(message);

        // 释放资源
        channel.close();
        connection.close();

    }
}

二、消费者代码

1、消费者1号

package com.xxx.rabbitmq.routing;

import com.xxx.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @ClassName: Consumer1
 * @Package: com.xxx.rabbitmq.routing
 * @Author: 
 * @CreateDate: 
 * @Version: V1.0.0
 * @Description:
 */

public class Consumer1 {
    public static void main(String[] args) throws Exception {

        Connection connection = ConnectionUtil.getConnection();

        Channel channel = connection.createChannel();

        String queue1Name = "test_direct_queue1";

        channel.queueDeclare(queue1Name,true,false,false,null);

        Consumer consumer = new DefaultConsumer(channel){

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                System.out.println("body:"+new String(body));
                System.out.println("Consumer1 将日志信息打印到控制台.....");

            }

        };

        channel.basicConsume(queue1Name,true,consumer);

    }
}

2、消费者2号

package com.xxx.rabbitmq.routing;

import com.xxx.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @ClassName: Consumer2
 * @Package: com.xxx.rabbitmq.routing
 * @Author: 
 * @CreateDate: 
 * @Version: V1.0.0
 * @Description:
 */

public class Consumer2 {
    public static void main(String[] args) throws Exception {

        Connection connection = ConnectionUtil.getConnection();

        Channel channel = connection.createChannel();

        String queue2Name = "test_direct_queue2";

        channel.queueDeclare(queue2Name,true,false,false,null);

        Consumer consumer = new DefaultConsumer(channel){

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                System.out.println("body:"+new String(body));
                System.out.println("Consumer2 将日志信息存储到数据库.....");

            }

        };

        channel.basicConsume(queue2Name,true,consumer);

    }

}

三、运行结果

1、绑定关系

在这里插入图片描述

2、消费消息

在这里插入图片描述

主题模式

一、生产者代码

新建子包topic,新建生产者类Producer:

package com.xxx.rabbitmq.topic;

import com.xxx.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

/**
 * @ClassName: Producer
 * @Package: com.xxx.rabbitmq.topic
 * @Author: 
 * @CreateDate: 
 * @Version: V1.0.0
 * @Description:
 */

public class Producer {
    public static void main(String[] args) throws Exception {

        Connection connection = ConnectionUtil.getConnection();

        Channel channel = connection.createChannel();

        String exchangeName = "test_topic";

        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,false,false,null);

        String queue1Name = "test_topic_queue1";
        String queue2Name = "test_topic_queue2";

        channel.queueDeclare(queue1Name,true,false,false,null);
        channel.queueDeclare(queue2Name,true,false,false,null);

        // 绑定队列和交换机
        // 参数1. queue:队列名称
        // 参数2. exchange:交换机名称
        // 参数3. routingKey:路由键,绑定规则
        //      如果交换机的类型为fanout ,routingKey设置为""
        // routing key 常用格式:系统的名称.日志的级别。
        // 需求: 所有error级别的日志存入数据库,所有order系统的日志存入数据库
        channel.queueBind(queue1Name,exchangeName,"#.error");
        channel.queueBind(queue1Name,exchangeName,"order.*");
        channel.queueBind(queue2Name,exchangeName,"*.*");

        // 分别发送消息到队列:order.info、goods.info、goods.error
        String body = "[所在系统:order][日志级别:info][日志内容:订单生成,保存成功]";
        channel.basicPublish(exchangeName,"order.info",null,body.getBytes());

        body = "[所在系统:goods][日志级别:info][日志内容:商品发布成功]";
        channel.basicPublish(exchangeName,"goods.info",null,body.getBytes());

        body = "[所在系统:goods][日志级别:error][日志内容:商品发布失败]";
        channel.basicPublish(exchangeName,"goods.error",null,body.getBytes());

        channel.close();
        connection.close();

    }
}

二、消费者代码

1、消费者1号

消费者1监听队列1:

package com.xxx.rabbitmq.topic;

import com.xxx.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @ClassName: Consumer1
 * @Package: com.xxx.rabbitmq.topic
 * @Author: 
 * @CreateDate: 
 * @Version: V1.0.0
 * @Description:
 */

public class Consumer1 {
    public static void main(String[] args) throws Exception {

        Connection connection = ConnectionUtil.getConnection();

        Channel channel = connection.createChannel();

        String QUEUE_NAME = "test_topic_queue1";

        channel.queueDeclare(QUEUE_NAME,true,false,false,null);

        Consumer consumer = new DefaultConsumer(channel){

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                System.out.println("body:"+new String(body));

            }

        };

        channel.basicConsume(QUEUE_NAME,true,consumer);

    }
}

2、消费者2号

消费者2监听队列2:

package com.xxx.rabbitmq.topic;

import com.xxx.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @ClassName: Consumer2
 * @Package: com.xxx.rabbitmq.topic
 * @Author: 
 * @CreateDate: 
 * @Version: V1.0.0
 * @Description:
 */

public class Consumer2 {
    public static void main(String[] args) throws Exception {

        Connection connection = ConnectionUtil.getConnection();

        Channel channel = connection.createChannel();

        String QUEUE_NAME = "test_topic_queue2";

        channel.queueDeclare(QUEUE_NAME,true,false,false,null);

        Consumer consumer = new DefaultConsumer(channel){

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                System.out.println("body:"+new String(body));

            }

        };

        channel.basicConsume(QUEUE_NAME,true,consumer);

    }
}

三、运行效果

队列1:
在这里插入图片描述
队列2:
在这里插入图片描述
至此,就完成了RabbitMQ各模式的使用演示。

总结

在选择使用什么模式时,需要对应业务需求,结合需求选择合适的模式。


http://www.kler.cn/a/505882.html

相关文章:

  • SDK调用文心一言如何接入,文心一言API接入教程
  • 2025 年前端开发学习路线图完整指南
  • Docker
  • maven常见知识点
  • 上传自己的镜像到docker hub详细教程
  • 为深度学习创建PyTorch张量 - 最佳选项
  • Agile Scrum 敏捷开发方法
  • 基于Verilog的简易音乐节奏游戏设计
  • 【芯片封测学习专栏 -- 2D | 2.5D | 3D 封装的区别和联系】
  • ElasticSearch的劈山斧-自定义评分
  • 一步到位Python Django部署,浅谈Python Django框架
  • 性能测试 - Locust WebSocket client
  • node mysql和mysql2有什么区别
  • 潜力巨大但道路曲折的量子计算:探索未来科技的无限可能
  • 系统学习算法:专题四 前缀和
  • Vue.js组件开发-如何自定义Element UI组件
  • 人民邮电出版社书籍信息爬虫
  • C/C++中,const、static关键字有什么作用,如何定义、初始化,什么情形下需要用到这两关键字?
  • util层注入service
  • RabbitMQ-交换机
  • Flink CDC 在阿里云实时计算Flink版的云上实践
  • [Qt]常用控件介绍-多元素控件-QListWidget、QTableWidget、QQTreeWidget
  • 再见IT!
  • [每周一更]-(第131期):Go并发协程总结篇
  • 如何在JS里进行深拷贝
  • K8S 节点选择器