concurrentを使用します.futuresモジュールの同時実行、プロセスプール、スレッドプールの実現

7309 ワード

concurrent.futures紹介


futureは、非同期で実行される操作を表すオブジェクトです.この概念はconcurrentです.futuresモジュールとasyncioパッケージの基礎.
Python標準ライブラリは、threadingモジュールとmultiprocessingモジュールが対応する非同期マルチスレッド/マルチプロセスコードを記述するために提供しています.
concurrent.futuresモジュールはPython 3.2導入のPython 2 xバージョンについてPython 2.5以上のバージョンではfuturesパッケージをインストールしてこのモジュールを使用し、コマンドpip install futuresを使用してインストールすればよい.
concurrent.futuresは主に2つのクラスを使用し、マルチスレッド:ThreadPoolExecutorマルチプロセス:ProcessPoolExecutor;この2つのクラスはいずれも抽象Executorクラスのサブクラスであり,同じインタフェースを継承している.
Python 3から4から、標準ライブラリにはFutureのクラスが2つあります:concurrent.futures.Futureとasyncio.Future.この2つのクラスは同じ役割を果たします.2つのFutureクラスのインスタンスは、完了または未完了の可能性のある遅延計算を示します.
concurrent.futuresモジュールの基礎はExectuorであり,Executorは抽象クラスであり,直接使用できない.しかし、スレッド・プールとプロセス・プールのコードを作成するために使用される2つのサブクラスThreadPoolExecutorとProcessPoolExecutorは非常に有用です.スレッドプール/プロセスプールに対応するtasksを直接入れることができます.Queueを維持してデッドロックの問題を心配する必要はありません.スレッドプール/プロセスプールは自動的にスケジューリングしてくれます.
concurrent.futuresモジュールは高度にカプセル化された非同期呼び出しインタフェースThreadPoolExecutor:スレッドプール、非同期呼び出しProcessPoolExecutor:プロセスプール、非同期呼び出しを提供する

きほんほうしき

●submit(fn, *args, **kwargs)
 

●map(func, *iterables, timeout=None, chunksize=1) 
 for submit 

●shutdown(wait=True) 
 pool.close()+pool.join() 
wait=True, 
wait=False, , 
 wait , 
submit map shutdown 

●result(timeout=None)
 

●add_done_callback(fn)
 

プロセスプール

#  
import os
import time
from concurrent.futures import  ProcessPoolExecutor

def task(n):
    print('%s is runing' % os.getpid())
    time.sleep(2)
    return n ** 2

if __name__ == '__main__':

    executor = ProcessPoolExecutor(max_workers=3) # cpu 
    futures = []
    for i in range(10):
        future = executor.submit(task, i)#submit() future , obj.result()
        futures.append(future)
    executor.shutdown() # True from multiprocessing import Pool close join 
    print('+++++++++++++++++++=>')
    print([obj.result() for obj in futures])

上記の方法は以下の方法にもなります
import os
import time
from concurrent.futures import  ProcessPoolExecutor

def task(n):
    print('%s is runing' % os.getpid())
    time.sleep(2)
    return n ** 2

if __name__ == '__main__':

    start = time.time()
    with ProcessPoolExecutor() as p:   # , .shutdown()
        future_tasks = [p.submit(task, i) for i in range(10)]
    print('=' * 30)
    print([obj.result() for obj in future_tasks])

スレッドプール


注意:
Windowsでプロセスプールを作成するにはif_name__ == '__main__':で、そうでないとエラーになります
理由:
これはWindows上のマルチプロセスの実装の問題です.Windowsでは、サブプロセスが自動的にimportでこのファイルを起動し、importのときにこれらの文が実行されます.このように書くと、サブプロセスエラーが無限に再帰的に作成されます.だがプロセスのソースコードでは,サブプロセスの再出産プロセスが制限されており,許可されていないため,このようなエラーメッセージが発生する.サブプロセスを作成した部分をそのif判定で保護しなければなりません.importの場合、nameはmainではなく、再帰的に実行されません.
import os
import time
import threading
from concurrent.futures import ThreadPoolExecutor


def task(n):
    print('%s:%s is running' % (threading.currentThread().getName(), os.getpid()))
    time.sleep(2)
    return n**2


if __name__ == '__main__':
    executor = ThreadPoolExecutor(max_workers = 6)  #  cpu *5
    futures = []
    start = time.time()
    for i in range(10):
        obj = executor.submit(task, i)
        futures.append(obj)
    executor.shutdown()
    print('=' * 30)
    print([obj.result() for obj in futures])
    print(time.time() - start)

上記の方法は以下の方法にもなります
import os
import time
import threading
from concurrent.futures import ThreadPoolExecutor


def task(n):
    print('%s:%s is running' % (threading.currentThread().getName(), os.getpid()))
    time.sleep(2)
    return n**2


if __name__ == '__main__':

    start = time.time()
    with ThreadPoolExecutor(max_workers=6) as executor:   # , .shutdown()
        future_tasks = [executor.submit(task, i) for i in range(10)]
    print('=' * 30)
    print([obj.result() for obj in future_tasks])
    print(time.time() - start)

コールバック関数

from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import requests
import os
import time
from threading import currentThread
def get_page(url):
    print('%s: is getting [%s]' %(currentThread().getName(),os.getpid(),url))
    response=requests.get(url)
    time.sleep(2)
    return {'url':url,'text':response.text}
def parse_page(res):  # res p.submit future , 
    res=res.result()  #res.result() 
    print('%s: parse [%s]' %(currentThread().getName(),os.getpid(),res['url']))
    with open('db.txt','a') as f:
        parse_res='url:%s size:%s
' %(res['url'],len(res['text'])) f.write(parse_res) if __name__ == '__main__': # p=ProcessPoolExecutor() p=ThreadPoolExecutor() urls = [ 'https://www.baidu.com', 'https://www.baidu.com', 'https://www.baidu.com', 'https://www.baidu.com', 'https://www.baidu.com', 'https://www.baidu.com', ] for url in urls: # multiprocessing.pool_obj.apply_async(get_page,args=(url,),callback=parse_page) p.submit(get_page, url).add_done_callback(parse_page) # , submit , .result p.shutdown() print(' ',os.getpid())

mapメソッド


内蔵関数mapとあまり差のない使い方で、この方法はmap(func,*iterables)反復器を返し、反復器のコールバックは戻りの結果を秩序正しく実行します.
以下はconcurrent.futuresモジュールの下のクラスThreadPoolExecutorとProcessPoolExecutorのインスタンス化されたオブジェクトのmapメソッドによるプロセスプール、スレッドプールの実装
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import ThreadPoolExecutor
import os
import time


def task(n):
    print('%s is running' % os.getpid())
    time.sleep(2)
    return n**2


if __name__ == '__main__':
    # executor=ProcessPoolExecutor()
    executor = ThreadPoolExecutor()
    start = time.time()
    obj = executor.map(task, range(10))
    executor.shutdown()
    print('=' * 30)
    print(list(obj))
    print(time.time() - start)

簡略化


1、with ThreadPoolExecutor()as executor:#ファイルを開くのと同じように、省くことができます.shutdown()
2、executor.map(task,range(1,12)#mapがfor+submitに取って代わった
with ProcessPoolExecutor(max_workers=10) as executor:
    executor.map(print_hello, range(10))

参照先:https://docs.python.org/zh-cn/dev/library/concurrent.futures.html