RocketMQ发送消息之事务消息
前言
无论是对于数据库还是对于各种应用来讲,事务在项目中都起着举足轻重的作用。在前面我们接触过数据库的一致性,Redis与数据库的一致性等,今天我们一起学习RocketMQ事务这块的知识点。
希望这篇文章可以帮助到正在学习这块知识点的大佬们!
事务消息概念
RocketMQ的事务和数据库的略有不同。RocketMQ事务消息指的是应用本地事务和发送消息操作可以被定义到全局事务中,确保两者要么同时成功,要么同时失败。
官网上说这种机制类似于X/Open XA的分布式事务功能,通过事务消息达到分布式事务的最终一致性。RocketMQ在4.3.0版本中首次引入了分布式事务消息的功能。
事务消息执行流程
RocketMQ事务消息的工作流程可以分为以下几个阶段:
-
发送半消息:
生产者首先向RocketMQ服务端(也就是broker)发送一条预处理消息(这个被叫做半消息)。这条消息在消费者端是不可见的
,只是被存储在RocketMQ服务端(broker),并标记为“暂时不能投递”。 -
执行本地事务:
生产者接收到服务端(Broker,这里都标注一下)的确认后,开始执行本地事务
逻辑。本地事务执行完毕后,生产者根据执行结果返回Commit或者Rollback状态。 -
提交二次确认:
根据本地事务的执行结果,生产者向RocketMQ服务端(习惯说Broker)提交二次确认结果。如果收到的结果是Commit状态,服务端(Broker)将半消息标记为可投递,消费者就能够接收到这个消息了;如果收到Rollback状态,那么删除半消息,消费者也就不会接收到这个消息了。 -
消息回查:
如果服务端(Broker)长时间未收到生产者的二次确认结果,或者因为网络原因导致确认丢失了,服务端(Broker)就会主动向生产者发起回查请求。生产者接收到回查请求后,会再次检查本地事务的执行状态,并返回Commit或者Rollback状态。
事务消息应用场景
RocketMQ的事务消息经常应用于需要确保消息生产和消费原子性操作的场景,比如说订单支付、库存扣减这些。
面试问到八股可以这样答:
举个栗子额:A银行用户向B银行用户转账的时候:
首先,A银行系统发起转账操作,并且向RocketMQ发送半消息。
等待RocketMQ服务端Broker确认消息接收后,A银行执行扣款操作。
如果扣款成功,A银行向RocketMQ提交Commit状态,RocketMQ将消息标记为可投递,B银行系统消费消息执行转入操作。
如果扣款失败,A银行向RocketMQ提交Rollback状态,RocketMQ删除半消息,B银行不会接收到转账消息。
这是一个经典的栗子,如果面试中问到可以这样回答!
代码案例
生产者发送消息,生产者代码,详细步骤见注释:
package c.x.r.transaction;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.io.UnsupportedEncodingException;
import java.util.concurrent.*;
public class TransactionProducer {
public static void main(String[] args) throws Exception {
// 创建事务监听器对象
TransactionListener transactionListener = new TransactionListenerImpl();
// 创建事务消息生产者
TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
// 设置NameServer的地址
producer.setNamesrvAddr("192.168.220.135:9876");
// 线程池用于执行事务消息的回查
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
// 自定义的线程池设置给生产者
producer.setExecutorService(executorService);
// 事务监听器,处理本地事务和回查逻辑
producer.setTransactionListener(transactionListener);
// 启动生产者
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
// 发送10条事务消息
for (int i = 0; i < 10; i++) {
try {
Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);
// 发送间隔
Thread.sleep(10);
} catch (MQClientException | UnsupportedEncodingException e) {
e.printStackTrace();
}
}
for (int i = 0; i < 100000; i++) {
Thread.sleep(1000);
}
// 关闭生产者
producer.shutdown();
}
}
TransactionListenerImpl 实现类:
package c.x.r.transaction;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
public class TransactionListenerImpl implements TransactionListener {
// executeLocalTransaction 方法执行本地事务,根据消息标签决定事务状态
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 获取消息中的标签
String tags = msg.getTags();
// 如果标签中包含 "TagA",提交事务
if(StringUtils.contains(tags,"TagA")){
return LocalTransactionState.COMMIT_MESSAGE;
}
// 如果标签中包含 "TagB",回滚事务
else if(StringUtils.contains(tags,"TagB")){
return LocalTransactionState.ROLLBACK_MESSAGE;
}
// 如果标签既不是 "TagA" 也不是 "TagB",返回未知状态
else{
return LocalTransactionState.UNKNOW;
}
}
// checkLocalTransaction 方法检查本地事务的状态,用于消息的最终确认
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 获取消息中的标签
String tags = msg.getTags();
// 如果标签中包含 "TagC",确认提交事务
if(StringUtils.contains(tags,"TagC")){
return LocalTransactionState.COMMIT_MESSAGE;
}
// 如果标签中包含 "TagD",确认回滚事务
else if(StringUtils.contains(tags,"TagD")){
return LocalTransactionState.ROLLBACK_MESSAGE;
}
// 如果标签既不是 "TagC" 也不是 "TagD",返回未知状态
else{
return LocalTransactionState.UNKNOW;
}
}
}
消费者代码:
package c.x.r.transaction;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class TransactionConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my-consumer-group1");
consumer.setNamesrvAddr("192.168.220.135:9876");
consumer.subscribe("TopicTest","*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println("收到的消息:"+msg);
}
// 消息处理成功,返回CONSUME_SUCCESS状态
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("消费者已启动");
}
}
注意事项
RocketMQ在使用过程中存在一些限制,需要注意:
- 不支持延时消息和批量消息:事务消息必须单独发送,不支持批量和延时处理。
- 消费幂等性:事务性消息可能多次被检查或消费,所以消费者端需要做好消费幂等处理。
- 回查次数限制:单个消息的检查次数默认限制为15次,可以通过Broker配置文件修改。
- 事务执行时间限制:RocketMQ要求本地事务执行器在规定时间内完成并返回结果,否则可能触发回查机制。
本篇文章到这里就结束了,后续会继续分享RocketMQ相关的知识,感谢各位小伙伴们的支持!