Mahout (7) - Mahout on Hadoop -
Let's try using Mahout from a Hadoop MapReduce job. Copy the Mahout libraries over to the Hadoop side beforehand.
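The reducer below loads input/data.txt into a Mahout FileDataModel, so the input is assumed to be comma-separated lines of userID,itemID,preference (the format FileDataModel parses), and the mapper reads the same file from the input directory. The following data.txt is only an illustrative example; the IDs and ratings are made up.
data.txt (example)
1,101,5.0
1,102,3.0
2,101,2.0
2,103,4.0
3,102,4.5
3,103,2.0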
Client.java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.fs.Path;
public class Client {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(new Cluster(conf), conf);
        job.setJarByClass(Client.class);
        // the Mapper emits user IDs, the Reducer computes the similarities
        job.setMapperClass(SampleMapper.class);
        job.setReducerClass(SampleReducer.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(MapWritable.class);
        job.setOutputValueClass(DoubleWritable.class);
        job.setInputFormatClass(TextInputFormat.class);
        // custom output format that writes tab-separated results (see SampleOutputFormat below)
        job.setOutputFormatClass(SampleOutputFormat.class);
        FileInputFormat.setInputPaths(job, new Path("input"));
        FileOutputFormat.setOutputPath(job, new Path("output"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
SampleMapper.java
import java.util.StringTokenizer;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.NullWritable;
public class SampleMapper extends Mapper<LongWritable, Text, LongWritable, NullWritable> {
    // Each input line is expected to be "userID,itemID,preference";
    // emit only the user ID so the Reducer sees each user as a key.
    @Override
    protected void map(LongWritable key, Text value, Context ctx) {
        try {
            StringTokenizer tokens = new StringTokenizer(value.toString(), ",");
            while (tokens.hasMoreTokens()) {
                String user = tokens.nextToken();
                tokens.nextToken();  // skip the item ID
                tokens.nextToken();  // skip the preference value
                ctx.write(new LongWritable(Long.parseLong(user)), NullWritable.get());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
SampleReducer.java
import java.io.IOException;
import java.io.File;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.mahout.cf.taste.model.DataModel;
import org.apache.mahout.cf.taste.impl.model.file.FileDataModel;
import org.apache.mahout.cf.taste.impl.similarity.LogLikelihoodSimilarity;
import org.apache.mahout.cf.taste.impl.common.LongPrimitiveIterator;
import org.apache.mahout.cf.taste.common.TasteException;
public class SampleReducer extends Reducer<LongWritable, NullWritable, MapWritable, DoubleWritable> {
    private DataModel model;
    private LogLikelihoodSimilarity similar;

    @Override
    protected void setup(Context ctx) throws IOException {
        Configuration conf = ctx.getConfiguration();
        // FileDataModel needs a java.io.File, so copy the preference data
        // from HDFS to a local temporary file first.
        Path path = new Path("input/data.txt");
        FileSystem fs = FileSystem.get(path.toUri(), conf);
        File tmp = File.createTempFile("mahout", ".txt");
        tmp.deleteOnExit();
        fs.copyToLocalFile(path, new Path(tmp.getAbsolutePath()));
        model = new FileDataModel(tmp);
        similar = new LogLikelihoodSimilarity(model);
    }

    @Override
    protected void reduce(LongWritable key, Iterable<NullWritable> values, Context ctx)
            throws IOException, InterruptedException {
        long user = key.get();
        try {
            // compare this user against every other user in the model
            LongPrimitiveIterator targets = model.getUserIDs();
            while (targets.hasNext()) {
                long target = targets.nextLong();
                if (user == target) {
                    continue;
                }
                double distance = similar.userSimilarity(user, target);
                // NaN means no similarity could be computed for this pair
                if (!Double.isNaN(distance)) {
                    MapWritable w = new MapWritable();
                    w.put(new Text("user"), new LongWritable(user));
                    w.put(new Text("target"), new LongWritable(target));
                    ctx.write(w, new DoubleWritable(distance));
                }
            }
        } catch (TasteException e) {
            e.printStackTrace();
        }
    }
}
SampleOutputFormat.java
import java.io.IOException;
import java.io.DataOutputStream;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataOutputStream;
public class SampleOutputFormat extends TextOutputFormat<MapWritable, DoubleWritable> {
    @Override
    public RecordWriter<MapWritable, DoubleWritable> getRecordWriter(TaskAttemptContext ctx) throws IOException, InterruptedException {
        Configuration conf = ctx.getConfiguration();
        // each reducer writes its own part-r-NNNNN.txt file in the output directory
        Path path = getDefaultWorkFile(ctx, ".txt");
        FileSystem fs = path.getFileSystem(conf);
        FSDataOutputStream out = fs.create(path, true);
        return new SampleRecordWriter(out);
    }

    private static class SampleRecordWriter extends LineRecordWriter<MapWritable, DoubleWritable> {
        public SampleRecordWriter(DataOutputStream out) throws IOException {
            super(out);
            // header line, written once per output file
            out.write("user\ttarget\tscore\n".getBytes());
        }

        @Override
        public synchronized void write(MapWritable key, DoubleWritable value) throws IOException {
            long user = ((LongWritable) key.get(new Text("user"))).get();
            long target = ((LongWritable) key.get(new Text("target"))).get();
            double distance = value.get();
            out.write(
                String.format("%d\t%d\t%s\n", user, target, String.valueOf(distance)).getBytes()
            );
        }

        @Override
        public synchronized void close(TaskAttemptContext ctx) throws IOException {
            super.close(ctx);
        }
    }
}
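With this output format, each reducer ends up producing a tab-separated file (part-r-00000.txt and so on) under output/, starting with the header written by SampleRecordWriter. The values below are purely illustrative, not results from an actual run:
output/part-r-00000.txt (example)
user	target	score
1	2	0.6342
1	3	0.5811
2	3	0.4723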