Hadoop (4) - OutputFormat -
OutputFormatを拡張して利用してみる
EqualOutputFormat.javaを作成
package sample;
import java.io.IOException;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataOutputStream;
/**
 * Text output format for (Text, IntWritable) records that joins each key and
 * value with "=" instead of the default separator.
 *
 * The output base name is set to "test" and the work file gets the ".txt" extension.
 */
public class EqualOutputFormat extends TextOutputFormat<Text, IntWritable> {

    /**
     * Opens the task's work file and returns a line writer that uses "=" as
     * the key/value separator.
     *
     * @param ctx context of the task attempt the writer is created for
     * @return writer emitting one key=value record at a time
     * @throws IOException if the work file cannot be resolved or created
     * @throws InterruptedException declared by the overridden method
     */
    @Override
    public RecordWriter<Text, IntWritable> getRecordWriter(TaskAttemptContext ctx) throws IOException, InterruptedException {
        setOutputName(ctx, "test");

        final Path workFile = getDefaultWorkFile(ctx, ".txt");
        final FileSystem fileSystem = workFile.getFileSystem(ctx.getConfiguration());

        // The second argument asks create() to overwrite an existing file.
        final FSDataOutputStream stream = fileSystem.create(workFile, true);
        return new LineRecordWriter<Text, IntWritable>(stream, "=");
    }
}
LineRecordWriterを使用する事で、指定したデリミタでキーと値を連結して1行ずつ出力する事ができる。利用するには、クライアント側でsetOutputFormatClassにこのクラスを指定する
備考: JSONOutputFormatを作ってみる
package sample;
import java.io.IOException;
import java.io.DataOutputStream;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataOutputStream;
/**
 * Output format that writes every (Text, IntWritable) record of a task into a
 * single JSON object, for example {"apple": 3,"banana": 5}.
 *
 * The output base name is set to "test" and the work file gets the ".txt" extension.
 */
public class JSONOutputFormat extends TextOutputFormat<Text, IntWritable> {

    /**
     * Creates the task's work file and returns a writer that serializes the
     * records as members of one JSON object.
     *
     * @param ctx context of the task attempt the writer is created for
     * @return writer that emits the opening brace immediately and the closing
     *         brace when it is closed
     * @throws IOException if the work file cannot be created or the opening
     *         brace cannot be written
     * @throws InterruptedException declared by the overridden method
     */
    @Override
    public RecordWriter<Text,IntWritable> getRecordWriter(TaskAttemptContext ctx) throws IOException, InterruptedException {
        setOutputName(ctx, "test");
        Configuration conf = ctx.getConfiguration();
        Path path = getDefaultWorkFile(ctx, ".txt");
        FileSystem fs = path.getFileSystem(conf);
        FSDataOutputStream out = fs.create(path, false);
        try {
            return new JSONRecordWriter(out);
        } catch (IOException e) {
            // Do not leak the freshly opened stream when the header cannot be written.
            try {
                out.close();
            } catch (IOException ignored) {
                // The original failure is the one worth reporting.
            }
            throw e;
        }
    }

    /**
     * Record writer that wraps all records in "{" ... "}" and separates them
     * with commas so that the finished file is one JSON object.
     */
    private static class JSONRecordWriter extends LineRecordWriter<Text, IntWritable> {

        /** JSON text is written as UTF-8 regardless of the platform default charset. */
        private static final java.nio.charset.Charset UTF_8 = java.nio.charset.Charset.forName("UTF-8");

        /** True until the first member has been written; controls comma placement. */
        private boolean first = true;

        /**
         * Wraps the stream and writes the opening brace.
         *
         * @param out stream the JSON text is written to
         * @throws IOException if the opening brace cannot be written
         */
        public JSONRecordWriter(DataOutputStream out) throws IOException {
            super(out);
            out.write("{".getBytes(UTF_8));
        }

        /**
         * Appends one "key": value member, preceded by a comma for every
         * member after the first.
         *
         * @param key member name; escaped before being written
         * @param value member value, written as a plain decimal integer
         * @throws IOException if the stream rejects the write
         */
        @Override
        public synchronized void write(Text key, IntWritable value) throws IOException {
            StringBuilder text = new StringBuilder();
            if (first) {
                first = false;
            } else {
                // Separator goes BEFORE each later member so no trailing comma is left.
                text.append(',');
            }
            text.append('"').append(escape(key.toString())).append("\": ");
            // Integer.toString is locale-independent, unlike String.format("%d").
            text.append(Integer.toString(value.get()));
            out.write(text.toString().getBytes(UTF_8));
        }

        /**
         * Writes the closing brace and then delegates to the parent close.
         *
         * @param ctx context of the task attempt, passed on to the parent
         * @throws IOException if the closing brace cannot be written or the
         *         parent close fails
         */
        @Override
        public synchronized void close(TaskAttemptContext ctx) throws IOException {
            try {
                out.write("}".getBytes(UTF_8));
            } finally {
                // Run the parent close even when the final write fails.
                super.close(ctx);
            }
        }

        /**
         * Escapes a string for use inside a JSON string literal: quote,
         * backslash and control characters below U+0020 are replaced by
         * their escape sequences.
         */
        private static String escape(String raw) {
            StringBuilder sb = new StringBuilder(raw.length() + 8);
            for (int i = 0; i < raw.length(); i++) {
                char c = raw.charAt(i);
                switch (c) {
                    case '"':
                        sb.append("\\\"");
                        break;
                    case '\\':
                        sb.append("\\\\");
                        break;
                    case '\n':
                        sb.append("\\n");
                        break;
                    case '\r':
                        sb.append("\\r");
                        break;
                    case '\t':
                        sb.append("\\t");
                        break;
                    case '\b':
                        sb.append("\\b");
                        break;
                    case '\f':
                        sb.append("\\f");
                        break;
                    default:
                        if (c < 0x20) {
                            // Remaining control characters need the \\uXXXX form.
                            String hex = Integer.toHexString(c);
                            sb.append("\\u");
                            for (int pad = hex.length(); pad < 4; pad++) {
                                sb.append('0');
                            }
                            sb.append(hex);
                        } else {
                            sb.append(c);
                        }
                }
            }
            return sb.toString();
        }
    }
}