Demonstrating memory shared between tasks in the MapReduce framework
This tutorial guides you through the process of using the shared memory sample to demonstrate how memory is shared between tasks in the MapReduce framework, so as to significantly reduce the service footprint.
About this task
- DataGenBinary: A Java™ executable that takes an input x file size and generates a binary file around the same size.
- DataReadBinary: A MapReduce application that generates n tasks to read and verify the binary file generated by DataGenBinary. The DataReadBinary also uses the SharedMemoryWrapper API to read the file from the map processes.
- Build the sample
- Package the sample
- Run the sample
- Walk through the code
Procedure
- Build the sample.
- Change to the root directory under the directory in which you installed IBM® Spectrum Symphony Developer Edition. For example, if you used the default installation directory, you will change to the opt/ibm/platformsymphonyde/de731 directory.
- Set the environment:
- (csh) source cshrc.platform
- (bash) . profile.platform
-
Change to the
$SOAM_HOME/mapreduce/version/samples/FileChannelAndMappedBufferWrapper
directory and run the make command:
make
By default, the sample is compiled with the Hadoop 0.21.0 API. If you want to compile the sample with a different Hadoop API version, change VERSION in the Makefile to the Hadoop API version. For example, Hadoop 2_7_2 = version 2.7.2
- Package the sample.
You must package the files to create a service package. When you built the sample, the service package was automatically created for you.
- The service package for your Hadoop version pmr-FileChannelAndMappedBufferWrapper-version.jar is in the following directory:
$SOAM_HOME/mapreduce/version/samples/FileChannelAndMappedBufferWrapper
If you used the default Hadoop 0.21.0 API, the service package in this folder will be named pmr-FileChannelAndMappedBufferWrapper-0.21.0.jar.
- The service package for your Hadoop version pmr-FileChannelAndMappedBufferWrapper-version.jar is in the following directory:
- Run the sample.
- Run DataGenBinary using the following
command:
java -cp pmr-FileChannelAndMappedBufferWrapper-version.jar com.platform.mapreduce.samples.DataGenBinary xxxx
where:- version is the Hadoop API version you used to build the sample; for example, 0.21.0.
- xxxx specifies the file size (in MB).
The binaryData.dat data file should appear in the current directory.
- Run DataReadBinary using the following
command:
java -cp $SOAM_HOME/mapreduce/version/os_type/lib/hadoop-version/pmr-hadoop-mapred-version.jar:pmr-FileChannelAndMappedBufferWrapper-version.jar com.platform.mapreduce.samples.DataReadBinary
where version is the Hadoop API version you used to build the sample; for example, 0.21.0.
- To run the sample on the grid cluster:
- Copy the data file binaryData.dat to the $SOAM_HOME/mapreduce directory on every node in the cluster.
- Log on to the primary node and
run:
mrsh jar pmr-FileChannelAndMappedBufferWrapper-version.jar com.platform.mapreduce.samples.DataReadBinaryMap hdfs://namenodeAddress:9000/input hdfs://namenodeAddress:9000/output
where:- version is the Hadoop API version you used to build the sample; for example, 0.21.0
- input and output are the input and output directories on HDFS for the standard WordCount workload
The framework will run WordCount and read the $SOAM_HOME/mapreduce/binaryData.dat file once for each map process generated. It then logs the output to the $SOAM_HOME/mapreduce/logs/DataGenBinaryMap.log file.
- Run DataGenBinary using the following
command:
- Walk through the code.
Review the FileChannelAndMappedBufferWrapper sample code to learn how you can write MapReduce applications that can share memory across tasks.
-
Locate the code samples for Linux 64-bit hosts:
Table 1. Code samples File Llocation of code sample Data generation tool $SOAM_HOME/mapreduce/version/samples/FileChannelAndMappedBufferWrapper/com/platform/mapreduce/samples/DataGenBinary.java Standalone data reading tool $SOAM_HOME/mapreduce/version/samples/FileChannelAndMappedBufferWrapper/com/platform/mapreduce/samples/DataReadBinary.java Clustered data reading tool $SOAM_HOME/mapreduce/version/samples/FileChannelAndMappedBufferWrapper/com/platform/mapreduce/samples/DataReadBinaryMap.java - Understand what the sample does.
The sample provides a tool to write a binary data file in a particular format using the standard Java DataOutputStream interface (DataGenBinary), and two tools to read and validate it using IBM Spectrum Symphony's shared memory API (rather than Java's DataInputStream interface) in single process and cluster mode respectively (DataReadBinary and DataReadBinaryMap).
The cluster-mode implementation (DataReadBinaryMap) is identical to Apache's WordCount v1.0 example in Hadoop MapReduce, except that each map task runs the DataReadBinary functionality in its constructor.
- Understand code detail for DataGenBinary. This class comprises a single main method taking an integer (
mb
) as its first argument. The program first sets up a standard Java binary output stream to write to a file in the current directory named binaryData.dat:File f = new File("binaryData.dat"); FileOutputStream fileOutputStream = new FileOutputStream(f); BufferedOutputStream bufferedStream = new BufferedOutputStream(fileOutputStream); DataOutputStream file = new DataOutputStream(bufferedStream);
It then calculates the total size of the file to be generated and initializes some counters:long fileSize = 0; long iterationNumber = 0; long maxSize = 1024 * 1024 * mb;
Next, it iterates through a loop writing one each of a Long, an Integer, a Short, a Float, a Double, a Character, and a Byte to the file using standard Java API calls. It also increments the file size and iteration counters appropriately as it goes along. Each write increments the file size by <TYPE>.SIZE / Byte.SIZE, where <TYPE> is the type of the written datum:while (fileSize < maxSize) { iterationNumber++; file.writeLong(iterationNumber); fileSize += (Long.SIZE / Byte.SIZE); file.writeInt((int)(iterationNumber % Integer.MAX_VALUE)); fileSize += (Integer.SIZE / Byte.SIZE); file.writeShort((short)(iterationNumber % Short.MAX_VALUE)); fileSize += (Short.SIZE / Byte.SIZE); file.writeFloat((float)iterationNumber); fileSize += (Float.SIZE / Byte.SIZE); file.writeDouble((double)iterationNumber); fileSize += (Double.SIZE / Byte.SIZE); file.writeChar((char)(iterationNumber % Character.MAX_VALUE)); fileSize += (Character.SIZE / Byte.SIZE); file.writeByte((byte)(iterationNumber % Byte.MAX_VALUE)); fileSize += (Byte.SIZE / Byte.SIZE); }
Note that the value written for each type is the current iteration number (modulo the types MAX_VALUE where appropriate); this is used later in the validation stage.
Finally, once the file is large enough, the file descriptor is closed and a message is printed for the user:file.close(); System.out.println("Number of iterations to write file: " + iterationNumber);
- Understand code detail for DataReadBinary.
This tool performs the exact inverse of DataGenBinary: it looks for a file named binaryData.dat in the current working directory and reads values from it in the order described previously, doing a few consistency checks as it goes along.
The notable oddity is that, rather than using a standard Java API to open the file (which may be very large), it uses the SharedMemoryMapper to do so:MappedBigByteBuffer buffer = SharedMemoryMapper.getSharedDataBuffer("binaryData.dat");
Now it can use methods similar to the standard Java ones (getLong, getInt, etc...) to retrieve the data from the file. It iterates over the file, reading values of various types in the same order as written by DataGenBinary, incrementing iterationNumber and fileSize counters as before until it reaches the buffer's capacity:long iterationNumber = 0; long fileSize = 0; while (fileSize < buffer.capacity()) { iterationNumber++; // Read Long value {...} // Read Integer value {...} // Read Short value {...} // Read Float value {...} // Read Double value {...} // Read Character value {...} // Read Byte value {...} //Print read progress if ((iterationNumber % (1024 * 1024)) == 0) { System.out.println("Successfully read: " + iterationNumber); } }
Each read performs a number of checks on the datum that it read from the file:- It reads the value at the current position twice to make sure the read does not corrupt the data
- It verifies that the value is equal to the current iteration number (module the read type's MAX_VALUE where appropriate)
- It calculates the expected current buffer position by incrementing fileSize by <TYPE>.SIZE / Byte.SIZE and ensuring that it equals the actual buffer position.
If any of these checks fail, an Exception is thrown and the program terminates. Here is an example of the code for reading a Long:long prevPosition = buffer.position(); long val = buffer.getLong(); long val1 = buffer.getLong(prevPosition); if (val != val1) { throw new Exception("Long: val=" + val + " does not match val1=" + val1 + " at prevPosition " + prevPosition); } long expectedVal = iterationNumber; if (val != iterationNumber) { throw new Exception("Long: Expected value " + expectedVal + ". Found value " + val); } fileSize += (Long.SIZE / Byte.SIZE); if (fileSize != buffer.position()) { throw new Exception("Long: Expected position " + fileSize + " does not match buffer position " + buffer.position()); }
- Understand code detail for DataReadBinaryMap. As previously explained, the code for this class is identical to the code from the WordCount v1.0 example of the Apache Hadoop MapReduce tutorial, with the exception that the constructor for the Map class is as follows:
public Map() { try { this.readfile(); } catch (Exception e) { try { this.log(e.getMessage()); } catch (IOException ex) {} } }
where the log() method is a private helper method to simply output a message to a (hard-coded) file in the MapReduce log directory:private void log(String msg) throws IOException { String file = System.getenv().get("PMR_HOME") + "/logs/DataReadBinaryMap.log"; BufferedWriter out = new BufferedWriter(new FileWriter(file,true)); out.write(now() + " " + msg + "\n"); out.close();}
And the readfile() method's contents is essentially identical to the those of DataReadBinary's main() method except that it looks for the input file in $PMR_HOME and writes all output to the log file described previously:private void readfile() throws Exception { MappedBigByteBuffer buffer = SharedMemoryMapper.getSharedDataBuffer( System.getenv().get("PMR_HOME") + "/binaryData.dat"); long it = 0; long sz = 0; while (sz < buffer.capacity()) { it++; // Long {...} // Integer {...} // Short {...} // Float {...} // Double {...} // Character {...} // Byte {...} if ( (it % (1024 * 1024)) == 0 ) { this.log("Read: " + it); } this.log("Total: " + it);}
-
Locate the code samples for Linux 64-bit hosts: