Customizing a MapReduce service for lifecycle events
This tutorial walks you through the code for a sample service, then guides you through the process of customizing the default MapReduce service.
About this task
The sample service executes a customized command at each of the following lifecycle points (a minimal sketch follows the list):
- onCreateService: when the customized MapReduce service starts
- onSessionEnter: when a MapReduce job starts
- onInvoke: when a map or reduce task within a job starts
- onSessionLeave: when a MapReduce job ends
- onDestroyService: when the customized MapReduce service terminates
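For orientation only, the following is a minimal sketch of what such a customized service looks like: a Java class that extends the MRService class and overrides the lifecycle handlers. The class name MyLifecycleService is illustrative, and import statements for the MapReduce API classes are omitted; the exact packages and the full, working implementation are shown in the sample that this tutorial walks through.
// Sketch only: see SamplePMRService.java for the complete, working sample.
// Imports for MRService, ServiceContext, SessionContext, TaskContext, and
// SoamException are omitted; use the packages shown in the sample source.
public class MyLifecycleService extends MRService {

    @Override
    public void onCreateService(ServiceContext serviceContext) throws SoamException {
        // runs when the customized MapReduce service starts
        super.onCreateService(serviceContext);
    }

    @Override
    public void onSessionEnter(SessionContext sessionContext) throws SoamException {
        // runs when a MapReduce job starts
        super.onSessionEnter(sessionContext);
    }

    @Override
    public void onInvoke(TaskContext taskContext) throws SoamException {
        // runs when a map or reduce task within a job starts
        super.onInvoke(taskContext);
    }

    @Override
    public void onSessionLeave() throws SoamException {
        // runs when a MapReduce job ends
        super.onSessionLeave();
    }

    @Override
    public void onDestroyService() throws SoamException {
        // runs when the customized MapReduce service terminates
        super.onDestroyService();
    }
}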
Procedure
- Build the sample service.
- Change to the root directory under the directory in which you installed IBM® Spectrum Symphony Developer Edition. For example, if you installed IBM Spectrum Symphony Developer Edition in the default installation directory, you will change to the /opt/ibm/platformsymphonyde/de731 directory.
- Set the environment:
- (csh) source cshrc.platform
- (bash) . profile.platform
- Change to the $SOAM_HOME/mapreduce/version/samples/CustomizedPMRService directory and run the make command:
make
By default, the sample is compiled with the Hadoop 0.21.0 API. If you want to compile the sample with a different Hadoop API version, change VERSION in the Makefile to that Hadoop API version. For example, set VERSION to 2_7_2 for Hadoop version 2.7.2.
- Package the sample service.
You must package the files required by your service to create a service package. When you built the sample, the service package was automatically created for you.
- The service package for your Hadoop version, pmr-CustomizedService-version.tar (which contains the service jar file and the run script), is in the $SOAM_HOME/mapreduce/version/samples/CustomizedPMRService directory.
If you used the default Hadoop 0.21.0 API, the service package in this folder will be named pmr-CustomizedService-0.21.0.tar.
- Deploy the sample service:
- Deploy the service package using the soamdeploy command. For example:
soamdeploy add SamplePMRService -p pmr-CustomizedService-0.21.0.tar -c /MapReduceConsumer/MapReduce731
- Check the list of deployed services to ensure that the service package has deployed, using the soamdeploy view command. For example:
soamdeploy view -c /MapReduceConsumer/MapReduce731
You should see the package you just deployed.
- Open the MapReduceversion.xml application profile under the $SOAM_HOME/mapreduce/version/os_type/profile directory.
- Edit the application profile:
- Change:
<Service default="true" ... description="The Map Reduce Service" name="MapReduceService">
to:
<Service default="true" ... description="The Map Reduce Service" name="SamplePMRService" packagename="SamplePMRService">
- Change:
<osType fileNamePattern="s" logDirectory="${SOAM_HOME}/mapreduce/logs/tasklogs" name="all" startCmd="${SOAM_HOME}/mapreduce/${PMR_VERSION}/${BINARY_TYPE}/etc/RunMapReduceService.sh" subDirectoryPattern="%applicationName%/%sessionId%/task_%taskId%" workDir="${PMR_HOME}/work/${SUB_WORK_DIR}">
to:
<osType fileNamePattern="s" logDirectory="${SOAM_HOME}/mapreduce/logs/tasklogs" name="all" startCmd="${SOAM_DEPLOY_DIR}/RunCustomizedPMRService.sh" subDirectoryPattern="%applicationName%/%sessionId%/task_%taskId%" workDir="${PMR_HOME}/work/${SUB_WORK_DIR}">
Note: If you are running the sample with Cloudera's Distribution Including Hadoop (CDH) version 3 update 5, change RunCustomizedPMRService.sh at $SOAM_HOME/mapreduce/version/samples/CustomizedPMRService. Edit the script in a text editor and add the handling for the cdh3u5 case shown below; the surrounding branches are included for context:
elif [ $HADOOP_VERSION = "cdh3u1" ]; then
    PMR_APP_DEP_JARFILES=${PMR_APP_DEP_JARFILES}:$PMR_HOME/$PMR_VERSION/$EGO_MACHINE_TYPE/lib/cloudera-cdh3u1/pmr-cloudera-plus-cdh3u1.jar:$PMR_HOME/$PMR_VERSION/$EGO_MACHINE_TYPE/lib/cloudera-cdh3u1/hadoop-core-0.20.2-cdh3u1.jar;
elif [ $HADOOP_VERSION = "cdh3u2" ]; then
    PMR_APP_DEP_JARFILES=${PMR_APP_DEP_JARFILES}:$PMR_HOME/$PMR_VERSION/$EGO_MACHINE_TYPE/lib/cloudera-cdh3u2/pmr-cloudera-plus-cdh3u2.jar:$PMR_HOME/$PMR_VERSION/$EGO_MACHINE_TYPE/lib/cloudera-cdh3u2/hadoop-core-0.20.2-cdh3u2.jar;
elif [ $HADOOP_VERSION = "cdh3u5" ]; then
    if [ -z $CLOUDERA_HOME ] || [ $CLOUDERA_HOME = "" ]; then
        echo "'\$CLOUDERA_HOME' is not defined. Set CLOUDERA_HOME in ${PMR_HOME}/conf/pmr-env.sh and in ${PMR_HOME}/${PMR_VERSION}/${BINARY_TYPE}/profile/MapReduce731.xml. After setting the value in the MapReduce731.xml application profile, reregister the profile for the changes to take effect."
        exit
    fi
    if [ ! -e $CLOUDERA_HOME/hadoop-core-0.20.2-cdh3u5.jar ]; then
        echo "hadoop-core-0.20.2-cdh3u5.jar does not exist. Ensure that the file is available under $CLOUDERA_HOME."
        exit
    fi
    PMR_APP_DEP_JARFILES=${PMR_APP_DEP_JARFILES}:$PMR_HOME/$PMR_VERSION/$EGO_MACHINE_TYPE/lib/cloudera-cdh3u5/pmr-cloudera-plus-cdh3u5.jar:$CLOUDERA_HOME/hadoop-core-0.20.2-cdh3u5.jar:${CLOUDERA_HOME}/lib/guava-r09-jarjar.jar;
    for f in $PMR_HOME/$PMR_VERSION/$EGO_MACHINE_TYPE/lib/hadoop-0.20.204/*.jar; do
        if [ $f != "$PMR_HOME/$PMR_VERSION/$EGO_MACHINE_TYPE/lib/hadoop-0.20.204/pmr-hadoop-hdfs-0.20.204.jar" ] && [ $f != "$PMR_HOME/$PMR_VERSION/$EGO_MACHINE_TYPE/lib/hadoop-0.20.204/pmr-hadoop-core-0.20.204.jar" ] && [ $f != "$PMR_HOME/$PMR_VERSION/$EGO_MACHINE_TYPE/lib/hadoop-0.20.204/pmr-hadoop-mapred-0.20.204.jar" ] && [ $f != "$PMR_HOME/$PMR_VERSION/$EGO_MACHINE_TYPE/lib/hadoop-0.20.204/hadoop-streaming-0.20.204.0.jar" ]; then
            PMR_APP_DEP_JARFILES=${PMR_APP_DEP_JARFILES}:$f;
        fi
    done
elif [ $HADOOP_VERSION = "1_0_0" ]; then
    for f in $PMR_HOME/$PMR_VERSION/$EGO_MACHINE_TYPE/lib/hadoop-1.0.0/*.jar; do
        PMR_APP_DEP_JARFILES=${PMR_APP_DEP_JARFILES}:$f;
    done
elif [ $HADOOP_VERSION = "1_1_1" ]; then
    for f in $PMR_HOME/$PMR_VERSION/$EGO_MACHINE_TYPE/lib/hadoop-1.1.1/*.jar; do
        PMR_APP_DEP_JARFILES=${PMR_APP_DEP_JARFILES}:$f;
    done
- Optional: Define the environment variables
corresponding to each lifecycle point in an application profile, for
example, MapReduce731.xml. These environment
variables define commands to execute the event handlers in the sample
on the host where the corresponding command is running and redirect
the output to a file (also set in an environment variable).
The following list describes the environment variables that the sample uses. If you do not define a variable, the default command is used.
- SYMMR_LC_CREATE_SERVICE_CMD: Command to execute when the service is created. The default command is echo SYMMR_LC_CREATE_SERVICE_CMD.
- SYMMR_LC_SESSION_ENTER_CMD: Command to execute when a job starts. The default command is echo SYMMR_LC_SESSION_ENTER_CMD.
- SYMMR_LC_INVOKE_CMD: Command to execute when a task (map or reduce) is invoked. The default command is echo SYMMR_LC_INVOKE_CMD.
- SYMMR_LC_SESSION_EXIT_CMD: Command to execute when a job is complete. The default command is echo SYMMR_LC_SESSION_EXIT_CMD.
- SYMMR_LC_DESTROY_SERVICE_CMD: Command to execute when the service is terminated. The default command is echo SYMMR_LC_DESTROY_SERVICE_CMD.
- SYMMR_LC_LOGFILE: Path to the file where the standard output of each command is logged. The location can be a known location on each cluster node or a location in shared storage, but it must be writable by the execution user on every node. The default value is /dev/null, meaning no log file is written.
If you edit the environment variables, note that each command must be executable on every host in the cluster and must consist only of the command and its arguments separated by spaces. If you need shell built-ins such as redirects, write a helper script and deploy it to a consistent location across the cluster. Edit the environment variables specified in the MapReduceversion.xml application profile in either of two ways:
- Manually edit the application profile under the
$SOAM_HOME/mapreduce/version/os_type/profile directory and
change the value of each variable under Service > osTypes > ostype > env.
For example, <env name="SYMMR_LC_INVOKE_CMD">echo onInvoke</env> would set the command to execute when a task is invoked to "echo onInvoke".
- Edit the application profile using the cluster management console and make the changes.
- Deploy the service package using the soamdeploy command.
- Register the MapReduceversion.xml application profile (for example, MapReduce731.xml) using the soamreg command:
For example:
soamreg $SOAM_HOME/mapreduce/version/os_type/profile/MapReduceversion.xml
- Confirm that the application is enabled using the soamview command.
For example:
soamview app MapReduceversion
If the profile is not enabled, enable the profile.
For example:
soamcontrol app enable MapReduceversion
- Run the sample service.
Before running the sample, set the environment variables described earlier if you want the lifecycle events to execute your own commands; otherwise the default echo commands are used.
Submit a MapReduce job; for example, WordCount:
mrsh jar $SOAM_HOME/mapreduce/version/os_type/samples/hadoop-mapred-examples-0.21.0.jar wordcount hdfs://namenodeAddress:9000/input hdfs://namenodeAddress:9000/output
Note: Ensure that HADOOP_VERSION in $SOAM_HOME/mapreduce/conf/pmr-env.sh uses the version corresponding to your Hadoop API version.
You should see output on the command line as work is submitted to the system.
- Walk through the code.
Review the sample code to learn how you can write a customized MapReduce service that can execute actions at various service, session (job), and individual task life cycle points.
- Locate the code sample, which is available on Linux 64-bit hosts at $SOAM_HOME/mapreduce/version/samples/CustomizedPMRService/com/platform/mapreduce/samples/SamplePMRService.java.
- Understand what the sample does. The sample defines a Java™ class that inherits the MRService class in the MapReduce API and overrides the following life cycle methods to provide customized behavior:
- onCreateService
- onSessionEnter
- onInvoke
- onSessionLeave
- onDestroyService
As the API documentation requires, each method calls the corresponding method in the superclass exactly once, and can execute customized code before and after that call. This customized code simply reads a command from one of the following environment variables and executes it:
- SYMMR_LC_CREATE_SERVICE_CMD
- SYMMR_LC_SESSION_ENTER_CMD
- SYMMR_LC_INVOKE_CMD
- SYMMR_LC_SESSION_LEAVE_CMD
- SYMMR_LC_DESTROY_SERVICE_CMD
- Understand the code in detail. The overall structure of the SamplePMRService class is as follows:
public class SamplePMRService extends MRService {
    @Override
    public void onCreateService(ServiceContext serviceContext) throws SoamException { ... }

    @Override
    public void onSessionEnter(SessionContext sessionContext) throws SoamException { ... }

    @Override
    public void onInvoke(TaskContext taskContext) throws SoamException { ... }

    @Override
    public void onSessionLeave() throws SoamException { ... }

    @Override
    public void onDestroyService() throws SoamException { ... }
}
The contents of each overridden method then look something like this; we'll take onCreateService as a typical example:
@Override
public void onCreateService(ServiceContext serviceContext) throws SoamException {
    try {
        // Any customized code except 'return'
    } catch (Throwable e) {
        throw new SoamException(e);
    }
    super.onCreateService(serviceContext);  // This has to be called once here
    try {
        // Any customized code except 'return'
    } catch (Throwable e) {
        throw new SoamException(e);
    }
}
In the case of our application, we simply add a call to a single helper method execCmd(), which has the following signature:
void execCmd(String, String, String) throws Throwable
The first argument is the command line to execute, the second argument is a complete writable path to the file to which the standard output of the command will be written, and the third is the name of the life cycle method from which it was called. Therefore, in the case of onCreateService, we call the following code:
this.execCmd(this.cmdline("SYMMR_LC_CREATE_SERVICE_CMD"), this.logfile(), "onCreateService");
where cmdline() and logfile() are themselves helper methods defined as follows:
private String cmdline(String var) {
    String cmd = System.getenv().get(var);
    if (cmd == null) {
        cmd = "echo " + var;
    }
    return cmd;
}
private String logfile() {
    String file = System.getenv().get("SYMMR_LC_LOGFILE");
    if (file == null) {
        file = "/dev/null";
    }
    return file;
}
In other words, they attempt to get a command line or a log file path from the appropriate environment variable, or failing that return default values.
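The remaining life cycle methods apply the same pattern with their own environment variable and label. For example, the onInvoke override might look like the following; this is a sketch that follows the template shown above rather than a verbatim copy of the sample source:
@Override
public void onInvoke(TaskContext taskContext) throws SoamException {
    try {
        // Execute the command configured for task invocation (SYMMR_LC_INVOKE_CMD),
        // logging its output to the file named by SYMMR_LC_LOGFILE
        this.execCmd(this.cmdline("SYMMR_LC_INVOKE_CMD"), this.logfile(), "onInvoke");
    } catch (Throwable e) {
        throw new SoamException(e);
    }
    super.onInvoke(taskContext);  // This has to be called exactly once
}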
Finally, the execCmd() method itself is then defined as follows:
 1: private void execCmd(String cmdline,
 2:                      String outfile,
 3:                      String calledFrom) throws Throwable {
 4:     try {
 5:         String line = null;
 6:         String cmd[] = cmdline.split(" ");
 7:         Process pr = Runtime.getRuntime().exec(cmd);
 8:         BufferedReader in = new BufferedReader(
 9:             new InputStreamReader(pr.getInputStream()));
10:         BufferedWriter out = new BufferedWriter(new FileWriter(outfile, true));
11:
12:         out.write(now() + " ==== " + calledFrom +
13:                   " -- BEGIN <" + cmdline + ">\n");
14:         while ((line = in.readLine()) != null) {
15:             out.write(now() + " " + line + "\n");
16:         }
17:         out.write(now() + " ==== END <" + cmdline + ">\n");
18:         out.close();
19:     } catch (Throwable e) {
20:         throw e;
21:     }
22: }
Simply put, lines 5 to 10 set up the standard Java runtime execution context and the input and output buffers for the program given by the cmdline parameter. Lines 12 to 18 log the command name and the command's standard output to the log file given by the outfile parameter.
The now() helper method simply returns a timestamp representing the current time; it is defined as follows:
private String now() {
    return new Date().toString();
}
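As a design note, the sample's execCmd() splits the command line on spaces and captures only standard output. If you also want standard error in the log, a variant based on the standard java.lang.ProcessBuilder API could be used instead. The following is a minimal sketch, not part of the shipped sample; it would live in the same class alongside execCmd() and additionally needs java.util.Arrays and java.util.Date imports:
// Sketch of an alternative execCmd() that merges standard error into standard output.
private void execCmdMergedStreams(String cmdline, String outfile, String calledFrom) throws Throwable {
    ProcessBuilder pb = new ProcessBuilder(Arrays.asList(cmdline.split(" ")));
    pb.redirectErrorStream(true);  // interleave stderr with stdout
    Process pr = pb.start();
    try (BufferedReader in = new BufferedReader(new InputStreamReader(pr.getInputStream()));
         BufferedWriter out = new BufferedWriter(new FileWriter(outfile, true))) {
        out.write(new Date() + " ==== " + calledFrom + " -- BEGIN <" + cmdline + ">\n");
        String line;
        while ((line = in.readLine()) != null) {
            out.write(new Date() + " " + line + "\n");
        }
        out.write(new Date() + " ==== END <" + cmdline + ">\n");
    }
}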