Using hash-based grouping for data aggregation

This tutorial guides you through the process of using hash-based grouping to implement aggregation in the reducer class code.

About this task

When hash-based aggregation is delegated to a user-overridden reducer class (pmr.framework.aggregation=none) rather than handled by the MapReduce framework, the <key, value> pairs passed to the reducer arrive unordered; all values for one key end up in the same reducer shard, but not necessarily in the same reduce() call. This tutorial applies only when the pmr.framework.aggregation parameter is set to none; it does not apply to the other supported values, sort and hash.

The hash-based aggregation sample consists of two MapReduce programs:
  • WordCountNewApiAggr: A Java executable that implements data aggregation through the new Hadoop API. The new Hadoop API is included in the org.apache.hadoop.mapreduce package and uses the org.apache.hadoop.mapreduce.Job class to submit jobs.
  • WordCountOldApiAggr: A Java executable that implements data aggregation through the old Hadoop API. The WordCount sample included with Hadoop 1.0.1 and 1.1.1 (the only versions supported for this sample) uses the new Hadoop API, so this sample shows how to enable hash-based aggregation for MapReduce applications submitted through the old Hadoop API. The old Hadoop API is included in the org.apache.hadoop.mapred package and uses the org.apache.hadoop.mapred.JobClient class to submit jobs. See the sketch after the note below for the difference between the two submission paths.
Note: This sample applies to the MapReduce framework and is supported only on Linux 64-bit hosts.
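The difference between the two submission paths is sketched below. This is a minimal illustration only, not the driver code shipped with the sample: the class name SubmitSketch is a placeholder, and the mapper and reducer settings are left as comments.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapreduce.Job;

    public class SubmitSketch {

        // New Hadoop API: a Job object from org.apache.hadoop.mapreduce submits the work.
        static void submitWithNewApi(String input, String output) throws Exception {
            Configuration conf = new Configuration();
            Job job = new Job(conf, "wordcount-new-api");
            job.setJarByClass(SubmitSketch.class);
            // job.setMapperClass(...) and job.setReducerClass(...) would name the
            // sample's WordCount mapper and hash-aggregating reducer here.
            org.apache.hadoop.mapreduce.lib.input.FileInputFormat.addInputPath(job, new Path(input));
            org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.setOutputPath(job, new Path(output));
            job.waitForCompletion(true);
        }

        // Old Hadoop API: a JobConf object is handed to org.apache.hadoop.mapred.JobClient.
        static void submitWithOldApi(String input, String output) throws Exception {
            JobConf conf = new JobConf(SubmitSketch.class);
            conf.setJobName("wordcount-old-api");
            // conf.setMapperClass(...) and conf.setReducerClass(...) would name the
            // sample's old-API mapper and reducer here.
            FileInputFormat.setInputPaths(conf, new Path(input));
            FileOutputFormat.setOutputPath(conf, new Path(output));
            JobClient.runJob(conf);
        }
    }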
In this tutorial, you will complete these tasks:
  1. Build the sample
  2. Run the sample
  3. Walk through the code

Procedure

  1. Build the sample.
    Note: This sample only supports the following Hadoop API versions: 1.0.1 and 1.1.1.

    Build the pmr-aggregation-examples.jar file in your IBM® Spectrum Symphony Developer Edition environment:

    1. Change to the root directory under the directory in which you installed IBM Spectrum Symphony Developer Edition. For example, if you used the default installation directory, change to the /opt/ibm/platformsymphonyde/de/de732 directory.
    2. Set the environment:
      • (csh) source cshrc.platform
      • (bash) . profile.platform
    3. Change to the $SOAM_HOME/mapreduce/version/samples/HashAggregation directory and run the make command.
      make
      
      The sample is compiled with the Hadoop 1.1.1 API and the pmr-aggregation-examples.jar file is created in the following directory:

      $SOAM_HOME/mapreduce/version/samples/HashAggregation

  2. Run the sample.

    Before running the sample, ensure that the pmr.framework.aggregation parameter is set to none in the $PMR_HOME/conf/pmr-site.xml configuration file or in the job submission command.
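
    The following is a sketch of the relevant configuration entry, assuming that pmr-site.xml follows the standard Hadoop <property> configuration format:

      <!-- $PMR_HOME/conf/pmr-site.xml: delegate aggregation to the reducer class -->
      <property>
        <name>pmr.framework.aggregation</name>
        <value>none</value>
      </property>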

    1. To run the sample through the new Hadoop API, use the following syntax:

      $ mrsh jar $PMR_HOME/version/os_type/samples/pmr-aggregation-examples.jar com.platform.mapreduce.examples.hashaggregation.WordCountNewApiAggr -Dpmr.framework.aggregation=none -Dmapred.reduce.tasks=value -Dmapred.compress.map.output=value -Dmapred.mapoutput=value -Dmapred.map.output.compression.codec=value input_path output_path

      For example:

      $ mrsh jar $SOAM_HOME/mapreduce/version/linux2.6-glibc2.3-x86_64/samples/pmr-aggregation-examples.jar com.platform.mapreduce.examples.hashaggregation.WordCountNewApiAggr -Dpmr.framework.aggregation=none -Dmapred.reduce.tasks=20 -Dmapred.compress.map.output=true -Dmapred.mapoutput=true -Dmapred.map.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec input output

    2. To run the sample through the old Hadoop API, use the following syntax:

      $ mrsh jar $PMR_HOME/version/os_type/samples/pmr-aggregation-examples.jar com.platform.mapreduce.examples.hashaggregation.WordCountOldApiAggr -Dpmr.framework.aggregation=none -Dmapred.reduce.tasks=value -Dmapred.compress.map.output=value -Dmapred.mapoutput=value -Dmapred.map.output.compression.codec=value input_path output_path

      For example:

      $ mrsh jar $SOAM_HOME/mapreduce/version/linux2.6-glibc2.3-x86_64/samples/pmr-aggregation-examples.jar com.platform.mapreduce.examples.hashaggregation.WordCountOldApiAggr -Dpmr.framework.aggregation=none -Dmapred.reduce.tasks=2 -Dmapred.compress.map.output=true -Dmapred.mapoutput=true -Dmapred.map.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec input output

  3. Walk through the code.
    1. Locate the code samples:
      Table 1. Code samples
      IBM Spectrum Symphony Developer Edition (Linux 64-bit hosts):
        • WordCountNewApiAggr.java: $SOAM_HOME/mapreduce/version/samples/HashAggregation/com/platform/mapreduce/examples/hashaggregation/WordCountNewApiAggr.java
        • WordCountOldApiAggr.java: $SOAM_HOME/mapreduce/version/samples/HashAggregation/com/platform/mapreduce/examples/hashaggregation/WordCountOldApiAggr.java
      IBM Spectrum Symphony (Linux 64-bit hosts):
        • pmr-aggregation-examples.jar: $SOAM_HOME/mapreduce/version/os_type/samples/
    2. Understand what the sample does.

      The hash-based aggregation sample implements the WordCount sample with data aggregated in the reducer class code. If you set pmr.framework.aggregation=none in the $PMR_HOME/conf/pmr-site.xml configuration file or in the job submission command, the <key, value> pairs passed to the reducer arrive unordered; all values for one key end up in the same reducer shard, but not necessarily in the same reduce() call.

      In this simple program, the reduce() function aggregates the data of every partition or subpartition (if pmr.subpartition.num is set to a value greater than the number of reduce tasks and subpartitions are available) by using a hash table. It does not emit <key, value> pairs in the reduce() function, but holds them in the hash table, and then emits all aggregated <key, value> pairs from the hash table in the run() function after the partition or subpartition ends.

      Some initial work for every partition or subpartition (if pmr.subpartition.num is set to be greater than the number of reduce tasks and subpartitions are available) is done in the setup() function. Some cleanup work for every partition or subpartition is done in the cleanup() function.

      The following is the pseudo-code for the new Hadoop API:
      Reducer {
          HashMap<Text, IntWritable> wordMap = new HashMap<Text, IntWritable>();

          protected void setup(Context context) {
              // Per-partition (or per-subpartition) initialization.
          }

          /**
           * This method is called once for each key. Most applications define
           * their reducer class by overriding this method. The default
           * implementation is an identity function.
           */
          @SuppressWarnings("unchecked")
          protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context)
                  throws IOException, InterruptedException {
              for (VALUEIN value : values) {
                  // Add the value to the hash table.
                  wordMap[key] += value;
                  // Do not emit the result to the context yet because more values
                  // may still arrive for the same key.
              }
          }

          /**
           * Advanced application writers can use the
           * {@link #run(org.apache.hadoop.mapreduce.Reducer.Context)} method to
           * control how the reduce task works.
           */
          public void run(Context context) throws IOException, InterruptedException {
              setup(context);
              // Process key-value pairs such as (ab, 2) (ab, 1) (aa, 1) (ab, 1) (ac, 1)
              // in one subpartition.
              while (context.nextKey() is processed in the partition) {
                  // Process one key group.
                  reduce(context.getCurrentKey(), context.getValues(), context);
              }

              // Finished a subpartition; flush out the processed results.
              for (all <key, value> pairs in wordMap) {
                  // Emit output here, after all input pairs of this subpartition are processed.
                  context.write((KEYOUT) key, (VALUEOUT) value);
              }
              // Clear the hash table to free up memory.
              cleanup(context);
          }
      }
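
      For comparison, the following is a compilable sketch of the same reducer logic against the new Hadoop API (org.apache.hadoop.mapreduce). It is an illustration only, not the shipped WordCountNewApiAggr.java; the class name HashAggrReducer and the use of String keys in the hash map are choices made for this sketch.

        import java.io.IOException;
        import java.util.HashMap;
        import java.util.Map;

        import org.apache.hadoop.io.IntWritable;
        import org.apache.hadoop.io.Text;
        import org.apache.hadoop.mapreduce.Reducer;

        // Hypothetical class name; see WordCountNewApiAggr.java for the shipped sample.
        public class HashAggrReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

            // Hash table that accumulates one running total per word for the
            // current partition or subpartition.
            private final Map<String, Integer> wordMap = new HashMap<String, Integer>();

            @Override
            protected void setup(Context context) {
                // Per-partition (or per-subpartition) initialization.
                wordMap.clear();
            }

            @Override
            protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                    throws IOException, InterruptedException {
                // Accumulate into the hash table instead of writing to the context:
                // with pmr.framework.aggregation=none, more values for this key may
                // still arrive in later reduce() calls of the same shard.
                int sum = 0;
                for (IntWritable value : values) {
                    sum += value.get();
                }
                String word = key.toString();   // copy: Hadoop reuses the Text key object
                Integer previous = wordMap.get(word);
                wordMap.put(word, previous == null ? sum : previous + sum);
            }

            @Override
            public void run(Context context) throws IOException, InterruptedException {
                setup(context);
                // Feed every key group of this partition or subpartition through reduce().
                while (context.nextKey()) {
                    reduce(context.getCurrentKey(), context.getValues(), context);
                }
                // Emit the fully aggregated <key, value> pairs only after the whole
                // shard has been processed, then release the memory.
                for (Map.Entry<String, Integer> entry : wordMap.entrySet()) {
                    context.write(new Text(entry.getKey()), new IntWritable(entry.getValue()));
                }
                wordMap.clear();
                cleanup(context);
            }
        }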
      For the old Hadoop API, we provide a new interface, org.apache.hadoop.mapred.oldReducerExtend, which extends the existing org.apache.hadoop.mapred.Reducer interface:

        interface oldReducerExtend<K2, V2, K3, V3> extends Reducer<K2, V2, K3, V3> {
            void reduce(K2 key, Iterator<V2> values,
                        OutputCollector<K3, V3> output, Reporter reporter);
            void run(org.apache.hadoop.mapreduce.ReduceContext context);
        }
      Note: If you want to enable hash-based aggregation for a MapReduce application submitted through the old Hadoop API, implement your reducer class from the org.apache.hadoop.mapred.oldReducerExtend interface and then implement the reducer class logic according to the new API example. WordCountOldApiAggr.java implements this for the WordCount sample.
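
      The following is a hedged sketch of what such an old-API reducer might look like. It is not the shipped WordCountOldApiAggr.java: the class name, the assumption that oldReducerExtend takes the same type parameters and throws clauses as the old-API Reducer, and the way run() bridges the new-API ReduceContext to the buffered results are guesses made for illustration; consult the sample source for the actual implementation.

        import java.io.IOException;
        import java.util.HashMap;
        import java.util.Iterator;
        import java.util.Map;

        import org.apache.hadoop.io.IntWritable;
        import org.apache.hadoop.io.Text;
        import org.apache.hadoop.mapred.MapReduceBase;
        import org.apache.hadoop.mapred.OutputCollector;
        import org.apache.hadoop.mapred.Reporter;
        import org.apache.hadoop.mapred.oldReducerExtend;
        import org.apache.hadoop.mapreduce.ReduceContext;

        // Hypothetical old-API reducer; type parameters and throws clauses are assumptions.
        public class OldApiHashAggrReducer extends MapReduceBase
                implements oldReducerExtend<Text, IntWritable, Text, IntWritable> {

            // Running totals for the current partition or subpartition.
            private final Map<String, Integer> wordMap = new HashMap<String, Integer>();

            public void reduce(Text key, Iterator<IntWritable> values,
                               OutputCollector<Text, IntWritable> output, Reporter reporter)
                    throws IOException {
                // Buffer the counts instead of emitting them: more values for this
                // key may still arrive in later reduce() calls of the same shard.
                int sum = 0;
                while (values.hasNext()) {
                    sum += values.next().get();
                }
                String word = key.toString();   // copy: Hadoop reuses the Text key object
                Integer previous = wordMap.get(word);
                wordMap.put(word, previous == null ? sum : previous + sum);
            }

            @SuppressWarnings("unchecked")
            public void run(ReduceContext context) throws IOException, InterruptedException {
                // Drive reduce() for every key group in this partition or subpartition.
                while (context.nextKey()) {
                    reduce((Text) context.getCurrentKey(),
                           ((Iterable<IntWritable>) context.getValues()).iterator(),
                           null /* unused: results are buffered in wordMap */,
                           Reporter.NULL);
                }
                // Emit the aggregated pairs once the whole shard has been processed,
                // then release the memory.
                for (Map.Entry<String, Integer> entry : wordMap.entrySet()) {
                    context.write(new Text(entry.getKey()), new IntWritable(entry.getValue()));
                }
                wordMap.clear();
            }
        }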