HadoopとCassandra1.1

2012-09-21T00:00:00+00:00 Cassandra Hadoop Java

何やらorg.apache.cassandra.hadoopサポートなAPIの使い方が変わったようなので、ちょっと色々検証を兼ねてやってみた

本題

で結局何が変わったのかっていうと。主な変化はConfigHelperまわりが多そうな感じ。っていうか多分1.0.x系でのHadoop SupportなAPIのままだとまずコンパイルエラーになるので、最終的にはCassandraなMapReduceを呼び出す側を修正しないといけない

Mapperだけな場合 (Cassandra -> HDFS)

package sample;

import java.util.Arrays;

import org.apache.cassandra.hadoop.ColumnFamilyInputFormat;
import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import sample.mapreduce.SampleReducer;
import sample.mapreduce.SampleCassandraMapper;

/**
 * Job driver that reads from Cassandra through {@link ColumnFamilyInputFormat}
 * and writes the job result to the file-system path {@code output}.
 *
 * <p>Input is the {@code name} column of column family {@code Sample} in
 * keyspace {@code sample}, fetched from the node at {@code 127.0.0.1:9160}.
 */
public class HadoopClient2 extends Configured implements Tool {

    /**
     * Runs the job through {@link ToolRunner} and uses the job status as the
     * process exit code, so that a failed job is visible to the calling shell.
     *
     * @param args command-line arguments; generic Hadoop options are parsed by {@link ToolRunner}
     * @throws Exception if the job cannot be configured or submitted
     */
    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new HadoopClient2(), args));
    }

    /**
     * Configures and runs the Cassandra-to-file-system job.
     *
     * @param arg0 remaining command-line arguments (unused)
     * @return {@code 0} if the job completed successfully, {@code 1} otherwise
     * @throws Exception if the job cannot be configured or submitted
     */
    @Override
    public int run(String[] arg0) throws Exception {
        // Build on the configuration injected by ToolRunner so that generic
        // options (-D, -conf, -fs, ...) are honoured. Fall back to a default
        // JobConf when run() is invoked without a prior setConf().
        Configuration base = getConf();
        JobConf conf = (base == null) ? new JobConf() : new JobConf(base);
        conf.setJar("sample.jar");

        // Cassandra 1.1 uses Input-prefixed ConfigHelper setters for the read side.
        ConfigHelper.setInputInitialAddress(conf, "127.0.0.1");
        ConfigHelper.setInputRpcPort(conf, "9160");
        ConfigHelper.setInputColumnFamily(conf, "sample", "Sample");
        ConfigHelper.setInputPartitioner(conf, "org.apache.cassandra.dht.RandomPartitioner");

        // Restrict the columns handed to the mapper to "name".
        SlicePredicate predicate = new SlicePredicate().setColumn_names(
            Arrays.asList(ByteBufferUtil.bytes("name"))
        );
        ConfigHelper.setInputSlicePredicate(conf, predicate);

        Job job = new Job(conf);
        job.setMapperClass(SampleCassandraMapper.class);
        job.setReducerClass(SampleReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setInputFormatClass(ColumnFamilyInputFormat.class);

        FileOutputFormat.setOutputPath(job, new Path("output"));

        return job.waitForCompletion(true) ? 0 : 1;
    }
}

ConfigHelperのInitialAddress等にInputやOutputなプレフィックスが付くようになってます

Reducerだけな場合 (HDFS -> Cassandra)

package sample;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;

import org.apache.cassandra.hadoop.ColumnFamilyOutputFormat;
import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import sample.mapreduce.SampleCassandraReducer;
import sample.mapreduce.SampleMapper;

/**
 * Job driver that reads text input from the file-system path {@code input}
 * and writes the reducer output to Cassandra through
 * {@link ColumnFamilyOutputFormat}.
 *
 * <p>Output goes to column family {@code Outputs} in keyspace {@code sample}
 * via the node at {@code 127.0.0.1:9160}.
 */
public class HadoopClient3 extends Configured implements Tool {

    /**
     * Runs the job through {@link ToolRunner} and uses the job status as the
     * process exit code, so that a failed job is visible to the calling shell.
     *
     * @param args command-line arguments; generic Hadoop options are parsed by {@link ToolRunner}
     * @throws Exception if the job cannot be configured or submitted
     */
    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new HadoopClient3(), args));
    }

    /**
     * Configures and runs the file-system-to-Cassandra job.
     *
     * @param arg0 remaining command-line arguments (unused)
     * @return {@code 0} if the job completed successfully, {@code 1} otherwise
     * @throws Exception if the job cannot be configured or submitted
     */
    @Override
    public int run(String[] arg0) throws Exception {
        // Build on the configuration injected by ToolRunner so that generic
        // options (-D, -conf, -fs, ...) are honoured. Fall back to a default
        // JobConf when run() is invoked without a prior setConf().
        Configuration base = getConf();
        JobConf conf = (base == null) ? new JobConf() : new JobConf(base);
        conf.setJar("sample.jar");

        // Only the write side talks to Cassandra, so only the Output-prefixed
        // ConfigHelper setters are needed. No input slice predicate is set
        // because the input format is TextInputFormat, not a Cassandra one.
        ConfigHelper.setOutputInitialAddress(conf, "127.0.0.1");
        ConfigHelper.setOutputRpcPort(conf, "9160");
        ConfigHelper.setOutputColumnFamily(conf, "sample", "Outputs");
        ConfigHelper.setOutputPartitioner(conf, "org.apache.cassandra.dht.RandomPartitioner");

        Job job = new Job(conf);
        job.setMapperClass(SampleMapper.class);
        job.setReducerClass(SampleCassandraReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // The job's final output types are declared as ByteBuffer / List.
        job.setOutputKeyClass(ByteBuffer.class);
        job.setOutputValueClass(List.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(ColumnFamilyOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path("input"));

        return job.waitForCompletion(true) ? 0 : 1;
    }
}

これもMapperだけな場合と同様にConfigHelperのメソッドを修正する。ただしこっちはReducerからCassandraに書きこみをするのでOutputなプレフィックスの付いたメソッドを使う

Mapper & Reducer (Cassandra -> Cassandra)

package sample;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;

import org.apache.cassandra.hadoop.ColumnFamilyInputFormat;
import org.apache.cassandra.hadoop.ColumnFamilyOutputFormat;
import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import sample.mapreduce.SampleCassandraMapper;
import sample.mapreduce.SampleCassandraReducer;

/**
 * Job driver that reads from Cassandra through {@link ColumnFamilyInputFormat}
 * and writes the reducer output back to Cassandra through
 * {@link ColumnFamilyOutputFormat}.
 *
 * <p>Input is the {@code name} column of column family {@code Sample}; output
 * goes to column family {@code Outputs}. Both live in keyspace {@code sample}
 * and are reached via the node at {@code 127.0.0.1:9160}.
 */
public class HadoopClient1 extends Configured implements Tool {

    /**
     * Runs the job through {@link ToolRunner} and uses the job status as the
     * process exit code, so that a failed job is visible to the calling shell.
     *
     * @param args command-line arguments; generic Hadoop options are parsed by {@link ToolRunner}
     * @throws Exception if the job cannot be configured or submitted
     */
    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new HadoopClient1(), args));
    }

    /**
     * Configures and runs the Cassandra-to-Cassandra job.
     *
     * @param arg0 remaining command-line arguments (unused)
     * @return {@code 0} if the job completed successfully, {@code 1} otherwise
     * @throws Exception if the job cannot be configured or submitted
     */
    @Override
    public int run(String[] arg0) throws Exception {
        // Build on the configuration injected by ToolRunner so that generic
        // options (-D, -conf, -fs, ...) are honoured. Fall back to a default
        // JobConf when run() is invoked without a prior setConf().
        Configuration base = getConf();
        JobConf conf = (base == null) ? new JobConf() : new JobConf(base);
        conf.setJar("sample.jar");

        // Read side: Input-prefixed ConfigHelper setters.
        ConfigHelper.setInputInitialAddress(conf, "127.0.0.1");
        ConfigHelper.setInputRpcPort(conf, "9160");
        ConfigHelper.setInputColumnFamily(conf, "sample", "Sample");
        ConfigHelper.setInputPartitioner(conf, "org.apache.cassandra.dht.RandomPartitioner");

        // Write side: Output-prefixed ConfigHelper setters.
        ConfigHelper.setOutputInitialAddress(conf, "127.0.0.1");
        ConfigHelper.setOutputRpcPort(conf, "9160");
        ConfigHelper.setOutputColumnFamily(conf, "sample", "Outputs");
        ConfigHelper.setOutputPartitioner(conf, "org.apache.cassandra.dht.RandomPartitioner");

        // Restrict the columns handed to the mapper to "name".
        SlicePredicate predicate = new SlicePredicate().setColumn_names(
            Arrays.asList(ByteBufferUtil.bytes("name"))
        );
        ConfigHelper.setInputSlicePredicate(conf, predicate);

        Job job = new Job(conf);
        job.setMapperClass(SampleCassandraMapper.class);
        job.setReducerClass(SampleCassandraReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setInputFormatClass(ColumnFamilyInputFormat.class);
        job.setOutputFormatClass(ColumnFamilyOutputFormat.class);

        // The job's final output types are declared as ByteBuffer / List.
        job.setOutputKeyClass(ByteBuffer.class);
        job.setOutputValueClass(List.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }
}

Hectorを使ってみる (2) - Object Mapper - Hectorを使ってみる (1)