kinjouj.github.io

Hadoop (10) - RawComparator -

2011-04-10T00:00:00+00:00 Java Hadoop

Reducerで集計した結果をソートしてみる

SampleComparator.javaを作成

package sample;

import java.io.IOException;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class SampleComparator extends WritableComparator {

    private DataInputBuffer buffer;

    public SampleComparator() {
        super(IntWritbale.class, true);
        buffer = new DataInputBuffer();
    }

    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        int _compare = -1;

        try {
            Text key1 = new Text();
            Text key2 = new Text();
            IntWritable value1 = new IntWritable();
            IntWritable value2 = new IntWritable();

            // 以下でバイトレコードからデータを読み出す

            buffer.reset(b1, s1, l1);
            key1.readFields(buffer);

            buffer.reset(b1, s1, l1);
            value1.readFields(buffer);

            buffer.reset(b2, s2, l2);
            key2.readFileds(buffer);

            buffer.reset(b2, s2, l2);
            value2.readFields(buffer);

            // 値部をintで取得するならこれでも良い
            // int value1 = readInt(b1, s1);

            // テキストでソートする場合
            // _compare = new Text.Compartor().compare(key1, key2);

            _compare = compare(value1, value2);
        } catch(IOException e) {
            e.printStackTrace();
        }

        return _compare;
    }

    @Override
    public int compare(WritableComparable o1, WritableComparable o2) {
        return compare(((IntWritable)o1).get(), ((IntWritable)o2).get());
    }

    @Override
    public int compare(Object o1, Object o2) {
        return ((Integer)o1) - ((Integer)o2));
    }
}
||lt;


RawComparatorを使う場合はRawComparatorを実装しないといけないが、WritableComparatorを継承する方が良い。キー部に指定されたText型はDataInputBufferを利用して取得する事ができるので、指定されたキー部のテキストでソートする事も可能


#### Client.javaを作成


```java
package sample;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.fs.Path;

public class Client {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(new Cluster(conf), conf);
        job.setJarByClass(Client.class);
        job.setMapperClass(SampleMapper.class);
        job.setReducerClass(SampleReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        // 作成したComparatorクラスを設定
        job.setSortComparatorClass(SampleComparator.class);

        FileInputFormat.setInputPaths(job, new Path("input"));
        FileOutputFormat.setOutputPath(job, new Path("output"));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}