アクションにおける反応プログラミング


アクションにおける反応プログラミング


このポストはreactive programmingDataBeacon ’という.ポストに触発されますrubber duck debugging method .
基本をカバーする代わりに、そこにはるかに良いリソースがあります、フォーカスは実際の例といくつかのアーキテクチャの決定の説明と生産準備コードになります.

Code snippets have been adapted to this blog post specifically, it is not a 1:1 copy of production code and some implementation details have hidden.


導入


漏斗コンポーネントはKafka トピックスSPA クライアント.これは、クライアントの接続を調整し、各クライアントのステータスと好みに適応したWebソケット接続(ソケット. io)にカフカの入力ストリームを変換します.
Kafkaトピックス-> funnel ->クライアント
漏斗はTypeScriptNode , 現在移行中であるがGo . この実装はノードの実装に焦点を当てていますが、私は将来のポストに進むための論理と移行をカバーするかもしれません.
詳細に漏斗を調査しましょう.

クライアントの設定


環境の詳細を設定した後、メインコードが起動します.
const connection$ = await socketIOServer();
ここで我々はconnection$ 活字があるfromSocketIO 使用rxjs-socket.io . それぞれの新しいHTTPリクエストに対して、connection$ タイプのオブジェクトで加入者に通知しますConnector
interface Connector<L extends EventsMap, S extends EventsMap> {
    from: <Ev extends EventNames<L>>(eventName: Ev) => Observable<EventParam<L, Ev>>;
    to: <Ev extends EventNames<S>>(eventName: Ev) => Observer<Parameters<S[Ev]>>;
    id: string;
    user?: string;
    onDisconnect: (callback: () => void) => void;
}
最初の2つのメソッドに注意してください.
  • from イベント名をパラメータとして受け取り、Observable からreceive .
  • to イベント名をパラメータとして受け取り、Observer to emit .
  • これによりfrom('action').subscribe(to('reducer')) クライアントの状態をリモートで管理するために使用できる.
    パラメータid and user は自己記述的であり、onDisconnect クライアントの切断時に実行されるコールバックを登録します.
    フードの下、ソケット.IOサーバはauth0-socketio Auth 0アイデンティティプロバイダーとのMANGE認証へのミドルウェア
    この接続オブジェクトを使用して接続アクティビティを監視できます.
    connection$.subscribe(({ id, user, onDisconnect }) => {
        log(`Connected ${user} through session ${id}`);
        onDisconnect(() => log(`Disconnected ${user} from session ${id}`));
    });
    
    このインターフェースはclient 観測できるようにするclient$
    const client$ = connection$.pipe(map((connector) => client(connector)));
    
    それぞれclient 観測可能であるstate$ クライアントの状態で更新されます.
    client$.subscribe(({ state$ }) => state$.subscribe((state) => log(util.inspect(state, { depth: 4 }))));
    
    必要なことはクライアントにデータソースを添付することです.幸運にもstate$ 各クライアントはattachDataSourceremoveDataSourceつのソースだけを一度に添付することができます.attachDataSource 観測可能であると予想するremoveDataSource ソースアップデートからクライアントを取り消す機能だけです.
    それは今我々が必要とするすべてであるclient 将来のポストの世代では、データソースを設定しましょう.

    ソースからデータを取得する


    Kafkaトピックを観測可能に変換するにはrxkfk 図書館.接続の詳細はkafkaConnector しかし、それは与えられた話題のメッセージでタイプ可能な観測を返します.
    const msg$ = await kafkaConnector();
    
    簡単な購読で入力データを監視できます.
    msg$.subscribe(({ flights }) => log(`Got a new msg containing ${flights.length} flights`));
    
    最後に、各クライアントを個別に購読する必要があります.すべてのクライアントを同じソースに接続したい場合には、単にオブザーバーを取り付ける必要がありますclient$ 個人間のつながりを確立するclientmsg$ 観測可能.
    client$.subscribe((client) => client.attachDataSource(msg$));
    
    データソースの添付に加えて、クライアントはclient.removeDataSource() メソッド.これにより、クライアントはデータソースを動的に変更できます.

    今度来る


    これまでのコードの基本的な構造をカバーしている:クライアントとサーバー側のための2つのオブザーバブルを作成し、プログラム✨ 接続両方.
    次の章では、ギャップを埋めるし、クライアントとデータソースの接続方法を説明し、どのように作成するclients からconnections を使用してデータソースをフィルタリングする方法projection and combination operator .