Redisストリームを使ったリアルタイムのチャット


オリジナルの英語版はReddieの公式ブログをご覧ください。

この記事は、まだリリースされていない、Redisストリームの探求を目的としています。Redisストリームを使うと、いくつかのパワフルなユースケースが可能となります。ここではストリームを利用して、Hipchat、SlackやIRCのような、エンドツーエンドのリアルタイムのチャットルームを作っていきます。

Github

Github上でチェック:github: https://github.com/aprice-/redisstreamchat

Redisストリーム

Redisストリームは、Redis用に構築された新しいデータ構造で、時系列データを扱います。Redisリストに似ていますが、以下の2点において異なります。

  1. オリジナルインデックスではなく、タイムスタンプを使用
  2. 各ストリームは、Redisハッシュに似た複数フィールドを持つことが可能

ストリームエントリーのタイムスタンプIDが、シークエンス(例:1523418166062-0)をインクルードするため、イベントの起きる順は維持されます。よって指定時間範囲のストリームエントリーにクエリを出したり、指定時間(もしくは現時点)に新エントリーを加えたりすることができます。またリスト同様、ブロックオペレーション(BLPOPBRPOPBRPOPLPUSH)を使い、新イベントを待つようブロックを設定できます。

ストリームコマンド

イベントの追加

XADD <key> [MAXLEN <max-length>] <*/id> <field> <value>...

ストリームにイベントを追加する

  • id: イベント追加ID(最新でなければならない)。現時点に追加するときは(*)を使用
  • max-length: オプション。最大長。ストリームを指定の長さに制限する
  • 各設定フィールド
    • field name: フィールド名
    • value: 設定値

イベントの読み出し

XREVRANGE <key> <end/+> <start/-> [COUNT <count>]

ストリームから複数イベントを降順に読み出す

  • end: 読み出し開始ID (ストリーム先頭からには‘+‘を使用))
  • start: 読み出し終了ID (ストリーム後尾までには‘-’を使用)
  • count: オプション。最大読み出しイベント数

XREAD BLOCK <timeout> STREAMS <key>... <$/id>...

規定のストリーム上でイベントが有効になるまでブロックする

  • timeout: 新イベントを待つ最大期間
  • 各読み出しストリーム
    • key: ストリームキー
    • id: 読み出し開始ID (現時点からには ‘$’を使用)

XLEN <key>

ストリーム上のイベント数を取得する

ストリームを使いチャットを作る

複数のチャットルームをサポートするリアルタイムのチャットシステムを作るため、ここでは各ルーム毎に一つのストリームを用いて、チャットルームのメッセージとメンバーのためのセットをモデル化します。

例えば「Welcome」という名でチャットルームのモデルを作るとすると、Redisのコマンドは以下の通りです:

// track join notification
> XADD channel:welcome:log * message "adam has joined the channel."
1523499744387-0

// add to members list
> SADD channel:welcome:members "adam"
(integer) 1

// get the list of channel members
> SMEMBERS channel:welcome:members
1) "adam"

// send chat message
> XADD channel:welcome:log * message "Hi everyone!" userName "adam"
1523499778907-0

// read new messages
> XREAD BLOCK 5000 STREAMS channel:welcome:log $
1) 1) "channel:welcome:log"
   2) 1) 1) 1523499832690-0
         2) 1) "message"
            2) "Hi everyone!"
            3) "userName"
            4) "adam"
(1.67s)

// get latest 100 messages
> XREVRANGE channel:welcome:log + - COUNT 100
1) 1) 1523499832690-0
   2) 1) "message"
      2) "Hi everyone!"
      3) "userName"
      4) "adam"
2) 1) 1523499744387-0
   2) 1) "message"
      2) "adam has joined the channel."

残りのスタック

Redisのデータストレージ・ストラテジーが解決したので、次はNode.jsを使い、残りのリアルタイム・ウェブアプリケーション構造を構築していきます。

スタック

リアルタイムのチャットを可能にするため、イベント駆動型フレームワークとテクノロジーを併用していきます。

  • Angular: ブラウザ内のシングルページアプリケーション
  • socket.io: リアルタイムアップデートのためのWebSocketサーバー
  • Express: 全てのノンリアルタイムのためのHTTP API
  • RxJS: ストリーム管理
  • Redis: ストレージ

RxJSとは?

RxJSは、JavaScriptのリアクティブプログラミングを行うためのライブラリです。RxJSではPromiseに似たObservableという概念を取り入れ、ストリーム概念と連携しています。

忘れている方のために簡単に説明すると、Promiseは非同期処理をモデルとしており、処理が終了していない場合、完了およびエラーの二つの状態を通知します( thenもしくはcatch経由にて)。

一方Observableは、非同期ストリームをモデルとし、完了およびエラー(Promise同様)に加え、新イベント発生の三つの状態を通知します。これによりObservableは長く存在することができ、長期的に複数のイベントを伝搬させることが可能となります。Promiseの場合と同様、組み合わせ方次第でさまざまな望ましい結果を達成することができます。

Angularも非同期処理には、Promiseよりも主にObservableを使用しています。

RxJSを使いRedisストリームをポール

Redisストリームのポーリングは、XREADコマンドを繰り返し使い、ブロックしたり新メッセージを待ったりすることで達成しています。メッセージを受信したら、ハンドルし、IDを記録し、ストリームを続きから読みだすように通知します。

const pollCache = {}; // storage for a shared cache of polls

function pollNewMessages (channelName) {
    if (pollCache[channelName]) return pollCache[channelName]; // use the cache

    let seenId = null; // variable to hold our position in the stream
    let key = getMessagesKey(channelName); // derive the key (i.e. channel:welcome:log)
    let connection = redis.duplicate(); // create a new connection for polling this stream

    return pollCache[channelName] = Rx.Observable.of(null) // return and cache an Observable
        .expand(() => Rx.Observable.fromPromise(connection.xread(10000, {key, id: seenId})))
        // expand calls the provided function in a loop
        // XREAD will be called on our stream key with the latest ID repeatedly
        .filter(streams => streams) // do not process empty results (i.e. time out was reached)
        .flatMap(streams => streams) // process each stream returned individually
        .flatMap(stream => stream[1]) // process each event in each stream individually
        .map(streamEvent => parseChannelMessages(streamEvent, channelName)) // parse the event
        .do(streamEvent => { // for each event
            if (streamEvent.id > seenId || !seenId) { // if it is latest seen event
                seenId = streamEvent.id; // record it as such
            }
        })
        .finally(() => { // when the stream is cleaned up
            connection.quit(); // close the redis connection
            delete pollCache[channelName]; // remove it from the cache
        })
        .publish() // wrap the observable in a shared one
        .refCount(); // track subscriptions to it so it is cleaned up automatically
}

結果として、キャッシュされ、シェア可能で、自動的にクリーンアップされた、スクロールで読むことのできるRedisストリームが手に入ります。

RxJSとsocket.ioの併用

RxJSはsocket.ioのWebSocketサーバー処理をシンプルにしてくれます。

let sockets = [];

let socketServer = socketio(server, {path: '/api/socket.io'});

Rx.Observable.fromEvent(socketServer, 'connection') // when a new connection occurs
    .flatMap(socket => { // for each connection
        sockets.push(socket); // keep list of sockets
        const part$ = Rx.Observable.fromEvent(socket, 'part'); // when user leaves a channel
        const disconnect$ = Rx.Observable.fromEvent(socket, 'disconnect') // user disconnects
            .do(() => sockets.splice(sockets.indexOf(socket), 1)); // remove socket

        return Rx.Observable.fromEvent(socket, 'join') // when user joins a channel
            .flatMap(channel => { // for the joined channel
                return data.pollNewMessages(channel) // get a poller
                    .takeUntil(part$.filter(c => c === channel)) // listen until channel is left
            })
            .do(message => socket.emit('message', message)) // emit each message on the socket
            .takeUntil(disconnect$); // stop listening if user disconnects
    })
    .finally(() => sockets.forEach((socket) => {socket.destroy()})) // destroy sockets
    .takeUntil(Rx.Observable.fromEvent(process, 'SIGINT')) // listen until server is stopped
    .subscribe(() => {}); // start processing

これで、ユーザーが参加したり退出する毎に、オンザフライでチャネルをポールするものを作れるソケットサーバができました。全ユーザーがチャネルを退出した際のクリーンアップも担ってくれます。

Angularを使ったクライアントサイドでのイベントの処理

シングルページアプリケーションでは、ウェブソケット接続処理とアプリケーション状態のトラックに、AngularのService を使います。ユーザーが参加する各ルームは、それぞれのChannelオブジェクトで表されており、UIでやりとりすることが可能です。

ウェブソケット接続

socket = io.connect({path: '/api/socket.io'}); // connect the web socket
message = Observable.fromEvent<IMessage>(this.socket, 'message'); // socket received messages
reconnect = Observable.fromEvent(this.socket, 'reconnect'); // socket reconnections

チャネルの受信メッセージ処理

messages: IMessage[] = [];
latestSeenId: string;

this.channelService.message
  .filter(message => message.channel === this.name) // filter message stream to this channel
  .subscribe(message => {
    this.messages.push(message); // add to the end of the array
    if (this.channelService.current == this) { // if this channel has focus
      this.latestSeenId = message.id; // consider this message seen
    }
  });

メッセージの表示

<div class="row" *ngFor="let message of channelService.current?.messages; trackBy: trackByFn">
  <div class="col">
    <span class="text-info" *ngIf="message.userName">&lt;{{message.userName}}&gt;</span>
    <span [class.text-success]="message.join" [class.text-warning]="message.part">
      {{message.message}}
    </span>
  </div>
  <div class="col-3 col-md-3 col-lg-2 text-right">
    <span class="text-secondary mr-3 timestamp">
      {{message.id | humanRelativeDate}}
    </span>
  </div>
</div>

HTTP API

他のすべて(過去のメッセージの取得、チャネルのメンバーの取得、参加や退出など)には、HTTP APIを使います。ユーザーの名前は、各リクエストにHTTPヘッダーとしてインクルードされています。セキュリティや認証は、読者のあなたが実装するかどうかに委ねられています。

// join a channel
router.post('/join/:channel', async (req, res) => {
    let channel = req.params.channel;
    await data.join(channel, req.userName);
    res.send({success: true});
});

// leave a channel
router.post('/part/:channel', async (req, res) => {
    let channel = req.params.channel;
    await data.part(channel, req.userName);
    res.send({success: true});
});

// send a message to a channel
router.post('/channel/:channel', async (req, res) => {
    let channel = req.params.channel;
    let message = req.body.message;
    await data.send(channel, req.userName, message);
    res.send({success: true});
});

// get messages from a channel
router.get('/channel/:channel', async (req, res) => {
    let channel = req.params.channel;
    let before = req.query.before;
    let messages = await data.getMessages(channel, before);
    res.send({success: true, messages});
});

// get members of a channel
router.get('/channel/:channel/members', async (req, res) => {
    let channel = req.params.channel;
    let members = await data.getMembers(channel);
    res.send({success: true, members});
});

// get the user's name from the request
app.use((req, res, next) => {
    req.userName = req.get('x-username');
    next();
});

Redis unstable

公式にはまだストリームはリリースされていませんので、使用するにはRedisをunstableブランチから入手する必要があります。今回、ビルドと立ち上げのために、Dockerfiledocker-compose.ymlのセットアップをインクルードしています。

公式のRedis Docker image alpine variantに基づいていますが、tarリリースをダウンロードする代わりに、gitでアンステーブルブランチをクローンしています。

サンプルの実行

サンプルを実行する一番簡単な方法は、docker-composeを使って実際に試してみることです:

$ git clone https://github.com/aprice-/redisstreamchat
$ cd redisstreamchat
$ docker-compose up -d

さぁ、http://localhost:3000/へとナビゲートしましょう!