Redis源码阅读笔记-跳跃表结构

跳跃表

跳跃表 是一种有序的数据结构,通过在每个节点中维持多个指向其他节点的指针,从而达到快速访问节点的目的。它是基于有序链表,在链表的基础上,每个节点不止包含一个指针,还可能包含多个指向后继节点的指针,从而加快操作。

示例:

跳跃表示例图.png

结合图,比如查找key为3是否存在:

  1. 首先访问的是key1,比3小,然后从最高层的后继节点4进行比较,34小,所以3肯定在14之间;
  2. 因此移向下一层,后继节点为3刚好满足。

时间复杂度

跳跃表支持平均O(logN),最坏O(N)时间复杂度的节点查找。

在大部分情况下,跳跃表的效率可以和平衡树相媲美,并且跳跃表的实现比平衡树简单。

特点

来之《Redis设计与实现》

  • 跳跃表是有序集合的底层实现之一。Redis只在实现有序集合键和集群节点的内部数据结构中用到了跳跃表。
  • Redis的跳跃表实现由zskiplist和zskiplistNode两个结构组成,其中zskiplist用于保存跳跃表信息(比如表头节点、表尾节点、长度),而zskiplistNode则用于表示跳跃表节点。
  • 每个跳跃表节点的层高都是1至32之间的随机数(幂次定律,越大的数出现的 概率越小)。
  • 在同一个跳跃表中,多个节点可以包含相同的分值,但每个节点的成员对象必须是唯一的。
  • 跳跃表中的节点按照分值大小进行排序,当分值相同时,节点按照成员对象的大小进行排序。

代码结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// server.h

/* ZSETs use a specialized version of Skiplists */
// 跳跃表节点 结构
typedef struct zskiplistNode {
// 节点的成员
// 在redis3.0 中使用的是 robj
// 在redis4.0 中使用 sds
sds ele;
double score; // 分值
struct zskiplistNode *backward; // 后退指针

// 层
struct zskiplistLevel {
struct zskiplistNode *forward; // 层的前进指针
unsigned int span; // 层的跨度
} level[];
} zskiplistNode;

成员ele: 在同一个跳跃表中,各个节点保存的成员对象必须是唯一的。

分值score: 跳跃表中,所有节点都按分值从小到大排序。不同节点中的分值可以相同,分值相同的节点将按照成员对象的大小来排序,成员对象较小的节点在前面。

后退指针*backward: 用于从表尾向表头方向访问节点,跟可以一次跳过多个节点的前进指针不同,因为每个节点只有一个后退指针,所以每次只能后退至前一个节点。

level[]:跳跃表节点的level数组可以包含多个元素,每个元素都包含一个指向其他节点的指针,程序可以通过这些层来加快访问其他节点的速度,一般来说,层的数量越多,访问其他节点的速度就越快。每次创建一个新跳跃表节点的时候,程序都根据幂次定律(powerlaw,越大的数出现的概率越小)随机生成一个介于1和32之间的值作为level数组的大小,这个大小就是层的“高度”。

前进指针*forward: 每个层都有一个指向表尾方向的前进指针,用于从表头向表尾方向访问节点。

跨度span: 层的跨度用于记录连个节点之间的距离:

  • 两个节点之间的跨度越大,它们相距得就越远。
  • 指向NULL的所有前进指针的跨度都为0,因为它们没有连向任何节点。
1
2
3
4
5
6
// 跳跃表 结构
typedef struct zskiplist {
struct zskiplistNode *header, *tail; // 记录 表头节点 和 表尾节点 的指针
unsigned long length; // 表中节点的数量
int level; // 表中层数最大的节点层数
} zskiplist;

部分代码解析

  • zskiplist *zslCreate(void) 创建一个空的zskiplist:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    // t_zet.c

    /* Create a new skiplist. */
    zskiplist *zslCreate(void) {
    int j;
    zskiplist *zsl;

    zsl = zmalloc(sizeof(*zsl));
    zsl->level = 1;
    zsl->length = 0;
    // 创建一个空节点给表头节点
    zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
    for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
    // 为头节点的各个层级赋值
    // 前进指针forward 置为 NULL
    // 跨度span 设为 0
    zsl->header->level[j].forward = NULL;
    zsl->header->level[j].span = 0;
    }
    // 因为该跳跃表是空的,所以 表头节点的后退指针 置为 NULL
    zsl->header->backward = NULL;
    // 表尾节点 置为 NULL
    zsl->tail = NULL;
    return zsl;
    }
  • zskiplistNode *zslCreateNode(int level, double score, sds ele) 创建一个跳跃表节点,创建level层数,分值score,节点成员ele:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    // t_zet.c

    /* Create a skiplist node with the specified number of levels.
    * The SDS string 'ele' is referenced by the node after the call. */
    zskiplistNode *zslCreateNode(int level, double score, sds ele) {
    // 为层数level数组分配内存
    zskiplistNode *zn =
    zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
    // 分值赋值
    zn->score = score;
    // 成员赋值
    zn->ele = ele;
    return zn;
    }
  • zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) 往跳跃表*zsl 中插入成员ele,分值是score

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62

    // t_zet.c

    /* Insert a new node in the skiplist. Assumes the element does not already
    * exist (up to the caller to enforce that). The skiplist takes ownership
    * of the passed SDS string 'ele'. */
    zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    unsigned int rank[ZSKIPLIST_MAXLEVEL];
    int i, level;

    serverAssert(!isnan(score));
    x = zsl->header;
    for (i = zsl->level-1; i >= 0; i--) {
    /* store rank that is crossed to reach the insert position */
    rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
    while (x->level[i].forward &&
    (x->level[i].forward->score < score ||
    (x->level[i].forward->score == score &&
    sdscmp(x->level[i].forward->ele,ele) < 0)))
    {
    rank[i] += x->level[i].span;
    x = x->level[i].forward;
    }
    update[i] = x;
    }
    /* we assume the element is not already inside, since we allow duplicated
    * scores, reinserting the same element should never happen since the
    * caller of zslInsert() should test in the hash table if the element is
    * already inside or not. */
    level = zslRandomLevel();
    if (level > zsl->level) {
    for (i = zsl->level; i < level; i++) {
    rank[i] = 0;
    update[i] = zsl->header;
    update[i]->level[i].span = zsl->length;
    }
    zsl->level = level;
    }
    x = zslCreateNode(level,score,ele);
    for (i = 0; i < level; i++) {
    x->level[i].forward = update[i]->level[i].forward;
    update[i]->level[i].forward = x;

    /* update span covered by update[i] as x is inserted here */
    x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
    update[i]->level[i].span = (rank[0] - rank[i]) + 1;
    }

    /* increment span for untouched levels */
    for (i = level; i < zsl->level; i++) {
    update[i]->level[i].span++;
    }

    x->backward = (update[0] == zsl->header) ? NULL : update[0];
    if (x->level[0].forward)
    x->level[0].forward->backward = x;
    else
    zsl->tail = x;
    zsl->length++;
    return x;
    }
    • 插入第一个节点(假设节点score为1, ele为a):

      1. 初始化状态

        跳跃表的初始化状态.png

      2. 获得插入位置

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        x = zsl->header;
        // 当跳跃表本身为空时,zsl->level 为 1
        // 所以这个循环只执行1次
        for (i = zsl->level-1; i >= 0; i--) {
        /* store rank that is crossed to reach the insert position */
        // 所以rank[0] = 0
        rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];

        // 因为 x 指向 zsl 的表头节点,
        // 表头节点 所有的level 的前进指针都为NULL
        // 所以插入第一个节点上时,不进入这个循环
        while (x->level[i].forward &&
        (x->level[i].forward->score < score ||
        (x->level[i].forward->score == score &&
        sdscmp(x->level[i].forward->ele,ele) < 0)))
        {
        rank[i] += x->level[i].span;
        x = x->level[i].forward;
        }
        // 将update[0]指向表头节点
        update[i] = x;
        }

        空跳跃表插入节点_1.png

      3. 插入节点

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        // 该函数会生成一个随机(1-32之间)的level(假设为4)
        level = zslRandomLevel();
        // 如果生成的level大于zsl->level(当前为1)
        if (level > zsl->level) {
        // i 从 1 开始,循坏到 i = 3
        for (i = zsl->level; i < level; i++) {
        // rank[1],rank[2],rank[3] 设为 0
        rank[i] = 0;
        // update[1],update[2],update[3] 都指向表头节点
        update[i] = zsl->header;
        // update[1]->level[1].span,update[2]->level[2].span,update[3]->level[3].span 都设为 0
        // 实际上就是将 header的level[1,2,3]的span都设为0
        update[i]->level[i].span = zsl->length;
        }
        // 更新zls->level的值
        zsl->level = level;
        }

        空跳跃表插入节点_2.png

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        // 创建表节点
        x = zslCreateNode(level,score,ele);
        // level 为4,循环 i = [0, 1, 2, 3]
        for (i = 0; i < level; i++) {
        // x 是指向新建节点的
        // update[0-3]是指向头节点
        // 而头节点的level[0-3]的前进节点都是NULL
        // 所以新建节点x->level[0-3]的前进节点都指向NULL
        x->level[i].forward = update[i]->level[i].forward;

        // 将头节点的level[0-3]的前进节点都指向新建节点x
        update[i]->level[i].forward = x;

        /* update span covered by update[i] as x is inserted here */
        // 更新 新建节点x->level[0-3]的层跨度
        // 而 表头节点->level[0-3].span 的层跨度都为0
        // rank[0-3] 为 0
        // 所以 新建节点x->level[0-3]的层跨度 都设为0
        x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
        // 更新表头节点的level[0-3]层跨度 = 1
        update[i]->level[i].span = (rank[0] - rank[i]) + 1;
        }

        空跳跃表插入节点_3.png

      4. 更新其他节点的level信息

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        /* increment span for untouched levels */
        // level 为4
        // zsl->level 也为 4
        // 所以不进入该循环
        for (i = level; i < zsl->level; i++) {
        update[i]->level[i].span++;
        }

        // 新建节点的信息指向NULL
        x->backward = (update[0] == zsl->header) ? NULL : update[0];
        // x->level[0].forward 指向 NULL
        if (x->level[0].forward)
        x->level[0].forward->backward = x;
        else
        // 表中只有1个结点,将表尾指针指向新建节点x
        zsl->tail = x;
        // zsl 的长度 + 1
        zsl->length++;
        return x;

        空跳跃表插入节点_4.png
        空跳跃表插入节点_5.png

    • 向有数据的跳跃表插入

      1. 假设跳跃表中有如下节点,假设插入节点score为2,ele为b:

        有节点跳跃表插入节点_1.png

      2. 获得插入位置

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        23
        24
        25
        x = zsl->header;

        // i 初始值为 4,所以循环会执行 i[4, 3, 2, 1, 0] 5次
        for (i = zsl->level-1; i >= 0; i--) {
        /* store rank that is crossed to reach the insert position */

        rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];

        // 循环寻找出要定位的位置
        // 直至找到末尾,或者参数分值小于节点分值
        // 如果找到相同分值,则比较成员的大小
        // 随着 i 的变小,x将会指向插入位置前方的节点
        while (x->level[i].forward &&
        (x->level[i].forward->score < score ||
        (x->level[i].forward->score == score &&
        sdscmp(x->level[i].forward->ele,ele) < 0)))
        {
        // 累积记下节点移动的跨度
        rank[i] += x->level[i].span;
        // 更新x的指向,在while循环中x向前移动
        x = x->level[i].forward;
        }

        update[i] = x;
        }

        有节点跳跃表插入节点_2.png

      3. 为新增的节点随机生成level(假设生成的是6)

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        // 获取一个随机的level数,假设生成的是6
        level = zslRandomLevel();
        if (level > zsl->level) {
        // 如果随机生成的level 比原来跳跃表的level大,则需要对表头节点的level扩展
        for (i = zsl->level; i < level; i++) {
        rank[i] = 0;
        update[i] = zsl->header;
        update[i]->level[i].span = zsl->length;
        }
        zsl->level = level;
        }

        有节点跳跃表插入节点_3.png

      4. 创建并在跳跃表中插入新节点

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
           // 创建节点,level=6, sorce=2, ele="b"
        x = zslCreateNode(level,score,ele);
        for (i = 0; i < level; i++) {
        // 因为 *update数组 中保存的都是在 插入位置 之前的节点
        // 所以在这里将x中level的前进指针 指向 *update数组中节点的前进指针指向的节点
        x->level[i].forward = update[i]->level[i].forward;
        // 然后 *update数组中节点的前进指针 指向 x
        update[i]->level[i].forward = x;

        /* update span covered by update[i] as x is inserted here */
        // 更新 x 中各个level的跨度
        x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
        // 更新 *update数组中节点的level跨度
        update[i]->level[i].span = (rank[0] - rank[i]) + 1;
        }

        有节点跳跃表插入节点_4.png

      5. 更新其他节点的level信息

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        /* increment span for untouched levels */
        // 更新其他节点的跨度,本例中level和zsl->level已经相等,所以不进入循环
        for (i = level; i < zsl->level; i++) {
        update[i]->level[i].span++;
        }
        // 更新插入节点的后退指针
        x->backward = (update[0] == zsl->header) ? NULL : update[0];
        if (x->level[0].forward)
        x->level[0].forward->backward = x;
        else
        zsl->tail = x;
        zsl->length++;
        return x;

        有节点跳跃表插入节点_5.png

  • int zslDelete(zskiplist *zsl, double score, sds ele, zskiplistNode **node) 通过指定scoreelse删除节点,如果在跳跃表中找到节点,函数将会返回1,并且将删除的节点写入**node中;如果没有找到,将会返回0:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    /* Delete an element with matching score/element from the skiplist.
    * The function returns 1 if the node was found and deleted, otherwise
    * 0 is returned.
    *
    * If 'node' is NULL the deleted node is freed by zslFreeNode(), otherwise
    * it is not freed (but just unlinked) and *node is set to the node pointer,
    * so that it is possible for the caller to reuse the node (including the
    * referenced SDS string at node->ele). */
    int zslDelete(zskiplist *zsl, double score, sds ele, zskiplistNode **node) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    int i;

    x = zsl->header;
    // 找出 要删除节点 的前置节点,与zslInsert()方法中类似
    // 而update数组中保存了 要删除节点 在删除之前各个 level 紧邻的前置节点
    for (i = zsl->level-1; i >= 0; i--) {
    while (x->level[i].forward &&
    (x->level[i].forward->score < score ||
    (x->level[i].forward->score == score &&
    sdscmp(x->level[i].forward->ele,ele) < 0)))
    {
    x = x->level[i].forward;
    }
    update[i] = x;
    }
    /* We may have multiple elements with the same score, what we need
    * is to find the element with both the right score and object. */
    x = x->level[0].forward;
    // 判断分值和成员是否相同
    if (x && score == x->score && sdscmp(x->ele,ele) == 0) {
    zslDeleteNode(zsl, x, update);
    if (!node)
    // 如果参数node 是 NULL的,则直接释放掉要删除的节点
    zslFreeNode(x);
    else
    // 否则将node 指向 x
    *node = x;
    return 1;
    }
    return 0; /* not found */
    }

    /* Internal function used by zslDelete, zslDeleteByScore and zslDeleteByRank */
    // 删除节点的内部函数
    // *zsl 是指定的跳跃表
    // *x 是要删除的节点
    // **update 是指向一个数组的指针,数组中保存了 要删除节点 在删除之前各个 level 紧邻的前置节点
    void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
    int i;
    // 删除节点x,并且更新表中x前置节点level的跨度和指向
    for (i = 0; i < zsl->level; i++) {
    if (update[i]->level[i].forward == x) {
    // 如果是本来指向x的,则指向x对应level的前置节点
    // 并且重新计算跨度
    update[i]->level[i].span += x->level[i].span - 1;
    update[i]->level[i].forward = x->level[i].forward;
    } else {
    // 如果本来不是指向x的,则是指向x之后的节点,所以只需要跨度减1
    update[i]->level[i].span -= 1;
    }
    }
    // 修改 要删除节点x 的后一个节点的后退指针
    if (x->level[0].forward) {
    x->level[0].forward->backward = x->backward;
    } else {
    // 如果x是尾节点,则修改尾结点的指向
    zsl->tail = x->backward;
    }
    // 修改表中最大level值
    while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
    zsl->level--;
    // 节点x已经不再关联,长度减1
    zsl->length--;
    }

API

参考之《Redis设计与实现》

函数 作用
zskiplist *zslCreate(void) 创建一个新的跳跃表
void zslFree(zskiplist *zsl) 释放给定跳跃表,以及表中包含的所有节点
zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) 将包含给定成员和分值的新节点添加到跳跃表中
int zslDelete(zskiplist *zsl, double score, sds ele, zskiplistNode **node) 删除跳跃表中包含给定成员和分值的节点
unsigned long zslGetRank(zskiplist *zsl, double score, sds o) 返回包含给定成员和分值的节点在跳跃表中的排位
zskiplistNode* zslGetElementByRank(zskiplist *zsl, unsigned long rank) 返回跳跃表在给定排位上的节点
int zslIsInRange(zskiplist *zsl, zrangespec *range) 给定一个分值范围range,如果跳跃表中有至少一个节点的分值在这个分值范围内,返回1,否则返回0。zrangespec是包含minmax的结构体。
zskiplistNode *zskiplistNode *zslFirstInRange(zskiplist *zsl, zrangespec *range) 给定一个分值范围range,返回跳跃表中第一个符合这个范围的节点
zskiplistNode *zslLastInRange(zskiplist *zsl, zlexrangespec *range) 给定一个分值范围range,返回跳跃表中最后一个符合这个范围的节点
unsigned long zslDeleteRangeByScore(zskiplist *zsl, zrangespec *range, dict *dict) 给定一个分值范围,删除跳跃表中所有这个范围内的节点(同时也会删除dict中对应的key)
unsigned long zslDeleteRangeByRank(zskiplist *zsl, unsigned int start, unsigned int end, dict *dict) 给定一个排位范围,删除跳跃表中所有这个范围内的节点(同时也会删除dict中对应的key)