Hadoop+Cassandra (2) - ReducerでCassandra -

2012-04-07T00:00:00+09:00 Cassandra Hadoop Java

前回はCassandraなデータを使うMapperを作って、HDFS上にMapReduce結果を出したので今回はその逆をやってみる。HDFSにインプットデータを用意して、それをMapReduceで処理後CassandraのColumn Familyにプッシュしてみる

Hadoop側の設定

適当に

hoge
fuga
foobar
hoge
hoge

なデータ(data/data.txt)をinputとしてプッシュしておく

hadoop fs -put data input

Cassandra側の設定

Outputする側のColumn Familyを作っておく

create column family Outputs with default_validation_class = UTF8Type and key_validation_class = UTF8Type and comparator = UTF8Type

HadoopClient2.java

package sample;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;

import org.apache.cassandra.hadoop.ColumnFamilyInputFormat;
import org.apache.cassandra.hadoop.ColumnFamilyOutputFormat;
import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import sample.mapreduce.SampleCassandraReducer;
import sample.mapreduce.SampleMapper;

public class HadoopClient2 extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new Configuration(), new HadoopClient2(), args);
    }

    @Override
    public int run(String[] arg0) throws Exception {
        JobConf conf = new JobConf();
        conf.setJar("sample.jar");

        ConfigHelper.setInitialAddress(conf, "127.0.0.1");
        ConfigHelper.setRpcPort(conf, "9160");
        ConfigHelper.setOutputColumnFamily(conf, "Keyspace1", "Outputs");
        ConfigHelper.setPartitioner(conf, "org.apache.cassandra.dht.RandomPartitioner");

        SlicePredicate predicate = new SlicePredicate().setColumn_names(Arrays.asList(ByteBufferUtil.bytes("name")));
        ConfigHelper.setInputSlicePredicate(conf, predicate);

        Job job = new Job(conf);
        job.setMapperClass(SampleMapper.class);
        job.setReducerClass(SampleCassandraReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(ByteBuffer.class);
        job.setOutputValueClass(List.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(ColumnFamilyOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path("input"));

        return job.waitForCompletion(true) ? 0 : 1;
    }
}

今回はCassandraのデータを読み込まないで、アウトプット先がCassandraなのでConfigHelper.setOutputColumnFamilyで設定。でsetPartitionerは設定しておく必要がある(TokenRing関係っぽい) 。あとsetOutputFormatClass(ColumnFamilyOutputFormat.class)も

であとはSampleMapperとSampleCassandraReducerを作る

SampleMapper.java

package sample.mapreduce;

import java.io.IOException;

import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;

public class SampleMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    public void map(LongWritable key, Text value, Context ctx) throws IOException, InterruptedException {
        ctx.write(value, new IntWritable(1));
    }
}

SampleCassandraReducer.java

package sample.mapreduce;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import org.apache.cassandra.thrift.Column;
import org.apache.cassandra.thrift.ColumnOrSuperColumn;
import org.apache.cassandra.thrift.Mutation;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SampleCassandraReducer extends Reducer<Text, IntWritable, ByteBuffer, List<Mutation> >{
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context ctx) throws IOException, InterruptedException {
        int sum = 0;

        for (IntWritable value : values) {
            sum += value.get();
        }

        Column c = new Column();
        c.setName(Arrays.copyOf(key.getBytes(), key.getLength()));
        c.setValue(ByteBufferUtil.bytes(String.valueOf(sum)));
        c.setTimestamp(System.currentTimeMillis());

        Mutation m = new Mutation();
        m.setColumn_or_supercolumn(new ColumnOrSuperColumn());
        m.column_or_supercolumn.setColumn(c);

        ctx.write(ByteBufferUtil.bytes("count"),Collections.singletonList(m));
    }
}

って感じ。実行したらcassandra-cliで

use Keyspace1;

list Outputs; # MapReduceで処理された結果一覧が出る

Hadoop+Cassandra (3) - Mapper & ReducerでCassandra - Hadoop+Cassandra (1) - MapperでCassandra -