よくある並行性の問題を GPars で解決する

Groovy の並行処理ライブラリーで、よく使用される並行性モデルを利用するとともに、これらのモデルに Java プラットフォームでアクセスできるようにする仕組みを学ぶ

シングルコア処理からマルチコア処理への移行が、フォーク/ジョイン、アクター、エージェント、Executor などの並行プログラミング・モデルへの関心を一層高めています。これらのモデルはそれぞれ異なるプログラミング言語のなかで生まれたものですが、その多くが、Groovy ベースの並行処理ライブラリーである GPars にカプセル化されています。この記事では Alex Miller を案内人に、関数型プログラミングとオブジェクト指向プラグラミングの両方から引き出した手法を使用して並行性の問題を解決する方法を学びます。しかもこの方法では、一貫して Groovy に馴染みの深い Java™ に適した構文を使用します。

Alex Miller, Architect and senior engineer, Revelytix

Alex MillerAlex Miller は Revelytix でフェデレーテッド・セマンティク Web クエリー製品の構築に従事しています。Revelytix に入社する前は、Terracotta の技術主任、BEA Systems のエンジニア、MetaMatrix の主任アーキテクトを経験しました。Java 技術、並行性、分散処理、ソフトウェア設計などに興味を持つ彼は、@puredanger の名前で Twitter にツイートを投稿し、Pure Danger Tech でもブログを書いています。セントルイスでは、架空の動的言語を研究する Lambda Lounge グループ、および開発者による Strange Loop コンファレンスを発足しました。



2010年 9月 07日

並行処理が主流となっている現在、プロセッサー・コアを 4 個あるいは、8 個、さらには 16 個搭載したチップは一般的になっていて、近い将来には数百個、さらには数千個ものコアを搭載したチップが登場することも考えられます。このような処理能力は途方もない可能性を秘めていますが、ソフトウェア開発者にとって、このことは大きな課題でもあります。それは多数の真新しいコアを最大限に活用しなければならないことであり、その必要性が、並行性と状態管理、そしてこの両方のために構造化されたプログラミング言語への関心を急速に高めています

Alex Miller と Andrew Glover が並行性について語っています

Andrew Glover と Alex が 2 回にわたり行った、並行性、GPars、そしてマルチコアの将来に関する掘り下げた内容の対談がポッドキャストで提供されています。第 1 回第 2 回を聴いてください。

多数のコアを最大限に活用するという必要性を満たすのが、Groovy、Scala、Clojure などの JVM 言語です。これらの言語は比較的新しく、高度に最適化されたJVM で実行されることによるメリットがあると同時に、Java 1.5 で追加された堅牢なJava 並行処理ライブラリーも利用することができます。上記の 3 つの JVM 言語では積極的に並行プログラミングを使用できるようにしていますが、その手法はそれぞれの原理に基づき異なっています。

この記事では、Groovy ベースの並行処理ライブラリーである GPars を使用して、並行性の問題を解決するためのモデルについて検討します。並行性の問題とは、例えばバックグラウンド処理、並列処理、状態管理、そしてスレッドの調整です。

Groovy と GPars を選ぶ理由

Groovy は JVM 上で動作する動的型付け言語です。Java 言語をベースとする Groovy は、Java コードに見られる形式上の構文を大幅に取り除くと共に、他のプログラミング言語の有用な機能を追加しています。Groovy に備わった最強の特徴の 1 つは、プログラマーが Groovy ベースの DSL を簡単に作成できることです (DSL (Domain-Specific Language: ドメイン特化言語) とは、特定のプログラミング問題を解決するように設計されたスクリプト言語です。DSL についての詳細は、「参考文献」を参照してください)。

コードとツールを入手してください

Groovy、GPars、そしてこの記事で使用しているその他のツールをダウンロードするには、「参考文献」セクションを参照してください。記事の実行可能コード・サンプルを随時ダウンロードすることができます。

GPars (Groovy Parallel Systems) は、並行性モデルおよび調整モデルを DSL として取り込んだ Groovy 用並行処理ライブラリーです。GPars は、他の言語でよく使用されている並行性モデルと調整モデルの概念を採り入れています。例えば、以下の概念です。

  • Java 言語の Executor およびフォーク/ジョイン
  • Erlang と Scala のアクター
  • Clojure のエージェント
  • Oz のデータフロー変数

並行性に対するさまざまな手法を説明するには、Groovy と GPars は理想的な組み合わせになります。Groovy の構文は Java 言語をベースとしているため、Groovy に馴染みのない Java 開発者にとっても、ここで説明する内容を簡単に理解することができるはずです。この記事のサンプル・コードは、Groovy 1.7 と GPars 0.10 をベースとしています。


バックグラウンド処理と並列処理

よくあるパフォーマンス上の問題は、I/O 処理が完了するのを待たなければならないことです。I/O には、ディスク、データベース、Web サービスなどのデータ、さらにはユーザーから入力されたデータの読み取り操作が関係してきます。あるスレッドが、I/O 処理が完了するのを待機してブロックされている場合には、本来のスレッド実行処理からその待機しているスレッドを切り離すことで、本来のスレッド実行を継続することができれば、パフォーマンス問題を解決するのに役立ちます。I/O 処理の完了はバックグラウンドで待機されることから、この手法はバックグラウンド処理と呼ばれます。

例えば、Twitter API を呼び出して複数の JVM 言語に関する最新のツイートを検索し、その結果を出力するプログラムがあるとします。Groovy では、このようなプログラムを twitter4j という Java ライブラリーを使用して簡単に作成することができます (リスト 1 を参照)。

リスト 1. ツイートを順次読み取る (langTweets.groovy)
import twitter4j.Twitter
import twitter4j.Query

@Grab(group='net.homeip.yusuke', module='twitter4j', version='2.0.10')
def recentTweets(api, queryStr) {
  query = new Query(queryStr)
  query.rpp = 5		// tweets to return
  query.lang = "en"		// language
  tweets = api.search(query).tweets
  threadName = Thread.currentThread().name
  tweets.collect {
    "[${threadName}-${queryStr}] @${it.fromUser}: ${it.text}"
  }
}

def api = new Twitter()
['#erlang','#scala','#clojure'].each {
  tweets = recentTweets(api, it)
  tweets.each {
    println "${it}"
  }
}

リスト 1 では、まず Groovy Grape (「参考文献」を参照) を使用して twitter4j ライブラリーの依存関係を取得します。次に定義している recentTweets メソッドは、クエリー・ストリングを引数に取り、そのクエリーを実行して、ストリングにフォーマット設定されたツイートのリストを返します。最後にタグのリストに含まれる各タグをウォークスルーしてツイートを取得し、それを出力するという仕組みです。スレッドは使用されないため、このコードは各検索を順次実行します (図 1 を参照)。

図 1. ツイートを順次読み取る
ツイートを順次読み取るプログラミング・モデル

並列処理

リスト 1 のプログラムが待機中に何の操作も行わないのであれば、最終的には複数の操作を待機する結果となってしまいます。一方、リモート・リクエストをバックグラウンド・スレッドに組み込めば、プログラムは各クエリーに対するレスポンスを並列に待機できるようになります。

図 2. ツイートを並列に読み取る
複数のツイートを並列に読み取るように改善されたプログラミング・モデル

GPars の Executors DSL を使用すると、リスト 1 のプログラムを順次処理から並列処理に簡単に変換することができます (リスト 2 を参照)。

リスト 2. ツイートの並列読み取り操作 (langTweetsParallel.groovy)
  import twitter4j.Twitter
import twitter4j.Query
import groovyx.gpars.GParsExecutorsPool

@Grab(group='net.homeip.yusuke', module='twitter4j', version='2.0.10')
@Grab(group='org.codehaus.gpars', module='gpars', version='0.10')
def recentTweets(api, queryStr) {
  query = new Query(queryStr)
  query.rpp = 5		// tweets to return
  query.lang = "en"		// language
  tweets = api.search(query).tweets
  threadName = Thread.currentThread().name
  tweets.collect {
    "[${threadName}-${queryStr}] @${it.fromUser}: ${it.text}"
  }
}

def api = new Twitter()
GParsExecutorsPool.withPool {
  def retrieveTweets = { query ->
    tweets = recentTweets(api, query)
    tweets.each {
      println "${it}"
    }
  }

  ['#erlang','#scala','#clojure'].each {
    retrieveTweets.callAsync(it)
  }
}

上記では Executors DSL を使用して、GParsExecutorsPoolimport 文と、Groovy Grape 依存性システムによって GPars ライブラリーを取得する Grab 文を追加しました。続いて GParsExecutorsPool.withPool ブロックを導入し、このブロック内でコードを拡張して他の機能を追加します。Groovy では、クロージャーを call メソッドで呼び出すことができます。call メソッドで呼び出されたクロージャーは、このプログラムで使用されているプール内のタスクとして、GParsExecutorsPool によって実行されます。さらに GParsExecutorsPool は、クロージャーを callAsync メソッドで拡張します。このメソッドは呼び出されるとすぐに制御を返すメソッドなので、ブロックは発生しません。上記の例では、ツイートの検索および出力アクションをクロージャー内にラップした上で、クエリーごとにクロージャーを非同期に呼び出しています。

GPars はこれらのタスクをワーカーのプールに渡すので、すべての検索を並列に実行できるようになるというわけです (ただし、プールに十分な大きさがあることが前提となります)。さらに、各クエリーからの結果も並列に処理することができるため、返されるクエリーの結果を順次画面に出力することができます。

この例は、バックグラウンド処理がパフォーマンスと応答性を改善する 2 つの仕組みを説明しています。つまり、I/O 処理が完了するのを並列に待機していると、その I/O に依存する処理も並列に行えるようになるということです。


Executor

皆さんの中には、Executor とは何なのか、そして GParsExecutorsPool でのバックグラウンド処理はどのように行われるのか不思議に思っている方もいることでしょう。Executor は、実際には Java 5 で導入された java.util.concurrent ライブラリーの一部です (「参考文献」を参照)。java.util.concurrent.Executor インターフェースには、execute(Runnable) というメソッドしかありません。このメソッドは、タスクの送信を、Executor 実装の中で実際にタスクが実行される方法とは切り離すために存在します。

java.util.concurrent.Executors クラスは、多種多様なスレッド・プールでサポートされる Executor インスタンスを作成する際に役立つ多数のメソッドを提供します。GParsExecutorsPool はデフォルトで、デーモン・スレッドと一定数のスレッド ((Runtime.getRuntime().availableProcessors() + 1) 個のスレッド) が含まれるスレッド・プールを使用するように設定されます。このデフォルト設定が適切でない場合には、簡単にスレッド・プールのサイズを変更することができます。また、カスタムの ThreadFactory を使用するのも、既存の Executor プール (withExistingPool) 全体を指定するのも簡単です。

スレッドの数を変更するには、withPool メソッドに GParsExecutorsPool スレッドの数を渡すだけでよいのです (リスト 3 を参照)。

リスト 3. スレッドの数を指定して Executor プールを使い始める場合
GParsExecutorsPool.withPool(8) {
  // ...
}

あるいは、特別に名前を付けたスレッドを持つカスタム・スレッド・ファクトリーを渡したい場合には、リスト 4 のように指定することもできます。

リスト 4. カスタム・スレッド・ファクトリーを指定して Executor プールを使い始める場合
def threadCounter = new AtomicLong(0)
def threadFactory = {Runnable runnable ->
  Thread thread = new Thread(runnable)
  id = threadCounter.getAndIncrement()
  thread.setName("thread ${id}")
  return thread
} as ThreadFactory

def api = new Twitter()
GParsExecutorsPool.withPool(2, threadFactory) {
  // ...
}

async メソッドと callAsync メソッドはブロックせずに Future オブジェクトを即座に返します。このオブジェクトには、非同期の計算が完了すると、その結果が入れられます。結果を受け取る側は Future オブジェクトに対し、計算結果が準備されるまでブロックするか、ポーリングして計算が完了したかどうかを確認するか、計算をキャンセルするか、それとも別のスレッドが計算をキャンセルしたかどうかを調べるか、を指定することができます。Executor インターフェースと同じく、Future クラスは使用されている java.util.concurrent パッケージの一部です。


CPU に対する並列性

バックグラウンド処理の例では、複数の I/O バウンドのタスクを順次処理するのではなく、プログラムに複数の I/O バウンドのタスクの完了を並列に待機させることの利点を説明しました。ワーカーのプールを使用して複数のタスクを並列に実行する方法は、CPU バウンドのタスクにとっても有益です。

アプリケーションが許容できる並列性の程度、したがって最終的にはプログラミングの選択肢の幅を左右するのは、アプリケーションが持つ以下の 2 つの重要な側面です。

  • タスクの粒度: 時間またはデータという観点でのタスクのスコープです。
  • タスクの依存性: タスクの間に通常存在する依存関係の数です。

この 2 つの側面はいずれもタスクに関するものであるため、ソリューションを考案する前に、タスクに関するどの部分に問題があるのかを考えると有効です。例えば、プログラムのタスクの粒度としては、大きなトランザクション・サイズの作業として定義することも、大きなデータ・セットのごく一部 (例えば画像全体のうちの数ピクセル) に対する簡単な計算が多数あるような構成にすることもできます。作業量に関わらず、スレッドまたはプロセスのコンテキスト・スイッチにはオーバーヘッドが伴うことから、小さなスコープの粒度のシステムは効率性が悪く、低いパフォーマンスに悩まされることがよくあります。システムの効率性を最適化できるポイントを見つけるには、タスクの粒度を基準にバッチ処理することが 1 つの方法となります。

互いの依存関係がほとんどないタスクは、「Embarrassingly Parallel」と表現されることがよくあります。この言葉が意味することは、これらのタスクをとても簡単に、並列する多数のタスクに分離することができるということです。その典型例には、グラフィック処理、総当たり検索、フラクタル計算、粒子シミュレーションなどが挙げられます。大量の注文やファイルを処理または変換するビジネス処理プログラムも、このカテゴリーに含められる場合があります。

Executor キューの競合

上記で説明した GPars でタスクを Executor プールにプッシュする仕組みは、頭に入れておけば参考にはなるものの、Executor が J2SE (Java 2 Platform, Standard Edition) に追加されたのは 2005年に遡ります。そのため、Executor は比較的少数のコア (2 個から 8 個) を比較的低い粒度で実行するように調整されていて、場合によってはタスク間の依存関係をわずかに持つトランザクション・タスクをブロックすることもあります。Executor の実装に使用される入力作業キューは 1 つだけで、このキューがすべてのワーカー・スレッドで共有されるためです。

このモデルの主な問題は、ワーカー・スレッドの数が増えると、作業キューでの競合が多くなることです (図 3 を参照)。スレッドおよびコアの数が増えてくると、最終的には作業キューの競合がスケーラビリティーのボトルネックとなります。

図 3. Executor キューの競合
Executor キューの競合に起因するスケーラビリティーのボトルネックを示す図

Executor キューに代わる方法は、フォーク/ジョインのフレームワークです。現在このフレームワークは JSR 166y メンテナンス・アップデート (「参考文献」を参照) の一部となっていますが、JDK 7 で正式に Java プラットフォームに導入される予定となっています。フォーク/ジョインは、多数の並行スレッドで細粒度の計算タスクを実行できるように調整されています。


GPars でのフォーク/ジョイン

フォーク/ジョインには、タスク間の依存関係を定義し、新しいタスクを生成するためのサポートがあります。このような特性があることから、フォーク/ジョインはタスクをサブタスクに分割し、最後にサブタスクを結合する分割統治スタイルのアルゴリズム (「参考文献」を参照) には理想的なフレームワークです。フォーク/ジョインはスレッドごとに 1 つの作業キューを使用することによって、キュー競合の問題に対処します。各スレッドで使用されるキューは実際には deque です (両端キュー。「デック」と発音します)。deque では、プールに入ってくる作業のバランスを取るために、スレッドが別のキューの末尾から作業を取り出すことができます。

例として、リスト内で最大の値を見つけるというタスクがあるとします。最も明らかなストラテジーは、単純に数値のすべてをウォークスルーしながら最大値を記録するというものです。けれども、これは本質的に順次処理のストラジーであり、せっかくの高価なコアを利用することにはなりません。

一方、最大値の関数を並列分割統治法アルゴリズムとして実装したとしたら、どうなるでしょうか。分割統治法は再帰的アルゴリズムです。したがってステップごとに、リスト 5 に記載する構造があります。

リスト 5. 分割統治法アルゴリズム
IF problem is small enough to solve directly
THEN solve it directly
ELSE {
  Divide the problem in two or more sub-problems 
  Solve each sub-problem
  Combine the results
}

IF 条件では、各タスクの粒度を変えることができます。このようなアルゴリズムのスタイルは、ツリーという形を生成することになります。ツリーのリーフは、THEN 分岐を取るタスクによって定義されます。ツリーの内部ノードは、ELSE 分岐を取るタスクです。各内部ノードは、その 2 つの (またはそれ以上の) 子タスクを待機しなければなりません (つまり、子タスクに依存するということです)。Fork/Join モデルはまさに、依存関係のツリーで多数のタスクを待機させるというこの種のアルゴリズムを対象に設計されています。フォーク/ジョインで待機するタスクが実際にスレッドをブロックすることはありません。

GPars では、フォーク/ジョインのタスクを実行することによって、フォーク/ジョインのアルゴリズムを作成して実行することができるようになっています (リスト 6 を参照)。

リスト 6. max() の並列フォーク/ジョイン実装 (computeMax.groovy)
import static groovyx.gpars.GParsPool.runForkJoin
import groovyx.gpars.GParsPool
import groovyx.gpars.AbstractForkJoinWorker

@Grab(group='org.codehaus.gpars', module='gpars', version='0.10')
class Config {
	static DATA_COUNT = 2**14
	static GRANULARITY_THRESHHOLD = 128
	static THREADS = 4
}

items = [] as List<Integer>
items.addAll(1..Config.DATA_COUNT)
Collections.shuffle(items)

GParsPool.withPool(Config.THREADS) {
  computedMax = runForkJoin(1, Config.DATA_COUNT, items.asImmutable()) 
    {begin, end, items ->
      int size = end - begin
      if (size <= Config.GRANULARITY_THRESHHOLD) {
        return items[begin..<end].max()
      } else { // divide and conquer
        leftEnd = begin + ((end + 1 - begin) / 2)
        forkOffChild(begin, leftEnd, items)
        forkOffChild(leftEnd + 1, end, items)
        return childrenResults.max()
      }
    }
		
	println "expectedMax = ${Config.DATA_COUNT}"
	println "computedMax = ${computedMax}"
}

フォーク/ジョインには、groovyx.gpars.GParsPool クラスに専用のプールがあることに注意してください。GParsPoolGParsExecutorsPool と多くの共通機能を共有しますが、GParsPool にはフォーク/ジョインに固有の特殊な機能があります。フォーク/ジョインを直接使用するには、タスク・クロージャーを用いた runForkJoin() メソッドを使用するか、AbstractForkJoinWorker をサブクラス化するタスク・クラスのいずれかを使用する必要があります。


並列コレクション

フォーク/ジョインは、分割統治法アルゴリズムでの場合は尚更のこと、並列タスク構造を定義し、実行する際の理想的な手段となります。けれども前の例では、かなりの形式上の構文が使用されていたことにお気付きでしょうか。なぜなら、タスク・クロージャーを定義し、適切なタスクの粒度を決定し、問題を分割し、結果を結合するなどの作業が必要だったためです。

これよりも理想的なのは、もっと上位の抽象化レベルで作業することです。そのレベルでデータ構造を定義し、定義したデータ構造に対して共通の操作を並列に実行できれば、あらゆるケースでの詳細を管理する下位レベルのタスクを定義する必要がなくなります。

JSR 166y メンテナンス・アップデートでは、この目的のために ParallelArray という上位レベルのインターフェースを指定しています。ParallelArray は共通の関数型プログラミングの操作を配列構造で提供します。これらの関数は、フォーク/ジョインのプールを使用して並列に実行されます。

この API には関数型の性質があるため、関数 (メソッド) を ParallelArray の各項目に対して実行するには、その関数をこれらの操作の多くに渡さなければなりません。現在 JDK 7 に向けて開発中の機能の 1 つとして、ラムダのサポートがあります。この機能によって、開発者はコード・ブロックを定義し、そのブロックを渡せるようになります。現時点では、JDK 7 での ParallelArray の組み込みの状況は、ラムダ・プロジェクトの結果待ちとなっています (「参考文献」を参照)。


GPars での ParallelArray

Groovy はコードのブロックをクロージャーとして定義し、そのブロックをファーストクラス・オブジェクトとして渡すための完全なサポートを備えています。したがって、ParallelArray の関数型のスタイルでは Groovy と GPars を使用するのがぴったりです。ParallelArray と GPars は、以下の関数演算子のコア・セットをサポートします。

  • map
  • reduce
  • filter
  • size
  • sum
  • min
  • max

さらに、GPars はコレクションを GParsPool ブロック内部で継承し、基本となるメソッドをベースに作成した以下の並列メソッドを追加で使用できるようにしています。

  • eachParallel
  • collectParallel
  • findAllParallel
  • everyParallel
  • groupByParallel

並列コレクション・メソッドはトランスペアレントなメソッドにすることができるため、デフォルトでは標準のコレクション・メソッドが並列に動作します。つまり、並列コレクションを既存の (並列ではない) コード・ベースに渡すことができるため、そのコードをそのまま使用できる可能性があります。ただし、外部コードが必要な同期を保証しない場合もあるので、これらの並列メソッドで使用される状態を考慮することは重要です。

リスト 6 の例をもう一度見てみると、max() は既に並列コレクションに定義されているメソッドであることがわかります。そのため、フォーク/ジョインのタスクを直接定義して呼び出す必要はありません (リスト 7 を参照)。

リスト 7. GPars ParallelArray 関数を使用する (computeMaxPA.groovy)
import static groovyx.gpars.GParsPool.runForkJoin
import groovyx.gpars.GParsPool

@Grab(group='org.codehaus.gpars', module='gpars', version='0.10')
class Config {
	static DATA_COUNT = 2**14
	static THREADS = 4
}

items = [] as List<Integer>
items.addAll(1..Config.DATA_COUNT)
Collections.shuffle(items)

GParsPool.withPool(Config.THREADS) {
	computedMax = items.parallel.max()
	println "expectedMax = ${Config.DATA_COUNT}"
	println "computedMax = ${computedMax}"
}

ParallelArray を使用した関数型プログラミング

ここで、私は今、在庫システムの注文に関するレポートを作成していて、納期を過ぎた注文の数と、これらの注文の平均延滞日数を計算しているとします。このレポートは、最初に注文された品物の納品が延滞していないかどうかを (今日の日付と納期とを比較することで) 判断するメソッドを定義し、それから今日の日付と納期との差の日数を計算するメソッドを定義することで、作成することができます。

このコードの中核は、コアの ParallelArray メソッドを使用して必要な数値を計算する部分です (リスト 8 を参照)。

リスト 8. 並列関数演算値をフル活用する例 (orders.groovy)
GParsPool.withPool {
  def data = createOrders().parallel.filter(isLate).map(daysOverdue)
  println("# overdue = " + data.size())
  println("avg overdue by = " + (data.sum() / data.size()))
}

上記では注文のリストを取り、それを ParallelArray に変換します。次に、isLate が true を返した注文だけを保持し、これらの注文のそれぞれを延滞日数に変換するために map 関数で計算します。こうすれば、あとは配列に対して組み込み集約関数を使用して、配列のサイズ、そして延滞日数の合計を取得し、平均を計算することができます。このコードは関数型プログラミング言語で見られるようなコードとよく似ていますが、ここにはコードが自動的に並列に実行されるという追加の利点があります。


状態の管理

複数のスレッドによって読み取り/書き込みが行われるデータを操作するときには、常に、そのデータをどのように管理し、変更を調整するかを考えなければなりません。Java 言語やその他の言語では、共有状態の管理を規制するためのパラダイムは、ロックやその他のクリティカル・セクション・マーカーで保護された可変状態で構成されます。

可変状態とロックには、さまざまな理由で問題があります。ロックは暗黙的にコード内の依存関係に順序を付け、開発者が実行パスと実行結果を推測できるようにします。けれどもロックの多くの側面は強制されません。そのため、可視性や安全な公開に関する問題や、レース・コンディション、デッドロック、その他並行性に関する一般的なバグなどの問題が含まれる質の悪いコードを目にすることがよくあります。

さらに深刻な問題は、最初に 2 つのコンポーネントを並行性に関して正しく作成したとしても、この 2 つを組み合わせることによって、新しい予期せぬバグが生まれる場合が (しかも、かなり高い確率で) あることです。したがって、システムの規模が大きくなっても信頼性を保ち続けられる並行システムを可変状態とロックをベースに作成するのは容易なことではありません。

以降のセクションでは、システム内の複数のスレッドで状態を管理し、共有するための 3 つのパラダイムを紹介します。根本的に、これらのパラダイムはスレッドとロックを基盤にすることができますが (実際そうなっています)、上位レベルでの抽象化となるため、複雑度はかなり低くなります。

これから紹介する 3 つの手法は、いずれも GPars でサポートされている、アクター、エージェント、そしてデータフロー変数です。


アクター

アクター・パラダイムは当初、Erlang で普及しました。最近では、Scala でのその使い方により、さらに知名度が高まっています (この 2 つの言語についての詳細は「参考文献」を参照してください)。Erlang は 1980年代から 1990年代にかけて、Ericsson で AXD301 電気通信交換機などの機器用に設計された言語です。このような交換機を対象にした設計には極めて高い信頼性が求められ、ダウンタイムはゼロ (ホット・コード・アップグレード)、そして大量の並行処理を行うといった、非常に困難な要求に応えなければならないものでした。

Erlang が前提とするのは、「プロセス」の世界です。ここで言うプロセスは、(オペレーティング・システムのプロセスのようなものではなく) 軽量のスレッドのようなものですが、ネイティブ・スレッドには直接対応付けられません。プロセスの実行は、そのベースで動作している仮想マシンによってスケジューリングされます。Erlang でのプロセスは、メモリーのフットプリントが小さく、開始するまでの時間が短く、コンテキスト・スイッチが高速に行われると考えられています。

Erlang のアクターは、プロセスで実行される関数にすぎません。Erlang には共有メモリーがなく、Erlang の中ではすべての状態が不変です。並行性に重点を置く多くの言語では、不変データがその (不変という) 素晴らしい性質により、重要な側面となります。不変データを変更することはできないため、不変データの読み取り操作にロックは必要ありません。それは、複数のスレッドが読み取りを行っているとしても同じです。不変データを変更するには、そのデータの新しいバージョンを作成し、その新しいバージョンを操作する必要があります。共有される可変状態の言語 (Java 言語など) で主に経験を積んだ開発者にとって、このような考え方に切り替えるには時間がかかるかもしれません。

アクター間で状態を「共有」するには、不変のメッセージを渡します。それぞれのアクターにはメール・ボックスがあり、メール・ボックスにメッセージを受信するために actor 関数が繰り返し実行されます。メッセージは通常、非同期で送信されますが、同期呼び出しを作成するのも簡単です。一部のアクター実装では同期呼び出しを 1 つの機能として提供しています。

GPars でのアクター

GPars は Erlang と Scala の概念の多くを使用してアクター・モデルを実装しています。GPars でのアクターは、メール・ボックスのメッセージを使用する軽量のプロセスです。メッセージが receive() メソッドまたは react() メソッドのどちらで使用されるかによって、スレッド・バインディングを使用しないことも、維持することもできます。

GPars でアクターを作成するには、クロージャーを引数に取るファクトリー・メソッドから作成するという方法、あるいは groovyx.gpars.actor.AbstractPooledActor をサブクラス化するという方法を使用することができます。アクターには、act() メソッドを含める必要があります。一般に、act() メソッドには無限に繰り返されるループが含まれ、ループの中で react (軽量のアクターの場合) または receive (スレッドにバインドされたままの重いアクターの場合) を呼び出します。

リスト 9. 「じゃんけん」の場合のアクター実装 (rps.groovy)
package puredanger.gparsdemo.rps;

import groovyx.gpars.actor.AbstractPooledActor

enum Move { ROCK, PAPER, SCISSORS }

@Grab(group='org.codehaus.gpars', module='gpars', version='0.10')
class Player extends AbstractPooledActor {
  String name
  def random = new Random()
  
  void act() {
    loop {
      react {
        // player replies with a random move 
        reply Move.values()[random.nextInt(Move.values().length)]
      }
    }
  }
}

class Coordinator extends AbstractPooledActor {
  Player player1
  Player player2
  int games
  
  void act() {
    loop {
      react {
	// start the game
        player1.send("play")
        player2.send("play")
        
        // decide the winner
        react {msg1 ->
          react {msg2 ->          
            announce(msg1.sender.name, msg1, msg2.sender.name, msg2) 

            // continue playing
            if(games-- > 0) 
              send("start")
            else 
              stop()
          }
        }
      }
    }
  }
  
  void announce(p1, m1, p2, m2) {
    String winner = "tie"
    if(firstWins(m1, m2) & ! firstWins(m2, m1)) {
      winner = p1
    } else if(firstWins(m2, m1) & ! firstWins(m1, m2)) {
      winner = p2
    } // else tie
    
    if(p1.compareTo(p2) < 0) {
      println toString(p1, m1) + ", " + toString(p2, m2) + ", winner = " + winner      
    } else {
      println toString(p2, m2) + ", " + toString(p1, m1) + ", winner = " + winner 
    }
  }
  
  String toString(player, move) {
    return player + " (" + move + ")"
  }
  
  boolean firstWins(Move m1, Move m2) {
    return (m1 == Move.ROCK & m2 == Move.SCISSORS) || 
        (m1 == Move.PAPER & m2 == Move.ROCK) ||
        (m1 == Move.SCISSORS & m2 == Move.PAPER)
  }
}


final def player1 = new Player(name: "Player 1") 
final def player2 = new Player(name: "Player 2")  
final def coordinator = new Coordinator(player1: player1, player2: player2, games: 10)

[player1,player2,coordinator]*.start()
coordinator < "start"
coordinator.join()
[player1,player2]*.terminate()

リスト 9 に記載されている GPars で実装された「じゃけん」の完全な表現には、Coordinator アクターと 2 つの Player アクターが使用されています。Coordinatorplay メッセージを両方の Playerに送信してから、レスポンスを受信するまで待機します。2 つのレスポンスを受け取ると、Coordinator はじゃんけんの結果を出力し、新しくじゃんけんを開始するためにメッセージを Coordinator 自身に送信します。Player アクターはじゃんけんの手を出すように求められるまで待機し、その要求があった時点でいずれかの手を出して応答します。

GPars は重要なすべてのアクター機能だけでなく、さらに機能を追加した素晴らしい実装を提供しています。アクターの手法は、あらゆる並行性の問題にとって最善のソリューションとなるわけではありませんが、メッセージ・パッシングに伴う問題をモデル化するには非常に有効な方法となります。


エージェント

エージェントは、識別可能な状態変化に対するマルチスレッド化されたアクセスを調整するために、Clojure (「参考文献」を参照) の中で使用されます。エージェントは、ID (参照する対象の名前) とその ID が参照する現在の値との間の不要な結合を切り離します。ほとんどの言語では、この 2 つの側面は表裏一体であり、名前が設定されているということは、その値を変更できることを意味します。この関係を図 4 に示します。

図 4. エージェントによる状態の管理
エージェントと状態の関係を示す図

エージェントは、変数のホルダーと可変状態自体との間の間接層となります。状態を変更するには、エージェントに関数を渡します。すると、エージェントが一連の関数を評価して状態を各関数の出力に置き換えます。エージェントはデータへのアクセスをシリアライズするため、レース・コンディションが発生したり、データが破損したりするリスクはありません。

さらに、データの読み取り操作では現在のスナップショットが調べられます。スナップショットは変更されないため、これによってもたらされる並行処理のオーバーヘッドはほとんどありません。

変更はエージェントに非同期で送信されます。必要な場合には、変更が適用されるまでスレッドをエージェントでブロックさせることも、変更を適用するときに実行するアクションを指定することも可能です。

GPars でのエージェント

GPars では Clojure で使用されているエージェント機能のほとんどを実装しています。リスト 10 では、ゾンビの侵略をモデル化し、エージェント内で世界の状態を管理しています。スレッドは 2 つあり、1 つは時間単位ごとに 1 体のゾンビが 1 人の人間の脳を食べるという前提で、食べられた人間の数をゾンビの数に換算します。もう 1 つのスレッドが前提としているのは、残りのゾンビの 5 パーセントは、生存者がショットガンで撃ち殺すことです。主要なスレッドは、世界を監視し、侵略の進行状態をレポートします。

リスト 10. ゾンビの侵略から世界を守る
(zombieApocalypse.groovy)
import groovyx.gpars.agent.Agent
import groovyx.gpars.GParsExecutorsPool

public class World {
	int alive = 1000
	int undead = 10
	
	public void eatBrains() {
		alive = alive - undead
		undead = undead * 2		
		if(alive <= 0) {
			alive = 0
			println "ZOMBIE APOCALYPSE!"
		}
	}
	
	public void shotgun() {
		undead = undead * 0.95
	}	
	
	public boolean apocalypse() {
		alive <= 0
	}
	
	public void report() {
		if(alive > 0) {
			println "alive=" + alive + " undead=" + undead
		}
	}
}

@Grab(group='org.codehaus.gpars', module='gpars', version='0.10')
def final world = new Agent<World>(new World())

final Thread zombies = Thread.start {
	while(! world.val.apocalypse()) {
		world < { it.eatBrains() }
		sleep 200
	}
}

final Thread survivors = Thread.start {
	while(! world.val.apocalypse()) {
		world < { it.shotgun() } 
		sleep 200
	}
}

while(! world.instantVal.apocalypse()) {
	world.val.report()
	sleep 200
}

Clojure の重要な機能であるエージェントが GPars にも登場したのは嬉しいことです。GPars 実装にはいくつか欠けている機能がありますが (変更アクションおよびウォッチャーなど)、これらは重要な意味があって欠けているわけではないため、いずれは追加されることになるはずです。


データフロー変数

データフロー変数が最も深く関係している言語は、Oz プログラミング言語 (「参考文献」を参照) ですが、それが実装されてきたのは Clojure、Scala、および GPars でした。

データフロー変数と似ていることからよく引き合いに出されるのは、スプレッドシートのセルです。どちらも必要な計算式と、その計算式を実行するための値を提供する変数を指定することを焦点としています。計算式と変数が指定されると、そのベースで動作するスケジューラーが、入力を使用できるようになって処理を進められるスレッドの実行に責任を持ちます。データフロー・システムが焦点とするのはシステム内でのデータのフローだけで、複数のコアを効率的に利用する方法については、スレッド・スケジューラーに任せられます。

データフローが持つ特性のなかには、ある種の問題 (例えば、レース・コンディション) は起こり得ない一方で、別の種類の問題 (例えばデッドロック) は起こり得るけれども確定的であるという、素晴らしい特性があります。つまり、テスト段階でコードがデッドロックを発生させないことを保証できれば、本番環境でデッドロックが発生することはありません。

データフロー変数は一度しかバインドできないため、その使用法は限られています。データフローのストリームはバインドされた値のキューとして機能します。したがって、コードが定義する構造にデータを流し込めば、データフローが持つ優れた特性の多くを維持することができます。実際のところ、データフロー変数はスレッド間で値をやり取りするには最適な方法となり、マルチスレッド化されたユニット・テストで結果をやり取りするためによく使用されています。GPars では論理データフロー・タスクも定義しており、このタスクは (アクターのように) スレッド・プールに対してスケジューリングされ、データフロー変数を介して通信が行われます。

データフローのストリーム

リスト 2 には、各バックグラウンド・スレッドが特定のトピックに関するツイートの検索結果を受け取り、出力するプログラムを記載しました。リスト 11 ではこのプログラムを変更し、代わりに単一の DataFlowStream を作成するようになっています。バックグラウンド・タスクは、作成された DataFlowStream を使用して結果のツイートをストリームとしてメインのスレッドに返します。すると、メインのスレッドはストリームから各ツイートを読み取ります。

リスト 11. DataFlowStream による結果のストリーミング
(langTweetsDataflow.groovy)
import twitter4j.Twitter
import twitter4j.Query
import groovyx.gpars.GParsExecutorsPool
import groovyx.gpars.dataflow.DataFlowStream

@Grab(group='net.homeip.yusuke', module='twitter4j', version='2.0.10')
@Grab(group='org.codehaus.gpars', module='gpars', version='0.10')
def recentTweets(api, queryStr, resultStream) {
  query = new Query(queryStr)
  query.rpp = 5		// tweets to return 
  query.lang = "en"		// language
  tweets = api.search(query).tweets
  threadName = Thread.currentThread().name
  tweets.each {
	resultStream < "[${threadName}-${queryStr}] @${it.fromUser}: ${it.text}"
  }
  resultStream < "DONE"
}
 
def api = new Twitter()
def resultStream = new DataFlowStream()
def tags = ['#erlang','#scala','#clojure']
GParsExecutorsPool.withPool {
  tags.each { 
    { query -> recentTweets(api, query, resultStream) }.callAsync(it)
  }
}

int doneMessages = 0
while(doneMessages < tags.size()) {
	msg = resultStream.val
	if(msg == "DONE") {
		doneMessages++
	} else {
		println "${msg}"
	}
}

各バックグラウンド・スレットからストリームを介して結果リスナーに送信される、「DONE」メッセージに注目してください。スレッドが結果の送信を完了したことを示すこのメッセージは、メッセージ・パッシングの言葉では「ポイズン・メッセージ」と呼ばれる場合もあり、プロデューサーとコンシューマーの間のシグナルとして機能します。

その他の状態管理方法

この記事で説明していない重要な並行性モデルには、CSP (Communicating Sequential Processes) と STM (Software Transactional Memory) の 2 つがあります。

CSP は、並行プログラムの振る舞いを形式化するための Tony Hoare による有名な取り組みと、プロセスとチャネルのコア概念に基づいています。プロセスは、この記事で説明したアクター・モデルとある意味似ている一方、チャネルにはデータフロー・ストリームと多くの共通点があります。CSP のプロセスは、単純なメール・ボックスよりも遥かに多くの入力および出力チャネルがあるという点で、アクターとは異なります。GPars では、CSP の概念の JCSP 実装に対応した API を組み込んでいます。

STM はこの数年の間、活発に研究が進められている分野で、Haskell、Clojure、Fortress などの言語に (多少異なるももの) この概念の実装が組み込まれています。STM では、プログラマーがソースにトランザクション境界を定める必要があります。その上で、システムは各トランザクションをシステムのベースとなる状態に適用します。矛盾が検出された場合にはトランザクションを再試行することができますが、それは自動的に処理されます。GPars ではまだ STM の実装を組み込んでいませんが、将来的には組み込むことになるはずです。


まとめ

並行性は現在話題を集めている分野であり、各種の言語とライブラリーで使用されている手法の間でさまざまなハイブリッド化が行われています。GPars では現在最もよく使用されている並行性モデルのいくつかを採用し、Groovy という手段を使って Java プラットフォームで使用できるようにしています。

多数のコアを搭載したコンピューターは今後も主流であり続けるはずです。事実、チップ上のコアの数は飛躍的に増加する傾向にあります。したがって、並行性はこれからも調査と革新が進められる分野であり続けるでしょう。そして最も持続する並行処理手法の多くは、JVM を拠りどころとするはずです。


ダウンロード

内容ファイル名サイズ
Sample code for this articlej-gpars.zip6KB

参考文献

学ぶために

製品や技術を入手するために

議論するために

  • My developerWorks コミュニティーに加わってください。ここでは他の developerWorks ユーザーとのつながりを持てる他、開発者が主導するブログ、フォーラム、グループ、ウィキを調べることができます。

コメント

developerWorks: サイン・イン

必須フィールドは(*)で示されます。


IBM ID が必要ですか?
IBM IDをお忘れですか?


パスワードをお忘れですか?
パスワードの変更

「送信する」をクリックすることにより、お客様は developerWorks のご使用条件に同意したことになります。 ご使用条件を読む

 


お客様が developerWorks に初めてサインインすると、お客様のプロフィールが作成されます。会社名を非表示とする選択を行わない限り、プロフィール内の情報(名前、国/地域や会社名)は公開され、投稿するコンテンツと一緒に表示されますが、いつでもこれらの情報を更新できます。

送信されたすべての情報は安全です。

ディスプレイ・ネームを選択してください



developerWorks に初めてサインインするとプロフィールが作成されますので、その際にディスプレイ・ネームを選択する必要があります。ディスプレイ・ネームは、お客様が developerWorks に投稿するコンテンツと一緒に表示されます。

ディスプレイ・ネームは、3文字から31文字の範囲で指定し、かつ developerWorks コミュニティーでユニークである必要があります。また、プライバシー上の理由でお客様の電子メール・アドレスは使用しないでください。

必須フィールドは(*)で示されます。

3文字から31文字の範囲で指定し

「送信する」をクリックすることにより、お客様は developerWorks のご使用条件に同意したことになります。 ご使用条件を読む

 


送信されたすべての情報は安全です。


static.content.url=http://www.ibm.com/developerworks/js/artrating/
SITE_ID=60
Zone=Java technology, Open source
ArticleID=550580
ArticleTitle=よくある並行性の問題を GPars で解決する
publish-date=09072010