concurrentを使用します.futuresモジュールの同時実行、プロセスプール、スレッドプールの実現
7309 ワード
concurrent.futures紹介
futureは、非同期で実行される操作を表すオブジェクトです.この概念はconcurrentです.futuresモジュールとasyncioパッケージの基礎.
Python標準ライブラリは、threadingモジュールとmultiprocessingモジュールが対応する非同期マルチスレッド/マルチプロセスコードを記述するために提供しています.
concurrent.futuresモジュールはPython 3.2導入のPython 2 xバージョンについてPython 2.5以上のバージョンではfuturesパッケージをインストールしてこのモジュールを使用し、コマンドpip install futuresを使用してインストールすればよい.
concurrent.futuresは主に2つのクラスを使用し、マルチスレッド:ThreadPoolExecutor
マルチプロセス:ProcessPoolExecutor
;この2つのクラスはいずれも抽象Executor
クラスのサブクラスであり,同じインタフェースを継承している.
Python 3から4から、標準ライブラリにはFutureのクラスが2つあります:concurrent.futures.Futureとasyncio.Future.この2つのクラスは同じ役割を果たします.2つのFutureクラスのインスタンスは、完了または未完了の可能性のある遅延計算を示します.
concurrent.futuresモジュールの基礎はExectuorであり,Executorは抽象クラスであり,直接使用できない.しかし、スレッド・プールとプロセス・プールのコードを作成するために使用される2つのサブクラスThreadPoolExecutorとProcessPoolExecutorは非常に有用です.スレッドプール/プロセスプールに対応するtasksを直接入れることができます.Queueを維持してデッドロックの問題を心配する必要はありません.スレッドプール/プロセスプールは自動的にスケジューリングしてくれます.
concurrent.futuresモジュールは高度にカプセル化された非同期呼び出しインタフェースThreadPoolExecutor:スレッドプール、非同期呼び出しProcessPoolExecutor:プロセスプール、非同期呼び出しを提供する
きほんほうしき ●submit(fn, *args, **kwargs)
●map(func, *iterables, timeout=None, chunksize=1)
for submit
●shutdown(wait=True)
pool.close()+pool.join()
wait=True,
wait=False, ,
wait ,
submit map shutdown
●result(timeout=None)
●add_done_callback(fn)
プロセスプール #
import os
import time
from concurrent.futures import ProcessPoolExecutor
def task(n):
print('%s is runing' % os.getpid())
time.sleep(2)
return n ** 2
if __name__ == '__main__':
executor = ProcessPoolExecutor(max_workers=3) # cpu
futures = []
for i in range(10):
future = executor.submit(task, i)#submit() future , obj.result()
futures.append(future)
executor.shutdown() # True from multiprocessing import Pool close join
print('+++++++++++++++++++=>')
print([obj.result() for obj in futures])
上記の方法は以下の方法にもなりますimport os
import time
from concurrent.futures import ProcessPoolExecutor
def task(n):
print('%s is runing' % os.getpid())
time.sleep(2)
return n ** 2
if __name__ == '__main__':
start = time.time()
with ProcessPoolExecutor() as p: # , .shutdown()
future_tasks = [p.submit(task, i) for i in range(10)]
print('=' * 30)
print([obj.result() for obj in future_tasks])
スレッドプール
注意:
Windowsでプロセスプールを作成するにはif_name__ == '__main__':で、そうでないとエラーになります
理由:
これはWindows上のマルチプロセスの実装の問題です.Windowsでは、サブプロセスが自動的にimportでこのファイルを起動し、importのときにこれらの文が実行されます.このように書くと、サブプロセスエラーが無限に再帰的に作成されます.だがプロセスのソースコードでは,サブプロセスの再出産プロセスが制限されており,許可されていないため,このようなエラーメッセージが発生する.サブプロセスを作成した部分をそのif判定で保護しなければなりません.importの場合、nameはmainではなく、再帰的に実行されません.import os
import time
import threading
from concurrent.futures import ThreadPoolExecutor
def task(n):
print('%s:%s is running' % (threading.currentThread().getName(), os.getpid()))
time.sleep(2)
return n**2
if __name__ == '__main__':
executor = ThreadPoolExecutor(max_workers = 6) # cpu *5
futures = []
start = time.time()
for i in range(10):
obj = executor.submit(task, i)
futures.append(obj)
executor.shutdown()
print('=' * 30)
print([obj.result() for obj in futures])
print(time.time() - start)
上記の方法は以下の方法にもなりますimport os
import time
import threading
from concurrent.futures import ThreadPoolExecutor
def task(n):
print('%s:%s is running' % (threading.currentThread().getName(), os.getpid()))
time.sleep(2)
return n**2
if __name__ == '__main__':
start = time.time()
with ThreadPoolExecutor(max_workers=6) as executor: # , .shutdown()
future_tasks = [executor.submit(task, i) for i in range(10)]
print('=' * 30)
print([obj.result() for obj in future_tasks])
print(time.time() - start)
コールバック関数 from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import requests
import os
import time
from threading import currentThread
def get_page(url):
print('%s: is getting [%s]' %(currentThread().getName(),os.getpid(),url))
response=requests.get(url)
time.sleep(2)
return {'url':url,'text':response.text}
def parse_page(res): # res p.submit future ,
res=res.result() #res.result()
print('%s: parse [%s]' %(currentThread().getName(),os.getpid(),res['url']))
with open('db.txt','a') as f:
parse_res='url:%s size:%s
' %(res['url'],len(res['text']))
f.write(parse_res)
if __name__ == '__main__':
# p=ProcessPoolExecutor()
p=ThreadPoolExecutor()
urls = [
'https://www.baidu.com',
'https://www.baidu.com',
'https://www.baidu.com',
'https://www.baidu.com',
'https://www.baidu.com',
'https://www.baidu.com',
]
for url in urls:
# multiprocessing.pool_obj.apply_async(get_page,args=(url,),callback=parse_page)
p.submit(get_page, url).add_done_callback(parse_page) # , submit , .result
p.shutdown()
print(' ',os.getpid())
mapメソッド
内蔵関数mapとあまり差のない使い方で、この方法はmap(func,*iterables)反復器を返し、反復器のコールバックは戻りの結果を秩序正しく実行します.
以下はconcurrent.futuresモジュールの下のクラスThreadPoolExecutorとProcessPoolExecutorのインスタンス化されたオブジェクトのmapメソッドによるプロセスプール、スレッドプールの実装from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import ThreadPoolExecutor
import os
import time
def task(n):
print('%s is running' % os.getpid())
time.sleep(2)
return n**2
if __name__ == '__main__':
# executor=ProcessPoolExecutor()
executor = ThreadPoolExecutor()
start = time.time()
obj = executor.map(task, range(10))
executor.shutdown()
print('=' * 30)
print(list(obj))
print(time.time() - start)
簡略化
1、with ThreadPoolExecutor()as executor:#ファイルを開くのと同じように、省くことができます.shutdown()
2、executor.map(task,range(1,12)#mapがfor+submitに取って代わったwith ProcessPoolExecutor(max_workers=10) as executor:
executor.map(print_hello, range(10))
参照先:https://docs.python.org/zh-cn/dev/library/concurrent.futures.html
●submit(fn, *args, **kwargs)
●map(func, *iterables, timeout=None, chunksize=1)
for submit
●shutdown(wait=True)
pool.close()+pool.join()
wait=True,
wait=False, ,
wait ,
submit map shutdown
●result(timeout=None)
●add_done_callback(fn)
プロセスプール #
import os
import time
from concurrent.futures import ProcessPoolExecutor
def task(n):
print('%s is runing' % os.getpid())
time.sleep(2)
return n ** 2
if __name__ == '__main__':
executor = ProcessPoolExecutor(max_workers=3) # cpu
futures = []
for i in range(10):
future = executor.submit(task, i)#submit() future , obj.result()
futures.append(future)
executor.shutdown() # True from multiprocessing import Pool close join
print('+++++++++++++++++++=>')
print([obj.result() for obj in futures])
上記の方法は以下の方法にもなりますimport os
import time
from concurrent.futures import ProcessPoolExecutor
def task(n):
print('%s is runing' % os.getpid())
time.sleep(2)
return n ** 2
if __name__ == '__main__':
start = time.time()
with ProcessPoolExecutor() as p: # , .shutdown()
future_tasks = [p.submit(task, i) for i in range(10)]
print('=' * 30)
print([obj.result() for obj in future_tasks])
スレッドプール
注意:
Windowsでプロセスプールを作成するにはif_name__ == '__main__':で、そうでないとエラーになります
理由:
これはWindows上のマルチプロセスの実装の問題です.Windowsでは、サブプロセスが自動的にimportでこのファイルを起動し、importのときにこれらの文が実行されます.このように書くと、サブプロセスエラーが無限に再帰的に作成されます.だがプロセスのソースコードでは,サブプロセスの再出産プロセスが制限されており,許可されていないため,このようなエラーメッセージが発生する.サブプロセスを作成した部分をそのif判定で保護しなければなりません.importの場合、nameはmainではなく、再帰的に実行されません.import os
import time
import threading
from concurrent.futures import ThreadPoolExecutor
def task(n):
print('%s:%s is running' % (threading.currentThread().getName(), os.getpid()))
time.sleep(2)
return n**2
if __name__ == '__main__':
executor = ThreadPoolExecutor(max_workers = 6) # cpu *5
futures = []
start = time.time()
for i in range(10):
obj = executor.submit(task, i)
futures.append(obj)
executor.shutdown()
print('=' * 30)
print([obj.result() for obj in futures])
print(time.time() - start)
上記の方法は以下の方法にもなりますimport os
import time
import threading
from concurrent.futures import ThreadPoolExecutor
def task(n):
print('%s:%s is running' % (threading.currentThread().getName(), os.getpid()))
time.sleep(2)
return n**2
if __name__ == '__main__':
start = time.time()
with ThreadPoolExecutor(max_workers=6) as executor: # , .shutdown()
future_tasks = [executor.submit(task, i) for i in range(10)]
print('=' * 30)
print([obj.result() for obj in future_tasks])
print(time.time() - start)
コールバック関数 from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import requests
import os
import time
from threading import currentThread
def get_page(url):
print('%s: is getting [%s]' %(currentThread().getName(),os.getpid(),url))
response=requests.get(url)
time.sleep(2)
return {'url':url,'text':response.text}
def parse_page(res): # res p.submit future ,
res=res.result() #res.result()
print('%s: parse [%s]' %(currentThread().getName(),os.getpid(),res['url']))
with open('db.txt','a') as f:
parse_res='url:%s size:%s
' %(res['url'],len(res['text']))
f.write(parse_res)
if __name__ == '__main__':
# p=ProcessPoolExecutor()
p=ThreadPoolExecutor()
urls = [
'https://www.baidu.com',
'https://www.baidu.com',
'https://www.baidu.com',
'https://www.baidu.com',
'https://www.baidu.com',
'https://www.baidu.com',
]
for url in urls:
# multiprocessing.pool_obj.apply_async(get_page,args=(url,),callback=parse_page)
p.submit(get_page, url).add_done_callback(parse_page) # , submit , .result
p.shutdown()
print(' ',os.getpid())
mapメソッド
内蔵関数mapとあまり差のない使い方で、この方法はmap(func,*iterables)反復器を返し、反復器のコールバックは戻りの結果を秩序正しく実行します.
以下はconcurrent.futuresモジュールの下のクラスThreadPoolExecutorとProcessPoolExecutorのインスタンス化されたオブジェクトのmapメソッドによるプロセスプール、スレッドプールの実装from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import ThreadPoolExecutor
import os
import time
def task(n):
print('%s is running' % os.getpid())
time.sleep(2)
return n**2
if __name__ == '__main__':
# executor=ProcessPoolExecutor()
executor = ThreadPoolExecutor()
start = time.time()
obj = executor.map(task, range(10))
executor.shutdown()
print('=' * 30)
print(list(obj))
print(time.time() - start)
簡略化
1、with ThreadPoolExecutor()as executor:#ファイルを開くのと同じように、省くことができます.shutdown()
2、executor.map(task,range(1,12)#mapがfor+submitに取って代わったwith ProcessPoolExecutor(max_workers=10) as executor:
executor.map(print_hello, range(10))
参照先:https://docs.python.org/zh-cn/dev/library/concurrent.futures.html
#
import os
import time
from concurrent.futures import ProcessPoolExecutor
def task(n):
print('%s is runing' % os.getpid())
time.sleep(2)
return n ** 2
if __name__ == '__main__':
executor = ProcessPoolExecutor(max_workers=3) # cpu
futures = []
for i in range(10):
future = executor.submit(task, i)#submit() future , obj.result()
futures.append(future)
executor.shutdown() # True from multiprocessing import Pool close join
print('+++++++++++++++++++=>')
print([obj.result() for obj in futures])
import os
import time
from concurrent.futures import ProcessPoolExecutor
def task(n):
print('%s is runing' % os.getpid())
time.sleep(2)
return n ** 2
if __name__ == '__main__':
start = time.time()
with ProcessPoolExecutor() as p: # , .shutdown()
future_tasks = [p.submit(task, i) for i in range(10)]
print('=' * 30)
print([obj.result() for obj in future_tasks])
注意:
Windowsでプロセスプールを作成するにはif_name__ == '__main__':で、そうでないとエラーになります
理由:
これはWindows上のマルチプロセスの実装の問題です.Windowsでは、サブプロセスが自動的にimportでこのファイルを起動し、importのときにこれらの文が実行されます.このように書くと、サブプロセスエラーが無限に再帰的に作成されます.だがプロセスのソースコードでは,サブプロセスの再出産プロセスが制限されており,許可されていないため,このようなエラーメッセージが発生する.サブプロセスを作成した部分をそのif判定で保護しなければなりません.importの場合、nameはmainではなく、再帰的に実行されません.
import os
import time
import threading
from concurrent.futures import ThreadPoolExecutor
def task(n):
print('%s:%s is running' % (threading.currentThread().getName(), os.getpid()))
time.sleep(2)
return n**2
if __name__ == '__main__':
executor = ThreadPoolExecutor(max_workers = 6) # cpu *5
futures = []
start = time.time()
for i in range(10):
obj = executor.submit(task, i)
futures.append(obj)
executor.shutdown()
print('=' * 30)
print([obj.result() for obj in futures])
print(time.time() - start)
上記の方法は以下の方法にもなります
import os
import time
import threading
from concurrent.futures import ThreadPoolExecutor
def task(n):
print('%s:%s is running' % (threading.currentThread().getName(), os.getpid()))
time.sleep(2)
return n**2
if __name__ == '__main__':
start = time.time()
with ThreadPoolExecutor(max_workers=6) as executor: # , .shutdown()
future_tasks = [executor.submit(task, i) for i in range(10)]
print('=' * 30)
print([obj.result() for obj in future_tasks])
print(time.time() - start)
コールバック関数 from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import requests
import os
import time
from threading import currentThread
def get_page(url):
print('%s: is getting [%s]' %(currentThread().getName(),os.getpid(),url))
response=requests.get(url)
time.sleep(2)
return {'url':url,'text':response.text}
def parse_page(res): # res p.submit future ,
res=res.result() #res.result()
print('%s: parse [%s]' %(currentThread().getName(),os.getpid(),res['url']))
with open('db.txt','a') as f:
parse_res='url:%s size:%s
' %(res['url'],len(res['text']))
f.write(parse_res)
if __name__ == '__main__':
# p=ProcessPoolExecutor()
p=ThreadPoolExecutor()
urls = [
'https://www.baidu.com',
'https://www.baidu.com',
'https://www.baidu.com',
'https://www.baidu.com',
'https://www.baidu.com',
'https://www.baidu.com',
]
for url in urls:
# multiprocessing.pool_obj.apply_async(get_page,args=(url,),callback=parse_page)
p.submit(get_page, url).add_done_callback(parse_page) # , submit , .result
p.shutdown()
print(' ',os.getpid())
mapメソッド
内蔵関数mapとあまり差のない使い方で、この方法はmap(func,*iterables)反復器を返し、反復器のコールバックは戻りの結果を秩序正しく実行します.
以下はconcurrent.futuresモジュールの下のクラスThreadPoolExecutorとProcessPoolExecutorのインスタンス化されたオブジェクトのmapメソッドによるプロセスプール、スレッドプールの実装from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import ThreadPoolExecutor
import os
import time
def task(n):
print('%s is running' % os.getpid())
time.sleep(2)
return n**2
if __name__ == '__main__':
# executor=ProcessPoolExecutor()
executor = ThreadPoolExecutor()
start = time.time()
obj = executor.map(task, range(10))
executor.shutdown()
print('=' * 30)
print(list(obj))
print(time.time() - start)
簡略化
1、with ThreadPoolExecutor()as executor:#ファイルを開くのと同じように、省くことができます.shutdown()
2、executor.map(task,range(1,12)#mapがfor+submitに取って代わったwith ProcessPoolExecutor(max_workers=10) as executor:
executor.map(print_hello, range(10))
参照先:https://docs.python.org/zh-cn/dev/library/concurrent.futures.html
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import requests
import os
import time
from threading import currentThread
def get_page(url):
print('%s: is getting [%s]' %(currentThread().getName(),os.getpid(),url))
response=requests.get(url)
time.sleep(2)
return {'url':url,'text':response.text}
def parse_page(res): # res p.submit future ,
res=res.result() #res.result()
print('%s: parse [%s]' %(currentThread().getName(),os.getpid(),res['url']))
with open('db.txt','a') as f:
parse_res='url:%s size:%s
' %(res['url'],len(res['text']))
f.write(parse_res)
if __name__ == '__main__':
# p=ProcessPoolExecutor()
p=ThreadPoolExecutor()
urls = [
'https://www.baidu.com',
'https://www.baidu.com',
'https://www.baidu.com',
'https://www.baidu.com',
'https://www.baidu.com',
'https://www.baidu.com',
]
for url in urls:
# multiprocessing.pool_obj.apply_async(get_page,args=(url,),callback=parse_page)
p.submit(get_page, url).add_done_callback(parse_page) # , submit , .result
p.shutdown()
print(' ',os.getpid())
内蔵関数mapとあまり差のない使い方で、この方法はmap(func,*iterables)反復器を返し、反復器のコールバックは戻りの結果を秩序正しく実行します.
以下はconcurrent.futuresモジュールの下のクラスThreadPoolExecutorとProcessPoolExecutorのインスタンス化されたオブジェクトのmapメソッドによるプロセスプール、スレッドプールの実装
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import ThreadPoolExecutor
import os
import time
def task(n):
print('%s is running' % os.getpid())
time.sleep(2)
return n**2
if __name__ == '__main__':
# executor=ProcessPoolExecutor()
executor = ThreadPoolExecutor()
start = time.time()
obj = executor.map(task, range(10))
executor.shutdown()
print('=' * 30)
print(list(obj))
print(time.time() - start)
簡略化
1、with ThreadPoolExecutor()as executor:#ファイルを開くのと同じように、省くことができます.shutdown()
2、executor.map(task,range(1,12)#mapがfor+submitに取って代わったwith ProcessPoolExecutor(max_workers=10) as executor:
executor.map(print_hello, range(10))
参照先:https://docs.python.org/zh-cn/dev/library/concurrent.futures.html
with ProcessPoolExecutor(max_workers=10) as executor:
executor.map(print_hello, range(10))