HadoopでMapReduceをやってみる

セットアップ

http://hadoop.apache.org から辿ってHadoopをインストール。JAVA_HOMEが無いと言われた場合にはconf/hadoop-env.shを修正する。んで以下のファイルを修正していく

core-site.xml

<?xml version="1.0" ?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
    <property>
        <name>fs.default.name</name>
        <value>hdfs://localhost:9000</value>
    </property>
</configuration>

hdfs-site.xml

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
    <property>
        <name>dfs.replication</name>
        <value>1</value>
    </property>
</configuration>

今回はシングルモードなのでレプリケーションは1で良い

mapred-site.xml

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
    <property>
        <name>mapred.job.tracker</name>
        <value>localhost:9001</value>
    </property>
</configuration>

完了したらHadoop MapReduceで使用するサーバー類を起動(start-all.sh)。その後一度初期化する為

hadoop namenode -format

を実行しておく

データをアップロード

hadoop fs -put data input

を実行するとHadoop HDFS上に/user/ユーザー名/input/dataとしてアップロードされる。これをする事でマルチレプリケーション時には全てのサーバーに同期される

SampleMapperを作成

package sample;

import java.util.StringTokenizer;
import java.io.IOException;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;

public class SampleMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private final IntWritable one = new IntWritable(1);
    private final Text txt = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context ctx) {
        StringTokenizer tokens = new StringTokenizer(value.toString(), " ");

        try {
            while (tokens.hasMoreTokens()) {
                txt.set(((String)tokens.nextToken()).toLowerCase());
                ctx.write(txt, one);
            }
        } catch(IOException e) {
        } catch(InterruptedException e) {
        }
    }
}

SampleReducer.javaを作成

package sample;

import java.io.IOException;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;

public class SampleReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context ctx) {
        int sum = 0;

        try {
            for (IntWritable value : values) {
                sum += value.get();
            }

            ctx.write(key, new IntWritable(sum));
        } catch(IOException e) {
        } catch(InterruptedException e) {
        }
    }
}

SamplePartitioner.javaを作成

package sample;

import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;

public class SamplePartitioner extends Partitioner<Text, IntWritable> {
    public int getPartition(Text key, IntWritable value, int numPartitions) {
        if ((key.toString().length() + value.get()) % 2 == 0) {
            return 0;
        } else {
            return 1;
        }
    }
}

※必ず必要というわけではない

SampleCombiner.javaを作成

package sample;

import java.util.regex.Pattern;
import java.util.regex.Matcher;
import java.io.IOException;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;

public class SampleCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context ctx) {
        try {
            for(IntWritable value : values) {
                String str = key.toString();
                Pattern ptn = Pattern.compile("^[\\w]+$");
                Matcher match = ptn.matcher(str);

                if (str.length() <= 1 || !match.find()) {
                    continue;
                }

                ctx.write(key, value);
            }
        } catch(IOException e) {
        } catch(InterruptedException e) {
        }
    }
}

※必ず必要というわけでは無い

MapperからReducerに渡される前にフィルタする事ができる。条件に合うのを選別してReducerに渡す事ができる

Sample.java

package sample;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.fs.Path;

public class Sample extends Configured implements Tool {
    public static void main(String... args) throws Exception {
        int code = ToolRunner.run(new Sample(), args);
        System.exit(code);
    }
    public int run(String... args) throws Exception {
        Configuration conf = getConf();
        Job job = Job.getInstance(new Cluster(conf), conf);
        job.setJarByClass(Sample.class);
        job.setMapperClass(SampleMapper.class);
        job.setCombinerClass(SampleCombiner.class);
        job.setReducerClass(SampleReducer.class);
        job.setPartitionerClass(SamplePartitioner.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path("input"));
        FileOutputFormat.setOutputPath(job, new Path(;"output"));

        return job.waitForCompletion(true) ? 0 : 1;
    }
}
||<


これを作成後、コンパイルしclassファイルをjarにまとめる


#### 実行


```shell
hadoop jar sample.jar sample.Sample