当前位置: 首页 > article >正文

kafka单条消息过大发送失败

一、背景

生产环境中使用kafka作为消息队列,生产者发送消息失败,查询报错日志,得到如下输出:

Caused by: org.springframework.kafka.KafkaException: Send failed; nested exception is org.apache.kafka.common.errors.RecordTooLargeException: The message is 4067035 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.

这个错误信息表明生产者尝试发送到Kafka的消息在序列化后大小为4067035字节(约4MB),超过了Kafka配置中的max.request.size参数的限制,该参数当前设置为1048576字节(1MB)

二、原因

排查kafka的max.request.size

排查发现最大的请求大小为10485760字节(10M)

排查springboot配置文件中max.request.size的值

项目中没有配置这个值

排查springboot默认kafka的max.request.size值

springboot中Kafka消息队列的默认max.request.size值通常是1048576字节,即1MB

好了就是这个原因了

三、修改方案

要解决这个问题,通常有以下几种选择:

1. 增加kafka本身的message.max.bytes配置

message.max.bytes=10485760 # 10MB

当然也可以为特定的主题设置这个值:

kafka-topics.sh --alter --zookeeper localhost:2181 --topic your_topic --config max.message.bytes=10485760 

2. 增加生产者max.request.size配置

如果是代码则配置如下

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("max.request.size", 10485760 ); // 10MB

如果是yml 文件则配置如下 

spring:
  kafka:
    producer:
      properties:
        max.request.size: 10485760 

3. 增加消费者的fetch.max.bytesmax.partition.fetch.bytes配置

如果是代码则配置如下

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("fetch.max.bytes", 10485760); // 10MB
props.put("max.partition.fetch.bytes", 10485760); // 10MB

如果是yml 文件则配置如下

spring:
  kafka:
    consumer:
      properties:
        fetch.max.bytes: 10485760
        max.partition.fetch.bytes: 10485760

4. 分割消息

如果消息非常大,可能需要考虑将其分割成较小的部分,然后在消费者端重新组装。这种方法可以避免配置过大的消息大小限制。

5.生产者压缩消息

你可以启用消息压缩来减少消息的大小。Kafka支持多种压缩算法,如gzip、snappy、lz4等。我们可以在生产者配置中启用压缩:

 如果是代码则配置如下:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("compression.type", "gzip");

如果是yml 文件则配置如下

spring:
  kafka:
    producer:
      properties:
        compression.type: gzip


http://www.kler.cn/news/288457.html

相关文章:

  • CTF Show逆向4reserve wp--mingyue
  • 滑动窗口系列(不定长滑动窗口长度) 9/2
  • 09--kubernetes持久化存储和StatefulSet控制器
  • Ubuntu最新镜像下载,国内镜像源地址大全
  • RocketMQ集群搭建,及RocketMQ-Dashboard部署(前RocketMQ-Console)
  • vscode远程连接服务器并根据项目配置setting.json
  • 四、基本电路设计笔记——4.1 DC-DC稳压电路
  • 【Python123题库】#研究生录取数据分析A #图书数据分析(A)
  • 【算法每日一练及解题思路】判断数字是否为偶数
  • Vue实现步骤条(el-step)+Popover弹出框
  • Oracle 网络安全产品安全认证检索
  • 编程如何塑造我们的世界
  • 安宝特科技 | AR眼镜在安保与安防领域的创新应用及前景
  • 项目管理时间痛点解决百宝箱
  • 2025最新剧本杀服务平台构建攻略,Java SpringBoot+Vue,打造沉浸式用户体验!
  • 【Kubernetes部署篇】二进制搭建K8s高可用集群1.26.15版本(超详细,可跟做)
  • VMware命令
  • python基础语法(二)
  • 微软分享其首款定制人工智能芯片Maia 100的更多细节
  • ssh的小绝招,一般人我不告诉他!ssh免密登陆和第三方踏板登陆内网
  • 【负载均衡】LoadBalance场景演示
  • kafka快速上手
  • React 服务器组件
  • 智能汽车座椅制造:RFID技术助力精密加工与全程追踪
  • Getting an error trying to import environment OpenAI Gym
  • mongodb 时间存储使用Date还是时间戳
  • 【Python机器学习】NLP词频背后的含义——主成分分析
  • 使⽤docker部署project-exam-system(2)
  • [翻译+笔记] 用于视频生成的Diffusion Model
  • codesys进行控制虚拟轴运动时出现的一些奇怪bug的解释