Одновременное исполнение на платформе JVM

Построение основанных на акторах приложений с использованием Akka

Углубленные вопросы построения приложений, использующих взаимодействия между акторами

Comments

Серия контента:

Этот контент является частью # из серии # статей: Одновременное исполнение на платформе JVM

Следите за выходом новых статей этой серии.

Этот контент является частью серии:Одновременное исполнение на платформе JVM

Следите за выходом новых статей этой серии.

В статье "Одновременное исполнение на платформе JVM: Асинхронное исполнение с помощью Akka" была представлена модель акторов, а также инфраструктура и среда исполнения Akka.Создание приложений на основе акторов отличается от создания традиционных линейных приложений. В случае линейного приложения разработчик создает для него управляющую логику и последовательность шагов, обеспечивающих достижение поставленной цели. Чтобы эффективно использовать модель акторов, необходимо осуществить декомпозицию приложения на независимые связки "состояние/поведение" (акторы) и описать взаимодействия между этими связками (сообщения). Эти два компонента — акторы и сообщения — являются элементарными блоками для построения приложения.

При правильном сочетании акторов и сообщений разработчик получает систему, в которой большая часть деятельности осуществляется асинхронно. Асинхронное функционирование труднее в понимании, чем функционирование при линейном подходе, однако эти трудности окупаются благодаря более высокой масштабируемости. Программы с высокой степенью асинхронности способны лучше использовать значительные системные ресурсы (в том числе ресурсы памяти и процессорные ресурсы) либо быстрее выполнять определенную задачу или обрабатывать параллельно больше экземпляров задачи. Akka позволяет распространить эту масштабируемость на несколько систем путем использования дистанционного подхода к работе с распределенными акторами.

В данной статье более глубоко рассматривается структурирование систем в терминологии акторов и сообщений. В статью включено два учебных приложения. В первом приложении демонстрируются базовые сведения по работе акторов и сообщений в Akka. Во втором, более сложном приложении демонстрируется планирование структуры для системы акторов и визуализация этой структуры. В обоих приложениях используется код на языке Scala, однако они достаточно просты в понимании для Java-разработчиков (в качестве справочного материала можно использовать предыдущую статью этого цикла, в которой приведены примеры аналогичных программ с использованием Akka на языке Scala и на языке Java).

Загрузите программный код для этой статьи.

Знакомство с акторами Star

Примеры из предыдущей статьи отличались следующими особенностями.

  • Акторы непосредственно создавались основным приложением, которое запускало систему акторов.
  • Использовался только один тип акторов.
  • Взаимодействия между акторами были минимальными.

В первом учебном приложении данной статьи я использую несколько более сложную структуру и рассматриваю ее по частям. В листинге 1 это приложение показано целиком.

Листинг 1. Поколения акторов Star
 import scala.concurrent.duration._ import scala.util.Random import akka.actor._ import akka.util._ object Stars1 extends App { import Star._ val starBaseLifetime = 5000 millis val starVariableLifetime = 2000 millis val starBaseSpawntime = 2000 millis val starVariableSpawntime = 1000 millis object Namer { case object GetName case class SetName(name: String) def props(names: Array[String]): Props = Props(new Namer(names)) } class Namer(names: Array[String]) extends Actor { import context.dispatcher import Namer._ context.setReceiveTimeout(starBaseSpawntime + starVariableSpawntime) def receive = { case GetName => { val name = ... sender ! SetName(name) } case ReceiveTimeout => { println("Namer receive timeout, shutting down system") system shutdown } } } object Star { case class Greet(peer: ActorRef) case object AskName case class TellName(name: String) case object Spawn case object IntroduceMe case object Die def props(greeting: String, gennum: Int, parent: String) = Props(new Star(greeting, gennum, parent)) } class Star(greeting: String, gennum: Int, parent: String) extends Actor { import context.dispatcher var myName: String = "" var starsKnown = Map[String, ActorRef]() val random = Random val namer = context actorSelection namerPath namer ! Namer.GetName def scaledDuration(base: FiniteDuration, variable: FiniteDuration) = base + variable * random.nextInt(1000) / 1000 val killtime = scaledDuration(starBaseLifetime, starVariableLifetime) val killer = scheduler.scheduleOnce(killtime, self, Die) val spawntime = scaledDuration(starBaseSpawntime, starVariableSpawntime) val spawner = scheduler.schedule(spawntime, 1 second, self, Spawn) if (gennum > 1) scheduler.scheduleOnce(1 second, context.parent, IntroduceMe) def receive = { case Namer.SetName(name) => { myName = name println(s"$name is the ${gennum}th generation child of $parent") context become named } } def named: Receive = { case Greet(peer) => peer ! AskName case AskName => sender ! TellName(myName) case TellName(name) => { println(s"$myName says: '$greeting, $name'") starsKnown += name -> sender } case Spawn => { println(s"$myName says: A star is born!") context.actorOf(props(greeting, gennum + 1, myName)) } case IntroduceMe => starsKnown.foreach { case (name, ref) => ref ! Greet(sender) } case Die => { println(s"$myName says: 'I'd like to thank the Academy...'") context stop self } } } val namerPath = "/user/namer" val system = ActorSystem("actor-demo-scala") val scheduler = system.scheduler system.actorOf(Namer.props(Array("Bob", "Alice", "Rock", "Paper", "Scissors", "North", "South", "East", "West", "Up", "Down")), "namer") val star1 = system.actorOf(props("Howya doing", 1, "Nobody")) val star2 = system.actorOf(props("Happy to meet you", 1, "Nobody")) Thread sleep 500 star1 ! Greet(star2) star2 ! Greet(star1) }

Это приложение создает систему акторов с акторами двух типов: Namer (именователь) и Star (звезда). Namer – это актор-синглтон, в сущности, это центральный каталог имен. Акторы Star получают свои имена (псевдонимы) от актора Namer, а затем печатают приветственные сообщения для других акторов Stars, как в примерах из последней статьи. Кроме того, эти акторы порождают акторов-потомков Star, которых затем представляют другим, известным им акторам Star. И, наконец, по истечении определенного времени акторы Star могут умирать.

В листинге 2 приведен пример выходной информации при исполнении этого приложения.

Листинг 2. Выходная информация приложения
 Bob is the 1th generation child of Nobody Alice is the 1th generation child of Nobody Bob says: 'Howya doing, Alice' Alice says: 'Happy to meet you, Bob' Bob says: A star is born! Rock is the 2th generation child of Bob Alice says: A star is born! Paper is the 2th generation child of Alice Bob says: A star is born! Scissors is the 2th generation child of Bob Alice says: 'Happy to meet you, Rock' Alice says: A star is born! North is the 2th generation child of Alice Bob says: 'Howya doing, Paper' Rock says: 'Howya doing, Paper' Bob says: A star is born! South is the 2th generation child of Bob Alice says: 'Happy to meet you, Scissors' Paper says: 'Happy to meet you, Scissors' Alice says: A star is born! East is the 2th generation child of Alice Bob says: 'Howya doing, North' Rock says: 'Howya doing, North' Scissors says: 'Howya doing, North' Paper says: A star is born! West is the 3th generation child of Paper Rock says: A star is born! Up is the 3th generation child of Rock Bob says: A star is born! Down is the 2th generation child of Bob Alice says: 'Happy to meet you, South' North says: 'Happy to meet you, South' Paper says: 'Happy to meet you, South' Scissors says: A star is born! Bob-Bob is the 3th generation child of Scissors Alice says: A star is born! Bob-Alice is the 2th generation child of Alice Scissors says: 'Howya doing, East' Rock says: 'Howya doing, East' Bob says: 'Howya doing, East' South says: 'Howya doing, East' North says: A star is born! Bob-Rock is the 3th generation child of North Paper says: A star is born! Bob-Paper is the 3th generation child of Paper Bob says: 'I'd like to thank the Academy...' Scissors says: 'Howya doing, West' South says: 'Howya doing, West' Alice says: A star is born! Bob-Scissors is the 2th generation child of Alice North says: A star is born! Bob-North is the 3th generation child of North Paper says: A star is born! Bob-South is the 3th generation child of Paper Alice says: 'I'd like to thank the Academy...' Namer receive timeout, shutting down system

Поколения акторов Star

Акторы Star производят свое потомство не таким драматичным и публичным образом, как некоторые реальные актеры; вместо этого актор Star бесшумно порождает потомка при каждом получении сообщения Spawn. Единственным признаком волнения родительского актора по поводу этого события является простое объявление о рождении A star is born! (Родилась новая звезда!). Еще одно отличие от реальных актеров состоит в том, испытывающие чувство гордости новые родительские акторы Star даже не могут объявить об имени своего нового потомка – это имя определяет специальный именующий актор. После того, как недавно созданный акторStar получил имя, актор Namer печатает имя потомка и сведения о нем в строке формы "Ted is the 2th generation child of Bob" (Тед – представитель второго поколения потомков Боба).

Смерть актора Star инициируется актором Star, получившим сообщение Die, в ответ на которое он печатает сообщение "I'd like to thank the Academy...." (Я хотел бы поблагодарить Академию....). Затем актор Star выполняет оператор context stop self, сообщающее Akka-контексту управления акторами о том, что он все сделал, и что его следует завершить. После этого контекст выполняет всю работу по очистке и удаляет данного актора из системы.

Смена ролей

Реальные актеры могут играть много различных ролей. Akka-акторы также способны брать на себя различные роли; эта задача решается изменением методов обработки сообщений. Это можно увидеть в акторе Star, где метод по умолчанию receive обрабатывает только сообщение SetName, а все остальные сообщения обрабатывает метод named. Передача полномочий происходит при обработке сообщения SetName, с помощью утверждения context become named. Смысл этой смены ролей состоит в том, что актор Star ничего не может сделать, пока он не получит имя, а после получения имени он никогда не может переименован.

Обработку всех сообщений всегда можно осуществлять одним методом receive, однако такой подход зачастую порождает неряшливый программный код с условными операторами, результат выполнения которых зависит от текущего состояния актора. Использование отдельного метода receive для каждого отличающегося от других состояния сохраняет опрятность и простоту программного кода. В общем случае всегда, когда состоянию актора соответствует новое сообщение, необходимо перейти к использованию нового метода receive, чтобы представить это состояние.

Необходимо соблюдать осторожность, чтобы при смене ролей актора не исключить обработку действительных сообщений. Например, если бы для акторов Star было разрешено переименование в любой момент времени, методу named в листинге 1 нужно было бы обрабатывать сообщение SetName. Любые сообщения, которые не обрабатывает текущий метод receive актора, фактически отбрасываются (на самом деле они отправляются в почтовый ящик по умолчанию для невостребованных сообщений, однако с точки зрения ваших пользовательских акторов они именно отбрасываются).

В качестве альтернативы изменению обработчика сообщений также можно продвинуть в стеке текущий обработчик сообщений и настроить новый обработчик, используя для этого следующую форму с двумя аргументами: become(named, false). После этого вы можете в конечном счете восстановить исходный обработчик с помощью вызова context unbecome. Подобным образом вызовы become/unbecomeможно вкладывать сколь угодно глубоко, но при этом необходимо соблюдать осторожность, чтобы код в конечном итоге исполнял соответствующий unbecome для каждого become. Иначе любой незакрытый becomes будет оборачиваться утечкой памяти.

Актор Namer

Актор Namer передает массив строк с именами в своем конструкторе. Каждый раз, когда этот актор получает сообщение GetName, он возвращает следующее в массиве имя в сообщении SetName. При исчерпании запаса простых имен он переходит к двойным именам (написанным через дефис). Цель актора Namer состоит в присвоении имен (в идеале уникальных) акторам Star; соответственно в данной системе нет никакой необходимости иметь более одного экземпляра актора Namer. Код приложения, запускающий систему акторов, непосредственно создает экземпляр этого актора-синглтона, доступный для использования всем акторам Star.

Поскольку актор-синглтон Namer создается приложением, оно может передать ссылку ActorRef на этот актор каждому актору Star, а акторы Star могут передать эту ссылку далее своим потомкам. Однако Akka предоставляет разработчику более опрятный способ работы с такого рода общеизвестными акторами. Строка val namer = context actorSelection namerPath в разделе инициализации актора Star ищет актора Namer по его маршруту в системе акторов — в данном случае по маршруту, /user/namer (префикс /user применяется ко всем акторам, созданным пользователями, а namer – это имя, присвоенное при создании актора Namer с помощью system.actorOf). Значение namer видят все акторы, включенные в приложение, поэтому при необходимости его можно использовать непосредственно.

Запланированные сообщения

В примере, показанном в листинге 1, используется несколько запланированных сообщений для подсказок различным акторам. В ходе инициализации акторы Star создают два или три запланированных сообщения. Утверждение val killer = scheduler.scheduleOnce(killtime, self, Die) создает планировщик однократного сообщения, чтобы инициировать смерть актора Star путем отправки сообщени Die, когда время жизни этого актора на данной стадии истекает. Утверждение val spawner = scheduler.schedule(spawntime, 1 second, self, Spawn) создает планировщик повторяющихся сообщений, который отправляет сообщения Spawn с секундными интервалами после начальной задержки, чтобы заселить новое поколение акторов Star.

Третий тип запланированного сообщения для актора Star используется только тогда, когда этот актор Star является потомком другого актора Star (а не создан кодом приложения за пределами системы акторов). Утверждение if (gennum > 1) scheduler.scheduleOnce(1 second, context.parent, IntroduceMe) создает запланированное сообщение, подлежащее отправке родителю актора Star спустя одну секунду после инициализации этого актора Star, если новый актор Star относится ко второму или последующему поколению. Когда родительский актор Star получает это сообщение, он отправляет каждому из остальных акторов Star , которым он был представлен, сообщение Greet, посредством которого он просит этих известных акторов Star to представить себя данному потомку.

Актор Namer также использует запланированное сообщение, представленное в форме получения тайм-аута. Оператор context.setReceiveTimeout(starBaseSpawntime + starVariableSpawntime)присваивает тайм-ауту значение максимального времени рождения для акторов Star. Контекст сбрасывает этот тайм-аут каждый раз, когда актор получает сообщение; таким образом, он срабатывает только в том случае, если заданный промежуток времени истекает без получения каких-либо сообщений. Акторы Star непрерывно создают новых акторов-потомковd Star, которые отправляют сообщения актору Namer; соответственно тайм-аут происходит только в том случае, если все акторы Star закончились. Если тайм-аут все же происходит, актор Namer обрабатывает результирующее сообщение ReceiveTimeout (определенное в пакете akka.actor), выключая всю систему акторов.

У внимательного читателя может возникнуть вопрос, каким образом тайм-аут актора Namer вообще происходит. Продолжительность жизни актора Star всегда составляет не менее 5 секунд, а каждый актор Star начинает порождать акторов-потомков Star к тому моменту, когда он имеет возраст максимум 3 секунды — таким образом, складывается впечатление, что должен иметь место непрерывно растущий запас акторов Star (примерно как в реалити-телевидении). Так каким образом работает тайм-аут? Ответ заключается в используемой в Akka модели actor supervision (надзор за акторами) и в отношениях "родитель-потомок".

Семьи акторов

Akka принудительно применяет иерархию надзора за акторами на основе их происхождения. Когда один актор создает другого актора, созданный актор становится подчиненным исходного актора. Это означает, что родительский актор отвечает за своих акторов-потомков (это именно тот принцип, который мы зачастую хотели бы увидеть в применении к реальным актерам). Эта ответственность распространяется преимущественно на обработку сбоев, но также имеет определенное влияние и на характер работы акторов.

Иерархия надзора – это именно та причина, по которой завершает работу система акторов в листинге 1. Эта иерархия требует, чтобы был доступен родительский актор, поэтому завершение родительского актора автоматически завершает всех его акторов-потомков. В листинге 1 приложение первоначально создает лишь два актора Star (которые всегда получают имена Bob и Alice). Всех остальных акторов Stars создает один из этих двух начальных акторов Star либо один из акторов Star – их потомков (детей или внуков). Таким образом, когда какой-либо из этих двух корневых акторов Star терминируется, он забирает с собой всех своих потомков. После терминации этих двух акторов никаких других акторов Star не остается. Поскольку нет никаких акторов Star, которые порождали бы акторов-потомков Star, в актор Namer не поступает никаких запросов на имена, поэтому в конечном итоге тайм-аут Namer срабатывает, и система выключается.

Более сложные системы акторов

В листинге 1 был показан пример того, как работает простая система акторов. Однако обычно в реальных прикладных системах имеется гораздо больше типов акторов (нередко – десятки или сотни), а между акторами осуществляются более сложные взаимодействия. Один из лучших способов проектирования и организации сложных систем акторов состоит в специфицировании потоков сообщений между акторами.

Для более сложного примера я расширю приложение в листинге 1 таким образом, чтобы реализовать простую модель создания фильма. Эта модель использует четыре типа основных акторов и два специализированных типа вторичных акторов.

  • Актор Star: Актер для участия в фильмах
  • Актор Scout: Искатель новых акторов Star
  • Актор Academy: Реестр-синглтон, отслеживающий всех активных акторов Star
  • Актор Director: Создатель фильма (режиссер)
    • Актор CastingAssistant: Ассистент актора Director, отвечающий за кастинг фильма
    • Актор ProductionAssistant: Ассистент актора Director, отвечающий за создание фильма

Подобно акторам Star в листинге 1, акторы Star в этом приложении имеют ограниченную продолжительность жизни. Когда актор Director приступает к созданию фильма, он получает актуальный список активных акторов Star , которых можно использовать в этом фильме. Сначала актору Director нужно привлечь акторов Star к участию в фильме, а затем сделать фильм с участием этих акторов Star. Если какой-либо из акторов Star, участвующих в фильме, выходит из бизнеса (или "умирает" в терминологии акторов) до завершения фильма, создание фильма заканчивается неудачей.

Схематическое изображение сообщений

Приложение в листинге 1 Приложение в листинге 1 было достаточно простым для того, чтобы взаимодействия между акторами можно было объяснить словесно. Это новое приложение намного сложнее, поэтому для представления взаимодействий требуется более наглядный способ. Схема передачи сообщений является отличным способом демонстрации этих взаимодействий. На рис. 1 показана последовательность взаимодействий, происходящих в процессе деятельности актора Scout по отысканию нового актора Star (или, в терминологии акторов, по созданию актора Star) и в процессе регистрации нового актора Star в акторе Academy.

Рисунок 1. Создание и инициализация актора Star
Message-passing diagram of Star creation and initialization
Message-passing diagram of Star creation and initialization

Ниже описывается последовательность сообщений (и шагов по их созданию) в процессе добавления актора Star:

  1. Сообщение FindTalent (от Scheduler к Scout): Инициирует добавление актора Star.
  2. Сообщение GetName (от Scout к Academy): Присваивает имя актору Star.
  3. Сообщение GiveName (ответ от Academy): Предоставляет присвоенное имя.
  4. Сообщение actorOf(): Scout создает нового актора Star с предоставленным именем.
  5. Сообщение Register (от Star к Academy): Регистрирует актора Star с актором Academy.

Эта последовательность сообщений отличается масштабируемостью и гибкостью. Каждое сообщение может быть обработано изолированно; соответственно для осуществления обмена сообщениями акторам нет необходимости изменять свое внутреннее состояние (актор-синглтон Academy изменяет состояние, однако это осуществляется в интересах обмена сообщениями). Поскольку внутреннее состояние не изменяется, вам нет необходимости строго соблюдать последовательность сообщений. Например, можно сделать так, чтобы сообщение FindTalent создавало несколько акторов Star. Для этого нужно отправить актору Academy более одного сообщения GetName. Можно даже обработать подряд несколько сообщений FindTalent прежде, чем завершится создание последнего актора Star. Можно также добавить в систему любое количество акторов Scout и исполнять их независимо без каких-либо конфликтов.

Создание фильма является гораздо более сложным процессом, чем создание нового актора Star,включающим больше изменений состояний и потенциальных сбойных ситуаций. На рис. 2 показаны сообщения основного приложения, участвующие в создании фильма.

Рисунок 2. Создание фильма
Message-passing diagram for making a movie
Message-passing diagram for making a movie

Ниже описывается последовательность сообщений, участвующих в создании фильма (относящаяся преимущественно к благоприятному развитию ситуации без каких-либо затруднений).

  1. Сообщение MakeMovie (от Scheduler к Director): Инициирует создание фильма.
  2. Сообщение PickStars (от Director к Academy): Выбирает актора Star для участия в фильме.
  3. Сообщение StarsPicked или PickFailure (ответ от Academy): При наличии достаточного количества доступных акторов Star для создания фильма актор Academy выбирает нужное количество и возвращает список в сообщении StarsPicked; в противном случае актор Academy StarsPicked; в противном случае актор Academy PickFailure.
  4. Сообщение actorOf(): Director создает акторCastingAssistant с целью осуществления кастинга для фильма.
  5. Сообщение OfferRole (от актора CastingAssistant к каждому актору Star, участвующему в фильме): CastingAssistant предлагает роль актору Star.
  6. Сообщение AcceptRole или RejectRole (ответ от каждого актора Star): Актор Star отклоняет предлагаемую роль, если он уже назначен на другую роль; в противном случае он принимает предлагаемую роль.
  7. Сообщение AllSigned или CastingFailure (от актора CastingAssistant к родительскому актору): Когда все акторы Stars получили свои роли, работа актора CastingAssistant сделана, поэтому он информирует родительский актор Director об успешном выполнении этой работы посредством сообщения AllSigned; в случае невозможности привлечения актора Star (в частности, если тот "умирает"), актор CastingAssistant информирует родительский актор о неудаче. Так или иначе, актор CastingAssistant сделал свою работу и может быть терминирован.
  8. Сообщение actorOf(): Director создает актор ProductionAssistant для осуществления производства фильма.
  9. Сообщение ProductionComplete (от Scheduler к ProductionAssistant): Инициирует завершение фильма после истечения заданного промежутка времени.
  10. Сообщение ProductionComplete или ProductionFailure (от ProductionAssistant к родительскому актору): При срабатывании таймера завершения фильма актор ProductionAssistant сообщает своему родительскому актору о том, что фильм готов.
  11. Сообщение RoleComplete (от ProductionAssistant каждому актору Star, участвующему в фильме): Актор ProductionAssistant также обязан уведомить каждого актора Star о том, что фильм завершен, благодаря чему они становятся доступными для участия в других фильмах.

Эта последовательность сообщений использует изменение состояний некоторых акторов как часть обработки. Акторам Starнужно изменять состояние между состоянием доступности и состоянием участия в фильме. Акторам CastingAssistant нужно отслеживать, какие акторы Star нужно отслеживать, какие акторы Star приняли роли в создаваемом фильме, чтобы знать, каких акторов еще нужно привлечь. Однако акторам Director не нужно изменять состояние, поскольку они лишь реагируют на получаемые сообщения (включая сообщения от своих акторов-потомков). Акторам ProductionAssistant также не нужно изменять состояние, поскольку они должны лишь уведомлять других акторов о завершении фильма.

Можно обойтись без использования отдельных акторов CastingAssistant и ProductionAssistant, если объединить их функциональность с функциональностью актора Director. Однако устранение других акторов существенно усложняет актор Director, поэтому в данном случае целесообразнее разделить его функциональность между несколькими акторами. Это оказывается особенно полезно при обработке сбоев.

Обработка сбоев

В потоках сообщений на рис. 1 и рис. 2 не показан один важный аспект приложения. Акторы Star имеют ограниченную продолжительность жизни, поэтому каждый актор, имеющий дело со акторами Star, должен знать, когда тот или иной актор умирает. В частности, если актор Star, выбранный для фильма, умрет до того, как этот фильм будет готов, создание фильма неминуемо завершится неудачей.

При обработке сбоев в системах Akka-акторов используется родительский надзор, в соответствии с которым сбойные состояния передаются вверх по иерархии акторов. В JVM сбои обычно представляются как исключения; соответственно, для обнаружения возникшего сбоя Akka использует естественную обработку исключений. Если какой-либо актор не обрабатывает исключение в своем коде, Akka обрабатывает неперехваченное исключение посредством терминирования этого актора и передачи сбоя вверх родительскому актору. После этого данный родитель может обработать этой сбой или сам передать сбой своему родителю.

Встроенная в Akka обработка сбоев хорошо работает в таких условиях, как ошибки ввода/вывода, однако для исключений в системе кинопроизводства она чрезмерно сложна. В этом случае необходимо лишь контролировать других акторов; к счастью, Akka предоставляет удобный способ решения этой задачи. Так, компонент DeathWatch системы акторов позволяет актору зарегистрировать себя в качестве наблюдателя за любым другим актором. После регистрации наблюдающий актор в случае смерти наблюдаемого актора получит системное сообщение Terminated (чтобы избежать конфликтных ситуаций, когда наблюдаемый актор уже умер до начала наблюдения, сообщение Terminated немедленно появляется в почтовом ящике наблюдающего актора).

Компонент DeathWatch активируется путем вызова метода context.watch(), который принимает ссылку ActorRef актора, подлежащего наблюдению. Результирующее сообщение Terminated в случае смерти представляющего интерес актора полностью исчерпывает все потребности по обработке сбоев в примере с созданием фильма.

Программный код для создания актора Star

В листинге 3 показан код, осуществляющий запуск приложения и создание новых акторов Star в соответствии с потоком сообщений на рис. 1.

Листинг 3. Код для создания актора Star
 object Stars2 extends App { object Scout { case object FindTalent val starBaseLifetime = 7 seconds val starVariableLifetime = 3 seconds val findBaseTime = 1 seconds val findVariableTime = 3 seconds def props(): Props = Props(new Scout()) } class Scout extends Actor { import Scout._ import Academy._ import context.dispatcher val random = Random scheduleFind def scheduleFind = { val nextTime = scaledDuration(findBaseTime, findVariableTime) scheduler.scheduleOnce(nextTime, self, FindTalent) } def scaledDuration(base: FiniteDuration, variable: FiniteDuration) = base + variable * random.nextInt(1000) / 1000 def receive = { case FindTalent => academy ! GetName case GiveName(name) => { system.actorOf(Star.props(name, scaledDuration(starBaseLifetime, starVariableLifetime)), name) println(s"$name has been discovered") scheduleFind } } } object Academy { case object GetName case class GiveName(name: String) case class Register(name: String) ... def props(names: Array[String]): Props = Props(new Academy(names)) } class Academy(names: Array[String]) extends Actor { import Academy._ var nextNameIndex = 0 val nameIndexLimit = names.length * (names.length + 1) val liveStars = Buffer[(ActorRef, String)]() ... def receive = { case GetName => { val name = if (nextNameIndex < names.length) names(nextNameIndex) else { val first = nextNameIndex / names.length - 1 val second = nextNameIndex % names.length names(first) + "-" + names(second) } sender ! GiveName(name) nextNameIndex = (nextNameIndex + 1) % nameIndexLimit } case Register(name) => { liveStars += ((sender, name)) context.watch(sender) println(s"Academy now tracking ${liveStars.size} stars") } case Terminated(ref) => { val star = (liveStars.find(_._1 == ref)).get liveStars -= star println(s"${star._2} has left the business\nAcademy now tracking ${liveStars.size} Stars") } ... } } } object Star { ... def props(name: String, lifespan: FiniteDuration) = Props(new Star(name, lifespan)) } class Star(name: String, lifespan: FiniteDuration) extends Actor { import Star._ import context.dispatcher academy ! Academy.Register(name) scheduler.scheduleOnce(lifespan, self, PoisonPill) } ... val system = ActorSystem("actor-demo-scala") val scheduler = system.scheduler val academy = system.actorOf(Academy.props(Array("Bob", "Alice", "Rock", "Paper", "Scissors", "North", "South", "East", "West", "Up", "Down")), "Academy") system.actorOf(Scout.props(), "Sam") system.actorOf(Scout.props(), "Dean") system.actorOf(Director.props("Astro"), "Astro") system.actorOf(Director.props("Cosmo"), "Cosmo") Thread sleep 15000 system.shutdown }

Код в листинге 3 по большей части использует такую же функциональность Akka, что в примере с генерацией акторов Star в листинге 1, но с добавлением вызова context.watch(), активирующего компонент DeathWatch. Этот вызов осуществляет актор Academy при обработке сообщения Register от нового актора Star. Для каждого актора Star актор Academy записывает ссылку ActorRef и имя этого актора; при обработке сообщения Terminated он использует ссылку ActorRef, чтобы найти и удалить умершего актора Star. Благодаря этому поддерживается актуальность компонента Buffer (по существу, списка ArrayList), содержащего живых акторов Star.

Программный код основного приложения сначала создает актор-синглтон Academy, затем пару акторов Scout и, наконец, пару акторов Director. Описываемое приложение позволяет системе акторов работать в течение 15 секунд, а затем завершает работу системы и осуществляет выход.

Запуск процесса создания фильма

В листинге 4 показана первая часть кода, участвующего в создании фильма: кастинг акторов Star Этот код соответствует верхней части потока сообщений на рис. 2, включая Scheduler и взаимодействие между актором Director и актором Academy.

Листинг 4. Программный код создания фильма
 object Stars2 extends App { ... object Director { case object MakeMovie val starCountBase = 2 val starCountVariable = 4 val productionTime = 3 seconds val recoveryTime = 3 seconds def props(name: String) = Props(new Director(name)) } class Director(name: String) extends Actor { import Academy._ import Director._ import ProductionAssistant._ import context.dispatcher val random = Random def makeMovie = { val numstars = random.nextInt(starCountVariable) + starCountBase academy ! PickStars(numstars) } def retryMovie = scheduler.scheduleOnce(recoveryTime, self, MakeMovie) makeMovie def receive = { case MakeMovie => makeMovie case PickFailure => retryMovie case StarsPicked(stars) => { println(s"$name wants to make a movie with ${stars.length} actors") context.actorOf(CastingAssistant.props(name, stars.map(_._1)), name + ":Casting") context become casting } } ... } ... object Academy { ... case class PickStars(count: Int) case object PickFailure case class StarsPicked(ref: List[(ActorRef, String)]) def props(names: Array[String]): Props = Props(new Academy(names)) } class Academy(names: Array[String]) extends Actor { ... def pickStars(n: Int): Seq[(ActorRef, String)] = ... def receive = { ... case PickStars(n) => { if (liveStars.size < n) sender ! PickFailure else sender ! StarsPicked(pickStars(n).toList) } } }

В начале кода в листинге 4 представляется объект Director и часть определения актора. Производство фильма инициируется компонентом Scheduler, отсылающим сообщение MakeMovie актору Director. Получив это сообщение MakeMovie, актор Director запускает процесс производства фильма, направляя актору Academy сообщение PickStars с запросом о выделении акторов Star для участия в фильме. Код актора Academy для обработки сообщения PickStars, показанный в конце листинга 4, отсылает в ответ сообщение PickFailure (если доступных акторов Star недостаточно) или сообщение StarsPicked. Если актор Director получает сообщени еPickFailure, он планирует другую попытку на более поздний момент времени. Если актор Director получает сообщение StarsPicked, он запускает актора CastingAssistant со списком акторо Star, выбранных актором Academy для ролей создаваемого фильма, а затем изменяет состояние на обработку ответа от актора CastingAssistant. В листинге 5 показано продолжение после этой точки, начиная с метода Receive актора Director.

Листинг 5. Работа актора CastingAssistant
 class Director(name: String) extends Actor { ... def casting: Receive = { case CastingAssistant.AllSigned(stars) => { println(s"$name cast ${stars.length} actors for movie, starting production") context.actorOf(ProductionAssistant.props(productionTime, stars), name + ":Production") context become making } case CastingAssistant.CastingFailure => { println(s"$name failed casting a movie") retryMovie context become receive } } ... } object CastingAssistant { case class AllSigned(stars: List[ActorRef]) case object CastingFailure val retryTime = 1 second def props(dirname: String, stars: List[ActorRef]) = Props(new CastingAssistant(dirname, stars)) } class CastingAssistant(dirname: String, stars: List[ActorRef]) extends Actor { import CastingAssistant._ import Star._ import context.dispatcher var signed = Set[ActorRef]() stars.foreach { star => { star ! OfferRole context.watch(star) } } def receive = { case AcceptRole => { signed += sender println(s"Signed star ${signed.size} of ${stars.size} for director $dirname") if (signed.size == stars.size) { context.parent ! AllSigned(stars) context.stop(self) } } case RejectRole => scheduler.scheduleOnce(retryTime, sender, OfferRole) case Terminated(ref) => { context.parent ! CastingFailure stars.foreach { _ ! Star.CancelOffer } context.stop(self) } } } object Star { case object OfferRole case object AcceptRole case object RejectRole case object CancelOffer case object RoleComplete ... } class Star(name: String, lifespan: FiniteDuration) extends Actor { ... var acceptedOffer: ActorRef = null scheduler.scheduleOnce(lifespan, self, PoisonPill) def receive = { case OfferRole => { sender ! AcceptRole acceptedOffer = sender context become booked } } def booked: Receive = { case OfferRole => sender ! RejectRole case CancelOffer => if (sender == acceptedOffer) context become receive case RoleComplete => context become receive } }

Актор Director создает актор CastingAssistant со списком ссылок ActorRef для акторов Star, подлежащих кастингу для фильма. Сначала актор CastingAssistant отправляет сообщение OfferRole каждому из этих акторов Star, а также регистрирует себя в качестве наблюдателя за каждым актором Star. Затем актор CastingAssistant ожидает от каждого актора Star ответного сообщения AcceptRole или RejectRole либо сообщения Terminated от системы акторов, информирующего о кончине какого-либо актора Star.

Если актор CastingAssistant получил сообщение AcceptRole от каждого актора Star, участвующего в кастинге, он направляет сообщение AllSigned своему родительскому актору Director. Для дополнительного удобства это сообщение включает в себя список ссылок actorRef акторов Star, поскольку эту информацию нужно передавать на следующий шаг обработки.

Если актор CastingAssistant получает от какого-либо актора Star сообщение RejectRole, он планирует повторное направление сообщения OfferRole тому же актору после некоторой задержки (в реальном мире актеры-звезды зачастую недоступны, поэтому если вы хотите, чтобы конкретный актер участвовал в вашем фильме, вам нужно обращаться к нему с предложениями до тех пор, пока он не примет его).

Если актор CastingAssistant получил сообщение Terminated, это означает, что один из акторов Star, отобранных для фильма, умер. В этом прискорбном случае актор CastingAssistant направляет своему родительскому актору Director сообщение CastingFailure и завершает свою работу. Однако перед этим он отправляет сообщение CancelOffer каждому актору Star в своем списке, чтобы акторы Star, получившие роли в данном фильме, имели возможность принять другие роли.

Может возникнуть вопрос, почему актор CastingAssistant отправляет сообщение CancelOfferкаждому актору Star— даже тем акторам, от которых еще не было обработано сообщение AcceptRole. Причина состоит в том, что, возможно, актор Star в списке уже отправил сообщение AcceptRole, однако оно все еще находится в почтовом ящике на момент обработки сообщения Terminated. В общем случае распределенной системы акторов также возможно, что актор Star принял предложенную роль, но сообщение AcceptRole все еще находится в пути или было потеряно. Отправка сообщения CancelOffer каждому актору Star делает обработку сбоев чище в любом случае, а актор Star может с легкостью игнорировать сообщения CancelOffer, если он не принял роли в создаваемом фильме.

В листинге 6 показана последняя часть процесса производства фильма: работа актора ProductionAssistant (соответствующая нижней правой части of рис. 2). Эта часть весьма проста, поскольку актору ProductionAssistant нужно лишь обработать сообщение SchedulerProductionComplete или сообщение Terminated.

Листинг 6. Работа актора ProductionAssistant
 class Director(name: String) extends Actor { ... def making: Receive = { case m: ProductionAssistant.ProductionEnd => { m match { case ProductionComplete => println(s"$name made a movie!") case ProductionFailed => println(s"$name failed making a movie") } makeMovie context become receive } } } object ProductionAssistant { sealed trait ProductionEnd case object ProductionComplete extends ProductionEnd case object ProductionFailed extends ProductionEnd def props(time: FiniteDuration, stars: List[ActorRef]) = Props(new ProductionAssistant(time, stars)) } class ProductionAssistant(time: FiniteDuration, stars: List[ActorRef]) extends Actor { import ProductionAssistant._ import context.dispatcher stars.foreach { star => context.watch(star) } scheduler.scheduleOnce(time, self, ProductionComplete) def endProduction(end: ProductionEnd) = { context.parent ! end stars.foreach { star => star ! Star.RoleComplete } context.stop(self) } def receive = { case ProductionComplete => endProduction(ProductionComplete) case Terminated(ref) => endProduction(ProductionFailed) } }

Если актор ProductionAssistant получает сообщение ProductionComplete от Scheduler, он может сообщить об успехе родительскому актору Director. Если он сначала получит сообщение Terminated, то должен будет сообщить о неудаче. В любом случае этот актор также производит очистку, сообщая всем акторам Star, участвующим в фильме, что их работа завершена.

В листинге 7 показан пример выходной информации при исполнении описываемой программы; результаты процесса создания фильма выделены жирным шрифтом.

Листинг 7. Пример выходной информации
 Bob has been discovered Academy now tracking 1 stars Alice has been discovered Academy now tracking 2 stars Rock has been discovered Academy now tracking 3 stars Paper has been discovered Academy now tracking 4 stars Cosmo wants to make a movie with 4 actors Astro wants to make a movie with 3 actors Signed star 1 of 4 for director Cosmo Signed star 2 of 4 for director Cosmo Signed star 3 of 4 for director Cosmo Signed star 4 of 4 for director Cosmo Cosmo cast 4 actors for movie, starting production Scissors has been discovered Academy now tracking 5 stars Cosmo made a movie! Cosmo wants to make a movie with 4 actors Signed star 1 of 4 for director Cosmo Signed star 2 of 4 for director Cosmo Signed star 3 of 4 for director Cosmo Signed star 4 of 4 for director Cosmo Cosmo cast 4 actors for movie, starting production North has been discovered Academy now tracking 6 stars South has been discovered Academy now tracking 7 stars Cosmo failed making a movieAstro failed casting a movie Bob has left the business Academy now tracking 6 Stars Cosmo wants to make a movie with 3 actors Signed star 1 of 3 for director Cosmo Signed star 2 of 3 for director Cosmo Signed star 3 of 3 for director Cosmo Cosmo cast 3 actors for movie, starting production East has been discovered Academy now tracking 7 stars West has been discovered Academy now tracking 8 stars Alice has left the business Academy now tracking 7 Stars Rock has left the business Academy now tracking 6 Stars Up has been discovered Academy now tracking 7 stars Astro wants to make a movie with 2 actors Signed star 1 of 2 for director Astro Signed star 2 of 2 for director Astro Astro cast 2 actors for movie, starting production Cosmo made a movie! Cosmo wants to make a movie with 3 actors Signed star 1 of 3 for director Cosmo Signed star 2 of 3 for director Cosmo Signed star 3 of 3 for director Cosmo Cosmo cast 3 actors for movie, starting production Down has been discovered Academy now tracking 8 stars

Двойная неудача примерно в середине листинга демонстрирует интересную последовательность выходной информации. Сначала следует строка Cosmo failed making a movie (Cosmo не смог сделать фильм), затем строка Astro failed casting a movie (Astro не смог осуществить кастинг для фильма), а ниже строка Bob has left the business (Bob ушел из бизнеса). Эти строки отражают взаимодействия, явившиеся результатом терминации одного из акторов Star с именем Bob. В данном случае Bob принял роль в фильме, который создает Cosmo, и производство этого фильма уже началось, поэтому актор ProductionAssistant актора Cosmo получил сообщение Terminated, вследствие чего создание фильма завершилось неудачей. Кроме того, Bob был отобран для получения роли в фильме, который создает Astro, но еще не принял предложения этой роли (поскольку Bob уже согласился участвовать в фильме, который делает Cosmo), поэтому актор CastingAssistant актора Astro получил сообщение Terminated, вследствие чего кастинг фильма также завершился неудачей. Третье сообщение было сгенерировано актором Academy, когда тот получил сообщение Terminated.

Заключение

Реальные приложения с системой акторов включают несколько — обычно множество — акторов и сообщений между этими акторами. В статье было показано, каким образом можно структурировать систему акторов и схематически представить взаимодействия между акторами с целью понимания функционирования этой системы. Подход к программированию, основанный на работе с акторами и сообщениями, отличается от подхода, основанного на написании последовательного программного кода. После получения определенного опыта разработчик осознает, что подход на основе акторов облегчает создание асинхронно исполняемых программ с высокой степенью масштабируемости.

Однако структурирование акторов и обмена сообщениями – это лишь часть пути к работающей системе акторов. В какой-то момент возникает необходимость в выявлении мест, в которых акторы ведут себя ненадлежащим образом. Асинхронная природа систем акторов усложняет точную локализацию взаимодействий, порождающих проблемы. Отслеживание и отладка взаимодействий между акторами – это тема, достойная отдельной статьи.


Ресурсы для скачивания


Похожие темы

  • Оригинал статьи: JVM concurrency: Building actor applications with Akka
  • Scalable Scala: Денис Сосноски, автор данного цикла, делится опытом и внутренней информацией о его содержании и о разработке на языке Scala в целом.
  • Учебный программный код для этой статьи: Загрузите полный программный код для этой статьи из репозитария ее автора на сайте GitHub.
  • Scala: Современный функциональный язык, работающий на платформе JVM.
  • Akka.io: Всеобъемлющий источник ресурсов по Akka, включая полный набор документации для приложений на Scala и на Java.
  • "Introduction to Actors Systems" (Введение в системы акторов), Джош Сурет (Josh Suereth), конференция DevNexus: Джош Сурет проектирует на Akka распределенный поисковый сервис с использованием акторов и в процессе этой работы демонстрирует множество превосходных возможностей Akka.

Комментарии

Войдите или зарегистрируйтесь для того чтобы оставлять комментарии или подписаться на них.

static.content.url=http://www.ibm.com/developerworks/js/artrating/
SITE_ID=40
Zone=Технология Java, Open source
ArticleID=1033416
ArticleTitle=Одновременное исполнение на платформе JVM: Построение основанных на акторах приложений с использованием Akka
publish-date=06102016