kafka入门(一):kafka消息消费
安装kafka,创建 topic:
Windows安装kafka, 详情见:https://blog.csdn.net/sinat_32502451/article/details/133067851
Linux 安装kafka,详情见:https://blog.csdn.net/sinat_32502451/article/details/133080353
添加依赖包:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.1.10.RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.0.0</version>
</dependency>
kafka代码示例(一):
主要按照以下步骤:
-
设置 broker服务器的ip和端口, 设置 消费者群组id
-
初始化消费者
-
消费者订阅主题
-
消费者批量拉取消息
public class KafkaDemo1 {
public static final String BROKER_LIST = "localhost:9092";
public static final String TOPIC = "myTopic1";
public static final String GROUP_ID = "group.demo";
public static void main(String[] args) {
consumerRecord();
}
public static void consumerRecord() {
//属性配置
Properties properties = getProperties(BROKER_LIST, GROUP_ID);
//消费者初始化
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
//消息者订阅主题
consumer.subscribe(Collections.singletonList(TOPIC));
//循环
while (true) {
//每次拉取 1千条消息
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.println("=============> 消费kafka消息:"+ record.value());
}
}
}
public static Properties getProperties(String brokerList, String groupId) {
Properties properties = new Properties();
//序列化
properties.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
//broker服务器的ip和端口,多个用逗号隔开
properties.put("bootstrap.servers", brokerList);
//消费者群组id
properties.put("group.id", groupId);
return properties;
}
}
使用文章开头安装好的 kafka,并按文章中的步骤,创建 topic ,打开一个 生产者 producer,并发送消息。
观察idea 控制台,可以看到 成功消费了消息:
=============> 消费kafka消息:hello kafka
参考资料:
《深入理解kafka 核心设计与实践原理》