Elastic-Jobタスク実行プロセスのソースコード分析

63249 ワード

基礎
必要な基礎:
1.zookeeper入門:https://zookeeper.apache.org/doc/r3.6.0/zookeeperStarted.html
2.カラット入門:https://curator.apache.org/getting-started.html
3.Quartz入門:http://www.quartz-scheduler.org/documentation/quartz-2.3.0/quick-start.html
4.Elastic-Job入門(中国語):http://elasticjob.io/docs/elastic-job-lite/00-overview/
5.Elastic-Jobはgavaとlombookを使っていますが、知らなくても読書に影響はありません.
6.できるだけ視覚化ツールを設置してください.https://github.com/apache/zookeeper/tree/master/zookeeper-contrib/zookeeper-contrib-zooinspector)0あるいはzkui(https://github.com/DeemOpen/zkui)【zooispector私のコンピュータがjarファイルをコンパイルした後、swingのグラフィックインターフェースはずっと問題があると表示されています.各バージョンを試したことがありますが、jarカバンはmaven中央倉庫で提供されています.https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper-contrib-zooinspector)
モモ
デモ:https://github.com/creasylai19/zookeeperdemo/tree/elastic_jobデモ
デモは二つのクラスしかありません.
MainClass
public class MainClass {

    private static final Logger logger = Logger.getLogger(MainClass.class);

    public static void main(String[] args) {
        new JobScheduler(createRegistryCenter(), createJobConfiguration()).init();
    }

    private static CoordinatorRegistryCenter createRegistryCenter() {
        CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration("127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183", "elastic-job-demo"));
        regCenter.init();
        return regCenter;
    }

    private static LiteJobConfiguration createJobConfiguration() {
        //         ,    10 ,                
        JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration
                .newBuilder("demoSimpleJob", "0/15 * * * * ?", 10)
                .shardingItemParameters("0=A,1=B,2=C,3=D,4=E,5=F,6=G,7=H,8=I,9=J")
                .build();
        //   SIMPLE    
        SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig, MyElasticJob.class.getCanonicalName());
        //   Lite     
        LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).build();
        return simpleJobRootConfig;
    }

}
タスクMyElasticJob
public class MyElasticJob implements SimpleJob {

    private static final Logger logger = Logger.getLogger(MyElasticJob.class);

    @Override
    public void execute(ShardingContext context) {
        logger.debug(ManagementFactory.getRuntimeMXBean().getName()+":"+context.getShardingItem()+":"+context.getJobName()+":"+context.getJobParameter()+":"+context.getShardingParameter());
        switch (context.getShardingItem()) {
            case 0:
                break;
            case 1:
                break;
            case 2:
                break;
            default:
        }
    }
}
実行結果(2つのインスタンスがオープンし、各インスタンスはそれぞれ5つのスライスを実行し、デフォルトの割り当てポリシーは平均的に割り当てられます.)
例1:Elastic-Job任务执行流程源码分析_第1张图片例2:Elastic-Job任务执行流程源码分析_第2张图片
zookeeperにおけるノード情報:Elastic-Job任务执行流程源码分析_第3张图片
instanceノード:ジョブ実行例情報、サブノードは、現在のジョブ実行例のメインキーElastic-Job任务执行流程源码分析_第4张图片である.
leaderノード:ジョブサーバマスタノード情報Elastic-Job任务执行流程源码分析_第5张图片
serversノード:ジョブサーバ情報、サブノードはジョブサーバのIPアドレスElastic-Job任务执行流程源码分析_第6张图片
sharrdingノード:作業スライス情報、サブノードはスライス番号であり、ゼロからスライス総数Elastic-Job任务执行流程源码分析_第7张图片まで減少する.
各スライスは、実行されるスライスの例などの情報在这里插入图片描述を記憶する.
Elastic-Jobフロー
簡単に分けると、初期化の流れとタスク実行の流れに分けられます.
初期化プロセス
初期化プロセスは主にタスク情報の登録、タスクプロセッサインスタンスの登録、リーダ選挙、スケジューラの起動などである.Elastic-Job任务执行流程源码分析_第8张图片ピクチャ:http://elasticjob.io/docs/elastic-job-lite/img/principles/job_start.jpg
タスク実行フロー
タスク実行フローは、スレッドを取得し、データをスライスし、スライス情報に基づいてタスクを生成してスレッドプールに入れて実行する.Elastic-Job任务执行流程源码分析_第9张图片
画像:http://elasticjob.io/docs/elastic-job-lite/img/principles/job_exec.jpg
ここではタスク実行フロー【Simpleタイプのタスク】のみを分析します.
ステップ1
初期化段階では、スケジューラSchdulerに入れられたタスクは、LiteJob.classであり、org.quartz.Jobインターフェースを実現しました.
//JobScheduler.class
	private JobDetail createJobDetail(final String jobClass) {
        JobDetail result = JobBuilder.newJob(LiteJob.class).withIdentity(liteJobConfig.getJobName()).build();//       Job
        result.getJobDataMap().put(JOB_FACADE_DATA_MAP_KEY, jobFacade);
        Optional<ElasticJob> elasticJobInstance = createElasticJobInstance();
        if (elasticJobInstance.isPresent()) {
            result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, elasticJobInstance.get());
        } else if (!jobClass.equals(ScriptJob.class.getCanonicalName())) {
            try {
                result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, Class.forName(jobClass).newInstance());
            } catch (final ReflectiveOperationException ex) {
                throw new JobConfigurationException("Elastic-Job: Job class '%s' can not initialize.", jobClass);
            }
        }
        return result;
    }
ステップ2
スケジューラSchdulerはLiteJob.classのexecuteメソッドを呼び出します.
public final class LiteJob implements Job {
    
    @Setter
    private ElasticJob elasticJob;
    
    @Setter
    private JobFacade jobFacade;
    
    @Override
    public void execute(final JobExecutionContext context) throws JobExecutionException {
        JobExecutorFactory.getJobExecutor(elasticJob, jobFacade).execute();
    }
}
ステップ3JobExecutorFactory.getJobExecutor(elasticJob, jobFacade)において、関連するタスクがスレッド池に対応していない場合、スレッド池となります.
//AbstractElasticJobExecutor.class    
		protected AbstractElasticJobExecutor(final JobFacade jobFacade) {
        this.jobFacade = jobFacade;
        jobRootConfig = jobFacade.loadJobRootConfiguration(true);
        jobName = jobRootConfig.getTypeConfig().getCoreConfig().getJobName();
        executorService = ExecutorServiceHandlerRegistry.getExecutorServiceHandler(jobName, (ExecutorServiceHandler) getHandler(JobProperties.JobPropertiesEnum.EXECUTOR_SERVICE_HANDLER));//        
        jobExceptionHandler = (JobExceptionHandler) getHandler(JobProperties.JobPropertiesEnum.JOB_EXCEPTION_HANDLER);
        itemErrorMessages = new ConcurrentHashMap<>(jobRootConfig.getTypeConfig().getCoreConfig().getShardingTotalCount(), 1);
    }
デフォルトのスレッドサイズは、使用可能なCPUの2倍です.
//ExecutorServiceObject.class    
		public ExecutorServiceObject(final String namingPattern, final int threadSize) {
        workQueue = new LinkedBlockingQueue<>();
        threadPoolExecutor = new ThreadPoolExecutor(threadSize, threadSize, 5L, TimeUnit.MINUTES, workQueue, 
                new BasicThreadFactory.Builder().namingPattern(Joiner.on("-").join(namingPattern, "%s")).build());
        threadPoolExecutor.allowCoreThreadTimeOut(true);
    }
ステップ4
executeメソッドの実行
//AbstractElasticJobExecutor.class    
		public final void execute() {
        ...
        ShardingContexts shardingContexts = jobFacade.getShardingContexts();//               (       ,leader          )
        ...
        execute(shardingContexts, JobExecutionEvent.ExecutionSource.NORMAL_TRIGGER);//        
        ...
    }
ステップ4.1
タスクの割り当てプロセス:
//ShardingService.class
public void shardingIfNecessary() {
        List<JobInstance> availableJobInstances = instanceService.getAvailableJobInstances();//       
        if (!isNeedSharding() || availableJobInstances.isEmpty()) {//                     
            return;
        }
        if (!leaderService.isLeaderUntilBlock()) {//       (       ,   leader  )
            blockUntilShardingCompleted();//    ,           ,     。  sleep100  ,sleep             sleep
            return;
        }
  //    leader     
        waitingOtherJobCompleted();
        LiteJobConfiguration liteJobConfig = configService.load(false);// zk       
        int shardingTotalCount = liteJobConfig.getTypeConfig().getCoreConfig().getShardingTotalCount();
        log.debug("Job '{}' sharding begin.", jobName);
        jobNodeStorage.fillEphemeralJobNode(ShardingNode.PROCESSING, "");//        
        resetShardingInfo(shardingTotalCount);//      ,                  
        JobShardingStrategy jobShardingStrategy = JobShardingStrategyFactory.getStrategy(liteJobConfig.getJobShardingStrategyClass());//        ,       
        jobNodeStorage.executeInTransaction(new PersistShardingInfoTransactionExecutionCallback(jobShardingStrategy.sharding(availableJobInstances, jobName, shardingTotalCount)));//          ,       zk 
        log.debug("Job '{}' sharding complete.", jobName);
    }
他の非leaderノードは実行します.
//ShardingService.class
//    leader,       ,     ,  sleep100  
private void blockUntilShardingCompleted() {
        while (!leaderService.isLeaderUntilBlock() && (jobNodeStorage.isJobNodeExisted(ShardingNode.NECESSARY) || jobNodeStorage.isJobNodeExisted(ShardingNode.PROCESSING))) {
            log.debug("Job '{}' sleep short time until sharding completed.", jobName);
            BlockUtils.waitingShortTime();
        }
    }
均等配分のアルゴリズム
//AverageAllocationJobShardingStrategy.class
    /**
     *    :
     *			1.    /       =N,        N   ,
     *			2.    %       =      ,             ,    
     * @param jobInstances         
     * @param jobName     
     * @param shardingTotalCount     
     * @return
     */
    public Map<JobInstance, List<Integer>> sharding(final List<JobInstance> jobInstances, final String jobName, final int shardingTotalCount) {
        if (jobInstances.isEmpty()) {
            return Collections.emptyMap();
        }
        Map<JobInstance, List<Integer>> result = shardingAliquot(jobInstances, shardingTotalCount);//     
        addAliquant(jobInstances, shardingTotalCount, result);//                  
        return result;
    }
事務の中で映画を分けることを完成します.
        @Override
        public void execute(final CuratorTransactionFinal curatorTransactionFinal) throws Exception {
          //           , zk       【sharding/n/instance:instanceId】
            for (Map.Entry<JobInstance, List<Integer>> entry : shardingResults.entrySet()) {
                for (int shardingItem : entry.getValue()) {
                    curatorTransactionFinal.create().forPath(jobNodePath.getFullPath(ShardingNode.getInstanceNode(shardingItem)), entry.getKey().getJobInstanceId().getBytes()).and();
                }
            }
            curatorTransactionFinal.delete().forPath(jobNodePath.getFullPath(ShardingNode.NECESSARY)).and();//  /leader/sharding/necessary  
            curatorTransactionFinal.delete().forPath(jobNodePath.getFullPath(ShardingNode.PROCESSING)).and();//  /leader/sharding/processing  
        }
ステップ4.2
現在のジョブプロセッサで処理するスライスを取得します.
//ShardingService.class
public List<Integer> getShardingItems(final String jobInstanceId) {
        JobInstance jobInstance = new JobInstance(jobInstanceId);
        if (!serverService.isAvailableServer(jobInstance.getIp())) {
            return Collections.emptyList();
        }
        List<Integer> result = new LinkedList<>();
        int shardingTotalCount = configService.load(true).getTypeConfig().getCoreConfig().getShardingTotalCount();
        for (int i = 0; i < shardingTotalCount; i++) {
            if (jobInstance.getJobInstanceId().equals(jobNodeStorage.getJobNodeData(ShardingNode.getInstanceNode(i)))) {//     instance               ID  ,                 
                result.add(i);
            }
        }
        return result;
    }
ステップ4.3
スライスごとのタスクを実行します.
//AbstractElasticJobExecutor.class
		private void process(final ShardingContexts shardingContexts, final JobExecutionEvent.ExecutionSource executionSource) {
        Collection<Integer> items = shardingContexts.getShardingItemParameters().keySet();
        ...
        final CountDownLatch latch = new CountDownLatch(items.size());
        for (final int each : items) {//             ,          
            final JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(shardingContexts.getTaskId(), jobName, executionSource, each);
            if (executorService.isShutdown()) {
                return;
            }
            executorService.submit(new Runnable() {
                
                @Override
                public void run() {
                    try {
                        process(shardingContexts, each, jobExecutionEvent);//      
                    } finally {
                        latch.countDown();
                    }
                }
            });
        }
        try {
            latch.await();
        } catch (final InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }
パラメータの組み付け
//AbstractElasticJobExecutor.class
		private void process(final ShardingContexts shardingContexts, final int item, final JobExecutionEvent startEvent) {
        ...
        try {
            process(new ShardingContext(shardingContexts, item));//     item,  item       , item 0,      0=A,1=B,2=C...,      A    
            ...
            // CHECKSTYLE:OFF
        } catch (final Throwable cause) {
            // CHECKSTYLE:ON
            ...
        }
    }
processこの呼び出しは私達自身が定義したSimpleJobに異動して実現します.
public final class SimpleJobExecutor extends AbstractElasticJobExecutor {
    
    private final SimpleJob simpleJob;
    
    public SimpleJobExecutor(final SimpleJob simpleJob, final JobFacade jobFacade) {
        super(jobFacade);
        this.simpleJob = simpleJob;
    }
    
    @Override
    protected void process(final ShardingContext shardingContext) {
        simpleJob.execute(shardingContext);//   execute(shardingContext)    
    }
}
以上はElastic-Jobの任務実行プロセスの概観分析です.Elastic-Jobは弾性拡大縮小容積、失効転移、監視・維持などをサポートしています.後で分析をしましょう.