当前位置: 首页 > article >正文

Java Kafka生产者实现


在这里插入图片描述
💝💝💝欢迎莅临我的博客,很高兴能够在这里和您见面!希望您在这里可以感受到一份轻松愉快的氛围,不仅可以获得有趣的内容和知识,也可以畅所欲言、分享您的想法和见解。
在这里插入图片描述

  • 推荐:「stormsha的主页」👈,「stormsha的知识库」👈持续学习,不断总结,共同进步,为了踏实,做好当下事儿~

  • 专栏导航

    • Python系列: Python面试题合集,剑指大厂
    • Git系列: Git操作技巧
    • GO系列: 记录博主学习GO语言的笔记,该笔记专栏尽量写的试用所有入门GO语言的初学者
    • 数据库系列: 详细总结了常用数据库 mysql 技术点,以及工作中遇到的 mysql 问题等
    • 运维系列: 总结好用的命令,高效开发
    • 算法与数据结构系列: 总结数据结构和算法,不同类型针对性训练,提升编程思维

    非常期待和您一起在这个小小的网络世界里共同探索、学习和成长。💝💝💝 ✨✨ 欢迎订阅本专栏 ✨✨

    💖The Start💖点点关注,收藏不迷路💖

    📒文章目录


下面是一个可以连接多个节点的Kafka生产者类,并且在其它文件中调用生产者发送消息的示例代码。代码包含了Kafka连接失败和发送消息失败的异常处理。

首先,确保你已经导入了Kafka的依赖。如果你使用的是Maven,可以在pom.xml文件中添加以下依赖:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.0.0</version>
</dependency>

接下来是Kafka生产者类的实现:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class KafkaProducerExample {
    private KafkaProducer<String, String> producer;

    public KafkaProducerExample(String bootstrapServers) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        
        try {
            producer = new KafkaProducer<>(props);
        } catch (Exception e) {
            System.err.println("Failed to create Kafka producer: " + e.getMessage());
            throw new RuntimeException(e);
        }
    }

    public void sendMessage(String topic, String key, String value) {
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
        try {
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null) {
                        System.err.println("Failed to send message: " + exception.getMessage());
                    } else {
                        System.out.println("Message sent successfully to topic " + metadata.topic() +
                                " partition " + metadata.partition() + " with offset " + metadata.offset());
                    }
                }
            });
        } catch (Exception e) {
            System.err.println("Failed to send message: " + e.getMessage());
        }
    }

    public void close() {
        producer.close();
    }
}

然后是在其它文件中调用生产者发送消息的示例代码:

public class KafkaProducerDemo {
    public static void main(String[] args) {
        String bootstrapServers = "localhost:9092,localhost:9093,localhost:9094";
        KafkaProducerExample producerExample = new KafkaProducerExample(bootstrapServers);
        
        try {
            producerExample.sendMessage("test-topic", "key1", "value1");
            producerExample.sendMessage("test-topic", "key2", "value2");
        } catch (Exception e) {
            System.err.println("Exception occurred while sending messages: " + e.getMessage());
        } finally {
            producerExample.close();
        }
    }
}

在上面的代码中,我们创建了一个KafkaProducerExample类,该类的构造函数接受一个包含多个节点的Kafka集群地址字符串。sendMessage方法用于发送消息,并处理可能的异常。如果Kafka连接失败,或者消息发送失败,都会打印错误信息。

KafkaProducerDemo类中,我们实例化了KafkaProducerExample,并调用了sendMessage方法发送消息,最后关闭了生产者实例。这样可以确保资源被正确释放。

你可以根据需要修改主题名、消息内容以及Kafka集群的地址。希望这些代码能帮助你实现功能。


🔥🔥🔥道阻且长,行则将至,让我们一起加油吧!🌙🌙🌙

💖The End💖点点关注,收藏不迷路💖

http://www.kler.cn/news/294275.html

相关文章:

  • 在线式环氧乙烷检测仪:现代工业生产中的环氧乙烷安全监测
  • 一文学会Flask-Login
  • 12. 如何在MyBatis中进行分页查询?常见的分页实现方式有哪些?
  • MySQL的服务器与客户端:架构解析与实践
  • 人工智能训练师边缘计算实训室解决方案
  • 当水泵遇上物联网:智能水务新时代的浪漫交响
  • spring boot项目中配置文件配置mapper*.xml文件路径无效的问题排查记录
  • Vatee万腾平台:科技赋能,开启企业数字化转型新篇章
  • 生日贺卡录放音芯片,多段音频录音ic生产厂商,NVF04M-32minute
  • Codeforces Round 971 (Div. 4) (A~G1)
  • 字节6面,面爆炸了
  • 智慧公厕技术应用、系统架构、应用功能有哪些?@卓振思众
  • C#中LINQ的Cast<T>与OfType<T>
  • DevOps学习笔记
  • 基于SpringBoot+Vue+MySQL的校园周边美食探索及分享平台
  • 初识jQuery
  • Android 15 新特性快速解读指南
  • 使用bert_base_chinese实现文本语义相似度计算
  • Spring Boot-自定义banner
  • 视频提取字幕的软件有哪些?高效转录用这些
  • react的useRef作用是什么怎么使用
  • Android Camera系列(一):SurfaceView+Camera
  • 数据结构,单向链表
  • 【2024高教社杯全国大学生数学建模竞赛】B题完整解析(含论文、代码分享)
  • 7个 C# 高阶用法详解:从基础到实战
  • 微信小程序实践案例
  • Kafka Broker处于高负载状态(例如消息处理量大或系统资源不足),无法及时响应消费者的请求
  • 智能工厂程序设计 之1 智能工厂都本俱的方面(Facet,Aspect和Respect)即智能依赖的基底Substrate 之1
  • 机械学习—零基础学习日志(概率论总笔记4)
  • JAVA基础:JVM中方法的执行过程和方法的重载,递归,可变参数的含义