Одновременное исполнение на платформе JVM

Асинхронная обработка событий в языке Scala

Описание блокирующих и неблокирующих подходов, включая простой неблокирующий код с макросом async

Comments

Серия контента:

Этот контент является частью # из серии # статей: Одновременное исполнение на платформе JVM

Следите за выходом новых статей этой серии.

Этот контент является частью серии:Одновременное исполнение на платформе JVM

Следите за выходом новых статей этой серии.

Асинхронная обработка событий имеет большое значение для параллельных приложений. Независимо от источника событий — будь то отдельные вычислительные задачи, операции ввода/вывода или взаимодействия с внешними системами — программный код приложения должен отслеживать события и координировать действия, предпринимаемые в ответ на эти события. В распоряжении разработчиков приложений имеется два базовых подхода к асинхронной обработке событий.

  • Блокирующий подход. Приложение имеет координирующий поток, который ждет наступления определенного события.
  • Неблокирующий подход. Событие генерирует некоторую форму уведомления для приложения, а поток не осуществляет в явном виде ожидания этого события.

В статье Одновременное исполнение на платформе JVM: Блокировать или не блокировать? были описаны блокирующие и неблокирующие подходы к асинхронной обработке событий в языке Java™ 8 на основе класса CompletableFuture. В данной статье описываются некоторые возможности асинхронной обработки событий в языке Scala. Сначала рассматривается простая блокирующая версия, а затем некоторые неблокирующие варианты. В заключение демонстрируется использование конструкций async / await для преобразования простого блокирующего кода в неблокирующий. Загрузите полный демонстрационный код для данной статьи из репозитария автора на сайте GitHub.

Формирование событий

Классы scala.concurrent.Promise и scala.concurrent.Future предоставляют Scala-разработчикам примерно такой же набор опций, какой разработчики на языке Java 8 имеют благодаря классу CompletableFuture. В частности, класс Future предлагает блокирующие и неблокирующие способы работы с завершениями событий. Однако, несмотря на определенное подобие на этом уровне, приемы, используемые для работы с двумя типами future-объектов, различаются.

В этом разделе приведены примеры блокирующих и неблокирующих подходов для работы с событиями, представляемыми средствами Future. В этой статье используется тот же набор параллельных задач, что и в предыдущей. Я кратко опишу его, прежде чем углубляться в рассмотрение программного кода.

Задачи и установление последовательности действий

Во многих случаях приложение должно осуществить несколько шагов обработки в ходе определенной операции. Представим, например, что прежде чем возвратить ответ пользователю, веб-приложение должно осуществить следующие шаги.

  1. Поиск в базе данных информации для пользователя.
  2. Использование найденной информации для вызова веб-сервиса и, возможно, для следующего запроса к базе данных.
  3. Внесение изменений в базу данных на основе результатов первых двух операций.

На рис. 1 показана структура этого типа.

Рисунок 1. Поток задач приложения
Diagram of four tasks with ordered execution
Diagram of four tasks with ordered execution

На рис. 1 вся обработка разбита на четыре отдельные задачи (task), соединенные стрелками, которые представляют собой зависимости от порядка исполнения. Задача task 1 может исполняться независимо, задачи task 2 и task 3 исполняются после завершения задачи task 1, задача task 4 исполняется после завершения задач task 2 и task 3.

Моделирование асинхронных событий

В реальной системе источниками асинхронных событий обычно являются параллельные вычисления или операции ввода/вывода. Моделировать систему такого типа проще с помощью простых временных задержек. Именно этот подход я и применяю в данной статье. В листинге 1 показан базовый код с синхронизацией по событиям, который я использую для генерации событий в форме Future.

Листинг 1. Код с синхронизацией по событиям
import java.util.Timer
import java.util.TimerTask

import scala.concurrent._

object TimedEvent {
  val timer = new Timer

  /** Возвращение Future-объекта, который успешно завершается с предоставленным значением 
  через secs секунд. */
  def delayedSuccess[T](secs: Int, value: T): Future[T] = {
    val result = Promise[T]
    timer.schedule(new TimerTask() {
      def run() = {
        result.success(value)
      }
    }, secs * 1000)
    result.future
  }

  /** Возвращение Future-объекта, который завершается с исключением IllegalArgumentException 
  через secs * секунд */
  def delayedFailure(secs: Int, msg: String): Future[Int] = {
    val result = Promise[Int]
    timer.schedule(new TimerTask() {
      def run() = {
        result.failure(new IllegalArgumentException(msg))
      }
    }, secs * 1000)
    result.future
  }

Подобно Java-коду из предыдущей статьи, Scala-код в листинге 1 вызывает таймер java.util.Timer для планирования исполнения задач java.util.TimerTask после задержки. Каждая задача TimerTask завершает ассоциированный future-объект в процессе исполнения. Функция delayedSuccess планирует задачу для успешного завершения исполняющегося Scala-интерфейса Future[T] и возвращает future-объект вызывающей стороне. Функция delayedSuccess возвращает тот же самый тип future-объекта, но использует задачу, которая завершает future-объект с исключением IllegalArgumentException.

В листинге 2 демонстрируется использование кода из листинга 1 для создания событий (в форме Future[Int]), соответствующих четырем задачам на рис. 1 (это код из класса AsyncHappy в демонстрационном коде).

Листинг 2. События для демонстрационных задач
// task definitions
def task1(input: Int) = TimedEvent.delayedSuccess(1, input + 1)
def task2(input: Int) = TimedEvent.delayedSuccess(2, input + 2)
def task3(input: Int) = TimedEvent.delayedSuccess(3, input + 3)
def task4(input: Int) = TimedEvent.delayedSuccess(1, input + 4)

Каждый из четырех методов задач в листинге 2 использует конкретные значения задержки для задания момента завершения своей задачи: 1 секунда для задачи task1, 2 секунды для задачи task2, 3 секунды для задачи task3 и 1 секунда для задачи task4. Кроме того, каждый метод принимает входное значение и использует его, а также номер задачи, в качестве (возможного) значения результата для future-объекта. Все эти методы используют success-форму future-объекта; примеры с использованием failure-формы будут показаны позднее.

Эти задачи должны исполняться в порядке, показанном на рис 1. В каждую задачу должно передаваться значение результата, возвращенное предыдущей задачей (или сумма результатов двух предыдущих задач в случае задачи task4). Общее время исполнения должно составлять примерно 5 секунд (1 секунда + max(2 секунды, 3 секунды) + 1 секунда), если задачи task 2 и task 3 исполняются одновременно. Если на вход задачи task1 поступает 1, то результат равен 2. Если этот результат передается в задачу task2 и в задачу task3, то результаты равны 4 и 5. Если сумма этих двух результатов (9) передается на вход задачи task4, то окончательный результат равен 13.

Блокирующее ожидание

Теперь, когда сцена готова, пришло время посмотреть, как Scala обрабатывает завершения событий. Как и в Java-коде из предыдущей статьи, самый простой способ скоординировать исполнение этих четырех задач состоит в использовании блокирующего ожидания. Основной поток поочередно ждет завершения каждой задачи. Этот подход показан в листинге 3 (код также взят из класса AsyncHappy в демонстрационном коде).

Листинг 3. Блокирующее ожидание для задач
def runBlocking() = {
  val v1 = Await.result(task1(1), Duration.Inf)
  val future2 = task2(v1)
  val future3 = task3(v1)
  val v2 = Await.result(future2, Duration.Inf)
  val v3 = Await.result(future3, Duration.Inf)
  val v4 = Await.result(task4(v2 + v3), Duration.Inf)
  val result = Promise[Int]
  result.success(v4)
  result.future
}

В листинге 3 для выполнения блокирующего ожидания используется метод result() Scala-объекта scala.concurrent.Await. Код сначала ждет результата задачи task1, затем создает future-объекты задач task2 и task3, поочередно ждет от них возвращения результата, и, наконец, ждет результата задачи task4. Последние три строки (создание и настройка result) позволяют вышеупомянутому методу возвратить Future[Int]. Возвращение future-объекта обеспечивает соответствие этого метода неблокирующим вариантам, которые я покажу ниже, однако в данном случае future-объект фактически завершается перед возвращением результатов метода.

Сочетание future-объектов

В листинге 4 (он также взят из класса AsyncHappy в демонстрационном коде) показан один из способов связывания future-объектов между собой с целью исполнения задач в надлежащем порядке и с корректными зависимостями без каких-либо блокировок.

Листинг 4. Обработка завершений с использованием метода onSuccess()
def runOnSuccess() = {
  val result = Promise[Int]
  task1(1).onSuccess(v => v match {
    case v1 => {
      val a = task2(v1)
      val b = task3(v1)
      a.onSuccess(v => v match {
        case v2 =>
          b.onSuccess(v => v match {
            case v3 => task4(v2 + v3).onSuccess(v4 => v4 match {
              case x => result.success(x)
            })
          })
      })
    }
  })
  result.future
}

Код в листинге 4 использует метод onSuccess() для настройки исполнения функции (с технической точки зрения это частичная функция, поскольку она обрабатывает лишь случай успешного завершения) при завершении каждого future-объекта. Поскольку вызовы метода onSuccess() являются вложенными, они будут исполнены в требуемом порядке, даже если не все future-объекты завершаются в этом порядке.

Код в листинге 4 достаточно прост для понимания, однако весьма многословен. В листинге 5 показан более простой способ обработки этого случая с использованием метода flatMap().

Листинг 5. Обработка завершений с использованием метода flatMap()
def runFlatMap() = {
  task1(1) flatMap {v1 =>
    val a = task2(v1)
    val b = task3(v1)
    a flatMap { v2 =>
      b flatMap { v3 => task4(v2 + v3) }}
  }
}

Код в листинге 5 в сущности делает то же самое, что и код в листинге 4, однако в листинге 5 метод flatMap() используется для извлечения единственного значения результата из каждого future-объекта. Использование метода flatMap() устраняет конструкцию match / case, необходимую в листинге 4. Это делает код более компактным, но сохраняет прежний пошаговый маршрут исполнения.

Тестирование примера

Демонстрационный код использует Scala-объект App для поочередного исполнения каждой версии кода событий и проверки корректности значения времени до завершения (примерно 5 с) и значения результата (13). Этот код можно запустить из командной строки с помощью Maven (см. листинг 6; не относящаяся к делу выходная информация Maven удалена).

Листинг 6. Исполнение кода примера
dennis@linux-9qea:~/devworks/scala4/code> mvn scala:run -Dlauncher=happypath
...
[INFO] launcher 'happypath' selected => com.sosnoski.concur.article4.AsyncHappy
Starting runBlocking
runBlocking returned 13 in 5029 ms.
Starting runOnSuccess
runOnSuccess returned 13 in 5011 ms.
Starting runFlatMap
runFlatMap returned 13 in 5002 ms.
�

Действия в случае неудачи

Вплоть до настоящего момента вы видели программный код для координации событий в форме future-объектов, которые всегда завершаются успешно. В реальных приложениях нельзя рассчитывать на то, что так будет всегда: у обрабатывающих задач время от времени возникают различные проблемы. В JVM-языках такие проблемы обычно представляются как Throwable.

Изменим определения задач в листинге 2 таким образом, чтобы вместо метода delayedSuccess() использовать метод delayedFailure(), как показано ниже для задачи task4:

def task4(input: Int) = TimedEvent.delayedFailure(1, "This won't work!")

Если в коде, показанном в листинге 3, изменить задачу task4, чтобы ее исполнение завершалось с исключением, то в результате вызова Await.result() задачи task4 вы получите ожидаемое исключение IllegalArgumentException. Если проблема не перехвачена в методе runBlocking(), это исключение передается вверх по цепочке вызовов до тех пор, пока она не будет перехвачена (в противном случае она завершает исполняющийся поток). К счастью, этот код можно легко модифицировать так, чтобы в случае завершения любой задачи с исключением это исключение передавалось вызывающей стороне для обработки посредством возвращенного future-объекта. Соответствующие изменения показаны в листинге 7.

Листинг 7. Блокирующее ожидание с исключениями
def runBlocking() = {
  val result = Promise[Int]
  try {
    val v1 = Await.result(task1(1), Duration.Inf)
    val future2 = task2(v1)
    val future3 = task3(v1)
    val v2 = Await.result(future2, Duration.Inf)
    val v3 = Await.result(future3, Duration.Inf)
    val v4 = Await.result(task4(v2 + v3), Duration.Inf)
    result.success(v4)
  } catch {
    case t: Throwable => result.failure(t)
  }
  result.future
}

В листинге 7 первоначальный код заключен в блок try/catch, а оператор catch передает исключение обратно в качестве завершения возвращенного future-объекта. Этот подход несколько сложнее, однако он по-прежнему достаточно прост для понимания любым Scala-разработчиком.

Что можно сказать относительно неблокирующих вариантов кода обработки событий (в листинге 4 и в листинге 5)? Метод onSuccess(), используемый в листинге 4, работает только с успешными завершениями future-объекта. Если вы хотите обрабатывать и успешные, и сбойные завершения, вместо него нужно использовать метод onComplete() и осуществлять проверку, чтобы видеть, какой тип завершения имеет место. В листинге 8 показано, как применить этот способ к коду обработки событий.

Листинг 8. Использование метода onComplete() для обработки успешных и сбойных завершений
def runOnComplete() = {
  val result = Promise[Int]
  task1(1).onComplete(v => v match {
    case Success(v1) => {
      val a = task2(v1)
      val b = task3(v1)
      a.onComplete(v => v match {
        case Success(v2) =>
          b.onComplete(v => v match {
            case Success(v3) => task4(v2 + v3).onComplete(v4 => v4 match {
              case Success(x) => result.success(x)
              case Failure(t) => result.failure(t)
            })
            case Failure(t) => result.failure(t)
          })
        case Failure(t) => result.failure(t)
      })
    }
    case Failure(t) => result.failure(t)
  })
  result.future
}

Код в листинге 8 выглядит весьма запутанным, однако, к счастью, есть гораздо более простой альтернативный вариант: использовать код flatMap(), показанный в листинге 5. Подход на основе метода flatMap() обрабатывает как успешные, так и сбойные завершения без каких-либо изменений.

Использование макроса async

Последние версии языка Scala поддерживают возможность преобразования кода в процессе компиляции с помощью макросов. Одним из самых полезных макросов, реализованных к настоящему времени, является макрос async, который во время компиляции преобразует предположительно последовательный код, использующий future-объекты, в асинхронный код. В листинге 9 показано, как макрос async упрощает код задачи, используемый в этой статье.

Листинг 9. Сочетание future-объектов с макросом async {}
def runAsync(): Future[Int] = {
  async {
    val v1 = await(task1(1))
    val a = task2(v1)
    val b = task3(v1)
    await(task4(await(a) + await(b)))
  }
}

В листинге 9 охватывающий блок async {...} вызывает макрос async. Этот вызов декларирует, что блок является асинхронным и исполняется асинхронно (по умолчанию), и что он возвращает future-объект в качестве результата. Внутри этого блока метод await() (в действительности это ключевое слово для макроса, а не настоящий метод) показывает, в каком месте требуется результат future-объекта. В процессе компиляции макрос async изменяет абстрактное синтаксическое дерево (abstract syntax tree, AST) Scala-программы таким образом, чтобы преобразовать вышеупомянутый блок в код, использующий обратные вызовы (примерно эквивалентный коду в листинге 4).

За исключением конструкции async {...} код в листинге 9 очень напоминает первоначальный блокирующий код, показанный в листинге 3. В этом и состоит достоинство данного макроса — он абстрагирует всю сложность асинхронных событий и создает у разработчика впечатление, что тот пишет простой линейный код. На самом деле существенные сложности все же имеют место, однако они остаются "за сценой".

Содержимое макроса async

Если посмотреть на классы, сгенерированные компилятором Scala из исходного кода, то можно увидеть несколько внутренних классов с такими именами, как AsyncHappy$$anonfun$1.class. Нетрудно догадаться, что компилятор генерирует эти классы для анонимных функций (таких как утверждения, передаваемые в методы onSuccess() или flatMap()).

При использовании компилятора Scala 2.11.1 и реализации Async 0.9.2 также можно увидеть класс с именем AsyncUnhappy$stateMachine$macro$1$1.class. Это код фактический реализации, сгенерированный макросом asyncв форме конечного автомата для обработки асинхронных задач. В листинге 10 показана часть декомпилированного представления этого класса.

Листинг 10. Декомпилированное представление класса AsyncUnhappy$stateMachine$macro$1$1.class
public class AsyncUnhappy$stateMachine$macro$1$1
  implements Function1<Try<Object>, BoxedUnit>, Function0.mcV.sp
{
  private int state;
  private final Promise<Object> result;
  private int await$macro$3$macro$13;
  private int await$macro$7$macro$14;
  private int await$macro$5$macro$15;
  private int await$macro$11$macro$16;
  ...
  public void resume() {
    ...
  }

  public void apply(Try<Object> tr) {
    int i = this.state;
    switch (i) {
      default:
        throw new MatchError(BoxesRunTime.boxToInteger(i));
      case 3:
        if (tr.isFailure()) {
          result().complete(tr);
        } else {
          this.await$macro$11$macro$16 = BoxesRunTime.unboxToInt(tr.get());
          this.state = 4;
          resume();
        }
        break;
      case 2:
        if (tr.isFailure()) {
          result().complete(tr);
        } else {
          this.await$macro$7$macro$14 = BoxesRunTime.unboxToInt(tr.get());
          this.state = 3;
          resume();
        }
        break;
      case 1:
        if (tr.isFailure()) {
          result().complete(tr);
        } else {
          this.await$macro$5$macro$15 = BoxesRunTime.unboxToInt(tr.get());
          this.state = 2;
          resume();
        }
        break;
      case 0:
        if (tr.isFailure()) {
          result().complete(tr);
        } else {
          this.await$macro$3$macro$13 = BoxesRunTime.unboxToInt(tr.get());
          this.state = 1;
          resume();
        }
        break;
    }
  } 
  ...
}

В листинге 10 метод apply() обрабатывает изменения реального состояния, оценивая результат future-объекта и изменяя состояние вывода соответствующим образом. Входное состояние сообщает коду о том, какой future-объект оценивается; каждое значение состояния соответствует одному конкретному future-объекту внутри async-блока. Такой вывод сложно сделать на основе частично декомпилированного кода в листинге 10, однако при рассмотрении определенных фрагментов остального байткода можно увидеть, что коды состояния соответствуют задачам, поэтому состояние 0 означает, что ожидается результат задачи task1, состояние 1 означает, что ожидается результат задачи task2 и так далее.

Метод resume() не показан в листинге 10, поскольку декомпилятор не смог выяснить, как преобразовать этот метод в Java-код. Я больше не буду углубляться в этот вопрос, однако на основе рассмотрения байткода можно сказать, что метод resume() делает примерно то же самое, что Java-оператор switch делает с кодом состояния. Для каждого нетерминального состояния метод resume() исполняет соответствующий фрагмент кода с целью настройки следующего ожидаемого future-объекта, заканчивая настройкой экземпляра AsyncUnhappy$stateMachine$macro$1$1 в качестве цели для метода onComplete() этого future-объекта. Для терминального состояния метод resume() устанавливает значение результата и завершает promise для окончательного результата.

В действительности у вас нет необходимости углубляться в сгенерированный код, чтобы понять макрос async (хотя это может оказаться интересным). Полное описание работы макроса async содержится в документе SIP-22 - Async.

Ограничения макроса async

Использование макроса async имеет некоторые ограничения по причине способа, которым этот макрос преобразует код в класс конечного автомата. Наиболее существенное ограничение состоит в том, что в рамках блока async нельзя вложить метод await() в другой объект или в замыкание (включая определение функции. Также нельзя вложить await() внутрь конструкции try или catch.

Помимо этих ограничений использования, самая большая проблема макроса async состоит в том, что при отладке вы снова возвращаетесь к трудностям обратных вызовов, нередко сопровождающих асинхронный код — в данном случае трудности возникают при попытке осмыслить стеки вызовов, которые не отражают видимую структуру вашего кода. К сожалению, при нынешней конструкции отладчика не существует никакого способа избавления от этих проблем. В настоящее время в этой области Scala проводится определенная работа (см. документ под названием Rethinking the debugger). А пока для облегчения отладки можно отключить асинхронное исполнение блоков async (в предположении, что проблема, которую вы пытаетесь устранить, сохраняется и при последовательном исполнении).

Кроме того, макросы Scala все еще находятся в стадии разработки. Конечная цель этой деятельности состоит в том, чтобы официально включить макрос async в состав языка Scala в его будущем релизе, однако это произойдет только тогда, когда проектировщики языка Scala будут удовлетворены работой макроса. До этого момента не существует никаких гарантий того, что форма макроса async не изменится.

Заключение

Некоторые Scala-подходы к обработке асинхронных событий значительно отличаются от Java-подходов, рассмотренных в статье Одновременное исполнение на платформе JVM: Блокировать или не блокировать? . Благодаря методу flatMap() и макросу async язык Scala предлагает "чистые" и легкие для понимания технические приемы. В этом смысле особенный интерес представляет макрос async, позволяющий разработчику писать код, который выглядит как обычный последовательный код, однако после компиляции исполняется параллельно. Scala — это не единственный язык, предлагающий подобный подход, однако реализация на основе макроса обеспечивает более высокую степень гибкости, чем при использовании других подходов.


Ресурсы для скачивания


Похожие темы

  • Оригинал статьи: JVM concurrency: Asynchronous event handling in Scala
  • Scalable Scala: Денис Сосноски, автор данного цикла, делится опытом и внутренней информацией о его содержании и о разработке на языке Scala в целом.
  • Демонстрационный программный код для данной статьи. Загрузите полный демонстрационный код для данной статьи из репозитария автора на сайте GitHub.
  • Scala: современный функциональный язык, работающий на платформе JVM.
  • SIP-22 - Async Philipp Haller и Jason Zaugg, Scala Improvement Process. В этом предложении категории SIP (Scala Improvement Process) описывается назначение конструкций async и await, а также излагаются подробности преобразования кода при создании класса конечного автомата.
  • An asynchronous programming facility for Scala, Jason Zaugg и другие, Github. Загрузите исходный код и новейшую документацию для реализации async.
  • Rethinking the debugger, Iulian Dragos, конференция Scala Days 2014. Познакомьтесь с захватывающими идеями, которые используются в языке Scala для упрощения отладки параллельных программ.

Комментарии

Войдите или зарегистрируйтесь для того чтобы оставлять комментарии или подписаться на них.

static.content.url=http://www.ibm.com/developerworks/js/artrating/
SITE_ID=40
Zone=Технология Java, Open source
ArticleID=1000642
ArticleTitle=Одновременное исполнение на платформе JVM: Асинхронная обработка событий в языке Scala
publish-date=03162015