kafka消息在client是怎么写入的
学习背景
今天我们来学习一下KAFKA的消息如何发送的。kafka消息的发送可以分为从客户端写入和服务端写入两部分,今天先将第一部分,客户端怎么写入的。
KAFKA基本概念
在正式开始介绍kafka之前,先了解一下kafka内的基本概念,这些基本概念是接下来讲解发送流程的基础。
-
broker kafka实例:每台机器上可以部署一个或者多个kafka实例,一般一台机器一个,我们可以认为一个kafka实例代表一台实际的对外提供服务的kafka物理机。kafka集群内实例用编号区分。例如broker-0,broker-1
-
topic 主题:每条消息的归属依据。
-
partition 分区: 每个主题有多个partitioon,partition可以认为是对主题下消息的二次分类,不同分区之间的数据是不重复的。分区在物理存储上的表现形式就是一个个的文件夹。
-
replication 副本: 副本是分区的实际实现,每个分区都有多个副本,但是只有一个服务对外提供服务,我们称为leader副本,其他副本称为flower。副本的作用是用来做备份,做leader挂掉时follower之间选举出新的leader,follower和leader绝不在同一个broker上。副本的概念理解起来很复杂,这里大家可能听得云里雾里的,没关系副本机制可以说是kafka的核心。先放着,一会讲完flower之间选举机制就明白了。
-
producer 生产者: 向topic写入消息的程序。
-
message: 生产者实际发送的消息主体。
-
consumer 消费者:向topic订阅并消费消息的程序。
-
consumer group: 消费者组,划分consumer的依据,同一个组内的消费者能够消费同一个topic中的数据。
消息发送流程
客户端发送
让我们从一个例子来开始,一个简单的发送消息的例子。这里我定义了一个producer,并且自己实现了拦截器,序列化器,分区器。可能大家看到之类很陌生,因为一般不需要自定义这些组件,统统用默认的就可以了。这里我自定义这些组件仅仅为了做讲解。
从kafka的组件设计来讲,消息使用producer.send发送(当然还有异步发送,但是这不是重点)后,会顺序的经过拦截器,序列化器,分区器。三个组件的作用分别讲解一下。
消息最先抵达的是拦截器,拦截器的存在是为了在消息被发送之前做一些统一的修改或者统计,比如打印日志,统计每个topci发送的消息等等。从拦截器最后的返回结果我们可以看出来,这一步其实可以修改很多东西,甚至是消息发送的topic,partition都是可以更改的。当然一般不会有人这么干,因为这相当于篡改了原始消息内容。
拦截器之后经过的是序列化器,序列化器的目的是为了将消息序列化为字节数组,这里我自定义了一个和简单的序列化方式,直接序列化为JSON字符串再转换为字符数组。当然如果我们直接写入JSON格式字符串,那么使用默认的字符序列化器也是可以的。而且大部分情况下,我们应该尽量直接发送字符串,这样不至于每次变更消息格式都需要再改一遍序列化器的代码。
消息被序列化之后就到了分区器,分区器的作用只有一个,选择消息最重要发往那个分区。分区器一般也是不需要手动定义的,使用kafka默认的分区器就可以。当然,我们还可以在发送消息的位置,指定一个消息的分区,这样就会被发往我们想要发送的分区。或者我们指定一个消息的key,kafka则会根据key计算出一个hash值然后使用这个分区。这个特性非常有用,我们可以利用这个特点将很多相同"分类"的消息发送到同一个分区,比如消息里有个字段叫做orderId,那么我们根据orderId作为分区的key,就可以保证同一个订单的消息被发往同一个分区,这样如果订单的生命周期发生变化,那么生命周期变化的消息会被顺序的写入到分区内(这种场景高并发多线程写入其实还是会有问题,需要考虑状态机的流转和回退问题,但是可以通过拉长生命周期来避免)。如果不指定分区,也不指定消息key,kafka则会在各个分区之间轮训的写入消息。
到这里大家是不是以为就完了,这就算消息发送完毕了?当然不是,我们好药探究kafka在消息发送过程里到底做了那些事情。先看一幅图。
实际上,在创建的kafka producer内,有2个线程,一个是主线程,也就是我们执行send方法的线程,一个是sender线程。我们来看看这两个线程分别做了什么。
消息在经过分区器确定要发往那个分区之后,会被写入到record accumulator(消息累加器),并且会将若干条消息合并为一个record batch,每个record batch的大小由参数batch.size指定,默认16K。将多条消息合并是为了减少发送消息的次数,这样原本需要多次发送的消息只需要一次发送即可,当然这里还没实际执行发送的。每个record batch其实是放在一个双向队列Deque里的,消息在被发到消息收集器record accumulator后会找到自己所属的分区对应的deque,并判断需要将消息加入到队尾的record batch还是新建一个record batch。这里有个问题,如果消息超过了record batch的大小会怎么样?其实会按照消息大小来新建一个record batch,不过这个新建的record batch使用完毕就被回收了。而一般的record batch则会被复用。
消息是数据,需要实际的发送给物理节点broker,所以record accumulator里的record batch其实会被sender线程封装为request执行实际的发送。那么每个producer是怎么知道每个partition是实际的对应的那个物理节点呢,其实每个producer在启动时都会选择一个Node来查询整个集群的元数据(集群leader,flower,borker地址等信息),这里选择的Node是producer感知到的负载最小的Node,比如途中分区2对应的Node2,它的被发送的Request只有1个,客户端感知到Node2的负载是最小的,所以会向Node2去查询元数据。sender根据每个partition对应的broker信息后,将消息累加器里partition: record batch的映射关系转换为node: request的对应关系,并将数据实际的发送到对应的node里。为了提高发送速度,kafka并不会要求每个request都响应后才允许发送下下一个request,而是允许同时发送若干个request,通过参数max.in.flight.request.per.connection配置,默认是5(在某些顺序要求比较严格的场景下,这个值都是配成1的,比如mysql binlog)。到达这个值,则会阻塞向本Node发送的消息,直到收到Response,再清理对应的record accumulator里的record batch。
以上就是消息发送时,客户端做的事情,其实我们在消息发送时还可以设置很多参数,在这个流程里没有讲解,这里我列举一下
-
acks 控制消息在发送到leader毕后需要多少flower回复,0不等待任何回复,1只需要一个flower确认,-1必须所有flower确认。一般在消息可靠性保障等级比较高的场景下回设置为-1。
-
compression.type producer可以在消息发送之前使用压缩的方式将消息大小减少,更加能增加吞吐量,但是因为在客户端进行压缩,服务端解压缩又额外增加了延迟。默认none不执行压缩。
-
retries 控制消息发送失败后的重试次数,默认0,不进行任何重试。
-
linger.ms 每个record batch被发送出去时需要等待的时间,默认0,表示有可以发送的request窗口时立刻发送。
-
request.timeout.ms 请求超时时间。默认3000ms