Kafka安装
Kfka安装
Kafka3.4开始建议使用JDK17
安装Kafka需要有JDK环境
JDK安装
Java Downloads | Oracle
往下拉
下载需要登录
这里没有下载下来 报400 用以前下载的
mkdir /usr/local/develop/
cd /usr/local/develop
tar -zxvf /usr/local/develop/jdk-17.0.10_linux-x64_bin.tar.gz -C /usr/local/develop/
配置JAVA_HOME
1.系统环境
vim /etc/profile
到最下面
export JAVA_HOME=/usr/local/develop/jdk-17.0.10
export PATH=$JAVA_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib
让环境变量生效
source /etc/profile
java -version 查看jdk版本 至此JDK安装完成
安装Kafka
tar -zxvf /usr/local/develop/kafka-3.9.0-src.tgz -C /usr/local/develop/
下载下面这个 如果下载源码 可能还需要调整scala环境
tar -zxvf /usr/local/develop/kafka_2.13-3.9.0.tgz -C /usr/local/develop/
解压之后
Kafka的bin下有很多脚本文件
Kafka首先需要启动Kafka服务 然后创建Topic 生产者和消费者可以通过Topic进行消息传递
启动Zookeeper
在启动Kafka服务前需要先启动ZooKeeper做为配置中心
启动Kafka自带的Zookeeper 并指定Zookeeper的配置文件 即使不指定应该也是用的这个配置文件
nohup /usr/local/develop/kafka_2.13-3.9.0/bin/zookeeper-server-start.sh /usr/local/develop/kafka_2.13-3.9.0/config/zookeeper.properties > /usr/local/develop/zookeeper.log 2>&1 &
查看zookeeper日志
tail -f /usr/local/develop/zookeeper.log
停止zookeeper
/usr/local/develop/kafka_2.13-3.9.0/bin/zookeeper-server-stop.sh
启动Kafka
启动zookeeper之后 可以启动Kafka
nohup /usr/local/develop/kafka_2.13-3.9.0/bin/kafka-server-start.sh /usr/local/develop/kafka_2.13-3.9.0/config/server.properties > /usr/local/develop/kafka.log 2>&1 &
关闭Kafka /usr/local/develop/kafka_2.13-3.9.0/bin/kafka-server-stop.sh
这里注意 因为原先启动Zookeeper使用的是Kafka自带的 因此
如果 你 ps -ef|grep kafka 即使有输出 也不是kafka服务 是kafka中zookeeper的服务 别搞错了
Kafka基础工作机制
创建Topic --bootstrap-server
后面跟的是 Kafka Broker 的主机名(或 IP 地址)和端口号,格式为:hostname:port
。例如,localhost:9092
就是连接到本地机器的 Kafka 服务,端口是 9092
Kafka服务默认端口号9092
创建一个Topic
/usr/local/develop/kafka_2.13-3.9.0/bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test
/usr/local/develop/kafka_2.13-3.9.0/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test
默认情况下 从队列最后开始消费
- 默认情况下,如果消费者启动时,消费者将从最新的消息(
latest
)开始消费,即它只会接收从启动后生产的消息,而错过之前的消息。 - 如果你希望消费者从最开始的消息(
earliest
)开始消费,可以通过配置来修改。Spring Kafka 提供了类似auto-offset-reset
的配置项,用于指定消费者启动时的消费策略。例如,配置为earliest
,即使消费者之前没有消费过该 Topic,它也会从最早的消息开始消费。
就可以收到历史消息
/usr/local/develop/kafka_2.13-3.9.0/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
从指定第几条消息开始消费 这里表示第三条消息 就是下标2
partition(分区)是Topic中真实存消息的地方 Topic中有多个partition --partition 0 就是0分区
/usr/local/develop/kafka_2.13-3.9.0/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --partition 0 --offset 2
消费者组的概念
创建Topic之后 可以指定消费者组
/usr/local/develop/kafka_2.13-3.9.0/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --group testGroup
如果你在两个窗口都执行这条命令 就是说有两个消费者组
当生产者给指定Topic发送消息后
相同的两个消费者组 只有一个会收到消息
你可以通过
/usr/local/develop/kafka_2.13-3.9.0/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --group testGroup2
这样生产者给同一个Topic发送消息 相同Topic下的不同消费者组都可以收到消息
查看消费者组消息消费情况
/usr/local/develop/kafka_2.13-3.9.0/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe
可以指定组 查看具体消费者组消息消费情况
/usr/local/develop/kafka_2.13-3.9.0/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group testGroup
Broker和Topic概念
Kafka Broker 是 Kafka 集群中的一个服务器实例,它负责接收生产者发送的消息,存储这些消息,并将它们提供给消费者。每个 Kafka Broker 都能够接收和存储数据,也可以处理来自消费者的请求(拉取消息)。
Kafka Broker 负责将消息存储到不同的 Topic 中,但具体来说,它并不直接决定消息应该放在哪个 Topic。实际上,生产者(Producer)决定将消息发送到哪个 Topic,而 Broker 则负责接收这些消息并存储到相应的 Topic 分区中。
Kafka的配置
broker.id 保持不重复即可
里面有个
zookeeper.connect=localhost:2181 是默认的 如果你的zookeeper改变了端口 那么也需要改下
Kafka集群搭建图(这里没有集群搭建)
如果搭建Kafka集群 那么单独下载一个Zookeeper
一般来说Kafka都是集群部署
注意:单机情况下也会将数据持久化 但是不会自动备份 可以用脚本等其他方式备份
在 单机环境 下,Kafka 不会 自动备份数据,因为 Kafka 的 副本机制(Replication)是基于多 Broker 的集群架构设计的,目的是提供高可用性和容错性。在单机环境中,没有多个 Broker 来存储数据的副本,因此无法实现副本机制。
单独搭建Zookeeper
Kafka自带了Zookeeper 但一般建议 额外配个ZooKeeper
tar -zxvf /usr/local/develop/apache-zookeeper-3.9.3-bin.tar.gz -C /usr/local/develop/
/usr/local/develop/apache-zookeeper-3.9.3-bin
在/usr/local/develop/apache-zookeeper-3.9.3-bin/conf中提供了一个zoo_sample.cfg
模板配置文件
cd /usr/local/develop/apache-zookeeper-3.9.3-bin/conf
zookeeper的配置文件zoo.cfg 复制并改名为zoo.cfg
cp zoo_sample.cfg zoo.cfg
编辑修改配置文件
vim zoo.cfg
需要修改的地方
注意是dataDir 和集群节点配置
下面是集群搭建方式
3888 端口是用于 Zookeeper 集群内部的选举 和 投票通信。
这里只是测试 一般是三台服务器
要保证每台服务器之间的配置都是一样的
2888和3888可以改变,例如你三台机子,那么就要保证三台机子的配置都是这样的
admin.serverPort=8888
admin.enableServer=false 关闭也可以
这个需要重新配置一下 默认是使用8080端口
我这里使用了8081
这个端口安全组还是关闭的好
启动zookeeper服务
cd /usr/local/develop/apache-zookeeper-3.9.3-bin/bin
这样就指定conf下的zoo.cfg配置文件来启动
./zkServer.sh --config ../conf start
./zkServer.sh stop 关闭
启动Kafka
nohup /usr/local/develop/kafka_2.13-3.9.0/bin/kafka-server-start.sh /usr/local/develop/kafka_2.13-3.9.0/config/server.properties > /usr/local/develop/kafka.log 2>&1 &
没有正常启动起来 原因是
原先用kafka自带的zookeeper启动过
把meta.properties删除就好了
原先使用 Kafka 自带的 Zookeeper 启动过 Kafka ,然后后来又单独安装了 Zookeeper,出现 Invalid cluster.id
错误时,问题通常是因为 meta.properties
文件中保存了 Zookeeper 相关的 ID,而这个 ID 在不同的 Zookeeper 实例之间是不一致的。