消息系统队列(Message Queue)之kafka
1、 Kafka概述
1.1、 为什么需要消息系统
1)解耦:
在项目启动之初来预测将来项目会碰到什么需求,是极其困难的。消息系统在处理过程中间插入了一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口。这允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。
2)冗余:
消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。许多消息队列所采用的"插入-获取-删除"范式中,在把一个消息从队列中删除之前,需要你的处理系统明确的指出该消息已经被处理完毕,从而确保你的数据被安全的保存直到你使用完毕。
3)扩展性:
因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的,只要另外增加处理过程即可。不需要改变代码、不需要调节参数。扩展就像调大电力按钮一样简单。
4)峰值处理能力:
在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见。如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。
5)可恢复性:
系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。
6)顺序保证:
在大多使用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。(Kafka保证一个Partition内的消息的有序性)
7)缓冲:
有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况。
8)异步通信:
很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。
1.2、 消息队列的两种模式
最常见的两种消息系统队列模型是消息队列模型和发布/订阅模型。
l点对点模式(一对一,消费者主动拉取数据,消息收到后消息清除)
消息生产者生产消息发送到Queue中,然后消息消费者从Queue中取出并且消费消息。消息被消费以后,queue中不再有存储,所以消息消费者不可能消费到已经被消费的消息。Queue支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费。
l发布/订阅模式(一对多,消费者消费数据之后不会清除消息)
消息生产者(发布)将消息发布到topic(主题)中,同时有多个消息消费者(订阅)消费该消息。和点对点方式不同,发布到topic的消息会被所有订阅者消费。
kafka通过引入consumer组(consumer group)来同时支持这两种模型。(后面再讲)
1.1、 Kafka 是什么?
Kafka是一个分布式的基于发布/订阅模式的消息队列,主要应用于大数据实时处理领域。
总结就是三句话:
生产者发送消息给kafka服务器;
消费者从kafka服务器读取消息;
kafka服务器依托zookeeper集群进行服务的协调管理。
1.2、 Kafka的使用场景:
l日志收集:一个公司可以用Kafka可以收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer,例如hadoop、Hbase、Solr等。
l消息队列系统:解耦和生产者和消费者、缓存消息等。
l用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到hadoop、数据仓库中做离线分析和挖掘。
l运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。
l流式处理:比如spark streaming和storm
1.3、 官网地址
http://kafka.apache.org/
2、 部署kafka集群
Kafka架构:
kafka相关概念总结,如图上图中,kafka 相关名词解释如下:
1.producer:消息生产者,发布消息到 kafka 集群的终端或服务。
2.broker:kafka 集群中包含的服务器,一个kafka服务就是一个broker。
3.topic:每条发布到 kafka 集群的消息属于的类别,生产者和消费者面向的都是一个topic。
4.partition:partition 是物理上的概念,每个 topic 包含一个或多个 partition。为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列;
5.consumer:从 kafka 集群中消费消息的终端或服务。
6. Consumer Group (CG):消费者组,由多个consumer组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个消费者消费;消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
7.replica:副本,为保证集群中的某个节点发生故障时,该节点上的partition数据不丢失,且kafka仍然能够继续工作,kafka提供了副本机制,一个topic的每个分区都有若干个副本,一个leader和若干个follower。
8.leader:每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是leader。(replica 中的一个角色, producer 和 consumer 只跟 leader 交互)
9. follower:每个分区多个副本中的“从”,实时从leader中同步数据,保持和leader数据的同步。leader发生故障时,某个follower会成为新的follower。
10.zookeeper:kafka 通过 zookeeper 来存储集群的 meta 信息。
主机规划:
2.1、 安装zookeeper
Kafka强依赖zookeeper,如果想要使用Kafka,就必须安装zookeeper,Kafka中的consumer信息、kafka集群、topic信息会被存储在ZK中。有人可能会说我在使用Kafka的时候就没有安装zookeeper,那是因为Kafka内置了一个ZK,一般我们不使用它。
在kafka中,zookeeper负责的是存储kafka中的元数据信息,队列的数据是不会存储到zookeeper的,kafka是分布式的,zookeeper协调broker、producer、consumer之间的关系,当有新的角色加入的时候,更新zookeeper中的数据,其他角色就可以得到通知,并作出相应的调整,不需要停机更新配置,做到动态扩容。
注:Zookeeper集群部署参考“分布式开源协调服务之ZooKeeper”文档。
2.2、 Kafka集群部署
一个Broker就是一个kafka服务,三种安装Kafka的方式,分别为:单节点单Broker部署、单节点多Broker部署、集群部署(多节点多Broker)。实际生产环境中使用的是第三种方式,以集群的方式来部署Kafka。
2.2.1、 官网下载kafka
http://kafka.apache.org/downloads.html
下载kafka软件包:kafka_2.12-3.8.0.tgz
2.2.2、 解压到指定目录
[root@cong11 ~]# tar -zxvf kafka_2.12-3.8.0.tgz -C /usr/local/
创建软链接
[root@cong11 ~]# ln -s /usr/local/kafka_2.12-3.8.0 /usr/local/kafka
[root@cong11 ~]# cd /usr/local/kafka/
[root@cong11 kafka_2.12-2.2.]# ls
bin config libs LICENSE NOTICE site-docs
我们下载的是二进制包,直接使用就可以
2.2.3、 修改kafka配置文件server.properties
[root@cong11 ~]# vim /usr/local/kafka/config/server.properties
broker.id=11
delete.topic.enable=true
auto.create.topics.enable=false
listeners=PLAINTEXT://cong11:9092
advertised.listeners=PLAINTEXT://cong11:9092
num.network.threads=30
num.io.threads=30
socket.send.buffer.bytes=5242880
socket.receive.buffer.bytes=5242880
socket.request.max.bytes=104857600
queued.max.requests=1000
log.dirs=/data/kafka/log
num.partitions=4
num.recovery.threads.per.data.dir=1
message.max.bytes=104857600
offsets.topic.replication.factor=2
transaction.state.log.replication.factor=2
transaction.state.log.min.isr=2
default.replication.factor=2
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=cong11:2181,cong12:2181,cong13:2181
zookeeper.session.timeout.ms=180000
zookeeper.connection.timeout.ms=6000
max.request.size=104857600
fetch.message.max.bytes=104857600
replica.fetch.max.bytes=104857600
replica.fetch.wait.max.ms=2000
unclean.leader.election=false
num.replica.fetchers=5
group.initial.rebalance.delay.ms=0
2.2.4、 创建日志目录
[root@cong11 ~]# mkdir -p /data/kafka/log
2.2.5、 添加path环境变量
[root@cong11 ~]# vim /etc/profile #文件最后天以下行
export KAFKA_HOME=/usr/local/kafka
export PATH=$KAFKA_HOME/bin:$PATH
[root@cong11 ~]# source /etc/profile #生效配置文件
补充:上述配置文件参数说明
lbroker.id=11 #每一个broker在集群中的唯一id标识,要求是正数。
ldelete.topic.enable=true #删除kafka存储目录(server.properties文件log.dirs配置,默认为"/tmp/kafka-logs")相关的topic及其数据。默认为false,也就是说这条命令其实并不执行删除动作,仅仅是在zookeeper上标记该topic要被删除而已,同时也提醒用户一定要提前打开delete.topic.enable开关,否则删除动作是不会执行的。
lauto.create.topics.enable=false #是否允许自动创建topic,若是false,就需要通过命令创建topic
llisteners=PLAINTEXT://cong11:9092 #监听器,监听客户端请求的IP和端口,默认都是9092。listeners参数指定此kafka服务器需要监听的端口号,也就是告诉连接者(生产者、消费者)要通过什么协议访问指定主机名和端口开放的 Kafka 服务。
ladvertised.listeners=PLAINTEXT://cong11:9092 #暴露给外部的listeners,如果没有设置,会用listeners。
注:listeners和advertised.listeners区别
在内网部署及访问kafka时,只需要配置listeners参数即可
格式:listeners=<协议名称>://<内网ip>:<端口>,Kafka当前支持的协议类型包括:PLAINTEXT、SSL以及SASL_SSL等。
比如listeners=PLAINTEXT://192.168.133.11:9092
按照官网的参数说明,advertised.listeners默认值等于listeners参数的值,并被发布到zooleeper中,供客户端访问使用。此时kafka服务、broker之间通信都是使用192.168.133.11:9092
内外网访问
在内网部署kafka服务,并且生产者或者消费者在外网环境时,需要添加额外的配置,比如
此时zookeeper中的配置如下
advertised_listeners 监听器会注册在 zookeeper 中;
当我们对 192.168.133.11:9092 请求建立连接,kafka 服务器会通过 zookeeper 中注册的监听器,找到 INTERNAL监听器,然后通过 listeners 中找到对应的通讯 ip 和 端口 192.168.133.11:9092;
同理,当我们对 <公网 ip>:端口请求建立连接,kafka 服务器会通过 zookeeper 中注册的监听器,找到 EXTERNAL 监听器,然后通过 listeners 中找到对应的通讯 ip 和 端口 192.168.133.11:9093;
总结:
advertised_listeners 是对外暴露的服务端口,kafka组件之间通讯用的是 listeners。
lnum.network.threads=30 #网络线程数,即服务器用来接受请求或者发送响应的线程数
lnum.io.threads=30 #磁盘io线程数,即处理磁盘I/O的线程数
lsocket.send.buffer.bytes=5242880 #服务器使用的发送缓冲区大小,即发送缓冲区buffer大小,数据不是一下子就发送的,先会存储到缓冲区了到达一定的大小后在发送,能提高性能
lsocket.receive.buffer.bytes=5242880 #服务器使用的接收缓冲区大小,即kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘。
lsocket.request.max.bytes=104857600 #服务器将接受的请求的最大大小
lqueued.max.requests=1000 #I/O线程等待队列中的最大的请求数,超过这个数量,network线程就不会再接收一个新的请求。应该是一种自我保护机制。
llog.dirs=/data/kafka/log # kafka持久化消息的目录,该参数可以设置多个目录,用逗号分隔,这样kafka会负载均匀地分配到多个目录下。如果每个目录都在不同的磁盘上,那么还能提升整体写消息的吞吐量。默认情况下是/tmp/kafka-logs。
lnum.partitions=20 #每个topic的分区个数,一个topic默认一个分区,若是在topic创建时候有指定的话会被topic创建时的指定参数覆盖
ldefault.replication.factor #配置所有主题的默认复制因子,即副本的个数,默认是1
l复制还有2个内部主题设置:offsets.topic.replication.factor和transaction.state.log.replication.factor,这两个特殊主题的设置不能使用常规主题的默认设置。
The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
For anything other than development testing, a value greater than 1 is recommended for to ensure availability such as 3.
对于除了开发测试之外的其他任何东西,group元数据内部主题的复制因子“__consumer_offsets”和“__transaction_state”,建议值大于1,以确保可用性(如3)。
offsets.topic.replication.factor=2 #用于设置存储消费者客户端offset偏移量partition的副本个数。
transaction.state.log.replication.factor=2 #用于设置存储事务明细topic的副本数。
transaction.state.log.min.isr=2#覆盖事务主题的min.insync.replicas配置
lzookeeper.connect=cong11:2181,cong12:2181,cong13:2181 # 配置连接Zookeeper集群地址
lzookeeper.connection.timeout.ms=6000 #连接zookeeper超时时间
关于kafka配置文件的更多解释,请参考链接:
https://blog.csdn.net/memoordit/article/details/78850086
2.2.6、 配置cong12和cong13节点
1、将 kafka_2.12-3.8.0文件夹复制到另外两个节点下
[root@cong11 kafka]# scp -r /usr/local/kafka_2.12-3.8.0/ 192.168.1.12:/usr/local/
[root@cong11 kafka]# scp -r /usr/local/kafka_2.12-3.8.0/ 192.168.1.13:/usr/local/
2、在cong12和cong13节点上创建软链接
[root@cong12 ~]# ln -s /usr/local/kafka_2.12-3.8.0 /usr/local/kafka
[root@cong13 ~]# ln -s /usr/local/kafka_2.12-3.8.0 /usr/local/kafka
3、修改每个节点对应的 server.properties 文件的 broker.id和listenrs:
[root@cong12 ~]# cat /usr/local/kafka/config/server.properties
broker.id=12
listeners=PLAINTEXT://cong12:9092
advertised.listeners=PLAINTEXT://cong12:9092
[root@cong13 ~]# cat /usr/local/kafka/config/server.properties
broker.id=13
listeners=PLAINTEXT://cong13:9092
advertised.listeners=PLAINTEXT://cong13:9092
4、创建日志目录:
[root@cong12 ~]# mkdir -p /data/kafka/log
[root@cong13 ~]# mkdir -p /data/kafka/log
5、添加path环境变量
[root@cong11 kafka]# scp /etc/profile 192.168.1.12:/etc/profile
[root@cong12 ~]# source /etc/profile #生效配置文件
[root@cong11 kafka]# scp /etc/profile 192.168.1.13:/etc/profile
[root@cong13 ~]# source /etc/profile #生效配置文件
2.2.7、 启动kafka
2.2.7.1、 前台启动
[root@cong11 ~]# kafka-server-start.sh /usr/local/kafka/config/server.properties
看到[Kafka Server id=11] started (kafka.server.KafkaServer),说明kafka服务启动成功
2.2.7.2、 后台启动
[root@cong11 ~]# kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
[root@cong12 ~]# kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
[root@cong13 ~]# kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
查看每个节点的kafka监听状态
2.2.7.3、 关闭kafka
如果需要关闭kafka可以执行下面的操作
[root@cong11 ~]# jps //查看kafka进程pid
[root@cong11 ~]# kill -9 50215
或者:
[root@cong11 ~]# kafka-server-stop.sh
2.2.8、 查看kafka进程
输入jps查看进程,如果可以看到Kafka进程,表示启动成功
[root@cong11 ~]# jps
50356 Jps
46567 QuorumPeerMain
50215 Kafka
[root@cong11 ~]# jps -m
46567 QuorumPeerMain /usr/local/apache-zookeeper-3.5.5-bin/bin/../conf/zoo.cfg
50215 Kafka /usr/local/kafka/config/server.properties
50380 Jps -m
可以看到我们的kafka和zookeeper的java进程
2.2.9、 Zookeeper+Kafka集群测试
创建一个topic,创建一个主题为test的topic
[root@cong11 kafka]# kafka-topics.sh --create --zookeeper cong11:2181,cong12:2181,cong13:2181 --replication-factor 2 --partitions 4 --topic test
Created topic test.
参数说明:
--zookeeper:指定kafka连接zk的连接url,该值和server.properties文件中的配置项{zookeeper.connect}一样
--replication-factor:指定副本数量
--partitions:指定分区数量
--topic:主题名称
2.2.10、 显示指定的topic
[root@cong11 kafka]# kafka-topics.sh --describe --zookeeper cong11:2181,cong12:2181,cong13:2181 --topic test
2.2.11、 列出指定的topic:
[root@cong11 kafka]# kafka-topics.sh --list --zookeeper cong11:2181,cong12:2181,cong13:2181 --topic test
2.2.12、 查看所有的topic信息
注:修改分区数
# kafka-topics.sh --zookeeper cong11:2181,cong12:2181,cong13:2181 --alter --topic test --partitions 6
2.2.13、 创建 producer(生产者)
在cong11节点上测试产生消息:
[root@cong11 kafka]# kafka-console-producer.sh --broker-list cong11:9092 --topic test
2.2.14、 创建 consumer(消费者)
在cong12节点上测试消费
[root@cong12 ~]# kafka-console-consumer.sh --bootstrap-server cong11:9092,cong12:9092,cong13:9092 --topic test --from-beginning
hello world
welcome to china
this is example
在cong13节点上测试消费
[root@cong13 ~]# kafka-console-consumer.sh --bootstrap-server cong11:9092,cong12:9092,cong13:9092 --topic test --from-beginning
hello world
welcome to china
this is example
可以在消费者下看见刚才输入的数据。
然后在 producer 里输入消息,consumer 中就会显示出同样的内容,表示消费成功
注意:--from-beginning参数如果有表示从最开始消费数据,旧的和新的数据都会被消费,而没有该参数表示只会消费新产生的数据
2.2.15、 删除 topic
[root@cong13 ~]# kafka-topics.sh --delete --zookeeper cong11:2181,cong12:2181,cong13:2181 --topic test
Topic test is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
至此Zookeeper+Kafka集群配置成功。
3、 Kafka 单节点多Broker(服务器)部署及使用(可以用于测试环境)
3.1、 部署架构
3.2、 安装kafka
参照单节点单Broker部署
3.2.1、 配置kafka配置文件
3.2.1.1、 生成配置文件
[root@cong11 ~]# cd /usr/local/kafka_2.12-3.8.0/config/
[root@cong11 config]# cp server.properties server-1.properties
[root@cong11 config]# cp server.properties server-2.properties
[root@cong11 config]# cp server.properties server-3.properties
3.2.1.2、 修改server-1.properties文件
[root@cong11 config]# vim server-1.properties #修改标红的部分
# broker的全局唯一编号,不能重复
broker.id=1
# 监听
listeners=PLAINTEXT://:9093
# 日志目录
log.dirs=/data/kafka/kafka-logs-1
3.2.1.3、 修改server-2.properties文件
[root@cong11 config]# vim server-2.properties #修改标红的部分
# broker的全局唯一编号,不能重复
broker.id=2
# 监听
listeners=PLAINTEXT://:9094
# 日志目录
log.dirs=/data/kafka/kafka-logs-2
3.2.1.4、 修改server-3.properties文件
[root@cong11 config]# vim server-3.properties #修改标红的部分
# broker的全局唯一编号,不能重复
broker.id=3
# 监听
listeners=PLAINTEXT://:9095
# 日志目录
log.dirs=/data/kafka/kafka-logs-3
3.2.1.5、 创建日志存放目录
[root@cong11 config]# mkdir -p /data/kafka/{kafka-logs-1,kafka-logs-2,kafka-logs-3}
3.2.2、 启动Zookeeper
[root@cong11 ~]# zkServer.sh start
3.2.3、 启动Kafka
启动3台kafka
[root@cong11 ~]# kafka-server-start.sh -daemon $KAFKA_HOME/config/server-1.properties
[root@cong11 ~]# kafka-server-start.sh -daemon $KAFKA_HOME/config/server-2.properties
[root@cong11 ~]# kafka-server-start.sh -daemon $KAFKA_HOME/config/server-3.properties
3.2.4、 查看进程
[root@cong11 ~]# jps
3.3、 创建topic(指定副本数量为3)
[root@cong11 ~]# kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
Created topic my-replicated-topic.
3.4、 查看所有的topic信息
[root@cong11 ~]# kafka-topics.sh --list --zookeeper localhost:2181
3.5、 查看某个topic的详细信息
[root@cong11 ~]# kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
leader:负责处理消息的读和写,leader是从所有节点中随机选择的.
replicas:列出了所有的副本节点,不管节点是否在服务中.
isr:是正在服务中的节点
3.6、 数据测试
3.6.1、 启动生产者
[root@cong11 ~]# kafka-console-producer.sh --broker-list localhost:9093 localhost:9094 localhost:9095 --topic my-replicated-topic
3.6.2、 启动消费者
[root@cong11 ~]# kafka-console-consumer.sh --bootstrap-server localhost:9094 --topic my-replicated-topic --from-beginning
3.6.3、 生产者生产数据
3.6.4、 消费者消费数据
[root@cong11 ~]# kafka-console-consumer.sh --bootstrap-server localhost:9094 --topic my-replicated-topic --from-beginning
注意:
如果消费不到数据,把第一台9092的服务打开
3.7、 单节点多borker容错性测试
Kafka是支持容错的,我们杀死leader,然后进行数据测试,测试集群故障转移
3.7.1、 查看集群leader
[root@cong11 ~]# kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
看到当前集群的leader是第2个节点
3.7.2、 杀死leader进程
[root@cong11 ~]# jps –m
[root@cong11 ~]# kill -9 2017
3.7.3、 查看当前集群状态
[root@cong11 ~]# kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
发现leader切换到第3个节点,第二个节点在活跃状态中消失
3.7.4、 测试数据
3.7.4.1、 启动生产者
[root@cong11 ~]# kafka-console-producer.sh --broker-list localhost:9093 localhost:9095 --topic my-replicated-topic
3.7.4.2、 启动消费者
[root@cong11 ~]# kafka-console-consumer.sh --bootstrap-server localhost:9093 localhost:9095 -topic my-replicated-topic
3.7.4.3、 生产数据
[root@cong11 ~]# kafka-console-producer.sh --broker-list localhost:9093 localhost:9095 --topic my-replicated-topic
>test
>kafka
3.7.4.4、 消费数据
[root@cong11 ~]# kafka-console-consumer.sh --bootstrap-server localhost:9093 localhost:9095 -topic my-replicated-topic
多节点部署跟单节点部署多Broker方法类似,只是把broker单独放在一个机器上。注意IP跟端口即可。跟redis和zookeeper多节点部署类似。