当前位置: 首页 > article >正文

【Linux】环境部署kafka集群

目录

一、kafka简介

1. 主要特点

2.组件介绍

3.消息中间件的对比

二、环境准备

1.Java环境

2.Zookeeper环境

3.硬件环境集群

三、Zookeeper的集群部署

1.下载zookeeper

2.部署zookeeper集群

(1)node1节点服务器

(2)node2节点服务器和node3 节点服务器重复上述node1节点安装zookeeper步骤

(3)启动node1、node2、node3节点的zookeeper

​编辑 四、搭建kafka集群

1.下载kafka安装包

2.部署kafka集群

(1)node1节点部署

(2)node2节点服务器、node3 节点服务器重复上述node1节点安装kafka步骤

(3)启动kafka集群

(4)测试集群


一、kafka简介

Apache Kafka 是一个开源的分布式事件流平台,最初由LinkedIn开发,并于2011年贡献给了Apache软件基金会。Kafka被设计为一个高吞吐量、可扩展且容错的消息系统,它能够处理大量的实时数据流。Kafka广泛应用于构建实时数据管道和流应用。

1. 主要特点

(1)发布/订阅消息模型:Kafka支持传统的发布/订阅模式,允许多个消费者订阅并处理相同的消息流。
(2)持久化存储:消息在Kafka中被持久化到磁盘,并且可以根据配置保留一段时间(例如7天)。这使得Kafka可以作为可靠的存储系统来使用。
(3)高吞吐量:Kafka设计用于支持每秒百万级别的消息吞吐量,适用于大规模的数据处理场景。
(4)水平扩展性:可以通过增加更多的broker节点来轻松扩展Kafka集群,以处理更大的流量。
(5)多租户能力:支持多个生产者和消费者共享同一集群,同时保持隔离性和安全性。
(6)零拷贝原理:使用操作系统级别的零拷贝技术来优化网络传输效率,减少CPU使用率。
(7)分区机制:每个topic可以分成多个partition,这些partition可以分布在不同的broker上,从而实现负载均衡和故障转移。

(8)消费组:一组消费者可以组成一个消费者组,共同消费一个topic下的不同partition,确保每个partition只被组内的一个消费者消费。

2.组件介绍

        Broker:Kafka集群中的单个服务器,负责存储消息和处理客户端请求。

        Topic:特定类型的消息流,是消息的分类或主题。        

        Partition:每个topic可以分为多个partition,提高并行处理能力和吞吐量。

        Producer:向Kafka发送消息的应用程序。

        Consumer:从Kafka读取消息的应用程序。

        Consumer Group:一组消费者的集合,它们共同消费一个topic的所有消息。

        Offset:每个partition中的消息都有一个唯一的序列号,称为offset。

        Zookeeper:虽然不是Kafka的核心组件,但早期版本的Kafka依赖Zookeeper来管理集群元数据。新版本中已经内置了替代方案,减少了对Zookeeper的依赖。

3.消息中间件的对比

二、环境准备

1.Java环境

需要Java 8或更高版本(JDK)。Kafka是用Scala编写的,并且运行在JVM之上。

安装步骤参考:Linux环境下离线安装jdk1.8(内置最新的jdk安装包x64)_linux 离线安装jdk1.8-CSDN博客

2.Zookeeper环境

Kafka使用Zookeeper来管理集群元数据。

3.硬件环境集群

采用三台服务器部署kafka和zookeeper集群。

node1node2node3
ZookeeperZookeeperZookeeper
KafkaKafkaKafka

三、Zookeeper的集群部署

1.下载zookeeper

地址:Apache ZooKeeper

2.部署zookeeper集群

(1)node1节点服务器

下载zookeeper包之后上传服务器

# 1.解压 zookeeper
tar -zxvf apache-zookeeper-3.8.4-bin.tar.gz

# 2.移动到/usr/local/目录下方便管理
mv apache-zookeeper-3.8.4-bin /usr/local/zookeeper-3.8.4

# 3.切换到/usr/local/zookeeper-3.8.4/conf 配置目录下
cp zoo_sample.cfg zoo.cfg

# 4.编辑配置文件
vim zoo.cfg

zoo.cfg文件新增如下内容: 数据目录和日志目录需要提前创建

说明:server.A=B:C:D:其中 A 是一个数字,表示这个是第几号服务器;B 是这个服务器的 ip 地址;C 表示的是这个服务器与集群中的 Leader 服务器交换信息的端口;D 表示的是万一集群中的 Leader 服务器挂了,需要一个端口来重新进行选举,选出一个新的 Leader,而这个端口就是用来执行选举时服务器相互通信的端口。

节点标记

myid配置:在/data/zookeeper/data设置myid, 这个myid的数字跟配置文件里面的server id对应

echo 1 > /data/zookeeper/data/myid

配置环境变量

# 打开环境变量的文件
vim /etc/profile

# 文件末尾添加如下内容 
export ZOOKEEPER_HOME=/usr/local/zookeeper-3.8.4
export PATH=$ZOOKEEPER_HOME/bin:$PATH

# 保存并退出 刷新环境变量
source /etc/profile

到此node1节点服务器zookeeper部署完成

(2)node2节点服务器和node3 节点服务器重复上述node1节点安装zookeeper步骤

注意:记得修改节点标记 myid 改为 2 和 3,若没改启动时会报错。

(3)启动node1、node2、node3节点的zookeeper
# 启动三台机器zookeeper
zkServer.sh start
 
# 查看集群状态
zkServer.sh status

# 停止命令
zkServer.sh stop

从下面的截图就能看出来 node1节点为主节点,其他两个是从节点。

 四、搭建kafka集群

1.下载kafka安装包

地址:Apache Kafka

2.部署kafka集群

(1)node1节点部署
# 1.将下载好的kafka包上传服务器
# 2.解压
tar -zxvf kafka_2.12-3.8.0.tgz

# 3.将解压后的目录移动到 /usr/local/下
mv kafka_2.12-3.8.0 /usr/local/kafka_2.12-3.8.0

# 4.切换到目录下 编辑kafka配置文件
cd /usr/local/kafka_2.12-3.8.0/config
vim server.properties

 主要修改如下所示几个配置信息

#--------------------------------配置文件-------------------------------------------
# broker的全局唯一编号,不能重复,多个节点需要修改 0、1、2
broker.id=0
# 监听
listeners=PLAINTEXT://:9092  #开启此项
# 日志目录
log.dirs=/data/kafka/kafka_logs      #修改日志目录(需提前创建)
# 配置zookeeper的连接(如果不是本机,需要该为ip或主机名)多个地址 用,隔开
zookeeper.connect=cong11:2181,cong12:2181,cong13:2181
#--------------------------------配置文件-------------------------------------------

 配置环境变量

# 打开环境变量的文件
vim /etc/profile

# 文件末尾添加如下内容 
export KAFKA_HOME=/usr/local/kafka_2.12-3.8.0
export PATH=$KAFKA_HOME/bin:$PATH

# 保存并退出 刷新环境变量
source /etc/profile
(2)node2节点服务器、node3 节点服务器重复上述node1节点安装kafka步骤

或者 你直接将节点1的kafka目录分别复制到节点2和节点3上,修改内容如下

node2节点:

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1
#broker对外暴露的IP和端口 (每个节点单独配置)
advertised.listeners=PLAINTEXT://ip:9092

node3节点

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=2
#broker对外暴露的IP和端口 (每个节点单独配置)
advertised.listeners=PLAINTEXT://ip:9092

在添加环境变量 刷新环境变量。

(3)启动kafka集群
# 后台启动服务(3台都启动)
kafka-server-start.sh -daemon /usr/local/kafka_2.12-3.8.0/config/server.properties 
 

可以通过 jps 或者 ps -ef|grep kafka 查看进程

(4)测试集群

1)创建一个测试主题(node1)

kafka-topics.sh --create --topic test --partitions 3 --replication-factor 3 --bootstrap-server ip1:9092
  • --replication-factor:指定副本数量
  • --partitions:指定分区数量
  • --topic:主题名称

注意:在node1节点上创建之后,会同步到node2、node3节点主题信息。说明你集群搭建成功。

2)查看刚刚创建的主题信息

kafka-topics.sh --describe --topic test --bootstrap-server IP1:9092

说明

  • Partition: 0:分区编号为0。
  • Leader: 0:当前分区的leader broker是broker 0。
  • Replicas: 0,2,1:该分区的所有副本分布在broker 0、broker 2和broker 1上。
  • Isr: 0,2,1:与leader保持同步的副本列表(In-Sync Replicas),即broker 0、broker 2和broker 1。
  • Elr: N/A:选举中的leader(Elected Leader Replica),这里没有显示。
  • LastKnownElr: N/A:上次已知的选举中的leader,这里也没有显示。

 3)查看所有的topic命令和查看指定的topic命令

# 列出指定的topic
kafka-topics.sh --bootstrap-server ip:9092 --list --topic test
 
# 列出所有的topic
kafka-topics.sh --bootstrap-server ip:9092 --list 

4)在node1上创建生产者(producer)

kafka-console-producer.sh --broker-list ip:9092 --topic test

随便写一些测试消息

5) 在node2上测试消费消息

kafka-console-consumer.sh --bootstrap-server IP1:9092,IP2:9092,IP3:9092 --topic test --from-beginning
  • --bootstrap-server IP1:9092,IP2:9092,IP3:9092

    指定Kafka集群的多个broker地址。这里列出了三个broker:JClouds:9092, Book:9092, 和 Client01:9092。这些broker地址用于初始化连接。Kafka客户端会使用这些地址来发现整个集群的其他broker。
  • --from-beginning

    从主题的最早可用偏移量(offset)开始消费消息。如果没有这个选项,消费者将从最新的偏移量开始消费。

6)删除topic

kafka-topics.sh --delete --bootstrap-server IP1:9092 --topic test

到此kafka验证完毕,有兴趣可试试kafka其他相关命令

参考:Kafka 集群部署_kafka集群安装部署-CSDN博客


http://www.kler.cn/a/319356.html

相关文章:

  • 论文解析:边缘计算网络中资源共享的分布式协议(2区)
  • 图片画廊 day2 (可复制源码)
  • 什么是数字图像?
  • MySQL Workbench导入数据比mysql命令行慢
  • STM32嵌入式闹钟系统设计与实现
  • 【AI大模型】ELMo模型介绍:深度理解语言模型的嵌入艺术
  • 工业一体机在汽车零部件工厂ESOP系统中的关键作用
  • 【记录】在返回值类型为BigDecimal情况下末尾小数位为0的会省略不显示
  • linux如何启用ipv6随机地址
  • QT 中的信号与槽机制详解
  • 2:java的介绍与基础2:Scanner
  • 【BurpSuite】Cross-site scripting (XSS 学徒部分:1-9)
  • JVM面试问题集
  • 航班延误背后隐秘原因--网络安全
  • 41. 如何在MyBatis-Plus中实现批量操作?批量插入和更新的最佳实践是什么?
  • MELON的难题- 华为OD统一考试(E卷)
  • 【Unity服务】如何使用Unity Version Control
  • transformer模型写诗词
  • 【大模型教程】如何在Spring Boot中无缝集成LangChain4j,玩转AI大模型!
  • day-60 字符串中最多数目的子序列
  • 整合SpringSecurity框架经典报错
  • 大数据实验一: Linux系统安装和使用
  • 看Threejs好玩示例,学习创新与技术(GridDistortionEffect)
  • [Redis][List]详细讲解
  • 秋分之际,又搭建了一款微信记账本小程序
  • 大模型智能体在金融公告理解领域的应用 | OPENAIGC开发者大赛高校组AI创新之星奖