当前位置: 首页 > article >正文

kafka 消费者线程安全问题详细探讨

内容概要

图片

主要内容

常见错误案例

下面这段代码大概逻辑

  • 初始化时 实例化KafkaConsumer, 开启线程拉取消息并且处理

  • 资源释放回调 停止线程、调用kafkaConsumer.close进行资源释放

表面上没有问题,但实际上可能出现线程安全问题,因为poll 和 close 两个操作可能同时执行,因此存在线程安全问题, 如何修改,读者自己思考下。

    @PostConstruct
    public void consumer(){
        kafkaConsumer = new KafkaConsumer(getConfig());
        kafkaConsumer.subscribe(Arrays.asList("test_partition_num"));

        new Thread(new Runnable() {
            @Override
            public void run() {
                while(running){
                    ConsumerRecords<String,String> records = kafkaConsumer.poll(Duration.ofMillis(1000));
                    records.forEach(record->{
                        System.out.println(" partition =" + record.partition()  +" offset  = " + record.offset() + " value = " + record.value());
                    });
                }
            }
        }).start();
    }

    @PreDestroy
    public void close(){
        running = false;
        if(kafkaConsumer != null){
            kafkaConsumer.close();
        }
    }

消费者非线程安全代码解读

kafka生成者是线程安全的,但消费者是非线程安全的。KafkaConsumer

  • 相关操作前

    • 调用acquire()方法,校验线程安全问题,如果发现其他线程也在操作,则直接抛出异常。

  • 操作完成后

    • 调用release()清除痕迹

acquire()相对于加锁,release()相当于释放锁。

参看poll 方法实现,一目了然。

    private void acquire() {
        long threadId = Thread.currentThread().getId();
        if (threadId != this.currentThread.get() && !this.currentThread.compareAndSet(-1L, threadId)) {
            throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
        } else {
            this.refcount.incrementAndGet();
        }
    }
    
      private void release() {
        if (this.refcount.decrementAndGet() == 0) {
            this.currentThread.set(-1L);
        }
    }

图片

poll源码

如何实现消费者多线程消费消息呢

思路1

每次实例化一个 KafkaConsumer

这种方式实现简单,但每次都需要建立TCP 链接


思路2

相关操作方法 加上  synchronized,获取使用Lock 加锁保证线程安全

这种方式性能较差

思路3

拉取消息使用一个线程, 消息处理使用多线程

因为通常拉取消息比较快,消息处理比较耗时,由于消息处理不涉及KafkaConsumer 相关API 操作,因此不存在线程安全问题。这种方式建议消息位移设置自动提交,否则编程复杂度较高。

示例代码

ConsumerRecords<String,String> records = kafkaConsumer.poll(Duration.ofMillis(1000));

executorService.execute(()->{
    //处理消息
    records.forEach(record->{
        System.out.println(" partition =" + record.partition()  +" offset  = " + record.offset() + " value = " + record.value());
    });
});

旁敲侧击 举一反三

面试题回顾 Dubbo 线程模型

通常我们线程分为两类

  • IO 线程:负责网络通信的读写操作,接收和发送请求与响应。

  • 业务线程:处理具体的业务逻辑,避免因业务处理耗时过长而阻塞 IO 线程。


Dubbo 线程模型有几种你还记得否?该如何选择?

  • AllDispatcher:所有消息都派发到线程池,包括请求、响应、连接事件、断开事件、心跳等。

  • DirectDispatcher:所有消息都不派发到线程池,全部在 IO 线程上直接执行。

  • MessageOnlyDispatcher:只有请求和响应消息派发到线程池,其它连接断开、心跳等消息直接在 IO 线程上执行。

  • ExecutionDispatcher:只把请求消息派发到线程池,响应和其它连接、断开、心跳等消息直接在 IO 线程上执行。

其实选择的依据 业务处理的快慢,如果业务处理很快则建议让业务处理逻辑放到 IO线程中执行,这样避免线程上下文切换影响性能。反之则处理逻辑需要放到具体的业务线程中执行。

一般来说业务执行需要查询数据库,绝大数场景建议使用默认的 AllDispatcher 

是不是又和我一起温故知新了,加油吧 少年 !!!


http://www.kler.cn/a/326042.html

相关文章:

  • uniapp 实现tabbar分类导航及滚动联动效果
  • 腾讯云内容合规基于springboot架构设计
  • HDMI之SBTM
  • 问:说说SpringDAO及ORM的用法?
  • 谷粒商城のRedisESRabbit MQ集群
  • ChromeDriver 官方下载地址_测试自动化浏览器驱动
  • 记HttpURLConnection下载图片
  • 综合绩效考核系统源码,三级医院绩效管理系统源码,基于springboot、mybaits+avue技术开发,支持项目二开。
  • Stable Diffusion绘画 | SDXL模型的优缺点及模型推荐
  • golang学习笔记13-函数(二):init函数,匿名函数,闭包,defer
  • 第L2周:机器学习|线性回归模型 LinearRegression:2. 多元线性回归模型
  • Vulhub zico 2靶机详解
  • GS-SLAM论文阅读笔记--MM3DGS SLAM
  • A Learning-Based Approach to Static Program Slicing —— 论文笔记
  • 【Git原理与使用】分支管理
  • C++可见性
  • 关于武汉芯景科技有限公司的IIC电平转换芯片XJ9509开发指南(兼容PCa9509)
  • Matlab实现麻雀优化算法优化回声状态网络模型 (SSA-ESN)(附源码)
  • linux环境oracle11.2.0.4打补丁(p31537677_112040_Linux-x86-64.zip)
  • [M贪心] lc2207. 字符串中最多数目的子序列(模拟+贪心+一次遍历+代码细节+思维)
  • 无人机避障—— 激光雷达定高北醒TF03-UART(二)
  • 【基础算法总结】分治--快排+归并
  • YOLOv8改进,YOLOv8改进损失函数采用Powerful-IoU(2024年最新IOU),助力涨点
  • 【YOLOv10改进[SPPF]】使用 SPPFCSPC替换SPPF模块 + 含全部代码和详细修改方式
  • Linux内核 -- 读写文件系统文件之kernel_read与kernel_write
  • APISIX 联动雷池 WAF 实现 Web 安全防护