HadoopのJobコミットプロセスの概要(一)

4012 ワード

Hadoopは利用者に3種類の宿題を提出する方法を提供し、3種類のこのようなAPIを提供した.なぜ3種類の異なる方法があるのか、Hadoopはその歴史上新旧の2つのAPIと1つの融通のきく方法があるため、この3つの方法はそれぞれ:1、JobClient.runJob():古いAPIであるJobClientクラスによって提供されるメソッドrunJob()を呼び出す.2、Job.waitForCompletion():新しいAPIに属するJobクラスによって提供されるメソッドwaitForCompletion()を呼び出します.3、ToolRunner.run():ToolRunnerクラスによって提供されるrun()メソッドを呼び出します.これはトランスフォームメソッドです.
古いAPIは現在ではあまり使われていないため、古いバージョンを維持するためだけにコードには古いAPIが残っています.したがって、現在主に2番目のAPIを使用しています.Hadoopソースコードには、hadoop-mapreduce-projectのhadoop-mapreduce-examplesのWordCount.の例も示されています.java
WordCountクラスの詳細については、次の文書を参照してください.(https://www.jianshu.com/p/3136a9fa84ed)
WordCountクラスには、TokenizerMapperとIntSumReducerの2つのクラスが定義されています.前者はMapperクラスのmapメソッドを上書きし,後者はreducerクラスのreduceメソッドを上書きした.コミットされたコードにmapメソッドとreduceメソッドが上書きされていない場合、hadoopは自動的に自分のデフォルトのmap reduceメソッドを使用します.
具体的なmap reduceの実現過程を省略する
次のWordCountのmainメソッドのコード:
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length < 2) {
      System.err.println("Usage: wordcount  [...] ");
      System.exit(2);
    }
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    for (int i = 0; i < otherArgs.length - 1; ++i) {
      FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
    }
    FileOutputFormat.setOutputPath(job,
      new Path(otherArgs[otherArgs.length - 1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }

プログラムはジョブのコミットのためにwaitForCompletionメソッドを呼び出すJobオブジェクトを新規に作成し、メソッドの戻りtrueが終わるまで呼び出すことがわかります.System.exit(job.waitForCompletion(true) ? 0 : 1);は、trueを返すと、プログラムは0、Systemを返すことを意味する.exit.falseを返すと、プログラムは1を返し、プログラムの実行に失敗したことがわかります.
Hadoopのmapreduceのjobがどのように提出されたかは大体分かりましたが、Jobクラスの具体的な実装を見てみましょう.
public class Job extends JobContextImpl implements JobContext{}

Jobクラスには多くの静的属性が定義されています.たとえば、public static enum JobState {DEFINE, RUNNING}は、jobが準備フェーズと実行フェーズにあることを示す列挙属性JobStateを定義しています.DEFINEとRUNNINGは、jobが準備フェーズと実行フェーズにあることを示しています.private static final long MAX_JOBSTATUS_AGE = 1000 * 2 MAX_JOBSTATUS_AGEは2000に設定され、最大2000ミリ秒でこのジョブのステータスをリフレッシュすることを示します.public static final String OUTPUT_FILTER = "org.apache.hadoop.mapreduce.client.output.filter" OUTPUT_FILTERは、mapreduceの出力にフィルタリングが必要かどうかを示し、必要であればorg.apache.hadoop.mapreduce.client.output.filterをキー名としてXMLプロファイルにクエリーします.
その中で私たちが最も関心を持っているのはwaitForCompletionの方法です.
  public boolean waitForCompletion(boolean verbose
                                   ) throws IOException, InterruptedException,
                                            ClassNotFoundException {
    if (state == JobState.DEFINE) {//  Job   ,     Running
      submit();//  Job.submit()    
    }
    if (verbose) {//        ,    
      monitorAndPrintJob();//          
    } else {
      // get the completion poll interval from the client.
      int completionPollIntervalMillis = //             
        Job.getCompletionPollInterval(cluster.getConf());
      while (!isComplete()) {//        
        try {
          Thread.sleep(completionPollIntervalMillis);//   ,      
        } catch (InterruptedException ie) {//
        }
      }
    }
    return isSuccessful();
  }

このコードは実際にはJob.submitの呼び出しは、呼び出す前に、ジョブが複数回コミットされないことを確認するために、このジョブがDEFINE状態にあるかどうかを確認します.ジョブのコミットに成功したらRUNNINGに変更します.
通常の場合、Job.submit()はすぐに戻ります.この方法の役割は宿題を提出するだけで、宿題の実行と完成を待つ必要はありませんが、Job.waitForCompletion()は、プログラムが完了するまで戻りません.待機中、パラメータverboseがtrueの場合、サイクルのレポートジョブ実行の進捗、またはサイクルの検出ジョブが完了したかどうかを確認します.
そして、Job.sumbit()以降の2段目の操作です.