zeroMQ初体験-10.マルチスレッドを優雅に使用
「もしかすると、ZeroMQは最高のマルチスレッド実行環境かもしれません!」公式サイトによると.
実際にサポートしたいのはerlang信号のようなモードです.従来のマルチスレッドでは、さまざまな「ロック」に伴ってさまざまな奇妙な問題が発生します.一方、zeroMQのマルチスレッドは「デロック化」に力を入れており、簡単に言えば、1つのデータは同じ時点で1つのスレッドにしか持たれない(従来は、1つのスレッドによってのみ操作されることを許可している).ロックは、マルチスレッドが1つのデータを同時に操作する可能性があるため、副産物です.ここからzeromqの切り込み点がはっきりと見え、スレッド間のデータフローによって同じ時点でどのデータも1つのスレッドにしか持たないことを保証します.
ここでは、従来の応答パターンの例を示します.
このような切り分けにはもう一つの隠れた利点があり,万一マルチスレッドからマルチプロセスに移行すると,コードを容易に切断して再利用することができる.
ここでは、マルチスレッドでマルチプロセスを使用しない理由も示します.
プロセスのオーバーヘッドが大きすぎます(pythonはスレッドの代わりにマルチプロセスを奨励します).
上のコードの例ではサブスレッド間の通信はないようですね?マルチスレッドがサポートされている以上、これは忘れられません.
注意:
ここでは新しいポートタイプ:PAIRを用いた.プロセス間通信のために用意されています(応答など、以前に登場したタイプも列挙されています).このタイプはタイムリーで、信頼性が高く、安全です(プロセス間でも使用でき、応答と似ています).
(未完待機)
実際にサポートしたいのはerlang信号のようなモードです.従来のマルチスレッドでは、さまざまな「ロック」に伴ってさまざまな奇妙な問題が発生します.一方、zeroMQのマルチスレッドは「デロック化」に力を入れており、簡単に言えば、1つのデータは同じ時点で1つのスレッドにしか持たれない(従来は、1つのスレッドによってのみ操作されることを許可している).ロックは、マルチスレッドが1つのデータを同時に操作する可能性があるため、副産物です.ここからzeromqの切り込み点がはっきりと見え、スレッド間のデータフローによって同じ時点でどのデータも1つのスレッドにしか持たないことを保証します.
ここでは、従来の応答パターンの例を示します.
import time
import threading
import zmq
def worker_routine(worker_url, context):
socket = context.socket(zmq.REP)
socket.connect(worker_url)
while True:
string = socket.recv()
print("Received request: [%s]
" % (string))
time.sleep(1)
socket.send("World")
def main():
url_worker = "inproc://workers"
url_client = "tcp://*:5555"
context = zmq.Context(1)
clients = context.socket(zmq.XREP)
clients.bind(url_client)
workers = context.socket(zmq.XREQ)
workers.bind(url_worker)
for i in range(5):
thread = threading.Thread(target=worker_routine, args=(url_worker, context, ))
thread.start()
zmq.device(zmq.QUEUE, clients, workers)
clients.close()
workers.close()
context.term()
if __name__ == "__main__":
main()
このような切り分けにはもう一つの隠れた利点があり,万一マルチスレッドからマルチプロセスに移行すると,コードを容易に切断して再利用することができる.
ここでは、マルチスレッドでマルチプロセスを使用しない理由も示します.
プロセスのオーバーヘッドが大きすぎます(pythonはスレッドの代わりにマルチプロセスを奨励します).
上のコードの例ではサブスレッド間の通信はないようですね?マルチスレッドがサポートされている以上、これは忘れられません.
import threading
import zmq
def step1(context):
sender = context.socket(zmq.PAIR)
sender.connect("inproc://step2")
sender.send("")
def step2(context):
receiver = context.socket(zmq.PAIR)
receiver.bind("inproc://step2")
thread = threading.Thread(target=step1, args=(context, ))
thread.start()
string = receiver.recv()
sender = context.socket(zmq.PAIR)
sender.connect("inproc://step3")
sender.send("")
return
def main():
context = zmq.Context(1)
receiver = context.socket(zmq.PAIR)
receiver.bind("inproc://step3")
thread = threading.Thread(target=step2, args=(context, ))
thread.start()
string = receiver.recv()
print("Test successful!
")
receiver.close()
context.term()
return
if __name__ == "__main__":
main()
注意:
ここでは新しいポートタイプ:PAIRを用いた.プロセス間通信のために用意されています(応答など、以前に登場したタイプも列挙されています).このタイプはタイムリーで、信頼性が高く、安全です(プロセス間でも使用でき、応答と似ています).
(未完待機)