Pythonの簡単なrpcフレームワークをrabbitMQで実現

4066 ワード

rpc中国語すなわちリモートプロシージャ呼び出しの意味
マイクロサービスアーキテクチャが盛んな状況下で、開発者として多かれ少なかれ関連技術に触れる.
大手googleのgRPC、Facebookのthrift、アリのdubbleはrpcフレームワークの中で比較的優秀な代表です
多くのフレームワークでは、python呼び出しインタフェースがサポートされていますが、他の言語で記述されているため、使用が複雑で、pythonic手動で滑稽ではありません.
ここではrabbitMQを借りて自分の簡単なrpcフレームワークを実現してもいいです.
皆さんはまずrabbitMQのapiを熟知しなければなりません.ここには非常に友好的なチュートリアルドキュメントが貼られています.ドキュメントはいくつかの文章だけではありません.早く半日ほどでメッセージキューのメカニズムを理解することができます.rabbitmqは生産でよく使われるツールとして、よく理解する必要があります.python版チュートリアルの最後の1編です.単純なrpc demoです.この実装は単一の関数のリモートコールとしてのみ使用できます.私はこの上で、複数の関数呼び出しをサポートするrpc実装サービスアーキテクチャ構成を再カプセル化しました.
システム
バージョン#バージョン#
ubuntu
16.04
RabbitMQ
3.5.7
python
3.6.2
pika
1.0.0
クライアント/呼び出し側:
オブジェクトの初期化方法には、次の2つのデータ構造が追加されています.
    #   id     
    self._req_id_result_map = {}
    #   id  
    self.req_id_list = []

リクエスト呼び出しのたびに、呼び出し結果を保存するための一意のIDと呼び出し者idリストを保存するための2つのデータ構造が必要です.
    rst = self._req_id_result_map.get(req_id, None)
    del self._req_id_result_map[req_id]

保存した結果からデータを取り出して、データを空にしてください.そうしないと、要求データ量が大きいとメモリにデータが溜まってメモリが溢れてしまうことがあります.
    def call(self, req_id, fn, param):
        """
        :param req_id:str       id      
        :param fn:str                     
        :param param:list               2                 
                                               
                                        fn(*param[0], **param[1])
        :return:
        """
        self.req_id_list.append(req_id)
        self.channel.basic_publish(exchange='', routing_key='rpc_queue',
                                   properties=pika.BasicProperties(reply_to=self.callback_queue,
                                                                   correlation_id=req_id),
                                   body=json.dumps([fn, param]))
        while not self._req_id_result_map.get(req_id, None):
            self.connection.process_data_events()
        return self.get_request_result(req_id)

ここでは呼び出し側のコアロジックです.callメソッドパラメータは要求id、呼び出し関数名です.呼び出し関数パラメータにはパラメータに特別な要求があり、長さが2の遍歴可能なコンテナ(リスト、元祖、set...)でなければなりません.コンテナの最初の要素は位置パラメータを表し、2番目の要素は名前付きパラメータの辞書です.
callメソッドは、まずデフォルトスイッチにデータを送信します.ここでは、ルーティングキーによってrpc_queueというキューにメッセージを指定し、その後、遠隔応答結果までブロックします.
サービス側/被調整側:
サービス側はリモート呼び出しの関数を用意し、モジュール化してサービス側のプライマリロジックモジュールに導入してconnection、rpc_queueキューを宣言し、コールバック関数をバインドして消費(consume)を開くことをお勧めします.
コールバック関数の主な論理:
def on_request(ch, method, props, body):
    fn, param = json.loads(body.decode())
    print(" [.] fib(%s)" % (param,))
    if not (isinstance(param, list) and len(param) == 2 and 
            isinstance(param[0], list) and isinstance(param[1], dict)):
        response = "error params please use like this fn(*param[0], **param[1])"
    else:
        try:
            fn = eval(fn)
            response = fn(*param[0], **param[1])
        except:
            response = "remote has not function like {}".format(fn)

    ch.basic_publish(exchange='', routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id=props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)

ここで呼び出し側のパラメータフォーマットが正しいか否かを判断するには、呼び出しの関数がルーティングキー(呼び出し側宣言のキューでもある)として呼び出し側のreply_toが存在するかどうか(呼び出し側宣言のキューでもあることに注意して、2つのキューがあり、1つはリモート呼び出し情報を格納し、1つはリモート呼び出し結果を格納し、それぞれサービス側/クライアント宣言)は同時にcorrelation_idという呼び出し側の要求idを取り出してこのidを属性として呼び出し側に送信し、呼び出し側はこのid(tokenと見なすことができる)を介して呼び出し結果を取り出す
データ転送のデフォルトではバイナリが使用され、パラメータ部分はjsonでシーケンス化され、バイナリに変換されます.
最後に呼び出し端をカプセル化する方法:call_single、call_manyはこの2つの関数を使用して、rpc呼び出し端(クライアント)とリモート端(サービス端/被変調端)をそれぞれ異なるマシンに配置することができ、統一されたrabbitMQホストアドレスを構成することに注意し、メッセージキューはサービス端に配置してもよいし、クライアントに配置してもよいし、独立したサーバに配置してもよい.
最后にまとめます:ソースコードのアドレスはここにあります~<ここで最も简単で直接的な方法で1つのrpcフレームワークを実现して、现在関数の远隔呼び出しだけを支持して、后で方法の远隔呼び出しを加えることができて、原理も関数呼び出しと似ていて、兴味のある小さい仲间は自分で実现することができます