目次


Python でのオンデマンド・データ, 第 3 回

コルーチンと asyncio

ソフトウェアの入力/出力処理でのパフォーマンスを劇的に向上させる方法を学ぶ

Comments

コンテンツシリーズ

このコンテンツは全#シリーズのパート#です: Python でのオンデマンド・データ, 第 3 回

このシリーズの続きに乞うご期待。

このコンテンツはシリーズの一部分です:Python でのオンデマンド・データ, 第 3 回

このシリーズの続きに乞うご期待。

このチュートリアル・シリーズの第 1 回では Python イテレーターについて詳しく調べ、第 2 回では itertools について学びました。今回のチュートリアルでは、コルーチンと呼ばれる特殊なタイプのジェネレーター関数について学びます。さらに、強力ながらも扱いに注意を要する標準ライブラリーのモジュールとして挙げられる、asyncio についても説明します。

想像してください。あなたは小さなレストランを訪れました。店内には 3 つのテーブルがありますが、接客係は 1 人しかいません。どのような流れになるのかは、わかるはずです。まず、接客係があなたのところにやってきてメニューを渡します。あなたが注文すると、注文された料理を伝えに奥に下がります。料理人が注文された料理を準備すると同時に、接客係が料理を運んできます。あなたが食事を終えると、接客係が伝票を持ってきます。その後、支払いを受け取りにテーブルに戻ってきます。

他のテーブルでは、客が食事を楽しんでいます。あなたが何を注文しようか考えている間、料理人があなたの料理を作っている間、あるいはあなたが食事をしている間、接客係はあなたに対して取ったのと同じ手順を他のテーブルでも取っているからです。複数のテーブルの客が同時に接客係に対応を求めている場合、接客を受けるまで数分待たなければなりませんが、それほど気にはなりません。

客のいるテーブルが 1 つだけのときに、レストランに入ってから出ていくまでの食事時間の平均が 1 時間だとして、もう 1 つのテーブルも塞がっていると待ち時間が加わって平均時間は 1 時間と 10 分、3 つのテーブルが塞がっている場合の平均時間は 1 時間と 20 分くらいになるとします。まずまずでしょう。

今度は別のシナリオを想像してください。レストランに入ると、接客係はすべての手順を終えるまで、1 つのテーブルに掛かりきりです。これでは食事を始められるまでに、2 時間も待つことになりかねません。接客係はあなたに対応する前に、他のテーブルでの接客を先に済ませなければならないためです。これでは、レストランの評判はがた落ちでしょう。

同期的と非同期的の比較

意外なことに、私たちが作成するコンピューター・コードのほとんどは、この非常に効率の悪いレストランと同じように動作しています。コンピューター用語では、評判の悪いレストランの状況は「シリアル処理」と呼ばれ、接客係の行動は「同期的」と表現されます。各テーブルでのサービスの流れに沿って、接客係が複数のテーブルに同時に対応するという、私たちが慣れているレストランの状況は「並列処理」と呼ばれ、接客係の行動は「非同期的」と表現されます。

この例えを説明するのにかなりの時間を費やしたわけは、この例えは、入力/出力 (I/O) に依存するデータベースやネットワークなどのリソースを使用するスケーラブルなアプリケーションを作成するためには、開発者が正しく理解しておかなければならない最も重要な手法のうちの 1 つを説明するためです。実際のレストランは、非同期プロセスに従っています。そうでなければ、客にとって魅力的でもなければ、競争にも勝てません。理想を言えば、実際のプログラムではできるだけ非同期プロセスを使用するようにすべきですが、そのためには開発者に正しいツール、ライブラリー、スキル、手法が必要です。シリーズ第 3 回となる今回のチュートリアルは入門編として、Python で非同期プロセスを使用する方法をわかりやすく紹介します。

私が強調したいのは、Python の非同期プログラミングのサポート機能はかなり多層化されていて、扱うのに注意が必要であり、同じことを行うのにいくつもの方法があるという点です。Python のサポート機能の中には、比較的最近導入されたことから実験段階での調整要素がまだ残っているものがあります。それでもやはり、このトピックは重要であり、根気よく学ぶ価値は十分にあります。ここでは意図的に、実際的なサポート機能を抜粋して説明します。基本概念を十分に理解すれば、他のサポート機能の手法も自力で調査できるようになるためです。

コルーチン

前のチュートリアルでジェネレーター関数の概要と、ジェネレーター関数が通常の関数とはどのように違うのかを説明しました。呼び出し側が通常の関数を呼び出すと、関数の先頭からプロセスが開始し、その関数のロジックに応じた特定の出口点で終了します。ジェネレーターの場合、呼び出し側は 1 つの関数に何度も出入りできます。それに従って、実行の中断、再開が繰り返されます。

このように何度も出入りできて、中断、再開される関数のことを、コルーチンと呼びます。ジェネレーターは、単純化された形のコルーチンに過ぎません。Python にはいくつかのタイプのコルーチンがありますが、このチュートリアルでは非同期処理をサポートするように設計されたコルーチンに焦点を絞ります。レストランの例えに戻ると、各テーブルでのメニュー/注文/食事/請求/支払いからなるシーケンスは、テーブルごとに個別のコルーチンです。けれども、接客係がそれぞれのテーブルへの対応を中断し、再開することで、テーブルによってプロセスの段階が異なるとしても 3 つすべてのコルーチンを同時に実行できます。十分に脳が訓練された接客係は、これらの並列コルーチンをやりくりするスケジューラーとしての役割を果たします。

同期的なレストランでは、すべてが通常の単一の関数です。客が到着すると関数が開始され、客が出ていくと終了されるといったように、開始、終了するのはそれぞれ一度だけです。通常の関数は一度に 1 つだけ実行されるため、客が食事を始めるまでに 2 時間も待たなければならない場合もあります。

非同期的レストランでは、客が到着した時点でコルーチン関数が開始され、コルーチン・オブジェクトが作成されます。客が出て行った時点でコルーチン関数が終了されると、コルーチン・オブジェクトは必要なくなります。ただしコルーチン関数が開始されて終了されるまでの間、接客係はメニューをテーブルに持って行った後に、その特定のテーブルのコルーチン・オブジェクトを中断して、サービスを必要とするテーブルが他にないかどうかチェックできます。これと同じことが、どのテーブルでも、注文が取られた後、伝票を受け取った後などに行われます。

レストランの接客係を表すコード

Python は疑似コードとほぼ同じくらいに読んで理解しやすいという事実を利用して、以下に接客係を実装した実際のコルーチンのコードを示します。

async def serve_table(table_number):
    await get_menus()
    print('Welcome. Please sit at table', table_number, 'Here are your menus')
    order = await get_order()
    print('Table', table_number, 'what will you be having today?')
    await prepare_order(order)
    print('Table', table_number, 'here is your meal:', order)
    await eat()
    print('Table', table_number, 'here is your check')
    await get_payment()
    print('Thanks for visiting us! (table', table_number, ')')

上記の関数は、単なる def ではなく、async def を使用して定義されています。これにより、この関数が非同期コルーチン関数としてマークされます。ついでに言及すると、関数の本文のどこかで yield ステートメントを渡す非同期コルーチン・ジェネレーター関数もありますが、そうするのは特殊なケースであり、このチュートリアル・シリーズの範囲外です。正直なところ、Python 3 には困惑するほど多様な関数/ジェネレーター/コルーチンがごったがえしています。けれどもこのチュートリアル・シリーズでこれまでそうしてきたように、使用できる方法のいくつかは無視し、入門者にとってわかりやすい方法を紹介します。

serve_table の本文には、一連の await ステートメントが含まれています。これにより、呼び出されたコルーチン関数からコルーチン・オブジェクトが生成されます。また、生成されたオブジェクトを呼び出すと、実行できる状態の他のコルーチンに制御を渡すことにもなります。いわば、レストランの接客係が料理人に料理の支度を開始させると同時に、サービスを必要とするテーブルが他にないかどうかをチェックするといったプロセスを開始することと同じです。

こうしたタスクのジャグリングは、十分に訓練された接客係の脳の中で行われます。Python でこのジャグリングに相当するのは、イベント・ループと呼ばれるものです。イベント・ループについては、この後すぐに取り上げます。

他のコルーチン

serve_table によって呼び出される他のコルーチンの実装を見ていきましょう。

async def get_menus():
    delay_minutes = random.randrange(3) #0 to 3 minutes
    await asyncio.sleep(delay_minutes) #Pretend a second is a minute

async def get_order():
    delay_minutes = random.randrange(10)
    await asyncio.sleep(delay_minutes)
    order = random.choice(['Special of the day', 'Fish & Chips', 'Pasta'])
    return order

async def prepare_order(order):
    delay_minutes = random.randrange(10, 20) #10 to 20 minutes
    await asyncio.sleep(delay_minutes)
    print('   [Order ready from kitchen: ', order, ']')

async def eat():
    delay_minutes = random.randrange(20, 40)
    await asyncio.sleep(delay_minutes)

async def get_payment():
    delay_minutes = random.randrange(10)
    await asyncio.sleep(delay_minutes)

上記の一連の関数ではスリープ・タイマーを使用して、何らかの処理を行うまで待機する時間をシミュレーションしています。random.randrange 関数が指定する整数の範囲の中から、1 つの整数がランダムに選択されます。asyncio.sleep 関数は、指定された秒数の間、アクションを中断させる特殊なコルーチンです。当然、このスリープ期間中は、イベント・ループが実行可能な状態にある他のどのコルーチンでも自由に実行することができます。通常と同じく、この関数を呼び出すには await キーワードを使用します。

よい機会なので言い添えておきますが、await キーワードを使用できるのは、非同期コルーチン関数の本文内に限られます (例えば、async def を使用して定義します)。await を他の場所で使用すると、構文エラーが発生します。

get_order コルーチンは値を返すことに注目してください。この値は、呼び出し側の await ステートメントに返されます。

すべてを連携させる: イベント・ループ

前に、イベント・ループについて触れました。非同期モードを開始してイベント・ループを作成し、連携タスクとして実行されるようにコーディングされたコルーチンをイベント・ループでスケジューリングして管理するには、特殊なセットアップ・コードが必要です。簡潔さを維持する目的で、asyncio コルーチンはタスクとも呼ばれます。コルーチンが await を使用して制御を別のコルーチンに渡すということは、実際には制御をイベント・ループに返すことを意味します。イベント・ループは十分に訓練された接客係の脳のようなものです。

以下に、これまでに定義したレストラン接客係のコルーチンを実行するためのコードを記載します。

#Create coroutines for three tables
gathered_coroutines = asyncio.gather(
    serve_table(1),
    serve_table(2),
    serve_table(3)
)

#asyncio uses event loops to manage its operation
loop = asyncio.get_event_loop()
#This is the entry from synchronous to asynchronous code. It will block
#Until the coroutine passed in has completed
loop.run_until_complete(gathered_coroutines)
#We're done with the event loop
loop.close()

asyncio.gather という特殊なコルーチンは、1 つ以上の他のコルーチンを取り、そのすべてが実行されるようにスケジューリングします。この特殊なコルーチンは、収集されたすべてのコルーチンが完了するまでは完了しません。上記のコードでは、イベント・ループ内で 3 つのテーブルのコルーチンを実行するために asyncio.gather を使用しています。最初に ayncio.get_event_loop によって asyncio.gather を取得し、その次の行で指定のコルーチンを実行します。そのすべてが完了すると、asyncio.gather が完了します。ここでは 3 つのコルーチンの集合が渡されるため、asyncio.gather はこれら 3 つのコルーチンが完了するまで実行し続けることになります。当然、それぞれの serve_table コルーチンは他のコルーチン (get_menusget_order など) を呼び出します。await を使用して呼び出された他のコルーチンは、イベント・ループによってスケジューリングされます。

完全なプログラム

リスト 1. 完全な serve_tables.py プログラム
import random
import asyncio

async def get_menus():
    delay_minutes = random.randrange(3) #0 to 3 minutes
    await asyncio.sleep(delay_minutes) #Pretend a second is a minute

async def get_order():
    delay_minutes = random.randrange(10)
    await asyncio.sleep(delay_minutes)
    order = random.choice(['Special of the day', 'Fish & Chips', 'Pasta'])
    return order

async def prepare_order(order):
    delay_minutes = random.randrange(10, 20) #10 to 20 minutes
    await asyncio.sleep(delay_minutes)
    print('   [Order ready from kitchen: ', order, ']')

async def eat():
    delay_minutes = random.randrange(20, 40)
    await asyncio.sleep(delay_minutes)

async def get_payment():
    delay_minutes = random.randrange(10)
    await asyncio.sleep(delay_minutes)

async def serve_table(table_number):
    await get_menus()
    print('Welcome. Please sit at table', table_number, 'Here are your menus')
    order = await get_order()
    print('Table', table_number, 'what will you be having today?')
    await prepare_order(order)
    print('Table', table_number, 'here is your meal:', order)
    await eat()
    print('Table', table_number, 'here is your check')
    await get_payment()
    print('Thanks for visiting us! (table', table_number, ')')

#Create coroutines for three tables
gathered_coroutines = asyncio.gather(
    serve_table(1),
    serve_table(2),
    serve_table(3)
)

#asyncio uses event loops to manage its operation
loop = asyncio.get_event_loop()
#This is the entry from synchronous to asynchronous code. It will block
#Until the coroutine passed in has completed
loop.run_until_complete(gathered_coroutines)
#We're done with the event loop
loop.close()

以下に、このプログラムの出力例を記載します。

Welcome. Please sit at table 1 Here are your menus
Welcome. Please sit at table 2 Here are your menus
Table 1 what will you be having today?
Welcome. Please sit at table 3 Here are your menus
Table 3 what will you be having today?
Table 2 what will you be having today?
   [Order ready from kitchen:  Pasta ]
Table 1 here is your meal: Pasta
   [Order ready from kitchen:  Fish & Chips ]
Table 3 here is your meal: Fish & Chips
   [Order ready from kitchen:  Special of the day ]
Table 2 here is your meal: Special of the day
Table 3 here is your check
Table 1 here is your check
Thanks for visiting us! (table 3 )
Thanks for visiting us! (table 1 )
Table 2 here is your check
Thanks for visiting us! (table 2 )

ほとんどの行の間に数秒の遅延が発生します。この遅延は、さまざまなコルーチン内で発生するスリープ遅延であり、レストラン内で何らかの行動がとられるまで待機する時間をシミュレーションしています。時間は短縮されていて、プログラムでの 1 秒はレストラン内での 1 分を表します。スリープ遅延はランダムな長さなので、プログラムを実行するたびにメッセージが異なる順序で表示されます。

また、このプログラムは常にテーブル 1、テーブル 2、テーブル 3 の順で開始しないことにも注目してください。asyncio.gather コルーチンは指定されたコルーチンをスケジューリングしますが、特定の順序を付けることはしません。

このプログラムの真価は、連携マルチタスクキングのフローにあります。上記のコードを実行して調整しながら、コルーチンが制御を渡した後、再び制御を取り返す動作を十分に把握できるまで調べてください。たまに、3 つの serve_table コルーチン・オブジェクトすべてが、他のコルーチンの 1 つを同時に呼び出すことがあります。3 つのコルーチン・オブジェクトのすべてがたまたまスリープ遅延で待機中であったためです。その場合には数秒間、出力が表示されません。このようなときは、イベント・ループが根気よく各コルーチンをチェックして、再開できるタイミングを見計らいます。

コルーチンを追加する

リスト 1 のプログラムを実行中に、ある出力から次の出力まで遅延させる方法を説明しました。けれども、それよりも、ある種の進捗インジケーターを表示したほうがユーザー・フレンドリーです。進捗インジケーターを実装するには、連携マルチタスキングの魔法のような力を利用できます。以下に示すコルーチン関数は、進捗インジケーターとして 1 秒間に数回ドットを表示します。

async def progress_indicator(delay, loop):
    while True:
        try:
            await asyncio.sleep(delay)
        except asyncio.CancelledError:
            break
        #Print a dot, with no newline afterward & force the output to appear immediately
        print('.', end='', flush=True)
        #Check if this is the last remaining task, and exit if so
        num_active_tasks = [ task for task in asyncio.Task.all_tasks(loop)
                                  if not task.done() ]
        if len(num_active_tasks) == 1:
            break

この関数は引数として、ドットを出力する最小間隔とイベント・ループ・オブジェクトの 2 つを取ります。イベント・ループ・オブジェクトは、例えばリスト 1 の下のほうで作成されているようなものです。かなりの数の asyncio にループ・オブジェクトを渡すことになりますが、一連の制御対象のコルーチン間で連携が維持されるようにしてください。上記の例では、イベント・ループを asyncio.Task.all_tasks に渡します。すると、このコルーチンから、そのイベント・ループ内でスケジューリングされているすべてのタスク (つまりコルーチン) のリストが返されます。リストには完了済みのタスクも含まれています。完了していないタスクだけを取得するには、task.done を使用してタスクをふるいにかけます。

例えば、遅延の引数として .0.5 を渡して、この関数からコルーチン・オブジェクトを作成します。すると、前回のチュートリアルで説明した無限ジェネレーターでの場合と同じように、すぐに無限ループが開始されます。関数は続いてスリープ遅延を呼び出しますが、外部エンティティーがコルーチンをキャンセルした場合の例外を考慮に入れます (この例外が発生する状況はいくつかあります)。キャンセルされた場合、コルーチンは asyncio.CancelledError 例外で中断され、無限ループが断ち切られることになります。

コルーチンが正常に再開されると、関数はドットを出力してから、他のすべてのコルーチンが正常に実行したかどうかをチェックします。残っている唯一のコルーチンが progress_indicator であれば、無限ループから抜け出します。

リスト 2. コルーチンを使用するように更新された完全な progress_indicator プログラム
import random
import asyncio

async def get_menus():
    delay_minutes = random.randrange(3) #0 to 3 minutes
    await asyncio.sleep(delay_minutes) #Pretend a second is a minute

async def get_order():
    delay_minutes = random.randrange(10)
    await asyncio.sleep(delay_minutes)
    order = random.choice(['Special of the day', 'Fish & Chips', 'Pasta'])
    return order

async def prepare_order(order):
    delay_minutes = random.randrange(10, 20) #10 to 20 minutes
    await asyncio.sleep(delay_minutes)
    print('   [Order ready from kitchen: ', order, ']')

async def eat():
    delay_minutes = random.randrange(20, 40)
    await asyncio.sleep(delay_minutes)

async def get_payment():
    delay_minutes = random.randrange(10)
    await asyncio.sleep(delay_minutes)

async def progress_indicator(delay, loop):
    while True:
        try:
            await asyncio.sleep(delay)
        except asyncio.CancelledError:
            break
        #Print a dot, with no newline afterward & force the output to appear immediately
        print('.', end='', flush=True)
        #Check if this is the last remaining task, and exit if so
        num_active_tasks = [ task for task in asyncio.Task.all_tasks(loop)
                                  if not task.done() ]
        if len(num_active_tasks) == 1:
            break

async def serve_table(table_number):
    await get_menus()
    print('Welcome. Please sit at table', table_number, 'Here are your menus')
    order = await get_order()
    print('Table', table_number, 'what will you be having today?')
    await prepare_order(order)
    print('Table', table_number, 'here is your meal:', order)
    await eat()
    print('Table', table_number, 'here is your check')
    await get_payment()
    print('Thanks for visiting us! (table', table_number, ')')

#asyncio uses event loops to manage its operation
loop = asyncio.get_event_loop()

#Create coroutines for three tables
gathered_coroutines = asyncio.gather(
    serve_table(1),
    serve_table(2),
    serve_table(3),
    progress_indicator(0.5, loop)
)

#This is the entry from synchronous to asynchronous code. It will block
#Until the coroutine passed in has completed
loop.run_until_complete(gathered_coroutines)
#We're done with the event loop
loop.close()

コルーチンの集合を作成する前に、イベント・ループが作成されるようになったことに注目してください。これは、収集されるコルーチンのリストを見るとわかるように、イベント・ループを progress_indicator に渡す必要があるためです。

以下に、出力例を記載します。

.Welcome. Please sit at table 3 Here are your menus
..Welcome. Please sit at table 2 Here are your menus
Welcome. Please sit at table 1 Here are your menus
........Table 3 what will you be having today?
......Table 1 what will you be having today?
....Table 2 what will you be having today?
..........   [Order ready from kitchen:  Fish & Chips ]
Table 3 here is your meal: Fish & Chips
..............   [Order ready from kitchen:  Fish & Chips ]
Table 2 here is your meal: Fish & Chips
......   [Order ready from kitchen:  Pasta ]
Table 1 here is your meal: Pasta
..............................Table 3 here is your check
Thanks for visiting us! (table 3 )
..........Table 2 here is your check
..Thanks for visiting us! (table 2 )
......................Table 1 here is your check
..........Thanks for visiting us! (table 1 )
.

進捗インジケーターのドットは定期的 (約 0.5 秒間隔) に表示されています。

このマルチタスキングの特性

Python でのマルチスレッディングまたはマルチプロセッシングに取り組んだ経験があるとしたら、この asyncio 連携マルチタスキングの手法にはどのような違いがあるのか興味があることでしょう。主な違いは、asyncio の手法では実際に 2 つのコルーチンに同時に処理を行わせようとしているのではないという点にあります。まさに、レストランの接客係がテーブル 1 にメニューを持っていくのと同時にテーブル 3 に食事を運ぶことはできないことと同じです。asyncio イベント・ループの役目は、タスク内での自然な中断時間を利用して、コルーチンに処理するものがあるときにはそれを処理させて、その後アイドルになった時点で他のコルーチンに制御を渡せるようにすることです。

コルーチンは、再び実行中の状態になるタイミングを制御できません。これがなぜ連携マルチタスキングと呼ばれるのかを説明すると、あるコルーチンがイベント・ループに制御を返さずに長時間処理を続けた場合、他のすべてのコルーチンがブロックされて不要な遅延が生じるからです。それでは、マルチタスキングがもたらすメリットはありません。つまり、開発者にとって何よりもまず必要なのは、連携マルチタスキングとして実装するのにふさわしいプログラムであるかどうかを把握することです。ふさわしいものであれば、プログラムをコルーチンに分割し、適切なタイミングでコルーチンの間で制御が受け渡されるように慎重にコーティングする必要があります。これは、思うよりも油断がならない作業になることがあります。コルーチンから通常の関数を呼び出すと、処理が完了するまでに時間がかかりますが、この問題はすぐには明らかにならないためです。

一般的な経験則として、asyncio イベント・ループは、ネットワークに頻繁に接続するプログラム、または大量のデータベース・クエリーなどを実行するプログラムに適しています。リモート・サーバーまたはデータベースがリクエストやクエリーに応答するまで待機する時間が、制御をイベント・ループに返すのに絶好のタイミングとなります。かつて、プログラマーはこのような場合にスレッドを使用して対処する傾向がありましたが、マルチスレッディングよりも asyncio イベント・ループのほうが遥かに明確で柔軟なプログラミング手法です。連携マルチタスキングに伴う複雑さには、asyncio イベント・ループを最大限に利用するためには、ネットワークおよびデータベース API を asyncio コルーチン内でコーディングしなければならないことが挙げられます。けれども最近は幸いにも、asyncio を利用するように実装された多数の Python サード・パーティー・ライブラリーを入手できるようになっています。

それでもなお、asyncio を利用したくても、使用しなければならないライブラリーが asyncio をサポートしていないという問題に突き当たることはあるでしょう。つまり、マルチタスキングを台無しにすることなく、非同期コードから同期コードを呼び出さなければなりません。それには、別個のスレッドまたはプロセス内で同期コードを実行する asyncio executor を使用できます。皆さんが疑問に思っている場合に備え、この方法に言及しましたが、その詳細についてはこのチュートリアル・シリーズの範囲外です。

まとめ

asyncio を習熟するにつれ、この手法に関連する他の風変りな概念、例えば印象的な名前の「Future」などについての知識が増えていくはずです。また、コロケーションからイベント・ループに制御を渡すための他の手段があることもわかるでしょう。例えば async with、または Python 3.6 以降を使用している場合は async for などの手段です。このチュートリアルでは Python 3.5 を最小要件としているので後者については取り上げませんが、async with については他の巧みな手法と併せ、次のチュートリアルで説明します。

関連トピック

Python に関する技術対談、コード・パターン、ブログを参照してください。

cognitiveclass.ai に用意されている Python コースを参照してください。


ダウンロード可能なリソース


コメント

コメントを登録するにはサインインあるいは登録してください。

static.content.url=http://www.ibm.com/developerworks/js/artrating/
SITE_ID=60
Zone=ビジネス・アナリティクス, Open source
ArticleID=1064515
ArticleTitle=Python でのオンデマンド・データ, 第 3 回: コルーチンと asyncio
publish-date=01242019