当前位置: 首页 > article >正文

Kafka生产者如何提高吞吐量?

1、batch.size:批次大小,默认16k

2、linger.ms:等待时间,修改为5-100ms

3、compression.type:压缩snappy

4、 RecordAccumulator:缓冲区大小,修改为64m

 测试代码:

package com.bigdata.producter;

import org.apache.kafka.clients.producer.*;

import java.util.Properties;

/**
 *   测试自定义分区器的使用
 */
public class CustomProducer07 {

    public static void main(String[] args) {

        // Properties 它是map的一种
        Properties properties = new Properties();
        // 设置连接kafka集群的ip和端口
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"bigdata01:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        /**
         *  此处是提高效率的代码
         */
        // batch.size:批次大小,默认 16K
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 18000);
        // linger.ms:等待时间,默认 0
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        // RecordAccumulator:缓冲区大小,默认 32M:buffer.memory
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        // compression.type:压缩,默认 none,可配置值 gzip、snappy、lz4 和 zstd
        properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");



        // 创建了一个消息生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
        // 调用这个里面的send方法
        // ctrl + p
        /*ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("first","告诉你个秘密");
        kafkaProducer.send(producerRecord);*/
        for (int i = 0; i < 5; i++) {
            // 发送消息的时候,指定key值,但是没有分区号,会根据 hash(key) % 3 = [0,1,2]
            ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("first","c","告诉你个找bigdata的好办法:"+i);
            // 回调-- 调用之前先商量好,回扣多少。
            kafkaProducer.send(producerRecord, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {

                    // 获取很多信息,exception == null 说明成功,不为null,说明失败
                    if(exception == null){
                        System.out.println("消息发送成功");
                        System.out.println(metadata.partition());// 三个分区,我什么每次都是2分区,粘性分区
                        System.out.println(metadata.offset());// 13 14 15 16 17
                        System.out.println(metadata.topic());
                    }else{
                        System.out.println("消息发送失败,失败原因:"+exception.getMessage());
                    }

                }
            });
        }

        kafkaProducer.close();
    }
}

测试:

①在 bigdata01 上开启 Kafka 消费者。

bin/kafka-console-consumer.sh --bootstrap-server hadoop11:9092 --topic first 

②在 IDEA 中执行代码,观察 hadoop102 控制台中是否接收到消息。


http://www.kler.cn/a/386921.html

相关文章:

  • CV 图像处理基础笔记大全(超全版哦~)!!!
  • 【Unity3D】利用Hinge Joint 2D组件制作绳索效果
  • 【tailscale 和 ssh】当服务器建立好节点,但通过客户端无法通过 ssh 连接
  • 联发科MTK6762/MT6762安卓核心板_4G智能模块应用
  • Redis系列之底层数据结构字典Dict
  • 初识go语言之指针用法
  • 使用 Redux 在 Flutter鸿蒙next 中实现状态管理
  • Excel:vba实现正则匹配
  • 【Linux】Ansible集中化运维工具(详解)安装、常用模块、playbook脚本
  • MQTT协议解析 : 物联网领域的最佳选择
  • 浏览器是如何渲染页面的? - 2024最新版前端秋招面试短期突击面试题
  • Git遇到“fatal: bad object refs/heads/master - 副本”问题的解决办法
  • 【Webpack配置全解析】打造你的专属构建流程️(1-4)
  • DBeaver工具连接Hive
  • 冒泡选择法(c基础)
  • 【.NET 8 实战--孢子记账--从单体到微服务】--简易权限--角色可访问接口管理
  • 探索 Python 的新边疆:sh 库的革命性功能
  • AWTK fscript 中的 JSON 扩展函数
  • Spark 的介绍与搭建:从理论到实践
  • Java命名规范
  • (2024.11.5)亚博树莓派5部署yolov8目标检测
  • Jmeter的安装,设置中文,解决乱码问题
  • A021基于Spring Boot的自习室管理和预约系统设计与实现
  • 前端实现数据下载为json文件
  • 【Lucene】什么是全文检索?解读结构化数据与非结构化数据
  • 从pg_depend和pg_class开始了解MogDB/openGauss/postgresql的系统元数据设计