Java 数据持久化系列之池化技术

在上一篇文章《Java 数据持久化系列之JDBC》中,我们了解到使用 JDBC 创建 Connection 可以执行对应的SQL,但是创建 Connection 会消耗很多资源,所以 Java 持久化框架中往往不直接使用 JDBC,而是在其上建立数据库连接池层。

今天我们就先来了解一下池化技术的必要性、原理;然后使用 Apache-common-Pool2实现一个简单的数据库连接池;接着通过实验,对比简单连接池、HikariCP、Druid 等数据库连接池的性能数据,分析实现高性能数据库连接池的关键;最后分析 Pool2 的具体源代码实现。

对象不是你想要,想要就能要

你我单身狗们经常调侃可以随便 New 出一个对象,用完就丢。但是有些对象创建的代价比较大,比如线程、tcp连接、数据库连接等对象。对于这些创建耗时较长,或者资源占用较大(占据操作系统资源,比如说线程,网络连接等)的对象,往往会引入池化来管理,减少频繁创建对象的次数,避免创建对象时的耗时,提高性能。

我们就以数据库连接 Connection 对象为例,详细说明一下创建该对象花费的时间和资源。下面是MySQL Driver 创建 Connection 对象的方法,在调用 connect 方法创建 Connection 时,会与 MySQL 进行网络通讯,建立 TCP 连接,这是极其消耗时间的。

connection = driver.connect(URL, props);

使用 Apache-Common-Pool2实现简易数据库连接池

下面,我们以 Apache-Common-Pool2为例来看一下池化技术相关的抽象结构。

首先了解一下Pool2中三元一体的 ObjectPool,PooledObject 和 PooledObjectFactory,对他们的解释如下:

  • ObjectPool 就是对象池,提供了 borrowObjectreturnObject 等一系列函数。
  • PooledObject 是池化对象的封装类,负责记录额外信息,比如说对象状态,对象创建时间,对象空闲时间,对象上次使用时间等。
  • PooledObjectFactory 是负责管理池化对象生命周期的工厂类,提供 makeObjectdestroyObjectactivateObjectvalidateObject 等一系列函数。

上述三者都有其基础的实现类,分别是 GenericObjectPool,DefaultPooledObject 和 BasePooledObjectFactory。上一节实验中的 SimpleDatasource 就是使用上述类实现的。

首先,你要实现一个继承 BasePooledObjectFactory 的工厂类,提供管理池化对象生命周期的具体方法:

  • makeObject:创建池化对象实例,并且使用 PooledObject 将其封装。
  • validateObject:验证对象实例是否安全或者可用,比如说 Connection 是否还保存连接状态。
  • activateObject:将池返回的对象实例进行重新初始化,比如说设置 Connection是否默认AutoCommit等。
  • passivateObject:将返回给池的对象实例进行反初始化,比如说 Connection 中未提交的事务进行 Rollback等。
  • destroyObject:销毁不再被池需要的对象实例,比如说 Connection不再被需要时,调用其 close 方法。

具体的实现源码如下所示,每个方法都有详细的注释。

public class SimpleJdbcConnectionFactory extends BasePooledObjectFactory<Connection> {
    ....
    @Override
    public Connection create() throws Exception {
        // 用于创建池化对象
        Properties props = new Properties();
        props.put("user", USER_NAME);
        props.put("password", PASSWORD);
        Connection connection = driver.connect(URL, props);
        return connection;
    }

    @Override
    public PooledObject<Connection> wrap(Connection connection) {
        // 将池化对象进行封装,返回DefaultPooledObject,这里也可以返回自己实现的PooledObject
        return new DefaultPooledObject<>(connection);
    }

    @Override
    public PooledObject<Connection> makeObject() throws Exception {
        return super.makeObject();
    }

    @Override
    public void destroyObject(PooledObject<Connection> p) throws Exception {
        p.getObject().close();
    }

    @Override
    public boolean validateObject(PooledObject<Connection> p) {
        // 使用 SELECT 1 或者其他sql语句验证Connection是否可用,ConnUtils代码详见Github中的项目
        try {
            ConnUtils.validateConnection(p.getObject(), this.validationQuery);
        } catch (Exception e) {
            return false;
        }
        return true;
    }


    @Override
    public void activateObject(PooledObject<Connection> p) throws Exception {
        Connection conn = p.getObject();
        // 对象被借出,需要进行初始化,将其 autoCommit进行设置
        if (conn.getAutoCommit() != defaultAutoCommit) {
            conn.setAutoCommit(defaultAutoCommit);
        }
    }

    @Override
    public void passivateObject(PooledObject<Connection> p) throws Exception {
        // 对象被归还,进行回收或者处理,比如将未提交的事务进行回滚
        Connection conn = p.getObject();
        if(!conn.getAutoCommit() && !conn.isReadOnly()) {
            conn.rollback();
        }
        conn.clearWarnings();
        if(!conn.getAutoCommit()) {
            conn.setAutoCommit(true);
        }

    }
}

接着,你就可以使用 BasePool 来从池中获取对象,使用后归还给池。

Connection connection = pool.borrowObject(); // 从池中获取连接对象实例
Statement statement = connection.createStatement();
statement.executeQuery("SELECT * FROM activity");
statement.close();
pool.returnObject(connection); // 使用后归还连接对象实例到池中

如上,我们就使用 Apache Common Pool2 实现了一个简易的数据库连接池。下面,我们先来使用 benchmark 验证一下这个简易数据库连接池的性能,再分析 Pool2 的具体源码实现,

性能试验

至此,我们分析完了 Pool2的相关原理和实现,下面就修改 Hikari-benchmark 对我们编写的建议数据库连接池进行性能测试。修改后的 benchmark 的地址为 https://github.com/ztelur/HikariCP-benchmark。

可以看到 hikari 和 Druid 两个数据库连接池的性能是最优的,而我们的简易数据库连接池性能排在末尾。在后续系列文章中会对比我们的简易数据库分析 Hikari 和 Druid 高性能的原因。下面我们先来看一下简易数据库连接池的具体实现。

Apache Common Pool2 源码分析

我们来简要分析 Pool2 的源码( 2.8.0版本 )实现,了解池化技术的基本原理,为后续了解和分析 HikariCP 和 Druid 打下基础,三者在设计思路具有互通之处。

通过前边的实例,我们知道通过 borrowObjectreturnObject 从对象池中接取或者归还对象,进行这些操作时,封装实例 PooledObject 的状态也会发生变化,下面就沿着 PooledObject 状态机的状态变化路线来讲解相关的代码实现。

PooledObject 状态机示意图

上图是 PooledObject 的状态机示意图,蓝色元素代表状态,红色代表 ObjectPool的相关方法。PooledObject 的状态有 IDLE、ALLOCATED、RETURNING、ABANDONED、INVALID、EVICTION 和 EVICTION_RETURN_TO_HEAD(所有状态定义在 PooledObjectState 类中,有些状态暂时未被使用,这里不进行说明)。

主要涉及三部分的状态变化,分别是 1、2、3的借出归还状态变化,4,5的标记抛弃删除状态变化以及6,7,8的检测驱除状态变化。后续会分小节详细介绍这三部分的状态变化。

在这些状态变化过程中,不仅涉及 ObjectPool 的方法,也会调用 PooledObjectFactory 的方法进行相关操作。

PooledOjbectFactory方法示意图

上图表明了在 PooledObject 状态变化过程中涉及的 PooledObjectFactory 的方法。按照前文对 PooledObjectFactory 方法的描述,可以很容易的对应起来。比如说,在编号 1 的对象被借出过程中,先调用 invalidateObject 判断对象可用性,然后调用 activeObject 将对象默认配置初始化。

借出归还状态变化

我们从 GenericObjectPool 的 borrowObject 方法开始了解。该方法可以传入最大等待时间为参数,如果不传则使用配置的默认最大等待时间,borrowObject 的源码如下所示(为了可读性,对代码进行删减)。

public T borrowObject(final long borrowMaxWaitMillis) throws Exception {
    // 1 根据 abandonedConfig 和其他检测判断是否要调用 removeAbandoned 方法进行标记删除操作
    ....
    PooledObject<T> p = null;
    // 当暂时无法获取对象时是否阻塞
    final boolean blockWhenExhausted = getBlockWhenExhausted();
    while (p == null) {
        create = false;
        // 2 先从 idleObjects 队列中获取, pollFisrt 是非阻塞的
        p = idleObjects.pollFirst();
        // 3 没有则调用 create 方法创建一个新的对象
        if (p == null) {
            p = create();
        }
        // 4 blockWhenExhausted 为true,则根据 borrowMaxWaitMillis 进行阻塞操作
        if (blockWhenExhausted) {
            if (p == null) {
                if (borrowMaxWaitMillis < 0) {
                    p = idleObjects.takeFirst(); // 阻塞到获取对象为止
                } else {
                    p = idleObjects.pollFirst(borrowMaxWaitMillis,
                            TimeUnit.MILLISECONDS); // 阻塞到最大等待时间或者获取到对象
                }
            }
        }
        // 5 调用 allocate 进行状态变化
        if (!p.allocate()) {
            p = null;
        }
        if (p != null) {
            // 6 调用 activateObject 进行对象默认初始化,如果出现问题则调用 destroy 
            factory.activateObject(p);
            // 7 如果配置了 TestOnBorrow,则调用 validateObject 进行可用性校验,如果不通过则调用 destroy
            if (getTestOnBorrow()) {
                validate = factory.validateObject(p);
            }
        }
    }
    return p.getObject();
}

borrowObject 方法主要做了五步操作:

  • 第一步是根据配置判断是否要调用 removeAbandoned 方法进行标记删除操作,这个后续小节再细讲。
  • 第二步是尝试获取或创建对象,由源码中2,3,4 步骤组成。
  • 第三步是调用 allocate 进行状态变更,转换为 ALLOCATED 状态,如源码中的 5 步骤。
  • 第四步是调用 factory 的 activateObject 进行对象的初始化,如果出错则调用 destroy 方法销毁对象,如源码中的 6 步骤。
  • 第五步是根据 TestOnBorrow 配置调用 factory 的 validateObject 进行对象可用性分析,如果不可用,则调用 destroy 方法销毁对象,如源码中的 7 步骤。

示意图

我们对第二步进行一下细致的分析。idleObjects 是存储着所有 IDLE状态 (也有可能是 EVICTION 状态) PooledObject 的 LinkedBlockingDeque 对象。第二步中先调用其 pollFirst 方法从队列头获取 PooledObject,如果未获取到则调用 create 方法创建一个新的。

create 也可能未创建成功,则当 blockWhenExhausted 为 true 时,未获取到对象需要一直阻塞,所以根据最大等待时间 borrowMaxWaitMillis 来调用 takeFirst 或者 pollFirst(time) 方法进行阻塞式获取;当 blockWhenExhausted 为 false 时,则直接抛出异常返回。

create 方法会判断当前状况下是否应该创建新的对象,主要是要防止创建的对象数量超过最大池对象数量。如果可以创建新对象,则调用 PooledObjectFactory 的 makeObject 方法进行新对象创建,然后根据 testOnCreate 配置来判断是否调用 validateObject 方法进行校验,源码如下所示。

private PooledObject<T> create() throws Exception {
    int localMaxTotal = getMaxTotal(); // 获取池对象最大数量
    final long localStartTimeMillis = System.currentTimeMillis();
    final long localMaxWaitTimeMillis = Math.max(getMaxWaitMillis(), 0); // 获取最大等待时间
    Boolean create = null;
    // 一直等待到 create 被赋值,true代表要创建新对象,false代表不能创建
    while (create == null) {
        synchronized (makeObjectCountLock) {
            final long newCreateCount = createCount.incrementAndGet();
            if (newCreateCount > localMaxTotal) {
                // pool已经满或者正在创建的足够达到最大数量的对象。
                createCount.decrementAndGet();
                if (makeObjectCount == 0) {
                    // 目前没有其他的 makeObject 方法被调用,直接返回false
                    create = Boolean.FALSE;
                } else {
                    // 目前有其他的 makeObject 方法被调用,但是可能失败,所以等待一段时间再试试
                    makeObjectCountLock.wait(localMaxWaitTimeMillis);
                }
            } else {
                // pool未满 可以创建对象。
                makeObjectCount++;
                create = Boolean.TRUE;
            }
        }

        // 执行超过 maxWaitTimeMillis 则返回false
        if (create == null &&
            (localMaxWaitTimeMillis > 0 &&
                System.currentTimeMillis() - localStartTimeMillis >= localMaxWaitTimeMillis)) {
            create = Boolean.FALSE;
        }
    }
    // 如果 create 为false,返回 NULL
    if (!create.booleanValue()) {
        return null;
    }

    final PooledObject<T> p;
    try {
        // 调用 factory 的 makeObject 进行对象创建,并且按照 testOnCreate 配置调用 validateObject 方法
        p = factory.makeObject();
        if (getTestOnCreate() && !factory.validateObject(p)) {
            // 这里代码有问题,校验不通过的对象没有进行销毁?
            createCount.decrementAndGet();
            return null;
        }
    } catch (final Throwable e) {
        createCount.decrementAndGet();
        throw e;
    } finally {
        synchronized (makeObjectCountLock) {
            // 减少 makeObjectCount
            makeObjectCount--;
            makeObjectCountLock.notifyAll();
        }
    }
    allObjects.put(new IdentityWrapper<>(p.getObject()), p);
    return p;
}

需要注意的是 create 方法创建的对象并没有第一时间加入到 idleObjects 队列中,该对象将会在后续使用完毕调用 returnObject 方法时才会加入到队列中。

接下来,我们看一下 returnObject 方法的实现。该方法主要做了六步操作:

  • 第一步是调用 markReturningState 方法将状态变更为 RETURNING。
  • 第二步是根据 testOnReturn 配置调用 PooledObjectFactory 的 validateObject 方法进行可用性校验。如果未通过校验,则调用 destroy 消耗该对象,然后调用 ensureIdle 确保池中有 IDLE 状态对象可用,如果没有会调用 create 方法创建新的对象。
  • 第三步是调用 PooledObjectFactory 的 passivateObject 方法进行反初始化操作。
  • 第四步是调用 deallocate 将状态变更为 IDLE。
  • 第五步是检测是否超过了最大 IDLE 对象数量,如果超过了则销毁当前对象。
  • 第六步是根据 LIFO (last in, first out) 配置将对象放置到队列的首部或者尾部。
    public void returnObject(final T obj) {
        final PooledObject<T> p = allObjects.get(new IdentityWrapper<>(obj));
        // 1 将状态转换为 RETURNING
        markReturningState(p);
    
        final long activeTime = p.getActiveTimeMillis();
        // 2 根据配置,对实例进行可用性校验
        if (getTestOnReturn() && !factory.validateObject(p)) {
            destroy(p);
            // 因为删除了一个对象,需要确保池内还有对象,如果没有改方法会创建新对象
            ensureIdle(1, false); 
            updateStatsReturn(activeTime);
            return;
        }
        // 3 调用 passivateObject 将对象反初始化。
        try {
            factory.passivateObject(p);
        } catch (final Exception e1) {
             .... // 和上边 validateObject 校验失败相同操作。
        }
        // 4 将状态变更为 IDLE
        if (!p.deallocate()) {
            throw new IllegalStateException(
                    "Object has already been returned to this pool or is invalid");
        }
    
        final int maxIdleSave = getMaxIdle();
        // 5 如果超过最大 IDLE 数量,则进行销毁
        if (isClosed() || maxIdleSave > -1 && maxIdleSave <= idleObjects.size()) {
            .... // 同上边 validateObject 校验失败相同操作。
        } else {
            // 6 根据 LIFO 配置,将归还的对象放置在队列首部或者尾部。 这边源码拼错了。
            if (getLifo()) {
                idleObjects.addFirst(p);
            } else {
                idleObjects.addLast(p);
            }
        }
        updateStatsReturn(activeTime);
    }
    

下图介绍了第六步两种入队列的场景,LIFO 为 true 时防止在队列头部;LIFO 为 false 时,防止在队列尾部。要根据不同的池化对象选择不同的场景。但是放置在尾部可以避免并发热点,因为借对象和还对象都需要操作队列头,需要进行并发控制。

LIFO示意图

标记删除状态变化

标记删除状态变化操作主要通过 removeAbandoned 实现,它主要是检查已经借出的对象是否需要删除,防止对象被借出长时间未使用或者归还所导致的池对象被耗尽的情况。

removeAbandoned 根据 AbandonedConfig 可能会在 borrowObject 或者 检测驱除对象的 evict 方法执行时被调用。

public T borrowObject(final long borrowMaxWaitMillis) throws Exception {
    
    final AbandonedConfig ac = this.abandonedConfig;
    // 当配置了 removeAbandonedOnBorrow 并且 当前 idle 对象数量少于2,活跃对象数量只比最大对象数量少3.
    if (ac != null && ac.getRemoveAbandonedOnBorrow() &&
            (getNumIdle() < 2) &&
            (getNumActive() > getMaxTotal() - 3) ) {
        removeAbandoned(ac);
    }
    ....
}

public void evict() throws Exception {
    ....
    final AbandonedConfig ac = this.abandonedConfig;
        // 设置了 removeAbandonedOnMaintenance
        if (ac != null && ac.getRemoveAbandonedOnMaintenance()) {
            removeAbandoned(ac);
        }
}

removeAbandoned 使用典型的标记删除策略:标记阶段是先对所有的对象进行遍历,如果该对象是 ALLOCATED 并且上次使用时间已经超过超时时间,则将其状态变更为 ABANDONED 状态,并加入到删除队列中;删除阶段则遍历删除队列,依次调用 invalidateObject 方法删除并销毁对象。

private void removeAbandoned(final AbandonedConfig ac) {
    // 收集需要 abandoned 的对象
    final long now = System.currentTimeMillis();
    // 1 根据配置的时间计算超时时间
    final long timeout =
            now - (ac.getRemoveAbandonedTimeout() * 1000L);
    final ArrayList<PooledObject<T>> remove = new ArrayList<>();
    final Iterator<PooledObject<T>> it = allObjects.values().iterator();
    while (it.hasNext()) {
        final PooledObject<T> pooledObject = it.next();
        // 2 遍历所有的对象,如果它是已经分配状态,并且该对象的最近一次使用时间小于超时时间
        synchronized (pooledObject) {
            if (pooledObject.getState() == PooledObjectState.ALLOCATED &&
                    pooledObject.getLastUsedTime() <= timeout) {
                // 3 将对象状态更改为 ABANDONED,并加入到删除队列
                pooledObject.markAbandoned();
                remove.add(pooledObject);
            }
        }
    }

    // 4 遍历删除队列
    final Iterator<PooledObject<T>> itr = remove.iterator();
    while (itr.hasNext()) {
        final PooledObject<T> pooledObject = itr.next();
        // 5 调用 invalidateObject 方法删除对象
        invalidateObject(pooledObject.getObject());
    }
}

invalidateObject 方法直接调用了 destroy 方法,destroy 方法在上边的源码分析中也反复出现,它主要进行了四步操作:

  • 1 将对象状态变更为 INVALID。
  • 2 将对象从队列和集合中删除。
  • 3 调用 PooledObjectFactory 的 destroyObject 方法销毁对象。
  • 4 更新统计数据
private void destroy(final PooledObject<T> toDestroy) throws Exception {
    // 1 将状态变更为 INVALID
    toDestroy.invalidate();
    // 2 从队列和池中删除
    idleObjects.remove(toDestroy);
    allObjects.remove(new IdentityWrapper<>(toDestroy.getObject()));
    // 3 调用 destroyObject 回收对象
    try {
        factory.destroyObject(toDestroy);
    } finally {
        // 4 更新统计数据
        destroyedCount.incrementAndGet();
        createCount.decrementAndGet();
    }
}

检测驱除状态变化

检测驱除状态变化主要由 evict 方法操作,在后台线程中独立完成,主要检测池中的 IDLE 状态的空闲对象是否需要驱除,超时时间通过 EvictionConfig 进行配置。

驱逐者 Evictor,在 BaseGenericObjectPool 中定义,本质是由 java.util.TimerTask 定义的定时任务。

final void startEvictor(final long delay) {
    synchronized (evictionLock) {
        if (delay > 0) {
            // 定时执行 evictor 线程
            evictor = new Evictor();
            EvictionTimer.schedule(evictor, delay, delay);
        }
    }
}

在 Evictor 线程中会调用 evict 方法,该方法主要是遍历所有的 IDLE 对象,然后对每个对象执行检测驱除操作,具体源码如下所示:

  • 调用 startEvictionTest 方法将状态更改为 EVICTED。
  • 根据驱除策略和对象超时时间判断是否要驱除。
  • 如果需要被驱除则调用 destroy 方法销毁对象。
  • 如果设置了 testWhileIdle 则调用 PooledObject 的 validateObject 进行可用性校验。
  • 调用 endEvictionTest 将状态更改为 IDLE。
public void evict() throws Exception {
    if (idleObjects.size() > 0) {
        ....
        final EvictionPolicy<T> evictionPolicy = getEvictionPolicy();
        synchronized (evictionLock) {
            for (int i = 0, m = getNumTests(); i < m; i++) {
                // 1 遍历所有 idle 的对象
                try {
                    underTest = evictionIterator.next();
                } catch (final NoSuchElementException nsee) {
                }
                // 2 调用 startEvictionTest 将状态变更为 EVICTED
                if (!underTest.startEvictionTest()) {
                    continue;
                }
                // 3 根据驱除策略判断是否要驱除
                boolean evict = evictionPolicy.evict(evictionConfig, underTest,
                        idleObjects.size());

                if (evict) {
                    // 4 进行驱除
                    destroy(underTest);
                    destroyedByEvictorCount.incrementAndGet();
                } else {
                    // 5 如果需要检测,则进行可用性检测
                    if (testWhileIdle) {
                        factory.activateObject(underTest);
                        factory.validateObject(underTest));
                        factory.passivateObject(underTest);
                        }
                    // 5 变更状态为 IDLE
                    if (!underTest.endEvictionTest(idleObjects)) {
                    }
                }
            }
        }
    }
    .... // abandoned 相关的操作
}

后记

后续会分析 Hikari 和 Druid 数据库连接池的实现,请大家多多关注。

个人博客,欢迎来玩

参考

Share

2020年,计划目录

2020 年度计划

公众号

  • 公众号订阅量定个小目标,保证发文频次

技术

  • kafka、rocketmq
  • linux和jvm性能优化
  • 算法leetcode

日常

  • 股票计划
  • 坚持每周至少2次锻炼,包括健身房,篮球或者其他运动
  • 计划追踪总结体系建立
Share

Redis Cluster 的数据分片机制

上一篇《分布式数据缓存中的一致性哈希算法》
文章中讲述了一致性哈希算法的基本原理和实现,今天就以 Redis Cluster 为例,详细讲解一下分布式数据缓存中的数据分片,上线下线时数据迁移以及请求重定向等操作。

Redis 集群简介

Redis Cluster 是 Redis 的分布式解决方案,在 3.0 版本正式推出,有效地解决了 Redis 分布式方面的需求。

Redis Cluster 一般由多个节点组成,节点数量至少为 6 个才能保证组成完整高可用的集群,其中三个为主节点,三个为从节点。三个主节点会分配槽,处理客户端的命令请求,而从节点可用在主节点故障后,顶替主节点。


图片来源 redislabs

如上图所示,该集群中包含 6 个 Redis 节点,3主3从,分别为M1,M2,M3,S1,S2,S3。除了主从 Redis 节点之间进行数据复制外,所有 Redis 节点之间采用 Gossip 协议进行通信,交换维护节点元数据信息。

一般来说,主 Redis 节点会处理 Clients 的读写操作,而从节点只处理读操作。

数据分片策略

分布式数据存储方案中最为重要的一点就是数据分片,也就是所谓的 Sharding。

为了使得集群能够水平扩展,首要解决的问题就是如何将整个数据集按照一定的规则分配到多个节点上,常用的数据分片的方法有:范围分片,哈希分片,一致性哈希算法,哈希槽等。

范围分片假设数据集是有序,将顺序相临近的数据放在一起,可以很好的支持遍历操作。范围分片的缺点是面对顺序写时,会存在热点。比如日志类型的写入,一般日志的顺序都是和时间相关的,时间是单调递增的,因此写入的热点永远在最后一个分片。

范围分区

对于关系型的数据库,因为经常性的需要表扫描或者索引扫描,基本上都会使用范围的分片策略。

哈希分片和一致性哈希算法在上一篇文章中已经学习过了,感兴趣的同学可以去了解一下《分布式数据缓存中的一致性哈希算法》。我们接下来主要来看Redis 的虚拟哈希槽策略。

Redis Cluster 采用虚拟哈希槽分区,所有的键根据哈希函数映射到 0 ~ 16383 整数槽内,计算公式:slot = CRC16(key) & 16383。每一个节点负责维护一部分槽以及槽所映射的键值数据。

Redis 虚拟槽分区的特点:

  • 解耦数据和节点之间的关系,简化了节点扩容和收缩难度。
  • 节点自身维护槽的映射关系,不需要客户端或者代理服务维护槽分区元数据
  • 支持节点、槽和键之间的映射查询,用于数据路由,在线集群伸缩等场景。

Redis 集群提供了灵活的节点扩容和收缩方案。在不影响集群对外服务的情况下,可以为集群添加节点进行扩容也可以下线部分节点进行缩容。可以说,槽是 Redis 集群管理数据的基本单位,集群伸缩就是槽和数据在节点之间的移动。

下面我们就先来看一下 Redis 集群伸缩的原理。然后再了解当 Redis 节点数据迁移过程中或者故障恢复时如何保证集群可用。

扩容集群

为了让读者更好的理解上线节点时的扩容操作,我们通过 Redis Cluster 的命令来模拟整个过程。

当一个 Redis 新节点运行并加入现有集群后,我们需要为其迁移槽和数据。首先要为新节点指定槽的迁移计划,确保迁移后每个节点负责相似数量的槽,从而保证这些节点的数据均匀。

1) 首先启动一个 Redis 节点,记为 M4。
2) 使用 cluster meet 命令,让新 Redis 节点加入到集群中。新节点刚开始都是主节点状态,由于没有负责的槽,所以不能接受任何读写操作,后续我们就给他迁移槽和填充数据。
3) 对 M4 节点发送 cluster setslot { slot } importing { sourceNodeId} 命令,让目标节点准备导入槽的数据。
4) 对源节点,也就是 M1,M2,M3 节点发送 cluster setslot { slot } migrating { targetNodeId} 命令,让源节点准备迁出槽的数据。
5) 源节点执行 cluster getkeysinslot { slot } { count } 命令,获取 count 个属于槽 { slot } 的键,然后执行步骤六的操作进行迁移键值数据。
6) 在源节点上执行 migrate { targetNodeIp} “ “ 0 { timeout } keys { key… } 命令,把获取的键通过 pipeline 机制批量迁移到目标节点,批量迁移版本的 migrate 命令在 Redis 3.0.6 以上版本提供。
7) 重复执行步骤 5 和步骤 6 直到槽下所有的键值数据迁移到目标节点。
8) 向集群内所有主节点发送 cluster setslot { slot } node { targetNodeId } 命令,通知槽分配给目标节点。为了保证槽节点映射变更及时传播,需要遍历发送给所有主节点更新被迁移的槽执行新节点。

收缩集群

收缩节点就是将 Redis 节点下线,整个流程需要如下操作流程。

1) 首先需要确认下线节点是否有负责的槽,如果是,需要把槽迁移到其他节点,保证节点下线后整个集群槽节点映射的完整性。
2) 当下线节点不再负责槽或者本身是从节点时,就可以通知集群内其他节点忘记下线节点,当所有的节点忘记改节点后可以正常关闭。

下线节点需要将节点自己负责的槽迁移到其他节点,原理与之前节点扩容的迁移槽过程一致。

迁移完槽后,还需要通知集群内所有节点忘记下线的节点,也就是说让其他节点不再与要下线的节点进行 Gossip 消息交换。

Redis 集群使用 cluster forget { downNodeId } 命令来讲指定的节点加入到禁用列表中,在禁用列表内的节点不再发送 Gossip 消息。

客户端路由

在集群模式下,Redis 节点接收任何键相关命令时首先计算键对应的槽,在根据槽找出所对应的节点,如果节点是自身,则处理键命令;否则回复 MOVED 重定向错误,通知客户端请求正确的节点。这个过程称为 MOVED 重定向。

需要注意的是 Redis 计算槽时并非只简单的计算键值内容,当键值内容包括大括号时,则只计算括号内的内容。比如说,key 为 user:{10000}:books时,计算哈希值只计算10000。

MOVED 错误示例如下,键 x 所属的哈希槽 3999 ,以及负责处理这个槽的节点的 IP 和端口号 127.0.0.1:6381 。 客户端需要根据这个 IP 和端口号, 向所属的节点重新发送一次 GET 命令请求。

GET x
-MOVED 3999 127.0.0.1:6381

由于请求重定向会增加 IO 开销,这不是 Redis 集群高效的使用方式,而是要使用 Smart 集群客户端。Smart 客户端通过在内部维护 slot 到 Redis节点的映射关系,本地就可以实现键到节点的查找,从而保证 IO 效率的最大化,而 MOVED 重定向负责协助客户端更新映射关系。

Redis 集群支持在线迁移槽( slot ) 和数据来完成水平伸缩,当 slot 对应的数据从源节点到目标节点迁移过程中,客户端需要做到智能迁移,保证键命令可正常执行。例如当 slot 数据从源节点迁移到目标节点时,期间可能出现一部分数据在源节点,而另一部分在目标节点。

所以,综合上述情况,客户端命令执行流程如下所示:

  • 客户端根据本地 slot 缓存发送命令到源节点,如果存在键对应则直接执行并返回结果给客户端。
  • 如果节点返回 MOVED 错误,更新本地的 slot 到 Redis 节点的映射关系,然后重新发起请求。
  • 如果数据正在迁移中,节点会回复 ASK 重定向异常。格式如下: ( error ) ASK { slot } { targetIP } : {targetPort}
  • 客户端从 ASK 重定向异常提取出目标节点信息,发送 asking 命令到目标节点打开客户端连接标识,再执行键命令。

ASK 和 MOVED 虽然都是对客户端的重定向控制,但是有着本质区别。ASK 重定向说明集群正在进行 slot 数据迁移,客户端无法知道什么时候迁移完成,因此只能是临时性的重定向,客户端不会更新 slot 到 Redis 节点的映射缓存。但是 MOVED 重定向说明键对应的槽已经明确指定到新的节点,因此需要更新 slot 到 Redis 节点的映射缓存。

故障转移

当 Redis 集群内少量节点出现故障时通过自动故障转移保证集群可以正常对外提供服务。

当某一个 Redis 节点客观下线时,Redis 集群会从其从节点中通过选主选出一个替代它,从而保证集群的高可用性。这块内容并不是本文的核心内容,感兴趣的同学可以自己学习。

但是,有一点要注意。默认情况下,当集群 16384 个槽任何一个没有指派到节点时整个集群不可用。执行任何键命令返回 CLUSTERDOWN Hash slot not served 命令。当持有槽的主节点下线时,从故障发现到自动完成转移期间整个集群是不可用状态,对于大多数业务无法忍受这情况,因此建议将参数 cluster-require-full-coverage 配置为 no ,当主节点故障时只影响它负责槽的相关命令执行,不会影响其他主节点的可用性

参考

Share

Redis 命令执行过程(下)

在上一篇文章中《Redis 命令执行过程(上)》中,我们首先了解 Redis 命令执行的整体流程,然后细致分析了从 Redis 启动到建立 socket 连接,再到读取 socket 数据到输入缓冲区,解析命令,执行命令等过程的原理和实现细节。接下来,我们来具体看一下 set 和 get 命令的实现细节和如何将命令结果通过输出缓冲区和 socket 发送给 Redis 客户端。

set 和 get 命令具体实现

前文讲到 processCommand 方法会从输入缓冲区中解析出对应的 redisCommand,然后调用 call 方法执行解析出来的 redisCommand的 proc 方法。不同命令的的 proc 方法是不同的,比如说名为 set 的 redisCommand 的 proc 是 setCommand 方法,而 get 的则是 getCommand 方法。通过这种形式,实际上实现在Java 中特别常见的多态策略。

void call(client *c, int flags) {
    ....
    c->cmd->proc(c);
    ....
}
// redisCommand结构体
struct redisCommand {
    char *name;
    // 对应方法的函数范式
    redisCommandProc *proc;
    .... // 其他定义
};
// 使用 typedef 定义的别名
typedef void redisCommandProc(client *c);
// 不同的命令,调用不同的方法。
struct redisCommand redisCommandTable[] = {
    {"get",getCommand,2,"rF",0,NULL,1,1,1,0,0},
    {"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0},
    {"hmset",hsetCommand,-4,"wmF",0,NULL,1,1,1,0,0},
    .... // 所有的 redis 命令都有
}

setCommand 会判断set命令是否携带了nx、xx、ex或者px等可选参数,然后调用setGenericCommand命令。我们直接来看 setGenericCommand 方法。

setGenericCommand 方法的处理逻辑如下所示:

  • 首先判断 set 的类型是 set_nx 还是 set_xx,如果是 nx 并且 key 已经存在则直接返回;如果是 xx 并且 key 不存在则直接返回。
  • 调用 setKey 方法将键值添加到对应的 Redis 数据库中。
  • 如果有过期时间,则调用 setExpire 将设置过期时间
  • 进行键空间通知
  • 返回对应的值给客户端。
// t_string.c 
void setGenericCommand(client *c, int flags, robj *key, robj *val, robj *expire, int unit, robj *ok_reply, robj *abort_reply) {
    long long milliseconds = 0; 
    /**
     * 设置了过期时间;expire是robj类型,获取整数值
     */
    if (expire) {
        if (getLongLongFromObjectOrReply(c, expire, &milliseconds, NULL) != C_OK)
            return;
        if (milliseconds <= 0) {
            addReplyErrorFormat(c,"invalid expire time in %s",c->cmd->name);
            return;
        }
        if (unit == UNIT_SECONDS) milliseconds *= 1000;
    }
    /**
     * NX,key存在时直接返回;XX,key不存在时直接返回
     * lookupKeyWrite 是在对应的数据库中寻找键值是否存在
     */
    if ((flags & OBJ_SET_NX && lookupKeyWrite(c->db,key) != NULL) ||
        (flags & OBJ_SET_XX && lookupKeyWrite(c->db,key) == NULL))
    {
        addReply(c, abort_reply ? abort_reply : shared.nullbulk);
        return;
    }
    /**
     * 添加到数据字典
     */
    setKey(c->db,key,val);
    server.dirty++;
    /**
     * 过期时间添加到过期字典
     */
    if (expire) setExpire(c,c->db,key,mstime()+milliseconds);
    /**
     * 键空间通知
     */
    notifyKeyspaceEvent(NOTIFY_STRING,"set",key,c->db->id);
    if (expire) notifyKeyspaceEvent(NOTIFY_GENERIC,
        "expire",key,c->db->id);
    /**
     * 返回值,addReply 在 get 命令时再具体讲解
     */
    addReply(c, ok_reply ? ok_reply : shared.ok);
}

具体 setKey 和 setExpire 的方法实现我们这里就不细讲,其实就是将键值添加到db的 dict 数据哈希表中,将键和过期时间添加到 expires 哈希表中,如下图所示。

接下来看 getCommand 的具体实现,同样的,它底层会调用 getGenericCommand 方法。

getGenericCommand 方法会调用 lookupKeyReadOrReply 来从 dict 数据哈希表中查找对应的 key值。如果找不到,则直接返回 C_OK;如果找到了,则根据值的类型,调用 addReply 或者 addReplyBulk 方法将值添加到输出缓冲区中。

int getGenericCommand(client *c) {
    robj *o;
    // 调用 lookupKeyReadOrReply 从数据字典中查找对应的键
    if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk)) == NULL)
        return C_OK;
    // 如果是string类型,调用 addReply 单行返回。如果是其他对象类型,则调用 addReplyBulk
    if (o->type != OBJ_STRING) {
        addReply(c,shared.wrongtypeerr);
        return C_ERR;
    } else {
        addReplyBulk(c,o);
        return C_OK;
    }
}

lookupKeyReadWithFlags 会从 redisDb 中查找对应的键值对,它首先会调用 expireIfNeeded判断键是否过期并且需要删除,如果为过期,则调用 lookupKey 方法从 dict 哈希表中查找并返回。具体解释可以看代码中的详细注释

/*
 * 查找key的读操作,如果key找不到或者已经逻辑上过期返回 NULL,有一些副作用
 *   1 如果key到达过期时间,它会被设备为过期,并且删除
 *   2 更新key的最近访问时间
 *   3 更新全局缓存击中概率
 * flags 有两个值: LOOKUP_NONE 一般都是这个;LOOKUP_NOTOUCH 不修改最近访问时间
 */
robj *lookupKeyReadWithFlags(redisDb *db, robj *key, int flags) { // db.c
    robj *val;
    // 检查键是否过期
    if (expireIfNeeded(db,key) == 1) {
        .... // master和 slave 对这种情况的特殊处理
    }
    // 查找键值字典
    val = lookupKey(db,key,flags);
    // 更新全局缓存命中率
    if (val == NULL)
        server.stat_keyspace_misses++;
    else
        server.stat_keyspace_hits++;
    return val;
}

Redis 在调用查找键值系列方法前都会先调用 expireIfNeeded 来判断键是否过期,然后根据 Redis 是否配置了懒删除来进行同步删除或者异步删除。关于键删除的细节可以查看《详解 Redis 内存管理机制和实现》一文。

在判断键释放过期的逻辑中有两个特殊情况:

  • 如果当前 Redis 是主从结构中的从实例,则只判断键是否过期,不直接对键进行删除,而是要等待主实例发送过来的删除命令后再进行删除。如果当前 Redis 是主实例,则调用 propagateExpire 来传播过期指令。
  • 如果当前正在进行 Lua 脚本执行,因为其原子性和事务性,整个执行过期中时间都按照其开始执行的那一刻计算,也就是说lua执行时未过期的键,在它整个执行过程中也都不会过期。

/*
 * 在调用 lookupKey*系列方法前调用该方法。
 * 如果是slave:
 *  slave 并不主动过期删除key,但是返回值仍然会返回键已经被删除。
 *  master 如果key过期了,会主动删除过期键,并且触发 AOF 和同步操作。
 * 返回值为0表示键仍然有效,否则返回1
 */
int expireIfNeeded(redisDb *db, robj *key) { // db.c
    // 获取键的过期时间
    mstime_t when = getExpire(db,key);
    mstime_t now;

    if (when < 0) return 0;

    /*
     * 如果当前是在执行lua脚本,根据其原子性,整个执行过期中时间都按照其开始执行的那一刻计算
     * 也就是说lua执行时未过期的键,在它整个执行过程中也都不会过期。
     */ 
    now = server.lua_caller ? server.lua_time_start : mstime();

    // slave 直接返回键是否过期
    if (server.masterhost != NULL) return now > when;
    // master时,键未过期直接返回
    if (now <= when) return 0;

    // 键过期,删除键
    server.stat_expiredkeys++;
    // 触发命令传播
    propagateExpire(db,key,server.lazyfree_lazy_expire);
    // 和键空间事件
    notifyKeyspaceEvent(NOTIFY_EXPIRED,
        "expired",key,db->id);
    // 根据是否懒删除,调用不同的函数 
    return server.lazyfree_lazy_expire ? dbAsyncDelete(db,key) :
                                         dbSyncDelete(db,key);
}

lookupKey 方法则是通过 dictFind 方法从 redisDb 的 dict 哈希表中查找键值,如果能找到,则根据 redis 的 maxmemory_policy 策略来判断是更新 lru 的最近访问时间,还是调用 updateFU 方法更新其他指标,这些指标可以在后续内存不足时对键值进行回收。

robj *lookupKey(redisDb *db, robj *key, int flags) {
    // dictFind 根据 key 获取字典的entry
    dictEntry *de = dictFind(db->dict,key->ptr);
    if (de) {
        // 获取 value
        robj *val = dictGetVal(de);
        // 当处于 rdb aof 子进程复制阶段或者 flags 不是 LOOKUP_NOTOUCH
        if (server.rdb_child_pid == -1 &&
            server.aof_child_pid == -1 &&
            !(flags & LOOKUP_NOTOUCH))
        {
            // 如果是 MAXMEMORY_FLAG_LFU 则进行相应操作
            if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
                updateLFU(val);
            } else {
                // 更新最近访问时间
                val->lru = LRU_CLOCK();
            }
        }
        return val;
    } else {
        return NULL;
    }
}

将命令结果写入输出缓冲区

在所有的 redisCommand 执行的最后,一般都会调用 addReply 方法进行结果返回,我们的分析也来到了 Redis 命令执行的返回数据阶段。

addReply 方法做了两件事情:

  • prepareClientToWrite 判断是否需要返回数据,并且将当前 client 添加到等待写返回数据队列中。
  • 调用 _addReplyToBuffer 和 _addReplyObjectToList 方法将返回值写入到输出缓冲区中,等待写入 socekt。
void addReply(client *c, robj *obj) {
    if (prepareClientToWrite(c) != C_OK) return;
    if (sdsEncodedObject(obj)) {
        // 需要将响应内容添加到output buffer中。总体思路是,先尝试向固定buffer添加,添加失败的话,在尝试添加到响应链表
        if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
            _addReplyObjectToList(c,obj);
    } else if (obj->encoding == OBJ_ENCODING_INT) {
        .... // 特殊情况的优化
    } else {
        serverPanic("Wrong obj->encoding in addReply()");
    }
}

prepareClientToWrite 首先判断了当前 client是否需要返回数据:

  • Lua 脚本执行的 client 则需要返回值;
  • 如果客户端发送来 REPLY OFF 或者 SKIP 命令,则不需要返回值;
  • 如果是主从复制时的主实例 client,则不需要返回值;
  • 当前是在 AOF loading 状态的假 client,则不需要返回值。

接着如果这个 client 还未处于延迟等待写入 (CLIENT_PENDING_WRITE)的状态,则将其设置为该状态,并将其加入到 Redis 的等待写入返回值客户端队列中,也就是 clients_pending_write队列。

int prepareClientToWrite(client *c) {
    // 如果是 lua client 则直接OK
    if (c->flags & (CLIENT_LUA|CLIENT_MODULE)) return C_OK;
    // 客户端发来过 REPLY OFF 或者 SKIP 命令,不需要发送返回值
    if (c->flags & (CLIENT_REPLY_OFF|CLIENT_REPLY_SKIP)) return C_ERR;

    // master 作为client 向 slave 发送命令,不需要接收返回值
    if ((c->flags & CLIENT_MASTER) &&
        !(c->flags & CLIENT_MASTER_FORCE_REPLY)) return C_ERR;
    // AOF loading 时的假client 不需要返回值
    if (c->fd <= 0) return C_ERR; 

    // 将client加入到等待写入返回值队列中,下次事件周期会进行返回值写入。
    if (!clientHasPendingReplies(c) &&
        !(c->flags & CLIENT_PENDING_WRITE) &&
        (c->replstate == REPL_STATE_NONE ||
         (c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack)))
    {
        // 设置标志位并且将client加入到 clients_pending_write 队列中
        c->flags |= CLIENT_PENDING_WRITE;
        listAddNodeHead(server.clients_pending_write,c);
    }
    // 表示已经在排队,进行返回数据
    return C_OK;
}

Redis 将存储等待返回的响应数据的空间,也就是输出缓冲区分成两部分,一个固定大小的 buffer 和一个响应内容数据的链表。在链表为空并且 buffer 有足够空间时,则将响应添加到 buffer 中。如果 buffer 满了则创建一个节点追加到链表上。_addReplyToBuffer 和 _addReplyObjectToList 就是分别向这两个空间写数据的方法。

固定buffer和响应链表,整体上构成了一个队列。这么组织的好处是,既可以节省内存,不需一开始预先分配大块内存,并且可以避免频繁分配、回收内存。

上面就是响应内容写入输出缓冲区的过程,下面看一下将数据从输出缓冲区写入 socket 的过程。

prepareClientToWrite 函数,将客户端加入到了Redis 的等待写入返回值客户端队列中,也就是 clients_pending_write 队列。请求处理的事件处理逻辑就结束了,等待 Redis 下一次事件循环处理时,将响应从输出缓冲区写入到 socket 中。

将命令返回值从输出缓冲区写入 socket

《Redis 事件机制详解》
一文中我们知道,Redis 在两次事件循环之间会调用 beforeSleep 方法处理一些事情,而对 clients_pending_write 列表的处理就在其中。

下面的 aeMain 方法就是 Redis 事件循环的主逻辑,可以看到每次循环时都会调用 beforesleep 方法。

void aeMain(aeEventLoop *eventLoop) { // ae.c
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        /* 如果有需要在事件处理前执行的函数,那么执行它 */
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        /* 开始处理事件*/
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
    }
}

beforeSleep 函数会调用 handleClientsWithPendingWrites 函数来处理 clients_pending_write 列表。

handleClientsWithPendingWrites 方法会遍历 clients_pending_write 列表,对于每个 client 都会先调用 writeToClient 方法来尝试将返回数据从输出缓存区写入到 socekt中,如果还未写完,则只能调用 aeCreateFileEvent 方法来注册一个写数据事件处理器 sendReplyToClient,等待 Redis 事件机制的再次调用。

这样的好处时对于返回数据较少的客户端,不需要麻烦的注册写数据事件,等待事件触发再写数据到 socket,而是在下一次事件循环周期就直接将数据写到 socket中,加快了数据返回的响应速度。

但是从这里也会发现,如果 clients_pending_write 队列过长,则处理时间也会很久,阻塞正常的事件响应处理,导致 Redis 后续命令延时增加。

// 直接将返回值写到client的输出缓冲区中,不需要进行系统调用,也不需要注册写事件处理器
int handleClientsWithPendingWrites(void) {
    listIter li;
    listNode *ln;
    // 获取系统延迟写队列的长度
    int processed = listLength(server.clients_pending_write);

    listRewind(server.clients_pending_write,&li);
    // 依次处理
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_WRITE;
        listDelNode(server.clients_pending_write,ln);

        // 将缓冲值写入client的socket中,如果写完,则跳过之后的操作。
        if (writeToClient(c->fd,c,0) == C_ERR) continue;

        // 还有数据未写入,只能注册写事件处理器了
        if (clientHasPendingReplies(c)) {
            int ae_flags = AE_WRITABLE;
            if (server.aof_state == AOF_ON &&
                server.aof_fsync == AOF_FSYNC_ALWAYS)
            {
                ae_flags |= AE_BARRIER;
            }
            // 注册写事件处理器 sendReplyToClient,等待执行
            if (aeCreateFileEvent(server.el, c->fd, ae_flags,
                sendReplyToClient, c) == AE_ERR)
            {
                    freeClientAsync(c);
            }
        }
    }
    return processed;
}

sendReplyToClient 方法其实也会调用 writeToClient 方法,该方法就是将输出缓冲区中的 buf 和 reply 列表中的数据都尽可能多的写入到对应的 socket中。

// 将输出缓冲区中的数据写入socket,如果还有数据未处理则返回C_OK
int writeToClient(int fd, client *c, int handler_installed) {
    ssize_t nwritten = 0, totwritten = 0;
    size_t objlen;
    sds o;
    // 仍然有数据未写入
    while(clientHasPendingReplies(c)) {
        // 如果缓冲区有数据
        if (c->bufpos > 0) {
            // 写入到 fd 代表的 socket 中
            nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen);
            if (nwritten <= 0) break;
            c->sentlen += nwritten;
            // 统计本次一共输出了多少子节
            totwritten += nwritten;

            // buffer中的数据已经发送,则重置标志位,让响应的后续数据写入buffer
            if ((int)c->sentlen == c->bufpos) {
                c->bufpos = 0;
                c->sentlen = 0;
            }
        } else {
            // 缓冲区没有数据,从reply队列中拿
            o = listNodeValue(listFirst(c->reply));
            objlen = sdslen(o);

            if (objlen == 0) {
                listDelNode(c->reply,listFirst(c->reply));
                continue;
            }
            // 将队列中的数据写入 socket
            nwritten = write(fd, o + c->sentlen, objlen - c->sentlen);
            if (nwritten <= 0) break;
            c->sentlen += nwritten;
            totwritten += nwritten;
            // 如果写入成功,则删除队列
            if (c->sentlen == objlen) {
                listDelNode(c->reply,listFirst(c->reply));
                c->sentlen = 0;
                c->reply_bytes -= objlen;
                if (listLength(c->reply) == 0)
                    serverAssert(c->reply_bytes == 0);
            }
        }
        // 如果输出的字节数量已经超过NET_MAX_WRITES_PER_EVENT限制,break
        if (totwritten > NET_MAX_WRITES_PER_EVENT &&
            (server.maxmemory == 0 ||
             zmalloc_used_memory() < server.maxmemory) &&
            !(c->flags & CLIENT_SLAVE)) break;
    }
    server.stat_net_output_bytes += totwritten;
    if (nwritten == -1) {
        if (errno == EAGAIN) {
            nwritten = 0;
        } else {
            serverLog(LL_VERBOSE,
                "Error writing to client: %s", strerror(errno));
            freeClient(c);
            return C_ERR;
        }
    }
    if (!clientHasPendingReplies(c)) {
        c->sentlen = 0;
        //如果内容已经全部输出,删除事件处理器
        if (handler_installed) aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
        // 数据全部返回,则关闭client和连接
        if (c->flags & CLIENT_CLOSE_AFTER_REPLY) {
            freeClient(c);
            return C_ERR;
        }
    }
    return C_OK;
}

个人博客地址,欢迎查看)

Share

Redis 命令执行过程(上)

今天我们来了解一下 Redis 命令执行的过程。在之前的文章中《当 Redis 发生高延迟时,到底发生了什么》我们曾简单的描述了一条命令的执行过程,本篇文章展示深入说明一下,加深读者对 Redis 的了解。

如下图所示,一条命令执行完成并且返回数据一共涉及三部分,第一步是建立连接阶段,响应了socket的建立,并且创建了client对象;第二步是处理阶段,从socket读取数据到输入缓冲区,然后解析并获得命令,执行命令并将返回值存储到输出缓冲区中;第三步是数据返回阶段,将返回值从输出缓冲区写到socket中,返回给客户端,最后关闭client。

这三个阶段之间是通过事件机制串联了,在 Redis 启动阶段首先要注册socket连接建立事件处理器:

  • 当客户端发来建立socket的连接的请求时,对应的处理器方法会被执行,建立连接阶段的相关处理就会进行,然后注册socket读取事件处理器
  • 当客户端发来命令时,读取事件处理器方法会被执行,对应处理阶段的相关逻辑都会被执行,然后注册socket写事件处理器
  • 当写事件处理器被执行时,就是将返回值写回到socket中。

接下来,我们分别来看一下各个步骤的具体原理和代码实现。

启动时监听socket

Redis 服务器启动时,会调用 initServer 方法,首先会建立 Redis 自己的事件机制 eventLoop,然后在其上注册周期时间事件处理器,最后在所监听的 socket 上
创建文件事件处理器,监听 socket 建立连接的事件,其处理函数为 acceptTcpHandler。

void initServer(void) { // server.c
    ....
    /**
     * 创建eventLoop
     */
    server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
    /* Open the TCP listening socket for the user commands. */

    if (server.port != 0 &&
        listenToPort(server.port,server.ipfd,&server.ipfd_count) == C_ERR)
        exit(1);

    /**
     * 注册周期时间事件,处理后台操作,比如说客户端操作、过期键等
     */
    if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
        serverPanic("Can't create event loop timers.");
        exit(1);
    }
    /**
     * 为所有监听的socket创建文件事件,监听可读事件;事件处理函数为acceptTcpHandler
     * 
     */
    for (j = 0; j < server.ipfd_count; j++) {
        if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
            acceptTcpHandler,NULL) == AE_ERR)
            {
                serverPanic(
                    "Unrecoverable error creating server.ipfd file event.");
            }
    }
    ....
}

在《Redis 事件机制详解》一文中,我们曾详细介绍过 Redis 的事件机制,可以说,Redis 命令执行过程中都是由事件机制协调管理的,也就是 initServer 方法中生成的 aeEventLoop。当socket发生对应的事件时,aeEventLoop 对调用已经注册的对应的事件处理器。

建立连接和Client

当客户端向 Redis 建立 socket时,aeEventLoop 会调用 acceptTcpHandler 处理函数,服务器会为每个链接创建一个 Client 对象,并创建相应文件事件来监听socket的可读事件,并指定事件处理函数。

acceptTcpHandler 函数会首先调用 anetTcpAccept方法,它底层会调用 socket 的 accept 方法,也就是接受客户端来的建立连接请求,然后调用 acceptCommonHandler方法,继续后续的逻辑处理。

// 当客户端建立链接时进行的eventloop处理函数  networking.c
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    ....
    // 层层调用,最后在anet.c 中 anetGenericAccept 方法中调用 socket 的 accept 方法
    cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
    if (cfd == ANET_ERR) {
        if (errno != EWOULDBLOCK)
            serverLog(LL_WARNING,
                "Accepting client connection: %s", server.neterr);
        return;
    }
    serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
    /**
     * 进行socket 建立连接后的处理
     */
    acceptCommonHandler(cfd,0,cip);
}

acceptCommonHandler 则首先调用 createClient 创建 client,接着判断当前 client 的数量是否超出了配置的 maxclients,如果超过,则给客户端发送错误信息,并且释放 client。

static void acceptCommonHandler(int fd, int flags, char *ip) { //networking.c
    client *c;
    // 创建redisClient
    c = createClient(fd)
    // 当 maxClient 属性被设置,并且client数量已经超出时,给client发送error,然后释放连接
    if (listLength(server.clients) > server.maxclients) {
        char *err = "-ERR max number of clients reached\r\n";
        if (write(c->fd,err,strlen(err)) == -1) {
        }
        server.stat_rejected_conn++;
        freeClient(c);
        return;
    }
    .... // 处理为设置密码时默认保护状态的客户端连接
    // 统计连接数
    server.stat_numconnections++;
    c->flags |= flags;
}

createClient 方法用于创建 client,它代表着连接到 Redis 客户端,每个客户端都有各自的输入缓冲区和输出缓冲区,输入缓冲区存储客户端通过 socket 发送过来的数据,输出缓冲区则存储着 Redis 对客户端的响应数据。client一共有三种类型,不同类型的对应缓冲区的大小都不同。

  • 普通客户端是除了复制和订阅的客户端之外的所有连接
  • 从客户端用于主从复制,主节点会为每个从节点单独建立一条连接用于命令复制
  • 订阅客户端用于发布订阅功能

createClient 方法除了创建 client 结构体并设置其属性值外,还会对 socket进行配置并注册读事件处理器

设置 socket 为 非阻塞 socket、设置 NO_DELAY 和 SO_KEEPALIVE标志位来关闭 Nagle 算法并且启动 socket 存活检查机制。

设置读事件处理器,当客户端通过 socket 发送来数据后,Redis 会调用 readQueryFromClient 方法。

client *createClient(int fd) {
    client *c = zmalloc(sizeof(client));
    // fd 为 -1,表示其他特殊情况创建的client,redis在进行比如lua脚本执行之类的情况下也会创建client
    if (fd != -1) {
        // 配置socket为非阻塞、NO_DELAY不开启Nagle算法和SO_KEEPALIVE
        anetNonBlock(NULL,fd);
        anetEnableTcpNoDelay(NULL,fd);
        if (server.tcpkeepalive)
            anetKeepAlive(NULL,fd,server.tcpkeepalive);
        /**
         * 向 eventLoop 中注册了 readQueryFromClient。
         * readQueryFromClient 的作用就是从client中读取客户端的查询缓冲区内容。
         * 绑定读事件到事件 loop (开始接收命令请求)
         */
        if (aeCreateFileEvent(server.el,fd,AE_READABLE,
            readQueryFromClient, c) == AE_ERR)
        {
            close(fd);
            zfree(c);
            return NULL;
        }
    }
    // 默认选择数据库
    selectDb(c,0);
    uint64_t client_id;
    atomicGetIncr(server.next_client_id,client_id,1);
    c->id = client_id;
    c->fd = fd;
    .... // 设置client的属性
    return c;
}

client 的属性中有很多属性,比如后边会看到的输入缓冲区 querybuf 和输出缓冲区 buf,这里因为代码过长做了省略,感兴趣的同学可以自行阅读源码。

读取socket数据到输入缓冲区

readQueryFromClient 方法会调用 read 方法从 socket 中读取数据到输入缓冲区中,然后判断其大小是否大于系统设置的 client_max_querybuf_len,如果大于,则向 Redis返回错误信息,并关闭 client。

将数据读取到输入缓冲区后,readQueryFromClient 方法会根据 client 的类型来做不同的处理,如果是普通类型,则直接调用 processInputBuffer 来处理;如果是主从客户端,还需要将命令同步到自己的从服务器中。也就是说,Redis实例将主实例传来的命令执行后,继续将命令同步给自己的从实例。

// 处理从client中读取客户端的输入缓冲区内容。
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    client *c = (client*) privdata;
    ....
    if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
    c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
    // 从 fd 对应的socket中读取到 client 中的 querybuf 输入缓冲区
    nread = read(fd, c->querybuf+qblen, readlen);
    if (nread == -1) {
        .... // 出错释放 client
    } else if (nread == 0) {
        // 客户端主动关闭 connection
        serverLog(LL_VERBOSE, "Client closed connection");
        freeClient(c);
        return;
    } else if (c->flags & CLIENT_MASTER) { 
        /*
         * 当这个client代表主从的master节点时,将query buffer和 pending_querybuf结合
         * 用于主从复制中的命令传播????
         */
        c->pending_querybuf = sdscatlen(c->pending_querybuf,
                                        c->querybuf+qblen,nread);
    }
    // 增加已经读取的字节数
    sdsIncrLen(c->querybuf,nread);
    c->lastinteraction = server.unixtime;
    if (c->flags & CLIENT_MASTER) c->read_reploff += nread;
    server.stat_net_input_bytes += nread;
    // 如果大于系统配置的最大客户端缓存区大小,也就是配置文件中的client-query-buffer-limit
    if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
        sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();
        // 返回错误信息,并且关闭client
        bytes = sdscatrepr(bytes,c->querybuf,64);
        serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
        sdsfree(ci);
        sdsfree(bytes);
        freeClient(c);
        return;
    }

    
    if (!(c->flags & CLIENT_MASTER)) {
        // processInputBuffer 处理输入缓冲区
        processInputBuffer(c);
    } else {
        // 如果client是master的连接
        size_t prev_offset = c->reploff;
        processInputBuffer(c);
        // 判断是否同步偏移量发生变化,则通知到后续的slave
        size_t applied = c->reploff - prev_offset;

        if (applied) {
            replicationFeedSlavesFromMasterStream(server.slaves,
                    c->pending_querybuf, applied);
            sdsrange(c->pending_querybuf,applied,-1);
        }
    }
}

解析获取命令

processInputBuffer 主要是将输入缓冲区中的数据解析成对应的命令,根据命令类型是 PROTO_REQ_MULTIBULK 还是 PROTO_REQ_INLINE,来分别调用 processInlineBuffer 和 processMultibulkBuffer 方法来解析命令。

然后调用 processCommand 方法来执行命令。执行成功后,如果是主从客户端,还需要更新同步偏移量 reploff 属性,然后重置 client,让client可以接收一条命令。

void processInputBuffer(client *c) { // networking.c
    server.current_client = c;
    /* 当缓冲区中还有数据时就一直处理 */
    while(sdslen(c->querybuf)) {
        .... // 处理 client 的各种状态
        /* 判断命令请求类型 telnet发送的命令和redis-cli发送的命令请求格式不同 */
        if (!c->reqtype) {
            if (c->querybuf[0] == '*') {
                c->reqtype = PROTO_REQ_MULTIBULK;
            } else {
                c->reqtype = PROTO_REQ_INLINE;
            }
        }
        /**
         * 从缓冲区解析命令
         */
        if (c->reqtype == PROTO_REQ_INLINE) {
            if (processInlineBuffer(c) != C_OK) break;
        } else if (c->reqtype == PROTO_REQ_MULTIBULK) {
            if (processMultibulkBuffer(c) != C_OK) break;
        } else {
            serverPanic("Unknown request type");
        }

        /* 参数个数为0时重置client,可以接受下一个命令 */
        if (c->argc == 0) {
            resetClient(c);
        } else {
            // 执行命令
            if (processCommand(c) == C_OK) {
                if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {
                    // 如果是master的client发来的命令,则 更新 reploff
                    c->reploff = c->read_reploff - sdslen(c->querybuf);
                }

                // 如果不是阻塞状态,则重置client,可以接受下一个命令
                if (!(c->flags & CLIENT_BLOCKED) || c->btype != BLOCKED_MODULE)
                    resetClient(c);
            }
        }
    }
    server.current_client = NULL;
}

解析命令暂时不看,就是将 redis 命令文本信息,记录到client的argv/argc属性中

执行命令

processCommand 方法会处理很多逻辑,不过大致可以分为三个部分:首先是调用 lookupCommand 方法获得对应的 redisCommand;接着是检测当前 Redis 是否可以执行该命令;最后是调用 call 方法真正执行命令。

processCommand会做如下逻辑处理:

  • 1 如果命令名称为 quit,则直接返回,并且设置客户端标志位。
  • 2 根据 argv[0] 查找对应的 redisCommand,所有的命令都存储在命令字典 redisCommandTable 中,根据命令名称可以获取对应的命令。
  • 3 进行用户权限校验。
  • 4 如果是集群模式,处理集群重定向。当命令发送者是 master 或者 命令没有任何 key 的参数时可以不重定向。
  • 5 预防 maxmemory 情况,先尝试回收一下,如果不行,则返回异常。
  • 6 当此服务器是 master 时:aof 持久化失败时,或上一次 bgsave 执行错误,且配置 bgsave 参数和 stop_writes_on_bgsave_err;禁止执行写命令。
  • 7 当此服务器时master时:如果配置了 repl_min_slaves_to_write,当slave数目小于时,禁止执行写命令。
  • 8 当时只读slave时,除了 master 的不接受其他写命令。
  • 9 当客户端正在订阅频道时,只会执行部分命令。
  • 10 服务器为slave,但是没有连接 master 时,只会执行带有 CMD_STALE 标志的命令,如 info 等
  • 11 正在加载数据库时,只会执行带有 CMD_LOADING 标志的命令,其余都会被拒绝。
  • 12 当服务器因为执行lua脚本阻塞时,只会执行部分命令,其余都会拒绝
  • 13 如果是事务命令,则开启事务,命令进入等待队列;否则直接执行命令。
int processCommand(client *c) {
    // 1 处理 quit 命令
    if (!strcasecmp(c->argv[0]->ptr,"quit")) {
        addReply(c,shared.ok);
        c->flags |= CLIENT_CLOSE_AFTER_REPLY;
        return C_ERR;
    }

    /**
     * 根据 argv[0] 查找对应的 command
     * 2 命令字典查找指定命令;所有的命令都存储在命令字典中 struct redisCommand redisCommandTable[]={}
     */
    c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
    if (!c->cmd) {
        // 处理未知命令
    } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
               (c->argc < -c->cmd->arity)) {
        // 处理参数错误
    }
    // 3 检查用户验证
    if (server.requirepass && !c->authenticated && c->cmd->proc != authCommand)
    {
        flagTransaction(c);
        addReply(c,shared.noautherr);
        return C_OK;
    }

    /**
     * 4 如果是集群模式,处理集群重定向。当命令发送者是master或者 命令没有任何key的参数时可以不重定向
     */
    if (server.cluster_enabled &&
        !(c->flags & CLIENT_MASTER) &&
        !(c->flags & CLIENT_LUA &&
          server.lua_caller->flags & CLIENT_MASTER) &&
        !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0 &&
          c->cmd->proc != execCommand))
    {
        int hashslot;
        int error_code;
        // 查询可以执行的node信息
        clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,
                                        &hashslot,&error_code);
        if (n == NULL || n != server.cluster->myself) {
            if (c->cmd->proc == execCommand) {
                discardTransaction(c);
            } else {
                flagTransaction(c);
            }
            clusterRedirectClient(c,n,hashslot,error_code);
            return C_OK;
        }
    }

    // 5 处理maxmemory请求,先尝试回收一下,如果不行,则返回异常
    if (server.maxmemory) {
        int retval = freeMemoryIfNeeded();
        ....
    }

    /**
     * 6 当此服务器是master时:aof持久化失败时,或上一次bgsave执行错误,
     * 且配置bgsave参数和stop_writes_on_bgsave_err;禁止执行写命令
     */
    if (((server.stop_writes_on_bgsave_err &&
          server.saveparamslen > 0 &&
          server.lastbgsave_status == C_ERR) ||
          server.aof_last_write_status == C_ERR) &&
        server.masterhost == NULL &&
        (c->cmd->flags & CMD_WRITE ||
         c->cmd->proc == pingCommand)) { .... }

    /**
     * 7 当此服务器时master时:如果配置了repl_min_slaves_to_write,
     * 当slave数目小于时,禁止执行写命令
     */
    if (server.masterhost == NULL &&
        server.repl_min_slaves_to_write &&
        server.repl_min_slaves_max_lag &&
        c->cmd->flags & CMD_WRITE &&
        server.repl_good_slaves_count < server.repl_min_slaves_to_write) { .... }

    /**
     * 8 当时只读slave时,除了master的不接受其他写命令
     */
    if (server.masterhost && server.repl_slave_ro &&
        !(c->flags & CLIENT_MASTER) &&
        c->cmd->flags & CMD_WRITE) { .... }

    /**
     * 9 当客户端正在订阅频道时,只会执行以下命令
     */
    if (c->flags & CLIENT_PUBSUB &&
        c->cmd->proc != pingCommand &&
        c->cmd->proc != subscribeCommand &&
        c->cmd->proc != unsubscribeCommand &&
        c->cmd->proc != psubscribeCommand &&
        c->cmd->proc != punsubscribeCommand) { .... }
    /**
     * 10 服务器为slave,但没有正确连接master时,只会执行带有CMD_STALE标志的命令,如info等
     */
    if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED &&
        server.repl_serve_stale_data == 0 &&
        !(c->cmd->flags & CMD_STALE)) {...}
    /**
     * 11 正在加载数据库时,只会执行带有CMD_LOADING标志的命令,其余都会被拒绝
     */
    if (server.loading && !(c->cmd->flags & CMD_LOADING)) { .... }
    /**
     * 12 当服务器因为执行lua脚本阻塞时,只会执行以下几个命令,其余都会拒绝
     */
    if (server.lua_timedout &&
          c->cmd->proc != authCommand &&
          c->cmd->proc != replconfCommand &&
        !(c->cmd->proc == shutdownCommand &&
          c->argc == 2 &&
          tolower(((char*)c->argv[1]->ptr)[0]) == 'n') &&
        !(c->cmd->proc == scriptCommand &&
          c->argc == 2 &&
          tolower(((char*)c->argv[1]->ptr)[0]) == 'k')) {....}

    /**
     * 13 开始执行命令
     */
    if (c->flags & CLIENT_MULTI &&
        c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
        c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
    {
        /**
         * 开启了事务,命令只会入队列
         */
        queueMultiCommand(c);
        addReply(c,shared.queued);
    } else {
        /**
         * 直接执行命令
         */
        call(c,CMD_CALL_FULL);
        c->woff = server.master_repl_offset;
        if (listLength(server.ready_keys))
            handleClientsBlockedOnLists();
    }
    return C_OK;
}


struct redisCommand redisCommandTable[] = {
    {"get",getCommand,2,"rF",0,NULL,1,1,1,0,0},
    {"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0},
    {"hmset",hsetCommand,-4,"wmF",0,NULL,1,1,1,0,0},
    .... // 所有的 redis 命令都有
}

call 方法是 Redis 中执行命令的通用方法,它会处理通用的执行命令的前置和后续操作。

  • 如果有监视器 monitor,则需要将命令发送给监视器。
  • 调用 redisCommand 的proc 方法,执行对应具体的命令逻辑。
  • 如果开启了 CMD_CALL_SLOWLOG,则需要记录慢查询日志
  • 如果开启了 CMD_CALL_STATS,则需要记录一些统计信息
  • 如果开启了 CMD_CALL_PROPAGATE,则当 dirty大于0时,需要调用 propagate 方法来进行命令传播。

命令传播就是将命令写入 repl-backlog-buffer 缓冲中,并发送给各个从服务器中。

// 执行client中持有的 redisCommand 命令
void call(client *c, int flags) {
    /**
     * dirty记录数据库修改次数;start记录命令开始执行时间us;duration记录命令执行花费时间
     */
    long long dirty, start, duration;
    int client_old_flags = c->flags;

    /**
     * 有监视器的话,需要将不是从AOF获取的命令会发送给监视器。当然,这里会消耗时间
     */
    if (listLength(server.monitors) &&
        !server.loading &&
        !(c->cmd->flags & (CMD_SKIP_MONITOR|CMD_ADMIN)))
    {
        replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
    }
    ....
    /* Call the command. */
    dirty = server.dirty;
    start = ustime();
    // 处理命令,调用命令处理函数
    c->cmd->proc(c);
    duration = ustime()-start;
    dirty = server.dirty-dirty;
    if (dirty < 0) dirty = 0;

    .... // Lua 脚本的一些特殊处理

    /**
     * CMD_CALL_SLOWLOG 表示要记录慢查询日志
     */
    if (flags & CMD_CALL_SLOWLOG && c->cmd->proc != execCommand) {
        char *latency_event = (c->cmd->flags & CMD_FAST) ?
                              "fast-command" : "command";
        latencyAddSampleIfNeeded(latency_event,duration/1000);
        slowlogPushEntryIfNeeded(c,c->argv,c->argc,duration);
    }
    /**
     * CMD_CALL_STATS 表示要统计
     */
    if (flags & CMD_CALL_STATS) {
        c->lastcmd->microseconds += duration;
        c->lastcmd->calls++;
    }
    /**
     * CMD_CALL_PROPAGATE表示要进行广播命令
     */
    if (flags & CMD_CALL_PROPAGATE &&
        (c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP)
    {
        int propagate_flags = PROPAGATE_NONE;
        /**
         * dirty大于0时,需要广播命令给slave和aof
         */
        if (dirty) propagate_flags |= (PROPAGATE_AOF|PROPAGATE_REPL);
        .... 
        /**
         * 广播命令,写如aof,发送命令到slave
         * 也就是传说中的传播命令
         */
        if (propagate_flags != PROPAGATE_NONE && !(c->cmd->flags & CMD_MODULE))
            propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags);
    }
    ....
}

由于文章篇幅问题,本篇文章就先讲到这里,后半部分在接下来的文章中进行讲解,欢迎大家继续关注。

Share

Java 数据持久化系列之JDBC

前段时间小冰在工作中遇到了一系列关于数据持久化的问题,在排查问题时发现自己对 Java 后端的数据持久化框架的原理都不太了解,只有不断试错,因此走了很多弯路。于是下定决心,集中精力学习了持久化相关框架的原理和实现,总结出这个系列。

上图是我根据相关源码和网上资料总结的有关 Java 数据持久化的架构图(只代表本人想法,如有问题,欢迎留言指出)。最下层就是今天要讲的 JDBC,上一层是数据库连接池层,包括 HikariCP 和 Druid等;再上一层是分库分表中间件,比如说 ShardingJDBC;再向上是对象关系映射层,也就是 ORM,包括 Mybatis 和 JPA;最上边是 Spring 的事务管理。

本系列的文章会依次讲解图中各个开源框架的基础使用,然后描述其原理和代码实现,最后会着重分析它们之间是如何相互集成和配合的。

废话不多说,我们先来看 JDBC。

JDBC 定义

JDBC是Java Database Connectivity的简称,它定义了一套访问数据库的规范和接口。但它自身不参与数据库访问的实现。因此对于目前存在的数据库(譬如Mysql、Oracle)来说,要么数据库制造商本身提供这些规范与接口的实现,要么社区提供这些实现。

如上图所示,Java 程序只依赖于 JDBC API,通过 DriverManager 来获取驱动,并且针对不同的数据库可以使用不同的驱动。这是典型的桥接的设计模式,把抽象 Abstraction 与行为实现Implementation 分离开来,从而可以保持各部分的独立性以及应对他们的功能扩展。

JDBC 基础代码示例

单纯使用 JDBC 的代码逻辑十分简单,我们就以最为常用的MySQL 为例,展示一下使用 JDBC 来建立数据库连接、执行查询语句和遍历结果的过程。

public static void connectionTest(){

    Connection connection = null;
    Statement statement = null;
    ResultSet resultSet = null;

    try {
        // 1. 加载并注册 MySQL 的驱动
        Class.forName("com.mysql.cj.jdbc.Driver").newInstance();

        // 2. 根据特定的数据库连接URL,返回与此URL的所匹配的数据库驱动对象
        Driver driver = DriverManager.getDriver(URL);
        // 3. 传入参数,比如说用户名和密码
        Properties props = new Properties();
        props.put("user", USER_NAME);
        props.put("password", PASSWORD);

        // 4. 使用数据库驱动创建数据库连接 Connection
        connection = driver.connect(URL, props);

        // 5. 从数据库连接 connection 中获得 Statement 对象
        statement = connection.createStatement();
        // 6. 执行 sql 语句,返回结果
        resultSet = statement.executeQuery("select * from activity");
        // 7. 处理结果,取出数据
        while(resultSet.next())
        {
            System.out.println(resultSet.getString(2));
        }

        .....
    }finally{
        // 8.关闭链接,释放资源  按照JDBC的规范,使用完成后管理链接,
        // 释放资源,释放顺序应该是: ResultSet ->Statement ->Connection
        resultSet.close();
        statement.close();
        connection.close();
    }
}

代码中有详细的注释描述每一步的过程,相信大家也都对这段代码十分熟悉。

唯一要提醒的是使用完之后的资源释放顺序。按照 JDBC 规范,应该依次释放 ResultSet,Statement 和 Connection。当然这只是规范,很多开源框架都没有严格的执行,但是 HikariCP却严格准守了,它可以带来很多优势,这些会在之后的文章中讲解。

上图是 JDBC 中核心的 5 个类或者接口的关系,它们分别是 DriverManager、Driver、Connection、Statement 和 ResultSet。

DriverManager 负责管理数据库驱动程序,根据 URL 获取与之匹配的 Driver 具体实现。Driver 则负责处理与具体数据库的通信细节,根据 URL 创建数据库连接 Connection。

Connection 表示与数据库的一个连接会话,可以和数据库进行数据交互。Statement 是需要执行的 SQL 语句或者存储过程语句对应的实体,可以执行对应的 SQL 语句。ResultSet 则是 Statement 执行后获得的结果集对象,可以使用迭代器从中遍历数据。

不同数据库的驱动都会实现各自的 Driver、Connection、Statement 和 ResultSet。而更为重要的是,众多数据库连接池和分库分表框架也都是实现了自己的 Connection、Statement 和 ResultSet,比如说 HikariCP、Druid 和 ShardingJDBC。我们接下来会经常看到它们的身影。

接下来,我们依次看一下这几个类及其涉及的操作的原理和源码实现。

载入 Driver 实现

可以直接使用 Class#forName的方式来载入驱动实现,或者在 JDBC 4.0 后则基于 SPI 机制来导入驱动实现,通过在 META-INF/services/java.sql.Driver 文件中指定实现类的方式来导入驱动实现,下面我们就来看一下两种方式的实现原理。

Class#forName 作用是要求 JVM 查找并加载指定的类,如果在类中有静态初始化器的话,JVM 会执行该类的静态代码段。加载具体 Driver 实现时,就会执行 Driver 中的静态代码段,将该 Driver 实现注册到 DriverManager 中。我们来看一下 MySQL 对应 Driver 的具体代码。它就是直接调用了 DriverManager的 registerDriver 方法将自己注册到其维护的驱动列表中。

public class Driver extends NonRegisteringDriver implements java.sql.Driver {
    public Driver() throws SQLException {
    }

    static {
        // 直接调用 DriverManager的 registerDriver 将自己注册到其中
        DriverManager.registerDriver(new Driver());
    }
}

SPI 机制使用 ServiceLoader 类来提供服务发现机制,动态地为某个接口寻找服务实现。当服务的提供者提供了服务接口的一种实现之后,必须根据 SPI 约定在 META-INF/services 目录下创建一个以服务接口命名的文件,在该文件中写入实现该服务接口的具体实现类。当服务调用 ServiceLoader 的 load 方法的时候,ServiceLoader 能够通过约定的目录找到指定的文件,并装载实例化,完成服务的发现。

DriverManager 中的 loadInitialDrivers 方法会使用 ServiceLoader 的 load 方法加载目前项目路径下的所有 Driver 实现。

public class DriverManager {
    // 程序中已经注册的Driver具体实现信息列表。registerDriver类就是将Driver加入到这个列表
    private final static CopyOnWriteArrayList<DriverInfo> registeredDrivers = new CopyOnWriteArrayList<>();
    // 使用ServiceLoader 加载具体的jdbc driver实现
    static {
        loadInitialDrivers();
    }
    private static void loadInitialDrivers() {
        // 省略了异常处理
        // 获得系统属性 jdbc.drivers 配置的值
        String drivers = AccessController.doPrivileged(new PrivilegedAction<String>() {
            public String run() {
                return System.getProperty("jdbc.drivers");
            }
        });

        AccessController.doPrivileged(new PrivilegedAction<Void>() {
            public Void run() {

                ServiceLoader<Driver> loadedDrivers = ServiceLoader.load(Driver.class);
                Iterator<Driver> driversIterator = loadedDrivers.iterator();
                // 通过 ServiceLoader 获取到Driver的具体实现类,然后加载这些类,会调用其静态代码块
                while(driversIterator.hasNext()) {
                    driversIterator.next();
                }
                return null;
            }
        });

        String[] driversList = drivers.split(":");
        // for 循环加载系统属性中的Driver类。
        for (String aDriver : driversList) {
            println("DriverManager.Initialize: loading " + aDriver);
            Class.forName(aDriver, true,
                    ClassLoader.getSystemClassLoader());
        }
    }
}

比如说,项目引用了 MySQL 的 jar包 mysql-connector-java,在这个 jar 包的 META-INF/services 文件夹下有一个叫 java.sql.Driver 的文件,文件的内容为 com.mysql.cj.jdbc.Driver。而 ServiceLoader 的 load 方法找到这个文件夹下的文件,读取文件的内容,然后加载出文件内容所指定的 Driver 实现。而正如之前所分析的,这个 Driver 类被加载时,会调用 DriverManager 的 registerDriver 方法,从而完成了驱动的加载。

Connection、Statement 和 ResultSet

当程序加载完具体驱动实现后,接下来就是建立与数据库的连接,执行 SQL 语句并且处理返回结果了,其过程如下图所示。

建立 Connection

创建 Connection 连接对象,可以使用 Driver 的 connect 方法,也可以使用 DriverManager 提供的 getConnection 方法,此方法通过 url 自动匹配对应的驱动 Driver 实例,然后还是调用对应的 connect 方法返回 Connection 对象实例。

建立 Connection 会涉及到与数据库进行网络请求等大量费时的操作,为了提升性能,往往都会引入数据库连接池,也就是说复用 Connection,免去每次都创建 Connection 所消耗的时间和系统资源。

Connection 默认情况下,对于创建的 Statement 执行的 SQL 语句都是自动提交事务的,即在 Statement 语句执行完后,自动执行 commit 操作,将事务提交,结果影响到物理数据库。为了满足更好地事务控制需求,我们也可以手动地控制事务,手动地在Statement 的 SQL 语句执行后进行 commit 或者rollback。

connection = driver.connect(URL, props);
// 将自动提交关闭
connection.setAutoCommit(false);

statement = connection.createStatement();
statement.execute("INSERT INTO activity (activity_id, activity_name, product_id, start_time, end_time, total, status, sec_speed, buy_limit, buy_rate) VALUES (1, '香蕉大甩卖', 1, 530871061, 530872061, 20, 0, 1, 1, 0.20);");
// 执行后手动 commit
statement.getConnection().commit();

Statement

Statement 的功能在于根据传入的 SQL 语句,将传入 SQL 经过整理组合成数据库能够识别的执行语句(对于静态的 SQL 语句,不需要整理组合;而对于预编译SQL 语句和批量语句,则需要整理),然后传递 SQL 请求,之后会得到返回的结果。对于查询 SQL,结果会以 ResultSet 的形式返回。

当你创建了一个 Statement 对象之后,你可以用它的三个执行方法的任一方法来执行 SQL 语句。

  • boolean execute(String SQL) : 如果 ResultSet 对象可以被检索,则返回的布尔值为 true ,否则返回 false 。当你需要使用真正的动态 SQL 时,可以使用这个方法来执行 SQL DDL 语句。
  • int executeUpdate(String SQL) : 返回执行 SQL 语句影响的行的数目。使用该方法来执行 SQL 语句,是希望得到一些受影响的行的数目,例如,INSERT,UPDATE 或 DELETE 语句。
  • ResultSet executeQuery(String SQL) : 返回一个 ResultSet 对象。当你希望得到一个结果集时使用该方法,就像你使用一个 SELECT 语句。

对于不同类型的 SQL 语句,Statement 有不同的接口与其对应。

接口 介绍
Statement 适合运行静态 SQL 语句,不接受动态参数
PreparedStatement 计划多次使用并且预先编译的 SQL 语句,接口需要传入额外的参数
CallableStatement 用于访问数据库存储过程

Statement 主要用于执行静态SQL语句,即内容固定不变的SQL语句。Statement每执行一次都要对传入的SQL语句编译一次,效率较差。而 PreparedStatement则解决了这个问题,它会对 SQL 进行预编译,提高了执行效率。

PreparedStatement pstmt = null;
    try {
        String SQL = "Update activity SET activity_name = ? WHERE activity_id = ?";
        pstmt = connection.prepareStatement(SQL);
        pstmt.setString(1, "测试");
        pstmt.setInt(2, 1);
        pstmt.executeUpdate();
    }
    catch (SQLException e) {
    }
    finally {
        pstmt.close();
    }
}

除此之外, PreparedStatement 还可以预防 SQL 注入,因为 PreparedStatement 不允许在插入参数时改变 SQL 语句的逻辑结构。

PreparedStatement 传入任何数据不会和原 SQL 语句发生匹配关系,无需对输入的数据做过滤。如果用户将”or 1 = 1”传入赋值给占位符,下述SQL 语句将无法执行:select * from t where username = ? and password = ?。

ResultSet

当 Statement 查询 SQL 执行后,会得到 ResultSet 对象,ResultSet 对象是 SQL语句查询的结果集合。ResultSet 对从数据库返回的结果进行了封装,使用迭代器的模式可以逐条取出结果集中的记录。

while(resultSet.next()) {
    System.out.println(resultSet.getString(2));
}

ResultSet 一般也建议使用完毕直接 close 掉,但是需要注意的是关闭 ResultSet 对象不关闭其持有的 Blob、Clob 或 NClob 对象。 Blob、Clob 或 NClob 对象在它们被创建的的事务期间会一直持有效,除非其 free 函数被调用。

参考

Share

当 Redis 发生高延迟时,到底发生了什么

Redis 是一种内存数据库,将数据保存在内存中,读写效率要比传统的将数据保存在磁盘上的数据库要快很多。但是 Redis 也会发生延迟时,这是就需要我们对其产生原因有深刻的了解,以便于快速排查问题,解决 Redis的延迟问题

一条命令执行过程

在本文场景下,延迟 (latency) 是指从客户端发送命令到客户端接收到命令返回值的时间间隔。所以我们先来看一下 Redis 一条命令执行的步骤,其中每个步骤出问题都可能导致高延迟。

上图是 Redis 客户端发送一条命令的执行过程示意图,绿色的是执行步骤,而蓝色的则是可能出现的导致高延迟的原因。

网络连接限制、网络传输速率和CPU性能等是所有服务端都可能产生的性能问题。但是 Redis 有自己独有的可能导致高延迟的问题:命令或者数据结构误用、持久化阻塞和内存交换。

而且更为致命的是,Redis 采用单线程和事件驱动的机制来处理网络请求,分别有对应的连接应答处理器,命令请求处理器和命令回复处理器来处理客户端的网络请求事件,处理完一个事件就继续处理队列中的下一个。一条命令处理出现了高延迟会影响接下来处于排队状态的其他命令。有关 Redis 事件处理机制的可以参考本篇文章

对于高延迟,Redis 原生提供慢查询统计功能,执行 slowlog get {n} 命令可以获取最近的 n 条慢查询命令,默认对于执行超过10毫秒(可配置)的命令都会记录到一个定长队列中,线上实例建议设置为1毫秒便于及时发现毫秒级以上的命令。

# 超过 slowlog-log-slower-than 阈值的命令都会被记录到慢查询队列中
# 队列最大长度为 slowlog-max-len
slowlog-log-slower-than 10000
slowlog-max-len 128

如果命令执行时间在毫秒级,则实例实际OPS只有1000左右。慢查询队列长度默认128,可适当调大。慢查询本身只记录了命令执行时间,不包括数据网络传输时间和命令排队时间,因此客户端发生阻塞异常 后,可能不是当前命令缓慢,而是在等待其他命令执行。需要重点比对异常和慢查询发生的时间点,确认是否有慢查询造成的命令阻塞排队。

slowlog的输出格式如下所示。第一个字段表示该条记录在所有慢日志中的序号,最新的记录被展示在最前面;第二个字段是这条记录被记录时的系统时间,可以用 date 命令来将其转换为友好的格式第三个字段表示这条命令的响应时间,单位为 us (微秒);第四个字段为对应的 Redis 操作。

> slowlog get
1) 1) (integer) 26
   2) (integer) 1450253133
   3) (integer) 43097
   4) 1) "flushdb"

下面我们就来依次看一下不合理地使用命令或者数据结构、持久化阻塞和内存交换所导致的高延迟问题。

不合理的命令或者数据结构

一般来说 Redis 执行命令速度都非常快,但是当数据量达到一定级别时,某些命令的执行就会花费大量时间,比如对一个包含上万个元素的 hash 结构执行 hgetall 操作,由于数据量比较大且命令算法复杂度是 O(n),这条命令执行速度必然很慢。

这个问题就是典型的不合理使用命令和数据结构。对于高并发的场景我们应该尽量避免在大对象上执行算法复杂度超过 O(n) 的命令。对于键值较多的 hash 结构可以使用 scan 系列命令来逐步遍历,而不是直接使用 hgetall 来全部获取。

Redis 本身提供发现大对象的工具,对应命令:redis-cli-h {ip} -p {port} bigkeys。这条命令会使用 scan 从指定的 Redis DB 中持续采样,实时输出当时得到的 value 占用空间最大的 key 值,并在最后给出各种数据结构的 biggest key 的总结报告。

> redis-cli -h host -p 12345 --bigkeys

# Scanning the entire keyspace to find biggest keys as well as
# average sizes per key type.  You can use -i 0.1 to sleep 0.1 sec
# per 100 SCAN commands (not usually needed).

[00.00%] Biggest hash   found so far 'idx:user' with 1 fields
[00.00%] Biggest hash   found so far 'idx:product' with 3 fields
[00.00%] Biggest hash   found so far 'idx:order' with 14 fields
[02.29%] Biggest hash   found so far 'idx:fund' with 16 fields
[02.29%] Biggest hash   found so far 'idx:pay' with 69 fields
[04.45%] Biggest set    found so far 'indexed_word_set' with 1482 members
[05.93%] Biggest hash   found so far 'idx:address' with 159 fields
[11.79%] Biggest hash   found so far 'idx:reply' with 196 fields

-------- summary -------

Sampled 1484 keys in the keyspace!
Total key length in bytes is 13488 (avg len 9.09)

Biggest    set found 'indexed_word_set' has 1482 members
Biggest   hash found 'idx:的' has 196 fields

0 strings with 0 bytes (00.00% of keys, avg size 0.00)
0 lists with 0 items (00.00% of keys, avg size 0.00)
2 sets with 1710 members (00.13% of keys, avg size 855.00)
1482 hashs with 6731 fields (99.87% of keys, avg size 4.54)
0 zsets with 0 members (00.00% of keys, avg size 0.00)

持久化阻塞

对于开启了持久化功能的Redis节点,需要排查是否是持久化导致的阻 塞。持久化引起主线程阻塞的操作主要有:fork 阻塞、AOF刷盘阻塞。

fork 操作发生在 RDB 和 AOF 重写时,Redis 主线程调用 fork 操作产生共享内存的子进程,由子进程完成对应的持久化工作。如果 fork 操作本身耗时过长,必然会导致主线程的阻塞。

Redis 执行 fork 操作产生的子进程内存占用量表现为与父进程相同,理论上需要一倍的物理内存来完成相应的操作。但是 Linux 具有写时复制技术 (copy-on-write),父子进程会共享相同的物理内存页,当父进程处理写请求时会对需要修改的页复制出一份副本完成写操作,而子进程依然读取 fork 时整个父进程的内存快照。所以,一般来说,fork 不会消耗过多时间。

可以执行info stats命令获取到 latest_fork_usec 指标,表示 Redis 最近一次 fork 操作耗时,如果耗时很大,比如超过1秒,则需要做出优化调整。

> redis-cli -c -p 7000 info | grep -w latest_fork_usec
latest_fork_usec:315

当我们开启AOF持久化功能时,文件刷盘的方式一般采用每秒一次,后 台线程每秒对AOF文件做 fsync 操作。当硬盘压力过大时,fsync 操作需要等待,直到写入完成。如果主线程发现距离上一次的 fsync 成功超过2秒,为了数据安全性它会阻塞直到后台线程执行 fsync 操作完成。这种阻塞行为主要是硬盘压力引起,可以查看 Redis日志识别出这种情况,当发生这种阻塞行为时,会打印如下日志:

Asynchronous AOF fsync is taking too long (disk is busy). \ 
Writing the AOF buffer without waiting for fsync to complete, \
this may slow down Redis.

也可以查看 info persistence 统计中的 aof_delayed_fsync 指标,每次发生 fdatasync 阻塞主线程时会累加。

>info persistence
loading:0
aof_pending_bio_fsync:0
aof_delayed_fsync:0

内存交换

内存交换(swap)对于 Redis 来说是非常致命的,Redis 保证高性能的一个重要前提是所有的数据在内存中。如果操作系统把 Redis 使用的部分内存换出到硬盘,由于内存与硬盘读写速度差几个数量级,会导致发生交换后的 Redis 性能急剧下降。识别 Redis 内存交换的检查方法如下:

>redis-cli -p 6383 info server | grep process_id # 查询 redis 进程号
>cat /proc/4476/smaps | grep Swap # 查询内存交换大小
Swap: 0 kB 
Swap: 4 kB 
Swap: 0 kB 
Swap: 0 kB

如果交换量都是0KB或者个别的是4KB,则是正常现象,说明Redis进程内存没有被交换。

有很多方法可以避免内存交换的发生。比如说:

  • 保证机器充足的可用内存
  • 确保所有Redis实例设置最大可用内存(maxmemory),防止极端情况下 Redis 内存不可控的增长。
  • 降低系统使用swap优先级,如echo10>/proc/sys/vm/swappiness

参考

Share

公理设计-由奇怪海战引发的软件设计思考

前几天看到了一个博客,推荐了《公理设计》一书,还有其相关的文档以及视频
)。简单了解了一下,增深了一些对软件设计的理解,特此也推荐给大家。

公理设计理论将设计建立在科学公理、定理和推论的基础上,由麻省理工学院教授 Nam. P. Suh 领导的研究小组于 1978 年提出,适用于各种类别的设计活动。软件设计当然也属于一类工程设计过程,下面我们就来看一下两者的关联。

奇怪的海战

首先从1862年11月13日的一场海战讲起。这场海战“标志着蒸汽动力铁甲舰新时代的到来。为了便于理解,我这里对舰船名称进行了修改,想了解的朋友可以百度 U.S.S. Monitor battles C.S.S. Virginia.

南方叛军的大大号战舰,体型庞大,非常凶悍。已经击沉了两艘联邦军舰。北方政府军则只派出小小号,一艘非常小,火力也小多的军舰。

大大号顾名思义,它船体特别的大,但是都是固定炮塔,两侧和首尾有很多门炮。而小小号虽然小,却有一个可以旋转的炮台。

我们可以理解为一条战舰需要有两个基础功能:调整航行方向和调整炮击方向。

对于大大号,这两个功能需求是耦合 couple 的,要改变炮击方向,就需要将船只转向。而对于小小号,这两个功能需求则是解耦合 decouple 的,航行方向与炮击方向无关,炮击方向可以独立调整。

于是小小号一直尽量守在大大号的射击死角攻击,而大大号虽然火力猛烈则必须不断通过改变航线来调整炮击方向,于是就不断绕圈。这两条船打了4个小时,大大号不得不撤退了,小小号获得了胜利。

由此可见功能之间的解耦十分重要,它增加了便捷性和灵活性。

工科生最爱的映射矩阵

​书中由海战作为引子,介绍了设计过程中的四个域(Domain):

  • CNs:Customer Needs,客户域,就是客户描述的一大堆自然语言也说不清楚的事情,什么高端大气上档次之类的东西。
  • FRs:Functional Requirements,功能域,从 CNs 域到 FRs 域的变换,就是把客户漫无边际的需求翻译成一些可定量的参数,比如战舰控制系统的 FR 是控制航行方向和控制开炮方向。
  • DPs:Design Parameters,设计参数,或者叫物理域,实现 FRs 的物理参数,比如航向控制器和炮塔控制器。
  • PVs:Process Variables,过程变量,或者叫过程域,是描述实现功能过程中涉及的过程变量。

相邻域之间的映射,可以看成目标(做什么?)和手段(怎样做?)之间的对应关系。设计过程是相邻域中特征向量之间映射和转换过程。

例如,用户域元素映射到功能域的过程,实际上是将用户需求转变成产品功能要素的过程,即产品规划;功能域向物理域的映射过程是产品的设计过程;从物理域到过程域的映射则可看成“加工产品”的过程。

其中最为重要的是FRs(功能需求)到DPs(设计参数)的映射,这也是我们软件开发过程中最长接触的步骤,需求文档有了,如何进行代码设计并实现。

书中以矩阵向量的方式讲述了 FRs (功能需求) 和 DPs (设计参数) 的映射关系,也就是上图中由 A 变量组成的矩阵代表着 FPs 到 DPs 的映射。不同的矩阵代表着不同的映射关系,其实我们不需要关心矩阵各个位置的具体值如何计算,只需简化的了解如果 FP 和 DP 有关联,则矩阵相应位置上的值为1,否则为0。

比如说小小号上的情况,有两个功能需要:FR1(调整航向)和FR2(调整开炮方向);以及两个设计参数:DP1(船舵)和DP2(旋转炮塔)

其中转动船舵的时候,船会转向,所以A11这里是X,同时船身上的炮塔也跟着船一起转向,所以也影响开炮方向FR2,因此A21也是X。 而在旋转炮塔的时候,不影响船的航行方向,所以A12这里是0。

好的设计?

所以,基于上边这个映射矩阵,好的设计应该有两个特点:

  • 首先FRs(功能需求)的数量N,应当等于DPs (设计参数)的数量M。
  • 每一个FR(功能需求)与且只与一个DP(设计参数)相互关联。

也就是说映射矩阵是一个对角矩阵,对角线上有值,其他位置都是0。《程序员修炼之道》中也提及了类似的思想,也就是正交性一节。那一节的提示是消除无关事务之间的影响,正好和这里映射矩阵是对角矩阵不谋而合。当映射举证是对角矩阵时,说明 FR 和 DP 一一对应,不会有交叉影响。当某一个 FR也就是需求发生变更时,只需要修改一个DP。

当然对角矩阵属于比较理想的情况,书中也罗列了一些其他类型的映射矩阵。

其中最差的情况是 FRs(功能需求)的数量N,小于 DPs(设计参数)的数量M。也就是大大号中的情景:它有两个功能需求,FR1 调整航向
和FR2 调整开炮方向,但只有一个DP1 船舵。所以它的映射矩阵如下图所示。

书中还继续讲解了矩阵分解的知识,也就是对应了需求功能点细分到软件详细设计细分等部分的内容,有兴趣的小伙伴可以自己去看看。

总结

所以书中最后给出两个公里:

  • 独立公理(功能独立性公理)
  • 信息公理(信息量最少公理)

这不正是软件设计中经常提及的松耦合和高内聚嘛。模块相互独立互不影响就是松耦合,最小化信息量就是不对外暴露过多信息,也就是高内聚或者信息隐藏。

Share

详解 Redis 内存管理机制和实现

Redis是一个基于内存的键值数据库,其内存管理是非常重要的。本文内存管理的内容包括:过期键的懒性删除和过期删除以及内存溢出控制策略。

最大内存限制

Redis使用 maxmemory 参数限制最大可用内存,默认值为0,表示无限制。限制内存的目的主要 有:

  • 用于缓存场景,当超出内存上限 maxmemory 时使用 LRU 等删除策略释放空间。
  • 防止所用内存超过服务器物理内存。因为 Redis 默认情况下是会尽可能多使用服务器的内存,可能会出现服务器内存不足,导致 Redis 进程被杀死。

maxmemory 限制的是Redis实际使用的内存量,也就是 used_memory统计项对应的内存。由于内存碎片率的存在,实际消耗的内存 可能会比maxmemory设置的更大,实际使用时要小心这部分内存溢出。具体Redis 内存监控的内容请查看一文了解 Redis 内存监控和内存消耗

Redis默认无限使用服务器内存,为防止极端情况下导致系统内存耗 尽,建议所有的Redis进程都要配置maxmemory。 在保证物理内存可用的情况下,系统中所有Redis实例可以调整 maxmemory参数来达到自由伸缩内存的目的。

内存回收策略

Redis 回收内存大致有两个机制:一是删除到达过期时间的键值对象;二是当内存达到 maxmemory 时触发内存移除控制策略,强制删除选择出来的键值对象。

删除过期键对象

Redis 所有的键都可以设置过期属性,内部保存在过期表中,键值表和过期表的结果如下图所示。当 Redis保存大量的键,对每个键都进行精准的过期删除可能会导致消耗大量的 CPU,会阻塞 Redis 的主线程,拖累 Redis 的性能,因此 Redis 采用惰性删除和定时任务删除机制实现过期键的内存回收。

惰性删除是指当客户端操作带有超时属性的键时,会检查是否超过键的过期时间,然后会同步或者异步执行删除操作并返回键已经过期。这样可以节省 CPU成本考虑,不需要单独维护过期时间链表来处理过期键的删除。

过期键的惰性删除策略由 db.c/expireifNeeded 函数实现,所有对数据库的读写命令执行之前都会调用 expireifNeeded 来检查命令执行的键是否过期。如果键过期,expireifNeeded 会将过期键从键值表和过期表中删除,然后同步或者异步释放对应对象的空间。源码展示的时 Redis 4.0 版本。

expireIfNeeded 先从过期表中获取键对应的过期时间,如果当前时间已经超过了过期时间(lua脚本执行则有特殊逻辑,详看代码注释),则进入删除键流程。删除键流程主要进行了三件事:

  • 一是删除操作命令传播,通知 slave 实例并存储到 AOF 缓冲区中
  • 二是记录键空间事件,
  • 三是根据 lazyfree_lazy_expire 是否开启进行异步删除或者异步删除操作。
int expireIfNeeded(redisDb *db, robj *key) {
    // 获取键的过期时间
    mstime_t when = getExpire(db,key);
    mstime_t now;
    // 键没有过期时间
    if (when < 0) return 0;
    // 实例正在从硬盘 laod 数据,比如说 RDB 或者 AOF
    if (server.loading) return 0;

    // 当执行lua脚本时,只有键在lua一开始执行时
    // 就到了过期时间才算过期,否则在lua执行过程中不算失效
    now = server.lua_caller ? server.lua_time_start : mstime();

    // 当本实例是slave时,过期键的删除由master发送过来的
    // del 指令控制。但是这个函数还是将正确的信息返回给调用者。
    if (server.masterhost != NULL) return now > when;
    // 判断是否未过期
    if (now <= when) return 0;

    // 代码到这里,说明键已经过期,而且需要被删除
    server.stat_expiredkeys++;
    // 命令传播,到 slave 和 AOF
    propagateExpire(db,key,server.lazyfree_lazy_expire);
    // 键空间通知使得客户端可以通过订阅频道或模式, 来接收那些以某种方式改动了 Redis 数据集的事件。
    notifyKeyspaceEvent(NOTIFY_EXPIRED,
        "expired",key,db->id);
    // 如果是惰性删除,调用dbAsyncDelete,否则调用 dbSyncDelete
    return server.lazyfree_lazy_expire ? dbAsyncDelete(db,key) :
                                         dbSyncDelete(db,key);
}

上图是写命令传播的示意图,删除命令的传播和它一致。propagateExpire 函数先调用 feedAppendOnlyFile 函数将命令同步到 AOF 的缓冲区中,然后调用 replicationFeedSlaves函数将命令同步到所有的 slave 中。Redis 复制的机制可以查看Redis 复制过程详解

// 将命令传递到slave和AOF缓冲区。maser删除一个过期键时会发送Del命令到所有的slave和AOF缓冲区
void propagateExpire(redisDb *db, robj *key, int lazy) {
    robj *argv[2];
    // 生成同步的数据
    argv[0] = lazy ? shared.unlink : shared.del;
    argv[1] = key;
    incrRefCount(argv[0]);
    incrRefCount(argv[1]);
    // 如果开启了 AOF 则追加到 AOF 缓冲区中
    if (server.aof_state != AOF_OFF)
        feedAppendOnlyFile(server.delCommand,db->id,argv,2);
    // 同步到所有 slave
    replicationFeedSlaves(server.slaves,db->id,argv,2);

    decrRefCount(argv[0]);
    decrRefCount(argv[1]);
}

dbAsyncDelete 函数会先调用 dictDelete 来删除过期表中的键,然后处理键值表中的键值对象。它会根据值的占用的空间来选择是直接释放值对象,还是交给 bio 异步释放值对象。判断依据就是值的估计大小是否大于 LAZYFREE_THRESHOLD 阈值。键对象和 dictEntry 对象则都是直接被释放。

#define LAZYFREE_THRESHOLD 64
int dbAsyncDelete(redisDb *db, robj *key) {
    // 删除该键在过期表中对应的entry
    if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);

    // unlink 该键在键值表对应的entry
    dictEntry *de = dictUnlink(db->dict,key->ptr);
    // 如果该键值占用空间非常小,懒删除反而效率低。所以只有在一定条件下,才会异步删除
    if (de) {
        robj *val = dictGetVal(de);
        size_t free_effort = lazyfreeGetFreeEffort(val);
        // 如果释放这个对象消耗很多,并且值未被共享(refcount == 1)则将其加入到懒删除列表
        if (free_effort > LAZYFREE_THRESHOLD && val->refcount == 1) {
            atomicIncr(lazyfree_objects,1);
            bioCreateBackgroundJob(BIO_LAZY_FREE,val,NULL,NULL);
            dictSetVal(db->dict,de,NULL);
        }
    }

    // 释放键值对,或者只释放key,而将val设置为NULL来后续懒删除
    if (de) {
        dictFreeUnlinkedEntry(db->dict,de);
        // slot 和 key 的映射关系是用于快速定位某个key在哪个 slot中。
        if (server.cluster_enabled) slotToKeyDel(key);
        return 1;
    } else {
        return 0;
    }
}

dictUnlink 会将键值从键值表中删除,但是却不释放 key、val和对应的表entry对象,而是将其直接返回,然后再调用dictFreeUnlinkedEntry进行释放。dictDelete 是它的兄弟函数,但是会直接释放相应的对象。二者底层都通过调用 dictGenericDelete来实现。dbAsyncDelete d的兄弟函数 dbSyncDelete 就是直接调用dictDelete来删除过期键。

void dictFreeUnlinkedEntry(dict *d, dictEntry *he) {
    if (he == NULL) return;
    // 释放key对象
    dictFreeKey(d, he);
    // 释放值对象,如果它不为null
    dictFreeVal(d, he);
    // 释放 dictEntry 对象
    zfree(he);
}

Redis 有自己的 bio 机制,主要是处理 AOF 落盘、懒删除逻辑和关闭大文件fd。bioCreateBackgroundJob 函数将释放值对象的 job 加入到队列中,bioProcessBackgroundJobs会从队列中取出任务,根据类型进行对应的操作。

void *bioProcessBackgroundJobs(void *arg) {
    .....
    while(1) {
        listNode *ln;

        ln = listFirst(bio_jobs[type]);
        job = ln->value;
        if (type == BIO_CLOSE_FILE) {
            close((long)job->arg1);
        } else if (type == BIO_AOF_FSYNC) {
            aof_fsync((long)job->arg1);
        } else if (type == BIO_LAZY_FREE) {
            // 根据参数来决定要做什么。有参数1则要释放它,有参数2和3是释放两个键值表
            // 过期表,也就是释放db 只有参数三是释放跳表
            if (job->arg1)
                lazyfreeFreeObjectFromBioThread(job->arg1);
            else if (job->arg2 && job->arg3)
                lazyfreeFreeDatabaseFromBioThread(job->arg2,job->arg3);
            else if (job->arg3)
                lazyfreeFreeSlotsMapFromBioThread(job->arg3);
        }
        zfree(job);
        ......
    }
}

dbSyncDelete 则是直接删除过期键,并且将键、值和 DictEntry 对象都释放。

int dbSyncDelete(redisDb *db, robj *key) {
    // 删除过期表中的entry
    if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);
    // 删除键值表中的entry
    if (dictDelete(db->dict,key->ptr) == DICT_OK) {
        // 如果开启了集群,则删除slot 和 key 映射表中key记录。
        if (server.cluster_enabled) slotToKeyDel(key);
        return 1;
    } else {
        return 0;
    }
}

但是单独用这种方式存在内存泄露的问题,当过期键一直没有访问将无法得到及时删除,从而导致内存不能及时释放。正因为如此,Redis还提供另一种定时任 务删除机制作为惰性删除的补充。

Redis 内部维护一个定时任务,默认每秒运行10次(通过配置控制)。定时任务中删除过期键逻辑采用了自适应算法,根据键的 过期比例、使用快慢两种速率模式回收键,流程如下图所示。

  • 1)定时任务首先根据快慢模式( 慢模型扫描的键的数量以及可以执行时间都比快模式要多 )和相关阈值配置计算计算本周期最大执行时间、要检查的数据库数量以及每个数据库扫描的键数量。
  • 2) 从上次定时任务未扫描的数据库开始,依次遍历各个数据库。
  • 3)从数据库中随机选手 ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP 个键,如果发现是过期键,则调用 activeExpireCycleTryExpire 函数删除它。
  • 4)如果执行时间超过了设定的最大执行时间,则退出,并设置下一次使用慢模式执行。
  • 5)未超时的话,则判断是否采样的键中是否有25%的键是过期的,如果是则继续扫描当前数据库,跳到第3步。否则开始扫描下一个数据库。

定期删除策略由 expire.c/activeExpireCycle 函数实现。在redis事件驱动的循环中的eventLoop->beforesleep和
周期性操作 databasesCron 都会调用 activeExpireCycle 来处理过期键。但是二者传入的 type 值不同,一个是ACTIVE_EXPIRE_CYCLE_SLOW 另外一个是ACTIVE_EXPIRE_CYCLE_FAST。activeExpireCycle 在规定的时间,分多次遍历各个数据库,从 expires 字典中随机检查一部分过期键的过期时间,删除其中的过期键,相关源码如下所示。

void activeExpireCycle(int type) {
    // 上次检查的db
    static unsigned int current_db = 0; 
    // 上次检查的最大执行时间
    static int timelimit_exit = 0;
    // 上一次快速模式运行时间
    static long long last_fast_cycle = 0; /* When last fast cycle ran. */

    int j, iteration = 0;
    // 每次检查周期要遍历的DB数
    int dbs_per_call = CRON_DBS_PER_CALL;
    long long start = ustime(), timelimit, elapsed;

    ..... // 一些状态时不进行检查,直接返回

    // 如果上次周期因为执行达到了最大执行时间而退出,则本次遍历所有db,否则遍历db数等于 CRON_DBS_PER_CALL
    if (dbs_per_call > server.dbnum || timelimit_exit)
        dbs_per_call = server.dbnum;

    // 根据ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC计算本次最大执行时间
    timelimit = 1000000*ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC/server.hz/100;
    timelimit_exit = 0;
    if (timelimit <= 0) timelimit = 1;
    // 如果是快速模式,则最大执行时间为ACTIVE_EXPIRE_CYCLE_FAST_DURATION
    if (type == ACTIVE_EXPIRE_CYCLE_FAST)
        timelimit = ACTIVE_EXPIRE_CYCLE_FAST_DURATION; /* in microseconds. */
    // 采样记录
    long total_sampled = 0;
    long total_expired = 0;
    // 依次遍历 dbs_per_call 个 db
    for (j = 0; j < dbs_per_call && timelimit_exit == 0; j++) {
        int expired;
        redisDb *db = server.db+(current_db % server.dbnum);
        // 将db数增加,一遍下一次继续从这个db开始遍历
        current_db++;

        do {
            ..... // 申明变量和一些情况下 break
            if (num > ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP)
                num = ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP;
            // 主要循环,在过期表中进行随机采样,判断是否比率大于25%
            while (num--) {
                dictEntry *de;
                long long ttl;

                if ((de = dictGetRandomKey(db->expires)) == NULL) break;
                ttl = dictGetSignedIntegerVal(de)-now;
                // 删除过期键
                if (activeExpireCycleTryExpire(db,de,now)) expired++;
                if (ttl > 0) {
                    /* We want the average TTL of keys yet not expired. */
                    ttl_sum += ttl;
                    ttl_samples++;
                }
                total_sampled++;
            }
            // 记录过期总数
            total_expired += expired;
            // 即使有很多键要过期,也不阻塞很久,如果执行超过了最大执行时间,则返回
            if ((iteration & 0xf) == 0) { /* check once every 16 iterations. */
                elapsed = ustime()-start;
                if (elapsed > timelimit) {
                    timelimit_exit = 1;
                    server.stat_expired_time_cap_reached_count++;
                    break;
                }
            }
            // 当比率小于25%时返回
        } while (expired > ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP/4);
    }
    .....// 更新一些server的记录数据
}

activeExpireCycleTryExpire 函数的实现就和 expireIfNeeded 类似,这里就不赘述了。

int activeExpireCycleTryExpire(redisDb *db, dictEntry *de, long long now) {
    long long t = dictGetSignedIntegerVal(de);
    if (now > t) {
        sds key = dictGetKey(de);
        robj *keyobj = createStringObject(key,sdslen(key));

        propagateExpire(db,keyobj,server.lazyfree_lazy_expire);
        if (server.lazyfree_lazy_expire)
            dbAsyncDelete(db,keyobj);
        else
            dbSyncDelete(db,keyobj);
        notifyKeyspaceEvent(NOTIFY_EXPIRED,
            "expired",keyobj,db->id);
        decrRefCount(keyobj);
        server.stat_expiredkeys++;
        return 1;
    } else {
        return 0;
    }
}

定期删除策略的关键点就是删除操作执行的时长和频率:

  • 如果删除操作太过频繁或者执行时间太长,就对 CPU 时间不是很友好,CPU 时间过多的消耗在删除过期键上。
  • 如果删除操作执行太少或者执行时间太短,就不能及时删除过期键,导致内存浪费。

内存溢出控制策略

当Redis所用内存达到maxmemory上限时会触发相应的溢出控制策略。 具体策略受maxmemory-policy参数控制,Redis支持6种策略,如下所示:

  • 1)noeviction:默认策略,不会删除任何数据,拒绝所有写入操作并返 回客户端错误信息(error)OOM command not allowed when used memory,此 时Redis只响应读操作。
  • 2)volatile-lru:根据LRU算法删除设置了超时属性(expire)的键,直 到腾出足够空间为止。如果没有可删除的键对象,回退到noeviction策略。
  • 3)allkeys-lru:根据LRU算法删除键,不管数据有没有设置超时属性, 直到腾出足够空间为止。
  • 4)allkeys-random:随机删除所有键,直到腾出足够空间为止。
  • 5)volatile-random:随机删除过期键,直到腾出足够空间为止。
  • 6)volatile-ttl:根据键值对象的ttl属性,删除最近将要过期数据。如果没有,回退到noeviction策略。

内存溢出控制策略可以使用 config set maxmemory-policy {policy} 语句进行动态配置。Redis 提供了丰富的空间溢出控制策略,我们可以根据自身业务需要进行选择。

当设置 volatile-lru 策略时,保证具有过期属性的键可以根据 LRU 剔除,而未设置超时的键可以永久保留。还可以采用allkeys-lru 策略把 Redis 变为纯缓存服务器使用。

当Redis因为内存溢出删除键时,可以通过执行 info stats 命令查看 evicted_keys 指标找出当前 Redis 服务器已剔除的键数量。

每次Redis执行命令时如果设置了maxmemory参数,都会尝试执行回收 内存操作。当Redis一直工作在内存溢出(used_memory>maxmemory)的状态下且设置非 noeviction 策略时,会频繁地触发回收内存的操作,影响Redis 服务器的性能,这一点千万要引起注意。

Share

一文了解 Redis 内存监控和内存消耗

Redis 是一种内存数据库,将数据保存在内存中,读写效率要比传统的将数据保存在磁盘上的数据库要快很多。所以,监控 Redis 的内存消耗并了解 Redis 内存模型对高效并长期稳定使用 Redis 至关重要。

内存使用统计

通过 info memory 命令可以获得 Redis 内存相关的指标。较为重要的指标和解释如下所示:

当 mem_fragmentation_ratio > 1 时,说明有部分内存并没有用于数据存储,而是被内存碎片所消耗,如果该值很大,说明碎片率严重。
当 mem_fragmentation_ratio < 1 时,这种情况一般出现在操作系统把 Redis 内存交换 (swap) 到硬盘导致,出现这种情况要格外关注,由于硬盘速度远远慢于内存,Redis 性能会变得很差,甚至僵死。

当 Redis 内存超出可以获得内存时,操作系统会进行 swap,将旧的页写入硬盘。从硬盘读写大概比从内存读写要慢5个数量级。used_memory 指标可以帮助判断 Redis 是否有被swap的风险或者它已经被swap。

在 Redis Administration 一文 (链接在文末) 建议要设置和内存一样大小的交换区,如果没有交换区,一旦 Redis 突然需要的内存大于当前操作系统可用内存时,Redis 会因为 out of memory 而被 Linix Kernel 的 OOM Killer 直接杀死。虽然当 Redis 的数据被换出 (swap out) 时,Redis的性能会变差,但是总比直接被杀死的好。

Redis 使用 maxmemory 参数限制最大可用内存。限制内存的目的主要有:

  • 用于缓存场景,当超出内存上限 maxmemory 时使用 LRU 等删除策略释放空间。
  • 防止所用的内存超过服务器物理内存,导致 OOM 后进程被系统杀死。

maxmemory 限制的是 Redis 实际使用的内存量,也就是 used_memory 统计项对应的内存。实际消耗的内存可能会比 maxmemory 设置的大,要小心因为这部内存导致 OOM。所以,如果你有 10GB 的内存,最好将 maxmemory 设置为 8 或者 9G

内存消耗划分

Redis 进程内消耗主要包括:自身内存 + 对象内存 + 缓冲内存 + 内存碎片,其中 Redis 空进程自身内存消耗非常少,通常 used_memory_rss 在 3MB 左右时,used_memory 一般在 800KB 左右,一个空的 Redis 进程消耗内存可以忽略不计。

对象内存

对象内存是 Redis 内存占用最大的一块,存储着用户所有的数据。Redis 所有的数据都采用 key-value 数据类型,每次创建键值对时,至少创建两个类型对象:key 对象和 value 对象。对象内存消耗可以简单理解为这两个对象的内存消耗之和(还有类似过期之类的信息)。键对象都是字符串,在使用 Redis 时很容易忽略键对内存消耗的影响,应当避免使用过长的键。有关 Redis 对象系统的详细内容,请看我之前的文章十二张图带你了解 Redis 的数据结构和对象系统

缓冲内存

缓冲内存主要包括:客户端缓冲、复制积压缓冲区和 AOF 缓冲区。

客户端缓冲指的是所有接入到 Redis 服务器 TCP 连接的输入输出缓冲。

输入缓冲无法控制,最大空间为 1G,如果超过将断开连接。而且输入缓冲区不受 maxmemory 控制,假设一个 Redis 实例设置了 maxmemory 为 4G,已经存储了 2G 数据,但是如果此时输入缓冲区使用了 3G,就已经超出了 maxmemory 限制,可能导致数据丢失、键值淘汰或者 OOM。

输入缓冲区过大主要是因为 Redis 的处理速度跟不上输入缓冲区的输入速度,并且每次进入输入缓冲区的命令包含了大量的 bigkey。

输出缓冲通过参数 client-output-buffer-limit 控制,其格式如下所示。

client-output-buffer-limit [hard limit] [soft limit] [duration]

hard limit 是指一旦缓冲区大小达到了这个阈值,Redis 就会立刻关闭该连接。而 soft limit 和时间 duration 共同生效,比如说 soft time 为 64mb、duration 为 60,则只有当缓冲区持续 60s 大于 64mb 时,Redis 才会关闭该连接。

普通客户端是除了复制和订阅的客户端之外的所有连接。Reids 对其的默认配置是 client-output-buffer-limit normal 0 0 0 , Redis 并没有对普通客户端的输出缓冲区做限制,一般普通客户端的内存消耗可以忽略不计,但是当有大量慢连接客户端接入时这部分内存消耗就不能忽略,可以设置 maxclients 做限制。特别当使用大量数据输出的命令且数据无法及时推送到客户端时,如 monitor 命令,容易造成 Redis 服务器内存突然飙升。相关案例可以查看这篇文章美团在Redis上踩过的一些坑-3.redis内存占用飙升

从客户端用于主从复制,主节点会为每个从节点单独建立一条连接用于命令复制,默认配置为 client-output-buffer-limit slave 256mb 64mb 60。当主从节点之间网络延迟较高或主节点挂载大量从节点时这部分内存消耗将占用很大一部分,建议主节点挂载的从节点不要多于 2 个,主从节点不要部署在较差的网络环境下,如异地跨机房环境,防止复制客户端连接缓慢造成溢出。与主从复制相关的一共有两类缓冲区,一个是从客户端输出缓冲区,另外一个是下面会介绍到的复制积压缓冲区。

订阅客户端用于发布订阅功能,连接客户端使用单独的输出缓冲区,默认配置为 client-output-buffer-limit pubsub 32mb 8mb 60,当订阅服务的消息生产快于消费速度时,输出缓冲区会产生积压造成内存空间溢出。

输入输出缓冲区在大流量场景中容易失控,造成 Redis 内存不稳定,需要重点监控。可以定期执行 client list 命令,监控每个客户端的输入输出缓冲区大小和其他信息。

属性名 属性说明
qbuf 查询缓冲区的长度(字节为单位, 0 表示没有分配查询缓冲区)
qbuf-free 查询缓冲区剩余空间的长度(字节为单位, 0 表示没有剩余空间)
obl 输出缓冲区的长度(字节为单位, 0 表示没有分配输出缓冲区)
oll 输出列表包含的对象数量(当输出缓冲区没有剩余空间时,命令回复会以字符串对象的形式被入队到这个队列里)
127.0.0.1:6379> client list
id=3 addr=127.0.0.1:58161 fd=8 name= \
age=1408 idle=0 flags=N db=0 sub=0 psub=0 multi=-1 \
qbuf=26 qbuf-free=32742 obl=0 oll=0 omem=0 \
events=r cmd=client

client list 命令执行速度慢,客户端较多时频繁执行存在阻塞redis的可能,所以一般可以先使用 info clients 命令获取最大的客户端缓冲区大小。

127.0.0.1:6379> info clients
# Clients
connected_clients:1
client_recent_max_input_buffer:2
client_recent_max_output_buffer:0
blocked_clients:0

复制积压缓冲区是Redis 在 2.8 版本后提供的一个可重用的固定大小缓冲区,用于实现部分复制功能。根据 repl-backlog-size 参数控制,默认 1MB。对于复制积压缓冲区整个主节点只有一个,所有的从节点共享此缓冲区。因此可以设置较大的缓冲区空间,比如说 100MB,可以有效避免全量复制。有关复制积压缓冲区的详情可以看我的旧文章 Redis 复制过程详解

AOF 重写缓冲区:这部分空间用于在 Redis AOF 重写期间保存最近的写入命令。AOF 重写缓冲区的大小用户无法控制,取决于 AOF 重写时间和写入命令量,不过一般都很小。有关 AOF 持久化的详情可以看我的旧文章 Redis AOF 持久化详解

Redis 内存碎片

Redis 默认的内存分配器采用 jemalloc,可选的分配器还有:glibc、tcmalloc。内存分配器为了更好地管理和重复利用内存,分配内存策略一般采用固定范围的内存块进行分配。具体的分配策略后续会具体讲解,但是 Redis 正常碎片率一般在 1.03 左右(为什么是这个值)。但是当存储的数据长度长度差异较大时,以下场景容易出现高内存碎片问题:

  • 频繁做更新操作,例如频繁对已经存在的键执行 append、setrange 等更新操作。
  • 大量过期键删除,键对象过期删除后,释放的空间无法得到重复利用,导致碎片率上升。

这部分内容我们后续再详细讲解 jemalloc,因为大量的框架都会使用内存分配器,比如说 Netty 等。

子进程内存消耗

子进程内存消耗主要指执行 AOF 重写 或者进行 RDB 保存时 Redis 创建的子进程内存消耗。Redis 执行 fork 操作产生的子进程内存占用量表现为与父进程相同,理论上需要一倍的物理内存来完成相应的操作。但是 Linux 具有写时复制技术 (copy-on-write),父子进程会共享相同的物理内存页,当父进程处理写请求时会对需要修改的页复制出一份副本完成写操作,而子进程依然读取 fork 时整个父进程的内存快照。

如上图所示,fork 时只拷贝 page table,也就是页表。只有等到某一页发生修改时,才真正进行页的复制。

但是 Linux Kernel 在 2.6.38 内存增加了 Transparent Huge Pages (THP) 机制,简单理解,它就是让页大小变大,本来一页为 4KB,开启 THP 机制后,一页大小为 2MB。它虽然可以加快 fork 速度( 要拷贝的页的数量减少 ),但是会导致 copy-on-write 复制内存页的单位从 4KB 增大为 2MB,如果父进程有大量写命令,会加重内存拷贝量,都是修改一个页的内容,但是页单位变大了,从而造成过度内存消耗。例如,以下两个执行 AOF 重写时的内存消耗日志:

// 开启 THP
C * AOF rewrite: 1039 MB of memory used by copy-on-write
// 关闭 THP
C * AOF rewrite: 9MB of memory used by copy-on-write

这两个日志出自同一个 Redis 进程,used_memory 总量是 1.5GB,子进程执行期间每秒写命令量都在 200 左右。当分别开启和关闭 THP 时,子进程内存消耗有天壤之别。所以,在高并发写的场景下开启 THP,子进程内存消耗可能是父进程的数倍,造成机器物理内存溢出。

所以说,Redis 产生的子进程并不需要消耗 1 倍的父进程内存,实际消耗根据期间写入命令量决定,所以需要预留一些内存防止溢出。并且建议关闭系统的 THP,防止 copy-on-write 期间内存过度消耗。不仅是 Redis,部署 MySQL 的机器一般也会关闭 THP。

参考文章

Share