当前位置: 首页 > article >正文

linux安装zookeeper和kafka集群

linux安装zookeeper和kafka集群

  • 一、Zookeeper集群部署
    • 安装zookeeper
      • 1. 下载
      • 2. 上传, 解压
      • 3. 配置 Zookeeper 节点
      • 4. 创建 myid 文件
      • 5. 启动参数更改
      • 6. sh文件授权
      • 7. 启动集群
      • 8. 防火墙开启端口
    • 验证集群
  • 二、kafka集群安装
    • 安装Kafka
      • 1. 下载Kafka安装包
      • 2. 上传到服务器,并解压到指定目录。
      • 3. 创建目录用于存放数据
      • 4. 修改配置文件
      • 5. 启动参数更改
      • 6. sh文件授权
      • 7. 启动集群
      • 8. 防火墙开启端口
      • 9. 创建主题
      • 10. 验证主题配置
      • 11. 查看主题列表
      • 12. 删除指定主题
    • 安装kafka可视化管理工具(可选)
      • 1. 上传,并解压。
      • 2. 修改配置文件
      • 3. 启动kafka-ui-lite
      • 4.日志输出
      • 5. 防火墙开启端口
    • 测试
      • 1. 进入kafka管理页面
      • 2. 添加配置
      • 3. 生产、消费消息
      • 4. 模拟节点宕机

一、Zookeeper集群部署

安装zookeeper

1. 下载

  • 检查jdk版本, Zookeeper 依赖于 Java8 以上环境
java -version
  • 下载zk, 我这里下载的是3.6.4
https://archive.apache.org/dist/zookeeper/

2. 上传, 解压

tar -zxvf  apache-zookeeper-3.6.4-bin.tar.gz  -C /opt/app/apache-zookeeper-3.6.4

3. 配置 Zookeeper 节点

  • 复制zoo_sample.cfg为zoo.cfg, 然后编辑
# tickTime 是一个核心配置参数,定义了 Zookeeper 的基本时间单位。
# 它用于控制和影响其他一些时间相关的配置,例如会话超时、Leader 与 Follower 之间的心跳检测等。
tickTime=2000

# initLimit 指定了 Follower 从服务器在启动时与 Leader 服务器进行数据同步的最大时间。
# 如果从服务器在规定的 initLimit 时间内未完成与 Leader 的同步,则 Zookeeper 会认为该从服务器无法正常加入集群。
# 如果 tickTime=2000 毫秒(2 秒),那么 initLimit=10 就表示允许的初始化同步时间为 10 * 2000 = 20000 毫秒(即 20 秒)。
initLimit=10

# syncLimit 指定了 Leader 和 Follower 之间心跳消息的超时时间。
# 如果一个 Follower 在规定的 syncLimit 时间内无法收到 Leader 的消息,
# Zookeeper 会认为该 Follower 已失效并从集群中剔除。
syncLimit=5

# 在 Zookeeper 中,快照是指当前节点数据的备份。
# Zookeeper 会定期将内存中的数据状态保存为一个快照文件,存储在指定的目录中,
# 以便在服务器重启或崩溃时可以从快照恢复数据。
dataDir=/opt/app/apache-zookeeper-3.6.4/data

# 客户端连接的地址
clientPort=8111

# 每个客户端(根据客户端的 IP 地址区分)对单个 Zookeeper 服务器的最大连接数。
maxClientCnxns=60


# 用于自动清理旧数据的配置参数。它控制 Zookeeper 定期清理旧的快照(snapshot)文件和事务日志的频率
# 默认情况下,autopurge.purgeInterval 为 0,表示不启用自动清理功能。在这里先用默认的。
autopurge.purgeInterval=0

# 集群节点信息
# 8112 端口:用于 Follower 和 Leader 之间的通信。
# 8113 端口:用于 Leader 选举。
server.1=192.168.0.170:8112:8113
server.2=192.168.0.171:8112:8113
server.3=192.168.0.149:8112:8113

# 监控工具
# https://prometheus.io Metrics Exporter
#metricsProvider.className=org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider
#metricsProvider.httpPort=7000
#metricsProvider.exportJvmInfo=true

4. 创建 myid 文件

  • 在每台机器上的zookeeper数据目录下创建myid, 然后执行以下命令
echo "1" > /opt/app/apache-zookeeper-3.6.4/data/myid  # 在 192.168.0.170 机器上, 和zoo.cfg中的server.1对应
echo "2" > /opt/app/apache-zookeeper-3.6.4/data/myid  # 在 192.168.0.171 机器上, 和zoo.cfg中的server.2对应
echo "3" > /opt/app/apache-zookeeper-3.6.4/data/myid  # 在 192.168.0.149 机器上, 和zoo.cfg中的server.3对应

5. 启动参数更改

  • 进入bin目录, 更改zkEnv.sh中的启动参数
vim /opt/app/apache-zookeeper-3.6.4/bin/zkEnv.sh
  • 更改SERVER_JVMFLAGS, 设置jvm初始化内存与最大堆内存
export SERVER_JVMFLAGS="-Xms4g -Xmx4g"

6. sh文件授权

  • 进入bin目录
cd /opt/app/apache-zookeeper-3.6.4/bin
  • 授予.sh执行权限
chmod 755 *.sh

7. 启动集群

  • 在每台机器上启动zk
/opt/app/apache-zookeeper-3.6.4/bin/zkServer.sh start
  • 然后检查每台机器zk状态
/opt/app/apache-zookeeper-3.6.4/bin/zkServer.sh status
  • 停止命令如下
/opt/app/apache-zookeeper-3.6.4/bin/zkServer.sh stop

8. 防火墙开启端口

  • 开启端口
firewall-cmd --add-port=8111/tcp --permanent
firewall-cmd --add-port=8112/tcp --permanent
firewall-cmd --add-port=8113/tcp --permanent
  • 重启防火墙
firewall-cmd --reload

验证集群

  • 使用 Zookeeper 客户端连接
/opt/app/apache-zookeeper-3.6.4/bin/zkCli.sh -server ip:port

二、kafka集群安装

安装Kafka

1. 下载Kafka安装包

下载地址:https://kafka.apache.org/downloads

如果只是使用的话,下载二进制文件就行,不用选择source,在这里我选择下载kafka_2.13-3.5.2.tgz,scala版本为2.13,kafka版本为3.5.2。

2. 上传到服务器,并解压到指定目录。

tar -zxvf kafka_2.13-3.5.2.tgz -C /opt/app/kafka_2.13-3.5.2

3. 创建目录用于存放数据

mkdir -p /opt/app/kafka_2.13-3.5.2/data

4. 修改配置文件

修改kafka的配置文件,修改如下:

vim /opt/app/kafka_2.13-3.5.2/config/server.properties

############################# Server Basics #############################

# 节点id, 集群中每个节点的唯一标识, 每台机器不能重复192.168.0.170为1,192.168.0.171为2,192.168.0.149为2
broker.id=1

############################# Socket Server Settings #############################
# 设置 Kafka 监听的地址和端口, 需要对应机器的ip, 端口自定义
listeners=PLAINTEXT://192.168.0.170:8114


# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600


############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/opt/app/kafka_2.13-3.5.2/data

# 分区数为 3,Kafka 会将主题数据分成 3 个分区,这可以提高并行消费的能力。
num.partitions=3

# 当 Kafka 启动或发生崩溃时,它会执行数据恢复操作,以确保日志文件的一致性并重建索引文件。指定了每个数据目录中用于恢复的线程数。
num.recovery.threads.per.data.dir=2

############################# Internal Topic Settings  #############################
# 用于配置存储消费者偏移量(offsets)的内部主题 (__consumer_offsets 主题) 的副本数的参数。
offsets.topic.replication.factor=3
# Kafka 中的事务功能依赖于事务日志来记录所有的事务状态信息,这些信息用于确保事务的一致性和可靠性。
# 用于指定事务状态日志的副本数,也就是该日志在 Kafka 集群中的副本数。
transaction.state.log.replication.factor=3
# 如果 Kafka 集群有 3 个节点,建议将 transaction.state.log.min.isr 设置为 2。
# 这意味着事务状态日志需要至少 2 个副本是同步的才能继续写入事务。
# 这样可以保证即使一个副本不可用或出现故障,仍然有足够的副本来确保事务日志的一致性和容错性。
transaction.state.log.min.isr=2

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion due to age

#log.retention.ms=10000 

# 当我们同时设置了保留时间和保留大小时,kafka满足其中任意一个条件时,就会删除日志段。
# 日志保留时间 72h
log.retention.hours=72
# 日志保留大小20GB
log.retention.bytes=21474836480
# 每个日志段大小1G
log.segment.bytes=1073741824
# 每5分钟扫描一次
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# 设置zk集群
zookeeper.connect=192.168.0.170:8111,192.168.0.171:8111,192.168.0.149:8111

# zk连接超时时间
zookeeper.connection.timeout.ms=18000


############################# Group Coordinator Settings #############################

# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0


## 分区副本的最小同步数,通常配置为集群节点数减一
## 如果 replication.factor=3 且 min.insync.replicas=2,生产者在 acks=all 时,只有当至少 2 个副本完成了同步,写操作才会被确认。
min.insync.replicas=2
# 它定义了每个分区的数据在 Kafka 集群中有多少个副本,以提高数据的可靠性和容错性。
# 如果一个分区的某个副本所在的节点宕机,Kafka 可以从其他副本继续读取数据,确保数据可用性。
# 如果 default.replication.factor=3,那么每个新建主题的分区会默认有 3 个副本。
default.replication.factor=3

# 允许删除主题
delete.topic.enable=true

5. 启动参数更改

编辑启动文件

vim /opt/app/kafka_2.13-3.5.2/bin/kafka-server-start.sh

找到参数KAFKA_HEAP_OPTS设置的地方, 修改为如下

export KAFKA_HEAP_OPTS="-Xmx4G -Xms4G"

6. sh文件授权

  • 进入bin目录
cd /opt/app/kafka_2.13-3.5.2/bin/
  • 授予.sh执行权限
chmod 755 *.sh

7. 启动集群

  • 在每台机器上启动kafka
/opt/app/kafka_2.13-3.5.2/bin/kafka-server-start.sh -daemon /opt/app/kafka_2.13-3.5.2/config/server.properties
  • 查看启动状态
ps -ef|grep kafka.Kafka
  • 停止命令如下
/opt/app/kafka_2.13-3.5.2/bin/kafka-server-stop.sh

8. 防火墙开启端口

firewall-cmd --add-port=8114/tcp --permanent;
firewall-cmd --reload

9. 创建主题

因为我们已经在配置文件中指定num.partitions=3和default.replication.factor=3,所以就不用在创建主题的时候指定分区数和副本数了

/opt/app/kafka_2.13-3.5.2/bin/kafka-topics.sh --create --topic my-topic --bootstrap-server 192.168.0.170:8114

10. 验证主题配置

创建成功后,可以使用 --describe 命令查看主题的配置信息:

/opt/app/kafka_2.13-3.5.2/bin/kafka-topics.sh --describe --topic my-topic --bootstrap-server 192.168.0.170:8114

主要检查分区数量(PartitionCount)和副本数量(ReplicationFactor)

Topic: my-topic TopicId: 6941T0WBTpS9UaXJT-ytYw PartitionCount: 3       ReplicationFactor: 3    Configs: min.insync.replicas=2,segment.bytes=1073741824,retention.bytes=21474836480
        Topic: my-topic Partition: 0    Leader: 3       Replicas: 3,1,2 Isr: 3,1,2
        Topic: my-topic Partition: 1    Leader: 1       Replicas: 1,2,3 Isr: 1,2,3
        Topic: my-topic Partition: 2    Leader: 2       Replicas: 2,3,1 Isr: 2,3,1

11. 查看主题列表

/opt/app/kafka_2.13-3.5.2/bin/kafka-topics.sh --bootstrap-server 192.168.0.170:8114 --list

12. 删除指定主题

/opt/app/kafka_2.13-3.5.2/bin/kafka-topics.sh --bootstrap-server 192.168.0.170:8114 --delete --topic topic-name

安装kafka可视化管理工具(可选)

在这里我选择kafkaUI-lite,下载下方的二进制安装包就行

https://gitee.com/freakchicken/kafka-ui-lite/releases/tag/v1.2.11

1. 上传,并解压。

tar -zxvf kafka-ui-lite-1.2.11.tar.gz -C /opt/app/kafka-ui-lite-1.2.11

2. 修改配置文件

如果想修改元数据库为mysql, 修改conf/application.properties中的以下配置
在这里我们直接使用linux自带的sqlite数据库

server.port=19092
spring.datasource.driver-class-name=org.sqlite.JDBC
spring.datasource.url=jdbc:sqlite::resource:data.db 
spring.datasource.username=
spring.datasource.password=

查看系统有无sqlite

rpm -qa | grep sqlite

有sqlite就会有如下输出

sqlite-3.7.17-8.el7_7.1.x86_64

如果使用mysql数据库就需要执行sql目录下的sql脚本

[root@localhost sql]# pwd
/opt/app/kafka-ui-lite-1.2.11/sql
[root@localhost sql]# ll
总用量 8
-rw-r--r--. 1 root root 1540 413 2021 ddl_mysql.sql
-rw-r--r--. 1 root root 1077 413 2021 ddl_sqlite.sql

3. 启动kafka-ui-lite

进入到安装目录下,执行如下命令

# 前台启动
sh bin/kafkaUI.sh start
# 后台启动
sh bin/kafkaUI.sh -d start
# 关闭后台启动的进程
sh bin/kafkaUI.sh stop

4.日志输出

/opt/app/kafka-ui-lite-1.2.11/logs

5. 防火墙开启端口

# 开启kafka-ui-lite可视化管理工具端口
firewall-cmd --zone=public --add-port=19092/tcp --permanent
# 重新加载防火墙
firewall-cmd --reload

测试

1. 进入kafka管理页面

访问地址:http://192.168.1.100:19092

尽量不要使用此客户端创建主题, 我试了一下, 创建的主题无法正常生产消费消息

2. 添加配置

[外链图片转存中...(img-fOjDvhqp-1731318632832)]

3. 生产、消费消息

[外链图片转存中...(img-jb4lbhZl-1731318632833)]

4. 模拟节点宕机

可以随机停止一个zk节点, kafka节点, 测试一下集群是否能够正常生产消费


http://www.kler.cn/a/391077.html

相关文章:

  • 【马来西亚理工大学主办,ACM出版】2025年大数据、通信技术与计算机应用国际学术会议(BDCTA 2025)
  • React Native 项目 Error: EMFILE: too many open files, watch
  • DeepSeek-V3与GPT-4o的对比详解
  • arcgisPro加载CGCS2000天地图后,如何转成米单位
  • 全新免押租赁系统打造便捷安全的租赁体验
  • ADO.NET知识总结4---SqlParameter参数
  • C++学习笔记----11、模块、头文件及各种主题(一)---- 模板概览与类模板(7)
  • 设计模式之单列模式(7种单例模式案例,Effective Java 作者推荐枚举单例模式)
  • 城镇住房保障:SpringBoot系统架构解析
  • 科技前沿:汽车智能玻璃,开启透明显示新纪元
  • 【二叉树】——
  • 人保财险(外包)面试分享
  • UI资源分包 -- 基于Xasset框架代码实例
  • Ubuntu中以root身份运行Qt创建的项目
  • UML概述、类图关系及连接线表示
  • 【MQTT】代理服务比较RabbitMQ、Mosquitto 和 EMQX
  • MySQ怎么使用语法介绍(详细)
  • 工业主板在汽车制造中的应用
  • php 如何将数组转成对象数组
  • 人工智能(10)——————自然语言处理
  • 网络安全管理与运维服务_网络安全运维方案
  • HCIP—快速生成树协议(RSTP)实验配置
  • 剪辑视频和制作视频的软件哪个好
  • A018基于Spring Boot的民宿租赁系统
  • 2024年华为OD机试真题-关联子串-Java-OD统一考试(E卷)
  • 使用 PageHelper 在 Spring Boot 项目中实现分页查询