Concurrent programming with Boost using IPC and MPI libraries

The Boost C++ libraries make concurrent programming fun and straightforward. Learn how to use two of the Boost libraries—the Interprocess (IPC) library and the Message Passing Interface (MPI)—to implement functionality, such as shared memory objects, synchronized file locking, and distributed communication.

Arpan Sen (arpansen@gmail.com), Independent author

Arpan Sen is a lead engineer working on the development of software in the electronic design automation industry. He has worked on several flavors of UNIX, including Solaris, SunOS, HP-UX, and IRIX as well as Linux and Microsoft Windows for several years. He takes a keen interest in software performance-optimization techniques, graph theory, and parallel computing. Arpan holds a post-graduate degree in software systems. You can reach him at arpansen@gmail.com.



03 May 2011


Concurrent programming using the extremely popular Boost libraries is a lot of fun. Boost has several libraries within the concurrent programming space—the Interprocess (IPC) library for shared memory, memory-mapped I/O, and message queues; the Thread library for portable multi-threading; the Message Passing Interface (MPI) library for message passing, which finds use in distributed computing; and the Asio library for portable networking using sockets and other low-level functions, to name just a few. This article introduces the IPC and MPI libraries along with some of the functionality they offer.

In this article, you learn how to use the Boost IPC library to implement shared memory objects, message queues, and synchronized file locking. Using the Boost MPI library, you learn about the environment and the communicator classes and how you can achieve distributed communication.

Note: The code in this article was tested using the gcc-4.3.4 and boost-1.45 packages.

Frequently used acronyms

  • API: Application programming interface
  • I/O: Input/output
  • POSIX: Portable Operating System Interface for UNIX®
  • SDK: Software development kit

Using the Boost IPC library

Boost Interprocess is a header-only library, so all you need to do is include the appropriate header in your sources and make the compiler aware of the include path. This is a nifty feature to have; you just download the Boost sources (see Resources for a link), and you're ready to get started. For example, to use shared memory in your code, use the include shown in Listing 1.

Listing 1. The Boost IPC library is a header-only affair
#include <boost/interprocess/shared_memory_object.hpp>
using namespace boost::interprocess; 
//… your sources follow …

When you pass the include path to the compiler, adjust it to match your installation. Then, compile the code:

bash-4.1$  g++ ipc1.cpp -I../boost_1_45_0

Create a shared memory object

Let's begin with the customary "Hello World!" program. You have two processes: The first writes the string "Hello World!" into shared memory, and the second reads and displays it. Create your shared memory object as shown in Listing 2.

Listing 2. Creating the shared memory object
#include <boost/interprocess/shared_memory_object.hpp>

int main(int argc, char* argv[ ]) 
{
    using namespace boost::interprocess; 
    try { 
    // creating our first shared memory object.
    shared_memory_object sharedmem1 (create_only, "Hello", read_write);

    // setting the size of the shared memory
    sharedmem1.truncate (256);

    // … more code follows
    } catch (interprocess_exception& e) { 
    // .. .  clean up 
    } 
}

The object sharedmem1 is of type shared_memory_object (declared and defined in Boost headers) and takes three arguments in its constructor:

  • The first argument—create_only—means that the shared memory object will be created only if it does not already exist. If a shared object of the same name already exists, an exception is thrown. A process that wants access to an already-created shared memory object passes open_only as the first argument.
  • The second argument—Hello—is the name of the shared memory region. Another process that accesses this shared memory uses this name for the access.
  • The third argument—read_write—is the access specifier of the shared memory object. Because this process modifies the contents of the shared memory object, you use read_write. A process that only reads from this shared memory uses the read_only specifier for access.

The truncate method sets the size of the shared memory in bytes. The code should ideally be wrapped in try-catch blocks. For example, if the shared memory object cannot be created, an exception of type boost::interprocess::interprocess_exception is thrown.

Using the shared memory object for writing

For a process to use a shared memory object, the process has to map the object in its address space. The mapping is done using the mapped_region class declared and defined in the header mapped_region.hpp. Another benefit of using mapped_region is that both full and partial access to the shared memory object is possible. Listing 3 shows how to use your mapped_region.

Listing 3. Using mapped_region to access shared memory objects
#include <boost/interprocess/shared_memory_object.hpp>
#include <boost/interprocess/mapped_region.hpp>
#include <cstring>

int main(int argc, char* argv[ ]) 
{
    using namespace boost::interprocess; 
    try { 
    // creating our first shared memory object.
    shared_memory_object sharedmem1 (create_only, "Hello", read_write);

    // setting the size of the shared memory
    sharedmem1.truncate (256);

    // map the shared memory into the current process's address space
    mapped_region region (sharedmem1, read_write); 

    // access the mapped region using get_address
    std::strcpy(static_cast<char*>(region.get_address()), "Hello World!\n");
    
    } catch (interprocess_exception& e) { 
    // .. .  clean up 
    } 
}

That's about it, really. You have created your mapped_region object and accessed it using the get_address method. The static_cast is necessary because get_address returns a void*.

What happens to the shared memory when main exits?

The shared memory is not deleted when the process exits. To delete shared memory, you need to call shared_memory_object::remove. The access mechanism for process 2 is simple enough: Listing 4 proves this point.

Listing 4. Accessing the shared memory object from the second process
#include <boost/interprocess/shared_memory_object.hpp>
#include <boost/interprocess/mapped_region.hpp>
#include <cstring>
#include <cstdlib>
#include <iostream>

int main(int argc, char *argv[ ])
{
      using namespace boost::interprocess; 
      try { 
      // opening an existing shared memory object 
      shared_memory_object sharedmem2 (open_only, "Hello", read_only);

      // map shared memory object in current address space
      mapped_region mmap (sharedmem2, read_only);

      // need to type-cast since get_address returns void* 
      char *str1 = static_cast<char*> (mmap.get_address());
      std::cout << str1 << std::endl;
      } catch (interprocess_exception& e) { 
          std::cout << e.what( ) << std::endl;
      } 
      return 0;
}

In Listing 4, you open the existing shared memory object using the open_only and read_only attributes. If the shared memory object cannot be found, an exception is thrown. Now, build and run the code in Listing 3 and Listing 4. You should see "Hello World!" in your terminal.

Next, add the following lines in the code for the second process (Listing 4) just after std::cout, and rebuild the code:

// std::cout code here
shared_memory_object::remove("Hello");
// } catch(interprocess_exception& e) {

Execute the code twice in succession. The second run prints the line "No such file or directory," confirming that the shared memory has been deleted.


Interprocess communication using message queue

Now, let's explore another popular mechanism for interprocess communication: the message queue. Each communicating process may add messages to the queue and read messages from the queue. The message queue comes with the following properties:

  • It has a name, and processes access it using the given name.
  • During queue creation, the user must specify the maximum length of the queue and the maximum size of an individual message.
  • The queue is persistent, which means that it remains in memory when the process that created it dies. The queue may be removed using an explicit call to boost::interprocess::message_queue::remove.

Listing 5 shows a code snippet in which a process has created a message queue of 20 integers.

Listing 5. Creating a message queue of 20 integers
#include <boost/interprocess/ipc/message_queue.hpp>
#include <iostream> 

int main(int argc, char* argv[ ]) 
{
    using namespace boost::interprocess;
    try { 
      // creating a message queue
      message_queue mq (create_only,   // only create
                        "mq",          // name
                        20,            // max message count
                        sizeof(int)    // max message size
                        );
       // … more code follows
    } catch (interprocess_exception& e) { 
       std::cout << e.what( ) << std::endl;
    } 
}

Note the create_only attribute passed in the constructor for message_queue. As with shared memory objects, a process that opens an existing message queue passes the open_only attribute to the constructor.

Sending and receiving data

On the sending side, you use the send method of the queue to add data. The send method signature has three inputs: a pointer to the raw data (void*), the size of the data, and a priority. For now, send all the numbers with the same priority. Listing 6 shows the code.

Listing 6. Sending messages to the queue
#include <boost/interprocess/ipc/message_queue.hpp>
#include <iostream> 

int main(int argc, char* argv[ ]) 
{
    using namespace boost::interprocess;
    try { 
      // creating a message queue
      message_queue mq (create_only,   // only create
                        "mq",          // name
                        20,            // max message count
                        sizeof(int)    // max message size
                        );
      // now send the messages to the queue
      for (int i=0; i<20; ++i) 
        mq.send(&i, sizeof(int), 0); // the 3rd argument is the priority 
    } catch (interprocess_exception& e) { 
        std::cout << e.what( ) << std::endl;
    } 
}

On the receiving side, the queue takes in the open_only attribute. The individual messages are obtained from the queue by calling the receive method of the message_queue class. Listing 7 shows the receive method signature.

Listing 7. Method signature for message_queue::receive
void receive (void *buffer,
              std::size_t buffer_size,
              std::size_t &recvd_size,
              unsigned int &priority
             );

Let's decipher this a bit. The first argument is where the received data from the queue will be stored. The second argument is the size of that buffer, which must be at least as large as the queue's maximum message size. The third argument is set to the actual size of the received message, and the fourth argument is set to the priority of the received message. If the actual received size differs from the size you expect, treat that as an error. Listing 8 provides the code for the receiver process.

Listing 8. Receiving messages from the message queue
#include <boost/interprocess/ipc/message_queue.hpp>
#include <iostream> 

int main(int argc, char* argv[ ]) 
{
    using namespace boost::interprocess;
    try { 
      // opening the message queue whose name is mq
      message_queue mq (open_only,     // only open
                                       "mq"               // name
                                        );
      size_t recvd_size; 
      unsigned int priority; 

      // now receive the messages from the queue
      for (int i=0; i<20; ++i) { 
        int buffer; 
        mq.receive ((void*) &buffer, sizeof(int), recvd_size, priority); 
        if (recvd_size != sizeof(int)) 
            ; // do the error handling
        std::cout << buffer << " " << recvd_size << " " << priority << std::endl;
      } 
    } catch (interprocess_exception& e) { 
        std::cout << e.what( ) << std::endl;
    } 
}

That was reasonably simple. Note that you still have not removed the message queue from memory; much like the shared memory object, this queue is persistent. For removing the queue, add the following line whenever you are done using the queue:

message_queue::remove("mq"); // remove the queue using its name

Message priority

Make the modification shown in Listing 9 on the sending side. The receiver code needs no changes.

Listing 9. Changing the priority of messages
      message_queue::remove("mq"); // remove the old queue
      message_queue mq (…); // create as before
      for (int i=0; i<20; ++i) 
        mq.send(&i, sizeof(int), i%2); // the 3rd argument is the priority
      // … rest as usual

On re-running the code, you should see the output provided in Listing 10.

Listing 10. The output as seen in the receiving process
1 4 1
3 4 1
5 4 1
7 4 1
9 4 1
11 4 1
13 4 1
15 4 1
17 4 1
19 4 1
0 4 0
2 4 0
4 4 0
6 4 0
8 4 0
10 4 0
12 4 0
14 4 0
16 4 0
18 4 0

Higher-priority messages are delivered to the second process first, as Listing 10 confirms.


Synchronized access to a file

Shared memory and message queues are fine, but file I/O is also an important tool that processes use to communicate with each other. Synchronizing file accesses used by concurrent processes to communicate is not an easy task, but the file-locking capability from the Boost IPC library does make life simpler. Before any further explanation, look at Listing 11 to understand how a file_lock object works.

Listing 11. Using a file_lock object for synchronizing file accesses
#include <fstream> 
#include <iostream> 
#include <boost/interprocess/sync/file_lock.hpp> 
#include <cstdlib>

int main() 
{ 
    using namespace boost::interprocess; 
    std::string fileName("test"); 
    std::fstream file;

    file.open(fileName.c_str(), 
              std::ios::out | std::ios::binary | std::ios::trunc); 
    if (!file.is_open() || file.bad()) 
    { 
        std::cout << "Open failed" << std::endl; 
        exit(-1); 
    }

    try { 
    file_lock f_lock(fileName.c_str());
    f_lock.lock();
    std::cout << "Locked in Process 1" << std::endl;
    file.write("Process 1", 9);
    file.flush(); 
    f_lock.unlock();
    std::cout << "Unlocked from Process 1" << std::endl;
    } catch (interprocess_exception& e) { 
    std::cout << e.what( ) << std::endl;
    }

    file.close();
    return 0; 
}

This code first opens a file, then locks it using file_lock. On completion of the writing, it flushes the file buffers and unlocks the file. You use the lock method to gain exclusive access to the file. If another process is also trying to write to the file and has already invoked lock, the second process waits until the first process voluntarily relinquishes the lock using unlock. The constructor for the file_lock class accepts the name of the file to be locked, and the file must exist before the file_lock is constructed; otherwise, an exception is thrown.

Now, copy the code in Listing 11 and make some changes to it. Specifically, make it the second process that's requesting the lock. Listing 12 shows the relevant changes.

Listing 12. Code for the second process trying to access the file
    // .. as in Listing 11
    file_lock f_lock(fileName.c_str());
    f_lock.lock();
    std::cout << "Locked in Process 2" << std::endl;
    system("sleep 4"); 
    file.write("Process 2", 9);
    file.flush(); 
    f_lock.unlock();
    std::cout << "Unlocked from Process 2" << std::endl;
    // file.close();

Now, if these two processes are run concurrently, you expect the first process to wait 4 seconds before acquiring the file_lock 50 percent of the time, all other things being equal.

Here are a few things you must remember when using file_lock. You're talking about interprocess communication here, with emphasis on process. This means that you're not supposed to use file_lock to synchronize data accesses by threads of the same process. On POSIX-compliant systems, file handles are process and not thread attributes. Here are a few guidelines for using file locking:

  • Use a single file_lock object per file per process.
  • Use the same thread to lock and unlock a file.
  • Flush data in writer processes before unlocking a file, either by calling C's fflush library routine or by calling the flush method (if you prefer C++ fstreams).

Using file_lock with scoped locks

It is possible that during program execution some exception is thrown, and the file is not unlocked. Such an occurrence might result in undesirable program behavior. To avoid this situation, consider wrapping the file_lock object in a scoped_lock, defined in boost/interprocess/sync/scoped_lock.hpp. Using scoped_lock, you don't need to explicitly lock or unlock the file; the locking occurs inside the constructor, and the unlocking happens automatically whenever you exit the scope. Listing 13 shows the modification to Listing 11 to make it use scoped locks.

Listing 13. Using scoped_lock with file_lock
#include <boost/interprocess/sync/scoped_lock.hpp>
#include <boost/interprocess/sync/file_lock.hpp>

//… code as in Listing 11
file_lock f_lock(fileName.c_str());
scoped_lock<file_lock> s_lock(f_lock);  // internally calls f_lock.lock( ); 

// No need to call explicit lock anymore
std::cout << "Locked in Process 1" << std::endl;
file.write("Process 1", 9);
// … code as in Listing 11

Note: See Resources for links to more information on the Resource Acquisition Is Initialization (RAII) programming idiom.


Learning Boost MPI

If you are not already familiar with the Message Passing Interface, before delving into Boost MPI, you should briefly check out the links to MPI resources provided in the Resources section. MPI is an easy-to-use standard that works on the model of processes communicating with each other by passing messages. You don't need to use sockets or other low-level communication primitives; the MPI back end manages all the hard work. So, where does Boost MPI fit in? The creators of Boost MPI have provided an even higher level of abstraction and a simple set of routines built on top of the MPI-provided API, such as MPI_Init and MPI_Bcast.

Boost MPI is not a stand-alone library that you simply download and build before getting to work. Instead, you must install one of the MPI implementations, such as MPICH or Open MPI, and build the Boost Serialization library as well. For details on how to build Boost MPI, see Resources. Typically, you would use the following command to build Boost MPI:

bash-4.1$ bjam --with-mpi

Windows® users can download the pre-built libraries for MPI from BoostPro (see Resources). The libraries are compatible with Microsoft® HPC Pack 2008 and 2008 R2 (see Resources) and work on Windows XP with Service Pack 3 and later client operating systems.


Hello World with MPI

There are two primary classes in the Boost MPI library that you must learn: the environment class and the communicator class. The former is responsible for the initialization of the distributed environment; the latter is used for communicating between processes. Because we're talking about distributed computing here, let's have four processes all printing "Hello World" to the terminal. Listing 14 shows the code.

Listing 14. Hello World using Boost MPI
#include <boost/mpi.hpp>
#include <iostream>

int main(int argc, char* argv[])
{
  boost::mpi::environment env(argc, argv);
  boost::mpi::communicator world;

  std::cout << argc << std::endl;
  std::cout << argv[0] << std::endl;
  std::cout << "Hello World! from process " << world.rank() << std::endl;

  return 0;
}

Now build the code in Listing 14 with proper linking to the Boost MPI and Serialization libraries. Run the executable at the shell prompt. You should see "Hello World! from process 0". Next, use your MPI dispatcher tool—for example, mpirun for Open MPI users and mpiexec for Microsoft HPC Pack 2008—and run the executable as:

mpirun -np 4 <executable name> 

OR

mpiexec -n 4 <executable name>

You should now see something like Listing 15, with mympi1 being the executable name.

Listing 15. Output from running the MPI code
1
mympi1
Hello World! from process 3
1
mympi1
1
mympi1
Hello World! from process 1
Hello World! from process 2
1
mympi1
Hello World! from process 0

There you have it. Within the MPI framework, four copies of the same process have been created. Within the MPI environment, each process has its own unique ID, as determined by the communicator object. Now, try communicating between the processes. Have one process communicate with another using the send and receive function calls. Call the process sending the message the master process and the processes receiving it the worker processes. The source code is the same for the master and the workers; the functionality is decided using the rank that the world object provides (see Listing 16).

Listing 16. Code for processes 0, 1, and 2 communicating with each other
#include <boost/mpi.hpp>
#include <iostream>

int main(int argc, char* argv[]) 
{
  boost::mpi::environment env(argc, argv);
  boost::mpi::communicator world;

  if (world.rank() == 0) {
    world.send(1, 9, 32);
    world.send(2, 9, 33);
  } else { 
    int data;
    world.recv(0, 9, data);
    std::cout << "In process " << world.rank( ) << "with data " << data
                   << std::endl;
  } 
  return 0;
}

Let's start with the send function. The first argument is the rank of the receiver process; the second is the message tag; and the third is the actual data. Why do you need the message tag? The receiver process might want to deal with messages that have a specific tag at some point during execution, so this scheme helps. For processes 1 and 2, the recv function is blocking: the program waits until it receives a message with tag 9 from process 0. When the message arrives, the information is stored in data. Here's the output from running the code:

In process 1 with data 32
In process 2 with data 33

What happens, then, if you have something like world.recv(0, 1, data); on the receiver side? The code hangs, because the receiver process is waiting for a message with a tag that is never going to arrive.


Conclusion

This article just scratched the surface of the functionality that these two useful libraries provide. Other functionality they offer includes IPC's memory-mapped I/O and MPI's broadcast capability. From a usability standpoint, IPC is easy to use. The MPI library, however, depends on a native MPI implementation, and off-the-shelf availability of a native MPI library along with pre-built Boost MPI and Serialization libraries is still an issue. Nevertheless, it is well worth the effort to build both the MPI implementation and Boost from sources.

Resources

Learn

Get products and technologies

Discuss
