MRUnitによるMapReduceユニットテスト


一、MRUnitの概要


公式サイトのアドレス:https://mrunit.apache.org/
    Apache MRUnit ™ is a Java library that helps developers unit test Apache Hadoop map reduce jobs.
MRUnitは、Apache HadoopのMapReduceジョブのユニットテストを支援するJavaライブラリです。

二、コード例


Mavenプロジェクトを例に、MRUnitを使用してMapReduceのユニットテストを行う方法を示します。
サンプルの詳細については、次のチュートリアルを参照してください: https://cwiki.apache.org/confluence/display/MRUNIT/MRUnit+Tutorial
プロジェクトのpom.xmlでは、mrunit、mockito-all、junitの3つのライブラリを依存関係として追加することが重要です。MRUnitはmockitoとjunitを利用して、MapReduceプログラムを模擬的に実行してテストします。

MRユニットテストクラス

package mrunit;

import static org.junit.Assert.assertEquals;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import mrunit.SMSCDRMapper.CDRCounter;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.apache.hadoop.mrunit.mapreduce.MapReduceDriver;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.junit.Before;
import org.junit.Test;

/**
 * MRUnit-based unit tests for {@link SMSCDRMapper} and {@link SMSCDRReducer}.
 *
 * <p>Input records are semicolon-separated CDR lines, for example:
 * <pre>
 * CDRID;CDRType;Phone1;Phone2;SMS Status Code
 * 655209;1;796764372490213;804422938115889;6
 * 353415;0;356857119806206;287572231184798;4
 * 835699;1;252280313968413;889717902341635;0
 * </pre>
 */
public class SMSCDRMapperReducerTest {
	private final Configuration conf = new Configuration();
	private MapDriver<LongWritable, Text, Text, IntWritable> mapDriver;
	private ReduceDriver<Text, IntWritable, Text, IntWritable> reduceDriver;
	private MapReduceDriver<LongWritable, Text, Text, IntWritable, Text, IntWritable> mapReduceDriver;

	/**
	 * Creates a fresh mapper, reducer and the three MRUnit drivers before each test.
	 */
	@Before
	public void setUp() {
		// Wire the classes under test into a mapper-only, a reducer-only
		// and a combined mapper+reducer driver.
		SMSCDRMapper mapper = new SMSCDRMapper();
		SMSCDRReducer reducer = new SMSCDRReducer();
		mapDriver = MapDriver.newMapDriver(mapper);
		reduceDriver = ReduceDriver.newReduceDriver(reducer);
		mapReduceDriver = MapReduceDriver.newMapReduceDriver(mapper, reducer);

		// Demonstrates passing job parameters to the mapper driver.
		// The values are populated before the configuration is attached.
		conf.set("myParameter1", "20");
		conf.set("myParameter2", "23");
		mapDriver.setConfiguration(conf);
	}

	/**
	 * An SMS record (CDRType 1) must be mapped to (status code, 1).
	 */
	@Test
	public void testMapper() throws IOException {
		mapDriver.withInput(new LongWritable(), new Text(
				"655209;1;796764372490213;804422938115889;6"));
		mapDriver.withOutput(new Text("6"), new IntWritable(1));
		mapDriver.runTest();
	}

	/**
	 * Two counts of 1 for the same status code must be reduced to a total of 2.
	 */
	@Test
	public void testReducer() throws IOException {
		List<IntWritable> values = new ArrayList<IntWritable>();
		values.add(new IntWritable(1));
		values.add(new IntWritable(1));
		reduceDriver.withInput(new Text("6"), values);
		reduceDriver.withOutput(new Text("6"), new IntWritable(2));
		reduceDriver.runTest();
	}

	/**
	 * A single SMS record run through mapper and reducer must yield (status code, 1).
	 */
	@Test
	public void testMapperReducer() throws IOException {
		mapReduceDriver.withInput(new LongWritable(), new Text(
				"655209;1;796764372490213;804422938115889;6"));
		mapReduceDriver.withOutput(new Text("6"), new IntWritable(1));
		// Without runTest() the expectations above are never evaluated and
		// the test would pass regardless of what the job produces.
		mapReduceDriver.runTest();
	}

	/**
	 * A non-SMS record (CDRType 0) must increment the NonSMSCDR counter.
	 */
	@Test
	public void testMapperCount() throws IOException {
		mapDriver.withInput(new LongWritable(), new Text(
				"655209;0;796764372490213;804422938115889;6"));
		// No expected output is registered: the mapper writes nothing for
		// a non-SMS record and only increments the counter.
		mapDriver.runTest();
		assertEquals("Expected 1 counter increment", 1, mapDriver.getCounters()
				.findCounter(CDRCounter.NonSMSCDR).getValue());
	}
}

Mapperクラス

package mrunit;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SMSCDRMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
	 
	  private Text status = new Text();
	  private final static IntWritable addOne = new IntWritable(1);
	  
	  static enum CDRCounter {
			NonSMSCDR;
		};
	 
	  /**
	   * Returns the SMS status code and its count
	   */
	  protected void map(LongWritable key, Text value, Context context)
	      throws java.io.IOException, InterruptedException {
	 
	    //655209;1;796764372490213;804422938115889;6 is the Sample record format
	    String[] line = value.toString().split(";");
	    // If record is of SMS CDR
	    if (Integer.parseInt(line[1]) == 1) {
	      status.set(line[4]);
	      context.write(status, addOne);
	    }else{
	    	// CDR record is not of type SMS so increment the counter
			context.getCounter(CDRCounter.NonSMSCDR).increment(1);
		}
	  }
	}

Reducerクラス

package mrunit;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SMSCDRReducer extends
  Reducer<Text, IntWritable, Text, IntWritable> {
 
  protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws java.io.IOException, InterruptedException {
    int sum = 0;
    for (IntWritable value : values) {
      sum += value.get();
    }
    context.write(key, new IntWritable(sum));
  }
}

プロジェクトのpom.xmlファイル

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<!-- Maven coordinates of this project; it is packaged as a jar. -->
	<groupId>com.cdh</groupId>
	<artifactId>cdh-test</artifactId>
	<!-- NOTE(review): Maven only recognizes snapshot versions by a trailing
	     "-SNAPSHOT" suffix; confirm whether "1.0.0-SNAPSHOT" was intended. -->
	<version>SNAPSHOT-1.0.0</version>
	<packaging>jar</packaging>

	<name>cdh-test</name>
	<url>http://maven.apache.org</url>

	<!-- Shared build properties. -->
	<properties>
		<!-- Version applied to the hadoop-client dependency. -->
		<hadoop.version>2.0.0-mr1-cdh4.4.0</hadoop.version>
		<!-- Version applied to the hbase dependency. -->
		<hbase.version>0.94.6-cdh4.4.0</hbase.version>
		<!-- Source files are read as UTF-8. -->
		<project.build.sourceEncoding>utf-8</project.build.sourceEncoding>
		<maven.compiler.encoding>utf-8</maven.compiler.encoding>
	</properties>

	<!-- Build configuration: compiler settings, shaded jar and Eclipse project generation. -->
	<build>
		<pluginManagement>
			<plugins>
				<!-- Compile as Java 1.6 with UTF-8 source encoding. -->
				<plugin>
					<groupId>org.apache.maven.plugins</groupId>
					<artifactId>maven-compiler-plugin</artifactId>
					<version>3.1</version>
					<configuration>
						<encoding>utf-8</encoding>
						<source>1.6</source>
						<target>1.6</target>
					</configuration>
				</plugin>
			</plugins>
		</pluginManagement>

		<plugins>
			<!-- Runs the shade goal during the package phase. -->
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-shade-plugin</artifactId>
				<version>2.1</version>
				<executions>
					<execution>
						<phase>package</phase>
						<goals>
							<goal>shade</goal>
						</goals>
					</execution>
				</executions>
			</plugin>

			<!-- Eclipse project generation: classes go to "eclipse-classes";
			     sources are downloaded, javadocs are not. -->
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-eclipse-plugin</artifactId>
				<version>2.9</version>
				<configuration>
					<buildOutputDirectory>eclipse-classes</buildOutputDirectory>
					<downloadSources>true</downloadSources>
					<downloadJavadocs>false</downloadJavadocs>
				</configuration>
			</plugin>
		</plugins>
	</build>

	<!-- Project dependencies. The last three (mrunit, mockito-all, junit)
	     are the test-scoped libraries used for the MRUnit tests. -->
	<dependencies>
		<!-- tools.jar taken from the local JDK installation via system scope. -->
		<dependency>
			<groupId>jdk.tools</groupId>
			<artifactId>jdk.tools</artifactId>
			<version>1.6</version>
			<scope>system</scope>
			<systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
		</dependency>
		
		<!-- Hadoop client, provided at runtime; mockito-all is excluded here
		     (presumably to avoid clashing with the test-scoped mockito-all
		     declared below; confirm). -->
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-client</artifactId>
			<version>${hadoop.version}</version>
			<scope>provided</scope>
			<exclusions>
				<exclusion>
					<artifactId>mockito-all</artifactId>
					<groupId>org.mockito</groupId>
				</exclusion>
			</exclusions>
		</dependency>
		<!-- NOTE(review): this version is hard-coded and differs from
		     ${hadoop.version} (2.0.0-mr1-cdh4.4.0); confirm this is intended. -->
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-mapreduce-client-core</artifactId>
			<version>2.0.0-cdh4.4.0</version>
			<exclusions>
				<exclusion>
					<artifactId>
						jersey-test-framework-grizzly2
					</artifactId>
					<groupId>
						com.sun.jersey.jersey-test-framework
					</groupId>
				</exclusion>
				<exclusion>
					<artifactId>netty</artifactId>
					<groupId>org.jboss.netty</groupId>
				</exclusion>
			</exclusions>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.hbase</groupId>
			<artifactId>hbase</artifactId>
			<version>${hbase.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>com.hadoop.gplcompression</groupId>
			<artifactId>hadoop-lzo-cdh4</artifactId>
			<version>0.4.15-gplextras</version>
		</dependency>

		<dependency>
			<groupId>org.hsqldb</groupId>
			<artifactId>hsqldb</artifactId>
			<version>2.2.9</version>
		</dependency>

		<dependency>
			<groupId>redis.clients</groupId>
			<artifactId>jedis</artifactId>
			<version>2.5.1</version>
		</dependency>
		
		<!-- Unit-test libraries: MRUnit (hadoop2 classifier), Mockito and JUnit, all test scope. -->
		<dependency>
			<groupId>org.apache.mrunit</groupId>
			<artifactId>mrunit</artifactId>
			<version>1.1.0</version>
			<classifier>hadoop2</classifier>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>org.mockito</groupId>
			<artifactId>mockito-all</artifactId>
			<version>1.9.5</version>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>4.10</version>
			<scope>test</scope>
		</dependency>
	</dependencies>

	<!-- Additional artifact repository: Cloudera, with release artifacts
	     enabled and snapshot artifacts disabled. -->
	<repositories>
		<repository>
			<id>cloudera</id>
			<url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
			<releases>
				<enabled>true</enabled>
			</releases>
			<snapshots>
				<enabled>false</enabled>
			</snapshots>
		</repository>
	</repositories>
</project>