Rx で大量のリクエストを捌... きたい
2018年11月19日
自己紹介
ちきさん
GitHub/Twitter/Qiita: @ovrmrw
市ヶ谷のオプトという会社で働いています
今日話すこと
- わからないことだらけ
- 大量リクエストを RxJS で倒す
- 結局よくわからない
事の始まり
- Cloud Pub/Sub の Topic へのメッセージ送信は一度に複数送れるらしい。
- なるべく束ねて送ったらサーバーの負荷が低いのでは。
- RxJS で簡単に作れるのでは!!
わからないことだらけ
- GCE わからない
- GCE で Node.js のアプリを起動する方法わからない
- Nginx わからない
- 負荷試験わからない
- Tsung わからない (※負荷試験ツール)
- 思ったほど負荷かからない
- チューニングわからない
LT の資料を作ろうと思っていたら GCP に詳しくなった。
※ 詳しくなりたかったわけではない。
その過程で生まれた残骸
内容がしょぼいので Qiita 限定共有で投稿。主に自分用のメモ。
(ここから本編)
要件
- 大量の計測リクエストを捌きたい。数万 QPS ぐらい。
- データは最終的に BigQuery でいい感じに使いたい。
※ 実務で実際にあった要件ではありません ※
設計
- RxJS で捌いてみる。
- サーバー1台で受けきってみる。
- Cloud Pub/Sub に漏れなくデータを送れたら勝ち。その後のことは後で考えれば良い。
詳細な設計
- CPU コアの数だけ cluster を使って fastify のサーバーアプリを起動する。
- 1 メッセージ毎にTopic に送るのではなく、複数のメッセージを束ねて送る。 (一度に 1000 メッセージまで送れる)
- リクエストの発生回数を可能な限り減らす。
-
並列で発生させるリクエストの数を制限する。 (1本でも十分速い)
- GCP ネットワーク内では 1 リクエストが 数 ms 〜 数十 ms で完了する。
できあがった Rx まわりのコード
import { Subject, interval } from 'rxjs';
import { buffer, mergeMap, tap, filter } from 'rxjs/operators';
const INTERVAL = 500;
const NTH = 900;
class RxClient {
private dispatcher$ = new Subject();
private notifier$ = new Subject();
private counter = 0;
constructor() {
this.setDispatcher();
}
send(data, attributes) {
const message = { data, attributes };
this.dispatcher$.next(message);
}
private setDispatcher() {
this.dispatcher$
.pipe(
tap(() => this.notifyEveryNthMessage(NTH)),
buffer(this.notifier$),
filter(messages => messages.length > 0),
mergeMap(messages => this.publish(messages), 1)
)
.subscribe({
next: () => this.notifier$.next(),
error: err => console.error(err)
});
interval(INTERVAL).subscribe(() => this.notifier$.next());
}
private publish(messages) {
// Mocking Cloud Pub/Sub Publish logic
return new Promise(resolve => {
const startTime = Date.now();
setTimeout(() => {
console.log(`messageIds: ${messages.length}, processed: ${Date.now() - startTime}ms`);
resolve(true);
}, Math.random() * 200);
});
}
private notifyEveryNthMessage(n) {
this.counter++;
if (this.counter % n === 0) {
this.notifier$.next();
}
if (this.counter > n * 1000 * 1000 * 1000 * 1000 * 1000) {
this.counter = 0;
}
}
}
少し解説
リクエストの入り口
外部から呼ばれるメソッド send()
。
message
を組み立てて dispatcher$
に送る。
send(data, attributes) {
const message = { data, attributes };
this.dispatcher$.next(message);
}
心臓部 dispatcher$ から始まるストリーム
-
message
をbuffer
に溜め続ける。 - ストリームが一周したら
buffer
に溜まっているmessage
を配列にして次に流す。 -
buffer
から流れてきたmessage
の配列が要素数 0 だったら捨てる。 -
message
の配列を Topic に送る。 (非同期処理) -
buffuer
に溜まっているmessage
を配列にして次に流す。 - (繰り返す)
this.dispatcher$
.pipe(
tap(() => this.notifyEveryNthMessage(NTH)),
buffer(this.notifier$),
filter(messages => messages.length > 0),
mergeMap(messages => this.publish(messages), 1)
)
.subscribe({
next: () => this.notifier$.next(),
error: err => console.error(err)
});
ポーリングしてストリームを flush
- 一定時間ごとに
buffer
からmessage
を吐き出させる。 (いわゆる flush) - 例えば GAE は放っておくと勝手にシャットダウンするので「
dispatcher$
からmessage
が n 回流れてきた」という条件以外にこのようなポーリングが必要になる。
interval(INTERVAL).subscribe(() => this.notifier$.next());
message
が n 回流れてきたらストリームを flush
- Topic への送信は一度に 1000 メッセージまでなので、 n (= 900) 回毎に
buffer
からmessage
を吐き出させる。 (いわゆる flush)
private notifyEveryNthMessage(n) {
this.counter++;
if (this.counter % n === 0) {
this.notifier$.next();
}
}
実行時のイメージ
非同期処理はモック
負荷試験をやってみたけど
- 4,000 QPS ぐらいまでは確認できたが、それ以上はどこがボトルネックになっているのわからない。
- (サーバースペックは 4 CPU CORE, 4 GB RAM)
- (GCE => GCE で Tsung による負荷試験を実施)
- CPU使用率が100%近くなっているので1台ではここらへんが限界なのか。。。
まとめ
- Topic への送信はメッセージを束ねてもレスポンスタイムはほとんど変わらない。
- (230 メッセージでも 30 ms くらい)
- テスト目的なら GCE のプリエンプティブを使おう。なぜなら安い。
- 実は GAE でやれば良かったのでは?
Thanks !
Author And Source
この問題について(Rx で大量のリクエストを捌... きたい), 我々は、より多くの情報をここで見つけました https://qiita.com/ovrmrw/items/2c8bd3d46d7bcc0806d2著者帰属:元の著者の情報は、元のURLに含まれています。著作権は原作者に属する。
Content is automatically searched and collected through network algorithms . If there is a violation . Please contact us . We will adjust (correct author information ,or delete content ) as soon as possible .