Python 3はwebsocketとrabbitmqの簡単な接続を実現

31824 ワード

Websocketとrabbitmqの簡単な接続
近ごろは用事がなく、以前書いた項目を振り返って整理した.このいわゆるwebsocketとrabbitmqをつなぐ「橋渡し」は、実は大きなプロジェクトの小さなモジュールにすぎない.設計当初の欠陥のため、この小型の「サーバ」を使用して需要を達成しなければならない.ここで、web側はdjango-channelsを用いて書かれたwebsocketベースのwebアプリケーションであり、データの特殊性のため、WPFベースのデスクトップアプリケーションデータとドッキングする必要がある.しかし,このWPFプログラムはrabbitmqメッセージミドルウェアに完全に依存してデータを伝達するため,このWPFのデータが受信されても送信されてもrabbitmqに完全に依存する.話を戻せば、当初は少し時間をかけてこれを書いたが、事が急いでいて、功力が家に着いていないので、1週間運行した後、正式に銃殺された.そのため、これは「半成功」のサービスプログラムにすぎません.使えるし、効果はまあまあなので、成功しました.効率が悪く、不安定なので失敗です.
おおよそのデザイン
1、受信と送信
Websocketとrabbitmqを接続するには,自然両端の受信と送信が必要である.区別を容易にするために、websocket向けはpublisher(このサービスはrabbitmqと同じサーバに配備されているため)、rabbitmq向けはconsumerである.publisherはconsumerと極めて似た構造と機能を持っている.
1、consumer
まずRounting_を定義しますkey
EXCHANGE = 'example'
EXCHANGE_TYPE = 'direct'
QUEUE = 'listener'

ROUTING_KEYS = [
    "A_new_example",
    "A_update_example",
    "A_delete_example",
    "B_new_example",
    "B_update_example",
    "B_delete_example",
]

そしてpikaモジュールのioloopを用いてリスニングサイクルを行う
# consumer        
def __init__(self, amqp_url):
    self._connection = None
    self._channel = None
    self._closing = False
    self._consumer_tag = None
    self._url = amqp_url

def connect(self):
    return pika.SelectConnection(pika.URLParameters(self._url),
                                 self.on_connection_open,
                                 stop_ioloop_on_close=False)

def run(self):
    self._connection = self.connect()
    self._connection.ioloop.start()

そして定義on_Messageはリスニングコールバック関数として
def start_consuming(self):
    self.add_on_cancel_callback()
    self._consumer_tag = self._channel.basic_consume(self.on_message, self.QUEUE)

def on_message(self, unused_channel, basic_deliver, properties, body):
    routing_key = basic_deliver.routing_key
    # desk websocket       , ROUTING_KEYS  A、B
    desk = routing_key.split("_")[0].upper() 
    type_ = "_".join(routing_key.split("_")[1:]).upper()

    body = json.loads(body)
    
    #               ,  from_web is_bridge         
    if body.get("from_web"):
        content = body.get("orig_content")
        content["is_bridge"] = True
        try:
            #        web      rabbitmq    rabbitmq     
            #   is_bridge   ,  
            # websocket      、      
            ws = websocket.create_connection(config.websocket_address)
            ws.send(json.dumps(content))
            ws.close()
        except Exception as err:
            LOGGER.exception("WebSocket connect FAILED")
    else:
        #   new_bwic      
        # ...      
        # is_new         
        #     rabbitmq          websocket 
        if is_new:
            if not body.get("Desk"):
                body["Desk"] = desk
            try:
                #           
                context = DataManager(SEND).dispatch(body, type_)
                #    websocket     ,   rabbitmq    
                ws = websocket.create_connection(config.websocket_address)
                ws.send(json.dumps(context))
                result = json.loads(ws.recv())
                self.acknowledge_message(basic_deliver.delivery_tag)
                ws.close()
            except Exception as err:
                LOGGER.exception("WebSocket connect FAILED")
    #   ioloop      ,  sleep1     
    time.sleep(1)

2、publisher
publisherはconsumer構造とほぼ同じで,私は便宜上,その中の重要なrecive法を提案した.
便利なこともあって、循環をデッドサイクルにした.相変わらずドキドキバッグを設置します.
websocketにはpikaのioloopの下位層がないので、ioloopを書くのは複雑で、flaskを使ったりtornadoを直接使ったりして実現したりして、tornadoにサービスをマウントすればいいです.
また,このpublisher接続websocketとrabbitmqはいずれも短い接続を用いており,これも不安定でオーバーヘッドが大きい原因の一つである.次はC#を使ってこれらの機能を実現する設計とコードを示します.
def recive():
    global reconnect_count
    while True:
        try:
            #         ,    。
            #        C#     
            ws = websocket.create_connection(address, sslopt={"check_hostname": False})
            try:
                result = json.loads(ws.recv())
                if result.get("is_web", None) is True:
                    if not result.get("is_bridge", None):
                        #      
                        context, routing_key = DataManager(RECEIVE).dispatch(result)
                       
                       	#   rabbitmq,   
                        connection = pika.BlockingConnection(parameters)
                        channel = connection.channel()
                        channel.exchange_declare(exchange='bwic', exchange_type='direct')
                        channel.basic_publish(exchange='bwic', routing_key=routing_key, body=json.dumps(context))

                        connection.close()
                        ws.close()
                    else:
                        LOGGER.warning("This Message is from bridge, so ignore it")
                        ws.close()
                else:
                    LOGGER.warning("This Message is not from WEB, so ignore it")
                    ws.close()
            except Exception as err:
                ws.close()

        except Exception as err:
            LOGGER.exception("WebSocket connect FAILED")
            #        ,        ,      
            reconnect_count = reconnect_count + 1
            if reconnect_count > 3:
                LOGGER.warning("SHOULD Notice the PERSON in Charge that THE CONNECTION HAS SOMETHING WRONG")
                LOGGER.warning("The Server is CLOSING DOWN...")
                break
                pass  #     ,        

2、データの変換
データの変換はもちろん、両端が設計上の理由でフィールド、フィールドタイプなどが異なるため、二次処理が必要です.ここに基本的なコードが貼られているのは,クラスの設計を検討するためである.親クラスは抽象としてだけでなく(抽象できるものは何もないので、インタフェースを最大で提供します)、「転送」として、呼び出しフェーズで子クラスを非表示にし、親クラスを直接使用します.余計なことはあるが、特別な場合にはメリットがある.
SEND = "DataManager-Send"
RECEIVE = "DataManager-Receive"


class DataManager(object):
    def __init__(self, TYPE):
        self.TYPE = TYPE

    def dispatch(self, content, *k):
        if self.TYPE == SEND:
            return DataManagerSend.dispatch(content, *k)
        elif self.TYPE == RECEIVE:
            return DataManagerReceive.dispatch(content)
        else:
            pass

    def DELETE_EXAMPLE(self, *k):
        pass

    def NEW_EXAMPLE(self, *k):
        pass

    def UPDATE_EXAMPLE(self, *k):
        pass


class DataManagerSend(DataManager):
    def __init__(self, content, TYPE=None):
        super().__init__(TYPE)
        self.content = content

    @classmethod
    def dispatch(cls, content, *k, ):
        manager = cls(content)

        if hasattr(manager, k[0]):
            method = getattr(manager, k[0])

            return method(content)
        else:
            raise NotImplementedError('{} not implemented'.format(type_))

    def DELETE_EXAMPLE(self, content):
        pass

    def NEW_EXAMPLE(self, content):
        pass

    def UPDATE_EXAMPLE(self, content):
        pass


class DataManagerReceive(DataManager):
    def __init__(self, content, TYPE=None):
        super().__init__(TYPE)
        self.content = content

    @classmethod
    def dispatch(cls, content, *k):
        manager = cls(content)

        type_ = content.get('type').upper()
        
        if hasattr(manager, type_):
            method = getattr(manager, type_)
            return method(content)
        else:
            raise NotImplementedError('{} not implemented'.format(type_))

    def DELETE_EXAMPLE(self, *k):
        pass

    def NEW_EXAMPLE(self, *k):
        pass

    def UPDATE_EXAMPLE(self, *k):
        pass


締めくくり
この文章は完全に当時の需要の実現を記録するためで、他の意味はありません.
しかし、反省した結果、元の方法は極めて不安定で丈夫ではないことが分かった.
そこで次編では、C#を使ってこのニーズを再実現します.