HadoopのJobコミットプロセスの概要(一)
4012 ワード
Hadoopは利用者に3種類の宿題を提出する方法を提供し、3種類のこのようなAPIを提供した.なぜ3種類の異なる方法があるのか、Hadoopはその歴史上新旧の2つのAPIと1つの融通のきく方法があるため、この3つの方法はそれぞれ:1、JobClient.runJob():古いAPIであるJobClientクラスによって提供されるメソッドrunJob()を呼び出す.2、Job.waitForCompletion():新しいAPIに属するJobクラスによって提供されるメソッドwaitForCompletion()を呼び出します.3、ToolRunner.run():ToolRunnerクラスによって提供されるrun()メソッドを呼び出します.これはトランスフォームメソッドです.
古いAPIは現在ではあまり使われていないため、古いバージョンを維持するためだけにコードには古いAPIが残っています.したがって、現在主に2番目のAPIを使用しています.Hadoopソースコードには、hadoop-mapreduce-projectのhadoop-mapreduce-examplesのWordCount.の例も示されています.java
WordCountクラスの詳細については、次の文書を参照してください.(https://www.jianshu.com/p/3136a9fa84ed)
WordCountクラスには、TokenizerMapperとIntSumReducerの2つのクラスが定義されています.前者はMapperクラスのmapメソッドを上書きし,後者はreducerクラスのreduceメソッドを上書きした.コミットされたコードにmapメソッドとreduceメソッドが上書きされていない場合、hadoopは自動的に自分のデフォルトのmap reduceメソッドを使用します.
具体的なmap reduceの実現過程を省略する
次のWordCountのmainメソッドのコード:
プログラムはジョブのコミットのためにwaitForCompletionメソッドを呼び出すJobオブジェクトを新規に作成し、メソッドの戻りtrueが終わるまで呼び出すことがわかります.
Hadoopのmapreduceのjobがどのように提出されたかは大体分かりましたが、Jobクラスの具体的な実装を見てみましょう.
Jobクラスには多くの静的属性が定義されています.たとえば、
その中で私たちが最も関心を持っているのはwaitForCompletionの方法です.
このコードは実際にはJob.submitの呼び出しは、呼び出す前に、ジョブが複数回コミットされないことを確認するために、このジョブがDEFINE状態にあるかどうかを確認します.ジョブのコミットに成功したらRUNNINGに変更します.
通常の場合、Job.submit()はすぐに戻ります.この方法の役割は宿題を提出するだけで、宿題の実行と完成を待つ必要はありませんが、Job.waitForCompletion()は、プログラムが完了するまで戻りません.待機中、パラメータverboseがtrueの場合、サイクルのレポートジョブ実行の進捗、またはサイクルの検出ジョブが完了したかどうかを確認します.
そして、Job.sumbit()以降の2段目の操作です.
古いAPIは現在ではあまり使われていないため、古いバージョンを維持するためだけにコードには古いAPIが残っています.したがって、現在主に2番目のAPIを使用しています.Hadoopソースコードには、hadoop-mapreduce-projectのhadoop-mapreduce-examplesのWordCount.の例も示されています.java
WordCountクラスの詳細については、次の文書を参照してください.(https://www.jianshu.com/p/3136a9fa84ed)
WordCountクラスには、TokenizerMapperとIntSumReducerの2つのクラスが定義されています.前者はMapperクラスのmapメソッドを上書きし,後者はreducerクラスのreduceメソッドを上書きした.コミットされたコードにmapメソッドとreduceメソッドが上書きされていない場合、hadoopは自動的に自分のデフォルトのmap reduceメソッドを使用します.
具体的なmap reduceの実現過程を省略する
次のWordCountのmainメソッドのコード:
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length < 2) {
System.err.println("Usage: wordcount [...] ");
System.exit(2);
}
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
for (int i = 0; i < otherArgs.length - 1; ++i) {
FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
}
FileOutputFormat.setOutputPath(job,
new Path(otherArgs[otherArgs.length - 1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
プログラムはジョブのコミットのためにwaitForCompletionメソッドを呼び出すJobオブジェクトを新規に作成し、メソッドの戻りtrueが終わるまで呼び出すことがわかります.
System.exit(job.waitForCompletion(true) ? 0 : 1);
は、trueを返すと、プログラムは0、Systemを返すことを意味する.exit.falseを返すと、プログラムは1を返し、プログラムの実行に失敗したことがわかります.Hadoopのmapreduceのjobがどのように提出されたかは大体分かりましたが、Jobクラスの具体的な実装を見てみましょう.
public class Job extends JobContextImpl implements JobContext{}
Jobクラスには多くの静的属性が定義されています.たとえば、
public static enum JobState {DEFINE, RUNNING}
は、jobが準備フェーズと実行フェーズにあることを示す列挙属性JobStateを定義しています.DEFINEとRUNNINGは、jobが準備フェーズと実行フェーズにあることを示しています.private static final long MAX_JOBSTATUS_AGE = 1000 * 2
MAX_JOBSTATUS_AGEは2000に設定され、最大2000ミリ秒でこのジョブのステータスをリフレッシュすることを示します.public static final String OUTPUT_FILTER = "org.apache.hadoop.mapreduce.client.output.filter"
OUTPUT_FILTERは、mapreduceの出力にフィルタリングが必要かどうかを示し、必要であればorg.apache.hadoop.mapreduce.client.output.filter
をキー名としてXMLプロファイルにクエリーします.その中で私たちが最も関心を持っているのはwaitForCompletionの方法です.
public boolean waitForCompletion(boolean verbose
) throws IOException, InterruptedException,
ClassNotFoundException {
if (state == JobState.DEFINE) {// Job , Running
submit();// Job.submit()
}
if (verbose) {// ,
monitorAndPrintJob();//
} else {
// get the completion poll interval from the client.
int completionPollIntervalMillis = //
Job.getCompletionPollInterval(cluster.getConf());
while (!isComplete()) {//
try {
Thread.sleep(completionPollIntervalMillis);// ,
} catch (InterruptedException ie) {//
}
}
}
return isSuccessful();
}
このコードは実際にはJob.submitの呼び出しは、呼び出す前に、ジョブが複数回コミットされないことを確認するために、このジョブがDEFINE状態にあるかどうかを確認します.ジョブのコミットに成功したらRUNNINGに変更します.
通常の場合、Job.submit()はすぐに戻ります.この方法の役割は宿題を提出するだけで、宿題の実行と完成を待つ必要はありませんが、Job.waitForCompletion()は、プログラムが完了するまで戻りません.待機中、パラメータverboseがtrueの場合、サイクルのレポートジョブ実行の進捗、またはサイクルの検出ジョブが完了したかどうかを確認します.
そして、Job.sumbit()以降の2段目の操作です.