当前位置: 首页 > article >正文

Kafka消费者如何优雅下线

一、背景

我们在Kafka消费程序中,可能会调用dubbo接口,也可能会使用线程池,连接池等,但是在服务下线的时候,kafka的消费总是会报错。比如dubbo接口就会抛出异常RpcException: The channel is closed. 这说明kafka还在消费,但是dubbo rpc接口已经销毁了。

com.alibaba.dubbo.rpc.RpcException: Failed to invoke the method ****************cause: The channel com.alibaba.dubbo.remoting.transport.netty4.NettyClient [10.69.14.13:0 -> /10.21.105.4:8898] is closed!
    at com.alibaba.dubbo.rpc.cluster.support.FailoverClusterInvoker.doInvoke(FailoverClusterInvoker.java:111) ~[dubbo-2.6.18.jar!/:2.6.18]
    at com.alibaba.dubbo.rpc.cluster.support.AbstractClusterInvoker.invoke(AbstractClusterInvoker.java:244) ~[dubbo-2.6.18.jar!/:2.6.18]
    at com.alibaba.dubbo.rpc.cluster.support.wrapper.MockClusterInvoker.invoke(MockClusterInvoker.java:75) ~[dubbo-2.6.18.jar!/:2.6.18]
    at com.alibaba.dubbo.rpc.proxy.InvokerInvocationHandler.invoke$original$WNyopBb1(InvokerInvocationHandler.java:52) ~[dubbo-2.6.18.jar!/:2.6.18]
    at com.alibaba.dubbo.rpc.proxy.InvokerInvocationHandler.invoke$original$WNyopBb1$accessor$b7jdPeHU(InvokerInvocationHandler.java) ~[dubbo-2.6.18.jar!/:2.6.18]
    at com.alibaba.dubbo.rpc.proxy.InvokerInvocationHandler$auxiliary$dhLDgYzF.call(Unknown Source) ~[dubbo-2.6.18.jar!/:2.6.18]
    at cn.techwolf.observer.instrument.interceptor.enhance.InstMethodsBeforeInter.intercept(InstMethodsBeforeInter.java:73) ~[twl-observer-agent.jar:1.9.1-SNAPSHOT]
    at com.alibaba.dubbo.rpc.proxy.InvokerInvocationHandler.invoke(InvokerInvocationHandler.java) ~[dubbo-2.6.18.jar!/:2.6.18]
    at com.alibaba.dubbo.common.bytecode.proxy0.getTags(proxy0.java) ~[dubbo-2.6.18.jar!/:2.6.18]
    at com.zhipin.zeus.engine.data.proxy.AbacusBizTagProxy.batchQueryTagValue(AbacusBizTagProxy.java:93) ~[classes!/:0.0.1-SNAPSHOT]
    at com.zhipin.zeus.engine.data.observer.bg.F1GrcdListBossObserver.handle(F1GrcdListBossObserver.java:94) ~[classes!/:0.0.1-SNAPSHOT]
    at com.zhipin.zeus.engine.data.consumer.BgActionConsumer.lambda$processBgAction$0(BgActionConsumer.java:97) ~[classes!/:0.0.1-SNAPSHOT]
    at java.lang.Iterable.forEach(Iterable.java:75) ~[?:1.8.0_342]

二、原因分析

我们直观的想一下,kakfa消费时调用了dubbo接口,那么项目启动时,dubbo接口bean应该早于kafka消费者加载;销毁时,顺序反过来,先销毁kafka消费者,再销毁dubbo接口bean才合理,为什么下线时,会出现kafka还在消费,dubbo bean却先一步销毁的情况呢?
dubbo关闭的钩子函数为DubboShutdownHook,项目启动时,com.alibaba.dubbo.config.AbstractConfig会将DubboShutdownHook注册到JVM,同时也会基于com.alibaba.dubbo.config.spring.extension.SpringExtensionFactory.ShutdownHookListener将其注册到spring中去,而dubbo的钩子函数默认是早于spring的钩子函数触发的,都由JVM控制,两者是同级的。
那么将dubbo的钩子函数完全交于spring管理,是否能解决这个问题?
基于spring-context:5.3.31进行源码分析,可以看到org.springframework.context.support.AbstractApplicationContext#doClose方法中先发布了ContextClosedEvent事件,然后再关闭Kafka消费者,而ContextClosedEvent发布时,就会触发dubbo接口bean的关闭,导致出现dubbo接口先关闭而kafka消费者还在消费的情况。
在这里插入图片描述
com.alibaba.dubbo.config.spring.extension.SpringExtensionFactory.ShutdownHookListener中可以看到监听到ContextClosedEvent事件则会执行dubbo的关闭。
在这里插入图片描述

三、解决方案

首先第一步,将dubbo的钩子函数从JVM中移除,由spring管理

DubboShutdownHook dubboShutdownHook = DubboShutdownHook.getDubboShutdownHook();
Runtime.getRuntime().removeShutdownHook(dubboShutdownHook);

第二步,我们可以基于SmartApplicationListener控制kafka消费者销毁早于dubbo接口,只要其getOrder()方法返回值小于Integer.MAX_VALUE(默认值)即可

import com.zhipin.zeus.engine.data.config.RocketMQConsumerConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.context.event.SmartApplicationListener;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.util.StopWatch;

/**
 * @Author: Sun Zhenghao
 * @Date: 2025/1/9
 */
@Configuration
@Slf4j
public class GracefulShutDownListener implements SmartApplicationListener {
    /**
     * 对于@KafkaListenr注解实现的方法会自动注册到KafkaListenerEndpointRegistry中
     */
    @Autowired
    private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
    /**
     * 这里自己维护了所有的RocketMQ的consumer
     */
    @Autowired
    private RocketMQConsumerConfig rocketMQConsumerConfig;

    /**
     * 设置Listener被什么事件触发
     */
    @Override
    public boolean supportsEventType(Class<? extends ApplicationEvent> eventType) {
        return eventType == ContextClosedEvent.class;
    }

    @Override
    public boolean supportsSourceType(Class<?> sourceType) {
        return SmartApplicationListener.super.supportsSourceType(sourceType);
    }

    /**
     * 设置优先级,这里只说作为shutdownhook存在的Listener
     * return的数字越小,优先级越高,越靠前执行
     * @return
     */
    @Override
    public int getOrder() {
        return LOWEST_PRECEDENCE - 100;
    }

    @Override
    public String getListenerId() {
        return SmartApplicationListener.super.getListenerId();
    }

    /**
     * 实现真正优雅下线的逻辑
     * @param event the event to respond to
     */
    @Override
    public void onApplicationEvent(ApplicationEvent event) {
        if (event instanceof ContextClosedEvent) {
            // 将全部的kafka consumer关闭
            log.info("kafka consumer size: {}", kafkaListenerEndpointRegistry.getListenerContainers().size());
            kafkaListenerEndpointRegistry.getListenerContainers().forEach(messageListenerContainer -> {
                log.info("kafka stop consume, groupId: {}", messageListenerContainer.getGroupId());
                messageListenerContainer.stop();
            });

            // 同样的rocketMQ也可以优雅关闭
            log.info("rocketMQ consumer size: {}", rocketMQConsumerConfig.getTopicConsumerMap().size());
            rocketMQConsumerConfig.getTopicConsumerMap().forEach((topic, consumer) -> {
                StopWatch stopWatch = new StopWatch();
                stopWatch.start();
                // 尝试关闭,会等待awaitTerminationMillisWhenShutdown ms
                consumer.shutdown();

                stopWatch.stop();
                log.info("DefaultMQPushConsumer shutdown success, topic: {}, cost: {}ms", topic, stopWatch.getTotalTimeMillis());
            });
        }
    }
}

四、总结

本文主要以kafka消费者和dubbo接口为例,给出了优雅下线的解决方案,实际上也适用于线程池、连接池、rocketMQ消费者等的优雅下线。


http://www.kler.cn/a/503524.html

相关文章:

  • 对MySQL滴MVCC理解(超详细)
  • String.intern是什么
  • linux进程
  • 网络原理(四)—— 网络层、数据链路层 与 DNS
  • 【华为云开发者学堂】基于华为云 CodeArts CCE 开发微服务电商平台
  • 用python编写一个放烟花的小程序
  • RTK北斗高精度定位4G执法记录仪在铁路作业安全风险管控中的应用
  • 【kubernetes】K8S节点状态的维护
  • C++并发编程之普通无锁队列与单生成者单消费者队列
  • 数据结构与算法之栈: LeetCode 151. 反转字符串中的单词 (Ts版)
  • 概率论考前一天
  • Elasticsearch面试题总结
  • Linux Kernel 之十 详解 PREEMPT_RT、Xenomai 的架构、源码、构建及使用
  • 高级运维:源码编译安装httpd 2.4,提供系统服务管理脚本并测试
  • 【华为OD-E卷 - 猜数字 100分(python、java、c++、js、c)】
  • 代码随想录算法训练营第十二天|第18题. 四数之和
  • golang之数据库操作
  • ctf竞赛
  • VirtualBox环境中vscode报错:提取扩展时出错。Failed to fetch
  • Steam个人开发者注册备记
  • 解锁未来情感科技:AI 机器人 Ropet 搭载的前沿智能黑科技
  • K8s数据存储之详解(Detailed Explanation of K8s Data Storage)
  • 【JVM-2.2】使用JConsole监控和管理Java应用程序:从入门到精通
  • latex 中不要求显示页码
  • (一)QSQLite3库简介
  • 平台介绍-快速开发上手指南