当前位置: 首页 > article >正文

运维学习————kafka(1)

关于消息队列的概念,三个作用(解耦、异步、削峰),使用场景,消息队列的两种模式,常见的MQ的框架等等,可以看我之前的RabbitMQ的文章,那里有详细介绍,这里就不过多赘述了........直接就kafka走起

一、Kafka简介 

中文网址:kafka中文文档

Kafka官网:http://kafka.apache.org/

kafka是由apache软件基金会开发的一个开源流处理框架,由JAVA和scala语言编写。是一个高吞吐量的分布式的发布和订阅消息的一个系统。Kafka用于构建实时的数据管道和流式的app.它可以水平扩展,高可用,速度快,并且已经运行在数千家公司的生产环境。

二、基本术语 

topic(话题):kafka将消息分门别类,每一类的消息称之话题,是逻辑上的一个概念,如果是真正到磁盘上,映射的是一个partition分区的一个目录。

生产者(producer):发布消息的对象称之为生产者,只负责数据的产生,生产的来源,可以不在kafka集群上,而是来自其他的业务系统。

消费者(consumer):订阅消息并处理发布消息的对象,称为消费者。

消费者组(consumerGroup):多个消费者可以构成消费者组,同一个消费者组的消费者,只能消费一个topic数据,不能重复消费。

broker :kafka本身可以是一个集群,集群中的每一个服务器都是一个代理,这个代理称为broker。只负责消息的存储,不管生产者和消费者,和他们没有任何关系。在集群中每个broker有唯一个ID,不能重复。

三、集群搭建

以下操作需要同时在三台机子进行,不想的话,可以先操作一台,然后把这一台的配置全部发送到另外两台,这个时候就需要在第一台机子上做免密登录的操作,保证第一个机子可以发送文件到另外两台

kafka_2.12-2.7.0.tar:kafka-2.12-2.7.0.tar

#解压缩
tar -xzvf /software/kafka_2.12-2.7.0.tgz -C /usr/

#修改名称
mv /usr/kafka_2.12-2.7.0/  /usr/kafka

#配置环境变量
vi /etc/profile


#添加以下代码
export KAFKA_HOME=/usr/kafka
export PATH=$PATH:$ZK_HOME/bin:$KAFKA_HOME/bin

#使配置文件生效
source /etc/profile

#测试,是否生效
echo $KAFKA_HOME

 

kafka是使用scala 语言编写 2.12是该语言的版本 2.7.0这是kafka版本

#进入kafka目录
cd /usr/kafka

#创建目录(存放消息),为后面配置做准备  	
mkdir logs

#修改配置server.properties文件
vim /usr/kafka/config/server.properties

server.properties配置

#broker的全局唯一编号,不能重复
broker.id=0
#是否允许删除topic
delete.topic.enable=true
#处理网络请求和响应的线程数量
num.network.threads=3
#用来处理磁盘IO的线程数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的最大缓冲区大小
socket.request.max.bytes=104857600
#kafka运行日志存放的路径
log.dirs=/usr/kafka/logs
#topic在当前broker上的分区个数
num.partitions=1
#用来恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=1


############################# Internal Topic Settings  #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1



#以下配置控制日志段的处理。可以将策略设置为在一段时间后或在给定大小累积后删除段。只要满足这些条件中的*任一*项,就会删除段。删除总是从日志的末尾开始
#segment文件保留的最长时间,超时将被删除,单位小时,默认是168小时,也就是7天
log.retention.hours=168
#基于大小的日志保留策略。除非剩余的段低于log.retention.bytes,否则将从日志中删除段。独立于log.retention.hours的功能
#log.retention.bytes=1073741824
#日志段文件的最大大小。当达到此大小时,将创建一个新的日志段。
log.segment.bytes=1073741824

#检查日志段以查看是否可以根据保留策略删除日志段的间隔
log.retention.check.interval.ms=300000


############################# Zookeeper #############################

#配置连接Zookeeper集群地址
zookeeper.connect=cluster1:2181,cluster2:2181,cluster3:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=18000


############################# Group Coordinator Settings #############################
group.initial.rebalance.delay.ms=0

集群配置那里,需要先在三个机子的/etc/hosts下添加这三个机子的ip配置

192.168.37.152 cluster1

192.168.37.153 cluster2

192.168.37.154 cluster3

 producer.properties

#修改producer.properties
vim /usr/kafka/config/producer.properties



#修改这一行就行
bootstrap.servers=cluster1:9092,cluster2:9092,cluster3:9092

consumer.properties

#修改consumer.properties
vim /usr/kafka/config/consumer.properties


#修改这一行就行
bootstrap.servers=cluster1:9092,cluster2:9092,cluster3:9092

免密登录

#发送配置好的kafka到另外两台机子(先做免密登录):
	 ssh-keygen -t rsa
	 ssh-copy-id cluster2
	 ssh-copy-id cluster3
	 scp -r /usr/kafka/  cluster2:/usr/
	 scp -r /usr/kafka/  cluster3:/usr/
#检查发送是否成功,在all session执行:
	 ls /usr
#修改broker.id(切记)   在cluster2和cluster3上修改broker.id
	 vim /usr/kafka/config/server.properties
	    broker.id=2
        broker.id=3
#发送环境变量配置文件:
	 scp -r  /etc/profile  cluster2:/etc/
	 scp -r  /etc/profile  cluster3:/etc/
#在all session执行:
	 source  /etc/profile
	 echo  $KAFKA_HOME
#发送hosts配置文件:
	 scp -r  /etc/hosts  cluster2:/etc/
	 scp -r  /etc/hosts  cluster3:/etc/
#测试是否成功:在all session执行:
	 cat  /etc/hosts

四、集群的启动和关闭

启动kafka之前一定要保证zk在启动,并且可用

启动zookeeper

启动zk ,关于zk脚本同时启动,可以参考我的另一个文章

https://blog.csdn.net/m0_73376570/article/details/141803080

zk启动之后的效果:

启动kafka

在all session中执行:

#启动kafka
kafka-server-start.sh -daemon  /usr/kafka/config/server.properties

#检查是否运行成
jps
ps -ef | grep java

#停止kafka
kafka-server-stop.sh

五、常用命令

#查看当前服务器中的所有topic主题:
kafka-topics.sh --zookeeper cluster1:2181 --list

#如果是zk集群可以使用这样的命令:
kafka-topics.sh --zookeeper cluster1:2181,cluster2:2181,cluster3:2181 --list

#创建topic: list
kafka-topics.sh --zookeeper cluster2:2181 --create --replication-factor 3 --partitions 5 --topic ordertopic

kafka-topics.sh --zookeeper cluster2:2181 --create --replication-factor 2 --partitions 2 --topic goodstopic

#删除topic
kafka-topics.sh --zookeeper cluster1:2181 --delete --topic goodstopic

#生产消息
kafka-console-producer.sh --broker-list cluster2:9092  --topic goodstopic

#消费消息
kafka-console-consumer.sh  --bootstrap-server cluster2:9092  --from-beginning  --topic  goodstopic

#同组消费者消费消息(多个窗口)
kafka-console-consumer.sh --bootstrap-server kafka1:9092 --consumer-property group.id=gtest  --from-beginning --topic goodstopic

#查看一个topic详情
kafka-topics.sh --zookeeper  cluster2:2181,cluster1:2181 --describe --topic tp1

参数说明:

--zookeeper    链接zk

--replication-factor   指定副本数目(副本数目不能大于总的brokers数目)

--partitions  指定分区数

--topic  指定topic名称

partitioncount   分区总数量

replicationfactor    副本数量

partition 分区

leader  每个分区有3个副本,每个副本都有leader

replicas   所有副本节点,不管leader follower

isr: 正在服务中的节点


http://www.kler.cn/a/291611.html

相关文章:

  • iClent3D for Cesium 实现无人机巡检飞行效果
  • 如何在window 使用 conda 环境下载大模型
  • PostgreSQL标识符长度限制不能超过63字节
  • 54、库卡机器人轴的软限位设置
  • 你好Python
  • MacPorts 中安装高/低版本软件方式,以 RabbitMQ 为例
  • 【LVI-SAM】激光点云如何辅助视觉特征深度提取
  • 前端黑科技:使用 JavaScript 实现网页扫码功能
  • ElasticSearch-关联关系
  • Ruoyi若依框架中工单管理(智能售货机运营管理系统)
  • 前端知识HTMLCSS
  • 软件测试 - 性能测试 (概念)(并发数、吞吐量、响应时间、TPS、QPS、基准测试、并发测试、负载测试、压力测试、稳定性测试)
  • 浙大数据结构:01-复杂度2 Maximum Subsequence Sum
  • Spring Boot:医疗排班系统开发的技术革新
  • java 给list对象根据给定条数进行分组工具类
  • Ai Illustrator 取消吸附到像素点,鼠标拖动的时候只能到像素点
  • 如何给Maven添加阿里云镜像
  • union 的正确食用方法
  • 任务栏透明怎么设置?适配最新版 Windows 电脑的方法介绍(图文教程)
  • 【文献阅读】AdaLora: Adaptive Budget Allocation for Parameter-Efficient Fine-Tuning
  • 鸿蒙ndk
  • List 集合指定值升序降序排列Comparator实现
  • JVM系列(八) -运行期的几种优化技术
  • TikTok养号一般养几天?账号起步方法
  • 0.3 学习Stm32经历过的磨难
  • 深度学习系列69:tts技术原理