4.微服务灰度发布落地实践(消息队列增强)
文章目录
- 前言
- 问题分析
- 消息队列特性分析
- kafka
- rocketmq
- rabbitmq
- 发布订阅公共抽象
- 发送端
- 订阅端
前言
消息队列是一种用于在应用程序的不同组件或系统之间传递消息的通信机制。它通过将消息存储在一个队列中,确保消息能够可靠地从发送方传递到接收方,即使发送方和接收方不同时在线或处理能力不同。消息队列在现代分布式系统、微服务架构以及异步处理场景中扮演着至关重要的角色;所以在处理灰度发布的实现,避免不了对其改造。
问题分析
消息的灰度路由,不像应用服务之间可以简单的改写路由规则;网上看到很多关于消息灰度的方案,都不是很理想,大多涉及到需要配合修改业务代码;现实中,在打算实施灰度发布方案时,往往已存在大量的旧服务在上线运行着,改造推动难信不可想象,方案是否能落地得大打折扣;所以如果涉及到需要改业务代码的方案,不太能行得通,现在唯一到想到的是通过agent的方案,可以实现无需改动业务代码,并且接入升级较为方便。
消息队列特性分析
本文重点是关注消息队列可能作灰度路由特性的分析,个人认为消息灰度路由作为整个灰度发布最为麻烦的点。
kafka
kafka 有个消费者组这样的特性: 同一个topic某条消息,同一消费者的只有其中一个消费者能收到该消息;利用这一特性,可以把某一topic的消费者组分为:正常的消费者组和灰度消息者组;正常的消费者收到topic消息后,判断如果是正常的消息则接受,如果是携带灰度标签的消息则丢弃;同量灰度消费者收到topic消息后,如果正常消息去弃掉,如果是携带灰度标签的消息则接受。对生产端,如接收到的请求是灰度请求,则发送携带灰度标签的消息,否则发送不携带灰度标签的消息。
rocketmq
rocketmq 跟kafka有些类似,处理方式可以同步
rabbitmq
rabbitmq 的发布、订阅,与kafka或rocketmq 有些不同,rabbitmq 中的exchange和routingKey组成有些相当于kafka中的topic,服务订阅下,要创建queue及绑定相应的exchange和routingKey;没有消费者组的概念,但同一条exchange和routingKey可以绑定多个队列,同时它用virtualhost的概念,vhost 是 RabbitMQ 中的一个逻辑隔离单元,类似于操作系统中的“虚拟目录”或 Web 服务器中的“虚拟主机”。每个 vhost 都有自己的独立命名空间,包含自己的交换机、队列、绑定、用户权限等;那么可以利可以用绑定多个queue或virtualhost的特性,实现消息灰度发布订阅;考虑管理实现复杂,难易度,本方采用利用virtualhost的特性实现消息的灰度发布订阅:
- 在rabbitmq原有的vitualhost 配置基本上,利用程序创建一个对应的灰度virtualhost,并完全copy原有的配置;
- 服务在启动时,据自身所处的状态,mq客户端连接相应用virtualhost:如处理灰度状态,连接灰度的virtualhost,如果处理正常状态,则连接原有的virtualhost;
- 实例状态在线切换时,销毁原有的连接,切换到对应的virtualhost连接;
- 对生消息发生端,则同时连接正常virtualhost和灰度的vitualhost,如果发送正常消息,则使用正常的连接,如果发送灰度消息,则使用灰度virtualhost连接;
- 但是存在一个问题,当某个服务只有灰度或正常实例时,与其状态相反的消息没法被消费,除非做了双订阅,同时订阅正常消息和灰度消息,如果是kafka和rocketmq则可以很好处理该问题。
发布订阅公共抽象
据以上分析,抽象出消息队列了的发送端和订况端
发送端
public abstract class AbstractPublishInterceptor<M>{
/**
* 续传灰度变量
* @param allArguments
* @param routingEnv
* @return
*/
protected abstract List<M> setOutContext(Object[] allArguments, String routingEnv);
protected abstract ComponentType getType();
}
订阅端
public interface InInterceptor {
default void setContext(String routingEnv) {
ServerContextHolder.setData(X_ENV, routingEnv);
}
default void removeContext() {
ServerContextHolder.remove(X_ENV);
}
default String getContext(Object... args) {
if (args == null) {
return (String) ServerContextHolder.getData(X_ENV);
}
return null;
}
}
public abstract class AbstractConsumeInterceptor implements InInterceptor, InstanceMethodsAroundInterceptor {
protected static final ILog logger = LogManager.getLogger(AbstractConsumeInterceptor.class);
protected boolean discard(Object xEnv) {
ServerInstance instance = ServerInstance.getInstance();
if (instance.getEnvStatus() == 0 && xEnv == null) {
if (instance.getNormalInstances() < 1 && instance.getMessageConsumeMode() == 1) {
//没有正常实例,让灰度实例消息费(一般不会出现该情况)
return false;
}
//灰度实列,丢弃正常消息
logger.debug("灰度实列丢掉正常消息");
return true;
}
if (instance.getEnvStatus() == 1 && xEnv != null) {
if (instance.getGrayInstances() < 1 && instance.getMessageConsumeMode() == 1) {
//没有灰度实例,让正常实例消息费,如果灰度再起来,可能会出现重复消费
return false;
}
//正常实例收到灰度消息,丢弃
logger.info("正常实列丢掉灰度消息");
return true;
}
return false;
}
protected abstract ComponentType getType();
}