Hadoop (6) - MapWritable -
MapReduceの対象がテキストと数値以外の他に多数ある場合にはMapWritableを利用する事でMapインターフェースの型も使用する事ができる
SampleMapper.javaを作成
package sample;
import java.text.MessageFormat;
import java.text.ParseException;
import java.io.IOException;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.MapWritable;
public class SampleMapper extends Mapper<LongWritable, Text, Text, MapWritable> {
@Override
protected void map(LongWritable id, Text value, Context ctx) {
try {
MessageFormat format = new MessageFormat("{0} - - [{1}] \"{2} {3} HTTP/1.1\" {4} {5} {6} {7}");
Object[] o = format.parse(value.toString());
MapWritable map = new MapWritable();
map.put(new Text("host"), new Text((String)o[0]));
map.put(new Text("method"), new Text((String)o[2]));
map.put(new Text("ua"), new Text((String)o[7]));
ctx.write(new Text((String)o[3]), map);
} catch(ParseException e) {
e.printStackTrace();
} catch(IOException e) {
e.printStackTrace();
} catch(InterruptedException e) {
e.printStackTrace();
}
}
}
MapWritableはキーもバリューも両方Writableなので注意する。又、containsKey等のメソッドもあるが、対象がWritableな為通常どおりStringのような文字列を指定してもマッチしない
SampleReducer.java
package sample;
import java.io.IOException;
import java.util.Map;
import java.util.HashMap;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Writable;
public class SampleReducer extends Reducer<Text, MapWritable, Text, MapWritable> {
@Override
protected void reduce(Text key, Iterable<MapWritable> values, Context ctx) {
try {
Map<String,Map<String, String> m = new HashMap<String, Map<String, String>>();
for(MapWritable value : values) {
int count = 1;
if(m.containsKey(key.toString())) {
int _count = Integer.parseInt(m.get(key.toString()).get("count"));
count += _count;
}
Map<String, String> v = new HashMap<String, String>();
v.put("count", String.valueOf(count));
/* containsKeyを使う場合
if(value.containsKey(new Text("host"))) {
v.put("host", ((Text)value.get(new Text("host"))).toString());
}
*/
for(Map.Entry<Writable, Writable> entry : value.entrySet()) {
v.put(entry.getKey().toString(), entry.getValue().toString());
}
m.put(key.toString(), v);
}
for(String k : m.keySet()) {
Map<String,String> v = m.get(k);
MapWritable w = new MapWritable();
for(String wk : v.keySet()) {
w.put(new Text(wk), new Text(v.get(wk)));
}
ctx.write(new Text(k), w);
}
} catch(IOException e) {
e.printStackTrace();
} catch(InterruptedException e) {
e.printStackTrace();
}
}
}
というようにする事で使用する事ができるがMapWritable自体を通常のようにTextOutputFormat等を使用して書き込んでもキーバリューを出力したり等はしないので別途OutputFormatを拡張しておく必要がある
TestOutputFormat.javaを作成
package sample;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataOutputStream;
public class TestOutputFormat extends TextOutputFormat<Text, MapWritable> {
@Override
public RecordWriter getRecordWriter(TaskAttemptContext ctx) {
RecordWriter w = null;
try {
Configuration conf = ctx.getConfiguration();
Path path = getDefaultWorkFile(ctx, ".txt");
FileSystem fs = path.getFileSystem(conf);
FSDataOutputStream out = fs.create(path, false);
w = new TestRecordWriter(out);
} catch(Exception e) {
e.printStackTrace();
}
return w;
}
private static class TestRecordWriter extends RecordWriter<Text, MapWritable> {
private DataOutputStream out;
public TestRecordWriter(DataOutputStream out) {
this.out = out;
}
@Override
public synchronized void write(Text key, MapWritable value) throws IOException {
if(out != null) {
for(Map.Entry<Writable, Writable> entry : value.entrySet()) {
String k = entry.getKey().toString();
String v = entry.getValue().toString();
String text = String.format("%s %s:%s", key.toString(), k, v);
out.write(text.getBytes());
out.write("\n".getBytes());
}
}
}
@Override
public synchronized void close(TaskAttemptContext ctx) throws IOException {
if(out != null) {
out.close();
}
}
}
}
あとはClientクラスでsetOutputKeyClassでMapWritableを使用すれば良い
備考1 MapWritableをジェネリックスで利用する場合の注意点
MapperやReducerでMapWritable等をpushする場合においてはMapperとReducer間で型が違う場合には正常に渡されない。つまりOutputValueClassはIntWritableになっているのにMapWritableを渡すとおかしくなる。又、Mapperが<Text,IntWritable>なのに対し、Reducerでは<Text,MapWritable>をpushすると正常に処理されない。なのでそういう場合にはMapperからReducerに渡されるデータ型を設定しておく必要がある
package sample;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.fs.Path;
public class Client {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(new Cluster(conf), conf);
job.setJarByClass(Client.class);
job.setMapperClass(SampleMapper.class);
job.setReducerClass(SampleReducer.class);
job.setInputFormatClass(TextInputFormat.class);
// 上記で作成したMapWritableの結果を出力するOutputFormatクラスを使用する
job.setOutputFormatClass(TestOutputFormat.class);
// Mapper側からpushされるデータ型の指定
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// Reducer側からpushされるデータ型の指定
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(MapWritable.class);
FileInputFormat.setInputPaths(job, new Path("input"));
FileOutputFormat.setOutputPath(job, new Path("output"));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}