当前位置: 首页 > article >正文

Kafka java 配置

前言:
        大家好,大家在springboot项目中,经常采用 @KafkaListener 做为消费者。这个是spring为我们封装的。 但是某些情况 注解的方式并不能满足需求。这个时候就需要手动版本了。

介绍:

        我们已经集成spring-Kafka 就不需要再额外引入kafka-clients的依赖了。直接亮代码。

给大家解释配置含义。

1.Kafka配置代码

public KafkaConsumer<String, String> getCustomer() {
    // 1. 配置属性参数
    Properties properties = new Properties();

    // 设置Kafka集群的地址和端口,消费者将连接到这个地址和端口
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

    // 设置键(Key)的反序列化器为StringDeserializer,用于将字节数据转换为String类型
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    // 设置值(Value)的反序列化器为StringDeserializer,用于将字节数据转换为String类型
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

    // 设置消费者所属的消费者组,消费者组内的消费者将共同消费同一个Topic的消息
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
    // 设置消费者与Kafka集群之间的会话超时时间(单位:毫秒)
    properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
    // 设置消费者是否自动提交offset,true表示自动提交
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
    // 设置自动提交offset的时间间隔(单位:毫秒)
    properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);
    // 设置每次poll操作返回的最大记录数
    properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,1);

    // 根据配置属性创建Kafka消费者实例
    return new KafkaConsumer<>(properties);
}

2.Kafka消费者代码

@Test
void KafkaConsumerTest() {
    // 创建Kafka消费者实例,通过getCustomer()方法获取
    KafkaConsumer<String, String> consumer = kafkaCustomer.getCustomer();

    // 订阅要消费的主题,这里是 "test-topic"
    consumer.subscribe(Collections.singletonList("test-topic"));

    // 从Kafka服务器拉取消息,poll等待的最长时间设置为10秒(10000000毫秒)
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000000));
    for (ConsumerRecord<String, String> record : records) {
        // 处理消息的逻辑
        // 打印消息的offset、key和value
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());

        //以下代码是我的场景,本人需要在某些情况跳转,而编写单元测试做试验的。
        boolean flag = true;
        if (flag){
            // 如果flag为true,则不自动提交offset,可以在这里添加业务逻辑处理消息
            // 如果需要手动提交offset,可以取消注释下面的代码
            // consumer.commitAsync();
            // 由于flag为true,这里会跳出循环,不再处理后续的消息
            break;
        }
    }
    // 关闭消费者,释放资源
    consumer.close();
    // 打印结束消费的日志
    System.out.println("结束消费");
}


http://www.kler.cn/a/388662.html

相关文章:

  • 后端接口返回二进制文件,前端 window.opent预览展示
  • Dial-insight:利用高质量特定领域数据微调大型语言模型防止灾难性遗忘
  • 重学SpringBoot3-整合 Elasticsearch 8.x (三)使用Repository
  • 机器学习———特征工程
  • 【韩老师零基础30天学会Java 】06章 数组、排序和查找
  • Springboot应用的端口配置方法解析与优先级详解
  • Transformer-GRU、Transformer、CNN-GRU、GRU、CNN五模型多变量回归预测
  • Python的函数(补充浅拷贝和深拷贝)
  • 测试开发面试题记录
  • 拿下阿里云之后如何在本地运行镜像进行分析
  • 二维前缀和 子矩阵的和
  • 「iOS」——知乎日报第三周总结
  • 云财务SaaS财务软件源码
  • 深入理解智能合约 ABI
  • ORU 的 Open RAN 管理平面 (M 平面)
  • 词嵌入方法(Word Embedding)
  • Openstack nova创建一台实例的过程概述
  • Axios 的 responseType 属性详解及 Blob 与 ArrayBuffer 解析
  • Vue 3 中Pinia状态管理库的使用方法总结
  • 数据仓库之 Atlas 血缘分析:揭示数据流奥秘
  • 2024软件测试面试热点问题
  • 【51单片机4位数码管左右移位显示0-9不用数组】2022-4-19
  • 【ETL:概念、流程与应用】
  • Stable Diffusion Web UI - ControlNet 边缘特征提取 CANNY
  • Linux grep 使用正则表达式说明
  • SpringBoot中的注解详解(一)