zeroMQ初体験-10.マルチスレッドを優雅に使用


「もしかすると、ZeroMQは最高のマルチスレッド実行環境かもしれません!」公式サイトによると.
実際にサポートしたいのはerlang信号のようなモードです.従来のマルチスレッドでは、さまざまな「ロック」に伴ってさまざまな奇妙な問題が発生します.一方、zeroMQのマルチスレッドは「デロック化」に力を入れており、簡単に言えば、1つのデータは同じ時点で1つのスレッドにしか持たれない(従来は、1つのスレッドによってのみ操作されることを許可している).ロックは、マルチスレッドが1つのデータを同時に操作する可能性があるため、副産物です.ここからzeromqの切り込み点がはっきりと見え、スレッド間のデータフローによって同じ時点でどのデータも1つのスレッドにしか持たないことを保証します.
ここでは、従来の応答パターンの例を示します.

import time
import threading
import zmq

def worker_routine(worker_url, context):
    
    socket = context.socket(zmq.REP)
    
    socket.connect(worker_url)
    
    while True:
        
        string  = socket.recv()
        print("Received request: [%s]
" % (string)) time.sleep(1) socket.send("World") def main(): url_worker = "inproc://workers" url_client = "tcp://*:5555" context = zmq.Context(1) clients = context.socket(zmq.XREP) clients.bind(url_client) workers = context.socket(zmq.XREQ) workers.bind(url_worker) for i in range(5): thread = threading.Thread(target=worker_routine, args=(url_worker, context, )) thread.start() zmq.device(zmq.QUEUE, clients, workers) clients.close() workers.close() context.term() if __name__ == "__main__": main()

このような切り分けにはもう一つの隠れた利点があり,万一マルチスレッドからマルチプロセスに移行すると,コードを容易に切断して再利用することができる.
ここでは、マルチスレッドでマルチプロセスを使用しない理由も示します.
プロセスのオーバーヘッドが大きすぎます(pythonはスレッドの代わりにマルチプロセスを奨励します).
上のコードの例ではサブスレッド間の通信はないようですね?マルチスレッドがサポートされている以上、これは忘れられません.

import threading
import zmq

def step1(context):
    sender = context.socket(zmq.PAIR)
    sender.connect("inproc://step2")   
    sender.send("")  

def step2(context):
    receiver = context.socket(zmq.PAIR)
    receiver.bind("inproc://step2")
    
    thread = threading.Thread(target=step1, args=(context, ))
    thread.start()
    
    string = receiver.recv()

    sender = context.socket(zmq.PAIR)
    sender.connect("inproc://step3")
    sender.send("")
    
    return

def main():
    context = zmq.Context(1)
    
    receiver = context.socket(zmq.PAIR)
    receiver.bind("inproc://step3")
    
    thread = threading.Thread(target=step2, args=(context, ))
    thread.start()
    
    string = receiver.recv()
    
    print("Test successful!
") receiver.close() context.term() return if __name__ == "__main__": main()

注意:
ここでは新しいポートタイプ:PAIRを用いた.プロセス間通信のために用意されています(応答など、以前に登場したタイプも列挙されています).このタイプはタイムリーで、信頼性が高く、安全です(プロセス間でも使用でき、応答と似ています).
(未完待機)