Spring Cloud Netflix Feign 基础应用实战

 微服务是软件系统架构上的一种设计风格,它倡导将一个原本独立的服务系统分成多个小型服务,这些小型服务都在独立的进程中运行,通过各个小型服务之间的协作来实现原本独立系统的所有业务功能。小型服务基于多种跨进程的方式进行通信协作,而在Spring Cloud架构中比较常见的跨进程的方式是RESTful HTTP请求和RPC调用。

 RPC就是远程过程调用,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。比如说,计算机 A 上的进程调用另外一台计算机 B 上的进程,其中 A 上的调用进程被挂起,而 B 上的被调用进程开始执行,当值返回给 A 时,A 进程继续执行。调用方可以通过使用参数将信息传送给被调用方,而后可以通过传回的结果得到信息。而这一过程,对于开发人员来说是透明的。

RPC示意图

 REST是Representational State Transfer的缩写,是表现层状态转移的含义。

 Resource是资源,所谓“资源”就是网络上的一个实体,或者说网上的一个具体信息。它可以是一段文本,一首歌曲,一种服务,总之就是一个具体的存在。你可以使用一个URI指向它,每种”资源“对应一个URI。

 Representational是”表现层“的意思,”资源“是一种消息实体,它可以有多种外在的表现形式,我们把”资源“的具体呈现出来的形式叫做它的”表现层“。比如说,文本可以用txt格式进行表现,也可以使用xml格式,JSON格式和二进制格式;视频可以以MP4格式表现,也可以以AVI格式表现。URI只代表资源的实体,不代表它的形式。它的具体表现形式,应该在HTTP请求的头信息Accept和Content-Type字段指定,这两个字段才是对”表现层“的描述。

 State Transfer是指状态转化。客户端访问服务的过程中必然涉及到数据和状态的转化。如果客户端想要操作服务器,必须通过某种手段,让服务器端发生”状态转化“。而这种转化是建立在表现层之上的,所以就是”表现层状态转化“。客户端通过使用HTTP协议中的四个动词来实现上述操作,它们分别:用来获取资源的GET,用来新建或更新资源的POST,用来更新资源的PUT,用来删除资源的DELETE。

 REST是Web Service的一种实现方式,另外一种实现方式为SOAP。REST致力于通过HTTP协议中的POST/GET/PUT/DELETE等方法和一个可读性较强的URL来提供一个HTTP请求;而SOAP致力于通过wsdl数据格式来实现通信。二者的使用场景和设计目标不同。SOAP一般作为应用层协议来进行服务间的消息调用。

 RPC和REST之间的最大差别在于RPC调用可以不依赖HTTP协议,底层直接使用TPC/IP协议进行传输,传输效率相比于REST会有一定的提升。

Feign简介

Feign是一个声明式RESTful HTTP请求客户端,它使得编写Web服务客户端更加方便和快捷。使用Feign创建一个接口并使用Feign提供的注解修饰该接口,然后就可以使用该接口进行RESTful HTTP请求的发送。Feign还可以集成Ribbon和Eureka来为自己提供负载均衡和断路器的机制。

Feign会将带有注解的函数接口信息转化为网络请求模板,在发送网络请求之前,函数的参数值会以一定的方式设置到这些请求模板中。虽然这样的模式使得Feign只能支持基于文本的网络请求,但是它可以简化网络请求的实现,方便编程人员快速构建自己的网络请求架构。
Feign架构示意图

 如上图所示,使用Feign的程序的架构一般分为三个部分,分别为服务注册中心,服务提供者和服务消费者。服务提供者向服务注册中心注册自己,然后服务消费者通过Feign发送请求时,Feign会向去服务注册中心获取关于服务提供者的信息,然后再向服务提供者发送网络请求。

代码示例

服务注册中心

Feign可以配合eureka等服务注册中心同时使用。eureka来作为服务注册中心,为Feign提供关于服务端信息的获取,比如说IP地址。关于eureka的具体使用可以参考第四章中关于eureka的快速入门介绍。

服务提供者

Spring Cloud Feign是声明式RESTful请求客户端,所以它不会侵入服务提供者程序的实现。也就是说,服务提供者只需要提供Web Service的API接口,至于具体实现既可以是Spring Controler也可以是Jersey。我们只需要确保该服务提供者被注册到服务注册中心上。

@RestController
@RequestMapping("/feign-service")
public class FeignServiceController {

    private static final Logger logger = LoggerFactory.getLogger(FeignServiceController.class);

    private static String DEFAULT_SERVICE_ID = "application";
    private static String DEFAULT_HOST = "localhost";
    private static int DEFAULT_PORT = 8080;

    @RequestMapping(value = "/instance/{serviceId}", method = RequestMethod.GET)
    public Instance getInstanceByServiceId(@PathVariable("serviceId") String serviceId){
        logger.info("Get Instance by serviceId {}", serviceId);
        return new Instance(serviceId, DEFAULT_HOST, DEFAULT_PORT);
    }

    @RequestMapping(value = "/instance/{serviceId}", method = RequestMethod.DELETE)
    public String deleteInstanceByServiceId(@PathVariable("serviceId") String serviceId){

        logger.info("Delete Instance by serviceId {}", serviceId);
        return "Instance whose serviceId is " + serviceId + " is deleted";

    }

    @RequestMapping(value = "/instance", method = RequestMethod.POST)
    public String createInstance(@RequestBody Instance instance){

        logger.info("Create Instance whose serviceId is {}", instance.getServiceId());
        return "Instance whose serviceId is" + instance.getServiceId() + " is created";
    }

    @RequestMapping(value = "/instance/{serviceId}", method = RequestMethod.PUT)
    public String updateInstanceByServiceId(@RequestBody Instance instance, @PathVariable("serviceId") String serviceId){
        logger.info("Update Instance whose serviceId is {}", serviceId);
        return "Instance whose serviceId is " + serviceId + " is updated";
    }
}

 上述代码中通过@RestController@RequestMapping声明了四个网络API接口,分别是对Instance资源的增删改查操作。

 除了实现网络API接口之外,还需要将该service注册到eureka上。如下列代码所示,需要在application.yml文件中设置服务注册中心的相关信息和代表该应用的名称。

eureka:
  instance:
    instance-id: ${spring.application.name}:${vcap.application.instance_id:${spring.application.instance_id:${random.value}}}
  client:
    service-url:
      default-zone: http://localhost:8761/eureka/
spring:
  application:
    name: feign-service
server:
  port: 0

服务消费者

Feign是声明式RESTful客户端,所以构建Feign项目的关键在于构建服务消费者。通过下面六步可以创建一个Spring Cloud Feign的服务消费者。

 首先创建一个普通的Spring Boot工程,取名为chapter-feign-client
 然后在pom文件中添加eurekafeign相关的依赖。其中spring-cloud-starter-eurekaeureka的starter依赖包,spring-cloud-starter-feignfeign的starter依赖包。

<dependencies>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-eureka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-feign</artifactId>
    </dependency>
</dependencies>

 接着在工程的入口类上添加@EnableFeignClients注解表示开启Spring Cloud Feign的支持功能,代码如下所示。

@SpringBootApplication
@EnableFeignClients()
public class ChapterFeignClientApplication {
	public static void main(String[] args) {
		SpringApplication.run(ChapterFeignClientApplication.class, args);
	}
}

@EnableFeignClients就像是一个开关,如果你使用了该注解,那么Feign相关的组件和处理机制才会生效,否则不会生效。@EnableFeignClients还可以对Feign相关组件进行自定义配置,它的方法和原理会在本章的源码分析章节在做具体的讲解。

 接下来我们定义一个FeignServiceClient接口,通过@FeignClient注解来指定服务名进而绑定服务。这一类被@FeignClient修饰的接口类一般被称为FeignClient。我们可以通过@RequestMapping来修饰相应的方法来定义调用函数。

@FeignClient("feign-service")
@RequestMapping("/feign-service")
public interface FeignServiceClient {

    @RequestMapping(value = "/instance/{serviceId}", method = RequestMethod.GET)
    public Instance getInstanceByServiceId(@PathVariable("serviceId") String serviceId);

    @RequestMapping(value = "/instance/{serviceId}", method = RequestMethod.DELETE)
    public String deleteInstanceByServiceId(@PathVariable("serviceId") String serviceId);

    @RequestMapping(value = "/instance", method = RequestMethod.POST)
    public String createInstance(@RequestBody Instance instance);

    @RequestMapping(value = "/instance/{serviceId}", method = RequestMethod.PUT)
    public String updateInstanceByServiceId(@RequestBody Instance instance, @PathVariable("serviceId") String serviceId);
}

 如上面代码片段所显示的,如果你调用FeignServiceClient对象的getInstanceByServiceId函数,那么Feign就会向feign-service服务的/feign-service/instance/{serviceId}接口发送网络请求。

 创建一个Controller来调用上边的服务,通过@Autowired来自动装载FeignServiceClient示例。代码如下:

@RestController
@RequestMapping("/feign-client")
public class FeignClientController {

    @Autowired
    FeignServiceClient feignServiceClient;

    @RequestMapping(value = "/instance/{serviceId}", method = RequestMethod.GET)
    public Instance getInstanceByServiceId(@PathVariable("serviceId") String serviceId){
        return feignServiceClient.getInstanceByServiceId(serviceId);
    }

    @RequestMapping(value = "/instance/{serviceId}", method = RequestMethod.DELETE)
    public String deleteInstanceByServiceId(@PathVariable("serviceId") String serviceId){
        return feignServiceClient.deleteInstanceByServiceId(serviceId);
    }

    @RequestMapping(value = "/instance", method = RequestMethod.POST)
    public String createInstance(@RequestBody Instance instance){
        return feignServiceClient.createInstance(instance);
    }

    @RequestMapping(value = "/instance/{serviceId}", method = RequestMethod.PUT)
    public String updateInstanceByServiceId(@RequestBody Instance instance, @PathVariable("serviceId") String serviceId){
        return feignServiceClient.updateInstanceByServiceId(instance, serviceId);
    }
}

 最后,application.yml中需要配置eureka服务注册中心的相关配置,具体配置如下所示:

eureka:
  instance:
    instance-id: ${spring.application.name}:${vcap.application.instance_id:${spring.application.instance_id:${random.value}}}
  client:
    service-url:
      default-zone: http://localhost:8761/eureka/

spring:
  application:
    name: feign-client
server:
  port: 8770

 相信读者通过搭建Feign的项目,已经对Feign的相关使用原理有了一定的了解,相信这个过程将对于理解Feign相关的工作原理大有裨益。

Share

Redis 复制过程详解

Redis 的复制功能分为同步( sync )和命令传播( command propagate )两个步骤:

  • 同步用于将从服务器的数据库状态更新至主服务器当前所处的数据库状态。
  • 命令传播则用于在主服务器的数据库状态被修改,导致主从服务器的数据库状态出现不一致时,让主从服务器的数据库重新回到一致状态。

同步

Redis 使用 psync 命令完成主从数据同步,同步过程分为:全量复制和部分复制。

全量复制:一般用于初次复制场景,它会把主节点全部数据一次性发送给从节点发送给从节点,当数据量较大时,会对主从节点和网络造成很大的开销。

部分复制:用于处理在主从复制中因网络闪断等原因造成的网络丢失场景,当从节点再次连接上主节点后,如果条件允许,主节点会补发丢失数据给从节点。因为补发的数据远远小于全量数据,可以有效避免全量复制的过高开销。

psync 命令运行需要以下组件支持:

  • 主从节点各自复制偏移量
  • 主节点复制积压缓冲区
  • 主节点运行 id

参与复制的从节点都会维护自身复制偏移量。主节点在处理完写命令后,会把命令的字节长度做累加记录,统计在 info replication 中的 master_repl_offset 指标中。
从节点在接收到主节点发送的命令后,也会累加记录自身的偏移量,并且会每秒钟上报自身的复制偏移量给主节点。
通过对比主从节点的复制偏移量,可以判断主从节点数据是否一致。

复制积压缓冲区是保存在主节点的一个固定长度的队列,默认大小为 1MB,当主节点有连接的从节点时被创建。主节点响应写命令时,不但会把命令发送给从节点,还会写入复制积压缓冲区中。

复制积压缓冲区大小有限,只能保存最近的复制数据,用于部分复制和复制命令丢失时的数据补救。

每个 Redis 节点启动后都会动态分配一个 40 位的十六进制字符串作为运行 ID。运行 ID 的主要作用是用来唯一标识 Redis 节点,比如说从节点保存主节点的运行 ID 来识别自己正在复制的时哪个主节点。

全量同步


slaveof 命令的执行

  • 1) 从节点发送 psync 命令进行数据同步,由于是第一次进行复制,从节点没有复制偏移量和主节点的运行ID,所以发送的命令时 PSYNC ? -1。
  • 2) 主节点根据 PSYNC ? -1 解析出当前为全量复制,回复 + FULLRESYNC 响应。
  • 3) 从节点接收主节点的响应数据保存运行 ID 和偏移量 offset。
  • 4) 主节点执行 bgsave 保存 RDB 文件到本地,有关 RDB 的知识可以查看《Redis RDB 持久化详解》
  • 5) 主节点发送 RDB 文件给从节点,从节点把接收的 RDB 文件保存在本地并直接作为从节点的数据文件,接收完 RDB 后从节点打印相关日志,可以在日志中查看主节点发送的数据量。

需要注意,对于数据量较大的主节点,比如生成的 RDB 文件超过 6GB 以上时要格外小心。如果传输 RDB 的时间超过 repl-timeout 所配置的值,从节点将发起接收 RDB 文件并清理已经下载的临时文件,导致全量复制失败。

  • 6) 对于主节点开始保存 RDB 快照到从节点接收完成期间,主节点仍然响应读命令,因此主节点会把这期间写命令保存在复制客户端缓冲区内,当从节点加载完 RDB 文件后,主节点再把缓冲区内的数据发送给从节点,保证主从之间数据一致性。

如果主节点创建和传输 RDB 的时间过长,可能会出现主节点复制客户端缓冲区溢出。默认配置为 client-output-buffer-limit slave 256MB 64MB 60,如果60s内缓冲区消耗持续大于64MB或者直接超过256MB时,主节点将直接关闭复制客户端连接,造成全量同步失败。

  • 7) 从节点接收完主节点传送来的全部数据后会清空自身旧数据,该步骤对应如下日志。
  • 8) 从节点清空数据后开始加载 RDB 文件,对于加大的 RDB 文件,这一步操作依然比较耗时,可以通过计算日志之间的时间差来判断加载 RDB 的总耗时。
  • 9) 收到 SYNC 命令的主服务器执行 BGSAVE 命令,在后台生成一个 RDB 文件,并使用一个缓冲区记录从现在开始执行的所有写命令。
  • 10) 当主服务器的 BGSAVE 命令执行完毕时,主服务器会将 GBSAVE 命令生成的 RDB 文件发送给从服务器,从服务器接收并载入这个 RDB 文件,将自己的数据库状态更新至主服务器执行 BGSAVE 命令时的数据库状态。
  • 11) 主服务器将记录在缓冲区里边的所有写命令发送给从服务器,从服务器执行这些写命令,将自己的数据库状态更新至主服务器数据库当前所处的状态。

通过分析全量复制的所有流程,读者会发现全量复制是一个非常耗时费力的操作。它时间开销主要包括:

  • 主节点 bgsave 时间
  • RDB 文件网络传输时间
  • 从节点清空数据时间
  • 从节点加载 RDB 的时间
  • 可能的 AOF 重写时间

全量同步过程中不仅会消耗大量时间,还会进行多次持久化相关操作和网络数据传输,这期间会大量消耗主从节点所在服务器的 CPU、内存和网络资源。所以,除了第一次复制是采用全量同步无法避免,其他场景应该规避全量复制,采取部分同步功能。

部分同步

部分复制主要是 Redis 针对全量复制的过高开销做出的一种优化措施,使用 psync {runId} {offset} 命令实现。当从节点正在复制主节点时,如果出现网络闪断或者命令丢失等异常情况时,从节点会向主节点要求补发丢失的命令数据,如果主节点的复制积压缓冲区存在这部分数据则直接发送给从节点,这样就保证了主从节点复制的一致性。补发的这部分数据一般远远小于全量数据,所以开销很小。

  • 1) 当主从节点之间网络出现中断时,如果超过了 repl-timeout 时间,主节点会认为从节点故障并中断复制连接。
  • 2) 主从连接中断期间主节点依然响应命令,但因复制连接中断命令无法发送给从节点,不过主节点内部存在复制积压缓冲区( repl-backlog-buffer ),依然可以保存最近一段时间的写命令数据,默认最大缓存 1MB。

  • 3) 当主从节点网络恢复后,从节点会再次连上主节点。

  • 4) 当主从连接恢复后,由于从节点之前保存了自身已复制的偏移量和主节点的运行ID。因此会把它们作为 psync 参数发送给主节点,要求进行补发复制操作。
  • 5) 主节点接到 psync 命令后首先核对参数 runId 是否与自身一致,如果一致,说明之前复制的是当前主节点;之后根据参数 offset 在自身复制积压缓冲区查找,如果偏移量之后的数据存在缓冲区中,则对从节点发送 +CONTINUE 响应,表示可以进行部分复制。
  • 6) 主节点根据偏移量把复制积压缓冲区里的数据发送给从节点,保证主从复制进入正常状态。

心跳检测

主从节点在建立复制后,它们之间维护着长连接并彼此发送心跳命令,如下图所示。

主从心跳判断机制如下所示:

  • 1) 主从节点彼此都有心跳检测机制,各自模拟成对方的客户端进行通信,通过 client list 命令查看复制相关客户端信息,主节点的连接状态为 flags=M,从节点连接状态为 flags=S。
  • 2) 主节点默认每隔 10 秒对从节点发送 ping 命令,判断从节点的存活性和连接状态。可以通过参数 repl-ping-slave-period 控制发送频率。
  • 3) 从节点在主线程中每隔 1 秒发送 replconf ack { offset } 命令,给主节点上报自己当前的复制偏移量。

replconf 命令不仅能实时监测主从节点网络状态,还能上报从节点复制偏移量。主节点会根据从节点上传的偏移量检查复制数据是否丢失,如果从节点数据丢失,再从主节点的复制缓存区中拉取丢失的数据发送给该从节点。

异步复制和命令传播

主节点不但负责数据读写,还负责把写命令同步给从节点。写命令的发送过程是异步完成,也就是说主节点自身处理完写命令后直接返回给客户端,并不等待从节点复制完成。

这个异步过程由命令传播来处理,它不仅会将写命令发送给所有从服务器,还会将写命令入队到复制积压缓冲区里边。

后记

Share

编程小技巧之 Linux 文本处理命令

合格的程序员都善于使用工具,正所谓君子性非异也,善假于物也。合理的利用 Linux 的命令行工具,可以提高我们的工作效率。

本文简单的介绍三个能使用 Linux 文本处理命令的场景,给大家开阔一下思路。希望大家阅读完这篇文章之后,要多加实践,将这些技巧内化到自己的日常工作习惯中,真正的提高效率。内化很重要,就像开玩笑所说的一样,即使我知道高内聚,低耦合的要求,了解 23 种设计模式和 6 大原则,熟读代码整洁之道,却仍然写不出优秀的代码。知道和内化到行为中区别还是很大的。

能不能让正确的原则指导正确的行动本身,其实就是区分是否是高手的一个显著标志。

程序员日常工作中往往要处理一些数据和文本,比如说统计一些服务日志文件信息,根据数据库数据生成一些处理数据的SQL和搜索文件内容等。可以直接通过编写代码处理,但不够便捷,因为有时候线上相关的代码环境依赖不一定具备。而直接使用 Linux 的文本处理命令可以很方便地处理这些问题。

日志文件捞数据

在工作中,我们往往需要对一些具有固定格式的文件进行信息统计,比如说根据 nginx 的 access.log 文件数据,计算出每个后端 API 接口的调用次数,并且排序。

nginx 的 access.log 文件文件格式配置如下所示,每个字段之间通过空格分隔开来。

log_format  main  '$remote_addr - $remote_user [$time_local] "$request" '
                  '$status $body_bytes_sent "$http_referer" '
                  '"$http_user_agent" "$http_x_forwarded_for"';

上述配置中字段含义如下:

  • $remote_addr : 发送请求的源地址
  • $remote_user : 发送请求的用户信息
  • $time_local : 接收请求的本地时间
  • $request : 请求信息,比如说 http 的 method 和 路径。
  • $status : 请求状态,比如说 200、401或者 500。
  • $body_bytes_sent : 请求 body 字节数。
  • $http_referer : 域名。
  • $http_user_agent : 用户端 agent 信息,一般就是浏览器信息
  • $http_x_forwarded_for : 其他信息。

具体的一段 access.log 内容如下所示。

58.213.85.34 - - [11/Sep/2019:03:36:11 +0800] "POST /publish/pending/list HTTP/2.0" 200 1328 "https://remcarpediem.com/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.142 Safari/537.36"
58.213.85.34 - - [11/Sep/2019:03:36:30 +0800] "GET /publish/search_inner?key=test HTTP/2.0" 200 34466 "https://remcarpediem.com/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.142 Safari/537.36"

那么,我们可以通过下面命令来统计所有接口调用的次数,并且从大到小排序显示。

cat /var/log/nginx/access.log | awk '{print $7}' | awk -F'?' '{print $1}' | sort | uniq -c | sort -nr

这条命令涉及了 cat、awk、sort , uniq 四个命令行工具和 | 连接符的含义,我们依次简单讲解一下它们的使用,感兴趣的同学可以自行去全面了解学习。

cat 命令是将文件内容打印到标准输出设备上,可以是终端,也可以是其他文件。比如说:

cat /var/log/nginx/access.log # 打印到终端
cat /var/log/nginx/access.log > copy.log # 打印到其他文件中

| 符号是管道操作符,它将的一个命令的 stdout 指向第二个命令的 stdin。在这条命令中 | 符号将 cat 命令的输出指向到 awk 命令的输入中。

awk 是贝尔实验室 1977 年搞出来的文本流处理工具,用于对具有固定格式的文件进行流处理。比如说 nginx 的 access.log 文件,它各个字段之间通过空格分隔开来,awk 就很适合处理此类文件。

'{print $7}' 就是 awk 的指令声明,表示打印出变量$7$7则是 awk 内置的变量,代表按照分隔符分隔开来的第七个文本内容。对于 access.log 文件来说就是 $request 代表的路径相关的内容。 $request 的全部内容是POST /publish/pending/list HTTP/2.0$6 对应 POST,而 $7 对应的就是 /publish/pending/list

awk '{print $7}' Access.log # ''中是命令声明,后边跟着要操作的文件,也就是awk的输入流。

但是有些时候我们发现文本内容并不是按照空格进行分隔的,比如说 $request 内容可能为 /publish/search_inner?key=test,虽然是相同的 path,但是 query 不同,我们统计接口调用量时需要将 query 部分过滤掉。我们可以使用 awk 的 -F 指令指定分隔符。

awk -F'?' '{print $1}' 
# 可以将 /publish/search_inner?key=test 处理为 /publish/search_inner

sort 是专门用于排序的命令,它有多个参数:

  • -n 按数值进行排序,默认是按照字符值排序,按照数值比较 10 > 2 但是按照字符值排序,2 >10 ,因为字符值会先比较首位,也就是 2 > 1。
  • -r 默认是升序排列,这个参数指定按照逆序排列。
  • -k N 指定按第N列排序,默认是第一个值
sort -nr Access.log # 按照数值逆序排序

最后一个命令是 uniq,它用于消除重复行,或者统计。

sort unsort.txt | uniq #  消除重复行
sort unsort.txt | uniq -c # 统计各行在文件中出现的次数,输入格式是[字数] [内容]
sort unsort.txt | uniq -d # 找出重复行

比如说cat /var/log/nginx/access.log | awk '{print $7}' | awk -F'?' '{print $1}' | sort | uniq -c 命令的输出如下所示,正好作为 sort -nr 的输入。

5 /announcement/pending/list
5 /announcement/search_inner

利用这些指令,我们可以通过 access.log 统计很多信息,比如下列这些信息( access.log 的信息配置不同,不可以直接照搬 )。

cat access.log | awk -F ‘^A’ ‘{if($5 == 500) print $0}’ 
#查找当前日志文件 500 错误的访问:
tail -f access.log | awk -F ‘^A’ ‘{if($6>1) print $0}’ 
#查找耗时超过 1s 的慢请求

数据库SQL

在业务迭代过程中,有些数据库数据可能需要使用脚本去修改,这是我们可以要根据一些数据生成对应的 SQL 命令,这里我们可以使用命令行工具快速生成。
比如说我们要将一系列订单状态有问题,需要将其恢复成正常的状态。你现在已经收集到了这批订单的信息。

oder_id name info good_id
100000  '裤子' '山东' 1000
100001  '上衣' '江苏' 1000
100002  '内衣' '内蒙古' 1000
........
100003  '袜子' '江西' 1000

那么你可以使用如下命令直接生成对应的 SQL 语句。

cat ErrorOrderIdFile | awk '{print"UPDATE ORDER SET state = 0 WHERE resource_id = "$1}'

这里 ''中都是 awk 的命令内容,而""中是打印的纯文本,所以我们可以将需要补充的 SQL 命令打印出来。

代码信息统计

在大公司中,各个团队往往会公开出自己的接口给兄弟团队调用,但是随着版本地快速迭代,公开的接口越来越多,想要关闭掉又往往不清楚上游调用方是哪个部门的,轻易不敢关闭或者修改。这时,如果你能访问整个公司的代码库,就可以通过下面的脚本搜索一下项目中是否出现该接口相关的关键词。

笔者公司团队中微服务间通过 FeignClient 相互调用,所以对于这种情况,可以直接将搜索出对应 FeignClient 的函数名出现的文件名称。

下面是一段在多个项目中统计某些关键词出现次数,并打印出文件名的 bash 脚本。

#!/bin/bash
keyword=$1 # 将bash命令的第一个参数赋值给 keyword
prefix=`echo $keyword | tr -s '.' '|' | sed 's/$/|/'` # 处理前缀
files=`find services -name "*.java" -or -name "*.js" | xargs grep -il $keyword` 
# 最关键的一条,搜索services文件夹下文件名后缀为.java或者.js并且内容中有关键词的文件名称。
if [ -z "$files" ];then
	echo ${prefix}0
fi
# 打印
for f in $files;do
echo "$prefix$f"
done

我们只看一下最关键的 find 命令,其他的命令比如 tr 或者 sed,大家可以自行了解学习。

find 用于查找文件,可以按照文件名称、文件操作权限、文件属主、文件访问时间等条件来查找。

find services -name "*.java" -or -name "*.js" # 搜索 services 文件夹下
find . -atime 7 -type f -print 
# -atime是访问时间,-type 是文件类型,区分文件和目录,查找最近7天访问过的文件。
find . -type f -user remcarpediem -print// 找用户 remcarpediem 所拥有的文件
find . ! -name "*.java" -print # !是否定参数,查找所有不是以 .java 结尾的文件。
find . -type f -name "*.java" -delete # find 之后的操作,可以删除当前目录下所有的 java 文件
find . type f -name "*.java" | xargs rm # 上边语句的另外一种写法

xargs 命令能够将输入数据转化为特定命令的命令行参数,比如说多行变一行等,串联多个命令行,比如说上边 find 和 rm。

> ls 
Sentinel					groovy-engine					spring-cloud-bus-stream-binder-rocketmq
agent-demo					hash						spring-cloud-stream-binder-rabbit
> ls | xargs # 将 ls 的输出内容变成一行。
Sentinel agent-demo groovy-engine  hash spring-cloud-bus-stream-binder-rocketmq spring-cloud-stream-binder-rabbit
> echo "nameXnameXnameXname" | xargs -dX 
name name name name
# -d 选项可以自定义一个定界符,相信你已经了解 xargs 的大致作用了吧,按照分隔符拆分文本到一行,默认分隔符当时是回车了。

最后一个命令时 grep,它是文本搜索命令,它可以搜索文本内容的关键词。

grep remcarpediem file # 将 file 文件中的带有 remcarpediem 关键词的行。
grep -C10 remcarpediem file # 将 file 文件中的带有 remcarpediem 关键词前后10行的内容。
cat LOG.* | grep "FROM " | grep "WHERE" > b # 将日志中的所有带where条件的sql查找查找出来
grep -li remcarpediem file # 忽略大小写,并且打印出文件名称

现在大家在回头看一下这段 bash 脚本,是不是大致了解它执行的过程和原理啦。

files=`find services -name "*.java" -or -name "*.js" | xargs grep -il $keyword` 

后记

本文简单介绍了程序员日常工作中可能用到 Linux 命令的三个场景。大家可以根据自己的实际情况,来判断是否需要继续全面详细地学习相关的知识。毕竟只有能运用于实践,给自己工作产生价值的技术才是真技术。学习一项技术,就要坚持学以致用的目的。

编程小技巧之 IDEA 的 Live Template

Share

Redis 事件机制详解

Redis 采用事件驱动机制来处理大量的网络IO。它并没有使用 libevent 或者 libev 这样的成熟开源方案,而是自己实现一个非常简洁的事件驱动库 ae_event。

Redis中的事件驱动库只关注网络IO,以及定时器。该事件库处理下面两类事件:

  • 文件事件(file event):用于处理 Redis 服务器和客户端之间的网络IO。
  • 时间事件(time eveat):Redis 服务器中的一些操作(比如serverCron函数)需要在给定的时间点执行,而时间事件就是处理这类定时操作的。

事件驱动库的代码主要是在src/ae.c中实现的,其示意图如下所示。

事件管理器示意图

aeEventLoop是整个事件驱动的核心,它管理着文件事件表和时间事件列表,
不断地循环处理着就绪的文件事件和到期的时间事件。下面我们就先分别介绍文件事件和时间事件,然后讲述相关的aeEventLoop源码实现。

文件事件

Redis基于Reactor模式开发了自己的网络事件处理器,也就是文件事件处理器。文件事件处理器使用IO多路复用技术,同时监听多个套接字,并为套接字关联不同的事件处理函数。当套接字的可读或者可写事件触发时,就会调用相应的事件处理函数。

Redis 使用的IO多路复用技术主要有:selectepollevportkqueue等。每个IO多路复用函数库在 Redis 源码中都对应一个单独的文件,比如ae_select.c,ae_epoll.c, ae_kqueue.c等。Redis 会根据不同的操作系统,按照不同的优先级选择多路复用技术。事件响应框架一般都采用该架构,比如 netty 和 libevent。

示意图

如下图所示,文件事件处理器有四个组成部分,它们分别是套接字、I/O多路复用程序、文件事件分派器以及事件处理器。

示意图

文件事件是对套接字操作的抽象,每当一个套接字准备好执行 accept、read、write和 close 等操作时,就会产生一个文件事件。因为 Redis 通常会连接多个套接字,所以多个文件事件有可能并发的出现。

I/O多路复用程序负责监听多个套接字,并向文件事件派发器传递那些产生了事件的套接字。

尽管多个文件事件可能会并发地出现,但I/O多路复用程序总是会将所有产生的套接字都放到同一个队列(也就是后文中描述的aeEventLoopfired就绪事件表)里边,然后文件事件处理器会以有序、同步、单个套接字的方式处理该队列中的套接字,也就是处理就绪的文件事件。

一次请求的过程示意图

所以,一次 Redis 客户端与服务器进行连接并且发送命令的过程如上图所示。

  • 客户端向服务端发起建立 socket 连接的请求,那么监听套接字将产生 AE_READABLE 事件,触发连接应答处理器执行。处理器会对客户端的连接请求进行应答,然后创建客户端套接字,以及客户端状态,并将客户端套接字的 AE_READABLE 事件与命令请求处理器关联。
  • 客户端建立连接后,向服务器发送命令,那么客户端套接字将产生 AE_READABLE 事件,触发命令请求处理器执行,处理器读取客户端命令,然后传递给相关程序去执行。
  • 执行命令获得相应的命令回复,为了将命令回复传递给客户端,服务器将客户端套接字的 AE_WRITEABLE 事件与命令回复处理器关联。当客户端试图读取命令回复时,客户端套接字产生 AE_WRITEABLE 事件,触发命令回复处理器将命令回复全部写入到套接字中。

时间事件

Redis 的时间事件分为以下两类:

  • 定时事件:让一段程序在指定的时间之后执行一次。
  • 周期性事件:让一段程序每隔指定时间就执行一次。

Redis 的时间事件的具体定义结构如下所示。

typedef struct aeTimeEvent {
    /* 全局唯一ID */
    long long id; /* time event identifier. */
    /* 秒精确的UNIX时间戳,记录时间事件到达的时间*/
    long when_sec; /* seconds */
    /* 毫秒精确的UNIX时间戳,记录时间事件到达的时间*/
    long when_ms; /* milliseconds */
    /* 时间处理器 */
    aeTimeProc *timeProc;
    /* 事件结束回调函数,析构一些资源*/
    aeEventFinalizerProc *finalizerProc;
    /* 私有数据 */
    void *clientData;
    /* 前驱节点 */
    struct aeTimeEvent *prev;
    /* 后继节点 */
    struct aeTimeEvent *next;
} aeTimeEvent;

一个时间事件是定时事件还是周期性事件取决于时间处理器的返回值:

  • 如果返回值是 AE_NOMORE,那么这个事件是一个定时事件,该事件在达到后删除,之后不会再重复。
  • 如果返回值是非 AE_NOMORE 的值,那么这个事件为周期性事件,当一个时间事件到达后,服务器会根据时间处理器的返回值,对时间事件的 when 属性进行更新,让这个事件在一段时间后再次达到。

Redis 将所有时间事件都放在一个无序链表中,每次 Redis 会遍历整个链表,查找所有已经到达的时间事件,并且调用相应的事件处理器。

介绍完文件事件和时间事件,我们接下来看一下 aeEventLoop的具体实现。

创建事件管理器

Redis 服务端在其初始化函数 initServer中,会创建事件管理器aeEventLoop对象。

函数aeCreateEventLoop将创建一个事件管理器,主要是初始化 aeEventLoop的各个属性值,比如eventsfiredtimeEventHeadapidata

  • 首先创建aeEventLoop对象。
  • 初始化未就绪文件事件表、就绪文件事件表。events指针指向未就绪文件事件表、fired指针指向就绪文件事件表。表的内容在后面添加具体事件时进行初变更。
  • 初始化时间事件列表,设置timeEventHeadtimeEventNextId属性。
  • 调用aeApiCreate 函数创建epoll实例,并初始化 apidata
aeEventLoop *aeCreateEventLoop(int setsize) {
    aeEventLoop *eventLoop;
    int i;
    /* 创建事件状态结构 */
    if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
    /* 创建未就绪事件表、就绪事件表 */
    eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
    eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
    if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
    /* 设置数组大小 */
    eventLoop->setsize = setsize;
    /* 初始化执行最近一次执行时间 */
    eventLoop->lastTime = time(NULL);
    /* 初始化时间事件结构 */
    eventLoop->timeEventHead = NULL;
    eventLoop->timeEventNextId = 0;
    eventLoop->stop = 0;
    eventLoop->maxfd = -1;
    eventLoop->beforesleep = NULL;
    eventLoop->aftersleep = NULL;
    /* 将多路复用io与事件管理器关联起来 */
    if (aeApiCreate(eventLoop) == -1) goto err;
    /* 初始化监听事件 */
    for (i = 0; i < setsize; i++)
        eventLoop->events[i].mask = AE_NONE;
    return eventLoop;
err:
   .....
}

aeApiCreate 函数首先创建了aeApiState对象,初始化了epoll就绪事件表;然后调用epoll_create创建了epoll实例,最后将该aeApiState赋值给apidata属性。

aeApiState对象中epfd存储epoll的标识,events是一个epoll就绪事件数组,当有epoll事件发生时,所有发生的epoll事件和其描述符将存储在这个数组中。这个就绪事件数组由应用层开辟空间、内核负责把所有发生的事件填充到该数组。

static int aeApiCreate(aeEventLoop *eventLoop) {
    aeApiState *state = zmalloc(sizeof(aeApiState));

    if (!state) return -1;
    /* 初始化epoll就绪事件表 */
    state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
    if (!state->events) {
        zfree(state);
        return -1;
    }
    /* 创建 epoll 实例 */
    state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
    if (state->epfd == -1) {
        zfree(state->events);
        zfree(state);
        return -1;
    }
    /* 事件管理器与epoll关联 */
    eventLoop->apidata = state;
    return 0;
}
typedef struct aeApiState {
    /* epoll_event 实例描述符*/
    int epfd;
    /* 存储epoll就绪事件表 */
    struct epoll_event *events;
} aeApiState;

创建文件事件

aeFileEvent是文件事件结构,对于每一个具体的事件,都有读处理函数和写处理函数等。Redis 调用aeCreateFileEvent函数针对不同的套接字的读写事件注册对应的文件事件。

typedef struct aeFileEvent {
    /* 监听事件类型掩码,值可以是 AE_READABLE 或 AE_WRITABLE */
    int mask;
    /* 读事件处理器 */
    aeFileProc *rfileProc;
    /* 写事件处理器 */
    aeFileProc *wfileProc;
    /* 多路复用库的私有数据 */
    void *clientData;
} aeFileEvent;
/* 使用typedef定义的处理器函数的函数类型 */
typedef void aeFileProc(struct aeEventLoop *eventLoop, 
int fd, void *clientData, int mask);

比如说,Redis 进行主从复制时,从服务器需要主服务器建立连接,它会发起一个 socekt连接,然后调用aeCreateFileEvent函数针对发起的socket的读写事件注册了对应的事件处理器,也就是syncWithMaster函数。

aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL);
/* 符合aeFileProc的函数定义 */
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {....}

aeCreateFileEvent的参数fd指的是具体的socket套接字,procfd产生事件时,具体的处理函数,clientData则是回调处理函数时需要传入的数据。
aeCreateFileEvent主要做了三件事情:

  • fd为索引,在events未就绪事件表中找到对应事件。
  • 调用aeApiAddEvent函数,该事件注册到具体的底层 I/O 多路复用中,本例为epoll。
  • 填充事件的回调、参数、事件类型等参数。
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
                       aeFileProc *proc, void *clientData)
{
    /* 取出 fd 对应的文件事件结构, fd 代表具体的 socket 套接字 */
    aeFileEvent *fe = &eventLoop->events[fd];
    /* 监听指定 fd 的指定事件 */
    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;
    /* 置文件事件类型,以及事件的处理器 */
    fe->mask |= mask;
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;
    /* 私有数据 */
    fe->clientData = clientData;
    if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;
    return AE_OK;
}

如上文所说,Redis 基于的底层 I/O 多路复用库有多套,所以aeApiAddEvent也有多套实现,下面的源码是epoll下的实现。其核心操作就是调用epollepoll_ctl函数来向epoll注册响应事件。有关epoll相关的知识可以看一下《Java NIO源码分析》

static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee = {0}; /* avoid valgrind warning */
    /* 如果 fd 没有关联任何事件,那么这是一个 ADD 操作。如果已经关联了某个/某些事件,那么这是一个 MOD 操作。 */
    int op = eventLoop->events[fd].mask == AE_NONE ?
            EPOLL_CTL_ADD : EPOLL_CTL_MOD;

    /* 注册事件到 epoll */
    ee.events = 0;
    mask |= eventLoop->events[fd].mask; /* Merge old events */
    if (mask & AE_READABLE) ee.events |= EPOLLIN;
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
    ee.data.fd = fd;
    /* 调用epoll_ctl 系统调用,将事件加入epoll中 */
    if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
    return 0;
}

事件处理

因为 Redis 中同时存在文件事件和时间事件两个事件类型,所以服务器必须对这两个事件进行调度,决定何时处理文件事件,何时处理时间事件,以及如何调度它们。

aeMain函数以一个无限循环不断地调用aeProcessEvents函数来处理所有的事件。

void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        /* 如果有需要在事件处理前执行的函数,那么执行它 */
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        /* 开始处理事件*/
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
    }
}

下面是aeProcessEvents的伪代码,它会首先计算距离当前时间最近的时间事件,以此计算一个超时时间;然后调用aeApiPoll函数去等待底层的I/O多路复用事件就绪;aeApiPoll函数返回之后,会处理所有已经产生文件事件和已经达到的时间事件。

/* 伪代码 */
int aeProcessEvents(aeEventLoop *eventLoop, int flags) {
    /* 获取到达时间距离当前时间最接近的时间事件*/
    time_event = aeSearchNearestTimer();
    /* 计算最接近的时间事件距离到达还有多少毫秒*/
    remaind_ms = time_event.when - unix_ts_now();
    /* 如果事件已经到达,那么remaind_ms为负数,将其设置为0 */
    if (remaind_ms < 0) remaind_ms = 0;
    /* 根据 remaind_ms 的值,创建 timeval 结构*/
    timeval = create_timeval_with_ms(remaind_ms);
    /* 阻塞并等待文件事件产生,最大阻塞时间由传入的 timeval 结构决定,如果remaind_ms 的值为0,则aeApiPoll 调用后立刻返回,不阻塞*/
    /* aeApiPoll调用epoll_wait函数,等待I/O事件*/
    aeApiPoll(timeval);
    /* 处理所有已经产生的文件事件*/
    processFileEvents();
    /* 处理所有已经到达的时间事件*/
    processTimeEvents();
}

aeApiAddEvent类似,aeApiPoll也有多套实现,它其实就做了两件事情,调用epoll_wait阻塞等待epoll的事件就绪,超时时间就是之前根据最快达到时间事件计算而来的超时时间;然后将就绪的epoll事件转换到fired就绪事件。aeApiPoll就是上文所说的I/O多路复用程序。具体过程如下图所示。

aeApiPoll示意图

static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) 
{
    aeApiState *state = eventLoop->apidata;
    int retval, numevents = 0;
    // 调用epoll_wait函数,等待时间为最近达到时间事件的时间计算而来。
    retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
            tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
    // 有至少一个事件就绪?
    if (retval > 0) 
    {
        int j;
        /*为已就绪事件设置相应的模式,并加入到 eventLoop 的 fired 数组中*/
        numevents = retval;
        for (j = 0; j < numevents; j++) 
	{
            int mask = 0;
            struct epoll_event *e = state->events+j;
            if (e->events & EPOLLIN)
		mask |= AE_READABLE;
            if (e->events & EPOLLOUT)
		mask |= AE_WRITABLE;
            if (e->events & EPOLLERR) 
		mask |= AE_WRITABLE;
            if (e->events & EPOLLHUP)
		mask |= AE_WRITABLE;
            /* 设置就绪事件表元素 */
            eventLoop->fired[j].fd = e->data.fd;
            eventLoop->fired[j].mask = mask;
        }
    }
    
    // 返回已就绪事件个数
    return numevents;
}

processFileEvent是处理就绪文件事件的伪代码,也是上文所述的文件事件分派器,它其实就是遍历fired就绪事件表,然后根据对应的事件类型来调用事件中注册的不同处理器,读事件调用rfileProc,而写事件调用wfileProc

void processFileEvent(int numevents) {
    for (j = 0; j < numevents; j++) {
            /* 从已就绪数组中获取事件 */
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int fired = 0;
            int invert = fe->mask & AE_BARRIER;
	        /* 读事件 */
            if (!invert && fe->mask & mask & AE_READABLE) {
                /* 调用读处理函数 */
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                fired++;
            }
            /* 写事件. */
            if (fe->mask & mask & AE_WRITABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }
            if (invert && fe->mask & mask & AE_READABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }
            processed++;
        }
    }
}

processTimeEvents是处理时间事件的函数,它会遍历aeEventLoop的事件事件列表,如果时间事件到达就执行其timeProc函数,并根据函数的返回值是否等于AE_NOMORE来决定该时间事件是否是周期性事件,并修改器到达时间。

static int processTimeEvents(aeEventLoop *eventLoop) {
    int processed = 0;
    aeTimeEvent *te;
    long long maxId;
    time_t now = time(NULL);
    ....
    eventLoop->lastTime = now;

    te = eventLoop->timeEventHead;
    maxId = eventLoop->timeEventNextId-1;
    /* 遍历时间事件链表 */
    while(te) {
        long now_sec, now_ms;
        long long id;

        /* 删除需要删除的时间事件 */
        if (te->id == AE_DELETED_EVENT_ID) {
            aeTimeEvent *next = te->next;
            if (te->prev)
                te->prev->next = te->next;
            else
                eventLoop->timeEventHead = te->next;
            if (te->next)
                te->next->prev = te->prev;
            if (te->finalizerProc)
                te->finalizerProc(eventLoop, te->clientData);
            zfree(te);
            te = next;
            continue;
        }

        /* id 大于最大maxId,是该循环周期生成的时间事件,不处理 */
        if (te->id > maxId) {
            te = te->next;
            continue;
        }
        aeGetTime(&now_sec, &now_ms);
        /* 事件已经到达,调用其timeProc函数*/
        if (now_sec > te->when_sec ||
            (now_sec == te->when_sec && now_ms >= te->when_ms))
        {
            int retval;

            id = te->id;
            retval = te->timeProc(eventLoop, id, te->clientData);
            processed++;
            /* 如果返回值不等于 AE_NOMORE,表示是一个周期性事件,修改其when_sec和when_ms属性*/
            if (retval != AE_NOMORE) {
                aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
            } else {
                /* 一次性事件,标记为需删除,下次遍历时会删除*/
                te->id = AE_DELETED_EVENT_ID;
            }
        }
        te = te->next;
    }
    return processed;
}

删除事件

当不在需要某个事件时,需要把事件删除掉。例如: 如果fd同时监听读事件、写事件。当不在需要监听写事件时,可以把该fd的写事件删除。

aeDeleteEventLoop函数的执行过程总结为以下几个步骤
1、根据fd在未就绪表中查找到事件
2、取消该fd对应的相应事件标识符
3、调用aeApiFree函数,内核会将epoll监听红黑树上的相应事件监听取消。

后记

接下来,我们会继续学习 Redis 的主从复制相关的原理,欢迎大家持续关注。

Share

Redis AOF 持久化详解

Redis 是一种内存数据库,将数据保存在内存中,读写效率要比传统的将数据保存在磁盘上的数据库要快很多。但是一旦进程退出,Redis 的数据就会丢失。

为了解决这个问题,Redis 提供了 RDB 和 AOF 两种持久化方案,将内存中的数据保存到磁盘中,避免数据丢失。RDB的介绍在这篇文章中《Redis RDB 持久化详解》,今天我们来看一下 AOF 相关的原理。

AOF( append only file )持久化以独立日志的方式记录每次写命令,并在 Redis 重启时在重新执行 AOF 文件中的命令以达到恢复数据的目的。AOF 的主要作用是解决数据持久化的实时性。

RDB 和 AOF

antirez 在《Redis 持久化解密》一文中讲述了 RDB 和 AOF 各自的优缺点:

  • RDB 是一个紧凑压缩的二进制文件,代表 Redis 在某个时间点上的数据备份。非常适合备份,全量复制等场景。比如每6小时执行 bgsave 备份,并把 RDB 文件拷贝到远程机器或者文件系统中,用于灾难恢复。
  • Redis 加载 RDB 恢复数据远远快于 AOF 的方式
  • RDB 方式数据没办法做到实时持久化,而 AOF 方式可以做到。

下面,我们就来了解一下 AOF 是如何做到实时持久化的。

AOF 持久化的实现

如上图所示,AOF 持久化功能的实现可以分为命令追加( append )、文件写入( write )、文件同步( sync )、文件重写(rewrite)和重启加载(load)。其流程如下:

  • 所有的写命令会追加到 AOF 缓冲中。
  • AOF 缓冲区根据对应的策略向硬盘进行同步操作。
  • 随着 AOF 文件越来越大,需要定期对 AOF 文件进行重写,达到压缩的目的。
  • 当 Redis 重启时,可以加载 AOF 文件进行数据恢复。

命令追加

当 AOF 持久化功能处于打开状态时,Redis 在执行完一个写命令之后,会以协议格式(也就是RESP,即 Redis 客户端和服务器交互的通信协议 )将被执行的写命令追加到 Redis 服务端维护的 AOF 缓冲区末尾。

比如说 SET mykey myvalue 这条命令就以如下格式记录到 AOF 缓冲中。

"*3\r\n$3\r\nSET\r\n$5\r\nmykey\r\n$7\r\nmyvalue\r\n"

Redis 协议格式本文不再赘述,AOF之所以直接采用文本协议格式,是因为所有写入命令都要进行追加操作,直接采用协议格式,避免了二次处理开销。

文件写入和同步

Redis 每次结束一个事件循环之前,它都会调用 flushAppendOnlyFile 函数,判断是否需要将 AOF 缓存区中的内容写入和同步到 AOF 文件中。

flushAppendOnlyFile 函数的行为由 redis.conf 配置中的 appendfsync 选项的值来决定。该选项有三个可选值,分别是 alwayseverysecno

  • always:Redis 在每个事件循环都要将 AOF 缓冲区中的所有内容写入到 AOF 文件,并且同步 AOF 文件,所以 always 的效率是 appendfsync 选项三个值当中最差的一个,但从安全性来说,也是最安全的。当发生故障停机时,AOF 持久化也只会丢失一个事件循环中所产生的命令数据。
  • everysec:Redis 在每个事件循环都要将 AOF 缓冲区中的所有内容写入到 AOF 文件中,并且每隔一秒就要在子线程中对 AOF 文件进行一次同步。从效率上看,该模式足够快。当发生故障停机时,只会丢失一秒钟的命令数据。
  • no:Redis 在每一个事件循环都要将 AOF 缓冲区中的所有内容写入到 AOF 文件。而 AOF 文件的同步由操作系统控制。这种模式下速度最快,但是同步的时间间隔较长,出现故障时可能会丢失较多数据。

Linux 系统下 write 操作会触发延迟写( delayed write )机制。Linux 在内核提供页缓存区用来提供硬盘 IO 性能。write 操作在写入系统缓冲区之后直接返回。同步硬盘操作依赖于系统调度机制,例如:缓冲区页空间写满或者达到特定时间周期。同步文件之前,如果此时系统故障宕机,缓冲区内数据将丢失。

fsync 针对单个文件操作,对其进行强制硬盘同步,fsync 将阻塞直到写入磁盘完成后返回,保证了数据持久化。

appendfsync的三个值代表着三种不同的调用 fsync的策略。调用fsync周期越频繁,读写效率就越差,但是相应的安全性越高,发生宕机时丢失的数据越少。

有关 Linux 的I/O和各个系统调用的作用如下图所示。具体内容可以查看《聊聊 Linux I/O》一文。

AOF 数据恢复

AOF 文件里边包含了重建 Redis 数据所需的所有写命令,所以 Redis 只要读入并重新执行一遍 AOF 文件里边保存的写命令,就可以还原 Redis 关闭之前的状态。

Redis 读取 AOF 文件并且还原数据库状态的详细步骤如下:

  • 创建一个不带网络连接的的伪客户端( fake client),因为 Redis 的命令只能在客户端上下文中执行,而载入 AOF 文件时所使用的的命令直接来源于 AOF 文件而不是网络连接,所以服务器使用了一个没有网络连接的伪客户端来执行 AOF 文件保存的写命令,伪客户端执行命令的效果和带网络连接的客户端执行命令的效果完全一样的。
  • 从 AOF 文件中分析并取出一条写命令。
  • 使用伪客户端执行被读出的写命令。
  • 一直执行步骤 2 和步骤3,直到 AOF 文件中的所有写命令都被处理完毕为止。

当完成以上步骤之后,AOF 文件所保存的数据库状态就会被完整还原出来。

AOF 重写

因为 AOF 持久化是通过保存被执行的写命令来记录 Redis 状态的,所以随着 Redis 长时间运行,AOF 文件中的内容会越来越多,文件的体积也会越来越大,如果不加以控制的话,体积过大的 AOF 文件很可能对 Redis 甚至宿主计算机造成影响。

为了解决 AOF 文件体积膨胀的问题,Redis 提供了 AOF 文件重写( rewrite) 功能。通过该功能,Redis 可以创建一个新的 AOF 文件来替代现有的 AOF 文件。新旧两个 AOF 文件所保存的 Redis 状态相同,但是新的 AOF 文件不会包含任何浪费空间的荣誉命令,所以新 AOF 文件的体积通常比旧 AOF 文件的体积要小得很多。

如上图所示,重写前要记录名为list的键的状态,AOF 文件要保存五条命令,而重写后,则只需要保存一条命令。

AOF 文件重写并不需要对现有的 AOF 文件进行任何读取、分析或者写入操作,而是通过读取服务器当前的数据库状态来实现的。首先从数据库中读取键现在的值,然后用一条命令去记录键值对,代替之前记录这个键值对的多条命令,这就是 AOF 重写功能的实现原理。

在实际过程中,为了避免在执行命令时造成客户端输入缓冲区溢出,AOF 重写在处理列表、哈希表、集合和有序集合这四种可能会带有多个元素的键时,会先检查键所包含的元素数量,如果数量超过 REDIS_AOF_REWRITE_ITEMS_PER_CMD ( 一般为64 )常量,则使用多条命令记录该键的值,而不是一条命令。

rewrite的触发机制主要有一下三个:

  • 手动调用 bgrewriteaof 命令,如果当前有正在运行的 rewrite 子进程,则本次rewrite 会推迟执行,否则,直接触发一次 rewrite。
  • 通过配置指令手动开启 AOF 功能,如果没有 RDB 子进程的情况下,会触发一次 rewrite,将当前数据库中的数据写入 rewrite 文件。
  • 在 Redis 定时器中,如果有需要退出执行的 rewrite 并且没有正在运行的 RDB 或者 rewrite 子进程时,触发一次或者 AOF 文件大小已经到达配置的 rewrite 条件也会自动触发一次。

AOF 后台重写

AOF 重写函数会进行大量的写入操作,调用该函数的线程将被长时间阻塞,所以 Redis 在子进程中执行 AOF 重写操作。

  • 子进程进行 AOF 重写期间,Redis 进程可以继续处理客户端命令请求。
  • 子进程带有父进程的内存数据拷贝副本,在不适用锁的情况下,也可以保证数据的安全性。

但是,在子进程进行 AOF 重启期间,Redis接收客户端命令,会对现有数据库状态进行修改,从而导致数据当前状态和 重写后的 AOF 文件所保存的数据库状态不一致。

为此,Redis 设置了一个 AOF 重写缓冲区,这个缓冲区在服务器创建子进程之后开始使用,当 Redis 执行完一个写命令之后,它会同时将这个写命令发送给 AOF 缓冲区和 AOF 重写缓冲区。

当子进程完成 AOF 重写工作之后,它会向父进程发送一个信号,父进程在接收到该信号之后,会调用一个信号处理函数,并执行以下工作:

  • 将 AOF 重写缓冲区中的所有内容写入到新的 AOF 文件中,保证新 AOF 文件保存的数据库状态和服务器当前状态一致。
  • 对新的 AOF 文件进行改名,原子地覆盖现有 AOF 文件,完成新旧文件的替换
  • 继续处理客户端请求命令。

在整个 AOF 后台重写过程中,只有信号处理函数执行时会对 Redis 主进程造成阻塞,在其他时候,AOF 后台重写都不会阻塞主进程。

后记

后续将会继续学习 Redis 复制和集群相关的知识,希望大家持久关注。

Share

Redis RDB 持久化详解

Redis 是一种内存数据库,将数据保存在内存中,读写效率要比传统的将数据保存在磁盘上的数据库要快很多。但是一旦进程退出,Redis 的数据就会丢失。

为了解决这个问题,Redis 提供了 RDB 和 AOF 两种持久化方案,将内存中的数据保存到磁盘中,避免数据丢失。

antirez 在《Redis 持久化解密》一文中说,一般来说有三种常见的策略来进行持久化操作,防止数据损坏:

  • 方法1 是数据库不关心发生故障,在数据文件损坏后通过数据备份或者快照来进行恢复。Redis 的 RDB 持久化就是这种方式。

  • 方法2 是数据库使用操作日志,每次操作时记录操作行为,以便在故障后通过日志恢复到一致性的状态。因为操作日志是顺序追加的方式写的,所以不会出现操作日志也无法恢复的情况。类似于 Mysql 的 redo 和 undo 日志,具体可以看这篇《InnoDB的磁盘文件及落盘机制》文章。

  • 方法3 是数据库不进行老数据的修改,只是以追加方式去完成写操作,这样数据本身就是一份日志,这样就永远不会出现数据无法恢复的情况了。CouchDB就是此做法的优秀范例。

RDB 就是第一种方法,它就是把当前 Redis 进程的数据生成时间点快照( point-in-time snapshot ) 保存到存储设备的过程。

RDB 的使用

RDB 触发机制分为使用指令手动触发和 redis.conf 配置自动触发。

手动触发 Redis 进行 RDB 持久化的指令的为:

  • save ,该指令会阻塞当前 Redis 服务器,执行 save 指令期间,Redis 不能处理其他命令,直到 RDB 过程完成为止。
  • bgsave,执行该命令时,Redis 会在后台异步执行快照操作,此时 Redis 仍然可以相应客户端请求。具体操作是 Redis 进程执行 fork 操作创建子进程,RDB 持久化过程由子进程负责,完成后自动结束。Redis 只会在 fork 期间发生阻塞,但是一般时间都很短。但是如果 Redis 数据量特别大,fork 时间就会变长,而且占用内存会加倍,这一点需要特别注意。

自动触发 RDB 的默认配置如下所示:

save 900 1 # 表示900 秒内如果至少有 1 个 key 的值变化,则触发RDB
save 300 10 # 表示300 秒内如果至少有 10 个 key 的值变化,则触发RDB
save 60 10000 # 表示60 秒内如果至少有 10000 个 key 的值变化,则触发RDB

如果不需要 Redis 进行持久化,那么可以注释掉所有的 save 行来停用保存功能,也可以直接一个空字符串来停用持久化:save “”。

Redis 服务器周期操作函数 serverCron 默认每个 100 毫秒就会执行一次,该函数用于正在运行的服务器进行维护,它的一项工作就是检查 save 选项所设置的条件是否有一项被满足,如果满足的话,就执行 bgsave 指令。

RDB 整体流程

了解了 RDB 的基础使用后,我们要继续深入对 RDB持久化的学习。在此之前,我们可以先思考一下如何实现一个持久化机制,毕竟这是很多中间件所需的一个模块。

首先,持久化保存的文件内容结构必须是紧凑的,特别对于数据库来说,需要持久化的数据量十分大,需要保证持久化文件不至于占用太多存储。
其次,进行持久化时,中间件应该还可以快速地响应用户请求,持久化的操作应该尽量少影响中间件的其他功能。
最后,毕竟持久化会消耗性能,如何在性能和数据安全性之间做出平衡,如何灵活配置触发持久化操作。

接下来我们将带着这些问题,到源码中寻求答案。

本文中的源码来自 Redis 4.0 ,RDB持久化过程的相关源码都在 rdb.c 文件中。其中大概的流程如下图所示。

上图表明了三种触发 RDB 持久化的手段之间的整体关系。通过 serverCron 自动触发的 RDB 相当于直接调用了 bgsave 指令的流程进行处理。而 bgsave 的处理流程启动子进程后,调用了 save 指令的处理流程。

下面我们从 serverCron 自动触发逻辑开始研究。

自动触发 RDB 持久化

如上图所示,redisServer 结构体的save_params指向拥有三个值的数组,该数组的值与 redis.conf 文件中 save 配置项一一对应。分别是 save 900 1save 300 10save 60 10000dirty 记录着有多少键值发生变化,lastsave记录着上次 RDB 持久化的时间。

serverCron 函数就是遍历该数组的值,检查当前 Redis 状态是否符合触发 RDB 持久化的条件,比如说距离上次 RDB 持久化过去了 900 秒并且有至少一条数据发生变更。

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    ....
    /* Check if a background saving or AOF rewrite in progress terminated. */
    /* 判断后台是否正在进行 rdb 或者 aof 操作 */
    if (server.rdb_child_pid != -1 || server.aof_child_pid != -1 ||
        ldbPendingChildren())
    {
        ....
    } else {
        // 到这儿就能确定 当前木有进行 rdb 或者 aof 操作
        // 遍历每一个 rdb 保存条件
         for (j = 0; j < server.saveparamslen; j++) {
            struct saveparam *sp = server.saveparams+j;

            //如果数据保存记录 大于规定的修改次数 且距离 上一次保存的时间大于规定时间或者上次BGSAVE命令执行成功,才执行 BGSAVE 操作
            if (server.dirty >= sp->changes &&
                server.unixtime-server.lastsave > sp->seconds &&
                (server.unixtime-server.lastbgsave_try >
                 CONFIG_BGSAVE_RETRY_DELAY ||
                 server.lastbgsave_status == C_OK))
            {
                //记录日志
                serverLog(LL_NOTICE,"%d changes in %d seconds. Saving...",
                    sp->changes, (int)sp->seconds);
                rdbSaveInfo rsi, *rsiptr;
                rsiptr = rdbPopulateSaveInfo(&rsi);
                // 异步保存操作
                rdbSaveBackground(server.rdb_filename,rsiptr);
                break;
            }
         }
    }
    ....
    server.cronloops++;
    return 1000/server.hz;
}

如果符合触发 RDB 持久化的条件,serverCron会调用rdbSaveBackground函数,也就是 bgsave 指令会触发的函数。

子进程后台执行 RDB 持久化

执行 bgsave 指令时,Redis 会先触发 bgsaveCommand 进行当前状态检查,然后才会调用rdbSaveBackground,其中的逻辑如下图所示。

rdbSaveBackground 函数中最主要的工作就是调用 fork 命令生成子流程,然后在子流程中执行 rdbSave函数,也就是 save 指令最终会触发的函数。

int rdbSaveBackground(char *filename, rdbSaveInfo *rsi) {
    pid_t childpid;
    long long start;
    // 检查后台是否正在执行 aof 或者 rdb 操作
    if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) return C_ERR;
    // 拿出 数据保存记录,保存为 上次记录
    server.dirty_before_bgsave = server.dirty;
    // bgsave 时间
    server.lastbgsave_try = time(NULL);
    start = ustime();
    // fork 子进程
    if ((childpid = fork()) == 0) {
        int retval;
        /* 关闭子进程继承的 socket 监听 */
        closeListeningSockets(0);
        // 子进程 title 修改
        redisSetProcTitle("redis-rdb-bgsave");
        // 执行rdb 写入操作
        retval = rdbSave(filename,rsi);
        // 执行完毕以后
        ....
        // 退出子进程
        exitFromChild((retval == C_OK) ? 0 : 1);
    } else {
        /* 父进程,进行fork时间的统计和信息记录,比如说rdb_save_time_start、rdb_child_pid、和rdb_child_type */
        ....
        // rdb 保存开始时间 bgsave 子进程
        server.rdb_save_time_start = time(NULL);
        server.rdb_child_pid = childpid;
        server.rdb_child_type = RDB_CHILD_TYPE_DISK;
        updateDictResizePolicy();
        return C_OK;
    }
    return C_OK; /* unreached */
}

为什么 Redis 使用子进程而不是线程来进行后台 RDB 持久化呢?主要是出于Redis性能的考虑,我们知道Redis对客户端响应请求的工作模型是单进程和单线程的,如果在主进程内启动一个线程,这样会造成对数据的竞争条件。所以为了避免使用锁降低性能,Redis选择启动新的子进程,独立拥有一份父进程的内存拷贝,以此为基础执行RDB持久化。

但是需要注意的是,fork 会消耗一定时间,并且父子进程所占据的内存是相同的,当 Redis 键值较大时,fork 的时间会很长,这段时间内 Redis 是无法响应其他命令的。除此之外,Redis 占据的内存空间会翻倍。

生成 RDB 文件,并且持久化到硬盘

Redis 的 rdbSave 函数是真正进行 RDB 持久化的函数,它的大致流程如下:

  • 首先打开一个临时文件,
  • 调用 rdbSaveRio函数,将当前 Redis 的内存信息写入到这个临时文件中,
  • 接着调用 fflushfsyncfclose 接口将文件写入磁盘中,
  • 使用 rename 将临时文件改名为 正式的 RDB 文件,
  • 最后记录 dirtylastsave等状态信息。这些状态信息在 serverCron时会使用到。
int rdbSave(char *filename, rdbSaveInfo *rsi) {
    char tmpfile[256];
    // 当前工作目录
    char cwd[MAXPATHLEN];
    FILE *fp;
    rio rdb;
    int error = 0;

    /* 生成tmpfile文件名 temp-[pid].rdb */
    snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
    /* 打开文件 */
    fp = fopen(tmpfile,"w");
    .....
    /* 初始化rio结构 */
    rioInitWithFile(&rdb,fp);

    if (rdbSaveRio(&rdb,&error,RDB_SAVE_NONE,rsi) == C_ERR) {
        errno = error;
        goto werr;
    }

    if (fflush(fp) == EOF) goto werr;
    if (fsync(fileno(fp)) == -1) goto werr;
    if (fclose(fp) == EOF) goto werr;

    /* 重新命名 rdb 文件,把之前临时的名称修改为正式的 rdb 文件名称 */
    if (rename(tmpfile,filename) == -1) {
        // 异常处理
        ....
    }
    // 写入完成,打印日志
    serverLog(LL_NOTICE,"DB saved on disk");
    // 清理数据保存记录
    server.dirty = 0;
    // 最后一次完成 SAVE 命令的时间
    server.lastsave = time(NULL);
    // 最后一次 bgsave 的状态置位 成功
    server.lastbgsave_status = C_OK;
    return C_OK;
    ....
}

这里要简单说一下 fflushfsync的区别。它们俩都是用于刷缓存,但是所属的层次不同。fflush函数用于 FILE* 指针上,将缓存数据从应用层缓存刷新到内核中,而fsync 函数则更加底层,作用于文件描述符,用于将内核缓存刷新到物理设备上。

关于 Linux IO 的具体原理可以参考《聊聊Linux IO》

内存数据到 RDB 文件

rdbSaveRio 会将 Redis 内存中的数据以相对紧凑的格式写入到文件中,其文件格式的示意图如下所示。

rdbSaveRio函数的写入大致流程如下:

  • 先写入 REDIS 魔法值,然后是 RDB 文件的版本( rdb_version ),额外辅助信息 ( aux )。辅助信息中包含了 Redis 的版本,内存占用和复制库( repl-id )和偏移量( repl-offset )等。

  • 然后 rdbSaveRio 会遍历当前 Redis 的所有数据库,将数据库的信息依次写入。先写入 RDB_OPCODE_SELECTDB识别码和数据库编号,接着写入RDB_OPCODE_RESIZEDB识别码和数据库键值数量和待失效键值数量,最后会遍历所有的键值,依次写入。

  • 在写入键值时,当该键值有失效时间时,会先写入RDB_OPCODE_EXPIRETIME_MS识别码和失效时间,然后写入键值类型的识别码,最后再写入键和值。

  • 写完数据库信息后,还会把 Lua 相关的信息写入,最后再写入 RDB_OPCODE_EOF结束符识别码和校验值。

    int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) {
        snprintf(magic,sizeof(magic),"REDIS%04d",RDB_VERSION);
        /* 1 写入 magic字符'REDIS' 和 RDB 版本 */
        if (rdbWriteRaw(rdb,magic,9) == -1) goto werr;
        /* 2 写入辅助信息  REDIS版本,服务器操作系统位数,当前时间,复制信息比如repl-stream-db,repl-id和repl-offset等等数据*/
        if (rdbSaveInfoAuxFields(rdb,flags,rsi) == -1) goto werr;
        /* 3 遍历每一个数据库,逐个数据库数据保存 */
        for (j = 0; j < server.dbnum; j++) {
            /* 获取数据库指针地址和数据库字典 */
            redisDb *db = server.db+j;
            dict *d = db->dict;
            /* 3.1 写入数据库部分的开始标识 */
            if (rdbSaveType(rdb,RDB_OPCODE_SELECTDB) == -1) goto werr;
            /* 3.2 写入当前数据库号 */
            if (rdbSaveLen(rdb,j) == -1) goto werr;
    
            uint32_t db_size, expires_size;
            /* 获取数据库字典大小和过期键字典大小 */
            db_size = (dictSize(db->dict) <= UINT32_MAX) ?
                                    dictSize(db->dict) :
                                    UINT32_MAX;
            expires_size = (dictSize(db->expires) <= UINT32_MAX) ?
                                    dictSize(db->expires) :
                                    UINT32_MAX;
            /* 3.3 写入当前待写入数据的类型,此处为 RDB_OPCODE_RESIZEDB,表示数据库大小 */
            if (rdbSaveType(rdb,RDB_OPCODE_RESIZEDB) == -1) goto werr;
            /* 3.4 写入获取数据库字典大小和过期键字典大小 */
            if (rdbSaveLen(rdb,db_size) == -1) goto werr;
            if (rdbSaveLen(rdb,expires_size) == -1) goto werr;
            /* 4 遍历当前数据库的键值对 */
            while((de = dictNext(di)) != NULL) {
                sds keystr = dictGetKey(de);
                robj key, *o = dictGetVal(de);
                long long expire;
    
                /* 初始化 key,因为操作的是 key 字符串对象,而不是直接操作 键的字符串内容 */
                initStaticStringObject(key,keystr);
                /* 获取键的过期数据 */
                expire = getExpire(db,&key);
                /* 4.1 保存键值对数据 */
                if (rdbSaveKeyValuePair(rdb,&key,o,expire) == -1) goto werr;
            }
    
        }
    
        /* 5 保存 Lua 脚本*/
        if (rsi && dictSize(server.lua_scripts)) {
            di = dictGetIterator(server.lua_scripts);
            while((de = dictNext(di)) != NULL) {
                robj *body = dictGetVal(de);
                if (rdbSaveAuxField(rdb,"lua",3,body->ptr,sdslen(body->ptr)) == -1)
                    goto werr;
            }
            dictReleaseIterator(di);
        }
    
        /* 6 写入结束符 */
        if (rdbSaveType(rdb,RDB_OPCODE_EOF) == -1) goto werr;
    
        /* 7 写入CRC64校验和 */
        cksum = rdb->cksum;
        memrev64ifbe(&cksum);
        if (rioWrite(rdb,&cksum,8) == 0) goto werr;
        return C_OK;
    }
    

rdbSaveRio在写键值时,会调用rdbSaveKeyValuePair 函数。该函数会依次写入键值的过期时间,键的类型,键和值。

int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime)
{
    /* 如果有过期信息 */
    if (expiretime != -1) {
        /* 保存过期信息标识 */
        if (rdbSaveType(rdb,RDB_OPCODE_EXPIRETIME_MS) == -1) return -1;
        /* 保存过期具体数据内容 */
        if (rdbSaveMillisecondTime(rdb,expiretime) == -1) return -1;
    }

    /* Save type, key, value */
    /* 保存键值对 类型的标识 */
    if (rdbSaveObjectType(rdb,val) == -1) return -1;
    /* 保存键值对 键的内容 */
    if (rdbSaveStringObject(rdb,key) == -1) return -1;
    /* 保存键值对 值的内容 */
    if (rdbSaveObject(rdb,val) == -1) return -1;
    return 1;
}

根据键的不同类型写入不同格式,各种键值的类型和格式如下所示。

Redis 有庞大的对象和数据结构体系,它使用六种底层数据结构构建了包含字符串对象、列表对象、哈希对象、集合对象和有序集合对象的对象系统。感兴趣的同学可以参考 《十二张图带你了解 Redis 的数据结构和对象系统》一文。

不同的数据结构进行 RDB 持久化的格式都不同。我们今天只看一下集合对象是如何持久化的。

ssize_t rdbSaveObject(rio *rdb, robj *o) {
    ssize_t n = 0, nwritten = 0;
    ....
    } else if (o->type == OBJ_SET) {
        /* Save a set value */
        if (o->encoding == OBJ_ENCODING_HT) {
            dict *set = o->ptr;
            // 集合迭代器
            dictIterator *di = dictGetIterator(set);
            dictEntry *de;
            // 写入集合长度
            if ((n = rdbSaveLen(rdb,dictSize(set))) == -1) return -1;
            nwritten += n;
            // 遍历集合元素
            while((de = dictNext(di)) != NULL) {
                sds ele = dictGetKey(de);
                // 以字符串的形式写入,因为是SET 所以只写入 Key 即可
                if ((n = rdbSaveRawString(rdb,(unsigned char*)ele,sdslen(ele)))
                    == -1) return -1;
                nwritten += n;
            }
            dictReleaseIterator(di);
        } 
    .....
    return nwritten;
}

Share

编程小技巧之 IDEA 的 Live Template

合格的程序员都善于使用工具,正所谓君子性非异也,善假于物也。

使用自动化工具可以减少自己的工作量,提高工作效率。日常编程过程中,我们经常需要编写重复的代码片段,比如说

private static final Logger LOGGER = LoggerFactory.getLogger(HashServiceImpl.class);

每次编写时都要键入很多键,有什么方法可以快速生成这段代码呢?类似的,如何保存格式固定的常用代码片段,然后在需要时快速生成呢。IDEA 的 Live Template 是一个可行的途径。

我也是最近才逐渐使用 IDEA 的 Live Template 功能,之前虽然知道这个功能,但是没有养成使用的习惯。最近一段时间在不断审视并反思自己的编程、工作和生活习惯,才发现其中有很多可以优化精进的地方。

这也是《程序员修炼之道》中所说的 Think ! About Your Work 。

IDEA 是一个很强大的编程工具,学会使用它能够极大的提高工作效率,将精力投入到更关键的事情上,而不是将时间浪费在编写重复代码上面。

而作为 Java 程序员,令人苦恼的地方是 Java 开发过程中经常需要编写有固定格式的代码,例如说声明一个私有变量,Logger 或者 Bean 等等。对于这种小范围的代码生成,我们可以利用 IDEA 提供的 Live Templates 功能。

Live Template 并不是简单的 Code Snippet,它甚至支持 Groovy函数配置,可以编写一些复杂的逻辑,支持很复杂的代码生成。

基本使用

IDEA 自带很多常用的动态模板,都是大家平常编码时的常用语句格式。比如说下面四张动图中的语句。

四张图分别是 声明静态 String 类型成员变量,判断字符串为空,for 循环和打印函数参数。

psfs

ifn

fori

soutp

自定义 Template

打开配置页面,进入 Live Template 选项卡,我们可以看到 IDEA 预先设置的模板配置。这些模板都是最常用的一些语句,我们先来看一下它们都是如何定义的。

缩写就是 IDEA 识别的模板的别名,就像文章开头展示的当你键入 soutm 时,IDEA 就会自动识别为该模板。

而应用上下文则表示该模板在什么上下文中生效。比如说上文中时一个 System.out 的语句,它只应该在 Java 的函数体中有效,所以它的应用上下文设置为 Java: statement,在其他类型文件或者 Java 文件的成员变量声明位置都无法使用该模板。

模板内容就是你按下 Tab 键之后,IDEA 自动生成的内容,它一般包含两个部分,纯文本和参数。参数可以进行值绑定,还支持光标的自动跳转。如同上文所示,$CLASS_NAME$$METHOD_NAME$ 就是参数,而$END$是一个特殊的参数,它表示光标最后一个跳转的位置。

而参数设置就是设置这些参数的值,可以使用 IDEA 提供的一些内置函数,还可以使用强大的 Groovy 脚本。去 IDEA 的官网可以查看这些函数的具体作用。

variables

我们这里讲解一下 groovyScript("groovy code", arg1) 的使用。它能提供一切你想要的能力,它支持执行 Groovy 脚本处理输入,然后输出处理后的字符串

groovyScript("code", ...)

|  code   |   一段Groovy代码或者Groovy脚本代码绝对路径    |
|  ...    |   可选入参,这些参数会绑定到`_1, _2, _3, ..._n`, 在 Groovy 代码中使用。|

比如之前打印函数参数的模板是这样定义的。

groovyScript("'\"' + _1.collect { it + ' = [\" + ' + it + ' + \"]'}.join(', ') + '\"'", methodParameters())

methodParameters 是 IDEA 内置的函数,它返回的结果作为参数输入到 Groovy 的脚本中,生成打印参数函数的字符串。

后记

感谢大家的阅读,希望大家继续关注,也可以留言分享你最喜欢使用的编程工具和编程小技巧。

Share

用户日活月活怎么统计 - Redis HyperLogLog 详解

HyperLogLog 是一种概率数据结构,用来估算数据的基数。数据集可以是网站访客的 IP 地址,E-mail 邮箱或者用户 ID。

基数就是指一个集合中不同值的数目,比如 a, b, c, d 的基数就是 4,a, b, c, d, a 的基数还是 4。虽然 a 出现两次,只会被计算一次。

精确的计算数据集的基数需要消耗大量的内存来存储数据集。在遍历数据集时,判断当前遍历值是否已经存在唯一方法就是将这个值与已经遍历过的值进行一一对比。当数据集的数量越来越大,内存消耗就无法忽视,甚至成了问题的关键。

使用 Redis 统计集合的基数一般有三种方法,分别是使用 Redis 的 HashMap,BitMap 和 HyperLogLog。前两个数据结构在集合的数量级增长时,所消耗的内存会大大增加,但是 HyperLogLog 则不会。

Redis 的 HyperLogLog 通过牺牲准确率来减少内存空间的消耗,只需要12K内存,在标准误差0.81%的前提下,能够统计2^64个数据。所以 HyperLogLog 是否适合在比如统计日活月活此类的对精度要不不高的场景。

这是一个很惊人的结果,以如此小的内存来记录如此大数量级的数据基数。下面我们就带大家来深入了解一下 HyperLogLog 的使用,基础原理,源码实现和具体的试验数据分析。

HyperLogLog 在 Redis 中的使用

Redis 提供了 PFADDPFCOUNTPFMERGE 三个命令来供用户使用 HyperLogLog。

PFADD 用于向 HyperLogLog 添加元素。

> PFADD visitors alice bob carol
(integer) 1
> PFCOUNT visitors
(integer) 3

如果 HyperLogLog 估计的近似基数在 PFADD 命令执行之后出现了变化, 那么命令返回 1 , 否则返回 0 。 如果命令执行时给定的键不存在, 那么程序将先创建一个空的 HyperLogLog 结构, 然后再执行命令。

PFCOUNT 命令会给出 HyperLogLog 包含的近似基数。在计算出基数后,PFCOUNT 会将值存储在 HyperLogLog 中进行缓存,知道下次 PFADD 执行成功前,就都不需要再次进行基数的计算。

PFMERGE 将多个 HyperLogLog 合并为一个 HyperLogLog , 合并后的 HyperLogLog 的基数接近于所有输入 HyperLogLog 的并集基数。

> PFADD customers alice dan
(integer) 1
> PFMERGE everyone visitors customers
OK
> PFCOUNT everyone
(integer) 4

内存消耗对比实验

我们下面就来通过实验真实对比一下下面三种数据结构的内存消耗,HashMap、BitMap 和 HyperLogLog。

我们首先使用 Lua 脚本向 Redis 对应的数据结构中插入一定数量的数,然后执行
bgsave 命令,最后使用 redis-rdb-tools 的 rdb 的命令查看各个键所占的内存大小。

下面是 Lua 的脚本,不了解 Redis 执行 Lua 脚本的同学可以看一下我之前写的文章《基于Redis和Lua的分布式限流》

local key = KEYS[1]
local size = tonumber(ARGV[1])
local method = tonumber(ARGV[2])

for i=1,size,1 do
  if (method == 0)
  then
    redis.call('hset',key,i,1)
  elseif (method == 1)
  then
    redis.call('pfadd',key, i)
  else
    redis.call('setbit', key, i, 1)
  end
end

我们在通过 redis-cli 的 script load 命令将 Lua 脚本加载到 Redis 中,然后使用 evalsha 命令分别向 HashMap、HyperLogLog 和 BitMap 三种数据结构中插入了一千万个数,然后使用 rdb 命令查看各个结构内存消耗。

[root@VM_0_11_centos ~]# redis-cli -a 082203 script load "$(cat HyperLogLog.lua)"
"6255c6d0a1f32349f59fd2c8711e93f2fbc7ecf8"
[root@VM_0_11_centos ~]# redis-cli -a 082203 evalsha 6255c6d0a1f32349f59fd2c8711e93f2fbc7ecf8 1 hashmap 10000000 0
(nil)
[root@VM_0_11_centos ~]# redis-cli -a 082203 evalsha 6255c6d0a1f32349f59fd2c8711e93f2fbc7ecf8 1 hyperloglog 10000000 1
(nil)
[root@VM_0_11_centos ~]# redis-cli -a 082203 evalsha 6255c6d0a1f32349f59fd2c8711e93f2fbc7ecf8 1 bitmap 10000000 2
(nil)


[root@VM_0_11_centos ~]# rdb -c memory dump.rdb 
database,type,key,size_in_bytes,encoding,num_elements,len_largest_element,expiry

0,string,bitmap,1310768,string,1250001,1250001,
0,string,hyperloglog,14392,string,12304,12304,
0,hash,hashmap,441326740,hashtable,10000000,8,

我们进行了两轮实验,分别插入一万数字和一千万数字,三种数据结构消耗的内存统计如下所示。

统计图表

从表中可以明显看出,一万数量级时 BitMap 消耗内存最小, 一千万数量级时 HyperLogLog 消耗内存最小,但是总体来看,HyperLogLog 消耗的内存都是 14392 字节,可见 HyperLogLog 在内存消耗方面有自己的独到之处。

基本原理

HyperLogLog 是一种概率数据结构,它使用概率算法来统计集合的近似基数。而它算法的最本源则是伯努利过程。

伯努利过程就是一个抛硬币实验的过程。抛一枚正常硬币,落地可能是正面,也可能是反面,二者的概率都是 1/2 。伯努利过程就是一直抛硬币,直到落地时出现正面位置,并记录下抛掷次数k。比如说,抛一次硬币就出现正面了,此时 k 为 1; 第一次抛硬币是反面,则继续抛,直到第三次才出现正面,此时 k 为 3。

对于 n 次伯努利过程,我们会得到 n 个出现正面的投掷次数值 $ k_1, k_2 … k_n $, 其中这里的最大值是k_max。

根据一顿数学推导,我们可以得出一个结论: $2^{k_ max}$ 来作为n的估计值。也就是说你可以根据最大投掷次数近似的推算出进行了几次伯努利过程。

示意图

下面,我们就来讲解一下 HyperLogLog 是如何模拟伯努利过程,并最终统计集合基数的。

HyperLogLog 在添加元素时,会通过Hash函数,将元素转为64位比特串,例如输入5,便转为101(省略前面的0,下同)。这些比特串就类似于一次抛硬币的伯努利过程。比特串中,0 代表了抛硬币落地是反面,1 代表抛硬币落地是正面,如果一个数据最终被转化了 10010000,那么从低位往高位看,我们可以认为,这串比特串可以代表一次伯努利过程,首次出现 1 的位数为5,就是抛了5次才出现正面。

所以 HyperLogLog 的基本思想是利用集合中数字的比特串第一个 1 出现位置的最大值来预估整体基数,但是这种预估方法存在较大误差,为了改善误差情况,HyperLogLog中引入分桶平均的概念,计算 m 个桶的调和平均值。

示意图

Redis 中 HyperLogLog 一共分了 2^14 个桶,也就是 16384 个桶。每个桶中是一个 6 bit 的数组,如下图所示。

桶

HyperLogLog 将上文所说的 64 位比特串的低 14 位单独拿出,它的值就对应桶的序号,然后将剩下 50 位中第一次出现 1 的位置值设置到桶中。50位中出现1的位置值最大为50,所以每个桶中的 6 位数组正好可以表示该值。

在设置前,要设置进桶的值是否大于桶中的旧值,如果大于才进行设置,否则不进行设置。示例如下图所示。
示例

此时为了性能考虑,是不会去统计当前的基数的,而是将 HyperLogLog 头的 card 属性中的标志位置为 1,表示下次进行 pfcount 操作的时候,当前的缓存值已经失效了,需要重新统计缓存值。在后面 pfcount 流程的时候,发现这个标记为失效,就会去重新统计新的基数,放入基数缓存。

在计算近似基数时,就分别计算每个桶中的值,带入到上文将的 DV 公式中,进行调和平均和结果修正,就能得到估算的基数值。

Redis 源码分析

我们首先来看一下 HyperLogLog 对象的定义

struct hllhdr {
    char magic[4];      /* 魔法值 "HYLL" */
    uint8_t encoding;   /* 密集结构或者稀疏结构 HLL_DENSE or HLL_SPARSE. */
    uint8_t notused[3]; /* 保留位, 全为0. */
    uint8_t card[8];    /* 基数大小的缓存 */
    uint8_t registers[]; /* 数据字节数组 */
};

HyperLogLog 对象中的 registers 数组就是桶,它有两种存储结构,分别为密集存储结构和稀疏存储结构,两种结构只涉及存储和桶的表现形式,从中我们可以看到 Redis 对节省内存极致地追求。

密集存储结构

我们先看相对简单的密集存储结构,它也是十分的简单明了,既然要有 2^14 个 6 bit的桶,那么我就真使用足够多的 uint8_t 字节去表示,只是此时会涉及到字节位置和桶的转换,因为字节有 8 位,而桶只需要 6 位。

所以我们需要将桶的序号转换成对应的字节偏移量 offset_bytes 和其内部的位数偏移量 offset_bits。需要注意的是小端字节序,高位在右侧,需要进行倒转。

当 offset_bits 小于等于2时,说明一个桶就在该字节内,只需要进行倒转就能得到桶的值。

示意图

如果 offset_bits 大于 2 ,则说明一个桶分布在两个字节内,此时需要将两个字节的内容都进行倒置,然后再进行拼接得到桶的值,如下图所示。

示意图

HyperLogLog 的稀疏存储结构是为了节约内存消耗,它不像密集存储模式一样,真正找了那么多个字节数组来表示2^14 个桶,而是使用特殊的字节结构来表达。

示意图

Redis 为了方便表达稀疏存储,它将上面三种字节表示形式分别赋予了一条指令。

  • ZERO : 一字节,表示连续多少个桶计数为0,前两位为标志00,后6位表示有多少个桶,最大为64。
  • XZERO : 两个字节,表示连续多少个桶计数为0,前两位为标志01,后14位表示有多少个桶,最大为16384。
  • VAL : 一字节,表示连续多少个桶的计数为多少,前一位为标志1,四位表示连桶内计数,所以最大表示桶的计数为32。后两位表示连续多少个桶。

示意图

所以,一个初始状态的 HyperLogLog 对象只需要2 字节,也就是一个 XZERO 来存储其数据,而不需要消耗12K 内存。当 HyperLogLog 插入了少数元素时,可以只使用少量的 XZERO、VAL 和 ZERO 进行表示,如下图所示。

示意图

Redis从稀疏存储转换到密集存储的条件是:

  • 任意一个计数值从 32 变成 33,因为 VAL 指令已经无法容纳,它能表示的计数值最大为 32
  • 稀疏存储占用的总字节数超过 3000 字节,这个阈值可以通过 hll_sparse_max_bytes 参数进行调整。

具体 Redis 中的 HyperLogLog 源码由于涉及较多的位运算,这里就不多带大家学习了。大家对 HyperLogLog 有什么更好的理解或者问题都欢迎积极留言。

参考

https://thoughtbot.com/blog/hyperloglogs-in-redis
https://juejin.im/post/5c7fe7525188251ba53b0623
https://juejin.im/post/5bef9c706fb9a049c23204a3

Share

十二张图带你了解 Redis 的数据结构和对象系统

Redis是一个开源的 key-value 存储系统,它使用六种底层数据结构构建了包含字符串对象、列表对象、哈希对象、集合对象和有序集合对象的对象系统。今天我们就通过12张图来全面了解一下它的数据结构和对象系统的实现原理。

本文的内容如下:

  • 首先介绍六种基础数据结构:动态字符串,链表,字典,跳跃表,整数集合和压缩列表。
  • 其次介绍 Redis 的对象系统中的字符串对象(String)、列表对象(List)、哈希对象(Hash)、集合对象(Set)和有序集合对象(ZSet)
  • 最后介绍 Redis 的键空间和过期键( expire )实现。

数据结构

简单动态字符串

Redis 使用动态字符串 SDS 来表示字符串值。下图展示了一个值为 Redis 的 SDS结构 :

  • len: 表示字符串的真正长度(不包含NULL结束符在内)。
  • alloc: 表示字符串的最大容量(不包含最后多余的那个字节)。
  • flags: 总是占用一个字节。其中的最低3个bit用来表示header的类型。
  • buf: 字符数组。
    动态字符串示意图

SDS 的结构可以减少修改字符串时带来的内存重分配的次数,这依赖于内存预分配和惰性空间释放两大机制。

当 SDS 需要被修改,并且要对 SDS 进行空间扩展时,Redis 不仅会为 SDS 分配修改所必须要的空间,还会为 SDS 分配额外的未使用的空间

  • 如果修改后, SDS 的长度(也就是len属性的值)将小于 1MB ,那么 Redis 预分配和 len 属性相同大小的未使用空间。
  • 如果修改后, SDS 的长度将大于 1MB ,那么 Redis 会分配 1MB 的未使用空间。

比如说,进行修改后 SDS 的 len 长度为20字节,小于 1MB,那么 Redis 会预先再分配 20 字节的空间, SDS 的 buf数组的实际长度(除去最后一字节)变为 20 + 20 = 40 字节。当 SDS的 len 长度大于 1MB时,则只会再多分配 1MB的空间。

类似的,当 SDS 缩短其保存的字符串长度时,并不会立即释放多出来的字节,而是等待之后使用。

链表

链表在 Redis 中的应用非常广泛,比如列表对象的底层实现之一就是链表。除了链表对象外,发布和订阅、慢查询、监视器等功能也用到了链表。

链表示意图

Redis 的链表是双向链表,示意图如上图所示。链表是最为常见的数据结构,这里就不在细说。

Redis 的链表结构的dup 、 free 和 match 成员属性是用于实现多态链表所需的类型特定函数:

  • dup 函数用于复制链表节点所保存的值,用于深度拷贝。
  • free 函数用于释放链表节点所保存的值。
  • match 函数则用于对比链表节点所保存的值和另一个输入值是否相等。

字典

字典被广泛用于实现 Redis 的各种功能,包括键空间和哈希对象。其示意图如下所示。

字典示意图

Redis 使用 MurmurHash2 算法来计算键的哈希值,并且使用链地址法来解决键冲突,被分配到同一个索引的多个键值对会连接成一个单向链表。

跳跃表

Redis 使用跳跃表作为有序集合对象的底层实现之一。它以有序的方式在层次化的链表中保存元素, 效率和平衡树媲美 —— 查找、删除、添加等操作都可以在对数期望时间下完成, 并且比起平衡树来说, 跳跃表的实现要简单直观得多。

跳表示意图

跳表的示意图如上图所示,这里只简单说一下它的核心思想,并不进行详细的解释。

如示意图所示,zskiplistNode 是跳跃表的节点,其 ele 是保持的元素值,score 是分值,节点按照其 score 值进行有序排列,而 level 数组就是其所谓的层次化链表的体现。

每个 node 的 level 数组大小都不同, level 数组中的值是指向下一个 node 的指针和 跨度值 (span),跨度值是两个节点的score的差值。越高层的 level 数组值的跨度值就越大,底层的 level 数组值的跨度值越小。

level 数组就像是不同刻度的尺子。度量长度时,先用大刻度估计范围,再不断地用缩小刻度,进行精确逼近。

当在跳跃表中查询一个元素值时,都先从第一个节点的最顶层的 level 开始。比如说,在上图的跳表中查询 o2 元素时,先从o1 的节点开始,因为 zskiplist 的 header 指针指向它。

先从其 level[3] 开始查询,发现其跨度是 2,o1 节点的score是1.0,所以加起来为 3.0,大于 o2 的score值2.0。所以,我们可以知道 o2 节点在 o1 和 o3 节点之间。这时,就改用小刻度的尺子了。就用level[1]的指针,顺利找到 o2 节点。

整数集合

整数集合 intset 是集合对象的底层实现之一,当一个集合只包含整数值元素,并且这个集合的元素数量不多时, Redis 就会使用整数集合作为集合对象的底层实现。

整数集合的示意图

如上图所示,整数集合的 encoding 表示它的类型,有int16_t,int32_t 或者int64_t。其每个元素都是 contents 数组的一个数组项,各个项在数组中按值的大小从小到大有序的排列,并且数组中不包含任何重复项。length 属性就是整数集合包含的元素数量。

压缩列表

压缩队列 ziplist 是列表对象和哈希对象的底层实现之一。当满足一定条件时,列表对象和哈希对象都会以压缩队列为底层实现。

压缩队列的示意图

压缩队列是 Redis 为了节约内存而开发的,是由一系列特殊编码的连续内存块组成的顺序型数据结构。它的属性值有:

  • zlbytes : 长度为 4 字节,记录整个压缩数组的内存字节数。
  • zltail : 长度为 4 字节,记录压缩队列表尾节点距离压缩队列的起始地址有多少字节,通过该属性可以直接确定尾节点的地址。
  • zllen : 长度为 2 字节,包含的节点数。当属性值小于 INT16_MAX时,该值就是节点总数,否则需要遍历整个队列才能确定总数。
  • zlend : 长度为 1 字节,特殊值,用于标记压缩队列的末端。

中间每个节点 entry 由三部分组成:

  • previous_entry_length : 压缩列表中前一个节点的长度,和当前的地址进行指针运算,计算出前一个节点的起始地址。
  • encoding: 节点保存数据的类型和长度
  • content :节点值,可以为一个字节数组或者整数。

对象

上面介绍了 6 种底层数据结构,Redis 并没有直接使用这些数据结构来实现键值数据库,而是基于这些数据结构创建了一个对象系统,这个系统包含字符串对象、列表对象、哈希对象、集合对象和有序集合这五种类型的对象,每个对象都使用到了至少一种前边讲的底层数据结构。

Redis 根据不同的使用场景和内容大小来判断对象使用哪种数据结构,从而优化对象在不同场景下的使用效率和内存占用。

Redis 的 redisObject 结构的定义如下所示。

typedef struct redisObject {
    unsigned type:4;
    unsigned encoding:4;
    unsigned lru:LRU_BITS; 
    int refcount;
    void *ptr;
} robj;

其中 type 是对象类型,包括REDIS_STRING, REDIS_LIST, REDIS_HASH, REDIS_SET 和 REDIS_ZSET。

encoding是指对象使用的数据结构,全集如下。
对象的编码

字符串对象

我们首先来看字符串对象的实现,如下图所示。

字符串对象示意图

如果一个字符串对象保存的是一个字符串值,并且长度大于32字节,那么该字符串对象将使用 SDS 进行保存,并将对象的编码设置为 raw,如图的上半部分所示。如果字符串的长度小于32字节,那么字符串对象将使用embstr 编码方式来保存。

embstr 编码是专门用于保存短字符串的一种优化编码方式,这个编码的组成和 raw 编码一致,都使用 redisObject 结构和 sdshdr 结构来保存字符串,如上图的下半部所示。

但是 raw 编码会调用两次内存分配来分别创建上述两个结构,而embstr则通过一次内存分配来分配一块连续的空间,空间中一次包含两个结构。

embstr 只需一次内存分配,而且在同一块连续的内存中,更好的利用缓存带来的优势,但是 embstr 是只读的,不能进行修改,当一个 embstr 编码的字符串对象进行 append 操作时, redis 会现将其转变为 raw 编码再进行操作。

列表对象

列表对象的编码可以是 ziplist 或 linkedlist。其示意图如下所示。

列表对象示意图

当列表对象可以同时满足以下两个条件时,列表对象使用 ziplist 编码:

  • 列表对象保存的所有字符串元素的长度都小于 64 字节。
  • 列表对象保存的元素数量数量小于 512 个。

不能满足这两个条件的列表对象需要使用 linkedlist 编码或者转换为 linkedlist 编码。

哈希对象

哈希对象的编码可以使用 ziplist 或 dict。其示意图如下所示。

当哈希对象使用压缩队列作为底层实现时,程序将键值对紧挨着插入到压缩队列中,保存键的节点在前,保存值的节点在后。如下图的上半部分所示,该哈希有两个键值对,分别是 name:Tom 和 age:25。

哈希对象示意图

当哈希对象可以同时满足以下两个条件时,哈希对象使用 ziplist 编码:

  • 哈希对象保存的所有键值对的键和值的字符串长度都小于64字节。
  • 哈希对象保存的键值对数量小于512个。

不能满足这两个条件的哈希对象需要使用 dict 编码或者转换为 dict 编码。

集合对象

集合对象的编码可以使用 intset 或者 dict。

intset 编码的集合对象使用整数集合最为底层实现,所有元素都被保存在整数集合里边。

而使用 dict 进行编码时,字典的每一个键都是一个字符串对象,每个字符串对象就是一个集合元素,而字典的值全部都被设置为NULL。如下图所示。

集合对象示意图

当集合对象可以同时满足以下两个条件时,对象使用 intset 编码:

  • 集合对象保存的所有元素都是整数值。
  • 集合对象保存的元素数量不超过512个。

否则使用 dict 进行编码。

有序集合对象

有序集合的编码可以为 ziplist 或者 skiplist。

有序集合使用 ziplist 编码时,每个集合元素使用两个紧挨在一起的压缩列表节点表示,前一个节点是元素的值,第二个节点是元素的分值,也就是排序比较的数值。

压缩列表内的集合元素按照分值从小到大进行排序,如下图上半部分所示。

有序集合使用 skiplist 编码时使用 zset 结构作为底层实现,一个 zet 结构同时包含一个字典和一个跳跃表。

其中,跳跃表按照分值从小到大保存所有元素,每个跳跃表节点保存一个元素,其score值是元素的分值。而字典则创建一个一个从成员到分值的映射,字典的键是集合成员的值,字典的值是集合成员的分值。通过字典可以在O(1)复杂度查找给定成员的分值。如下图所示。

跳跃表和字典中的集合元素值对象都是共享的,所以不会额外消耗内存。

有序集合示意图

当有序集合对象可以同时满足以下两个条件时,对象使用 ziplist 编码:

  • 有序集合保存的元素数量少于128个;
  • 有序集合保存的所有元素的长度都小于64字节。

否则使用 skiplist 编码。

数据库键空间

Redis 服务器都有多个 Redis 数据库,每个Redis 数据都有自己独立的键值空间。每个 Redis 数据库使用 dict 保存数据库中所有的键值对。

redis server.jpg

键空间的键也就是数据库的键,每个键都是一个字符串对象,而值对象可能为字符串对象、列表对象、哈希表对象、集合对象和有序集合对象中的一种对象。

除了键空间,Redis 也使用 dict 结构来保存键的过期时间,其键是键空间中的键值,而值是过期时间,如上图所示。

通过过期字典,Redis 可以直接判断一个键是否过期,首先查看该键是否存在于过期字典,如果存在,则比较该键的过期时间和当前服务器时间戳,如果大于,则该键过期,否则未过期。

Share

分布式数据缓存中的一致性哈希算法

一致性哈希算法在分布式缓存领域的 MemCache,负载均衡领域的 Nginx 以及各类 RPC 框架中都有广泛的应用,它主要是为了解决传统哈希函数添加哈希表槽位数后要将关键字重新映射的问题。

本文会介绍一致性哈希算法的原理及其实现,并给出其不同哈希函数实现的性能数据对比,探讨Redis 集群的数据分片实现等,文末会给出实现的具体 github 地址。

Memcached 与客户端分布式缓存

Memcached 是一个高性能的分布式缓存系统,然而服务端没有分布式功能,各个服务器不会相互通信。它的分布式实现依赖于客户端的程序库,这也是 Memcached 的一大特点。比如第三方的 spymemcached 客户端就基于一致性哈希算法实现了其分布式缓存的功能。

其具体步骤如下:

  • 向 Memcached 添加数据,首先客户端的算法根据 key 值计算出该 key 对应的服务器。
  • 服务器选定后,保存缓存数据。
  • 获取数据时,对于相同的 key ,客户端的算法可以定位到相同的服务器,从而获取数据。

在这个过程中,客户端的算法首先要保证缓存的数据尽量均匀地分布在各个服务器上,其次是当个别服务器下线或者上线时,会出现数据迁移,应该尽量减少需要迁移的数据量。

客户端算法是客户端分布式缓存性能优劣的关键。

普通的哈希表算法一般都是计算出哈希值后,通过取余操作将 key 值映射到不同的服务器上,但是当服务器数量发生变化时,取余操作的除数发生变化,所有 key 所映射的服务器几乎都会改变,这对分布式缓存系统来说是不可以接收的。

一致性哈希算法能尽可能减少了服务器数量变化所导致的缓存迁移。

哈希算法

首先,一致性哈希算法依赖于普通的哈希算法。大多数同学对哈希算法的理解可能都停留在 JDK 的 hashCode 函数上。其实哈希算法有很多种实现,它们在不同方面都各有优劣,针对不同的场景可以使用不同的哈希算法实现。

下面,我们会介绍一下几款比较常见的哈希算法,并且了解一下它们在分布均匀程度,哈希碰撞概率和性能等方面的优劣。

MD5 算法:全称为 Message-Digest Algorithm 5,用于确保信息传输完整一致。是计算机广泛使用的杂凑算法之一,主流编程语言普遍已有 MD5 实现。MD5 的作用是把大容量信息压缩成一种保密的格式(就是把一个任意长度的字节串变换成定长的16进制数字串)。常见的文件完整性校验就是使用 MD5。

CRC 算法:全称为 CyclicRedundancyCheck,中文名称为循环冗余校验。它是一类重要的,编码和解码方法简单,检错和纠错能力强的哈希算法,在通信领域广泛地用于实现差错控制。

MurmurHash 算法:高运算性能,低碰撞率,由 Austin Appleby 创建于 2008 年,现已应用到 Hadoop、libstdc++、nginx、libmemcached 等开源系统。Java 界中 Redis,Memcached,Cassandra,HBase,Lucene和Guava 都在使用它。

FNV 算法:全称为 Fowler-Noll-Vo 算法,是以三位发明人 Glenn Fowler,Landon Curt Noll,Phong Vo 的名字来命名的,最早在 1991 年提出。 FNV 能快速 hash 大量数据并保持较小的冲突率,它的高度分散使它适用于 hash 一些非常相近的字符串,比如 URL,hostname,文件名,text 和 IP 地址等。

Ketama 算法:一致性哈希算法的实现之一,其他的哈希算法有通用的一致性哈希算法实现,只不过是替换了哈希映射函数而已,但 Ketama 是一整套的流程,我们将在后面介绍。

一致性哈希算法

下面,我们以分布式缓存场景为例,分析一下一致性哈希算法环的原理。

首先将缓存服务器( ip + 端口号)进行哈希,映射成环上的一个节点,计算出缓存数据 key 值的 hash key,同样映射到环上,并顺时针选取最近的一个服务器节点作为该缓存应该存储的服务器。具体实现见后续的章节。

比如说,当存在 A,B,C,D 四个缓存服务器时,它们及其 key 值为1的缓存数据在一致性哈希环上的位置如下图所示,根据顺时针取最近一个服务器节点的规则,该缓存数据应该存储在服务器 B 上。

当要存储一个 key 值为4的缓存数据时,它在一致性哈希环上的位置如下所示,所以它应该存储在服务器 C 上。

类似的,key 值为5,6的数据应该存在服务 D 上,key 值为7,8的数据应该存储在服务 A 上。

此时,服务器 B 宕机下线,服务器 B 中存储的缓存数据要进行迁移,但由于一致性哈希环的存在,只需要迁移key 值为1的数据,其他的数据的存储服务器不会发生变化。这也是一致性哈希算法比取余映射算法出色的地方。

由于服务器 B 下线,key 值为1的数据顺时针最近的服务器是 C ,所以数据存迁移到服务器 C 上。

现实情况下,服务器在一致性哈希环上的位置不可能分布的这么均匀,导致了每个节点实际占据环上的区间大小不一。

这种情况下,可以增加虚节点来解决。通过增加虚节点,使得每个节点在环上所“管辖”的区域更加均匀。这样就既保证了在节点变化时,尽可能小的影响数据分布的变化,而同时又保证了数据分布的均匀。

具体实现

下面我们实现 Memcached 分布式缓存场景下的一致性哈希算法,并给出具体的测试性能数据。该实现借鉴了 kiritomoe 博文中的实现和 spymemcached 客户端代码。具体实现请看我的github,地址为 https://github.com/ztelur/consistent-hash-algorithm。

NodeLocator 是分布式缓存场景下一致性哈希算法的抽象,它有一个 getPrimary 函数,接收一个缓存数据的 key 值,输出存储该缓存数据的服务器实例。

public interface NodeLocator {
    MemcachedNode getPrimary(String k);
}

下面是通用的一致性哈希算法的实现,它使用 TreeMap 作为一致性哈希环的数据结构,其 ceilingEntry 函数可以获取环上最近的一个节点。buildConsistentHashRing 函数中包含了构建一致性哈希环的过程,默认加入了 12 个虚拟节点。

public class ConsistentHashNodeLocator implements NodeLocator {
    private final static int VIRTUAL_NODE_SIZE = 12;
    private final static String VIRTUAL_NODE_SUFFIX = "-";

    private volatile TreeMap<Long, MemcachedNode> hashRing;
    private final HashAlgorithm hashAlg;

    public ConsistentHashNodeLocator(List<MemcachedNode> nodes, HashAlgorithm hashAlg) {
        this.hashAlg = hashAlg;
        this.hashRing = buildConsistentHashRing(hashAlg, nodes);
    }


    @Override
    public MemcachedNode getPrimary(String k) {
        long hash = hashAlg.hash(k);
        return getNodeForKey(hashRing, hash);
    }

    private MemcachedNode getNodeForKey(TreeMap<Long, MemcachedNode> hashRing, long hash) {
        /* 向右找到第一个key */
        Map.Entry<Long, MemcachedNode> locatedNode = hashRing.ceilingEntry(hash);
        /* 想象成为一个环,超出尾部取出第一个 */
        if (locatedNode == null) {
            locatedNode = hashRing.firstEntry();
        }
        return locatedNode.getValue();
    }

    private TreeMap<Long, MemcachedNode> buildConsistentHashRing(HashAlgorithm hashAlgorithm, List<MemcachedNode> nodes) {
        TreeMap<Long, MemcachedNode> virtualNodeRing = new TreeMap<>();
        for (MemcachedNode node : nodes) {
            for (int i = 0; i < VIRTUAL_NODE_SIZE; i++) {
                // 新增虚拟节点的方式如果有影响,也可以抽象出一个由物理节点扩展虚拟节点的类
                virtualNodeRing.put(hashAlgorithm.hash(node.getSocketAddress().toString() + VIRTUAL_NODE_SUFFIX + i), node);
            }
        }
        return virtualNodeRing;
    }
}

getPrimary 函数中,首先使用 HashAlgorithm 计算出 key 值对应的哈希值,然后调用 getNodeForKey 函数从 TreeMap 中获取对应的最近的服务器节点实例。

HashAlgorithm 是对哈希算法的抽象,一致性哈希算法可以使用各种普通的哈希算法,比如说 CRC ,MurmurHash 和 FNV 等。下面,我们将会对比各种哈希算法给该实现带来的性能差异性。

性能测试

测试数据是评价一个算法好坏的最为真实有效的方法,量化的思维模式一定要有,这也是程序员进阶的法宝之一。我们以下面四个量化的指标对基于不同哈希函数的一致性哈希算法进行评测。

  • 统计每个服务器节点存储的缓存数量,计算方差和标准差。测量缓存分布均匀情况,我们可以模拟 50000个缓存数据,分配到100 个服务器,测试最后个节点存储缓存数据量的方差和标准差。
  • 随机下线10%的服务器,重新分配缓存,统计缓存迁移比率。测量节点上下线的情况,我们可以模拟 50000 个缓存数据,分配到100 个指定服务器,之后随机下线 10 个服务器并重新分配这50000个数据,统计缓存分配到不同服务器的比例,也就是迁移比率。
  • 使用JMH对不同哈希算法的执行效率进行对比。

具体评测算法如下。

public class NodeLocatorTest {

    /**
     * 测试分布的离散情况
     */
    @Test
    public void testDistribution() {
        List<MemcachedNode> servers = new ArrayList<>();
        for (String ip : ips) {
            servers.add(new MemcachedNode(new InetSocketAddress(ip, 8080)));
        }
        // 使用不同的DefaultHashAlgorithm进行测试,得出不同的数据
        NodeLocator nodeLocator = new ConsistentHashNodeLocator(servers, DefaultHashAlgorithm.NATIVE_HASH);
        // 构造 50000 随机请求
        List<String> keys = new ArrayList<>();
        for (int i = 0; i < 50000; i++) {
            keys.add(UUID.randomUUID().toString());
        }
        // 统计分布
        AtomicLongMap<MemcachedNode> atomicLongMap = AtomicLongMap.create();
        for (MemcachedNode server : servers) {
            atomicLongMap.put(server, 0);
        }
        for (String key : keys) {
            MemcachedNode node = nodeLocator.getPrimary(key);
            atomicLongMap.getAndIncrement(node);
        }
        System.out.println(StatisticsUtil.variance(atomicLongMap.asMap().values().toArray(new Long[]{})));
        System.out.println(StatisticsUtil.standardDeviation(atomicLongMap.asMap().values().toArray(new Long[]{})));

    }

    /**
     * 测试节点新增删除后的变化程度
     */
    @Test
    public void testNodeAddAndRemove() {
        List<MemcachedNode> servers = new ArrayList<>();
        for (String ip : ips) {
            servers.add(new MemcachedNode(new InetSocketAddress(ip, 8080)));
        }
        //随机下线10个服务器, 先shuffle,然后选择0到90,简单模仿随机计算。
        Collections.shuffle(servers);
        List<MemcachedNode> serverChanged = servers.subList(0, 90);
        NodeLocator loadBalance = new ConsistentHashNodeLocator(servers, DefaultHashAlgorithm.NATIVE_HASH);
        NodeLocator changedLoadBalance = new ConsistentHashNodeLocator(serverChanged, DefaultHashAlgorithm.NATIVE_HASH);

        // 构造 50000 随机请求
        List<String> keys = new ArrayList<>();
        for (int i = 0; i < 50000; i++) {
            keys.add(UUID.randomUUID().toString());
        }
        int count = 0;
        for (String invocation : keys) {
            MemcachedNode origin = loadBalance.getPrimary(invocation);
            MemcachedNode changed = changedLoadBalance.getPrimary(invocation);
           // 统计发生变化的数值
            if (!origin.getSocketAddress().equals(changed.getSocketAddress())) count++;
        }
        System.out.println(count / 50000D);
    }
    static String[] ips = {...};
}

JMH的测试脚本如下所示。

@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
@State(Scope.Thread)
public class JMHBenchmark {

    private NodeLocator nodeLocator;
    private List<String> keys;

    @Benchmark
    public void test() {
        for (String key : keys) {
            MemcachedNode node = nodeLocator.getPrimary(key);
        }
    }

    public static void main(String[] args) throws RunnerException {
        Options opt = new OptionsBuilder()
                .include(JMHBenchmark.class.getSimpleName())
                .forks(1)
                .warmupIterations(5)
                .measurementIterations(5)
                .build();
        new Runner(opt).run();
    }

    @Setup
    public void prepare() {
        List<MemcachedNode> servers = new ArrayList<>();
        for (String ip : ips) {
            servers.add(new MemcachedNode(new InetSocketAddress(ip, 8080)));
        }
        nodeLocator = new ConsistentHashNodeLocator(servers, DefaultHashAlgorithm.MURMUR_HASH);
        // 构造 50000 随机请求
        keys = new ArrayList<>();
        for (int i = 0; i < 50000; i++) {
            keys.add(UUID.randomUUID().toString());
        }
    }

    @TearDown
    public void shutdown() {
    }
    static String[] ips = {...};
}

分别测试了 JDK 哈希算法,FNV132 算法,CRC 算法,MurmurHash 算法和Ketama 算法,分别对应 DefaultHashAlgorithmNATIVE_HASHFNV1_32_HASHCRC_HASHMURMUR_HASHKETAMA_HASH 。具体数据如下所示。

虚拟槽分区

有些文章说,Redis 集群并没有使用一致性哈希算法,而是使用虚拟槽分区算法。但是外网(地址见文末)上都说 Redis 使用的虚拟槽分区只是一致性哈希算法的变种,虚拟槽可以允许 Redis 动态扩容。

或许只有去了解一下Redis的源码才能对这个问题作出准确的回答。请了解的同学积极留言解答,谢谢。

github 地址: https://github.com/ztelur/consistent-hash-algorithm
redis分布式讨论的地址: https://www.reddit.com/r/redis/comments/4yztxi/which_one_is_better_hash_slot_or_consistent/

参考

Share