Hadoop作成WordCount

6292 ワード

本文は本人ブログに発表された.
前回Hadoopの環境構築、HDFS操作についてお話ししましたが、今日は続きます.もともとHadoopソースコードにはWordCountの例がありますが、今日はこのMapper、Reducerに対する理解を深めるために自分で実現します.
まず、カスタムMapperおよびReducerについて、mapおよびreduce関数を上書きし、入力ファイルディレクトリの設定、ファイルフォーマットクラスの入力、カスタムMapperの設定、パーティション化、ソート、グループ化、契約の設定、カスタムReducerの設定など、関連する手順に従います.ここでは、入力ファイルをスペース分割(タブでも使用できます)し、カスタムMapperクラスMyMapperを次に示します.
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Mapper.Context;



public class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

    

    @Override

    protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException {

        String[] splied = value.toString().split(" ");

        for (int i = 0; i < splied.length; i++) {

            String lineWord = splied[i];

            context.write(new Text(lineWord), new LongWritable(1));

        }

    }

}

ここで私が選んだのは新しいAPIで、関連ライブラリは基本的にorgです.apache.hadoop.mapreduceの下で、古いAPIはorg.apache.hadoop.mapredでは、リファレンスライブラリも含まれています.カスタムMyMapperは汎用継承Mapperであり、パラメータkeyvalueはHadoop内部タイプであり、javaの基本タイプをサポートしていない.ここではjavaの基本タイプを選択しない理由に注意しなければならない.なぜなら、他の追加の操作が必要ではなく、それ自体がシーケンス化された逆シーケンス化とパフォーマンスの向上が必要であるため、hadoopを追加したタイプはjavaの基本タイプを放棄しているからだ.hadoop keyvalueとjava基本タイプの相互変換の問題についても簡単ですが、java基本タイプからhadoopのkeyvalueに変換すれば直接newでパラメータ付きでいいので、hadoopのkeyvalueタイプからjavaの基本タイプに変換するにはgetメソッドを使えばいいです!次のようになります.
LongWritable lw = new LongWritable(1L);

long temp = lw.get();

次に、カスタムReducerクラスMyReduceを見てみましょう.
import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.Reducer.Context;



public class MyReduce extends Reducer<Text, LongWritable, Text, LongWritable> {



    @Override

    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {

        long count = 0L;

        for(LongWritable value: values) {

            count += value.get();

        }

        context.write(key, new LongWritable(count));

    }

}

これは上記と似ていますが、mainメソッドの実行方法を見てみましょう.
    

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.InputFormat;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;



import com.sun.org.apache.xpath.internal.axes.HasPositionalPredChecker;



public class Test {

    static final String OUTPUT_DIR = "hdfs://hadoop-master:9000/mapreduce/output/";

    static final String INPUT_DIR = "hdfs://hadoop-master:9000/mapreduce/input/test.txt";

    

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();

        Job job = new Job(conf, Test.class.getSimpleName());        

        deleteOutputFile(OUTPUT_DIR);

        

        //1 

        FileInputFormat.setInputPaths(job, INPUT_DIR);

        //2 

        job.setInputFormatClass(TextInputFormat.class);

        //3 Mapper 

        job.setMapperClass(MyMapper.class);

        job.setMapOutputKeyClass(Text.class);

        job.setMapOutputValueClass(LongWritable.class);

        //4 

        job.setPartitionerClass(HashPartitioner.class);

        job.setNumReduceTasks(1);

        //5 

        //6 Reduce 

        job.setReducerClass(MyReduce.class);

        job.setOutputKeyClass(Text.class);

        job.setOutputValueClass(LongWritable.class);

        //7 

        FileOutputFormat.setOutputPath(job, new Path(OUTPUT_DIR));

        //8 job

        job.waitForCompletion(true);

    }

    

    static void deleteOutputFile(String path) throws Exception{

        Configuration conf = new Configuration();

        FileSystem fs = FileSystem.get(new URI(INPUT_DIR),conf);

        if(fs.exists(new Path(path))){

            fs.delete(new Path(path));

        }

    }

}

実行時に前回実行した出力ディレクトリが出力されます.次の手順に従います.
1. ;

2. ;

3. Mapper ;

4. ;

5. ;

6. ;

7. ;

8. Reducer ;

9. ;

10. JobTracker。

もちろん、出力ファイルフォーマットクラスなど、省略できるものもあります.この例から、カスタムMapperやカスタムReducerを設定できる以上、カスタム入力ファイルフォーマットクラスやパーティション、ソート、グループ化、規約などを設定できるはずですが、これ以降は関連するメモがあります.ここでは簡単な例を書くだけです.次のようにファイルを作成してアップロードします.hdfs://hadoop-master:9000/mapreduce/input/test.txt:
luoliang me

asura asura.com luoliang

me

次にmain関数を実行すると、hdfs://hadoop-master:9000/mapreduce/output/ディレクトリの下にpart-*のようなファイルが出力されます.次のコマンドで表示できます.
hadoop fs -text /output/part-*

次のように出力されます.
asura 1

asura.com 1

luoliang 2

me 2

今、ファイルが出力されているのも比較的正しいですが、頭がまだ真っ白で、どのようにしているのか分かりません.それではmapreduceの原理についてです.次に、コードをJobTrackerに提出することから、指定された入力ファイルパスからファイルを取得します.ここでは、複数のファイルと2級ディレクトリの下の複数のファイルをサポートしています.ここで取得するのはHDFS apiを使って操作します!全てのファイルを読み出して指定の大きさでスプリットInputSplitを行い、スプリットしたキー値FileSplit(<0,"luoliang me"),<13,"asura asura.com luoliang")をRecordReader(<"luoliang",1>,<"luoliang",1>)に変換すると、すべての変換が完了するとmap関数が呼び出され、map関数はデータをMapperに書き込む.Contextでは、データをパーティション化してパケット規則を並べ替え、最後にshuffleを介してreduce側に到達し、各mapの出力数はreduceの入力数に等しい.reduceエンドに到達したデータは既に質的に変化しており、<「luoliang」ではなく、1>のように<「luoliang」,{1,1}>のようなキー値データとなり、合計数を反復して取得し、書き込みcontextで計算した後、指定されたディレクトリに出力する必要がある.ここでは重複する単語があるためmap関数の呼び出し回数とreduce関数の呼び出し回数は異なる.これはカスタマイズreduceですが、これは必須ではありません.類似平均数の統計の問題であれば、データはmap側で規制されています.転送時間や処理時間が減少して性能が向上しますが、最終結果に影響を及ぼす可能性がありますので、この規制は具体的な状況を見て使用します.このshuffleについては、まだあまり理解していないので、もっと見てみましょう.
今回はとりあえずここまで.記録を続けて少しずつ!