Решение общих проблем параллелизма с помощью GPars

Узнайте, как библиотека параллельных операций языка Groovy использует популярные модели параллелизма и делает их доступными на платформе Java

Переход к многоядерным вычислительным системам подстегнул интерес к таким моделям параллельного программирования, как разделение/объединение, акторы, агенты и исполнители. И хотя эти модели пришли из разных языков программирования, многие из них инкапсулированы в GPars — библиотеке параллельных операций на основе Groovy. В этой статье Алекс Миллер научит вас решать проблемы параллелизма с помощью методов, позаимствованных из областей функционального и объектно-ориентированного программирования, и сделает все это с помощью знакомого и совместимого с Java™ синтаксиса Groovy.

Алекс Миллер, старший инженер, Revelytix

Алекс МиллерАлекс Миллер (Alex Miller), старший инженер компании Revelytix, занимается созданием технологии запросов для федерированного семантического интернета. Последние два года он работает исключительно с Clojure. До прихода Revelytix Алекс был техническим специалистом компании Terracotta, инженером BEA Systems и главным архитектором ПО MetaMatrix. В круг его интересов входят Java, параллельная обработка, распределенные системы, языки программирования и разработка программного обеспечения. Алекс занимается твитингом под псевдонимом @puredanger и блоггингом под псевдонимом http://tech.puredanger.com. Он - организатор конференции разработчиков Strange Loop и группы по изучению функциональных и динамических языков Lounge Lambda. Еще он ― большой любитель мексиканского начос.



01.02.2013

Развить навыки по этой теме

Этот материал — часть knowledge path для развития ваших навыков. Смотри Параллелизм Java

Сегодня, в эпоху параллелизма, широкое распространение получили процессоры с 4, 8 и 16 ядрами, а в ближайшем будущем мы увидим процессоры с сотнями или даже тысячами ядер. Появление таких вычислительных ресурсов открывает перед нами невероятные возможности, но одновременно создает определенные трудности для разработчиков программного обеспечения. Потребность в максимально эффективном использовании новых замечательных многоядерных процессоров ядер подстегнула интерес к параллелизму, управлению состояниями и языкам программирования, поддерживающим обе эти возможности.

Алекс Миллер и Эндрю Гловер обсуждают параллелизм

Эндрю Гловер берет интервью у Алекса о параллелизме, GPars и будущих перспективах развития многоядерности в двух подробных подкастах. Послушайте часть 1 и часть 2.

Перечисленным требованиям отвечают такие языки JVM, как Groovy, Scala и Clojure. Эти три языка являются сравнительно новыми и способны работать в высокооптимизированной среде JVM, предоставляя при этом доступ к надежным библиотекам параллельных операций языка Java, добавленным в версии Java 1.5. Эти языки позволяют активно использовать параллельное программирование, хотя каждый из них использует особый подход на основе собственной философии.

В этой статье мы на примере GPars — библиотеки параллельных операций на основе Groovy — исследуем модели решения таких проблем параллелизма, как фоновая обработка, параллельная обработка, управление состояниями и координация потоков.

Почему Groovy? Почему GPars?

Groovy представляет собой язык динамического типа, работающий на JVM. Groovy, разработанный на основе на языка Java, свободен от большей части церемониального синтаксиса, присущего коду Java, и добавляет полезные возможности, позаимствованные из других языков программирования. Одна из самых сильных сторон Groovy заключается в том, что он позволяет программистам легко создавать DSL, основанные на Groovy (DSL, или проблемно-ориентированный язык, — это скриптовый язык, предназначенный для решения специфических проблем программирования. Дополнительная информация о DSL приведена в разделе Ресурсы.

Скачайте код и инструменты

Загляните в раздел Ресурсы и скачайте Groovy, GPars и другие инструменты, используемые в этой статье. Также можно скачать примеры исполняемого кода для этой статьи.

GPars, или Groovy Parallel Systems, представляет собой библиотеку параллельных операций языка Groovy, включающую модели параллелизма и координации в виде DSL. Идеи GPars позаимствованы из ряда наиболее популярных моделей параллелизма и координации других языков, в том числе:

  • Исполнители и разделение/объединение — из языка Java
  • Акторы — из языков Erlang и Scala
  • Агенты — из языка Clojure
  • Переменные потока данных — из языка Oz

Groovy и GPars составляют идеальную пару для демонстрации различных подходов к параллелизму. Даже разработчики, пишущие на языке Java и незнакомые с Groovy, смогут легко следить за обсуждением, поскольку синтаксис Groovy основан на языке Java. Примеры в этой статье опираются на Groovy 1.7 и GPars 0.10.


Фоновая и параллельная обработка

Широко распространенная проблема производительности связана с необходимостью ожидать завершения операций ввода/вывода. Операции ввода/вывода могут включать чтение данных с диска, из базы данных или из Web-сервиса, либо даже от пользователя. Когда поток заблокирован и ждет завершения операции ввода/вывода, было бы полезно отделить ожидающий поток от исходного потока исполнения, чтобы последний мог продолжить работу. Поскольку ожидание становится при этом фоновым процессом, мы называем эту методику фоновой обработкой.

Например, представьте себе программу, которая вызывает API Твиттера для поиска свежих сообщений о нескольких языках JVM и затем распечатывает эти сообщения. Groovy позволяет легко написать такую программу с помощью библиотеки twitter4j языка Java. Пример подобной программы приведен в листинге 1:

Листинг 1. Последовательное считывание сообщений Твиттера (langTweets.groovy)
import twitter4j.Twitter
import twitter4j.Query

@Grab(group='net.homeip.yusuke', module='twitter4j', version='2.0.10')
def recentTweets(api, queryStr) {
  query = new Query(queryStr)
  query.rpp = 5		// число возвращаемых сообщений
  query.lang = "en"		// язык
  tweets = api.search(query).tweets
  threadName = Thread.currentThread().name
  tweets.collect {
    "[${threadName}-${queryStr}] @${it.fromUser}: ${it.text}"
  }
}

def api = new Twitter()
['#erlang','#scala','#clojure'].each {
  tweets = recentTweets(api, it)
  tweets.each {
    println "${it}"
  }
}

В листинге 1, я сначала использовал Groovy Grape (см. Ресурсы) для получения зависимости библиотеки twitter4j. Затем я определил метод recentTweets для формирования строки запроса и выполнил этот запрос, получив список сообщений Твиттера, сформатированных в виде строк. Наконец, я просмотрел каждый тег в моем списке тегов, получил сообщения Твиттера и распечатал их. Так как потоки в данном случае не использовались, программа выполняет поиск последовательно, как показано на рисунке 1:

Рисунок 1. Последовательное чтение сообщений Твиттера
A program model for reading tweets serially.

Параллельная обработка

Если уж программа, приведенная в листинге 1, собирается чего-то ожидать, то пусть ждет появления сразу нескольких событий. Если я помещу каждый удаленный запрос в отдельный фоновый поток, программа будет ждать ответа на все запросы параллельно, как показано на рисунке 2:

Рисунок 2. Параллельное считывание сообщений Твиттера
An improved program model for reading tweets in parallel.

GPars Executors DSL облегчают преобразование программы, приведенной в листинге 1, из последовательной в параллельную, как показано в листинге 2:

Параллельное считывание сообщений Твиттера (langTweetsParallel.groovy)
  import twitter4j.Twitter
import twitter4j.Query
import groovyx.gpars.GParsExecutorsPool

@Grab(group='net.homeip.yusuke', module='twitter4j', version='2.0.10')
@Grab(group='org.codehaus.gpars', module='gpars', version='0.10')
def recentTweets(api, queryStr) {
  query = new Query(queryStr)
  query.rpp = 5		// число возвращаемых сообщений
  query.lang = "en"		// язык
  tweets = api.search(query).tweets
  threadName = Thread.currentThread().name
  tweets.collect {
    "[${threadName}-${queryStr}] @${it.fromUser}: ${it.text}"
  }
}

def api = new Twitter()
GParsExecutorsPool.withPool {
  def retrieveTweets = { query ->
    tweets = recentTweets(api, query)
    tweets.each {
      println "${it}"
    }
  }

  ['#erlang','#scala','#clojure'].each {
    retrieveTweets.callAsync(it)
  }
}

Применив DSL Executors, я добавил оператор import для GParsExecutorsPool и оператор Grab для захвата библиотеки GPars с помощью системы зависимостей Groovy Grape. Затем я вставил блок GParsExecutorsPool.withPool, который усовершенствовал код внутри блока, добавив дополнительные возможности. В Groovy замыкания можно вызывать с помощью метода call. GParsExecutorsPool будет исполнять замыкания, вызванные методом call, как задачи нижележащего пула. Кроме того, GParsExecutorsPool дополняет замыкания методом callAsync, который вызывается и немедленно возвращает значение без блокировки. В данном примере я поместил операции поиска и распечатки сообщений в замыкание и затем вызывал их асинхронно для каждого запроса.

Поскольку GPars передает эти задачи в пул рабочих потоков, я могу теперь выполнять все операции поиска параллельно (при условии, что пул обладает достаточным размером). Кроме того, я могу параллельно обрабатывать результаты каждого запроса, выводя их на экран по мере поступления.

Этот пример иллюстрирует два способа, которыми фоновая обработка может повысить производительность и ускорить реакцию: ожидание завершения ввода/вывода выполняются параллельно, при этом процессы, зависящие от такого ожидания, также могут исполняться параллельно.


Исполнители

Вам, вероятно, интересно узнать, что представляет собой исполнитель (Executor) и как выполняется фоновая обработка в GParsExecutorsPool. В сущности, исполнители являются частью библиотеки java.util.concurrent, представленной в Java 5 (см. Ресурсы). Интерфейс java.util.concurrent.Executor содержит один-единственный метод: execute(Runnable). Он существует для того, чтобы отделить передачу задания от способа его исполнения в реализации Executor.

Класс java.util.concurrent.Executors предлагает много полезных методов для создания экземпляров Executor, поддерживаемых множеством разных типов потоков-пулов. По умолчанию GParsExecutorsPool использует пул потоков с потоками-демонами и фиксированным числом потоков (Runtime.getRuntime().availableProcessors() + 1). В тех случаях, когда стандартная конфигурация неприменима, можно легко изменить размер пула потоков, использовать специальный метод ThreadFactory или указать весь существующий пул Executor (withExistingPool).

Чтобы изменить число потоков, я мог бы просто передать число потоков GParsExecutorsPool в метод withPool, как показано в листинге 3:

Листинг 3. Организация пула Executor путем передачи числа потоков
GParsExecutorsPool.withPool(8) {
  // ...
}

Или, если бы я захотел передать собственную фабрику потоков, содержащую специально указанные потоки, я мог бы сделать это так, как показано в листинге 4.

Листинг 4. Организация пула Executor с помощью пользовательской фабрики потоков
def threadCounter = new AtomicLong(0)
def threadFactory = {Runnable runnable ->
  Thread thread = new Thread(runnable)
  id = threadCounter.getAndIncrement()
  thread.setName("thread ${id}")
  return thread
} as ThreadFactory

def api = new Twitter()
GParsExecutorsPool.withPool(2, threadFactory) {
  // ...
}

Методы async и callAsync не блокируют работу и немедленно возвращают объект Future, который представляет будущие результаты асинхронного расчета. Получатель может попросить Future заблокировать работу до готовности результата, выполнить опрос на предмет полноты результата, отменить расчет или проверить, не отменен ли он другим потоком. Подобно интерфейсу Executor, класс Future является частью нижележащего пакета java.util.concurrent.


Параллелизм для CPU

В примере, демонстрирующем фоновую обработку, вы видели преимущества параллельного ожидания нескольких операций ввода/вывода по сравнению с их последовательной обработкой. Применение пула рабочих потоков для параллельного исполнения нескольких операций благоприятно сказывается и на работе ЦП.

На доступную приложению степень параллелизма и, следовательно, на диапазон доступных вам возможностей программирования оказывают влияние два важных аспекта:

  • Уровень гранулярности задач, описывающий протяженность задач по времени и по объему данных
  • Зависимости задач, описывающие число зависимостей, обычно существующих между задачами

Оба аспекта существуют в некотором континууме, так что перед принятием решения полезно определить, зависит ли ваша проблема от этого континуума. Например, уровень гранулярности программной задачи может определяться большими операциями, размером в одну транзакцию, или состоять из множества коротких вычислений, выполняемых в малом фрагменте бόльшего набора данных (например, несколько пикселов целого изображения). В связи с накладными расходами, связанными с переключением контента между потоками или процессами, системы с малым масштабом гранулярности часто оказываются неэффективными и демонстрируют низкую производительность. Один из способов оптимизации эффективности системы заключается в пакетной обработке, учитывающей уровень гранулярности задач.

Задачи с малым числом зависимостей часто описывают как "чрезвычайно параллельные", имея в виду, что их очень легко разбить на несколько параллельных задач. Классические примеры включают обработку изображений, поиск методом перебора, фракталы и моделирование частиц. К этой категории можно отнести любые коммерческие программы, обрабатывающие или преобразующие большое число заказов или файлов.

Борьба за очередь исполнителя

Мы уже исследовали механизмы помещения задач в пул исполнителя с помощью GPars. Однако следует помнить, что исполнители были добавлены в стандартную версию платформы Java 2 (J2SE) примерно в 2005 году. Поэтому они рассчитаны на относительно малое число ядер (от 2 до 8), работающих в крупногранулярном режиме, и могут блокировать транзакции, имеющие малое число межзадачных зависимостей. Исполнители реализованы в виде одной входящей очереди задач, общей для всех рабочих потоков.

Ключевая проблема этой модели заключается в том, что увеличение числа рабочих потоков ужесточает борьбу за доступ к рабочей очереди (это показано на рисунке 3). В конечном итоге с увеличением числа потоков и ядер эта борьба становится узким местом масштабируемости.

Рисунок 3. Борьба за очередь исполнителя
A diagram showing the scalability bottleneck that results from executor queue contention.

Альтернативой очередям исполнителя является система разделений/объединений, которая доступна сейчас в рамках обновления JSR 166y (см. Ресурсы) и будет формально представлена в платформе Java в JDK 7. Разделение/объединение рассчитано на большое число параллельных потоков, выполняющих мелкогранулярные вычислительные задачи.


Разделение/объединение в GPars

Разделение/объединение поддерживает определение зависимостей между задачами и создание новых задач; эти свойства идеально подходят для алгоритмов в стиле «разделяй и властвуй», в которых задача разделяется на подзадачи и затем снова объединяет подзадачи (см. Ресурсы). Разделение/объединение решает проблему борьбы за очередь, предоставляя каждому процессу одну рабочую очередь. Используемая в каждом таком случае очередь фактически является двусторонней очередью, которая позволяет потокам перехватывать операции с конца другой очереди, осуществляя балансировку входящих в пул операций.

Давайте рассмотрим задачу нахождения максимального значения в списке. Наиболее очевидная стратегия заключается в простом переборе всех чисел и установке меток на самые большие значения, обнаруженные во время перебора. Однако эта стратегия последовательна по своей природе и не в полной мере использует возможности дорогостоящих вычислительных ядер.

Теперь давайте посмотрим, что произойдет, если я реализую функцию поиска максимального значения в виде параллельного алгоритма в стиле «разделяй и властвуй». Алгоритм «Разделяй и властвуй» является рекурсивным; каждый его шаг имеет структуру, показанную в листинге 5:

Листинг 5. Алгоритм «разделяй и властвуй»
IF проблема достаточно мала для непосредственного решения
THEN решить ее непосредственно
ELSE {
  Разделить проблему на две или более подпроблемы 
  Решить каждую подпроблему
  Объединить результаты
}

Оператор IF позволяет варьировать степень гранулярности каждой задачи. Стиль этого алгоритма порождает дерево, листья которого определяются задачами, ушедшими на ветвь THEN. Внутренние узлы дерева являются задачами, ушедшими на ветвь ELSE. Каждый внутренний узел должен ждать (зависеть от) завершения двух (или более) своих дочерних задач. Модель разделения/объединения предназначена именно для таких алгоритмов, в которых вы имеете большое число задач, ожидающих в дереве зависимостей. Ожидание задач в структуре разделения/объединения фактически не блокирует потоки.

GPars позволяет нам создавать и выполнять алгоритмы разделения/объединения путем запуска задачи разделения/объединения, как показано в листинге 6:

Листинг 6. Параллельная реализация функции max() за счет разделения/объединения (computeMax.groovy)
import static groovyx.gpars.GParsPool.runForkJoin
import groovyx.gpars.GParsPool
import groovyx.gpars.AbstractForkJoinWorker

@Grab(group='org.codehaus.gpars', module='gpars', version='0.10')
class Config {
	static DATA_COUNT = 2**14
	static GRANULARITY_THRESHHOLD = 128
	static THREADS = 4
}

items = [] as List<Integer>
items.addAll(1..Config.DATA_COUNT)
Collections.shuffle(items)

GParsPool.withPool(Config.THREADS) {
  computedMax = runForkJoin(1, Config.DATA_COUNT, items.asImmutable()) 
    {begin, end, items ->
      int size = end - begin
      if (size <= Config.GRANULARITY_THRESHHOLD) {
        return items[begin..<end].max()
      } else { // divide and conquer
        leftEnd = begin + ((end + 1 - begin) / 2)
        forkOffChild(begin, leftEnd, items)
        forkOffChild(leftEnd + 1, end, items)
        return childrenResults.max()
      }
    }
		
	println "expectedMax = ${Config.DATA_COUNT}"
	println "computedMax = ${computedMax}"
}

Обратите внимание, что разделение/объединение имеет свой собственный специальный пул в классе groovyx.gpars.GParsPool. GParsPool имеет много общих черт с GParsExecutorsPool, но обладает и специальными возможностями, характерными для разделения/объединения. Для непосредственного применения разделения/объединения нужно использовать либо метод runForkJoin() с замыканием задачи, либо класс задачи, содержащий подкласс AbstractForkJoinWorker.


Параллельные подборки

Разделение/объединение обеспечивает превосходный способ определения и выполнения параллельных задач, в особенности применительно к алгоритмам типа «разделяй и властвуй». Впрочем, как вы могли заметить, предыдущий пример содержал достаточно сложную церемонию. Я был вынужден определить замыкание задачи, соответствующую ей гранулярность, выделить подпроблему, объединить результаты и так далее.

В идеале хотелось бы работать на более высоком уровне абстракции, чтобы просто определить структуру данных и затем параллельно выполнять общие операции, не определяя в каждом случае низкоуровневые задачи, осуществляющие детальную обработку.

Пакет обновлений JSR 166y определяет для этой цели интерфейс высокого уровня, который называется ParallelArray. ParallelArray реализует общие операции функционального программирования над структурой массива, исполняемые параллельно с помощью пула разделения/объединения.

Благодаря функциональной природе этого API, во многие такие операции необходимо передавать функцию (метод), которые будут исполняться на каждом объекте в ParallelArray. Одна из возможностей, все еще разрабатывающаяся для JDK 7, — это поддержка лямбда-функций, которая позволяла бы разработчикам определять и передавать блоки кодов. В настоящее время вопрос о включении ParallelArray в JDK 7 ожидает результатов завершения лямбда-проекта (см. Ресурсы).


ParallelArray в GPars

Groovy полностью поддерживает определение блоков кода в виде замыканий и передачу их как объектов первого класса, поэтому совершенно естественно работать в этом функциональном стиле с помощью Groovy и GPars. ParallelArray и GPars поддерживают базовый набор функциональных операторов:

  • map
  • reduce
  • filter
  • size
  • sum
  • min
  • max

Кроме того, GPars расширяет коллекции внутри блока GParsPool, предлагая дополнительные параллельные методы, основанные на примитивах:

  • eachParallel
  • collectParallel
  • findAllParallel
  • everyParallel
  • groupByParallel

Параллельные методы коллекций можно сделать прозрачными, так чтобы стандартные методы коллекций работали параллельно по умолчанию. Это позволяет передать параллельную коллекцию в существующий (не параллельный) код и потенциально использовать данный код без каких-либо изменений. Тем не менее по-прежнему важно учитывать состояние, используемое этими параллельными методами, поскольку внешний код может не гарантировать необходимой синхронизации.

Возвращаясь к примеру из листинга 6, обратите внимание, что метод max()для параллельных коллекций уже существует, поэтому можно обойтись без прямого определения и вызова разделения/объединения, как показано в листинге 7:

Листинг 7. Применение функций GPars ParallelArray (computeMaxPA.groovy)
import static groovyx.gpars.GParsPool.runForkJoin
import groovyx.gpars.GParsPool

@Grab(group='org.codehaus.gpars', module='gpars', version='0.10')
class Config {
	static DATA_COUNT = 2**14
	static THREADS = 4
}

items = [] as List<Integer>
items.addAll(1..Config.DATA_COUNT)
Collections.shuffle(items)

GParsPool.withPool(Config.THREADS) {
	computedMax = items.parallel.max()
	println "expectedMax = ${Config.DATA_COUNT}"
	println "computedMax = ${computedMax}"
}

Функциональное программирование с помощью ParallelArray

Теперь предположим, что я пишу отчет о заказах, поступивших в систему складского учета, подсчитывая число просроченных заказов и среднее количество дней просрочки для этих заказов. Я могу сделать это, определив методы, которые сначала устанавливали бы, просрочен ли заказ (сравнивая срок исполнения с текущей датой), и затем вычисляли бы разницу в днях между текущей датой и сроком исполнения.

Основным компонентом этого кода является та его часть, которая рассчитывает необходимые значения с помощью базовых методов ParallelArray, как показано в листинге 8:

Листинг 8. Применение параллельных функциональных операторов (orders.groovy) в полном объеме
GParsPool.withPool {
  def data = createOrders().parallel.filter(isLate).map(daysOverdue)
  println("# overdue = " + data.size())
  println("avg overdue by = " + (data.sum() / data.size()))
}

В этом примере я беру список заказов, преобразую его в ParallelArray, оставляю только те заказы, для которых isLate возвратил истинное значение, и применяю к каждому заказу функцию отображения, вычисляющую число просроченных дней. Затем я могу применить к полученному массиву встроенные функции агрегирования, чтобы получить число и сумму просроченных дней и вычислить среднее значение. Данный код очень похож на тот, который можно увидеть в языке функционального программирования, но по сравнению с последним обладает тем дополнительным преимуществом, что автоматически исполняется параллельно.


Управление состоянием

Всякий раз при работе с данными, которые будут считываться или записываться несколькими потоками, вы должны решить, как управлять этими данными и координировать изменения. Основная парадигма управления общим состоянием в языке Java и других языках заключается в использовании состояния изменчивости, защищенного блокировками или другими критическими маркерами группы.

Состояния изменчивости и блокировки проблематичны по многим причинам. Блокировки подразумевают зависимость от порядка исполнения кода, что позволяет разработчику рассуждать о пути исполнения и ожидаемых результатах. Тем не менее, поскольку многие аспекты блокировки принудительно не реализуются, часто встречается код плохого качества, содержащий проблемы видимости, безопасной публикации, ситуаций конкуренции, взаимоблокировок и другие типичные ошибки параллельного исполнения.

Более серьезная проблема заключается в том, что даже если взять два компонента, корректно написанные с точки зрения параллельности, то возможно (и даже вероятно), что их объединение породит новые неожиданные ошибки. Поэтому трудно создать системы параллельной обработки, основанные на состоянии изменчивости и блокировках, которые сохраняли бы надежность по мере роста системы.

В следующих разделах я продемонстрирую три парадигмы управления состояниями и совместного доступа к состояниям во всех потоках системы. В сущности, эти парадигмы можно построить (и они строятся) на фундаменте потоков и блокировок, но при этом они создают более высокий уровень абстракции, который существенно снижает сложность.

Три подхода, которые я продемонстрирую — это акторы, агенты и переменные потока данных, и все они поддерживаются в GPars.


Акторы

Впервые парадигма актора была популяризирована в языке Erlang, а недавно она приобрела дополнительную известность благодаря применению в Scala (подробную информацию об этих языках можно найти в разделе Ресурсы). Erlang был создан в 1980-х и 1990-х годах компанией Ericsson для устройств, подобных телекоммуникационному коммутатору AXD301. Конструктивные требования к таким коммутаторам могут быть весьма сложными: чрезвычайно высокая надежность, отсутствие простоев (горячее обновление кода) и высокий уровень параллельности.

Erlang предполагает наличие мира "процессов", которые подобны легковесным потокам (а не работающим системным процессам), но не распределяются непосредственно по собственным потокам. Выполнение процесса планируется виртуальной машиной, на которой он исполняется. Предполагается, что процессы в языке Erlang занимают малый объем памяти, быстро запускаются и быстро переключаются на нужный контекст.

Акторы языка Erlang — это просто функции, которые выполняются в вычислительной системе. Erlang не имеет общей памяти, и все его состояния неизменяемы. Неизменяемые данные являются ключевым аспектом многих языков, фокусирующихся на параллелизме, поскольку они обладают весьма привлекательными свойствами. Неизменяемые данные нельзя изменить, вследствие чего они не требуют блокировки даже при считывании несколькими потоками. Изменение неизменяемых данных заключается в создании новой версии данных и дальнейшей работе с этой новой версией. Для разработчиков, привыкших работать с языками, которым свойственны состояния общего доступа и изменчивости (подобными языку Java), такой сдвиг перспективы может потребовать некоторого изменения стиля работы.

Состояния "обобществляются" между акторами путем передачи неизменяемых сообщений. Каждый актор имеет почтовый ящик, и функция актора многократно исполняется по мере поступления в этот ящик сообщений. Отправка сообщений выполняется обычно асинхронно, хотя можно легко организовать синхронные вызовы, и некоторые реализации акторов обеспечивают такие возможности.

Акторы в GPars

GPars реализует модель акторов, используя многие концепции языков Erlang и Scala. Акторы в GPars представляют собой легковесные процессы, забирающие сообщения из почтового ящика. Можно исключить или сохранить привязку к потоку в зависимости от того, выбираются ли сообщения с помощью метода —receive() или react().

В GPars акторы можно создавать из метода фабрики, который получает в качестве аргумента замыкание, или путем создания подкласса groovyx.gpars.actor.AbstractPooledActor. Внутри актора должен присутствовать метод act(). Обычно метод act() содержит бесконечно повторяемый цикл и затем вызывает либо react (для легковесного актора) либо receive (для тяжеловесного актора, который остается привязанным к потоку).

Listing 9. Реализация актора для игры 'Камень, ножницы, бумага' (rps.groovy)
package puredanger.gparsdemo.rps;

import groovyx.gpars.actor.AbstractPooledActor

enum Move { ROCK, PAPER, SCISSORS }

@Grab(group='org.codehaus.gpars', module='gpars', version='0.10')
class Player extends AbstractPooledActor {
  String name
  def random = new Random()
  
  void act() {
    loop {
      react {
        // игрок отвечает случайным ходом 
        reply Move.values()[random.nextInt(Move.values().length)]
      }
    }
  }
}

class Coordinator extends AbstractPooledActor {
  Player player1
  Player player2
  int games
  
  void act() {
    loop {
      react {
	// начало игры
        player1.send("play")
        player2.send("play")
        
        // определение победителя
        react {msg1 ->
          react {msg2 ->          
            announce(msg1.sender.name, msg1, msg2.sender.name, msg2) 

            // продолжение игры
            if(games-- > 0) 
              send("start")
            else 
              stop()
          }
        }
      }
    }
  }
  
  void announce(p1, m1, p2, m2) {
    String winner = "tie"
    if(firstWins(m1, m2) && ! firstWins(m2, m1)) {
      winner = p1
    } else if(firstWins(m2, m1) && ! firstWins(m1, m2)) {
      winner = p2
    } // в противном случае, связать
    
    if(p1.compareTo(p2) < 0) {
      println toString(p1, m1) + ", " + toString(p2, m2) + ", winner = " + winner      
    } else {
      println toString(p2, m2) + ", " + toString(p1, m1) + ", winner = " + winner 
    }
  }
  
  String toString(player, move) {
    return player + " (" + move + ")"
  }
  
  boolean firstWins(Move m1, Move m2) {
    return (m1 == Move.ROCK && m2 == Move.SCISSORS) || 
        (m1 == Move.PAPER && m2 == Move.ROCK) ||
        (m1 == Move.SCISSORS && m2 == Move.PAPER)
  }
}


final def player1 = new Player(name: "Player 1") 
final def player2 = new Player(name: "Player 2")  
final def coordinator = new Coordinator(player1: player1, player2: player2, games: 10)

[player1,player2,coordinator]*.start()
coordinator << "start"
coordinator.join()
[player1,player2]*.terminate()

Листинг 9 содержит полное представление игры "Камень, ножницы, бумага", реализованной с помощью GPars с применением актора Coordinator (координатор) и двух акторов Player (игрок). Coordinator отправляет сообщение play обоим Player и ждет поступления ответа. После получения двух ответов Coordinator распечатывает исход матча и отправляет себе сообщение о начале новой игры. Акторы Player ждут, когда их попросят сделать ход, после чего каждый из них отвечает некоторым произвольным ходом.

GPars обеспечивает прекрасную реализацию всех ключевых возможностей акторов плюс некоторые дополнительные возможности. Подход, основанный на акторах, возможно, и не является лучшим решением всех проблем параллелизма, но он предоставляет превосходный способ моделирования проблем, который включает передачу сообщений.


Агенты

Агенты используются в языке Clojure (см. Ресурсы) для координации многопоточного доступа к определяемому фрагменту изменяющегося состояния. Подход на основе агентов разрывает ненужную связь между идентификатором (имя некоторого объекта, к которому вы обращаетесь) и текущим значением, на которое указывает этот идентификатор. В большинстве языков эти два аспекта тесно связаны, то есть наличие имени означает, что вы можете изменить и само значение. Эта взаимосвязь показана на рисунке 4:

Рисунок 4. Агент, управляющий состоянием
A diagram showing the relationship of agent to state.

Агенты обеспечивают уровень косвенной адресации между держателем переменной и самим изменчивым состоянием. Чтобы изменить состояние, вы передаете функцию агенту, и он обрабатывает поток этих функций, заменяя состояние выходным значением каждой функции. Поскольку агент организует последовательный доступ к данным, риск конкуренции или повреждения данных отсутствует.

Кроме того, чтение данных выполняется путем рассмотрения текущего снимка, который можно сделать с ограниченным уровнем параллельности, поскольку снимок не изменяется.

Изменения передаются агентам асинхронно. При необходимости поток может заблокировать агента, пока не будут внесены его изменения, или можно указать операцию, которая должна быть выполнена при внесении изменений.

Агенты в GPars

GPars реализует большую часть функциональности агента, присущей языку Clojure. В листинге 10 я моделирую нашествие зомби и управляю состоянием мира с помощью агента. Здесь присутствуют два потока: один из них предполагает, что в каждую единицу времени зомби съедает мозг нескольких человек, превращая их в зомби. Другой поток предполагает, что 5 процентов оставшихся зомби истребляются выжившими людьми из дробовиков. Главный поток наблюдает за миром и сообщает о развитии вторжения.

Листинг 10. Защита мира от нашествия зомби
(zombieApocalypse.groovy)
import groovyx.gpars.agent.Agent
import groovyx.gpars.GParsExecutorsPool

public class World {
	int alive = 1000
	int undead = 10
	
	public void eatBrains() {
		alive = alive - undead
		undead = undead * 2		
		if(alive <= 0) {
			alive = 0
			println "ZOMBIE APOCALYPSE!"
		}
	}
	
	public void shotgun() {
		undead = undead * 0.95
	}	
	
	public boolean apocalypse() {
		alive <= 0
	}
	
	public void report() {
		if(alive > 0) {
			println "alive=" + alive + " undead=" + undead
		}
	}
}

@Grab(group='org.codehaus.gpars', module='gpars', version='0.10')
def final world = new Agent<World>(new World())

final Thread zombies = Thread.start {
	while(! world.val.apocalypse()) {
		world << { it.eatBrains() }
		sleep 200
	}
}

final Thread survivors = Thread.start {
	while(! world.val.apocalypse()) {
		world << { it.shotgun() } 
		sleep 200
	}
}

while(! world.instantVal.apocalypse()) {
	world.val.report()
	sleep 200
}

Агенты являются важной функцией языка Clojure, поэтому очень приятно видеть их появление в GPars. В реализации GPars отсутствуют некоторые возможности (такие как операции изменения и наблюдатели), но их немного и они, возможно, будут добавлены в будущем.


Переменные потока данных

Переменные потока данных обычно ассоциируются с языком программирования Oz (см. Ресурсы), но их реализация встроена и в Clojure, Scala и GPars.

Чаще всего переменные потока данных сравнивают с ячейками электронной таблицы – они указывают вычисление, которое нужно выполнить, и переменные, которые должны предоставить значения для выполнения этого вычисления. Затем соответствующий планировщик организует исполнение потоков, которые могут выполнять обработку, если для них доступны входные данные. Системы потока данных фокусируются, в первую очередь, на способе прохождения данных через систему, перекладывая задачу эффективного использования нескольких ядер на планировщик потоков.

Потоки данных обладают рядом полезных свойств, поскольку определенные классы проблем полностью исключаются (условия конкуренции), а некоторые остаются возможными, но становятся детерминированными (тупиковые ситуации); таким образом, вы можете быть уверены, что если ваш код не привел к возникновению тупиковых ситуаций в ходе тестирования, то они не возникнут и в производстве.

Переменные потока данных можно привязать лишь один раз, поэтому их применение ограничено. Потоки данных действуют как связанные очереди значений, поэтому данные могут поступать через структуру, определяемую кодом, сохраняя много прежних полезных свойств. На практике переменные потока данных обеспечивают превосходный способ обмена значениями между потоками и часто применяются для обмена результатами в ходе тестирования многопоточных блоков. Кроме того, GPars определяет логические задачи потока данных, исполнение которых (подобно акторам) запланировано через пул потоков, и которые взаимодействуют через переменные потока данных.

Потоки данных

Возвращаясь к листингу 2 можно увидеть, что каждый фоновый поток получает и распечатывает результаты извлекаемых сообщений Твиттера по конкретной теме. Листинг 11 представляет собой вариант указанной программы, которая вместо этого создает один поток данных DataFlowStream. Фоновые задачи будут использовать этот DataFlowStream для потоковой передачи результатов обратно в главный поток, который считывает их из потока данных.

Листинг 11. Потоковая передача результатов через DataFlowStream
(langTweetsDataflow.groovy)
import twitter4j.Twitter
import twitter4j.Query
import groovyx.gpars.GParsExecutorsPool
import groovyx.gpars.dataflow.DataFlowStream

@Grab(group='net.homeip.yusuke', module='twitter4j', version='2.0.10')
@Grab(group='org.codehaus.gpars', module='gpars', version='0.10')
def recentTweets(api, queryStr, resultStream) {
  query = new Query(queryStr)
  query.rpp = 5		//число возвращаемых сообщений
  query.lang = "en"		// язык
  tweets = api.search(query).tweets
  threadName = Thread.currentThread().name
  tweets.each {
	resultStream << "[${threadName}-${queryStr}] @${it.fromUser}: ${it.text}"
  }
  resultStream << "DONE"
}
 
def api = new Twitter()
def resultStream = new DataFlowStream()
def tags = ['#erlang','#scala','#clojure']
GParsExecutorsPool.withPool {
  tags.each { 
    { query -> recentTweets(api, query, resultStream) }.callAsync(it)
  }
}

int doneMessages = 0
while(doneMessages < tags.size()) {
	msg = resultStream.val
	if(msg == "DONE") {
		doneMessages++
	} else {
		println "${msg}"
	}
}

Обратите внимание, что из каждого фонового потока слушателю результатов отправляется через поток данных сообщение "DONE" (готово). Такое сообщение, говорящее о том, что поток завершил отправку результатов, в литературе о передаче сообщений иногда называют "отравленным сообщением" (poison message). Оно выступает в роли сигнала между производителем и потребителем.

Другие способы управления состоянием

Две важные модели параллелизма, которые не охвачены настоящей статьей – это взаимодействующие последовательные процессы (CSP) и программная транзакционная память (STM).

CSP опирается на классическую работу Тони Хоара (Tony Hoare), посвященную формализации поведения параллельно исполняемых программ и основанную на базовых концепциях процессов и каналов. Процессы до некоторой степени подобны уже обсуждавшейся модели акторов, а каналы имеют много общего с потоками данных. Процессы CSP отличаются от акторов тем, что имеют более обширный набор входных и выходных каналов, а не один почтовый ящик. GPars содержит API для JCSP-реализации идей CSP.

STM представляет собой область активного исследования в течение нескольких последних лет; языки, реализации этой концепции (подчас разные), содержатся в таких языках, как Haskell, Clojure и Fortress. STM требует от программистов разграничения транзакций в источнике и затем применяет каждую транзакцию к соответствующему состоянию. При обнаружении конфликтов транзакция может выполняться повторно, но это делается автоматически. GPars пока не содержит реализации STM, но, весьма вероятно, она появится в будущем.


Заключение

В настоящее время параллелизм является активной областью исследований, и среди подходов, реализуемых в разных языках и библиотеках, наблюдается высокая степень гибридизации. GPars поддерживает некоторые из наиболее популярных современных моделей параллелизма и делает их доступными на платформе Java через Groovy.

Компьютеры с большим числом ядер никуда не исчезнут, и скорее всего число ядер в одном кристалле будет экспоненциально увеличиваться. Соответственно, параллелизм по-прежнему будет оставаться областью исследований и инновационных решений; при этом, как мы узнали, многие наиболее отработанные варианты будут доступны на JVM.


Загрузка

ОписаниеИмяРазмер
Образец кода для этой статьиj-gpars.zip6 КБ

Ресурсы

Научиться

Получить продукты и технологии

Комментарии

developerWorks: Войти

Обязательные поля отмечены звездочкой (*).


Нужен IBM ID?
Забыли Ваш IBM ID?


Забыли Ваш пароль?
Изменить пароль

Нажимая Отправить, Вы принимаете Условия использования developerWorks.

 


Профиль создается, когда вы первый раз заходите в developerWorks. Информация в вашем профиле (имя, страна / регион, название компании) отображается для всех пользователей и будет сопровождать любой опубликованный вами контент пока вы специально не укажите скрыть название вашей компании. Вы можете обновить ваш IBM аккаунт в любое время.

Вся введенная информация защищена.

Выберите имя, которое будет отображаться на экране



При первом входе в developerWorks для Вас будет создан профиль и Вам нужно будет выбрать Отображаемое имя. Оно будет выводиться рядом с контентом, опубликованным Вами в developerWorks.

Отображаемое имя должно иметь длину от 3 символов до 31 символа. Ваше Имя в системе должно быть уникальным. В качестве имени по соображениям приватности нельзя использовать контактный e-mail.

Обязательные поля отмечены звездочкой (*).

(Отображаемое имя должно иметь длину от 3 символов до 31 символа.)

Нажимая Отправить, Вы принимаете Условия использования developerWorks.

 


Вся введенная информация защищена.


static.content.url=http://www.ibm.com/developerworks/js/artrating/
SITE_ID=40
Zone=Технология Java, Open source
ArticleID=857044
ArticleTitle=Решение общих проблем параллелизма с помощью GPars
publish-date=02012013