一个用于实现并行执行的 Java actor 库

使用 μJavaActors(一个轻型 Java actor 库)现代化常见的并发模式

Java™ 平台目前不支持 actor 并行性,但仍然可以在 Java 代码中使用 actor。在本文中,Barry Feigenbaum 将介绍 μJavaActors 库,这是一个轻型的、基于 Java 的 actor 包,用于在传统 Java 应用程序中实现高度并行的执行。本教程将介绍 μJavaActors 库的完整源代码,以及将 actor 用于 Java 备用模式(比如 Command、Producer/Consumer 和 Map/Reduce 等)的实际示例的完整源代码。

Barry A. Feigenbaum, Ph.D., 软件工程师, Dell

http://www.ibm.com/developerworks/i/p-bfeigenbaum.jpgBarry Feigenbaum 目前是一名软件工程师,目前在 Dell 工作,之前曾在 IBM 和 Amazon 工作过。他是一位 Sun(现在为 Oracle)认证的 Java 程序员、开发人员和架构师。Barry 撰写了其他许多 developerWorks 文章,出席过 JavaOne 等大会,还编写了多部技术图书。他拥有计算机工程方面的博士学位。



2012 年 7 月 16 日

即使 Java 6 和 Java 7 中引入并发性更新,Java 语言仍然无法让并行编程变得特别容易。Java 线程、synchronized 代码块、wait/notifyjava.util.concurrent 包都拥有自己的位置,但面对多核系统的容量压力,Java 开发人员正在依靠其他语言中开创的技术。actor 模型就是这样一项技术,它已在 Erlang、Groovy 和 Scala 中实现。本文为那些希望体验 actor 但又要继续编写 Java 代码的开发人员带来了 μJavaActors 库。

用于 JVM 的另外 3 个 actor 库

请参阅 “表 1:对比 JVM actor 库”,快速了解 3 个用于 JVM 的流行的 actor 库与 μJavaActors 的对比特征。

μJavaActors 库 是一个紧凑的库,用于在 Java 平台上实现基于 actor 的系统(μ 表示希腊字母 Mμ,意指 “微型”)。在本文中,我使用 μJavaActors 探讨 actor 在 Producer/Consumer 和 Map/Reduce 等常见设计模式中的工作原理。

您随时可以 下载 μJavaActors 库的源代码

Java 平台上的 actor 并发性

这个名称有何含义?具有任何其他名称的 actor 也适用!

基于 actor 的系统 通过实现一种消息传递 模式,使并行处理更容易编码。在此模式中,系统中的每个 actor 都可接收消息;执行该消息所表示的操作;然后将消息发送给其他 actor(包括它们自己)以执行复杂的操作序列。actor 之间的所有消息是异步的,这意味着发送者会在收到任何回复之前继续进行处理。因此,一个 actor 可能终生都陷入接收和处理消息的无限循环中。

当使用多个 actor 时,独立的活动可轻松分配到多个可并行执行消息的线程上(进而分配在多个处理器上)。一般而言,每个 actor 都在一个独立线程上处理消息。一些 actor 系统静态地向 actor 分配线程;而其他系统(比如本文中介绍的系统)则会动态地分配它们。

μJavaActors 简介

μJavaActors 是 actor 系统的一个简单的 Java 实现。只有 1,200 行代码,μJavaActors 虽然很小,但很强大。在下面的练习中,您将学习如何使用 μJavaActors 动态地创建和管理 actor,将消息传送给它们。

μJavaActors 围绕 3 个核心界面而构建:

  • 消息 是在 actor 之间发送的消息。Message 是 3 个(可选的)值和一些行为的容器:
    • source 是发送 actor。
    • subject 是定义消息含义的字符串(也称为命令)。
    • data 是消息的任何参数数据;通常是一个映射、列表或数组。参数可以是要处理和/或其他 actor 要与之交互的数据。
    • subjectMatches() 检查消息主题是否与字符串或正则表达式匹配。
    μJavaActors 包的默认消息类是 DefaultMessage
  • ActorManager 是一个 actor 管理器。它负责向 actor 分配线程(进而分配处理器)来处理消息。ActorManager 拥有以下关键行为或特征:
    • createActor() 创建一个 actor 并将它与此管理器相关联。
    • startActor() 启动一个 actor。
    • detachActor() 停止一个 actor 并将它与此管理器断开。
    • send()/broadcast() 将一条消息发送给一个 actor、一组 actor、一个类别中的任何 actor 或所有 actor。
    在大部分程序中,只有一个 ActorManager,但如果您希望管理多个线程和/或 actor 池,也可以有多个 ActorManager。此接口的默认实现是 DefaultActorManager
  • Actor 是一个执行单元,一次处理一条消息。Actor 具有以下关键行为或特征:
    • 每个 actor 有一个 name,该名称在每个 ActorManager 中必须是惟一的。
    • 每个 actor 属于一个 category;类别是一种向一组 actor 中的一个成员发送消息的方式。一个 actor 一次只能属于一个类别。
    • 只要 ActorManager 可以提供一个执行 actor 的线程,系统就会调用 receive()。为了保持最高效率,actor 应该迅速处理消息,而不要进入漫长的等待状态(比如等待人为输入)。
    • willReceive() 允许 actor 过滤潜在的消息主题。
    • peek() 允许该 actor 和其他 actor 查看是否存在挂起的消息(或许是为了选择主题)。
    • remove() 允许该 actor 和其他 actor 删除或取消任何尚未处理的消息。
    • getMessageCount() 允许该 actor 和其他 actor 获取挂起的消息数量。
    • getMaxMessageCount() 允许 actor 限制支持的挂起消息数量;此方法可用于预防不受控制地发送。
    大部分程序都有许多 actor,这些 actor 常常具有不同的类型。actor 可在程序启动时创建或在程序执行时创建(和销毁)。本文中的 actor 包 包含一个名为 AbstractActor 的抽象类,actor 实现基于该类。

图 1 显示了 actor 之间的关系。每个 actor 可向其他 actor 发送消息。这些消息保存在一个消息队列(也称为邮箱;从概念上讲,每个 actor 有一个队列,当 ActorManager 看到某个线程可用于处理消息时,就会从队列中删除该消息,并将它传送给在线程下运行的 actor,以便处理该消息。

图 1. actor 之间的关系
actor 通过线程向其他 actor 发送消息

μJavaActors 的并行执行功能

现在您已可开始使用 μJavaActors 实现并行执行了。首先要创建一组 actor。这些是简单的 actor,因为它们所做的只是延迟少量时间并将消息发送给其他 actor。这样做的效果是创建一个消息风暴,您首先会看到如何创建 actor,然后会看到如何逐步分派它们来处理消息。

有两种消息类型:

  • initialization (init) 会导致 actor 初始化。仅需为每个 actor 发送一次这种类型的消息。
  • repeat 会导致 actor 发送 N-1 条消息,其中 N 是一个传入的消息参数。

清单 1 中的 TestActor 实现从 AbstractActor 继承的抽象方法。activatedeactivate 方法向 actor 通知它的寿命信息;此示例中不会执行任何其他操作。runBody 方法是在收到任何消息之前、首次创建 actor 的时候调用的。它通常用于将第一批消息引导至 actor。testMessage 方法在 actor 即将收到消息时调用;这里 actor 可拒绝或接受消息。在本例中,actor 使用继承的 testMessage 方法测试消息接受情况;因此接受了所有消息。

清单 1. TestActor
  class TestActor extends AbstractActor {

    @Override
    public void activate() {
      super.activate();
    }

    @Override
    public void deactivate() {
      super.deactivate();
    }

    @Override
    protected void runBody() {
      sleeper(1);  // delay up to 1 second
      DefaultMessage dm = new DefaultMessage("init", 8);
      getManager().send(dm, null, this);
    }

    @Override
    protected Message testMessage() {
      return super.testMessage();
    }

loopBody 方法(如清单 2 中所示)在 actor 收到一条消息时调用。在通过较短延迟来模拟某种一般性处理之后,才开始处理该消息。如果消息为 “repeat”,那么 actor 基于 count 参数开始发送另外 N-1 条消息。这些消息通过调用 actor 管理器的 send 方法发送给一个随机 actor。

清单 2. loopBody()
    @Override
    protected void loopBody(Message m) {
      sleeper(1);
      String subject = m.getSubject();
      if ("repeat".equals(subject)) {
        int count = (Integer) m.getData();
        if (count > 0) {
          DefaultMessage dm = new DefaultMessage("repeat", count - 1);
          String toName = "actor" + rand.nextInt(TEST_ACTOR_COUNT);
          Actor to = testActors.get(toName);
          getManager().send(dm, this, to);
        }
      }

如果消息为 “init”,那么 actor 通过向随机选择的 actor 或一个属于 common 类别的 actor 发送两组消息,启动 repeat 消息队列。一些消息可立即处理(实际上在 actor 准备接收它们且有一个线程可用时即可处理);其他消息则必须等待几秒才能运行。这种延迟的消息处理对本示例不是很重要,但它可用于实现对长期运行的流程(比如等待用户输入或等待对网络请求的响应到达)的轮询。

清单 3. 一个初始化序列
      else if ("init".equals(subject)) {
        int count = (Integer) m.getData();
        count = rand.nextInt(count) + 1;
        for (int i = 0; i < count; i++) {
          DefaultMessage dm = new DefaultMessage("repeat", count);
          String toName = "actor" + rand.nextInt(TEST_ACTOR_COUNT);
          Actor to = testActors.get(toName);
          getManager().send(dm, this, to);
          
          dm = new DefaultMessage("repeat", count);
          dm.setDelayUntil(new Date().getTime() + (rand.nextInt(5) + 1) * 1000);
          getManager().send(dm, this, "common");
        }
      }

否则,表明消息不适合并会报告一个错误:

      else {
        System.out.printf("TestActor:%s loopBody unknown subject: %s%n", 
          getName(), subject);
      }
    }
  }

主要程序包含清单 4 中的代码,它在 common 类别中创建了 2 个 actor,在 default 类别中创建了 5 个 actor,然后启动它们。然后 main 至多会等待 120 秒(sleeper 等待它的参数值的时间约为 1000ms),定期显示进度消息。

清单 4. createActor、startActor
    DefaultActorManager am = DefaultActorManager.getDefaultInstance();
    :
    Map<String, Actor> testActors = new HashMap<String, Actor>();
    for (int i = 0; i < 2; i++) {
        Actor a = am.createActor(TestActor.class, "common" + i);
        a.setCategory("common");
        testActors.put(a.getName(), a);
    }
    for (int i = 0; i < 5; i++) {
        Actor a = am.createActor(TestActor.class, "actor" + i);
        testActors.put(a.getName(), a);
    }
    for (String key : testActors.keySet()) {
       am.startActor(testActors.get(key));
    }    
    for (int i = 120; i > 0; i--) {
        if (i < 10 || i % 10 == 0) {
            System.out.printf("main waiting: %d...%n", i);
        }
        sleeper(1);
    }
    :
    am.terminateAndWait();

跟踪输出

要理解刚执行的流程,让我们看看来自 actor 的一些跟踪输出。(请注意,因为对计数和延迟使用了随机数,所以每次执行的输出可能有所不同。)在清单 5 中,可以看到在程序启动后不久出现的消息。左列(括号中)是执行的线程名称。在此次运行中,有 25 个线程可用于处理消息。每行的剩余部分(经过删减)是跟踪输出,显示了收到的每条消息。请注意,repeat 计数 — 也就是参数数据,它在减少不断。(另请注意,线程名称与 actor 的名称毫无关系,尽管该名称是以 actor 开头。)

清单 5. 跟踪输出:程序启动
[main         ] - main waiting: 120...
[actor17      ] - TestActor:actor4 repeat(4)
[actor0       ] - TestActor:actor1 repeat(4)
[actor10      ] - TestActor:common1 repeat(4)
[actor1       ] - TestActor:actor2 repeat(4)
[actor3       ] - TestActor:actor0 init(8)
[actor22      ] - TestActor:actor3 repeat(4)
[actor17      ] - TestActor:actor4 init(7)
[actor20      ] - TestActor:common0 repeat(4)
[actor24      ] - TestActor:actor0 repeat(4)   
[actor0       ] - TestActor:actor1 init(3)
[actor1       ] - TestActor:actor2 repeat(4)   
[actor20      ] - TestActor:common0 repeat(4)   
[actor17      ] - TestActor:actor4 repeat(4)   
[actor17      ] - TestActor:actor4 repeat(3)   
[actor0       ] - TestActor:actor1 repeat(8)   
[actor10      ] - TestActor:common1 repeat(4)   
[actor24      ] - TestActor:actor0 repeat(8)   
[actor0       ] - TestActor:actor1 repeat(8)   
[actor24      ] - TestActor:actor0 repeat(7)   
[actor22      ] - TestActor:actor3 repeat(4)   
[actor1       ] - TestActor:actor2 repeat(3)   
[actor20      ] - TestActor:common0 repeat(4)   
[actor22      ] - TestActor:actor3 init(5)
[actor24      ] - TestActor:actor0 repeat(7)   
[actor10      ] - TestActor:common1 repeat(4)   
[actor17      ] - TestActor:actor4 repeat(8)   
[actor1       ] - TestActor:actor2 repeat(3)   
[actor17      ] - TestActor:actor4 repeat(8)   
[actor0       ] - TestActor:actor1 repeat(8)   
[actor10      ] - TestActor:common1 repeat(4)   
[actor22      ] - TestActor:actor3 repeat(8)   
[actor0       ] - TestActor:actor1 repeat(7)   
[actor1       ] - TestActor:actor2 repeat(3)   
[actor0       ] - TestActor:actor1 repeat(3)   
[actor20      ] - TestActor:common0 repeat(4)   
[actor24      ] - TestActor:actor0 repeat(7)   
[actor24      ] - TestActor:actor0 repeat(6)   
[actor10      ] - TestActor:common1 repeat(8)   
[actor17      ] - TestActor:actor4 repeat(7)

在清单 6 中,可以看到在程序即将结束时出现的消息,这时 repeat 计数已减小。如果观察此程序的执行,您将能够看到生成各行的速度在逐渐减慢;这是因为生成的消息数量在逐渐减少。如果等待足够长时间,发送给 actor 的消息会完全停止(与清单 6 中所示的 common actor 上发生的一样)。请注意,消息处理工作合理地分散在可用的线程上,并且没有任何 actor 被绑定到特定的线程上。

清单 6. 跟踪输出:程序结束
[main         ] - main waiting: 20...
[actor0       ] - TestActor:actor4 repeat(0)   
[actor2       ] - TestActor:actor2 repeat(1)   
[actor3       ] - TestActor:actor0 repeat(0)   
[actor17      ] - TestActor:actor4 repeat(0)   
[actor0       ] - TestActor:actor1 repeat(2)   
[actor3       ] - TestActor:actor2 repeat(1)   
[actor14      ] - TestActor:actor1 repeat(2)   
[actor5       ] - TestActor:actor4 repeat(0)   
[actor14      ] - TestActor:actor2 repeat(0)   
[actor21      ] - TestActor:actor1 repeat(0)   
[actor14      ] - TestActor:actor0 repeat(1)   
[actor14      ] - TestActor:actor4 repeat(0)   
[actor5       ] - TestActor:actor2 repeat(1)   
[actor5       ] - TestActor:actor4 repeat(1)   
[actor6       ] - TestActor:actor1 repeat(1)   
[actor5       ] - TestActor:actor3 repeat(0)   
[actor6       ] - TestActor:actor2 repeat(1)   
[actor4       ] - TestActor:actor0 repeat(0)   
[actor5       ] - TestActor:actor4 repeat(1)   
[actor12      ] - TestActor:actor1 repeat(0)   
[actor20      ] - TestActor:actor2 repeat(2)   
[main         ] - main waiting: 10...
[actor7       ] - TestActor:actor4 repeat(2)   
[actor23      ] - TestActor:actor1 repeat(0)   
[actor13      ] - TestActor:actor2 repeat(1)   
[actor8       ] - TestActor:actor0 repeat(0)   
[main         ] - main waiting: 9...
[actor2       ] - TestActor:actor1 repeat(0)   
[main         ] - main waiting: 8...
[actor7       ] - TestActor:actor2 repeat(0)   
[actor13      ] - TestActor:actor1 repeat(0)   
[main         ] - main waiting: 7...
[actor2       ] - TestActor:actor2 repeat(2)   
[main         ] - main waiting: 6...
[main         ] - main waiting: 5...
[actor18      ] - TestActor:actor1 repeat(1)   
[main         ] - main waiting: 4...
[actor15      ] - TestActor:actor2 repeat(0)   
[actor16      ] - TestActor:actor1 repeat(1)   
[main         ] - main waiting: 3...
[main         ] - main waiting: 2...
[main         ] - main waiting: 1...
[actor4       ] - TestActor:actor1 repeat(0)   
[actor6       ] - TestActor:actor2 repeat(0)

模拟屏幕截图

很难从前面的跟踪信息中全面了解 actor 系统的行为,很大程度上是因为并不是所有跟踪格式都有用。可以使用一个类似 actor 模拟执行的快照图像,以图形格式查看相同的信息。每个图像显示一段固定时期之后的模拟情况。以下视频演示了一些未被代码示例和屏幕截图采集到的 Java actor 流程。可以在本地或在 土豆 上查看下面的视频。

此处 查看脚本。

图 2 显示了在运行任何模拟之前模拟的用户界面。请注意右侧显示的模拟菜单。

图 2. 运行任何模拟之前的 actor 模拟器
运行任何模拟之前的 actor 模拟器

此处 查看此图的完整版本。

屏幕的顶部区域显示了一个包含多种变体的模拟菜单;除非另行说明,否则下列模拟将如跟踪输出和以下屏幕截图中所示:

  • 倒计时模拟 (0:15) 创建将一个值倒计时到 0 并发送更多请求的 actor。
  • Producer/Consumer 模拟 (2:40) 在经典的 Producer/Consumer 并发性问题上创建了一个变体。
  • Map/Reduce 模拟 (5:28) 创建对 1000 个整数的平方和的并行执行操作。
  • 病毒扫描 模拟 (6:45) 扫描一个磁盘目录树来查找 “.txt” 文件(限制扫描的数量),检测可疑的内容模式。这个没有 CPU 限制的模拟未在以下屏幕截图中显示,但它 视频演示的一部分。
  • 所有模拟并发运行,但这仅是在视频演示中 (8:18)。

该视频格式显示了按顺序运行的所有这些模拟,每个模拟之间具有较短的暂停时间。

除了 Start 和 Stop 之外,图 2 中的屏幕截图还显示了以下控件和设置。(请注意,Stop 不会停止线程,所以某些操作在停止线程后可能仍在进行。)

  • Redistribute 半随机地在 actor 圆圈中重新分配 actor(默认顺序是创建顺序)。这使您能够更容易地通过重新放置 actor 来查看分组到一起的邻近 actor 之间的消息。它还可以向 actor 分配新颜色。
  • Add TaskRemove Task 在启动工具中添加或删除任务(线程)。Remove Task 将仅删除添加的(而不是原始的)任务。
  • Maximum steps(使用值的 log2)限制模拟的持续时间,仅在模拟启动之前生效。各个步骤大约持续 1 秒。
  • Show actors as transparent 使用户更容易看到邻近 actor 之间的消息。不透明的 actor 常常更容易看到。可以在模拟运行时更改此设置。
  • Number of threads to use spinner 仅在模拟启动之前生效。许多模拟的运行速度要比更多线程快得多。

控件下面的显示区域显示了当前的线程使用情况(显示为过去的 1 秒中的平均值)。大型的中心区域会显示模拟。底部区域会显示模拟历史。右侧区域会显示完整的模拟轨迹。当运行时,模拟框架按如下方式配置:

  • 控制区域是大约每秒更新一次的仪表显示:
    • 每秒接受的消息数。
    • 每秒完成的消息数。
    • 每秒接受的消息数与完成的消息数的比率。
      如果活动显示在右侧,那么到达的消息比正在处理的消息要多一些;最终,消息缓冲区会发生溢出。如果活动显示在左侧,正在处理的消息比到达的消息要多一些;最终,系统会空闲下来。平衡的系统会显示 0 或者仅较长时间内显示绿色水平线。
  • 中心区域之上是一个包含绿条的网格;每个绿条表示一个线程(就像外部圆圈中一样)。全绿的条带表示线程正被全面利用,全黄的条带表示线程完全空闲。
  • 在中央区域,正方形的外环表示线程(在这些模拟中为 10 个,在以前的跟踪轨迹中有 25 个)。绿色线程附加到一个 actor 来执行收到的消息;中心的点的颜色表示 actor 类型。接近正方形的数字是当前分配给此线程的 actor 数量(从左侧的 0 开始顺时针排列到 360 度)。黄色线程是空闲的。
  • 内部的圆环表示 actor;颜色表示类型(在第一个示例中仅有一种类型)。如果 actor 正忙于处理一条消息,它会显示在更暗的阴影中(如果使用了非透明的 actor,这会更加明显)。圆圈 (actor) 之间的线表示消息。任何浅红色的线都是在给定刷新周期中发送的新消息(模拟每秒刷新 10 次);其他颜色是缓冲的消息(过去发送过来的,但目前仍未处理)。缓冲线在接收端有一个小圆圈;该圆圈随着缓冲消息数量增加而变大。
  • 最右端显示输出轨迹;此轨迹类似于前面探讨的轨迹,但更详细一些。
  • 图像底部是一组较小的圆圈;每个圆圈是在过去定期显示的主要圆圈的缩小版本。这提供了一种查看消息随时间变化的趋势的轻松方式。如果观察此历史,您就会看到消息将迅速积压,然后逐渐减少。

图 3 显示了执行大约 10 秒后的模拟效果。请注意大量的挂起消息,它们是迅速累积起来的。有 34 个 actor,但仅有 10 个线程,所以一些 actor 需要空闲下来。在此时,所有线程都忙于处理消息。

图 3. 启动倒计时模拟效果 (0:15)
执行 10 秒时的倒计时模拟

此处 查看全图。

图 4 是执行大约 30 秒后的模拟。挂起消息的数量已大大减少。由于消息到达率更低一些,所以只有部分线程在繁忙地处理消息。

图 4. 中期的倒计时模拟效果
执行 30 秒时的倒计时模拟

此处 查看全图。

图 5 是执行大约 90 秒后的模拟。现在所有挂起的消息都已处理,因此所有线程都是空闲的。

图 5. 完成时的倒计时模拟效果
执行 90 秒时的倒计时模拟

此处 查看全图。


一个 Producer/Consumer 系统中的 actor

接下来,让我们看一下 Producer/Consumer 模式下的 actor 的演示。Producer/Consumer 是多处理器系统的一种最常见的同步模式。在下面的 μJavaActors 演示中,生成者 actor 生成要求使用者 actor 创建各种项的请求。使用者会创建这些项(这需要一定的时间),然后将一条完成消息发送回请求的生成者。

图 6 显示了执行大约 30 秒后的模拟效果。请注意,两种 actor 类型按颜色区分。生成者 actor 首先显示在屏幕右下侧。生成者在运行时创建使用者,所以随后才会显示后者。工作负载随时间的流逝而缓慢减少,大部分线程都很忙。请注意,生成者会迅速完成它们的工作,以至于它们很少显示为活动状态。

图 6. 启动不久后的 Producer/Consumer 模拟 (2:40)
执行 30 秒时的 Producer/Consumer 模拟

此处 查看全图。

图 7 显示了执行大约 115 秒后的模拟,这接近程序完成的时间。新请求和挂起的消息的数量已经大大减少。在视频演示中,您可能注意到,一些 actor 在很短时间内显示为未填充的圆圈;这些是处理发送给它们自身的消息的 actor。

图 7. 接近结束时的 Producer/Consumer 模拟效果
执行 115 秒时的 Producer/Consumer 模拟

此处 查看全图。

ProducerActor

清单 7 显示了演示中的生成者 actor 的代码。这里的 “produceN” 消息已处理。它转换成为了一条 “produce1” 消息,该 actor 将该消息发送给自己。预期的响应记录是一个挂起的回复计数,以供以后验证。

清单 7. 生成者 actor
public class ProducerActor extends AbstractActor {
  Map<String , Integer> expected = new ConcurrentHashMap<String
        , Integer>();

  @Override
  protected void loopBody(Message m) {
    String subject = m.getSubject();
    if ("produceN".equals(subject)) {
      Object[] input = (Object[]) m.getData();
      int count = (Integer) input[0];
      if (count > 0) {
        DefaultActorTest.sleeper(1); // this takes some time
        String type = (String) input[1];
        // request the consumers to consume work (i.e., produce)
        Integer mcount = expected.get(type);
        if (mcount == null) {
          mcount = new Integer(0);
        }
        mcount += count;
        expected.put(type, mcount);

        DefaultMessage dm = new DefaultMessage("produce1", 
          new Object[] { count, type });
        getManager().send(dm, this, this);
      }

在清单 8 中,“produce1” 消息已被处理。如果剩余计数大于 0,它会转换为一条 “construct” 消息并发送给使用者。请注意,此逻辑可能已作为对计数值的一个 for 循环来完成,而不是重新发送 “produce1” 消息。重新发送该消息常常会带来更出色的线程负载,尤其在循环主体会话占用大量时间的时候。

清单 8. 处理一个生成者请求
    } else if ("produce1".equals(subject)) {
      Object[] input = (Object[]) m.getData();
      int count = (Integer) input[0];
      if (count > 0) {
        sleep(100); // take a little time
        String type = (String) input[1];
        m = new DefaultMessage("construct", type);
        getManager().send(m, this, getConsumerCategory());

        m = new DefaultMessage("produce1", new Object[] { count - 1, type });
        getManager().send(m, this, this);
      }

在清单 9 中,“constructionComplete” 消息(由一个使用者发送)已被处理。它会对挂起的回复计数进行递减。如果一切正常,在模拟完成时,所有 actor 和类型值的此计数都将为 0。

清单 9. constructionComplete
    } else if ("constructionComplete".equals(subject)) {
      String type = (String) m.getData();
      Integer mcount = expected.get(type);
      if (mcount != null) {
        mcount--;
        expected.put(type, mcount);
      }

init” 消息在清单 10 中处理。生成者创建一些使用者 actor,然后向它自己发送多条 produceN 请求。

清单 10. 初始化
    } else if ("init".equals(subject)) {
      // create some consumers; 1 to 3 x consumers per producer
      for (int i = 0; i < DefaultActorTest.nextInt(3) + 1; i++) {
        Actor a = getManager().createAndStartActor(ConsumerActor.class,
            String.format("%s_consumer%02d", getName(), i));
        a.setCategory(getConsumerCategory());
        if (actorTest != null) {
          actorTest.getTestActors().put(a.getName(), a);
        }
      }
      // request myself create some work items
      for (int i = 0; i < DefaultActorTest.nextInt(10) + 1; i++) {
        m = new DefaultMessage("produceN", new Object[] 
             { DefaultActorTest.nextInt(10) + 1,
               DefaultActorTest.getItemTypes()[
                  DefaultActorTest.nextInt(DefaultActorTest.getItemTypes().length)] });
        getManager().send(m, this, this);
      }

清单 11 处理无效的消息:

清单 11. 处理无效的消息
    } else {
      System.out.printf("ProducerActor:%s loopBody unknown subject: %s%n", 
         getName(), subject);
    }
  }

  protected String getConsumerCategory() {
    return getName() + "_consumer";
  }
}

ConsumerActor

使用者(consumer) actor 很简单。它处理 “construct” 消息并向请求者发送回复消息。使用者 actor 的代码如清单 12 所示:

清单 12. 使用者 actor
public class ConsumerActor extends AbstractActor {

  @Override
  protected void loopBody(Message m) {
    String subject = m.getSubject();
    if ("construct".equals(subject)) {
      String type = (String) m.getData();
      delay(type); // takes ~ 1 to N seconds

      DefaultMessage dm = new 
         DefaultMessage("constructionComplete", type);
      getManager().send(dm, this, m.getSource());
    } else if ("init".equals(subject)) {
      // nothing to do
    } else {
      System.out.printf("ConsumerActor:%s loopBody unknown subject: %s%n", 
        getName(), subject);
    }
  }

清单 13 中处理的生产延迟基于构造的项的类型。从跟踪轨迹中,您可以回想起支持的项类型为 widgetframitfrizzlegothcasplat。每个类型需要花不同的时间量来构造。

清单 13. 生产延迟
  protected void delay(String type) {
    int delay = 1;
    for (int i = 0; i < DefaultActorTest.getItemTypes().length; i++) {
      if (DefaultActorTest.getItemTypes()[i].equals(type)) {
        break;
      }
      delay++;
    }
    DefaultActorTest.sleeper(DefaultActorTest.nextInt(delay) + 1);
  }
}

Producer/Consumer 模式中的 actor

Producer/Consumer 演示表明创建 actor 实现非常简单。典型的 actor 会解码收到的消息并处理它们,就像在一个 case 语句中一样。实际的处理在本示例中微不足道,只是短暂的时间延迟。在真实应用程序中会更复杂,但不会超过使用标准 Java 同步技术的实现;通常它会简单得多。

在此演示中,还应注意的是,复杂且重复性的算法可分解为离散(且常常可重用)的步骤。可为每个步骤分配一个不同的主题名称,时每个主题的情形变得非常简单。当状态包含在消息参数中时(比如前面演示的倒计时值),许多 actor 会变得无状态。这样的程序非常容易定义和扩展(添加更多 actor 来匹配更多线程),也可以在多线程环境中安全地运行;这类似于在行数样式编程中使用不可变的值。


actor 的更多模式

出于特定的用途,Producer/Consumer 演示中的 actor 是硬编码的,但这并不是您在编码 actor 时的惟一选择。在本节中,您将学习如何在更加通用的模式中使用 actor,首先需要改写 Gang of Four Command 模式

清单 14 中的 actor 实现大部分 Java 开发人员应该熟悉的 Command 模式的一种变体。在这里,CommandActor 支持两种消息:“execute” 和 “executeStatic。”

清单 14. CommandActor
public class CommandActor extends AbstractActor {

  @Override
  protected void loopBody(Message m) {
    String subject = m.getSubject();
    if ("execute".equals(subject)) {
      excuteMethod(m, false);
    } else if ("executeStatic".equals(subject)) {
      excuteMethod(m, true);
    } else if ("init".equals(subject)) {
      // nothing to do
    } else {
      System.out.printf("CommandActor:%s loopBody unknown subject: %s",
          getName(), subject);
    }
  }

清单 15 中的 executeMethod 方法加载了一个参数化的类,在该类或该类的实例上调用一个方法,然后返回该方法的结果或发生的任何异常。您可以看到这个简单的 actor 如何用于运行类路径上具有合适的执行方法的所有服务类。id 参数由客户端发送,所以它可以将响应与创建它们的请求进行关联。回复常常按照与发出时不同的顺序返回。

清单 15. 执行一个参数化方法
  private void excuteMethod(Message m, boolean fstatic) {
    Object res = null;
    Object id = null;
    try {
      Object[] params = (Object[]) m.getData();
      id = params[0];
      String className = (String) params[1];
      params = params.length > 2 ? (Object[]) params[2] : null;
      Class<?> clazz = Class.forName(className);
      Method method = clazz.getMethod(fstatic ? "executeStatic"
          : "execute", new Class[] { Object.class });
      if (Modifier.isStatic(method.getModifiers()) == fstatic) {
        Object target = fstatic ? null : clazz.newInstance();
        res = method.invoke(target, params);
      }
    } catch (Exception e) {
      res = e;
    }

    DefaultMessage dm = new DefaultMessage("executeComplete", new Object[] {
        id, res });
    getManager().send(dm, this, m.getSource());
  }
}

Event Listener 模式中的 actor

清单 16 中的 DelegatingActor 实现一种基于熟悉的 Java Event Listener(或 Callback)模式的类似的一般方法。它将到达的每条消息映射到每个注册的监听器上的一个 onMessage 回调,直到某个回调使用(也就是处理)该事件。这种委托方法可显著减少 actor 系统与它的消息处理器之间的联系。

清单 16. DelegatingActor
public class DelegatingActor extends AbstractActor {
  private List<MessageListener> listeners = new LinkedList<MessageListener>();

  public void addMessageListener(MessageListener ml) {
    if (!listeners.contains(ml)) {
      listeners.add(ml);
    }
  }

  public void removeMessageListener(MessageListener ml) {
    listeners.remove(ml);
  }

  protected void fireMessageListeners(MessageEvent me) {
    for (MessageListener ml : listeners) {
      if (me.isConsumed()) {
        break;
      }
      ml.onMessage(me);
    }
  }

  @Override
  protected void loopBody(Message m) {
    fireMessageListeners(new MessageEvent(this, m));
  }
}

DelegatingActor 类(如清单 17 所示)依赖于 MessageEventMessageListener 类:

清单 17. DelegatingActor
/** Defines a message arrival event. */
public static class MessageEvent extends EventObject {
  private Message message;

  public Message getMessage() {
    return message;
  }

  public void setMessage(Message message) {
    this.message = message;
  }

  private boolean consumed;

  public boolean isConsumed() {
    return consumed;
  }

  public void setConsumed(boolean consumed) {
    this.consumed = consumed;
  }

  public MessageEvent(Object source, Message msg) {
    super(source);
    setMessage(msg);
  }
}

/** Defines the message arrival call back. */
public interface MessageListener {
  void onMessage(MessageEvent me);
}

DelegatingActor 的一种示例用法如清单 18 所示:

清单 18. DelegatingActor 的示例用法
public static void addDelegate(DelegatingActor da) {
  MessageListener ml = new Echo("Hello world!");
  da.addMessageListener(ml);
}
	
	
public class Echo implements MessageListener {
  protected String message;

  public Echo(String message) {
    this.message = message;
  }

  @Override
  public void onMessage(MessageEvent me) {
    if ("echo".equals(me.getMessage().getSubject())) {
      System.out.printf("%s says \"%s\".%n", 
         me.getMessage().getSource(), message);
      me.setConsumed(true);
    }
  }
}

Map/Reduce 模式中的 actor

清单 14 到清单 18 中的示例 actor 简单且一目了然,因为消息仅朝一个方向发送。如果该行为需要反馈(比如当一个流程只有在处理了所有以前的消息后才能继续时),情况可能变得更加复杂。例如,请考虑这样一种 Map/Reduce 实现,其中的 reduce 阶段只有在 map 阶段完成后才能开始。

Map/Reduce 用于在处理大量数据的程序上实现并行处理。在下面的示例中,map 函数接受一个较大的项列表,然后将它分解为分区,发送一条消息来映射每个分区。我选择在每个映射请求上递增一个消息计数,让分区的映射处理器发送一条会递减该计数的回复。当计数为 0 时,所有映射已完成且 reduce 阶段可以启动。类似地,reduce 阶段对该列表分区(再次实现并行性)并发送消息来 reduce 分区。像 map 阶段中一样,reduce 也会统计它的消息,所以可以检测到递减操作的完成。要处理的值列表和计数在每个消息中作为消息传输。

对于本示例,我对许多主题使用了同一种 actor 类型。您也可以使用多种 actor 类型,为每个 actor 使用更少的主题(最少 1 个)。

图 8 是执行大约 20 秒后的 Map/Reduce 模拟。这是一个繁忙的处理阶段,所以线程都被处理消息所占用。

图 8. 启动不久后的 Map/Reduce (5:28)
执行 20 秒时的 Map/Reduce 模拟

此处 查看全图。

使用 MapReduceer 进行映射和缩减

请注意,此实现是可插拔的;它可运行 MapReduceer 接口的任何实现,如清单 19 所示。

清单 19. MapReduceer
public interface MapReduceer {
  /**
   * Map (in place) the elements of an array.
   * 
   * @param values elements to map
   * @param start start position in values
   * @param end end position in values
   */
  void map(Object[] values, int start, int end);

  /**
   * Reduce the elements of an array.
   * 
   * @param values elements to reduce
   * @param start start position in values
   * @param end end position in values
   * @param target place to set reduced value
   * @param posn position in target to place the value
   */
  void reduce(Object[] values, int start, int end, Object[] target, int posn);
}

例如,您可以使用 MapReduceer 计算一组整数的平方和,如清单 20 所示:

清单 20. MapReduceer 计算
public class SumOfSquaresReducer implements MapReduceer {
  @Override
  public void map(Object[] values, int start, int end) {
    for (int i = start; i <= end; i++) {
      values[i] = ((BigInteger) values[i]).multiply((BigInteger) values[i]);
      sleep(200); // fake taking time
    }
  }

  @Override
  public void reduce(Object[] values, int start, int end, Object[] target, int posn) {
    BigInteger res = new BigInteger("0");
    for (int i = start; i <= end; i++) {
      res = res.add((BigInteger) values[i]);
      sleep(100); // fake taking time
    }
    target[posn] = res;
  }
}

MapReduceActor

Map/Reduce actor 分解为多个主题,每个主题具有一个简单的任务。您将在下面的代码示例中看到它们每一个。您也可以在视频演示中查看 Map/Reduce 操作;观看模拟,然后研究代码示例,这会让您非常清楚地了解如何使用 actor 实现 Map/Reduce。(请注意,以下清单中的主题顺序可按任意多种方式分解;我将示例代码设计为包含许多次发送,以让视频演示更有趣。)

mapReduce 主题(如清单 21 所示)通过对输入数组分区来启动 Map/Reduce,它通过发送 createPartition 消息来进行分区。Map 和 Redu测 参数是在一个 MapReduceParameters 实例中提供的,该实例根据需要进行了克隆和修改,然后传递出去。请注意,该操作不需要时间延迟;我添加它们是为了确保将在用户界面中看到模拟。

清单 21. mapReduce
  @Override
  protected void loopBody(Message m) {
    ActorManager manager = getManager();
    String subject = m.getSubject();
    if ("mapReduce".equals(subject)) {
      try {
        MapReduceParameters p = (MapReduceParameters) m.getData();
        int index = 0;
        int count = (p.end - p.start + 1 + partitionSize - 1) / partitionSize;
        sleep(1000);
        // split up into partition size chunks
        while (p.end - p.start + 1 >= partitionSize) {
          MapReduceParameters xp = new MapReduceParameters(p);
          xp.end = xp.start + partitionSize - 1;
          DefaultMessage lm = new DefaultMessage("createPartition", 
            new Object[] { xp, index, count });
          manager.send(lm, this, getCategory());
          p.start += partitionSize;
          index++;
        }
        if (p.end - p.start + 1 > 0) {
          DefaultMessage lm = new DefaultMessage("createPartition", 
            new Object[] { p, index, count });
          manager.send(lm, this, getCategory());
        }
      } catch (Exception e) {
        triageException("mapFailed", m, e);
      }
}

createPartition 主题创建了更多 actor,并将请求转发给一个工作线程,如清单 22 所示。请注意,createMapReduceActor 方法在它将创建的 actor 数量上有一个上限(目前为 25)。

清单 22. createPartition
    } else if ("createPartition".equals(subject)) {
      try {
        Object[] oa = (Object[]) m.getData();
        MapReduceParameters p = (MapReduceParameters) oa[0];
        int index = (Integer) oa[1];
        int count = (Integer) oa[2];
        sleep(500);
        createMapReduceActor(this);
        DefaultMessage lm = new DefaultMessage("mapWorker", 
          new Object[] { p, index, count });
        manager.send(lm, this, getCategory());
      } catch (Exception e) {
        triageException("createPartitionFailed", m, e);
      }
}

清单 23 中的 mapWorker 主题在其分区上通过提供的 MapReducer 调用 map 操作,然后在回复中表明映射分区是完整的:

清单 23. mapWorker
    } else if ("mapWorker".equals(subject)) {
      try {
        Object[] oa = (Object[]) m.getData();
        MapReduceParameters p = (MapReduceParameters) oa[0];
        int index = (Integer) oa[1];
        int count = (Integer) oa[2];
        sleep(100);
        p.mr.map(p.values, p.start, p.end);
        DefaultMessage rm = new DefaultMessage("mapResponse", 
          new Object[] { p, index, count });
        manager.send(rm, this, getCategoryName());
      } catch (Exception e) {
        triageException("mapWorkerFailed", m, e);
      }
}

然后,清单 24 中的 mapResponse 主题会完成 MapReduceParameters 实例(它包含计数)并启动 Reduce 流程:

清单 24. mapResponse
    } else if ("mapResponse".equals(subject)) {
      try {
        Object[] oa = (Object[]) m.getData();
        MapReduceParameters p = (MapReduceParameters) oa[0];
        int index = (Integer) oa[1];
        int count = (Integer) oa[2];
        sleep(100);
        p.complete();
        DefaultMessage rm = new DefaultMessage("reduce", 
          new Object[] { p, index, count });
        manager.send(rm, this, getCategoryName());
      } catch (Exception e) {
        triageException("mapResponseFailed", m, e);
      }
}

接下来,reduce 消息会将请求转发给某个工作线程,如清单 25 所示:

清单 25. reduce
    } else if ("reduce".equals(subject)) {
      try {
        MapReduceParameters p = null;
        int index = 0, count = 0;
        Object o = m.getData();
        if (o instanceof MapReduceParameters) {
          p = (MapReduceParameters) o;
        } else {
          Object[] oa = (Object[]) o;
          p = (MapReduceParameters) oa[0];
          index = (Integer) oa[1];
          count = (Integer) oa[2];
        }
        sleep(100);
        if (p.end - p.start + 1 > 0) {
          createMapReduceActor(this);
          MapReduceParameters xp = new MapReduceParameters(p);
          DefaultMessage lm = new DefaultMessage("reduceWorker", 
            new Object[] { xp, index, count });
          manager.send(lm, this, getCategory());
        }
      } catch (Exception e) {
        triageException("reduceFailed", m, e);
      }
}

清单 26 中的 reduceWorker 主题在其分区上通过提供的 MapReducer 调用 reduce 操作,然后在回复中表明 Reduce 操作已完成。如果所有 Reduce 操作都已完成,则会在回复中表明 Map/Reduce 操作已完成。

清单 26. reduceWorker
    } else if ("reduceWorker".equals(subject)) {
      try {
        Object[] oa = (Object[]) m.getData();
        MapReduceParameters p = (MapReduceParameters) oa[0];
        int index = (Integer) oa[1];
        int count = (Integer) oa[2];
        sleep(100);
        if (index >= 0) {
          p.mr.reduce(p.values, p.start, p.end, p.target, index);
          DefaultMessage rm = new DefaultMessage("reduceResponse", 
            new Object[] { p, index, count });
          manager.send(rm, this, getCategory());
        } else {
          Object[] res = new Object[1];
          p.mr.reduce(p.target, 0, count - 1, res, 0);
          DefaultMessage rm = new DefaultMessage("done", 
            new Object[] { p, res[0] });
          manager.send(rm, this, getCategory());
        }
      } catch (Exception e) {
        triageException("reduceWorkerFailed", m, e);
      }
}

接下来,清单 27 中的 reduceResponse 主题会完成该分区,并测试所有分区是否已完成,然后表明结果:

清单 27. reduceResponse
    } else if ("reduceResponse".equals(subject)) {
      try {
        Object[] oa = (Object[]) m.getData();
        MapReduceParameters p = (MapReduceParameters) oa[0];
        int index = (Integer) oa[1];
        int count = (Integer) oa[2];
        sleep(100);
        p.complete();
        if (p.isSetComplete()) {
          if (count > 0) {
            createMapReduceActor(this);
            MapReduceParameters xp = new MapReduceParameters(p);
            DefaultMessage lm = new DefaultMessage("reduceWorker", 
              new Object[] { xp, -1, count });
            manager.send(lm, this, getCategory());
          }
        }
      } catch (Exception e) {
        triageException("mapResponseFailed", m, e);
      }
}

最后,清单 28 中的 done 主题会报告结果:

清单 28. done
    } else if ("done".equals(subject)) {
      try {
        Object[] oa = (Object[]) m.getData();
        MapReduceParameters p = (MapReduceParameters) oa[0];
        Object res = oa[1];
        sleep(100);
        System.out.printf("**** mapReduce done with result %s", res);
      } catch (Exception e) {
        triageException("mapResponseFailed", m, e);
      }
}

继续执行循环,init 主题启动另一个 Map/Reduce 流程,如清单 29 中所示。为每个 Map/Reduce 提供一个不同的 “集合” 名称,使多个 Map/Reduce 可同时运行。

清单 29. 初始化另一个 Map/Reduce
    } else if ("init".equals(subject)) {
      try {
        Object[] params = (Object[]) m.getData();
        if (params != null) {
          Object[] values = (Object[]) params[0];
          Object[] targets = (Object[]) params[1];
          Class clazz = (Class) params[2];
          MapReduceer mr = (MapReduceer) clazz.newInstance();
          sleep(2 * 1000);
          MapReduceParameters p = new MapReduceParameters("mrSet_" + setCount++, 
            values, targets, mr, this);
          DefaultMessage rm = new DefaultMessage("mapReduce", p);
          manager.send(rm, this, getCategoryName());
        }
      } catch (Exception e) {
        triageException("initFailed", m, e);
      }
    } else {
      System.out.printf("**** MapReduceActor:%s loopBody unexpected subject: %s", 
        getName(), subject);
    }
  }
}

Map/Reduce 主要过程

清单 30 中的 MapReduceActor 实现创建了一些数据值,并在这些数据上运行一个 Map/Reduce。它将分区大小设置为 10。

清单 30. Map/Reduce 主要过程
BigInteger[] values = new BigInteger[1000];
for (int i = 0; i < values.length; i++) {
  values[i] = new BigInteger(Long.toString((long)rand.nextInt(values.length)));
}
BigInteger[] targets = new BigInteger[Math.max(1, values.length / 10)];

// start at least 5 actors
DefaultActorManager am = new DefaultActorManager();
MapReduceActor.createMapReduceActor(am, 10);
MapReduceActor.createMapReduceActor(am, 10);
MapReduceActor.createMapReduceActor(am, 10);
MapReduceActor.createMapReduceActor(am, 10);
MapReduceActor.createMapReduceActor(am, 10);
        
DefaultMessage dm = new DefaultMessage("init", new Object[] 
    { values, targets, SumOfSquaresReducer.class });
am.send(dm, null, MapReduceActor.getCategoryName());

Map/Reduce 是一种最普遍的分而治之设计模式。从最基本的函数编程算法一直到大规模并行处理(Google 用于构建它自己的 Web 搜索引擎索引的类型),都可以看见到它的身影。μJavaActors 库能够以某种直观的方式实现这一高级模式,这凸显了它的强大功能以及潜在的用途。


μJavaActors 库的内幕

管理器对 actor 说:不要找我;我会去找您。

您已看到如何使用 actor 将一些常见的面向对象模式用于其他用途。现在可以考虑一下 μJavaActors 系统的实现细节,即 AbstractActorDefaultActorManager 类。我将仅讨论每个类的关键方法;您可以查看 μJavaActors 源代码 来获取更多实现细节。

AbstractActor

每个 actor 都知道管理它的 ActorManager。actor 使用该管理器帮助它将消息发送给其他 actor。

在清单 31 中,receive 方法有条件地处理一条消息。如果 testMessage 方法返回 null,那么将不会使用任何消息。否则,会从 actor 的消息队列中删除消息,并通过调用 loopBody 方法来处理它。每个具体的 actor 子类都必须提供此方法。无论在哪种情况下,actor 都会通过调用管理器的 awaitMessage 方法来等待更多消息传来。

清单 31. AbstractActor 实现 DefaultActorManager
public abstract class AbstractActor implements Actor {
  protected DefaultActorManager manager;

  @Override
  public boolean receive() {
    Message m = testMessage();
    boolean res = m != null;
    if (res) {
      remove(m);
      try {
        loopBody(m);
      } catch (Exception e) {
        System.out.printf("loop exception: %s%n", e);
      }
    }
    manager.awaitMessage(this);
    return res;
  }

  abstract protected void loopBody(Message m);

每个 actor 都可以实现 willReceive 方法来控制将接受哪些消息主题(表明它将放在消息列表中);默认情况下,会接受所有具有非空主题的消息。每个 actor 还可以实现 testMessage 方法来检查是否有消息可供处理(也就是说,它存在于消息列表中);默认情况下,这一监督工作是通过使用 peekNext 方法来实现的。

清单 32. willReceive()、testMessage() 和 peekNext()
  @Override
  public boolean willReceive(String subject) {
    return !isEmpty(subject); 
  }

  protected Message testMessage() {
    return getMatch(null, false);
  }

  protected Message getMatch(String subject, boolean isRegExpr) {
    Message res = null;
    synchronized (messages) {
      res = peekNext(subject, isRegExpr);
    }
    return res;
  }

消息容量

actor 可具有无限有限 的消息容量。一般而言,有限的容量更好,因为它可帮助检测不受控制的消息发送者。任何客户端(但通常是 ActorManager)均可向 actor 添加未经筛选的消息。请注意,对 messages 列表的所有访问都是异步的。

清单 33. 消息处理
  public static final int DEFAULT_MAX_MESSAGES = 100;
  protected List<DefaultMessage> messages = new LinkedList<DefaultMessage>();

  @Override
  public int getMessageCount() {
    synchronized (messages) {
      return messages.size();
    }
  }

  @Override
  public int getMaxMessageCount() {
    return DEFAULT_MAX_MESSAGES;
  }

  public void addMessage(Message message) {
    synchronized (messages) {
      if (messages.size() < getMaxMessageCount()) {
        messages.add(message);
      } else {
        throw new IllegalStateException("too many messages, cannot add");
      }
    }
  }

  @Override
  public boolean remove(Message message) {
    synchronized (messages) {
      return messages.remove(message);
    }
  }

消息匹配

客户端(具体来讲是 actor 本身)可检查一个 actor 是否拥有挂起的消息。这可用于不按发送顺序处理消息,或者为某些主题提供优先级。消息匹配是通过测试消息主题与一个字符串值的同等性来完成的,或者通过将一个正则表达式与一个参数值匹配来完成的。null 主题匹配任何消息。再次提醒,请注意,对消息列表的所有访问都是异步的。

清单 34. peekNext()
  @Override
  public Message peekNext() {
    return peekNext(null);
  }

  @Override
  public Message peekNext(String subject) {
    return peekNext(subject, false);
  }

  @Override
  public Message peekNext(String subject, boolean isRegExpr) {
    long now = new Date().getTime();
    Message res = null;
    Pattern p = subject != null ? (isRegExpr ? Pattern.compile(subject) : null) : null;
    synchronized (messages) {
      for (DefaultMessage m : messages) {
        if (m.getDelayUntil() <= now) {
          boolean match = subject == null || 
            (isRegExpr ? m.subjectMatches(p) : m.subjectMatches(subject));
          if (match) {
            res = m;
            break;
          }
        }
      }
    }
    return res;
  }

生命周期方法

每个 actor 都有生命周期方法。每次与某个特定 ActorManager 关联时,都会调用 activatedeactivate 方法。每次与某个特定的 ActorManager 关联时还会调用 run 方法,它通常通过自行向 actor 发送启动消息来启动该 actor。run 消息开始消息处理。

清单 35. 生命周期方法
  @Override
  public void activate() {
    // defaults to no action
  }

  @Override
  public void deactivate() {
    // defaults to no action
  }

  /** Do startup processing. */
  protected abstract void runBody();

  @Override
  public void run() {
    runBody();
    ((DefaultActorManager) getManager()).awaitMessage(this);
  }
}

DefaultActorManager

以下字段包含 actor 管理器的状态:

  • actors 包含向管理器注册的所有 actor。
  • runnables 包含已创建但尚未调用其 run 方法的 actor。
  • waiters 包含所有等待消息的 actor。
  • threads 包含管理器启动的所有线程。

请注意,LinkedHashMap 的使用至关重要(对等待者列表尤为如此);否则,一些 actor 可能会急需线程。

清单 36. DefaultActorManager 类和状态
public class DefaultActorManager implements ActorManager {

  public static final int DEFAULT_ACTOR_THREAD_COUNT = 25;

  protected static DefaultActorManager instance;
  public static DefaultActorManager getDefaultInstance() {
    if (instance == null) {
      instance = new DefaultActorManager();
    }
    return instance;
  }

  protected Map<String , AbstractActor> actors = 
    new LinkedHashMap<String , AbstractActor>();

  protected Map<String , AbstractActor> runnables = 
    new LinkedHashMap<String , AbstractActor>();

  protected Map<String , AbstractActor> waiters = 
    new LinkedHashMap<String , AbstractActor>();

  protected List<Thread> threads = new LinkedList<Thread>();

detachActor 方法打破了 actor 和它的管理器之间的关联:

清单 37. actor 终止
  @Override
  public void detachActor(Actor actor) {
    synchronized (actors) {
      actor.deactivate();
      ((AbstractActor)actor).setManager(null);
      String name = actor.getName();
      actors.remove(name);
      runnables.remove(name);
      waiters.remove(name);
    }
  }

发送方法

send 方法家族将一条消息发送给一个或多个 actor。首先需要检查每条消息,查看 actor 是否会接受它。对消息进行排队后,就会使用 notify 唤醒一个线程来处理消息。在发送到某个类别时,只有该类别中的一个 actor(当前具有最少消息的 actor)会实际收到该消息。awaitMessage 方法在 waiters 列表基础上对 actor 排队。

清单 38. DefaultActorManager 类处理一个发送操作
  @Override
  public int send(Message message, Actor from, Actor to) {
    int count = 0;
    AbstractActor aa = (AbstractActor) to;
    if (aa != null) {
      if (aa.willReceive(message.getSubject())) {
        DefaultMessage xmessage = (DefaultMessage) 
           ((DefaultMessage) message).assignSender(from);
        aa.addMessage(xmessage);
        count++;
        synchronized (actors) {
          actors.notifyAll();
        }
      }
    }
    return count;
  }

  @Override
  public int send(Message message, Actor from, Actor[] to) {
    int count = 0;
    for (Actor a : to) {
      count += send(message, from, a);
    }
    return count;
  }

  @Override
  public int send(Message message, Actor from, Collection<Actor> to) {
    int count = 0;
    for (Actor a : to) {
      count += send(message, from, a);
    }
    return count;
  }

  @Override
  public int send(Message message, Actor from, String category) {
    int count = 0;
    Map<String, Actor> xactors = cloneActors();
    List<Actor> catMembers = new LinkedList<Actor>();
    for (String key : xactors.keySet()) {
      Actor to = xactors.get(key);
      if (category.equals(to.getCategory()) && 
            (to.getMessageCount() < to.getMaxMessageCount())) {
        catMembers.add(to);
      }
    }
    // find an actor with lowest message count
    int min = Integer.MAX_VALUE;
    Actor amin = null;
    for (Actor a : catMembers) {
      int mcount = a.getMessageCount();
      if (mcount < min) {
        min = mcount;
        amin = a;
      }
    }
    if (amin != null) {
      count += send(message, from, amin);
    }
    return count;
  }

  @Override
  public int broadcast(Message message, Actor from) {
    int count = 0;
    Map<String, Actor> xactors = cloneActors();
    for (String key : xactors.keySet()) {
      Actor to = xactors.get(key);
      count += send(message, from, to);
    }
    return count;
  }

  public void awaitMessage(AbstractActor a) {
    synchronized (actors) {
      waiters.put(a.getName(), a);
    }
  }

线程池初始化

管理器提供一个低优先级后台线程池,将它分配给 actor,以便处理收到的消息。(请注意,为保持简洁,我们省略了选项处理,它包含在提供的源代码中。)

清单 39. DefaultActorManager 类初始化
  protected static int groupCount;

  @Override
  public void initialize(Map<String, Object> options) {
    int count = getThreadCount(options);
    ThreadGroup tg = new ThreadGroup("ActorManager" + groupCount++);
    for (int i = 0; i < count; i++) {
      Thread t = new Thread(tg, new ActorRunnable(), "actor" + i);
      threads.add(t);
      t.setDaemon(true);
      t.setPriority(Math.max(Thread.MIN_PRIORITY, 
         Thread.currentThread().getPriority() - 1));
    }
    running = true;
    for (Thread t : threads) {
      t.start();
    }
  }

每个 actor 由清单 40 中的 Runnable 实现分派。只要准备好的 actor(具有挂起的消息的 actor)可用,就会将它们分派出去;否则,线程会等待(具有可变的超时)消息到来。

清单 40. 通过一个 Runnable 处理消息
  public class ActorRunnable implements Runnable {
    public void run() {
      int delay = 1;
      while (running) {
        try {
          if (!procesNextActor()) {
            synchronized (actors) {
              actors.wait(delay * 1000);
            }
            delay = Math.max(5, delay + 1);
          } else {
            delay = 1;
          }
        } catch (InterruptedException e) {
        } catch (Exception e) {
          System.out.printf("procesNextActor exception %s%n", e);
        }
      }
    }
  }

procesNextActor 方法首先测试是否存在任何新创建的 actor,然后运行其中一个。否则,它会测试一个等待的 actor。如果有任何等待的 actor,则会分派一个 actor 来处理它的下一条消息。最多一次调用处理一条消息。请注意,所有同步操作都是使用 actors 字段完成的;这减少了发生死锁的可能性。

清单 41. 选择和分派下一个 actor
  protected boolean procesNextActor() {
    boolean run = false, wait = false, res = false;
    AbstractActor a = null;
    synchronized (actors) {
      for (String key : runnables.keySet()) {
        a = runnables.remove(key);
        break;
      }
    }
    if (a != null) {
      run = true;
      a.run();
    } else {
      synchronized (actors) {
        for (String key : waiters.keySet()) {
          a = waiters.remove(key);
          break;
        }
      }
      if (a != null) {
        // then waiting for responses
        wait = true;
        res = a.receive();
      }
    }
    return run || res;
  }

终止方法

可以通过调用 terminateterminateAndWait 方法来请求管理器终止处理。terminate 告诉所有线程尽快停止处理。terminateAndWait 仍会等待线程完成。

清单 42. DefaultActorManager 类终止
@Override
  public void terminateAndWait() {
    terminate();
    for (Thread t : threads) {
      try {
        t.join();
      } catch (InterruptedException e) {
      }
    }
  }

  boolean running;

  @Override
  public void terminate() {
    running = false;
    for(Thread t: threads) {
      t.interrupt();
    }
    synchronized (actors) {
      for (String key : actors.keySet()) {
        actors.get(key).deactivate();
      }
    }
  }

创建方法

create 方法家族构造 actor 并将它们与此管理器关联。create 通过 actor 的类提供,它必须有一个默认的构造函数。此外,actor 可在创建时或以后启动。请注意,此实现需要所有 actor 扩展 AbstractActor

清单 43. 创建和启动 actor
@Override
  public Actor createAndStartActor(Class<? extends Actor> clazz, String name, 
        Map<String, Object> options) {
    Actor res = createActor(clazz, name, options);
    startActor(res);
    return res;
  }

  @Override
  public Actor createActor(Class<? extends Actor> clazz, String name, 
       Map<String, Object> options) {
    AbstractActor a = null;
    synchronized (actors) {
      if (!actors.containsKey(name)) {
        try {
          a = (AbstractActor) clazz.newInstance();
          a.setName(name);
          a.setManager(this);
        } catch (Exception e) {
          throw e instanceof RuntimeException ? 
             (RuntimeException) e : new RuntimeException(
              "mapped exception: " + e, e);
        }
      } else {
        throw new IllegalArgumentException("name already in use: " + name);
      }
    }
    return a;
  }
}

  @Override
  public void startActor(Actor a) {
    a.activate();
    synchronized (actors) {
      String name = a.getName();
      actors.put(name, (AbstractActor) a);
      runnables.put(name, (AbstractActor) a);
    }
  }

结束语

送君千里,终有一别!

在本文中,您学习了如何将一个相对简单的 actor 系统用于各种常见的 Java 编程场景和模式。μJavaActors 库具有灵活的、动态的行为,为 Akka 等更加庞大的 actor 库提供了一个基于 Java 的替代方案。

从代码示例和视频模拟中可以明显看到,μJavaActors 可跨一个执行线程池高效地分配 actor 消息处理工作。而且,可在用户界面中迅速确定是否需要更多线程。该界面还容易确定哪些 actor 渴求工作或者是否有一些 actor 负载过重。

DefaultActorManagerActorManager 接口的默认实现)可保证没有 actor 会一次处理多条消息。因此这会减轻 actor 作者的负担,他们无需处理任何重新输入考虑因素。该实现还不需要 actor 同步,只要:(1) actor 仅使用私有(实例或方法本地的)数据,(2) 消息参数仅由消息发送者编写,以及 (3) 仅由消息接收者读取。

DefaultActorManager 的两个重要的设计参数是线程与 actor 的比率 以及要使用的线程总数。线程数量至少应该与计算机上的处理器一样多,除非一些线程为其他用途而保留。因为线程可能常常空闲(例如,当等待 I/O 时),所以正确的比率常常是线程是处理器的 2 倍或多倍。一般而言,应该有足够的 actor(其实是 actor 之间的消息比率)来保持线程池中大部分时间都很繁忙。(为了获得最佳的响应,应该有一些保留线程可用;通常平均 75% 到 80% 的活动比率最佳。)这意味着 actor 通常比线程更多,因为有时 actor 可能没有任何要处理的挂起消息。当然,您的情况可能有所不同。执行等待操作(比如等待一个人为响应)的 actor 将需要更多线程。(线程在等待时变为 actor 专用的,无法处理其他消息。)

DefaultActorManager 很好地利用了 Java 线程,因为在 actor 处理一条消息时,一个线程仅与一个特定的 actor 关联;否则,它可供其他 actor 自由使用。这允许一个固定大小的线程池为无限数量的 actor 提供服务。结果,需要为给定的工作负载创建的线程更少。这很重要,因为线程是重量级的对象,常常被主机操作系统限制于相对较少数量的实例。μJavaActors 库正是因为这一点而与为每个 actor 分配一个线程的 actor 系统区分开来;如果 actor 没有消息要处理,并且可能限制了可存在的 actor 实例数量,这么做可以让线程实际空闲下来。

在线程切换方面,μJavaActors 实现有很大不同。如果在消息处理完成时有一条新消息需要处理,则不会发生线程切换;而是会重复一个简单循环来处理该新消息。因此,如果等待的消息数量至少与线程一样多,则没有线程是空闲线程,因此不需要进行切换。如果存在足够的处理器(至少一个线程一个),则可以有效地将每个线程分配给一个处理器,而从不会发生线程切换。如果缓冲的消息不足,那么线程将会休眠,但这并不明显,因为只有在没有工作挂起时才会出现负载过重的现象。

用于 JVM 的其他 actor 库

还存在其他用于 JVM 的 actor 解决方案。表 1 简短介绍了它们与 μJavaActors 库的对比特征:

表 1. 对比 JVM actor 库与 μJavaActors
名称访问地址描述与 μJavaActors 对比
Kilimhttp://www.malhar.net/sriram/kilim/一个支持基于轻型线程的多生成者、单使用者邮箱模型的 Java 库。Kilim 需要字节代码调整。在 μJavaActors 中,每个 actor 也是其自身的邮箱,所以不需要独立的邮箱对象。
Akkahttp://akka.io/尝试使用函数语言模拟 actor 的模式匹配,一般使用 instanceof 类型检查(但 μJavaActors 一般使用字符串同等性或正则表达式匹配)。Akka 功能更多(比如支持分布式 actor),因此比 μJavaActors 更大且有可能更复杂。
GParshttp://gpars.codehaus.org/ActorGroovy Actor 库。类似于 μJavaActors,但更适合 Groovy 开发人员。

请注意,表 1 中的一些 JVM actor 解决方案添加了同步发送功能(也就是发送者需要等待回复)。尽管很方便,但这可能导致更低的消息处理公平性和/或对 actor 的更少的重新输入调用。μJavaActors 使用了 POJT(纯旧 Java 线程)和标准线程显示器,它是一种更加传统的实现。其他这些方法中的一些方法为提供它们自己的线程模型提供了专门支持。μJavaActors 是一个纯 Java 库;要使用它,仅需确保它的 JAR 位于类路径上即可,此外,它不需要字节代码操作或其他特殊操作。

增强 μJavaActors

当然,还有改进或扩展 μJavaActors 库的空间。如果您感兴趣,我总结了以下可能性:

  • 在一个类别中重新分配挂起的消息:目前,在发送时会为消息分配 round-robin,而不会在以后重新均衡。
  • 允许基于优先级的 actor 执行:目前,所有 actor 都在具有同等优先级的线程上执行;如果存在具有不同优先级的线程(或线程池)并且可在条件更改后向这些线程分配 actor,那么系统可能更加灵活。
  • 允许优先级消息:目前,消息通常按发送顺序处理,允许优先级处理将支持更灵活的处理。
  • 允许 actor 处理来自多个类别的消息:目前,一次仅允许处理一个类别的消息。
  • 可以通过实现优化来减少线程切换,进而提高潜在的消息处理速率:这样做的代价将是更高的复杂性。
  • 分布式 actor:目前,actor 必须都在一个 JVM 中运行;跨 JVM 执行将是一种强大的扩展。

下载

描述名字大小
actor 运行时和 actor 演示源代码j-javaactors.jar104KB
Java 源文件j-javaactors.zip47KB

参考资料

学习

获得产品和技术

讨论

条评论

developerWorks: 登录

标有星(*)号的字段是必填字段。


需要一个 IBM ID?
忘记 IBM ID?


忘记密码?
更改您的密码

单击提交则表示您同意developerWorks 的条款和条件。 查看条款和条件

 


在您首次登录 developerWorks 时,会为您创建一份个人概要。您的个人概要中的信息(您的姓名、国家/地区,以及公司名称)是公开显示的,而且会随着您发布的任何内容一起显示,除非您选择隐藏您的公司名称。您可以随时更新您的 IBM 帐户。

所有提交的信息确保安全。

选择您的昵称



当您初次登录到 developerWorks 时,将会为您创建一份概要信息,您需要指定一个昵称。您的昵称将和您在 developerWorks 发布的内容显示在一起。

昵称长度在 3 至 31 个字符之间。 您的昵称在 developerWorks 社区中必须是唯一的,并且出于隐私保护的原因,不能是您的电子邮件地址。

标有星(*)号的字段是必填字段。

(昵称长度在 3 至 31 个字符之间)

单击提交则表示您同意developerWorks 的条款和条件。 查看条款和条件.

 


所有提交的信息确保安全。


static.content.url=http://www.ibm.com/developerworks/js/artrating/
SITE_ID=10
Zone=Java technology
ArticleID=825917
ArticleTitle=一个用于实现并行执行的 Java actor 库
publish-date=07162012