Skip to main content

By clicking Submit, you agree to the developerWorks terms of use.

The first time you sign into developerWorks, a profile is created for you. Select information in your profile (name, country/region, and company) is displayed to the public and will accompany any content you post. You may update your IBM account at any time.

All information submitted is secure.

  • Close [x]

The first time you sign in to developerWorks, a profile is created for you, so you need to choose a display name. Your display name accompanies the content you post on developerworks.

Please choose a display name between 3-31 characters. Your display name must be unique in the developerWorks community and should not be your email address for privacy reasons.

By clicking Submit, you agree to the developerWorks terms of use.

All information submitted is secure.

  • Close [x]

Web services programming tips and tricks: Learn simple, practical Web services design patterns, Part 4

Understand and implement the message bus pattern

James Snell, Software Engineer, IBM
Author photo
James Snell is a member of the IBM Emerging Technologies Toolkit team and has spent the past few years focusing on emerging Web services technologies and standards. He maintains a weblog on developerWorks focused on emerging technologies.

Summary:  Author James Snell continues a short series of discussions that focus on the application of well-defined and proven Web application design strategies to the world of Web services. In this installment, explore the message bus pattern, which ties together asynchronous, flexible, message-oriented service implementations based on well-known and proven design concepts.

Date:  14 Dec 2004
Level:  Intermediate
Also available in:   Russian  Japanese

Activity:  10941 views
Comments:  

The message bus pattern

In message-oriented-middleware (MOM) -style infrastructures, the message bus pattern is the single most important architectural design. Central to the Message Bus concept is the notion that all business applications connect to a message distribution engine that ensures that all messages reliably and efficiently get to where they need to go. Each business application is a peer of the others through the event-driven message bus. Each message that the bus receives is an event that the bus then processes to determine which peer will be affected by the event (in other words, needs to receive and process the message).


Figure 1. The message bus pattern
The message bus pattern

While implementations of the Message Bus pattern vary greatly depending on the vendor, there are several core concepts that are universal:

  • Message channels: The message bus component is primarily comprised of one or more individual channels through which messages can be passed. Channels can have the following characteristics:
    • Channels can be application specific or message specific.
    • Channels can be unidirectional or bidirectional.
    • Channels can be broadcast-oriented (all messages go to all applications), point-to-point (messages go to a specific application), or subscription-based (messages go to specific interested applications).
  • Message broker: The message bus may include a message broker that determines where messages will be routed depending on various business criteria. The broker is optional in the base pattern but is often a critical piece in enterprise systems.
  • Channel filters: Filters are components that transparently intercept messages in channels to perform some kind of operation, such as logging, transformation, context manipulation, and so on. Like brokers, filters are optional in theory but often critical in practice.

Figure 2. Common message bus components
Common message bus components

Applications communicate with the message bus through one or more channels. Channels move messages either into or out of the message bus. There are two ways in which an application can receive messages from the bus: a push or a pull. In push exchanges, also known as event notifications, the bus initiates the transfer of the message to some waiting application. In pull exchanges the application initiates the transfer by sending some form of request to the bus and receiving one or more queued-up messages in response. The push-model is ideal for applications capable of maintaining a constant connection or listening channel through which messages can be pushed. The pull-model is ideal for applications that connect to the bus only intermittently or are incapable of maintaining a persistent connection with the bus.


Figure 3. Application-to-message bus connection options
Appication-to-message bus connection options

Regardless of how the Message Bus is implemented, the types of channels used, or the message delivery model, the bus pattern is characterized by the fact that messages sent by one application will be reliably delivered to any appropriate destination regardless of where that destination is or how it has been implemented. Recipients of messages can reside on the same machine as the application sending the message or on a different machine entirely. Recipients may or may not be implemented using the same development technology or programming language. The point is that applications sending messages can be sure of the fact that messages will be delivered. Delivery might not be immediate, especially in the case of pull-model deliveries, but the message will get where it needs to go.


Implement a message bus

There are actually quite a few ways to implement a message bus, as well as quite a few vendor tools designed to make it easier. IBM®, for instance, offers WebSphere® MQ and WebSphere Business Integration Message Broker products that have been based around the Java Messaging Service API standards. Together these products provide a robust, enterprise-quality message bus architecture that meets the needs of most business cases. Another important point is that a lot of work has gone into defining a set of Web services standards that provide a model for implementing event and notification-based services. The WS-Notification family of specifications defines a comprehensive model for Web services that can be leveraged to implement message bus-style services (see Resources for a link to the WS-Notification family of specifications). In this paper, however, I focus on illustrating how the fundamental design pattern comes together, so I basically create my own implementation using the OpenJMS open source JMS implementation and Java servlets and don't worry about the standards or products at all.

This implementation consists of three JMS topics (publish-subscribe model channels):

  • A Web services interface through which messages can be pushed into the Message Bus
  • A JMS Queue used to implement the pull-model delivery of response messages through the Web services interface
  • A handful of components that listen for and process messages that are pushed onto the message bus.

Figure 4. The message bus example
The message bus example

The Web service consists of an interface exposing two operations: send and receive. The send operation receives as input an object called a Case Model, which is representative of a simple application-specific message type that the message bus handles. Listing 1 shows the WSDL description for the Web service.


Listing 1. MessageBusService.wsdl

<?xml version="1.0" encoding="UTF-8"?>
<wsdl:definitions 
  targetNamespace="http://four.wspattern.developerworks.ibm.com" 
  xmlns:impl="http://four.wspattern.developerworks.ibm.com" 
  xmlns:intf="http://four.wspattern.developerworks.ibm.com" 
  xmlns:wsdl="http://schemas.xmlsoap.org/wsdl/" 
  xmlns:wsdlsoap="http://schemas.xmlsoap.org/wsdl/soap/" 
  xmlns:xsd="http://www.w3.org/2001/XMLSchema">
 <wsdl:types>
  <schema 
    targetNamespace="http://four.wspattern.developerworks.ibm.com" 
    xmlns="http://www.w3.org/2001/XMLSchema" 
    xmlns:impl="http://four.wspattern.developerworks.ibm.com" 
    xmlns:intf="http://four.wspattern.developerworks.ibm.com" 
    xmlns:wsdl="http://schemas.xmlsoap.org/wsdl/" 
    xmlns:xsd="http://www.w3.org/2001/XMLSchema">
   <complexType name="CaseModel">
    <sequence>
     <element name="text" nillable="true" type="xsd:string"/>
    </sequence>
   </complexType>
   <element name="CaseModel" nillable="true" type="impl:CaseModel"/>
  </schema>
 </wsdl:types>

   <wsdl:message name="receiveResponse">
      <wsdl:part name="receiveReturn" type="intf:CaseModel"/>
   </wsdl:message>

   <wsdl:message name="sendRequest">
      <wsdl:part name="model" type="intf:CaseModel"/>
   </wsdl:message>

   <wsdl:message name="receiveRequest">
   </wsdl:message>

   <wsdl:message name="sendResponse">
   </wsdl:message>

   <wsdl:portType name="MessageBusService">
      <wsdl:operation name="receive">
         <wsdl:input message="intf:receiveRequest" name="receiveRequest"/>
         <wsdl:output message="intf:receiveResponse" name="receiveResponse"/>
      </wsdl:operation>

      <wsdl:operation name="send" parameterOrder="model">
         <wsdl:input message="intf:sendRequest" name="sendRequest"/>
         <wsdl:output message="intf:sendResponse" name="sendResponse"/>
      </wsdl:operation>
   </wsdl:portType>

   <wsdl:binding name="MessageBusServiceSoapBinding" type="intf:MessageBusService">
      <wsdlsoap:binding style="rpc" transport="http://schemas.xmlsoap.org/soap/http"/>

      <wsdl:operation name="receive">
         <wsdlsoap:operation soapAction=""/>
         <wsdl:input name="receiveRequest">
            <wsdlsoap:body 
              namespace="http://four.wspattern.developerworks.ibm.com" 
              use="literal"/>
         </wsdl:input>
         <wsdl:output name="receiveResponse">
            <wsdlsoap:body 
              namespace="http://four.wspattern.developerworks.ibm.com" 
              use="literal"/>
         </wsdl:output>
      </wsdl:operation>

      <wsdl:operation name="send">
         <wsdlsoap:operation soapAction=""/>
         <wsdl:input name="sendRequest">
            <wsdlsoap:body 
              namespace="http://four.wspattern.developerworks.ibm.com" 
              use="literal"/>
         </wsdl:input>

         <wsdl:output name="sendResponse">
            <wsdlsoap:body 
              namespace="http://four.wspattern.developerworks.ibm.com" 
              use="literal"/>
         </wsdl:output>

      </wsdl:operation>
   </wsdl:binding>


   <wsdl:service name="MessageBusServiceService">
      <wsdl:port 
          binding="intf:MessageBusServiceSoapBinding" 
          name="MessageBusService">
         <wsdlsoap:address 
            location="http://localhost:9080/WSPattern4/services/MessageBusService"/>
      </wsdl:port>
   </wsdl:service>

</wsdl:definitions>

One thing to note about this Web services interface is that there is absolutely nothing remarkable about it. It looks like most other simple Web services interfaces out there. The service's implementation, shown in Listing 2, is equally unremarkable.


Listing 2. MessageBusService.java

package com.ibm.developerworks.wspattern.four;

import java.io.Serializable;
import java.util.Enumeration;
import java.util.Iterator;

import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueReceiver;
import javax.jms.QueueSession;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.naming.Context;
import javax.servlet.http.HttpServlet;
import javax.xml.rpc.ServiceException;
import javax.xml.rpc.server.ServiceLifecycle;

public class MessageBusService 
  implements ServiceLifecycle {

  Context context = null;
  QueueConnection connection = null;
  TopicConnection tconnection = null;
  QueueSession session = null;
  TopicSession tsession = null;
  
  public void init(Object serviceContext) 
    throws ServiceException {
      try {
        context = JNDIHelper.getInitialContext();
        connection = JNDIHelper.getQueueConnection(context);
        session = JNDIHelper.getQueueSession(connection);
        tconnection = JNDIHelper.getTopicConnection(context);
        tsession = JNDIHelper.getTopicSession(tconnection);
      } catch (Exception e) {}
  }

  public void destroy() {
    try {
      tsession.close();
      session.close();
      tconnection.close();
      connection.close();
    } catch (Exception e) {}
  }
  
  public void send(CaseModel model) {
    sendMessage(JNDIHelper.INPUT_TOPIC, model);
  }

  public CaseModel receive() {
    return receiveMessage(JNDIHelper.OUTPUT_QUEUE);
  }
 
  private void sendMessage(String topicName, CaseModel model) {
    try {
      ObjectMessage message = tsession.createObjectMessage(model);
      Topic topic = JNDIHelper.getTopic(context, topicName);
      TopicPublisher publisher = JNDIHelper.getTopicPublisher(tsession, topic);
      publisher.publish(message);
    } catch (Exception e) {
      throw new RuntimeException(e.getMessage());
    }
  }
 
  private CaseModel receiveMessage(String queueName) {
    CaseModel model = null;
    try {
      Queue queue = JNDIHelper.getQueue(context, queueName);
      QueueReceiver receiver = JNDIHelper.getQueueReceiver(session, queue);
      Message message = receiver.receiveNoWait();
      if (message != null && message instanceof ObjectMessage) {
        ObjectMessage objMessage = (ObjectMessage)message;
        Serializable obj = objMessage.getObject();
        if (obj instanceof CaseModel) {
          model = (CaseModel)obj;
        }
      }
    } catch (Exception e) {
      throw new RuntimeException(e.getMessage());
    }
    return model;    
  }

}

The model that the example implements is straightforward. The Web service receives a send request and publishes the model into an input JMS topic. Four different application components listen to that topic and perform some action in response to the message. Two of those components generate response messages that are delivered to an output JMS topic. Because the Web services client is not capable of maintaining a direct, persistent publish-subscribe connection to the JMS topic, a special listener receives messages from the output topic and stores those messages within a JMS response queue. When the Web services client calls the invoke message, the responses waiting in that queue are delivered.

The applications listening to the input topic include two that perform application-specific processes in response to a message (namely converting the received text to upper and lower case, respectively): the first logs the receipt of messages to a stdout console application, and the second sends notification messages to a remote Web services endpoint.

Listing 3 shows the code for one of two listeners that receive the CaseModel delivered through the message bus and convert the provided text to upper and lower case. Both are implemented as simple HTTP servlets that implement the JMS MessageListener interface. Alternatively I could have implemented each as JMS Message Driven Beans but chose not to do so, due to the mild increase in application complexity that it would cause.


Listing 3. UppercaseTopicListenerServlet.java

package com.ibm.developerworks.wspattern.four;

import java.io.Serializable;

import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.Topic;
import javax.jms.TopicPublisher;
import javax.servlet.Servlet;

public class UppercaseTopicListenerServlet
  extends TopicListenerServlet
  implements Servlet {

  protected String getTopic() {
    return JNDIHelper.INPUT_TOPIC;
  }

  protected String getSelector() {
    return "";
  }

  public void onMessage(Message message) {
    try {
      if (message instanceof ObjectMessage) {
        ObjectMessage objMessage = (ObjectMessage) message;
        Serializable obj = objMessage.getObject();
        if (obj instanceof CaseModel) {
          CaseModel model = (CaseModel)obj;
          if (model.getText() != null) {
            model.setText(model.getText().toUpperCase());
          }
          ObjectMessage response = session.createObjectMessage(model);
          Topic responseTopic = JNDIHelper.getTopic(context, JNDIHelper.OUTPUT_TOPIC);
          TopicPublisher responsePublisher = 
            JNDIHelper.getTopicPublisher(session, responseTopic);
          responsePublisher.publish(response);
        }
      }
    } catch (Throwable t) {}
  }

}

Note that these listeners are identical in all respects, except for the toUpperCase() and toLowerCase() operations that are performed. Each generates a response message that contains a CaseModel object containing the converted text. Each delivers that response message to an output JMS topic.

Another listener, shown in Listing 4, waits for messages to be published on the output topic and places those messages on the response queue for delivery to the Web services client.


Listing 4. ResponseTopicListenerServlet.java

package com.ibm.developerworks.wspattern.four;

import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.servlet.Servlet;
import javax.servlet.ServletException;

public class ResponseTopicListenerServlet
  extends TopicListenerServlet
  implements Servlet {

    protected QueueConnection qconnection;
    protected QueueSession qsession;
    protected Queue queue;

    protected String getTopic() {
      return JNDIHelper.OUTPUT_TOPIC;
    }

    protected String getSelector() {
      return "";
    }

    public void init() throws ServletException {
      super.init();
      try {
        qconnection = JNDIHelper.getQueueConnection(context);
        qsession = JNDIHelper.getQueueSession(qconnection);
        queue = JNDIHelper.getQueue(context, JNDIHelper.OUTPUT_QUEUE);
      } catch (Exception e) {}
    }

    public void destroy() {
      super.destroy();
      try {
        qsession.close();
        qconnection.close();
      } catch (Exception e) {}
    }

    public void onMessage(Message message) {
      try {
        QueueSender sender = JNDIHelper.getQueueSender(qsession, queue);
        sender.send(message);
      } catch (Exception e) {}
    }

}

This store-and-forward technique for queuing-up response messages delivered through the message bus ensures that waiting Web services clients receive all messages in the exact order in which they were delivered through the message bus. One thing to note is the fact that, in this implementation, it is not ensured that Web services clients receive only the messages that are intended to be delivered to them and only them. In other words, in a real-world implementation of this model, you would need to enforce proper security and routing constraints in order to be sure that only the appropriate people receive the appropriate messages.

Listing 5 illustrates one of two application components that listen on both the input and output JMS topics. Upon receiving a message from the message bus, these components call out to a remote Web services endpoint, notifying it that a message has been received. These components are exactly the same, with the exception that they listen to different JMS topics.


Listing 5. RequestTopicListenerNotifierServlet.java

package com.ibm.developerworks.wspattern.four;

import java.io.Serializable;

import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.Topic;
import javax.jms.TopicPublisher;
import javax.servlet.Servlet;
import javax.xml.namespace.QName;
import javax.xml.rpc.Call;
import javax.xml.rpc.ParameterMode;
import javax.xml.rpc.Service;
import javax.xml.rpc.ServiceFactory;
import javax.xml.rpc.encoding.TypeMapping;
import javax.xml.rpc.encoding.TypeMappingRegistry;

import com.ibm.ws.webservices.engine.encoding.ser.BeanDeserializerFactory;
import com.ibm.ws.webservices.engine.encoding.ser.BeanSerializerFactory;

public class RequestTopicListenerNotifierServlet
  extends TopicListenerServlet
  implements Servlet {

  private static final String NSURI = 
    "http://four.wspattern.developerworks.ibm.com";

  protected String getTopic() {
    return JNDIHelper.INPUT_TOPIC;
  }

  protected String getSelector() {
    return "";
  }

  public void onMessage(Message message) {
    try {
      if (message instanceof ObjectMessage) {
        ObjectMessage objMessage = (ObjectMessage) message;
        Serializable obj = objMessage.getObject();
        if (obj instanceof CaseModel) {
          CaseModel model = (CaseModel)obj;
          ServiceFactory factory = ServiceFactory.newInstance();
          Service service = 
            factory.createService(
              new QName(
                NSURI,
                "RemoteService"
              )
            );
            
          QName cmq = new QName(NSURI,"CaseModel");  
          TypeMappingRegistry tmreg = service.getTypeMappingRegistry();
          TypeMapping tm = tmreg.createTypeMapping();
          tm.register(
            CaseModel.class,
            cmq,
            new BeanSerializerFactory(
              CaseModel.class, 
              cmq),
            new BeanDeserializerFactory(
              CaseModel.class, 
              cmq));
          tmreg.register("", tm);

          Call call = service.createCall();
          call.setTargetEndpointAddress(
            "http://localhost:9080/WSPattern4/services/RemoteService");
          call.addParameter(
            "model", 
            cmq, 
            CaseModel.class, 
            ParameterMode.IN);
          call.invoke(
            new QName(
              NSURI, 
              "notifyRequest"), 
              new Object[] {model}
          );
        }
      }
    } catch (Throwable t) {}
  }

}

The final application component plugged into the message bus is a simple console application that displays stdout notifications whenever a message is delivered to either the input or output JMS topics.


Listing 6. ListenerApp.java

package com.ibm.developerworks.wspattern.four;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.Serializable;

import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.naming.Context;

public class ListenerApp
  implements MessageListener {

  private static Context context;
  private static TopicConnection connection;
  private static TopicSession session;
  private static Topic requestTopic;
  private static Topic responseTopic;
  private static TopicSubscriber requestSubscriber;
  private static TopicSubscriber responseSubscriber;

  public static void main(String[] args) throws Exception {
    
    ListenerApp listener = new ListenerApp();
    context = JNDIHelper.getInitialContext();
    connection = JNDIHelper.getTopicConnection(context);
    session = JNDIHelper.getTopicSession(connection);
    requestTopic = JNDIHelper.getTopic(context, JNDIHelper.INPUT_TOPIC);
    responseTopic = JNDIHelper.getTopic(context, JNDIHelper.OUTPUT_TOPIC);
    requestSubscriber = JNDIHelper.getTopicSubscriber(session, requestTopic);
    responseSubscriber = JNDIHelper.getTopicSubscriber(session, responseTopic);
    requestSubscriber.setMessageListener(listener);
    responseSubscriber.setMessageListener(listener);
    
    String input = "";
    while (!"exit".equalsIgnoreCase(input)) {
      BufferedReader br = 
        new BufferedReader(
          new InputStreamReader(
            System.in));
      System.out.print("> ");
      input = br.readLine();
    }
 
    requestSubscriber.close();
    responseSubscriber.close();
    session.close();
    connection.close();    
  }

  public void onMessage(Message message) {
    synchronized(System.out) {
      try {
        if (message instanceof ObjectMessage) {
          ObjectMessage objMessage = (ObjectMessage) message;
          Serializable obj = objMessage.getObject();
          if (obj instanceof CaseModel) {
            CaseModel model = (CaseModel) obj;
            System.out.println("\nMessage Received: " + model);
            System.out.println("\tSent To: " + message.getJMSDestination());
            System.out.print("> ");
          }
        }
      } catch (Exception e) {}
    }
  }

}

This particular component's goal is to demonstrate the inherent flexibility of the message bus model. Because of the way the message bus works, application components can at any time be plugged in (or removed from) the application architecture, thereby extending the capabilities and reach of the system. Shutting down the console application while the Web service is up and running in no way affects the functionality of the message bus.


Closing points

The material covered in this installment has provided only a cursory glance at the basic implementation of the message bus pattern. There are plenty of issues that a real-world implementation would need to deal with, such as security, proper message routing (brokering), delivery models, an so on, all of which are important, but are out of scope for this series. The goal here, and in the previous three installments, has been to stimulate your thinking about how you can implement Web services. Simple RPC-Style Stock Quotes services do not represent the true power of the Web services family of specifications and standards.

Patterns such as the asynchronous query, command, routing and message bus models provide a tremendous amount of flexibility in the way you can implement Web services and build different types of applications. Leveraging such patterns is key to securing the maximum value from the multitude of Web services specifications, standards, and products that are available.



Download

DescriptionNameSizeDownload method
WebSphere deployable EAR filews-tip-altdesign4code.zip2102 KB HTTP

Information about download methods


Resources

About the author

Author photo

James Snell is a member of the IBM Emerging Technologies Toolkit team and has spent the past few years focusing on emerging Web services technologies and standards. He maintains a weblog on developerWorks focused on emerging technologies.

Report abuse help

Report abuse

Thank you. This entry has been flagged for moderator attention.


Report abuse help

Report abuse

Report abuse submission failed. Please try again later.


developerWorks: Sign in


Need an IBM ID?
Forgot your IBM ID?


Forgot your password?
Change your password

By clicking Submit, you agree to the developerWorks terms of use.

 


The first time you sign into developerWorks, a profile is created for you. Select information in your profile (name, country/region, and company) is displayed to the public and will accompany any content you post. You may update your IBM account at any time.

Choose your display name

The first time you sign in to developerWorks, a profile is created for you, so you need to choose a display name. Your display name accompanies the content you post on developerWorks.

Please choose a display name between 3-31 characters. Your display name must be unique in the developerWorks community and should not be your email address for privacy reasons.

(Must be between 3 – 31 characters.)

By clicking Submit, you agree to the developerWorks terms of use.

 


Rate this article

Comments

static.content.url=http://www.ibm.com/developerworks/js/artrating/
SITE_ID=1
Zone=SOA and Web services, XML
ArticleID=32183
ArticleTitle=Web services programming tips and tricks: Learn simple, practical Web services design patterns, Part 4
publish-date=12142004
author1-email=
author1-email-cc=