kafka 消费者线程安全问题详细探讨
内容概要
主要内容
常见错误案例
下面这段代码大概逻辑
-
初始化时
实例化KafkaConsumer, 开启线程拉取消息并且处理 -
资源释放回调
停止线程、调用kafkaConsumer.close进行资源释放
表面上没有问题,但实际上可能出现线程安全问题,因为poll 和 close 两个操作可能同时执行
,因此存在线程安全问题, 如何修改,读者自己思考下。
@PostConstruct
public void consumer(){
kafkaConsumer = new KafkaConsumer(getConfig());
kafkaConsumer.subscribe(Arrays.asList("test_partition_num"));
new Thread(new Runnable() {
@Override
public void run() {
while(running){
ConsumerRecords<String,String> records = kafkaConsumer.poll(Duration.ofMillis(1000));
records.forEach(record->{
System.out.println(" partition =" + record.partition() +" offset = " + record.offset() + " value = " + record.value());
});
}
}
}).start();
}
@PreDestroy
public void close(){
running = false;
if(kafkaConsumer != null){
kafkaConsumer.close();
}
}
消费者非线程安全代码解读
kafka生成者是线程安全的,但消费者是非线程安全的。KafkaConsumer
-
相关操作前
-
调用acquire()方法,校验线程安全问题,如果发现其他线程也在操作,则直接抛出异常。
-
-
操作完成后
-
调用release()清除痕迹
-
acquire()相对于加锁,release()相当于释放锁。
参看poll 方法实现,一目了然。
private void acquire() {
long threadId = Thread.currentThread().getId();
if (threadId != this.currentThread.get() && !this.currentThread.compareAndSet(-1L, threadId)) {
throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
} else {
this.refcount.incrementAndGet();
}
}
private void release() {
if (this.refcount.decrementAndGet() == 0) {
this.currentThread.set(-1L);
}
}
poll源码
如何实现消费者多线程消费消息呢
思路1
每次实例化一个 KafkaConsumer
这种方式实现简单,但每次都需要建立TCP 链接
思路2
相关操作方法 加上 synchronized,获取使用Lock 加锁保证线程安全
这种方式性能较差
思路3
拉取消息使用一个线程, 消息处理使用多线程
因为通常拉取消息比较快,消息处理比较耗时,由于消息处理不涉及KafkaConsumer 相关API 操作,因此不存在线程安全问题。这种方式建议消息位移设置自动提交,否则编程复杂度较高。
示例代码
ConsumerRecords<String,String> records = kafkaConsumer.poll(Duration.ofMillis(1000));
executorService.execute(()->{
//处理消息
records.forEach(record->{
System.out.println(" partition =" + record.partition() +" offset = " + record.offset() + " value = " + record.value());
});
});
旁敲侧击 举一反三
面试题回顾 Dubbo 线程模型
通常我们线程分为两类
-
IO 线程:负责网络通信的读写操作,接收和发送请求与响应。
-
业务线程:处理具体的业务逻辑,避免因业务处理耗时过长而阻塞 IO 线程。
Dubbo 线程模型有几种你还记得否?该如何选择?
-
AllDispatcher:所有消息都派发到线程池,包括请求、响应、连接事件、断开事件、心跳等。
-
DirectDispatcher:所有消息都不派发到线程池,全部在 IO 线程上直接执行。
-
MessageOnlyDispatcher:只有请求和响应消息派发到线程池,其它连接断开、心跳等消息直接在 IO 线程上执行。
-
ExecutionDispatcher:只把请求消息派发到线程池,响应和其它连接、断开、心跳等消息直接在 IO 线程上执行。
其实选择的依据 业务处理的快慢
,如果业务处理很快则建议让业务处理逻辑放到 IO线程中执行,这样避免线程上下文切换影响性能。反之则处理逻辑需要放到具体的业务线程中执行。
一般来说业务执行需要查询数据库,绝大数场景建议使用默认的 AllDispatcher
是不是又和我一起温故知新了,加油吧 少年 !!!