linux安装zookeeper和kafka集群
linux安装zookeeper和kafka集群
- 一、Zookeeper集群部署
- 安装zookeeper
- 1. 下载
- 2. 上传, 解压
- 3. 配置 Zookeeper 节点
- 4. 创建 myid 文件
- 5. 启动参数更改
- 6. sh文件授权
- 7. 启动集群
- 8. 防火墙开启端口
- 验证集群
- 二、kafka集群安装
- 安装Kafka
- 1. 下载Kafka安装包
- 2. 上传到服务器,并解压到指定目录。
- 3. 创建目录用于存放数据
- 4. 修改配置文件
- 5. 启动参数更改
- 6. sh文件授权
- 7. 启动集群
- 8. 防火墙开启端口
- 9. 创建主题
- 10. 验证主题配置
- 11. 查看主题列表
- 12. 删除指定主题
- 安装kafka可视化管理工具(可选)
- 1. 上传,并解压。
- 2. 修改配置文件
- 3. 启动kafka-ui-lite
- 4.日志输出
- 5. 防火墙开启端口
- 测试
- 1. 进入kafka管理页面
- 2. 添加配置
- 3. 生产、消费消息
- 4. 模拟节点宕机
一、Zookeeper集群部署
安装zookeeper
1. 下载
- 检查jdk版本, Zookeeper 依赖于 Java8 以上环境
java -version
- 下载zk, 我这里下载的是3.6.4
https://archive.apache.org/dist/zookeeper/
2. 上传, 解压
tar -zxvf apache-zookeeper-3.6.4-bin.tar.gz -C /opt/app/apache-zookeeper-3.6.4
3. 配置 Zookeeper 节点
- 复制zoo_sample.cfg为zoo.cfg, 然后编辑
# tickTime 是一个核心配置参数,定义了 Zookeeper 的基本时间单位。
# 它用于控制和影响其他一些时间相关的配置,例如会话超时、Leader 与 Follower 之间的心跳检测等。
tickTime=2000
# initLimit 指定了 Follower 从服务器在启动时与 Leader 服务器进行数据同步的最大时间。
# 如果从服务器在规定的 initLimit 时间内未完成与 Leader 的同步,则 Zookeeper 会认为该从服务器无法正常加入集群。
# 如果 tickTime=2000 毫秒(2 秒),那么 initLimit=10 就表示允许的初始化同步时间为 10 * 2000 = 20000 毫秒(即 20 秒)。
initLimit=10
# syncLimit 指定了 Leader 和 Follower 之间心跳消息的超时时间。
# 如果一个 Follower 在规定的 syncLimit 时间内无法收到 Leader 的消息,
# Zookeeper 会认为该 Follower 已失效并从集群中剔除。
syncLimit=5
# 在 Zookeeper 中,快照是指当前节点数据的备份。
# Zookeeper 会定期将内存中的数据状态保存为一个快照文件,存储在指定的目录中,
# 以便在服务器重启或崩溃时可以从快照恢复数据。
dataDir=/opt/app/apache-zookeeper-3.6.4/data
# 客户端连接的地址
clientPort=8111
# 每个客户端(根据客户端的 IP 地址区分)对单个 Zookeeper 服务器的最大连接数。
maxClientCnxns=60
# 用于自动清理旧数据的配置参数。它控制 Zookeeper 定期清理旧的快照(snapshot)文件和事务日志的频率
# 默认情况下,autopurge.purgeInterval 为 0,表示不启用自动清理功能。在这里先用默认的。
autopurge.purgeInterval=0
# 集群节点信息
# 8112 端口:用于 Follower 和 Leader 之间的通信。
# 8113 端口:用于 Leader 选举。
server.1=192.168.0.170:8112:8113
server.2=192.168.0.171:8112:8113
server.3=192.168.0.149:8112:8113
# 监控工具
# https://prometheus.io Metrics Exporter
#metricsProvider.className=org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider
#metricsProvider.httpPort=7000
#metricsProvider.exportJvmInfo=true
4. 创建 myid 文件
- 在每台机器上的zookeeper数据目录下创建myid, 然后执行以下命令
echo "1" > /opt/app/apache-zookeeper-3.6.4/data/myid # 在 192.168.0.170 机器上, 和zoo.cfg中的server.1对应
echo "2" > /opt/app/apache-zookeeper-3.6.4/data/myid # 在 192.168.0.171 机器上, 和zoo.cfg中的server.2对应
echo "3" > /opt/app/apache-zookeeper-3.6.4/data/myid # 在 192.168.0.149 机器上, 和zoo.cfg中的server.3对应
5. 启动参数更改
- 进入bin目录, 更改zkEnv.sh中的启动参数
vim /opt/app/apache-zookeeper-3.6.4/bin/zkEnv.sh
- 更改SERVER_JVMFLAGS, 设置jvm初始化内存与最大堆内存
export SERVER_JVMFLAGS="-Xms4g -Xmx4g"
6. sh文件授权
- 进入bin目录
cd /opt/app/apache-zookeeper-3.6.4/bin
- 授予.sh执行权限
chmod 755 *.sh
7. 启动集群
- 在每台机器上启动zk
/opt/app/apache-zookeeper-3.6.4/bin/zkServer.sh start
- 然后检查每台机器zk状态
/opt/app/apache-zookeeper-3.6.4/bin/zkServer.sh status
- 停止命令如下
/opt/app/apache-zookeeper-3.6.4/bin/zkServer.sh stop
8. 防火墙开启端口
- 开启端口
firewall-cmd --add-port=8111/tcp --permanent
firewall-cmd --add-port=8112/tcp --permanent
firewall-cmd --add-port=8113/tcp --permanent
- 重启防火墙
firewall-cmd --reload
验证集群
- 使用 Zookeeper 客户端连接
/opt/app/apache-zookeeper-3.6.4/bin/zkCli.sh -server ip:port
二、kafka集群安装
安装Kafka
1. 下载Kafka安装包
下载地址:https://kafka.apache.org/downloads
如果只是使用的话,下载二进制文件就行,不用选择source,在这里我选择下载kafka_2.13-3.5.2.tgz,scala版本为2.13,kafka版本为3.5.2。
2. 上传到服务器,并解压到指定目录。
tar -zxvf kafka_2.13-3.5.2.tgz -C /opt/app/kafka_2.13-3.5.2
3. 创建目录用于存放数据
mkdir -p /opt/app/kafka_2.13-3.5.2/data
4. 修改配置文件
修改kafka的配置文件,修改如下:
vim /opt/app/kafka_2.13-3.5.2/config/server.properties
############################# Server Basics #############################
# 节点id, 集群中每个节点的唯一标识, 每台机器不能重复192.168.0.170为1,192.168.0.171为2,192.168.0.149为2
broker.id=1
############################# Socket Server Settings #############################
# 设置 Kafka 监听的地址和端口, 需要对应机器的ip, 端口自定义
listeners=PLAINTEXT://192.168.0.170:8114
# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3
# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
############################# Log Basics #############################
# A comma separated list of directories under which to store log files
log.dirs=/opt/app/kafka_2.13-3.5.2/data
# 分区数为 3,Kafka 会将主题数据分成 3 个分区,这可以提高并行消费的能力。
num.partitions=3
# 当 Kafka 启动或发生崩溃时,它会执行数据恢复操作,以确保日志文件的一致性并重建索引文件。指定了每个数据目录中用于恢复的线程数。
num.recovery.threads.per.data.dir=2
############################# Internal Topic Settings #############################
# 用于配置存储消费者偏移量(offsets)的内部主题 (__consumer_offsets 主题) 的副本数的参数。
offsets.topic.replication.factor=3
# Kafka 中的事务功能依赖于事务日志来记录所有的事务状态信息,这些信息用于确保事务的一致性和可靠性。
# 用于指定事务状态日志的副本数,也就是该日志在 Kafka 集群中的副本数。
transaction.state.log.replication.factor=3
# 如果 Kafka 集群有 3 个节点,建议将 transaction.state.log.min.isr 设置为 2。
# 这意味着事务状态日志需要至少 2 个副本是同步的才能继续写入事务。
# 这样可以保证即使一个副本不可用或出现故障,仍然有足够的副本来确保事务日志的一致性和容错性。
transaction.state.log.min.isr=2
############################# Log Flush Policy #############################
# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
# 1. Durability: Unflushed data may be lost if you are not using replication.
# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.
# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000
# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000
############################# Log Retention Policy #############################
# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.
# The minimum age of a log file to be eligible for deletion due to age
#log.retention.ms=10000
# 当我们同时设置了保留时间和保留大小时,kafka满足其中任意一个条件时,就会删除日志段。
# 日志保留时间 72h
log.retention.hours=72
# 日志保留大小20GB
log.retention.bytes=21474836480
# 每个日志段大小1G
log.segment.bytes=1073741824
# 每5分钟扫描一次
log.retention.check.interval.ms=300000
############################# Zookeeper #############################
# 设置zk集群
zookeeper.connect=192.168.0.170:8111,192.168.0.171:8111,192.168.0.149:8111
# zk连接超时时间
zookeeper.connection.timeout.ms=18000
############################# Group Coordinator Settings #############################
# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0
## 分区副本的最小同步数,通常配置为集群节点数减一
## 如果 replication.factor=3 且 min.insync.replicas=2,生产者在 acks=all 时,只有当至少 2 个副本完成了同步,写操作才会被确认。
min.insync.replicas=2
# 它定义了每个分区的数据在 Kafka 集群中有多少个副本,以提高数据的可靠性和容错性。
# 如果一个分区的某个副本所在的节点宕机,Kafka 可以从其他副本继续读取数据,确保数据可用性。
# 如果 default.replication.factor=3,那么每个新建主题的分区会默认有 3 个副本。
default.replication.factor=3
# 允许删除主题
delete.topic.enable=true
5. 启动参数更改
编辑启动文件
vim /opt/app/kafka_2.13-3.5.2/bin/kafka-server-start.sh
找到参数KAFKA_HEAP_OPTS设置的地方, 修改为如下
export KAFKA_HEAP_OPTS="-Xmx4G -Xms4G"
6. sh文件授权
- 进入bin目录
cd /opt/app/kafka_2.13-3.5.2/bin/
- 授予.sh执行权限
chmod 755 *.sh
7. 启动集群
- 在每台机器上启动kafka
/opt/app/kafka_2.13-3.5.2/bin/kafka-server-start.sh -daemon /opt/app/kafka_2.13-3.5.2/config/server.properties
- 查看启动状态
ps -ef|grep kafka.Kafka
- 停止命令如下
/opt/app/kafka_2.13-3.5.2/bin/kafka-server-stop.sh
8. 防火墙开启端口
firewall-cmd --add-port=8114/tcp --permanent;
firewall-cmd --reload
9. 创建主题
因为我们已经在配置文件中指定num.partitions=3和default.replication.factor=3,所以就不用在创建主题的时候指定分区数和副本数了
/opt/app/kafka_2.13-3.5.2/bin/kafka-topics.sh --create --topic my-topic --bootstrap-server 192.168.0.170:8114
10. 验证主题配置
创建成功后,可以使用 --describe 命令查看主题的配置信息:
/opt/app/kafka_2.13-3.5.2/bin/kafka-topics.sh --describe --topic my-topic --bootstrap-server 192.168.0.170:8114
主要检查分区数量(PartitionCount)和副本数量(ReplicationFactor)
Topic: my-topic TopicId: 6941T0WBTpS9UaXJT-ytYw PartitionCount: 3 ReplicationFactor: 3 Configs: min.insync.replicas=2,segment.bytes=1073741824,retention.bytes=21474836480
Topic: my-topic Partition: 0 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
Topic: my-topic Partition: 1 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: my-topic Partition: 2 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
11. 查看主题列表
/opt/app/kafka_2.13-3.5.2/bin/kafka-topics.sh --bootstrap-server 192.168.0.170:8114 --list
12. 删除指定主题
/opt/app/kafka_2.13-3.5.2/bin/kafka-topics.sh --bootstrap-server 192.168.0.170:8114 --delete --topic topic-name
安装kafka可视化管理工具(可选)
在这里我选择kafkaUI-lite,下载下方的二进制安装包就行
https://gitee.com/freakchicken/kafka-ui-lite/releases/tag/v1.2.11
1. 上传,并解压。
tar -zxvf kafka-ui-lite-1.2.11.tar.gz -C /opt/app/kafka-ui-lite-1.2.11
2. 修改配置文件
如果想修改元数据库为mysql, 修改conf/application.properties中的以下配置
在这里我们直接使用linux自带的sqlite数据库
server.port=19092
spring.datasource.driver-class-name=org.sqlite.JDBC
spring.datasource.url=jdbc:sqlite::resource:data.db
spring.datasource.username=
spring.datasource.password=
查看系统有无sqlite
rpm -qa | grep sqlite
有sqlite就会有如下输出
sqlite-3.7.17-8.el7_7.1.x86_64
如果使用mysql数据库就需要执行sql目录下的sql脚本
[root@localhost sql]# pwd
/opt/app/kafka-ui-lite-1.2.11/sql
[root@localhost sql]# ll
总用量 8
-rw-r--r--. 1 root root 1540 4月 13 2021 ddl_mysql.sql
-rw-r--r--. 1 root root 1077 4月 13 2021 ddl_sqlite.sql
3. 启动kafka-ui-lite
进入到安装目录下,执行如下命令
# 前台启动
sh bin/kafkaUI.sh start
# 后台启动
sh bin/kafkaUI.sh -d start
# 关闭后台启动的进程
sh bin/kafkaUI.sh stop
4.日志输出
/opt/app/kafka-ui-lite-1.2.11/logs
5. 防火墙开启端口
# 开启kafka-ui-lite可视化管理工具端口
firewall-cmd --zone=public --add-port=19092/tcp --permanent
# 重新加载防火墙
firewall-cmd --reload
测试
1. 进入kafka管理页面
访问地址:http://192.168.1.100:19092
尽量不要使用此客户端创建主题, 我试了一下, 创建的主题无法正常生产消费消息
2. 添加配置
3. 生产、消费消息
4. 模拟节点宕机
可以随机停止一个zk节点, kafka节点, 测试一下集群是否能够正常生产消费