Hadoop作成WordCount
6292 ワード
本文は本人ブログに発表された.
前回Hadoopの環境構築、HDFS操作についてお話ししましたが、今日は続きます.もともとHadoopソースコードにはWordCountの例がありますが、今日はこのMapper、Reducerに対する理解を深めるために自分で実現します.
まず、カスタムMapperおよびReducerについて、mapおよびreduce関数を上書きし、入力ファイルディレクトリの設定、ファイルフォーマットクラスの入力、カスタムMapperの設定、パーティション化、ソート、グループ化、契約の設定、カスタムReducerの設定など、関連する手順に従います.ここでは、入力ファイルをスペース分割(タブでも使用できます)し、カスタムMapperクラスMyMapperを次に示します.
ここで私が選んだのは新しいAPIで、関連ライブラリは基本的にorgです.apache.hadoop.mapreduceの下で、古いAPIはorg.apache.hadoop.mapredでは、リファレンスライブラリも含まれています.カスタムMyMapperは汎用継承Mapperであり、パラメータkeyvalueはHadoop内部タイプであり、javaの基本タイプをサポートしていない.ここではjavaの基本タイプを選択しない理由に注意しなければならない.なぜなら、他の追加の操作が必要ではなく、それ自体がシーケンス化された逆シーケンス化とパフォーマンスの向上が必要であるため、hadoopを追加したタイプはjavaの基本タイプを放棄しているからだ.hadoop keyvalueとjava基本タイプの相互変換の問題についても簡単ですが、java基本タイプからhadoopのkeyvalueに変換すれば直接newでパラメータ付きでいいので、hadoopのkeyvalueタイプからjavaの基本タイプに変換するにはgetメソッドを使えばいいです!次のようになります.
次に、カスタムReducerクラスMyReduceを見てみましょう.
これは上記と似ていますが、mainメソッドの実行方法を見てみましょう.
実行時に前回実行した出力ディレクトリが出力されます.次の手順に従います.
もちろん、出力ファイルフォーマットクラスなど、省略できるものもあります.この例から、カスタムMapperやカスタムReducerを設定できる以上、カスタム入力ファイルフォーマットクラスやパーティション、ソート、グループ化、規約などを設定できるはずですが、これ以降は関連するメモがあります.ここでは簡単な例を書くだけです.次のようにファイルを作成してアップロードします.hdfs://hadoop-master:9000/mapreduce/input/test.txt:
次にmain関数を実行すると、hdfs://hadoop-master:9000/mapreduce/output/ディレクトリの下にpart-*のようなファイルが出力されます.次のコマンドで表示できます.
次のように出力されます.
今、ファイルが出力されているのも比較的正しいですが、頭がまだ真っ白で、どのようにしているのか分かりません.それではmapreduceの原理についてです.次に、コードをJobTrackerに提出することから、指定された入力ファイルパスからファイルを取得します.ここでは、複数のファイルと2級ディレクトリの下の複数のファイルをサポートしています.ここで取得するのはHDFS apiを使って操作します!全てのファイルを読み出して指定の大きさでスプリットInputSplitを行い、スプリットしたキー値FileSplit(<0,"luoliang me"),<13,"asura asura.com luoliang")をRecordReader(<"luoliang",1>,<"luoliang",1>)に変換すると、すべての変換が完了するとmap関数が呼び出され、map関数はデータをMapperに書き込む.Contextでは、データをパーティション化してパケット規則を並べ替え、最後にshuffleを介してreduce側に到達し、各mapの出力数はreduceの入力数に等しい.reduceエンドに到達したデータは既に質的に変化しており、<「luoliang」ではなく、1>のように<「luoliang」,{1,1}>のようなキー値データとなり、合計数を反復して取得し、書き込みcontextで計算した後、指定されたディレクトリに出力する必要がある.ここでは重複する単語があるためmap関数の呼び出し回数とreduce関数の呼び出し回数は異なる.これはカスタマイズreduceですが、これは必須ではありません.類似平均数の統計の問題であれば、データはmap側で規制されています.転送時間や処理時間が減少して性能が向上しますが、最終結果に影響を及ぼす可能性がありますので、この規制は具体的な状況を見て使用します.このshuffleについては、まだあまり理解していないので、もっと見てみましょう.
今回はとりあえずここまで.記録を続けて少しずつ!
前回Hadoopの環境構築、HDFS操作についてお話ししましたが、今日は続きます.もともとHadoopソースコードにはWordCountの例がありますが、今日はこのMapper、Reducerに対する理解を深めるために自分で実現します.
まず、カスタムMapperおよびReducerについて、mapおよびreduce関数を上書きし、入力ファイルディレクトリの設定、ファイルフォーマットクラスの入力、カスタムMapperの設定、パーティション化、ソート、グループ化、契約の設定、カスタムReducerの設定など、関連する手順に従います.ここでは、入力ファイルをスペース分割(タブでも使用できます)し、カスタムMapperクラスMyMapperを次に示します.
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
public class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
@Override
protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException {
String[] splied = value.toString().split(" ");
for (int i = 0; i < splied.length; i++) {
String lineWord = splied[i];
context.write(new Text(lineWord), new LongWritable(1));
}
}
}
ここで私が選んだのは新しいAPIで、関連ライブラリは基本的にorgです.apache.hadoop.mapreduceの下で、古いAPIはorg.apache.hadoop.mapredでは、リファレンスライブラリも含まれています.カスタムMyMapperは汎用継承Mapperであり、パラメータkeyvalueはHadoop内部タイプであり、javaの基本タイプをサポートしていない.ここではjavaの基本タイプを選択しない理由に注意しなければならない.なぜなら、他の追加の操作が必要ではなく、それ自体がシーケンス化された逆シーケンス化とパフォーマンスの向上が必要であるため、hadoopを追加したタイプはjavaの基本タイプを放棄しているからだ.hadoop keyvalueとjava基本タイプの相互変換の問題についても簡単ですが、java基本タイプからhadoopのkeyvalueに変換すれば直接newでパラメータ付きでいいので、hadoopのkeyvalueタイプからjavaの基本タイプに変換するにはgetメソッドを使えばいいです!次のようになります.
LongWritable lw = new LongWritable(1L);
long temp = lw.get();
次に、カスタムReducerクラスMyReduceを見てみましょう.
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
public class MyReduce extends Reducer<Text, LongWritable, Text, LongWritable> {
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
long count = 0L;
for(LongWritable value: values) {
count += value.get();
}
context.write(key, new LongWritable(count));
}
}
これは上記と似ていますが、mainメソッドの実行方法を見てみましょう.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
import com.sun.org.apache.xpath.internal.axes.HasPositionalPredChecker;
public class Test {
static final String OUTPUT_DIR = "hdfs://hadoop-master:9000/mapreduce/output/";
static final String INPUT_DIR = "hdfs://hadoop-master:9000/mapreduce/input/test.txt";
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = new Job(conf, Test.class.getSimpleName());
deleteOutputFile(OUTPUT_DIR);
//1
FileInputFormat.setInputPaths(job, INPUT_DIR);
//2
job.setInputFormatClass(TextInputFormat.class);
//3 Mapper
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
//4
job.setPartitionerClass(HashPartitioner.class);
job.setNumReduceTasks(1);
//5
//6 Reduce
job.setReducerClass(MyReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
//7
FileOutputFormat.setOutputPath(job, new Path(OUTPUT_DIR));
//8 job
job.waitForCompletion(true);
}
static void deleteOutputFile(String path) throws Exception{
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(new URI(INPUT_DIR),conf);
if(fs.exists(new Path(path))){
fs.delete(new Path(path));
}
}
}
実行時に前回実行した出力ディレクトリが出力されます.次の手順に従います.
1. ;
2. ;
3. Mapper ;
4. ;
5. ;
6. ;
7. ;
8. Reducer ;
9. ;
10. JobTracker。
もちろん、出力ファイルフォーマットクラスなど、省略できるものもあります.この例から、カスタムMapperやカスタムReducerを設定できる以上、カスタム入力ファイルフォーマットクラスやパーティション、ソート、グループ化、規約などを設定できるはずですが、これ以降は関連するメモがあります.ここでは簡単な例を書くだけです.次のようにファイルを作成してアップロードします.hdfs://hadoop-master:9000/mapreduce/input/test.txt:
luoliang me
asura asura.com luoliang
me
次にmain関数を実行すると、hdfs://hadoop-master:9000/mapreduce/output/ディレクトリの下にpart-*のようなファイルが出力されます.次のコマンドで表示できます.
hadoop fs -text /output/part-*
次のように出力されます.
asura 1
asura.com 1
luoliang 2
me 2
今、ファイルが出力されているのも比較的正しいですが、頭がまだ真っ白で、どのようにしているのか分かりません.それではmapreduceの原理についてです.次に、コードをJobTrackerに提出することから、指定された入力ファイルパスからファイルを取得します.ここでは、複数のファイルと2級ディレクトリの下の複数のファイルをサポートしています.ここで取得するのはHDFS apiを使って操作します!全てのファイルを読み出して指定の大きさでスプリットInputSplitを行い、スプリットしたキー値FileSplit(<0,"luoliang me"),<13,"asura asura.com luoliang")をRecordReader(<"luoliang",1>,<"luoliang",1>)に変換すると、すべての変換が完了するとmap関数が呼び出され、map関数はデータをMapperに書き込む.Contextでは、データをパーティション化してパケット規則を並べ替え、最後にshuffleを介してreduce側に到達し、各mapの出力数はreduceの入力数に等しい.reduceエンドに到達したデータは既に質的に変化しており、<「luoliang」ではなく、1>のように<「luoliang」,{1,1}>のようなキー値データとなり、合計数を反復して取得し、書き込みcontextで計算した後、指定されたディレクトリに出力する必要がある.ここでは重複する単語があるためmap関数の呼び出し回数とreduce関数の呼び出し回数は異なる.これはカスタマイズreduceですが、これは必須ではありません.類似平均数の統計の問題であれば、データはmap側で規制されています.転送時間や処理時間が減少して性能が向上しますが、最終結果に影響を及ぼす可能性がありますので、この規制は具体的な状況を見て使用します.このshuffleについては、まだあまり理解していないので、もっと見てみましょう.
今回はとりあえずここまで.記録を続けて少しずつ!