
A Small Kafka Hands-On Exercise

Before starting, download Kafka as described in the earlier article and start it. Start ZooKeeper first, then Kafka.

Project directory

package kafka; 
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * Kafka consumer
 **/
@Component
@Slf4j
public class KafkaConsumer {

    @KafkaListener(topics = "${kafka.topic.name}")
    public void listen(ConsumerRecord<?, ?> record) {
        log.info("topic={}, offset={}, message={}", record.topic(), record.offset(), record.value());
    }


}

Consumer code

This defines a simple Kafka consumer that uses Spring's Kafka listener to listen on a specified topic and handle the messages it receives.

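  • @Component
    Marks the class as a Spring component, so the Spring container manages it and picks it up through component scanning.

  • @Slf4j
    A Lombok annotation that generates a logger named log for the class, so you can call log.info() and similar methods without declaring a logger instance yourself.

  • @KafkaListener
    A Spring Kafka annotation that marks a method as a listener for Kafka messages.

    • topics: the Kafka topic to listen on. The placeholder ${kafka.topic.name} means the actual topic name is read from the configuration file (application.properties or application.yml).
  • Automatic listening
    When a new message is published to the topic, Spring automatically calls the method annotated with @KafkaListener to handle it.

  • Parameter ConsumerRecord<?, ?> record
    Represents one consumed Kafka record, including the message metadata (topic name, partition, offset) and the message body.

    • record.topic(): the name of the topic the message belongs to.
    • record.offset(): the offset of the message. Offsets are unique within a partition, not across the whole topic.
    • record.value(): the message content.
  • log.info
    Uses the logger to print the details of each received message:

    • the topic name
    • the offset, which locates the message within its partition
    • the message content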
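
To make the offset idea concrete, here is a minimal sketch in plain Java. It is a toy in-memory model, not the Kafka client API: the class ToyPartition and its methods are hypothetical names made up for illustration. It only shows that each appended message gets the next sequential offset within one partition, and that a consumer picks up from the last committed offset.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy in-memory model of a single partition. Hypothetical; not the Kafka client API. */
class ToyPartition {
    private final List<String> messages = new ArrayList<>();
    private int committed = 0; // next offset the consumer will read

    /** Appends a message and returns the offset assigned to it. */
    int append(String message) {
        messages.add(message);
        return messages.size() - 1;
    }

    /** Returns every message from the committed offset onward, then advances the commit. */
    List<String> poll() {
        List<String> batch = new ArrayList<>();
        for (int offset = committed; offset < messages.size(); offset++) {
            batch.add("offset=" + offset + ", message=" + messages.get(offset));
        }
        committed = messages.size();
        return batch;
    }

    /** Returns the offset the next poll will start from. */
    int committedOffset() {
        return committed;
    }

    public static void main(String[] args) {
        ToyPartition partition = new ToyPartition();
        for (int i = 0; i < 3; i++) {
            partition.append("hello world" + i);
        }
        partition.poll().forEach(System.out::println);
        // A later message continues from the committed offset instead of starting over.
        partition.append("hello world3");
        partition.poll().forEach(System.out::println);
    }
}
```

In real Kafka the committed offset is stored per consumer group and partition on the broker side; the sketch keeps it in a field only to stay self-contained.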
package kafka; 

import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;

import javax.annotation.Resource;

/**
 * Kafka producer
 **/
@Component
@Slf4j
public class KafkaProducer implements CommandLineRunner {

    @Resource
    KafkaTemplate kafkaTemplate;

    @Value("${kafka.topic.name}")
    private String topic;

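    /**
     * Sends a Kafka message.
     *
     * @param string the message payload
     */
    public void send(String string) {
        // Serialize once so the same value can be sent and, if desired, logged in the callback.
        String jsonString = JSONObject.toJSONString(string);
        ListenableFuture future = kafkaTemplate.send(topic, jsonString);
        //future.addCallback(o -> log.info("Kafka message sent: " + jsonString), throwable -> log.error("Kafka message send failed: " + jsonString));
    }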

    /**
     * Produces test data once the container has finished starting.
     */
    @Override
    public void run(String... args) {
        for (int i = 0; i < 10; i++) {
            send("hello world" + i);
        }
    }
}

Producer code

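  • KafkaTemplate
    The core Spring Kafka class for sending messages to Kafka. It is injected here with the @Resource annotation.

  • @Value("${kafka.topic.name}")
    Injects the Kafka topic name from the Spring configuration file (application.properties or application.yml).

  • kafkaTemplate.send(topic, message)
    Sends the message to the specified topic through KafkaTemplate.

  • JSONObject.toJSONString(string)
    Uses Alibaba's fastjson library to serialize the message to JSON. Because the payload here is already a String, the result is the same text wrapped in double quotes, which is why the consumer log shows message="hello world0".

  • ListenableFuture
    Sending is asynchronous: send() returns a ListenableFuture that can be used to observe the result of the send.

  • Logging (commented out)
    The commented-out addCallback call registers separate callbacks for success and failure:

    • on success, log.info(...) records that the message was sent
    • on failure, log.error(...) records that the send failed

  • run(String... args)
    The CommandLineRunner interface method. Spring Boot calls it automatically once the application context has started.

  • Sending in a loop
    On startup, 10 test messages are sent to the Kafka topic: hello world0, hello world1, and so on.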
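
The success/failure callback pattern can be sketched with the JDK alone. Note that in Spring Kafka 3.x, KafkaTemplate.send() returns a java.util.concurrent.CompletableFuture rather than a ListenableFuture, so the sketch below uses whenComplete. Everything here is an assumption for illustration: fakeSend is a hypothetical stand-in for the real send, and the log lines are collected in a list instead of going through a logger.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** JDK-only sketch of a send callback. fakeSend is hypothetical, not a Spring Kafka API. */
class SendCallbackSketch {

    /** Stand-in for an asynchronous send; it fails when the message is empty. */
    static CompletableFuture<String> fakeSend(String topic, String message) {
        CompletableFuture<String> future = new CompletableFuture<>();
        if (message.isEmpty()) {
            future.completeExceptionally(new IllegalArgumentException("empty message"));
        } else {
            future.complete(topic + ":" + message);
        }
        return future;
    }

    /** Sends a message and records one line for either success or failure. */
    static List<String> sendAndLog(String topic, String message) {
        List<String> logLines = new ArrayList<>();
        fakeSend(topic, message).whenComplete((result, error) -> {
            if (error == null) {
                logLines.add("sent: " + result);
            } else {
                logLines.add("failed: " + error.getMessage());
            }
        });
        return logLines;
    }

    public static void main(String[] args) {
        sendAndLog("mqtt_location_data", "hello world0").forEach(System.out::println);
        sendAndLog("mqtt_location_data", "").forEach(System.out::println);
    }
}
```

Because fakeSend completes the future before returning, the callback runs immediately on the calling thread. With a real asynchronous send the callback would typically run later on another thread, so shared state such as the list above would need to be thread-safe.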

## kafka ##
spring.kafka.bootstrap-servers=127.0.0.1:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.group-id=test


spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
kafka.topic.name=mqtt_location_data
#
#spring.data.mongodb.host=localhost
#spring.data.mongodb.port=27017
#spring.data.mongodb.database=your_database_name

Configuration file: application.properties
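
Both the consumer and the producer refer to the topic through the placeholder ${kafka.topic.name}. As a rough illustration of what that lookup amounts to, here is a simplified sketch using only java.util.Properties. It is not how Spring resolves placeholders internally (Spring also supports default values, nesting, and multiple property sources); the class PlaceholderSketch and its methods are hypothetical.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Simplified placeholder lookup. Hypothetical; not Spring's actual resolver. */
class PlaceholderSketch {

    /** Parses properties-file text into a Properties object. */
    static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    /** Resolves a single "${key}" expression; any other text is returned unchanged. */
    static String resolve(String expression, Properties props) {
        if (expression.startsWith("${") && expression.endsWith("}")) {
            String key = expression.substring(2, expression.length() - 1);
            String value = props.getProperty(key);
            if (value == null) {
                throw new IllegalArgumentException("Could not resolve placeholder '" + key + "'");
            }
            return value;
        }
        return expression;
    }

    public static void main(String[] args) {
        Properties props = load("## kafka ##\nkafka.topic.name=mqtt_location_data\n");
        System.out.println(resolve("${kafka.topic.name}", props));
    }
}
```

If the key is missing, the sketch fails fast with an exception, which is a design choice made here so that a misspelled key is noticed right away.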

Run output:

2024-12-26 01:32:30.839  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Successfully joined group with generation Generation{generationId=2, memberId='consumer-test-1-2a15b89d-33d7-47ab-aa60-674ac9eb688a', protocol='range'}
2024-12-26 01:33:00.850  INFO 22936 --- [t-thread | test] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Group coordinator localhost:9092 (id: 2147483647 rack: null) is unavailable or invalid due to cause: null.isDisconnected: true. Rediscovery will be attempted.
2024-12-26 01:33:00.855  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)
2024-12-26 01:33:00.856  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)
2024-12-26 01:33:00.856  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Group coordinator localhost:9092 (id: 2147483647 rack: null) is unavailable or invalid due to cause: coordinator unavailable.isDisconnected: false. Rediscovery will be attempted.
2024-12-26 01:33:00.970  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)
2024-12-26 01:33:00.971  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] (Re-)joining group
2024-12-26 01:33:00.980  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Successfully joined group with generation Generation{generationId=2, memberId='consumer-test-1-2a15b89d-33d7-47ab-aa60-674ac9eb688a', protocol='range'}
2024-12-26 01:33:15.847  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] SyncGroup failed: The group began another rebalance. Need to re-join the group. Sent generation was Generation{generationId=2, memberId='consumer-test-1-2a15b89d-33d7-47ab-aa60-674ac9eb688a', protocol='range'}
2024-12-26 01:33:15.847  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] (Re-)joining group
2024-12-26 01:33:15.874  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Successfully joined group with generation Generation{generationId=3, memberId='consumer-test-1-2a15b89d-33d7-47ab-aa60-674ac9eb688a', protocol='range'}
2024-12-26 01:33:15.875  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Finished assignment for group at generation 3: {consumer-test-1-2a15b89d-33d7-47ab-aa60-674ac9eb688a=Assignment(partitions=[mqtt_location_data-0])}
2024-12-26 01:33:15.879  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Successfully synced group in generation Generation{generationId=3, memberId='consumer-test-1-2a15b89d-33d7-47ab-aa60-674ac9eb688a', protocol='range'}
2024-12-26 01:33:15.880  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Notifying assignor about the new Assignment(partitions=[mqtt_location_data-0])
2024-12-26 01:33:15.881  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Adding newly assigned partitions: mqtt_location_data-0
2024-12-26 01:33:15.887  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Setting offset for partition mqtt_location_data-0 to the committed offset FetchPosition{offset=10, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}}
2024-12-26 01:33:15.888  INFO 22936 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : test: partitions assigned: [mqtt_location_data-0]
2024-12-26 01:33:15.900  INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer                      : topic=mqtt_location_data, offset=10, message="hello world0"
2024-12-26 01:33:15.900  INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer                      : topic=mqtt_location_data, offset=11, message="hello world1"
2024-12-26 01:33:15.900  INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer                      : topic=mqtt_location_data, offset=12, message="hello world2"
2024-12-26 01:33:15.900  INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer                      : topic=mqtt_location_data, offset=13, message="hello world3"
2024-12-26 01:33:15.900  INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer                      : topic=mqtt_location_data, offset=14, message="hello world4"
2024-12-26 01:33:15.900  INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer                      : topic=mqtt_location_data, offset=15, message="hello world5"
2024-12-26 01:33:15.900  INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer                      : topic=mqtt_location_data, offset=16, message="hello world6"
2024-12-26 01:33:15.900  INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer                      : topic=mqtt_location_data, offset=17, message="hello world7"
2024-12-26 01:33:15.901  INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer                      : topic=mqtt_location_data, offset=18, message="hello world8"
2024-12-26 01:33:15.901  INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer                      : topic=mqtt_location_data, offset=19, message="hello world9"
2024-12-26 01:33:15.901  INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer                      : topic=mqtt_location_data, offset=20, message="hello world0"
2024-12-26 01:33:15.901  INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer                      : topic=mqtt_location_data, offset=21, message="hello world1"
2024-12-26 01:33:15.901  INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer                      : topic=mqtt_location_data, offset=22, message="hello world2"
2024-12-26 01:33:15.901  INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer                      : topic=mqtt_location_data, offset=23, message="hello world3"
2024-12-26 01:33:15.901  INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer                      : topic=mqtt_location_data, offset=24, message="hello world4"
2024-12-26 01:33:15.901  INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer                      : topic=mqtt_location_data, offset=25, message="hello world5"
2024-12-26 01:33:15.901  INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer                      : topic=mqtt_location_data, offset=26, message="hello world6"
2024-12-26 01:33:15.901  INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer                      : topic=mqtt_location_data, offset=27, message="hello world7"
2024-12-26 01:33:15.901  INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer                      : topic=mqtt_location_data, offset=28, message="hello world8"
2024-12-26 01:33:15.901  INFO 22936 --- [ntainer#0-0-C-1] kafka.KafkaConsumer                      : topic=mqtt_location_data, offset=29, message="hello world9"
2024-12-26 04:36:36.611  INFO 22936 --- [ntainer#0-0-C-1] o.a.kafka.clients.FetchSessionHandler    : [Consumer clientId=consumer-test-1, groupId=test] Error sending fetch request (sessionId=1770952277, epoch=389) to node 0:

org.apache.kafka.common.errors.DisconnectException: null

2024-12-26 04:36:36.612  INFO 22936 --- [t-thread | test] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Group coordinator localhost:9092 (id: 2147483647 rack: null) is unavailable or invalid due to cause: coordinator unavailable.isDisconnected: true. Rediscovery will be attempted.
2024-12-26 04:36:36.727  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)
2024-12-26 04:36:36.731 ERROR 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Offset commit failed on partition mqtt_location_data-0 at offset 30: The coordinator is not aware of this member.
2024-12-26 04:36:36.731  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] OffsetCommit failed with Generation{generationId=3, memberId='consumer-test-1-2a15b89d-33d7-47ab-aa60-674ac9eb688a', protocol='range'}: The coordinator is not aware of this member.
2024-12-26 04:36:36.732  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Resetting generation due to: encountered UNKNOWN_MEMBER_ID from OFFSET_COMMIT response
2024-12-26 04:36:36.732  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Request joining group due to: encountered UNKNOWN_MEMBER_ID from OFFSET_COMMIT response
2024-12-26 04:36:36.732  WARN 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Asynchronous auto-commit of offsets {mqtt_location_data-0=OffsetAndMetadata{offset=30, leaderEpoch=0, metadata=''}} failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
2024-12-26 04:36:36.732  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Failing OffsetCommit request since the consumer is not part of an active group
2024-12-26 04:36:36.733  WARN 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Synchronous auto-commit of offsets {mqtt_location_data-0=OffsetAndMetadata{offset=30, leaderEpoch=0, metadata=''}} failed: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
2024-12-26 04:36:36.733  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Giving away all assigned partitions as lost since generation has been reset,indicating that consumer is no longer part of the group
2024-12-26 04:36:36.733  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Lost previously assigned partitions mqtt_location_data-0
2024-12-26 04:36:36.733  INFO 22936 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : test: partitions lost: [mqtt_location_data-0]
2024-12-26 04:36:36.733  INFO 22936 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : test: partitions revoked: [mqtt_location_data-0]
2024-12-26 04:36:36.733  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] (Re-)joining group
2024-12-26 04:36:36.734  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Attempt to heartbeat with stale Generation{generationId=3, memberId='consumer-test-1-2a15b89d-33d7-47ab-aa60-674ac9eb688a', protocol='range'} and group instance id Optional.empty failed due to UNKNOWN_MEMBER_ID, ignoring the error
2024-12-26 04:36:36.735  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Request joining group due to: need to re-join with the given member-id
2024-12-26 04:36:36.735  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] (Re-)joining group
2024-12-26 04:36:37.543  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Successfully joined group with generation Generation{generationId=5, memberId='consumer-test-1-83590c41-dd9a-4a77-9349-487ca71dad61', protocol='range'}
2024-12-26 04:36:37.543  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Finished assignment for group at generation 5: {consumer-test-1-83590c41-dd9a-4a77-9349-487ca71dad61=Assignment(partitions=[mqtt_location_data-0])}
2024-12-26 04:36:37.553  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Successfully synced group in generation Generation{generationId=5, memberId='consumer-test-1-83590c41-dd9a-4a77-9349-487ca71dad61', protocol='range'}
2024-12-26 04:36:37.554  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Notifying assignor about the new Assignment(partitions=[mqtt_location_data-0])
2024-12-26 04:36:37.554  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Adding newly assigned partitions: mqtt_location_data-0
2024-12-26 04:36:37.557  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Setting offset for partition mqtt_location_data-0 to the committed offset FetchPosition{offset=30, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}}
2024-12-26 04:36:37.589  INFO 22936 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : test: partitions assigned: [mqtt_location_data-0]
2024-12-30 22:44:08.518  INFO 22936 --- [t-thread | test] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Group coordinator localhost:9092 (id: 2147483647 rack: null) is unavailable or invalid due to cause: coordinator unavailable.isDisconnected: true. Rediscovery will be attempted.
2024-12-30 22:44:09.620  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)
2024-12-30 22:44:09.629  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Attempt to heartbeat with Generation{generationId=5, memberId='consumer-test-1-83590c41-dd9a-4a77-9349-487ca71dad61', protocol='range'} and group instance id Optional.empty failed due to UNKNOWN_MEMBER_ID, resetting generation
2024-12-30 22:44:09.629  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Resetting generation due to: encountered UNKNOWN_MEMBER_ID from HEARTBEAT response
2024-12-30 22:44:09.629  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Request joining group due to: encountered UNKNOWN_MEMBER_ID from HEARTBEAT response
2024-12-30 22:44:09.629  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Failing OffsetCommit request since the consumer is not part of an active group
2024-12-30 22:44:09.630  WARN 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Synchronous auto-commit of offsets {mqtt_location_data-0=OffsetAndMetadata{offset=30, leaderEpoch=0, metadata=''}} failed: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
2024-12-30 22:44:09.630  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Giving away all assigned partitions as lost since generation has been reset,indicating that consumer is no longer part of the group
2024-12-30 22:44:09.630  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Lost previously assigned partitions mqtt_location_data-0
2024-12-30 22:44:09.630  INFO 22936 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : test: partitions lost: [mqtt_location_data-0]
2024-12-30 22:44:09.630  INFO 22936 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : test: partitions revoked: [mqtt_location_data-0]
2024-12-30 22:44:09.630  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] (Re-)joining group
2024-12-30 22:44:09.636 ERROR 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Offset commit failed on partition mqtt_location_data-0 at offset 30: The coordinator is not aware of this member.
2024-12-30 22:44:09.636  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] OffsetCommit failed with Generation{generationId=5, memberId='consumer-test-1-83590c41-dd9a-4a77-9349-487ca71dad61', protocol='range'}: The coordinator is not aware of this member.
2024-12-30 22:44:09.637  WARN 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Asynchronous auto-commit of offsets {mqtt_location_data-0=OffsetAndMetadata{offset=30, leaderEpoch=0, metadata=''}} failed: Offset commit cannot be completed since the consumer member's generation is already stale, meaning it has already participated another rebalance and got a new generation. You can try completing the rebalance by calling poll() and then retry commit again
2024-12-30 22:44:09.640  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Request joining group due to: need to re-join with the given member-id
2024-12-30 22:44:09.640  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] (Re-)joining group
2024-12-30 22:44:09.663  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Successfully joined group with generation Generation{generationId=7, memberId='consumer-test-1-2c8145c0-d35d-4c70-bbe0-3c96144b69fd', protocol='range'}
2024-12-30 22:44:09.663  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Finished assignment for group at generation 7: {consumer-test-1-2c8145c0-d35d-4c70-bbe0-3c96144b69fd=Assignment(partitions=[mqtt_location_data-0])}
2024-12-30 22:44:09.667  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Successfully synced group in generation Generation{generationId=7, memberId='consumer-test-1-2c8145c0-d35d-4c70-bbe0-3c96144b69fd', protocol='range'}
2024-12-30 22:44:09.667  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Notifying assignor about the new Assignment(partitions=[mqtt_location_data-0])
2024-12-30 22:44:09.667  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Adding newly assigned partitions: mqtt_location_data-0
2024-12-30 22:44:09.672  INFO 22936 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-1, groupId=test] Setting offset for partition mqtt_location_data-0 to the committed offset FetchPosition{offset=30, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}}
2024-12-30 22:44:10.200  INFO 22936 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : test: partitions assigned: [mqtt_location_data-0]


