Pythonマルチプロセスプログラミング

10713 ワード

Pythonマルチプロセスプログラミング
Pythonの解釈器のデフォルトはスレッドが安全で、その主な措置はGIL(Global Interpreter Lock)というメカニズムである.
GILメカニズムは、Pythonオブジェクトの参照カウントの操作がすべて原子操作であることを保証し、マルチスレッドによる参照オブジェクトのメモリ漏洩や誤った参照が発生しないようにします.しかし、このメカニズムはマルチスレッドの性能を制限している.GILが存在するため、Pythonは一度に1つのスレッドしか実行できません.
多くのI/O操作(ファイルの読み書き、ネットワーク通信など)がない限り、マルチスレッドはプログラムのパフォーマンスを向上させることはできません.
もう1つの性能向上は,マルチCPUを利用する方式がマルチプロセスである.
ここでは、Pythonライブラリmultiprocessingを使用して、マルチコアCPUを利用するマルチプロセスを生成する方法について説明します.
マルチプロセスライブラリmultiprocessing
MultiprocessingはPythonの標準ライブラリの1つで、公式ドキュメントはmultiprocessing-Process-based parallelismを参照しています.
私たちは主にその2つのクラスを使って出産プロセスを行います.
  • Processプロセスクラス
  • Poolプロセスプールクラス
  • プロセスクラスプロセス
    Processクラスは単独で呼び出すことも、継承して使用することもできます.主な操作は次のとおりです.
  • Process.start()サブプロセス
  • を開始
  • Process.join()サブプロセス終了待ち(ブロック待ち)
  • Process.is_alive()は、サブプロセスが実行(実行中にTrueを返す)
  • であるか否かを判断する.
  • Process.close()サブプロセス終了(使用を推奨せず、例外を放出)
  • ここでは呼び出しを例に挙げます.
    #!/usr/bin/python
    
    from __future__ import print_function  # at top of module
    from __future__ import division, unicode_literals, with_statement
    
    import time
    import os
    from multiprocessing import Process
    
    
    def task_func(timeout):
        name = 'task[%d]' % os.getpid()
        print('%s start' % name)
        for i in range(timeout):
            time.sleep(1)
            print('%s waiting(%d/%d)' % (name, i, timeout))
        print('%s end' % name)
    
    
    def main():
        print('main start')
    
        #      ,    task_func
        p1 = Process(target=task_func, args=(5,))
        p1.start()
    
        #            
        for i in range(3):
            print('do something %d' % i)
            time.sleep(1)
    
        #        
        p1.join()
    
        print('main end')
    
    
    if __name__ == '__main__':
        main()

    出力は次のとおりです.
    >  ./mutliproc.py
    main start
    do something 0
    task[30280] start
    do something 1
    task[30280] waiting(0/5)
    do something 2
    task[30280] waiting(1/5)
    task[30280] waiting(2/5)
    task[30280] waiting(3/5)
    task[30280] waiting(4/5)
    task[30280] end
    main end

    ここでは、キューQueueまたはパイプPipeを使用して、親プロセスとサブプロセス間のデータ交換を行うこともできます.
    次の意味キューQueueを例にとると、プロセスは一方向にデータを転送します.
    #!/usr/bin/python
    
    from __future__ import print_function  # at top of module
    from __future__ import division, unicode_literals, with_statement
    
    import time
    import os
    from multiprocessing import Queue, Process
    
    
    def task_func(queue, timeout):
        name = 'task[%d]' % os.getpid()
        print('%s start' % name)
        for i in range(timeout):
            time.sleep(1)
            #        ,          ,       
            data = {
                'task': name,
                'progress': i,
                'timeout': timeout,
            }
            queue.put(data)
        print('%s end' % name)
    
    
    def main():
        print('main start')
    
        #        
        q = Queue()
    
        #      ,              
        p1 = Process(target=task_func, args=(q, 5))
        p1.start()
    
        #            
        for i in range(3):
            print('do something %d' % i)
            time.sleep(1)
    
        #        
        p1.join()
    
        #       
        while not q.empty():
            print(q.get())
    
        print('main end')
    
    
    if __name__ == '__main__':
        main()
    

    データの双方向交換が必要なのはPipeクラスです.本人が使えないので、ここに公式の例を貼ります.
    from multiprocessing import Process, Pipe
    
    def f(conn):
        conn.send([42, None, 'hello'])
        conn.close()
    
    if __name__ == '__main__':
        parent_conn, child_conn = Pipe()
        p = Process(target=f, args=(child_conn,))
        p.start()
        print(parent_conn.recv())   # prints "[42, None, 'hello']"
        p.join()

    プロセスプールクラスPool
    プロセスプールを使用する利点は、限られたプロセスを作成して大量のタスクを自動的に並列に処理できることです.
    Poolクラスの主な方法:
  • Pool.apply()起動タスク(ブロック待機タスク完了)
  • Pool.map()一括起動タスク(ブロックはすべてのタスクの完了を待つ)
  • Pool.apply_async()起動タスク(非ブロック)
  • Pool.map_async()一括起動タスク(非ブロック)
  • ここでは最もよく用いられる非ブロックを例に挙げる
    #!/usr/bin/python
    
    from __future__ import print_function  # at top of module
    from __future__ import division, unicode_literals, with_statement
    
    import time
    import os
    from multiprocessing import Pool
    
    
    def task_func(timeout):
        name = 'task[%d]' % os.getpid()
        print('%s start' % name)
        for i in range(timeout):
            time.sleep(1)
            print('%s waiting(%d/%d)' % (name, i, timeout))
        print('%s end' % name)
        #   return     
        return {'task': name, 'timeout': timeout}
    
    
    def main():
        print('main start')
    
        #      ,       4   
        pool = Pool(4)
    
        #   6   ,         (    4     ,   2    )
        ret = [pool.apply_async(task_func, args=(i,)) for i in range(6)]
    
        #            
        for each in ret:
            print(each.get())
    
        print('main end')
    
    
    if __name__ == '__main__':
        main()

    注意:1.Queueは、プロセスプールによって開始するサブプロセス2にパラメータで渡すことができない.プロセスプールで開始されたサブプロセスはreturnで結果データを返すことができます.
    まとめ
  • データ交換を必要とせず、結果のみを見るタスクについては、プロセスプール
  • の使用を推奨する.
  • 共有リソースをできるだけパラメータ形式でサブプロセスに転送する(LinuxとWindowsの動作の一致を維持する)
  • .