当前位置: 首页 > article >正文

kafka消息发送几种方式

同步发送 or 异步发送

       消息发送根据是否需要处理发送的结果分为同步发送、异步发送。

同步发送:等待发送结果返回,这种方式是可靠的,因为异常能及时处理,但同步发送需要阻塞等待一条消息发送完才处理下一条,吞吐量差。


异步发送:发送是异步的,不关心发送的结果,吞吐量最高,但可能存在发送失败的情况。

    本质上kafka 客户端提供的发送接口都是异步的,因为发送接口返回的是一个Future对象。对于同步发送通过future.get获取发送结果。异步发送则忽略send 返回值。

ListenableFuture<SendResult> future = kafkaTemplate.send(topic, content);
  try {
      SendResult sendResult = future.get();
  } catch (InterruptedException e) {
      e.printStackTrace();
  } catch (ExecutionException e) {
      e.printStackTrace();
  }

发送完成回调

有没有办法既要异步发送还要能处理发送失败的场景,这就是第三种,发送完成时,执行相应的回调方法。这是折中方案,兼顾效率且保证发送失败能被监控到。

producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {

if(e != null){
    System.out.println("send error ");
}else {
    System.out.println("send result  topic ="+recordMetadata.topic() + " partition=" + recordMetadata.partition() + "  offset=" + recordMetadata.offset() );
}

}
});

发送异常

       有些发送异常可以通过重试几次后解决,比如网络异常,对于有些异常比如消息太大超出kafka配置的最大消息字节数,这类异常重试也会失败,所以这类异常KafkaProducer 不会进行任何重试。对于可重试异常可以配置重试次数

spring.kafka.producer.retries=10

SpringBoot 集成简单介绍

     参考上篇文章SpringBoot 集成配置(pom依赖、application配置),简单讲解SpringBoot 几个重要自动装配类。

KafkaAutoConfiguration

KafkaAutoConfiguration给我们自动配置了几个类

KafkaTemplate:可以通过KafkaTemplate进行发送消息,本质上内部还是使用的KafkaProducer发送消息的。

ProducerFactory:KafkaProducer工厂,通过createProducer()方法可以获取(KafkaProducer) 进行发送消息,避免直接new KafkaProducer

使用方式也很简单,由于直接KafkaAutoConfiguration已经定义了相关Bean, 使用时注入Bean即可

图片

@Autowired
private KafkaTemplate kafkaTemplate;

@Autowired
private ProducerFactory producerFactory;

具体代码

同步发送、异步发送的方式直接使用 kafkaTemplate即可完成,同步发送结果处理:这里简单的打印出消息的topic partition offset 等信息如下图

ListenableFuture<SendResult> future = kafkaTemplate.send(topic, content);
SendResult sendResult = future.get();
RecordMetadata recordMetadata = sendResult.getRecordMetadata();
System.out.println("send result  topic ="+recordMetadata.topic() + " partition=" + recordMetadata.partition() + "  offset=" + recordMetadata.offset() );
 

                                   

图片

发送回调kafkaTemplate没有对应api , 需要通过Producer发送,我们通过producerFactory获取。

ProducerRecord record = new ProducerRecord(topic,content);
    Producer producer = producerFactory.createProducer();
    producer.send(record, new Callback() {
        @Override
        public void onCompletion(RecordMetadata recordMetadata, Exception e) {

            if(e != null){
                System.out.println("send error ");
            }else {
                System.out.println("send result  topic ="+recordMetadata.topic() + " partition=" + recordMetadata.partition() + "  offset=" + recordMetadata.offset() );
            }
        }
    });


http://www.kler.cn/news/310303.html

相关文章:

  • 系统架构设计师 数据库篇
  • superset 解决在 mac 电脑上发送 slack 通知的问题
  • 如何准备教师资格证科目三“学科知识与教学能力”的考试与面试?(理科导向:数学/物理)
  • 基于Springboot+vue的音乐网站
  • 深度学习速通系列:TextCNN介绍
  • Koa (下一代web框架) 【Node.js进阶】
  • 遥感图像目标检测数据集-DOTA数据集
  • SAP自动化-ME12批量更新某行价格
  • 分类评估指标:准确率、精确度、召回率、F1分数、Roc详解
  • 单片机(Microcontroller)原理及应用
  • git拉取大文件
  • Spring 源码解读:手动实现Spring的资源管理机制
  • 图像处理与OCR识别的实践经验(1)
  • React框架搭建,看这一篇就够了,看完你会感谢我
  • 思通数科开源产品:免费的AI视频监控卫士安装指南
  • 【C++】模拟实现vector
  • Aegisub字幕自动化及函数篇(图文教程附有gif动图展示)(一)
  • 如何在 CentOS 6 上使用 RVM 安装 Ruby on Rails
  • C语言自定义类型结构体(24)
  • JAVA基础面试题(第二十二篇)MYSQL---锁、分库分表!
  • Shell篇之编写MySQL启动脚本
  • 9.17日常记录
  • 深度学习-14-深入理解BERT的基本思想和如何训练BERT模型
  • Golang | Leetcode Golang题解之第416题分割等和子集
  • golang学习笔记28——golang中实现多态与面向对象
  • 微信getUserProfile不弹出授权框
  • 编写第一个hadoop3.3.6的mapreduce程序
  • 解决 npm ERR! node-sass 和 gyp ERR! node-gyp 报错问题
  • 第十一章 【后端】商品分类管理微服务(11.4)——spring-boot-devtools
  • play-with-docker使用指南