Writing Hadoop MapReduce tests with MRUnit
I'd known for a while that MRUnit exists as a way to write tests for Hadoop MapReduce, but I'd shown no sign of ever actually trying it, so I finally gave it a go. As always, I'm using Maven2.
pom.xml
<?xml version="1.0" ?>
<project
    xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>omitted</groupId>
  <artifactId>omitted</artifactId>
  <version>1.0</version>
  <name>omitted</name>
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>2.3</version>
        <configuration>
          <source>1.6</source>
          <target>1.6</target>
          <encoding>UTF-8</encoding>
        </configuration>
      </plugin>
    </plugins>
  </build>
  <dependencies>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-core</artifactId>
      <version>0.20.205.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.mrunit</groupId>
      <artifactId>mrunit</artifactId>
      <version>0.8.0-incubating</version>
      <scope>test</scope>
    </dependency>
  </dependencies>
</project>
For some reason mvnrepository seems to list an MRUnit 0.9.0-incubating, but specifying that version fails to resolve, a small trap I got briefly stuck in, so I dropped down one version.
As the pom above shows, the Hadoop version used here is 0.20.205.
src/main/java/sample/SampleMapper.java
package sample;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SampleMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable one = new IntWritable(1);

    @Override
    protected void map(LongWritable id, Text value, Context ctx)
            throws IOException, InterruptedException {
        // Emit (word, 1) for every space-separated token in the line.
        // Letting exceptions propagate (rather than swallowing them with
        // printStackTrace) makes both the framework and MRUnit fail loudly.
        for (String s : value.toString().split(" ")) {
            ctx.write(new Text(s), one);
        }
    }
}
src/main/java/sample/SampleReducer.java
package sample;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SampleReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context ctx)
            throws IOException, InterruptedException {
        // Sum the counts for this key and emit the total.
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        ctx.write(key, new IntWritable(sum));
    }
}
src/test/java/sample/SampleHadoopTestCase.java
Now, taking the Mapper and Reducer written above, let's test each of them on its own and also test them run together as a MapReduce job.
package sample;

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.apache.hadoop.mrunit.mapreduce.MapReduceDriver;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.junit.Before;
import org.junit.Test;

public class SampleHadoopTestCase {
    MapDriver<LongWritable, Text, Text, IntWritable> mapDriver;
    ReduceDriver<Text, IntWritable, Text, IntWritable> reduceDriver;
    MapReduceDriver<LongWritable, Text, Text, IntWritable, Text, IntWritable> mrDriver;

    @Before
    public void setUp() {
        SampleMapper mapper = new SampleMapper();
        mapDriver = new MapDriver<LongWritable, Text, Text, IntWritable>();
        mapDriver.setMapper(mapper);

        SampleReducer reducer = new SampleReducer();
        reduceDriver = new ReduceDriver<Text, IntWritable, Text, IntWritable>();
        reduceDriver.setReducer(reducer);

        mrDriver = new MapReduceDriver<LongWritable, Text, Text, IntWritable, Text, IntWritable>();
        mrDriver.setMapper(mapper);
        mrDriver.setReducer(reducer);
    }

    @Test
    public void testMapper() {
        mapDriver.withInput(new LongWritable(), new Text("hoge fuga foobar fuga hoge"));
        mapDriver.withOutput(new Text("hoge"), new IntWritable(1));
        mapDriver.withOutput(new Text("fuga"), new IntWritable(1));
        mapDriver.withOutput(new Text("foobar"), new IntWritable(1));
        mapDriver.withOutput(new Text("fuga"), new IntWritable(1));
        mapDriver.withOutput(new Text("hoge"), new IntWritable(1));
        mapDriver.runTest();
    }

    @Test
    public void testReduce() {
        List<IntWritable> values = new ArrayList<IntWritable>();
        values.add(new IntWritable(1));
        values.add(new IntWritable(2));
        reduceDriver.withInput(new Text("hoge"), values);
        reduceDriver.withOutput(new Text("hoge"), new IntWritable(3));
        reduceDriver.runTest();
    }

    @Test
    public void testMapReduce() {
        mrDriver.withInput(new LongWritable(), new Text("hoge fuga foobar fuga hoge"));
        mrDriver.withOutput(new Text("foobar"), new IntWritable(1));
        mrDriver.withOutput(new Text("fuga"), new IntWritable(2));
        mrDriver.withOutput(new Text("hoge"), new IntWritable(2));
        mrDriver.runTest();
    }
}
So that's the pattern: use MapDriver to test a Mapper on its own, ReduceDriver to test a Reducer on its own, and MapReduceDriver to push data through the Mapper and Reducer together as a full MapReduce pass (apparently there are others as well, such as PipelineMapReduceDriver).
You hand each driver its input records with withInput and declare the expected records with withOutput, and the driver checks whatever flows out of Context.write and friends against those expectations. After that, all that's left is
mvn test
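As a quick sanity check on the expectations in testMapReduce: the mapper emits (word, 1) per token, the reducer sums per key, and the shuffle between them sorts by key, which is why the expected output comes back in foobar, fuga, hoge order. The same result can be derived in plain Java with no Hadoop involved (WordCountCheck is just an illustrative name, not part of the project above):

```java
import java.util.Map;
import java.util.TreeMap;

public class WordCountCheck {
    // Mimics SampleMapper + SampleReducer: split on single spaces and
    // count occurrences per word. TreeMap keeps keys sorted, mirroring
    // the sort-by-key that the shuffle phase performs between map and reduce.
    static Map<String, Integer> wordCount(String line) {
        Map<String, Integer> counts = new TreeMap<String, Integer>();
        for (String w : line.split(" ")) {
            Integer cur = counts.get(w);
            counts.put(w, cur == null ? 1 : cur + 1);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Same input as testMapReduce; prints {foobar=1, fuga=2, hoge=2}
        System.out.println(wordCount("hoge fuga foobar fuga hoge"));
    }
}
```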
Reference: https://cwiki.apache.org/confluence/display/MRUNIT/MRUnit+Tutorial
Addendum
When using MRUnit 0.9.0-incubating from Maven, the dependency has to be declared like this:
<dependency>
  <groupId>org.apache.mrunit</groupId>
  <artifactId>mrunit</artifactId>
  <version>0.9.0-incubating</version>
  <scope>test</scope>
  <classifier>hadoop1</classifier>
</dependency>
In other words, you have to set <classifier> as well (hadoop1 for the 0.20 and 1.0 lines; for 0.23 and the like it's apparently hadoop2).
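Going by that note, a project on the 0.23 line would presumably declare the same dependency with only the classifier swapped (I have only tried hadoop1 myself):

```xml
<dependency>
  <groupId>org.apache.mrunit</groupId>
  <artifactId>mrunit</artifactId>
  <version>0.9.0-incubating</version>
  <scope>test</scope>
  <classifier>hadoop2</classifier>
</dependency>
```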