Kafka技术详解[3]: 生产与消费数据
目录
Kafka 生产与消费数据详解
生产数据
命令行操作
工具操作
Java API
消费数据
命令行操作
Java API
Kafka 生产与消费数据详解
生产数据
一旦消息主题创建完成,就可以通过Kafka客户端向Kafka服务器的主题中发送消息。Kafka生产者客户端是一套API接口,任何能够通过这些接口连接Kafka并发送数据的组件都可以称为Kafka生产者。以下是几种不同的生产数据方式:
命令行操作
- 打开DOS窗口,进入
E:\kafka_2.12-3.6.1\bin\windows
目录。 - 在DOS窗口输入指令,进入生产者控制台。Kafka通过
kafka-console-producer.bat
文件进行消息生产者操作。必须传递的参数包括:--bootstrap-server
: 连接Kafka服务器,默认端口为9092,参数值为localhost:9092
。--topic
: 已经创建好的主题名称。
kafka-console-producer.bat --bootstrap-server localhost:9092 --topic test
- 控制台生产数据时,每次输入的数据需要回车确认才能发送到Kafka服务器。
工具操作
对于需要更直观操作的情况,可以使用专门的工具进行快速访问,例如kafkatool_64bit.exe
。
- 安装该工具后,打开工具。
- 点击左上角按钮
File -> Add New Connection...
建立连接。 - 点击
Test
按钮测试连接。 - 增加连接后,按照工具中的步骤生产数据。
- 增加成功后,点击绿色箭头按钮进行查询,工具会显示当前数据。
Java API
通常,也可通过Java程序来生产数据。以下是在IDEA中使用Kafka Java API来生产数据的示例:
- 创建Kafka项目。
- 修改
pom.xml
文件,增加Maven依赖。<dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>3.6.1</version> </dependency> </dependencies>
- 创建
com.lzl.kafka.test.KafkaProducerTest
类,并添加main
方法以及生产者代码。package com.lzl.kafka.test; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.HashMap; import java.util.Map; public class KafkaProducerTest { public static void main(String[] args) { // 配置属性集合 Map<String, Object> configMap = new HashMap<>(); // 配置属性:Kafka服务器集群地址 configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // 配置属性:Kafka生产的数据为KV对,所以在生产数据进行传输前需要分别对K,V进行对应的序列化操作 configMap.put( ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); configMap.put( ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); // 创建Kafka生产者对象,建立Kafka连接 KafkaProducer<String, String> producer = new KafkaProducer<>(configMap); // 准备数据,定义泛型 // 构造对象时需要传递 【Topic主题名称】,【Key】,【Value】三个参数 ProducerRecord<String, String> record = new ProducerRecord<>("test", "key1", "value1"); // 生产(发送)数据 producer.send(record); // 关闭生产者连接 producer.close(); } }
消费数据
一旦消息通过生产者客户端发送到了Kafka服务器,就可以通过消费者客户端对特定主题的消息进行消费。
命令行操作
- 打开DOS窗口,进入
E:\kafka_2.12-3.6.1\bin\windows
目录。 - 输入指令,进入消费者控制台。使用
kafka-console-consumer.bat
文件进行消息消费者操作。必需的参数包括:--bootstrap-server
: 连接Kafka服务器,默认端口为9092。--topic
: 主题名称。--from-beginning
: 标记参数,确保从第一条数据开始消费。
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
Java API
同样地,可以通过Java程序来消费数据。以下是在IDEA中使用Kafka Java API消费数据的示例:
- 创建Maven项目并增加Kafka依赖。
- 创建
com.lzl.kafka.test.KafkaConsumerTest
类,并添加main
方法以及消费者代码。package com.lzl.kafka.test; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.time.Duration; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.Map; public class KafkaConsumerTest { public static void main(String[] args) { // 配置属性集合 Map<String, Object> configMap = new HashMap<String, Object>(); // 配置属性:Kafka集群地址 configMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // 配置属性: Kafka传输的数据为KV对,所以需要对获取的数据分别进行反序列化 configMap.put( ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); configMap.put( ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); // 配置属性: 读取数据的位置 ,取值为earliest(最早),latest(最晚) configMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 配置属性: 消费者组 configMap.put("group.id", "lzl"); // 配置属性: 自动提交偏移量 configMap.put("enable.auto.commit", "true"); // 创建消费者对象 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configMap); // 消费者订阅指定主题的数据 consumer.subscribe(Collections.singletonList("test")); while (true) { // 每隔100毫秒,抓取一次数据 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); // 打印抓取的数据 for (ConsumerRecord<String, String> record : records) { System.out.println("K = " + record.key() + ", V = " + record.value()); } } } }