Redis源码阅读笔记-快速列表

快速列表

快速列表(quicklist)是由压缩列表(ziplist)组成的一个双向链表,链表中,每一个节点都是以压缩列表(ziplist)的结构保存。

在 Redis3.2 后加入的新数据结构,在列表键中取代了双向链表的作用。

特点

  • 双向链表(list)在插入节点删除节点上效率高,但是每个节点不连续,容易产生内存碎片。
  • 压缩列表(ziplist)是一段连续的内存,但不利于修改,插入删除麻烦,复杂度高,频繁申请释放内存。

快速列表综合了双向列表和压缩列表的优点,既有链表头尾插入相对便捷,又有连续内存存储的优点。

代码结构

快速列表结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/* quicklist is a 40 byte struct (on 64-bit systems) describing a quicklist.
* 'count' is the number of total entries.
* 'len' is the number of quicklist nodes.
* 'compress' is: -1 if compression disabled, otherwise it's the number
* of quicklistNodes to leave uncompressed at ends of quicklist.
* 'fill' is the user-requested (or default) fill factor. */
typedef struct quicklist {
quicklistNode *head;
quicklistNode *tail;
unsigned long count; /* total count of all entries in all ziplists */
unsigned long len; /* number of quicklistNodes */
int fill : 16; /* fill factor for individual nodes */
unsigned int compress : 16; /* depth of end nodes not to compress;0=off */
} quicklist;

快速列表的结构:

  • *head: 指向快速列表(quicklist)头节点
  • *tail: 指向快速列表(quicklist)尾节点
  • count: 列表中,数据项的个数
  • len: 列表中,节点(quicklistNode)的个数
  • fill: 每个节点中压缩列表(ziplist)的大小限制,可以通过list-max-ziplist-size设定
  • compress: 节点的压缩深度设置,可以通过list-compress-depth设定

list-max-ziplist-size

list-max-ziplist-size的默认配置是OBJ_LIST_MAX_ZIPLIST_SIZE -2,参数的范围是(1 ~ 2^15)和(-1 ~ -5):

  • 当参数为正数,表示按照数据项个数限制每个节点的元素个数。
  • -1 每个节点的ziplist字节大小不能超过4kb
  • -2 每个节点的ziplist字节大小不能超过8kb
  • -3 每个节点的ziplist字节大小不能超过16kb
  • -4 每个节点的ziplist字节大小不能超过32kb
  • -5 每个节点的ziplist字节大小不能超过64kb

PS: 具体代码参看quicklist.c中的int _quicklistNodeSizeMeetsOptimizationRequirement(const size_t sz, const int fill)函数,sz为节点中ziplist的字节数,fill为快速列表的属性fill

list-compress-depth

list-compress-depth的默认配置是OBJ_LIST_COMPRESS_DEPTH 0,取值范围是(0 ~ 2^16):

  • 0 特殊值,表示不压缩
  • 1 表示quicklist两端各有1个节点不压缩,中间的节点压缩
  • 2 表示quicklist两端各有2个节点不压缩,中间的节点压缩
  • 3 表示quicklist两端各有3个节点不压缩,中间的节点压缩
  • 以此类推

快速列表节点结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/* Node, quicklist, and Iterator are the only data structures used currently. */

/* quicklistNode is a 32 byte struct describing a ziplist for a quicklist.
* We use bit fields keep the quicklistNode at 32 bytes.
* count: 16 bits, max 65536 (max zl bytes is 65k, so max count actually < 32k).
* encoding: 2 bits, RAW=1, LZF=2.
* container: 2 bits, NONE=1, ZIPLIST=2.
* recompress: 1 bit, bool, true if node is temporarry decompressed for usage.
* attempted_compress: 1 bit, boolean, used for verifying during testing.
* extra: 12 bits, free for future use; pads out the remainder of 32 bits */
typedef struct quicklistNode {
struct quicklistNode *prev;
struct quicklistNode *next;
unsigned char *zl;
unsigned int sz; /* ziplist size in bytes */
unsigned int count : 16; /* count of items in ziplist */
unsigned int encoding : 2; /* RAW==1 or LZF==2 */
unsigned int container : 2; /* NONE==1 or ZIPLIST==2 */
unsigned int recompress : 1; /* was this node previous compressed? */
unsigned int attempted_compress : 1; /* node can't compress; too small */
unsigned int extra : 10; /* more bits to steal for future usage */
} quicklistNode;

快速列表的节点结构:

  • *prev:指向上一个节点
  • *next:指向下一个节点
  • *zl: 指向数据的指针,如果没有被压缩则是指向压缩列表(ziplist);如果被压缩了,则是指向quicklistLZF
  • sz: zl中指向的结构所占用的字节数
  • count: 节点中的元素项个数,最大为65536
  • encoding: 编码方式 RAW=1, LZF=2 (1表示压缩列表,2表示quicklistLZF)
  • container: 预留字段,存放数据的方式,1-NONE,2-ziplist(代码中并没有使用到,仅仅设置了默认值)
  • recompress: 标识位,1标识临时解压中,需要重新压缩
  • attempted_compress: 仅用于测试中
  • extra: 预留字段

LZF压缩数据结构

1
2
3
4
5
6
7
8
9
/* quicklistLZF is a 4+N byte struct holding 'sz' followed by 'compressed'.
* 'sz' is byte length of 'compressed' field.
* 'compressed' is LZF data with total (compressed) length 'sz'
* NOTE: uncompressed length is stored in quicklistNode->sz.
* When quicklistNode->zl is compressed, node->zl points to a quicklistLZF */
typedef struct quicklistLZF {
unsigned int sz; /* LZF size in bytes*/
char compressed[];
} quicklistLZF;

quicklistLZF 经过LZF算法压缩后数据保存的结构:

  • sz: LZF压缩后占用的字节数
  • compressed[]compressed:压缩后的数据

PS: 在quicklist.c中的int __quicklistCompressNode(quicklistNode *node)函数可以看到,当节点中的压缩列表的大小小于#define MIN_COMPRESS_BYTES 48时,不会执行LZF压缩。

表示数据项结构

1
2
3
4
5
6
7
8
9
typedef struct quicklistEntry {
const quicklist *quicklist;
quicklistNode *node;
unsigned char *zi;
unsigned char *value;
long long longval;
unsigned int sz;
int offset;
} quicklistEntry;

表示quicklist节点中ziplist里的一个数据项结构:

  • *quicklist: 指向所在的快速列表
  • *node: 指向所在的节点
  • *zi: 指向所在的压缩列表
  • *value: 当前压缩列表中的节点的字符串值
  • longval: 当前压缩列表中的节点的整数值
  • sz: 当前压缩列表中的节点的字节大小
  • offset: 当前压缩列表中的节点 相对于 压缩列表 的偏移量

部分代码解析

  • int quicklistPushHead(quicklist *quicklist, void *value, size_t sz) 将大小为sz的新选项value添加到quicklist的头中,如果函数返回0表示使用已经存在的节点,如果返回1表示新建头节点:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    /* Add new entry to head node of quicklist.
    *
    * Returns 0 if used existing head.
    * Returns 1 if new head created. */
    int quicklistPushHead(quicklist *quicklist, void *value, size_t sz) {
    // orig_head 指向列表头节点
    quicklistNode *orig_head = quicklist->head;

    // 来之博客: https://blog.csdn.net/terence1212/article/details/53770882 的解析
    // likely()是linux提供给程序员的编译优化方法
    // 目的是将“分支转移”的信息提供给编译器,这样编译器可以对代码进行优化,以减少指令跳转带来的性能下降
    // 此处表示节点没有满发生的概率比较大,也就是数据项直接插入到当前节点的可能性大,
    // likely()属于编译器级别的优化
    if (likely(
    // 判断value是否能直接插入到头节点中,
    // 会通过设置的属性 fill 判断,
    // fill为正数时,主要判断 quicklist->head 中元素项的个数
    // fill为负数时,主要判断 quicklist->head 中的数据大小和新数据项value的大小
    _quicklistNodeAllowInsert(quicklist->head, quicklist->fill, sz))) {

    // 直接插入到 quicklist->head 的压缩列表中
    quicklist->head->zl =
    ziplistPush(quicklist->head->zl, value, sz, ZIPLIST_HEAD);
    // 更新压缩列表的大小
    quicklistNodeUpdateSz(quicklist->head);
    } else {
    // 需要新建节点

    quicklistNode *node = quicklistCreateNode();
    // 指向新建压缩列表
    node->zl = ziplistPush(ziplistNew(), value, sz, ZIPLIST_HEAD);

    // 更新节点的大小
    quicklistNodeUpdateSz(node);
    // 将node插入为quicklist的表头
    _quicklistInsertNodeBefore(quicklist, quicklist->head, node);
    }
    // 元素项 总数 + 1
    quicklist->count++;
    quicklist->head->count++;
    return (orig_head != quicklist->head);
    }

    // 判断长度为sz的值,是否能插入节点node
    REDIS_STATIC int _quicklistNodeAllowInsert(const quicklistNode *node,
    const int fill, const size_t sz) {
    if (unlikely(!node))
    return 0;

    // ziplist_overhead为估算插入元素项后,压缩列表该元素项的头要占用的字节数

    // 计算更新下一个节点的值,“上一个节点长度”所占用的字节数
    int ziplist_overhead;
    /* size of previous offset */
    if (sz < 254)
    ziplist_overhead = 1;
    else
    ziplist_overhead = 5;

    // 计算编码“encoding”所占用的字节数
    /* size of forward offset */
    if (sz < 64)
    ziplist_overhead += 1;
    else if (likely(sz < 16384))
    ziplist_overhead += 2;
    else
    ziplist_overhead += 5;

    /* new_sz overestimates if 'sz' encodes to an integer type */
    unsigned int new_sz = node->sz + sz + ziplist_overhead;
    // new_sz为加入插入后,该压缩列表的大小
    // _quicklistNodeSizeMeetsOptimizationRequirement() 是判断list-max-ziplist-size的设置的
    if (likely(_quicklistNodeSizeMeetsOptimizationRequirement(new_sz, fill)))
    return 1;
    else if (!sizeMeetsSafetyLimit(new_sz))
    return 0;
    else if ((int)node->count < fill)
    return 1;
    else
    return 0;
    }
  • void quicklistInsertAfter(quicklist *quicklist, quicklistEntry *entry, void *value, const size_t sz) 将大小为sz的值value插入到快速列表quicklist指定数据项节点node后:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148
    149
    150
    151
    152
    153
    154
    155
    156
    157
    158
    159
    160
    161
    162
    163
    164
    165
    166
    167
    168
    169
    170
    void quicklistInsertAfter(quicklist *quicklist, quicklistEntry *entry,
    void *value, const size_t sz) {
    // 调用_quicklistInsert()函数插入
    // 最后参数1表示,要将 value 插入到 entry后
    _quicklistInsert(quicklist, entry, value, sz, 1);
    }

    /* Insert a new entry before or after existing entry 'entry'.
    *
    * If after==1, the new value is inserted after 'entry', otherwise
    * the new value is inserted before 'entry'. */
    // 要将大小为sz的value, 插入到元素项entry的前或后
    // 1表示插入在后
    // 其他表示插入在前
    REDIS_STATIC void _quicklistInsert(quicklist *quicklist, quicklistEntry *entry,
    void *value, const size_t sz, int after) {
    // full 是判断entry所在的节点node中,是否可以直接插入value,不可以则置为1
    // full_next 是判断entry所在的节点的下一个节点,是否可以直接插入value,不可以则置为1
    // full_prev 是判断entry所在的节点的上一个节点,是否可以直接插入value,不可以则置为1
    // at_tail 表示entry是否在node中是最后一个元素项
    // at_head 表示entry是否在node中是第一个元素项
    int full = 0, at_tail = 0, at_head = 0, full_next = 0, full_prev = 0;
    int fill = quicklist->fill;
    // 获取entry所在的快速列表节点
    quicklistNode *node = entry->node;
    quicklistNode *new_node = NULL;

    if (!node) {
    // 如果指定的entry 并没有节点,那么创建一个并插入
    // 这里应该是默认为,当传入的`node`为NULL时,quicklist中是没有节点的
    // 可以看__quicklistInsertNode()函数中的处理,它并没有考虑quicklist->len > 0 且 node 为 NULL的状态
    /* we have no reference node, so let's create only node in the list */
    D("No node given!");
    new_node = quicklistCreateNode();
    new_node->zl = ziplistPush(ziplistNew(), value, sz, ZIPLIST_HEAD);
    __quicklistInsertNode(quicklist, NULL, new_node, after);
    new_node->count++;
    quicklist->count++;
    return;
    }

    /* Populate accounting flags for easier boolean checks later */
    // 判断节点node是否可以让value插入到它的压缩列表中
    if (!_quicklistNodeAllowInsert(node, fill, sz)) {
    D("Current node is full with count %d with requested fill %lu",
    node->count, fill);
    full = 1;
    }

    // 如果插在节点后,而且元素项entry恰好是最后一个元素
    // 那么检查节点node的下一个节点,是否可以让value插入到它的压缩列表中
    if (after && (entry->offset == node->count)) {
    D("At Tail of current ziplist");
    at_tail = 1;
    if (!_quicklistNodeAllowInsert(node->next, fill, sz)) {
    D("Next node is full too.");
    full_next = 1;
    }
    }

    // 如果插在节点前,而且元素项entry恰好是第一个元素
    // 那么检查节点node的上一个节点,是否可以让value插入到它的压缩列表中
    if (!after && (entry->offset == 0)) {
    D("At Head");
    at_head = 1;
    if (!_quicklistNodeAllowInsert(node->prev, fill, sz)) {
    D("Prev node is full too.");
    full_prev = 1;
    }
    }

    /* Now determine where and how to insert the new element */
    if (!full && after) {
    // 压缩列表可以插入,且插入entry之后
    D("Not full, inserting after current position.");
    // 尝试解压缩LZF的数据(该函数会判断node中的数据是否已压缩,所以直接调用)
    quicklistDecompressNodeForUse(node);

    // 压缩列表插入
    unsigned char *next = ziplistNext(node->zl, entry->zi);
    if (next == NULL) {
    node->zl = ziplistPush(node->zl, value, sz, ZIPLIST_TAIL);
    } else {
    node->zl = ziplistInsert(node->zl, next, value, sz);
    }
    node->count++;
    // 更新node的字节数
    quicklistNodeUpdateSz(node);
    // 重新压缩
    quicklistRecompressOnly(quicklist, node);
    } else if (!full && !after) {
    // 压缩列表可以插入,且插入entry之前
    D("Not full, inserting before current position.");
    // 尝试解压缩LZF的数据(该函数会判断node中的数据是否已压缩,所以直接调用)
    quicklistDecompressNodeForUse(node);

    // 压缩列表插入
    node->zl = ziplistInsert(node->zl, entry->zi, value, sz);
    node->count++;
    // 更新node的字节数
    quicklistNodeUpdateSz(node);
    // 重新压缩
    quicklistRecompressOnly(quicklist, node);
    } else if (full && at_tail && node->next && !full_next && after) {
    /* If we are: at tail, next has free space, and inserting after:
    * - insert entry at head of next node. */
    // 如果node中压缩列表无法插入
    // 且 entry 是尾元素项
    // 且 node 存在下一个节点
    // 且 下一个节点可以插入
    // 且 新元素项是插入到 entry 之后的
    D("Full and tail, but next isn't full; inserting next node head");
    new_node = node->next;
    quicklistDecompressNodeForUse(new_node);
    new_node->zl = ziplistPush(new_node->zl, value, sz, ZIPLIST_HEAD);
    new_node->count++;
    quicklistNodeUpdateSz(new_node);
    quicklistRecompressOnly(quicklist, new_node);
    } else if (full && at_head && node->prev && !full_prev && !after) {
    /* If we are: at head, previous has free space, and inserting before:
    * - insert entry at tail of previous node. */
    // 如果node中压缩列表无法插入
    // 且 entry 是头元素项
    // 且 node 存在上一个节点
    // 且 上一个节点可以插入
    // 且 新元素项是插入到 entry 之前的
    D("Full and head, but prev isn't full, inserting prev node tail");
    new_node = node->prev;
    quicklistDecompressNodeForUse(new_node);
    new_node->zl = ziplistPush(new_node->zl, value, sz, ZIPLIST_TAIL);
    new_node->count++;
    quicklistNodeUpdateSz(new_node);
    quicklistRecompressOnly(quicklist, new_node);
    } else if (full && ((at_tail && node->next && full_next && after) ||
    (at_head && node->prev && full_prev && !after))) {
    /* If we are: full, and our prev/next is full, then:
    * - create new node and attach to quicklist */
    // 如果 node 中压缩列表无法插入
    // (entry在对列尾 且 node存在下一个节点 且 下一个节点压缩列表无法插入 且 是插入entry之后)
    // 或者
    // (entry在对列头 且 node存在上一个节点 且 上一个节点压缩列表无法插入 且 是插入entry之前)
    D("\tprovisioning new node...");

    // 新建一个节点
    new_node = quicklistCreateNode();
    new_node->zl = ziplistPush(ziplistNew(), value, sz, ZIPLIST_HEAD);
    new_node->count++;
    quicklistNodeUpdateSz(new_node);
    // 新节点插入到node节点的前或者后(由after决定)
    __quicklistInsertNode(quicklist, node, new_node, after);
    } else if (full) {
    /* else, node is full we need to split it. */
    /* covers both after and !after cases */
    // 以上都不是,而且node中的压缩列表满了,则需要将node拆分为2个node
    D("\tsplitting node...");
    quicklistDecompressNodeForUse(node);
    // 拆分为2个node
    new_node = _quicklistSplitNode(node, entry->offset, after);
    // 将vale添加到new_node中
    new_node->zl = ziplistPush(new_node->zl, value, sz,
    after ? ZIPLIST_HEAD : ZIPLIST_TAIL);
    new_node->count++;
    quicklistNodeUpdateSz(new_node);
    // 将new_node插入到node对应文职
    __quicklistInsertNode(quicklist, node, new_node, after);
    _quicklistMergeNodes(quicklist, node);
    }

    quicklist->count++;
    }

快速列表API

函数 作用
quicklist *quicklistCreate(void) 创建一个新的快速列表。
quicklist *quicklistNew(int fill, int compress) 使用指定的fillcompress创建一个新的快速列表。
void quicklistSetCompressDepth(quicklist *quicklist, int depth) 设置quicklistdepth,设置后并不会执行压缩/解压缩操作,只改变quicklist中的depth属性值。
void quicklistSetFill(quicklist *quicklist, int fill) 设置quicklistfill属性值,设置后并不会对列表进行调整。
void quicklistSetOptions(quicklist *quicklist, int fill, int depth) 设置quicklistfilldepth属性值,设置后并不会对列表进行调整。
void quicklistRelease(quicklist *quicklist) 释放整个quicklist列表
int quicklistPushHead(quicklist *quicklist, void *value, const size_t sz) 将大小为sz的新选项value添加到quicklist的头中,如果函数返回0表示使用已经存在的节点,如果返回1表示新建头节点。
int quicklistPushTail(quicklist *quicklist, void *value, const size_t sz) 将大小为sz的新选项value添加到quicklist的尾中,如果函数返回0表示使用已经存在的节点,如果返回1表示新建尾节点。
void quicklistPush(quicklist *quicklist, void *value, const size_t sz,int where) 将大小为sz的新选项value添加到quicklist的最前或最末,由参数where决定位置,0表示添加到头,-1表示添加到尾。
void quicklistAppendZiplist(quicklist *quicklist, unsigned char *zl) 为压缩列表*zl创建一个新的节点,并将这个节点添加到quicklist的尾。
quicklist *quicklistAppendValuesFromZiplist(quicklist *quicklist, unsigned char *zl) 在压缩列表zl中读取数据项,并将这些数据项添加到quicklist的尾。
quicklist *quicklistCreateFromZiplist(int fill, int compress, unsigned char *zl) 从指定的压缩列表*zl中,创建一个新的quicklist。
void quicklistInsertAfter(quicklist *quicklist, quicklistEntry *node, void *value, const size_t sz) 将大小为sz的值value插入到快速列表quicklist指定数据项节点node
void quicklistInsertBefore(quicklist *quicklist, quicklistEntry *node, void *value, const size_t sz) 将大小为sz的值value插入到快速列表quicklist指定数据项节点node
int quicklistReplaceAtIndex(quicklist *quicklist, long index, void *data, int sz) 将长度为sz的数据data插入到快速列表quicklist中的指定位置index
int quicklistDelRange(quicklist *quicklist, const long start, const long stop) 删除quicklist中指定start ~ end范围的数据项
quicklist *quicklistDup(quicklist *orig) 复制一个新的快速列表并返回
int quicklistIndex(const quicklist *quicklist, const long long index, quicklistEntry *entry) 返回quicklist中指定位置index数据项的值,并写入*entry中,如果找到元素则返回1,否则返回0
void quicklistRotate(quicklist *quicklist) 旋转快速列表quicklist
int quicklistPopCustom(quicklist *quicklist, int where, unsigned char **data, unsigned int *sz, long long *sval, void *(*saver)(unsigned char *data, unsigned int sz)) quicklist中弹出(pop)一个元素项,并写入data中,如果这个元素是long long则写入sval中,where为0表示在头弹出,其他表示在尾弹出。
int quicklistPop(quicklist *quicklist, int where, unsigned char **data, unsigned int *sz, long long *slong) quicklist中弹出(pop)一个元素项,并写入data中,where为0表示在头弹出,其他表示在尾弹出。
unsigned long quicklistCount(const quicklist *ql) 返回快速列表ql中元素项的个数
int quicklistCompare(unsigned char *p1, unsigned char *p2, int p2_len) 对比压缩列表的内容。
size_t quicklistGetLzf(const quicklistNode *node, void **data) *node中获取LZF压缩数据,并写入data中,返回值为data的长度。