当前位置: 首页 > article >正文

java每日精进 2.24 【MQ实际应用场景】

1.可靠的消息投递方案

一、可靠消息投递的核心目标

  1. 消息不丢失:

    • 确保消息从生产者到RabbitMQ,再到消费者的整个链路中不会丢失。

  2. 消息不重复:

    • 避免消息因网络问题或重试机制导致重复投递。

  3. 消息顺序性:

    • 在需要保证顺序的场景下,确保消息按顺序投递。

二、可靠消息投递方案设计

1. 生产者端的可靠性

生产者在发送消息时,需要确保消息成功到达RabbitMQ。如果消息发送失败,需要进行重试。

实现步骤:
  1. 开启生产者确认模式(Publisher Confirm):

    • RabbitMQ提供了生产者确认机制,生产者发送消息后,RabbitMQ会返回一个确认(ack)或拒绝(nack)。

    • 配置生产者确认模式:

      ConnectionFactory factory = new ConnectionFactory();
      factory.setHost("localhost");
      Connection connection = factory.newConnection();
      Channel channel = connection.createChannel();
      channel.confirmSelect(); // 开启确认模式
  2. 异步监听确认结果:

    • 通过异步监听确认结果,确保消息成功发送。

      channel.addConfirmListener(new ConfirmListener() {
          @Override
          public void handleAck(long deliveryTag, boolean multiple) {
              System.out.println("消息已确认,deliveryTag: " + deliveryTag);
          }
      
          @Override
          public void handleNack(long deliveryTag, boolean multiple) {
              System.out.println("消息未确认,deliveryTag: " + deliveryTag);
              // 重试发送消息
          }
      });
  3. 消息持久化:

    • 发送消息时,将消息标记为持久化,确保RabbitMQ重启后消息不会丢失。

      AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
          .deliveryMode(2) // 2表示持久化消息
          .build();
      channel.basicPublish("exchange", "routingKey", properties, message.getBytes());
  4. 生产者重试机制:

    • 如果消息发送失败(未收到ack),生产者需要进行重试。

    • 可以使用Spring Retry等工具实现重试逻辑。


2. RabbitMQ端的可靠性

RabbitMQ需要确保消息在队列中不会丢失,并且在消费者处理失败时能够重新投递。

实现步骤:
  1. 队列持久化:

    • 创建队列时,将队列标记为持久化。

      boolean durable = true;
      channel.queueDeclare("queueName", durable, false, false, null);
  2. 交换机持久化:

    • 创建交换机时,将交换机标记为持久化。

      boolean durable = true;
      channel.exchangeDeclare("exchangeName", "direct", durable);
  3. 消息持久化:

    • 发送消息时,将消息标记为持久化(见生产者端实现)。

  4. 死信队列(DLX):

    • 如果消息在队列中无法被消费者正确处理,可以将其路由到死信队列,避免消息丢失。

      Map<String, Object> args = new HashMap<>();
      args.put("x-dead-letter-exchange", "dlxExchange");
      args.put("x-dead-letter-routing-key", "dlxRoutingKey");
      channel.queueDeclare("queueName", true, false, false, args);

3. 消费者端的可靠性

消费者需要确保消息被正确处理,并且在处理失败时能够重新投递。

实现步骤:
  1. 手动确认模式(Manual Ack):

    • 消费者在处理完消息后,手动向RabbitMQ发送确认(ack)。如果处理失败,则不发送ack,RabbitMQ会将消息重新投递。

      boolean autoAck = false; // 关闭自动确认
      channel.basicConsume("queueName", autoAck, new DefaultConsumer(channel) {
          @Override
          public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
              try {
                  // 处理消息
                  processMessage(body);
                  // 手动确认
                  channel.basicAck(envelope.getDeliveryTag(), false);
              } catch (Exception e) {
                  // 处理失败,拒绝消息并重新入队
                  channel.basicNack(envelope.getDeliveryTag(), false, true);
              }
          }
      });
  2. 消费者重试机制:

    • 如果消息处理失败,消费者可以进行重试。可以在消费者端实现重试逻辑,或者依赖RabbitMQ的重试机制。

  3. 幂等性设计:

    • 消费者处理消息时需要保证幂等性,避免重复消费导致的数据不一致。

    • 可以通过唯一ID、数据库唯一约束等方式实现幂等性。


4. 消息可靠性增强

  1. 事务机制:

    • RabbitMQ支持事务机制,但性能较差,通常不推荐使用。

      channel.txSelect(); // 开启事务
      try {
          channel.basicPublish("exchange", "routingKey", properties, message.getBytes());
          channel.txCommit(); // 提交事务
      } catch (Exception e) {
          channel.txRollback(); // 回滚事务
      }
  2. 消息补偿机制:

    • 如果消息最终无法投递或处理,可以通过日志记录、告警等方式进行人工干预。


三、完整流程示例

  1. 生产者发送消息:

    • 开启生产者确认模式,发送持久化消息。

    • 如果未收到ack,进行重试。

  2. RabbitMQ存储消息:

    • 将消息存储在持久化队列中。

    • 如果消息无法投递,路由到死信队列。

  3. 消费者处理消息:

    • 使用手动确认模式,确保消息被正确处理。

    • 如果处理失败,拒绝消息并重新入队。

2.代码示例

一、生产者发送消息

1.代码实现:

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class OrderProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendOrderMessage(String orderId) {
        // 设置消息属性(持久化)
        MessageProperties properties = new MessageProperties();
        properties.setDeliveryMode(MessageDeliveryMode.PERSISTENT); // 持久化消息
        Message message = new Message(("订单消息:订单ID=" + orderId).getBytes(), properties);

        // 设置消息ID
        CorrelationData correlationData = new CorrelationData(orderId);

        // 发送消息
        rabbitTemplate.convertAndSend("order_exchange", "order_routing_key", message, correlationData);

        // 监听确认回调
        rabbitTemplate.setConfirmCallback((correlationData1, ack, cause) -> {
            if (ack) {
                System.out.println("消息已确认,订单ID: " + correlationData1.getId());
            } else {
                System.out.println("消息未确认,订单ID: " + correlationData1.getId() + ",原因: " + cause);
                // 重试发送消息
                retrySendMessage(orderId);
            }
        });
    }

    private void retrySendMessage(String orderId) {
        // 重试逻辑
        System.out.println("重试发送消息,订单ID: " + orderId);
        sendOrderMessage(orderId);
    }
}

2.注

  • MessageProperties:代表消息的属性,这些属性决定消息的行为和存储方式。
  • setDeliveryMode(MessageDeliveryMode.PERSISTENT):设置消息为持久化,即使 RabbitMQ 宕机,消息也不会丢失。持久化的消息会保存在磁盘中。
  • 创建一个消息对象 (Message),消息内容是一个字符串 订单消息:订单ID=xxx,并将其转换为字节数组(getBytes())。
  • 该消息还附带了我们之前设置的属性 (properties),比如消息的持久化属性。
  • CorrelationData:这是一个用于关联发送的消息与回调(确认)信息的对象。在此例中,它使用订单ID作为消息的唯一标识符。这样在回调中就能知道是哪个订单的消息。
  • convertAndSend 方法用于将消息发送到指定的交换机、路由键和消息。它的参数如下:
    • "order_exchange":目标交换机的名称。
    • "order_routing_key":路由键,用来决定消息应该被发送到哪个队列。
    • message:要发送的消息。
    • correlationData:消息的唯一标识符,用于确认回调。

3.说明

  1. 消息持久化:

    • 通过 MessageProperties 设置消息为持久化。

  2. 生产者确认模式:

    • 使用 setConfirmCallback 监听消息确认结果。

  3. 重试机制:

    • 如果消息未确认(ack=false),触发重试逻辑。

二、RabbitMQ 配置

1.代码实现:

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitMQConfig {

    // 订单交换机
    @Bean
    public Exchange orderExchange() {
        return new DirectExchange("order_exchange", true, false); // 持久化交换机
    }

    // 订单队列
    @Bean
    public Queue orderQueue() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange", "dlx_exchange"); // 死信交换机
        args.put("x-dead-letter-routing-key", "dlx_routing_key"); // 死信路由键
        return new Queue("order_queue", true, false, false, args); // 持久化队列
    }

    // 绑定订单队列和交换机
    @Bean
    public Binding orderBinding() {
        return BindingBuilder.bind(orderQueue()).to(orderExchange()).with("order_routing_key").noargs();
    }

    // 死信交换机
    @Bean
    public Exchange dlxExchange() {
        return new DirectExchange("dlx_exchange", true, false); // 持久化死信交换机
    }

    // 死信队列
    @Bean
    public Queue dlxQueue() {
        return new Queue("dlx_queue", true); // 持久化死信队列
    }

    // 绑定死信队列和死信交换机
    @Bean
    public Binding dlxBinding() {
        return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with("dlx_routing_key").noargs();
    }
}

2.注

  • 订单队列 (order_queue) 用于处理正常的消息,若消息无法消费或有问题,它会被转发到 死信交换机 (dlx_exchange)
  • 死信队列 (dlx_queue) 用于存储那些无法正常消费的消息(死信)。
  • orderQueue 和 dlxQueue 都是持久化队列,保证消息不会丢失。
  • orderExchange 和 dlxExchange 都是持久化交换机,确保交换机本身也不会丢失。

3.说明

  1. 持久化交换机和队列:

    • 交换机和队列都设置为持久化。

  2. 死信队列配置:

    • 如果消息无法被正确处理,会被路由到死信队列。

三、消费者处理消息

1.代码实现:

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class OrderConsumer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RabbitListener(queues = "order_queue")
    public void handleOrderMessage(Message message) {
        try {
            // 处理消息
            String orderMessage = new String(message.getBody());
            processOrder(orderMessage);

            // 手动确认消息
            rabbitTemplate.execute(channel -> {
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                return null;
            });
            System.out.println("消息处理成功:" + orderMessage);
        } catch (Exception e) {
            // 处理失败,拒绝消息并重新入队
            rabbitTemplate.execute(channel -> {
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
                return null;
            });
            System.out.println("消息处理失败,重新入队:" + new String(message.getBody()));
        }
    }

    private void processOrder(String orderMessage) throws Exception {
        // 模拟订单处理逻辑
        System.out.println("处理订单:" + orderMessage);
        if (orderMessage.contains("失败")) {
            throw new Exception("订单处理失败");
        }
    }
}

2.备注

  • basicAck 方法确认消息已经被成功处理并从队列中移除。deliveryTag 是消息的唯一标识符,false 表示消息仅对当前消费者确认。
  • basicNack 方法用于拒绝消息并将其重新入队。deliveryTag 标识消息,false 表示该消息不需要重新路由,true 表示消息会被重新入队供后续消费者继续消费。
  • processOrder 方法模拟了一个订单的处理过程。
  • 如果订单消息包含 "失败" 字符串,就抛出一个异常,表示订单处理失败。这会触发异常处理逻辑,将消息重新入队。
  • channel.basicNack:这是用于拒绝当前消息的操作。它有三个参数:
    • deliveryTag:消息的唯一标识符,可以通过 message.getMessageProperties().getDeliveryTag() 获取。
    • false:表示是否对多个消息进行批量确认。false 表示仅拒绝当前消息,而不是批量拒绝。
    • true:表示是否将消息重新入队。如果设为 true,消息会被重新投递到队列中,供后续消费者再次处理。

3.说明

  1. 手动确认模式:

    • 使用 basicAck 确认消息,使用 basicNack 拒绝消息并重新入队。

  2. 幂等性设计:

    • 在 processOrder 方法中实现幂等性逻辑。

四、死信队列处理

代码实现:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

@Component
public class RabbitMQTest implements CommandLineRunner {

    @Autowired
    private OrderProducer orderProducer;

    @Override
    public void run(String... args) throws Exception {
        // 发送正常消息
        orderProducer.sendOrderMessage("123");

        // 发送失败消息(模拟处理失败)
        orderProducer.sendOrderMessage("456失败");
    }
}

说明:

  1. 死信队列消费者:

    • 监听死信队列,记录日志或触发告警。

五、全流程测试

测试代码:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

@Component
public class RabbitMQTest implements CommandLineRunner {

    @Autowired
    private OrderProducer orderProducer;

    @Override
    public void run(String... args) throws Exception {
        // 发送正常消息
        orderProducer.sendOrderMessage("123");

        // 发送失败消息(模拟处理失败)
        orderProducer.sendOrderMessage("456失败");
    }
}

测试结果:

  1. 正常消息:

    • 消息被成功处理,消费者发送 ack

  2. 失败消息:

    • 消息处理失败,消费者发送 nack 并重新入队。

    • 如果多次重试失败,消息最终进入死信队列。


六、总结

通过 RabbitMQTemplate,我们可以轻松实现 RabbitMQ 的可靠消息投递方案:

  1. 生产者端:

    • 使用 ConfirmCallback 确保消息成功发送。

  2. RabbitMQ 配置:

    • 配置持久化交换机和队列,以及死信队列。

  3. 消费者端:

    • 使用手动确认模式和幂等性设计,确保消息被正确处理。

  4. 死信队列:

    • 处理无法正常消费的消息,记录日志或人工干预。

3.优化

在 processOrder 方法中实现幂等性设计,可以确保即使同一条消息被多次消费,也不会对系统状态产生副作用。以下是几种常见的幂等性设计方法,并结合代码示例进行实现。

一、幂等性设计方法

1. 基于唯一标识的幂等性

通过消息的唯一标识(如订单ID)来确保同一条消息只被处理一次。

实现步骤:
  1. 在数据库中为订单ID添加唯一约束。

  2. 在处理消息时,先检查订单是否已处理。

代码示例:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class OrderService {

    @Autowired
    private OrderRepository orderRepository;

    @Transactional
    public void processOrder(String orderMessage) throws Exception {
        // 解析订单ID
        String orderId = extractOrderId(orderMessage);

        // 检查订单是否已处理
        if (orderRepository.existsById(orderId)) {
            System.out.println("订单已处理,跳过:" + orderId);
            return;
        }

        // 处理订单逻辑
        System.out.println("处理订单:" + orderId);
        if (orderMessage.contains("失败")) {
            throw new Exception("订单处理失败");
        }

        // 保存订单
        Order order = new Order();
        order.setId(orderId);
        order.setStatus("已处理");
        orderRepository.save(order);
    }

    private String extractOrderId(String orderMessage) {
        // 假设订单消息格式为 "订单消息:订单ID=123"
        return orderMessage.split("=")[1];
    }
}

2. 基于 Redis 的幂等性

使用 Redis 缓存记录已处理的消息ID,确保同一条消息只被处理一次。

实现步骤:
  1. 在处理消息前,检查 Redis 中是否存在该消息ID。

  2. 如果存在,则跳过处理;如果不存在,则处理消息并将消息ID存入 Redis。

代码示例:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;

import java.util.concurrent.TimeUnit;

@Service
public class OrderService {

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    public void processOrder(String orderMessage) throws Exception {
        // 解析订单ID
        String orderId = extractOrderId(orderMessage);

        // 检查 Redis 中是否存在该订单ID
        String key = "order:" + orderId;
        if (redisTemplate.opsForValue().setIfAbsent(key, "processed", 30, TimeUnit.MINUTES)) {
            // 处理订单逻辑
            System.out.println("处理订单:" + orderId);
            if (orderMessage.contains("失败")) {
                throw new Exception("订单处理失败");
            }
        } else {
            System.out.println("订单已处理,跳过:" + orderId);
        }
    }

    private String extractOrderId(String orderMessage) {
        // 假设订单消息格式为 "订单消息:订单ID=123"
        return orderMessage.split("=")[1];
    }
}

3. 基于数据库状态机的幂等性

通过状态机设计,确保订单状态只能从特定状态转移到下一状态。

实现步骤:
  1. 在订单表中添加状态字段。

  2. 处理消息时,检查订单状态是否符合预期。

代码示例:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class OrderService {

    @Autowired
    private OrderRepository orderRepository;

    @Transactional
    public void processOrder(String orderMessage) throws Exception {
        // 解析订单ID
        String orderId = extractOrderId(orderMessage);

        // 查询订单
        Order order = orderRepository.findById(orderId).orElseThrow(() -> new Exception("订单不存在"));

        // 检查订单状态
        if ("已处理".equals(order.getStatus())) {
            System.out.println("订单已处理,跳过:" + orderId);
            return;
        }

        // 处理订单逻辑
        System.out.println("处理订单:" + orderId);
        if (orderMessage.contains("失败")) {
            throw new Exception("订单处理失败");
        }

        // 更新订单状态
        order.setStatus("已处理");
        orderRepository.save(order);
    }

    private String extractOrderId(String orderMessage) {
        // 假设订单消息格式为 "订单消息:订单ID=123"
        return orderMessage.split("=")[1];
    }
}

二、整合到 processOrder 方法

以下是基于 Redis 的幂等性设计整合到 processOrder 方法的完整代码:

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;

import java.util.concurrent.TimeUnit;

@Service
public class OrderConsumer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    @RabbitListener(queues = "order_queue")
    public void handleOrderMessage(Message message) {
        try {
            // 处理消息
            String orderMessage = new String(message.getBody());
            processOrder(orderMessage);

            // 手动确认消息
            rabbitTemplate.execute(channel -> {
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                return null;
            });
            System.out.println("消息处理成功:" + orderMessage);
        } catch (Exception e) {
            // 处理失败,拒绝消息并重新入队
            rabbitTemplate.execute(channel -> {
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
                return null;
            });
            System.out.println("消息处理失败,重新入队:" + new String(message.getBody()));
        }
    }

    private void processOrder(String orderMessage) throws Exception {
        // 解析订单ID
        String orderId = extractOrderId(orderMessage);

        // 检查 Redis 中是否存在该订单ID
        String key = "order:" + orderId;
        if (redisTemplate.opsForValue().setIfAbsent(key, "processed", 30, TimeUnit.MINUTES)) {
            // 处理订单逻辑
            System.out.println("处理订单:" + orderId);
            if (orderMessage.contains("失败")) {
                throw new Exception("订单处理失败");
            }
        } else {
            System.out.println("订单已处理,跳过:" + orderId);
        }
    }

    private String extractOrderId(String orderMessage) {
        // 假设订单消息格式为 "订单消息:订单ID=123"
        return orderMessage.split("=")[1];
    }
}

三、总结

在 processOrder 方法中实现幂等性设计,可以有效避免消息重复消费带来的问题。常用的方法包括:

  1. 基于唯一标识的幂等性:

    • 使用数据库唯一约束或 Redis 缓存记录已处理的消息ID。

  2. 基于状态机的幂等性:

    • 通过状态机设计,确保订单状态只能从特定状态转移到下一状态。

在实际项目中,可以根据业务需求选择合适的方法,或结合多种方法实现更强大的幂等性保障。


http://www.kler.cn/a/564089.html

相关文章:

  • Vue 中的 computed 与 watch:深度剖析与实践应用
  • 本地大模型编程实战(22)用langchain实现基于SQL数据构建问答系统(1)
  • Hot100 动态规划
  • 应用的负载均衡
  • HBuilderx 插件开发变量名称翻译 ,中文转(小驼峰,大驼峰,下划线,常量,CSS类名)
  • IP---网络类型
  • 【DeepSeek】【GPT-Academic】:DeepSeek集成到GPT-Academic(官方+第三方)
  • DDNS-GO 动态域名解析
  • 前端网页或者pwa如何实现只横屏显示,设备竖着的时候依然保持横屏
  • kiln微调大模型-使用deepseek R1去训练一个你的具备推理能力的chatGPT 4o
  • seacmsv9报错注入
  • 基于MATLAB红外弱小目标检测MPCM算法复现
  • nginx 配置https
  • 归并排序:分而治之的排序之道
  • Mac安装双系统教程
  • 如果更换ip地址会怎么样?网络ip地址怎么更换
  • Elasticsearch索引设计与分片策略深度优化-手记
  • 2025-02-26 学习记录--C/C++-C语言 判断字符串S2是否在字符串S1中
  • docker file中ADD命令的介绍
  • mysql-analyze table导致waiting for table flush