kafka的安装与使用
文章目录
- kafka安装
- 1 上传安装包
- 2 解压安装包
- 3 创建logs文件夹
- 4 修改配置文件
- 5 分发kafka
- 6 启动kafka
- kafka使用
- 1 启动kafka
- 2 关闭kafka
- 3 查看topic
- 4 创建topic,名称为test
- 5 删除名称为test的topic
- 6 向topic发送数据
- 7 从topic里消费数据
kafka安装
kafka安装前需要确认zookeeper已安装
1 上传安装包
将安装包kafka_2.11-2.4.1.tgz上传至/opt/install_packages
2 解压安装包
tar -zxvf /opt/install_packages/kafka_2.11-2.4.1.tgz -C /opt/softs/
3 创建logs文件夹
cd /opt/softs/kafka_2.11-2.4.1
mkdir logs
4 修改配置文件
cd config/
##更名配置文件
mv server.properties server.properties_bak
vim /opt/softs/kafka_2.11-2.4.1/config/server.properties
输入如下内容
#broker 的全局唯一编号,不能重复(根据实际修改)
broker.id=1
#删除 topic 功能使能
delete.topic.enable=true
#处理网络请求的线程数量
num.network.threads=3
#用来处理磁盘 IO 的现成数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#kafka 运行日志存放的路径(根据实际修改)
log.dirs=/opt/softs/kafka_2.11-2.4.1/logs
#topic 在当前 broker 上的分区个数
num.partitions=1
#用来恢复和清理 data 下数据的线程数量
num.recovery.threads.per.data.dir=1
log.retention.hours=168
#配置连接Zookeeper集群地址(根据实际修改)
zookeeper.connect=bigdata03:2181,bigdata04:2181,bigdata05:2181
5 分发kafka
##在bigdata03上执行
scp -r /opt/softs/kafka_2.11-2.4.1/ root@bigdata04:/opt/softs/
scp -r /opt/softs/kafka_2.11-2.4.1/ root@bigdata05:/opt/softs/
##在bigdata04上修改server.properties,将broker.id修改为2
vim /opt/softs/kafka_2.11-2.4.1/config/server.properties
##在bigdata05上修改server.properties,将broker.id修改为3
vim /opt/softs/kafka_2.11-2.4.1/config/server.properties
6 启动kafka
## 依次在bigdata03,bigdata04,bigdata05节点上启动kafka
cd /opt/softs/kafka_2.11-2.4.1/bin
./kafka-server-start.sh ../config/server.properties &
& 代表在后台运行
kafka使用
1 启动kafka
--在安装kafka集群的每台节点上(bigdata03,bigdata04,bigdata05)分别启动kafka
--1 切换目录
cd /opt/softs/kafka_2.11-2.4.1/bin
--2 执行启动命令
./kafka-server-start.sh ../config/server.properties &
2 关闭kafka
--在安装kafka集群的每台节点上(bigdata03,bigdata04,bigdata05)分别关闭kafka
--1 切换目录
cd /opt/softs/kafka_2.11-2.4.1/bin
--2 执行关闭命令
./kafka-server-stop.sh stop
3 查看topic
--1 切换目录
cd /opt/softs/kafka_2.11-2.4.1/bin
--2 执行查看topic命令
./kafka-topics.sh --zookeeper bigdata03:2181,bigdata04:2181,bigdata05:2181 --list
4 创建topic,名称为test
--1 切换目录
cd /opt/softs/kafka_2.11-2.4.1/bin
--2 执行创建topic的命令
./kafka-topics.sh --zookeeper bigdata03:2181,bigdata04:2181,bigdata05:2181 --create --replication-factor 3 --partitions 1 --topic test
选项说明:
--topic 定义topic名称
--replication-factor 定义topic的副本数,副本的概念和hdfs上的文件副本一致,相同的数据存储在不同的节点上一共多少份
--partitions 定义topic的分区数
若报错:
Error while executing topic command : Replication factor: 3 larger than available brokers: 2.
[2023-05-02 17:49:56,071] ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 3 larger than available brokers: 2.
说明要么kafka启动的数量较少,在其他虚拟机上有没启动的,要么上面kafka的config目录下的server.properties中kafka的zookeeper的配置没有配置好。
对于kafka启动的数量较少,在其他虚拟机上有没启动的,重新启动就好,可以通过 jps 查询
kafka启动前
kafka启动后
5 删除名称为test的topic
--1 切换目录
cd /opt/softs/kafka_2.11-2.4.1/bin
--2 执行删除topic的命令
-- 删除topic需要在kafka目录下config目录中的server.properties中配置delete.topic.enable=true,如果不配置而去删除topic只会对topic进行标记删除
./kafka-topics.sh --zookeeper bigdata03:2181,bigdata04:2181,bigdata05:2181 --delete --topic test
6 向topic发送数据
--1 切换目录
cd /opt/softs/kafka_2.11-2.4.1/bin
--2 执行发送消息的命令
./kafka-console-producer.sh --broker-list bigdata03:9092,bigdata04:9092,bigdata05:9092 --topic test
选项说明
--broker-list 定义kafka集群的节点
7 从topic里消费数据
--1 切换目录
cd /opt/softs/kafka_2.11-2.4.1/bin
--2 执行消费的命令
./kafka-console-consumer.sh --bootstrap-server bigdata03:9092,bigdata04:9092,bigdata05:9092 --from-beginning --topic test
选项说明
--bootstrap-server 定义kafka集群的节点
--from-beginning 定义消费方式为从头消费