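How to Gracefully Shut Down Kafka Consumers
1. Background
Our Kafka consumer programs may call Dubbo interfaces, and may also use thread pools, connection pools, and similar resources. When the service goes offline, however, Kafka consumption keeps reporting errors. A Dubbo call, for example, throws RpcException: The channel is closed. This shows that Kafka is still consuming while the Dubbo RPC interface has already been destroyed.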
com.alibaba.dubbo.rpc.RpcException: Failed to invoke the method ****************cause: The channel com.alibaba.dubbo.remoting.transport.netty4.NettyClient [10.69.14.13:0 -> /10.21.105.4:8898] is closed!
at com.alibaba.dubbo.rpc.cluster.support.FailoverClusterInvoker.doInvoke(FailoverClusterInvoker.java:111) ~[dubbo-2.6.18.jar!/:2.6.18]
at com.alibaba.dubbo.rpc.cluster.support.AbstractClusterInvoker.invoke(AbstractClusterInvoker.java:244) ~[dubbo-2.6.18.jar!/:2.6.18]
at com.alibaba.dubbo.rpc.cluster.support.wrapper.MockClusterInvoker.invoke(MockClusterInvoker.java:75) ~[dubbo-2.6.18.jar!/:2.6.18]
at com.alibaba.dubbo.rpc.proxy.InvokerInvocationHandler.invoke$original$WNyopBb1(InvokerInvocationHandler.java:52) ~[dubbo-2.6.18.jar!/:2.6.18]
at com.alibaba.dubbo.rpc.proxy.InvokerInvocationHandler.invoke$original$WNyopBb1$accessor$b7jdPeHU(InvokerInvocationHandler.java) ~[dubbo-2.6.18.jar!/:2.6.18]
at com.alibaba.dubbo.rpc.proxy.InvokerInvocationHandler$auxiliary$dhLDgYzF.call(Unknown Source) ~[dubbo-2.6.18.jar!/:2.6.18]
at cn.techwolf.observer.instrument.interceptor.enhance.InstMethodsBeforeInter.intercept(InstMethodsBeforeInter.java:73) ~[twl-observer-agent.jar:1.9.1-SNAPSHOT]
at com.alibaba.dubbo.rpc.proxy.InvokerInvocationHandler.invoke(InvokerInvocationHandler.java) ~[dubbo-2.6.18.jar!/:2.6.18]
at com.alibaba.dubbo.common.bytecode.proxy0.getTags(proxy0.java) ~[dubbo-2.6.18.jar!/:2.6.18]
at com.zhipin.zeus.engine.data.proxy.AbacusBizTagProxy.batchQueryTagValue(AbacusBizTagProxy.java:93) ~[classes!/:0.0.1-SNAPSHOT]
at com.zhipin.zeus.engine.data.observer.bg.F1GrcdListBossObserver.handle(F1GrcdListBossObserver.java:94) ~[classes!/:0.0.1-SNAPSHOT]
at com.zhipin.zeus.engine.data.consumer.BgActionConsumer.lambda$processBgAction$0(BgActionConsumer.java:97) ~[classes!/:0.0.1-SNAPSHOT]
at java.lang.Iterable.forEach(Iterable.java:75) ~[?:1.8.0_342]
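2. Root Cause Analysis
Intuitively, since the Kafka consumer calls a Dubbo interface, the Dubbo interface bean should be loaded before the Kafka consumer at startup, and destruction should run in the reverse order: the Kafka consumer first, then the Dubbo interface bean. So why, at shutdown, is Kafka still consuming after the Dubbo bean has already been destroyed?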
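Dubbo's shutdown hook is DubboShutdownHook. At startup, com.alibaba.dubbo.config.AbstractConfig registers DubboShutdownHook with the JVM, and Dubbo's shutdown is also hooked into Spring through com.alibaba.dubbo.config.spring.extension.SpringExtensionFactory.ShutdownHookListener. The Dubbo hook and Spring's own shutdown hook are peers: both are run by the JVM, which gives no ordering guarantee between shutdown hooks, and in practice the Dubbo hook fires before Spring has finished shutting down.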
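Would handing Dubbo's shutdown entirely over to Spring solve the problem? Not by itself.
Reading the source of spring-context:5.3.31, org.springframework.context.support.AbstractApplicationContext#doClose first publishes ContextClosedEvent and only afterwards stops the Kafka consumers. Publishing ContextClosedEvent triggers the shutdown of the Dubbo interface beans, so Dubbo is closed first while the Kafka consumers are still consuming.
In com.alibaba.dubbo.config.spring.extension.SpringExtensionFactory.ShutdownHookListener you can see that Dubbo's shutdown runs as soon as a ContextClosedEvent is received.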
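The window described above can be illustrated with a small plain-Java model. This is only a sketch: CloseOrderSketch, channelOpen, and consumerRunning are hypothetical stand-ins for the Spring context, the Dubbo channel, and the Kafka listener container, and no Spring or Dubbo code is involved.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the close sequence; not Spring or Dubbo code.
class CloseOrderSketch {
    private boolean channelOpen = true;     // stands in for the Dubbo client channel
    private boolean consumerRunning = true; // stands in for the Kafka listener container
    final List<String> log = new ArrayList<>();

    // One poll-and-process round of the consumer, which makes an RPC call.
    void consumeOnce() {
        if (!consumerRunning) {
            return; // a stopped consumer processes nothing
        }
        log.add(channelOpen ? "consume: rpc ok" : "consume: rpc failed, channel closed");
    }

    // Mirrors the order described in the text: event first, consumers stopped later.
    void close() {
        channelOpen = false;          // ContextClosedEvent -> Dubbo listener closes the channel
        log.add("event: dubbo closed");
        consumeOnce();                // the consumer is still running in this window
        consumerRunning = false;      // only now are the consumers stopped
        log.add("lifecycle: consumer stopped");
        consumeOnce();                // no-op, the consumer is already stopped
    }

    public static void main(String[] args) {
        CloseOrderSketch sketch = new CloseOrderSketch();
        sketch.consumeOnce();
        sketch.close();
        sketch.log.forEach(System.out::println);
    }
}
```

The failed call in the middle of the log corresponds to the RpcException seen at shutdown.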
3. Solution
Step one: remove Dubbo's shutdown hook from the JVM so that the shutdown is managed by Spring.
DubboShutdownHook dubboShutdownHook = DubboShutdownHook.getDubboShutdownHook();
Runtime.getRuntime().removeShutdownHook(dubboShutdownHook);
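For readers unfamiliar with the JVM API used here, the following sketch shows how Runtime.removeShutdownHook behaves, using a plain Thread as a stand-in for DubboShutdownHook. The class name ShutdownHookSketch and the hook body are made up for illustration.

```java
// Hypothetical demo of the JVM shutdown-hook API; the hook is a stand-in, not Dubbo's.
class ShutdownHookSketch {

    // Registers a hook, then removes it twice and reports both results.
    static boolean[] registerThenRemove() {
        Thread hook = new Thread(
                () -> System.out.println("closing RPC resources"), "stand-in-dubbo-hook");
        Runtime.getRuntime().addShutdownHook(hook);
        // true: the hook was registered and has now been de-registered
        boolean removedFirst = Runtime.getRuntime().removeShutdownHook(hook);
        // false: the hook is no longer registered
        boolean removedAgain = Runtime.getRuntime().removeShutdownHook(hook);
        return new boolean[] {removedFirst, removedAgain};
    }

    public static void main(String[] args) {
        boolean[] result = registerThenRemove();
        System.out.println("first remove: " + result[0] + ", second remove: " + result[1]);
    }
}
```

Logging the return value of removeShutdownHook in the real application is a cheap way to confirm that the Dubbo hook was actually registered at the point where the removal runs.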
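Step two: use a SmartApplicationListener to make the Kafka consumers stop before the Dubbo interfaces are closed. Its getOrder() method only needs to return a value smaller than Integer.MAX_VALUE, which is the default order for listeners that do not declare one.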
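The ordering rule can be checked with a few lines of plain Java. This is a simplified sketch of "lower order value runs first", not Spring's actual multicaster; the Listener class and the listener names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical model of ordered listener invocation; not Spring code.
class ListenerOrderSketch {
    // Same value as Spring's Ordered.LOWEST_PRECEDENCE
    static final int LOWEST_PRECEDENCE = Integer.MAX_VALUE;

    static final class Listener {
        final String name;
        final int order;

        Listener(String name, int order) {
            this.name = name;
            this.order = order;
        }
    }

    // Returns listener names in the order they would be invoked.
    static List<String> invocationOrder() {
        List<Listener> listeners = new ArrayList<>();
        // A listener without an explicit order is treated as lowest precedence
        listeners.add(new Listener("dubbo-shutdown-listener", LOWEST_PRECEDENCE));
        listeners.add(new Listener("graceful-shutdown-listener", LOWEST_PRECEDENCE - 100));
        // Smaller order value means higher priority, so it sorts to the front
        listeners.sort(Comparator.comparingInt(l -> l.order));
        List<String> names = new ArrayList<>();
        for (Listener l : listeners) {
            names.add(l.name);
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(invocationOrder());
    }
}
```

Any value below Integer.MAX_VALUE would work; leaving a gap such as 100 keeps room for other listeners that must run between the two.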
import com.zhipin.zeus.engine.data.config.RocketMQConsumerConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.context.event.SmartApplicationListener;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.util.StopWatch;

/**
 * @Author: Sun Zhenghao
 * @Date: 2025/1/9
 */
@Configuration
@Slf4j
public class GracefulShutDownListener implements SmartApplicationListener {

    /**
     * Methods annotated with @KafkaListener are registered automatically
     * in KafkaListenerEndpointRegistry.
     */
    @Autowired
    private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

    /**
     * All RocketMQ consumers are maintained by our own config class.
     */
    @Autowired
    private RocketMQConsumerConfig rocketMQConsumerConfig;

    /**
     * Declares which event type triggers this listener.
     */
    @Override
    public boolean supportsEventType(Class<? extends ApplicationEvent> eventType) {
        return eventType == ContextClosedEvent.class;
    }

    @Override
    public boolean supportsSourceType(Class<?> sourceType) {
        return SmartApplicationListener.super.supportsSourceType(sourceType);
    }

    /**
     * Sets the priority. Only listeners acting as shutdown hooks are discussed here.
     * The smaller the returned number, the higher the priority and the earlier it runs.
     */
    @Override
    public int getOrder() {
        return LOWEST_PRECEDENCE - 100;
    }

    @Override
    public String getListenerId() {
        return SmartApplicationListener.super.getListenerId();
    }

    /**
     * The actual graceful-shutdown logic.
     * @param event the event to respond to
     */
    @Override
    public void onApplicationEvent(ApplicationEvent event) {
        if (event instanceof ContextClosedEvent) {
            // Stop all Kafka consumers
            log.info("kafka consumer size: {}", kafkaListenerEndpointRegistry.getListenerContainers().size());
            kafkaListenerEndpointRegistry.getListenerContainers().forEach(messageListenerContainer -> {
                log.info("kafka stop consume, groupId: {}", messageListenerContainer.getGroupId());
                messageListenerContainer.stop();
            });
            // RocketMQ consumers can be shut down gracefully in the same way
            log.info("rocketMQ consumer size: {}", rocketMQConsumerConfig.getTopicConsumerMap().size());
            rocketMQConsumerConfig.getTopicConsumerMap().forEach((topic, consumer) -> {
                StopWatch stopWatch = new StopWatch();
                stopWatch.start();
                // Attempt to shut down; waits up to awaitTerminationMillisWhenShutdown ms
                consumer.shutdown();
                stopWatch.stop();
                log.info("DefaultMQPushConsumer shutdown success, topic: {}, cost: {}ms", topic, stopWatch.getTotalTimeMillis());
            });
        }
    }
}
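4. Summary
This article uses Kafka consumers and Dubbo interfaces as the example for a graceful-shutdown solution. The same approach also applies to thread pools, connection pools, RocketMQ consumers, and similar resources.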
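As an illustration of the thread-pool case mentioned above, here is a minimal sketch of draining an ExecutorService before dependent resources are closed. The class name, method names, and the 5-second timeout are assumptions made for the example; in the listener shown earlier, such a call would sit next to the Kafka and RocketMQ shutdown logic.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper; names and timeout are illustrative only.
class ThreadPoolShutdownSketch {

    // Returns true if all submitted tasks finished within the timeout.
    static boolean shutdownGracefully(ExecutorService pool, long timeoutMillis) {
        pool.shutdown(); // reject new tasks, keep running the queued ones
        try {
            if (pool.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS)) {
                return true;
            }
            pool.shutdownNow(); // timed out: interrupt whatever is still running
            return false;
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }

    // Submits five tasks, drains the pool, and returns how many tasks completed.
    static int runAndDrain() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < 5; i++) {
            pool.execute(completed::incrementAndGet);
        }
        shutdownGracefully(pool, 5_000);
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println("completed tasks: " + runAndDrain());
    }
}
```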