Pythonマルチスレッドとマルチプロセス(六)マルチプロセスプログラミングと同期


このシリーズの記事ディレクトリ
展開/終了
  • Pythonマルチスレッドとマルチプロセス(一)GILロックとThreadを使用したマルチスレッド
  • の作成
  • Pythonマルチスレッドとマルチプロセス(二)スレッドの同期の反発ロックと再入ロック
  • Pythonマルチスレッドとマルチプロセス(3)スレッドの同期の条件変数
  • Pythonマルチスレッドとマルチプロセス(4)スレッドの同期信号量
  • Pythonマルチスレッドとマルチプロセス(5)マルチスレッド管理-スレッドプール
  • Pythonマルチスレッドとマルチプロセス(六)マルチプロセスプログラミングと同期
  • マルチプロセスプログラミング
    マルチプロセスとマルチスレッドの比較:1.pythonにはGILロックがあり、1つのプロセスが1つのGILロックを管理しているため、マルチスレッドはマルチコアを使用できません.つまり、同じ時点で1つのスレッドしか実行できません.マルチプロセスプログラミングではマルチコアを使用できます.つまり、マルチプロセスは並列に実行することができ、同じ時点でマルチプロセスは複数のCPUを使用して同時に実行することができ、マルチスレッドは実際に同時に実行することができない.
    一言:pythonのマルチスレッドは同時でしか並列できません.マルチプロセスは並列できます(コンピュータがマルチコアであることを前提としています).
    2.マルチスレッドは同時並行しかできないため、マルチIO操作、CPU操作の少ないタスク、例えば爬虫類、ディスク読み書きなどのタスクに適している.マルチプロセスは逆に,少ないIO操作,マルチCPU操作のタスク,例えば純計算のタスクに適している.なぜなら、IO操作には待機があり、スレッドは待機中にCPUを他のスレッドに動作させ、プログラム全体がずっと仕事をしているようにすることができるからです.しかし、CPUを消費する操作であれば、ブロック待ちの場合はほとんどなく、各スレッドがブロックのためにCPUを譲るのではなく、タイムスライスが切れたためにCPUを譲る.この場合、マルチスレッド間の切り替えのため、マルチスレッドは単一スレッドよりも遅くなります.
    一方、マルチプロセスでは、パラレル可能であるため、複数のプロセスが同時に複数のCPUを消費する操作を完了し、時間を節約することができる.例を挙げて、私は今2つの比較的巨大な演算を完成したいと思っています:AとBの任務.つまり、2つのサブプロセス、1つの計算Aタスク、1つの計算Bタスクを生成することができます.AとBタスクは並列であり、同時よりも速い.しかし、複数のプロセスがタスクAの演算を一緒に完了し、Aの演算時間を半減させることができると考えているのは間違っている.一方、IO操作では、待ち時間に遭遇すると、プロセスはCPUを他のプログラムのプロセスに譲渡し、この間、このプロセスは何もできません.また、マルチプロセスの切り替えには時間がかかり、プロセスが消費するリソースが多くなります(メモリなど、forkは親プロセスのメモリをコピーするプロセスです).そのため、マルチスレッドを使用するほうがいいです.
    3.マルチスレッドの切り替えはマルチプロセスの切り替えよりも損失が少なく、マルチプロセスの切り替えが遅い.スレッドは軽量級で、プロセスは重量級です.
    PS:windowsでpythonマルチプロセスはif__でなければなりませんname__=="__main__":次の実行を実行します.そうしないと、エラーが発生します.Linuxではこの問題は存在しません.
    1.マルチプロセス間でデータを共有しない
    例:
    from multiprocessing import Process
    
    num=10
    def task1:
      global num
      num+=5
      print("   1 num:%d" % num)
     
    def task2:
      global num
      num+=10
      print("   2 num:%d" % num)
     
    p1 = Process(target=task1)
    p2 = Process(target=task2)
    p1.start()   # p1.pid        id
    p2.start()
    p1.join()
    p2.join()
    print("    num:%d" % num)
    

    結果:サブプロセス1のnum:15サブプロセス2のnum:20メインプロセスのnum:10
    ここから,3つのプロセス間のnumは共有されていないことが分かる.共有されている場合は15,25
    原因詳細分析:メインプロセスはサブプロセスを作成するため、最下位でfork()が実行され、メインプロセスのPCB、メモリ、データなどをコピーする別の空間にメインプロセスを完全にコピーします.したがって,上記のプログラムでは2つのサブプロセスが生成され,2つの空間が開かれ,メインプロセスの内容が2つの空間にコピーされる.メインプロセス自体を加えると、3つの空間があります.3つの空間にはそれぞれnumがあり、この3つのnum変数は互いに干渉しない.したがって、各プロセスは自分のnumを自分で操作し、他の2つのプロセスのnumに影響を与えません.
    マルチプロセスは、start()、join()などのマルチスレッドの多くのインタフェースと同じです.ProcessはThreadのように他のクラスに継承されてもよい.
     
    2.プロセスプールを使用したプログラミング
    プロセス・プールとスレッド・プールこそ、プロセス・スレッドのベスト・プラクティスです.
    # coding=utf-8
    
    from multiprocessing import Pool
    
    def task(num):
      res = 0
      for i in range(num):
        res+=i
    
      return res
    
    
    if __name__=="__main__":
      #        
      pool = Pool()  #              
      task1 = pool.apply_async(task,args=(10000,))  #         submit,          
      task2 = pool.apply_async(task,args=(20000,))
    
      pool.close()  #      ,           
      pool.join()   #           ,    pool.close()    join()    
    
      print(task1.get()) #     ,                     ,  get()       join,               
      print(task2.get())
     
    

        
    Poolのソースコードはここには貼られていません.Poolのソースコードを見る時間があります.
    スレッドプールThreadPoolExecutorのmap()メソッドと同様にimapメソッドを使用できます.
    if __name__=="__main__":
      #        
      pool = Pool()  #              
    
      for result in pool.imap(task,[2000000,200000]):
        print(result)
    

    結果:(5秒後)1999999億円19999億円
    imap_unorderedメソッドは、imapと似ていますが、imap_unorderedは、先に実行したタスクの結果を返しますが、imapは、すべてのタスクが実行された後、転送されたタスクの順序でタスクの結果を返します.
    if __name__=="__main__":
      #        
      pool = Pool()  #              
    
      for result in pool.imap_unordered(task,[2000000,200000]):
        print(result)
    

    結果:(1秒過ぎ)19999900000(4秒過ぎ)19999900000
    それ以外はPoolの他にconcurrentを使用することもできる.futures.ProcessPoolExecutorプロセスプール、ThreadPoolExecutorスレッドプールの方法と一致する
    3.プロセス間通信
    プロセス間通信の3つの方法:キューQueue、パイプPipe、共有メモリマネージャ、sharedctypes
    3-1共有メモリ
    プロセスベースを学びすぎた友人は、共有メモリがプロセス間通信方式の中で最も簡単で最も速い方法であることを知っています.
    共有メモリの原理:複数のプロセスの仮想メモリ(ページ)が同じ物理メモリにマッピングされ、複数のプロセスがこの物理メモリのデータを共有できます.マルチプロセスはメモリを共有することでデータを共有できますが、システムカーネルは共有メモリ内のデータの変更とアクセスを同期する責任を負いません.複数のプロセスが同時または並列に共有メモリ内の同じデータを変更すると、データが一致しない可能性が高いという意味です.
    Multiprocessingでは、共有メモリを作成する方法としてManagerとsharedctypesの2つがあります.
    Managerの効率は低いが、リモート共有メモリはサポートされている.sharedctypesは効率が高く、高速Managerの2桁で、マルチプロセスアクセス時に通常のメモリアクセスに相当する例です.
    from multiprocessing import Process
    
    def task1:
      global num
      num+=5
      print("   1 num:%d" % num)
     
    def task2:
      global num
      num+=10
      print("   2 num:%d" % num)
     
    if __name__=="__main__":
      num = 10
      p1 = Process(target=task1)
      p2 = Process(target=task2)
      p1.start()   # p1.pid        id
      p2.start()
      p1.join()
      p2.join()
      print("    num:%d" % num)
    

     
    結果:サブプロセス1のnum:15サブプロセス2のnum:20メインプロセスのnum:10
    この例は、マルチプロセスがデータを共有しないことを示しています.
    ただし、変数numを共有メモリに格納すると、メインプロセスでは、p 1およびp 2プロセスにnum変数が1つずつあるのではなく、唯一のnum変数が1つだけ共有メモリに存在します.
    # coding=utf-8
    
    from multiprocessing import Manager
    from multiprocessing import Process
    
    def task1(num):
      num.value+=5
      print("   1 num:%d" % num.value)
    
    def task2(num):
      num.value+=10
      print("   2 num:%d" % num.value)
    
    if __name__=="__main__":
      manager = Manager()   #           
      num = manager.Value("abc",10)
      # print(num.value)
      p1 = Process(target=task1,args=(num,))
      p2 = Process(target=task2,args=(num,))
      p1.start()   
      p2.start()
      p1.join()
      p2.join()
      print("    num:%d" % num.value)
    

        
    共有メモリはプロセスの同期を保証しないので、複数のプロセスが同じ共有メモリの同じリソースまたは変数を操作するときにロックする必要があります.
    # coding=utf-8
    
    from multiprocessing import Manager
    from multiprocessing import Process
    
    def task1(num,lock):
      for i in range(10000):
        lock.acquire()
        num.value+=1
        lock.release()
    
    def task2(num,lock):
      for i in range(10000):
        lock.acquire()
        num.value-=1
        lock.release()
    
    if __name__=="__main__":
      manager = Manager()   #           
      num = manager.Value("abc",0)
      lock = manager.Lock()  #    
      p1 = Process(target=task1,args=(num,lock))
      p2 = Process(target=task2,args=(num,lock))
      p1.start()   
      p2.start()
      p1.join()
      p2.join()
      print("    num:%d" % num.value)
     
    

    最後に得られたnumは0
    PS:ここに加えたロックは、Manager共有オブジェクトの中で実例化されたロックでなければならない.Threadモジュールのロックではない.理由は簡単である.Threadモジュールのロックは共有変数であるが、マルチプロセスは変数を共有しないので、Threadのロックを使用すると、このロックはすべてのサブプロセスでコピーされ、このロックは1つのロックではなく3つのロックになる.プロセスの安全を保護する役割を果たすことはできません.Managerからインスタンス化されたロックは共有メモリに格納され,メインプロセス,p 1,p 2プロセスは1つのロックを共有する.
    Managerオブジェクトから作成された変数、ロック、条件変数、イベントオブジェクト、信号量、キューは共有メモリに格納されます.マルチプロセスプログラミングでは、共有メモリのロック、条件変数、キューなどしか使用できません.
     
    3-2キューQueue
    # coding=utf-8
    
    from multiprocessing import Manager
    from multiprocessing import Process,Queue
    
    def task1(queue):
      queue.put("a")
      print("queue put a")
    
    def task2(queue):
      res = queue.get()
      print("queue get %s" % res)
    
    if __name__=="__main__":
      queue = Queue()   #      manager = Manager(); queue=manager.Queue()  Queue
      p1 = Process(target=task1,args=(queue,))
      p2 = Process(target=task2,args=(queue,))
      p1.start()
      p2.start()
      p1.join()
      p2.join()
     
    

    PS: 1. マルチプロセスでキューQueueを使用して通信を行う場合、from queue import QueueのQueueを使用することはできません(そうでないとエラーが発生します)、from multiprocessing import QueueのQueueまたはManagerのQueueのみを使用できます
    2.プロセスプールを使用する場合.Poolが作成したプロセスでQueue通信を使用するには、ManagerのQueueのみを使用します.
    Queueはプロセスが安全で、中にロックが入っているので、キューが複数のプロセスで競合して使用されるときに変更される心配はありません.
     
    3-3 Pipe配管
    pipeは2つのプロセス間の通信にのみ使用できます.ソケットに似ていて、2つのプロセス間で接続が確立され、接続の両端にポートのような2つのポートが開きます.データの通信と交換はこの接続によって行われる.
    queueとは異なり、queueは複数のプロセスを通信させることができ、pipeは2つのプロセス間の通信にのみ使用できます.Queueの性能は低く、中にロックが入っているからです.pipeはロックされず,socketのような技術を用いてプロセス間で接続を確立した.
    # coding=utf-8
    
    from multiprocessing import Manager
    from multiprocessing import Process,Pipe
    from time import sleep
    
    def task1(conn):
      res = conn.recv()  #      ,       
      print("task1 recv ",res)
      conn.send("Got it")
    
    def task2(conn):
      # sleep(5)
      conn.send("abc")
      print("task2 send abc")
    
      res = conn.recv()
      print(res)
    
    if __name__=="__main__":
      # queue = Queue()
      conn1,conn2 = Pipe()  # Pipe()      ,           ,                 
      p1 = Process(target=task1,args=(conn1,))
      p2 = Process(target=task2,args=(conn2,))
      p1.start()
      p2.start()
      p1.join()
      p2.join()
     
    

    結果:task 2 send abc task 1 recv abc Got本文転載:張柏沛IT技術ブログ>Pythonマルチスレッドとマルチプロセス(六)マルチプロセスプログラミングと同期