Redis源码阅读笔记-各命令的执行(编写中)

由上两篇Redis源码阅读笔记-命令的接收和执行过程(一) Redis源码阅读笔记-命令的接收和执行过程(二) 可以知道,大部分命令是从server.c文件中的int processCommand(client *c)函数,通过lookupCommand()获的结构体为redisCommand的命令的。其实这是通过命令名在server.commands中来查看的。

Redis的命令列表是保存在全局变量server.commands中的,而其初始化是在

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// server.c
void initServerConfig(void) {
......
server.commands = dictCreate(&commandTableDictType,NULL);
server.orig_commands = dictCreate(&commandTableDictType,NULL);
populateCommandTable();
server.delCommand = lookupCommandByCString("del");
server.multiCommand = lookupCommandByCString("multi");
server.lpushCommand = lookupCommandByCString("lpush");
server.lpopCommand = lookupCommandByCString("lpop");
server.rpopCommand = lookupCommandByCString("rpop");
server.sremCommand = lookupCommandByCString("srem");
server.execCommand = lookupCommandByCString("exec");
server.expireCommand = lookupCommandByCString("expire");
server.pexpireCommand = lookupCommandByCString("pexpire");
......
}

而其中,populateCommandTable()是给server.commandsserver.orig_commands初始化值:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// server.c
/* Populates the Redis Command Table starting from the hard coded list
* we have on top of redis.c file. */
void populateCommandTable(void) {
int j;
int numcommands = sizeof(redisCommandTable)/sizeof(struct redisCommand);

for (j = 0; j < numcommands; j++) {
struct redisCommand *c = redisCommandTable+j;
char *f = c->sflags;
int retval1, retval2;

// 区分命令的类型
while(*f != '\0') {
switch(*f) {
case 'w': c->flags |= CMD_WRITE; break;
case 'r': c->flags |= CMD_READONLY; break;
case 'm': c->flags |= CMD_DENYOOM; break;
case 'a': c->flags |= CMD_ADMIN; break;
case 'p': c->flags |= CMD_PUBSUB; break;
case 's': c->flags |= CMD_NOSCRIPT; break;
case 'R': c->flags |= CMD_RANDOM; break;
case 'S': c->flags |= CMD_SORT_FOR_SCRIPT; break;
case 'l': c->flags |= CMD_LOADING; break;
case 't': c->flags |= CMD_STALE; break;
case 'M': c->flags |= CMD_SKIP_MONITOR; break;
case 'k': c->flags |= CMD_ASKING; break;
case 'F': c->flags |= CMD_FAST; break;
default: serverPanic("Unsupported command flag"); break;
}
f++;
}

retval1 = dictAdd(server.commands, sdsnew(c->name), c);
/* Populate an additional dictionary that will be unaffected
* by rename-command statements in redis.conf. */
retval2 = dictAdd(server.orig_commands, sdsnew(c->name), c);
serverAssert(retval1 == DICT_OK && retval2 == DICT_OK);
}
}

redisCommandTable则是在server.c文件中的一个全局变量,保存着命令字符串和操作函数。

redisCommand结构体

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
//server.h 

struct redisCommand {
char *name;
redisCommandProc *proc;
int arity;
char *sflags; /* Flags as string representation, one char per flag. */
int flags; /* The actual flags, obtained from the 'sflags' field. */
/* Use a function to determine keys arguments in a command line.
* Used for Redis Cluster redirect. */
redisGetKeysProc *getkeys_proc;
/* What keys should be loaded in background when calling this command? */
int firstkey; /* The first argument that's a key (0 = no keys) */
int lastkey; /* The last argument that's a key */
int keystep; /* The step between first and last key */
long long microseconds, calls;
};
  • name: 是命令的名字。
  • proc: 是命令的操作函数的指针,指向这个命令的具体操作。
  • arity: 命令所需参数的数量。
  • sflags: 命令标识的字符串形式。
    • w: 写命令。
    • r: 读命令。
    • m: 当调用时可能会增加内存的使用。(当超过内存限制时不允许执行)
    • a: 管理员权限的命令,etc: SAVESHUTDOWN
    • p: Pub/Sub相关的命令。
    • f: 无论server.dirty的值如何,都强制执行。
    • s: 不允许在Lua脚本中执行此函数。
    • R: 随机类型的命令,这表示命令是不幂等的。意思是,同一个命令,具有相同的参数和键空间,可能会有不同的结果。比如SPOPRANDOMKEY
    • S: 表示如果命令从脚本调用,会将输出的数组排序,使得结果具有幂等性。
    • l: 表示允许该命令在加载数据库时执行。
    • t: 表示允许命令在“从”服务器中有过时数据时执行。这个命令类型是很少的。
    • M: 表示命令不会自动在MONITOR上传播。
    • k: 表示会对该命令隐式得执行ASKING这会使得在集群模式下,slot会被标记为importing
    • F: 快速执行的命令(一般是时间复杂度为O(n)O(log(n))的命令)。只要内核调度给予时间则都不应该被延迟执行。PS:对于SET来说,由于可能会触发DEL命令,所以不是快速命令。
  • flags: 命令表示的比特位形式。
  • getkeys_proc: 指向一个帮助在命令行中获取”键”参数的函数。(可选的,仅当下面3个参数不足以指定哪些是“键”)。
  • firstkey: 表明第一个参数是“键”。
  • lastkey: 表明最后一个参数是”键”。
  • keystep: “键”参数的间隔数,比如MSET中参数形式为key, val, key, val

命令的执行

MODULE模块化命令

Reids 4.0 中加入的功能,可以通过外部模块对Redis进行功能扩展。

  • sflagsas,表示其为管理员权限的命令,而且不允许被Lua脚本调用。

命令形式

  • MODULE LOAD /path/to/mymodule.so表示加载模块。
  • MODULE LIST表示列出已经加载的模块。
  • MODULE UNLOAD mymodule表示卸载模块。

内部实现函数

打算迟点写一下module模块的实现,所以更详细的迟点再写。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
// module.c

/* Redis MODULE command.
*
* MODULE LOAD <path> [args...] */
void moduleCommand(client *c) {
char *subcmd = c->argv[1]->ptr;

if (!strcasecmp(subcmd,"load") && c->argc >= 3) {
// 调用 `MODULE LOAD /path/to/mymodule.so`
robj **argv = NULL;
int argc = 0;

if (c->argc > 3) {
argc = c->argc - 3;
argv = &c->argv[3];
}

// 真正加载模块的函数,该函数工作顺序:
// 1. 通过`dlopen()`加载动态链接库
// 2. 通过`dlsym()`获得 RedisModule_OnLoad 的加载函数 onload
// 3. 调用 onload 加载
// 4. 加载成功,以模块名字为Key,保存入全局字典变量modules中
if (moduleLoad(c->argv[2]->ptr,(void **)argv,argc) == C_OK)
addReply(c,shared.ok);
else
addReplyError(c,
"Error loading the extension. Please check the server logs.");
} else if (!strcasecmp(subcmd,"unload") && c->argc == 3) {
// 调用 `MODULE UNLOAD mymodule`
// 真正卸载模块的函数,该函数工作顺序:
// 1. 在注销模块注册的命令
// 2. 注销所有此模块的消息订阅
// 3. 通过`dlclose()`卸载模块
// 4. 在全局字典变量modules删除模块
if (moduleUnload(c->argv[2]->ptr) == C_OK)
addReply(c,shared.ok);
else {
char *errmsg;
switch(errno) {
case ENOENT:
errmsg = "no such module with that name";
break;
case EBUSY:
errmsg = "the module exports one or more module-side data types, can't unload";
break;
default:
errmsg = "operation not possible.";
break;
}
addReplyErrorFormat(c,"Error unloading module: %s",errmsg);
}
} else if (!strcasecmp(subcmd,"list") && c->argc == 2) {
// 调用`MODULE LIST`
dictIterator *di = dictGetIterator(modules);
dictEntry *de;

// 遍历全局字典表量modules, 获得其中的模块名
addReplyMultiBulkLen(c,dictSize(modules));
while ((de = dictNext(di)) != NULL) {
sds name = dictGetKey(de);
struct RedisModule *module = dictGetVal(de);
addReplyMultiBulkLen(c,4);
addReplyBulkCString(c,"name");
addReplyBulkCBuffer(c,name,sdslen(name));
addReplyBulkCString(c,"ver");
addReplyLongLong(c,module->ver);
}
dictReleaseIterator(di);
} else {
addReply(c,shared.syntaxerr);
}
}

GET命令

返回与键 key 相关联的字符串值。

如果键key不存在,那么返回特殊值nil;否则,返回键key的值。

如果键key的值并非字符串类型,那么返回一个错误,因为GET命令只能用于字符串值。

  • sflagsrF,表示是读命令,且快速执行,不应该被延迟。

命令形式

1
GET {key name}

内部实现函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// t_string.c
void getCommand(client *c) {
getGenericCommand(c);
}

int getGenericCommand(client *c) {
robj *o;

// c->argv[1]为命令中的 key,`shared.nullbulk`是返回的默认值 `nil`
if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk)) == NULL)
return C_OK;

// 判断返回的值得格式,只允许字符串格式
if (o->type != OBJ_STRING) {
addReply(c,shared.wrongtypeerr);
return C_ERR;
} else {
addReplyBulk(c,o);
return C_OK;
}
}

// db.c
robj *lookupKeyReadOrReply(client *c, robj *key, robj *reply) {
robj *o = lookupKeyRead(c->db, key);
if (!o) addReply(c,reply);
return o;
}

调用流程:

  1. getCommand()在函数内调用getGenericCommand()
  2. getGenericCommand(),在函数内调用lookupKeyReadOrReply(),并设置null为默认值。
  3. lookupKeyReadOrReply()client->db中查找key,如果没有找到就返回默认值。

SET命令

将字符串值 value 关联到 key

如果 key 已经持有其他值, SET 就覆写旧值, 无视类型。

SET 命令对一个带有生存时间(TTL)的键进行设置之后, 该键原有的 TTL 将被清除。

  • sflagswm,写命令,并可能导致增加内存的使用。

命令形式

1
SET key value [NX] [XX] [EX <seconds>] [PX <milliseconds>]
  • EX seconds : 将键的过期时间设置为 seconds 秒。 执行 SET key value EX seconds 的效果等同于执行 SETEX key seconds value
  • PX milliseconds : 将键的过期时间设置为 milliseconds 毫秒。 执行 SET key value PX milliseconds 的效果等同于执行 PSETEX key milliseconds value
  • NX : 只在键不存在时, 才对键进行设置操作。 执行 SET key value NX 的效果等同于执行 SETNX key value
  • XX : 只在键已经存在时, 才对键进行设置操作。

内部实现函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
// t_string.c

/* SET key value [NX] [XX] [EX <seconds>] [PX <milliseconds>] */
void setCommand(client *c) {
int j;
robj *expire = NULL;
int unit = UNIT_SECONDS;
int flags = OBJ_SET_NO_FLAGS;

// 判断是否有 `EX` `PX` `NX` `XX`
// 并将其标志入flags中
// expire是保存过期时间
// unit 是保存过期时间的单位
for (j = 3; j < c->argc; j++) {
char *a = c->argv[j]->ptr;
robj *next = (j == c->argc-1) ? NULL : c->argv[j+1];

if ((a[0] == 'n' || a[0] == 'N') &&
(a[1] == 'x' || a[1] == 'X') && a[2] == '\0' &&
!(flags & OBJ_SET_XX))
{
flags |= OBJ_SET_NX;
} else if ((a[0] == 'x' || a[0] == 'X') &&
(a[1] == 'x' || a[1] == 'X') && a[2] == '\0' &&
!(flags & OBJ_SET_NX))
{
flags |= OBJ_SET_XX;
} else if ((a[0] == 'e' || a[0] == 'E') &&
(a[1] == 'x' || a[1] == 'X') && a[2] == '\0' &&
!(flags & OBJ_SET_PX) && next)
{
flags |= OBJ_SET_EX;
unit = UNIT_SECONDS;
expire = next;
j++;
} else if ((a[0] == 'p' || a[0] == 'P') &&
(a[1] == 'x' || a[1] == 'X') && a[2] == '\0' &&
!(flags & OBJ_SET_EX) && next)
{
flags |= OBJ_SET_PX;
unit = UNIT_MILLISECONDS;
expire = next;
j++;
} else {
addReply(c,shared.syntaxerr);
return;
}
}

// 将 value 编码
c->argv[2] = tryObjectEncoding(c->argv[2]);
setGenericCommand(c,flags,c->argv[1],c->argv[2],expire,unit,NULL,NULL);
}

void setGenericCommand(client *c, int flags, robj *key, robj *val, robj *expire, int unit, robj *ok_reply, robj *abort_reply) {
long long milliseconds = 0; /* initialized to avoid any harmness warning */

// 判断是否需要设置过期时间
if (expire) {
// 如果是需要设置过期时间,
// 则从expire中获取,
// 并将其转成 long long 类型保存入变量milliseconds中
if (getLongLongFromObjectOrReply(c, expire, &milliseconds, NULL) != C_OK)
return;
if (milliseconds <= 0) {
addReplyErrorFormat(c,"invalid expire time in %s",c->cmd->name);
return;
}
// 如果传入的单位是秒,则需要将变量milliseconds中的值转成毫秒
if (unit == UNIT_SECONDS) milliseconds *= 1000;
}

// 满足条件
// 1. 带NX参数,且DB中的key已经存在
// 或
// 2. 带XX参数,且DB中的key不存在
// 则返回空
if ((flags & OBJ_SET_NX && lookupKeyWrite(c->db,key) != NULL) ||
(flags & OBJ_SET_XX && lookupKeyWrite(c->db,key) == NULL))
{
addReply(c, abort_reply ? abort_reply : shared.nullbulk);
return;
}

// 将键值对添加进c->db中
setKey(c->db,key,val);
server.dirty++;
// 如果有过期时间,则会将过期时间和键值,保存到`c->db->expires`中
if (expire) setExpire(c,c->db,key,mstime()+milliseconds);
// 触发键空间的事件
notifyKeyspaceEvent(NOTIFY_STRING,"set",key,c->db->id);
if (expire) notifyKeyspaceEvent(NOTIFY_GENERIC,
"expire",key,c->db->id);
// 返回响应
addReply(c, ok_reply ? ok_reply : shared.ok);
}

SETNX命令

只在键 key 不存在的情况下, 将键 key 的值设置为 value

若键 key 已经存在, 则 SETNX 命令不做任何动作。

SETNX 是『SET if Not eXists』(如果不存在,则 SET)的简写。

  • sflagswmF,写命令,并可能导致增加内存的使用。快速执行,不应该被延迟。

PS: 可以用来实现分布式锁。

命令形式

1
SETNX key value

内部实现函数

1
2
3
4
5
// t_string.c
void setnxCommand(client *c) {
c->argv[2] = tryObjectEncoding(c->argv[2]);
setGenericCommand(c,OBJ_SET_NX,c->argv[1],c->argv[2],NULL,0,shared.cone,shared.czero);
}

实际上是调用setGenericCommand(),详见SET的代码解析。

SETEX命令

将键 key 的值设置为 value , 并将键 key 的生存时间设置为 seconds 秒钟。

如果键 key 已经存在, 那么 SETEX 命令将覆盖已有的值。

  • sflagswm,写命令,并可能导致增加内存的使用。

命令形式

1
SETEX key value seconds

内部实现函数

1
2
3
4
5
// t_string.c
void setexCommand(client *c) {
c->argv[3] = tryObjectEncoding(c->argv[3]);
setGenericCommand(c,OBJ_SET_NO_FLAGS,c->argv[1],c->argv[3],c->argv[2],UNIT_SECONDS,NULL,NULL);
}

实际上是调用setGenericCommand(),详见SET的代码解析。

PSETEX命令

SETEX 类似,差别在时间单位是毫秒。

  • sflagswm,写命令,并可能导致增加内存的使用。

命令形式

1
SETEX key value milliseconds

内部实现函数

1
2
3
4
5
// t_string.c
void psetexCommand(client *c) {
c->argv[3] = tryObjectEncoding(c->argv[3]);
setGenericCommand(c,OBJ_SET_NO_FLAGS,c->argv[1],c->argv[3],c->argv[2],UNIT_MILLISECONDS,NULL,NULL);
}

实际上是调用setGenericCommand(),详见SET的代码解析。

APPEND命令

如果键 key 已经存在并且它的值是一个字符串, APPEND 命令将把 value 追加到键 key 现有值的末尾。

如果 key 不存在, APPEND 就简单地将键 key 的值设为 value , 就像执行 SET key value 一样。

  • sflagswm,写命令,并可能导致增加内存的使用。

命令形式

1
APPEND key value

内部实现函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
// t_string.c
void appendCommand(client *c) {
// totlen 是返回的新值的长度
size_t totlen;
robj *o, *append;


// 根据键获取值
// lookupKeyWrite() 会检查键是否已经过期
o = lookupKeyWrite(c->db,c->argv[1]);
if (o == NULL) {
// 没有获取到对应键的值,则创建一个新键值对
/* Create the key */
c->argv[2] = tryObjectEncoding(c->argv[2]);
dbAdd(c->db,c->argv[1],c->argv[2]);
incrRefCount(c->argv[2]);
totlen = stringObjectLen(c->argv[2]);
} else {
/* Key exists, check type */
// 键存在,并检查值得类型
if (checkType(c,o,OBJ_STRING))
return;

/* "append" is an argument, so always an sds */
append = c->argv[2];
// 计算新字符串的长度值,并检查是否超过最大限制,默认为512MB
totlen = stringObjectLen(o)+sdslen(append->ptr);
if (checkStringLength(c,totlen) != C_OK)
return;

/* Append the value */
// 调用dbUnshareStringValue()
// dbUnshareStringValue() 会检查字符串的引用,
// 如果引用大于1,则函数会创建一个新的sds字符串返回,
// 并将引用减1
o = dbUnshareStringValue(c->db,c->argv[1],o);
// 追加新值
o->ptr = sdscatlen(o->ptr,append->ptr,sdslen(append->ptr));
totlen = sdslen(o->ptr);
}
signalModifiedKey(c->db,c->argv[1]);
// 触发事件
notifyKeyspaceEvent(NOTIFY_STRING,"append",c->argv[1],c->db->id);
server.dirty++;
addReplyLongLong(c,totlen);
}

STRLEN命令

返回键 key 储存的字符串值的长度。

当键 key 不存在时, 命令返回 0

key 储存的不是字符串值时, 返回一个错误。

  • sflagsrF,只读,快速执行的命令,不应该被延迟。

命令形式

1
STRLEN key

内部实现函数

1
2
3
4
5
6
7
8
9
// t_string.c
void strlenCommand(client *c) {
robj *o;
// 使用key(c->argv[1])以只读模式获取值
if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL ||
checkType(c,o,OBJ_STRING)) return;
// stringObjectLen()是sds获取字符串长度的方法
addReplyLongLong(c,stringObjectLen(o));
}

DEL命令

删除给定的一个或多个 key

  • sflagsw,写命令。

命令形式

1
DEL key [key ...]

内部实现函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// db.c

void delCommand(client *c) {
// 0表示非延迟删除
delGenericCommand(c,0);
}

/* This command implements DEL and LAZYDEL. */
void delGenericCommand(client *c, int lazy) {
int numdel = 0, j;

// DEL 命令可以接受多个key
for (j = 1; j < c->argc; j++) {
// 检查键是否过期
expireIfNeeded(c->db,c->argv[j]);
// 通过lazy判断删除是否是延迟删除
// 如果是延迟删除,则会调用异步删除
// 如果不是,则会同步删除
int deleted = lazy ? dbAsyncDelete(c->db,c->argv[j]) :
dbSyncDelete(c->db,c->argv[j]);
if (deleted) {
signalModifiedKey(c->db,c->argv[j]);
notifyKeyspaceEvent(NOTIFY_GENERIC,
"del",c->argv[j],c->db->id);
server.dirty++;
numdel++;
}
}
addReplyLongLong(c,numdel);
}
  • 将删除操作,封装进一个BIO任务(多线程操作)中。在EventLoop的阻塞期间执行(可以参考[事件和事件循环](http://blog.mingforpc.me/2018/12/18/Redis%E6%BA%90%E7%A0%81%E9%98%85%E8%AF%BB%E7%AC%94%E8%AE%B0-%E4%BA%8B%E4%BB%B6%E5%92%8C%E4%BA%8B%E4%BB%B6%E5%BE%AA%E7%8E%AF/))
    1
    2
    3
    4
    5
    6
    7
    8
    9
    * ```dbSyncDelete()```: 同步,直接`db`中删除键值对。

    ### `UNLINK`命令

    延迟删除给定的一个或多个 `key` 。4.0 中加入,解决大KEY删除的问题。

    * `sflags`:`wF`,写命令,快速执行,不应该被延迟。

    #### 命令形式

DEL key [key …]

1
2
3
4
5
6
7
8
9

#### 内部实现函数

```c
// db.c
void unlinkCommand(client *c) {
// 1 表示延迟删除
delGenericCommand(c,1);
}

详见DEL中的详解。

EXISTS命令

检查给定 key 是否存在。

  • sflagsrF,只读,快速执行命令,不应该被延迟。

命令形式

1
EXISTS key [key2... keyN]

内部实现函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// db.c
/* EXISTS key1 key2 ... key_N.
* Return value is the number of keys existing. */
void existsCommand(client *c) {
long long count = 0;
int j;

for (j = 1; j < c->argc; j++) {
// 在db(hash map)中检查key是否存在,
// 并且会检查key是否过期的
if (lookupKeyRead(c->db,c->argv[j])) count++;
}
addReplyLongLong(c,count);
}

SETBIT命令

key 所储存的字符串值,设置或清除指定偏移量上的位(bit)。

位的设置或清除取决于 value 参数,可以是 0 也可以是 1

key 不存在时,自动生成一个新的字符串值。

字符串会进行伸展(grown)以确保它可以将 value 保存在指定的偏移量上。当字符串值进行伸展时,空白位置以 0 填充。

offset 参数必须大于或等于 0 ,小于 2^32 (bit 映射被限制在 512 MB 之内)。

  • sflagswm,写命令,并且可能增加内存的使用。

命令形式

1
SETBIT key offset bitvalue

内部实现函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
// bitops.c

/* SETBIT key offset bitvalue */
void setbitCommand(client *c) {
robj *o;
char *err = "bit is not an integer or out of range";
size_t bitoffset;
ssize_t byte, bit;
int byteval, bitval;
long on;

// 获取offset,并赋值到变量bitoffset中
if (getBitOffsetFromArgument(c,c->argv[2],&bitoffset,0,0) != C_OK)
return;

// 获取bitvalue,并赋值到变量on中
if (getLongFromObjectOrReply(c,c->argv[3],&on,err) != C_OK)
return;


/* Bits can only be set or cleared... */
// ~1 = 0xF...FFE
// 所以, 如果 on & ~1 > 0
// 表示 on 不等于(1或者0)
if (on & ~1) {
addReplyError(c,err);
return;
}
// 专门给位运算实现的函数
// 当 key 保存的值 不是 字符串类型,则会返回NULL
// 如果 key 不存在,则会生产指定长度butoffset的的字符串
// 如果 key 已经存在,且字符串长度不够,则会扩展至指定长度butoffset
if ((o = lookupStringForBitCommand(c,bitoffset)) == NULL) return;

/* Get current values */
// bitoffset >> 3 表示左移3位,既整除8,按照byte大小运算
byte = bitoffset >> 3;
byteval = ((uint8_t*)o->ptr)[byte];
bit = 7 - (bitoffset & 0x7);
// bitval 是原本的值
bitval = byteval & (1 << bit);

/* Update byte with new bit value and return original value */
byteval &= ~(1 << bit);
byteval |= ((on & 0x1) << bit);
// 赋新值
((uint8_t*)o->ptr)[byte] = byteval;
// 消息通知
signalModifiedKey(c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_STRING,"setbit",c->argv[1],c->db->id);
server.dirty++;
addReply(c, bitval ? shared.cone : shared.czero);
}

GETBIT命令

key 所储存的字符串值,获取指定偏移量上的位(bit)。

offset 比字符串值的长度大,或者 key 不存在时,返回 0 。

  • sflagsrF,只读,快速执行命令,不应该被延迟。

命令形式

1
GETBIT key offset

内部实现函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// bitops.c

/* GETBIT key offset */
void getbitCommand(client *c) {
robj *o;
char llbuf[32];
size_t bitoffset;
size_t byte, bit;
size_t bitval = 0;

// 获取offset,并赋值到变量bitoffset中
if (getBitOffsetFromArgument(c,c->argv[2],&bitoffset,0,0) != C_OK)
return;

// 获取值并检查是否是sds字符串格式
if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL ||
checkType(c,o,OBJ_STRING)) return;

// 按byte计算偏移量
byte = bitoffset >> 3;
bit = 7 - (bitoffset & 0x7);
// 判断值o的sds保存格式,因为sds会有字符串格式和整数格式
if (sdsEncodedObject(o)) {
// 字符串格式
if (byte < sdslen(o->ptr))

bitval = ((uint8_t*)o->ptr)[byte] & (1 << bit);
} else {
// 整数格式
// 通过ll2string()将long long格式转为字节大小的数值表示
if (byte < (size_t)ll2string(llbuf,sizeof(llbuf),(long)o->ptr))
bitval = llbuf[byte] & (1 << bit);
}

addReply(c, bitval ? shared.cone : shared.czero);
}

关于Redis中字符串的数据结构和编码可以看对象及其类型和编码

BITFIELD命令

BITFIELD 命令可以将一个 Redis 字符串看作是一个由二进制位组成的数组, 并对这个数组中储存的长度不同的整数进行访问 (被储存的整数无需进行对齐)。 换句话说, 通过这个命令, 用户可以执行诸如 “对偏移量 1234 上的 5 位长有符号整数进行设置”、 “获取偏移量 4567 上的 31 位长无符号整数”等操作。 此外, BITFIELD 命令还可以对指定的整数执行加法操作和减法操作, 并且这些操作可以通过设置妥善地处理计算时出现的溢出情况。

BITFIELD 命令可以在一次调用中同时对多个位范围进行操作: 它接受一系列待执行的操作作为参数, 并返回一个数组作为回复, 数组中的每个元素就是对应操作的执行结果。

  • sflagswm,写命令,并且可能增加内存的使用。

命令形式

1
BITFIELD key subcommmand-1 arg ... subcommand-2 arg ... subcommand-N ...

子命令支持:

1
2
3
4
GET <type> <offset>
SET <type> <offset> <value>
INCRBY <type> <offset> <increment>
OVERFLOW [WRAP|SAT|FAIL]

type为指定的二进制范围,i表示有符号整数,u表示无符号整数。例如u8表示8位长的无符号整数,i16表示16位长的有符号整数。

  • GET <type> <offset>: 返回指定的二进制位范围。
  • SET <type> <offset> <value>: 对指定的二进制位范围进行设置,并返回旧值。
  • INCRBY <type> <offset> <increment>: 对指定的二进制位范围进行加法操作,并返回旧值。
  • OVERFLOW [WRAP|SAT|FAIL]: 溢出控制:
    • WRAP: 使用回绕(wrap around)方法处理有符号整数和无符号整数的溢出情况。 对于无符号整数来说, 回绕就像使用数值本身与能够被储存的最大无符号整数执行取模计算, 这也是 C 语言的标准行为。 对于有符号整数来说, 上溢将导致数字重新从最小的负数开始计算, 而下溢将导致数字重新从最大的正数开始计算。 比如说, 如果我们对一个值为 127i8 整数执行加1操作, 那么将得到结果 -128
    • SAT: 使用饱和计算(saturation arithmetic)方法处理溢出, 也即是说, 下溢计算的结果为最小的整数值, 而上溢计算的结果为最大的整数值。 举个例子, 如果我们对一个值为 120i8 整数执行加 10 计算, 那么命令的结果将为 i8 类型所能储存的最大整数值 127 。 与此相反, 如果一个针对 i8 值的计算造成了下溢, 那么这个 i8 值将被设置为 -127
    • FAIL : 在这一模式下, 命令将拒绝执行那些会导致上溢或者下溢情况出现的计算, 并向用户返回空值表示计算未被执行。

内部实现函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
// bittops.c

/* BITFIELD key subcommmand-1 arg ... subcommand-2 arg ... subcommand-N ...
*
* Supported subcommands:
*
* GET <type> <offset>
* SET <type> <offset> <value>
* INCRBY <type> <offset> <increment>
* OVERFLOW [WRAP|SAT|FAIL]
*/

// 子命令的结构体
struct bitfieldOp {
uint64_t offset; /* Bitfield offset. */
int64_t i64; /* Increment amount (INCRBY) or SET value */
int opcode; /* Operation id. */
int owtype; /* Overflow type to use. */
int bits; /* Integer bitfield bits width. */
int sign; /* True if signed, otherwise unsigned op. */
};

void bitfieldCommand(client *c) {
robj *o;
size_t bitoffset;
int j, numops = 0, changes = 0;
// 保存操作的数组
struct bitfieldOp *ops = NULL; /* Array of ops to execute at end. */
int owtype = BFOVERFLOW_WRAP; /* Overflow type. */
int readonly = 1;
size_t higest_write_offset = 0;

// 从命令中解析操作,保存到`ops`中
for (j = 2; j < c->argc; j++) {
// `remargs` 是用来判断参数是否足够的
int remargs = c->argc-j-1; /* Remaining args other than current. */
char *subcmd = c->argv[j]->ptr; /* Current command name. */
int opcode; /* Current operation code. */
long long i64 = 0; /* Signed SET value. */
int sign = 0; /* Signed or unsigned type? */
int bits = 0; /* Bitfield width in bits. */

if (!strcasecmp(subcmd,"get") && remargs >= 2)
// GET的子命令
opcode = BITFIELDOP_GET;
else if (!strcasecmp(subcmd,"set") && remargs >= 3)
// SET的子命令
opcode = BITFIELDOP_SET;
else if (!strcasecmp(subcmd,"incrby") && remargs >= 3)
// INCRBY的子命令
opcode = BITFIELDOP_INCRBY;
else if (!strcasecmp(subcmd,"overflow") && remargs >= 1) {
// 设置溢出处理
char *owtypename = c->argv[j+1]->ptr;
j++;
if (!strcasecmp(owtypename,"wrap"))
owtype = BFOVERFLOW_WRAP;
else if (!strcasecmp(owtypename,"sat"))
owtype = BFOVERFLOW_SAT;
else if (!strcasecmp(owtypename,"fail"))
owtype = BFOVERFLOW_FAIL;
else {
addReplyError(c,"Invalid OVERFLOW type specified");
zfree(ops);
return;
}
continue;
} else {
addReply(c,shared.syntaxerr);
zfree(ops);
return;
}

/* Get the type and offset arguments, common to all the ops. */
// getBitfieldTypeFromArgument() 获取命令行中的, `type`
// type 是有取值范围的
// 当是有符号整数时,支持 i1 < x < i64
// 当是无符号整数时,支持 u1 < x < u63
if (getBitfieldTypeFromArgument(c,c->argv[j+1],&sign,&bits) != C_OK) {
zfree(ops);
return;
}

// getBitOffsetFromArgument() 获取命令行中的, `offset`
// offset 存在大小限制,必须是 -1 < offset
// 而且,offset通过bit转换后,大小是要先知道 512*1024*1024 bytes(512MB)以内
if (getBitOffsetFromArgument(c,c->argv[j+2],&bitoffset,1,bits) != C_OK){
zfree(ops);
return;
}

// 除了 Get以外, SET 和 INCRBY 都有第3个参数
if (opcode != BITFIELDOP_GET) {
readonly = 0;
if (higest_write_offset < bitoffset + bits - 1)
higest_write_offset = bitoffset + bits - 1;
/* INCRBY and SET require another argument. */
if (getLongLongFromObjectOrReply(c,c->argv[j+3],&i64,NULL) != C_OK){
zfree(ops);
return;
}
}

/* Populate the array of operations we'll process. */
// 构成bitfieldOp结构体,保存入ops中
ops = zrealloc(ops,sizeof(*ops)*(numops+1));
ops[numops].offset = bitoffset;
ops[numops].i64 = i64;
ops[numops].opcode = opcode;
ops[numops].owtype = owtype;
ops[numops].bits = bits;
ops[numops].sign = sign;
numops++;

j += 3 - (opcode == BITFIELDOP_GET);
}

// readonly 标记为记录子命令中是否存在写的命令
if (readonly) {
/* Lookup for read is ok if key doesn't exit, but errors
* if it's not a string. */
o = lookupKeyRead(c->db,c->argv[1]);
if (o != NULL && checkType(c,o,OBJ_STRING)) return;
} else {
/* Lookup by making room up to the farest bit reached by
* this operation. */
if ((o = lookupStringForBitCommand(c,
higest_write_offset)) == NULL) return;
}

addReplyMultiBulkLen(c,numops);

/* Actually process the operations. */
for (j = 0; j < numops; j++) {
// 执行各个命令

struct bitfieldOp *thisop = ops+j;

/* Execute the operation. */
if (thisop->opcode == BITFIELDOP_SET ||
thisop->opcode == BITFIELDOP_INCRBY)
{
/* SET and INCRBY: We handle both with the same code path
* for simplicity. SET return value is the previous value so
* we need fetch & store as well. */

/* We need two different but very similar code paths for signed
* and unsigned operations, since the set of functions to get/set
* the integers and the used variables types are different. */
if (thisop->sign) {
// 有符号整数
int64_t oldval, newval, wrapped, retval;
int overflow;

// 获取旧值
oldval = getSignedBitfield(o->ptr,thisop->offset,
thisop->bits);

if (thisop->opcode == BITFIELDOP_INCRBY) {
// INCRBY
newval = oldval + thisop->i64;
// 检查Overflow
overflow = checkSignedBitfieldOverflow(oldval,
thisop->i64,thisop->bits,thisop->owtype,&wrapped);
if (overflow) newval = wrapped;
retval = newval;
} else {
// SET
newval = thisop->i64;
// 检查Overflow
overflow = checkSignedBitfieldOverflow(newval,
0,thisop->bits,thisop->owtype,&wrapped);
if (overflow) newval = wrapped;
retval = oldval;
}

/* On overflow of type is "FAIL", don't write and return
* NULL to signal the condition. */
if (!(overflow && thisop->owtype == BFOVERFLOW_FAIL)) {
addReplyLongLong(c,retval);
setSignedBitfield(o->ptr,thisop->offset,
thisop->bits,newval);
} else {
addReply(c,shared.nullbulk);
}
} else {
// 无符号整数
uint64_t oldval, newval, wrapped, retval;
int overflow;

// 获取旧值
oldval = getUnsignedBitfield(o->ptr,thisop->offset,
thisop->bits);

if (thisop->opcode == BITFIELDOP_INCRBY) {
// INCRBY
newval = oldval + thisop->i64;
overflow = checkUnsignedBitfieldOverflow(oldval,
thisop->i64,thisop->bits,thisop->owtype,&wrapped);
if (overflow) newval = wrapped;
retval = newval;
} else {
// SET
newval = thisop->i64;
overflow = checkUnsignedBitfieldOverflow(newval,
0,thisop->bits,thisop->owtype,&wrapped);
if (overflow) newval = wrapped;
retval = oldval;
}
/* On overflow of type is "FAIL", don't write and return
* NULL to signal the condition. */
if (!(overflow && thisop->owtype == BFOVERFLOW_FAIL)) {
addReplyLongLong(c,retval);
setUnsignedBitfield(o->ptr,thisop->offset,
thisop->bits,newval);
} else {
addReply(c,shared.nullbulk);
}
}
changes++;
} else {
/* GET */
unsigned char buf[9];
long strlen = 0;
unsigned char *src = NULL;
char llbuf[LONG_STR_SIZE];

if (o != NULL)
src = getObjectReadOnlyString(o,&strlen,llbuf);

/* For GET we use a trick: before executing the operation
* copy up to 9 bytes to a local buffer, so that we can easily
* execute up to 64 bit operations that are at actual string
* object boundaries. */
// 因为之前看,取值范围是有限制的,所以使用最多9个字节处理
memset(buf,0,9);
int i;
size_t byte = thisop->offset >> 3;
for (i = 0; i < 9; i++) {
if (src == NULL || i+byte >= (size_t)strlen) break;
buf[i] = src[i+byte];
}

/* Now operate on the copied buffer which is guaranteed
* to be zero-padded. */
if (thisop->sign) {
// 有符号整数
int64_t val = getSignedBitfield(buf,thisop->offset-(byte*8),
thisop->bits);
addReplyLongLong(c,val);
} else {
// 无符号整数
uint64_t val = getUnsignedBitfield(buf,thisop->offset-(byte*8),
thisop->bits);
addReplyLongLong(c,val);
}
}
}

if (changes) {
// 如果值改变了,则发送消息
signalModifiedKey(c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_STRING,"setbit",c->argv[1],c->db->id);
server.dirty += changes;
}
zfree(ops);
}

SETRANGE命令

从偏移量 offset 开始, 用 value 参数覆写(overwrite)键 key 储存的字符串值。

不存在的键 key 当作空白字符串处理。

SETRANGE 命令会确保字符串足够长以便将 value 设置到指定的偏移量上, 如果键 key 原来储存的字符串长度比偏移量小(比如字符串只有 5 个字符长,但你设置的 offset 是 10 ), 那么原字符和偏移量之间的空白将用零字节(zerobytes, "\x00" )进行填充。

  • sflagswm,写命令,并且可能增加内存的使用。

命令形式

1
SETRANGE key offset value

内部实现函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
// t_string.c
void setrangeCommand(client *c) {
robj *o;
long offset;
sds value = c->argv[3]->ptr;

// 获取offset
if (getLongFromObjectOrReply(c,c->argv[2],&offset,NULL) != C_OK)
return;

// offset必须大于等于0
if (offset < 0) {
addReplyError(c,"offset is out of range");
return;
}

// 在db中根据键寻找值
o = lookupKeyWrite(c->db,c->argv[1]);
if (o == NULL) {
/* Return 0 when setting nothing on a non-existing string */
if (sdslen(value) == 0) {
// 当键的值不存在,且value为空,则直接返回0
addReply(c,shared.czero);
return;
}

/* Return when the resulting string exceeds allowed size */
// 判断设置偏移后,总长度是否大于512MB(由于这个键不存在,所以这里的其实检查了offset的大小)
if (checkStringLength(c,offset+sdslen(value)) != C_OK)
return;

// 直接创建一个sds字符串,并加入db
o = createObject(OBJ_STRING,sdsnewlen(NULL, offset+sdslen(value)));
dbAdd(c->db,c->argv[1],o);
} else {
size_t olen;

// 检查值得蕾西,需要时string
/* Key exists, check type */
if (checkType(c,o,OBJ_STRING))
return;

/* Return existing string length when setting nothing */
olen = stringObjectLen(o);
if (sdslen(value) == 0) {
// value为空,表示不设置值,直接返回
addReplyLongLong(c,olen);
return;
}

/* Return when the resulting string exceeds allowed size */
// 判断设置偏移后,总长度是否大于512MB
if (checkStringLength(c,offset+sdslen(value)) != C_OK)
return;

/* Create a copy when the object is shared or encoded. */
// 保存值,dbUnshareStringValue()会处理redisObject的引用问题
o = dbUnshareStringValue(c->db,c->argv[1],o);
}

if (sdslen(value) > 0) {
// 发小键值变化的消息
o->ptr = sdsgrowzero(o->ptr,offset+sdslen(value));
memcpy((char*)o->ptr+offset,value,sdslen(value));
signalModifiedKey(c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_STRING,
"setrange",c->argv[1],c->db->id);
server.dirty++;
}
addReplyLongLong(c,sdslen(o->ptr));
}

GETRANGE命令

返回键 key 储存的字符串值的指定部分, 字符串的截取范围由 startend 两个偏移量决定 (包括 startend 在内)。

负数偏移量表示从字符串的末尾开始计数, -1 表示最后一个字符, -2 表示倒数第二个字符, 以此类推。

GETRANGE 通过保证子字符串的值域(range)不超过实际字符串的值域来处理超出范围的值域请求。

  • sflagsr,只读命令。

命令形式

1
GETRANGE key start end

内部实现函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
// t_string.c

void getrangeCommand(client *c) {
robj *o;
long long start, end;
char *str, llbuf[32];
size_t strlen;

// 从参数中获取 start
if (getLongLongFromObjectOrReply(c,c->argv[2],&start,NULL) != C_OK)
return;
// 从参数中获取 end
if (getLongLongFromObjectOrReply(c,c->argv[3],&end,NULL) != C_OK)
return;
// 根据 key 获取其值 o, 并检查数据类型是否为redis字符串类型
if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptybulk)) == NULL ||
checkType(c,o,OBJ_STRING)) return;

// 如果字符串中保存的是整型,则转成字符串形式
if (o->encoding == OBJ_ENCODING_INT) {
str = llbuf;
strlen = ll2string(llbuf,sizeof(llbuf),(long)o->ptr);
} else {
str = o->ptr;
strlen = sdslen(str);
}

// 对不正确的start,end进行处理
/* Convert negative indexes */
if (start < 0 && end < 0 && start > end) {
addReply(c,shared.emptybulk);
return;
}
// 对负数的(start, end)和越界的end进行处理
if (start < 0) start = strlen+start;
if (end < 0) end = strlen+end;
if (start < 0) start = 0;
if (end < 0) end = 0;
if ((unsigned long long)end >= strlen) end = strlen-1;

/* Precondition: end >= 0 && end < strlen, so the only condition where
* nothing can be returned is: start > end. */
if (start > end || strlen == 0) {
// start > end 的错误情况
// strlen == 0 的情况(PS:为什么这里不提前判断?提前判断了就不用做start和end的运算了。不过可能考虑的是代码的可读性,毕竟start和end的运行应该也花不了多少时间)
addReply(c,shared.emptybulk);
} else {
addReplyBulkCBuffer(c,(char*)str+start,end-start+1);
}
}

SUBSTR命令

在Redis 2.0 之前,SUBSTR其实是GETRANGE。现在的实现也是一样的,所以参照GETRANGE

  • sflagsr,只读命令。

INCR命令

为键 key 储存的数字值加上1

如果键 key 不存在, 那么它的值会先被初始化为 0 , 然后再执行 INCR 命令。

如果键 key 储存的值不能被解释为数字, 那么 INCR 命令将返回一个错误。

本操作的值限制在 64 位(bit)有符号数字表示之内。

  • sflagswmF,写命令,并可能导致增加内存的使用。快速执行,不应该被延迟。

命令形式

1
INCR key

内部实现函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
// t_string.c
void incrCommand(client *c) {
// 其实`INCR`,`INCRBY`,`DECR`,`DECRBY`都是调用的incrDecrCommand()函数
incrDecrCommand(c,1);
}

void incrDecrCommand(client *c, long long incr) {
long long value, oldvalue;
robj *o, *new;

// 查询键对应的值o
o = lookupKeyWrite(c->db,c->argv[1]);
// 检查值o的类型
if (o != NULL && checkType(c,o,OBJ_STRING)) return;
// 从o中获取整数值value(可能是sds的)
if (getLongLongFromObjectOrReply(c,o,&value,NULL) != C_OK) return;

// 判断相加后,是否会造成越界,不能超过LLONG_MIN,也不能小于LLONG_MAX
oldvalue = value;
if ((incr < 0 && oldvalue < 0 && incr < (LLONG_MIN-oldvalue)) ||
(incr > 0 && oldvalue > 0 && incr > (LLONG_MAX-oldvalue))) {
addReplyError(c,"increment or decrement would overflow");
return;
}
value += incr;

if (o && o->refcount == 1 && o->encoding == OBJ_ENCODING_INT &&
(value < 0 || value >= OBJ_SHARED_INTEGERS) &&
value >= LONG_MIN && value <= LONG_MAX)
{
// 这是判断 值 o 是存在的,而且不在 OBJ_SHARED_INTEGERS (Redis初始化的共享整数)范围内
// 而且没有其他键引用到o(o->refcount == 1)
// 所以可以直接修改o中的值
new = o;
o->ptr = (void*)((long)value);
} else {
// createStringObjectFromLongLong() 会判断值value是否在 OBJ_SHARED_INTEGERS 的范围内
// 如果在范围内,则直接引用共享的整数对象
// 如果不是,则创建新的对象
new = createStringObjectFromLongLong(value);
if (o) {
// 原本键值对已经存在,则覆盖
dbOverwrite(c->db,c->argv[1],new);
} else {
// 原本键值不存在,则添加
dbAdd(c->db,c->argv[1],new);
}
}
// 发送消息
signalModifiedKey(c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_STRING,"incrby",c->argv[1],c->db->id);
server.dirty++;
addReply(c,shared.colon);
addReply(c,new);
addReply(c,shared.crlf);
}

DECR命令

为键 key 储存的数字值减去1

如果键 key 不存在, 那么键 key 的值会先被初始化为 0 , 然后再执行 DECR 操作。

如果键 key 储存的值不能被解释为数字, 那么 DECR 命令将返回一个错误。

本操作的值限制在 64 位(bit)有符号数字表示之内。

  • sflagswmF,写命令,并可能导致增加内存的使用。快速执行,不应该被延迟。

命令形式

1
DECR key

内部实现函数

1
2
3
4
// t_string.c
void decrCommand(client *c) {
incrDecrCommand(c,-1);
}

incrDecrCommand()的解析参照INCR

MGET命令

返回所有(一个或多个)给定 key 的值。

如果给定的 key 里面,有某个 key 不存在,那么这个 key 返回特殊值 nil。因此,该命令永不失败。

  • sflagsrF,只读操作。快速执行,不应该被延迟。

命令形式

1
MGET key [key ...]

内部实现函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// t_string.c
void mgetCommand(client *c) {
int j;
// 返回客户端要执行的命令数量
addReplyMultiBulkLen(c,c->argc-1);
for (j = 1; j < c->argc; j++) {
// 在DB中查找对应的值,并返回
robj *o = lookupKeyRead(c->db,c->argv[j]);
if (o == NULL) {
addReply(c,shared.nullbulk);
} else {
if (o->type != OBJ_STRING) {
// 只返回字符串类型
addReply(c,shared.nullbulk);
} else {
addReplyBulk(c,o);
}
}
}
}

RPUSH命令

将一个或多个值 value 插入到列表 key 的表尾(最右边)。 。

如果 key 不存在,一个空列表会被创建并执行 RPUSH 操作。

key 存在但不是列表类型时,返回一个错误。

  • sflagswmF,写命令,并可能导致增加内存的使用。快速执行,不应该被延迟。

命令形式

1
RPUSH key value [value ...]

内部实现函数

1
2
3
4
5
// t_list.c
void rpushxCommand(client *c) {
// RPUSH 和 LPUSH公用的函数
pushxGenericCommand(c,LIST_TAIL);
}

RPUSHLPUSH只是插入位置不同,功能相同。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// t_list.c
void pushGenericCommand(client *c, int where) {
int j, pushed = 0;
robj *lobj = lookupKeyWrite(c->db,c->argv[1]);

// 判断键的类型
if (lobj && lobj->type != OBJ_LIST) {
addReply(c,shared.wrongtypeerr);
return;
}

for (j = 2; j < c->argc; j++) {
if (!lobj) {
// 如果本来是空键,则新建 quick list, 并添加到db中
lobj = createQuicklistObject();
quicklistSetOptions(lobj->ptr, server.list_max_ziplist_size,
server.list_compress_depth);
dbAdd(c->db,c->argv[1],lobj);
}
listTypePush(lobj,c->argv[j],where);
pushed++;
}
// 返回列表的长度
addReplyLongLong(c, (lobj ? listTypeLength(lobj) : 0));
if (pushed) {
// 如果有成功插入的,触发对应事件
char *event = (where == LIST_HEAD) ? "lpush" : "rpush";

signalModifiedKey(c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_LIST,event,c->argv[1],c->db->id);
}
server.dirty += pushed;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// t_list.c

/*-----------------------------------------------------------------------------
* List API
*----------------------------------------------------------------------------*/

/* The function pushes an element to the specified list object 'subject',
* at head or tail position as specified by 'where'.
*
* There is no need for the caller to increment the refcount of 'value' as
* the function takes care of it if needed. */
void listTypePush(robj *subject, robj *value, int where) {
// 新版本的Redis,已经将所有的列表实现换成 quick list
if (subject->encoding == OBJ_ENCODING_QUICKLIST) {
int pos = (where == LIST_HEAD) ? QUICKLIST_HEAD : QUICKLIST_TAIL;
// 对val进行decode, 因为value有可能是字符串或者整型
value = getDecodedObject(value);
size_t len = sdslen(value->ptr);
// quick list的插入方法
quicklistPush(subject->ptr, value->ptr, len, pos);
decrRefCount(value);
} else {
serverPanic("Unknown list encoding");
}
}

LPUSH命令

将一个或多个值 value 插入到列表 key 的表头(最左边)。 。

如果 key 不存在,一个空列表会被创建并执行 LPUSH 操作。

key 存在但不是列表类型时,返回一个错误。

  • sflagswmF,写命令,并可能导致增加内存的使用。快速执行,不应该被延迟。

命令形式

1
LPUSH key value [value ...]

内部实现函数

1
2
3
4
// t_list.c
void lpushxCommand(client *c) {
pushxGenericCommand(c,LIST_HEAD);
}

其他具体看RPUSH

RPUSHX命令

将值 value 插入到列表 key 的表尾,当且仅当 key 存在并且是一个列表。

RPUSH 命令相反,当 key 不存在时, RPUSHX 命令什么也不做。

  • sflagswmF,写命令,并可能导致增加内存的使用。快速执行,不应该被延迟。

命令形式

1
RPUSHX key value [value ...]

内部实现函数

1
2
3
4
5
// t_list.c
void rpushxCommand(client *c) {
// RPUSHX 和 LPUSHX 公用的函数
pushxGenericCommand(c,LIST_TAIL);
}

RPUSHXLPUSHX只是插入位置不同,功能相同。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// t_list.c
void pushxGenericCommand(client *c, int where) {
int j, pushed = 0;
robj *subject;

// 从db中获取指定的列表键的值,并判断是否为列表
if ((subject = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL ||
checkType(c,subject,OBJ_LIST)) return;

for (j = 2; j < c->argc; j++) {
// Push的方法
listTypePush(subject,c->argv[j],where);
pushed++;
}

// 返回列表的长度
addReplyLongLong(c,listTypeLength(subject));

if (pushed) {
// 如果有成功插入的,触发对应事件
char *event = (where == LIST_HEAD) ? "lpush" : "rpush";
signalModifiedKey(c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_LIST,event,c->argv[1],c->db->id);
}
server.dirty += pushed;
}
1
2
3
4
5
6
7
8
9
10

### `LPUSHX`命令

将值 `value` 插入到列表 `key` 的表头,**当且仅当** `key` 存在并且是一个列表。

和 `LPUSH` 命令相反,当 `key` 不存在时, `LPUSHX` 命令什么也不做。

* `sflags`:`wmF`,写命令,并可能导致增加内存的使用。快速执行,不应该被延迟。

#### 命令形式

LPUSHX key value [value …]

1
2
3
4
5
6
7
8

#### 内部实现函数

```c
// t_list.c
void lpushxCommand(client *c) {
pushxGenericCommand(c,LIST_HEAD);
}

具体参照RPUSHX

LINSERT命令

将值 value 插入到列表 key 当中,位于值 pivot 之前或之后。

pivot 不存在于列表 key 时,不执行任何操作。

key 不存在时, key 被视为空列表,不执行任何操作。

如果 key 不是列表类型,返回一个错误。

  • sflagswm,写命令,并可能导致增加内存的使用。

命令形式

1
LINSERT key BEFORE|AFTER pivot value

内部实现函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
// t_list.c
void linsertCommand(client *c) {
int where;
robj *subject;
listTypeIterator *iter;
listTypeEntry entry;
int inserted = 0;

// 判断是在指定位置前插入还是指定位置后插入
if (strcasecmp(c->argv[2]->ptr,"after") == 0) {
where = LIST_TAIL;
} else if (strcasecmp(c->argv[2]->ptr,"before") == 0) {
where = LIST_HEAD;
} else {
addReply(c,shared.syntaxerr);
return;
}

// 从db中取出对应的key,并判断数据类型是否为list
if ((subject = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL ||
checkType(c,subject,OBJ_LIST)) return;

/* Seek pivot from head to tail */
// 获取list的迭代器(封装了quick list的迭代)
iter = listTypeInitIterator(subject,0,LIST_TAIL);
while (listTypeNext(iter,&entry)) {
if (listTypeEqual(&entry,c->argv[3])) {
// 查找位置,并插入
listTypeInsert(&entry,c->argv[4],where);
inserted = 1;
break;
}
}
// 释放迭代器
listTypeReleaseIterator(iter);

if (inserted) {
// 如果有插入触发事件
signalModifiedKey(c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_LIST,"linsert",
c->argv[1],c->db->id);
server.dirty++;
} else {
/* Notify client of a failed insert */
addReply(c,shared.cnegone);
return;
}

// 返回列表长度
addReplyLongLong(c,listTypeLength(subject));
}

RPOP命令

移除并返回列表 key 的尾(最右)元素。

  • sflagswF,写命令。快速执行,不应该被延迟。

命令形式

1
RPOP key

内部实现函数

1
2
3
4
// t_list.c
void rpopCommand(client *c) {
popGenericCommand(c,LIST_TAIL);
}

RPOPLPOP只是位置不同,功能相同。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// t_list.c
void popGenericCommand(client *c, int where) {
// 查找对应的键, 并判断类型
robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk);
if (o == NULL || checkType(c,o,OBJ_LIST)) return;

// 尝试从尾取出值
robj *value = listTypePop(o,where);
if (value == NULL) {
// 没有值取出(长度为0,一般不会出现)
addReply(c,shared.nullbulk);
} else {
char *event = (where == LIST_HEAD) ? "lpop" : "rpop";
// 返回取出的值
addReplyBulk(c,value);
// 引用次数减1
decrRefCount(value);
// 触发对应事件
notifyKeyspaceEvent(NOTIFY_LIST,event,c->argv[1],c->db->id);
if (listTypeLength(o) == 0) {
// 如果取出值后,列表长度为0,则直接删除列表
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",
c->argv[1],c->db->id);
dbDelete(c->db,c->argv[1]);
}
signalModifiedKey(c->db,c->argv[1]);
server.dirty++;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// t_list.c
robj *listTypePop(robj *subject, int where) {
long long vlong;
robj *value = NULL;

int ql_where = where == LIST_HEAD ? QUICKLIST_HEAD : QUICKLIST_TAIL;
//判断类型, 现在列表的实现只剩下quick list
if (subject->encoding == OBJ_ENCODING_QUICKLIST) {
// quick list 的取出操作
if (quicklistPopCustom(subject->ptr, ql_where, (unsigned char **)&value,
NULL, &vlong, listPopSaver)) {
if (!value)
value = createStringObjectFromLongLong(vlong);
}
} else {
serverPanic("Unknown list encoding");
}
return value;
}

LPOP命令

移除并返回列表 key 的头(最左)元素。

key不存在时,返回nil

  • sflagswF,写命令。快速执行,不应该被延迟。

命令形式

1
LPOP key

内部实现函数

1
2
3
4
// t_list.c
void lpopCommand(client *c) {
popGenericCommand(c,LIST_HEAD);
}

RPOPLPOP只是位置不同,功能相同。关于popGenericCommand()的具体命令参考RPOP

BRPOP命令

它是 RPOP 命令的阻塞版本,当给定列表内没有任何元素可供弹出的时候,连接将被 BRPOP 命令阻塞,直到等待超时或发现可弹出元素为止。

当给定多个 key 参数时,按参数 key 的先后顺序依次检查各个列表,弹出第一个非空列表的尾部元素。

假如在指定时间内没有任何元素被弹出,则返回一个 nil等待时长
反之,返回一个含有两个元素的列表,第一个元素是被弹出元素所属的 key ,第二个元素是被弹出元素的

  • sflagsws,写命令,不允许在Lua脚本中执行此函数。

命令形式

1
BRPOP key [key ...] timeout

内部实现函数

1
2
3
4
// t_list.c
void brpopCommand(client *c) {
blockingPopGenericCommand(c,LIST_TAIL);
}

阻塞的pop操作统一通过blockingPopGenericCommand处理,只是位置不同。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
// t_list.c
/* Blocking RPOP/LPOP */
void blockingPopGenericCommand(client *c, int where) {
robj *o;
mstime_t timeout;
int j;

// 从命令参数中获取客户端设置的过期时间(c->argv[c->argc-1] 最后一个参数),转化成秒的形式
// 所以可以看出,BRPOP/BLPOP接受的最小单位是秒(SECONDS)
if (getTimeoutFromObjectOrReply(c,c->argv[c->argc-1],&timeout,UNIT_SECONDS)
!= C_OK) return;

// 循环处理指定的key
for (j = 1; j < c->argc-1; j++) {
// 从db中检查key是否存在
o = lookupKeyWrite(c->db,c->argv[j]);
if (o != NULL) {
// 检查key的数据类型是否为list
if (o->type != OBJ_LIST) {
addReply(c,shared.wrongtypeerr);
return;
} else {
if (listTypeLength(o) != 0) {
// 列表数据长度>0,所以可以直接取数据返回

/* Non empty list, this is like a non normal [LR]POP. */
char *event = (where == LIST_HEAD) ? "lpop" : "rpop";
robj *value = listTypePop(o,where);
serverAssert(value != NULL);

// 返回取出的key和val
addReplyMultiBulkLen(c,2);
addReplyBulk(c,c->argv[j]);
addReplyBulk(c,value);
// 因为已经pop出来了,所以引用次数减1
decrRefCount(value);

// 触发事件
notifyKeyspaceEvent(NOTIFY_LIST,event,
c->argv[j],c->db->id);
if (listTypeLength(o) == 0) {
// 判断取出后列表长度,如果已经为0,则删除列表
dbDelete(c->db,c->argv[j]);
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",
c->argv[j],c->db->id);
}
signalModifiedKey(c->db,c->argv[j]);
server.dirty++;

/* Replicate it as an [LR]POP instead of B[LR]POP. */
rewriteClientCommandVector(c,2,
(where == LIST_HEAD) ? shared.lpop : shared.rpop,
c->argv[j]);
return;
}
}
}
}

// 在事务(CLIENT_MULTI)中使用 BRPOP/BLPOP 没意义,所以把它当做无阻塞处理
/* If we are inside a MULTI/EXEC and the list is empty the only thing
* we can do is treating it as a timeout (even with timeout 0). */
if (c->flags & CLIENT_MULTI) {
addReply(c,shared.nullmultibulk);
return;
}

// 阻塞处理
/* If the list is empty or the key does not exists we must block */
blockForKeys(c, c->argv + 1, c->argc - 2, timeout, NULL);
}

blockForKeys()才是阻塞的开始。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// t_list.c
/* Set a client in blocking mode for the specified key, with the specified
* timeout */
void blockForKeys(client *c, robj **keys, int numkeys, mstime_t timeout, robj *target) {
dictEntry *de;
list *l;
int j;

c->bpop.timeout = timeout; // 为这个客户端连接(client)设置阻塞时间
c->bpop.target = target;

if (target != NULL) incrRefCount(target);

for (j = 0; j < numkeys; j++) {
// 循环处理每个key

// 为这个客户端连接(client)的bpop操作添加key
/* If the key already exists in the dict ignore it. */
if (dictAdd(c->bpop.keys,keys[j],NULL) != DICT_OK) continue;
incrRefCount(keys[j]);

// redis的db中,会维持中一个dict(blocking_keys)来保存被阻塞操作的key
// 下面则是尝试从这个db的blocking_keys中查找指定的key
/* And in the other "side", to map keys -> clients */
de = dictFind(c->db->blocking_keys,keys[j]);
if (de == NULL) {
int retval;

// 阻塞key没有在db中查找到,则添加
/* For every key we take a list of clients blocked for it */
l = listCreate();
retval = dictAdd(c->db->blocking_keys,keys[j],l);
incrRefCount(keys[j]);
serverAssertWithInfo(c,keys[j],retval == DICT_OK);
} else {
// 阻塞key查找到,从中获取list
l = dictGetVal(de);
}
// 将客户端连接保存进list
// 这个list, 是db中,将阻塞key和对应了客户端连接相关联的list,用来保存这个key被哪些客户端阻塞获取
listAddNodeTail(l,c);
}
// 阻塞客户端
blockClient(c,BLOCKED_LIST);
}

通过blockClient()将客户端连接阻塞。

对于阻塞连接的过期触发,会被server.c中的clientsCronHandleTimeout()来触发的。

至于key的变动触发,则是通过t_list.c中的handleClientsBlockedOnLists()来触发的,从注释中可以知道,每个单个命令后,Redis都会触发这个函数,或者在MULTI/EXECLua脚本后。

BLPOP命令

它是 LPOP 命令的阻塞版本,当给定列表内没有任何元素可供弹出的时候,连接将被 BLPOP 命令阻塞,直到等待超时或发现可弹出元素为止。

当给定多个 key 参数时,按参数 key 的先后顺序依次检查各个列表,弹出第一个非空列表的头部元素。

假如在指定时间内没有任何元素被弹出,则返回一个 nil等待时长
反之,返回一个含有两个元素的列表,第一个元素是被弹出元素所属的 key ,第二个元素是被弹出元素的

  • sflagsws,写命令,不允许在Lua脚本中执行此函数。

命令形式

1
BLPOP key [key ...] timeout

内部实现函数

1
2
3
void blpopCommand(client *c) {
blockingPopGenericCommand(c,LIST_HEAD);
}

与命令BRPOP只是元素弹出的位置不同,所以详细可以看BRPOP

BRPOPLPUSH命令

BRPOPLPUSHRPOPLPUSH 的阻塞版本,当给定列表 source 不为空时, BRPOPLPUSH 的表现和 RPOPLPUSH 一样。

命令 BRPOPLPUSH 执行以下两个动作:

  • 将列表 source 中的最后一个元素(尾元素)弹出,并返回给客户端。
  • source 弹出的元素插入到列表 destination ,作为 destination 列表的的头元素。

当列表 source 为空时, BRPOPLPUSH 命令将阻塞连接,直到等待超时,或有另一个客户端对 source 执行 LPUSHRPUSH 命令为止。

超时参数 timeout 接受一个以秒为单位的数字作为值。超时参数设为 0 表示阻塞时间可以无限期延长(block indefinitely) 。

假如在指定时间内没有任何元素被弹出,则返回一个 nil 和等待时长。
反之,返回一个含有两个元素的列表,第一个元素是被弹出元素的值,第二个元素是等待时长。

  • sflagswms,写命令,可能改变内存使用,不允许在Lua脚本中执行此函数。

命令形式

1
BRPOPLPUSH source destination timeout

内部实现函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// t_list.c
void brpoplpushCommand(client *c) {
mstime_t timeout;

// 从命令参数中解析出timeout秒数
if (getTimeoutFromObjectOrReply(c,c->argv[3],&timeout,UNIT_SECONDS)
!= C_OK) return;

// 取出source对应的key
robj *key = lookupKeyWrite(c->db, c->argv[1]);

if (key == NULL) {
// source对应的key不存在
if (c->flags & CLIENT_MULTI) {
// 事务不支持阻塞(Block)
/* Blocking against an empty list in a multi state
* returns immediately. */
addReply(c, shared.nullbulk);
} else {
// c->argv + 1 是 source, c->argv[2] 是 destination
/* The list is empty and the client blocks. */
blockForKeys(c, c->argv + 1, 1, timeout, c->argv[2]);
}
} else {
if (key->type != OBJ_LIST) {
addReply(c, shared.wrongtypeerr);
} else {
// source存在且有值(默认情况下, 没值的list类型会被删除)
/* The list exists and has elements, so
* the regular rpoplpushCommand is executed. */
serverAssertWithInfo(c,key,listTypeLength(key) > 0);
// 直接调用 rpoplpushCommand() 不需要阻塞
rpoplpushCommand(c);
}
}
}
  • 对于blockForKeys()可以参考BRPOP中的内容。
  • 对于rpoplpushCommand()可以参考RPOPLPUSH的内容。

LLEN命令

返回列表 key 的长度。

如果 key 不存在,则 key 被解释为一个空列表,返回 0 .

如果 key 不是列表类型,返回一个错误。

  • sflagsrF,只读操作。快速执行,不应该被延迟。

命令形式

1
LLEN key

内部实现函数

1
2
3
4
5
6
7
8
9
// t_list.c
void llenCommand(client *c) {
// 尝试在db中取出key, 并判断类型
robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.czero);
if (o == NULL || checkType(c,o,OBJ_LIST)) return;

// 获取list类型的长度(其实,list类型的实现只剩下quick list了)
addReplyLongLong(c,listTypeLength(o));
}
1
2
3
4
5
6
7
8
9
// t_list
// 获取list类型的对象长度(只剩下quick list)
unsigned long listTypeLength(const robj *subject) {
if (subject->encoding == OBJ_ENCODING_QUICKLIST) {
return quicklistCount(subject->ptr);
} else {
serverPanic("Unknown list encoding");
}
}

LINDEX命令

返回列表 key 中,下标为 index 的元素。

下标(index)参数 startstop 都以 0 为底,也就是说,以 0 表示列表的第一个元素,以 1 表示列表的第二个元素,以此类推。

你也可以使用负数下标,以 -1 表示列表的最后一个元素, -2 表示列表的倒数第二个元素,以此类推。

如果 key 不是列表类型,返回一个错误。

如果 index 参数的值不在列表的区间范围内(out of range),返回 nil

  • sflagsr,只读命令。

命令形式

1
LINDEX key index

内部实现函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// t_list.c
void lindexCommand(client *c) {

// 尝试从db中查找key并判断类型
robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk);
if (o == NULL || checkType(c,o,OBJ_LIST)) return;
long index;
robj *value = NULL;

// 从命令参数中获取index
if ((getLongFromObjectOrReply(c, c->argv[2], &index, NULL) != C_OK))
return;

// list的实现,只剩下quick list
if (o->encoding == OBJ_ENCODING_QUICKLIST) {
quicklistEntry entry;
if (quicklistIndex(o->ptr, index, &entry)) {
// 从quicklist中取出对应index的entry
if (entry.value) {
// 新建value字符类型
value = createStringObject((char*)entry.value,entry.sz);
} else {
// 新建value整型类型
value = createStringObjectFromLongLong(entry.longval);
}
// 返回
addReplyBulk(c,value);
// 减少value的引用次数(因为create的时候会默认置为1或者增1)
decrRefCount(value);
} else {
addReply(c,shared.nullbulk);
}
} else {
serverPanic("Unknown list encoding");
}
}

LSET命令

将列表 key 下标为 index 的元素的值设置为 value

index 参数超出范围,或对一个空列表( key 不存在)进行 LSET 时,返回一个错误。

操作成功返回 ok ,否则返回错误信息。

  • sflagswm,写命令,并可能导致增加内存的使用。

命令形式

1
LSET key index value

内部实现函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// t_list.c
void lsetCommand(client *c) {

// 获取对应的key,并判断类型
robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr);
if (o == NULL || checkType(c,o,OBJ_LIST)) return;
long index;
robj *value = c->argv[3];

// 命令中获取参数index
if ((getLongFromObjectOrReply(c, c->argv[2], &index, NULL) != C_OK))
return;

// list的实现,只剩下quick list
if (o->encoding == OBJ_ENCODING_QUICKLIST) {
quicklist *ql = o->ptr;
// 调用快速列表的方法来替换元素
int replaced = quicklistReplaceAtIndex(ql, index,
value->ptr, sdslen(value->ptr));
if (!replaced) {
// 替换失败
addReply(c,shared.outofrangeerr);
} else {
// 返回成功
addReply(c,shared.ok);
// 标记key改变和触发事件
signalModifiedKey(c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_LIST,"lset",c->argv[1],c->db->id);
server.dirty++;
}
} else {
serverPanic("Unknown list encoding");
}
}

LRANGE命令

返回列表 key 中指定区间内的元素,区间以偏移量 startstop 指定。

下标(index)参数 startstop 都以 0 为底,也就是说,以 0 表示列表的第一个元素,以 1 表示列表的第二个元素,以此类推。

你也可以使用负数下标,以 -1 表示列表的最后一个元素, -2 表示列表的倒数第二个元素,以此类推。

超出范围的下标

超出范围的下标值不会引起错误。

如果 start 下标比列表的最大下标 end ( LLEN list 减去 1 )还要大,那么 LRANGE 返回一个空列表。

如果 stop 下标比 end 下标还要大,Redis将 stop 的值设置为 end

  • sflagsr,只读命令。

命令形式

1
LRANGE key start stop

内部实现函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
// t_list.c
void lrangeCommand(client *c) {
robj *o;
long start, end, llen, rangelen;

// 从命令中获取 start 和 end 的值
if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != C_OK) ||
(getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != C_OK)) return;

// 获取和判断key的类型
if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk)) == NULL
|| checkType(c,o,OBJ_LIST)) return;

// key对应list的长度
llen = listTypeLength(o);

// 计算范围..
/* convert negative indexes */
if (start < 0) start = llen+start;
if (end < 0) end = llen+end;
if (start < 0) start = 0;

/* Invariant: start >= 0, so this test will be true when end < 0.
* The range is empty when start > end or start >= length. */
if (start > end || start >= llen) {
addReply(c,shared.emptymultibulk);
return;
}
if (end >= llen) end = llen-1;
rangelen = (end-start)+1;

/* Return the result in form of a multi-bulk reply */
addReplyMultiBulkLen(c,rangelen);
if (o->encoding == OBJ_ENCODING_QUICKLIST) {
// 获取list的迭代器
listTypeIterator *iter = listTypeInitIterator(o, start, LIST_TAIL);

// 迭代获取list中的值
while(rangelen--) {
listTypeEntry entry;
listTypeNext(iter, &entry);
quicklistEntry *qe = &entry.entry;
if (qe->value) {
addReplyBulkCBuffer(c,qe->value,qe->sz);
} else {
addReplyBulkLongLong(c,qe->longval);
}
}
listTypeReleaseIterator(iter);
} else {
serverPanic("List encoding is not QUICKLIST!");
}
}

LTRIM命令

让列表只保留指定区间内的元素,不在指定区间之内的元素都将被删除。

  • sflagsw,写命令。

命令形式

1
LTRIM key start stop

内部实现函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
// t_list.c
void ltrimCommand(client *c) {
robj *o;
long start, end, llen, ltrim, rtrim;

// 从命令参数中获取start/stop偏移量
if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != C_OK) ||
(getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != C_OK)) return;

// 获取对应的key并判断类型
if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.ok)) == NULL ||
checkType(c,o,OBJ_LIST)) return;
llen = listTypeLength(o);

// 将相对的start/stop转换成绝对的
/* convert negative indexes */
if (start < 0) start = llen+start;
if (end < 0) end = llen+end;
if (start < 0) start = 0;

/* Invariant: start >= 0, so this test will be true when end < 0.
* The range is empty when start > end or start >= length. */
if (start > end || start >= llen) {
/* Out of range start or start > end result in empty list */
ltrim = llen;
rtrim = 0;
} else {
if (end >= llen) end = llen-1;
ltrim = start;
rtrim = llen-end-1;
}

// ltrim: 左边开始删除几个元素
// rtrim: 右边开始删除几个元素
/* Remove list elements to perform the trim */
if (o->encoding == OBJ_ENCODING_QUICKLIST) {
// quicklistDelRange(quicklist *quicklist, const long start, const long count)
// 所以quicklistDelRange是从start开始,删除count个
quicklistDelRange(o->ptr,0,ltrim);
quicklistDelRange(o->ptr,-rtrim,rtrim);
} else {
serverPanic("Unknown list encoding");
}

// 触发键空间改变事件
notifyKeyspaceEvent(NOTIFY_LIST,"ltrim",c->argv[1],c->db->id);
if (listTypeLength(o) == 0) {
// list 中没有数据, 删除
dbDelete(c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",c->argv[1],c->db->id);
}
signalModifiedKey(c->db,c->argv[1]);
server.dirty++;
addReply(c,shared.ok);
}

LREM命令

根据参数 count 的值,移除列表中与参数 value 相等的元素。

count 的值可以是以下几种:

  • count > 0 : 从表头开始向表尾搜索,移除与 value 相等的元素,数量为 count
  • count < 0 : 从表尾开始向表头搜索,移除与 value 相等的元素,数量为 count 的绝对值。
  • count = 0 : 移除表中所有与 value 相等的值。
  • sflagsw,写命令。

命令形式

1
LREM key count value

内部实现函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
// t_list.c
void lremCommand(client *c) {
robj *subject, *obj;
obj = c->argv[3];
long toremove;
long removed = 0;

// 从命令中获取要删除的元素个数toremove
if ((getLongFromObjectOrReply(c, c->argv[2], &toremove, NULL) != C_OK))
return;

// 从命令中获取key,并判断类型
subject = lookupKeyWriteOrReply(c,c->argv[1],shared.czero);
if (subject == NULL || checkType(c,subject,OBJ_LIST)) return;

// 获取迭代器
// listTypeInitIterator()其实只是封装了quick list的迭代器
listTypeIterator *li;
if (toremove < 0) {
toremove = -toremove;
li = listTypeInitIterator(subject,-1,LIST_HEAD);
} else {
li = listTypeInitIterator(subject,0,LIST_TAIL);
}

// 迭代元素并删除对应数量(toremove)指定的值(obj)
listTypeEntry entry;
while (listTypeNext(li,&entry)) {
if (listTypeEqual(&entry,obj)) {
listTypeDelete(li, &entry);
server.dirty++;
removed++;
if (toremove && removed == toremove) break;
}
}
listTypeReleaseIterator(li);

if (removed) {
// 如果成功删除元素,触发事件
signalModifiedKey(c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_LIST,"lrem",c->argv[1],c->db->id);
}

// list为空,直接删除
if (listTypeLength(subject) == 0) {
dbDelete(c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",c->argv[1],c->db->id);
}

addReplyLongLong(c,removed);
}

RPOPLPUSH命令

命令 RPOPLPUSH 在一个原子时间内,执行以下两个动作:

将列表 source 中的最后一个元素(尾元素)弹出,并返回给客户端。
source 弹出的元素插入到列表 destination ,作为 destination 列表的的头元素。

  • sflagswm,写命令,并可能导致内存使用增加。

命令形式

1
RPOPLPUSH source destination

内部实现函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
// t_list.c
void rpoplpushCommand(client *c) {
robj *sobj, *value;

// 从命令中获取指定的 source key
if ((sobj = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk)) == NULL ||
checkType(c,sobj,OBJ_LIST)) return;

if (listTypeLength(sobj) == 0) {
// 这个key的list长度为0,一般情况下不会出现,具体场景可以看下面的注释
/* This may only happen after loading very old RDB files. Recent
* versions of Redis delete keys of empty lists. */
addReply(c,shared.nullbulk);
} else {
// 从命令中获取指定的 destination key
robj *dobj = lookupKeyWrite(c->db,c->argv[2]);
robj *touchedkey = c->argv[1];

// 检查 destination key 的类型
// PS:checkType()返回0表示 是指定类型
if (dobj && checkType(c,dobj,OBJ_LIST)) return;
// 从 source key 末尾获取元素 value
value = listTypePop(sobj,LIST_TAIL);
/* We saved touched key, and protect it, since rpoplpushHandlePush
* may change the client command argument vector (it does not
* currently). */
incrRefCount(touchedkey);
// 插入 destination key 的列表头
rpoplpushHandlePush(c,c->argv[2],dobj,value);

/* listTypePop returns an object with its refcount incremented */
decrRefCount(value);

/* Delete the source list when it is empty */
notifyKeyspaceEvent(NOTIFY_LIST,"rpop",touchedkey,c->db->id);
if (listTypeLength(sobj) == 0) {
dbDelete(c->db,touchedkey);
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",
touchedkey,c->db->id);
}
signalModifiedKey(c->db,touchedkey);
decrRefCount(touchedkey);
server.dirty++;
if (c->cmd->proc == brpoplpushCommand) {
rewriteClientCommandVector(c,3,shared.rpoplpush,c->argv[1],c->argv[2]);
}
}
}

SADD命令

将一个或多个 member 元素加入到集合 key 当中,已经存在于集合的 member 元素将被忽略。

假如 key 不存在,则创建一个只包含 member 元素作成员的集合。

key 不是集合类型时,返回一个错误。

  • sflagswmF,写命令,并可能导致内存使用增加。快速执行,不应该被延迟。

命令形式

1
SADD key member [member ...]

内部实现函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// t_set.c
void saddCommand(client *c) {
robj *set;
int j, added = 0;

// 从命令中获取想要设置的set的key
set = lookupKeyWrite(c->db,c->argv[1]);
if (set == NULL) {
// 如果set是NULL, 那么根据第一个元素的类型, 返回一个Set
// 主要有2中Set, Integer和Str
set = setTypeCreate(c->argv[2]->ptr);
// 添加到db中
dbAdd(c->db,c->argv[1],set);
} else {
// set已经存在,判断类型
if (set->type != OBJ_SET) {
addReply(c,shared.wrongtypeerr);
return;
}
}

// 将命令中的元素存入set, 并统计成功添加的个数added
for (j = 2; j < c->argc; j++) {
if (setTypeAdd(set,c->argv[j]->ptr)) added++;
}
if (added) {
// 触发事件
signalModifiedKey(c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_SET,"sadd",c->argv[1],c->db->id);
}
server.dirty += added;
addReplyLongLong(c,added);
}

SREM命令

移除集合 key 中的一个或多个 member 元素,不存在的 member 元素会被忽略。

key 不是集合类型,返回一个错误。

  • sflagswF,写命令。快速执行,不应该被延迟。

命令形式

1
SREM key member [member ...]

内部实现函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// t_set.c
void sremCommand(client *c) {
robj *set;
int j, deleted = 0, keyremoved = 0;

// 获取要移除元素的Set key并判断类型
if ((set = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL ||
checkType(c,set,OBJ_SET)) return;

// 依次在set中删除命令中的key
for (j = 2; j < c->argc; j++) {
if (setTypeRemove(set,c->argv[j]->ptr)) {
deleted++;
if (setTypeSize(set) == 0) {
// 如果set已经是空了,直接删除set
dbDelete(c->db,c->argv[1]);
keyremoved = 1;
break;
}
}
}
if (deleted) {
signalModifiedKey(c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_SET,"srem",c->argv[1],c->db->id);
if (keyremoved)
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",c->argv[1],
c->db->id);
server.dirty += deleted;
}
addReplyLongLong(c,deleted);
}

SMOVE命令

member 元素从 source 集合移动到 destination 集合。

SMOVE 是原子性操作。

如果 source 集合不存在或不包含指定的 member 元素,则 SMOVE 命令不执行任何操作,仅返回 0 。否则, member 元素从 source 集合中被移除,并添加到 destination 集合中去。

destination 集合已经包含 member 元素时, SMOVE 命令只是简单地将 source 集合中的 member 元素删除。

sourcedestination 不是集合类型时,返回一个错误。

  • sflagswF,写命令。快速执行,不应该被延迟。

命令形式

1
SMOVE source destination member

内部实现函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
// t_set.c
void smoveCommand(client *c) {
robj *srcset, *dstset, *ele;
srcset = lookupKeyWrite(c->db,c->argv[1]);
dstset = lookupKeyWrite(c->db,c->argv[2]);
ele = c->argv[3];

// 检查source key是否存在
/* If the source key does not exist return 0 */
if (srcset == NULL) {
addReply(c,shared.czero);
return;
}

// 检查source key和destination key的类型(destination key允许为NULL)
/* If the source key has the wrong type, or the destination key
* is set and has the wrong type, return with an error. */
if (checkType(c,srcset,OBJ_SET) ||
(dstset && checkType(c,dstset,OBJ_SET))) return;


// 如果source key和destination key是同一个Set,那么元素不需要移动,但需要检查元素ele是否真的在Set中
/* If srcset and dstset are equal, SMOVE is a no-op */
if (srcset == dstset) {
addReply(c,setTypeIsMember(srcset,ele->ptr) ?
shared.cone : shared.czero);
return;
}

// ele不能从source key中删除,说明ele不存在于source key
/* If the element cannot be removed from the src set, return 0. */
if (!setTypeRemove(srcset,ele->ptr)) {
addReply(c,shared.czero);
return;
}
notifyKeyspaceEvent(NOTIFY_SET,"srem",c->argv[1],c->db->id);

// 检查source key的大小, 为0则删除key
/* Remove the src set from the database when empty */
if (setTypeSize(srcset) == 0) {
dbDelete(c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",c->argv[1],c->db->id);
}

// 判断是否需要创建destination key
/* Create the destination set when it doesn't exist */
if (!dstset) {
dstset = setTypeCreate(ele->ptr);
dbAdd(c->db,c->argv[2],dstset);
}

signalModifiedKey(c->db,c->argv[1]);
signalModifiedKey(c->db,c->argv[2]);
server.dirty++;

// 将ele添加到destination key中
/* An extra key has changed when ele was successfully added to dstset */
if (setTypeAdd(dstset,ele->ptr)) {
server.dirty++;
notifyKeyspaceEvent(NOTIFY_SET,"sadd",c->argv[2],c->db->id);
}
addReply(c,shared.cone);
}

SISMEMBER命令

判断 member 元素是否集合 key 的成员。

  • sflagsrF,读命令。快速执行,不应该被延迟。

命令形式

1
SISMEMBER key member

内部实现函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// t_set.c

void sismemberCommand(client *c) {
robj *set;

// 检查Set key是否存在并判断类型
if ((set = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL ||
checkType(c,set,OBJ_SET)) return;

// 判断元素是否在Set中
if (setTypeIsMember(set,c->argv[2]->ptr))
addReply(c,shared.cone);
else
addReply(c,shared.czero);
}

SCARD命令

返回集合 key 中元素的数量。

  • sflagsrF,读命令。快速执行,不应该被延迟。

命令形式

1
SCARD key

内部实现函数

1
2
3
4
5
6
7
8
9
10
// t_set.c
void scardCommand(client *c) {
robj *o;

if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL ||
checkType(c,o,OBJ_SET)) return;

// setTypeSize()封装了hash table和int set的获取size方法
addReplyLongLong(c,setTypeSize(o));
}

SPOP命令

移除并返回集合 key 中的一个或者多个随机元素。

  • sflagswRF, 写命令,并且是随机的。快速执行,不应该被延迟。

命令形式

1
SPOP key [count]

内部实现函数

主要分为两个函数:

  • spopCommand()处理随机移出1个;
  • spopWithCountCommand()处理随机移出count个;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
// t_set.c
void spopCommand(client *c) {
robj *set, *ele, *aux;
sds sdsele;
int64_t llele;
int encoding;

// 如果命令中带有count参数,则交给`spopWithCountCommand()`处理
if (c->argc == 3) {
spopWithCountCommand(c);
return;
} else if (c->argc > 3) {
// 命令语法错误
addReply(c,shared.syntaxerr);
return;
}

// 获取key并判断是否存在和类型
/* Make sure a key with the name inputted exists, and that it's type is
* indeed a set */
if ((set = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk)) == NULL ||
checkType(c,set,OBJ_SET)) return;

// 在set中获取一个随机元素(详细随机的实现没看)
/* Get a random element from the set */
encoding = setTypeRandomElement(set,&sdsele,&llele);

// 判断set的编码类型
/* Remove the element from the set */
if (encoding == OBJ_ENCODING_INTSET) {
// 移出元素
ele = createStringObjectFromLongLong(llele);
set->ptr = intsetRemove(set->ptr,llele,NULL);
} else {
// 移出元素
// setTypeRemove()其实是支持OBJ_ENCODING_INTSET的
// 但传入的值类型是sds, 所以上面直接调用了intsetRemove()
ele = createStringObject(sdsele,sdslen(sdsele));
setTypeRemove(set,ele->ptr);
}

// 触发事件
notifyKeyspaceEvent(NOTIFY_SET,"spop",c->argv[1],c->db->id);

// 对于这个删除元素实现,使用SREM写到AOF/复制中(因为SPOP是随机的,不幂等的)
/* Replicate/AOF this command as an SREM operation */
aux = createStringObject("SREM",4);
rewriteClientCommandVector(c,3,aux,c->argv[1],ele);
decrRefCount(aux);

/* Add the element to the reply */
addReplyBulk(c,ele);
decrRefCount(ele);

// 判断Set的大小,为0则直接删除
/* Delete the set if it's empty */
if (setTypeSize(set) == 0) {
dbDelete(c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",c->argv[1],c->db->id);
}

/* Set has been modified */
signalModifiedKey(c->db,c->argv[1]);
server.dirty++;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
// t_set.c
/* Handle the "SPOP key <count>" variant. The normal version of the
* command is handled by the spopCommand() function itself. */

/* How many times bigger should be the set compared to the remaining size
* for us to use the "create new set" strategy? Read later in the
* implementation for more info. */
#define SPOP_MOVE_STRATEGY_MUL 5

// 对于带count参数的SPOP命令处理
void spopWithCountCommand(client *c) {
long l;
unsigned long count, size;
robj *set;

// 获取参数count
/* Get the count argument */
if (getLongFromObjectOrReply(c,c->argv[2],&l,NULL) != C_OK) return;
if (l >= 0) {
count = (unsigned long) l;
} else {
addReply(c,shared.outofrangeerr);
return;
}

// 获取指定的Set key并判断类型
/* Make sure a key with the name inputted exists, and that it's type is
* indeed a set. Otherwise, return nil */
if ((set = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk))
== NULL || checkType(c,set,OBJ_SET)) return;

/* If count is zero, serve an empty multibulk ASAP to avoid special
* cases later. */
if (count == 0) {
addReply(c,shared.emptymultibulk);
return;
}

// size为set的原本大小
size = setTypeSize(set);

/* Generate an SPOP keyspace notification */
notifyKeyspaceEvent(NOTIFY_SET,"spop",c->argv[1],c->db->id);
server.dirty += count;

/* CASE 1:
* The number of requested elements is greater than or equal to
* the number of elements inside the set: simply return the whole set. */
if (count >= size) {
// 直接返回整个set
/* We just return the entire set */
sunionDiffGenericCommand(c,c->argv+1,1,NULL,SET_OP_UNION);

// 删除这个set
/* Delete the set as it is now empty */
dbDelete(c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",c->argv[1],c->db->id);

// 重写删除到命令
/* Propagate this command as an DEL operation */
rewriteClientCommandVector(c,2,shared.del,c->argv[1]);
signalModifiedKey(c->db,c->argv[1]);
server.dirty++;
return;
}

// 这里propargv[]是构造了一个写到AOF等的参数数据,[0]是SREM命令,[1]是指定的key
/* Case 2 and 3 require to replicate SPOP as a set of SREM commands.
* Prepare our replication argument vector. Also send the array length
* which is common to both the code paths. */
robj *propargv[3];
propargv[0] = createStringObject("SREM",4);
propargv[1] = c->argv[1];
addReplyMultiBulkLen(c,count);

/* Common iteration vars. */
sds sdsele;
robj *objele;
int encoding;
int64_t llele;
// remaining SPOP后, Set最终会剩余的数量
unsigned long remaining = size-count; /* Elements left after SPOP. */


// 剩下两个方法:
// 1.直接在Set中循环删除随机count个元素
// 2.随机抽取remaining个元素,组成新的Set并保存指定的key中,然后将旧的返回
// remaining*SPOP_MOVE_STRATEGY_MUL相当于一个比例参数(魔法数字?)

/* If we are here, the number of requested elements is less than the
* number of elements inside the set. Also we are sure that count < size.
* Use two different strategies.
*
* CASE 2: The number of elements to return is small compared to the
* set size. We can just extract random elements and return them to
* the set. */
if (remaining*SPOP_MOVE_STRATEGY_MUL > count) {
// 直接在Set中循环删除随机count个元素
while(count--) {
/* Emit and remove. */
encoding = setTypeRandomElement(set,&sdsele,&llele);
if (encoding == OBJ_ENCODING_INTSET) {
addReplyBulkLongLong(c,llele);
objele = createStringObjectFromLongLong(llele);
set->ptr = intsetRemove(set->ptr,llele,NULL);
} else {
addReplyBulkCBuffer(c,sdsele,sdslen(sdsele));
objele = createStringObject(sdsele,sdslen(sdsele));
setTypeRemove(set,sdsele);
}

/* Replicate/AOF this command as an SREM operation */
propargv[2] = objele;
alsoPropagate(server.sremCommand,c->db->id,propargv,3,
PROPAGATE_AOF|PROPAGATE_REPL);
decrRefCount(objele);
}
} else {
/* CASE 3: The number of elements to return is very big, approaching
* the size of the set itself. After some time extracting random elements
* from such a set becomes computationally expensive, so we use
* a different strategy, we extract random elements that we don't
* want to return (the elements that will remain part of the set),
* creating a new set as we do this (that will be stored as the original
* set). Then we return the elements left in the original set and
* release it. */
// 随机抽取remaining个元素,组成新的Set并保存指定的key中,然后将旧的返回
robj *newset = NULL;

/* Create a new set with just the remaining elements. */
while(remaining--) {
encoding = setTypeRandomElement(set,&sdsele,&llele);
if (encoding == OBJ_ENCODING_INTSET) {
sdsele = sdsfromlonglong(llele);
} else {
sdsele = sdsdup(sdsele);
}
if (!newset) newset = setTypeCreate(sdsele);
setTypeAdd(newset,sdsele);
setTypeRemove(set,sdsele);
sdsfree(sdsele);
}

/* Assign the new set as the key value. */
incrRefCount(set); /* Protect the old set value. */
dbOverwrite(c->db,c->argv[1],newset);

/* Tranfer the old set to the client and release it. */
setTypeIterator *si;
si = setTypeInitIterator(set);
while((encoding = setTypeNext(si,&sdsele,&llele)) != -1) {
if (encoding == OBJ_ENCODING_INTSET) {
addReplyBulkLongLong(c,llele);
objele = createStringObjectFromLongLong(llele);
} else {
addReplyBulkCBuffer(c,sdsele,sdslen(sdsele));
objele = createStringObject(sdsele,sdslen(sdsele));
}

/* Replicate/AOF this command as an SREM operation */
propargv[2] = objele;
alsoPropagate(server.sremCommand,c->db->id,propargv,3,
PROPAGATE_AOF|PROPAGATE_REPL);
decrRefCount(objele);
}
setTypeReleaseIterator(si);
decrRefCount(set);
}

/* Don't propagate the command itself even if we incremented the
* dirty counter. We don't want to propagate an SPOP command since
* we propagated the command as a set of SREMs operations using
* the alsoPropagate() API. */
decrRefCount(propargv[0]);
preventCommandPropagation(c);
signalModifiedKey(c->db,c->argv[1]);
server.dirty++;
}

SRANDMEMBER命令

如果命令执行时,只提供了 key 参数,那么返回集合中的一个随机元素。

如果 count 为正数,且小于集合基数,那么命令返回一个包含 count 个元素的数组,数组中的元素各不相同。如果 count 大于等于集合基数,那么返回整个集合。
如果 count 为负数,那么命令返回一个数组,数组中的元素可能会重复出现多次,而数组的长度为 count 的绝对值。

  • sflagsrR,读命令,而且随机不幂等。

命令形式

1
SRANDMEMBER key [count]

内部实现函数

主要分为两个函数:

  • srandmemberCommand()处理随机返回1个;
  • srandmemberWithCountCommand()处理随机返回count个;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// t_set.c
void srandmemberCommand(client *c) {
robj *set;
sds ele;
int64_t llele;
int encoding;

if (c->argc == 3) {
// 当存在count参数时,交由srandmemberWithCountCommand()处理
srandmemberWithCountCommand(c);
return;
} else if (c->argc > 3) {
addReply(c,shared.syntaxerr);
return;
}

// 获取指定的Set key并判断类型
if ((set = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk)) == NULL ||
checkType(c,set,OBJ_SET)) return;

// 随机取出一个元素并返回Set的编码类型
encoding = setTypeRandomElement(set,&ele,&llele);
if (encoding == OBJ_ENCODING_INTSET) {
addReplyBulkLongLong(c,llele);
} else {
addReplyBulkCBuffer(c,ele,sdslen(ele));
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
// t_set.c
/* handle the "SRANDMEMBER key <count>" variant. The normal version of the
* command is handled by the srandmemberCommand() function itself. */

/* How many times bigger should be the set compared to the requested size
* for us to don't use the "remove elements" strategy? Read later in the
* implementation for more info. */
#define SRANDMEMBER_SUB_STRATEGY_MUL 3

void srandmemberWithCountCommand(client *c) {
long l;
unsigned long count, size;
int uniq = 1;
robj *set;
sds ele;
int64_t llele;
int encoding;

dict *d;

// 获取返回格式count和uniq(用于标识是否返回重复数据, 1: 不重复 0: 重复)
if (getLongFromObjectOrReply(c,c->argv[2],&l,NULL) != C_OK) return;
if (l >= 0) {
count = (unsigned long) l;
} else {
/* A negative count means: return the same elements multiple times
* (i.e. don't remove the extracted element after every extraction). */
count = -l;
uniq = 0;
}

// 获取指定的Set key并判断是否存在和类型
if ((set = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk))
== NULL || checkType(c,set,OBJ_SET)) return;
size = setTypeSize(set);

/* If count is zero, serve it ASAP to avoid special cases later. */
if (count == 0) {
addReply(c,shared.emptymultibulk);
return;
}

// uniq == 0, 允许返回重复的数据,所以简单执行count次就可以了
/* CASE 1: The count was negative, so the extraction method is just:
* "return N random elements" sampling the whole set every time.
* This case is trivial and can be served without auxiliary data
* structures. */
if (!uniq) {
addReplyMultiBulkLen(c,count);
while(count--) {
encoding = setTypeRandomElement(set,&ele,&llele);
if (encoding == OBJ_ENCODING_INTSET) {
addReplyBulkLongLong(c,llele);
} else {
addReplyBulkCBuffer(c,ele,sdslen(ele));
}
}
return;
}

// 但cout >= size,需要返回的数量大于size数,而且又不重复
// 直接返回这个set就可以了
/* CASE 2:
* The number of requested elements is greater than the number of
* elements inside the set: simply return the whole set. */
if (count >= size) {
sunionDiffGenericCommand(c,c->argv+1,1,NULL,SET_OP_UNION);
return;
}

// 后面的case,需要一个字典来保存
/* For CASE 3 and CASE 4 we need an auxiliary dictionary. */
d = dictCreate(&objectKeyPointerValueDictType,NULL);

// 剩下两个方法:
// case 3: 由于指定的count很大(但小于size),所以直接将set保存到字典d中,然后随机删除size-count个元素
// case 4: 由于指定的count相对于size很小(但小于size),所以直接随机在set中读取元素,保存到字典d中,直到d的大小达到count
// count*SRANDMEMBER_SUB_STRATEGY_MUL相当于一个比例参数(魔法数字?)

/* CASE 3:
* The number of elements inside the set is not greater than
* SRANDMEMBER_SUB_STRATEGY_MUL times the number of requested elements.
* In this case we create a set from scratch with all the elements, and
* subtract random elements to reach the requested number of elements.
*
* This is done because if the number of requsted elements is just
* a bit less than the number of elements in the set, the natural approach
* used into CASE 3 is highly inefficient. */
if (count*SRANDMEMBER_SUB_STRATEGY_MUL > size) {
setTypeIterator *si;

/* Add all the elements into the temporary dictionary. */
si = setTypeInitIterator(set);
while((encoding = setTypeNext(si,&ele,&llele)) != -1) {
int retval = DICT_ERR;

if (encoding == OBJ_ENCODING_INTSET) {
retval = dictAdd(d,createStringObjectFromLongLong(llele),NULL);
} else {
retval = dictAdd(d,createStringObject(ele,sdslen(ele)),NULL);
}
serverAssert(retval == DICT_OK);
}
setTypeReleaseIterator(si);
serverAssert(dictSize(d) == size);

// 删除size-count个元素
/* Remove random elements to reach the right count. */
while(size > count) {
dictEntry *de;

de = dictGetRandomKey(d);
dictDelete(d,dictGetKey(de));
size--;
}
}

/* CASE 4: We have a big set compared to the requested number of elements.
* In this case we can simply get random elements from the set and add
* to the temporary set, trying to eventually get enough unique elements
* to reach the specified count. */
else {
unsigned long added = 0;
robj *objele;

while(added < count) {
encoding = setTypeRandomElement(set,&ele,&llele);
if (encoding == OBJ_ENCODING_INTSET) {
objele = createStringObjectFromLongLong(llele);
} else {
objele = createStringObject(ele,sdslen(ele));
}
/* Try to add the object to the dictionary. If it already exists
* free it, otherwise increment the number of objects we have
* in the result dictionary. */
if (dictAdd(d,objele,NULL) == DICT_OK)
added++;
else
decrRefCount(objele);
}
}

/* CASE 3 & 4: send the result to the user. */
{
dictIterator *di;
dictEntry *de;

addReplyMultiBulkLen(c,count);
di = dictGetIterator(d);
while((de = dictNext(di)) != NULL)
addReplyBulk(c,dictGetKey(de));
dictReleaseIterator(di);
dictRelease(d);
}
}

SINTER命令

返回一个集合的全部成员,该集合是所有给定集合的交集。

不存在的 key 被视为空集。

当给定集合当中有一个空集时,结果也为空集(根据集合运算定律)。

  • sflagsrS,读命令,具有幂等性。

命令形式

1
SINTER key [key ...]

内部实现函数

1
2
3
4
// t_set.c
void sinterCommand(client *c) {
sinterGenericCommand(c,c->argv+1,c->argc-1,NULL);
}

具体实现,其实交由sinterGenericCommand(),与命令SINTERSTORE相同

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
// t_set.c
void sinterGenericCommand(client *c, robj **setkeys,
unsigned long setnum, robj *dstkey) {
robj **sets = zmalloc(sizeof(robj*)*setnum);
setTypeIterator *si;
robj *dstset = NULL;
sds elesds;
int64_t intobj;
void *replylen = NULL;
unsigned long j, cardinality = 0;
int encoding;

for (j = 0; j < setnum; j++) {
robj *setobj = dstkey ?
lookupKeyWrite(c->db,setkeys[j]) :
lookupKeyRead(c->db,setkeys[j]);
if (!setobj) {
// 处理不存在的key, 当成空集,直接返回空
zfree(sets);
if (dstkey) {
// 如果有dstkey,则尝试在db中删除
if (dbDelete(c->db,dstkey)) {
signalModifiedKey(c->db,dstkey);
server.dirty++;
}
addReply(c,shared.czero);
} else {
addReply(c,shared.emptymultibulk);
}
return;
}
if (checkType(c,setobj,OBJ_SET)) {
zfree(sets);
return;
}
sets[j] = setobj;
}
// 根据set的size进行排序,从小到大(这样计算交集,需要处理的元素更少)
/* Sort sets from the smallest to largest, this will improve our
* algorithm's performance */
qsort(sets,setnum,sizeof(robj*),qsortCompareSetsByCardinality);

// 根据dstkey判断是否需要将得到的交集保存(SINTER/SINTERSTORE)
/* The first thing we should output is the total number of elements...
* since this is a multi-bulk write, but at this stage we don't know
* the intersection set size, so we use a trick, append an empty object
* to the output list and save the pointer to later modify it with the
* right length */
if (!dstkey) {
replylen = addDeferredMultiBulkLength(c);
} else {
/* If we have a target key where to store the resulting set
* create this key with an empty set inside */
dstset = createIntsetObject();
}

/* Iterate all the elements of the first (smallest) set, and test
* the element against all the other sets, if at least one set does
* not include the element it is discarded */
// 获取最小的set的迭代器si
si = setTypeInitIterator(sets[0]);
while((encoding = setTypeNext(si,&elesds,&intobj)) != -1) {
// 迭代si,并判断si中的每个元素,是否在其他集合中
for (j = 1; j < setnum; j++) {
if (sets[j] == sets[0]) continue;
if (encoding == OBJ_ENCODING_INTSET) {
/* intset with intset is simple... and fast */
if (sets[j]->encoding == OBJ_ENCODING_INTSET &&
!intsetFind((intset*)sets[j]->ptr,intobj))
{
break;
/* in order to compare an integer with an object we
* have to use the generic function, creating an object
* for this */
} else if (sets[j]->encoding == OBJ_ENCODING_HT) {
elesds = sdsfromlonglong(intobj);
if (!setTypeIsMember(sets[j],elesds)) {
sdsfree(elesds);
break;
}
sdsfree(elesds);
}
} else if (encoding == OBJ_ENCODING_HT) {
if (!setTypeIsMember(sets[j],elesds)) {
break;
}
}
}

/* Only take action when all sets contain the member */
// 当si的一个元素,在sets[]中都遍历了,并且存在,则向客户端返回改元素or加入到指定的dstkey中
if (j == setnum) {
if (!dstkey) {
if (encoding == OBJ_ENCODING_HT)
addReplyBulkCBuffer(c,elesds,sdslen(elesds));
else
addReplyBulkLongLong(c,intobj);
cardinality++;
} else {
if (encoding == OBJ_ENCODING_INTSET) {
elesds = sdsfromlonglong(intobj);
setTypeAdd(dstset,elesds);
sdsfree(elesds);
} else {
setTypeAdd(dstset,elesds);
}
}
}
}
setTypeReleaseIterator(si);

if (dstkey) {
/* Store the resulting set into the target, if the intersection
* is not an empty set. */
int deleted = dbDelete(c->db,dstkey);
if (setTypeSize(dstset) > 0) {
// 当dstset是有值的,保存到db中
dbAdd(c->db,dstkey,dstset);
addReplyLongLong(c,setTypeSize(dstset));
notifyKeyspaceEvent(NOTIFY_SET,"sinterstore",
dstkey,c->db->id);
} else {
decrRefCount(dstset);
addReply(c,shared.czero);
if (deleted)
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",
dstkey,c->db->id);
}
signalModifiedKey(c->db,dstkey);
server.dirty++;
} else {
setDeferredMultiBulkLength(c,replylen,cardinality);
}
zfree(sets);
}

SINTERSTORE命令

这个命令类似于 SINTER 命令,但它将结果保存到 destination 集合,而不是简单地返回结果集。

如果 destination 集合已经存在,则将其覆盖。

destination 可以是 key 本身。

  • sflagswm,写命令,并可能导致内存使用增加。

命令形式

1
SINTERSTORE destination key [key ...]

内部实现函数

1
2
3
4
// t_set.c
void sinterstoreCommand(client *c) {
sinterGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]);
}

具体参考SINTER命令。

SUNION命令

返回一个集合的全部成员,该集合是所有给定集合的并集。

不存在的 key 被视为空集。

  • sflagsrS,读命令,具有幂等性。

命令形式

1
SUNION key [key ...]

内部实现函数

对于SUNION,SUNIONSTORE,SDIFF,SDIFFSTORE其实是公用一个函数sunionDiffGenericCommand()实现的。

1
2
3
4
// t_set.c
void sunionCommand(client *c) {
sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,SET_OP_UNION);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
// t_set.c
// sunionCommand() -> sunionDiffGenericCommand()
void sunionDiffGenericCommand(client *c, robj **setkeys, int setnum,
robj *dstkey, int op) {
robj **sets = zmalloc(sizeof(robj*)*setnum);
setTypeIterator *si;
robj *dstset = NULL;
sds ele;
int j, cardinality = 0;
int diff_algo = 1;

// 检查指定的Set key是否存在和类型是否正确,并将其保存到set[]中(不存在的key,会在对应位置保存NULL指针)
for (j = 0; j < setnum; j++) {
robj *setobj = dstkey ?
lookupKeyWrite(c->db,setkeys[j]) :
lookupKeyRead(c->db,setkeys[j]);
if (!setobj) {
sets[j] = NULL;
continue;
}
if (checkType(c,setobj,OBJ_SET)) {
zfree(sets);
return;
}
sets[j] = setobj;
}

// 下面这段函数,是用来选择SDIFF的计算方式的:
// 方式1:
// * 选取第一个Set, 然后迭代set[0]的元素, 在set[j](0<j<setnum)中查找是否存在,如果均不存在,则保存到dstkey中
// * 时间复杂度是O(N*M), N是set[0]的size, M是剩余set的总size
// 方式2:
// * 将第一个set的所有元素添加到一个临时的set中,然后在迭代后面set[j](0<j<setnum)中,并尝试将元素从临时set中删除
// * 时间复杂度是O(N) N是所有set的总size
/* Select what DIFF algorithm to use.
*
* Algorithm 1 is O(N*M) where N is the size of the element first set
* and M the total number of sets.
*
* Algorithm 2 is O(N) where N is the total number of elements in all
* the sets.
*
* We compute what is the best bet with the current input here. */
if (op == SET_OP_DIFF && sets[0]) {
long long algo_one_work = 0, algo_two_work = 0;

for (j = 0; j < setnum; j++) {
if (sets[j] == NULL) continue;

algo_one_work += setTypeSize(sets[0]);
algo_two_work += setTypeSize(sets[j]);
}

/* Algorithm 1 has better constant times and performs less operations
* if there are elements in common. Give it some advantage. */
algo_one_work /= 2;
diff_algo = (algo_one_work <= algo_two_work) ? 1 : 2;

if (diff_algo == 1 && setnum > 1) {
/* With algorithm 1 it is better to order the sets to subtract
* by decreasing size, so that we are more likely to find
* duplicated elements ASAP. */
qsort(sets+1,setnum-1,sizeof(robj*),
qsortCompareSetsByRevCardinality);
}
}

// 用来保存临时数组等的set
/* We need a temp set object to store our union. If the dstkey
* is not NULL (that is, we are inside an SUNIONSTORE operation) then
* this set object will be the resulting object to set into the target key*/
dstset = createIntsetObject();


if (op == SET_OP_UNION) {
// UNION是最简单的,只要遍历所有元素,存到dstset就可以了
/* Union is trivial, just add every element of every set to the
* temporary set. */
for (j = 0; j < setnum; j++) {
if (!sets[j]) continue; /* non existing keys are like empty sets */

si = setTypeInitIterator(sets[j]);
while((ele = setTypeNextObject(si)) != NULL) {
if (setTypeAdd(dstset,ele)) cardinality++;
sdsfree(ele);
}
setTypeReleaseIterator(si);
}
} else if (op == SET_OP_DIFF && sets[0] && diff_algo == 1) {
// DIFF的方案1
/* DIFF Algorithm 1:
*
* We perform the diff by iterating all the elements of the first set,
* and only adding it to the target set if the element does not exist
* into all the other sets.
*
* This way we perform at max N*M operations, where N is the size of
* the first set, and M the number of sets. */
si = setTypeInitIterator(sets[0]);
while((ele = setTypeNextObject(si)) != NULL) {
for (j = 1; j < setnum; j++) {
if (!sets[j]) continue; /* no key is an empty set. */
if (sets[j] == sets[0]) break; /* same set! */
if (setTypeIsMember(sets[j],ele)) break;
}
if (j == setnum) {
/* There is no other set with this element. Add it. */
setTypeAdd(dstset,ele);
cardinality++;
}
sdsfree(ele);
}
setTypeReleaseIterator(si);
} else if (op == SET_OP_DIFF && sets[0] && diff_algo == 2) {
// DIFF的方案2
/* DIFF Algorithm 2:
*
* Add all the elements of the first set to the auxiliary set.
* Then remove all the elements of all the next sets from it.
*
* This is O(N) where N is the sum of all the elements in every
* set. */
for (j = 0; j < setnum; j++) {
if (!sets[j]) continue; /* non existing keys are like empty sets */

si = setTypeInitIterator(sets[j]);
while((ele = setTypeNextObject(si)) != NULL) {
if (j == 0) {
if (setTypeAdd(dstset,ele)) cardinality++;
} else {
if (setTypeRemove(dstset,ele)) cardinality--;
}
sdsfree(ele);
}
setTypeReleaseIterator(si);

/* Exit if result set is empty as any additional removal
* of elements will have no effect. */
if (cardinality == 0) break;
}
}

// 判断是否需要将结果写到一个dstkey
/* Output the content of the resulting set, if not in STORE mode */
if (!dstkey) {
addReplyMultiBulkLen(c,cardinality);
si = setTypeInitIterator(dstset);
while((ele = setTypeNextObject(si)) != NULL) {
addReplyBulkCBuffer(c,ele,sdslen(ele));
sdsfree(ele);
}
setTypeReleaseIterator(si);
decrRefCount(dstset);
} else {
/* If we have a target key where to store the resulting set
* create this key with the result set inside */
int deleted = dbDelete(c->db,dstkey);
if (setTypeSize(dstset) > 0) {
dbAdd(c->db,dstkey,dstset);
addReplyLongLong(c,setTypeSize(dstset));
notifyKeyspaceEvent(NOTIFY_SET,
op == SET_OP_UNION ? "sunionstore" : "sdiffstore",
dstkey,c->db->id);
} else {
decrRefCount(dstset);
addReply(c,shared.czero);
if (deleted)
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",
dstkey,c->db->id);
}
signalModifiedKey(c->db,dstkey);
server.dirty++;
}
zfree(sets);
}

SUNIONSTORE命令

这个命令类似于 SUNION 命令,但它将结果保存到 destination 集合,而不是简单地返回结果集。

如果 destination 已经存在,则将其覆盖。

destination 可以是 key 本身。

  • sflagswm,写命令,并可能增加内存使用。

命令形式

1
SUNIONSTORE destination key [key ...]

内部实现函数

详细参考SUNION命令。

1
2
3
4
// t_set.c
void sunionstoreCommand(client *c) {
sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],SET_OP_UNION);
}

SDIFF命令

返回一个集合的全部成员,该集合是所有给定集合之间的差集。

不存在的 key 被视为空集。

  • sflagsrS,读命令,具有幂等性。

命令形式

1
SDIFF key [key ...]

内部实现函数

详细参考SUNION命令。

1
2
3
4
// t_set.c
void sdiffCommand(client *c) {
sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,SET_OP_DIFF);
}

SDIFFSTORE命令

这个命令的作用和 SDIFF 类似,但它将结果保存到 destination 集合,而不是简单地返回结果集。

如果 destination 集合已经存在,则将其覆盖。

destination 可以是 key 本身。

  • sflagswm,写命令,并可能增加内存使用。

命令形式

1
SDIFFSTORE destination key [key ...]

内部实现函数

详细参考SUNION命令。

1
2
3
4
// t_set.c
void sdiffstoreCommand(client *c) {
sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],SET_OP_DIFF);
}

SMEMBERS命令

返回集合 key 中的所有成员。

不存在的 key 被视为空集合。

  • sflagsrS,读命令,具有幂等性。

命令形式

1
SMEMBERS key

内部实现函数

其实内部实现是使用SINTER命令相同,都是sinterCommand()

1
2
3
4
// t_set.c
void sinterCommand(client *c) {
sinterGenericCommand(c,c->argv+1,c->argc-1,NULL);
}