内容


应用 WebSphere MQ V6 来构建企业信息总线的行业示例

Comments

系列内容:

此内容是该系列 # 部分中的第 # 部分:

敬请期待该系列的后续内容。

此内容是该系列的一部分:

敬请期待该系列的后续内容。

引言

IBM WebSphere MQ 是目前应用最多的消息中间件产品, 它采用了消息队列(Message Queue) 这种应用程序间的通信方法, 让不同的应用程序通过读写和检索出入队列中的数据(消息)来 通信, 而无需直接面对网络易变、系统异构、数据协同等各种问题和风险。 WebSphere MQ 同时 也支持简单的 Publish/Subscribe(发布 / 订阅)消息传递机制, 每个队列管理器中有唯一的 Broker 代理来处理所有的订阅和发布。

WebSphere MQ 支持 Cluster(簇或集群),即多个队列管理器(Queue Manager 以下简称 QM) 的集合,这些队列管理器可以分布在不同的机器上。 本文中的例子采用了上述的 Queue 和 Pub/Sub 两种方式, 并创建了一个 MQ Cluster 来简化数据传输的配置和 MQ 对象的管理, 采用 MQ Cluster 的理由基于其如下特点:

1) Cluster 中的队列管理器之间的数据传输通道是自动建立的, 使得数据传输配置变得更简单;

2) 队列管理器中的队列可以被指定为 Cluster 共享队列, 对 Cluster 中的所有的队列管 理器都是可见的; 不同队列管理器中定义的同名 Cluster 共享队列自动实现针对该队列的传输负载均衡;

3) 不同队列管理器的 Broker 代理之间可以指定主从关系, 这样就可以在 Cluster 内很方便地实现树状的 Pub/Sub 结构。

在根据需要配置好 WebSphere MQ 之后, 就可以在 WAS 上进行相应配置并在应用程序中集成这 些资源了。 下文将会详细介绍如何根据本文实例的需要来配置 WebSphere MQ, 以及 WAS 和应用代码、配置上的细节。

实例描述

这是一个出入境清关系统的例子(见图 1.1)。这个系统由一个管理中心 (Headquarter,以 下简称 HQ)和多个子控制中心(Control Point,以下简称 CP)构成。 其中每个 CP 会产生大量清关 数据(Move Record,以下简称 MR), 这些清关数据需要被传送到 HQ 上汇总;各个 CP 在处理清关操作 的时候需要的监控列表 (Watch List,以下简称 WL)则由 HQ 定制并不定期向各个 CP 分发。

图 1.1 系统示意图一
 系统示意图一
系统示意图一

这里 CP 和 HQ 之间的数据传输采用 WebSphere MQ 来实现, HQ 和每个 CP 都有自己的队列管理器(QM),这些 QM 被配置为同处于一个 MQ Cluster 中。 MR 的数据传输是 CP 到 HQ 单向点到点的 数据传输, 在本例中用定义在 HQ 端的队列管理器中的 Cluster 共享队列(MR2HQ.Q)来实现。 WL 的数据则是 HQ 向各个 CP 的广播,这就需要采用 Pub/Sub 的模式。 实例中是选择 HQ 的一个队列 管理器启动 Broker 服务, 然后每个 CP 的队列管理器也都启动 Broker 服务并作为 HQ 的 Broker 子 结点。

图 1.2 系统示意图二
 系统示意图二
系统示意图二

WebSphere MQ 自带的 Broker 服务用于支持简单的 Pub/Sub 功能, 每个队列管理器中有唯一 的 Broker 代理来处理所有主题的订阅和发布。 在单一队列管理器上启用 Broker 代理时, 主题 的发布者和订阅者都直接访问这同一个队列管理器,在其上可以定义多个主题。 多个队列管 理器之间也可以做父子级联, 前提是要建立双向的发送 / 接收通道以及相应的传输队列, 当然 如果两个队列管理器都在同一个 MQ Cluster 里也行。 父子级联的效果就是子节点成为订阅代 理, 订阅者向子节点发出的订阅都会代理成向父节点的订阅。 由此形成了一个发布 / 订阅的分 布式树形结构, 在父节点上发布的主题消息可以被其所有子节点上的订阅者消费。 这种方式 的好处就是可以消除对单一 Broker 服务的依赖,本文的例子就是采用这一方式, HQ1 为父节点,所有的 CP 都是子节点。

图 1.3 MQ Pub/Sub 示意图
MQ Pub/Sub 示意图
MQ Pub/Sub 示意图

WebSphere MQ 自带的 Broker 服务只支持简单的 Pub/Sub 功能,不支持集群功能。 比如本文 的例子,只有 HQ1 为父节点,不能把 HQ1 和 HQ2 的 Broker 服务配置成一个集群父节点; 子节点 只能指定一个父节点,而且也不能把多个子节点配置成一个集群子节点。 在复杂的应用环境 下,WebSphere MQ 自带的 Broker 服务不能满足要求时, 就只能采用功能更全面 的 WebSphere Message Broker 了。

Control Point

为了简化说明,我们在这里只描述 HQ 和一个单一的 CP 之间的情况。首先看 CP 端(见图 1.2)。 我们假设这个 CP 的名字就叫 CP1,它配置了一个队列管理器 CP1.QM, 这个队列管理器与 HQ 端的 队列管理器同在一个 Cluster 中(BCP_CLUSTER)。 由于定义在 HQ 端的 Cluster 共享队列 MR2HQ.Q 对 CP1.QM 是可见的, 这样 CP1 就可以直接向本地队列管理器 CP1.QM 中的 MR2HQ.Q 队列发送 MR 数 据, 这些数据也将会被自动地传送到 HQ 端。 注意:CP1 本地的队列管理器并没有定义该 MR2HQ.Q 队列。

由于本地的 Broker 服务已经被配置为 HQ Broker 服务的子节点, 这样 CP1 对本地 Broker 服务某一主题的订阅(Subscribe)将被本地 Broker 服务代理成向 HQ 的 Broker 服务的订阅。 当 HQ 向 其 Broker 服务发布该主题的消息时,该消息将被 CP1 本地的 Broker 服务代理消费, 进而可以被 CP1 消费。这个消费的具体实现就是在 CP1 端用一个 MDB(Message Driven Bean) 侦听(即订阅)本地 Broker 服务上的指定主题(本例中为 BCP.WL.Topic)。

Headquarter

MQ Cluster 一般都至少需要两个 Full Repository,为了简化, 在本文的实例中把 HQ 的两个队列管理器(HQ1.QM 和 HQ2.QM)都设置为 Full Repository。 Cluster 共享队列 MR2HQ.Q 在两个 队列管理器上都有定义,Broker 服务则只在 HQ1.QM 上启动。

HQ 上对每个队列管理器都需要一个 MDB 来处理发送到该队列管理器的 MR 数据; 而当需要发布 WL 数据到各个 CP 时,只需要向 HQ1.QM 上的 Broker 服务针对主题 BCP.WL.Topic 发布即可。

经过如上配置,无论是 CP 还是 HQ 上的应用程序都只需要和本地的队列管理器打交道。 在 CP 端,MR 同步消息直接发送到本地队列管理器,MQ 会负责把消息发送到 HQ。 如果 CP 和 HQ 之间的网络中断,这些消息将缓存在 CP 端,等到网络重新联通后就会继续发送。 在 HQ 端,应用程序 只侦听本地队列管理器的队列, MR 同步消息到达后就会触发相应的 MDB 来处理这些消息。这 样, 网络不稳定不可靠因素由 WebSphere MQ 处理,对应用程序完全透明。 由于 CP 和 HQ 之间不直 接交互,既消除了它们之间的性能依赖, 同时可以让其更专注于其各自的业务。

配置 MQ cluster

下图为 MQ cluster: BCP_CLUSTER 的示意图:

队列管理器 HQ1.QM 和 HQ2.QM 为该 Cluster 的 Full Repositories,队列管理器 CP1.QM 为 Partial Repository。HQ1.QM 和 HQ2.QM 都安装在 HQ Server 上 ( 一般会安装在不同的 Server 上 ),CP1.QM 安装在 CP1 Server 上 ( 其实还可以有 CP2、CP3……)。

使用 WebSphere MQ Explorer 进行 Cluster 配置具体步骤如下:

  • 在 HQ Server 上创建 Queue Manager:HQ1.QM 和 HQ2.QM
    • HQ1.QM 的侦听端口配置为 1414,HQ2.QM 的侦听端口配置为 1415
    • 在创建向导上都记得选上 Create server-connection channel, 可以在其他服务器上远程控制它们
  • 在 HQ Server 上创建 Cluster,并指定 Full Repositories
    • Cluster 名:BCP_CLUSTER
    • Full Repositories:HQ1.QM 和 HQ2.QM
    • Cluster Channels:
      • TO.HQ1.QM connection name:<HQ Server IP>(1414)
      • TO.HQ2.QM connection name:<HQ Server IP>(1415)
  • 在 CP1 Server 上创建 Queue Manager:CP1.QM
    • CP1.QM 的侦听端口配置为 1414
    • 同样也选上 Create server-connection channel
  • 在 CP1 Server 上将位于远程的 HQ1.QM 和 HQ2.QM 添加到 QM Explorer 中
    • HQ1.QM <HQ Server IP>(1414)
    • HQ2.QM <HQ Server IP>(1415)
  • 在 CP1 Server 上将 CP1.QM 加入 BCP_CLUSTER
    • BCP_CLUSTER 右键菜单 --> Add Queue Manager to Clusterr
    • 选择 CP1.QM,添加 Partial repository
    • Cluster Channel:
      • TO.CP1.QM connection name:<CP1 Server IP>(1414)
    • 选择 Full repositories 时将所有两个 Full repositories 都选上

完成 Cluster 配置之后的 MQ Explorer 界面如图:

  • 创建队列
    • 在 HQ1.QM 上:
      • 创建队列 DL.Q 并将其设置为 HQ1.QM 的死信队列
      • 创建队列 MR2HQ.Q 并将其在 Cluster 中共享
    • 在 HQ2.QM 上:
      • 创建队列 DL.Q 并将其设置为 HQ2.QM 的死信队列
      • 创建队列 MR2HQ.Q 并将其在 Cluster 中共享
    • 在 CP1.QM 上:
      • 创建队列 DL.Q 并将其设置为 CP1.QM 的死信队列
  • 在 HQ1.QM 上配置 Topic
    • 在 HQ1.QM -> Advanced -> Services 下,开启 SYSTEM.BROKER 服务 ( 如果看不到,请点击右上角图标,使之列出系统服务 ), 并设置其服务管理方式为 Queue manager
    • 在控制台 <MQ Install Root>\Java\bin 路径下运行 命令 runmqsc HQ1.QM < MQJMS_PSQ.mqsc
    • 重启 HQ1.QM 以使 Topic 生效
  • 在 CP1.QM 上配置 Topic
    • 与 HQ1.QM 上进行同样的 Topic 配置操作:
      • 开启 SYSTEM.BROKER 服务并设置其服务管理方式为 Queue manager
      • 在控制台 <MQ Install Root>\Java\bin 路径下运行命令 runmqsc HQ1.QM < MQJMS_PSQ.mqsc
      • 添加 SYSTEM.BROKER 服务的启动参数 -p HQ1.QM 以指定其父 Topic 在 HQ1.QM 上
      • 重启 CP1.QM 以使 Topic 生效

配置 WAS

与 WebSphere MQ 进行交互,需要在 WAS 管理控制台中将 MQ 配置为 JMS 提供程序。 首先我们需要登录到 WAS 管理控制台(在使用缺省端口设置情况下,可以访问 http://localhost:9060/ibm/console 访问 WAS 管理控制台)。在管理控制台中, 点 击 Resources > JMS Providers > WebSphere MQ, 进入 WebSphere MQ 消息 提供程序面板(图 3.1)

图 3.1 WebSphere MQ 消息 提供程序面板
1 WebSphere MQ 消息 提供程序面板
1 WebSphere MQ 消息 提供程序面板

缺省的资源作用域是 Node,用户配置的 MQ 提供程序定义将在 Node 范围内可见。 如果是在 Cell 内配置 WebSphere MQ 消息提供程序,则可以在作用域栏中选择 Cell, 然后点击 Apply。“General Properties”是 WAS 预定义的一些配置属性; “Additional Properties” 部分中的链接分别用于管理以下 JMS 管理对象: ConnectionFactory、QueueConnectionFactory、Queue、 TopicConnectionFactory 和 Topic。

在本次应用实例中,我们需要从上述 MQ 提供程序面板中“Additional Properties” 部分出发,执行以下步骤:

  • 定义 JMS 队列连接工厂
  • 定义 JMS 队列
  • 定义 JMS 主题连接工厂
  • 定义 JMS 主题
  • 定义侦听器端口

定义 JMS 队列连接工厂

在本应用实例中,我们需要在 HQ 端的应用服务器上监听各个 CP 发送到 MQ 集群共享队列 MR2HQ.Q 中的消息。 因此我们需要在 HQ 端的应用服务器上定义到其本地队列管理器的 JMS 连接工厂, 应用程序将通过它与 MQ 建立 JMS 连接,从而读取队列中的消息。 同时,在 CP 端,我们也需要建立到其本地队列管理器的连接工厂, 应用程序通过它与本地队列管理器建立连接,向集群共享队列 MR2HQ.Q 发送消息。 下面以建立连接到 HQ1.QM 的 JMS 队列连接工厂为例, 说明在 WAS 管理控制台配置面向 MQ 提供程序的 JMS 队列连接工厂的具体步骤。

  • 在图 3.1 中所示面板的“Additional Properties”部分, 选 择“WebSphere MQ queue connections factories” 创建或删除 WebSphere MQ 队列连接工厂(在 JMS1.1 规范中, 我们也可以定义不局限于特定 JMS 域的连接工厂,相应地, 选择上图中的“WebSphere MQ connections factories”)。
  • 单击 New 按钮新建 MQ 队列连接工厂
  • 为了配置应用程序到 MQ 的连接,我们首先需要配置标准的 JMS 管理对象属性:
    • Name:输入 BCP.HQ1.QCF
    • JNDI Name:输入 BCP.HQ1.QCF。该项指定连接工厂对应的 JNDI 名称, 应 用程序将通过该 JNDI 名称来访问连接工厂。
  • 除了上述标准属性外,我们还必须指定一下属性:
    • Transport type:选择 BINDINGS 方式。WebSphere MQ JMS 提供程序能够配置两种与 WebSphere MQ 交互的方式:绑定方式连接和客户方式连接。 当使用绑定方式连接时, JMS 提供程序利用 JAVA 本地接口(JNI) 直接与 MQ 的队列管理器进行交互,而不是使用网络。 这种方式与客户方式相比,能够提供更好的执行性能, 但它要求 MQ 和 WAS 必须安装在同一台机器上。 客户方式使用 TCP/IP 连接 MQ,它允许 MQ 和 WAS 安装在不同的机器上。 在本应用实例中,WAS 始终与本地 MQ 交互,所以,选择 BINDINGS 方式。
    • Queue Manager:输入 HQ1.QM。 该属性指定连接工厂使用的队列管理器名称。 如果没有指定队列管理器,连接工厂将使用默认的队列管理器。
    • Host:输入 127.0.0.1。该属性指定 MQ 队列管理器运行的服务器地址。
    • Port:输入 1414。该属性指定了连接到 MQ 队列管理器使用的 TCP/IP 端口。 这里定义的端口值必须匹配 MQ 队列管理器定义的监听端口, 默认监听端口是 1414。
    • CCSID:输入 819。该属性指定编码字符集标识符。 它涉及到消息的编 码、解码过程。 我们需要在消息发送和接收时保持该属性一致。
  • 其他属性保持默认值。
  • 单击面板底部的 Apply 提交更改,并保持更改。

在本应用实例中,除了定义上述 BCP.HQ1.QCF 队列连接工厂外, 我们还需要在 HQ 端定义 BCP.HQ2.QCF 连接工厂(用于连接在 HQ2 上的队列) 以及在 CP 端 BCP.CP1.QCF 连接工厂(用于连接本次队列管理器,发送消息)。 配置属性分别为:

PositionNameJNDI NameQueue ManagerHostPortCCSID
HQBCP.HQ2.QCFBCP.HQ2.QCFHQ2.QM127.0.0.11415819
CPBCP.CP1.QCFBCP.CP1.QCFCP1.QM127.0.0.11414819

定义 JMS 队列

在本应用实例中,我们需要登录在 HQ 和 CP 端的 WAS 管理控制台, 分别定义连接到队列 MR2HQ.Q 的 JMS 队列目标。 应用程序使用 JMS 队列来传递点对点类型的 JMS 消息。 部署在 HQ 端的应用程序通过该队列目标从队列 MR2HQ.Q 中读取消息; 部署在 CP 端的应用程序通过该队列目标向 MR2HQ.Q 中发送消息。 下面以建立连接到 MR2HQ.Q 的 JMS 队列目标为例, 说明在 WAS 管理控制台配置面向 MQ 提供程序的 JMS 队列目标的具体步骤。

  • 在管理控制台,我们再次导航到 WebSphere MQ 消息提供程序面板。 在图 3.1 中所示面板的“Additional Properties”部分, 选择“WebSphere MQ queue destinations”进行创建或删除 WebSphere MQ 队列。
  • 单击 New 按钮新建队列目标。
  • 除了标准的 JMS 管理对象属性(Name 和 JNDI name)外, 我们必须设置的属性是 Base queue name,输入 BCP.MR2HQ.Q。 指定的队列的名称必须是在我们连接的队列管理器中存在。 另外,我们还需要指定 CCSID 属性值为 819。
  • 其他属性保持默认值。对于面板上的队列管理器、队列管理器端口等属性, 缺省将使用用于连接该队列的连接工厂的相应属性值。
  • 单击面板底部的 Apply 提交更改,并保持更改。

定义 JMS 主题连接工厂

在本应用实例中,HQ 发布 WL 消息到各个 CP。 相应地,各个 CP 需要订阅该消息,更新本地的 WL 数据。 因此我们需要在 HQ 端的应用服务器上定义到 HQ 端队列管理器(HQ1.QM)的 JMS 连接工厂, 应用程序将通过它与 MQ 建立 JMS 连接,从而发布 WL 消息。 同时,在 CP 端,我们也需要建立到本地队列管理器(CP1.QM)的连接工厂, 应用程序通过它与本地队列管理器建立连接,订阅 WL 消息。 在 WAS 管理控制台配置面向 MQ 提供程序的 JMS 队列连接工厂的具体步骤如下:

  • 在图 3.1 中所示面板的“Additional Properties”部分,选择 “WebSphere MQ topic connections factories”进行创建或删除 WebSphere MQ 主题连接工厂(在 JMS1.1 规范中,我们也可以定义不局限于特定 JMS 域的连接工厂, 相应地,选择上图中的“WebSphere MQ connections factories”)。
  • 单击 New 按钮新建 MQ 主题连接工厂。
  • 在 MQ JMS 主题连接工厂设置的属性与上述在 MQ JMS 队列连接工厂设置的属性类似。 另外,面板中还可以设置主题连接工厂特定的属性, 比如 Broker control queue、Broker queue manager、Broker publication queue、 Broker subscription queue、Broker CC subscription queue。 在本应用中,保持为缺省值。
  • 单击面板底部的 Apply 提交更改,并保持更改。

我们需要设置的属性列表如下:

PositionNameJNDI NameHostPortQueue ManagerBroker Queue ManagerBroker versionBroker message selectionCCSID
HQBCP.WLPub.TCFBCP.WLPub.TCF127.0.0.11414HQ1.QMHQ1.QMBasicClient819
CPBCP.WLSub.TCFBCP.WLSub.TCF127.0.0.11414CP1.QMCP1.QMBasicClient819

特别地,CP 的 Topic 连接工厂用于消息订阅, 为了保证应用程序停止运行期间发布的消息在应用启动后也能收到, 我们需要启用 Durable 订阅特性。 这需要在这里配置 CP Topic 连接工厂(BCP.WLSub.TCF)的 Client ID 属性, 我们的配置值为 BCP.CP1。

定义 JMS 主题

在本应用实例中,我们需要登录在 HQ 和 CP 端的 WAS 管理控制台, 分别定义连接到主题 BCP.WL.Topic 的 JMS 主题目标。 应用程序使用 JMS 主题来传递发布 / 订阅类型的 JMS 消息。 部署在 HQ 端的应用程序通过该主题目标发布 WL 消息; 部署在 CP 端的应用程序通过该主题目标订阅 WL 消息。 下面以建立连接到 BCP.WL.Topic 的 JMS 主题目标为例, 说明在 WAS 管理控制台配置面向 MQ 提供程序的 JMS 主题目标的具体步骤。

  • 在管理控制台,我们再次导航到 WebSphere MQ 消息提供程序面板。 在图 3.1 中所示面板的“Additional Properties”部分, 选择“WebSphere MQ topic destinations”进行创建或删除 WebSphere MQ 主题目标。
  • 单击 New 按钮新建主题目标。
  • 除了标准的 JMS 管理对象属性(Name 和 JNDI name)外, 我们唯一必须设置的属性是 MQ 的 MA0C 代理中的主题名称(Base topic name), 输入 BCP.WL.Topic。这个名称是任意的,但应该是唯一的主题名称。 另外,我们还需要指定 CCSID 属性值为 819。
  • 其他设置都接受默认值。
  • 单击面板底部的 Apply 提交更改,并保持更改。

定义侦听器端

我们的应用示例代码需要定义侦听器端口来监听队列 / 主题目标, 并驱动消息 Bean(MDB)接受消息。

首先,我们需要登录到 WAS 管理控制台,展开 Servers > Application servers, 选择 server1。然后在 Communications 栏目下展开 Messaging, 单击 Message Listener Service > Listener Ports 链接进入 Listener Ports 控制面板, 可以新建、删除、启动、停止 Listener Port。如下图所示:

我们可以点击 New 按钮进入新建页面并设置相应属性。 需要配置的 Listener Ports 的属性如下列表:

PositionNameConnection factory JNDI nameDestination JNDI nameMaximum retries
HQBCP.HQ1.MR.LSRBCP.HQ1.QCFBCP.MR2HQ.Q5
HQBCP.HQ2.MR.LSRBCP.HQ2.QCFBCP.MR2HQ.Q5
CPBCP.WLSub.LSRBCP.WLSub.TCFBCP.WL.T5

这里需要特别说明的是 Maximum retries 属性。 该属性指定了侦听器向 MDB 尝试分发一个消息的最大次数。 该属性的缺省值是 0,表示当第一次出现消息无法分发时,侦听器端口将被关闭。 因此,我们需要配置 MQ 的 Backout threshold 属性, 使得有害消息在导致侦听器停止工作之前由 MQ 转发到专门的错误队列中。 在应用实例中,我们需要在 MQ Explorer 中分别对队列 BCP.MR2HQ.Q、 以及 WMQ 默认的订阅队列(SYSTEM.JMS.D.CC.SUBSCRIBER.QUEUE 和 SYSTEM.JMS.ND.CC.SUBSCRIBER.QUEUE)进行配置。 设置 Backout threshold 属性值设置为 3,并设置相应的 Backout requeue queue 属性值为某个用于存放有害消息的本地队列(如图 3.3)。

有关有害消息的处理的详细介绍,请参阅参考资料: 《WebSphere Application Server 处理有害消息的方法》。

与应用集成

同步 MR 数据到 HQ

每个 CP 都会将清关数据及时的上报到 HQ,对于 MQ 来说, 这属于典型的点对点(PTP)消息处理模式。 以 CP1 为例,定义在 HQ 端的 Cluster 共享队列 MR2HQ.Q 对 CP1 来说是可见的, CP1 会周期的将新产生的 MR 数据封装为 ObjectMessage 类型的 JMS 消息, 并发送到本地 CP1.QM 队列管理器中的 MR2HQ.Q 队列,进而传送到 HQ 端。

要发送 JMS 消息到目标队列中,首先需要得到一个 ConnectionFactory 对象, 以及代表队列的 Destination 对象。经过上节的正确配置后, 可以很容易的通过查询 J2EE 容器提供的 Context,来获取这些对象实例。 示例代码如下:

QueueConnectionFactory queueConnectionFactory = null;
Queue queue = null;
Context context = new InitialContext();
queueConnectionFactory = (QueueConnectionFactory) context.lookup("BCP.CP1.QCF");
queue = (Queue) context.lookup("BCP.MR2HQ.Q");

其次,创建 Connection 和 Session 对象,并新建 MessageSender, 以及用来存放消息的 Message 对象。 最后,使用 MessageSender 将 JMS 消息发送到指定的 Destination 中。示例代码如下:

QueueConnection queueConnection = null;
QueueSession queueSession = null;
try {
 queueConnection = queueConnectionFactory.createQueueConnection();
 queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
 QueueSender queueSender = queueSession.createSender(queue);

 ObjectMessage message = queueSession.createObjectMessage();
 // collect all MR data which has not been synchronized
 Serializable mrData = collectNewMR();
 message.setObject(mrData);
 queueSender.send(message);
} finally {
 try {
 if (queueSession != null)
 queueSession.close();
 if (queueConnection != null)
 queueConnection.close();
 } catch (Exception e) {
 //…
 }
}

为了接收和处理来自 CP 的 MR 数据,HQ 端将会部署两个相同的 MDB 来监听队列 MR2HQ.Q。 之所以需要两个 MDB,是因为队列 MR2HQ.Q 在两个队列管理器 HQ1.QM 和 HQ2.QM 中都有定义 (为了负载均衡)。 这些 MDB 将 MR 记录从 JMS 消息中分离出来,并持久化到数据库,MDB 的示例代码如下:

public void onMessage(javax.jms.Message msg) {
ObjectMessage objMsg = (ObjectMessage) msg;
 try {
 DataObject mrData = (DataObject) objMsg.getObject();
 // store MR data to database
 synchronizeMR(mrData);
 } catch (JMSException e) {
 //…
 }
}

为了指定 MDB 监听的队列,需要在 EJB Deployment Descriptor 中进行相应的配置, 具体来说就是指定在 WAS 中预先配置好的 ListenerPort,下图是使用 WID 进行配置的界面, 对于另外一个 MDB,同样需要设置 ListenerPort 为 BCP.HQ2.MR.LSR。

分发 WL 数据到 CP

HQ 会不定期的更新 WL 数据,同时需要将更新后的数据分发到各个 CP, MQ 提供的 Pub/Sub 消息处理模式方便的实现了此类需求。 在修改完 WL 数据后,HQ 会将最新的数据发送到本地主题 BCP.WL.Topic, 进而广播到所有 CP 的主题 BCP.WL.Topic,所有订阅了该主题的 MDB 均会收到最新的 WL 数据, 并将其更新到本地的数据库中。

对于应用程序的开发和集成来说,WL 数据分发同 MR 数据同步的差别仅仅在于发送 / 接收消息的 Destination 不同,前者是 Queue,而后者是 Topic, 这对程序的影响微乎其微,这里就不一一列举了。 特别的是,Topic 订阅需要配置为 Durable 订阅,在之前 WAS 的配置中, 我们已经给 CP Topic 连接工厂配置了 Client ID 以应用 Durable 订阅特性, 在这里我们还需要在 EJB Deployment Descriptor 中对 MDB 作如下设置:

即在 Activation Configuration 中增加两个配置:

  • destinationType (java.jms.Topic)
  • subscriptionDurability (Durable)

小结

在本文中,您了解了如何配置 WebSphere MQ 和 MQ Cluster 来实现 Queue 和 Topic 两种消息机制。并通过一个具体的实例了解如何将 WebSphere MQ 与 WAS 及应用集成起来。

申明:本文仅代表笔者个人的观点,不代表 IBM 公司的观点。


相关主题


评论

添加或订阅评论,请先登录注册

static.content.url=http://www.ibm.com/developerworks/js/artrating/
SITE_ID=10
Zone=WebSphere
ArticleID=295714
ArticleTitle=应用 WebSphere MQ V6 来构建企业信息总线的行业示例
publish-date=03272008