python Queueによるプロセス間通信


プロセス間通信-Queue Process間では通信が必要になる場合があり、オペレーティングシステムはプロセス間の通信を実現するための多くのメカニズムを提供しています.
  • Queueの使用はmultiprocessingモジュールのQueueを使用してマルチプロセス間のデータ伝達を実現することができ、Queue自体はメッセージキュープログラムであり、まず小さな例でQueueの動作原理を実証する:
  • #coding=utf-8
    from multiprocessing import Queue
    q=Queue(3) #     Queue  ,       put  
    q.put("  1") 
    q.put("  2")
    print(q.full())  #False
    q.put("  3")
    print(q.full()) #True
    
    #           try      ,   try   2       ,   Try       
    try:
        q.put("  4",True,2)
    except:
        print("      ,      :%s"%q.qsize())
    
    try:
        q.put_nowait("  4")
    except:
        print("      ,      :%s"%q.qsize())
    
    #     ,           ,   
    if not q.full():
        q.put_nowait("  4")
    
    #     ,           ,   
    if not q.empty():
        for i in range(q.qsize()):
            print(q.get_nowait())

    実行結果:
    False
    True:3:3
      1
      2
      3

    説明Queue()オブジェクトを初期化する場合(例えば、q=Queue())、カッコに最大受信可能なメッセージ数が指定されていない場合、または数が負の値である場合、許容可能なメッセージ数に上限がない(メモリの終端まで).
    Queue.qsize():現在のキューに含まれるメッセージの数を返します.
    Queue.Empty():キューが空の場合はTrueを返し、逆にFalseを返します.
    Queue.full():キューがいっぱいになったら、Trueを返し、逆にFalseを返します.
    Queue.get([block[,timeout]):キュー内のメッセージを取得し、キューから削除します.blockのデフォルト値はTrueです.
    1)ブロックがデフォルト値を使用する、timeout(単位秒)が設定されていない場合、メッセージ列が空の場合、プログラムはブロックされ(読み出し状態で停止)、メッセージ列からメッセージが読み出されるまで、timeoutが設定されている場合、timeout秒が待機し、メッセージがまだ読み出されていない場合、「Queue」が投げ出される.Empty異常
    2)block値がFalseの場合、メッセージ列が空の場合、すぐに「Queue.Empty異常
    Queue.get_nowait():かなりQueue.get(False);
    Queue.put(item,[block[,timeout]):itemメッセージをキューに書き込み、blockのデフォルト値はTrue;
    1)ブロックがデフォルト値を使用する、timeout(単位秒)が設定されていない場合、メッセージ列に書き込み可能なスペースがない場合、プログラムはブロック(書き込み状態で停止)され、メッセージ列からスペースが空けるまで、timeoutが設定されている場合、timeout秒が待つ、スペースがない場合は「Queue」が投げ出される.Full異常
    2)block値がFalseの場合、メッセージ列に書き込み可能なスペースがない場合は、「Queue.Full異常
    Queue.put_nowait(item):かなりQueue.put(item, False); 2.QueueインスタンスQueueを例に、親プロセスで2つのサブプロセスを作成します.1つはQueueにデータを書き、もう1つはQueueからデータを読みます.
    from multiprocessing import Process, Queue
    import os, time, random
    
    #           :
    def write(q):
        for value in ['A', 'B', 'C']:
            print 'Put %s to queue...' % value
            q.put(value)
            time.sleep(random.random())
    
    #           :
    def read(q):
        while True:
            if not q.empty():
                value = q.get(True)
                print 'Get %s from queue.' % value
                time.sleep(random.random())
            else:
                break
    
    if __name__=='__main__':
        #      Queue,        :
        q = Queue()
        pw = Process(target=write, args=(q,))
        pr = Process(target=read, args=(q,))
        #      pw,  :
        pw.start()    
        #   pw  :
        pw.join()
        #      pr,  :
        pr.start()
        pr.join()
        # pr       ,       ,      :
        print ''
        print '           '

    実行結果:
    Put A to queue...
    Put B to queue...
    Put C to queue...
    Get A to queue...
    Get B to queue...
    Get C to queue...
               
  • プロセスプールのQueue Poolを使用してプロセスを作成するにはmultiprocessingを使用する必要があります.Multiprocessingではなく、Manager()のQueue()です.Queue()です.そうしないと、
  • というエラーメッセージが表示されます.
    RuntimeError: Queue objects should only be shared between processes through inheritance.

    次の例では、プロセスプール内のプロセスがどのように通信するかを示します.
    #coding=utf-8
    
    #  import  Queue Manager
    from multiprocessing import Manager,Pool
    import os,time,random
    
    def reader(q):
        print("reader  (%s),    (%s)"%(os.getpid(),os.getppid()))
        for i in range(q.qsize()):
            print("reader Queue     :%s"%q.get(True))
    
    def writer(q):
        print("writer  (%s),    (%s)"%(os.getpid(),os.getppid()))
        for i in "dongGe":
            q.put(i)
    
    if __name__=="__main__":
        print("(%s) start"%os.getpid())
        q=Manager().Queue() #  Manager  Queue    
        po=Pool()
        #          ,       reader       ,   writer       ,  reader   
        po.apply(writer,(q,))
        po.apply(reader,(q,))
        po.close()
        po.join()
        print("(%s) End"%os.getpid())

    実行結果:
    (21156) start
    writer  (21162),    (21156)
    reader  (21162),    (21156)
    reader Queue     :d
    reader Queue     :o
    reader Queue     :n
    reader Queue     :g
    reader Queue     :G
    reader Queue     :e
    (21156) End