SpringCloud Aliba Nacos構成センターソースガイド
20924 ワード
Nacosコンフィギュレーションセンター
バージョン1.0に基づく.0
Nacos Client
ClientのDemoコードの使用
プロセス全体にわたってConfigServiceが作成され、
コンフィグファクトリクラスによってコンフィグサービスが作成され、引き続き表示され、反射によってインスタンスが作成されることがわかります.
サービス側コードリード
APIインタフェース/nacos/v 1/cs/configs/listener、インタフェースコード
変更APIの構成
一方listenersはEntryでメンテナンスされ、EntryはLISTERNER_を再構築します.HUBで取得、LISTENER_HUBはaddEventListenerでlinstenerを追加
バージョン1.0に基づく.0
Nacos Client
ClientのDemoコードの使用
public class ClientDemo {
public static void main(String[] args) {
try {
String serverAddr = "127.0.0.1:8848";
String dataId = "nacos-demo.properties";
String group = "DEFAULT_GROUP";
Properties properties = new Properties();
properties.put("serverAddr", serverAddr);
ConfigService configService = NacosFactory.createConfigService(properties);
// listener
String content = configService.getConfig(dataId, group, 5000);
System.out.println(content);
} catch (NacosException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
プロセス全体にわたってConfigServiceが作成され、
NacosFactory.createConfigService(properties);
が表示されます.public class NacosFactory {
public static ConfigService createConfigService(Properties properties)
throws NacosException {
//
return ConfigFactory.createConfigService(properties);
}
//.....
}
コンフィグファクトリクラスによってコンフィグサービスが作成され、引き続き表示され、反射によってインスタンスが作成されることがわかります.
public class ConfigFactory {
public static ConfigService createConfigService(Properties properties)
throws NacosException {
try {
Class> driverImplClass = Class.forName("com.alibaba.nacos.client.config.NacosConfigService");
Constructor constructor = driverImplClass.getConstructor(Properties.class);
//
ConfigService vendorImpl =
(ConfigService) constructor.newInstance(properties);
return vendorImpl;
} catch (Throwable e) {
throw new NacosException(-400, e.getMessage());
}
}
}
NacosConfigService
インスタンスが反射形式で作成されました.ConfigService
はインタフェースであり、以下のように構成されている.public interface ConfigService {
//
String getConfig(String dataId, String group, long timeoutMs)
throws NacosException;
//
void addListener(String dataId, String group, Listener listener)
throws NacosException;
//
boolean publishConfig(String dataId, String group, String content)
throws NacosException;
//
boolean removeConfig(String dataId, String group)
throws NacosException;
//
void removeListener(String dataId, String group, Listener listener);
String getServerStatus();
}
NacosConfigService
は以下のように実現される.public class NacosConfigService implements ConfigService {
private final long POST_TIMEOUT = 3000L;
private static final String EMPTY = "";
private HttpAgent agent;
private ClientWorker worker;//
private String namespace;
private String encode;
private ConfigFilterChainManager configFilterChainManager =
new ConfigFilterChainManager();
public NacosConfigService(Properties properties) throws NacosException {
String encodeTmp = properties.getProperty(PropertyKeyConst.ENCODE);
if (StringUtils.isBlank(encodeTmp)) {
encode = Constants.ENCODE;
} else {
encode = encodeTmp.trim();
}
initNamespace(properties);
// , 、
agent = new MetricsHttpAgent(new ServerHttpAgent(properties));
// ServerHttpAgent, 。 。
agent.start();
// ,
worker = new ClientWorker(agent, configFilterChainManager);
}
//...
}
ClientWorker
ソースコード public ClientWorker(final HttpAgent agent,
final ConfigFilterChainManager configFilterChainManager) {
this.agent = agent;
this.configFilterChainManager = configFilterChainManager;
executor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("com.alibaba.nacos.client.Worker." + agent.getName());
t.setDaemon(true);
return t;
}
});
executorService = Executors.newCachedThreadPool(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("com.alibaba.nacos.client.Worker.longPolling" + agent.getName());
t.setDaemon(true);
return t;
}
});
executor.scheduleWithFixedDelay(new Runnable() {
public void run() {
try {
// , cacheData(linstener) md5
checkConfigInfo();
} catch (Throwable e) {
}
}
}, 1L, 10L, TimeUnit.MILLISECONDS);// 10
}
public void checkConfigInfo() {
// cacheMap groupKey -> cacheData。groupkey dataId+group
int listenerSize = cacheMap.get().size();
int longingTaskCount = (int)Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
if (longingTaskCount > currentLongingTaskCount) {
for (int i = (int)currentLongingTaskCount; i < longingTaskCount; i++) {
//
executorService.execute(new LongPollingRunnable(i));
}
currentLongingTaskCount = longingTaskCount;
}
}
LongPollingRunnable
構造 class LongPollingRunnable implements Runnable {
private int taskId;
public LongPollingRunnable(int taskId) {
this.taskId = taskId;
}
public void run() {
try {
List cacheDatas = new ArrayList();
for (CacheData cacheData : cacheMap.get().values()) {
if (cacheData.getTaskId() == taskId) {
cacheDatas.add(cacheData);
try {
//
checkLocalConfig(cacheData);
if (cacheData.isUseLocalConfigInfo()) {
// linstener md5
cacheData.checkListenerMd5();
}
} catch (Exception e) {
}
}
}
List inInitializingCacheList = new ArrayList();
// , inInitializingCacheList
List changedGroupKeys =
checkUpdateDataIds(cacheDatas, inInitializingCacheList);
//
for (String groupKey : changedGroupKeys) {
String[] key = GroupKey.parseKey(groupKey);
String dataId = key[0];
String group = key[1];
String tenant = null;
if (key.length == 3) {
tenant = key[2];
}
try {
//
String content = getServerConfig(dataId, group, tenant, 3000L);
CacheData cache = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant));
cache.setContent(content);// listener
} catch (NacosException ioe) {
}
}
for (CacheData cacheData : cacheDatas) {
if (!cacheData.isInitializing() || inInitializingCacheList
.contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {
// md5 , md5
// listener receiveConfigInfo
cacheData.checkListenerMd5();
cacheData.setInitializing(false);
}
}
inInitializingCacheList.clear();
} catch (Throwable e) {
} finally {
// ,
executorService.execute(this);
}
}
}
checkUpdateDataIds
コードで呼び出されたcheckUpdateConfigStr
/**
* Server DataID 。 dataId group 。 NULL。
*/
//probeUpdateString dataId group md5 tenant
List checkUpdateConfigStr(String probeUpdateString,
boolean isInitializingCacheList) {
List params =
Arrays.asList(Constants.PROBE_MODIFY_REQUEST, probeUpdateString);
long timeout = TimeUnit.SECONDS.toMillis(30L);// 30
List headers = new ArrayList(2);
headers.add("Long-Pulling-Timeout");
headers.add("" + timeout);
// told server do not hang me up if new initializing cacheData added in
// ,
if (isInitializingCacheList) {
headers.add("Long-Pulling-Timeout-No-Hangup");
headers.add("true");
}
if (StringUtils.isBlank(probeUpdateString)) {
return Collections.emptyList();
}
try {
// nacos Api Constants.CONFIG_CONTROLLER_PATH + "/listener
HttpResult result = agent.httpPost(Constants.CONFIG_CONTROLLER_PATH + "/listener", headers, params,
agent.getEncode(), timeout);
if (HttpURLConnection.HTTP_OK == result.code) {
setHealthServer(true);
// groupkey
return parseUpdateDataIdResponse(result.content);
} else {
setHealthServer(false);
}
} catch (IOException e) {
}
return Collections.emptyList();
}
checkLocalConfig
メソッド private void checkLocalConfig(CacheData cacheData) {
final String dataId = cacheData.dataId;
final String group = cacheData.group;
final String tenant = cacheData.tenant;
//
//LOCAL_SNAPSHOT_PATH = //System.getProperty("JM.SNAPSHOT.PATH",System.getProperty("user.home")) + //File.separator+ "nacos" + File.separator + "config/data" ;
File path = LocalConfigInfoProcessor.getFailoverFile(agent.getName(), dataId, group, tenant);
if (!cacheData.isUseLocalConfigInfo() && path.exists()) {
String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);
String md5 = MD5.getInstance().getMD5String(content);
cacheData.setUseLocalConfigInfo(true);
cacheData.setLocalConfigInfoVersion(path.lastModified());
cacheData.setContent(content);
return;
}
// -> 。 , server 。
if (cacheData.isUseLocalConfigInfo() && !path.exists()) {
cacheData.setUseLocalConfigInfo(false);
return;
}
//
if (cacheData.isUseLocalConfigInfo() && path.exists()
&& cacheData.getLocalConfigInfoVersion() != path.lastModified()) {
String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant);
String md5 = MD5.getInstance().getMD5String(content);
cacheData.setUseLocalConfigInfo(true);
cacheData.setLocalConfigInfoVersion(path.lastModified());
cacheData.setContent(content);
return;
}
}
サービス側コードリード
APIインタフェース/nacos/v 1/cs/configs/listener、インタフェースコード
@RequestMapping(value = "/listener", method = RequestMethod.POST)
public void listener(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException {
request.setAttribute("org.apache.catalina.ASYNC_SUPPORTED", true);
// client groupkey
String probeModify = request.getParameter("Listening-Configs");
if (StringUtils.isBlank(probeModify)) {
throw new IllegalArgumentException("invalid probeModify");
}
probeModify = URLDecoder.decode(probeModify, Constants.ENCODE);
Map clientMd5Map;
try {
clientMd5Map = MD5Util.getClientMd5Map(probeModify);
} catch (Throwable e) {
throw new IllegalArgumentException("invalid probeModify");
}
// do long-polling
//inner ConfigServletInner
inner.doPollingConfig(request, response, clientMd5Map, probeModify.length());
}
ConfigServletInner
実装public class ConfigServletInner {
@Autowired
private LongPollingService longPollingService;
@Autowired
private PersistService persistService;
private static final int TRY_GET_LOCK_TIMES = 9;
private static final int START_LONGPOLLING_VERSION_NUM = 204;
/**
*
*/
public String doPollingConfig(HttpServletRequest request, HttpServletResponse response, Map clientMd5Map, int probeRequestSize)
throws IOException, ServletException {
//
if (LongPollingService.isSupportLongPolling(request)) {
longPollingService.addLongPollingClient(request, response, clientMd5Map, probeRequestSize);
return HttpServletResponse.SC_OK + "";
}
// ...
return HttpServletResponse.SC_OK + "";
}
longPollingService.addLongPollingClient
public void addLongPollingClient(HttpServletRequest req, HttpServletResponse rsp, Map clientMd5Map,int probeRequestSize) {
String str = req.getHeader(LongPollingService.LONG_POLLING_HEADER);
String noHangUpFlag = req.getHeader(LongPollingService.LONG_POLLING_NO_HANG_UP_HEADER);
String appName = req.getHeader(RequestUtil.CLIENT_APPNAME_HEADER);
String tag = req.getHeader("Vipserver-Tag");
int delayTime = SwitchService.getSwitchInteger(SwitchService.FIXED_DELAY_TIME, 500);
/**
* 500ms , @qiaoyi.dingqy 2013.10.22 add delay time for LoadBalance
*/
long timeout = Math.max(10000, Long.parseLong(str) - delayTime);
if (isFixedPolling()) {
timeout = Math.max(10000, getFixedPollingInterval());
} else {
long start = System.currentTimeMillis();
List changedGroups = MD5Util.compareMd5(req, rsp, clientMd5Map);
if (changedGroups.size() > 0) {
generateResponse(req, rsp, changedGroups);
return;
} else if (noHangUpFlag != null && noHangUpFlag.equalsIgnoreCase(TRUE_STR)) {
return;
}
}
String ip = RequestUtil.getRemoteIp(req);
// HTTP ,
final AsyncContext asyncContext = req.startAsync();
// AsyncContext.setTimeout() ,
asyncContext.setTimeout(0L);
// 30s-delayTime , ClientLongPolling
scheduler.execute(
new ClientLongPolling(asyncContext, clientMd5Map, ip, probeRequestSize, timeout, appName, tag));
}
ClientLongPolling
コードClientLongPolling(AsyncContext ac, Map clientMd5Map, String ip, int probeRequestSize,long timeoutTime, String appName, String tag) {
this.asyncContext = ac;
this.clientMd5Map = clientMd5Map;
this.probeRequestSize = probeRequestSize;
this.createTime = System.currentTimeMillis();
this.ip = ip;
this.timeoutTime = timeoutTime;
this.appName = appName;
this.tag = tag;
}
/**
* final Queue allSubs;
* , api
*
*/
public void run() {
asyncTimeoutFuture = scheduler.schedule(new Runnable() {
public void run() {
try {
getRetainIps().put(ClientLongPolling.this.ip, System.currentTimeMillis());
//
allSubs.remove(ClientLongPolling.this);
if (isFixedPolling()) {
// md5 groupkey
List changedGroups = MD5Util.compareMd5(
(HttpServletRequest)asyncContext.getRequest(),
(HttpServletResponse)asyncContext.getResponse(), clientMd5Map);
if (changedGroups.size() > 0) {
sendResponse(changedGroups);
} else {
sendResponse(null);
}
} else {
sendResponse(null);
}
} catch (Throwable t) {
}
}
}, timeoutTime, TimeUnit.MILLISECONDS);
allSubs.add(this);
}
変更APIの構成
@RequestMapping(method = RequestMethod.POST)
@ResponseBody
public Boolean publishConfig(...)
throws NacosException {
//...
final Timestamp time = TimeUtils.getCurrentTime();
String betaIps = request.getHeader("betaIps");
ConfigInfo configInfo = new ConfigInfo(dataId, group, tenant, appName, content);
if (StringUtils.isBlank(betaIps)) {
if (StringUtils.isBlank(tag)) {
persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, false);
//
EventDispatcher.fireEvent(new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime()));
} else {
persistService.insertOrUpdateTag(configInfo, tag, srcIp, srcUser, time, false);
EventDispatcher.fireEvent(new ConfigDataChangeEvent(false, dataId, group, tenant, tag, time.getTime()));
}
} else { // beta publish
persistService.insertOrUpdateBeta(configInfo, betaIps, srcIp, srcUser, time, false);
EventDispatcher.fireEvent(new ConfigDataChangeEvent(true, dataId, group, tenant, time.getTime()));
}
ConfigTraceService.logPersistenceEvent(dataId, group, tenant, requestIpApp, time.getTime(), LOCAL_IP, ConfigTraceService.PERSISTENCE_EVENT_PUB, content);
return true;
}
EventDispatcher.fireEvent
static public void fireEvent(Event event) {
if (null == event) {
throw new IllegalArgumentException();
}
// linstener
for (AbstractEventListener listener : getEntry(event.getClass()).listeners) {
try {
listener.onEvent(event);
} catch (Exception e) {
log.error(e.toString(), e);
}
}
}
一方listenersはEntryでメンテナンスされ、EntryはLISTERNER_を再構築します.HUBで取得、LISTENER_HUBはaddEventListenerでlinstenerを追加
// AbstractEventListener EventDispatcher.addEventListener。
//AbstractEventListener
static public abstract class AbstractEventListener {
public AbstractEventListener() {
EventDispatcher.addEventListener(this);
}
}
LongPollingService
を参照すると、クライアントがlinstenerを追加するとallSubはサブスクリプション関係を維持し、構成変更時にonEventイベントがトリガーされ、onEventはDataChangeTaskタスクを実行します. @Override
public void onEvent(Event event) {
if (isFixedPolling()) {
// ignore
} else {
if (event instanceof LocalDataChangeEvent) {
LocalDataChangeEvent evt = (LocalDataChangeEvent)event;
scheduler.execute(new DataChangeTask(evt.groupKey, evt.isBeta, evt.betaIps));
}
}
}
DataChangeTask
実装 public void run() {
try {
ConfigService.getContentBetaMd5(groupKey);
for (Iterator iter = allSubs.iterator(); iter.hasNext(); ) {
ClientLongPolling clientSub = iter.next();
// groupkey
if (clientSub.clientMd5Map.containsKey(groupKey)) {
// ...
getRetainIps().put(clientSub.ip, System.currentTimeMillis());
iter.remove(); //
// ,
clientSub.sendResponse(Arrays.asList(groupKey));
}
}
} catch (Throwable t) {
}
}