使用实时数据填充数据仓库的高效解决方案,第 2 部分: 探索两个不同的集成选项:使用临时表或使用 MQ 消息

通过集成 InfoSphere Replication Server 和 InfoSphere DataStage 逐步填充您的数据仓库

用源数据的更改填充数据仓库可能会花费巨大代价。如果只用 SQL 提取,根本无法确定发生变化的行。IBM InfoSphere™ Replication Server 可以通过只读取数据库日志就检测出变化的数据。该系列文章 将演示如何使用 InfoSphere Replication Server 来高效提取发生变化的数据,然后将更改传送给 IBM InfoSphere DataStage 以填充数据仓库。该系列分为两部分。在 第 1 部分:集成 InfoSphere Replication Server 与 InfoSphere DataStage 中,将会简要介绍产品及使用方法。在第 2 部分中,将探索两个不同的集成选项:使用 WebSphere® MQ 消息和 InfoSphere Event Publisher 以及使用临时表。

Anand Krishniyer, 资深软件工程师,InfoSphere Replication, IBM

Anand Krishniyer 的照片Anand Krishniyer 是 InfoSphere Replication 开发机构的资深工程师。作为管理团队的一员,他的职责包括开发项目,以及为用户提供 Replication Server 安装、设置和配置相关的技术支持。在担任本职务之前,Anand 是一家流程管理公司 Savvion(现在是 Progress Software 的一部分)Integration and Tools 团队的项目主管。



Tony Lee, 高级认证 IT 专家,InfoSphere Replication, IBM

Tony Lee 的照片Tony Lee 是 InfoSphere Replication 开发机构内的 InfoSphere Replication Center of Competency (COC) 团队中的高级认证 IT 专家。作为 COC 的成员,Tony 为用户和业务伙伴提供 InfoSphere replication 技术支持。Tony 具有多年关于各种 IBM 复制技术的咨询经验,包括 SQL 复制、Q 复制,还有最近的 InfoSphere Change Data Capture。在担任此职务之前,Tony 近二十年从事为客户和伙伴提供 Information Management 技术咨询,其中包含多个方面,有 DB2 调优、Information Server 和 Master Data Management。在成为顾问之前,Tony 从事过 Information Management 领域多项工作,从管理到开发。



James Yau, 技术解决方案架构师,InfoSphere Information Server, IBM

James Yau 的照片James Yau 是 InfoSphere Information Server DataStage 产品的认证高级解决方案架构师。目前,他在 InfoSphere Technology Enablement 机构负责 Information Server Boot Camp 内容开发和发布。James 具有多年 Information Server Suite 产品的顾问经验,包括 InfoSphere DataStage、QualityStage、Information Analyzer 和 FastTrack。在担任本职务之前,James 是 Business Partner Technical Enablement 团队的一员,InfoSphere Information Server 的技术项目经理。他的任务包括课程内容开发和发布,这其中包含教师指导、在线观看、自主学习等多种形式。此前,James 在 IBM 公司内外担任过多种角色,从软件开发到营销经理。



2010 年 12 月 13 日

引言

关于本教程

本系列的 第 1 部分:集成 InfoSphere Replication Server 与 InfoSphere DataStage 介绍了 InfoSphere Replication Server 和 DataStage 产品的技术以及通过集成它们来填充数据仓库的不同方式。还讨论了各种集成方法的优缺点。在第 2 部分中,讨论两种具体的集成方法:使用 MQ 消息和使用临时表。本教程通过屏幕截图和详细的步骤说明带领读者设置和配置这两种集成方法。本教程并不深入讲解如何编写 DataStage 作业或如何配置复制,而是重点关注集成技术。

先决条件

可以在以下环境中执行本教程描述的场景:

操作系统和硬件
  • AIX®, Version 5.3 操作系统,采用 64 位 Common Hardware Reference Platform (CHRP) 硬件架构
  • Windows XP Professional Service Pack 3,采用 32 位 Intel 处理器
软件
  • IBM InfoSphere Replication Server 9.7
  • IBM Information Server 8.1 Server for AIX(包含用于 DB2® 连接器的 Connectors rollup patch 2)
  • IBM Information Server 8.1 Client for Windows®(包含用于 DB2 连接器的 Connectors rollup patch 2)
  • WebSphere MQ, Version 7(用于 MQ 场景)

与 InfoSphere Event Publisher 的实时集成

设置 Event Publisher

第 1 部分:集成 InfoSphere Replication Server 与 InfoSphere DataStage 讨论了 InfoSphere Replication Server 和 InfoSphere DataStage 产品的技术以及通过集成它们来填充数据仓库的不同方式。还讨论了各种集成方法的优缺点。如果需要的话,在开始设置和配置集成方法之前回顾那篇文章。

图 1 是一个样例实时集成设置的示意图。驻留源数据的 Event Publisher 在 Windows 系统 svl-akrishni2 上。运行 InfoSphere DataStage 的操作数据库(ODS)在 AIX 系统 scoobydoo 上。数据仓库的位置并不重要,可以配置在完全不同的系统上。

图 1. 事件发布集成
显示源数据库在 svl-akrishn2 上,从数据库日志输出到 Q Capture 和 Q Manager,这连接到 scoobydoo 上的 Q Manager, 然后连接到 scoobydoo 上的 DataStage 作业

Q Capture 程序从日志中捕捉源表 Product 和 Inventory 中的更改,把它们发布到一个 WebSphere MQ 队列。这个队列称为发送 队列(SALES_DATA_TO_DATASTAGE),是在 Windows 系统 svl-akrishni2 上的队列管理器 QMEP 中定义的。队列管理器 QMEP 把消息发送到 AIX 系统上的一个队列。这个队列称为接收 队列(SALES_DATA_TO_DATASTAGE,与发送队列同名),是在 AIX 系统上的队列管理器 QMDS 中定义的。

通过配置事件发布,让消息采用逗号分隔的值(CSV)格式,每条消息包含单一行操作,比如 INSERT、UPDATE 或 DELETE。清单 1 给出队列中的 MQ 消息样例。

清单 1. 队列中的 MQ 消息样例
10,"IBM","2009355","021255984000","AKRISHNI","PRODUCT","REPL","0000:0000:0000:0000:5ae5",
"0000:87a0:c204:0000:0000","2009-12-21-10.12.52",,0000,"100-201-01","Ice Scraper, 
Windshield 4 inch",3.99,,,,,"100-201-01","Ice Scraper1, Windshield 4inch",3.99,,,,"
<product pid=""100-201-01"" ><description><name>Ice Scraper,Windshield
4 inch</name><details> Basic Ice Scraper 4 inches wide, foam handle
</details><price>3.99</price></description></product>"

正如 第 1 部分:集成 InfoSphere Replication Server 与 InfoSphere DataStage 所述,消息中的前 12 列是复制头,其余的列是从源表捕捉到的更改数据。第 6 和 7 列中的头对于设置和配置很重要。第 6 列中的头是源表的名称,第 7 列中的头是一个四字符的编码,表示造成行更改的 SQL 操作。四字符编码 ISRT 表示插入,REPL 表示更新,DLET 表示删除。关于消息中所有头的详细解释,请在 DB2 V9.7 Information Center 中查找 IBM InfoSphere Event Publisher(见 参考资料)。

DataStage 作业执行以下任务:

  1. 从 AIX 系统上的接收队列中读取消息
  2. 解析消息
  3. 转换数据
  4. 写数据集,数据集是通过数据集 stage 创建的操作系统文件

与对源表的三种行操作(INSERT、UPDATE 或 DELETE)对应的更改数据被写到不同的数据集中。因此,对于每个源表,DataStage 作业生成三个数据集,分别存储插入、更新和删除数据。

注意:示例假设在消息之间没有依赖性,比如引用约束。处理 INSERT 消息的次序可以独立于处理 DELETE 消息的次序。如果消息之间有依赖性,那么所有消息需要写到单一数据集中以保持次序,而且 DataStage 作业本身必须确保以正确的次序处理消息。

配置 Event Publisher

配置 Event Publisher 的步骤如下:

  1. 创建 DB2 对象(源表)

    注意:这个步骤只是这个示例场景需要的;它不是 配置设置步骤。正常情况下,DB2 对象已经存在了,只需确定源表和要复制的数据子集。
  2. 按以下步骤创建 Event Publisher 设置:
    1. 设置 MQ
    2. 设置 Event Publisher
    3. 操作 Event Publisher
  3. 按以下步骤创建 DataStage 设置:
    1. 配置 MQ Connector
    2. 配置第一个转换器 stage
    3. 配置第二个转换器 stage
    4. 配置数据集 stage
  4. 按以下步骤测试实时配置设置:
    1. 启动 Event Publisher
    2. 导入 DataStage 作业
    3. 编译 DataStage 作业
    4. 通过运行测试脚本在源数据中引入更改
    5. 运行 DataStage 作业
    6. 检查数据集以确认已经填充数据

下面几节详细描述示例。本教程的 下载 部分包含配置此样例需要的所有脚本。

创建 DB2 对象

对于样例设置,使用一个名为 SALES 的数据库并导入 PRODUCT 和 INVENTORY 表(下载 中提供的 ixf 文件)。这些是此场景使用的源表。也可以使用 DB2 V9.7 样例数据库中的 PRODUCT 和 INVENTORY 表,或者把这些表导入您选择的另一个数据库。

为了导入 Product 和 Inventory 表,进入下载的附件的 setupDB 目录,会在这里找到 product.ixf 文件和 inventory.ixf 文件。输入 db2cmd 打开 DB2 命令窗口,运行以下命令:

import from product.ixf of ixf create into product
import from inventory.ixf of ixf create into inventory

设置 WebSphere MQ 队列管理器

现在开始创建 Event Publisher 设置,首先按以下步骤设置 MQ:

  1. 在源系统 svl-akrishni2 上,执行 Crtmqm.exe QMEP 命令创建 WebSphere MQ 队列管理器 QMEP。
  2. 执行 Strmqm.exe QMEP 命令启动队列管理器。
  3. 执行 runmqsc.exe QMEP < mqObjectsPublisher.in 命令在源系统上创建其他 MQ 对象。

    可以在 下载 的 RealTimeInt\setupEP 目录下面找到 mqObjectsPublisher.in 脚本。此脚本在源系统上创建所有 MQ 对象(队列、通道和监听器)。

  4. 在目标(订阅程序)系统上,执行以下命令创建队列管理器和 MQ 对象:
    Createmqm QMDS
    Strmqm QMDS
    Runmqsc QMDS < mqObjectsSubscriber.in

    可以在本教程的 下载 的 RealTimeInt\setupEP 目录下面找到 mqObjectsSubscriber.in 脚本。

设置 Event Publisher

按以下步骤设置 EP:

  1. 在 DB2 命令窗口中使用 ASNCLP(命令行复制管理工具)运行以下脚本,创建复制对象:asnclp -f crtCtlTables.in
  2. 输入 asnclp -f crtQMapAndPublications.in 创建发布设置所需的所有控制表。

    可以在 下载 的 RealTimeInt\setupEP 目录下面找到 crtCtlTables.in 和 crtQMapAndPublications.in 脚本。这些脚本为 Product 和 Inventory 表创建队列映射(MQ 对象到 Q 复制对象的映射)以及发布。

操作 Event Publisher

在 DB2 命令窗口中执行以下命令启动 EP(Q Capture 程序):asnqcap CAPTURE_SERVER=SALES CAPTURE_SCHEMA=ASN

这会启动 Q Capture 程序,此程序开始发布 Product 和 Inventory 表中的更改。也可以使用 下载 的 RealTimeInt\setupEP 目录中的 startQCapture.bat 脚本。

设置 DataStage 作业

在 DataStage 作业中,使用 WebSphere MQ Connector 从接收队列读取消息,使用两个转换器 stage 解析消息,见 图 2。第一个转换器 stage 作为选择控制,分离与某个发布(比如某个源表)相关的消息。第二个转换器 stage 也作为选择控制,分离与每个表的特定操作(比如插入、更新或删除)相关的消息。分离后的数据与源表中的每行对应,它们根据操作类型(插入、更新或删除操作)被写到不同的数据集中。使用数据集 stage 写入适当的已处理数据,因为数据集采用原生文件格式,这种格式由引用实际数据的单一头文件组成,可以将该实际数据分割到多个 DataStage 并行分区中。因此,在并行环境中运行的 DataStage 很乐意使用这种格式,这种格式提供最好的性能。

图 2. 事件发布 DataStage 作业
事件发布 DataStage 作业

主 DataStage 作业执行的实际转换处理可以使用这些数据集作为输入。通常,DataStage 处理根据行操作(插入、更新或删除操作)分离数据。但是,如果对源数据有引用约束,就需要将更改数据写到单一数据集中,而不是三个不同的数据集。

下面几小节详细讨论如何设置 DataStage 作业。

配置 MQ Connector

下面的列表列出 MQ Connector 的配置:

  • 连接属性
    • Mode = server
    • Queue manager = QMDS
    • Username = db2inst1
  • 使用属性
    • Queue name = SALES_DATA_To_DATASTAGE
    • Access mode = As in queue definition
    • Wait time = 1
    • Message quantity = -1
    • Message read mode = Delete

下面是对配置元素的一些说明:

Wait time
使用这个属性指定等待新消息到达输入队列的最大秒数。默认值是 -1,这表示不限制时间。对于这个示例,把它设置为 1 秒。
Message quantity
使用这个属性指定要从输入队列接收的消息数量(不是行数)。对于这个示例,把它设置为 -1。-1 表示不限制消息数量。0 表示不读取消息。对于 message quantity,可以指定 -1 到 999999999 之间的整数。对于这个示例,message quantity 是 -1,wait time 是 1,所以 MQ Connector 可以读取到达队列的所有消息。
Message read mode
使用这个属性指定在当前事务中如何读取消息。对于这个示例,把它设置为 Delete(破坏性读取)。如果需要保留消息,也可以从下拉列表中选择 Move to work queue 选项。

创建一个名为 PayLoad 的列,类型为 Varchar,大小为 1000,见 图 3。因为这是文本消息,大小是指字符数。对于这个示例,只有消息头后面的部分是重要的。

图 3. MQ Connector 的配置
MQ Connector 的配置 - Column name = PayLoad

配置第一个转换器 stage

配置 SeparatePayLoad_Based_On_TableName 转换器 stage,见图 4。

图 4. 第一个转换器 stage 的配置
显示 PayLoad 定义

对于示例配置,注意以下几点:

  • 定义一个名为 SCHEMANAME 的 stage 变量,把它设置为输入 PayLoad 的第 5 列。
  • 定义一个名为 TABLENAME 的 stage 变量,把它设置为输入 PayLoad 的第 6 列。
  • 在图 4 中,SeparatePayLoad_Based_On_TableName 转换器 stage 有两个输出链接。每个链接通向另一个转换器 stage,使用 stage 变量 SCHEMANAME 和 TABLENAME 作为选择控制分离与 Product 和 Inventory 表相关的消息。

配置 stage 约束,见图 5。

图 5. 第一个转换器 stage 约束
显示约束的双引号

注意,消息中的所有非数字数据都放在双引号中。因此,在使用 TABLENAME stage 变量执行比较时,要把实际表名(PRODUCT 和 INVENTORY 表)放在一对额外的双引号中。

这个转换器 stage 根据源表名分离消息。样例消息见清单 2。

清单 2. 样例消息
10,"IBM","2009355","021255984000","ADMIN","PRODUCT","REPL","0000:0000:0000:0000:5ae5",
"0000:87a0:c204:0000:0000","2009-12-21-10.12.52",,0000,"100-201-01","Ice Scraper, 
Windshield 4 inch",3.99,,,,,"100-201-01","Ice Scraper1 Windshield 4inch",3.99,,,,"
<productpid=""100-201-01""><description><name>Ice Scraper,
Windshield 4 inch</name><details>Basic Ice Scraper 4 inches wide, foam
handle</details><price>3.99</price></description></product>"

对于这个样例消息,注意以下几点:

  • stage 变量 SCHEMANAME 定义为 Field(Read_MQPayLoad.PayLoad, ",",5)。消息中的第 5 列是模式名。
  • stage 变量 TABLENAME 定义为 Field(Read_MQPayLoad.PayLoad, ",",6)。消息中的第 6 列是表名。
  • 消息中的所有非数字数据都放在双引号中。因此,在定义 SCHEMANAME 或 TABLENAME 约束时,要把名称放在一对额外的双引号中。
  • 消息中的第 7 列总是操作类型:ISRT、REPL 或 DLET 分别表示插入、更新或删除。
  • 源数据从第 13 列开始。

配置第二个转换器 stage

分离与 PRODUCT 和 INVENTORY 表相关的消息之后,配置转换器 stage SeparatePayLoad_Based_On_I_U_D_Operation1 和 SeparatePayLoad_Based_On_I_U_D_Operation2 以解析行操作数据(插入、更新和删除)。图 6 说明如何配置 SeparatePayLoad_Based_On_I_U_D_Operation1 转换器 stage 以解析 PRODUCT 表中的列。

图 6. 第二个转换器 stage 的配置
图 6. 第二个转换器 stage 的配置

对于第二个转换器 stage 的配置,注意以下几点:

  • 定义一个名为 OPERATION 的 stage 变量,把它设置为输入 PayLoad 的第 7 列。
  • SeparatePayLoad_Based_On_I_U_D_Operation1 转换器 stage 有三个输出链接,见 图 2。每个链接通向一个数据集 stage,使用 stage 变量 OPERATION 作为选择控制分离与插入、更新和删除操作相关的消息。
  • 定义 Product 表中的所有列,把它们映射到 PayLoad 中的各个列。例如,对于插入操作,把列 PID、NAME、PRICE、PROMOPRICE、PROMOSTART 和 PROMOEND 映射到 PayLoad 中从第 19 列开始的列。
  • 对于插入操作,Product 表中所有列原来的值都是空的,所以第 13 到 18 列是空的。
  • 对于删除操作,所有列以后的值都是空的,所以第 19 列及后面的列是空的。
  • 更新操作同时具有原来的值和以后的值。
  • MQ 消息中的所有非数字列值都放在双引号中。

图 7 显示更新操作的列映射。

图 7. 更新操作的列映射
更新操作的列映射

对于更新操作的列映射,注意以下几点:

  • 图 7 显示更新操作的列映射(操作关键字 REPL)。
  • PayLoad 中的第 13 到 18 列包含原来的值,第 19 到 24 列包含以后的值。

图 8 说明如何分离 Inventory 表中的操作数据。

图 8. 分离 Inventory 表中的操作数据
分离 'Inventory' 表中的操作数据

注意,SeparatePayLoad_Based_On_I_U_D_Operation2 转换器 stage 的配置分离 Inventory 表中的操作数据。

配置数据集 stage

按本小节的说明完成 Product 表的插入、更新和删除数据集。图 9 说明如何设置数据集属性。

图 9. 数据集属性
数据集属性

对于数据集属性设置,注意以下几点:

  • 图 9 显示 Product 表中插入操作的数据所用的数据集 stage 属性。
  • 更新策略设置为 Append。一定要把所有数据集的更新策略设置为 Append。
  • 把文件属性设置为适当名称和完整路径。
  • 如果文件属性中的文件不存在,就会创建它。但是,路径中指定的目录应该已经在文件系统中存在。

图 10 显示数据集 stage Inserts_DataSet1 的列配置。

图 10. 插入数据集的列配置
插入数据集的列配置

图 11 显示更新数据集的列配置。

图 11. 更新数据集的列配置
更新数据集的列配置

对于更新数据集的列配置,注意以下几点:

  • 图 11 显示数据集 stage Update_DataSet1 的列配置。
  • 更新操作要捕捉原来的和以后的映像值。

图 12 显示删除数据集的列配置。

图 12. 删除数据集的列配置
删除数据集的列配置

对于删除数据集,注意以下几点:

  • 图 12 显示 Deletes_DataSet1 的列定义。
  • 这些是 Product 表中原来的映像值。

测试实时配置设置的步骤

下面是测试实时配置设置所需的步骤。下面几小节详细描述这些步骤。

  1. 启动 Event Publisher
  2. 导入 DataStage 作业
  3. 编译 DataStage 作业
  4. 通过运行测试脚本在源数据中引入更改
  5. 运行 DataStage 作业
  6. 检查数据集以确认已经填充数据

启动 Event Publisher

按以下步骤启动 Event Publisher。

  1. 打开一个 DB2 命令窗口,进入下载的 RealTimeInt\setupEP 目录。
  2. 运行 startQCapture.bat 脚本启动 Event Publisher。
  3. 确认成功地收到了程序发出的 ASN0572I 消息,见图 13。
图 13. 在 DB2 命令窗口中启动 Q Capture
在 DB2 命令窗口中启动 Q Capture

导入 DataStage 作业

按以下步骤导入 DataStage 作业。

  1. 启动 DataStage Designer,浏览到下载的 RealTimeInt 目录,导入 RealTimeDataInt.dsx 作业。
  2. 在 DataStage Import 窗口中,单击 OK 以完成作业的导入,见图 14 和图 15。
图 14. 导入 DataStage 作业
导入 DataStage 作业
图 15. 导入 DataStage 作业(续)
导入 DataStage 作业(续)

编译 DataStage 作业

按以下步骤编译 DataStage 作业:

  1. 如果您使用不同的 Q Manager 或接收队列名,那么双击 stage 进入属性页,修改 MQ Connector stage 中的这些值。
  2. 如果在 Windows® 平台上运行作业,那么修改所有数据集 stage 的文件属性:Inserts_DataSet1、Updates_DataSet1、Deletes_DataSet1、Inserts_DataSet2、Updates_DataSet2 和 Deletes_DataSet2。
  3. 单击 DataStage Designer 工具栏上的 Compile 编译 DataStage 作业,见图 16。
图 16. 编译 DataStage 作业
编译 DataStage 作业

通过运行测试脚本在源数据中引入更改

按以下步骤运行测试脚本:

  1. 打开一个 DB2 命令窗口,进入 RealTimeInt\setupEP 目录。
  2. 运行 updateSourceTables.sql 脚本。此脚本在 Product 和 Inventory 表中执行插入、更新和删除各一次。

运行 DataStage 作业

按以下步骤运行 DataStage 作业:

  1. 单击 All Programs > IBM Information Server > IBM WebSphere DataStage and QualityStage Director 启动 DataStage Director。
  2. 单击 DataStage Designer 工具栏上的 Run 图标运行作业,见图 17。
图 17. 运行 DataStage 作业
运行 DataStage 作业

检查数据集以确认已经填充数据

在 DataStage Director 中右键单击作业并单击 View Log,查看作业状态日志,见图 18。

图 18. 查看作业日志
图 18. 查看作业日志

也可以在 Designer 中检查作业是否成功运行。它以绿色显示数据流,还显示各个 stage 之间处理的行数。图 19 表明处理了 6 个消息,Product 和 Inventory 表的每个插入、更新和删除各一条消息。

图 19. 检查作业状态
检查作业状态

可以通过右键单击数据集并单击 View data 查看数据集。图 20 显示 Product 表的 Updates_DataSet1 中的数据。

图 20. 更新数据集
更新数据集

可以通过查看每个数据集检查填充的更改数据。


采用 SQL 复制的 stage 数据集成

设置 SQL 复制

在示例临时集成设置中,包含源数据的 SQL 复制配置在 Windows 系统 (svl-akrishni2) 上。运行 InfoSphere DataStage 的操作数据库(ODS)在 AIX 系统 scoobydoo 上。示例中没有给出数据仓库设置。数据仓库的位置并不重要,可以配置在完全不同的系统上。

SQL capture 程序从日志中捕捉源表(在这个示例中是 SALES 数据库中的 Product 和 Inventory 表)中的更改,并填充同一源数据库 SALES 中的更改数据(CD)表。SQL apply 从源数据库读取 CD 表并填充目标数据库 STAGEDB 中的临时表(CCD 表)。SALES 数据库和 SQL capture 在 Windows 系统 (svl-akrishni2) 上。STAGEDB 和 SQL apply 程序在 AIX 系统上。在目标系统(scoobydoo AIX 系统)上对到 SALES 数据库的连接进行编目。SQL apply 配置为事件驱动的。当发生某一事件时,比如 IBMSNAP_SUBS_EVENT 控制表中的一行更新为当前时间戳,SQL apply 把更改应用于目标 CCD 表。

在 AIX 系统上运行的 DataStage 作业从 STAGEDB 数据库中的 CCD 表提取数据,根据操作类型(插入、更新或删除)把数据写到三个不同的数据集中。如果从 CCD 表提取数据成功了,作业就清空 CCD 表。如果所有 CCD 表的所有提取和清空操作都成功了。那么更新 IBMSNAP_SUBS_EVENT 表中的某一事件,触发 SQL apply 开始处理另一个复制循环。

注意,SQL apply 程序每 5 分钟检查一次 IBMSNAP_SUBS_EVENT 表中的更新。因此,如果通过调度程序(DataStage Director 或外部调度器)调度 Staged Integration DataStage 作业自动运行,相邻循环之间的时间间隔应该是 5 分钟。如果两个连续的 Staged Integration DataStage 作业循环之间的时间间隔少于 5 分钟,可能会导致丢失数据,因为当 SQL apply 程序正在把更改应用于 CCD 表时,DataStage 作业可能会清空 CCD 表。

SQL 复制配置

SQL 复制配置的步骤如下:

  1. 创建 DB2 对象

    注意:这个步骤只是这个示例场景需要的;它不是 配置设置步骤。正常情况下,DB2 对象已经存在了,只需确定源表和要复制的数据子集。
  2. 设置 SQL 复制并创建 SQL 复制对象
  3. 操作 SQL capture 和 apply
  4. 设置 DataStage

下面几节详细描述示例。本教程的 下载 部分包含配置此样例需要的所有脚本。

创建 DB2 对象

对于示例设置,使用一个名为 SALES 的数据库并导入 PRODUCT 和 INVENTORY 表(下载 中提供的 ixf 文件)。这些是此场景使用的源表。也可以使用 DB2 V9.7 sample 数据库中的 PRODUCT 和 INVENTORY 表,或者把这些表导入选择的另一个数据库。

为了导入 Product 和 Inventory 表,进入下载的附件的 setupDB 目录,会在这里找到 product.ixf 文件和 inventory.ixf 文件。输入 db2cmd 打开 DB2 命令窗口,运行以下命令:

import from product.ixf of ixf create into product
import from inventory.ixf of ixf create into inventory

可以在目标系统(AIX 系统 scoobydoo)上创建目录数据库 STAGEDB,也可以使用自己选择的数据库作为 SQL 复制目标数据库。

设置 SQL 复制并创建 SQL 复制对象

可以使用复制管理 GUI 工具(复制中心)或 asnclp 命令行工具设置 SQL 复制配置。下面说明如何使用命令行工具设置 SQL 复制。

  1. 为了创建 SQL 复制对象,在目标系统(scoobydoo AIX 系统)上对到 SALES 源数据库的连接进行编目。
  2. 从目标系统运行所有脚本。对于这个示例,在目标系统上执行以下命令,对到 SALES 数据库的连接进行编目:
    db2 catalog tcpip node srcnode remote IPADDRESS server DB2_PORT
    db2 catalog db sales as sales at node srcnode

    IPADDRESS 是源系统(比如 svl-akrishni2)的 IP 地址,DB2_PORT 是源系统上数据库管理器实例的端口号。

  3. 在目标系统上,在 DB2 命令窗口中输入 asnclp –f crtCtlTablesCaptureServer.asnclp。crtCtlTablesCaptureServer.asnclp 脚本在 SALES 源数据库上创建捕捉控制表。
  4. 在目标系统上,在 DB2 命令窗口中输入 asnclp –f crtCtlTablesApplyCtlServer.asnclp。crtCtlTablesApplyCtlServer.asnclp 在 STAGEDB 目标数据库上创建应用控制表。
  5. 在目标系统上,在 DB2 命令窗口中输入 asnclp –f crtRegistration.in。crtRegistration.asnclp 脚本注册 Product 和 Inventory 源表。
  6. 在目标系统上,在 DB2 命令窗口中输入 asnclp –f crtSubscriptionSetAndAddMembers.in。crtSubscriptionSetAndAddMembers.asnclp 脚本创建一个订阅集并添加两个成员(每个源表一个)。

操作 SQL capture 和 apply

按以下步骤运行 SQL capture 和 apply:

  1. 在源系统上,在 DB2 命令窗口中输入 asncap CAPTURE_SERVER=SALES 以启动 SQL capture 服务器。SQL capture 程序开始捕捉 Product 和 Inventory 表中的更改。也可以使用 下载 的 StagedInt\setupSQLRep 目录中的 startSQLCapture.bat 脚本。
  2. 在目标系统上,在 DB2 命令窗口中输入 asnapply CONTROL_SERVER=STAGEDB APPLY_QUAL=AQ00 以启动 SQL apply 服务器。SQL apply 程序开始把 Product 和 Inventory 表中的更改应用于 CCD 表。也可以使用 下载 的 StagedInt\setupSQLRep 目录中的 startSQLApply.sh 或 startSQLApply.bat 脚本。

设置 DataStage

对于这个示例,Staged Integration DataStage 作业使用序列作业(见图 21),依次执行提取和清空 CCD 表以及触发 SQL apply 的不同作业(并行作业和操作系统命令)。

图 21. Staged Integration DataStage 作业
Staged Integration DataStage 作业

作业活动 stage Extract_From_StagedTable1 和 Extract_From_StagedTable2(每个 CCD 表一个)调用一个并行作业,这个作业从 CCD 表提取数据,并根据插入、更新或删除操作类型把数据追加到三个不同的数据集中。如果提取作业成功,用一个执行命令 stage 执行清空 CCD 表的脚本。如果所有 CCD 表的清空步骤都成功了,用一个执行命令 stage 执行在 STAGEDB 目标数据库上的 IBMSNAP_SUBS_EVENT 控制表中更新事件的脚本。一个序列器 stage 确保这一次序。如果提取作业或清空作业不成功,一个终止器 stage 停止所有作业。

StageDataIntSQLRep 是主序列作业,它控制临时集成中所有作业的流。作业序列从两个作业活动 stage 开始:Extract_From_StagedTable1 和 Extract_From_StagedTable2。这两个作业活动 stage 分别调用并行作业 extractFromProduct_CCD 和 extractFromInventory_CCD,从 CCD 表提取数据。

按以下步骤完成 stage 序列:

  1. 在作业活动 stage 的 Triggers 选项卡上,把两个触发器的表达式类型分别设置为条件 OK 和 Otherwise。触发器 Extract_OK 通向清空步骤,触发器 Extract_Not_OK 通向作业的终止。图 22 显示 Extract_From_StagedTable1 作业活动的 Triggers 选项卡。
图 22. 作业活动 stage 的 Triggers 选项卡
作业活动 stage 的 Triggers 选项卡
  1. 对于这个 stage,选择 Send STOP requests to all running jobs 选项,见图 23。所有作业终止。
图 23. 终止活动
终止活动
  1. 运行 prune_ProductCCDTable.sh 脚本(见清单 3)以清空源表 Product 的 CCD 目标表。注意,必须先把 DB2PATH 设置为 DB2 安装目录,然后才能使用下载的 StagedInt\scripts\Win 目录中的 Windows 版脚本 prune_ProductCCDTable.bat。
清单 3. 执行命令活动脚本
cd /home/db2inst1
. ./sqllib/db2profile
db2 connect to STAGEDB user admin using temp4fvt
db2 "delete from admin.PRODUCT_CCD"

与提取步骤一样,清空步骤也有两个触发器。触发器 Prune_OK1 通过一个作业序列器 stage 通向事件信号步骤。Prune_Not_OK 通向作业的终止。图 24 显示两个触发器的表达式类型分别设置为条件 OK 和 Otherwise。

图 24. 清空步骤的触发器
清空步骤的触发器

Prune_From_StagedTable1 执行命令 stage 的 Prune_OK 触发器通向一个采用 ALL 模式的作业序列器,见图 25。这个序列器确保只有在所有订阅集成员的所有 CCD 表的提取和清空步骤都成功的情况下,才会触发事件触发步骤。

图 25. 清空步骤的 Prune_OK 触发器
清空步骤的 Prune_OK 触发器

清单 4 给出触发 SQLApply 的事件触发脚本 continueSQLApply.sh。注意,必须先把 DB2PATH 设置为 DB2 安装目录,然后才能使用下载的 StagedInt\scripts\Win 目录中的 Windows 版脚本 prune_ continueSQLApply.bat。

清单 4. continueSQLApply.sh 脚本
cd /home/db2inst1
. ./sqllib/db2profile
db2 connect to STAGEDB user db2inst1 using temp4now
db2 "update ASN.IBMSNAP_SUBS_EVENT set EVENT_TIME=CURRENT_TIMESTAMP,
                    END_OF_PERIOD=CURRENT_TIMESTAMP WHERE EVENT_NAME='
                    DSEXTRACTDONE'"

创建从 CCD 提取数据的并行作业

并行作业 extractFromProduct_CCD 和 extractFromInventory_CCD 使用 DB2 连接器从 stage 表提取数据。作业使用一个转换器 stage 根据对源数据执行的操作类型(插入、更新或删除)分离行。作业把数据写到三个不同的数据集中:插入、更新和删除各一个数据集。

需要使用来自 DataStage 作业的数据的应用程序应该使用这些数据集。DataStage 作业通常根据插入、更新或删除行操作分离数据。但是,如果对源数据有引用约束,更改数据就需要写到单一数据集中,而不是三个不同的数据集。

图 26 显示示例中使用的并行作业。

图 26. 从 Product CCD 表提取数据的并行作业
从 Product CCD 表提取数据的并行作业

下面讲解创建并行作业的步骤。

创建表定义

按以下步骤创建表定义:

  1. 通过选择 Import > Table Definitions > Start Connector Import Wizard,为 Product 和 Inventory 源表的 CCD 目标表 PRODUCT_CCD 和 INVENTORY_CCD 创建表定义,见图 27。
图 27. 创建表定义:启动 Connector Import Wizard
创建表定义:启动 Connector Import Wizard
  1. 在 Connector Import Wizard 中,选择 DB2 Connector 选项,见图 28。
图 28. 创建表定义:DB2 连接器
创建表定义:DB2 连接器
  1. 在 Connection details 窗口中,输入实例名、数据库名、用户 ID 和密码,见图 29。
图 29. 创建表定义:连接详细信息
创建表定义:连接详细信息
  1. 可以测试连接以确认输入的值是正确的。
  2. 在 datasource 对话框中,从下拉列表中选择数据库名,比如 STAGEDB (db2inst1),见图 30。
图 30. 创建表定义:数据库名
创建表定义:数据库名
  1. 单击创建 CCD 目标表 PRODUCT_CCD 和 INVENTORY_CCD 所在的模式名,见图 31。
图 31. 创建表定义:模式名
创建表定义:模式名
  1. 在出现的选择对话框中,选择 PRODUCT_CCD 表并单击 Import 以导入表定义。
  2. 重复这些步骤导入 INVENTORY_CCD 表的表定义。

配置 DB2 连接器

按以下步骤配置 DB2 连接器:

  1. 打开 DB2 连接器的属性窗口,见图 32。
图 32. DB2 连接器的属性
DB2 连接器的属性
  1. 通过单击 BuildBuild new SQL (DB2 UDB SQL 8.2 Syntax) 构建 SQL 选择语句,其他属性保持默认值,见图 33。
图 33. DB2 连接器:SQL 构建器
DB2 连接器:SQL 构建器
  1. 在 SQL Builder 窗口中,浏览到左边面板上的 Table Definition,再到数据库 STAGEDB 下面的 PRODUCT_CCD 表。
  2. 把 PRODUCT_CCD 拖放到右边面板上。
  3. 选择所有列(原来的映像、以后的映像和 IBMSNAP 列)并拖放到 Select Columns 面板上。
  4. 单击 SQL 构建器的 OK 创建一个简单的选择语句。

配置转换器 stage Process_I_U_D_Operations_Data

从 PRODUCT_CCD 表中的所有列收集数据之后,配置转换器 stage Process_I_U_D_Operations_Data 以便根据插入、更新或删除行操作分离数据。行操作类型存储在 CCD 表的 IBMSNAP_OPERATION 列中,表示为 IUD。可以使用 IUD 分离行数据。

按以下步骤配置转换器 stage Process_I_U_D_Operations_Data:

  1. 在转换器 stage 的每个输出链接上设置约束,检查每个数据行的操作是 IU 还是 D
  2. 定义一个名为 OPERATION 的 stage 变量,通过把 select_from_CCD 输入中的 IBMSNAP_OPERATION 列拖放到 stage 变量的 Derivation 列,将其来源设置为输入列 IBMSNAP_OPERATION,见图 34。
图 34. 转换器 stage 的配置
转换器 stage 的配置
  1. 为转换器 stage 的每个输出链接定义约束,见图 35。
图 35. 转换器 stage 约束定义
转换器 stage 约束定义
  1. 映射 PRODUCT_CCD 表的转换器 stage 中的列,见图 36。
图 36. PRODUCT_CCD 表的转换器 stage 中的列映射
PRODUCT_CCD 表的转换器 stage 中的列映射

对于列映射,注意以下几点:

  • 把来自 PRODUCT 表的所有列定义为输出,映射输入链接 Select_From_CCD 中相同的列。
  • 更新操作的行数据同时包含原来的和以后的映像值。
  • 插入操作的行数据包含有效的以后映像值,原来的映像值是空的。
  • 删除操作具有有效的原映像值。

配置数据集 stage

按以下步骤配置数据集 stage:

  1. 打开 Product 表的插入操作数据的数据集 stage 属性窗口。注意。更新策略是 Append
图 37. 插入数据集的数据集属性
Inserts_DataSet 的数据集属性
  1. 确保所有数据集的更新策略都是 Append。插入数据集的列定义见图 38。
图 38. 插入数据集的列定义
数据集 stage Inserts_DataSet 的列定义

图 39 显示数据集 stage Update_DataSet 的列定义。注意,对于更新操作,要同时捕捉原来的和以后的映像值。

图 39. 更新数据集的列定义
更新数据集的列定义

图 40 显示 Deletes_DataSet1 的列定义。注意,这些是 Product 表中原来的映像值。

图 40. 删除数据集的列定义
删除数据集的列定义
  1. 按相同的步骤完成 Inventory 表的插入、更新和删除数据集。

测试临时数据配置设置

按以下步骤测试临时数据配置设置:

  1. 启动 SQL 复制,见图 41。
图 41. 启动 SQL Capture DB2 命令窗口
启动 SQL Capture DB2 命令窗口

对于启动 SQL 复制,注意以下几点:

  • start capture 控制台表明,有两个注册处于未激活状态。
  • 只有在 SQL apply 服务器启动而且在 SQL capture 和 SQL apply 之间有握手 的情况下,注册才会激活。
  1. 启动 DataStage Designer,见图 42。
图 42. Start capture 控制台
控制台
  1. 在 DataStage Import 窗口中,浏览到下载的 StagedDataInt 目录并单击 OK,导入 DataStage 作业 StagedDataInt.dsx,见图 43。
图 43. 导入 Staged Integration DataStage 作业
导入 Staged Integration DataStage 作业
图 43. 导入 Staged Integration DataStage 作业
导入 Staged Integration DataStage 作业(续)

导入 dsx 文件之后,会看到 StagedInt 目录中的三个作业。

  1. 打开 extractFromProduct_CCD 并行作业。
  2. 确保在 DB2 连接器 stage 中正确地设置数据库名(如果不是 STAGEDB 的话)、用户和密码。
  3. 通过正确地设置路径,确保三个数据集指向现有的文件。
  4. 通过在设计器中单击绿色图标,保存并编译作业。
  5. 打开 extractFromInventory_CCD 并行作业,重复相同的过程。
  6. 打开 StagedDataIntSQLRep 序列作业。
  7. 确保所有作业活动和执行命令 stage 指向正确的作业和脚本,见图 44。
  8. 保存并编译作业。
图 44. 编译临时集成作业
编译临时集成作业
  1. 打开一个 DB2 命令窗口,进入 StageDataInt\setupSQLRep 目录,运行脚本 updateSourceTables.sql。此脚本在 Product 和 Inventory 表上执行插入、更新和删除各一次。
  2. 打开一个 DB2 命令窗口,连接目标数据库 STAGEDB,输入以下 SQL 语句:
    Db2 select count(*) from ADMIN.PRODUCT_CCD
    Db2 select count(*) from ADMIN.INVENTORY_CCD

    这两个语句都应该返回行数 3,因为在两个源表中分别引入了三个插入、更新或删除操作。

  3. 通过单击 DataStage Director 工具栏上的 Run 图标运行 DataStage 作业。可以通过选择 All Programs > IBM Information Server > IBM WebSphere DataStage and QualityStage Director 启动 DataStage Director。
图 45. 运行临时集成作业
运行临时集成作业
  1. 检查作业状态。图 46 表明作业已经成功地完成了。
图 46. 检查作业状态
检查作业状态
  1. 查看作业日志,见图 47。
图 47. 查看作业日志
查看作业日志
图 47. 查看作业日志
查看作业日志(续)
  1. 查看数据集,确认已经填充数据,见图 48。
图 48. 查看数据集
查看数据集
  1. 查看 Updates_DataSet 中的数据,其中应该包含 PRODUCT_CCD 表(源表 Product 的 CCD 目标表)中的更新,见图 49。
图 49. 查看更新数据集
查看数据集(续)
  1. 检查 IBMSNAP_SUBS_EVENT 表中的 END_OF_PERIOD 是否更新为当前时间戳,见图 50。
图 50. 检查在 IBMSNAP_SUBS_EVENT 中是否更新了继续 SQL apply 事件
检查在 IBMSNAP_SUBS_EVENT 中是否更新了继续 SQL apply 事件
  1. 图 51 中的数据 stage 日志文件表明,Execute Command Activity ContinueSQLApply 作业运行成功。
图 51. 作业日志
作业日志

结束语

第 1 部分:集成 InfoSphere Replication Server 与 InfoSphere DataStage 介绍了 InfoSphere Replication Server 和 DataStage 产品的技术以及通过集成它们来填充数据仓库的不同方式。第 2 部分详细介绍了两种具体的集成方法:使用 MQ 消息和使用临时表。本教程通过屏幕截图和详细的步骤说明带领读者设置和配置这两种集成方法。有了本系列提供的信息,您的公司可以轻松地集成 InfoSphere Replication Server 和 InfoSphere Information Server 的 DataStage,从而高效地填充数据仓库。


下载

描述名字大小
样例配置脚本scripts-paper-mastercopy.zip486KB

参考资料

学习

获得产品和技术

讨论

条评论

developerWorks: 登录

标有星(*)号的字段是必填字段。


需要一个 IBM ID?
忘记 IBM ID?


忘记密码?
更改您的密码

单击提交则表示您同意developerWorks 的条款和条件。 查看条款和条件

 


在您首次登录 developerWorks 时,会为您创建一份个人概要。您的个人概要中的信息(您的姓名、国家/地区,以及公司名称)是公开显示的,而且会随着您发布的任何内容一起显示,除非您选择隐藏您的公司名称。您可以随时更新您的 IBM 帐户。

所有提交的信息确保安全。

选择您的昵称



当您初次登录到 developerWorks 时,将会为您创建一份概要信息,您需要指定一个昵称。您的昵称将和您在 developerWorks 发布的内容显示在一起。

昵称长度在 3 至 31 个字符之间。 您的昵称在 developerWorks 社区中必须是唯一的,并且出于隐私保护的原因,不能是您的电子邮件地址。

标有星(*)号的字段是必填字段。

(昵称长度在 3 至 31 个字符之间)

单击提交则表示您同意developerWorks 的条款和条件。 查看条款和条件.

 


所有提交的信息确保安全。


static.content.url=http://www.ibm.com/developerworks/js/artrating/
SITE_ID=10
Zone=Information Management, WebSphere
ArticleID=600562
ArticleTitle=使用实时数据填充数据仓库的高效解决方案,第 2 部分: 探索两个不同的集成选项:使用临时表或使用 MQ 消息
publish-date=12132010