运维学习————kafka(1)
关于消息队列的概念,三个作用(解耦、异步、削峰),使用场景,消息队列的两种模式,常见的MQ的框架等等,可以看我之前的RabbitMQ的文章,那里有详细介绍,这里就不过多赘述了........直接就kafka走起
一、Kafka简介
中文网址:kafka中文文档
Kafka官网:http://kafka.apache.org/
kafka是由apache软件基金会开发的一个开源流处理框架,由JAVA和scala语言编写。是一个高吞吐量的分布式的发布和订阅消息的一个系统。Kafka用于构建实时的数据管道和流式的app.它可以水平扩展,高可用,速度快,并且已经运行在数千家公司的生产环境。
二、基本术语
topic(话题):kafka将消息分门别类,每一类的消息称之话题,是逻辑上的一个概念,如果是真正到磁盘上,映射的是一个partition分区的一个目录。
生产者(producer):发布消息的对象称之为生产者,只负责数据的产生,生产的来源,可以不在kafka集群上,而是来自其他的业务系统。
消费者(consumer):订阅消息并处理发布消息的对象,称为消费者。
消费者组(consumerGroup):多个消费者可以构成消费者组,同一个消费者组的消费者,只能消费一个topic数据,不能重复消费。
broker :kafka本身可以是一个集群,集群中的每一个服务器都是一个代理,这个代理称为broker。只负责消息的存储,不管生产者和消费者,和他们没有任何关系。在集群中每个broker有唯一个ID,不能重复。
三、集群搭建
以下操作需要同时在三台机子进行,不想的话,可以先操作一台,然后把这一台的配置全部发送到另外两台,这个时候就需要在第一台机子上做免密登录的操作,保证第一个机子可以发送文件到另外两台
kafka_2.12-2.7.0.tar:kafka-2.12-2.7.0.tar
#解压缩
tar -xzvf /software/kafka_2.12-2.7.0.tgz -C /usr/
#修改名称
mv /usr/kafka_2.12-2.7.0/ /usr/kafka
#配置环境变量
vi /etc/profile
#添加以下代码
export KAFKA_HOME=/usr/kafka
export PATH=$PATH:$ZK_HOME/bin:$KAFKA_HOME/bin
#使配置文件生效
source /etc/profile
#测试,是否生效
echo $KAFKA_HOME
kafka是使用scala 语言编写 2.12是该语言的版本 2.7.0这是kafka版本
#进入kafka目录
cd /usr/kafka
#创建目录(存放消息),为后面配置做准备
mkdir logs
#修改配置server.properties文件
vim /usr/kafka/config/server.properties
server.properties配置
#broker的全局唯一编号,不能重复
broker.id=0
#是否允许删除topic
delete.topic.enable=true
#处理网络请求和响应的线程数量
num.network.threads=3
#用来处理磁盘IO的线程数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的最大缓冲区大小
socket.request.max.bytes=104857600
#kafka运行日志存放的路径
log.dirs=/usr/kafka/logs
#topic在当前broker上的分区个数
num.partitions=1
#用来恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=1
############################# Internal Topic Settings #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
#以下配置控制日志段的处理。可以将策略设置为在一段时间后或在给定大小累积后删除段。只要满足这些条件中的*任一*项,就会删除段。删除总是从日志的末尾开始
#segment文件保留的最长时间,超时将被删除,单位小时,默认是168小时,也就是7天
log.retention.hours=168
#基于大小的日志保留策略。除非剩余的段低于log.retention.bytes,否则将从日志中删除段。独立于log.retention.hours的功能
#log.retention.bytes=1073741824
#日志段文件的最大大小。当达到此大小时,将创建一个新的日志段。
log.segment.bytes=1073741824
#检查日志段以查看是否可以根据保留策略删除日志段的间隔
log.retention.check.interval.ms=300000
############################# Zookeeper #############################
#配置连接Zookeeper集群地址
zookeeper.connect=cluster1:2181,cluster2:2181,cluster3:2181
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=18000
############################# Group Coordinator Settings #############################
group.initial.rebalance.delay.ms=0
集群配置那里,需要先在三个机子的/etc/hosts下添加这三个机子的ip配置
192.168.37.152 cluster1
192.168.37.153 cluster2
192.168.37.154 cluster3
producer.properties
#修改producer.properties
vim /usr/kafka/config/producer.properties
#修改这一行就行
bootstrap.servers=cluster1:9092,cluster2:9092,cluster3:9092
consumer.properties
#修改consumer.properties
vim /usr/kafka/config/consumer.properties
#修改这一行就行
bootstrap.servers=cluster1:9092,cluster2:9092,cluster3:9092
免密登录
#发送配置好的kafka到另外两台机子(先做免密登录):
ssh-keygen -t rsa
ssh-copy-id cluster2
ssh-copy-id cluster3
scp -r /usr/kafka/ cluster2:/usr/
scp -r /usr/kafka/ cluster3:/usr/
#检查发送是否成功,在all session执行:
ls /usr
#修改broker.id(切记) 在cluster2和cluster3上修改broker.id
vim /usr/kafka/config/server.properties
broker.id=2
broker.id=3
#发送环境变量配置文件:
scp -r /etc/profile cluster2:/etc/
scp -r /etc/profile cluster3:/etc/
#在all session执行:
source /etc/profile
echo $KAFKA_HOME
#发送hosts配置文件:
scp -r /etc/hosts cluster2:/etc/
scp -r /etc/hosts cluster3:/etc/
#测试是否成功:在all session执行:
cat /etc/hosts
四、集群的启动和关闭
启动kafka之前一定要保证zk在启动,并且可用
启动zookeeper
启动zk ,关于zk脚本同时启动,可以参考我的另一个文章
https://blog.csdn.net/m0_73376570/article/details/141803080
zk启动之后的效果:
启动kafka
在all session中执行:
#启动kafka
kafka-server-start.sh -daemon /usr/kafka/config/server.properties
#检查是否运行成
jps
ps -ef | grep java
#停止kafka
kafka-server-stop.sh
五、常用命令
#查看当前服务器中的所有topic主题:
kafka-topics.sh --zookeeper cluster1:2181 --list
#如果是zk集群可以使用这样的命令:
kafka-topics.sh --zookeeper cluster1:2181,cluster2:2181,cluster3:2181 --list
#创建topic: list
kafka-topics.sh --zookeeper cluster2:2181 --create --replication-factor 3 --partitions 5 --topic ordertopic
kafka-topics.sh --zookeeper cluster2:2181 --create --replication-factor 2 --partitions 2 --topic goodstopic
#删除topic
kafka-topics.sh --zookeeper cluster1:2181 --delete --topic goodstopic
#生产消息
kafka-console-producer.sh --broker-list cluster2:9092 --topic goodstopic
#消费消息
kafka-console-consumer.sh --bootstrap-server cluster2:9092 --from-beginning --topic goodstopic
#同组消费者消费消息(多个窗口)
kafka-console-consumer.sh --bootstrap-server kafka1:9092 --consumer-property group.id=gtest --from-beginning --topic goodstopic
#查看一个topic详情
kafka-topics.sh --zookeeper cluster2:2181,cluster1:2181 --describe --topic tp1
参数说明:
--zookeeper 链接zk
--replication-factor 指定副本数目(副本数目不能大于总的brokers数目)
--partitions 指定分区数
--topic 指定topic名称
partitioncount 分区总数量
replicationfactor 副本数量
partition 分区
leader 每个分区有3个副本,每个副本都有leader
replicas 所有副本节点,不管leader follower
isr: 正在服务中的节点