当前位置: 首页 > article >正文

RibbitMQ-原理使用

本文主要介绍RibbitMQ的基本架构,以及交换机Exchange在Direct、Fanout、Topic、Headers模式下的生产者、消费者应该如何发送、接收消息。

基本架构

RibbitMQ是一款基于AMQP(高级消息队列协议)用于软件之间通信的中间件,由Rabbit公司开发,服务器端用Erlang语言编写,支持多种语言的客户端。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
Rabbitmq四大核心:生产者、消费者、队列、交换机。
AMQP协议是一种二进制协议,它定义了一组规则和标准,以确保消息可以在不同的应用程序和平台之间传递和解释,AMQP协议包含四个核心组件:消息、交换机、队列、绑定

image-20241102121015813

Virtual host: Virtual host是一个虚拟主机的概念,一个RibbitMQ服务端中可以有多个Virtual host,每个Virtual host都有一套自己的Exchange和Queue,同一个Virtual host中的Exchange和Queue不能重名,不同的Virtua host中的Exchange和Queue名字可以一样。这样,不同的用户在访问同一个RabbitMQ服务端时,可以创建自己单独的Virtual host,然后在自己的Virtual host中创建Exchange和Queue,很好地做到了不同用户之间消息的相互隔性。

Connection:producer/consumer和RibbitMQ服务端之间的TCP连接。

Channel:发送消息的通道,如果每一次访问RabbitMQ都建立一个Connection,在消息量大的时候建立TCP Connection 的开销将是巨大的,效率也较低。Channel 是在connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的channel进行通讯。

Exchange:message到达RabbitMQ的第一站,根据分发规则,匹配査询表中的routing key,分发消息到queue中去。常用的类型有:direct、 topic、 fanout、 headers。

Queue:Queue是一个用来存放消息的队列,生产者发送的消息会被放到Queue中,消费者消费消息时也是从Queue中取走消息。

Message:Message是储存消息的最小单位。

基本使用

maven依赖

<dependency>
   <groupId>com.rabbitmq</groupId>
   <artifactId>amqp-client</artifactId>
   <version>5.22.0</version>
</dependency>

producer

/**
     * 发送消息
     */
    public static void sendMessage() throws IOException, TimeoutException {
        String queueName = "xiaoyuan-queue";
        String exchangeName = "xiaoyuan-exchange";
        // 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // rabbitmq 服务地址
        connectionFactory.setHost("8.140.224.210");
        // rabbitmq 服务端口
        connectionFactory.setPort(5672);
        // rabbitmq 账号
        connectionFactory.setUsername("xiaoyuan");
        // rabbitmq 密码
        connectionFactory.setPassword("xiaoyuan");
        // 创建连接
        Connection connection = connectionFactory.newConnection();
        // 创建信息通道
        Channel channel = connection.createChannel();
        try {
            /**
             * 声明交换机
             * arg1:交换机名称
             * arg2:交换机类型 direct、fanout、topic、headers
             * arg3:交换机是否要持久化 如果true,则交换机元数据持久化
             * arg4: 交换机没有队列绑定时,是否删除;false不删除
             * arg5: Map<String,Object>类型 设定交换机其他一些结构化参数,可为null
             */
            channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true, false, null);
            /**
             * 声明队列
             * arg1:队列名称
             * arg2:队列是否持久化;仅仅是队列信息的持久化,而不是队列中消息的持久化
             * arg3:队列是否私有化,若是私有的,只有创建它的应用程序才能消费
             * arg4:队列在没有消费者订阅的情况下,是否自动删除
             * arg5: 队列的一些结构化信息,如声明死信队列,磁盘队列
             */
            channel.queueDeclare(queueName, true, false, false, null);
            /**
             * 将队列与交换机绑定
             * arg1: 队列名称
             * arg2:交换机名称
             * arg3: 路由键,直连模式下为队列名称
             */
            channel.queueBind(queueName, exchangeName, queueName);

            // 创建消息
            String message = "hello world";

            /**
             * 发送消息
             * arg1:交换机名称
             * arg2:路由键,直连模式下为队列名称
             * arg3:其他参数信息
             * arg4:消息
             */
            channel.basicPublish(exchangeName, queueName, null, message.getBytes());
        } finally {
            // 释放管道资源
            channel.close();
            // 释放连接资源
            connection.close();
        }
    }

consumer

 /**
     * 发送消息
     */
    public static void consumerMessage() throws IOException, TimeoutException {
        String queueName = "xiaoyuan-queue";
        String exchangeName = "xiaoyuan-exchange";
        // 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // rabbitmq 服务地址
        connectionFactory.setHost("8.140.224.210");
        // rabbitmq 服务端口
        connectionFactory.setPort(5672);
        // rabbitmq 账号
        connectionFactory.setUsername("xiaoyuan");
        // rabbitmq 密码
        connectionFactory.setPassword("xiaoyuan");
        // 创建连接
        Connection connection = connectionFactory.newConnection();
        // 创建信息通道
        Channel channel = connection.createChannel();
        try {
            // 接收消息的回调函数
            DeliverCallback deliverCallback = (consumerTage,messgae) -> {
                System.out.println("接收到的消息: " + new String(messgae.getBody()));
            };
            // 取消消息的回调函数
            CancelCallback cancelCallback = consumerTage -> {
                System.out.println("接收消息失败");
            };
            /**
             * 消费消息
             * arg1:消费队列名称
             * arg2:消费成功后是否自动提交  ack机制; false-不进行提交;true-提交,消息从队列中移除  
             * arg3: 接收消息的回调函数
             * arg4;取消消息的回调函数
             */
            channel.basicConsume(queueName,false,deliverCallback,cancelCallback);
        } finally {
            // 释放管道资源
            channel.close();
            // 释放连接资源
            connection.close();
        }
    }

交换机

Direct

原理

路由键与队列名完全匹配交换机,此种类型交换机,通过RoutingKey路由键将交换机和队列进行绑定,消息被发送到exchange时,需要根据消息的RoutingKey,来进行匹配,只将消息发送到完全匹配到此RoutingKey的队列。
例如:如果一个队列绑定到交换机要求路由键为“key”,则只转发RoutingKey标记为“key”的消息,不会转发”key1"。它是完全匹配、单播的模式

image-20241102103923594

使用

producer

/**
     * 发送消息 指定交换机 direct
     */
    public static void sendMessageByExchangeTypeDirect() throws IOException, TimeoutException {
        String queueName_1 = "xiaoyuan-queue-1";
        String queueName_2 = "xiaoyuan-queue-2";
        String queueName_3 = "xiaoyuan-queue-3";
        String queueName_4 = "xiaoyuan-queue-4";

        // 路由键
        String key_1 = "key-1";
        String key_3 = "key-3";
        String key_4 = "key-4";

        String exchangeName = "xiaoyuan-exchange";
        // 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // rabbitmq 服务地址
        connectionFactory.setHost("8.140.224.210");
        // rabbitmq 服务端口
        connectionFactory.setPort(5672);
        // rabbitmq 账号
        connectionFactory.setUsername("xiaoyuan");
        // rabbitmq 密码
        connectionFactory.setPassword("xiaoyuan");
        // 创建连接
        Connection connection = connectionFactory.newConnection();
        // 创建信息通道
        Channel channel = connection.createChannel();
        try {
            /**
             * 声明交换机
             * arg1:交换机名称
             * arg2:交换机类型 direct、fanout、topic、headers
             * arg3:交换机是否要持久化 如果true,则交换机元数据持久化
             * arg4: 交换机没有队列绑定时,是否删除;false不删除
             * arg5: Map<String,Object>类型 设定交换机其他一些结构化参数,可为null
             */
            channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true, false, null);
            /**
             * 声明队列
             * arg1:队列名称
             * arg2:队列是否持久化;仅仅是队列信息的持久化,而不是队列中消息的持久化
             * arg3:队列是否私有化,若是私有的,只有创建它的应用程序才能消费
             * arg4:队列在没有消费者订阅的情况下,是否自动删除
             * arg5: 队列的一些结构化信息,如声明死信队列,磁盘队列
             */
            channel.queueDeclare(queueName_1, true, false, false, null);
            channel.queueDeclare(queueName_2, true, false, false, null);
            channel.queueDeclare(queueName_3, true, false, false, null);
            channel.queueDeclare(queueName_4, true, false, false, null);
            /**
             * 将队列与交换机绑定
             * arg1: 队列名称
             * arg2:交换机名称
             * arg3: 路由键,直连模式下为队列名称
             */
            channel.queueBind(queueName_1, exchangeName, key_1);
            channel.queueBind(queueName_2, exchangeName, key_1);
            channel.queueBind(queueName_3, exchangeName, key_3);
            channel.queueBind(queueName_4, exchangeName, key_4);
            // 创建消息
            String message = "hello world";

            /**
             * 发送消息
             * arg1:交换机名称
             * arg2:路由键,直连模式下为队列名称
             * arg3:其他参数信息
             * arg4:消息
             */
            channel.basicPublish(exchangeName, key_1, null, ("key_1:" + message).getBytes());
            channel.basicPublish(exchangeName, key_1, null, ("key_1:" + message).getBytes());
            channel.basicPublish(exchangeName, key_3, null, ("key_3:" + message).getBytes());
            channel.basicPublish(exchangeName, key_4, null, ("key_4:" + message).getBytes());
        } finally {
            // 释放管道资源
            channel.close();
            // 释放连接资源
            connection.close();
        }
    }

image-20241102102920069

consumer

 /**
     * 发送消息
     */
    public static void consumerMessageByExchangeTypeDirect() throws IOException, TimeoutException {
        String queueName = "xiaoyuan-queue";
        String exchangeName = "xiaoyuan-exchange";
        // 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // rabbitmq 服务地址
        connectionFactory.setHost("8.140.224.210");
        // rabbitmq 服务端口
        connectionFactory.setPort(5672);
        // rabbitmq 账号
        connectionFactory.setUsername("xiaoyuan");
        // rabbitmq 密码
        connectionFactory.setPassword("xiaoyuan");
        // 创建连接
        Connection connection = connectionFactory.newConnection();
        // 创建信息通道
        Channel channel = connection.createChannel();
        try {
            // 接收消息的回调函数
            DeliverCallback deliverCallback = (consumerTage,messgae) -> {
                System.out.println("接收到的消息: " + new String(messgae.getBody()));
            };
            // 取消消息的回调函数
            CancelCallback cancelCallback = consumerTage -> {
                System.out.println("接收消息失败");
            };
            /**
             * 消费消息
             * arg1:消费队列名称
             * arg2:消费成功后是否自动提交  ack机制; false-不进行提交;true-提交,消息从队列中移除
             * arg3: 接收消息的回调函数
             * arg4;取消消息的回调函数
             */
            channel.basicConsume("xiaoyuan-queue-1",true,deliverCallback,cancelCallback);
            channel.basicConsume("xiaoyuan-queue-2",true,deliverCallback,cancelCallback);
            channel.basicConsume("xiaoyuan-queue-3",true,deliverCallback,cancelCallback);
            channel.basicConsume("xiaoyuan-queue-4",true,deliverCallback,cancelCallback);

        } finally {
            // 释放管道资源
            channel.close();
            // 释放连接资源
            connection.close();
        }
    }

Fanout

原理

Fanout,扇出类型交换机,此种交换机,会将消息分发给所有绑定了此交换机的队列,此时RoutingKey参数无效。

fanout类型交换机下发送消息一条,无论RoutingKey是什么,queue1,queue2,queue3,queue4都可以收到消息

image-20241102110225268

使用

producer

/**
     * 发送消息 指定交换机 fanout
     */
    public static void sendMessageByExchangeTypeFanout() throws IOException, TimeoutException {
        String queueName_1 = "xiaoyuan-queue-fanout-1";
        String queueName_2 = "xiaoyuan-queue-fanout-2";
        String queueName_3 = "xiaoyuan-queue-fanout-3";
        String queueName_4 = "xiaoyuan-queue-fanout-4";

        String key_1 = "key-1";
        String key_2 = "key-2";
        String key_3 = "key-3";
        String key_4 = "key-4";

        String exchangeName = "xiaoyuan-exchange-fanout";
        // 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // rabbitmq 服务地址
        connectionFactory.setHost("8.140.224.210");
        // rabbitmq 服务端口
        connectionFactory.setPort(5672);
        // rabbitmq 账号
        connectionFactory.setUsername("xiaoyuan");
        // rabbitmq 密码
        connectionFactory.setPassword("xiaoyuan");
        // 创建连接
        Connection connection = connectionFactory.newConnection();
        // 创建信息通道
        Channel channel = connection.createChannel();
        try {
            /**
             * 声明交换机
             * arg1:交换机名称
             * arg2:交换机类型 direct、fanout、topic、headers
             * arg3:交换机是否要持久化 如果true,则交换机元数据持久化
             * arg4: 交换机没有队列绑定时,是否删除;false不删除
             * arg5: Map<String,Object>类型 设定交换机其他一些结构化参数,可为null
             */
            channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT, true, false, null);
            /**
             * 声明队列
             * arg1:队列名称
             * arg2:队列是否持久化;仅仅是队列信息的持久化,而不是队列中消息的持久化
             * arg3:队列是否私有化,若是私有的,只有创建它的应用程序才能消费
             * arg4:队列在没有消费者订阅的情况下,是否自动删除
             * arg5: 队列的一些结构化信息,如声明死信队列,磁盘队列
             */
            channel.queueDeclare(queueName_1, true, false, false, null);
            channel.queueDeclare(queueName_2, true, false, false, null);
            channel.queueDeclare(queueName_3, true, false, false, null);
            channel.queueDeclare(queueName_4, true, false, false, null);

            /**
             * 将队列与交换机绑定
             * arg1: 队列名称
             * arg2:交换机名称
             * arg3: 路由键,直连模式下为队列名称
             */
            channel.queueBind(queueName_1, exchangeName, key_1);
            channel.queueBind(queueName_2, exchangeName, key_2);
            channel.queueBind(queueName_3, exchangeName, key_3);
            channel.queueBind(queueName_4, exchangeName, key_4);


            // 创建消息
            String message = "hello world";

            /**
             * 发送消息
             * arg1:交换机名称
             * arg2:路由键,直连模式下为队列名称
             * arg3:其他参数信息
             * arg4:消息
             */
            channel.basicPublish(exchangeName, key_1, null, ("fanout key_1:" + message).getBytes());
        } finally {
            // 释放管道资源
            channel.close();
            // 释放连接资源
            connection.close();
        }
    }

image-20241102105709409

consumer

/**
     * 发送消息
     */
    public static void consumerMessageByExchangeTypeFanout() throws IOException, TimeoutException {
        String queueName_1 = "xiaoyuan-queue-fanout-1";
        String queueName_2 = "xiaoyuan-queue-fanout-2";
        String queueName_3 = "xiaoyuan-queue-fanout-3";
        String queueName_4 = "xiaoyuan-queue-fanout-4";
        String exchangeName = "xiaoyuan-exchange-fanout";
        // 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // rabbitmq 服务地址
        connectionFactory.setHost("8.140.224.210");
        // rabbitmq 服务端口
        connectionFactory.setPort(5672);
        // rabbitmq 账号
        connectionFactory.setUsername("xiaoyuan");
        // rabbitmq 密码
        connectionFactory.setPassword("xiaoyuan");
        // 创建连接
        Connection connection = connectionFactory.newConnection();
        // 创建信息通道
        Channel channel = connection.createChannel();
        try {
            // 接收消息的回调函数
            DeliverCallback deliverCallback = (consumerTage,messgae) -> {
                System.out.println("接收到的消息: " + new String(messgae.getBody()));
            };
            // 取消消息的回调函数
            CancelCallback cancelCallback = consumerTage -> {
                System.out.println("接收消息失败");
            };
            /**
             * 消费消息
             * arg1:消费队列名称
             * arg2:消费成功后是否自动提交  ack机制; false-不进行提交;true-提交,消息从队列中移除
             * arg3: 接收消息的回调函数
             * arg4;取消消息的回调函数
             */
            channel.basicConsume(queueName_1,true,deliverCallback,cancelCallback);
            channel.basicConsume(queueName_2,true,deliverCallback,cancelCallback);
            channel.basicConsume(queueName_3,true,deliverCallback,cancelCallback);
            channel.basicConsume(queueName_4,true,deliverCallback,cancelCallback);

        } finally {
            // 释放管道资源
            channel.close();
            // 释放连接资源
            connection.close();
        }
    }

Topic

原理

Topic主题类型交换机,此种交换机与Direct类似,也是需要通过routingkey路由键进行匹配分发,区别在于Topic可以进行模糊匹配,Direct是完全匹配。

  • “.”:来分为多个部分(类似于逗号分隔)
  • “*”:代表一个部分
  • “#”:代表0个或多个部分(如果绑定的路由键为"#"时,则接受所有消息,因为路由键所有都匹配)

image-20241102111235531

当发送一条信息,routingkey为”key1.key2.key3.key4",那么根据"."将这个路由键分为了4个部分,此条路由键将会匹配:

  1. key1.key2.key3.* :模糊补充上一个部分,key4;
  2. key1.# :模糊补充上多个部分,key2,key3,key4;
  3. .key2..key4 :模糊补充上一部分,key1和key3;
  4. #.key3.key4 :模糊补充上多部分,key1,key2;

使用

producer

/**
     * 发送消息 指定交换机 topic
     */
    public static void sendMessageByExchangeTypeTopic() throws IOException, TimeoutException {
        String queueName_1 = "xiaoyuan-queue-topic-1";
        String queueName_2 = "xiaoyuan-queue-topic-2";
        String queueName_3 = "xiaoyuan-queue-topic-3";
        String queueName_4 = "xiaoyuan-queue-topic-4";

        String key_1 = "key1.key2.key3.*";
        String key_2 = "key1.#";
        String key_3 = "*.key2.*.key4";
        String key_4 = "#.key3.key4";

        String exchangeName = "xiaoyuan-exchange-topic";
        // 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // rabbitmq 服务地址
        connectionFactory.setHost("8.140.224.210");
        // rabbitmq 服务端口
        connectionFactory.setPort(5672);
        // rabbitmq 账号
        connectionFactory.setUsername("xiaoyuan");
        // rabbitmq 密码
        connectionFactory.setPassword("xiaoyuan");
        // 创建连接
        Connection connection = connectionFactory.newConnection();
        // 创建信息通道
        Channel channel = connection.createChannel();
        try {
            /**
             * 声明交换机
             * arg1:交换机名称
             * arg2:交换机类型 direct、fanout、topic、headers
             * arg3:交换机是否要持久化 如果true,则交换机元数据持久化
             * arg4: 交换机没有队列绑定时,是否删除;false不删除
             * arg5: Map<String,Object>类型 设定交换机其他一些结构化参数,可为null
             */
            channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC, true, false, null);
            /**
             * 声明队列
             * arg1:队列名称
             * arg2:队列是否持久化;仅仅是队列信息的持久化,而不是队列中消息的持久化
             * arg3:队列是否私有化,若是私有的,只有创建它的应用程序才能消费
             * arg4:队列在没有消费者订阅的情况下,是否自动删除
             * arg5: 队列的一些结构化信息,如声明死信队列,磁盘队列
             */
            channel.queueDeclare(queueName_1, true, false, false, null);
            channel.queueDeclare(queueName_2, true, false, false, null);
            channel.queueDeclare(queueName_3, true, false, false, null);
            channel.queueDeclare(queueName_4, true, false, false, null);

            /**
             * 将队列与交换机绑定
             * arg1: 队列名称
             * arg2:交换机名称
             * arg3: 路由键,直连模式下为队列名称
             */
            channel.queueBind(queueName_1, exchangeName, key_1);
            channel.queueBind(queueName_2, exchangeName, key_2);
            channel.queueBind(queueName_3, exchangeName, key_3);
            channel.queueBind(queueName_4, exchangeName, key_4);


            // 创建消息
            String message = "hello world";

            /**
             * 发送消息
             * arg1:交换机名称
             * arg2:路由键,直连模式下为队列名称
             * arg3:其他参数信息
             * arg4:消息
             */
            channel.basicPublish(exchangeName, "key1.key2.key3.key4", null, ("topic key_1:" + message).getBytes());
        } finally {
            // 释放管道资源
            channel.close();
            // 释放连接资源
            connection.close();
        }
    }

image-20241102112335079

consumer

 /**
     * 发送消息
     */
    public static void consumerMessageByExchangeTypeTopic() throws IOException, TimeoutException {
        String queueName_1 = "xiaoyuan-queue-topic-1";
        String queueName_2 = "xiaoyuan-queue-topic-2";
        String queueName_3 = "xiaoyuan-queue-topic-3";
        String queueName_4 = "xiaoyuan-queue-topic-4";
        String exchangeName = "xiaoyuan-exchange-topic";
        // 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // rabbitmq 服务地址
        connectionFactory.setHost("8.140.224.210");
        // rabbitmq 服务端口
        connectionFactory.setPort(5672);
        // rabbitmq 账号
        connectionFactory.setUsername("xiaoyuan");
        // rabbitmq 密码
        connectionFactory.setPassword("xiaoyuan");
        // 创建连接
        Connection connection = connectionFactory.newConnection();
        // 创建信息通道
        Channel channel = connection.createChannel();
        try {
            // 接收消息的回调函数
            DeliverCallback deliverCallback = (consumerTage,messgae) -> {
                System.out.println("接收到的消息: " + new String(messgae.getBody()));
            };
            // 取消消息的回调函数
            CancelCallback cancelCallback = consumerTage -> {
                System.out.println("接收消息失败");
            };
            /**
             * 消费消息
             * arg1:消费队列名称
             * arg2:消费成功后是否自动提交  ack机制; false-不进行提交;true-提交,消息从队列中移除
             * arg3: 接收消息的回调函数
             * arg4;取消消息的回调函数
             */
            channel.basicConsume(queueName_1,true,deliverCallback,cancelCallback);
            channel.basicConsume(queueName_2,true,deliverCallback,cancelCallback);
            channel.basicConsume(queueName_3,true,deliverCallback,cancelCallback);
            channel.basicConsume(queueName_4,true,deliverCallback,cancelCallback);

        } finally {
            // 释放管道资源
            channel.close();
            // 释放连接资源
            connection.close();
        }
    }

Headers

原理

headers 匹配 AMQP 消息的 header 而不是路由键,此外 headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了
消费方指定的headers中必须包含一个“x-match"的键。

键"x-match"的值有2个

  1. x-match= all :表示所有的键值对都匹配才能接受到消息
  2. x-match =any:表示只要有键值对匹配就能接受到消息

注:在消费者测进行交换机与队列的绑定,先启动生产者,创建交换机&队列,再启动消费者绑定

image-20241102115412685

当发送消息的其它参数为{“name”:“小明123”,“age”:10}时,queue-1绑定的是匹配方式是all,全匹配,无法发送到queue-1中;但是queue-2绑定的是非全匹配,则可以发送到queue-2中。

使用

producer

 /**
     * 发送消息 指定交换机 headers
     */
    public static void sendMessageByExchangeTypeHeaders() throws IOException, TimeoutException {
        String queueName_1 = "xiaoyuan-queue-headers-1";
        String exchangeName = "xiaoyuan-exchange-headers";
        // 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // rabbitmq 服务地址
        connectionFactory.setHost("8.140.224.210");
        // rabbitmq 服务端口
        connectionFactory.setPort(5672);
        // rabbitmq 账号
        connectionFactory.setUsername("xiaoyuan");
        // rabbitmq 密码
        connectionFactory.setPassword("xiaoyuan");
        // 创建连接
        Connection connection = connectionFactory.newConnection();
        // 创建信息通道
        Channel channel = connection.createChannel();
        try {
            /**
             * 声明交换机
             * arg1:交换机名称
             * arg2:交换机类型 direct、fanout、topic、headers
             * arg3:交换机是否要持久化 如果true,则交换机元数据持久化
             * arg4: 交换机没有队列绑定时,是否删除;false不删除
             * arg5: Map<String,Object>类型 设定交换机其他一些结构化参数,可为null
             */
            channel.exchangeDeclare(exchangeName, BuiltinExchangeType.HEADERS, true, false, null);
            /**
             * 声明队列
             * arg1:队列名称
             * arg2:队列是否持久化;仅仅是队列信息的持久化,而不是队列中消息的持久化
             * arg3:队列是否私有化,若是私有的,只有创建它的应用程序才能消费
             * arg4:队列在没有消费者订阅的情况下,是否自动删除
             * arg5: 队列的一些结构化信息,如声明死信队列,磁盘队列
             */
            channel.queueDeclare(queueName_1, true, false, false, null);

            Map<String,Object> headerMap = new HashMap<>();
            headerMap.put("name","小明");
            headerMap.put("age",10);
            AMQP.BasicProperties.Builder props = new AMQP.BasicProperties().builder().headers(headerMap);

            // 创建消息
            String message = "hello world";

            /**
             * 发送消息
             * arg1:交换机名称
             * arg2:路由键,直连模式下为队列名称
             * arg3:其他参数信息
             * arg4:消息
             */
            channel.basicPublish(exchangeName, "", props.build(), ("header:" + message).getBytes());
        } finally {
            // 释放管道资源
            channel.close();
            // 释放连接资源
            connection.close();
        }
    }

consumer

/**
     * 发送消息
     */
    public static void consumerMessageByExchangeTypeHeaders() throws IOException, TimeoutException {
        String queueName_1 = "xiaoyuan-queue-headers-1";
        String exchangeName = "xiaoyuan-exchange-headers";
        // 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // rabbitmq 服务地址
        connectionFactory.setHost("8.140.224.210");
        // rabbitmq 服务端口
        connectionFactory.setPort(5672);
        // rabbitmq 账号
        connectionFactory.setUsername("xiaoyuan");
        // rabbitmq 密码
        connectionFactory.setPassword("xiaoyuan");
        // 创建连接
        Connection connection = connectionFactory.newConnection();
        // 创建信息通道
        Channel channel = connection.createChannel();
        try {
            // 接收消息的回调函数
            DeliverCallback deliverCallback = (consumerTage,messgae) -> {
                System.out.println("接收到的消息: " + new String(messgae.getBody()));
            };
            // 取消消息的回调函数
            CancelCallback cancelCallback = consumerTage -> {
                System.out.println("接收消息失败");
            };

            Map<String,Object> headerMap = new HashMap<>();
            headerMap.put("x-match","any");
            headerMap.put("name","小明");
            headerMap.put("age",10);

            /**
             * 将队列与交换机绑定
             * arg1: 队列名称
             * arg2:交换机名称
             * arg3: 路由键,直连模式下为队列名称
             * arg4:headeraMap
             */
            channel.queueBind(queueName_1, exchangeName,"",headerMap);
            /**
             * 消费消息
             * arg1:消费队列名称
             * arg2:消费成功后是否自动提交  ack机制; false-不进行提交;true-提交,消息从队列中移除
             * arg3: 接收消息的回调函数
             * arg4;取消消息的回调函数
             */
            channel.basicConsume(queueName_1,true,deliverCallback,cancelCallback);

        } finally {
            // 释放管道资源
            channel.close();
            // 释放连接资源
            connection.close();
        }
    }

http://www.kler.cn/a/385265.html

相关文章:

  • Ubuntu22.04安装DataEase
  • 纯血鸿蒙系统 HarmonyOS NEXT自动化测试实践
  • C++【string类,模拟实现string类】
  • WPF中如何简单的使用MvvmLight创建一个项目并进行 增删改查
  • DeFi 4.0峥嵘初现:主权金融时代的来临
  • springboot 传统应用程序,适配云原生改造
  • Java NIO操作
  • Liunx:文件fd、重定向、管道
  • 全局变量之C与Pthon的差异
  • 若依管理系统使用已有 Nacos 部署流程整理
  • [JAVAEE] 面试题(四) - 多线程下使用ArrayList涉及到的线程安全问题及解决
  • 城镇住房保障:SpringBoot系统维护与升级
  • Python基于TensorFlow实现双向循环神经网络GRU加注意力机制分类模型(BiGRU-Attention分类算法)项目实战
  • 多线程案例---阻塞队列
  • RapidrepairController
  • linux 下 signal() 函数的用法,信号类型在哪里定义的?
  • 【go从零单排】go语言中的指针
  • NVR小程序接入平台/设备EasyNVR多品牌NVR管理工具/设备汇聚公共资源场景方案全析
  • 如何设置 TORCH_CUDA_ARCH_LIST 环境变量以优化 PyTorch 性能
  • AutoOps 使每个 Elasticsearch 部署都更易于管理
  • C#核心(7)索引器
  • 从0开始linux(21)——文件(2)文件重定向
  • Hive 查询各类型专利 Top 10 申请人及对应的专利申请数
  • 记录offcanvas不能显示和关闭的修复方法
  • QT监控文件夹变化(文件增加、删除、改名)
  • B2C分销管理系统(源码+文档+部署+讲解)