当前位置: 首页 > article >正文

Kafka入门:Java客户端库的使用

在现代的分布式系统中,消息队列扮演着至关重要的角色,而Apache Kafka以其高吞吐量、可扩展性和容错性而广受欢迎。本文将带你了解如何使用Kafka的Java客户端库来实现生产者(Producer)和消费者(Consumer)的基本操作。

Kafka简介

Apache Kafka是一个分布式流处理平台,主要用于构建实时数据管道和流式应用程序。它具有高吞吐量、可扩展性和容错性,适用于处理实时数据。

环境准备

在开始之前,请确保你已经安装了以下环境:

  1. Java开发环境(JDK)
  2. Kafka服务器(可以是本地安装或远程服务器)
  3. Kafka客户端库(通常通过Maven或Gradle引入)

Maven依赖

如果你使用Maven来管理项目依赖,可以在pom.xml文件中添加以下依赖:

<dependencies>
    <!-- Kafka客户端依赖 -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>3.2.0</version>
    </dependency>
</dependencies>

Kafka生产者(Producer)

生产者负责将消息发送到Kafka的特定主题(Topic)。

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class KafkaProducerExample {
    public static void main(String[] args) {
        // 配置属性
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 创建生产者
        Producer<String, String> producer = new KafkaProducer<>(props);

        // 创建消息
        ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", "key", "value");

        // 发送消息
        producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                exception.printStackTrace();
            } else {
                System.out.println("Message sent successfully");
            }
        });

        // 关闭生产者
        producer.close();
    }
}

Kafka消费者(Consumer)

消费者负责从Kafka的特定主题(Topic)接收消息。

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerExample {
    public static void main(String[] args) {
        // 配置属性
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");

        // 创建消费者
        Consumer<String, String> consumer = new KafkaConsumer<>(props);

        // 订阅主题
        consumer.subscribe(Collections.singletonList("test-topic"));

        // 消费消息
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
        }
    }
}

运行截图

生产者:

消费者:

总结

通过上述示例,我们了解了如何使用Kafka的Java客户端库来创建生产者和消费者。这些基本操作是构建基于Kafka的分布式应用的基石。Kafka的强大功能远不止于此,包括但不限于消息持久化、分区、复制等高级特性,这些都需要在实际项目中根据具体需求进行深入学习和应用。

希望这篇文章能帮助你快速入门Kafka的Java客户端库使用。如果你有任何问题或需要进一步的指导,请随时留言讨论。


http://www.kler.cn/a/396001.html

相关文章:

  • 浅层神经网络
  • 华为数字化转型的本质为何是管理变革
  • Kafka-Eagle的配置——kafka可视化界面
  • ROS进阶:使用URDF和Xacro构建差速轮式机器人模型
  • 【eNSP】路由基础与路由来源——静态路由实验
  • 通过Python 调整Excel行高、列宽
  • C语言.冒泡排序的练习
  • 在word文档中,内容是一段英文,一段英文的显示,且段落的前后都有空行,我如何只去掉英文段落后面的空行。
  • 25浙江省考-28天学行测-Day5 Day6-判断推理(中)
  • reduce-scatter:适合分布式计算;Reduce、LayerNorm和Broadcast算子的执行顺序对计算结果的影响,以及它们对资源消耗的影响
  • R门 - rust第一课陈天 -内存知识学习笔记
  • Apache Doris:监控与运维及系统调优
  • 【RabbitMQ】07-业务幂等处理
  • Tomcat NIO 配置实操指南
  • JVM——类加载器、类加载器的分类
  • 【Ubuntu侧边菜单点击没反应】【Ubuntu 20.04】【浏览器、文件夹点击没反应】
  • LabVIEW开发相机与显微镜自动对焦功能
  • 消息中间件分类
  • 《Django 5 By Example》阅读笔记:p17-p53
  • 去中心化存储:Web3数据安全新标准
  • Wireshark中的length栏位
  • YOLO理解
  • 一个C++线程安全的栈数据结构的例子
  • 电子商务系统设计与Spring Boot框架
  • 对话 OpenCV 之父 Gary Bradski:灾难性遗忘和持续学习是尚未解决的两大挑战 | Open AGI Forum
  • cls(c基础)