RocketMQ源码分析之事务消息分析
rocketMQ事务消息原理概述
RocketMQ采用两阶段提交(2PC)的思想来实现事务消息,当事务消息失败或者超时,同时采用补偿的方式处理这个问题。这两个阶段分别为正常事务消息的发送与提交以及事务消息的补偿。我们看看官方文档给的事务消息的流程图:
1、事务消息的发送与提交
MQ Producer 将事务消息发送给MQ Server(Broker 服务器),这时的消息称为半消息,半消息是不能被消费者消费的。当MQ Server成功接收到MQ Producer发送的半消息,就会给MQ Producer返回ack确认消息,告诉MQ Producer半消息是否成功接收到。如果半消息发送成功,就可以执行本地事务了,这个本地事务一般是数据库事务,否则就不执行本地事务。本地事务执行以后,MQ Server根据本地事务的执行状态执行半消息的提交或者回滚,当本地事务执行成功时,半消息被提交变成正常的消息,能够被消费者消息,当本地事务执行失败时,半消息就会被删除。
2、事务消息的补偿
当本地事务执行以后,MQ Producer会将本地事务的执行状态告诉MQ Server,即上图4过程,但是如果这个过程的请求如果失败或者超时了,MQ Server并不知道本地事务的状态,所以MQ Server会发送消息告诉MQ Procuer回查一次本地事务的状态,MQ Procuer回查本地事务的状态以后告知MQ Server,从而决定半消息是提交还是回滚。这个回查的逻辑为业务方实现,告知本地事务的执行结果。
这就是RocketMQ的事务消息原理,通过两阶段提交实现,采用补偿的方式达到数据的最终一致性。
这里有一个点需要注意,就是半消息是如何做到对消费者不可见的?
如果让我们来做,我们可能会将消息先放在一个消息者不能消费的地方,消息者是根据topic消费的,只要将消息放到消费者没有订阅的topic的队列中,这样消费者就不能消费消息了。当本地事务成功执行以后,半消息要变成可以被消费者消费的消息,那么将消息放回原本topic的消费队列就可以了。
其实RocketMQ让半消息对消费者不可见的做法也是这样的,如果是半消息,将消息的topic替换成RMQ_SYS_TRANS_HALF_TOPIC,由于消费者未订阅该主题,所以就实现了半消息对消费者不可见。
RocketMQ事务消息使用
讲完了RocketMQ的原理,我们接下看看RocketMQ事务消息的使用:
public class TransactionProducer {
public static void main(String[] args) throws MQClientException, InterruptedException {
//事务的监听器
TransactionListener transactionListener = new TransactionListenerImpl();
//创建事务生产者
TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
//回查线程
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
//设置回查线程以及事务监听器
producer.setExecutorService(executorService);
producer.setTransactionListener(transactionListener);
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
//发送消息
for (int i = 0; i < 10; i++) {
try {
Message msg =
new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
//发送事务消息
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);
Thread.sleep(10);
} catch (MQClientException | UnsupportedEncodingException e) {
e.printStackTrace();
}
}
for (int i = 0; i < 100000; i++) {
Thread.sleep(1000);
}
producer.shutdown();
}
}
首先创建事务监听器TransactionListenerImpl、事务生产者TransactionMQProducer以及回查线程,然后设置事务监听器以及回查线程、启动生产者,最后使用sendMessageInTransaction方法发送事务消息了。这里设置事务监听器,事务监听器实现了TransactionListener接口:
public interface TransactionListener {
/**
* 当发送事务半消息成功,该方法将会执行本地事务
*/
LocalTransactionState executeLocalTransaction(final Message msg, final Object arg);
/**
* 当没有收到半消息的响应,broker将会发送回查消息检测事务的状态,以及该方法将会获取本地事务的状态
*/
LocalTransactionState checkLocalTransaction(final MessageExt msg);
}
TransactionListener有executeLocalTransaction和checkLocalTransaction方法,executeLocalTransaction方法是当发送事务半消息成功,该方法方法用来执行本地事务,checkLocalTransaction当没有收到发送半消息的响应,broker将会通过该方法回查本地事务的状态,从而决定半消息是提交还是回滚。
RocketMQ事务消息源码分析
分析了RocketMQ的事务消息原理以及RoketMQ事务消息的使用,接下来深入源码分析下面几个问题:
- 半消息如何做到对消费者不可见
- 本地事务什么时候执行,即executeLocalTransaction方法什么时候执行
- 半消息的提交以及回滚
- Broker什么时候触发checkLocalTransaction回查方法
半消息如何做到对消费者不可见
RocketMQ事务消息采用的是同步的发送方法发送事务消息的,由于消息的发送已经在RocketMQ源码之生产者发送消息分析中已经讲过,这里就不讲了。当Broker接收到半消息以后,将会解析半消息,方法如下:
//代码位置: org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge#parseHalfMessageInner
private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
//REAL_TOPIC 真实的topic
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
//REAL_QID 真实的QueueId
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
String.valueOf(msgInner.getQueueId()));
msgInner.setSysFlag(
MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
//重新设置topic和QueueId
msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
msgInner.setQueueId(0);
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
return msgInner;
}
parseHalfMessageInner将消息的真实topic和真实的QueueId保存在Property属性中,然后重新设置topic和QueueId,topic设置为RMQ_SYS_TRANS_HALF_TOPIC,QueueId设置为0,buildHalfTopic方法就是获取RMQ_SYS_TRANS_HALF_TOPIC,如下:
//代码位置:org.apache.rocketmq.broker.transaction.queue.TransactionalMessageUtil#buildHalfTopic
public static String buildHalfTopic() {
return MixAll.RMQ_SYS_TRANS_HALF_TOPIC;
}
上述将半消息的topic替换为RMQ_SYS_TRANS_HALF_TOPIC,因为消费者没有订阅RMQ_SYS_TRANS_HALF_TOPIC,所以半消息对消费者不可见,这就是半消息对消费者不可见的源码分析。
本地事务什么时候执行
当半消息发送以后,就等待消息发送的结果,然后就调用本地事务执行方法executeLocalTransaction方法:
//代码位置:org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendMessageInTransaction
public TransactionSendResult sendMessageInTransaction(final Message msg,
//省略代码
SendResult sendResult = null;
//设置事务标致
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
try {
//发送消息
sendResult = this.send(msg);
} catch (Exception e) {
throw new MQClientException("send message Exception", e);
}
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable localException = null;
//根据发送状态
switch (sendResult.getSendStatus()) {
case SEND_OK: {
try {
//省略代码
if (null != localTransactionExecuter) {
localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
} else if (transactionListener != null) {
log.debug("Used new transaction API");
//执行executeLocalTransaction方法
localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
}
if (null == localTransactionState) {
localTransactionState = LocalTransactionState.UNKNOW;
}
if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
log.info("executeLocalTransactionBranch return {}", localTransactionState);
log.info(msg.toString());
}
} catch (Throwable e) {
log.info("executeLocalTransactionBranch exception", e);
log.info(msg.toString());
localException = e;
}
}
break;
case FLUSH_DISK_TIMEOUT:
case FLUSH_SLAVE_TIMEOUT:
case SLAVE_NOT_AVAILABLE:
//其他状态则回滚消息
localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
break;
default:
break;
}
try {
//本地事务执行以后,告知broker服务器,半消息是提交还是回滚
this.endTransaction(sendResult, localTransactionState, localException);
} catch (Exception e) {
log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
}
//省略代码
}
sendMessageInTransaction方法发送事务消息,当发送以后会得到一个发送结果sendResult,根据发送结果的状态决定是否执行本地事务,当发送状态为SEND_OK时,执行本地事务方法executeLocalTransaction,并得到本地事务执行状态localTransactionState;当发送结果的状态是其他状态时,localTransactionState设置回滚消息,最后调用endTransaction方法通知Broker服务器,告知本地事务的执行状态,Broker服务器根据本地事务执行状态决定半消息是提交还是回滚。
半消息的提交以及回滚
当本地事务执行完成以后,就会告知Broker服务器本地事务执行的状态,调用的方法就是endTransaction,如下:
//代码位置:org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#endTransaction
public void endTransaction(
final SendResult sendResult,
final LocalTransactionState localTransactionState,
final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
final MessageId id;
//设置事务id
if (sendResult.getOffsetMsgId() != null) {
id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
} else {
id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
}
String transactionId = sendResult.getTransactionId();
//根据broker名字查找broker地址
final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
//事务结束请求头
EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
requestHeader.setTransactionId(transactionId);
requestHeader.setCommitLogOffset(id.getOffset());
//设置提交或者回滚标志
switch (localTransactionState) {
case COMMIT_MESSAGE: //提交消息
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
break;
case ROLLBACK_MESSAGE: //回滚消息
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
break;
case UNKNOW: //不知道
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
break;
default:
break;
}
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
requestHeader.setMsgId(sendResult.getMsgId());
String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
//调用单向的发送方法发送事务结束请求
this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
this.defaultMQProducer.getSendMsgTimeout());
}
endTransaction方法首先根据broker名字查找broker地址,然后再进行封装事务结束请求头,设置提交还是回滚的标志,然后调用单向的发送方法发送事务结束请求。当Broker服务器接收事务结束请求,将会调用processRequest处理请求:
//代码位置:org.apache.rocketmq.broker.processor.EndTransactionProcessor#processRequest
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws
RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
//解析事务结束请求头
final EndTransactionRequestHeader requestHeader =
(EndTransactionRequestHeader)request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);
LOGGER.debug("Transaction request:{}", requestHeader);
//如果是broker角色是SLAVE,则返回禁止结束事务的响应
if (BrokerRole.SLAVE == brokerController.getMessageStoreConfig().getBrokerRole()) {
response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
LOGGER.warn("Message store is slave mode, so end transaction is forbidden. ");
return response;
}
//如果是事务回查
if (requestHeader.getFromTransactionCheck()) {
switch (requestHeader.getCommitOrRollback()) {
//不是事务类型,打印告警信息,返回null
case MessageSysFlag.TRANSACTION_NOT_TYPE: {
LOGGER.warn("Check producer[{}] transaction state, but it's pending status."
+ "RequestHeader: {} Remark: {}",
RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
requestHeader.toString(),
request.getRemark());
return null;
}
//事务提交
case MessageSysFlag.TRANSACTION_COMMIT_TYPE: {
LOGGER.warn("Check producer[{}] transaction state, the producer commit the message."
+ "RequestHeader: {} Remark: {}",
RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
requestHeader.toString(),
request.getRemark());
break;
}
//事务回滚
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: {
LOGGER.warn("Check producer[{}] transaction state, the producer rollback the message."
+ "RequestHeader: {} Remark: {}",
RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
requestHeader.toString(),
request.getRemark());
break;
}
default:
return null;
}
} else {
switch (requestHeader.getCommitOrRollback()) {
//不是事务类型
case MessageSysFlag.TRANSACTION_NOT_TYPE: {
LOGGER.warn("The producer[{}] end transaction in sending message, and it's pending status."
+ "RequestHeader: {} Remark: {}",
RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
requestHeader.toString(),
request.getRemark());
return null;
}
//事务回滚
case MessageSysFlag.TRANSACTION_COMMIT_TYPE: {
break;
}
//事务回滚
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: {
LOGGER.warn("The producer[{}] end transaction in sending message, rollback the message."
+ "RequestHeader: {} Remark: {}",
RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
requestHeader.toString(),
request.getRemark());
break;
}
default:
return null;
}
}
//省略代码:做提交或者回滚操作
}
processRequest方法首先解析事务结束请求头,如果该broker角色是SLAVE,则返回禁止结束事务的响应。然后判断事务结束请求头中的回查标志,正常的事务结束请求的回查标志为false,根据事务结束请求头的提交回滚标志做不同的逻辑,这里提交和回滚其实并没有什么逻辑,打印日志或者什么都不做就退出switch,接下来则需要继续走提交和回滚的操作,代码如下:
//代码位置:org.apache.rocketmq.broker.processor.EndTransactionProcessor#processRequest
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws
RemotingCommandException {
//代码省略
//操作结果
OperationResult result = new OperationResult();
if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
//从commitLog中读出half消息
result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
//提交成功
if (result.getResponseCode() == ResponseCode.SUCCESS) {
//检查半消息是否合法
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
if (res.getCode() == ResponseCode.SUCCESS) {
//将半消息转成真实的原topic消息
MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED);
//将消息写入commitLog
RemotingCommand sendResult = sendFinalMessage(msgInner);
if (sendResult.getCode() == ResponseCode.SUCCESS) {
//real消息持久化以后,删除半消息
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
}
return sendResult;
}
return res;
}
} else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
//回滚消息
result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
if (result.getResponseCode() == ResponseCode.SUCCESS) {
RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
//回滚成功,删除半消息
if (res.getCode() == ResponseCode.SUCCESS) {
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
}
return res;
}
}
response.setCode(result.getResponseCode());
response.setRemark(result.getResponseRemark());
return response;
}
如果是事务类型是提交类型,则提交消息,消息提交成功以后,则检查半消息是否合法,如果合法,则把半消息还原为原来的消息,即将topic和queueId替换为真实的topic和queueId,最后将消息落盘持久化保存起来,持久化成功以后还需要将原来的半消息删除掉。因为消息的topic已经被替换成真实的topic,则消费者就可以消费此消息了;如果是事务回滚类型,则回滚消息,回滚消息成功以后删除半消息。endMessageTransaction方法里面替换了真实的topic和queueId,代码如下:
//代码位置:org.apache.rocketmq.broker.processor.EndTransactionProcessor#endMessageTransaction
private MessageExtBrokerInner endMessageTransaction(MessageExt msgExt) {
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
//替换为消息的真实topic和queueId
msgInner.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
msgInner.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));
//代码省略
}
Broker什么时候触发checkLocalTransaction回查方法
上述的分析是正常的事务消息的提交以及回滚,当Broker不能确定本地事务执行状态时,需要依靠回查确定本地事务状态确定消息提交还是回滚。Broker在启动的时候,会创建并启动事务回查服务TransactionalMessageCheckService线程,TransactionalMessageCheckService服务会每分钟进行回查。代码如下:
//代码位置:org.apache.rocketmq.broker.transaction.TransactionalMessageCheckService#run
public void run() {
log.info("Start transaction check service thread!");
//事务检测间隔
long checkInterval = brokerController.getBrokerConfig().getTransactionCheckInterval();
while (!this.isStopped()) {
this.waitForRunning(checkInterval);
}
log.info("End transaction check service thread!");
}
waitForRunning方法又调用onWaitEnd方法进行回查操作,代码如下:
//代码位置:org.apache.rocketmq.broker.transaction.TransactionalMessageCheckService#onWaitEnd
protected void onWaitEnd() {
//超时时间6秒
long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
//最大回查次数15
int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
long begin = System.currentTimeMillis();
log.info("Begin to check prepare message, begin time:{}", begin);
this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());
log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);
}
从onWaitEnd方法可知,回查的超时时间为6秒,最大的回查次数为15秒,如果15次回查还是无法得知事务状态,rocketmq默认回滚该消息。然后调用check方法回查,check方法最终会调用AbstractTransactionalMessageCheckListener的sendCheckMessage方法给生产者发送回查本地事务执行状态的方法,如下代码所示:
//代码位置:org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener#sendCheckMessage
public void sendCheckMessage(MessageExt msgExt) throws Exception {
//回查事务状态请求头
CheckTransactionStateRequestHeader checkTransactionStateRequestHeader = new CheckTransactionStateRequestHeader();
checkTransactionStateRequestHeader.setCommitLogOffset(msgExt.getCommitLogOffset());
checkTransactionStateRequestHeader.setOffsetMsgId(msgExt.getMsgId());
checkTransactionStateRequestHeader.setMsgId(msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
checkTransactionStateRequestHeader.setTransactionId(checkTransactionStateRequestHeader.getMsgId());
checkTransactionStateRequestHeader.setTranStateTableOffset(msgExt.getQueueOffset());
//替换成真实的topic和queueId
msgExt.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
//REAL_QID
msgExt.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));
msgExt.setStoreSize(0);
//PGROUP
String groupId = msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
//获取可用的连接
Channel channel = brokerController.getProducerManager().getAvaliableChannel(groupId);
if (channel != null) {
//发送回查请求
brokerController.getBroker2Client().checkProducerTransactionState(groupId, channel, checkTransactionStateRequestHeader, msgExt);
} else {
LOGGER.warn("Check transaction failed, channel is null. groupId={}", groupId);
}
}
sendCheckMessage方法首先构建回查事务状态请求头,将消息的topic和queueId替换成真实的topic和queueId,最后调用checkProducerTransactionState给生产者发送回查请求,请求码为CHECK_TRANSACTION_STATE(39)。这就是Broker给生产者发送回查请求的过程分析,接下来将分析当生产者接受到回查请求如何处理。
//代码位置:org.apache.rocketmq.client.impl.ClientRemotingProcessor#checkTransactionState
//检查本地事务状态
public RemotingCommand checkTransactionState(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
//检查事务状态请求头
final CheckTransactionStateRequestHeader requestHeader =
(CheckTransactionStateRequestHeader) request.decodeCommandCustomHeader(CheckTransactionStateRequestHeader.class);
final ByteBuffer byteBuffer = ByteBuffer.wrap(request.getBody());
//解码消息体
final MessageExt messageExt = MessageDecoder.decode(byteBuffer);
if (messageExt != null) {
if (StringUtils.isNotEmpty(this.mqClientFactory.getClientConfig().getNamespace())) {
messageExt.setTopic(NamespaceUtil
.withoutNamespace(messageExt.getTopic(), this.mqClientFactory.getClientConfig().getNamespace()));
}
//事务id
String transactionId = messageExt.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (null != transactionId && !"".equals(transactionId)) {
messageExt.setTransactionId(transactionId);
}
final String group = messageExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
if (group != null) {
//通过group选择内部生产者
MQProducerInner producer = this.mqClientFactory.selectProducer(group);
if (producer != null) {
//连接地址
final String addr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
//会查本地事务状态
producer.checkTransactionState(addr, messageExt, requestHeader);
} else {
log.debug("checkTransactionState, pick producer by group[{}] failed", group);
}
} else {
log.warn("checkTransactionState, pick producer group failed");
}
} else {
log.warn("checkTransactionState, decode message failed");
}
return null;
}
生产者收到Broker的回查本地事务的请求,会将请求交给上述的checkTransactionState方法处理。当接收到请求时,首先解析本地事务请求头,通过生产者组group找到生产者producer,然后执行生产者producer的checkTransactionState方法回查本地事务的状态,生产者producer的checkTransactionState方法如下:
//代码位置:org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#checkTransactionState
public void checkTransactionState(final String addr, final MessageExt msg,
final CheckTransactionStateRequestHeader header) {
Runnable request = //代码省略
this.checkExecutor.submit(request);
}
checkTransactionState方法首先创建Runnable,上述将创建Runnable的代码省略了,在下面将会具体讲解这部分代码。创建好Runnable以后,就交给线程池处理。接下来看看创建Runnable的代码:
Runnable request = new Runnable() {
private final String brokerAddr = addr;
private final MessageExt message = msg;
private final CheckTransactionStateRequestHeader checkRequestHeader = header;
private final String group = DefaultMQProducerImpl.this.defaultMQProducer.getProducerGroup();
@Override
public void run() {
//获取事务回查监听接口
TransactionCheckListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener();
TransactionListener transactionListener = getCheckListener();
//如果事务回查监听接口不为空
if (transactionCheckListener != null || transactionListener != null) {
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable exception = null;
try {
if (transactionCheckListener != null) {
//回查本地事务的状态
localTransactionState = transactionCheckListener.checkLocalTransactionState(message);
} else if (transactionListener != null) {
log.debug("Used new check API in transaction message");
localTransactionState = transactionListener.checkLocalTransaction(message);
} else {
log.warn("CheckTransactionState, pick transactionListener by group[{}] failed", group);
}
} catch (Throwable e) {
log.error("Broker call checkTransactionState, but checkLocalTransactionState exception", e);
exception = e;
}
this.processTransactionState(
localTransactionState,
group,
exception);
} else {
log.warn("CheckTransactionState, pick transactionCheckListener by group[{}] failed", group);
}
}
//处理本地事务状态,给Broker发送本地事务状态
private void processTransactionState(
final LocalTransactionState localTransactionState,
final String producerGroup,
final Throwable exception) {
//创建结束事务请求头
final EndTransactionRequestHeader thisHeader = new EndTransactionRequestHeader();
thisHeader.setCommitLogOffset(checkRequestHeader.getCommitLogOffset());
thisHeader.setProducerGroup(producerGroup);
thisHeader.setTranStateTableOffset(checkRequestHeader.getTranStateTableOffset());
thisHeader.setFromTransactionCheck(true);
String uniqueKey = message.getProperties().get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (uniqueKey == null) {
uniqueKey = message.getMsgId();
}
thisHeader.setMsgId(uniqueKey);
thisHeader.setTransactionId(checkRequestHeader.getTransactionId());
//根据本地不同的事务状态进行设置事务的类型
switch (localTransactionState) {
case COMMIT_MESSAGE:
thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
break;
case ROLLBACK_MESSAGE:
thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
log.warn("when broker check, client rollback this transaction, {}", thisHeader);
break;
case UNKNOW:
thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
log.warn("when broker check, client does not know this transaction state, {}", thisHeader);
break;
default:
break;
}
String remark = null;
if (exception != null) {
remark = "checkLocalTransactionState Exception: " + RemotingHelper.exceptionSimpleDesc(exception);
}
try {
DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark,
3000);
} catch (Exception e) {
log.error("endTransactionOneway exception", e);
}
}
};
Runnable类中有两个方法,一个是run,这个方法就是具体执行回查本地事务状态,processTransactionState方法就是将回查到本地事务的执行状态告知Broker服务器,run方法会调用processTransactionState方法。
run方法首先会获取本地事务回查接口TransactionCheckListener和TransactionListener,TransactionCheckListener是比较老的回查本地事务接口,已经废弃了,TransactionListener是比较新的接口。当TransactionCheckListener接口不等于null时,则调用TransactionCheckListener接口的checkLocalTransactionState方法回查本地事务状态,当TransactionListener不等于null时,则调用TransactionListener接口的checkLocalTransaction方法回查本地事务状态。
回查完本地事务状态以后,将本地事务状态作为参数传给processTransactionState方法,processTransactionState方法创建事务结束请求头,根据本地事务状态设置事务类型,然后调用endTransactionOneway方法告知Broker服务器,Broker服务器接收到事务结束请求,就交给上述processRequest方法处理,processRequest处理事务结束请求的逻辑已经分析过,这里就不分析了。