redis源码系列--(四)--redis cluster
本文主要记录redis cluster相关流程 2024/11/11 10:19
redis cluster命令文档:https://redis.io/docs/latest/commands/cluster-failover/
redis集群模式和哨兵模式是不同的模式,别搞混了 --chatgpt
https://blog.csdn.net/zzti_erlie/article/details/122900856
https://xie.infoq.cn/article/e1555e88470e48f301626f35c
https://xie.infoq.cn/article/8835096d9845529fc9845bc99
本文应包括:集群启动、处理命令、集群处理比如扩容缩容迁移导入负载均衡
clusterCronclusterProcessPacket
!!!redis-cluster中投票只会统计master节点的投票,从节点的一律会忽略,比如收到一个来自x的ping包,报告y节点故障了,如果x是master节点则y_failure_count+=1,如果x是slave节点,则直接忽略这条故障报告信息,y_failure_count不变
1:client对应的fd上的read事件处理流程
getNodeByQuery函数中关于migrate、importing状态的slot的处理流程暂时略了
cluster meet:
meet包是要求接收者强制接受的即使发送者未知,而ping/pong包则是如果来源未知则会丢弃包
CLUSTER MEET 用于将启用了集群支持的不同 Redis 节点连接到工作集群中。。其基本思想是节点默认情况下不相互信任,并且被认为是未知的。因此,为了让一个给定的节点接受另一个节点进入组成 Redis 集群的节点列表,只有两种方法:第一种是系统管理员发送 CLUSTER MEET 命令来强制节点a与未知节点b会面,对于a来说b是unknow的,但是因为是cluster meet命令,所以a会接受b节点,但是如果不是cluster meet命令且b未知,那么a就会忽略这条消息。第二中是已知的节点c会发给a的相关ping/pong信息的节点列表中含有未知节点d的信息,如果接收节点a信任发送节点c作为已知节点,那么a就会接受d并尝试与d建立连接。发送meet后,a会保存b节点的信息,然后通过ping/pong消息返回b节点的信息,因为对于集群中的其他节点来说,a是受信任的节点,所以都会接受a返回的b的信息。
manulfailover:这个命令只能发送给slave
今天周五,leader调休,好像是去珠海看航展了,今天不想弄了,弄了三四天cluster流程了,大致流程是明白了,只是具体的细节没有理清而已,以及没有去画流程图了,大头已经搞完了,已经没有精力再搞了,得换个项目了,因为每次都是集中精力三四天就感觉身体被掏空,大脑就抗拒了,十分不想看了,效率就很低了,所以集中攻坚的时候晚上往往能加班到11点,而低落期到了下午三四点就有点磨洋工了。。。所以此时就先放下来,后续再去弄,除了工作外,此时适宜休息耍手机或者换一个项目,因为有新鲜感。 20241115 11:03
manulfailover流程:
1:slave告诉他的master停止处理来自客户端的查询。
2:master用当前的复制偏移量回复副本。
3:slave等待复制偏移在其一侧匹配,以确保slave数据和master同步
4:slave启动故障转移,就是开启投票即发送auth请求,如果过半master都同意则可以完成故障转移
5:旧master收到配置更新:因为master已经变了,所以这里解除对客户端的阻止后需要回复重定向消息,以便client可以重新连接到新master。
通过这种方式,客户端会原子地从旧主服务器移至新主服务器,并且仅当正在转变为新主服务器的副本已处理来自旧主服务器的所有复制流时。
fe->rfileProc(fe->clientData)
......
networking.processInputBuffer
......
server.processCommand
if server.cluster_enabled: #如果是集群,则丢给对应的节点去处理
if (server.cluster_enabled &&!mustObeyClient(c) && #如果开启了集群模式
!(!(c->cmd->flags&CMD_MOVABLE_KEYS) && #并且!(当前命令不可迁移(即必须本机处理)&&命令中的keyNum==0,)
c->cmd->key_specs_num == 0
&&c->cmd->proc != execCommand))
cluster.getNodeByQuery #获取对应的处理节点
#!!!检查命令对应的key是否全都位于同一个slot,如果不是,则报错
#!!!如果正处于事务状态,那么要求事务的所有命令的所有key都必须位于同一个slot中
#!!!如果slot处于migrateing或者importing状态,则需要进行特定处理,暂略
if cmd==exec: #如果当前命令是exec命令表示要执行事务了,此时需检查事务命令队列中的所有命令
ms = &c->mstate; #c->mstate表示事务命令队列
else:
ms = [当前单条命令]
for cmd in ms: #遍历ms中的所有命令
for key in cmd: #遍历该命令涉及的所有key
thisslot = cluster.keyHashSlot #获取key所在的slot
n = getNodeBySlot(slot) #根据slot获取节点
if thisslot!=slot: #表示不等于在此之前的key所在的slot
err_code=CLUSTER_REDIR_CROSS_SLOT #那么我们就立即返回错误
return null
if node==当前节点:
return 当前节点
else
return n
if (n == NULL || !clusterNodeIsMyself(n)) #如果不是当前节点进行处理,那么进行重定向
cluster.clusterRedirectClient #直接通知客户端重定向到其他节
networking.addReplyError #重定向是通知客户端重定向,而不是当前节点负责路由请求
return
else: #如果是本机处理,则继续往下执行
......
server.call
2:cluster启动
server.main
if server.cluster_enabled:
cluster_legacy.clusterInit
if server.cluster_enabled:
cluster_legacy.clusterInitLast
...init server.clistener,略...
server.createSocketAcceptHandler(&server.clistener, clusterAcceptHandler)
if !server.sentinel_mode:
aof.aofLoadManifestFromDisk
server.loadDataFromDisk
aof.aofOpenIfNeededOnServerStart
aof.aofDelHistoryFiles
aof.applyAppendOnlyConfig
if (server.cluster_enabled) {
cluster_legacy.verifyClusterConfigWithData
2.1:clusterCron定时任务
pfail:Possibly FAILed,即检测到长时间没有通信了,可能挂掉了,只是可能,所以叫pfail;
fail:已经确定该节点失败了,所以标记为failed
redis-cluster中任意两个节点之间都会建立连接
手动故障转移:管理员发现某个主节点崩溃了,所以在他的某个从节点上主动执行故障转移命令,这样该从节点就会启动手动故障处理流程。因为有些master节点是孤立的主节点,没有从节点,所以需要考虑这种状态。
可以通过slave migration来把master a节点的从节点slave a变成master b的从节点,在master b是孤立主节点时使用。
https://www.cnblogs.com/shanml/p/16391649.html
故障转移需要其他master节点授权
!!!当 Redis 集群中的主节点故障时,可能会导致从节点被提升为新的主节点,因为从节点往往会落后master节点,所以在这个过程中确实可能导致数据丢失,而客户端往往无法察觉到这种数据丢失,所以有时就会出现莫名其妙的bug,我明明写入了,怎么找不到,所以redis就只能作为缓存等对一致性要求不高的
从节点故障不用管,标记一下就行,但是如果master故障了,就要进行故障转移了
手动故障转移做的事情:
从节点角度:
1:用户发送 CLUSTER FAILOVER 命令。故障转移状态被初始化,并将 mf_end 设置为 Unix 毫秒时间,当该时间到达时我们将中止故障转移尝试。
2:从节点向主节点发送 MFSTART 消息,请求暂停客户端操作,暂停时间为手动故障转移超时 CLUSTER_MF_TIMEOUT 的两倍。当主节点被暂停进行手动故障转移时,它还会开始将数据包标记为 CLUSTERMSG_FLAG0_PAUSED。
3:从节点等待主节点发送其复制偏移量,且该偏移量会被标记为 PAUSED。
4:如果从节点接收到主节点的偏移量,并且其偏移量匹配,mf_can_start 被设置为 1,然后 clusterHandleSlaveFailover() 将按常规方式执行故障转移,不同之处在于投票请求会被修改,强制主节点投票给一个有正常主节点的从节点。
主节点的角度:
当主节点接收到 PAUSE_CLIENTS 数据包时,主节点也会设置 mf_end 并将发送者设置为 mf_slave。在手动故障转移的时间限制内,主节点会更频繁地向该从节点发送 PING,并标记为 PAUSED,以便从节点在接收到带有该标记的数据包时设置 mf_master_offset。
手动故障转移的目标是手动触发后cluster集群执行一个快速的故障转移,避免由于异步主从复制导致的数据丢失。
clusterCron源码流程
server.serverCron
server.clusterCron
cluster_legacy.clusterUpdateMyselfHostname
1: 强制关闭发送积压的节点的连接然后重新连接
for all server.cluster->nodes: #遍历集群所有节点
cluster_legacy.clusterNodeCronFreeLinkOnBufferLimitReached #如果一个节点的发送队列积压的数据量超过阈值则强制关闭连接
#因为此时往往是该节点超时了,如果是正常的,那么该节点稍后会自动重新连接
#如果挂掉了那么关掉后就不会自动重新连接
cluster_legacy.clusterNodeCronHandleReconnect #当前主动重新连接该节点,如果连不上则会删除保存的该节点的相关信息
2:随机选择5个节点发送ping/pong消息
if iteration%10==0: #每迭代10次才ping/pong一次即clusterCron每100ms执行一次,10次就是1s
for i in range 5: #随机选择5个节点然后从中选择一个最年老的node
#最年老:即收到该节点最后一次pong的时间距今最长
dict.dictGetRandomKey
cluster_legacy.clusterSendPing(CLUSTERMSG_TYPE_PING) #发送ping消息给这个最年老的node
#ping消息就是随机选择几个节点然后发送包含自身状态的消息
for 十分之一且至少3个节点: #选择十分之一的节点,并且至少三个。主要是达到一致性时间和网络包数的妥协
#多了容易泛滥,少了太慢
dict.dictGetRandomKey #随机选择一个节点
cluster_legacy.clusterSetGossipEntry #把要发送的数据添加到对应的缓冲区
#通信消息体中会包含那些检测到可能pfail的节点
cluster_legacy.clusterSendMessage #把填充好的数据丢到目标节点对应的发送队列中
if listLength(link->send_msg_queue) == 0 && msgblock->msg.totlen != 0: #设置对应的writeHandler
connSetWriteHandlerWithBarrier(link->conn, clusterWriteHandler, 1); #Handler为clusterWriteHandler
listAddNodeTail(link->send_msg_queue, msgblock); #数据块添加到发送队列
3:遍历所有节点并标记失败的主节点。很多if #从节点失败不用管,主要是主节点
for all server.cluster->nodes:
3.1:当当前节点是从节点时检查当前遍历的节点是不是孤立主节点
if nodeIsSlave(myself) && clusterNodeIsMaster(node) && !nodeFailed(node): #检查孤立主节点是为了在允许迁移的时候把当前从节点迁移给该孤立主节点
#所以只有当前节点是从节点才需要检查孤立主节点因为只有从节点才允许迁移
okslaves=cluster_legacy.clusterCountNonFailingSlaves #获取该节点健康的slave数
if okslaves == 0 && node->numslots > 0 && node->flags & CLUSTER_NODE_MIGRATE_TO:
orphaned_masters++; #如果一个master健康的slave数为0且负责的slots数不为0且允许从节点迁移到他身上
#则该节点算作一个孤立主节点,孤立主节点是指正常的节点,只不过他的slave不正常
if okslaves > max_slaves: #检查孤立主节点的时候同时获取健康slave数最多的master节点的健康slave数
#因为当前节点即从节点a上执行slave迁移的时候不一定要真的迁移
#如果有更好地选择,那么当前从节点a就会放弃迁移
#如果一个master的健康slave数最多,
#那么从他的slave中选一个迁移是最好的选择,因为这样影响最小
max_slaves = okslaves;
if myself->slaveof == node:
this_slaves = okslaves;
3.2:检查该节点的各种超时时间并尝试ping/pong
mstime_t ping_delay = now - node->ping_sent; #最后一次发送ping给该节点的时间距今的时间
mstime_t data_delay = now - node->data_received; #最后一次收到该节点pong消息的时间距今的时间
if (node->link && #这里就是检查该连接是否正常,如果:当前连接不为空
now - node->link->ctime > server.cluster_node_timeout && #并且连接超时
node->ping_sent && #并且我们发送了ping消息
ping_delay > server.cluster_node_timeout/2 && #并且据我们最后一次ping该节点已经过去至少半个超时时间了
data_delay > server.cluster_node_timeout/2 : #并且我们最后一次收到该节点发来的pong消息距今超过半个超时时间了
{
cluster_legacy.freeClusterLink(node->link); #那么该节点就可能不正常,所以我们主动断掉连接
#如果该节点正常,他检测到连接被断开就会主动连接
}
if node->link &&node->ping_sent == 0 && #如果我们没有发送ping消息
(now - node->pong_received) > ping_interval #并且距离最后一次收到pong消息间隔ping_interval
{
clsuter_legacy.clusterSendPing(node->link, CLUSTERMSG_TYPE_PING); #那么我们就发送ping消息给该节点
#从这里也可以看出这个clusterCron也是一个小小的状态机器
#笔记:前面是随机选择一个节点gossip,这里是满足某些条件才会ping
#即超过一定时间还没有ping
#比如gossip阶段没有选中该节点并且他也没有指定时间内pong回来
continue; #本次流程结束,返回
}
if (server.cluster->mf_end && #如果有manual failure事件需要处理
#mf_end不为null表示该节点要处理manual failure事件
#master和slave对manual failure事件的处理不同
#master就是更频繁的ping slave,slave就是负责执行故障转移操作
#笔记:manual failure是人为主动触发的事件,
#也就是说是manual failure是一个强制操作
#强制取消当前master的master地位并把某个从节点提升为master
clusterNodeIsMaster(myself) && #如果并且当前节点是master节点,
server.cluster->mf_slave == node &&node->link: #并且该节点是当前节点的slave节点并且主从之间存在连接
{
clusterSendPing(node->link, CLUSTERMSG_TYPE_PING); #则发送ping消息给该从节点
#manual failure过程中主节点会更频繁的ping从节点
continue; #本次流程结束,返回
} #啊啊啊,不搞了不搞了,下班下班,一分钟都不能都待了 2024/11/12 23:01
if node->ping_sent == 0: #如果还没发送过ping并且距离最后一次ping消息的时间间隔不足ping_interval
continue; #则直接返回
node_delay = (ping_delay < data_delay) ? ping_delay :data_delay; #node_delay表示node距今多久没与之通信(发送ping或者收到pong)
if node_delay > server.cluster_node_timeout: #如果node_delay超过了配置的超时时间,那么就认为这个节点pfail了
if !(node->flags & (CLUSTER_NODE_PFAIL|CLUSTER_NODE_FAIL)): #如果这个节点还没被标记为pfail或者fail,那么就打上标记
node->flags |= CLUSTER_NODE_PFAIL; #打上pfail标记
update_state = 1; #表示集群状态要更新
if clusterNodeIsMaster(myself) && server.cluster->size == 1: #如果当前节点是master并且只有集群只有一个master节点,
#则直接标记该节点为fail,即把pfail升级为fail
#注意:只有master才可以标记节点失败
#同时也只有来自master的fail消息才会被其他节点接受
{
cluster_legacy.markNodeAsFailingIfNeeded(node) #标记该节点为fail
1:判断是否超时或已经标记
if !nodeTimedOut(node) or nodeFailed(node): #1:判断是否超时或者已经标记,是则返回
return
2:统计是否已收到过半节点发来的针对该节点failure
failures=cluster_legacy.clusterNodeFailureReportsCount #2:统计有多少master节点报告他是failure
#注意:只有master的报告才会被统计,从节点会被忽略
if clusterNodeIsMaster(myself): #如果当前节点是master,则也投上自己的一票
failures++;
int needed_quorum = (server.cluster->size / 2) + 1; #quorum=过半,cluster->size只记录master节点数,从节点不在其中
if failures < needed_quorum: #如果没有过半master报告他有问题则认为他没问题
return
3:标记并广播该节点失效
node->flags &= ~CLUSTER_NODE_PFAIL;
node->flags |= CLUSTER_NODE_FAIL; #运行到此处则表明有过半master报告了这个节点有问题,
#已经可以确认该节点故障了,所以取消pfail标记而打上fail标记
#!!!pfail只是怀疑失败,而fail则是已经确认失败
cluster_legacy.clusterSendFail #广播该节点失效消息,
cluster_legacy.clusterBroadcastMessag #广播节点失效消息给所有节点,是所有节点
cluster_legacy.clusterSendMessage
listAddNodeTail(link->send_msg_queue, msgblock) #把数据丢到link的发送队列就行了
clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG); #beforefSleep中更新集群状态和持久化集群状态
} else {
serverLog(LL_DEBUG,"*** NODE %.40s possibly failing", node->name); #如果集群master很多则pfail只需要打印一条日志就行了,并不执行其他多余操作
}
4:处理故障
if nodeIsSlave(myself) &&server.masterhost == NULL && #如果当前节点是从节点并且从节点的master=null
myself->slaveof &&nodeHasAddr(myself->slaveof): #并且从节点已经分配了master并且直到mater的地址
{
replication.replicationSetMaster #那么就重新开启主从同步即重新连接master会自动进行主从同步
}
cluster_legacy.manualFailoverCheckTimeout #检查manual failure是否超时未成功
if server.cluster->mf_end && server.cluster->mf_end < mstime(): # 如果超时
cluster_legacy.resetManualFailover #则清除manual failure状态
server.cluster->mf_end = 0; #mf_end!=0表示正处于manualfailure状态,为0则表示不处于
......略其他操作......
}
if nodeIsSlave(myself): #如果是从节点
cluster_legacy.clusterHandleManualFailover #1:尝试进行manual failover
#这个函数就是检测是否可以开始manual failover
#如果故障转移已经可以开始,就打上HANDLER_FAILOVER标记
#反之如果条件还不满足则打上MANUALFAILOVER即表示还不能开始
#beforesleep中根据打上的标记来执行不同的操作
#和pfail/fail类似,
#MANUALFAILOVER表示暂时还不行,需要再次检测
#HANDLER_FAILOVER表示明确可以开始了
if server.cluster->mf_end==0: #如果mf_end==0就表示没有manual failover事件需要处理
return
if server.cluster->mf_master_offset == replicationGetSlaveOffset(): #如果从节点已经复制到最新位置了则在beforesleep中执行故障转移
clsuter_legacy.clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER) #manual failover就是打上一个标记,然后在beforesleep中处理
else:
cluster_legacy.clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_MANUALFAILOVER) #否则继续保持manual failover状态,需要再次检测
if !(server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_FAILOVER) #2:如果允许故障转移,则从节点尝试提升为master节点
cluster_legacy.clusterHandleSlaveFailover #执行从节点提升。
#不管是manual failover还是自动failover,
#最终都是调用clusterHandleSlaveFailover执行故障转移
1:检查是否需要执行故障转移
manual_failover = server.cluster->mf_end != 0 && server.cluster->mf_can_start
if clusterNodeIsMaster(myself) || #如果当前节点是master,则不允许,只有从节点才允许执行故障转移
myself->slaveof == NULL || #如果slave对应的master为null则不允许
(!nodeFailed(myself->slaveof) && !manual_failover) || #如果master节点没有被标记为failed并且没有开始手动故障转移则不允许
#即manual failover是强制的不管master是否正常都会执行故障转移
#反之自动failover则是只有当master出了问题时才会自动执行故障转移
(server.cluster_slave_no_failover && !manual_failover) || #如果不允许slave执行故障转移操作并且没有开始manual failover则不允许
myself->slaveof->numslots == 0) #如果之前分配给master的slots数为0即该master不对外服务则不允许
#因为故障转移就是从节点取代master节点,会继承master分配的slots
{
server.cluster->cant_failover_reason = CLUSTER_CANT_FAILOVER_NONE; #记录失败原因
return; #本次故障转移结束
}
2:检查当前节点是否可以从从节点变为主节点:数据年龄
if data_age>xxx: #本节点的数据年龄过大即超过了时效阈值,不能进行故障转移
return #就是说当前节点的数据太老了,不能提升为master节点
3:投票
if auth_age > auth_retry_time: #auth表示征求其他master授权当前从节点提升为主节点
#如果上一次授权请求超过再次重试时间都未收到答复则再次发送授权请求
#有两个时间点:重试和超时,重试时间点<超时时间点
cluster_legacy.clusterBroadcastPong(CLUSTER_BROADCAST_LOCAL_SLAVES) #所以本次就广播一条pong消息通知其他节点,然后返回
return
if mstime() < server.cluster->failover_auth_time: #failover_auth_time表示设置的选举时间,
#只有到达指定时间才允许进行选举
#不同的节点会设置不同的选举时间以错开
return #如果还没有叨叨指定时间则本次直接返回
if auth_age > auth_timeout: #如果本次授权请求即投票超时了则本次故障转移失败
clusterLogCantFailover(CLUSTER_CANT_FAILOVER_EXPIRED); #记录失败原因
return;
if server.cluster->failover_auth_sent == 0: #如果还没有发送投票请求
server.cluster->currentEpoch++; #设置当前投票轮次
cluster_legacy.clusterRequestFailoverAuth #发起投票
cluster_legacy.createClusterMsgSendBlock(CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST)
cluster_legacy.clusterBroadcastMessage #广播auth_request消息即投票消息给所有节点
#注意:所有节点都会收到该消息,但是只有master节点的投票才会计算
server.cluster->failover_auth_sent = 1; #标记已经投票了
return #所以本次流程结束,等待下次执行,因为收集投票要时间
#即这个故障转移也可以看做是一个简单的状态机
4:从节点标记为master节点并通知
if server.cluster->failover_auth_count >= needed_quorum : #如果赞成票数过半则执行从节点提升为master节点
cluster_legacy.clusterFailoverReplaceYourMaster #执行从节点提升:从节点提升为主节点
#就是设置当前节点的各种flag以及slots并且保存配置
#并且广播该消息给其他节点
else:
clusterLogCantFailover(CLUSTER_CANT_FAILOVER_WAITING_VOTES); #如果还没有收到过半投票则继续等待,活着直到超时
if orphaned_masters && max_slaves >= 2 && #3:从节点迁移。前面是故障转移即slave取代他的master
#这里是从节点迁移即当前的slave变成另一个mater的slave
this_slaves == max_slaves && #如果孤立主节点数不为0并且集群slave数最多的master的slave数大于2
#并且这个master就是当前节点所在的master-slave节点丛的master
server.cluster_allow_replica_migration: #并且集群允许slave迁移,那么就执行副本迁移
#也就是说clusterHandleSlaveMigration就是尝试把自己
#迁移给孤立主节点做slave
{
cluster_legacy.clusterHandleSlaveMigration(max_slaves) #尝试把当前从节点slave a变成孤立主节点master b的从节点
#注意迁移是迁移当前节点,如果有更合适的则本次迁移什么也不做
if server.cluster->state != CLUSTER_OK: #流程:1:如果集群状态不正常则拒绝迁移操作;
return;
if okslaves <= server.cluster_migration_barrier: #2:如果迁移后当前节点的master的slave个数会少于barrir则拒绝迁移
return
for all servernodes #3:遍历所有节点,看是否有更合适的候选者同时会寻找孤立master
......
if is_orphaned: #找到孤立主节点那么就保存到target中
target=node
if (okslaves == max_slaves) {
...
candidate = node->slaves[j] #从节点达标的master中从节点数最多的master的nodeid最小的从节点作为新的候选者
if target && candidate == myself && ......: #4:如果没有找到更合适的候选者那么就迁移本节点
#即把本节点的master设置为该孤立从节点
cluster_legacy.clusterSetMaster(target); #操作包括:从原master的slave列表中摘去当前节点
#把当前节点加到新master的slave列表;重置主从相关设置等
}
5:更新集群状态
if update_state || server.cluster->state == CLUSTER_FAIL: #如果集群状态有变化,则更改集群状态
cluster_legacy.clusterUpdateState #cluster_ok or cluster_fail
2.2 beforesleep
server.main
server.el=ae.aeCreateEventLoop #创建事件循环
ae.aeMain(server.el)
while(!el.stop): #就一个死循环,不断处理事件,如果没有事件就阻塞
ae.aeProcessEvents
ae.aeEventLoop.beforesleep
server.beforeSleep
if server.cluster_enable
cluster_legacy.clusterBeforeSleep
if flags & CLUSTER_TODO_HANDLE_MANUALFAILOVER: #1:clusterCron里面会执行一遍这样的代码,而beforeSleep又执行一遍
#因为clusterCron是100ms才执行一次
if nodeIsSlave(myself):
cluster_legacy.clusterHandleManualFailover #设置标记,下次还会继续执行故障处理
#这个函数就是检测是否可以开始manual failover
#如果可以开始,就打上HANDLER_FAILOVER标记
#反之则打上MANUALFAILOVER即表示还不能开始下次继续检测
#beforesleep中会检测,如果打上了MANUALFAILOVER标记
#则beforesleep知道之前检测时还没准备好就再次调用
#clusterHandleManualFailover来检测和打标机
#以便在下一轮beforesleep中处理
if !(server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_FAILOVER):
clsuter_legacy.clusterHandleSlaveFailover #处理故障转移,即manual failure就是尝试把slave提升为master
#处理完后会清除manual failure标记,就不会再次处理了
#如果条件不满足则会直接返回而不执行故障转移
#这一块的逻辑就是:我调用clusterHandleManualFailover检测一下打标记
#然后不管条件满不满足我都执行一下clusterHandlerSlaveFailover
#如果满足,就执行,如果不满足,函数会自动跳过,
#所以这里才会不管条件满不满足都执行clusterHandlerSlaveFailover
else if flags & CLUSTER_TODO_HANDLE_FAILOVER: #如果是自动故障转移就直接处理了
cluster_legacy.clusterHandleSlaveFailover
if flags & CLUSTER_TODO_UPDATE_STATE: #更新集群状态
cluster_legacy.clusterUpdateState
if flags & CLUSTER_TODO_SAVE_CONFIG: #持久化当前集群状态到磁盘
cluster_legacy.clusterSaveConfigOrDie(fsync);
2.3处理ping/pong消息:clusterReadHandler
两个节点一主一从,含有相同的shardid则可以肯定从节点一定是该节点的从节点
redis收到其他节点发来的消息的时候会判断是否是已知节点,如果否,则会丢弃掉该消息
-------------------------------------------------------------------------------------------------------------------------------------------------
cluster_legacy.clusterReadHandler
connection.connRead #读取数据
cluster_legacy.clusterProcessPacket
sender = getNodeFromLinkAndMsg(link, hdr) #获取消息发送者即sender,会根据sender来判断是否是已知节点
#!!!强调一下:sender是旧信息可能过时
#!!!hdr中信息是最新的(仅当hdr->epoch>server.epoch)
#!!!如果sender和hdr对不上,那么就可以判断出许多信息
#!!!比如原来是slave现在是master,而且还在同一个分片
#!!!那么就可以判断他发生了failure
#对于未知节点发来的消息会直接丢弃
if link->node && !nodeInHandshake(link->node): #link->node是连接对象中保存的节点信息
#handshake表示该连接还处于握手阶段,不算有效连接
sender = link->node;
else:
sender = clusterLookupNode(hdr->sender, CLUSTER_NAMELEN) #hdr是收到的包中的sender信息,去集群查找该节点是否存在
#也就是说sender信息保存在link或者hdr中
#如果link和hdr中的对不上说明集群发生了变化或者消息来源未知
if sender && !link->node:
setClusterNodeToInboundClusterLink(sender, link) #设置该连接为inbound类型,即对方发起的连接而不是他主动连接的
1:根据收到的信息来更新本节点上保存的该节点的信息 #1:根据收到的信息来更新本节点上保存的该节点的信息
if sender: #如果sender不为空则更新该sender对应的data_received时间
sender->data_received = now
if sender && !nodeInHandshake(sender): #如果连接有效则根据收到的hdr中的信息更新一些信息
if senderCurrentEpoch > server.cluster->currentEpoch: #如果hdr中的epoch>server保存的epoch,
server.cluster->currentEpoch = senderCurrentEpoch #则更新server的epoch
if senderConfigEpoch > sender->configEpoch: #同理,更新configEpoch
sender->configEpoch = senderConfigEpoch
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|CLUSTER_TODO_FSYNC_CONFIG) #标记beforesleep中要持久化集群状态
sender->repl_offset = ntohu64(hdr->offset); #更新server保存的sender的repl_offset
if server.cluster->mf_end && #这个if是从节点处理收到master发来的repl_offset的逻辑
nodeIsSlave(myself) && #mf_end!=0表示正处于manualfailover,并且当前是从节点
myself->slaveof == sender && #sender正好是当前节点的master节点
hdr->mflags[0] & CLUSTERMSG_FLAG0_PAUSED &&
server.cluster->mf_master_offset == -1) #并且还没有收到来到master的repl_offset
{
server.cluster->mf_master_offset = sender->repl_offset #所以设置mf过程中用到的mf_master_offset字段
#一旦设置就标志manualfailure(即mf)过程可以开始了
clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_MANUALFAILOVER) #beforesleep中检测mf是否可以开始
}
2:开始处理ping/meet,并返回pong消息作为响应 #2:开始处理ping/meet,并返回pong消息作为响应
if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_MEET) {
2.1:根据收到的meet消息来更新本节点ip #2.1:根据收到的meet消息来更新本节点ip
if (type == CLUSTERMSG_TYPE_MEET || myself->ip[0] == '\0') && #如果是meet消息且本节点对外ip为空,则设置本节点ip
server.cluster_announce_ip == NULL: #meet消息就是集群发给准备加入集群的新节点的
{ #也就是说当前节点就是要加入集群的新节点
connAddrSockName(link->conn,ip,sizeof(ip) #获取ip
memcpy(myself->ip,ip,NET_IP_STR_LEN); #设置本机ip
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG); #持久化集群状态
}
2.2:如果是新节点消息,则添加该节点到节点列表并持久化并通过gossip广播 #2.2:如果是新节点消息,则添加该节点到节点列表并通过gossip广播
if !sender && type == CLUSTERMSG_TYPE_MEET: #未知节点发来的meet消息,则保存该节点的信息
node = createClusterNode(NULL,CLUSTER_NODE_HANDSHAKE) #创建节点信息
clusterAddNode(node); #添加节点到节点列表
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
clusterProcessGossipSection #处理发来的信息里的gossip部分的内容
clusterSendPing(CLUSTERMSG_TYPE_PONG) #发送pong消息
3:处理 ping/pong/meet消息 #3:处理 ping/pong/meet消息
if type == CLUSTERMSG_TYPE_PING / CLUSTERMSG_TYPE_PONG / CLUSTERMSG_TYPE_MEET:
if !link->inbound: #如果是本节点主动建立的连接
if nodeInHandshake(link->node): #如果连接处于handshake状态
if sender: #但是我们收到了来自该节点的有效信息
##则直接创建对应node结构体并结束handshake状态
nodeUpdateAddressIfNeeded(sender,link,hdr)
clusterDelNode(link->node);
return 0;
clusterRenameNode
link->node->flags &= ~CLUSTER_NODE_HANDSHAKE; #清除handshake标记
link->node->flags |= flags&(CLUSTER_NODE_MASTER|CLUSTER_NODE_SLAVE);
else if link->node->name!=hdr->sender: #如果不出handshake阶段并且link中保存的
#与收到的hdr消息包中的sender对不上说明集群可能发生了变化
freeClusterLink(link); #则直接断开该连接,因为不与不知道的节点通信
if sender && type == CLUSTERMSG_TYPE_PING &&!nodeInHandshake(sender): #如果是ping消息则先更新对应的ip地址信息
nodeUpdateAddressIfNeeded(sender,link,hdr) #更新对应的ip地址信息
if !link->inbound && type == CLUSTERMSG_TYPE_PONG) #如果是收到pong消息,则表明此时节点正常
if nodeTimedOut(link->node): 什么情况下会pfail #所以如果node因为超时被打伤pfail了
link->node->flags &= ~CLUSTER_NODE_PFAIL; #那么这里就要清掉pfail,因为收到他的pong消息了
else if nodeFailed(link->node): #如果该节点已经被标记为fail的
#那么就不能直接清除要根据情况来判断是否可以清除fail标记
clearNodeFailureIfNeeded(link->node); #也就是说一个被标记为failed的节点,只要再次收到他的消息
#那么就可以认为他可能是正常的,某些情况下可以清除fail标记
if nodeIsSlave(node) || node->numslots == 0: #如果是slave节点或者虽然是master节点但是分配的slots数为0
#即这个master节点不对外服务,那么就可以清除这个slave或master的fail标记
serverLog("Clear FAIL state for node xxx is reachable again."); #再次说明:只要能访问就可以认为这个节点是可以正常通信的
#只是可以正常通信,但是节点不一定正常
#所以只能在某些情况下认为通新正常就是节点正常
node->flags &= ~CLUSTER_NODE_FAIL; #这里清除fail标记
if (clusterNodeIsMaster(node) && node->numslots > 0 && #如果是master节点,并且slots数不为0,
(now - node->fail_time) > (server.cluster_node_timeout * CLUSTER_FAIL_UNDO_TIME_MULT)) #并且被标记为fail的时间超过了阈值
#那么我们就可以认为该master是重新可达的可以清除fail标记
{
serverLog("Clear FAIL state for node xx: is reachable again and nobody is serving its slots after some time.")
node->flags &= ~CLUSTER_NODE_FAIL #清除fail标记
}
#检查是否有slave->master或者master->slave的转变
if sender {
if hdr->slaveof==null_name: #如果salveof字段为空则表明该节点现在是master角色
#所以这里就是检查是否有slave->master的转变
clusterSetNodeAsMaster(sender) #设置这个节点是master
if clusterNodeIsMaster(n): #如果已经是master了则返回
return
if (n->slaveof) { #如果是slave,则从原master的slave列表摘除这个节点
clusterNodeRemoveSlave(n->slaveof,n)
if n != myself:
n->flags |= CLUSTER_NODE_MIGRATE_TO #暂不清楚,这里就是标记这个节点正处于迁移状态
n->flags |= CLUSTER_NODE_MASTER #设置node为master,就是打上一个标记
n->slaveof = NULL
else #节点当前是slave,这里就是检查是否有master->slave的转变
clusterNode *master = clusterLookupNode(hdr->slaveof, CLUSTER_NAMELEN) #在hdr信息中查找该节点的原master
if clusterNodeIsMaster(sender): #如果sender原来是个master,现在是slave,那么就要更新吸纳骨干信息
if master && master->shard_id==sender->shard_id: #shardid相同表示slave是从原来的master变成了slave,还是在同一个分片
if sender->configEpoch > senderConfigEpoch:
serverLog
else
clusterMoveNodeSlots(sender, master) #所以就把sender上的slots更新到master中
clusterSetNodeAsMaster(master) #设置hdr信息包中的master节点的信息为master
master->configEpoch = senderConfigEpoch #同时更新epoch
#!!!笔记(半猜测):redis cluster各节点之间ping/pong不是直接同步最终数据集
#!!!即同步的时候一个节点不会把他看到的整个集群状态在ping/pong中同步给其他人
#!!!而是同步他自身的状态和一部分信息,这样另一个节点收到该节点信息的时候
#!!!就必须根据信息去对自身看到的集群状态数据及做不同的处理
#!!!也就是说每个节点都维护着自己看到的集群状态,并且定期接收其他节点的状态更新
#!!!但是这些更新不会包含完整的集群状态,而是通过故障、角色变化等事件来逐步
#!!!更新某个节点所看到的集群状态,并最终达到一致。举个例子:
#!!!这里从sender发来的信息知道原来是master现在变成了slave
#!!!而原来他的某个slave变成了他的master,
#!!!所以当前节点就知道原先分配给这个slave的槽现在应该分配给新master了
#!!!所以当前节点就据此去处理自己看到的集群状态数据集
#!!!即这里就是把sender的slots信息复制到master对象对应的slots信息字段中
#!!!也就是说sender发来的信息只是告诉当前节点发生了什么以及一部分数据
#!!!并不是直接告诉当前节点整个集群是什么状态
else:
clusterDelNodeSlots(sender) #sender和master不是同一个分片了,所以直接删除sender的slots就行了
#因为此时sender是slave
sender->flags &= ~(CLUSTER_NODE_MASTER|CLUSTER_NODE_MIGRATE_TO) #清除MASTER和MIGRATE_TO标记,暂时不懂
sender->flags |= CLUSTER_NODE_SLAVE; #标记为slave
if master && sender->slaveof != master: #原先sender记录的不是这个master,但现在是这个master
if sender->slaveof: #那么就从sender原master的slave表中移除这个节点
clusterNodeRemoveSlave(sender->slaveof,sender)
clusterNodeAddSlave(master,sender); #新master的slave列表添加当前节点
sender->slaveof = master
updateShardId(sender, master->shard_id)
...更新slots信息并广播,略... #!!!上面是更新节点状态,这里是更新slots信息并广播,略
clusterSendUpdate(sender->link,server.cluster->slots[j]) #广播槽更新信息
break;
...处理epoch冲突问题,略... #处理epoch冲突问题,略...
clusterHandleConfigEpochCollision(sender)
if sender:
clusterProcessGossipSection(hdr,link) #处理gossip部分的信息
clusterProcessPingExtensions(hdr,link)
else if type == CLUSTERMSG_TYPE_FAIL: #处理fail消息,就是处理已经确定fail的节点
if (sender) {
if (failing &&!(failing->flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_MYSELF)): #就是清除pfail标记并打上fail标记
failing->flags |= CLUSTER_NODE_FAIL;
failing->flags &= ~CLUSTER_NODE_PFAIL;
else if type == CLUSTERMSG_TYPE_PUBLISH || type == CLUSTERMSG_TYPE_PUBLISHSHARD: #处理pub消息,直接pub
pubsub.pubsubPublishMessage
else if type == CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST: #处理其他节点发来的要求故障转移投票的消息
clusterSendFailoverAuthIfNeeded #就是先看能不能投票,如果可以就投票
#要投一票则要求当前节点是master且slots数不为0
#还要求投票请求的epoch>server.epoch
#还要求投票开始时刻距今不能超过2倍节点超时时间
#还要求申请者必须是slave,master必须fail
#或者必须设置了manual failure标记
#满足这些要求才会投出一票赞成票
else if type == CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK #处理其他节点发来的对故障转移投票消息的投票结果
if clusterNodeIsMaster(sender) && sender->numslots > 0 &&
senderCurrentEpoch >= server.cluster->failover_auth_epoch
{
server.cluster->failover_auth_count++; #如果有master投赞成票则票数+1
}
else if type == CLUSTERMSG_TYPE_MFSTART: #处理MFSTART消息
#即从节点上开始执行manual failure
if !sender || sender->slaveof != myself:
return 1;
resetManualFailover();
server.cluster->mf_end = now + CLUSTER_MF_TIMEOUT; #开始manual failure就是初始化mf_end
server.cluster->mf_slave = sender; #在clusterCron里面就会检测到mf_end不为0,不为0就会开始mf过程
pauseActions(PAUSE_DURING_FAILOVER);
clusterSendPing(link, CLUSTERMSG_TYPE_PING); #send ping消息回去
} else if (type == CLUSTERMSG_TYPE_UPDATE) {
...更新一些信息,略...
} else if (type == CLUSTERMSG_TYPE_MODULE) {
...略...
} else {
serverLog(LL_WARNING,"Received unknown packet type: %d", type);
}
cluster failover命令:
failover命令会发给slave,然后slave会发送一个mfstart消息给指定节点
slave流程:
cluster.clusterCommand:
cluster_legacy.clusterCommandSpecial
if "failover":
server.cluster->mf_end = mstime() + CLUSTER_MF_TIMEOUT; #一旦mf_end不为0则表示本机开启了manual failover(简记为mf)
cluster_legacy.clusterSendMFStart(myself->slaveof) #发给本节点对应的master,因为只有slave能接受客户端的mf命令
clsuter_legacy.createClusterMsgSendBlock(CLUSTERMSG_TYPE_MFSTART) #创建mfstart消息
cluster_legacy.clusterSendMessage(node->link,msgblock) #发给master节点
-- -- - - - - >流程来到事件循环的clusterCron
clsuter.clusterCron:
if nodeIsSlave(myself):
cluster_legacy.clusterHandleManualFailover
if !(server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_FAILOVER)
cluster_legacy.clusterHandleSlaveFailover #处理failover事件
manual_failover = server.cluster->mf_end != 0 &&server.cluster->mf_can_start #标记是否可以开始mf流程
if (clusterNodeIsMaster(myself) ||
myself->slaveof == NULL ||
(!nodeFailed(myself->slaveof) && !manual_failover) || #如果mf为真则可跳过后两个if即可以继续往下执行
(server.cluster_slave_no_failover && !manual_failover) || #如果mf为假则还需要判断是否要可以执行自动failover
myself->slaveof->numslots == 0) #如果都是否的,那么这里就直接返回,就不执行failover
{
return
}
...执行故障转移,略...
master流程:
cluster_legacy.clusterReadHandler: #收到其他节点发来的消息
cluster_legacy.clusterProcessPacket
if type==MFSTART: #如果是MFSTART,就设置mf_end表示mf开始,
#后续在clsuterCron中检测到mf_end不为0,就会更频繁的ping slave
server.cluster->mf_end = now + CLUSTER_MF_TIMEOUT;
pauseActions(PAUSE_DURING_FAILOVER); #先暂停对外服务
clusterSendPing(link, CLUSTERMSG_TYPE_PING); #发送ping给slave
-- - - - - >流程来到cluster_leagcy.clusterCron
cluster_leagcy.clusterCron #clsuterCron中对mf处理比较简单,就是更频繁的ping一下slave
if (server.cluster->mf_end &&
clusterNodeIsMaster(myself) &&
server.cluster->mf_slave == node &&
node->link)
{
clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);
continue;
}