内容


WebSphere Message Broker V6.1 中的文件处理

Comments

引言

本文介绍如何使用 IBM® WebSphere® Message Broker V6.1(以下称为 Message Broker)中新的文件处理功能。本文旨在帮助 Message Broker 开发人员和架构师使用 FileInput 和 FileOutput 节点实现常见的文件处理场景。您应该拥有 Message Broker 的一些经验,不过本文对初学者和有经验的用户应该都有用。

将文件处理功能作为核心产品的一部分,WebSphere Message Broker V6.1 是第一个版本。在使用 Message Broker 为新的或现有的消息流实现文件接口时,FileInput 和 FileOutput 节点应该是您的第一选择。

过去,消息流可能使用了 Message Broker File Extender、WebSphere Transformation Extender、Support Pac IA0X 或其他插件节点解决方案来提供文件处理。

Message Broker File Extender 基于 Primeur 推出的 SPAZIO 系列产品中的技术,并在 AIX®、Linux®、Sun Solaris® 和 Windows® 上运行。File Extender 节点在 z/OS® 上不可用。

WebSphere Transformation Extender for Message Broker 基于以前的 Mercator/Ascential 多输入、多输出转换引擎。过去,Message Broker 客户使用 Transformation Extender 来处理超大文件。

Message Broker V6.1 是为异构 IT 环境中通用数据连接、路由和转换而构建的高级企业服务总线。当与基于文件的应用程序集成时,Message Broker V6.1 对分布式和 z/OS 平台上的文件输入和文件输出的支持使其成为理想的选择。新的本机文件支持提供了:

  • 大文件处理
  • 批处理
  • 流解析

新节点的设计使得从其他解决方案迁移成为非常简单的任务:

  • FileInput 节点可以使用现有的消息集模型解析各个记录,以及用于记录检测(确定一个记录从何处结束以及下一个记录从何处开始)。
  • Local Environment 树用于存储和传播输入文件元数据。
  • FileInput 节点提供了与其他 Message Broker V6.1 节点相同的 Validation、Transactions 和 Instances 选项卡。

FileInput 节点记录检测

FileInput 节点为您提供了一组用于解析输入文件内容的方法。在决定要使用哪一个选项时,最重要的考虑事项是在传播每个记录时,文件中的多少数据应该通过消息流发送。FileInput 节点的 Records and Elements 选项卡指定应该如何解释输入文件的构成记录。在此选项卡和 Input Message Parsing 选项卡上设置属性,以控制从输入文件获得并通过流传播的消息数量:

图 1. FileInput 节点属性指定记录检测方法
图 1
图 1

下图总结了可调用的不同模型。红色条指示传播到流的其他部分的文件部分。绿色框表示由分隔符提供的标记:

图 2. Whole file——使用一次传播来传播整个文件
图 2
图 2
图 3. Fixed length——FileInput 节点通过计算传播的长度来拆分每次传播。不需要分隔符将每次传播与下一次传播分离。
图 3
图 3
图 4. Delimited——FileInput 节点通过确定分隔符来拆分每次传播。输入文件的每次传播之间包含一个分隔符。分隔符本身不通过消息流进行传播。
图 4
图 4
图 5. Parsed record sequence——只要解析器能够确定一次传播在何处结束和下一次传播在何处开始,则不需要任何分隔符将每次传播与下一次传播分离。如果与 XMLNSC 或 MRM TDS 选项结合使用,则每个记录可以具有不同的长度。
图 5
图 5

FileOutput 节点记录定义

FileOutput 节点为您提供了一组方法,以指定应该如何在各个记录的基础上构造输出文件内容。Records and Elements 选项卡使用 Record Definition 来指定输出文件的构造:

图 6. FileOutput 节点属性指定记录定义方法
图 6
图 6

下面的关系图总结了可调用的不同模型。红色条指示从流的主要部分传播到 FileOutput 节点的文件部分。绿色框表示由 FileOutput 节点的属性指定的分隔符提供的标记。绿色框表示由 FileOutput 节点的属性指定的填充字节提供的标记。

图 7. 记录为 Whole File。使用一次传播来传播整个文件内容。
图 7
图 7
图 8. 记录为 Unmodified Data。每次传播之间不写入分隔符。
图 8
图 8
图 9. 记录为 Fixed Length Data。FileOutput 节点使用填充字节将每次传播的长度填充至所需的长度,从而拆分每次传播。输出文件中不使用任何分隔符。
图 9
图 9
图 10. 记录为 Delimited Data。FileOutput 节点在每次传播之间添加分隔符,从而拆分每次传播。
图 10
图 10

使用 FileInput 节点检测带分隔符的记录

在 FileInput 节点的 Records and Elements 选项卡上,选择 Delimited 将告诉该节点,应该使用分隔符将输入文件中的数据分离为单独的记录以便传播。分隔符决不会包括在传播的消息中。

配置分隔符

可以将分隔符设置为 DOS 或 UNIX 行结束符,或自定义分隔符(十六进制):

图 11. FileInput 节点属性指定分隔符
图 11
图 11

当将分隔符设置为 DOS 或 Unix 行结束符时,节点将行结束符识别为分隔符。在 Windows 上,行结束符是回车符后面跟着一个换行字符(<CR><LF> 或 X'0D0A')。在 Unix 上,行结束符是单个换行字符(<LF> 或 X'0A')。无论运行代理的平台是什么,节点都将这两个序列视为分隔符。如果两个序列同时出现在同一个文件中,则节点将两者都视为分隔符。

节点不会识别 z/OS 上的 EBCDIC 文件中发现的行结束符,即换行字节 (X'15')。要将此行结束符识别为分隔符,可以设置节点使用值为 15 的自定义分隔符。

当将分隔符设置为自定义分隔符(十六进制)时,节点使用自定义分隔符属性中指定的字节或字节序列作为分隔符。有效的自定义分隔符必须具有偶数个十六进制数字。例如,您可以输入 0D0A,其中包括四个十六进制数字(两个字节),并表示 X'0D0A'。最大序列长度为 32 个十六进制数字(16 个字节)。如果没有指定自定义分隔符,则使用缺省的 X'0A',此字符表示单个换行字符。

配置分隔符类型

FileInput 节点将输入文件中出现的每个分隔符视为分离(中缀)或终止(后缀)每个记录。如果文件以分隔符开始,则节点将该分隔符之前的(零长度)文件内容视为一个记录,并向流传播一个空记录。

缺省的分隔符类型为 Postfix(后缀)。这意味着每个分隔符终止一个记录。如果文件以分隔符结尾,则不会传播分隔符之后的空记录。如果文件没有以分隔符结尾,则对文件的处理方式就像文件的最后一个字节后面跟着一个分隔符一样。

或者,可以指定 Infix(中缀)类型的分隔符。这意味着每个分隔符分离记录。如果文件以分隔符结尾,则仍然传播最后一个分隔符后面的(零长度)文件内容,尽管其中没有包含数据。

读取其中的记录由 DOS 或 Unix 行结束符分隔的文件的示例

假设此示例中的输入文件具有以下内容。每行以行结束符(在 Windows 上为 X'0D0A',在 Unix 上为 X'0A')结尾。

<AccNo>12345</AccNo>
<AccNo>34567</AccNo>
<AccNo>56789</AccNo>

在 FileInput 节点的 Records and Elements 选项卡上选择以下属性:

图 12. FileInput 节点属性
图 12
图 12

FileInput 节点检测以 DOS 或 UNIX 行结束符结尾的记录,并为发现的每个记录传播一条消息。DOS 或 UNIX 行结束符不是任何消息的一部分。结果是传播了三条消息,如下所示:

消息 1: <AccNo>12345</AccNo>

消息 2: <AccNo>34567</AccNo>

消息 3: <AccNo>56789</AccNo>

读取其中的记录由自定义分隔符分隔的文件的示例

假设此示例中的输入文件具有以下内容。文件结尾没有行终止符。

<AccNo>12345</AccNo>,<AccNo>34567</AccNo>,<AccNo>56789</AccNo>

在 FileInput 节点的 Records and Elements 选项卡上选择以下属性:

图 13. FileInput 节点属性
图 13
图 13

十六进制 X'2C' 表示 ASCII 中的逗号字符。在其他系统上可能需要使用不同的十六进制代码(此序列不经历代码页转换)。

FileInput 节点检测逗号字符并使用该字符分离记录。由于 Delimiter type 属性的值为 Infix,因此文件结尾不需要逗号。逗号字符不属于传播的任何消息的一部分。结果是传播了三条消息,如下所示:

消息 1: <AccNo>12345</AccNo>

消息 2: <AccNo>34567</AccNo>

消息 3: <AccNo>56789</AccNo>

在此示例中,消息正文中没有任何逗号。如果消息正文中出现了逗号,则会导致在该点拆分记录,从而导致将不正确地形成的消息传播到流的其余部分。

使用 FileOutput 节点定义带分隔符的记录

在 FileOutput 节点的 Records and Elements 选项卡上,选择 Record is Delimited Data 将告诉该节点,应该使用分隔符分离从输入消息得到的数据。仅当在 Finish File 终端上接收到消息时,文件才结束。

配置分隔符

可以将分隔符设置为 Broker 系统行结束符或自定义分隔符(十六进制)。

图 14. FileOutput 节点属性
图 14
图 14

当将分隔符设置为 Broker 系统行结束符时,节点将使用运行代理的平台上的相应行结束符字节序列作为分隔符。在 Windows 上,行结束符是回车符后面跟着一个换行字符(<CR><LF> 或 X'0D0A')。在 Unix 上,行结束符是单个换行字符(<LF> 或 X'0A')。在 z/OS 上,行结束符是“换行”字符 (X'15')。

当将分隔符设置为 Custom Delimiter (Hexadecimal) 时,节点将使用 Custom delimiter 属性中指定的显式分隔符序列作为要使用的分隔符。有效的自定义分隔符必须具有偶数个十六进制数字,并且允许的最大序列长度为 16 字节(32 个十六进制数字)。缺省分隔符为 X'0A',此字符表示单个换行字符。

配置分隔符类型

缺省的分隔符类型为 Postfix。这意味着在所写入的每个记录后面添加分隔符。

或者,可以指定 Infix 类型的分隔符。这意味着仅将分隔符插入任何两个相邻记录之间。

写入其中的记录由 DOS 或 Unix 行结束符分隔的文件的示例

假设将以下三条消息发送到 FileOutput 节点的 In 终端,随后将最后一条消息发送到同一个节点的 Finish File 终端。最后一条消息包含什么内容并不重要。

消息 1: <AccNo>12345</AccNo>

消息 2: <AccNo>34567</AccNo>

消息 3: <AccNo>56789</AccNo>

最后一条消息: <Anything></Anything>

在 FileOutput 节点的 Records and Elements 选项卡上选择以下属性:

图 15. FileOutput 节点属性
图 15
图 15

FileOutput 节点写入一个文件。该文件包含三个记录,每个记录由运行代理的平台上的相应行终止符终止。文件包含以下内容:

<AccNo>12345</AccNo>
<AccNo>34567</AccNo>
<AccNo>56789</AccNo>

写入其中的记录由自定义分隔符分隔的文件的示例

假设将以下三条消息发送到 FileOutput 节点的 In 终端,随后将最后一条消息发送到同一个节点的 Finish File 终端。最后一条消息包含什么内容并不重要。

消息 1: <AccNo>12345</AccNo>

消息 2: <AccNo>34567</AccNo>

消息 3: <AccNo>56789</AccNo>

最后一条消息: <Anything></Anything>

在 FileOutput 节点的 Records and Elements 选项卡上选择以下属性:

图 16. FileOutput 节点属性
图 16
图 16

十六进制数字 X'2C' 表示 ASCII 中的逗号字符。在其他系统上可能需要使用不同的十六进制序列(此序列不经历代码页转换)。

FileOutput 节点写入一个文件,其中包含单个行。该文件包含以下内容。

<AccNo>12345</AccNo>,<AccNo>34567</AccNo>,<AccNo>56789</AccNo>

请注意,使用 Infix 作为分隔符类型意味着,写入的最后一个记录后面没有添加逗号,因此分隔符仅插入任何两个相邻记录之间。

传输大文件

文件节点可用于将文件从一个目录位置传输到另一个目录位置。如果启用了 FTP,则可以将文件从某个目录传输到远程 FTP 服务器。这将在轮询 FTP 服务器上的 FileInput 节点内容部分中介绍。

通过将数据分拆为较小的块并逐块地“按原样”传输数据,可以实现使用 File 节点传输大文件。

例如,考虑一个大小为 139,456,234 KB 的任意文件。通过按如下方式配置节点属性,可以按 20,000,000 字节大小的块传输此文件:

  1. 在 FileInput 节点的 Input Message Parsing 选项卡上,将 Message 域设置为 BLOB,因为只是通过流传输数据而不解析数据。
  2. 在 FileInput 节点的 Records and Elements 选项卡上,将 Record Detection 设置为 Fixed Length。同样在此选项卡上,将 Length(bytes) 设置为将在每个块中传输的字节数。因此在此例中将其设置为 20000000
  3. 将 FileInput 的 End of Data 终端连接到 FileOutput 节点的 Finish File 终端。这将导致在传输最后一个块后关闭输出文件。
  4. 在 FileOutput 节点的 Records and Elements 选项卡上,将 Record Definition 设置为 Record isUnmodified Data
图 17. 用于传输大文件的消息流
图 17

在此例中,139,456,234 KB 的传入文件大小没有均匀地拆分为 20000000 字节的块。这意味着除了最后一个记录的长度将为 3183616 字节以外,其他每个记录将为 20000000 字节。

将输出记录填充到固定长度

在此例中,输入文件包含许多具有不同长度的记录。这些记录将写入到单个输出文件。将处理输出文件的应用程序预期每个记录的长度为 120 字节。FileOutput 节点的 Records and Elements 选项卡上的 Padding byte 字段可用于将长度不定的记录填充到固定长度。在此例中,通过在 Records and Elements 选项卡上设置以下值,所有输出记录将使用十六进制值 20(ASCII 平台上的空格字符)填充至 120 个字节长度:

  1. 在 Records and Elements 选项卡上,将 Record Definition 设置为 Record is Fixed Length Data。
  2. 将 Length (bytes) 设置为值 120
  3. 将 Padding byte (hexadecimal) 设置为值 20

将 MQ 消息转换为文件

在此例中,三个 MQInput 节点接受 XML 消息,这些消息全都写入单个输出文件。每个 MQ 消息被追加到同一个输出文件。下面的流可用于实现此目的:

图 18. 示例消息到文件消息流
图 18
图 18

三个 MQInput 节点(其 Message Domain 属性设置为 XMLNSC)将消息传播到一个 Compute 节点。由于 FileOutput 节点从三个来源接收输入,因此需要向 Finish File 终端发送一条消息以关闭文件。此消息的内容是什么并不重要,重要的是 Finish File 终端接收到一条消息。然后文件将关闭,并从 mqsitransit 目录移动到输出目录。

在此例中,向 Compute 节点添加了 ESQL 以确定何时向 Finish File 终端发送消息。当所有 MQInput 节点都已发送完最后一条消息时,其中的最后一条消息将传播到 Finish File 终端以导致文件关闭。Compute 节点的 Out1 终端连接到 FileOutput 节点的 Finish File 终端。

DECLARE LAST_MESSAGE SHARED INTEGER 0;
CREATE COMPUTE MODULE InputFlow_Compute
CREATE FUNCTION Main() RETURNS BOOLEAN
BEGIN
SET OutputRoot = InputRoot;
IF InputRoot.XMLNSC.Dept.LastMessage = 'YES' THEN
BEGIN ATOMIC        
SET LAST_MESSAGE = LAST_MESSAGE + 1;
END;
END IF;
IF LAST_MESSAGE = 3
THEN
PROPAGATE TO TERMINAL 'out' DELETE NONE;
PROPAGATE TO TERMINAL 'out1' DELETE NONE;
SET LAST_MESSAGE = 0;
ELSE
PROPAGATE TO TERMINAL 'out' DELETE NONE;  
END IF;
RETURN FALSE;
END;

在 FileOutput 节点上,切换到 Records and Elements 选项卡,并将 Record Definition 设置为 Record is unmodified data。在 Basic 选项卡上,将 DirectoryFile Name or pattern 分别设置为输出文件的位置和名称。

将文件记录转换为 MQ 消息

此示例说明如何记录通过流传播 FileInput 节点从文件中读取的记录,然后输出为 MQ 消息。包含 XML 数据的输入记录由 XMLNSC 解析器从输入文件中读取并解析。以下是来自输入文件的摘录:

<Dept><DeptId>124511</DeptId><Sale>34.99</Sale><ItemCode>19449539</ItemCode></Dept>
<Dept><DeptId>124512</DeptId><Sale>29.99</Sale><ItemCode>24873586</ItemCode></Dept>
<Dept><DeptId>124513</DeptId><Sale>44.99</Sale><ItemCode>54852744</ItemCode></Dept>
<Dept><DeptId>124514</DeptId><Sale>99.99</Sale><ItemCode>85222843</ItemCode></Dept>
<Dept><DeptId>124511</DeptId><Sale>34.99</Sale><ItemCode>19449539</ItemCode></Dept>

通过使用 FileInput 节点中的 Parsed Record Sequence 属性,输入数据上的每个结束 Root 标记用于标记记录的结束。然后作为如下形式的单独消息通过流传播每个记录:

<Dept><DeptId>124511</DeptId><Sale>34.99</Sale><ItemCode>19449539</ItemCode></Dept>

在 FileInput 节点中设置以下属性,以将 Parsed Record Sequence 记录检测用于 XMLNSC 解析器:

  • Input Message Parsing 选项卡上,将 Message domain 设置为 XMLNSC
  • Records and Elements 选项卡上,将 Record detection 设置为 Parsed Record Sequence

您可以直接将 FileInput 节点的 Out 终端连接到 MQOutput 节点的 In 终端。可以在这些节点之间添加任何转换节点,以支持对每个记录进行修改。在此例中,FileInput 节点的 End of Data 终端未连接,因为输出节点不是 FileOutput 节点。

图 19. 示例文件到消息流
图 19
图 19

每个记录从 MQOutput 节点作为单独的消息输出。

轮询 FTP 服务器上的 FileInput 节点内容:

还可以使用 FileInput 节点执行对远程 FTP 服务器的轮询,以便检索输入文件。首先,在代理之外使用标准接口检查 FTP 服务器是否在运行,并且您拥有足够的权限。下面的屏幕快照显示了对 FTP 服务器进行的命令行访问。登录以后,创建一个目录并复制用于消息流的输入文件。

图 20. 命令行 FTP 指令
图 20
图 20

FileInput 节点的 FTP 选项卡用于配置代理需要的属性,以便在轮询服务器时检索文件。使用代理命令 mqsicreateconfigurableservice,还可以通过可配置的服务的定义进一步抽象这些属性的设置。有关此主题的更多信息可以在信息中心找到。

请考虑按下面的屏幕快照所示设置其属性的 FileInput 节点:

图 21. FileInput 节点属性
图 21
图 21

Security identity 属性导致代理在自己的安全存储区中查找用户名和密码对。mqsisetdbparms 命令用于配置这些设置。确保在发出该命令前停止运行时代理:

图 22. 设置代理的安全标识
图 22
图 22

启动代理,并部署使用前述配置的 FileInput 节点的消息流。您可以使用本文作为示例提供的消息流之一。检查消息流是否像输入文件放在了代理的本地文件系统中一样执行。确保从 FTP 服务器删除输入文件(如下面的命令窗口所示),并创建消息流的 FileOutput 节点中配置的输出文件。

图 23. 显示文件已得到处理的 FTP 提示
图 23
图 23

在处理远程 FTP 服务器中的文件时,将首先传输文件,然后处理传输的副本,仿佛该文件已放在了本地输入目录中一样。如果远程文件传输成功,则从 FTP 服务器删除该文件。如果在解析文件内容时出现问题(在成功的 FTP 传输之后),则根据 FileInput 节点的 Retry 选项卡上指定的撤销策略,相应填充输入目录的存档 (mqsiarchive) 和撤销 (mqsibackout) 子目录。

使用 LocalEnvironment 变量中的文件元数据

LocalEnvironment 变量包含有关某个流读取和写入的文件的信息,可以访问并在某些情况下修改这些变量。与文件相关的 LocalEnvironment 变量的完整列表

本部分提供访问和修改其中一些 LocalEnvironment 变量的示例。

使用 LocalEnvironment 变量查询文件记录编号

LocalEnvironment.File 文件由 FileInput 节点设置。在此例中,日期仅添加到第一个文件记录。编写 Compute 节点 ESQL 以查询 LocalEnvironment.File.Record 变量,其中包含当前记录的记录编号。仅当此变量等于 1 时,才向文件记录添加日期。Compute 节点中使用的 ESQL 如下:

IF InputLocalEnvironment.File.Record = 1 
 THEN
   SET OutputRoot.XMLNSC.Dept.(XMLNSC.Attribute)Date = CURRENT_DATE; 
END IF;

并且从流输出的第一个记录将具有以下形式:

<Dept Date="2007-12-06"><DeptId>124511</DeptId>
<Sale>34.99</Sale><ItemCode>19449539</ItemCode></Dept>

使用 LocalEnvironment 更改输出文件名

可以修改 LocalEnvironment.Destination.File.DirectoryLocalEnvironment.Destination.File.Name 以覆盖 FileOutput 节点中设置的输出文件名称和目录。例如,此 ESQL 将修改文件名:

SET OutputLocalEnvironment.Destination.File.Name = 'GIFT_dept.xml';

如果以这种方式使用 LocalEnvironment,请确保也以同样的方式设置“Finish File”(发送到 Finish File 终端的空消息)传播的 LocalEnvironment。这可以确保在 FileOutput 的 Finish File 终端接收到该消息时,将关闭正确的文件。

图 24. 用于更改输出文件名的示例消息流摘录
图 24
图 24

检查 FileOutput 节点的 Request 选项卡中的 Request directory property locationRequest file name property location 设置。这些设置应该指向 LocalEnvironment,如下图所示。

图 25. FileOutput 节点属性
图 25
图 25

使用 LocalEnvironment 查询输出文件的名称

LocalEnvironment.WrittenDestination.File 变量包含有关 FileOutput 节点先前已写入的文件的信息。

图 26. 用于查询 FileOutput 节点使用的文件名的示例消息流摘录
图 26
图 26

Compute 节点中的 ESQL 如下:

SET OutputRoot.XMLNSC.Dept.OutputFile = 
    InputLocalEnvironment.WrittenDestination.File.Directory || '\' ||
InputLocalEnvironment.WrittenDestination.File.Name;

此 ESQL 使用 LocalEnvironment 来查询由 FileOutput 节点写入的文件的名称。在此例中,该文件名然后将输出为 MQ 消息。请注意,Compute 节点连接到 End of Data 终端。如果连接了 Out 终端,则 Compute 节点应该已经接收到每个记录的传播。

FileInput 节点和 MRM 解析器的分隔符支持

FileInput 节点能够将自定义分隔符指定为十六进制数字序列。例如,要将换行符指定为分隔符,您将在节点的 Custom delimiter (hexadecimal) 属性中输入值 OA。

MRM 标记/分隔字符串(Tagged/Delimited String,TDS)解析器提供了三种用于指定分隔符的不同语法:

  • 标准字符,例如空格字符
  • 空格字符的助记符,即 <SPACE>
  • 空格字符的 Unicode 值,即 <U+0020>

有关 MRM 分隔符语法的更多信息

MRM 自定义有线格式(Custom Wire Format,CWF)解析器提供了三种用于指定填充字符的不同语法(CWF 没有分隔符的概念):

  • 使用诸如 ' ' 或 " " 等引号括起的标准字符来指定空格字符。
  • 使用诸如 0x20 等十六进制符号来指定空格字符
  • 使用诸如 32 等十进制值来指定空格字符
  • 使用诸如 U+0020 等 Unicode 值来指定空格字符

有关这些填充字符语法的更多信息

将资源导入 Message Broker Toolkit

本文的“下载”部分包含单个名为 FileExamples.zip 的压缩文件。这是可导入 WMBv6.1 Toolkit 的项目交换文件。本文引用的所有消息流和消息集项目都将在导入的工作区中可用。

将 MRM 定义重用于 FileInput 节点

消息流——WholeFile.msgflow

可以在 File Msg Flows 项目中找到此消息流,下图显示了此消息流:

图 27. 消息流 WholeFile.msgflow
图 27
图 27

此流接受一个目录中的文件作为输入,并将其写入另一个目录。输出文件与输入文件完全相同。跟踪节点用于提供从 FileInput 节点传播的内容的有用输出。可以将此跟踪输出与第二个示例消息流 (Delimited.msgflow) 产生的跟踪做一下比较。

FileInput 节点设置了以下属性。在 Records and Elements 选项卡中,Record detection 设置为 Whole File。在 Input Message Parsing 选项卡上,属性如下图所示。请注意,MRM 消息类型被指定为 Message_WholeFile(这是在消息定义文件 MsgDefFile.mxsd 中定义的)。消息集项目名为 FileMsgSet1

图 28. WholeFile.msgflow 中的 FileInput 节点属性
图 28
图 28

消息流——Delimited.msgflow

可以在 FileMsgFlows 项目中找到此消息流,下图显示了此消息流。

图 29. 消息流 Delimited.msgflow
图 29
图 29

该流接受一个目录中的文件作为输入,并将其写入另一个目录。输出文件与输入文件完全相同。跟踪节点用于提供从 FileInput 节点传播的内容的有用输出。可以将此跟踪输出与第一个示例消息流 (WholeFile.msgflow) 产生的跟踪做一下比较。

FileInput 节点设置了以下属性。在 Records and Elements 选项卡上,Record detection 设置为 Delimited(使用 DOS 或 UNIX 行结束符作为分隔符,并使用 Postfix 作为分隔符类型)。在 Input Message Parsing 选项卡上,属性如下图所示。请注意,MRM 消息类型被指定为 Message_Delimited(这是在消息定义文件 MsgDefFile.mxsd 中定义的)。消息集项目名为 FileMsgSet1

图 30. Delimited.msgflow 中的 FileInput 节点属性
图 30
图 30

消息集——messageSet.mset

此消息集可以在名为 FileMsgSet1 的项目中找到。需要相应地设置消息集的 Default message domain(在此例中为 MRM),以便正确构建为部署做好准备的代理存档文件。请注意,此行为是 6.1 版新引入的。

图 31. MessageSet.mset 文件属性
图 31
图 31

消息定义文件——MsgDefFile.mxsd

消息定义文件可以在名为 FileMsgSet1 的项目中找到。

图 32. 消息定义文件 MsgDefFile.mxsd

图 32.
图 32
图 32

请注意,Message_Delimited 是在 choice 类型的基础上构建的,而 Message_WholeFile 则是在 sequence(其中包括数量不限的 Record 元素实例)的基础上构建的。

这两个消息定义是使用相同的元素引用有意地构建的,以便说明 File 节点对现有定义的重用。

输入文件——WholeFile_input.txt

此输入文件可以在名为 FileMsgFlows 的项目中找到。其中包含以下数据:

HDR:RefundsDepartment,7,57.62
REC:widget,2007-05-13,faulty,9.99
REC:asset,2007-06-19,unwanted gift,5.50
REC:thing,2007-07-07,faulty,6.66
REC:widget,2007-08-30,exchange,9.99
REC:asset,2007-09-01,faulty,5.50
REC:widget,2007-09-30,exchange,9.99
REC:widget,2007-10-10,faulty,9.99
TRL:39.96,11.00,6.66,57.62

此文件用作消息流 WholeFile.msgflow 的输入。

输入文件——Delimited_input.txt

此输入文件可以在名为 File MsgFlows 的项目中找到。其中包含与 WholeFile_input.txt 文件相同的数据。

此文件用作消息流 Delimited.msgflow 的输入。

代理存档文件——FileArchive.bar

代理存档文件可以在名为 FileMsgFlows 的项目中找到。其中包含两个消息流的编译版本和一个用于消息集的字典文件。

运行消息流

缺省情况下,该示例是针对 Windows 进行设置的,并假设已在部署代理存档之前在文件系统上创建了以下目录。

C:\File Examples\FILEIN——由每个流的 FileInput 节点读取的目录

C:\File Examples\FILEOUT——由每个流的 FileOutput 节点写入的目录

C:\File Examples\TRACE——由每个流的 Trace 节点写入的目录

将代理存档部署到代理。

要运行任一个流,请将相关输入文件的副本放在由 FileInput 节点读取的目录中(缺省为 C:\FileExamples\FILEIN)。

输出文件将在目录 C:\FileExamples\FILEOUT 中产生,跟踪文件将在目录 C:\FileExamples\TRACE 中产生。

WholeFile.msgflow 接收一个名为 WholeFile_input.txt 的输入文件,并创建一个名为 WholeFile_output.txt 的输出文件。Delimited.msgflow 接收一个名为 Delimited_input.txt 的输入文件,并创建一个名为 WholeFile_output.txt 的输出文件。

无论使用哪个输入文件来驱动流,输出文件的内容都是完全相同的。

两个消息流之间的传播比较

WholeFile.msgflow 仅涉及到从 FileInput 节点进行的一次传播。相反,Delimited .msgflow 涉及到从 FileInput 节点进行的多次单独传播。

这是因为 WholeFile.msgflow 中的 FileInput 节点将整个输入文件检测为一个记录,因此仅传播一次。但是,Delimited.msgflow 使用的 FileInput 节点将一个记录检测为以回车符后面跟着换行符 (CRLF) 结尾。

WholeFile.msgflow 不在 FileInput 节点中执行记录检测,而是由 TDS 解析器解释输入数据中的每一行之间的 CRLF。在 Delimited.msgflow 的情况下,FileInput 节点使用 CRLF 确定一次传播在何处结束以及下一次传播在何处开始。

WholeFile.msgflow 的跟踪输出

WholeFile .msgflow 的跟踪输出可以在文件 TRACETREE_WHOLEFILE.txt 中找到。其中将包含以下内容(请注意,Properties 文件夹的跟踪已从此片段中删除):

(0x01000021):MRM        = (
  (0x01000013):Header  = (
    (0x0300000B):Department      = 'RefundsDepartment'
    (0x0300000B):NumberOfRecords = 7
    (0x0300000B):Total           = 57.62
  )
  (0x01000013):Record  = (
    (0x0300000B):ItemName        = 'widget'
    (0x0300000B):DateOfReturn    = DATE '2007-05-13'
    (0x0300000B):ReasonForReturn = 'faulty'
    (0x0300000B):Amount          = 9.99
  )
  (0x01000013):Record  = (
    (0x0300000B):ItemName        = 'asset'
    (0x0300000B):DateOfReturn    = DATE '2007-06-19'
    (0x0300000B):ReasonForReturn = 'unwanted gift'
    (0x0300000B):Amount          = 5.50
  )
  (0x01000013):Record  = (
    (0x0300000B):ItemName        = 'thing'
    (0x0300000B):DateOfReturn    = DATE '2007-07-07'
    (0x0300000B):ReasonForReturn = 'faulty'
    (0x0300000B):Amount          = 6.66
  )
  (0x01000013):Record  = (
    (0x0300000B):ItemName        = 'widget'
    (0x0300000B):DateOfReturn    = DATE '2007-08-30'
    (0x0300000B):ReasonForReturn = 'exchange'
    (0x0300000B):Amount          = 9.99
  )
  (0x01000013):Record  = (
    (0x0300000B):ItemName        = 'asset'
    (0x0300000B):DateOfReturn    = DATE '2007-09-01'
    (0x0300000B):ReasonForReturn = 'faulty'
    (0x0300000B):Amount          = 5.50
  )
  (0x01000013):Record  = (
    (0x0300000B):ItemName        = 'widget'
    (0x0300000B):DateOfReturn    = DATE '2007-09-30'
    (0x0300000B):ReasonForReturn = 'exchange'
    (0x0300000B):Amount          = 9.99
  )
  (0x01000013):Record  = (
    (0x0300000B):ItemName        = 'widget'
    (0x0300000B):DateOfReturn    = DATE '2007-10-10'
    (0x0300000B):ReasonForReturn = 'faulty'
    (0x0300000B):Amount          = 9.99
  )
				   (0x01000013):Trailer = (
      (0x0300000B):SubTotal1 = 39.96
      (0x0300000B):SubTotal2 = 11.00
      (0x0300000B):SubTotal3 = 6.66
      (0x0300000B):Total     = 57.62
    )
  )

Delimited.msgflow 的跟踪输出

Delimited .msgflow 的跟踪输出可以在两个文件中找到。第一个文件 TRACETREE_DELIMITED.txt 将包含以下内容(请注意,Properties 文件夹的跟踪已从此片段中删除):

一次 Header 元素的传播……

(0x01000021):MRM        = (
  (0x01000013):Header = (
    (0x0300000B):Department      = 'RefundsDepartment'
    (0x0300000B):NumberOfRecords = 7
    (0x0300000B):Total           = 57.62
  )
)

然后是若干次 Record 元素的传播……

(0x01000021):MRM        = (
  (0x01000013):Record = (
    (0x0300000B):ItemName        = 'widget'
    (0x0300000B):DateOfReturn    = DATE '2007-05-13'
    (0x0300000B):ReasonForReturn = 'faulty'
    (0x0300000B):Amount          = 9.99
  )
)

直到 Trailer 元素的最后传播……

(0x01000021):MRM        = (
  (0x01000013):Trailer = (
    (0x0300000B):SubTotal1 = 39.96
    (0x0300000B):SubTotal2 = 11.00
    (0x0300000B):SubTotal3 = 6.66
    (0x0300000B):Total     = 57.62
  )
)

第二个跟踪文件 TRACEFILE_EOF.txt 将包含以下内容(请注意,Properties 文件夹的跟踪已从此片段中删除):

(0x01000000):BLOB       = (
  (0x03000000):UnknownParserName = 'NONE'
    (0x03000000):BLOB              = X"
      )

这是因为从 FileInput 的 End of Data 终端发送的消息始终是空 BLOB 消息。

Parsed Record Sequence 记录检测

Records and Elements 选项卡上,选择 Parsed Record Sequence 将告诉 FileInput 节点,应该仅使用 Input Message Parsing 选项卡上的属性将输入文件中的数据分离为记录以便传播。换句话说,与依赖节点本身上指定的记录长度或分隔符不同,应该使用代理的解析器之一来决定一个记录从何处结束以及下一个记录从何处开始。

图 33. FileInput 节点属性
图 33
图 33

在 WebSphere Message Broker v6.1 的初始版本中,Parsed Record Sequence 设置支持使用以下解析器来执行记录检测:

  • XMLNSC 域解析器
  • MRM TDS 域解析器
  • MRM CWF 域解析器

如果流开发人员部署了一个将 Parsed Record Sequence 设置为 Record Detection 的 FileInput 节点,并对 Input Message Parsing 参数使用不在上述列表中的任何其他解析器设置(例如 MRM XML),则会在运行时引发异常 BIP6066E。

需要相应地设置消息集的 Default message domain (在此例中为 MRM),以便正确构建为部署做好准备的代理存档文件。此行为是 V6.1 新引入的。

图 34. MessageSet.mset 文件属性
图 34
图 34

MRM 域可以支持多种有线格式(显示在屏幕左侧的 Properties 层次结构下面)。在下面的示例中,将使用单个消息集对一个具有两种物理格式的逻辑模型建模——分别名为 Text1Tagged/Delimited String Format 和名为 Binary1Custom Wire Format

使用 XMLNSC 解析器的 Parsed Record Sequence 示例:

下面的示例显示了由名为 ParsedRecordSequence_XMLNSC.msgflow 的消息流中的 FileInput 节点解析的输入文件的内容。此流可以在名为 FileMsgFlows 的项目中找到。彩色编码演示了应该如何将数据分离为记录以便通过消息流传播。

图 35. 从 ParsedRecordSequence_XMLNSC_input.txt 输入的文件
图 35
图 35

请注意,整个文件没有总体根标记。每个记录是良构的 XML 消息,具有单个名为 Invoice 的根标记。XMLNSC 解析器解释 XML 标记以确定一个 Invoice 记录在何处结束以及下一个 Invoice 记录在何处开始。每个 Invoice 记录通过消息流单独地传播。跟踪流表明存在如下四次传播:

传播 1:

(0x01000000):XMLNSC     = (
    (0x01000000):Invoice = (
      (0x01000000):Customer = (
        (0x03000000):FirstName   = 'Fred'
        (0x03000000):LastName    = 'Bloggs'
        (0x03000000):AddressLine = '123 Happy Street'
        (0x03000000):City        = 'Joyville'
      )
      (0x01000000):Product  = (
        (0x03000000):Description   = 'WIDGET'
        (0x03000000):NumberOfItems = '3'
        (0x03000000):UnitPrice     = '19.99'
        (0x03000000):Total         = '59.97'
      )
    )
  )

传播 2:

(0x01000000):XMLNSC     = (
    (0x01000000):Invoice = (
      (0x01000000):Customer = (
        (0x03000000):FirstName   = 'John'
        (0x03000000):LastName    = 'Doe'
        (0x03000000):AddressLine = '123 Sad Avenue'
        (0x03000000):City        = 'Sadstate'
      )
      (0x01000000):Product  = (
        (0x03000000):Description   = 'THING'
        (0x03000000):NumberOfItems = '2'
        (0x03000000):UnitPrice     = '3.50'
        (0x03000000):Total         = '7.00'
      )
    )
  )

传播 3:

(0x01000000):XMLNSC     = (
  (0x01000000):Invoice = (
    (0x01000000):Customer = (
      (0x03000000):FirstName   = 'Gordon'
      (0x03000000):LastName    = 'Brown'
      (0x03000000):AddressLine = '10 Downing Street'
      (0x03000000):City        = 'London'
    )
    (0x01000000):Product  = (
      (0x03000000):Description   = 'ASSET'
      (0x03000000):NumberOfItems = '1'
      (0x03000000):UnitPrice     = '49.99'
      (0x03000000):Total         = '49.99'
    )
  )
)

传播 4:

(0x01000000):XMLNSC     = (
  (0x01000000):Invoice = (
    (0x01000000):Customer = (
      (0x03000000):FirstName   = 'George'
      (0x03000000):LastName    = 'Bush'
      (0x03000000):AddressLine = '1600 Pennsylvania Avenue'
      (0x03000000):City        = 'Washington'
    )
    (0x01000000):Product  = (
      (0x03000000):Description   = 'WIDGET'
      (0x03000000):NumberOfItems = '4'
      (0x03000000):UnitPrice     = '19.99'
      (0x03000000):Total         = '79.96'
    )
  )
)

使用 MRM CWF 解析器的 Parsed Record Sequence 示例:

下面的示例显示了由名为 ParsedRecordSequence_MRMCWF.msgflow. 的消息流中的 FileInput 节点解析的输入文件的内容。此流可以在名为 FileMsgFlows 的项目中找到。彩色编码演示了应该如何将数据分离为记录以便通过消息流传播。

图 36. 从 ParsedRecordSequence_MRMCWF_input.txt 输入的文件
图 36
图 36

此示例演示了如何将 Parsed Record Sequence 记录检测用于 MRM CWF 解析器,以通过消息流传播固定长度的单独记录。每个记录与 Invoice 消息的 CWF 有线格式层的定义匹配。MRM CWF 解析器使用记录的属性(每个字段的长度)来确定一个 Invoice 记录从何处结束以及下一个 Invoice 记录从何处开始。每个 Invoice 记录通过消息流单独地传播。跟踪该流表明了四次与其他示例完全相同的传播。

使用 MRM TDS 解析器的 Parsed Record Sequence 示例:

下面的示例显示了由名为 ParsedRecordSequence_MRMTDS.msgflow 的消息流中的 FileInput 节点解析的输入文件的内容。此流可以在名为 FileMsgFlows 的项目中找到。彩色编码演示了如何将数据分离为记录以便通过消息流传播。

图 37. 从 ParsedRecordSequence_MRMTDS_input.txt 输入的文件
图 37
图 37

每个记录之间没有放置分隔符。MRM TDS 消息的定义将确定一次传播从何处结束以及下一次传播从何处开始。用于解析记录的消息定义如下图所示。Invoice.mxsd 可以在名为 FileMsgSet2 的消息集项目中找到。

图 38. 消息定义文件 Invoice.mxsd
图 38
图 38

消息的 MRM TDS 属性为解析器提供了有关分隔符、组指示符和组终止符的信息,从而使解析器能够解析一个消息从何处结束以及下一个消息从何处开始。

使用 SWIFT MRM TDS 解析器的 Parsed Record Sequence 示例:

从 WebSphere Message Broker Toolkit Help 菜单上,打开 Samples Gallery 并导入 SWIFT Message Set 示例:

图 39. SWIFT Message Set 示例
图 39
图 39

此示例与名为 SWIFT Message Sets 的导入的消息集项目相关。就此示例而言,名为 SWIFT Message Flows 的导入的消息流项目可以忽略。此示例中使用的消息流名为 ParsedRecordSequence_SWIFT.msgflow,可以在名为 FileMsgFlows 的消息流项目中找到。

名为 ParsedRecordSequence_SWIFT_input.txt 的输入文件包含四个相继连接在一起的 SWIFT 消息。每个消息的内容取自 SWIFT 示例提供的排队消息文件。

图 40. 从 ParsedRecordSequence_SWIFT_input.txt 输入的文件
图 40
图 40

设置 FileInput 节点的 Input Message Parsing 选项卡上的属性,再与 Parsed Record Sequence 记录检测相结合,将导致 MRM TDS 解析器使用 SWIFT Message Set 示例来确定一个 SWIFT 记录从何处结束以及下一个 SWIFT 记录从何处开始。

图 41. FileInput 节点属性
图 41
图 41

每个记录(在前面的图中用不同的颜色表示)相继通过消息流传播。要查看通过流传播的各个消息,请找到名为 C:\FileExamples\TRACE\TRACETREE_PARSEDRECORDSEQUENCE_SWIFT.TXT 的示例跟踪文件。

消息流中基于文件内容的路由

使用 RouteToLabel 节点是通过消息流基于消息内容路由文件记录的方法之一。在此例中,FileInput 节点读取如下所示的输入文件:

<Dept><DeptId>124511</DeptId><Sale>34.99</Sale><ItemCode>19449539</ItemCode></Dept>
<Dept><DeptId>124512</DeptId><Sale>29.99</Sale><ItemCode>24873586</ItemCode></Dept>
<Dept><DeptId>124513</DeptId><Sale>44.99</Sale><ItemCode>54852744</ItemCode></Dept>
<Dept><DeptId>124514</DeptId><Sale>99.99</Sale><ItemCode>85222843</ItemCode></Dept>
<Dept><DeptId>124511</DeptId><Sale>34.99</Sale><ItemCode>19449539</ItemCode></Dept>

文件中列出的每个部门 ID <DeptId> 与商店中的某个部门名称相对应。每个文件记录中的部门 ID 用于将记录路由到四个标签之一:ELECTRICALGIFTKITCHENHOME,这四个标签分别与商店中的四个部门名称对应。

<Dept><DeptId>124511</DeptId><Sale>34.99</Sale><ItemCode>19449539</ItemCode></Dept>

Compute 节点用于为 RouteToLabel 节点设置标签。下面的 ESQL 摘录显示了如何设置标签以使用传入消息中的 <DeptId>

SET OutputLocalEnvironment.Destination.RouterList.DestinationData[1].labelname = 
   CASE (InputRoot.XMLNSC.Dept.DeptId )
      WHEN '124511' THEN 'ELECTRICAL'
      WHEN '124512' THEN 'KITCHEN'
      WHEN '124513' THEN 'GIFT'
      WHEN '124514' THEN 'HOME'
      ELSE 'UNKNOWN_DEPT' 
   END;

流中的四个标签节点连接到四个 FileOutput 节点,后者为每个部门产生一个输出文件。每个 FileOutput 节点需要在 Finish File 终端中接收一条“Finish File”消息,以便关闭四个单独的输出文件。这就是显示从 FileInput 节点的 End of Data 终端发出的连接的原因。

图 42. 消息流中基于文件内容的路由
图 42
图 42

转换文件记录内容和添加新的最终记录

在此例中,FileInput 节点读取如下所示的输入文件:

<Dept><DeptId>124513</DeptId><Sale>34.99</Sale><ItemCode>19449539</ItemCode></Dept>
<Dept><DeptId>124511</DeptId><Sale>19.99</Sale><ItemCode>24873586</ItemCode></Dept>
<Dept><DeptId>124514</DeptId><Sale>59.99</Sale><ItemCode>54852744</ItemCode></Dept>

每个记录有一个元素 <Sale>,,其中包含该部门销售的商品的成本。. 名为 AddRunningTotal 的 Compute 节点用于保存已通过流的所有商品的累计销售额(以名为 running Total 的 ESQL 共享变量的形式存在)。runningTotal 在将每个记录写入输出文件之前添加到每个记录。所有商品的最终累计销售额在处理完输入文件中的所有记录之后添加到输出文件。下面的消息流显示了如何能够实现此目的。

图 43. 转换文件记录内容的消息流
图 43
图 43

在名为 AddRunningTotal 的 Compute 节点中,以下 ESQL 保存累计销售额,同时还向每个记录添加一个新元素,以显示到目前为止的总销售额。

SET OutputRoot = InputRoot;
BEGIN ATOMIC
  SET runningTotal = 
      runningTotal + CAST(InputRoot.XMLNSC.Dept.Sale  AS DECIMAL);
END;
SET OutputRoot.XMLNSC.Dept.TotalSales = runningTotal;

保存累计销售额以后,我们需要向输出文件添加一个最终记录,其中包含当天的总销售额。这可以通过在 FileOutput 节点的 Finish File 终端之前添加一个 Compute 节点来实现。

下面显示了 AddFinalTotal Compute 节点中的 ESQL,此 ESQL 创建一个名为 FinalSales 的新元素,并将共享变量 runningTotal 的内容分配给该元素。Compute 节点将把此数据传播到 FileOutput 节点的 In 终端,以便能够将数据写入输出文件(从所示的 out1 终端发出的蓝色连接)。Compute 节点还将“Finish File”消息传播到 FileOutput 节点的 Finish File 终端,以便能够关闭输出文件(从所示的 out 终端发出的红色连接): SET OutputRoot.XMLNSC.Dept.FinalSales = runningTotal;

PROPAGATE TO TERMINAL 'out1' DELETE NONE;
SET OutputRoot = InputRoot;
SET runningTotal = 0.0;
RETURN TRUE;

写入输出文件的记录现在已添加了累计销售额。文件结尾将有一个附加的记录显示最终总销售额。

下面的摘录显示了 FileOutput 节点写入的记录。每个记录包含由 AddRunningTotal Compute 节点添加的累计销售额元素 TotalSales。最后一个记录显示由 Compute 节点 AddFinalTotal 添加的最终销售额元素 FinalSales

<Dept><DeptId>124513</DeptId><Sale>34.99</Sale><ItemCode>19449539</ItemCode>
     <TotalSales>34.99</TotalSales></Dept>
<Dept><DeptId>124511</DeptId><Sale>19.99</Sale><ItemCode>24873586</ItemCode>
    <TotalSales>54.98</TotalSales></Dept>
<Dept><DeptId>124514</DeptId><Sale>59.99</Sale><ItemCode>54852744</ItemCode>
    <TotalSales>114.97</TotalSales></Dept>
<Dept><FinalSales>114.97</FinalSales></Dept>

在 FileOutput 节点中使用 Xpath 有选择地连接元素

在 FileOutput 节点的 Request 选项卡上,您可以在 Data Location 字段中输入一个 XPath 表达式,以选择要将什么数据从输入记录写入输出文件。请考虑这样一个输入文件,其中包含如下格式的记录:

<Dept><DeptId>124511</DeptId><Sale>34.99</Sale><ItemCode>19449539</ItemCode></Dept>

要仅将每个记录的 ItemCode 写入输出文件,可以将 FileOutput 节点的 Request 选项卡上的 Data Location 设置为 $Body/Dept/ItemCode。

事务性

消息流开发人员了解 FileInput 和 FileOutput 节点在事务方面的行为是非常重要的。FileInput 节点提供了相关参数,可控制在消息流操作成功执行和消息流操作失败的情况下,该操作所在的消息流的行为。FileInput 节点有一个 Transactions 选项卡,用于将 Transaction mode 设置为 Yes 或 No。此节点上的事务设置将确定是否应该在同步点下执行消息流中的其余节点。File 节点本身并不是事务性的,因此该属性影响流中的其他节点,但是不影响 File 节点。该属性的缺省设置是 Yes。此值意味着应该在可能的情况下将流更新(例如,插入数据库,以及将消息写入 WMQ 队列)视为事务性的。流开发人员还可能熟悉消息流属性 CommitCount 的概念,对于 WebSphere MQ 消息,此属性指定消息流要处理多少条输入消息以后才取得同步点(通过发出 MQCMIT)。当消息流不是在处理 WebSphere MQ 消息时,此属性不影响流行为,因此不应将其与配置 File 节点混为一谈。不存在用于 File 节点的等效批处理属性。

包含 File 节点(无论是输入节点还是输出节点)的消息流与文件系统本身之间的交互不是事务性的。只有在真正的事务性文件系统的支持下才能实现此类协作。因此,通过消息流中的 File 节点的配置进行编排的写入、移动、删除和追加操作决不会在流的控制下批量撤销或提交。

提供了多记录检测策略(Whole File、Fixed Length、Delimited、Parsed Record Sequence),这些策略确定消息流应该如何处理输入文件的内容。

FileInput 节点的 Records and Elements 选项卡包括一个 Record detection 属性,此属性控制从单个输入文件产生多少次传播。选择 Whole File 导致将输入文件的整个内容作为单个逻辑树通过消息流传播。选择 Fixed LengthDelimitedParsed Record Sequence 导致将输入文件的内容分解为单独的记录,并通过消息流单独地传播。在传播 Whole File 内容并且使用 FileOutput 节点终止流的情况下,一旦流成功完成,输出文件将自动写入输出文件系统。在通过消息流逐个记录地传播内容的情况下,仅当 FileOutput 节点的 Finish File 终端接收到消息时,才会将输出文件写入输出文件系统。

FileInput 节点允许流开发人员指定在消息流未能成功处理输入文件时应该采取的操作。在 Retry 选项卡上,Action on failing file 属性指定在处理输入文件内容的所有尝试都失败后,应对输入文件执行的操作。可以将文件移动到撤销子目录(向文件名追加时间戳或不追加时间戳),或者可以将文件删除。

FileInput 节点还允许指定在消息流成功完成对输入文件的处理时,应该对原始输入文件执行的操作。在 Basic 选项卡上,Action on successful processing 属性指定可以将原始文件移动到存档子目录(向文件名追加时间戳或不追加时间戳),或者可以将文件删除。

在文件处理过程中发生灾难性故障的情况下,例如硬件故障或导致 Message Broker 的执行组立即停止的任何事情,原始输入文件将保留在文件系统上,并在代理重新启动时从头重新开始处理。代理不记录处理多记录文件的进展状态,因此不存在从停止的位置重新开始代理流的概念。

输入文件轮询和消息流可伸缩性

名为 Polling interval 的 FileInput 节点属性控制 FileInput 节点访问文件系统以查找要处理的文件的频率。流启动时的初始输入目录扫描将填充与输入模式匹配并等待由消息流处理的输入文件列表。消息流根据文件的最后修改时间处理这些文件(最旧的文件最先处理)。FileInput 节点使用可用实例池中的单个线程解析每个文件(与所有消息流输入节点一样,可以将 FileInput 节点配置为使用与消息流关联的线程池或与各个节点关联的线程池)。这种体系结构可以保证按照记录在文件中的出现顺序处理特定输入文件中的记录。通过增加线程池中可用的实例数量,可以扩展代理以同时操作多个输入文件。

一旦某个流实例完成其输入文件的处理,该实例立即返回线程池,并开始处理列表中的下一个输入文件。当列表中的所有文件都已完成处理时,流将返回输入目录去查找自从上次扫描以来已到达的新文件。如果没有进一步的输入文件可用(该目录没有包含与输入模式匹配的可用文件),则 FileInput 节点将等待由轮询间隔定义的时间,从而防止 FileInput 节点持续访问文件系统并消耗系统资源。请根据您的应用程序需求设置轮询间隔。缺省的 5 秒间隔非常适合用于测试新的流,因为这减少了重复运行测试所花的时间。对于具有许多文件的生产系统或网络文件系统,您可能发现诸如 60 秒甚至 300 秒等较长的间隔可以降低系统负载。


下载资源


相关主题


评论

添加或订阅评论,请先登录注册

static.content.url=http://www.ibm.com/developerworks/js/artrating/
SITE_ID=10
Zone=WebSphere
ArticleID=365986
ArticleTitle=WebSphere Message Broker V6.1 中的文件处理
publish-date=01222009