MapReduceの対象がテキストと数値以外の他に多数ある場合にはMapWritableを利用する事でMapインターフェースの型も使用する事ができる

SampleMapper.javaを作成

package sample;

import java.text.MessageFormat;
import java.text.ParseException;
import java.io.IOException;

import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.MapWritable;

public class SampleMapper extends Mapper<LongWritable, Text, Text, MapWritable> {
    @Override
    protected void map(LongWritable id, Text value, Context ctx) {
        try {
            MessageFormat format = new MessageFormat("{0} - - [{1}] \"{2} {3} HTTP/1.1\" {4} {5} {6} {7}");
            Object[] o = format.parse(value.toString());

            MapWritable map = new MapWritable();
            map.put(new Text("host"), new Text((String)o[0]));
            map.put(new Text("method"), new Text((String)o[2]));
            map.put(new Text("ua"), new Text((String)o[7]));

            ctx.write(new Text((String)o[3]), map);
        } catch(ParseException e) {
            e.printStackTrace();
        } catch(IOException e) {
            e.printStackTrace();
        } catch(InterruptedException e) {
            e.printStackTrace();
        }
    }
}

MapWritableはキーもバリューも両方Writableなので注意する。又、containsKey等のメソッドもあるが、対象がWritableな為通常どおりStringのような文字列を指定してもマッチしない

SampleReducer.java

package sample;

import java.io.IOException;
import java.util.Map;
import java.util.HashMap;

import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Writable;

public class SampleReducer extends Reducer<Text, MapWritable, Text, MapWritable> {
    @Override
    protected void reduce(Text key, Iterable<MapWritable> values, Context ctx) {
        try {
            Map<String,Map<String, String> m = new HashMap<String, Map<String, String>>();

            for(MapWritable value : values) {
                int count = 1;

                if(m.containsKey(key.toString())) {
                    int _count = Integer.parseInt(m.get(key.toString()).get("count"));
                    count += _count;
                }

                Map<String, String> v = new HashMap<String, String>();
                v.put("count", String.valueOf(count));

                /* containsKeyを使う場合
                if(value.containsKey(new Text("host"))) {
                    v.put("host", ((Text)value.get(new Text("host"))).toString());
                }
                */

                for(Map.Entry<Writable, Writable> entry : value.entrySet()) {
                    v.put(entry.getKey().toString(), entry.getValue().toString());
                }

                m.put(key.toString(), v);
            }

            for(String k : m.keySet()) {
                Map<String,String> v = m.get(k);

                MapWritable w = new MapWritable();

                for(String wk : v.keySet()) {
                    w.put(new Text(wk), new Text(v.get(wk)));
                }

                ctx.write(new Text(k), w);
            }
        } catch(IOException e) {
            e.printStackTrace();
        } catch(InterruptedException e) {
            e.printStackTrace();
        }
    }
}

というようにする事で使用する事ができるがMapWritable自体を通常のようにTextOutputFormat等を使用して書き込んでもキーバリューを出力したり等はしないので別途OutputFormatを拡張しておく必要がある

TestOutputFormat.javaを作成

package sample;

import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataOutputStream;

public class TestOutputFormat extends TextOutputFormat<Text, MapWritable> {
    @Override
    public RecordWriter getRecordWriter(TaskAttemptContext ctx) {
        RecordWriter w = null;

        try {
            Configuration conf = ctx.getConfiguration();
            Path path = getDefaultWorkFile(ctx, ".txt");
            FileSystem fs = path.getFileSystem(conf);
            FSDataOutputStream out = fs.create(path, false);

            w = new TestRecordWriter(out);
        } catch(Exception e) {
            e.printStackTrace();
        }

        return w;
    }

    private static class TestRecordWriter extends RecordWriter<Text, MapWritable> {

        private DataOutputStream out;

        public TestRecordWriter(DataOutputStream out) {
            this.out = out;
        }

        @Override
        public synchronized void write(Text key, MapWritable value) throws IOException {
            if(out != null) {
                for(Map.Entry<Writable, Writable> entry : value.entrySet()) {
                    String k = entry.getKey().toString();
                    String v = entry.getValue().toString();

                    String text = String.format("%s %s:%s", key.toString(), k, v);

                    out.write(text.getBytes());
                    out.write("\n".getBytes());
                }
            }
        }

        @Override
        public synchronized void close(TaskAttemptContext ctx) throws IOException {
            if(out != null) {
                out.close();
            }
        }
    }
}

あとはClientクラスでsetOutputKeyClassでMapWritableを使用すれば良い

備考1 MapWritableをジェネリックスで利用する場合の注意点

MapperやReducerでMapWritable等をpushする場合においてはMapperとReducer間で型が違う場合には正常に渡されない。つまりOutputValueClassはIntWritableになっているのにMapWritableを渡すとおかしくなる。又、Mapperが<Text,IntWritable>なのに対し、Reducerでは<Text,MapWritable>をpushすると正常に処理されない。なのでそういう場合にはMapperからReducerに渡されるデータ型を設定しておく必要がある

package sample;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.fs.Path;

public class Client {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        Job job = Job.getInstance(new Cluster(conf), conf);
        job.setJarByClass(Client.class);
        job.setMapperClass(SampleMapper.class);
        job.setReducerClass(SampleReducer.class);
        job.setInputFormatClass(TextInputFormat.class);

        // 上記で作成したMapWritableの結果を出力するOutputFormatクラスを使用する
        job.setOutputFormatClass(TestOutputFormat.class);

        // Mapper側からpushされるデータ型の指定
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // Reducer側からpushされるデータ型の指定
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(MapWritable.class);

        FileInputFormat.setInputPaths(job, new Path("input"));
        FileOutputFormat.setOutputPath(job, new Path("output"));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}