Содержание


Путеводитель по Scala для Java-разработчиков

Параллелизм в Scala

Узнайте о том, как Scala упрощает параллельное программирование, а также о возможных подводных камнях

Comments

Серия контента:

Этот контент является частью # из серии # статей: Путеводитель по Scala для Java-разработчиков

Следите за выходом новых статей этой серии.

Этот контент является частью серии:Путеводитель по Scala для Java-разработчиков

Следите за выходом новых статей этой серии.

В 2003 году Герб Саттер (Herb Sutter) в своей статье под заголовком "The Free Lunch Is Over" («Бесплатный сыр закончился») раскрыл один из главных нелицеприятных секретов компьютерной индустрии, наглядно продемонстрировав, что эпоха "все более быстрых процессоров" подходит к концу, и ее должна сменить эра параллелизма, ключевая роль в которой отведена многоядерным процессорам. В кругах разработчиков ПО это произвело эффект разорвавшейся бомбы, поскольку вопросы написания корректного потокобезопасного кода исторически считались прерогативой исключительно высококлассных специалистов, зарплата которых не по карману большинству компаний. Некоторые Java-разработчики все же сумели разобраться с моделью параллелизма в Java, соответствующими API и осознали, как писать потокобезопасный и эффективный код, а также поняли значимость ключевого слова "synchronized"... однако большинству из них пришлось учиться на собственных ошибках.

При этом подразумевается, что остальные разработчики оказываются предоставленными самим себе, что малоприятно, по крайней мере для отделов IT, которые финансируют разработку программного обеспечения.

Как и аналогичный язык под платформу .NET - F# - Scala рассматривается в качестве возможного решения "проблемы параллелизма". В предыдущих статьях были затронуты несколько свойств Scala, которые облегчают написание потокобезопасного кода, например неизменяемые по умолчанию объекты и склонность к возврату копий объектов вместо изменения их содержимого. Однако это лишь малая часть возможностей Scala по поддержке параллелизма. Теперь пришло время рассмотреть библиотеки Scala на предмет средств параллельной обработки данных.

Основы параллельной обработки информации

Перед тем как углубиться в вопросы поддержки параллелизма в Scala, имеет смысл удостовериться, что вы хорошо понимаете базовую модель параллелизма в Java, поскольку средства Scala, направленные на реализацию параллельной обработки, базируются на функциональности, предоставляемой JVM и сопутствующими библиотеками. Вследствие этого мы начнем с рассмотрения классической задачи параллельной обработки под названием "производитель-потребитель" (producer/consumer), которая представлена в листинге 1 (данная задача описана также в разделе "Guarded Blocks" руководства по Java от Sun). Обратите внимание, что в руководстве по Java не используются классы из пакета java.util.concurrent, вместо которых применяются классические методы wait()/notifyAll() базового класса java.lang.Object.

Листинг 1. Решение задачи "производитель-потребитель" без использования средств Java 5
package com.tedneward.scalaexamples.notj5;

class Producer implements Runnable
{
  private Drop drop;
  private String importantInfo[] = {
    "Mares eat oats",
    "Does eat oats",
    "Little lambs eat ivy",
    "A kid will eat ivy too"
  };

  public Producer(Drop drop) { this.drop = drop; }

  public void run()
  {
    for (int i = 0; i < importantInfo.length; i++)
    {
      drop.put(importantInfo[i]);
    }
    drop.put("DONE");
  }
}

class Consumer implements Runnable
{
  private Drop drop;

  public Consumer(Drop drop) { this.drop = drop; }

  public void run()
  {
    for (String message = drop.take(); !message.equals("DONE");
         message = drop.take())
    {
      System.out.format("MESSAGE RECEIVED: %s%n", message);
    }
  }
}

class Drop
{
  //Сообщение, отправляемое производителем потребителям
  private String message;
  
  //True, если потребитель должен ждать отправки сообщения производителем,
  //false, если производитель должен ждать пока потребитель не выберет сообщение из буфера
  private boolean empty = true;

  //Объект, на котором будет выполняться синхронизация
  private Object lock = new Object();

  public String take()
  {
    synchronized(lock)
    {
      //Ожидание поступления сообщения
      while (empty)
      {
        try
        {
          lock.wait();
        }
        catch (InterruptedException e) {}
      }
      //Изменение флага состояния
      empty = true;
      //Уведомление производителя об изменении состояния
      lock.notifyAll();
      return message;
    }
  }

  public void put(String message)
  {
    synchronized(lock)
    {
      //Ожидание выборки сообщения
      while (!empty)
      {
        try
        { 
          lock.wait();
        } catch (InterruptedException e) {}
      }
      //Изменение флага состояния
      empty = false;
      //Сохранение сообщения
      this.message = message;
      //Уведомление потребителя об изменении состояния
      lock.notifyAll();
    }
  }
}

public class ProdConSample
{
  public static void main(String[] args)
  {
    Drop drop = new Drop();
    (new Thread(new Producer(drop))).start();
    (new Thread(new Consumer(drop))).start();
  }
}

"Ошибка" в руководстве по Java

Любознательные читатели наверняка захотят сравнить решения, приведенные в этой статье и в руководстве по Java. При этом выяснится, что вместо простой синхронизации методов put и take, в нашем решении используется объект lock, хранящийся в классе Drop. Причина этого проста: если монитор объекта не заключен внутри класса, то могут возникать проблемы в случаях, подобных показанному ниже (хотя в этом фрагменте и нет никакого смысла):

public class ProdConSample
{
  public static void main(String[] args)
  {
    Drop drop = new Drop();
    (new Thread(new Producer(drop))).start();
    (new Thread(new Consumer(drop))).start();
  synchronized(drop)
  {
    Thread.sleep(60 * 60 * 24 * 365 * 10); // заснуть на 10 лет?!?
  }
  }
}



Однако этих проблем удается избежать, если использовать блокировку на основе закрытого члена класса. По сути, теперь реализация потоковой безопасности полностью заключена внутри класса, в то время как ранее она зависела от благоволения клиента.

Замечание. Приведенный выше код представляет собой несколько модифицированную версию решения из руководства Sun. В их варианте решения есть небольшой недостаток, описанный в заметке "Ошибка" в руководстве по Java").

Задача "производителей-потребителей" формулируется очень просто: существует один или несколько объектов-производителей данных, а также один или несколько объектов-потребителей этих данных. В данном случае "потребление" заключается в выводе информации в консоль. Классы Producer и Consumer, реализующие производителей и потребителей соответственно, представляют собой простые классы с интерфейсом Runnable. Класс Producer выбирает строки из массива и помещает их в буфер (метод put), из которого их извлекает Consumer (метод take).

Основная сложность заключается в том, что если производители будут работать слишком быстро, то данные могут быть перезаписаны и, как следствие, утеряны, а если наоборот, потребители будут работать слишком быстро, то они могут считывать одни и те же данные несколько раз. Задача буфера (класса под названием Drop в руководстве по Java) заключается в предотвращении обеих ситуаций. Разумеется, он также должен предохранять данные от возможного повреждения при их помещении и выборке (это сложно себе представить в случае экземпляров String, но, тем не менее, об этом следует позаботиться).

Эта задача подробно рассматривается в книге Брайана Гётца (Brian Goetz) "Java Concurrency in Practice", а также в более ранней книге Дуга Ли (Doug Lea) "Concurrent Programming in Java" (см. Ресурсы). Для наших целей будет достаточно краткого разбора данного решения перед тем, как мы перейдем к Scala.

Встретив в коде ключевое слово synchronized, компилятор Java генерирует на его месте блок try/finally с кодом операции monitorenter в его начале и monitorexit в конце блока finally. Это гарантирует, что монитор будет корректно освобожден, что бы ни случилось при выполнении кода (монитор – это базовая конструкция в Java для обеспечения атомарности операций). Таким образом, метод put в классе Drop будет выглядеть следующим образом (листинг 2).

Листинг 2. Вид метода Drop.put после обработки компилятором
  // Это псевдокод
  public void put(String message)
  {
    try
    {
    monitorenter(lock)
  
      //Ожидание выборки сообщения
      while (!empty)
      {
        try
        { 
          lock.wait();
        } catch (InterruptedException e) {}
      }
      //Изменение флага состояния
      empty = false;
      //Помещение сообщения в буфер
      this.message = message;
      //Уведомление потребителя об изменении состояния
      lock.notifyAll();
    }
  finally
  {
    monitorexit(lock)
  }
  }

Метод wait() переводит текущий поток в неактивное состояние, в котором он будет находиться до тех пор, пока другой поток не вызовет метод notifyAll() того же объекта. Разбуженный поток сразу же попытается захватить монитор, после чего сможет продолжить выполнение. Фактически методы wait() и notify()/notifyAll() реализуют простой механизм сигнализации, при помощи которого класс Drop может координировать выполнение потоков Producer и Consumer, разрешая одну операцию take на каждую операцию put.

В коде, который находится в архиве к этой статье, используются средства Java 5 по поддержке параллелизма, в частности, интерфейсы Lock и Condition, а также реализация ReentrantLock. Они позволяют добавить поддержку тайм-аутов, но в остальном принципы остаются теми же, что и в листинге 2. Проблема заключается именно в том, что разработчикам, которые пишут аналогичный код, приходится уделять много внимания низкоуровневым деталям координации параллельной обработки, в частности блокировкам и потокам. Более того, им необходимо тщательно проверять каждую строку кода на предмет того, требует ли она синхронизации, поскольку при излишней синхронизации возникают не менее сложные проблемы, чем при ее недостатке.

Теперь можно обратиться к тому, что предлагает Scala.

Классический параллелизм в Scala (версия первая)

Один из вариантов организации параллельной обработки в Scala заключается в адаптации Java-кода. При этом можно использовать некоторые преимущества синтаксиса Scala в целях упрощения кода (листинг 3).

Листинг 3. Объект ProdConSample (Scala)
object ProdConSample
{
  class Producer(drop : Drop)
    extends Runnable
  {
    val importantInfo : Array[String] = Array(
      "Mares eat oats",
      "Does eat oats",
      "Little lambs eat ivy",
      "A kid will eat ivy too"
    );
  
    override def run() : Unit =
    {
      importantInfo.foreach((msg) => drop.put(msg))
      drop.put("DONE")
    }
  }
  
  class Consumer(drop : Drop)
    extends Runnable
  {
    override def run() : Unit =
    {
      var message = drop.take()
      while (message != "DONE")
      {
        System.out.format("MESSAGE RECEIVED: %s%n", message)
        message = drop.take()
      }
    }
  }
  
  class Drop
  {
    var message : String = ""
    var empty : Boolean = true
    var lock : AnyRef = new Object()
  
    def put(x: String) : Unit =
      lock.synchronized
      {
        // Ожидание выборки сообщения
        await (empty == true)
        // Изменение флага состояния
        empty = false
        // Помещение сообщения в буфер
        message = x
        // Уведомление потребителя об изменении состояния
        lock.notifyAll()
      }

    def take() : String =
      lock.synchronized
      {
        // Ожидание помещения сообщения в буфер
        await (empty == false)
        // Изменение флага состояния
        empty=true
        // Уведомление производителя об изменении состояния
        lock.notifyAll()
        // Выборка сообщения
        message
      }

    private def await(cond: => Boolean) =
      while (!cond) { lock.wait() }
  }

  def main(args : Array[String]) : Unit =
  {
    // Создание объекта Drop
    val drop = new Drop();
  
    // Запуск производителя (Producer)
    new Thread(new Producer(drop)).start();
    
    // Запуск потребителя (Consumer)
    new Thread(new Consumer(drop)).start();
  }
}

В этом примере классы Producer и Consumer практически идентичны версиям в Java. Они оба реализуют интерфейс Runnable и перегружают метод run(). При этом класс Producer использует встроенную конструкцию для перебора элементов массива importantInfo. Кстати говоря, в мире Scala массив importantInfo было бы логичнее сделать экземпляром List (т. е. списком), а не Array, но в качестве первого примера имеет смысл рассмотреть наиболее близкий к Java вариант.

Класс Drop также выглядит очень похоже на свой аналог в Java за тем исключением, что в Scala "synchronized" – это не ключевое слово, а метод, определенный в классе AnyRef, который в Scala является базовым для всех ссылочных типов. Это означает, что для синхронизации доступа к конкретному объекту достаточно вызвать у него синхронизирующий метод. В данном примере таковым объектом будет поле lock в классе Drop.

Обратите внимание еще на одну конструкцию Scala, которая используется в классе Drop, а именно – в определении метода await(): параметр cond представляет собой блок кода, который будет вычислен в самом методе, а не до его вызова. В Scala такая конструкция называется "вызов по имени" (call-by-name), а в данном примере ее удобно использовать для реализации условия ожидания, которое в Java-версии приходилось повторять дважды (в методах put и take).

Наконец, в методе main() создается экземпляр Drop и два экземпляра потоков, которые запускаются методом start(). Затем выполнение main() завершается, причем неявно считается, что JVM успеет запустить оба потока до выхода из main(). Кстати, в реальном приложении на это лучше не полагаться, хотя в подобных простых примерах все будет в порядке в 99.99% случаев. Да будет бдительным покупатель.

Тем не менее, несмотря на переписывание кода на Scala, старые проблемы никуда не делись. Разработчикам по-прежнему приходится уделять слишком много внимания координации параллельного выполнения потоков. Разумеется, некоторые синтаксические конструкции Scala несколько облегчают этот процесс, но пока они не выглядят достаточно убедительно.

Параллелизм в Scala (версия вторая)

Бросив даже поверхностный взгляд на документацию по стандартной библиотеке Scala, можно увидеть интересный пакет: scala.concurrency. Он включает в себя множество различных конструкций для поддержки параллелизма, в том числе класс MailBox, который будет использоваться в примерах ниже.

Как можно догадаться по его имени, MailBox (почтовый ящик) представляет собой не что иное как класс Drop – буфер единичного размера, хранящий некоторый объект данных до его выборки. При этом MailBox обладает серьезным преимуществом: он полностью скрывает все детали синхронизации за фасадом из case-классов и сопоставления с образцом. Это делает его более гибким по сравнению с Drop, а также с классом java.util.concurrent.BoundedBuffer, который отличается от Drop тем, что позволяет хранить несколько элементов данных (пример приведен в листинге 4).

Листинг 4. Объект ProdConSample, версия вторая (Scala)
package com.tedneward.scalaexamples.scala.V2
{
  import concurrent.{MailBox, ops}

  object ProdConSample
  {
    class Producer(drop : Drop)
      extends Runnable
    {
      val importantInfo : Array[String] = Array(
        "Mares eat oats",
        "Does eat oats",
        "Little lambs eat ivy",
        "A kid will eat ivy too"
      );
    
      override def run() : Unit =
      {
        importantInfo.foreach((msg) => drop.put(msg))
        drop.put("DONE")
      }
    }
    
    class Consumer(drop : Drop)
      extends Runnable
    {
      override def run() : Unit =
      {
        var message = drop.take()
        while (message != "DONE")
        {
          System.out.format("MESSAGE RECEIVED: %s%n", message)
          message = drop.take()
        }
      }
    }

    class Drop
    {
      private val m = new MailBox()
      
      private case class Empty()
      private case class Full(x : String)
      
      m send Empty()  // инициализация
      
      def put(msg : String) : Unit =
      {
        m receive
        {
          case Empty() =>
            m send Full(msg)
        }
      }
      
      def take() : String =
      {
        m receive
        {
          case Full(msg) =>
            m send Empty(); msg
        }
      }
    }
  
    def main(args : Array[String]) : Unit =
    {
      // Создание объекта Drop
      val drop = new Drop()
      
      // Запуск производителя (Producer)
      new Thread(new Producer(drop)).start();
      
      // Запуск потребителя (Consumer)
      new Thread(new Consumer(drop)).start();
    }
  }
}

Единственным различием между первой и второй версиями является реализация класса Drop, которая теперь основывается на использовании MailBox. Данный класс отвечает за все вопросы блокировки и посылки сигналов при поступлении и выборке сообщений из буфера. При этом можно было бы переписать классы Producer и Consumer таким образом, чтобы они использовали MailBox напрямую, однако лучше поддерживать API класса Drop неизменным во всех примерах. Использование MailBox несколько отличается от классического BoundedBuffer (Drop), поэтому стоит рассмотреть его более подробно.

Класс MailBox поддерживает две базовые операции: send и receive (кроме них еще есть receiveWithin, но это не более чем receive с поддержкой тайм-аута), причем сообщения могут быть произвольных типов. Метод send() помещает сообщение в почтовый ящик, добавляя его в конец связанного списка, а также посылая уведомления всем получателям, которых интересует данный тип сообщения. receive() блокирует текущий поток до того момента, пока не будет получено сообщение, соответствующее переданному в метод функциональному блоку.

Таким образом, в данном примере используются два case-класса, один из которых содержит значение Empty, а другой – Full. Они означают пустой и заполненный данными MailBox соответственно.

  • Метод put, задачей которого является помещение данных в Drop, вызывает метод receive() объекта MailBox, указывая в качестве типа сообщения Empty. Таким образом текущий поток будет заблокирован до появления в ящике элемента Empty. В этот момент метод помещает в MailBox сообщение типа Full, содержащее новые данные.
  • Метод take, задачей которого является извлечение данных из Drop, вызывает метод receive() объекта MailBox, указывая в качестве типа сообщения Full. Затем он извлекает сообщение и помещает элемент Empty в MailBox. Оба метода используют сопоставление с образцом для проверки значений case-классов и присвоения их локальным переменным.

При этом не требуется ни ручного управления блокировками, ни явной работы с мониторами.

Параллелизм в Scala (версия третья)

На самом деле можно существенно сократить объем кода в случаях, когда нет необходимости делать Producer и Consumer самостоятельными классами (как в данном примере). Если они являются не более чем обертками вокруг метода Runnable.run(), то вместо них можно использовать метод scala.concurrent.ops объекта spawn, как показано в листинге 5.

Листинг 5. Объект ProdConSample, версия третья (Scala)
package com.tedneward.scalaexamples.scala.V3
{
  import concurrent.MailBox
  import concurrent.ops._

  object ProdConSample
  {
    class Drop
    {
      private val m = new MailBox()
      
      private case class Empty()
      private case class Full(x : String)
      
      m send Empty()  // инициализация
      
      def put(msg : String) : Unit =
      {
        m receive
        {
          case Empty() =>
            m send Full(msg)
        }
      }
      
      def take() : String =
      {
        m receive
        {
          case Full(msg) =>
            m send Empty(); msg
        }
      }
    }
  
    def main(args : Array[String]) : Unit =
    {
      // Создание объекта Drop
      val drop = new Drop()
      
      // Запуск производителя (Producer)
      spawn
      {
        val importantInfo : Array[String] = Array(
          "Mares eat oats",
          "Does eat oats",
          "Little lambs eat ivy",
          "A kid will eat ivy too"
        );
        
        importantInfo.foreach((msg) => drop.put(msg))
        drop.put("DONE")
      }
      
      // Запуск потребителя (Consumer)
      spawn
      {
        var message = drop.take()
        while (message != "DONE")
        {
          System.out.format("MESSAGE RECEIVED: %s%n", message)
          message = drop.take()
        }
      }
    }
  }
}

Метод spawn, который импортируется в верхней части листинга через объект ops, принимает на вход блок кода (это очередной пример вызова по имени) и создает на его основе анонимный экземпляр потока с соответствующим методом run(). На самом деле логика работы spawn очевидным образом вытекает из его определения в классе ops (листинг 6).

Листинг 6. scala.concurrent.ops.spawn()
  def spawn(p: => Unit) = {
    val t = new Thread() { override def run() = p }
    t.start()
  }

Данный метод – это еще одна иллюстрация возможностей передачи параметров по имени.

Одним из недостатков метода ops.spawn является то, что он был написан в 2003 году, еще до появления классов Java 5, направленных на поддержку параллелизма. В частности, в Java 5 появились такие классы как java.util.concurrent.Executor, которые облегчают задачу порождения новых потоков, освобождая разработчиков от рутинной работы по созданию объектов-потоков. К счастью, определение spawn является настолько простым, что не представляет труда создать собственную реализацию этого метода на основе Executor (также можно использовать ExecutorService или ScheduledExecutorService), который будет управлять запуском потоков.

Следует упомянуть, что поддержка параллелизма в Scala совершенно не ограничивается классами MailBox и ops. Scala также поддерживает понятие "актора". Как и MailBox, акторы работают на основе передачи сообщений, однако используют этот механизм значительно интенсивнее и обладают большей гибкостью. Мы оставим эти вопросы до следующего раза.

Заключение

Во многом аналогично другим аспектам программирования на Java, Scala предоставляет двухуровневую поддержку параллелизма:

  • во-первых, Scala предоставляет полный доступ к низкоуровневым библиотекам Java, таким как java.util.concurrent, и поддерживает классические механизмы параллельного выполнения в стиле Java, например на основе мониторов или методов wait()/notifyAll();
  • во-вторых, Scala предоставляет новый уровень абстракции, построенный на основе этих механизмов. Примером этого является класс MailBox, описанный выше, а также библиотека акторов, которую мы обсудим в следующей статье серии.

Так или иначе, задачей Scala является облегчение труда разработчиков, чтобы они могли сосредоточиться на логике приложения и отвлечься от низкоуровневых деталей параллельного программирования. Разумеется, второй подход решает эту задачу значительно лучше, по-крайней мере для тех разработчиков, которые не привыкли затрачивать серьезные усилия на продумывание деталей параллельной обработки.

Очевидным недостатком текущей версии библиотек Scala является то, что они не используют преимущества классов Java 5. В частности, классу scala.concurrent.ops следовало бы использовать интерфейсы Executor в методе spawn, а также поддерживать версию synchronized на основе интерфейсов Lock. К счастью, это можно исправить, сохранив обратную совместимость. Более того, затратив немного времени, это могут сделать сами разработчики Scala-приложений не дожидаясь, пока новую версию выпустит команда, занимающаяся ядром Scala.


Ресурсы для скачивания


Похожие темы

  • Оригинал статьи: "The busy Java developer's guide to Scala: Explore Scala concurrency. (EN)
  • Путеводитель по Scala для Java-разработчиков (Тед Ньювард, developerWorks).
  • В книге Java Concurrency in Practice (Брайан Гётц, Addison-Wesley Professional, 2006 г.) описываются как основы, так и сложные вопросы параллельного программирования. (EN)
  • В книге Concurrent Programming in Java (Дуг Ли, Prentice Hall PTR, 1999 г.) приводится обзор широкого круга вопросов, связанных с параллельным программированием, а также описывается множество приемов и советов по разработке многопоточных приложений на Java. (EN)
  • Прочитайте статью "Функциональное программирование на Java" (Абхиджит Белапуркар, developerWorks, июль 2004 г.), в которой обсуждаются преимущества и возможности применения функционального программирования с точки зрения разработчика Java. (EN)
  • Ознакомьтесь со статьей "Scala в примерах" (Мартин Одерски, декабрь 2007 г.), в которой приводится короткое введение в Scala, изобилующее примерами (формат PDF). (EN)
  • Прочитайте книгу Программирование на Scala (Мартин Одерски, Лекс Спун, Lex Spoon и Билл Веннерс; сигнальный экземпляр опубликован Artima в феврале 2008 г.) - первое подробное введение в Scala, написанное в соавторстве с Биллом Веннерсом. (EN)
  • Бьёрн Страуструп – автор и создатель языка C++, который он лично охарактеризовал как "улучшенный C". (EN)
  • В книге Java Puzzlers: Traps, Pitfalls, and Corner Cases (Addison-Wesley Professional, июль 2005 г.) на занимательных и интригующих задачах описывается ряд странных ситуаций, возникающих при программировании на Java. (EN)
  • Загрузите Scala и начните ее изучение с этой серии. (EN)
  • Загрузите SUnit - набор классов пакета scala.testing, входящего в стандартный дистрибутив Scala. (EN)
  • Сотни статей по всем аспектам программирования на Java можно найти на сайте developerWorks, в разделе Технология Java.

Комментарии

Войдите или зарегистрируйтесь для того чтобы оставлять комментарии или подписаться на них.

static.content.url=http://www.ibm.com/developerworks/js/artrating/
SITE_ID=40
Zone=Технология Java
ArticleID=388638
ArticleTitle=Путеводитель по Scala для Java-разработчиков: Параллелизм в Scala
publish-date=05122009