python-multiprocessingマルチプロセス並列計算
pythonのmultiprocessingパケットは、標準ライブラリから提供されるマルチプロセス並列計算パケットであり、threading(マルチスレッド)に似たAPI関数を提供するが、threadingに比べてタスクを異なるCPUに割り当て、GIL(Global Interpreter Lock)の制限を回避する.次にmultiprocessingのPoolとProcessクラスについて紹介します.
Pool
Poolプロセスプールを使用すると、タスクの並列処理がより便利になります.並列CPUの数を指定すると、Poolは自動的にタスクをプロセスプールに配置して実行します.Poolには複数の並列関数が含まれている.
apply apply_async
applyはタスクを1つずつ実行し、python 3ではすでに破棄され、apply_asyncはapplyの非同期実行バージョンです.並列計算にはapply_を使用する必要があります.async関数.
map map_async
map_asyncはmapの非同期実行関数である.apply_に比べてasync, map_asyncは1つのパラメータしか受け入れられません.
Process
プロセスを作成するには、プロセスを作成するためにプロセスオブジェクトを使用する必要があります.各プロセスはCPUを占有するため、作成するプロセスはCPUの数以下でなければなりません.起動プロセスの数が多すぎる場合、特にCPUが密集しているタスクに遭遇すると、パラレルの効率が低下します.
プロセス間通信
ProcessとPoolはいずれもQueuesとPipesの2種類の通信をサポートしている.
Queueキュー
キューは先進的な先出しの原則に従い、各プロセス間で使用できます.
pipe
Queueとpipeの比較 Pipe() can only have two endpoints. Queue() can have multiple producers and consumers. When to use them If you need more than two points to communicate, use a Queue(). If you need absolute performance, a Pipe() is much faster because Queue() is built on top of Pipe().
参照先:https://stackoverflow.com/questions/8463008/python-multiprocessing-pipe-vs-queue
共有リソース
マルチプロセスは、リソースの共有を回避する必要があります.マルチスレッドでは、グローバル変数を使用したり、パラメータを渡したりするなど、リソースを比較的簡単に共有できます.マルチプロセスの場合、各プロセスには独自のメモリ領域があるため、上記の方法は適切ではありません.メモリとManagerを共有する方法でリソースを共有できます.しかし、これによりプログラムの複雑さが向上し、同期の必要性によりプログラムの効率が低下する.
共有メモリ
共有メモリはProcessクラスにのみ適用され、プロセスプールPoolには使用できません.
Manager Class
Manager Classは、プロセスプールPoolでもプロセスプールでも使用できます.
Pool
Poolプロセスプールを使用すると、タスクの並列処理がより便利になります.並列CPUの数を指定すると、Poolは自動的にタスクをプロセスプールに配置して実行します.Poolには複数の並列関数が含まれている.
apply apply_async
applyはタスクを1つずつ実行し、python 3ではすでに破棄され、apply_asyncはapplyの非同期実行バージョンです.並列計算にはapply_を使用する必要があります.async関数.
import multiprocessing
import time
from random import randint, seed
def f(num):
seed()
rand_num = randint(0,10) #
time.sleep(rand_num)
return (num, rand_num)
start_time = time.time()
cores = multiprocessing.cpu_count()
pool = multiprocessing.Pool(processes=cores)
pool_list = []
result_list = []
start_time = time.time()
for xx in xrange(10):
pool_list.append(pool.apply_async(f, (xx, ))) # get,
result_list = [xx.get() for xx in pool_list]
# , for result.get() ? pool.apply_async , result.get() 。 , , 。
# :
pool.close()
pool.join()
print result_list
print ' %.2f' % (time.time() - start_time)
print ' %.2f' % (sum([xx[1] for xx in result_list]))
#[(0, 8), (1, 2), (2, 4), (3, 9), (4, 0), (5, 1), (6, 8), (7, 3), (8, 4), (9, 6)]
# 14.11
# 45.00
map map_async
map_asyncはmapの非同期実行関数である.apply_に比べてasync, map_asyncは1つのパラメータしか受け入れられません.
import time
from multiprocessing import Pool
def run(fn):
#fn:
time.sleep(1)
return fn*fn
if __name__ == "__main__":
testFL = [1,2,3,4,5,6]
print ' :' # ( , )
s = time.time()
for fn in testFL:
run(fn)
e1 = time.time()
print " :", int(e1 - s)
print ' :' # ,
pool = Pool(4) # 5
#testFL: ,run: testFL
rl =pool.map(run, testFL)
pool.close()# ,
pool.join()#
e2 = time.time()
print " :", int(e2-e1)
print rl
# :
# : 6
# :
# : 2
# [1, 4, 9, 16, 25, 36]
Process
プロセスを作成するには、プロセスを作成するためにプロセスオブジェクトを使用する必要があります.各プロセスはCPUを占有するため、作成するプロセスはCPUの数以下でなければなりません.起動プロセスの数が多すぎる場合、特にCPUが密集しているタスクに遭遇すると、パラレルの効率が低下します.
#16.6.1.1. The Process class
from multiprocessing import Process, cpu_count
import os
import time
start_time = time.time()
def info(title):
# print(title)
if hasattr(os, 'getppid'): # only available on Unix
print 'parent process:', os.getppid()
print 'process id:', os.getpid()
time.sleep(3)
def f(name):
info('function f')
print 'hello', name
if __name__ == '__main__':
# info('main line')
p_list = [] # Process
cpu_num = cpu_count()
for xx in xrange(cpu_num):
p_list.append(Process(target=f, args=('xx_%s' % xx,)))
for xx in p_list:
xx.start()
for xx in p_list:
xx.join()
print('spend time: %.2f' % (time.time() - start_time))
parent process: 11741
# parent process: 11741
# parent process: 11741
# process id: 12249
# process id: 12250
# parent process: 11741
# process id: 12251
# process id: 12252
# hello xx_1
# hello xx_0
# hello xx_2
# hello xx_3
# spend time: 3.04
プロセス間通信
ProcessとPoolはいずれもQueuesとPipesの2種類の通信をサポートしている.
Queueキュー
キューは先進的な先出しの原則に従い、各プロセス間で使用できます.
# 16.6.1.2. Exchanging objects between processes
# Queues
from multiprocessing import Process, Queue
def f(q):
q.put([42, None, 'hello'])
if __name__ == '__main__':
q = Queue()
p = Process(target=f, args=(q,))
p.start()
print q.get() # prints "[42, None, 'hello']"
p.join()
pipe
from multiprocessing import Process, Pipe
def f(conn):
conn.send([42, None, 'hello'])
conn.close()
if __name__ == '__main__':
parent_conn, child_conn = Pipe()
p = Process(target=f, args=(child_conn,))
p.start()
print parent_conn.recv() # prints "[42, None, 'hello']"
p.join()
Queueとpipeの比較
参照先:https://stackoverflow.com/questions/8463008/python-multiprocessing-pipe-vs-queue
共有リソース
マルチプロセスは、リソースの共有を回避する必要があります.マルチスレッドでは、グローバル変数を使用したり、パラメータを渡したりするなど、リソースを比較的簡単に共有できます.マルチプロセスの場合、各プロセスには独自のメモリ領域があるため、上記の方法は適切ではありません.メモリとManagerを共有する方法でリソースを共有できます.しかし、これによりプログラムの複雑さが向上し、同期の必要性によりプログラムの効率が低下する.
共有メモリ
共有メモリはProcessクラスにのみ適用され、プロセスプールPoolには使用できません.
# 16.6.1.4. Sharing state between processes
# Shared memory
from multiprocessing import Process, Value, Array
def f(n, a):
n.value = 3.1415927
for i in range(len(a)):
a[i] = -a[i]
if __name__ == '__main__':
num = Value('d', 0.0)
arr = Array('i', range(10))
p = Process(target=f, args=(num, arr))
p.start()
p.join()
print num.value
print arr[:]
# 3.1415927
# [0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
Manager Class
Manager Classは、プロセスプールPoolでもプロセスプールでも使用できます.
from multiprocessing import Manager, Process
def f(d, l, ii):
d[ii] = ii
l.append(ii)
if __name__ == '__main__':
manager = Manager()
d = manager.dict()
l = manager.list(range(10))
p_list = []
for xx in range(4):
p_list.append(Process(target=f, args=(d, l, xx)))
for xx in p_list:
xx.start()
for xx in p_list:
xx.join()
print d
print l
# {0: 0, 1: 1, 2: 2, 3: 3}
# [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3]