Pushing the aggregated results of a Hadoop MapReduce job into HBase
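
A small end-to-end example: the Mapper parses access-log lines, the Reducer counts requests per path, and a custom OutputFormat writes the totals into an HBase table instead of HDFS.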

Create SampleMapper.java
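
The mapper splits each log line with MessageFormat and emits the request path as the key, with a MapWritable holding the remote host as the value.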

package sample;

import java.text.MessageFormat;
import java.text.ParseException;
import java.io.IOException;

import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.MapWritable;

public class SampleMapper extends Mapper<LongWritable, Text, Text, MapWritable> {
    @Override
    protected void map(LongWritable id, Text value, Context ctx) {
        try {
            // Parse a combined-style access log line: {0} = remote host, {3} = request path
            MessageFormat msg = new MessageFormat("{0} - - [{1}] \"{2} {3} HTTP/1.1\" {4} {5} {6} {7}");
            Object[] o = msg.parse(value.toString());

            // Per-record attributes: just the remote host for now
            MapWritable map = new MapWritable();
            map.put(new Text("host"), new Text((String)o[0]));

            // The request path becomes the map output key
            Text key = new Text();
            key.set((String)o[3]);

            ctx.write(key, map);
        } catch(ParseException e) {
            e.printStackTrace();
        } catch(IOException e) {
            e.printStackTrace();
        } catch(InterruptedException e) {
            e.printStackTrace();
        }
    }
}
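
As a sanity check, here is a minimal standalone sketch of what the MessageFormat pattern pulls out of a combined-style log line (the sample line and the ParseDemo class are made up for illustration):

import java.text.MessageFormat;

public class ParseDemo {
    public static void main(String[] args) throws Exception {
        // Hypothetical access-log line, just to show how the pattern tokenizes
        String line = "192.168.0.1 - - [01/Jan/2024:00:00:00 +0900]"
                + " \"GET /index.html HTTP/1.1\" 200 2326 \"-\" \"Mozilla/5.0\"";

        MessageFormat fmt = new MessageFormat(
                "{0} - - [{1}] \"{2} {3} HTTP/1.1\" {4} {5} {6} {7}");
        Object[] o = fmt.parse(line);

        System.out.println(o[0]);  // 192.168.0.1  -> stored under "host"
        System.out.println(o[3]);  // /index.html  -> becomes the map output key
    }
}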

Create SampleReducer.java
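
The reducer tallies how many records arrived for each path and repacks the result (the count plus the attributes from the mapper) into a MapWritable.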

package sample;

import java.util.Map;
import java.util.HashMap;
import java.io.IOException;

import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Writable;

public class SampleReducer extends Reducer<Text, MapWritable, Text, MapWritable> {
    @Override
    protected void reduce(Text key, Iterable<MapWritable> values, Context ctx) {
        try {
            // Accumulate a request count per key while keeping the last-seen attributes
            Map<String, Map<String, String>> m = new HashMap<String, Map<String, String>>();

            for(MapWritable value : values) {
                int count = 1;

                // reduce() sees all values for one key, so this just increments the tally
                if(m.containsKey(key.toString())) {
                    count += Integer.parseInt(m.get(key.toString()).get("count"));
                }

                Map<String, String> v = new HashMap<String, String>();
                v.put("count", String.valueOf(count));

                // Copy the attributes emitted by the mapper (the host) into the new value
                for(Map.Entry<Writable, Writable> entry : value.entrySet()) {
                    v.put(entry.getKey().toString(), entry.getValue().toString());
                }

                m.put(key.toString(), v);
            }

            // Re-pack each accumulated entry into a MapWritable and emit it
            for(String k : m.keySet()) {
                Map<String, String> v = m.get(k);
                MapWritable w = new MapWritable();

                for(String wk : v.keySet()) {
                    w.put(new Text(wk), new Text(v.get(wk)));
                }

                ctx.write(new Text(k), w);
            }
        } catch(IOException e) {
            e.printStackTrace();
        } catch(InterruptedException e) {
            e.printStackTrace();
        }
    }
}
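
Because reduce() is invoked once per key, the intermediate HashMap only ever holds a single entry; the count comes out equal to the number of values for that path, and the host column simply reflects the last record seen.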

Create SampleOutputFormat.java
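
Instead of writing files, this OutputFormat creates the samples table on first use and has its RecordWriter turn every reduce output record into an HBase Put.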

package sample;

import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;

import static org.apache.hadoop.hbase.regionserver.StoreFile.BloomType.ROWCOL;

public class SampleOutputFormat extends OutputFormat<Text, MapWritable> {

    private Configuration config;

    public SampleOutputFormat() {
        config = HBaseConfiguration.create();

        try {
            HBaseAdmin admin = new HBaseAdmin(config);

            /* To drop the table first:
            if(admin.tableExists("samples")) {
                admin.disableTable("samples");
                admin.deleteTable("samples");
            }
            */

            // Create the destination table with a single column family on first run
            if(!admin.tableExists("samples")) {
                HTableDescriptor descriptor = new HTableDescriptor("samples".getBytes());

                HColumnDescriptor column = new HColumnDescriptor("column");
                column.setBloomFilterType(ROWCOL);
                column.setInMemory(true);

                descriptor.addFamily(column);
                admin.createTable(descriptor);
            }
        } catch(IOException e) {
            e.printStackTrace();
        }
    }

    public RecordWriter<Text, MapWritable> getRecordWriter(TaskAttemptContext ctx) throws IOException {
        // Open the table once per task instead of once per record
        final HTable table = new HTable(config, "samples");

        return new RecordWriter<Text, MapWritable>() {
            @Override
            public void write(Text key, MapWritable value) {
                try {
                    // Row key is the reduce output key (the request path)
                    Put put = new Put(key.toString().getBytes());

                    for(Map.Entry<Writable, Writable> entry : value.entrySet()) {
                        String k = entry.getKey().toString();
                        String v = entry.getValue().toString();

                        // The family name must match the one created in the constructor
                        put.add("column".getBytes(), k.getBytes(), v.getBytes());
                    }

                    table.put(put);
                } catch(IOException e) {
                    e.printStackTrace();
                }
            }

            @Override
            public void close(TaskAttemptContext ctx) throws IOException {
                table.close();
            }
        };
    }

    public OutputCommitter getOutputCommitter(TaskAttemptContext ctx) {
        // Records are written straight to HBase, so the committer has nothing to do
        return new OutputCommitter() {
            public void abortTask(TaskAttemptContext ctx) {
            }
            public void commitTask(TaskAttemptContext ctx) {
            }
            public void setupJob(JobContext ctx) {
            }
            public void setupTask(TaskAttemptContext ctx) {
            }
            public boolean needsTaskCommit(TaskAttemptContext ctx) {
                return true;
            }
        };
    }

    public void checkOutputSpecs(JobContext ctx) {
    }
}
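
Once the job has run, the rows can be inspected with scan 'samples' in the hbase shell, or programmatically with the same client API; a minimal sketch (the ScanSamples class name is made up):

package sample;

import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.util.Bytes;

public class ScanSamples {
    public static void main(String[] args) throws Exception {
        Configuration config = HBaseConfiguration.create();
        HTable table = new HTable(config, "samples");

        ResultScanner scanner = table.getScanner(new Scan());
        for(Result result : scanner) {
            // Row key is the request path; columns are count and host
            System.out.println(Bytes.toString(result.getRow()));

            for(Map.Entry<byte[], byte[]> e
                    : result.getFamilyMap("column".getBytes()).entrySet()) {
                System.out.println("  " + Bytes.toString(e.getKey())
                        + " = " + Bytes.toString(e.getValue()));
            }
        }

        scanner.close();
        table.close();
    }
}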

Create Client.java
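
The driver wires everything together: TextInputFormat feeds raw log lines to the mapper, and SampleOutputFormat takes the place of the usual file-based output.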

package sample;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.fs.Path;

public class Client {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(new Cluster(conf), conf);
        job.setJarByClass(Client.class);
        job.setMapperClass(SampleMapper.class);
        job.setReducerClass(SampleReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(MapWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(MapWritable.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(SampleOutputFormat.class);

        // "path" is a placeholder; point this at the access-log input on HDFS
        FileInputFormat.setInputPaths(job, new Path("path"));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
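
Build a jar containing these classes, make sure the HBase client jars and configuration are on the job classpath, and submit it the usual way, e.g. hadoop jar sample.jar sample.Client (the jar name here is just a placeholder). Each row in samples ends up keyed by request path, with count and host columns.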