pyzmqのPolling and Sockets
Polling and Sockets
1つのスレッドには複数のsokectがあり、同時にデータの送受信が必要であり、zmqはrecv()時にブロックすることなくpolling sockets実装を提供する.
次のインスタンスでは、ワークが終了するタイミングを示すcommand serverを作成し、ワークがPublisherからサブスクリプションを取得して印刷されます('exit'で終了します).
1.PUSH server,コマンドサービス
2.PUB server、メッセージの発行
3.クライアント
うんてん
apiを見て
Poll the registered 0MQ or native fds for I/O.
Parameters:
Returns:
Return type:
timeout (float, int) – The timeout in milliseconds. If None, no timeout (infinite). This is in milliseconds to be compatible with
events – The list of events that are ready to be processed. This is a list of tuples of the form
list of tuples
POLLINについては、POLLOUT: flag (int, default=POLLIN|POLLOUT) – 0MQ poll flags. If flag|POLLIN, recv events will be flushed. If flag|POLLOUT, send events will be flushed. Both flags can be set at once, which is the default.
このように、java nioでchannelをselectorに登録するのと同じように、pollerに登録されているsockter状態をポーリングし続けます.SOcketデータ受信準備(POLLIN)が見つかり、ビジネスコードが実行されます.
しかし,'if'の処理方式ではやや醜いためpyzmqはtornador ioloopを実現するIOStreamのクラスを提供する:ZMQStreamはpolling eventを処理し,これによりコールバックを使用することができる.
まず、tornadorをインストールします:pip install tornado
次に、上のコードを変更します.
ioloop.install()は
tornador.ioloop.IOLoopはzmqのpollerを使用します.
PUSH serverとPUB serverのコードは変更しないでください
2つの処理業務の関数を持ち出してコールバックする
クライアントは次のように変更されました.
元のsocketをzmqstreamに飾り、ioloopインスタンスrunを起こすと、他に心配する必要はありません.妥当なバカ式です.
1つのスレッドには複数のsokectがあり、同時にデータの送受信が必要であり、zmqはrecv()時にブロックすることなくpolling sockets実装を提供する.
次のインスタンスでは、ワークが終了するタイミングを示すcommand serverを作成し、ワークがPublisherからサブスクリプションを取得して印刷されます('exit'で終了します).
1.PUSH server,コマンドサービス
import zmq
import time
import sys
import random
from multiprocessing import Process
def server_push(port="5556"):
context = zmq.Context()
socket = context.socket(zmq.PUSH)
socket.bind("tcp://*:%s" % port)
print "Running server on port: ", port
# serves only 5 request and dies
for reqnum in range(10):
if reqnum < 6:
socket.send("Continue")
else:
socket.send("Exit")
break
time.sleep (1)
2.PUB server、メッセージの発行
def server_pub(port="5558"):
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:%s" % port)
publisher_id = random.randrange(0,9999)
print "Running server on port: ", port
# serves only 5 request and dies
for reqnum in range(10):
# Wait for next request from client
topic = random.randrange(8,10)
messagedata = "server#%s" % publisher_id
print "%s %s" % (topic, messagedata)
socket.send("%d %s" % (topic, messagedata))
time.sleep(1)
3.クライアント
def client(port_push, port_sub):
context = zmq.Context()
socket_pull = context.socket(zmq.PULL)
socket_pull.connect ("tcp://localhost:%s" % port_push)
print "Connected to server with port %s" % port_push
socket_sub = context.socket(zmq.SUB)
socket_sub.connect ("tcp://localhost:%s" % port_sub)
socket_sub.setsockopt(zmq.SUBSCRIBE, "9")
print "Connected to publisher with port %s" % port_sub
# Poller
poller = zmq.Poller()
poller.register(socket_pull, zmq.POLLIN)
poller.register(socket_sub, zmq.POLLIN)
# Work on requests from both server and publisher
should_continue = True
while should_continue:
socks = dict(poller.poll())
if socket_pull in socks and socks[socket_pull] == zmq.POLLIN:
message = socket_pull.recv()
print "Recieved control command: %s" % message
if message == "Exit":
print "Recieved exit command, client will stop recieving messages"
should_continue = False
if socket_sub in socks and socks[socket_sub] == zmq.POLLIN:
string = socket_sub.recv()
topic, messagedata = string.split()
print "Processing ... ", topic, messagedata
うんてん
if __name__ == "__main__":
# Now we can run a few servers
server_push_port = "5556"
server_pub_port = "5558"
Process(target=server_push, args=(server_push_port,)).start()
Process(target=server_pub, args=(server_pub_port,)).start()
Process(target=client, args=(server_push_port,server_pub_port,)).start()
apiを見て
poll
(timeout=None) Poll the registered 0MQ or native fds for I/O.
Parameters:
Returns:
Return type:
timeout (float, int) – The timeout in milliseconds. If None, no timeout (infinite). This is in milliseconds to be compatible with
select.poll()
. events – The list of events that are ready to be processed. This is a list of tuples of the form
(socket, event)
, where the 0MQ Socket or integer fd is the first element, and the poll event mask (POLLIN, POLLOUT) is the second. It is common to call events = dict(poller.poll())
, which turns the list of tuples into a mapping of socket : event
. list of tuples
POLLINについては、POLLOUT:
このように、java nioでchannelをselectorに登録するのと同じように、pollerに登録されているsockter状態をポーリングし続けます.SOcketデータ受信準備(POLLIN)が見つかり、ビジネスコードが実行されます.
しかし,'if'の処理方式ではやや醜いためpyzmqはtornador ioloopを実現するIOStreamのクラスを提供する:ZMQStreamはpolling eventを処理し,これによりコールバックを使用することができる.
まず、tornadorをインストールします:pip install tornado
次に、上のコードを変更します.
import zmq
import time
import sys
import random
from multiprocessing import Process
from zmq.eventloop import ioloop, zmqstream
ioloop.install()
ioloop.install()は
tornador.ioloop.IOLoopはzmqのpollerを使用します.
PUSH serverとPUB serverのコードは変更しないでください
2つの処理業務の関数を持ち出してコールバックする
def getcommand(msg):
print "Received control command: %s" % msg
if msg[0] == "Exit":
print "Received exit command, client will stop receiving messages"
should_continue = False
ioloop.IOLoop.instance().stop()#
def process_message(msg):
print "Processing ... %s" % msg
クライアントは次のように変更されました.
def client(port_push, port_sub):
context = zmq.Context()
socket_pull = context.socket(zmq.PULL)
socket_pull.connect ("tcp://localhost:%s" % port_push)
stream_pull = zmqstream.ZMQStream(socket_pull)
stream_pull.on_recv(getcommand)
print "Connected to server with port %s" % port_push
socket_sub = context.socket(zmq.SUB)
socket_sub.connect ("tcp://localhost:%s" % port_sub)
socket_sub.setsockopt(zmq.SUBSCRIBE, "9")
stream_sub = zmqstream.ZMQStream(socket_sub)
stream_sub.on_recv(process_message)
print "Connected to publisher with port %s" % port_sub
ioloop.IOLoop.instance().start()
print "Worker has stopped processing messages."
if __name__ == "__main__":
# Now we can run a few servers
server_push_port = "5556"
server_pub_port = "5558"
Process(target=server_push, args=(server_push_port,)).start()
Process(target=server_pub, args=(server_pub_port,)).start()
Process(target=client, args=(server_push_port,server_pub_port,)).start()
元のsocketをzmqstreamに飾り、ioloopインスタンスrunを起こすと、他に心配する必要はありません.妥当なバカ式です.