Spring Cloud Alibaba Nacos 構成センター ソースコードガイド

20924 ワード

Nacosコンフィギュレーションセンター
バージョン1.0.0に基づく
Nacos Client
ClientのDemoコードの使用
/**
 * Minimal client demo: creates a ConfigService for the server at 127.0.0.1:8848
 * and prints one configuration to standard output.
 */
public class ClientDemo {
    public static void main(String[] args) {
        // Plain setup; none of these statements can raise NacosException.
        final String serverAddr = "127.0.0.1:8848";
        final String dataId = "nacos-demo.properties";
        final String group = "DEFAULT_GROUP";

        final Properties clientProps = new Properties();
        clientProps.put("serverAddr", serverAddr);

        try {
            ConfigService configService = NacosFactory.createConfigService(clientProps);
            // Read the configuration (timeoutMs = 5000) and print it.
            System.out.println(configService.getConfig(dataId, group, 5000));
        } catch (NacosException e) {
            e.printStackTrace();
        }
    }
}

全体の流れとして、まずConfigServiceが作成されます。そこでNacosFactory.createConfigService(properties);の中身を見てみます。
/**
 * Entry-point factory; creation of the configuration service is delegated to ConfigFactory.
 */
public class NacosFactory {
    /**
     * Creates a ConfigService from the given client properties.
     *
     * @param properties client properties, passed through unchanged to ConfigFactory
     * @return the ConfigService produced by ConfigFactory
     * @throws NacosException propagated from ConfigFactory.createConfigService
     */
    public static ConfigService createConfigService(Properties properties) 
        throws NacosException {
        final ConfigService service = ConfigFactory.createConfigService(properties);
        return service;
    }
    // ... (the rest of the class is elided in this excerpt)
}

ConfigFactoryクラスによってConfigServiceが作成されます。さらに読み進めると、リフレクションによってインスタンスが作成されていることがわかります.
/**
 * Creates the ConfigService implementation reflectively, so this class has no
 * compile-time reference to the implementation class.
 */
public class ConfigFactory {
    /**
     * Instantiates com.alibaba.nacos.client.config.NacosConfigService through its
     * (Properties) constructor.
     *
     * @param properties client properties handed to the implementation's constructor
     * @return the newly created ConfigService
     * @throws NacosException with code -400 when the class cannot be loaded or constructed
     */
    public static ConfigService createConfigService(Properties properties)
        throws NacosException {
        try {
            Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.config.NacosConfigService");
            Constructor<?> constructor = driverImplClass.getConstructor(Properties.class);
            return (ConfigService) constructor.newInstance(properties);
        } catch (java.lang.reflect.InvocationTargetException e) {
            // The reflective wrapper itself carries no message; report the failure
            // actually thrown by the constructor instead of a null message.
            Throwable cause = e.getCause();
            throw new NacosException(-400, cause != null ? cause.getMessage() : e.getMessage());
        } catch (Throwable e) {
            throw new NacosException(-400, e.getMessage());
        }
    }
}
NacosConfigServiceのインスタンスはリフレクションによって作成されます。ConfigServiceはインタフェースであり、以下のように定義されています.
// Client-side contract for reading, publishing, removing and listening to configuration entries.
public interface ConfigService {
    // Returns the configuration content for dataId/group; timeoutMs is presumably the read timeout in milliseconds.
    String getConfig(String dataId, String group, long timeoutMs) 
        throws NacosException;
    // Registers a listener for dataId/group (presumably notified when that configuration changes).
    void addListener(String dataId, String group, Listener listener) 
        throws NacosException;
   // Publishes content under dataId/group; the boolean presumably reports success — TODO confirm.
    boolean publishConfig(String dataId, String group, String content) 
        throws NacosException;
   // Removes the configuration stored under dataId/group; the boolean presumably reports success — TODO confirm.
    boolean removeConfig(String dataId, String group) 
        throws NacosException;
    // Unregisters a listener previously added for dataId/group.
    void removeListener(String dataId, String group, Listener listener);
    // Returns the server status as a string (the format is not visible in this excerpt).
    String getServerStatus();
}
NacosConfigServiceは以下のように実装されています.
/**
 * ConfigService implementation; construction wires up the HTTP agent and the
 * ClientWorker that receives that agent.
 */
public class NacosConfigService implements ConfigService {
    private final long POST_TIMEOUT = 3000L;
    private static final String EMPTY = "";
    private HttpAgent agent;
    // Created last in the constructor, from the started agent and the filter chain manager.
    private ClientWorker worker;
    private String namespace;
    private String encode;
    private ConfigFilterChainManager configFilterChainManager = 
        new ConfigFilterChainManager();

    /**
     * @param properties client properties; PropertyKeyConst.ENCODE is read here, and the
     *                   whole object is also passed to initNamespace and ServerHttpAgent
     * @throws NacosException declared by this constructor; the raising call is not
     *                        identifiable from this excerpt
     */
    public NacosConfigService(Properties properties) throws NacosException {
        // Fall back to Constants.ENCODE when no encoding is configured.
        final String configuredEncode = properties.getProperty(PropertyKeyConst.ENCODE);
        encode = StringUtils.isBlank(configuredEncode) ? Constants.ENCODE : configuredEncode.trim();

        initNamespace(properties);

        // ServerHttpAgent wrapped in MetricsHttpAgent; it must be started before the worker uses it.
        agent = new MetricsHttpAgent(new ServerHttpAgent(properties));
        agent.start();

        worker = new ClientWorker(agent, configFilterChainManager);
    }
    //...
}

ClientWorkerソースコード
    /**
     * Sets up the two thread pools and schedules the recurring checkConfigInfo() task.
     *
     * @param agent                    HTTP agent; its name is appended to the worker thread names
     * @param configFilterChainManager filter chain manager stored on this worker
     */
    public ClientWorker(final HttpAgent agent, 
                        final ConfigFilterChainManager configFilterChainManager) {
        this.agent = agent;
        this.configFilterChainManager = configFilterChainManager;

        // Daemon threads for the single-threaded scheduler.
        final ThreadFactory schedulerThreads = new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread worker = new Thread(r);
                worker.setName("com.alibaba.nacos.client.Worker." + agent.getName());
                worker.setDaemon(true);
                return worker;
            }
        };
        // Daemon threads for the cached pool that runs the long-polling tasks.
        final ThreadFactory longPollingThreads = new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread worker = new Thread(r);
                worker.setName("com.alibaba.nacos.client.Worker.longPolling" + agent.getName());
                worker.setDaemon(true);
                return worker;
            }
        };
        executor = Executors.newScheduledThreadPool(1, schedulerThreads);
        executorService = Executors.newCachedThreadPool(longPollingThreads);

        // Any Throwable is swallowed; an escaping exception would cancel the periodic schedule.
        final Runnable periodicCheck = new Runnable() {
            public void run() {
                try {
                    checkConfigInfo();
                } catch (Throwable e) {
                }
            }
        };
        // First run after 1 ms, then a 10 ms delay after each completed run.
        executor.scheduleWithFixedDelay(periodicCheck, 1L, 10L, TimeUnit.MILLISECONDS);
    }
    /**
     * Starts additional LongPollingRunnable tasks when the number of cached entries
     * requires more tasks than are currently running. Tasks are never stopped here.
     */
    public void checkConfigInfo() {
        final int listenerSize = cacheMap.get().size();
        // NOTE(review): if getPerTaskConfigSize() returns an integer type, the division
        // truncates before Math.ceil is applied — confirm against ParamUtil.
        final int requiredTasks = (int)Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
        if (requiredTasks <= currentLongingTaskCount) {
            return;
        }
        // Task ids continue from the current count, so existing tasks keep their ids.
        int nextTaskId = (int)currentLongingTaskCount;
        while (nextTaskId < requiredTasks) {
            executorService.execute(new LongPollingRunnable(nextTaskId));
            nextTaskId++;
        }
        currentLongingTaskCount = requiredTasks;
    }
LongPollingRunnable構造
    /**
     * Long-polling task for the CacheData entries whose taskId equals this task's id.
     * Each run checks local failover configuration, asks the server which group keys
     * changed, refreshes the changed entries, runs the listener MD5 check, and then
     * resubmits itself to the executor.
     */
    class LongPollingRunnable implements Runnable {
        private int taskId;

        public LongPollingRunnable(int taskId) {
            this.taskId = taskId;
        }

        @Override
        public void run() {
            try {
                // Typed collections: the enhanced-for loops below require the element types.
                List<CacheData> cacheDatas = new ArrayList<CacheData>();
                for (CacheData cacheData : cacheMap.get().values()) {
                    if (cacheData.getTaskId() == taskId) {
                        cacheDatas.add(cacheData);
                        try {
                            // Switch to or from the local failover file when appropriate.
                            checkLocalConfig(cacheData);
                            if (cacheData.isUseLocalConfigInfo()) {
                                cacheData.checkListenerMd5();
                            }
                        } catch (Exception e) {
                            // NOTE(review): failure is silently dropped here — consider logging.
                        }
                    }
                }

                List<String> inInitializingCacheList = new ArrayList<String>();
                // Group keys the server reports as changed; inInitializingCacheList is
                // passed in and consulted further below.
                List<String> changedGroupKeys =
                    checkUpdateDataIds(cacheDatas, inInitializingCacheList);

                // Pull the new content for every changed key.
                for (String groupKey : changedGroupKeys) {
                    String[] key = GroupKey.parseKey(groupKey);
                    String dataId = key[0];
                    String group = key[1];
                    String tenant = null;
                    if (key.length == 3) {
                        tenant = key[2];
                    }
                    try {
                        String content = getServerConfig(dataId, group, tenant, 3000L);
                        CacheData cache = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant));
                        cache.setContent(content);
                    } catch (NacosException ioe) {
                        // NOTE(review): failure is silently dropped here — consider logging.
                    }
                }

                for (CacheData cacheData : cacheDatas) {
                    if (!cacheData.isInitializing() || inInitializingCacheList
                        .contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {
                        // Run the listener MD5 check, then mark the entry as initialized.
                        cacheData.checkListenerMd5();
                        cacheData.setInitializing(false);
                    }
                }
                inInitializingCacheList.clear();
            } catch (Throwable e) {
                // NOTE(review): failure is silently dropped here — consider logging.
            } finally {
                // Resubmit unconditionally so polling continues after a failed round.
                executorService.execute(this);
            }
        }
    }

checkUpdateDataIdsコードで呼び出されたcheckUpdateConfigStr

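    /**
     * Fetches from the server the list of DataIDs whose values have changed. Only the dataId and
     * group of each returned entry are meaningful. Never returns NULL.
     */
// probeUpdateString is built from dataId, group, md5 and tenant.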
/**
 * Posts the probe string to the server's "/listener" endpoint and returns the
 * group keys parsed from the response.
 *
 * @param probeUpdateString       probe payload sent as the Constants.PROBE_MODIFY_REQUEST parameter
 * @param isInitializingCacheList when true, the "Long-Pulling-Timeout-No-Hangup" header is added
 * @return the parsed group keys; an empty list when the probe is blank, the server
 *         answers with a non-200 code, or the request fails with an IOException
 */
List<String> checkUpdateConfigStr(String probeUpdateString,
                                  boolean isInitializingCacheList) {
    // Nothing to probe: return before building any request data.
    if (StringUtils.isBlank(probeUpdateString)) {
        return Collections.emptyList();
    }

    List<String> params =
        Arrays.asList(Constants.PROBE_MODIFY_REQUEST, probeUpdateString);
    long timeout = TimeUnit.SECONDS.toMillis(30L);

    // Headers are a flat name/value list.
    List<String> headers = new ArrayList<String>(2);
    headers.add("Long-Pulling-Timeout");
    headers.add("" + timeout);
    // told server do not hang me up if new initializing cacheData added in
    if (isInitializingCacheList) {
        headers.add("Long-Pulling-Timeout-No-Hangup");
        headers.add("true");
    }

    try {
        HttpResult result = agent.httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params,
            agent.getEncode(), timeout);
        if (HttpURLConnection.HTTP_OK == result.code) {
            setHealthServer(true);
            return parseUpdateDataIdResponse(result.content);
        }
        setHealthServer(false);
    } catch (IOException e) {
        // A failed request marks the server unhealthy, the same as a non-200 answer.
        setHealthServer(false);
    }
    return Collections.emptyList();
}
checkLocalConfigメソッド
    /**
     * Synchronizes a cache entry with its local failover file:
     * - not using local config and the file exists: load the file and switch to local mode;
     * - using local config and the file is gone: switch local mode off;
     * - using local config and the file's last-modified time changed: reload the file.
     *
     * @param cacheData the cache entry to check and update in place
     */
    private void checkLocalConfig(CacheData cacheData) {
        final String dataId = cacheData.dataId;
        final String group = cacheData.group;
        final String tenant = cacheData.tenant;
        File path = LocalConfigInfoProcessor.getFailoverFile(agent.getName(), dataId, group, tenant);

        if (!cacheData.isUseLocalConfigInfo()) {
            if (path.exists()) {
                applyFailoverContent(cacheData, path);
            }
            return;
        }
        // Local mode is on from here.
        if (!path.exists()) {
            cacheData.setUseLocalConfigInfo(false);
            return;
        }
        if (cacheData.getLocalConfigInfoVersion() != path.lastModified()) {
            applyFailoverContent(cacheData, path);
        }
    }

    /** Loads the failover content into the entry and records the file's last-modified time as its version. */
    private void applyFailoverContent(CacheData cacheData, File path) {
        String content = LocalConfigInfoProcessor.getFailover(
            agent.getName(), cacheData.dataId, cacheData.group, cacheData.tenant);
        cacheData.setUseLocalConfigInfo(true);
        cacheData.setLocalConfigInfoVersion(path.lastModified());
        cacheData.setContent(content);
    }

サービス側コードリード
APIインタフェースは /nacos/v1/cs/configs/listener で、そのコードは以下のとおりです.
    /**
     * Long-polling endpoint: decodes the "Listening-Configs" parameter into a client
     * MD5 map and hands the request to ConfigServletInner.doPollingConfig.
     *
     * @throws IllegalArgumentException when the parameter is blank or cannot be parsed
     */
    @RequestMapping(value = "/listener", method = RequestMethod.POST)
    public void listener(HttpServletRequest request, HttpServletResponse response)
        throws ServletException, IOException {
        request.setAttribute("org.apache.catalina.ASYNC_SUPPORTED", true);
        // Probe string sent by the client.
        String probeModify = request.getParameter("Listening-Configs");
        if (StringUtils.isBlank(probeModify)) {
            throw new IllegalArgumentException("invalid probeModify");
        }
        probeModify = URLDecoder.decode(probeModify, Constants.ENCODE);
        Map<String, String> clientMd5Map;
        try {
            clientMd5Map = MD5Util.getClientMd5Map(probeModify);
        } catch (Throwable e) {
            // Keep the parse failure as the cause so the root problem stays diagnosable.
            throw new IllegalArgumentException("invalid probeModify", e);
        }
        // do long-polling
        inner.doPollingConfig(request, response, clientMd5Map, probeModify.length());
    }
ConfigServletInner実装
// Server-side helper that the "/listener" endpoint delegates to (excerpt; the class body is elided).
public class ConfigServletInner {
    @Autowired
    private LongPollingService longPollingService;
    @Autowired
    private PersistService persistService;
    private static final int TRY_GET_LOCK_TIMES = 9;
    private static final int START_LONGPOLLING_VERSION_NUM = 204;
    /**
     * Handles a polling request. When the request supports long polling it is registered
     * with LongPollingService; in both visible paths the returned string is the
     * HttpServletResponse.SC_OK code.
     */
    public String doPollingConfig(HttpServletRequest request, HttpServletResponse response, Map clientMd5Map, int probeRequestSize)
        throws IOException, ServletException {
        // Long-polling path.
        if (LongPollingService.isSupportLongPolling(request)) {
            longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize);
            return HttpServletResponse.SC_OK + "";
        }
       // ... (the handling for requests without long-polling support is elided in this excerpt)
        return HttpServletResponse.SC_OK + "";
    }
longPollingService.addLongPollingClient
    /**
     * Registers a long-polling client. Outside fixed-polling mode the client's MD5 map
     * is compared first: any change is answered at once, and a client that sent the
     * no-hang-up flag returns without being held. Otherwise the request is switched to
     * asynchronous mode and a ClientLongPolling task is submitted to the scheduler.
     *
     * @param req              the polling request; the long-polling timeout header must hold a number
     * @param rsp              the response used for an immediate answer
     * @param clientMd5Map     MD5 map sent by the client
     * @param probeRequestSize length of the client's probe string
     */
    public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map clientMd5Map,int probeRequestSize) {
        String str = req.getHeader(LongPollingService.LONG_POLLING_HEADER);
        String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER);
        String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER);
        String tag = req.getHeader("Vipserver-Tag");
        int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500);
        // add delay time for LoadBalance: the client's timeout minus delayTime
        // (switch default 500), but never below 10000 ms.
        long timeout = Math.max(10000, Long.parseLong(str) - delayTime);
        if (isFixedPolling()) {
            timeout = Math.max(10000, getFixedPollingInterval());
        } else {
            List<String> changedGroups = MD5Util.compareMd5(req, rsp, clientMd5Map);
            if (changedGroups.size() > 0) {
                generateResponse(req, rsp, changedGroups);
                return;
            } else if (noHangUpFlag != null && noHangUpFlag.equalsIgnoreCase(TRUE_STR)) {
                return;
            }
        }
        String ip = RequestUtil.getRemoteIp(req);
        // Put the request into asynchronous mode so it can be completed later.
        final AsyncContext asyncContext = req.startAsync();
        // Timeout 0: per the Servlet API the async operation never times out on its own;
        // ClientLongPolling enforces the timeout itself.
        asyncContext.setTimeout(0L);
        scheduler.execute(
            new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag));
    }
ClientLongPollingコード
/**
 * Captures everything needed to answer one held long-polling request later.
 * createTime is stamped with the current time at construction.
 */
ClientLongPolling(AsyncContext ac, Map clientMd5Map, String ip, int probeRequestSize,long timeoutTime, String appName, String tag) {
    // Request handle and what the client is watching.
    this.asyncContext = ac;
    this.clientMd5Map = clientMd5Map;
    this.probeRequestSize = probeRequestSize;
    // Client identity.
    this.ip = ip;
    this.appName = appName;
    this.tag = tag;
    // Timing.
    this.timeoutTime = timeoutTime;
    this.createTime = System.currentTimeMillis();
}
/**
 * Schedules the timeout task for this held request, then adds this client to allSubs.
 * When the timeout fires, the client is removed from allSubs and answered: in
 * fixed-polling mode with the changed group keys (or null when none changed),
 * otherwise always with null.
 */
public void run() {
    asyncTimeoutFuture = scheduler.schedule(new Runnable() {
        public void run() {
            try {
                getRetainIps().put(ClientLongPolling.this.ip, System.currentTimeMillis());
                // Remove the subscription before answering.
                allSubs.remove(ClientLongPolling.this);
                if (!isFixedPolling()) {
                    sendResponse(null);
                    return;
                }
                // Fixed polling: compare MD5s at timeout and report any changed group keys.
                List changedGroups = MD5Util.compareMd5(
                    (HttpServletRequest)asyncContext.getRequest(),
                    (HttpServletResponse)asyncContext.getResponse(), clientMd5Map);
                sendResponse(changedGroups.size() > 0 ? changedGroups : null);
            } catch (Throwable t) {
            }
        }
    }, timeoutTime, TimeUnit.MILLISECONDS);
    allSubs.add(this);
}

構成変更API
    // Publish endpoint (excerpt: the parameter list and the start of the body are elided).
    // Persists the configuration through one of three paths — normal, tagged, or beta —
    // and fires a ConfigDataChangeEvent after each persist call.
    @RequestMapping(method = RequestMethod.POST)
    @ResponseBody
    public Boolean publishConfig(...)
        throws NacosException {
       //... (elided in this excerpt)
        final Timestamp time = TimeUtils.getCurrentTime();
        String betaIps = request.getHeader("betaIps");
        ConfigInfo configInfo = new ConfigInfo(dataId, group, tenant, appName, content);
        if (StringUtils.isBlank(betaIps)) {
            if (StringUtils.isBlank(tag)) {
                // Normal publish: no beta IPs and no tag.
                persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, false);
                // Fire the change event for the persisted configuration.
                EventDispatcher.fireEvent(new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime()));
            } else {
                // Tagged publish.
persistService.insertOrUpdateTag(configInfo, tag, srcIp, srcUser, time, false);
                EventDispatcher.fireEvent(new ConfigDataChangeEvent(false, dataId, group, tenant, tag, time.getTime()));
            }
        } else { // beta publish
persistService.insertOrUpdateBeta(configInfo, betaIps, srcIp, srcUser, time, false);
EventDispatcher.fireEvent(new ConfigDataChangeEvent(true, dataId, group, tenant, time.getTime()));
        }
        // Record the persistence event in the trace log.
        ConfigTraceService.logPersistenceEvent(dataId, group, tenant, requestIpApp, time.getTime(), LOCAL_IP, ConfigTraceService.PERSISTENCE_EVENT_PUB, content);
        return true;
    }

EventDispatcher.fireEvent
    /**
     * Delivers the event to every listener registered for its class. A listener that
     * throws an Exception is logged and does not stop delivery to the others.
     *
     * @param event the event to dispatch; must not be null
     * @throws IllegalArgumentException if event is null
     */
    public static void fireEvent(Event event) {
        if (event == null) {
            throw new IllegalArgumentException();
        }
        // Listeners are looked up by the event's runtime class.
        for (AbstractEventListener subscriber : getEntry(event.getClass()).listeners) {
            try {
                subscriber.onEvent(event);
            } catch (Exception failure) {
                log.error(failure.toString(), failure);
            }
        }
    }

一方、listenersはEntryで保持されており、EntryはLISTENER_HUBから取得されます。LISTENER_HUBにはaddEventListenerによってlistenerが追加されます.
/**
 * Base class for event listeners. Its constructor registers the new instance with
 * EventDispatcher.addEventListener, so subclasses are registered on construction.
 */
public abstract static class AbstractEventListener {
    public AbstractEventListener() {
        EventDispatcher.addEventListener(this);
    }
}
LongPollingServiceを見ると、クライアントがlistenerを追加するとallSubsがサブスクリプション関係を保持し、構成変更時にはonEventがトリガーされ、onEventはDataChangeTaskタスクを実行します.
    /**
     * Reacts to a LocalDataChangeEvent by submitting a DataChangeTask to the scheduler.
     * Nothing happens in fixed-polling mode or for any other event type.
     */
    @Override
    public void onEvent(Event event) {
        if (isFixedPolling()) {
            // ignore
            return;
        }
        if (!(event instanceof LocalDataChangeEvent)) {
            return;
        }
        LocalDataChangeEvent changeEvent = (LocalDataChangeEvent)event;
        scheduler.execute(new DataChangeTask(changeEvent.groupKey, changeEvent.isBeta, changeEvent.betaIps));
    }
DataChangeTask実装
        /**
         * Answers every held long-polling client whose MD5 map contains the changed
         * groupKey, removing each of them from allSubs.
         */
        public void run() {
            try {
                // NOTE(review): the return value is discarded; presumably called for a side effect — confirm.
                ConfigService.getContentBetaMd5(groupKey);
                // Typed iterator: next() must yield ClientLongPolling for the assignment below.
                for (Iterator<ClientLongPolling> iter = allSubs.iterator(); iter.hasNext(); ) {
                    ClientLongPolling clientSub = iter.next();
                    // Only clients that watch this groupKey are affected.
                    if (clientSub.clientMd5Map.containsKey(groupKey)) {
                        // ...
                        getRetainIps().put(clientSub.ip, System.currentTimeMillis());
                        // Remove the subscription before answering.
                        iter.remove();
                        clientSub.sendResponse(Arrays.asList(groupKey));
                    }
                }
            } catch (Throwable t) {
                // NOTE(review): failure is silently dropped here — consider logging.
            }
        }