Redis源码阅读笔记-命令的接收和执行过程(一)

Redis会为监听端口的Server Socket的fd在事件循环中注册读就绪事件,并添加相应的handler进行处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void initServer(void) {
......
// 为监听的端口的fd设置epoll事件和回调, 针对TCP socket
/* Create an event handler for accepting new connections in TCP and Unix
* domain sockets. */
for (j = 0; j < server.ipfd_count; j++) {
if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
acceptTcpHandler,NULL) == AE_ERR)
{
serverPanic(
"Unrecoverable error creating server.ipfd file event.");
}
}
......
}

同时也会为客户端连接的Client Socket的fd在事件循环中注册相应的读写事件,并添加与之相对的handler进行处理。

比如接收到一个客户端连接,创建并注册读就绪事件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
client *createClient(int fd) {
client *c = zmalloc(sizeof(client));

......

if (fd != -1) {
anetNonBlock(NULL,fd);
anetEnableTcpNoDelay(NULL,fd);
if (server.tcpkeepalive)
anetKeepAlive(NULL,fd,server.tcpkeepalive);
// 注册事件
if (aeCreateFileEvent(server.el,fd,AE_READABLE,
readQueryFromClient, c) == AE_ERR)
{
close(fd);
zfree(c);
return NULL;
}
}

......
}

下面会分开两个方向写:

  • Server Socket接收到Client会如何处理
  • Client是如何接收命令和执行并响应的

会以TCP连接为主。

接收Client的连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
// networking.c

// 当Server Socket接收到客户端连接,就会有AE_READABLE的事件,然后就会调用该Handler
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
char cip[NET_IP_STR_LEN];
UNUSED(el);
UNUSED(mask);
UNUSED(privdata);

// 一个限制,每次事件循环,只接受最多MAX_ACCEPTS_PER_CALL(1000)个Client进行处理
// 防止短时间内要处理过多的Client
while(max--) {
// anetTcpAccept()是一个Socket操作的封装,里面调用了accept()将Client Socket的fd返回,
// 并返回远端的IP和端口号
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
if (cfd == ANET_ERR) {
if (errno != EWOULDBLOCK)
serverLog(LL_WARNING,
"Accepting client connection: %s", server.neterr);
return;
}
serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
// 获取到的Client Socket的fd会进入acceptCommonHandler()进行处理
// 会进行一些判断Redis是否已经超过了最大连接数等处理
// 如果没错误的话,会将其封装成client结构体,放入server.clients中
acceptCommonHandler(cfd,0,cip);
}
}

static void acceptCommonHandler(int fd, int flags, char *ip) {
client *c;
// 将Client Socket的fd封装创建个一个client结构
// 会在createClient()中将fd注册事件循环的读就绪事件
if ((c = createClient(fd)) == NULL) {
serverLog(LL_WARNING,
"Error registering fd event for the new client: %s (fd=%d)",
strerror(errno),fd);
close(fd); /* May be already closed, just ignore errors */
return;
}

// 如果超过最大连接数,将会发送错误给客户端(忽略错误),并断开连接
/* If maxclient directive is set and this is one client more... close the
* connection. Note that we create the client instead to check before
* for this condition, since now the socket is already set in non-blocking
* mode and we can send an error for free using the Kernel I/O */
if (listLength(server.clients) > server.maxclients) {
char *err = "-ERR max number of clients reached\r\n";

// 忽略发送数据的结果
/* That's a best effort error message, don't check write errors */
if (write(c->fd,err,strlen(err)) == -1) {
/* Nothing to do, Just to avoid the warning... */
}
server.stat_rejected_conn++;
// 断开client c的连接
// 会关闭socket,并注销所有事件循环的事件
freeClient(c);
return;
}

/* If the server is running in protected mode (the default) and there
* is no password set, nor a specific interface is bound, we don't accept
* requests from non loopback interfaces. Instead we try to explain the
* user what to do to fix it if needed. */
// 如果Redis是在 protected mode
// 且 没有绑定固定端口
// 且 没有设置访问密码
// 且 不是来之Unix Socket的连接
// 且 ip 不为空

if (server.protected_mode &&
server.bindaddr_count == 0 &&
server.requirepass == NULL &&
!(flags & CLIENT_UNIX_SOCKET) &&
ip != NULL)
{
// 那么就会尝试判断ip是否为本地连接,如果不是就断开连接(因为不安全啊~)
if (strcmp(ip,"127.0.0.1") && strcmp(ip,"::1")) {
char *err =
"-DENIED Redis is running in protected mode because protected "
"mode is enabled, no bind address was specified, no "
"authentication password is requested to clients. In this mode "
"connections are only accepted from the loopback interface. "
"If you want to connect from external computers to Redis you "
"may adopt one of the following solutions: "
"1) Just disable protected mode sending the command "
"'CONFIG SET protected-mode no' from the loopback interface "
"by connecting to Redis from the same host the server is "
"running, however MAKE SURE Redis is not publicly accessible "
"from internet if you do so. Use CONFIG REWRITE to make this "
"change permanent. "
"2) Alternatively you can just disable the protected mode by "
"editing the Redis configuration file, and setting the protected "
"mode option to 'no', and then restarting the server. "
"3) If you started the server manually just for testing, restart "
"it with the '--protected-mode no' option. "
"4) Setup a bind address or an authentication password. "
"NOTE: You only need to do one of the above things in order for "
"the server to start accepting connections from the outside.\r\n";
if (write(c->fd,err,strlen(err)) == -1) {
/* Nothing to do, Just to avoid the warning... */
}
server.stat_rejected_conn++;
freeClient(c);
return;
}
}

// 连接数+1
server.stat_numconnections++;
c->flags |= flags;
}

流程

acceptTcpHandler流程

client的结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
/* With multiplexing we need to take per-client state.
* Clients are taken in a linked list. */
typedef struct client {
uint64_t id; /* Client incremental unique ID. */
int fd; /* Client socket. */
redisDb *db; /* Pointer to currently SELECTed DB. */
robj *name; /* As set by CLIENT SETNAME. */
sds querybuf; /* Buffer we use to accumulate client queries. */
size_t qb_pos; /* The position we have read in querybuf. */
sds pending_querybuf; /* If this client is flagged as master, this buffer
represents the yet not applied portion of the
replication stream that we are receiving from
the master. */
size_t querybuf_peak; /* Recent (100ms or more) peak of querybuf size. */
int argc; /* Num of arguments of current command. */
robj **argv; /* Arguments of current command. */
struct redisCommand *cmd, *lastcmd; /* Last command executed. */
int reqtype; /* Request protocol type: PROTO_REQ_* */
int multibulklen; /* Number of multi bulk arguments left to read. */
long bulklen; /* Length of bulk argument in multi bulk request. */
list *reply; /* List of reply objects to send to the client. */
unsigned long long reply_bytes; /* Tot bytes of objects in reply list. */
size_t sentlen; /* Amount of bytes already sent in the current
buffer or object being sent. */
time_t ctime; /* Client creation time. */
time_t lastinteraction; /* Time of the last interaction, used for timeout */
time_t obuf_soft_limit_reached_time;
int flags; /* Client flags: CLIENT_* macros. */
int authenticated; /* When requirepass is non-NULL. */
int replstate; /* Replication state if this is a slave. */
int repl_put_online_on_ack; /* Install slave write handler on ACK. */
int repldbfd; /* Replication DB file descriptor. */
off_t repldboff; /* Replication DB file offset. */
off_t repldbsize; /* Replication DB file size. */
sds replpreamble; /* Replication DB preamble. */
long long read_reploff; /* Read replication offset if this is a master. */
long long reploff; /* Applied replication offset if this is a master. */
long long repl_ack_off; /* Replication ack offset, if this is a slave. */
long long repl_ack_time;/* Replication ack time, if this is a slave. */
long long psync_initial_offset; /* FULLRESYNC reply offset other slaves
copying this slave output buffer
should use. */
char replid[CONFIG_RUN_ID_SIZE+1]; /* Master replication ID (if master). */
int slave_listening_port; /* As configured with: SLAVECONF listening-port */
char slave_ip[NET_IP_STR_LEN]; /* Optionally given by REPLCONF ip-address */
int slave_capa; /* Slave capabilities: SLAVE_CAPA_* bitwise OR. */
multiState mstate; /* MULTI/EXEC state */
int btype; /* Type of blocking op if CLIENT_BLOCKED. */
blockingState bpop; /* blocking state */
long long woff; /* Last write global replication offset. */
list *watched_keys; /* Keys WATCHED for MULTI/EXEC CAS */
dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */
list *pubsub_patterns; /* patterns a client is interested in (SUBSCRIBE) */
sds peerid; /* Cached peer ID. */
listNode *client_list_node; /* list node in client list */

/* Response buffer */
int bufpos;
char buf[PROTO_REPLY_CHUNK_BYTES];
} client;

PS: 有注释就懒得写了,而且部分属性我还没细看

创建client *createClient(int fd)

createClient()的主要功能是传入Client Socket的fd,用来初始化创建一个client,client中记录则该连接的一些操作数据,比如WATCH KEY的列表等。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
// networking.c

// 传入Client Socket的fd,用来初始化创建一个client
client *createClient(int fd) {
// 申请内存
client *c = zmalloc(sizeof(client));

/* passing -1 as fd it is possible to create a non connected client.
* This is useful since all the commands needs to be executed
* in the context of a client. When commands are executed in other
* contexts (for instance a Lua script) we need a non connected client. */
if (fd != -1) {
// 将Client Socket设成非阻塞模式(epoll等需要)
anetNonBlock(NULL,fd);
// 关闭TCP的Nagle算法,使得能更快响应客户端的请求
anetEnableTcpNoDelay(NULL,fd);
// 开启keepalive
if (server.tcpkeepalive)
anetKeepAlive(NULL,fd,server.tcpkeepalive);
// 为客户端注册读就绪事件,并注册handler
if (aeCreateFileEvent(server.el,fd,AE_READABLE,
readQueryFromClient, c) == AE_ERR)
{
close(fd);
zfree(c);
return NULL;
}
}

// 初始化client的各个参数
selectDb(c,0);
uint64_t client_id;
atomicGetIncr(server.next_client_id,client_id,1);
c->id = client_id;
c->fd = fd;
c->name = NULL;
c->bufpos = 0;
c->qb_pos = 0;
c->querybuf = sdsempty();
c->pending_querybuf = sdsempty();
c->querybuf_peak = 0;
c->reqtype = 0;
c->argc = 0;
c->argv = NULL;
c->cmd = c->lastcmd = NULL;
c->multibulklen = 0;
c->bulklen = -1;
c->sentlen = 0;
c->flags = 0;
c->ctime = c->lastinteraction = server.unixtime;
c->authenticated = 0;
c->replstate = REPL_STATE_NONE;
c->repl_put_online_on_ack = 0;
c->reploff = 0;
c->read_reploff = 0;
c->repl_ack_off = 0;
c->repl_ack_time = 0;
c->slave_listening_port = 0;
c->slave_ip[0] = '\0';
c->slave_capa = SLAVE_CAPA_NONE;
c->reply = listCreate();
c->reply_bytes = 0;
c->obuf_soft_limit_reached_time = 0;
listSetFreeMethod(c->reply,freeClientReplyValue);
listSetDupMethod(c->reply,dupClientReplyValue);
c->btype = BLOCKED_NONE;
c->bpop.timeout = 0;
c->bpop.keys = dictCreate(&objectKeyHeapPointerValueDictType,NULL);
c->bpop.target = NULL;
c->bpop.xread_group = NULL;
c->bpop.xread_consumer = NULL;
c->bpop.xread_group_noack = 0;
c->bpop.numreplicas = 0;
c->bpop.reploffset = 0;
c->woff = 0;
c->watched_keys = listCreate();
c->pubsub_channels = dictCreate(&objectKeyPointerValueDictType,NULL);
c->pubsub_patterns = listCreate();
c->peerid = NULL;
c->client_list_node = NULL;
listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid);
listSetMatchMethod(c->pubsub_patterns,listMatchObjects);
if (fd != -1) linkClient(c);
initClientMultiState(c);
return c;
}

释放void freeClient(client *c)

freeClient()是释放client,断开连接,释放缓存等操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
// networking.c


void freeClient(client *c) {
listNode *ln;

/* If a client is protected, yet we need to free it right now, make sure
* to at least use asynchronous freeing. */
if (c->flags & CLIENT_PROTECTED) {

// 将client添加到 erver.clients_to_close中
// 等时间事件serverCron,调用freeClientsInAsyncFreeQueue()来释放里面的连接
freeClientAsync(c);
return;
}

/* If it is our master that's beging disconnected we should make sure
* to cache the state to try a partial resynchronization later.
*
* Note that before doing this we make sure that the client is not in
* some unexpected state, by checking its flags. */
// 与Master断开的处理
if (server.master && c->flags & CLIENT_MASTER) {
serverLog(LL_WARNING,"Connection with master lost.");
if (!(c->flags & (CLIENT_CLOSE_AFTER_REPLY|
CLIENT_CLOSE_ASAP|
CLIENT_BLOCKED)))
{
replicationCacheMaster(c);
return;
}
}

// 与 Slave 断开连接的处理
/* Log link disconnection with slave */
if ((c->flags & CLIENT_SLAVE) && !(c->flags & CLIENT_MONITOR)) {
serverLog(LL_WARNING,"Connection with replica %s lost.",
replicationGetSlaveName(c));
}

/* 下面就是释放各种内存等操作了 */

/* Free the query buffer */
sdsfree(c->querybuf);
sdsfree(c->pending_querybuf);
c->querybuf = NULL;

/* Deallocate structures used to block on blocking ops. */
if (c->flags & CLIENT_BLOCKED) unblockClient(c);
dictRelease(c->bpop.keys);

/* UNWATCH all the keys */
unwatchAllKeys(c);
listRelease(c->watched_keys);

/* Unsubscribe from all the pubsub channels */
pubsubUnsubscribeAllChannels(c,0);
pubsubUnsubscribeAllPatterns(c,0);
dictRelease(c->pubsub_channels);
listRelease(c->pubsub_patterns);

/* Free data structures. */
listRelease(c->reply);
freeClientArgv(c);

/* Unlink the client: this will close the socket, remove the I/O
* handlers, and remove references of the client from different
* places where active clients may be referenced. */
// 真正断开Socket的地方
unlinkClient(c);

/* Master/slave cleanup Case 1:
* we lost the connection with a slave. */
if (c->flags & CLIENT_SLAVE) {
if (c->replstate == SLAVE_STATE_SEND_BULK) {
if (c->repldbfd != -1) close(c->repldbfd);
if (c->replpreamble) sdsfree(c->replpreamble);
}
list *l = (c->flags & CLIENT_MONITOR) ? server.monitors : server.slaves;
ln = listSearchKey(l,c);
serverAssert(ln != NULL);
listDelNode(l,ln);
/* We need to remember the time when we started to have zero
* attached slaves, as after some time we'll free the replication
* backlog. */
if (c->flags & CLIENT_SLAVE && listLength(server.slaves) == 0)
server.repl_no_slaves_since = server.unixtime;
refreshGoodSlavesCount();
}

/* Master/slave cleanup Case 2:
* we lost the connection with the master. */
if (c->flags & CLIENT_MASTER) replicationHandleMasterDisconnection();

/* If this client was scheduled for async freeing we need to remove it
* from the queue. */
if (c->flags & CLIENT_CLOSE_ASAP) {
ln = listSearchKey(server.clients_to_close,c);
serverAssert(ln != NULL);
listDelNode(server.clients_to_close,ln);
}

/* Release other dynamically allocated client structure fields,
* and finally release the client structure itself. */
if (c->name) decrRefCount(c->name);
zfree(c->argv);
freeClientMultiState(c);
sdsfree(c->peerid);
zfree(c);
}

接收来之Client的命令

这里就是主要看,Client连接上之后,主要的处理流程。

主要是从createClient()中,为客户端注册读就绪事件的readQueryFromClient()这个Handler开始。

这里涉及到了Redis的通信协议,配合https://redis.io/topics/protocol食用才是正道。

从上面链接可以得知,请求可以分为2种类型:

  • INLINE: 单个请求
  • MULTIBULK: 以*开头的多个请求

整个调用流程大概为:

  • readQueryFromClient(), 客户端Client读就绪后,向Socket读取数据,并存入client的buf中,然后调用processInputBufferAndReplicate()
  • processInputBufferAndReplicate()会根据client的类型(Master的Client 和 其他Client),分别调用processInputBuffer()对收到的数据进行处理,两种Client都调用processInputBuffer(),但是Master的Client需要额外处理。
  • processInputBuffer()会对数据进行一定处理,取出client的buf中未处理的数据,并判断请求的类型(INLINE/MULTIBULK),并将数据的数量和值保存进client->argcclient->argv中,然后调用processCommand()(在server.c中)执行。
  • processCommand()是真正检查和执行命令的函数。

PS: processCommand()后面单独写,主要是懒

readQueryFromClient()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
// 客户端Client读就绪后,调用的Handler
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
client *c = (client*) privdata;
int nread, readlen;
size_t qblen;
UNUSED(el);
UNUSED(mask);

// 默认的读取缓存大小
readlen = PROTO_IOBUF_LEN;
/* If this is a multi bulk request, and we are processing a bulk reply
* that is large enough, try to maximize the probability that the query
* buffer contains exactly the SDS string representing the object, even
* at the risk of requiring more read(2) calls. This way the function
* processMultiBulkBuffer() can avoid copying buffers to create the
* Redis Object representing the argument. */
// 如果是一个批量请求
if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
&& c->bulklen >= PROTO_MBULK_BIG_ARG)
{
ssize_t remaining = (size_t)(c->bulklen+2)-sdslen(c->querybuf);

/* Note that the 'remaining' variable may be zero in some edge case,
* for example once we resume a blocked client after CLIENT PAUSE. */
if (remaining > 0 && remaining < readlen) readlen = remaining;
}

qblen = sdslen(c->querybuf);
// 查询query的长度最大值
if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
// 扩大c->querybuf的SDS字符串到相应的长度
c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
// socket中读取数据
nread = read(fd, c->querybuf+qblen, readlen);
if (nread == -1) {
if (errno == EAGAIN) {
return;
} else {
serverLog(LL_VERBOSE, "Reading from client: %s",strerror(errno));
freeClient(c);
return;
}
} else if (nread == 0) {
serverLog(LL_VERBOSE, "Client closed connection");
freeClient(c);
return;
} else if (c->flags & CLIENT_MASTER) {
/* Append the query buffer to the pending (not applied) buffer
* of the master. We'll use this buffer later in order to have a
* copy of the string applied by the last command executed. */
// 复制拼接字符串,将`c->querybuf+qblen`后的数据,复制拼接到`c->pending_querybuf`后
// master的连接是处理`c->pending_querybuf`的
c->pending_querybuf = sdscatlen(c->pending_querybuf,
c->querybuf+qblen,nread);
}
// 更新sds的长度
sdsIncrLen(c->querybuf,nread);
c->lastinteraction = server.unixtime;
if (c->flags & CLIENT_MASTER) c->read_reploff += nread;
server.stat_net_input_bytes += nread;
// 超过最大长度限制
if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();

bytes = sdscatrepr(bytes,c->querybuf,64);
serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
sdsfree(ci);
sdsfree(bytes);
freeClient(c);
return;
}

/* Time to process the buffer. If the client is a master we need to
* compute the difference between the applied offset before and after
* processing the buffer, to understand how much of the replication stream
* was actually applied to the master state: this quantity, and its
* corresponding part of the replication stream, will be propagated to
* the sub-slaves and to the replication backlog. */
// 处理接收到的数据
processInputBufferAndReplicate(c);
}

可以看到readQueryFromClient()的主要工作其实很简单,主要是从socket中读取数据,并将其存入client->querybuf

processInputBufferAndReplicate()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/* This is a wrapper for processInputBuffer that also cares about handling
* the replication forwarding to the sub-slaves, in case the client 'c'
* is flagged as master. Usually you want to call this instead of the
* raw processInputBuffer(). */
// 对processInputBuffer()的封装,主要是master连接的处理差异
void processInputBufferAndReplicate(client *c) {
if (!(c->flags & CLIENT_MASTER)) {
// 如果这个不是master的client连接
processInputBuffer(c);
} else {
// master的client连接
size_t prev_offset = c->reploff;
processInputBuffer(c);
size_t applied = c->reploff - prev_offset;
// applied是计算c->pending_querybuf处理了哪些数据,然后在使用sdsrange()清除已经被处理的
if (applied) {
// 用户将数据代理到该Redis的slaves中
replicationFeedSlavesFromMasterStream(server.slaves,
c->pending_querybuf, applied);
sdsrange(c->pending_querybuf,applied,-1);
}
}
}

processInputBufferAndReplicate()主要是对processInputBuffer()进行一个封装,其实直接写在readQueryFromClient()也可以,但现在这么做,对于以后添加c->flags不同的处理时,更加直观。

processInputBuffer()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
/* This function is called every time, in the client structure 'c', there is
* more query buffer to process, because we read more data from the socket
* or because a client was blocked and later reactivated, so there could be
* pending query buffer, already representing a full command, to process. */
// 读取数据,判断数据格式是否正确
void processInputBuffer(client *c) {
server.current_client = c;

/* Keep processing while there is something in the input buffer */
// `c->qb_pos`是已经读取的`c->querybuf`游标
// 所以当`c->qb_pos` >= `c->querybuf`时,就不需要处理了
while(c->qb_pos < sdslen(c->querybuf)) {
/* Return if clients are paused. */
// 如果非slave连接,而且当前服务器的所有client都被暂停了
// 则退出循环
if (!(c->flags & CLIENT_SLAVE) && clientsArePaused()) break;

// client 处于CLIENT_BLOCKED,中止
// TODO: 看看什么情况下会处于这个状态
/* Immediately abort if the client is in the middle of something. */
if (c->flags & CLIENT_BLOCKED) break;

/* Don't process input from the master while there is a busy script
* condition on the slave. We want just to accumulate the replication
* stream (instead of replying -BUSY like we do with other clients) and
* later resume the processing. */
if (server.lua_timedout && c->flags & CLIENT_MASTER) break;

/* CLIENT_CLOSE_AFTER_REPLY closes the connection once the reply is
* written to the client. Make sure to not let the reply grow after
* this flag has been set (i.e. don't process more commands).
*
* The same applies for clients we want to terminate ASAP. */
// CLIENT_CLOSE_AFTER_REPLY: 表示响应客户端的请求后断开连接
// CLIENT_CLOSE_ASAP: 表示要断开这个连接
// 所以 `CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP`是已经响应请求,需要断开的连接
if (c->flags & (CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP)) break;

/* Determine request type when unknown. */
// 判断命令的类型
if (!c->reqtype) {
// 当client刚创建的时候`c->reqtype`默认为0(详细见`createClient()`)
// 通过读数据的第一位判断是命令类型是MULTIBULK还是INLINE
// 详细协议看 https://redis.io/topics/protocol
if (c->querybuf[c->qb_pos] == '*') {
c->reqtype = PROTO_REQ_MULTIBULK;
} else {
c->reqtype = PROTO_REQ_INLINE;
}
}

if (c->reqtype == PROTO_REQ_INLINE) {
// 处理INLINE请求的buffer
// 其实就是处理好请求的命令数据,存进去`client->argc`和`client->argv`中
if (processInlineBuffer(c) != C_OK) break;
} else if (c->reqtype == PROTO_REQ_MULTIBULK) {
// 处理MULTIBULK请求
// 与processInlineBuffer()处理的方式类似,当时协议格式不一样
if (processMultibulkBuffer(c) != C_OK) break;
} else {
serverPanic("Unknown request type");
}

/* Multibulk processing could see a <= 0 length. */
if (c->argc == 0) {
resetClient(c);
} else {
/* Only reset the client when the command was executed. */
// 调用server.c中的processCommand()函数执行命令
if (processCommand(c) == C_OK) {
if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {
/* Update the applied replication offset of our master. */
c->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos;
}

/* Don't reset the client structure for clients blocked in a
* module blocking command, so that the reply callback will
* still be able to access the client argv and argc field.
* The client will be reset in unblockClientFromModule(). */
if (!(c->flags & CLIENT_BLOCKED) || c->btype != BLOCKED_MODULE)
resetClient(c);
}
/* freeMemoryIfNeeded may flush slave output buffers. This may
* result into a slave, that may be the active client, to be
* freed. */
if (server.current_client == NULL) break;
}
}

/* Trim to pos */
if (c->qb_pos) {
// 将已经处理的`c->querybuf`中的数据删除
sdsrange(c->querybuf,c->qb_pos,-1);
c->qb_pos = 0;
}

server.current_client = NULL;
}

processInputBuffer()最主要的工作就是对连接传入的数据,进行一定格式化(并没有检查内容),方便processCommand()调用,同时通过首字符,判断请求的类型(INLINE、MULTIBULK)。