Source code for the WordCount example
In this example, the map function splits the input text into words and emits <word, "1"> pairs for all occurrences of each word it encounters.
The reduce function then sums all the counts emitted for each word. The code for the example is as follows:
import java.io.*;
import java.util.*;
import org.netezza.inza.mr.conf.*;
import org.netezza.inza.mr.io.*;
import org.netezza.inza.mr.mapreduce.*;
import org.netezza.inza.mr.util.*;
public class WordCount extends Configured implements Tool {
public static class Map extends
Mapper<LongWritable, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line);
while (tokenizer.hasMoreTokens()) {
word.set(tokenizer.nextToken());
context.write(word, one);
}
}
}
public static class Reduce extends
Reducer<Text, IntWritable, Text, IntWritable> {
@Override
public void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException,
InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
context.write(key, new IntWritable(sum));
}
}
public int run(String[] args) throws Exception {
if (args.length != 7) {
System.out.printf("Usage: %s <db> " +
"<input_table> <input_key_column>
<input_value_column>" +
"<output_table> <output_key_column>
<output_value_column>\n",
getClass().getSimpleName());
return -1;
}
Configuration conf = getConf();
final int MAX_WORD_LENGTH = conf.getInt("max.word.length", 100);
Job job = new Job(conf, "wordcount");
job.setJarByClass(getClass());
job.setDatabaseName(args[0]);
job.setInputTableName(args[1]);
job.setInputKeyColumnNames(args[2]);
job.setInputValueColumnNames(args[3]);
job.setOutputTableName(args[4]);
job.setOutputKeyColumnNames(args[5]);
job.setOutputValueColumnNames(args[6]);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setMapInputKeyClass(LongWritable.class);
job.setMapInputValueClass(Text.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputKeyColumnSize(0, MAX_WORD_LENGTH);
job.setMapOutputValueClass(IntWritable.class);
job.setReduceOutputKeyClass(Text.class);
job.setReduceOutputKeyColumnSize(0, MAX_WORD_LENGTH);
job.setReduceOutputValueClass(IntWritable.class);
boolean success = JobRunner.runJob(job);
return success ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int ret = ToolRunner.run(new WordCount(), args);
System.exit(ret);
}
}