BlockingPopはRedis 2にある.バージョン6の変更点
3684 ワード
Blocking popとは
Redisでlistのデータのpopに対して2つのコマンドがあります rpopは、Listの右側からデータ をポップアップする. lpop、listの左側からデータ がポップアップ
blocking popとは、popの場合、listが存在しない場合、クライアントがブロックして待っていて、他のクライアントがpushを行うまで、このブロックしたクライアントが返答を受けるという意味で、もちろんブロックのタイムアウト時間を設定することもできます blpop list1[list2...] timeout rlpop list1[list2...] timeout
Redis2.バージョン6の変更点
まずシーンを仮定します Redis2.6バージョン前に、クライアント1は、v 1を取得し、v 1のpushおよびpop操作はaof を記録しない. Redis2.6バージョン以降、クライアント1は、v 3、v 1のpushおよび姑操作がaofに記録される を得る.
実装方法
コードを読み終えた後、コード実装には以下のような変化があることがわかりました.
データ構造の変化
2.6の後、redisDb構造体にready_が増加したことがわかる.keys辞書
lpush/rpush論理の変化
Redisは2.6リリース前に直接pushでblocking clientを処理するのでblpop/brpopは、pushの最初の数のvalueをblocking clientに返します.これらのvalueはdbを操作していません.aofにも入りません.
2.6バージョン以降は、まずdbのlistにデータを挿入し、processCommandで処理するので、blpop/brpopは意味と一致します.
Redisでlistのデータのpopに対して2つのコマンドがあります
blocking popとは、popの場合、listが存在しない場合、クライアントがブロックして待っていて、他のクライアントがpushを行うまで、このブロックしたクライアントが返答を受けるという意味で、もちろんブロックのタイムアウト時間を設定することもできます
Redis2.バージョン6の変更点
まずシーンを仮定します
1:blpop list1 0
2:lpush list1 v1 v2 v3
実装方法
コードを読み終えた後、コード実装には以下のような変化があることがわかりました.
データ構造の変化
2.6の後、redisDb構造体にready_が増加したことがわかる.keys辞書
/* 2.4 */
typedef struct redisDb {
dict *dict;
dict *expires;
dict *blocking_keys; /* Keys with clients waiting for data (BLPOP) */
dict *io_keys; /* Keys with clients waiting for VM I/O */
dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */
int id;
} redisDb;
/* 2.6 */
typedef struct redisDb {
dict *dict; /* The keyspace for this DB */
dict *expires; /* Timeout of keys with a timeout set */
dict *blocking_keys; /* Keys with clients waiting for data (BLPOP) */
dict *ready_keys; /* Blocked keys that received a PUSH */
dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */
int id;
long long avg_ttl; /* Average TTL, just for stats */
} redisDb;
lpush/rpush論理の変化
Redisは2.6リリース前に直接pushでblocking clientを処理するのでblpop/brpopは、pushの最初の数のvalueをblocking clientに返します.これらのvalueはdbを操作していません.aofにも入りません.
2.6バージョン以降は、まずdbのlistにデータを挿入し、processCommandで処理するので、blpop/brpopは意味と一致します.
/* 2.6 */
/* push block client */
void pushGenericCommand(redisClient *c, int where) {
int j, addlen = 0, pushed = 0;
robj *lobj = lookupKeyWrite(c->db,c->argv[1]);
int may_have_waiting_clients = (lobj == NULL);
if (lobj && lobj->type != REDIS_LIST) {
addReply(c,shared.wrongtypeerr);
return;
}
for (j = 2; j < c->argc; j++) {
c->argv[j] = tryObjectEncoding(c->argv[j]);
if (may_have_waiting_clients) {
if (handleClientsWaitingListPush(c,c->argv[1],c->argv[j])) {
addlen++;
continue;
} else {
may_have_waiting_clients = 0;
}
}
if (!lobj) {
lobj = createZiplistObject();
dbAdd(c->db,c->argv[1],lobj);
}
listTypePush(lobj,c->argv[j],where);
pushed++;
}
addReplyLongLong(c,addlen + (lobj ? listTypeLength(lobj) : 0));
if (pushed) signalModifiedKey(c->db,c->argv[1]);
server.dirty += pushed;
}
/* 2.6 */
/* list push , blocking keys ,
* , ready_keys */
void dbAdd(redisDb *db, robj *key, robj *val) {
sds copy = sdsdup(key->ptr);
int retval = dictAdd(db->dict, copy, val);
redisAssertWithInfo(NULL,key,retval == REDIS_OK);
if (val->type == REDIS_LIST) signalListAsReady(db, key);
}
/* processCommand */
call(c,REDIS_CALL_FULL);
if (listLength(server.ready_keys))
handleClientsBlockedOnLists();