内容


DataStage 命令行集成

利用 Linux 和 UNIX® 中的一种最常用的进程间通信机制

Comments

集成场景

DataStage 作业通常用于按批次处理数据,它们计划按特定的间隔运行。在没有具体的计划要遵循时,DataStage 操作员可通过 DataStage 和 QualityStage Director 客户端或在命令行手动启动作业。如果在命令行运行作业,那么您可以按如下方式执行它。

dsjob -run -param input_file=/path/to/in_file -param output_file=/path/to/out_file dstage1 job1

图 1 给出了一个表示此命令的图表。

图 1. 调用 DataStage 作业
在命令行调用 dsjob 的表示图
在命令行调用 dsjob 的表示图

在正常环境中,in_file 和 out_file 存储在运行 DataStage 的机器上的某个文件系统中。但在 Linux 或 UNIX 中,输入和输出可传送到一系列命令中。例如,当需要对程序进行排序时,可执行命令 command|sort |uniq > /path/to/out_file。在这种情况下,图 2 显式了数据流,其中一个命令的输出变成下一个命令的输入,最终的输出位于文件系统中。

图 2. UNIX 典型管道用法
通过 UNIX 中的管道调用一些命令
通过 UNIX 中的管道调用一些命令

假设中间流程生成了几百万行代码,您可避免获取中间文件,从而节省文件系统中的空间以及写入这些文件的时间。与 UNIX 中执行的许多程序或命令不同,DataStage 作业不会通过管道获取标准输入。本文将介绍一种方法,展示如何使用脚本完成此任务,以及它的实际用法。

如果作业应接受标准输入并生成标准输出,就像常规 UNIX 命令一样,那么必须通过一个包装器脚本按以下方式调用它:command1|piped_ds_job.sh|command2 > /path/to/out_file

或者可能您必须将输出发送到某个文件,比如使用命令 command1|piped_ds_job.sh > /path/to/out_file

图 3 中的图表展示了应如何架构该脚本。

图 3. 一个 DataStage 作业的包装器脚本
一个使用管道代替常规文件的 DataStage 作业的包装器脚本
一个使用管道代替常规文件的 DataStage 作业的包装器脚本

该脚本必须将标准输入转换为指定管道,还必须将 DataStage 作业的输出文件转换为标准输出。在下面几节中,您将学习如何完成此任务。

开发 DataStage 作业

DataStage 作业不需要任何特殊对待。对于本示例,您需要创建一个作业对文件进行排序,如果文件正常运行,则会获取至少两个参数:输入文件和输出文件。但是,如果它的函数需要使用更多的参数,那么该作业可以拥有更多的参数,但对于本练习,最好保持简洁。

该作业如图 4 所示。

图 4. 简单的 DataStage 排序作业
仅接受两个参数的简单的 DataStage 排序作业。
仅接受两个参数的简单的 DataStage 排序作业。

此作业的 DSX 可从本文的下载部分获得。该作业接受一个文本文件,将整个行视为一列,对其进行排序,并将它写入输出文件。

此外,该作业将必须允许多个示例执行。它应接受没有分隔符和引号的输入行,输出文件具有相同的特征。

编写和使用包装器脚本

包装器脚本将会包含必要的代码,以便为指定管道创建临时文件,并创建调用 DataStage 作业的命令 (dsjob)。具体来讲,该脚本必须执行以下操作。

  • 将标准输入(这是传送给它的命令的输出)传送到某个指定管道。
  • 将作业的输出写入另一个指定管道,然后由该管道将输出传输到进程的标准输出,以便下一个命令可以在一个管道中读取输出。
  • 调用 DataStage 作业,使用之前创建的指定管道的文件名来指定输入文件和输出文件参数。
  • 清理为指定管道创建的临时文件。

现在开始编写包装器脚本。第一组命令将准备好环境,从安装目录获取 dsenv 文件并设置一些变量。您可以使用进程 ID (pid) 作为标识符,在临时目录中创建一个临时文件,如清单 1 所示。

清单 1. 准备 DataStage 环境
#!/bin/bash
dshome=`cat /.dshome`
. $dshome/dsenv
export PATH=$PATH:$DSHOME/bin

pid=$$
fifodir=/data/datastage/tmp
infname=$fifodir/infname.$pid
outfname=$fifodir/outfname.$pid

您可以继续创建 FIFO 和执行 dsjob。现在,该作业将等待管道开始接收输入。如果 DataStage 作业执行抛出了一个错误,那么该代码将会发出警告,如清单 2 所示。

清单 2. 创建指定管道并调用作业
mkfifo $infname
mkfifo $outfname
dsjob 	-run -param inputFile=$infname \
    -param outputFile=$outfname dstage1 ds_sort.$pid 2> /dev/null  &
                
if [ $? -ne 0 ]; then
    echo "error calling DataStage job."
    rm $infname
    rm $outfname
    exit 1
fi

dsjob 命令结束时,可以看到一个 & 符号,该符号必不可少,因为该作业在等待输入指定管道发送数据,但数据将提前几行传输。

以下代码通过一个简单的 cat 命令来准备要发送给标准输出的输出。可以看到,cat 命令和 rm 命令在圆括号内,表明这两个命令在一个发送到后台的子 shell 中调用(由行末的 & 符号指定),如清单 3 所示。

清单 3. 处理输入和输出指定管道
(cat $outfname;rm $outfname)&
                
if [ -z $1 ]; then
    cat > $infname
else
    cat $1 > $infname
fi
                
rm $infname

后者必不可少,因为当作业完成输出的写入后,会删除临时指定管道文件。后续代码测试是否使用一个参数调用该脚本作为一个文件,或者您是否从一个管道接收数据。将输入流(文件或管道)发送给输入指定管道后,您会完成和删除该文件。

您可将该脚本命名为 piped_ds_job.sh 并按前面提到的方式执行它:command1|piped_ds_job.sh > /path/to/out_file。事实上该脚本可通过一个匿名管道接收输入,这支持清单 4 中的用法。

清单 4. 包装器脚本的用法
command1|piped_ds_job.sh|command2
zcat compressedfile.gz |piped_ds_job.sh > /path/to/out_file
zcat compressedfile.gz |ssh -l dsadm@victory.ibm.com piped_ds_job.sh| command2

最后一个示例(您在其中使用了 SSH)假设您从另一个机器执行脚本,因此以某种方式将 DataStage 作业用作服务。这也是绕过文件传输(以及此情形下的解压)的一种代表性用法。

结束语

本文介绍的机制支持在命令行和 shell 脚本中更灵活地调用 DataStage 作业。您可以轻松地自定义所介绍的包装器脚本,让它更加通、更灵活。这项技术非常简单,可为当前作业快速实现此技术,通过 SSH 远程在服务中转换它们。当文件包含数千万个行时,应避免将数据加载到常规文件中,这样做的好处最明显,不过,即使您的数据没有这么大,这种集成用法也非常有用。


下载资源


相关主题


评论

添加或订阅评论,请先登录注册

static.content.url=http://www.ibm.com/developerworks/js/artrating/
SITE_ID=10
Zone=Information Management
ArticleID=938320
ArticleTitle=DataStage 命令行集成
publish-date=07232013