rabbitmq direct reply-toはspringAMQPとpythonの間で使用されます.

7359 ワード

背景
会社の1つのプロジェクトはbrook erとしてrabbiitmqを使用してインタラクションし、RPC Client端末はjavaを使用して、springAMQPパケットを使って、rabbiitmqとインタラクションし、RPC Server端でpythonのpikaパケットを使って、rabbitmqとインタラクションする.両端は標準的な公式ルーチンを使用しており、Client側で送信されたメッセージはServer側で受信されて処理されて結果に戻ることができるが、Client側ではnull値しか受信されないことがわかった.
問題の調査
1従来のRPCモードの運転フローを理解する
従来のモードでは、Clientは指定されたキューにメッセージを送り、彼の列を一度に並べて宣言し、メッセージを送るヘッダのreply-to属性の値をキューの名前に設定します.ID属性は、ランダムに生成された値に設定され、メッセージ識別のために使用され、メッセージを送信する.送信後にClient側で声明を傍受する彼の列が、メッセージを受信した後、corealtional_に比べてID,正しくはメッセージを処理して傍受接続を切断し、このキューはシステムによって自動的に回収される.Server側でメッセージを受信したらメッセージを処理して返し、戻るメッセージのrouting-keyはreply-toの値に設定され、propertiesにはcorelation_を設定します.idは受け取ったcorelation_です.id値このようにして、RPCインタラクションモードを一回完成します.今日の問題を解決するにはいくつかの知識点が必要です.
  • 1メッセージがexchangeに送信された後、このメッセージを受信するキューがなければ、このメッセージは失われます.
  • 使い捨ての彼の列はClientでこの列を傍受しないと自動的にrabitmqに削除されます.
  • 1 Clientから受け取ったNull値はどこから来ますか?
    私はpythonを使ってRPC Server端末を書いていますので、javaコードはあまりできません.ですから、このnull値はあそこから来て、Cientから手をつけられません.私たちはServer側から検査するしかないです.(最後に、Javaコード作成エラー(自分のコードです)の場合、springAMQPが戻ってくるデフォルトの値だと思います.
    2 Server端末がメッセージを受信した後、正しくメッセージを返しますか?
    Server端で受信したメッセージを印刷してこのメッセージを印刷するheader情報とbody情報は、reply-toの中でClient端に設定されたキューであることを確認します.また、このメッセージの返信はラビットmqを介しても見られた.
    3観察メッセージをチェックして、reply-toキューに戻されましたか?
    そして私はServer側でメッセージを受信した後のcalback関数のヘッドが切断点より大きくなり、メッセージを受信したらServer側のプログラムが保留されます.この時reply-toの列を見に行きましたが、もうラビットmqにはないことが分かりました.上記の従来のRPCモードでは、おそらくClientからコードを送信した後、reply-toキューを傍受していないので、列が消えてしまい、Server側から送信されたメッセージはキューを受信していないため破棄されたと推測しています.この時、私達は基本的に問題をClientの端にロックしました.しかし、Clientのコードはrabitmqの公式の与えられたルーチンによって書かれています.大丈夫です.この時は行き詰まりに陥ったようです.
    ポジショニング問題:Googleが大発して公式文書を追加します.
    この時GoogleでSprigAMQPフレームのRPCコードはどう書きますか?いくつかの招待状の中にListenerのクラスを追加するコードがあることを発見しましたが、追加しないものもあります.私たちは彼らが全部運行できると仮定します.なぜこのような状況になったのですか?私は最初にバージョンの問題を考えました.バージョンが変わるとコードも変わるかもしれません.その後、SprigAMQPの公式文書を検索します.やはり見つけられました.公式文書にはこんな説明があります.
    Starting with version 3.4.0、the Rabbit MQ server now supports Direply-to.this eliminates the main reason for a fixed reply queue(to avoid the need to create a temporary queue for each request).Starting with Spring AMQP version 1.4.1 Direct-to will used by deult(starequapp of)(or it is set with the name amq.rabit mq.reply-to)、the Rabit Template will aut autut matrically detect whether Direct replay-to is supported and ether use it or fall back to using a temporary rerery.Diplequegure.
    springAMQPの公式アドレスを翻訳してみると、Rabbiit MQ 3.4.0バージョン以降、政府はDirect reply-toという方式を提供してRPCを実現しています.(この方式はRPCの性能を大幅に向上させます.彼は毎回Clientに新しい列を申請する必要がないので、後で詳しく紹介します.この特性です.SpringAMQP version 1.4.1バージョンの後、デフォルトの使用特性を見てみました.サーバー上のrabitmqバージョン3.3.30を見ました.これは本当に古いです.SprigAMQPのバージョンもやはりこのバージョンより高いです.問題が見つかりました.嬉しいですが、どう解決しますか?Direct replay-to公式紹介です.
    ソリューション
    一:rabbiitmqバージョンを向上させ、両端コードをdirect reply-to方式に適合させる.
  • 難点1 pythonの公式サイトは定例をあげていませんが、紹介してくれました.
  • を実現する方法も教えてくれました.
  • 难点2サーバのバージョンアップは、すでに上を走っている业务があります.私のこのようなrabitmqに対する萌えは、rabitmqのバージョンアップ後の変更はよく分かりません.
    难点2については考えませんでしたが、难点1の私はもうpythonを书きました.どのようにdirect reply-toのコードに适合しますか?
  • 1 reply-toの名前を“amq.rabitmq.reply-to”という仮想キューに変更しました.ラビットmqのコンソールではこの列は見えません.
  • そしてCientがこの列を傍受する時はno-ackモードにします.
  • 以下は公式のpython RPCコードによって変更された適応Direct reply-toのpythonコードClient端pythonコードです.
    # -*- coding:utf-8 -*-  
    #!/usr/bin/env python
    import pika
    import uuid
    
    
    class FibonacciRpcClient(object):
        def __init__(self):
            self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    
            self.channel = self.connection.channel()
    
            # result = self.channel.queue_declare(exclusive=True)
            # self.callback_queue = result.method.queue
            # self.channel.basic_consume(self.on_response, no_ack=True,
            #                            queue=self.callback_queue)
            #       amp.rabbitmq.reply-to   no_ack   
            self.channel.basic_consume(self.on_response,
                                       queue='amq.rabbitmq.reply-to',
                                       no_ack=True)
    
        def on_response(self, ch, method, props, body):
            if self.corr_id == props.correlation_id:
                self.response = body
    
        def call(self, n):
            self.response = None
            self.corr_id = str(uuid.uuid4())
            self.channel.basic_publish(exchange='',
                                       routing_key='rpc_queue',
                                       properties=pika.BasicProperties(
                                             # reply_to = self.callback_queue,
                                             #        
                                             reply_to='amq.rabbitmq.reply-to',
                                             correlation_id=self.corr_id,
                                             ),
                                       body=str(n))
            while self.response is None:
                self.connection.process_data_events()
            return int(self.response)
    
    fibonacci_rpc = FibonacciRpcClient()
    
    print(" [x] Requesting fib(30)")
    response = fibonacci_rpc.call(30)
    print(" [.] Got %r" % response)
    
    Serverエンドコードは変更されていません.
    #!/usr/bin/env python
    import pika
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    
    channel = connection.channel()
    
    channel.queue_declare(queue='rpc_queue')
    
    def fib(n):
        if n == 0:
            return 0
        elif n == 1:
            return 1
        else:
            return fib(n-1) + fib(n-2)
    
    def on_request(ch, method, props, body):
        n  = int(body)
        print(" [.] fib(%s)" % n)
        response = fib(n)
        ch.basic_publish(exchange='',
                         routing_key=props.reply_to,
                         properties=pika.BasicProperties(correlation_id = \
                                                             props.correlation_id),
                         body=str(response))
        # ch.basic_ack(delivery_tag = method.delivery_tag)
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(on_request, queue='rpc_queue')
    
    print(" [x] Awaiting RPC requests")
    channel.start_consuming()
    
    解決方法2 javaコードはデフォルトのdirect reply-toモードを使用しません.
    この方法は私がjavaを書くのではないので、公式文書で理解しているものしか書けません.SpringAMQPを使わないデフォルトRPCモードの化にはListenerオブジェクトを追加して自分のキューを監督する必要があります.
    RabbitTemplate rabbitTempete=new RabbitTemplate(connectionFactory);  
                rabbitTempete.setExchange(exchangeName);  
                rabbitTempete.setRoutingKey(topic);  
               //       
                Queue  replyqQueue=replyQueue();  
                admin.declareQueue(replyqQueue); 
                rabbitTempete.setReplyQueue(replyqQueue);  
                SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();  
                container.setConnectionFactory(connectionFactory);  
                container.setQueues(replyqQueue);  
                container.setMessageListener(rabbitTempete);  
                container.start();  
                //         
                Object  response=rabbitTempete.convertSendAndReceive(t);  
    
    SpringAMQPは公式文書を作成するには、自分でキューを申請して自分でモニターします.でも、このコードを試したことがないので、使えるかどうかは分かりません.
    締め括りをつける
    この問題は基本的によく解決されました.まず一つの問題を解決するには、一つのものが正常な状況にあるかどうかを理解して、問題が発生したらすぐに出発して、後ろから前へ行って、中から前へ行ってください.そしてGoogle、または公式文書、公式論壇.個人的には公式文書は本当に良いものだと思います.無数の浅穴の解決方法は全部官職にあります.つの文書です.もちろん深穴は言いません.つまり、フォーラムの能力と運があってこそ、調べられます.でも、公式の多くは英語です.本当に困りました.英語の力を強めましょう.