当前位置: 首页 > article >正文

Kafka 消费者

Kafka消费者主要负责消费(读取和处理)由生产者发布的消息。

1 消费者入门

消费组将具有相同group.id的消费者实例组织成组。它们共同读取一个或多个主题的消息。每个消费者都有一个对应的消费组。

消息发布到主题后,只会被投递给订阅它的每个消费组中的一个消费者。

图 消费组与主题的关系

一个消费者,增加新的消费者时,分区会进行重分配,每个消费者会消费不同的分区。可以通过增加(或减少)消费者的个数来提高(或降低)整体的消费能力。但,如果消费者的个数大于分区个数,会出现消费者分配不到任何分区的情况。

1.1 创建消费者消费的步骤

1)配置消费者参数。必备参数:key.deserializer、value.deserializer、bootstrap.servers、group.id(消费者隶属的消费组名称)

2)创建消费者实例。

3)订阅主题及分区。

4)拉取消息,并进行消费。

5)关闭消费者实例。

public static void doConsume(String topic,ThreadSwitch threadSwitch) {
        new Thread(() -> {
            Properties props = new Properties();
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,HOST);
            Consumer<String,String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singleton(topic));
            while (threadSwitch.running()) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String,String> record : records) {
                    System.out.println(record.partition() + "," + record.offset() + "," + record.value());
                }
            }
            consumer.close();
        }).start();
    }

1.1.1 订阅主题与分区

void subscribe(Collection<String> topics);
void subscribe(Collection<String> topics, ConsumerRebalanceListener callback);
void subscribe(Pattern pattern, ConsumerRebalanceListener callback);
void subscribe(Pattern pattern);
void assign(Collection<TopicPartition> partitions);

上面是消费者订阅主题及分区的方法。订阅方式有三种:集合订阅(AUTO_TOPICS)、正则表达式订阅(AUTO_PARTTERN)、指定分区订阅(USER_ASSIGNED)。消费者只能选择其中一种方式。

1.1.2 查询指定主题的元数据信息

List<PartitionInfo> partitionsFor(String topic);
List<PartitionInfo> partitionsFor(String topic, Duration timeout);

图 PartitionInfo UML

 partitionsFor 是一个同步方法,当给定主题没有元数据时,会发送请求到Kafka服务器中,请求元数据。

1.1.3 取消订阅

unsubscribe() 方法来取消主题的订阅,也可以通过将subscribe(Collection)或assign(Collection)中的集合参数设置为空集合的方式取消主题的订阅。

2 消息消费

ConsumerRecords<K, V> poll(Duration timeout);

消费者通过不断轮询poll方法来获取所订阅的主题(分区)上的一组消息。

2.1 位移提交

对于分区,它的每条消息都有唯一的offset,用来表示消息在分区中对应的位置(又称偏移量)。

对应消费者,使用offset来表示它消费到分区中某个消息所在的位置(又称消费位移)。

2.1.1 消费位移持久化

每次调用poll()方法时,需要它返回的是还没有被消费过的消息集。为实现这个,需要记录上次消费时的消费位移(这个值不单单保存在内存中,还需要做持久化保存,否则消费者重启之后就无法知晓之前的消费位移)。

在Kafka中,消费位移存储在Kafka内部的主题__consumer_offsets中。把消费位移存储起来的动作称为“提交”。

图 消费位移

理想情况下,我们希望消费到X的位置时,能向服务器提交的是X+1这个位置的消费位移。

2.1.2 重复消费与消息丢失

实际上,通过poll方法获取消息后,需要经过我们的业务代码处理获取的消息才能算真正的把消息消费完成。有时,业务代码会比较耗时,或出现异常。在消费消息时会出现重复消费或消息丢失的情况。

重复消费

消费完某条消息时,因服务器故障导致没能提交消费位移,再次启动时,将会导致重复消费。

消息丢失

在还未消费完某组消息时,已完成消费位移提交,此时消费者发生故障,导致消费中断。 消费者重启时,会导致这些消息没能被继续消费。

表 重复消费及消息丢失的场景

 2.1.3 自动提交

Kafka中默认的消费位移的提交方式是自动提交,由消费者参数enable.auto.commit配置。周期性提交,auto.commit.interval.ms配置自动提交的间隔时间,默认值为5秒。

在未发生异常的时候,poll方法在轮询的时候是通过保存在内存中的消费位移来取消息。

2.1.4 手动提交

自动位移提交无法做到精确的位移管理,Kafka还提供来手动位移提交的方式。

void commitSync();
void commitSync(Duration timeout);
void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets);
void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets, final Duration timeout);
void commitAsync();
void commitAsync(OffsetCommitCallback callback);
void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback);

手动提交分为同步提交和异步提交。注意:不要每消费完一条消息就提交一次,这样会降低性能。

2.2 指定位移消费

auto.offset.reset 配置当消费者查找不到所记录的消费位移时,从何处开始进行消费。默认值latest,表示从分区末尾开始消费;earliest表示从起始处;none 表示既不从起始也不从末尾,而是报出NoOffsetForPartitionException异常。

Kafka提供seek方法用于指定从特定的位移处开始拉取消息。

void seek(TopicPartition partition, long offset);
void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata);
void seekToBeginning(Collection<TopicPartition> partitions);
void seekToEnd(Collection<TopicPartition> partitions);

2.1.1 获取分区信息

调用seek方法时,参数需要指定分区信息。上面的partitionsFor方法是获取某个主题下所有的分区信息,而非该消费者实例所被分配的分区信息。

Set<TopicPartition> assignment();

assignment方法用于获取消费者所被分配的所有分区。如果分配还未发生或者正在重新分配过程中,则返回值可能为空。可以通过以下方式来确保获取到的分区信息不为空。

Set<TopicPartition> partitionSet = null;
while (partitionSet == null || partitionSet.isEmpty()) {
    consumer.poll(Duration.ofMillis(100));
    partitionSet = consumer.assignment();
}

2.3 再均衡

分区的所属权从一个消费者转移到另一个消费者的行为。当添加或删除消费组内的消费者时会发生再均衡。

再均衡发生期间,消费组内的消费者是无法读取的。当一个分区被重新分配给另一个消费者时,消费者当前的状态也会丢失。一般情况下,应避免不必要的再均衡的发生。

subscribe方法有个参数是ConsumerRebalanceListener类型,用于指定订阅后发生再均衡的监听器。

图 ConsumerRebalanceListener接口的UML

onPartitionsRevoked 会在再均衡开始之前和消费者停止读取消息之后被调用,参数表示再均衡前所分配的分区。

onPartitionsAssigned 会再重新分配分区之后和消费者开始读取消费之前被调用。参数表示再均衡后所分配到的分区。

可以通过onPartitionsRevoked 回掉执行手动提交消费位移,来尽量避免不必要的重复消费。

2.4 拦截器

图 ConsumerInterceptor 接口的UML

消费者客户端的interceptor.classes 的参数配置拦截器。

onConsume 会在poll方法返回之前调用,来对消息进行相应的定制化操作。

onCommit 会在提交完消费位移之后被调用。

2.5 多线程实现

KafkaProducer是线程安全的,而KafkaConsumer 不是。在KafkaConsumer 中除了wakeup()方法外的其他公用方法,在执行前都会调用acquire()方法,来检测当前是否只有一个线程在操作,否则抛出异常。

2.5.1 多个消费者实例线程

创建多个线程,每个线程创建一个KafkaConsumer实例,每个消费者分别消费不同的区,或者多个消费者同时消费同一个分区。

如果分区数量多,那么所需要创建的线程也比较多,每个消费线程都要维护一个独立的TCP连接,这样会带来不小的系统开销。

2.5.2 一个消费线程 + 消费业务线程池

一般而言,poll()拉取消息的速度是相当快,而整体消费的瓶颈在处理消息这一块。

创建一个消费线程来管理一个KafkaConsumer实例及拉取消息。然后将这些消息分发给业务线程池中的线程去处理。

2.6 重要的消费者参数

fetch.min.bytes

在一次拉取请求中(调用poll方法)中能从Kafka中拉取的最小数据量。默认值为1(B)。如果返回给Consumer的数据量小于其值,那它就需要等待,直到满足。

适当调大这个参数能提高一定的吞吐量,但也会造成额外的延迟。

fetch.max.wait.ms

指定Kafka的等待时间,默认值500ms。

max.poll.records

在一次拉取请求中拉取的最大消息数,默认值500

exclude.internal.

topics

Kafka有两个内部主题:__consumer_offsets和__transaction_state。参数配置内部主题是否可以向消费者公开,默认值true。

request.timeout.ms

Consumer等待请求响应的最长时间,默认值30000ms

metadata.max.age.

ms

元数据的过期时间,默认值300000ms。

isolation.level

配置消费者的事物隔离级别,表示消费者所消费到的位置。默认值read_uncommitted,可以消费到HW(High Watermark)处的位置,read_committed 会忽略事物未提交的消息,只能消费到LSO(LastStableOffset)的位置。 

表 重要的消费者参数


http://www.kler.cn/a/470715.html

相关文章:

  • 【Cesium】自定义材质,添加带有方向的滚动路线
  • WPF中RenderTargetBitmap问题解决
  • css中的部分文字特性
  • 【CPU】页表项和叶子表项(个人草稿)
  • 《Mcal》--MCU模块
  • JMeter + Grafana +InfluxDB性能监控 (二)
  • C++二十三种设计模式之桥接模式
  • 一文读懂51单片机的中断系统
  • 【React】如何高效使用条件渲染
  • 使用C++对SQLite3数据库进行增、删、改、查操作实例
  • 2024 华为开发者大会介绍(附大会PPT下载)
  • PyTorch 自动混合精度AMP Grad Scaler 源码解析:_unscale_grads_ 与 unscale_ 函数
  • 【C++数据结构——树】二叉树的性质(头歌实践教学平台习题)【合集】
  • Excel 技巧02 - 如何批量输入百分号 (★),如何输入百分号并指定小数位数,如何批量删除百分号,如何批量删除小数最后的0?
  • iOS - Tagged Pointer
  • 使用vue项目中,使用webpack模板和直接用vue.config来配置相关插件 区别是什么,具体有哪些提现呢
  • Oracle数据库如何找到 Top Hard Parsing SQL 语句?
  • 基于Django运维系统实现AWS Route 53管理
  • SDAE的介绍
  • 路由器的转发表
  • 【Python】论文长截图、页面分割、水印去除、整合PDF
  • openwrt 清缓存命令行
  • OSPF - 影响OSPF邻居建立的因素
  • [python3]Excel解析库-openpyxl
  • Python----Matplotlib数据可视化
  • Springboot应急安全学习平台k4u90