HBaseのTableReducer
タイトル通り、Hadoop MapReduce上でReducer側から処理したデータをHBaseにぶっこむ為にTableReducerを使う事が出来る。やってみた
セットアップ
Hadoop(1.2.1)は既にあるのでHBase(0.94.11)を構築する。んまぁ特筆する事も無いけど
[/bin/sh] hbase shell hbase(main) > create "output", "score"
的な事を行なってHadoopで処理したデータをHBaseに突っ込む為の所を作っておく。あとHDFS上にデータ突っ込んでおくのも忘れずに。んまぁこんかいは「hoge fuga foobar」が含まれるテキストを適当に書いて突っ込んだけ
あとはクラスをだらだらと書く
SampleMapper.java
package sample;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class SampleMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
private IntWritable one = new IntWritable(1);
@Override
protected void map(LongWritable id, Text value, Context context) {
try {
context.write(value, one);
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
とりあえずスルー
SampleReducer.java
package sample;
import java.io.IOException;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
public class SampleReducer extends TableReducer<Text, IntWritable, NullWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) {
int sum = 0;
for (IntWritable value : values) {
sum += value.get();
}
Put put = new Put(
key.toString().getBytes() // Row Key
);
put.add(
"score".getBytes(), // Column Family
"count".getBytes(), // Key
String.valueOf(sum).getBytes() // Value
);
try {
context.write(NullWritable.get(), put);
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
ReducerクラスじゃなくてHBaseのTableReducerを継承する。公式ではwriteの第1引数の型がImmutableByteWritable的なのになってるけど、まぁnullしか指定してないならNullWritableでええんじゃないのかという事で
SampleTaskRunner.java
package sample;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
public class SampleTaskRunner {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
HBaseConfiguration.addHbaseResources(conf);
Job job = Job.getInstance(conf);
job.setJarByClass(SampleTaskRunner.class);
job.setMapperClass(SampleMapper.class);
job.setReducerClass(SampleReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TableOutputFormat.class);
FileInputFormat.setInputPaths(job, new Path("data"));
// TableOutputFormatで使うHBase上テーブルを設定
TableMapReduceUtil.initTableReducerJob("output", SampleReducer.class, job);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
っていう感じで。あとはjar化して
hadoop jar sample.jar sample.SampleTaskRunner
辺りすれば良いんだけど依存性関係でめんどくさい。という事でgradleのJavaExecを使いつつのProgramDriverで処理する方向で
Client.java
package sample;
import org.apache.hadoop.util.ProgramDriver;
public class Client {
public static void main(String[] args) throws Exception {
ProgramDriver driver = new ProgramDriver();
try {
driver.addClass("sample", Client.class, "hbase1");
driver.driver(args);
} catch (Throwable e) {
e.printStackTrace();;
}
System.exit(0);
}
}
つまりは
java sample.Client sample
っていう感じでやれば動かせられますよっていう事で
build.gradle
apply plugin: "java"
apply plugin: "eclipse"
repositories {
mavenCentral()
}
dependencies {
compile "org.apache.hadoop:hadoop-core:1.2.1"
compile "org.apache.hbase:hbase:0.94.11"
}
task run(type: JavaExec) {
classpath = sourceSets.main.runtimeClasspath
main = "sample.Runner"
args = ["sample"]
}
runタスクを実行するだけで良い。んまぁこれで終わり。実行し、HBaseシェル側で見てみる
hbase(main) > scan "output" ROW COLUMN+CELL foobar column=score:count, timestamp=1379472861619, value=3 fuga column=score:count, timestamp=1379472861619, value=2 hoge column=score:count, timestamp=1379472861619, value=3
という感じでHadoop MapReduceで処理されて最終的にHBaseにぶっこまれるっていう感じ