In message-oriented-middleware (MOM) -style infrastructures, the message bus pattern is the single most important architectural design. Central to the Message Bus concept is the notion that all business applications connect to a message distribution engine that ensures that all messages reliably and efficiently get to where they need to go. Each business application is a peer of the others through the event-driven message bus. Each message that the bus receives is an event that the bus then processes to determine which peer will be affected by the event (in other words, needs to receive and process the message).
Figure 1. The message bus pattern
While implementations of the Message Bus pattern vary greatly depending on the vendor, there are several core concepts that are universal:
- Message channels: The message bus component is primarily comprised of one or more individual channels through which messages can be passed. Channels can have the following characteristics:
- Channels can be application specific or message specific.
- Channels can be unidirectional or bidirectional.
- Channels can be broadcast-oriented (all messages go to all applications), point-to-point (messages go to a specific application), or subscription-based (messages go to specific interested applications).
- Message broker: The message bus may include a message broker that determines where messages will be routed depending on various business criteria. The broker is optional in the base pattern but is often a critical piece in enterprise systems.
- Channel filters: Filters are components that transparently intercept messages in channels to perform some kind of operation, such as logging, transformation, context manipulation, and so on. Like brokers, filters are optional in theory but often critical in practice.
Figure 2. Common message bus components
Applications communicate with the message bus through one or more channels. Channels move messages either into or out of the message bus. There are two ways in which an application can receive messages from the bus: a push or a pull. In push exchanges, also known as event notifications, the bus initiates the transfer of the message to some waiting application. In pull exchanges the application initiates the transfer by sending some form of request to the bus and receiving one or more queued-up messages in response. The push-model is ideal for applications capable of maintaining a constant connection or listening channel through which messages can be pushed. The pull-model is ideal for applications that connect to the bus only intermittently or are incapable of maintaining a persistent connection with the bus.
Figure 3. Application-to-message bus connection options
Regardless of how the Message Bus is implemented, the types of channels used, or the message delivery model, the bus pattern is characterized by the fact that messages sent by one application will be reliably delivered to any appropriate destination regardless of where that destination is or how it has been implemented. Recipients of messages can reside on the same machine as the application sending the message or on a different machine entirely. Recipients may or may not be implemented using the same development technology or programming language. The point is that applications sending messages can be sure of the fact that messages will be delivered. Delivery might not be immediate, especially in the case of pull-model deliveries, but the message will get where it needs to go.
There are actually quite a few ways to implement a message bus, as well as quite a few vendor tools designed to make it easier. IBM®, for instance, offers WebSphere® MQ and WebSphere Business Integration Message Broker products that have been based around the Java Messaging Service API standards. Together these products provide a robust, enterprise-quality message bus architecture that meets the needs of most business cases. Another important point is that a lot of work has gone into defining a set of Web services standards that provide a model for implementing event and notification-based services. The WS-Notification family of specifications defines a comprehensive model for Web services that can be leveraged to implement message bus-style services (see Resources for a link to the WS-Notification family of specifications). In this paper, however, I focus on illustrating how the fundamental design pattern comes together, so I basically create my own implementation using the OpenJMS open source JMS implementation and Java servlets and don't worry about the standards or products at all.
This implementation consists of three JMS topics (publish-subscribe model channels):
- A Web services interface through which messages can be pushed into the Message Bus
- A JMS Queue used to implement the pull-model delivery of response messages through the Web services interface
- A handful of components that listen for and process messages that are pushed onto the message bus.
Figure 4. The message bus example
The Web service consists of an interface exposing two operations: send and receive. The send operation receives as input an object called a Case Model, which is representative of a simple application-specific message type that the message bus handles. Listing 1 shows the WSDL description for the Web service.
Listing 1. MessageBusService.wsdl
<?xml version="1.0" encoding="UTF-8"?>
<wsdl:definitions
targetNamespace="http://four.wspattern.developerworks.ibm.com"
xmlns:impl="http://four.wspattern.developerworks.ibm.com"
xmlns:intf="http://four.wspattern.developerworks.ibm.com"
xmlns:wsdl="http://schemas.xmlsoap.org/wsdl/"
xmlns:wsdlsoap="http://schemas.xmlsoap.org/wsdl/soap/"
xmlns:xsd="http://www.w3.org/2001/XMLSchema">
<wsdl:types>
<schema
targetNamespace="http://four.wspattern.developerworks.ibm.com"
xmlns="http://www.w3.org/2001/XMLSchema"
xmlns:impl="http://four.wspattern.developerworks.ibm.com"
xmlns:intf="http://four.wspattern.developerworks.ibm.com"
xmlns:wsdl="http://schemas.xmlsoap.org/wsdl/"
xmlns:xsd="http://www.w3.org/2001/XMLSchema">
<complexType name="CaseModel">
<sequence>
<element name="text" nillable="true" type="xsd:string"/>
</sequence>
</complexType>
<element name="CaseModel" nillable="true" type="impl:CaseModel"/>
</schema>
</wsdl:types>
<wsdl:message name="receiveResponse">
<wsdl:part name="receiveReturn" type="intf:CaseModel"/>
</wsdl:message>
<wsdl:message name="sendRequest">
<wsdl:part name="model" type="intf:CaseModel"/>
</wsdl:message>
<wsdl:message name="receiveRequest">
</wsdl:message>
<wsdl:message name="sendResponse">
</wsdl:message>
<wsdl:portType name="MessageBusService">
<wsdl:operation name="receive">
<wsdl:input message="intf:receiveRequest" name="receiveRequest"/>
<wsdl:output message="intf:receiveResponse" name="receiveResponse"/>
</wsdl:operation>
<wsdl:operation name="send" parameterOrder="model">
<wsdl:input message="intf:sendRequest" name="sendRequest"/>
<wsdl:output message="intf:sendResponse" name="sendResponse"/>
</wsdl:operation>
</wsdl:portType>
<wsdl:binding name="MessageBusServiceSoapBinding" type="intf:MessageBusService">
<wsdlsoap:binding style="rpc" transport="http://schemas.xmlsoap.org/soap/http"/>
<wsdl:operation name="receive">
<wsdlsoap:operation soapAction=""/>
<wsdl:input name="receiveRequest">
<wsdlsoap:body
namespace="http://four.wspattern.developerworks.ibm.com"
use="literal"/>
</wsdl:input>
<wsdl:output name="receiveResponse">
<wsdlsoap:body
namespace="http://four.wspattern.developerworks.ibm.com"
use="literal"/>
</wsdl:output>
</wsdl:operation>
<wsdl:operation name="send">
<wsdlsoap:operation soapAction=""/>
<wsdl:input name="sendRequest">
<wsdlsoap:body
namespace="http://four.wspattern.developerworks.ibm.com"
use="literal"/>
</wsdl:input>
<wsdl:output name="sendResponse">
<wsdlsoap:body
namespace="http://four.wspattern.developerworks.ibm.com"
use="literal"/>
</wsdl:output>
</wsdl:operation>
</wsdl:binding>
<wsdl:service name="MessageBusServiceService">
<wsdl:port
binding="intf:MessageBusServiceSoapBinding"
name="MessageBusService">
<wsdlsoap:address
location="http://localhost:9080/WSPattern4/services/MessageBusService"/>
</wsdl:port>
</wsdl:service>
</wsdl:definitions>
|
One thing to note about this Web services interface is that there is absolutely nothing remarkable about it. It looks like most other simple Web services interfaces out there. The service's implementation, shown in Listing 2, is equally unremarkable.
Listing 2. MessageBusService.java
package com.ibm.developerworks.wspattern.four;
import java.io.Serializable;
import java.util.Enumeration;
import java.util.Iterator;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueReceiver;
import javax.jms.QueueSession;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.naming.Context;
import javax.servlet.http.HttpServlet;
import javax.xml.rpc.ServiceException;
import javax.xml.rpc.server.ServiceLifecycle;
public class MessageBusService
implements ServiceLifecycle {
Context context = null;
QueueConnection connection = null;
TopicConnection tconnection = null;
QueueSession session = null;
TopicSession tsession = null;
public void init(Object serviceContext)
throws ServiceException {
try {
context = JNDIHelper.getInitialContext();
connection = JNDIHelper.getQueueConnection(context);
session = JNDIHelper.getQueueSession(connection);
tconnection = JNDIHelper.getTopicConnection(context);
tsession = JNDIHelper.getTopicSession(tconnection);
} catch (Exception e) {}
}
public void destroy() {
try {
tsession.close();
session.close();
tconnection.close();
connection.close();
} catch (Exception e) {}
}
public void send(CaseModel model) {
sendMessage(JNDIHelper.INPUT_TOPIC, model);
}
public CaseModel receive() {
return receiveMessage(JNDIHelper.OUTPUT_QUEUE);
}
private void sendMessage(String topicName, CaseModel model) {
try {
ObjectMessage message = tsession.createObjectMessage(model);
Topic topic = JNDIHelper.getTopic(context, topicName);
TopicPublisher publisher = JNDIHelper.getTopicPublisher(tsession, topic);
publisher.publish(message);
} catch (Exception e) {
throw new RuntimeException(e.getMessage());
}
}
private CaseModel receiveMessage(String queueName) {
CaseModel model = null;
try {
Queue queue = JNDIHelper.getQueue(context, queueName);
QueueReceiver receiver = JNDIHelper.getQueueReceiver(session, queue);
Message message = receiver.receiveNoWait();
if (message != null && message instanceof ObjectMessage) {
ObjectMessage objMessage = (ObjectMessage)message;
Serializable obj = objMessage.getObject();
if (obj instanceof CaseModel) {
model = (CaseModel)obj;
}
}
} catch (Exception e) {
throw new RuntimeException(e.getMessage());
}
return model;
}
}
|
The model that the example implements is straightforward. The Web service receives a send request and publishes the model into an input JMS topic. Four different application components listen to that topic and perform some action in response to the message. Two of those components generate response messages that are delivered to an output JMS topic. Because the Web services client is not capable of maintaining a direct, persistent publish-subscribe connection to the JMS topic, a special listener receives messages from the output topic and stores those messages within a JMS response queue. When the Web services client calls the invoke message, the responses waiting in that queue are delivered.
The applications listening to the input topic include two that perform application-specific processes in response to a message (namely converting the received text to upper and lower case, respectively): the first logs the receipt of messages to a stdout console application, and the second sends notification messages to a remote Web services endpoint.
Listing 3 shows the code for one of two listeners that receive the CaseModel delivered through the message bus and convert the provided text to upper and lower case. Both are implemented as simple HTTP servlets that implement the JMS MessageListener interface. Alternatively I could have implemented each as JMS Message Driven Beans but chose not to do so, due to the mild increase in application complexity that it would cause.
Listing 3. UppercaseTopicListenerServlet.java
package com.ibm.developerworks.wspattern.four;
import java.io.Serializable;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.Topic;
import javax.jms.TopicPublisher;
import javax.servlet.Servlet;
public class UppercaseTopicListenerServlet
extends TopicListenerServlet
implements Servlet {
protected String getTopic() {
return JNDIHelper.INPUT_TOPIC;
}
protected String getSelector() {
return "";
}
public void onMessage(Message message) {
try {
if (message instanceof ObjectMessage) {
ObjectMessage objMessage = (ObjectMessage) message;
Serializable obj = objMessage.getObject();
if (obj instanceof CaseModel) {
CaseModel model = (CaseModel)obj;
if (model.getText() != null) {
model.setText(model.getText().toUpperCase());
}
ObjectMessage response = session.createObjectMessage(model);
Topic responseTopic = JNDIHelper.getTopic(context, JNDIHelper.OUTPUT_TOPIC);
TopicPublisher responsePublisher =
JNDIHelper.getTopicPublisher(session, responseTopic);
responsePublisher.publish(response);
}
}
} catch (Throwable t) {}
}
}
|
Note that these listeners are identical in all respects, except for the toUpperCase() and toLowerCase() operations that are performed. Each generates a response message that contains a CaseModel object containing the converted text. Each delivers that response message to an output JMS topic.
Another listener, shown in Listing 4, waits for messages to be published on the output topic and places those messages on the response queue for delivery to the Web services client.
Listing 4. ResponseTopicListenerServlet.java
package com.ibm.developerworks.wspattern.four;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.servlet.Servlet;
import javax.servlet.ServletException;
public class ResponseTopicListenerServlet
extends TopicListenerServlet
implements Servlet {
protected QueueConnection qconnection;
protected QueueSession qsession;
protected Queue queue;
protected String getTopic() {
return JNDIHelper.OUTPUT_TOPIC;
}
protected String getSelector() {
return "";
}
public void init() throws ServletException {
super.init();
try {
qconnection = JNDIHelper.getQueueConnection(context);
qsession = JNDIHelper.getQueueSession(qconnection);
queue = JNDIHelper.getQueue(context, JNDIHelper.OUTPUT_QUEUE);
} catch (Exception e) {}
}
public void destroy() {
super.destroy();
try {
qsession.close();
qconnection.close();
} catch (Exception e) {}
}
public void onMessage(Message message) {
try {
QueueSender sender = JNDIHelper.getQueueSender(qsession, queue);
sender.send(message);
} catch (Exception e) {}
}
}
|
This store-and-forward technique for queuing-up response messages delivered through the message bus ensures that waiting Web services clients receive all messages in the exact order in which they were delivered through the message bus. One thing to note is the fact that, in this implementation, it is not ensured that Web services clients receive only the messages that are intended to be delivered to them and only them. In other words, in a real-world implementation of this model, you would need to enforce proper security and routing constraints in order to be sure that only the appropriate people receive the appropriate messages.
Listing 5 illustrates one of two application components that listen on both the input and output JMS topics. Upon receiving a message from the message bus, these components call out to a remote Web services endpoint, notifying it that a message has been received. These components are exactly the same, with the exception that they listen to different JMS topics.
Listing 5. RequestTopicListenerNotifierServlet.java
package com.ibm.developerworks.wspattern.four;
import java.io.Serializable;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.Topic;
import javax.jms.TopicPublisher;
import javax.servlet.Servlet;
import javax.xml.namespace.QName;
import javax.xml.rpc.Call;
import javax.xml.rpc.ParameterMode;
import javax.xml.rpc.Service;
import javax.xml.rpc.ServiceFactory;
import javax.xml.rpc.encoding.TypeMapping;
import javax.xml.rpc.encoding.TypeMappingRegistry;
import com.ibm.ws.webservices.engine.encoding.ser.BeanDeserializerFactory;
import com.ibm.ws.webservices.engine.encoding.ser.BeanSerializerFactory;
public class RequestTopicListenerNotifierServlet
extends TopicListenerServlet
implements Servlet {
private static final String NSURI =
"http://four.wspattern.developerworks.ibm.com";
protected String getTopic() {
return JNDIHelper.INPUT_TOPIC;
}
protected String getSelector() {
return "";
}
public void onMessage(Message message) {
try {
if (message instanceof ObjectMessage) {
ObjectMessage objMessage = (ObjectMessage) message;
Serializable obj = objMessage.getObject();
if (obj instanceof CaseModel) {
CaseModel model = (CaseModel)obj;
ServiceFactory factory = ServiceFactory.newInstance();
Service service =
factory.createService(
new QName(
NSURI,
"RemoteService"
)
);
QName cmq = new QName(NSURI,"CaseModel");
TypeMappingRegistry tmreg = service.getTypeMappingRegistry();
TypeMapping tm = tmreg.createTypeMapping();
tm.register(
CaseModel.class,
cmq,
new BeanSerializerFactory(
CaseModel.class,
cmq),
new BeanDeserializerFactory(
CaseModel.class,
cmq));
tmreg.register("", tm);
Call call = service.createCall();
call.setTargetEndpointAddress(
"http://localhost:9080/WSPattern4/services/RemoteService");
call.addParameter(
"model",
cmq,
CaseModel.class,
ParameterMode.IN);
call.invoke(
new QName(
NSURI,
"notifyRequest"),
new Object[] {model}
);
}
}
} catch (Throwable t) {}
}
}
|
The final application component plugged into the message bus is a simple console application that displays stdout notifications whenever a message is delivered to either the input or output JMS topics.
Listing 6. ListenerApp.java
package com.ibm.developerworks.wspattern.four;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.Serializable;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.naming.Context;
public class ListenerApp
implements MessageListener {
private static Context context;
private static TopicConnection connection;
private static TopicSession session;
private static Topic requestTopic;
private static Topic responseTopic;
private static TopicSubscriber requestSubscriber;
private static TopicSubscriber responseSubscriber;
public static void main(String[] args) throws Exception {
ListenerApp listener = new ListenerApp();
context = JNDIHelper.getInitialContext();
connection = JNDIHelper.getTopicConnection(context);
session = JNDIHelper.getTopicSession(connection);
requestTopic = JNDIHelper.getTopic(context, JNDIHelper.INPUT_TOPIC);
responseTopic = JNDIHelper.getTopic(context, JNDIHelper.OUTPUT_TOPIC);
requestSubscriber = JNDIHelper.getTopicSubscriber(session, requestTopic);
responseSubscriber = JNDIHelper.getTopicSubscriber(session, responseTopic);
requestSubscriber.setMessageListener(listener);
responseSubscriber.setMessageListener(listener);
String input = "";
while (!"exit".equalsIgnoreCase(input)) {
BufferedReader br =
new BufferedReader(
new InputStreamReader(
System.in));
System.out.print("> ");
input = br.readLine();
}
requestSubscriber.close();
responseSubscriber.close();
session.close();
connection.close();
}
public void onMessage(Message message) {
synchronized(System.out) {
try {
if (message instanceof ObjectMessage) {
ObjectMessage objMessage = (ObjectMessage) message;
Serializable obj = objMessage.getObject();
if (obj instanceof CaseModel) {
CaseModel model = (CaseModel) obj;
System.out.println("\nMessage Received: " + model);
System.out.println("\tSent To: " + message.getJMSDestination());
System.out.print("> ");
}
}
} catch (Exception e) {}
}
}
}
|
This particular component's goal is to demonstrate the inherent flexibility of the message bus model. Because of the way the message bus works, application components can at any time be plugged in (or removed from) the application architecture, thereby extending the capabilities and reach of the system. Shutting down the console application while the Web service is up and running in no way affects the functionality of the message bus.
The material covered in this installment has provided only a cursory glance at the basic implementation of the message bus pattern. There are plenty of issues that a real-world implementation would need to deal with, such as security, proper message routing (brokering), delivery models, an so on, all of which are important, but are out of scope for this series. The goal here, and in the previous three installments, has been to stimulate your thinking about how you can implement Web services. Simple RPC-Style Stock Quotes services do not represent the true power of the Web services family of specifications and standards.
Patterns such as the asynchronous query, command, routing and message bus models provide a tremendous amount of flexibility in the way you can implement Web services and build different types of applications. Leveraging such patterns is key to securing the maximum value from the multitude of Web services specifications, standards, and products that are available.
| Description | Name | Size | Download method |
|---|---|---|---|
| WebSphere deployable EAR file | ws-tip-altdesign4code.zip | 2102 KB | HTTP |
Information about download methods
- Read the other installments of the "Learn simple, practical Web services design patterns" series:
- Part 1: Asynchronous Web services operations using JMS
- Part 2: Encapsulate business logic with a command facade pattern
- Part 3: Creating flexible Web service implementations with the Router pattern
- Visit the Enterprise Integration Patterns Web site for more information about the message bus pattern.
- Download WebSphere Studio Application Developer, which the author used to write the example application.
- See James Snell's developerWorks blog for more discussion about Emerging Technology topics including (but not limited to) Web services.
- Browse for books on these and other technical topics.
- Want more? The developerWorks SOA and Web services zone hosts hundreds of informative articles and introductory, intermediate, and advanced tutorials on how to develop Web services applications.

James Snell is a member of the IBM Emerging Technologies Toolkit team and has spent the past few years focusing on emerging Web services technologies and standards. He maintains a weblog on developerWorks focused on emerging technologies.
Comments (Undergoing maintenance)





