当前位置: 首页 > article >正文

4.RabbitMQ工作模型

四、Rabbit的工作模型

1、RabbitMQ的工作模型

4.1.1、基本流程图

在这里插入图片描述

  • broker:相当于mysql服务器
  • virtual host相当于数据库(可以有多个数据库)
  • queue相当于表
  • 消息相当于记录

4.1.2、核心概念

消息队列有三个核心要素: 消息生产者消息队列消息消费者

(1)、生产者(Producer)

发送消息的应用;

它将消息发送到 RabbitMQ 的交换器中

(2)、消费者(Consumer)

消费者是消息的接收方,它从 RabbitMQ 的队列中获取消息并进行处理

(3)、代理(Broker)

就是消息服务器

负责接收、存储和转发消息,作用类似邮局

RabbitMQ Server就是Message Broker

  • Broker = VHost1+Vhost2+Vhost3+…
  • Virtual Host = Exchange + Queue +Binding
(4)、消息 Message

由生产者通过RabbitMQ发送给消费者的信息

消息可以任何数据(字符串、user对象,json串等等)

  • 消息=消息头+消息体,根据routekey发送到指定的交换机 Exchange
  • 消息头:含有各种属性 routing-key(路由键)、priority(优先级)、delivery-mode(指出该消息可能需要持久性存储)等。
(5)、连接(Connection)

生产者/消费者和broker之间的TCP连接

断开连接的操作只会在client端进行,Broker不会断开连接,除非出现网络故障或broker服务出现问题

(6)、信道(Channel)

Channel是多路复用连接中的一条独立的双向数据流通道

如果每一次访问RabbitMQ都建立一个Connection,在消息量大的时候建立TCP Connection的开销将是巨大的,效率也较低

Channel是在connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的channel进行通讯

AMQP method包含了channel id帮助客户端和message broker识别channel,所以channel之间是完全隔离的

Channel作为轻量级的Connection极大减少了操作系统建立TCP connection的开销,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成

(7)、虚拟主机(Virtual host)

逻辑分组机制,类似于nacos中的命名空间的概念(在代码中就是一个字符串)

把AMQP的基本组件划分到一个虚拟的分组中从而将不同的用户、队列、交换器等资源隔离开来

当多个不同的用户使用同一个RabbitMQ server提供的服务时,可以划分出多个vhost,每个用户在自己vhost创建exchange/queue等

  • Virtual 即 VHost
  • 默认目录 /
(8)、交换机(Exchange)

交换机负责从生产者接收消息,并根据交换机类型分发到对应的消息队列中,起到一个路由的作用

常用三种类型

  • dirct(Direct Exchange)

    直连交换器

    单播】完全匹配路由键的队列

  • fanout(Fanout Exchange)

    扇出交换器

    广播】消息分发所有绑定队列上,不处理路由键

  • topic(Topic Exchange)

    主题交换器

    【模式匹配】

    • #:配置0个或者多个单词
    • *:匹配一个单词
  • headers:很少使用

  • system:很少使用

(9)、路由键(Routing Key)

交换机根据路由键来决定消息分发到哪个队列,路由键是消息的目的地址

(10)、绑定(Binding)

绑定是消息队列和交换机的一个关联连接(关联关系)

  • 一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表
  • Exchange 和Queue的绑定可以是多对多的关系
(11)、队列(Queue)

存储消息的容器

消息最终被送到这里等待consumer取走

一个message可以被同时拷贝到多个queue中

2、七种工作模式

4.2.1、简单队列模式

(1)、模式简介

P为生产者,将消息发送到消息队列,消费者直接消费消息队列里面的内容

在这里插入图片描述

(2)、模式特点

特点

  • ⼀个⽣产者P
  • ⼀个消费者C
  • 消息只能被消费⼀次

也称为点对点模式

(3)、模式示例

rabbitmq-04-seven-mode-01

引入依赖
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.25.0</version>
</dependency>
定义常量
package com.longdidi;

public class Constants {
    // MQ服务器地址
    public static final String HOST = "192.168.0.101";
    // 端口号
    public static final Integer PORT = 5672;
    // 用户名
    public static final String USER_NAME = "admin";
    // 密码
    public static final String PASSWORD = "123456";
    // 虚拟主机名称
    public static final String VIRTUAL_HOST = "longdidi";
    // 声明队列名称
    public static final String QUEUE_NAME = "simple";
}
生产者
  1. 创建连接工厂
  2. 设置工厂参数
  3. 创建channel
  4. 声明queue
  5. 通过channel发送到queue
  6. 资源释放
package com.longdidi;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();

        //2.设置工厂参数
        factory.setHost(Constants.HOST);
        factory.setPort(Constants.PORT);
        factory.setUsername(Constants.USER_NAME);
        factory.setPassword(Constants.PASSWORD);
        factory.setVirtualHost(Constants.VIRTUAL_HOST);
        //获取TCP链接
        Connection connection = factory.newConnection();

        //3.创建channel信道
        Channel channel = connection.createChannel();

        /**
         * 4.声明队列
         * queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
         * Map<String, Object> arguments)
         * 1.队列名称
         * 2.durable 可持久化 true为持久化
         * 3.exclusive 是否独占 false
         * 4. autoDelete 是否自动删除 false
         * 5.arguments 参数
         */
        //如果没有⼀个"simple"这样的⼀个队列,会⾃动创建;如果有则不创建
        channel.queueDeclare(Constants.QUEUE_NAME, true, false, false, null);

        /**
         * 5.通过channel发送到queue
         * basicPublish(String exchange, String routingKey, AMQP.BasicProperties props,
         * byte[] body)
         * 1.exchange 交换机名称 ,简单情况下 一般默认的情况为""
         * 2.routingKey 路由key名称=队列名称
         * 3.props 配置信息
         * 4.body 发送信息的数据
         */
        for (int i = 0; i < 10; i++) {
            String msg = "hello 简单队列~" + i;
            channel.basicPublish("", "simple", null, msg.getBytes());
        }
        System.out.println("信息发送成功!");
        //6. 资源释放
        channel.close();
        connection.close();
    }
}

在这里插入图片描述

消费者

1.创建连接工厂

2.设置工厂参数

3.创建channel

4.声明queue

5.消费数据

6.资源释放

package com.longdidi;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2.设置工厂参数
        factory.setHost(Constants.HOST);
        factory.setPort(Constants.PORT);
        factory.setUsername(Constants.USER_NAME);
        factory.setPassword(Constants.PASSWORD);
        factory.setVirtualHost(Constants.VIRTUAL_HOST);
        Connection connection = factory.newConnection();

        //3.创建channel
        Channel channel = connection.createChannel();

        /**
         * 4.声明queue
         * queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
         * Map<String, Object> arguments)
         * 1.队列名称
         * 2.durable 可持久化 true为持久化
         * 3.exclusive 是否独占 false
         * 4. autoDelete 是否自动删除 false
         * 5.arguments 参数
         */
        //如果没有⼀个"simple"这样的⼀个队列,则会⾃动创建;如果有则不创建
        channel.queueDeclare("simple", true, false, false, null);

        /**
         * 5.接收信息并消费
         * basicConsume(String queue, boolean autoAck, Consumer callback)
         * queue :队列名称
         * autoAck:是否自动确认 消费者接收信息与MQ
         * callback :回调对象
         */
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            /*
            回调方法 当收到信息 自动执行该方法
            consumerTag
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收到信息:" + new String(body));
            }
        };
        channel.basicConsume(Constants.QUEUE_NAME, true, consumer);
        // 6.释放资源
        channel.close();
        connection.close();
    }
}

在这里插入图片描述

4.2.2、⼯作队列模式

(1)、模式简介

一个生产者§,多个消费者©,消息队列会平均分配给消费者

在这里插入图片描述

(2)、模式特点

特点:消息不会重复分配给不同的消费者 适⽤场景:集群环境中做异步处理

(3)、模式示例

rabbitmq-04-seven-mode-02

引入依赖
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.25.0</version>
</dependency>
定义常量
package com.longdidi;

public class Constants {
    // MQ服务器地址
    public static final String HOST = "192.168.0.101";
    // 端口号
    public static final Integer PORT = 5672;
    // 用户名
    public static final String USER_NAME = "admin";
    // 密码
    public static final String PASSWORD = "123456";
    // 虚拟主机名称
    public static final String VIRTUAL_HOST = "longdidi";
    // 声明队列名称
    public static final String QUEUE_NAME = "work_queue";
}
生产者
package com.longdidi;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();

        //2.设置工厂参数
        factory.setHost(Constants.HOST);
        factory.setPort(Constants.PORT);
        factory.setUsername(Constants.USER_NAME);
        factory.setPassword(Constants.PASSWORD);
        factory.setVirtualHost(Constants.VIRTUAL_HOST);
        //获取TCP连接
        Connection connection = factory.newConnection();
        //3.创建channel
        Channel channel = connection.createChannel();
        //4.声明queue
        channel.queueDeclare(Constants.QUEUE_NAME, true, false, false, null);
        //5.通过channel发送到queue
        for (int i = 0; i < 10; i++) {
            String msg = "hello 工作队列~" + i;
            channel.basicPublish("", Constants.QUEUE_NAME, null, msg.getBytes());
        }
        System.out.println("信息发送成功!");
        //6.资源释放
        channel.close();
        connection.close();
    }
}

在这里插入图片描述

消费者
  • 消费者1

    package com.longdidi;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class Consumer1 {
        public static void main(String[] args) throws IOException, TimeoutException {
            //1.创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
    
            //2.设置工厂参数
            factory.setHost(Constants.HOST);
            factory.setPort(Constants.PORT);
            factory.setUsername(Constants.USER_NAME);
            factory.setPassword(Constants.PASSWORD);
            factory.setVirtualHost(Constants.VIRTUAL_HOST);
            //获取TCP连接
            Connection connection = factory.newConnection();
            //3.创建channel
            Channel channel = connection.createChannel();
            //4.声明queue
            //如果没有⼀个"work_queue"这样的⼀个队列,会⾃动创建;如果有则不创建
            channel.queueDeclare(Constants.QUEUE_NAME, true, false, false, null);
            //5.接收信息并消费
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("接收到信息:" + new String(body));
                }
            };
            channel.basicConsume(Constants.QUEUE_NAME, true, consumer);
            //6.释放资源
            /*channel.close();
            connection.close();*/
        }
    
    }
    

    在这里插入图片描述

  • 消费者2

    package com.longdidi;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class Consumer2 {
        public static void main(String[] args) throws IOException, TimeoutException {
            //1.创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
    
            //2.设置工厂参数
            factory.setHost(Constants.HOST);
            factory.setPort(Constants.PORT);
            factory.setUsername(Constants.USER_NAME);
            factory.setPassword(Constants.PASSWORD);
            factory.setVirtualHost(Constants.VIRTUAL_HOST);
            //获取TCP连接
            Connection connection = factory.newConnection();
            //3.创建channel
            Channel channel = connection.createChannel();
            //4.声明queue
            //如果没有⼀个"work_queue"这样的⼀个队列,会⾃动创建,;如果有则不创建
            channel.queueDeclare(Constants.QUEUE_NAME, true, false, false, null);
            //5.接收信息并消费
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("接收到信息:" + new String(body));
                }
            };
            channel.basicConsume(Constants.QUEUE_NAME, true, consumer);
            //6.释放资源
            /*channel.close();
            connection.close();*/
        }
    
    }
    

    在这里插入图片描述

4.2.3、发布/订阅模式

(1)、模式简介

将消息交给所有绑定到交换机的队列

发布订阅模式是⽆条件的将所有消息分发给所有消费者

在这里插入图片描述

X作为交换机将消息复制多份并且发送多个消费者,并且每个消费者收到相同的信息

比如P发送了10条消息,C1和C2都消费10条信息

适合场景: 消息需要被多个消费者同时接收的场景(如: 实时通知或者⼴播消息)

在这里插入图片描述

(2)、模式特点
  • Exchange(交换机)

    只负责转发消息, 不具备存储消息的能⼒, 因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息就会丢失

  • RoutingKey(路由键)

    ⽣产者将消息发给交换器时,指定的⼀个字符串, ⽤来告诉交换机应该如何处理这个消息

  • Binding Key(绑定)

    RabbitMQ中通过Binding(绑定)将交换器与队列关联起来, 在绑定的时候⼀般会指定⼀个Binding Key, 这样RabbitMQ就知道如何正确地将消息路由到队列了

(3)、模式示例

rabbitmq-04-seven-mode-03

引入依赖
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.25.0</version>
</dependency>
定义常量
package com.longdidi;

public class Constants {
    // MQ服务器地址
    public static final String HOST = "192.168.0.101";
    // 端口号
    public static final Integer PORT = 5672;
    // 用户名
    public static final String USER_NAME = "admin";
    // 密码
    public static final String PASSWORD = "123456";
    // 虚拟主机名称
    public static final String VIRTUAL_HOST = "longdidi";
    // 声明交换机
    public static final String FANOUT_EXCHANGE = "fanout.exchange";
    // 队列1
    public static final String FANOUT_QUEUE1 = "fanout.queue1";
    // 队列2
    public static final String FANOUT_QUEUE2 = "fanout.queue2";
}
生产者
package com.longdidi;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1. 建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
        connectionFactory.setUsername(Constants.USER_NAME);//账号
        connectionFactory.setPassword(Constants.PASSWORD);  //密码
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
        Connection connection = connectionFactory.newConnection();
        //2. 开启信道
        Channel channel = connection.createChannel();
        //3. 声明交换机
        channel.exchangeDeclare(Constants.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT, true);
        //4. 声明队列
        channel.queueDeclare(Constants.FANOUT_QUEUE1, true, false, false, null);
        channel.queueDeclare(Constants.FANOUT_QUEUE2, true, false, false, null);
        //5. 交换机和队列绑定
        channel.queueBind(Constants.FANOUT_QUEUE1, Constants.FANOUT_EXCHANGE, "");
        channel.queueBind(Constants.FANOUT_QUEUE2, Constants.FANOUT_EXCHANGE, "");
        //6. 发布消息
        for (int i = 0; i < 10; i++) {
            String msg = "hello 发布订阅队列~" + i;
            channel.basicPublish(Constants.FANOUT_EXCHANGE, "", null, msg.getBytes());
        }
        System.out.println("消息发送成功");
        //7. 释放资源
        channel.close();
        connection.close();
    }
}

在这里插入图片描述

查看控制台

在这里插入图片描述

消费者
  • 消费者1

    package com.longdidi;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class Consumer1 {
        public static void main(String[] args) throws IOException, TimeoutException {
            //1. 建立连接
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost(Constants.HOST);
            connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
            connectionFactory.setUsername(Constants.USER_NAME);//账号
            connectionFactory.setPassword(Constants.PASSWORD);  //密码
            connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
            Connection connection = connectionFactory.newConnection();
            //2. 开启信道
            Channel channel = connection.createChannel();
            //3. 声明队列
            channel.queueDeclare(Constants.FANOUT_QUEUE1, true, false, false, null);
            //4. 消费消息
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                //从队列中收到消息, 就会执行的方法
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("接收到消息:" + new String(body));
                }
            };
            channel.basicConsume(Constants.FANOUT_QUEUE1, true, consumer);
        }
    }
    

在这里插入图片描述

  • 消费者2

    package com.longdidi;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class Consumer2 {
        public static void main(String[] args) throws IOException, TimeoutException {
            //1. 建立连接
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost(Constants.HOST);
            connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
            connectionFactory.setUsername(Constants.USER_NAME);//账号
            connectionFactory.setPassword(Constants.PASSWORD);  //密码
            connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
            Connection connection = connectionFactory.newConnection();
            //2. 开启信道
            Channel channel = connection.createChannel();
            //3. 声明队列
            channel.queueDeclare(Constants.FANOUT_QUEUE2, true, false, false, null);
            //4. 消费消息
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                //从队列中收到消息, 就会执行的方法
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("接收到消息:" + new String(body));
                }
            };
            channel.basicConsume(Constants.FANOUT_QUEUE2, true, consumer);
        }
    }
    

在这里插入图片描述

4.2.4、路由模式

(1)、模式简介

在这里插入图片描述

路由模式是发布订阅模式的变种, 在发布订阅基础上,增加路由key

路由模式是Exchange根据RoutingKey的规则, 将数据筛选后发给对应的消费者队列

适合场景: 需要根据特定规则分发消息的场景

⽐如系统打印⽇志(⽇志等级分为error, warning, info,debug)就可以通过这种模式把不同的⽇志发送到不同的队列, 最终输出到不同的⽂件

(2)、模式示例

rabbitmq-04-seven-mode-04

引入依赖
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.25.0</version>
</dependency>
定义常量
package com.longdidi;

public class Constants {
    // MQ服务器地址
    public static final String HOST = "192.168.0.101";
    // 端口号
    public static final Integer PORT = 5672;
    // 用户名
    public static final String USER_NAME = "admin";
    // 密码
    public static final String PASSWORD = "123456";
    // 虚拟主机名称
    public static final String VIRTUAL_HOST = "longdidi";
    // 声明交换机
    public static final String DIRECT_EXCHANGE = "direct.exchange";
    // 队列1
    public static final String DIRECT_QUEUE1 = "direct.queue1";
    // 队列2
    public static final String DIRECT_QUEUE2 = "direct.queue2";
}
生产者
package com.longdidi;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1. 建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
        connectionFactory.setUsername(Constants.USER_NAME);//账号
        connectionFactory.setPassword(Constants.PASSWORD);  //密码
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
        Connection connection = connectionFactory.newConnection();
        //2. 开启信道
        Channel channel = connection.createChannel();
        //3. 声明交换机
        channel.exchangeDeclare(Constants.DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT, true);
        //4. 声明队列
        channel.queueDeclare(Constants.DIRECT_QUEUE1, true, false, false, null);
        channel.queueDeclare(Constants.DIRECT_QUEUE2, true, false, false, null);
        //5. 交换机和队列绑定
        channel.queueBind(Constants.DIRECT_QUEUE1, Constants.DIRECT_EXCHANGE, "a");
        channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "a");
        channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "b");
        channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "c");
        //6. 发布消息
        String msg_a = "hello 路由队列~ my routingKey is a...";
        channel.basicPublish(Constants.DIRECT_EXCHANGE, "a", null, msg_a.getBytes());
        String msg_b = "hello 路由队列~ my routingKey is b...";
        channel.basicPublish(Constants.DIRECT_EXCHANGE, "b", null, msg_b.getBytes());
        String msg_c = "hello 路由队列~ my routingKey is c...";
        channel.basicPublish(Constants.DIRECT_EXCHANGE, "c", null, msg_c.getBytes());

        System.out.println("消息发送成功");
        //7. 释放资源
        channel.close();
        connection.close();
    }
}

在这里插入图片描述

消费者
  • 消费者1

    package com.longdidi;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class Consumer1 {
        public static void main(String[] args) throws IOException, TimeoutException {
            //1.创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            //2.设置工厂参数
            factory.setHost(Constants.HOST);
            factory.setPort(Constants.PORT);
            factory.setUsername(Constants.USER_NAME);
            factory.setPassword(Constants.PASSWORD);
            factory.setVirtualHost(Constants.VIRTUAL_HOST);
            Connection connection = factory.newConnection();
            //3.创建channel
            Channel channel = connection.createChannel();
            //4.声明queue
            //如果没有这样的⼀个队列会⾃动创建;如果有则不创建
            channel.queueDeclare(Constants.DIRECT_QUEUE1, true, false, false, null);
            //5.接收信息并消费
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("接收到信息:" + new String(body));
                }
            };
            channel.basicConsume(Constants.DIRECT_QUEUE1, true, consumer);
        }
    }
    

在这里插入图片描述

  • 消费者2

    package com.longdidi;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class Consumer2 {
        public static void main(String[] args) throws IOException, TimeoutException {
            //1.创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            //2.设置工厂参数
            factory.setHost(Constants.HOST);
            factory.setPort(Constants.PORT);
            factory.setUsername(Constants.USER_NAME);
            factory.setPassword(Constants.PASSWORD);
            factory.setVirtualHost(Constants.VIRTUAL_HOST);
            Connection connection = factory.newConnection();
            //3.创建channel
            Channel channel = connection.createChannel();
            //4.声明queue
            //如果没有这样的⼀个队列会⾃动创建;如果有则不创建
            channel.queueDeclare(Constants.DIRECT_QUEUE1, true, false, false, null);
            //5.接收信息 并消费
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("接收到信息:" + new String(body));
                }
            };
            channel.basicConsume(Constants.DIRECT_QUEUE1, true, consumer);
        }
    }
    

    在这里插入图片描述

4.2.5、通配符模式

(1)、模式简介

在这里插入图片描述

路由模式的升级版, 在routingKey的基础上,增加了通配符的功能, 使之更加灵活

Topics和Routing的基本原理相同,即⽣产者将消息发给交换机,交换机根据RoutingKey将消息转发给与RoutingKey匹配的队列,类似于正则表达式的⽅式来定义Routingkey的模式

适合场景: 需要灵活匹配和过滤消息的场景

(2)、模式示例

rabbitmq-04-seven-mode-05

引用依赖
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.25.0</version>
</dependency>
定义常量
package com.longdidi;

public class Constants {
    // MQ服务器地址
    public static final String HOST = "192.168.0.101";
    // 端口号
    public static final Integer PORT = 5672;
    // 用户名
    public static final String USER_NAME = "admin";
    // 密码
    public static final String PASSWORD = "123456";
    // 虚拟主机名称
    public static final String VIRTUAL_HOST = "longdidi";
    // 声明交换机
    public static final String TOPIC_EXCHANGE = "topic.exchange";
    // 队列1
    public static final String TOPIC_QUEUE1 = "topic.queue1";
    // 队列2
    public static final String TOPIC_QUEUE2 = "topic.queue2";
}
生产者
package com.longdidi;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1. 建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
        connectionFactory.setUsername(Constants.USER_NAME);//账号
        connectionFactory.setPassword(Constants.PASSWORD);  //密码
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
        Connection connection = connectionFactory.newConnection();
        //2. 开启信道
        Channel channel = connection.createChannel();
        //3. 声明交换机
        channel.exchangeDeclare(Constants.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC, true);
        //4. 声明队列
        channel.queueDeclare(Constants.TOPIC_QUEUE1, true, false, false, null);
        channel.queueDeclare(Constants.TOPIC_QUEUE2, true, false, false, null);
        //5. 交换机和队列绑定
        channel.queueBind(Constants.TOPIC_QUEUE1, Constants.TOPIC_EXCHANGE, "*.a.*");
        channel.queueBind(Constants.TOPIC_QUEUE2, Constants.TOPIC_EXCHANGE, "*.*.b");
        channel.queueBind(Constants.TOPIC_QUEUE2, Constants.TOPIC_EXCHANGE, "c.#");
        //6. 发布消息
        String msg_a = "hello 路由队列~ my routingKey is ae.a.f...";
        channel.basicPublish(Constants.TOPIC_EXCHANGE, "ae.a.f", null, msg_a.getBytes());
        String msg_b = "hello 路由队列~ my routingKey is ef.a.b...";
        channel.basicPublish(Constants.TOPIC_EXCHANGE, "ef.a.b", null, msg_b.getBytes());
        String msg_c = "hello 路由队列~ my routingKey is c.ef.d ...";
        channel.basicPublish(Constants.TOPIC_EXCHANGE, "c.ef.d", null, msg_c.getBytes());

        System.out.println("消息发送成功");
        //7. 释放资源
        channel.close();
        connection.close();
    }
}

在这里插入图片描述

查看绑定关系

在这里插入图片描述

消费者
  • 消费者1

    package com.longdidi;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class Consumer1 {
        public static void main(String[] args) throws IOException, TimeoutException {
            //1.创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            //2.设置工厂参数
            factory.setHost(Constants.HOST);
            factory.setPort(Constants.PORT);
            factory.setUsername(Constants.USER_NAME);
            factory.setPassword(Constants.PASSWORD);
            factory.setVirtualHost(Constants.VIRTUAL_HOST);
            Connection connection = factory.newConnection();
            //3.创建channel
            Channel channel = connection.createChannel();
            //4.声明queue
            //如果没有这样的⼀个队列会⾃动创建;如果有则不创建
            channel.queueDeclare(Constants.TOPIC_QUEUE1, true, false, false, null);
            //5.接收信息并消费
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("接收到信息:" + new String(body));
                }
            };
            channel.basicConsume(Constants.TOPIC_QUEUE1, true, consumer);
        }
    }
    

    在这里插入图片描述

  • 消费者2

    package com.longdidi;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class Consumer2 {
    
        public static void main(String[] args) throws IOException, TimeoutException {
            //1.创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            //2.设置工厂参数
            factory.setHost(Constants.HOST);
            factory.setPort(Constants.PORT);
            factory.setUsername(Constants.USER_NAME);
            factory.setPassword(Constants.PASSWORD);
            factory.setVirtualHost(Constants.VIRTUAL_HOST);
            Connection connection = factory.newConnection();
            //3.创建channel
            Channel channel = connection.createChannel();
            //4.声明queue
            //如果没有这样的⼀个队列会⾃动创建;如果有则不创建
            channel.queueDeclare(Constants.TOPIC_QUEUE2, true, false, false, null);
            //5.接收信息并消费
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("接收到信息:" + new String(body));
                }
            };
            channel.basicConsume(Constants.TOPIC_QUEUE2, true, consumer);
        }
    }
    

    在这里插入图片描述

4.2.6、RPC通信模式

(1)、模式简介

在这里插入图片描述

在RPC通信的过程中, 没有⽣产者和消费者, ⽐较像咱们RPC远程调⽤, ⼤概就是通过两个队列实现了⼀个可回调的过程

在这里插入图片描述

  • 客户端:

    1.发送请求(携带replyTo,correlationId )

    2.接收响应(验证correlationId)

  • 服务器:

    1.接收请求 进行响应

    2.发送响应(按客户端指定的replyTo,设置correlationId)

(2)、模式示例

rabbitmq-04-seven-mode-06

引入依赖
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.25.0</version>
</dependency>
定义常量
package com.longdidi;

public class Constants {
    // MQ服务器地址
    public static final String HOST = "192.168.0.101";
    // 端口号
    public static final Integer PORT = 5672;
    // 用户名
    public static final String USER_NAME = "admin";
    // 密码
    public static final String PASSWORD = "123456";
    // 虚拟主机名称
    public static final String VIRTUAL_HOST = "longdidi";
    // 请求队列
    public static final String RPC_REQUEST_QUEUE = "rpc.request.queue";
    // 响应队列
    public static final String RPC_RESPONSE_QUEUE = "rpc.response.queue";
}
客户端1
package com.longdidi;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;

public class Client1 {
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        //1. 建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
        connectionFactory.setUsername(Constants.USER_NAME);//账号
        connectionFactory.setPassword(Constants.PASSWORD);  //密码
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
        Connection connection = connectionFactory.newConnection();
        //2. 开启信道
        Channel channel = connection.createChannel();
        channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE, true, false, false, null);
        channel.queueDeclare(Constants.RPC_REQUEST_QUEUE, true, false, false, null);
        //3. 发送请求
        String msg = "hello rpc...";
        //设置请求的唯一标识
        String correlationID = UUID.randomUUID().toString();
        //设置请求的相关属性
        AMQP.BasicProperties props = new AMQP.BasicProperties().builder()
                .correlationId(correlationID)
                .replyTo(Constants.RPC_RESPONSE_QUEUE)
                .build();
        channel.basicPublish("", Constants.RPC_REQUEST_QUEUE, props, msg.getBytes());

        //4. 接收响应
        //使用阻塞队列, 来存储响应信息
        final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String respMsg = new String(body);
                System.out.println("接收到回调消息: " + respMsg);
                if (correlationID.equals(properties.getCorrelationId())) {
                    //如果correlationID校验一致
                    response.offer(respMsg);
                }
            }
        };
        channel.basicConsume(Constants.RPC_RESPONSE_QUEUE, true, consumer);
        String result = response.take();
        System.out.println("[RPC Client 响应结果]:" + result);
    }
}
客户端2
package com.longdidi;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;

public class Client2 {
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        //1. 建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
        connectionFactory.setUsername(Constants.USER_NAME);//账号
        connectionFactory.setPassword(Constants.PASSWORD);  //密码
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
        Connection connection = connectionFactory.newConnection();
        //2. 开启信道
        Channel channel = connection.createChannel();
        channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE, true, false, false, null);
        channel.queueDeclare(Constants.RPC_REQUEST_QUEUE, true, false, false, null);
        //3. 发送请求
        String msg = "hello rpc...";
        //设置请求的唯一标识
        String correlationID = UUID.randomUUID().toString();
        //设置请求的相关属性
        AMQP.BasicProperties props = new AMQP.BasicProperties().builder()
                .correlationId(correlationID)
                .replyTo(Constants.RPC_RESPONSE_QUEUE)
                .build();
        channel.basicPublish("", Constants.RPC_REQUEST_QUEUE, props, msg.getBytes());

        //4. 接收响应
        //使用阻塞队列, 来存储响应信息
        final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String respMsg = new String(body);
                System.out.println("接收到回调消息: " + respMsg);
                if (correlationID.equals(properties.getCorrelationId())) {
                    //如果correlationID校验一致
                    response.offer(respMsg);
                }
            }
        };
        channel.basicConsume(Constants.RPC_RESPONSE_QUEUE, true, consumer);
        String result = response.take();
        System.out.println("[RPC Client 响应结果]:" + result);
    }
}

4.2.7、发布确认模式

(1)、模式简介

消息丢失其中一种情是⽣产者问题:因为应⽤程序故障、⽹络抖动等各种原因, ⽣产者没有成功向broker发送消息 可以采⽤发布确认(Publisher Confirms)机制实现

发送⽅确认机制最⼤的好处在于它是异步的, ⽣产者可以同时发布消息和等待信道返回确认消息

  1. 当消息最终得到确认之后, ⽣产者可以通过回调⽅法来处理该确认消息
  2. 如果RabbitMQ因为⾃⾝内部错误导致消息丢失, 就会发送⼀条nack(Basic.Nack)命令, ⽣产者同样 可以在回调⽅法中处理该nack命令 使⽤发送确认机制必须要信道设置成confirm(确认)模式

发布确认有3种策略

(2)、模式示例

测试模块:rabbitmq-04-seven-mode-07

引入依赖
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.25.0</version>
</dependency>
定义常量
package com.longdidi;

public class Constants {
    // MQ服务器地址
    public static final String HOST = "192.168.0.101";
    // 端口号
    public static final Integer PORT = 5672;
    // 用户名
    public static final String USER_NAME = "admin";
    // 密码
    public static final String PASSWORD = "123456";
    // 虚拟主机名称
    public static final String VIRTUAL_HOST = "longdidi";
    // 声明交换机
    public static final String DIRECT_EXCHANGE = "direct.exchange";
    // 队列1
    public static final String PUBLISHER_CONFIRMS_QUEUE1 = "publisher_confirms_queue1.queue1";
    // 队列2
    public static final String PUBLISHER_CONFIRMS_QUEUE2 = "publisher_confirms_queue2.queue2";
    // 队列3
    public static final String PUBLISHER_CONFIRMS_QUEUE3 = "publisher_confirms_queue3.queue3";

    public static final Integer MESSAGE_COUNT = 10;
}
三种模式
  • 单独确认
package com.longdidi;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Individually {
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        //1. 建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
        connectionFactory.setUsername(Constants.USER_NAME);//账号
        connectionFactory.setPassword(Constants.PASSWORD);  //密码
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
        Connection connection = connectionFactory.newConnection();
        //1.开启信道
        Channel channel = connection.createChannel();
        //2. 设置信道为confirm模式
        channel.confirmSelect();
        //3.声明队列
        channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE1, true, false, false, null);
        //4.发送消息,并等待确认
        Long start = System.currentTimeMillis();
        for (int i = 0; i < Constants.MESSAGE_COUNT; i++) {
            String msg = "hello publisher confirms" + i;
            channel.basicPublish("", Constants.PUBLISHER_CONFIRMS_QUEUE1, null, msg.getBytes());
            //等待确认
            channel.waitForConfirmsOrDie(5000);
        }
        Long end = System.currentTimeMillis();
        System.out.printf("单独确认策略, 消息条数: %d, 耗时: %d ms \n", Constants.MESSAGE_COUNT, end - start);

        //5.关闭资源
        channel.close();
        connection.close();
    }
}

在这里插入图片描述

  • 批量确认
package com.longdidi;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Batches {
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        //1. 建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
        connectionFactory.setUsername(Constants.USER_NAME);//账号
        connectionFactory.setPassword(Constants.PASSWORD);  //密码
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
        Connection connection = connectionFactory.newConnection();
        //1.开启信道
        Channel channel = connection.createChannel();

        //2. 设置信道为confirm模式
        channel.confirmSelect();
        //3.声明队列
        channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE2, true, false, false, null);
        //4. 发送消息, 并进行确认
        long start = System.currentTimeMillis();
        int batchSize = 100;
        int outstandingMessageCount = 0;
        for (int i = 0; i < Constants.MESSAGE_COUNT; i++) {
            String msg = "hello publisher confirms" + i;
            channel.basicPublish("", Constants.PUBLISHER_CONFIRMS_QUEUE2, null, msg.getBytes());
            outstandingMessageCount++;
            if (outstandingMessageCount == batchSize) {
                channel.waitForConfirmsOrDie(5000);
                outstandingMessageCount = 0;
            }
        }
        if (outstandingMessageCount > 0) {
            channel.waitForConfirmsOrDie(5000);
        }
        long end = System.currentTimeMillis();
        System.out.printf("批量确认策略, 消息条数: %d, 耗时: %d ms \n", Constants.MESSAGE_COUNT, end - start);
        //5.关闭资源
        channel.close();
        connection.close();
    }

}

在这里插入图片描述

  • 异步确认
package com.longdidi;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.TimeoutException;

public class Asynchronously {
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
        connectionFactory.setUsername(Constants.USER_NAME);//账号
        connectionFactory.setPassword(Constants.PASSWORD);  //密码
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
        Connection connection = connectionFactory.newConnection();
        //1.开启信道
        Channel channel = connection.createChannel();

        //2. 设置信道为confirm模式
        channel.confirmSelect();
        //3.声明队列
        channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE3, true, false, false, null);
        //4. 监听confirm
        //集合中存储的是未确认的消息ID
        long start = System.currentTimeMillis();
        SortedSet<Long> confirmSeqNo = Collections.synchronizedSortedSet(new TreeSet<>());
        channel.addConfirmListener(new ConfirmListener() {
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                if (multiple) {
                    confirmSeqNo.headSet(deliveryTag + 1).clear();
                } else {
                    confirmSeqNo.remove(deliveryTag);
                }
            }

            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                if (multiple) {
                    confirmSeqNo.headSet(deliveryTag + 1).clear();
                } else {
                    confirmSeqNo.remove(deliveryTag);
                }
                //业务需要根据实际场景进行处理, 比如重发, 此处代码省略
            }
        });
        for (int i = 0; i < Constants.MESSAGE_COUNT; i++) {
            String msg = "hello publisher confirms" + i;
            long seqNo = channel.getNextPublishSeqNo();
            channel.basicPublish("", Constants.PUBLISHER_CONFIRMS_QUEUE3, null, msg.getBytes());
            confirmSeqNo.add(seqNo);
        }
        while (!confirmSeqNo.isEmpty()) {
            Thread.sleep(10);
        }
        long end = System.currentTimeMillis();
        System.out.printf("异步确认策略, 消息条数: %d, 耗时: %d ms \n", Constants.MESSAGE_COUNT, end - start);
        //5.关闭资源
        channel.close();
        connection.close();
    }

}

在这里插入图片描述


http://www.kler.cn/a/572838.html

相关文章:

  • EXCEL页脚如何插入表格或者图片
  • #define GBB_DEPRECATED_MSG(msg) __declspec(deprecated(msg))
  • YOLOv8 自定义目标检测
  • Franka机器人FR3快速安装指南
  • mysql AB复制
  • scBaseCamp:一个AI代理的可持续扩充的单细胞数据存储库
  • 【linux】文件与目录命令 - uniq
  • 信创:信息技术应用创新产业
  • [Redis] 终极缓存四连杀:缓存预热、缓存击穿、缓存穿透、缓存雪崩,真的懂了吗?
  • 为什么过滤器需要一个 Wrapper 来 extends HttpServletRequestWrapper
  • 神经网络为什么要用 ReLU 增加非线性?
  • DeepSeek R1模型医疗机构本地化部署评估分析(Discuss V1版上)
  • 探索IntelliJ IDEA的Tongyi Lingma插件:AI编程助手的新体验
  • Java项目中ES作为时序库
  • PHP之常量
  • 基于 Kubernetes 搭建 DevOps 持续集成环境
  • DeepSeek基础学习:深度讲解AI大模型基本原理
  • Unity:物体指定初速度、方向移动方法
  • day1 第二次入门rust
  • dify、open-webui、chatbox 对比