OutputFormatを拡張して利用してみる

EqualOutputFormat.javaを作成

package sample;

import java.io.IOException;

import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataOutputStream;

/**
 * Text output format that writes each record through {@code LineRecordWriter},
 * joining key and value with {@code "="} (one record per line).
 *
 * <p>The output base name is set to {@code "test"} and work files get the
 * {@code ".txt"} extension.
 */
public class EqualOutputFormat extends TextOutputFormat<Text, IntWritable> {
    /** Separator written between the key and the value of every record. */
    private static final String SEPARATOR = "=";

    /**
     * Creates the record writer for one task attempt.
     *
     * @param ctx the task attempt context; its configuration is used to resolve the file system
     * @return a writer emitting {@code key=value} lines to this task's work file
     * @throws IOException if the work file cannot be created
     */
    @Override
    public RecordWriter<Text, IntWritable> getRecordWriter(TaskAttemptContext ctx) throws IOException, InterruptedException {
        setOutputName(ctx, "test");

        Configuration conf = ctx.getConfiguration();
        Path path = getDefaultWorkFile(ctx, ".txt");
        FileSystem fs = path.getFileSystem(conf);
        // Do not overwrite: fail loudly if the work file already exists instead of
        // silently clobbering it (consistent with JSONOutputFormat and Hadoop's TextOutputFormat).
        FSDataOutputStream out = fs.create(path, false);

        return new LineRecordWriter<Text, IntWritable>(out, SEPARATOR);
    }
}

LineRecordWriterを使用すると、キーと値を指定したデリミタ（ここでは「=」）で連結した行データとして出力できる。利用するには、クライアント側で job.setOutputFormatClass(EqualOutputFormat.class) のように指定すればよい。

備考: JSONOutputFormatを作ってみる

package sample;

import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Objects;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * Output format that writes all records of a task as a single JSON object,
 * {@code {"key": value,...}}, with {@link Text} keys and {@link IntWritable} values.
 *
 * <p>The output base name is set to {@code "test"} and work files get the
 * {@code ".txt"} extension.
 */
public class JSONOutputFormat extends TextOutputFormat<Text, IntWritable> {
    /**
     * Creates the record writer for one task attempt.
     *
     * @param ctx the task attempt context; its configuration is used to resolve the file system
     * @return a writer that emits one JSON object into this task's work file
     * @throws IOException if the work file cannot be created or the opening brace cannot be written
     */
    @Override
    public RecordWriter<Text, IntWritable> getRecordWriter(TaskAttemptContext ctx) throws IOException, InterruptedException {
        setOutputName(ctx, "test");

        Configuration conf = ctx.getConfiguration();
        Path path = getDefaultWorkFile(ctx, ".txt");
        FileSystem fs = path.getFileSystem(conf);
        FSDataOutputStream out = fs.create(path, false);

        try {
            return new JSONRecordWriter(out);
        } catch (IOException e) {
            // The writer never took ownership of the stream, so close it here to avoid leaking it.
            try {
                out.close();
            } catch (IOException suppressed) {
                e.addSuppressed(suppressed);
            }
            throw e;
        }
    }

    /** Writes each key/value pair as one member of a single JSON object. */
    private static class JSONRecordWriter extends LineRecordWriter<Text, IntWritable> {
        /** True until the first member has been written; decides whether a comma is needed. */
        private boolean first = true;

        /**
         * Wraps the stream and immediately writes the opening brace.
         *
         * @param out destination stream; closed by {@link #close(TaskAttemptContext)}
         * @throws IOException if the opening brace cannot be written
         */
        public JSONRecordWriter(DataOutputStream out) throws IOException {
            super(out);
            out.write("{".getBytes(StandardCharsets.UTF_8));
        }

        /**
         * Appends {@code "key": value} to the object. Members are separated by commas
         * written before every member except the first, so no trailing comma is produced.
         *
         * @param key   member name; must not be null
         * @param value member value; {@code null} is written as the JSON literal {@code null}
         * @throws IOException if writing to the stream fails
         */
        @Override
        public synchronized void write(Text key, IntWritable value) throws IOException {
            Objects.requireNonNull(key, "key");

            StringBuilder sb = new StringBuilder();
            if (!first) {
                sb.append(',');
            }
            appendJsonString(sb, key.toString());
            sb.append(": ");
            sb.append(value == null ? "null" : Integer.toString(value.get()));

            // Text is UTF-8; encode explicitly rather than with the platform default charset.
            out.write(sb.toString().getBytes(StandardCharsets.UTF_8));
            first = false;
        }

        /**
         * Writes the closing brace and closes the underlying stream. The stream is
         * closed even if writing the brace fails.
         *
         * @param ctx the task attempt context, forwarded to the superclass
         * @throws IOException if writing or closing fails
         */
        @Override
        public synchronized void close(TaskAttemptContext ctx) throws IOException {
            try {
                out.write("}".getBytes(StandardCharsets.UTF_8));
            } finally {
                super.close(ctx);
            }
        }

        /** Appends {@code s} to {@code sb} as a quoted, escaped JSON string literal. */
        private static void appendJsonString(StringBuilder sb, String s) {
            sb.append('"');
            for (int i = 0; i < s.length(); i++) {
                char c = s.charAt(i);
                switch (c) {
                    case '"':
                        sb.append("\\\"");
                        break;
                    case '\\':
                        sb.append("\\\\");
                        break;
                    case '\n':
                        sb.append("\\n");
                        break;
                    case '\r':
                        sb.append("\\r");
                        break;
                    case '\t':
                        sb.append("\\t");
                        break;
                    case '\b':
                        sb.append("\\b");
                        break;
                    case '\f':
                        sb.append("\\f");
                        break;
                    default:
                        if (c < 0x20) {
                            // Remaining control characters must be \\u-escaped in JSON.
                            sb.append(String.format("\\u%04x", (int) c));
                        } else {
                            sb.append(c);
                        }
                        break;
                }
            }
            sb.append('"');
        }
    }
}