Hadoop+Cassandra (2) - Writing to Cassandra from the Reducer -
Last time we wrote a Mapper that reads its data from Cassandra and dumped the MapReduce results onto HDFS, so this time we'll do the reverse: put the input data on HDFS, run it through MapReduce, and push the results into a Cassandra column family.
Hadoop-side setup
Prepare some throwaway data (data/data.txt), one word per line:
hoge
fuga
foobar
hoge
hoge
and push it to HDFS as the input:
hadoop fs -put data input
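To double-check that the file made it up, something like this should print the words back (the path assumes the layout above):
hadoop fs -cat input/data.txt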
Cassandra-side setup
Create the column family that will receive the output:
use Keyspace1;
create column family Outputs with default_validation_class = UTF8Type and key_validation_class = UTF8Type and comparator = UTF8Type;
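This assumes Keyspace1 itself already exists (it was set up in the previous entry); if it doesn't, creating it with the cli defaults first should do:
create keyspace Keyspace1;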
HadoopClient2.java
package sample;

import java.nio.ByteBuffer;
import java.util.List;

import org.apache.cassandra.hadoop.ColumnFamilyOutputFormat;
import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import sample.mapreduce.SampleCassandraReducer;
import sample.mapreduce.SampleMapper;

public class HadoopClient2 extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new HadoopClient2(), args));
    }

    @Override
    public int run(String[] args) throws Exception {
        JobConf conf = new JobConf();
        conf.setJar("sample.jar"); // the jar that ships these classes to the cluster

        // Output side points at Cassandra: thrift endpoint, target keyspace/CF,
        // and the partitioner the cluster is running with (must match cassandra.yaml).
        ConfigHelper.setInitialAddress(conf, "127.0.0.1");
        ConfigHelper.setRpcPort(conf, "9160");
        ConfigHelper.setOutputColumnFamily(conf, "Keyspace1", "Outputs");
        ConfigHelper.setPartitioner(conf, "org.apache.cassandra.dht.RandomPartitioner");

        Job job = new Job(conf);
        job.setMapperClass(SampleMapper.class);
        job.setReducerClass(SampleCassandraReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // ColumnFamilyOutputFormat consumes (row key, list of mutations) pairs.
        job.setOutputKeyClass(ByteBuffer.class);
        job.setOutputValueClass(List.class);

        // Input is plain text on HDFS this time, not Cassandra.
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(ColumnFamilyOutputFormat.class);
        FileInputFormat.setInputPaths(job, new Path("input"));

        return job.waitForCompletion(true) ? 0 : 1;
    }
}
This time we aren't reading data out of Cassandra; only the output side targets it, so it's configured with ConfigHelper.setOutputColumnFamily. setPartitioner still has to be set (it seems to be related to the token ring), and so does setOutputFormatClass(ColumnFamilyOutputFormat.class). Since the input is plain HDFS text, last time's SlicePredicate setup is no longer needed.
All that's left is to write SampleMapper and SampleCassandraReducer.
SampleMapper.java
package sample.mapreduce;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SampleMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    @Override
    public void map(LongWritable key, Text value, Context ctx) throws IOException, InterruptedException {
        // Each input line holds a single word; emit (word, 1) so the
        // reducer can total up the occurrences per word.
        ctx.write(value, new IntWritable(1));
    }
}
SampleCassandraReducer.java
package sample.mapreduce;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import org.apache.cassandra.thrift.Column;
import org.apache.cassandra.thrift.ColumnOrSuperColumn;
import org.apache.cassandra.thrift.Mutation;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SampleCassandraReducer extends Reducer<Text, IntWritable, ByteBuffer, List<Mutation>> {

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context ctx) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }

        // Build a Thrift Column: name = the word, value = its count.
        Column c = new Column();
        c.setName(Arrays.copyOf(key.getBytes(), key.getLength()));
        c.setValue(ByteBufferUtil.bytes(String.valueOf(sum)));
        c.setTimestamp(System.currentTimeMillis());

        // Wrap it in a Mutation for ColumnFamilyOutputFormat.
        ColumnOrSuperColumn cosc = new ColumnOrSuperColumn();
        cosc.setColumn(c);
        Mutation m = new Mutation();
        m.setColumn_or_supercolumn(cosc);

        // All words land in a single row keyed "count"; the emitted pair
        // is (row key, list of mutations to apply to that row).
        ctx.write(ByteBufferUtil.bytes("count"), Collections.singletonList(m));
    }
}
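To run it, package the three classes into sample.jar (the name the driver hands to conf.setJar) and submit it with the Cassandra and Thrift jars on Hadoop's classpath. Assuming that packaging, the invocation looks something like:
hadoop jar sample.jar sample.HadoopClient2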
That's about it. Once the job has finished, check the results in cassandra-cli:
use Keyspace1;
list Outputs; # lists the rows produced by the MapReduce job
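With the sample data above, the listing should come out roughly like this (a single row keyed count, one column per word; timestamps will differ):
RowKey: count
=> (column=foobar, value=1, timestamp=...)
=> (column=fuga, value=1, timestamp=...)
=> (column=hoge, value=3, timestamp=...)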