With ChainMapper/ChainReducer, you can call another Mapper from a Mapper, or call a Mapper from a Reducer.

Note, however, that the output key/value types (the generics) of each Mapper must match the input key/value types of the next Mapper in the chain.
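
For reference, this is how the generic types line up in the chain built below (the output types of each stage feed the input types of the next):

SampleMapper  : Mapper<LongWritable, Text, Text, IntWritable>
TestMapper    : Mapper<Text, IntWritable, Text, IntWritable>
SampleReducer : Reducer<Text, IntWritable, Text, IntWritable>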

Create SampleMapper.java

package sample;

import java.io.IOException;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;

public class SampleMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private static final IntWritable one = new IntWritable(1);

    @Override
    protected void map(LongWritable id, Text value, Context ctx)
            throws IOException, InterruptedException {
        // id is the line's byte offset from TextInputFormat and is not used here.
        // The split into words is done by the next Mapper in the chain,
        // so the whole line is passed through as the key with a count of 1.
        ctx.write(value, one);
    }
}

Create TestMapper.java

package sample;

import java.io.IOException;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;

public class TestMapper extends Mapper<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void map(Text key, IntWritable value, Context ctx)
            throws IOException, InterruptedException {
        // The key is the whole line passed through by SampleMapper;
        // split it into words and emit each word with its count.
        String[] words = key.toString().split(" ");
        int count = value.get();

        for (String word : words) {
            ctx.write(new Text(word), new IntWritable(count));
        }
    }
}

Note that with ChainReducer, the output of the last chained Mapper goes straight to the OutputFormat, so the final aggregation must already have been done by that point (i.e. in the Reducer); otherwise duplicate values will be written to the output.
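
The commented-out block in Client.java below shows this setup: ChainReducer.setReducer registers SampleReducer first, and ChainReducer.addMapper then appends TestMapper to run after it.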

Create SampleReducer.java

package sample;

import java.io.IOException;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;

public class SampleReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context ctx)
            throws IOException, InterruptedException {
        // Sum the counts collected for each word.
        int sum = 0;

        for (IntWritable value : values) {
            sum += value.get();
        }

        ctx.write(key, new IntWritable(sum));
    }
}
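
As a worked example (the input line here is made up), a single input line "hello world hello" flows through the chain like this:

SampleMapper  : ("hello world hello", 1)
TestMapper    : ("hello", 1), ("world", 1), ("hello", 1)
SampleReducer : ("hello", 2), ("world", 1)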

Create Client.java

package sample;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.fs.Path;

public class Client {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(Client.class);
        ChainMapper.addMapper(job, SampleMapper.class, LongWritable.class, Text.class, Text.class, IntWritable.class, conf);
        ChainMapper.addMapper(job, TestMapper.class, Text.class, IntWritable.class, Text.class, IntWritable.class, conf);

        job.setReducerClass(SampleReducer.class);

        /* When using ChainReducer instead, replace the setReducerClass call above
           with the following (org.apache.hadoop.mapreduce.lib.chain.ChainReducer
           must also be imported):
        ChainReducer.setReducer(job, SampleReducer.class, Text.class, IntWritable.class, Text.class, IntWritable.class, conf);
        ChainReducer.addMapper(job, TestMapper.class, Text.class, IntWritable.class, Text.class, IntWritable.class, conf);
        */

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path("input"));
        FileOutputFormat.setOutputPath(job, new Path("output"));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

With this setup, processing is handed from Mapper to Mapper: SampleMapper → TestMapper → SampleReducer. Note, though, that if ChainMapper would only ever call a single Mapper, you are better off using a Combiner class instead.
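
To try it out, package the classes into a jar and run it with the hadoop command, after placing input files under the "input" directory (the jar name here is just an example):

hadoop jar sample.jar sample.Client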