redis6.0マルチスレッドソースプロファイル
8526 ワード
1、概要
redis6.0マルチスレッドメカニズムが追加されました.常に2つの疑問があります.
1)、なぜマルチスレッドを入れるのか
2)、マルチスレッドメカニズム、なぜmemcacheと同じように設計しないのか
2、解析
まず最初の問題は,redisがmemcacheと異なる点は,redisが単一スレッドであることである.しかし、性能も良好で、主に彼のIO多重化のおかげで、また単線路はマルチスレッド切替コンテキストのオーバーヘッドを節約した.スレッドコンテキストの切り替えにより、CPU CACHE MISSが発生する可能性があります.CPUの1次キャッシュと3次キャッシュの速度は1レベルではありません.今度はredis 6.0はIOマルチスレッドのメカニズムを開き、主にredisの読み書きネットワークのread/writeシステム呼び出しがredis実行中に大量のCPU時間を消費したことを考慮する.ネットワークIOをマルチスレッド化すると大きなパフォーマンスが向上します.一方、lrange、del大keyなどの比較的時間のかかる操作であれば、プライマリスレッドをブロックする時間が多すぎることを回避し、オープンスレッドの非同期処理は良い選択である.
2つ目の問題は、redisがマルチスレッドモデルを選択するとき、memcacheと同じものを選択しない理由です.redisの著者Antirezはこう述べています.
Redisがマルチスレッドをサポートするには、次の2つの方法があります。 1つ目は「Memcached」のように、1つのRedisインスタンスが複数のスレッドを開き、GET/SETなどの簡単なコマンドで毎秒実行できる動作を向上させることです。これはI/O,コマンド解析などのマルチスレッド処理に関わるため,「I/O threading」と呼ぶ. もう1つは、他のクライアントがブロックされないように、異なるスレッドで時間のかかるコマンドを実行できるようにすることです。このスレッドモデルを「Slow commands threading」と呼びます。 熟考した結果、Redisは「I/O threading」を採用せず、Redisは実行時に主にネットワークとメモリに制約されるため、Redisのパフォーマンスを向上させるのは主に複数のRedisインスタンス、特にRedisクラスタを通じて行われる。 次に、主に2つの側面を改善することを考慮します。 Redisクラスタの複数のインスタンスは、AOFの同時書き換えを回避するために、ローカルインスタンスのディスクを編成することによって適切に使用することができる。 Redisクラスタエージェントを提供し、より良いクラスタプロトコルクライアントがない場合にクラスタを抽象化します。 なお、RedisはMemcachedと同様にメモリシステムであるが、Memcachedとは異なる。 マルチスレッドは複雑で、簡単なデータモデルを使用することを考慮しなければならない。LPUSHを実行するスレッドは、他のLPOPを実行するスレッドにサービスを提供する必要がある。 私が本当に望んでいるのは「slow operations threading」です。Redis 6またはRedis 7では、スレッドがゆっくりとした操作を処理するためにキーの制御を完全に得ることができるように「key-level locking」が提供されます。
簡単に言えば、redisの現在のパフォーマンスはネットワークとメモリに制限されています.したがって、マルチスレッドを用いてネットワークIOの読み書きと解析を処理し、実行コマンドは依然としてメインスレッドに渡される.マルチスレッドの同時制御はまた複雑(例えばトランザクション、LPUSH/LPPOPの同時制御)であり、redisの簡単な設計原則を維持するためにIOマルチスレッドを採用するのが適切である.
3、ソースの剖析
3.1、重要変数
io_threads_Active:IOマルチスレッドを開始するか
io_threads_op:操作タイプ読み書きIO_THREADS_OP_WRITE or IO_THREADS_OP_READ
io_threads_List[IO_THREADS_MAX_NUM]:各スレッドの待機クライアントリスト
io_threads_pending:スレッド当たりの待機クライアント数
3.1、スレッドを初期化し、コールバックを登録する
25行目pthread_を呼び出すcreateライブラリ関数はスレッドを作成し、スレッドコールバック関数IOThreadMainを登録します.スレッドTIDバインドスレッドID io_threads[i] = tid
void initThreadedIO(void) {
io_threads_active = 0; /* We start with threads not active. */
/* Don't spawn any thread if the user selected a single thread:
* we'll handle I/O directly from the main thread. */
if (server.io_threads_num == 1) return;
if (server.io_threads_num > IO_THREADS_MAX_NUM) {
serverLog(LL_WARNING,"Fatal: too many I/O threads configured. "
"The maximum number is %d.", IO_THREADS_MAX_NUM);
exit(1);
}
/* Spawn and initialize the I/O threads. */
for (int i = 0; i < server.io_threads_num; i++) {
/* Things we do for all the threads including the main thread. */
io_threads_list[i] = listCreate();
if (i == 0) continue; /* Thread 0 is the main thread. */
/* Things we do only for the additional threads. */
pthread_t tid;
pthread_mutex_init(&io_threads_mutex[i],NULL);
io_threads_pending[i] = 0;
pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */
if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
exit(1);
}
io_threads[i] = tid;
}
}
3.2、コールバック関数
12行目は実はループ待ちの実現であり、ここではsleepを使わず、sleepを設定する時間が不適切で性能の損失を招くことを避けるためであるが、ループ待ちもCPUを占有し、オーバーヘッドである.
17行目は、あるスレッドiのio_threads_pending[id]を0に設定
32行目は,現在のスレッド待ちキュー内のすべてのリクエストクライアントを順次取り出し処理し,リード操作はreadQueryFromClient処理,ライト操作はwriteToClient処理である.
void *IOThreadMain(void *myid) {
/* The ID is the thread number (from 0 to server.iothreads_num-1), and is
* used by the thread to just manipulate a single sub-array of clients. */
long id = (unsigned long)myid;
char thdname[16];
snprintf(thdname, sizeof(thdname), "io_thd_%ld", id);
redis_set_thread_title(thdname);
while(1) {
/* Wait for start */
for (int j = 0; j < 1000000; j++) {
if (io_threads_pending[id] != 0) break;
}
/* Give the main thread a chance to stop this thread. */
if (io_threads_pending[id] == 0) {
pthread_mutex_lock(&io_threads_mutex[id]);
pthread_mutex_unlock(&io_threads_mutex[id]);
continue;
}
serverAssert(io_threads_pending[id] != 0);
if (tio_debug) printf("[%ld] %d to handle
", id, (int)listLength(io_threads_list[id]));
/* Process: note that the main thread will never touch our list
* before we drop the pending count to 0. */
listIter li;
listNode *ln;
listRewind(io_threads_list[id],&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
if (io_threads_op == IO_THREADS_OP_WRITE) {
writeToClient(c,0);
} else if (io_threads_op == IO_THREADS_OP_READ) {
readQueryFromClient(c->conn);
} else {
serverPanic("io_threads_op value is unknown");
}
}
listEmpty(io_threads_list[id]);
io_threads_pending[id] = 0;
if (tio_debug) printf("[%ld] Done
", id);
}
}
3.3、処理対象のタスクの割り当て
次の関数の主な機能は,IOリクエストを異なるIOスレッドに割り当てて処理することである.
24行目から分かるように、割当ポリシーはRR,int target_であるid = item_id % server.io_threads_num; idで作ったhash型です.
25行目はIOスレッドを選択するとクライアントを要求し、キューの末尾に追加します.
49行目は、識別を更新し、各スレッドキューの長さを記録してコールバック関数の実行を通知する.
50行目は、すべてのスレッドが実行された後、メインスレッドに戻ります.
56行目は、メインスレッドが次のコマンド操作を実行する行です.
int handleClientsWithPendingWritesUsingThreads(void) {
int processed = listLength(server.clients_pending_write);
if (processed == 0) return 0; /* Return ASAP if there are no clients. */
/* If I/O threads are disabled or we have few clients to serve, don't
* use I/O threads, but thejboring synchronous code. */
if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) {
return handleClientsWithPendingWrites();
}
/* Start threads if needed. */
if (!io_threads_active) startThreadedIO();
if (tio_debug) printf("%d TOTAL WRITE pending clients
", processed);
/* Distribute the clients across N different lists. */
listIter li;
listNode *ln;
listRewind(server.clients_pending_write,&li);
int item_id = 0;
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
c->flags &= ~CLIENT_PENDING_WRITE;
int target_id = item_id % server.io_threads_num;
listAddNodeTail(io_threads_list[target_id],c);
item_id++;
}
/* Give the start condition to the waiting threads, by setting the
* start condition atomic var. */
io_threads_op = IO_THREADS_OP_WRITE;
for (int j = 1; j < server.io_threads_num; j++) {
int count = listLength(io_threads_list[j]);
io_threads_pending[j] = count;
}
/* Also use the main thread to process a slice of clients. */
listRewind(io_threads_list[0],&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
writeToClient(c,0);
}
listEmpty(io_threads_list[0]);
/* Wait for all the other threads to end their work. */
while(1) {
unsigned long pending = 0;
for (int j = 1; j < server.io_threads_num; j++)
pending += io_threads_pending[j];
if (pending == 0) break;
}
if (tio_debug) printf("I/O WRITE All threads finshed
");
/* Run the list of clients again to install the write handler where
* needed. */
listRewind(server.clients_pending_write,&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
/* Install the write handler if there are pending writes in some
* of the clients. */
if (clientHasPendingReplies(c) &&
connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR)
{
freeClientAsync(c);
}
}
listEmpty(server.clients_pending_write);
return processed;
}