アクションにおける反応プログラミング
10068 ワード
アクションにおける反応プログラミング
このポストはreactive programming はDataBeacon ’という.ポストに触発されますrubber duck debugging method .
基本をカバーする代わりに、そこにはるかに良いリソースがあります、フォーカスは実際の例といくつかのアーキテクチャの決定の説明と生産準備コードになります.
Code snippets have been adapted to this blog post specifically, it is not a 1:1 copy of production code and some implementation details have hidden.
導入
漏斗コンポーネントはKafka トピックスSPA クライアント.これは、クライアントの接続を調整し、各クライアントのステータスと好みに適応したWebソケット接続(ソケット. io)にカフカの入力ストリームを変換します.
Kafkaトピックス-> funnel ->クライアント
漏斗はTypeScript とNode , 現在移行中であるがGo . この実装はノードの実装に焦点を当てていますが、私は将来のポストに進むための論理と移行をカバーするかもしれません.
詳細に漏斗を調査しましょう.
クライアントの設定
環境の詳細を設定した後、メインコードが起動します.
const connection$ = await socketIOServer();
ここで我々はconnection$
活字があるfromSocketIO
使用rxjs-socket.io . それぞれの新しいHTTPリクエストに対して、connection$
タイプのオブジェクトで加入者に通知しますConnector
interface Connector<L extends EventsMap, S extends EventsMap> {
from: <Ev extends EventNames<L>>(eventName: Ev) => Observable<EventParam<L, Ev>>;
to: <Ev extends EventNames<S>>(eventName: Ev) => Observer<Parameters<S[Ev]>>;
id: string;
user?: string;
onDisconnect: (callback: () => void) => void;
}
最初の2つのメソッドに注意してください.
Code snippets have been adapted to this blog post specifically, it is not a 1:1 copy of production code and some implementation details have hidden.
const connection$ = await socketIOServer();
interface Connector<L extends EventsMap, S extends EventsMap> {
from: <Ev extends EventNames<L>>(eventName: Ev) => Observable<EventParam<L, Ev>>;
to: <Ev extends EventNames<S>>(eventName: Ev) => Observer<Parameters<S[Ev]>>;
id: string;
user?: string;
onDisconnect: (callback: () => void) => void;
}
from
イベント名をパラメータとして受け取り、Observable
からreceive . to
イベント名をパラメータとして受け取り、Observer
to emit . from('action').subscribe(to('reducer'))
クライアントの状態をリモートで管理するために使用できる.パラメータ
id
and user
は自己記述的であり、onDisconnect
クライアントの切断時に実行されるコールバックを登録します.フードの下、ソケット.IOサーバはauth0-socketio Auth 0アイデンティティプロバイダーとのMANGE認証へのミドルウェア
この接続オブジェクトを使用して接続アクティビティを監視できます.
connection$.subscribe(({ id, user, onDisconnect }) => {
log(`Connected ${user} through session ${id}`);
onDisconnect(() => log(`Disconnected ${user} from session ${id}`));
});
このインターフェースはclient
観測できるようにするclient$
const client$ = connection$.pipe(map((connector) => client(connector)));
それぞれclient
観測可能であるstate$
クライアントの状態で更新されます.client$.subscribe(({ state$ }) => state$.subscribe((state) => log(util.inspect(state, { depth: 4 }))));
必要なことはクライアントにデータソースを添付することです.幸運にもstate$
各クライアントはattachDataSource
とremoveDataSource
つのソースだけを一度に添付することができます.attachDataSource
観測可能であると予想するremoveDataSource
ソースアップデートからクライアントを取り消す機能だけです.それは今我々が必要とするすべてである
client
将来のポストの世代では、データソースを設定しましょう.ソースからデータを取得する
Kafkaトピックを観測可能に変換するにはrxkfk 図書館.接続の詳細は
kafkaConnector
しかし、それは与えられた話題のメッセージでタイプ可能な観測を返します.const msg$ = await kafkaConnector();
簡単な購読で入力データを監視できます.msg$.subscribe(({ flights }) => log(`Got a new msg containing ${flights.length} flights`));
最後に、各クライアントを個別に購読する必要があります.すべてのクライアントを同じソースに接続したい場合には、単にオブザーバーを取り付ける必要がありますclient$
個人間のつながりを確立するclient
とmsg$
観測可能.client$.subscribe((client) => client.attachDataSource(msg$));
データソースの添付に加えて、クライアントはclient.removeDataSource()
メソッド.これにより、クライアントはデータソースを動的に変更できます.今度来る
これまでのコードの基本的な構造をカバーしている:クライアントとサーバー側のための2つのオブザーバブルを作成し、プログラム✨ 接続両方.
次の章では、ギャップを埋めるし、クライアントとデータソースの接続方法を説明し、どのように作成する
clients
からconnections
を使用してデータソースをフィルタリングする方法projection and combination operator .Reference
この問題について(アクションにおける反応プログラミング), 我々は、より多くの情報をここで見つけました https://dev.to/typesamuel/reactive-programming-in-action-part-1-h75テキストは自由に共有またはコピーできます。ただし、このドキュメントのURLは参考URLとして残しておいてください。
Collection and Share based on the CC Protocol