Python 3はwebsocketとrabbitmqの簡単な接続を実現
31824 ワード
Websocketとrabbitmqの簡単な接続
近ごろは用事がなく、以前書いた項目を振り返って整理した.このいわゆるwebsocketとrabbitmqをつなぐ「橋渡し」は、実は大きなプロジェクトの小さなモジュールにすぎない.設計当初の欠陥のため、この小型の「サーバ」を使用して需要を達成しなければならない.ここで、web側はdjango-channelsを用いて書かれたwebsocketベースのwebアプリケーションであり、データの特殊性のため、WPFベースのデスクトップアプリケーションデータとドッキングする必要がある.しかし,このWPFプログラムはrabbitmqメッセージミドルウェアに完全に依存してデータを伝達するため,このWPFのデータが受信されても送信されてもrabbitmqに完全に依存する.話を戻せば、当初は少し時間をかけてこれを書いたが、事が急いでいて、功力が家に着いていないので、1週間運行した後、正式に銃殺された.そのため、これは「半成功」のサービスプログラムにすぎません.使えるし、効果はまあまあなので、成功しました.効率が悪く、不安定なので失敗です.
おおよそのデザイン
1、受信と送信
Websocketとrabbitmqを接続するには,自然両端の受信と送信が必要である.区別を容易にするために、websocket向けはpublisher(このサービスはrabbitmqと同じサーバに配備されているため)、rabbitmq向けはconsumerである.publisherはconsumerと極めて似た構造と機能を持っている.
1、consumer
まずRounting_を定義しますkey
そしてpikaモジュールのioloopを用いてリスニングサイクルを行う
そして定義on_Messageはリスニングコールバック関数として
2、publisher
publisherはconsumer構造とほぼ同じで,私は便宜上,その中の重要なrecive法を提案した.
便利なこともあって、循環をデッドサイクルにした.相変わらずドキドキバッグを設置します.
websocketにはpikaのioloopの下位層がないので、ioloopを書くのは複雑で、flaskを使ったりtornadoを直接使ったりして実現したりして、tornadoにサービスをマウントすればいいです.
また,このpublisher接続websocketとrabbitmqはいずれも短い接続を用いており,これも不安定でオーバーヘッドが大きい原因の一つである.次はC#を使ってこれらの機能を実現する設計とコードを示します.
2、データの変換
データの変換はもちろん、両端が設計上の理由でフィールド、フィールドタイプなどが異なるため、二次処理が必要です.ここに基本的なコードが貼られているのは,クラスの設計を検討するためである.親クラスは抽象としてだけでなく(抽象できるものは何もないので、インタフェースを最大で提供します)、「転送」として、呼び出しフェーズで子クラスを非表示にし、親クラスを直接使用します.余計なことはあるが、特別な場合にはメリットがある.
締めくくり
この文章は完全に当時の需要の実現を記録するためで、他の意味はありません.
しかし、反省した結果、元の方法は極めて不安定で丈夫ではないことが分かった.
そこで次編では、C#を使ってこのニーズを再実現します.
近ごろは用事がなく、以前書いた項目を振り返って整理した.このいわゆるwebsocketとrabbitmqをつなぐ「橋渡し」は、実は大きなプロジェクトの小さなモジュールにすぎない.設計当初の欠陥のため、この小型の「サーバ」を使用して需要を達成しなければならない.ここで、web側はdjango-channelsを用いて書かれたwebsocketベースのwebアプリケーションであり、データの特殊性のため、WPFベースのデスクトップアプリケーションデータとドッキングする必要がある.しかし,このWPFプログラムはrabbitmqメッセージミドルウェアに完全に依存してデータを伝達するため,このWPFのデータが受信されても送信されてもrabbitmqに完全に依存する.話を戻せば、当初は少し時間をかけてこれを書いたが、事が急いでいて、功力が家に着いていないので、1週間運行した後、正式に銃殺された.そのため、これは「半成功」のサービスプログラムにすぎません.使えるし、効果はまあまあなので、成功しました.効率が悪く、不安定なので失敗です.
おおよそのデザイン
1、受信と送信
Websocketとrabbitmqを接続するには,自然両端の受信と送信が必要である.区別を容易にするために、websocket向けはpublisher(このサービスはrabbitmqと同じサーバに配備されているため)、rabbitmq向けはconsumerである.publisherはconsumerと極めて似た構造と機能を持っている.
1、consumer
まずRounting_を定義しますkey
EXCHANGE = 'example'
EXCHANGE_TYPE = 'direct'
QUEUE = 'listener'
ROUTING_KEYS = [
"A_new_example",
"A_update_example",
"A_delete_example",
"B_new_example",
"B_update_example",
"B_delete_example",
]
そしてpikaモジュールのioloopを用いてリスニングサイクルを行う
# consumer
def __init__(self, amqp_url):
self._connection = None
self._channel = None
self._closing = False
self._consumer_tag = None
self._url = amqp_url
def connect(self):
return pika.SelectConnection(pika.URLParameters(self._url),
self.on_connection_open,
stop_ioloop_on_close=False)
def run(self):
self._connection = self.connect()
self._connection.ioloop.start()
そして定義on_Messageはリスニングコールバック関数として
def start_consuming(self):
self.add_on_cancel_callback()
self._consumer_tag = self._channel.basic_consume(self.on_message, self.QUEUE)
def on_message(self, unused_channel, basic_deliver, properties, body):
routing_key = basic_deliver.routing_key
# desk websocket , ROUTING_KEYS A、B
desk = routing_key.split("_")[0].upper()
type_ = "_".join(routing_key.split("_")[1:]).upper()
body = json.loads(body)
# , from_web is_bridge
if body.get("from_web"):
content = body.get("orig_content")
content["is_bridge"] = True
try:
# web rabbitmq rabbitmq
# is_bridge ,
# websocket 、
ws = websocket.create_connection(config.websocket_address)
ws.send(json.dumps(content))
ws.close()
except Exception as err:
LOGGER.exception("WebSocket connect FAILED")
else:
# new_bwic
# ...
# is_new
# rabbitmq websocket
if is_new:
if not body.get("Desk"):
body["Desk"] = desk
try:
#
context = DataManager(SEND).dispatch(body, type_)
# websocket , rabbitmq
ws = websocket.create_connection(config.websocket_address)
ws.send(json.dumps(context))
result = json.loads(ws.recv())
self.acknowledge_message(basic_deliver.delivery_tag)
ws.close()
except Exception as err:
LOGGER.exception("WebSocket connect FAILED")
# ioloop , sleep1
time.sleep(1)
2、publisher
publisherはconsumer構造とほぼ同じで,私は便宜上,その中の重要なrecive法を提案した.
便利なこともあって、循環をデッドサイクルにした.相変わらずドキドキバッグを設置します.
websocketにはpikaのioloopの下位層がないので、ioloopを書くのは複雑で、flaskを使ったりtornadoを直接使ったりして実現したりして、tornadoにサービスをマウントすればいいです.
また,このpublisher接続websocketとrabbitmqはいずれも短い接続を用いており,これも不安定でオーバーヘッドが大きい原因の一つである.次はC#を使ってこれらの機能を実現する設計とコードを示します.
def recive():
global reconnect_count
while True:
try:
# , 。
# C#
ws = websocket.create_connection(address, sslopt={"check_hostname": False})
try:
result = json.loads(ws.recv())
if result.get("is_web", None) is True:
if not result.get("is_bridge", None):
#
context, routing_key = DataManager(RECEIVE).dispatch(result)
# rabbitmq,
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.exchange_declare(exchange='bwic', exchange_type='direct')
channel.basic_publish(exchange='bwic', routing_key=routing_key, body=json.dumps(context))
connection.close()
ws.close()
else:
LOGGER.warning("This Message is from bridge, so ignore it")
ws.close()
else:
LOGGER.warning("This Message is not from WEB, so ignore it")
ws.close()
except Exception as err:
ws.close()
except Exception as err:
LOGGER.exception("WebSocket connect FAILED")
# , ,
reconnect_count = reconnect_count + 1
if reconnect_count > 3:
LOGGER.warning("SHOULD Notice the PERSON in Charge that THE CONNECTION HAS SOMETHING WRONG")
LOGGER.warning("The Server is CLOSING DOWN...")
break
pass # ,
2、データの変換
データの変換はもちろん、両端が設計上の理由でフィールド、フィールドタイプなどが異なるため、二次処理が必要です.ここに基本的なコードが貼られているのは,クラスの設計を検討するためである.親クラスは抽象としてだけでなく(抽象できるものは何もないので、インタフェースを最大で提供します)、「転送」として、呼び出しフェーズで子クラスを非表示にし、親クラスを直接使用します.余計なことはあるが、特別な場合にはメリットがある.
SEND = "DataManager-Send"
RECEIVE = "DataManager-Receive"
class DataManager(object):
def __init__(self, TYPE):
self.TYPE = TYPE
def dispatch(self, content, *k):
if self.TYPE == SEND:
return DataManagerSend.dispatch(content, *k)
elif self.TYPE == RECEIVE:
return DataManagerReceive.dispatch(content)
else:
pass
def DELETE_EXAMPLE(self, *k):
pass
def NEW_EXAMPLE(self, *k):
pass
def UPDATE_EXAMPLE(self, *k):
pass
class DataManagerSend(DataManager):
def __init__(self, content, TYPE=None):
super().__init__(TYPE)
self.content = content
@classmethod
def dispatch(cls, content, *k, ):
manager = cls(content)
if hasattr(manager, k[0]):
method = getattr(manager, k[0])
return method(content)
else:
raise NotImplementedError('{} not implemented'.format(type_))
def DELETE_EXAMPLE(self, content):
pass
def NEW_EXAMPLE(self, content):
pass
def UPDATE_EXAMPLE(self, content):
pass
class DataManagerReceive(DataManager):
def __init__(self, content, TYPE=None):
super().__init__(TYPE)
self.content = content
@classmethod
def dispatch(cls, content, *k):
manager = cls(content)
type_ = content.get('type').upper()
if hasattr(manager, type_):
method = getattr(manager, type_)
return method(content)
else:
raise NotImplementedError('{} not implemented'.format(type_))
def DELETE_EXAMPLE(self, *k):
pass
def NEW_EXAMPLE(self, *k):
pass
def UPDATE_EXAMPLE(self, *k):
pass
締めくくり
この文章は完全に当時の需要の実現を記録するためで、他の意味はありません.
しかし、反省した結果、元の方法は極めて不安定で丈夫ではないことが分かった.
そこで次編では、C#を使ってこのニーズを再実現します.