当前位置: 首页 > article >正文

Kafka详解——介绍与部署

1. 什么是 Kafka?

Kafka 是一个分布式的消息队列系统,最初由 LinkedIn 开发,后来成为 Apache 开源项目。它的主要用途包括实时数据处理、日志收集、数据流管道构建等。Kafka 具备高吞吐量、可扩展性、持久性和容错性,广泛应用于大数据和实时流处理场景。

核心概念

  1. Producer(生产者):负责向 Kafka 发送消息的数据发布方。
  2. Consumer(消费者):从 Kafka 读取消息的应用程序。
  3. Topic(主题):消息的分类,生产者将消息发布到特定的 Topic,消费者订阅感兴趣的 Topic。
  4. Partition(分区):每个 Topic 可划分为多个分区,支持并行处理,提高吞吐量。
  5. Broker(代理):Kafka 集群中的服务器节点,负责存储数据和处理请求。
  6. Zookeeper:用于管理 Kafka 集群的元数据,跟踪 Broker、Partition 以及消费者的状态。

Kafka 的工作流程

  1. 生产者发送消息

    生产者将消息发布到指定的 Topic。Topic 内部有多个分区,生产者按照轮询或键哈希的方式将消息分配到不同分区。
  2. Broker 存储消息

    Broker 接收到消息后,持久化到磁盘,并为每条消息分配一个偏移量(offset)。
  3. 消费者读取消息

    消费者订阅 Topic,从对应的分区读取消息,并记录已消费的偏移量,确保数据不重复、不丢失。

Kafka 的特点

  • 高吞吐量:支持大规模消息处理,利用分区并行处理能力。
  • 持久化存储:消息持久化到磁盘,支持数据恢复。
  • 水平扩展:通过增加 Broker 扩容,支持百万级 TPS(每秒事务处理量)。
  • 容错性:分区副本机制确保数据高可用,Broker 故障时其他节点能接管。
  • 流处理能力:与 Kafka Streams、Flink 等工具结合,支持实时数据处理。

常见应用场景

  1. 日志收集:收集服务器日志,统一传输到大数据平台。
  2. 监控系统:实时收集应用程序指标,构建监控告警系统。
  3. 数据管道:作为数据流的中间件,在系统间传输和处理数据。
  4. 消息队列:解耦系统,提高服务扩展性和容错能力。

2. Kafka的部署

前置准备:需要有3台免密互通的虚拟机并安装好JAVA环境,未安装可参考:

本地部署大数据集群前置准备https://blog.csdn.net/m0_73641796/article/details/145994787?spm=1001.2014.3001.5501

2.1 Zookeeper安装

上传Zookeeper并修改配置文件

tar -zxf apache-zookeeper-3.7.1-bin.tar.gz -C /export/server/
cd /export/server/
mv apache-zookeeper-3.7.1-bin/ zookeeper
cd zookeeper/
mkdir zkData
cd zkData/
echo "1" >> myid;
cd ../conf/
mv zoo_sample.cfg zoo.cfg
vim zoo.cfg
# 以下内容为修改内容
dataDir=/export/server/zookeeper/zkData

# 以下内容为新增内容
server.1=node1:2888:3888
server.2=node2:2888:3888
server.3=node3:2888:3888

分发Zookeeper

cd /export/server/
scp -r zookeeper node2:`pwd`/
scp -r zookeeper node3:`pwd`/

# 分别将不同虚拟机/export/server/zookeeper/zkData目录下myid文件进行修改
vim /export/server/zookeeper/zkData/myid

# node1:1
# node2:2
# node3:3

封装启停脚本

cd /usr/bin
vim zk.sh
--添加如下内容:
#!/bin/bash

case $1 in
"start"){
	for i in node1 node2 node3
	do
        echo ---------- zookeeper $i 启动 ------------
		ssh $i "/export/server/zookeeper/bin/zkServer.sh start"
	done
};;
"stop"){
	for i in node1 node2 node3
	do
        echo ---------- zookeeper $i 停止 ------------    
		ssh $i "/export/server/zookeeper/bin/zkServer.sh stop"
	done
};;
"status"){
	for i in node1 node2 node3
	do
        echo ---------- zookeeper $i 状态 ------------    
		ssh $i "/export/server/zookeeper/bin/zkServer.sh status"
	done
};;
esac

# 给zk.sh文件授权
chmod 777 zk.sh

2.2 启停Zookeeper

# 启动ZK服务
zk.sh start
# 查看ZK服务状态
zk.sh status
# 停止ZK服务
zk.sh stop

2.3  Kafka安装

上传kafka并修改配置文件

tar -zxf kafka_2.12-3.6.1.tgz -C /export/server/
cd /export/server/
mv kafka_2.12-3.6.1/ kafka
cd kafka/config
vim server.properties
--修改如下配置
broker.id=1
advertised.listeners=PLAINTEXT://node1:9092
log.dirs=/export/server/kafka/kafka-logs
zookeeper.connect=node1:2181,node2:2181,node3:2181/kafka

分发kafka

cd /export/server/

scp -r kafka node2:`pwd`/
scp -r kafka node3:`pwd`/

# 分别修改Node2与Node3的配置文件
vim /export/server/kafka/config/server.properties
--node2
broker.id=2
advertised.listeners=PLAINTEXT://node2:9092
--node3
broker.id=3
advertised.listeners=PLAINTEXT://node3:9092

配置环境变量

--在node1,node2,node3均执行以下操作:
 
vim /etc/profile
--添加如下内容:
#KAFKA_HOME
export KAFKA_HOME=/export/server/kafka
export PATH=$PATH:$KAFKA_HOME/bin
 
source /etc/profile

2.4  启停Kafka

启动前请先启动ZooKeeper服务

cd /export/server/kafka
# 执行启动指令
bin/kafka-server-start.sh -daemon config/server.properties
# 执行关闭指令
bin/kafka-server-stop.sh

 封装启停脚本

cd /usr/bin
vim kfk.sh
--添加如下内容:
#! /bin/bash

case $1 in
"start"){
    for i in node1 node2 node3
    do
        echo " --------启动 $i Kafka-------"
        ssh $i "/export/server/kafka/bin/kafka-server-start.sh -daemon /export/server/kafka/config/server.properties"
    done
};;
"stop"){
    for i in node1 node2 node3
    do
        echo " --------停止 $i Kafka-------"
        ssh $i "/export/server/kafka/bin/kafka-server-stop.sh "
    done
};;
esac

# 给文件授权
chmod 777 kfk.sh

# 启动kafka
kfk.sh start
# 停止Kafka
kfk.sh stop

注意:停止Kafka集群时,一定要等Kafka所有节点进程全部停止后再停止ZooKeeper集群。因为Zookeeper集群当中记录着Kafka集群相关信息,Zookeeper集群一旦先停止,Kafka集群就没有办法再获取停止进程的信息,只能手动杀死Kafka进程了。

2.5 联合脚本

因为Kafka启动前,需要先启动ZooKeeper,关闭时,又需要将所有Kafka全部关闭后,才能关闭ZooKeeper,这样,操作起来感觉比较麻烦,所以可以将之前的2个脚本再做一次封装。

新建xcall脚本,具体实现可参考:

集群批量命令执行工具 xcall 配置指南https://blog.csdn.net/m0_73641796/article/details/146318713?sharetype=blogdetail&sharerId=146318713&sharerefer=PC&sharesource=m0_73641796&spm=1011.2480.3001.8118创建cluster.sh脚本文件

vim /usr/bin/cluster.sh
--添加如下内容:
#!/bin/bash

case $1 in
"start"){
        echo ================== 启动 Kafka集群 ==================
        #启动 Zookeeper集群
        zk.sh start

        #启动 Kafka集群
        kfk.sh start

        };;
"stop"){
    echo "================== 停止 Kafka 集群 =================="

    # 停止 Kafka
    kfk.sh stop

    # 检查 Kafka 进程是否完全退出
    max_wait=30  # 最多等待 30 秒
    count=0
    while true
    do
        kafka_count=$(xcall jps | grep -c Kafka)
        echo "当前未停止的 Kafka 进程数为 $kafka_count"

        # 判断 Kafka 是否全部停止
        if [ "$kafka_count" -eq 0 ]; then
            echo "Kafka 已完全停止"
            break
        fi

        # 如果超过最大等待时间,还没停干净,强制结束
        count=$((count+1))
        if [ $count -ge $max_wait ]; then
            echo "Kafka 停止超时,强制杀死残留进程"
            xcall "pkill -f kafka"
            break
        fi

        sleep 1
    done

    # 停止 Zookeeper
    zk.sh stop
};;
esac

--添加权限
chmod 777 /usr/bin/cluster.sh

脚本调用方式

# 集群启动
cluster.sh start
# 集群关闭
cluster.sh stop

http://www.kler.cn/a/591732.html

相关文章:

  • 【Linux】Bash是什么?怎么使用?
  • 森林防火预警广播监控系统:以4G为纽带架构融合智能广播、远程监控、AI智能识别、告警提示、太阳能供电于一体的新一代森林防火预警系统
  • LeetCode 392. 判断子序列 java题解
  • 在 Ubuntu 中配置 NFS 共享服务的完整指南
  • C++ —— 线程同步(互斥锁)
  • OpenCV图像拼接(1)概述
  • 【Vue3+Vite指南】全局引入SCSS文件后出现Undefined mixin?一招解决命名空间陷阱!
  • 机器视觉工程师如何学习C#通讯
  • Flask实时监控:打造智能多设备在线离线检测平台(升级版)
  • 移动版 Edge :插件安装功能全面指南
  • SpringBoot-MVC配置类与 Controller 的扫描
  • 【Java】链表(LinkedList)(图文版)
  • QT学习笔记1
  • c语言笔记 存储期
  • 【实习经历Two:参与开源项目,学习并应用Git】
  • 解决qt中自定插件加载失败,不显示问题。
  • 报数游戏/补种未成活胡杨[E卷-hw_od]
  • HTML 新手入门:从零基础到搭建第一个静态页面(二)
  • 将温度预测的神经网络部署到服务器端,封装成api接口步骤
  • 高级java每日一道面试题-2025年3月04日-微服务篇[Eureka篇]-Eureka是什么?