【Python】非同期プログラミング,イベントループについて学ぶ


本稿について

先に断っておきますが,「非同期プログラミングとは,」なんて大それたものを語ることは,正直今できません.
本稿では非同期プログラミングの要素の1つである協調的マルチタスク,Pythonのasync,イベントループについて学んだことを記述します.

協調的マルチタスクと非同期I/O

協調的マルチタスクは非同期プログラミングの中心となる要素であり,思想みたいなところ.

※ここでいう非同期プログラミングはシステムのスケジューリング機能によってコンテキストスイッチが行われず,プログラム内部でスイッチすること

コンピュータがマルチタスクを行う上で,OSのコンテキストスイッチを利用せず,
各プロセスが待機状態に入ると,自発的にプロセス自身が制御を解放して
同時に実行されている多数の他の処理に制御権を渡します.(協調的という意味に匹敵)
ここでいう制御とはプロセスを中断,再開したり,リソースを確保や解放したりすることを指します.

円滑にマルチタスクを行うためには,すべてのプロセスが協調的である必要があるようです.

アプリケーションレベルの協調的マルチタスクを行う場合

協調的マルチタスクはどこでおこなうのか?
複数のプロセスやスレッドを協調させるのではなく,
すべてのマルチタスクを1つプロセス,スレッドの中で実行します.

誰がタスクの制御を行うのか?
複数のタスクの制御は1つ関数に限定し,その関数がタスクを協調を管理します.

協調的マルチタスクにおける重要な問題

協調的マルチタスクは制御を解放するタイミングが最も重要な問題.
解放するタイミングはスレッドの振る舞いに似ているところがあります.
どこが?
⇒非同期アプリケーションの多くは,I/O命令時にイベントループやスケジューラに制御を譲る.I/O処理待ちの時に制御を解放する.
※イベントループについては後述します.

違いは?
スレッドはシステムレベルのスレッドに起因し,OSは動作しているスレッドにいつでも割り込んで,ほかのスレッドに制御を渡せる.非同期プログラミングでは,タスクはイベントループによって割り込むことができない.(ノンプリエンティブマルチタスク)

(おそらく)非同期プログラミングについて最低限抑えておかなければならないこと

PythonのはOS上で他のプロセスやリソースを奪い合いながら動作しています.つまり,OSがすべてのプロセスを制御しているということ.非同期アプリケーションの場合,スケジューラの割り込みで処理を中断しますが,制御が戻されたとき,中断したその場所から再開します.マルチスレッドやマルチプロセスは必ずしもそうとは限らない.
また,
マルチプロセスやスレッドでは,再開するタスクはOSのスケジューラが決定します.
非同期プログラミングでは,再開するタスクはアプリケーションが決定します.

Pythonにおけるasyncawait

asyncawaitという予約語

asyncdef文の前に使用して,新しいコルーチン(並行タスク)を定義します.コルーチン関数の実行は定義された状況に応じて中断,再開されます.
asyncで定義された関数を呼び出されてもその場では関数の実行はせず,コルーチンオブジェクトというものを返します.
以下,実装例になります.

>>> async def asyc_hello():
...   print("Hello")
...
>>> asyc_hello()
<coroutine object asyc_hello at 0x000001D3D021C748>

asyc_hello()ではprint("Hello")の標準出力の値ではなく,コルーチンオブジェクトを返しているのがわかります.
では,コルーチンオブジェクトとはどのようなものか?どのように扱えばよいのか?この辺をざっくり説明します.
コルーチンオブジェクトを実行するためにはあるものを作らなければいけません.それはイベントループです.
以下,イベントループを作成してコルーチンオブジェクトを実行したコードになります.

>>> import asyncio
>>> async def asyncio_hello():
...     print("こんにちは")
...
>>> loop = asyncio.get_event_loop()
>>> loop.run_until_complete(asyncio_hello())
こんにちは
>>> loop.close()

asyncio.get_event_loop()によってイベントループというものを作成し,
run_until_complete(asyncio_hello())によって,コルーチンオブジェクトを実行します.次にイベントループについて何なのか,基本的なことを説明します.

イベントループについて

本稿では,イベントループに関連する用語の説明はしません.イベントキュー,イベントディスパッチャ,イベントハンドラ,コールバック関数などの説明でなく,イベントループという仕組みは何を行っているのかに着目して自分が学んだことを図で表現しました.
以下,その図になります.イベントというものを便宜上リクエストという表現に変えました.(便宜上というか個人的にそっちの方がイメージがつきやすいから)

コルーチンオブジェクトについて

コルーチンオブジェクトの扱いについて話を戻します.
コルーチンオブジェクトはイベントループの中で実行されるものになります.また,イベントループの中でキューに貯められ,順番が来るまでコルーチンオブジェクトは何もしません.
以下のソースコードは単純なコルーチンが1つだけ用意してイベントループの実行するコードです.

asyncprint.py
import asyncio

async def print_numeber(number):
    print(number)

if __name__ == "__main__":
    loop = asyncio.get_event_loop()

    loop.run_until_complete(
        asyncio.wait([
            print_numeber(number)
            for number in range(10)
        ])
    )
    loop.close()
$ python src/asyncpritn.py
2
4
8
6
9
5
1
3
7
0

上記コードの基本的な流れです.

  1. イベントループを作成する:asyncio.get_event_loop()
  2. タスクを作成or追加する:asyncio.get_event_loop().create_task()orasyncio.wait()
  3. イベントループを実行する:asyncio.get_event_loop().run_until_complete()
  4. イベントループを明示的に閉じる:asyncio.get_event_loop().close()

次に,asyncio.wait()awaitという予約語を付与した場合の例になります.

corowait.py
import time 
import random
import asyncio

async def waiter(name):
    for _ in range(4):
        time_to_sleep = random.randint(1,3)/4
        time.sleep(time_to_sleep)
        print(
            "{}は{}秒待ちました"
            "".format(name, time_to_sleep)
        )

async def main():
    await asyncio.wait([waiter("foo"), waiter("bar")])

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    loop.close()
$time python corowait.py
fooは0.5秒待ちました
fooは0.25秒待ちました
fooは0.75秒待ちました
fooは0.25秒待ちました
barは0.25秒待ちました
barは0.75秒待ちました
barは0.75秒待ちました
barは0.25秒待ちました

real    0m4.416s
user    0m0.130s                                                                                      sys     0m0.013s  

awaitはコルーチンが実行を返すのを待ち,実行が終わるまで制御を解放してイベントループに渡します.
ここでtime.sleep()関数により,処理がブロックされます.なので,同期的処理になり,順番に処理が実行されます.
ブロッキング処理を非ブロッキング処理,非同期処理に変えるために,Pythonではasycio.sleep()関数があります.これを使用することにより,非同期処理の実行が可能になります.以下,サンプルコード.

cowait_improved.py
import time 
import random
import asyncio

async def waiter(name):
    for _ in range(4):
        time_to_sleep = random.randint(1,3)/4
        await asyncio.sleep(time_to_sleep)
        print(
            "{}は{}秒待ちました"
            "".format(name, time_to_sleep)
        )

async def main():
    await asyncio.wait([waiter("foo"), waiter("bar")])

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    loop.close()
$time python corowait_improved.py
fooは0.25秒待ちました
barは0.75秒待ちました
fooは0.75秒待ちました
barは0.75秒待ちました
fooは0.75秒待ちました
barは0.25秒待ちました
fooは0.25秒待ちました
barは0.25秒待ちました

real    0m2.442s
user    0m0.161s                                                                                      sys     0m0.017s  

foo,barの関数が交互に実行され,処理速度が向上しました.コルーチンが協調的に制御を解放したことを意味します.

参考文献

https://www.atmarkit.co.jp/ait/articles/1103/23/news101_2.html
入門Python3
エキスパートPythonプログラミング改訂2版