当前位置: 首页 > article >正文

springboot kafka在kafka server AUTH变动后consumer自动销毁

前言

笔者使用了kafka用来传输数据,笔者在今年10月写了文章,怎么使用配置化实现kafka的装载:springboot kafka多数据源,通过配置动态加载发送者和消费者-CSDN博客

不过在实际运行中,kafka broker是加密的,本来也没啥,但是突然的一天笔者在监控发现消费者掉线了,发送者居然还是正常的,见鬼的事情就是这么朴实的发生了,而且日志有Authentication/Authorization Exception and no authExceptionRetryInterval set

这样的错误,还有

Closing the Kafka consumer
Kafka consumer has been closed

这样的日志,非常诡异,后面查询kafka集群才知道,kafka集群在某个时间被改了AUTH配置,然后又改回来了,神奇的操作。

现象

实际上看到日志是懵的,毕竟kafka是对方提供的,AUTH用户名和密码也是对方给的,而且发送者也出现AUTH失败,但是发送者一直重试,然后因为kafka集群的AUTH改回来了,重试成功了;唯独消费者AUTH失败后,关闭了。

源码分析

从日志搜索Authentication/Authorization Exception and no authExceptionRetryInterval set

发现日志出现在

org.springframework.kafka.listener.KafkaMessageListenerContainer

刚好在消息监听器容器的内部类

ListenerConsumer

看源码定义,是一个定时线程池的任务定义

private final class ListenerConsumer implements SchedulingAwareRunnable, ConsumerSeekCallback

分析怎么去消费的KafkaMessageListenerContainer的doStart方法

既然明晰了,那么分析问题的来源, run方法

		public void run() {
			ListenerUtils.setLogOnlyMetadata(this.containerProperties.isOnlyLogRecordMetadata());
			publishConsumerStartingEvent(); //事件通知,spring事件
			this.consumerThread = Thread.currentThread();
			setupSeeks();
			KafkaUtils.setConsumerGroupId(this.consumerGroupId);
			this.count = 0;
			this.last = System.currentTimeMillis();
			initAssignedPartitions(); // 初始分配partitions
			publishConsumerStartedEvent(); // 消费者启动事件
			Throwable exitThrowable = null;
			this.lastReceive = System.currentTimeMillis();
			while (isRunning()) { //状态在上面的截图代码已经更新为运行状态
				try {
					pollAndInvoke(); // 队列拉取,拉取过程会出现各种异常
				}
				catch (NoOffsetForPartitionException nofpe) { //这个需要注意,但是这个是不可配置的
					this.fatalError = true;
					ListenerConsumer.this.logger.error(nofpe, "No offset and no reset policy");
					exitThrowable = nofpe;
					break;
				}
				catch (AuthenticationException | AuthorizationException ae) { //这个是授权失败,就是来源于kafka broker的auth鉴权
					if (this.authExceptionRetryInterval == null) { //时间配置,默认居然是null
						ListenerConsumer.this.logger.error(ae,
								"Authentication/Authorization Exception and no authExceptionRetryInterval set");
						this.fatalError = true;
						exitThrowable = ae;
						break; //循环结束
					}
					else { // 一段时间后重试
						ListenerConsumer.this.logger.error(ae,
								"Authentication/Authorization Exception, retrying in "
										+ this.authExceptionRetryInterval.toMillis() + " ms");
						// We can't pause/resume here, as KafkaConsumer doesn't take pausing
						// into account when committing, hence risk of being flooded with
						// GroupAuthorizationExceptions.
						// see: https://github.com/spring-projects/spring-kafka/pull/1337
						sleepFor(this.authExceptionRetryInterval);
					}
				}
				catch (FencedInstanceIdException fie) {
					this.fatalError = true;
					ListenerConsumer.this.logger.error(fie, "'" + ConsumerConfig.GROUP_INSTANCE_ID_CONFIG
							+ "' has been fenced");
					exitThrowable = fie;
					break;
				}
				catch (StopAfterFenceException e) {
					this.logger.error(e, "Stopping container due to fencing");
					stop(false);
					exitThrowable = e;
				}
				catch (Error e) { // NOSONAR - rethrown
					this.logger.error(e, "Stopping container due to an Error");
					this.fatalError = true;
					wrapUp(e);
					throw e;
				}
				catch (Exception e) {
					handleConsumerException(e);
				}
				finally {
					clearThreadState();
				}
			}
            //上面异常后,这里的处理
			wrapUp(exitThrowable);
		}

注意NoOffsetForPartitionException和AuthenticationException | AuthorizationException,其中能配置重试的是AuthenticationException | AuthorizationException异常,就是授权失败可以通过配置过一段时间抢救一下,一般而言,kafka首次授权失败基本上就不太可能成功了,但是这个只能控制consumer销毁,producer还在重试,所以出现了消费者销毁了,发送者在kafka集群auth还原后成功恢复。看看wrapUp方法

既然知道了原理,那么解决办法是配置

authExceptionRetryInterval

解决方法

配置authExceptionRetryInterval也是不容易,分析取值

private final Duration authExceptionRetryInterval =
       this.containerProperties.getAuthExceptionRetryInterval();

从 containerProperties来的,实际上是继承org.springframework.kafka.listener.ConsumerProperties

在spring-kafka 2.8版本还对属性命名重构了,毕竟以前的命名字母太多了,😁

不过要修改这个值也不容易,kafkaproperties并没有提供这个参数,而且创建消费者容器工厂时

org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory

那么只能在

ConcurrentMessageListenerContainer

创建成功后,从这个里面读取配置,设置默认值。这就可以使用前面埋点的spring事件了,通过事件拿到Consumer,不就可以修改配置了

使用

publishConsumerStartedEvent();

的事件最合适,执行循环前最后一个事件,而且这里的this,就是消费者容器org.springframework.kafka.listener.KafkaMessageListenerContainer对象,就是我们需要的

编写代码如下:

@Component
public class KafkaConsumerStartedListener implements ApplicationListener<ConsumerStartedEvent> {
    @Override
    public void onApplicationEvent(ConsumerStartedEvent event) {
        KafkaMessageListenerContainer<?, ?> container = event.getSource(KafkaMessageListenerContainer.class);
        container.getContainerProperties().setAuthExceptionRetryInterval(Duration.of(30, ChronoUnit.SECONDS));
    }
}

就这样就解决了问题,哈哈哈贼简单。 

笔者在使用API时发现了还有一个API是授权失败后重启,是布尔变量,默认值是false,即不重试。那么这个是哪里触发的呢,在我们看到的消费者销毁事件里面,实际上控制重试有多种办法,一个是循环去kafka broker拉取,一个是重启kafka消费者容器。

 

这个在上面已经说明了,简单截个图再说明一下

设置的地方还是上面的代码里面,同理也可以修改事件为

ConsumerStoppedEvent

可以更精准,当然就用刚刚的代码加一行设置也是可以的。

总结

kafka在发送者和消费者是区分开的,发送者如果连接kafka broker失败后可以一直重试直到成功,但是消费者确有各种各样的逻辑,可以精准控制,比如消费者重启的配置

restartAfterAuthExceptions

可以控制消费者在停止时重启,如果仅仅是授权失败,而且不需要反复重启(消耗资源),那么可以通过

authExceptionRetryInterval

配置时间周期的方式实现,但是kafka并没有给我们配置的入口,但是kafka在消费者启动消费的过程埋了很多spring事件钩子,通过这些钩子可以操作,估计spring-kafka也不希望我们去修改,毕竟消费者启动失败了或授权失败了,消费者自动销毁是符合正常逻辑的。如果不使用kafka自己提供的事件,可以在启动完成通过

org.springframework.kafka.config.KafkaListenerEndpointRegistry

拿到所有消费者容器,来批量设置属性,毕竟spring-kafka也是通过这个端点注册器注册MessageListenerContainer的。


http://www.kler.cn/a/419142.html

相关文章:

  • 【解决安全扫描漏洞】---- 检测到目标站点存在 JavaScript 框架库漏洞
  • C# 2024年Visual Studio实用插件集合
  • Ubuntu 服务器部署 Tomcat 并配置 SSL/TLS 证书
  • Graphy 是一款终极、易于使用、功能齐全的 FPS 计数器、统计监视器和调试器,适用于您的 Unity 项目。
  • 认识redis 及 Ubuntu安装redis
  • 349. 两个数组的交集:
  • linux系统信号简介
  • Scala—列表(可变ListBuffer、不可变List)用法详解
  • FAT文件系统
  • 【ETCD】etcd简单入门之基础操作基于etcdctl进行操作
  • arkTS:持久化储存UI状态的基本用法(PersistentStorage)
  • 基于Java Springboot宠物医院微信小程序
  • UI设计-色彩、层级、字体、边距(二)
  • 民锋视角:数据分析如何助力金融决策
  • 【docker集群应用】Docker--harbor私有仓库部署与管理
  • C语言——管理系统
  • glog在vs2022 hello world中使用
  • MySQL Inception工具
  • 泷羽sec-shell(5)字符串运算符和逻辑运算符 学习笔记
  • 编程基础篇
  • 英语写作中以rationale 替代reason(理由)
  • node.js基础学习-querystring模块-查询字符串处理(三)
  • 二分搜索(二)搜索插入位置
  • javascript切换类、删除类、修改类以及增加类
  • 低代码与微服务融合在医疗集团中的补充应用探究
  • 计算机网络--网络安全测试