当前位置: 首页 > article >正文

Kafka技术详解[3]: 生产与消费数据

目录

Kafka 生产与消费数据详解

 生产数据

 命令行操作

 工具操作

Java API

 消费数据

 命令行操作

Java API


Kafka 生产与消费数据详解

 生产数据

一旦消息主题创建完成,就可以通过Kafka客户端向Kafka服务器的主题中发送消息。Kafka生产者客户端是一套API接口,任何能够通过这些接口连接Kafka并发送数据的组件都可以称为Kafka生产者。以下是几种不同的生产数据方式:

 命令行操作
  1. 打开DOS窗口,进入E:\kafka_2.12-3.6.1\bin\windows目录。
  2. 在DOS窗口输入指令,进入生产者控制台。Kafka通过kafka-console-producer.bat文件进行消息生产者操作。必须传递的参数包括:
    • --bootstrap-server: 连接Kafka服务器,默认端口为9092,参数值为localhost:9092
    • --topic: 已经创建好的主题名称。
    指令如下:
    kafka-console-producer.bat --bootstrap-server localhost:9092 --topic test
  3. 控制台生产数据时,每次输入的数据需要回车确认才能发送到Kafka服务器。
 工具操作

对于需要更直观操作的情况,可以使用专门的工具进行快速访问,例如kafkatool_64bit.exe

  1. 安装该工具后,打开工具。
  2. 点击左上角按钮File -> Add New Connection...建立连接。
  3. 点击Test按钮测试连接。
  4. 增加连接后,按照工具中的步骤生产数据。
  5. 增加成功后,点击绿色箭头按钮进行查询,工具会显示当前数据。
Java API

通常,也可通过Java程序来生产数据。以下是在IDEA中使用Kafka Java API来生产数据的示例:

  1. 创建Kafka项目。
  2. 修改pom.xml文件,增加Maven依赖。
    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.6.1</version>
        </dependency>
    </dependencies>
  3. 创建com.lzl.kafka.test.KafkaProducerTest类,并添加main方法以及生产者代码。
    package com.lzl.kafka.test;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import java.util.HashMap;
    import java.util.Map;
    
    public class KafkaProducerTest {
        public static void main(String[] args) {
            // 配置属性集合
            Map<String, Object> configMap = new HashMap<>();
            // 配置属性:Kafka服务器集群地址
            configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            // 配置属性:Kafka生产的数据为KV对,所以在生产数据进行传输前需要分别对K,V进行对应的序列化操作
            configMap.put(
                    ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringSerializer");
            configMap.put(
                    ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringSerializer");
            // 创建Kafka生产者对象,建立Kafka连接
            KafkaProducer<String, String> producer = new KafkaProducer<>(configMap);
            // 准备数据,定义泛型
            // 构造对象时需要传递 【Topic主题名称】,【Key】,【Value】三个参数
            ProducerRecord<String, String> record = new ProducerRecord<>("test", "key1", "value1");
            // 生产(发送)数据
            producer.send(record);
            // 关闭生产者连接
            producer.close();
        }
    }

 消费数据

一旦消息通过生产者客户端发送到了Kafka服务器,就可以通过消费者客户端对特定主题的消息进行消费。

 命令行操作
  1. 打开DOS窗口,进入E:\kafka_2.12-3.6.1\bin\windows目录。
  2. 输入指令,进入消费者控制台。使用kafka-console-consumer.bat文件进行消息消费者操作。必需的参数包括:
    • --bootstrap-server: 连接Kafka服务器,默认端口为9092。
    • --topic: 主题名称。
    • --from-beginning: 标记参数,确保从第一条数据开始消费。
    指令如下:
    kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
Java API

同样地,可以通过Java程序来消费数据。以下是在IDEA中使用Kafka Java API消费数据的示例:

  1. 创建Maven项目并增加Kafka依赖。
  2. 创建com.lzl.kafka.test.KafkaConsumerTest类,并添加main方法以及消费者代码。
    package com.lzl.kafka.test;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import java.time.Duration;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    
    public class KafkaConsumerTest {
        public static void main(String[] args) {
            // 配置属性集合
            Map<String, Object> configMap = new HashMap<String, Object>();
            // 配置属性:Kafka集群地址
            configMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            // 配置属性: Kafka传输的数据为KV对,所以需要对获取的数据分别进行反序列化
            configMap.put(
                    ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringDeserializer");
            configMap.put(
                    ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringDeserializer");
            // 配置属性: 读取数据的位置 ,取值为earliest(最早),latest(最晚)
            configMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            // 配置属性: 消费者组
            configMap.put("group.id", "lzl");
            // 配置属性: 自动提交偏移量
            configMap.put("enable.auto.commit", "true");
            // 创建消费者对象
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configMap);
            // 消费者订阅指定主题的数据
            consumer.subscribe(Collections.singletonList("test"));
            while (true) {
                // 每隔100毫秒,抓取一次数据
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                // 打印抓取的数据
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("K = " + record.key() + ", V = " + record.value());
                }
            }
        }
    }

http://www.kler.cn/news/319108.html

相关文章:

  • 液体泄漏泼溅检测系统源码分享
  • 接口测试学习随笔 .. ..哪些参数为必填,以及接口测试中参数的含义.. ..
  • HarmonyOS Next鸿蒙扫一扫功能实现
  • Kubernetes监控、日志记录和运行时安全:CKS考试核心知识点实践
  • 【车联网安全】车端网络攻击及检测的框架/模型
  • 9月24日笔记
  • 探索未来的IT发展方向:技术与创新的融合
  • AI绘画+副业:让您的手机壁纸、微信头像都充满了个性,姓氏头像,情侣壁纸等
  • Chat2DB:AI驱动SQL编辑器,开启智能数据库管理新时代
  • Oracle查询(下)
  • 两张图讲透软件测试实验室认证技术体系与质量管理体系
  • Linux安装火狐游览器
  • python assert 断言用法
  • VuePress搭建文档网站/个人博客(详细配置)主题配置-侧边栏配置
  • Spring Boot 注解拦截器实现审计日志功能
  • Stable Diffusion 的 ControlNet 主要用途
  • 使用 Flask-Limiter 和 Nginx 实现接口访问次数限制
  • 美畅物联丨海康威视摄像机固件升级指南
  • msvcp140.dll丢失怎么办,总结6种解决msvcp140.dll丢失的方法
  • 代码随想录算法day40 | 动态规划算法part13 | 647. 回文子串,516.最长回文子序列
  • Android 模拟按键功能实现
  • 宠物智能听诊器宠物健康管理设备
  • Java中的位图和布隆过滤器(如果想知道Java中有关位图和布隆过滤器的知识点,那么只看这一篇就足够了!)
  • 分享6个icon在线生成网站,支持AI生成
  • 生信初学者教程(四):软件
  • C语言实现简单凯撒密码算法
  • springboot使用minio(8.5.11)
  • 某易易盾验证码逆向
  • vue3 选择字体的颜色,使用vue3-colorpicker来选择颜色
  • Spring Boot集成LiteFlow使用详解