IBM®
跳转到主要内容
    中国 [选择]    使用条款
 
 
Select a scope: Search for:    
    首页    产品    服务与解决方案     支持与下载    个性化服务    
跳转到主要内容

developerWorks 中国  >  Grid computing | Java technology  >

使用 OGCE 向网格资源提交作业

快速上手 Open Grid Computing Environment 并实现自己的 Java 作业提交客户机

developerWorks
文档选项

未显示需要 JavaScript 的文档选项


级别: 初级

Jean-Luc Lepesant (lepesant@fr.ibm.com), 高级开发人员
Sébastien Fibra (fibra@fr.ibm.com), IT 专家

2005 年 4 月 26 日

IBM 构建了一个自己的网格来连接分布在三个大陆的设计中心。在过去几年里,我们使用了不同版本的 Globus Toolkit(GT):GT2、GT3 和将要发布的 GT4。但是这增加了那些需要向基于 Globus 的网格资源提交作业的客户机程序的复杂性。为了解决这个问题,网格开发团队想出了一种灵活的方法,使用 Java Commodity Grid(CoG)Kit 和 Open Grid Computing Environment(OGCE)来创建作业提交客户机,这可以帮助开发人员在 GT2、GT3 和 GT4 之间搭起一座桥。

IBM 构建的网格以及碰到的问题

网格计算就是要为那些位于不同地点、基于不同架构并且属于不同管理领域的资源实现共享。为了在全球范围内展示这些特性,并且对这个主题进行实验,IBM 创建了自己的网格。IBM Solutions Design Grid(ISDG)将位于三个大陆上的 IBM Design Center 连接起来,其目标是在全球的 IBM Design Centers for On Demand business 之间共享资源并促进协作。这个项目是与 IBM WebAhead 团队一起紧密协作开发出来的,它是网格技术的一次试验,也是网格技术的一个原型。

网格系统的一项主要功能是能够将作业提交给异构资源。这些作业是在远程服务器上运行的一些二进制可执行文件或命令。为了让这些作业能够在 ISDG 上运行,必须提前在计算节点上安装 Globus toolkit。虽然我们曾经使用过其他专有技术,但是 Globus Toolkit 是开源的,因此,我们知道,从 ISDG 中衍生的架构更易于复制。

ISDG 以在 Apache Tomcat 下运行的 Web 应用程序为基础,允许用户访问服务。用户必须先在门户上进行注册,在允许向 ISDG 注册的网格资源提交作业之前,必须至少成为一个项目的成员。有关用户、项目、资源和资源池的信息,都保存在一个 DB2® 数据库中。应用程序通过 Web 服务与中央证书机构进行通信,以获得一个 x.509 证书,用该证书在用户与网格资源之间建立安全通信。


图 1. ISDG 上的作业提交
ISDG 上的作业提交

问题:处理异构的 Globus 资源

在过去的几年中,我们已经在 ISDG 上使用过几种版本的 Globus Toolkit(GT)。最初,ISDG 只能处理 GT2 资源。在发布 GT3 之后,为了能够与技术革新保持同步,我们对 ISDG 进行了更新,允许它访问 GT2 和 GT3 资源。GT3 是以网格服务(可以将它想像成 Web 服务的一个扩展)的概念为基础的。而即将发布的版本 4(GT4)是以 Web Services Resource Framework(WSRF)标准为基础的。

ISDG 所碰到的挑战是:如何向那些运行 Globus Toolkit 版本 2 和版本 3 的网格资源提交作业,而不关心这些版本的不同。Globus Resource Allocation Manager(GRAM)是 Globus Toolkit 提供的一款工具,它是远程程序执行的核心。GRAM 允许用户向 Globus 资源提交、监控和检索作业。但是每个版本的 Globus Toolkit 都有自己的 GRAM 实现。

让我们简要介绍一下 GT2 和 GT3 的 GRAM 作业提交。

GT2 GRAM 作业提交

要使用 GT2 远程运行一个作业,需要以根用户的身份在远程资源上运行一个 GRAM 网守(服务器),监听特定的端口。当 GRAM 客户机向远程节点发送作业请求时,就开始执行程序。作业的代码在远程资源环境中必须是可执行的,作业代码以及标准输入文件、标准输出文件、远程节点上使用的名称和端口,都是在作业请求中指定的,并且都是使用 Resource Specification Language(RSL) 编写的。一旦提交了作业请求,网守(gatekeeper)就会处理这个作业请求,这会为新作业派生出一个作业管理器,用于处理新作业的执行,以及与用户的通信。


图 2. GT2 GRAM 作业提交
GT2 GRAM 作业提交
GT3 GRAM 作业提交

可以将 GT3 GRAM 视为一组启用远程作业执行的 OGSA 兼容服务,具体地说:

  • Master Managed Job Factory Service(MMJFS)会一直运行,并等待作业提交者提交请求,从而创建一个 Managed Job Service 实例。
  • Managed Job Service(MJS)对来自 MMJFS 的请求进行举例说明,专用于某一个作业提交请求。
  • File Stream Factory Service(FSFS)与网格容器是一起启动的,它等待请求,并创建一个 File Stream Service 实例。
  • File Stream Service(FSS)负责处理输入数据登台计算机,并对特殊的作业提交进行作业输出重定向。

GT3 GRAM 由两个服务容器组成:Master Hosting Environment(MHE)和 User Hosting Environment(UHE)。MHE 会负责根据网格映射文件对 GRAM 请求进行授权,并启动 UHE。然后使用一个重定向程序将请求转发给 UHE 进程,该进程将创建一个 Managed Job Service(MJS)并运行这个作业。UHE 还会通知提交客户机作业的状态,并使用 FSS 将作业输出(例如标准输出文件和标准错误)重定向到客户机。该体系结构可以防止以根用户的身份运行之类的安全问题,并且更换为使用 setuid 程序,只允许启动一个使用用户帐号提前配置的 UHE。


图 3. GT3 GRAM 作业提交
GT3 GRAM 作业提交




回页首


OGCE 如何解决这个问题

什么是 OGCE?

Global Grid Forum(GGF)的 Open Grid Services Architecture(OGSA)计划为基于 Web 服务的、面向标准服务的网格框架定义了一个基础。OGSA 允许不同供应商使用各种技术提供网格服务实现,不过这些技术能够通过与 GGF 定义的 OGSA 标准兼容来实现互操作。GGF 中的一个研究组 Grid Computing Environment(GCE)重点研究了与客户端的网格开发工具有关的问题。它提供了一组工具集和用户界面来访问网格提供的服务和功能。为了解决并满足日益增长的网格用户社区的需求,GCE 正计划构建一个框架,该框架可以为客户端的网格开发提供一组标准的接口,这个接口类似于 OGSA 为服务器端的网格开发提供的接口。这种框架称为 OGCE,它允许网格社区进行以下操作:

  • 开发客户机应用程序,这些程序可以跨多个网格后端实现进行互操作。
  • 提供可重用的代码,以支持基本网格访问模式的快速原型设计。
  • 提供一种开源的、可扩展的架构,可以根据社区的反馈,整体地、递增地构建该架构。
  • 访问使用异构技术实现的相同接口集。


图 4. OGCE
OGCE

概念

为了实现这种互操作能力,OGCE 定义了很多基本的类来实现网格框架所需的基本功能。为了简便起见,本文并没有介绍每种类的属性和功能。相反,在接下来的几节中,我们将介绍 OGCE 中最重要的实体及其语义。

Task 接口

任务提供了对以网格为中心的一些操作的抽象。它表示了网格中最小的执行单元,例如远程机器上一次任务的执行、文件传输请求、资源请求和其他类似的网格请求。网格任务由任务名称、惟一的 ID、任务类型(作业、文件传输和请求)、任务规范、安全上下文、服务上下文、应用程序的状态以及输出和错误描述组成。它还可以为自己指定任意的属性,而且具有将自身编写成 XML 规范的能力。


清单 1. Task 接口
				
package org.globus.cog.core.interfaces;
import java.util.Calendar;

public interface Task
{
    public static final int JOB_SUBMISSION = 1;
    public static final int FILE_TRANSFER = 2;
    public static final int INFORMATION_QUERY = 3;

    public void setName(String name);
    public String getName();

    public void setIdentity(Identity id);
    public Identity getIdentity();

    public void setType(int type);
    public int getType();

    public void setProvider(String provider);
    public String getProvider();

    public void setSpecification(Specification specification);
    public Specification getSpecification();

    public void setSecurityContext(SecurityContext security);
    public SecurityContext getSecurityContext();

    public void setServiceContact(ServiceContact servicecontact);
    public ServiceContact getServiceContact();

    public void setStdOutput(String output);
    public String getStdOutput();

    public void setStdError(String error);
    public String getStdError();

    public void setStatus(Status status);
    public void setStatus(int status);
    public Status getStatus();

    public void setAttribute(String name, Object value);
    public Object getAttribute(String name);

    public void addStatusListener(StatusListener listener);
    public void removeStatusListener(StatusListener listener);

    public void addOutputListener(OutputListener listener);
    public void removeOutputListener(OutputListener listener);

    public void fromXML(String task);
    public String toXML();

    public void fromString(String task);
    public String toString();

    public boolean isUnsubmitted();
    public boolean isActive();
    public boolean isCompleted();
    public boolean isSuspended();
    public boolean isFailed();
    public boolean isCanceled();
    public Calendar getSubmittedTime();
    public Calendar getCompletedTime();
}

任务规范接口

每个网格任务都有一个相关的规范接口,专门用来描述该任务的目标以及为了实现这个目标所需的环境。TaskHandler 根据在任务规范接口中指定的参数对任务进行管理。任务规范是一个通用概念,可以分为 JobSpecification、FileSpecification 和 QuerySpecification(现在还没有实现)。任务规范接口中所需的具体参数取决于执行该任务所使用的底层网格实现。例如,GT3 需要使用多个 GT2 所不支持的参数(反之亦然)。然而,规范接口提供了一些通用的属性和方法,根据任务的需要和特定网格实现,可以对这些属性和方法进行扩充,也可以忽略它们。


清单 2. 规范接口
				
// This program is distributed under the following licence:
// http://www.globus.org/toolkit/download/license.html
package org.globus.cog.core.interfaces;

public interface Specification
{
  public static final int JOB_SUBMISSION = 1;
  public static final int FILE_TRANSFER = 2;
  public static final int INFORMATION_QUERY = 3;

  public void setType(int type);
  public int getType();

public void setSpecification(String specification);
public String getSpecification();
}

JobSpecification 中提到了执行远程作业所需的所有重要属性。JobSpecification 接口提供的大部分属性都与 Globus Toolkit 支持的 RSL 中提供的属性类似。不过,还可以根据特定的需要添加其他属性。

下图显示了如何对规范接口进行扩充,以便构建 JobSpecification 接口。我们可以添加一些诸如可执行程序名和参数之类的字段,来描述将要提交的作业。在提交作业时,这些字段是直接与 RSL 信息相关的。


清单 3. 作业规范接口
				
// This program is distributed under the following license:
// http://www.globus.org/toolkit/download/license.html
package org.globus.cog.core.interfaces;

import java.util.Enumeration;

public interface JobSpecification extends Specification
{
  public void setExecutable(String executable);
  public String getExecutable();

  public void setDirectory(String directory);
  public String getDirectory();

  public void setEnvironment(String environment);
  public String getEnvironment();

  public void setArguments(String arguments);
  public String getArguments();

  public void setStdOutput(String output);
  public String getStdOutput();

  public void setStdInput(String input);
  public String getStdInput();

  public void setStdError(String error);
  public String getStdError();
    
  public void setCount(int count);
  public Integer getCount();

  public void setBatchJob(boolean bool);
  public boolean isBatchJob();

  public void setRedirected(boolean bool);
  public boolean isRedirected();

  public void setLocalExecutable(boolean bool);
  public boolean isLocalExecutable();

  public void setAttribute(String name, String value);
  public String getAttribute(String name);
  public Enumeration getAllAttributes();
}

规范接口的另外一个扩展是 FileSpecification,在这里可以找到一些将一个文件从一台机器上传输到另外一台机器所需的参数,而不是作业信息。


清单 4. 文件规范接口
				
// This program is distributed under the following license:
// http://www.globus.org/toolkit/download/license.html

package org.globus.cog.core.interfaces;

public interface FileSpecification extends Specification {
  public void setSourceServer(String server);
  public String getSourceServer();
  public void setDestinationServer(String server);
  public String getDestinationServer();
  public void setSourceDirectory(String directory);
  public String getSourceDirectory();
  public void setDestinationDirectory(String directory);
  public String getDestinationDirectory();
  public void setSourceFile(String file);
  public String getSourceFile();
  public void setDestinationFile(String file);
  public String getDestinationFile();
  public void setSource(String source);
  public String getSource();
  public void setDestination(String destination);
  public String getDestination();
  public void setBinary(boolean bool);
  public boolean isBinary();
  public void setNotpt(boolean bool);
  public boolean isNotpt();
  public void setDcau(boolean bool);
  public boolean isDcau();
  public void setBlockSize(int size);
  public int getBlockSize();
  public void setTcpBufferSize(int size);
  public int getTcpBufferSize();
  public void setParallelStreams(int value);
  public int getParallelStreams();
  public void setThirdParty(boolean bool);
  public boolean isThirdParty();
  public void setAttribute(String name, Object value);
  public Object getAttribute(String name);
}

TaskHandler 接口

在创建某项任务及其规范接口之后,必须对它们进行处理。这就是 TaskHandler 的任务。可以将 Java CoG Kit 中的 TaskHandler 视为适配器,这些适配器将任务的抽象定义转化为后端网格服务所能理解的特定于实现的构造。例如,TaskHandler 对远程作业执行和文件传输请求的处理是不同的。GT3 TaskHandler 会从任务接口中提取出适当的属性,同时生成对远程网格服务工厂的必要调用,检索网格服务的处理,并与新创建的服务实例进行交互。直观上看,TaskHandler 是特定于后端实现的,它是惟一一个为了支持其他网格实现而需要进行扩展的 OGCE 类。由于 Java CoG Kit 支持 GT2、GT3 和 SSH 实现,因此也可以使用这些产品的适当处理程序。


清单 5. TaskHandler 接口
				
// This program is distributed under the following license:
// http://www.globus.org/toolkit/download/license.html

package org.globus.cog.core.interfaces;

import java.util.Enumeration;

import org.globus.cog.core.impl.common.ActiveTaskException;
import org.globus.cog.core.impl.common.IllegalSpecException;
import org.globus.cog.core.impl.common.InvalidSecurityContextException;
import org.globus.cog.core.impl.common.InvalidServiceContactException;
import org.globus.cog.core.impl.common.TaskSubmissionException;

  public interface TaskHandler {
  public static final int GENERIC = 1;
  public static final int GT2 = 2;
  public static final int GT3 = 3;

  public void setType(int type);
  public int getType();

  public void submit(Task task) 
    throws IllegalSpecException,
           InvalidSecurityContextException,
           InvalidServiceContactException,
           TaskSubmissionException;

  public void suspend(Task task)
    throws InvalidSecurityContextException, TaskSubmissionException;

  public void resume(Task task)
    throws InvalidSecurityContextException, TaskSubmissionException;

  public void cancel(Task task)
    throws InvalidSecurityContextException, TaskSubmissionException;

  public void remove(Task task) throws ActiveTaskException;

  public Task[] getAllTasks();
  public Enumeration getActiveTasks();
  public Enumeration getFailedTasks();
  public Enumeration getCompletedTasks();
  public Enumeration getSuspendedTasks();
  public Enumeration getResumedTasks();
  public Enumeration getCanceledTasks();
}

其他接口

Status 接口

每项任务都有一个相关的执行状态,例如 unsubmitted(未提交)submitted(已提交)active(活动)suspended(挂起)resumed(恢复)failed(失败)canceled(取消)completed(完成)。并非每个网格实现都支持所有的状态。例如,在有些网格实现中,可能并不支持远程执行的挂起和恢复状态。将一个简单的任务与某一个状态相关联是很简单的。最初,任务处于 unsubmitted 状态;当它被某个处理程序处理时,状态变为 submitted;然后,在远程执行期间,其状态变为 active,等等。客户机程序通过监听程序方法来保持作业进度获得更新。上面介绍的任务接口拥有一些处理状态监听程序的方法,这些方法分别是 addStatusListener(StatusListener listener)removeStatusListener(StatusListener listener)。客户机程序需要实现 StatusListener 来检索状态,这可以通过提供一个 statusChanged() 方法来实现。只要作业状态发生变化,这种方法就会自动被调用,并作为一个参数检索新的状态。


清单 6. Status 接口
				
// This program is distributed under the following license:
// http://www.globus.org/toolkit/download/license.html

package org.globus.cog.core.interfaces;

import java.util.Calendar;

  public interface Status {
  public static final int UNSUBMITTED = 0;
  public static final int SUBMITTED = 1;
  public static final int ACTIVE = 2;
  public static final int SUSPENDED = 3;
  public static final int RESUMED = 4;
  public static final int FAILED = 5;
  public static final int CANCELED = 6;
  public static final int COMPLETED = 7;

  public abstract void setStatus(int status);
  public abstract int getStatus();
  public abstract void setPrevStatus(int status);
  public abstract int getPrevStatus();
  public abstract void setException(Exception exception);
  public abstract Exception getException();
  public abstract void setMessage(String message);
  public abstract String getMessage();
  public void setTime(Calendar time);
  public Calendar getTime();
}

TaskGraph

除了执行单项任务之外,OGCE 还提供了 TaskGraph 的概念。TaskGraph 是用来表示任务间的复杂依赖关系的一个构造块。所有特别高级的应用程序都需要一些机制来执行客户端工作流,这些工作流根据用户定义的依赖关系来处理任务。因此,表示 TaskGraph 的数据结构汇集了一组 ExecutableObjects(任务和 TaskGraph)对象,并允许用户定义这些任务之间的依赖关系。从理论上说,TaskGraph 可以包含无数的层次结构。但它实际上受到特定系统上可用资源(内存)的限制。





回页首


Globus Java CoG Kit 中的 OGCE 实现

Java CoG Kit 的当前版本提供了对 GT2、GT3 和 SSH 实现的支持。其他一些平台也将受到支持,这取决于可用的资源。下图显示了目前实现的一些类的结构。它们被封装在 org.globus.cog.core.impl 包中。


图 5. OGCE 实现
OGCE 实现

GT2 任务实现

GT2 的实现包括用于作业提交和文件传输的任务处理程序。GT2 的作业提交 TaskHandler 是以 GT2 GRAM 的客户机 API 为基础,这些 API 是随 Java CoG Kit 一起提供的,位于包 org.globus.gram.GramJob 中,此外,这些任务处理程序使用 GT2 RSL 格式来描述作业参数。GT2 RSL 的核心语法是将属性名与值关联的关系。GT2 的 FileTransferTaskHandler 使用 url-copy 将 sourceURL 中列出的文件复制到 destURL 指定的位置,使用 Global Access to Secondary Storage(GASS)传输 API。

GT3 任务实现

在 GT3 中,OGCE 也为作业提交和文件传输提供了处理程序。正如前面所介绍的,GT3 作业提交使用 ManagedJob 服务将作业提交给目标资源。GT3 的 JobSubmissionTaskHandler 使用 Java CoG Kit 的 org.globus.ogsa.impl.base.gram.client 包中提供的 GramJob 类。GT3 中使用的 RSL 格式与 GT2 中使用的 RSL 格式类似,但是它遵守 XML 的语法。GT3 的文件传输 TaskHandler 是以 Reliable File Transfer Service(RFT)为基础,RTF 是一个基于 OGSA 的服务,它为控制和监视第三方使用 GridFTP 服务器进行文件传输的进程提供了一些接口。控制文件传输的客户机位于网格服务内部,这样就可以使用软件状态模型进行管理,并使用在所有网格服务中都可以使用的 ServiceData 接口进行查询。

SSH 任务实现

OGCE 还提供了一个基于 SSH 的 JobSubmissionTaskHandler 实现。OGCE 还使用了 SSHTools 包,这是一组 Java SSH 应用程序,其中包括 Java SSH API、SSH Terminal、SSH secured VNC 客户机、SFTP 客户机和 SSH 守护进程。使用 OGCE SSHTask,客户机程序可以连接到任何 SSH 服务器上,并在远程主机上执行命令。下图显示了 SSHTask 的用法。


清单 7. 使用 SSHTask
				
package org.globus.cog.core.examples.ssh;

import org.globus.cog.core.impl.common.StatusEvent;
import org.globus.cog.core.impl.common.task.IllegalSpecException;
import org.globus.cog.core.impl.common.task.InvalidSecurityContextException;
import org.globus.cog.core.impl.common.task.InvalidServiceContactException;
import org.globus.cog.core.impl.common.task.JobSpecificationImpl;
import org.globus.cog.core.impl.common.task.SecurityContextImpl;
import org.globus.cog.core.impl.common.task.ServiceContactImpl;
import org.globus.cog.core.impl.common.task.TaskImpl;
import org.globus.cog.core.impl.common.task.TaskSubmissionException;
import org.globus.cog.core.impl.ssh.TaskHandlerImpl;
import org.globus.cog.core.interfaces.JobSpecification;
import org.globus.cog.core.interfaces.ServiceContact;
import org.globus.cog.core.interfaces.Status;
import org.globus.cog.core.interfaces.StatusListener;
import org.globus.cog.core.interfaces.Task;
import org.globus.cog.core.interfaces.TaskHandler;

public class JobSubmission implements StatusListener {
  private Task task;

  public JobSubmission() {
     prepareTask();
     submitTask();

     boolean done = false;
     while (!done) {
        //check status every seconds
        try {
           Thread.sleep(1000);
        }
        catch (Exception e) {
           e.printStackStrace();
        }
        String status = task.getStatus().getStatusString();
        if (status.equalsIgnoreCase("completed")) {
           System.out.println("ssh stdout: " + task.getStdOutput());
           done = true;
        }
        else if (status.equalsIgnoreCase("failed")) {
           System.out.println(ssh sterr: " + task.getStdError());
           done = true;
        }
     }
  }

  private void prepareTask() {
     this.task = new TaskImpl("mySSHTask", Task.JOB_SUBMISSION);
     JobSpecification spec = new JobSpecificationImpl();
     spec.setExecutable("/bin/ls -al");
     spec.setRedirected(true);
     this.task.setSpecification(spec);

     SecurityContextImpl securityContext = new SecurityContextImpl();
     securityContext.setAttribute("ssh-username", "valid user name");
     securityContext.setAttribute("ssh-password", "valid user password");
     this.task.setSecurityContext(securityContext);
     ServiceContact service = new ServiceContactImpl();
     service.setIP("a valid ssh server IP address");
     this.task.setServiceContact(service);
     this.task.addStatusListener(this);
  }

  private void submitTask() {
     TaskHandler handler = new TaskHandlerImpl();
     try {
        handler.submit(this.task);
     }
     catch (InvalidSecurityContextException ise) {
        ise.printStackTrace();
        System.exit(1);
     }
     catch (TaskSubmissionException tse) {
        tse.printStackTrace();
        System.exit(1);
     }
     catch (IllegalSpecException ispe) {
        ispe.printStackTrace();
        System.exit(1);
     }
     catch (InvalidServiceContactException isce) {
        isce.printStackTrace();
        System.exit(1);
     }
  }

  public void statusChanged(StatusEvent event) {
     Status status = event.getStatus();
     System.out.println("Status changed to " + status.getStatusString());
  }

  public static void main(String arg[]) {
     new JobSubmission();
  }
}





回页首


在 IBM Solutions Design Grid Web 应用程序中使用 OGCE

获取作业参数

IBM Solutions Design Grid Web 应用程序中包括一组 JSP,这些 JSP 提供了指定作业参数的可能。


图 6. 作业参数
作业参数

完成这个过程之后,提交者就可以为作业选择 GT2 或 GT3 目标资源,二者没有任何区别。


图 7. 网格资源的选择
网格资源的选择

创建任务

使用上面的 JSP 提供作业参数之后,用户就可以单击按钮来提交任务,这会触发对 IBM Solutions Design Grid Web 应用程序中的 servlet 的调用。这个 servlet 使用了一个 JobSubmitter 类(充当创建 Task 的客户机程序)为任务准备 JobSpecification,然后使用 TaskHandler 来提交任务。当调用这个 servlet 时,控制权就交给 submitOgceJob 方法,该方法接收一个 JobInfo 对象作为参数,并创建 TaskHandlerJobSubmitter 对象。JobInfo 是一个 JavaBean 对象,它包含用于作业的所有输入参数,以及在作业提交期间收集到的其他所有信息。

注意:下面的代码示例都使用了 T 类,这是一个与 log4j 类似的日志系统。它提供了一些有用的特性来跟踪 Java 方法调用流,以便更好地解释在运行时发生了什么。


清单 8. 创建任务
				
/****************************************************
 * 
 * Submit O G C E job.
 * 
 ****************************************************/
private void submitOgceJob(JobInfo job)
	throws
		IllegalSpecException,
		InvalidSecurityContextException,
		InvalidServiceContactException,
		TaskSubmissionException {

	T.logEntry();

	TaskHandler taskHandler = new GenericTaskHandler();
	JobSubmitter submitter = new JobSubmitter();

	//Retrieve target resource type from our database (could be GT2/GT3)
	GridResource resource =
		Db.getResourceInfoByName(job.getTargetResource());
	if (resource == null) {
		T.log('e', "RESOURCE: " + job.getTargetResource() + " NOT FOUND");
		return;
	}
	job.setTargetResourceType(resource.getType());

	// Create OGCE task 		
	Task task = submitter.createTask(user, job);

	//Save job id in our JobInfo object
	long id = task.getIdentity().getValue();
	job.setGlobusJobID(String.valueOf(id));

	// Save the job submit time in our JobInfo object
	String submitTime = String.valueOf(System.currentTimeMillis());
	job.setStartTime(submitTime);

	// Submit the job
	T.log('i', "Submitting job");
	submitter.submitTask(task, taskHandler);

	// Get the status just after submit
	String status = subDb.getJobStatus(job.getGlobusJobID());
	job.setStatus(status);

	T.logExit();
	return;
}

JobInfo 对象的属性之一是目标资源主机名。根据这条信息,Web 应用可以从数据库中检索出资源的类型(GT2 或 GT3),所有的网格资源事先都已经在这个数据库中注册,如下所示:

GridResource resource =
	Db.getResourceInfoByName(job.getTargetResource());

现在,servlet 就有了足够的信息来创建适当的 TaskTaskSpecification,并使用通过下面这几行代码调用的 JobSubmitter 来提交作业:
	Task task = submitter.createTask(user, job);
	submitter.submitTask(task, taskHandler);

在提交作业的第一个阶段中,该作业的启动时间被存储在 JobInfo 对象中。

JobSubmitter

JobSubmitter 类是作业提交者客户机程序的核心。它拥有一些创建任务的方法、为任务设置 JobSpecification 的方法、提交作业的方法,以及用来一直监听已提交作业的状态的方法。下面这段伪码对 JobSubmitter 类提供的特性进行了高层概述。


清单 9. JobSubmitter 类概述
				
package com.ibm.gdc.gt3.jobs;

/**
 * Submit a job to a Grid resource using OGCE classes
 * provided by the Java CogKit 2.0
 * @author Montpellier Pssc
 */
public class JobSubmitter implements StatusListener {

   /**
    * Create Task
    */
   public Task createTask(ISDGUser user, JobInfo job) {
      1. Create a JobSubmission task object
      2. According to the target resource type, call 
      prepareGT2Task() or prepareGT3Task() method 
      to set task specification.
      3. Return the created task object
   }

   /**
    * Set GT3 Task specifications
    */
   private void prepareGT3Task(Task task, JobInfo job, ISDGUser user) {
      1. Create a TaskSpecification object
      2. Set Task specification values like executable name and parameters.
      3. Set GT3 Task security context
      4. Set GT3 Task contact (MasterForkManagedJob grid service reference)
      5. Add Task status listener
   }

   /**
    * Set GT2 Task specifications
    */
   private void prepareGT2Task(Task task, JobInfo job, ISDGUser user) {
      1. Create a TaskSpecification object
      2. Set Task specification values like executable name and parameters.
      3. Set GT2 Task security context
      4. Set GT2 Task contact (Target resource host name)
      5. Add Task status listener
   }

   /**
    * Submit GT2/GT3 task
    */
   public void submitTask(Task task, TaskHandler handler) {
      1. Use the TaskHandler to submit the job (This is common to GT2 and GT3)
      2. Handle exceptions.
   }

   /**
    * Convert GlobusCredential to GSSCredential
    */
   private GSSCredential globus2gssCredential(ISDGUser user) {
      1. Utility method to convert credential from GT2 to GT3 format
      2. Return GT3 credential (GSSCredential object)
   }

   /**
    * Task status listener
    * (This method is mandatory because the JobSubmitter class implements StatusListener.)
    */
   public void statusChanged(StatusEvent event) {
      1. Receive the new status event and retrieve the calling Task.
      2. Save the job status in the database
      3. If the status is COMPLETED, retrieve the job 
      output from the Task and save it in the database.
   }

}

准备任务和规范

现在,让我们来看一下提交 GT3 作业时在 JobSubmitter 中发生了什么。createTask() 方法接收两个参数:ISDGUser 和 JobInfo。前者包含有关用户的信息,这些信息保存在 HTTP 会话中,因此,在整个 HTTP 请求过程中都可以访问。用户信息包含提交作业所需的用户名、名字、电子邮件地址、代理证书(凭证)等。后者包含了有关已提交作业的数据。正如下图所描述的那样,createTask 方法充当了逻辑开关,根据目标资源的类型来创建和准备一个 GT2 或 GT3 任务规范。

public Task createTask(ISDGUser user, JobInfo job) {
   T.logEntry();

   // Create a job submission task object
   Task task = new TaskImpl(job.getJobName(), Task.JOB_SUBMISSION);

   if (job.getTargetResourceType().equalsIgnoreCase("GT2")) {
      prepareGT2Task(task, job, user);
   }
   else if (job.getTargetResourceType().equalsIgnoreCase("GT3")) {
      prepareGT3Task(task, job, user);
   }
   else {
      task = null;
      T.log('e', "Invalid Task Provider: " + job.getTargetResourceType());
   }
   T.logExit();
   return task;
}

下面这段代码在资源类型是 GT3 时才会被执行。注释解释了每个准备步骤在做些什么。


清单 10. 将 GT3 JobSpecification 附加到一项任务上
				
/**
 * GT3 Task initialization
 * Prepare task using received job and user data.
 */
private void prepareGT3Task(Task task, JobInfo job, ISDGUser user) {
   T.logEntry();
   
   // Set the service provider type for this task. This is used at
   // submission time by the generic task handler to determine the class 
   // name of the TaskHandler to be loaded.
   task.setProvider("GT3");
   
   // Create a Specification object for the task
   JobSpecification spec = new JobSpecificationImpl();
   
   T.log('d', "Setting GT3 task specification");
   // Specify the command to be executed on the Grid resource.
   // The received job object contains all job information
   // entered by the user.
   spec.setExecutable(job.getCmdName());
   T.log('d', "Task executable: [" + spec.getExecutable() + "]");
   
   // Set location of the executable.
   // If the executable resides on the client machine, it will be 
   // automatically staged from the client machine to the Grid resource.
   if (job.getStageExecutable() != null) {
      if (job.getStageExecutable().equalsIgnoreCase("true")) {
         spec.setLocalExecutable(true);
       }
   }
   
   // Set the arguments (if any) for the executable.
   if (job.getCmdArgs() != null && job.getCmdArgs().length() > 0) {
      spec.setArguments(job.getCmdArgs());
      T.log('d', "Task arguments: [" + spec.getArguments() + "]");
   }
   // Specify the Grid resource working directory in which
   // the command is to be executed.
   if (job.getCmdDir() != null && job.getCmdDir().length()> 0) {
      spec.setDirectory(job.getCmdDir());
      T.log('d', "Task directory: [" + spec.getDirectory() + "]");
   }
   
   // Specify environment variable to be set prior to the
   // execution of the command
   if (job.getCmdEnv() != null && job.getCmdEnv().length() > 0) {
      spec.setEnvironment(job.getCmdEnv());
      T.log('d', "Task environment: [" + spec.getEnvironment() + "]");
   }
   
   // Specify the name of the file from which the input must be retrieved.
   if (job.getStdIn() != null) {
      if (job.getStdIn().length() > 0) {
         spec.setStdInput(job.getStdIn());
      }
   }
   
   // Specify the name of the file where the output must be redirected to. 
   // If left blank or not specified, the output is streamed to the 
   // standard output.
   if (job.getStdOut() != null) {
      if (job.getStdOut().length() > 0) {
         spec.setStdOutput(job.getStdOut());
      }
   }
   
   // Specify the name of the file where the error messages must be 
   // redirected to. If left blank, the errors are streamed to the 
   // standard error.
   if (job.getStdErr() != null) {
      if (job.getStdErr().length() > 0) {
         spec.setStdError(job.getStdErr());
      }
   }
   
   //Specify if the output and error streams are to be redirected
   //to the client. (This field is set only if both stdOut and stdErr
   // have not been set by the user)
   if ((spec.getStdOutput() == null || spec.getStdOutput().length() == 0)) {
      spec.setRedirected(true);
   }
   else {
      spec.setRedirected(false);
   }
   
   // Specify if the remote execution is to occur in batch mode.
   // If true, the client will not be notified of any status changes 
   // on the server side. It is the responsibility of the client to 
   // obtain the final output from the server by some
   // asynchronous mechanism. 
   // If false, the client will interactively receive status notifications
   // from the server as well as the final output or error.
   spec.setBatchJob(
       (job.getBatch().equalsIgnoreCase("true")) ? true : false);
   T.log('d', "Task batch: [" + spec.isBatchJob() + "]");
   if (spec.isBatchJob()) {
      spec.setRedirected(false);
   }
   T.log('d', "Task redirected: [" + spec.isRedirected() + "]");
   
   // Attach above specifications to the task
   task.setSpecification(spec);
   
   // Convert GlobusCredential (the user proxy) to GSSCredential
   // The user proxy is part of the received ISDGUser object.
   T.log('d', "Converting user proxy to GSSCredential");
   GSSCredential cred = globus2gssCredential(user);
   
   // Set the Task security context
   T.log('d', "Setting GT3 task security context");
   org.globus.cog.core.impl.gt3.GlobusSecurityContextImpl securityContext =
   	new org.globus.cog.core.impl.gt3.GlobusSecurityContextImpl();
   securityContext.setCredentials(cred);
   task.setSecurityContext(securityContext);
   
   // Set the task contact. In GT3, it is the URL of the MasterForkJobFactoryService
   T.log('d', "Setting GT3 task contact");
   String serviceName =
        job.getTargetResource()
                + ":"
                + 8080
                + "/ogsa/services/base/gram/MasterForkManagedJobFactoryService";
   
   ServiceContact service = new ServiceContactImpl(serviceName);
   task.setServiceContact(service);
   
   // Add the status listener. The JobSubmitter itself (this) is specified as
   // status listener because it provides a statusChanged() method.
   T.log('d', "Adding  GT3 task status listener");
   task.addStatusListener(this);
   T.logExit();
  }

现在,让我们考虑一下这种情况:如果想将作业提交给 GT2 资源,而不是提交给 GT3 资源,会有什么区别。实际上,只有极少的一些东西会发生变化。在该例中,将调用 prepareGT2Task() 方法为 GT2 任务设置适当的规范。以下列表总结了上述代码中需要修改的地方。


清单 11. 设置 GT2 TaskSpecification
				
1. The provider name must be changed in order 
to use the right TaskHandler class at submission time.

   // Set the service provider type for this task
   task.setProvider("GT2");

2. The security context has a different type than GT3.

   // Set the task security context
   org.globus.cog.core.impl.gt2.GlobusSecurityContextImpl securityContext =
             new org.globus.cog.core.impl.gt2.GlobusSecurityContextImpl();
   securityContext.setCredentials(cred);
   task.setSecurityContext(securityContext);

3. With GT2, the contact is the host name of 
the target resource that the job is submitted to. 
This information exists in the JobInfo object (job) received as parameter. 

   // Set the task contact
   String serviceName = job.getTargetResource();
   ServiceContact service = new ServiceContactImpl(serviceName);
   task.setServiceContact(service);

4. All other task specification, executable name, 
parameters, etc. are set the same way as for GT3.

提交任务

JobSubmittersubmitTask() 方法使用前面已经准备好的任务和 TaskHandler 来提交任务。该方法对于 GT2 和 GT3 来说是通用的。


清单 12. 提交任务
				
/**
 * Submit GT2/GT3 task
 */
public void submitTask(Task task, TaskHandler handler) {
   T.logEntry();
   try {
      T.log('d', "Calling Taskhandler to submit the task");
      handler.submit(task);
   }
   catch (InvalidSecurityContextException ise) {
      T.log('d', "Security Exception");
      T.logException('e', ise);
   }
   catch (TaskSubmissionException tse) {
      T.log('d', "TaskSubmission Exception");
      T.logException('e', tse);
   }
   catch (IllegalSpecException ispe) {
      T.log('d', "Specification Exception");
      T.logException('e', ispe);
   }
   catch (InvalidServiceContactException isce) {
      T.log('d', "Service Contact Exception");
      T.logException('e', isce);
   }
   catch (Exception ex) {
      T.log('d', "Exception while submitting job");
      T.logException('e', ex);
   }
   T.logExit();
   }

作业状态的处理

正如在 JobSubmitter 类的高级描述中所看到的,statusChanged() 方法用来接收并处理作业的状态。在 GT3 中,一直跟踪作业的状态并不是一项简单任务。任务的状态通过网格服务分发给客户机程序(JobSubmitter 类),这个网格服务必须在网格服务容器中部署并运行(详细信息请参阅下文)。在这一节中,我们将来看一下最终接收作业状态的 statusChanged() 方法的代码。这个方法从检索到状态值的地方获得一个 StatusEvent 对象,并获得与这个状态相关的任务。通过将任务 ID 用作主键,该方法可以在数据库中更新作业状态。当检索到的状态是 COMPLETED 时,就从该任务中提取作业输出,并将其保存到数据库中。


清单 13. 处理作业状态
				
/**
 * 
 * Task status listener
 * 
 */
 public void statusChanged(StatusEvent event) {
   T.logEntry();
   Status status = event.getStatus();
   Task task = event.getSource();
   long id = task.getIdentity().getValue();
   switch (status.getStatus()) {
      case 0 :
         T.log('d', "Status changed to " + Const.UNSUBMITTED);
         Db.setJobStatus(String.valueOf(id), Const.UNSUBMITTED);
      break;
      case 1 :
         T.log('d', "Status changed to " + Const.SUBMITTED);
         Db.setJobStatus(String.valueOf(id), Const.SUBMITTED);
         break;
      case 2 :
         T.log('d', "Status changed to " + Const.ACTIVE);
         Db.setJobStatus(String.valueOf(id), Const.ACTIVE);
         break;
      case 3 :
         T.log('d', "Status changed to " + Const.SUSPENDED);
         Db.setJobStatus(String.valueOf(id), Const.SUSPENDED);
         break;
      case 4 :
         T.log('d', "Status changed to " + Const.RESUMED);
         Db.setJobStatus(String.valueOf(id), Const.RESUMED);
         break;
      case 5 :
         T.log('d', "Status changed to " + Const.FAILED);
         Db.setJobStatus(String.valueOf(id), Const.FAILED);
         break;
      case 6 :
         T.log('d', "Status changed to " + Const.CANCELED);
         Db.setJobStatus(String.valueOf(id), Const.CANCELED);
         break;
      case 7 :
         T.log('d', "Status changed to COMPLETED");
         Db.setJobStatus(String.valueOf(id), Const.COMPLETED);
         if (task.getStdOutput() != null)
            if (task.getStdOutput().length() > 0)
               Db.setJobOutput(String.valueOf(id), task.getStdOutput());
         break;
      default :
         T.log('d', "Status changed to INVALID VALUE !!");
         break;
   }
   T.logExit();
}

GT3 中的作业状态通知是在网格服务之间发生的。通知源服务会向一个或多个订阅该源服务的目标服务发送通知。GT3 通知与服务数据有着非常紧密的关系。更确切地说,接收通知的服务并没有订阅整个服务,而是只订阅源服务中的特定 Service Data Element(SDE)。下图显示了 MasterForkManagedJobFactoryService 如何使用 SDE 来表示作业的状态、客户机如何订阅它,以及如何发送通知。


图 8. GT3 状态通知
GT3 状态通知

GT3 OGCE JobSubmissionTaskHandler 使用随 Java CoG Kit 提供的 GramJob 来处理作业提交。这个类位于 org.globus.ogsa.impl.base.gram.client 包中,是 GT3 作业提交过程的关键元素。要提交这个任务,TaskHandler 需要创建一个 GramJob 实例,并调用请求方法。然后,GramJob 对象要遵循以下算法,来启动任务并接收任务通知:

  1. 创建远程服务 —— 在目标资源上获得对 MasterManagedJobFactory Service 的引用,并请求这个工厂创建一个 ManagedJob Service 实例来运行任务。ManagedJob Service 有一个 SDE,这个 SDE 将与作业状态一起不断被更新。
  2. 订阅状态通知 —— 在创建这个服务时,可以调用 GramJob 类的 bind() 方法来订阅这个状态 SDE。这是一个相当复杂的过程,包括以下步骤:
    • 创建服务容器 —— 该服务容器内嵌在运行 Axis servlet 的 HTTP 服务器中,处理通知所需的网格服务就是在这里部署的。
    • 创建并部署 NotificationSink 服务 —— 这是特定的网格服务,将调用该服务来接收来自远程资源的作业状态通告。
  3. 在远程资源上启动作业 —— 在客户端,所有东西都是为接收通知而建立的。GramJob 类通过调用远程 ManagedJobService 实例的 start() 方法来启动作业。
  4. 接收通知 —— 当目标资源上的作业状态发生变化时,NotificationSink 网格服务就会接收到一个调用,其中包含了实际的状态服务数据。然后,要调用 GramJob 对象的 deliverNotification() 方法来提取状态值,并将其传递给 JobTaskHandler,后者将调用 JobSubmitter 类的 statusChanged() 方法。

作业输出处理

作业输出使用了一个与状态通知不同的通信通道。Remote File Stream 服务使用 Global Access to Secondary Storage(GASS)来传输作业输出和错误。GASS 服务器在客户端运行,并利用远程服务处理数据传输。如果客户机接收来自服务器的输出,那么还需要使用 GASS 服务器。很明显,这不适合批处理模式的作业提交。

加以注释的 GT3 作业提交流程

本文的这个部分将展示在作业提交给 GT3 资源时,究竟会发生什么。下图是一个 T 类提供的跟踪文件的详细解释。我们在其中添加了一些注释,以解释使用 OGCE 类提交 GT3 作业过程中的一些主要步骤。这个跟踪文件的左列显示了当前时间,之后是一个使用方括号括起来的数字,该数字表示从上一次操作到现在经过的时间间隔(以毫秒为单位)。这说明了作业提交过程中每个步骤的持续时间。下一列是目前运行的线程的名称,最后一列是日志数据本身。T 类还显示了进入和退出 Java 方法调用的嵌套条目。


图 9. GT3 作业提交跟踪
GT3 作业提交跟踪

图 9. GT3 作业提交跟踪
GT3 作业提交跟踪




回页首


结束语

在本文中,我们展示了一组与用来向网格提交作业有关的简化机制,以及基于 Java 和 OGCE 来实现这些机制的一种方法。我们的目标是要共享一种创建作业提交客户机的灵活方法。您可以从上面介绍的代码中得到一些灵感,根据自己的需要开发新的 TaskHandler,同时确保它与其他实现的兼容性。当然,完整的作业提交客户机还应该能够处理基本作业提交系统的其他方面,比如安全性、认证、监视,等等,这些超出了本文的范围。有关这些问题的更多信息,请参阅 Globus Toolkit 服务,例如 OGSIhandleResolver、SecureContextEstablishmentService 和 AuthenticationService。注意,所有的网格服务调用都使用带有 XML 签名的 GSI Authentication。这种基于证书的加密机制是随 GSI 一起提供的,它为网格服务提供了身份验证、识别和一些不可否认的特性。有了这些知识,网格应用开发人员就可以更好地了解使用 OGCE 的内部作业提交步骤,并可以显著提高自己的最初开发速度。



参考资料



作者简介

Jean-Luc Lepesant

Jean-Luc Lepesant 是 IBM Design Center for On Demand Business 的一名高级开发人员,该中心位于法国蒙彼利埃的 Product and Solutions Support Center(PSSC)。Jean-Luc Lepesant 在应用开发方面拥有超过 25 年的经验。他所擅长的领域包括数据库管理、Web 应用和网格计算。他还为 IBM Globus Toolkit 3.0 Quick Start 的代码编写做出了很大的贡献。


Sebastien Fibra

Sébastien Fibra 是 Product and Solutions Support Center(PSSC)中的 IBM Design Center for On Demand Business 的一位 IT 专家。先前,他是巴黎的一家 Web 代理机构的资深技术顾问,于 2003 年加入 IBM,目前在法国蒙彼利埃领导着 Automotive Engineering Innovation Framework environment for EMEA 的部署工作。他还为 IBM 的 Products & Services for Grid Computing 的代码编写做出了很大的贡献,并拥有法国一家出名的工程学校的计算机科学/应用数学的硕士学位。




对本文的评价

太差! (1)
需提高 (2)
一般;尚可 (3)
好文章 (4)
真棒!(5)

建议?







回页首


IBM 公司保留在 developerWorks 网站上发表的内容的著作权。未经IBM公司或原始作者的书面明确许可,请勿转载。如果您希望转载,请通过 提交转载请求表单 联系我们的编辑团队。
    关于 IBM 隐私条约 联系 IBM 使用条款