Pythonの同時プログラミングを理解するだけで十分です|プロセス編

312402 ワード

前節ではPythonの同時プログラミングを理解すれば十分です.スレッド編では、いくつかのスレッドの技術について説明しました.この節では、プロセスについて説明します.
前節ではGIL(グローバル解釈ロック)の問題でマルチスレッドはマルチコアプロセッサを十分に利用できないと述べ,CPU計算型のタスクであればマルチプロセスモジュールmultiprocessingを用いるべきである.スレッド・ライブラリとはまったく異なりますが、2つのライブラリの構文とインタフェースは非常に似ています.Multiprocessingは各プロセスに個別のPython解釈器を与え,グローバル解釈ロックによる問題を回避する.しかし、あまり早く喜ばないでください.これからお話しするマルチプロセス間でどのように通信するかという問題に直面するからです.
まず、前節の例を単一プロセスとマルチプロセスに変更して、パフォーマンスを比較します.
   
   
   
   
  1. import time

  2. import multiprocessing

  3. def profile(func):

  4.    def wrapper(*args, **kwargs):

  5.        import time

  6.        start = time.time()

  7.        func(*args, **kwargs)

  8.        end   = time.time()

  9.        print 'COST: {}'.format(end - start)

  10.    return wrapper

  11. def fib(n):

  12.    if n<= 2:

  13.        return 1

  14.    return fib(n-1) + fib(n-2)

  15. @profile

  16. def nomultiprocess():

  17.    fib(35)

  18.    fib(35)

  19. @profile

  20. def hasmultiprocess():

  21.    jobs = []

  22.    for i in range(2):

  23.        p = multiprocessing.Process(target=fib, args=(35,))

  24.        p.start()

  25.        jobs.append(p)

  26.    for p in jobs:

  27.        p.join()

  28. nomultiprocess()

  29. hasmultiprocess()

   
   
   
   
  1. python profile_process.py

  2. COST: 4.66861510277

  3. COST: 2.5424861908

, , 2 fib(35), 。 ,GIL 。

: CPU 。 hasmultiprocess , Pool, :

   
   
   
   
  1. from multiprocessing import Pool

  2. pool = Pool(2)

  3. pool.map(fib, [35] * 2)

map map , 。

PS: , multiprocessing.Pool Python MapReduce。 。

dummy

/ , , , 。 , multiprocessing.dummy ,「dummy」 「 」 , , 。 :

multiprocessing.dummy replicates the API of multiprocess ing but is no more than a wrapper around the threading module.

!!! CPU IO , 2 :

   
   
   
   
  1. from multiprocessing import Pool

  2. from multiprocessing.dummy import Pool

。 , / 。

: CPU I/O , , 。

Pipe parmap

(IPC) rpc、socket、pipe( ) (queue)。 3 。 , :

   
   
   
   
  1. from multiprocessing import Process, Pipe

  2. def f(conn):

  3.    conn.send(['hello'])

  4.    conn.close()

  5. parent_conn, child_conn = Pipe()

  6. p = Process(target=f, args=(child_conn,))

  7. p.start()

  8. print parent_conn.recv()

  9. p.join()

Pipe 2 :「 」 「 」。 hello , ,  parent_conn.recv()が されます.これにより、Python のデータ をマルチプロセス で することが に できます.しかし、xmlrpclibにシーケンス できないオブジェクトは、このように できないことを に する.
の で べたhasmultiprocess はPoolのmapメソッドを いており,まあまあだ.しかし、 のビジネスでは、 の のように です.

   
   
   
   
  1. class CalculateFib(object):

  2.    @classmethod

  3.    def fib(cls, n):

  4.        if n<= 2:

  5.            return 1

  6.        return cls.fib(n-1) + cls.fib(n-2)

  7.    def map_run(self):

  8.        pool = Pool(2)

  9.        print pool.map(self.fib, [35] * 2)

  10. cl = CalculateFib()

  11. cl.map_run()

fib , :

   
   
   
   
  1. python parmap.py

  2. Exception in thread Thread-1:

  3. Traceback (most recent call last):

  4.  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py", line 810, in __bootstrap_inner

  5.    self.run()

  6.  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/threading.py", line 763, in run

  7.    self.__target(*self.__args, **self.__kwargs)

  8.  File "/Library/Python/2.7/site-packages/multiprocessing-2.6.2.1-py2.7-macosx-10.9-intel.egg/multiprocessing/pool.py", line 225, in _handle_tasks

  9.    put(task)

  10. PicklingError: Can't pickle instancemethod'>: attribute lookup __builtin__.instancemethod failed

, 。 。 :

   
   
   
   
  1. from multiprocessing import Pool, Process, Pipe

  2. from itertools import izip

  3. def spawn(f):

  4.    def func(pipe, item):

  5.        pipe.send(f(item))

  6.        pipe.close()

  7.    return func

  8. def parmap(f, items):

  9.    pipe = [Pipe() for _ in items]

  10.    proc = [Process(target=spawn(f),

  11.                    args=(child, item))

  12.            for item, (parent, child) in izip(items, pipe)]

  13.    [p.start() for p in proc]

  14.    [p.join() for p in proc]

  15.    return [parent.recv() for (parent, child) in pipe]

  16. class CalculateFib(object):

  17.    ...

  18.    def parmap_run(self):

  19.        print parmap(self.fib, [35] * 2)

  20. cl = CalculateFib()

  21. cl.parmap_run()

parmap ( fib(35)) , , 。

。 , , , 。

Queue , Queue , 。 / , 2 : , :

   
   
   
   
  1. import time

  2. from multiprocessing import Process, JoinableQueue, Queue

  3. from random import random

  4. tasks_queue = JoinableQueue()

  5. results_queue = Queue()

  6. def double(n):

  7.    return n * 2

  8. def producer(in_queue):

  9.    while 1:

  10.        wt = random()

  11.        time.sleep(wt)

  12.        in_queue.put((double, wt))

  13.        if wt > 0.9:

  14.            in_queue.put(None)

  15.            print 'stop producer'

  16.            break

  17. def consumer(in_queue, out_queue):

  18.    while 1:

  19.        task = in_queue.get()

  20.        if task is None:

  21.            break

  22.        func, arg = task

  23.        result = func(arg)

  24.        in_queue.task_done()

  25.        out_queue.put(result)

  26. processes = []

  27. p = Process(target=producer, args=(tasks_queue,))

  28. p.start()

  29. processes.append(p)

  30. p = Process(target=consumer, args=(tasks_queue, results_queue))

  31. p.start()

  32. processes.append(p)

  33. tasks_queue.join()

  34. for p in processes:

  35.    p.join()

  36. while 1:

  37.    if results_queue.empty():

  38.        break

  39.    result = results_queue.get()

  40.    print 'Result:', result

, :

  1. , 0.9 tasks_queue put None,

  2. None , , tasks_queue , put results_queue

  3. ( join ), results_queue

Queue task_done join , JoinableQueue, results_queue Queue 。

CalculateFib , parmap , :

   
   
   
   
  1. from multiprocessing import Queue, Process, cpu_count

  2. def apply_func(f, q_in, q_out):

  3.    while not q_in.empty():

  4.        i, item = q_in.get()

  5.        q_out.put((i, f(item)))

  6. def parmap(f, items, nprocs = cpu_count()):

  7.    q_in, q_out = Queue(), Queue()

  8.    proc = [Process(target=apply_func, args=(f, q_in, q_out))

  9.            for _ in range(nprocs)]

  10.    sent = [q_in.put((i, item)) for i, item in enumerate(items)]

  11.    [p.start() for p in proc]

  12.    res = [q_out.get() for _ in sent]

  13.    [p.join() for p in proc]

  14.    return [item for _, item in sorted(res)]

enumerate , 。

multiprocessing Lock、Condition、Event、RLock、Semaphore threading , , , 。

multiprocessing 2 :

Value Array 。 :

   
   
   
   
  1. In : from multiprocessing.sharedctypes import typecode_to_type

  2. In : typecode_to_type

  3. Out:

  4. {'B': ctypes.c_ubyte,

  5. 'H': ctypes.c_ushort,

  6. 'I': ctypes.c_uint,

  7. 'L': ctypes.c_ulong,

  8. 'b': ctypes.c_byte,

  9. 'c': ctypes.c_char,

  10. 'd': ctypes.c_double,

  11. 'f': ctypes.c_float,

  12. 'h': ctypes.c_short,

  13. 'i': ctypes.c_int,

  14. 'l': ctypes.c_long,

  15. 'u': ctypes.c_wchar}

Value Array lock , RLock。

   
   
   
   
  1. from multiprocessing import Process, Lock

  2. from multiprocessing.sharedctypes import Value, Array

  3. from ctypes import Structure, c_bool, c_double

  4. lock = Lock()

  5. class Point(Structure):

  6.    _fields_ = [('x', c_double), ('y', c_double)]

  7. def modify(n, b, s, arr, A):

  8.    n.value **= 2

  9.    b.value = True

  10.    s.value = s.value.upper()

  11.    arr[0] = 10

  12.    for a in A:

  13.        a.x **= 2

  14.        a.y **= 2

  15. n = Value('i', 7)

  16. b = Value(c_bool, False, lock=False)

  17. s = Array('c', 'hello world', lock=lock)

  18. arr = Array('i', range(5), lock=True)

  19. A = Array(Point, [(1.875, -6.25), (-5.75, 2.0)], lock=lock)

  20. p = Process(target=modify, args=(n, b, s, arr, A))

  21. p.start()

  22. p.join()

  23. print n.value

  24. print b.value

  25. print s.value

  26. print arr[:]

  27. print [(a.x, a.y) for a in A]

。 2 :

  1. typecode_to_type , ctypes 。

  2. arr int , array list , SynchronizedArray , , append/extend 。

   
   
   
   
  1. python shared_memory.py

  2. 49

  3. True

  4. HELLO WORLD

  5. [10, 1, 2, 3, 4]

  6. [(3.515625, 39.0625), (33.0625, 4.0)]

multiprocessing.Manager , 。 :

  1. Namespace。 。

  2. Value/Array。 ctypes 。

  3. dict/list。 dict/list, 。

  4. Condition/Event/Lock/Queue/Semaphore。 。

   
   
   
   
  1. from multiprocessing import Manager, Process

  2. def modify(ns, lproxy, dproxy):

  3.    ns.a **= 2

  4.    lproxy.extend(['b', 'c'])

  5.    dproxy['b'] = 0

  6. manager = Manager()

  7. ns = manager.Namespace()

  8. ns.a = 1

  9. lproxy = manager.list()

  10. lproxy.append('a')

  11. dproxy = manager.dict()

  12. dproxy['b'] = 2

  13. p = Process(target=modify, args=(ns, lproxy, dproxy))

  14. p.start()

  15. print 'PID:', p.pid

  16. p.join()

  17. print ns.a

  18. print lproxy

  19. print dproxy

id 8341 :

   
   
   
   
  1. python manager.py

  2. PID: 8341

  3. 1

  4. ['a', 'b', 'c']

  5. {'b': 0}

, Manager Queue (C/S )。

   
   
   
   
  1. from multiprocessing.managers import BaseManager

  2. host = '127.0.0.1'

  3. port = 9030

  4. authkey = 'secret'

  5. shared_list = []

  6. class RemoteManager(BaseManager):

  7.    pass

  8. RemoteManager.register('get_list', callable=lambda: shared_list)

  9. mgr = RemoteManager(address=(host, port), authkey=authkey)

  10. server = mgr.get_server()

  11. server.serve_forever()

shared_list , :

   
   
   
   
  1. from multiprocessing.managers import BaseManager

  2. host = '127.0.0.1'

  3. port = 9030

  4. authkey = 'secret'

  5. class RemoteManager(BaseManager):

  6.    pass

  7. RemoteManager.register('get_list')

  8. mgr = RemoteManager(address=(host, port), authkey=authkey)

  9. mgr.connect()

  10. l = mgr.get_list()

  11. print l

  12. l.append(1)

  13. print mgr.get_list()

, client callable 。

PS