公理设计-由奇怪海战引发的软件设计思考

前几天看到了一个博客,推荐了《公理设计》一书,还有其相关的文档以及视频
)。简单了解了一下,增深了一些对软件设计的理解,特此也推荐给大家。

公理设计理论将设计建立在科学公理、定理和推论的基础上,由麻省理工学院教授 Nam. P. Suh 领导的研究小组于 1978 年提出,适用于各种类别的设计活动。软件设计当然也属于一类工程设计过程,下面我们就来看一下两者的关联。

奇怪的海战

首先从1862年11月13日的一场海战讲起。这场海战“标志着蒸汽动力铁甲舰新时代的到来。为了便于理解,我这里对舰船名称进行了修改,想了解的朋友可以百度 U.S.S. Monitor battles C.S.S. Virginia.

南方叛军的大大号战舰,体型庞大,非常凶悍。已经击沉了两艘联邦军舰。北方政府军则只派出小小号,一艘非常小,火力也小多的军舰。

大大号顾名思义,它船体特别的大,但是都是固定炮塔,两侧和首尾有很多门炮。而小小号虽然小,却有一个可以旋转的炮台。

我们可以理解为一条战舰需要有两个基础功能:调整航行方向和调整炮击方向。

对于大大号,这两个功能需求是耦合 couple 的,要改变炮击方向,就需要将船只转向。而对于小小号,这两个功能需求则是解耦合 decouple 的,航行方向与炮击方向无关,炮击方向可以独立调整。

于是小小号一直尽量守在大大号的射击死角攻击,而大大号虽然火力猛烈则必须不断通过改变航线来调整炮击方向,于是就不断绕圈。这两条船打了4个小时,大大号不得不撤退了,小小号获得了胜利。

由此可见功能之间的解耦十分重要,它增加了便捷性和灵活性。

工科生最爱的映射矩阵

​书中由海战作为引子,介绍了设计过程中的四个域(Domain):

  • CNs:Customer Needs,客户域,就是客户描述的一大堆自然语言也说不清楚的事情,什么高端大气上档次之类的东西。
  • FRs:Functional Requirements,功能域,从 CNs 域到 FRs 域的变换,就是把客户漫无边际的需求翻译成一些可定量的参数,比如战舰控制系统的 FR 是控制航行方向和控制开炮方向。
  • DPs:Design Parameters,设计参数,或者叫物理域,实现 FRs 的物理参数,比如航向控制器和炮塔控制器。
  • PVs:Process Variables,过程变量,或者叫过程域,是描述实现功能过程中涉及的过程变量。

相邻域之间的映射,可以看成目标(做什么?)和手段(怎样做?)之间的对应关系。设计过程是相邻域中特征向量之间映射和转换过程。

例如,用户域元素映射到功能域的过程,实际上是将用户需求转变成产品功能要素的过程,即产品规划;功能域向物理域的映射过程是产品的设计过程;从物理域到过程域的映射则可看成“加工产品”的过程。

其中最为重要的是FRs(功能需求)到DPs(设计参数)的映射,这也是我们软件开发过程中最长接触的步骤,需求文档有了,如何进行代码设计并实现。

书中以矩阵向量的方式讲述了 FRs (功能需求) 和 DPs (设计参数) 的映射关系,也就是上图中由 A 变量组成的矩阵代表着 FPs 到 DPs 的映射。不同的矩阵代表着不同的映射关系,其实我们不需要关心矩阵各个位置的具体值如何计算,只需简化的了解如果 FP 和 DP 有关联,则矩阵相应位置上的值为1,否则为0。

比如说小小号上的情况,有两个功能需要:FR1(调整航向)和FR2(调整开炮方向);以及两个设计参数:DP1(船舵)和DP2(旋转炮塔)

其中转动船舵的时候,船会转向,所以A11这里是X,同时船身上的炮塔也跟着船一起转向,所以也影响开炮方向FR2,因此A21也是X。 而在旋转炮塔的时候,不影响船的航行方向,所以A12这里是0。

好的设计?

所以,基于上边这个映射矩阵,好的设计应该有两个特点:

  • 首先FRs(功能需求)的数量N,应当等于DPs (设计参数)的数量M。
  • 每一个FR(功能需求)与且只与一个DP(设计参数)相互关联。

也就是说映射矩阵是一个对角矩阵,对角线上有值,其他位置都是0。《程序员修炼之道》中也提及了类似的思想,也就是正交性一节。那一节的提示是消除无关事务之间的影响,正好和这里映射矩阵是对角矩阵不谋而合。当映射举证是对角矩阵时,说明 FR 和 DP 一一对应,不会有交叉影响。当某一个 FR也就是需求发生变更时,只需要修改一个DP。

当然对角矩阵属于比较理想的情况,书中也罗列了一些其他类型的映射矩阵。

其中最差的情况是 FRs(功能需求)的数量N,小于 DPs(设计参数)的数量M。也就是大大号中的情景:它有两个功能需求,FR1 调整航向
和FR2 调整开炮方向,但只有一个DP1 船舵。所以它的映射矩阵如下图所示。

书中还继续讲解了矩阵分解的知识,也就是对应了需求功能点细分到软件详细设计细分等部分的内容,有兴趣的小伙伴可以自己去看看。

总结

所以书中最后给出两个公里:

  • 独立公理(功能独立性公理)
  • 信息公理(信息量最少公理)

这不正是软件设计中经常提及的松耦合和高内聚嘛。模块相互独立互不影响就是松耦合,最小化信息量就是不对外暴露过多信息,也就是高内聚或者信息隐藏。

Share

详解 Redis 内存管理机制和实现

Redis是一个基于内存的键值数据库,其内存管理是非常重要的。本文内存管理的内容包括:过期键的懒性删除和过期删除以及内存溢出控制策略。

最大内存限制

Redis使用 maxmemory 参数限制最大可用内存,默认值为0,表示无限制。限制内存的目的主要 有:

  • 用于缓存场景,当超出内存上限 maxmemory 时使用 LRU 等删除策略释放空间。
  • 防止所用内存超过服务器物理内存。因为 Redis 默认情况下是会尽可能多使用服务器的内存,可能会出现服务器内存不足,导致 Redis 进程被杀死。

maxmemory 限制的是Redis实际使用的内存量,也就是 used_memory统计项对应的内存。由于内存碎片率的存在,实际消耗的内存 可能会比maxmemory设置的更大,实际使用时要小心这部分内存溢出。具体Redis 内存监控的内容请查看一文了解 Redis 内存监控和内存消耗

Redis默认无限使用服务器内存,为防止极端情况下导致系统内存耗 尽,建议所有的Redis进程都要配置maxmemory。 在保证物理内存可用的情况下,系统中所有Redis实例可以调整 maxmemory参数来达到自由伸缩内存的目的。

内存回收策略

Redis 回收内存大致有两个机制:一是删除到达过期时间的键值对象;二是当内存达到 maxmemory 时触发内存移除控制策略,强制删除选择出来的键值对象。

删除过期键对象

Redis 所有的键都可以设置过期属性,内部保存在过期表中,键值表和过期表的结果如下图所示。当 Redis保存大量的键,对每个键都进行精准的过期删除可能会导致消耗大量的 CPU,会阻塞 Redis 的主线程,拖累 Redis 的性能,因此 Redis 采用惰性删除和定时任务删除机制实现过期键的内存回收。

惰性删除是指当客户端操作带有超时属性的键时,会检查是否超过键的过期时间,然后会同步或者异步执行删除操作并返回键已经过期。这样可以节省 CPU成本考虑,不需要单独维护过期时间链表来处理过期键的删除。

过期键的惰性删除策略由 db.c/expireifNeeded 函数实现,所有对数据库的读写命令执行之前都会调用 expireifNeeded 来检查命令执行的键是否过期。如果键过期,expireifNeeded 会将过期键从键值表和过期表中删除,然后同步或者异步释放对应对象的空间。源码展示的时 Redis 4.0 版本。

expireIfNeeded 先从过期表中获取键对应的过期时间,如果当前时间已经超过了过期时间(lua脚本执行则有特殊逻辑,详看代码注释),则进入删除键流程。删除键流程主要进行了三件事:

  • 一是删除操作命令传播,通知 slave 实例并存储到 AOF 缓冲区中
  • 二是记录键空间事件,
  • 三是根据 lazyfree_lazy_expire 是否开启进行异步删除或者异步删除操作。
int expireIfNeeded(redisDb *db, robj *key) {
    // 获取键的过期时间
    mstime_t when = getExpire(db,key);
    mstime_t now;
    // 键没有过期时间
    if (when < 0) return 0;
    // 实例正在从硬盘 laod 数据,比如说 RDB 或者 AOF
    if (server.loading) return 0;

    // 当执行lua脚本时,只有键在lua一开始执行时
    // 就到了过期时间才算过期,否则在lua执行过程中不算失效
    now = server.lua_caller ? server.lua_time_start : mstime();

    // 当本实例是slave时,过期键的删除由master发送过来的
    // del 指令控制。但是这个函数还是将正确的信息返回给调用者。
    if (server.masterhost != NULL) return now > when;
    // 判断是否未过期
    if (now <= when) return 0;

    // 代码到这里,说明键已经过期,而且需要被删除
    server.stat_expiredkeys++;
    // 命令传播,到 slave 和 AOF
    propagateExpire(db,key,server.lazyfree_lazy_expire);
    // 键空间通知使得客户端可以通过订阅频道或模式, 来接收那些以某种方式改动了 Redis 数据集的事件。
    notifyKeyspaceEvent(NOTIFY_EXPIRED,
        "expired",key,db->id);
    // 如果是惰性删除,调用dbAsyncDelete,否则调用 dbSyncDelete
    return server.lazyfree_lazy_expire ? dbAsyncDelete(db,key) :
                                         dbSyncDelete(db,key);
}

上图是写命令传播的示意图,删除命令的传播和它一致。propagateExpire 函数先调用 feedAppendOnlyFile 函数将命令同步到 AOF 的缓冲区中,然后调用 replicationFeedSlaves函数将命令同步到所有的 slave 中。Redis 复制的机制可以查看Redis 复制过程详解

// 将命令传递到slave和AOF缓冲区。maser删除一个过期键时会发送Del命令到所有的slave和AOF缓冲区
void propagateExpire(redisDb *db, robj *key, int lazy) {
    robj *argv[2];
    // 生成同步的数据
    argv[0] = lazy ? shared.unlink : shared.del;
    argv[1] = key;
    incrRefCount(argv[0]);
    incrRefCount(argv[1]);
    // 如果开启了 AOF 则追加到 AOF 缓冲区中
    if (server.aof_state != AOF_OFF)
        feedAppendOnlyFile(server.delCommand,db->id,argv,2);
    // 同步到所有 slave
    replicationFeedSlaves(server.slaves,db->id,argv,2);

    decrRefCount(argv[0]);
    decrRefCount(argv[1]);
}

dbAsyncDelete 函数会先调用 dictDelete 来删除过期表中的键,然后处理键值表中的键值对象。它会根据值的占用的空间来选择是直接释放值对象,还是交给 bio 异步释放值对象。判断依据就是值的估计大小是否大于 LAZYFREE_THRESHOLD 阈值。键对象和 dictEntry 对象则都是直接被释放。

#define LAZYFREE_THRESHOLD 64
int dbAsyncDelete(redisDb *db, robj *key) {
    // 删除该键在过期表中对应的entry
    if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);

    // unlink 该键在键值表对应的entry
    dictEntry *de = dictUnlink(db->dict,key->ptr);
    // 如果该键值占用空间非常小,懒删除反而效率低。所以只有在一定条件下,才会异步删除
    if (de) {
        robj *val = dictGetVal(de);
        size_t free_effort = lazyfreeGetFreeEffort(val);
        // 如果释放这个对象消耗很多,并且值未被共享(refcount == 1)则将其加入到懒删除列表
        if (free_effort > LAZYFREE_THRESHOLD && val->refcount == 1) {
            atomicIncr(lazyfree_objects,1);
            bioCreateBackgroundJob(BIO_LAZY_FREE,val,NULL,NULL);
            dictSetVal(db->dict,de,NULL);
        }
    }

    // 释放键值对,或者只释放key,而将val设置为NULL来后续懒删除
    if (de) {
        dictFreeUnlinkedEntry(db->dict,de);
        // slot 和 key 的映射关系是用于快速定位某个key在哪个 slot中。
        if (server.cluster_enabled) slotToKeyDel(key);
        return 1;
    } else {
        return 0;
    }
}

dictUnlink 会将键值从键值表中删除,但是却不释放 key、val和对应的表entry对象,而是将其直接返回,然后再调用dictFreeUnlinkedEntry进行释放。dictDelete 是它的兄弟函数,但是会直接释放相应的对象。二者底层都通过调用 dictGenericDelete来实现。dbAsyncDelete d的兄弟函数 dbSyncDelete 就是直接调用dictDelete来删除过期键。

void dictFreeUnlinkedEntry(dict *d, dictEntry *he) {
    if (he == NULL) return;
    // 释放key对象
    dictFreeKey(d, he);
    // 释放值对象,如果它不为null
    dictFreeVal(d, he);
    // 释放 dictEntry 对象
    zfree(he);
}

Redis 有自己的 bio 机制,主要是处理 AOF 落盘、懒删除逻辑和关闭大文件fd。bioCreateBackgroundJob 函数将释放值对象的 job 加入到队列中,bioProcessBackgroundJobs会从队列中取出任务,根据类型进行对应的操作。

void *bioProcessBackgroundJobs(void *arg) {
    .....
    while(1) {
        listNode *ln;

        ln = listFirst(bio_jobs[type]);
        job = ln->value;
        if (type == BIO_CLOSE_FILE) {
            close((long)job->arg1);
        } else if (type == BIO_AOF_FSYNC) {
            aof_fsync((long)job->arg1);
        } else if (type == BIO_LAZY_FREE) {
            // 根据参数来决定要做什么。有参数1则要释放它,有参数2和3是释放两个键值表
            // 过期表,也就是释放db 只有参数三是释放跳表
            if (job->arg1)
                lazyfreeFreeObjectFromBioThread(job->arg1);
            else if (job->arg2 && job->arg3)
                lazyfreeFreeDatabaseFromBioThread(job->arg2,job->arg3);
            else if (job->arg3)
                lazyfreeFreeSlotsMapFromBioThread(job->arg3);
        }
        zfree(job);
        ......
    }
}

dbSyncDelete 则是直接删除过期键,并且将键、值和 DictEntry 对象都释放。

int dbSyncDelete(redisDb *db, robj *key) {
    // 删除过期表中的entry
    if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);
    // 删除键值表中的entry
    if (dictDelete(db->dict,key->ptr) == DICT_OK) {
        // 如果开启了集群,则删除slot 和 key 映射表中key记录。
        if (server.cluster_enabled) slotToKeyDel(key);
        return 1;
    } else {
        return 0;
    }
}

但是单独用这种方式存在内存泄露的问题,当过期键一直没有访问将无法得到及时删除,从而导致内存不能及时释放。正因为如此,Redis还提供另一种定时任 务删除机制作为惰性删除的补充。

Redis 内部维护一个定时任务,默认每秒运行10次(通过配置控制)。定时任务中删除过期键逻辑采用了自适应算法,根据键的 过期比例、使用快慢两种速率模式回收键,流程如下图所示。

  • 1)定时任务首先根据快慢模式( 慢模型扫描的键的数量以及可以执行时间都比快模式要多 )和相关阈值配置计算计算本周期最大执行时间、要检查的数据库数量以及每个数据库扫描的键数量。
  • 2) 从上次定时任务未扫描的数据库开始,依次遍历各个数据库。
  • 3)从数据库中随机选手 ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP 个键,如果发现是过期键,则调用 activeExpireCycleTryExpire 函数删除它。
  • 4)如果执行时间超过了设定的最大执行时间,则退出,并设置下一次使用慢模式执行。
  • 5)未超时的话,则判断是否采样的键中是否有25%的键是过期的,如果是则继续扫描当前数据库,跳到第3步。否则开始扫描下一个数据库。

定期删除策略由 expire.c/activeExpireCycle 函数实现。在redis事件驱动的循环中的eventLoop->beforesleep和
周期性操作 databasesCron 都会调用 activeExpireCycle 来处理过期键。但是二者传入的 type 值不同,一个是ACTIVE_EXPIRE_CYCLE_SLOW 另外一个是ACTIVE_EXPIRE_CYCLE_FAST。activeExpireCycle 在规定的时间,分多次遍历各个数据库,从 expires 字典中随机检查一部分过期键的过期时间,删除其中的过期键,相关源码如下所示。

void activeExpireCycle(int type) {
    // 上次检查的db
    static unsigned int current_db = 0; 
    // 上次检查的最大执行时间
    static int timelimit_exit = 0;
    // 上一次快速模式运行时间
    static long long last_fast_cycle = 0; /* When last fast cycle ran. */

    int j, iteration = 0;
    // 每次检查周期要遍历的DB数
    int dbs_per_call = CRON_DBS_PER_CALL;
    long long start = ustime(), timelimit, elapsed;

    ..... // 一些状态时不进行检查,直接返回

    // 如果上次周期因为执行达到了最大执行时间而退出,则本次遍历所有db,否则遍历db数等于 CRON_DBS_PER_CALL
    if (dbs_per_call > server.dbnum || timelimit_exit)
        dbs_per_call = server.dbnum;

    // 根据ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC计算本次最大执行时间
    timelimit = 1000000*ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC/server.hz/100;
    timelimit_exit = 0;
    if (timelimit <= 0) timelimit = 1;
    // 如果是快速模式,则最大执行时间为ACTIVE_EXPIRE_CYCLE_FAST_DURATION
    if (type == ACTIVE_EXPIRE_CYCLE_FAST)
        timelimit = ACTIVE_EXPIRE_CYCLE_FAST_DURATION; /* in microseconds. */
    // 采样记录
    long total_sampled = 0;
    long total_expired = 0;
    // 依次遍历 dbs_per_call 个 db
    for (j = 0; j < dbs_per_call && timelimit_exit == 0; j++) {
        int expired;
        redisDb *db = server.db+(current_db % server.dbnum);
        // 将db数增加,一遍下一次继续从这个db开始遍历
        current_db++;

        do {
            ..... // 申明变量和一些情况下 break
            if (num > ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP)
                num = ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP;
            // 主要循环,在过期表中进行随机采样,判断是否比率大于25%
            while (num--) {
                dictEntry *de;
                long long ttl;

                if ((de = dictGetRandomKey(db->expires)) == NULL) break;
                ttl = dictGetSignedIntegerVal(de)-now;
                // 删除过期键
                if (activeExpireCycleTryExpire(db,de,now)) expired++;
                if (ttl > 0) {
                    /* We want the average TTL of keys yet not expired. */
                    ttl_sum += ttl;
                    ttl_samples++;
                }
                total_sampled++;
            }
            // 记录过期总数
            total_expired += expired;
            // 即使有很多键要过期,也不阻塞很久,如果执行超过了最大执行时间,则返回
            if ((iteration & 0xf) == 0) { /* check once every 16 iterations. */
                elapsed = ustime()-start;
                if (elapsed > timelimit) {
                    timelimit_exit = 1;
                    server.stat_expired_time_cap_reached_count++;
                    break;
                }
            }
            // 当比率小于25%时返回
        } while (expired > ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP/4);
    }
    .....// 更新一些server的记录数据
}

activeExpireCycleTryExpire 函数的实现就和 expireIfNeeded 类似,这里就不赘述了。

int activeExpireCycleTryExpire(redisDb *db, dictEntry *de, long long now) {
    long long t = dictGetSignedIntegerVal(de);
    if (now > t) {
        sds key = dictGetKey(de);
        robj *keyobj = createStringObject(key,sdslen(key));

        propagateExpire(db,keyobj,server.lazyfree_lazy_expire);
        if (server.lazyfree_lazy_expire)
            dbAsyncDelete(db,keyobj);
        else
            dbSyncDelete(db,keyobj);
        notifyKeyspaceEvent(NOTIFY_EXPIRED,
            "expired",keyobj,db->id);
        decrRefCount(keyobj);
        server.stat_expiredkeys++;
        return 1;
    } else {
        return 0;
    }
}

定期删除策略的关键点就是删除操作执行的时长和频率:

  • 如果删除操作太过频繁或者执行时间太长,就对 CPU 时间不是很友好,CPU 时间过多的消耗在删除过期键上。
  • 如果删除操作执行太少或者执行时间太短,就不能及时删除过期键,导致内存浪费。

内存溢出控制策略

当Redis所用内存达到maxmemory上限时会触发相应的溢出控制策略。 具体策略受maxmemory-policy参数控制,Redis支持6种策略,如下所示:

  • 1)noeviction:默认策略,不会删除任何数据,拒绝所有写入操作并返 回客户端错误信息(error)OOM command not allowed when used memory,此 时Redis只响应读操作。
  • 2)volatile-lru:根据LRU算法删除设置了超时属性(expire)的键,直 到腾出足够空间为止。如果没有可删除的键对象,回退到noeviction策略。
  • 3)allkeys-lru:根据LRU算法删除键,不管数据有没有设置超时属性, 直到腾出足够空间为止。
  • 4)allkeys-random:随机删除所有键,直到腾出足够空间为止。
  • 5)volatile-random:随机删除过期键,直到腾出足够空间为止。
  • 6)volatile-ttl:根据键值对象的ttl属性,删除最近将要过期数据。如果没有,回退到noeviction策略。

内存溢出控制策略可以使用 config set maxmemory-policy {policy} 语句进行动态配置。Redis 提供了丰富的空间溢出控制策略,我们可以根据自身业务需要进行选择。

当设置 volatile-lru 策略时,保证具有过期属性的键可以根据 LRU 剔除,而未设置超时的键可以永久保留。还可以采用allkeys-lru 策略把 Redis 变为纯缓存服务器使用。

当Redis因为内存溢出删除键时,可以通过执行 info stats 命令查看 evicted_keys 指标找出当前 Redis 服务器已剔除的键数量。

每次Redis执行命令时如果设置了maxmemory参数,都会尝试执行回收 内存操作。当Redis一直工作在内存溢出(used_memory>maxmemory)的状态下且设置非 noeviction 策略时,会频繁地触发回收内存的操作,影响Redis 服务器的性能,这一点千万要引起注意。

Share

一文了解 Redis 内存监控和内存消耗

Redis 是一种内存数据库,将数据保存在内存中,读写效率要比传统的将数据保存在磁盘上的数据库要快很多。所以,监控 Redis 的内存消耗并了解 Redis 内存模型对高效并长期稳定使用 Redis 至关重要。

内存使用统计

通过 info memory 命令可以获得 Redis 内存相关的指标。较为重要的指标和解释如下所示:

当 mem_fragmentation_ratio > 1 时,说明有部分内存并没有用于数据存储,而是被内存碎片所消耗,如果该值很大,说明碎片率严重。
当 mem_fragmentation_ratio < 1 时,这种情况一般出现在操作系统把 Redis 内存交换 (swap) 到硬盘导致,出现这种情况要格外关注,由于硬盘速度远远慢于内存,Redis 性能会变得很差,甚至僵死。

当 Redis 内存超出可以获得内存时,操作系统会进行 swap,将旧的页写入硬盘。从硬盘读写大概比从内存读写要慢5个数量级。used_memory 指标可以帮助判断 Redis 是否有被swap的风险或者它已经被swap。

在 Redis Administration 一文 (链接在文末) 建议要设置和内存一样大小的交换区,如果没有交换区,一旦 Redis 突然需要的内存大于当前操作系统可用内存时,Redis 会因为 out of memory 而被 Linix Kernel 的 OOM Killer 直接杀死。虽然当 Redis 的数据被换出 (swap out) 时,Redis的性能会变差,但是总比直接被杀死的好。

Redis 使用 maxmemory 参数限制最大可用内存。限制内存的目的主要有:

  • 用于缓存场景,当超出内存上限 maxmemory 时使用 LRU 等删除策略释放空间。
  • 防止所用的内存超过服务器物理内存,导致 OOM 后进程被系统杀死。

maxmemory 限制的是 Redis 实际使用的内存量,也就是 used_memory 统计项对应的内存。实际消耗的内存可能会比 maxmemory 设置的大,要小心因为这部内存导致 OOM。所以,如果你有 10GB 的内存,最好将 maxmemory 设置为 8 或者 9G

内存消耗划分

Redis 进程内消耗主要包括:自身内存 + 对象内存 + 缓冲内存 + 内存碎片,其中 Redis 空进程自身内存消耗非常少,通常 used_memory_rss 在 3MB 左右时,used_memory 一般在 800KB 左右,一个空的 Redis 进程消耗内存可以忽略不计。

对象内存

对象内存是 Redis 内存占用最大的一块,存储着用户所有的数据。Redis 所有的数据都采用 key-value 数据类型,每次创建键值对时,至少创建两个类型对象:key 对象和 value 对象。对象内存消耗可以简单理解为这两个对象的内存消耗之和(还有类似过期之类的信息)。键对象都是字符串,在使用 Redis 时很容易忽略键对内存消耗的影响,应当避免使用过长的键。有关 Redis 对象系统的详细内容,请看我之前的文章十二张图带你了解 Redis 的数据结构和对象系统

缓冲内存

缓冲内存主要包括:客户端缓冲、复制积压缓冲区和 AOF 缓冲区。

客户端缓冲指的是所有接入到 Redis 服务器 TCP 连接的输入输出缓冲。

输入缓冲无法控制,最大空间为 1G,如果超过将断开连接。而且输入缓冲区不受 maxmemory 控制,假设一个 Redis 实例设置了 maxmemory 为 4G,已经存储了 2G 数据,但是如果此时输入缓冲区使用了 3G,就已经超出了 maxmemory 限制,可能导致数据丢失、键值淘汰或者 OOM。

输入缓冲区过大主要是因为 Redis 的处理速度跟不上输入缓冲区的输入速度,并且每次进入输入缓冲区的命令包含了大量的 bigkey。

输出缓冲通过参数 client-output-buffer-limit 控制,其格式如下所示。

client-output-buffer-limit [hard limit] [soft limit] [duration]

hard limit 是指一旦缓冲区大小达到了这个阈值,Redis 就会立刻关闭该连接。而 soft limit 和时间 duration 共同生效,比如说 soft time 为 64mb、duration 为 60,则只有当缓冲区持续 60s 大于 64mb 时,Redis 才会关闭该连接。

普通客户端是除了复制和订阅的客户端之外的所有连接。Reids 对其的默认配置是 client-output-buffer-limit normal 0 0 0 , Redis 并没有对普通客户端的输出缓冲区做限制,一般普通客户端的内存消耗可以忽略不计,但是当有大量慢连接客户端接入时这部分内存消耗就不能忽略,可以设置 maxclients 做限制。特别当使用大量数据输出的命令且数据无法及时推送到客户端时,如 monitor 命令,容易造成 Redis 服务器内存突然飙升。相关案例可以查看这篇文章美团在Redis上踩过的一些坑-3.redis内存占用飙升

从客户端用于主从复制,主节点会为每个从节点单独建立一条连接用于命令复制,默认配置为 client-output-buffer-limit slave 256mb 64mb 60。当主从节点之间网络延迟较高或主节点挂载大量从节点时这部分内存消耗将占用很大一部分,建议主节点挂载的从节点不要多于 2 个,主从节点不要部署在较差的网络环境下,如异地跨机房环境,防止复制客户端连接缓慢造成溢出。与主从复制相关的一共有两类缓冲区,一个是从客户端输出缓冲区,另外一个是下面会介绍到的复制积压缓冲区。

订阅客户端用于发布订阅功能,连接客户端使用单独的输出缓冲区,默认配置为 client-output-buffer-limit pubsub 32mb 8mb 60,当订阅服务的消息生产快于消费速度时,输出缓冲区会产生积压造成内存空间溢出。

输入输出缓冲区在大流量场景中容易失控,造成 Redis 内存不稳定,需要重点监控。可以定期执行 client list 命令,监控每个客户端的输入输出缓冲区大小和其他信息。

属性名 属性说明
qbuf 查询缓冲区的长度(字节为单位, 0 表示没有分配查询缓冲区)
qbuf-free 查询缓冲区剩余空间的长度(字节为单位, 0 表示没有剩余空间)
obl 输出缓冲区的长度(字节为单位, 0 表示没有分配输出缓冲区)
oll 输出列表包含的对象数量(当输出缓冲区没有剩余空间时,命令回复会以字符串对象的形式被入队到这个队列里)
127.0.0.1:6379> client list
id=3 addr=127.0.0.1:58161 fd=8 name= \
age=1408 idle=0 flags=N db=0 sub=0 psub=0 multi=-1 \
qbuf=26 qbuf-free=32742 obl=0 oll=0 omem=0 \
events=r cmd=client

client list 命令执行速度慢,客户端较多时频繁执行存在阻塞redis的可能,所以一般可以先使用 info clients 命令获取最大的客户端缓冲区大小。

127.0.0.1:6379> info clients
# Clients
connected_clients:1
client_recent_max_input_buffer:2
client_recent_max_output_buffer:0
blocked_clients:0

复制积压缓冲区是Redis 在 2.8 版本后提供的一个可重用的固定大小缓冲区,用于实现部分复制功能。根据 repl-backlog-size 参数控制,默认 1MB。对于复制积压缓冲区整个主节点只有一个,所有的从节点共享此缓冲区。因此可以设置较大的缓冲区空间,比如说 100MB,可以有效避免全量复制。有关复制积压缓冲区的详情可以看我的旧文章 Redis 复制过程详解

AOF 重写缓冲区:这部分空间用于在 Redis AOF 重写期间保存最近的写入命令。AOF 重写缓冲区的大小用户无法控制,取决于 AOF 重写时间和写入命令量,不过一般都很小。有关 AOF 持久化的详情可以看我的旧文章 Redis AOF 持久化详解

Redis 内存碎片

Redis 默认的内存分配器采用 jemalloc,可选的分配器还有:glibc、tcmalloc。内存分配器为了更好地管理和重复利用内存,分配内存策略一般采用固定范围的内存块进行分配。具体的分配策略后续会具体讲解,但是 Redis 正常碎片率一般在 1.03 左右(为什么是这个值)。但是当存储的数据长度长度差异较大时,以下场景容易出现高内存碎片问题:

  • 频繁做更新操作,例如频繁对已经存在的键执行 append、setrange 等更新操作。
  • 大量过期键删除,键对象过期删除后,释放的空间无法得到重复利用,导致碎片率上升。

这部分内容我们后续再详细讲解 jemalloc,因为大量的框架都会使用内存分配器,比如说 Netty 等。

子进程内存消耗

子进程内存消耗主要指执行 AOF 重写 或者进行 RDB 保存时 Redis 创建的子进程内存消耗。Redis 执行 fork 操作产生的子进程内存占用量表现为与父进程相同,理论上需要一倍的物理内存来完成相应的操作。但是 Linux 具有写时复制技术 (copy-on-write),父子进程会共享相同的物理内存页,当父进程处理写请求时会对需要修改的页复制出一份副本完成写操作,而子进程依然读取 fork 时整个父进程的内存快照。

如上图所示,fork 时只拷贝 page table,也就是页表。只有等到某一页发生修改时,才真正进行页的复制。

但是 Linux Kernel 在 2.6.38 内存增加了 Transparent Huge Pages (THP) 机制,简单理解,它就是让页大小变大,本来一页为 4KB,开启 THP 机制后,一页大小为 2MB。它虽然可以加快 fork 速度( 要拷贝的页的数量减少 ),但是会导致 copy-on-write 复制内存页的单位从 4KB 增大为 2MB,如果父进程有大量写命令,会加重内存拷贝量,都是修改一个页的内容,但是页单位变大了,从而造成过度内存消耗。例如,以下两个执行 AOF 重写时的内存消耗日志:

// 开启 THP
C * AOF rewrite: 1039 MB of memory used by copy-on-write
// 关闭 THP
C * AOF rewrite: 9MB of memory used by copy-on-write

这两个日志出自同一个 Redis 进程,used_memory 总量是 1.5GB,子进程执行期间每秒写命令量都在 200 左右。当分别开启和关闭 THP 时,子进程内存消耗有天壤之别。所以,在高并发写的场景下开启 THP,子进程内存消耗可能是父进程的数倍,造成机器物理内存溢出。

所以说,Redis 产生的子进程并不需要消耗 1 倍的父进程内存,实际消耗根据期间写入命令量决定,所以需要预留一些内存防止溢出。并且建议关闭系统的 THP,防止 copy-on-write 期间内存过度消耗。不仅是 Redis,部署 MySQL 的机器一般也会关闭 THP。

参考文章

Share

Spring Cloud Netflix Feign 基础应用实战

 微服务是软件系统架构上的一种设计风格,它倡导将一个原本独立的服务系统分成多个小型服务,这些小型服务都在独立的进程中运行,通过各个小型服务之间的协作来实现原本独立系统的所有业务功能。小型服务基于多种跨进程的方式进行通信协作,而在Spring Cloud架构中比较常见的跨进程的方式是RESTful HTTP请求和RPC调用。

 RPC就是远程过程调用,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。比如说,计算机 A 上的进程调用另外一台计算机 B 上的进程,其中 A 上的调用进程被挂起,而 B 上的被调用进程开始执行,当值返回给 A 时,A 进程继续执行。调用方可以通过使用参数将信息传送给被调用方,而后可以通过传回的结果得到信息。而这一过程,对于开发人员来说是透明的。

RPC示意图

 REST是Representational State Transfer的缩写,是表现层状态转移的含义。

 Resource是资源,所谓“资源”就是网络上的一个实体,或者说网上的一个具体信息。它可以是一段文本,一首歌曲,一种服务,总之就是一个具体的存在。你可以使用一个URI指向它,每种”资源“对应一个URI。

 Representational是”表现层“的意思,”资源“是一种消息实体,它可以有多种外在的表现形式,我们把”资源“的具体呈现出来的形式叫做它的”表现层“。比如说,文本可以用txt格式进行表现,也可以使用xml格式,JSON格式和二进制格式;视频可以以MP4格式表现,也可以以AVI格式表现。URI只代表资源的实体,不代表它的形式。它的具体表现形式,应该在HTTP请求的头信息Accept和Content-Type字段指定,这两个字段才是对”表现层“的描述。

 State Transfer是指状态转化。客户端访问服务的过程中必然涉及到数据和状态的转化。如果客户端想要操作服务器,必须通过某种手段,让服务器端发生”状态转化“。而这种转化是建立在表现层之上的,所以就是”表现层状态转化“。客户端通过使用HTTP协议中的四个动词来实现上述操作,它们分别:用来获取资源的GET,用来新建或更新资源的POST,用来更新资源的PUT,用来删除资源的DELETE。

 REST是Web Service的一种实现方式,另外一种实现方式为SOAP。REST致力于通过HTTP协议中的POST/GET/PUT/DELETE等方法和一个可读性较强的URL来提供一个HTTP请求;而SOAP致力于通过wsdl数据格式来实现通信。二者的使用场景和设计目标不同。SOAP一般作为应用层协议来进行服务间的消息调用。

 RPC和REST之间的最大差别在于RPC调用可以不依赖HTTP协议,底层直接使用TPC/IP协议进行传输,传输效率相比于REST会有一定的提升。

Feign简介

Feign是一个声明式RESTful HTTP请求客户端,它使得编写Web服务客户端更加方便和快捷。使用Feign创建一个接口并使用Feign提供的注解修饰该接口,然后就可以使用该接口进行RESTful HTTP请求的发送。Feign还可以集成Ribbon和Eureka来为自己提供负载均衡和断路器的机制。

Feign会将带有注解的函数接口信息转化为网络请求模板,在发送网络请求之前,函数的参数值会以一定的方式设置到这些请求模板中。虽然这样的模式使得Feign只能支持基于文本的网络请求,但是它可以简化网络请求的实现,方便编程人员快速构建自己的网络请求架构。
Feign架构示意图

 如上图所示,使用Feign的程序的架构一般分为三个部分,分别为服务注册中心,服务提供者和服务消费者。服务提供者向服务注册中心注册自己,然后服务消费者通过Feign发送请求时,Feign会向去服务注册中心获取关于服务提供者的信息,然后再向服务提供者发送网络请求。

代码示例

服务注册中心

Feign可以配合eureka等服务注册中心同时使用。eureka来作为服务注册中心,为Feign提供关于服务端信息的获取,比如说IP地址。关于eureka的具体使用可以参考第四章中关于eureka的快速入门介绍。

服务提供者

Spring Cloud Feign是声明式RESTful请求客户端,所以它不会侵入服务提供者程序的实现。也就是说,服务提供者只需要提供Web Service的API接口,至于具体实现既可以是Spring Controler也可以是Jersey。我们只需要确保该服务提供者被注册到服务注册中心上。

@RestController
@RequestMapping("/feign-service")
public class FeignServiceController {

    private static final Logger logger = LoggerFactory.getLogger(FeignServiceController.class);

    private static String DEFAULT_SERVICE_ID = "application";
    private static String DEFAULT_HOST = "localhost";
    private static int DEFAULT_PORT = 8080;

    @RequestMapping(value = "/instance/{serviceId}", method = RequestMethod.GET)
    public Instance getInstanceByServiceId(@PathVariable("serviceId") String serviceId){
        logger.info("Get Instance by serviceId {}", serviceId);
        return new Instance(serviceId, DEFAULT_HOST, DEFAULT_PORT);
    }

    @RequestMapping(value = "/instance/{serviceId}", method = RequestMethod.DELETE)
    public String deleteInstanceByServiceId(@PathVariable("serviceId") String serviceId){

        logger.info("Delete Instance by serviceId {}", serviceId);
        return "Instance whose serviceId is " + serviceId + " is deleted";

    }

    @RequestMapping(value = "/instance", method = RequestMethod.POST)
    public String createInstance(@RequestBody Instance instance){

        logger.info("Create Instance whose serviceId is {}", instance.getServiceId());
        return "Instance whose serviceId is" + instance.getServiceId() + " is created";
    }

    @RequestMapping(value = "/instance/{serviceId}", method = RequestMethod.PUT)
    public String updateInstanceByServiceId(@RequestBody Instance instance, @PathVariable("serviceId") String serviceId){
        logger.info("Update Instance whose serviceId is {}", serviceId);
        return "Instance whose serviceId is " + serviceId + " is updated";
    }
}

 上述代码中通过@RestController@RequestMapping声明了四个网络API接口,分别是对Instance资源的增删改查操作。

 除了实现网络API接口之外,还需要将该service注册到eureka上。如下列代码所示,需要在application.yml文件中设置服务注册中心的相关信息和代表该应用的名称。

eureka:
  instance:
    instance-id: ${spring.application.name}:${vcap.application.instance_id:${spring.application.instance_id:${random.value}}}
  client:
    service-url:
      default-zone: http://localhost:8761/eureka/
spring:
  application:
    name: feign-service
server:
  port: 0

服务消费者

Feign是声明式RESTful客户端,所以构建Feign项目的关键在于构建服务消费者。通过下面六步可以创建一个Spring Cloud Feign的服务消费者。

 首先创建一个普通的Spring Boot工程,取名为chapter-feign-client
 然后在pom文件中添加eurekafeign相关的依赖。其中spring-cloud-starter-eurekaeureka的starter依赖包,spring-cloud-starter-feignfeign的starter依赖包。

<dependencies>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-eureka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-feign</artifactId>
    </dependency>
</dependencies>

 接着在工程的入口类上添加@EnableFeignClients注解表示开启Spring Cloud Feign的支持功能,代码如下所示。

@SpringBootApplication
@EnableFeignClients()
public class ChapterFeignClientApplication {
	public static void main(String[] args) {
		SpringApplication.run(ChapterFeignClientApplication.class, args);
	}
}

@EnableFeignClients就像是一个开关,如果你使用了该注解,那么Feign相关的组件和处理机制才会生效,否则不会生效。@EnableFeignClients还可以对Feign相关组件进行自定义配置,它的方法和原理会在本章的源码分析章节在做具体的讲解。

 接下来我们定义一个FeignServiceClient接口,通过@FeignClient注解来指定服务名进而绑定服务。这一类被@FeignClient修饰的接口类一般被称为FeignClient。我们可以通过@RequestMapping来修饰相应的方法来定义调用函数。

@FeignClient("feign-service")
@RequestMapping("/feign-service")
public interface FeignServiceClient {

    @RequestMapping(value = "/instance/{serviceId}", method = RequestMethod.GET)
    public Instance getInstanceByServiceId(@PathVariable("serviceId") String serviceId);

    @RequestMapping(value = "/instance/{serviceId}", method = RequestMethod.DELETE)
    public String deleteInstanceByServiceId(@PathVariable("serviceId") String serviceId);

    @RequestMapping(value = "/instance", method = RequestMethod.POST)
    public String createInstance(@RequestBody Instance instance);

    @RequestMapping(value = "/instance/{serviceId}", method = RequestMethod.PUT)
    public String updateInstanceByServiceId(@RequestBody Instance instance, @PathVariable("serviceId") String serviceId);
}

 如上面代码片段所显示的,如果你调用FeignServiceClient对象的getInstanceByServiceId函数,那么Feign就会向feign-service服务的/feign-service/instance/{serviceId}接口发送网络请求。

 创建一个Controller来调用上边的服务,通过@Autowired来自动装载FeignServiceClient示例。代码如下:

@RestController
@RequestMapping("/feign-client")
public class FeignClientController {

    @Autowired
    FeignServiceClient feignServiceClient;

    @RequestMapping(value = "/instance/{serviceId}", method = RequestMethod.GET)
    public Instance getInstanceByServiceId(@PathVariable("serviceId") String serviceId){
        return feignServiceClient.getInstanceByServiceId(serviceId);
    }

    @RequestMapping(value = "/instance/{serviceId}", method = RequestMethod.DELETE)
    public String deleteInstanceByServiceId(@PathVariable("serviceId") String serviceId){
        return feignServiceClient.deleteInstanceByServiceId(serviceId);
    }

    @RequestMapping(value = "/instance", method = RequestMethod.POST)
    public String createInstance(@RequestBody Instance instance){
        return feignServiceClient.createInstance(instance);
    }

    @RequestMapping(value = "/instance/{serviceId}", method = RequestMethod.PUT)
    public String updateInstanceByServiceId(@RequestBody Instance instance, @PathVariable("serviceId") String serviceId){
        return feignServiceClient.updateInstanceByServiceId(instance, serviceId);
    }
}

 最后,application.yml中需要配置eureka服务注册中心的相关配置,具体配置如下所示:

eureka:
  instance:
    instance-id: ${spring.application.name}:${vcap.application.instance_id:${spring.application.instance_id:${random.value}}}
  client:
    service-url:
      default-zone: http://localhost:8761/eureka/

spring:
  application:
    name: feign-client
server:
  port: 8770

 相信读者通过搭建Feign的项目,已经对Feign的相关使用原理有了一定的了解,相信这个过程将对于理解Feign相关的工作原理大有裨益。

Share

Redis 复制过程详解

Redis 的复制功能分为同步( sync )和命令传播( command propagate )两个步骤:

  • 同步用于将从服务器的数据库状态更新至主服务器当前所处的数据库状态。
  • 命令传播则用于在主服务器的数据库状态被修改,导致主从服务器的数据库状态出现不一致时,让主从服务器的数据库重新回到一致状态。

同步

Redis 使用 psync 命令完成主从数据同步,同步过程分为:全量复制和部分复制。

全量复制:一般用于初次复制场景,它会把主节点全部数据一次性发送给从节点发送给从节点,当数据量较大时,会对主从节点和网络造成很大的开销。

部分复制:用于处理在主从复制中因网络闪断等原因造成的网络丢失场景,当从节点再次连接上主节点后,如果条件允许,主节点会补发丢失数据给从节点。因为补发的数据远远小于全量数据,可以有效避免全量复制的过高开销。

psync 命令运行需要以下组件支持:

  • 主从节点各自复制偏移量
  • 主节点复制积压缓冲区
  • 主节点运行 id

参与复制的从节点都会维护自身复制偏移量。主节点在处理完写命令后,会把命令的字节长度做累加记录,统计在 info replication 中的 master_repl_offset 指标中。
从节点在接收到主节点发送的命令后,也会累加记录自身的偏移量,并且会每秒钟上报自身的复制偏移量给主节点。
通过对比主从节点的复制偏移量,可以判断主从节点数据是否一致。

复制积压缓冲区是保存在主节点的一个固定长度的队列,默认大小为 1MB,当主节点有连接的从节点时被创建。主节点响应写命令时,不但会把命令发送给从节点,还会写入复制积压缓冲区中。

复制积压缓冲区大小有限,只能保存最近的复制数据,用于部分复制和复制命令丢失时的数据补救。

每个 Redis 节点启动后都会动态分配一个 40 位的十六进制字符串作为运行 ID。运行 ID 的主要作用是用来唯一标识 Redis 节点,比如说从节点保存主节点的运行 ID 来识别自己正在复制的时哪个主节点。

全量同步


slaveof 命令的执行

  • 1) 从节点发送 psync 命令进行数据同步,由于是第一次进行复制,从节点没有复制偏移量和主节点的运行ID,所以发送的命令时 PSYNC ? -1。
  • 2) 主节点根据 PSYNC ? -1 解析出当前为全量复制,回复 + FULLRESYNC 响应。
  • 3) 从节点接收主节点的响应数据保存运行 ID 和偏移量 offset。
  • 4) 主节点执行 bgsave 保存 RDB 文件到本地,有关 RDB 的知识可以查看《Redis RDB 持久化详解》
  • 5) 主节点发送 RDB 文件给从节点,从节点把接收的 RDB 文件保存在本地并直接作为从节点的数据文件,接收完 RDB 后从节点打印相关日志,可以在日志中查看主节点发送的数据量。

需要注意,对于数据量较大的主节点,比如生成的 RDB 文件超过 6GB 以上时要格外小心。如果传输 RDB 的时间超过 repl-timeout 所配置的值,从节点将发起接收 RDB 文件并清理已经下载的临时文件,导致全量复制失败。

  • 6) 对于主节点开始保存 RDB 快照到从节点接收完成期间,主节点仍然响应读命令,因此主节点会把这期间写命令保存在复制客户端缓冲区内,当从节点加载完 RDB 文件后,主节点再把缓冲区内的数据发送给从节点,保证主从之间数据一致性。

如果主节点创建和传输 RDB 的时间过长,可能会出现主节点复制客户端缓冲区溢出。默认配置为 client-output-buffer-limit slave 256MB 64MB 60,如果60s内缓冲区消耗持续大于64MB或者直接超过256MB时,主节点将直接关闭复制客户端连接,造成全量同步失败。

  • 7) 从节点接收完主节点传送来的全部数据后会清空自身旧数据,该步骤对应如下日志。
  • 8) 从节点清空数据后开始加载 RDB 文件,对于加大的 RDB 文件,这一步操作依然比较耗时,可以通过计算日志之间的时间差来判断加载 RDB 的总耗时。
  • 9) 收到 SYNC 命令的主服务器执行 BGSAVE 命令,在后台生成一个 RDB 文件,并使用一个缓冲区记录从现在开始执行的所有写命令。
  • 10) 当主服务器的 BGSAVE 命令执行完毕时,主服务器会将 GBSAVE 命令生成的 RDB 文件发送给从服务器,从服务器接收并载入这个 RDB 文件,将自己的数据库状态更新至主服务器执行 BGSAVE 命令时的数据库状态。
  • 11) 主服务器将记录在缓冲区里边的所有写命令发送给从服务器,从服务器执行这些写命令,将自己的数据库状态更新至主服务器数据库当前所处的状态。

通过分析全量复制的所有流程,读者会发现全量复制是一个非常耗时费力的操作。它时间开销主要包括:

  • 主节点 bgsave 时间
  • RDB 文件网络传输时间
  • 从节点清空数据时间
  • 从节点加载 RDB 的时间
  • 可能的 AOF 重写时间

全量同步过程中不仅会消耗大量时间,还会进行多次持久化相关操作和网络数据传输,这期间会大量消耗主从节点所在服务器的 CPU、内存和网络资源。所以,除了第一次复制是采用全量同步无法避免,其他场景应该规避全量复制,采取部分同步功能。

部分同步

部分复制主要是 Redis 针对全量复制的过高开销做出的一种优化措施,使用 psync {runId} {offset} 命令实现。当从节点正在复制主节点时,如果出现网络闪断或者命令丢失等异常情况时,从节点会向主节点要求补发丢失的命令数据,如果主节点的复制积压缓冲区存在这部分数据则直接发送给从节点,这样就保证了主从节点复制的一致性。补发的这部分数据一般远远小于全量数据,所以开销很小。

  • 1) 当主从节点之间网络出现中断时,如果超过了 repl-timeout 时间,主节点会认为从节点故障并中断复制连接。
  • 2) 主从连接中断期间主节点依然响应命令,但因复制连接中断命令无法发送给从节点,不过主节点内部存在复制积压缓冲区( repl-backlog-buffer ),依然可以保存最近一段时间的写命令数据,默认最大缓存 1MB。

  • 3) 当主从节点网络恢复后,从节点会再次连上主节点。

  • 4) 当主从连接恢复后,由于从节点之前保存了自身已复制的偏移量和主节点的运行ID。因此会把它们作为 psync 参数发送给主节点,要求进行补发复制操作。
  • 5) 主节点接到 psync 命令后首先核对参数 runId 是否与自身一致,如果一致,说明之前复制的是当前主节点;之后根据参数 offset 在自身复制积压缓冲区查找,如果偏移量之后的数据存在缓冲区中,则对从节点发送 +CONTINUE 响应,表示可以进行部分复制。
  • 6) 主节点根据偏移量把复制积压缓冲区里的数据发送给从节点,保证主从复制进入正常状态。

心跳检测

主从节点在建立复制后,它们之间维护着长连接并彼此发送心跳命令,如下图所示。

主从心跳判断机制如下所示:

  • 1) 主从节点彼此都有心跳检测机制,各自模拟成对方的客户端进行通信,通过 client list 命令查看复制相关客户端信息,主节点的连接状态为 flags=M,从节点连接状态为 flags=S。
  • 2) 主节点默认每隔 10 秒对从节点发送 ping 命令,判断从节点的存活性和连接状态。可以通过参数 repl-ping-slave-period 控制发送频率。
  • 3) 从节点在主线程中每隔 1 秒发送 replconf ack { offset } 命令,给主节点上报自己当前的复制偏移量。

replconf 命令不仅能实时监测主从节点网络状态,还能上报从节点复制偏移量。主节点会根据从节点上传的偏移量检查复制数据是否丢失,如果从节点数据丢失,再从主节点的复制缓存区中拉取丢失的数据发送给该从节点。

异步复制和命令传播

主节点不但负责数据读写,还负责把写命令同步给从节点。写命令的发送过程是异步完成,也就是说主节点自身处理完写命令后直接返回给客户端,并不等待从节点复制完成。

这个异步过程由命令传播来处理,它不仅会将写命令发送给所有从服务器,还会将写命令入队到复制积压缓冲区里边。

后记

Share

编程小技巧之 Linux 文本处理命令

合格的程序员都善于使用工具,正所谓君子性非异也,善假于物也。合理的利用 Linux 的命令行工具,可以提高我们的工作效率。

本文简单的介绍三个能使用 Linux 文本处理命令的场景,给大家开阔一下思路。希望大家阅读完这篇文章之后,要多加实践,将这些技巧内化到自己的日常工作习惯中,真正的提高效率。内化很重要,就像开玩笑所说的一样,即使我知道高内聚,低耦合的要求,了解 23 种设计模式和 6 大原则,熟读代码整洁之道,却仍然写不出优秀的代码。知道和内化到行为中区别还是很大的。

能不能让正确的原则指导正确的行动本身,其实就是区分是否是高手的一个显著标志。

程序员日常工作中往往要处理一些数据和文本,比如说统计一些服务日志文件信息,根据数据库数据生成一些处理数据的SQL和搜索文件内容等。可以直接通过编写代码处理,但不够便捷,因为有时候线上相关的代码环境依赖不一定具备。而直接使用 Linux 的文本处理命令可以很方便地处理这些问题。

日志文件捞数据

在工作中,我们往往需要对一些具有固定格式的文件进行信息统计,比如说根据 nginx 的 access.log 文件数据,计算出每个后端 API 接口的调用次数,并且排序。

nginx 的 access.log 文件文件格式配置如下所示,每个字段之间通过空格分隔开来。

log_format  main  '$remote_addr - $remote_user [$time_local] "$request" '
                  '$status $body_bytes_sent "$http_referer" '
                  '"$http_user_agent" "$http_x_forwarded_for"';

上述配置中字段含义如下:

  • $remote_addr : 发送请求的源地址
  • $remote_user : 发送请求的用户信息
  • $time_local : 接收请求的本地时间
  • $request : 请求信息,比如说 http 的 method 和 路径。
  • $status : 请求状态,比如说 200、401或者 500。
  • $body_bytes_sent : 请求 body 字节数。
  • $http_referer : 域名。
  • $http_user_agent : 用户端 agent 信息,一般就是浏览器信息
  • $http_x_forwarded_for : 其他信息。

具体的一段 access.log 内容如下所示。

58.213.85.34 - - [11/Sep/2019:03:36:11 +0800] "POST /publish/pending/list HTTP/2.0" 200 1328 "https://remcarpediem.com/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.142 Safari/537.36"
58.213.85.34 - - [11/Sep/2019:03:36:30 +0800] "GET /publish/search_inner?key=test HTTP/2.0" 200 34466 "https://remcarpediem.com/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.142 Safari/537.36"

那么,我们可以通过下面命令来统计所有接口调用的次数,并且从大到小排序显示。

cat /var/log/nginx/access.log | awk '{print $7}' | awk -F'?' '{print $1}' | sort | uniq -c | sort -nr

这条命令涉及了 cat、awk、sort , uniq 四个命令行工具和 | 连接符的含义,我们依次简单讲解一下它们的使用,感兴趣的同学可以自行去全面了解学习。

cat 命令是将文件内容打印到标准输出设备上,可以是终端,也可以是其他文件。比如说:

cat /var/log/nginx/access.log # 打印到终端
cat /var/log/nginx/access.log > copy.log # 打印到其他文件中

| 符号是管道操作符,它将的一个命令的 stdout 指向第二个命令的 stdin。在这条命令中 | 符号将 cat 命令的输出指向到 awk 命令的输入中。

awk 是贝尔实验室 1977 年搞出来的文本流处理工具,用于对具有固定格式的文件进行流处理。比如说 nginx 的 access.log 文件,它各个字段之间通过空格分隔开来,awk 就很适合处理此类文件。

'{print $7}' 就是 awk 的指令声明,表示打印出变量$7$7则是 awk 内置的变量,代表按照分隔符分隔开来的第七个文本内容。对于 access.log 文件来说就是 $request 代表的路径相关的内容。 $request 的全部内容是POST /publish/pending/list HTTP/2.0$6 对应 POST,而 $7 对应的就是 /publish/pending/list

awk '{print $7}' Access.log # ''中是命令声明,后边跟着要操作的文件,也就是awk的输入流。

但是有些时候我们发现文本内容并不是按照空格进行分隔的,比如说 $request 内容可能为 /publish/search_inner?key=test,虽然是相同的 path,但是 query 不同,我们统计接口调用量时需要将 query 部分过滤掉。我们可以使用 awk 的 -F 指令指定分隔符。

awk -F'?' '{print $1}' 
# 可以将 /publish/search_inner?key=test 处理为 /publish/search_inner

sort 是专门用于排序的命令,它有多个参数:

  • -n 按数值进行排序,默认是按照字符值排序,按照数值比较 10 > 2 但是按照字符值排序,2 >10 ,因为字符值会先比较首位,也就是 2 > 1。
  • -r 默认是升序排列,这个参数指定按照逆序排列。
  • -k N 指定按第N列排序,默认是第一个值
sort -nr Access.log # 按照数值逆序排序

最后一个命令是 uniq,它用于消除重复行,或者统计。

sort unsort.txt | uniq #  消除重复行
sort unsort.txt | uniq -c # 统计各行在文件中出现的次数,输入格式是[字数] [内容]
sort unsort.txt | uniq -d # 找出重复行

比如说cat /var/log/nginx/access.log | awk '{print $7}' | awk -F'?' '{print $1}' | sort | uniq -c 命令的输出如下所示,正好作为 sort -nr 的输入。

5 /announcement/pending/list
5 /announcement/search_inner

利用这些指令,我们可以通过 access.log 统计很多信息,比如下列这些信息( access.log 的信息配置不同,不可以直接照搬 )。

cat access.log | awk -F ‘^A’ ‘{if($5 == 500) print $0}’ 
#查找当前日志文件 500 错误的访问:
tail -f access.log | awk -F ‘^A’ ‘{if($6>1) print $0}’ 
#查找耗时超过 1s 的慢请求

数据库SQL

在业务迭代过程中,有些数据库数据可能需要使用脚本去修改,这是我们可以要根据一些数据生成对应的 SQL 命令,这里我们可以使用命令行工具快速生成。
比如说我们要将一系列订单状态有问题,需要将其恢复成正常的状态。你现在已经收集到了这批订单的信息。

oder_id name info good_id
100000  '裤子' '山东' 1000
100001  '上衣' '江苏' 1000
100002  '内衣' '内蒙古' 1000
........
100003  '袜子' '江西' 1000

那么你可以使用如下命令直接生成对应的 SQL 语句。

cat ErrorOrderIdFile | awk '{print"UPDATE ORDER SET state = 0 WHERE resource_id = "$1}'

这里 ''中都是 awk 的命令内容,而""中是打印的纯文本,所以我们可以将需要补充的 SQL 命令打印出来。

代码信息统计

在大公司中,各个团队往往会公开出自己的接口给兄弟团队调用,但是随着版本地快速迭代,公开的接口越来越多,想要关闭掉又往往不清楚上游调用方是哪个部门的,轻易不敢关闭或者修改。这时,如果你能访问整个公司的代码库,就可以通过下面的脚本搜索一下项目中是否出现该接口相关的关键词。

笔者公司团队中微服务间通过 FeignClient 相互调用,所以对于这种情况,可以直接将搜索出对应 FeignClient 的函数名出现的文件名称。

下面是一段在多个项目中统计某些关键词出现次数,并打印出文件名的 bash 脚本。

#!/bin/bash
keyword=$1 # 将bash命令的第一个参数赋值给 keyword
prefix=`echo $keyword | tr -s '.' '|' | sed 's/$/|/'` # 处理前缀
files=`find services -name "*.java" -or -name "*.js" | xargs grep -il $keyword` 
# 最关键的一条,搜索services文件夹下文件名后缀为.java或者.js并且内容中有关键词的文件名称。
if [ -z "$files" ];then
	echo ${prefix}0
fi
# 打印
for f in $files;do
echo "$prefix$f"
done

我们只看一下最关键的 find 命令,其他的命令比如 tr 或者 sed,大家可以自行了解学习。

find 用于查找文件,可以按照文件名称、文件操作权限、文件属主、文件访问时间等条件来查找。

find services -name "*.java" -or -name "*.js" # 搜索 services 文件夹下
find . -atime 7 -type f -print 
# -atime是访问时间,-type 是文件类型,区分文件和目录,查找最近7天访问过的文件。
find . -type f -user remcarpediem -print// 找用户 remcarpediem 所拥有的文件
find . ! -name "*.java" -print # !是否定参数,查找所有不是以 .java 结尾的文件。
find . -type f -name "*.java" -delete # find 之后的操作,可以删除当前目录下所有的 java 文件
find . type f -name "*.java" | xargs rm # 上边语句的另外一种写法

xargs 命令能够将输入数据转化为特定命令的命令行参数,比如说多行变一行等,串联多个命令行,比如说上边 find 和 rm。

> ls 
Sentinel					groovy-engine					spring-cloud-bus-stream-binder-rocketmq
agent-demo					hash						spring-cloud-stream-binder-rabbit
> ls | xargs # 将 ls 的输出内容变成一行。
Sentinel agent-demo groovy-engine  hash spring-cloud-bus-stream-binder-rocketmq spring-cloud-stream-binder-rabbit
> echo "nameXnameXnameXname" | xargs -dX 
name name name name
# -d 选项可以自定义一个定界符,相信你已经了解 xargs 的大致作用了吧,按照分隔符拆分文本到一行,默认分隔符当时是回车了。

最后一个命令时 grep,它是文本搜索命令,它可以搜索文本内容的关键词。

grep remcarpediem file # 将 file 文件中的带有 remcarpediem 关键词的行。
grep -C10 remcarpediem file # 将 file 文件中的带有 remcarpediem 关键词前后10行的内容。
cat LOG.* | grep "FROM " | grep "WHERE" > b # 将日志中的所有带where条件的sql查找查找出来
grep -li remcarpediem file # 忽略大小写,并且打印出文件名称

现在大家在回头看一下这段 bash 脚本,是不是大致了解它执行的过程和原理啦。

files=`find services -name "*.java" -or -name "*.js" | xargs grep -il $keyword` 

后记

本文简单介绍了程序员日常工作中可能用到 Linux 命令的三个场景。大家可以根据自己的实际情况,来判断是否需要继续全面详细地学习相关的知识。毕竟只有能运用于实践,给自己工作产生价值的技术才是真技术。学习一项技术,就要坚持学以致用的目的。

编程小技巧之 IDEA 的 Live Template

Share

Redis 事件机制详解

Redis 采用事件驱动机制来处理大量的网络IO。它并没有使用 libevent 或者 libev 这样的成熟开源方案,而是自己实现一个非常简洁的事件驱动库 ae_event。

Redis中的事件驱动库只关注网络IO,以及定时器。该事件库处理下面两类事件:

  • 文件事件(file event):用于处理 Redis 服务器和客户端之间的网络IO。
  • 时间事件(time eveat):Redis 服务器中的一些操作(比如serverCron函数)需要在给定的时间点执行,而时间事件就是处理这类定时操作的。

事件驱动库的代码主要是在src/ae.c中实现的,其示意图如下所示。

事件管理器示意图

aeEventLoop是整个事件驱动的核心,它管理着文件事件表和时间事件列表,
不断地循环处理着就绪的文件事件和到期的时间事件。下面我们就先分别介绍文件事件和时间事件,然后讲述相关的aeEventLoop源码实现。

文件事件

Redis基于Reactor模式开发了自己的网络事件处理器,也就是文件事件处理器。文件事件处理器使用IO多路复用技术,同时监听多个套接字,并为套接字关联不同的事件处理函数。当套接字的可读或者可写事件触发时,就会调用相应的事件处理函数。

Redis 使用的IO多路复用技术主要有:selectepollevportkqueue等。每个IO多路复用函数库在 Redis 源码中都对应一个单独的文件,比如ae_select.c,ae_epoll.c, ae_kqueue.c等。Redis 会根据不同的操作系统,按照不同的优先级选择多路复用技术。事件响应框架一般都采用该架构,比如 netty 和 libevent。

示意图

如下图所示,文件事件处理器有四个组成部分,它们分别是套接字、I/O多路复用程序、文件事件分派器以及事件处理器。

示意图

文件事件是对套接字操作的抽象,每当一个套接字准备好执行 accept、read、write和 close 等操作时,就会产生一个文件事件。因为 Redis 通常会连接多个套接字,所以多个文件事件有可能并发的出现。

I/O多路复用程序负责监听多个套接字,并向文件事件派发器传递那些产生了事件的套接字。

尽管多个文件事件可能会并发地出现,但I/O多路复用程序总是会将所有产生的套接字都放到同一个队列(也就是后文中描述的aeEventLoopfired就绪事件表)里边,然后文件事件处理器会以有序、同步、单个套接字的方式处理该队列中的套接字,也就是处理就绪的文件事件。

一次请求的过程示意图

所以,一次 Redis 客户端与服务器进行连接并且发送命令的过程如上图所示。

  • 客户端向服务端发起建立 socket 连接的请求,那么监听套接字将产生 AE_READABLE 事件,触发连接应答处理器执行。处理器会对客户端的连接请求进行应答,然后创建客户端套接字,以及客户端状态,并将客户端套接字的 AE_READABLE 事件与命令请求处理器关联。
  • 客户端建立连接后,向服务器发送命令,那么客户端套接字将产生 AE_READABLE 事件,触发命令请求处理器执行,处理器读取客户端命令,然后传递给相关程序去执行。
  • 执行命令获得相应的命令回复,为了将命令回复传递给客户端,服务器将客户端套接字的 AE_WRITEABLE 事件与命令回复处理器关联。当客户端试图读取命令回复时,客户端套接字产生 AE_WRITEABLE 事件,触发命令回复处理器将命令回复全部写入到套接字中。

时间事件

Redis 的时间事件分为以下两类:

  • 定时事件:让一段程序在指定的时间之后执行一次。
  • 周期性事件:让一段程序每隔指定时间就执行一次。

Redis 的时间事件的具体定义结构如下所示。

typedef struct aeTimeEvent {
    /* 全局唯一ID */
    long long id; /* time event identifier. */
    /* 秒精确的UNIX时间戳,记录时间事件到达的时间*/
    long when_sec; /* seconds */
    /* 毫秒精确的UNIX时间戳,记录时间事件到达的时间*/
    long when_ms; /* milliseconds */
    /* 时间处理器 */
    aeTimeProc *timeProc;
    /* 事件结束回调函数,析构一些资源*/
    aeEventFinalizerProc *finalizerProc;
    /* 私有数据 */
    void *clientData;
    /* 前驱节点 */
    struct aeTimeEvent *prev;
    /* 后继节点 */
    struct aeTimeEvent *next;
} aeTimeEvent;

一个时间事件是定时事件还是周期性事件取决于时间处理器的返回值:

  • 如果返回值是 AE_NOMORE,那么这个事件是一个定时事件,该事件在达到后删除,之后不会再重复。
  • 如果返回值是非 AE_NOMORE 的值,那么这个事件为周期性事件,当一个时间事件到达后,服务器会根据时间处理器的返回值,对时间事件的 when 属性进行更新,让这个事件在一段时间后再次达到。

Redis 将所有时间事件都放在一个无序链表中,每次 Redis 会遍历整个链表,查找所有已经到达的时间事件,并且调用相应的事件处理器。

介绍完文件事件和时间事件,我们接下来看一下 aeEventLoop的具体实现。

创建事件管理器

Redis 服务端在其初始化函数 initServer中,会创建事件管理器aeEventLoop对象。

函数aeCreateEventLoop将创建一个事件管理器,主要是初始化 aeEventLoop的各个属性值,比如eventsfiredtimeEventHeadapidata

  • 首先创建aeEventLoop对象。
  • 初始化未就绪文件事件表、就绪文件事件表。events指针指向未就绪文件事件表、fired指针指向就绪文件事件表。表的内容在后面添加具体事件时进行初变更。
  • 初始化时间事件列表,设置timeEventHeadtimeEventNextId属性。
  • 调用aeApiCreate 函数创建epoll实例,并初始化 apidata
aeEventLoop *aeCreateEventLoop(int setsize) {
    aeEventLoop *eventLoop;
    int i;
    /* 创建事件状态结构 */
    if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
    /* 创建未就绪事件表、就绪事件表 */
    eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
    eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
    if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
    /* 设置数组大小 */
    eventLoop->setsize = setsize;
    /* 初始化执行最近一次执行时间 */
    eventLoop->lastTime = time(NULL);
    /* 初始化时间事件结构 */
    eventLoop->timeEventHead = NULL;
    eventLoop->timeEventNextId = 0;
    eventLoop->stop = 0;
    eventLoop->maxfd = -1;
    eventLoop->beforesleep = NULL;
    eventLoop->aftersleep = NULL;
    /* 将多路复用io与事件管理器关联起来 */
    if (aeApiCreate(eventLoop) == -1) goto err;
    /* 初始化监听事件 */
    for (i = 0; i < setsize; i++)
        eventLoop->events[i].mask = AE_NONE;
    return eventLoop;
err:
   .....
}

aeApiCreate 函数首先创建了aeApiState对象,初始化了epoll就绪事件表;然后调用epoll_create创建了epoll实例,最后将该aeApiState赋值给apidata属性。

aeApiState对象中epfd存储epoll的标识,events是一个epoll就绪事件数组,当有epoll事件发生时,所有发生的epoll事件和其描述符将存储在这个数组中。这个就绪事件数组由应用层开辟空间、内核负责把所有发生的事件填充到该数组。

static int aeApiCreate(aeEventLoop *eventLoop) {
    aeApiState *state = zmalloc(sizeof(aeApiState));

    if (!state) return -1;
    /* 初始化epoll就绪事件表 */
    state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
    if (!state->events) {
        zfree(state);
        return -1;
    }
    /* 创建 epoll 实例 */
    state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
    if (state->epfd == -1) {
        zfree(state->events);
        zfree(state);
        return -1;
    }
    /* 事件管理器与epoll关联 */
    eventLoop->apidata = state;
    return 0;
}
typedef struct aeApiState {
    /* epoll_event 实例描述符*/
    int epfd;
    /* 存储epoll就绪事件表 */
    struct epoll_event *events;
} aeApiState;

创建文件事件

aeFileEvent是文件事件结构,对于每一个具体的事件,都有读处理函数和写处理函数等。Redis 调用aeCreateFileEvent函数针对不同的套接字的读写事件注册对应的文件事件。

typedef struct aeFileEvent {
    /* 监听事件类型掩码,值可以是 AE_READABLE 或 AE_WRITABLE */
    int mask;
    /* 读事件处理器 */
    aeFileProc *rfileProc;
    /* 写事件处理器 */
    aeFileProc *wfileProc;
    /* 多路复用库的私有数据 */
    void *clientData;
} aeFileEvent;
/* 使用typedef定义的处理器函数的函数类型 */
typedef void aeFileProc(struct aeEventLoop *eventLoop, 
int fd, void *clientData, int mask);

比如说,Redis 进行主从复制时,从服务器需要主服务器建立连接,它会发起一个 socekt连接,然后调用aeCreateFileEvent函数针对发起的socket的读写事件注册了对应的事件处理器,也就是syncWithMaster函数。

aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL);
/* 符合aeFileProc的函数定义 */
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {....}

aeCreateFileEvent的参数fd指的是具体的socket套接字,procfd产生事件时,具体的处理函数,clientData则是回调处理函数时需要传入的数据。
aeCreateFileEvent主要做了三件事情:

  • fd为索引,在events未就绪事件表中找到对应事件。
  • 调用aeApiAddEvent函数,该事件注册到具体的底层 I/O 多路复用中,本例为epoll。
  • 填充事件的回调、参数、事件类型等参数。
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
                       aeFileProc *proc, void *clientData)
{
    /* 取出 fd 对应的文件事件结构, fd 代表具体的 socket 套接字 */
    aeFileEvent *fe = &eventLoop->events[fd];
    /* 监听指定 fd 的指定事件 */
    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;
    /* 置文件事件类型,以及事件的处理器 */
    fe->mask |= mask;
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;
    /* 私有数据 */
    fe->clientData = clientData;
    if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;
    return AE_OK;
}

如上文所说,Redis 基于的底层 I/O 多路复用库有多套,所以aeApiAddEvent也有多套实现,下面的源码是epoll下的实现。其核心操作就是调用epollepoll_ctl函数来向epoll注册响应事件。有关epoll相关的知识可以看一下《Java NIO源码分析》

static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee = {0}; /* avoid valgrind warning */
    /* 如果 fd 没有关联任何事件,那么这是一个 ADD 操作。如果已经关联了某个/某些事件,那么这是一个 MOD 操作。 */
    int op = eventLoop->events[fd].mask == AE_NONE ?
            EPOLL_CTL_ADD : EPOLL_CTL_MOD;

    /* 注册事件到 epoll */
    ee.events = 0;
    mask |= eventLoop->events[fd].mask; /* Merge old events */
    if (mask & AE_READABLE) ee.events |= EPOLLIN;
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
    ee.data.fd = fd;
    /* 调用epoll_ctl 系统调用,将事件加入epoll中 */
    if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
    return 0;
}

事件处理

因为 Redis 中同时存在文件事件和时间事件两个事件类型,所以服务器必须对这两个事件进行调度,决定何时处理文件事件,何时处理时间事件,以及如何调度它们。

aeMain函数以一个无限循环不断地调用aeProcessEvents函数来处理所有的事件。

void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        /* 如果有需要在事件处理前执行的函数,那么执行它 */
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        /* 开始处理事件*/
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
    }
}

下面是aeProcessEvents的伪代码,它会首先计算距离当前时间最近的时间事件,以此计算一个超时时间;然后调用aeApiPoll函数去等待底层的I/O多路复用事件就绪;aeApiPoll函数返回之后,会处理所有已经产生文件事件和已经达到的时间事件。

/* 伪代码 */
int aeProcessEvents(aeEventLoop *eventLoop, int flags) {
    /* 获取到达时间距离当前时间最接近的时间事件*/
    time_event = aeSearchNearestTimer();
    /* 计算最接近的时间事件距离到达还有多少毫秒*/
    remaind_ms = time_event.when - unix_ts_now();
    /* 如果事件已经到达,那么remaind_ms为负数,将其设置为0 */
    if (remaind_ms < 0) remaind_ms = 0;
    /* 根据 remaind_ms 的值,创建 timeval 结构*/
    timeval = create_timeval_with_ms(remaind_ms);
    /* 阻塞并等待文件事件产生,最大阻塞时间由传入的 timeval 结构决定,如果remaind_ms 的值为0,则aeApiPoll 调用后立刻返回,不阻塞*/
    /* aeApiPoll调用epoll_wait函数,等待I/O事件*/
    aeApiPoll(timeval);
    /* 处理所有已经产生的文件事件*/
    processFileEvents();
    /* 处理所有已经到达的时间事件*/
    processTimeEvents();
}

aeApiAddEvent类似,aeApiPoll也有多套实现,它其实就做了两件事情,调用epoll_wait阻塞等待epoll的事件就绪,超时时间就是之前根据最快达到时间事件计算而来的超时时间;然后将就绪的epoll事件转换到fired就绪事件。aeApiPoll就是上文所说的I/O多路复用程序。具体过程如下图所示。

aeApiPoll示意图

static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) 
{
    aeApiState *state = eventLoop->apidata;
    int retval, numevents = 0;
    // 调用epoll_wait函数,等待时间为最近达到时间事件的时间计算而来。
    retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
            tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
    // 有至少一个事件就绪?
    if (retval > 0) 
    {
        int j;
        /*为已就绪事件设置相应的模式,并加入到 eventLoop 的 fired 数组中*/
        numevents = retval;
        for (j = 0; j < numevents; j++) 
	{
            int mask = 0;
            struct epoll_event *e = state->events+j;
            if (e->events & EPOLLIN)
		mask |= AE_READABLE;
            if (e->events & EPOLLOUT)
		mask |= AE_WRITABLE;
            if (e->events & EPOLLERR) 
		mask |= AE_WRITABLE;
            if (e->events & EPOLLHUP)
		mask |= AE_WRITABLE;
            /* 设置就绪事件表元素 */
            eventLoop->fired[j].fd = e->data.fd;
            eventLoop->fired[j].mask = mask;
        }
    }
    
    // 返回已就绪事件个数
    return numevents;
}

processFileEvent是处理就绪文件事件的伪代码,也是上文所述的文件事件分派器,它其实就是遍历fired就绪事件表,然后根据对应的事件类型来调用事件中注册的不同处理器,读事件调用rfileProc,而写事件调用wfileProc

void processFileEvent(int numevents) {
    for (j = 0; j < numevents; j++) {
            /* 从已就绪数组中获取事件 */
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int fired = 0;
            int invert = fe->mask & AE_BARRIER;
	        /* 读事件 */
            if (!invert && fe->mask & mask & AE_READABLE) {
                /* 调用读处理函数 */
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                fired++;
            }
            /* 写事件. */
            if (fe->mask & mask & AE_WRITABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }
            if (invert && fe->mask & mask & AE_READABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }
            processed++;
        }
    }
}

processTimeEvents是处理时间事件的函数,它会遍历aeEventLoop的事件事件列表,如果时间事件到达就执行其timeProc函数,并根据函数的返回值是否等于AE_NOMORE来决定该时间事件是否是周期性事件,并修改器到达时间。

static int processTimeEvents(aeEventLoop *eventLoop) {
    int processed = 0;
    aeTimeEvent *te;
    long long maxId;
    time_t now = time(NULL);
    ....
    eventLoop->lastTime = now;

    te = eventLoop->timeEventHead;
    maxId = eventLoop->timeEventNextId-1;
    /* 遍历时间事件链表 */
    while(te) {
        long now_sec, now_ms;
        long long id;

        /* 删除需要删除的时间事件 */
        if (te->id == AE_DELETED_EVENT_ID) {
            aeTimeEvent *next = te->next;
            if (te->prev)
                te->prev->next = te->next;
            else
                eventLoop->timeEventHead = te->next;
            if (te->next)
                te->next->prev = te->prev;
            if (te->finalizerProc)
                te->finalizerProc(eventLoop, te->clientData);
            zfree(te);
            te = next;
            continue;
        }

        /* id 大于最大maxId,是该循环周期生成的时间事件,不处理 */
        if (te->id > maxId) {
            te = te->next;
            continue;
        }
        aeGetTime(&now_sec, &now_ms);
        /* 事件已经到达,调用其timeProc函数*/
        if (now_sec > te->when_sec ||
            (now_sec == te->when_sec && now_ms >= te->when_ms))
        {
            int retval;

            id = te->id;
            retval = te->timeProc(eventLoop, id, te->clientData);
            processed++;
            /* 如果返回值不等于 AE_NOMORE,表示是一个周期性事件,修改其when_sec和when_ms属性*/
            if (retval != AE_NOMORE) {
                aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
            } else {
                /* 一次性事件,标记为需删除,下次遍历时会删除*/
                te->id = AE_DELETED_EVENT_ID;
            }
        }
        te = te->next;
    }
    return processed;
}

删除事件

当不在需要某个事件时,需要把事件删除掉。例如: 如果fd同时监听读事件、写事件。当不在需要监听写事件时,可以把该fd的写事件删除。

aeDeleteEventLoop函数的执行过程总结为以下几个步骤
1、根据fd在未就绪表中查找到事件
2、取消该fd对应的相应事件标识符
3、调用aeApiFree函数,内核会将epoll监听红黑树上的相应事件监听取消。

后记

接下来,我们会继续学习 Redis 的主从复制相关的原理,欢迎大家持续关注。

Share

Redis AOF 持久化详解

Redis 是一种内存数据库,将数据保存在内存中,读写效率要比传统的将数据保存在磁盘上的数据库要快很多。但是一旦进程退出,Redis 的数据就会丢失。

为了解决这个问题,Redis 提供了 RDB 和 AOF 两种持久化方案,将内存中的数据保存到磁盘中,避免数据丢失。RDB的介绍在这篇文章中《Redis RDB 持久化详解》,今天我们来看一下 AOF 相关的原理。

AOF( append only file )持久化以独立日志的方式记录每次写命令,并在 Redis 重启时在重新执行 AOF 文件中的命令以达到恢复数据的目的。AOF 的主要作用是解决数据持久化的实时性。

RDB 和 AOF

antirez 在《Redis 持久化解密》一文中讲述了 RDB 和 AOF 各自的优缺点:

  • RDB 是一个紧凑压缩的二进制文件,代表 Redis 在某个时间点上的数据备份。非常适合备份,全量复制等场景。比如每6小时执行 bgsave 备份,并把 RDB 文件拷贝到远程机器或者文件系统中,用于灾难恢复。
  • Redis 加载 RDB 恢复数据远远快于 AOF 的方式
  • RDB 方式数据没办法做到实时持久化,而 AOF 方式可以做到。

下面,我们就来了解一下 AOF 是如何做到实时持久化的。

AOF 持久化的实现

如上图所示,AOF 持久化功能的实现可以分为命令追加( append )、文件写入( write )、文件同步( sync )、文件重写(rewrite)和重启加载(load)。其流程如下:

  • 所有的写命令会追加到 AOF 缓冲中。
  • AOF 缓冲区根据对应的策略向硬盘进行同步操作。
  • 随着 AOF 文件越来越大,需要定期对 AOF 文件进行重写,达到压缩的目的。
  • 当 Redis 重启时,可以加载 AOF 文件进行数据恢复。

命令追加

当 AOF 持久化功能处于打开状态时,Redis 在执行完一个写命令之后,会以协议格式(也就是RESP,即 Redis 客户端和服务器交互的通信协议 )将被执行的写命令追加到 Redis 服务端维护的 AOF 缓冲区末尾。

比如说 SET mykey myvalue 这条命令就以如下格式记录到 AOF 缓冲中。

"*3\r\n$3\r\nSET\r\n$5\r\nmykey\r\n$7\r\nmyvalue\r\n"

Redis 协议格式本文不再赘述,AOF之所以直接采用文本协议格式,是因为所有写入命令都要进行追加操作,直接采用协议格式,避免了二次处理开销。

文件写入和同步

Redis 每次结束一个事件循环之前,它都会调用 flushAppendOnlyFile 函数,判断是否需要将 AOF 缓存区中的内容写入和同步到 AOF 文件中。

flushAppendOnlyFile 函数的行为由 redis.conf 配置中的 appendfsync 选项的值来决定。该选项有三个可选值,分别是 alwayseverysecno

  • always:Redis 在每个事件循环都要将 AOF 缓冲区中的所有内容写入到 AOF 文件,并且同步 AOF 文件,所以 always 的效率是 appendfsync 选项三个值当中最差的一个,但从安全性来说,也是最安全的。当发生故障停机时,AOF 持久化也只会丢失一个事件循环中所产生的命令数据。
  • everysec:Redis 在每个事件循环都要将 AOF 缓冲区中的所有内容写入到 AOF 文件中,并且每隔一秒就要在子线程中对 AOF 文件进行一次同步。从效率上看,该模式足够快。当发生故障停机时,只会丢失一秒钟的命令数据。
  • no:Redis 在每一个事件循环都要将 AOF 缓冲区中的所有内容写入到 AOF 文件。而 AOF 文件的同步由操作系统控制。这种模式下速度最快,但是同步的时间间隔较长,出现故障时可能会丢失较多数据。

Linux 系统下 write 操作会触发延迟写( delayed write )机制。Linux 在内核提供页缓存区用来提供硬盘 IO 性能。write 操作在写入系统缓冲区之后直接返回。同步硬盘操作依赖于系统调度机制,例如:缓冲区页空间写满或者达到特定时间周期。同步文件之前,如果此时系统故障宕机,缓冲区内数据将丢失。

fsync 针对单个文件操作,对其进行强制硬盘同步,fsync 将阻塞直到写入磁盘完成后返回,保证了数据持久化。

appendfsync的三个值代表着三种不同的调用 fsync的策略。调用fsync周期越频繁,读写效率就越差,但是相应的安全性越高,发生宕机时丢失的数据越少。

有关 Linux 的I/O和各个系统调用的作用如下图所示。具体内容可以查看《聊聊 Linux I/O》一文。

AOF 数据恢复

AOF 文件里边包含了重建 Redis 数据所需的所有写命令,所以 Redis 只要读入并重新执行一遍 AOF 文件里边保存的写命令,就可以还原 Redis 关闭之前的状态。

Redis 读取 AOF 文件并且还原数据库状态的详细步骤如下:

  • 创建一个不带网络连接的的伪客户端( fake client),因为 Redis 的命令只能在客户端上下文中执行,而载入 AOF 文件时所使用的的命令直接来源于 AOF 文件而不是网络连接,所以服务器使用了一个没有网络连接的伪客户端来执行 AOF 文件保存的写命令,伪客户端执行命令的效果和带网络连接的客户端执行命令的效果完全一样的。
  • 从 AOF 文件中分析并取出一条写命令。
  • 使用伪客户端执行被读出的写命令。
  • 一直执行步骤 2 和步骤3,直到 AOF 文件中的所有写命令都被处理完毕为止。

当完成以上步骤之后,AOF 文件所保存的数据库状态就会被完整还原出来。

AOF 重写

因为 AOF 持久化是通过保存被执行的写命令来记录 Redis 状态的,所以随着 Redis 长时间运行,AOF 文件中的内容会越来越多,文件的体积也会越来越大,如果不加以控制的话,体积过大的 AOF 文件很可能对 Redis 甚至宿主计算机造成影响。

为了解决 AOF 文件体积膨胀的问题,Redis 提供了 AOF 文件重写( rewrite) 功能。通过该功能,Redis 可以创建一个新的 AOF 文件来替代现有的 AOF 文件。新旧两个 AOF 文件所保存的 Redis 状态相同,但是新的 AOF 文件不会包含任何浪费空间的荣誉命令,所以新 AOF 文件的体积通常比旧 AOF 文件的体积要小得很多。

如上图所示,重写前要记录名为list的键的状态,AOF 文件要保存五条命令,而重写后,则只需要保存一条命令。

AOF 文件重写并不需要对现有的 AOF 文件进行任何读取、分析或者写入操作,而是通过读取服务器当前的数据库状态来实现的。首先从数据库中读取键现在的值,然后用一条命令去记录键值对,代替之前记录这个键值对的多条命令,这就是 AOF 重写功能的实现原理。

在实际过程中,为了避免在执行命令时造成客户端输入缓冲区溢出,AOF 重写在处理列表、哈希表、集合和有序集合这四种可能会带有多个元素的键时,会先检查键所包含的元素数量,如果数量超过 REDIS_AOF_REWRITE_ITEMS_PER_CMD ( 一般为64 )常量,则使用多条命令记录该键的值,而不是一条命令。

rewrite的触发机制主要有一下三个:

  • 手动调用 bgrewriteaof 命令,如果当前有正在运行的 rewrite 子进程,则本次rewrite 会推迟执行,否则,直接触发一次 rewrite。
  • 通过配置指令手动开启 AOF 功能,如果没有 RDB 子进程的情况下,会触发一次 rewrite,将当前数据库中的数据写入 rewrite 文件。
  • 在 Redis 定时器中,如果有需要退出执行的 rewrite 并且没有正在运行的 RDB 或者 rewrite 子进程时,触发一次或者 AOF 文件大小已经到达配置的 rewrite 条件也会自动触发一次。

AOF 后台重写

AOF 重写函数会进行大量的写入操作,调用该函数的线程将被长时间阻塞,所以 Redis 在子进程中执行 AOF 重写操作。

  • 子进程进行 AOF 重写期间,Redis 进程可以继续处理客户端命令请求。
  • 子进程带有父进程的内存数据拷贝副本,在不适用锁的情况下,也可以保证数据的安全性。

但是,在子进程进行 AOF 重启期间,Redis接收客户端命令,会对现有数据库状态进行修改,从而导致数据当前状态和 重写后的 AOF 文件所保存的数据库状态不一致。

为此,Redis 设置了一个 AOF 重写缓冲区,这个缓冲区在服务器创建子进程之后开始使用,当 Redis 执行完一个写命令之后,它会同时将这个写命令发送给 AOF 缓冲区和 AOF 重写缓冲区。

当子进程完成 AOF 重写工作之后,它会向父进程发送一个信号,父进程在接收到该信号之后,会调用一个信号处理函数,并执行以下工作:

  • 将 AOF 重写缓冲区中的所有内容写入到新的 AOF 文件中,保证新 AOF 文件保存的数据库状态和服务器当前状态一致。
  • 对新的 AOF 文件进行改名,原子地覆盖现有 AOF 文件,完成新旧文件的替换
  • 继续处理客户端请求命令。

在整个 AOF 后台重写过程中,只有信号处理函数执行时会对 Redis 主进程造成阻塞,在其他时候,AOF 后台重写都不会阻塞主进程。

后记

后续将会继续学习 Redis 复制和集群相关的知识,希望大家持久关注。

Share

Redis RDB 持久化详解

Redis 是一种内存数据库,将数据保存在内存中,读写效率要比传统的将数据保存在磁盘上的数据库要快很多。但是一旦进程退出,Redis 的数据就会丢失。

为了解决这个问题,Redis 提供了 RDB 和 AOF 两种持久化方案,将内存中的数据保存到磁盘中,避免数据丢失。

antirez 在《Redis 持久化解密》一文中说,一般来说有三种常见的策略来进行持久化操作,防止数据损坏:

  • 方法1 是数据库不关心发生故障,在数据文件损坏后通过数据备份或者快照来进行恢复。Redis 的 RDB 持久化就是这种方式。

  • 方法2 是数据库使用操作日志,每次操作时记录操作行为,以便在故障后通过日志恢复到一致性的状态。因为操作日志是顺序追加的方式写的,所以不会出现操作日志也无法恢复的情况。类似于 Mysql 的 redo 和 undo 日志,具体可以看这篇《InnoDB的磁盘文件及落盘机制》文章。

  • 方法3 是数据库不进行老数据的修改,只是以追加方式去完成写操作,这样数据本身就是一份日志,这样就永远不会出现数据无法恢复的情况了。CouchDB就是此做法的优秀范例。

RDB 就是第一种方法,它就是把当前 Redis 进程的数据生成时间点快照( point-in-time snapshot ) 保存到存储设备的过程。

RDB 的使用

RDB 触发机制分为使用指令手动触发和 redis.conf 配置自动触发。

手动触发 Redis 进行 RDB 持久化的指令的为:

  • save ,该指令会阻塞当前 Redis 服务器,执行 save 指令期间,Redis 不能处理其他命令,直到 RDB 过程完成为止。
  • bgsave,执行该命令时,Redis 会在后台异步执行快照操作,此时 Redis 仍然可以相应客户端请求。具体操作是 Redis 进程执行 fork 操作创建子进程,RDB 持久化过程由子进程负责,完成后自动结束。Redis 只会在 fork 期间发生阻塞,但是一般时间都很短。但是如果 Redis 数据量特别大,fork 时间就会变长,而且占用内存会加倍,这一点需要特别注意。

自动触发 RDB 的默认配置如下所示:

save 900 1 # 表示900 秒内如果至少有 1 个 key 的值变化,则触发RDB
save 300 10 # 表示300 秒内如果至少有 10 个 key 的值变化,则触发RDB
save 60 10000 # 表示60 秒内如果至少有 10000 个 key 的值变化,则触发RDB

如果不需要 Redis 进行持久化,那么可以注释掉所有的 save 行来停用保存功能,也可以直接一个空字符串来停用持久化:save “”。

Redis 服务器周期操作函数 serverCron 默认每个 100 毫秒就会执行一次,该函数用于正在运行的服务器进行维护,它的一项工作就是检查 save 选项所设置的条件是否有一项被满足,如果满足的话,就执行 bgsave 指令。

RDB 整体流程

了解了 RDB 的基础使用后,我们要继续深入对 RDB持久化的学习。在此之前,我们可以先思考一下如何实现一个持久化机制,毕竟这是很多中间件所需的一个模块。

首先,持久化保存的文件内容结构必须是紧凑的,特别对于数据库来说,需要持久化的数据量十分大,需要保证持久化文件不至于占用太多存储。
其次,进行持久化时,中间件应该还可以快速地响应用户请求,持久化的操作应该尽量少影响中间件的其他功能。
最后,毕竟持久化会消耗性能,如何在性能和数据安全性之间做出平衡,如何灵活配置触发持久化操作。

接下来我们将带着这些问题,到源码中寻求答案。

本文中的源码来自 Redis 4.0 ,RDB持久化过程的相关源码都在 rdb.c 文件中。其中大概的流程如下图所示。

上图表明了三种触发 RDB 持久化的手段之间的整体关系。通过 serverCron 自动触发的 RDB 相当于直接调用了 bgsave 指令的流程进行处理。而 bgsave 的处理流程启动子进程后,调用了 save 指令的处理流程。

下面我们从 serverCron 自动触发逻辑开始研究。

自动触发 RDB 持久化

如上图所示,redisServer 结构体的save_params指向拥有三个值的数组,该数组的值与 redis.conf 文件中 save 配置项一一对应。分别是 save 900 1save 300 10save 60 10000dirty 记录着有多少键值发生变化,lastsave记录着上次 RDB 持久化的时间。

serverCron 函数就是遍历该数组的值,检查当前 Redis 状态是否符合触发 RDB 持久化的条件,比如说距离上次 RDB 持久化过去了 900 秒并且有至少一条数据发生变更。

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    ....
    /* Check if a background saving or AOF rewrite in progress terminated. */
    /* 判断后台是否正在进行 rdb 或者 aof 操作 */
    if (server.rdb_child_pid != -1 || server.aof_child_pid != -1 ||
        ldbPendingChildren())
    {
        ....
    } else {
        // 到这儿就能确定 当前木有进行 rdb 或者 aof 操作
        // 遍历每一个 rdb 保存条件
         for (j = 0; j < server.saveparamslen; j++) {
            struct saveparam *sp = server.saveparams+j;

            //如果数据保存记录 大于规定的修改次数 且距离 上一次保存的时间大于规定时间或者上次BGSAVE命令执行成功,才执行 BGSAVE 操作
            if (server.dirty >= sp->changes &&
                server.unixtime-server.lastsave > sp->seconds &&
                (server.unixtime-server.lastbgsave_try >
                 CONFIG_BGSAVE_RETRY_DELAY ||
                 server.lastbgsave_status == C_OK))
            {
                //记录日志
                serverLog(LL_NOTICE,"%d changes in %d seconds. Saving...",
                    sp->changes, (int)sp->seconds);
                rdbSaveInfo rsi, *rsiptr;
                rsiptr = rdbPopulateSaveInfo(&rsi);
                // 异步保存操作
                rdbSaveBackground(server.rdb_filename,rsiptr);
                break;
            }
         }
    }
    ....
    server.cronloops++;
    return 1000/server.hz;
}

如果符合触发 RDB 持久化的条件,serverCron会调用rdbSaveBackground函数,也就是 bgsave 指令会触发的函数。

子进程后台执行 RDB 持久化

执行 bgsave 指令时,Redis 会先触发 bgsaveCommand 进行当前状态检查,然后才会调用rdbSaveBackground,其中的逻辑如下图所示。

rdbSaveBackground 函数中最主要的工作就是调用 fork 命令生成子流程,然后在子流程中执行 rdbSave函数,也就是 save 指令最终会触发的函数。

int rdbSaveBackground(char *filename, rdbSaveInfo *rsi) {
    pid_t childpid;
    long long start;
    // 检查后台是否正在执行 aof 或者 rdb 操作
    if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) return C_ERR;
    // 拿出 数据保存记录,保存为 上次记录
    server.dirty_before_bgsave = server.dirty;
    // bgsave 时间
    server.lastbgsave_try = time(NULL);
    start = ustime();
    // fork 子进程
    if ((childpid = fork()) == 0) {
        int retval;
        /* 关闭子进程继承的 socket 监听 */
        closeListeningSockets(0);
        // 子进程 title 修改
        redisSetProcTitle("redis-rdb-bgsave");
        // 执行rdb 写入操作
        retval = rdbSave(filename,rsi);
        // 执行完毕以后
        ....
        // 退出子进程
        exitFromChild((retval == C_OK) ? 0 : 1);
    } else {
        /* 父进程,进行fork时间的统计和信息记录,比如说rdb_save_time_start、rdb_child_pid、和rdb_child_type */
        ....
        // rdb 保存开始时间 bgsave 子进程
        server.rdb_save_time_start = time(NULL);
        server.rdb_child_pid = childpid;
        server.rdb_child_type = RDB_CHILD_TYPE_DISK;
        updateDictResizePolicy();
        return C_OK;
    }
    return C_OK; /* unreached */
}

为什么 Redis 使用子进程而不是线程来进行后台 RDB 持久化呢?主要是出于Redis性能的考虑,我们知道Redis对客户端响应请求的工作模型是单进程和单线程的,如果在主进程内启动一个线程,这样会造成对数据的竞争条件。所以为了避免使用锁降低性能,Redis选择启动新的子进程,独立拥有一份父进程的内存拷贝,以此为基础执行RDB持久化。

但是需要注意的是,fork 会消耗一定时间,并且父子进程所占据的内存是相同的,当 Redis 键值较大时,fork 的时间会很长,这段时间内 Redis 是无法响应其他命令的。除此之外,Redis 占据的内存空间会翻倍。

生成 RDB 文件,并且持久化到硬盘

Redis 的 rdbSave 函数是真正进行 RDB 持久化的函数,它的大致流程如下:

  • 首先打开一个临时文件,
  • 调用 rdbSaveRio函数,将当前 Redis 的内存信息写入到这个临时文件中,
  • 接着调用 fflushfsyncfclose 接口将文件写入磁盘中,
  • 使用 rename 将临时文件改名为 正式的 RDB 文件,
  • 最后记录 dirtylastsave等状态信息。这些状态信息在 serverCron时会使用到。
int rdbSave(char *filename, rdbSaveInfo *rsi) {
    char tmpfile[256];
    // 当前工作目录
    char cwd[MAXPATHLEN];
    FILE *fp;
    rio rdb;
    int error = 0;

    /* 生成tmpfile文件名 temp-[pid].rdb */
    snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
    /* 打开文件 */
    fp = fopen(tmpfile,"w");
    .....
    /* 初始化rio结构 */
    rioInitWithFile(&rdb,fp);

    if (rdbSaveRio(&rdb,&error,RDB_SAVE_NONE,rsi) == C_ERR) {
        errno = error;
        goto werr;
    }

    if (fflush(fp) == EOF) goto werr;
    if (fsync(fileno(fp)) == -1) goto werr;
    if (fclose(fp) == EOF) goto werr;

    /* 重新命名 rdb 文件,把之前临时的名称修改为正式的 rdb 文件名称 */
    if (rename(tmpfile,filename) == -1) {
        // 异常处理
        ....
    }
    // 写入完成,打印日志
    serverLog(LL_NOTICE,"DB saved on disk");
    // 清理数据保存记录
    server.dirty = 0;
    // 最后一次完成 SAVE 命令的时间
    server.lastsave = time(NULL);
    // 最后一次 bgsave 的状态置位 成功
    server.lastbgsave_status = C_OK;
    return C_OK;
    ....
}

这里要简单说一下 fflushfsync的区别。它们俩都是用于刷缓存,但是所属的层次不同。fflush函数用于 FILE* 指针上,将缓存数据从应用层缓存刷新到内核中,而fsync 函数则更加底层,作用于文件描述符,用于将内核缓存刷新到物理设备上。

关于 Linux IO 的具体原理可以参考《聊聊Linux IO》

内存数据到 RDB 文件

rdbSaveRio 会将 Redis 内存中的数据以相对紧凑的格式写入到文件中,其文件格式的示意图如下所示。

rdbSaveRio函数的写入大致流程如下:

  • 先写入 REDIS 魔法值,然后是 RDB 文件的版本( rdb_version ),额外辅助信息 ( aux )。辅助信息中包含了 Redis 的版本,内存占用和复制库( repl-id )和偏移量( repl-offset )等。

  • 然后 rdbSaveRio 会遍历当前 Redis 的所有数据库,将数据库的信息依次写入。先写入 RDB_OPCODE_SELECTDB识别码和数据库编号,接着写入RDB_OPCODE_RESIZEDB识别码和数据库键值数量和待失效键值数量,最后会遍历所有的键值,依次写入。

  • 在写入键值时,当该键值有失效时间时,会先写入RDB_OPCODE_EXPIRETIME_MS识别码和失效时间,然后写入键值类型的识别码,最后再写入键和值。

  • 写完数据库信息后,还会把 Lua 相关的信息写入,最后再写入 RDB_OPCODE_EOF结束符识别码和校验值。

    int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) {
        snprintf(magic,sizeof(magic),"REDIS%04d",RDB_VERSION);
        /* 1 写入 magic字符'REDIS' 和 RDB 版本 */
        if (rdbWriteRaw(rdb,magic,9) == -1) goto werr;
        /* 2 写入辅助信息  REDIS版本,服务器操作系统位数,当前时间,复制信息比如repl-stream-db,repl-id和repl-offset等等数据*/
        if (rdbSaveInfoAuxFields(rdb,flags,rsi) == -1) goto werr;
        /* 3 遍历每一个数据库,逐个数据库数据保存 */
        for (j = 0; j < server.dbnum; j++) {
            /* 获取数据库指针地址和数据库字典 */
            redisDb *db = server.db+j;
            dict *d = db->dict;
            /* 3.1 写入数据库部分的开始标识 */
            if (rdbSaveType(rdb,RDB_OPCODE_SELECTDB) == -1) goto werr;
            /* 3.2 写入当前数据库号 */
            if (rdbSaveLen(rdb,j) == -1) goto werr;
    
            uint32_t db_size, expires_size;
            /* 获取数据库字典大小和过期键字典大小 */
            db_size = (dictSize(db->dict) <= UINT32_MAX) ?
                                    dictSize(db->dict) :
                                    UINT32_MAX;
            expires_size = (dictSize(db->expires) <= UINT32_MAX) ?
                                    dictSize(db->expires) :
                                    UINT32_MAX;
            /* 3.3 写入当前待写入数据的类型,此处为 RDB_OPCODE_RESIZEDB,表示数据库大小 */
            if (rdbSaveType(rdb,RDB_OPCODE_RESIZEDB) == -1) goto werr;
            /* 3.4 写入获取数据库字典大小和过期键字典大小 */
            if (rdbSaveLen(rdb,db_size) == -1) goto werr;
            if (rdbSaveLen(rdb,expires_size) == -1) goto werr;
            /* 4 遍历当前数据库的键值对 */
            while((de = dictNext(di)) != NULL) {
                sds keystr = dictGetKey(de);
                robj key, *o = dictGetVal(de);
                long long expire;
    
                /* 初始化 key,因为操作的是 key 字符串对象,而不是直接操作 键的字符串内容 */
                initStaticStringObject(key,keystr);
                /* 获取键的过期数据 */
                expire = getExpire(db,&key);
                /* 4.1 保存键值对数据 */
                if (rdbSaveKeyValuePair(rdb,&key,o,expire) == -1) goto werr;
            }
    
        }
    
        /* 5 保存 Lua 脚本*/
        if (rsi && dictSize(server.lua_scripts)) {
            di = dictGetIterator(server.lua_scripts);
            while((de = dictNext(di)) != NULL) {
                robj *body = dictGetVal(de);
                if (rdbSaveAuxField(rdb,"lua",3,body->ptr,sdslen(body->ptr)) == -1)
                    goto werr;
            }
            dictReleaseIterator(di);
        }
    
        /* 6 写入结束符 */
        if (rdbSaveType(rdb,RDB_OPCODE_EOF) == -1) goto werr;
    
        /* 7 写入CRC64校验和 */
        cksum = rdb->cksum;
        memrev64ifbe(&cksum);
        if (rioWrite(rdb,&cksum,8) == 0) goto werr;
        return C_OK;
    }
    

rdbSaveRio在写键值时,会调用rdbSaveKeyValuePair 函数。该函数会依次写入键值的过期时间,键的类型,键和值。

int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime)
{
    /* 如果有过期信息 */
    if (expiretime != -1) {
        /* 保存过期信息标识 */
        if (rdbSaveType(rdb,RDB_OPCODE_EXPIRETIME_MS) == -1) return -1;
        /* 保存过期具体数据内容 */
        if (rdbSaveMillisecondTime(rdb,expiretime) == -1) return -1;
    }

    /* Save type, key, value */
    /* 保存键值对 类型的标识 */
    if (rdbSaveObjectType(rdb,val) == -1) return -1;
    /* 保存键值对 键的内容 */
    if (rdbSaveStringObject(rdb,key) == -1) return -1;
    /* 保存键值对 值的内容 */
    if (rdbSaveObject(rdb,val) == -1) return -1;
    return 1;
}

根据键的不同类型写入不同格式,各种键值的类型和格式如下所示。

Redis 有庞大的对象和数据结构体系,它使用六种底层数据结构构建了包含字符串对象、列表对象、哈希对象、集合对象和有序集合对象的对象系统。感兴趣的同学可以参考 《十二张图带你了解 Redis 的数据结构和对象系统》一文。

不同的数据结构进行 RDB 持久化的格式都不同。我们今天只看一下集合对象是如何持久化的。

ssize_t rdbSaveObject(rio *rdb, robj *o) {
    ssize_t n = 0, nwritten = 0;
    ....
    } else if (o->type == OBJ_SET) {
        /* Save a set value */
        if (o->encoding == OBJ_ENCODING_HT) {
            dict *set = o->ptr;
            // 集合迭代器
            dictIterator *di = dictGetIterator(set);
            dictEntry *de;
            // 写入集合长度
            if ((n = rdbSaveLen(rdb,dictSize(set))) == -1) return -1;
            nwritten += n;
            // 遍历集合元素
            while((de = dictNext(di)) != NULL) {
                sds ele = dictGetKey(de);
                // 以字符串的形式写入,因为是SET 所以只写入 Key 即可
                if ((n = rdbSaveRawString(rdb,(unsigned char*)ele,sdslen(ele)))
                    == -1) return -1;
                nwritten += n;
            }
            dictReleaseIterator(di);
        } 
    .....
    return nwritten;
}

Share

编程小技巧之 IDEA 的 Live Template

合格的程序员都善于使用工具,正所谓君子性非异也,善假于物也。

使用自动化工具可以减少自己的工作量,提高工作效率。日常编程过程中,我们经常需要编写重复的代码片段,比如说

private static final Logger LOGGER = LoggerFactory.getLogger(HashServiceImpl.class);

每次编写时都要键入很多键,有什么方法可以快速生成这段代码呢?类似的,如何保存格式固定的常用代码片段,然后在需要时快速生成呢。IDEA 的 Live Template 是一个可行的途径。

我也是最近才逐渐使用 IDEA 的 Live Template 功能,之前虽然知道这个功能,但是没有养成使用的习惯。最近一段时间在不断审视并反思自己的编程、工作和生活习惯,才发现其中有很多可以优化精进的地方。

这也是《程序员修炼之道》中所说的 Think ! About Your Work 。

IDEA 是一个很强大的编程工具,学会使用它能够极大的提高工作效率,将精力投入到更关键的事情上,而不是将时间浪费在编写重复代码上面。

而作为 Java 程序员,令人苦恼的地方是 Java 开发过程中经常需要编写有固定格式的代码,例如说声明一个私有变量,Logger 或者 Bean 等等。对于这种小范围的代码生成,我们可以利用 IDEA 提供的 Live Templates 功能。

Live Template 并不是简单的 Code Snippet,它甚至支持 Groovy函数配置,可以编写一些复杂的逻辑,支持很复杂的代码生成。

基本使用

IDEA 自带很多常用的动态模板,都是大家平常编码时的常用语句格式。比如说下面四张动图中的语句。

四张图分别是 声明静态 String 类型成员变量,判断字符串为空,for 循环和打印函数参数。

psfs

ifn

fori

soutp

自定义 Template

打开配置页面,进入 Live Template 选项卡,我们可以看到 IDEA 预先设置的模板配置。这些模板都是最常用的一些语句,我们先来看一下它们都是如何定义的。

缩写就是 IDEA 识别的模板的别名,就像文章开头展示的当你键入 soutm 时,IDEA 就会自动识别为该模板。

而应用上下文则表示该模板在什么上下文中生效。比如说上文中时一个 System.out 的语句,它只应该在 Java 的函数体中有效,所以它的应用上下文设置为 Java: statement,在其他类型文件或者 Java 文件的成员变量声明位置都无法使用该模板。

模板内容就是你按下 Tab 键之后,IDEA 自动生成的内容,它一般包含两个部分,纯文本和参数。参数可以进行值绑定,还支持光标的自动跳转。如同上文所示,$CLASS_NAME$$METHOD_NAME$ 就是参数,而$END$是一个特殊的参数,它表示光标最后一个跳转的位置。

而参数设置就是设置这些参数的值,可以使用 IDEA 提供的一些内置函数,还可以使用强大的 Groovy 脚本。去 IDEA 的官网可以查看这些函数的具体作用。

variables

我们这里讲解一下 groovyScript("groovy code", arg1) 的使用。它能提供一切你想要的能力,它支持执行 Groovy 脚本处理输入,然后输出处理后的字符串

groovyScript("code", ...)

|  code   |   一段Groovy代码或者Groovy脚本代码绝对路径    |
|  ...    |   可选入参,这些参数会绑定到`_1, _2, _3, ..._n`, 在 Groovy 代码中使用。|

比如之前打印函数参数的模板是这样定义的。

groovyScript("'\"' + _1.collect { it + ' = [\" + ' + it + ' + \"]'}.join(', ') + '\"'", methodParameters())

methodParameters 是 IDEA 内置的函数,它返回的结果作为参数输入到 Groovy 的脚本中,生成打印参数函数的字符串。

后记

感谢大家的阅读,希望大家继续关注,也可以留言分享你最喜欢使用的编程工具和编程小技巧。

Share