Common Kafka Problems
Cannot connect to Kafka; the following error is reported:
org.apache.kafka.common.KafkaException: Producer is closed forcefully.
at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortBatches(RecordAccumulator.java:760) [kafka-clients-3.0.2.jar:na]
at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortIncompleteBatches(RecordAccumulator.java:747) [kafka-clients-3.0.2.jar:na]
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:283) ~[kafka-clients-3.0.2.jar:na]
at java.lang.Thread.run(Thread.java:750) ~[na:1.8.0_351]
2024-12-31 21:04:59.505 ERROR 35092 --- [ad | producer-1] o.s.k.support.LoggingProducerListener : Exception thrown when sending a message with key='null' and payload='{"id":null,"payerName":"payer756","payerAcc":"payer_acc756","payeeName":"payee756","payeeAcc":"payee...' to topic Fraud_acc:
org.apache.kafka.common.KafkaException: Producer is closed forcefully.
at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortBatches(RecordAccumulator.java:760) [kafka-clients-3.0.2.jar:na]
at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortIncompleteBatches(RecordAccumulator.java:747) [kafka-clients-3.0.2.jar:na]
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:283) ~[kafka-clients-3.0.2.jar:na]
at java.lang.Thread.run(Thread.java:750) ~[na:1.8.0_351]
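Solution
In server.properties (view it with cat server.properties), change the address setting that was originally localhost to an address that clients can reach; with localhost, client connections were lost. The client log from that state shows the consumer trying to reach node 1 at localhost/127.0.0.1:9092: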
2025-01-01 09:18:04.991 WARN 6240 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-my-group-1, groupId=my-group] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
2025-01-01 09:18:05.736 INFO 6240 --- [nio-8080-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring DispatcherServlet 'dispatcherServlet'
2025-01-01 09:18:05.736 INFO 6240 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet : Initializing Servlet 'dispatcherServlet'
2025-01-01 09:18:05.737 INFO 6240 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet : Completed initialization in 1 ms
2025-01-01 09:18:05.843 INFO 6240 --- [nio-8080-exec-1] o.a.k.clients.producer.ProducerConfig : ProducerConfig values:
acks = -1
batch.size = 16384
bootstrap.servers = [192.168.1.112:9092]
buffer.memory = 33554432
client.dns.lookup = use_all_dns_ips
client.id = producer-1
compression.type = none
connections.max.idle.ms = 540000
delivery.timeout.ms = 120000
enable.idempotence = true
interceptor.classes = []
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metadata.max.idle.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 2147483647
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.2
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.StringSerializer
2025-01-01 09:18:05.862 INFO 6240 --- [nio-8080-exec-1] o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-1] Instantiated an idempotent producer.
2025-01-01 09:18:05.882 INFO 6240 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser : Kafka version: 3.0.2
2025-01-01 09:18:05.882 INFO 6240 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 25b1aea02e37da14
2025-01-01 09:18:05.882 INFO 6240 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1735694285881
2025-01-01 09:18:06.127 INFO 6240 --- [ad | producer-1] org.apache.kafka.clients.Metadata : [Producer clientId=producer-1] Resetting the last seen epoch of partition my-topic-0 to 0 since the associated topicId changed from null to ZiipuoTKS22oBX6HbBpMbQ
2025-01-01 09:18:06.128 INFO 6240 --- [ad | producer-1] org.apache.kafka.clients.Metadata : [Producer clientId=producer-1] Cluster ID: DioEcCfQQNi6Ea50_-07Ag
2025-01-01 09:18:06.160 INFO 6240 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-1] ProducerId set to 16 with epoch 0
2025-01-01 09:18:08.127 WARN 6240 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-my-group-1, groupId=my-group] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
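The setting that typically produces this symptom is the address the broker advertises to clients. The fragment below is only a sketch: it assumes that advertised.listeners is the parameter these notes refer to, and that the broker is reachable at 192.168.1.112:9092, the address shown as bootstrap.servers in the producer log. Adjust both to your environment.

```properties
# server.properties (broker side)

# Address the broker binds to; 0.0.0.0 listens on all interfaces.
listeners=PLAINTEXT://0.0.0.0:9092

# Address the broker hands back to clients in metadata responses.
# If this resolves to localhost, remote clients try 127.0.0.1:9092 and fail.
advertised.listeners=PLAINTEXT://192.168.1.112:9092
```

A change made in server.properties takes effect after the broker is restarted.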
Kafka consumer error
2025-01-01 17:04:38.425 ERROR 38716 --- [. Out (20/24)#0] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-49, groupId=my-group] Offset commit failed on partition FraudAcc-0 at offset 25: The coordinator is not aware of this member.
2025-01-01 17:04:38.425 INFO 38716 --- [. Out (20/24)#0] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-49, groupId=my-group] OffsetCommit failed with Generation{generationId=-1, memberId='', protocol='null'}: The coordinator is not aware of this member.
2025-01-01 17:04:38.425 INFO 38716 --- [. Out (20/24)#0] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-49, groupId=my-group] Resetting generation due to: encountered UNKNOWN_MEMBER_ID from OFFSET_COMMIT response
2025-01-01 17:04:38.425 INFO 38716 --- [. Out (20/24)#0] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-49, groupId=my-group] Request joining group due to: encountered UNKNOWN_MEMBER_ID from OFFSET_COMMIT response
2025-01-01 17:04:38.425 WARN 38716 --- [. Out (20/24)#0] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-my-group-49, groupId=my-group] Asynchronous auto-commit of offsets {FraudAcc-0=OffsetAndMetadata{offset=25, leaderEpoch=0, metadata=''}} failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
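Solution
According to the log above, the root cause is that the consumer tried to commit offsets while the group coordinator no longer recognized it as a member. This usually happens in one of the following situations:
- Consumer group rebalance: when one or more consumers join or leave the group, Kafka triggers a rebalance. During the rebalance, consumers temporarily lose ownership of their partitions and must rejoin the group to receive a new assignment. A consumer that does not rejoin correctly after the rebalance cannot commit offsets.
- max.poll.interval.ms exceeded: this setting is the maximum time allowed between two consecutive poll() calls, that is, how long the consumer may spend processing the records returned by one poll(). If the consumer takes longer, it is considered failed, leaves the group, and a rebalance is triggered. The usual reasons are slow message processing or a consumer that is stuck and does not call poll() in time.
- session.timeout.ms exceeded: this is the other liveness setting. It is the maximum time the coordinator waits for a heartbeat from the consumer. If no heartbeat arrives within this time, Kafka also considers the consumer dead and triggers a rebalance.
- max.poll.records set too high: each poll() call may then return a large number of records, which increases the time needed to process them and can push the consumer past max.poll.interval.ms.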
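To make the last cause concrete, here is a rough back-of-the-envelope check in Java. It is only a sketch: the class PollBudget, its method names, and the per-record processing time of 1,000 ms are assumptions chosen for illustration. The values 500 and 300,000 ms are the Kafka client defaults for max.poll.records and max.poll.interval.ms.

```java
/** Rough estimate of whether one poll() batch fits inside max.poll.interval.ms. */
final class PollBudget {

    private PollBudget() {
    }

    /** Worst-case time to process one full batch, assuming records are handled one by one. */
    static long worstCaseBatchMs(int maxPollRecords, long perRecordMs) {
        return Math.multiplyExact((long) maxPollRecords, perRecordMs);
    }

    /** True if a full batch would take longer than max.poll.interval.ms. */
    static boolean exceedsPollInterval(int maxPollRecords, long perRecordMs, long maxPollIntervalMs) {
        return worstCaseBatchMs(maxPollRecords, perRecordMs) > maxPollIntervalMs;
    }

    /**
     * Suggests a max.poll.records value so that a full batch uses at most
     * the given fraction of max.poll.interval.ms. Never returns less than 1.
     */
    static int safeMaxPollRecords(long maxPollIntervalMs, long perRecordMs, double fraction) {
        if (perRecordMs <= 0 || fraction <= 0.0 || fraction > 1.0) {
            throw new IllegalArgumentException("perRecordMs must be > 0 and fraction in (0, 1]");
        }
        long budgetMs = (long) (maxPollIntervalMs * fraction);
        long records = Math.max(1L, budgetMs / perRecordMs);
        return (int) Math.min(records, Integer.MAX_VALUE);
    }

    public static void main(String[] args) {
        int defaultMaxPollRecords = 500;          // Kafka client default
        long defaultMaxPollIntervalMs = 300_000L; // Kafka client default (5 minutes)
        long perRecordMs = 1_000L;                // assumed processing time per record

        // 500 records x 1,000 ms = 500,000 ms, which is more than 300,000 ms.
        System.out.println(worstCaseBatchMs(defaultMaxPollRecords, perRecordMs));
        System.out.println(exceedsPollInterval(defaultMaxPollRecords, perRecordMs, defaultMaxPollIntervalMs));
        // Keeping a full batch within half of the interval allows 150 records.
        System.out.println(safeMaxPollRecords(defaultMaxPollIntervalMs, perRecordMs, 0.5));
    }
}
```

With these assumed numbers a full default batch would need 500,000 ms, so the consumer would miss the 300,000 ms deadline unless max.poll.records is lowered or max.poll.interval.ms is raised.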
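Fixes
The problem can be addressed by adjusting the following:
- Increase max.poll.interval.ms: if the consumer is known to need more time to process a batch, raise this value. The default is 5 minutes (300,000 ms); set it higher as the workload requires. In Spring Boot, Kafka client settings can be passed through under spring.kafka.consumer.properties. Keep the comment on its own line, because a # in the middle of a properties line becomes part of the value:
    # for example, 10 minutes
    spring.kafka.consumer.properties.max.poll.interval.ms=600000
- Reduce max.poll.records: lowering the maximum number of records returned by each poll() call reduces the work per batch, so the consumer finishes processing and calls poll() again sooner:
    # adjust to the workload
    spring.kafka.consumer.max-poll-records=100
- Optimize the message-processing logic: keep processing as efficient as possible and avoid long blocking operations such as database queries and network requests inside the poll loop. Consider asynchronous processing or other ways to speed it up.
- Check the robustness of the consumer: make sure the consumer does not hang or crash on unexpected conditions. Proper exception handling and monitoring help to find and fix problems quickly.
- Check the consumer group IDs: consumers should share a group ID only if they really belong to the same consumer group and subscribe to the same topics; otherwise give each consumer its own group ID.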
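For a consumer that is created in plain Java rather than configured through Spring, the two settings above can be sketched as follows. The class ConsumerTuning and its method are hypothetical helpers; the property keys are the standard Kafka consumer configuration names, and the example values (192.168.1.112:9092, my-group, 600000, 100) are taken from the logs and settings in these notes. The sketch deliberately imports nothing from kafka-clients so that it runs on the JDK alone; a real application would add deserializers and pass the result to the KafkaConsumer constructor.

```java
import java.util.Properties;

/** Builds consumer settings for the tuning options discussed above. */
final class ConsumerTuning {

    private ConsumerTuning() {
    }

    /** Returns consumer properties with a longer poll interval and a smaller batch size. */
    static Properties tunedConsumerProps(String bootstrapServers, String groupId,
                                         long maxPollIntervalMs, int maxPollRecords) {
        if (maxPollIntervalMs <= 0 || maxPollRecords <= 0) {
            throw new IllegalArgumentException("maxPollIntervalMs and maxPollRecords must be positive");
        }
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // Maximum allowed time between two poll() calls.
        props.setProperty("max.poll.interval.ms", Long.toString(maxPollIntervalMs));
        // Maximum number of records returned by a single poll().
        props.setProperty("max.poll.records", Integer.toString(maxPollRecords));
        return props;
    }

    public static void main(String[] args) {
        Properties props = tunedConsumerProps("192.168.1.112:9092", "my-group", 600_000L, 100);
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```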
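Log analysis
The log shows that consumer consumer-my-group-49 tried to commit offsets, but the coordinator was not aware of this member, so the commit failed. The consumer then reset its generation and requested to rejoin the group. The warning that follows points to the core problem: the interval between two poll() calls exceeded max.poll.interval.ms, which usually means that message processing took too long.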
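One way to confirm this diagnosis is to measure the gap between successive poll() calls in the application itself. The class below is a minimal sketch of that idea, not part of the Kafka client: PollGapMonitor and its methods are hypothetical names, and the timestamps in main are made up to simulate a slow batch. In a real poll loop, recordPoll would be called with System.currentTimeMillis() immediately before each poll().

```java
/** Tracks the time between successive poll() calls and compares it with max.poll.interval.ms. */
final class PollGapMonitor {

    private final long maxPollIntervalMs;
    private boolean hasPolled = false;
    private long lastPollMs = 0L;

    PollGapMonitor(long maxPollIntervalMs) {
        if (maxPollIntervalMs <= 0) {
            throw new IllegalArgumentException("maxPollIntervalMs must be positive");
        }
        this.maxPollIntervalMs = maxPollIntervalMs;
    }

    /** Records a poll() at the given time and returns the gap since the previous one (0 for the first). */
    long recordPoll(long nowMs) {
        long gapMs = hasPolled ? nowMs - lastPollMs : 0L;
        hasPolled = true;
        lastPollMs = nowMs;
        return gapMs;
    }

    /** True if the gap is longer than max.poll.interval.ms. */
    boolean exceedsLimit(long gapMs) {
        return gapMs > maxPollIntervalMs;
    }

    /** True if the gap has reached the given fraction (for example 0.8) of the limit. */
    boolean nearLimit(long gapMs, double fraction) {
        return gapMs >= maxPollIntervalMs * fraction;
    }

    public static void main(String[] args) {
        PollGapMonitor monitor = new PollGapMonitor(300_000L); // default max.poll.interval.ms

        monitor.recordPoll(0L);                        // first poll, no gap yet
        long fastGap = monitor.recordPoll(120_000L);   // batch took 2 minutes
        long slowGap = monitor.recordPoll(480_000L);   // batch took 6 minutes

        System.out.println(fastGap + " ms, exceeds limit: " + monitor.exceedsLimit(fastGap));
        System.out.println(slowGap + " ms, exceeds limit: " + monitor.exceedsLimit(slowGap));
    }
}
```

Logging a warning whenever nearLimit returns true gives an early signal before the coordinator removes the consumer from the group.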