HBaseのTableMapper

2013-09-18T00:00:00+00:00 Hadoop HBase Java

前回の逆というか、TableMapperを使ってHBase上のテーブルからデータとってそれを普通のReducerで処理する

という事なので前回のと今回のを合わせれば、「HBaseで取ってHBaseに出力」なんていう事も出来る。というかベースはそれでなっててそれを分割しているにしかすぎないのが今回の件

HBaseに適当にデータ突っ込む

hbase(main) > put "input", "account1", "account:name", "hoge"
hbase(main) > put "input", "account2", "account:name", "fuga"
hbase(main) > put "input", "account3", "account:name", "foobar"
hbase(main) > put "input", "account4", "account:name", "hoge"

で(意味無い)このデータを集計する

SampleMapper.java

package sample;

import java.io.IOException;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class SampleMapper extends TableMapper<Text, IntWritable> {

    private IntWritable one = new IntWritable(1);

    @Override
    protected void map(ImmutableBytesWritable row, Result result, Context context) {
        String value = new String(
            result.getValue(
                "account".getBytes(), // Column Family
                "name".getBytes() // Column Familyの属性?
            )
        );

        try {
            context.write(new Text(value), one);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

SampleReducer.java

package sample;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SampleReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) {
        int sum = 0;

        for (IntWritable value : values) {
            sum += value.get();
        }

        try {
            context.write(key, new IntWritable(sum));
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

SampleTaskRunner.java

package sample;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class SampleTaskRunner {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        HBaseConfiguration.addHbaseResources(conf);

        Job job = Job.getInstance(conf);
        job.setMapperClass(SampleMapper.class);
        job.setReducerClass(SampleReducer.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        TableMapReduceUtil.initTableMapperJob(
            "input".getBytes(),
            new Scan(),
            SampleMapper.class,
            Text.class,
            IntWritable.class,
            job
        );

        TextOutputFormat.setOutputPath(job, new Path("output"));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

前回はinitTableReducerJobだったのが今回はinitTableMapperJobを設定

で前回と変わらずProgramDriverを使って動かす。実行はgradleで。んな感じでやると

foobar  1
fuga    1
hoge    2

んな感じで結果が得られると。

余談として、昔はResult#rowメソッドでKeyValueなクラスインスタンスが取れて属性とか色々な情報もとれたんだが、そこら辺のインターフェースが変わってるっぽいのでどうやるのかちょいと微妙

HBaseのHFileをコマンドで読み取る方法 HBaseのTableReducer