当前位置: 首页 > article >正文

kafka的安装与使用

文章目录

    • kafka安装
      • 1 上传安装包
      • 2 解压安装包
      • 3 创建logs文件夹
      • 4 修改配置文件
      • 5 分发kafka
      • 6 启动kafka
    • kafka使用
      • 1 启动kafka
      • 2 关闭kafka
      • 3 查看topic
      • 4 创建topic,名称为test
      • 5 删除名称为test的topic
      • 6 向topic发送数据
      • 7 从topic里消费数据

kafka安装

kafka安装前需要确认zookeeper已安装

1 上传安装包

将安装包kafka_2.11-2.4.1.tgz上传至/opt/install_packages

在这里插入图片描述

2 解压安装包

tar -zxvf /opt/install_packages/kafka_2.11-2.4.1.tgz -C /opt/softs/

3 创建logs文件夹

cd /opt/softs/kafka_2.11-2.4.1

mkdir logs

4 修改配置文件

cd config/

##更名配置文件
mv server.properties server.properties_bak

vim  /opt/softs/kafka_2.11-2.4.1/config/server.properties

输入如下内容

#broker 的全局唯一编号,不能重复(根据实际修改)
broker.id=1
#删除 topic 功能使能
delete.topic.enable=true
#处理网络请求的线程数量
num.network.threads=3
#用来处理磁盘 IO 的现成数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#kafka 运行日志存放的路径(根据实际修改)
log.dirs=/opt/softs/kafka_2.11-2.4.1/logs
#topic 在当前 broker 上的分区个数
num.partitions=1
#用来恢复和清理 data 下数据的线程数量
num.recovery.threads.per.data.dir=1
log.retention.hours=168
#配置连接Zookeeper集群地址(根据实际修改)
zookeeper.connect=bigdata03:2181,bigdata04:2181,bigdata05:2181

5 分发kafka

##在bigdata03上执行

scp -r /opt/softs/kafka_2.11-2.4.1/ root@bigdata04:/opt/softs/
scp -r /opt/softs/kafka_2.11-2.4.1/ root@bigdata05:/opt/softs/

##在bigdata04上修改server.properties,将broker.id修改为2
vim /opt/softs/kafka_2.11-2.4.1/config/server.properties

##在bigdata05上修改server.properties,将broker.id修改为3
vim /opt/softs/kafka_2.11-2.4.1/config/server.properties

6 启动kafka

## 依次在bigdata03,bigdata04,bigdata05节点上启动kafka

cd /opt/softs/kafka_2.11-2.4.1/bin

./kafka-server-start.sh ../config/server.properties &

& 代表在后台运行

在这里插入图片描述

kafka使用

1 启动kafka

--在安装kafka集群的每台节点上(bigdata03,bigdata04,bigdata05)分别启动kafka

--1 切换目录
cd /opt/softs/kafka_2.11-2.4.1/bin


--2 执行启动命令
./kafka-server-start.sh ../config/server.properties &


2 关闭kafka

--在安装kafka集群的每台节点上(bigdata03,bigdata04,bigdata05)分别关闭kafka

--1 切换目录
cd /opt/softs/kafka_2.11-2.4.1/bin

--2 执行关闭命令
./kafka-server-stop.sh stop

3 查看topic

--1 切换目录
cd /opt/softs/kafka_2.11-2.4.1/bin

--2 执行查看topic命令
./kafka-topics.sh --zookeeper bigdata03:2181,bigdata04:2181,bigdata05:2181 --list

4 创建topic,名称为test

--1 切换目录
cd /opt/softs/kafka_2.11-2.4.1/bin

--2 执行创建topic的命令

./kafka-topics.sh --zookeeper bigdata03:2181,bigdata04:2181,bigdata05:2181 --create --replication-factor 3 --partitions 1 --topic test

选项说明:

--topic 定义topic名称
--replication-factor 定义topic的副本数,副本的概念和hdfs上的文件副本一致,相同的数据存储在不同的节点上一共多少份
--partitions 定义topic的分区数


若报错:

Error while executing topic command : Replication factor: 3 larger than available brokers: 2.
[2023-05-02 17:49:56,071] ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 3 larger than available brokers: 2.

在这里插入图片描述

说明要么kafka启动的数量较少,在其他虚拟机上有没启动的,要么上面kafka的config目录下的server.properties中kafka的zookeeper的配置没有配置好。

对于kafka启动的数量较少,在其他虚拟机上有没启动的,重新启动就好,可以通过 jps 查询

kafka启动前
在这里插入图片描述
kafka启动后

在这里插入图片描述

5 删除名称为test的topic

--1 切换目录
cd /opt/softs/kafka_2.11-2.4.1/bin


--2 执行删除topic的命令
-- 删除topic需要在kafka目录下config目录中的server.properties中配置delete.topic.enable=true,如果不配置而去删除topic只会对topic进行标记删除


./kafka-topics.sh --zookeeper bigdata03:2181,bigdata04:2181,bigdata05:2181 --delete --topic test

在这里插入图片描述

6 向topic发送数据

--1 切换目录
cd /opt/softs/kafka_2.11-2.4.1/bin

--2 执行发送消息的命令
./kafka-console-producer.sh --broker-list bigdata03:9092,bigdata04:9092,bigdata05:9092 --topic test

选项说明

--broker-list 定义kafka集群的节点

在这里插入图片描述

7 从topic里消费数据

--1 切换目录
cd /opt/softs/kafka_2.11-2.4.1/bin

--2 执行消费的命令
./kafka-console-consumer.sh --bootstrap-server bigdata03:9092,bigdata04:9092,bigdata05:9092 --from-beginning --topic test


选项说明

--bootstrap-server 定义kafka集群的节点
--from-beginning 定义消费方式为从头消费

在这里插入图片描述
在这里插入图片描述


http://www.kler.cn/news/17126.html

相关文章:

  • 关于低代码开发平台的一些想法
  • 【Frame.h】
  • 手写堆priority_queue优先队列
  • 题目:16版.学生-成绩关联实体
  • Centos7快速安装Kibana并连接ES使用
  • 结合SSE实现实时位置展示与轨迹展示
  • 区块链系统探索之路:基于椭圆曲线的私钥与公钥生成
  • FPGA/Verilog HDL/AC620零基础入门学习——8*8同步FIFO实验
  • spring-模型数据和视图---视图解析器的说明以及大量代码演示
  • AUTOSAR知识点Com(十三):ComM内容分析
  • 后端程序员的前端必备【Vue】- 01 Vue入门
  • 计算机Z20-第7-8周作业
  • 17. Pod 自动管理——DeamonSet 和 Job
  • JDK8 中Arrays.sort() 排序方法解读
  • MySQL高阶——索引设计的推演
  • Redis-集合(Set)
  • 总结838
  • Java 中的异常处理机制是什么(十)
  • redis 持久化 RDB + AOF
  • 多城市门店店铺展示地图导航pc/h5系统开发
  • Packet Tracer - 研究直连路由
  • 第十五章 角色移动旋转实例
  • ubuntu16.04升级到20.04后报错 By not providing “FindEigen.cmake“
  • 超细Redis(一)
  • MySQL 一条SQL语句是如何执行的?
  • JVM调优入门指南:掌握步骤、参数和场景
  • 【前端面经】浏览器-http和https的区别及优缺点?
  • TensorRT:自定义插件学习与实践 002:实现GELU
  • MyBatis详细笔记
  • Java I/O