Nacos(Config)配置中心源码分析-02
又是美好的一天呀~
个人博客地址: huanghong.top
服务端配置修改的事件推送
通过nacos ui界面新增或修改配置信息(接口访问同理)
#post请求
#示例url
http://localhost:8848/nacos/v1/cs/configs?accessToken=eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJuYWNvcyIsImV4cCI6MTY3MjgzMjQ3Nn0.7Ph4LSo2z4FYO09bZP145HRxXJJI5ZjepDodGT2LS4s&message=true
#示例请求体
dataId: config.properties
group: DEFAULT_GROUP
content: key%3Dvalue
config_tags:
type: properties
appName:
tenant:
namespaceId:
请求会进入到com.alibaba.nacos.config.server.controller.ConfigController#publishConfig方法中
publishConfig
//com.alibaba.nacos.config.server.controller.ConfigController#publishConfig
//根据传入接口的参数查询config_info表,判断配置信息是插入还是更新
persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, false);
//发布ConfigDataChangeEvent事件
ConfigChangePublisher.notifyConfigChange(
new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime()));
onEvent
//com.alibaba.nacos.config.server.service.notify.AsyncNotifyService
//com.alibaba.nacos.common.notify.listener.Subscriber#onEvent
public void onEvent(Event event) {
// Generate ConfigDataChangeEvent concurrently
if (event instanceof ConfigDataChangeEvent) {
ConfigDataChangeEvent evt = (ConfigDataChangeEvent) event;
long dumpTs = evt.lastModifiedTs;
String dataId = evt.dataId;
String group = evt.group;
String tenant = evt.tenant;
String tag = evt.tag;
//获取所有nacos服务端地址
Collection<Member> ipList = memberManager.allMembers();
// In fact, any type of queue here can be
Queue<NotifySingleTask> httpQueue = new LinkedList<>();
Queue<NotifySingleRpcTask> rpcQueue = new LinkedList<>();
for (Member member : ipList) {
//检查nacos服务端是否支持长连接
//支持长连接
if (!MemberUtil.isSupportedLongCon(member)) {
httpQueue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs, member.getAddress(),
evt.isBeta));
} else {
//不支持长连接
rpcQueue.add(
new NotifySingleRpcTask(dataId, group, tenant, tag, dumpTs, evt.isBeta, member));
}
}
if (!httpQueue.isEmpty()) {
//执行异步任务
ConfigExecutor.executeAsyncNotify(new AsyncTask(nacosAsyncRestTemplate, httpQueue));
}
if (!rpcQueue.isEmpty()) {
//执行异步任务
ConfigExecutor.executeAsyncNotify(new AsyncRpcTask(rpcQueue));
}
}
}
AsyncRpcTask
//com.alibaba.nacos.config.server.service.notify.AsyncNotifyService.AsyncRpcTask
class AsyncRpcTask implements Runnable {
private Queue<NotifySingleRpcTask> queue;
public AsyncRpcTask(Queue<NotifySingleRpcTask> queue) {
this.queue = queue;
}
@Override
public void run() {
while (!queue.isEmpty()) {
//获取任务
NotifySingleRpcTask task = queue.poll();
//构建异步请求
ConfigChangeClusterSyncRequest syncRequest = new ConfigChangeClusterSyncRequest();
syncRequest.setDataId(task.getDataId());
syncRequest.setGroup(task.getGroup());
syncRequest.setBeta(task.isBeta);
syncRequest.setLastModified(task.getLastModified());
syncRequest.setTag(task.tag);
syncRequest.setTenant(task.getTenant());
//获取nacos服务端
Member member = task.member;
if (memberManager.getSelf().equals(member)) {
if (syncRequest.isBeta()) {
dumpService.dump(syncRequest.getDataId(), syncRequest.getGroup(), syncRequest.getTenant(),
syncRequest.getLastModified(), NetUtils.localIP(), true);
} else {
//添加dump任务至TaskManager,任务会异步执行(100ms执行一次)
dumpService.dump(syncRequest.getDataId(), syncRequest.getGroup(), syncRequest.getTenant(),
syncRequest.getTag(), syncRequest.getLastModified(), NetUtils.localIP());
}
continue;
}
if (memberManager.hasMember(member.getAddress())) {
// start the health check and there are ips that are not monitored, put them directly in the notification queue, otherwise notify
boolean unHealthNeedDelay = memberManager.isUnHealth(member.getAddress());
if (unHealthNeedDelay) {
// target ip is unhealthy, then put it in the notification list
ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null,
task.getLastModified(), InetUtils.getSelfIP(), ConfigTraceService.NOTIFY_EVENT_UNHEALTH,
0, member.getAddress());
// get delay time and set fail count to the task
asyncTaskExecute(task);
} else {
if (!MemberUtil.isSupportedLongCon(member)) {
asyncTaskExecute(
new NotifySingleTask(task.getDataId(), task.getGroup(), task.getTenant(), task.tag,
task.getLastModified(), member.getAddress(), task.isBeta));
} else {
try {
configClusterRpcClientProxy
.syncConfigChange(member, syncRequest, new AsyncRpcNotifyCallBack(task));
} catch (Exception e) {
MetricsMonitor.getConfigNotifyException().increment();
asyncTaskExecute(task);
}
}
}
} else {
//No nothig if member has offline.
}
}
}
}
dump
//com.alibaba.nacos.config.server.service.dump.DumpService#dump(java.lang.String, java.lang.String, java.lang.String, java.lang.String, long, java.lang.String, boolean)
public void dump(String dataId, String group, String tenant, String tag, long lastModified, String handleIp,
boolean isBeta) {
//构建groupkey
String groupKey = GroupKey2.getKey(dataId, group, tenant);
String taskKey = String.join("+", dataId, group, tenant, String.valueOf(isBeta), tag);
//添加任务至TaskManager
dumpTaskMgr.addTask(taskKey, new DumpTask(groupKey, tag, lastModified, handleIp, isBeta));
DUMP_LOG.info("[dump-task] add task. groupKey={}, taskKey={}", groupKey, taskKey);
}
TaskManager异步任务执行源码简要分析
-
EmbeddedDumpService初始化会进行TaskManager的初始化
-
TaskManager构造器中调用父类NacosDelayTaskExecuteEngine构造器,初始化父类
public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) { super(logger); tasks = new ConcurrentHashMap<>(initCapacity); processingExecutor = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory(name)); //processInterval = 100,延迟100ms开始执行异步任务,每100ms执行一次 processingExecutor .scheduleWithFixedDelay(new ProcessRunnable(), processInterval, processInterval, TimeUnit.MILLISECONDS); }
-
ProcessRunnable实现Runnable接口,执行processTasks方法
//com.alibaba.nacos.config.server.manager.TaskManager#processTasks protected void processTasks() { super.processTasks(); MetricsMonitor.getDumpTaskMonitor().set(tasks.size()); if (tasks.isEmpty()) { this.lock.lock(); try { this.notEmpty.signalAll(); } finally { this.lock.unlock(); } } } //com.alibaba.nacos.common.task.engine.NacosDelayTaskExecuteEngine#processTasks protected void processTasks() { //获取所有taskKey Collection<Object> keys = getAllTaskKeys(); for (Object taskKey : keys) { //获取并移除任务 AbstractDelayTask task = removeTask(taskKey); if (null == task) { continue; } //获取任务对应processor NacosTaskProcessor processor = getProcessor(taskKey); if (null == processor) { getEngineLog().error("processor not found for task, so discarded. " + task); continue; } try { // ReAdd task if process failed //执行异步任务,任务执行失败则重入队列 if (!processor.process(task)) { retryFailedTask(taskKey, task); } } catch (Throwable e) { getEngineLog().error("Nacos task execute error ", e); retryFailedTask(taskKey, task); } } }
process
初始化TaskManager时
//com.alibaba.nacos.config.server.service.dump.processor.DumpProcessor#process
public boolean process(NacosTask task) {
final PersistService persistService = dumpService.getPersistService();
DumpTask dumpTask = (DumpTask) task;
String[] pair = GroupKey2.parseKey(dumpTask.getGroupKey());
String dataId = pair[0];
String group = pair[1];
String tenant = pair[2];
long lastModified = dumpTask.getLastModified();
String handleIp = dumpTask.getHandleIp();
boolean isBeta = dumpTask.isBeta();
String tag = dumpTask.getTag();
//根据配置信息构建ConfigDumpEventBuilder对象
ConfigDumpEvent.ConfigDumpEventBuilder build = ConfigDumpEvent.builder().namespaceId(tenant).dataId(dataId)
.group(group).isBeta(isBeta).tag(tag).lastModifiedTs(lastModified).handleIp(handleIp);
//如果发布beta配置,则进行保存配置并更新beta缓存
if (isBeta) {
// if publish beta, then dump config, update beta cache
ConfigInfo4Beta cf = persistService.findConfigInfo4Beta(dataId, group, tenant);
build.remove(Objects.isNull(cf));
build.betaIps(Objects.isNull(cf) ? null : cf.getBetaIps());
build.content(Objects.isNull(cf) ? null : cf.getContent());
build.encryptedDataKey(Objects.isNull(cf) ? null : cf.getEncryptedDataKey());
return DumpConfigHandler.configDump(build.build());
}
if (StringUtils.isBlank(tag)) {
//根据dataId, group, tenant查询config_info表获取配置信息
ConfigInfo cf = persistService.findConfigInfo(dataId, group, tenant);
//属性设置
build.remove(Objects.isNull(cf));
build.content(Objects.isNull(cf) ? null : cf.getContent());
build.type(Objects.isNull(cf) ? null : cf.getType());
build.encryptedDataKey(Objects.isNull(cf) ? null : cf.getEncryptedDataKey());
} else {
ConfigInfo4Tag cf = persistService.findConfigInfo4Tag(dataId, group, tenant, tag);
build.remove(Objects.isNull(cf));
build.content(Objects.isNull(cf) ? null : cf.getContent());
}
//触发配置持久化事件
return DumpConfigHandler.configDump(build.build());
}
configDump
//com.alibaba.nacos.config.server.service.dump.DumpConfigHandler#configDump
public static boolean configDump(ConfigDumpEvent event) {
final String dataId = event.getDataId();
final String group = event.getGroup();
final String namespaceId = event.getNamespaceId();
final String content = event.getContent();
final String type = event.getType();
final long lastModified = event.getLastModifiedTs();
final String encryptedDataKey = event.getEncryptedDataKey();
...
if (StringUtils.isBlank(event.getTag())) {
//聚合白名单内容处理
if (dataId.equals(AggrWhitelist.AGGRIDS_METADATA)) {
AggrWhitelist.load(content);
}
if (dataId.equals(ClientIpWhiteList.CLIENT_IP_WHITELIST_METADATA)) {
ClientIpWhiteList.load(content);
}
if (dataId.equals(SwitchService.SWITCH_META_DATAID)) {
SwitchService.load(content);
}
boolean result;
//事件没有被移除
if (!event.isRemove()) {
//配置信息持久化
//同服务端启动推送配置目录中的dump方法解析
result = ConfigCacheService
.dump(dataId, group, namespaceId, content, lastModified, type, encryptedDataKey);
//日志记录
if (result) {
ConfigTraceService.logDumpEvent(dataId, group, namespaceId, null, lastModified, event.getHandleIp(),
ConfigTraceService.DUMP_EVENT_OK, System.currentTimeMillis() - lastModified,
content.length());
}
} else {
result = ConfigCacheService.remove(dataId, group, namespaceId);
if (result) {
ConfigTraceService.logDumpEvent(dataId, group, namespaceId, null, lastModified, event.getHandleIp(),
ConfigTraceService.DUMP_EVENT_REMOVE_OK, System.currentTimeMillis() - lastModified, 0);
}
}
return result;
}
...
}
感谢阅读完本篇文章!!!
个人博客地址: huanghong.top