级别: 初级 James M Snell, 软件工程师, IBM
2004 年 12 月 14 日 作者 James Snell 为 Web 服务世界提供了简短系列的关于定义良好的且提供 Web 应用程序设计策略的应用程序的讨论。在这部分中,将探究消息总线模式(message bus pattern),该模式将异步性、灵活性及基于人们所熟悉的且提供设计概念的面向消息服务的实现连接在一起。
消息总线模式
在面向消息中间件(MOM)——基础架构样式中,消息总线模式是唯一最重要的架构设计。消息总线概念的核心部分是:所有的业务应用程序都连接到消息分发引擎上,该引擎确保将所有的消息可靠且有效的分发到它们该去的地方。通过事件驱动消息总线的每一个业务应用程序都是其它应用程序的同位体。总线接收到的每一个消息都是总线下一步需要处理的事件,通过处理该事件来确定该事件将影响哪个客户端(换句话说,需要接收及处理消息)。
图 1. 消息总线模式
虽然消息总线模式的实现与供应商有根大的关系,但其中仍有几个核心通用的概念:
-
消息通道:消息总线组件主要包含一个或多个单独的通道,可以通过这些通道传递消息。通道有下列特性:
- 通道可以是应用程序指定的或消息指定的。
- 通道可以是单向的或双向的。
- 通道可以是
定向广播的(所有的消息发向所有的应用程序),
点对点的(消息发向特定的应用程序)或
基于订阅的(消息发向特定感兴趣的应用程序)。
-
消息代理: 消息总线可以包含消息代理,通过代理可以根据不同的业务标准确定该往何处发送消息。在基本模式中代理是可选的但是在企业系统中经常是关键性部件。
-
通道过滤器: 过滤器是通道中执行某种类型操作的透明的截取消息的组件,例如记录日志、转换、上下文处理等等。就像代理一样,在理论上过滤器可选的,但在实际中却是很关键的。
图 2. 公共消息总线组件
应用程序通过一个或多个通道与消息总线交互。通道将消息移入或移出消息总线。应用程序有两种方式可从总线中接收消息:
push 或
pull。在
push 交换(也被称为
事件通知)中,总线为某些等待的应用程序启动消息的传输。在
pull 交换中,应用程序通过给总线发送某种形式的请求并接收一个或多个在响应中排队等候的消息来启动传输。推动模型是应用程序有能力保持固定连接或监听消息传递的通道。拖拽模型是应用程序仅间歇的连接到总线或不能与总线保持持久连接。
图 3. 应用程序要消息总线连接选项
不管消息总线是如何实现的、使用的通道类型以及消息传送模型,总线模型总是表现于这样的事实之上:一个应用程序发送的消息将很可靠的被传递给任何适当的目的地,而不管该目的地在哪或它是如何被实现的。消息的接收者可以与发送消息的应用程序同处于相同的机器上或完全不同的机器上。接收者可以使用相同或不同的部署技术或编程语言来实现。关键是,发送消息的应用程序可以确保消息将会被发送这个事实。可能不会立即传输,特别是在推动模型传输的情况下更是如此,但是消息将知道它该去向何处。
实现消息总线
确实有一些方法可以实现消息总线,并且也有许多设计的第三方工具使得实现它更容易了。例如,IBM® 提供了WebSphere® MQ 和 WebSphere Business Integration Message Broker 产品,他们都是基于 Java Messaging Service API 标准的。这些产品共同提供了健壮的、企业质量的消息总线架构,它们能满足大多数业务案例的需要。另外一个关键部分是一大部分工作已经进入了定义一套 Web 服务标准,该标准提供实现事件及基于通告的服务的模型。规范的 WS-Notification 系列为 Web 服务定义了全面的模型,可以利用该模型来实现消息总线样式服务(参见
参考资料,规范的 WS-Notification 系列)。然而在本文中我将重点放在解释基本的设计模式是如何聚到一起的,所以我基本上使用开源的 JMS 实现:OpenJMS 以及 Java servlet 来创建我自己的实现,因此我们根本不用担心什么标准或产品。
这个实现由三个 JMS 主题组成(发布-订阅模型通道):
- 可以通过 Web 服务接口将消息推进消息总线中
- JMS Queue 通常通过 Web 服务接口实现响应消息的拖拽模型传输
- 少数监听流程消息的组件被推进消息总线中。
图 4. 消息总线举例
Web 服务由包含两个操作的接口组成:
send 及
receive。
send 操作接收称为
Case Model 的对象作为输入,该对象代表消息总线处理的应用程序指定的消息类型。
清单 1 Web 服务的 WSDL 描述。
清单 1. MessageBusService.wsdl
<?xml version="1.0" encoding="UTF-8"?>
<wsdl:definitions
targetNamespace="http://four.wspattern.developerworks.ibm.com"
xmlns:impl="http://four.wspattern.developerworks.ibm.com"
xmlns:intf="http://four.wspattern.developerworks.ibm.com"
xmlns:wsdl="http://schemas.xmlsoap.org/wsdl/"
xmlns:wsdlsoap="http://schemas.xmlsoap.org/wsdl/soap/"
xmlns:xsd="http://www.w3.org/2001/XMLSchema">
<wsdl:types>
<schema
targetNamespace="http://four.wspattern.developerworks.ibm.com"
xmlns="http://www.w3.org/2001/XMLSchema"
xmlns:impl="http://four.wspattern.developerworks.ibm.com"
xmlns:intf="http://four.wspattern.developerworks.ibm.com"
xmlns:wsdl="http://schemas.xmlsoap.org/wsdl/"
xmlns:xsd="http://www.w3.org/2001/XMLSchema">
<complexType name="CaseModel">
<sequence>
<element name="text" nillable="true" type="xsd:string"/>
</sequence>
</complexType>
<element name="CaseModel" nillable="true" type="impl:CaseModel"/>
</schema>
</wsdl:types>
<wsdl:message name="receiveResponse">
<wsdl:part name="receiveReturn" type="intf:CaseModel"/>
</wsdl:message>
<wsdl:message name="sendRequest">
<wsdl:part name="model" type="intf:CaseModel"/>
</wsdl:message>
<wsdl:message name="receiveRequest">
</wsdl:message>
<wsdl:message name="sendResponse">
</wsdl:message>
<wsdl:portType name="MessageBusService">
<wsdl:operation name="receive">
<wsdl:input message="intf:receiveRequest" name="receiveRequest"/>
<wsdl:output message="intf:receiveResponse" name="receiveResponse"/>
</wsdl:operation>
<wsdl:operation name="send" parameterOrder="model">
<wsdl:input message="intf:sendRequest" name="sendRequest"/>
<wsdl:output message="intf:sendResponse" name="sendResponse"/>
</wsdl:operation>
</wsdl:portType>
<wsdl:binding name="MessageBusServiceSoapBinding" type="intf:MessageBusService">
<wsdlsoap:binding style="rpc" transport="http://schemas.xmlsoap.org/soap/http"/>
<wsdl:operation name="receive">
<wsdlsoap:operation soapAction=""/>
<wsdl:input name="receiveRequest">
<wsdlsoap:body
namespace="http://four.wspattern.developerworks.ibm.com"
use="literal"/>
</wsdl:input>
<wsdl:output name="receiveResponse">
<wsdlsoap:body
namespace="http://four.wspattern.developerworks.ibm.com"
use="literal"/>
</wsdl:output>
</wsdl:operation>
<wsdl:operation name="send">
<wsdlsoap:operation soapAction=""/>
<wsdl:input name="sendRequest">
<wsdlsoap:body
namespace="http://four.wspattern.developerworks.ibm.com"
use="literal"/>
</wsdl:input>
<wsdl:output name="sendResponse">
<wsdlsoap:body
namespace="http://four.wspattern.developerworks.ibm.com"
use="literal"/>
</wsdl:output>
</wsdl:operation>
</wsdl:binding>
<wsdl:service name="MessageBusServiceService">
<wsdl:port
binding="intf:MessageBusServiceSoapBinding"
name="MessageBusService">
<wsdlsoap:address
location="http://localhost:9080/WSPattern4/services/MessageBusService"/>
</wsdl:port>
</wsdl:service>
</wsdl:definitions>
|
需要对此 Web 服务接口注意的一件事情是:它完全没有任何与众不同的地方。它与其他大多数简单的 Web 服务接口相似。在
清单 2 中显示的服务的实现,也同样没有什么特别之处。
清单 2. MessageBusService.java
package com.ibm.developerworks.wspattern.four;
import java.io.Serializable;
import java.util.Enumeration;
import java.util.Iterator;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueReceiver;
import javax.jms.QueueSession;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.naming.Context;
import javax.servlet.http.HttpServlet;
import javax.xml.rpc.ServiceException;
import javax.xml.rpc.server.ServiceLifecycle;
public class MessageBusService
implements ServiceLifecycle {
Context context = null;
QueueConnection connection = null;
TopicConnection tconnection = null;
QueueSession session = null;
TopicSession tsession = null;
public void init(Object serviceContext)
throws ServiceException {
try {
context = JNDIHelper.getInitialContext();
connection = JNDIHelper.getQueueConnection(context);
session = JNDIHelper.getQueueSession(connection);
tconnection = JNDIHelper.getTopicConnection(context);
tsession = JNDIHelper.getTopicSession(tconnection);
} catch (Exception e) {}
}
public void destroy() {
try {
tsession.close();
session.close();
tconnection.close();
connection.close();
} catch (Exception e) {}
}
public void send(CaseModel model) {
sendMessage(JNDIHelper.INPUT TOPIC, model);
}
public CaseModel receive() {
return receiveMessage(JNDIHelper.OUTPUT QUEUE);
}
private void sendMessage(String topicName, CaseModel model) {
try {
ObjectMessage message = tsession.createObjectMessage(model);
Topic topic = JNDIHelper.getTopic(context, topicName);
TopicPublisher publisher = JNDIHelper.getTopicPublisher(tsession, topic);
publisher.publish(message);
} catch (Exception e) {
throw new RuntimeException(e.getMessage());
}
}
private CaseModel receiveMessage(String queueName) {
CaseModel model = null;
try {
Queue queue = JNDIHelper.getQueue(context, queueName);
QueueReceiver receiver = JNDIHelper.getQueueReceiver(session, queue);
Message message = receiver.receiveNoWait();
if (message != null && message instanceof ObjectMessage) {
ObjectMessage objMessage = (ObjectMessage)message;
Serializable obj = objMessage.getObject();
if (obj instanceof CaseModel) {
model = (CaseModel)obj;
}
}
} catch (Exception e) {
throw new RuntimeException(e.getMessage());
}
return model;
}
}
|
范例实现的模型很直接。Web 服务接收到
send 请求然后将模型发布到输入 JMS 主题。四个不同应用程序组件监听那个主题并对消息执行某些响应动作。这些组件中的两个生成响应消息,该消息被传送给输出 JMS 主题。由于 Web 服务客户端没有能力维持与 JMS 主题的直接的、持久的发布-订阅连接,所以就有一个特殊的监听器接收从输出主题中发出的消息,然后将那些消息存储在 JMS 响应队列中。当 Web 服务客户端调用
invoke 消息时,在那个队列中等待的响应就会被传送。
应用程序监听输入主题,包含两个对消息响应的应用程序指定的流程(也就是分别的将接收的文本转换为大写体或小写体):第一个将收到的消息记录在
stdout 控制台应用程序中,第二个位远程 Web 服务端点发送通知消息。
清单 3 显示了两个监听器中其中一个监听器的代码,该监听器接收通过消息总线传送的
CaseModel,然后将提供的文本转换为大写体和小写体。二者都以简单的 HTTP servlet 来实现 JMS
MessageListener 接口。我可以使用 JMS Message Driven Bean 来实现,但我没有选择这么做,是因为它将导致应用程序复杂度的有所增加。
清单 3. UppercaseTopicListenerServlet.java
package com.ibm.developerworks.wspattern.four;
import java.io.Serializable;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.Topic;
import javax.jms.TopicPublisher;
import javax.servlet.Servlet;
public class UppercaseTopicListenerServlet
extends TopicListenerServlet
implements Servlet {
protected String getTopic() {
return JNDIHelper.INPUT TOPIC;
}
protected String getSelector() {
return "";
}
public void onMessage(Message message) {
try {
if (message instanceof ObjectMessage) {
ObjectMessage objMessage = (ObjectMessage) message;
Serializable obj = objMessage.getObject();
if (obj instanceof CaseModel) {
CaseModel model = (CaseModel)obj;
if (model.getText() != null) {
model.setText(model.getText().toUpperCase());
}
ObjectMessage response = session.createObjectMessage(model);
Topic responseTopic = JNDIHelper.getTopic(context, JNDIHelper.OUTPUT TOPIC);
TopicPublisher responsePublisher =
JNDIHelper.getTopicPublisher(session, responseTopic);
responsePublisher.publish(response);
}
}
} catch (Throwable t) {}
}
}
|
注意,除
toUpperCase() 及
toLowerCase() 操作所执行的不同之外,这些监听器在所有方面都是一样的。它们每一个都生成
response 消息,该消息包含容纳转换文本的
CaseModel 对象。每个都将响应消息传输给输出 JMS 主题。
在
清单 4 中显示的另外一个监听器,等待将消息发布到输出主题上,然后将那些消息放置在响应消息队列中,等待传输给 Web 服务客户端。
清单 4. ResponseTopicListenerServlet.java
package com.ibm.developerworks.wspattern.four;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.servlet.Servlet;
import javax.servlet.ServletException;
public class ResponseTopicListenerServlet
extends TopicListenerServlet
implements Servlet {
protected QueueConnection qconnection;
protected QueueSession qsession;
protected Queue queue;
protected String getTopic() {
return JNDIHelper.OUTPUT TOPIC;
}
protected String getSelector() {
return "";
}
public void init() throws ServletException {
super.init();
try {
qconnection = JNDIHelper.getQueueConnection(context);
qsession = JNDIHelper.getQueueSession(qconnection);
queue = JNDIHelper.getQueue(context, JNDIHelper.OUTPUT QUEUE);
} catch (Exception e) {}
}
public void destroy() {
super.destroy();
try {
qsession.close();
qconnection.close();
} catch (Exception e) {}
}
public void onMessage(Message message) {
try {
QueueSender sender = JNDIHelper.getQueueSender(qsession, queue);
sender.send(message);
} catch (Exception e) {}
}
}
|
这个排队等候通过消息总线传递响应消息的“存储并传送”的技术,确保了等待的 Web 服务客户端接收到所有消息,且是以它们在消息总线中传输时的准确的顺序接收。要注意的一点实事是:在这个实现中,并没有任何保证说 Web 服务客户端只接收只传递给它们的消息。换句话说,在这个模型的实际实现当中您需要加强适当的安全性以及应用限制,以确保只有适当的人接收到适当的消息。
清单 5 展示了两个应用程序组件之一,该组件监听输入及输出 JMS 主题。从消息总线上接收消息,这些组件唤醒远程 Web 服务端点,通知它已经接收了消息。这些组件完全相同,只是它们监听不同的 JMS 主题。
清单 5. RequestTopicListenerNotifierServlet.java
package com.ibm.developerworks.wspattern.four;
import java.io.Serializable;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.Topic;
import javax.jms.TopicPublisher;
import javax.servlet.Servlet;
import javax.xml.namespace.QName;
import javax.xml.rpc.Call;
import javax.xml.rpc.ParameterMode;
import javax.xml.rpc.Service;
import javax.xml.rpc.ServiceFactory;
import javax.xml.rpc.encoding.TypeMapping;
import javax.xml.rpc.encoding.TypeMappingRegistry;
import com.ibm.ws.webservices.engine.encoding.ser.BeanDeserializerFactory;
import com.ibm.ws.webservices.engine.encoding.ser.BeanSerializerFactory;
public class RequestTopicListenerNotifierServlet
extends TopicListenerServlet
implements Servlet {
private static final String NSURI =
"http://four.wspattern.developerworks.ibm.com";
protected String getTopic() {
return JNDIHelper.INPUT TOPIC;
}
protected String getSelector() {
return "";
}
public void onMessage(Message message) {
try {
if (message instanceof ObjectMessage) {
ObjectMessage objMessage = (ObjectMessage) message;
Serializable obj = objMessage.getObject();
if (obj instanceof CaseModel) {
CaseModel model = (CaseModel)obj;
ServiceFactory factory = ServiceFactory.newInstance();
Service service =
factory.createService(
new QName(
NSURI,
"RemoteService"
)
);
QName cmq = new QName(NSURI,"CaseModel");
TypeMappingRegistry tmreg = service.getTypeMappingRegistry();
TypeMapping tm = tmreg.createTypeMapping();
tm.register(
CaseModel.class,
cmq,
new BeanSerializerFactory(
CaseModel.class,
cmq),
new BeanDeserializerFactory(
CaseModel.class,
cmq));
tmreg.register("", tm);
Call call = service.createCall();
call.setTargetEndpointAddress(
"http://localhost:9080/WSPattern4/services/RemoteService");
call.addParameter(
"model",
cmq,
CaseModel.class,
ParameterMode.IN);
call.invoke(
new QName(
NSURI,
"notifyRequest"),
new Object[] {model}
);
}
}
} catch (Throwable t) {}
}
}
|
最后加入到消息总线的应用程序组件是简单的控制台应用程序,它显示
stdout 通知,而不管消息是被传送到输入还是输出 JMS 主题。
清单 6. ListenerApp.java
package com.ibm.developerworks.wspattern.four;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.Serializable;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.naming.Context;
public class ListenerApp
implements MessageListener {
private static Context context;
private static TopicConnection connection;
private static TopicSession session;
private static Topic requestTopic;
private static Topic responseTopic;
private static TopicSubscriber requestSubscriber;
private static TopicSubscriber responseSubscriber;
public static void main(String[] args) throws Exception {
ListenerApp listener = new ListenerApp();
context = JNDIHelper.getInitialContext();
connection = JNDIHelper.getTopicConnection(context);
session = JNDIHelper.getTopicSession(connection);
requestTopic = JNDIHelper.getTopic(context, JNDIHelper.INPUT TOPIC);
responseTopic = JNDIHelper.getTopic(context, JNDIHelper.OUTPUT TOPIC);
requestSubscriber = JNDIHelper.getTopicSubscriber(session, requestTopic);
responseSubscriber = JNDIHelper.getTopicSubscriber(session, responseTopic);
requestSubscriber.setMessageListener(listener);
responseSubscriber.setMessageListener(listener);
String input = "";
while (!"exit".equalsIgnoreCase(input)) {
BufferedReader br =
new BufferedReader(
new InputStreamReader(
System.in));
System.out.print("> ");
input = br.readLine();
}
requestSubscriber.close();
responseSubscriber.close();
session.close();
connection.close();
}
public void onMessage(Message message) {
synchronized(System.out) {
try {
if (message instanceof ObjectMessage) {
ObjectMessage objMessage = (ObjectMessage) message;
Serializable obj = objMessage.getObject();
if (obj instanceof CaseModel) {
CaseModel model = (CaseModel) obj;
System.out.println("\nMessage Received: " + model);
System.out.println("\tSent To: " + message.getJMSDestination());
System.out.print("> ");
}
}
} catch (Exception e) {}
}
}
}
|
这个特殊的组件的目的是示范消息总线模型的固有的灵活性。由于消息总线工作的方式,应用程序组件可以在任何时候被加入到(或从中去除)应用程序架构中,因而可以扩展容量并达到系统。当 Web 服务启动起来关闭控制台应用程序,以不影响消息总线的功能性的方式运行。
结束语
在这部分所涵盖的内容当中只是粗略的提供了对消息总线模型实现。在实际实现当中仍有许多问题需要解决,例如安全性、适当的消息运行(代理程序)、传输模型等等。它们都很重要,但超出了本系列的范围。这里以及前三部分的目的是要激起您对思考如何实现 Web 服务的兴趣。Simple RPC-Style Stock Quotes 服务没有代表规范及标准的 Web 服务系列的真正的能力。
像异步队列、命令、路由及消息总线模型这样的模式,提供了许多的灵活性,以这种方式您可以实现 Web 服务及创建不同类型的应用程序。利用这样的模式是从众多的 Web 服务规范、标准及可利用的产品中取得最大值的关键。
下载 | 描述 | 名字 | 大小 | 下载方法 |
|---|
| WebSphere deployable EAR file | ws-tip-altdesign4code.zip | 2102 KB | HTTP |
|---|
参考资料
关于作者  | 
|  | James Snell 是 IBM Emerging Technologies Toolkit 开发组成员。在过去的几年中他一直致力于 Web 服务技术和标准的实现。在 developerWorks 上,他维护着一个关于新兴技术的
weblog。
|
对本文的评价
|