HBaseのTableMapper
前回の逆というか、TableMapperを使ってHBase上のテーブルからデータとってそれを普通のReducerで処理する
という事なので前回のと今回のを合わせれば、「HBaseで取ってHBaseに出力」なんていう事も出来る。というかベースはそれでなっててそれを分割しているにしかすぎないのが今回の件
HBaseに適当にデータ突っ込む
hbase(main) > put "input", "account1", "account:name", "hoge" hbase(main) > put "input", "account2", "account:name", "fuga" hbase(main) > put "input", "account3", "account:name", "foobar" hbase(main) > put "input", "account4", "account:name", "hoge"
で(意味無い)このデータを集計する
SampleMapper.java
package sample;
import java.io.IOException;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
public class SampleMapper extends TableMapper<Text, IntWritable> {
private IntWritable one = new IntWritable(1);
@Override
protected void map(ImmutableBytesWritable row, Result result, Context context) {
String value = new String(
result.getValue(
"account".getBytes(), // Column Family
"name".getBytes() // Column Familyの属性?
)
);
try {
context.write(new Text(value), one);
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
SampleReducer.java
package sample;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class SampleReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) {
int sum = 0;
for (IntWritable value : values) {
sum += value.get();
}
try {
context.write(key, new IntWritable(sum));
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
SampleTaskRunner.java
package sample;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class SampleTaskRunner {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
HBaseConfiguration.addHbaseResources(conf);
Job job = Job.getInstance(conf);
job.setMapperClass(SampleMapper.class);
job.setReducerClass(SampleReducer.class);
job.setOutputFormatClass(TextOutputFormat.class);
TableMapReduceUtil.initTableMapperJob(
"input".getBytes(),
new Scan(),
SampleMapper.class,
Text.class,
IntWritable.class,
job
);
TextOutputFormat.setOutputPath(job, new Path("output"));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
前回はinitTableReducerJobだったのが今回はinitTableMapperJobを設定
で前回と変わらずProgramDriverを使って動かす。実行はgradleで。んな感じでやると
foobar 1 fuga 1 hoge 2
んな感じで結果が得られると。
余談として、昔はResult#rowメソッドでKeyValueなクラスインスタンスが取れて属性とか色々な情報もとれたんだが、そこら辺のインターフェースが変わってるっぽいのでどうやるのかちょいと微妙