深入浅出Kafka
这个主题 武哥漫谈IT ,作者骆俊武 讲得更好
一、什么是Kafka?
首先我们得去官网看看是怎么介绍Kafka的:
https://kafka.apache.org/intro
Apache Kafka is an open-source distributed event streaming platform.
翻译成中文就是:Apache Kafka 是一个开源的分布式流处理平台。
Kafka 不是一个消息系统吗?为什么被称为分布式的流处理平台呢?这两者是一回事吗?
一定有读者会有这样的疑问,要解释这个问题,需要先从 Kafka 的诞生背景说起。
Kafka 最开始其实是 Linkedin 内部孵化的项目,在设计之初是被当做「数据管道」,用于处理以下两种场景:
1、运营活动场景:记录用户的浏览、搜索、点击、活跃度等行为。
2、系统运维场景:监控服务器的 CPU、内存、请求耗时等性能指标。
在收集资料学习的时候,已经发现有不少的前辈对官网的介绍进行翻译和总结了,所以我这里就不重复了,贴下地址大家自行去学习啦:
https://scala.cool/2018/03/learning-kafka-1/
https://colobu.com/2014/08/06/kafka-quickstart/
所以从一开始,Kafka 就是为实时日志流而生的。了解了这个背景,就不难理解 Kafka 与流数据的关系了,以及 Kafka 为什么在大数据领域有如此广泛的应用?也是因为它最初就是为解决大数据的管道问题而诞生的。
接着再解释下:为什么 Kafka 被官方定义成流处理平台呢?它不就提供了一个数据通道能力吗,怎么还和平台扯上关系了?
这是因为 Kafka 从 0.8 版本开始,就已经在提供一些和数据处理有关的组件了,比如:
- 1、Kafka Streams:一个轻量化的流计算库,性质类似于 Spark、Flink。
- 2、Kafka Connect:一个数据同步工具,能将 Kafka 中的数据导入到关系数据库、Hadoop、搜索引擎中。
可见 Kafka 的野心不仅仅是一个消息系统,它早就在往「实时流处理平台」方向发展了。
这时候,再回来看 Kafka 的官网介绍提到的 3 种能力,也不难理解了:
- 1、数据的发布和订阅能力(消息队列)
- 2、数据的分布式存储能力(存储系统)
- 3、数据的实时处理能力(流处理引擎)
我之前写过的消息队列入门文章也提到了,要做一个消息队列可能要考虑到以下的问题:
使用消息队列不可能是单机的(必然是分布式or集群)
数据写到消息队列,可能会存在数据丢失问题,数据在消息队列需要持久化(磁盘?数据库?Redis?分布式文件系统?)
想要保证消息(数据)是有序的,怎么做?
为什么在消息队列中重复消费了数据
下面我以Kafka为例对这些问题进行简单的解答,进而入门Kafka。
Kafka入门
整体架构
topic
https://mp.weixin.qq.com/s?__biz=MzAwNDA2OTM1Ng==&mid=2453151488&idx=2&sn=d8bb690283528f04cc32c16312a1177d&chksm=8cfd3d83bb8ab4959b2a70ffd7d86aa2d15d2971d934411e646a1b85826e5941df4341e1a174&scene=21#wechat_redirect
要吃透一个MQ,建议从「消息模型」这种最核心的理论层面入手,而不是一上来就去看技术架构,更不要直接进入技术细节。
所谓消息模型,可以理解成一种逻辑结构,它是技术架构再往上的一层抽象,往往隐含了最核心的设计思想。
下面我们尝试分析下 Kafka 的消息模型,看看它究竟是如何演化来的?
首先,为了将一份消息数据分发给多个消费者,并且每个消费者都能收到全量的消息,很自然的想到了广播。
紧接着问题出现了:来一条消息,就广播给所有消费者,但并非每个消费者都想要全部的消息,比如消费者 A 只想要消息1、2、3,消费者 B 只想要消息4、5、6,这时候该怎么办呢?
这个问题的关键点在于:MQ 不理解消息的语义,它根本无法做到对消息进行分类投递。
此时,MQ 想到了一个很聪明的办法:它将难题直接抛给了生产者,要求生产者在发送消息时,对消息进行逻辑上的分类,因此就演进出了我们熟知的 Topic 以及发布-订阅模型。
这样,消费者只需要订阅自己感兴趣的 Topic,然后从 Topic 中获取消息即可。
但是这样做了之后,仍然存在一个问题:假如多个消费者都对同一个 Topic 感兴趣(如下图中的消费者 C),那又该如何解决呢?
如果采用传统的队列模式(单播),那当一个消费者从队列中取走消息后,这条消息就会被删除,另外一个消费者就拿不到了。
这个时候,很自然又想到下面的解决方案:
也就是:当 Topic 每增加一个新的消费者,就「复制」一个完全一样的数据队列。
这样问题是解决了,但是随着下游消费者数量变多,将引发 MQ 性能的快速退化。尤其对于 Kafka 来说,它在诞生之初就是处理大数据场景的,这种复制操作显然成本太高了。
这时候,就有了 Kafka 最画龙点睛的一个解法:它将所有消息进行了持久化存储,由消费者自己各取所需,想取哪个消息,想什么时候取都行,只需要传递一个消息的 offset 即可。
这样一个根本性改变,彻底将复杂的消费问题又转嫁给消费者了,这样使得 Kafka 本身的复杂度大大降低,从而为它的高性能和高扩展打下了良好的基础。(这是 Kafka 不同于 ActiveMQ 和 RabbitMQ 最核心的地方)
最后,简化一下,就是下面这张图:
这也间接解释了为什么官方会将 Kakfa 同时定义成存储系统的原因。
众所周知,Kafka是一个消息队列,把消息放到队列里边的叫生产者,从队列里边消费的叫消费者。
一个消息中间件,队列不单单只有一个,我们往往会有多个队列,而我们生产者和消费者就得知道:把数据丢给哪个队列,从哪个队列消息。我们需要给队列取名字,叫做topic(相当于数据库里边表的概念)
现在我们给队列取了名字以后,生产者就知道往哪个队列丢数据了,消费者也知道往哪个队列拿数据了。我们可以有多个生产者往同一个队列(topic)丢数据,多个消费者往同一个队列(topic)拿数据
分区 partition
下面我们再接着分析下:Kafka 究竟是如何解决存储问题的?
面对海量数据,单机的存储容量和读写性能肯定有限,大家很容易想到一种存储方案:对数据进行分片存储。这种方案在我们实际工作中也非常常见:
1、比如数据库设计中,当单表的数据量达到几千万或者上亿时,我们会将它拆分成多个库或者多张表。
2、比如缓存设计中,当单个 Redis 实例的数据量达到几十个 G 引发性能瓶颈时,我们会将单机架构改成分片集群架构。
类似的拆分思想在 HDFS、ElasticSearch 等中间件中都能看到。
Kafka 也不例外,它同样采用了这种水平拆分方案。在 Kafka 的术语中,拆分后的数据子集叫做 Partition(分区),各个分区的数据合集即全量数据。
我们再来看下 Kafka 中的 Partition 具体是如何工作的?举一个很形象的例子,如果我们把「Kafka」类比成「高速公路」:
1、当大家听到京广高速的时候,知道这是一条从北京到广州的高速路,这是逻辑上的叫法,可以理解成 Kafka 中的 Topic(主题)。
2、一条高速路通常会有多个车道进行分流,每个车道上的车都是通往一个目的地的(属于同一个Topic),这里所说的车道便是 Partition。
这样,一条消息的流转路径就如下图所示,先走主题路由,然后走分区路由,最终决定这条消息该发往哪个分区。
因此,Partition 是 Kafka 最基本的部署单元。本文之所以将 Partition 称作 Kafka 架构设计的任督二脉,基于下面两点原因:
1、Partition 是存储的关键所在,MQ「一发一存一消费」的核心流程必然围绕它展开。
2、Kafka 高并发设计中最难的三高问题都能和 Partition 关联起来。
因此,以 Partition 作为根,能很自然地联想出 Kafka 架构设计中的各个知识点,形成可靠的知识体系。
为了提高一个队列(topic)的吞吐量,Kafka会把topic进行分区(Partition)
所以,生产者实际上是往一个topic名为Java3y中的分区(Partition)丢数据,消费者实际上是往一个topic名为Java3y的分区(Partition)取数据
生产者集群
一台Kafka服务器叫做Broker,Kafka集群就是多台Kafka服务器:
一个topic会分为多个partition,实际上partition会分布在不同的broker中,举个例子:
由此得知:Kafka是天然分布式的。
高可用设计
按图上所示的情况,这里想要说明的是:
-
如果消费者组中的某个消费者挂了,那么其中一个消费者可能就要消费两个partition了
-
如果只有三个partition,而消费者组有4个消费者,那么一个消费者会空闲
-
如果多加入一个消费者组,无论是新增的消费者组还是原本的消费者组,都能消费topic的全部数据。(消费者组之间从逻辑上它们是独立的)
没错,Kafka 正是通过 Partition 的多副本机制解决了高可用问题。在 Kafka 集群中,每个 Partition 都有多个副本,同一分区的不同副本中保存的是相同的消息。
副本之间是 “一主多从” 的关系,其中 leader 副本负责读写请求,follower 副本只负责和 leader 副本同步消息,当 leader 副本发生故障时,它才有机会被选举成新的 leader 副本并对外提供服务,否则一直是待命状态。
现在,我假设 Kafka 集群中有 4 台服务器,主题 A 和主题 B 都有两个 Partition,且每个 Partition 各有两个副本,那最终的多副本架构将如下图所示:
现在我们已经知道了往topic里边丢数据,实际上这些数据会分到不同的partition上,这些partition存在不同的broker上。分布式肯定会带来问题:“万一其中一台broker(Kafka服务器)出现网络抖动或者挂了,怎么办?”
Kafka是这样做的:我们数据存在不同的partition上,那kafka就把这些partition做备份。比如,现在我们有三个partition,分别存在三台broker上。每个partition都会备份,这些备份散落在不同的broker上。
红色块的partition代表的是主分区,紫色的partition块代表的是备份分区。生产者往topic丢数据,是与主分区交互,消费者消费topic的数据,也是与主分区交互。
备份分区仅仅用作于备份,不做读写。如果某个Broker挂了,那就会选举出其他Broker的partition来作为主分区,这就实现了高可用。
另外值得一提的是:当生产者把数据丢进topic时,我们知道是写在partition上的,那partition是怎么将其持久化的呢?(不持久化如果Broker中途挂了,那肯定会丢数据嘛)。
- Kafka是将partition的数据写在磁盘的(消息日志),不过Kafka只允许追加写入(顺序访问),避免缓慢的随机 I/O 操作。
- Kafka也不是partition一有数据就立马将数据写到磁盘上,它会先缓存一部分,等到足够多数据量或等待一定的时间再批量写入(flush)。
消费者与消费者集群
上面balabala地都是讲生产者把数据丢进topic是怎么样的,下面来讲讲消费者是怎么消费的。既然数据是保存在partition中的,那么消费者实际上也是从partition中取数据。
从消费者来看,首先要满足两个基本诉求:
1、广播消费能力:同一个 Topic 可以被多个消费者订阅,一条消息能够被消费多次。
2、集群消费能力:当消费者本身也是集群时,每一条消息只能分发给集群中的一个消费者进行处理。
为了满足这两点要求,Kafka 引出了消费组的概念,每个消费者都有一个对应的消费组,组间进行广播消费,组内进行集群消费。此外,Kafka 还限定了:每个 Partition 只能由消费组中的一个消费者进行消费。
最终的消费关系如下图所示:假设主题 A 共有 4 个分区,消费组 2 只有两个消费者,最终这两个消费组将平分整个负载,各自消费两个分区的消息。
如果要加快消息的处理速度,该如何做呢?也很简单,向消费组 2 中增加新的消费者即可,Kafka 将以 Partition 为单位重新做负载均衡。当增加到 4 个消费者时,每个消费者仅需处理 1 个 Partition,处理速度将提升两倍。
到这里,存储可扩展、消息并行处理这两个难题都解决了。但是高并发架构设计上,还遗留了一个很重要的问题:那就是高可用设计(看前面生产者的章节)。
生产者可以有多个,消费者也可以有多个。像上面图的情况,是一个消费者消费三个分区的数据。多个消费者可以组成一个消费者组。
本来是一个消费者消费三个分区的,现在我们有消费者组,就可以每个消费者去消费一个分区(也是为了提高吞吐量)
其他——性能提升
前面讲解到了生产者往topic里丢数据是存在partition上的,而partition持久化到磁盘是IO顺序访问的,并且是先写缓存,隔一段时间或者数据量足够大的时候才批量写入磁盘的。
消费者在读的时候也很有讲究:正常的读磁盘数据是需要将内核态数据拷贝到用户态的,而Kafka 通过调用sendfile()直接从内核空间(DMA的)到内核空间(Socket的),少做了一步拷贝的操作。
有的同学可能会产生疑问:消费者是怎么知道自己消费到哪里的呀?Kafka不是支持回溯吗?那是怎么做的呀?
比如上面也提到:如果一个消费者组中的某个消费者挂了,那挂掉的消费者所消费的分区可能就由存活的消费者消费。那存活的消费者是需要知道挂掉的消费者消费到哪了,不然怎么玩。
这里要引出offset了,Kafka就是用offset来表示消费者的消费进度到哪了,每个消费者会都有自己的offset。说白了offset就是表示消费者的消费进度。
在以前版本的Kafka,这个offset是由Zookeeper来管理的,后来Kafka开发者认为Zookeeper不合适大量的删改操作,于是把offset在broker以内部topic(__consumer_offsets)的方式来保存起来。
每次消费者消费的时候,都会提交这个offset,Kafka可以让你选择是自动提交还是手动提交。
既然提到了Zookeeper,那就多说一句。Zookeeper虽然在新版的Kafka中没有用作于保存客户端的offset,但是Zookeeper是Kafka一个重要的依赖。它负责以下功能:
- 探测broker和consumer的添加或移除。
- 负责维护所有partition的领导者/从属者关系(主分区和备份分区),如果主分区挂了,需要选举出备份分区作为主分区。
- 维护topic、partition等元配置信息
- ….
最后小结一下
通过这篇文章,文章开头那几个问题估计多多少少都懂一些啦。我来简要回答一下:
使用消息队列不可能是单机的(必然是分布式or集群)
Kafka天然是分布式的,往一个topic丢数据,实际上就是往多个broker的partition存储数据
数据写到消息队列,可能会存在数据丢失问题,数据在消息队列需要持久化(磁盘?数据库?Redis?分布式文件系统?)
Kafka会将partition以消息日志的方式(落磁盘)存储起来,通过 顺序访问IO和缓存(等到一定的量或时间)才真正把数据写到磁盘上,来提高速度。
想要保证消息(数据)是有序的,怎么做?
Kafka会将数据写到partition,单个partition的写入是有顺序的。如果要保证全局有序,那只能写入一个partition中。如果要消费也有序,消费者也只能有一个。
为什么在消息队列中重复消费了数据
凡是分布式就无法避免网络抖动/机器宕机等问题的发生,很有可能消费者A读取了数据,还没来得及消费,就挂掉了。Zookeeper发现消费者A挂了,让消费者B去消费原本消费者A的分区,等消费者A重连的时候,发现已经重复消费同一条数据了。(各种各样的情况,消费者超时等等都有可能…)
如果业务上不允许重复消费的问题,最好消费者那端做业务上的校验(如果已经消费过了,就不消费了)
Kafka为什么这么快?
存储消息
顺序读写
完成一次磁盘 IO,需要经过寻道、旋转和数据传输三个步骤。数据传输时间通常远小于前两部分消耗时间(特别对不大的数据来说)简单计算时可忽略。
Kafka 选用的是「日志文件」来存储消息, 采用顺序写文件的方式来提高磁盘写入性能。顺序写文件,基本减少了磁盘寻道和旋转的次数。磁头再也不用在磁道上乱舞了,而是一路向前飞速前行。
Kafka 中每个分区是一个有序的,不可变的消息序列,新的消息不断追加到 Partition 的末尾,在 Kafka 中 Partition 只是一个逻辑概念,Kafka 将 Partition 划分为多个 Segment,每个 Segment 对应一个物理文件,Kafka 对 segment 文件追加写,这就是顺序写文件。
为什么 Kafka 可以使用追加写的方式呢?
这和 Kafka 的性质有关,我们来看看 Kafka 和 Redis,说白了,Kafka 就是一个Queue,而 Redis 就是一个HashMap。Queue和Map的区别是什么?
Queue 是 FIFO 的,数据是有序的;HashMap数据是无序的,是随机读写的。Kafka 的不可变性,有序性使得 Kafka 可以使用追加写的方式写文件。
其实很多符合以上特性的数据系统,都可以采用追加写的方式来优化磁盘性能。典型的有Redis的 AOF 文件,各种数据库的WAL(Write ahead log)机制等等。
more介绍
Kafka 来看,它的功能性需求至少包括以下几点:
1、存的数据主要是消息流:消息可以是最简单的文本字符串,也可以是自定义的复杂格式。
但是对于 Broker 来说,它只需处理好消息的投递即可,无需关注消息内容本身。
2、数据量级非常大:因为 Kafka 作为 Linkedin 的孵化项目诞生,用作实时日志流处理(运营活动中的埋点、运维监控指标等),按 Linkedin 当初的业务规模来看,每天要处理的消息量预计在千亿级规模。
3、CRUD 场景足够简单:因为消息队列最核心的功能就是数据管道,它仅提供转储能力,因此 CRUD 操作确实很简单。
首先,消息等同于通知事件,都是追加写入的,根本无需考虑 update。其次,对于 Consumer 端来说,Broker 提供按 offset(消费位移)或者 timestamp(时间戳)查询消息的能力就行。再次,长时间未消费的消息(比如 7 天前的),Broker 做好定期删除即可。
接着,我们再来看看非功能性需求:
1、性能要求:之前的文章交代过,Linkedin 最初尝试过用 ActiveMQ 来解决数据传输问题,但是性能无法满足要求,然后才决定自研 Kafka。ActiveMQ 的单机吞吐量大约是万级 TPS,Kafka 显然要比 ActiveMQ 的性能高一个量级才行。
2、稳定性要求:消息的持久化(确保机器重启后历史数据不丢失)、单台 Broker 宕机后如何快速故障转移继续对外提供服务,这两个能力也是 Kafka 必须要考虑的。
3、扩展性要求:Kafka 面对的是海量数据的存储问题,必然要考虑存储的扩展性。
再看数据存储领域,有两个 “极端” 发展方向:
1、加快读:通过索引( B+ 树、二份查找树等方式),提高查询速度,但是写入数据时要维护索引,因此会降低写入效率。
2、加快写:纯日志型,数据以 append 追加的方式顺序写入,不加索引,使得写入速度非常高(理论上可接近磁盘的写入速度),但是缺乏索引支持,因此查询性能低。
基于这两个极端,又衍生出来了 3 类最具代表性的底层索引结构:
1、哈希索引:通过哈希函数将 key 映射成数据的存储地址,适用于等值查询等简单场景,对于比较查询、范围查询等复杂场景无能为力。
2、B/B+ Tree 索引:最常见的索引类型,重点考虑的是读性能,它是很多传统关系型数据库,比如 MySQL、Oracle 的底层结构。
3、 LSM Tree 索引:数据以 Append 方式追加写入日志文件,优化了写但是又没显著降低读性能,众多 NoSQL 存储系统比如 BigTable,HBase,Cassandra,RocksDB 的底层结构。
PageCache
简单版
磁盘顺序写已经很快了,但是对比内存顺序写仍然慢了几个数量级,那有没有可能继续优化呢?答案是肯定的。
这里 Kafka 用到了 Page Cache 技术,简单理解就是:利用了操作系统本身的缓存技术,在读写磁盘日志文件时,其实操作的都是内存,然后由操作系统决定什么时候将 Page Cache 里的数据真正刷入磁盘。
通过下面这个示例图便一目了然。
那 Page Cache 究竟什么时候会发挥最大的威力呢?这又不得不提 Page Cache 所用到的两个经典原理。
Page Cache 缓存的是最近会被使用的磁盘数据,利用的是「时间局部性」原理,依据是:最近访问的数据很可能接下来再访问到。而预读到 Page Cache 中的磁盘数据,又利用了「空间局部性」原理,依据是:数据往往是连续访问的。
而 Kafka 作为消息队列,消息先是顺序写入,而且立马又会被消费者读取到,无疑非常契合上述两条局部性原理。因此,页缓存可以说是 Kafka 做到高吞吐的重要因素之一。
除此之外,页缓存还有一个巨大的优势。用过 Java 的人都知道:如果不用页缓存,而是用 JVM 进程中的缓存,对象的内存开销非常大(通常是真实数据大小的几倍甚至更多),此外还需要进行垃圾回收,GC 所带来的 Stop The World 问题也会带来性能问题。可见,页缓存确实优势明显,而且极大地简化了 Kafka 的代码实现。
复杂版
producer 生产消息到 Broker 时,Broker 会使用 pwrite() 系统调用【对应到 Java NIO 的 FileChannel.write() API】按偏移量写入数据,此时数据都会先写入page cache。consumer 消费消息时,Broker 使用 sendfile() 系统调用【对应 FileChannel.transferTo() API】,零拷贝地将数据从 page cache 传输到 broker 的 Socket buffer,再通过网络传输。
leader 与 follower 之间的同步,与上面 consumer 消费数据的过程是同理的。
page cache中的数据会随着内核中 flusher 线程的调度以及对 sync()/fsync() 的调用写回到磁盘,就算进程崩溃,也不用担心数据丢失。另外,如果 consumer 要消费的消息不在page cache里,才会去磁盘读取,并且会顺便预读出一些相邻的块放入 page cache,以方便下一次读取。
因此如果 Kafka producer 的生产速率与 consumer 的消费速率相差不大,那么就能几乎只靠对 broker page cache 的读写完成整个生产 - 消费过程,磁盘访问非常少。
IO 多路复用
Kafka 采用的是:很典型的 Reactor 网络通信模型,完整的网络通信层框架图如下所示:
通俗点记忆就是 1 + N + M:
- 1:表示 1 个 Acceptor 线程,负责监听新的连接,然后将新连接交给 Processor 线程处理。
- N:表示 N 个 Processor 线程,每个 Processor 都有自己的 selector,负责从 socket 中读写数据。
- M:表示 M 个 KafkaRequestHandler 业务处理线程,它通过调用 KafkaApis 进行业务处理,然后生成 response,再交由给 Processor 线程。
对于 IO 有所研究的同学,应该清楚:Reactor 模式正是采用了很经典的 IO 多路复用技术,它可以复用一个线程去处理大量的 Socket 连接,从而保证高性能。Netty 和 Redis 为什么能做到十万甚至百万并发?它们其实都采用了 Reactor 网络通信模型。
分区分段结构
简单版
磁盘顺序写加上页缓存很好地解决了日志文件的高性能读写问题。但是如果一个 Topic 只对应一个日志文件,显然只能存放在一台 Broker 机器上。
当面对海量消息时,单机的存储容量和读写性能肯定有限,这样又引出了又一个精妙的存储设计:对数据进行分区存储。
我在 Kafka 架构设计的任督二脉 一文中详细解释了分区(Partition)的概念和作用,它是 Kafka 并发处理的最小粒度,很好地解决了存储的扩展性问题。随着分区数的增加,Kafka 的吞吐量得以进一步提升。
其实在 Kafka 的存储底层,在分区之下还有一层:那便是「分段」。简单理解:分区对应的其实是文件夹,分段对应的才是真正的日志文件。
每个 Partition 又被分成了多个 Segment,那为什么有了 Partition 之后,还需要 Segment 呢?
如果不引入 Segment,一个 Partition 只对应一个文件,那这个文件会一直增大,势必造成单个 Partition 文件过大,查找和维护不方便。
此外,在做历史消息删除时,必然需要将文件前面的内容删除,只有一个文件显然不符合 Kafka 顺序写的思路。而在引入 Segment 后,则只需将旧的 Segment 文件删除即可,保证了每个 Segment 的顺序写。
详细版
分区并发
Kafka 的 Topic 可以分成多个 Partition,每个 Paritition 类似于一个队列,保证数据有序。同一个 Group 下的不同 Consumer 并发消费 Paritition,分区实际上是调优 Kafka 并行度的最小单元,因此,可以说,每增加一个 Paritition 就增加了一个消费并发。
Kafka 具有优秀的分区分配算法——StickyAssignor,可以保证分区的分配尽量地均衡,且每一次重分配的结果尽量与上一次分配结果保持一致。这样,整个集群的分区尽量地均衡,各个 Broker 和 Consumer 的处理不至于出现太大的倾斜。
65 哥:那是不是分区数越多越好呢?”
当然不是。
-
越多的分区需要打开更多的文件句柄
在 kafka 的 broker 中,每个分区都会对照着文件系统的一个目录。在 kafka 的数据日志文件目录中,每个日志数据段都会分配两个文件,一个索引文件和一个数据文件。因此,随着 partition 的增多,需要的文件句柄数急剧增加,必要时需要调整操作系统允许打开的文件句柄数。 -
客户端 / 服务器端需要使用的内存就越多
客户端 producer 有个参数 batch.size,默认是 16KB。它会为每个分区缓存消息,一旦满了就打包将消息批量发出。看上去这是个能够提升性能的设计。不过很显然,因为这个参数是分区级别的,如果分区数越多,这部分缓存所需的内存占用也会更多。 -
降低高可用性
分区越多,每个 Broker 上分配的分区也就越多,当一个发生 Broker 宕机,那么恢复时间将很长。
文件结构(提前提到了稀疏索引)
Kafka 消息是以 Topic 为单位进行归类,各个 Topic 之间是彼此独立的,互不影响。每个 Topic 又可以分为一个或多个分区。每个分区各自存在一个记录消息数据的日志文件。
Kafka 每个分区日志在物理上实际按大小被分成多个 Segment。
- segment file 组成:由 2 大部分组成,分别为 index file 和 data file,此 2 个文件一一对应,成对出现,后缀”.index”和“.log”分别表示为 segment 索引文件、数据文件。
- segment 文件命名规则:partion 全局的第一个 segment 从 0 开始,后续每个 segment 文件名为上一个 segment 文件最后一条消息的 offset 值。数值最大为 64 位 long 大小,19 位数字字符长度,没有数字用 0 填充。
index 采用稀疏索引,这样每个 index 文件大小有限,Kafka 采用mmap的方式,直接将 index 文件映射到内存,这样对 index 的操作就不需要操作磁盘 IO。mmap的 Java 实现对应 MappedByteBuffer 。
65 哥笔记:mmap 是一种内存映射文件的方法。即将一个文件或者其它对象映射到进程的地址空间,实现文件磁盘地址和进程虚拟地址空间中一段虚拟地址的一一对映关系。实现这样的映射关系后,进程就可以采用指针的方式读写操作这一段内存,而系统会自动回写脏页面到对应的文件磁盘上,即完成了对文件的操作而不必再调用 read,write 等系统调用函数。相反,内核空间对这段区域的修改也直接反映用户空间,从而可以实现不同进程间的文件共享。
Kafka 充分利用二分法来查找对应 offset 的消息位置:
按照二分法找到小于 offset 的 segment 的.log 和.index
用目标 offset 减去文件名中的 offset 得到消息在这个 segment 中的偏移量。
再次用二分法在 index 文件中找到对应的索引。
到 log 文件中,顺序查找,直到找到 offset 对应的消息。
生产消息
批量与压缩
简单版
Kafka 作为一个消息队列,很显然是一个 IO 密集型应用,它所面临的挑战除了磁盘 IO(Broker 端需要对消息持久化),还有网络 IO(Producer 到 Broker,Broker 到 Consumer,都需要通过网络进行消息传输)。
在上一篇文章已经指出过:磁盘顺序 IO 的速度其实非常快,不亚于内存随机读写。这样网络 IO 便成为了 Kafka 的性能瓶颈所在。
基于这个背景, Kafka 采用了批量发送消息的方式,通过将多条消息按照分区进行分组,然后每次发送一个消息集合,从而大大减少了网络传输的 overhead。
看似很平常的一个手段,其实它大大提升了 Kafka 的吞吐量,而且它的精妙之处远非如此,下面几条优化手段都和它息息相关。
消息压缩的目的是为了进一步减少网络传输带宽。而对于压缩算法来说,通常是:数据量越大,压缩效果才会越好。
因为有了批量发送这个前期,从而使得 Kafka 的消息压缩机制能真正发挥出它的威力(压缩的本质取决于多消息的重复性)。对比压缩单条消息,同时对多条消息进行压缩,能大幅减少数据量,从而更大程度提高网络传输率。
有文章对 Kafka 支持的三种压缩算法:gzip、snappy、lz4 进行了性能对比,测试 2 万条消息,效果如下:
整体来看,gzip 压缩效果最好,但是生成耗时更长,综合对比 lz4 性能最佳。
其实压缩消息不仅仅减少了网络 IO,它还大大降低了磁盘 IO。因为批量消息在持久化到 Broker 中的磁盘时,仍然保持的是压缩状态,最终是在 Consumer 端做了解压缩操作。
这种端到端的压缩设计,其实非常巧妙,它又大大提高了写磁盘的效率。
详细版
Kafka Producer 向 Broker 发送消息不是一条消息一条消息的发送。使用过 Kafka 的同学应该知道,Producer 有两个重要的参数:batch.size和linger.ms。这两个参数就和 Producer 的批量发送有关。
Kafka Producer 的执行流程如下图所示:
发送消息依次经过以下处理器:
- Serialize:键和值都根据传递的序列化器进行序列化。优秀的序列化方式可以提高网络传输的效率。
- Partition:决定将消息写入主题的哪个分区,默认情况下遵循 murmur2 算法。自定义分区程序也可以传递给生产者,以控制应将消息写入哪个分区。
- Compress:默认情况下,在 Kafka 生产者中不启用压缩.Compression 不仅可以更快地从生产者传输到代理,还可以在复制过程中进行更快的传输。压缩有助于提高吞吐量,降低延迟并提高磁盘利用率。
- Accumulate:Accumulate顾名思义,就是一个消息累计器。其内部为每个 Partition 维护一个Deque双端队列,队列保存将要发送的批次数据,Accumulate将数据累计到一定数量,或者在一定过期时间内,便将数据以批次的方式发送出去。记录被累积在主题每个分区的缓冲区中。根据生产者批次大小属性将记录分组。主题中的每个分区都有一个单独的累加器 / 缓冲区。
- Group Send:记录累积器中分区的批次按将它们发送到的代理分组。批处理中的记录基于 batch.size 和 linger.ms 属性发送到代理。记录由生产者根据两个条件发送。当达到定义的批次大小或达到定义的延迟时间时。
Kafka 支持多种压缩算法:lz4、snappy、gzip。Kafka 2.1.0 正式支持 ZStandard —— ZStandard 是 Facebook 开源的压缩算法,旨在提供超高的压缩比 (compression ratio),具体细节参见 zstd。
Producer、Broker 和 Consumer 使用相同的压缩算法,在 producer 向 Broker 写入数据,Consumer 向 Broker 读取数据时甚至可以不用解压缩,最终在 Consumer Poll 到消息时才解压,这样节省了大量的网络和磁盘开销。
高效序列化
Kafka 消息中的 Key 和 Value,都支持自定义类型,只需要提供相应的序列化和反序列化器即可。因此,用户可以根据实际情况选用快速且紧凑的序列化方式(比如 ProtoBuf、Avro)来减少实际的网络传输量以及磁盘存储量,进一步提高吞吐量。
内存池复用
前面说过 Producer 发送消息是批量的,因此消息都会先写入 Producer 的内存中进行缓冲,直到多条消息组成了一个 Batch,才会通过网络把 Batch 发给 Broker。
当这个 Batch 发送完毕后,显然这部分数据还会在 Producer 端的 JVM 内存中,由于不存在引用了,它是可以被 JVM 回收掉的。
但是大家都知道,JVM GC 时一定会存在 Stop The World 的过程,即使采用最先进的垃圾回收器,也势必会导致工作线程的短暂停顿,这对于 Kafka 这种高并发场景肯定会带来性能上的影响。
有了这个背景,便引出了 Kafka 非常优秀的内存池机制,它和连接池、线程池的本质一样,都是为了提高复用,减少频繁的创建和释放。
具体是如何实现的呢?其实很简单:Producer 一上来就会占用一个固定大小的内存块,比如 64MB,然后将 64 MB 划分成 M 个小内存块(比如一个小内存块大小是 16KB)。
当需要创建一个新的 Batch 时,直接从内存池中取出一个 16 KB 的内存块即可,然后往里面不断写入消息,但最大写入量就是 16 KB,接着将 Batch 发送给 Broker ,此时该内存块就可以还回到缓冲池中继续复用了,根本不涉及垃圾回收。最终整个流程如下图所示:
了解了 Producer 端上面 4 条高性能设计后,大家一定会有一个疑问:传统的数据库或者消息中间件都是想办法让 Client 端更轻量,将 Server 设计成重量级,仅让 Client 充当应用程序和 Server 之间的接口。
但是 Kafka 却反其道而行之,采取了独具一格的设计思路,在将消息发送给 Broker 之前,需要先在 Client 端完成大量的工作,例如:消息的分区路由、校验和的计算、压缩消息等。这样便很好地分摊 Broker 的计算压力。
可见,没有最好的设计,只有最合适的设计,这就是架构的本源。
消费消息
稀疏索引
如何提高读性能,大家很容易想到的是:索引。Kafka 所面临的查询场景其实很简单:能按照 offset 或者 timestamp 查到消息即可。
如果采用 B Tree 类的索引结构来实现,每次数据写入时都需要维护索引(属于随机 IO 操作),而且还会引来「页分裂」这种比较耗时的操作。而这些代价对于仅需要实现简单查询要求的 Kafka 来说,显得非常重。所以,B Tree 类的索引并不适用于 Kafka。
相反,哈希索引看起来却非常合适。为了加快读操作,如果只需要在内存中维护一个「从 offset 到日志文件偏移量」的映射关系即可,每次根据 offset 查找消息时,从哈希表中得到偏移量,再去读文件即可。(根据 timestamp 查消息也可以采用同样的思路)
但是哈希索引常驻内存,显然没法处理数据量很大的情况,Kafka 每秒可能会有高达几百万的消息写入,一定会将内存撑爆。
可我们发现消息的 offset 完全可以设计成有序的(实际上是一个单调递增 long 类型的字段),这样消息在日志文件中本身就是有序存放的了,我们便没必要为每个消息建 hash 索引了,完全可以将消息划分成若干个 block,只索引每个 block 第一条消息的 offset 即可,先根据大小关系找到 block,然后在 block 中顺序搜索,这便是 Kafka “稀疏索引” 的设计思想。
采用 “稀疏索引”,可以认为是在磁盘空间、内存空间、查找性能等多方面的一个折中。有了稀疏索引,当给定一个 offset 时,Kafka 采用的是二分查找来高效定位不大于 offset 的物理位移,然后找到目标消息。
mmap
利用稀疏索引,已经基本解决了高效查询的问题,但是这个过程中仍然有进一步的优化空间,那便是通过 mmap(memory mapped files) 读写上面提到的稀疏索引文件,进一步提高查询消息的速度。
注意:mmap 和 page cache 是两个概念,网上很多资料把它们混淆在一起。此外,还有资料谈到 Kafka 在读 log 文件时也用到了 mmap,通过对 2.8.0 版本的源码分析,这个信息也是错误的,其实只有索引文件的读写才用到了 mmap.
究竟如何理解 mmap?前面提到,常规的文件操作为了提高读写性能,使用了 Page Cache 机制,但是由于页缓存处在内核空间中,不能被用户进程直接寻址,所以读文件时还需要通过系统调用,将页缓存中的数据再次拷贝到用户空间中。
而采用 mmap 后,它将磁盘文件与进程虚拟地址做了映射,并不会招致系统调用,以及额外的内存 copy 开销,从而提高了文件读取效率。
关于 mmap,好友小风哥写过一篇很通俗的文章: mmap 可以让程序员解锁哪些骚操作?(https://mp.weixin.qq.com/s?__biz=Mzg4OTYzODM4Mw==&mid=2247486269&idx=1&sn=4e6cfd17ddaf5eff6c0dc6d2758573ab&scene=21#wechat_redirect)大家可以参考。
具体到 Kafka 的源码层面,就是基于 JDK nio 包下的 MappedByteBuffer 的 map 函数,将磁盘文件映射到内存中。
至于为什么 log 文件不采用 mmap?其实是一个特别好的问题,这个问题社区并没有给出官方答案,网上的答案只能揣测作者的意图。个人比较认同 stackoverflow 上的这个答案:
mmap 有多少字节可以映射到内存中与地址空间有关,32 位的体系结构只能处理 4GB 甚至更小的文件。Kafka 日志通常足够大,可能一次只能映射部分,因此读取它们将变得非常复杂。然而,索引文件是稀疏的,它们相对较小。将它们映射到内存中可以加快查找过程,这是内存映射文件提供的主要好处。
零拷贝
消息借助稀疏索引被查询到后,下一步便是:将消息从磁盘文件中读出来,然后通过网卡发给消费者,那这一步又可以怎么优化呢?
Kafka 用到了零拷贝(Zero-Copy)技术来提升性能。所谓的零拷贝是指数据直接从磁盘文件复制到网卡设备,而无需经过应用程序,减少了内核和用户模式之间的上下文切换。
下面这个过程是不采用零拷贝技术时,从磁盘中读取文件然后通过网卡发送出去的流程,可以看到:经历了 4 次拷贝,4 次上下文切换。
如果采用传统的 IO 流程,先读取网络 IO,再写入磁盘 IO,实际需要将数据 Copy 四次。
第一次:读取磁盘文件到操作系统内核缓冲区;
第二次:将内核缓冲区的数据,copy 到应用程序的 buffer;
第三步:将应用程序 buffer 中的数据,copy 到 socket 网络发送缓冲区;
第四次:将 socket buffer 的数据,copy 到网卡,由网卡进行网络传输。
零拷贝就是尽量去减少上面数据的拷贝次数,从而减少拷贝的 CPU 开销,减少用户态内核态的上下文切换次数,从而优化数据传输的性能。
常见的零拷贝思路主要有三种:
直接 I/O:数据直接跨过内核,在用户地址空间与 I/O 设备之间传递,内核只是进行必要的虚拟存储配置等辅助工作;
避免内核和用户空间之间的数据拷贝:当应用程序不需要对数据进行访问时,则可以避免将数据从内核空间拷贝到用户空间;
写时复制:数据不需要提前拷贝,而是当需要修改的时候再进行部分拷贝。
Kafka 使用到了 mmap 和 sendfile 的方式来实现零拷贝。分别对应 Java 的 MappedByteBuffer 和 FileChannel.transferTo。
批量拉取
和生产者批量发送消息类似,消息者也是批量拉取消息的,每次拉取一个消息集合,从而大大减少了网络传输的 overhead。
另外,在 Kafka 精妙的高性能设计(上篇) 中介绍过,生产者其实在 Client 端对批量消息进行了压缩,这批消息持久化到 Broker 时,仍然保持的是压缩状态,最终在 Consumer 端再做解压缩操作。
网络模型(和前面的有重叠)
“65 哥:网络嘛,作为 Java 程序员,自然是 Netty”
是的,Netty 是 JVM 领域一个优秀的网络框架,提供了高性能的网络服务。大多数 Java 程序员提到网络框架,首先想到的就是 Netty。Dubbo、Avro-RPC 等等优秀的框架都使用 Netty 作为底层的网络通信框架。
Kafka 自己实现了网络模型做 RPC。底层基于 Java NIO,采用和 Netty 一样的 Reactor 线程模型。
Reacotr 模型主要分为三个角色
Reactor:把 IO 事件分配给对应的 handler 处理
Acceptor:处理客户端连接事件
Handler:处理非阻塞的任务
在传统阻塞 IO 模型中,每个连接都需要独立线程处理,当并发数大时,创建线程数多,占用资源;采用阻塞 IO 模型,连接建立后,若当前线程没有数据可读,线程会阻塞在读操作上,造成资源浪费
针对传统阻塞 IO 模型的两个问题,Reactor 模型基于池化思想,避免为每个连接创建线程,连接完成后将业务处理交给线程池处理;基于 IO 复用模型,多个连接共用同一个阻塞对象,不用等待所有的连接。遍历到有新数据可以处理时,操作系统会通知程序,线程跳出阻塞状态,进行业务逻辑处理
Kafka 即基于 Reactor 模型实现了多路复用和处理线程池。其设计如下:
其中包含了一个Acceptor线程,用于处理新的连接,Acceptor 有 N 个 Processor 线程 select 和 read socket 请求,N 个 Handler 线程处理请求并相应,即处理业务逻辑。
I/O 多路复用可以通过把多个 I/O 的阻塞复用到同一个 select 的阻塞上,从而使得系统在单线程的情况下可以同时处理多个客户端请求。它的最大优势是系统开销小,并且不需要创建新的进程或者线程,降低了系统的资源开销。
总结: Kafka Broker 的 KafkaServer 设计是一个优秀的网络架构,有想了解 Java 网络编程,或需要使用到这方面技术的同学不妨去读一读源码。后续『码哥』的 Kafka 系列文章也将涉及这块源码的解读。