Hadoop (10) - RawComparator -
Reducerで集計した結果をソートしてみる
SampleComparator.javaを作成
package sample;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
public class SampleComparator extends WritableComparator {
private DataInputBuffer buffer;
public SampleComparator() {
super(IntWritbale.class, true);
buffer = new DataInputBuffer();
}
@Override
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
int _compare = -1;
try {
Text key1 = new Text();
Text key2 = new Text();
IntWritable value1 = new IntWritable();
IntWritable value2 = new IntWritable();
// 以下でバイトレコードからデータを読み出す
buffer.reset(b1, s1, l1);
key1.readFields(buffer);
buffer.reset(b1, s1, l1);
value1.readFields(buffer);
buffer.reset(b2, s2, l2);
key2.readFileds(buffer);
buffer.reset(b2, s2, l2);
value2.readFields(buffer);
// 値部をintで取得するならこれでも良い
// int value1 = readInt(b1, s1);
// テキストでソートする場合
// _compare = new Text.Compartor().compare(key1, key2);
_compare = compare(value1, value2);
} catch(IOException e) {
e.printStackTrace();
}
return _compare;
}
@Override
public int compare(WritableComparable o1, WritableComparable o2) {
return compare(((IntWritable)o1).get(), ((IntWritable)o2).get());
}
@Override
public int compare(Object o1, Object o2) {
return ((Integer)o1) - ((Integer)o2));
}
}
||lt;
RawComparatorを使う場合はRawComparatorを実装しないといけないが、WritableComparatorを継承する方が良い。キー部に指定されたText型はDataInputBufferを利用して取得する事ができるので、指定されたキー部のテキストでソートする事も可能
#### Client.javaを作成
```java
package sample;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.fs.Path;
public class Client {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(new Cluster(conf), conf);
job.setJarByClass(Client.class);
job.setMapperClass(SampleMapper.class);
job.setReducerClass(SampleReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
// 作成したComparatorクラスを設定
job.setSortComparatorClass(SampleComparator.class);
FileInputFormat.setInputPaths(job, new Path("input"));
FileOutputFormat.setOutputPath(job, new Path("output"));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}