Redis源码阅读笔记-命令的接收和执行过程(二)

本文接着上一篇《Redis源码阅读笔记-命令的接收和执行过程(一)》,主要介绍processCommand()的处理流程,以及call()是如何调用各个命令的执行函数的。

processCommand()

processCommand()的主要作用是:先查找命令并简单校验参数数量,再结合服务器状态(内存限制,磁盘持久化,主从状态等)和命令类型判断该命令当前是否允许执行;如果客户端处于MULTI事务中(且命令不是exec,discard,multi,watch),则只把命令加入事务队列,否则调用call()来真正调用各命令的执行函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
// server.c 

/* If this function gets called we already read a whole
* command, arguments are in the client argv/argc fields.
* processCommand() execute the command or prepare the
* server for a bulk read from the client.
*
* If C_OK is returned the client is still alive and valid and
* other operations can be performed by the caller. Otherwise
* if C_ERR is returned the client was destroyed (i.e. after QUIT).
*
* The body is a chain of guards evaluated in order; each guard that
* rejects the command replies to the client and returns C_OK early.
* In this function C_ERR is returned in exactly two places: for QUIT,
* and when server.current_client is NULL after freeMemoryIfNeeded(). */
int processCommand(client *c) {
/* The QUIT command is handled separately. Normal command procs will
* go through checking for replication and QUIT will cause trouble
* when FORCE_REPLICATION is enabled and would be implemented in
* a regular command proc. */
// Is this the QUIT command? (case-insensitive match on argv[0])
if (!strcasecmp(c->argv[0]->ptr,"quit")) {
addReply(c,shared.ok);
c->flags |= CLIENT_CLOSE_AFTER_REPLY;
return C_ERR;
}

/* Now lookup the command and check ASAP about trivial error conditions
* such as wrong arity, bad command name and so forth. */
// Look up c->argv[0]->ptr in the command table to see whether it names a
// valid command. The command set is stored in server.commands, a Redis dict,
// filled by populateCommandTable() during initServerConfig().
c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
// c->cmd is a redisCommand struct; its proc member is a function pointer to
// the implementation of the command.
if (!c->cmd) {
// The command is not in the command table: reply with an error that echoes
// the command name and the beginning of its arguments (the loop stops
// adding arguments once the collected text reaches 128 bytes).

flagTransaction(c);
sds args = sdsempty();
int i;
for (i=1; i < c->argc && sdslen(args) < 128; i++)
args = sdscatprintf(args, "`%.*s`, ", 128-(int)sdslen(args), (char*)c->argv[i]->ptr);
addReplyErrorFormat(c,"unknown command `%s`, with args beginning with: %s",
(char*)c->argv[0]->ptr, args);
sdsfree(args);
return C_OK;
} else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
(c->argc < -c->cmd->arity)) {
// The command exists but the argument count is wrong:
// a positive arity must equal argc exactly, while a negative arity means
// "at least -arity arguments". Reply with an error.
flagTransaction(c);
addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
c->cmd->name);
return C_OK;
}

/* Check if the user is authenticated */
// Check that the connection has authenticated (only when a password is
// configured); the AUTH command itself is always let through.
if (server.requirepass && !c->authenticated && c->cmd->proc != authCommand)
{
flagTransaction(c);
addReply(c,shared.noautherr);
return C_OK;
}

/* If cluster is enabled perform the cluster redirection here.
* However we don't perform the redirection if:
* 1) The sender of this command is our master.
* 2) The command has no key arguments. */
// If cluster mode is enabled, perform the cluster redirection.
// No redirection happens in these two cases:
// 1. The command was sent by our master node.
// 2. The command has no key arguments.
if (server.cluster_enabled && // cluster mode is enabled
!(c->flags & CLIENT_MASTER) && // command does not come from our master
!(c->flags & CLIENT_LUA &&
server.lua_caller->flags & CLIENT_MASTER) && // not a Lua call whose caller is our master
!(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0 &&
c->cmd->proc != execCommand)) // command has key arguments (EXEC is always checked)
{
int hashslot;
int error_code;
// getNodeByQuery() returns the node that serves this command and fills in
// the hash slot and the error code.
clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,
&hashslot,&error_code);
if (n == NULL || n != server.cluster->myself) {
if (c->cmd->proc == execCommand) {
discardTransaction(c);
} else {
flagTransaction(c);
}
// Redirect the client to the right node.
clusterRedirectClient(c,n,hashslot,error_code);
return C_OK;
}
}

/* Handle the maxmemory directive.
*
* First we try to free some memory if possible (if there are volatile
* keys in the dataset). If there are not the only thing we can do
* is returning an error. */
// If a maximum memory limit is configured, first try to free memory;
// if that fails, commands flagged CMD_DENYOOM get an OOM error.
if (server.maxmemory) {
// Check memory usage and try to free some.
// How it works: zmalloc.c keeps a global used_memory counter recording how
// much memory Redis has allocated.
// PS: the AOF buffers and the slaves' output buffers are not counted as
// used memory.
// When memory is short, freeMemoryIfNeeded() tries to free memory according
// to the configured maxmemory policy.
int retval = freeMemoryIfNeeded();
/* freeMemoryIfNeeded may flush slave output buffers. This may result
* into a slave, that may be the active client, to be freed. */
if (server.current_client == NULL) return C_ERR;

/* It was impossible to free enough memory, and the command the client
* is trying to execute is denied during OOM conditions? Error. */
if ((c->cmd->flags & CMD_DENYOOM) && retval == C_ERR) {
flagTransaction(c);
addReply(c, shared.oomerr);
return C_OK;
}
}

/* Don't accept write commands if there are problems persisting on disk
* and if this is a master instance. */
// Reply with an error when all three conditions hold:
// 1. Persisting to disk is failing: either the last background save failed
//    (with stop_writes_on_bgsave_err set and save points configured) or the
//    last AOF write failed.
// 2. This instance is a master.
// 3. The command is a write command (the check also covers PING).
if (((server.stop_writes_on_bgsave_err &&
server.saveparamslen > 0 &&
server.lastbgsave_status == C_ERR) ||
server.aof_last_write_status == C_ERR) &&
server.masterhost == NULL &&
(c->cmd->flags & CMD_WRITE ||
c->cmd->proc == pingCommand))
{
flagTransaction(c);
// AOF status OK means the failure was the background save; otherwise
// report the AOF write error together with its errno text.
if (server.aof_last_write_status == C_OK)
addReply(c, shared.bgsaveerr);
else
addReplySds(c,
sdscatprintf(sdsempty(),
"-MISCONF Errors writing to the AOF file: %s\r\n",
strerror(server.aof_last_write_errno)));
return C_OK;
}

/* Don't accept write commands if there are not enough good slaves and
* user configured the min-slaves-to-write option. */
// Reply with an error when all of these hold:
// 1. This instance is a master.
// 2. The user configured the min-slaves-to-write option (and a max lag).
// 3. The command is a write command.
// 4. The number of good slaves is lower than repl_min_slaves_to_write.
if (server.masterhost == NULL &&
server.repl_min_slaves_to_write &&
server.repl_min_slaves_max_lag &&
c->cmd->flags & CMD_WRITE &&
server.repl_good_slaves_count < server.repl_min_slaves_to_write)
{
flagTransaction(c);
addReply(c, shared.noreplicaserr);
return C_OK;
}

/* Don't accept write commands if this is a read only slave. But
* accept write commands if this is our master. */
// Reply with an error when all of these hold:
// 1. This instance is a slave.
// 2. The slave is configured as read-only.
// 3. The client is not our master.
// 4. The client's command is a write command.
if (server.masterhost && server.repl_slave_ro &&
!(c->flags & CLIENT_MASTER) &&
c->cmd->flags & CMD_WRITE)
{
addReply(c, shared.roslaveerr);
return C_OK;
}

/* Only allow SUBSCRIBE and UNSUBSCRIBE in the context of Pub/Sub */
// If the client is a Pub/Sub subscriber connection, only these commands
// are allowed:
// * ping
// * subscribe
// * unsubscribe
// * psubscribe
// * punsubscribe
if (c->flags & CLIENT_PUBSUB &&
c->cmd->proc != pingCommand &&
c->cmd->proc != subscribeCommand &&
c->cmd->proc != unsubscribeCommand &&
c->cmd->proc != psubscribeCommand &&
c->cmd->proc != punsubscribeCommand) {
addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT allowed in this context");
return C_OK;
}

/* Only allow INFO and SLAVEOF when slave-serve-stale-data is no and
* we are a slave with a broken link with master. */

if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED &&
server.repl_serve_stale_data == 0 &&
!(c->cmd->flags & CMD_STALE))
{
// This instance is a slave disconnected from its master, serving stale data
// is disabled, and the command is not flagged CMD_STALE.
flagTransaction(c);
addReply(c, shared.masterdownerr);
return C_OK;
}

/* Loading DB? Return an error if the command has not the
* CMD_LOADING flag. */
if (server.loading && !(c->cmd->flags & CMD_LOADING)) {
addReply(c, shared.loadingerr);
return C_OK;
}

/* Lua script too slow? Only allow a limited number of commands. */
// While a Lua script has timed out only a few commands are allowed:
// AUTH, REPLCONF, SHUTDOWN with a single argument starting with 'n'/'N',
// and SCRIPT with a single argument starting with 'k'/'K'.
if (server.lua_timedout &&
c->cmd->proc != authCommand &&
c->cmd->proc != replconfCommand &&
!(c->cmd->proc == shutdownCommand &&
c->argc == 2 &&
tolower(((char*)c->argv[1]->ptr)[0]) == 'n') &&
!(c->cmd->proc == scriptCommand &&
c->argc == 2 &&
tolower(((char*)c->argv[1]->ptr)[0]) == 'k'))
{
flagTransaction(c);
addReply(c, shared.slowscripterr);
return C_OK;
}

/* Exec the command */
// Execute (or queue) the command.
if (c->flags & CLIENT_MULTI &&
c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
{
// The client is inside MULTI and the command is not one of
// exec / discard / multi / watch: append it to the transaction queue.
queueMultiCommand(c);
addReply(c,shared.queued);
} else {
// Run the command now with the full set of call() flags.
call(c,CMD_CALL_FULL);
c->woff = server.master_repl_offset;
if (listLength(server.ready_keys))
handleClientsBlockedOnLists();
}
return C_OK;
}

call()

call()是Redis中执行命令的核心函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
// server.c

/* Call() is the core of Redis execution of a command.
*
* The following flags can be passed:
* CMD_CALL_NONE No flags.
* CMD_CALL_SLOWLOG Check command speed and log in the slow log if needed.
* CMD_CALL_STATS Populate command stats.
* CMD_CALL_PROPAGATE_AOF Append command to AOF if it modified the dataset
* or if the client flags are forcing propagation.
* CMD_CALL_PROPAGATE_REPL Send command to slaves if it modified the dataset
* or if the client flags are forcing propagation.
* CMD_CALL_PROPAGATE Alias for PROPAGATE_AOF|PROPAGATE_REPL.
* CMD_CALL_FULL Alias for SLOWLOG|STATS|PROPAGATE.
*
* The exact propagation behavior depends on the client flags.
* Specifically:
*
* 1. If the client flags CLIENT_FORCE_AOF or CLIENT_FORCE_REPL are set
* and assuming the corresponding CMD_CALL_PROPAGATE_AOF/REPL is set
* in the call flags, then the command is propagated even if the
* dataset was not affected by the command.
* 2. If the client flags CLIENT_PREVENT_REPL_PROP or CLIENT_PREVENT_AOF_PROP
* are set, the propagation into AOF or to slaves is not performed even
* if the command modified the dataset.
*
* Note that regardless of the client flags, if CMD_CALL_PROPAGATE_AOF
* or CMD_CALL_PROPAGATE_REPL are not set, then respectively AOF or
* slaves propagation will never occur.
*
* Client flags are modified by the implementation of a given command
* using the following API:
*
* forceCommandPropagation(client *c, int flags);
* preventCommandPropagation(client *c);
* preventCommandAOF(client *c);
* preventCommandReplication(client *c);
*
*/
void call(client *c, int flags) {
long long dirty, start, duration;
// Snapshot of the client flags, used at the end to restore the
// propagation-related bits.
int client_old_flags = c->flags;

/* Sent the command to clients in MONITOR mode, only if the commands are
* not generated from reading an AOF. */
// Feed the command to the clients in MONITOR mode, but only when the server
// is not loading data and the command has neither CMD_SKIP_MONITOR nor
// CMD_ADMIN set.
// PS: MONITOR prints in real time the commands received by the Redis server;
// it is meant for debugging.
if (listLength(server.monitors) &&
!server.loading &&
!(c->cmd->flags & (CMD_SKIP_MONITOR|CMD_ADMIN)))
{
replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
}

/* Initialization: clear the flags that must be set by the command on
* demand, and initialize the array for additional commands propagation. */
c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
// Start with an empty server.also_propagate array: some commands use
// alsoPropagate() to append extra operations to it, and those are propagated
// near the end of call(). The previous array is saved here and put back
// before returning.
redisOpArray prev_also_propagate = server.also_propagate;
redisOpArrayInit(&server.also_propagate);

/* Call the command. */
dirty = server.dirty;
start = ustime();
// Run the command implementation.
c->cmd->proc(c);
duration = ustime()-start; // elapsed execution time of the command
dirty = server.dirty-dirty; // change in server.dirty: tells whether the DB was modified
if (dirty < 0) dirty = 0;

/* When EVAL is called loading the AOF we don't want commands called
* from Lua to go into the slowlog or to populate statistics. */
// While loading the AOF, commands executed from Lua are neither logged in
// the slow log nor counted in the statistics.
if (server.loading && c->flags & CLIENT_LUA)
flags &= ~(CMD_CALL_SLOWLOG | CMD_CALL_STATS);

/* If the caller is Lua, we want to force the EVAL caller to propagate
* the script if the command flag or client flag are forcing the
* propagation. */
// If this is a Lua call, copy the forced REPL / AOF flags onto the client
// that invoked the script.
if (c->flags & CLIENT_LUA && server.lua_caller) {
if (c->flags & CLIENT_FORCE_REPL)
server.lua_caller->flags |= CLIENT_FORCE_REPL;
if (c->flags & CLIENT_FORCE_AOF)
server.lua_caller->flags |= CLIENT_FORCE_AOF;
}

/* Log the command into the Slow log if needed, and populate the
* per-command statistics that we show in INFO commandstats. */
// Record the latency sample and the slow log entry (EXEC itself is skipped).
if (flags & CMD_CALL_SLOWLOG && c->cmd->proc != execCommand) {
char *latency_event = (c->cmd->flags & CMD_FAST) ?
"fast-command" : "command";
latencyAddSampleIfNeeded(latency_event,duration/1000);
slowlogPushEntryIfNeeded(c,c->argv,c->argc,duration);
}
if (flags & CMD_CALL_STATS) {
c->lastcmd->microseconds += duration;
c->lastcmd->calls++;
}

/* Propagate the command into the AOF and replication link */
// Propagate the dataset changes to the AOF and/or to the replication link,
// unless the client has all the CLIENT_PREVENT_PROP bits set.
if (flags & CMD_CALL_PROPAGATE &&
(c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP)
{
int propagate_flags = PROPAGATE_NONE;

/* Check if the command operated changes in the data set. If so
* set for replication / AOF propagation. */
// If the dataset changed, request both AOF and replication propagation.
if (dirty) propagate_flags |= (PROPAGATE_AOF|PROPAGATE_REPL);

/* If the client forced AOF / replication of the command, set
* the flags regardless of the command effects on the data set. */
// Check whether propagation was forced by the command.
if (c->flags & CLIENT_FORCE_REPL) propagate_flags |= PROPAGATE_REPL;
if (c->flags & CLIENT_FORCE_AOF) propagate_flags |= PROPAGATE_AOF;

/* However prevent AOF / replication propagation if the command
* implementation called preventCommandPropagation() or similar,
* or if we don't have the call() flags to do so. */
// Check whether the client prevents AOF or replication propagation, or
// whether the call() flags do not allow it.
if (c->flags & CLIENT_PREVENT_REPL_PROP ||
!(flags & CMD_CALL_PROPAGATE_REPL))
propagate_flags &= ~PROPAGATE_REPL;
if (c->flags & CLIENT_PREVENT_AOF_PROP ||
!(flags & CMD_CALL_PROPAGATE_AOF))
propagate_flags &= ~PROPAGATE_AOF;

/* Call propagate() only if at least one of AOF / replication
* propagation is needed. Note that modules commands handle replication
* in an explicit way, so we never replicate them automatically. */
if (propagate_flags != PROPAGATE_NONE && !(c->cmd->flags & CMD_MODULE))
// Propagate the command.
propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags);
}

/* Restore the old replication flags, since call() can be executed
* recursively. */
c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
c->flags |= client_old_flags &
(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);

/* Handle the alsoPropagate() API to handle commands that want to propagate
* multiple separated commands. Note that alsoPropagate() is not affected
* by CLIENT_PREVENT_PROP flag. */
// Propagate the operations that were queued through alsoPropagate(), which
// appends them to server.also_propagate.
if (server.also_propagate.numops) {
int j;
redisOp *rop;

if (flags & CMD_CALL_PROPAGATE) {
for (j = 0; j < server.also_propagate.numops; j++) {
rop = &server.also_propagate.ops[j];
int target = rop->target;
/* Whatever the command wish is, we honor the call() flags. */
if (!(flags&CMD_CALL_PROPAGATE_AOF)) target &= ~PROPAGATE_AOF;
if (!(flags&CMD_CALL_PROPAGATE_REPL)) target &= ~PROPAGATE_REPL;
if (target)
propagate(rop->cmd,rop->dbid,rop->argv,rop->argc,target);
}
}
// The queued operations are released whether or not they were propagated.
redisOpArrayFree(&server.also_propagate);
}
// Put back the array that was active before this call() started.
server.also_propagate = prev_also_propagate;
server.stat_numcommands++;
}