当前位置: 首页 > article >正文

RocketMQ高级特性二-消息重试与流控

目录

前言

消息发送重试和流控机制

定义

使用场景

原理机制

消息发送重试

重试策略

流控机制

优缺点

Java 代码示例

消息发送重试

使用RocketMQ内置重试机制:

使用RocketMQ指数退避策略

实现自定义重试策略(指数退避):

流控机制

总结


前言

本章将基于RocketMQ的高级特性消费者负载均衡、消费进度管理、消费重试、消息存储和清理机制四大方面进行讲解。注:部分图片及信息来源于Apache

消息发送重试和流控机制

定义
  • 消息发送重试:当消息发送失败时,客户端(Producer)会自动尝试重新发送消息,以确保消息最终能够成功送达Broker。
  • 流控机制:在高并发情况下,RocketMQ通过限制消息发送速率和并发量,防止系统过载,确保系统的稳定性和可靠性。
使用场景
  • 消息发送重试
    • 网络不稳定或临时故障导致消息发送失败时,确保消息不丢失。
    • Broker临时不可用或负载过高时,自动重试发送消息以提高消息传递的成功率。
  • 流控机制
    • 高并发环境下,防止消息发送速率过快导致Broker或网络资源耗尽。
    • 保护系统在负载高峰期仍能稳定运行,避免因资源不足引发系统故障。
原理机制
消息发送重试

RocketMQ的消息发送重试机制主要由生产者(Producer)端实现。当发送消息失败时,Producer会根据配置的重试次数自动重试发送消息。

内置重试机制

  • retryTimesWhenSendFailed:配置生产者在发送失败时的重试次数,默认为2次。
  • retryAnotherBrokerWhenNotStoreOK:当发送消息到指定Broker失败时,是否尝试发送到其他Broker。
  • 异步发送失败的回调:在异步发送模式下,可以通过回调函数处理发送失败的情况。

重试策略

RocketMQ的内置重试机制主要是同步地进行有限次数的重试,通常是固定间隔重试。RocketMQ并不直接支持复杂的重试策略(如指数退避等),但用户可以通过自定义实现来扩展重试逻辑。

重试策略

为了提高消息发送的可靠性和效率,用户可以根据需求自定义不同的重试策略。常见的重试策略包括:

  1. 固定间隔重试:每次重试之间保持固定的时间间隔。
  2. 指数退避重试:每次重试的间隔时间按指数级增长,逐渐增加等待时间,减少系统负载。
  3. 抖动重试:在重试间隔中添加随机性,防止大规模的并发重试请求导致的系统压力。
  4. 条件重试:根据失败原因决定是否重试,例如只对网络异常或临时故障进行重试,而对逻辑错误则不重试。
流控机制

RocketMQ的流控机制主要通过以下方式实现:

  • 信号量控制:使用信号量限制并发发送请求的数量,当并发请求数达到上限时,生产者会阻塞或抛出异常。
  • 限速:限制消息发送的速率,确保在高负载情况下不会超过系统的承载能力。
优缺点
  • 消息发送重试

    • 优点
      • 提高消息发送的可靠性,确保消息能够最终送达Broker。
      • 处理临时的网络或Broker故障,减少消息丢失的可能性。
    • 缺点
      • 重试次数过多可能导致消息顺序混乱,尤其是在顺序消息场景下。
      • 增加网络负担,特别是在高重试次数配置下,可能导致系统资源消耗增加。
  • 流控机制

    • 优点
      • 保护Broker和网络资源,防止过载,确保系统稳定性。
      • 通过控制发送速率,避免在高负载情况下发生系统崩溃。
    • 缺点
      • 流控可能导致消息发送延迟增加,影响消息的实时性。
      • 需要根据系统负载合理配置流控参数,调优难度较大。
Java 代码示例
消息发送重试
使用RocketMQ内置重试机制

以下示例展示了如何配置RocketMQ生产者的内置重试机制,包括设置重试次数和是否在失败时切换Broker。

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class RocketMQProducerRetryExample {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        // 创建生产者实例并设置组名
        DefaultMQProducer producer = new DefaultMQProducer("producer_group");
        producer.setNamesrvAddr("localhost:9876");
        
        // 配置重试次数
        producer.setRetryTimesWhenSendFailed(3);  // 设置重试次数为3次
        producer.setRetryAnotherBrokerWhenNotStoreOK(true); // 发送失败时是否重试到其他Broker
        
        // 启动生产者
        producer.start();
        
        // 创建消息实例
        Message msg = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes());
        
        try {
            // 发送消息
            SendResult sendResult = producer.send(msg);
            System.out.printf("Send Result: %s%n", sendResult);
        } catch (Exception e) {
            // 处理发送失败的情况
            e.printStackTrace();
        }
        
        // 关闭生产者
        producer.shutdown();
    }
}
使用RocketMQ指数退避策略
  • INITIAL_BACKOFF:第一次失败后重试前需要等待的时间间隔,默认值是1秒。这个参数决定了重试开始时的等待时间。

  • MULTIPLIER:指数退避因子,默认值为1.6。每次重试的等待时间会按此因子进行增长。例如,第一次重试等待INITIAL_BACKOFF秒,第二次重试等待INITIAL_BACKOFF * MULTIPLIER秒,以此类推。

  • JITTER:随机抖动因子,默认值为0.2。抖动的目的是在指数退避的基础上,增加或减少一定比例的时间间隔,以避免在大规模重试时导致系统负载过高或产生重试“风暴”。

  • MAX_BACKOFF:等待时间的上限,默认值为120秒。无论指数退避如何增长,重试等待时间不会超过此上限。

  • MIN_CONNECT_TIMEOUT:最短的重试间隔,默认值为20秒。即使指数退避计算出的重试间隔低于此值,也会强制等待至少MIN_CONNECT_TIMEOUT的时间。

指数退避策略工作机制

在RocketMQ的重试机制中,失败的重试等待时间T可以通过以下公式计算:

T=min(INITIAL_BACKOFF×MULTIPLIERn−1+JITTER×RANDOM(),MAX_BACKOFF)

其中:

  • nnn 为当前的重试次数。
  • RANDOM()\text{RANDOM}()RANDOM() 是一个取值范围在 [−1,1][-1, 1][−1,1] 之间的随机数,用于增加或减少基于JITTER的随机抖动。
  • 重试时间不会低于MIN_CONNECT_TIMEOUT,即:

T = \max(T, \text{MIN\_CONNECT_TIMEOUT})

具体举例

假设初始重试等待时间为1秒(INITIAL_BACKOFF = 1),指数退避因子为1.6(MULTIPLIER = 1.6),抖动因子为0.2(JITTER = 0.2),我们来计算前几次重试的等待时间:

  1. 第一次重试

    • 等待时间:1 * 1.6^0 + 随机抖动
    • 假设抖动为正向0.1,则最终等待时间为:1.0 + 0.1 = 1.1秒
  2. 第二次重试

    • 等待时间:1 * 1.6^1 + 随机抖动
    • 假设抖动为负向0.05,则最终等待时间为:1.6 - 0.08 = 1.52秒
  3. 第三次重试

    • 等待时间:1 * 1.6^2 + 随机抖动
    • 假设抖动为正向0.05,则最终等待时间为:2.56 + 0.13 = 2.69秒
  4. 第四次重试

    • 等待时间:1 * 1.6^3 + 随机抖动
    • 假设抖动为负向0.1,则最终等待时间为:4.096 - 0.4096 = 3.6864秒

以此类推,直到等待时间达到或超过MAX_BACKOFF(120秒),将不再继续增加。每次计算的时间也不会低于MIN_CONNECT_TIMEOUT(20秒)。

优点和缺点

  • 优点

    • 指数退避:避免在系统高负载或网络抖动时,进行频繁重试导致资源耗尽。
    • 抖动机制:防止大量重试请求在同一时间段内冲击系统,降低重试“风暴”发生的可能性。
    • 时间间隔控制:通过设置MAX_BACKOFFMIN_CONNECT_TIMEOUT,可以灵活控制重试策略的上限和下限。
  • 缺点

    • 配置复杂:对于一般用户来说,理解和配置这些参数可能需要一定的经验和调优。
    • 延迟增大:当设置的MULTIPLIER较大时,重试等待时间可能迅速增大,导致重试过程中的总延迟较大。

Java 代码示例

import java.util.Random;

public class RocketMQRetryExample {
    private static final long INITIAL_BACKOFF = 1000L; // 初始重试等待时间,单位毫秒
    private static final double MULTIPLIER = 1.6;     // 指数退避因子
    private static final double JITTER = 0.2;         // 抖动因子
    private static final long MAX_BACKOFF = 120000L;  // 最大等待时间
    private static final long MIN_CONNECT_TIMEOUT = 20000L; // 最短重试间隔

    public static void main(String[] args) {
        int maxRetries = 5;
        Random random = new Random();

        for (int retryCount = 1; retryCount <= maxRetries; retryCount++) {
            // 计算指数退避的基础时间
            long baseBackoff = (long) (INITIAL_BACKOFF * Math.pow(MULTIPLIER, retryCount - 1));
            
            // 计算抖动
            double jitter = JITTER * (2 * random.nextDouble() - 1); // 随机抖动范围[-1, 1]
            long jitterBackoff = (long) (baseBackoff * (1 + jitter));
            
            // 应用MIN_CONNECT_TIMEOUT和MAX_BACKOFF
            long backoffTime = Math.max(jitterBackoff, MIN_CONNECT_TIMEOUT);
            backoffTime = Math.min(backoffTime, MAX_BACKOFF);

            System.out.printf("Retry %d: Waiting %d milliseconds before next attempt.%n", retryCount, backoffTime);

            try {
                Thread.sleep(backoffTime); // 模拟重试等待
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                System.err.println("Retry interrupted");
                break;
            }

            // 模拟消息发送逻辑
            boolean success = simulateMessageSend();
            if (success) {
                System.out.println("Message sent successfully!");
                break;
            }
        }
    }

    private static boolean simulateMessageSend() {
        // 模拟发送失败的情况,可以根据具体逻辑修改
        return false;
    }
}

解释

  • 该代码展示了如何在消息发送重试中应用指数退避、抖动机制、以及重试时间的上下限控制。
  • retryCount代表当前的重试次数,baseBackoff是基于INITIAL_BACKOFFMULTIPLIER计算的退避时间,jitterBackoff是应用抖动后的实际等待时间。
  • backoffTime通过MIN_CONNECT_TIMEOUTMAX_BACKOFF进行限制,确保等待时间在合理范围内。
实现自定义重试策略(指数退避)

RocketMQ没有直接支持复杂的重试策略,需要用户在应用层实现。以下示例展示了如何在发送消息时实现指数退避重试策略。

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

public class RocketMQProducerCustomRetry {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        // 创建生产者实例并设置组名
        DefaultMQProducer producer = new DefaultMQProducer("producer_group");
        producer.setNamesrvAddr("localhost:9876");
        
        // 禁用RocketMQ的内置重试
        producer.setRetryTimesWhenSendFailed(0);
        
        // 启动生产者
        producer.start();
        
        // 创建消息实例
        Message msg = new Message("TopicTest", "TagA", "Hello RocketMQ with Custom Retry".getBytes());
        
        int maxRetries = 5;
        int retryCount = 0;
        long waitTime = 1000; // 初始等待时间1秒
        boolean sendSuccess = false;
        
        while (retryCount <= maxRetries && !sendSuccess) {
            try {
                SendResult sendResult = producer.send(msg);
                System.out.printf("Send Result: %s%n", sendResult);
                sendSuccess = true;
            } catch (MQBrokerException | RemotingException | InterruptedException | MQClientException e) {
                retryCount++;
                if (retryCount > maxRetries) {
                    System.err.println("Failed to send message after " + maxRetries + " retries.");
                    e.printStackTrace();
                    break;
                }
                System.err.println("Send failed, retrying... Attempt " + retryCount);
                try {
                    // 指数退避
                    Thread.sleep(waitTime);
                    waitTime *= 2; // 每次重试等待时间加倍
                } catch (InterruptedException ie) {
                    ie.printStackTrace();
                }
            }
        }
        
        // 关闭生产者
        producer.shutdown();
    }
}

解释

  • 首先,禁用了RocketMQ内置的重试机制(producer.setRetryTimesWhenSendFailed(0);)。
  • 然后,手动实现了一个指数退避的重试逻辑,每次重试的等待时间加倍,最大重试次数为5次。
  • 如果所有重试都失败,则记录错误并停止重试。
流控机制

配置生产者的流控参数

RocketMQ的流控主要通过信号量和限速来实现。以下示例展示了如何设置生产者的信号量以限制并发发送请求数量。

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.client.producer.SendResult;

public class RocketMQProducerFlowControlExample {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        // 创建生产者实例并设置组名
        DefaultMQProducer producer = new DefaultMQProducer("producer_group");
        producer.setNamesrvAddr("localhost:9876");
        
        // 配置流控参数
        producer.setSendMsgTimeout(3000); // 设置发送超时时间为3秒
        producer.getDefaultMQProducerImpl().setSemaphoreAsyncSendNum(100); // 设置异步发送的信号量
        
        // 启动生产者
        producer.start();
        
        // 发送大量消息以触发流控
        for (int i = 0; i < 1000; i++) {
            Message msg = new Message("TopicTest", "TagA", ("Message " + i).getBytes());
            try {
                SendResult sendResult = producer.send(msg);
                System.out.printf("Send Result: %s%n", sendResult);
            } catch (Exception e) {
                // 处理发送失败的情况,可能由于流控导致
                System.err.println("Failed to send message: " + e.getMessage());
                e.printStackTrace();
            }
        }
        
        // 关闭生产者
        producer.shutdown();
    }
}

解释

  • setSendMsgTimeout:设置消息发送的超时时间为3秒。
  • setSemaphoreAsyncSendNum:限制异步发送的最大并发数为100。
  • 在发送大量消息时,如果并发发送请求超过限制,生产者会根据配置的流控策略(如阻塞、抛出异常等)处理超出部分。

实现自定义流控策略

如果需要更复杂的流控策略,可以在应用层实现,例如使用令牌桶算法或其他限流算法。以下示例展示了如何使用令牌桶算法来控制消息发送速率。

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.client.producer.SendResult;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class RocketMQProducerCustomFlowControl {
    private static final int MAX_TOKENS = 100;
    private static final long REFILL_INTERVAL_MS = 1000;
    private static final AtomicInteger tokens = new AtomicInteger(MAX_TOKENS);
    
    public static void main(String[] args) throws MQClientException, InterruptedException {
        // 创建生产者实例并设置组名
        DefaultMQProducer producer = new DefaultMQProducer("producer_group");
        producer.setNamesrvAddr("localhost:9876");
        
        // 启动生产者
        producer.start();
        
        // 启动令牌桶补充线程
        Thread refillThread = new Thread(() -> {
            while (true) {
                try {
                    Thread.sleep(REFILL_INTERVAL_MS);
                    tokens.set(MAX_TOKENS);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    break;
                }
            }
        });
        refillThread.setDaemon(true);
        refillThread.start();
        
        // 发送大量消息,应用层控制发送速率
        for (int i = 0; i < 1000; i++) {
            // 尝试获取令牌
            while (tokens.get() <= 0) {
                try {
                    TimeUnit.MILLISECONDS.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            tokens.decrementAndGet();
            
            // 创建消息
            Message msg = new Message("TopicTest", "TagA", ("Custom Flow Control Message " + i).getBytes());
            try {
                SendResult sendResult = producer.send(msg);
                System.out.printf("Send Result: %s%n", sendResult);
            } catch (Exception e) {
                // 处理发送失败的情况
                System.err.println("Failed to send message: " + e.getMessage());
                e.printStackTrace();
            }
        }
        
        // 关闭生产者
        producer.shutdown();
    }
}

解释

  • 令牌桶算法:通过令牌桶控制发送速率。每秒补充一定数量的令牌,发送消息前需要获取令牌,如果没有令牌则等待。
  • MAX_TOKENS:令牌桶的容量,控制每秒最多发送的消息数。
  • REFILL_INTERVAL_MS:令牌桶的补充间隔,这里设为每秒补充一次。
总结

消息发送重试是RocketMQ保证消息可靠性的关键机制,通过配置重试次数和策略,生产者可以在发送失败时自动尝试重新发送消息。RocketMQ提供了基本的重试配置,但对于更复杂的重试需求(如指数退避、抖动等),用户需要在应用层自行实现自定义的重试逻辑。

流控机制则是RocketMQ保护系统稳定性的另一重要手段,通过限制并发发送请求数量和发送速率,确保系统在高负载下依然能够正常运行。RocketMQ提供了基本的流控配置,用户也可以根据需求在应用层实现更复杂的限流策略,如令牌桶算法、漏桶算法等。

通过合理配置和使用这些机制,用户可以在确保消息可靠性的同时,维护系统的高可用性和稳定性。


http://www.kler.cn/a/287360.html

相关文章:

  • css数据不固定情况下,循环加不同背景颜色
  • 台式电脑没有声音怎么办?台式电脑没有声音解决详解
  • 【网页设计】CSS3 进阶(动画篇)
  • 游戏引擎学习第12天
  • 跨平台WPF框架Avalonia教程 一
  • 【ASR技术】WhisperX安装使用
  • Sentieon 应用教程 | 使用CNVscope进行CNV检测分析
  • 精通Redis-CLI:命令行玩转高效缓存
  • Java设计模式之外观模式详细讲解和案例示范
  • Git分支原理、操作及实际开发中如何规范使用分支
  • pycharm破解教程
  • OD C卷 - 项目排期/最少交付时间
  • java05
  • Java Web_00001
  • HarmonyOS开发实战:Node-API扩展能力接口
  • 从实验室到应用:LC-MS/MS技术与AbMole化合物共舞,揭开半胱氨酸靶向共价抑制剂的新篇章
  • Android创建自己的内容提供器(ContentProvider)
  • java面试(java基础)
  • python脚本处理---(不同文件夹中的文件对比、移动,提取指定类型文件、中文文件名转英文)
  • 年化从19.1%提升到22.5%,全球大类资产轮动,加上RSRS择时,RSRS性能优化70倍。(附策略源码)
  • 如何选到好的宠物空气净化器?有没有推荐的品牌?
  • Javascript实现笛卡儿积算法
  • CSS 高级区块效果——WEB开发系列25
  • 免费的电脑录屏软件,这几款软件满足录屏需求!
  • visual studio code默认打开文档的编码格式simplified chiness(GBK) gbk
  • 基于FFMPEG读取摄像头图像编码为h264