Skip to main content

Web services programming tips and tricks: Learn simple, practical Web services design patterns, Part 4

Understand and implement the message bus pattern

James Snell, Software Engineer, IBM
Author photo
James Snell is a member of the IBM Emerging Technologies Toolkit team and has spent the past few years focusing on emerging Web services technologies and standards. He maintains a weblog on developerWorks focused on emerging technologies.

Summary:  Author James Snell continues a short series of discussions that focus on the application of well-defined and proven Web application design strategies to the world of Web services. In this installment, explore the message bus pattern, which ties together asynchronous, flexible, message-oriented service implementations based on well-known and proven design concepts.

Date:  14 Dec 2004
Level:  Intermediate
Activity:  1957 views

The message bus pattern

In message-oriented-middleware (MOM) -style infrastructures, the message bus pattern is the single most important architectural design. Central to the Message Bus concept is the notion that all business applications connect to a message distribution engine that ensures that all messages reliably and efficiently get to where they need to go. Each business application is a peer of the others through the event-driven message bus. Each message that the bus receives is an event that the bus then processes to determine which peer will be affected by the event (in other words, needs to receive and process the message).


Figure 1. The message bus pattern
The message bus pattern

While implementations of the Message Bus pattern vary greatly depending on the vendor, there are several core concepts that are universal:

  • Message channels: The message bus component is primarily comprised of one or more individual channels through which messages can be passed. Channels can have the following characteristics:
    • Channels can be application specific or message specific.
    • Channels can be unidirectional or bidirectional.
    • Channels can be broadcast-oriented (all messages go to all applications), point-to-point (messages go to a specific application), or subscription-based (messages go to specific interested applications).
  • Message broker: The message bus may include a message broker that determines where messages will be routed depending on various business criteria. The broker is optional in the base pattern but is often a critical piece in enterprise systems.
  • Channel filters: Filters are components that transparently intercept messages in channels to perform some kind of operation, such as logging, transformation, context manipulation, and so on. Like brokers, filters are optional in theory but often critical in practice.

Figure 2. Common message bus components
Common message bus components

Applications communicate with the message bus through one or more channels. Channels move messages either into or out of the message bus. There are two ways in which an application can receive messages from the bus: a push or a pull. In push exchanges, also known as event notifications, the bus initiates the transfer of the message to some waiting application. In pull exchanges the application initiates the transfer by sending some form of request to the bus and receiving one or more queued-up messages in response. The push-model is ideal for applications capable of maintaining a constant connection or listening channel through which messages can be pushed. The pull-model is ideal for applications that connect to the bus only intermittently or are incapable of maintaining a persistent connection with the bus.


Figure 3. Application-to-message bus connection options
Appication-to-message bus connection options

Regardless of how the Message Bus is implemented, the types of channels used, or the message delivery model, the bus pattern is characterized by the fact that messages sent by one application will be reliably delivered to any appropriate destination regardless of where that destination is or how it has been implemented. Recipients of messages can reside on the same machine as the application sending the message or on a different machine entirely. Recipients may or may not be implemented using the same development technology or programming language. The point is that applications sending messages can be sure of the fact that messages will be delivered. Delivery might not be immediate, especially in the case of pull-model deliveries, but the message will get where it needs to go.


Implement a message bus

There are actually quite a few ways to implement a message bus, as well as quite a few vendor tools designed to make it easier. IBM®, for instance, offers WebSphere® MQ and WebSphere Business Integration Message Broker products that have been based around the Java Messaging Service API standards. Together these products provide a robust, enterprise-quality message bus architecture that meets the needs of most business cases. Another important point is that a lot of work has gone into defining a set of Web services standards that provide a model for implementing event and notification-based services. The WS-Notification family of specifications defines a comprehensive model for Web services that can be leveraged to implement message bus-style services (see Resources for a link to the WS-Notification family of specifications). In this paper, however, I focus on illustrating how the fundamental design pattern comes together, so I basically create my own implementation using the OpenJMS open source JMS implementation and Java servlets and don't worry about the standards or products at all.

This implementation consists of three JMS topics (publish-subscribe model channels):

  • A Web services interface through which messages can be pushed into the Message Bus
  • A JMS Queue used to implement the pull-model delivery of response messages through the Web services interface
  • A handful of components that listen for and process messages that are pushed onto the message bus.

Figure 4. The message bus example
The message bus example

The Web service consists of an interface exposing two operations: send and receive. The send operation receives as input an object called a Case Model, which is representative of a simple application-specific message type that the message bus handles. Listing 1 shows the WSDL description for the Web service.


Listing 1. MessageBusService.wsdl

<?xml version="1.0" encoding="UTF-8"?>
<wsdl:definitions 
  targetNamespace="http://four.wspattern.developerworks.ibm.com" 
  xmlns:impl="http://four.wspattern.developerworks.ibm.com" 
  xmlns:intf="http://four.wspattern.developerworks.ibm.com" 
  xmlns:wsdl="http://schemas.xmlsoap.org/wsdl/" 
  xmlns:wsdlsoap="http://schemas.xmlsoap.org/wsdl/soap/" 
  xmlns:xsd="http://www.w3.org/2001/XMLSchema">
 <wsdl:types>
  <schema 
    targetNamespace="http://four.wspattern.developerworks.ibm.com" 
    xmlns="http://www.w3.org/2001/XMLSchema" 
    xmlns:impl="http://four.wspattern.developerworks.ibm.com" 
    xmlns:intf="http://four.wspattern.developerworks.ibm.com" 
    xmlns:wsdl="http://schemas.xmlsoap.org/wsdl/" 
    xmlns:xsd="http://www.w3.org/2001/XMLSchema">
   <complexType name="CaseModel">
    <sequence>
     <element name="text" nillable="true" type="xsd:string"/>
    </sequence>
   </complexType>
   <element name="CaseModel" nillable="true" type="impl:CaseModel"/>
  </schema>
 </wsdl:types>

   <wsdl:message name="receiveResponse">
      <wsdl:part name="receiveReturn" type="intf:CaseModel"/>
   </wsdl:message>

   <wsdl:message name="sendRequest">
      <wsdl:part name="model" type="intf:CaseModel"/>
   </wsdl:message>

   <wsdl:message name="receiveRequest">
   </wsdl:message>

   <wsdl:message name="sendResponse">
   </wsdl:message>

   <wsdl:portType name="MessageBusService">
      <wsdl:operation name="receive">
         <wsdl:input message="intf:receiveRequest" name="receiveRequest"/>
         <wsdl:output message="intf:receiveResponse" name="receiveResponse"/>
      </wsdl:operation>

      <wsdl:operation name="send" parameterOrder="model">
         <wsdl:input message="intf:sendRequest" name="sendRequest"/>
         <wsdl:output message="intf:sendResponse" name="sendResponse"/>
      </wsdl:operation>
   </wsdl:portType>

   <wsdl:binding name="MessageBusServiceSoapBinding" type="intf:MessageBusService">
      <wsdlsoap:binding style="rpc" transport="http://schemas.xmlsoap.org/soap/http"/>

      <wsdl:operation name="receive">
         <wsdlsoap:operation soapAction=""/>
         <wsdl:input name="receiveRequest">
            <wsdlsoap:body 
              namespace="http://four.wspattern.developerworks.ibm.com" 
              use="literal"/>
         </wsdl:input>
         <wsdl:output name="receiveResponse">
            <wsdlsoap:body 
              namespace="http://four.wspattern.developerworks.ibm.com" 
              use="literal"/>
         </wsdl:output>
      </wsdl:operation>

      <wsdl:operation name="send">
         <wsdlsoap:operation soapAction=""/>
         <wsdl:input name="sendRequest">
            <wsdlsoap:body 
              namespace="http://four.wspattern.developerworks.ibm.com" 
              use="literal"/>
         </wsdl:input>

         <wsdl:output name="sendResponse">
            <wsdlsoap:body 
              namespace="http://four.wspattern.developerworks.ibm.com" 
              use="literal"/>
         </wsdl:output>

      </wsdl:operation>
   </wsdl:binding>


   <wsdl:service name="MessageBusServiceService">
      <wsdl:port 
          binding="intf:MessageBusServiceSoapBinding" 
          name="MessageBusService">
         <wsdlsoap:address 
            location="http://localhost:9080/WSPattern4/services/MessageBusService"/>
      </wsdl:port>
   </wsdl:service>

</wsdl:definitions>

One thing to note about this Web services interface is that there is absolutely nothing remarkable about it. It looks like most other simple Web services interfaces out there. The service's implementation, shown in Listing 2, is equally unremarkable.


Listing 2. MessageBusService.java

package com.ibm.developerworks.wspattern.four;

import java.io.Serializable;
import java.util.Enumeration;
import java.util.Iterator;

import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueReceiver;
import javax.jms.QueueSession;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.naming.Context;
import javax.servlet.http.HttpServlet;
import javax.xml.rpc.ServiceException;
import javax.xml.rpc.server.ServiceLifecycle;

public class MessageBusService 
  implements ServiceLifecycle {

  Context context = null;
  QueueConnection connection = null;
  TopicConnection tconnection = null;
  QueueSession session = null;
  TopicSession tsession = null;
  
  public void init(Object serviceContext) 
    throws ServiceException {
      try {
        context = JNDIHelper.getInitialContext();
        connection = JNDIHelper.getQueueConnection(context);
        session = JNDIHelper.getQueueSession(connection);
        tconnection = JNDIHelper.getTopicConnection(context);
        tsession = JNDIHelper.getTopicSession(tconnection);
      } catch (Exception e) {}
  }

  public void destroy() {
    try {
      tsession.close();
      session.close();
      tconnection.close();
      connection.close();
    } catch (Exception e) {}
  }
  
  public void send(CaseModel model) {
    sendMessage(JNDIHelper.INPUT_TOPIC, model);
  }

  public CaseModel receive() {
    return receiveMessage(JNDIHelper.OUTPUT_QUEUE);
  }
 
  private void sendMessage(String topicName, CaseModel model) {
    try {
      ObjectMessage message = tsession.createObjectMessage(model);
      Topic topic = JNDIHelper.getTopic(context, topicName);
      TopicPublisher publisher = JNDIHelper.getTopicPublisher(tsession, topic);
      publisher.publish(message);
    } catch (Exception e) {
      throw new RuntimeException(e.getMessage());
    }
  }
 
  private CaseModel receiveMessage(String queueName) {
    CaseModel model = null;
    try {
      Queue queue = JNDIHelper.getQueue(context, queueName);
      QueueReceiver receiver = JNDIHelper.getQueueReceiver(session, queue);
      Message message = receiver.receiveNoWait();
      if (message != null && message instanceof ObjectMessage) {
        ObjectMessage objMessage = (ObjectMessage)message;
        Serializable obj = objMessage.getObject();
        if (obj instanceof CaseModel) {
          model = (CaseModel)obj;
        }
      }
    } catch (Exception e) {
      throw new RuntimeException(e.getMessage());
    }
    return model;    
  }

}

The model that the example implements is straightforward. The Web service receives a send request and publishes the model into an input JMS topic. Four different application components listen to that topic and perform some action in response to the message. Two of those components generate response messages that are delivered to an output JMS topic. Because the Web services client is not capable of maintaining a direct, persistent publish-subscribe connection to the JMS topic, a special listener receives messages from the output topic and stores those messages within a JMS response queue. When the Web services client calls the invoke message, the responses waiting in that queue are delivered.

The applications listening to the input topic include two that perform application-specific processes in response to a message (namely converting the received text to upper and lower case, respectively): the first logs the receipt of messages to a stdout console application, and the second sends notification messages to a remote Web services endpoint.

Listing 3 shows the code for one of two listeners that receive the CaseModel delivered through the message bus and convert the provided text to upper and lower case. Both are implemented as simple HTTP servlets that implement the JMS MessageListener interface. Alternatively I could have implemented each as JMS Message Driven Beans but chose not to do so, due to the mild increase in application complexity that it would cause.


Listing 3. UppercaseTopicListenerServlet.java

package com.ibm.developerworks.wspattern.four;

import java.io.Serializable;

import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.Topic;
import javax.jms.TopicPublisher;
import javax.servlet.Servlet;

public class UppercaseTopicListenerServlet
  extends TopicListenerServlet
  implements Servlet {

  protected String getTopic() {
    return JNDIHelper.INPUT_TOPIC;
  }

  protected String getSelector() {
    return "";
  }

  public void onMessage(Message message) {
    try {
      if (message instanceof ObjectMessage) {
        ObjectMessage objMessage = (ObjectMessage) message;
        Serializable obj = objMessage.getObject();
        if (obj instanceof CaseModel) {
          CaseModel model = (CaseModel)obj;
          if (model.getText() != null) {
            model.setText(model.getText().toUpperCase());
          }
          ObjectMessage response = session.createObjectMessage(model);
          Topic responseTopic = JNDIHelper.getTopic(context, JNDIHelper.OUTPUT_TOPIC);
          TopicPublisher responsePublisher = 
            JNDIHelper.getTopicPublisher(session, responseTopic);
          responsePublisher.publish(response);
        }
      }
    } catch (Throwable t) {}
  }

}

Note that these listeners are identical in all respects, except for the toUpperCase() and toLowerCase() operations that are performed. Each generates a response message that contains a CaseModel object containing the converted text. Each delivers that response message to an output JMS topic.

Another listener, shown in Listing 4, waits for messages to be published on the output topic and places those messages on the response queue for delivery to the Web services client.


Listing 4. ResponseTopicListenerServlet.java

package com.ibm.developerworks.wspattern.four;

import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.servlet.Servlet;
import javax.servlet.ServletException;

public class ResponseTopicListenerServlet
  extends TopicListenerServlet
  implements Servlet {

    protected QueueConnection qconnection;
    protected QueueSession qsession;
    protected Queue queue;

    protected String getTopic() {
      return JNDIHelper.OUTPUT_TOPIC;
    }

    protected String getSelector() {
      return "";
    }

    public void init() throws ServletException {
      super.init();
      try {
        qconnection = JNDIHelper.getQueueConnection(context);
        qsession = JNDIHelper.getQueueSession(qconnection);
        queue = JNDIHelper.getQueue(context, JNDIHelper.OUTPUT_QUEUE);
      } catch (Exception e) {}
    }

    public void destroy() {
      super.destroy();
      try {
        qsession.close();
        qconnection.close();
      } catch (Exception e) {}
    }

    public void onMessage(Message message) {
      try {
        QueueSender sender = JNDIHelper.getQueueSender(qsession, queue);
        sender.send(message);
      } catch (Exception e) {}
    }

}

This store-and-forward technique for queuing-up response messages delivered through the message bus ensures that waiting Web services clients receive all messages in the exact order in which they were delivered through the message bus. One thing to note is the fact that, in this implementation, it is not ensured that Web services clients receive only the messages that are intended to be delivered to them and only them. In other words, in a real-world implementation of this model, you would need to enforce proper security and routing constraints in order to be sure that only the appropriate people receive the appropriate messages.

Listing 5 illustrates one of two application components that listen on both the input and output JMS topics. Upon receiving a message from the message bus, these components call out to a remote Web services endpoint, notifying it that a message has been received. These components are exactly the same, with the exception that they listen to different JMS topics.


Listing 5. RequestTopicListenerNotifierServlet.java

package com.ibm.developerworks.wspattern.four;

import java.io.Serializable;

import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.Topic;
import javax.jms.TopicPublisher;
import javax.servlet.Servlet;
import javax.xml.namespace.QName;
import javax.xml.rpc.Call;
import javax.xml.rpc.ParameterMode;
import javax.xml.rpc.Service;
import javax.xml.rpc.ServiceFactory;
import javax.xml.rpc.encoding.TypeMapping;
import javax.xml.rpc.encoding.TypeMappingRegistry;

import com.ibm.ws.webservices.engine.encoding.ser.BeanDeserializerFactory;
import com.ibm.ws.webservices.engine.encoding.ser.BeanSerializerFactory;

public class RequestTopicListenerNotifierServlet
  extends TopicListenerServlet
  implements Servlet {

  private static final String NSURI = 
    "http://four.wspattern.developerworks.ibm.com";

  protected String getTopic() {
    return JNDIHelper.INPUT_TOPIC;
  }

  protected String getSelector() {
    return "";
  }

  public void onMessage(Message message) {
    try {
      if (message instanceof ObjectMessage) {
        ObjectMessage objMessage = (ObjectMessage) message;
        Serializable obj = objMessage.getObject();
        if (obj instanceof CaseModel) {
          CaseModel model = (CaseModel)obj;
          ServiceFactory factory = ServiceFactory.newInstance();
          Service service = 
            factory.createService(
              new QName(
                NSURI,
                "RemoteService"
              )
            );
            
          QName cmq = new QName(NSURI,"CaseModel");  
          TypeMappingRegistry tmreg = service.getTypeMappingRegistry();
          TypeMapping tm = tmreg.createTypeMapping();
          tm.register(
            CaseModel.class,
            cmq,
            new BeanSerializerFactory(
              CaseModel.class, 
              cmq),
            new BeanDeserializerFactory(
              CaseModel.class, 
              cmq));
          tmreg.register("", tm);

          Call call = service.createCall();
          call.setTargetEndpointAddress(
            "http://localhost:9080/WSPattern4/services/RemoteService");
          call.addParameter(
            "model", 
            cmq, 
            CaseModel.class, 
            ParameterMode.IN);
          call.invoke(
            new QName(
              NSURI, 
              "notifyRequest"), 
              new Object[] {model}
          );
        }
      }
    } catch (Throwable t) {}
  }

}

The final application component plugged into the message bus is a simple console application that displays stdout notifications whenever a message is delivered to either the input or output JMS topics.


Listing 6. ListenerApp.java

package com.ibm.developerworks.wspattern.four;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.Serializable;

import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.naming.Context;

public class ListenerApp
  implements MessageListener {

  private static Context context;
  private static TopicConnection connection;
  private static TopicSession session;
  private static Topic requestTopic;
  private static Topic responseTopic;
  private static TopicSubscriber requestSubscriber;
  private static TopicSubscriber responseSubscriber;

  public static void main(String[] args) throws Exception {
    
    ListenerApp listener = new ListenerApp();
    context = JNDIHelper.getInitialContext();
    connection = JNDIHelper.getTopicConnection(context);
    session = JNDIHelper.getTopicSession(connection);
    requestTopic = JNDIHelper.getTopic(context, JNDIHelper.INPUT_TOPIC);
    responseTopic = JNDIHelper.getTopic(context, JNDIHelper.OUTPUT_TOPIC);
    requestSubscriber = JNDIHelper.getTopicSubscriber(session, requestTopic);
    responseSubscriber = JNDIHelper.getTopicSubscriber(session, responseTopic);
    requestSubscriber.setMessageListener(listener);
    responseSubscriber.setMessageListener(listener);
    
    String input = "";
    while (!"exit".equalsIgnoreCase(input)) {
      BufferedReader br = 
        new BufferedReader(
          new InputStreamReader(
            System.in));
      System.out.print("> ");
      input = br.readLine();
    }
 
    requestSubscriber.close();
    responseSubscriber.close();
    session.close();
    connection.close();    
  }

  public void onMessage(Message message) {
    synchronized(System.out) {
      try {
        if (message instanceof ObjectMessage) {
          ObjectMessage objMessage = (ObjectMessage) message;
          Serializable obj = objMessage.getObject();
          if (obj instanceof CaseModel) {
            CaseModel model = (CaseModel) obj;
            System.out.println("\nMessage Received: " + model);
            System.out.println("\tSent To: " + message.getJMSDestination());
            System.out.print("> ");
          }
        }
      } catch (Exception e) {}
    }
  }

}

This particular component's goal is to demonstrate the inherent flexibility of the message bus model. Because of the way the message bus works, application components can at any time be plugged in (or removed from) the application architecture, thereby extending the capabilities and reach of the system. Shutting down the console application while the Web service is up and running in no way affects the functionality of the message bus.


Closing points

The material covered in this installment has provided only a cursory glance at the basic implementation of the message bus pattern. There are plenty of issues that a real-world implementation would need to deal with, such as security, proper message routing (brokering), delivery models, an so on, all of which are important, but are out of scope for this series. The goal here, and in the previous three installments, has been to stimulate your thinking about how you can implement Web services. Simple RPC-Style Stock Quotes services do not represent the true power of the Web services family of specifications and standards.

Patterns such as the asynchronous query, command, routing and message bus models provide a tremendous amount of flexibility in the way you can implement Web services and build different types of applications. Leveraging such patterns is key to securing the maximum value from the multitude of Web services specifications, standards, and products that are available.



Download

DescriptionNameSizeDownload method
WebSphere deployable EAR filews-tip-altdesign4code.zip2102 KB HTTP

Information about download methods


Resources

About the author

Author photo

James Snell is a member of the IBM Emerging Technologies Toolkit team and has spent the past few years focusing on emerging Web services technologies and standards. He maintains a weblog on developerWorks focused on emerging technologies.

Comments (Undergoing maintenance)



Trademarks  |  My developerWorks terms and conditions

Help: Update or add to My dW interests

What's this?

This little timesaver lets you update your My developerWorks profile with just one click! The general subject of this content (AIX and UNIX, Information Management, Lotus, Rational, Tivoli, WebSphere, Java, Linux, Open source, SOA and Web services, Web development, or XML) will be added to the interests section of your profile, if it's not there already. You only need to be logged in to My developerWorks.

And what's the point of adding your interests to your profile? That's how you find other users with the same interests as yours, and see what they're reading and contributing to the community. Your interests also help us recommend relevant developerWorks content to you.

View your My developerWorks profile

Return from help

Help: Remove from My dW interests

What's this?

Removing this interest does not alter your profile, but rather removes this piece of content from a list of all content for which you've indicated interest. In a future enhancement to My developerWorks, you'll be able to see a record of that content.

View your My developerWorks profile

Return from help

static.content.url=http://www.ibm.com/developerworks/js/artrating/
SITE_ID=1
Zone=SOA and Web services, XML
ArticleID=32183
ArticleTitle=Web services programming tips and tricks: Learn simple, practical Web services design patterns, Part 4
publish-date=12142004
author1-email=
author1-email-cc=

My developerWorks community

Tags

Help
Use the search field to find all types of content in My developerWorks with that tag.

Use the slider bar to see more or fewer tags.

Popular tags shows the top tags for this particular content zone (for example, Java technology, Linux, WebSphere).

My tags shows your tags for this particular content zone (for example, Java technology, Linux, WebSphere).

Use the search field to find all types of content in My developerWorks with that tag. Popular tags shows the top tags for this particular content zone (for example, Java technology, Linux, WebSphere). My tags shows your tags for this particular content zone (for example, Java technology, Linux, WebSphere).

Rate a product. Write a review.

Special offers