pyzmqのPolling and Sockets

6828 ワード

Polling and Sockets
1つのスレッドには複数のsokectがあり、同時にデータの送受信が必要であり、zmqはrecv()時にブロックすることなくpolling sockets実装を提供する.
次のインスタンスでは、ワークが終了するタイミングを示すcommand serverを作成し、ワークがPublisherからサブスクリプションを取得して印刷されます('exit'で終了します).
    1.PUSH server,コマンドサービス
import zmq
import time
import sys
import random
from  multiprocessing import Process

def server_push(port="5556"):
    context = zmq.Context()
    socket = context.socket(zmq.PUSH)
    socket.bind("tcp://*:%s" % port)
    print "Running server on port: ", port
    # serves only 5 request and dies
    for reqnum in range(10):
        if reqnum < 6:
            socket.send("Continue")
        else:
            socket.send("Exit")
            break
        time.sleep (1) 

   2.PUB server、メッセージの発行
def server_pub(port="5558"):
    context = zmq.Context()
    socket = context.socket(zmq.PUB)
    socket.bind("tcp://*:%s" % port)
    publisher_id = random.randrange(0,9999)
    print "Running server on port: ", port
    # serves only 5 request and dies
    for reqnum in range(10):
        # Wait for next request from client
        topic = random.randrange(8,10)
        messagedata = "server#%s" % publisher_id
        print "%s %s" % (topic, messagedata)
        socket.send("%d %s" % (topic, messagedata))
        time.sleep(1) 

   3.クライアント
def client(port_push, port_sub):
    context = zmq.Context()
    socket_pull = context.socket(zmq.PULL)
    socket_pull.connect ("tcp://localhost:%s" % port_push)
    print "Connected to server with port %s" % port_push
    socket_sub = context.socket(zmq.SUB)
    socket_sub.connect ("tcp://localhost:%s" % port_sub)
    socket_sub.setsockopt(zmq.SUBSCRIBE, "9")
    print "Connected to publisher with port %s" % port_sub
    #    Poller
    poller = zmq.Poller()
    poller.register(socket_pull, zmq.POLLIN)
    poller.register(socket_sub, zmq.POLLIN)
    # Work on requests from both server and publisher
    should_continue = True
    while should_continue:
        socks = dict(poller.poll())
        if socket_pull in socks and socks[socket_pull] == zmq.POLLIN:
            message = socket_pull.recv()
            print "Recieved control command: %s" % message
            if message == "Exit": 
                print "Recieved exit command, client will stop recieving messages"
                should_continue = False

        if socket_sub in socks and socks[socket_sub] == zmq.POLLIN:
            string = socket_sub.recv()
            topic, messagedata = string.split()
            print "Processing ... ", topic, messagedata

うんてん
if __name__ == "__main__":
    # Now we can run a few servers 
    server_push_port = "5556"
    server_pub_port = "5558"
    Process(target=server_push, args=(server_push_port,)).start()
    Process(target=server_pub, args=(server_pub_port,)).start()
    Process(target=client, args=(server_push_port,server_pub_port,)).start()

apiを見てpoll (timeout=None)
Poll the registered 0MQ or native fds for I/O.
Parameters:
Returns:
Return type:
timeout (float, int) – The timeout in milliseconds. If None, no timeout (infinite). This is in milliseconds to be compatible with  select.poll() .
events – The list of events that are ready to be processed. This is a list of tuples of the form  (socket, event) , where the 0MQ Socket or integer fd is the first element, and the poll event mask (POLLIN, POLLOUT) is the second. It is common to call events = dict(poller.poll()) , which turns the list of tuples into a mapping of socket : event .
list of tuples
POLLINについては、POLLOUT:
  • flag (int, default=POLLIN|POLLOUT) – 0MQ poll flags. If flag|POLLIN, recv events will be flushed. If flag|POLLOUT, send events will be flushed. Both flags can be set at once, which is the default.

  • このように、java nioでchannelをselectorに登録するのと同じように、pollerに登録されているsockter状態をポーリングし続けます.SOcketデータ受信準備(POLLIN)が見つかり、ビジネスコードが実行されます. 
     
    しかし,'if'の処理方式ではやや醜いためpyzmqはtornador ioloopを実現するIOStreamのクラスを提供する:ZMQStreamはpolling eventを処理し,これによりコールバックを使用することができる.
    まず、tornadorをインストールします:pip install tornado
    次に、上のコードを変更します.
    import zmq
    import time
    import sys
    import random
    from  multiprocessing import Process
    
    from zmq.eventloop import ioloop, zmqstream
    ioloop.install()
        
    ioloop.install()は
    tornador.ioloop.IOLoopはzmqのpollerを使用します.
    PUSH serverとPUB serverのコードは変更しないでください
    2つの処理業務の関数を持ち出してコールバックする
    def getcommand(msg):
    	print "Received control command: %s" % msg
    	if msg[0] == "Exit":
    		print "Received exit command, client will stop receiving messages"
    		should_continue = False
    		ioloop.IOLoop.instance().stop()#     
            
    def process_message(msg):
    	print "Processing ... %s" % msg

    クライアントは次のように変更されました.
    def client(port_push, port_sub):    
    	context = zmq.Context()
    	socket_pull = context.socket(zmq.PULL)
    	socket_pull.connect ("tcp://localhost:%s" % port_push)
    	stream_pull = zmqstream.ZMQStream(socket_pull)
    	stream_pull.on_recv(getcommand)
    	print "Connected to server with port %s" % port_push
    	
    	socket_sub = context.socket(zmq.SUB)
    	socket_sub.connect ("tcp://localhost:%s" % port_sub)
    	socket_sub.setsockopt(zmq.SUBSCRIBE, "9")
    	stream_sub = zmqstream.ZMQStream(socket_sub)
    	stream_sub.on_recv(process_message)
    	print "Connected to publisher with port %s" % port_sub
    	ioloop.IOLoop.instance().start()
    	print "Worker has stopped processing messages."
    
    
    if __name__ == "__main__":
        # Now we can run a few servers 
        server_push_port = "5556"
        server_pub_port = "5558"
        Process(target=server_push, args=(server_push_port,)).start()
        Process(target=server_pub, args=(server_pub_port,)).start()
        Process(target=client, args=(server_push_port,server_pub_port,)).start()

    元のsocketをzmqstreamに飾り、ioloopインスタンスrunを起こすと、他に心配する必要はありません.妥当なバカ式です.