HBase (2) - HBase MapReduce -
Let's try running MapReduce over an HBase table using HBase's MapReduce integration. The sample job below treats each row key in the samples table as a file name, counts how many times each file extension appears, and writes the totals to the test table.
Create SampleMapper.java
package sample;

import java.io.IOException;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class SampleMapper extends TableMapper<Text, IntWritable> {

    private static final IntWritable one = new IntWritable(1);

    @Override
    protected void map(ImmutableBytesWritable row, Result value, Context ctx)
            throws IOException, InterruptedException {
        // Dump every cell of the row, just for debugging.
        for (KeyValue kv : value.raw()) {
            String family = Bytes.toString(kv.getFamily());
            String qualifier = Bytes.toString(kv.getQualifier());
            String val = Bytes.toString(kv.getValue());
            System.out.println(String.format("%s:%s = %s", family, qualifier, val));
        }
        // Treat the row key as a file name and emit (extension, 1).
        String str = Bytes.toString(row.get());
        int index = str.lastIndexOf(".");
        if (index == -1) {
            return; // row key has no extension
        }
        String ext = str.substring(index + 1);
        ctx.write(new Text(ext), one);
    }
}
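For reference, here is a minimal sketch of loading test data into the samples table, using the old HTable client API from the same era as the code above. The class name SampleLoader and the info:name column are placeholders of my own; the mapper only ever looks at the row keys.

package sample;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class SampleLoader {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "samples");
        // Row keys are file names; the MapReduce job counts their extensions.
        for (String name : new String[]{"a.txt", "b.txt", "c.jpg"}) {
            Put put = new Put(Bytes.toBytes(name));
            put.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(name));
            table.put(put);
        }
        table.close();
    }
}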
Create SampleReducer.java
package sample;

import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;

public class SampleReducer extends TableReducer<Text, IntWritable, NullWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context ctx)
            throws IOException, InterruptedException {
        // Sum the counts emitted by the mapper for this extension.
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        // Row key = extension, column score:count = total.
        // Bytes.toBytes is used instead of String#getBytes so the encoding
        // does not depend on the platform default charset.
        Put put = new Put(Bytes.toBytes(key.toString()));
        put.add(Bytes.toBytes("score"), Bytes.toBytes("count"), Bytes.toBytes(String.valueOf(sum)));
        ctx.write(NullWritable.get(), put);
    }
}
Create Client.java
package sample;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

public class Client {
    public static void main(String[] args) throws Exception {
        // Layer hbase-site.xml etc. on top of the Hadoop configuration.
        Configuration conf = new Configuration();
        HBaseConfiguration.addHbaseResources(conf);

        Job job = Job.getInstance(conf);
        job.setJarByClass(Client.class);
        job.setInputFormatClass(TableInputFormat.class);
        job.setOutputFormatClass(TableOutputFormat.class);

        // Read table "samples" with a full scan; write results to table "test".
        TableMapReduceUtil.initTableMapperJob("samples", new Scan(), SampleMapper.class, Text.class, IntWritable.class, job);
        TableMapReduceUtil.initTableReducerJob("test", SampleReducer.class, job);

        System.out.println(job.waitForCompletion(true) ? 0 : 1);
    }
}
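Once the job has completed, the totals can be read back from the test table. A quick sketch, again with the old client API; the row key "txt" is just an example extension.

package sample;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class ResultChecker {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "test");
        // Row key is the extension; the total sits in score:count.
        Result result = table.get(new Get(Bytes.toBytes("txt")));
        byte[] count = result.getValue(Bytes.toBytes("score"), Bytes.toBytes("count"));
        System.out.println("txt = " + Bytes.toString(count));
        table.close();
    }
}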
As long as TableMapReduceUtil is used, no Hadoop settings other than InputFormatClass/OutputFormatClass are needed. Without TableMapReduceUtil, the table names have to be set on the configuration yourself:
Configuration conf = new Configuration();
conf.set(TableInputFormat.INPUT_TABLE, "samples");
conf.set(TableOutputFormat.OUTPUT_TABLE, "test");
The configuration must be set up like this. In that case, explicit wiring such as setMapperClass and setReducerClass also becomes necessary; see the sketch below.
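As a sketch, a manual version of Client might look like the following. ManualClient is a hypothetical name of my own, and without a serialized Scan in the configuration, TableInputFormat should fall back to scanning the whole table.

package sample;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

public class ManualClient {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        HBaseConfiguration.addHbaseResources(conf);

        // Point the input/output formats at the tables by hand.
        conf.set(TableInputFormat.INPUT_TABLE, "samples");
        conf.set(TableOutputFormat.OUTPUT_TABLE, "test");

        Job job = Job.getInstance(conf);
        job.setJarByClass(ManualClient.class);
        job.setInputFormatClass(TableInputFormat.class);
        job.setOutputFormatClass(TableOutputFormat.class);

        // Everything TableMapReduceUtil would otherwise wire up
        // has to be declared explicitly.
        job.setMapperClass(SampleMapper.class);
        job.setReducerClass(SampleReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Put.class);

        System.out.println(job.waitForCompletion(true) ? 0 : 1);
    }
}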