博客
关于我
nacos配置自动刷新源码解析
阅读量:790 次
发布时间:2023-02-14

本文共 8276 字,大约阅读时间需要 27 分钟。

Nacos????????????

????

????Nacos?????Spring Cloud Native???????????????????????????????????????????????????????????????????????????????????????????????????????????????????

??????

1???????????????

???Nacos?????????????????????

[INFO] [server-push] config changed. dataId=..., group=..., tenant=...

????????????????????????????????????ClientWorker???????

public ClientWorker(final ConfigFilterChainManager configFilterChainManager, ServerListManager serverListManager, final NacosClientProperties properties) throws NacosException {    this.configFilterChainManager = configFilterChainManager;    init(properties);    agent = new ConfigRpcTransportClient(properties, serverListManager);    int count = ThreadUtils.getSuitableThreadCount(THREAD_MULTIPLE);    ScheduledExecutorService executorService = Executors.newScheduledThreadPool(Math.max(count, MIN_THREAD_NUM), r -> {        Thread t = new Thread(r);        t.setName("com.alibaba.nacos.client.Worker");        t.setDaemon(true);        return t;    });    agent.setExecutor(executorService);    agent.start();}

?????????ConfigRpcTransportClient???????ScheduledExecutorService????????start()??????????????5???executeConfigListen()?

2?????????

executeConfigListen()?????????????

public void executeConfigListen() {    // ???????????????    Map
> listenCachesMap = new HashMap(16); Map
> removeListenCachesMap = new HashMap(16); // ??????? long now = System.currentTimeMillis(); // ?????????????? boolean needAllSync = now - lastAllSyncTime >= ALL_SYNC_INTERNAL; // 5?? for (CacheData cache : cacheMap.get().values()) { synchronized (cache) { if (cache.isSyncWithServer()) { cache.checkListenerMd5(); if (!needAllSync) { continue; } } if (!cache.isDiscard()) { // ????? if (!cache.isUseLocalConfigInfo()) { List
cacheDatas = listenCachesMap.get(String.valueOf(cache.getTaskId())); if (cacheDatas == null) { cacheDatas = new LinkedList<>(); listenCachesMap.put(String.valueOf(cache.getTaskId()), cacheDatas); } cacheDatas.add(cache); } } else if (cache.isDiscard()) { // ???? if (!cache.isUseLocalConfigInfo()) { List
cacheDatas = removeListenCachesMap.get(String.valueOf(cache.getTaskId())); if (cacheDatas == null) { cacheDatas = new LinkedList<>(); removeListenCachesMap.put(String.valueOf(cache.getTaskId()), cacheDatas); } cacheDatas.add(cache); } } } } // ????????? if (!listenCachesMap.isEmpty()) { for (Map.Entry
> entry : listenCachesMap.entrySet()) { String taskId = entry.getKey(); List
listenCaches = entry.getValue(); // ???????? Map
timestampMap = new HashMap(listenCachesMap.size() * 2); for (CacheData cacheData : listenCaches) { timestampMap.put(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant), cacheData.getLastModifiedTs().longValue()); } // ?????????? ConfigBatchListenRequest configChangeListenRequest = buildConfigRequest(listenCaches); configChangeListenRequest.setListen(true); try { RpcClient rpcClient = ensureRpcClient(taskId); ConfigChangeBatchListenResponse configChangeBatchListenResponse = (ConfigChangeBatchListenResponse) requestProxy(rpcClient, configChangeListenRequest); if (configChangeBatchListenResponse.isSuccess()) { Set
changeKeys = new HashSet(); for (ConfigChangeBatchListenResponse.ConfigContext changeConfig : configChangeBatchListenResponse.getChangedConfigs()) { String changeKey = GroupKey.getKeyTenant(changeConfig.getDataId(), changeConfig.getGroup(), changeConfig.getTenant()); changeKeys.add(changeKey); refreshContentAndCheck(changeKey); } // ????????? for (CacheData cacheData : listenCaches) { String groupKey = GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant); if (!changeKeys.contains(groupKey)) { synchronized (cacheData) { if (!cacheData.getListeners().isEmpty()) { Long previousTimestamp = timestampMap.get(groupKey); if (previousTimestamp != null && !cacheData.getLastModifiedTs().compareAndSet(previousTimestamp, System.currentTimeMillis())) { continue; } cacheData.setSyncWithServer(true); } cacheData.setInitializing(false); } } cacheData.setInitializing(false); } } } catch (Exception e) { LOGGER.error("Async listen config change error", e); try { Thread.sleep(50L); } catch (InterruptedException interruptedException) { // ?? } } } } // ?????? if (!removeListenCachesMap.isEmpty()) { for (Map.Entry
> entry : removeListenCachesMap.entrySet()) { String taskId = entry.getKey(); List
removeListenCaches = entry.getValue(); ConfigBatchListenRequest configChangeListenRequest = buildConfigRequest(removeListenCaches); configChangeListenRequest.setListen(false); try { RpcClient rpcClient = ensureRpcClient(taskId); boolean removeSuccess = unListenConfigChange(rpcClient, configChangeListenRequest); if (removeSuccess) { for (CacheData cacheData : removeListenCaches) { synchronized (cacheData) { if (cacheData.isDiscard()) { removeCache(cacheData.dataId, cacheData.group, cacheData.tenant); } } } } } catch (Exception e) { LOGGER.error("Async remove listen config change error", e); } try { Thread.sleep(50L); } catch (InterruptedException interruptedException) { // ?? } } } if (needAllSync) { lastAllSyncTime = now; } if (hasChangedKeys) { notifyListenConfig(); }}

3????????

?initRpcClientHandler??????????????????????Handler?

private void initRpcClientHandler(final RpcClient rpcClientInner) {    rpcClientInner.registerServerRequestHandler((request) -> {        if (request instanceof ConfigChangeNotifyRequest) {            ConfigChangeNotifyRequest configChangeNotifyRequest = (ConfigChangeNotifyRequest) request;            LOGGER.info("[{}] [server-push] config changed. dataId={}, group={}, tenant={}",                     rpcClientInner.getName(), configChangeNotifyRequest.getDataId(),                     configChangeNotifyRequest.getGroup(), configChangeNotifyRequest.getTenant());            String groupKey = GroupKey.getKeyTenant(configChangeNotifyRequest.getDataId(),                     configChangeNotifyRequest.getGroup(), configChangeNotifyRequest.getTenant());            CacheData cacheData = cacheMap.get().get(groupKey);            if (cacheData != null) {                synchronized (cacheData) {                    cacheData.getLastModifiedTs().set(System.currentTimeMillis());                    cacheData.setSyncWithServer(false);                    notifyListenConfig();                }            }            return new ConfigChangeNotifyResponse();        }        return null;    });    // ????????}

??????????????Handler????????????????notifyListenConfig()???????????????

??@ConfigurationProperties???bean????

1???????

?refreshEnvironment()??????????????????????????????

public synchronized Set
refreshEnvironment() { Map
before = extract(this.context.getEnvironment().getPropertySources()); updateEnvironment(); Set
keys = changes(before, extract(this.context.getEnvironment().getPropertySources())).keySet(); this.context.publishEvent(new EnvironmentChangeEvent(this.context, keys)); return keys;}

??????????ConfigurationPropertiesRebinder????????rebind()?????bean????????????

2?@RefreshScope???bean????

@RefreshScope???bean??refreshAll()???????

@ManagedOperation(description = "Dispose of the current instance of all beans in this scope and force a refresh on next method execution.")public void refreshAll() {    super.destroy();    this.context.publishEvent(new RefreshScopeRefreshedEvent());}

??GenericScope?destroy()??????????????bean????????

????

Nacos?????????????????

  • ?????????ConfigRpcTransportClient????????
  • ?????????5???executeConfigListen()????????
  • ????????????????????????????
  • ?????????@ConfigurationProperties?@RefreshScope?????bean??????
  • ????????????????????????????Nacos?????????????

    转载地址:http://gwcfk.baihongyu.com/

    你可能感兴趣的文章