Source code for the WordCount example
In this example, the map function splits the input text into words and emits a <word, "1"> pair for every occurrence of each word that it encounters.
The reduce function then sums all of the counts that were emitted for a word. The code for the example is as follows:
import java.io.*;
import java.util.*;

import org.netezza.inza.mr.conf.*;
import org.netezza.inza.mr.io.*;
import org.netezza.inza.mr.mapreduce.*;
import org.netezza.inza.mr.util.*;

public class WordCount extends Configured implements Tool {

    // The mapper splits each input line into tokens and emits a <word, 1>
    // pair for every token it encounters.
    public static class Map extends
            Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                word.set(tokenizer.nextToken());
                context.write(word, one);
            }
        }
    }

    // The reducer sums all counts that were emitted for a word and writes
    // the total as a single <word, sum> pair.
    public static class Reduce extends
            Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        public void reduce(Text key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    public int run(String[] args) throws Exception {
        if (args.length != 7) {
            System.out.printf("Usage: %s <db> " +
                    "<input_table> <input_key_column> <input_value_column> " +
                    "<output_table> <output_key_column> <output_value_column>\n",
                    getClass().getSimpleName());
            return -1;
        }

        Configuration conf = getConf();
        // Maximum length of the key columns that hold the words.
        final int MAX_WORD_LENGTH = conf.getInt("max.word.length", 100);

        Job job = new Job(conf, "wordcount");
        job.setJarByClass(getClass());

        // Input and output tables and columns are taken from the command line.
        job.setDatabaseName(args[0]);
        job.setInputTableName(args[1]);
        job.setInputKeyColumnNames(args[2]);
        job.setInputValueColumnNames(args[3]);
        job.setOutputTableName(args[4]);
        job.setOutputKeyColumnNames(args[5]);
        job.setOutputValueColumnNames(args[6]);

        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);

        // Key/value types for the map input, map output, and reduce output.
        job.setMapInputKeyClass(LongWritable.class);
        job.setMapInputValueClass(Text.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputKeyColumnSize(0, MAX_WORD_LENGTH);
        job.setMapOutputValueClass(IntWritable.class);
        job.setReduceOutputKeyClass(Text.class);
        job.setReduceOutputKeyColumnSize(0, MAX_WORD_LENGTH);
        job.setReduceOutputValueClass(IntWritable.class);

        boolean success = JobRunner.runJob(job);
        return success ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int ret = ToolRunner.run(new WordCount(), args);
        System.exit(ret);
    }
}
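The program expects seven positional arguments, as validated in run(): the database name, the input table with its key and value column names, and the output table with its key and value column names. A hypothetical invocation is sketched below, assuming the compiled class and the Netezza MapReduce libraries are on the classpath; the classpath entry and the database, table, and column names are placeholders, not part of the example:

    java -cp <classpath> WordCount MYDB DOCUMENTS ID LINE WORD_COUNTS WORD COUNT

Each distinct word found in the LINE column of DOCUMENTS would then appear as one row in WORD_COUNTS together with its total number of occurrences. The max.word.length configuration property (default 100) controls the declared size of the output key columns that hold the words.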