HBase (1) - Hadoop MapReduce -> HBase -
Let's try pushing the aggregation results of a Hadoop MapReduce job into HBase.
Create SampleMapper.java
package sample;

import java.text.MessageFormat;
import java.text.ParseException;
import java.io.IOException;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.MapWritable;

public class SampleMapper extends Mapper<LongWritable, Text, Text, MapWritable> {
    @Override
    protected void map(LongWritable id, Text value, Context ctx) {
        try {
            // Parse one access-log line; field 0 is the client host, field 3 the request path.
            MessageFormat msg = new MessageFormat("{0} - - [{1}] \"{2} {3} HTTP/1.1\" {4} {5} {6} {7}");
            Object[] o = msg.parse(value.toString());
            MapWritable map = new MapWritable();
            map.put(new Text("host"), new Text((String)o[0]));
            Text key = new Text();
            key.set((String)o[3]);
            ctx.write(key, map);
        } catch(ParseException e) {
            e.printStackTrace();
        } catch(IOException e) {
            e.printStackTrace();
        } catch(InterruptedException e) {
            e.printStackTrace();
        }
    }
}
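
As a quick sanity check, the MessageFormat pattern above can be exercised outside of MapReduce. The sketch below (ParseCheck is just a scratch class, and the sample log line is an assumption; a real line must contain the literal " - - ", the bracketed timestamp, and " HTTP/1.1" for the pattern to match) prints the two fields the mapper actually uses:

import java.text.MessageFormat;
import java.text.ParseException;

// Scratch class: parses one made-up access-log line with the same pattern as SampleMapper.
public class ParseCheck {
    public static void main(String[] args) throws ParseException {
        MessageFormat msg = new MessageFormat("{0} - - [{1}] \"{2} {3} HTTP/1.1\" {4} {5} {6} {7}");
        String line = "127.0.0.1 - - [01/Jan/2024:00:00:00 +0900] \"GET /index.html HTTP/1.1\" 200 1234 \"-\" \"curl/7.68.0\"";
        Object[] o = msg.parse(line);
        System.out.println("o[0] = " + o[0]);  // 127.0.0.1   -> stored under the "host" qualifier
        System.out.println("o[3] = " + o[3]);  // /index.html -> the map output key (later the HBase row key)
    }
}
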
Create SampleReducer.java
package sample;

import java.util.Map;
import java.util.HashMap;
import java.io.IOException;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Writable;

public class SampleReducer extends Reducer<Text, MapWritable, Text, MapWritable> {
    @Override
    protected void reduce(Text key, Iterable<MapWritable> values, Context ctx) {
        try {
            Map<String, Map<String, String>> m = new HashMap<String, Map<String, String>>();
            for(MapWritable value : values) {
                // Accumulate a per-key count; the other qualifiers (e.g. "host") keep the last value seen.
                int count = 1;
                if(m.containsKey(key.toString())) {
                    count += Integer.parseInt(m.get(key.toString()).get("count"));
                }
                Map<String, String> v = new HashMap<String, String>();
                v.put("count", String.valueOf(count));
                for(Map.Entry<Writable, Writable> entry : value.entrySet()) {
                    v.put(entry.getKey().toString(), entry.getValue().toString());
                }
                m.put(key.toString(), v);
            }
            for(String k : m.keySet()) {
                Map<String, String> v = m.get(k);
                MapWritable w = new MapWritable();
                for(String wk : v.keySet()) {
                    w.put(new Text(wk), new Text(v.get(wk)));
                }
                ctx.write(new Text(k), w);
            }
        } catch(IOException e) {
            e.printStackTrace();
        } catch(InterruptedException e) {
            e.printStackTrace();
        }
    }
}
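
The net effect of that counting loop for one key can be seen in a small standalone sketch (illustration only; ReduceCheck is just a scratch class and needs nothing beyond hadoop-common): the count accumulates across the values, while every other qualifier such as host simply keeps the last value seen.

import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

// Scratch class: the aggregation SampleReducer performs for a single key, outside of MapReduce.
public class ReduceCheck {
    public static void main(String[] args) {
        MapWritable v1 = new MapWritable();
        v1.put(new Text("host"), new Text("127.0.0.1"));
        MapWritable v2 = new MapWritable();
        v2.put(new Text("host"), new Text("192.168.0.2"));

        Map<String, String> out = new HashMap<String, String>();
        int count = 0;
        for (MapWritable value : new MapWritable[] { v1, v2 }) {
            count++;
            out.put("count", String.valueOf(count));
            for (Map.Entry<Writable, Writable> entry : value.entrySet()) {
                out.put(entry.getKey().toString(), entry.getValue().toString());
            }
        }
        System.out.println(out);  // count=2, host=192.168.0.2 (the last host wins)
    }
}
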
Create SampleOutputFormat.java
package sample;

import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import static org.apache.hadoop.hbase.regionserver.StoreFile.BloomType.ROWCOL;

public class SampleOutputFormat extends OutputFormat<Text, MapWritable> {
    private Configuration config;

    public SampleOutputFormat() {
        config = HBaseConfiguration.create();
        try {
            HBaseAdmin admin = new HBaseAdmin(config);
            /* To drop the table first:
            if(admin.tableExists("samples")) {
                admin.disableTable("samples");
                admin.deleteTable("samples");
            }
            */
            if(!admin.tableExists("samples")) {
                HTableDescriptor descriptor = new HTableDescriptor("samples".getBytes());
                HColumnDescriptor column = new HColumnDescriptor("column");
                column.setBloomFilterType(ROWCOL);
                column.setInMemory(true);
                descriptor.addFamily(column);
                admin.createTable(descriptor);
            }
        } catch(IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public RecordWriter<Text, MapWritable> getRecordWriter(TaskAttemptContext ctx) {
        return new RecordWriter<Text, MapWritable>() {
            @Override
            public void write(Text key, MapWritable value) {
                try {
                    HTable table = new HTable(config, "samples");
                    Put put = new Put(key.toString().getBytes());
                    for(Map.Entry<Writable, Writable> entry : value.entrySet()) {
                        String k = entry.getKey().toString();
                        String v = entry.getValue().toString();
                        // The family must match the one created in the constructor ("column").
                        put.add("column".getBytes(), k.getBytes(), v.getBytes());
                    }
                    table.put(put);
                    table.close();
                } catch(IOException e) {
                    e.printStackTrace();
                }
            }
            @Override
            public void close(TaskAttemptContext ctx) {
            }
        };
    }

    @Override
    public OutputCommitter getOutputCommitter(TaskAttemptContext ctx) {
        return new OutputCommitter() {
            public void abortTask(TaskAttemptContext ctx) {
            }
            public void commitTask(TaskAttemptContext ctx) {
            }
            public void setupJob(JobContext ctx) {
            }
            public void setupTask(TaskAttemptContext ctx) {
            }
            public boolean needsTaskCommit(TaskAttemptContext ctx) {
                return true;
            }
        };
    }

    @Override
    public void checkOutputSpecs(JobContext ctx) {
    }
}
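
Rows written by this output format can be read back with the same pre-1.0 HBase client API; a minimal sketch, assuming a row keyed by the request path /index.html already exists in the samples table (ReadCheck is just a scratch class):

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;

// Scratch class: reads the "count" cell of one row from the samples table (row key is an assumption).
public class ReadCheck {
    public static void main(String[] args) throws IOException {
        Configuration config = HBaseConfiguration.create();
        HTable table = new HTable(config, "samples");
        Get get = new Get("/index.html".getBytes());
        Result result = table.get(get);
        byte[] count = result.getValue("column".getBytes(), "count".getBytes());
        System.out.println(count == null ? "row not found" : new String(count));
        table.close();
    }
}
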
Create Client.java
package sample;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.fs.Path;

public class Client {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(new Cluster(conf), conf);
        job.setJarByClass(Client.class);
        job.setMapperClass(SampleMapper.class);
        job.setReducerClass(SampleReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(MapWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(MapWritable.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(SampleOutputFormat.class);
        // "path" is a placeholder; point it at the directory holding the access logs.
        FileInputFormat.setInputPaths(job, new Path("path"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
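
To run the job, replace "path" with the actual input directory, package the classes into a jar, and submit it with hadoop jar (for example hadoop jar sample.jar sample.Client, where the jar name is just a placeholder). The result can then be checked from the HBase shell with scan 'samples': each request path should appear as one row, with count and host cells in the column family created by SampleOutputFormat.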