Elastic-Jobタスク実行プロセスのソースコード分析
63249 ワード
基礎
必要な基礎:
1.zookeeper入門:https://zookeeper.apache.org/doc/r3.6.0/zookeeperStarted.html
2.カラット入門:https://curator.apache.org/getting-started.html
3.Quartz入門:http://www.quartz-scheduler.org/documentation/quartz-2.3.0/quick-start.html
4.Elastic-Job入門(中国語):http://elasticjob.io/docs/elastic-job-lite/00-overview/
5.Elastic-Jobはgavaとlombookを使っていますが、知らなくても読書に影響はありません.
6.できるだけ視覚化ツールを設置してください.https://github.com/apache/zookeeper/tree/master/zookeeper-contrib/zookeeper-contrib-zooinspector)0あるいはzkui(https://github.com/DeemOpen/zkui)【zooispector私のコンピュータがjarファイルをコンパイルした後、swingのグラフィックインターフェースはずっと問題があると表示されています.各バージョンを試したことがありますが、jarカバンはmaven中央倉庫で提供されています.https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper-contrib-zooinspector)
モモ
デモ:https://github.com/creasylai19/zookeeperdemo/tree/elastic_jobデモ
デモは二つのクラスしかありません.
MainClass
例1:例2:
zookeeperにおけるノード情報:
instanceノード:ジョブ実行例情報、サブノードは、現在のジョブ実行例のメインキーである.
leaderノード:ジョブサーバマスタノード情報
serversノード:ジョブサーバ情報、サブノードはジョブサーバのIPアドレス
sharrdingノード:作業スライス情報、サブノードはスライス番号であり、ゼロからスライス総数まで減少する.
各スライスは、実行されるスライスの例などの情報を記憶する.
Elastic-Jobフロー
簡単に分けると、初期化の流れとタスク実行の流れに分けられます.
初期化プロセス
初期化プロセスは主にタスク情報の登録、タスクプロセッサインスタンスの登録、リーダ選挙、スケジューラの起動などである.ピクチャ:http://elasticjob.io/docs/elastic-job-lite/img/principles/job_start.jpg
タスク実行フロー
タスク実行フローは、スレッドを取得し、データをスライスし、スライス情報に基づいてタスクを生成してスレッドプールに入れて実行する.
画像:http://elasticjob.io/docs/elastic-job-lite/img/principles/job_exec.jpg
ここではタスク実行フロー【Simpleタイプのタスク】のみを分析します.
ステップ1
初期化段階では、スケジューラSchdulerに入れられたタスクは、LiteJob.classであり、org.quartz.Jobインターフェースを実現しました.
スケジューラSchdulerはLiteJob.classのexecuteメソッドを呼び出します.
executeメソッドの実行
タスクの割り当てプロセス:
現在のジョブプロセッサで処理するスライスを取得します.
スライスごとのタスクを実行します.
必要な基礎:
1.zookeeper入門:https://zookeeper.apache.org/doc/r3.6.0/zookeeperStarted.html
2.カラット入門:https://curator.apache.org/getting-started.html
3.Quartz入門:http://www.quartz-scheduler.org/documentation/quartz-2.3.0/quick-start.html
4.Elastic-Job入門(中国語):http://elasticjob.io/docs/elastic-job-lite/00-overview/
5.Elastic-Jobはgavaとlombookを使っていますが、知らなくても読書に影響はありません.
6.できるだけ視覚化ツールを設置してください.https://github.com/apache/zookeeper/tree/master/zookeeper-contrib/zookeeper-contrib-zooinspector)0あるいはzkui(https://github.com/DeemOpen/zkui)【zooispector私のコンピュータがjarファイルをコンパイルした後、swingのグラフィックインターフェースはずっと問題があると表示されています.各バージョンを試したことがありますが、jarカバンはmaven中央倉庫で提供されています.https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper-contrib-zooinspector)
モモ
デモ:https://github.com/creasylai19/zookeeperdemo/tree/elastic_jobデモ
デモは二つのクラスしかありません.
MainClass
public class MainClass {
private static final Logger logger = Logger.getLogger(MainClass.class);
public static void main(String[] args) {
new JobScheduler(createRegistryCenter(), createJobConfiguration()).init();
}
private static CoordinatorRegistryCenter createRegistryCenter() {
CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183", "elastic-job-demo"));
regCenter.init();
return regCenter;
}
private static LiteJobConfiguration createJobConfiguration() {
// , 10 ,
JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration
.newBuilder("demoSimpleJob", "0/15 * * * * ?", 10)
.shardingItemParameters("0=A,1=B,2=C,3=D,4=E,5=F,6=G,7=H,8=I,9=J")
.build();
// SIMPLE
SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig, MyElasticJob.class.getCanonicalName());
// Lite
LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).build();
return simpleJobRootConfig;
}
}
タスクMyElasticJobpublic class MyElasticJob implements SimpleJob {
private static final Logger logger = Logger.getLogger(MyElasticJob.class);
@Override
public void execute(ShardingContext context) {
logger.debug(ManagementFactory.getRuntimeMXBean().getName()+":"+context.getShardingItem()+":"+context.getJobName()+":"+context.getJobParameter()+":"+context.getShardingParameter());
switch (context.getShardingItem()) {
case 0:
break;
case 1:
break;
case 2:
break;
default:
}
}
}
実行結果(2つのインスタンスがオープンし、各インスタンスはそれぞれ5つのスライスを実行し、デフォルトの割り当てポリシーは平均的に割り当てられます.)例1:例2:
zookeeperにおけるノード情報:
instanceノード:ジョブ実行例情報、サブノードは、現在のジョブ実行例のメインキーである.
leaderノード:ジョブサーバマスタノード情報
serversノード:ジョブサーバ情報、サブノードはジョブサーバのIPアドレス
sharrdingノード:作業スライス情報、サブノードはスライス番号であり、ゼロからスライス総数まで減少する.
各スライスは、実行されるスライスの例などの情報を記憶する.
Elastic-Jobフロー
簡単に分けると、初期化の流れとタスク実行の流れに分けられます.
初期化プロセス
初期化プロセスは主にタスク情報の登録、タスクプロセッサインスタンスの登録、リーダ選挙、スケジューラの起動などである.ピクチャ:http://elasticjob.io/docs/elastic-job-lite/img/principles/job_start.jpg
タスク実行フロー
タスク実行フローは、スレッドを取得し、データをスライスし、スライス情報に基づいてタスクを生成してスレッドプールに入れて実行する.
画像:http://elasticjob.io/docs/elastic-job-lite/img/principles/job_exec.jpg
ここではタスク実行フロー【Simpleタイプのタスク】のみを分析します.
ステップ1
初期化段階では、スケジューラSchdulerに入れられたタスクは、LiteJob.classであり、org.quartz.Jobインターフェースを実現しました.
//JobScheduler.class
private JobDetail createJobDetail(final String jobClass) {
JobDetail result = JobBuilder.newJob(LiteJob.class).withIdentity(liteJobConfig.getJobName()).build();// Job
result.getJobDataMap().put(JOB_FACADE_DATA_MAP_KEY, jobFacade);
Optional<ElasticJob> elasticJobInstance = createElasticJobInstance();
if (elasticJobInstance.isPresent()) {
result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, elasticJobInstance.get());
} else if (!jobClass.equals(ScriptJob.class.getCanonicalName())) {
try {
result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, Class.forName(jobClass).newInstance());
} catch (final ReflectiveOperationException ex) {
throw new JobConfigurationException("Elastic-Job: Job class '%s' can not initialize.", jobClass);
}
}
return result;
}
ステップ2スケジューラSchdulerはLiteJob.classのexecuteメソッドを呼び出します.
public final class LiteJob implements Job {
@Setter
private ElasticJob elasticJob;
@Setter
private JobFacade jobFacade;
@Override
public void execute(final JobExecutionContext context) throws JobExecutionException {
JobExecutorFactory.getJobExecutor(elasticJob, jobFacade).execute();
}
}
ステップ3JobExecutorFactory.getJobExecutor(elasticJob, jobFacade)
において、関連するタスクがスレッド池に対応していない場合、スレッド池となります.//AbstractElasticJobExecutor.class
protected AbstractElasticJobExecutor(final JobFacade jobFacade) {
this.jobFacade = jobFacade;
jobRootConfig = jobFacade.loadJobRootConfiguration(true);
jobName = jobRootConfig.getTypeConfig().getCoreConfig().getJobName();
executorService = ExecutorServiceHandlerRegistry.getExecutorServiceHandler(jobName, (ExecutorServiceHandler) getHandler(JobProperties.JobPropertiesEnum.EXECUTOR_SERVICE_HANDLER));//
jobExceptionHandler = (JobExceptionHandler) getHandler(JobProperties.JobPropertiesEnum.JOB_EXCEPTION_HANDLER);
itemErrorMessages = new ConcurrentHashMap<>(jobRootConfig.getTypeConfig().getCoreConfig().getShardingTotalCount(), 1);
}
デフォルトのスレッドサイズは、使用可能なCPUの2倍です.//ExecutorServiceObject.class
public ExecutorServiceObject(final String namingPattern, final int threadSize) {
workQueue = new LinkedBlockingQueue<>();
threadPoolExecutor = new ThreadPoolExecutor(threadSize, threadSize, 5L, TimeUnit.MINUTES, workQueue,
new BasicThreadFactory.Builder().namingPattern(Joiner.on("-").join(namingPattern, "%s")).build());
threadPoolExecutor.allowCoreThreadTimeOut(true);
}
ステップ4executeメソッドの実行
//AbstractElasticJobExecutor.class
public final void execute() {
...
ShardingContexts shardingContexts = jobFacade.getShardingContexts();// ( ,leader )
...
execute(shardingContexts, JobExecutionEvent.ExecutionSource.NORMAL_TRIGGER);//
...
}
ステップ4.1タスクの割り当てプロセス:
//ShardingService.class
public void shardingIfNecessary() {
List<JobInstance> availableJobInstances = instanceService.getAvailableJobInstances();//
if (!isNeedSharding() || availableJobInstances.isEmpty()) {//
return;
}
if (!leaderService.isLeaderUntilBlock()) {// ( , leader )
blockUntilShardingCompleted();// , , 。 sleep100 ,sleep sleep
return;
}
// leader
waitingOtherJobCompleted();
LiteJobConfiguration liteJobConfig = configService.load(false);// zk
int shardingTotalCount = liteJobConfig.getTypeConfig().getCoreConfig().getShardingTotalCount();
log.debug("Job '{}' sharding begin.", jobName);
jobNodeStorage.fillEphemeralJobNode(ShardingNode.PROCESSING, "");//
resetShardingInfo(shardingTotalCount);// ,
JobShardingStrategy jobShardingStrategy = JobShardingStrategyFactory.getStrategy(liteJobConfig.getJobShardingStrategyClass());// ,
jobNodeStorage.executeInTransaction(new PersistShardingInfoTransactionExecutionCallback(jobShardingStrategy.sharding(availableJobInstances, jobName, shardingTotalCount)));// , zk
log.debug("Job '{}' sharding complete.", jobName);
}
他の非leaderノードは実行します.//ShardingService.class
// leader, , , sleep100
private void blockUntilShardingCompleted() {
while (!leaderService.isLeaderUntilBlock() && (jobNodeStorage.isJobNodeExisted(ShardingNode.NECESSARY) || jobNodeStorage.isJobNodeExisted(ShardingNode.PROCESSING))) {
log.debug("Job '{}' sleep short time until sharding completed.", jobName);
BlockUtils.waitingShortTime();
}
}
均等配分のアルゴリズム//AverageAllocationJobShardingStrategy.class
/**
* :
* 1. / =N, N ,
* 2. % = , ,
* @param jobInstances
* @param jobName
* @param shardingTotalCount
* @return
*/
public Map<JobInstance, List<Integer>> sharding(final List<JobInstance> jobInstances, final String jobName, final int shardingTotalCount) {
if (jobInstances.isEmpty()) {
return Collections.emptyMap();
}
Map<JobInstance, List<Integer>> result = shardingAliquot(jobInstances, shardingTotalCount);//
addAliquant(jobInstances, shardingTotalCount, result);//
return result;
}
事務の中で映画を分けることを完成します. @Override
public void execute(final CuratorTransactionFinal curatorTransactionFinal) throws Exception {
// , zk 【sharding/n/instance:instanceId】
for (Map.Entry<JobInstance, List<Integer>> entry : shardingResults.entrySet()) {
for (int shardingItem : entry.getValue()) {
curatorTransactionFinal.create().forPath(jobNodePath.getFullPath(ShardingNode.getInstanceNode(shardingItem)), entry.getKey().getJobInstanceId().getBytes()).and();
}
}
curatorTransactionFinal.delete().forPath(jobNodePath.getFullPath(ShardingNode.NECESSARY)).and();// /leader/sharding/necessary
curatorTransactionFinal.delete().forPath(jobNodePath.getFullPath(ShardingNode.PROCESSING)).and();// /leader/sharding/processing
}
ステップ4.2現在のジョブプロセッサで処理するスライスを取得します.
//ShardingService.class
public List<Integer> getShardingItems(final String jobInstanceId) {
JobInstance jobInstance = new JobInstance(jobInstanceId);
if (!serverService.isAvailableServer(jobInstance.getIp())) {
return Collections.emptyList();
}
List<Integer> result = new LinkedList<>();
int shardingTotalCount = configService.load(true).getTypeConfig().getCoreConfig().getShardingTotalCount();
for (int i = 0; i < shardingTotalCount; i++) {
if (jobInstance.getJobInstanceId().equals(jobNodeStorage.getJobNodeData(ShardingNode.getInstanceNode(i)))) {// instance ID ,
result.add(i);
}
}
return result;
}
ステップ4.3スライスごとのタスクを実行します.
//AbstractElasticJobExecutor.class
private void process(final ShardingContexts shardingContexts, final JobExecutionEvent.ExecutionSource executionSource) {
Collection<Integer> items = shardingContexts.getShardingItemParameters().keySet();
...
final CountDownLatch latch = new CountDownLatch(items.size());
for (final int each : items) {// ,
final JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(shardingContexts.getTaskId(), jobName, executionSource, each);
if (executorService.isShutdown()) {
return;
}
executorService.submit(new Runnable() {
@Override
public void run() {
try {
process(shardingContexts, each, jobExecutionEvent);//
} finally {
latch.countDown();
}
}
});
}
try {
latch.await();
} catch (final InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
パラメータの組み付け//AbstractElasticJobExecutor.class
private void process(final ShardingContexts shardingContexts, final int item, final JobExecutionEvent startEvent) {
...
try {
process(new ShardingContext(shardingContexts, item));// item, item , item 0, 0=A,1=B,2=C..., A
...
// CHECKSTYLE:OFF
} catch (final Throwable cause) {
// CHECKSTYLE:ON
...
}
}
processこの呼び出しは私達自身が定義したSimpleJobに異動して実現します.public final class SimpleJobExecutor extends AbstractElasticJobExecutor {
private final SimpleJob simpleJob;
public SimpleJobExecutor(final SimpleJob simpleJob, final JobFacade jobFacade) {
super(jobFacade);
this.simpleJob = simpleJob;
}
@Override
protected void process(final ShardingContext shardingContext) {
simpleJob.execute(shardingContext);// execute(shardingContext)
}
}
以上はElastic-Jobの任務実行プロセスの概観分析です.Elastic-Jobは弾性拡大縮小容積、失効転移、監視・維持などをサポートしています.後で分析をしましょう.