RocketMQ延迟消息机制
架构设计
延迟级别(Delay Level)
RocketMQ 通过预定义的延迟级别来实现延迟消息。每个延迟级别对应一个固定的延迟时间。
消息存储和处理:
- 当消息到达 broker 后,如果设置了延迟级别,broker 将不会立即将消息投递到消费队列中,而是将其存储在特定的延迟队列中。
- RocketMQ 的 broker 定时任务会检查延迟队列中消息的时间,当消息到达指定的延迟时间后,broker 会将消息存入实际的消费队列中以供消费者消费。
示例代码
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
public class DelayedMessageProducer {
public static void main(String[] args) throws Exception {
// 创建生产者实例
DefaultMQProducer producer = new DefaultMQProducer("example_producer_group");
producer.setNamesrvAddr("localhost:9876");
// 启动生产者
producer.start();
// 创建消息,并指定主题
Message msg = new Message("TopicTest", "TagA", "OrderID001", "Hello RocketMQ".getBytes());
// 设置延迟级别
msg.setDelayTimeLevel(3); // 表示延迟 10 秒 (根据 delay level 3)
// 发送消息
producer.send(msg);
// 关闭生产者
producer.shutdown();
}
}