redisソース学習(クライアント)
45488 ワード
ざっと紹介する
Redisのクライアント設計は、主にクライアントの接続、コマンドの要求、要求の解析、そして実行結果の返却を扱います。まずserverの構造体とclientの構造体を見てみましょう。serverは複数のclientを保持しており、1つのサーバに複数のクライアントが接続できます。サーバはイベント駆動方式でクライアントの要求を順次処理します。
server構造
クライアント構造
クライアントのリクエスト・プロシージャのプロファイリング
クライアントはserverに接続して、コマンドを要求して、結果の全体の過程を返します
サーバ全体はイベント駆動で制御されています。clientの接続要求、リクエストの送信、serverでの受信・解析・実行、結果の返却は、いずれもイベントによって処理されます。
まず、イベントの作成、トリガ、接続、クライアントの作成から始めます.
1)int main()内のinitServer関数
2)initServer関数
ipfd_countはサービス傍受ポートの数であり、異なる傍受ポートに異なるserver fdを作成する
さらに、それぞれのserver fdに対してファイルイベントを作成します。コールバック関数はacceptTcpHandlerです(いずれかのリスニングソケットに接続が来ると、対応するイベントが発火し、acceptTcpHandler関数が呼び出されます)。
3)acceptTcpHandler関数
4)接続を要求し、クライアントを作成する:
networking.c:acceptCommonHandler関数
5)コマンド要求プロセッサ
networking.c:readQueryFromClient
データreadをclientのquerybufに,querybuf情報に基づいてコマンドに解析する
7)コマンドプロセッサ:
networking.c/sendReplyToClient関数:
Redisのクライアント設計は、主にクライアントの接続、コマンドの要求、要求の解析、そして実行結果の返却を扱います。まずserverの構造体とclientの構造体を見てみましょう。serverは複数のclientを保持しており、1つのサーバに複数のクライアントが接続できます。サーバはイベント駆動方式でクライアントの要求を順次処理します。
server構造
/* Global server state. The functions in this excerpt reach it through the
 * global `server` variable (e.g. server.el, server.ipfd[], server.clients).
 * Fields are grouped by subsystem; each section comment names the group. */
struct redisServer {
/* General */
char *configfile; /* Absolute config file path, or NULL */
int hz; /* serverCron() calls frequency in hertz */
redisDb *db; /* Database state -- presumably an array of dbnum entries; TODO confirm */
dict *commands; /* Command table */
dict *orig_commands; /* Command table before command renaming. */
aeEventLoop *el; /* Event loop that file events are registered on (see aeCreateFileEvent callers) */
unsigned lruclock:REDIS_LRU_BITS; /* Clock for LRU eviction */
int shutdown_asap; /* SHUTDOWN needed ASAP */
int activerehashing; /* Incremental rehash in serverCron() */
char *requirepass; /* Pass for AUTH command, or NULL */
char *pidfile; /* PID file path */
int arch_bits; /* 32 or 64 depending on sizeof(long) */
int cronloops; /* Number of times the cron function run */
char runid[REDIS_RUN_ID_SIZE+1]; /* ID always different at every exec. */
int sentinel_mode; /* True if this instance is a Sentinel. */
/* Networking */
int port; /* TCP listening port */
int tcp_backlog; /* TCP listen() backlog */
char *bindaddr[REDIS_BINDADDR_MAX]; /* Addresses we should bind to */
int bindaddr_count; /* Number of addresses in server.bindaddr[] */
char *unixsocket; /* UNIX socket path */
mode_t unixsocketperm; /* UNIX socket permission */
int ipfd[REDIS_BINDADDR_MAX]; /* TCP socket file descriptors */
int ipfd_count; /* Used slots in ipfd[] */
int sofd; /* Unix socket file descriptor */
int cfd[REDIS_BINDADDR_MAX];/* Cluster bus listening socket */
int cfd_count; /* Used slots in cfd[] */
list *clients; /* List of active clients */
list *clients_to_close; /* Clients to close asynchronously */
list *slaves, *monitors; /* List of slaves and MONITORs */
redisClient *current_client; /* Current client, only used on crash report */
int clients_paused; /* True if clients are currently paused */
mstime_t clients_pause_end_time; /* Time when we undo clients_paused */
char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */
dict *migrate_cached_sockets;/* MIGRATE cached sockets */
/* RDB / AOF loading information */
int loading; /* We are loading data from disk if true */
/* NOTE(review): the four fields below carry no comment in the excerpt; by
 * name they look like progress counters for the load -- confirm. */
off_t loading_total_bytes;
off_t loading_loaded_bytes;
time_t loading_start_time;
off_t loading_process_events_interval_bytes;
/* Fast pointers to often looked up command */
struct redisCommand *delCommand, *multiCommand, *lpushCommand, *lpopCommand,
*rpopCommand;
/* Fields used only for stats */
time_t stat_starttime; /* Server start time */
long long stat_numcommands; /* Number of processed commands */
long long stat_numconnections; /* Number of connections received */
long long stat_expiredkeys; /* Number of expired keys */
long long stat_evictedkeys; /* Number of evicted keys (maxmemory) */
long long stat_keyspace_hits; /* Number of successful lookups of keys */
long long stat_keyspace_misses; /* Number of failed lookups of keys */
size_t stat_peak_memory; /* Max used memory record */
long long stat_fork_time; /* Time needed to perform latest fork() */
long long stat_rejected_conn; /* Clients rejected because of maxclients */
long long stat_sync_full; /* Number of full resyncs with slaves. */
long long stat_sync_partial_ok; /* Number of accepted PSYNC requests. */
long long stat_sync_partial_err;/* Number of unaccepted PSYNC requests. */
/* slowlog */
list *slowlog; /* SLOWLOG list of commands */
long long slowlog_entry_id; /* SLOWLOG current entry ID */
long long slowlog_log_slower_than; /* SLOWLOG time limit (to get logged) */
unsigned long slowlog_max_len; /* SLOWLOG max number of items logged */
size_t resident_set_size; /* RSS sampled in serverCron(). */
/* The following two are used to track instantaneous "load" in terms
 * of operations per second. */
long long ops_sec_last_sample_time; /* Timestamp of last sample (in ms) */
long long ops_sec_last_sample_ops; /* numcommands in last sample */
long long ops_sec_samples[REDIS_OPS_SEC_SAMPLES]; /* Sample storage -- presumably a ring buffer; TODO confirm */
int ops_sec_idx; /* Presumably the next slot to use in ops_sec_samples[]; TODO confirm */
/* Configuration */
int verbosity; /* Loglevel in redis.conf */
int maxidletime; /* Client timeout in seconds */
int tcpkeepalive; /* Set SO_KEEPALIVE if non-zero. */
int active_expire_enabled; /* Can be disabled for testing purposes. */
size_t client_max_querybuf_len; /* Limit for client query buffer length */
int dbnum; /* Total number of configured DBs */
int daemonize; /* True if running as a daemon */
/* Output buffer limits, one entry per client class
 * (REDIS_CLIENT_LIMIT_NUM_CLASSES entries). */
clientBufferLimitsConfig client_obuf_limits[REDIS_CLIENT_LIMIT_NUM_CLASSES];
/* AOF persistence */
int aof_state; /* REDIS_AOF_(ON|OFF|WAIT_REWRITE) */
int aof_fsync; /* Kind of fsync() policy */
char *aof_filename; /* Name of the AOF file */
int aof_no_fsync_on_rewrite; /* Don't fsync if a rewrite is in prog. */
int aof_rewrite_perc; /* Rewrite AOF if % growth is > M and... */
off_t aof_rewrite_min_size; /* the AOF file is at least N bytes. */
off_t aof_rewrite_base_size; /* AOF size on latest startup or rewrite. */
off_t aof_current_size; /* AOF current size. */
int aof_rewrite_scheduled; /* Rewrite once BGSAVE terminates. */
pid_t aof_child_pid; /* PID if rewriting process */
list *aof_rewrite_buf_blocks; /* Hold changes during an AOF rewrite. */
sds aof_buf; /* AOF buffer, written before entering the event loop */
int aof_fd; /* File descriptor of currently selected AOF file */
int aof_selected_db; /* Currently selected DB in AOF */
time_t aof_flush_postponed_start; /* UNIX time of postponed AOF flush */
time_t aof_last_fsync; /* UNIX time of last fsync() */
time_t aof_rewrite_time_last; /* Time used by last AOF rewrite run. */
time_t aof_rewrite_time_start; /* Current AOF rewrite start time. */
int aof_lastbgrewrite_status; /* REDIS_OK or REDIS_ERR */
unsigned long aof_delayed_fsync; /* delayed AOF fsync() counter */
int aof_rewrite_incremental_fsync;/* fsync incrementally while rewriting? */
int aof_last_write_status; /* REDIS_OK or REDIS_ERR */
int aof_last_write_errno; /* Valid if aof_last_write_status is ERR */
/* RDB persistence */
long long dirty; /* Changes to DB from the last save */
long long dirty_before_bgsave; /* Used to restore dirty on failed BGSAVE */
pid_t rdb_child_pid; /* PID of RDB saving child */
struct saveparam *saveparams; /* Save points array for RDB */
int saveparamslen; /* Number of saving points */
char *rdb_filename; /* Name of RDB file */
int rdb_compression; /* Use compression in RDB? */
int rdb_checksum; /* Use RDB checksum? */
time_t lastsave; /* Unix time of last successful save */
time_t lastbgsave_try; /* Unix time of last attempted bgsave */
time_t rdb_save_time_last; /* Time used by last RDB save run. */
time_t rdb_save_time_start; /* Current RDB save start time. */
int lastbgsave_status; /* REDIS_OK or REDIS_ERR */
int stop_writes_on_bgsave_err; /* Don't allow writes if can't BGSAVE */
/* Propagation of commands in AOF / replication */
redisOpArray also_propagate; /* Additional command to propagate. */
/* Logging */
char *logfile; /* Path of log file */
int syslog_enabled; /* Is syslog enabled? */
char *syslog_ident; /* Syslog ident */
int syslog_facility; /* Syslog facility */
/* Replication (master) */
int slaveseldb; /* Last SELECTed DB in replication output */
long long master_repl_offset; /* Global replication offset */
int repl_ping_slave_period; /* Master pings the slave every N seconds */
char *repl_backlog; /* Replication backlog for partial syncs */
long long repl_backlog_size; /* Backlog circular buffer size */
long long repl_backlog_histlen; /* Backlog actual data length */
long long repl_backlog_idx; /* Backlog circular buffer current offset */
long long repl_backlog_off; /* Replication offset of first byte in the
backlog buffer. */
time_t repl_backlog_time_limit; /* Time without slaves after the backlog
gets released. */
time_t repl_no_slaves_since; /* We have no slaves since that time.
Only valid if server.slaves len is 0. */
int repl_min_slaves_to_write; /* Min number of slaves to write. */
int repl_min_slaves_max_lag; /* Max lag of <count> slaves to write. */
int repl_good_slaves_count; /* Number of slaves with lag <= max_lag. */
/* Replication (slave) */
char *masterauth; /* AUTH with this password with master */
char *masterhost; /* Hostname of master */
int masterport; /* Port of master */
int repl_timeout; /* Timeout after N seconds of master idle */
redisClient *master; /* Client that is master for this slave */
redisClient *cached_master; /* Cached master to be reused for PSYNC. */
int repl_syncio_timeout; /* Timeout for synchronous I/O calls */
int repl_state; /* Replication status if the instance is a slave */
off_t repl_transfer_size; /* Size of RDB to read from master during sync. */
off_t repl_transfer_read; /* Amount of RDB read from master during sync. */
off_t repl_transfer_last_fsync_off; /* Offset when we fsync-ed last time. */
int repl_transfer_s; /* Slave -> Master SYNC socket */
int repl_transfer_fd; /* Slave -> Master SYNC temp file descriptor */
char *repl_transfer_tmpfile; /* Slave-> master SYNC temp file name */
time_t repl_transfer_lastio; /* Unix time of the latest read, for timeout */
int repl_serve_stale_data; /* Serve stale data when link is down? */
int repl_slave_ro; /* Slave is read only? */
time_t repl_down_since; /* Unix time at which link with master went down */
int repl_disable_tcp_nodelay; /* Disable TCP_NODELAY after SYNC? */
int slave_priority; /* Reported in INFO and used by Sentinel. */
char repl_master_runid[REDIS_RUN_ID_SIZE+1]; /* Master run id for PSYNC. */
long long repl_master_initial_offset; /* Master PSYNC offset. */
/* Replication script cache. */
dict *repl_scriptcache_dict; /* SHA1 all slaves are aware of. */
list *repl_scriptcache_fifo; /* First in, first out LRU eviction. */
int repl_scriptcache_size; /* Max number of elements. */
/* Synchronous replication. */
list *clients_waiting_acks; /* Clients waiting in WAIT command. */
int get_ack_from_slaves; /* If true we send REPLCONF GETACK. */
/* Limits */
int maxclients; /* Max number of simultaneous clients */
unsigned long long maxmemory; /* Max number of memory bytes to use */
int maxmemory_policy; /* Policy for key eviction */
int maxmemory_samples; /* Precision of random sampling */
/* Blocked clients */
unsigned int bpop_blocked_clients; /* Number of clients blocked by lists */
list *unblocked_clients; /* list of clients to unblock before next loop */
list *ready_keys; /* List of readyList structures for BLPOP & co */
/* Sort parameters - qsort_r() is only available under BSD so we
 * have to take this state global, in order to pass it to sortCompare() */
int sort_desc;
int sort_alpha;
int sort_bypattern;
int sort_store;
/* Zip structure config, see redis.conf for more information */
size_t hash_max_ziplist_entries;
size_t hash_max_ziplist_value;
size_t list_max_ziplist_entries;
size_t list_max_ziplist_value;
size_t set_max_intset_entries;
size_t zset_max_ziplist_entries;
size_t zset_max_ziplist_value;
size_t hll_sparse_max_bytes;
time_t unixtime; /* Unix time sampled every cron cycle. */
long long mstime; /* Like 'unixtime' but with milliseconds resolution. */
/* Pubsub */
dict *pubsub_channels; /* Map channels to list of subscribed clients */
list *pubsub_patterns; /* A list of pubsub_patterns */
int notify_keyspace_events; /* Events to propagate via Pub/Sub. This is an
xor of REDIS_NOTIFY... flags. */
/* Cluster */
int cluster_enabled; /* Is cluster enabled? */
mstime_t cluster_node_timeout; /* Cluster node timeout. */
char *cluster_configfile; /* Cluster auto-generated config file name. */
struct clusterState *cluster; /* State of the cluster */
int cluster_migration_barrier; /* Cluster replicas migration barrier. */
/* Scripting */
lua_State *lua; /* The Lua interpreter. We use just one for all clients */
redisClient *lua_client; /* The "fake client" to query Redis from Lua */
redisClient *lua_caller; /* The client running EVAL right now, or NULL */
dict *lua_scripts; /* A dictionary of SHA1 -> Lua scripts */
mstime_t lua_time_limit; /* Script timeout in milliseconds */
mstime_t lua_time_start; /* Start time of script, milliseconds time */
int lua_write_dirty; /* True if a write command was called during the
execution of the current script. */
int lua_random_dirty; /* True if a random command was called during the
execution of the current script. */
int lua_timedout; /* True if we reached the time limit for script
execution. */
int lua_kill; /* Kill the script if true. */
/* Assert & bug reporting */
char *assert_failed;
char *assert_file;
int assert_line;
int bug_report_start; /* True if bug report header was already logged. */
int watchdog_period; /* Software watchdog period in ms. 0 = off */
};
クライアント構造
/* Per-connection client state. One instance is allocated by createClient()
 * for every accepted connection; the event callbacks in this excerpt receive
 * it as their privdata argument. */
typedef struct redisClient {
int fd; /* Client socket descriptor (createClient() accepts -1 for a non connected client) */
redisDb *db; /* Database associated with this client -- presumably the currently selected one; TODO confirm */
int dictid; /* NOTE(review): looks like the id of the selected database -- confirm */
robj *name; /* As set by CLIENT SETNAME */
sds querybuf; /* Input buffer: readQueryFromClient() appends the bytes it reads here */
size_t querybuf_peak; /* Recent (100ms or more) peak of querybuf size */
int argc; /* Number of arguments of the command being processed */
robj **argv; /* Arguments of the command being processed; argv[0] is the command name */
struct redisCommand *cmd, *lastcmd; /* Command found by lookupCommand() in processCommand() */
int reqtype; /* Request protocol: REDIS_REQ_INLINE or REDIS_REQ_MULTIBULK; 0 while unknown */
int multibulklen; /* number of multi bulk arguments left to read */
long bulklen; /* length of bulk argument in multi bulk request */
list *reply; /* List of reply objects that sendReplyToClient() sends after buf[] is drained */
unsigned long reply_bytes; /* Tot bytes of objects in reply list */
int sentlen; /* Amount of bytes already sent in the current
buffer or object being sent. */
time_t ctime; /* Client creation time */
time_t lastinteraction; /* time of the last interaction, used for timeout */
time_t obuf_soft_limit_reached_time; /* NOTE(review): by name, when the output buffer soft limit was reached -- confirm */
int flags; /* REDIS_SLAVE | REDIS_MONITOR | REDIS_MULTI ... */
int authenticated; /* when requirepass is non-NULL */
int replstate; /* replication state if this is a slave */
int repldbfd; /* replication DB file descriptor */
off_t repldboff; /* replication DB file offset */
off_t repldbsize; /* replication DB file size */
sds replpreamble; /* replication DB preamble. */
long long reploff; /* replication offset if this is our master */
long long repl_ack_off; /* replication ack offset, if this is a slave */
long long repl_ack_time;/* replication ack time, if this is a slave */
char replrunid[REDIS_RUN_ID_SIZE+1]; /* master run id if this is a master */
int slave_listening_port; /* As configured with: SLAVECONF listening-port */
multiState mstate; /* MULTI/EXEC state */
int btype; /* Type of blocking op if REDIS_BLOCKED. */
blockingState bpop; /* blocking state */
long long woff; /* Last write global replication offset. */
list *watched_keys; /* Keys WATCHED for MULTI/EXEC CAS */
dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */
list *pubsub_patterns; /* patterns a client is interested in (SUBSCRIBE) */
sds peerid; /* Cached peer ID. */
/* Response buffer */
int bufpos; /* Number of bytes of buf[] currently holding reply data */
char buf[REDIS_REPLY_CHUNK_BYTES]; /* Fixed-size reply buffer, sent before the reply list */
} redisClient;
クライアントのリクエスト・プロシージャのプロファイリング
クライアントはserverに接続して、コマンドを要求して、結果の全体の過程を返します
サーバ全体はイベント駆動で制御されています。clientの接続要求、リクエストの送信、serverでの受信・解析・実行、結果の返却は、いずれもイベントによって処理されます。
まず、イベントの作成、トリガ、接続、クライアントの作成から始めます.
1)int main()内のinitServer関数
2)initServer関数
/* Register an AE_READABLE file event on every listening TCP socket, with
 * acceptTcpHandler as the callback. A registration failure is fatal. */
for (j = 0; j < server.ipfd_count; j++) {
    int registered = aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
                                       acceptTcpHandler, NULL);

    if (registered != AE_ERR) continue;
    redisPanic("Unrecoverable error creating server.ipfd file event.");
}
ipfd_countはサービス傍受ポートの数であり、異なる傍受ポートに異なるserver fdを作成する
さらに、それぞれのserver fdに対してファイルイベントを作成します。コールバック関数はacceptTcpHandlerです(いずれかのリスニングソケットに接続が来ると、対応するイベントが発火し、acceptTcpHandler関数が呼び出されます)。
3)acceptTcpHandler関数
/* File-event callback for a listening TCP socket.
 *
 * Accepts at most MAX_ACCEPTS_PER_CALL connections from fd per invocation
 * and passes each accepted descriptor to acceptCommonHandler(). The function
 * returns as soon as anetTcpAccept() reports an error; that error is logged
 * unless errno is EWOULDBLOCK.
 *
 * el, privdata and mask are unused. */
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    char peer_ip[REDIS_IP_STR_LEN];
    int peer_port, client_fd;
    int budget;

    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);
    REDIS_NOTUSED(privdata);

    for (budget = MAX_ACCEPTS_PER_CALL; budget > 0; budget--) {
        client_fd = anetTcpAccept(server.neterr, fd, peer_ip, sizeof(peer_ip),
                                  &peer_port);
        if (client_fd == ANET_ERR) {
            if (errno != EWOULDBLOCK) {
                redisLog(REDIS_WARNING,
                    "Accepting client connection: %s", server.neterr);
            }
            return;
        }

        redisLog(REDIS_VERBOSE,"Accepted %s:%d", peer_ip, peer_port);
        /* Build the client state for the new connection. */
        acceptCommonHandler(client_fd,0);
    }
}
4)接続を要求し、クライアントを作成する:
networking.c:acceptCommonHandler関数
/*
* TCP accept
*/
#define MAX_ACCEPTS_PER_CALL 1000
static void acceptCommonHandler(int fd, int flags) {
//
redisClient *c;
if ((c = createClient(fd)) == NULL) {
redisLog(REDIS_WARNING,
"Error registering fd event for the new client: %s (fd=%d)",
strerror(errno),fd);
close(fd); /* May be already closed, just ignore errors */
return;
}
/* If maxclient directive is set and this is one client more... close the
* connection. Note that we create the client instead to check before
* for this condition, since now the socket is already set in non-blocking
* mode and we can send an error for free using the Kernel I/O */
//
// ,
// ,
if (listLength(server.clients) > server.maxclients) {
char *err = "-ERR max number of clients reached\r
";
/* That's a best effort error message, don't check write errors */
if (write(c->fd,err,strlen(err)) == -1) {
/* Nothing to do, Just to avoid the warning... */
}
//
server.stat_rejected_conn++;
freeClient(c);
return;
}
//
server.stat_numconnections++;
// FLAG
c->flags |= flags;
}
5)コマンド要求プロセッサ
networking.c:readQueryFromClient
/* Allocate the state for a new client.
 *
 * fd - descriptor of the connection, or -1 to create a non connected client.
 *
 * For a real descriptor the socket is configured and a readable file event
 * with readQueryFromClient as callback is registered on server.el, passing
 * the new client as the callback's private data. If that registration fails,
 * fd is closed, the allocation is released and NULL is returned.
 *
 * NOTE: this excerpt elides the rest of the function (field initialisation
 * and the final return), so only the part shown here is documented. */
redisClient *createClient(int fd) {
redisClient *c = zmalloc(sizeof(redisClient));
/* passing -1 as fd it is possible to create a non connected client.
 * This is useful since all the Redis commands needs to be executed
 * in the context of a client. When commands are executed in other
 * contexts (for instance a Lua script) we need a non connected client. */
if (fd != -1) {
/* Switch the socket to non-blocking mode. */
anetNonBlock(NULL,fd);
/* Enable TCP no-delay on the socket (presumably disabling Nagle's
 * algorithm -- confirm against anet.c). */
anetEnableTcpNoDelay(NULL,fd);
/* Enable keep-alive only when configured; the configured value is passed
 * through to anetKeepAlive(). */
if (server.tcpkeepalive)
anetKeepAlive(NULL,fd,server.tcpkeepalive);
/* Register the read handler; c is handed to it as privdata. */
if (aeCreateFileEvent(server.el,fd,AE_READABLE,
readQueryFromClient, c) == AE_ERR)
{
close(fd);
zfree(c);
return NULL;
}
}
/* ... remainder of the function is elided in this excerpt ... */
}
createClientは、読み取り可能イベントを登録し、そのコールバック関数としてreadQueryFromClientを設定します。これがコマンド要求プロセッサです。次に、その処理内容を見ていきます。
/*
*
*/
/* Readable-event callback for a client socket (registered by createClient()).
 *
 * Reads up to REDIS_IOBUF_LEN bytes from fd, appends them to c->querybuf and
 * hands the buffer to processInputBuffer(). The client is freed when read()
 * fails with an error other than EAGAIN, when the peer closed the connection
 * (read() returned 0), or when the query buffer exceeds
 * server.client_max_querybuf_len.
 *
 * privdata is the redisClient the event was registered for; el and mask are
 * unused. */
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    redisClient *c = (redisClient*) privdata;
    int nread, readlen;
    size_t qblen;
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);

    /* Record which client is currently being served. */
    server.current_client = c;

    /* Default amount of data to request from read(). */
    readlen = REDIS_IOBUF_LEN;

    /* If this is a multi bulk request, and we are processing a bulk reply
     * that is large enough, try to maximize the probability that the query
     * buffer contains exactly the SDS string representing the object, even
     * at the risk of requiring more read(2) calls. This way the function
     * processMultiBulkBuffer() can avoid copying buffers to create the
     * Redis Object representing the argument. */
    if (c->reqtype == REDIS_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
        && c->bulklen >= REDIS_MBULK_BIG_ARG)
    {
        int remaining = (unsigned)(c->bulklen+2)-sdslen(c->querybuf);

        /* 'remaining' is zero or negative when the buffer already holds the
         * whole argument. Shrinking readlen to such a value would make
         * read() return 0, which is handled below as "client closed
         * connection" and would free a live client, so only shrink for a
         * strictly positive remainder. */
        if (remaining > 0 && remaining < readlen) readlen = remaining;
    }

    /* Current length of the query buffer; new data is appended after it. */
    qblen = sdslen(c->querybuf);
    /* Track the peak size of the query buffer. */
    if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
    /* Make sure there is room for readlen more bytes. */
    c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
    nread = read(fd, c->querybuf+qblen, readlen);

    if (nread == -1) {
        if (errno == EAGAIN) {
            /* Nothing to read right now: treated as "no data" below. */
            nread = 0;
        } else {
            redisLog(REDIS_VERBOSE, "Reading from client: %s",strerror(errno));
            freeClient(c);
            return;
        }
    } else if (nread == 0) {
        /* EOF: the peer closed the connection. */
        redisLog(REDIS_VERBOSE, "Client closed connection");
        freeClient(c);
        return;
    }

    if (nread) {
        /* Account for the bytes read() stored directly into the buffer. */
        sdsIncrLen(c->querybuf,nread);
        c->lastinteraction = server.unixtime;
        /* Data coming from our master advances the replication offset. */
        if (c->flags & REDIS_MASTER) c->reploff += nread;
    } else {
        /* Reached only for nread == -1 with errno == EAGAIN. */
        server.current_client = NULL;
        return;
    }

    /* Close clients whose query buffer grew past the configured limit. */
    if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
        sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();

        bytes = sdscatrepr(bytes,c->querybuf,64);
        redisLog(REDIS_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
        sdsfree(ci);
        sdsfree(bytes);
        freeClient(c);
        return;
    }

    /* Parse the buffered data into commands and execute them. */
    processInputBuffer(c);
    server.current_client = NULL;
}
6)コマンド解析器
readしたデータをclientのquerybufに格納し、querybufの内容をコマンドとして解析する
/* Parse and execute the commands buffered in c->querybuf.
 *
 * Loops while the query buffer is non-empty. Each iteration detects the
 * request type if it is still unknown, parses one request with the matching
 * parser and runs it through processCommand(). The loop ends when a parser
 * does not return REDIS_OK; the function returns early when the client must
 * not be served right now (paused, blocked, or closing after the reply). */
void processInputBuffer(redisClient *c) {
    /* Keep processing while there is something in the input buffer */
    while (sdslen(c->querybuf) != 0) {
        int parsed = REDIS_OK;

        /* Stop if clients are paused (slaves are exempt), if the client is
         * in the middle of a blocking operation, or if it is flagged
         * REDIS_CLOSE_AFTER_REPLY: in the last case the reply must not grow
         * any further, so no more commands are processed. */
        if ((!(c->flags & REDIS_SLAVE) && clientsArePaused()) ||
            (c->flags & REDIS_BLOCKED) ||
            (c->flags & REDIS_CLOSE_AFTER_REPLY))
        {
            return;
        }

        /* Determine request type when unknown: a leading '*' selects the
         * multi bulk protocol, anything else the inline one. */
        if (!c->reqtype) {
            c->reqtype = (c->querybuf[0] == '*') ? REDIS_REQ_MULTIBULK
                                                 : REDIS_REQ_INLINE;
        }

        /* Parse one request with the parser matching the request type. */
        if (c->reqtype == REDIS_REQ_INLINE) {
            parsed = processInlineBuffer(c);
        } else if (c->reqtype == REDIS_REQ_MULTIBULK) {
            parsed = processMultibulkBuffer(c);
        } else {
            redisPanic("Unknown request type");
        }
        if (parsed != REDIS_OK) break;

        /* Multibulk processing could see a <= 0 length, in which case there
         * is nothing to execute. Otherwise only reset the client when the
         * command was executed. */
        if (c->argc == 0 || processCommand(c) == REDIS_OK) {
            resetClient(c);
        }
    }
}
7)コマンドプロセッサ:
/* If this function gets called we already read a whole
 * command, arguments are in the client argv/argc fields.
 * processCommand() executes the command, queues it when the client is inside
 * a MULTI block, or replies with an error when a precondition fails.
 *
 * Returns REDIS_ERR only for QUIT, after flagging the client with
 * REDIS_CLOSE_AFTER_REPLY; returns REDIS_OK in every other case, including
 * the ones where an error reply was sent. In this excerpt the caller
 * (processInputBuffer) resets the client only on REDIS_OK. */
int processCommand(redisClient *c) {
    /* The QUIT command is handled separately. Normal command procs will
     * go through checking for replication and QUIT will cause trouble
     * when FORCE_REPLICATION is enabled and would be implemented in
     * a regular command proc. */
    if (!strcasecmp(c->argv[0]->ptr,"quit")) {
        addReply(c,shared.ok);
        c->flags |= REDIS_CLOSE_AFTER_REPLY;
        return REDIS_ERR;
    }

    /* Now lookup the command and check ASAP about trivial error conditions
     * such as wrong arity, bad command name and so forth. */
    c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
    if (!c->cmd) {
        /* Unknown command. */
        flagTransaction(c);
        addReplyErrorFormat(c,"unknown command '%s'",
            (char*)c->argv[0]->ptr);
        return REDIS_OK;
    } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
               (c->argc < -c->cmd->arity)) {
        /* Wrong number of arguments: a positive arity is an exact count, a
         * negative one is a minimum. */
        flagTransaction(c);
        addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
            c->cmd->name);
        return REDIS_OK;
    }

    /* Check if the user is authenticated */
    if (server.requirepass && !c->authenticated && c->cmd->proc != authCommand)
    {
        flagTransaction(c);
        addReply(c,shared.noautherr);
        return REDIS_OK;
    }

    /* If cluster is enabled perform the cluster redirection here.
     * However we don't perform the redirection if:
     * 1) The sender of this command is our master.
     * 2) The command has no key arguments. */
    if (server.cluster_enabled &&
        !(c->flags & REDIS_MASTER) &&
        !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0))
    {
        int hashslot;

        if (server.cluster->state != REDIS_CLUSTER_OK) {
            /* The cluster is down: refuse the command. */
            flagTransaction(c);
            addReplySds(c,sdsnew("-CLUSTERDOWN The cluster is down. Use CLUSTER INFO for more information\r\n"));
            return REDIS_OK;
        } else {
            int error_code;
            clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,&hashslot,&error_code);

            if (n == NULL) {
                /* No node can serve this query as it is. */
                flagTransaction(c);
                if (error_code == REDIS_CLUSTER_REDIR_CROSS_SLOT) {
                    addReplySds(c,sdsnew("-CROSSSLOT Keys in request don't hash to the same slot\r\n"));
                } else if (error_code == REDIS_CLUSTER_REDIR_UNSTABLE) {
                    /* The request spawns mutliple keys in the same slot,
                     * but the slot is not "stable" currently as there is
                     * a migration or import in progress. */
                    addReplySds(c,sdsnew("-TRYAGAIN Multiple keys request during rehashing of slot\r\n"));
                } else {
                    redisPanic("getNodeByQuery() unknown error.");
                }
                return REDIS_OK;
            } else if (n != server.cluster->myself) {
                /* The key belongs to another node: redirect the client.
                 * Reply format: -<ASK or MOVED> <slot> <ip>:<port>
                 * e.g.          -ASK 10086 127.0.0.1:12345 */
                flagTransaction(c);
                addReplySds(c,sdscatprintf(sdsempty(),
                    "-%s %d %s:%d\r\n",
                    (error_code == REDIS_CLUSTER_REDIR_ASK) ? "ASK" : "MOVED",
                    hashslot,n->ip,n->port));
                return REDIS_OK;
            }
            /* Otherwise this node serves the query: fall through. */
        }
    }

    /* Handle the maxmemory directive.
     *
     * First we try to free some memory if possible (if there are volatile
     * keys in the dataset). If there are not the only thing we can do
     * is returning an error. */
    if (server.maxmemory) {
        int retval = freeMemoryIfNeeded();

        /* Refuse commands flagged REDIS_CMD_DENYOOM when memory could not
         * be freed. */
        if ((c->cmd->flags & REDIS_CMD_DENYOOM) && retval == REDIS_ERR) {
            flagTransaction(c);
            addReply(c, shared.oomerr);
            return REDIS_OK;
        }
    }

    /* Don't accept write commands if there are problems persisting on disk
     * and if this is a master instance. */
    if (((server.stop_writes_on_bgsave_err &&
          server.saveparamslen > 0 &&
          server.lastbgsave_status == REDIS_ERR) ||
          server.aof_last_write_status == REDIS_ERR) &&
        server.masterhost == NULL &&
        (c->cmd->flags & REDIS_CMD_WRITE ||
         c->cmd->proc == pingCommand))
    {
        flagTransaction(c);
        if (server.aof_last_write_status == REDIS_OK)
            addReply(c, shared.bgsaveerr);
        else
            addReplySds(c,
                sdscatprintf(sdsempty(),
                "-MISCONF Errors writing to the AOF file: %s\r\n",
                strerror(server.aof_last_write_errno)));
        return REDIS_OK;
    }

    /* Don't accept write commands if there are not enough good slaves and
     * user configured the min-slaves-to-write option. */
    if (server.repl_min_slaves_to_write &&
        server.repl_min_slaves_max_lag &&
        c->cmd->flags & REDIS_CMD_WRITE &&
        server.repl_good_slaves_count < server.repl_min_slaves_to_write)
    {
        flagTransaction(c);
        addReply(c, shared.noreplicaserr);
        return REDIS_OK;
    }

    /* Don't accept write commands if this is a read only slave. But
     * accept write commands if this is our master. */
    if (server.masterhost && server.repl_slave_ro &&
        !(c->flags & REDIS_MASTER) &&
        c->cmd->flags & REDIS_CMD_WRITE)
    {
        addReply(c, shared.roslaveerr);
        return REDIS_OK;
    }

    /* Only allow SUBSCRIBE and UNSUBSCRIBE in the context of Pub/Sub */
    if ((dictSize(c->pubsub_channels) > 0 || listLength(c->pubsub_patterns) > 0)
        &&
        c->cmd->proc != subscribeCommand &&
        c->cmd->proc != unsubscribeCommand &&
        c->cmd->proc != psubscribeCommand &&
        c->cmd->proc != punsubscribeCommand) {
        addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / QUIT allowed in this context");
        return REDIS_OK;
    }

    /* Only allow INFO and SLAVEOF when slave-serve-stale-data is no and
     * we are a slave with a broken link with master. */
    if (server.masterhost && server.repl_state != REDIS_REPL_CONNECTED &&
        server.repl_serve_stale_data == 0 &&
        !(c->cmd->flags & REDIS_CMD_STALE))
    {
        flagTransaction(c);
        addReply(c, shared.masterdownerr);
        return REDIS_OK;
    }

    /* Loading DB? Return an error if the command has not the
     * REDIS_CMD_LOADING flag. */
    if (server.loading && !(c->cmd->flags & REDIS_CMD_LOADING)) {
        addReply(c, shared.loadingerr);
        return REDIS_OK;
    }

    /* Lua script too slow? Only allow a limited number of commands: AUTH,
     * REPLCONF, a two-argument SHUTDOWN whose argument starts with 'n', and
     * a two-argument SCRIPT whose argument starts with 'k'. */
    if (server.lua_timedout &&
          c->cmd->proc != authCommand &&
          c->cmd->proc != replconfCommand &&
        !(c->cmd->proc == shutdownCommand &&
          c->argc == 2 &&
          tolower(((char*)c->argv[1]->ptr)[0]) == 'n') &&
        !(c->cmd->proc == scriptCommand &&
          c->argc == 2 &&
          tolower(((char*)c->argv[1]->ptr)[0]) == 'k'))
    {
        flagTransaction(c);
        addReply(c, shared.slowscripterr);
        return REDIS_OK;
    }

    /* Exec the command */
    if (c->flags & REDIS_MULTI &&
        c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
        c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
    {
        /* Inside a transaction every command except EXEC, DISCARD, MULTI
         * and WATCH is queued instead of executed. */
        queueMultiCommand(c);
        addReply(c,shared.queued);
    } else {
        call(c,REDIS_CALL_FULL);
        c->woff = server.master_repl_offset;
        /* Serve clients blocked on lists if some key became ready. */
        if (listLength(server.ready_keys))
            handleClientsBlockedOnLists();
    }
    return REDIS_OK;
}
8)コマンド応答器
networking.c/sendReplyToClient関数:
/* Writable-event callback that sends pending reply data to a client.
 *
 * The fixed buffer c->buf is drained first, then the objects queued in
 * c->reply. c->sentlen records how much of the current buffer or object has
 * already been written, so a short write resumes at the right offset on the
 * next pass. When nothing is left to send, the AE_WRITABLE event is removed
 * and the client is freed if it is flagged REDIS_CLOSE_AFTER_REPLY.
 *
 * privdata is the redisClient; el and mask are unused. */
void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    redisClient *c = privdata;
    int sent = 0, total_sent = 0;
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);

    for (;;) {
        const char *src;
        int len;
        int from_list;
        size_t head_mem = 0;

        /* Stop once both the fixed buffer and the reply list are empty. */
        if (!(c->bufpos > 0 || listLength(c->reply))) break;

        if (c->bufpos > 0) {
            /* Pending data in the fixed buffer goes out first. */
            from_list = 0;
            src = c->buf;
            len = c->bufpos;
        } else {
            /* Otherwise send the object at the head of the reply list. */
            robj *head = listNodeValue(listFirst(c->reply));

            from_list = 1;
            len = sdslen(head->ptr);
            head_mem = getStringObjectSdsUsedMemory(head);

            /* Empty objects are simply dropped from the list. */
            if (len == 0) {
                listDelNode(c->reply,listFirst(c->reply));
                c->reply_bytes -= head_mem;
                continue;
            }
            src = (char*)head->ptr;
        }

        /* Resume after the bytes already sent by a previous short write. */
        sent = write(fd, src + c->sentlen, len - c->sentlen);
        if (sent <= 0) break;
        c->sentlen += sent;
        total_sent += sent;

        /* Fully sent: release the source and go on with the remainder of
         * the reply. */
        if (c->sentlen == len) {
            if (from_list) {
                listDelNode(c->reply,listFirst(c->reply));
                c->reply_bytes -= head_mem;
            } else {
                c->bufpos = 0;
            }
            c->sentlen = 0;
        }

        /* Note that we avoid to send more than REDIS_MAX_WRITE_PER_EVENT
         * bytes, in a single threaded server it's a good idea to serve
         * other clients as well, even if a very large request comes from
         * super fast link that is always able to accept data (in real world
         * scenario think about 'KEYS *' against the loopback interface).
         *
         * However if we are over the maxmemory limit we ignore that and
         * just deliver as much data as it is possible to deliver. */
        if (total_sent > REDIS_MAX_WRITE_PER_EVENT &&
            (server.maxmemory == 0 ||
             zmalloc_used_memory() < server.maxmemory)) break;
    }

    /* A failed write other than EAGAIN is fatal for this client. */
    if (sent == -1) {
        if (errno != EAGAIN) {
            redisLog(REDIS_VERBOSE,
                "Error writing to client: %s", strerror(errno));
            freeClient(c);
            return;
        }
        sent = 0;
    }

    /* For clients representing masters we don't count sending data
     * as an interaction, since we always send REPLCONF ACK commands
     * that take some time to just fill the socket output buffer.
     * We just rely on data / pings received for timeout detection. */
    if (total_sent > 0 && !(c->flags & REDIS_MASTER)) {
        c->lastinteraction = server.unixtime;
    }

    if (c->bufpos == 0 && listLength(c->reply) == 0) {
        c->sentlen = 0;
        /* Nothing left to send: stop watching the socket for writability. */
        aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);

        /* Close connection after entire reply has been sent. */
        if (c->flags & REDIS_CLOSE_AFTER_REPLY) freeClient(c);
    }
}