Redis Source Code Reading Notes: Events and the Event Loop

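Redis handles client requests with a single thread and I/O multiplexing. It supports four multiplexing mechanisms, in order of preference: evport, epoll, kqueue and select.

evport is the Solaris mechanism, epoll is Linux's and kqueue is FreeBSD's, while select is the portable fallback that almost every system supports.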

// ae.c

/* Include the best multiplexing layer supported by this system.
* The following should be ordered by performances, descending. */
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
#ifdef HAVE_EPOLL
#include "ae_epoll.c"
#else
#ifdef HAVE_KQUEUE
#include "ae_kqueue.c"
#else
#include "ae_select.c"
#endif
#endif
#endif

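Redis wraps all four behind one internal interface (aeApiCreate(), aeApiAddEvent(), aeApiDelEvent(), aeApiPoll() and so on), and aeEventLoop calls the backend through it.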
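To make the backend contract concrete, here is a minimal sketch of what a select()-based poll step might look like. It is not ae_select.c: the types SketchLoop/SketchFired and the functions sketch_init(), sketch_add() and sketch_poll() are hypothetical, and only AE_READABLE/AE_WRITABLE (values 1 and 2, as in ae.h) are borrowed. It waits up to the given timeout and writes every ready fd plus its event mask into a fired array, the same job aeApiPoll() does for eventLoop->fired.

```c
#include <assert.h>
#include <string.h>
#include <sys/select.h>
#include <sys/time.h>
#include <unistd.h>

#define AE_READABLE 1
#define AE_WRITABLE 2
#define SKETCH_SETSIZE 64

/* Hypothetical, simplified types: not the real Redis structs. */
typedef struct { int fd; int mask; } SketchFired;

typedef struct {
    int maxfd;                  /* highest registered fd, -1 if none */
    fd_set rfds, wfds;          /* fds we are interested in */
    SketchFired fired[SKETCH_SETSIZE];
} SketchLoop;

static void sketch_init(SketchLoop *l) {
    l->maxfd = -1;
    FD_ZERO(&l->rfds);
    FD_ZERO(&l->wfds);
}

static void sketch_add(SketchLoop *l, int fd, int mask) {
    assert(fd >= 0 && fd < SKETCH_SETSIZE && fd < FD_SETSIZE);
    if (mask & AE_READABLE) FD_SET(fd, &l->rfds);
    if (mask & AE_WRITABLE) FD_SET(fd, &l->wfds);
    if (fd > l->maxfd) l->maxfd = fd;
}

/* Wait up to *tvp (NULL = forever), then record ready fds in l->fired.
 * Returns the number of fired entries (0 on timeout or error). */
static int sketch_poll(SketchLoop *l, struct timeval *tvp) {
    fd_set r, w;
    int numevents = 0;
    memcpy(&r, &l->rfds, sizeof(fd_set));  /* select() overwrites its sets */
    memcpy(&w, &l->wfds, sizeof(fd_set));
    if (select(l->maxfd + 1, &r, &w, NULL, tvp) <= 0) return 0;
    for (int fd = 0; fd <= l->maxfd; fd++) {
        int mask = 0;
        if (FD_ISSET(fd, &r)) mask |= AE_READABLE;
        if (FD_ISSET(fd, &w)) mask |= AE_WRITABLE;
        if (mask) {
            l->fired[numevents].fd = fd;
            l->fired[numevents].mask = mask;
            numevents++;
        }
    }
    return numevents;
}
```

For example, registering the read end of a pipe and polling with a zero timeout returns 0 until one byte is written to the pipe, after which it returns 1 with fired[0] set to that fd and AE_READABLE.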

At the end of Redis's main() function, aeMain() is called to enter the event loop, which keeps running until the server shuts down:

// server.c

int main(int argc, char **argv) {
    ......
    // Set the handler that runs before each event-loop iteration blocks
    aeSetBeforeSleepProc(server.el,beforeSleep);
    // Set the handler that runs after each event-loop iteration wakes up
    aeSetAfterSleepProc(server.el,afterSleep);
    // Enter the event loop
    aeMain(server.el);
    aeDeleteEventLoop(server.el);
    return 0;
}

The event loop: aeEventLoop

The aeEventLoop structure

// ae.h

/* State of an event based program */
typedef struct aeEventLoop {
    int maxfd;   /* highest file descriptor currently registered */
    int setsize; /* max number of file descriptors tracked */
    long long timeEventNextId;
    time_t lastTime;     /* Used to detect system clock skew */
    aeFileEvent *events; /* Registered events */
    aeFiredEvent *fired; /* Fired events */
    aeTimeEvent *timeEventHead;
    int stop;
    void *apidata; /* This is used for polling API specific data */
    aeBeforeSleepProc *beforesleep;
    aeBeforeSleepProc *aftersleep;
} aeEventLoop;
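  • maxfd: the highest file descriptor currently registered in the event loop (-1 when none is registered)
  • setsize: the maximum number of file descriptors the loop can track
  • timeEventNextId: the id to give the next time event. aeCreateTimeEvent() takes this value and increments it; it is never decremented. processTimeEvents() reads timeEventNextId-1 as maxId and skips events with a larger id, i.e. events created during the current pass (see aeCreateTimeEvent() and processTimeEvents() in ae.c)
  • lastTime: used to detect system clock skew
  • events: an aeFileEvent array of length setsize, indexed directly by file descriptor (events[fd]; see aeCreateFileEvent()). Each aeFileEvent stores the events registered for that fd and the corresponding read/write handlers (structure below).
  • fired: an aeFiredEvent array of length setsize that records, for the current iteration, which fds fired and which events fired on them (structure below).
  • timeEventHead: the head of the linked list in which time events are stored
  • stop: the stop flag; when it is 1, the event loop has been told to stop
  • apidata: private state for whichever of the four multiplexing backends (evport, epoll, kqueue, select) is in use
  • beforesleep: the function called before the event loop blocks (see aeSetBeforeSleepProc())
  • aftersleep: the function called after the event loop wakes up (see aeSetAfterSleepProc())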
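Because events[] is indexed by the fd itself, registering or looking up an fd is O(1), and maxfd only has to be recomputed when the highest fd loses its last event. The sketch below is modeled on aeCreateFileEvent()/aeDeleteFileEvent() in ae.c, with the backend calls (aeApiAddEvent/aeApiDelEvent) left out; the names Loop, FileEvent, create_file_event() and delete_file_event() are hypothetical, and SETSIZE is fixed at 16 only for illustration.

```c
#include <assert.h>
#include <stddef.h>

#define AE_NONE 0
#define AE_READABLE 1
#define AE_WRITABLE 2
#define AE_BARRIER 4
#define SETSIZE 16

typedef void FileProc(int fd, void *clientData, int mask);

typedef struct {
    int mask;                       /* registered events for this fd */
    FileProc *rfileProc, *wfileProc;
    void *clientData;
} FileEvent;

typedef struct {
    int maxfd;                      /* -1 when nothing is registered */
    int setsize;
    FileEvent events[SETSIZE];      /* indexed directly by fd */
} Loop;

static void loop_init(Loop *l) {
    l->maxfd = -1;
    l->setsize = SETSIZE;
    for (int i = 0; i < SETSIZE; i++) l->events[i].mask = AE_NONE;
}

static int create_file_event(Loop *l, int fd, int mask, FileProc *proc, void *data) {
    assert(fd >= 0);
    if (fd >= l->setsize) return -1;          /* Redis also sets errno = ERANGE */
    FileEvent *fe = &l->events[fd];
    fe->mask |= mask;
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;
    fe->clientData = data;
    if (fd > l->maxfd) l->maxfd = fd;
    return 0;
}

static void delete_file_event(Loop *l, int fd, int mask) {
    if (fd >= l->setsize) return;
    FileEvent *fe = &l->events[fd];
    if (fe->mask == AE_NONE) return;
    if (mask & AE_WRITABLE) mask |= AE_BARRIER;  /* barrier goes away with writable */
    fe->mask &= ~mask;
    if (fd == l->maxfd && fe->mask == AE_NONE) {
        int j;                                   /* scan down for the new maxfd */
        for (j = l->maxfd - 1; j >= 0; j--)
            if (l->events[j].mask != AE_NONE) break;
        l->maxfd = j;
    }
}
```

With fds 3 (readable) and 7 (readable and writable) registered, maxfd is 7; removing both events from fd 7 drops maxfd back to 3.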

The aeFileEvent structure

/* File event structure */
typedef struct aeFileEvent {
    int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
    aeFileProc *rfileProc;
    aeFileProc *wfileProc;
    void *clientData;
} aeFileEvent;
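  • mask: which events are registered for this fd, a combination of:
    • AE_READABLE: the "readable" event
    • AE_WRITABLE: the "writable" event
    • AE_BARRIER: reverses the usual order; if the fd is both readable and writable in the same iteration, the writable handler runs before the readable one
  • rfileProc: pointer to the read handler
  • wfileProc: pointer to the write handler
  • clientData: private data passed to the handlers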

The aeFiredEvent structure

/* A fired event */
typedef struct aeFiredEvent {
    int fd;
    int mask;
} aeFiredEvent;
  • fd: the file descriptor on which an event fired
  • mask: which events fired on that fd

How aeMain() and aeProcessEvents() work

aeMain()aeEventLoop中的main函数(废话),调用后,就会循环调用aeProcessEvents(),直至stop为1。

aeProcessEvents()则是事件循环的主要操作,他会调用多路复用函数并阻塞对应的时间和计算触发时间事件。
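The timeout computation is plain arithmetic on (seconds, milliseconds) pairs. The hypothetical next_timeout() helper below extracts just the `if (shortest)` branch of aeProcessEvents(): it turns "the next timer is due at when_sec/when_ms" and "now is now_sec/now_ms" into the struct timeval handed to the poll call, clamping an overdue timer to a zero (non-blocking) timeout.

```c
#include <assert.h>
#include <sys/time.h>

/* Hypothetical helper mirroring the timeout calculation in aeProcessEvents(). */
static void next_timeout(long when_sec, long when_ms,
                         long now_sec, long now_ms, struct timeval *tv) {
    long long ms = (long long)(when_sec - now_sec) * 1000 + when_ms - now_ms;
    if (ms > 0) {
        tv->tv_sec = ms / 1000;            /* whole seconds */
        tv->tv_usec = (ms % 1000) * 1000;  /* leftover ms as microseconds */
    } else {
        tv->tv_sec = 0;                    /* already due: do not block */
        tv->tv_usec = 0;
    }
}
```

For instance, a timer due at 10 s + 500 ms with the clock at 9 s + 800 ms gives 700 ms, i.e. tv_sec = 0 and tv_usec = 700000.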

Code

// ae.c

// Main routine of the event loop
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {

        // Run the before-sleep handler, i.e. right before epoll_wait() blocks
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        // The call that actually reaches epoll. AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP
        // means: process every kind of event, and run the after-sleep handler
        // once epoll_wait() returns.
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
    }
}
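Stripped of the real I/O, aeMain() is just "hook, process, repeat until stop". The toy version below keeps only that control flow; MiniLoop, fake_process_events(), count_before_sleep() and mini_main() are hypothetical, and fake_process_events() sets stop after its third call, standing in for a callback that calls aeStop() (which sets eventLoop->stop = 1).

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for aeEventLoop: only what aeMain() touches. */
typedef struct MiniLoop MiniLoop;
typedef void SleepProc(MiniLoop *l);

struct MiniLoop {
    int stop;
    int iterations;          /* how many times "aeProcessEvents" ran */
    int beforesleep_calls;   /* how many times the hook ran */
    SleepProc *beforesleep;
};

/* Stand-in for aeProcessEvents(): stops the loop on the third call. */
static int fake_process_events(MiniLoop *l) {
    if (++l->iterations == 3) l->stop = 1;   /* what aeStop() does */
    return 0;
}

static void count_before_sleep(MiniLoop *l) { l->beforesleep_calls++; }

/* Same shape as aeMain(): hook first, then process, until stop is set. */
static void mini_main(MiniLoop *l) {
    l->stop = 0;
    while (!l->stop) {
        if (l->beforesleep != NULL) l->beforesleep(l);
        fake_process_events(l);
    }
}
```

Since stop is only checked at the top of the loop, the iteration in which stop is set always runs to completion: here the hook and the process step each run exactly three times.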
// ae.c

/* Process every pending time event, then every pending file event
 * (that may be registered by time event callbacks just processed).
 * Without special flags the function sleeps until some file event
 * fires, or when the next time event occurs (if any).
 *
 * If flags is 0, the function does nothing and returns.
 * if flags has AE_ALL_EVENTS set, all the kind of events are processed.
 * if flags has AE_FILE_EVENTS set, file events are processed.
 * if flags has AE_TIME_EVENTS set, time events are processed.
 * if flags has AE_DONT_WAIT set the function returns ASAP until all
 * the events that's possible to process without to wait are processed.
 * if flags has AE_CALL_AFTER_SLEEP set, the aftersleep callback is called.
 *
 * The function returns the number of events processed. */
// Handles time events, calls the read-ready/write-ready handler of each
// file descriptor, and also triggers the AfterSleepProc.
// flags selects what the function processes:
// AE_ALL_EVENTS: process every kind of event
// AE_FILE_EVENTS: process file-descriptor events
// AE_TIME_EVENTS: process time events
// AE_DONT_WAIT: do not block (aeApiPoll() will not block)
// AE_CALL_AFTER_SLEEP: call the AfterSleepProc
// As far as the current code goes, only ae.c and networking.c call this
// function, and both basically pass AE_ALL_EVENTS; the only difference is
// that ae.c adds AE_CALL_AFTER_SLEEP and networking.c adds AE_DONT_WAIT.
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    int processed = 0, numevents;

    /* Nothing to do? return ASAP */
    // If neither time events nor file (descriptor) events are requested,
    // return at once; in practice no current caller does this.
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;

    /* Note that we want call select() even if there are no
     * file events to process as long as we want to process time
     * events, in order to sleep until the next time event is ready
     * to fire. */
    // Enter if any file descriptor is registered,
    // or if flags ask for time events and AE_DONT_WAIT is not set.
    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        int j;
        aeTimeEvent *shortest = NULL;
        struct timeval tv, *tvp;

        // Find the time event that is due soonest.
        // PS: judging by its comment, aeSearchNearestTimer() walks the
        // whole unsorted list, so every call is O(N).
        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
            shortest = aeSearchNearestTimer(eventLoop);
        if (shortest) {
            // There is a pending time event

            long now_sec, now_ms;
            // Get the current time
            aeGetTime(&now_sec, &now_ms);
            tvp = &tv;

            // Compute the epoll_wait() timeout so that it returns
            // when the next time event is due.
            /* How many milliseconds we need to wait for the next
             * time event to fire? */
            long long ms =
                (shortest->when_sec - now_sec)*1000 +
                shortest->when_ms - now_ms;

            if (ms > 0) {
                tvp->tv_sec = ms/1000;
                tvp->tv_usec = (ms % 1000)*1000;
            } else {
                tvp->tv_sec = 0;
                tvp->tv_usec = 0;
            }
        } else {
            /* If we have to check for events but need to return
             * ASAP because of AE_DONT_WAIT we need to set the timeout
             * to zero */
            if (flags & AE_DONT_WAIT) {
                // With AE_DONT_WAIT the epoll_wait() timeout is set to 0,
                // so the call does not block.
                tv.tv_sec = tv.tv_usec = 0;
                tvp = &tv;
            } else {
                // As the comment says, here epoll_wait() may block
                // until some event arrives.
                /* Otherwise we can block */
                tvp = NULL; /* wait forever */
            }
        }

        /* Call the multiplexing API, will return only on timeout or when
         * some event fires. */
        // Blocks for up to the time in tvp (not at all with AE_DONT_WAIT).
        // numevents is the number of ready fds; each fd and its event
        // mask are stored in eventLoop->fired.
        numevents = aeApiPoll(eventLoop, tvp);

        /* After sleep callback. */
        // Run the after-sleep handler
        if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
            eventLoop->aftersleep(eventLoop);

        // Handle each entry in eventLoop->fired
        for (j = 0; j < numevents; j++) {
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int fired = 0; /* Number of events fired for current fd. */

            /* Normally we execute the readable event first, and the writable
             * event later. This is useful as sometimes we may be able
             * to serve the reply of a query immediately after processing the
             * query.
             *
             * However if AE_BARRIER is set in the mask, our application is
             * asking us to do the reverse: never fire the writable event
             * after the readable. In such a case, we invert the calls.
             * This is useful when, for instance, we want to do things
             * in the beforeSleep() hook, like fsynching a file to disk,
             * before replying to a client. */
            // AE_BARRIER means the writable event is handled first;
            // by default Redis handles the readable event first.
            int invert = fe->mask & AE_BARRIER;

            /* Note the "fe->mask & mask & ..." code: maybe an already
             * processed event removed an element that fired and we still
             * didn't processed, so we check if the event is still valid.
             *
             * Fire the readable event if the call sequence is not
             * inverted. */
            // The fd is readable
            if (!invert && fe->mask & mask & AE_READABLE) {
                // rfileProc is the read handler registered for this fd.
                // For the listening server socket, for example, it is
                // registered in initServer() in server.c by calling
                // aeCreateFileEvent().
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                fired++;
            }

            /* Fire the writable event. */
            // The fd is writable
            if (fe->mask & mask & AE_WRITABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    // wfileProc is the write handler registered for this fd,
                    // e.g. the reply-writing handler that networking.c
                    // registers for client connections via aeCreateFileEvent().
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }

            /* If we have to invert the call, fire the readable event now
             * after the writable one. */
            if (invert && fe->mask & mask & AE_READABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }

            processed++;
        }
    }

    // Check and run time events (if any are due)
    /* Check time events */
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);

    return processed; /* return the number of processed file/time events */
}
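The per-fd dispatch order is the subtle part of the loop above. The hypothetical dispatch() helper below copies just that block, with the handlers reduced to functions that append "R" or "W" to a trace string, so the effect of AE_BARRIER and of the `wfileProc != rfileProc` check can be seen directly. Here `registered` plays the role of fe->mask and `ready` the role of fired[j].mask.

```c
#include <assert.h>
#include <string.h>

#define AE_READABLE 1
#define AE_WRITABLE 2
#define AE_BARRIER 4

typedef void Proc(char *trace);

static void on_read(char *trace)  { strcat(trace, "R"); }
static void on_write(char *trace) { strcat(trace, "W"); }

/* Hypothetical copy of the per-fd dispatch logic in aeProcessEvents(). */
static void dispatch(int registered, int ready, Proc *rproc, Proc *wproc, char *trace) {
    int fired = 0;
    int invert = registered & AE_BARRIER;
    if (!invert && registered & ready & AE_READABLE) {   /* default: read first */
        rproc(trace);
        fired++;
    }
    if (registered & ready & AE_WRITABLE) {
        if (!fired || wproc != rproc) {                  /* skip if same handler already ran */
            wproc(trace);
            fired++;
        }
    }
    if (invert && registered & ready & AE_READABLE) {    /* barrier: read after write */
        if (!fired || wproc != rproc) {
            rproc(trace);
            fired++;
        }
    }
}
```

With both events ready the trace is "RW" by default and "WR" when AE_BARRIER is set; if the same function is registered for both directions it runs only once. One simplification: the real loop re-reads fe->mask before each handler, because a handler may unregister the other event, whereas this sketch uses a fixed mask.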

Overall flow

(Figure: flow of the event loop)

What the other related functions do

| Function | Purpose |
| --- | --- |
| aeEventLoop *aeCreateEventLoop(int setsize) | Creates and initializes an aeEventLoop |
| void aeDeleteEventLoop(aeEventLoop *eventLoop) | Deletes and frees an aeEventLoop |
| void aeStop(aeEventLoop *eventLoop) | Stops the event loop |
| int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData) | Registers events for fd and records them in eventLoop->events |
| void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask) | Unregisters events for fd and removes them from eventLoop->events |
| int aeGetFileEvents(aeEventLoop *eventLoop, int fd) | Returns the mask registered for fd in eventLoop->events |
| long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds, aeTimeProc *proc, void *clientData, aeEventFinalizerProc *finalizerProc) | Registers a time event |
| int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id) | Deletes a time event |
| int aeProcessEvents(aeEventLoop *eventLoop, int flags) | The core function of the event loop |
| int aeWait(int fd, int mask, long long milliseconds) | Blocks until fd is readable, writable or in error, or the timeout expires |
| void aeMain(aeEventLoop *eventLoop) | The main loop |
| char *aeGetApiName(void) | Returns the name of the multiplexing backend in use (evport, epoll, kqueue, select) |
| void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep) | Sets the before-sleep handler |
| void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *aftersleep) | Sets the after-sleep handler |
| int aeGetSetSize(aeEventLoop *eventLoop) | Returns the setsize of eventLoop |
| int aeResizeSetSize(aeEventLoop *eventLoop, int setsize) | Resizes the setsize of eventLoop |
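The table mentions aeCreateTimeEvent() without showing how time events are kept. A minimal sketch, assuming an unsorted singly linked list with new events pushed at the head and ids taken from timeEventNextId, which is why finding the nearest timer (aeSearchNearestTimer()) needs a full O(N) scan. TimeEvent, TimerList, create_time_event(), search_nearest() and free_time_events() are hypothetical; the real aeTimeEvent also carries the callback, finalizer and clientData, and unlike aeCreateTimeEvent() (which takes a delay in milliseconds) this version takes an absolute due time to keep it deterministic.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical, trimmed-down time event. */
typedef struct TimeEvent {
    long long id;
    long when_sec, when_ms;       /* absolute due time */
    struct TimeEvent *next;
} TimeEvent;

typedef struct {
    long long timeEventNextId;
    TimeEvent *timeEventHead;
} TimerList;

/* Take the next id and push the event at the head of the list. */
static long long create_time_event(TimerList *tl, long when_sec, long when_ms) {
    TimeEvent *te = malloc(sizeof(*te));
    assert(te != NULL);
    te->id = tl->timeEventNextId++;
    te->when_sec = when_sec;
    te->when_ms = when_ms;
    te->next = tl->timeEventHead;
    tl->timeEventHead = te;
    return te->id;
}

/* The list is unsorted, so the nearest timer needs a full scan: O(N). */
static TimeEvent *search_nearest(TimerList *tl) {
    TimeEvent *nearest = NULL;
    for (TimeEvent *te = tl->timeEventHead; te != NULL; te = te->next) {
        if (nearest == NULL || te->when_sec < nearest->when_sec ||
            (te->when_sec == nearest->when_sec && te->when_ms < nearest->when_ms))
            nearest = te;
    }
    return nearest;
}

static void free_time_events(TimerList *tl) {
    TimeEvent *te = tl->timeEventHead;
    while (te != NULL) {
        TimeEvent *next = te->next;
        free(te);
        te = next;
    }
    tl->timeEventHead = NULL;
}
```

Creating events due at 10.300 s, 10.100 s and 11.000 s yields ids 0, 1 and 2, and search_nearest() returns the one with id 1.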

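Events

As the code above shows, the event loop handles three kinds of work:

  • The before/after-blocking handlers, BeforeSleepProc and AfterSleepProc
  • Events on file descriptors that became ready
  • Time events

Let's look at what each of them does.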

BeforeSleepProc

server.c中,将beforeSleep()设置为阻塞前的处理函数

/* This function gets called every time Redis is entering the
 * main loop of the event driven library, that is, before to sleep
 * for ready file descriptors. */
void beforeSleep(struct aeEventLoop *eventLoop) {
    UNUSED(eventLoop);

    /* Call the Redis Cluster before sleep function. Note that this function
     * may change the state of Redis Cluster (from ok to fail or vice versa),
     * so it's a good idea to call it before serving the unblocked clients
     * later in this function. */
    // Redis Cluster housekeeping before sleeping
    if (server.cluster_enabled) clusterBeforeSleep();

    /* Run a fast expire cycle (the called function will return
     * ASAP if a fast cycle is not needed). */
    // Reclaim expired keys (fast cycle)
    if (server.active_expire_enabled && server.masterhost == NULL)
        activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST);

    /* Send all the slaves an ACK request if at least one client blocked
     * during the previous event loop iteration. */
    // Master/replica: ask the replicas for ACKs
    if (server.get_ack_from_slaves) {
        robj *argv[3];

        argv[0] = createStringObject("REPLCONF",8);
        argv[1] = createStringObject("GETACK",6);
        argv[2] = createStringObject("*",1); /* Not used argument. */
        replicationFeedSlaves(server.slaves, server.slaveseldb, argv, 3);
        decrRefCount(argv[0]);
        decrRefCount(argv[1]);
        decrRefCount(argv[2]);
        server.get_ack_from_slaves = 0;
    }

    /* Unblock all the clients blocked for synchronous replication
     * in WAIT. */
    if (listLength(server.clients_waiting_acks))
        processClientsWaitingReplicas();

    /* Check if there are clients unblocked by modules that implement
     * blocking commands. */
    moduleHandleBlockedClients();

    /* Try to process pending commands for clients that were just unblocked. */
    if (listLength(server.unblocked_clients))
        processUnblockedClients();

    /* Write the AOF buffer on disk */
    // Write the AOF
    flushAppendOnlyFile(0);

    /* Handle writes with pending output buffers. */
    handleClientsWithPendingWrites();

    /* Before we are going to sleep, let the threads access the dataset by
     * releasing the GIL. Redis main thread will not touch anything at this
     * time. */
    // Release the GIL
    if (moduleCount()) moduleReleaseGIL();
}

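Its main jobs are:

  • Cluster housekeeping before blocking
  • A fast cycle of expired-key reclamation
  • Master/replica: asking the replicas for ACKs
  • Unblocking clients (those waiting in WAIT, those blocked by modules, and those just unblocked)
  • Flushing the AOF buffer to disk
  • Writing pending replies to clients
  • Releasing the GIL

So while the main thread is blocked, other threads (those started by modules) can take the GIL and work on the Redis dataset.

While client requests are being processed, however, only the main thread touches the dataset, which is why requests are executed serially.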

AfterSleepProc

server.c中,将afterSleep()设置为阻塞后的处理函数

/* This function is called immediately after the event loop multiplexing
 * API returned, and the control is going to soon return to Redis by invoking
 * the different events callbacks. */
void afterSleep(struct aeEventLoop *eventLoop) {
    UNUSED(eventLoop);
    if (moduleCount()) moduleAcquireGIL();
}

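Right after the event loop wakes up, the first thing it does (when modules are loaded) is acquire the GIL, so that from then on only the main thread operates on the dataset.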
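The hand-off between beforeSleep() and afterSleep() can be pictured as an ordinary mutex passed back and forth. A minimal sketch with POSIX threads, assuming the GIL is a plain pthread mutex; gil, dataset, module_thread() and simulate_one_iteration() are hypothetical names. The main thread holds the lock while it works, so the module thread can only get in between the release in beforeSleep() and the reacquire in afterSleep(); pthread_join() stands in for the time the main thread spends blocked in the poll call.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

static pthread_mutex_t gil = PTHREAD_MUTEX_INITIALIZER;
static int dataset = 0;                 /* stands in for the Redis dataset */

/* A module thread: it can only touch the dataset while holding the GIL. */
static void *module_thread(void *arg) {
    (void)arg;
    pthread_mutex_lock(&gil);           /* waits while the main thread works */
    dataset += 1;
    pthread_mutex_unlock(&gil);
    return NULL;
}

/* One simulated event-loop iteration on the main thread. */
static int simulate_one_iteration(void) {
    pthread_t t;
    pthread_mutex_lock(&gil);           /* main thread holds the GIL while running */
    if (pthread_create(&t, NULL, module_thread, NULL) != 0) {
        pthread_mutex_unlock(&gil);
        return -1;
    }
    dataset += 10;                      /* serving clients: module thread is blocked */
    pthread_mutex_unlock(&gil);         /* beforeSleep(): release the GIL */
    pthread_join(t, NULL);              /* "sleeping": the module thread gets its turn */
    pthread_mutex_lock(&gil);           /* afterSleep(): reacquire the GIL */
    int result = dataset;
    pthread_mutex_unlock(&gil);         /* only so the sketch ends unlocked */
    return result;
}
```

Because the module thread can only run between the release and the reacquire, simulate_one_iteration() always sees both updates (10 from the main thread, 1 from the module thread) and returns 11.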

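File events

File events are registered in the following places:

  1. Readable events on the server-side endpoints (TCP sockets, the Unix socket, a pipe) (in initServer() in server.c)
  2. Read/write events on each client connection's socket (in networking.c)
  3. Read/write events on the AOF pipes (in aof.c)
  4. Read/write events on Redis Cluster connections (in cluster.c)
  5. Read/write events on master/replica replication connections (in replication.c)
  6. Read/write events on Redis Sentinel connections (in sentinel.c)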

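Time events

In server.c, serverCron() is registered as a time event.

According to its comments, serverCron() does quite a lot:

  • Active reclamation of expired keys
  • Software watchdog
  • Updating statistics
  • Rehashing the hash tables of the Redis DBs
  • Triggering BGSAVE / AOF rewrite
  • Handling the various client timeouts
  • Reconnecting master/replica replication
  • Other work

Inside serverCron(), run_with_period() is used rather neatly so that each task runs only once per given interval instead of on every call, which keeps work from running too often.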

I won't paste the full serverCron() code here, since it ties into many other features and belongs with them rather than with the event loop.