asyncio:非同期I/O、イベントサイクル、および同時ツール(継続的および新規)
21110 ワード
スムーズなPythonの本の中の協程部分のバージョンは低すぎて、しかも話すことが少ないので、今回はPython 3標準ライブラリの本の中の実例に基づいてasyncioの使用を記録することを学びます.
asyncioモジュールは、二次空腹を使用して同時アプリケーションを構築するツールを提供します.threadingモジュールはスレッドの同時化を適用し、mutilprocessingはシステムプロセスを用いて同時化を実現し、asyncioは単一スレッドの単一プロセスの方法を用いて同時化を実現し、アプリケーションの各部分は互いに協力し、最適な時点でタスクを明示的に切り替える.
asyncioが提供するフレームワークは、I/Oイベント、システムイベント、およびアプリケーションのコンテキスト切り替えを効率的に処理する最初のクラスオブジェクトであるイベントループ(event loop)を中心としています.
awaitは、実行コヒーレンスまたはfuture、taskを直接アクティブにすることができます.
しかしfutureとtaskで買うともっと多くの属性が含まれていて、
'add_done_callback', 'all_tasks', 'cancel', 'cancelled',
'current_task', 'done', 'exception', 'get_loop', 'get_stack',
'print_stack', 'remove_done_callback', 'result', 'set_exception', 'set_result'
だからfutureとtaskの機能は更に強化して、ensure_を使うことができますfutureまたはloop.create_taskは協程をtaskに装飾する
task時のfutureのサブセットは、asyncio.Futureから直接futureを作成することもできます.
現在の本人の理解では、一般的なioアプリケーションでは、マルチスレッドの使用を主とし、自分がコラボレーションを書くとき、複数のコラボレーションの間に、制御権を有効に設定することはできません.
唯一応用できるパッケージはaiohttpで、もし私が別のパッケージで協程を実現したいなら、基本的にはできませんが、協程asyncioパッケージは流暢なPython書で述べたように、ほとんどが概念とAPIを話しています.
コンシステントの譲渡制御権譲渡は、適切なパケットが多いため、一般的にはasyncio.sleepによって単一キャリア譲渡制御権をシミュレートするしかない.
后ろにもっと豊富なバッグがあって协程に协力することができることを望んで、あるいは私がいつ达人になることを待って、协程に协力できるバッグを书くことができます.
2、協力を利用してマルチタスクを完成する
先にget_を通過event_loopはデフォルトのイベントループを作成し、run_until_completeメソッドは内部のコヒーレンスを駆動し,最後にイベントループを閉じる.
コンシステントからの戻り値
れんさきょうてい
コンシステントではなくジェネレータ
Python 3.5から
@asyncio.coroutineの代わりにasync
awaitがyield fromに代わった
awaitかyield fromの方が直感的だと思います.
3、一般関数呼び出しのスケジュール
これを使用するには,イベントループをコヒーレントに伝達し,コヒーレントでイベントループの方法で呼び出す必要がある関数をアクティブにする必要がある.
イベントループに転送されるコンシステントにasyncio.seleepを設定する必要があります.そうしないと、コンシステントは制御権を譲らず、イベントループは呼び出し関数をアクティブにすることはできません.
Delayでコールバックをスケジュールする
call_soonは直接呼び出し、call_later()最初のパラメータは、秒単位で遅延したイベントに転送され、2番目のパラメータはfunctionです.
イベントループに転送されるコンシステントにasyncio.seleepを設定し、最も遅い呼び出し関数時間より大きい必要があります.そうしないと、コンシステントは制御権を譲らず、イベントループは呼び出し関数をアクティブにすることはできません.
指定したイベント内でコールバックをスケジュールする
この目的を達成するサイクル依存性は,いわゆる単調クロックであり,loop.time()で生成し,アクティブ化用call_at
4、非同期的に結果を生成する.
Futureは、まだ完了していない作業結果を示します.イベントループは、Futureオブジェクトのステータスを監視することによって完了したことを示すことができ、アプリケーションの一部が他の部分の完了を待つことができます.
私の理解と操作から見ると、futureが関数の値を取得するにはevent_loop.calll_soonは関数を呼び出して、関数の中でfutureに入って、future.set_を通じてresultメソッドはパラメータを保存します.
一般的にfutureの値はawaitで取得できます
実際に感じて、この書き方はもっと理解させられます.
Futureコールバック
これはfutureオブジェクトを介して関数にパラメータを渡すようなもので、ジェネレータのsendのようなものです.
多くの場合、awaitをyiled fromと見るともっと理解できます.
5.同時実行タスク
タスクの起動はawaitで直接起動するか、複数のタスクの間にタスクawait asyncio.sleepがあれば、他のタスクはすぐに制御権を得ることができます.
タスクは、イベントループとインタラクティブになる主な方法の1つです.タスクは、コヒーレントにパッケージ化され、コヒーレントがいつ完了するかを追跡できます.タスクはFutureのサブクラスであるため、他のスレッドはタスクを待つことができ、各タスクには結果があり、完了した後にこの結果を得ることができます.
タスクの開始
タスクを開始するには、タイムサイクル.create_を使用します.task()Taskインスタンスを作成します.
上からtask運転と未運転の状態が異なることがわかります.
タスクをキャンセル
同時に実行中のタスクをキャンセルします.
コンセンサスからタスクを作成します.
6、コンポジットと制御構造の組み合わせ
一連のコヒーレント間のリニア制御フローは、内蔵キーワードawaitで容易に管理できます.よりレプリケートされた構造では、asyncioのツールを使用してより複雑な構造を作成することができます.
複数のキャリアを待つ
wait内にタイムアウト値を設定し、このイベントpendingに未完了のコヒーレンスを保持する
コンシステントから結果を収集
バックグラウンド操作完了時に処理します.
as_を運用しましたcompletedメソッド、その完了が早くどの協程に戻るか
7、同期原語
続きを待つ
asyncioモジュールは、二次空腹を使用して同時アプリケーションを構築するツールを提供します.threadingモジュールはスレッドの同時化を適用し、mutilprocessingはシステムプロセスを用いて同時化を実現し、asyncioは単一スレッドの単一プロセスの方法を用いて同時化を実現し、アプリケーションの各部分は互いに協力し、最適な時点でタスクを明示的に切り替える.
asyncioが提供するフレームワークは、I/Oイベント、システムイベント、およびアプリケーションのコンテキスト切り替えを効率的に処理する最初のクラスオブジェクトであるイベントループ(event loop)を中心としています.
awaitは、実行コヒーレンスまたはfuture、taskを直接アクティブにすることができます.
しかしfutureとtaskで買うともっと多くの属性が含まれていて、
'add_done_callback', 'all_tasks', 'cancel', 'cancelled',
'current_task', 'done', 'exception', 'get_loop', 'get_stack',
'print_stack', 'remove_done_callback', 'result', 'set_exception', 'set_result'
だからfutureとtaskの機能は更に強化して、ensure_を使うことができますfutureまたはloop.create_taskは協程をtaskに装飾する
task時のfutureのサブセットは、asyncio.Futureから直接futureを作成することもできます.
現在の本人の理解では、一般的なioアプリケーションでは、マルチスレッドの使用を主とし、自分がコラボレーションを書くとき、複数のコラボレーションの間に、制御権を有効に設定することはできません.
唯一応用できるパッケージはaiohttpで、もし私が別のパッケージで協程を実現したいなら、基本的にはできませんが、協程asyncioパッケージは流暢なPython書で述べたように、ほとんどが概念とAPIを話しています.
コンシステントの譲渡制御権譲渡は、適切なパケットが多いため、一般的にはasyncio.sleepによって単一キャリア譲渡制御権をシミュレートするしかない.
后ろにもっと豊富なバッグがあって协程に协力することができることを望んで、あるいは私がいつ达人になることを待って、协程に协力できるバッグを书くことができます.
2、協力を利用してマルチタスクを完成する
import asyncio
async def coroutine():
print('in coroutine')
#
event_loop = asyncio.get_event_loop()
try:
print('starting coroutine')
coro = coroutine()
print('entering event loop')
#
event_loop.run_until_complete(coro)
finally:
print('closing event loop')
event_loop.close()
/usr/local/bin/python3.7 /Users/shijianzhong/study/t_asyncio/asyncio_coroutine.py
starting coroutine
entering event loop
in coroutine
closing event loop
Process finished with exit code 0
先にget_を通過event_loopはデフォルトのイベントループを作成し、run_until_completeメソッドは内部のコヒーレンスを駆動し,最後にイベントループを閉じる.
コンシステントからの戻り値
import asyncio
#
async def coroutine():
print('in coroutine')
return 'result'
#
event_loop = asyncio.get_event_loop()
try:
print('starting coroutine')
coro = coroutine()
print('entering event loop')
# ,
return_value = event_loop.run_until_complete(coro)
print(f'it returned: {return_value!r}')
finally:
print('closing event loop')
event_loop.close()
/usr/local/bin/python3.7 /Users/shijianzhong/study/t_asyncio/asyncio_coroutine_return.py
starting coroutine
entering event loop
in coroutine
it returned: 'result'
closing event loop
Process finished with exit code 0
れんさきょうてい
import asyncio
async def outer():
print('in outer')
print('waiting for result1')
# phase1()
result1 = await phase1()
print('waiting for result2')
# phase2()
result2 = await phase2(result1)
return result1, result2
async def phase1():
print('in parse1')
return 'result1'
async def phase2(arg):
print('in phase2')
return 'result2 derived from {}'.format(arg)
event_loop = asyncio.get_event_loop()
try:
return_value = event_loop.run_until_complete(outer())
print(f'return_value:{return_value!r}')
finally:
event_loop.close()
/usr/local/bin/python3.7 /Users/shijianzhong/study/t_asyncio/asyncio_coroutine_chain.py
in outer
waiting for result1
in parse1
waiting for result2
in phase2
return_value:('result1', 'result2 derived from result1')
Process finished with exit code 0
コンシステントではなくジェネレータ
Python 3.5から
@asyncio.coroutineの代わりにasync
awaitがyield fromに代わった
awaitかyield fromの方が直感的だと思います.
3、一般関数呼び出しのスケジュール
これを使用するには,イベントループをコヒーレントに伝達し,コヒーレントでイベントループの方法で呼び出す必要がある関数をアクティブにする必要がある.
イベントループに転送されるコンシステントにasyncio.seleepを設定する必要があります.そうしないと、コンシステントは制御権を譲らず、イベントループは呼び出し関数をアクティブにすることはできません.
import asyncio
from functools import partial
def callback(arg, *, kwarg='default'):
print(f'callback invoked with {arg} and {kwarg}')
async def main(loop):
print('registering callbacks')
# , , partial
loop.call_soon(callback, 1)
# partial
wrapped = partial(callback, kwarg='not default')
loop.call_soon(wrapped, 2)
await asyncio.sleep(.1)
event_loop = asyncio.get_event_loop()
try:
print('entering event loop')
event_loop.run_until_complete(main(event_loop))
finally:
event_loop.close()
/usr/local/bin/python3.7 /Users/shijianzhong/study/t_asyncio/asyncio_call_soon.py
entering event loop
registering callbacks
callback invoked with 1 and default
callback invoked with 2 and not default
Process finished with exit code 0
Delayでコールバックをスケジュールする
call_soonは直接呼び出し、call_later()最初のパラメータは、秒単位で遅延したイベントに転送され、2番目のパラメータはfunctionです.
イベントループに転送されるコンシステントにasyncio.seleepを設定し、最も遅い呼び出し関数時間より大きい必要があります.そうしないと、コンシステントは制御権を譲らず、イベントループは呼び出し関数をアクティブにすることはできません.
import asyncio
from functools import partial
def callback(arg):
print(f'callback invoked {arg}')
async def main(loop):
print('registering callbacks')
# ,
loop.call_later(0.2, callback, 1)
loop.call_later(0.1, callback, 2)
#
await asyncio.sleep(.21)
event_loop = asyncio.get_event_loop()
try:
print('entering event loop')
event_loop.run_until_complete(main(event_loop))
finally:
print('closing event loop')
event_loop.close()
/usr/local/bin/python3.7 /Users/shijianzhong/study/t_asyncio/asyncio_call_later.py
entering event loop
registering callbacks
callback invoked 2
callback invoked 1
closing event loop
Process finished with exit code 0
指定したイベント内でコールバックをスケジュールする
この目的を達成するサイクル依存性は,いわゆる単調クロックであり,loop.time()で生成し,アクティブ化用call_at
import asyncio
import time
def callback(n, loop):
print(f'callback {n} invoked at {loop.time()}')
async def main(loop):
# , now 0
now = loop.time()
print(f'clock time: {time.time()}')
print(f'loop time {now}')
print('registering callbacks')
# 0.2
loop.call_at(now + .2, callback, 1, loop)
# 0.1
loop.call_at(now + .1, callback, 2, loop)
#
loop.call_soon(callback, 3, loop)
await asyncio.sleep(1)
event_loop = asyncio.get_event_loop()
try:
print('entering event loop')
event_loop.run_until_complete(main(event_loop))
finally:
print('closing event loop')
event_loop.close()
/usr/local/bin/python3.7 /Users/shijianzhong/study/t_asyncio/asynvio_call_at.py
entering event loop
clock time: 1579366496.336677
loop time 0.086557153
registering callbacks
callback 3 invoked at 0.086701756
callback 2 invoked at 0.189241196
callback 1 invoked at 0.29037571
closing event loop
Process finished with exit code 0
4、非同期的に結果を生成する.
Futureは、まだ完了していない作業結果を示します.イベントループは、Futureオブジェクトのステータスを監視することによって完了したことを示すことができ、アプリケーションの一部が他の部分の完了を待つことができます.
私の理解と操作から見ると、futureが関数の値を取得するにはevent_loop.calll_soonは関数を呼び出して、関数の中でfutureに入って、future.set_を通じてresultメソッドはパラメータを保存します.
import asyncio
def mark_done(future, result):
print('setting future result to {!r}'.format(result))
future.set_result(result)
event_loop = asyncio.get_event_loop()
try:
# future
all_done = asyncio.Future()
print('scheduling mark_done')
# , future
event_loop.call_soon(mark_done, all_done, 'the result')
print('entering event_loop')
# future , 。
result = event_loop.run_until_complete(all_done)
print('returned result: {!r}'.format(result))
finally:
print('closing event_loop')
event_loop.close()
print('future result: {!r}'.format(all_done.result()))
/usr/local/bin/python3.7 /Users/shijianzhong/study/t_asyncio/asyncio_future_event_loop.py
scheduling mark_done
entering event_loop
setting future result to 'the result'
returned result: 'the result'
closing event_loop
future result: 'the result'
Process finished with exit code 0
一般的にfutureの値はawaitで取得できます
実際に感じて、この書き方はもっと理解させられます.
import asyncio
def mark_down(future, result):
print('setting future result to {!r}'.format(result))
future.set_result(result)
async def main(loop):
# , , future, 。
all_done = asyncio.Future()
print('scheduling mark done')
loop.call_soon(mark_down, all_done, 'the result')
# await future
result = await all_done
print('returned result: {!r}'.format(result))
return result
event_loop = asyncio.get_event_loop()
try:
# , 。
result = event_loop.run_until_complete(main(event_loop))
print(f'last result is {result!r}')
finally:
event_loop.close()
/usr/local/bin/python3.7 /Users/shijianzhong/study/t_asyncio/asyncio_future_await.py
scheduling mark done
setting future result to 'the result'
returned result: 'the result'
last result is 'the result'
Process finished with exit code 0
Futureコールバック
これはfutureオブジェクトを介して関数にパラメータを渡すようなもので、ジェネレータのsendのようなものです.
多くの場合、awaitをyiled fromと見るともっと理解できます.
import asyncio
import functools
def callback(future, n):
print('{}: future done: {}'.format(n, future.result()))
async def register_callbacks(all_done):
print('register callbacks on future')
all_done.add_done_callback(functools.partial(callback, n=1))
all_done.add_done_callback(functools.partial(callback, n=2))
async def main(all_done):
#
await register_callbacks(all_done)
print('setting result of future')
#
all_done.set_result('the result')
event_loop = asyncio.get_event_loop()
try:
all_done = asyncio.Future()
#
event_loop.run_until_complete(main(all_done))
finally:
event_loop.close()
/usr/local/bin/python3.7 /Users/shijianzhong/study/t_asyncio/asyncio_future_callback.py
register callbacks on future
setting result of future
1: future done: the result
2: future done: the result
Process finished with exit code 0
5.同時実行タスク
タスクの起動はawaitで直接起動するか、複数のタスクの間にタスクawait asyncio.sleepがあれば、他のタスクはすぐに制御権を得ることができます.
タスクは、イベントループとインタラクティブになる主な方法の1つです.タスクは、コヒーレントにパッケージ化され、コヒーレントがいつ完了するかを追跡できます.タスクはFutureのサブクラスであるため、他のスレッドはタスクを待つことができ、各タスクには結果があり、完了した後にこの結果を得ることができます.
タスクの開始
タスクを開始するには、タイムサイクル.create_を使用します.task()Taskインスタンスを作成します.
import asyncio
async def task_func():
print('in task_func')
return 'the result'
async def main(loop):
print('creating task')
#
task = loop.create_task(task_func())
# task = [loop.create_task(task_func()) for i in range(1000)]
print('waiting for {!r}'.format(task))
# , , await
return_value = await task
print('task_completed {!r}'.format(task))
print('return value: {!r}'.format(return_value))
event_loop = asyncio.get_event_loop()
try:
event_loop.run_until_complete(main(event_loop))
finally:
event_loop.close()
/usr/local/bin/python3.7 /Users/shijianzhong/study/t_asyncio/asyncio_create_task.py
creating task
waiting for >
in task_func
task_completed result='the result'>
return value: 'the result'
Process finished with exit code 0
上からtask運転と未運転の状態が異なることがわかります.
タスクをキャンセル
import asyncio
async def task_func():
print('in task_func')
return 'the result'
async def main(loop):
print('creating task')
task = loop.create_task(task_func())
#
print('canceling task')
task.cancel()
print('canceled task {!r}'.format(task))
try:
await task
#
except asyncio.CancelledError:
print('caught error from canceled task')
else:
print('task result: {!r}'.format(task.result()))
event_loop = asyncio.get_event_loop()
try:
event_loop.run_until_complete(main(event_loop))
finally:
event_loop.close()
/usr/local/bin/python3.7 /Users/shijianzhong/study/t_asyncio/asyncio_cancel_task.py
creating task
canceling task
canceled task >
caught error from canceled task
Process finished with exit code 0
同時に実行中のタスクをキャンセルします.
import asyncio
async def task_func():
print('in task_func, sleeping')
try:
# , loop.call_soon
await asyncio.sleep(1)
except asyncio.CancelledError:
print('task_func was canceled')
raise
return 'the result'
def task_canceller(t):
print('in task_canceller')
t.cancel()
# print(t)
print('canceled the task')
async def main(loop):
print('create task')
task = loop.create_task(task_func())
# , 。
loop.call_soon(task_canceller, task)
print(f'task status is: {task} ')
try:
# task, , call_soon
res = await task
print(res,'is here')
except asyncio.CancelledError:
print('main() also sees task as canceled')
event_loop = asyncio.get_event_loop()
try:
event_loop.run_until_complete(main(event_loop))
finally:
event_loop.close()
/usr/local/bin/python3.7 /Users/shijianzhong/study/t_asyncio/asyncio_cancel_task2.py
create task
task status is: >
in task_func, sleeping
in task_canceller
canceled the task
task_func was canceled
main() also sees task as canceled
Process finished with exit code 0
コンセンサスからタスクを作成します.
import asyncio
# await , 。
async def wrapper():
print('wrapped')
return 'result'
async def inner(task):
print('inner: starting')
print('inner:waiting for {!r}'.format(task))
# task
result = await task
print('inner: task returned {!r}'.format(result))
return result
async def starter():
print('starter: creating task')
#
task = asyncio.ensure_future(wrapper())
print('starter:waiting for inner')
# inner
result = await inner(task)
print('starter: inner returned')
return result
event_loop = asyncio.get_event_loop()
try:
print('entering event loop')
result = event_loop.run_until_complete(starter())
print('last res is {}'.format(result))
finally:
event_loop.close()
/usr/local/bin/python3.7 /Users/shijianzhong/study/t_asyncio/asyncio_ensure_future.py
entering event loop
starter: creating task
starter:waiting for inner
inner: starting
inner:waiting for >
wrapped
inner: task returned 'result'
starter: inner returned
last res is result
Process finished with exit code 0
6、コンポジットと制御構造の組み合わせ
一連のコヒーレント間のリニア制御フローは、内蔵キーワードawaitで容易に管理できます.よりレプリケートされた構造では、asyncioのツールを使用してより複雑な構造を作成することができます.
複数のキャリアを待つ
import asyncio
async def phase(i):
print('in phase {}.'.format(i))
await asyncio.sleep(0.1 * i)
print('done with phase {}'.format(i))
return 'phase {} result'.format(i)
async def main(num_phase):
print('starting main')
#
phases = [
phase(i)
for i in range(num_phase)
]
print('wait for phases to complete')
# ,complete ,pending
completed, pending = await asyncio.wait(phases)
#
results = [t.result() for t in completed]
print('results: {!r}'.format(results))
event_loop = asyncio.get_event_loop()
try:
event_loop.run_until_complete(main(3))
finally:
event_loop.close()
/usr/local/bin/python3.7 /Users/shijianzhong/study/t_asyncio/asyncio_wait.py
starting main
wait for phases to complete
in phase 1.
in phase 2.
in phase 0.
done with phase 0
done with phase 1
done with phase 2
results: ['phase 2 result', 'phase 0 result', 'phase 1 result']
Process finished with exit code 0
wait内にタイムアウト値を設定し、このイベントpendingに未完了のコヒーレンスを保持する
import asyncio
async def phase(i):
print('in phase {}'.format(i))
try:
await asyncio.sleep(0.1 * i)
except asyncio.CancelledError:
print('phase {} canceled'.format(i))
raise
else:
print('done with phase {}'.format(i))
return 'phase {} result'.format(i)
async def main(num_phases):
print('starting main')
phases = [
phase(i)
for i in range(num_phases)
]
print('waiting 0.1 for phase to complete')
# , pending
completed, pending = await asyncio.wait(phases, timeout=0.1)
print('{} completed and {} pending'.format(len(completed), len(pending)))
#
if pending:
print('canceling tasks')
[t.cancel() for t in pending]
print('exiting main')
event_loop = asyncio.get_event_loop()
try:
event_loop.run_until_complete(main(3))
finally:
event_loop.close()
/usr/local/bin/python3.7 /Users/shijianzhong/study/t_asyncio/asyncio_wait_timeout.py
starting main
waiting 0.1 for phase to complete
in phase 1
in phase 2
in phase 0
done with phase 0
1 completed and 2 pending
canceling tasks
exiting main
phase 1 canceled
phase 2 canceled
Process finished with exit code 0
コンシステントから結果を収集
import asyncio
async def phase1():
print('in phase1')
await asyncio.sleep(2)
print('done with phase1')
return 'phases1 result'
async def phase2():
print('in phase2')
await asyncio.sleep(1)
print('done with phase2')
return 'phase2 result'
async def main():
print('starting main')
print('waiting for phases to complete')
# ,
result = await asyncio.gather(phase1(), phase2())
print('results: {!r}'.format(result))
event_loop = asyncio.get_event_loop()
try:
event_loop.run_until_complete(main())
finally:
event_loop.close()
/usr/local/bin/python3.7 /Users/shijianzhong/study/t_asyncio/async_gather.py
starting main
waiting for phases to complete
in phase1
in phase2
done with phase2
done with phase1
results: ['phases1 result', 'phase2 result']
Process finished with exit code 0
バックグラウンド操作完了時に処理します.
as_を運用しましたcompletedメソッド、その完了が早くどの協程に戻るか
import asyncio
async def phase(i):
print('in phase {}'.format(i))
await asyncio.sleep(.5 - (.1 * i))
print('done with phase {}'.format(i))
return 'phase {} result'.format(i)
async def main(num_phases):
print('starting main')
phases = [
phase(i)
for i in range(num_phases)
]
print('waiting for phases to completed')
results = []
#
for next_to_complete in asyncio.as_completed(phases):
# result ,future
answer = await next_to_complete
print('received answer {!r}'.format(answer))
results.append(answer)
print('results: {!r}'.format(results))
return results
event_loop = asyncio.get_event_loop()
try:
event_loop.run_until_complete(main(3))
finally:
event_loop.close()
/usr/local/bin/python3.7 /Users/shijianzhong/study/t_asyncio/async_gather.py
starting main
waiting for phases to complete
in phase1
in phase2
done with phase2
done with phase1
results: ['phases1 result', 'phase2 result']
Process finished with exit code 0
7、同期原語
続きを待つ