当前位置: 首页 > article >正文

RocketMQ源码分析之事务消息分析

rocketMQ事务消息原理概述

RocketMQ采用两阶段提交(2PC)的思想来实现事务消息,当事务消息失败或者超时,同时采用补偿的方式处理这个问题。这两个阶段分别为正常事务消息的发送与提交以及事务消息的补偿。我们看看官方文档给的事务消息的流程图:

在这里插入图片描述

1、事务消息的发送与提交

MQ Producer 将事务消息发送给MQ Server(Broker 服务器),这时的消息称为半消息,半消息是不能被消费者消费的。当MQ Server成功接收到MQ Producer发送的半消息,就会给MQ Producer返回ack确认消息,告诉MQ Producer半消息是否成功接收到。如果半消息发送成功,就可以执行本地事务了,这个本地事务一般是数据库事务,否则就不执行本地事务。本地事务执行以后,MQ Server根据本地事务的执行状态执行半消息的提交或者回滚,当本地事务执行成功时,半消息被提交变成正常的消息,能够被消费者消息,当本地事务执行失败时,半消息就会被删除。

2、事务消息的补偿

当本地事务执行以后,MQ Producer会将本地事务的执行状态告诉MQ Server,即上图4过程,但是如果这个过程的请求如果失败或者超时了,MQ Server并不知道本地事务的状态,所以MQ Server会发送消息告诉MQ Procuer回查一次本地事务的状态,MQ Procuer回查本地事务的状态以后告知MQ Server,从而决定半消息是提交还是回滚。这个回查的逻辑为业务方实现,告知本地事务的执行结果。

这就是RocketMQ的事务消息原理,通过两阶段提交实现,采用补偿的方式达到数据的最终一致性。

这里有一个点需要注意,就是半消息是如何做到对消费者不可见的?

如果让我们来做,我们可能会将消息先放在一个消息者不能消费的地方,消息者是根据topic消费的,只要将消息放到消费者没有订阅的topic的队列中,这样消费者就不能消费消息了。当本地事务成功执行以后,半消息要变成可以被消费者消费的消息,那么将消息放回原本topic的消费队列就可以了。

其实RocketMQ让半消息对消费者不可见的做法也是这样的,如果是半消息,将消息的topic替换成RMQ_SYS_TRANS_HALF_TOPIC,由于消费者未订阅该主题,所以就实现了半消息对消费者不可见。

RocketMQ事务消息使用

讲完了RocketMQ的原理,我们接下看看RocketMQ事务消息的使用:

public class TransactionProducer {

   public static void main(String[] args) throws MQClientException, InterruptedException {
       //事务的监听器
       TransactionListener transactionListener = new TransactionListenerImpl();
       //创建事务生产者
       TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
       //回查线程
       ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
           @Override
           public Thread newThread(Runnable r) {
               Thread thread = new Thread(r);
               thread.setName("client-transaction-msg-check-thread");
               return thread;
           }
       });
       //设置回查线程以及事务监听器
       producer.setExecutorService(executorService);
       producer.setTransactionListener(transactionListener);
       producer.start();
       String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
       //发送消息
       for (int i = 0; i < 10; i++) {
           try {
               Message msg =
                   new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
                       ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
               //发送事务消息
               SendResult sendResult = producer.sendMessageInTransaction(msg, null);
               System.out.printf("%s%n", sendResult);
               Thread.sleep(10);
           } catch (MQClientException | UnsupportedEncodingException e) {
               e.printStackTrace();
           }
       }
       for (int i = 0; i < 100000; i++) {
           Thread.sleep(1000);
       }
       producer.shutdown();
   }
}

首先创建事务监听器TransactionListenerImpl、事务生产者TransactionMQProducer以及回查线程,然后设置事务监听器以及回查线程、启动生产者,最后使用sendMessageInTransaction方法发送事务消息了。这里设置事务监听器,事务监听器实现了TransactionListener接口:

public interface TransactionListener {
    /**
     * 当发送事务半消息成功,该方法将会执行本地事务
     */
    LocalTransactionState executeLocalTransaction(final Message msg, final Object arg);

    /**
     * 当没有收到半消息的响应,broker将会发送回查消息检测事务的状态,以及该方法将会获取本地事务的状态
     */
    LocalTransactionState checkLocalTransaction(final MessageExt msg);
}

TransactionListener有executeLocalTransaction和checkLocalTransaction方法,executeLocalTransaction方法是当发送事务半消息成功,该方法方法用来执行本地事务,checkLocalTransaction当没有收到发送半消息的响应,broker将会通过该方法回查本地事务的状态,从而决定半消息是提交还是回滚。

RocketMQ事务消息源码分析

分析了RocketMQ的事务消息原理以及RoketMQ事务消息的使用,接下来深入源码分析下面几个问题:

  • 半消息如何做到对消费者不可见
  • 本地事务什么时候执行,即executeLocalTransaction方法什么时候执行
  • 半消息的提交以及回滚
  • Broker什么时候触发checkLocalTransaction回查方法

半消息如何做到对消费者不可见

RocketMQ事务消息采用的是同步的发送方法发送事务消息的,由于消息的发送已经在RocketMQ源码之生产者发送消息分析中已经讲过,这里就不讲了。当Broker接收到半消息以后,将会解析半消息,方法如下:

//代码位置: org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge#parseHalfMessageInner
private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
        //REAL_TOPIC 真实的topic
        MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
        //REAL_QID 真实的QueueId
        MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
            String.valueOf(msgInner.getQueueId()));
        msgInner.setSysFlag(
            MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
        //重新设置topic和QueueId
        msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
        msgInner.setQueueId(0);
        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
        return msgInner;
}

parseHalfMessageInner将消息的真实topic和真实的QueueId保存在Property属性中,然后重新设置topic和QueueId,topic设置为RMQ_SYS_TRANS_HALF_TOPIC,QueueId设置为0,buildHalfTopic方法就是获取RMQ_SYS_TRANS_HALF_TOPIC,如下:

//代码位置:org.apache.rocketmq.broker.transaction.queue.TransactionalMessageUtil#buildHalfTopic
public static String buildHalfTopic() {
        return MixAll.RMQ_SYS_TRANS_HALF_TOPIC;
}

上述将半消息的topic替换为RMQ_SYS_TRANS_HALF_TOPIC,因为消费者没有订阅RMQ_SYS_TRANS_HALF_TOPIC,所以半消息对消费者不可见,这就是半消息对消费者不可见的源码分析。

本地事务什么时候执行

当半消息发送以后,就等待消息发送的结果,然后就调用本地事务执行方法executeLocalTransaction方法:

//代码位置:org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendMessageInTransaction
public TransactionSendResult sendMessageInTransaction(final Message msg,

        //省略代码
        SendResult sendResult = null;
        //设置事务标致
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
        try {
            //发送消息
            sendResult = this.send(msg);
        } catch (Exception e) {
            throw new MQClientException("send message Exception", e);
        }

        LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
        Throwable localException = null;
        //根据发送状态                                              
        switch (sendResult.getSendStatus()) {
            case SEND_OK: {
                try {

                    //省略代码
                    if (null != localTransactionExecuter) {
                        localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
                    } else if (transactionListener != null) {
                        log.debug("Used new transaction API");
                        //执行executeLocalTransaction方法
                        localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
                    }
                    if (null == localTransactionState) {
                        localTransactionState = LocalTransactionState.UNKNOW;
                    }

                    if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                        log.info("executeLocalTransactionBranch return {}", localTransactionState);
                        log.info(msg.toString());
                    }
                } catch (Throwable e) {
                    log.info("executeLocalTransactionBranch exception", e);
                    log.info(msg.toString());
                    localException = e;
                }
            }
            break;
            case FLUSH_DISK_TIMEOUT:
            case FLUSH_SLAVE_TIMEOUT:
            case SLAVE_NOT_AVAILABLE:
                //其他状态则回滚消息
                localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
                break;
            default:
                break;
        }

        try {
            //本地事务执行以后,告知broker服务器,半消息是提交还是回滚
            this.endTransaction(sendResult, localTransactionState, localException);
        } catch (Exception e) {
            log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
        }

        //省略代码
}

sendMessageInTransaction方法发送事务消息,当发送以后会得到一个发送结果sendResult,根据发送结果的状态决定是否执行本地事务,当发送状态为SEND_OK时,执行本地事务方法executeLocalTransaction,并得到本地事务执行状态localTransactionState;当发送结果的状态是其他状态时,localTransactionState设置回滚消息,最后调用endTransaction方法通知Broker服务器,告知本地事务的执行状态,Broker服务器根据本地事务执行状态决定半消息是提交还是回滚。

半消息的提交以及回滚

当本地事务执行完成以后,就会告知Broker服务器本地事务执行的状态,调用的方法就是endTransaction,如下:

//代码位置:org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#endTransaction
public void endTransaction(
        final SendResult sendResult,
        final LocalTransactionState localTransactionState,
        final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
        final MessageId id;
        //设置事务id
        if (sendResult.getOffsetMsgId() != null) {
            id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
        } else {
            id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
        }
        String transactionId = sendResult.getTransactionId();
        //根据broker名字查找broker地址
        final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());

        //事务结束请求头
        EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
        requestHeader.setTransactionId(transactionId);
        requestHeader.setCommitLogOffset(id.getOffset());
        //设置提交或者回滚标志
        switch (localTransactionState) {
            case COMMIT_MESSAGE: //提交消息
                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
                break;
            case ROLLBACK_MESSAGE: //回滚消息
                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
                break;
            case UNKNOW: //不知道
                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
                break;
            default:
                break;
        }

        requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
        requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
        requestHeader.setMsgId(sendResult.getMsgId());
        String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
        //调用单向的发送方法发送事务结束请求
        this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
            this.defaultMQProducer.getSendMsgTimeout());
}

endTransaction方法首先根据broker名字查找broker地址,然后再进行封装事务结束请求头,设置提交还是回滚的标志,然后调用单向的发送方法发送事务结束请求。当Broker服务器接收事务结束请求,将会调用processRequest处理请求:

//代码位置:org.apache.rocketmq.broker.processor.EndTransactionProcessor#processRequest
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws
        RemotingCommandException {
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
        //解析事务结束请求头
        final EndTransactionRequestHeader requestHeader =
            (EndTransactionRequestHeader)request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);
        LOGGER.debug("Transaction request:{}", requestHeader);
        //如果是broker角色是SLAVE,则返回禁止结束事务的响应
        if (BrokerRole.SLAVE == brokerController.getMessageStoreConfig().getBrokerRole()) {
            response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
            LOGGER.warn("Message store is slave mode, so end transaction is forbidden. ");
            return response;
        }

        //如果是事务回查
        if (requestHeader.getFromTransactionCheck()) {
            switch (requestHeader.getCommitOrRollback()) {
                //不是事务类型,打印告警信息,返回null
                case MessageSysFlag.TRANSACTION_NOT_TYPE: {
                    LOGGER.warn("Check producer[{}] transaction state, but it's pending status."
                            + "RequestHeader: {} Remark: {}",
                        RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                        requestHeader.toString(),
                        request.getRemark());
                    return null;
                }

                //事务提交
                case MessageSysFlag.TRANSACTION_COMMIT_TYPE: {
                    LOGGER.warn("Check producer[{}] transaction state, the producer commit the message."
                            + "RequestHeader: {} Remark: {}",
                        RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                        requestHeader.toString(),
                        request.getRemark());

                    break;
                }

                //事务回滚
                case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: {
                    LOGGER.warn("Check producer[{}] transaction state, the producer rollback the message."
                            + "RequestHeader: {} Remark: {}",
                        RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                        requestHeader.toString(),
                        request.getRemark());
                    break;
                }
                default:
                    return null;
            }
        } else {
            switch (requestHeader.getCommitOrRollback()) {
                //不是事务类型
                case MessageSysFlag.TRANSACTION_NOT_TYPE: {
                    LOGGER.warn("The producer[{}] end transaction in sending message,  and it's pending status."
                            + "RequestHeader: {} Remark: {}",
                        RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                        requestHeader.toString(),
                        request.getRemark());
                    return null;
                }

                 //事务回滚
                case MessageSysFlag.TRANSACTION_COMMIT_TYPE: {
                    break;
                }
                //事务回滚
                case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: {
                    LOGGER.warn("The producer[{}] end transaction in sending message, rollback the message."
                            + "RequestHeader: {} Remark: {}",
                        RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                        requestHeader.toString(),
                        request.getRemark());
                    break;
                }
                default:
                    return null;
            }
        }
        //省略代码:做提交或者回滚操作
}

processRequest方法首先解析事务结束请求头,如果该broker角色是SLAVE,则返回禁止结束事务的响应。然后判断事务结束请求头中的回查标志,正常的事务结束请求的回查标志为false,根据事务结束请求头的提交回滚标志做不同的逻辑,这里提交和回滚其实并没有什么逻辑,打印日志或者什么都不做就退出switch,接下来则需要继续走提交和回滚的操作,代码如下:

//代码位置:org.apache.rocketmq.broker.processor.EndTransactionProcessor#processRequest
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws
        RemotingCommandException {

        //代码省略

        //操作结果
        OperationResult result = new OperationResult();
        if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
            //从commitLog中读出half消息
            result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
            //提交成功
            if (result.getResponseCode() == ResponseCode.SUCCESS) {
                //检查半消息是否合法
                RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
                if (res.getCode() == ResponseCode.SUCCESS) {
                    //将半消息转成真实的原topic消息
                    MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
                    msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
                    msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
                    msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
                    msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
                    MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED);
                    //将消息写入commitLog
                    RemotingCommand sendResult = sendFinalMessage(msgInner);
                    if (sendResult.getCode() == ResponseCode.SUCCESS) {
                        //real消息持久化以后,删除半消息
                        this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
                    }
                    return sendResult;
                }
                return res;
            }
        } else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
            //回滚消息
            result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
            if (result.getResponseCode() == ResponseCode.SUCCESS) {
                RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
                //回滚成功,删除半消息
                if (res.getCode() == ResponseCode.SUCCESS) {
                    this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
                }
                return res;
            }
        }
        response.setCode(result.getResponseCode());
        response.setRemark(result.getResponseRemark());
        return response;
}

如果是事务类型是提交类型,则提交消息,消息提交成功以后,则检查半消息是否合法,如果合法,则把半消息还原为原来的消息,即将topic和queueId替换为真实的topic和queueId,最后将消息落盘持久化保存起来,持久化成功以后还需要将原来的半消息删除掉。因为消息的topic已经被替换成真实的topic,则消费者就可以消费此消息了;如果是事务回滚类型,则回滚消息,回滚消息成功以后删除半消息。endMessageTransaction方法里面替换了真实的topic和queueId,代码如下:

//代码位置:org.apache.rocketmq.broker.processor.EndTransactionProcessor#endMessageTransaction
private MessageExtBrokerInner endMessageTransaction(MessageExt msgExt) {

        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
        //替换为消息的真实topic和queueId
        msgInner.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
        msgInner.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));

        //代码省略

}

Broker什么时候触发checkLocalTransaction回查方法

上述的分析是正常的事务消息的提交以及回滚,当Broker不能确定本地事务执行状态时,需要依靠回查确定本地事务状态确定消息提交还是回滚。Broker在启动的时候,会创建并启动事务回查服务TransactionalMessageCheckService线程,TransactionalMessageCheckService服务会每分钟进行回查。代码如下:

//代码位置:org.apache.rocketmq.broker.transaction.TransactionalMessageCheckService#run
public void run() {
        log.info("Start transaction check service thread!");
        //事务检测间隔
        long checkInterval = brokerController.getBrokerConfig().getTransactionCheckInterval();
        while (!this.isStopped()) {
            this.waitForRunning(checkInterval);
        }
        log.info("End transaction check service thread!");
}

waitForRunning方法又调用onWaitEnd方法进行回查操作,代码如下:

//代码位置:org.apache.rocketmq.broker.transaction.TransactionalMessageCheckService#onWaitEnd
protected void onWaitEnd() {
        //超时时间6秒
        long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
        //最大回查次数15
        int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
        long begin = System.currentTimeMillis();
        log.info("Begin to check prepare message, begin time:{}", begin);
        this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());
        log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);
}

从onWaitEnd方法可知,回查的超时时间为6秒,最大的回查次数为15秒,如果15次回查还是无法得知事务状态,rocketmq默认回滚该消息。然后调用check方法回查,check方法最终会调用AbstractTransactionalMessageCheckListener的sendCheckMessage方法给生产者发送回查本地事务执行状态的方法,如下代码所示:

//代码位置:org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener#sendCheckMessage
public void sendCheckMessage(MessageExt msgExt) throws Exception {
        //回查事务状态请求头
        CheckTransactionStateRequestHeader checkTransactionStateRequestHeader = new CheckTransactionStateRequestHeader();
        checkTransactionStateRequestHeader.setCommitLogOffset(msgExt.getCommitLogOffset());
        checkTransactionStateRequestHeader.setOffsetMsgId(msgExt.getMsgId());
        checkTransactionStateRequestHeader.setMsgId(msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
        checkTransactionStateRequestHeader.setTransactionId(checkTransactionStateRequestHeader.getMsgId());
        checkTransactionStateRequestHeader.setTranStateTableOffset(msgExt.getQueueOffset());
        //替换成真实的topic和queueId
        msgExt.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
        //REAL_QID
        msgExt.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));
        msgExt.setStoreSize(0);
        //PGROUP
        String groupId = msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
        //获取可用的连接
        Channel channel = brokerController.getProducerManager().getAvaliableChannel(groupId);
        if (channel != null) {
            //发送回查请求
            brokerController.getBroker2Client().checkProducerTransactionState(groupId, channel, checkTransactionStateRequestHeader, msgExt);
        } else {
            LOGGER.warn("Check transaction failed, channel is null. groupId={}", groupId);
        }
}

sendCheckMessage方法首先构建回查事务状态请求头,将消息的topic和queueId替换成真实的topic和queueId,最后调用checkProducerTransactionState给生产者发送回查请求,请求码为CHECK_TRANSACTION_STATE(39)。这就是Broker给生产者发送回查请求的过程分析,接下来将分析当生产者接受到回查请求如何处理。

//代码位置:org.apache.rocketmq.client.impl.ClientRemotingProcessor#checkTransactionState
//检查本地事务状态
public RemotingCommand checkTransactionState(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
        //检查事务状态请求头
        final CheckTransactionStateRequestHeader requestHeader =
            (CheckTransactionStateRequestHeader) request.decodeCommandCustomHeader(CheckTransactionStateRequestHeader.class);
        final ByteBuffer byteBuffer = ByteBuffer.wrap(request.getBody());
        //解码消息体
        final MessageExt messageExt = MessageDecoder.decode(byteBuffer);
        if (messageExt != null) {
            if (StringUtils.isNotEmpty(this.mqClientFactory.getClientConfig().getNamespace())) {
                messageExt.setTopic(NamespaceUtil
                    .withoutNamespace(messageExt.getTopic(), this.mqClientFactory.getClientConfig().getNamespace()));
            }
            //事务id
            String transactionId = messageExt.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
            if (null != transactionId && !"".equals(transactionId)) {
                messageExt.setTransactionId(transactionId);
            }
            final String group = messageExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
            if (group != null) {
                //通过group选择内部生产者
                MQProducerInner producer = this.mqClientFactory.selectProducer(group);
                if (producer != null) {
                    //连接地址
                    final String addr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
                    //会查本地事务状态
                    producer.checkTransactionState(addr, messageExt, requestHeader);
                } else {
                    log.debug("checkTransactionState, pick producer by group[{}] failed", group);
                }
            } else {
                log.warn("checkTransactionState, pick producer group failed");
            }
        } else {
            log.warn("checkTransactionState, decode message failed");
        }

        return null;
}

生产者收到Broker的回查本地事务的请求,会将请求交给上述的checkTransactionState方法处理。当接收到请求时,首先解析本地事务请求头,通过生产者组group找到生产者producer,然后执行生产者producer的checkTransactionState方法回查本地事务的状态,生产者producer的checkTransactionState方法如下:

//代码位置:org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#checkTransactionState
public void checkTransactionState(final String addr, final MessageExt msg,
        final CheckTransactionStateRequestHeader header) {
        Runnable request = //代码省略
        this.checkExecutor.submit(request);
 }

checkTransactionState方法首先创建Runnable,上述将创建Runnable的代码省略了,在下面将会具体讲解这部分代码。创建好Runnable以后,就交给线程池处理。接下来看看创建Runnable的代码:

Runnable request = new Runnable() {
            private final String brokerAddr = addr;
            private final MessageExt message = msg;
            private final CheckTransactionStateRequestHeader checkRequestHeader = header;
            private final String group = DefaultMQProducerImpl.this.defaultMQProducer.getProducerGroup();

            @Override
            public void run() {
                //获取事务回查监听接口
                TransactionCheckListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener();
                TransactionListener transactionListener = getCheckListener();
                //如果事务回查监听接口不为空
                if (transactionCheckListener != null || transactionListener != null) {
                    LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
                    Throwable exception = null;
                    try {
                        if (transactionCheckListener != null) {
                            //回查本地事务的状态
                            localTransactionState = transactionCheckListener.checkLocalTransactionState(message);
                        } else if (transactionListener != null) {
                            log.debug("Used new check API in transaction message");
                            localTransactionState = transactionListener.checkLocalTransaction(message);
                        } else {
                            log.warn("CheckTransactionState, pick transactionListener by group[{}] failed", group);
                        }
                    } catch (Throwable e) {
                        log.error("Broker call checkTransactionState, but checkLocalTransactionState exception", e);
                        exception = e;
                    }

                    this.processTransactionState(
                        localTransactionState,
                        group,
                        exception);
                } else {
                    log.warn("CheckTransactionState, pick transactionCheckListener by group[{}] failed", group);
                }
            }

            //处理本地事务状态,给Broker发送本地事务状态
            private void processTransactionState(
                final LocalTransactionState localTransactionState,
                final String producerGroup,
                final Throwable exception) {
                //创建结束事务请求头
                final EndTransactionRequestHeader thisHeader = new EndTransactionRequestHeader();
                thisHeader.setCommitLogOffset(checkRequestHeader.getCommitLogOffset());
                thisHeader.setProducerGroup(producerGroup);
                thisHeader.setTranStateTableOffset(checkRequestHeader.getTranStateTableOffset());
                thisHeader.setFromTransactionCheck(true);

                String uniqueKey = message.getProperties().get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
                if (uniqueKey == null) {
                    uniqueKey = message.getMsgId();
                }
                thisHeader.setMsgId(uniqueKey);
                thisHeader.setTransactionId(checkRequestHeader.getTransactionId());
                //根据本地不同的事务状态进行设置事务的类型
                switch (localTransactionState) {
                    case COMMIT_MESSAGE:
                        thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
                        break;
                    case ROLLBACK_MESSAGE:
                        thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
                        log.warn("when broker check, client rollback this transaction, {}", thisHeader);
                        break;
                    case UNKNOW:
                        thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
                        log.warn("when broker check, client does not know this transaction state, {}", thisHeader);
                        break;
                    default:
                        break;
                }

                String remark = null;
                if (exception != null) {
                    remark = "checkLocalTransactionState Exception: " + RemotingHelper.exceptionSimpleDesc(exception);
                }

                try {
                    DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark,
                        3000);
                } catch (Exception e) {
                    log.error("endTransactionOneway exception", e);
                }
            }
};

Runnable类中有两个方法,一个是run,这个方法就是具体执行回查本地事务状态,processTransactionState方法就是将回查到本地事务的执行状态告知Broker服务器,run方法会调用processTransactionState方法。

run方法首先会获取本地事务回查接口TransactionCheckListener和TransactionListener,TransactionCheckListener是比较老的回查本地事务接口,已经废弃了,TransactionListener是比较新的接口。当TransactionCheckListener接口不等于null时,则调用TransactionCheckListener接口的checkLocalTransactionState方法回查本地事务状态,当TransactionListener不等于null时,则调用TransactionListener接口的checkLocalTransaction方法回查本地事务状态。

回查完本地事务状态以后,将本地事务状态作为参数传给processTransactionState方法,processTransactionState方法创建事务结束请求头,根据本地事务状态设置事务类型,然后调用endTransactionOneway方法告知Broker服务器,Broker服务器接收到事务结束请求,就交给上述processRequest方法处理,processRequest处理事务结束请求的逻辑已经分析过,这里就不分析了。


http://www.kler.cn/a/509963.html

相关文章:

  • TextButton组件的功能与用法
  • CV 图像处理基础笔记大全(超全版哦~)!!!
  • 【Linux】【Vim】vim编辑器的用法
  • Html5 video标签学习
  • MySQL 事务
  • Kafka常用命令
  • kubernetes v1.29.XX版本HPA、KPA、VPA并压力测试
  • Json转换类型报错问题:java.lang.Integer cannot be cast to java.math.BigDecimal
  • 记录一次关于spring映射postgresql的jsonb类型的转化器事故,并使用hutool的JSONArray完成映射
  • Leetcode - 周赛432
  • leetcode34-排序数组中查找数组的第一个和最后一个位置
  • Learning Prompt
  • Kubernetes (K8s) 权限管理指南
  • 【Linux】15.Linux进程概念(4)
  • linux 安装jdk1.8
  • 【脑机接口数据处理】bdf文件转化mat文件
  • AI Prompt 设计指南:从基础构建到高质量生成的全面解析
  • h5使用video播放时关掉vant弹窗视频声音还在后台播放
  • Centos7将/dev/mapper/centos-home磁盘空间转移到/dev/mapper/centos-root
  • 分布式CAP理论介绍
  • Dart语言
  • 计算机视觉语义分割——U-Net(Convolutional Networks for Biomedical Image Segmentation)
  • 【视觉惯性SLAM:十六、 ORB-SLAM3 中的多地图系统】
  • 深入探索Go语言中的临时对象池:sync.Pool
  • Vue2.0的安装
  • K210视觉识别模块