Kafka-1
下载地址:
Apache Kafka
Apache ZooKeeper
- 消息分区有序,根据offset偏移量顺序消费消息
下载并解压缩Kafka
启动 ZooKeeper
当前版本 Kafka 软件内部依然依赖 ZooKeeper 进行多节点协调调度,所以启动 Katka软件之前,需要先启动 ZooKeeper 软件。不过因为Kafka 软件本身内置了 ZooKeeper 软件所以无需额外安装 ZooKeeper 软件,直接调用脚本命令启动即可。具体操作步骤如下:
- 进入 Kafka 解压缩文件夹的 config 目录,修改 zookeeper.properties 配置文件
- 新建
- 修改
- 启动命令:
call bin/windows/zookeeper-server-start.bat config/zookeeper.properties
直接把这条命令放到一个新建的zk.cmd文件当中,以后直接点击该cmd命令直接启动
启动Kafka:
- 修改server.properties:
- 配置cmd 启动文件
注意:启动先zookeeper再kafka,关闭先kafka再zookeeper
命令行操作:
调用D:\kafka\local\bin\windows下的kafka-topics.bat脚本创建主题topic:
kafka-topics.bat --bootstrap-server localhost:9092 --topic test123 --create
-
kafka-topics.bat
:这是 Kafka 提供的一个脚本,用于执行与 Kafka 主题相关的操作。.bat
扩展名表明这是一个 Windows 批处理文件。 -
--bootstrap-server localhost:9092
:这个参数指定了 Kafka 集群中的一个或多个代理服务器(broker)的地址和端口。在这个例子中,它指向本地主机(localhost)上的 Kafka 服务,端口号为 9092。 -
--topic test123
:这个参数指定了要创建的主题的名称。在这个例子中,主题名称为test123
。 -
--create
:这个参数告诉 Kafka 创建指定的主题。
创建生产者:
kafka-console-producer.bat --bootstrap-server localhost:9092 --topic test123
创建消费者:
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test123
代码演示
生产者代码:
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class SimpleProducer {
public static void main(String[] args) {
// 1. 配置生产者参数
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // Kafka 集群地址
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "all"); // 消息确认机制
// 2. 创建生产者实例
Producer<String, String> producer = new KafkaProducer<>(props);
// 3. 发送消息
for (int i = 0; i < 10; i++) {
String message = "Message-" + i;
ProducerRecord<String, String> record =
new ProducerRecord<>("test123", "key-" + i, message);
// 异步发送(带回调)
producer.send(record, (metadata, exception) -> {
if (exception == null) {
System.out.printf("消息发送成功 -> topic:%s, partition:%d, offset:%d%n",
metadata.topic(), metadata.partition(), metadata.offset());
} else {
exception.printStackTrace();
}
});
}
// 4. 关闭生产者
producer.close();
}
}
消费者:
import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class SimpleConsumer {
public static void main(String[] args) {
// 1. 配置消费者参数
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("group.id", "test-group"); // 消费者组ID
props.put("auto.offset.reset", "earliest"); // 从最早的消息开始消费
// 2. 创建消费者实例
Consumer<String, String> consumer = new KafkaConsumer<>(props);
// 3. 订阅主题
consumer.subscribe(Collections.singletonList("test123"));
// 4. 轮询消费消息
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("收到消息 -> topic:%s, partition:%d, offset:%d, key:%s, value:%s%n",
record.topic(), record.partition(), record.offset(),
record.key(), record.value());
}
}
} finally {
consumer.close();
}
}
}
运行后生产者收到消息: