asyncio:非同期I/O、イベントサイクル、および同時ツール(継続的および新規)

21110 ワード

スムーズなPythonの本の中の協程部分のバージョンは低すぎて、しかも話すことが少ないので、今回はPython 3標準ライブラリの本の中の実例に基づいてasyncioの使用を記録することを学びます.
 
asyncioモジュールは、二次空腹を使用して同時アプリケーションを構築するツールを提供します.threadingモジュールはスレッドの同時化を適用し、mutilprocessingはシステムプロセスを用いて同時化を実現し、asyncioは単一スレッドの単一プロセスの方法を用いて同時化を実現し、アプリケーションの各部分は互いに協力し、最適な時点でタスクを明示的に切り替える.
 
asyncioが提供するフレームワークは、I/Oイベント、システムイベント、およびアプリケーションのコンテキスト切り替えを効率的に処理する最初のクラスオブジェクトであるイベントループ(event loop)を中心としています.
 
awaitは、実行コヒーレンスまたはfuture、taskを直接アクティブにすることができます.
しかしfutureとtaskで買うともっと多くの属性が含まれていて、
'add_done_callback', 'all_tasks', 'cancel', 'cancelled',
'current_task', 'done', 'exception', 'get_loop', 'get_stack',
'print_stack', 'remove_done_callback', 'result', 'set_exception', 'set_result'
だからfutureとtaskの機能は更に強化して、ensure_を使うことができますfutureまたはloop.create_taskは協程をtaskに装飾する
task時のfutureのサブセットは、asyncio.Futureから直接futureを作成することもできます.
 
 
現在の本人の理解では、一般的なioアプリケーションでは、マルチスレッドの使用を主とし、自分がコラボレーションを書くとき、複数のコラボレーションの間に、制御権を有効に設定することはできません.
唯一応用できるパッケージはaiohttpで、もし私が別のパッケージで協程を実現したいなら、基本的にはできませんが、協程asyncioパッケージは流暢なPython書で述べたように、ほとんどが概念とAPIを話しています.
コンシステントの譲渡制御権譲渡は、適切なパケットが多いため、一般的にはasyncio.sleepによって単一キャリア譲渡制御権をシミュレートするしかない.
后ろにもっと豊富なバッグがあって协程に协力することができることを望んで、あるいは私がいつ达人になることを待って、协程に协力できるバッグを书くことができます.
 
2、協力を利用してマルチタスクを完成する
import asyncio


async def coroutine():
    print('in coroutine')

#       
event_loop = asyncio.get_event_loop()

try:
    print('starting coroutine')
    coro = coroutine()
    print('entering event loop')
    #     
    event_loop.run_until_complete(coro)
finally:
    print('closing event loop')
    event_loop.close()

 
/usr/local/bin/python3.7 /Users/shijianzhong/study/t_asyncio/asyncio_coroutine.py
starting coroutine
entering event loop
in coroutine
closing event loop

Process finished with exit code 0

 先にget_を通過event_loopはデフォルトのイベントループを作成し、run_until_completeメソッドは内部のコヒーレンスを駆動し,最後にイベントループを閉じる.
 
コンシステントからの戻り値
import asyncio


#     
async def coroutine():
    print('in coroutine')
    return 'result'

#       
event_loop = asyncio.get_event_loop()

try:
    print('starting coroutine')
    coro = coroutine()
    print('entering event loop')
    #     ,   
    return_value = event_loop.run_until_complete(coro)
    print(f'it returned: {return_value!r}')
finally:
    print('closing event loop')
    event_loop.close()

 
/usr/local/bin/python3.7 /Users/shijianzhong/study/t_asyncio/asyncio_coroutine_return.py
starting coroutine
entering event loop
in coroutine
it returned: 'result'
closing event loop

Process finished with exit code 0

 
れんさきょうてい
import asyncio


async def outer():
    print('in outer')
    print('waiting for result1')
    #   phase1()      
    result1 = await phase1()
    print('waiting for result2')
    #   phase2()      
    result2 = await phase2(result1)
    return result1, result2


async def phase1():
    print('in parse1')
    return 'result1'

async def phase2(arg):
    print('in phase2')
    return 'result2 derived from {}'.format(arg)


event_loop = asyncio.get_event_loop()
try:
    return_value = event_loop.run_until_complete(outer())
    print(f'return_value:{return_value!r}')
finally:
    event_loop.close()

 
/usr/local/bin/python3.7 /Users/shijianzhong/study/t_asyncio/asyncio_coroutine_chain.py
in outer
waiting for result1
in parse1
waiting for result2
in phase2
return_value:('result1', 'result2 derived from result1')

Process finished with exit code 0

 
コンシステントではなくジェネレータ
Python 3.5から
@asyncio.coroutineの代わりにasync
awaitがyield fromに代わった
awaitかyield fromの方が直感的だと思います.
 
3、一般関数呼び出しのスケジュール
これを使用するには,イベントループをコヒーレントに伝達し,コヒーレントでイベントループの方法で呼び出す必要がある関数をアクティブにする必要がある.
イベントループに転送されるコンシステントにasyncio.seleepを設定する必要があります.そうしないと、コンシステントは制御権を譲らず、イベントループは呼び出し関数をアクティブにすることはできません.
import asyncio
from functools import partial


def callback(arg, *, kwarg='default'):
    print(f'callback invoked with {arg} and {kwarg}')


async def main(loop):
    print('registering callbacks')
    #     ,        ,       partial
    loop.call_soon(callback, 1)
    #   partial       
    wrapped = partial(callback, kwarg='not default')
    loop.call_soon(wrapped, 2)

    await asyncio.sleep(.1)


event_loop = asyncio.get_event_loop()
try:
    print('entering event loop')
    event_loop.run_until_complete(main(event_loop))
finally:
    event_loop.close()

 
/usr/local/bin/python3.7 /Users/shijianzhong/study/t_asyncio/asyncio_call_soon.py
entering event loop
registering callbacks
callback invoked with 1 and default
callback invoked with 2 and not default

Process finished with exit code 0

 
Delayでコールバックをスケジュールする
call_soonは直接呼び出し、call_later()最初のパラメータは、秒単位で遅延したイベントに転送され、2番目のパラメータはfunctionです.
イベントループに転送されるコンシステントにasyncio.seleepを設定し、最も遅い呼び出し関数時間より大きい必要があります.そうしないと、コンシステントは制御権を譲らず、イベントループは呼び出し関数をアクティブにすることはできません.
 
import asyncio
from functools import partial


def callback(arg):
    print(f'callback invoked {arg}')


async def main(loop):
    print('registering callbacks')
    #     ,         
    loop.call_later(0.2, callback, 1)
    loop.call_later(0.1, callback, 2)
    #      
    await asyncio.sleep(.21)


event_loop = asyncio.get_event_loop()
try:
    print('entering event loop')
    event_loop.run_until_complete(main(event_loop))
finally:
    print('closing event loop')
    event_loop.close()

 
 
/usr/local/bin/python3.7 /Users/shijianzhong/study/t_asyncio/asyncio_call_later.py
entering event loop
registering callbacks
callback invoked 2
callback invoked 1
closing event loop

Process finished with exit code 0

 
指定したイベント内でコールバックをスケジュールする
この目的を達成するサイクル依存性は,いわゆる単調クロックであり,loop.time()で生成し,アクティブ化用call_at
import asyncio
import time


def callback(n, loop):
    print(f'callback {n} invoked at {loop.time()}')


async def main(loop):
    #       , now  0   
    now = loop.time()
    print(f'clock time: {time.time()}')
    print(f'loop  time {now}')
    print('registering callbacks')
    #  0.2 
    loop.call_at(now + .2, callback, 1, loop)
    #  0.1  
    loop.call_at(now + .1, callback, 2, loop)
    #     
    loop.call_soon(callback, 3, loop)

    await asyncio.sleep(1)


event_loop = asyncio.get_event_loop()
try:
    print('entering event loop')
    event_loop.run_until_complete(main(event_loop))
finally:
    print('closing event loop')
    event_loop.close()

 
/usr/local/bin/python3.7 /Users/shijianzhong/study/t_asyncio/asynvio_call_at.py
entering event loop
clock time: 1579366496.336677
loop  time 0.086557153
registering callbacks
callback 3 invoked at 0.086701756
callback 2 invoked at 0.189241196
callback 1 invoked at 0.29037571
closing event loop

Process finished with exit code 0

 
4、非同期的に結果を生成する.
Futureは、まだ完了していない作業結果を示します.イベントループは、Futureオブジェクトのステータスを監視することによって完了したことを示すことができ、アプリケーションの一部が他の部分の完了を待つことができます.
 
私の理解と操作から見ると、futureが関数の値を取得するにはevent_loop.calll_soonは関数を呼び出して、関数の中でfutureに入って、future.set_を通じてresultメソッドはパラメータを保存します.
import asyncio


def mark_done(future, result):
    print('setting future result to {!r}'.format(result))
    future.set_result(result)

event_loop = asyncio.get_event_loop()
try:
    #   future  
    all_done = asyncio.Future()
    print('scheduling mark_done')
    #     ,     future
    event_loop.call_soon(mark_done, all_done, 'the result')

    print('entering event_loop')
    #   future  ,    。
    result = event_loop.run_until_complete(all_done)
    print('returned result: {!r}'.format(result))
finally:
    print('closing event_loop')
    event_loop.close()
print('future result: {!r}'.format(all_done.result()))

 
/usr/local/bin/python3.7 /Users/shijianzhong/study/t_asyncio/asyncio_future_event_loop.py
scheduling mark_done
entering event_loop
setting future result to 'the result'
returned result: 'the result'
closing event_loop
future result: 'the result'

Process finished with exit code 0

 一般的にfutureの値はawaitで取得できます
実際に感じて、この書き方はもっと理解させられます.
import asyncio


def mark_down(future, result):
    print('setting future result to {!r}'.format(result))
    future.set_result(result)

async def main(loop):
    #         ,    ,      future,     。
    all_done = asyncio.Future()
    print('scheduling mark done')
    loop.call_soon(mark_down, all_done, 'the result')
    #   await  future  
    result = await all_done
    print('returned result: {!r}'.format(result))
    return result


event_loop = asyncio.get_event_loop()
try:
    #     ,         。
    result = event_loop.run_until_complete(main(event_loop))
    print(f'last result is {result!r}')
finally:
    event_loop.close()

 
/usr/local/bin/python3.7 /Users/shijianzhong/study/t_asyncio/asyncio_future_await.py
scheduling mark done
setting future result to 'the result'
returned result: 'the result'
last result is 'the result'

Process finished with exit code 0

 
Futureコールバック
これはfutureオブジェクトを介して関数にパラメータを渡すようなもので、ジェネレータのsendのようなものです.
多くの場合、awaitをyiled fromと見るともっと理解できます.
import asyncio
import functools


def callback(future, n):
    print('{}: future done: {}'.format(n, future.result()))


async def register_callbacks(all_done):
    print('register callbacks on future')
    all_done.add_done_callback(functools.partial(callback, n=1))
    all_done.add_done_callback(functools.partial(callback, n=2))

async def main(all_done):
    #       
    await register_callbacks(all_done)
    print('setting result of future')
    #     
    all_done.set_result('the result')

event_loop = asyncio.get_event_loop()
try:
    all_done = asyncio.Future()
    #     
    event_loop.run_until_complete(main(all_done))
finally:
    event_loop.close()

 
/usr/local/bin/python3.7 /Users/shijianzhong/study/t_asyncio/asyncio_future_callback.py
register callbacks on future
setting result of future
1: future done: the result
2: future done: the result

Process finished with exit code 0

 
5.同時実行タスク
タスクの起動はawaitで直接起動するか、複数のタスクの間にタスクawait asyncio.sleepがあれば、他のタスクはすぐに制御権を得ることができます.
タスクは、イベントループとインタラクティブになる主な方法の1つです.タスクは、コヒーレントにパッケージ化され、コヒーレントがいつ完了するかを追跡できます.タスクはFutureのサブクラスであるため、他のスレッドはタスクを待つことができ、各タスクには結果があり、完了した後にこの結果を得ることができます.
 
タスクの開始
タスクを開始するには、タイムサイクル.create_を使用します.task()Taskインスタンスを作成します.
import asyncio


async def task_func():
    print('in task_func')
    return 'the result'

async def main(loop):
    print('creating task')
    #       
    task = loop.create_task(task_func())
    # task = [loop.create_task(task_func()) for i in range(1000)]
    print('waiting for {!r}'.format(task))
    #       ,      ,   await     
    return_value = await task
    print('task_completed {!r}'.format(task))
    print('return value: {!r}'.format(return_value))


event_loop = asyncio.get_event_loop()
try:
    event_loop.run_until_complete(main(event_loop))
finally:
    event_loop.close()

 
/usr/local/bin/python3.7 /Users/shijianzhong/study/t_asyncio/asyncio_create_task.py
creating task
waiting for >
in task_func
task_completed  result='the result'>
return value: 'the result'

Process finished with exit code 0

 上からtask運転と未運転の状態が異なることがわかります.
 
タスクをキャンセル
import asyncio


async def task_func():
    print('in task_func')
    return 'the result'

async def main(loop):
    print('creating task')
    task = loop.create_task(task_func())
    #     
    print('canceling task')
    task.cancel()

    print('canceled task {!r}'.format(task))
    try:
        await task
    #     
    except asyncio.CancelledError:
        print('caught error from canceled task')
    else:
        print('task result: {!r}'.format(task.result()))

event_loop = asyncio.get_event_loop()
try:
    event_loop.run_until_complete(main(event_loop))
finally:
    event_loop.close()

 
/usr/local/bin/python3.7 /Users/shijianzhong/study/t_asyncio/asyncio_cancel_task.py
creating task
canceling task
canceled task >
caught error from canceled task

Process finished with exit code 0

 
同時に実行中のタスクをキャンセルします.
import asyncio


async def task_func():
    print('in task_func, sleeping')
    try:
        #      ,  loop.call_soon
        await asyncio.sleep(1)
    except asyncio.CancelledError:
        print('task_func was canceled')
        raise
    return 'the result'


def task_canceller(t):
    print('in task_canceller')
    t.cancel()
    # print(t)
    print('canceled the task')


async def main(loop):
    print('create task')
    task = loop.create_task(task_func())
    #              ,       。
    loop.call_soon(task_canceller, task)
    print(f'task status is: {task} ')
    try:
        #    task,     , call_soon
        res = await task
        print(res,'is here')
    except asyncio.CancelledError:
        print('main() also sees task as canceled')


event_loop = asyncio.get_event_loop()
try:
    event_loop.run_until_complete(main(event_loop))
finally:
    event_loop.close()

 
/usr/local/bin/python3.7 /Users/shijianzhong/study/t_asyncio/asyncio_cancel_task2.py
create task
task status is: > 
in task_func, sleeping
in task_canceller
canceled the task
task_func was canceled
main() also sees task as canceled

Process finished with exit code 0

 
コンセンサスからタスクを作成します.
import asyncio

# await        ,       。

async def wrapper():
    print('wrapped')
    return 'result'


async def inner(task):
    print('inner: starting')
    print('inner:waiting for {!r}'.format(task))
    #   task  
    result = await task
    print('inner: task returned {!r}'.format(result))
    return result


async def starter():
    print('starter: creating task')
    #     
    task = asyncio.ensure_future(wrapper())
    print('starter:waiting for inner')
    #   inner  
    result = await inner(task)
    print('starter: inner returned')
    return result
event_loop = asyncio.get_event_loop()
try:
    print('entering event loop')
    result = event_loop.run_until_complete(starter())
    print('last res is {}'.format(result))
finally:
    event_loop.close()

 
/usr/local/bin/python3.7 /Users/shijianzhong/study/t_asyncio/asyncio_ensure_future.py
entering event loop
starter: creating task
starter:waiting for inner
inner: starting
inner:waiting for >
wrapped
inner: task returned 'result'
starter: inner returned
last res is result

Process finished with exit code 0

 
6、コンポジットと制御構造の組み合わせ
一連のコヒーレント間のリニア制御フローは、内蔵キーワードawaitで容易に管理できます.よりレプリケートされた構造では、asyncioのツールを使用してより複雑な構造を作成することができます.
 
複数のキャリアを待つ
import asyncio


async def phase(i):
    print('in phase {}.'.format(i))
    await asyncio.sleep(0.1 * i)
    print('done with phase {}'.format(i))
    return 'phase {} result'.format(i)

async def main(num_phase):
    print('starting main')
    #     
    phases = [
        phase(i)
        for i in range(num_phase)
    ]
    print('wait for phases to complete')
    #       ,complete    ,pending     
    completed, pending = await asyncio.wait(phases)
    #           
    results = [t.result() for t in completed]
    print('results: {!r}'.format(results))

event_loop = asyncio.get_event_loop()
try:
    event_loop.run_until_complete(main(3))
finally:
    event_loop.close()

 
/usr/local/bin/python3.7 /Users/shijianzhong/study/t_asyncio/asyncio_wait.py
starting main
wait for phases to complete
in phase 1.
in phase 2.
in phase 0.
done with phase 0
done with phase 1
done with phase 2
results: ['phase 2 result', 'phase 0 result', 'phase 1 result']

Process finished with exit code 0

 
wait内にタイムアウト値を設定し、このイベントpendingに未完了のコヒーレンスを保持する
import asyncio


async def phase(i):
    print('in phase {}'.format(i))
    try:
        await asyncio.sleep(0.1 * i)
    except asyncio.CancelledError:
        print('phase {} canceled'.format(i))
        raise
    else:
        print('done with phase {}'.format(i))
        return 'phase {} result'.format(i)


async def main(num_phases):
    print('starting main')
    phases = [
        phase(i)
        for i in range(num_phases)
    ]
    print('waiting 0.1 for phase to complete')
    #       ,        pending
    completed, pending = await asyncio.wait(phases, timeout=0.1)

    print('{} completed and {} pending'.format(len(completed), len(pending)))
    #           
    if pending:
        print('canceling tasks')
    [t.cancel() for t in pending]
    print('exiting main')


event_loop = asyncio.get_event_loop()
try:
    event_loop.run_until_complete(main(3))
finally:
    event_loop.close()

 
/usr/local/bin/python3.7 /Users/shijianzhong/study/t_asyncio/asyncio_wait_timeout.py
starting main
waiting 0.1 for phase to complete
in phase 1
in phase 2
in phase 0
done with phase 0
1 completed and 2 pending
canceling tasks
exiting main
phase 1 canceled
phase 2 canceled

Process finished with exit code 0

 
コンシステントから結果を収集
import asyncio


async def phase1():
    print('in phase1')
    await asyncio.sleep(2)
    print('done with phase1')
    return 'phases1 result'


async def phase2():
    print('in phase2')
    await asyncio.sleep(1)
    print('done with phase2')
    return 'phase2 result'

async def main():
    print('starting main')
    print('waiting for phases to complete')
    #          ,           
    result = await asyncio.gather(phase1(), phase2())
    print('results: {!r}'.format(result))


event_loop = asyncio.get_event_loop()
try:
    event_loop.run_until_complete(main())
finally:
    event_loop.close()

 
/usr/local/bin/python3.7 /Users/shijianzhong/study/t_asyncio/async_gather.py
starting main
waiting for phases to complete
in phase1
in phase2
done with phase2
done with phase1
results: ['phases1 result', 'phase2 result']

Process finished with exit code 0

 
バックグラウンド操作完了時に処理します.
as_を運用しましたcompletedメソッド、その完了が早くどの協程に戻るか
import asyncio

async def phase(i):
    print('in phase {}'.format(i))
    await asyncio.sleep(.5 - (.1 * i))
    print('done with phase {}'.format(i))
    return 'phase {} result'.format(i)


async  def main(num_phases):
    print('starting main')
    phases = [
        phase(i)
        for i in range(num_phases)
    ]
    print('waiting for phases to completed')
    results = []
    #             
    for next_to_complete in asyncio.as_completed(phases):
        #     result  ,future  
        answer = await next_to_complete
        print('received answer {!r}'.format(answer))
        results.append(answer)
    print('results: {!r}'.format(results))
    return results

event_loop = asyncio.get_event_loop()
try:
    event_loop.run_until_complete(main(3))
finally:
    event_loop.close()

 
/usr/local/bin/python3.7 /Users/shijianzhong/study/t_asyncio/async_gather.py
starting main
waiting for phases to complete
in phase1
in phase2
done with phase2
done with phase1
results: ['phases1 result', 'phase2 result']

Process finished with exit code 0

 
7、同期原語
 
 
続きを待つ