当前位置: 首页 > article >正文

[RabbitMQ] 常见面试题汇总 工作流程 消息可靠性 消息顺序性 幂等性 高级特性 延迟队列 仲裁队列 工作模式 消息积压 推拉模式

Author:
major:人工智能


Build your hopes like a tower!



目录

一.MQ的作用及应用场景

二.常用MQ之间的区别

1. Kafka

2. RabbitMQ

3.RocketMQ

三.RabbitMQ的核心概念和工作流程

核心概念

工作流程

工作模式

简单模式

工作队列

发布/订阅模式

路由模式

topic通配符模式

RPC通信

发布确认模式

四.RabbitMQ消息可靠性保证?编辑

五.RabbitMQ保证消息顺序性

六.RabbitMQ消息积压问题

常见的消息积压原因

解决方案

怎么实现work模式能者多劳

七.如何保证消息消费时的幂等性

幂等性概念

解决方案

全局唯一ID

业务逻辑判断

八.RabbitMQ的死信队列

死信队列概念

消息变死信场景

九.介绍下RabbitMQ的延迟队列

应用场景

延迟队列实现方式

十.推拉模式

推拉模式概念

推拉模式实现


一.MQ的作用及应用场景

**消息队列(MQ)是种应用程序间的通信方法,它允许系统组件以异步的方式进行交互,在不同的应用场景下可以展现不同的作用.**常见的应用场景如下:

异步解耦: 在一些业务场景中,一些操作可能比较耗时,但不需要立即返回结果,比如用户注册的时候,需要给用户发送短信,此时我们就可以作为异步处理,而不必等待所有操作都完成后,才告诉用户注册成功.

使用消息队列进行异步解耦后,当用户注册请求到来时,系统首先将用户信息存储到数据库,然后将发送短信或邮件的任务封装成消息发送到消息队列中,接着立即返回注册成功的结果给用户。之后,会有专门的消费者从消息队列中取出消息,并执行发送短信或邮件的操作。这样,发送短信或邮件的操作就与用户注册的主流程解耦开来,实现了异步处理。

流量削峰: 在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样突发的流量并不常见,如果因为这些特殊情况增加太多的机器,会造成巨大的浪费.于是我们可以使用MQ这种组件,支撑突发的访问压力,不会让我们的数据库因为突发流量而崩溃.比如双十一等活动的特价秒杀,就可以使用MQ来控制流量,让请求在队列中排好队,系统根据自己的处理能力逐步处理这些请求.

异步通信: 在很多时候我们并不需要立即处理消息,MQ为我们提供了异步机制,允许应用把一些消息放到MQ中,而不用立即处理它,在需要的时候再慢慢处理.就像我们在购买电影票的时候,程序会为我们锁票,我们不需要立即付款,而有一个订单失效时间,在我们确认付款后,系统才会处理我们的订单信息.

消息分发: 当多个系统需要对同一数据做出响应时,可以使用MQ进行消息分发,比如支付成功后,支付系统可以向MQ发送信息,其他系统订阅该信息,无需轮询数据库.还可以起到一个类似广播的效果,通知所有订阅MQ的程序.

延迟通知: 在需要在特定的时间后发送通知的场景中,可以使用MQ的延迟信息功能.比如在电子商务平台中,如果用户下单后定时间内未支付,可以使用延迟队列在超时后自动取消订单

二.常用MQ之间的区别

目前业界有很多的MQ产品,例如RabbitMQ,RocketMQ,Kafka等.

1. Kafka

Kafka开始的目的就是用于日志收集和传输,**追求高吞吐量,性能卓越,**单机吞吐达到十万级,在日志领域比较成熟,功能较为简单,主要支持简单的MQ功能.适合大数据处理,日志聚合,实时分析等场景.

2. RabbitMQ

采用Erlang语言开发,MQ功能比较完善,且几乎支持所有主流语言,开源提供的界面也非常友好,性能较好,吞吐量达到万级,社区活跃度高,文档更新频繁.比较适合数据量没那么大,并发量没那么高的场景.

3.RocketMQ

采用Java语言开发,在可用性,可靠性以及稳定性等方面都非常出色,吞吐量能达到十万级,在Alibaba集团内部广泛使用,但支持的客户端语言不多,产品较新文档较少, 且社区活跃度般.适合于大规模分布式系统,可靠性要求高,且并发量大的场景,比如互联网金融.

这些消息队列,各有侧重,没有好坏之分,要看使用的场景,具体问题具体分析.

三.RabbitMQ的核心概念和工作流程

核心概念

RabbitMQ是一个消息中间件,也是一个生产者消费者模型,他负责接收消息,转发消息.

Producer: 生产者, 向RabbitMQ发送消息.

Consumer: 消费者, 从RabbitMQ接收消息.

Broker: 消息队列服务器或服务实例,也就是RabbitMQServer.接收消息并转发消息.

Connection: 网络连接,它允许客户端与RabbitMQ通信.

Channel: 连接里的个虚拟通道,发送或者接收消息都是通过通道进行的.

Exchange: 交换机,负责接收生产者发送的消息,并根据路由算法和规则将消息路由到个或多个队 列.接收消息并路由消息.

Queue: 消息队列,存储消息直到它们被消费者消费.

工作流程

1. 创建连接(Connection),开启信道(Channel).

2. 声明交换机和队列,以及绑定规则(RountingKey)

3. 发布消息.

4. 消息存入消息队列,如果未找到相应的队列,则根据生产者的配置,选择丢弃或者退回给生产者.

5. 消费消息,有手动消息确认和自动消息确认.

6. 消息被确认后,会从队列中删除.

工作模式

public class Contants {
    public static final String HOST = "123.207.74.192";
    public static final Integer PORT = 5672;
    public static final String USER_NAME = "study";
    public static final String PASSWORD = "study";
    public static final String VIRTUAL_HOST = "bite";

    // 工作队列模式
    public static final String WORK_QUEUE = "work.queue";

    // 发布订阅模式
    public static final String FANOUT_EXCHANGE = "fanout.exchange";
    public static final String FANOUT_QUEUE1 = "fanout.queue1";
    public static final String FANOUT_QUEUE2 = "fanout.queue2";

    // 路由模式
    public static final String DIRECT_EXCHANGE = "direct.exchange";
    public static final String DIRECT_QUEUE1 = "direct.queue1";
    public static final String DIRECT_QUEUE2 = "direct.queue2";

    // 通配符模式
    public static final String TOPIC_EXCHANGE = "topic.exchange";
    public static final String TOPIC_QUEUE1 = "topic.queue1";
    public static final String TOPIC_QUEUE2 = "topic.queue2";

    // RPC模式
    public static final String RPC_REQUEST_QUEUE = "rpc.request";
    public static final String RPC_RESPONSE_QUEUE = "rpc.response";

    //publisher confirms
    public static final String PUBLISHER_CONFIRMS_QUEUE1 = "publisher.confirms.queue1";
    public static final String PUBLISHER_CONFIRMS_QUEUE2 = "publisher.confirms.queue2";
    public static final String PUBLISHER_CONFIRMS_QUEUE3 = "publisher.confirms.queue3";

}
简单模式

public class ConsumerDemo {
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        // 1.声明队列
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("123.207.74.192");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("study");
        connectionFactory.setPassword("study");
        connectionFactory.setVirtualHost("bite");
        Connection connection = connectionFactory.newConnection();
        // 2.创建Channel
        Channel channel = connection.createChannel();
        // 3.声明队列(可以省略,但不建议)
        channel.queueDeclare("hello",true,false,false,null); // 已存在则不会创建,不存在则会创建,
                                                // 不声明若队列不存在,则会报错,影响生产环境
        // 4.消费消息
        /**
         *     String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
         * 参数说明
         * queue: 队列名称
         * autoAck: 是否自动确认
         * callback: 接收到消息后,执行的逻辑
         */
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 从队列中接收到信息就会执行的内容
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                /**
                 * consumerTag: 消费者标签
                 * envelope: 封装信息,队列名称,交换机等
                 * propertie: 配置信息
                 * body: 消息内容
                 */
                // TODO
                System.out.println("接收消息: "+new String(body));
            }
        };
        channel.basicConsume("hello",true,consumer);

        // 等待程序执行完成
        Thread.sleep(2000);
        // 5.释放资源
//        channel.close();
//        connection.close();
    }
}


public class ProducerDemo {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Contants.HOST);
        connectionFactory.setPort(Contants.PORT);
        connectionFactory.setUsername(Contants.USER_NAME);
        connectionFactory.setPassword(Contants.PASSWORD);
        connectionFactory.setVirtualHost(Contants.VIRTUAL_HOST);
        Connection connection = connectionFactory.newConnection();

        // 2.开启信道
        Channel channel = connection.createChannel();
        // 3.声明交换机 使用内置交换机
        // 4.声明队列
        /**
         * Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
         *                                  Map<String, Object> arguments) throws IOException;
         * String queue: 队列名称
         * boolean durable: 是否可以持久化
         * boolean exclusive: 是否独占
         * boolean autoDelete: 是否自动删除
         * Map<String, Object> arguments: 参数
         */

        channel.queueDeclare("hello",true,false,false,null);
        // 5.发送消息
        /**
         * String exchange, String routingKey, BasicProperties props, byte[] body
         * exchange: 交换机名称
         * routingKey: 使用内置交换机,routingKey和队列名称保持一致
         * props: 属性
         * body: 发送的消息
         */
        for (int i=0;i<10;i++){
            String msg = "hello RabbitMQ: "+i;
            channel.basicPublish("","hello",null,msg.getBytes());
        }
        System.out.println("消息发送成功");
//        // 6.资源释放(有前后顺序)
//        channel.close(); // 可以省略
//        connection.close();

    }
}
工作队列

简单模式的增强版,和简单模式的区别就是:简单模式有个消费者,工作队列模式支持多个消费者接收消息,消费者之间是竞争关系,每个消息只能被个消费者接收

public class Producer {
    public static void main(String[] args) throws Exception {
        // 1.建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Contants.HOST);
        connectionFactory.setPort(Contants.PORT);
        connectionFactory.setUsername(Contants.USER_NAME);
        connectionFactory.setPassword(Contants.PASSWORD);
        connectionFactory.setVirtualHost(Contants.VIRTUAL_HOST);
        Connection connection = connectionFactory.newConnection();

        // 2.开启信道
        Channel channel = connection.createChannel();
        // 3.声明交换机 使用内置交换机
        channel.queueDeclare(Contants.WORK_QUEUE,true,false,false,null);
        // 4.发送消息
        for (int i=0;i<10;i++){
            String msg = "hello work queue.... "+i;
            channel.basicPublish("",Contants.WORK_QUEUE,null,msg.getBytes());
        }
        System.out.println("消息发送成功");
//        // 6.资源释放(有前后顺序)
//        channel.close(); // 可以省略
//        connection.close();

    }
}

public class Consumer1 {
    public static void main(String[] args) throws Exception {
        // 1.声明队列
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Contants.HOST);
        connectionFactory.setPort(Contants.PORT);
        connectionFactory.setUsername(Contants.USER_NAME);
        connectionFactory.setPassword(Contants.PASSWORD);
        connectionFactory.setVirtualHost(Contants.VIRTUAL_HOST);
        Connection connection = connectionFactory.newConnection();
        // 2.创建Channel
        Channel channel = connection.createChannel();
        // 3.声明队列(使用内置交换机)
        channel.queueDeclare(Contants.WORK_QUEUE,true,false,false,null);// 已存在则不会创建,不存在则会创建,
        // 不声明若队列不存在,则会报错,影响生产环境
        // 4.消费消息
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 从队列中接收到信息就会执行的内容
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // TODO
                System.out.println("接收消息: "+new String(body));
            }
        };
        channel.basicConsume(Contants.WORK_QUEUE,true,consumer);

        // 等待程序执行完成

        // 5.释放资源
//        channel.close();
//        connection.close();
    }
}


public class Consumer2 {
    public static void main(String[] args) throws Exception {
        // 1.声明队列
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Contants.HOST);
        connectionFactory.setPort(Contants.PORT);
        connectionFactory.setUsername(Contants.USER_NAME);
        connectionFactory.setPassword(Contants.PASSWORD);
        connectionFactory.setVirtualHost(Contants.VIRTUAL_HOST);
        Connection connection = connectionFactory.newConnection();
        // 2.创建Channel
        Channel channel = connection.createChannel();
        // 3.声明队列(使用内置交换机)
        channel.queueDeclare(Contants.WORK_QUEUE,true,false,false,null);// 已存在则不会创建,不存在则会创建,
        // 不声明若队列不存在,则会报错,影响生产环境
        // 4.消费消息
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 从队列中接收到信息就会执行的内容
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // TODO
                System.out.println("接收消息: "+new String(body));
            }
        };
        channel.basicConsume(Contants.WORK_QUEUE,true,consumer);

        // 等待程序执行完成
        // 5.释放资源
//        channel.close();
//        connection.close();
    }
}
发布/订阅模式

public class Producer {
    public static void main(String[] args) throws Exception {
        // 1.建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Contants.HOST);
        connectionFactory.setPort(Contants.PORT);
        connectionFactory.setUsername(Contants.USER_NAME);
        connectionFactory.setPassword(Contants.PASSWORD);
        connectionFactory.setVirtualHost(Contants.VIRTUAL_HOST);
        Connection connection = connectionFactory.newConnection();

        // 2.开启信道
        Channel channel = connection.createChannel();
        // 3.声明交换机
        channel.exchangeDeclare(Contants.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT,true);
        // 4.声明队列
        channel.queueDeclare(Contants.FANOUT_QUEUE1,true,false,false,null);
        channel.queueDeclare(Contants.FANOUT_QUEUE2,true,false,false,null);
        // 5.交换机和队列绑定
        channel.queueBind(Contants.FANOUT_QUEUE1,Contants.FANOUT_EXCHANGE,"");
        channel.queueBind(Contants.FANOUT_QUEUE2,Contants.FANOUT_EXCHANGE,"");
        // 6.发送消息
        String msg = "hello fanout...";
        channel.basicPublish(Contants.FANOUT_EXCHANGE,"",null,msg.getBytes());
        // 7.释放资源
        channel.close();
        connection.close();
    }
}

public class Consumer1 {
    public static void main(String[] args) throws Exception {
        // 1.声明队列
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Contants.HOST);
        connectionFactory.setPort(Contants.PORT);
        connectionFactory.setUsername(Contants.USER_NAME);
        connectionFactory.setPassword(Contants.PASSWORD);
        connectionFactory.setVirtualHost(Contants.VIRTUAL_HOST);
        Connection connection = connectionFactory.newConnection();
        // 2.创建Channel
        Channel channel = connection.createChannel();
        // 3.声明队列(使用内置交换机)
        channel.queueDeclare(Contants.FANOUT_QUEUE1,true,false,false,null);// 已存在则不会创建,不存在则会创建,
        // 不声明若队列不存在,则会报错,影响生产环境
        // 4.消费消息
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 从队列中接收到信息就会执行的内容
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // TODO
                System.out.println("接收消息: "+new String(body));
            }
        };
        channel.basicConsume(Contants.FANOUT_QUEUE1,true,consumer);

        // 等待程序执行完成

        // 5.释放资源
//        channel.close();
//        connection.close();
    }

}


public class Consumer2 {
    public static void main(String[] args) throws Exception {
        // 1.声明队列
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Contants.HOST);
        connectionFactory.setPort(Contants.PORT);
        connectionFactory.setUsername(Contants.USER_NAME);
        connectionFactory.setPassword(Contants.PASSWORD);
        connectionFactory.setVirtualHost(Contants.VIRTUAL_HOST);
        Connection connection = connectionFactory.newConnection();
        // 2.创建Channel
        Channel channel = connection.createChannel();
        // 3.声明队列(使用内置交换机)
        channel.queueDeclare(Contants.FANOUT_QUEUE2,true,false,false,null);// 已存在则不会创建,不存在则会创建,
        // 不声明若队列不存在,则会报错,影响生产环境
        // 4.消费消息
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 从队列中接收到信息就会执行的内容
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // TODO
                System.out.println("接收消息: "+new String(body));
            }
        };
        channel.basicConsume(Contants.FANOUT_QUEUE2,true,consumer);

        // 等待程序执行完成

        // 5.释放资源
//        channel.close();
//        connection.close();
    }

}
路由模式

public class Producer {
    public static void main(String[] args) throws Exception {
        // 1.建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Contants.HOST);
        connectionFactory.setPort(Contants.PORT);
        connectionFactory.setUsername(Contants.USER_NAME);
        connectionFactory.setPassword(Contants.PASSWORD);
        connectionFactory.setVirtualHost(Contants.VIRTUAL_HOST);
        Connection connection = connectionFactory.newConnection();

        // 2.开启信道
        Channel channel = connection.createChannel();
        // 3.声明交换机
        channel.exchangeDeclare(Contants.DIRECT_EXCHANGE,BuiltinExchangeType.DIRECT,true);
        // 4.声明队列
        channel.queueDeclare(Contants.DIRECT_QUEUE1,true,false,false,null);
        channel.queueDeclare(Contants.DIRECT_QUEUE2,true,false,false,null);
        // 5.交换机和队列绑定
        channel.queueBind(Contants.DIRECT_QUEUE1,Contants.DIRECT_EXCHANGE,"a",null);
        channel.queueBind(Contants.DIRECT_QUEUE2,Contants.DIRECT_EXCHANGE,"a",null);
        channel.queueBind(Contants.DIRECT_QUEUE2,Contants.DIRECT_EXCHANGE,"b",null);
        channel.queueBind(Contants.DIRECT_QUEUE2,Contants.DIRECT_EXCHANGE,"c",null);
        // 6.发送消息
        String msg_a = "hello direct,my routingkey is a...";
        channel.basicPublish(Contants.DIRECT_EXCHANGE,"a",null,msg_a.getBytes());

        String msg_b = "hello direct,my routingkey is b...";
        channel.basicPublish(Contants.DIRECT_EXCHANGE,"b",null,msg_b.getBytes());

        String msg_c = "hello direct,my routingkey is c...";
        channel.basicPublish(Contants.DIRECT_EXCHANGE,"c",null,msg_c.getBytes());

        System.out.println("消息发送成功");
        // 7.释放资源
        channel.close();
        connection.close();
    }
}

public class Consumer1 {
    public static void main(String[] args) throws Exception {
        // 1.建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Contants.HOST);
        connectionFactory.setPort(Contants.PORT);
        connectionFactory.setUsername(Contants.USER_NAME);
        connectionFactory.setPassword(Contants.PASSWORD);
        connectionFactory.setVirtualHost(Contants.VIRTUAL_HOST);
        Connection connection = connectionFactory.newConnection();

        // 2.开启信道
        Channel channel = connection.createChannel();
        // 3.声明队列(使用内置交换机)
        channel.queueDeclare(Contants.DIRECT_QUEUE1,true,false,false,null);// 已存在则不会创建,不存在则会创建,
        // 不声明若队列不存在,则会报错,影响生产环境
        // 4.消费消息
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 从队列中接收到信息就会执行的内容
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // TODO
                System.out.println("接收消息: "+new String(body));
            }
        };
        channel.basicConsume(Contants.DIRECT_QUEUE1,true,consumer);

        // 等待程序执行完成

        // 5.释放资源
//        channel.close();
//        connection.close();
    }
}


public class Consumer2 {
    public static void main(String[] args) throws Exception {
        // 1.建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Contants.HOST);
        connectionFactory.setPort(Contants.PORT);
        connectionFactory.setUsername(Contants.USER_NAME);
        connectionFactory.setPassword(Contants.PASSWORD);
        connectionFactory.setVirtualHost(Contants.VIRTUAL_HOST);
        Connection connection = connectionFactory.newConnection();

        // 2.开启信道
        Channel channel = connection.createChannel();
        // 3.声明队列(使用内置交换机)
        channel.queueDeclare(Contants.DIRECT_QUEUE2,true,false,false,null);// 已存在则不会创建,不存在则会创建,
        // 不声明若队列不存在,则会报错,影响生产环境
        // 4.消费消息
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 从队列中接收到信息就会执行的内容
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // TODO
                System.out.println("接收消息: "+new String(body));
            }
        };
        channel.basicConsume(Contants.DIRECT_QUEUE2,true,consumer);

        // 等待程序执行完成

        // 5.释放资源
//        channel.close();
//        connection.close();
    }
}
topic通配符模式

public class Producer {
    public static void main(String[] args) throws Exception {
        // 1.建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Contants.HOST);
        connectionFactory.setPort(Contants.PORT);
        connectionFactory.setUsername(Contants.USER_NAME);
        connectionFactory.setPassword(Contants.PASSWORD);
        connectionFactory.setVirtualHost(Contants.VIRTUAL_HOST);
        Connection connection = connectionFactory.newConnection();

        // 2.开启信道
        Channel channel = connection.createChannel();
        // 3.声明交换机
        channel.exchangeDeclare(Contants.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC,true);
        // 4.声明队列
        channel.queueDeclare(Contants.TOPIC_QUEUE1,true,false,false,null);
        channel.queueDeclare(Contants.TOPIC_QUEUE2,true,false,false,null);
        // 5.交换机和队列绑定
        channel.queueBind(Contants.TOPIC_QUEUE1,Contants.TOPIC_EXCHANGE,"*.a.*",null);
        channel.queueBind(Contants.TOPIC_QUEUE2,Contants.TOPIC_EXCHANGE,"*.*.b",null);
        channel.queueBind(Contants.TOPIC_QUEUE2,Contants.TOPIC_EXCHANGE,"c.#",null);
        // 6.发送消息
        String msg_a = "hello topic,my routingkey is ae.a.f";
        channel.basicPublish(Contants.TOPIC_EXCHANGE,"ae.a.f",null,msg_a.getBytes()); // 转发的Q1

        String msg_b = "hello topic,my routingkey is ef.a.b";
        channel.basicPublish(Contants.TOPIC_EXCHANGE,"ef.a.b",null,msg_b.getBytes()); // 转发到Q1和Q2

        String msg_c = "hello topic,my routingkey is c.ef.d";
        channel.basicPublish(Contants.TOPIC_EXCHANGE,"c.ef.d",null,msg_c.getBytes()); // 转发到Q2

        System.out.println("消息发送成功");
        // 7.释放资源
        channel.close();
        connection.close();
    }
}


public class Consumer1 {
    public static void main(String[] args) throws Exception {
        // 1.建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Contants.HOST);
        connectionFactory.setPort(Contants.PORT);
        connectionFactory.setUsername(Contants.USER_NAME);
        connectionFactory.setPassword(Contants.PASSWORD);
        connectionFactory.setVirtualHost(Contants.VIRTUAL_HOST);
        Connection connection = connectionFactory.newConnection();

        // 2.开启信道
        Channel channel = connection.createChannel();
        // 3.声明队列(使用内置交换机)
        channel.queueDeclare(Contants.TOPIC_QUEUE1,true,false,false,null);// 已存在则不会创建,不存在则会创建,
        // 不声明若队列不存在,则会报错,影响生产环境
        // 4.消费消息
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 从队列中接收到信息就会执行的内容
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // TODO
                System.out.println("接收消息: "+new String(body));
            }
        };
        channel.basicConsume(Contants.TOPIC_QUEUE1,true,consumer);

        // 等待程序执行完成

        // 5.释放资源
//        channel.close();
//        connection.close();
    }
}


public class Consumer2 {
    public static void main(String[] args) throws Exception {
        // 1.建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Contants.HOST);
        connectionFactory.setPort(Contants.PORT);
        connectionFactory.setUsername(Contants.USER_NAME);
        connectionFactory.setPassword(Contants.PASSWORD);
        connectionFactory.setVirtualHost(Contants.VIRTUAL_HOST);
        Connection connection = connectionFactory.newConnection();

        // 2.开启信道
        Channel channel = connection.createChannel();
        // 3.声明队列(使用内置交换机)
        channel.queueDeclare(Contants.TOPIC_QUEUE2,true,false,false,null);// 已存在则不会创建,不存在则会创建,
        // 不声明若队列不存在,则会报错,影响生产环境
        // 4.消费消息
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 从队列中接收到信息就会执行的内容
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // TODO
                System.out.println("接收消息: "+new String(body));
            }
        };
        channel.basicConsume(Contants.TOPIC_QUEUE2,true,consumer);

        // 等待程序执行完成

        // 5.释放资源
//        channel.close();
//        connection.close();
    }
}
RPC通信

public class RpcClient {
    public static void main(String[] args) throws Exception{
        // 1.建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Contants.HOST);
        connectionFactory.setPort(Contants.PORT);
        connectionFactory.setUsername(Contants.USER_NAME);
        connectionFactory.setPassword(Contants.PASSWORD);
        connectionFactory.setVirtualHost(Contants.VIRTUAL_HOST);
        Connection connection = connectionFactory.newConnection();

        // 2.开启信道
        Channel channel = connection.createChannel();
        channel.queueDeclare(Contants.RPC_REQUEST_QUEUE,true,false,false,null);
        channel.queueDeclare(Contants.RPC_RESPONSE_QUEUE,true,false,false,null);
        // 3.发送请求
        // 设置请求的相关属性
        String msg = "hello rpc";
        String correlationId = UUID.randomUUID().toString();
        AMQP.BasicProperties props = new AMQP.BasicProperties().builder()
                .correlationId(correlationId) // 设置请求的唯一标识
                .replyTo(Contants.RPC_RESPONSE_QUEUE) // 告诉服务端(Server)完成响应后将结果放在 Contants.RPC_RESPONSE_QUEUE 中
                .build();
        channel.basicPublish("",Contants.RPC_REQUEST_QUEUE,props,msg.getBytes());

        // 4.接受响应
        // 使用阻塞队列,存储响应信息
        final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String respMsg = new String(body);
                System.out.println("接收到回调消息: " + respMsg);
                if (correlationId.equals(properties.getCorrelationId())){
                    // 如果correlationId一致,校验成功
                    response.offer(respMsg);
                }
            }
        };
        channel.basicConsume(Contants.RPC_RESPONSE_QUEUE,true,consumer); // 客户端的消费队列是 Contants.RPC_RESPONSE_QUEUE
        String result = response.take();
        System.out.println("RPC Client 响应结果: "+result);

    }
}


public class RpcServer {
    public static void main(String[] args) throws Exception{
        // 1.建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Contants.HOST);
        connectionFactory.setPort(Contants.PORT);
        connectionFactory.setUsername(Contants.USER_NAME);
        connectionFactory.setPassword(Contants.PASSWORD);
        connectionFactory.setVirtualHost(Contants.VIRTUAL_HOST);
        Connection connection = connectionFactory.newConnection();

        // 2.开启信道
        Channel channel = connection.createChannel();

        // 3.接受请求
        channel.basicQos(1); // 一次只能接收一条信息
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String request = new String(body,"UTF-8");
                System.out.println("接收到请求: "+request);
                String response = "针对request: "+request+" 响应成功";
                AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder()
                        .correlationId(properties.getCorrelationId())
                        .build();
                channel.basicPublish("",Contants.RPC_RESPONSE_QUEUE,basicProperties,response.getBytes());
                // getDeliveryTag:开启手动确认会生成,明确确认的信息是哪一条  false -> 不设置批量确认
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        channel.basicConsume(Contants.RPC_REQUEST_QUEUE,false,consumer);
    }
}
发布确认模式

public class PublisherConfirms {
    private static final Integer MESSAGE_COUNT = 10000;
    static Connection createConnection() throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
        connectionFactory.setUsername(Constants.USER_NAME);//账号
        connectionFactory.setPassword(Constants.PASSWORD);  //密码
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
        return connectionFactory.newConnection();
    }

    public static void main(String[] args) throws Exception {
        //Strategy #1: Publishing Messages Individually
        //单独确认
//        publishingMessagesIndividually();
        //Strategy #2: Publishing Messages in Batches
        //批量确认
        publishingMessagesInBatches();
        
        //Strategy #3: Handling Publisher Confirms Asynchronously
        //异步确认
        handlingPublisherConfirmsAsynchronously();
        
    }

    /**
     * 异步确认
     */
    private static void handlingPublisherConfirmsAsynchronously() throws Exception{
        try (Connection connection = createConnection()){
            //1. 开启信道
            Channel channel = connection.createChannel();
            //2. 设置信道为confirm模式
            channel.confirmSelect();
            //3. 声明队列
            channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE3, true, false, false, null);
            //4. 监听confirm
            //集合中存储的是未确认的消息ID
            long start = System.currentTimeMillis();
            SortedSet<Long> confirmSeqNo = Collections.synchronizedSortedSet(new TreeSet<>());

            channel.addConfirmListener(new ConfirmListener() {
                @Override
                public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                    if (multiple){
                        confirmSeqNo.headSet(deliveryTag+1).clear();
                    }else {
                        confirmSeqNo.remove(deliveryTag);
                    }
                }

                @Override
                public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                    if (multiple){
                        confirmSeqNo.headSet(deliveryTag+1).clear();
                    }else {
                        confirmSeqNo.remove(deliveryTag);
                    }
                    //业务需要根据实际场景进行处理, 比如重发, 此处代码省略
                }
            });
            //5. 发送消息
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String msg = "hello publisher confirms"+i;
                long seqNo = channel.getNextPublishSeqNo();
                channel.basicPublish("",Constants.PUBLISHER_CONFIRMS_QUEUE3, null, msg.getBytes());
                confirmSeqNo.add(seqNo);
            }
            while (!confirmSeqNo.isEmpty()){
                Thread.sleep(10);
            }
            long end = System.currentTimeMillis();
            System.out.printf("异步确认策略, 消息条数: %d, 耗时: %d ms 
",MESSAGE_COUNT, end-start);
        }
    }

    /**
     * 批量确认
     * @throws Exception
     */
    private static void publishingMessagesInBatches() throws Exception{
        try(Connection connection = createConnection()) {
            //1. 开启信道
            Channel channel = connection.createChannel();
            //2. 设置信道为confirm模式
            channel.confirmSelect();
            //3. 声明队列
            channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE2, true, false, false, null);
            //4. 发送消息, 并进行确认
            long start = System.currentTimeMillis();
            int batchSize = 100;
            int outstandingMessageCount = 0;
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String msg = "hello publisher confirms"+i;
                channel.basicPublish("",Constants.PUBLISHER_CONFIRMS_QUEUE2, null, msg.getBytes());
                outstandingMessageCount++;
                if (outstandingMessageCount==batchSize){
                    channel.waitForConfirmsOrDie(5000);
                    outstandingMessageCount = 0;
                }
            }
            if (outstandingMessageCount>0){
                channel.waitForConfirmsOrDie(5000);
            }
            long end = System.currentTimeMillis();
            System.out.printf("批量确认策略, 消息条数: %d, 耗时: %d ms 
",MESSAGE_COUNT, end-start);

        }
    }

    /**
     * 单独确认
     */
    private static void publishingMessagesIndividually() throws Exception {
        try(Connection connection = createConnection()) {
            //1. 开启信道
            Channel channel = connection.createChannel();
            //2. 设置信道为confirm模式
            channel.confirmSelect();
            //3. 声明队列
            channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE1, true, false, false, null);
            //4. 发送消息, 并等待确认
            long start = System.currentTimeMillis();
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String msg = "hello publisher confirms"+i;
                channel.basicPublish("",Constants.PUBLISHER_CONFIRMS_QUEUE1, null, msg.getBytes());
                //等待确认
                channel.waitForConfirmsOrDie(5000);
            }
            long end = System.currentTimeMillis();
            System.out.printf("单独确认策略, 消息条数: %d, 耗时: %d ms 
",MESSAGE_COUNT, end-start);
        }
    }
}

四.RabbitMQ消息可靠性保证

1. 生产者问题.因为应用程序故障,网络抖动等各种原因,生产者没有成功向broker发送消息.

2. 消息中间件自身问题.生产者成功发送给了Broker,但是Broker没有把消息保存好,导致消息丢失.

3. 消费者问题.Broker发送消息到消费者,消费者在消费消息时,因为没有处理好,导致broker将消费 失败的消息从队列中删除了.

针对 Producer -> Broker ,我们可以采用发送方确认机制实现.

  1. Producer -> Exchange confirm模式

  2. Exchang -> Queue return模式

  3. 队列设置长度: 死信

针对 Broker ,交换机,队列,消息持久化.

针对 Broker -> Consumer 消费者确认

  1. 自动确认,消息到达消费者之后,如果消费者有逻辑错误,处理失败,此时消息已经从队列中删除,会导致消息丢失.

2) 手动确认,提高消息可靠性.

3) 重试机制,当消息消费失败可以重复消费来确保消息可靠性,不过要设置最大重试次数避免CPU资源的浪费.

五.RabbitMQ保证消息顺序性

消息顺序性保障分为:局部顺序性保证和全局顺序性保证. 局部顺序性通常指的是在单个队列内部保证消息的顺序.全局顺序性是指在多个队列或多个消费者之间保证消息的顺序.

在实际应用中,全局顺序性很难实现,可以考虑使用业务逻辑来保证顺序性,比如在消息中嵌入全局唯一序列号, 并在消费端进行排序处理.比如在购买商品的时候商品有唯一的可以排序订单号,我们可以通过查看该订单号的前一条和后一条是否已经被消费,再考虑是否消费该信息.(业务逻辑控制)

也可以考虑单队列,单消费者,不过全局顺序性这种情况下效率太低,我们再生产环境中要尽量避免.

相对而言, 局部顺序性更常见,也更容易实现.单个消费者的吞吐太低了,当需要多个消费者以提高处理速度时,可以使用分区消费.把个队列分割成 多个分区,每个分区由个消费者处理,以此来保持每个分区内消息的顺序性.(分区消费)

还可以考虑采用消息确认机制,消费者在处理完条消息后,显式地发送确认,这样RabbitMQ才会移除并继续发送下一条消息.

六.RabbitMQ消息积压问题

消息积压是指在消息队列(如RabbitMQ)中,待处理的消息数量超过了消费者处理能力,导致消息在队列中不断堆积的现象.

常见的消息积压原因

**1. 消息生产过快.**在高流量或者高负载的情况下,生产者以极高的速率发送消息,超过了消费者的处理 能力.

**2.消费者能力不足.**消费者处理处理消息的速度跟不上消息生产的速度,也会导致消息在队列中积压.比如消费端业务逻辑复杂,耗时长,消费端性能低,系统资源限制,异常处理不当等

**3.网络问题.**出现网络延迟或者网络抖动,消费者无法及时或确认消息,最终导致消息积压.

4.RabbitMQ服务配置过低.

解决方案

1. 提高消费者效率.

a. 增加消费者实例数量,比如新增机器.

b. 优化业务逻辑,比如使用多线程来处理业务.

c. 设置prefetchCount,当个消费者阻塞时,消息转发到其他未阻塞的消费者.

d. 消息发生异常时,设置合适的重试策略,或者转入到死信队列.

2. 限制生产者速率:流量控制,限流算法.

a. 流量控制:在消息生产者中实现流量控制逻辑,根据消费者处理能力动态调整发送速率.

b. 限流:使用限流工具,为消息发送速率设置个上限.

c. 设置过期时间.如果消息过期未消费,可以配置死信队列,以避免消息丢失,并减少对主队列的压力.

3. 资源与配置优化

升级RabbitMQ服务器硬件,调整配置参数等.

怎么实现work模式能者多劳

在 Work 模式中,默认情况下 RabbitMQ 采用轮询(round-robin)的方式将消息依次分发给各个消费者,即每个消费者会轮流接收到消息。但这种方式可能会导致性能不均衡,能力强的消费者可能会闲置,而能力弱的消费者可能会堆积大量任务。为了实现能者多劳,可以通过设置消费者的预取计数(prefetch count)来实现。

七.如何保证消息消费时的幂等性

幂等性概念

在应用程序中,幂等性就是指对个系统进行重复调用(相同参数),不论请求多少次,这些请求对系统的影响都是相同的效果.

比如数据库的 select 操作.不同时间两次查询的结果可能不同,但是这个操作是符合幂等性的.幂等 性指的是对资源的影响,而不是返回结果.查询操作对数据资源本身不会产生影响,之所以结果不同,可 能是因为两次查询之间有其他操作对资源进行了修改.

对于MQ而言,幂等性是指同条消息,多次消费,对系统的影响是相同的.

一般消息中间件的消息传输保障分为三个层级.

1. Atmostonce:最多次.消息可能会丢失,但绝不会重复传输.

2. Atleast once:最少次.消息绝不会丢失,但可能会重复传输.

3. Exactly once:恰好次.每条消息肯定会被传输次且仅传输次. RabbitMQ支持"最多一次"和"最少一次".

解决方案

MQ消费者的幂等性解决方案,一般有以下几种:

全局唯一ID

1. 为每条消息分配个唯一标识符,比如UUID或者MQ消息中的唯一ID,但是定要保证唯一性.

2. 消费者收到消息后,先用该id判断该消息是否已经消费过,如果已经消费过,则放弃处理.

3. 如果未消费过,消费者开始消费消息,业务处理成功后,把唯一ID保存起来(数据库或Redis等)

业务逻辑判断

在业务逻辑层面实现消息处理的幂等性. 例如:通过检查数据库中是否已存在相关数据记录,或者使用乐观锁机制来避免更新已被其他事务更改的数据,再或者在处理消息之前,先检查相关业务的状态,确保消息对应的操作尚未执行,然后才进行处 理,具体根据业务场景来处理.

八.RabbitMQ的死信队列

死信队列概念

**死信(deadmessage) 简单理解就是因为种种原因,无法被消费的信息,就是死信. 有死信,自然就有死信队列.**当消息在个队列中变成死信之后,它能被重新被发送到另个交换器中,这个交换器就是DLX( Dead Letter Exchange ),绑定DLX的队列,就称为死信队列(Dead Letter Queue,简称DLQ).

消息变死信场景

消息变成死信般是由于以下几种情况:

1. 消息被拒绝( Basic.Reject/Basic.Nack ),并且设置 requeue 参数为 false (放弃重试)

2. 消息过期.

3. 队列达到最大长度.

九.介绍下RabbitMQ的延迟队列

延迟队列(DelayedQueue),即消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后, 消费者才能拿到这个消息进行消费.

应用场景

1. 智能家居:用户希望通过手机远程遥控家里的智能设备在指定的时间进行工作.这时候就可以将用户指令发送到延迟队列,当指令设定的时间到了再将指令推送到智能设备.

2. 日常管理:预定会议后,需要在会议开始前十五分钟提醒参会人参加会议.

3. 用户注册成功后,7天后发送短信,提高用户活跃度等.

延迟队列实现方式

1. 使用死信队列达到延迟队列的效果.

2. 使用RabbitMQ官方提供的延迟队列插件.

1. 基于死信实现的延迟队列.

a. 优点: 1) 灵活不需要额外的插件支持.

b. 缺点: 1) 存在消息顺序问题.

  1. 需要额外的逻辑来处理死信队列的消息,增加了系统的复杂性.

2. 基于插件实现的延迟队列.

a. 优点: 1) 通过插件可以直接创建延迟队列,简化延迟消息的实现.

  1. 避免了DLX的时序问题.(使用死信作为延迟队列,消息一30s过期,消息二10s过期,消息一在消息二前面,此时消息一过期了,后面的消息二也跟着过期了).

b. 缺点: 1) 需要依赖特定的插件,有运维工作.

  1. 只适特定版本.

十.推拉模式

RabbitMQ支持两种消息传递模式:推模式(push)和拉模式(pull).

推拉模式概念

推模式:消息中间件主动将消息推送给消费者.

拉模式:消费者主动从消息中间件拉取消息.

RabbitMQ主要是基于推模式工作的,它的核心设计是让消息队列中的消费者接收到由生产者发送的消 息.使用channel.basicConsume方法订阅队列,RabbitMQ就会把消息推送到订阅该队列的消费者,如 果只想从队列中获取单条消息而不是持续订阅,则可以使channel.basicGet方法来进行消费消息.

推拉模式实现

public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1. 建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
        connectionFactory.setUsername(Constants.USER_NAME);//账号
        connectionFactory.setPassword(Constants.PASSWORD);  //密码
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
        Connection connection = connectionFactory.newConnection();
        //2. 开启信道
        Channel channel = connection.createChannel();
        //3. 声明队列   使用内置的交换机
        //如果队列不存在, 则创建, 如果队列存在, 则不创建
        channel.queueDeclare(Constants.MESSAGE_QUEUE, true, false, false, null);
        //4. 发送消息
        for (int i = 0; i < 10; i++) {
            String msg = "hello message queue...."+i;
            channel.basicPublish("",Constants.MESSAGE_QUEUE, null, msg.getBytes());
        }

        System.out.println("消息发送成功~");
        //6. 资源释放
        channel.close();
        connection.close();
    }
}


public class PullConsumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1. 建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
        connectionFactory.setUsername(Constants.USER_NAME);//账号
        connectionFactory.setPassword(Constants.PASSWORD);  //密码
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
        Connection connection = connectionFactory.newConnection();
        //2. 开启信道
        Channel channel = connection.createChannel();
        //3. 声明队列   使用内置的交换机
        //如果队列不存在, 则创建, 如果队列存在, 则不创建
        channel.queueDeclare(Constants.MESSAGE_QUEUE, true, false, false, null);
        //4. pull 模式获取消息
        GetResponse getResponse = channel.basicGet(Constants.MESSAGE_QUEUE, true);
        System.out.println("拉模式获取消息: "+ new String(getResponse.getBody(),"UTF-8"));
        //6. 资源释放
//        channel.close();
//        connection.close();
    }
}


public class PushConsumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1. 建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
        connectionFactory.setUsername(Constants.USER_NAME);//账号
        connectionFactory.setPassword(Constants.PASSWORD);  //密码
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
        Connection connection = connectionFactory.newConnection();
        //2. 开启信道
        Channel channel = connection.createChannel();
        //3. 声明队列   使用内置的交换机
        //如果队列不存在, 则创建, 如果队列存在, 则不创建
        channel.queueDeclare(Constants.MESSAGE_QUEUE, true, false, false, null);
        //4. 消费消息
        DefaultConsumer consumer = new DefaultConsumer(channel){
            //从队列中收到消息, 就会执行的方法
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //TODO
                System.out.println("接收到消息:"+ new String(body));
            }
        };
        channel.basicConsume(Constants.MESSAGE_QUEUE, true, consumer);

//        //6. 资源释放
//        channel.close();
//        connection.close();
    }
}


http://www.kler.cn/a/536659.html

相关文章:

  • p5r预告信生成器API
  • 1.31-子序列问题
  • 485网关数据收发测试
  • AI驱动的智能流程自动化是什么
  • 无人机方位感知器官磁力传感器!
  • intra-mart实现简易登录页面笔记
  • easyxor
  • 赛博算命之 ”梅花易数“ 的 “JAVA“ 实现 ——从玄学到科学的探索
  • element-ui rate 组件源码分享
  • zsh: command not found: pip
  • Android Studio:键值对存储sharedPreferences
  • unity视频在场景中的使用
  • APP广告变现如何优化广告填充率,提升变现收益?
  • Mysql知识梳理(数据库的锁梳理,Mysql优化)
  • 回溯算法中关于剪枝的一些应用
  • PHP PDO 教程
  • 【Elasticsearch】date range聚合
  • 安卓/ios脚本开发按键精灵经验小分享
  • .net一些知识点5
  • VMware下Linux和macOS安装VSCode一些总结
  • 2025 IT职业发展方向及推荐
  • 基于SpringBoot养老院平台系统功能实现六
  • 【SpringBoot篇】基于Redis分布式锁的 误删问题 和 原子性问题
  • log4j2日志配置文件
  • DeepSeek 引领的 AI 范式转变与存储架构的演进
  • pring MVC 中的 `@RequestParam` 注解