当前位置: 首页 > article >正文

【大数据学习 | kafka】producer的参数与结构

1. producer的结构

producer:生产者

它由三个部分组成

interceptor:拦截器能拦截到数据,处理完毕以后发送给下游,它和过滤器不同并不是丢弃数据,而是将数据处理完毕再次发送出去,这个默认是不存在的

serialiazer:序列化器kafka中存储的数据是二进制的,所以数据必须经过序列化器进行处理,这个是必须要有的,将用户的数据转换为byte[]的工具类,其中k和v要分别指定

partitioner: 分区器主要是控制发送的数据到topic的哪个分区中,这个默认也是存在的

record accumulator

本地缓冲累加器 默认32M

producer的数据不能直接发送到kafka集群中,因为producer和kafka集群并不在一起,远程发送的数据不是一次发送一条这样太影响发送的速度和性能,所以我们发送都是攒一批数据发一次,record accumulator就是一个本地缓冲区,producer将发送的数据放入到缓冲区中,另外一个线程会去拉取其中的数据,远程发送给kafka集群,这个异步线程会根据linger.msbatch-size进行拉取数据。如果本地累加器中的数据达到batch-size或者是linger.ms的大小阈值就会拉取数据到kafka集群中,这个本地缓冲区不仅仅可以适配两端的效率,还可以批次形式执行任务,增加效率

batch-size 默认16KB

linger.ms 默认为0

生产者部分的整体流程

首先producer将发送的数据准备好

经过interceptor的拦截器进行处理,如果有的话

然后经过序列化器进行转换为相应的byte[]

经过partitioner分区器分类在本地的record accumulator中缓冲

sender线程会自动根据linger.ms和batch-size双指标进行管控,复制数据到kafka

2. producer的简单代码

2.1 准备:

引入maven依赖:

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>3.3.2</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.30</version>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
    </dependency>
</dependencies>

在resources文件中创建log4j.properties

log4j.rootLogger=info,console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c %M(): %m%n

2.2 生产者中的设定参数

参数含义
bootstrap.serverskafka集群的地址
key.serializerkey的序列化器,这个序列化器必须和key的类型匹配
value.serializervalue的序列化器,这个序列化器必须和value的类型匹配
batch.size批次拉取大小默认是16KB
linger.ms拉取的间隔时间默认为0,没有延迟
partitioner分区器存在默认值
interceptor拦截器选的

2.3 全部代码

public class producer_test {
    public static void main(String[] args) {
        Properties pro = new Properties();
        pro.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop106:9092");
        //设定集群地址
        pro.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        pro.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        //设定两个序列化器,其中StringSerializer是系统自带的序列化器,要和数据的类型完全一致
        pro.put(ProducerConfig.BATCH_SIZE_CONFIG, 16*1024);
        //batch-size默认是16KB,参数的单位是byte
        pro.put(ProducerConfig.LINGER_MS_CONFIG, 0);
        //默认等待批次时长是0
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(pro);
        ProducerRecord<String, String> record = new ProducerRecord<>("topic_a", "this is hainiu");
        //发送数据的时候有kv两个部分,但是一般k我们什么都不放,只放value的值
        producer.send(record);
        producer.close();
    }
}

在x-shell中观察消费的数据


http://www.kler.cn/a/376444.html

相关文章:

  • LeetCode100之找到字符串中所有字母异位词(438)--Java
  • Spring AI : 让ChatGPT成为你构建应用的核心亮点
  • 【flink】之kafka到kafka
  • vscode 创建 vue 项目时,配置文件为什么收缩到一起展示了?
  • 实现一个支持多线程的Web服务器,能够处理多个客户端请求
  • 刘艳兵-DBA016-在您的数据库中,SALES表存在于SH用户中,并且启用了统一审计。作为DBA,您成功执行了以下指令:
  • NeurIPS - Ariel Data Challenge 2024
  • Maven讲解从基础到高级配置与实践
  • 恋爱脑学Rust之Box与RC的对比
  • ChatGPT新体验:AI搜索功能与订阅支付指南
  • layui 自定义验证单选框必填
  • CSS3新增边框属性(五)
  • Java基于微信小程序的私家车位共享系统(附源码,文档)
  • Vim的简单使用
  • 到底要不要用SAP Screen Personas
  • Vue中监听属性watch的求值,以及与computed的适用场景
  • 【开源免费】基于SpringBoot+Vue.J影城管理系统(JAVA毕业设计)
  • HuggingFace情感分析任务微调
  • Prompt Engineering (Prompt工程)
  • 产品定义和独开分类
  • Qt | windows视频播放器小项目
  • 基于SpringBoot的物品分类识别管理系统uniapp源码带文档教程
  • 行业深耕+全球拓展双轮驱动,用友U9 cloud加速中国制造全球布局
  • Leetcode 62. 不同路径 动态规划+空间优化
  • opengl学习-2vao和vbo(通义千问的例子)
  • macOS Sequoia 15.1 (24B83) 正式版 ISO、IPSW、PKG 下载