在 IBM WebSphere DataStage 中使用 Java Pack 组件实现用户自定义数据转换逻辑

IBM WebSphere DataStage (最新产品已更名为 InfoSphere DataStage)是一个高效的数据集成和数据整合工具,向开发人员提供了丰富的数据转换功能。本文着重介绍了如何使用 WebSphere DataStage 中提供的 Java Pack 组件来实现用户自定义的数据转换逻辑,并且以一个 Server Job 为实例向读者讲解了使用 Java Pack 组件进行开发的整个过程。

李志永 (lizyzy@cn.ibm.com), 软件工程师, IBM

李志永,IBM CDL 软件工程师,熟悉 DB2、IBM WebSphere DataStage,对数据整合以及 Java 技术非常感兴趣。



张能斌, 高级软件工程师, IBM

张能斌,IBM CDL 软件工程师,熟悉 DB2、IBM WebSphere DataStage,从事 ETL 方面的数据开发工作。



2009 年 4 月 23 日

IBM WebSphere DataStage 是一个图形化的 ETL(data Extract, Transform, and Load)开发环境,开发人员通过它可以便捷的实现企业的数据整合流程,从数据源中抽取数据,然后对这些数据进行转换,最终加载到目标数据库或者数据仓库中,而这个过程的核心又是对数据的清洗和转换。

IBM WebSphere DataStage 提供了各种常用的数据转换功能,可以满足大部分用户的需要。同时,对于某些比较复杂的数据转换逻辑,当 DataStage 内置的 Stage、转换器和内置函数都无法实现或难以实现时,其还提供了若干更灵活的方法,使得用户能够通过相应的编程接口来实现用户自定义的数据转换逻辑。

实现用户自定义数据转换逻辑

在 WebSphere DataStage 中实现用户自定义数据转换逻辑有三种方法。

  • 通过 WebSphere DataStage BASIC 编程语言实现。 DataStage BASIC 语言是一种可以运行在 DataStage 平台上,功能强大的面向过程的开发语言。用户可以使用 DataStage BASIC 语言撰写用户自定义函数和自定义转换器来封装复杂的数据转换逻辑。关于具体方法,用户可以参考发表在 developerWorks 中国网站上的《 WebSphere DataStage BASIC 语言开发实践》一文,详见参考资源。
  • 对于 Parallel Job 来说,用户还可以通过为 WebSphere DataStage 创建定制操作符来实现。 DataStage 作业运行时,后台实际上是通过一个个操作符来对输入数据流的进行处理并产生相应的输出数据流。 DataStage 提供了相应的 C++ API,使得用户可以创建自定义的操作符,并加载到 DataStage 中使用。关于具体方法,用户可以参考 developerWorks 中国网站上的教程《为 WebSphere DataStage 创建定制操作符》,详见参考资源。
  • 通过 Java Pack 组件调用 Java 代码实现。 Java Pack 组件提供了一组 Java API,使得 DataStage 能够调用用户 Java 代码。借助 Java 语言的强大功能,用户可以方便实现任何复杂的数据转换逻辑。本文将主要介绍如何通过 Java Pack 组件来实现用户自定义数据转换逻辑。

Java Pack 组件简介

Java 语言是一种功能强大、跨平台的通用编程语言,在企业级应用中被广泛运用。 WebSphere DataStage 通过 Java Pack 组件提供了调用外部 Java 代码的机制,能够有效将已有的 Java 企业应用集成到数据整合流程之中,同时,也让 ETL 开发人员能够在 DataStage 中充分利用 Java 语言提供的强大功能,提高开发效率。

在 DataStage 7.x 中,Java Pack 是作为一个附加组件提供的,需要单独进行安装,而在最新版本的 WebSphere DataStage 8 中,Java Pack 已经作为标准组件提供了。 Java Pack 提供了两种不同的 Stage,分别是 Java Client Stage 和 Java Transformer Stage,开发人员可以在组件面板的 Real Time 分类下找到这两个 Stage,如图 1 所示。

图 1. Java Pack 组件
图 1. Java Pack 组件

Java Client Stage 和 Java Transformer Stage 都可以调用外部的 Java 类。 Java Client Stage 主要用于对支持 Java 语言的数据系统进行读取、查询或写入。 Java Transformer Stage 则用于对数据进行处理,它通过调用 Java 代码将 Input link 上输入的数据进行转换,并将结果输出到 Output link 上。本文中实现用户定义数据转换逻辑所用到的正是 Java Transformer Stage 。


Java Pack API 介绍

WebSphere DataStage 提供了一系列简单易用的 Java API 来与外部 Java 类协同工作,并进行数据处理。这些 API 使得 Java 程序可以在运行时与 Datastage 引擎直接交互,获取当前作业中的 columns 和 links 的相关元数据,并在需要时在这些 links 上读取或者写入数据。开发人员需要在其 Java 类中定义若干特殊的方法,如 process() 方法,并在其中调用自己的数据处理逻辑。

Java Pack API 所对应的包名是 com.ascentialsoftware.jds 。以 WebSphere Datastage 8 和 Windows 操作系统为例,其相应的 Jar 包文件为 C:\IBM\InformationServer\Server\DSEngine\java\lib\tr4j.jar 。该包由 3 个 public 类构成,分别是 Stage 类,Row 类和 Column 类。

任何一个能被 DataStage 引擎调用的外部 Java 程序都必须实现一个 Stage 类的子类,其代码的框架结构如下。

public class Mytransformer extends Stage 
{ 
	public void initialize() 
		{ 
			// ...initialize logic 
		} 
	public void terminate() 
		{
			// ...terminate logic 
		} 
	public int process() 
		{ 
			Row inputRow = readRow(); 
			// ...process input row... 
			Row outputRow = createOutputRow(); 
			// ...fill output row... 
			writeRow(outputRow); 
			return OUTPUT_STATUS_READY; 
		} 
}

Stage 类抽象了 Java Client Stage 和 Java Transformer Stage 。在实现其子类时,开发人员必须根据自己的需求重新实现并覆盖 process() 方法,而 initialize() 方法和 terminate() 方法是可选的。

当一个 Java Client Stage 或 Java Transformer Stage 准备开始在 DataStage 作业中运行时,Java Pack API 将先调用 initialize() 方法。如果开发人员需要在数据处理前进行一些初始化工作,如设置计数器、读取用户自定义属性或打开数据库连接等,可以通过在子类覆盖该方法来完成。

process() 方法是处理输入输出数据的入口方法,每当数据到达并需要处理的时候,它就会被调用。在该方法内,开发人员可以分析到达的每一条记录,并调用自定义数据转换逻辑的代码进行数据转换,然后将转换后的数据输出。

当所有数据都处理完毕后,Java Pack API 将调用 terminate() 方法,开发人员可以通过在子类中覆盖该方法,进行某些资源的释放和清除操作。

下表中列举了 Stage 类所提供的一些重要方法。

表 1. Stage 类中的重要方法
方法名描述
initialize()子类中覆盖该方法进行初始化操作
terminate()子类中覆盖该方法释放相关资源
process()子类中覆盖该方法处理输入输出数据
createInputRow()创建一个对应 Input link 的数据记录对象
createOutputRow()创建一个对应 Output link 的数据记录对象
createRejectRow()创建一个对应 Reject link 的数据记录对象
readRow()从 Input link 上读取下一条可获取的数据记录
rejectRow()将一条数据记录写到 Reject link 上
writeRow()将一条数据记录写到 Output link 上
getUserProperties()获取用户自定义属性

Row 类是一条数据记录的抽象,它提供了相关方法得到每条记录上的字段数,并读取和设置每每个字段的值。

下表列举了 Row 类所提供的一些重要方法。

表 2. Row 类中的重要方法
方法名描述
getColumn()返回记录上相应字段
getColumnCount()返回一条数据记录上的字段数
getValueAsSQLTyped()读取某个字段的值
setValueAsSQLTyped()设置某个字段的值

Column 类是一个数据记录字段的抽象,它提供了相关方法来查询该字段的元数据信息,如字段名,数据类型,字段长度,字段在记录中的位置等,同时提供了方法来读取和设置该字段的值。

下表列举了 Column 类所提供的一些重要方法。

表 3. Row 类中的重要方法
方法名描述
getIndex()获取字段在记录中的位置
getName()获取字段名
getSQLType()获取字段的数据类型
isKey()判断该字段是否是主键的一部分
nullAllowed()判断该字段是否可以设置 Null 值
getValueAsSQLTyped()读取该字段的值
setValueAsSQLTyped()设置该字段的值

Java Pack 开发示例

使用 Java Pack 进行用户自定义数据转换逻辑开发,我们可以将其粗略分为四个阶段。

  1. 明确数据转换需求
  2. 开发 Java 代码
  3. 开发 DataStage 作业
  4. 测试并运行 DataStage 作业并检查运行结果

下面我们就按照这四个阶段来分步开发一个 DataStage 作业实例。

明确数据转换需求

我们有若干条员工信息的数据记录,其中一个字段是电子邮件地址,我们的需求是判断这个电子邮件地址是否是一个合法的电子邮件地址,并过滤掉电子邮件不合法的那些记录。

测试数据如图 2 所示。

图 2. 测试数据
图 2. 测试数据

员工信息的数据将存放在 CSV 文件中,过滤后,电子邮件地址合法的数据与电子邮件地址不合法的数据将分别写入到两个新的 CSV 文件中。

开发 Java 代码

根据需求,我们首先需要开发一个 Java 类用来判断一个电子邮件是否合法,这里我们实现一个 EmailChecker 类,其代码如下:

清单 1. EmailChecker 类
package com.ibm.cn.javapack;

import java.util.regex.*;

public class EmailChecker {
	public static int IsValidEmaill(String addr)
	{
		String patternString = "[\\w-\\.]+@([\\w-]+\\.)+[\\w-]{2,4}";
		Pattern pattern =null;
		try
		{
			pattern = Pattern.compile(patternString);
		}
		catch (PatternSyntaxException e)
		{
			return -1;
		}
		Matcher matcher = pattern.matcher(addr);
		boolean result = matcher.matches();
		if (result) 
			return 1 ; 
		else return 0 ;
	}
}

这里我们可以使用 Java 提供的正则表达式匹配功能,轻松实现对合法电子邮件的检测。我们可以通过调用 EmailChecker 类的 IsValidEmail() 方法来检测电子邮件地址的合法性。

下面,我们将实现一个 Stage 类的子类 ValidEmailFilter,并在该类中根据输入数据记录的电子邮件地址进行过滤并输出数据。

清单 2. ValidEmailFilter 类
package com.ibm.cn.javapack;
import java.text.ParseException;
import com.ascentialsoftware.jds.*;
public class ValidEmailFilter extends Stage {	
	int filterColumnIndex;	
	public void initialize() {
		String userProperties = getUserProperties();		
		filterColumnIndex = Integer.parseInt(userProperties);		
	}
	public int process() {		
		Row inputRow = readRow();		
		if (inputRow == null) {			
return OUTPUT_STATUS_END_OF_DATA;
}		
	int columnCount = inputRow.getColumnCount();
	Row outputRow = null;		
	String emailAddress = 
		inputRow.getValueAsString(filterColumnIndex).trim();		
	int validateResult = 
		EmailChecker.IsValidEmaill(emailAddress);		
	if (validateResult == 0 "" hasRejectLink())			
		outputRow = createRejectRow();		
	if (validateResult == 0 ""  !hasRejectLink())			
		return OUTPUT_STATUS_NOT_READY;		
	if (validateResult == 1)		
		outputRow = createOutputRow();		
	for (int i = 0; i < columnCount; i++) {			
		Object sqlObject = null;						
		try {
			sqlObject = inputRow.getValueAsSQLTyped(i);
		} catch (ParseException e) {				
			e.printStackTrace();
		}			 
		outputRow.setValueAsSQLTyped(i, sqlObject);
	}
	if (validateResult == 0 ""  hasRejectLink()){
		rejectRow(outputRow);			
	} else writeRow(outputRow);		
	return OUTPUT_STATUS_READY;
	}
}

在 ValidEmailFilter 类中,由于没有相关资源的释放和清除操作,我们仅实现了 initialize() 方法和 process() 方法。在 initialize() 方法中,我们调用了 getUserProperties() 方法来获取一个用户自定义属性,该属性的值将指出电子邮件地址对应的字段在数据记录中的位置。我们将在开发 DataStage 作业时指定该值。 readRow() 方法将在 Input link 上读入一行数据记录,getValueAsString() 方法取出该条记录上电子邮件地址对应字段上的值,并交由 EmailChecker 类来检查该地址的合法性。如果合法,将由 writeRow() 方法写入到 Output link 上,否则由 rejectRow() 方法写入到 Reject link 上。

将 Java 代码编译成为相应的 .class 文件,需要注意的是,要尽量使用与 DataStage 引擎所使用的 JRE 版本相同的 JDK 来编译开发。

开发 DataStage 作业

打开 WebSphere DataStage Designer,新建一个 Server Job,如图 3 所示。

图 3. DataStage 作业流程图
图 3. DataStage 作业流程图

这里“ Filter_Email ”就是一个 Java Transformer Stage,它从“ Employee_List ”这个 Sequential File Stage 中读取数据,并将结果分别输出到“ Result_File ”和“ Reject_File ”这两个 Sequential File Stage 。

下面我们来设置该 Job 的属性和各个 Stage 的属性:

  1. 设置 Job 属性
  2. 设置“ Employee_List ” Stage,“ Result_File ” Stage 和 Reject_File ” Stage
  3. 设置“ Filter_Email ” Stage

设置 Job 属性

打开 Job Properties 对话框,如图 4 所示,在 Parameters 页上设置如下 DataStage 参数:

  • class_path:指定 Java Transformer Stage 需要的 Java 类文件所在的目录
  • class_name:指定开发人员实现的 Stage 子类的类名(包括相应的 Package 名)
  • email_column_index:指定电子邮件地址在数据记录中的位置(该字段在第几列)
  • data_dir:源数据文件和输出数据文件存放的路径
图 4.设置 DataStage 作业参数
图 4.设置 DataStage 作业参数

设置“ Employee_List ” Stage,“ Result_File ” Stage 和“ Reject_File ” Stage

在“ Employee_List ”,“ Result_File ”和“ Reject_File ”三个 Sequential File Stage 中,分别指定数据文件路径,数据文件格式和数据列的定义。我们以“ Employee_List ” Stage 为例,来说明如何设置这三个 Stage 。

打开“ Employee_List ” Stage 的属性窗口,我们需要在“ Employee_List ” Stage 中指定数据源文件和设置数据列定义。

请参看图 5,在 General 选项卡中,我们设置数据源文件为 employee.txt,数据以 CSV 格式存放。在这里,我们使用了用户定义的参数 #data_dir# 。

图 5.指定数据文件
图 5.指定数据文件

切换到 Columns 选项卡,如图 6 所示,我们定义了 EMP_NUM、EMP_NAME、EMAIL,TEL_NUM 四个数据列。

图 6.设置数据列定义
图 6. 设置数据列定义

设置“ Filter_Email ” Stage

双击打开“ Filter Email ” Stage,在 Stage 页的 General 选项卡上,我们可以指定 Transformer Class Name 和 User ’ s Classpath,填入我们事先定义好的 DataStage 参数,如图 7 所示。

图 7.设置数据列定义
图 7.设置数据列定义

切换到 Properties 选项卡,我们来设置用户自定义属性,如图 8 所示。在这里我们输入预定义好的参数 #email_column_index# 。在 User ’ s Properties 中输入的值,都可通过上面提到的 Stage 类的 getUserProperties() 方法以字符串的形式在 Java 程序中取到。

图 8. 设置用户属性
图 8. 设置用户属性

切换到 Option 选项卡,这里可以指定 Java 代码在运行时所用到的 Java 虚拟机参数,本例中并不需要特别指定任何 Java 虚拟机参数。

图 9. 指定 Java 虚拟机参数
图 9. 指定 Java 虚拟机参数

切换到 Output 页,这里可以指定 Reject link 。需要注意的是,Java Transformer Stage 仅支持一条 Input link,一条 Output link 和一条 Reject link 。本例中我们指定 rejected 为 Reject link,被 Reject 的数据将流入到“ Reject_File ” Stage

图 10. 指定 Reject link
图 10. 指定 Reject link

测试并运行 DataStage 作业并检查运行结果

在完成 DataStage 作业的开发工作后,我们就可以编译和运行了。如图 9 所示,作业成功运行,有 3 条数据流入“ Result_File ” Stage 中,另外两条数据流入“ Reject_File ” Stage 中。打开 “ Result_File ” Stage 和 “ Reject_File ” Stage 的属性对话框,在 Input 选项卡,点击 View Data …按钮,就可以浏览运行结果。请参看图 12 和图 13 中的数据结果。数据源中的数据记录被“ Email_Filter ” Stage 根据其电子邮件地址的合法性正确的处理了。

图 11. 成功运行后的 JOB
图 11. 成功运行后的 JOB
图 12. “ Result_File ” Stage 中的数据结果
图 12. “ Result_File ” Stage 中的数据结果
图 13. “ Reject_File ” Stage 中的数据结果
图 13. “ Reject_File ” Stage 中的数据结果

结束语

本文着重介绍了如何使用 WebSphere DataStage 中提供的 Java Pack 组件来实现用户自定义的数据转换逻辑,并且以一个 Server Job 为实例向读者讲解了使用 Java Pack 组件进行开发的整个过程。通过阅读本文,读者对使用 Java Pack 组件有了初步的了解。读者可以参照这个实例和后面的参考资料,开发满足实际需要的数据逻辑转换功能。

参考资料

学习

获得产品和技术

讨论

条评论

developerWorks: 登录

标有星(*)号的字段是必填字段。


需要一个 IBM ID?
忘记 IBM ID?


忘记密码?
更改您的密码

单击提交则表示您同意developerWorks 的条款和条件。 查看条款和条件

 


在您首次登录 developerWorks 时,会为您创建一份个人概要。您的个人概要中的信息(您的姓名、国家/地区,以及公司名称)是公开显示的,而且会随着您发布的任何内容一起显示,除非您选择隐藏您的公司名称。您可以随时更新您的 IBM 帐户。

所有提交的信息确保安全。

选择您的昵称



当您初次登录到 developerWorks 时,将会为您创建一份概要信息,您需要指定一个昵称。您的昵称将和您在 developerWorks 发布的内容显示在一起。

昵称长度在 3 至 31 个字符之间。 您的昵称在 developerWorks 社区中必须是唯一的,并且出于隐私保护的原因,不能是您的电子邮件地址。

标有星(*)号的字段是必填字段。

(昵称长度在 3 至 31 个字符之间)

单击提交则表示您同意developerWorks 的条款和条件。 查看条款和条件.

 


所有提交的信息确保安全。


static.content.url=http://www.ibm.com/developerworks/js/artrating/
SITE_ID=10
Zone=Information Management, Java technology
ArticleID=385009
ArticleTitle=在 IBM WebSphere DataStage 中使用 Java Pack 组件实现用户自定义数据转换逻辑
publish-date=04232009