当前位置: 首页 > article >正文

在 Java 中实现 Kafka Producer 的单例模式


在这里插入图片描述
💝💝💝欢迎莅临我的博客,很高兴能够在这里和您见面!希望您在这里可以感受到一份轻松愉快的氛围,不仅可以获得有趣的内容和知识,也可以畅所欲言、分享您的想法和见解。
在这里插入图片描述

  • 推荐:「stormsha的主页」👈,「stormsha的知识库」👈持续学习,不断总结,共同进步,为了踏实,做好当下事儿~

  • 专栏导航

    • Python系列: Python面试题合集,剑指大厂
    • Git系列: Git操作技巧
    • GO系列: 记录博主学习GO语言的笔记,该笔记专栏尽量写的试用所有入门GO语言的初学者
    • 数据库系列: 详细总结了常用数据库 mysql 技术点,以及工作中遇到的 mysql 问题等
    • 运维系列: 总结好用的命令,高效开发
    • 算法与数据结构系列: 总结数据结构和算法,不同类型针对性训练,提升编程思维

    非常期待和您一起在这个小小的网络世界里共同探索、学习和成长。💝💝💝 ✨✨ 欢迎订阅本专栏 ✨✨

    💖The Start💖点点关注,收藏不迷路💖

    📒文章目录

      • 一、前言
      • 二、Kafka Producer 的基本配置
      • 三、实现 Kafka Producer 的单例模式
      • 四、使用 Kafka Producer 发送消息
      • 五、总结


一、前言

在分布式系统中,Apache Kafka 是一个非常受欢迎的消息中间件。它提供了高吞吐量、低延迟的消息传递机制,非常适合处理实时数据流。本文将介绍如何在 Java 中使用 Kafka Producer 并实现单例模式,以确保资源的有效管理。

Kafka 是一个分布式流处理平台,它的核心功能包括发布和订阅记录流、存储流记录、以及处理流记录。为了充分利用 Kafka 的功能,一个高效的 Kafka 生产者(Producer)是必要的。在生产环境中,使用单例模式可以确保 Kafka Producer 资源的唯一性和线程安全性。

二、Kafka Producer 的基本配置

在开始之前,我们需要引入一些必要的依赖。假设我们使用 Maven 项目,pom.xml 文件中需要添加以下依赖:

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.8.0</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.30</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>1.7.30</version>
    </dependency>
</dependencies>

三、实现 Kafka Producer 的单例模式

下面是完整的 QuoteKafkaProducer 类,包含了 Kafka Producer 的配置、消息发送和关闭方法。

package com.stormsha.util;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;

public class QuoteKafkaProducer {

    private static final Logger logger = LoggerFactory.getLogger(QuoteKafkaProducer.class);

    private static String bootstrapServers;  // Kafka 服务器的地址
    private static KafkaProducer<String, String> producer;  // Kafka Producer 实例
    private static final Object lock = new Object();  // 锁对象,用于线程安全的单例模式

    /**
     * 获取 Kafka Producer 单例实例
     *
     * @param servers Kafka 服务器地址
     * @return KafkaProducer 实例
     */
    public static KafkaProducer<String, String> getInstance(String servers) {
        if (isLocalEnvironment()) {  // 本地启动不需要实例化kafka
            return producer;
        }
        if (producer == null) {  // 双重检查锁定机制,确保单例实例的唯一性和线程安全
            synchronized (lock) {
                if (producer == null) {
                    bootstrapServers = servers;  // 设置 Kafka 服务器地址
                    // 配置 Kafka Producer 属性
                    Properties props = new Properties();
                    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);  // 设置 Kafka 服务器地址
                    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());  // 设置键的序列化器
                    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());  // 设置值的序列化器

                    // 创建 Kafka Producer 实例
                    producer = new KafkaProducer<>(props);
                    
                    // JVM 关闭时,确保 Kafka producer 被关闭
                    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                        logger.info("关闭 Kafka Producer");
                        close();
                    }));
                }
            }
        }
        return producer;
    }

    /**
     * 发送 Kafka 消息
     *
     * @param topic 目标主题
     * @param key   消息键
     * @param value 消息值
     */
    public static void sendMessage(String topic, String key, String value) {
        if (isLocalEnvironment()) {  // 本地启动不需要实例化kafka
            return;
        }

        // 获取 Kafka Producer 单例实例
        KafkaProducer<String, String> producer = QuoteKafkaProducer.getInstance(bootstrapServers);

        // 创建一条消息,包含topic、key 和 value
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);

        // 异步发送消息
        producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                // 发送失败的处理
                logger.error("消息发送失败", exception);
            } else {
                // 发送成功的处理
                logger.info("消息发送成功: 主题: {}, 分区: {}, 偏移量: {}", metadata.topic(), metadata.partition(), metadata.offset());
            }
        });
    }

    /**
     * 关闭 Kafka Producer 实例
     */
    public static void close() {
        if (producer != null) {
            synchronized (lock) {
                if (producer != null) {
                    producer.close();  // 关闭 Kafka Producer 实例
                    producer = null;  // 清空 Kafka Producer 实例
                }
            }
        }
    }

    /**
     * 判断当前程序是否在本地环境启动
     *
     * @return true 如果在本地环境启动,否则返回 false
     */
    private static boolean isLocalEnvironment() {
        // 获取环境变量
        String env = System.getenv().getOrDefault("ENV", "dev");
        // 返回是否为本地环境标志
        return "local".equals(env);
    }
}

四、使用 Kafka Producer 发送消息

以下是如何使用 QuoteKafkaProducer 类发送 Kafka 消息的示例:

// 使用示例
String servers = "127.0.0.1:9092,127.0.0.1:9092";
QuoteKafkaProducer.getInstance(servers);
QuoteKafkaProducer.sendMessage("topicId", "dataKey", "发送的内容");

五、总结

通过实现 Kafka Producer 的单例模式,我们可以确保 Kafka Producer 在整个应用程序中是唯一的,并且在多线程环境下是安全的。同时,通过引入日志记录,我们可以更好地监控消息的发送状态和处理潜在的异常。这种模式不仅提高了资源的利用效率,还简化了资源管理,特别是在处理高并发和大规模数据流的应用中。

希望本文对你在实际项目中使用 Kafka Producer 提供一些帮助。如果有任何问题或建议,欢迎在评论区讨论。


🔥🔥🔥道阻且长,行则将至,让我们一起加油吧!🌙🌙🌙

💖The End💖点点关注,收藏不迷路💖

http://www.kler.cn/news/305895.html

相关文章:

  • Java实现建造者模式和源码中的应用
  • 俄罗斯方块——C语言实践(Dev-Cpp)
  • random.randrange与torch.arange的用法
  • Spring 源码解读:自定义实现BeanPostProcessor的扩展点
  • 热门远程控制工具大盘点,职场必备
  • Java架构师实战篇Redis亿级数据统计方案
  • 【智路】智路OS Perception Camera Service
  • 【JAVA开源】基于Vue和SpringBoot的在线旅游网站
  • sheng的学习笔记-AI-FOIL(First-Order Inductive Learner)
  • conda、anaconda、pip、torch、pytorch、tensorflow到底是什么东西?(转载自本人的知乎回答)
  • php转职golang第一期
  • 深度学习-物体检测YOLO(You only look once)
  • 【SQL】百题计划:SQL对于空值的比较判断。
  • Linux学习笔记8 理解Ubuntu网络管理,做自己网络的主人
  • 一家电子元件企业终止,业绩规模小,疑似通过收购调节收入利润
  • 大数据-136 - ClickHouse 集群 表引擎详解1 - 日志、Log、Memory、Merge
  • windows 安全与网络管理问题
  • 【人工智能学习笔记】6_自然语言处理基础
  • 借老系统重构我准备写个迷你版apiFox
  • <Linux> 进程间通信
  • 医疗机构关于DIP/DRG信息化建设
  • 【linux】cat 命令
  • 什么是MIPI接口?MIPI相机是如何工作的?
  • 算法_优先级队列---持续更新
  • mysql组合键唯一
  • HTTP 四、HttpClient的使用
  • 一文带你全面了解RAID技术:从基础到进阶的全景解析
  • 大厂硬件梦:字节、腾讯“向首”,华为、小米“向手”
  • 设计模式之建造者模式(通俗易懂--代码辅助理解【Java版】)
  • MSYS vs MSYS2:功能、兼容性与易用性全面比拼,助你挑选最佳Windows开发伴侣