Concurrent programming using the extremely popular Boost libraries is a lot of fun. Boost has several libraries within the concurrent programming space: the Interprocess library (IPC) for shared memory, memory-mapped I/O, and message queues; the Thread library for portable multi-threading; the Message Passing Interface (MPI) library for message passing, which finds use in distributed computing; and the Asio library for portable networking using sockets and other low-level functions, to name just a few. This article introduces the IPC and MPI libraries along with some of the functionality they offer.
In this article, you learn how to use the Boost IPC library to implement shared memory objects, message queues, and synchronized file locking. Using the Boost MPI library, you learn about the environment and the communicator classes and how you can achieve distributed communication.
Note: The code in this article was tested using the gcc-4.3.4 and boost-1.45 packages.
Boost Interprocess is a header-only library, so all you need to do is include the
appropriate header in your sources and make the compiler aware of the
include path. This is a nifty feature to have; you
just download the Boost sources (see Resources for a
link), and you're ready to get started. For example, to use shared memory in your
code, use the include shown in Listing 1.
Listing 1. The Boost IPC library is a header-only affair
#include <boost/interprocess/shared_memory_object.hpp>
using namespace boost::interprocess;
//… your sources follow …
When you compile, pass the Boost include path to the compiler, modifying the path
to match your installation.
Then, compile the code:
bash-4.1$ g++ ipc1.cpp -I../boost_1_45_0
Let's begin with the customary "Hello World!" program. You have two processes: The first writes the string "Hello World!" into shared memory, and the latter reads and displays the string. Create your shared memory object as shown in Listing 2.
Listing 2. Creating the shared memory object
#include <boost/interprocess/shared_memory_object.hpp>
int main(int argc, char* argv[ ])
{
using namespace boost::interprocess;
try {
// creating our first shared memory object.
shared_memory_object sharedmem1 (create_only, "Hello", read_write);
// setting the size of the shared memory
sharedmem1.truncate (256);
// … more code follows
} catch (interprocess_exception& e) {
// .. . clean up
}
}
The object sharedmem1 is of type
shared_memory_object (declared and defined in
Boost headers) and takes three arguments in its constructor:
- The first argument, create_only, means that this shared memory object is to be created and must not already exist. If a shared object by the same name already exists, an exception is thrown. A process that wants access to an already-created shared memory object should pass open_only as the first argument.
- The second argument, Hello, is the name of the shared memory region. Another process that accesses this shared memory uses this name for the access.
- The third argument, read_write, is the access specifier of the shared memory object. Because this process modifies the contents of the shared memory object, you use read_write. A process that only reads from this shared memory uses the read_only specifier for access.
The truncate method sets the size of the shared memory in
bytes. The code should ideally be wrapped in a try-catch
block. For example, if the shared memory object cannot be created, an exception of type boost::interprocess::interprocess_exception is thrown.
Using the shared memory object for writing
For a process to use a shared memory object, the process has to map the object in its
address space. The mapping is done using the mapped_region
class declared and defined in the header mapped_region.hpp. Another
benefit of using mapped_region is that
both full and partial access to the shared memory object is possible. Listing 3 shows how to use your mapped_region.
Listing 3. Using mapped_region to access shared memory objects
#include <boost/interprocess/shared_memory_object.hpp>
#include <boost/interprocess/mapped_region.hpp>
#include <cstring>
int main(int argc, char* argv[ ])
{
using namespace boost::interprocess;
try {
// creating our first shared memory object.
shared_memory_object sharedmem1 (create_only, "Hello", read_write);
// setting the size of the shared memory
sharedmem1.truncate (256);
// map the whole shared memory object into the current process;
// the second argument is the access mode, not a size
mapped_region mmap (sharedmem1, read_write);
// access the mapped region using get_address
std::strcpy(static_cast<char* >(mmap.get_address()), "Hello World!\n");
} catch (interprocess_exception& e) {
// .. . clean up
}
}
That's about it, really. You have created your mapped_region
object and accessed it using the get_address method.
The static_cast has been done, because
get_address returns a void*.
What happens to the shared memory when main exits?
The shared memory is not deleted when the process exits. To delete shared memory,
you need to call shared_memory_object::remove. The
access mechanism for process 2 is simple enough: Listing 4
proves this point.
Listing 4. Accessing the shared memory object from the second process
#include <boost/interprocess/shared_memory_object.hpp>
#include <boost/interprocess/mapped_region.hpp>
#include <cstring>
#include <cstdlib>
#include <iostream>
int main(int argc, char *argv[ ])
{
using namespace boost::interprocess;
try {
// opening an existing shared memory object
shared_memory_object sharedmem2 (open_only, "Hello", read_only);
// map shared memory object in current address space
mapped_region mmap (sharedmem2, read_only);
// need to type-cast since get_address returns void*
char *str1 = static_cast<char*> (mmap.get_address());
std::cout << str1 << std::endl;
} catch (interprocess_exception& e) {
std::cout << e.what( ) << std::endl;
}
return 0;
}
In Listing 4, you open the shared memory object using the open_only
and read_only attributes. If the shared memory
object cannot be found, an exception is thrown. Now, build and run the code in
Listing 3 and Listing 4, in that order. You should
see "Hello World!" in your terminal.
Next, add the following lines in the code for the second process
(Listing 4) just after std::cout,
and rebuild the code:
// std::cout code here
shared_memory_object::remove("Hello");
// } catch(interprocess_exception& e) {
Execute the code twice in succession. The second run prints the line "No such file or directory," confirming that the shared memory has been deleted.
Interprocess communication using message queue
Now, let's explore another popular mechanism for interprocess communication: the message queue. Each communicating process may add messages to the queue and read messages from the queue. The message queue comes with the following properties:
- It has a name, and processes access it using the given name.
- During queue creation, the user must specify the maximum length of the queue and the maximum size of an individual message.
- The queue is persistent, which means that it remains in memory when the process
that created it dies. The queue may be removed using an explicit call to
boost::interprocess::message_queue::remove.
Listing 5 shows a code snippet in which a process has created a message queue of 20 integers.
Listing 5. Creating a message queue of 20 integers
#include <boost/interprocess/ipc/message_queue.hpp>
#include <iostream>
int main(int argc, char* argv[ ])
{
using namespace boost::interprocess;
try {
// creating a message queue
message_queue mq (create_only, // only create
"mq", // name
20, //max message count
sizeof(int) //max message size
);
// … more code follows
} catch (interprocess_exception& e) {
std::cout << e.what( ) << std::endl;
}
}
Note the create_only attribute passed in the constructor for
message_queue. Similar to the case for a shared memory
object, a process that opens an existing message queue passes the
open_only attribute in the constructor.
On the sending side, you use the send method of the
queue to add data. The send method signature has
three inputs: a pointer to the raw data (void*), the
size of the data, and a priority. For now, send all the numbers with the same
priority. Listing 6 shows the code.
Listing 6. Sending messages to the queue
#include <boost/interprocess/ipc/message_queue.hpp>
#include <iostream>
int main(int argc, char* argv[ ])
{
using namespace boost::interprocess;
try {
// creating a message queue
message_queue mq (create_only, // only create
"mq", // name
20, //max message count
sizeof(int) //max message size
);
// now send the messages to the queue
for (int i=0; i<20; ++i)
mq.send(&i, sizeof(int), 0); // the 3rd argument is the priority
} catch (interprocess_exception& e) {
std::cout << e.what( ) << std::endl;
}
}
On the receiving side, the queue takes in the open_only
attribute. The individual messages are obtained from the queue by calling the
receive method of the
message_queue class. Listing 7
shows the receive method signature.
Listing 7. Method signature for message_queue::receive
void receive (void *buffer,
std::size_t buffer_size,
std::size_t &recvd_size,
unsigned int &priority
);
Let's decipher this a bit. The first argument is the buffer where the data received from the queue will be stored. The second argument is the size of that buffer, which here is also the expected size of the received data. The third argument is the actual size of the data received, and the fourth argument is the priority of the received message. In this example every message is a single int, so if the second and third arguments turn out to be unequal during program execution, that's an error. Listing 8 provides the code for the receiver process.
Listing 8. Receiving messages from the message queue
#include <boost/interprocess/ipc/message_queue.hpp>
#include <iostream>
int main(int argc, char* argv[ ])
{
using namespace boost::interprocess;
try {
// opening the message queue whose name is mq
message_queue mq (open_only, // only open
"mq" // name
);
size_t recvd_size;
unsigned int priority;
// now receive the messages from the queue
for (int i=0; i<20; ++i) {
int buffer;
mq.receive ((void*) &buffer, sizeof(int), recvd_size, priority);
if (recvd_size != sizeof(int))
; // do the error handling
std::cout << buffer << " " << recvd_size << " " << priority << std::endl;
}
} catch (interprocess_exception& e) {
std::cout << e.what( ) << std::endl;
}
}
That was reasonably simple. Note that you still have not removed the message queue from memory; much like the shared memory object, this queue is persistent. To remove the queue, add the following line once you are done using the queue:
message_queue::remove("mq"); // remove the queue using its name
Make the modification shown in Listing 9 on the sending side. The receiver code needs no changes.
Listing 9. Changing the priority of messages
message_queue::remove("mq"); // remove the old queue
message_queue mq (…); // create as before
for (int i=0; i<20; ++i)
mq.send(&i, sizeof(int), i%2); // the 3rd argument is the priority
// … rest as usual
On re-running the code, you should see the output provided in Listing 10.
Listing 10. The output as seen in the receiving process
1 4 1
3 4 1
5 4 1
7 4 1
9 4 1
11 4 1
13 4 1
15 4 1
17 4 1
19 4 1
0 4 0
2 4 0
4 4 0
6 4 0
8 4 0
10 4 0
12 4 0
14 4 0
16 4 0
18 4 0
Messages with a higher priority are delivered first: the receiver sees every priority-1 message (the odd numbers) before any priority-0 message, as Listing 10 confirms.
Shared memory and message queues are fine, but file I/O is also an important tool that
processes use to communicate with each other. Synchronizing file accesses used by
concurrent processes to communicate is not an easy task, but the file-locking capability
from the Boost IPC library does make life simpler. Before any further explanation, look
at Listing 11 to understand how a file_lock
object works.
Listing 11. Using a file_lock object for synchronizing file accesses
#include <fstream>
#include <iostream>
#include <string>
#include <boost/interprocess/sync/file_lock.hpp>
#include <cstdlib>
int main()
{
using namespace boost::interprocess;
std::string fileName("test");
std::fstream file;
file.open(fileName.c_str(), std::ios::out | std::ios::binary |
std::ios::trunc);
if (!file.is_open() || file.bad())
{
std::cout << "Open failed" << std::endl;
exit(-1);
}
try {
file_lock f_lock(fileName.c_str());
f_lock.lock();
std::cout << "Locked in Process 1" << std::endl;
file.write("Process 1", 9);
file.flush();
f_lock.unlock();
std::cout << "Unlocked from Process 1" << std::endl;
} catch (interprocess_exception& e) {
std::cout << e.what( ) << std::endl;
}
file.close();
return 0;
}
This code first opens a file, then locks it using file_lock. On
completion of the writing, it flushes the file buffers and unlocks the file. You use the
lock method to gain exclusive access to the file. If another process
has already invoked lock on the file, the caller waits until that process
voluntarily relinquishes the lock using
unlock. The constructor for the file_lock
class accepts the name of the file to be locked, and it's important that the file exists before
the file_lock object is constructed; otherwise, an exception will be thrown.
Now, copy the code in Listing 11 and make some changes to it. Specifically, make it the second process that's requesting the lock. Listing 12 shows the relevant changes.
Listing 12. Code for the second process trying to access the file
// .. as in Listing 11
file_lock f_lock(fileName.c_str());
f_lock.lock();
std::cout << "Locked in Process 2" << std::endl;
system("sleep 4");
file.write("Process 2", 9);
file.flush();
f_lock.unlock();
std::cout << "Unlocked from Process 2" << std::endl;
// file.close();
Now, if these two processes are run concurrently, you expect the first process to
wait 4 seconds before acquiring the file_lock 50 percent of
the time, all other things being equal.
Here are a few things you must remember when using file_lock. You're
talking about interprocess communication here, with emphasis on process.
This means that you're not supposed to use file_lock to
synchronize data accesses by threads of the same process. On POSIX-compliant systems,
file handles are process and not thread attributes. Here are a few guidelines
for using file locking:
- Use a single file_lock object per file per process.
- Use the same thread to lock and unlock a file.
- Flush data in writer processes before unlocking a file by calling either C's fflush library routine or the flush method (if you prefer a C++ fstream).
Using file_lock with scoped locks
It is possible that during program execution some exception is thrown, and the file
is not unlocked. Such an occurrence might result in undesirable program
behavior. To avoid this situation, consider wrapping the file_lock
object in a scoped_lock, defined in
boost/interprocess/sync/scoped_lock.hpp. Using scoped_lock,
you don't need to explicitly lock or unlock the file; the locking occurs inside the
constructor, and the unlocking happens automatically whenever you exit the
scope. Listing 13 shows the modification to
Listing 11 to make it use scoped locks.
Listing 13. Using scoped_lock with file_lock
#include <boost/interprocess/sync/scoped_lock.hpp>
#include <boost/interprocess/sync/file_lock.hpp>
//… code as in Listing 11
file_lock f_lock(fileName.c_str());
scoped_lock<file_lock> s_lock(f_lock); // internally calls f_lock.lock( );
// No need to call explicit lock anymore
std::cout << "Locked in Process 1" << std::endl;
file.write("Process 1", 9);
// … code as in Listing 11
Note: See Resources for links to more information on the Resource Acquisition Is Initialization (RAII) programming idiom.
If you are not already familiar with the Message Passing Interface, before delving into
Boost MPI, you should briefly check out the links to MPI resources provided in the
Resources section. The MPI is an easy-to-use standard that
works on the model of processes communicating with each other by passing messages.
You don't need to use sockets or other low-level communication primitives; the MPI back
end manages all the hard work. So, where does Boost MPI fit in? The creators of
Boost MPI have provided an even higher level of abstraction and a simple set of
routines built on top of the MPI-provided API, such as MPI_Init
and MPI_Bcast.
Boost MPI is not a stand-alone library in the sense that you download it, build it, and you are ready for work. Instead, you must install any of the MPI implementations, such as MPICH or Open MPI, and build the Boost Serialization library. For details on how to build Boost MPI, see Resources. Typically, you would use the following command to build Boost MPI:
bash-4.1$ bjam --with-mpi
Windows® users can download the pre-built libraries for MPI from BoostPro (see Resources). The libraries are compatible with Microsoft® HPC Pack 2008 and 2008 R2 (see Resources) and work on Windows XP with Service Pack 3 and later client operating systems.
There are two primary classes in the Boost MPI library that you must learn: the
environment class and the
communicator class. The former is responsible for the
initialization of the distributed environment; the latter is used for communicating
between processes. Because we're talking about distributed computing here, let's
have four processes all printing "Hello World" to the terminal. Listing 14
shows the code.
Listing 14. Hello World using Boost MPI
#include <boost/mpi.hpp>
#include <iostream>
int main(int argc, char* argv[])
{
boost::mpi::environment env(argc, argv);
boost::mpi::communicator world;
std::cout << argc << std::endl;
std::cout << argv[0] << std::endl;
std::cout << "Hello World! from process " << world.rank() << std::endl;
return 0;
}
Now build the code in Listing 14 with proper linking to the
Boost MPI and Serialization libraries. Run the executable at the shell prompt. You
should see "Hello World! from process 0". Next, use your MPI dispatcher tool—for
example, mpirun for Open MPI users and
mpiexec for Microsoft HPC Pack 2008—and
run the executable as:
mpirun -np 4 <executable name>
OR
mpiexec -n 4 <executable name>
You should now see something like Listing 15, with mympi1 being the executable name.
Listing 15. Output from running the MPI code
1
mympi1
Hello World! from process 3
1
mympi1
1
mympi1
Hello World! from process 1
Hello World! from process 2
1
mympi1
Hello World! from process 0
There you have it. Within the MPI framework, four copies of the same process have been
created. Within the MPI environment, each process has its unique ID, as determined
by the communicator object. Now, try communicating
between the processes. Have one process communicate with another process using
the send and receive function
calls. Call the process sending the message the master process and the
processes receiving the message the worker processes. The source code is
the same for both the master and the workers, with the functionality being decided
using the rank that the world object provides (see Listing 16).
Listing 16. Code for processes 0, 1, and 2 communicating with each other
#include <boost/mpi.hpp>
#include <iostream>
int main(int argc, char* argv[])
{
boost::mpi::environment env(argc, argv);
boost::mpi::communicator world;
if (world.rank() == 0) {
world.send(1, 9, 32);
world.send(2, 9, 33);
} else {
int data;
world.recv(0, 9, data);
std::cout << "In process " << world.rank( ) << " with data " << data
<< std::endl;
}
return 0;
}
Let's start with the send function. The first argument is the ID
of the receiver process; the second is the message tag; and the third is the actual
data. Why do you need the message tag? The receiver process might want to deal
with messages that have a specific tag at some point during execution, so this
scheme of doing things helps. For processes 1 and 2, the recv
function is blocking, which means that the program will wait until it
receives a message with tag ID 9 from process 0. When it does receive the message,
the information is stored in data. Here's the output when running the code:
In process 1 with data 32
In process 2 with data 33
What happens, then, if you have something like world.recv(0, 1, data);
on the receiver side? The code appears to hang; in reality, the receiver process is waiting for
a message with a tag that's never going to arrive.
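The matching rule behind this behavior can be illustrated with a toy model in plain C++. The sketch below does not use Boost MPI at all: the envelope struct and the mailbox class are hypothetical stand-ins, and try_recv is a non-blocking approximation of recv that returns false where the real call would keep waiting.

```cpp
#include <deque>

// Toy stand-in for a message in flight; not part of Boost MPI.
struct envelope
{
    int source;
    int tag;
    int data;
};

// Toy model of one process's incoming messages.
class mailbox
{
public:
    void deliver(int source, int tag, int data)
    {
        envelope e = { source, tag, data };
        pending_.push_back(e);
    }

    // A receive is satisfied only by a message whose source AND tag both match.
    // Returns false where a blocking recv would continue to wait.
    bool try_recv(int source, int tag, int& data)
    {
        for (std::deque<envelope>::iterator it = pending_.begin();
             it != pending_.end(); ++it) {
            if (it->source == source && it->tag == tag) {
                data = it->data;
                pending_.erase(it);
                return true;
            }
        }
        return false;
    }

private:
    std::deque<envelope> pending_;
};
```

Delivering a message from source 0 with tag 9 and then asking for tag 1 leaves the message in the mailbox and the request unsatisfied; asking for tag 9 retrieves it.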
This article just scratched the surface of the functionality that these two useful libraries provide. Other functionality that these libraries provide includes IPC's memory-mapped I/O and MPI's broadcast ability. From a usability standpoint, IPC is easy to use. However, the MPI library is dependent on native MPI implementations, and off-the-shelf availability of a native MPI library along with the pre-built Boost MPI and Serialization libraries is still an issue. Nevertheless, it is well worth the effort to make builds from sources for both the MPI implementation and Boost.
Resources
Learn
- Learn more about interprocess communication.
- Learn how to build Boost MPI.
- Discover all the mpirun options.
- Learn more about the RAII idiom.
- Read the Message Passing Interface standard.
- Learn more about the Microsoft HPC Pack 2008/R2 SDK.
- AIX and UNIX developerWorks zone: The AIX and UNIX zone provides a wealth of information relating to all aspects of AIX systems administration and expanding your UNIX skills.
- New to AIX and UNIX? Visit the New to AIX and UNIX page to learn more.
- Technology bookstore: Browse the technology bookstore for books on this and other technical topics.
Get products and technologies
- Learn more about and download the Boost Thread library.
- Learn more about and download the Boost MPI library.
- Download the pre-built MPI library from BoostPro.
- Download the Boost IPC library.
- Check out the MPICH2 download site.
- Check out the Open MPI v1.4 download site.
Arpan Sen is a lead engineer working on the development of software in the electronic design automation industry. He has worked on several flavors of UNIX, including Solaris, SunOS, HP-UX, and IRIX as well as Linux and Microsoft Windows for several years. He takes a keen interest in software performance-optimization techniques, graph theory, and parallel computing. Arpan holds a post-graduate degree in software systems. You can reach him at arpansen@gmail.com.




