RabbitMQメッセージキュー(7):クラウドコンピューティングクラスタのリモートコール(RPC)に適用
クラウドコンピューティング環境では、他のマシンのコンピューティングリソースを使用する必要がある場合が多く、Messageを受信して処理すると、一部のコンピューティングタスクが他のノードに割り当てられて完了する可能性があります.では、RabbitMQはRPCをどのように使うのでしょうか.本論文では,他のノードを用いてフィボナッチの完成例を求める.
1.クライアントインタフェースClient interface
RPCサービスがどのように使用されているかを示すために、簡単なクライアントclassを作成します.コールという名前の関数が外部に提供され、RPCリクエストが送信され、RPC演算を受信した結果がブロックされます.コードは次のとおりです.
2.コールバック関数キューCallback queue
全体的にRabbitMQでRPCリモートコールを行うのは比較的容易である.クライアントはリクエストのMessageを送信し、serverは応答結果を返す.応答クライアントがpublishメッセージを受信するには、「callback」(コールバック)のqueueアドレスを指定する必要があります.codeは次のとおりです.
2.1 Message properties
AMQPは、14個の属性を事前に定義している.それらのほとんどはあまり使われません.以下のいくつかは普段使っているものが多いです. delivery_mode:Messageを1つ永続化します(設定値で2).その他の任意の値は非永続化です.RabbitMQメッセージキューに移動してください(3):タスク配布メカニズム content_type:mime-typeのencodingを記述します.例えばJSONコードに設定:このpropertyをアプリケーション/jsonに設定します. reply_to:コールバック用のqueue(Commonly used to name a callback queue)を指定するのに一般的に使用されます. correlation_id:要求にRPC応答(correlate RPC responses with requests)を関連付けて処理する.
3.関連id Correlation id
前のセクションでは、RPCリクエストごとにcallback queueを作成する方法を実装しました.これは効率的ではありません.幸いなことに、ここにはclientごとに一意のcallback queueを作成する解決策があります.
これには他の問題があります.応答を受信すると、すべての応答が同じqueueに書かれているため、応答がそれであるかどうかを判断できません.前のセクションのcorrelation_idはこの場合に役立ちます.requestごとに一意の値を設定し、応答を受信した後、この値で自分の応答かどうかを判断できます.自分の応答でなければ、処理しません.
4.まとめ
ワークフロー:クライアントが起動すると、匿名のexclusive callback queueが作成される. クライアントのRPCリクエスト時に2つのproperties:reply_が同時に設定されます.to callback queueに設定します.correlation_idはrequestごとにユニークな値に設定. 要求はan rpc_に送信されるqueue queue. RPCエンドまたはserverは、そのqueueのリクエストをずっと待っています.リクエストが到着するとreply_to指定したqueueはclientにメッセージを返信します. clientはcallback queueのデータをずっと待っています.メッセージが到着するとcorrelation_がチェックされますidの値は、値がrequest送信時と一致すると応答を返します.
5.最終実現
The code for rpc_server.py:
The server code is rather straightforward: (4) As usual we start by establishing the connection and declaring the queue. (11) We declare our fibonacci function. It assumes only valid positive integer input. (Don't expect this one to work for big numbers, it's probably the slowest recursive implementation possible). (19) We declare a callback for basic_consume, the core of the RPC server. It's executed when the request is received. It does the work and sends the response back. (32) We might want to run more than one server process. In order to spread the load equally over multiple servers we need to set theprefetch_count setting.
The code for rpc_client.py:
The client code is slightly more involved: (7) We establish a connection, channel and declare an exclusive 'callback' queue for replies. (16) We subscribe to the 'callback' queue, so that we can receive RPC responses. (18) The 'on_response' callback executed on every response is doing a very simple job, for every response message it checks if thecorrelation_id is the one we're looking for. If so, it saves the response inself.response and breaks the consuming loop. (23) Next, we define our main call method - it does the actual RPC request. (24) In this method, first we generate a unique correlation_id number and save it - the 'on_response' callback function will use this value to catch the appropriate response. (25) Next, we publish the request message, with two properties: reply_to and correlation_id. (32) At this point we can sit back and wait until the proper response arrives. (33) And finally we return the response back to the user.
開始rpc_server.py: RPCサーバが遅すぎる場合は、拡張できます.別のRPCサーバを起動します. クライアント側では、ロック同期動作は行われず、要求待ち応答を送信する.
私たちのcodeはまだ簡単で、より複雑で重要な問題を解決しようとしていません.例えば、 serverが実行されていない場合、clientはどうすればいいですか? RPCはタイムアウトメカニズムを設定すべきですか? serverの実行エラーが発生し、異常が投げ出された場合、この問題をclientに転送する必要がありますか? 境界検査は必要ですか?
オリジナルを尊重し、転載は出典anzhsoftを明記してください.http://blog.csdn.net/anzhsoft/article/details/19633107
参考資料:
1. http://www.rabbitmq.com/tutorials/tutorial-six-python.html
1.クライアントインタフェースClient interface
RPCサービスがどのように使用されているかを示すために、簡単なクライアントclassを作成します.コールという名前の関数が外部に提供され、RPCリクエストが送信され、RPC演算を受信した結果がブロックされます.コードは次のとおりです.
fibonacci_rpc = FibonacciRpcClient()
result = fibonacci_rpc.call(4)
print "fib(4) is %r" % (result,)
2.コールバック関数キューCallback queue
全体的にRabbitMQでRPCリモートコールを行うのは比較的容易である.クライアントはリクエストのMessageを送信し、serverは応答結果を返す.応答クライアントがpublishメッセージを受信するには、「callback」(コールバック)のqueueアドレスを指定する必要があります.codeは次のとおりです.
result = channel.queue_declare(exclusive=True)
callback_queue = result.method.queue
channel.basic_publish(exchange='',
routing_key='rpc_queue',
properties=pika.BasicProperties(
reply_to = callback_queue,
),
body=request)
# ... and some code to read a response message from the callback_queue ...
2.1 Message properties
AMQPは、14個の属性を事前に定義している.それらのほとんどはあまり使われません.以下のいくつかは普段使っているものが多いです.
3.関連id Correlation id
前のセクションでは、RPCリクエストごとにcallback queueを作成する方法を実装しました.これは効率的ではありません.幸いなことに、ここにはclientごとに一意のcallback queueを作成する解決策があります.
これには他の問題があります.応答を受信すると、すべての応答が同じqueueに書かれているため、応答がそれであるかどうかを判断できません.前のセクションのcorrelation_idはこの場合に役立ちます.requestごとに一意の値を設定し、応答を受信した後、この値で自分の応答かどうかを判断できます.自分の応答でなければ、処理しません.
4.まとめ
ワークフロー:
5.最終実現
The code for rpc_server.py:
#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='rpc_queue')
def fib(n):
if n == 0:
return 0
elif n == 1:
return 1
else:
return fib(n-1) + fib(n-2)
def on_request(ch, method, props, body):
n = int(body)
print " [.] fib(%s)" % (n,)
response = fib(n)
ch.basic_publish(exchange='',
routing_key=props.reply_to,
properties=pika.BasicProperties(correlation_id = \
props.correlation_id),
body=str(response))
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request, queue='rpc_queue')
print " [x] Awaiting RPC requests"
channel.start_consuming()
The server code is rather straightforward:
The code for rpc_client.py:
#!/usr/bin/env python
import pika
import uuid
class FibonacciRpcClient(object):
def __init__(self):
self.connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
self.channel = self.connection.channel()
result = self.channel.queue_declare(exclusive=True)
self.callback_queue = result.method.queue
self.channel.basic_consume(self.on_response, no_ack=True,
queue=self.callback_queue)
def on_response(self, ch, method, props, body):
if self.corr_id == props.correlation_id:
self.response = body
def call(self, n):
self.response = None
self.corr_id = str(uuid.uuid4())
self.channel.basic_publish(exchange='',
routing_key='rpc_queue',
properties=pika.BasicProperties(
reply_to = self.callback_queue,
correlation_id = self.corr_id,
),
body=str(n))
while self.response is None:
self.connection.process_data_events()
return int(self.response)
fibonacci_rpc = FibonacciRpcClient()
print " [x] Requesting fib(30)"
response = fibonacci_rpc.call(30)
print " [.] Got %r" % (response,)
The client code is slightly more involved:
開始rpc_server.py:
$ python rpc_server.py
[x] Awaiting RPC requests
クライアントを介してfibonacci数を要求する:$ python rpc_client.py
[x] Requesting fib(30)
現在、この設計は唯一ではありませんが、この実装には以下の利点があります.私たちのcodeはまだ簡単で、より複雑で重要な問題を解決しようとしていません.例えば、
オリジナルを尊重し、転載は出典anzhsoftを明記してください.http://blog.csdn.net/anzhsoft/article/details/19633107
参考資料:
1. http://www.rabbitmq.com/tutorials/tutorial-six-python.html