Eurekaクライアントソース分析
20952 ワード
eurekaクライアントができること eurekaクライアントは、他のクライアントに要求を送信するために他のクライアントの情報を取得することができる. eurekaクライアントは、自分を登録センターに登録することができ、他のクライアントが本クライアントを発見し、直接本クライアントを呼び出すことができます.
依存関係
まずはspringbootアプリ
私がここで使っているバージョンはBrixtonです.SR 5では、バージョンによって実装ロジックが異なる場合があります.
ロジック詳細
起動クラスに@EnableDiscoveryClient注記を追加する必要があります.この注記を開いてコードを開きます.
ここではDiscoveryClientを有効にする実装クラスであることがコメントの意味でわかります.ここはどうやって起動しますか?大体の構想はspringbootのstarterメカニズムにより実現され,特定のコンフィギュレーションクラスEurekaClientAutoConfigurationをスキャンすることにより,このコンフィギュレーションクラスにおいてDiscoveryClientのbeanが生成される.ここに小さなコードを貼ります.
実はCloudEurekaClientの中のコードは少なく、基本的には親DiscoveryClientが実現しているので、親の方法を直接見てみましょう.最後に,以下の構造方法を呼び出したが,ここではまず構造方法の具体的なコードを貼り付け,検討する必要はない.
主なロジックはinitScheduledTasks()の中にあり、私たちはこの中の実現を直接見ています.
ここには全部で3つの論理がある.登録センタ情報 を取得する.心拍 ローカル情報を更新し、リモート に同期する.
ローカル情報の更新とリモートへの同期
ここではInstanceInfoReplicatorのrunメソッドを見てみましょう
登録方法を見てみると、登録方法も簡単で、httpリクエストを呼び出して、自分の情報をリモートに伝えるだけです.
レジストリセンターの情報の取得
ここでは主にCacheRefreshThreadのrunメソッドを見て,最後にfetchRegistryメソッドを呼び出した.
ここでは直接更新の様子を見てgetAndUpdateDelta
ドキドキ
次に心拍数を見て、主にHeartbeatThreadのrunメソッドを見て、最後にrenewメソッドを呼び出しました.
登録情報
ここには、登録センターから返された情報を保存するためのアプリケーションクラスのコードが貼られています.具体的な情報の内容は、自分でチェックを中断することができます.中には多くのデータ構造が保存されており、主に後期データの使いやすさのためです.
依存関係
まずはspringbootアプリ
org.springframework.cloud
spring-cloud-starter-eureka
私がここで使っているバージョンはBrixtonです.SR 5では、バージョンによって実装ロジックが異なる場合があります.
ロジック詳細
起動クラスに@EnableDiscoveryClient注記を追加する必要があります.この注記を開いてコードを開きます.
/**
* Annotation to enable a DiscoveryClient implementation.
* @author Spencer Gibb
*/
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Import(EnableDiscoveryClientImportSelector.class)
public @interface EnableDiscoveryClient {
}
ここではDiscoveryClientを有効にする実装クラスであることがコメントの意味でわかります.ここはどうやって起動しますか?大体の構想はspringbootのstarterメカニズムにより実現され,特定のコンフィギュレーションクラスEurekaClientAutoConfigurationをスキャンすることにより,このコンフィギュレーションクラスにおいてDiscoveryClientのbeanが生成される.ここに小さなコードを貼ります.
@Bean(destroyMethod = "shutdown")
@ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT)
public EurekaClient eurekaClient(ApplicationInfoManager manager,
EurekaClientConfig config) {
return new CloudEurekaClient(manager, config, this.optionalArgs,
this.context);
}
実はCloudEurekaClientの中のコードは少なく、基本的には親DiscoveryClientが実現しているので、親の方法を直接見てみましょう.最後に,以下の構造方法を呼び出したが,ここではまず構造方法の具体的なコードを貼り付け,検討する必要はない.
@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, DiscoveryClientOptionalArgs args,
Provider backupRegistryProvider) {
// ,
if (args != null) {
this.healthCheckHandlerProvider = args.healthCheckHandlerProvider;
this.healthCheckCallbackProvider = args.healthCheckCallbackProvider;
this.eventListeners.addAll(args.getEventListeners());
} else {
this.healthCheckCallbackProvider = null;
this.healthCheckHandlerProvider = null;
}
//
this.applicationInfoManager = applicationInfoManager;
InstanceInfo myInfo = applicationInfoManager.getInfo();
clientConfig = config;
staticClientConfig = clientConfig;
transportConfig = config.getTransportConfig();
instanceInfo = myInfo;
if (myInfo != null) {
appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId();
} else {
logger.warn("Setting instanceInfo to a passed in null value");
}
this.backupRegistryProvider = backupRegistryProvider;
this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo);
localRegionApps.set(new Applications());
fetchRegistryGeneration = new AtomicLong(0);
remoteRegionsToFetch = new AtomicReference(clientConfig.fetchRegistryForRemoteRegions());
remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(","));
//
if (config.shouldFetchRegistry()) {
this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
} else {
this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
}
if (config.shouldRegisterWithEureka()) {
this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
} else {
this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
}
// eureka, eureka , 。
if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
logger.info("Client configured to neither register nor query for data.");
scheduler = null;
heartbeatExecutor = null;
cacheRefreshExecutor = null;
eurekaTransport = null;
instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion());
// This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
// to work with DI'd DiscoveryClient
DiscoveryManager.getInstance().setDiscoveryClient(this);
DiscoveryManager.getInstance().setEurekaClientConfig(config);
initTimestampMs = System.currentTimeMillis();
logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
initTimestampMs, this.getApplications().size());
return; // no need to setup up an network tasks and we are done
}
// , task ,
//heartbeatExecutor ,cacheRefreshExecutor
try {
scheduler = Executors.newScheduledThreadPool(3,
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-%d")
.setDaemon(true)
.build());
heartbeatExecutor = new ThreadPoolExecutor(
1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff
cacheRefreshExecutor = new ThreadPoolExecutor(
1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff
eurekaTransport = new EurekaTransport();
scheduleServerEndpointTask(eurekaTransport, args);
AzToRegionMapper azToRegionMapper;
if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);
} else {
azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);
}
if (null != remoteRegionsToFetch.get()) {
azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));
}
instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());
} catch (Throwable e) {
throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
}
if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
fetchRegistryFromBackup();
}
// ,
initScheduledTasks();
try {
Monitors.registerObject(this);
} catch (Throwable e) {
logger.warn("Cannot register timers", e);
}
// This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
// to work with DI'd DiscoveryClient
DiscoveryManager.getInstance().setDiscoveryClient(this);
DiscoveryManager.getInstance().setEurekaClientConfig(config);
initTimestampMs = System.currentTimeMillis();
logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
initTimestampMs, this.getApplications().size());
}
主なロジックはinitScheduledTasks()の中にあり、私たちはこの中の実現を直接見ています.
/**
* Initializes all scheduled tasks.
*/
private void initScheduledTasks() {
// , 。
if (clientConfig.shouldFetchRegistry()) {
// registry cache refresh timer
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
// , CacheRefreshThread
scheduler.schedule(
new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
new CacheRefreshThread()
),
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
// ,
if (clientConfig.shouldRegisterWithEureka()) {
int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
logger.info("Starting heartbeat executor: " + "renew interval is: " + renewalIntervalInSecs);
// , , HeartbeatThread 。
// Heartbeat timer
scheduler.schedule(
new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
new HeartbeatThread()
),
renewalIntervalInSecs, TimeUnit.SECONDS);
// InstanceInfo replicator
instanceInfoReplicator = new InstanceInfoReplicator(
this,
instanceInfo,
clientConfig.getInstanceInfoReplicationIntervalSeconds(),
2); // burstSize
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
@Override
public String getId() {
return "statusChangeListener";
}
@Override
public void notify(StatusChangeEvent statusChangeEvent) {
if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
// log at warn level if DOWN was involved
logger.warn("Saw local status change event {}", statusChangeEvent);
} else {
logger.info("Saw local status change event {}", statusChangeEvent);
}
instanceInfoReplicator.onDemandUpdate();
}
};
if (clientConfig.shouldOnDemandUpdateStatusChange()) {
applicationInfoManager.registerStatusChangeListener(statusChangeListener);
}
// instanceinfo
instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
} else {
logger.info("Not registering with Eureka server per configuration");
}
}
ここには全部で3つの論理がある.
ローカル情報の更新とリモートへの同期
ここではInstanceInfoReplicatorのrunメソッドを見てみましょう
public void run() {
try {
//
discoveryClient.refreshInstanceInfo();
Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
// , dirtyTimestamp
if (dirtyTimestamp != null) {
//
discoveryClient.register();
// , ,
instanceInfo.unsetIsDirty(dirtyTimestamp);
}
} catch (Throwable t) {
logger.warn("There was a problem with the instance info replicator", t);
} finally {
// , ,
Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
scheduledPeriodicRef.set(next);
}
}
登録方法を見てみると、登録方法も簡単で、httpリクエストを呼び出して、自分の情報をリモートに伝えるだけです.
boolean register() throws Throwable {
logger.info(PREFIX + appPathIdentifier + ": registering service...");
EurekaHttpResponse httpResponse;
try {
httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
} catch (Exception e) {
logger.warn("{} - registration failed {}", PREFIX + appPathIdentifier, e.getMessage(), e);
throw e;
}
if (logger.isInfoEnabled()) {
logger.info("{} - registration status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
}
return httpResponse.getStatusCode() == 204;
}
レジストリセンターの情報の取得
ここでは主にCacheRefreshThreadのrunメソッドを見て,最後にfetchRegistryメソッドを呼び出した.
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
try {
//
Applications applications = getApplications();
//
if (clientConfig.shouldDisableDelta()
|| (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
|| forceFullRegistryFetch
|| (applications == null)
|| (applications.getRegisteredApplications().size() == 0)
|| (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
{
logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
logger.info("Application is null : {}", (applications == null));
logger.info("Registered Applications size is zero : {}",
(applications.getRegisteredApplications().size() == 0));
logger.info("Application version is -1: {}", (applications.getVersion() == -1));
getAndStoreFullRegistry();
} else {
//
getAndUpdateDelta(applications);
}
applications.setAppsHashCode(applications.getReconcileHashCode());
logTotalInstances();
} catch (Throwable e) {
logger.error(PREFIX + appPathIdentifier + " - was unable to refresh its cache! status = " + e.getMessage(), e);
return false;
} finally {
if (tracer != null) {
tracer.stop();
}
}
// Notify about cache refresh before updating the instance remote status
onCacheRefreshed();
// Update remote status based on refreshed data held in the cache
updateInstanceRemoteStatus();
// registry was fetched successfully, so return true
return true;
}
ここでは直接更新の様子を見てgetAndUpdateDelta
private void getAndUpdateDelta(Applications applications) throws Throwable {
long currentUpdateGeneration = fetchRegistryGeneration.get();
Applications delta = null;
//
EurekaHttpResponse httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
delta = httpResponse.getEntity();
}
if (delta == null) {
logger.warn("The server does not allow the delta revision to be applied because it is not safe. "
+ "Hence got the full registry.");
getAndStoreFullRegistry();
} else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
String reconcileHashCode = "";
if (fetchRegistryUpdateLock.tryLock()) {
try {
// 。
updateDelta(delta);
reconcileHashCode = getReconcileHashCode(applications);
} finally {
fetchRegistryUpdateLock.unlock();
}
} else {
logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
}
// There is a diff in number of instances for some reason
if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
reconcileAndLogDifference(delta, reconcileHashCode); // this makes a remoteCall
}
} else {
logger.warn("Not updating application delta as another thread is updating it already");
logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());
}
}
ドキドキ
次に心拍数を見て、主にHeartbeatThreadのrunメソッドを見て、最後にrenewメソッドを呼び出しました.
boolean renew() {
EurekaHttpResponse httpResponse;
try {
// http ,instanceInfo 。
httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
logger.debug("{} - Heartbeat status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
if (httpResponse.getStatusCode() == 404) {
REREGISTER_COUNTER.increment();
logger.info("{} - Re-registering apps/{}", PREFIX + appPathIdentifier, instanceInfo.getAppName());
return register();
}
return httpResponse.getStatusCode() == 200;
} catch (Throwable e) {
logger.error("{} - was unable to send heartbeat!", PREFIX + appPathIdentifier, e);
return false;
}
}
登録情報
ここには、登録センターから返された情報を保存するためのアプリケーションクラスのコードが貼られています.具体的な情報の内容は、自分でチェックを中断することができます.中には多くのデータ構造が保存されており、主に後期データの使いやすさのためです.
/**
* The class that wraps all the registry information returned by eureka server.
*
*/
@Serializer("com.netflix.discovery.converters.EntityBodyConverter")
@XStreamAlias("applications")
@JsonRootName("applications")
public class Applications {
private static final String APP_INSTANCEID_DELIMITER = "$$";
private static final Logger logger = LoggerFactory.getLogger(Applications.class);
private static final String STATUS_DELIMITER = "_";
private Long versionDelta = Long.valueOf(-1);
@XStreamImplicit
private AbstractQueue applications;
private Map appNameApplicationMap = new ConcurrentHashMap();
private Map> virtualHostNameAppMap = new ConcurrentHashMap>();
private Map> secureVirtualHostNameAppMap = new ConcurrentHashMap>();
private Map virtualHostNameIndexMap = new ConcurrentHashMap();
private Map secureVirtualHostNameIndexMap = new ConcurrentHashMap();
private Map>> shuffleVirtualHostNameMap = new ConcurrentHashMap>>();
private Map>> shuffledSecureVirtualHostNameMap = new ConcurrentHashMap>>();
// 。
}