python-multiprocessingマルチプロセス並列計算

12470 ワード

pythonのmultiprocessingパケットは、標準ライブラリから提供されるマルチプロセス並列計算パケットであり、threading(マルチスレッド)に似たAPI関数を提供するが、threadingに比べてタスクを異なるCPUに割り当て、GIL(Global Interpreter Lock)の制限を回避する.次にmultiprocessingのPoolとProcessクラスについて紹介します.
Pool
Poolプロセスプールを使用すると、タスクの並列処理がより便利になります.並列CPUの数を指定すると、Poolは自動的にタスクをプロセスプールに配置して実行します.Poolには複数の並列関数が含まれている.
apply apply_async
applyはタスクを1つずつ実行し、python 3ではすでに破棄され、apply_asyncはapplyの非同期実行バージョンです.並列計算にはapply_を使用する必要があります.async関数.

import multiprocessing
import time

from random import randint, seed

def f(num):
    seed()
    rand_num = randint(0,10) #              
    time.sleep(rand_num)
    return (num, rand_num)

start_time = time.time()
cores = multiprocessing.cpu_count()
pool = multiprocessing.Pool(processes=cores)
pool_list = []
result_list = []
start_time = time.time()
for xx in xrange(10):
    pool_list.append(pool.apply_async(f, (xx, )))  #      get,      

result_list = [xx.get() for xx in pool_list]
#          ,        for       result.get() ?    pool.apply_async            ,   result.get()                       。   ,                     ,         。

#                :   
pool.close()
pool.join()

print result_list
print '       %.2f' % (time.time() - start_time)
print '       %.2f' % (sum([xx[1] for xx in  result_list]))

#[(0, 8), (1, 2), (2, 4), (3, 9), (4, 0), (5, 1), (6, 8), (7, 3), (8, 4), (9, 6)]
#       14.11
#       45.00

map map_async
map_asyncはmapの非同期実行関数である.apply_に比べてasync, map_asyncは1つのパラメータしか受け入れられません.

import time
from multiprocessing import Pool
def run(fn):
  #fn:               
  time.sleep(1)
  return fn*fn

if __name__ == "__main__":
  testFL = [1,2,3,4,5,6]  
  print '  :' #    (       ,   )
  s = time.time()
  for fn in testFL:
    run(fn)

  e1 = time.time()
  print "      :", int(e1 - s)

  print '  :' #      ,    
  pool = Pool(4)  #    5         
  #testFL:        ,run:  testFL        
  rl =pool.map(run, testFL) 
  pool.close()#     ,        
  pool.join()#             
  e2 = time.time()
  print "      :", int(e2-e1)
  print rl

#   :
#       : 6
#   :
#       : 2
# [1, 4, 9, 16, 25, 36]

Process
プロセスを作成するには、プロセスを作成するためにプロセスオブジェクトを使用する必要があります.各プロセスはCPUを占有するため、作成するプロセスはCPUの数以下でなければなりません.起動プロセスの数が多すぎる場合、特にCPUが密集しているタスクに遭遇すると、パラレルの効率が低下します.
#16.6.1.1. The Process class
from multiprocessing import Process, cpu_count
import os
import time

start_time = time.time()
def info(title):
#     print(title)
    if hasattr(os, 'getppid'):  # only available on Unix
        print 'parent process:', os.getppid()
    print 'process id:', os.getpid()
    time.sleep(3)

def f(name):
    info('function f')
    print 'hello', name

if __name__ == '__main__':
#     info('main line')
    p_list = [] #   Process     
    cpu_num = cpu_count()
    for xx in xrange(cpu_num):
        p_list.append(Process(target=f, args=('xx_%s' % xx,)))
    for xx in p_list:
        xx.start()

    for xx in p_list:
        xx.join()
    print('spend time: %.2f' % (time.time() - start_time))
parent process: 11741
# parent process: 11741
# parent process: 11741
# process id: 12249
# process id: 12250
# parent process: 11741
# process id: 12251
# process id: 12252
# hello xx_1
# hello xx_0
# hello xx_2
# hello xx_3
# spend time: 3.04

プロセス間通信
ProcessとPoolはいずれもQueuesとPipesの2種類の通信をサポートしている.
Queueキュー
キューは先進的な先出しの原則に従い、各プロセス間で使用できます.

# 16.6.1.2. Exchanging objects between processes
# Queues

from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print q.get()    # prints "[42, None, 'hello']"
    p.join()

pipe
from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print parent_conn.recv()   # prints "[42, None, 'hello']"
    p.join()

Queueとpipeの比較
  • Pipe() can only have two endpoints.
  • Queue() can have multiple producers and consumers. When to use them If you need more than two points to communicate, use a Queue(). If you need absolute performance, a Pipe() is much faster because Queue() is built on top of Pipe().

  • 参照先:https://stackoverflow.com/questions/8463008/python-multiprocessing-pipe-vs-queue
    共有リソース
    マルチプロセスは、リソースの共有を回避する必要があります.マルチスレッドでは、グローバル変数を使用したり、パラメータを渡したりするなど、リソースを比較的簡単に共有できます.マルチプロセスの場合、各プロセスには独自のメモリ領域があるため、上記の方法は適切ではありません.メモリとManagerを共有する方法でリソースを共有できます.しかし、これによりプログラムの複雑さが向上し、同期の必要性によりプログラムの効率が低下する.
    共有メモリ
    共有メモリはProcessクラスにのみ適用され、プロセスプールPoolには使用できません.
    # 16.6.1.4. Sharing state between processes
    # Shared memory
    from multiprocessing import Process, Value, Array
    
    def f(n, a):
        n.value = 3.1415927
        for i in range(len(a)):
            a[i] = -a[i]
    
    if __name__ == '__main__':
        num = Value('d', 0.0)
        arr = Array('i', range(10))
    
        p = Process(target=f, args=(num, arr))
        p.start()
        p.join()
    
        print num.value
        print arr[:]
    
    # 3.1415927
    # [0, -1, -2, -3, -4, -5, -6, -7, -8, -9]

    Manager Class
    Manager Classは、プロセスプールPoolでもプロセスプールでも使用できます.
    
    from multiprocessing import Manager, Process
    def f(d, l, ii):
        d[ii] = ii
        l.append(ii)
    
    if __name__ == '__main__':
        manager = Manager()
    
        d = manager.dict()
        l = manager.list(range(10))
        p_list = [] 
        for xx in range(4):
            p_list.append(Process(target=f, args=(d, l, xx)))
        for xx in p_list:
            xx.start()
    
        for xx in p_list:
            xx.join()
        print d
        print l
    # {0: 0, 1: 1, 2: 2, 3: 3}
    # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3]