创建一个简单的 Compute Grid 并行批处理应用程序

运行在 WebSphere Network Deployment Cluster 上的使用并行功能的批处理应用程序

本教程介绍如何开发一个简单的 Java 批处理应用程序,使用 IBM® Rational® Application Developer 8.0.2 作为开发环境,使用 WebSphere® Extended Deployment Compute Grid 8.0.0.1 作为运行时环境。此外,该应用程序使用 Compute Grid 所提供的 Parallel Job Manager 工具在 WebSphere Application Server Network Deployment 环境中执行并行作业。该教程还将介绍 WSBatchPackager 实用程序和它在从 Plain Old Java Object (POJO) 类封装一个批处理应用程序的过程中的用途。

Abhilash Usha, IT 专家, IBM

Abhilash Usha 的照片Abhilash Usha 是 IBM 印度软件实验室的一位 IT 专家。他在 IT 行业拥有超过 7 年从业经验。作为 WebSphere 基础架构团队的一员,他参与应对 WebSphere Application Server 严峻形势。他的专业技能领域包括 J2EE 应用程序和 WebSphere Application Server Infrastructure 的设计和开发。目前他专攻 WebSphere eXtreme Scale 和 WebSphere Extended Deployment Compute Grid 相关项目。



2013 年 9 月 23 日

简介

批处理是业务系统的一个重要方面,它用在帐单系统或报告生成,以及一天结束时的结算系统等领域中。随着业务系统在全球被夜以继日的使用,批处理窗口变得越来越窄,这使高效的批处理系统成为一种切实的需求。WebSphere Extended Deployment Compute Grid(下文简称 Compute Grid)是一个完整的、开箱即用的批处理平台,提供了一个高效、可靠、可扩展、高度可用和安全的批执行环境。

本文基于 WebSphere Compute Grid V8。我们使用 Rational Application Developer V8 的批处理作业开发特性来构造一个简单的事务性批处理应用程序。然后修改它以包含并行作业管理器工具。本文详细介绍了从头开发一个批处理应用程序的分步过程,以及如何使用 Compute Grid 所提供的 Parallel Job Manager(下文简称 PJM)工具在批处理作业中实现并行性。

关于本教程

示例批处理应用程序名为 EmployeeBatchV8,它从 EMPLOYEE 表获取员工数据,执行一定的处理,然后将更新的信息插入 EMPLOYEEOUTPUT 表中。我们将有大约 10,000 条员工输入记录,它们来自生活在美国不同州的员工,州缩写范围从 AL 到 WY。使用 Parallel Job Manager Facility,主要作业被拆分为不同的次要作业(AL-MO 和 MT-WY)并得到独立的处理。我们将改写 Parameterizer 系统编程接口 (SPI),以向每个次要作业提供一组独立输入,以便它们可在一个集群环境中不同的网格端点 (GEE) 中并行运行。参见图 1。

托管批处理应用程序的服务器 JVM 称为网格端点。

图 1. 应用程序概述
应用程序概述

目标

在本教程中,您将学习如何

  • 使用 Rational Application Developer 8 和 Compute Grid API 开发批处理应用程序
  • 使用 Compute Grid 的 Parallel Job Manager 工具
  • 将应用程序部署在 WebSphere Application Server Network Deployment 集群上并监视作业
  • 使用 WSBatchPackager 实用程序从 POJO 类创建一个批处理应用程序

前提条件

您必须熟悉使用基于 Eclipse 的 IDE 和 WebSphere Application Server Network Deployment(下文简称 Application Server)中的 Application Deployment 开发 Java 应用程序。

系统需求

要运行本教程中的示例,您需要在任何支持的环境中安装了 WebSphere Application Server V7.0.0.l7 或更高版本(最好是 ND)和 WebSphere Compute Grid 8.0.0.1。参见图 2。本教程所使用的环境设置为:

  • Windows XP 机器
  • 安装了 WebSphere Application Server 7.0.0.19、Compute Grid 8.0.0.1
  • 创建了 Network Deployment Manager 配置文件。
    • 配置文件名称:Dmgr01
    • 节点名称:${shorthostname}CellManager01
    • 服务器:dmgr
  • 创建了托管节点配置文件。(配置文件名称:AppSrv01
  • 将托管节点连锁到了 Network Deployment(ND) 单元中
  • 创建了集群(集群名称:CGCluster),它具有 2 个服务器(server1server2
  • 在同一个单元中创建了另一个名为 SchedulerClone 的服务器作为调度程序
  • 安装 DB2 UDB V9.7 并创建了 Employee 数据库。运行本教程中以下载形式提供的 ddl 文件,以创建 Employee 和 EMPLOYEEOUTPUT 表
  • 将 Compute Grid 数据源从默认的 derby 数据库迁移到了 DB2
  • 在 Application Server 控制台中将一个 DataSource 配置为指向 Employee Database。(JNDI 名称:jdbc/employeedbxa
  • 安装 Rational Application Developer 8.0.2 或最新版本,并打开了 Compute Grid Tools for Modern Batch 特性
图 2. 示例基础架构图
示例基础架构图

使用 Rational Application Developer V8 创建一个简单的批处理应用程序

创建一个批处理应用程序:

  1. 在 Rational Application Developer 中打开一个空工作区。
  2. 创建一个名为 EmployeeSampleV8 的新批处理项目并单击 Finish。Rational Application Developer 会创建 EJB、Batch 和 Enterprise Project(参见图 3)。
图 3. 创建批处理项目
创建员工批处理项目
  1. 选择 EmployeeSampleV8EJB 项目(参见图 3)并选择 New -> Batch Job(参见图 4)
图 4. 创建批处理作业
创建批处理作业
  1. Batch Job Creation 对话框上(参见图 5),指定 Job NameEmployee 并单击 Next
图 5. 指定员工批处理控制器
指定员工批处理控制器
  1. 接下来,创建一个作业步骤。在 Batch Step Creation 对话框上(参见图 6),指定:
    • NameCopyEmployeeStep
    • PatternGeneric
    • EnablePerformanceMeasurementtrue
    • debugtrue
图 6. 创建批处理步骤
创建批处理步骤
  1. 单击 Required PropertiesBATCH-RECORDPROCESSOR 行中的 Create(参见图 6)。Create Class 对话框将出现(参见图 7)。创建一个新类,它实现 BatchRecordProcessor 接口:
    • NameEmployeeBatchProcessor
    • Packagecom.trial.Employee

    单击 Finish。(我们稍后将提供该类的实现。)

图 7. 创建 BatchRecordProcessor 类
创建 BatchRecordProcessor 类
  1. 单击 AlgorithmsCheckpoint Algorithm 行中的 Add(参见图 6),创建一个新的检查点算法实现。在 Checkpoint Algorithm 对话框上(参见图 8),指定以下设置:
    • Namecheckpoint
    • PatternRecord Based
    • recordcount100(这表示在我们的示例应用程序中每 100 条记录之后产生一个检查点。如果作业意外终止,我们可从所产生的最新检查点重新启动它)。

    单击 Finish

图 8. 选择检查点算法
选择检查点算法
  1. 单击 AlgorithmsResult Algorithm 行中的 Add(参见图 6),创建一个新的结果算法实现。在 Result Algorithm 对话框上(参见图 9),指定以下设置:
    • Namejobsum
    • PatternJob Sum(这是默认的结果算法实现,它累加各个作业步骤的返回代码。作业总和为 0 表示作业已正确结束)。

    单击 Finish

图 9. 选择结果算法
选择结果算法
  1. 完成的 Batch Job 对话框如图 10 所示。单击 Next 继续创建输入流。
图 10. 创建批处理步骤
创建批处理步骤
  1. 创建一个输入流,指定如图 11 所示的以下设置:
    • NameinputStream(不要更改此名称)
    • PatternJDBC Reader
    • ds_jndi_namejdbc/employeedbxa
    • Pattern Implementation Class (PATTERN_IMPL_CLASS):com.trial.Employee.EmployeeJDBCReader

    将所有 Optional Properties 的值设置为 true。添加一个名为 STATES_LIST 的新的可选属性,为它分配值 AK,AL。单击 Next

图 11. 创建输入流
创建输入流
  1. 通过指定以下数据来创建一个新的输出流(参见图 12):
    • NameoutputStream
    • PatternJDBC Writer
    • PATTERN_IMPL_CLASScom.trial.Employee.EmployeeJDBCWriter

    添加一个名为 ds_jndi_name 的新可选属性,将它的值指定为 jdbc/employeedbxa。将 batchInterval 保留为空并将所有其他可选属性设置为 true。单击 Finish

注:GenericXDStep 要求该名称为 outputStream。不要更改此默认值。

图 12. 创建输出流
创建输出流
  1. 我们几乎已完成了。Rational Application Developer 自动在 xJCL 目录下创建一个 Employee.xml 文件。这是一种基于 XML 的作业控制语言(下文简称 xJCL),它定义了批处理作业。我们在上述屏幕中填入的大部分参数都用于创建 xJCL。部署批处理应用程序后,我们需要 xJCL 来调用该应用程序。我们创建了 3 个空类 BatchProcessorInputStreamOutputStream。在下一页中,我们将尝试实现上述类的一些核心方法。

批处理应用程序的组件

我们现在详细看看所创建的类,以及它们如何融入批处理体系结构中。(参见 “下载” 一节获取这些文件的源代码。)

  • EmployeeBatchProcessor
  • EmployeeJDBCReader
  • EmployeeJDBCWriter

一个事务性批处理应用程序中的一个作业步骤通常包含一个提供输入记录的 InputStream (EmployeeJDBCReader)、一个执行业务逻辑的 BatchRecordProcessor (EmployeeBatchProcessor) 和一个用于写入输出数据的 OutputSteam (EmployeeJDBCWriter)(参见图 13)。

图 13. 示例应用程序的组件
示例应用程序的组件

在这个示例应用程序中,我们使用了 Compute Grid 所提供的一般性批处理步骤 (GenericXDBatchStep)。Compute Grid 运行时调用 GenericXDBatchStep 中定义的方法,后者又调用 EmployeeBatchProcessor 中的方法(参见图 14)。一个一般性批处理步骤有一个输入和一个输出流。在批处理循环的每次迭代期间,它从批处理数据输入流读取一个条目 (EmployeeJDBCReader) 并将其传递给 BatchRecordProcessor(EmployeeBatchProcessor) 进行处理。

EmployeeJDBCReader

EmployeeJDBCReader 实现 JDBCReaderPattern,由批次容器 (Batch Container) 用于获取输入数据。在清单 1 中的代码段中,批次容器执行查找查询来获取输入记录。批次容器通过 BDSJDBCReader 调用 EmployeeJDBCReader 中编写的所有方法,逐个将输入记录提供给 EmployeeBatchProcessor。(图 14 显示了不同组件的交互。)请参阅本文附加的源代码,了解其他方法实现。有关不同方法和用途的更多细节,请参阅 使用 WebSphere Extended Deployment Compute Grid 的批处理编程简介

清单 1. EmployeeJDBCReader
protected static final String SELECT_CLAUSE =
		"SELECT name,address,city,state,zipcode,email,employeeID,
		phone,annualIncome,lastOfferDate	FROM EMPLOYEESCHEMA.EMPLOYEE ";
		.
		.
		.
public String getInitialLookupQuery() {
		
		String query = SELECT_CLAUSE;
		if ( statesList != null ) {
			query += " WHERE state in ("+statesList+") ";
		}
		query += " ORDER BY EMPLOYEEID";
		return query;
		}

EmployeeBatchProcessor

EmployeeBatchProcessor 是这个示例应用程序中主要的处理单元。它从 EMPLOYEE 表接收员工记录并将它传递给批次容器以持久化数据。清单 2 中的代码段来自 EmployeeBatchProcessorprocessRecord 方法。网格端点调用 GenericXDBatchStep 中定义的 processJobStep 方法,后者又调用 EmployeeBatchProcessorprocessRecord 方法(参见图 14)。您可改写此方法,放入要为每条记录执行的自定义逻辑。在这个示例应用程序中,我们仅输出出员工名称,然后返回员工对象以持久化到输出流中。

清单 2. EmployeeBatchProcessor
public Object processRecord(Object obj) throws Exception
{
        Employee employee = (Employee)obj;
        System.out.println("Employee Name:"+employee.getName());
        return employee;
         
    }

EmployeeJDBCWriter

EmployeeJDBCWriter 实现 JDBCWriterPattern,由批次容器用于将已处理的记录写入输出流中。清单 3 中的代码段来自 EmployeeJDBCWriter,其中显示了 getSQLQuery 方法(用于获取查询)和 writeRecord 方法(用于设置 Prepared Statement 的值)。网格端点通过 BDSJDBCWriter 调用这些方法。(参见图 14。)

清单 3. EmployeeJDBCWriter
 protected String tableName = "EMPLOYEESCHEMA.EMPLOYEEOUTPUT";
	protected String sqlQueryPreTablename="INSERT INTO ";
	protected String sqlQueryPostTablename=" VALUES (?, ?)";
	
	 
	public String getSQLQuery() {
		String query=this.sqlQueryPreTablename+this.tableName+
		this.sqlQueryPostTablename;
		System.out.println("Query is "+query);
		return query;
		// TODO Auto-generated method stub
	 
	}

	
	public PreparedStatement writeRecord(PreparedStatement pstmt, Object record) {
		
		if(record instanceof Employee)
		{
			try
			{
			System.out.println("Writing the Employee record
			into the output table");
			Employee employeerecord=(Employee)record;
			pstmt.setString(1,employeerecord.getName());
			pstmt.setString(2,employeerecord.getState());
			}catch(SQLException sqle)
			{
				System.out.println("Exception while making
             the prepared Statement");
				sqle.printStackTrace();
				
			}
			
		}
		else
		{
			System.out.println("Record is not a instance of
			the Employee");
		}
		
		
		// TODO Auto-generated method stub
		return pstmt;
	}
图 14. 批处理内部体系结构
批处理内部体系结构

我们已完成了第一个批处理应用程序。作为第一步,您可将此应用程序安装在现有的 WebSphere Application Server Network Deployment 环境中。该作业可使用 Rational Application Developer 所生成的 xJCL 提交。示例应用程序从 EMPLOYEE 表选择所有属于州 AK,AL 的员工,将它们插入 EMPLOYEEOUTPUT 表中。有关如何安装该应用程序和提交作业的详细信息,请参阅部署和运行应用程序 一节。惟一缺少的特性是并行性。我们现在所需做的只是添加某种并行作业功能。Compute Grid 提供了一个 Parallel Job Manager 工具来完成此任务,还提供了 SPI 来自定义并行作业的运行方式。

改写 SPI

Compute Grid 提供了一些系统编程接口 (SPI),它们可在您的批处理作业中用于自定义您的高级并行应用程序设计:

SPI用途
ParameterizerParameterizer SPI 使用 xJCL 中传递的信息帮助将作业拆分为次要作业。
SubJobAnalyzerParallel Job Manager(PJM) 调用 SubJobAnalyzer 来计算作业的返回代码
SubJobCollector当产生一个检查点时,会调用 SubJobCollector SPI 来收集有关次要作业的相关州信息
SynchronizationPJM 在所有次要作业到达最后的州时调用 Synchronization SPI

尽管示例应用程序改写了除 Parameterizer 外的所有 SPI,但是这些 SPI 只写入 System Out 中,以在运行并行作业时帮助跟踪事件流。这个示例应用程序有大约 10,000 个属于不同州的员工记录。使用 Parameterizer,我们将州总数除以并行作业数量,将一组惟一的州提供给每个次要作业。请参见清单 4 中的代码段。整段示例代码都可下载获得。

清单 4. EmployeeParameterizer
 public Parameters parameterize(String logicalJobName, String logicalTXID,
			Properties props) {
	 		int jobcount = Integer.valueOf(props.getProperty(
			"parallel.jobcount","1"));
    		Parameters parms = new Parameters();
		    parms.setSubJobCount(jobcount);
		Properties newprops [] = new Properties[jobcount];
		for ( int i=0;i<jobcount;i++ ) 
		{
		    newprops[i] = new Properties();
			String stateList=splitup(i,jobcount);
			newprops[i].put("STATES_LIST", stateList);
		}
		parms.setSubJobProperties(newprops);
		return parms;
	}
注:
导入 SPI 之前,确保 Rational Application Developer 的 Java 构建路径包含 com.ibm.ws.batch.runtime.jar。这一步很有必要,因为 Rational Application Developer v8 还不支持 WebSphere Compute Grid v8 的并行特性。您可在 $WAS_HOME/stacked_products/WCG/plugins 中找到该 jar 文件。

导入 EmployeeRecordProcessorEmployeeJDBCReaderEmployeeEmployeeJDBCReader 后,您的工作区应类似于图 15。

图 15. RAD 工作区
RAD 工作区

修改 xJCL 以包含这些 SPI

将清单 5 中所示的代码(忽略第一行,它是一个指针)添加到您的 XJCL 中,放在 Employee.xml 中紧挨 jndi-name 标记之后。这可帮助 ComputeGrid Runtime 决定并行作业数量和子作业在 JVM 中的位置。

清单 5. Employee.xml
<jndi-name>ejb/EmployeeBatchController</jndi-name>
<run instances="multiple" jvm="multiple"><props>
 <prop name="com.ibm.websphere.batch.parallel.parameterizer"
 value="com.trial.Employee.spi.EmployeeParameterizer"/>
 <prop name="com.ibm.websphere.batch.parallel.synchronization" 
 value="com.trial.Employee.spi.EmployeeTXSynchronization"/>
 <prop name="com.ibm.websphere.batch.parallel.subjobanalyzer"
 value="com.trial.Employee.spi.EmployeeSubJobAnalyzer"/>
 <prop name="com.ibm.websphere.batch.parallel.subjobcollector" 
 value="com.trial.Employee.spi.EmployeeSubJobCollector"/>
 <prop name="com.ibm.wsspi.batch.parallel.subjob.name" 
 value="EmployeeSampleSubJob" />
 <prop name="parallel.jobcount" value="2" />
 </props>
 </run>

另外,修改 STATES_LIST 属性标记(参见清单 6),让 Parameterizer 能够确定每个并行作业需要处理的州列表

清单 6. Employee.xml
<prop name="STATES_LIST" value="${STATES_LIST}" />

将清单 7 中的代码段(同样忽略第一行,因为它是一个指针)添加到 prop 标记下的 BATCHRECORDPROCESSOR 旁边。这些是 PJM 的必填字段。

清单 7. Employee.xml
 <prop name="BATCHRECORDPROCESSOR" value="com.trial.Employee.EmployeeBatchProcessor"/>
<prop name="com.ibm.wsspi.batch.parallel.jobname" value="${parallel.jobname}" />
<prop name="com.ibm.wsspi.batch.parallel.logicalTXID" value="${logicalTXID}" />
<prop name="com.ibm.wsspi.batch.parallel.jobmanager" value="${parallel.jobmanager}" />

将 EAR 从 Rational Application Developer 工作区导出以进行部署,保存 Employee.xml 文件以用于提交作业。

WSBatchPackager 实用程序

作为一种备选方法,我们可使用该产品所提供的 WSBatchPackager 实用程序来封装来自 POJO 类的应用程序。它可在 $WAS_HOME/stacked_products/WCG/bin 下找到。本应用程序中使用的 POJO 类包含 SPI 文件、EmployeeJDBCReaderEmployeeJDBCWriterEmployeeEmployeeBatchProcessor。您可将这些类封装在 pojoclasses.jar 中。清单 8 展示了 WSBatchPackager 实用程序的用法。该实用程序可以在您没有 Rational Application Developer 时用作批处理应用程序开发的一种备选方法。WSBatchPackager 生成的 Enterprise Archive (EAR) 文件类似于我们从 Rational Application Developer 生成的文件。

清单 8. 用法
WSBatchPackager.bat -appname =EmployeeSampleV8EAR 
-jarfile=C:\dwarticle\pojoclasses.jar -earfile=EmployeeSampleV8EARPOJO.ear

部署和运行应用程序

  1. 将 Employee Batch Application 部署到 WebSphere Application Server 集群中。继续使用默认设置,因为应用程序中已提供了资源引用的映射。
注:
部署该应用程序期间,确保 JDK 为 1.5 或更高版本。在 EJB Deploy Options 下选择 1.5 和更高的 JDK 兼容版本。
  1. 单击 URL http://schedulerhostname:defaulthostport/jmc 以访问 SchedulerCloneJob Management Console
  2. 通过该控制台提交 xJCl(参见图 16)
图 16. Job Management Console
Job Management Console
  1. 单击 View Jobs(参见图 17)
图 17. 监视作业
监视作业
  • 依据我们在 xJCL 中提供的并行作业数量设置,我们可以注意到生成了 3 个次要作业,它们在不同的 JVM 中彼此独立地运行(参见图 17)。

结束语

WebSphere Compute Grid 和 Rational Application Developer 8 大大简化了 Compute Grid 批处理程序的构建。使用 Compute Grid 所提供的 PJM 工具,我们能够最优地使用资源,在不同 JVM 上并行执行作业并使解决方案更容易扩展。另外,使用 WSBatchPackager,我们能够轻松地从 POJO 类生成批处理 EAR 文件。本文详细介绍了如何有效地使用 ComputeGrid 和 Rational Application Developer 8 创建您的第一个并行批处理程序。

致谢

感谢 IBM 的 WebSphere Batch 架构师 Christopher Vignola 审阅本文和提供宝贵建议。


下载

描述名字大小
实用的示例应用程序EmployeeSampleV8EAR.zip17KB
在 DB2 数据库中创建表的 DDLEmployeeallrecords.zip564 KB
完成的 XJCL 文件Employee.xml4 KB

参考资料

学习

获得产品和技术

讨论

条评论

developerWorks: 登录

标有星(*)号的字段是必填字段。


需要一个 IBM ID?
忘记 IBM ID?


忘记密码?
更改您的密码

单击提交则表示您同意developerWorks 的条款和条件。 查看条款和条件

 


在您首次登录 developerWorks 时,会为您创建一份个人概要。您的个人概要中的信息(您的姓名、国家/地区,以及公司名称)是公开显示的,而且会随着您发布的任何内容一起显示,除非您选择隐藏您的公司名称。您可以随时更新您的 IBM 帐户。

所有提交的信息确保安全。

选择您的昵称



当您初次登录到 developerWorks 时,将会为您创建一份概要信息,您需要指定一个昵称。您的昵称将和您在 developerWorks 发布的内容显示在一起。

昵称长度在 3 至 31 个字符之间。 您的昵称在 developerWorks 社区中必须是唯一的,并且出于隐私保护的原因,不能是您的电子邮件地址。

标有星(*)号的字段是必填字段。

(昵称长度在 3 至 31 个字符之间)

单击提交则表示您同意developerWorks 的条款和条件。 查看条款和条件.

 


所有提交的信息确保安全。


static.content.url=http://www.ibm.com/developerworks/js/artrating/
SITE_ID=10
Zone=WebSphere
ArticleID=945939
ArticleTitle=创建一个简单的 Compute Grid 并行批处理应用程序
publish-date=09232013