Python標準モジュール--concurrent.futuresモジュール(ThreadPoolExecutor:スレッドプール、非同期呼び出し、ProcessPoolExecutor:プロセスプール、非同期呼び出し)
4600 ワード
目次
ProcessPoolExecutor:プロセスプール
ThreadPoolExecutor:スレッドプール
mapの使い方
コールバック関数
https://docs.python.org/dev/library/concurrent.futures.html
1.concurrent.futuresモジュールは高度にカプセル化された非同期呼び出しインターフェースThreadPoolExecutor:スレッドプール、非同期呼び出しProcessPoolExecutor:プロセスプール、非同期呼び出しBoth implement the same interface、which is defined by the abstract Executor classを提供する.
2基本方法●submit(fn,*args,**kwargs)非同期コミットタスク
●map(func,*iterables,timeout=None,chunksize=1)forサイクルsubmitの動作に取って代わる
●shutdown(wait=True)はプロセスプールのpoolに相当する.close()+pool.join()はwait=Trueを操作し、プール内のすべてのタスクの実行が完了してリソースが回収されるのを待ってからwait=Falseを継続し、すぐに戻ります.プール内のタスクの実行が完了するのを待つことはありませんが、waitパラメータの値にかかわらず、プログラム全体はすべてのタスクの実行が完了するまでsubmitとmapがshutdownの前に必要です.
●result(timeout=None)取得結果
●add_done_callback(fn)コールバック関数
ProcessPoolExecutor:プロセスプール
ThreadPoolExecutor:スレッドプール
mapの使い方
コールバック関数
ProcessPoolExecutor:プロセスプール
ThreadPoolExecutor:スレッドプール
mapの使い方
コールバック関数
https://docs.python.org/dev/library/concurrent.futures.html
1.concurrent.futuresモジュールは高度にカプセル化された非同期呼び出しインターフェースThreadPoolExecutor:スレッドプール、非同期呼び出しProcessPoolExecutor:プロセスプール、非同期呼び出しBoth implement the same interface、which is defined by the abstract Executor classを提供する.
2基本方法●submit(fn,*args,**kwargs)非同期コミットタスク
●map(func,*iterables,timeout=None,chunksize=1)forサイクルsubmitの動作に取って代わる
●shutdown(wait=True)はプロセスプールのpoolに相当する.close()+pool.join()はwait=Trueを操作し、プール内のすべてのタスクの実行が完了してリソースが回収されるのを待ってからwait=Falseを継続し、すぐに戻ります.プール内のタスクの実行が完了するのを待つことはありませんが、waitパラメータの値にかかわらず、プログラム全体はすべてのタスクの実行が完了するまでsubmitとmapがshutdownの前に必要です.
●result(timeout=None)取得結果
●add_done_callback(fn)コールバック関数
ProcessPoolExecutor:プロセスプール
#
The ProcessPoolExecutor class is an Executor subclass that uses a pool of processes to execute calls asynchronously. ProcessPoolExecutor uses the multiprocessing module, which allows it to side-step the Global Interpreter Lock but also means that only picklable objects can be executed and returned.
class concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None)
An Executor subclass that executes calls asynchronously using a pool of at most max_workers processes. If max_workers is None or not given, it will default to the number of processors on the machine. If max_workers is lower or equal to 0, then a ValueError will be raised.
#
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
import os,time,random
def task(n):
print('%s is runing' %os.getpid())
time.sleep(random.randint(1,3))
return n**2
if __name__ == '__main__':
executor=ProcessPoolExecutor(max_workers=3)
futures=[]
for i in range(11):
future=executor.submit(task,i)
futures.append(future)
executor.shutdown(True)
print('+++>')
for future in futures:
print(future.result())
ThreadPoolExecutor:スレッドプール
#
ThreadPoolExecutor is an Executor subclass that uses a pool of threads to execute calls asynchronously.
class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='')
An Executor subclass that uses a pool of at most max_workers threads to execute calls asynchronously.
Changed in version 3.5: If max_workers is None or not given, it will default to the number of processors on the machine, multiplied by 5, assuming that ThreadPoolExecutor is often used to overlap I/O instead of CPU work and the number of workers should be higher than the number of workers for ProcessPoolExecutor.
New in version 3.6: The thread_name_prefix argument was added to allow users to control the threading.Thread names for worker threads created by the pool for easier debugging.
#
ProcessPoolExecutor
ThreadPoolExecutor
mapの使い方
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
import os,time,random
def task(n):
print('%s is runing' %os.getpid())
time.sleep(random.randint(1,3))
return n**2
if __name__ == '__main__':
executor=ThreadPoolExecutor(max_workers=3)
# for i in range(11):
# future=executor.submit(task,i)
executor.map(task,range(1,12)) #map for+submit
コールバック関数
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
from multiprocessing import Pool
import requests
import json
import os
def get_page(url):
print(' get %s' %(os.getpid(),url))
respone=requests.get(url)
if respone.status_code == 200:
return {'url':url,'text':respone.text}
def parse_page(res):
res=res.result()
print(' parse %s' %(os.getpid(),res['url']))
parse_res='url: size:[%s]
' %(res['url'],len(res['text']))
with open('db.txt','a') as f:
f.write(parse_res)
if __name__ == '__main__':
urls=[
'https://www.baidu.com',
'https://www.python.org',
'https://www.openstack.org',
'https://help.github.com/',
'http://www.sina.com.cn/'
]
# p=Pool(3)
# for url in urls:
# p.apply_async(get_page,args=(url,),callback=pasrse_page)
# p.close()
# p.join()
p=ProcessPoolExecutor(3)
for url in urls:
p.submit(get_page,url).add_done_callback(parse_page) #parse_page future obj, obj.result()