asyncioを深く理解する(二)


asyncioを深く理解する(二)
Asyncio.gather vs asyncio.wait
前の文章ではasyncioを何度も使っているのを見た.もう一つの使い方はasyncioですwaitは、複数のコラボレーションを同時に実行することができます.では、なぜ2つの方法を提供するのでしょうか.彼らはどんな違いがありますか.適用シーンはどうですか.実は私もasyncioのソースコードを読むまで少し困惑していました.まず2つの協程の例を見てみましょう.
async def a():
    print('Suspending a')
    await asyncio.sleep(3)
    print('Resuming a')
    return 'A'


async def b():
    print('Suspending b')
    await asyncio.sleep(1)
    print('Resuming b')
    return 'B'

IPythonでgatherで実行します.
In : return_value_a, return_value_b = await asyncio.gather(a(), b())
Suspending a
Suspending b
Resuming b
Resuming a

In : return_value_a, return_value_b
Out: ('A', 'B')

Ok,asyncio.gatherメソッドの名前はその用途を説明しており、gatherの意味は「収集」であり、つまりコヒーレンスの結果を収集することができ、入力コヒーレンスの順序で保存される対応するコヒーレンスの実行結果に注意してください.
次にasyncioと言いますawait、まず実行します.
In : done, pending = await asyncio.wait([a(), b()])
Suspending b
Suspending a
Resuming b
Resuming a

In : done
Out:
{
     <Task finished coro=<a() done, defined at <ipython-input-5-5ee142734d16>:1> result='A'>,
 <Task finished coro=<b() done, defined at <ipython-input-5-5ee142734d16>:8> result='B'>}

In : pending
Out: set()

In : task = list(done)[0]

In : task
Out: <Task finished coro=<b() done, defined at <ipython-input-5-5ee142734d16>:8> result='B'>

In : task.result()
Out: 'B'

asyncio.waitの戻り値は2つあり、1つ目は完了したタスクリスト(done)、2つ目は完了待ちのタスクリスト(pending)であり、各タスクはTaskインスタンスであり、これら2つのタスクはすでに完了しているためtaskを実行することができる.result()は、コヒーレント戻り値を取得します.
Ok、ここまで言って、私はその2つの違いの第1層の違いをまとめました.
  • asyncio.gatherパッケージのTask全行程ブラックボックスは、協程結果だけを教えてくれます.
  • asyncio.waitはパッケージされたTask(完了したタスクと保留中のタスクを含む)を返します.コンカレント実行結果に注目する場合は、対応するTaskインスタンスからresultメソッドで自分で持つ必要があります.

  • なぜ「第1層の違い」と言うのか、asyncio.waitは名前を見ると「待つ」と理解できるので、戻り値の2番目の項目はpendingリストですが、上記の例を見るとpendingは空の集合ですが、どのような場合、pendingの中は空ではありませんか?これが第2層の違いですwaitは、戻るタイミングを選択することをサポートします.
    asyncio.waitは受信パラメータreturnをサポートします.デフォルトではasynciowaitはすべてのタスクの完了(return_when=‘ALL_COMPLETED’)を待っており、FIRSTもサポートされています.COMPLETEDとFIRST_EXCEPTION:
    In : done, pending = await asyncio.wait([a(), b()], return_when=asyncio.tasks.FIRST_COMPLETED)
    Suspending a
    Suspending b
    Resuming b
    
    In : done
    Out: {
         <Task finished coro=<b() done, defined at <ipython-input-5-5ee142734d16>:8> result='B'>}
    
    In : pending
    Out: {
         <Task pending coro=<a() running at <ipython-input-5-5ee142734d16>:3> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x108065e58>()]>>}
    

    見たでしょう、今回は協程bしか完成していませんが、協程aはpending状態です.
    ほとんどの場合asyncioを用いる.gatherは十分で、特別なニーズがあればasyncioを選ぶことができます.wait、2つの例を挙げます.
    成功したコールバックをキャンセルしたり追加したりするためにパッケージされたTaskを入手する必要があります.COMPLETED/FIRST_EXCEPTIONが戻る
    taskを作成する3つの方法
    Taskを作成するには、このセクションのタイトルのような3つの方法があります.前の記事で述べたように、Python 3.7からより高いレベルのasyncioを統一的に使用することができます.create_task.実はcreate_taskはloopです.create_task:
    def create_task(coro):
        loop = events.get_running_loop()
        return loop.create_task(coro)
    

    loop.create_taskが受け入れるパラメータはコヒーレンスである必要があるがasyncio.ensure_futureは、コヒーレンスを受け入れるだけでなく、Futureオブジェクトまたはawaitableオブジェクトであってもよい.パラメータがコヒーレントである場合、実際には下位層がloopを使用する.create_task、Taskオブジェクトを返します.Futureオブジェクトの場合は直接戻ります.awaitableオブジェクトであればawaitというオブジェクトの_await__メソッド、もう一度ensure_を実行future、最後にTaskまたはFutureに戻ります.だからensure_futureの名前は、これがFutureオブジェクトであることを確認します.TaskはFutureサブクラスです.一般的に開発者は自分でFutureを作成する必要はありません.
    実は前に言ったasyncio.これとasyncio.gatherにはasyncioが使われていますensure_future.ほとんどのシーンで同時に実行するのはコヒーレンスなのでasyncioを直接使用します.create_taskで十分~
    shield
    続けてshieldは、キャンセル操作を隠すことができます.ここまでTaskのキャンセルを見たことがありません.例を見てみましょう
    In : loop = asyncio.get_event_loop()
    
    In : task1 = loop.create_task(a())
    
    In : task2 = loop.create_task(b())
    
    In : task1.cancel()
    Out: True
    
    In : await asyncio.gather(task1, task2)
    Suspending a
    Suspending b
    -------------------------------------------------------
    CancelledError                            Traceback (most recent call last)
    cell_name in async-def-wrapper()
    
    CancelledError:
    

    上記の例ではtask 1がキャンセルするからasyncioを用いる.gatherは結果を収集し、CancelledErrorを直接投げた.詳細はgatherがreturnをサポートしていますExceptionsパラメータ:
    In : await asyncio.gather(task1, task2, return_exceptions=True)
    Out: [concurrent.futures._base.CancelledError(), 'B']
    

    task 2はまだ完了していますが、task 1の戻り値はCancelledErrorエラー、つまりタスクがキャンセルされたことがわかります.作成後に何もキャンセルされたくない場合はasyncioを使用します.shield保護タスクは順調に完了します.
    In : task1 = asyncio.shield(a())
    
    In : task2 = loop.create_task(b())
    
    In : ts = asyncio.gather(task1, task2, return_exceptions=True)
    
    In : task1.cancel()
    Out: True
    
    In : await ts
    Suspending a
    Suspending b
    Resuming a
    Resuming b
    Out: [concurrent.futures._base.CancelledError(), 'B']
    

    結果はCancelledErrorエラーであったが,出力を見てコヒーレンスが実際に実行されたことを確認できた.注意:ここでは、asyncioに深く入り込んでいる理解エラーがあります.shieldでは再解釈し理解し、読むことをお勧めします.
    asynccontextmanager
    Pythonを知っている場合は、contextmanager、コンテキストマネージャを聞いたことがあるか、使ったことがあるかもしれません.1つのタイミングの例でその役割を理解します.
    from contextlib import contextmanager
    
    async def a():
        await asyncio.sleep(3)
        return 'A'
    
    async def b():
        await asyncio.sleep(1)
        return 'B'
    
    async def s1():
        return await asyncio.gather(a(), b())
    
    @contextmanager
    def timed(func):
        start = time.perf_counter()
        yield asyncio.run(func())
        print(f'Cost: {time.perf_counter() - start}')
    

    timed関数はcontextmanager装飾器を使って、協程の運行結果yieldを出して、実行が終わった後にまた時間を計算しました:
    In : from contextmanager import *
    
    In : with timed(s1) as rv:
    ...:     print(f'Result: {rv}')
    ...:
    Result: ['A', 'B']
    Cost: 3.0052654459999992
    

    皆さんはまず体得してください.Python 3.7にasynccontextmanager、すなわち非同期バージョンのcontextmanagerが追加され、非同期関数の実行に適しています.上記の例では、次のように変更できます.
    @asynccontextmanager
    async def async_timed(func):
        start = time.perf_counter()
        yield await func()
        print(f'Cost: {time.perf_counter() - start}')
    
    
    async def main():
        async with async_timed(s1) as rv:
            print(f'Result: {rv}')
    
    In : asyncio.run(main())
    Result: ['A', 'B']
    Cost: 3.00414147500004
    

    asyncバージョンのwithはasync withを使用します.また、yield await func()という文に注意してください.yield+await func()に相当します.
    PS:contextmanagerとasynccontextmanagerの最も良い理解方法はソースコードの注釈を見に行くことであり、拡張読書リンク2を見ることができ、また拡張読書リンク3に含まれるPRに関連するテストコード部分も理解に役立つ.