Redis源码阅读笔记-服务器启动和初始化

Redis源码阅读笔记-服务器启动和初始化

服务器的启动

Redis服务器的main函数是在server.c中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
// server.c

/* Entry point of the Redis server executable.
 *
 * In order, it: sets up process-level state (locale, timezone, OOM handler,
 * random seeds), initializes the server configuration, records the
 * executable path and arguments, handles the special program names and
 * command-line options, loads the configuration, optionally daemonizes,
 * calls initServer(), loads data (or starts Sentinel), and finally runs the
 * event loop through aeMain().
 *
 * argc/argv: the standard program arguments. argv[0] is also inspected to
 * select the redis-check-rdb / redis-check-aof modes.
 * Returns 0 once aeMain() returns and the event loop has been deleted. */
int main(int argc, char **argv) {
struct timeval tv;
int j;

/* Set up library and process-level state before anything else. */
/* We need to initialize our libraries, and the server configuration. */
#ifdef INIT_SETPROCTITLE_REPLACEMENT
spt_init(argc, argv);
#endif
setlocale(LC_COLLATE,"");
tzset(); /* Populates 'timezone' global. */
// Install the out-of-memory handler (per the original note it mainly
// writes to the log -- confirm in redisOutOfMemoryHandler).
zmalloc_set_oom_handler(redisOutOfMemoryHandler);
srand(time(NULL)^getpid());
gettimeofday(&tv,NULL);

/* Seed the dict hash function with 16 random hex characters. */
char hashseed[16];
getRandomHexChars(hashseed,sizeof(hashseed));
dictSetHashFunctionSeed((uint8_t*)hashseed);
// Decide whether we are running in Sentinel mode.
server.sentinel_mode = checkForSentinelMode(argc,argv);
// Initialize the server configuration (presumably to its built-in
// defaults -- confirm in initServerConfig).
initServerConfig();
moduleInitModulesSystem();

/* Store the executable path and arguments in a safe place in order
* to be able to restart the server later. */
server.executable = getAbsolutePath(argv[0]);
server.exec_argv = zmalloc(sizeof(char*)*(argc+1));
server.exec_argv[argc] = NULL;
for (j = 0; j < argc; j++) server.exec_argv[j] = zstrdup(argv[j]);

/* We need to init sentinel right now as parsing the configuration file
* in sentinel mode will have the effect of populating the sentinel
* data structures with master nodes to monitor. */
if (server.sentinel_mode) {
// In Sentinel mode, initialize the Sentinel configuration and state.
initSentinelConfig();
initSentinel();
}

// From here on the program name and arguments are inspected to decide
// whether this is a normal server start or one of the other commands.
/* Check if we need to start in redis-check-rdb/aof mode. We just execute
* the program main. However the program is part of the Redis executable
* so that we can easily execute an RDB check on loading errors. */
// argv[0] is the name the program was invoked as.
// Invoked as redis-check-rdb, the RDB file checking tool?
if (strstr(argv[0],"redis-check-rdb") != NULL)
redis_check_rdb_main(argc,argv,NULL);
// Invoked as redis-check-aof, the AOF file repair tool?
else if (strstr(argv[0],"redis-check-aof") != NULL)
redis_check_aof_main(argc,argv);

// Process the arguments, starting with the options that only print
// information or run a self test.
if (argc >= 2) {
j = 1; /* First option to parse in argv[] */
sds options = sdsempty();
char *configfile = NULL;

/* Handle special options --help and --version */
if (strcmp(argv[1], "-v") == 0 ||
strcmp(argv[1], "--version") == 0) version();
if (strcmp(argv[1], "--help") == 0 ||
strcmp(argv[1], "-h") == 0) usage();
if (strcmp(argv[1], "--test-memory") == 0) {
if (argc == 3) {
// NOTE(review): atoi() gives no way to detect a non-numeric argument.
memtest(atoi(argv[2]),50);
exit(0);
} else {
fprintf(stderr,"Please specify the amount of memory to test in megabytes.\n");
fprintf(stderr,"Example: ./redis-server --test-memory 4096\n\n");
exit(1);
}
}

// Pick up the configuration file path, if one was given.
/* First argument is the config file name? */
if (argv[j][0] != '-' || argv[j][1] != '-') {
configfile = argv[j];
server.configfile = getAbsolutePath(configfile);
/* Replace the config file in server.exec_argv with
* its absolute path. */
zfree(server.exec_argv[j]);
server.exec_argv[j] = zstrdup(server.configfile);
j++;
}

/* All the other options are parsed and conceptually appended to the
* configuration file. For instance --port 6380 will generate the
* string "port 6380\n" to be parsed after the actual file name
* is parsed, if any. */
while(j != argc) {
if (argv[j][0] == '-' && argv[j][1] == '-') {
/* Option name */
if (!strcmp(argv[j], "--check-rdb")) {
/* Argument has no options, need to skip for parsing. */
j++;
continue;
}
if (sdslen(options)) options = sdscat(options,"\n");
options = sdscat(options,argv[j]+2);
options = sdscat(options," ");
} else {
/* Option argument */
options = sdscatrepr(options,argv[j],strlen(argv[j]));
options = sdscat(options," ");
}
j++;
}
/* A config file name starting with '-' is treated as "read from STDIN",
* which Sentinel refuses (see the log messages below). */
if (server.sentinel_mode && configfile && *configfile == '-') {
serverLog(LL_WARNING,
"Sentinel config from STDIN not allowed.");
serverLog(LL_WARNING,
"Sentinel needs config file on disk to save state. Exiting...");
exit(1);
}
resetServerSaveParams();
loadServerConfig(configfile,options);
sdsfree(options);
}

serverLog(LL_WARNING, "oO0OoO0OoO0Oo Redis is starting oO0OoO0OoO0Oo");
serverLog(LL_WARNING,
"Redis version=%s, bits=%d, commit=%s, modified=%d, pid=%d, just started",
REDIS_VERSION,
(sizeof(long) == 8) ? 64 : 32,
redisGitSHA1(),
strtol(redisGitDirty(),NULL,10) > 0,
(int)getpid());

if (argc == 1) {
serverLog(LL_WARNING, "Warning: no config file specified, using the default config. In order to specify a config file use %s /path/to/%s.conf", argv[0], server.sentinel_mode ? "sentinel" : "redis");
} else {
serverLog(LL_WARNING, "Configuration loaded");
}

// Determine whether the process is supervised (by upstart or systemd,
// per the original note).
server.supervised = redisIsSupervised(server.supervised_mode);
int background = server.daemonize && !server.supervised;
// Detach into the background only when daemonize is set and we are not
// supervised.
if (background) daemonize();

// Initialize the server.
initServer();
if (background || server.pidfile) createPidFile();
redisSetProcTitle(argv[0]);
redisAsciiArt();
checkTcpBacklogSettings();

// Log warnings/notices and finish the mode-specific startup work.
if (!server.sentinel_mode) {
/* Things not needed when running in Sentinel mode. */
serverLog(LL_WARNING,"Server initialized");
#ifdef __linux__
linuxMemoryWarnings();
#endif
moduleLoadFromQueue();
loadDataFromDisk();
if (server.cluster_enabled) {
if (verifyClusterConfigWithData() == C_ERR) {
serverLog(LL_WARNING,
"You can't have keys in a DB different than DB 0 when in "
"Cluster mode. Exiting.");
exit(1);
}
}
if (server.ipfd_count > 0)
serverLog(LL_NOTICE,"Ready to accept connections");
if (server.sofd > 0)
serverLog(LL_NOTICE,"The server is now ready to accept connections at %s", server.unixsocket);
} else {
sentinelIsRunning();
}

/* Warning the user about suspicious maxmemory setting. */
if (server.maxmemory > 0 && server.maxmemory < 1024*1024) {
serverLog(LL_WARNING,"WARNING: You specified a maxmemory value that is less than 1MB (current value is %llu bytes). Are you sure this is what you really want?", server.maxmemory);
}

// Register the hook run before each event-loop wait.
aeSetBeforeSleepProc(server.el,beforeSleep);
// Register the hook run after each event-loop wait.
aeSetAfterSleepProc(server.el,afterSleep);
// Enter the event loop; the lines below run only once it returns.
aeMain(server.el);
aeDeleteEventLoop(server.el);
return 0;
}

整个启动的顺序大概是:

  1. 设置部分全局变量,比如hash种子等。
  2. 解析命令和命令行参数,如果执行的不是redis-server,或者是redis-server的其他(比如--version等)命令,则执行对应命令后退出。
  3. 准备启动服务,调用initServer()初始化服务,会开始监听对应的端口,和把要监控的时间/文件事件添加到EventLoop中。
  4. 设置EventLoop的 事件循环等待前的事件处理 和 事件循环等待后的事件处理,然后进入aeMain()事件的监听和循环,在Linux上默认的IO多路复用是epoll。

其中ae.c是封装好的事件循环调用,而ae_epoll.c、ae_select.c等则是封装epoll和select等的代码,使它们都有一个统一的接口。

服务器初始化initServer()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
// server.c

/* Initialize the running server: install signal handling, set up the
 * runtime fields of the global `server` struct, create the event loop,
 * open the listening sockets, create the databases, register the timer and
 * file events, open the AOF if enabled, and start the remaining subsystems.
 *
 * Takes no arguments and returns nothing. Unrecoverable failures terminate
 * the process through exit(1) or serverPanic(). */
void initServer(void) {
int j;

// Signal handling: SIGHUP and SIGPIPE are ignored, the remaining handlers
// are installed by setupSignalHandlers().
signal(SIGHUP, SIG_IGN);
signal(SIGPIPE, SIG_IGN);
setupSignalHandlers();

if (server.syslog_enabled) {
openlog(server.syslog_ident, LOG_PID | LOG_NDELAY | LOG_NOWAIT,
server.syslog_facility);
}

// Initialize part of the fields of the global `server` struct.
server.hz = server.config_hz;
server.pid = getpid();// process id
server.current_client = NULL;
server.clients = listCreate();
server.clients_index = raxNew();
server.clients_to_close = listCreate();
server.slaves = listCreate();
server.monitors = listCreate();
server.clients_pending_write = listCreate();
server.slaveseldb = -1; /* Force to emit the first SELECT command. */
server.unblocked_clients = listCreate();
server.ready_keys = listCreate();
server.clients_waiting_acks = listCreate();
server.get_ack_from_slaves = 0;
server.clients_paused = 0;
server.system_memory_size = zmalloc_get_memory_size();

// Create the shared objects (per the original note, mostly SDS-based
// objects).
createSharedObjects();
// Adjust the open-file limit (sockets, pipes, ...) -- per the original
// note, based on settings such as the maximum number of clients.
adjustOpenFilesLimit();
// Create the event loop.
server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
if (server.el == NULL) {
serverLog(LL_WARNING,
"Failed creating the event loop. Error message: '%s'",
strerror(errno));
exit(1);
}
server.db = zmalloc(sizeof(redisDb)*server.dbnum);

// Listen on the TCP port (skipped when server.port is 0).
/* Open the TCP listening socket for the user commands. */
if (server.port != 0 &&
listenToPort(server.port,server.ipfd,&server.ipfd_count) == C_ERR)
exit(1);

// Listen on the Unix socket, if one is configured.
/* Open the listening Unix domain socket. */
if (server.unixsocket != NULL) {
unlink(server.unixsocket); /* don't care if this fails */
server.sofd = anetUnixServer(server.neterr,server.unixsocket,
server.unixsocketperm, server.tcp_backlog);
if (server.sofd == ANET_ERR) {
serverLog(LL_WARNING, "Opening Unix socket: %s", server.neterr);
exit(1);
}
anetNonBlock(NULL,server.sofd);
}

// Make sure at least one listening socket was opened.
/* Abort if there are no listening sockets at all. */
if (server.ipfd_count == 0 && server.sofd < 0) {
serverLog(LL_WARNING, "Configured to not listen anywhere, exiting.");
exit(1);
}

/* Create the Redis databases, and initialize other internal state. */
// Initialize the server.dbnum databases (16 by default, per the original
// note).
for (j = 0; j < server.dbnum; j++) {
server.db[j].dict = dictCreate(&dbDictType,NULL);
server.db[j].expires = dictCreate(&keyptrDictType,NULL);
server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL);
server.db[j].ready_keys = dictCreate(&objectKeyPointerValueDictType,NULL);
server.db[j].watched_keys = dictCreate(&keylistDictType,NULL);
server.db[j].id = j;
server.db[j].avg_ttl = 0;
server.db[j].defrag_later = listCreate();
}
evictionPoolAlloc(); /* Initialize the LRU keys pool. */
server.pubsub_channels = dictCreate(&keylistDictType,NULL);
server.pubsub_patterns = listCreate();
listSetFreeMethod(server.pubsub_patterns,freePubsubPattern);
listSetMatchMethod(server.pubsub_patterns,listMatchPubsubPattern);
server.cronloops = 0;
server.rdb_child_pid = -1;
server.aof_child_pid = -1;
server.rdb_child_type = RDB_CHILD_TYPE_NONE;
server.rdb_bgsave_scheduled = 0;
server.child_info_pipe[0] = -1;
server.child_info_pipe[1] = -1;
server.child_info_data.magic = 0;
aofRewriteBufferReset();
server.aof_buf = sdsempty();
server.lastsave = time(NULL); /* At startup we consider the DB saved. */
server.lastbgsave_try = 0; /* At startup we never tried to BGSAVE. */
server.rdb_save_time_last = -1;
server.rdb_save_time_start = -1;
server.dirty = 0;
resetServerStats();
/* A few stats we don't want to reset: server startup time, and peak mem. */
server.stat_starttime = time(NULL);
server.stat_peak_memory = 0;
server.stat_rdb_cow_bytes = 0;
server.stat_aof_cow_bytes = 0;
server.cron_malloc_stats.zmalloc_used = 0;
server.cron_malloc_stats.process_rss = 0;
server.cron_malloc_stats.allocator_allocated = 0;
server.cron_malloc_stats.allocator_active = 0;
server.cron_malloc_stats.allocator_resident = 0;
server.lastbgsave_status = C_OK;
server.aof_last_write_status = C_OK;
server.aof_last_write_errno = 0;
server.repl_good_slaves_count = 0;

/* Create the timer callback, this is our way to process many background
* operations incrementally, like clients timeout, eviction of unaccessed
* expired keys and so forth. */
// Timer event for serverCron. Per the original note the value 1 is a
// delay in milliseconds, chosen so that serverCron runs as soon as the
// event loop has time or its wait times out -- confirm against
// aeCreateTimeEvent.
if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
serverPanic("Can't create event loop timers.");
exit(1);
}

// Register a readable event with the accept callback on every listening
// TCP socket fd.
/* Create an event handler for accepting new connections in TCP and Unix
* domain sockets. */
for (j = 0; j < server.ipfd_count; j++) {
if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
acceptTcpHandler,NULL) == AE_ERR)
{
serverPanic(
"Unrecoverable error creating server.ipfd file event.");
}
}
// Same registration for the Unix socket fd.
// NOTE(review): the "no listening sockets" check above tests
// server.sofd < 0 while this one tests server.sofd > 0, so an fd equal to
// 0 would pass the first check but never be registered -- confirm intent.
if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event.");


// Event callback for the module blocked-client pipe.
/* Register a readable event for the pipe used to awake the event loop
* when a blocked client in a module needs attention. */
if (aeCreateFileEvent(server.el, server.module_blocked_pipe[0], AE_READABLE,
moduleBlockedClientPipeReadable,NULL) == AE_ERR) {
serverPanic(
"Error registering the readable event for the module "
"blocked clients subsystem.");
}

// Open the AOF when it is enabled.
/* Open the AOF file if needed. */
if (server.aof_state == AOF_ON) {
server.aof_fd = open(server.aof_filename,
O_WRONLY|O_APPEND|O_CREAT,0644);
if (server.aof_fd == -1) {
serverLog(LL_WARNING, "Can't open the append-only file: %s",
strerror(errno));
exit(1);
}
}

// Memory cap for 32-bit builds: 3 GB when no limit was configured.
/* 32 bit instances are limited to 4GB of address space, so if there is
* no explicit limit in the user provided configuration we set a limit
* at 3 GB using maxmemory with 'noeviction' policy'. This avoids
* useless crashes of the Redis instance for out of memory. */
if (server.arch_bits == 32 && server.maxmemory == 0) {
serverLog(LL_WARNING,"Warning: 32 bit instance detected but no memory limit set. Setting 3 GB maxmemory limit with 'noeviction' policy now.");
server.maxmemory = 3072LL*(1024*1024); /* 3 GB */
server.maxmemory_policy = MAXMEMORY_NO_EVICTION;
}

// Initialize cluster support when it is enabled, then the remaining
// subsystems.
if (server.cluster_enabled) clusterInit();
replicationScriptCacheInit();
scriptingInit(1);
slowlogInit();
latencyMonitorInit();
bioInit();
/* Record the memory in use once initialization is complete. */
server.initial_memory_usage = zmalloc_used_memory();
}

initServer()主要做了:

  • 获取系统部分限制,比如文件最大打开数和32位系统下限制最大3G内存使用。
  • 开始监听对应的TCP socket、Unix socket等,并将它们加入到EventLoop中监听读事件。
  • serverCron函数被加入到时间事件中。
  • 初始化其他各种功能。