当前位置: 首页 > article >正文

kafka使用常见问题

连接不上kafka,报下边的错

org.apache.kafka.common.KafkaException: Producer is closed forcefully.
	at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortBatches(RecordAccumulator.java:760) [kafka-clients-3.0.2.jar:na]
	at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortIncompleteBatches(RecordAccumulator.java:747) [kafka-clients-3.0.2.jar:na]
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:283) ~[kafka-clients-3.0.2.jar:na]
	at java.lang.Thread.run(Thread.java:750) ~[na:1.8.0_351]

2024-12-31 21:04:59.505 ERROR 35092 --- [ad | producer-1] o.s.k.support.LoggingProducerListener    : Exception thrown when sending a message with key='null' and payload='{"id":null,"payerName":"payer756","payerAcc":"payer_acc756","payeeName":"payee756","payeeAcc":"payee...' to topic Fraud_acc:

org.apache.kafka.common.KafkaException: Producer is closed forcefully.
	at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortBatches(RecordAccumulator.java:760) [kafka-clients-3.0.2.jar:na]
	at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortIncompleteBatches(RecordAccumulator.java:747) [kafka-clients-3.0.2.jar:na]
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:283) ~[kafka-clients-3.0.2.jar:na]
	at java.lang.Thread.run(Thread.java:750) ~[na:1.8.0_351]

2024-12-31 21:04:59.505 ERROR 35092 --- [ad | producer-1] o.s.k.support.LoggingProducerListener    : Exception thrown when sending a message with key='null' and payload='{"id":null,"payerName":"payer757","payerAcc":"payer_acc757","payeeName":"payee757","payeeAcc":"payee...' to topic Fraud_acc:

org.apache.kafka.common.KafkaException: Producer is closed forcefully.
	at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortBatches(RecordAccumulator.java:760) [kafka-clients-3.0.2.jar:na]
	at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortIncompleteBatches(RecordAccumulator.java:747) [kafka-clients-3.0.2.jar:na]
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:283) ~[kafka-clients-3.0.2.jar:na]
	at java.lang.Thread.run(Thread.java:750) ~[na:1.8.0_351]

2024-12-31 21:04:59.505 ERROR 35092 --- [ad | producer-1] o.s.k.support.LoggingProducerListener    : Exception thrown when sending a message with key='null' and payload='{"id":null,"payerName":"payer758","payerAcc":"payer_acc758","payeeName":"payee758","payeeAcc":"payee...' to topic Fraud_acc:

org.apache.kafka.common.KafkaException: Producer is closed forcefully.
	at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortBatches(RecordAccumulator.java:760) [kafka-clients-3.0.2.jar:na]
	at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortIncompleteBatches(RecordAccumulator.java:747) [kafka-clients-3.0.2.jar:na]
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:283) ~[kafka-clients-3.0.2.jar:na]
	at java.lang.Thread.run(Thread.java:750) ~[na:1.8.0_351]

2024-12-31 21:04:59.505 ERROR 35092 --- [ad | producer-1] o.s.k.support.LoggingProducerListener    : Exception thrown when sending a message with key='null' and payload='{"id":null,"payerName":"payer759","payerAcc":"payer_acc759","payeeName":"payee759","payeeAcc":"payee...' to topic Fraud_acc:

org.apache.kafka.common.KafkaException: Producer is closed forcefully.
	at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortBatches(RecordAccumulator.java:760) [kafka-clients-3.0.2.jar:na]
	at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortIncompleteBatches(RecordAccumulator.java:747) [kafka-clients-3.0.2.jar:na]
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:283) ~[kafka-clients-3.0.2.jar:na]
	at java.lang.Thread.run(Thread.java:750) ~[na:1.8.0_351]

2024-12-31 21:04:59.505 ERROR 35092 --- [ad | producer-1] o.s.k.support.LoggingProducerListener    : Exception thrown when sending a message with key='null' and payload='{"id":null,"payerName":"payer760","payerAcc":"payer_acc760","payeeName":"payee760","payeeAcc":"payee...' to topic Fraud_acc:

org.apache.kafka.common.KafkaException: Producer is closed forcefully.
	at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortBatches(RecordAccumulator.java:760) [kafka-clients-3.0.2.jar:na]
	at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortIncompleteBatches(RecordAccumulator.java:747) [kafka-clients-3.0.2.jar:na]
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:283) ~[kafka-clients-3.0.2.jar:na]
	at java.lang.Thread.run(Thread.java:750) ~[na:1.8.0_351]

2024-12-31 21:04:59.505 ERROR 35092 --- [ad | producer-1] o.s.k.support.LoggingProducerListener    : Exception thrown when sending a message with key='null' and payload='{"id":null,"payerName":"payer761","payerAcc":"payer_acc761","payeeName":"payee761","payeeAcc":"payee...' to topic Fraud_acc:

org.apache.kafka.common.KafkaException: Producer is closed forcefully.
	at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortBatches(RecordAccumulator.java:760) [kafka-clients-3.0.2.jar:na]
	at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortIncompleteBatches(RecordAccumulator.java:747) [kafka-clients-3.0.2.jar:na]
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:283) ~[kafka-clients-3.0.2.jar:na]
	at java.lang.Thread.run(Thread.java:750) ~[na:1.8.0_351]
解决方法

cat server.properties 文件中配置一下下边的参数, 当初是localhost所以访问丢失
在这里插入图片描述

2025-01-01 09:18:04.991  WARN 6240 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-my-group-1, groupId=my-group] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
2025-01-01 09:18:05.736  INFO 6240 --- [nio-8080-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring DispatcherServlet 'dispatcherServlet'
2025-01-01 09:18:05.736  INFO 6240 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet        : Initializing Servlet 'dispatcherServlet'
2025-01-01 09:18:05.737  INFO 6240 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet        : Completed initialization in 1 ms
2025-01-01 09:18:05.843  INFO 6240 --- [nio-8080-exec-1] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 
	acks = -1
	batch.size = 16384
	bootstrap.servers = [192.168.1.112:9092]
	buffer.memory = 33554432
	client.dns.lookup = use_all_dns_ips
	client.id = producer-1
	compression.type = none
	connections.max.idle.ms = 540000
	delivery.timeout.ms = 120000
	enable.idempotence = true
	interceptor.classes = []
	key.serializer = class org.apache.kafka.common.serialization.StringSerializer
	linger.ms = 0
	max.block.ms = 60000
	max.in.flight.requests.per.connection = 5
	max.request.size = 1048576
	metadata.max.age.ms = 300000
	metadata.max.idle.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
	receive.buffer.bytes = 32768
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retries = 2147483647
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	security.providers = null
	send.buffer.bytes = 131072
	socket.connection.setup.timeout.max.ms = 30000
	socket.connection.setup.timeout.ms = 10000
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2]
	ssl.endpoint.identification.algorithm = https
	ssl.engine.factory.class = null
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.certificate.chain = null
	ssl.keystore.key = null
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLSv1.2
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.certificates = null
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	transaction.timeout.ms = 60000
	transactional.id = null
	value.serializer = class org.apache.kafka.common.serialization.StringSerializer

2025-01-01 09:18:05.862  INFO 6240 --- [nio-8080-exec-1] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-1] Instantiated an idempotent producer.
2025-01-01 09:18:05.882  INFO 6240 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 3.0.2
2025-01-01 09:18:05.882  INFO 6240 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 25b1aea02e37da14
2025-01-01 09:18:05.882  INFO 6240 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1735694285881
2025-01-01 09:18:06.127  INFO 6240 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-1] Resetting the last seen epoch of partition my-topic-0 to 0 since the associated topicId changed from null to ZiipuoTKS22oBX6HbBpMbQ
2025-01-01 09:18:06.128  INFO 6240 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-1] Cluster ID: DioEcCfQQNi6Ea50_-07Ag
2025-01-01 09:18:06.160  INFO 6240 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=producer-1] ProducerId set to 16 with epoch 0
2025-01-01 09:18:08.127  WARN 6240 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-my-group-1, groupId=my-group] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.

kafka消费者报错

2025-01-01 17:04:38.425 ERROR 38716 --- [. Out (20/24)#0] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-my-group-49, groupId=my-group] Offset commit failed on partition FraudAcc-0 at offset 25: The coordinator is not aware of this member.
2025-01-01 17:04:38.425  INFO 38716 --- [. Out (20/24)#0] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-my-group-49, groupId=my-group] OffsetCommit failed with Generation{generationId=-1, memberId='', protocol='null'}: The coordinator is not aware of this member.
2025-01-01 17:04:38.425  INFO 38716 --- [. Out (20/24)#0] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-my-group-49, groupId=my-group] Resetting generation due to: encountered UNKNOWN_MEMBER_ID from OFFSET_COMMIT response
2025-01-01 17:04:38.425  INFO 38716 --- [. Out (20/24)#0] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-my-group-49, groupId=my-group] Request joining group due to: encountered UNKNOWN_MEMBER_ID from OFFSET_COMMIT response
2025-01-01 17:04:38.425  WARN 38716 --- [. Out (20/24)#0] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-my-group-49, groupId=my-group] Asynchronous auto-commit of offsets {FraudAcc-0=OffsetAndMetadata{offset=25, leaderEpoch=0, metadata=''}} failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.

解决方法

根据你提供的日志信息,错误的根本原因在于Kafka消费者在尝试提交偏移量(offset commit)时遇到了协调者(coordinator)不知道的成员(member)。这通常发生在以下几种情况下:

  1. 消费者组重新平衡:当消费者组中的一个或多个消费者加入或离开时,Kafka会触发一次重新平衡操作。在此期间,所有消费者都会暂时失去对分区的所有权,并且需要重新加入组以获得新的分配。如果你的消费者在重新平衡后没有正确地重新加入,它将无法提交偏移量。

  2. max.poll.interval.ms超时:这个配置项定义了消费者可以花费多长时间处理来自poll()调用的消息而不返回到Kafka进行心跳检测。如果消费者花费的时间超过了这个值,Kafka认为该消费者已经死亡并触发重新平衡。这可能是由于消息处理时间过长或者消费者卡住未能及时调用poll()导致的。

  3. session.timeout.ms超时:这是另一个与消费者健康检查有关的参数,它决定了Kafka等待消费者发送心跳的最大时间。如果超过这个时间没有收到心跳,Kafka也会认为该消费者已经死亡并触发重新平衡。

  4. max.poll.records设置过高:如果你的max.poll.records配置得太高,那么每次poll()调用可能会返回大量的记录,从而增加了处理这些记录所需的时间,可能导致max.poll.interval.ms超时。

解决方案

你可以通过调整以下几个配置来解决这个问题:

  • 增加max.poll.interval.ms:如果知道你的消费者可能需要更多时间来处理一批消息,可以适当增加这个值。默认是5分钟(300,000毫秒),你可以根据实际情况调整为更长的时间。
    spring.kafka.consumer.max-poll-interval=600000  # 例如设置为10分钟
    
  • 减少max.poll.records:降低每次poll()调用返回的最大记录数,可以减少单次处理的负担,使消费者更快地完成处理并返回心跳。
    spring.kafka.consumer.max-poll-records=100  # 根据具体情况调整
    
  • 优化消息处理逻辑:确保你的消息处理逻辑尽可能高效,避免长时间阻塞的操作,如数据库查询、网络请求等。考虑使用异步处理或其他方式来加速处理过程。
  • 检查消费者的健壮性:确保消费者不会因为异常情况而卡住或崩溃。添加适当的异常处理和监控机制可以帮助快速发现和解决问题。
  • 确保消费者组ID唯一:确认每个消费者使用的消费者组ID是唯一的,除非它们确实属于同一个消费者组并且共享相同的订阅主题。

日志分析

从日志中可以看到,消费者consumer-my-group-49试图提交偏移量,但协调者并不知道这个成员的存在,因此提交失败。然后消费者重置了它的生成代(generation),并请求加入组。紧接着的日志警告指出了问题的核心——消费者在两次poll()之间的间隔超过了max.poll.interval.ms,这通常意味着消息处理时间过长。


http://www.kler.cn/a/468246.html

相关文章:

  • vue3组件化开发优势劣势分析,及一个案例
  • 权限掩码umask
  • 【Uniapp-Vue3】swiper滑块视图容器的用法
  • 【C++项目实战】类和对象入门实践:日期类实现万字详解
  • 【openwrt】OpenWrt 路由器的 802.1X 动态 VLAN
  • 以太网UDP协议栈实现(支持ARP、ICMP、UDP)--FPGA学习笔记26
  • 【机器学习篇】交通革命:机器学习如何引领未来的道路创新
  • 仓颉笔记——windows11安装启用cangjie语言,并使用vscode编写“你好,世界”
  • 面试经典150题——矩阵
  • 《数据结构》期末考试测试题【中】
  • 在PostgreSQL中,函数调用是一个非常重要的操作
  • deepseek v3模型为啥要开源
  • Eplan 项目结构(高层代号、安装地点、位置代号)
  • 初识C语言之函数的递归
  • 【linux基础I/O(1)】文件描述符的本质重定向的本质
  • 解决HBuilderX报错:未安装内置终端插件,是否下载?或使用外部命令行打开。
  • SQL Server 的备份机制及其恢复实现
  • 利用轮换IP的强大功能
  • CSS系列(49)-- Relative Color Syntax详解
  • Postgresql中clog与xid对应关系计算方法(速查表)
  • lua库介绍:数据处理与操作工具库 - leo
  • k8s 镜像拉取策略
  • 计算机组成原理——控制单元设计
  • 青少年编程与数学 02-005 移动Web编程基础 13课题、本地存储
  • 洛谷:P1540 [NOIP2010 提高组] 机器翻译
  • Sqoop其二,Job任务、增量导入、Hdfs导入、龙目