Trying out MapReduce over a table on HBase using HBase's MapReduce support

Create SampleMapper.java

package sample;

import java.io.IOException;

import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;

public class SampleMapper extends TableMapper<Text, IntWritable> {

    private static final IntWritable one = new IntWritable(1);

    @Override
    protected void map(ImmutableBytesWritable row, Result value, Context ctx) {
        // Dump every cell in the row (family:qualifier = value), mainly to see what is stored
        for(KeyValue kv : value.raw()) {
            String family = Bytes.toString(kv.getFamily());
            String qualifier = Bytes.toString(kv.getQualifier());
            String val = Bytes.toString(kv.getValue());

            System.out.println(String.format("%s:%s = %s", family, qualifier, val));
        }

        // Treat the row key as a file name and count its extension
        String str = Bytes.toString(row.get());
        int index = str.lastIndexOf(".");

        if(index == -1) {
            return;
        }

        String ext = str.substring(index + 1);

        try {
            ctx.write(new Text(ext), one);
        } catch(IOException e) {
            e.printStackTrace();
        } catch(InterruptedException e) {
            e.printStackTrace();
        }
    }
}
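
The mapper above dumps every cell it sees and then counts row keys by file extension, so the row keys of the input table are expected to look like file names. As a minimal sketch of preparing such input (the "samples" table name comes from Client.java below; the "data" column family, the "size" qualifier, and the values are assumptions made up for illustration):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class PrepareSamples {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "samples");

        // Row keys are file names; SampleMapper extracts the extension ("txt", "java", ...)
        for(String name : new String[]{"readme.txt", "notes.txt", "Main.java"}) {
            Put put = new Put(Bytes.toBytes(name));
            // "data:size" is a hypothetical column just to give the row some content
            put.add(Bytes.toBytes("data"), Bytes.toBytes("size"), Bytes.toBytes("1024"));
            table.put(put);
        }

        table.close();
    }
}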

Create SampleReducer.java

package sample;

import java.io.IOException;

import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;

public class SampleReducer extends TableReducer<Text, IntWritable, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context ctx) {
        int sum = 0;

        // Sum up the counts emitted by the mapper for this extension
        for(IntWritable value : values) {
            sum += value.get();
        }

        // Write the total to the output table: row key = extension, score:count = total
        Put put = new Put(Bytes.toBytes(key.toString()));
        put.add(Bytes.toBytes("score"), Bytes.toBytes("count"), Bytes.toBytes(String.valueOf(sum)));

        try {
            ctx.write(NullWritable.get(), put);
        } catch(IOException e) {
            e.printStackTrace();
        } catch(InterruptedException e) {
            e.printStackTrace();
        }
    }
}
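
The reducer stores each extension as a row key in the output table, with the total written to score:count as a string. A quick sketch for reading one of those rows back after the job finishes (assuming the "test" output table from Client.java and that the input contained at least one ".txt" row key):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class CheckResult {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "test");

        // Row key = extension, score:count = number of input rows with that extension
        Result result = table.get(new Get(Bytes.toBytes("txt")));
        byte[] count = result.getValue(Bytes.toBytes("score"), Bytes.toBytes("count"));

        System.out.println("txt = " + Bytes.toString(count));

        table.close();
    }
}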

Create Client.java

package sample;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.client.Scan;

public class Client {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        HBaseConfiguration.addHbaseResources(conf);

        Job job = Job.getInstance(new Cluster(conf), conf);
        job.setJarByClass(Client.class);
        job.setInputFormatClass(TableInputFormat.class);
        job.setOutputFormatClass(TableOutputFormat.class);

        TableMapReduceUtil.initTableMapperJob("samples", new Scan(), SampleMapper.class, Text.class, IntWritable.class, job);
        TableMapReduceUtil.initTableReducerJob("test", SampleReducer.class, job);

        System.out.println(job.waitForCompletion(true) ? 0 : 1);
    }
}
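
Both tables have to exist before the job runs: the input table "samples" and the output table "test" with a "score" column family, since SampleReducer writes to score:count. A minimal sketch using the old HBaseAdmin API; the "data" family for "samples" is just an assumption:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class CreateTables {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HBaseAdmin admin = new HBaseAdmin(conf);

        // Input table; the "data" family is only an example
        HTableDescriptor samples = new HTableDescriptor("samples");
        samples.addFamily(new HColumnDescriptor("data"));
        admin.createTable(samples);

        // Output table; SampleReducer writes to score:count
        HTableDescriptor test = new HTableDescriptor("test");
        test.addFamily(new HColumnDescriptor("score"));
        admin.createTable(test);

        admin.close();
    }
}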

As long as TableMapReduceUtil is used, no Hadoop settings other than the InputFormatClass/OutputFormatClass are needed. If TableMapReduceUtil is not used, settings such as

Configuration conf = new Configuration();
conf.set(TableInputFormat.INPUT_TABLE, "samples");
conf.set(TableOutputFormat.OUTPUT_TABLE, "test");

have to be made on the Configuration beforehand. In that case, explicit settings such as setMapperClass also become necessary, as in the sketch below.
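
For reference, a minimal sketch of the same job wired up without TableMapReduceUtil. The class and table names are the ones used above; note that TableMapReduceUtil also takes care of shipping the HBase jars with the job, so without it those have to be put on the job classpath separately.

Configuration conf = new Configuration();
HBaseConfiguration.addHbaseResources(conf);
conf.set(TableInputFormat.INPUT_TABLE, "samples");
conf.set(TableOutputFormat.OUTPUT_TABLE, "test");

Job job = Job.getInstance(new Cluster(conf), conf);
job.setJarByClass(Client.class);
job.setInputFormatClass(TableInputFormat.class);
job.setOutputFormatClass(TableOutputFormat.class);

// Without TableMapReduceUtil the mapper/reducer and their output types
// have to be declared explicitly
job.setMapperClass(SampleMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);

job.setReducerClass(SampleReducer.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Put.class);

System.out.println(job.waitForCompletion(true) ? 0 : 1);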