JVM 并发性

使用 Akka 构建 actor 应用程序

从基础操作进阶到构建使用 actor 交互的应用程序

Comments

系列内容:

此内容是该系列 # 部分中的第 # 部分: JVM 并发性

敬请期待该系列的后续内容。

此内容是该系列的一部分:JVM 并发性

敬请期待该系列的后续内容。

JVM 并发性:使用 Akka 执行异步操作” 介绍了 actor 模型Akka 框架和运行时。构建 actor 应用程序与构建传统的线性应用程序不同。采用线性方法时,您会考虑完成目标所涉及的控制流和步骤顺序。要有效地使用 actor 模型,需要将应用程序分解为不同的状态和行为包 (actor),为这些包之间的交互(消息)编写脚本。这两个组件(actor 和消息)是您应用程序的构建块。

如果正确地创建 actor 和消息,最终会得到一个大部分操作都异步进行的系统。异步操作比线性方法更难理解,但它换来的是可伸缩性。高度异步的程序能够更好地使用更多的系统资源(例如内存和处理器)来更快地完成特定任务,或者并行处理任务的更多实例。借助 Akka,您甚至可以将此可伸缩性扩展到多个系统,使用远程操作来处理分布式 actor。

在本文中,您将进一步了解与构建系统相关的 actor 和消息方面。两个示例应用程序中的第一个示例将展示 Akka 中的 actor 和消息工作原理的基本知识。第二个更详细的示例将演示如何规划和可视化 actor 系统的结构。两个示例都使用了 Scala 代码,但 Java 开发人员很容易理解它们(要获得有关的帮助,请参阅本系列中 以前的文章,获取使用 Akka 的 Scala 和 Java 编程的并列示例。)

下载本文的 示例代码

了解 star

上一篇文章中的示例使用了:

  • 启动 actor 系统的主要应用程序直接创建的 Actor
  • 一种 actor 类型
  • actor 之间的交互极少

对于第一个示例应用程序,我使用了稍微复杂一点的结构,我将逐步详细介绍该结构。清单 1 显示了整个应用程序。

清单 1. 各代 star
import scala.concurrent.duration._
import scala.util.Random
import akka.actor._
import akka.util._
object Stars1 extends App {
  import Star._
  val starBaseLifetime = 5000 millis
  val starVariableLifetime = 2000 millis
  val starBaseSpawntime = 2000 millis
  val starVariableSpawntime = 1000 millis

  object Namer {
    case object GetName
    case class SetName(name: String)
    def props(names: Array[String]): Props = Props(new Namer(names))
  }
  class Namer(names: Array[String]) extends Actor {
    import context.dispatcher
    import Namer._

    context.setReceiveTimeout(starBaseSpawntime + starVariableSpawntime)

    def receive = {
      case GetName => {
        val name = ... 
        sender ! SetName(name)
      }
      case ReceiveTimeout => {
        println("Namer receive timeout, shutting down system")
        system shutdown
      }
    }
  }

  object Star {
    case class Greet(peer: ActorRef)
    case object AskName
    case class TellName(name: String)
    case object Spawn
    case object IntroduceMe
    case object Die
    def props(greeting: String, gennum: Int, parent: String) = Props(new Star(greeting, gennum, parent))
  }
  class Star(greeting: String, gennum: Int, parent: String) extends Actor {
    import context.dispatcher
    var myName: String = ""
    var starsKnown = Map[String, ActorRef]()
    val random = Random
    val namer = context actorSelection namerPath
    namer ! Namer.GetName

    def scaledDuration(base: FiniteDuration, variable: FiniteDuration) =
      base + variable * random.nextInt(1000) / 1000

    val killtime = scaledDuration(starBaseLifetime, starVariableLifetime)
    val killer = scheduler.scheduleOnce(killtime, self, Die)
    val spawntime = scaledDuration(starBaseSpawntime, starVariableSpawntime)
    val spawner = scheduler.schedule(spawntime, 1 second, self, Spawn)
    if (gennum > 1) scheduler.scheduleOnce(1 second, context.parent, IntroduceMe)

    def receive = {
      case Namer.SetName(name) => {
        myName = name
        println(s"$name is the ${gennum}th generation child of $parent")
        context become named
      }
    }
    def named: Receive = {
      case Greet(peer) => peer ! AskName
      case AskName => sender ! TellName(myName)
      case TellName(name) => {
        println(s"$myName says: '$greeting, $name'")
        starsKnown += name -> sender
      }
      case Spawn => {
        println(s"$myName says: A star is born!")
        context.actorOf(props(greeting, gennum + 1, myName))
      }
      case IntroduceMe => starsKnown.foreach {
        case (name, ref) => ref ! Greet(sender)
      }
      case Die => {
        println(s"$myName says: 'I'd like to thank the Academy...'")
        context stop self
      }
    }
  }

  val namerPath = "/user/namer"
  val system = ActorSystem("actor-demo-scala")
  val scheduler = system.scheduler
  system.actorOf(Namer.props(Array("Bob", "Alice", "Rock", "Paper", "Scissors", 
    "North", "South", "East", "West", "Up", "Down")), "namer")
  val star1 = system.actorOf(props("Howya doing", 1, "Nobody"))
  val star2 = system.actorOf(props("Happy to meet you", 1, "Nobody"))
  Thread sleep 500
  star1 ! Greet(star2)
  star2 ! Greet(star1)
}

此应用程序创建了一个包含两种 actor 类型的 actor 系统:NamerStarNamer actor 是一个单例对象(singleton),它实际上是一个保存名称的中央目录。Star actor 从 Namer 获取它们的(屏幕)名称,然后向其他 Star 打印问候消息,就像 上一期 的示例一样。但它们还生成子 Star,将这些子 star 介绍给它们认识的 Star;而且 Star actor 最终可能死亡。

清单 2 是您在运行此应用程序时可能看到的输出示例。

清单 2. 应用程序输出
Bob is the 1th generation child of Nobody
Alice is the 1th generation child of Nobody
Bob says: 'Howya doing, Alice'
Alice says: 'Happy to meet you, Bob'
Bob says: A star is born!
Rock is the 2th generation child of Bob
Alice says: A star is born!
Paper is the 2th generation child of Alice
Bob says: A star is born!
Scissors is the 2th generation child of Bob
Alice says: 'Happy to meet you, Rock'
Alice says: A star is born!
North is the 2th generation child of Alice
Bob says: 'Howya doing, Paper'
Rock says: 'Howya doing, Paper'
Bob says: A star is born!
South is the 2th generation child of Bob
Alice says: 'Happy to meet you, Scissors'
Paper says: 'Happy to meet you, Scissors'
Alice says: A star is born!
East is the 2th generation child of Alice
Bob says: 'Howya doing, North'
Rock says: 'Howya doing, North'
Scissors says: 'Howya doing, North'
Paper says: A star is born!
West is the 3th generation child of Paper
Rock says: A star is born!
Up is the 3th generation child of Rock
Bob says: A star is born!
Down is the 2th generation child of Bob
Alice says: 'Happy to meet you, South'
North says: 'Happy to meet you, South'
Paper says: 'Happy to meet you, South'
Scissors says: A star is born!
Bob-Bob is the 3th generation child of Scissors
Alice says: A star is born!
Bob-Alice is the 2th generation child of Alice
Scissors says: 'Howya doing, East'
Rock says: 'Howya doing, East'
Bob says: 'Howya doing, East'
South says: 'Howya doing, East'
North says: A star is born!
Bob-Rock is the 3th generation child of North
Paper says: A star is born!
Bob-Paper is the 3th generation child of Paper
Bob says: 'I'd like to thank the Academy...'
Scissors says: 'Howya doing, West'
South says: 'Howya doing, West'
Alice says: A star is born!
Bob-Scissors is the 2th generation child of Alice
North says: A star is born!
Bob-North is the 3th generation child of North
Paper says: A star is born!
Bob-South is the 3th generation child of Paper
Alice says: 'I'd like to thank the Academy...'
Namer receive timeout, shutting down system

各代 star

不同于真实世界的演员,Star actor 不会以戏剧性和公开的方式产生后代;每次收到一条 Spawn 消息时,它们都会安静地弹出一个子 star。它们在此事件中惟一的兴奋迹象是简单的出生公告 “A star is born!”同样地,不同于真实世界的演员,自豪的新父亲 Star 甚至无法公布其新孩子的姓名,该名称由命名机构确定。命名刚诞生的 Star 后,Namer 将子 Star 的名称和细节打印在表的一行中:“Ted is the 2th generation child of Bob”。

一个 Star 的死亡由 Star 收到一条 Die 消息来触发,作为响应,它会打印一条消息 “I'd like to thank the Academy...”。Star 然后执行 context stop self 语句,告知控制 Akka actor 上下文它的使命已完成,应该将其关闭。然后,该上下文负责所有清理工作,从系统中删除该 actor。

更改角色

真实世界的演员可扮演许多不同的角色。Akka actor 还可以通过更改消息处理函数方法来扮演不同角色。您可以在 Star actor 中看到这一点,其中默认的receive 方法仅处理 SetName 消息,其他所有消息由 named 方法来处理。移交过程发生在 SetName 消息的处理过程中,由 context become named 语句完成。此角色更改的目的是,让 Star 在被命名之前无法做任何事情,而且在为它命名后,绝不能重新命名。

您始终可以在单个 receive 方法中完成所有消息处理,但这样做通常会得到凌乱的代码,其中包含基于当前 actor 状态的条件语句。对不同的状态使用不同的 receive 方法,这样可以保持您的代码干净而又紧凑。一般而言,任何时候您都应该有一个适合一条不同消息的 actor 状态时,您还应该使用一种新 receive 方法来表示该状态。

在更改 actor 角色时,您需要小心,不要排除对有效消息的处理。例如,如果允许 Star actor 在任何时候进行重命名,那么 清单 1 中的 named 方法需要处理 SetName 消息。任何未由 actor 的当前 receive 方法处理的消息都会被丢弃(实际上会默认发送到一个死信邮箱,但在您的用户 actor 看来是被丢弃了)。

作为更改消息处理函数的替代方案,还可以将当前的消息处理函数推送到一个堆栈上,使用双参数表单 become(named, false) 来设置一个新处理函数。最终可以使用 context unbecome 调用还原最初的处理函数。您可以采用这种方式将对 become/unbecome 的调用嵌套到想要的深度,但必须小心,该代码最终会对每个 become 执行一次 unbecome 匹配。任何不匹配的 become 表示一次内存泄漏。

Namer actor

Namer actor 会在其构造函数中传递一个名称字符串数组。每次收到一条 GetName 消息,它都会在一条 SetName 消息中返回数组中的下一个名称,在用完简单名称时使用带连字符的名称。Namer actor 的作用是将名称(理想情况下是惟一名称)分配给 Star actor,所以在此系统中没有理由拥有多个 Namer 实例。启动 actor 系统的应用程序代码直接创建这个单体实例,所以它可供每个 Star 使用。

因为该应用程序创建了 Namer 单体实例,所以它可将此 actor 的一个 ActorRef 传递给每个 StarStar actor 可将它传递给其子对象。但 Akka 提供了一种更干净的方式来处理这种类型的已知 actor。Star actor 初始化代码中的 val namer = context actorSelection namerPath 行按照 actor 系统中的路径来查找 Namer actor,在本例中,路径为 /user/namer。(应该将 /user 前缀用于所有用户创建的 actor,namer 是在使用 system.actorOf 创建 Namer actor 时设置的名称。)namer 值对应用程序中包含的所有 actor 可见,所以可以在需要时直接使用它。

计划性消息

清单 1 中的示例使用了多条计划性消息来提示各种 actor。Star actor 在初始化期间创建了两条或三条计划性消息。val killer = scheduler.scheduleOnce(killtime, self, Die) 语句创建一个一次性消息计划程序,通过在该阶段结束时发送一个 Die 消息来触发 Star 的死亡。val spawner = scheduler.schedule(spawntime, 1 second, self, Spawn) 语句创建一个反复性计划程序,它在一个初步延迟后以 1 秒的间隔发送 Spawn 消息来填充 Star 的新操作。

Star 的第三种类型的计划性消息仅在 Star 是另一个 Star 的后代(而不是通过 actor 系统外的应用程序代码创建)时使用。if (gennum > 1) scheduler.scheduleOnce(1 second, context.parent, IntroduceMe) 语句创建了一条计划性消息,如果新的 Star 是第二代或更早的辈分,该消息会在初始化该 Star 后一秒发送到 Star 的父对象。当父 Star 收到此消息时,它会将一条 Greet 消息发送给它引入的其他每个 Star,要求这些已知的 Star 向子对象介绍自己。

Namer actor 也使用了一条计划性消息,这条消息具有接收超时的格式。context.setReceiveTimeout(starBaseSpawntime + starVariableSpawntime) 语句为生产 star 的最长时间设置了一个超时。每次 actor 收到一条消息时,上下文都会重置此超时,以便仅在经历指定的时间且没有收到任何消息时触发超时。Star 不断创建新的子 Star 来向 Namer 发送消息,所以该超时仅在所有 Star actor 都消失后才会发生。如果该超时发生,Namer 通过关闭整个 actor 系统来处理最终的 ReceiveTimeout 消息(在 akka.actor 包中定义)。

眼尖的读者可能想知道 Namer 超时如何发生。一个 Star 的生存期始终不低于 5 秒,而且每个 Star 在一段时间(最大 3 秒)后开始生成子 Star,所以Star似乎 在不断增多(就像真实的电视中一样)。此过程的原理是什么呢?答案在于 Akka actor 监督 模型和父子关系。

actor 家族

Akka 基于亲子关系而对 actor 采取一种监督分层结构。当一个 actor 创建了另一个 actor 时,所创建的 actor 就变成了最初的 actor 的一个下级。这意味着父 actor 负责管理它的子 actor(我们通常希望将此原则应用于真实世界的 actor)。此责任主要关系到 故障处理,但它对 actor 的工作方式具有一定的影响。

监督分层结构是 清单 1 中的 actor 系统关闭的原因。因为该分层结构需要父 actor 可用,所以终止一个父 actor 会自动终止它的所有子 actor。在 清单 1 中,只有两个 Star actor 是该应用程序最初创建的(它们的名称始终为 BobAlice)。其他所有 Star 由这两个最初的 Star 之一创建,或者由它们的一个子孙 Star 创建。所以当这些根 Star 中的每一个终止时,它会带走它的所有子孙后代。当二者都终止时,不会剩下任何 Star。没有任何 Star 生成子 Star,没有名称请求会发送到 Namer,所以 Namer 超时最终被触发,系统将关闭。

更复杂的 actor 系统

您在 清单 1 中看到了一个简单 actor 系统的工作原理的示例。但真实的应用程序系统往往拥有更多类型的 actor(通常包含数十或数百个)和 actor 之间更复杂的交互。设计和组织复杂 actor 系统的最佳方式之一是指定 actor 之间的消息流。

为了得到更复杂的示例,我扩展了 清单 1 中的应用程序,以便实现一个简单的电影制作模型。此模型使用 4 种主要的 actor 类型和两种专业的辅助 actor 类型:

  • Star:一位出演电影的演员
  • Scout:一位寻找新 Star 的星探
  • Academy:一个跟踪所有活动的单体 Star 的注册表
  • Director:电影制作者
    • CastingAssistant:帮助为影片选派演员的 Director 助手
    • ProductionAssistant:帮助制作影片的 Director 助手

清单 1 中的 Star 一样,此应用程序中的 Star actor 拥有有限的生存期。当一个 Director 开始制作一部电影时,它会获得当前要出演该影片的活动 Star 的列表。首先,Director 需要获取承诺出演影片的 Star,然后,在所有 Star 都承诺出演后,开始制作该影片。如果影片中的任何 Star 在影片完成前退出交易(或者用 actor 的术语来讲,star 死亡了),影片将会失败。

消息图表

清单 1 中的应用程序非常简单,我只是简要地解释了一下 actor 交互。这个复杂得多的新应用程序需要一种更好的方式来表示这些交互。消息传递图是显示这些交互的一种好方式。图 1 显示了一个 Scout 寻找一位新 Star(或用 actor 的术语来讲,创建一个 Star)和新 StarAcademy 注册所涉及的交互顺序。

图 1. Star 创建和初始化
创建和初始化 Star 的消息传递图
创建和初始化 Star 的消息传递图

这是添加一个 Star 所涉及的消息顺序(和创建步骤):

  1. FindTalent(从 SchedulerScout):触发添加一个新 Star 的过程。
  2. GetName(从 ScoutAcademy):为 Star 分配一个名称。
  3. GiveName(从 Academy 响应):提供分配的名称。
  4. actorOf()Scout 使用提供的名称创建新 Star actor 。
  5. Register(从 StarAcademy):向 Academy 注册 Star

此消息顺序的设计可扩展且灵活。每条消息都可以采用隔离方式进行处理,所以 actor 不需要更改其内部状态来处理消息交换。(Academy 单体对象会更改状态,但这是该消息交换的完整用途的一部分。)因为没有内部状态发生更改,所以您不需要严格按照这个顺序发送消息。例如,您可以通过向 Academy 发送多条 GetName 消息,让 FindTalent 消息创建多个 Star。甚至可以在完成最后一个 Star 的创建之前,连续处理多条 FindTalent 消息。还可以向您系统添加任意数量的 Scout actor,让它们独立地、没有冲突地运行。

制作一部电影是一个比创建一个新 Star 复杂得多的流程,涉及到更多的状态更改和潜在的故障条件。图 2 显示了制作一部电影所涉及的主要应用程序消息:

图 2. 制作一部电影
制作电影的消息传递图
制作电影的消息传递图

这是制作一部电影所涉及的消息序列,我们主要查看了一些一切正常、没有故障的愉快路径:

  1. MakeMovie(从 SchedulerDirector):触发一部电影的启动。
  2. PickStars(从 DirectorAcademy):选择要出演该电影的 Star
  3. StarsPickedPickFailure(来自 Academy 的响应):如果有足够多的 Star 可出演该电影,Academy 会选择需要的数量并在一条 StarsPicked 消息中发送回该列表;否则,Academy 发送一个 PickFailure 响应。
  4. actorOf()Director 创建一个 CastingAssistant actor 来处理电影的演员阵容。
  5. OfferRole(影片中的每个 StarCastingAssistant):CastingAssistantStar 提供角色。
  6. AcceptRoleRejectRole(来自每个 Star 的响应):一个 Star 如果已扮演另一个角色,它会拒绝所提供的角色,否则接受该角色。
  7. AllSignedCastingFailure(父 actor 的 CastingAssistant):当所有 Star 都接受其角色后,CastingAssistant 的工作就完成了,然后它会通过 AllSigned 消息向父 Director 传递成功消息;如果无法安排出演的 Star(具体地讲,如果某个演员死了),CastingAssistant 会向父 actor 传递失败消息。无论如何,CastingAssistant 都会完成并终止。
  8. actorOf()Director 创建一个 ProductionAssistant actor 来处理影片的拍摄。
  9. ProductionComplete(从 SchedulerProductionAssistant):在需要的时间过去后触发影片的完成。
  10. ProductionCompleteProductionFailure(父 actor 的 ProductionAssistant):当为影片的完成触发计时器后,ProductionAssistant 向其父 actor 报告影片已完成。
  11. RoleComplete(影片中每个 StarProductionAssistant):ProductionAssistant 还需要通知每个 Star,影片已完成,以便它们可出演其他影片。

此消息序列在处理过程中使用了一些 actor 的状态更改。Star 需要在可用和正在出演某部影片之间更改状态。CastingAssistant actor 需要跟踪哪些 Star 已接受要拍摄的影片中的角色,所以它们知道仍需招募哪些 actor。但 Director actor 不需要更改状态,因为它们仅响应它们收到的消息(包括来自其子 actor 的消息)。ProductionAssistant actor 也不需要更改状态,因为它们只需要在影片终止时告知其他 actor。

可避免使用分开的 CastingAssistantProductionAssistant actor,将它们的功能合并到 Director actor 中。但消除其他 actor 会使 Director 更加复杂,而且在本例中,将该功能分离到其他 actor 中更有意义。在考虑故障处理时尤其如此。

处理故障

应用程序的一个重要方面是省略 图 1图 2 中的消息流。Star 具有有限的生存期,所以所有处理 Star 的 actor 都需要知道 star 何时死亡。具体地讲,如果一个已被选择出演一部影片的 Star 在影片完成之前死亡,该影片一定会失败。

Akka actor 系统中的故障处理会使用家长监督功能,其中故障条件会沿 actor 分层结构向上传递。故障通常在 JVM 中表示为异常,所以 Akka 使用自然的异常处理方式来检测故障何时发生。如果一个 actor 没有处理其自己的代码中的异常,Akka 会通过终止该 actor 并将故障传递给父 actor 来处理这个未捕获的异常。父 actor 然后可以处理该故障,或者再将故障传递给它的父 actor。

Akka 内置的故障处理非常适合 I/O 相关故障条件,但对于影片制作系统,异常可能过于复杂。在这种情况下,需要监视其他 actor,幸运的是,Akka 提供了一种实现此操作的轻松方式。通过使用 actor 系统的 DeathWatch 组件,actor 可以注册自己来观察其他任何 actor。完成注册之后,如果被观察的 actor 死亡,观察 actor 会收到一条系统 Terminated 消息。(为了避免任何竞争条件,如果被观察的 actor 在开始观察之前已经死亡,Terminated 消息会立即显示在观察 actor 的邮箱中。)

DeathWatch 通过调用 context.watch() 方法来激活,该方法接受要观察的 actor 的 ActorRef。受关注的 actor 死亡时发出的最终 Terminated 消息,是影片制作示例需要完成的所有故障处理。

Star 创建代码

清单 3 显示了启动该应用程序并创建新 Star 所涉及的代码,与 图 1 中所示的消息流相匹配。

清单 3. 创建 Star 的代码
object Stars2 extends App {  object Scout {
    case object FindTalent
    val starBaseLifetime = 7 seconds
    val starVariableLifetime = 3 seconds
    val findBaseTime = 1 seconds
    val findVariableTime = 3 seconds
    def props(): Props = Props(new Scout())
}  
class Scout extends Actor {
    import Scout._
    import Academy._
    import context.dispatcher

    val random = Random
    scheduleFind

    def scheduleFind = {
      val nextTime = scaledDuration(findBaseTime, findVariableTime)
      scheduler.scheduleOnce(nextTime, self, FindTalent)
    }

    def scaledDuration(base: FiniteDuration, variable: FiniteDuration) =
      base + variable * random.nextInt(1000) / 1000

    def receive = {
      case FindTalent => academy ! GetName
      case GiveName(name) => {
        system.actorOf(Star.props(name, scaledDuration(starBaseLifetime, starVariableLifetime)), name)
        println(s"$name has been discovered")
        scheduleFind
      }
    }
  }

  object Academy {
    case object GetName
    case class GiveName(name: String)
    case class Register(name: String)
    ...
    def props(names: Array[String]): Props = Props(new Academy(names))
  }  
class Academy(names: Array[String]) extends Actor {
    import Academy._

    var nextNameIndex = 0
    val nameIndexLimit = names.length * (names.length + 1)
    val liveStars = Buffer[(ActorRef, String)]()
    ...
    def receive = {
      case GetName => {
        val name =
          if (nextNameIndex < names.length) names(nextNameIndex)
          else {
            val first = nextNameIndex / names.length - 1
            val second = nextNameIndex % names.length
            names(first) + "-" + names(second)
          }
        sender ! GiveName(name)
        nextNameIndex = (nextNameIndex + 1) % nameIndexLimit
      }
      case Register(name) => {
        liveStars += ((sender, name))
        context.watch(sender)
        println(s"Academy now tracking ${liveStars.size} stars")
      }
     case Terminated(ref) => {
        val star = (liveStars.find(_._1 == ref)).get
        liveStars -= star
        println(s"${star._2} has left the business\nAcademy now tracking ${liveStars.size} Stars")
      }
      ...
      }
    }
  }

  object Star {
    ...
    def props(name: String, lifespan: FiniteDuration) = Props(new Star(name, lifespan))
  }  
class Star(name: String, lifespan: FiniteDuration) extends Actor {
    import Star._
    import context.dispatcher

    academy ! Academy.Register(name)

    scheduler.scheduleOnce(lifespan, self, PoisonPill)
  }
  ...
  val system = ActorSystem("actor-demo-scala")
  val scheduler = system.scheduler
  val academy = system.actorOf(Academy.props(Array("Bob", "Alice", "Rock", 
    "Paper", "Scissors", "North", "South", "East",  "West", "Up", "Down")), "Academy")
  system.actorOf(Scout.props(), "Sam")
  system.actorOf(Scout.props(), "Dean")
  system.actorOf(Director.props("Astro"), "Astro")
  system.actorOf(Director.props("Cosmo"), "Cosmo")
  Thread sleep 15000
  system.shutdown
}

清单 3 主要使用了与 清单 1 中的 Star 示例相同的 Akka 功能,但添加了激活 DeathWatchcontext.watch() 调用,该调用由 Academy actor 在处理来自新 StarRegister 消息时执行。Academy actor 同时记录 ActorRef 和每个 Star 的名称,而且在处理一条 Terminated 消息时,它使用 ActorRef 查找并删除死亡的 Star。这样,活着的 StarBuffer(基本来讲是一个 ArrayList)会保持最新状态。

主要应用程序代码首先创建单体的 Academy actor,然后创建一对 Scout,最后创建一对 Director。该应用程序允许 actor 系统运行 15 秒,然后关闭该系统并退出。

启动一部影片的制作

清单 4 给出了制作一部影片所涉及的的代码的第一部分:安排出演影片的 Star。此代码与 图 2 的消息流中的顶部代码相匹配,包括 Scheduler 及一个 DirectorAcademy actor 之间的交互。

清单 4. 影片制作代码
object Stars2 extends App {
  ...
  object Director {
    case object MakeMovie

    val starCountBase = 2
    val starCountVariable = 4
    val productionTime = 3 seconds
    val recoveryTime = 3 seconds

    def props(name: String) = Props(new Director(name))
  }

  class Director(name: String) extends Actor {
    import Academy._
    import Director._
    import ProductionAssistant._
    import context.dispatcher

    val random = Random

    def makeMovie = {
      val numstars = random.nextInt(starCountVariable) + starCountBase
      academy ! PickStars(numstars)
    }
    def retryMovie = scheduler.scheduleOnce(recoveryTime, self, MakeMovie)
    makeMovie

    def receive = {
      case MakeMovie => makeMovie
      case PickFailure => retryMovie
      case StarsPicked(stars) => {
        println(s"$name wants to make a movie with ${stars.length} actors")
        context.actorOf(CastingAssistant.props(name, stars.map(_._1)), name + ":Casting")
        context become casting
      }
    }
    ...
  }
  ...
  object Academy {
    ...
    case class PickStars(count: Int)
    case object PickFailure
    case class StarsPicked(ref: List[(ActorRef, String)])

    def props(names: Array[String]): Props = Props(new Academy(names))
  }

  class Academy(names: Array[String]) extends Actor {
    ...
    def pickStars(n: Int): Seq[(ActorRef, String)] = ...
    
    def receive = {
      ...
      case PickStars(n) => {
        if (liveStars.size < n) sender ! PickFailure
        else sender ! StarsPicked(pickStars(n).toList)
      }
    }
  }

清单 4 的代码的开头给出了 Director 对象和 actor 定义的一部分,显示了如何通过 SchedulerDirector 发送一条 MakeMovie 消息来触发影片制作的启动。Director 在收到这条 MakeMovie 消息时启动影片制作流程,通过 PickStars 消息请求 AcademyStar 分配给影片。处理 PickStars 消息的 Academy 代码(显示在 清单 4 的末尾处)发回一条 PickFailure(如果没有足够的 Star 可用)或一条 StarsPicked 消息。如果 Director 收到一条 PickFailure 消息,它会计划以后进行另一次尝试。如果 Director 收到一条 StarsPicked 消息,它会启动一个 CastingAssistant actor,并获得 Academy 为影片中的角色选择的 Star 列表,然后更改状态来处理来自 CastingAssistant 的响应。清单 5 与此刻相衔接,以 Director actor 转换 Receive 方法开始。

清单 5. CastingAssistant 操作
  class Director(name: String) extends Actor {
    ...
    def casting: Receive = {
      case CastingAssistant.AllSigned(stars) => {
        println(s"$name cast ${stars.length} actors for movie, starting production")
        context.actorOf(ProductionAssistant.props(productionTime, stars), name + ":Production")
        context become making
      }
      case CastingAssistant.CastingFailure => {
        println(s"$name failed casting a movie")
        retryMovie
        context become receive
      }
    }
    ...
  }

object CastingAssistant {
    case class AllSigned(stars: List[ActorRef])
    case object CastingFailure

    val retryTime = 1 second

    def props(dirname: String, stars: List[ActorRef]) = Props(new CastingAssistant(dirname, stars))
  }

  class CastingAssistant(dirname: String, stars: List[ActorRef]) extends Actor {
    import CastingAssistant._
    import Star._
    import context.dispatcher

    var signed = Set[ActorRef]()
    stars.foreach { star =>
      {
        star ! OfferRole
        context.watch(star)
      }
    }

    def receive = {
      case AcceptRole => {
        signed += sender
        println(s"Signed star ${signed.size} of ${stars.size} for director $dirname")
        if (signed.size == stars.size) {
          context.parent ! AllSigned(stars)
          context.stop(self)
        }
      }
      case RejectRole => scheduler.scheduleOnce(retryTime, sender, OfferRole)
      case Terminated(ref) => {
        context.parent ! CastingFailure
        stars.foreach { _ ! Star.CancelOffer }
        context.stop(self)
      }
    }
  }

  object Star {
    case object OfferRole
    case object AcceptRole
    case object RejectRole
    case object CancelOffer
    case object RoleComplete
    ...
  }

  class Star(name: String, lifespan: FiniteDuration) extends Actor {
    ...
    var acceptedOffer: ActorRef = null

    scheduler.scheduleOnce(lifespan, self, PoisonPill)

    def receive = {
      case OfferRole => {
        sender ! AcceptRole
        acceptedOffer = sender
        context become booked
      }
    }

    def booked: Receive = {
      case OfferRole => sender ! RejectRole
      case CancelOffer => if (sender == acceptedOffer) context become receive
      case RoleComplete => context become receive
    }
  }

Director 使用要出演影片的 StarActorRef 列表来创建 CastingAssistantCastingAssistant 首先将一个 OfferRole 发送给每个 Star,并在每个 Star 上将自己注册为观察者。然后,CastingAssistant 等待从每个 Star 返回一条 AcceptRoleRejectRole 消息,或者从 actor 系统返回一条 Terminated 消息报告一位 Star 死亡。

如果 CastingAssistant 从演员阵容中的每个 Star 收到 AcceptRole,它会将一条 AllSigned 消息发回给它的父 Director。为了简便起见,此消息包含 Star actorRef 的列表,因为需要传递这个列表来执行下一个处理步骤。

如果 CastingAssistant 从任何 Star 收到一条 RejectRole 消息,它会计划在一定的延迟后将 OfferRole 重新发送给同一个 actor。(Star 通常是无法访问的,所以,如果您希望它们出演您的影片,则需要不断发出请求,直到它们接受。)

如果 CastingAssistant 获得一条 Terminated 消息,这意味着为影片选择的一位 Star 已死亡。在这种令人遗憾的情况下,CastingAssistant 会将一个 CastingFailure 报告给它的父 Director 并结束自己。但是在它结束之前,它会向其列表中的每个 Star 发送一条 CancelOffer 消息,以便任何已承诺出演该影片中的角色的 Star 可出演其他角色。

您可能想知道为什么 CastingAssistantCancelOffer 消息发送给每个Star,甚至发送给尚未处理发来的 AcceptRole 消息的 star。原因在于列表中的一个 Star 可能已发送了一个 AcceptRole,但它在处理 Terminated 消息时仍在邮箱中。在分布式 actor 系统的一般情况下,可能已接受 Star,但 AcceptRole 消息仍在传输或已丢失。将 CancelOffer 消息发送给每个 Star,会使每种情况下的故障处理更干净,此外,如果 Star 未接受要拍摄的影片中的一个角色,可以轻松地忽略 CancelOffer 消息。

清单 6 显示了影片制作流程的最后一部分:ProductionAssistant actor 的操作(与 图 2 的右下部分相匹配)。这一部分很简单,因为 ProductionAssistant 只需处理 SchedulerProductionComplete 消息或 Terminated 消息。

清单 6. ProductionAssistant 操作
  class Director(name: String) extends Actor {
    ...
    def making: Receive = {
      case m: ProductionAssistant.ProductionEnd => {
        m match {
          case ProductionComplete => println(s"$name made a movie!")
          case ProductionFailed => println(s"$name failed making a movie")
        }
        makeMovie
        context become receive
      }
    }
  }

  object ProductionAssistant {
    sealed trait ProductionEnd
    case object ProductionComplete extends ProductionEnd
    case object ProductionFailed extends ProductionEnd

    def props(time: FiniteDuration, stars: List[ActorRef]) = Props(new ProductionAssistant(time, stars))
  }

  class ProductionAssistant(time: FiniteDuration, stars: List[ActorRef]) extends Actor {
    import ProductionAssistant._
    import context.dispatcher

    stars.foreach { star => context.watch(star) }
    scheduler.scheduleOnce(time, self, ProductionComplete)

    def endProduction(end: ProductionEnd) = {
      context.parent ! end
      stars.foreach { star => star ! Star.RoleComplete }
      context.stop(self)
    }

    def receive = {
      case ProductionComplete => endProduction(ProductionComplete)
      case Terminated(ref) => endProduction(ProductionFailed)
    }
  }

如果 ProductionAssistantScheduler 收到 ProductionComplete 消息,它可向父 Director 报告成功。如果它先收到一条 Terminated 消息,那么它需要报告失败。无论如何,它都会告诉参与该影片的所有 Star,它们的工作已完成,以便执行清理。

清单 7 是您在运行此程序时想要看到的一个输出示例,其中影片制作结果以粗体显示。

清单 7. 示例输出
Bob has been discovered
Academy now tracking 1 stars
Alice has been discovered
Academy now tracking 2 stars
Rock has been discovered
Academy now tracking 3 stars
Paper has been discovered
Academy now tracking 4 stars
Cosmo wants to make a movie with 4 actors
Astro wants to make a movie with 3 actors
Signed star 1 of 4 for director Cosmo
Signed star 2 of 4 for director Cosmo
Signed star 3 of 4 for director Cosmo
Signed star 4 of 4 for director Cosmo
Cosmo cast 4 actors for movie, starting production
Scissors has been discovered
Academy now tracking 5 stars
Cosmo made a movie!
Cosmo wants to make a movie with 4 actors
Signed star 1 of 4 for director Cosmo
Signed star 2 of 4 for director Cosmo
Signed star 3 of 4 for director Cosmo
Signed star 4 of 4 for director Cosmo
Cosmo cast 4 actors for movie, starting production
North has been discovered
Academy now tracking 6 stars
South has been discovered
Academy now tracking 7 stars
Cosmo failed making a movieAstro failed casting a movie
Bob has left the business
Academy now tracking 6 Stars
Cosmo wants to make a movie with 3 actors
Signed star 1 of 3 for director Cosmo
Signed star 2 of 3 for director Cosmo
Signed star 3 of 3 for director Cosmo
Cosmo cast 3 actors for movie, starting production
East has been discovered
Academy now tracking 7 stars
West has been discovered
Academy now tracking 8 stars
Alice has left the business
Academy now tracking 7 Stars
Rock has left the business
Academy now tracking 6 Stars
Up has been discovered
Academy now tracking 7 stars
Astro wants to make a movie with 2 actors
Signed star 1 of 2 for director Astro
Signed star 2 of 2 for director Astro
Astro cast 2 actors for movie, starting production
Cosmo made a movie!
Cosmo wants to make a movie with 3 actors
Signed star 1 of 3 for director Cosmo
Signed star 2 of 3 for director Cosmo
Signed star 3 of 3 for director Cosmo
Cosmo cast 3 actors for movie, starting production
Down has been discovered
Academy now tracking 8 stars

靠近清单中间部分的双重故障显示了一个有趣的输出序列。首先是 Cosmo failed making a movie 行,然后是 Astro failed casting a movie,接着是 Bob has left the business。这些行显示了终止一个 StarBob 所产生的交互。在本例中,Bob 已接受 Cosmo 制作的影片中的一个角色,而且影片制作已经开始,所以 CosmoProductionAssistant 收到了 Terminated 消息并让影片制作失败。Bob 已被选择出演 Astro 制作的一部影片中的角色,但尚未接受该角色(因为 Bob 已承诺出演 Cosmo 的影片),所以 AstroCastingAssistant 收到了 Terminated 消息并让该影片的演员安排失败。第 3 条消息是 Academy 在收到 Terminated 消息时生成的。

结束语

真实的 actor 系统应用程序涉及到多个(通常是许多)actor 和这些 actor 之间的消息。本文展示了如何搭建一个 actor 系统,描绘了 actor 的交互来帮助您了解该系统的操作。处理 actor 和消息是一种与编写顺序代码不同的编程方法。获得一定的经验后,您会发现 actor 方法使创建具有异步执行能力的高度可扩展程序变得很容易。

搭建 actor 和消息交换结构只能让您的 actor 系统运行起来。在某个时刻,您需要跟踪您的 actor 在何处出现行为不当。actor 系统的异步性质使查明存在问题的交互变得更加困难。如何跟踪和调试 actor 交互,这是一个值得用整篇文章来讨论的主题。


相关主题

  • 可扩展的 Scala:系列文章作家 Dennis Sosnoski 分享对此系列中的内容和一般性的 Scala 开发的见解和背后信息。
  • 本文的示例代码:从作者在 GitHub 上的存储库获取本文的完整示例代码。
  • Scala:Scala 是 JVM 上一种现代的函数式语言。
  • Akka.io 包含有关 Akka 的所有信息的来源,包括 Scala 和 Java 应用程序的完整文档。
  • Actor 系统介绍”(Josh Suereth,DevNexus 大会):Josh Suereth 使用 Akka 和 actor 涉及了一个分布式搜索服务,在此过程中很好地概述了 Akka 所提供的许多不错特性。
  • developerWorks Java 技术专区:这里有数百篇关于 Java 编程各个方面的文章。

评论

添加或订阅评论,请先登录注册

static.content.url=http://www.ibm.com/developerworks/js/artrating/
SITE_ID=10
Zone=Java technology, Open source
ArticleID=1014974
ArticleTitle=JVM 并发性: 使用 Akka 构建 actor 应用程序
publish-date=09152015