当前位置: 首页 > article >正文

Kafka 之消息并发消费

前言:

我们知道 Kafka 以其高性能著称,Kafka 的性能极其强大,那在 Java 应用中,我们怎么来利用其 Kafka 的高性能呢?

Kafka 系列文章传送门

Kafka 简介及核心概念讲解

Spring Boot 整合 Kafka 详解

Kafka @KafkaListener 注解的详解及使用

Kafka 客户端工具使用分享【offsetexplorer】

Kafka 之消息同步/异步发送

Kafka 之批量消息发送消费

Kafka 之消息广播消费

Kafka 之消息并发消费

我们使用 @KafkaListener 进行消息消费的时候,默认是单线程消费的,当生产者数据量很大的时候,单线程消费很容器导致消费堆积,浪费了 Kafka 的优异性能,我们在开发 Java 应用程序的时候,如果发现单线程效率低不能满足业务场景的时候,我们自然而然的想到了使用多线程来处理,同样 Kafka 是支持多线程消费的,而且 spring-cloud-starter-stream-kafka 已经帮我们实现好了,我们只需要按需配置即可。

我们在来看看 @KafkaListener 注解,该注解有一个属性叫 concurrency,该属性可以指定并发小消费的现成数量,我们可以进行如下配置:

@KafkaListener(id = "my-kafka-consumer",
            groupId = "my-kafka-consumer-groupId",
            topics = "my-topic",
            concurrency = "3",
            containerFactory = "myContainerFactory")

concurrency = “3”,表示启动三个线程来消费,接下来我们来演示一下 Kafka 的批量消费。

Kafka 并发消费之 Producer 代码

生产者代码还是没有什么特殊,我写了一个 for 循环来发送 100 条消息,如下:

package com.order.service.kafka.producer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * @ClassName: MyKafkaConcurrencyProducer 
 * @Author: Author
 * @Date: 2024/10/22 19:22
 * @Description:
 */
@Slf4j
@Component
public class MyKafkaConcurrencyProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void batchSendMessage() {
        for (int a = 0; a < 100; a++) {
            this.kafkaTemplate.send("my-topic", "第" + a + "条 kafka 消息");
        }
    }

}

Kafka 并发消费之消费者代码

并发消费我们还是使用 @KafkaListener 注解来完成消息消费监听,具体代码如下:

package com.order.service.kafka.consumer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * @ClassName: MyKafkaConcurrencyConsumer
 * @Author: zhangyong
 * @Date: 2024/10/22 19:22
 * @Description:
 */
@Slf4j
@Component
public class MyKafkaConcurrencyConsumer {

    @KafkaListener(id = "my-kafka-concurrency-consumer",
            groupId = "my-kafka-consumer-groupId",
            topics = "my-topic",
            concurrency = "3",
            containerFactory = "myContainerFactory")
    public void listen(String message) {
        log.info("消息消费成功,线程ID:{},消息内容:{}", Thread.currentThread().getId(), message);
    }

}

注意这里相交前面分享的案例,我们在注解的属性上加了 concurrency = “3”,正是这个属性才能放 Kafka 完成并发消费。

Kafka 并发消费之结果验证

我们触发消息发送,控制台部分日志如下:

2024-10-27 18:10:12.089  INFO 7308 --- [-consumer-2-C-1] c.o.s.k.c.MyKafkaConcurrencyConsumer     : 消息消费成功,线程ID:146,消息内容:第12条 kafka 消息
2024-10-27 18:10:12.097  INFO 7308 --- [-consumer-1-C-1] c.o.s.k.c.MyKafkaConcurrencyConsumer     : 消息消费成功,线程ID:144,消息内容:第46条 kafka 消息
2024-10-27 18:10:12.100  INFO 7308 --- [-consumer-2-C-1] c.o.s.k.c.MyKafkaConcurrencyConsumer     : 消息消费成功,线程ID:146,消息内容:第13条 kafka 消息
2024-10-27 18:10:12.123  INFO 7308 --- [-consumer-0-C-1] c.o.s.k.c.MyKafkaConcurrencyConsumer     : 消息消费成功,线程ID:142,消息内容:第68条 kafka 消息
2024-10-27 18:10:12.132  INFO 7308 --- [-consumer-1-C-1] c.o.s.k.c.MyKafkaConcurrencyConsumer     : 消息消费成功,线程ID:144,消息内容:第47条 kafka 消息

通过控制台日志可以看到,线程 ID 分别有 144、146、142,表示确实有多个线程在进行消息监听,结果符合预期。

Kafka 并发消费的时候指定多个个线程合适?

在进行 Kafka 消息并发消费的时候,并不是设定的线程数量越多,效率就越高,线程数的多少跟 Kafka 的 Partition 是有直接关系的,一个 Partition 只能被一个线程消费,因此我们最多将并发线程数量和 Kafka Topic 的 Partition 数量设置一致,或者不要超过 Kafka Topic 的 Partion 的数量,下面我们来分析一下线程数量设置不合理会有什么情况发生。

  • 并发线程数 concurrency > Partition ,会导致有多余的线程没有消息可以消费,会造成资源浪费。
  • 并发线程数 concurrency = Partition ,最佳状态,一个消费者线程消费一个 Partition 的数据。
  • 并发线程数 concurrency < Partition ,会有一个线程消费多个 Partition 的情况,可能会导致消费不均衡的情况出现。

一般没有特殊情况下,使用并发消费的时候,建议都将线程数和消费的 Kafka Topic 的 Partition 数量保持一致。

如有不正确的地方欢迎各位指出纠正。


http://www.kler.cn/a/382814.html

相关文章:

  • C++_STL_xx_番外01_关于STL的总结(常见容器的总结;关联式容器分类及特点;二叉树、二叉搜索树、AVL树(平衡二叉搜索树)、B树、红黑树)
  • 使用GPT-SoVITS训练语音模型
  • loading为什么不更新
  • 第07章 运算符的使用
  • openEuler的aarch64操作系统上安装k3s
  • Windows安装多个NodeJS版本
  • Linux权限解析:用户、组和权限的协同
  • 如何跑通 PHP(web)项目
  • DPDK高性能处理框架VPP
  • 力扣:225 用队列实现栈
  • 【JavaScript】V8,Nodejs 与浏览器
  • 【linux】的爱恨情仇
  • 机器学习—TensorFlow实现
  • C语言必做30道练习题
  • mysql-springboot netty-flink-kafka-spark(paimon)-minio
  • 笔记整理—linux驱动开发部分(8)framebuffer类设备
  • FlinkSql读取外部Mysql和HBase数据库的方法(scala)
  • AI笔筒操作说明及应用场景
  • Rust 图形界面开发——使用 GTK 创建跨平台 GUI
  • .NET 8 中 Entity Framework Core 的使用
  • 无人机无线电频谱侦测运行方式!
  • hf_transformers
  • 五、Java并发 Java Google Guava 实现
  • 解决CORS (跨源资源共享) 错误
  • RabbitMQ 实现消息队列负载均衡
  • 光纤定向耦合器