跳到主要内容

跳表实现原理

介绍

跳表(Skip List) 是一种基于链表的数据结构,但引入了多级索引来加快查找速度。它克服了单纯链表查找的低效率问题,通过层次化设计,实现类似于二分查找的效果。

Redis中的 有序集合 sorted set 就是用跳表实现的。

原理

跳表查询

对于单链表来说,即使有序,想要查找数据也只能从头开始遍历,时间复杂度为O(n)O(n)

为了提高查询效率,我们使用二分查找的思想,对于有序的单链表建立一级索引,索引层的节点包含两个指针,一个是指向下一个节点,一个是指向下一级节点。

每两个节点建一个索引:

img

此时比如说想要查找18这个节点,那么就在一级索引遍历,5次到14这个节点,然后向下找到18,总共7次,而如果没有索引需要10次。次数会减少,如果继续增加层级索引,效率会继续提高,相当于使用空间来换取时间了,例如:

每三个节点建一个索引:

img

此时我们发现想要走到18,只需要6次操作了。

时间复杂度

建立层级索引的时候,如果我们每两个节点建立一层索引,第一层是n2\frac{n}{2}个节点,第二层是n4\frac{n}{4},第三层是:n23\frac{n}{2^3},第kk层是n2k\frac{n}{2^k}

最后一层索引会有两个节点,也就是初始节点和结束节点,那么n2k=2\frac{n}{2^k}=2,解得:k=log2n1k=\log_2{n}-1,加上原来的单链表,共log2nlog_2{n}

在跳表查询时,每一级索引 最多需要遍历3个节点,总的查询时间复杂度为3log2n3log_2{n}

空间复杂度

新加的节点数为:sum=n2+n22+n23+...+2sum=\frac{n}{2}+\frac{n}{2^2}+\frac{n}{2^3}+...+2

这个和是一个等比数列,其中首项为 n2\frac{n}{2},公比r=12r=\frac{1}{2},各项呈现递减的规律。

根据等比数列求和公式 Sn=a1rn1rS_n = a \frac{1 - r^n}{1 - r},我们可以计算出总和。

在这里,a=n2a = \frac{n}{2}r=12r = \frac{1}{2},当 nn \to \infty 时,rn0r^n \to 0,因此总和为:

S=n/211/2=nS = \frac{n/2}{1 - 1/2} = n

所以新加的节点数的总和为 nn。也就是时间复杂度为O(n)O(n)

优化

在跳表中,如果我们每三个节点建立一层索引,第一层将会有n3\frac{n}{3}个节点,第二层将会有n32\frac{n}{3^2}个节点,第三层将会有n33\frac{n}{3^3}个节点,第kk层将会有n3k\frac{n}{3^k}个节点。

最后一层索引会有两个节点,也就是初始节点和结束节点,那么n3k=2\frac{n}{3^k}=2,解得:k=log3n1k=\log_3{n}-1,加上原来的单链表,共有log3nlog_3{n}层。

在跳表查询时,每一级索引 最多需要遍历3个节点,因此总的查询时间复杂度为3log3n3log_3{n}

新加的节点数为:sum=n3+n32+n33+...+9+3+1sum=\frac{n}{3}+\frac{n}{3^2}+\frac{n}{3^3}+...+9+3+1

这个和是一个等比数列,其中首项为 n3\frac{n}{3},公比r=13r=\frac{1}{3},各项呈现递减的规律。

根据等比数列求和公式 Sn=a1rn1rS_n = a \frac{1 - r^n}{1 - r},我们可以计算出总和。

在这里,a=n3a = \frac{n}{3}r=13r = \frac{1}{3},当 nn \to \infty 时,rn0r^n \to 0,因此总和为:

S=n/311/3=n/32/3=n2S = \frac{n/3}{1 - 1/3} = \frac{n/3}{2/3} = \frac{n}{2}

所以新加的节点数的总和为 n2\frac{n}{2}。也就是时间复杂度为O(2log3n)O(2log_3{n}),空间复杂度为O(n2)O(\frac{n}{2})

跳表插入

为保证索引的有序性,可以先使用多级索引找到对应的位置,然后进行插入,再更新索引。

查询的时间复杂度为O(log2n)O(log_2{n}),插入的时间复杂度为O(1)O(1)

跳表删除

删除也是一样,先查询,然后删除,最后更新索引

索引更新

在跳表中,频繁的删除和插入操作如果不及时更新索引,可能导致索引结构退化为单链表。这种退化会显著降低查询效率,因此为了保持高效的查询性能,我们需要动态更新索引,以实现类似于红黑树的平衡状态。跳表通过引入随机函数来决定每个节点的层级。

当向跳表中插入数据时,我们选择同时将这个数据插入到部分索引层中。 如何决定插入到哪些索引层中呢? 通过一个随机函数来决定,比如通过 随机函数得到某个值 K, 那么就将这个节点添加到第一级到第K级索引中。

为什么Redis的Zset使用跳表而不是红黑树

  1. 实现简单:红黑树的插入、删除操作需要复杂的旋转和重平衡操作,以保持其平衡性。而跳表通过随机化的层次结构,只需要修改相邻节点,无需复杂的平衡操作,大大简化了实现。

  2. 范围查询:跳表本质是多级索引链表,跳表的结构非常适合范围查询,红黑树需要依赖中序遍历

  3. 内存更优:平衡树每个节点两个指针,跳表为11p\frac{1}{1-p},Redis中取p=14p=\frac{1}{4},也就是1.33个节点

Redis源码

ZSet数据结构

/* ZSETs use a specialized version of Skiplists */
//这里是跳跃表的节点
typedef struct zskiplistNode { // 跳跃表节点结构体
sds ele; // 一个SDS(简单动态字符串)类型的指针,指向节点的元素数据。
double score; // 用于排序的分值,跳跃表中的节点是根据这个score来进行排序的。
struct zskiplistNode *backward; // 指向跳跃表中当前节点的前一个节点,实现双向链表的结构,使得可以向前遍历。
struct zskiplistLevel { // 一个灵活数组,表示跳跃表的不同层级。每一层包含
struct zskiplistNode *forward; // 指向该层中下一个节点的指针,用来实现跳跃的功能。
unsigned long span; // 当前节点到forward节点的距离,用于快速计算排名。
} level[]; // 层级数组
} zskiplistNode; // 跳跃表节点类型

typedef struct zskiplist { // 跳跃表结构体
struct zskiplistNode *header, *tail; // 头节点和尾节点
unsigned long length; // 跳跃表长度
int level; // 跳跃表当前的最大层数(一般层数是动态变化的,插入时随机分配)。
} zskiplist; // 跳跃表类型

typedef struct zset { // 有序集合结构体
dict *dict; // 字典
zskiplist *zsl; // 跳跃表
} zset; // 有序集合类型

Redis中的有序集合ZSET通过一个字典和一个跳跃表来实现:

  • dict: 一个字典,元素为键,分值为值。用于快速查找元素是否存在,以及获取其分值。
  • zsl: 一个跳跃表,用来实现有序性,基于分值score排序。

随机跳表层数zslRandomLevel

int zslRandomLevel(void) {
int level = 1; //初始为1,表示每个节点至少有一层。
while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF)) //random() 生成一个随机数,random() & 0xFFFF 取16位随机数。
level += 1; //如果这个随机数小于 ZSKIPLIST_P * 0xFFFF,则层数增加。ZSKIPLIST_P 通常是一个小于1的常数(例如 0.25),这样保证层数越高越不常见。
return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL; //最后返回的层数在1到 ZSKIPLIST_MAXLEVEL 之间。
}

image-20240903094531282

这种机制使得跳跃表层次结构分布均匀,使得查找、插入、删除操作的平均时间复杂度为 O(log N)

插入操作zslInsert

zslInsert() 完成了在跳跃表中根据分值和元素的字典序插入新节点,并维护各层的链表结构和跨度。

zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x; // 定义一个更新数组和一个跳跃表节点指针x
unsigned int rank[ZSKIPLIST_MAXLEVEL]; // 定义一个排名数组,用于记录每一层的跨度
int i, level; // 定义两个整型变量i和level

serverAssert(!isnan(score)); // 断言分数不是NaN(Not a Number)
x = zsl->header; // 将x指向跳跃表的头节点
for (i = zsl->level-1; i >= 0; i--) { // 从跳跃表的最高层开始向下遍历
/* store rank that is crossed to reach the insert position */
rank[i] = i == (zsl->level-1) ? 0 : rank[i+1]; // 初始化每一层的排名,最高层的排名为0,其他层的排名继承自上一层
while (x->level[i].forward && // 当当前层的下一个节点存在时
(x->level[i].forward->score < score || // 如果下一个节点的分数小于插入节点的分数,继续向前
(x->level[i].forward->score == score && // 如果分数相等,比较元素的字典序
sdscmp(x->level[i].forward->ele,ele) < 0))) // 如果下一个节点的元素字典序小于插入节点的元素,继续向前
{
rank[i] += x->level[i].span; // 更新排名,增加跨度
x = x->level[i].forward; // 将x指向下一个节点
}
update[i] = x; // 将当前节点记录到更新数组中
}
/* we assume the element is not already inside, since we allow duplicated
* scores, reinserting the same element should never happen since the
* caller of zslInsert() should test in the hash table if the element is
* already inside or not. */
level = zslRandomLevel(); // 随机生成一个层数
if (level > zsl->level) { // 如果生成的层数大于当前跳跃表的层数
for (i = zsl->level; i < level; i++) { // 初始化新增的层
rank[i] = 0; // 新增层的排名初始化为0
update[i] = zsl->header; // 新增层的更新节点为头节点
update[i]->level[i].span = zsl->length; // 新增层的跨度为跳跃表的长度
}
zsl->level = level; // 更新跳跃表的层数
}
x = zslCreateNode(level,score,ele); // 创建一个新的跳跃表节点
for (i = 0; i < level; i++) { // 遍历每一层,插入新节点
x->level[i].forward = update[i]->level[i].forward; // 新节点的forward指针指向更新节点的forward指针
update[i]->level[i].forward = x; // 更新节点的forward指针指向新节点

/* update span covered by update[i] as x is inserted here */
x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]); // 新节点的跨度为更新节点的跨度减去排名差
update[i]->level[i].span = (rank[0] - rank[i]) + 1; // 更新节点的跨度为排名差加1
}

/* increment span for untouched levels */
for (i = level; i < zsl->level; i++) { // 对于未触及的层,增加跨度
update[i]->level[i].span++; // 增加跨度
}

x->backward = (update[0] == zsl->header) ? NULL : update[0]; // 设置新节点的backward指针,如果更新节点是头节点,则为NULL,否则指向更新节点
if (x->level[0].forward) // 如果新节点的forward指针不为NULL
x->level[0].forward->backward = x; // 将forward节点的backward指针指向新节点
else
zsl->tail = x; // 如果新节点是最后一个节点,将跳跃表的尾节点指向新节点
zsl->length++; // 增加跳跃表的长度
return x; // 返回新节点
}

删除节点zslDelete

这个 zslDelete 函数的作用是在跳跃表中根据 scoreele 删除指定的节点

int zslDelete(zskiplist *zsl, double score, sds ele, zskiplistNode **node) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x; // 定义一个数组update用于存储每一层的更新节点,定义一个指针x用于遍历跳跃表
int i; // 定义一个整型变量i用于循环

x = zsl->header; // 将x指向跳跃表的头节点
for (i = zsl->level-1; i >= 0; i--) { // 从跳跃表的最高层开始向下遍历每一层
while (x->level[i].forward && // 如果当前层的forward指针不为空
(x->level[i].forward->score < score || // 并且forward节点的score小于目标score
(x->level[i].forward->score == score && // 或者forward节点的score等于目标score并且
sdscmp(x->level[i].forward->ele,ele) < 0))) // forward节点的ele小于目标ele
{
x = x->level[i].forward; // 将x指向forward节点
}
update[i] = x; // 将当前层的更新节点存储在update数组中
}
/* We may have multiple elements with the same score, what we need
* is to find the element with both the right score and object. */
x = x->level[0].forward; // 将x指向第0层的forward节点
if (x && score == x->score && sdscmp(x->ele,ele) == 0) { // 如果x不为空并且x的score等于目标score并且x的ele等于目标ele
zslDeleteNode(zsl, x, update); // 删除节点x
if (!node) // 如果node为空
zslFreeNode(x); // 释放节点x
else // 如果node不为空
*node = x; // 将node指向x
return 1; // 返回1表示删除成功
}
return 0; /* not found */ // 返回0表示未找到目标节点
}

修改分数zslUpdateScore

zslUpdateScore 函数的作用是更新跳跃表中某个节点的分数 (score),并根据新分数调整节点的位置。如果新分数不会改变节点的位置,函数只会更新分数;否则,节点将被删除并重新插入到跳跃表中。

zskiplistNode *zslUpdateScore(zskiplist *zsl, double curscore, sds ele, double newscore) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x; // 定义一个数组update用于存储每一层的更新节点,定义一个指针x用于遍历跳跃表
int i; // 定义一个整型变量i用于循环

/* We need to seek to element to update to start: this is useful anyway,
* we'll have to update or remove it. */
x = zsl->header; // 将x指向跳跃表的头节点
for (i = zsl->level-1; i >= 0; i--) { // 从跳跃表的最高层开始向下遍历每一层
while (x->level[i].forward && // 如果当前层的forward指针不为空
(x->level[i].forward->score < curscore || // 并且forward节点的score小于当前score
(x->level[i].forward->score == curscore && // 或者forward节点的score等于当前score并且
sdscmp(x->level[i].forward->ele,ele) < 0))) // forward节点的ele小于当前ele
{
x = x->level[i].forward; // 将x指向forward节点
}
update[i] = x; // 将当前层的更新节点存储在update数组中
}

/* Jump to our element: note that this function assumes that the
* element with the matching score exists. */
x = x->level[0].forward; // 将x指向第0层的forward节点
serverAssert(x && curscore == x->score && sdscmp(x->ele,ele) == 0); // 断言x不为空并且x的score等于当前score并且x的ele等于当前ele

/* If the node, after the score update, would be still exactly
* at the same position, we can just update the score without
* actually removing and re-inserting the element in the skiplist. */
if ((x->backward == NULL || x->backward->score < newscore) && // 如果x的backward为空或者x的backward的score小于新的score
(x->level[0].forward == NULL || x->level[0].forward->score > newscore)) // 并且x的forward为空或者x的forward的score大于新的score
{
x->score = newscore; // 更新x的score为新的score
return x; // 返回更新后的节点x
}

/* No way to reuse the old node: we need to remove and insert a new
* one at a different place. */
zslDeleteNode(zsl, x, update); // 删除节点x
zskiplistNode *newnode = zslInsert(zsl,newscore,x->ele); // 插入一个新的节点newnode,score为新的score,ele为x的ele
/* We reused the old node x->ele SDS string, free the node now
* since zslInsert created a new one. */
x->ele = NULL; // 将x的ele置为空
zslFreeNode(x); // 释放节点x
return newnode; // 返回新的节点newnode
}

查询第一个分数zslFirstInRange

zslFirstInRange 函数用于在跳跃表中查找第一个分数 (score) 落在指定范围 range 内的节点。如果没有节点符合条件,函数返回 NULL

/* Find the first node that is contained in the specified range.
* Returns NULL when no element is contained in the range. */
zskiplistNode *zslFirstInRange(zskiplist *zsl, zrangespec *range) {
zskiplistNode *x; // 定义一个指向跳跃表节点的指针x
int i; // 定义一个整型变量i,用于循环控制

/* If everything is out of range, return early. */
if (!zslIsInRange(zsl,range)) return NULL; // 如果整个范围都不在区间内,提前返回NULL

x = zsl->header; // 将x指向跳跃表的头节点
for (i = zsl->level-1; i >= 0; i--) { // 从跳跃表的最高层开始,逐层向下遍历
/* Go forward while *OUT* of range. */
while (x->level[i].forward && // 当当前节点的forward指针不为空,并且
!zslValueGteMin(x->level[i].forward->score,range)) // 当前节点的forward节点的score不大于等于range的最小值
x = x->level[i].forward; // 将x指向当前节点的forward节点
}

/* This is an inner range, so the next node cannot be NULL. */
x = x->level[0].forward; // 将x指向最底层的forward节点
serverAssert(x != NULL); // 断言x不为空

/* Check if score <= max. */
if (!zslValueLteMax(x->score,range)) return NULL; // 如果x的score不小于等于range的最大值,返回NULL
return x; // 返回第一个在范围内的节点x
}

参考资料

跳表的原理与实现 [图解]

一文彻底搞懂跳表的各种时间复杂度、适用场景以及实现原理

GitHub Redis