Hadoop Pipesを使ってみる

MapReduceプログラムを作成

#include "hadoop/Pipes.hh"
#include "hadoop/TemplateFactory.hh"
#include "hadoop/StringUtils.hh"

using namespace std;

class SampleMapper : public HadoopPipes::Mapper {
    public:
        SampleMapper(HadoopPipes::TaskContext ctx) {
        }

        void map(HadoopPipes::MapContext ctx) {
            vector<string> words = HadoopUtils::splitString(ctx.getInputValue(), " ");

            for (unsigned int i = 0; i < words.size(); ++i) {
                ctx.emit(words[i], "1");
            }
        }
};

class SampleReducer : public HadoopPipes::Reducer {
    public:
        SampleReducer(HadoopPipes::TaskContext ctx) {
        }

        void reduce(HadoopPipes::ReduceContext ctx) {
            int sum = 0;

            while (ctx.nextValue()) {
                sum += HadoopUtils::toInt(ctx.getInputValue());
            }

            ctx.emit(ctx.getInputKey(), HadoopUtils::toString(sum));
        }
};

int main(int argc, char* argv[]) {
    return HadoopPipes::runTask(HadoopPipes::TemplateFactory<SampleMapper, SampleReducer>());
}

作成後

g++ Sample.cc -O2 -Wall -lpthread -lssl -lhadooppipes -lhadooputils -o sample

でコンパイル

設定ファイルを作成

<?xml version="1.0" ?>
<configuration>
    <property>
        <name>hadoop.pipes.executable</name>
        <value>sample</value>
    </property>
    <property>
        <name>hadoop.pipes.java.recordreader</name>
        <value>true</value>
    </property>
    <property>
        <name>hadoop.pipes.java.recordwriter</name>
        <value>true</value>
    </property>
</configuration>

必要なファイルをアップロード

# データファイルをアップロード
hadoop fs -put data input

# Pipes実行ファイルをアップロード
hadoop fs -put sample sample

実行

hadoop pipes -conf sample.xml -input input -output output

実行し完了すると、HDFSサイト上にoutputディレクトリが作成され処理データがそこに格納される