当前位置: 首页 > article >正文

Kafka--关于broker的夺命连环问

目录

1、zk在kafka集群中有何作用 

2、简述kafka集群中的Leader选举机制

3、kafka是如何处理数据乱序问题的。

 4、kafka中节点如何服役和退役

4.1 服役新节点

1)新节点准备

2)执行负载均衡操作

4.2 退役旧节点

5、Kafka中Leader挂了,Follower挂了,然后再启动,数据如何同步?  

6、kafka中初始化的时候Leader选举有一定的规律,如何打破这个规律呢? 

        7、kafka是如何做到高效读写

        8、Kafka集群中数据的存储是按照什么方式存储的?

        9、简述kafka中的数据清理策略。

        10、kafka中是如何快速定位到一个offset的。


1、zk在kafka集群中有何作用 


在zookeeper中存储着kafka集群中的相关信息:

(1)kafka/brokers/ids:{0,1,2} 记录集群中有哪些服务器

(2)kafka/brokers/topics/first/partition/0/state:{Leader:0,isr:{0,1,2}} 记录集群中不同分区谁是Leader以及可用的服务器

(3)kafka/controller {"borkerid:0"}辅助Leader的选举

图解: 

2、简述kafka集群中的Leader选举机制


总结: 

        集群启动后,集群中的broker会在zk中进行注册,谁先进行注册,那么先注册的broker中的Controller就会被选举为Controller Leader,负责监听broker节点的变化以及Leader的选举,随后controller将节点信息上传到zk中,其他的controller从zk中同步相关信息。当broker中的Leader挂掉时,controller会监听到节点的变化,从而获取isr,从isr存活的副本中,按照AR中的排序优先选择靠前的副本成为新的Leader,并及时更新ISR。

图解: 

3、kafka是如何处理数据乱序问题的。

 开启幂等性

 4、kafka中节点如何服役和退役

服役:执行负载均衡操作(1.创建均衡主题 2.生成副本均衡计划(设置--broker-list "0,1,2,3") 3.创建副本存储计划(将未来分区策略放在一个json文件中) 4.执行副本存储计划(运行json文件))

退役:同上 在第二步2.生成副本均衡计划中设置--broker-list "0,1,2"

4.1 服役新节点

1)新节点准备

(1)关闭 bigdata03,进行一个快照,并右键执行克隆操作。

(2)开启 bigdata04,并修改 IP 地址。

vi /etc/sysconfig/network-scripts/ifcfg-ens33

修改完记得重启网卡:
systemctl restart network

(3)在 bigdata04 上,修改主机名称为 bigdata04。

hostname bigdata04    # 临时修改

[root@bigdata04 ~]# vim /etc/hostname

bigdata04

还要记得修改 /etc/hosts文件,并进行同步

修改bigdata01的hosts 文件,修改完之后,记得同步一下

192.168.52.11 bigdata01
192.168.52.12 bigdata03
192.168.52.13 bigdata02
192.168.52.14 bigdata04

xsync.sh /etc/hosts
scp -r /etc/hosts root@bigdata04:/etc/

(4)重新启动 bigdata03、bigdata04。

(5)修改 bigdata04 中 kafka 的 broker.id 为 3。

进入bigdata04的kafka中,修改里面的配置文件   config/server.properties

(6)删除 bigdata04 中 kafka 下的 datas 和 logs。

rm -rf datas/* logs/*

(7)启动 bigdata01、bigdata02、bigdata03 上的 kafka 集群。

先启动zk集群

xcall.sh zkServer.sh stop
xcall.sh zkServer.sh start

启动kafka集群(只能启动三台)

kf.sh start 

(8)单独启动 bigdata04 中的 kafka。

bin/kafka-server-start.sh -daemon ./config/server.properties

查看kafka集群first主题的详情:

bin/kafka-topics.sh --bootstrap-server bigdata01:9092 --topic first --describe

发现副本数并没有增加。

由于我之前创建first这个主题的时候只有一个副本,不是三个副本,所以呢,演示效果不佳。

kafka-topics.sh --bootstrap-server bigdata01:9092 --topic third --create --partitions 3 --replication-factor 3

2)执行负载均衡操作

(1)创建一个要均衡的主题

创建一个文件:vi topics-to-move.json
写上如下代码,如果多个topic 可以使用,分隔
{
  "topics": [
    {"topic": "third"}
  ],
  "version": 1
}

(2)生成一个负载均衡的计划

在创建的时候,记得启动bigdata04节点,否则计划中还是没有bigdata04

bin/kafka-reassign-partitions.sh --bootstrap-server bigdata01:9092 --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2,3" --generate

未来的分区策略拷贝一份:

{"version":1,"partitions":[{"topic":"abc","partition":0,"replicas":[2,0,1],"log_dirs":["any","any","any"]},{"topic":"abc","partition":1,"replicas":[3,1,2],"log_dirs":["any","any","any"]},{"topic":"abc","partition":2,"replicas":[0,2,3],"log_dirs":["any","any","any"]}]}

(3)创建副本存储计划(所有副本存储在 broker0、broker1、broker2、broker3 中)。

vi increase-replication-factor.json
{"version":1,"partitions":[{"topic":"first","partition":0,"replicas":[3,2,0],"log_dirs":["any","any","any"]},{"topic":"first","partition":1,"replicas":[0,3,1],"log_dirs":["any","any","any"]},{"topic":"first","partition":2,"replicas":[1,0,2],"log_dirs":["any","any","any"]}]}
以上这个内容来自于第二步的执行计划。

(4)执行副本存储计划。

bin/kafka-reassign-partitions.sh --bootstrap-server bigdata01:9092 --reassignment-json-file increase-replication-factor.json --execute

(5)验证副本存储计划。

bin/kafka-reassign-partitions.sh --bootstrap-server bigdata01:9092 --reassignment-json-file increase-replication-factor.json --verify

如果不相信添加成功,可以查看first节点的详情:

4.2 退役旧节点

1)执行负载均衡操作

先按照退役一台节点,生成执行计划,然后按照服役时操作流程执行负载均衡。

(1)创建一个要均衡的主题

kafka下添加文件:vim topics-to-move.json
添加如下内容:
{
 "topics": [
 {"topic": "abc"}
 ],
 "version": 1
}

(2)创建执行计划。

bin/kafka-reassign-partitions.sh --bootstrap-server bigdata01:9092 --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2" --generate

(3)创建副本存储计划(所有副本存储在 broker0、broker1、broker2 中)。

添加文件: vi increase-replication-factor.json 
添加如下代码:
{"version":1,"partitions":[{"topic":"first","partition":0,"replicas":[0,2,1],"log_dirs":["any","any","any"]},{"topic":"first","partition":1,"replicas":[1,0,2],"log_dirs":["any","any","any"]},{"topic":"first","partition":2,"replicas":[2,1,0],"log_dirs":["any","any","any"]}]}

(4)执行副本存储计划

bin/kafka-reassign-partitions.sh --bootstrap-server bigdata01:9092 --reassignment-json-file increase-replication-factor.json --execute

(5)验证副本存储计划。

bin/kafka-reassign-partitions.sh --bootstrap-server hadoop11:9092 --reassignment-json-file increase-replication-factor.json --verify

2)执行停止命令

在 bigdata04上执行停止命令即可。

bin/kafka-server-stop.sh

5、Kafka中Leader挂了,Follower挂了,然后再启动,数据如何同步?  

总结: 

follower故障:

        当follower发生故障时,会被临时踢出ISR队列,此时的Leader和其他的follower会继续接收信息。当follower恢复后,会读取磁盘中记录的上次的HW,并将logs文件中高于HW的部分全部截掉,从HW开始从Leader中同步信息。当该follower中的LEO大于等于partition中的HW时,又重新加入ISR队列。

Leader故障:

        Leader故障后,会从ISR中重新选举出新的Leader。为了保证数据的一致性,follower会从logs文件中把高于Leader的部分全部截掉,然后重新从Leader中同步信息。

详解: 

LEO演示-- 每一个副本最后的偏移量offset + 1

HW(高水位线 High Water) 演示:所有副本中,最小的LEO

        由于数据同步的时候先进入Leader,随后同步给Follower,假如Follower挂掉了,Leader和其他的Follower 继续往前存储数据,挂掉的节点从ISR集合中剔除,此时挂掉的Follower又重启了,它会先从上一次挂掉的节点的HW开始同步数据,直到追上最后一个Follower为止,此时会重新回归ISR。

6、kafka中初始化的时候Leader选举有一定的规律,如何打破这个规律呢? 

  简述:可以通过创建副本计划(编写json文件),指定副本在broker中的存储位置,然后执行副本计划(运行json文件),打破Leader的选举规律。 

手动调整分区副本存储的步骤如下:

(1)创建一个新的 topic,名称为 three。

bin/kafka-topics.sh --bootstrap-server bigdata01:9092 --create --partitions 4 --replication-factor 2 --topic three

(2)查看分区副本存储情况。

bin/kafka-topics.sh --bootstrap-server bigdata01:9092 --describe --topic three

(3)创建副本存储计划(所有副本都指定存储在 broker0、broker1 中)。

vi increase-replication-factor.json

输入如下内容:

{ 
  "version":1, 
  "partitions":[{"topic":"three","partition":0,"replicas":[0,1]}, 
    {"topic":"three","partition":1,"replicas":[0,1]}, 
    {"topic":"three","partition":2,"replicas":[1,0]}, 
    {"topic":"three","partition":3,"replicas":[1,0]}] 
}

(4)执行副本存储计划。

屁股决定脑袋

bin/kafka-reassign-partitions.sh --bootstrap-server bigdata01:9092 --reassignment-json-file increase-replication-factor.json --execute

(5)验证副本存储计划。

bin/kafka-reassign-partitions.sh -- bootstrap-server bigdata01:9092 --reassignment-json-file increase-replication-factor.json --verify

(6)查看分区副本存储情况

bin/kafka-topics.sh --bootstrap-server bigdata01:9092 --describe --topic three

7、kafka是如何做到高效读写

1)Kafka 本身是分布式集群,可以采用分区技术,并行度高

2)读数据采用稀疏索引,可以快速定位要消费的数据。(mysql中索引多了之后,写入速度就慢了)

3)顺序写磁盘

Kafka 的 producer 生产数据,要写入到 log 文件中,写的过程是一直追加到文件末端, 为顺序写。官网有数据表明,同样的磁盘,顺序写能到 600M/s,而随机写只有 100K/s。这与磁盘的机械机构有关,顺序写之所以快,是因为其省去了大量磁头寻址的时间。

4)页缓存 + 零拷贝技术

零拷贝:Kafka的数据加工处理操作交由Kafka生产者和Kafka消费者处理。Kafka Broker应用层不关心存储的数据,所以就不用 走应用层,传输效率高

PageCache页缓存:Kafka重度依赖底层操作系统提供的PageCache功 能。当上层有写操作时,操作系统只是将数据写入 PageCache。当读操作发生时,先从PageCache中查找,如果找不到,再去磁盘中读取。实际上PageCache是把尽可能多的空闲内存都当做了磁盘缓存来使用

8、Kafka集群中数据的存储是按照什么方式存储的?

总结: 

        Topic是逻辑概念,partition是物理概念,每一partition对应一个log(日志文件),该log文件中存储的就是producer生产的数据。Prodecer生产的数据会被不断地追加写入到log文件的末尾,为了防止log文件过大而导致数据定位效率低下,kafka采取了分片和索引的机制,将每个partition分为多个segment。每个segment包括:".index"文件、".log"文件、".timeindex"文件,这些文件位于一个文件夹下面,该文件夹的明明规则为:topic名称+分区序号。

 图解:

9、简述kafka中的数据清理策略。

1)delete 日志删除:将过期数据删除

log.cleanup.policy = delete 所有数据启用删除策略

(1)基于时间:默认打开。以 segment 中所有记录中的最大时间戳作为该文件时间戳。

(2)基于大小:默认关闭。超过设置的所有日志总大小,删除最早的 segment。

log.retention.bytes,默认等于-1,表示无穷大。

思考:如果一个 segment 中有一部分数据过期,一部分没有过期,怎么处理?

2)compact 日志压缩(合并的意思,不是真的压缩)

compact日志压缩:对于相同key的不同value值,只保留最后一个版本。

log.cleanup.policy = compact 所有数据启用压缩策略

压缩后的offset可能是不连续的,比如上图中没有6,当从这些offset消费消息时,将会拿到比这个offset大的offset对应的消息,实际上会拿到offset为7的消息,并从这个位置开始消费。

这种策略只适合特殊场景,比如消息的key是用户ID,value是用户的资料,通过这种压缩策略,整个消息集里就保存了所有用户最新的资料。

比如:张三 去年18岁,今年19岁,这种场景下可以进行压缩。

10、kafka中是如何快速定位到一个offset的。

总结: 

        首先根据目标offset定位segment文件,找到小于等于目标offset的最大offset对应的索引,定位到log文件,向下遍历找到目标record。

图解: 

 


http://www.kler.cn/a/392809.html

相关文章:

  • Android Framework AMS(16)进程管理
  • Springboot集成syslog+logstash收集日志到ES
  • 深度学习——权重初始化、评估指标、梯度消失和梯度爆炸
  • 定时器(QTimer)与随机数生成器(QRandomGenerator)的应用实践——Qt(C++)
  • 使用支付宝沙箱完成商品下单
  • 【蓝桥等考C++真题】蓝桥杯等级考试C++组第13级L13真题原题(含答案)-最大的数
  • 半导体企业如何利用 Jira 应对复杂商业变局?
  • C++进阶-->封装map和set
  • deeponet作者相关三篇论文链接(理论基础、实用拓展、外推)
  • lmod安装和使用
  • 12 go语言(golang) - 数据类型:接口
  • C++ 优先算法 —— 四数之和(双指针)
  • 二、深度学习_基本概念笔记
  • UVC 输出视频格式修改和windows下数据分析
  • web实验3:虚拟主机基于不同端口、目录、IP、域名访问不同页面
  • Java学生管理系统(GUI和数据库)
  • vue3中查找字典列表中某个元素的值
  • 阅读《当代反无人机系统技术综述》笔记
  • Django 外键引用另一个表中的多个字段
  • Linux文件目录命令
  • 歌尔嵌入式面试题及参考答案
  • Python的装饰器
  • 什么是MVC模式?
  • python爬虫获得淘宝商品类目 API 返回值说明
  • 深入理解 Spark 中的 Shuffle
  • 不同规模的企业需要部署哪种组网?