To use MySQL data with Hadoop, use DBInputFormat and DBOutputFormat.

If only DBOutputFormat is used, the JDBC driver itself can be stored in HDFS and distributed via DistributedCache, but DBInputFormat cannot have its dependency resolved that way. So copy the JDBC driver into the Hadoop containers in advance.
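For reference, in the output-only case the driver could be pulled in through DistributedCache roughly like this. This is only a sketch: the class name DriverCacheExample and the HDFS path /libs/mysql-connector-java.jar are assumptions, and the jar must have been uploaded to HDFS beforehand.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;

public class DriverCacheExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Hypothetical HDFS location of the MySQL connector jar (upload it there first).
        // DistributedCache adds it to the task classpath, so the nodes do not need
        // a local copy of the driver. This only helps the DBOutputFormat-only case.
        DistributedCache.addFileToClassPath(new Path("/libs/mysql-connector-java.jar"), conf);
        // ... continue with the usual job setup using this conf ...
    }
}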

Create TestWritable.java

A dedicated Writable implementation is needed so that DBInputFormat can read rows from the table and hand them to the Mapper.

package sample;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.ResultSet;
import java.sql.PreparedStatement;
import java.sql.SQLException;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

public class TestWritable implements Writable, DBWritable {

    private int id;
    private String name;

    // Not used: this object is only created from a database ResultSet and is
    // never deserialized from a DataInput stream.
    @Override
    public void readFields(DataInput in) throws IOException {
        throw new UnsupportedOperationException();
    }

    // Called by DBInputFormat for each row; the column order matches the
    // field list passed to DBInputFormat.setInput ("id", "name").
    @Override
    public void readFields(ResultSet rs) throws SQLException {
        id = rs.getInt(1);
        name = rs.getString(2);
    }

    // Not used: this object is never serialized to a DataOutput stream.
    @Override
    public void write(DataOutput out) throws IOException {
        throw new UnsupportedOperationException();
    }

    // Required by DBWritable, but not invoked on the input side of this job.
    @Override
    public void write(PreparedStatement ps) throws SQLException {
        ps.setInt(1, id);
        ps.setString(2, name);
    }

    public int getId() {
        return id;
    }

    public String getName() {
        return name;
    }
}

Because the InputFormat creates instances of this class dynamically, a no-argument constructor is mandatory (here the implicit default constructor suffices). Also, since nothing is read from or written to files this time, the DataInput/DataOutput overloads are deliberately left unsupported.
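For example, if TestWritable were later given a constructor that takes arguments (as SampleWritable below does), an explicit no-argument constructor would have to be kept alongside it:

    // Needed only once another constructor is declared: DBInputFormat creates
    // TestWritable instances reflectively and relies on a no-argument constructor.
    public TestWritable() {
    }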

Create SampleWritable.java

package sample;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.ResultSet;
import java.sql.PreparedStatement;
import java.sql.SQLException;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

public class SampleWritable implements Writable, DBWritable {

    private String name;
    private int count;

    public SampleWritable(String name, int count) {
        this.name = name;
        this.count = count;
    }

    // Not used: this object is handed directly to DBOutputFormat and is never
    // deserialized from a DataInput stream.
    @Override
    public void readFields(DataInput in) throws IOException {
        throw new UnsupportedOperationException();
    }

    // Required by DBWritable, but not invoked on the output side of this job.
    @Override
    public void readFields(ResultSet rs) throws SQLException {
        name = rs.getString(1);
        count = rs.getInt(2);
    }

    // Not used: this object is never serialized to a DataOutput stream.
    @Override
    public void write(DataOutput out) throws IOException {
        throw new UnsupportedOperationException();
    }

    // Called by DBOutputFormat for each emitted key; the parameter order
    // matches the field list passed to DBOutputFormat.setOutput ("name", "count").
    @Override
    public void write(PreparedStatement ps) throws SQLException {
        ps.setString(1, name);
        ps.setInt(2, count);
    }
}

Because the Reducer constructs this object and passes it to the OutputFormat explicitly, no no-argument constructor is needed.
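To see why only write(PreparedStatement) matters for this class, the record writer that DBOutputFormat provides behaves roughly like the following (an illustrative sketch, not the actual Hadoop source):

// Illustrative sketch (not the actual Hadoop source) of how DBOutputFormat's
// RecordWriter consumes each pair emitted by the Reducer: the key fills the
// batched INSERT statement via write(PreparedStatement) and the value is
// ignored, which is why NullWritable works as the value type.
public void write(SampleWritable key, NullWritable value) throws IOException {
    try {
        key.write(statement);   // statement: PreparedStatement for the INSERT into sample_tbl
        statement.addBatch();
    } catch (SQLException e) {
        throw new IOException(e);
    }
}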

Create SampleMapper.java

package sample;

import java.io.IOException;

import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;

public class SampleMapper extends Mapper<LongWritable, TestWritable, Text, IntWritable> {

    private IntWritable one = new IntWritable(1);

    @Override
    protected void map(LongWritable id, TestWritable value, Context ctx)
            throws IOException, InterruptedException {
        // Split the name column on spaces and emit each word with a count of 1
        String[] keys = value.getName().split(" ");

        for(String key : keys) {
            ctx.write(new Text(key), one);
        }
    }
}

Because DBInputFormat reads each row into a TestWritable via readFields(ResultSet) and passes it to map, the second type parameter of Mapper must be TestWritable (the key supplied by DBInputFormat is a LongWritable record index).

Create SampleReducer.java

package sample;

import java.io.IOException;

import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;

public class SampleReducer extends Reducer<Text, IntWritable, SampleWritable, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context ctx)
            throws IOException, InterruptedException {
        // Sum the per-word counts emitted by the Mapper
        int sum = 0;

        for(IntWritable value : values) {
            sum += value.get();
        }

        // DBOutputFormat writes the SampleWritable key to the database;
        // the NullWritable value is ignored
        ctx.write(new SampleWritable(key.toString(), sum), NullWritable.get());
    }
}

Because it is the SampleWritable that is sent to the OutputFormat, the value type is set to NullWritable; accordingly, the key part of the Reducer's output generics is SampleWritable.

Create Client.java

package sample;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;

public class Client {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Configure the JDBC driver class, connection URL, user name and password
        DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver", "jdbc:mysql://localhost:3306/sample", "username", "password");

        Job job = Job.getInstance(conf);
        job.setJarByClass(Client.class);
        job.setMapperClass(SampleMapper.class);
        job.setReducerClass(SampleReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(SampleWritable.class);
        job.setOutputValueClass(NullWritable.class);
        job.setInputFormatClass(DBInputFormat.class);
        job.setOutputFormatClass(DBOutputFormat.class);

        // Read the "id" and "name" columns of test_tbl into TestWritable
        DBInputFormat.setInput(job, TestWritable.class, "test_tbl", null, null, new String[] { "id", "name" });
        // Write each SampleWritable into the "name" and "count" columns of sample_tbl
        DBOutputFormat.setOutput(job, "sample_tbl", new String[] { "name", "count" });

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
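
Note that test_tbl is expected to already hold the input rows in its id and name columns, and sample_tbl must already exist with name and count columns to receive the aggregated results; the job does not create either table.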