当前位置: 首页 > article >正文

Broker: Unknown topic or partition 问题解决

./kafka-console-producer.sh --broker-list XXX:9092 --topic kafka_test < ttt.txt
报错:Broker: Unknown topic or partition
生产者端失败重试
配置重试参数:在Kafka生产者的配置中,可以设置retries和retry.backoff.ms参数来启用重试机制。
异步发送与回调:使用异步发送消息,并在回调中处理发送失败的情况。
异常处理:在回调中对异常进行分类处理,对于可恢复的错误进行重试,对于不可恢复的错误进行日志记录或报警。
幂等性:确保生产者发送消息的逻辑是幂等的,即使消息被重复发送也不会影响系统状态。
Properties props = new Properties();
props.put(“bootstrap.servers”, “localhost:9092”);
props.put(“key.serializer”, “org.apache.kafka.common.serialization.StringSerializer”);
props.put(“value.serializer”, “org.apache.kafka.common.serialization.StringSerializer”);
props.put(“retries”, 3); // 设置重试次数
props.put(“retry.backoff.ms”, 1000); // 设置重试间隔
props.put(“acks”, “all”); // 确保消息被所有副本确认

Producer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>(“topic”, “key”, “value”), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
// 处理发送失败的逻辑
// 可以选择重试或记录日志
}
}
});

消费者端失败重试
手动提交偏移量:设置enable.auto.commit为false,手动提交偏移量,以便在消息处理成功后才提交。
重试策略:使用SpringKafka的SeekToCurrentErrorHandler或自定义错误处理逻辑来实现重试。
死信队列:对于重试次数达到上限仍然失败的消息,发送到死信队列(DLQ)进行后续处理。
幂等性:确保消费者处理消息的逻辑是幂等的,即使消息被重复处理也不会影响系统状态。
错误处理:在处理消息时,对可能抛出的异常进行捕获和处理,根据异常类型决定是否重试。
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setErrorHandler(new SeekToCurrentErrorHandler(
new DeadLetterPublishingRecoverer(kafkaTemplate()),
new FixedBackOff(3000, 3)
));
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
return factory;
}

@KafkaListener(topics = “topic”, containerFactory = “kafkaListenerContainerFactory”)
public void listen(ConsumerRecord<String, String> record) {
try {
// 处理消息
} catch (Exception e) {
// 处理失败逻辑,消息将被重新消费
}
}


http://www.kler.cn/a/545308.html

相关文章:

  • 爬虫代码中如何设置请求间隔?
  • Android 原生层SurfaceView截屏
  • PL/SQL 变量以及数据类型(上篇)
  • 【React】react-redux+redux-toolkit实现状态管理
  • 基于WebAssembly的后端服务突破:打造高性能、安全的新型微服务架构
  • 27、深度学习-自学之路-NLP自然语言处理-做一个简单的项目识别一组电影评论,来判断电影评论是积极的,还是消极的。
  • Golang 进阶训练营
  • Rocky Linux系统修改网卡全攻略
  • 【清晰教程】本地部署DeepSeek-r1模型
  • C语言插入排序之直接插入排序
  • 【网络安全 | 漏洞挖掘】价值3133美元的Google IDOR
  • VS Code 通知中一直显示“Reactivating terminals...”的问题解决
  • 解决 paddle ocr 遇到 CXXABI_1.3.13 not found 的问题
  • JAVA DDD设计模式:用策略模式干掉满屏if-else
  • 吉祥汽车泰国首发,用 Unity 实现行业首创全 3D 座舱虚拟世界
  • 【深度学习】计算机视觉(CV)-目标检测-SSD(Single Shot MultiBox Detector)—— 单次检测多框检测器
  • 了解卷积神经网络(Convolutional Neural Network,CNN)
  • React组件重新渲染机制
  • App UI自动化--Appium学习--第二篇
  • 【Elasticsearch】标准化器(Normalizers)