Kafka详解——介绍与部署
1. 什么是 Kafka?
Kafka 是一个分布式的消息队列系统,最初由 LinkedIn 开发,后来成为 Apache 开源项目。它的主要用途包括实时数据处理、日志收集、数据流管道构建等。Kafka 具备高吞吐量、可扩展性、持久性和容错性,广泛应用于大数据和实时流处理场景。
核心概念
- Producer(生产者):负责向 Kafka 发送消息的数据发布方。
- Consumer(消费者):从 Kafka 读取消息的应用程序。
- Topic(主题):消息的分类,生产者将消息发布到特定的 Topic,消费者订阅感兴趣的 Topic。
- Partition(分区):每个 Topic 可划分为多个分区,支持并行处理,提高吞吐量。
- Broker(代理):Kafka 集群中的服务器节点,负责存储数据和处理请求。
- Zookeeper:用于管理 Kafka 集群的元数据,跟踪 Broker、Partition 以及消费者的状态。
Kafka 的工作流程
-
生产者发送消息
生产者将消息发布到指定的 Topic。Topic 内部有多个分区,生产者按照轮询或键哈希的方式将消息分配到不同分区。 -
Broker 存储消息
Broker 接收到消息后,持久化到磁盘,并为每条消息分配一个偏移量(offset)。 -
消费者读取消息
消费者订阅 Topic,从对应的分区读取消息,并记录已消费的偏移量,确保数据不重复、不丢失。
Kafka 的特点
- 高吞吐量:支持大规模消息处理,利用分区并行处理能力。
- 持久化存储:消息持久化到磁盘,支持数据恢复。
- 水平扩展:通过增加 Broker 扩容,支持百万级 TPS(每秒事务处理量)。
- 容错性:分区副本机制确保数据高可用,Broker 故障时其他节点能接管。
- 流处理能力:与 Kafka Streams、Flink 等工具结合,支持实时数据处理。
常见应用场景
- 日志收集:收集服务器日志,统一传输到大数据平台。
- 监控系统:实时收集应用程序指标,构建监控告警系统。
- 数据管道:作为数据流的中间件,在系统间传输和处理数据。
- 消息队列:解耦系统,提高服务扩展性和容错能力。
2. Kafka的部署
前置准备:需要有3台免密互通的虚拟机并安装好JAVA环境,未安装可参考:
本地部署大数据集群前置准备https://blog.csdn.net/m0_73641796/article/details/145994787?spm=1001.2014.3001.5501
2.1 Zookeeper安装
上传Zookeeper并修改配置文件
tar -zxf apache-zookeeper-3.7.1-bin.tar.gz -C /export/server/
cd /export/server/
mv apache-zookeeper-3.7.1-bin/ zookeeper
cd zookeeper/
mkdir zkData
cd zkData/
echo "1" >> myid;
cd ../conf/
mv zoo_sample.cfg zoo.cfg
vim zoo.cfg
# 以下内容为修改内容
dataDir=/export/server/zookeeper/zkData
# 以下内容为新增内容
server.1=node1:2888:3888
server.2=node2:2888:3888
server.3=node3:2888:3888
分发Zookeeper
cd /export/server/
scp -r zookeeper node2:`pwd`/
scp -r zookeeper node3:`pwd`/
# 分别将不同虚拟机/export/server/zookeeper/zkData目录下myid文件进行修改
vim /export/server/zookeeper/zkData/myid
# node1:1
# node2:2
# node3:3
封装启停脚本
cd /usr/bin
vim zk.sh
--添加如下内容:
#!/bin/bash
case $1 in
"start"){
for i in node1 node2 node3
do
echo ---------- zookeeper $i 启动 ------------
ssh $i "/export/server/zookeeper/bin/zkServer.sh start"
done
};;
"stop"){
for i in node1 node2 node3
do
echo ---------- zookeeper $i 停止 ------------
ssh $i "/export/server/zookeeper/bin/zkServer.sh stop"
done
};;
"status"){
for i in node1 node2 node3
do
echo ---------- zookeeper $i 状态 ------------
ssh $i "/export/server/zookeeper/bin/zkServer.sh status"
done
};;
esac
# 给zk.sh文件授权
chmod 777 zk.sh
2.2 启停Zookeeper
# 启动ZK服务
zk.sh start
# 查看ZK服务状态
zk.sh status
# 停止ZK服务
zk.sh stop
2.3 Kafka安装
上传kafka并修改配置文件
tar -zxf kafka_2.12-3.6.1.tgz -C /export/server/
cd /export/server/
mv kafka_2.12-3.6.1/ kafka
cd kafka/config
vim server.properties
--修改如下配置
broker.id=1
advertised.listeners=PLAINTEXT://node1:9092
log.dirs=/export/server/kafka/kafka-logs
zookeeper.connect=node1:2181,node2:2181,node3:2181/kafka
分发kafka
cd /export/server/
scp -r kafka node2:`pwd`/
scp -r kafka node3:`pwd`/
# 分别修改Node2与Node3的配置文件
vim /export/server/kafka/config/server.properties
--node2
broker.id=2
advertised.listeners=PLAINTEXT://node2:9092
--node3
broker.id=3
advertised.listeners=PLAINTEXT://node3:9092
配置环境变量
--在node1,node2,node3均执行以下操作:
vim /etc/profile
--添加如下内容:
#KAFKA_HOME
export KAFKA_HOME=/export/server/kafka
export PATH=$PATH:$KAFKA_HOME/bin
source /etc/profile
2.4 启停Kafka
启动前请先启动ZooKeeper服务
cd /export/server/kafka
# 执行启动指令
bin/kafka-server-start.sh -daemon config/server.properties
# 执行关闭指令
bin/kafka-server-stop.sh
封装启停脚本
cd /usr/bin
vim kfk.sh
--添加如下内容:
#! /bin/bash
case $1 in
"start"){
for i in node1 node2 node3
do
echo " --------启动 $i Kafka-------"
ssh $i "/export/server/kafka/bin/kafka-server-start.sh -daemon /export/server/kafka/config/server.properties"
done
};;
"stop"){
for i in node1 node2 node3
do
echo " --------停止 $i Kafka-------"
ssh $i "/export/server/kafka/bin/kafka-server-stop.sh "
done
};;
esac
# 给文件授权
chmod 777 kfk.sh
# 启动kafka
kfk.sh start
# 停止Kafka
kfk.sh stop
注意:停止Kafka集群时,一定要等Kafka所有节点进程全部停止后再停止ZooKeeper集群。因为Zookeeper集群当中记录着Kafka集群相关信息,Zookeeper集群一旦先停止,Kafka集群就没有办法再获取停止进程的信息,只能手动杀死Kafka进程了。
2.5 联合脚本
因为Kafka启动前,需要先启动ZooKeeper,关闭时,又需要将所有Kafka全部关闭后,才能关闭ZooKeeper,这样,操作起来感觉比较麻烦,所以可以将之前的2个脚本再做一次封装。
新建xcall脚本,具体实现可参考:
集群批量命令执行工具 xcall 配置指南https://blog.csdn.net/m0_73641796/article/details/146318713?sharetype=blogdetail&sharerId=146318713&sharerefer=PC&sharesource=m0_73641796&spm=1011.2480.3001.8118创建cluster.sh脚本文件
vim /usr/bin/cluster.sh
--添加如下内容:
#!/bin/bash
case $1 in
"start"){
echo ================== 启动 Kafka集群 ==================
#启动 Zookeeper集群
zk.sh start
#启动 Kafka集群
kfk.sh start
};;
"stop"){
echo "================== 停止 Kafka 集群 =================="
# 停止 Kafka
kfk.sh stop
# 检查 Kafka 进程是否完全退出
max_wait=30 # 最多等待 30 秒
count=0
while true
do
kafka_count=$(xcall jps | grep -c Kafka)
echo "当前未停止的 Kafka 进程数为 $kafka_count"
# 判断 Kafka 是否全部停止
if [ "$kafka_count" -eq 0 ]; then
echo "Kafka 已完全停止"
break
fi
# 如果超过最大等待时间,还没停干净,强制结束
count=$((count+1))
if [ $count -ge $max_wait ]; then
echo "Kafka 停止超时,强制杀死残留进程"
xcall "pkill -f kafka"
break
fi
sleep 1
done
# 停止 Zookeeper
zk.sh stop
};;
esac
--添加权限
chmod 777 /usr/bin/cluster.sh
脚本调用方式
# 集群启动
cluster.sh start
# 集群关闭
cluster.sh stop