Concurrent merge

During the reduce phase of a MapReduce job, a reduce task's copiers fetch all map outputs for the task's partition in parallel. Small map outputs are copied and decompressed into MRSS memory; larger outputs are written directly to disk. When the in-memory buffer reaches a threshold size or holds a threshold number of map outputs, the buffered outputs are merged, compressed, and written to disk. As copies accumulate on disk, a merger thread decompresses and merges them, then compresses and writes the result to larger sorted files; this is known as the intermediate merge. After all map outputs are copied, the reduce task decompresses and merges them, maintaining their sort order, during the final merge. The results are fed directly into the application's reduce function.
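In stock Hadoop 1.x, the thresholds described above are exposed as the following properties (shown with their stock defaults; an MRSS deployment may expose different names or defaults, so treat this fragment as illustrative):

```xml
<!-- mapred-site.xml (illustrative values) -->
<!-- Fraction of the in-memory shuffle buffer that triggers a merge to disk -->
<property>
  <name>mapred.job.shuffle.merge.percent</name>
  <value>0.66</value>
</property>
<!-- Number of buffered map outputs that triggers a merge to disk -->
<property>
  <name>mapred.inmem.merge.threshold</name>
  <value>1000</value>
</property>
<!-- Maximum number of on-disk streams merged at once during the
     intermediate and final merges -->
<property>
  <name>io.sort.factor</name>
  <value>10</value>
</property>
```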

When you configure concurrent file-to-file merge, multiple merger threads, rather than a single thread, decompress and merge the map outputs. Each merger thread works on a subset of segments or files, speeding up the file-to-file merge. This applies during both the intermediate and final merges.
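The work partitioning can be illustrated with a short sketch: each worker performs a k-way merge over its subset of sorted segments (the intermediate merge), and a final merge combines the resulting runs. This is a simplified model of the technique, not the MRSS implementation; real merger threads operate on compressed on-disk files rather than in-memory lists.

```python
import heapq
from concurrent.futures import ThreadPoolExecutor

def merge_segments(segments):
    """K-way merge of already-sorted segments into one sorted run."""
    return list(heapq.merge(*segments))

def concurrent_merge(segments, num_threads=4):
    # Partition the sorted segments among merger threads; each thread
    # merges its subset into one larger sorted run (intermediate merge).
    chunks = [segments[i::num_threads] for i in range(num_threads)]
    chunks = [c for c in chunks if c]  # drop empty partitions
    with ThreadPoolExecutor(max_workers=num_threads) as pool:
        runs = list(pool.map(merge_segments, chunks))
    # The final merge combines the intermediate runs, preserving order.
    return merge_segments(runs)
```

Because each thread touches a disjoint subset of segments, the intermediate merges need no coordination; only the final merge consumes all runs.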

To gain the most from this feature, set the following configuration before you enable concurrent file-to-file merge:
  • Multiple disks: Disk contention among reducers can substantially reduce the benefit of multi-threading. One way to avoid this issue is to provide multiple disks on each compute node. Hadoop guidance suggests one disk per core; we recommend the same ratio (one disk per core) when configuring multiple disks for both HDFS (specified by dfs.data.dir in hdfs-site.xml) and the MapReduce local data directory (specified by mapred.local.dir in mapred-site.xml).
  • Balanced number of reducers: The reduceLimitPerHost parameter specifies the maximum number of reducers that can run on a host. Although this parameter caps the reducers on any one host, the overall number of reducers can still exceed this limit, creating an imbalance in the number of reducers across hosts. To keep the number of reducers balanced, set the mapred.reduce.slowstart.completed.maps parameter to zero, and set the number of reducers to no more than half the available slots on the compute host.
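The two settings above might look as follows; the disk paths are examples only and must match the mount points on your nodes:

```xml
<!-- hdfs-site.xml: one HDFS data directory per disk -->
<property>
  <name>dfs.data.dir</name>
  <value>/disk1/hdfs/data,/disk2/hdfs/data,/disk3/hdfs/data</value>
</property>

<!-- mapred-site.xml: one local directory per disk, and start
     reducers immediately so they spread evenly across hosts -->
<property>
  <name>mapred.local.dir</name>
  <value>/disk1/mapred/local,/disk2/mapred/local,/disk3/mapred/local</value>
</property>
<property>
  <name>mapred.reduce.slowstart.completed.maps</name>
  <value>0</value>
</property>
```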