内容


IBM Extreme Transaction Processing (XTP) 模式

通过 WebSphere eXtreme Scale 进行可伸缩快速异步处理

Comments

系列内容:

此内容是该系列 # 部分中的第 # 部分: IBM Extreme Transaction Processing (XTP) 模式

敬请期待该系列的后续内容。

此内容是该系列的一部分:IBM Extreme Transaction Processing (XTP) 模式

敬请期待该系列的后续内容。

简介

即使是最心不在焉的技术观察者也一定会注意到向某些系统和服务的迁移,这些系统和服务能够使用比以前更多的数据源执行越来越复杂的任务,所有这些工作甚至只需一个更简便易用的界面即可完成。混搭(mash-up) 是为表达这个概念而创造的术语。例如,提供综合详细的街道地图就是一个好主意。再向这些街道地图添加当前交通数据就更好了。还要加上带有用户评级的最近午餐地点?现在,这样的地图 真是很酷!

不管这种系统或服务是针对一般消费者还是位于一个金融交易系统的大规模网格内,这种趋势都是显而易见的。基于更多数据源更快地制定更多决策对于前沿系统很重要。幸运的是,我们开始了解解决这类问题需要采用什么方法,推动这种变革的最佳方法之一就是从同步、线性的处理迁移到响应性更好的异步处理。这些异步模式对于集成来自完全不同的、可能的外部数据源的信息尤其有用。这种异步机制还向用户提供可感知的响应性,并向整个系统提供最佳总体流量。通过将这些任务委托给一个可伸缩处理基础设施内的异步请求,可以实现更优化、更高效的系统利用率。

依赖外部信息源时,重要的是要分离各个独立的应用程序层并将它们更松散地链接起来,以便分别解决每一层的伸缩性问题。这不仅是一个可以轻松解决的简单问题,而且,当数据源出现延迟或停用时,客户端不会出现没有响应或功能无效的混乱状态。这种分离添加了对工作任务进行按需卸载、重试和排序的能力。

一个扩大的环境中的异步处理的陷阱与在等待没有响应或临时超载的基础设施组件时耗尽资源有关。在一个同步场景中,一个缓慢响应的外部资源或完全利用的连接池将捆绑上游线程,直到该瓶颈得到缓解。这种效果将一直持续上溯到客户体验,因为每个连续的资源都被捆绑起来以等待瓶颈得到缓解。

借助异步模型,队列本身可以充当一个减压阀。缓慢的组件在队列后面隔离,最大的效果是请求在队列中累积起来,但该组件最终也会跟上来。即使不是这样,也只有依赖这个缓慢的资源的服务受到影响。这种分离对整个系统有一个影响,即阻止在扩大过程中产生瓶颈,这仅仅归功于这样一个事实:每个资源都能独立操作。

WebSphere eXtreme Scale 和异步处理

尽管 IBM WebSphere eXtreme Scale 最常见、最广泛的用途是作为一个弹性网格,但 WebSphere eXtreme Scale 有一些关键特性,使其成为适合帮助解决异步问题的一个主要候选工具。其中一个特性是能够将一个处理代理部署到一个网格中,并指引该网格在被发送到一个特殊地图中的每个实体上调用这个 “代理”。我们利用了这种能力并将其一般化为一组函数,这组函数主要用于将一个 POJO 实体转化为一条 “消息”。您需要为代理提供代码,作为您愿意异步执行的业务逻辑。

这个框架是 WebSphere eXtreme Scale 异步服务框架,该框架是一个库,可以在您的应用程序中通过 WebSphere eXtreme Scale 来利用,向您提供以一种可靠的、可伸缩的方式异步处理工作的能力。由于这个框架实际上是一个使用标准 WebSphere eXtreme Scale 功能来执行这个功能的库或应用程序(就像任意 WebSphere eXtreme Scale 部署一样),我们将把这个框架称为一个样例。但是,这个样例的目的是隐藏细节并提供一个直观的界面,以帮助您轻松部署这种异步处理。

在成熟的框架(比如 JMS 和其他消息传递基础设施)上使用 WebSphere eXtreme Scale 来进行异步处理有几个好处:

  • 简单性:WebSphere eXtreme Scale 和这个异步服务框架可以作为几个 JAR 文件部署到一个 Java™ 环境的类路径中。
  • 可伸缩性:异步工作的处理在网格节点本身上发生。这可以是发出请求的 JVM,或者 —— 甚至更好 —— 可以是已部署的数千个网格分区之一。WebSphere eXtreme Scale 的灵活性很高,如果您需要处理过程速度更快,只需将更多 JVMs 部署到处理消息的网格上。这就是我们所说的弹性特征,这使得异步服务成为一个弹性队列。
  • 可靠性:就像 WebSphere eXtreme Scale 复制和处理应用程序对象那样,同样的功能也被应用到网格中的异步消息。假如您使用的是一个充分部署和复制的网格,就可以使用这个处理系统消除单一的故障点。

上述大多数好处都可以通过其他异步框架以某种形式实现,但通常需要更大的成本和更陡的学习曲线。

图 1. WebSphere eXtreme Scale 异步服务框架架构
图 1. WebSphere eXtreme Scale 异步服务框架架构
图 1. WebSphere eXtreme Scale 异步服务框架架构

一个示例

我们将通过实现一个简单的应用程序 “Hello World” 来展示 WebSphere eXtreme Scale 异步服务框架的简单性。这个示例将把一条消息发送到一个网格中,识别已指定哪个网格分区来处理该消息,并将该信息返回消息发送者。尽管这是一个简单示例,但您将会看到核心概念的实际应用,以便您今后将它们应用到各种应用程序。

图 2. 该框架的开发人员视图
图 2. 该框架的开发人员视图
图 2. 该框架的开发人员视图

消息 POJO

首先,您需要定义消息本身,以及将被处理的业务逻辑。操作方法是创建一个实现这个 “可序列化” 接口的 POJO。这个接口表明:该对象可以作为一个参数通过一个远程方法调用发送,这是插入到 WebSphere eXtreme Scale 网格中的任意对象的基本要求。

清单 1. 样例消息对象
package com.ibm.websphere.asyncservice.test;

import java.io.Serializable;

public class TestMessage implements Serializable {


  private static final long serialVersionUID = 7894711850787375307L;

  public int id;

  public TestMessage(int id){
    this.id = id;
  }

}

这个对象充当在网格中接收到消息时需要处理的工作的数据容器。在本例中,您将一直使用一个单一的 Integer 字段,该字段将被用作一个惟一标识符并允许您穿过网格跟踪消息的进度。

当这样一个实例插入到网格中时,接收消息的网格分区将通过专用于处理这些请求的本地线程池使用该实例中的数据来执行消息监听器中的逻辑。我们将这个线程池称为一个处理单元,每个 shard 中托管一个线程池。这个处理单元的配置可以通过使用 Spring 框架(将在下一篇文章中介绍)来实现。默认情况下,每个处理单元使用三个线程,由于这个配置将复制到每个分区,该配置对许多类型的应用程序都比较理想。

监听器(MessageProcessor)

要实现这个消息处理逻辑,只需创建一个实现 com.ibm.websphere.objectgrid.asyncservice.MessageProcessor 接口的类,而这只需实现一个名为 onMessage 的方法。该方法接收三个参数:一个惟一的消息 ID,一个对消息发送到的网格的引用,以及作为一个对象的消息本身。清单 2 展示了相应的代码。

清单 2. 样例消息处理器
package com.ibm.websphere.asyncservice.test;

import java.io.Serializable;

import com.ibm.websphere.objectgrid.Session;
import com.ibm.websphere.objectgrid.asyncservice.MessageProcessor;

public class TestProcessor implements MessageProcessor<Integer> {

  public Integer onMessage(Session session, String msgId, Serializable msg){

    int partitionId = session.getObjectGrid().getMap("Queue").getPartitionId();
    TestMessage testMsg = (TestMessage) msg;
    System.out.println("SHARD: Message " + testMsg.id + " processed by partition " 
      + partitionId);
    return partitionId;

  }

}

这个实例返回一个 Integer 对象,用于向最初发送该消息的进程/线程标识处理分区的 ID。这个异步服务框架将接受这个值并实现 Future 对象,该对象被返回客户端,以便对该值感兴趣的任意客户端进程能够使用这个验证信息。

Future 是在 Java 5 并发性框架中引入的一种数据类型,特指稍后将被 “填充”,并提供一些方法来检查该值当前是否已分配,甚至阻塞和等待,直到该值被分配。最终返回类型不通过任意异步服务接口或实现类指定,允许您选择任意可序列化的返回数据类型。

有一个特殊细节值得一提:每个处理单元分区都有一个单一的 MessageProcessor 实例。这个异步处理框架通过利用 WebSphere eXtreme Scale 的 shard 事件监听器接口来确保这一点。这个实例被用于发送到该分区中的所有消息,使这个类的成员线程安全并提供缓存必要资源的机会。如果消息处理器需要进行一个数据源查询或其他昂贵的资源连接,那么您需要在这里那样做。通过将它保持为这个类中的一个成员字段,您可以安全地缓存这些昂贵的资源。

客户端

下面我们看看要将这些消息中的一条发送到网格中并处理结果需要如何操作:

清单 3. 异步框架客户端示例
AsyncServiceManager<Integer> aservice = new AsyncServiceManagerImpl<Integer>(clientGrid);

TestMessage msg10 = new TestMessage(10);
Future<Integer> response = aservice.sendAsyncMessage(msg10);

System.out.println("{CLIENT: Message " + i + " was successfuly processed by partition " 
  + responses.get(i).get() + ".}" );

除清单 3 中的简短代码段外,还需要一些代码来创建到由 clientGrid 对象表示的网格的连接,这将在稍后讨论 WebSphere eXtreme Scale 部署细节时介绍。

消息被发送之后,发送者可以保存对 Future 对象的一个引用并定期检查是否完成,或者,它也可以通过调用 get() 方法来阻塞和等待。清单 3 中的代码展示了一个阻塞调用,其中客户端将一直等待,直到消息被处理并报告哪个网格分区已被选中来使用该消息。

本文 下载 部分包含本示例的源代码和部署文件。另外,参考资料 部分还包含下载这个异步服务框架以及功能齐备的 WebSphere eXtreme Scale 试用版的链接。

网格

本文包含的样例网格可以通过两种方法启动和部署。第一种方法是使用一个简单的进程内(in-process)目录服务和容器,这个服务和容器有利于开发环境和快速测试。第二种方法是提供一些脚本来部署一些独立网格容器,它们可以跨一个机器集群部署。这是生产环境和性能测试环境的标准部署类型。

  • 进程内网格

    使用一个进程内网格来部署和运行这个样例是查看这些概念的实际应用的最简单的方法。一个脚本被 提供 来运行客户端。当这个脚本不带任何参数运行时,这个网格就被自动部署在进程内。这不需要其他任何更改。如果您已将这个代码导入 Eclipse 或类似的 IDE,只需运行 com.ibm.websphere.asyncservice.test.TestClient 类中的 main 方法来完成相同的功能。

  • 远程网格

    首先启动一个目录服务,样例包含的脚本中描述了这个服务。您可以在样例代码的 bin 目录中找到这些脚本,且只需更改 env.sh 中的 JAVA_HOME 变量来反映您想要的 JVM 的路径,以及 WebSphere eXtreme Scale 产品代码所在的位置。

    要启动目录服务,只需执行脚本 runcat.sh

    网格 JVMs 本身需要一些部署信息,但是它们很少、甚至不需要任何定制就可以在几乎任意环境中工作。这些地图和网格的名称全部被这个异步服务框架本身使用,不需要更改。这些信息存储在样例的 deployment.xmlobjectgrid.xml 文件中,您可以使用它们来启动您的网格 JVMs。所有这些配置细节都位于这些提供的脚本中,因此,要启动一个容器 JVM,运行以下命令:runcontainer.sh <your unique server name>

    您的服务器进程启动之后,您可以使用清单 4 中的代码,通过检索对客户端中的 WebSphere eXtreme Scale 网格的引用来连接到并使用该网格。

清单 4. 网格客户端示例
// Connect to the Catalog Server.  The security and client override XML are not specified
ClientClusterContext ccc = ObjectGridManagerFactory.getObjectGridManager()
                                                      .connect(“localhost”, null, null);

// Retrieve the ObjectGrid client connection and return it.
ObjectGrid grid = ObjectGridManagerFactory.getObjectGridManager()
                                             .getObjectGrid(ccc, “Grid”);

根据标准 WebSphere eXtreme Scale 开发模式,您首先从目录服务(这里部署在本地主机上,应该针对一个远程目录服务更改或参数化)获取一个上下文对象。然后,获取对该网格的一个引用,然后该引用被提供给上面的 AsyncServiceManagerImpl 对象以发送异步消息。

runclient.sh 脚本也可用于针对一个远程网格运行样例,方法是将主机名和端口作为单个参数提供给处理单元的目录服务。例如,执行命令 runclient.sh localhost:2809 来针对部署在本地机器上的一个网格运行客户端。

运行时

这里的目的是最大化可伸缩性和可靠性。从可伸缩性角度看,您只需启动其他的 JVMs 来参与网格,网格将自动开始服务异步消息传递请求。要获取更多容量,只需使用上面的命令行(但需提供一个新的惟一服务器 ID)启动一个新 JVM,轻松提供其他处理资源。

可靠性已经在网格中得到处理。根据配置,每条消息都将在另一个网格 JVM 中复制一次并在主分区停用时在那里处理。与任意典型的 WebSphere eXtreme Scale 网格实例一样,复制行为可以在部署 XML 中配置。在样例中,网格配置为当另一个容器可用时异步执行消息的单一复制。这个服务是一个确定的 “交付一次” 机制,其可靠性只受要复制到的可用 JVMs 的数量限制。

需要提到的一个重要细节是:“交付一次” 是在您的消息监听器实现上下文中。这个异步服务框架将在网格中只交付和提交消息 “一次”,但是由于失败窗口,监听器中的实际逻辑有可能被多次执行。这意味着,如果监听器逻辑更改一个外部数据源(数据库、大型机等)中的任意状态,那么这些交互必须通过一个 “至少一次” 交付范式处理。为了被清空,onMessage 方法将只被完整处理一次,但它可能会多次尝试;这样,该方法的一些部分可能会被多次执行。例如,如果一个容器 JVM 在处理一条消息时失败,那么 onMessage 方法将在新的主分区被提升后在该分区上再次执行。可能会影响外部资源的任意代码必须意识到这种可能性。

事务考虑

这个框架,以及该框架基于的 WebSphere eXtreme Scale,没有正式参与 JTA 事务(WebSphere eXtreme Scale 拥有自己的事务管理器)。但是,标准 “嵌套” 技术可用于将消息的处理作为一个 JTA 事务的一个逻辑部分。即,您的监听器(MessageProcessor)应被编写来以该顺序执行以下逻辑,确保每个步骤在下一个步骤开始前成功执行:

  1. 开始一个 JTA 事务。
  2. 处理一条消息。
  3. 将消息标记为 “已处理”(或者将其从队列中删除)。
  4. 提交这个 JTA 事务。

如果任一步骤失败(例如,您没有成功将消息标记为 “已处理”),您可以回滚到 JTA 事务,该消息可以被再次有效处理。上面的逻辑将消息处理这个操作作为这个 JTA 事务的一个逻辑参与者。

结束语

本文演示了 WebSphere eXtreme Scale 异步服务框架,该框架作为一个可部署在您的环境中的样例提供。本文展示的示例演示了 WebSphere eXtreme Scale 服务器的这一新功能,以帮助您解决一个不断变化的事务处理环境中的一些更麻烦、更具挑战性的增长问题。它们创建的异步原型和可伸缩环境对于许多不断演变的系统很关键。

这个异步服务框架作为 WebSphere eXtreme Scale 样例库中的一个样例提供。这个模式和示例知识库是我们帮助用户最好地利用这些新技术的最有价值的沟通渠道之一。


相关主题


评论

添加或订阅评论,请先登录注册

static.content.url=http://www.ibm.com/developerworks/js/artrating/
SITE_ID=10
Zone=WebSphere
ArticleID=512102
ArticleTitle=IBM Extreme Transaction Processing (XTP) 模式: 通过 WebSphere eXtreme Scale 进行可伸缩快速异步处理
publish-date=08192010