Python同時連携asyncio(7)


py 2およびpy 3の初期バージョンでは、pythonコラボレーションの主流の実装方法はgeventモジュールを使用することである.
asyncioはpython 3にあります.4後にpythonに組み込まれ、pythonのコラボレーション作成がより便利になります.
Python 3.5はasyncとawaitの2つのキーワードを追加し、それぞれasyncio.coroutineyield fromを置き換えるために使用されます.これにより、コンストラクションはジェネレータタイプではなく、新しい構文になります.イベントループとコヒーレンスの導入により、高負荷でのプログラムのI/O性能を大幅に向上させることができます.このほか、async with(非同期コンテキスト管理)、async for(非同期反復器)構文も追加されました.特に、新しくリリースされたPython 3.6では、やっと非同期ジェネレータが使えるようになりましたasyncioのプログラミングモデルはメッセージループである.asyncioモジュールからEventLoopの参照を直接取得し、実行する必要があるコヒーレンスをEventLoopに投げて実行し、非同期IOを実現した.
1,asyncio基本用法
同期/非同期の概念:
同期:トランザクションを完了するロジックです.最初のトランザクションを実行します.ブロックされている場合は、トランザクションが完了するまで待機し、2番目のトランザクションを実行し、順次実行します.
非同期:同期とは対照的で、非同期とは、呼び出されたトランザクションを処理した後、このトランザクションの処理結果を待つことなく、2番目のトランザクションを直接処理し、ステータス、通知、コールバックによって呼び出し者に処理結果を通知することを意味します.
コード例:(同期)
import time

def hello():
    time.sleep(1)

def run():
    for i in range(5):
        hello()
        print('Hello World:%s' % time.time())  #           Hello World    !
if __name__ == '__main__':
    run()

出力:(間隔は約1 s)
Hello World:1548574672.5149598
Hello World:1548574673.5190861
Hello World:1548574674.524205
Hello World:1548574675.528661
Hello World:1548574676.532924

非同期実行に変更するには、主に次の手順に従います.
  • は、非同期関数が非同期タスクcoroutineオブジェクトを返すことを定義し、非同期操作はcoroutineyield fromを介して
  • を完了する必要がある.
  • coroutineオブジェクトをイベントループに追加します.すなわち、非同期のタスクをこのループのrun_に捨てます.until_complete().複数のcoroutineを1組のTaskにカプセル化し、
  • を同時に実行することができる.
  • メインスレッドはイベントループを作成し、asyncioを呼び出す.get_event_loop()
  • イベントループ
  • を閉じる
    import time
    import asyncio
    
    #       
    @asyncio.coroutine
    def hello():
        '''
        yield from               generator。  asyncio.sleep()    coroutine,        asyncio.sleep(),                。
         asyncio.sleep()   ,      yield from     (   None),           。
         asyncio.sleep(1)       1  IO  ,    ,       ,     EventLoop        coroutine ,          
    
        '''
        r = yield from asyncio.sleep(1)
        print('Hello World:%s' % time.time())
    
    def run():
        for i in range(5):
            loop.run_until_complete(hello()) #            ,      
    
    loop = asyncio.get_event_loop() #     asyncio.get_event_loop()        
    if __name__ =='__main__':
        run()

    出力の実行:
    Hello World:1548575242.072916
    Hello World:1548575243.0778098
    Hello World:1548575244.080589
    Hello World:1548575245.0845451
    Hello World:1548575246.0891478

    マルチタスク実行に変更:
    import time
    import asyncio
    
    #       
    @asyncio.coroutine
    def hello():
        '''
        yield from               generator。  asyncio.sleep()    coroutine,        asyncio.sleep(),                。
         asyncio.sleep()   ,      yield from     (   None),           。
         asyncio.sleep(1)       1  IO  ,    ,       ,     EventLoop        coroutine ,          
    
        '''
        r = yield from asyncio.sleep(1)
        print('Hello World:%s' % time.time())
    
    def run():
        for i in range(3):
            tasks = [hello(), hello()]
            loop.run_until_complete(asyncio.wait(tasks)) #            ,      
    
    if __name__ =='__main__':
        loop = asyncio.get_event_loop()  #      asyncio.get_event_loop()        
        run()
        loop.close()

    出力:(2つのタスクが並列に実行されます)
    Hello World:1548576104.507673
    Hello World:1548576104.507762
    Hello World:1548576105.511091
    Hello World:1548576105.5111492
    Hello World:1548576106.5158
    Hello World:1548576106.5158558

    2,asyncとawaitasyncioで提供される@asyncio.coroutineは、1つのgeneratorをcoroutineタイプとしてマークし、coroutine内部でyield fromで別のcoroutineを呼び出して非同期動作を実現することができる.
    非同期IOを簡略化し、よりよく識別するために、Python 3.5から新しい構文asyncawaitが導入され、coroutineのコードをより簡潔に読みやすくすることができます.
    async:
    asyncキーワードは、asyncによって修飾された関数が通常の関数ではなくコヒーレントオブジェクトになるコヒーレントオブジェクトを定義することができます.asyncによって修飾された関数が呼び出されると、send起動によって実行できるコヒーレント関数が生成されます.
    import os
    import asyncio
    async def target_func1():
        print('the func start')
        print(os.getpid())
        print('the func end')
    coroutine = target_func1()
    
    try:
        coroutine.send(None) #     
    except StopIteration:
        print('xx')
        coroutine.close() #   

    await:
    awaitはイベントの実行順序を制御するために使用され、非同期関数、すなわちasyncキーワードで定義されたコヒーレント関数のみが使用され、そうでなければエラーが報告されます.awaitに実行すると、現在のコヒーレンスが停止し、awaitの後のコヒーレンスを実行し、終了したら現在のコヒーレンスに戻って下に進みます.
    import time
    import asyncio
    # async          
    async def target_func1():
        print('the func start')
        x = await target_func2() #      
        print(x)
        print('the func end')
        return 1
    
    async def target_func2():
        """
            2
        :return:
        """
        time.sleep(2)
        print('the func end2')
        return 0

    注意:asyncawaitはcoroutineの新しい文法です.新しい文法を使用するには、2つのステップで簡単に置き換える必要があります.
  • @asyncio.coroutineasyncに置き換えた.
  • yield fromawait
  • に置き換える.
    上記のコード例は、次のように変更されました.
    import time
    import asyncio
    
    #       
    async def hello():
        asyncio.sleep(1)
        print('Hello World:%s' % time.time())
    
    def run():
        for i in range(5):
            loop.run_until_complete(hello())
    
    loop = asyncio.get_event_loop()
    if __name__ =='__main__':
        run()

    asyncioの主な方法:
    asyncio.get_event_loop():        ,                  ;
    asyncio.ensure_future():      
    asyncio.gather(*fs):      
    asyncio.wait(fs):      ,     
    loop.run_until_complete(func):                
    loop.run_forever():        ,  stop   
    loop.create_task():            
    loop.close():    
    loop.time():           
    loop.stop():    
    loop.is_closed() #         
    loop.create_future():    future  ,               future  
    loop.call_soon() #       ,         ,    future  ,    
    loop.call_soon_threadsafe() #        
    loop.call_later() #          ,    
    loop.call_at() #       s  
    loop.call_exception_handler() #     

    主なクラス:
    Future:主にタスクの状態を保存するために使用されます.Task:Futureのサブクラスは、Futureの機能を拡張しています.
    # Future
    from asyncio import Future
    # future = Future()
    # future.result() #        
    # future.remove_done_callback(fn) #               
    # future.set_result('result') #        ,   result()    ,    
    # future.exception() #          
    # future.set_exception('bad') #          
    # future.add_done_callback('fn') #       
    
    # Task
    current_task():         ,   
    all_tasks():           
    get_stack():           
    print_stack:           
    cancel:    

    2.1複数のタスクをイベントループに追加
    asyncioを使用します.gather(*fs):イベントループにパラレルタスクを追加
    import asyncio
    import os
    
    # async          
    async def target_func1(name):
        print('the func start')
        print('PID: {} with name: {}'.format(os.getpid(), name))
        print('the func end')
        return name
    
    def run():
        loop = asyncio.get_event_loop() #         
        taks_list = [target_func1("A"), target_func1("B"), target_func1("C")]
        rest = loop.run_until_complete(asyncio.gather(*taks_list)) #      
        print(rest) #       ,    ,         ,           
        loop.close()
    
    if __name__ == '__main__':
        run()

    実行結果:
    the func start
    PID: 9435 with name: A
    the func end
    the func start
    PID: 9435 with name: C
    the func end
    the func start
    PID: 9435 with name: B
    the func end
    ['A', 'B', 'C']

    2.2  run_forever起動サイクル非同期計算結果の取得
    run_forever()は非同期関数の戻り結果を直接得ることができず、Futureクラスを使用して第三者として結果を保存し、コールバック関数を設定する必要がある.
    import time
    import asyncio
    from asyncio import Future
    from functools import partial
    
    async def target_func0(name, future):
        """
            2
        :return:
        """
        time.sleep(1)
        print(name)
        future.set_result(name) #   return,      
    
    def got_result(loop, future):
        print(future.result()) #     
        loop.stop() #    
    
    def run():
        loop = asyncio.get_event_loop()
        future = Future(loop=loop)
        res = asyncio.ensure_future(target_func0('A', future)) #     Task  
        print(res)
        future.add_done_callback(partial(got_result, loop)) #              future,       
        # print(future.result()) # future        future.set_result。
        loop.run_forever()
        loop.close()
    
    if __name__ == '__main__':
        run()

    実行結果:
    >
    A
    A

    2.3リンク連携
    コンシステント呼び出しは、別のコンシステントが完了するまで待機します.
    import time, os
    import asyncio
    
    async def target_func1(): # async          
        print('the func1 start')
        x = await target_func2() #      
        print(x)
        print('the func1 end')
        return 1
    
    async def target_func2():
        """
            2
        :return:
        """
        time.sleep(2)
        print('the func end2')
        return 0
    
    def run():
        #         
        coroutine = target_func1()
        #         
        loop = asyncio.get_event_loop()
        loop.run_until_complete(coroutine) #             ,      
        print(os.getpid())
        loop.close() #       
    
    if __name__ == '__main__':
        run()

    実行結果:
    the func1 start
    the func end2
    0
    the func1 end
    13584

    2.4コールバック関数
    import asyncio
    from functools import partial
    
    # async          
    async def target_func1():
        print('the func end')
        return 1
    
    def get_res(loop):
        print('xxxx')
        loop.stop()
    
    def run():
        loop = asyncio.get_event_loop() #        
        resp = loop.create_task(target_func1()) #            
        loop.call_soon(partial(get_res, loop)) #      ,         ,    future  
        # loop.call_soon_threadsafe() #        
        # loop.call_later(delay=5, callback=partial(get_res, loop)) #          ,  5   
        # loop.call_at(when=8000,callback=partial(get_res, loop)) #      8   
        # loop.call_exception_handler() #     
        loop.run_forever()
        print(resp.result())
        loop.close()

    実行結果:
    the func end
    xxxx
    1

    3,非同期ネットワークIOライブラリaiohttp
    aiohttpライブラリは、非同期Webリクエストなどの機能を実現するために使用される、非同期版のrequestsライブラリです.
    aiohttpライブラリについては、別の章で説明します.https://blog.csdn.net/biheyu828/article/details/87896507
    基本的な使い方は以下の通りです.
    import asyncio
    import aiohttp
    from aiohttp import ClientSession
    
    
    async def hello(url):
        async with ClientSession() as session: ##  session
            async with session.get(url) as resp:
                print(resp.status)
                resp = await resp.text()
                print(resp)
    
    loop = asyncio.get_event_loop()
    url = "http://httpbin.org/get"
    loop.run_until_complete(hello(url))

    4,コンシステントqueue
    asyncioモジュールにも独自のqueue実装生産消費モデルがあり、Queue(先進先出)、PriorityQueue(優先順位キュー)、LifoQuee(スタック)の3つのキューがある限り、Queueはスレッドセキュリティのクラスではなく、つまりマルチプロセスやマルチスレッドの場合はこのキューを使用しない.
    import asyncio
    from asyncio import Queue
    
    async def producer(queue):
        for i in range(10):
            await queue.put(i)
    
    async def consumer(queue):
        for i in range(10):
            data = await queue.get()
            print(data)
    
    def run():
        loop = asyncio.get_event_loop()
        q = Queue(10)
        tasks = [producer(q), consumer(q)]
        loop.run_until_complete(asyncio.gather(*tasks))
        loop.close()
    
    if __name__ == '__main__':
        run()

    Queueのget(),join(),put()メソッドはいずれもコンシステントを返し,awaitキーワードを使用する必要がある.
     
    参考文献:
    https://www.cnblogs.com/shenh/p/9090586.html
    https://www.cnblogs.com/aademeng/articles/7241485.html
    https://www.cnblogs.com/cwp-bg/p/9590700.html