IBM®
跳转到主要内容
    中国 [选择]    使用条款
 
 
Select a scope:Search for:    
    首页    产品    服务与解决方案     支持与下载    个性化服务    
跳转到主要内容

developerWorks 中国  >  SOA and Web services | WebSphere  >

将基于 WebSphere MQ 的应用程序与基于 BPEL 的流整合

developerWorks
文档选项

未显示需要 JavaScript 的文档选项


级别: 初级

Naveen Sachdeva (sachdeva@us.ibm.com), 咨询软件工程师, WebSphere Competency Center, IBM

2005 年 6 月 16 日

通过使用 JMS(Java Messaging Service),可以将基于 IBM(R) WebSphere(R) MQ 的应用程序公开为一系列同步或异步的服务,并将其同基于业务流程执行语言(Business Process Execution Language,BPEL)的流程集成在一起。服务的集成与集成类型、流程类型以及应用类型密切相关。本文中将描述 MQ 应用程序的两种类型——一种类型的 MQ 应用程序用头属性一一识别请求和响应消息,另外一种类型的 MQ 应用程序则依赖于消息属性进行识别。同时,作者也将向您展示如何将它们与可中断流及不可中断流整合。

引言

BPEL 流有两种不同的类型——可中断流与不可中断流。不可中断流用于描绘短期运行的流程,它们不能接收异步事件。 换言之,就是它们仅仅具有一个 Receive 活动。不可中断流中的所有活动都必须在单独的事务处理范围内执行。而可中断流用于描绘长期运行的程序,它可以具有多个 Receive 活动,且每个活动都具有其单独的事务边界。这些不同之处将影响不可中断流及可中断流与 Java 消息服务应用程序的集成。有关 BPEL 更详细的介绍请参阅参考资料部分。

在这篇文章中,我将分析 WebSphere MQ(MQ)应用程序的两种类型——类型 1 与类型 2。类型 1 的应用程序使用 MessageIDCorrelId 属性(JMS 的 JMSMessageIDJMSCorrelationID)来关联请求与响应消息,同时使用 ReplyToQReplyToQMgr 属性(JMS 的 JMSReplyTo)来确定响应队列。另一方面,类型 2 的应用程序仅使用预先定义好的响应队列,且可以通过请求与响应消息中的属性将它们关联在一起。大部分 MQ 应用程序是类型 1 与类型 2 应用程序的组合。

清单 1清单 2 展示了分别使用 MQ JMS 与 MQ Java 类来实现类型 1 应用程序的相关代码。有关如何使用 Java 开发 MQ 应用程序的更多详细介绍,请参阅参考资料部分。


清单 1. 使用 MQ Java 类的类型 1 应用程序
				
				…. responseMessage.correlationId =
				requestMessage.messageId; int openOptions =
				MQC.MQOO_OUTPUT; MQQueue sendQueue = null; if
				(requestMessage.replyToQueueName != null) { sendQueue =
				qMgr.accessQueue(requestMessage.replyToQueueName,
				openOptions, requestMessage. replyToQueueManagerName,
				null, null); } else { sendQueue =
				qMgr.accessQueue(DEFAULT_REPLY_QUEUE, openOptions); } ….
			


清单 2. 使用 MQ JMS 类的类型 1 应用程序

				
				….
				responseMessage.setJMSCorrelationID(requestMessage.getJMSMessageID());
				if (requestMessage.getJMSReplyTo() != null) {
				sendQueue=queueSession.createQueue(requestMessage.getJMSReplyTo(
				).toString()); } else {
				sendQueue=queueSession.createQueue(DEFAULT_REPLY_QUEUE);
				} ….
			

在本文中,我会介绍如何将双向 MQ 应用程序公开为 JMS 服务,并将他们与可中断及不可中断流进行同步或是异步集成。 你应该对基本的 BPEL 和 MQ 概念有所了解,知道如何创建单向 JMS 服务并将其同 BPEL 流集成(参阅相关文章 不使用 Java 代码在 BPEL 流程中发送简单的 JMS 消息 以了解更详细的内容 )。

本文中的范例基于下列版本的 IBM 产品:

  • WebSphere Studio Application Developer - Integration Edition V5.1.1.5
  • IBM WebSphere Application Server V5.1.1.3
  • IBM WebSphere Business Integration Server Foundation V5.1.1.1
  • IBM WebSphere MQ V5.3 + CSD8

在使用本文的例子前,请确认你使用的 IBM 产品版本与上面提到的相同或更高。(参阅 参考资料 相关链接以下载试用版本)





回页首


将 WebSphere MQ 应用程序公开为 JMS 服务

本部分将介绍如何通过下列步骤,将 MQ 应用程序公开为 JMS 服务:

  1. 创建接口文件。
  2. 创建绑定文件。
  3. 创建服务文件。

根据本范例,假定类型 1 的应用程序请求与响应消息具有一个单独的类型为 String 的属性 ,类型 2 应用程序具有一个额外的属性 messageID ,其类型也为 String 。 两种类型的应用程序中请求与响应队列分别是 RequestQueueReplyQueue ,它们使用 QM 作为队列管理器,并在测试服务器上将请求与响应队列配置为 jms/RequestQueuejms/ReplyQueuejms/MQConnection

让我们从创建类型 1 与类型 2 应用程序的接口开始。

创建接口文件

可以用 Web 服务描述语言(WSDL)文件分别描述类型 1 与类型 2 应用程序的接口,如以下 图 1图 2 所示。这里, SendAndReceive 用于同步调用而 SendReceive 组合在一起可以用于异步调用。


图 1.类型 1 WSDL
图 1.类型 1 WSDL

图 2.类型 2 WSDL
图 2.类型 2 WSDL

创建绑定文件

为了创建绑定文件,创建一个新的包含接口文件的 WSDL 文件。 使用 WSDL 编辑器为所有端口创建 JMS 绑定——消息类型选择 ObjectMessage图 3 展示了编辑器创建的缺省 JMS 绑定。


图 3.缺省 JMS 绑定
图 3.缺省 JMS 绑定

对于本例而言,使用缺省绑定已经足够了。您可以指定客户端期待响应的队列,从而改进绑定的可读性。通过指定 JMSReplyTo 属性可以完成该操作。

在 WSDL 编辑器中,按照下列步骤进行:

  1. 在绑定窗格中,右键单击 SendAndReceive 操作并选择 Add Extensibility Element > jms:propertyValue
  2. 选择新建的 jms:propertyValue,设置名字为 JMSReplyTo,类型为 xsd:string,值为 jms/ReplyQueue
  3. 对于 Send 操作重复执行上面的步骤。

注意:客户端往往可以改写 JMSReplyTo 属性 ,但仅当后端应用程序是类型 1 应用程序时才能生效。

图 4 将展示如何使用指定的 JMSReplyTo 属性修改绑定文件。


图 4.修改 JMS 绑定
图 4.修改 JMS 绑定

创建服务文件

可以通过绑定文件,指定用 JMS 来调用应用程序。现在,为了创建服务文件,创建一个新的包含绑定文件的 WSDL 文件。然后使用 WSDL 编辑器来给所有 JMS 绑定创建服务端口。同时指定端口细节,使用下列值:

  1. Protocol: JMS
  2. JNDI connection factory name: jms/MQConnection
  3. JNDI destination name: jms/RequestQueue
  4. JNDI provider URL: iiop://localhost:2809

服务的 jms:address 如图 5 所示。


图 5.服务文件
图 5.服务文件




回页首


同步集成

在本部分中,您会学习到如何通过 BPEL 流调用 SendAndReceive 服务,而无需编写任何 Java 代码。我们使用一个简单的业务流程来展示整个过程,如图 6 所示。该流程使用 JMSServicePartnerLink 指向 SendAndReceive 端口类型,这一端口类型通过先前定义的服务文件在部署阶段确定。该流程的输入是字符串 (InputVariable:contents) ,并在调用 SendAndReceive 前映射至 JMSRequestVariable:Part。同样,服务输出—— JMSResponseVariable:Part——映射回 InputVariable:contents,InputVariable:contents 将作为流程输出返回。


图 6.通过业务流程双向同步调用 JMS 服务
图 6.通过业务流程双向同步调用 JMS 服务

为调用双向 JMS 服务,IBM WebSphere Business Integration Server Foundation 中的 Business Process Choreographer 组件将使用单独的事务边界来发送和接收消息。Business Process Choreographer 也可以将请求消息的 JMSReplyTo 属性设置为 BPEIntQueue,这样,响应就能传递给 Business Process Choreographer,并根据后台应用程序将 JMSMessageID 映射到 JMSCorrelationID

要想正常工作,后台应用程序应为类型 1 应用程序,且不能在现有的事务中调用 JMS 服务。 否则,Business Process Choreographer 将不能提交发送事务,因此消息将不被发送,导致接收方出现超时异常。此外,要使后台 MQ 应用程序能将消息直接放到 BPEIntQueue 上,应用程序队列管理器(QM)与 Business Process Choreographer 队列管理器(即 BPEQM)之间应存在一个通道(除非 Business Process Choreographer 队列也基于 QM 定义)。对于 WebSphere Business Integration Server Foundation V5.x,Business Process Choreographer 队列应通过 MQ 队列管理器定义。 关于如何在两个队列管理器之间创建通道的详细内容请参见参考资料部分。

可中断流

缺省情况下,可中断流中所有操作应在相互独立的事务边界内执行。因此,调用双向同步 JMS 服务无需更多操作。

不可中断流

不可中断流的缺省情况下,所有活动都在相同的事务边界内执行。但可以通过将活动的 Transactional Behavior属性(在 Server 选项卡下)设置为 Requires Own 来更改这一点,如图 7 所示。


图 7.将 Transactional Behavior 更改为 Requires Own,从而通过不可中断流来调用双向同步 JMS 服务
图 7.将 Transactional Behavior 更改为 Requires Own,从而通过不可中断流来调用双向同步 JMS 服务




回页首


异步集成

在本部分中,您将学习如何通过 BPEL 流集成 SendReceive 服务。 在 BPEL 中,可以使用流中的 Receive 活动来接收异步消息。只有可中断流支持异步 Receive 活动,因此异步集成时,自然会选择可中断流。然而,使用不可中断流来集成异步服务也是有可能的。 此时需要服务适配器,允许不可中断流程使用 Invoke 活动来接收应答。 在本部分中,也将展示有关此服务适配器的范例。

可中断流

在可中断流情况下,通过使用 Invoke 活动发送请求,并使用 Receive 活动接收应答,从而实现双向异步服务。 图 8 展示了将 Send 服务与 Receive 服务集成的可中断流程范例。在这个流程中,JMSAsyncServicePartnerLink 是一个双向伙伴链接,它为 Send 服务指向外部的应用程序,并为 Receive 服务实现指向可中断流程。为了使该流程中的请求与响应消息相关联,这两种消息应具有专门用于关联的属性。换句话说, 后台应用程序应是一个类型 2 的应用程序 。为了将可中断流与类型 1 应用程序集成,你可以使用上面不可中断流中提及的服务适配器。


图 8.与双向异步服务集成的可中断流程范例
图 8.与双向异步服务集成的可中断流程范例

该流程将使用请求与响应消息中的 messageID 属性来关联这两种消息。定义相关性设置来映射两种消息(如 图 9 所示),通过 Invoke 活动来初始化该相关性设置(即 sendRequest,如 图 10 所示),接下来,用它来进行 Receive 活动(即 receiveResponse,如 图 11 所示)。有关创建相关性设置的详细内容请参见 参考资料 部分。


图 9.使用 messageID 属性关联请求与响应消息
图 9.使用 messageID 属性关联请求与响应消息

图 10.使用请求消息初始化相关性设置
图 10.使用请求消息初始化相关性设置

图 11.使用相关性设置接收相应的响应消息
图 11.使用相关性设置接收相应的响应消息

Send 服务端口映射到先前为 MQ 应用程序创建的服务文件,以生成流程的部署代码。流程将消息发送到 RequestQueue 中。 但仍需从 ReplyQueue 中获取响应消息(该队列由 MQ 应用程序放置),并将该消息放置到 BPEIntQueue 中(Business Process Choreographer 需要等待从该队列获取响应消息)。为了实现这一目的,需要为 Receive 服务(该服务使用 MQ 应用程序的绑定通过流程来实现)创建部署代码:

在 Business Integration 视图中,按照下列步骤进行:

  1. 右键单击用于 Receive 端口的基于流程的 JMS 服务文件,即<process name>_Receive_JMS.wsdl,并选择 Enterprise Services > Generate Deploy Code…
  2. 选择 Use an existing port 并单击 Next
  3. 单击 Browse… 并选择 MQ 应用程序服务文件。
  4. 对于 Service name: 选择 JMSReceiveService 并单击 Finish

这样将创建消息驱动 Bean(MDB)—— JMSReceiveServiceMDB——用来监听 SenderQListenerPort 并将消息发送到流程。 图 12 展示了生成 MDB 的部署描述符。清除 Message Selector 并保存部署描述符。 在测试服务器上运行程序前,使用下列值来创建 SenderQListenerPort :

  1. Connection Factory JNDI Name: jms/MQConnection
  2. Destination JNDI Name: jms/ReplyQueue

图 12.生成的 JMSReceiveServiceMDB 的部署描述符
图 12.生成的 JMSReceiveServiceMDB 的部署描述符

注意:当我写这篇文章时,生成 MDB 代码中有一个错误——生成的代码用于输入-输出的操作,而不是仅用于输入的操作。 在实际运行时,要将 JMSRecceiveServiceMDB::executeOperation() 中对 executeInputOnlyOperation() 的调用替换为对 executeRequestResponseOperation() 的调用,如下所示:

				
				// success = //
				operation.executeRequestResponseOperation(inMsg, outMsg,
				faultMsg); operation.executeInputOnlyOperation(inMsg);
			

不可中断流

不可中断流只能同类型 1 应用程序集成。这是因为,为了接受响应消息,不可中断流应能选择来自 ReplyQueue 的消息,且 JMS 仅支持消息选择的头属性。 要将不可中断(或可中断)流与类型 1 应用程序集成,需要一个服务适配器来匹配 Send 服务,从而返回请求消息的消息 ID,并匹配 Receive 服务,从而根据请求消息的消息 ID 来返回响应消息。 图 13 展示了服务适配器的交互图。


图 13.服务适配器交互图
图 13.服务适配器交互图

图 14 展示了服务适配器的接口文件范例。 图 15 展示了使用 Invoke 活动与服务适配器进行集成的不可中断流范例,其中 JMSSendPartnerLink 与 JMSReceivePartnerLInk 分别引用 JMSSendProxy::send 和 JMSReceiveProxy::receive。


图 14.服务适配器接口文件
图 14.服务适配器接口文件

图 15.双向异步集成的不可中断流范例
图 15.双向异步集成的不可中断流范例

按照下列步骤创建服务提供者:

  1. 右键单击服务适配器接口文件,并选择 New > Build from Service…
  2. 选择缺省值并单击 Finish
  3. 将 JMS 代码加入生成的 JMSSendProxy.java 与 JMSReceiveProxy.java 中,分别用于发送请求并接收响应。 清单 3清单 4 展示了这两个类的代码范例。 代码使用 JMSParameter.properties 与 JMS_Client 实用类,分别如 清单 5清单 6 所示。

清单 3.发送 JMS 消息的代码范例
				
				public java.lang.String send(java.lang.String
				argMessage) { // user code begin {method_content}
				ResourceBundle res =
				ResourceBundle.getBundle("com.sample.service.JMSParameter");
				String initialContextFactory =
				res.getString("initialContextFactory"); String
				providerURL = res.getString("providerURL"); String
				conFactory = res.getString("conFactory"); String sendQ =
				res.getString("senderQ"); String receiveQ =
				res.getString("receiverQ"); String messageExpiration =
				res.getString("messageExpiration");
				String messageId = null; try { messageId =
				JMS_Client.sendMessage( argMessage,
				initialContextFactory, providerURL, conFactory, sendQ,
				receiveQ, new Long(messageExpiration).longValue()); }
				catch (Exception ex) { ex.printStackTrace(); } return
				messageId; // user code end }
			


清单 4.接收 JMS 消息的代码范例

				
				public java.lang.String receive(java.lang.String
				argMessageId) { // user code begin {method_content}
				ResourceBundle res =
				ResourceBundle.getBundle("com.sample.service.JMSParameter");
				String initialContextFactory =
				res.getString("initialContextFactory"); String
				providerURL = res.getString("providerURL"); String
				conFactory = res.getString("conFactory"); String sendQ =
				res.getString("senderQ"); String receiveQ =
				res.getString("receiverQ"); String receiveMessageTimeout
				= res.getString("receiveMessageTimeout");
				String receivedMsg = null; try { receivedMsg =
				(String)JMS_Client.receiveMessage( argMessageId,
				initialContextFactory, providerURL, conFactory,
				receiveQ, new Long(receiveMessageTimeout).longValue());
				} catch (Exception ex) { ex.printStackTrace(); } return
				receivedMsg; // user code end }
			


清单 5.JMSParameter.properties 文件内容

				
				initialContextFactory =
				com.ibm.websphere.naming.WsnInitialContextFactory
				providerURL = iiop://localhost:2809 conFactory =
				jms/MQConnection senderQ = jms/RequestQueue receiverQ =
				jms/ReplyQueue messageExpiration = 0
				receiveMessageTimeout = 10000
			


清单 6.JMS_Client 实用类

				
				public class JMS_Client {
				public static String sendMessage( Serializable message,
				String initialContextFactory, String providerURL, String
				conFactory, String sendQ, String receiveQ, long
				messageExpiration) throws Exception {
				QueueConnection queueConnection = null; QueueSession
				queueSession = null; String messageId = null;
				Context jndiContext = null; try { Hashtable
				jndiProperties = new Hashtable(); jndiProperties.put(
				Context.INITIAL_CONTEXT_FACTORY, initialContextFactory);
				jndiProperties.put(Context.PROVIDER_URL, providerURL);
				jndiContext = new InitialContext(jndiProperties);
				QueueConnectionFactory queueConnectionFactory =
				(QueueConnectionFactory) jndiContext.lookup(conFactory);
				Queue sendQueue = (Queue) jndiContext.lookup(sendQ);
				Queue receiveQueue = (Queue)
				jndiContext.lookup(receiveQ);
				queueConnection =
				queueConnectionFactory.createQueueConnection();
				queueSession = queueConnection.createQueueSession(
				false, Session.AUTO_ACKNOWLEDGE);
				QueueSender queueSender =
				queueSession.createSender(sendQueue);
				ObjectMessage sendMessage =
				queueSession.createObjectMessage(message);
				sendMessage.setJMSExpiration(messageExpiration);
				sendMessage.setJMSReplyTo(receiveQueue);
				queueSender.send(sendMessage); queueSender.close();
				messageId = sendMessage.getJMSMessageID(); } catch
				(Exception e) { throw e; } finally {
				closeConnections(queueSession, queueConnection); }
				return messageId; }
				public static Serializable receiveMessage( String
				messageID, String initialContextFactory, String
				providerURL, String conFactory, String receiveQ, long
				receiveMessageTimeOut) throws Exception {
				Serializable object = null;
				QueueConnection queueConnection = null; QueueSession
				queueSession = null;
				Context jndiContext = null; try { Hashtable
				jndiProperties = new Hashtable(); jndiProperties.put(
				Context.INITIAL_CONTEXT_FACTORY, initialContextFactory);
				jndiProperties.put(Context.PROVIDER_URL, providerURL);
				jndiContext = new InitialContext(jndiProperties);
				QueueConnectionFactory queueConnectionFactory =
				(QueueConnectionFactory) jndiContext.lookup(conFactory);
				Queue receiveQueue = (Queue)
				jndiContext.lookup(receiveQ);
				queueConnection =
				queueConnectionFactory.createQueueConnection();
				queueSession = queueConnection.createQueueSession(
				false, Session.AUTO_ACKNOWLEDGE);
				String selector = "JMSCorrelationID= '" + messageID +
				"'"; QueueReceiver queueReceiver =
				queueSession.createReceiver(receiveQueue, selector);
				queueConnection.start();
				Message responseMessage =
				queueReceiver.receive(receiveMessageTimeOut);
				object = ((ObjectMessage)responseMessage).getObject(); }
				catch (Exception e) { throw e; } finally {
				closeConnections(queueSession, queueConnection); }
				return object; }
				private static void closeConnections( QueueSession
				session, QueueConnection connection) { try {
				session.close(); session = null; connection.close();
				connection = null; } catch (Exception e) {
				} } }
			

如先前所讨论,Send 服务应在单独的事务边界中调用。要调用单独事务中不可中断流的 Send 服务,可以按照下列步骤进行(对可中断流而言,无需这些步骤):

  1. 右键单击 JMSSendProxyJavaService.wsdl 并选择 Enterprise Services > Generate Deploy Code…
  2. 对于 Inbound binding type,选择 EJB 并单击 Next > Finish
  3. 双击 EJB 项目中的 ejb-jar.xml 并单击 Assembly Descriptor 选项卡。
  4. 单击 Add… 并设置 Container Transactions
  5. 选择 JMSSendProxyService 并单击 Next
  6. Container transaction type 中选择 RequiresNew ,选择 send 方法并单击 Finish

现在您已经完成了使用 JMSSendProxyEJBService.wsdl 与 JMSReceiveProxyJavaService.wsdl 为可中断流生成部署代码的全部设置。





回页首


结束语

在本文中,作者介绍了如何将 BPEL 流与不同类型的 MQ 应用程序集成,在大多数情况下,您可以不用写代码而直接进行集成。同时也向您介绍了只有类型 1 应用程序可以被同步集成,类型 2 应用程序不能与不可中断流集成, 以及类型 1 应用程序需要服务适配器来实现异步集成。



参考资料

  • 您可以参阅本文在 developerWorks 全球站点上的 英文原文

  • 请参阅红皮书 WebSphere Business Integration Server V5.1 手册 ,其中有关于 BPEL 以及如何创建相关性设置的详细内容。该红皮书包含 WebSphere Business Integration Server Foundation 的技术细节,并讨论了使用 WebSphere Studio Application Developer Integration Edition 进行应用程序开发。

  • 红皮书 WebSphere MQ Using Java 描述了如何安装用于 Java 的 WebSphere MQ 类,并使用这些类来编写程序以访问 WebSphere MQ 系统,以及安装用于 Java 消息服务(JMS)的 WebSphere MQ 类,并用这些类编写程序以访问 Java 消息服务和 WebSphere MQ 应用程序。

  • WebSphere MQ Intercommunication 一书中,讨论了 WebSphere MQ 产品与 WebSphere MQ 的分布式编队管理(DQM)设施间的相互通信。描述了如何在两个队列管理器之间创建通道。

  • 使用 DB2 ® 、Lotus ® 、Rational ® 、Tivoli ® 与 WebSphere ® 的应用开发工具及中间件产品。您可以免费 下载试用版本 产品,或选择免费 developerWorks 的 Linux ® 或 Windows ® 版本的 Software Evaluation Kit

  • 访问 Developer Bookstore,获取全面的技术书籍清单,包括大量Web 服务主题的书籍。

  • 通过参与 developerWorks blogs 加入 developerWorks 社团。

  • IBM developerWorks 组在全球范围内有大量的技术讲座,您可以免费参加。

  • 需要更多的信息?developerWorks 的SOA 和 Web 服务专区有大量专题文章以及关于开发 Web 服务应用程序的入门级、中级和高级教程。


关于作者

Naveen Sachdeva 是 WebSphere 技术中心 IBM Application Integration Middleware 组的一名咨询软件工程师。他在大规模系统集成及分布式计算系统的设计与开发方面有10年以上的经验。 目前,他帮助 IBM 业务伙伴使用 IBM 产品进行技术功能实现和概念验证。他现在主要从事 SOA 工具技术以及 Model Driven Development 模型驱动开发。




对本文的评价

太差! (1)
需提高 (2)
一般;尚可 (3)
好文章 (4)
真棒!(5)

建议?




回页首


IBM 公司保留在 developerWorks 网站上发表的内容的著作权。未经IBM公司或原始作者的书面明确许可,请勿转载。如果您希望转载,请通过 提交转载请求表单 联系我们的编辑团队。
    关于 IBM 隐私条约 联系 IBM 使用条款