【十八掌●武功編】第七掌:MapReduceのユニットテスト


MRUnitはJUnitベースのユニットテストフレームワークで、HadoopフレームワークのMapReduceをユニットテストするのに特化しています.MRUnit針は、異なる被験者に対して異なるDriver:-MapDriverを使用し、個別のMap試験に対して使用する.-ReduceDriverは、個別のReduceについてテストします.-MapReduceDriverは、MapとReduceを一貫してテストします.

ユニットテストの例


1、依存を追加
<dependency>
    <groupId>org.apache.mrunitgroupId>
    <artifactId>mrunitartifactId>
    <version>1.1.0version>
    <classifier>hadoop2classifier>
dependency>
<!—  classifierhadoop1 hadoop2,mapreduceV1 hadoop1,mapreduceV2 hadoop2 -->

2、MapReduceプログラム類
package mapreduce;
import mywritable.PariWritable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

public class SortWCMapReduce extends Configured implements Tool {

    //Mapper 
    public static class SortWCMapper extends
            Mapper<LongWritable, Text, PariWritable, IntWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String lineValue = value.toString();

            String[] strs = lineValue.split(",");
            PariWritable mapOutputKey = new PariWritable(strs[0], Integer.valueOf(strs[1]));
            context.write(mapOutputKey, new IntWritable(mapOutputKey.getSecond()));
        }
    }

    //Reducer 
    public static class SortWCReducer extends
            Reducer<PariWritable, IntWritable, Text, IntWritable> {
        @Override
        public void reduce(PariWritable key, Iterable values,
                           Context context) throws IOException, InterruptedException {
            for (IntWritable value : values) {
                context.write(new Text(key.getFirst()), value);
            }
        }
    }

    //Driver 
    public int run(String[] args) throws Exception {
        // 
        Configuration configuration = this.getConf();
        // job
        Job job = Job.getInstance(configuration, SortWCMapReduce.class.getSimpleName());
        // MapReduce 
        job.setJarByClass(SortWCMapReduce.class);
        // 
        Path inpath = new Path(args[0]);
        FileInputFormat.addInputPath(job, inpath);
        // 
        Path outpath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outpath);

        job.setInputFormatClass(TextInputFormat.class);
        job.setMapperClass(SortWCMapper.class);
        job.setMapOutputKeyClass(PariWritable.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setReducerClass(SortWCReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        boolean isSucces = job.waitForCompletion(true);
        return isSucces ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        int status = ToolRunner.run(configuration, new SortWCMapReduce(),
                args);
        System.exit(status);
    }
}

3、カスタムタイプクラス
package mywritable;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * Created by   on 2017/5/15.
 */
public class PariWritable implements WritableComparable<PariWritable> {
    private String first;
    private Integer second;

    public String getFirst() {
        return first;
    }

    public Integer getSecond() {
        return second;
    }

    public PariWritable() {
    }

    public PariWritable(String f, Integer s) {
        this.set(f, s);
    }

    public void set(String f, Integer s) {
        this.first = f;
        this.second = s;
    }

    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(first);
        dataOutput.writeInt(second);
    }

    public void readFields(DataInput dataInput) throws IOException {
        this.first = dataInput.readUTF();
        this.second = dataInput.readInt();
    }

    @Override
    public String toString() {
        return first + "|" + second;
    }

    @Override
    public boolean equals(Object obj) {
        if (obj == null) {
            return false;
        }
        if (!(obj instanceof PariWritable)) {
            return false;
        } else {
            PariWritable other = (PariWritable) obj;
            return this.first.equals(other.first) && this.second.compareTo(other.second) == 0;
        }
    }

    public int compareTo(PariWritable o) {
        // , , 
        int comp = this.first.compareTo(o.first);
        if (comp == 0) {
            comp= this.second.compareTo(o.second);
        }
        return comp;
    }
}

4、ユニットテストクラス

import mapreduce.SortWCMapReduce;
import mywritable.PariWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.ArrayList;

/**
 * Created by   on 2017/5/12.
 */
public class SortWCMapReduceTest {
    MapDriver mapDriver;
    ReduceDriver reduceDriver;

    @Before
    public void setUp() {
        SortWCMapReduce.SortWCMapper mapper = new SortWCMapReduce.SortWCMapper();
        SortWCMapReduce.SortWCReducer reduce = new SortWCMapReduce.SortWCReducer();
        mapDriver = new MapDriver(mapper);
        reduceDriver = new ReduceDriver(reduce);

    }

//Map 
    @Test
    public void testMapper() throws IOException {
        mapDriver.withInput(new LongWritable(), new Text("655209,3"));
        mapDriver.withOutput(new PariWritable("655209", 3), new IntWritable(3));
        mapDriver.runTest();
    }

//reduce 
    @Test
    public void testReduce() throws IOException {
        reduceDriver.withInput(new PariWritable("62669", 5), new ArrayList() {
            {
                add(new IntWritable(1));
                add(new IntWritable(3));
            }
        });
        reduceDriver.withOutput(new Text("62669"), new IntWritable(1))
                .withOutput(new Text("62669"), new IntWritable(3));
        reduceDriver.runTest();
    }

//MapReduce 
@Test
public void testMapReduce() {
    mapReduceDriver.withInput(new LongWritable(), new Text("10001,4"))
            .withInput(new LongWritable(), new Text("10002,2"))
            .withInput(new LongWritable(), new Text("10001,2"))
    mapReduceDriver.withOutput(new Text("10001"), new IntWritable(2))
            .withOutput(new Text("10001"), new IntWritable(4))
            .withOutput(new Text("10002"), new IntWritable(2));
}
}