当前位置: 首页 > article >正文

kafka介绍,kafka集群环境搭建,kafka命令测试,C++实现kafka客户端

目录

  • kafka介绍
  • kafka集群环境搭建
    • zookeeper安装与配置
    • kafka安装与配置
  • kafka命令测试
  • C++实现kafka客户端
    • librdkafka库编译
    • 新版本cmake编译
    • cppkafka库编译
    • C++实现kafka生产者和消费者客户端

kafka介绍

定义与概述
Apache Kafka 是一个开源的分布式流处理平台,最初由 LinkedIn 开发,后来贡献给了 Apache 软件基金会。它被设计用于处理实时数据流,能够以高吞吐量、低延迟的方式处理大量的消息。Kafka 本质上是一个消息队列,但它在传统消息队列的基础上进行了扩展,更适合处理大规模的实时数据。
Kafka特点是生产者消费者模式,先进先出(FIFO)保证顺序,自己不丢数据,默认每隔7天清理数据。
核心概念

  • 主题(Topic):是 Kafka 中消息的逻辑分类,类似于数据库中的表或者文件系统中的文件夹。生产者将消息发布到特定的主题,消费者从主题中订阅并消费消息。
  • 生产者(Producer):负责将消息发送到 Kafka 的主题中。生产者可以根据需要将消息发送到不同的主题,并且可以控制消息的分区。
  • 消费者(Consumer):从 Kafka 的主题中读取消息。消费者通常以消费者组(Consumer Group)的形式存在,同一个消费者组中的消费者可以共同消费一个主题中的消息,从而实现负载均衡。
  • 分区(Partition):每个主题可以被划分为多个分区,分区是 Kafka 实现分布式的基础。分区中的消息是有序的,但不同分区之间的消息顺序是不确定的。通过分区,Kafka 可以实现消息的并行处理,提高系统的吞吐量。
  • 代理(Broker):Kafka 集群由多个代理组成,每个代理是一个独立的 Kafka 服务器。代理负责存储和管理消息,客户端(生产者和消费者)通过与代理进行通信来发送和接收消息。
    应用场景
  • 日志收集:Kafka 可以作为日志收集系统的核心组件,将各个服务产生的日志收集到 Kafka 中,然后由其他系统进行处理和分析。
  • 消息系统:作为传统的消息队列使用,实现系统之间的解耦和异步通信。
  • 流式处理:结合 Kafka Streams 等流处理库,对实时数据流进行处理和分析,例如实时计算、实时监控等。
    在这里插入图片描述

Kafka 与 ZooKeeper 的关系
在早期版本的 Kafka 中,ZooKeeper 是 Kafka 集群的重要组成部分,Kafka 使用 ZooKeeper 来存储集群的元数据,如主题、分区、代理的信息,以及进行领导者选举等操作。但从 Kafka 2.8 版本开始,Kafka 引入了 KRaft 模式,该模式允许 Kafka 集群在不依赖 ZooKeeper 的情况下运行,不过目前很多生产环境依然在使用 Kafka 与 ZooKeeper 结合的部署方式。
在这里插入图片描述

kafka集群环境搭建

使用三个centos7虚拟机模拟集群,三个虚拟机修改主机名:

hostnamectl set-hostname node0
hostnamectl set-hostname node1
hostnamectl set-hostname node2

配置hosts: vim /etc/hosts

192.168.3.147 node0
192.168.3.146 node1
192.168.3.145 node2

zookeeper安装与配置

官网下载地址:https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/
下载二进制文件包,这里下载:apache-zookeeper-3.9.3-bin.tar.gz

1、把压缩包拷贝到一个节点主机,比如node0主机,执行:

mkdir -p /opt/workspace
#压缩包拷贝到该目录
cd /opt/workspace
tar xvf apache-zookeeper-3.9.3-bin.tar.gz
mv apache-zookeeper-3.9.3-bin zookeeper

2、创建和修改配置文件:

cd /opt/workspace/zookeeper
# zookeeper数据存放路径
mkdir -p data/zkdata
# zookeeper日志存放路径
mkdir -p data/zklog
cp conf/zoo_sample.cfg conf/zoo.cfg

vim conf/zoo.cfg

3、修改如下字段,注意字段后面不能带空格

dataDir=/opt/workspace/zookeeper/data/zkdata
dataLogDir=/opt/workspace/zookeeper/data/zklog

server.1=node0:2888:3888
server.2=node1:2888:3888
server.3=node2:2888:3888

4、创建myid文件

cd /opt/workspace/zookeeper/data/zkdata/
vim myid
# 输入内容1,保存。

5、其他节点node1和node2创建工作目录

mkdir -p /opt/workspace

6、从node0节点把zookeeper文件夹拷贝到node1和node2

scp -r zookeeper/ root@node1:/opt/workspace/
scp -r zookeeper/ root@node2:/opt/workspace/

在node1和node2把myid文件内容分别修改为2和3.

7、三个节点配置zookeeper环境变量

vim /etc/profile
# 文件最下面添加如下内容
export ZOOKEEPER_HOME=/opt/workspace/zookeeper
export PATH=$ZOOKEEPER_HOME/bin:$PATH

使环境变量生效:

source /etc/profile

8、启动zookeeper集群,分别在三个节点执行,start后等待一会再执行status查询状态

zkServer.sh start
zkServer.sh status

主节点
在这里插入图片描述
从节点
在这里插入图片描述

kafka安装与配置

下载地址:https://kafka.apache.org/downloads
下载安装包:kafka_2.12-3.9.0.tgz

1、部署到node0主机

cd /opt/workspace/
# 安装包拷贝到当前目录
tar xvf kafka_2.12-3.9.0.tgz
mv kafka_2.12-3.9.0 kafka

2、修改kafka配置

cd kafka/
mkdir logs
vim config/server.properties

重点修改下面3个参数

broker.id=1
log.dirs=/opt/workspace/kafka/logs
zookeeper.connect=node0:2181,node1:2181,node2:2181

3、拷贝到node1和node2主机,分别修改server.properties对应的broker.id为2、3

scp -r kafka root@node1:/opt/workspace
scp -r kafka root@node2:/opt/workspace

4、三个节点配置kafka环境变量

vim /etc/profile
# 文件最下面添加如下内容
export KAFKA_HOME=/opt/workspace/kafka
export PATH=$KAFKA_HOME/bin:$PATH

使环境变量生效:

source /etc/profile

5、启动kafka集群
确保zookeeper集群已启动,然后三个节点执行:

kafka-server-start.sh -daemon /opt/workspace/kafka/config/server.properties &

在这里插入图片描述
停止kafka服务。

kafka-server-stop.sh

6、查看kafka进程

# 没有则安装jps
yum install -y  java-1.8.0-openjdk-devel.x86_64
jps

在这里插入图片描述
7、查看zookeeper上的broker节点

# 进入zookeeper命令
zookeeper-shell.sh localhost:2181
# 查看broker结点情况
ls /brokers/ids

通过下图可知三个节点都已经注册到zookeeper。
在这里插入图片描述

kafka命令测试

Topic(主题)命令

# 查看Topic列表
kafka-topics.sh --bootstrap-server localhost:9092 --list
# 创建Topic
# –partitions:分区数量
# –replication-factor:副本数量,不能大于broker数量
kafka-topics.sh --bootstrap-server localhost:9092 --create --topic order --partitions 1 --replication-factor 3
# 查看Topic信息
kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic order
# 修改Topic,分区数只能增加,不能减少;副本数不支持修改。
kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic order --partitions 3
# 删除Topic
kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic order

在这里插入图片描述
分区数量修改为3个:
在这里插入图片描述

生产者和消费者命令

# 生成消息
kafka-console-producer.sh  --bootstrap-server localhost:9092 --topic order
# 消费消息,从最新的地方开始
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic order
# 消费消息,从头开始
kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic order

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
消费者组,执行命令自动创建消费组。

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic order --group group_1
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic order --group group_2

如果两个kafka-console-consumer.sh命令指定了相同的消费组,则同一个消息只会被一个kafka-console-consumer.sh命令消费一次。
如果两个kafka-console-consumer.sh命令指定了不同的消费组,则同一个消息会被两个kafka-console-consumer.sh命令各消费一次。

# 查看消费者Group列表
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
# 查看消费者Group详情
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group group_1
# 删除消费者Group
kafka-consumer-groups.sh  --bootstrap-server localhost:9092 --group group_1 --delete

正在消费的group不能被删除。
在这里插入图片描述

C++实现kafka客户端

librdkafka、cmake、libcppkafka都是从源码编译,并且不安装到系统目录。

librdkafka库编译

1、从源码编译。
下载地址:https://github.com/confluentinc/librdkafka/releases
下载源码包:librdkafka-2.8.0.tar.gz

# librdkafka选择不依赖openssl(本次当依赖openssl编译后生成动态库,调用动态库时报错找不到SSL_CTX_use_cert_and_key)
tar xvf librdkafka-2.8.0.tar.gz
cd librdkafka-2.8.0/
./configure
make
# C的封装库在src文件夹,C++的封装库在src-cpp文件夹

2、直接安装(不推荐使用)

yum install librdkafka-devel

新版本cmake编译

cppkafka需要cmake版本3.9.2以上,可能需要编译较新版本cmake。
还是从源码编译,下载地址:https://cmake.org/download/
下载最新版本:cmake-3.31.5.tar.gz

# 可能需要安装依赖openssl,也可选择不依赖openssl
yum -y install openssl-devel
xvf cmake-3.31.5.tar.gz
cd cmake-3.31.5/
./bootstrap
make
# 在bin目录生成可执行文件cmake
cd bin/
./cmake --version

# 需要安装到系统目录时执行
make install

cppkafka库编译

cppkafka依赖librdkafka库。
下载地址:https://github.com/mfontanini/cppkafka/releases
下载源码包:cppkafka-0.4.1.tar.gz

tar xvf cppkafka-0.4.1.tar.gz
cd cppkafka-0.4.1
mkdir build
cd build/
# 使用刚刚编译的新版cmake构建,指定librdkafka库路径和头文件路径
# 注意:指定的头文件目录里面还有一级目录,完整路径:/root/code/librdkafka/librdkafka/rdkafkacpp.h
/root/soft/cmake-3.31.5/bin/cmake  -DRdKafka_LIBRARY_PATH=/root/code/librdkafka/lib/librdkafka++.so -DRdKafka_INCLUDE_DIR=/root/code/librdkafka ..
make
# 在cppkafka-0.4.1/build/src/lib64目录生成动态库libcppkafka.so.0.4.1

C++实现kafka生产者和消费者客户端

这里使用cppkafka自带的例子:cppkafka-0.4.1/examples/producer_example.cpp和consumer_example.cpp。

源码这里不贴了,地址在这里:
https://github.com/mfontanini/cppkafka/blob/master/examples/producer_example.cpp
https://github.com/mfontanini/cppkafka/blob/master/examples/consumer_example.cpp

简单写一个CMakeLists.txt:

cmake_minimum_required(VERSION 2.8) #设置cmake最低版本
project("consumer" CXX)	#设置项目名称
SET(PREFIX ${CMAKE_CURRENT_SOURCE_DIR})#设置普通变量,CMAKE_CURRENT_SOURCE_DIR为当前cmake文件目录
SET(BINDIR ${PREFIX})
message (">>> pwd = ${PREFIX}")#打印变量

# 添加依赖头文件目录
SET(INCDIR
    ${PREFIX}/include
)
INCLUDE_DIRECTORIES(${INCDIR})

SET(SRCS
    # ${PREFIX}/producer_example.cpp
    ${PREFIX}/consumer_example.cpp
)

#添加依赖库目录
SET(LIBDIR
    ${PREFIX}/lib
)
LINK_DIRECTORIES(${LIBDIR})

#添加依赖库
SET(LIB_SO
    # -lssl
    # -lcrypto
    -lrdkafka
    -lcppkafka
    -lboost_program_options
)

SET(RUN_MAIN "consumer")
#设置C++编译选项
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -g -std=c++11 -march=native -Wall")

# 添加可执行文件,并配相关文件
ADD_EXECUTABLE(${RUN_MAIN} ${SRCS})

TARGET_LINK_LIBRARIES(${RUN_MAIN} ${LIB_SO})#添加依赖库

# 安装可执行文件到指定位置,并指定权限
INSTALL(TARGETS ${RUN_MAIN} DESTINATION ${BINDIR} PERMISSIONS OWNER_READ OWNER_WRITE OWNER_EXECUTE WORLD_EXECUTE)

把编译好的rdkafka和cppkafka头文件和库文件拷贝过来,项目结构如下:

├── CMakeLists.txt
├── consumer_example.cpp
├── include
│ ├── cppkafka
│ │ ├── buffer.h
│ │ ├── clonable_ptr.h
│ │ ├── CMakeLists.txt
│ │ ├── configuration_base.h
│ │ ├── configuration.h
│ │ ├── configuration_option.h
│ │ ├── consumer.h
│ │ ├── cppkafka.h
│ │ ├── detail
│ │ │ ├── callback_invoker.h
│ │ │ └── endianness.h
│ │ ├── error.h
│ │ ├── event.h
│ │ ├── exceptions.h
│ │ ├── group_information.h
│ │ ├── header.h
│ │ ├── header_list.h
│ │ ├── header_list_iterator.h
│ │ ├── kafka_handle_base.h
│ │ ├── logging.h
│ │ ├── macros.h
│ │ ├── message_builder.h
│ │ ├── message.h
│ │ ├── message_internal.h
│ │ ├── message_timestamp.h
│ │ ├── metadata.h
│ │ ├── producer.h
│ │ ├── queue.h
│ │ ├── topic_configuration.h
│ │ ├── topic.h
│ │ ├── topic_partition.h
│ │ ├── topic_partition_list.h
│ │ └── utils
│ │ ├── backoff_committer.h
│ │ ├── backoff_performer.h
│ │ ├── buffered_producer.h
│ │ ├── compacted_topic_processor.h
│ │ ├── consumer_dispatcher.h
│ │ ├── poll_interface.h
│ │ ├── poll_strategy_base.h
│ │ └── roundrobin_poll_strategy.h
│ └── librdkafka
│ ├── rdkafkacpp.h
│ ├── rdkafka.h
│ └── rdkafka_mock.h
├── lib
│ ├── libcppkafka.so -> libcppkafka.so.0.4.1
│ ├── libcppkafka.so.0.4.1
│ ├── librdkafka.a
│ ├── librdkafka++.a
│ ├── librdkafka_cgrp_synch.png
│ ├── librdkafka-dbg.a
│ ├── librdkafka+±dbg.a
│ ├── librdkafka-dbg.so.1
│ ├── librdkafka+±dbg.so.1
│ ├── librdkafka.so
│ ├── librdkafka++.so
│ ├── librdkafka.so.1
│ ├── librdkafka++.so.1
│ ├── librdkafka-static.a
│ └── librdkafka-static-dbg.a
└── producer_example.cpp

编译:

# 当前在CMakeLists.txt目录
mkdir build
cd build
/root/soft/cmake-3.31.5/bin/cmake ..
make
# 当前目录生成consumer,修改CMakeLists.txt再次编译生成producer

生产者,指定brokers、topic、partition(可选)。
生产者命令行输入信息,消费者可以接收到消息。
在这里插入图片描述
消费者,指定brokers、topic、group。
在这里插入图片描述


http://www.kler.cn/a/543248.html

相关文章:

  • docker使用dockerfile打包镜像(docker如何打包)
  • SSM仓库物品管理系统 附带详细运行指导视频
  • Spring AI 介绍
  • AJAX XML技术详解
  • 防火墙安全综合实验
  • IPD项目管理是什么?
  • 如何选择合适的搜索关键词优化工具?
  • 按键可视化工具——Keyviz
  • 开源堡垒机 JumpServer 社区版实战教程:一步步构建企业安全运维环境
  • SQL Server:查看当前连接数和最大连接数
  • 【Vue3 入门到实战】13. 常用 API
  • 探索技术新边界:让 HTML 电子凭证与二维码、PDF 完美融合
  • 网络安全 理清 安全 边界
  • 计算机毕业设计制造业MES生产管理平台 MES 生产制造源码+文档+运行视频+讲解视频)
  • 【前端】ES6新特性汇总
  • AI直播的未来:智能化、自动化与个性化并存
  • AI时代的前端开发:效率、协作与ScriptEcho
  • C++设计模式 —— 单例模式
  • TiDB Vector 本地部署的亲身体验与心得
  • Druid GetConnectionTimeoutException解决方案之一
  • 基础连接已经关闭: 服务器关闭了本应保持活动状态的连接
  • Cursor无法安装插件解决方法
  • 操作系统|ARM和X86的区别,存储,指令集
  • 在anaconda环境中构建flask项目的exe文件
  • 后端面试题
  • 1.2 快速搭建环境