Kafka Controller Election
一、Context
In the post "Kafka Broker Coarse-Grained Startup Flow" we walked through the broker's overall startup sequence. At that point no broker is the controller yet; below we look at how the controller gets elected.
二、Setting up ZooKeeper
ZooKeeper is an open-source distributed coordination service, mainly used to coordinate and manage the nodes of a distributed system. Kafka's controller election relies on it as well.
override def startup(): Unit = {
//....
initZkClient(time)
configRepository = new ZkConfigRepository(new AdminZkClient(zkClient))
//....
}
private def initZkClient(time: Time): Unit = {
//config.zkConnect
//zookeeper.connect=hostname1:2181,hostname2:2181,hostname3:2181/kafka
info(s"Connecting to zookeeper on ${config.zkConnect}")
_zkClient = KafkaZkClient.createZkClient("Kafka server", time, config, zkClientConfig)
//Pre-create the top-level paths in ZK if needed
_zkClient.createTopLevelPaths()
}
def createTopLevelPaths(): Unit = {
//Create the persistent ZK paths
ZkData.PersistentZkPaths.foreach(makeSurePersistentPathExists)
}
//Make sure the persistent path exists in ZK
def makeSurePersistentPathExists(path: String): Unit = {
createRecursive(path, data = null, throwIfPathExists = false)
}
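createRecursive walks the path from the root and creates any missing segment, swallowing "node already exists" errors so the call is idempotent. Below is a minimal sketch of that idea against the raw ZooKeeper client; this is not Kafka's actual implementation, and the object and method names are illustrative only:
import org.apache.zookeeper.{CreateMode, ZooDefs, ZooKeeper}
import org.apache.zookeeper.KeeperException.NodeExistsException

object ZkPathUtil {
  //Create every segment of the path as a PERSISTENT znode, ignoring segments that already exist
  def ensurePersistentPath(zk: ZooKeeper, path: String): Unit = {
    var current = ""
    path.split("/").filter(_.nonEmpty).foreach { segment =>
      current = s"$current/$segment"
      try zk.create(current, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)
      catch { case _: NodeExistsException => } //already there, nothing to do
    }
  }
}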
1、KafkaZkClient
KafkaZkClient provides higher-level, Kafka-specific operations on top of kafka.zookeeper.ZooKeeperClient.
Implementation note: this class includes methods for various components (Controller, Configs, Old Consumer, etc.) and in some cases returns instances of classes from the calling packages.
def createZkClient(name: String, time: Time, config: KafkaConfig, zkClientConfig: ZKClientConfig): KafkaZkClient = {
//...
KafkaZkClient(...)
}
2、AdminZkClient
It provides admin-related methods for interacting with ZooKeeper.
class AdminZkClient(...){
//Create a topic
def createTopic(...){...}
//Fetch broker metadata
def getBrokerMetadatas(...){...}
//Create a topic and optionally validate its parameters. Note that this method is also used by TopicCommand.
def createTopicWithAssignment(...){...}
//Validate topic creation parameters
def validateTopicCreate(...){...}
//Delete a topic
//Creates the delete path for the given topic
def deleteTopic(...){...}
//Add partitions to an existing topic with an optional replica assignment. Note that this method is used by TopicCommand.
def addPartitions(...){...}
//Parse a broker entity name into an integer broker id
def parseBroker(...){...}
//.....
}
3、ZkConfigRepository
The ZooKeeper-backed config repository, i.e. Kafka's configuration data stored in ZooKeeper.
class ZkConfigRepository(adminZkClient: AdminZkClient) extends ConfigRepository {
override def config(configResource: ConfigResource): Properties = {
//....
//Read the data from the corresponding ZooKeeper path and wrap it into an entity (topic, broker, client-id, user, user/clients/client-id, ip)
adminZkClient.fetchEntityConfig(configTypeForZk, effectiveName)
}
}
4、ZkData
object ZkData {
//These are the persistent ZK paths that should exist on kafka broker startup
val PersistentZkPaths: Seq[String] = Seq(
ConsumerPathZNode.path, // old consumer path
BrokerIdsZNode.path,
TopicsZNode.path,
ConfigEntityChangeNotificationZNode.path,
DeleteTopicsZNode.path,
BrokerSequenceIdZNode.path,
IsrChangeNotificationZNode.path,
ProducerIdBlockZNode.path,
LogDirEventNotificationZNode.path
) ++ ConfigType.ALL.asScala.map(ConfigEntityTypeZNode.path)
}
//Path of the old consumer in ZK
object ConsumerPathZNode {
def path = "/consumers"
}
object BrokerIdsZNode {
def path = s"${BrokersZNode.path}/ids"
def encode: Array[Byte] = null
}
object TopicsZNode {
def path = s"${BrokersZNode.path}/topics"
}
object ConfigEntityChangeNotificationZNode {
def path = s"${ConfigZNode.path}/changes"
}
object DeleteTopicsZNode {
def path = s"${AdminZNode.path}/delete_topics"
}
object BrokerSequenceIdZNode {
def path = s"${BrokersZNode.path}/seqid"
}
object IsrChangeNotificationZNode {
def path = "/isr_change_notification"
}
object ProducerIdBlockZNode {
val CurrentVersion: Long = 1L
def path = "/latest_producer_id_block"
def generateProducerIdBlockJson(producerIdBlock: ProducerIdsBlock): Array[Byte] = {
Json.encodeAsBytes(Map("version" -> CurrentVersion,
"broker" -> producerIdBlock.assignedBrokerId,
"block_start" -> producerIdBlock.firstProducerId.toString,
"block_end" -> producerIdBlock.lastProducerId.toString).asJava
)
}
}
object LogDirEventNotificationZNode {
def path = "/log_dir_event_notification"
}
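Putting ZkData together: after createTopLevelPaths() the chroot (e.g. /kafka from zookeeper.connect) contains roughly the following persistent nodes. The /config/{type} entries come from ConfigType.ALL and may differ slightly between Kafka versions:
/consumers
/brokers/ids
/brokers/topics
/brokers/seqid
/config/changes
/config/topics
/config/clients
/config/users
/config/brokers
/config/ips
/admin/delete_topics
/isr_change_notification
/latest_producer_id_block
/log_dir_event_notification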
三、Verifying that the metadata properties ensemble is valid
1、Is cluster.id always set in the meta.properties file?
2、Is node.id or broker.id always set in the meta.properties file?
initialMetaPropsEnsemble.verify(Optional.of(_clusterId), verificationId, verificationFlags)
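For reference, a ZooKeeper-mode broker writes a meta.properties file into each log directory; its content looks roughly like the following, where the cluster.id value is made up for illustration:
# <log.dir>/meta.properties
version=0
broker.id=1
cluster.id=lrNBlxWbTVGTGNkXAExample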
四、Dynamic broker config initialization
Dynamic broker configs are stored in ZooKeeper and can be defined at two levels:
1、Per-broker configs are persisted at /configs/brokers/{brokerId}; these can be described/altered via AdminClient using the resource name brokerId.
2、Cluster-wide defaults are persisted at /configs/brokers/<default>; these can be described/altered via AdminClient using an empty resource name.
Broker config precedence, from highest to lowest:
1、DYNAMIC_BROKER_CONFIG: stored in ZK at /configs/brokers/{brokerId}
2、DYNAMIC_DEFAULT_BROKER_CONFIG: stored in ZK at /configs/brokers/<default>
3、STATIC_BROKER_CONFIG: the properties the broker was started with, typically from the server.properties file
4、DEFAULT_CONFIG: the defaults defined in KafkaConfig
config.dynamicConfig.initialize(Some(zkClient), clientMetricsReceiverPluginOpt = None)
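As a concrete illustration of the per-broker level, a dynamic config can be changed at runtime through the Admin client; the bootstrap address, broker id and config key below are example values, not anything mandated by Kafka:
import java.util.Properties
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, AlterConfigOp, ConfigEntry}
import org.apache.kafka.common.config.ConfigResource

object DynamicBrokerConfigExample {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "hostname1:9092")
    val admin = Admin.create(props)
    try {
      //Resource name "1" targets /configs/brokers/1; an empty resource name would target /configs/brokers/<default>
      val broker1 = new ConfigResource(ConfigResource.Type.BROKER, "1")
      val op = new AlterConfigOp(new ConfigEntry("log.cleaner.threads", "2"), AlterConfigOp.OpType.SET)
      val request = new java.util.HashMap[ConfigResource, java.util.Collection[AlterConfigOp]]()
      request.put(broker1, java.util.Collections.singletonList(op))
      admin.incrementalAlterConfigs(request).all().get() //lands in DYNAMIC_BROKER_CONFIG, the highest precedence level
    } finally {
      admin.close()
    }
  }
}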
五、Starting the KafkaController
_kafkaController = new KafkaController(config, zkClient, time, metrics, brokerInfo, brokerEpoch, tokenManager, brokerFeatures, metadataCache, threadNamePrefix)
kafkaController.startup()
1、KafkaController structure
class KafkaController(...){
//Event management
private[controller] val eventManager = new ControllerEventManager(config.brokerId, this, time,controllerContext.stats.rateAndTimeMetrics)
//Returns true if this broker's id equals the current controller id
def isActive: Boolean = activeControllerId == config.brokerId
@volatile private var brokerInfo = initialBrokerInfo
@volatile private var _brokerEpoch = initialBrokerEpoch
//Startup
def startup(): Unit = {
zkClient.registerStateChangeHandler(new StateChangeHandler {
//ControllerHandler = "controller-state-change-handler"
override val name: String = StateChangeHandlers.ControllerHandler
override def afterInitializingSession(): Unit = {
eventManager.put(RegisterBrokerAndReelect)
}
override def beforeInitializingSession(): Unit = {
val queuedEvent = eventManager.clearAndPut(Expire)
//Block initialization of the new session until the Expire event has been processed; this ensures all pending events are handled before a new session is created
queuedEvent.awaitProcessing()
}
})
eventManager.put(Startup)
eventManager.start()
}
override def process(event: ControllerEvent): Unit = {
event match {
//.....
case RegisterBrokerAndReelect =>
processRegisterBrokerAndReelect()
case Startup =>
processStartup()
//.....
}
}
}
1、ControllerEventManager structure
class ControllerEventManager(...){
//A serialized queue is used instead of locks
private val queue = new LinkedBlockingQueue[QueuedEvent]
//ControllerEventThreadName = "controller-event-thread"
private[controller] var thread = new ControllerEventThread(ControllerEventThreadName)
def start(): Unit = thread.start()
def put(event: ControllerEvent): QueuedEvent = inLock(putLock) {
val queuedEvent = new QueuedEvent(event, time.milliseconds())
queue.put(queuedEvent)
queuedEvent
}
class ControllerEventThread(name: String) extends ShutdownableThread(...){
override def doWork(): Unit = {
//Take the next event from the queue; these are mainly controller-related events
val dequeued = pollFromEventQueue()
dequeued.event match {
case controllerEvent =>
def process(): Unit = dequeued.process(processor)
process()
}
}
}
}
The parent class of ControllerEventThread (inside ControllerEventManager) is ShutdownableThread, which contains the real run() and invokes doWork(); doWork() in turn calls process(), so process() is what actually executes.
public abstract class ShutdownableThread extends Thread {
public abstract void doWork();
public void run() {
while (isRunning())
doWork();
}
}
This is an endless loop: whenever an event is later added to the queue, the corresponding handler runs automatically. From KafkaController's startup() we know two events end up on the queue: Startup (put directly) and RegisterBrokerAndReelect (put after a new ZK session is initialized). Let's look at what they do.
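The pattern itself is easy to reproduce. Below is a stripped-down, self-contained sketch of the same idea, namely serializing work through a blocking queue consumed by one long-running thread; the types here are placeholders, not Kafka's actual classes:
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.atomic.AtomicBoolean

//Minimal stand-in for ControllerEvent
sealed trait Event
case object Startup extends Event
case object Shutdown extends Event

class EventLoop(handler: Event => Unit) {
  private val queue = new LinkedBlockingQueue[Event]()
  private val running = new AtomicBoolean(true)

  //Equivalent of ShutdownableThread.run(): loop forever, processing one event at a time
  private val thread = new Thread(() => {
    while (running.get()) {
      queue.take() match {
        case Shutdown => running.set(false)
        case event    => handler(event)
      }
    }
  }, "event-loop-thread")

  def start(): Unit = thread.start()
  def put(event: Event): Unit = queue.put(event)
}

//Usage: events put on the queue are processed strictly in order by a single thread
//val loop = new EventLoop(e => println(s"processing $e")); loop.start(); loop.put(Startup)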
2、Handling the RegisterBrokerAndReelect event
private def processRegisterBrokerAndReelect(): Unit = {
_brokerEpoch = zkClient.registerBroker(brokerInfo)
processReelect()
}
1、Register the broker with ZooKeeper
class KafkaZkClient private[zk] (...{
def registerBroker(brokerInfo: BrokerInfo): Long = {
//path: /brokers/ids/{brokerId}
val path = brokerInfo.path
//Create an ephemeral znode for this broker id; when the broker goes down the node disappears with it
val stat = checkedEphemeralCreate(path, brokerInfo.toJsonBytes)
info(s"Registered broker ${brokerInfo.broker.id} at path $path with addresses: " +
s"${brokerInfo.broker.endPoints.map(_.connectionString).mkString(",")}, czxid (broker epoch): ${stat.getCzxid}")
//Return the czxid (broker epoch)
stat.getCzxid
}
}
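The payload written to the ephemeral znode is brokerInfo.toJsonBytes. For a recent ZK-mode broker, the data under /brokers/ids/{brokerId} looks roughly like the following; host, port and timestamp are example values, and the exact fields depend on the Kafka version:
//znode: /brokers/ids/1 (ephemeral)
{
  "listener_security_protocol_map": {"PLAINTEXT": "PLAINTEXT"},
  "endpoints": ["PLAINTEXT://hostname1:9092"],
  "jmx_port": -1,
  "features": {},
  "host": "hostname1",
  "timestamp": "1700000000000",
  "port": 9092,
  "version": 5
}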
2、Starting the election
class KafkaController(...){
private def processReelect(): Unit = {
maybeResign()
elect()
}
private def maybeResign(): Unit = {
val wasActiveBeforeChange = isActive
//Register a znode change handler in ZK, triggered when the controller changes; this lays the groundwork for the election below
zkClient.registerZNodeChangeHandlerAndCheckExistence(controllerChangeHandler)
activeControllerId = zkClient.getControllerId.getOrElse(-1)
if (wasActiveBeforeChange && !isActive) {
//Triggered when the current broker resigns as controller
onControllerResignation()
}
}
private def elect(): Unit = {
//Get the active controller. If the cluster has been running for a while and a new broker joins, this returns the current controller;
//if the cluster has just started, there is no active controller yet and the result is -1
activeControllerId = zkClient.getControllerId.getOrElse(-1)
/*
* We can get here during initial startup and during the handleDeleted ZK callback. Because of potential race conditions,
* the controller may already have been elected by the time we get here. If this broker is already the controller,
* this check prevents the createEphemeralPath call below from running into an infinite loop.
*/
if (activeControllerId != -1) {
//If there is already an activeControllerId, stop the election; otherwise keep going
debug(s"Broker $activeControllerId has been elected as the controller, so stopping the election process.")
return
}
//Inside the try block the following can happen:
//1、Normal path: this broker is elected controller
//2、Exceptions:
// 1、ControllerMovedException
//  1、Another broker was successfully elected controller
//  2、A controller was elected but has just resigned, so another election is needed
// 2、Throwable: this broker was elected controller but failed while taking office; delete the controller znode and re-elect
//
try {
val (epoch, epochZkVersion) = zkClient.registerControllerAndIncrementControllerEpoch(config.brokerId)
controllerContext.epoch = epoch
controllerContext.epochZkVersion = epochZkVersion
activeControllerId = config.brokerId
info(s"${config.brokerId} successfully elected as the controller. Epoch incremented to ${controllerContext.epoch} " +
s"and epoch zk version is now ${controllerContext.epochZkVersion}")
//Successfully elected as controller; start carrying out the duties of that role
onControllerFailover()
} catch {
case e: ControllerMovedException =>
//Re-register the watcher and keep listening for changes to the controller znode
maybeResign()
if (activeControllerId != -1)
debug(s"代理$activeControllerId被选为控制器,而不是代理${config.brokerId}", e)
else
warn("管制员已经当选,但刚刚辞职,这将导致另一轮选举", e)
case t: Throwable =>
error(s"在代理${config.brokerId}上选择或成为控制器时出错。立即触发控制器移动", t)
triggerControllerMove()
}
}
}
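registerControllerAndIncrementControllerEpoch does two things in ZooKeeper: it creates the ephemeral /controller znode carrying this broker's id, and it bumps the persistent /controller_epoch counter. Roughly, their contents look like this; the broker id, timestamp and epoch are example values, and the exact JSON fields depend on the Kafka version:
//znode: /controller (ephemeral) -- whichever broker creates it first wins the election
{"version": 1, "brokerid": 1, "timestamp": "1700000000000"}

//znode: /controller_epoch (persistent) -- incremented on every controller change
3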
The controllerChangeHandler registered with zkClient before the election simply watches the controller znode for changes:
class ControllerChangeHandler(eventManager: ControllerEventManager) extends ZNodeChangeHandler {
//the controller znode path
override val path: String = ControllerZNode.path
override def handleCreation(): Unit = eventManager.put(ControllerChange)
override def handleDeletion(): Unit = eventManager.put(Reelect)
override def handleDataChange(): Unit = eventManager.put(ControllerChange)
}
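The handler above is driven by ZooKeeper watches on the controller path. A rough sketch of the same mechanism using the plain ZooKeeper client could look like the following; the queue and the event strings are placeholders for Kafka's ControllerEventManager and its events:
import java.util.concurrent.LinkedBlockingQueue
import org.apache.zookeeper.{WatchedEvent, Watcher, ZooKeeper}
import org.apache.zookeeper.Watcher.Event.EventType

class ControllerPathWatcher(zk: ZooKeeper, events: LinkedBlockingQueue[String]) extends Watcher {
  private val path = "/controller"

  //Watches are one-shot, so re-register after every notification by calling exists() again
  def register(): Unit = zk.exists(path, this)

  override def process(event: WatchedEvent): Unit = {
    event.getType match {
      case EventType.NodeCreated     => events.put("ControllerChange")
      case EventType.NodeDeleted     => events.put("Reelect")
      case EventType.NodeDataChanged => events.put("ControllerChange")
      case _                         => //session events are handled elsewhere
    }
    register()
  }
}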
六、Summary
1、Set up ZooKeeper, e.g. zookeeper.connect=hostname1:2181,hostname2:2181,hostname3:2181/kafka, and create the persistent paths: consumers, brokers/ids, brokers/topics, config/changes, admin/delete_topics, brokers/seqid, isr_change_notification, latest_producer_id_block, log_dir_event_notification
2、Verify that the metadata properties ensemble is valid, mainly checking that every broker has a unique id
3、Register each broker's id with ZooKeeper
4、Start the KafkaController
5、Start the ControllerEventThread, which keeps consuming events from the LinkedBlockingQueue
6、Put the RegisterBrokerAndReelect and Startup events onto the queue
7、The RegisterBrokerAndReelect event is handled first
8、Register the broker with ZooKeeper by creating an ephemeral znode
9、Register the controllerChangeHandler, which simply watches the controller znode for changes
10、Each broker then tries to register itself as the controller in ZooKeeper
11、Normally only one broker succeeds; the others hit a ControllerMovedException and keep watching the controller znode for changes
12、If a broker wins the election but fails while taking office, it resigns inside the handler and a new round of election is triggered