当前位置: 首页 > article >正文

centos kafka单机离线安装kafka服务化kafka tool连接kafka

a.版本&环境

linux版本:centos7.6

kafka: kafka_2.12

zookeeper:zookeeper_3.6.3(之前已经安装:linux zookeeper安装并服务化-CSDN博客)

java:1.8(之前已经安装)

windows kafka tool: 2.1

b.kafka单机安装

1.切换目录 cd downloads/,利用rz命令,上传kafka 及kafka eagle压缩包至服务器

2.解压文件:tar -zxvf kafka_2.12-2.6.2.tgz

创建文件夹:mdkir -p /opt/tool/kafka

创建文件夹:mdkir -p /home/kafka/log

剪切文件至指定目录:mv kafka_2.12-2.6.2 /opt/tool/kafka/

切换目录:cd /opt/tool/kafka/kafka_2.12-2.6.2/

3.修改配置并开放9092端口

执行:vim config/server.properties,修改以下配置

#机器在集群中的唯一标识, 和zookeeper的myid性质一样,每台服务器的broker.id都不能相同

broker.id=151

#kafka对外提供服务的端口默认是9092 这个IP不能用0.0.0.0这种


listeners=PLAINTEXT://192.168.42.151:9092 

advertised.listeners=PLAINTEXT://192.168.42.151:9092

消息存放的目录, 这个目录可以配置为", "逗号分割的表达式, 上面的num.io.threads要大于这个

目录的个数, 如果配置多个目录, 新创建的topic将消息持久化的地方是, 当前以逗号分割的目录中,

哪个分区数最少就放那一个

log.dirs=/home/kafka/log

#每个日志文件删除之前保存的时间, 默认168小时(7天)

#发送到Queue中的消息, 在被消费之后, 不会马上被删掉, 而是有后台进程在4天后去删除

log.retention.hours=96

offsets.topic.replication.factor=1

transaction.state.log.replication.factor=1

transaction.state.log.min.isr=1

zookeeper.connect=192.168.42.151:2181

#Timeout in ms for connecting to zookeeper, 默认6秒, 改成60秒, 因为systemd文件自启动

kafka时候需要等待zookeeper先启动完毕

zookeeper.connection.timeout.ms=60000

c.kafka测试及常用指令
  1. 切换目录:cd/opt/tool/kafka/kafka_2.12-2.6.2/bin

2.创建my-test1,my-test topic

./kafka-topics.sh --bootstrap-server 192.168.42.151:9092 --create --topic my-test topic --replication-factor 1 --partitions 3

./kafka-topics.sh --bootstrap-server 192.168.42.151:9092 --create --topic my-test1 topic --replication-factor 1 --partitions 3

3.修改topic分区数

./kafka-topics.sh --bootstrap-server 192.168.42.151:9092 --alter --topic my-test1 topic --partitions 5

4.查看topic列表

./kafka-topics.sh --list --bootstrap-server 192.168.42.151:9092

5.删除topic

./kafka-topics.sh --bootstrap-server 192.168.42.151:9092 --delete --topic my-test topic

6.发布消息

./kafka-console-producer.sh --bootstrap-server 192.168.42.151:9092 --topic my-test1

7.消费消息

./kafka-console-consumer.sh --bootstrap-server 192.168.42.151:9092 --topic my-test1 --group group-test

8.查看消费组列表

./kafka-consumer-groups.sh --bootstrap-server 192.168.42.151:9092 --list

9.查看消费者组偏移量

./kafka-consumer-groups.sh --bootstrap-server 192.168.42.151:9092 --describe --group group-test

10.查看消费组members

./kafka-consumer-groups.sh --bootstrap-server 192.168.42.151:9092 --describe --group group-test --members

./kafka-consumer-groups.sh --bootstrap-server 192.168.42.151:9092 --describe --group group-test --state

./kafka-consumer-groups.sh --bootstrap-server 192.168.42.151:9092 --describe --group group-test --members --verbose

11.指定内容查看消息

./kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --topic paymentmgt_download_asyn_deal --from-beginning | grep "1291248658964021250"

d.kafka服务化

切换目录:cd /usr/lib/systemd/system

创建文件:cd /usr/lib/systemd/system

打开文件并新增如下内容:vim kafka.service

[Uint]
Description=Apache Kafka server (broker)
Documentation=http://kafka.apache.org/documentation.html
Requires=network.target remote-fs.target
After=network.target remote-fs.target zookeeper.service

[Service]
Type=forking
Environment=JAVA_HOME=/opt/tool/java/jdk1.8.0_311
ExecStart=/opt/tool/kafka/kafka_2.12-2.6.2/bin/kafka-server-start.sh -daemon /opt/tool/kafka/kafka_2.12-2.6.2/config/server.properties
ExecStop=/opt/tool/kafka/kafka_2.12-2.6.2/bin//kafka-server-stop.sh
Restart=always
StartLimitInterval=60
StartLimitBurst=86400

[Install]
WantedBy=multi-user.target

使kafka.service立即生效执行命令:systemctl daemon-reload

e.kafka tool连接kafka

开放zookeeper端口:firewall-cmd --zone=public --add-port=2181/tcp --permanent

开放kafka端口:firewall-cmd --zone=public --add-port=2181/tcp --permanent

使增加的端口立即生效:systemctl daemon-reload

如果连接不成功,可以通过查看端口是否被用,来判断zookeeper及kafka是否启动成功:netstat -tunlp |grep 2181

netstat -tunlp |grep 9092


http://www.kler.cn/a/429304.html

相关文章:

  • 01.17周五F34-Day58打卡
  • Golang Gin系列-3:Gin Framework的项目结构
  • Kotlin 协程基础十 —— 协作、互斥锁与共享变量
  • [操作系统] 深入理解操作系统的概念及定位
  • Spring Boot 动态表操作服务实现
  • 【Flink系列】4. Flink运行时架构
  • MacOS 下 pico/pico2 学习笔记
  • java+springboot+mysql党务(党员)管理系统
  • ros2人脸检测
  • 【中间件开发】Nginx中过滤器模块实现
  • MFC 自定义静态文本控件:增强型标签控件
  • 40分钟学 Go 语言高并发:负载均衡与服务治理
  • 【前端】全面解析 JavaScript 中的 this 指向规则
  • 二阶段nginx1.0
  • 一些好用的网站
  • 深入理解Linux进程管理机制
  • 服务器数据恢复—LINUX下各文件系统删除/格式化的数据恢复可行性分析
  • java:commons-configuration2读取yaml及组合配置定义(CombinedConfiguration)
  • 华为:LLM工具调用数据合成方法
  • 开源向量数据库介绍说明
  • LearnOpenGL学习(高级OpenGL --> 帧缓冲,立方体贴图,高级数据)
  • 策略模式实战 - 猜拳游戏
  • 如何配置Jackson以忽略Java类中为null或空(empty)的字段
  • 避大坑!Vue3中reactive丢失响应式的问题
  • guava 整合springboot 自定义注解实现接口鉴权调用保护
  • 题海拾贝:力扣 231. 2 的幂