Configuring the number of sub-partitions for a reduce task

Without hash table-based aggregation, the number of partitions equals the number of reduce tasks, and each reduce task fetches only the map output for its own partition from the map tasks across the cluster. When hash table-based aggregation is enabled, the framework splits partitions into sub-partitions, and each reduce task handles multiple sub-partitions of each map task's output.

About this task

Use the pmr.subpartition.num parameter to configure the number of sub-partitions into which the output of each map task is split. A reduce task that emits output only at the end of a partition might use too much memory; creating sub-partitions avoids this problem. Because a sub-partition is more fine-grained than a partition, the reduce task emits output at the end of each sub-partition within a partition.

Use this parameter along with the pmr.framework.aggregation parameter.
  • To avoid memory issues, set pmr.subpartition.num to a value greater than the number of reduce tasks (mapred.reduce.num) and set pmr.framework.aggregation to hash or none.
  • If you do not have memory concerns, set pmr.subpartition.num to equal mapred.reduce.num and set pmr.framework.aggregation to hash or none.

Assuming pmr.subpartition.num is N and mapred.reduce.num is M, each reduce task gets at least floor(N/M) sub-partitions, and the first N mod M reduce tasks each get one extra sub-partition. For example, if pmr.subpartition.num is 8 and mapred.reduce.num is 3, each reduce task gets at least 2 sub-partitions and the first 2 reduce tasks get one more: reduce task 1 gets sub-partitions 0, 1, and 2; reduce task 2 gets sub-partitions 3, 4, and 5; reduce task 3 gets only sub-partitions 6 and 7.
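
The distribution rule above can be sketched in a few lines of Python. This is an illustration only, not the framework's implementation: the function name assign_subpartitions is hypothetical, and the assumption that each reduce task receives a contiguous range of sub-partition indices is inferred from the example rather than stated by the product.

```python
def assign_subpartitions(num_subpartitions: int, num_reduce_tasks: int) -> list[list[int]]:
    """Return, for each reduce task, the sub-partition indices it handles.

    Assumption: sub-partitions are handed out as contiguous index ranges,
    which matches the 8-over-3 example in the text.
    """
    # Every task gets `base` sub-partitions; the first `extra` tasks get one more.
    base, extra = divmod(num_subpartitions, num_reduce_tasks)
    assignment = []
    start = 0
    for task in range(num_reduce_tasks):
        count = base + (1 if task < extra else 0)
        assignment.append(list(range(start, start + count)))
        start += count
    return assignment


if __name__ == "__main__":
    # pmr.subpartition.num = 8, mapred.reduce.num = 3
    for task, subs in enumerate(assign_subpartitions(8, 3), start=1):
        print(f"reduce task {task}: sub-partitions {subs}")
```

With 8 sub-partitions and 3 reduce tasks, the sketch reproduces the assignment described above: [0, 1, 2], [3, 4, 5], and [6, 7].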

If you do not specify a value for this property, the number of sub-partitions by default equals the number of reduce tasks.
Note: The number of sub-partitions must not be less than the number of reduce tasks. If you configure fewer sub-partitions than reduce tasks, the framework sets the number of sub-partitions equal to the number of reduce tasks.
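
The default and the lower bound described above amount to a simple rule, sketched here in Python. The function name effective_subpartitions is hypothetical and the code only models the documented behavior; it is not part of the product.

```python
from typing import Optional


def effective_subpartitions(configured: Optional[int], num_reduce_tasks: int) -> int:
    """Model the number of sub-partitions the framework ends up using.

    configured: the value of pmr.subpartition.num, or None if it is not set.
    num_reduce_tasks: the number of reduce tasks for the job.
    """
    if configured is None:
        # Unspecified: defaults to the number of reduce tasks.
        return num_reduce_tasks
    # A value below the number of reduce tasks is raised to that number.
    return max(configured, num_reduce_tasks)
```

For instance, under this model a configured value of 2 with 4 reduce tasks yields 4 sub-partitions, while a configured value of 8 is used unchanged.
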
For the best performance, apply the following settings and recommendations:
  • Enable services to be pre-started through the preStartApplication element in the Consumer section of the application profile.
  • If you do not want strict data sorting, set pmr.framework.aggregation=hash. If you want to implement aggregation in your reducer class code, set pmr.framework.aggregation=none.
  • When pmr.framework.aggregation is set to hash or none, configuring pmr.subpartition.num to be greater than mapred.reduce.tasks avoids out-of-memory issues. For a balanced workload, set pmr.subpartition.num to a multiple of the number of reduce tasks (pmr.subpartition.num=X*mapred.reduce.tasks, where X is a positive integer).

    Determine the value of pmr.subpartition.num from the ratio of map input size to map output size, the maximum input size, and the number of reduce tasks. For example, if the ratio of map input to intermediate data is 1:2 (1 MB of input generates 2 MB of intermediate data), the maximum input size is 55 MB per file, and there are 100 input files, the total intermediate data size is 2 * 55 MB * 100 = 11000 MB. In this case, split the 11000 MB into enough sub-partitions that each one fits within the reducer memory limit.
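
The sizing arithmetic above can be turned into a rough estimate, sketched here in Python. Several things in this sketch are assumptions rather than product behavior: the function name estimate_subpartitions, the reducer_budget_mb parameter (how much intermediate data you are willing to let a reducer hold for one sub-partition), the idea that intermediate data is spread evenly across sub-partitions, and the final rounding up to a multiple of the number of reduce tasks to follow the balanced-workload recommendation.

```python
import math


def estimate_subpartitions(
    map_output_ratio: float,   # MB of intermediate data per MB of input (2 for a 1:2 ratio)
    max_input_mb: float,       # maximum size of one input file, in MB
    num_input_files: int,      # number of input files
    reducer_budget_mb: float,  # hypothetical memory budget per sub-partition, in MB
    num_reduce_tasks: int,     # number of reduce tasks for the job
) -> int:
    """Estimate a value for pmr.subpartition.num (illustrative only)."""
    # Worst-case total intermediate data, as in the text: ratio * size * files.
    total_mb = map_output_ratio * max_input_mb * num_input_files
    # Sub-partitions needed so that each one fits the budget,
    # assuming data is spread evenly across sub-partitions.
    needed = max(1, math.ceil(total_mb / reducer_budget_mb))
    # Round up to a multiple of the reduce task count for a balanced workload.
    multiplier = max(1, math.ceil(needed / num_reduce_tasks))
    return multiplier * num_reduce_tasks


if __name__ == "__main__":
    # 1:2 ratio, 55 MB files, 100 files, 500 MB budget, 4 reduce tasks.
    print(estimate_subpartitions(2, 55, 100, 500, 4))
```

With the figures from the example and an assumed 500 MB budget and 4 reduce tasks, 11000 MB needs at least 22 sub-partitions, which the sketch rounds up to 24. Skewed keys can make individual sub-partitions larger than the average, so treat the result as a starting point.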

Procedure

  1. To configure the number of sub-partitions for the output of each map task within a job from the mrsh utility:
    1. Add the pmr.subpartition.num option to your job submission command.

      $ mrsh jar jarfile [classname] -Dpmr.subpartition.num=value [args]

      where value is greater than or equal to mapred.reduce.num.

  2. To configure the number of sub-partitions for the output of each map task for all jobs submitted from a host:
    1. Open the pmr-site.xml configuration file located in the $PMR_HOME/conf directory.
    2. Add the pmr.subpartition.num property with a value greater than or equal to mapred.reduce.num.
      For example:
      <property>
        <name>pmr.subpartition.num</name>
        <value>20</value>
      </property>
      
    3. Save the file.