RabbitMQメッセージキュー(7):クラウドコンピューティングクラスタのリモートコール(RPC)に適用


クラウドコンピューティング環境では、他のマシンのコンピューティングリソースを使用する必要がある場合が多く、Messageを受信して処理すると、一部のコンピューティングタスクが他のノードに割り当てられて完了する可能性があります.では、RabbitMQはRPCをどのように使うのでしょうか.本論文では,他のノードを用いてフィボナッチの完成例を求める.
1.クライアントインタフェースClient interface
RPCサービスがどのように使用されているかを示すために、簡単なクライアントclassを作成します.コールという名前の関数が外部に提供され、RPCリクエストが送信され、RPC演算を受信した結果がブロックされます.コードは次のとおりです.
fibonacci_rpc = FibonacciRpcClient()
result = fibonacci_rpc.call(4)
print "fib(4) is %r" % (result,)

2.コールバック関数キューCallback queue
全体的にRabbitMQでRPCリモートコールを行うのは比較的容易である.クライアントはリクエストのMessageを送信し、serverは応答結果を返す.応答クライアントがpublishメッセージを受信するには、「callback」(コールバック)のqueueアドレスを指定する必要があります.codeは次のとおりです.
result = channel.queue_declare(exclusive=True)
callback_queue = result.method.queue

channel.basic_publish(exchange='',
                      routing_key='rpc_queue',
                      properties=pika.BasicProperties(
                            reply_to = callback_queue,
                            ),
                      body=request)

# ... and some code to read a response message from the callback_queue ...

2.1 Message properties
AMQPは、14個の属性を事前に定義している.それらのほとんどはあまり使われません.以下のいくつかは普段使っているものが多いです.
  • delivery_mode:Messageを1つ永続化します(設定値で2).その他の任意の値は非永続化です.RabbitMQメッセージキューに移動してください(3):タスク配布メカニズム
  • content_type:mime-typeのencodingを記述します.例えばJSONコードに設定:このpropertyをアプリケーション/jsonに設定します.
  • reply_to:コールバック用のqueue(Commonly used to name a callback queue)を指定するのに一般的に使用されます.
  • correlation_id:要求にRPC応答(correlate RPC responses with requests)を関連付けて処理する.

  • 3.関連id Correlation id
    前のセクションでは、RPCリクエストごとにcallback queueを作成する方法を実装しました.これは効率的ではありません.幸いなことに、ここにはclientごとに一意のcallback queueを作成する解決策があります.
    これには他の問題があります.応答を受信すると、すべての応答が同じqueueに書かれているため、応答がそれであるかどうかを判断できません.前のセクションのcorrelation_idはこの場合に役立ちます.requestごとに一意の値を設定し、応答を受信した後、この値で自分の応答かどうかを判断できます.自分の応答でなければ、処理しません.
     
    4.まとめ
    ワークフロー:
  • クライアントが起動すると、匿名のexclusive callback queueが作成される.
  • クライアントのRPCリクエスト時に2つのproperties:reply_が同時に設定されます.to callback queueに設定します.correlation_idはrequestごとにユニークな値に設定.
  • 要求はan rpc_に送信されるqueue queue.
  • RPCエンドまたはserverは、そのqueueのリクエストをずっと待っています.リクエストが到着するとreply_to指定したqueueはclientにメッセージを返信します.
  • clientはcallback queueのデータをずっと待っています.メッセージが到着するとcorrelation_がチェックされますidの値は、値がrequest送信時と一致すると応答を返します.

  • 5.最終実現
    The code for rpc_server.py:
    #!/usr/bin/env python
    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
    
    channel = connection.channel()
    
    channel.queue_declare(queue='rpc_queue')
    
    def fib(n):
        if n == 0:
            return 0
        elif n == 1:
            return 1
        else:
            return fib(n-1) + fib(n-2)
    
    def on_request(ch, method, props, body):
        n = int(body)
    
        print " [.] fib(%s)"  % (n,)
        response = fib(n)
    
        ch.basic_publish(exchange='',
                         routing_key=props.reply_to,
                         properties=pika.BasicProperties(correlation_id = \
                                                         props.correlation_id),
                         body=str(response))
        ch.basic_ack(delivery_tag = method.delivery_tag)
    
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(on_request, queue='rpc_queue')
    
    print " [x] Awaiting RPC requests"
    channel.start_consuming()
    

    The server code is rather straightforward:
  • (4) As usual we start by establishing the connection and declaring the queue.
  • (11) We declare our fibonacci function. It assumes only valid positive integer input. (Don't expect this one to work for big numbers, it's probably the slowest recursive implementation possible).
  • (19) We declare a callback for basic_consume, the core of the RPC server. It's executed when the request is received. It does the work and sends the response back.
  • (32) We might want to run more than one server process. In order to spread the load equally over multiple servers we need to set theprefetch_count setting.

  • The code for rpc_client.py:
    #!/usr/bin/env python
    import pika
    import uuid
    
    class FibonacciRpcClient(object):
        def __init__(self):
            self.connection = pika.BlockingConnection(pika.ConnectionParameters(
                    host='localhost'))
    
            self.channel = self.connection.channel()
    
            result = self.channel.queue_declare(exclusive=True)
            self.callback_queue = result.method.queue
    
            self.channel.basic_consume(self.on_response, no_ack=True,
                                       queue=self.callback_queue)
    
        def on_response(self, ch, method, props, body):
            if self.corr_id == props.correlation_id:
                self.response = body
    
        def call(self, n):
            self.response = None
            self.corr_id = str(uuid.uuid4())
            self.channel.basic_publish(exchange='',
                                       routing_key='rpc_queue',
                                       properties=pika.BasicProperties(
                                             reply_to = self.callback_queue,
                                             correlation_id = self.corr_id,
                                             ),
                                       body=str(n))
            while self.response is None:
                self.connection.process_data_events()
            return int(self.response)
    
    fibonacci_rpc = FibonacciRpcClient()
    
    print " [x] Requesting fib(30)"
    response = fibonacci_rpc.call(30)
    print " [.] Got %r" % (response,)

    The client code is slightly more involved:
  • (7) We establish a connection, channel and declare an exclusive 'callback' queue for replies.
  • (16) We subscribe to the 'callback' queue, so that we can receive RPC responses.
  • (18) The 'on_response' callback executed on every response is doing a very simple job, for every response message it checks if thecorrelation_id is the one we're looking for. If so, it saves the response inself.response and breaks the consuming loop.
  • (23) Next, we define our main call method - it does the actual RPC request.
  • (24) In this method, first we generate a unique correlation_id number and save it - the 'on_response' callback function will use this value to catch the appropriate response.
  • (25) Next, we publish the request message, with two properties: reply_to and correlation_id.
  • (32) At this point we can sit back and wait until the proper response arrives.
  • (33) And finally we return the response back to the user.

  • 開始rpc_server.py:
    $ python rpc_server.py
     [x] Awaiting RPC requests
    クライアントを介してfibonacci数を要求する:
    $ python rpc_client.py
     [x] Requesting fib(30)
    現在、この設計は唯一ではありませんが、この実装には以下の利点があります.
  • RPCサーバが遅すぎる場合は、拡張できます.別のRPCサーバを起動します.
  • クライアント側では、ロック同期動作は行われず、要求待ち応答を送信する.

  • 私たちのcodeはまだ簡単で、より複雑で重要な問題を解決しようとしていません.例えば、
  • serverが実行されていない場合、clientはどうすればいいですか?
  • RPCはタイムアウトメカニズムを設定すべきですか?
  • serverの実行エラーが発生し、異常が投げ出された場合、この問題をclientに転送する必要がありますか?
  • 境界検査は必要ですか?

  • オリジナルを尊重し、転載は出典anzhsoftを明記してください.http://blog.csdn.net/anzhsoft/article/details/19633107
    参考資料:
    1. http://www.rabbitmq.com/tutorials/tutorial-six-python.html