Pythonマルチプロセスの使用

29280 ワード

一、普通のマルチプロセス
import time
import multiprocessing

def worker(flag):
    print("before sleep---%s" % flag)
    time.sleep(3)
    print("after sleep---%s" % flag)

if __name__ == '__main__':
    p_one = multiprocessing.Process(target=worker, args=("one",))
    p_two = multiprocessing.Process(target=worker, args=("two",))

    p_one.start()
    p_two.start()

出力結果:
before sleep—one before sleep—two after sleep—one after sleep—two
二、プロセスを守る
daemonによって実現され、darmonがTrueの場合、メインプロセスが終了すると、サブプロセスも強制的に終了する.
import time
import multiprocessing

def worker(flag):
    print("before sleep---%s" % flag)
    time.sleep(3)
    print("after sleep---%s" % flag)

if __name__ == '__main__':
    p_one = multiprocessing.Process(target=worker, args=("one",))
    p_two = multiprocessing.Process(target=worker, args=("two",))
    #        ,p_one     ,      after sleep---one
    p_one.daemon = True  # or False
    p_one.start()
    p_two.start()
    #      1   ,     before sleep   
    time.sleep(1)

出力結果:
before sleep—one before sleep—two after sleep—two
三、プロセスロック
これはmultiprocessing.ロック()は,操作を実行する前にacquire()でロックをかけ,終了後にrelease()でロックを解放することを実現する.with文を使用してロックの表示を避けることもできます.
import time
import multiprocessing

def worker_one(lock, flag):
    #     
    lock.acquire()
    print("one   ")
    time.sleep(3)
    print("one   ")
    #    
    lock.release()


def worker_two(lock, flag):
    #    with    ,      
    with lock:
        print("two   ")
        time.sleep(3)
        print("two   ")

if __name__ == '__main__':
    #    
    lock = multiprocessing.Lock()

    p_one = multiprocessing.Process(target=worker_one, args=(lock, 1))
    p_two = multiprocessing.Process(target=worker_two, args=(lock, 2))

    p_one.start()
    p_two.start()

出力結果:
oneロック...ちょっと待ってoneリリースtwoロック...ちょっと待ってtwoリリース
四、信号量
Semaphoreによって有限資源の使用制御を実現することができ、同時に多くのプロセスの崩壊システムを起動することを避けることができる.(プロセスプールの概念)
import time
import multiprocessing

def worker_pool(semaphore, flag):
    semaphore.acquire()
    print("I am %d" % flag)
    time.sleep(3)
    semaphore.release()

if __name__ == '__main__':
    semaphore = multiprocessing.Semaphore(2)
    n = 0
    while n < 6:
        p = multiprocessing.Process(target=worker_pool, args=(semaphore, n))
        p.start()
        n += 1
    print('      ~')

出力結果:2行ごとに約3秒間隔
I am 0 I am 1 I am 2 I am 3 I am 4 I am 5
五、プロセス間通信
プロセス間の通信はイベント(Event)によって実現される.主にメインスレッド制御の他のスレッドの実行に使用され、Golangのchannelに似ています.イベントは主に4つのメソッドset、wait、clear、is_を提供します.set.
一般的な方法
  • .set()イベントをTrueに設定し、コントローラで使用します.
  • .wait(timeout=Nont)待機イベントはTrueであり、状態実行を監視するために使用されます.int型パラメータ(タイムアウトイベント、秒単位)を受け入れます.
  • .cleat()イベントをFalseに設定し、すべてのwait状態がFalse終了を受信します.
  • .is_set()イベントステータスをチェックし、TrueまたはFalseを返します.
  • import time
    import multiprocessing
    
    
    def worker(e):
        print("worker: starting")
        e.wait()
        print("worker: .is_set():%s" % e.is_set())
    
    
    def worker_for_timeout(e):
        print("worker_for_timeout:starting")
        e.wait(2)
        print("wait_for_event_timeout: .is_set():%s" % e.is_set())
    
    
    if __name__ == "__main__":
        e = multiprocessing.Event()
        w1 = multiprocessing.Process(target=worker, args=(e,))
    
        w2 = multiprocessing.Process(target=worker_for_timeout, args=(e,))
        w1.start()
        w2.start()
    
        time.sleep(3)
        #                    
        e.set()
        print("Main e.set()")
    

    出力結果:
    worker: starting worker_for_timeout:starting wait_for_event_timeout: .is_set():False Main e.set() worker: .is_set():True
    六、行列
    Queueはマルチプロセスの安全なキューであり,Queueを用いてマルチプロセス間のデータ転送を実現できる.その通過put()はキューにデータを追加する.get()メソッドはキューからデータを取り出す.put()と.get()はいずれも2つのパラメータtimeout,blockedを受信、blockedがFalseまたはblockedがTrueであり、timeoutを待つ時間になると、キュー空がキューからデータを取り出すことに成功しなかったり、キューがいっぱいになったりしてキューにデータを入れることに成功しなかったりすると、Queueが投げ出す.EmptyかQueue.Full異常.
    import multiprocessing
    
    
    def process_push(q):
        q.put(1, block=False)
    
    
    def process_pull(q):
        print(q.get(block=True, timeout=3))
    
    
    if __name__ == "__main__":
        q = multiprocessing.Queue()
        reader = multiprocessing.Process(target=process_pull, args=(q,))
        reader.start()
    
        writer = multiprocessing.Process(target=process_push, args=(q,))
        writer.start()
    
        reader.join()
        writer.join()
    

    出力結果:
    1
    七、Pipe
    Pipeは、宣言時にduplexパラメータを受信し、配列(2つのConnectionインスタンス)を返します.duplexがFalseの場合、Connection 0のみが受信できます.Connection 1は送信を担当し、duplexがTrueの場合、Connection 0とConnection 1は受信および送信できます.
    Connectionオブジェクトには、次の2つの一般的な方法があります.
  • .send():Pipeに送信されたデータのパラメータを受信します.
  • .recv():receivedは、キューからデータを取り出し、Pipeにデータがない場合はブロックされ、Pipeが閉じている場合は例外が放出されます.
  • import multiprocessing
    import time
    
    
    def worker_one(pipe):
        while True:
            print("worker_one rev:", pipe.recv())
            time.sleep(1)
    
    
    def worker_two(pipe):
        for i in range(5):
            print("worker_two send: %s" % (i))
            pipe.send(i)
            time.sleep(1)
    
    
    if __name__ == "__main__":
        pipe = multiprocessing.Pipe(duplex=False)
        p1 = multiprocessing.Process(target=worker_one, args=(pipe[0],))
        p2 = multiprocessing.Process(target=worker_two, args=(pipe[1],))
    
        p1.start()
        p2.start()
    
        p1.join()
        p2.join()
    

    出力結果:
    worker_two send: 0 worker_one rev: 0 worker_two send: 1 worker_one rev: 1 worker_two send: 2 worker_one rev: 2 worker_two send: 3 worker_one rev: 3 worker_two send: 4 worker_one rev: 4
    八、プロセスプール
    これはmultiprocessing.プロセスプールを実装するPool.プールを宣言するときにint型データを受け入れて、プールのサイズを決定します.
    一般的な方法
  • .apply()ブロック式追加タスク、タスク順序実行
  • .apply_async()非ブロック追加タスク、タスク順序実行
  • .close()プールを閉じ、プールにタスク
  • を追加することはできません.
  • .join()プール内のすべてのタスクの実行が完了するのを待つ
  • .terminate()強制的にプールを閉じ、すべてのプロセスが
  • で終了します.
    import time
    import multiprocessing
    
    def worker(flag):
        print("before sleep---%s" % flag)
        time.sleep(3)
        print("after sleep---%s" % flag)
        return flag
    
    if __name__ == '__main__':
        #    
        pool = multiprocessing.Pool(processes=3)
        result = []
        for i in range(4):
            #     ,           ,    ApplyResult   ,     .get()     
            # pool.apply_async(worker, (i,))
            #      ,         ,      return  
            # pool.apply(worker, (i,))
            #                     
            result.append(pool.apply_async(worker, (i,)))
    
        #    ,      
        pool.close()
        #             
        pool.join()
    
        #         pool.ApplyResult
        for i in result:
            print("    %s" % i.get())
    
    

    出力結果:
    before sleep-0 before sleep-1 before sleep-2 after sleep-0 after sleep-1 after sleep-2 before sleep-3 after sleep-3結果セット0結果セット1結果セット2結果セット3
    さて、Pythonプロセスの操作についてはこれで紹介します.他の文章を見て、PythonのスレッドとスレッドとGILの関係を理解することができます.