当前位置: 首页 > article >正文

Kafka-Java四:Spring配置Kafka消费者提交Offset的策略

一、Kafka消费者提交Offset的策略

Kafka消费者提交Offset的策略有

  1. 自动提交Offset:
    1. 消费者将消息拉取下来以后未被消费者消费前,直接自动提交offset。
    2. 自动提交可能丢失数据,比如消息在被消费者消费前已经提交了offset,有可能消息拉取下来以后,消费者挂了
  2. 手动提交Offset
    1. 消费者在消费消息时/后,再提交offset,在消费者中实现
    2. 手动提交Offset分为:手动同步提交、手动异步提交
  3. 什么是Offset
    1. 参考文章:Linux:【Kafka三】组件介绍

二、自动提交策略

        Kafka消费者默认是自动提交Offset的策略

        可设置自动提交的时间间隔

package com.demo.lxb.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

/**
 * @Description: kafka消费者消费消息,自动提交offset
 * @Author: lvxiaobu
 * @Date: 2023-10-24 16:26
 **/
public class MyConsumerAutoSubmitOffset {

    private  final static String CONSUMER_GROUP_NAME = "GROUP1";
    private  final static String TOPIC_NAME = "topic0921";

    public static void main(String[] args) {
        Properties props = new Properties();

        // 一、设置参数
        // 配置kafka地址
//        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
//                "192.168.151.28:9092"); // 单机配置
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.154.128:9092,192.168.154.128:9093,192.168.154.128:9094"); // 集群配置
        // 配置消息 键值的序列化规则
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
        // 配置消费者组
        props.put(ConsumerConfig.GROUP_ID_CONFIG,CONSUMER_GROUP_NAME);

        // 设置消费者offset的提交方式
        // 自动提交:默认配置
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");
        // 自动提交offset的时间间隔
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");

        // 二、创建消费者
        KafkaConsumer<String,String> consumer = new KafkaConsumer<String,String>(props);
        // 三、消费者订阅主题
        consumer.subscribe(Arrays.asList(TOPIC_NAME));

        // 四、拉取消息,开始消费
        while (true){
            // 从kafka集群中拉取消息
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            // 消费消息,当前是自动提交模式,在消息上一行消息被拉取下来以后,offset就自动被提交了,下面的代码如果出错,或者此时
            // 消费者挂掉了,那么消费其实是没有进行消费的(也就是业务逻辑处理)
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("接收到的消息: 分区: " + record.partition() + ", offset: " + record.offset()
                + ", key值: " + record.key() + " , value值: "+record.value());
            }
        }
    }
}

上述代码中的如下代码是自动提交策略的相关设置 

        // 设置消费者offset的提交方式
        // 自动提交:默认配置
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");
        // 自动提交offset的时间间隔
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");

三、手动提交策略

3.1、手动同步提交策略

        手动同步提交,会在提交offset处阻塞。当消费者接收到 kafka集群返回的消费者提交offset成功的ack后,才开始执行消费者中后续的代码。

        因为使用异步提交容易丢失消息,固一般使用同步提交,在同步提交后不要再做其他逻辑处理。

package com.demo.lxb.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

/**
 * @Description: kafka消费者消费消息,手动同步提交offset
 * @Author: lvxiaobu
 * @Date: 2023-10-24 16:26
 **/
public class MyConsumerMauSubmitOffset {

    private  final static String CONSUMER_GROUP_NAME = "GROUP1";
    private  final static String TOPIC_NAME = "topic0921";

    public static void main(String[] args) {
        Properties props = new Properties();

        // 一、设置参数
        // 配置kafka地址
//        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
//                "192.168.151.28:9092"); // 单机配置
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.154.128:9092,192.168.154.128:9093,192.168.154.128:9094"); // 集群配置
        // 配置消息 键值的序列化规则
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
        // 配置消费者组
        props.put(ConsumerConfig.GROUP_ID_CONFIG,CONSUMER_GROUP_NAME);

        // 设置消费者offset的提交方式
        // 手动提交offset
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
        // 自动提交offset的时间间隔:此时不再需要设置该值
//        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");

        // 二、创建消费者
        KafkaConsumer<String,String> consumer = new KafkaConsumer<String,String>(props);
        // 三、消费者订阅主题
        consumer.subscribe(Arrays.asList(TOPIC_NAME));

        // 四、拉取消息,开始消费
        while (true){
            // 从kafka集群中拉取消息
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            //  业务逻辑处理
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("接收到的消息: 分区: " + record.partition() + ", offset: " + record.offset()
                + ", key值: " + record.key() + " , value值: "+record.value());
            }

            // 当for循环业务逻辑处理结束以后,再手动提交offset
            // 同步方式提交,此时会产生阻塞,当kafka集群返回了提交成功的ack以后,才会消除阻塞,进行后续的代码逻辑。
            // 一般使用同步提交,在同步提交后不再做其他逻辑处理
            consumer.commitAsync();


            // do anything


        }
    }
}

3.2、手动异步提交策略

        异步提交,不会在提交offset代码处阻塞,即消费者提交了offset后,不需要等待kafka集群返回的ack即可继续执行后续代码。但是在提交offset时需要提供一个回调方法,供kafka集群回调,来告诉消费者提交offset的结果。

package com.demo.lxb.kafka;

import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringSerializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;

/**
 * @Description: kafka消费者消费消息,手动异步提交offset
 * @Author: lvxiaobu
 * @Date: 2023-10-24 16:26
 **/
public class MyConsumerMauSubmitOffset2 {

    private  final static String CONSUMER_GROUP_NAME = "GROUP1";
    private  final static String TOPIC_NAME = "topic0921";

    public static void main(String[] args) {
        Properties props = new Properties();

        // 一、设置参数
        // 配置kafka地址
//        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
//                "192.168.151.28:9092"); // 单机配置
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.154.128:9092,192.168.154.128:9093,192.168.154.128:9094"); // 集群配置
        // 配置消息 键值的序列化规则
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
        // 配置消费者组
        props.put(ConsumerConfig.GROUP_ID_CONFIG,CONSUMER_GROUP_NAME);

        // 设置消费者offset的提交方式
        // 手动提交offset
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
        // 自动提交offset的时间间隔:此时不再需要设置该值
//        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");

        // 二、创建消费者
        KafkaConsumer<String,String> consumer = new KafkaConsumer<String,String>(props);
        // 三、消费者订阅主题
        consumer.subscribe(Arrays.asList(TOPIC_NAME));

        // 四、拉取消息,开始消费
        while (true){
            // 从kafka集群中拉取消息
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
         
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("接收到的消息: 分区: " + record.partition() + ", offset: " + record.offset()
                + ", key值: " + record.key() + " , value值: "+record.value());
            }
            // 异步提交,不影响后续的内容。
            // new OffsetCommitCallback是kafka集群会回调的方法,告诉消费者提交offset的结果
            consumer.commitAsync(new OffsetCommitCallback() {
                @Override
                public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
                    if(e != null){
                        // 可将提交失败的消息记录到日志
                        System.out.println("记录提交offset失败的消息到日志");
                        System.out.println("消费者提交offset抛出异常:" + Arrays.toString(e.getStackTrace()));
                        System.out.println("消费者提交offset异常的消息信息:" + JSONObject.toJSONString(map));
                    }
                }
            });

            // 后续逻辑处理,不需要等到kafka集群返回了提交成功的ack以后才开始处理。
            //do anything


        }
    }
}


http://www.kler.cn/a/107616.html

相关文章:

  • 【大模型】Langchain-Chatchat-v0.3.1 的环境配置
  • Cesium加载地形
  • MapReduce完整工作流程
  • 大疆C++开发面试题及参考答案
  • nvim 打造成可用的IDE(2)
  • 数据在内存的存储
  • vue如何使用路由拦截器
  • 数据结构 C语言 2.1 线性表抽象数据类型 2.2 小议顺序表
  • Tp框架如何使用事务和锁,还有查询缓存
  • Linux UWB Stack实现——FiRa会话状态机
  • jmeter疑难杂症
  • 数据库数据恢复—Oracle数据库报错ORA-01110错误的数据恢复案例
  • Hive 常用DML操作
  • 前端移动web高级详细解析二
  • 安装虚拟机(VMware)保姆级教程及配置虚拟网络编辑器和安装WindowsServer以及宿主机访问虚拟机和配置服务器环境
  • 实体店做商城小程序如何
  • 模数转换器-ADC基础
  • 深入探究深度学习、神经网络与卷积神经网络以及它们在多个领域中的应用
  • Android-宝宝相册(第四次作业)
  • 【计算机网络】(谢希仁第八版)第一章课后习题答案
  • 软考 系统架构设计师系列知识点之设计模式(9)
  • ES6之Set集合(通俗易懂,含实践)
  • 外卖霸王餐系统 支持小程序,分站合作
  • 关于pycharm中句号变成点的问题
  • Redis 与 MySQL 一致性 实现方案
  • RSA:基于小加密指数的攻击方式与思维技巧