rabbitmq direct reply-toはspringAMQPとpythonの間で使用されます.
7359 ワード
背景
会社の1つのプロジェクトはbrook erとしてrabbiitmqを使用してインタラクションし、RPC Client端末はjavaを使用して、springAMQPパケットを使って、rabbiitmqとインタラクションし、RPC Server端でpythonのpikaパケットを使って、rabbitmqとインタラクションする.両端は標準的な公式ルーチンを使用しており、Client側で送信されたメッセージはServer側で受信されて処理されて結果に戻ることができるが、Client側ではnull値しか受信されないことがわかった.
問題の調査
1従来のRPCモードの運転フローを理解する
従来のモードでは、Clientは指定されたキューにメッセージを送り、彼の列を一度に並べて宣言し、メッセージを送るヘッダのreply-to属性の値をキューの名前に設定します.ID属性は、ランダムに生成された値に設定され、メッセージ識別のために使用され、メッセージを送信する.送信後にClient側で声明を傍受する彼の列が、メッセージを受信した後、corealtional_に比べてID,正しくはメッセージを処理して傍受接続を切断し、このキューはシステムによって自動的に回収される.Server側でメッセージを受信したらメッセージを処理して返し、戻るメッセージのrouting-keyはreply-toの値に設定され、propertiesにはcorelation_を設定します.idは受け取ったcorelation_です.id値このようにして、RPCインタラクションモードを一回完成します.今日の問題を解決するにはいくつかの知識点が必要です. 1メッセージがexchangeに送信された後、このメッセージを受信するキューがなければ、このメッセージは失われます. 使い捨ての彼の列はClientでこの列を傍受しないと自動的にrabitmqに削除されます. 1 Clientから受け取ったNull値はどこから来ますか?
私はpythonを使ってRPC Server端末を書いていますので、javaコードはあまりできません.ですから、このnull値はあそこから来て、Cientから手をつけられません.私たちはServer側から検査するしかないです.(最後に、Javaコード作成エラー(自分のコードです)の場合、springAMQPが戻ってくるデフォルトの値だと思います.
2 Server端末がメッセージを受信した後、正しくメッセージを返しますか?
Server端で受信したメッセージを印刷してこのメッセージを印刷するheader情報とbody情報は、reply-toの中でClient端に設定されたキューであることを確認します.また、このメッセージの返信はラビットmqを介しても見られた.
3観察メッセージをチェックして、reply-toキューに戻されましたか?
そして私はServer側でメッセージを受信した後のcalback関数のヘッドが切断点より大きくなり、メッセージを受信したらServer側のプログラムが保留されます.この時reply-toの列を見に行きましたが、もうラビットmqにはないことが分かりました.上記の従来のRPCモードでは、おそらくClientからコードを送信した後、reply-toキューを傍受していないので、列が消えてしまい、Server側から送信されたメッセージはキューを受信していないため破棄されたと推測しています.この時、私達は基本的に問題をClientの端にロックしました.しかし、Clientのコードはrabitmqの公式の与えられたルーチンによって書かれています.大丈夫です.この時は行き詰まりに陥ったようです.
ポジショニング問題:Googleが大発して公式文書を追加します.
この時GoogleでSprigAMQPフレームのRPCコードはどう書きますか?いくつかの招待状の中にListenerのクラスを追加するコードがあることを発見しましたが、追加しないものもあります.私たちは彼らが全部運行できると仮定します.なぜこのような状況になったのですか?私は最初にバージョンの問題を考えました.バージョンが変わるとコードも変わるかもしれません.その後、SprigAMQPの公式文書を検索します.やはり見つけられました.公式文書にはこんな説明があります.
Starting with version 3.4.0、the Rabbit MQ server now supports Direply-to.this eliminates the main reason for a fixed reply queue(to avoid the need to create a temporary queue for each request).Starting with Spring AMQP version 1.4.1 Direct-to will used by deult(starequapp of)(or it is set with the name amq.rabit mq.reply-to)、the Rabit Template will aut autut matrically detect whether Direct replay-to is supported and ether use it or fall back to using a temporary rerery.Diplequegure.
springAMQPの公式アドレスを翻訳してみると、Rabbiit MQ 3.4.0バージョン以降、政府はDirect reply-toという方式を提供してRPCを実現しています.(この方式はRPCの性能を大幅に向上させます.彼は毎回Clientに新しい列を申請する必要がないので、後で詳しく紹介します.この特性です.SpringAMQP version 1.4.1バージョンの後、デフォルトの使用特性を見てみました.サーバー上のrabitmqバージョン3.3.30を見ました.これは本当に古いです.SprigAMQPのバージョンもやはりこのバージョンより高いです.問題が見つかりました.嬉しいですが、どう解決しますか?Direct replay-to公式紹介です.
ソリューション
一:rabbiitmqバージョンを向上させ、両端コードをdirect reply-to方式に適合させる.難点1 pythonの公式サイトは定例をあげていませんが、紹介してくれました. を実現する方法も教えてくれました.难点2サーバのバージョンアップは、すでに上を走っている业务があります.私のこのようなrabitmqに対する萌えは、rabitmqのバージョンアップ後の変更はよく分かりません.
难点2については考えませんでしたが、难点1の私はもうpythonを书きました.どのようにdirect reply-toのコードに适合しますか? 1 reply-toの名前を“amq.rabitmq.reply-to”という仮想キューに変更しました.ラビットmqのコンソールではこの列は見えません. そしてCientがこの列を傍受する時はno-ackモードにします. 以下は公式のpython RPCコードによって変更された適応Direct reply-toのpythonコードClient端pythonコードです.
この方法は私がjavaを書くのではないので、公式文書で理解しているものしか書けません.SpringAMQPを使わないデフォルトRPCモードの化にはListenerオブジェクトを追加して自分のキューを監督する必要があります.
締め括りをつける
この問題は基本的によく解決されました.まず一つの問題を解決するには、一つのものが正常な状況にあるかどうかを理解して、問題が発生したらすぐに出発して、後ろから前へ行って、中から前へ行ってください.そしてGoogle、または公式文書、公式論壇.個人的には公式文書は本当に良いものだと思います.無数の浅穴の解決方法は全部官職にあります.つの文書です.もちろん深穴は言いません.つまり、フォーラムの能力と運があってこそ、調べられます.でも、公式の多くは英語です.本当に困りました.英語の力を強めましょう.
会社の1つのプロジェクトはbrook erとしてrabbiitmqを使用してインタラクションし、RPC Client端末はjavaを使用して、springAMQPパケットを使って、rabbiitmqとインタラクションし、RPC Server端でpythonのpikaパケットを使って、rabbitmqとインタラクションする.両端は標準的な公式ルーチンを使用しており、Client側で送信されたメッセージはServer側で受信されて処理されて結果に戻ることができるが、Client側ではnull値しか受信されないことがわかった.
問題の調査
1従来のRPCモードの運転フローを理解する
従来のモードでは、Clientは指定されたキューにメッセージを送り、彼の列を一度に並べて宣言し、メッセージを送るヘッダのreply-to属性の値をキューの名前に設定します.ID属性は、ランダムに生成された値に設定され、メッセージ識別のために使用され、メッセージを送信する.送信後にClient側で声明を傍受する彼の列が、メッセージを受信した後、corealtional_に比べてID,正しくはメッセージを処理して傍受接続を切断し、このキューはシステムによって自動的に回収される.Server側でメッセージを受信したらメッセージを処理して返し、戻るメッセージのrouting-keyはreply-toの値に設定され、propertiesにはcorelation_を設定します.idは受け取ったcorelation_です.id値このようにして、RPCインタラクションモードを一回完成します.今日の問題を解決するにはいくつかの知識点が必要です.
私はpythonを使ってRPC Server端末を書いていますので、javaコードはあまりできません.ですから、このnull値はあそこから来て、Cientから手をつけられません.私たちはServer側から検査するしかないです.(最後に、Javaコード作成エラー(自分のコードです)の場合、springAMQPが戻ってくるデフォルトの値だと思います.
2 Server端末がメッセージを受信した後、正しくメッセージを返しますか?
Server端で受信したメッセージを印刷してこのメッセージを印刷するheader情報とbody情報は、reply-toの中でClient端に設定されたキューであることを確認します.また、このメッセージの返信はラビットmqを介しても見られた.
3観察メッセージをチェックして、reply-toキューに戻されましたか?
そして私はServer側でメッセージを受信した後のcalback関数のヘッドが切断点より大きくなり、メッセージを受信したらServer側のプログラムが保留されます.この時reply-toの列を見に行きましたが、もうラビットmqにはないことが分かりました.上記の従来のRPCモードでは、おそらくClientからコードを送信した後、reply-toキューを傍受していないので、列が消えてしまい、Server側から送信されたメッセージはキューを受信していないため破棄されたと推測しています.この時、私達は基本的に問題をClientの端にロックしました.しかし、Clientのコードはrabitmqの公式の与えられたルーチンによって書かれています.大丈夫です.この時は行き詰まりに陥ったようです.
ポジショニング問題:Googleが大発して公式文書を追加します.
この時GoogleでSprigAMQPフレームのRPCコードはどう書きますか?いくつかの招待状の中にListenerのクラスを追加するコードがあることを発見しましたが、追加しないものもあります.私たちは彼らが全部運行できると仮定します.なぜこのような状況になったのですか?私は最初にバージョンの問題を考えました.バージョンが変わるとコードも変わるかもしれません.その後、SprigAMQPの公式文書を検索します.やはり見つけられました.公式文書にはこんな説明があります.
Starting with version 3.4.0、the Rabbit MQ server now supports Direply-to.this eliminates the main reason for a fixed reply queue(to avoid the need to create a temporary queue for each request).Starting with Spring AMQP version 1.4.1 Direct-to will used by deult(starequapp of)(or it is set with the name amq.rabit mq.reply-to)、the Rabit Template will aut autut matrically detect whether Direct replay-to is supported and ether use it or fall back to using a temporary rerery.Diplequegure.
springAMQPの公式アドレスを翻訳してみると、Rabbiit MQ 3.4.0バージョン以降、政府はDirect reply-toという方式を提供してRPCを実現しています.(この方式はRPCの性能を大幅に向上させます.彼は毎回Clientに新しい列を申請する必要がないので、後で詳しく紹介します.この特性です.SpringAMQP version 1.4.1バージョンの後、デフォルトの使用特性を見てみました.サーバー上のrabitmqバージョン3.3.30を見ました.これは本当に古いです.SprigAMQPのバージョンもやはりこのバージョンより高いです.問題が見つかりました.嬉しいですが、どう解決しますか?Direct replay-to公式紹介です.
ソリューション
一:rabbiitmqバージョンを向上させ、両端コードをdirect reply-to方式に適合させる.
难点2については考えませんでしたが、难点1の私はもうpythonを书きました.どのようにdirect reply-toのコードに适合しますか?
# -*- coding:utf-8 -*-
#!/usr/bin/env python
import pika
import uuid
class FibonacciRpcClient(object):
def __init__(self):
self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
self.channel = self.connection.channel()
# result = self.channel.queue_declare(exclusive=True)
# self.callback_queue = result.method.queue
# self.channel.basic_consume(self.on_response, no_ack=True,
# queue=self.callback_queue)
# amp.rabbitmq.reply-to no_ack
self.channel.basic_consume(self.on_response,
queue='amq.rabbitmq.reply-to',
no_ack=True)
def on_response(self, ch, method, props, body):
if self.corr_id == props.correlation_id:
self.response = body
def call(self, n):
self.response = None
self.corr_id = str(uuid.uuid4())
self.channel.basic_publish(exchange='',
routing_key='rpc_queue',
properties=pika.BasicProperties(
# reply_to = self.callback_queue,
#
reply_to='amq.rabbitmq.reply-to',
correlation_id=self.corr_id,
),
body=str(n))
while self.response is None:
self.connection.process_data_events()
return int(self.response)
fibonacci_rpc = FibonacciRpcClient()
print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print(" [.] Got %r" % response)
Serverエンドコードは変更されていません.#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='rpc_queue')
def fib(n):
if n == 0:
return 0
elif n == 1:
return 1
else:
return fib(n-1) + fib(n-2)
def on_request(ch, method, props, body):
n = int(body)
print(" [.] fib(%s)" % n)
response = fib(n)
ch.basic_publish(exchange='',
routing_key=props.reply_to,
properties=pika.BasicProperties(correlation_id = \
props.correlation_id),
body=str(response))
# ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request, queue='rpc_queue')
print(" [x] Awaiting RPC requests")
channel.start_consuming()
解決方法2 javaコードはデフォルトのdirect reply-toモードを使用しません.この方法は私がjavaを書くのではないので、公式文書で理解しているものしか書けません.SpringAMQPを使わないデフォルトRPCモードの化にはListenerオブジェクトを追加して自分のキューを監督する必要があります.
RabbitTemplate rabbitTempete=new RabbitTemplate(connectionFactory);
rabbitTempete.setExchange(exchangeName);
rabbitTempete.setRoutingKey(topic);
//
Queue replyqQueue=replyQueue();
admin.declareQueue(replyqQueue);
rabbitTempete.setReplyQueue(replyqQueue);
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueues(replyqQueue);
container.setMessageListener(rabbitTempete);
container.start();
//
Object response=rabbitTempete.convertSendAndReceive(t);
SpringAMQPは公式文書を作成するには、自分でキューを申請して自分でモニターします.でも、このコードを試したことがないので、使えるかどうかは分かりません.締め括りをつける
この問題は基本的によく解決されました.まず一つの問題を解決するには、一つのものが正常な状況にあるかどうかを理解して、問題が発生したらすぐに出発して、後ろから前へ行って、中から前へ行ってください.そしてGoogle、または公式文書、公式論壇.個人的には公式文書は本当に良いものだと思います.無数の浅穴の解決方法は全部官職にあります.つの文書です.もちろん深穴は言いません.つまり、フォーラムの能力と運があってこそ、調べられます.でも、公式の多くは英語です.本当に困りました.英語の力を強めましょう.