Mahout (7) - Mahout on Hadoop -

2011-05-07T00:00:00+00:00 Java Hadoop Mahout

Let's try using Mahout from a Hadoop MapReduce job. Beforehand, copy the Mahout library jars to the Hadoop side so they are on the tasks' classpath.
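
The job assumes an input/data.txt on HDFS in the comma-separated format that Mahout's FileDataModel reads: userID,itemID,preference, one rating per line. For example (the values here are made up for illustration):

1,101,5.0
1,102,3.0
2,101,2.0
2,103,4.0
3,102,4.5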

Client.java

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.fs.Path;

public class Client {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // the plain Configuration factory is all that's needed here
        Job job = Job.getInstance(conf);
        job.setJarByClass(Client.class);
        job.setMapperClass(SampleMapper.class);
        job.setReducerClass(SampleReducer.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(MapWritable.class);
        job.setOutputValueClass(DoubleWritable.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(SampleOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path("input"));
        FileOutputFormat.setOutputPath(job, new Path("output"));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
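
To submit the job, package the four classes into a jar and launch it with the hadoop command; the jar name below is just a placeholder. Since Client does not go through ToolRunner, the Mahout jars are assumed to already be on Hadoop's classpath, as noted at the top.

$ hadoop jar sample.jar Client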

SampleMapper.java

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.NullWritable;

public class SampleMapper extends Mapper<LongWritable, Text, LongWritable, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context ctx)
            throws IOException, InterruptedException {
        // each input line is "userID,itemID,preference"; only the user ID is
        // needed here, so emit it as the key and let the reducer do the work
        StringTokenizer tokens = new StringTokenizer(value.toString(), ",");

        if(tokens.hasMoreTokens()) {
            String user = tokens.nextToken();
            ctx.write(new LongWritable(Long.parseLong(user)), NullWritable.get());
        }
    }
}

SampleReducer.java

import java.io.IOException;
import java.io.File;

import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;

import org.apache.mahout.cf.taste.model.DataModel;
import org.apache.mahout.cf.taste.impl.model.file.FileDataModel;
import org.apache.mahout.cf.taste.impl.similarity.LogLikelihoodSimilarity;
import org.apache.mahout.cf.taste.impl.common.LongPrimitiveIterator;
import org.apache.mahout.cf.taste.common.TasteException;

public class SampleReducer extends Reducer<LongWritable, NullWritable, MapWritable, DoubleWritable> {

    private DataModel model;
    private LogLikelihoodSimilarity similar;

    @Override
    protected void setup(Context ctx) throws IOException {
        Configuration conf = ctx.getConfiguration();
        Path path = new Path("input/data.txt");
        FileSystem fs = FileSystem.get(path.toUri(), conf);

        // FileDataModel needs a local java.io.File, so copy the input out of
        // HDFS into a temporary file on the task node
        File tmp = File.createTempFile("mahout", ".txt");
        tmp.deleteOnExit();

        fs.copyToLocalFile(path, new Path(tmp.getAbsolutePath()));

        model = new FileDataModel(tmp);
        similar = new LogLikelihoodSimilarity(model);
    }

    @Override
    protected void reduce(LongWritable key, Iterable<NullWritable> values, Context ctx)
            throws IOException, InterruptedException {
        long user = key.get();

        try {
            // compare this user against every other user in the model
            LongPrimitiveIterator targets = model.getUserIDs();

            while(targets.hasNext()) {
                long target = targets.nextLong();

                if(user == target) {
                    continue;
                }

                double distance = similar.userSimilarity(user, target);

                // NaN means there was no overlap to score, so skip those pairs
                if(!Double.isNaN(distance)) {
                    MapWritable w = new MapWritable();
                    w.put(new Text("user"), new LongWritable(user));
                    w.put(new Text("target"), new LongWritable(target));

                    ctx.write(w, new DoubleWritable(distance));
                }
            }
        } catch(TasteException e) {
            throw new IOException(e);
        }
    }
}
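
The Taste API calls used in the reducer work outside MapReduce as well, which makes it easy to sanity-check the similarity scores locally before submitting the job. A minimal sketch (the class name is arbitrary, and data.txt is assumed to be in the current directory):

import java.io.File;

import org.apache.mahout.cf.taste.model.DataModel;
import org.apache.mahout.cf.taste.impl.model.file.FileDataModel;
import org.apache.mahout.cf.taste.impl.similarity.LogLikelihoodSimilarity;

public class LocalSimilarityCheck {
    public static void main(String[] args) throws Exception {
        DataModel model = new FileDataModel(new File("data.txt"));
        LogLikelihoodSimilarity similarity = new LogLikelihoodSimilarity(model);

        // score one pair of users; NaN means there is no overlap to compare
        System.out.println(similarity.userSimilarity(1L, 2L));
    }
}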

SampleOutputFormat.java

import java.io.IOException;
import java.io.DataOutputStream;

import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataOutputStream;

public class SampleOutputFormat extends TextOutputFormat<MapWritable, DoubleWritable> {
    @Override
    public RecordWriter<MapWritable, DoubleWritable> getRecordWriter(TaskAttemptContext ctx) throws IOException, InterruptedException {
        Configuration conf = ctx.getConfiguration();
        Path path = getDefaultWorkFile(ctx, ".txt");
        FileSystem fs = path.getFileSystem(conf);
        FSDataOutputStream out = fs.create(path, true);

        return new SampleRecordWriter(out);
    }

    private static class SampleRecordWriter extends LineRecordWriter<MapWritable, DoubleWritable> {

        public SampleRecordWriter(DataOutputStream out) throws IOException {
            super(out);
            // write a header line once per output file
            out.write("user\ttarget\tscore\n".getBytes());
        }

        @Override
        public synchronized void write(MapWritable key, DoubleWritable value) throws IOException {
            long user = ((LongWritable)key.get(new Text("user"))).get();
            long target = ((LongWritable)key.get(new Text("target"))).get();
            double distance = value.get();

            out.write(String.format("%d\t%d\t%s\n", user, target, distance).getBytes());
        }

        @Override
        public synchronized void close(TaskAttemptContext ctx) throws IOException {
            super.close(ctx);
        }
    }
    }
}
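
Each reduce task ends up writing a tab-separated part file with the header emitted by SampleRecordWriter; the score column is the log-likelihood similarity, a value in [0, 1). With the sample input above, the output looks roughly like this (the scores are illustrative):

user	target	score
1	2	0.63
1	3	0.0
2	3	0.47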