マルチスレッドとのコラボレーションを共有します

14548 ワード

概要
シーンの実現方法のまとめ1.シーン
非同期爬虫類を書くと,多くのリクエストがわけがわからずタイムアウトしていることが分かった.
Webページの解析に時間がかかりすぎて、一部のリクエストが予定時間を超えたためだ.
根本的には、asyncioの協程は非占有式だからだ.協程は自発的に制御権を渡さなければ、ずっと実行されます.
1つのスレッドに時間がかかりすぎると、他のスレッドがタイムアウトして停止する可能性があります.
簡単なプログラムで実験してみましょう.
import asyncio
import time


async def long_calc():
    print('long calc start')
    time.sleep(3)
    print('long calc end')


async def waiting_task(i):
    print(f'waiting task {i} start')
    try:
        await asyncio.wait_for(asyncio.sleep(1), 1.5)
    except asyncio.TimeoutError:
        print(f'waiting task {i} timeout')
    else:
        print(f'waiting task {i} end')


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    coros = [long_calc()]
    coros.extend(waiting_task(x) for x in range(3))
    loop.run_until_complete(asyncio.wait(coros))

long_calcは高消費時間の同期処理をシミュレートする.waiting_taskシミュレーションは時間制限のある非同期処理であり,時間を1秒,時間を1.5秒とした.
3つの非同期、1つの同期を実行すると、次のようになります.
waiting task 0 start
waiting task 1 start
long calc start
long calc end
waiting task 2 start
waiting task 0 timeout
waiting task 1 timeout
waiting task 2 end

long_calc起動前のwaiting_taskスーパータイムズエラー、long_calc完了後のwaiting_taskは正常に終了しました.
明らかに、同期タスクが多すぎるのは、この場合と同じです.
非同期爬虫類のシーンでは,解析が多すぎるか長すぎると要求がタイムアウトする.
合理的なアーキテクチャ設計はもちろんこの衝突を避けることができ、ここでは別の解決方法を提供します.
2.実現
あまり話さないで、まずコードを見てください.
import asyncio
import time
import threading
import datetime


def print_with_time(fmt):
    print(str(datetime.datetime.now()) + ': ' + fmt)


async def long_calc():
    print_with_time('long calc start')
    time.sleep(8)
    print_with_time('long calc end')


async def waiting_task(i):
    print_with_time(f'waiting task {i} start')
    try:
        await asyncio.wait_for(asyncio.sleep(3), 5)
    except asyncio.TimeoutError:
        print_with_time(f'waiting task {i} timeout')
    else:
        print_with_time(f'waiting task {i} end')


if __name__ == '__main__':
    sub_loop = asyncio.new_event_loop()
    thread = threading.Thread(target=sub_loop.run_forever)
    thread.start()

    loop = asyncio.get_event_loop()
    task = loop.create_task(long_calc())
    futs = [asyncio.run_coroutine_threadsafe(waiting_task(x), loop=sub_loop) for x in range(3)]
    futs = [asyncio.wrap_future(f, loop=loop) for f in futs]

    loop.run_until_complete(asyncio.wait([task, *futs]))

    sub_loop.call_soon_threadsafe(sub_loop.stop)
    thread.join()

またlong_calcとwaiting_taskを例にとると,時間調整を行い,同期に8秒,非同期に3秒,時限5秒を費やした.
スレッドを作成し、付属イベントループを実行し、asyncioのスレッドが安全な方法で非同期タスクを追加します.
非同期タスクを付属イベントサイクルで実行し、同期タスクをメインサイクルで実行します.マルチスレッドはシステムによって制御され、プリエンプト式です.2つのスレッドが互いに邪魔されず、タイムアウトの問題を回避していることは明らかです.(メモリとCPUの消費量は高くなりますが)
run_からuntil_completeの行から,プライマリループは付属ループでタスクが完了した情報を受信できることが分かった.
最後にstopメソッドcallを付属サイクルに入れ,サブスレッドの終了を待つ.
実行結果:
2018-06-26 21:37:50.957276: waiting task 0 start
2018-06-26 21:37:50.958276: long calc start
2018-06-26 21:37:50.958276: waiting task 1 start
2018-06-26 21:37:50.958276: waiting task 2 start
2018-06-26 21:37:53.958448: waiting task 0 end
2018-06-26 21:37:53.958448: waiting task 2 end
2018-06-26 21:37:53.958448: waiting task 1 end
2018-06-26 21:37:58.958734: long calc end

完璧な調和.
3.方法
この実装はasyncioに基づく2つの方法である.
def run_coroutine_threadsafe(coro, loop)

def wrap_future(future, *, loop=None)

まず最初を見てください.
def run_coroutine_threadsafe(coro, loop):
    """Submit a coroutine object to a given event loop.

    Return a concurrent.futures.Future to access the result.
    """
    if not coroutines.iscoroutine(coro):
        raise TypeError('A coroutine object is required')
    future = concurrent.futures.Future()

    def callback():
        try:
            futures._chain_future(ensure_future(coro, loop=loop), future)
        except Exception as exc:
            if future.set_running_or_notify_cancel():
                future.set_exception(exc)
            raise

    loop.call_soon_threadsafe(callback)
    return future

指定されたイベントループにコヒーレントをコミットし、スレッドのセキュリティを保証します.
第一歩:タイプチェック(Pythonが遅いのは...)
ステップ2:Futureを新規作成します.このFutureはasyncioのFutureとは異なり,スレッドが安全であり,concurrentのexecutor実装にも用いられる.
第3歩:閉包関数、協程を循環のfutureに包装して、更にスレッドの安全なfutureと接続します.(???)
ステップ4:閉パッケージ関数callをループし、スレッドの安全なfutureを返します.
疑問点があり、しばらく置いてあります.次を見てください.
def wrap_future(future, *, loop=None):
    """Wrap concurrent.futures.Future object."""
    if isfuture(future):
        return future
    assert isinstance(future, concurrent.futures.Future), \
        'concurrent.futures.Future is expected, got {!r}'.format(future)
    if loop is None:
        loop = events.get_event_loop()
    new_future = loop.create_future()
    _chain_future(future, new_future)
    return new_future

タイプチェック、ループを決定し、ループのfutureを作成し、2つのfuture(?)を接続します.新しいfutureを返します.
私たちは再び会った.chain_future、ソースコードを見てみましょう.
def _chain_future(source, destination):
    """Chain two futures so that when one completes, so does the other.

    The result (or exception) of source will be copied to destination.
    If destination is cancelled, source gets cancelled too.
    Compatible with both asyncio.Future and concurrent.futures.Future.
    """
    if not isfuture(source) and not isinstance(source,
                                               concurrent.futures.Future):
        raise TypeError('A future is required for source argument')
    if not isfuture(destination) and not isinstance(destination,
                                                    concurrent.futures.Future):
        raise TypeError('A future is required for destination argument')
    source_loop = source._loop if isfuture(source) else None
    dest_loop = destination._loop if isfuture(destination) else None

    def _set_state(future, other):
        if isfuture(future):
            _copy_future_state(other, future)
        else:
            _set_concurrent_future_state(future, other)

    def _call_check_cancel(destination):
        if destination.cancelled():
            if source_loop is None or source_loop is dest_loop:
                source.cancel()
            else:
                source_loop.call_soon_threadsafe(source.cancel)

    def _call_set_state(source):
        if dest_loop is None or dest_loop is source_loop:
            _set_state(destination, source)
        else:
            dest_loop.call_soon_threadsafe(_set_state, destination, source)

    destination.add_done_callback(_call_check_cancel)
    source.add_done_callback(_call_set_state)

2つのfutureに対して統一的な処理を行い、srcが完了するとdstも完了し、スレッドが安全になる.
これで分かります.付属サイクル内には非同期タスクの状態を表すfutureがあり、メインスレッドにはスレッドセキュリティのfutureが残り、付属サイクル内のfutureに接続され、最後にメインサイクルにfutureが作成され、スレッドセキュリティのfutureに接続されます.
メインサイクルではawaitというfutureを同じサイクルで使用することができますが、lockとtimeoutメカニズムをサイクル間で使用することはできません.
4.まとめ
このようなシーンでは、ほとんどの場合、スレッドプールexecutorを使用することができます.
しかし、マルチレベルで抽出し、解析して要求し、結果を得た後、上位ページの情報を加えてItemに組み立てるには、複数回の関数を呼び出し、パラメータの山を伝え、コヒーレントな役割ドメインの特性を利用することはできません.
上記の方法を用いると,比較的快適な書き方を振り回すことができ,主サイクル+付属サイクルのフレームワークは抽象的で多重化できる.
もちろん、これは必ずしも生産で実用的な方法ではありませんが、これを明らかにすると、asyncioに対する理解が深まるに違いありません.
Python学習交流群:834179111、群には多くの学習資料がある.皆さん、交流学習へようこそ.