RocketMQ学习笔记
RocketMQ简介
RocketMQ是阿里巴巴2016年MQ中间件,使用Java语言开发,RocketMQ 是一款开源的分布式消息系统,基于高可用分布式集群技术,提供低延时的、高可靠的消息发布与订阅服务。同时,广泛应用于多个领域,包括异步通信解耦、企业解决方案、金融支付、电信、电子商务、快递物流、广告营销、社交、即时通信、移动应用、手游、视频、物联网、车联网等。
具有以下特点:
- 能够保证严格的消息顺序
- 提供丰富的消息拉取模式
- 高效的订阅者水平扩展能力
- 实时的消息订阅机制
- 亿级消息堆积能力
为什么使用MQ
- 要做到系统解耦,当新的模块进来时,可以做到代码改动最小; 能够解耦
- 设置流程缓冲池,可以让后端系统按自身吞吐能力进行消费,不被冲垮; 能够削峰,限流
- 强弱依赖梳理能把非关键调用链路的操作异步化并提升整体系统的吞吐能力;能够异步
定义
中间件(缓存中间件 redis memcache 数据库中间件 mycat canal 消息中间件mq )
面向消息的中间件(message-oriented middleware) MOM能够很好的解决以上的问题。
是指利用高效可靠的消息传递机制进行与平台无关(跨平台)的数据交流,并基于数据通信来进行分布式系统的集成。
通过提供消息传递和消息排队模型在分布式环境下提供应用解耦,弹性伸缩,冗余存储,流量削峰,异步通信,数据同步等
大致流程
发送者把消息发给消息服务器(MQ),消息服务器把消息存放在若干队列/主题中,在合适的时候,消息服务器会把消息转发给接受者。在这个过程中,发送和接受是异步的,也就是发送无需等待,发送者和接受者的生命周期也没有必然关系在发布pub/订阅sub模式下,也可以完成一对多的通信,可以让一个消息有多个接受者(微信订阅号就是这样的)
各个MQ产品的比较
[[Kafka|动力节点kafka]]
RocketMQ重要概念【重点】
Producer:消息的发送者,生产者;举例:发件人
Consumer:消息接收者,消费者;举例:收件人
Broker:暂存和传输消息的通道;举例:快递
NameServer:管理Broker;举例:各个快递公司的管理机构相当于broker的注册中心,保留了broker的信息
Queue:队列,消息存放的位置,一个Broker中可以有多个队列
Topic:主题,消息的分类
ProducerGroup:生产者组
ConsumerGroup:消费者组,多个消费者组可以同时消费一个主题的消息
消息发送的流程是,Producer询问NameServerNameServer分配一个broker然后Consumer也要询问NameServer,得到一个具体的broker,然后消费消息
RocketMQ快速入门
RocketMQ提供了发送多种发送消息的模式,例如同步消息,异步消息,顺序消息,延迟消息,事务消息等。
消息发送和监听的流程
我们先搞清楚消息发送和监听的流程,然后我们在开始敲代码
消息生产者
1.创建消息生产者producer,并制定生产者组名
2.指定Nameserver地址
3.启动producer
4.创建消息对象,指定主题Topic、Tag和消息体等
5.发送消息
6.关闭生产者producer
消息消费者
1.创建消费者consumer,制定消费者组名
2.指定Nameserver地址
3.创建监听订阅主题Topic和Tag等
4.处理消息
5.启动消费者consumer
搭建Rocketmq-demo
导入依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<!-- 原生的api -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.2</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.12</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
<version>3.0.6</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
编写生产者
public interface MqConstant {
String NAME_SRV_ADDR = "127.0.0.1:9876";
}
/**
* 生产者
*
* @throws Exception
*/@Test
public void simpleProducer() throws Exception {
// 创建一个生产者,并且指定一个组名
DefaultMQProducer producer = new DefaultMQProducer("test-producer-group");
// 连接nameserver
producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
// 启动
producer.start();
// 创建一个消息
Message message = new Message("testTopic", "我是一个简单的消息".getBytes());
// 发送消息
SendResult sendResult = producer.send(message);
System.out.println(sendResult.getSendStatus());
// 关闭生产者
producer.shutdown();
}
编写消费者
/**
* 消费者
*
* @throws Exception
*/@Test
public void simpleConsumer() throws Exception {
// 创建一个消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-consumer-group");
// 连接nameserver
consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
// 订阅一个主题,* 表示订阅这个主题中所有的消息 后期会有消息过滤
consumer.subscribe("testTopic", "*");
// 设置一个监听器(一直监听的,异步回调方式)
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
// 这个就是消费的方法(业务处理)
System.out.println("我是消费者");
System.out.println(msgs.get(0).toString());
System.out.println("消息内容:" + new String(msgs.get(0).getBody()));
System.out.println("消息消费的上下文" + context);
// 返回值 CONSUME_SUCCESS 成功,消息会从mq出队
// RECONSUME_LATER(报错/null) 失败,消息会重新回到队列 过一会重新投递出来 给当前消费者或者其他消费者消费的
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动
consumer.start();
// 挂起当前的jvm
System.in.read();
}
消息模型
消费模式
MQ的消费模式可以大致分为两种,一种是推Push,一种是拉Pull。
Push
是服务端【MQ】主动推送消息给客户端,优点是及时性较好,但如果客户端没有做好流控,一旦服务端推送大量消息到客户端时,就会导致客户端消息堆积甚至崩溃。Pull
是客户端需要主动到服务端取数据,优点是客户端可以依据自己的消费能力进行消费,但拉取的频率也需要用户自己控制,拉取频繁容易造成服务端和客户端的压力,拉取间隔长又容易造成消费不及时。
Push模式也是基于pull模式的,只能客户端内部封装了api,一般场景下,上游消息生产量小或者均速的时候,选择push模式。在特殊场景下,例如电商大促,抢优惠券等场景可以选择pull模式
RocketMQ发送同步消息⭐️⭐️
上面的快速入门就是发送同步消息,发送过后会有一个返回值,也就是mq服务器接收到消息后返回的一个确认,这种方式非常安全,但是性能上并没有这么高,而且在mq集群中,也是要等到所有的从机都复制了消息以后才会返回,所以针对重要的消息可以选择这种方式。
RocketMQ发送异步消息⭐️⭐️
异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。发送完以后会有一个异步消息通知
异步消息生产者
@Test
public void asyncProducer() throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("async-producer-group");
producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
producer.start();
Message message = new Message("asyncTopic", "我是一个异步消息".getBytes());
producer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("发送成功");
}
@Override
public void onException(Throwable e) {
System.out.println("发送失败:" + e.getMessage());
}
});
System.out.println("我先执行");
System.in.read();
}
异步消息消费者
@Test
public void asyncConsumer() throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");
consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
consumer.subscribe("asyncTopic", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.println(Thread.currentThread().getName() + "---->" + msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.in.read();
}
RocketMQ发送单向消息⭐️
这种方式主要用在不关心发送结果的场景,这种方式吞吐量很大,但是存在消息丢失的风险,例如日志信息的发送
单向消息生产者
@Test
public void onewayProducer() throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("oneway-producer-group");
producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
producer.start();
Message message = new Message("onewayTopic", "我是一个单向消息".getBytes());
producer.sendOneway(message);
System.out.println("发送成功");
producer.shutdown();
}
单向消息消费者
和上面的消费者一样
RocketMQ发送延迟消息⭐️⭐️
消息放入mq后,过一段时间,才会被监听到,然后消费。比如下订单业务,提交了一个订单就可以发送一个延时消息,30min后去检查这个订单的状态,如果还是未付款就取消订单释放库存。
延迟消息生产者
@Test
public void msProducer() throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("ms-producer-group");
producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
producer.start();
Message message = new Message("orderMsTopic", "订单号,座位号".getBytes());
// 给消息设置一个延迟时间
// messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
message.setDelayTimeLevel(3);
producer.send(message);
System.out.println("发送时间:" + new Date());
producer.shutdown();
}
延迟消息消费者
/**
* 发送时间:Mon Oct 28 10:21:48 CST 2024
* 收到消息了,Mon Oct 28 10:22:24 CST 2024
* ---------------------------------------
* 发送时间:Mon Oct 28 10:23:23 CST 2024
* 收到消息了,Mon Oct 28 10:23:34 CST 2024
* @throws Exception
*/
@Test
public void msConsumer() throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ms-consumer-group");
consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
consumer.subscribe("orderMsTopic", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.println("收到消息了," + new Date());
System.out.println(new String(msgs.get(0).getBody()));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.in.read();
}
RocketMQ发送批量消息
Rocketmq可以一次性发送一组消息,那么这一组消息会被当做一个消息消费
批量消息生产者
@Test
public void batchProducer() throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("batch-producer-group");
producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
producer.start();
List<Message> msgs = Arrays.asList(
new Message("batchTopic", "我是一组消息的A消息".getBytes()),
new Message("batchTopic", "我是一组消息的B消息".getBytes()),
new Message("batchTopic", "我是一组消息的C消息".getBytes())
);
producer.send(msgs);
producer.shutdown();
}
批量消息消费者
@Test
public void testBatchConsumer() throws Exception {
// 创建默认消费者组
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("batch-consumer-group");
// 设置nameServer地址
consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
// 订阅一个主题来消费 表达式,默认是*
consumer.subscribe("batchTopic", "*");
// 注册一个消费监听 MessageListenerConcurrently是并发消费
// 默认是20个线程一起消费,可以参看 consumer.setConsumeThreadMax() consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
// 这里执行消费的代码 默认是多线程消费
System.out.println(Thread.currentThread().getName() + "----" + new String(msgs.get(0).getBody()));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.in.read();
}
RocketMQ发送顺序消息⭐️
消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为:分区有序或者全局有序。
可能大家会有疑问,mq不就是FIFO吗?
rocketMq的broker的机制,导致了rocketMq会有这个问题
因为一个broker中对应了四个queue
顺序消费的原理解析,在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。
下面用订单进行分区有序的示例。一个订单的顺序流程是:下订单、发短信通知、物流、签收。订单顺序号相同的消息会被先后发送到同一个队列中,消费时,同一个顺序获取到的肯定是同一个队列。
场景分析
模拟一个订单的发送流程,创建两个订单,发送的消息分别是
- 订单号111 消息流程 下订单->物流->签收
- 订单号112 消息流程 下订单->物流->拒收
创建一个消息对象
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class MsgModel {
private String orderSn;
private Integer userId;
private String desc; // 下单 短信 物流
}
顺序消息生产者
private List<MsgModel> msgModels = Arrays.asList(
new MsgModel("qwer", 1, "下单"),
new MsgModel("qwer", 1, "短信"),
new MsgModel("qwer", 1, "物流"),
new MsgModel("zxcv", 2, "下单"),
new MsgModel("zxcv", 2, "短信"),
new MsgModel("zxcv", 2, "物流")
);
@Test
public void orderlyProducer() throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("orderly-producer-group");
producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
producer.start();
// 发送顺序消息 发送时要确保有序 并且要发送到同一个队列下面去
msgModels.forEach(msgModel -> {
Message message = new Message("orderlyTopic", msgModel.toString().getBytes());
try {
// 发 相同的订单号去相同的队列
producer.send(message, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
// 在这里 arg就是下面传入的订单号
// 选择队列
int hashCode = arg.toString().hashCode();
int i = hashCode % mqs.size();
return mqs.get(i);
}
}, msgModel.getOrderSn());
} catch (Exception e) {
e.printStackTrace();
}
});
producer.shutdown();
System.out.println("发送完毕");
}
顺序消息消费者
@Test
public void orderlyConsumer() throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("orderly-consumer-group");
consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
consumer.subscribe("orderlyTopic", "*");
// MessageListenerConcurrently 并发模式 多线程的 重试16次
// MessageListenerOrderly 顺序模式 单线程的 无限重试Integer.MAX_Value
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
System.out.println(Thread.currentThread().getId() + "----->" + new String(msgs.get(0).getBody()));
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.in.read();
}
RocketMQ发送带标签的消息,消息过滤
Rocketmq提供消息过滤功能,通过tag或者key进行区分
我们往一个主题里面发送消息的时候,根据业务逻辑,可能需要区分,比如带有tagA标签的被A消费,带有tagB标签的被B消费,还有在事务监听的类里面,只要是事务消息都要走同一个监听,我们也需要通过过滤才区别对待。
标签消息生产者
@Test
public void tagProducer() throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("tag-producer-group");
producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
producer.start();
Message message = new Message("tagTopic", "vip1", "我是vip1的文章".getBytes());
Message message2 = new Message("tagTopic", "vip2", "我是vip2的文章".getBytes());
producer.send(message);
producer.send(message2);
System.out.println("发送成功!");
producer.shutdown();
}
标签消息消费者
/**
* vip1 * @throws Exception
*/@Test
public void tagConsumer1() throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tag-consumer-group-a");
consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
// 只能收到tag为vip1的消息
consumer.subscribe("tagTopic", "vip1");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.println("我是vip1的消费者,我正在消费消息:" + new String(msgs.get(0).getBody()));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.in.read();
}
/**
* vip1 || vip2 * @throws Exception
*/@Test
public void tagConsumer2() throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tag-consumer-group-b");
consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
// 只能收到tag为vip1的消息
consumer.subscribe("tagTopic", "vip1 || vip2");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.println("我是vip2的消费者,我正在消费消息:" + new String(msgs.get(0).getBody()));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.in.read();
}
什么时候该用Topic,什么时候该用Tag?
总结:不同的业务应该使用不同的Topic如果是相同的业务里面有不同表的表现形式,那么我们要使用tag进行区分
可以从以下几个方面进行判断:
- 消息类型是否一致:如普通消息、事务消息、定时(延时)消息、顺序消息,不同的消息类型使用不同的 Topic,无法通过 Tag 进行区分。
- 业务是否相关联:没有直接关联的消息,如淘宝交易消息,京东物流消息使用不同的 Topic 进行区分;而同样是天猫交易消息,电器类订单、女装类订单、化妆品类订单的消息可以用 Tag 进行区分。
- 消息优先级是否一致:如同样是物流消息,盒马必须小时内送达,天猫超市 24 小时内送达,淘宝物流则相对会慢一些,不同优先级的消息用不同的 Topic 进行区分。
- 消息量级是否相当:有些业务消息虽然量小但是实时性要求高,如果跟某些万亿量级的消息使用同一个 Topic,则有可能会因为过长的等待时间而“饿死”,此时需要将不同量级的消息进行拆分,使用不同的 Topic。
总的来说,针对消息分类,您可以选择创建多个 Topic,或者在同一个 Topic下创建多个 Tag。但通常情况下,不同的 Topic之间的消息没有必然的联系,而 Tag 则用来区分同一个 Topic相互关联的消息,例如全集和子集的关系、流程先后的关系。
RocketMQ中消息的key⭐️
在rocketmq中的消息,默认会有一个messageId当做消息的唯一标识,我们也可以给消息携带一个key,用作唯一标识或者业务标识,包括在控制面板查询的时候也可以使用messageId或者key来进行查询
带key消息生产者
/**
* key是业务参数 我们自身要确保唯一
* 为了查阅和去重
* @throws Exception
*/
@Test
public void keyProducer() throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("key-producer-group");
producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
producer.start();
String key = UUID.randomUUID().toString();
System.out.println(key);
Message message = new Message("keyTopic", "vip1", key, "我是vip1的文章".getBytes());
producer.send(message);
System.out.println("发送成功!");
producer.shutdown();
}
带key消息消费者
@Test
public void keyConsumer() throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("key-consumer-group");
consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
// 只能收到tag为vip1的消息
consumer.subscribe("keyTopic", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.println("消费消息:" + new String(msgs.get(0).getBody()));
System.out.println("我们业务的标识" + msgs.get(0).getKeys());
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.in.read();
}
RocketMQ消息重复消费问题(去重)⭐️⭐️
为什么会出现重复消费的问题呢?
重复消费生产者
@Autowired
private JdbcTemplate jdbcTemplate;
@Test
public void repeatedProducer() throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("repeat-producer-group");
producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
producer.start();
String key = UUID.randomUUID().toString();
System.out.println(key);
// 测试 发两个key一样的消息
Message m1 = new Message("repeatTopic", null, key, "扣减库存-1".getBytes());
Message m1Repeat = new Message("repeatTopic", null, key, "扣减库存-1".getBytes());
producer.send(m1);
producer.send(m1Repeat);
System.out.println("发送成功");
producer.shutdown();
}
重复消费消费者
/**
* 我们设计一个去重表,对消息的唯一key添加唯一索引
* 每次消费消息的时候 先插入数据库 如果成功则执行业务逻辑 如果业务逻辑执行报错 则删除这个去重表记录
* 如果插入失败 则说明消息来过了,直接签收了
*
* @throws Exception
*/
@Test
public void repeatedConsumer() throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("repeat-consumer-group");
consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
consumer.subscribe("repeatTopic", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
// 先拿key
MessageExt messageExt = msgs.get(0);
String keys = messageExt.getKeys();
// 原生方式操作数据库
Connection connection = null;
try {
connection = DriverManager.getConnection("jdbc:mysql://localhost:13306/test?serverTimezone=GMT%2B8&useSSL=false",
"root", "123456");
} catch (SQLException e) {
e.printStackTrace();
}
PreparedStatement statement = null;
try {
// 插入数据库 因为我们给key做了唯一索引
// 新增 要么成功 要么报错 修改 要么成功 要么返回0 要么报错
statement = connection.prepareStatement("insert into order_oper_log(`type`, `order_sn`, `user`) values (1, '" + keys + "', '123')");
} catch (SQLException e) {
e.printStackTrace();
}
try {
statement.executeUpdate();
} catch (SQLException e) {
if (e instanceof SQLIntegrityConstraintViolationException) {
// 唯一索引冲突异常
// 说明消息来过了
System.out.println("该消息来过了");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
e.printStackTrace();
}
// 处理业务逻辑
// 如果业务报错 则删除掉这个去重表记录 delete order_oper_log where order_sn = keys
System.out.println(new String(messageExt.getBody()));
System.out.println(keys);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.in.read();
}
RocketMQ重试机制
生产者重试
@Test
public void retryProducer() throws Exception{
DefaultMQProducer producer = new DefaultMQProducer("retry-producer-group");
producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
producer.start();
// 生产者发送消息 重试的次数
producer.setRetryTimesWhenSendFailed(2);
producer.setRetryTimesWhenSendAsyncFailed(2);
String key = UUID.randomUUID().toString();
System.out.println(key);
Message message = new Message("retryTopic", "vip1", key, "我是vip666的文章".getBytes());
producer.send(message);
System.out.println("发送成功!");
producer.shutdown();
}
消费者重试
在消费者放return ConsumeConcurrentlyStatus.RECONSUME_LATER;后就会执行重试
上图代码中说明了,我们再实际生产过程中,一般重试3-5次,如果还没有消费成功,则可以把消息签收了,通知人工等处理。
/**
* 重试时间间隔
* 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
* 默认重试16次
* 1. 能否自定义重试次数
* 2. 如果重试了16次(并发模式)顺序模式下(int最大值次)都是失败的? 认为该消息是一个死信消息 则会放在一个死信主题中去 主题的名称:%DLQ%消费组的名字
* 3. 当消息处理失败的时候 该如何正确的处理?
* -----------
* 重试的次数一般 5次
* @throws Exception
*/
@Test
public void retryConsumer() throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retry-consumer-group");
consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
consumer.subscribe("retryTopic", "*");
// 设定重试次数
consumer.setMaxReconsumeTimes(2);
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
MessageExt messageExt = msgs.get(0);
System.out.println(new Date());
System.out.println(messageExt.getReconsumeTimes());
System.out.println(new String(messageExt.getBody()));
// 业务报错了 返回null 返回RECONSUME_LATER 都会重试
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
});
consumer.start();
System.in.read();
}
// 直接监听死信主题的消息,记录下来 通知人工接入处理
@Test
public void retryDeadConsumer() throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retry-dead-consumer-group");
consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
consumer.subscribe("%DLQ%retry-consumer-group", "*");
// 设定重试次数
consumer.setMaxReconsumeTimes(2);
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
MessageExt messageExt = msgs.get(0);
System.out.println(new Date());
System.out.println(new String(messageExt.getBody()));
System.out.println("记录到特别的位置 文件 mysql 通知人工处理");
// 业务报错了 返回null 返回RECONSUME_LATER 都会重试
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.in.read();
}
// 第二种方案 用法比较多
@Test
public void retryDeadConsumer2() throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retry-consumer-group");
consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
consumer.subscribe("retryTopic", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
MessageExt messageExt = msgs.get(0);
System.out.println(new Date());
// 业务处理
try {
handledb();
} catch (Exception e) {
// 重试
int reconsumeTimes = messageExt.getReconsumeTimes();
if (reconsumeTimes >= 3) {
// 不要重试了
System.out.println("记录到特别的位置 文件 mysql 通知人工处理");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
// 业务报错了 返回null 返回RECONSUME_LATER 都会重试
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.in.read();
}
private void handledb() {
int i = 10 / 0;
}
RocketMQ集成SpringBoot
搭建消息生产者
引入pom.xml依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.11</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.xyf</groupId>
<artifactId>b-rocketnq-boot-p</artifactId>
<version>1.0-SNAPSHOT</version>
<name>b-rocketmq-boot-p</name>
<description>b-rocketmq-boot-p</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.2</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>2.0.22</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
修改配置文件application.yml
rocketmq:
name-server: 127.0.0.1:9876
producer:
group: boot-producer-group
测试发送消息
@SpringBootTest
public class BRocketmqBootPApplicationTests {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Test
public void test() {
// 同步
rocketMQTemplate.syncSend("bootTestTopic", "我是boot的一个消息");
}
}
搭建消息消费者
引入pom.xml依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.11</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.xyf</groupId>
<artifactId>c-rocketmq-boot-c</artifactId>
<version>1.0-SNAPSHOT</version>
<name>c-rocketmq-boot-c</name>
<description>c-rocketmq-boot-c</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.2</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>2.0.22</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
修改配置文件application.yml
server:
port: 8081
rocketmq:
name-server: 127.0.0.1:9876
# 一个boot项目中可以写很多个消费者程序,但是一般在开发中 一个boot项目只对应一个消费者
测试消费消息
@Component
@RocketMQMessageListener(topic = "bootTestTopic", consumerGroup = "boot-test-consumer-group")
public class ABootSimpleMsgListener implements RocketMQListener<MessageExt> {
/**
* 这个方法就是消费者的方法
* 如果泛型指定了固定的类型 那么消息体就是我们的参数
* MessageExt 这个类型是消息的所有内容
* --------------
* 没有报错 那就签收了
* 如果报错了 就是拒收 就会重试
*
* @param message
*/
@Override
public void onMessage(MessageExt message) {
System.out.println(new String(message.getBody()));
}
}
发送顺序消息
搭建顺序消息生产者
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class MsgModel {
private String orderSn;
private Integer userId;
private String desc; // 下单 短信 物流
}
@SpringBootTest
public class BRocketmqBootPApplicationTests {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Test
public void test() {
// 同步
// rocketMQTemplate.syncSend("bootTestTopic", "我是boot的一个消息");
// 异步
// rocketMQTemplate.asyncSend("bootTestTopic", "我是boot的一个异步消息", new SendCallback() {
// @Override
// public void onSuccess(SendResult sendResult) {
// System.out.println("成功");
// }
//
// @Override
// public void onException(Throwable throwable) {
// System.out.println("失败" + throwable.getMessage());
// }
// });
// 单向
// rocketMQTemplate.sendOneWay("bootOnewayTopic", "单向消息");
// 延迟
// Message<String> msg = MessageBuilder.withPayload("我是一个延迟消息").build();
// rocketMQTemplate.syncSend("bootMsTopic", msg, 3000, 3);
// 顺序 发送者方 需要将一组消息 都发在同一个队列中去 消费者方 需要单线程消费
List<MsgModel> msgModels = Arrays.asList(
new MsgModel("qwer", 1, "下单"),
new MsgModel("qwer", 1, "短信"),
new MsgModel("qwer", 1, "物流"),
new MsgModel("zxcv", 2, "下单"),
new MsgModel("zxcv", 2, "短信"),
new MsgModel("zxcv", 2, "物流")
);
msgModels.forEach(msgModel -> {
// 发送 一般都是以json的方式进行处理
rocketMQTemplate.syncSendOrderly("bootOrderlyTopic", JSON.toJSONString(msgModel), msgModel.getOrderSn());
});
}
}
搭建顺序消息消费者
@Component
@RocketMQMessageListener(topic = "bootOrderlyTopic",
consumerGroup = "boot-orderly-consumer-group",
consumeMode = ConsumeMode.ORDERLY, // 顺序消费模式 单线程
maxReconsumeTimes = 5 // 消息重试的次数
)
public class BOrderlyMsgListener implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt message) {
MsgModel msgModel = JSON.parseObject(new String(message.getBody()), MsgModel.class);
System.out.println(msgModel);
}
}
发送带标签的消息
生产者
@Test
public void tagKeyTest() throws Exception {
// topic:tag
rocketMQTemplate.syncSend("bootTagTopic:tagA", "我是一个带tag的消息");
}
消费者
@Component
@RocketMQMessageListener(topic = "bootTagTopic",
consumerGroup = "boot-tag-consumer-group",
selectorType = SelectorType.TAG, // tag过滤模式
selectorExpression = "tagA || tagB"
)
public class CTagMsgListener implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt message) {
System.out.println(new String(message.getBody()));
}
}
发送带key的消息
生产者
@Test
public void tagKeyTest() throws Exception {
// key是携带在消息头的
Message<String> message = MessageBuilder.withPayload("我是一个带key的消息")
.setHeader(RocketMQHeaders.KEYS, "qwerasdzxcv")
.build();
rocketMQTemplate.syncSend("bootKeyTopic", message);
}
消费者
@Component
@RocketMQMessageListener(topic = "bootKeyTopic",
consumerGroup = "boot-key-consumer-group",
selectorType = SelectorType.TAG, // tag过滤模式
selectorExpression = "tagA || tagB"
)
public class DKeyMsgListener implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt message) {
System.out.println(message.getKeys());
}
}
RocketMQ集成SpringBoot消息消费两种模式
Rocketmq消息消费的模式分为两种:负载均衡模式和广播模式
负载均衡模式表示多个消费者交替消费同一个主题里面的消息
广播模式表示每个消费者都消费一遍订阅的主题的消息
负载均衡模式
生产者
// 测试消息消费模式 集群模式 广播模式
@Test
public void modeTest() throws Exception {
for (int i = 1; i <= 10; i++) {
rocketMQTemplate.syncSend("modeTopic", "我是第" + i + "个消息");
}
}
消费者
搭建三个消费者,同为一个消费者组
@Component
@RocketMQMessageListener(topic = "modeTopic",
consumerGroup = "mode-consumer-group-a",
messageModel = MessageModel.CLUSTERING //集群模式 负载均衡
)
public class EC1 implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("我是mode-consumer-group-a组的第一个消费者:" + message);
}
}
@Component
@RocketMQMessageListener(topic = "modeTopic",
consumerGroup = "mode-consumer-group-a",
messageModel = MessageModel.CLUSTERING //集群模式
)
public class EC2 implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("我是mode-consumer-group-a组的第二个消费者:" + message);
}
}
@Component
@RocketMQMessageListener(topic = "modeTopic",
consumerGroup = "mode-consumer-group-a",
messageModel = MessageModel.CLUSTERING //集群模式
)
public class EC3 implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("我是mode-consumer-group-a组的第三个消费者:" + message);
}
}
广播模式
生产者同上
消费者
搭建三个消费者,同位一个消费者组
@Component
@RocketMQMessageListener(topic = "modeTopic",
consumerGroup = "mode-consumer-group-b",
messageModel = MessageModel.BROADCASTING //广播模式
)
public class EC4 implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("我是mode-consumer-group-b组的第一个消费者:" + message);
}
}
@Component
@RocketMQMessageListener(topic = "modeTopic",
consumerGroup = "mode-consumer-group-b",
messageModel = MessageModel.BROADCASTING //集群模式 负载均衡
)
public class EC5 implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("我是mode-consumer-group-b组的第二个消费者:" + message);
}
}
@Component
@RocketMQMessageListener(topic = "modeTopic",
consumerGroup = "mode-consumer-group-b",
messageModel = MessageModel.BROADCASTING //集群模式 负载均衡
)
public class EC6 implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("我是mode-consumer-group-b组的第三个消费者:" + message);
}
}
CLUSTERING 集群模式下 队列会被消费者分摊 队列数量 >= 消费者数量 消息的消费位点 mq服务器会记录处理
BROADCASTING 广播模式下 消息会被每一个消费者都处理一次 mq服务器不会记录消费点位 并且也不会重试
如何解决消息堆积问题 ⭐️⭐️
一般认为单条队列消息差值>=10w时 算堆积问题
什么情况下会出现堆积
-
生产太快了
生产方可以做业务限流
增加消费者数量,但是消费者数量<=队列数量,适当的设置最大的消费线程数量(根据IO(2n)/CPU(n+1))
动态扩容队列数量,从而增加消费者数量 -
消费者消费出现问题
排查消费者程序的问题
如何确保消息不丢失?
- 生产者使用同步发送模式 ,收到mq的返回确认以后 顺便往自己的数据库里面写
msgId status(0) time - 消费者消费以后 修改数据这条消息的状态 = 1
- 写一个定时任务 间隔两天去查询数据 如果有status = 0 and time < day-2
- 将mq的刷盘机制设置为同步刷盘
- 使用集群模式 ,搞主备模式,将消息持久化在不同的硬件上
- 可以开启mq的trace机制,消息跟踪机制
1.在broker.conf中开启消息追踪
traceTopicEnable=true
2.重启broker即可
3.生产者配置文件开启消息轨迹
enable-msg-trace: true
rocketmq:
name-server: 127.0.0.1:9876
producer:
group: boot-producer-group
enable-msg-trace: true
- 消费者开启消息轨迹功能,可以给单独的某一个消费者开启
enableMsgTrace = true
@Component
@RocketMQMessageListener(topic = "traceTopic",
consumerGroup = "trace-consumer-group",
consumeThreadNumber = 40,
consumeMode = ConsumeMode.CONCURRENTLY,
enableMsgTrace = true //开启消费者方的轨迹
)
public class HTraceListener implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("我是消费者:" + message);
}
}
在rocketmq的面板中可以查看消息轨迹
默认会将消息轨迹的数据存在 RMQ_SYS_TRACE_TOPIC 主题里面
安全
- 开启acl的控制 在broker.conf中开启aclEnable=true
- 配置账号密码 修改plain_acl.yml
- 修改控制面板的配置文件 放开52/53行 把49行改为true 上传到服务器的jar包平级目录下即可