当前位置: 首页 > article >正文

kafka自定义配置信息踩坑

org.apache.kafka.common.config.ConfigException: Invalid value 0 for configuration acks: Expected value to be a string, but it was a java.lang.Integer

场景描述:
单个kafka使用springboot框架自带的 yml 配置完全OK(因为底层会帮我们处理好类型)
但是我用到了多个kafka,并且kafka的信息都是从远端获取的,那么我创建消费者和创建生产者都是动态的,但是当我使用其中一个生产者发送消息的时候爆出上述错误;

根本原因 acks的配置值希望是 string,但是我写的是integer

源码解释一下:
在这里插入图片描述
getstring
在这里插入图片描述
测试一下发现会报错:
在这里插入图片描述

修改后:

    /**
     * 构造生产者配置
     * @param ip
     * @param port
     * @return
     */
    private Map<String, Object> buildDefaultKafkaProducerMap(String ip, Integer port) {
        Map<String, Object> propertiesMap = new HashMap<>();
        propertiesMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, ip + ":" + port);
        propertiesMap.put(ProducerConfig.RETRIES_CONFIG, 0);
        propertiesMap.put(ProducerConfig.BATCH_SIZE_CONFIG, 20960);//单独批次发送数据量
        propertiesMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        propertiesMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        propertiesMap.put(ProducerConfig.ACKS_CONFIG, "0");
        propertiesMap.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864);//发送数据缓存
        propertiesMap.put(ProducerConfig.LINGER_MS_CONFIG, 10);//超过10ms未发送则强制发送
        return propertiesMap;
    }

    /**
     * 构造消费者配置
     * @param ip
     * @param port
     * @return
     */
    private Map<String, Object> buildDefaultKafkaConsumerMap(String ip, Integer port) {
        Map<String, Object> propertiesMap = new HashMap<>();
        propertiesMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ip + ":" + port);
        //earliest:如果消费者组没有已提交的偏移量(即新的消费者组),则从主题的最早消息开始消费
        //latest:如果消费者组没有已提交的偏移量,则从最新的消息开始消费(即从消费者启动之后生成的消息)
        propertiesMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, consumerAutoOffsetReset);
        //propertiesMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 20);//自动提交时间间隔
        propertiesMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);//不自动提交偏移量 手动提交
        propertiesMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        propertiesMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        propertiesMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 60000);//在时间内没有收到心跳,则该消费者会被剔除组
        propertiesMap.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
        propertiesMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);//单次拉取50条数据
        return propertiesMap;
    }

http://www.kler.cn/a/350972.html

相关文章:

  • Git 版本控制:.gitignore 文件完全指南
  • 网络安全技术深度解析与实践案例
  • 归子莫的科技周刊#2:白天搬砖,夜里读诗
  • 【网络 MAC 学习专栏 -- 如何理解 PHY 的 Link Up】
  • SpringCloud源码-Ribbon
  • APISQL在线一键安装教程
  • php中的错误和异常捕获
  • 主流网络设备的组网方式和配置命令
  • Midjourney中文版:开启AI绘画新纪元
  • Learning to Adapt to Light
  • 【Flutter】Dart:流程控制语句
  • shell案例之一键部署kafka
  • Triton矩阵乘
  • 数据分析:R语言计算XGBoost二分类模型的SHAP值
  • python基于大数据的电影市场预测分析
  • 什么是MoE?
  • electron 操作 cookie
  • 大数据与人工智能在金融风险控制中的应用
  • Ajax(web笔记)
  • 《京东金融APP的鸿蒙之旅系列专题》鸿蒙工程化:Hvigor构建技术
  • 考研日语 - 高频核心 2200 词(十)
  • 【从零开始的LeetCode-算法】3158.求出出现两次数字的 XOR 值
  • latex公式输入-矩阵
  • 《深度学习》OpenCV 风格迁移、DNN模块 案例解析及实现
  • MyBatis-Plus 记录
  • wireshark 解密浏览器https数据包