マルチプロセス(二)-信号伝達とプロセス制御

14088 ワード

コンテンツディレクトリ:

  • multiprocessing.Queue()
  • JoinableQueue
  • プロセス間の信号伝達Event
  • リソースへのアクセス制御ロック
  • 同期動作Condition
  • リソースへの同時アクセス制御Semaphore
  • 管理共有状態Manager
  • 共有ネーミングスペースmgr.Namespace()
  • プロセスプールPool

  • 1. multiprocessing.Queue()


    スレッドと同様に、マルチプロセスの一般的な使用パターンは、1つのタスクを複数のworkerに分割して並列に実行することです.マルチプロセスを効率的に使用するには、通常、それらの間のいくつかの通信が必要であり、これにより、作業が分割され、結果が集約されることができる.1つの簡単な方法は、キューmultiprocessing.Queue()を使用してメッセージを往復伝達することである.pickleでシーケンス化できる任意のオブジェクトは、キューを通過できます.
    import multiprocessing
    
    
    class MyFancyClass:
    
        def __init__(self, name):
            self.name = name
    
        def do_something(self):
            proc_name = multiprocessing.current_process().name
            print('Doing something fancy in {} for {}!'.format(
                proc_name, self.name))
    
    
    def worker(q):
        obj = q.get()
        obj.do_something()
    
    
    
    if __name__ == '__main__':
        queue = multiprocessing.Queue()
    
        p = multiprocessing.Process(target=worker, args=(queue,))
        p.start()
    
        queue.put(MyFancyClass('Fancy Dan'))
    
        # Wait for the worker to finish
        queue.close()
        queue.join_thread()
        p.join()
    

    結果:qが空の場合、q.get()が待ちます.
    Doing something fancy in Process-1 for Fancy Da
    

    2. JoinableQueue


    JoinableQueueのインスタンスpは、Queueオブジェクトと同じ方法の他に、以下の方法を有する.
  • q.task_done():ユーザはこの方法を用いてq.get()の戻り項目が処理されたことを示す信号を送信する.このメソッドを呼び出す回数がキューから削除するアイテムの数より大きい場合、ValueError例外
  • が発生します.
  • q.join():生産者は、キュー内のすべてのアイテムが処理されるまで、このメソッドをブロックするように呼び出します.ブロックがキュー内に続く各アイテムはq.task_を呼び出すdone()メソッドまで
  • Noneという特殊な値を使用してWorkerを終了するかどうかを判断します
    import multiprocessing
    import time
    
    
    class Consumer(multiprocessing.Process):
    
        def __init__(self, task_queue, result_queue):
            multiprocessing.Process.__init__(self)
            self.task_queue = task_queue
            self.result_queue = result_queue
    
        def run(self):
            proc_name = self.name
            while True:
                next_task = self.task_queue.get()
                if next_task is None:
                    # Poison pill means shutdown
                    print('{}: Exiting'.format(proc_name))
                    self.task_queue.task_done()
                    break
                # next_task Task() , next_task __str__
                print('{}: {}'.format(proc_name, next_task))
                #  next_task() __call__
                answer = next_task()
                self.task_queue.task_done()
                self.result_queue.put(answer)
    
    
    class Task:
    
        def __init__(self, a, b):
            self.a = a
            self.b = b
    
        def __call__(self):
            time.sleep(0.1)  # pretend to take time to do the work
            return '{self.a} * {self.b} = {product}'.format(
                self=self, product=self.a * self.b)
    
        def __str__(self):
            return '{self.a} * {self.b}'.format(self=self)
    
    
    if __name__ == '__main__':
        # Establish communication queues
        tasks = multiprocessing.JoinableQueue()
        results = multiprocessing.Queue()
    
        # Start consumers
        num_consumers = multiprocessing.cpu_count() * 2
        print('Creating {} consumers'.format(num_consumers))
        consumers = [
            Consumer(tasks, results)
            for i in range(num_consumers)
        ]
        for w in consumers:
            w.start()
    
        # Enqueue jobs
        num_jobs = 10
        for i in range(num_jobs):
            tasks.put(Task(i, i))
    
        # Add a poison pill for each consumer
        for i in range(num_consumers):
            tasks.put(None)
    
        # Wait for all of the tasks to finish
        tasks.join()
    
        # Start printing results
        while num_jobs:
            result = results.get()
            print('Result:', result)
            num_jobs -= 1
    

    実行結果:
    Creating 8 consumers
    Consumer-4: 0 * 0
    Consumer-1: 1 * 1
    Consumer-2: 2 * 2
    Consumer-4: 3 * 3
    Consumer-1: 4 * 4
    Consumer-2: 5 * 5
    Consumer-1: 6 * 6
    Consumer-6: 7 * 7
    Consumer-4: 8 * 8
    Consumer-2: 9 * 9
    Consumer-1: Exiting
    Consumer-4: Exiting
    Consumer-6: Exiting
    Consumer-2: Exiting
    Consumer-5: Exiting
    Consumer-8: Exiting
    Consumer-3: Exiting
    Consumer-7: Exiting
    Result: 0 * 0 = 0
    Result: 1 * 1 = 1
    Result: 2 * 2 = 4
    Result: 4 * 4 = 16
    Result: 3 * 3 = 9
    Result: 5 * 5 = 25
    Result: 6 * 6 = 36
    Result: 8 * 8 = 64
    Result: 7 * 7 = 49
    Result: 9 * 9 = 81
    

    3.プロセス間の信号伝達イベント


    Eventクラスは、プロセス間でステータス情報を伝達する簡単な方法を提供します.wait()が超過した場合、エラーは返されません.呼び出し元はis_の使用を担当します.set()イベントの状態をチェックする
    import multiprocessing
    import time
    
    
    def wait_for_event(e):
        """Wait for the event to be set before doing anything"""
        print('wait_for_event: starting')
        e.wait()
        print('wait_for_event: e.is_set()->', e.is_set())
    
    
    def wait_for_event_timeout(e, t):
        """Wait t seconds and then timeout"""
        print('wait_for_event_timeout: starting')
        e.wait(t)
        print('wait_for_event_timeout: e.is_set()->', e.is_set())
    
    
    if __name__ == '__main__':
        e = multiprocessing.Event()
        w1 = multiprocessing.Process(
            name='block',
            target=wait_for_event,
            args=(e,),
        )
        w1.start()
    
        w2 = multiprocessing.Process(
            name='nonblock',
            target=wait_for_event_timeout,
            args=(e, 2),
        )
        w2.start()
        print('main: waiting before calling Event.set()')
        time.sleep(3)
        e.set()
        print('main: event is set')
    

    実行結果:
    main: waiting before calling Event.set()
    wait_for_event: starting
    wait_for_event_timeout: starting
    wait_for_event_timeout: e.is_set()-> False
    main: event is set
    wait_for_event: e.is_set()-> True
    

    4.リソースへのアクセス制御ロック


    複数のプロセス間で単一のリソースを共有する必要がある場合は、ロックを使用して競合するアクセスを回避できます.
    import multiprocessing
    import sys
    
    
    def worker_with(lock):
        with lock:
            sys.stdout.write('Lock acquired via with
    ') def worker_no_with(lock): lock.acquire() try: sys.stdout.write('Lock acquired directly
    ') finally: lock.release() if __name__ == '__main__': lock = multiprocessing.Lock() w = multiprocessing.Process( target=worker_with, args=(lock,), ) nw = multiprocessing.Process( target=worker_no_with, args=(lock,), ) w.start() nw.start() w.join() nw.join()

    実行結果:
    Lock acquired via with
    Lock acquired directly
    

    5.同期操作Condition


    cond.待ってるよnotify_all()通知は下へ実行できます
    import multiprocessing
    import time
    
    
    def stage_1(cond):
        """perform first stage of work,
        then notify stage_2 to continue
        """
        name = multiprocessing.current_process().name
        print('Starting', name)
        with cond:
            print('{} done and ready for stage 2'.format(name))
            cond.notify_all()
    
    
    def stage_2(cond):
        """wait for the condition telling us stage_1 is done"""
        name = multiprocessing.current_process().name
        print('Starting', name)
        with cond:
            cond.wait()
            print('{} running'.format(name))
    
    
    if __name__ == '__main__':
        condition = multiprocessing.Condition()
        s1 = multiprocessing.Process(name='s1',
                                     target=stage_1,
                                     args=(condition,))
        s2_clients = [
            multiprocessing.Process(
                name='stage_2[{}]'.format(i),
                target=stage_2,
                args=(condition,),
            )
            for i in range(1, 3)
        ]
    
        for c in s2_clients:
            c.start()
            time.sleep(1)
        s1.start()
    
        s1.join()
        for c in s2_clients:
            c.join()
    

    実行結果:この例では、2つのプロセスが第2のフェーズの作業を並列に実行しますが、第1のフェーズが完了した後にのみ実行されます.
    Starting stage_2[1]
    Starting stage_2[2]
    Starting s1
    s1 done and ready for stage 2
    stage_2[1] running
    stage_2[2] running
    

    6.リソースへの同時アクセスの制御Semaphore


    複数のworkerが1つのリソースに一度にアクセスできるようにすることは便利ですが、数を制限する必要があります.
    import multiprocessing
    import time
    
    def worker(s, i):
        s.acquire()
        print(multiprocessing.current_process().name + "acquire");
        time.sleep(i)
        print(multiprocessing.current_process().name + "release
    "); s.release() if __name__ == "__main__": s = multiprocessing.Semaphore(2) for i in range(5): p = multiprocessing.Process(target = worker, args=(s, i*2)) p.start()

    実行結果:
    Process-2acquire
    Process-3acquire
    Process-2release
    
    Process-4acquire
    Process-3release
    
    Process-1acquire
    Process-1release
    
    Process-5acquire
    Process-4release
    
    Process-5release
    

    7.共有ステータスマネージャの管理


    Managerで情報を共有することで、すべてのプロセスが表示されます.
    import multiprocessing
    import pprint
    
    
    def worker(d, key, value):
        d[key] = value
    
    
    if __name__ == '__main__':
        mgr = multiprocessing.Manager()
        d = mgr.dict()
        jobs = [
            multiprocessing.Process(
                target=worker,
                args=(d, i, i * 2),
            )
            for i in range(10)
        ]
        for j in jobs:
            j.start()
        for j in jobs:
            j.join()
        print('Results:', d
    

    実行結果:マネージャによってリストが作成され、共有され、すべてのプロセスで更新が表示されます.辞書もサポートしています.
    Results: {0: 0, 2: 4, 3: 6, 1: 2, 4: 8, 6: 12, 5: 10, 7: 14, 8: 16, 9: 18}
    

    8.共有ネーミングスペースManager


    辞書とリストに加えて、管理者は共有された名前空間を作成することもできます.
    import multiprocessing
    
    
    def producer(ns, event):
        ns.value = 'This is the value'
        event.set()
    
    
    def consumer(ns, event):
        try:
            print('Before event: {}'.format(ns.value))
        except Exception as err:
            print('Before event, error:', str(err))
        event.wait()
        print('After event:', ns.value)
    
    
    if __name__ == '__main__':
        mgr = multiprocessing.Manager()
        namespace = mgr.Namespace()
        event = multiprocessing.Event()
        p = multiprocessing.Process(
            target=producer,
            args=(namespace, event),
        )
        c = multiprocessing.Process(
            target=consumer,
            args=(namespace, event),
        )
    
        c.start()
        p.start()
    
        c.join()
        p.join()
    

    実行結果:別のプロセスでmgr.Namespace()はレプリケーションを行い、他のプロセスにアクセスできます.
    Before event, error: 'Namespace' object has no attribute 'value'
    After event: This is the value
    

    重要なのはmgr.を知ることですNamespace()の可変値のコンテンツの更新は自動的に伝播しません.
    import multiprocessing
    
    
    def producer(ns, event):
        # DOES NOT UPDATE GLOBAL VALUE!
        ns.my_list.append('This is the value')
        event.set()
    
    
    def consumer(ns, event):
        print('Before event:', ns.my_list)
        event.wait()
        print('After event :', ns.my_list)
    
    
    if __name__ == '__main__':
        mgr = multiprocessing.Manager()
        namespace = mgr.Namespace()
        namespace.my_list = []
    
        event = multiprocessing.Event()
        p = multiprocessing.Process(
            target=producer,
            args=(namespace, event),
        )
        c = multiprocessing.Process(
            target=consumer,
            args=(namespace, event),
        )
    
        c.start()
        p.start()
    
        c.join()
        p.join()
    

    実行結果:
    Before event: []
    After event : []
    

    9.プロセスプールPool


    プールクラスは、固定数のworkerを管理し、簡単な作業に使用できます.この場合、作業を分解して独立してworkerに割り当てることができます.
    import multiprocessing
    
    
    def do_calculation(data):
        return data * 2
    
    
    def start_process():
        print('Starting', multiprocessing.current_process().name)
    
    
    if __name__ == '__main__':
        inputs = list(range(10))
        print('Input   :', inputs)
    
        builtin_outputs = map(do_calculation, inputs)
        print('Built-in:', builtin_outputs)
    
        pool_size = multiprocessing.cpu_count() * 2
        pool = multiprocessing.Pool(
            processes=pool_size,
            initializer=start_process,
        )
        pool_outputs = pool.map(do_calculation, inputs)
        pool.close()  # no more tasks
        pool.join()  # wrap up current tasks
    
        print('Pool    :', pool_outputs)
    

    実行結果:プロセスの戻り値が収集され、リストとして返されます.
    Input   : [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    Built-in: 
    Starting SpawnPoolWorker-2
    Starting SpawnPoolWorker-3
    Starting SpawnPoolWorker-4
    Starting SpawnPoolWorker-1
    Starting SpawnPoolWorker-6
    Starting SpawnPoolWorker-5
    Starting SpawnPoolWorker-7
    Starting SpawnPoolWorker-8
    Pool    : [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
    

    デフォルトでは、プールは一定数のworkerプロセスを作成し、より多くの作業がなくなるまでジョブを渡します.maxtasksperchildパラメータを設定すると、プールがいくつかのタスクを完了した後にworkerプロセスを再起動し、長時間実行するworkerがより多くのシステムリソースを消費することを防止します.
    import multiprocessing
    
    
    def do_calculation(data):
        return data * 2
    
    
    def start_process():
        print('Starting', multiprocessing.current_process().name)
    
    
    if __name__ == '__main__':
        inputs = list(range(10))
        print('Input   :', inputs)
    
        builtin_outputs = map(do_calculation, inputs)
        print('Built-in:', builtin_outputs)
    
        pool_size = multiprocessing.cpu_count() * 2
        pool = multiprocessing.Pool(
            processes=pool_size,
            initializer=start_process,
            maxtasksperchild=2,
        )
        pool_outputs = pool.map(do_calculation, inputs)
        pool.close()  # no more tasks
        pool.join()  # wrap up current tasks
    
        print('Pool    :', pool_outputs)
    

    実行結果:労働者が割り当てられたタスクを完了すると、より多くの仕事がなくても、仕事を再開します.この出力では、10個のタスクのみにもかかわらず、1回に2つのタスクを完了できるワークが9個作成されます.
    Input   : [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    Built-in: 
    Starting SpawnPoolWorker-4
    Starting SpawnPoolWorker-2
    Starting SpawnPoolWorker-1
    Starting SpawnPoolWorker-5
    Starting SpawnPoolWorker-3
    Starting SpawnPoolWorker-8
    Starting SpawnPoolWorker-6
    Starting SpawnPoolWorker-7
    Starting SpawnPoolWorker-9
    Pool    : [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]