Rx で大量のリクエストを捌... きたい


Roppongi.js #7@Retty

2018年11月19日


自己紹介

ちきさん

GitHub/Twitter/Qiita: @ovrmrw

市ヶ谷のオプトという会社で働いています


今日話すこと

  • わからないことだらけ
  • 大量リクエストを RxJS で倒す
  • 結局よくわからない

事の始まり

  • Cloud Pub/Sub の Topic へのメッセージ送信は一度に複数送れるらしい。
  • なるべく束ねて送ったらサーバーの負荷が低いのでは。
  • RxJS で簡単に作れるのでは!!

わからないことだらけ

  • GCE わからない
  • GCE で Node.js のアプリを起動する方法わからない
  • Nginx わからない
  • 負荷試験わからない
  • Tsung わからない (※負荷試験ツール)
  • 思ったほど負荷かからない
  • チューニングわからない

LT の資料を作ろうと思っていたら GCP に詳しくなった。


※ 詳しくなりたかったわけではない。


その過程で生まれた残骸

内容がしょぼいので Qiita 限定共有で投稿。主に自分用のメモ。


(ここから本編)


要件

  • 大量の計測リクエストを捌きたい。数万 QPS ぐらい。
  • データは最終的に BigQuery でいい感じに使いたい。

※ 実務で実際にあった要件ではありません ※


設計

  • RxJS で捌いてみる。
  • サーバー1台で受けきってみる。
  • Cloud Pub/Sub に漏れなくデータを送れたら勝ち。その後のことは後で考えれば良い。

詳細な設計

  • CPU コアの数だけ cluster を使って fastify のサーバーアプリを起動する。
  • 1 メッセージ毎にTopic に送るのではなく、複数のメッセージを束ねて送る。 (一度に 1000 メッセージまで送れる)
    • リクエストの発生回数を可能な限り減らす。
  • 並列で発生させるリクエストの数を制限する。 (1本でも十分速い)
    • GCP ネットワーク内では 1 リクエストが 数 ms 〜 数十 ms で完了する。

できあがった Rx まわりのコード


import { Subject, interval } from 'rxjs';
import { buffer, mergeMap, tap, filter } from 'rxjs/operators';

const INTERVAL = 500;
const NTH = 900;

class RxClient {
  private dispatcher$ = new Subject();
  private notifier$ = new Subject();
  private counter = 0;

  constructor() {
    this.setDispatcher();
  }

  send(data, attributes) {
    const message = { data, attributes };
    this.dispatcher$.next(message);
  }

  private setDispatcher() {
    this.dispatcher$
      .pipe(
        tap(() => this.notifyEveryNthMessage(NTH)),
        buffer(this.notifier$),
        filter(messages => messages.length > 0),
        mergeMap(messages => this.publish(messages), 1)
      )
      .subscribe({
        next: () => this.notifier$.next(),
        error: err => console.error(err)
      });

    interval(INTERVAL).subscribe(() => this.notifier$.next());
  }

  private publish(messages) {
    // Mocking Cloud Pub/Sub Publish logic
    return new Promise(resolve => {
      const startTime = Date.now();
      setTimeout(() => {
        console.log(`messageIds: ${messages.length}, processed: ${Date.now() - startTime}ms`);
        resolve(true);
      }, Math.random() * 200);
    });
  }

  private notifyEveryNthMessage(n) {
    this.counter++;
    if (this.counter % n === 0) {
      this.notifier$.next();
    }
    if (this.counter > n * 1000 * 1000 * 1000 * 1000 * 1000) {
      this.counter = 0;
    }
  }
}

少し解説


リクエストの入り口

外部から呼ばれるメソッド send()
message を組み立てて dispatcher$ に送る。


  send(data, attributes) {
    const message = { data, attributes };
    this.dispatcher$.next(message);
  }

心臓部 dispatcher$ から始まるストリーム

  • messagebuffer に溜め続ける。
  • ストリームが一周したら buffer に溜まっている message を配列にして次に流す。
  • buffer から流れてきた message の配列が要素数 0 だったら捨てる。
  • message の配列を Topic に送る。 (非同期処理)
  • buffuer に溜まっている message を配列にして次に流す。
  • (繰り返す)

    this.dispatcher$
      .pipe(
        tap(() => this.notifyEveryNthMessage(NTH)),
        buffer(this.notifier$),
        filter(messages => messages.length > 0),
        mergeMap(messages => this.publish(messages), 1)
      )
      .subscribe({
        next: () => this.notifier$.next(),
        error: err => console.error(err)
      });

ポーリングしてストリームを flush

  • 一定時間ごとに buffer から message を吐き出させる。 (いわゆる flush)
  • 例えば GAE は放っておくと勝手にシャットダウンするので「dispatcher$ から message が n 回流れてきた」という条件以外にこのようなポーリングが必要になる。

    interval(INTERVAL).subscribe(() => this.notifier$.next());

message が n 回流れてきたらストリームを flush

  • Topic への送信は一度に 1000 メッセージまでなので、 n (= 900) 回毎に buffer から message を吐き出させる。 (いわゆる flush)

  private notifyEveryNthMessage(n) {
    this.counter++;
    if (this.counter % n === 0) {
      this.notifier$.next();
    }
  }

実行時のイメージ

非同期処理はモック


負荷試験をやってみたけど

  • 4,000 QPS ぐらいまでは確認できたが、それ以上はどこがボトルネックになっているのわからない。
    • (サーバースペックは 4 CPU CORE, 4 GB RAM)
    • (GCE => GCE で Tsung による負荷試験を実施)
  • CPU使用率が100%近くなっているので1台ではここらへんが限界なのか。。。

まとめ

  • Topic への送信はメッセージを束ねてもレスポンスタイムはほとんど変わらない。
    • (230 メッセージでも 30 ms くらい)
  • テスト目的なら GCE のプリエンプティブを使おう。なぜなら安い。
  • 実は GAE でやれば良かったのでは?

Thanks !