MapReduce job properties in IBM® Spectrum Symphony
This section lists the job configuration properties that are supported within the MapReduce framework.
- For a single job:
- From the mrsh utility, use the -D option during job submission.
- From the cluster management console, use the Add button on the Submit Job window during job submission.
- For all jobs submitted from the host, use the pmr-site.xml configuration file.
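For example, a per-host default can be set in pmr-site.xml. This is a sketch that assumes the file follows the standard Hadoop *-site.xml layout; the property shown (job.status.update.interval) is documented below, and the 30000 value is only an illustration:

```xml
<?xml version="1.0"?>
<configuration>
  <!-- Applies to all jobs submitted from this host.
       Example value only: update job status every 30 seconds. -->
  <property>
    <name>job.status.update.interval</name>
    <value>30000</value>
  </property>
</configuration>
```

For a single job, the same property could instead be passed on the mrsh command line with the -D option, for example -Djob.status.update.interval=30000.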
job.status.update.interval
Specifies the interval (in milliseconds) at which the job status is updated.
Default: 60000 (1 minute)
mapreduce.application.name
Specifies the name of the default MapReduce application.
Default: MapReduceversion
mapreduce.cleanup.log.level
Specifies the logging level for a cleanup task, which could be: OFF, FATAL, ERROR, WARN, INFO, DEBUG, TRACE, or ALL.
Default: INFO
mapreduce.client.resolve.hostname.for.datalocation
Specifies whether the name of a host on which input data splits are located must be resolved from IP address to short host name. Valid values are true or false.
Default: true
mapreduce.control.message.compress
Specifies whether data compression is enabled for a MapReduce job. Data compression is triggered when the size of serialized data is larger than the threshold for data compression, which is 1 KB by default. When data compression is enabled, any data compression attributes are applied to the input and output of map and reduce tasks.
Default: false
mapreduce.job.doas.submituser
Specifies whether map and reduce tasks associated with a job are to be run as the job submission user; this requires the user to have read/write permission to HDFS.
Default: true
mapreduce.job.intermediatedata.checksum
Specifies whether a MapReduce job must use the CRC32 checksum to validate intermediate data for accidental errors during the shuffle stage. Valid values are true or false.
Default: true
mapreduce.job.local.dir.locator
Specifies how the MapReduce framework must choose the data location for temporary data generated by the map and reduce tasks of a MapReduce job. Because the size of a reducer's temporary data is not known in advance, the framework can choose a disk based on available space. This parameter lets you configure more than one data location, each with a different size, for intermediate data generated by map and reduce tasks, enabling the MapReduce framework to dynamically choose the location with the most available disk space.
- random: Reducers pick disks at random from among the configured local disks for storing intermediate data.
- max: Reducers pick the disks with the most available space from among the configured local disks for storing intermediate data.
- roundrobin: Mappers and reducers pick disks through ordered scheduling, at the job level, from among the configured local disks for storing intermediate data. The job ID is used to create unique subdirectories on the local disks to store the intermediate data for each job.
Default: roundrobin
mapreduce.job.local.dir.margin
Specifies the minimum ratio of the size of a data location directory to the size of the input split file, used when storing temporary data generated by the map task of a MapReduce job. Valid values are between 2 and 100. If a folder is at least x times larger than the input split file, that folder is used to store the map output files. This parameter lets you configure more than one data location, each with a different size, for intermediate data generated by map and reduce tasks, enabling the MapReduce framework to dynamically choose the location with the most available disk space.
Default: 2
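To illustrate, the two local-directory properties above can be combined so that the framework favors the disk with the most free space. A hypothetical pmr-site.xml fragment (the values shown are examples only):

```xml
<!-- Sketch: pick the local disk with the most free space for intermediate data,
     and require a directory to be at least 4x the input split size. -->
<property>
  <name>mapreduce.job.local.dir.locator</name>
  <value>max</value>
</property>
<property>
  <name>mapreduce.job.local.dir.margin</name>
  <value>4</value>
</property>
```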
mapreduce.job.local.dir.shared
If the working directory for tasks is shared among compute hosts, this parameter specifies whether intermediate data is fetched locally, instead of through the shuffle service (mrss).
Default: false
mapreduce.job.login.password
Specifies the login password for the user who submits MapReduce jobs. If Kerberos security is enabled, do not store this value in this file.
Default: Guest, the password for the EGO consumer user.
mapreduce.job.login.user
Specifies the login for the user who submits MapReduce jobs. If Kerberos security is enabled, do not store this value in this file.
Default: Guest, the EGO consumer user.
mapred.job.priority.num
Specifies the scheduling priority for a MapReduce job. For Hadoop versions other than version 0.21.0, use mapreduce.job.priority.num.
Default: 5000
mapreduce.job.type
Specifies the type of MapReduce job. Valid values are RecoverableJob or UnrecoverableJob.
The job type can also be defined at the application level (specified as recoverable in the SessionType section of the application profile). If you define the job type in the application profile, the application profile setting overrides the pmr-site.xml setting.
A recoverable job can be used to preserve your workload under exceptional circumstances, such as a power failure or host failure. Recoverable jobs can incur extra overhead because the workload must be journaled. Specifying your jobs as recoverable may not be appropriate for all types of workload, since it can take less time to rerun all the tasks in the jobs rather than to recover and resume them. The time it takes to rerun or recover and resume tasks in a job varies with the data size and number of tasks.
Default: RecoverableJob
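As the trade-off above suggests, a short job that is cheap to rerun may not be worth journaling. A sketch of a pmr-site.xml fragment that opts out of recovery:

```xml
<!-- Sketch: skip workload journaling for jobs that are faster to rerun
     than to recover and resume. -->
<property>
  <name>mapreduce.job.type</name>
  <value>UnrecoverableJob</value>
</property>
```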
mapreduce.job.user.name
For internal system use only.
mapreduce.map.skip.commit.task
For internal system use only.
mapreduce.performance.log.enabled
For internal system use only.
mapreduce.progress.report.interval
Specifies the interval (in milliseconds) at which the progress of a job is reported to the cluster management console.
Default: 5000 (5 seconds)
mapreduce.reduce.merge.memtomem.enabled
Specifies whether memory-to-memory merge for internal data of a reduce task is enabled.
Default: false
mapreduce.reduce.merge.memtomem.threshold
Specifies the threshold at which memory-to-memory merge for internal data of a reduce task is triggered.
Default:
- Hadoop versions 0.20.x, 1.0.x, and 1.1.1: the value of io.sort.factor
- Hadoop version 0.21.0: the value of mapreduce.task.io.sort.factor
mapreduce.reduce.shuffle.connect.timeout
(For advanced users) Specifies the maximum duration (in milliseconds) that a reduce task tries connecting to the shuffle service (mrss) to fetch map output.
Default: 180000
Equivalent Hadoop property:
- Hadoop versions 0.20.x, 1.0.x, and 1.1.1: mapreduce.reduce.shuffle.connect.timeout
- Hadoop version 0.21.0: mapreduce.reduce.shuffle.connect.timeout
mapreduce.reduce.shuffle.merge.fetched.percent
Specifies the threshold at which an in-memory merge is initiated, expressed as a percentage of the total memory allocated to storing in-memory map outputs, as defined by this value: mapreduce.reduce.shuffle.merge.percent * mapred.job.shuffle.input.buffer.percent
Default: 0.8
mapreduce.submit.syncmode
Specifies whether the MapReduce job is submitted in synchronous or asynchronous mode.
Tasks associated with a MapReduce job are submitted to the application manager (SSM) in one of two ways: in synchronous mode or asynchronous mode. In synchronous mode, the job is aborted if the client disconnects from the application manager. In asynchronous mode, the job keeps running even if the client disconnects from the application manager.
Default: true
mapred.task.maxvmem
Specifies the maximum virtual memory for a task. If the task's memory usage exceeds the limit, the task is killed. The parameters mapred.job.map.memory.mb and mapred.job.reduce.memory.mb take precedence over mapred.task.maxvmem, if they are configured.
Default: None
max.connect.sd.times
The number of times the client attempts to get information about a MapReduce job from the session director (SD) before it considers the connection to have expired.
Default: 20
pmr.cache.split
Specifies caching of common data input splits for a map task of a MapReduce job. When cache-aware scheduling is enabled, input splits are cached at the first iteration of a job. Subsequent iterations of a job reuse the cached input split as input, instead of fetching the same data from HDFS. Valid values are on and off.
- PMR_MRSS_CACHE_PATH: Specifies a location on the local disk to store the map file of the input split. The default location is ${PMR_HOME}/work/datacache.
- PMR_MRSS_INPUTCACHE_MAX_MEMSIZE_MB: Specifies the maximum memory limit of the input split cache (in MB). The default value is 2 GB. If the size of the total memory cache does not exceed the configured size, the cache files are mapped to system memory and used as in-memory cache. If the size of the total memory cache exceeds the configured size, the cache files are not mapped to system memory but are instead used as on-disk cache.
- PMR_MRSS_INPUTCACHE_CLEAN_INTERVAL: Specifies the duration (in seconds) that a split is cached without being accessed by any job of any application. When a split cache exceeds this duration, the data input split (and its local disk file) are deleted from the MRSS cache. The default value is 3600 seconds.
Default: off
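For an iterative workload that reads the same input splits each iteration, split caching can be switched on. A sketch of a pmr-site.xml fragment (the PMR_MRSS_* settings listed above are tuned separately, outside this file):

```xml
<!-- Sketch: cache input splits at the first iteration; later iterations
     reuse the cached splits instead of re-reading HDFS. -->
<property>
  <name>pmr.cache.split</name>
  <value>on</value>
</property>
```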
pmr.consumer.name
Specifies the name of the consumer that is used to download JAR files and other related libraries for a job from the repository service (RS).
Default: None
pmr.decrease.map.output
For internal system use only.
pmr.debug.job.keep.failedtask.files
If there are failed tasks in a MapReduce job, specifies whether job-related working directories should remain on the system or be cleaned up.
- true: Indicates that if there are failed tasks in the job, job-related working directories will remain, rather than be cleaned up. This data can be used for debugging tasks. If set to true, these directories and files are retained at the following locations:
- Job and task working directory and files
- ${PMR_HOME}/work/AppName/JobID
- Footprint files
- ${PMR_HOME}/work/footprint/AppName/JobID
- Distributed cache files
- ${PMR_HOME}/work/distcache/_public (for public distributed cache)
- false: Indicates that job related working directories should not remain on the system, and that directories will be cleaned up after the job completes.
Default: false
pmr.debug.job.keep.failedtask.interdata
If there are failed tasks in a MapReduce job, specifies whether map task output intermediate data should remain on the system or be cleaned up.
- true: Indicates that if there are failed tasks in the job, map task output intermediate data directories will remain and not be cleaned up. This data can be used for debugging tasks. If set to true, these directories and files are retained at the following locations:
- Footprint files
- ${PMR_HOME}/work/footprint/AppName/JobID
- Map task output intermediate data
- ${PMR_HOME}/interdata/AppName/JobID
- false: Indicates that map task output intermediate data directories should not remain on the system, and that directories will be cleaned up after the job completes.
Default: false
pmr.debug.task.keepfiles.pattern
Specifies the regular expression (or type) of files that the system should retain if there are failed tasks in a MapReduce job. This data can be used for debugging tasks. If a file name matches this expression, the files and directories under the task working directory (${PMR_HOME}/work/AppName/ServiceIndex/) are saved under ${PMR_HOME}/work/AppName/save/sessionID/taskID.
- No value: The system does not retain any type of files. To specify no value, leave this property blank (that is, specify pmr.debug.task.keepfiles.pattern=).
- Regular expression of the file name: Files and directories whose names match the expression are retained, as described above.
Default: heapdump.*|javacore.*|Snap.*|core.*
pmr.fast.byte.comparison
Enables fast byte array comparison to decide on an ordering between two keys by looking at the byte streams and without parsing all the data contained therein.
Default: false
pmr.framework.aggregation
Specifies how the MapReduce framework must handle <key, value> pairs that are generated by a mapper inside a partition.
- sort: Uses sorting to order and aggregate the <key, value> pairs inside partitions generated by the mapper.
- hash: Uses a hash table structure to aggregate <key, value> pairs. With this setting, the framework provides a hash table to hold all the <key, value> pairs in a subpartition, puts all values of the same key together, passes them into the user reduce function, and finally cleans up the <key, value> pairs in the hash table to free up memory. Note: Hash-based aggregation requires all data to be stored in the hash table. As a result, hash-based aggregation fails if memory for the hash table is insufficient.
- none: Delegates aggregation to a user-overridden reducer class, rather than to the MapReduce framework. With this setting, <key, value> pairs passed to the reducer are out of order; all values for one key end up in the same reducer shard, but not necessarily in the same reduce() call.
Default: sort
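A sketch pairing hash-based aggregation with sub-partitioning, following the guidance under pmr.subpartition.num below (the values shown are illustrative; recall that hash aggregation fails if the hash table does not fit in memory):

```xml
<!-- Sketch: hash-based aggregation; finer-grained sub-partitions reduce
     reducer memory pressure (see pmr.subpartition.num). Example values only. -->
<property>
  <name>pmr.framework.aggregation</name>
  <value>hash</value>
</property>
<property>
  <name>pmr.subpartition.num</name>
  <value>64</value>
</property>
```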
pmr.framework.cpp
Specifies whether the C++ enhanced mapper and reducer are used instead of the Java implementation. The C++ enhanced mapper transforms, processes, and transfers map task outputs in serialized byte arrays from the mapper side to the reducer side.
- true: Use the C++ enhanced mapper and reducer instead of the Java implementation of the mapper and reducer.
- false: Use the Java implementation of the mapper and reducer functions.
The Java implementation is used, regardless of this setting, when the job uses any of the following features:
- Hash-based aggregation
- Custom map.sort.class
- Custom comparator mapred.output.key.comparator.class, except for the out-of-box com.platform.mapreduce.io.SerializedComparator
- Fast byte comparison
- sort.compare.prefix.key
- Partition-aware shuffling
Default: true
pmr.io.enhancement
For internal system use only.
pmr.io.enhancement.buffer.size
For internal system use only.
pmr.io.enhancement.estimate.size
For internal system use only.
pmr.io.read.enhancement
For internal system use only.
pmr.io.read.enhancement.size
For internal system use only.
pmr.job.jobfile.enabled
Specifies whether the job file is to be created when a job is submitted. Valid values are true or false. If set to false, the job file is not created and the Job.getJobFile() API returns an empty string. If set to true, the API returns the path to the job file; however, after the job is finished, the file must be deleted manually.
Default: false
pmr.job.package
Specifies the name of the user package that is used to download JAR files and other related libraries for a MapReduce job from the repository service (RS).
Default: None (set based on user examples)
pmr.job.package.version
Specifies the version of the user package that is used to download JAR files and other related libraries for a MapReduce job from the repository service (RS).
Default: None
pmr.job.map.resourceGroup.filter
Specifies a list of resource groups on whose resources map tasks can run. Map tasks from a job can only run on resources that belong to one of the resource groups listed in the filter.
Default: None (enabling map tasks to run on all resource groups specified for MapReduce workload)
pmr.job.reduce.resourceGroup.filter
Specifies a list of resource groups on whose resources reduce tasks can run. Reduce tasks from a job can only run on resources that belong to one of the resource groups listed in the filter.
Default: None (enabling reduce tasks to run on all resource groups specified for MapReduce workload)
pmr.job.reduce.service.to.slot.ratio
Specifies the number of slots required to run a reduce service instance, represented as 1:N or N:1, where N is a positive integer with a maximum value of 10.
Reduce tasks of a MapReduce job can run only on service instances that occupy the appropriate number of slots, which is determined by this ratio.
- Configure the exclusive slot allocation policy for resource groups that will be used by multi-slot tasks.
- If N slots are required to run some reduce tasks in a resource group, configure targetMrRatio in the MapReduce section of the application profile so that at least N slots are placed in the reducer pool on hosts in that resource group. For example, if all hosts in RG1 have eight slots but some reducers require six slots to run, configuring targetMrRatio as 1:1 causes the M:R ratio on the host to be 4:4, where four slots are not enough to run a reduce task. In this case, you could configure targetMrRatio as 1:3, which causes the M:R ratio on the host to be 2:6, giving enough slots to run a reduce task.
Default: 1:1
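For example, a non-default ratio is set like any other property. A sketch of a pmr-site.xml fragment (the 1:2 value is illustrative; adjust targetMrRatio in the application profile as described above so enough slots land in the reducer pool):

```xml
<!-- Example only: non-default service-to-slot ratio for reduce instances. -->
<property>
  <name>pmr.job.reduce.service.to.slot.ratio</name>
  <value>1:2</value>
</property>
```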
pmr.map.invoke.timeout.duration
Specifies the number of seconds the system waits for the invoke method to complete for a map task before a timeout is considered. If the map task does not read input, write output, or update its status string within the configured duration, the map task is killed. Valid values are between 0 and 4294967295. If you specify 0, the task never times out. If you specify a negative integer or a decimal, the system ignores the value and uses the value set for the invoke method in the Service section of the application profile (Service > Control > Method invoke > timeout > duration). When applications take an extended duration to process, set a higher timeout so that the system does not kill the task assuming that it has timed out.
Default: None
pmr.map.output.buffer.type
Specifies whether a dual buffer or circular buffer is used to temporarily store the map outputs.
- dual: Specifies that the map output buffer uses the dual buffer approach.
- circular: Specifies that the map output buffer uses the circular buffer approach.
Default: dual
pmr.ondemand.2nd.sort.mb
In dual memory buffers, specifies whether the second map sort buffer is to be allocated for the job. Dual buffers for map tasks are used to temporarily save intermediate data for map tasks. Valid values are true or false. When set to false, the second map sort buffer is always allocated. When set to true, the second map sort buffer is allocated only when needed.
Default: false
pmr.performance.instrument.level
Specifies whether the profiling tool is switched on or off for each job. When the profiling tool is switched on, counter information is collected and written into the MapReduce job and task counters history files while the job is running.
- 0: Profiling tool is switched off.
- 1: Profiling tool is switched on.
Default: 0
pmr.reduce.f2f.factor
Enables a concurrent file-to-file merge when the number of map output files collected on disk reaches pmr.reduce.f2f.factor * io.sort.factor - 1 and the primary merge thread (which is also triggered) is busy. The thread then merges the io.sort.factor files into one. Valid values are greater than 1 (exclusive) and less than or equal to 2 (inclusive).
Default: None
pmr.reduce.healthy.check
Enables the calculation of a reducer's stall times during the fetching stage. The allowed stall time is calculated based on the average time taken to fetch output from previous map tasks.
Use this parameter to check the progress of a reduce task, especially when a map task takes an extended period of time to complete. In this case, a reduce task may sometimes fail because it lost notification of the map task's completion or because the reduce fetch threads hang while copying output data. As a best practice, use this parameter only when the reducer's stall time is in a small deviation range. Do not use it when some reducers need to fetch the output of more map tasks than others.
Default: false
pmr.reduce.invoke.timeout.duration
Specifies the number of seconds the system waits for the invoke method to complete for a reduce task before a timeout is considered. If the reduce task does not read input, write output, or update its status string within the configured duration, the reduce task is killed. Valid values are between 0 and 4294967295. If you specify 0, the task never times out. If you specify a negative integer or a decimal, the system ignores the value and uses the value set for the invoke method in the Service section of the application profile (Service > Control > Method invoke > timeout > duration). When applications take an extended duration to process, set a higher timeout so that the system does not kill the task assuming that it has timed out.
pmr.reduce.multithread.num
The number of concurrent threads to be created to execute the MapReduce job. This setting is mandatory.
- 1: Indicates that no new threads will be created.
- Any positive integer: For example, a value of 2 means two threads will be used to execute the job.
- Set pmr.reduce.multithread.num to a larger number, but no more than a value of 10.
- Set pmr.reduce.multithread.num to be equal to the pmr.reduce.multithread.sample.min value.
To use this parameter, disable the hash aggregation feature by setting pmr.framework.aggregation to sort.
Default: 1
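Per the note above, multithreaded reduce requires sort-based aggregation. A sketch of a pmr-site.xml fragment (the thread count of 4 is an example; 10 is the stated maximum recommendation):

```xml
<!-- Sketch: run each reduce with 4 concurrent threads;
     hash aggregation must be disabled for this to apply. -->
<property>
  <name>pmr.framework.aggregation</name>
  <value>sort</value>
</property>
<property>
  <name>pmr.reduce.multithread.num</name>
  <value>4</value>
</property>
```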
pmr.reduce.multithread.sample.min
Number of sample keys required to be collected before MapReduce splits segments. The sample keys determine how to partition and create corresponding threads to execute MapReduce jobs. This setting is optional. To use this parameter:
- Set pmr.reduce.multithread.num to any positive integer higher than 1.
- Disable the hash aggregation feature by setting pmr.framework.aggregation to sort.
Valid values: 2 to 2147483647
- Set pmr.reduce.multithread.num to a larger number, but no more than a value of 10.
- Set pmr.reduce.multithread.num to be equal to the pmr.reduce.multithread.sample.min value.
Default: 10
pmr.ship.compress.enabled
Specifies whether files must be compressed before they are deployed to the compute host. These files are required by the user client.
Default: true
pmr.ship.hdfs.files.by.hdfs
When large data files required by a MapReduce job are stored in the distributed cache on the Hadoop Distributed File System (HDFS), specifies whether the MapReduce framework uses the files on the service side directly from the original HDFS location. Valid values are true or false.
Default: true
pmr.ship.internal.hdfs.working.dir
Specifies the working directory on HDFS, which is used to store intermediate files and which is cleared when the job is finished.
Default: None (set based on user examples)
pmr.ship.local.files.mode
When large data files required by a MapReduce job are stored in the distributed cache, specifies how those files are made available to compute hosts in time for tasks requiring those files to run.
- When the distributed cache files are located on a shared file system (such as NFS), the files on the shared file system are made available directly to compute hosts. In this case, the shared (NFS-mounted) file is not copied, but is available for access directly by the compute hosts.
- When the distributed cache files are located on the local file system, the files are copied from the local file system to the temp folder on HDFS. The temp folder is the working directory on HDFS and is used to store intermediate data for map tasks. The temp folder is defined by the pmr.ship.internal.hdfs.working.dir parameter and is cleared when the job finishes.
Default: None (set based on user examples)
pmr.shuffle.pas
For internal system use only.
pmr.shuffle.startpoint.map.percent
Specifies the percentage of a job's map tasks that must complete, across all hosts, before the shuffle phase of the job's reduce tasks starts. Valid values are between 0 and 100.
Default: 0
pmr.shuffle.startpoint.host.percent
Specifies the percentage of hosts on which a job's map tasks must have completed before the shuffle phase of the job's reduce tasks starts. Valid values are between 0 and 100.
Default: 0
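To illustrate the two shuffle thresholds together, this sketch delays the reduce-side shuffle until half the map tasks have finished and maps have completed on 80 percent of hosts (the values are examples only):

```xml
<!-- Sketch: delay reduce-side shuffle until enough map work has finished. -->
<property>
  <name>pmr.shuffle.startpoint.map.percent</name>
  <value>50</value>
</property>
<property>
  <name>pmr.shuffle.startpoint.host.percent</name>
  <value>80</value>
</property>
```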
pmr.subpartition.num
Specifies the number of sub-partitions into which the output of each map task must be split. When hash table-based aggregation is enabled, the framework splits partitions into sub-partitions, with each reduce task handling multiple sub-partitions for each map task output. Creating sub-partitions avoids memory issues that can occur when a reduce task emits outputs only at the end of a partition. With this parameter, the sub-partition is more fine-grained than the partition, so the reduce task emits outputs at the end of each sub-partition within a partition.
- To avoid memory issues, set pmr.subpartition.num to be greater than the number of tasks a reducer can run (mapred.reduce.num) and set pmr.framework.aggregation to hash or none.
- If you do not have memory concerns, set pmr.subpartition.num to equal mapred.reduce.num and set pmr.framework.aggregation to hash or none.
Default: None (if this parameter is not defined, the number of sub-partitions equals the number of reduce tasks)
pmr.userjar.in.package
Specifies whether user JAR files for a MapReduce job must be deployed through the repository service, instead of the SOAM common data deployment mechanism, to hosts requiring these files to run tasks. Valid values are true or false.
Default: false
sort.compare.prefix.key
Specifies whether the prefix key comparison for job metadata must be enabled.
Default: false