Redis学习压缩列表

2022-08-07 12:57:54

一、压缩列表

1.介绍

压缩列表(ziplist)是列表键和哈希键的底层实现之一。当一个列表键只包含列表项,并且每个列表要么就是长度较短的字符串,要么就是小整数值,那么redis就会使用压缩列表作为列表键的底层实现。

压缩列表是redis为了节约而开发的,是一系列由特殊编码的连续内存块组成的顺序型数据结构。一个压缩列表可以包含任意多个节点,每个节点包含一个字节数组或者一个整数值。

压缩列表的布局<zlbytes><zltail><zllen><entry><entry><zlen>
其中zlbytes记录整个压缩列表占用的内存字节数,再对压缩列表进行内存重分配,或者计算zlend的位置时使用

zltail:记录压缩列表表尾节点距离压缩列表的起始地址有多少字节,通过这个偏移量,程序无需遍历整个压缩列表就可以确定表尾节点的地址
zllen:记录压缩列表包含的数量,小于65535时,这个属性的值就是压缩列表包含节点的数量;当这个值等于uintx_max,节点的真实数量需要遍历整个压缩列表才能计算得出。

entryX列表节点,压缩列表包含的各个节点,节点的长度由节点保存的内容决定。
zlend,特殊值oxFF,用于标记压缩列表的末端。

2.压缩列表节点的构成

typedefstructzlentry{// prevrawlen :前置节点的长度// prevrawlensize :编码 prevrawlen 所需的字节大小unsignedint prevrawlensize, prevrawlen;// len :当前节点值的长度// lensize :编码 len 所需的字节大小unsignedint lensize, len;// 当前节点 header 的大小// 等于 prevrawlensize + lensizeunsignedint headersize;// 当前节点值所使用的编码类型unsignedchar encoding;// 指向当前节点的指针unsignedchar*p;} zlentry;

每个压缩列表可以保存一个字节数组或者一个整数值,其中,字节数组可以是以下三种长度之一:

  • 长度小于等于63的字节数组
  • 长度小于等于16383的字节数组
  • 长度小于等于 2 32 − 1 2^{32}-12321 的字节数组

而整数可以是以下六种长度之一:

  • 4位长,介于0到12之间的无符号整数
  • 1字节长的有符号整数
  • 3字节长的有符号整数
  • int16_t类型整数
  • int32_t类型整数
  • int64_t类型整数

节点的prevrawlen属性以字节为单位,记录了压缩列表中前一个节点的长度。previous_entry_length属性的长度可以是1字节或者是5字节。如果前一节点的长度小于254字节,那么就是1字节,如果大于254字节,那就是5字节。

encoding 记录了节点content属性的类型以及长度

  • 一字节、两字节或者五字节长,值的最高位为00、01、或者是10的是字节数组编码:这种编码表示节点的content属性保存着字节数组,数组的长度由除去最高两位之后的其它位记录。
  • 一字节长,值的最高位以11开头的是整数编码,这种鞭名马表示节点的content属性保存着整数值,整数值的类型的长度由除去最高两位之后的其他位记录。

二、API

//创建一个压缩列表unsignedchar*ziplistNew(void);//创建一个包含给定值的节点,并将这个节点插入到给给定的压缩列表里unsignedchar*ziplistPush(unsignedchar*zl,unsignedchar*s,unsignedint slen,int where);//返回压缩列表给点索引上的节点unsignedchar*ziplistIndex(unsignedchar*zl,int index);//返回给定节点的下一个节点unsignedchar*ziplistNext(unsignedchar*zl,unsignedchar*p);//返回给定节点的上一个节点unsignedchar*ziplistPrev(unsignedchar*zl,unsignedchar*p);//获取给定节点所保存的值unsignedintziplistGet(unsignedchar*p,unsignedchar**sval,unsignedint*slen,longlong*lval);//将包含给定值的新节点插入到给点节点之后unsignedchar*ziplistInsert(unsignedchar*zl,unsignedchar*p,unsignedchar*s,unsignedint slen);//从压缩列表删除指定的节点unsignedchar*ziplistDelete(unsignedchar*zl,unsignedchar**p);//删除指定范围的的节点unsignedchar*ziplistDeleteRange(unsignedchar*zl,unsignedint index,unsignedint num);unsignedintziplistCompare(unsignedchar*p,unsignedchar*s,unsignedint slen);//查找包含给定值的节点unsignedchar*ziplistFind(unsignedchar*p,unsignedchar*vstr,unsignedint vlen,unsignedint skip);//返回压缩列表包含的节点数量unsignedintziplistLen(unsignedchar*zl);//返回压缩列表包含的内存字节数size_tziplistBlobLen(unsignedchar*zl);

三、源码

1.ziplistNew

//创建一个新的压缩列表,并初始化属性unsignedchar*ziplistNew(void){// ZIPLIST_HEADER_SIZE 是 ziplist 表头的大小// 1 字节是表末端 ZIP_END 的大小unsignedint bytes= ZIPLIST_HEADER_SIZE+1;// 为表头和表末端分配空间unsignedchar*zl=zmalloc(bytes);// 初始化表属性ZIPLIST_BYTES(zl)=intrev32ifbe(bytes);ZIPLIST_TAIL_OFFSET(zl)=intrev32ifbe(ZIPLIST_HEADER_SIZE);ZIPLIST_LENGTH(zl)=0;// 设置表末端
    zl[bytes-1]= ZIP_END;return zl;}

2.ziplistPush

//将字符串s推入到zl中unsignedchar*ziplistPush(unsignedchar*zl,unsignedchar*s,unsignedint slen,int where){// 根据 where 参数的值,决定将值推入到表头还是表尾unsignedchar*p;
    p=(where== ZIPLIST_HEAD)?ZIPLIST_ENTRY_HEAD(zl):ZIPLIST_ENTRY_END(zl);// 返回添加新值后的 ziplist// T = O(N^2)return__ziplistInsert(zl,p,s,slen);}staticunsignedchar*__ziplistInsert(unsignedchar*zl,unsignedchar*p,unsignedchar*s,unsignedint slen){// 记录当前 ziplist 的长度size_t curlen=intrev32ifbe(ZIPLIST_BYTES(zl)), reqlen, prevlen=0;size_t offset;int nextdiff=0;unsignedchar encoding=0;longlong value=123456789; 
    zlentry entry, tail;if(p[0]!= ZIP_END){// 如果 p[0] 不指向列表末端,说明列表非空,并且 p 正指向列表的其中一个节点// 那么取出 p 所指向节点的信息,并将它保存到 entry 结构中// 然后用 prevlen 变量记录前置节点的长度// (当插入新节点之后 p 所指向的节点就成了新节点的前置节点)// T = O(1)
        entry=zipEntry(p);
        prevlen= entry.prevrawlen;}else{// 如果 p 指向表尾末端,那么程序需要检查列表是否为:// 1)如果 ptail 也指向 ZIP_END ,那么列表为空;// 2)如果列表不为空,那么 ptail 将指向列表的最后一个节点。unsignedchar*ptail=ZIPLIST_ENTRY_TAIL(zl);if(ptail[0]!= ZIP_END){// 表尾节点为新节点的前置节点// 取出表尾节点的长度// T = O(1)
            prevlen=zipRawEntryLength(ptail);}}// 尝试看能否将输入字符串转换为整数,如果成功的话:// 1)value 将保存转换后的整数值// 2)encoding 则保存适用于 value 的编码方式if(zipTryEncoding(s,slen,&value,&encoding)){/* 'encoding' is set to the appropriate integer encoding */
        reqlen=zipIntSize(encoding);}else{/* 'encoding' is untouched, however zipEncodeLength will use the
         * string length to figure out how to encode it. */
        reqlen= slen;}/* We need space for both the length of the previous entry and
     * the length of the payload. */// 计算编码前置节点的长度所需的大小// T = O(1)
    reqlen+=zipPrevEncodeLength(NULL,prevlen);// 计算编码当前节点值所需的大小// T = O(1)
    reqlen+=zipEncodeLength(NULL,encoding,slen);// 只要新节点不是被添加到列表末端,// 那么程序就需要检查看 p 所指向的节点(的 header)能否编码新节点的长度。// nextdiff 保存了新旧编码之间的字节大小差,如果这个值大于 0// 那么说明需要对 p 所指向的节点(的 header )进行扩展// T = O(1)
    nextdiff=(p[0]!= ZIP_END)?zipPrevLenByteDiff(p,reqlen):0;/* Store offset because a realloc may change the address of zl. */// 因为重分配空间可能会改变 zl 的地址// 所以在分配之前,需要记录 zl 到 p 的偏移量,然后在分配之后依靠偏移量还原 p
    offset= p-zl;// curlen 是 ziplist 原来的长度// reqlen 是整个新节点的长度// nextdiff 是新节点的后继节点扩展 header 的长度(要么 0 字节,要么 4 个字节)// T = O(N)
    zl=ziplistResize(zl,curlen+reqlen+nextdiff);
    p= zl+offset;/* Apply memory move when necessary and update tail offset. */if(p[0]!= ZIP_END){// 新元素之后还有节点,因为新元素的加入,需要对这些原有节点进行调整/* Subtract one because of the ZIP_END bytes */// 移动现有元素,为新元素的插入空间腾出位置// T = O(N)memmove(p+reqlen,p-nextdiff,curlen-offset-1+nextdiff);// 将新节点的长度编码至后置节点// p+reqlen 定位到后置节点// reqlen 是新节点的长度// T = O(1)zipPrevEncodeLength(p+reqlen,reqlen);// 更新到达表尾的偏移量,将新节点的长度也算上ZIPLIST_TAIL_OFFSET(zl)=intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+reqlen);// 如果新节点的后面有多于一个节点// 那么程序需要将 nextdiff 记录的字节数也计算到表尾偏移量中// 这样才能让表尾偏移量正确对齐表尾节点
        tail=zipEntry(p+reqlen);if(p[reqlen+tail.headersize+tail.len]!= ZIP_END){ZIPLIST_TAIL_OFFSET(zl)=intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+nextdiff);}}else{// 新元素是新的表尾节点ZIPLIST_TAIL_OFFSET(zl)=intrev32ifbe(p-zl);}// 当 nextdiff != 0 时,新节点的后继节点的(header 部分)长度已经被改变,// 所以需要级联地更新后续的节点if(nextdiff!=0){
        offset= p-zl;// T  = O(N^2)
        zl=__ziplistCascadeUpdate(zl,p+reqlen);
        p= zl+offset;}// 一切搞定,将前置节点的长度写入新节点的 header
    p+=zipPrevEncodeLength(p,prevlen);// 将节点值的长度写入新节点的 header
    p+=zipEncodeLength(p,encoding,slen);// 写入节点值if(ZIP_IS_STR(encoding)){// T = O(N)memcpy(p,s,slen);}else{// T = O(1)zipSaveInteger(p,value,encoding);}// 更新列表的节点数量计数器ZIPLIST_INCR_LENGTH(zl,1);return zl;}

3.ziplistIndex

unsignedchar*ziplistIndex(unsignedchar*zl,int index){unsignedchar*p;

    zlentry entry;// 处理负数索引if(index<0){// 将索引转换为正数
        index=(-index)-1;// 定位到表尾节点
        p=ZIPLIST_ENTRY_TAIL(zl);// 如果列表不为空,那么。。。if(p[0]!= ZIP_END){// 从表尾向表头遍历
            entry=zipEntry(p);// T = O(N)while(entry.prevrawlen>0&& index--){// 前移指针
                p-= entry.prevrawlen;// T = O(1)
                entry=zipEntry(p);}}// 处理正数索引}else{// 定位到表头节点
        p=ZIPLIST_ENTRY_HEAD(zl);// T = O(N)while(p[0]!= ZIP_END&& index--){// 后移指针// T = O(1)
            p+=zipRawEntryLength(p);}}// 返回结果return(p[0]== ZIP_END|| index>0)?NULL: p;}

4.ziplistNext

unsignedchar*ziplistNext(unsignedchar*zl,unsignedchar*p){((void) zl);// p 已经指向列表末端if(p[0]== ZIP_END){returnNULL;}// 指向后一节点
    p+=zipRawEntryLength(p);if(p[0]== ZIP_END){// p 已经是表尾节点,没有后置节点returnNULL;}return p;}

5.ziplistPrev

unsignedchar*ziplistPrev(unsignedchar*zl,unsignedchar*p){
    zlentry entry;// 如果 p 指向列表末端(列表为空,或者刚开始从表尾向表头迭代)// 那么尝试取出列表尾端节点if(p[0]== ZIP_END){
        p=ZIPLIST_ENTRY_TAIL(zl);// 尾端节点也指向列表末端,那么列表为空return(p[0]== ZIP_END)?NULL: p;// 如果 p 指向列表头,那么说明迭代已经完成}elseif(p==ZIPLIST_ENTRY_HEAD(zl)){returnNULL;// 既不是表头也不是表尾,从表尾向表头移动指针}else{// 计算前一个节点的节点数
        entry=zipEntry(p);assert(entry.prevrawlen>0);// 移动指针,指向前一个节点return p-entry.prevrawlen;}}

6.ziplistGet

unsignedintziplistGet(unsignedchar*p,unsignedchar**sstr,unsignedint*slen,longlong*sval){

    zlentry entry;if(p==NULL|| p[0]== ZIP_END)return0;if(sstr)*sstr=NULL;// 取出 p 所指向的节点的各项信息,并保存到结构 entry 中// T = O(1)
    entry=zipEntry(p);// 节点的值为字符串,将字符串长度保存到 *slen ,字符串保存到 *sstr// T = O(1)if(ZIP_IS_STR(entry.encoding)){if(sstr){*slen= entry.len;*sstr= p+entry.headersize;}// 节点的值为整数,解码值,并将值保存到 *sval// T = O(1)}else{if(sval){*sval=zipLoadInteger(p+entry.headersize,entry.encoding);}}return1;}

7.ziplistDelete

unsignedchar*ziplistDelete(unsignedchar*zl,unsignedchar**p){// 因为 __ziplistDelete 时会对 zl 进行内存重分配// 而内存充分配可能会改变 zl 的内存地址// 所以这里需要记录到达 *p 的偏移量// 这样在删除节点之后就可以通过偏移量来将 *p 还原到正确的位置size_t offset=*p-zl;
    zl=__ziplistDelete(zl,*p,1);*p= zl+offset;return zl;}

8.ziplistDeleteRange

nsignedchar*ziplistDeleteRange(unsignedchar*zl,unsignedint index,unsignedint num){// 根据索引定位到节点unsignedchar*p=ziplistIndex(zl,index);// 连续删除 num 个节点return(p==NULL)? zl:__ziplistDelete(zl,p,num);}

9.ziplistcompare

unsignedintziplistCompare(unsignedchar*p,unsignedchar*sstr,unsignedint slen){
    zlentry entry;unsignedchar sencoding;longlong zval, sval;if(p[0]== ZIP_END)return0;// 取出节点
    entry=zipEntry(p);if(ZIP_IS_STR(entry.encoding)){// 节点值为字符串,进行字符串对比if(entry.len== slen){returnmemcmp(p+entry.headersize,sstr,slen)==0;}else{return0;}}else{// 节点值为整数,进行整数对比if(zipTryEncoding(sstr,slen,&sval,&sencoding)){
  • 作者:我在阳澄湖畔吃炸鸡
  • 原文链接:https://blog.csdn.net/smy166153/article/details/118151595
    更新时间:2022-08-07 12:57:54