扩展 Aggregation 样本

了解如何修改用于满足不同需求的聚集流。如果您正在从产品的 V5 和更早版本移植现有聚集流,那么可能也会发现此样本有用。

扇出上的聚集状态控件

AggregateControl 节点的 Control 终端用来传播包含特定聚集操作的状态和跟踪信息的控制消息。您可以使用以下选项连线 Control 终端:

  1. 保持它不连线。
  2. 将它直接连线到 AggregateReply 节点的 Control 输入终端。 仅当两个节点都在相同的消息流中时,直接连线才适用。
  3. 将它连线到 MQOutput 节点,并使用 Compute 节点以添加 MQMD,这会将控制消息写入到用户定义队列。相应的 AggregateReply 节点可能位于单独流中,已将从此队列读取的 MQInput 节点连线到其 Control 终端。产品的 V5 和更早版本的现有聚集流可能以这种方式工作。

当选择一个选项时,您必须考虑的因素为:

管理扇出上的事务

您必须将扇出流上 MQInput 节点的“事务方式”设为“”,以避免在 AggregateControl 节点已存储控制信息之前,扇入流收到应答消息的可能性。

当扇出流不在事务控制下运行时,正在调用的应用程序可以立即处理 MQOutput 节点编写的每个聚集扇出请求消息。根据这个应用程序的响应,AggregateReply 节点可能会在已存储控制信息之前收到应答。

如果 AggregateControl 节点的 Control 终端连线到 Compute 节点和 MQOutput 节点,即扇入和扇出流位于不同的消息流中,可能发生相同的问题。即使扇出是在事务下完成,事务的扩展数据块是由 MQOutput 节点消息编写,因此仍可以在处理扇入流的“控制”分支时,将应答作为未知处理。在先前部分的选项 3 中讨论了此方案。

如果 AggregateReply 节点上的未知消息超时不是零,那么在两个情况下,聚集仍会正确完成。在指定的秒数之后,会存储并重新处理未知应答,此时,它们组合在控制状态信息中。未知消息超时到期导致延迟之后,聚集操作仍会正常完成。如果您将未知消息超时设为零,那么在控制消息之前收到的应答会直接传播到 AggregateReply 节点的 Unknown 终端,且不会与剩余的聚集数据组合。

总结此部分和前一部分,聚集扇出流的最有效设计确保:

如果您采用最有效的设计选项,那么它会确保先前文本中描述的超时问题类型不会发生。 但是,如果不能使用这些选项,您仍可以将 AggregateReply 节点上的未知消息超时参数设为非零值,以确保顺利完成聚集操作。

扇入上的事务上下文

AggregateReply 节点具有三种非错误输出终端:Out、Unknown 和 Timeout。请务必了解从这些不同终端发送消息的时间和原因,并特别注意事务上下文。事务上下文可以由 MQInput 节点拥有,当入局应答消息触发从 AggregateReply 节点的输出时会发生这种情况,或者可以由 AggregateReply 节点自己拥有。

当事务上下文由 AggregateReply 节点拥有时,事务上下文具有典型语义,但是事务控制着重在 AggregateReply 节点,而不是 MQInput 节点,这暗示稍后消息流中产生的错误可能导致 AggregateReply 节点回滚,并将消息传播到其 Catch 终端,而不是 MQInput 节点。从自己事务上下文内部的 AggregateReply 节点传播的消息不是直接从 MQInput 节点提供;AggregateReply 节点是此流调用的输入节点。

AggregateReply 节点的事务行为受其事务方式配置参数控制。您必须确保此设置和 MQInput 节点上的设置相同,以确保 AggregateReply 节点的所有输出都在相同的事务控制级别下。

以下情境在 MQInput 节点的事务控制下:

以下情境在 AggregateReply 节点的事务控制下:

避免扇入上的线程匮乏

AggregateReply 节点有两个输入终端:In 和 Control。如果您同时使用这两个终端,请记住 Control 终端的使用是可选的,将数据提供给 AggregateReply 节点的最有效方式是,在扇入流的 Filter 节点前包含一个 MQInput 节点。Filter 节点用来根据适当的情况,将入局消息传递到 AggregateReply 节点的 In 或 Control 终端。在消息流中使用此方法,而不是使用两个 MQInput 节点:一个用于 In 终端,另一个用于 Control 终端。扇入流类似于下图:
具有过滤的截断的扇入流
Filter 节点需要类似于以下示例的 ESQL 模块,以确保消息传递到 AggregateReply 节点的适当终端:

CREATE FILTER MODULE FanIn_Filter

CREATE FUNCTION Main() RETURNS BOOLEAN
BEGIN
IF Root.XMLNSC.ComIbmAggregateControlNode IS NULL THEN
RETURN TRUE; -- wired to In
ELSE
RETURN FALSE; -- wired to Control
END IF;
END;
END MODULE;

您必须使用单个 MQInput 节点,因为在两个 MQInput 节点之间无法指定必须分发多少附加线程(通过使用附加实例提供)。AggregateReply 节点 In 终端中的流量似乎更高,因此使更多的线程在其输入节点中运行是有用的,但是无法如此配置。因此,节点可能会出现线程匮乏,从而备份应答消息并引起聚集机制延迟。

AggregateControl 节点的 Control 终端连线到队列的输出时此情境才适用。如果您未连线 Control 终端,可以越过这些问题。

如果不能实施之前的解决方案,那么可以强制正在读取控制消息的 MQInput 节点单线程运行。

  1. 在 MQInput 节点配置的“高级”面板中,将“排序方式”设置为“按队列排序”。
  2. 选择“逻辑顺序”,这会释放所有已配置的其他实例,比便它们可供其他 MQInput 节点使用。

但是,此配置会导致第一个 MQInput 节点的性能严重受限。

返回到样本主页