 | 级别: 中级 Keshava Murthy (rkeshav@us.ibm.com), 架构师, IBM
2006 年 11 月 27 日 IBM® WebSphere® Message Queue (MQ) 软件套件为分布式的异构应用程序交换信息、委托工作、协调事件和提供服务提供了可靠的消息传递。当 Informix® 应用程序使用 WMQ 时,开发人员可以编写定制代码,管理多个连接,并通过应用程序路由数据。Informix Dynamic Server (IDS) Version 10.00.UC3 引入了对 Informix 应用程序与 WebSphere MQ 的交互的内置支持,这样的交互是通过使用具有两阶段提交支持的 SQL 可调用函数进行的。这减少了开发开销,并且封装了集成的复杂性。
WebSphere MQ 简介
简单来说,WebSphere MQ 是两个端点之间交换消息的一种方法。它充当两个系统之间的中介,提供诸如可靠性和事务语义之类的增值功能。
图 1. 用于业务集成的 WebSphere MQ
不管是在 amazon.com 上购买一本书,还是使用 ibm.com 的电子商务,订购事件都将触发在多个模块之间传递信息的工作流:用户帐户管理、开帐单、打包和发货、采购、客户服务和合作伙伴服务。被触发的模块中执行的任务又触发随后的工作流。为了满足可靠性和可伸缩性需求,通常将应用程序的诸多模块放在多台计算机上。
如果在所有系统上使用相同的软件,例如 SAP,那么软件本身通常会附带工作流管理特性。如果各模块是在一个同构环境中运行的,例如在运行 WebSphere 和 Informix 的 Linux® 机器上,那么就更容易使用分布式查询或企业复制来更改信息。另一方面,如果应用程序运行在异构的系统上,例如 WebSphere、DB2®、Oracle 和 Informix 的不同组合,则编程和分布式查询或复制的设置就有些复杂,在很多情况下甚至不能满足应用程序的需求。
WebSphere MQ 就是为解决这类集成问题而设计的。它不限于特定的平台,也不强加特定的范例:WebSphere MQ 支持超过 80 种平台,并支持 C、C++、Java™、Java Message Service (JMS) 和 Visual Basic 等 API。WebSphere MQ 还是为面向服务架构(SOA)设计企业服务总线(ESB)的支柱。
WebSphere MQ 提供了一种可靠的存储和转发(store-and-forward)机制,所以每个模块可以与它互发消息。WebSphere MQ 是通过持久队列和用于编程的 API 来实现这一点的。此外,WebSphere MQ Message Broker —— WebSphere MQ 产品套件中的另一种产品 —— 提供了消息路由和转换服务。由于基础设施的简单性,应用程序必须自行处理一些事情,例如建立消息格式和队列属性。WebSphere MQ 还支持队列的发布和订阅语义,这使得将一条消息发送给多个接受者和按需向队列订阅消息变得更为容易,这一点与邮件列表类似。
应用程序预先确定队列名称、消息和消息格式,就像两个网络应用程序就套接字数量达成协议一样。应用程序到应用程序的消息交换是异步的 —— 应用程序不必等到另一个应用程序收到消息。WebSphere MQ 保证消息是被可靠地存储的,并且使消息可为目标应用程序所用。但是,目标应用程序需负责从 WebSphere MQ 接收消息。
Informix(及其他数据库服务器)应用程序如何使用 WebSphere MQ?
图 2. 使用 MQ Interface (MQI) 或 JMS 接口使 MQ 与 IDS 集成
应用程序有很多输入源,例如:用户输入、B2B 事务、工作流消息和数据库中的数据。图 2 中的订单输入应用程序需要将数据存储在 Informix 数据库中,还需要将消息发送到 WebSphere MQ 以及从 WebSphere MQ 接收消息。应用程序建立与 Informix 和 WMQ 的连接。此外,应用程序使用事务管理器来确保数据交换的可靠性。例如,保存在数据库中的订单必须被发送到一个队列,并在数据库中标记为已处理。只有当 WebSphere MQ 成功地收到消息之后,才能将订单标记为已处理。因此,交互必须受到事务性的保护。
订单输入应用程序通过编写定制代码与 WebSphere MQ 交换消息。每当应用程序要与 WebSphere MQ 交互时,都需要定制代码,这部分的开发工作很费成本。因此,需要训练程序员从事此项工作,或者雇用顾问来开发、调试和维护这样的代码,并根据新的队列和应用程序对它进行修改。数据库与 WebSphere MQ 之间交换的数据要经过应用程序 —— 如果有大量的数据需要交换,那么这显然效率不高,而且必须要有一个事务管理器。
IDS 对 WebSphere MQ 的支持
图 3. 使用 Informix MQ 函数
IDS 提供了用于读取、接收、发送、订阅和发布的 SQL 可调用函数。这些 SQL 可调用函数将 WebSphere MQ 特性暴露给 IDS 应用程序,还将 WebSphere MQ 操作集成到 IDS 事务中。也就是说,WebSphere MQ 操作的命运依赖于事务的命运。如果事务被回滚,那么 WebSphere MQ 上的操作 —— 消息发送或接收 —— 也随之回滚。这是通过在 IDS 和 WebSphere MQ 处协调事务来完成的,而不是通过补偿事务完成的。因此,这种方式既可靠,又能获得高性能。
调用 MQ 函数的代码
使用 IDS WebSphere MQ 的功能,向 WebSphere MQ 队列发送消息或从中接收消息都很简单,如清单 1 所示:
清单 1. 在 SQL 和存储过程中使用 Informix MQ 函数
select MQSend("CreditService", customerid || ":" || address || ":" || product ":" || orderid)
from order_tab
where customerid = 1234;
insert into shipping_tab(shipping_msg) values(MQReceive());
create function get_my_order() returns int;
define cust_msg lvarchar(2048);
define customerid char(12);
define address char(64);
define product char(12);
define corderid char(12);
define snd_status int;
-- Get the order from Order entry application.
execute function MQReceive("OrderQueue") into cust_msg;
let customerid = substr(cust_msg, 1, 12);
let address = substr(cust_msg, 14, 77);
let product = substr(cust_msg, 79, 90);
let corderid = substr(cust_msg, 92, 103);
insert into shipping_table(custid, addr, productid, orderid)
Values(customerid, address, product, corderid);
-- send the status to CRM application
execute function MQSend("CRMQueue", corderid || ":IN-SHIPPING") into snd_status;
return 1;
end function;
|
调用 MQ 函数的代码
当回滚清单 2 中的事务时,接收到的消息被恢复到队列 bookorder 中,这一行将从 shipping_tab 中删除。
清单 2. 包含 MQ 函数的多语句事务
begin work;
insert into shipping_tab(shipping_msg) values (MQReceive("bookorderservice"));
rollback work; -- Undo previous statement including WMQ operation.
|
在 IDS 应用程序中使用 MQ 函数
IDS 提供了用于暴露 WebSphere MQ 提供的每个接口 —— 读取、接收、发送、发布、订阅和取消订阅 —— 的函数,以及用于发送和接收大型消息的函数。WebSphere MQ 函数可以在任何可使用函数的地方进行调用:值子句、投影列表、查询过滤器、存储过程和触发器。此外,借助 IDS,可以将一个 WebSphere MQ 队列映射为一个 IDS 表。这个表上的插入操作将被翻译为到 WebSphere MQ 的发送操作,选择操作则被翻译为读取或接收操作。
基础
WebSphere MQ 提供队列及其操作的简单抽象,每种操作带有一些选项,例如消息到期时间和重试计数。IDS 将这些选项抽象为服务和策略。
服务:将队列、队列管理器和消息的代码集映射为服务。表 "informix".mqiservice 存储这种映射。IDS.DEFAULT.SERVICE 被映射为系统缺省队列管理器、名为 IDS.DEFAULT.QUEUE 的队列和缺省代码集。
策略:策略定义每种操作的诸如优先级、到期日期之类的属性。表 "informix".mqipolicy 存储每种策略的 37 个属性。IDS.DEFAULT.POLICY 是缺省策略。取决于您的应用程序环境,需要创建一个或多个策略。
相关 ID:当多个应用程序共享相同的队列时,可以使用一个最长为 48 字节的相关 ID 来控制交互。一旦应用程序就它们的消息的相关 ID 达成一致,它们就可以获得与相关 ID 匹配的消息。其工作方式类似于过滤器或 SQL 查询中的谓词。相关 ID 不是强制要求的,也没有缺省值。
表 1. IDS 中的 MQ 函数
| 函数名 | 描述 |
|---|
| MQSend() | 将一条字符串消息发送到一个队列 | | MQSendClob() | 将 CLOB 数据发送到一个队列 | | MQRead() | 将队列中的一条字符串消息读取到 IDS 中,但是不从队列中将其删除 | | MQReadClob() | 将队列中的一个 CLOB 读取到 IDS 中,但是不从队列中将其删除 | | MQReceive() | 将队列中的一条字符串消息接收到 IDS 中,并将其从队列中删除 | | MQReceiveClob() | 将队列中的一个 CLOB 接收到 IDS 中,并将其从队列中删除 | | MQSubscribe() | 订阅一个主题 | | MQUnSubscribe() | 取消订阅之前订阅的主题 | | MQPublish()() | 发布一条消息到一个主题中 | | MQPublishClob() | 发布一个 CLOB 到一个主题中 | | CreateMQVTIRead() | 创建一个 read Virtual Table Interface (VTI) 表,并将它映射到一个队列 | | CreateMQVTIReceive() | 创建一个 receive VTI 表,并将它映射到一个队列 | | MQTrace() | 跟踪 MQ 函数的执行 | | MQVersion() | 获得 MQ 函数的版本 |
用于从 IDS 发送到 WebSphere MQ 的函数
清单 3. MQSend 和 MQSendClob 函数
MQSend(Service, Service_Policy, Message, CorrelationID);
MQSendClob(Service, Service_Policy, ClobMessage, CorrelationID);
|
可以将最长为 32739 字节的消息发送到 WebSphere MQ 队列。要发送更大的消息,可以使用 CLOB 数据类型和 MQSendClob() 函数。MQSendClob() 的行为与 MQSend() 一致,惟一的区别是它以 CLOB 作为消息参数,而不是以字符类型作为参数。Message 和 ClobMessage 是必需的参数。IDS 使用保存在 "informix".mqiservice 表中的 Service 记录项中的策略将消息发送到队列管理器所管理的队列中。
发送函数的参数解释
四个参数都被指定。当给出了 4 个参数时,翻译起来就很简单。MQSend(serviceparam, policyparam, messageparam, correlationparam)
照样执行即可。
下面是缺少一个或多个参数的情况下的翻译:
MQsend(messageparam) 被翻译为:MQSend("IDS.DEFAULT.SERVICE", "IDS.DEFAULT_POLICY", messageparam, NULL);
MQsend(messageparam) 被翻译为:MQSend("IDS.DEFAULT.SERVICE", "IDS.DEFAULT_POLICY", messageparam, NULL);
MQsend(serviceparam, policyparam, messageparam) 被翻译为:MQSend(serviceparam, policyparam, messageparam, NULL);
清单 4. 发送函数示例
select MQSend("myservice", "mypolicy", orderid || ":" || address)
FROM tab
Where orderid = 12345; |
所有 WebSphere MQ 函数都应该在一个事务中运行。在 IDS 中,SQL 语句 SELECT、UPDATE、DELETE 和 INSERT 自动开始一个新事务。或者,可以用 BEGIN WORK 语句开始一个新事务。
如果直接执行函数,则会产生一个错误。例如:
execute function MQSend("MyService", "<order><id>5</id><custid>6789</custid></order>");
IDS 不会隐式地为 EXECUTE 语句开始一个新事务。因此,必须显式地开始一个事务:
清单 5. 显式事务
begin work;
execute function MQSend("MyService", "<order><id>5</id><custid>6789</custid></order>");
commit work;
|
如果事务被回滚,那么 WebSphere MQ 上的所有操作也随之回滚,就像 IDS 回滚它的更改一样。
清单 6. 带回滚的事务
begin work;
insert into resultstab(sendval) value(MQSend("MyService", "<order><id><5</id><custid>6789
</custid></order>"));
rollback work;
|
用于从 WebSphere MQ 读取和接收消息的 IDS 函数
清单 7. 读取和接收函数
MQRead(Service, Policy, CorrelationID) returns lvarchar;
MQReadClob(Service, Policy, CorrelationID) returns CLOB;
MQReceive(Service, Policy, CorrelationID) returns lvarchar;
MQReceiveClob(Service, Policy, CorrelationID) returns CLOB;
|
读取操作从队列获取消息,但是不从队列中删除消息。接收操作从队列中删除消息,并获取该消息。这些函数调用时可以带 0 个或多个参数。参数的解释类似于前面的 MQSend()。接收函数的事务行为类似于 MQSend。
MQRead() 和 MQReceive() 可以返回至多 32739 个字节。消息本身的最大大小是一个 WebSphere MQ 配置参数。更大的消息应该以 CLOB 的形式读取或接收。对于 MQ,一条消息就是一条消息。根据长度的不同,IDS 对消息加以区分,将消息映射到不同数据类型。
如果给定了一个相关 ID,那么 WebSphere MQ 获取队列中具有匹配相关 ID 的下一条消息。否则,返回 NULL 消息。当队列上没有适用的消息时,由策略决定等待时间。因此,通过使用预定义的相关 ID,多个应用程序可以为不同的目的共享相同的队列。
清单 8. SQL 中的读取和接收函数
select mqread("SHIPING.SERVICE","My.DEFAULT.POLICY") from systables where tabid = 1;
select mqreceive("SHIPING.SERVICE","My.DEFAULT.POLICY") from systables where tabid = 1;
|
发布和订阅函数
清单 9. 发布和订阅函数
MQPublish(publisher_name, policyparam, message, topic, correlationid);
MQPublishClob(publisher_name, policyparam, clob_param, topic, correlationid);
MQSubscribe(subscriber_name, policy_name, topic);
MQUnsubscribe(subscriber_name, policy_name, topic);
|
对一个队列的发布和订阅,对于在多个应用程序之间基于不同主题交换信息来说是一种有效的配置。当订单输入应用程序需要与信用卡应用程序、发货应用程序、CRM 应用程序和合作伙伴应用程序交互时,订单输入应用程序将订单发布到一个队列上,并且只需要发布这一次。然后,目标应用程序可以订阅这个队列,并使用读取或接收函数获得消息。在这种机制中,WebSphere MQ 还支持将消息归入到一些主题中,以便于进行更细致的控制。例如,订单输入消息可以将订单分为书籍、电子和服饰等主题。
为发布和定义主题,必须对队列进行配置。WebSphere MQ 允许静态或动态地定义主题。除了队列管理器外,提供发布和订阅特性的 Message Broker 也必须运行。Message Broker 组件提供消息路由和消息转换,使业务集成变得更容易。
订阅一个主题,并指定要从中接收消息的队列。当发布者将一条属于那个主题的消息插入到该队列时,WebSphere MQ 中介将消息路由到每个指定订阅者的队列。订阅者通过读取或接收函数从队列获取消息。
"informix".mqipubsub 表:在使用发布和订阅服务之前,必须建立这个表。请参阅 The Informix dynamic server documentation, its schema, and examples。
发布者的名称和订阅者的名称必须在 "informix".mqipubsub 表中定义。其他参数在前面已经讨论过。
清单 10. SQL 中的发布和订阅函数
select MQSubscribe(‘WeatherChannel’,"Weather") from systables where tabid = 1;
select mqPublish("WeatherChannel",
"<weather><zip>94501</zip><date>7/27/2006</date>
<high>89</high><low>59</low></weather>","Weather")
from systables where tabid = 1;
select mqreceive("WeatherChannel","Weather")
from systables where tabid = 1;
|
实用函数
MQVersion() 返回 IDS 中的 WebSphere MQ blade 的当前版本。通过
MQTrace(trace_level, trace_file) 可以跟踪 WebSphere MQ 函数的执行路径和 IDS 与 MQ 之间的交互。跟踪级别介于 10 到 50 之间 —— 是 10 的倍数。
清单 11. 跟踪输出示例
14:19:38 Trace ON level : 50
14:19:47 >>ENTER : mqSend<<
14:19:47 status:corrid is null
14:19:47 >>ENTER : MqOpen<<
14:19:47 status:MqOpen @ build_get_mq_cache()
14:19:47 >>ENTER : build_get_mq_cache<<
14:19:47 status:build_get_mq_cache @ mi_get_database_info()
14:19:47 status:build_get_mq_cache @ build_mq_service_cache()
14:19:47 >>ENTER : build_mq_service_cache<<
14:19:47 <<EXIT : build_mq_service_cache>>
|
MQ 表映射函数
在 IDS 中调用 WebSphere MQ 函数比较容易,但是并不是使用 MQ 的最容易的方式。IDS 可以将一个 WebSphere MQ 队列映射到一个 IDS 表。在表上执行一条 SELECT 语句就可以取得队列中的消息,而执行表上的 INSERT 语句就可以发送消息。用法如下所示。其他操作,例如 UPDATE 和 DELETE,是不允许在这种表上执行的。
清单 12. 表到队列映射函数
MQCreateVtiRead(readtable, servicename, policy, maxMessage)
MQCreateVtiReceive(receivetable, servicename, policy, maxMessage)
|
read 表上的 SELECT 操作的行为与 MQRead() 类似。它取得消息,但是不会将消息从队列中删除。而 receive 表上的 SELECT 操作在取走消息的同时,还从队列中删除消息。maxMessage 参数决定列的长度,同时还决定列的类型。当长度为正数时,创建一个长度为 maxMessage 的 lvarchar 列。可以定义的消息的最大长度为 32607。如果使用 -1 作为 maxMessage,则可以检索 CLOB 类型的消息,如果使用 -2 作为 maxMessage,则可以检索 BLOB 类型的消息。
清单 13. MQ 表到队列映射函数
-- Create a READ table with max message length 4096.
execute function MQCreateVTIREAD("myreadtable", "myservice", "mypolicy", 4096);
-- Below is the table created by MQCreateVTIREAD() function.
create table myreadtab
( msg lvarchar(4096),
correlid varchar(24),
topic varchar(40),
qname varchar(48),
msgid varchar(12),
msgformat varchar(8))
using "informix".mq (SERVICE = "myservice", POLICY = "mypolicy", ACCESS = "READ");
-- Get the top 10 messages from the queue.
SELECT first 10 * from myreadtable;
-- INSERT a message into the table
INSERT into myreadtable values("IBM:81.98;Volume:1020");
-- SELECT the first message matching correlation id
SELECT FIRST 1 * from myreadtable where correlid = 'abc123';
IDS is aware of correlation id predicate and sends the correlation id request to MQ.
WMQ matches to correlation ID and sends the matched message.
-- create a table to transport BLOB data.
execute function MQCreateVTIRECEIVE("mydoctable", "myservice", "mypolicy", -2);
-- Below is the table created by MQCreateVTIRECEIVE() function.
create table mydoctable
( msg BLOB,
correlid varchar(24),
topic varchar(40),
qname varchar(48),
msgid varchar(12),
msgformat varchar(8))
using "informix".mq (SERVICE = "myservice", POLICY = "mypolicy", ACCESS = "RECEIVE");
execute function MQCreateVTIREAD("myreadtable", "myservice", "mypolicy", 4096);
execute function MQCreateVTIREAD("myreadtable", "myservice", "mypolicy", 4096);
INSERT into mydoctable(msg) select blobcol from ordertab;
-- insert using blob, get through blob
insert into mydoctable(msg) values(filetoblob("/etc/passwd", "client"));
select lotofile(msg, '/tmp/blob.dat','client') from mydoctable;
|

 |

|
MQ 函数和事务
图 4. 将 MQ 函数集成到 IDS Transaction Manager 中
任何用于与 MQ 交换消息的 WebSphere MQ 函数都必须在一个事务中显式或隐式地调用。为了提供 IDS 与 MQ 之间的可靠的交互,必须依赖于事务。当提交获得成功时,应用程序需要持久存储在 IDS 和 WebSphere MQ 上对数据的更改。当应用程序回滚时,WebSphere MQ 上的任何操作也随之回滚,就像 IDS 上的操作回滚一样。当发出数据操纵语言语句(UPDATE/DELETE/INSERT 或 SELECT)和数据定义语言语句(CREATE)时,IDS 就会隐式地开始一个事务。或者,如果关闭了自动提交,可以用 BEGIN WORK 语句和 Java Database Connectivity(JDBC) 之类的 API 显式地开始一个新事务。
注意:EXECUTE FUNCTION/PROCEDURE 语句不会开始一个新事务,所以,在 EXECUTE 语句中调用 WebSphere MQ 函数之前,需要开始一个事务。
事务管理对于应用程序是透明的。应用程序只是在一个事务中使用 WebSphere MQ 的功能,而由 IDS 使用开放的两阶段提交协议来处理 IDS 与 WebSphere MQ 之间的提交或回滚的协调。这项工作被集成到 IDS Transaction Manager 中。IDS 处理 WebSphere MQ,以及它的调用其他 IDS 实例的分布式事务。在 IDS-MQ 交互期间,IDS 打开一个到 WebSphere MQ 的连接,当应用程序在一个事务内调用第一个 WebSphere MQ 函数时,IDS 就在 MQ 上开始一个相应的事务。在提交或回滚期间,IDS Transaction Manager 知道 WebSphere MQ 参与了事务,并使事务与之协调。
环境
IDS 提供了 MQ 功能,在安装 IDS 的时候,数据刀片(datablade)就被安装到 $INFORMIXDIR/extend 中。必须将数据刀片注册到您想要在其中调用 MQ 函数的数据库中。目前只有 WebSphere MQ 与 Informix-logged 数据库的交互是受支持的。WebSphere MQ 与 ANSI 和 non-logged 数据库的交互是不受支持的。
IDS 使用服务器 API 与 WebSphere MQ 通信。因此,需要将 WebSphere MQ 与服务器安装在同一台计算机上。WebSphere MQ 可以将消息传送到一个或多个远程 WebSphere MQ 服务器。每个动态服务器实例只能连接到一个 WebSphere MQ Queue Manager。
如果您想在 non-logged 或 ANSI 模式的数据库中使用 WebSphere MQ,那么请将您的要求发送给作者,或发送给 IBM Informix 支持小组。
平台支持
表 2. 平台支持
| Informix Dynamic 服务器 | 受支持的平台 | WebSphere MQ 版本 |
|---|
| 10.00.xC3 和更高版本 | Solaris-32 bit * HP/UX(PA-RISK) -- 32bit * AIX-32bit * Windows-32bit | Needs V5.3 或更高版本 | | 10.00.xC4 和更高版本 | AIX-64 bit * HP/UX (PA-RISK) -- 64bit | Needs V6.0 或更高版本 | | 10.00.xC5 和更高版本 | Linux (Intel) -- 32 bit * Linux(pSeries) - 64bit * Solaris - 64bit | Needs C6.0 或更高版本 |
结束语
有了 IDS WebSphere MQ,就不必为 IDS 应用程序与 MQ 的交互开发定制代码了。设置好队列、服务和策略之后,开发人员可以在他们选择的开发环境中,像对待其他内置函数一样来使用 WebSphere MQ 函数。更妙的是可以建立 READ 和 RECEIVE 表,然后对其执行 SELECT 和 INSERT 操作。用您节省下来的时间,悠闲地品味通过 MQ 订购的自己喜欢的美酒吧!
参考资料 学习
获得产品和技术
讨论
关于作者  | |  | Keshava Murthy 是 IBM Informix Dynamic Server(IDS) 的 SQL 和 Optimizer 组件的架构师,负责带领新版本的特性开发。他从事过多种不同关系数据库和对象关系数据库方面的工作,曾经开发过 SQL、RTREE、分布式查询、异构事务管理和 IDS 的可扩展性组件中的特性。他和 Sitaram Vemulapalli 一起开发过 IDS 中的 MQ 功能。他与主要的 ISV 及客户协作,为在 Informix 上进行嵌入式和应用程序开发创造条件。Keshav 拥有印度 Mysore 大学计算机科学与工程学士学位。 |
对本文的评价
|  |