HadoopとCassandra 1.1
Cassandra 1.1で、org.apache.cassandra.hadoopパッケージ（Hadoopサポート）のAPIの使い方が変わったようなので、検証を兼ねて色々試してみた。
本題
で、結局何が変わったのかというと、主な変更点はConfigHelperにありそう。1.0.x系のHadoopサポートAPIを使ったコードはおそらくそのままではコンパイルエラーになるので、CassandraのMapReduceジョブを呼び出す側のコードを修正する必要がある。
MapperだけがCassandraを使う場合 (Cassandra -> HDFS)
package sample;
import java.util.Arrays;
import org.apache.cassandra.hadoop.ColumnFamilyInputFormat;
import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import sample.mapreduce.SampleReducer;
import sample.mapreduce.SampleCassandraMapper;
/**
 * MapReduce driver that reads rows from the Cassandra column family {@code sample.Sample}
 * (Cassandra 1.1 Hadoop API) and writes the reducer output to the {@code output} path on the
 * default file system.
 *
 * <p>Only the {@code name} column of each row is requested from Cassandra.
 */
public class HadoopClient2 extends Configured implements Tool {
    private static final String CASSANDRA_HOST = "127.0.0.1";
    private static final String CASSANDRA_RPC_PORT = "9160";
    private static final String KEYSPACE = "sample";
    private static final String INPUT_COLUMN_FAMILY = "Sample";
    private static final String PARTITIONER = "org.apache.cassandra.dht.RandomPartitioner";

    /**
     * Runs the job through {@link ToolRunner} and exits the JVM with the job's status code
     * (0 on success), so shell scripts and schedulers can detect failures.
     */
    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new Configuration(), new HadoopClient2(), args);
        System.exit(exitCode);
    }

    /**
     * Configures the Cassandra input and HDFS output, submits the job and waits for it.
     *
     * @param args arguments left over after ToolRunner parsed the generic options (unused)
     * @return 0 if the job completed successfully, 1 otherwise
     */
    @Override
    public int run(String[] args) throws Exception {
        // Build on the Configuration injected by ToolRunner so that generic options
        // (-D, -conf, -fs, ...) take effect; fall back to defaults when run() is called directly.
        Configuration base = getConf();
        JobConf conf = (base == null) ? new JobConf() : new JobConf(base);
        conf.setJar("sample.jar");

        // Since Cassandra 1.1 the connection settings carry an Input*/Output* prefix.
        ConfigHelper.setInputInitialAddress(conf, CASSANDRA_HOST);
        ConfigHelper.setInputRpcPort(conf, CASSANDRA_RPC_PORT);
        ConfigHelper.setInputColumnFamily(conf, KEYSPACE, INPUT_COLUMN_FAMILY);
        ConfigHelper.setInputPartitioner(conf, PARTITIONER);

        // Restrict the columns fetched per row to "name".
        SlicePredicate predicate = new SlicePredicate().setColumn_names(
                Arrays.asList(ByteBufferUtil.bytes("name")));
        ConfigHelper.setInputSlicePredicate(conf, predicate);

        // Apply every setting before creating the Job: it may take its own copy of the configuration.
        Job job = new Job(conf);
        job.setMapperClass(SampleCassandraMapper.class);
        job.setReducerClass(SampleReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setInputFormatClass(ColumnFamilyInputFormat.class);
        FileOutputFormat.setOutputPath(job, new Path("output"));

        return job.waitForCompletion(true) ? 0 : 1;
    }
}
ConfigHelperのInitialAddressなどを設定するメソッドに、InputまたはOutputのプレフィックスが付くようになっています。
ReducerだけがCassandraを使う場合 (HDFS -> Cassandra)
package sample;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import org.apache.cassandra.hadoop.ColumnFamilyOutputFormat;
import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import sample.mapreduce.SampleCassandraReducer;
import sample.mapreduce.SampleMapper;
/**
 * MapReduce driver that reads text files from the {@code input} path on the default file
 * system and writes the reducer output into the Cassandra column family {@code sample.Outputs}
 * (Cassandra 1.1 Hadoop API).
 *
 * <p>Input comes from {@link TextInputFormat}, so only the Output* settings of
 * {@link ConfigHelper} are configured; an input slice predicate would never be read.
 */
public class HadoopClient3 extends Configured implements Tool {
    private static final String CASSANDRA_HOST = "127.0.0.1";
    private static final String CASSANDRA_RPC_PORT = "9160";
    private static final String KEYSPACE = "sample";
    private static final String OUTPUT_COLUMN_FAMILY = "Outputs";
    private static final String PARTITIONER = "org.apache.cassandra.dht.RandomPartitioner";

    /**
     * Runs the job through {@link ToolRunner} and exits the JVM with the job's status code
     * (0 on success), so shell scripts and schedulers can detect failures.
     */
    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new Configuration(), new HadoopClient3(), args);
        System.exit(exitCode);
    }

    /**
     * Configures the HDFS input and Cassandra output, submits the job and waits for it.
     *
     * @param args arguments left over after ToolRunner parsed the generic options (unused)
     * @return 0 if the job completed successfully, 1 otherwise
     */
    @Override
    public int run(String[] args) throws Exception {
        // Build on the Configuration injected by ToolRunner so that generic options
        // (-D, -conf, -fs, ...) take effect; fall back to defaults when run() is called directly.
        Configuration base = getConf();
        JobConf conf = (base == null) ? new JobConf() : new JobConf(base);
        conf.setJar("sample.jar");

        // The reducer writes to Cassandra, so the Output*-prefixed settings are used.
        ConfigHelper.setOutputInitialAddress(conf, CASSANDRA_HOST);
        ConfigHelper.setOutputRpcPort(conf, CASSANDRA_RPC_PORT);
        ConfigHelper.setOutputColumnFamily(conf, KEYSPACE, OUTPUT_COLUMN_FAMILY);
        ConfigHelper.setOutputPartitioner(conf, PARTITIONER);

        // Apply every setting before creating the Job: it may take its own copy of the configuration.
        Job job = new Job(conf);
        job.setMapperClass(SampleMapper.class);
        job.setReducerClass(SampleCassandraReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // ColumnFamilyOutputFormat consumes (row key, list of mutations) pairs.
        job.setOutputKeyClass(ByteBuffer.class);
        job.setOutputValueClass(List.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(ColumnFamilyOutputFormat.class);
        FileInputFormat.setInputPaths(job, new Path("input"));

        return job.waitForCompletion(true) ? 0 : 1;
    }
}
これもMapperだけの場合と同様に、ConfigHelperのメソッドを修正する。ただしこちらはReducerからCassandraに書き込むので、Outputプレフィックスの付いたメソッドを使う。
MapperとReducerの両方でCassandraを使う場合 (Cassandra -> Cassandra)
package sample;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import org.apache.cassandra.hadoop.ColumnFamilyInputFormat;
import org.apache.cassandra.hadoop.ColumnFamilyOutputFormat;
import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import sample.mapreduce.SampleCassandraMapper;
import sample.mapreduce.SampleCassandraReducer;
/**
 * MapReduce driver that reads rows from the Cassandra column family {@code sample.Sample} and
 * writes the reducer output into {@code sample.Outputs} (Cassandra 1.1 Hadoop API).
 *
 * <p>Only the {@code name} column of each input row is requested from Cassandra.
 */
public class HadoopClient1 extends Configured implements Tool {
    private static final String CASSANDRA_HOST = "127.0.0.1";
    private static final String CASSANDRA_RPC_PORT = "9160";
    private static final String KEYSPACE = "sample";
    private static final String INPUT_COLUMN_FAMILY = "Sample";
    private static final String OUTPUT_COLUMN_FAMILY = "Outputs";
    private static final String PARTITIONER = "org.apache.cassandra.dht.RandomPartitioner";

    /**
     * Runs the job through {@link ToolRunner} and exits the JVM with the job's status code
     * (0 on success), so shell scripts and schedulers can detect failures.
     */
    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new Configuration(), new HadoopClient1(), args);
        System.exit(exitCode);
    }

    /**
     * Configures the Cassandra input and output, submits the job and waits for it.
     *
     * @param args arguments left over after ToolRunner parsed the generic options (unused)
     * @return 0 if the job completed successfully, 1 otherwise
     */
    @Override
    public int run(String[] args) throws Exception {
        // Build on the Configuration injected by ToolRunner so that generic options
        // (-D, -conf, -fs, ...) take effect; fall back to defaults when run() is called directly.
        Configuration base = getConf();
        JobConf conf = (base == null) ? new JobConf() : new JobConf(base);
        conf.setJar("sample.jar");

        configureCassandraInput(conf);
        configureCassandraOutput(conf);

        // Apply every setting before creating the Job: it may take its own copy of the configuration.
        Job job = new Job(conf);
        job.setMapperClass(SampleCassandraMapper.class);
        job.setReducerClass(SampleCassandraReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setInputFormatClass(ColumnFamilyInputFormat.class);
        job.setOutputFormatClass(ColumnFamilyOutputFormat.class);
        // ColumnFamilyOutputFormat consumes (row key, list of mutations) pairs.
        job.setOutputKeyClass(ByteBuffer.class);
        job.setOutputValueClass(List.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    /** Sets the Input*-prefixed Cassandra settings, restricting each row to the "name" column. */
    private static void configureCassandraInput(Configuration conf) {
        ConfigHelper.setInputInitialAddress(conf, CASSANDRA_HOST);
        ConfigHelper.setInputRpcPort(conf, CASSANDRA_RPC_PORT);
        ConfigHelper.setInputColumnFamily(conf, KEYSPACE, INPUT_COLUMN_FAMILY);
        ConfigHelper.setInputPartitioner(conf, PARTITIONER);

        SlicePredicate predicate = new SlicePredicate().setColumn_names(
                Arrays.asList(ByteBufferUtil.bytes("name")));
        ConfigHelper.setInputSlicePredicate(conf, predicate);
    }

    /** Sets the Output*-prefixed Cassandra settings used by the reducer's output format. */
    private static void configureCassandraOutput(Configuration conf) {
        ConfigHelper.setOutputInitialAddress(conf, CASSANDRA_HOST);
        ConfigHelper.setOutputRpcPort(conf, CASSANDRA_RPC_PORT);
        ConfigHelper.setOutputColumnFamily(conf, KEYSPACE, OUTPUT_COLUMN_FAMILY);
        ConfigHelper.setOutputPartitioner(conf, PARTITIONER);
    }
}