ハードディスク(HDD)がいっぱいになった後のredisの処理メカニズム
10834 ワード
先日、あるredisマシンのハードディスクがいっぱいになりました。主な原因はプログラムのバグによるバックアップ量の急増で、ちょうど監視プログラムの通知機構も動いていなかったため、redisが書き込みを受け付けなくなる状態(読み取りのみ可能)を初めて体験しました.
ディスクがいっぱいになった後のredisの処理メカニズムを見てみましょう.
saveプロセス:serverCron->rdbSaveBackground->rdbSave
save後処理:serverCron->backgroundSaveDoneHandler
上記フローの結果、server.lastbgsave_status = REDIS_ERR となります.
その影響で、processCommandとluaRedisGenericCommandは書き込み操作だと判定すると、実際には書き込みを行わずにエラーを返します(processCommandはエラー応答を返したうえでREDIS_OKを返します).
1. rdbSaveでは、書き出しエラーはすべてREDIS_ERRを返します.
2. rdbSaveBackgroundでは、子プロセスが呼び出したrdbSaveがREDIS_ERRを返した場合、子プロセスはexit(1)で終了します.
3.bgsaveが完了すると、serverCronでbgsaveサブプロセスのリターンコードを得て後続処理を行う
4. 子プロセスがシグナル以外で終了し、exitcodeが0でない場合、bgsaveの状態(server.lastbgsave_status)をREDIS_ERRに設定します.
5. processCommandでは、cmdが書き込み操作と判定されると、コマンドを実行せずにエラー応答を返し、そのままREDIS_OKを返します.
6. luaRedisGenericCommandでは、cmdが書き込み操作と判定された場合、その実行をブロックします(エラーを返します).
ディスクがいっぱいになった後のredisの処理メカニズムを見てみましょう.
saveプロセス:serverCron->rdbSaveBackground->rdbSave
save後処理:serverCron->backgroundSaveDoneHandler
上記フローの結果、server.lastbgsave_status = REDIS_ERR となります.
その影響で、processCommandとluaRedisGenericCommandは書き込み操作だと判定すると、実際には書き込みを行わずにエラーを返します(processCommandはエラー応答を返したうえでREDIS_OKを返します).
1. rdbSaveでは、書き出しエラーはすべてREDIS_ERRを返します.
/* Save the whole dataset on disk as an RDB file named 'filename'.
 *
 * The dump is written to a temporary file "temp-<pid>.rdb" first and is
 * renamed over 'filename' only after it has been completely written,
 * flushed, fsync()ed and closed, so the destination is replaced atomically.
 *
 * Returns REDIS_OK on success. On any failure (open, write, flush, fsync,
 * close or rename) a warning is logged, the temporary file is removed and
 * REDIS_ERR is returned. */
int rdbSave(char *filename) {
    dictIterator *di = NULL;
    dictEntry *de;
    char tmpfile[256];
    char magic[10];
    int j;
    int saved_errno;
    long long now = mstime();
    FILE *fp;
    rio rdb;
    uint64_t cksum;

    snprintf(tmpfile,sizeof(tmpfile),"temp-%d.rdb", (int) getpid());
    fp = fopen(tmpfile,"w");
    if (!fp) {
        redisLog(REDIS_WARNING, "Failed opening .rdb for saving: %s",
            strerror(errno));
        return REDIS_ERR;
    }

    rioInitWithFile(&rdb,fp);
    if (server.rdb_checksum)
        rdb.update_cksum = rioGenericUpdateChecksum;
    snprintf(magic,sizeof(magic),"REDIS%04d",REDIS_RDB_VERSION);
    if (rdbWriteRaw(&rdb,magic,9) == -1) goto werr;

    for (j = 0; j < server.dbnum; j++) {
        redisDb *db = server.db+j;
        dict *d = db->dict;
        if (dictSize(d) == 0) continue;
        di = dictGetSafeIterator(d);
        if (!di) {
            fclose(fp);
            /* Do not leave a partial dump behind. */
            unlink(tmpfile);
            return REDIS_ERR;
        }

        /* Write the SELECT DB opcode */
        if (rdbSaveType(&rdb,REDIS_RDB_OPCODE_SELECTDB) == -1) goto werr;
        if (rdbSaveLen(&rdb,j) == -1) goto werr;

        /* Iterate this DB writing every entry */
        while((de = dictNext(di)) != NULL) {
            sds keystr = dictGetKey(de);
            robj key, *o = dictGetVal(de);
            long long expire;

            initStaticStringObject(key,keystr);
            expire = getExpire(db,&key);
            if (rdbSaveKeyValuePair(&rdb,&key,o,expire,now) == -1) goto werr;
        }
        dictReleaseIterator(di);
        di = NULL; /* So that we don't release it again on error. */
    }

    /* EOF opcode */
    if (rdbSaveType(&rdb,REDIS_RDB_OPCODE_EOF) == -1) goto werr;

    /* CRC64 checksum. It will be zero if checksum computation is disabled, the
     * loading code skips the check in this case. */
    cksum = rdb.cksum;
    memrev64ifbe(&cksum);
    if (rioWrite(&rdb,&cksum,8) == 0) goto werr;

    /* Make sure data will not remain on the OS's output buffers */
    if (fflush(fp) == EOF) goto werr;
    if (fsync(fileno(fp)) == -1) goto werr;
    if (fclose(fp) == EOF) {
        /* The stream is released even when fclose() fails: closing it a
         * second time in the error path would be undefined behaviour. */
        fp = NULL;
        goto werr;
    }

    /* Use RENAME to make sure the DB file is changed atomically only
     * if the generate DB file is ok. */
    if (rename(tmpfile,filename) == -1) {
        redisLog(REDIS_WARNING,"Error moving temp DB file on the final destination: %s", strerror(errno));
        unlink(tmpfile);
        return REDIS_ERR;
    }
    redisLog(REDIS_NOTICE,"DB saved on disk");
    server.dirty = 0;
    server.lastsave = time(NULL);
    server.lastbgsave_status = REDIS_OK;
    return REDIS_OK;

werr:
    /* Capture errno first: fclose() and unlink() below may overwrite it and
     * the log line must report the cause of the original failure. */
    saved_errno = errno;
    if (fp) fclose(fp);
    unlink(tmpfile);
    redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(saved_errno));
    if (di) dictReleaseIterator(di);
    return REDIS_ERR;
}
2. rdbSaveBackgroundでは、子プロセスが呼び出したrdbSaveがREDIS_ERRを返した場合、子プロセスはexit(1)で終了します.
/* Start saving the dataset to 'filename' from a forked child process.
 *
 * Returns REDIS_ERR when a save child is already registered
 * (server.rdb_child_pid != -1) or when fork() fails; in the latter case
 * server.lastbgsave_status is also set to REDIS_ERR. Otherwise the parent
 * records the child pid and the start time and returns REDIS_OK.
 *
 * The child runs rdbSave() and hands exitFromChild() status 0 when the save
 * succeeded and 1 when it failed. */
int rdbSaveBackground(char *filename) {
    pid_t childpid;
    long long start;

    if (server.rdb_child_pid != -1) return REDIS_ERR;

    server.dirty_before_bgsave = server.dirty;
    server.lastbgsave_try = time(NULL);

    start = ustime();
    childpid = fork();

    if (childpid == 0) {
        /* Child */
        int retval;
        int status;

        closeListeningSockets(0);
        redisSetProcTitle("redis-rdb-bgsave");
        retval = rdbSave(filename);
        if (retval == REDIS_OK) {
            size_t private_dirty = zmalloc_get_private_dirty();

            if (private_dirty) {
                redisLog(REDIS_NOTICE,
                    "RDB: %zu MB of memory used by copy-on-write",
                    private_dirty/(1024*1024));
            }
        }
        /* The parent tells success from failure by this exit status. */
        status = (retval == REDIS_OK) ? 0 : 1;
        exitFromChild(status);
        return REDIS_OK; /* unreached */
    }

    /* Parent */
    server.stat_fork_time = ustime()-start;
    if (childpid == -1) {
        server.lastbgsave_status = REDIS_ERR;
        redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
            strerror(errno));
        return REDIS_ERR;
    }
    redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
    server.rdb_save_time_start = time(NULL);
    server.rdb_child_pid = childpid;
    updateDictResizePolicy();
    return REDIS_OK;
}
3.bgsaveが完了すると、serverCronでbgsaveサブプロセスのリターンコードを得て後続処理を行う
/* Check if a background saving or AOF rewrite in progress terminated.
 * The child's exit code / terminating signal is handed to the matching
 * "done" handler, which decides whether the operation succeeded. */
if (server.rdb_child_pid != -1 || server.aof_child_pid != -1) {
    int statloc = 0;
    pid_t pid;

    /* WNOHANG: wait3() returns 0 at once when no child changed state. */
    if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
        if (pid == -1) {
            /* wait3() itself failed: statloc holds no child status, so do
             * not decode it and do not report a bogus "unmatched pid". */
            redisLog(REDIS_WARNING,
                "wait3() returned an error: %s. "
                "rdb_child_pid = %d, aof_child_pid = %d",
                strerror(errno),
                (int) server.rdb_child_pid,
                (int) server.aof_child_pid);
        } else {
            int exitcode = WEXITSTATUS(statloc);
            int bysignal = 0;

            if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);

            if (pid == server.rdb_child_pid) {
                /* bgsave child: pass its exit code on */
                backgroundSaveDoneHandler(exitcode,bysignal);
            } else if (pid == server.aof_child_pid) {
                backgroundRewriteDoneHandler(exitcode,bysignal);
            } else {
                redisLog(REDIS_WARNING,
                    "Warning, detected child with unmatched pid: %ld",
                    (long)pid);
            }
        }
        updateDictResizePolicy();
    }
}
4. 子プロセスがシグナル以外で終了し、exitcodeが0でない場合、bgsaveの状態(server.lastbgsave_status)をREDIS_ERRに設定します.
/* Handle the termination of the background save child.
 *
 * exitcode: exit status reported for the child.
 * bysignal: zero when the child exited by itself, otherwise the number of
 *           the signal that terminated it.
 *
 * Updates server.lastbgsave_status according to the outcome, clears the
 * child bookkeeping fields and finally notifies the slaves waiting for this
 * save with REDIS_OK or REDIS_ERR. */
void backgroundSaveDoneHandler(int exitcode, int bysignal) {
    int success = (!bysignal && exitcode == 0);

    if (bysignal) {
        mstime_t latency;

        redisLog(REDIS_WARNING,
            "Background saving terminated by signal %d", bysignal);
        latencyStartMonitor(latency);
        rdbRemoveTempFile(server.rdb_child_pid);
        latencyEndMonitor(latency);
        latencyAddSampleIfNeeded("rdb-unlink-temp-file",latency);
        /* SIGUSR1 is whitelisted, so we have a way to kill a child without
         * triggering an error condition. */
        if (bysignal != SIGUSR1)
            server.lastbgsave_status = REDIS_ERR;
    } else if (exitcode == 0) {
        redisLog(REDIS_NOTICE,
            "Background saving terminated with success");
        /* Subtract the dirty counter value recorded before the save. */
        server.dirty -= server.dirty_before_bgsave;
        server.lastsave = time(NULL);
        server.lastbgsave_status = REDIS_OK;
    } else {
        /* The child exited on its own with a non-zero status. */
        redisLog(REDIS_WARNING, "Background saving error");
        server.lastbgsave_status = REDIS_ERR;
    }

    server.rdb_child_pid = -1;
    server.rdb_save_time_last = time(NULL)-server.rdb_save_time_start;
    server.rdb_save_time_start = -1;

    /* Possibly there are slaves waiting for a BGSAVE in order to be served
     * (the first stage of SYNC is a bulk transfer of dump.rdb) */
    updateSlavesWaitingBgsave(success ? REDIS_OK : REDIS_ERR);
}
5. processCommandでは、cmdが書き込み操作と判定されると、コマンドを実行せずにエラー応答を返し、そのままREDIS_OKを返します.
/* Don't accept write commands if there are problems persisting on disk
 * and if this is a master instance.
 *
 * The check fires when either the last background save failed (only while
 * stop_writes_on_bgsave_err is set and save points are configured) or the
 * last AOF write failed. PING is refused as well, in addition to commands
 * flagged REDIS_CMD_WRITE. The command is not executed: an error reply is
 * queued for the client and REDIS_OK is returned. */
if (((server.stop_writes_on_bgsave_err &&
      server.saveparamslen > 0 &&
      server.lastbgsave_status == REDIS_ERR) ||
      server.aof_last_write_status == REDIS_ERR) &&
    server.masterhost == NULL &&
    (c->cmd->flags & REDIS_CMD_WRITE ||
     c->cmd->proc == pingCommand))
{
    flagTransaction(c);
    if (server.aof_last_write_status == REDIS_OK)
        /* AOF is fine, so the failed background save is the cause. */
        addReply(c, shared.bgsaveerr);
    else
        addReplySds(c,
            sdscatprintf(sdsempty(),
            "-MISCONF Errors writing to the AOF file: %s\r\n",
            strerror(server.aof_last_write_errno)));
    return REDIS_OK;
}
6. luaRedisGenericCommandでは、cmdが書き込み操作と判定された場合、その実行をブロックします(エラーを返します).
/* Write commands are forbidden against read-only slaves, or if a
 * command marked as non-deterministic was already called in the context
 * of this script. They are also refused when stop_writes_on_bgsave_err is
 * set, save points are configured (saveparamslen > 0) and the last
 * background save failed. In each refused case an error is pushed on the
 * Lua stack and control jumps straight to the cleanup label. */
if (cmd->flags & REDIS_CMD_WRITE) {
if (server.lua_random_dirty) {
luaPushError(lua,
"Write commands not allowed after non deterministic commands");
goto cleanup;
} else if (server.masterhost && server.repl_slave_ro &&
!server.loading &&
!(server.lua_caller->flags & REDIS_MASTER))
{
luaPushError(lua, shared.roslaveerr->ptr);
goto cleanup;
} else if (server.stop_writes_on_bgsave_err &&
server.saveparamslen > 0 &&
server.lastbgsave_status == REDIS_ERR)
{
/* Same error text that shared.bgsaveerr carries. */
luaPushError(lua, shared.bgsaveerr->ptr);
goto cleanup;
}
}
cleanup:
/* Clean up. Command code may have changed argv/argc so we use the
 * argv/argc of the client instead of the local variables. */
for (j = 0; j < c->argc; j++) {
robj *o = c->argv[j];
/* Try to cache the object in the cached_objects array.
 * The object must be small, SDS-encoded, and with refcount = 1
 * (we must be the only owner) for us to cache it. */
if (j < LUA_CMD_OBJCACHE_SIZE &&
o->refcount == 1 &&
o->encoding == REDIS_ENCODING_RAW &&
sdslen(o->ptr) <= LUA_CMD_OBJCACHE_MAX_LEN)
{
/* Step back from the string pointer by sizeof(struct sdshdr) to reach
 * the header, whose free + len is recorded as the cached length. */
struct sdshdr *sh = (void*)(((char*)(o->ptr))-(sizeof(struct sdshdr)));
/* Drop the object previously cached in this slot, if any. */
if (cached_objects[j]) decrRefCount(cached_objects[j]);
cached_objects[j] = o;
cached_objects_len[j] = sh->free + sh->len;
} else {
/* Not cacheable: just drop our reference. */
decrRefCount(o);
}
}
/* c->argv no longer points at the local argv array: free the array the
 * client holds and forget the local pointer. */
if (c->argv != argv) {
zfree(c->argv);
argv = NULL;
}
if (raise_error) {
/* If we are here we should have an error in the stack, in the
 * form of a table with an "err" field. Extract the string to
 * return the plain error. */
lua_pushstring(lua,"err");
lua_gettable(lua,-2);
return lua_error(lua);
}
return 1;