今まで知らなかった 5 つの事項: java.util.concurrent 第 2 回

並行プログラミングとはスマートな作業であり、困難な作業ではありません

java.util.concurrent には並行処理に有効なコレクションの他にも、マルチスレッド・アプリケーションでのスレッドの制御や実行に役立つ作成済みのコンポーネントが導入されています。この記事では、Ted Neward が java.util.concurrent パッケージの中で Java™ プログラミングに必ず必要な事項をさらに 5 つ紹介します。

Ted Neward, Principal, Neward & Associates

Ted Neward photoTed Neward は、Neward & Associates の代表として、Java や .NET、XML サービスなどのプラットフォームに関するコンサルティング、助言、指導、講演を行っています。彼はワシントン州シアトルの近郊に在住です。



2010年 6月 01日

並行コレクションは、適切に調整されたスレッドセーフなデータ構造を提供できるため、開発者にとって並行プログラミングが容易になります。しかし場合によると、もう一歩進んで、スレッドの実行に関する制御や制限について考えなければならないケースもあります。java.util.concurrent 本来の目的はマルチスレッド・プログラミングを単純化することであると考えると、このパッケージに同期ユーティリティーが含まれていることを望む人がいるかもしれません。そして実際に、そうしたユーティリティーが含まれているのです。

この記事は第 1 回の続きとして、同期のための構成体をいくつか紹介します。これらの構成体は Java 言語の中核的な基本要素 (モニター) よりも上位のレベルにあるものですが、それほど上位にあるわけではなく、コレクション・クラスの中に組み込まれています。こうしたロックやゲートの使い方は、その目的を理解できさえすれば非常に単純です。

この連載について

皆さんは自分が Java プログラミングについて知っていると思うかもしれません。しかし実際には、ほとんどの開発者は Java プラットフォームの表面的な部分しか扱っておらず、当面の作業を完了するために十分なことしか学んでいません。この連載では、Ted Neward が Java プラットフォームのコア機能を深く掘り下げ、非常に厄介な Java プログラミングの難題の解決にも役立つ、ほとんど知られていない事実を紹介します。

1. Semaphore

一部のエンタープライズ・システムでは、特定のリソースに対してオープンされるリクエスト (スレッドやアクション) の数を制限することは珍しくありません。実際、リクエストの数を制限することで特定のリソースで発生する競合の度合いが軽減され、システムのスループットを改善できる場合があります。確かにそうした制限用のコードをハンドコーディングすることも可能ですが、制限を処理してくれる Semaphore クラスを使った方が簡単です (リスト 1)。

リスト 1. Semaphore を使って制限する
import java.util.*;import java.util.concurrent.*;

public class SemApp
{
    public static void main(String[] args)
    {
        Runnable limitedCall = new Runnable() {
            final Random rand = new Random();
            final Semaphore available = new Semaphore(3);
            int count = 0;
            public void run()
            {
                int time = rand.nextInt(15);
                int num = count++;
                
                try
                {
                    available.acquire();
                    
                    System.out.println("Executing " + 
                        "long-running action for " + 
                        time + " seconds... #" + num);
                
                    Thread.sleep(time * 1000);

                    System.out.println("Done with #" + 
                        num + "!");

                    available.release();
                }
                catch (InterruptedException intEx)
                {
                    intEx.printStackTrace();
                }
            }
        };
        
        for (int i=0; i<10; i++)
            new Thread(limitedCall).start();
    }
}

この例では 10 本のスレッドが実行されています (それを確認するためには、SemApp を実行する Java プロセスに対して jstack を実行します)。しかしアクティブなスレッドは 3 本しかありません。他の 7 本は、セマフォー・カウントの 1 つが解放されるまで動作が保留されます。(実は、Semaphore クラスは 1 度に複数の許可を取得、解放することができますが、それはこのシナリオでは意味をなしません。)


2. CountDownLatch

Semaphore が 1 度に 1 つずつスレッドを「中に入れる」ための並行クラスであるなら (人気のナイトクラブの用心棒を思い浮かべてください)、CountDownLatch は競馬の出走ゲートです。CountDownLatch クラスは特定の条件が満たされるまですべてのスレッドを保留し、その条件が満たされると、すべてのスレッドを同時に解放します。

リスト 2. CountDownLatch: レースに行きましょう
import java.util.*;
import java.util.concurrent.*;

class Race
{
    private Random rand = new Random();
    
    private int distance = rand.nextInt(250);
    private CountDownLatch start;
    private CountDownLatch finish;
    
    private List<String> horses = new ArrayList<String>();
    
    public Race(String... names)
    {
        this.horses.addAll(Arrays.asList(names));
    }
    
    public void run()
        throws InterruptedException
    {
        System.out.println("And the horses are stepping up to the gate...");
        final CountDownLatch start = new CountDownLatch(1);
        final CountDownLatch finish = new CountDownLatch(horses.size());
        final List<String> places = 
            Collections.synchronizedList(new ArrayList<String>());
        
        for (final String h : horses)
        {
            new Thread(new Runnable() {
                public void run() {
                    try
                    {
                        System.out.println(h + 
                            " stepping up to the gate...");
                        start.await();
                        
                        int traveled = 0;
                        while (traveled < distance)
                        {
                            // In a 0-2 second period of time....
                            Thread.sleep(rand.nextInt(3) * 1000);
                            
                            // ... a horse travels 0-14 lengths
                            traveled += rand.nextInt(15);
                            System.out.println(h + 
                                " advanced to " + traveled + "!");
                        }
                        finish.countDown();
                        System.out.println(h + 
                            " crossed the finish!");
                        places.add(h);
                    }
                    catch (InterruptedException intEx)
                    {
                        System.out.println("ABORTING RACE!!!");
                        intEx.printStackTrace();
                    }
                }
            }).start();
        }

        System.out.println("And... they're off!");
        start.countDown();        

        finish.await();
        System.out.println("And we have our winners!");
        System.out.println(places.get(0) + " took the gold...");
        System.out.println(places.get(1) + " got the silver...");
        System.out.println("and " + places.get(2) + " took home the bronze.");
    }
}

public class CDLApp
{
    public static void main(String[] args)
        throws InterruptedException, java.io.IOException
    {
        System.out.println("Prepping...");
        
        Race r = new Race(
            "Beverly Takes a Bath",
            "RockerHorse",
            "Phineas",
            "Ferb",
            "Tin Cup",
            "I'm Faster Than a Monkey",
            "Glue Factory Reject"
            );
        
        System.out.println("It's a race of " + r.getDistance() + " lengths");
        
        System.out.println("Press Enter to run the race....");
        System.in.read();
        
        r.run();
    }
}

リスト 2 で、CountDownLatch には 2 つの目的があることに注意してください。第 1 に、CountDownLatch はすべてのスレッドを同時に解放し、レースの開始をシミュレートしています。しかしその後、別のラッチによってレースの終了をシミュレートしており、基本的に main スレッドによって結果を出力できるようにしています。もっと実況解説の豊富なレースにするためには、馬がコースの 1/4、1/2、3/4 の各地点を通過するのに合わせ、レースの「コーナー」や「中間点」に CountDownLatch を追加することもできます。


3. Executor

リスト 1リスト 2 の例には共に、Thread オブジェクトを直接作成する必要がある、という非常に苛立たしい欠陥があります。これはトラブルの元です。一部の JVM では Thread の作成は非常に面倒な作業であり、既存の Thread を再利用した方が新しい Thread を作成するよりもはるかに望ましいからです。しかし他の JVM では、それがまったく逆です。つまり Thread は非常に軽量であり、必要な時に new によって単純に新しい Thread を作成した方がはるかに望ましいです。もちろん、もしマーフィーの法則が適用されるなら (通常は適用されがちです)、どちらの方法を選んだとしても、皆さんが選んだ方法はデプロイ対象のプラットフォームにはたまたま不適切、という羽目になります。

JSR-166 Expert Group (「参考文献」を参照) は、この状況をある程度想定していました。Java 開発者に Thread を直接作成させる代わりに、彼らは新しいスレッドを作成するための抽象インターフェース Executor を導入しました。リスト 3 に示すように、Executor を使用すると、開発者自身が new によって新しい Thread オブジェクトを作成しなくても、スレッドを作成することができます。

リスト 3. Executor
Executor exec = getAnExecutorFromSomeplace();
exec.execute(new Runnable() { ... });

Executor を使う上での最大の欠点は、私達があらゆるファクトリーで直面する欠点と同じです。つまり、どこかでファクトリーを入手しなければなりません。残念ながら CLR の場合とは異なり、どの VM にも対応した標準的なスレッド・プールというものは JVM に同梱されていません。

Executor クラスは Executor を実装するインスタンスを得るための共通の場としては機能しますが、(例えば新しいスレッド・プールを作成するための) new メソッドしか持っておらず、事前に作成されたインスタンスはありません。そのため、コードの中で Executor インスタンスを作成して使いたい場合には、自分でインスタンスを作成するしかありません (あるいは、皆さんが選択したコンテナーやプラットフォームによって提供されるインスタンスを使用できる場合もあるかもしれません)。

ExecutorService がサービスに登場

Thread をどこから得るかを気にする必要がないという点で Executor は便利ですが、Executor インターフェースには Java 開発者が期待する機能のいくつかが欠けています。例えば、ある結果を生成するように設計されたスレッドを起動し、その結果が得られるまで邪魔にならないように待つ、といった機能がありません (これはデスクトップ・アプリケーションで一般的に必要とされる機能です。デスクトップ・アプリケーションでは、データベースへのアクセスが必要な UI 操作をユーザーが実行し、その実行に時間がかかりすぎる場合には完了前にユーザーが操作をキャンセルしようとする場合があります)。

JSR-166 Expert Group の人達は、このために Executor よりもはるかに有用な抽象化を行い、ExecutorService インターフェースを作成しました。ExecutorService インターフェースはスレッドを起動するファクトリーを、集合的に制御可能なサービスとしてモデル化しています。例えば、ExecutorService はタスクごとに execute() を 1 度呼び出す代わりにタスクの集合を引数に取り、それらの各タスクの将来の結果を表す「将来のリスト」を返します。


4. ScheduledExecutorService

ExecutorService は優れたインターフェースですが、一部のタスクはスケジュールに従って実行する必要があります (例えば、指定タスクを決まった間隔または特定の時刻に実行する、など)。それを行うのが、ExecutorService を継承する ScheduledExecutorService です。

例えば 5 秒ごとに ping を実行する「ハートビート」コマンドを作成することが目標の場合には、ScheduledExecutorService によってリスト 4 のように簡単になります。

リスト 4. ScheduledExecutorService によって一定間隔で ping を実行する
import java.util.concurrent.*;

public class Ping
{
    public static void main(String[] args)
    {
        ScheduledExecutorService ses =
            Executors.newScheduledThreadPool(1);
        Runnable pinger = new Runnable() {
            public void run() {
                System.out.println("PING!");
            }
        };
        ses.scheduleAtFixedRate(pinger, 5, 5, TimeUnit.SECONDS);
    }
}

いかがでしょう。スレッドを気にする必要がなく、ユーザーがハートビートをキャンセルしようとした場合の対応を気にする必要もなく、スレッドに対してフォアグラウンドまたはバックグラウンドと明示的に指定する必要はなく、スケジューリング動作の詳細をすべて ScheduledExecutorService に任せればよいのです。

ちなみに、実際にユーザーがハートビートをキャンセルしようとした場合には、scheduleAtFixedRate を呼び出した結果として返されるのは ScheduledFuture インスタンスです。ScheduledFuture インスタンスは結果がある場合には結果をラップしますが、このインスタンスにはスケジュール操作を終了するための cancel メソッドもあります。


5. タイムアウト・メソッド

他の動作をブロックする操作に対して明確なタイムアウトを設定できる機能 (従ってデッドロックを回避できる機能) は、並行処理に関するかつての機能 (モニターやロックなど) に比べて java.util.concurrent ライブラリーで大きく改善された点のひとつです。

これらのタイムアウトを設定できるメソッドは、ほぼ必ず int/TimeUnit でオーバーロードされるので、ブロックが解除されてプログラムに制御が返されるまでに、他のメソッドがどれくらい待機しなければならないかが示されます。しかし、これによって開発者の作業は増えることになり、ロックを取得できない場合にどう対処すればよいのか検討しなければなりません。それでも、その結果はほとんど常に適切なものとなり、デッドロックの少ない、より本番対応のコードを生成することができるのです (本番に使用可能なコードの作成方法については、Michael Nygard による「Release It!」を「参考文献」に挙げましたので、それを参照してください)。


まとめ

java.util.concurrent パッケージには、単なるコレクションの範疇をはるかに超えた便利なユーティリティーが他にも含まれています。特に .locks パッケージと .atomic パッケージには数多くのユーティリティーがあり、これらのパッケージを詳しく調べると、CyclicBarrier などの便利な制御構造体も見つかるはずです。

Java プラットフォームの多くの側面と同様、それほど苦労して探さなくても、基盤となる非常に有用なコードが見つかるはずです。マルチスレッドのコードを作成する際には、この記事や前回の記事で説明したユーティリティーを必ず思い出してください。

次回の記事では新しいトピックに切り替え、JAR に関して今まで知らなかった 5 つの事項について説明します。


ダウンロード

内容ファイル名サイズ
Sample code for this article5things5-src.zip10KB

参考文献

学ぶために

議論するために

コメント

developerWorks: サイン・イン

必須フィールドは(*)で示されます。


IBM ID が必要ですか?
IBM IDをお忘れですか?


パスワードをお忘れですか?
パスワードの変更

「送信する」をクリックすることにより、お客様は developerWorks のご使用条件に同意したことになります。 ご使用条件を読む

 


お客様が developerWorks に初めてサインインすると、お客様のプロフィールが作成されます。会社名を非表示とする選択を行わない限り、プロフィール内の情報(名前、国/地域や会社名)は公開され、投稿するコンテンツと一緒に表示されますが、いつでもこれらの情報を更新できます。

送信されたすべての情報は安全です。

ディスプレイ・ネームを選択してください



developerWorks に初めてサインインするとプロフィールが作成されますので、その際にディスプレイ・ネームを選択する必要があります。ディスプレイ・ネームは、お客様が developerWorks に投稿するコンテンツと一緒に表示されます。

ディスプレイ・ネームは、3文字から31文字の範囲で指定し、かつ developerWorks コミュニティーでユニークである必要があります。また、プライバシー上の理由でお客様の電子メール・アドレスは使用しないでください。

必須フィールドは(*)で示されます。

3文字から31文字の範囲で指定し

「送信する」をクリックすることにより、お客様は developerWorks のご使用条件に同意したことになります。 ご使用条件を読む

 


送信されたすべての情報は安全です。


static.content.url=http://www.ibm.com/developerworks/js/artrating/
SITE_ID=60
Zone=Java technology
ArticleID=498260
ArticleTitle=今まで知らなかった 5 つの事項: java.util.concurrent 第 2 回
publish-date=06012010