SpringCloud 微服务消息队列灰度方案 (RocketMQ 4.x)
目录
- 背景
- 遇到的问题
- RocketMQ 基础
- 基础消息模型
- 扩展后的消息模型
- 部署模型
- 相关概念点
- 方案对比
- 影子Topic的方案
- Tag的方案
- UserProperty的方案
- 影子Group的方案
- 灰度分区的方案
- 方案对比
- 灰度分区方案设计
- 适配只有部分灰度的情况所做的功能扩展
- 消费者(无灰度)
- 消费者(有灰度)
- 改造流程
- 生产者改造逻辑
- 消费者改造逻辑
- 消费者自定义负载均衡
- MQ注册ClientID修改
- 常量配置类
- 链路信息传递`BasicMDC`类
- MessageStorage类
- 拦截配置
- 消费者拦截
- 生产者拦截
- 请求拦截
- 验证过程
- 消费者(未启动灰度订阅)
- 消费者(灰度订阅)
- 验证消息链路
- 发送端
- 消费端
- 发送灰度消息
背景
- 我们公司团队为了更好地控制版本发布的影响范围,自研了灰度发布流程,目前已经支持
http
、gRPC
等接口调用方式进行灰度
流量转发,但是消息队列
基于业务实现展示不支持。 - 参考了网上很多灰度方案博文,比较热门的vivo
鲁班RocketMQ
的文章,但是涉及到的改造范围都是比较大,最后在看到关于Asyncer关于灰度分区 的博文,整体比较巧妙的利用MQ的原有机制,改动相对少了很多
遇到的问题
如上图所示普通的业务灰度流程,保证了RPC服务之间的调用灰度,但是当消息从服务发出到消息队列后,队列分区是被均分到正常服务
和灰度服务
监听的;这样会导致正常服务消费到灰度消息;同时灰度服务消费到正常的消息;所以MQ灰度是目前需要解决的问题,这样才能完成整个灰度链路;
RocketMQ 基础
基础消息模型
扩展后的消息模型
- 相同的ConsumerGroup下的消费者主要有两种负载均衡模式,即广播模式,和集群模式(图中是最常用的集群模式)。
- 在集群模式下,同一个 ConsumerGroup 中的 Consumer 实例是负载均衡消费,如图中 ConsumerGroupA 订阅 TopicA,TopicA 对应 3个队列,则 GroupA 中的 Consumer1 消费的是 MessageQueue 0和 MessageQueue 1的消息,Consumer2是消费的是MessageQueue2的消息。
- 在广播模式下,同一个 ConsumerGroup 中的每个 Consumer 实例都处理全部的队列。需要注意的是,广播模式下因为每个 Consumer 实例都需要处理全部的消息,因此这种模式仅推荐在通知推送、配置同步类小流量场景使用。
部署模型
Producer、Consumer又是如何找到Topic和Broker的地址呢?消息的具体发送和接收又是怎么进行的呢?
RocketMQ 部署架构上主要分为四部分:
-
生产者 Producer
- 发布消息的角色。Producer通过 MQ 的负载均衡模块选择相应的 Broker 集群队列进行消息投递,投递的过程支持快速失败和重试。
-
消费者 Consumer
- 消息消费的角色
- 支持以推(push),拉(pull)两种模式对消息进行消费。
- 同时也支持集群方式和广播方式的消费。
- 提供实时消息订阅机制,可以满足大多数用户的需求。
-
名字服务器 NameServer
- NameServer是一个简单的 Topic 路由注册中心,支持 Topic、Broker 的动态注册与发现。
- Broker管理,NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker是否还存活;
- 路由信息管理,每个NameServer将保存关于 Broker 集群的整个路由信息和用于客户端查询的队列信息。Producer和Consumer通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的投递和消费。
- NameServer是一个简单的 Topic 路由注册中心,支持 Topic、Broker 的动态注册与发现。
-
代理服务器 Broker
- Broker主要负责消息的存储、投递和查询以及服务高可用保证。
- NameServer几乎无状态节点,因此可集群部署,节点之间无任何信息同步。Broker部署相对复杂。
- 每个 Broker 与 NameServer 集群中的所有节点建立长连接,定时注册 Topic 信息到所有 NameServer。
- Producer 与 NameServer 集群中的其中一个节点建立长连接,定期从 NameServer 获取Topic路由信息,并向提供 Topic 服务的 Master 建立长连接,且定时向 Master 发送心跳。Producer 完全无状态。
- Consumer 与 NameServer 集群中的其中一个节点建立长连接,定期从 NameServer 获取 Topic 路由信息,并向提供 Topic 服务的 Master、Slave 建立长连接,且定时向 Master、Slave发送心跳。Consumer 既可以从 Master 订阅消息,也可以从Slave订阅消息。
相关概念点
CommitLog
:消息体实际存储的地方,当我们发送的任一业务消息的时候,它最终会存储在commitLog上。MQ在Broker进行集群部署(这里为也简洁,不涉及主从部分)时,同一业务消息只会落到集群的某一个Broker节点上。而这个Broker上的commitLog就会存储所有Topic路由到它的消息,当消息数据量到达1个G后会重新生成一个新的commitLog。Topic
:消息主题,表示一类消息的逻辑集合。每条消息只属于一个Topic,Topic中包含多条消息,是MQ进行消息发送订阅的基本单位。属于一级消息类型,偏重于业务逻辑设计。Tag
:消息标签,二级消息类型,每一个具体的消息都可以选择性地附带一个Tag,用于区分同一个Topic中的消息类型Queue
:实际上Topic更像是一个逻辑概念供我们使用,在源码层级看,Topic以Queue的形式分布在多个Broker上,一个topic往往包含多条Queue消费组及其ID
:表示一类Producer或Consumer,这类Producer或Consumer通常生产或消费同应用域的消息,且消息生产与消费的逻辑一致。每个消费组可以定义全局维一的GroupID来标识,由它来代表消费组。不同的消费组在消费时互相隔离,不会影响彼此的消费位点计算。
方案对比
影子Topic的方案
- 新建一系列新的Topic来处理隔离灰度的消息
- 例如对于TOPIC_ORDER会创建TOPIC_ORDER_GRAY来给灰度环境使用。
- 发送方在发送时,对灰度的消息写入到影子Topic中。消费时,灰度环境只使用灰度的分组订阅灰度的Topic。
Tag的方案
- 发送方在发送时,对灰度环境产生的消息的Tag加
灰度标识
。 - 消费方,每次灰度版本发布时只订阅
灰度Tag
的消息,正常的版本订阅非灰度的Tag。
UserProperty的方案
- 发送方在发送时,对灰度环境产生的消息的UserProperty加灰度标识。
- 消费方的客户端需要进行改写,根据UserProperty来过滤,正常的版本会跳过这类消息,灰度的版本会处理灰度消息。
- 流程与灰度Tag差不错
影子Group的方案
- 发送消息:灰度的服务发送带有灰度标识的消息
- 消费消息:灰度服务只消费灰度标识的消息
灰度分区的方案
- 发送者注册时会检测当前将是不是灰度节点,灰度节点MQ注册的clientId会添加标识
- 消费者订阅时会检测当前节点是不是灰度节点,灰度节点会走灰度的Queue分配策略
方案对比
方案 | 优点 | 缺点 | 成本 |
---|---|---|---|
影子Topic的方案 | 使用独立的两个TOPIC,订阅关系一致,改造比较容易 | 有临界问题,灰度切换生产时有可能会漏掉消息;同时需要根据生产、消费者关系维护对应的订阅关系;改造数据需求 | 需要维护两套消费组和TOPC,有维护成本 |
影子Group的方案 | 使用独立的两个GROUP,订阅关系一致 | 有临界问题;需要修改生产者、订阅者、DevOps改动范围较大;正常节点和灰度节点都需要知道当前服务的灰度状态,来做出对应的处理 | 需要维护两套GROUP的关系,维护成本高 |
Tag的方案 | 通过MQ提供的tag机制过滤,可以保证灰度的消息只会被灰度节点消费,改造简单; | 如果Tag参与的业务过滤,不适合该方案;如果没有灰度节点订阅关系不一致,会出现消息丢失 | 无 |
UserProperty的方案 | 同上 | 同上 | 同上 |
灰度分区的方案 | 订阅关系一致,不需要额外维护多一套TOPIC、GROUP; | 无(比较适合公司的一个运营模式) | 无 |
灰度分区方案设计
灰度分区主要基于以下几点:
DevOps
:灰度服务的Pod容器内部会有CANARY_RELEASE
:true
的环境变量。MQ客户端心跳上报
:源码中,RocketMQ客户端启动时会想向所有产生订阅关系的broker发送心跳,心跳中带有clientId
,该值主要由实例名、容器ip等组成,可以利用canary环境变量做一层额外的注入MQ客户端重平衡
:源码中,每隔20秒/客户端上下线,都会触发一次客户端重平衡;我们可以自定义该策略,加入灰度分区平衡逻辑,来实现灰度分区和正常分区的订阅MQ客户端发送方
:源码中,RocketMQ发送方每次发送消息都会轮询队列发送,同时加入重试和故障规避的策略,可以通过重写该类来做扩展。
适配只有部分灰度的情况所做的功能扩展
- 基于ThreadLocal来实现消费和发送的链路标识传递
- 拦截HTTP请求:通过拦截http请求识别请求是否灰度,基于ThreadLocal实现线程传递;
- 拦截MQ消息消费:通过拦截消费者消息,识别
UserProperty
识别存在灰度标识;更新到ThreadLocal中 - 拦截MQ生产者发送消息:获取
ThreadLocal
存储的当前线程的UserProperty
标识,写入到发送消息的UserProperty
中
- MQ客户端发送方:检测到环境和线程链路变量,做出对应的发送策略;从而来实现
灰度消息
通过正常应用
后,发送MQ
消息能被下级监听的灰度应用接收
;
消费者(无灰度)
消费者(有灰度)
改造流程
生产者改造逻辑
无论何时,只要发送方自己当前环境是灰度(CANARY_RELEASE=true)或者当前是灰度链路,则会最后两个分区作为灰度队列,否则选取其他分区发送,根据RocketMQ的源码,自定义发送策略即可实现。
import org.apache.rocketmq.client.common.ThreadLocalIndex;
import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
import org.apache.rocketmq.client.latency.LatencyFaultTolerance;
import org.apache.rocketmq.client.latency.LatencyFaultToleranceImpl;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.logging.InternalLogger;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
/**
* Custom fault strategy
*/
public class CustomMQFaultStrategy {
private final InternalLogger log = ClientLogger.getLog();
private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();
private boolean sendLatencyFaultEnable = false;
private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
private final ConcurrentHashMap<TopicPublishInfo, TopicPublishInfoCache> topicPublishInfoCacheTable = new ConcurrentHashMap<>();
public long[] getNotAvailableDuration() {
return notAvailableDuration;
}
public void setNotAvailableDuration(final long[] notAvailableDuration) {
this.notAvailableDuration = notAvailableDuration;
}
public long[] getLatencyMax() {
return latencyMax;
}
public void setLatencyMax(final long[] latencyMax) {
this.latencyMax = latencyMax;
}
public boolean isSendLatencyFaultEnable() {
return sendLatencyFaultEnable;
}
public void setSendLatencyFaultEnable(final boolean sendLatencyFaultEnable) {
this.sendLatencyFaultEnable = sendLatencyFaultEnable;
}
private TopicPublishInfoCache checkCacheChanged(TopicPublishInfo topicPublishInfo) {
if (topicPublishInfoCacheTable.containsKey(topicPublishInfo)) {
return topicPublishInfoCacheTable.get(topicPublishInfo);
}
synchronized (this) {
TopicPublishInfoCache cache = new TopicPublishInfoCache();
List<MessageQueue> canaryQueues = MessageStorage.getCanaryQueues(topicPublishInfo.getMessageQueueList());
List<MessageQueue> normalQueues = MessageStorage.getNormalQueues(topicPublishInfo.getMessageQueueList());
Collections.sort(canaryQueues);
Collections.sort(normalQueues);
cache.setCanaryQueueList(canaryQueues);
cache.setNormalQueueList(normalQueues);
topicPublishInfoCacheTable.putIfAbsent(topicPublishInfo, cache);
}
return topicPublishInfoCacheTable.get(topicPublishInfo);
}
/**
* Queue selection strategy
*/
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
List<MessageQueue> messageQueueList = tpInfo.getMessageQueueList();
TopicPublishInfoCache topicPublishInfoCache = checkCacheChanged(tpInfo);
if (MessageStorage.isCanaryRelease() || MessageStorage.isThreadCanaryRelease()) {
MessageQueue messageQueue = selectDefaultMessageQueue(tpInfo, lastBrokerName, topicPublishInfoCache.getCanaryQueueList());
log.debug("canary context,send message to canary queue:{}", messageQueue.getBrokerName() + messageQueue.getQueueId());
return messageQueue;
} else {
if (this.sendLatencyFaultEnable) {
try {
int index = tpInfo.getSendWhichQueue().incrementAndGet();
int size = topicPublishInfoCache.getNormalQueueList().size();
for (int i = 0; i < size; i++) {
int pos = Math.max(Math.abs(index++) % size, 0);
MessageQueue mq = topicPublishInfoCache.getNormalQueueList().get(pos);
if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
return mq;
}
}
final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
if (writeQueueNums > 0) {
final MessageQueue mq = tpInfo.selectOneMessageQueue();
if (!topicPublishInfoCache.getCanaryQueueList().contains(mq)) {
if (notBestBroker != null) {
mq.setBrokerName(notBestBroker);
mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);
}
return mq;
}
} else {
latencyFaultTolerance.remove(notBestBroker);
}
} catch (Exception e) {
log.error("Error occurred when selecting message queue", e);
}
}
return selectDefaultMessageQueue(tpInfo, lastBrokerName, topicPublishInfoCache.getNormalQueueList());
}
}
/**
* Default message queue selection strategy
*
* @param topicPublishInfo
* @param lastBrokerName
* @param queues
* @return {@link MessageQueue }
*/
private MessageQueue selectDefaultMessageQueue(final TopicPublishInfo topicPublishInfo, final String lastBrokerName,
List<MessageQueue> queues) {
ThreadLocalIndex sendWhichQueue = topicPublishInfo.getSendWhichQueue();
int size = queues.size();
if (lastBrokerName != null) {
for (int i = 0; i < size; i++) {
int index = sendWhichQueue.incrementAndGet();
int pos = Math.max(Math.abs(index) % size, 0);
MessageQueue mq = queues.get(pos);
if (!mq.getBrokerName().equals(lastBrokerName)) {
return mq;
}
}
}
int i = sendWhichQueue.incrementAndGet();
int res = Math.max(Math.abs(i) % size, 0);
log.debug("selectDefaultMessageQueue, lastBrokerName:{}, res:{}", lastBrokerName, topicPublishInfo.getMessageQueueList().get(res));
return queues.get(res);
}
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
if (this.sendLatencyFaultEnable) {
long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
}
}
private long computeNotAvailableDuration(final long currentLatency) {
for (int i = latencyMax.length - 1; i >= 0; i--) {
if (currentLatency >= latencyMax[i]) {
return this.notAvailableDuration[i];
}
}
return 0;
}
/**
* Cache of topic publish info
*/
private static class TopicPublishInfoCache {
/**
* Grayscale message queue list
*/
private List<MessageQueue> canaryQueueList;
private List<MessageQueue> normalQueueList;
public List<MessageQueue> getCanaryQueueList() {
return canaryQueueList;
}
public void setCanaryQueueList(List<MessageQueue> canaryQueueList) {
this.canaryQueueList = canaryQueueList;
}
public List<MessageQueue> getNormalQueueList() {
return normalQueueList;
}
public void setNormalQueueList(List<MessageQueue> normalQueueList) {
this.normalQueueList = normalQueueList;
}
}
}
消费者改造逻辑
-
其实最大的问题在于消费方如何动态的感知灰度的状态流转,这也是产生之前灰度分区方案的临界问题的根本原因。但是通过源码的深入探索,发现其实我们可以通过改造ClientId和自定义负载均衡策略来实现;
-
RocketMQ客户端启动的时候,会构建本地客户端id(包括实例名、ip名等),然后向broker注册自己。我们可以通过DevOps注入的环境变量CANARY_RELEASE来做改造,即灰度服务clientId后面追加canary表示,default服务后面追加default标识;
消费者自定义负载均衡
import com.alibaba.fastjson.JSON;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.logging.InternalLogger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
public class CanaryAllocateMessageQueueStrategyImpl implements AllocateMessageQueueStrategy {
private final InternalLogger log = ClientLogger.getLog();
@Override
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) {
log.info("consumerGroup:{} currentCID:{} cidAll:{}",consumerGroup,currentCID, JSON.toJSONString(cidAll));
if (!check(consumerGroup, currentCID, mqAll, cidAll)) {
return Collections.emptyList();
}
if (mqAll.stream().anyMatch(mq -> mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX))) {
return allocateByAvg(consumerGroup, currentCID, mqAll, cidAll);
}
if (!MessageStorage.hasCanaryRelease(cidAll)) {
List<MessageQueue> allocate = allocateByAvg(consumerGroup, currentCID, mqAll, cidAll);
return allocate;
}
if (MessageStorage.allCanaryRelease(cidAll)) {
List<MessageQueue> messageQueues = this.balanceAllocate(consumerGroup, currentCID, mqAll, cidAll);
log.info("[canary allocate]: group:{} sub topic:{} has all canary release client,maybe the sub is new,use the default avg strategy.\n" +
"current cid:{}\n" +
"allocate total {} message queue\n" +
"result:\n{}",
consumerGroup,
mqAll.get(0).getTopic(),
messageQueues.size(),
currentCID,
MessageStorage.joinMessageQueue(messageQueues));
return messageQueues;
}
List<String> canaryCids = MessageStorage.getCanaryCids(cidAll);
List<String> normalCids = MessageStorage.getNormalCids(cidAll);
List<MessageQueue> canaryQueues = MessageStorage.getCanaryQueues(mqAll);
List<MessageQueue> normalQueues = MessageStorage.getNormalQueues(mqAll);
Collections.sort(canaryCids);
Collections.sort(normalCids);
Collections.sort(normalQueues);
Collections.sort(canaryQueues);
List<MessageQueue> result = null;
if (canaryCids.contains(currentCID)) {
result = allocateByAvg(consumerGroup, currentCID, canaryQueues, canaryCids);
} else {
result = allocateByAvg(consumerGroup, currentCID, normalQueues, normalCids);
}
return result;
}
/**
* @param consumerGroup
* @param currentCID
* @param mqAll
* @param cidAll
* @return {@link List }<{@link MessageQueue }>
*/
private List<MessageQueue> allocateByAvg(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
List<String> cidAll) {
List<MessageQueue> result = new ArrayList<MessageQueue>();
if (!check(consumerGroup, currentCID, mqAll, cidAll)) {
return result;
}
int index = cidAll.indexOf(currentCID);
int mod = mqAll.size() % cidAll.size();
int averageSize =
mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
+ 1 : mqAll.size() / cidAll.size());
int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
int range = Math.min(averageSize, mqAll.size() - startIndex);
for (int i = 0; i < range; i++) {
result.add(mqAll.get((startIndex + i) % mqAll.size()));
}
return result;
}
@Override
public String getName() {
return "CANARY";
}
public boolean check(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
List<String> cidAll) {
if (StringUtils.isEmpty(currentCID)) {
throw new IllegalArgumentException("currentCID is empty");
}
if (CollectionUtils.isEmpty(mqAll)) {
throw new IllegalArgumentException("mqAll is null or mqAll empty");
}
if (CollectionUtils.isEmpty(cidAll)) {
throw new IllegalArgumentException("cidAll is null or cidAll empty");
}
if (!cidAll.contains(currentCID)) {
log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
consumerGroup,
currentCID,
cidAll);
return false;
}
return true;
}
public List<MessageQueue> balanceAllocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) {
List<MessageQueue> result = new ArrayList<MessageQueue>();
if (!check(consumerGroup, currentCID, mqAll, cidAll)) {
return result;
}
int index = cidAll.indexOf(currentCID);
int mod = mqAll.size() % cidAll.size();
int averageSize =
mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
+ 1 : mqAll.size() / cidAll.size());
int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
int range = Math.min(averageSize, mqAll.size() - startIndex);
for (int i = 0; i < range; i++) {
result.add(mqAll.get((startIndex + i) % mqAll.size()));
}
return result;
}
}
MQ注册ClientID修改
public String buildMQClientId() {
StringBuilder sb = new StringBuilder();
sb.append(this.getClientIP());
sb.append("@");
sb.append(this.getInstanceName());
if (!UtilAll.isBlank(this.unitName)) {
sb.append("@");
sb.append(this.unitName);
}
//The key is here
if (MessageStorage.isCanaryRelease()) {
sb.append(CustomCommonConstant.CANARY_RELEASE_PREFIX);
} else {
sb.append("@default");
}
if (this.enableStreamRequestType) {
sb.append("@");
sb.append(RequestType.STREAM);
}
return sb.toString();
}
常量配置类
public class CustomCommonConstant {
/**
* Number of grayscale queues
*/
public static final Integer GRAYSCALE_QUEUE_SIZE = 2;
/**
* Grayscale client identification
*/
public static final String CANARY_RELEASE_PREFIX = "@canary";
/**
* Thread identification
*/
public static final String THREAD_CANARY_RELEASE = "canary.release";
}
链路信息传递BasicMDC
类
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
public class BasicMDC {
final ThreadLocal<Map<String, String>> copyOnThreadLocal = new ThreadLocal<>();
private static final int WRITE_OPERATION = 1;
private static final int MAP_COPY_OPERATION = 2;
final ThreadLocal<Integer> lastOperation = new ThreadLocal();
private Integer getAndSetLastOperation(int op) {
Integer lastOp = (Integer)this.lastOperation.get();
this.lastOperation.set(op);
return lastOp;
}
private boolean wasLastOpReadOrNull(Integer lastOp) {
return lastOp == null || lastOp == 2;
}
private Map<String, String> duplicateAndInsertNewMap(Map<String, String> oldMap) {
Map<String, String> newMap = Collections.synchronizedMap(new HashMap());
if (oldMap != null) {
synchronized(oldMap) {
newMap.putAll(oldMap);
}
}
this.copyOnThreadLocal.set(newMap);
return newMap;
}
public void put(String key, String val) throws IllegalArgumentException {
if (key == null) {
throw new IllegalArgumentException("key cannot be null");
} else {
Map<String, String> oldMap = (Map)this.copyOnThreadLocal.get();
Integer lastOp = this.getAndSetLastOperation(1);
if (!this.wasLastOpReadOrNull(lastOp) && oldMap != null) {
oldMap.put(key, val);
} else {
Map<String, String> newMap = this.duplicateAndInsertNewMap(oldMap);
newMap.put(key, val);
}
}
}
public void remove(String key) {
if (key != null) {
Map<String, String> oldMap = (Map)this.copyOnThreadLocal.get();
if (oldMap != null) {
Integer lastOp = this.getAndSetLastOperation(1);
if (this.wasLastOpReadOrNull(lastOp)) {
Map<String, String> newMap = this.duplicateAndInsertNewMap(oldMap);
newMap.remove(key);
} else {
oldMap.remove(key);
}
}
}
}
public void clear() {
this.lastOperation.set(1);
this.copyOnThreadLocal.remove();
}
public String get(String key) {
Map<String, String> map = (Map)this.copyOnThreadLocal.get();
return map != null && key != null ? (String)map.get(key) : null;
}
public Map<String, String> getPropertyMap() {
this.lastOperation.set(2);
return (Map)this.copyOnThreadLocal.get();
}
public Set<String> getKeys() {
Map<String, String> map = this.getPropertyMap();
return map != null ? map.keySet() : null;
}
public Map<String, String> getCopyOfContextMap() {
Map<String, String> hashMap = (Map)this.copyOnThreadLocal.get();
return hashMap == null ? null : new HashMap(hashMap);
}
public void setContextMap(Map<String, String> contextMap) {
this.lastOperation.set(1);
Map<String, String> newMap = Collections.synchronizedMap(new HashMap());
newMap.putAll(contextMap);
this.copyOnThreadLocal.set(newMap);
}
}
MessageStorage类
import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.logging.InternalLogger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
/**
* Message storage
*/
public class MessageStorage {
private static final InternalLogger log = ClientLogger.getLog();
private static BasicMDC mdcUtils;
static {
mdcUtils = new BasicMDC();
}
/**
* Determine whether there is a grayscale client
*
* @param cidAll
* @return
*/
public static boolean hasCanaryRelease(List<String> cidAll) {
return !getCanaryCids(cidAll).isEmpty();
}
/**
* Determine if all are grayscale clients
*
* @param cidAll
* @return
*/
public static boolean allCanaryRelease(List<String> cidAll) {
List<String> canaryCids = getCanaryCids(cidAll);
return canaryCids.size() == cidAll.size();
}
/**
* Connect the message queue into a string
*
* @param messageQueues
* @return
*/
public static String joinMessageQueue(List<MessageQueue> messageQueues) {
return messageQueues.stream()
.map(mq -> mq.getBrokerName() + ":" + mq.getQueueId())
.collect(Collectors.joining(", "));
}
/**
* Get the list of grayscale clients
*
* @param cidAll
* @return
*/
public static List<String> getCanaryCids(List<String> cidAll) {
return cidAll.stream()
.filter(cid -> cid.contains(CustomCommonConstant.CANARY_RELEASE_PREFIX))
.collect(Collectors.toList());
}
/**
* Get a list of non grayscale clients
*
* @param cidAll
* @return
*/
public static List<String> getNormalCids(List<String> cidAll) {
return cidAll.stream()
.filter(cid -> !cid.contains(CustomCommonConstant.CANARY_RELEASE_PREFIX))
.collect(Collectors.toList());
}
/**
* Get the list of grayscale queues
*
* @param mqAll
* @return
*/
public static List<MessageQueue> getCanaryQueues(List<MessageQueue> mqAll) {
Collections.sort(mqAll);
log.info("topic:{} reBalance, has canary release client, allocate {} message queue by canary release strategy.\n", JSON.toJSONString(mqAll));
int size = mqAll.size();
if (size > CustomCommonConstant.GRAYSCALE_QUEUE_SIZE) {
List<MessageQueue> lastTwo = mqAll.subList(size - CustomCommonConstant.GRAYSCALE_QUEUE_SIZE, size);
return new ArrayList<>(lastTwo);
} else {
return new ArrayList<>();
}
}
/**
* Get non grayscale queue list
*
* @param mqAll
* @return
*/
public static List<MessageQueue> getNormalQueues(List<MessageQueue> mqAll) {
Collections.sort(mqAll);
log.info("topic:{} reBalance, has normal release client, allocate {} message queue by canary release strategy.\n", JSON.toJSONString(mqAll));
int size = mqAll.size();
if (size > CustomCommonConstant.GRAYSCALE_QUEUE_SIZE) {
List<MessageQueue> lastTwo = mqAll.subList(0, size - 2);
return new ArrayList<>(lastTwo);
} else {
return new ArrayList<>(mqAll);
}
}
private static volatile String canaryRelease = null;
/**
* Determine whether it is a grayscale client
*
* @return
*/
public static boolean isCanaryRelease() {
return Boolean.parseBoolean(getCanaryRelease());
}
/**
* Determine whether it is a grayscale client
*
* @return boolean
*/
public static boolean isThreadCanaryRelease() {
String data = mdcUtils.get(CustomCommonConstant.THREAD_CANARY_RELEASE);
return Boolean.parseBoolean(data);
}
/**
* Set MDC
*
* @param key
* @param value
*/
public static void setMDC(String key, String value) {
mdcUtils.put(key, value);
}
/**
* @param key
* @return {@link String }
*/
public static String getMdcKey(String key) {
return mdcUtils.get(key);
}
/**
* Clear MDC
*/
public static void clearMDC() {
mdcUtils.clear();
}
/**
* Replacement strategy
*
* @return {@link AllocateMessageQueueStrategy }
*/
public static AllocateMessageQueueStrategy getAllocateMessageQueueAveragely(AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
// if (isCanaryRelease()) {
// return new CanaryAllocateMessageQueueStrategyImpl();
// } else {
// return allocateMessageQueueStrategy;
// }
return new CanaryAllocateMessageQueueStrategyImpl();
}
private static String getCanaryRelease() {
if (Objects.isNull(canaryRelease)) {
synchronized (MessageStorage.class) {
if (Objects.nonNull(canaryRelease)) {
return canaryRelease;
}
//CANARY_RELEASE
String tmpCanaryRelease = System.getProperty("canary.release");
if (Objects.isNull(tmpCanaryRelease)) {
tmpCanaryRelease = "false";
}
canaryRelease = tmpCanaryRelease;
return canaryRelease;
}
}
return canaryRelease;
}
}
拦截配置
消费者拦截
@Component
@Aspect
public class ConsumerAOP {
@Pointcut("execution(public * org.apache.rocketmq.client.consumer.listener.*.*(..))")
public void aspectConsumer() {
}
@Before("aspectConsumer()")
public void doBefore(JoinPoint joinPoint) {
Object[] args = joinPoint.getArgs();
for (Object arg : args) {
if (arg instanceof List<?>) {
List<MessageExt> messageExtList = (List<MessageExt>) arg;
for (MessageExt messageExt : messageExtList) {
String threadCanaryRelease = messageExt.getProperty(CustomCommonConstant.THREAD_CANARY_RELEASE);
if(StringUtils.isEmpty(threadCanaryRelease)){
threadCanaryRelease = "false";
}
MessageStorage.setMDC(CustomCommonConstant.THREAD_CANARY_RELEASE, threadCanaryRelease);
}
}
}
}
}
@Component
@Aspect
public class ListenerAOP {
@Pointcut("execution(public * org.apache.rocketmq.spring.core.*.*(..))")
public void aspectListener(){}
@Before("aspectListener()")
public void doBefore(JoinPoint joinPoint){
Object[] args = joinPoint.getArgs();
for (Object arg : args) {
if (arg instanceof MessageExt){
MessageExt messageExt = (MessageExt) arg;
String threadCanaryRelease = messageExt.getProperty(CustomCommonConstant.THREAD_CANARY_RELEASE);
if(StringUtils.isEmpty(threadCanaryRelease)){
threadCanaryRelease = "false";
}
MessageStorage.setMDC(CustomCommonConstant.THREAD_CANARY_RELEASE, threadCanaryRelease);
}
}
}
}
生产者拦截
@Component
@Aspect
public class ProducerAOP {
@Pointcut("execution(public * org.apache.rocketmq.client.producer.*.*(..))")
public void aspectProducer() {
}
@Before("aspectProducer()")
public void doBefore(JoinPoint joinPoint) {
Object[] args = joinPoint.getArgs();
for (Object arg : args) {
if (arg instanceof Message) {
Message message = (Message) arg;
String threadCanaryRelease = MessageStorage.getMdcKey(CustomCommonConstant.THREAD_CANARY_RELEASE);
if(StringUtils.isEmpty(threadCanaryRelease)){
threadCanaryRelease = "false";
}
message.putUserProperty(CustomCommonConstant.THREAD_CANARY_RELEASE, threadCanaryRelease);
}
}
}
}
请求拦截
@Slf4j
@Component
@WebFilter(urlPatterns = "/*", filterName = "requestFilter")
@Order(Integer.MIN_VALUE)
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class ForwardFilter implements Filter {
@SneakyThrows
@Override
public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) {
HttpServletRequest req = (HttpServletRequest) servletRequest;
String threadCanaryRelease = req.getHeader("THREAD_CANARY_RELEASE");
log.info("attributeNames:{}", JSON.toJSONString(req.getHeaderNames()));
if(Objects.nonNull(threadCanaryRelease)){
MessageStorage.setMDC(CustomCommonConstant.THREAD_CANARY_RELEASE, "true");
}
filterChain.doFilter(servletRequest, servletResponse);
}
}
验证过程
消费者(未启动灰度订阅)
消费者(灰度订阅)
验证消息链路
发送端
@Slf4j
@RestController
@RequestMapping("/producer")
public class ProducerMessageContrlller {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@PostMapping("/send")
public void send()
{
boolean canaryRelease = MessageStorage.isCanaryRelease();
String body = String.format("发送消息,环境:%s", canaryRelease ? "灰度" : "正式");
// for (Integer i = 0; i < 100; i++) {
log.info("发送消息,环境:{}", canaryRelease ? "灰度" : "正式");
rocketMQTemplate.convertAndSend("TEST-DATA-MSG", body);
// }
}
}
消费端
@Slf4j
@Service
@RocketMQMessageListener(topic = "TEST-DATA-MSG", consumerGroup = "test-consumer-group")
public class MyConsumer1 implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt message) {
boolean canaryRelease = MessageStorage.isCanaryRelease();
Map<String, String> properties = message.getProperties();
log.info("received message: 【{}】 环境:【{}】 配置参数:【{}】 ", new String(message.getBody()),canaryRelease ? "灰度" : "正式",JSON.toJSONString(properties));
}
}
发送灰度消息