MapReduce job properties in IBM® Spectrum Symphony

This section lists the job configuration properties that are supported within the MapReduce framework.

You can set these properties in any of the following ways:
  • For a single job:
    • From the mrsh utility, use the -D option during job submission.
    • From the cluster management console, use the Add button on the Submit Job window during job submission.
  • For all jobs submitted from the host, use the pmr-site.xml configuration file.
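As an illustration, a cluster-wide setting in pmr-site.xml follows the standard Hadoop configuration-file form. The property and value shown here are examples; any property in this section can be set the same way:

```xml
<?xml version="1.0"?>
<!-- pmr-site.xml: applies to all jobs submitted from this host. -->
<configuration>
  <property>
    <name>mapreduce.progress.report.interval</name>
    <value>10000</value>
    <!-- Report job progress every 10 seconds instead of the 5-second default -->
  </property>
</configuration>
```

For a single job, the same property can instead be passed with the -D option during submission with the mrsh utility.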

job.status.update.interval

Specifies the interval (in milliseconds) at which the job status is updated.

Default: 60000 (1 minute)

mapreduce.application.name

Specifies the name of the default MapReduce application.

Default: MapReduceversion, where version is the version number of the MapReduce framework

mapreduce.cleanup.log.level

Specifies the logging level for a cleanup task. Valid values are OFF, FATAL, ERROR, WARN, INFO, DEBUG, TRACE, and ALL.

Default: INFO

mapreduce.client.resolve.hostname.for.datalocation

Specifies whether the name of a host on which input data splits are located must be resolved from IP address to short host name. Valid values are true or false.

Default: true

mapreduce.control.message.compress

Specifies whether data compression is enabled for a MapReduce job. Data compression is triggered when the size of serialized data is larger than the threshold for data compression, which is by default 1 KB. When data compression is enabled, any data compression attributes are applied to the input and output of map and reduce tasks.

Default: false

mapreduce.job.doas.submituser

Specifies whether map and reduce tasks associated with a job run as the job submission user. When enabled, the submission user must have read/write permission to HDFS.

Default: true

mapreduce.job.intermediatedata.checksum

Specifies whether a MapReduce job must use the CRC32 checksum to validate intermediate data for accidental errors during the shuffle stage. Valid values are true or false.

Default: true

mapreduce.job.local.dir.locator

Specifies how the MapReduce framework chooses the data location for temporary data generated by the map and reduce tasks of a MapReduce job. Because the size of a reducer's temporary data is not known in advance, you can control how a disk is chosen based on its available space.

This parameter helps configure more than one data location, each with a different size, for intermediate data generated by map and reduce tasks, enabling the MapReduce framework to dynamically choose the location with the most available disk space.

Valid values for this parameter:
  • random: Reducers pick a disk at random from the configured local disks for storing intermediate data.
  • max: Reducers pick the disk with the most available space from the configured local disks for storing intermediate data.
  • roundrobin: Mappers and reducers pick disks through ordered scheduling at the job level from the configured local disks. The job ID is used to create unique subdirectories on the local disks to store the intermediate data for each job.

Default: roundrobin

mapreduce.job.local.dir.margin

Specifies the minimum ratio of directory size to input split file size required for a directory to store temporary data generated by the map task of a MapReduce job. Valid values are between 2 and 100. If the folder size is x times larger than the input split file size, that folder is used to store the map output files.

This parameter helps configure more than one data location, each with a different size, for intermediate data generated by map and reduce tasks, enabling the MapReduce framework to dynamically choose the location with the most available disk space.

Note: This property can only be used if mapreduce.job.local.dir.locator=max|random.

Default: 2
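A sketch of pmr-site.xml using these two properties together (the values shown are illustrative):

```xml
<configuration>
  <!-- Choose the local disk with the most available space for intermediate data -->
  <property>
    <name>mapreduce.job.local.dir.locator</name>
    <value>max</value>
  </property>
  <!-- Use a directory only if it is at least 4 times the input split file size -->
  <property>
    <name>mapreduce.job.local.dir.margin</name>
    <value>4</value>
  </property>
</configuration>
```

Because mapreduce.job.local.dir.margin takes effect only when the locator is max or random, the two settings are shown as a pair.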

mapreduce.job.local.dir.shared

If the working directory for tasks is shared among compute hosts, this parameter specifies whether intermediate data is fetched locally, instead of through the shuffle service (mrss).

Default: false

mapreduce.job.login.password

Specifies the login password for the user who submits MapReduce jobs. If Kerberos security is enabled, do not store this value in this file.

Default: Guest, the password for the EGO consumer user.

mapreduce.job.login.user

Specifies the login for the user who submits MapReduce jobs. If Kerberos security is enabled, do not store this value in this file.

Default: Guest, the EGO consumer user.

mapred.job.priority.num

Specifies the scheduling priority for a MapReduce job. For Hadoop version 0.21.0, use mapreduce.job.priority.num.

Default: 5000

mapreduce.job.type

Specifies the type of MapReduce job. Valid values are RecoverableJob or UnrecoverableJob.

The job type can also be defined at the application level (specified as recoverable in the SessionType section of the application profile). If you define the job type in the application profile, the application profile setting overrides the pmr-site.xml setting.

A recoverable job can be used to preserve your workload under exceptional circumstances, such as a power failure or host failure. Recoverable jobs can incur extra overhead because the workload must be journaled. Specifying your jobs as recoverable may not be appropriate for all types of workload, since it can take less time to rerun all the tasks in the jobs rather than to recover and resume them. The time it takes to rerun or recover and resume tasks in a job varies with the data size and number of tasks.

Default: RecoverableJob
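For example, a job whose tasks are cheap to rerun might opt out of the journaling overhead with a setting like this (illustrative):

```xml
<configuration>
  <!-- Skip workload journaling; if the job is lost, rerun it instead -->
  <property>
    <name>mapreduce.job.type</name>
    <value>UnrecoverableJob</value>
  </property>
</configuration>
```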

mapreduce.job.user.name

For internal system use only.

mapreduce.map.skip.commit.task

For internal system use only.

mapreduce.performance.log.enabled

For internal system use only.

mapreduce.progress.report.interval

Specifies the interval (in milliseconds) at which the progress of a job is reported to the cluster management console.

Default: 5000 (5 seconds)

mapreduce.reduce.merge.memtomem.enabled

Specifies whether memory-to-memory merge for internal data of a reduce task is enabled.

Default: false

mapreduce.reduce.merge.memtomem.threshold

Specifies the threshold at which memory-to-memory merge for internal data of a reduce task is triggered.

Default:
  • Hadoop version 0.20.x, 1.0.x, and 1.1.1: io.sort.factor
  • Hadoop version 0.21.0: mapreduce.task.io.sort.factor

mapreduce.reduce.shuffle.connect.timeout

(For advanced users) Specifies the maximum duration (in milliseconds) that a reduce task tries connecting to the shuffle service (mrss) to fetch map output.

Default: 180000

Supported Hadoop versions: 0.20.x, 1.0.x, 1.1.1, and 0.21.0 (the property name is the same in all supported versions).

mapreduce.reduce.shuffle.merge.fetched.percent

Specifies the threshold at which an in-memory merge is initiated, expressed as a percentage of the total memory allocated to storing in-memory map outputs, as defined by:
mapreduce.reduce.shuffle.merge.percent * mapred.job.shuffle.input.buffer.percent

Default: 0.8

mapreduce.submit.syncmode

Specifies whether the MapReduce job is submitted in synchronous or asynchronous mode.

Tasks associated with a MapReduce job are submitted to the application manager (SSM) in either synchronous or asynchronous mode. In synchronous mode, the job is aborted if the client disconnects from the application manager. In asynchronous mode, the job keeps running even if the client disconnects from the application manager.

Note: When this parameter is set to true and abortSessionIfClientDisconnect (in the sessionType section of the application profile) is set to false, and if the client disconnects from the application manager, the job is not closed even if all tasks belonging to the job are complete. To avoid this issue, ensure that abortSessionIfClientDisconnect is set to true when submitting MapReduce jobs.

Default: true

mapred.task.maxvmem

Specifies the maximum virtual memory for a task. If the task's memory usage exceeds the limit, the task is killed. Parameters mapred.job.map.memory.mb and mapred.job.reduce.memory.mb take precedence over mapred.task.maxvmem, if they are configured.

Default: None

max.connect.sd.times

The number of times the client attempts to get information about a MapReduce job from the session director (SD) before it considers the connection to have expired.

Default: 20

pmr.cache.split

Specifies caching of common data input splits for a map task of a MapReduce job. When cache-aware scheduling is enabled, input splits are cached at the first iteration of a job. Subsequent iterations of a job, instead of fetching the same data from HDFS, reuse the cached input split as input. Valid values are on and off.
Note: When cache-aware scheduling is enabled, each host holds the input split of all its running map tasks in memory. Ensure that each host has sufficient memory to cache the data input splits.
To support cache-aware scheduling, ensure that you configure the following properties in the $EGO_ESRVDIR/esc/conf/services/mrss.xml file:
  • PMR_MRSS_CACHE_PATH: Specifies a location on the local disk to store the map file of the input split. The default location is ${PMR_HOME}/work/datacache.
  • PMR_MRSS_INPUTCACHE_MAX_MEMSIZE_MB: Specifies the maximum memory limit of the input split cache (in MB). The default value is 2 GB (2048 MB). If the total cache size does not exceed the configured limit, the cache files are mapped to system memory and used as in-memory cache. If the total cache size exceeds the configured limit, the cache files are not mapped to system memory but are instead used as on-disk cache.
  • PMR_MRSS_INPUTCACHE_CLEAN_INTERVAL: Specifies the duration (in seconds) that a split is cached without being accessed by any job of any application. When a split cache exceeds this duration, the data input split (and its local disk file) are deleted from the MRSS cache. The default value is 3600 seconds.

Default: off
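To enable cache-aware scheduling for all jobs submitted from a host, the pmr-site.xml entry would look like this (the PMR_MRSS_* variables described above are configured separately, in mrss.xml):

```xml
<configuration>
  <!-- Cache input splits in memory on each host; valid values are on and off -->
  <property>
    <name>pmr.cache.split</name>
    <value>on</value>
  </property>
</configuration>
```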

pmr.consumer.name

Specifies the name of the consumer that is used to download JAR files and other related libraries for a job from the repository service (RS).

Default: None

pmr.decrease.map.output

For internal system use only.

pmr.debug.job.keep.failedtask.files

If there are failed tasks in a MapReduce job, specifies whether job-related working directories remain on the system or are cleaned up.
Valid values:
  • true: If there are failed tasks in the job, job-related working directories remain rather than being cleaned up. This data can be used for debugging tasks.
    If set to true, these directories and files are retained at the following locations:
    Job and task working directory and files
    ${PMR_HOME}/work/AppName/JobID
    Footprint files
    ${PMR_HOME}/work/footprint/AppName/JobID
    Distributed cache files
    ${PMR_HOME}/work/distcache/_public (for public distributed cache)
    ${PMR_HOME}/work/distcache/$user (for private distributed cache)
  • false: Job-related working directories do not remain on the system; they are cleaned up after the job completes.

Default: false

pmr.debug.job.keep.failedtask.interdata

If there are failed tasks in a MapReduce job, specifies whether map task output intermediate data remains on the system or is cleaned up.
Valid values:
  • true: If there are failed tasks in the job, map task output intermediate data directories remain and are not cleaned up. This data can be used for debugging tasks.
    If set to true, these directories and files are retained at the following locations:
    Footprint files
    ${PMR_HOME}/work/footprint/AppName/JobID
    Map task output intermediate data
    ${PMR_HOME}/interdata/AppName/JobID
  • false: Map task output intermediate data directories do not remain on the system; they are cleaned up after the job completes.

Default: false

pmr.debug.task.keepfiles.pattern

Specifies a regular expression matching the names of files that the system retains if there are failed tasks in a MapReduce job. This data can be used for debugging tasks. If a file name matches this expression, the files and directories under the task working directory (${PMR_HOME}/work/AppName/ServiceIndex) are saved under ${PMR_HOME}/work/AppName/save/sessionID/taskID.
Valid values:
  • No value: The system retains no files. To specify this, leave the property blank (that is, specify pmr.debug.task.keepfiles.pattern=).
  • A regular expression matching the names of files to retain.

Default: heapdump.*|javacore.*|Snap.*|core.*
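For example, to retain only heap dumps and core files from failed tasks, a narrower pattern could be set (illustrative):

```xml
<configuration>
  <!-- Keep only heap dumps and core files for debugging failed tasks -->
  <property>
    <name>pmr.debug.task.keepfiles.pattern</name>
    <value>heapdump.*|core.*</value>
  </property>
</configuration>
```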

pmr.fast.byte.comparison

Enables fast byte-array comparison, which decides the ordering between two keys by comparing their byte streams directly, without parsing the data they contain.

Default: false

pmr.framework.aggregation

Specifies how the MapReduce framework must handle <key, value> pairs which are generated by a mapper inside a partition.
Valid values:
  • sort: Specifies sorting to order and aggregate the <key, value> pairs inside partitions generated by the mapper.
  • hash: Specifies the hash table structure to aggregate <key, value> pairs. With this setting, the framework provides a hash table to hold all the <key, value> pairs in a subpartition, puts all values of the same key together, passes them into the user reduce function, and finally cleans up the <key, value> pairs in the hash table to free up memory.
    Note: Hash-based aggregation requires all data to be stored in the hash table. As a result, hash-based aggregation fails if memory for the hash table is insufficient.
  • none: Delegates aggregation to a user-overridden reducer class, rather than to the MapReduce framework. With this setting, <key, value> pairs passed to the reducer are out of order; all values for one key end up in the same reducer shard, but not necessarily in the same reduce() call.

Default: sort

pmr.framework.cpp

Specifies whether the C++ enhanced mapper and reducer are used instead of the Java implementation. The C++ enhanced mapper transforms, processes, and transfers map task outputs in serialized byte arrays from the mapper side to the reducer side.
Valid values:
  • true: Specifies to use the C++ enhanced mapper and reducer instead of the Java implementation of the mapper and reducer.
  • false: Specifies to use the Java implementation of the mapper and reducer functions.
Note: When the C++ enhanced mapper and reducer are enabled, the following features are ignored:
  • Hash-based aggregation
  • Custom map.sort.class
  • Custom comparator mapred.output.key.comparator.class except for an out-of-box one, com.platform.mapreduce.io.SerializedComparator
  • Fast byte comparison
  • sort.compare.prefix.key
  • Partition-aware shuffling

Default: true

pmr.io.enhancement

For internal system use only.

pmr.io.enhancement.buffer.size

For internal system use only.

pmr.io.enhancement.estimate.size

For internal system use only.

pmr.io.read.enhancement

For internal system use only.

pmr.io.read.enhancement.size

For internal system use only.

pmr.job.jobfile.enabled

Specifies whether the job file is to be created when a job is submitted. Valid values are true or false. If set to false, the job file is not created and the Job.getJobFile() API returns an empty string. If set to true, the API returns the path to the job file; however, after the job is finished, the file must be deleted manually.

Default: false

pmr.job.package

Specifies the name of the user package that is used to download JAR files and other related libraries for a MapReduce job from the repository service (RS).

Default: None (set based on user examples)

pmr.job.package.version

Specifies the version of the user package that is used to download JAR files and other related libraries for a MapReduce job from the repository service (RS).

Default: None

pmr.job.map.resourceGroup.filter

Specifies a list of resource groups on whose resources map tasks can run. Map tasks from a job can only run on resources that belong to one of the resource groups listed in the filter.
Note: The system does not validate the filter you specify. If the specified filter does not exist, workload hangs because of a lack of matching resources.

Default: None (enabling map tasks to run on all resource groups specified for MapReduce workload).

pmr.job.reduce.resourceGroup.filter

Specifies a list of resource groups on whose resources reduce tasks can run. Reduce tasks from a job can only run on resources that belong to one of the resource groups listed in the filter.
Note: The system does not validate the filter you specify. If the specified filter does not exist, workload hangs because of a lack of matching resources.

Default: None (enabling reduce tasks to run on all resource groups specified for MapReduce workload).

pmr.job.reduce.service.to.slot.ratio

Specifies the number of slots required to run a reduce service instance, represented as 1:N or N:1, where N is a positive integer with a maximum value of 10.

Reduce tasks of a MapReduce job can run only on service instances that occupy the appropriate number of slots, which is determined by this ratio.

If reduceServiceToSlotRatio is specified, ensure the following configuration:
  • Configure the exclusive slot allocation policy for resources groups that will be used by multi-slot tasks.
  • If N slots are required to run some reduce tasks in a resource group, configure targetMrRatio in the MapReduce section of the application profile so that at least N slots are placed in the reducer pool on hosts in that resource group. For example, if all hosts in RG1 have eight slots but some reducers require six slots to run, configuring targetMrRatio as 1:1 causes the M:R ratio on the host to be 4:4, where four slots are not enough to run a reduce task. In this case, you could configure targetMrRatio as 1:3, which causes the M:R ratio on the host to be 2:6, giving enough slots to run a reduce task.

Default: 1:1
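Continuing the example above (eight-slot hosts, reducers that need six slots), and assuming 1:N means one reduce service instance per N slots, the per-job setting might look like this:

```xml
<configuration>
  <!-- One reduce service instance occupies six slots -->
  <property>
    <name>pmr.job.reduce.service.to.slot.ratio</name>
    <value>1:6</value>
  </property>
</configuration>
```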

pmr.map.invoke.timeout.duration

Specifies the number of seconds the system waits for the invoke method to complete for a map task before a timeout is considered. If the map task does not read input, write output, or update its status string within the configured duration, the map task is killed. Valid values are between 0 and 4294967295. If you specify 0, the task never times out.

If you specify a negative integer or a decimal, the system ignores the value and uses the value set for the invoke method in the Service section of the application profile (Service > Control > Method invoke > timeout > duration). When applications take an extended duration to process, set a higher timeout so that the system does not kill the task assuming that it has timed out.

Default: None

pmr.map.output.buffer.type

Specifies whether a dual buffer or circular buffer is used to temporarily store the map outputs.
Valid values:
  • dual: Specifies that the map output buffer uses the dual buffer approach.
  • circular: Specifies that the map output buffer uses the circular buffer approach.

Default: dual

pmr.ondemand.2nd.sort.mb

In dual memory buffers, specifies whether the second map sort buffer is to be allocated for the job. Dual buffers for map tasks are used to temporarily save intermediate data for map tasks. Valid values are true or false. When set to false, the second map sort buffer is always allocated. When set to true, the second map sort buffer is allocated only when needed.

Default: false

pmr.performance.instrument.level

Specifies whether the profiling tool is switched on or off for each job. When the profiling tool is switched on, counter information is collected and written into the MapReduce job and task counters history files while the job is running.
Valid values:
  • 0: Profiling tool is switched off.
  • 1: Profiling tool is switched on.

Default: 0

pmr.reduce.f2f.factor

Enables a concurrent file-to-file merge when the number of map output files collected on disk reaches pmr.reduce.f2f.factor*io.sort.factor-1 and the primary merge thread (which is also triggered) is busy. The thread then merges io.sort.factor files into one. Valid values are greater than 1 (exclusive) and less than or equal to 2 (inclusive).

Default: None

pmr.reduce.healthy.check

Enables the calculation of a reducer's stall times during the fetching stage. The allowed stall time is calculated based on the average time taken to fetch output from previous map tasks.

Use this parameter to check the progress of a reduce task, especially when a map task takes an extended period of time to complete. In this case, sometimes a reduce task may fail because it lost notification of the map task's completion or because the reduce fetch threads hang while copying output data. As a best practice, use this parameter only when the reducer's stall time is in a small deviation range. Do not use it when some reducers need to fetch the output of more map tasks than the others.

Default: false

pmr.reduce.invoke.timeout.duration

Specifies the number of seconds the system waits for the invoke method to complete for a reduce task before a timeout is considered. If the reduce task does not read input, write output, or update its status string within the configured duration, the reduce task is killed. Valid values are between 0 and 4294967295. If you specify 0, the task never times out.

If you specify a negative integer or a decimal, the system ignores the value and uses the value set for the invoke method in the Service section of the application profile (Service > Control > Method invoke > timeout > duration). When applications take an extended duration to process, set a higher timeout so that the system does not kill the task assuming that it has timed out.

pmr.reduce.multithread.num

The number of concurrent threads to be created to execute the MapReduce job. This setting is mandatory.
Valid values: 1 to 2147483647, where:
  • 1: Indicates that no new threads will be created.
  • Any positive integer: For example, a value of 2 means two threads will be used to execute the job.
For best performance, note the following:
  • Set the pmr.reduce.multithread.num to a larger number, but no more than a value of 10.
  • Set the pmr.reduce.multithread.num value to be equal to the pmr.reduce.multithread.sample.min value.

To use this parameter, disable the hash aggregation feature by setting pmr.framework.aggregation to sort.

Default: 1

pmr.reduce.multithread.sample.min

Number of sample keys required to be collected before MapReduce splits segments. The sample keys determine how to partition and create corresponding threads to execute MapReduce jobs. This setting is optional.
To use this parameter:
  1. Set pmr.reduce.multithread.num to any positive integer higher than 1.
  2. Disable the hash aggregation feature by setting pmr.framework.aggregation to sort.

Valid values: 2 to 2147483647

For best performance, note the following:
  • Set the pmr.reduce.multithread.num to a larger number, but no more than a value of 10.
  • Set the pmr.reduce.multithread.num value to be equal to the pmr.reduce.multithread.sample.min value.

Default: 10
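Putting the two multithreading properties together with the required aggregation setting, a configuration following the best-practice notes above might look like this (the thread count is illustrative):

```xml
<configuration>
  <!-- Hash aggregation must be disabled for multithreaded reduce -->
  <property>
    <name>pmr.framework.aggregation</name>
    <value>sort</value>
  </property>
  <!-- Use four reduce threads; keep sample.min equal to the thread count -->
  <property>
    <name>pmr.reduce.multithread.num</name>
    <value>4</value>
  </property>
  <property>
    <name>pmr.reduce.multithread.sample.min</name>
    <value>4</value>
  </property>
</configuration>
```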

pmr.ship.compress.enabled

Specifies whether files must be compressed before they are deployed to the compute host. These files are required by the user client.

Default: true

pmr.ship.hdfs.files.by.hdfs

When large data files required by a MapReduce job are stored in the distributed cache on the Hadoop Distributed File System (HDFS), specifies whether the MapReduce framework uses the files on the service side directly from their original HDFS location. Valid values are true or false.
Note: This parameter takes effect only when the distributed cache files are on HDFS.

Default: true

pmr.ship.internal.hdfs.working.dir

Specifies the working directory on HDFS, which is used to store intermediate files and which is cleared when the job is finished.

Default: None (set based on user examples)

pmr.ship.local.files.mode

When large data files required by a MapReduce job are stored in the distributed cache, specifies how those files are made available to compute hosts in time for tasks requiring those files to run.
  • When the distributed cache files are located on a shared file system (such as NFS), the files on the shared file system are made available directly to compute hosts. In this case, the shared (NFS-mounted) file is not copied, but is available for access directly by the compute hosts.
  • When the distributed cache files are located on the local file system, the files are copied from the local file system to the temp folder on HDFS. The temp folder is the working directory on HDFS and is used to store intermediate data for map tasks. The temp folder is defined by the pmr.ship.internal.hdfs.working.dir parameter and is cleared when the job finishes.

Default: None (set based on user examples)

pmr.shuffle.pas

For internal system use only.

pmr.shuffle.startpoint.map.percent

Specifies the percentage of map tasks for a job that must complete on all hosts before the shuffle phase of the job's reduce tasks starts. Valid values are between 0 and 100.

Default: 0

pmr.shuffle.startpoint.host.percent

Specifies the percentage of hosts on which map tasks for a job must complete before the shuffle phase of the job's reduce tasks starts. Valid values are between 0 and 100.

Default: 0
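For example, to delay the shuffle until half of the map tasks have completed (the percentage is illustrative):

```xml
<configuration>
  <!-- Reduce tasks begin shuffling only after 50% of map tasks complete -->
  <property>
    <name>pmr.shuffle.startpoint.map.percent</name>
    <value>50</value>
  </property>
</configuration>
```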

pmr.subpartition.num

Specifies the number of sub-partitions into which the output of each map task is split. When hash table-based aggregation is enabled, the framework splits partitions into sub-partitions, with each reduce task handling multiple sub-partitions for each map task output. Creating sub-partitions avoids memory issues: a reduce task could use too much memory if it emitted output only at the end of a whole partition. With this parameter, the sub-partition is more fine-grained than the partition, so the reduce task emits output at the end of each sub-partition within a partition.
Use this parameter along with the pmr.framework.aggregation parameter.
  • To avoid memory issues, set pmr.subpartition.num to be greater than the number of tasks a reducer can run (mapred.reduce.num) and set pmr.framework.aggregation to hash or none.
  • If you do not have memory concerns, set pmr.subpartition.num to equal mapred.reduce.num and set pmr.framework.aggregation to hash or none.

Default: None (if this parameter is not defined, the number of sub-partitions equals the number of reduce tasks)
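For example, with eight reduce tasks and memory concerns, the guidance above could be expressed like this (the values are illustrative):

```xml
<configuration>
  <property>
    <name>pmr.framework.aggregation</name>
    <value>hash</value>
  </property>
  <!-- More sub-partitions than reduce tasks, so reducers emit output more
       often and hold less data in memory at once -->
  <property>
    <name>pmr.subpartition.num</name>
    <value>32</value>
  </property>
</configuration>
```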

pmr.userjar.in.package

Specifies whether user JAR files for a MapReduce job must be deployed through the repository service, instead of the SOAM common data deployment mechanism, to hosts requiring these files to run tasks. Valid values are true or false.

Default: false

sort.compare.prefix.key

Specifies whether the prefix key comparison for job metadata must be enabled.

Default: false

task.counter.info.available

For internal system use only.