Hadoop DBInputFormat/DBOutputFormat を使ってみる
今更って感じですが... っていうか元ブログ(自宅用)からの現在のバージョンで動くか確認しつつの転用っていう形
※ちなみに検証したHadoopのバージョンは1.2.1
概要
まぁ必要かどうかに関してはおいといて、MySQL等のデータベースからデータを取り出して別のテーブルにレポートとして出力する方法的な感じになるかなと。まぁBasic Construction的な形の使い方的にはHDFSにデータをうpしてそれをMapReduceで処理する感じじゃねーかなと。まぁ今回あんま関係ないので(ry
とりあえず入力側なテーブルからデータをカウントして別テーブルにレポートとして出力する感じかと。作る必要性があるのは
- Client.java (MapReduceタスクをぶん投げるやつ)
- DBInputWritable.java (DBからのデータを受け取って保管しておく的なクラス)
- DBOutputWritable.java (省略)
- Mapper及びReducerなクラス
の計5つのクラスが必要
DBInputWritable.java
package sample;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.ResultSet;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
public class DBInputWritable implements Writable, DBWritable {
private int id;
private String name;
@Override
public void readFields(DataInput in) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public void readFields(ResultSet rs) throws SQLException {
id = rs.getInt(1);
name = rs.getString(2);
}
@Override
public void write(DataOutput out) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public void write(PreparedStatement ps) throws SQLException {
ps.setInt(1, id);
ps.setString(2, name);
}
public int getId() {
return id;
}
public String getName() {
return name;
}
}
InputFormat側から動的にインスタンスが作られる為に空コンストラクタは必ず必要になる。又、今回はファイルから読み込む訳ではないので、引数がDataInput/DataOutputの物はサポートしないようにする
DBOutputWritable.java
package sample;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.ResultSet;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
public class DBOutputWritable implements Writable, DBWritable {
private String name;
private int count;
public DBOutputWritable(String name, int count) {
this.name = name;
this.count = count;
}
@Override
public void readFields(DataInput in) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public void readFields(ResultSet rs) throws SQLException {
name = rs.getString(1);
count = rs.getInt(2);
}
@Override
public void write(DataOutput out) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public void write(PreparedStatement ps) throws SQLException {
ps.setString(1, name);
ps.setInt(2, count);
}
}
ReducerからOutputFormat側に明示的に流す為、空引数なコンストラクタは必要ない
SampleMapper.java
package sample;
import java.io.IOException;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
public class SampleMapper extends Mapper<LongWritable, DBInputWritable, Text, IntWritable> {
private IntWritable one = new IntWritable(1);
@Override
protected void map(LongWritable id, DBInputWritable value, Context ctx) {
try {
String[] keys = value.getName().split(" ");
for(String key : keys) {
ctx.write(new Text(key),one);
}
} catch(IOException e) {
e.printStackTrace();
} catch(InterruptedException e) {
e.printStackTrace();
}
}
}
DBInputFormatで行を読み込んで、DBInputWritableのreadFieldsが呼び出され引数に渡されるので、ジェネリックスの2番目はDBInputWritableを指定しなければならない
SampleReducer.java
package sample;
import java.io.IOException;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
public class SampleReducer extends Reducer<Text, IntWritable, DBOutputWritable, NullWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context ctx) {
int sum = 0;
for(IntWritable value : values) {
sum += value.get();
}
try {
ctx.write(new DBOutputWritable(key.toString(), sum), NullWritable.get());
} catch(IOException e) {
e.printStackTrace();
} catch(InterruptedException e) {
e.printStackTrace();
}
}
}
OutputFormatにはDBOutputWritableを流す為、値部の型はNullWritableにしておく。ジェネリックスの型もキー部はDBOutputWritableにする
Client.java
package sample;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
public class Client {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
// JDBCドライバー及びURLを設定
DBConfiguration.configureDB(
conf,
"com.mysql.jdbc.Driver",
"jdbc:mysql://localhost:3306/dbname",
"user",
"pass"
);
Job job = new Job(conf);
job.setJarByClass(Client.class);
job.setMapperClass(SampleMapper.class);
job.setReducerClass(SampleReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(DBOutputWritable.class);
job.setOutputValueClass(NullWritable.class);
job.setInputFormatClass(DBInputFormat.class);
job.setOutputFormatClass(DBOutputFormat.class);
DBInputFormat.setInput(
job,
DBInputWritable.class,
"input_tbl", // 入力テーブル
null,
null,
new String[] { "id", "name" } // idとnameカラムを持つ
);
DBOutputFormat.setOutput(
job,
"output_tbl", // 出力テーブル
new String[] { "name", "count" } // nameとcountカラムを持つ
);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
っていう感じ。適当にinput_tblにデータ突っ込んで
hadoop jar sample.jar sample.Client
な感じでタスク送り込んでやると
mysql> select * from output_tbl; +----+--------+-------+ | id | name | count | +----+--------+-------+ | 1 | foobar | 2 | | 2 | fuga | 1 | | 3 | hoge | 3 | +----+--------+-------+ 3 rows in set (0.00 sec)
っ的な感じでDBに出力される
もちろんDBから取ってきてHDFSにレポートするとか、HDFSからデータ読み込んでDBにレポートするとかも出来るので(ry
まぁ使い方的な感じはこんなもんかと