HBaseのTableReducer

2013-09-18T00:00:00+00:00 gradle Hadoop HBase Java

タイトル通り、Hadoop MapReduce上でReducer側から処理したデータをHBaseにぶっこむ為にTableReducerを使う事が出来る。やってみた

セットアップ

Hadoop(1.2.1)は既にあるのでHBase(0.94.11)を構築する。んまぁ特筆する事も無いけど

[/bin/sh] hbase shell
hbase(main) > create "output", "score"

的な事を行なってHadoopで処理したデータをHBaseに突っ込む為の所を作っておく。あとHDFS上にデータ突っ込んでおくのも忘れずに。んまぁこんかいは「hoge fuga foobar」が含まれるテキストを適当に書いて突っ込んだけ

あとはクラスをだらだらと書く

SampleMapper.java

package sample;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SampleMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private IntWritable one = new IntWritable(1);

    @Override
    protected void map(LongWritable id, Text value, Context context) {
        try {
            context.write(value, one);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

とりあえずスルー

SampleReducer.java

package sample;

import java.io.IOException;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;

public class SampleReducer extends TableReducer<Text, IntWritable, NullWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) {
        int sum = 0;

        for (IntWritable value : values) {
            sum += value.get();
        }

        Put put = new Put(
            key.toString().getBytes() // Row Key
        );

        put.add(
            "score".getBytes(), // Column Family
            "count".getBytes(), // Key
            String.valueOf(sum).getBytes() // Value
        );

        try {
            context.write(NullWritable.get(), put);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

ReducerクラスじゃなくてHBaseのTableReducerを継承する。公式ではwriteの第1引数の型がImmutableByteWritable的なのになってるけど、まぁnullしか指定してないならNullWritableでええんじゃないのかという事で

SampleTaskRunner.java

package sample;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class SampleTaskRunner {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        HBaseConfiguration.addHbaseResources(conf);

        Job job = Job.getInstance(conf);
        job.setJarByClass(SampleTaskRunner.class);
        job.setMapperClass(SampleMapper.class);
        job.setReducerClass(SampleReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TableOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path("data"));

        // TableOutputFormatで使うHBase上テーブルを設定
        TableMapReduceUtil.initTableReducerJob("output", SampleReducer.class, job);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

っていう感じで。あとはjar化して

hadoop jar sample.jar sample.SampleTaskRunner

辺りすれば良いんだけど依存性関係でめんどくさい。という事でgradleのJavaExecを使いつつのProgramDriverで処理する方向で

Client.java

package sample;

import org.apache.hadoop.util.ProgramDriver;

public class Client {
    public static void main(String[] args) throws Exception {
        ProgramDriver driver = new ProgramDriver();

        try {
            driver.addClass("sample", Client.class, "hbase1");
            driver.driver(args);
        } catch (Throwable e) {
            e.printStackTrace();;
        }

        System.exit(0);
    }
}

つまりは

java sample.Client sample

っていう感じでやれば動かせられますよっていう事で

build.gradle

apply plugin: "java"
apply plugin: "eclipse"

repositories {
    mavenCentral()
}

dependencies {
    compile "org.apache.hadoop:hadoop-core:1.2.1"
    compile "org.apache.hbase:hbase:0.94.11"
}

task run(type: JavaExec) {
    classpath = sourceSets.main.runtimeClasspath
    main = "sample.Runner"
    args = ["sample"]
}

runタスクを実行するだけで良い。んまぁこれで終わり。実行し、HBaseシェル側で見てみる

hbase(main) > scan "output"
ROW                    COLUMN+CELL
 foobar                column=score:count, timestamp=1379472861619, value=3
 fuga                  column=score:count, timestamp=1379472861619, value=2
 hoge                  column=score:count, timestamp=1379472861619, value=3

という感じでHadoop MapReduceで処理されて最終的にHBaseにぶっこまれるっていう感じ

参考: http://hbase.apache.org/book/mapreduce.example.html

HBaseのTableMapper Struts2をやってみる (5) - struts.xmlに定義する事なくActionをテストする -