Guava的布隆过滤器

 程序世界的算法都要在时间,资源占用甚至正确率等多种因素间进行平衡。同样的问题,所属的量级或场景不同,所用算法也会不同,其中也会涉及很多的trade-off。

If there’s one rule in programming, it’s this: there will always be trade-offs.

你是否真的存在

 今天我们就来探讨如何判断一个值是否存在于已有的集合问题。这类问题在很多场景下都会遇到,比如说防止缓存击穿,爬虫重复URL检测,字典纠缠和CDN代理缓存等。

 我们以网络爬虫为例。网络间的链接错综复杂,爬虫程序在网络间“爬行”很可能会形成“环”。为了避免形成“环”,程序需要知道已经访问过网站的URL。当程序又遇到一个网站,根据它的URL,怎么判断是否已经访问过呢?

 第一个想法就是将已有URL放置在HashSet中,然后利用HashSet的特性进行判断。它只花费O(1)的时间。但是,该方法消耗的内存空间很大,就算只有1亿个URL,每个URL只算50个字符,就需要大约5GB内存。

 如何减少内存占用呢?URL可能太长,我们使用MD5等单向哈希处理后再存到HashSet中吧,处理后的字段只有128Bit,这样可以节省大量的空间。我们的网络爬虫程序又可以继续执行了。

 但是好景不长,网络世界浩瀚如海,URL的数量急速增加,以128bit的大小进行存储也要占据大量的内存。

 这种情况下,我们还可以使用BitSet,使用哈希函数将URL处理为1bit,存储在BitSet中。但是,哈希函数发生冲突的概率比较高,若要降低冲突概率到1%,就要将BitSet的长度设置为URL个数的100倍。

 但是冲突无法避免,这就带来了误判。理想中的算法总是又准确又快捷,但是现实中往往是“一地鸡毛”。我们真的需要100%的正确率吗?如果需要,时间和空间的开销无法避免;如果能够忍受低概率的错误,就有极大地降低时间和空间的开销的方法。

 所以,一切都要trade-off。布隆过滤器(Bloom Filter)就是一种具有较低错误率,但是极大节约空间消耗的算法。

布隆过滤器

 Bloom Filter是一种空间效率很高的随机数据结构,它利用位数组很简洁地表示一个集合,并能判断一个元素是否属于这个集合。Bloom Filter的这种高效是有一定代价的:在判断一个元素是否属于某个集合时,有可能会把不属于这个集合的元素误认为属于这个集合(false positive)。因此,Bloom Filter不适合那些“零错误”的应用场合。而在能容忍低错误率的应用场合下,Bloom Filter通过极少的错误换取了存储空间的极大节省。

A Bloom filter is a space-efficient probabilistic data structure, conceived by Burton Howard Bloom in 1970, that is used to test whether an element is a member of a set. False positive matches are possible, but false negatives are not, thus a Bloom filter has a 100% recall rate. In other words, a query returns either “possibly in set” or “definitely not in set”.

 上述描述引自维基百科,特点总结为如下:

  • 空间效率高的概率型数据结构,用来检查一个元素是否在一个集合中。
  • 对于一个元素检测是否存在的调用,BloomFilter会告诉调用者两个结果之一:可能存在或者一定不存在。

 布隆过滤器的使用场景很多,除了上文说的网络爬虫,还有处理缓存击穿和避免磁盘读取等。Goole Bigtable,Apache HBase和Postgresql等都使用了布隆过滤器。

 我们就以下面这个例子具体描述使用BloomFilter的场景,以及在此场景下,BloomFilter的优势和劣势。

 一组元素存在于磁盘中,数据量特别大,应用程序希望在元素不存在的时候尽量不读磁盘,此时,可以在内存中构建这些磁盘数据的BloomFilter,对于一次读数据的情况,分为以下几种情况:

 我们知道HashMap或者Set等数据结构也可以支持上述场景,这里我们就具体比较一下二者的优劣,并给出具体的数据。

精确度量十分重要,对于算法的性能,我们不能只是简单的感官上比较,要进行具体的计算和性能测试。找到不同算法之间的平衡点,根据平衡点和现实情况来决定使用哪种算法。就像Redis一样,它对象在不同情况下使用不同的数据结构,比如说列表对象的内置结构可以为ziplist或者linkedlist,在不同的场景下使用不同的数据结构。

 请求的元素不在磁盘中,如果BloomFilter返回不存在,那么应用不需要走读盘逻辑,假设此概率为P1。如果BloomFilter返回可能存在,那么属于误判情况,假设此概率为P2。请求的元素在磁盘中,BloomFilter返回存在,假设此概率为P3。

 如果使用HashMap等数据结构,情况如下:

  • 请求的数据不在磁盘中,应用不走读盘逻辑,此概率为P1+P2
  • 请求的元素在磁盘中,应用走读盘逻辑,此概率为P3

 假设应用不读盘逻辑的开销为C1,走读盘逻辑的开销为C2,那么,BloomFilter和hashmap的开销分别为

  • Cost(BloomFilter) = P1 C1 + (P2 + P3) C2
  • Cost(HashMap) = (P1 + P2) C1 + P3 C2;
  • Delta = Cost(BloomFilter) - Cost(HashMap)
    = P2 * (C2 - C1)
    

 因此,BloomFilter相当于以增加P2 * (C2 - C1)的时间开销,来获得相对于HashMap而言更少的空间开销。

 既然P2是影响BloomFilter性能开销的主要因素,那么BloomFilter设计时如何降低概率P2(即误判率false positive probability)呢?,接下来的BloomFilter的原理将回答这个问题。

原理解析

 初始状态下,布隆过滤器是一个包含m位的位数组,每一位都置为0。

 为了表达S={x1, x2,…,xn}这样一个n个元素的集合,Bloom Filter使用k个相互独立的哈希函数,它们分别将集合中的每个元素映射到{1,…,m}的范围中。对任意一个元素x,第i个哈希函数映射的位置hi(x)就会被置为1(1≤i≤k)。注意,如果一个位置多次被置为1,那么只有第一次会起作用,后面几次将没有任何效果。在下图中,k=3,且有两个哈希函数选中同一个位置(从左边数第五位)。

 在判断y是否属于这个集合时,我们对y应用k次哈希函数,如果所有hi(y)的位置都是1(1≤i≤k),那么我们就认为y是集合中的元素,否则就认为y不是集合中的元素。下图中y1就不是集合中的元素。y2则可能属于这个集合,或者刚好是一个误判。

 下面我们来看一下具体的例子,哈希函数的数量为3,首先加入1,10两个元素。通过下面两个图,我们可以清晰看到1,10两个元素被三个不同的韩系函数映射到不同的bit上,然后判断3是否在集合中,3映射的3个bit都没有值,所以判断绝对不在集合中。

 关于误判率,实际的使用中,期望能给定一个误判率期望和将要插入的元素数量,能计算出分配多少的存储空间较合适。这涉及很多最优数值计算问题,比如说错误率估计,最优的哈希函数个数和位数组的大小等,相关公式计算感兴趣的同学可以自行百度,重温一下大学的计算微积分时光。

Guava的布隆过滤器

 这就又要提起我们的Guava了,它是Google开源的Java包,提供了很多常用的功能,比如说我们之前总结的超详细的Guava RateLimiter限流原理解析

 Guava中,布隆过滤器的实现主要涉及到2个类,BloomFilterBloomFilterStrategies,首先来看一下BloomFilter的成员变量。需要注意的是不同Guava版本的BloomFilter实现不同。

 /** guava实现的以CAS方式设置每个bit位的bit数组 */
  private final LockFreeBitArray bits;
  /** hash函数的个数 */
  private final int numHashFunctions;
  /** guava中将对象转换为byte的通道 */
  private final Funnel<? super T> funnel;
  /**
   * 将byte转换为n个bit的策略,也是bloomfilter hash映射的具体实现
   */
  private final Strategy strategy;

 这是它的4个成员变量:

  • LockFreeBitArray是定义在BloomFilterStrategies中的内部类,封装了布隆过滤器底层bit数组的操作。
  • numHashFunctions表示哈希函数的个数。
  • Funnel,它和PrimitiveSink配套使用,能将任意类型的对象转化成Java基本数据类型,默认用java.nio.ByteBuffer实现,最终均转化为byte数组。
  • Strategy是定义在BloomFilter类内部的接口,代码如下,主要有2个方法,putmightContain
interface Strategy extends java.io.Serializable {
    /** 设置元素 */
    <T> boolean put(T object, Funnel<? super T> funnel, int numHashFunctions, BitArray bits);
    /** 判断元素是否存在*/
    <T> boolean mightContain(
    T object, Funnel<? super T> funnel, int numHashFunctions, BitArray bits);
    .....
}

 创建布隆过滤器,BloomFilter并没有公有的构造函数,只有一个私有构造函数,而对外它提供了5个重载的create方法,在缺省情况下误判率设定为3%,采用BloomFilterStrategies.MURMUR128_MITZ_64的实现。

BloomFilterStrategies.MURMUR128_MITZ_64Strategy的两个实现之一,Guava以枚举的方式提供这两个实现,这也是《Effective Java》书中推荐的提供对象的方法之一。

enum BloomFilterStrategies implements BloomFilter.Strategy {
    MURMUR128_MITZ_32() {//....}
    MURMUR128_MITZ_64() {//....}
}

 二者对应了32位哈希映射函数,和64位哈希映射函数,后者使用了murmur3 hash生成的所有128位,具有更大的空间,不过原理是相通的,我们选择相对简单的MURMUR128_MITZ_32来分析。

 先来看一下它的put方法,它用两个hash函数来模拟多个hash函数的情况,这是布隆过滤器的一种优化。

public <T> boolean put(
    T object, Funnel<? super T> funnel, int numHashFunctions, BitArray bits) {
    long bitSize = bits.bitSize();
    // 先利用murmur3 hash对输入的funnel计算得到128位的哈希值,funnel现将object转换为byte数组,
    // 然后在使用哈希函数转换为long
    long hash64 = Hashing.murmur3_128().hashObject(object, funnel).asLong();
    // 根据hash值的高低位算出hash1和hash2
    int hash1 = (int) hash64;
    int hash2 = (int) (hash64 >>> 32);

    boolean bitsChanged = false;
    // 循环体内采用了2个函数模拟其他函数的思想,相当于每次累加hash2
    for (int i = 1; i <= numHashFunctions; i++) {
    int combinedHash = hash1 + (i * hash2);
    // 如果是负数就变为正数
    if (combinedHash < 0) {
        combinedHash = ~combinedHash;
    }
    // 通过基于bitSize取模的方式获取bit数组中的索引,然后调用set函数设置。
    bitsChanged |= bits.set(combinedHash % bitSize);
    }
    return bitsChanged;
}

 在put方法中,先是将索引位置上的二进制置为1,然后用bitsChanged记录插入结果,如果返回true表明没有重复插入成功,而mightContain方法则是将索引位置上的数值取出,并判断是否为0,只要其中出现一个0,那么立即判断为不存在。

public <T> boolean mightContain(
    T object, Funnel<? super T> funnel, int numHashFunctions, BitArray bits) {
    long bitSize = bits.bitSize();
    long hash64 = Hashing.murmur3_128().hashObject(object, funnel).asLong();
    int hash1 = (int) hash64;
    int hash2 = (int) (hash64 >>> 32);

    for (int i = 1; i <= numHashFunctions; i++) {
    int combinedHash = hash1 + (i * hash2);
    // Flip all the bits if it's negative (guaranteed positive number)
    if (combinedHash < 0) {
        combinedHash = ~combinedHash;
    }
    // 和put的区别就在这里,从set转换为get,来判断是否存在
    if (!bits.get(combinedHash % bitSize)) {
        return false;
    }
    }
    return true;
}

Guava为了提供效率,自己实现了LockFreeBitArray来提供bit数组的无锁设置和读取。我们只来看一下它的put函数。

boolean set(long bitIndex) {
    if (get(bitIndex)) {
    return false;
    }

    int longIndex = (int) (bitIndex >>> LONG_ADDRESSABLE_BITS);
    long mask = 1L << bitIndex; // only cares about low 6 bits of bitIndex

    long oldValue;
    long newValue;
    // 经典的CAS自旋重试机制
    do {
    oldValue = data.get(longIndex);
    newValue = oldValue | mask;
    if (oldValue == newValue) {
        return false;
    }
    } while (!data.compareAndSet(longIndex, oldValue, newValue));

    bitCount.increment();
    return true;
}

后记

 欢迎大家留言和持续关注我。

参考

Share

TCP报文发送的那些事

 今天我们来总结学习一下TCP发送报文的相关知识,主要包括发送报文的步骤,MSS,滑动窗口和Nagle算法。

发送报文

 该节是根据陶辉大神的系列文章总结。如下图所示,我们一起来看一下TCP发送报文时操作系统内核都做了那些事情。其中有些概念在接下来的小节中会介绍。

 首先,用户程序在用户态调用send方法来发送一段较长的数据。然后send函数调用内核态的tcp_sendmsg方法进行处理。

send方法返回成功,内核也不一定真正将IP报文都发送到网络中了,也就是说内核发送报文和send方法是不同步的。所以,内核将用户态内存中的发送数据,拷贝到内核态内存中,不依赖于用户态内存,这样可以使得进程可以快速释放发送数据占用的用户态内存。

 在拷贝过程中,内核将待发送数据,按照MSS来划分成多个尽量接近MSS大小的分片,放到这个TCP连接对应的tcp_write_queue发送队列中

 内核中为这个TCP连接分配的内核缓存,也就是tcp_write_queue是有限的。当没有多余的空间来复制用户态的待发送数据时,就需要调用一个方法sk_stream_wait_memory来等待滑动窗口移动,释放出一些缓存出来(收到ACK后,不需要再缓存原来已经发送出的报文,因为既然已经确认对方收到,就不需要定时重发,自然就释放缓存了)

 当这个套接字是阻塞套接字时,等待的超时时间就是SO_SNDTIMEO选项指定的发送超时时间。如果这个套接字是非阻塞套接字,则超时时间就是0。也就是说,sk_stream_wait_memory对于非阻塞套接字会直接返回,并将 errno错误码置为EAGAIN

 我们假定使用了阻塞套接字,且等待了足够久的时间,收到了对方的ACK,滑动窗口释放出了缓存。所以,可以将剩下的用户态数据都组成MSS报文拷贝到内核态的缓存队列中。

 最后,调用tcp_push等方法,它最终会调用IP层的方法来发送tcp_write_queue队列中的报文。注意的是,IP层方法返回时,也不意味着报文发送了出去。

 在发送函数处理过程中,Nagle算法、滑动窗口、拥塞窗口都会影响发送操作。

MTU和MSS

 我们都知道TCP/IP架构有五层协议,低层协议的规则会影响到上层协议,比如说数据链路层的最大传输单元MTU和传输层TCP协议的最大报文段长度MSS。

 数据链路层协议会对网络分组的长度进行限制,也就是不能超过其规定的MTU,例如以太网限制为1500字节,802.3限制为1492字节。但是,需要注意的时,现在有些网卡具备自动分包功能,所以也可以传输远大于MTU的帧

 网络层的IP协议试图发送报文时,若一个报文的长度大于MTU限制,就会被分成若干个小于MTU的报文,每个报文都会有独立的IP头部。IP协议能自动获取所在局域网的MTU值,然后按照这个MTU来分片。IP协议的分片机制对于传输层是透明的,接收方的IP协议会根据收到的多个IP包头部,将发送方IP层分片出的IP包重组为一个消息。

 这种IP层的分片效率是很差的,因为首先做了额外的分片操作,然后所有分片都到达后,接收方才能重组成一个包,其中任何一个分片丢失了,都必须重发所有分片。

 所以,TCP层为了避免IP层执行数据报分片定义了最大报文段长度MSS。在TCP建立连接时会通知各自期望接收到的MSS的大小。

 需要注意的是MSS的值是预估值。两台主机只是根据其所在局域网的计算MSS,但是TCP连接上可能会穿过许多中间网络,这些网络分别具有不同的数据链路层,导致问题。比如说,若中间途径的MTU小于两台主机所在的网络MTU时,选定的MSS仍然太大了,会导致中间路由器出现IP层的分片或者直接返回错误(设置IP头部的DF标志位)。

 比如阿里中间件的这篇文章(链接不见的话,请看文末)所说,当上述情况发生时,可能会导致卡死状态,比如scp的时候不动了,或者其他更复杂操作的时候不动了,卡死的状态。

滑动窗口

 IP层协议属于不可靠的协议,IP层并不关心数据是否发送到了接收方,TCP通过确认机制来保证数据传输的可靠性。

 除了保证数据必定发送到对端,TCP还要解决包乱序(reordering)和流控的问题。包乱序和流控会涉及滑动窗口和接收报文的out_of_order队列,另外拥塞控制算法也会处理流控,详情请看TCP拥塞控制算法简介

 TCP头里有一个字段叫Window,又叫Advertised-Window,这个字段是接收端告诉发送端自己还有多少缓冲区可以接收数据。于是发送端就可以根据这个接收端的处理能力来发送数据,否则会导致接收端处理不过来。

 我们可以将TCP缓冲区中的数据分为以下四类,并把它们看作一个时间轴

  • Sent and Acknowledged: 表示已经发送成功并已经被确认的数据,比如图中的前31个字节的数据

  • Send But Not Yet Acknowledged:表示发送但没有被确认的数据,数据被发送出去,没有收到接收端的ACK,认为并没有完成发送,这个属于窗口内的数据。

  • Not Sent,Recipient Ready to Receive:表示需要尽快发送的数据,这部分数据已经被加载到缓存等待发送,也就是窗口中。接收方ACK表示能够接受这些包,所以发送方需要尽快发送这些包。

  • Not Sent,Recipient Not Ready to Receive: 表示属于未发送,同时接收端也不允许发送的,因为这些数据已经超出了发送端所接收的范围

 除了四种不同范畴的数据外,我们可以看到上边的示意图中还有三种窗口。

  • Window Already:已经发送了,但是没有收到ACK,和Send But Not Yet Acknowledged部分重合。
  • Usable Window : 可用窗口,和Not Sent,Recipient Ready to Receive部分重合
  • Send Window: 真正的窗口大小。建立连接时接收方会告知发送方自己能够处理的发送窗口大小,同时在接收过程中也不断的通告可以发送窗口大小,来实时调节。

 下面,我们来看一下滑动窗口的滑动。下图是窗口滑动窗口的示意图。

 当发送方收到发送数据的确认消息时,会移动发送窗口。比如上图中,接收到36字节的确认,将其之前的5个字节都移除窗口,发出了46-51的字节,将52到56的字节加入到可用窗口。

 下面我们来看一下整体的示意图。

 图片来源为tcpipguide.

 Client端窗口的不同颜色的矩形块代表的含义和上边滑动窗口示意图的含义相同。我们只简单看一下第二三四步。接收端发送的TCP报文window为260,表示发送窗口减少100,可以发现黑色矩形缩短了。并且ack为141,所以发送端将140个字节的数据从滑动窗口中移除,从Send But Not Yet Acknowledged变为Sent and Acknowledged,也就是从蓝色变成紫色。然后发送端发送180字节的数据,就有180字节的数据从Not Sent,Recipient Ready to Receive变为Send But Not Yet Acknowledged,也就是从绿色变为蓝色。

Nagle算法

 上述滑动窗口会出现一种Silly Window Syndrome的问题,当接收方来不及取走Receive Windows里的数据,会导致发送方的窗口越来越小。到最后,如果接收方腾出几个字节并告诉发送方现在有几个字节的window,而我们的发送方会义无反顾地发送这几个字节。

 只为了发送几个字节,要加上TCP和IP头的40多个字节。这样,效率太低,就像你搬运物品,明明一次可以全部搬完,但是却偏偏一次只搬一个物品,来回搬多次。

 为此,TCP引入了Nagle算法。应用进程调用发送方法时,可能每次只发送小块数据,造成这台机器发送了许多小的TCP报文。对于整个网络的执行效率来说,小的TCP报文会增加网络拥塞的可能。因此,如果有可能,应该将相临的TCP报文合并成一个较大的TCP报文(当然还是小于MSS的)发送。

 Nagle算法的规则(可参考tcp_output.c文件里tcp_nagle_check函数注释):

  • 如果包长度达到MSS,则允许发送;
  • 如果该包含有FIN,则允许发送;
  • 设置了TCP_NODELAY选项,则允许发送;
  • 未设置TCP_CORK选项时,若所有发出去的小数据包(包长度小于MSS)均被确认,则允许发送;
  • 上述条件都未满足,但发生了超时(一般为200ms),则立即发送。

当对请求的时延非常在意且网络环境非常好的时候(例如同一个机房内),Nagle算法可以关闭。使用TCP_NODELAY套接字选项就可以关闭Nagle算法

个人博客: Remcarpediem

参考

Share

基于Redis和Lua的分布式限流

 Java单机限流可以使用AtomicInteger,RateLimiter或Semaphore来实现,但是上述方案都不支持集群限流。集群限流的应用场景有两个,一个是网关,常用的方案有Nginx限流和Spring Cloud Gateway,另一个场景是与外部或者下游服务接口的交互,因为接口限制必须进行限流。

 本文的主要内容为:

  • Redis和Lua的使用场景和注意事项,特别是KEY映射的问题
  • Spring Cloud Gateway中限流的实现

集群限流的难点

 在上篇Guava RateLimiter的分析文章
中,我们学习了令牌桶限流算法的原理,下面我们就探讨一下,如果将RateLimiter扩展,让它支持集群限流,会遇到哪些问题。

RateLimiter会维护两个关键的参数nextFreeTicketMicrosstoredPermits,它们分别是下一次填充时间和当前存储的令牌数。当RateLimiteracquire函数被调用时,也就是有线程希望获取令牌时,RateLimiter会对比当前时间和nextFreeTicketMicros,根据二者差距,刷新storedPermits,然后再判断更新后的storedPermits是否足够,足够则直接返回,否则需要等待直到令牌足够(Guava RateLimiter的实现比较特殊,并不是当前获取令牌的线程等待,而是下一个获取令牌的线程等待)。

 由于要支持集群限流,所以nextFreeTicketMicrosstoredPermits这两个参数不能只存在JVM的内存中,必须有一个集中式存储的地方。而且,由于算法要先获取两个参数的值,计算后在更新两个数值,这里涉及到竞态限制,必须要处理并发问题。

 集群限流由于会面对相比单机更大的流量冲击,所以一般不会进行线程等待,而是直接进行丢弃,因为如果让拿不到令牌的线程进行睡眠,会导致大量的线程堆积,线程持有的资源也不会释放,反而容易拖垮服务器。

Redis和Lua

 分布式限流本质上是一个集群并发问题,Redis单进程单线程的特性,天然可以解决分布式集群的并发问题。所以很多分布式限流都基于Redis,比如说Spring Cloud的网关组件Gateway。

 Redis执行Lua脚本会以原子性方式进行,单线程的方式执行脚本,在执行脚本时不会再执行其他脚本或命令。并且,Redis只要开始执行Lua脚本,就会一直执行完该脚本再进行其他操作,所以Lua脚本中不能进行耗时操作。使用Lua脚本,还可以减少与Redis的交互,减少网络请求的次数。

 Redis中使用Lua脚本的场景有很多,比如说分布式锁,限流,秒杀等,总结起来,下面两种情况下可以使用Lua脚本:

  • 使用 Lua 脚本实现原子性操作的CAS,避免不同客户端先读Redis数据,经过计算后再写数据造成的并发问题。
  • 前后多次请求的结果有依赖时,使用 Lua 脚本将多个请求整合为一个请求。

 但是使用Lua脚本也有一些注意事项:

  • 要保证安全性,在 Lua 脚本中不要定义自己的全局变量,以免污染 Redis内嵌的Lua环境。因为Lua脚本中你会使用一些预制的全局变量,比如说redis.call()
  • 要注意 Lua 脚本的时间复杂度,Redis 的单线程同样会阻塞在 Lua 脚本的执行中。
  • 使用 Lua 脚本实现原子操作时,要注意如果 Lua 脚本报错,之前的命令无法回滚,这和Redis所谓的事务机制是相同的。
  • 一次发出多个 Redis 请求,但请求前后无依赖时,使用 pipeline,比 Lua 脚本方便。
  • Redis要求单个Lua脚本操作的key必须在同一个Redis节点上。解决方案可以看下文对Gateway原理的解析。

性能测试

 Redis虽然以单进程单线程模型进行操作,但是它的性能却十分优秀。总结来说,主要是因为:

  • 绝大部分请求是纯粹的内存操作
  • 采用单线程,避免了不必要的上下文切换和竞争条件
  • 内部实现采用非阻塞IO和epoll,基于epoll自己实现的简单的事件框架。epoll中的读、写、关闭、连接都转化成了事件,然后利用epoll的多路复用特性,绝不在io上浪费一点时间。

 所以,在集群限流时使用Redis和Lua的组合并不会引入过多的性能损耗。我们下面就简单的测试一下,顺便熟悉一下涉及的Redis命令。

# test.lua脚本的内容
local test = redis.call("get", "test")
local time = redis.call("get", "time")
redis.call("setex", "test", 10, "xx")
redis.call("setex", "time", 10, "xx")
return {test, time}

# 将脚本导入redis,之后调用不需再传递脚本内容
redis-cli -a 082203 script load "$(cat test.lua)"
"b978c97518ae7c1e30f246d920f8e3c321c76907"
# 使用redis-benchmark和evalsha来执行lua脚本
redis-benchmark -a 082203 -n 1000000 evalsha b978c97518ae7c1e30f246d920f8e3c321c76907 0 
======
1000000 requests completed in 20.00 seconds
50 parallel clients
3 bytes payload
keep alive: 1

93.54% <= 1 milliseconds
99.90% <= 2 milliseconds
99.97% <= 3 milliseconds
99.98% <= 4 milliseconds
99.99% <= 5 milliseconds
100.00% <= 6 milliseconds
100.00% <= 7 milliseconds
100.00% <= 7 milliseconds
49997.50 requests per second

 通过上述简单的测试,我们可以发现本机情况下,使用Redis执行Lua脚本的性能极其优秀,一百万次执行,99.99%在5毫秒以下。

 本来想找一下官方的性能数据,但是针对Redis + Lua的性能数据较少,只找到了几篇个人博客,感兴趣的同学可以去探索。这篇文章有Lua和zadd的性能比较(具体数据请看原文,链接缺失的话,请看文末)。

以上lua脚本的性能大概是zadd的70%-80%,但是在可接受的范围内,在生产环境可以使用。负载大概是zadd的1.5-2倍,网络流量相差不大,IO是zadd的3倍,可能是开启了AOF,执行了三次操作。

Spring Cloud Gateway的限流实现

Gateway是微服务架构Spring Cloud的网关组件,它基于Redis和Lua实现了令牌桶算法的限流功能,下面我们就来看一下它的原理和细节吧。

Gateway基于Filter模式,提供了限流过滤器RequestRateLimiterGatewayFilterFactory。只需在其配置文件中进行配置,就可以使用。具体的配置感兴趣的同学自行学习,我们直接来看它的实现。

RequestRateLimiterGatewayFilterFactory依赖RedisRateLimiterisAllowed函数来判断一个请求是否要被限流抛弃。

public Mono<Response> isAllowed(String routeId, String id) {
        //routeId是ip地址,id是使用KeyResolver获取的限流维度id,比如说基于uri,IP或者用户等等。
	Config routeConfig = loadConfiguration(routeId);
	// 每秒能够通过的请求数
	int replenishRate = routeConfig.getReplenishRate();
	// 最大流量
	int burstCapacity = routeConfig.getBurstCapacity();
	try {
	    // 组装Lua脚本的KEY
		List<String> keys = getKeys(id);
		// 组装Lua脚本需要的参数,1是指一次获取一个令牌
		List<String> scriptArgs = Arrays.asList(replenishRate + "",
				burstCapacity + "", Instant.now().getEpochSecond() + "", "1");
		// 调用Redis,tokens_left = redis.eval(SCRIPT, keys, args)
		Flux<List<Long>> flux = this.redisTemplate.execute(this.script, keys,
				scriptArgs);
	..... // 省略			
}
static List<String> getKeys(String id) {
	String prefix = "request_rate_limiter.{" + id;
	String tokenKey = prefix + "}.tokens";
	String timestampKey = prefix + "}.timestamp";
	return Arrays.asList(tokenKey, timestampKey);
}				

 需要注意的是getKeys函数的prefix包含了”{id}”,这是为了解决Redis集群键值映射问题。Redis的KeySlot算法中,如果key包含{},就会使用第一个{}内部的字符串作为hash key,这样就可以保证拥有同样{}内部字符串的key就会拥有相同slot。Redis要求单个Lua脚本操作的key必须在同一个节点上,但是Cluster会将数据自动分布到不同的节点,使用这种方法就解决了上述的问题。

 然后我们来看一下Lua脚本的实现,该脚本就在Gateway项目的resource文件夹下。它就是如同GuavaRateLimiter一样,实现了令牌桶算法,只不过不在需要进行线程休眠,而是直接返回是否能够获取。

local tokens_key = KEYS[1]   -- request_rate_limiter.${id}.tokens 令牌桶剩余令牌数的KEY值
local timestamp_key = KEYS[2] -- 令牌桶最后填充令牌时间的KEY值

local rate = tonumber(ARGV[1])  -- replenishRate 令令牌桶填充平均速率
local capacity = tonumber(ARGV[2]) -- burstCapacity 令牌桶上限
local now = tonumber(ARGV[3]) -- 得到从 1970-01-01 00:00:00 开始的秒数
local requested = tonumber(ARGV[4]) -- 消耗令牌数量,默认 1 

local fill_time = capacity/rate   -- 计算令牌桶填充满令牌需要多久时间
local ttl = math.floor(fill_time*2)  -- *2 保证时间充足


local last_tokens = tonumber(redis.call("get", tokens_key)) 
-- 获得令牌桶剩余令牌数
if last_tokens == nil then  -- 第一次时,没有数值,所以桶时满的
  last_tokens = capacity
end

local last_refreshed = tonumber(redis.call("get", timestamp_key)) 
-- 令牌桶最后填充令牌时间
if last_refreshed == nil then
  last_refreshed = 0
end

local delta = math.max(0, now-last_refreshed)  
-- 获取距离上一次刷新的时间间隔
local filled_tokens = math.min(capacity, last_tokens+(delta*rate)) 
-- 填充令牌,计算新的令牌桶剩余令牌数 填充不超过令牌桶令牌上限。

local allowed = filled_tokens >= requested      
local new_tokens = filled_tokens
local allowed_num = 0
if allowed then
-- 若成功,令牌桶剩余令牌数(new_tokens) 减消耗令牌数( requested ),并设置获取成功( allowed_num = 1 ) 。
  new_tokens = filled_tokens - requested
  allowed_num = 1
end       

-- 设置令牌桶剩余令牌数( new_tokens ) ,令牌桶最后填充令牌时间(now) ttl是超时时间?
redis.call("setex", tokens_key, ttl, new_tokens)
redis.call("setex", timestamp_key, ttl, now)

-- 返回数组结果
return { allowed_num, new_tokens }

后记

 Redis的主从异步复制机制可能丢失数据,出现限流流量计算不准确的情况,当然限流毕竟不同于分布式锁这种场景,对于结果的精确性要求不是很高,即使多流入一些流量,也不会影响太大。
 正如Martin在他质疑Redis分布式锁RedLock文章中说的,Redis的数据丢弃了也无所谓时再使用Redis存储数据。

I think it’s a good fit in situations where you want to share some transient, approximate, fast-changing data between servers, and where it’s not a big deal if you occasionally lose that data for whatever reason

 接下来我们回来学习阿里开源的分布式限流组件sentinel,希望大家持续关注。

参考

Share

超详细的Guava RateLimiter限流原理解析

 限流是保护高并发系统的三把利器之一,另外两个是缓存和降级。限流在很多场景中用来限制并发和请求量,比如说秒杀抢购,保护自身系统和下游系统不被巨型流量冲垮等。

 限流的目的是通过对并发访问/请求进行限速或者一个时间窗口内的的请求进行限速来保护系统,一旦达到限制速率则可以拒绝服务或进行流量整形。

 常用的限流方式和场景有:限制总并发数(比如数据库连接池、线程池)、限制瞬时并发数(如nginx的limit_conn模块,用来限制瞬时并发连接数,Java的Semaphore也可以实现)、限制时间窗口内的平均速率(如Guava的RateLimiter、nginx的limit_req模块,限制每秒的平均速率);其他还有如限制远程接口调用速率、限制MQ的消费速率。另外还可以根据网络连接数、网络流量、CPU或内存负载等来限流。

 比如说,我们需要限制方法被调用的并发数不能超过100(同一时间并发数),则我们可以用信号量Semaphore实现。可如果我们要限制方法在一段时间内平均被调用次数不超过100,则需要使用RateLimiter

限流的基础算法

 我们先来讲解一下两个限流相关的基本算法:漏桶算法和令牌桶算法。

漏桶算法

 从上图中,我们可以看到,就像一个漏斗一样,进来的水量就好像访问流量一样,而出去的水量就像是我们的系统处理请求一样。当访问流量过大时,这个漏斗中就会积水,如果水太多了就会溢出。

 漏桶算法的实现往往依赖于队列,请求到达如果队列未满则直接放入队列,然后有一个处理器按照固定频率从队列头取出请求进行处理。如果请求量大,则会导致队列满,那么新来的请求就会被抛弃。

漏桶算法示意图

令牌桶算法则是一个存放固定容量令牌的桶,按照固定速率往桶里添加令牌。桶中存放的令牌数有最大上限,超出之后就被丢弃或者拒绝。当流量或者网络请求到达时,每个请求都要获取一个令牌,如果能够获取到,则直接处理,并且令牌桶删除一个令牌。如果获取不同,该请求就要被限流,要么直接丢弃,要么在缓冲区等待。

令牌桶算法示意图

令牌桶和漏桶对比:

  • 令牌桶是按照固定速率往桶中添加令牌,请求是否被处理需要看桶中令牌是否足够,当令牌数减为零时则拒绝新的请求;漏桶则是按照常量固定速率流出请求,流入请求速率任意,当流入的请求数累积到漏桶容量时,则新流入的请求被拒绝;

  • 令牌桶限制的是平均流入速率,允许突发请求,只要有令牌就可以处理,支持一次拿3个令牌,4个令牌;漏桶限制的是常量流出速率,即流出速率是一个固定常量值,比如都是1的速率流出,而不能一次是1,下次又是2,从而平滑突发流入速率;

  • 令牌桶允许一定程度的突发,而漏桶主要目的是平滑流出速率;

Guava RateLimiter

Guava是Java领域优秀的开源项目,它包含了Google在Java项目中使用一些核心库,包含集合(Collections),缓存(Caching),并发编程库(Concurrency),常用注解(Common annotations),String操作,I/O操作方面的众多非常实用的函数。
 Guava的RateLimiter提供了令牌桶算法实现:平滑突发限流(SmoothBursty)和平滑预热限流(SmoothWarmingUp)实现。

类图

RateLimiter的类图如上所示,其中RateLimiter是入口类,它提供了两套工厂方法来创建出两个子类。这很符合《Effective Java》中的用静态工厂方法代替构造函数的建议,毕竟该书的作者也正是Guava库的主要维护者,二者配合”食用”更佳。

// RateLimiter提供了两个工厂方法,最终会调用下面两个函数,生成RateLimiter的两个子类。
static RateLimiter create(SleepingStopwatch stopwatch, double permitsPerSecond) {
    RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */);
    rateLimiter.setRate(permitsPerSecond);
    return rateLimiter;
}
static RateLimiter create(
    SleepingStopwatch stopwatch, double permitsPerSecond, long warmupPeriod, TimeUnit unit,
    double coldFactor) {
    RateLimiter rateLimiter = new SmoothWarmingUp(stopwatch, warmupPeriod, unit, coldFactor);
    rateLimiter.setRate(permitsPerSecond);
    return rateLimiter;
}

平滑突发限流

 使用RateLimiter的静态方法创建一个限流器,设置每秒放置的令牌数为5个。返回的RateLimiter对象可以保证1秒内不会给超过5个令牌,并且以固定速率进行放置,达到平滑输出的效果。

public void testSmoothBursty() {
    RateLimiter r = RateLimiter.create(5);
    while (true) {
      System.out.println("get 1 tokens: " + r.acquire() + "s");
    }
    /**
     * output: 基本上都是0.2s执行一次,符合一秒发放5个令牌的设定。
     * get 1 tokens: 0.0s 
     * get 1 tokens: 0.182014s
     * get 1 tokens: 0.188464s
     * get 1 tokens: 0.198072s
     * get 1 tokens: 0.196048s
     * get 1 tokens: 0.197538s
     * get 1 tokens: 0.196049s
     */
}

RateLimiter使用令牌桶算法,会进行令牌的累积,如果获取令牌的频率比较低,则不会导致等待,直接获取令牌。

public void testSmoothBursty2() {
    RateLimiter r = RateLimiter.create(2);
    while (true)
    {
      System.out.println("get 1 tokens: " + r.acquire(1) + "s");
      try {
        Thread.sleep(2000);
      } catch (Exception e) {}
      System.out.println("get 1 tokens: " + r.acquire(1) + "s");
      System.out.println("get 1 tokens: " + r.acquire(1) + "s");
      System.out.println("get 1 tokens: " + r.acquire(1) + "s");
      System.out.println("end");
      /**
       * output:
       * get 1 tokens: 0.0s
       * get 1 tokens: 0.0s
       * get 1 tokens: 0.0s
       * get 1 tokens: 0.0s
       * end
       * get 1 tokens: 0.499796s
       * get 1 tokens: 0.0s
       * get 1 tokens: 0.0s
       * get 1 tokens: 0.0s
       */
    }
}

RateLimiter由于会累积令牌,所以可以应对突发流量。在下面代码中,有一个请求会直接请求5个令牌,但是由于此时令牌桶中有累积的令牌,足以快速响应。
RateLimiter在没有足够令牌发放时,采用滞后处理的方式,也就是前一个请求获取令牌所需等待的时间由下一次请求来承受,也就是代替前一个请求进行等待。

public void testSmoothBursty3() {
    RateLimiter r = RateLimiter.create(5);
    while (true)
    {
      System.out.println("get 5 tokens: " + r.acquire(5) + "s");
      System.out.println("get 1 tokens: " + r.acquire(1) + "s");
      System.out.println("get 1 tokens: " + r.acquire(1) + "s");
      System.out.println("get 1 tokens: " + r.acquire(1) + "s");
      System.out.println("end");
      /**
       * output:
       * get 5 tokens: 0.0s
       * get 1 tokens: 0.996766s 滞后效应,需要替前一个请求进行等待
       * get 1 tokens: 0.194007s
       * get 1 tokens: 0.196267s
       * end
       * get 5 tokens: 0.195756s
       * get 1 tokens: 0.995625s 滞后效应,需要替前一个请求进行等待
       * get 1 tokens: 0.194603s
       * get 1 tokens: 0.196866s
       */
    }
}

平滑预热限流

RateLimiterSmoothWarmingUp是带有预热期的平滑限流,它启动后会有一段预热期,逐步将分发频率提升到配置的速率。
 比如下面代码中的例子,创建一个平均分发令牌速率为2,预热期为3分钟。由于设置了预热时间是3秒,令牌桶一开始并不会0.5秒发一个令牌,而是形成一个平滑线性下降的坡度,频率越来越高,在3秒钟之内达到原本设置的频率,以后就以固定的频率输出。这种功能适合系统刚启动需要一点时间来“热身”的场景。

public void testSmoothwarmingUp() {
    RateLimiter r = RateLimiter.create(2, 3, TimeUnit.SECONDS);
    while (true)
    {
      System.out.println("get 1 tokens: " + r.acquire(1) + "s");
      System.out.println("get 1 tokens: " + r.acquire(1) + "s");
      System.out.println("get 1 tokens: " + r.acquire(1) + "s");
      System.out.println("get 1 tokens: " + r.acquire(1) + "s");
      System.out.println("end");
      /**
       * output:
       * get 1 tokens: 0.0s
       * get 1 tokens: 1.329289s
       * get 1 tokens: 0.994375s
       * get 1 tokens: 0.662888s  上边三次获取的时间相加正好为3秒
       * end
       * get 1 tokens: 0.49764s  正常速率0.5秒一个令牌
       * get 1 tokens: 0.497828s
       * get 1 tokens: 0.49449s
       * get 1 tokens: 0.497522s
       */
    }
}

源码分析

 看完了RateLimiter的基本使用示例后,我们来学习一下它的实现原理。先了解一下几个比较重要的成员变量的含义。

//SmoothRateLimiter.java
//当前存储令牌数
double storedPermits;
//最大存储令牌数
double maxPermits;
//添加令牌时间间隔
double stableIntervalMicros;
/**
 * 下一次请求可以获取令牌的起始时间
 * 由于RateLimiter允许预消费,上次请求预消费令牌后
 * 下次请求需要等待相应的时间到nextFreeTicketMicros时刻才可以获取令牌
 */
private long nextFreeTicketMicros = 0L;

平滑突发限流

RateLimiter的原理就是每次调用acquire时用当前时间和nextFreeTicketMicros进行比较,根据二者的间隔和添加单位令牌的时间间隔stableIntervalMicros来刷新存储令牌数storedPermits。然后acquire会进行休眠,直到nextFreeTicketMicros

acquire函数如下所示,它会调用reserve函数计算获取目标令牌数所需等待的时间,然后使用SleepStopwatch进行休眠,最后返回等待时间。

public double acquire(int permits) {
    // 计算获取令牌所需等待的时间
    long microsToWait = reserve(permits);
    // 进行线程sleep
    stopwatch.sleepMicrosUninterruptibly(microsToWait);
    return 1.0 * microsToWait / SECONDS.toMicros(1L);
}
final long reserve(int permits) {
    checkPermits(permits);
    // 由于涉及并发操作,所以使用synchronized进行并发操作
    synchronized (mutex()) {
      return reserveAndGetWaitLength(permits, stopwatch.readMicros());
    }
}
final long reserveAndGetWaitLength(int permits, long nowMicros) {
    // 计算从当前时间开始,能够获取到目标数量令牌时的时间
    long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
    // 两个时间相减,获得需要等待的时间
    return max(momentAvailable - nowMicros, 0);
}

reserveEarliestAvailable是刷新令牌数和下次获取令牌时间nextFreeTicketMicros的关键函数。它有三个步骤,一是调用resync函数增加令牌数,二是计算预支付令牌所需额外等待的时间,三是更新下次获取令牌时间nextFreeTicketMicros和存储令牌数storedPermits

 这里涉及RateLimiter的一个特性,也就是可以预先支付令牌,并且所需等待的时间在下次获取令牌时再实际执行。详细的代码逻辑的解释请看注释。

final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
    // 刷新令牌数,相当于每次acquire时在根据时间进行令牌的刷新
    resync(nowMicros);
    long returnValue = nextFreeTicketMicros;
    // 获取当前已有的令牌数和需要获取的目标令牌数进行比较,计算出可以目前即可得到的令牌数。
    double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
    // freshPermits是需要预先支付的令牌,也就是目标令牌数减去目前即可得到的令牌数
    double freshPermits = requiredPermits - storedPermitsToSpend;
    // 因为会突然涌入大量请求,而现有令牌数又不够用,因此会预先支付一定的令牌数
    // waitMicros即是产生预先支付令牌的数量时间,则将下次要添加令牌的时间应该计算时间加上watiMicros
    long waitMicros = storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
        + (long) (freshPermits * stableIntervalMicros);
    // storedPermitsToWaitTime在SmoothWarmingUp和SmoothBuresty的实现不同,用于实现预热缓冲期
    // SmoothBuresty的storedPermitsToWaitTime直接返回0,所以watiMicros就是预先支付的令牌所需等待的时间
    try {
      // 更新nextFreeTicketMicros,本次预先支付的令牌所需等待的时间让下一次请求来实际等待。
      this.nextFreeTicketMicros = LongMath.checkedAdd(nextFreeTicketMicros, waitMicros);
    } catch (ArithmeticException e) {
      this.nextFreeTicketMicros = Long.MAX_VALUE;
    }
    // 更新令牌数,最低数量为0
    this.storedPermits -= storedPermitsToSpend;
    // 返回旧的nextFreeTicketMicros数值,无需为预支付的令牌多加等待时间。
    return returnValue;
}
// SmoothBurest
long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
    return 0L;
}

resync函数用于增加存储令牌,核心逻辑就是(nowMicros - nextFreeTicketMicros) / stableIntervalMicros。当前时间大于nextFreeTicketMicros时进行刷新,否则直接返回。

void resync(long nowMicros) {
    // 当前时间晚于nextFreeTicketMicros,所以刷新令牌和nextFreeTicketMicros
    if (nowMicros > nextFreeTicketMicros) {
      // coolDownIntervalMicros函数获取每机秒生成一个令牌,SmoothWarmingUp和SmoothBuresty的实现不同
      // SmoothBuresty的coolDownIntervalMicros直接返回stableIntervalMicros
      // 当前时间减去要更新令牌的时间获取时间间隔,再除以添加令牌时间间隔获取这段时间内要添加的令牌数
      storedPermits = min(maxPermits,
          storedPermits
            + (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros());
      nextFreeTicketMicros = nowMicros;
    }
    // 如果当前时间早于nextFreeTicketMicros,则获取令牌的线程要一直等待到nextFreeTicketMicros,该线程获取令牌所需
    // 额外等待的时间由下一次获取的线程来代替等待。
}
double coolDownIntervalMicros() {
    return stableIntervalMicros;
}

 下面我们举个例子,让大家更好的理解resyncreserveEarliestAvailable函数的逻辑。

 比如RateLimiterstableIntervalMicros为500,也就是1秒发两个令牌,storedPermits为0,nextFreeTicketMicros为1553918495748。线程一acquire(2),当前时间为1553918496248,首先resync函数计算,(1553918496248 - 1553918495748)/500 = 1,所以当前可获取令牌数为1,但是由于可以预支付,所以nextFreeTicketMicros= nextFreeTicketMicro + 1 * 500 = 1553918496748。线程一无需等待。

 紧接着,线程二也来acquire(2),首先resync函数发现当前时间早于nextFreeTicketMicros,所以无法增加令牌数,所以需要预支付2个令牌,nextFreeTicketMicros= nextFreeTicketMicro + 2 * 500 = 1553918497748。线程二需要等待1553918496748时刻,也就是线程一获取时计算的nextFreeTicketMicros时刻。同样的,线程三获取令牌时也需要等待到线程二计算的nextFreeTicketMicros时刻。

平滑预热限流

 上述就是平滑突发限流RateLimiter的实现,下面我们来看一下加上预热缓冲期的实现原理。
SmoothWarmingUp实现预热缓冲的关键在于其分发令牌的速率会随时间和令牌数而改变,速率会先慢后快。表现形式如下图所示,令牌刷新的时间间隔由长逐渐变短。等存储令牌数从maxPermits到达thresholdPermits时,发放令牌的时间价格也由coldInterval降低到了正常的stableInterval。

image.png

SmoothWarmingUp的相关代码如下所示,相关的逻辑都写在注释中。

// SmoothWarmingUp,等待时间就是计算上图中梯形或者正方形的面积。
long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
    /**
    * 当前permits超出阈值的部分
    */
    double availablePermitsAboveThreshold = storedPermits - thresholdPermits;
    long micros = 0;
    /**
    * 如果当前存储的令牌数超出thresholdPermits
    */
    if (availablePermitsAboveThreshold > 0.0) {
    /**
     * 在阈值右侧并且需要被消耗的令牌数量
     */
    double permitsAboveThresholdToTake = min(availablePermitsAboveThreshold, permitsToTake);

    /**
        * 梯形的面积
        *
        * 高 * (顶 * 底) / 2
        *
        * 高是 permitsAboveThresholdToTake 也就是右侧需要消费的令牌数
        * 底 较长 permitsToTime(availablePermitsAboveThreshold)
        * 顶 较短 permitsToTime(availablePermitsAboveThreshold - permitsAboveThresholdToTake)
        */
    micros = (long) (permitsAboveThresholdToTake
        * (permitsToTime(availablePermitsAboveThreshold)
        + permitsToTime(availablePermitsAboveThreshold - permitsAboveThresholdToTake)) / 2.0);
    /**
        * 减去已经获取的在阈值右侧的令牌数
        */
    permitsToTake -= permitsAboveThresholdToTake;
    }
    /**
    * 平稳时期的面积,正好是长乘宽
    */
    micros += (stableIntervalMicros * permitsToTake);
    return micros;
}

double coolDownIntervalMicros() {
    /**
    * 每秒增加的令牌数为 warmup时间/maxPermits. 这样的话,在warmuptime时间内,就就增张的令牌数量
    * 为 maxPermits
    */
    return warmupPeriodMicros / maxPermits;
}

后记

RateLimiter只能用于单机的限流,如果想要集群限流,则需要引入redis或者阿里开源的sentinel中间件,请大家继续关注。

参考

Share

TCP/IP的底层队列

 自从上次学习了TCP/IP的拥塞控制算法后,我越发想要更加深入的了解TCP/IP的一些底层原理,搜索了很多网络上的资料,看到了陶辉大神关于高性能网络编程的专栏,收益颇多。今天就总结一下,并且加上自己的一些思考。

 我自己比较了解Java语言,对Java网络编程的理解就止于Netty框架的使用。Netty的源码贡献者Norman Maurer对于Netty网络开发有过一句建议,”Never block the event loop, reduce context-swtiching”。也就是尽量不要阻塞IO线程,也尽量减少线程切换。我们今天只关注前半句,对这句话感兴趣的同学可以看一下蚂蚁通信框架实践

 为什么不能阻塞读取网络信息的IO线程呢?这里就要从经典的网络C10K开始理解,服务器如何支持并发1万请求。C10K的根源在于网络的IO模型。Linux 中网络处理都用同步阻塞的方式,也就是每个请求都分配一个进程或者线程,那么要支持1万并发,难道就要使用1万个线程处理请求嘛?这1万个线程的调度、上下文切换乃至它们占用的内存,都会成为瓶颈。解决C10K的通用办法就是使用I/O 多路复用,Netty就是这样。

Netty的reactor模型

 Netty有负责服务端监听建立连接的线程组(mainReactor)和负责连接读写操作的IO线程组(subReactor),还可以有专门处理业务逻辑的Worker线程组(ThreadPool)。三者相互独立,这样有很多好处。一是有专门的线程组负责监听和处理网络连接的建立,可以防止TCP/IP的半连接队列(sync)和全连接队列(acceptable)被占满。二是IO线程组和Worker线程分开,双方并行处理网络I/O和业务逻辑,可以避免IO线程被阻塞,防止TCP/IP的接收报文的队列被占满。当然,如果业务逻辑较少,也就是IO 密集型的轻计算业务,可以将业务逻辑放在IO线程中处理,避免线程切换,这也就是Norman Maurer话的后半部分。

 TCP/IP怎么就这么多队列啊?今天我们就来细看一下TCP/IP的几个队列,包括建立连接时的半连接队列(sync),全连接队列(accept)和接收报文时的receive、out_of_order、prequeue以及backlog队列。

建立连接时的队列

TCP三次握手和队列示意图

 如上图所示,这里有两个队列:syns queue(半连接队列)和accept queue(全连接队列)。三次握手中,服务端接收到客户端的SYN报文后,把相关信息放到半连接队列中,同时回复SYN+ACK给客户端。
 第三步的时候服务端收到客户端的ACK,如果这时全连接队列没满,那么从半连接队列拿出相关信息放入到全连接队列中,否则按tcp_abort_on_overflow的值来执行相关操作,直接抛弃或者过一段时间在重试。

接收报文时的队列

 相比于建立连接,TCP在接收报文时的处理逻辑更为复杂,相关的队列和涉及的配置参数更多。

 应用程序接收TCP报文和程序所在服务器系统接收网络里发来的TCP报文是两个独立流程。二者都会操控socket实例,但是会通过锁竞争来决定某一时刻由谁来操控,由此产生很多不同的场景。例如,应用程序正在接收报文时,操作系统通过网卡又接收到报文,这时该如何处理?若应用程序没有调用read或者recv读取报文时,操作系统收到报文又会如何处理?

 我们接下来就以三张图为主,介绍TCP接收报文时的三种场景,并在其中介绍四个接收相关的队列。

接收报文场景一

场景一

上图是TCP接收报文场景一的示意图。操作系统首先接收报文,存储到socket的receive队列,然后用户进程再调用recv进行读取。

1) 当网卡接收报文并且判断为TCP协议时,经过层层调用,最终会调用到内核的tcp_v4_rcv方法。由于当前TCP要接收的下一个报文正是S1,所以tcp_v4_rcv函数将其直接加入到receive队列中。receive队列是将已经接收到的TCP报文,去除了TCP头部、排好序放入的、用户进程可以直接按序读取的队列。由于socket不在用户进程上下文中(也就是没有用户进程在读socket),并且我们需要S1序号的报文,而恰好收到了S1报文,因此,它进入了receive队列。

2) 接收到S3报文,由于TCP要接收的下一个报文序号是S2,所以加入到out_of_order队列,所有乱序的报文会放在这里。

3) 接着,收到了TCP期望的S2报文,直接进入recevie队列。由于此时out_of_order队列不为空,需要检查一下。

4) 每次向receive队列插入报文时都会检查out_of_order队列,由于接收到S2报文后,期望的的序号为S3,所以out_of_order队列中的S3报文会被移到receive队列。

5) 用户进程开始读取socket,先在进程中分配一块内存,然后调用read或者recv方法。socket有一系列的具有默认值的配置属性,比如socket默认是阻塞式的,它的SO_RCVLOWAT属性值默认为1。当然,recv这样的方法还会接收一个flag参数,它可以设置为MSG_WAITALLMSG_PEEKMSG_TRUNK等等,这里我们假定为最常用的0。进程调用了recv方法。

6) 调用tcp_recvmsg方法

7) tcp_recvmsg方法会首先锁住socket。socket是可以被多线程使用的,而且操作系统也会使用,所以必须处理并发问题。要操控socket,就先获取锁。

8) 此时,receive队列已经有3个报文了,将第一个报文拷贝到用户态内存中,由于第五步中socket的参数并没有带MSG_PEEK,所以将第一个报文从队列中移除,从内核态释放掉。反之,MSG_PEEK标志位会导致receive队列不会删除报文。所以,MSG_PEEK主要用于多进程读取同一套接字的情形。

9) 拷贝第二个报文,当然,执行拷贝前都会检查用户态内存的剩余空间是否足以放下当前这个报文,不够时会直接返回已经拷贝的字节数。
10) 拷贝第三个报文。
11) receive队列已经为空,此时会检查SO_RCVLOWAT这个最小阈值。如果已经拷贝字节数小于它,进程会休眠,等待更多报文。默认的SO_RCVLOWAT值为1,也就是读取到报文就可以返回。

12) 检查backlog队列,backlog队列是用户进程正在拷贝数据时,网卡收到的报文会进这个队列。如果此时backlog队列有数据,就顺带处理下。backlog队列是没有数据的,因此释放锁,准备返回用户态。

13) 用户进程代码开始执行,此时recv等方法返回的就是从内核拷贝的字节数。

接收报文场景二

 第二张图给出了第二个场景,这里涉及了prequeue队列。用户进程调用recv方法时,socket队列中没有任何报文,而socket是阻塞的,所以进程睡眠了。然后操作系统收到了报文,此时prequeue队列开始产生作用。该场景中,tcp_low_latency为默认的0,套接字socket的SO_RCVLOWAT是默认的1,仍然是阻塞socket,如下图。

场景二

 其中1,2,3步骤的处理和之前一样。我们直接从第四步开始。

4) 由于此时receive,prequeuebacklog队列都为空,所以没有拷贝一个字节到用户内存中。而socket的配置要求至少拷贝SO_RCVLOWAT也就是1字节的报文,因此进入阻塞式套接字的等待流程。最长等待时间为SO_RCVTIMEO指定的时间。socket在进入等待前会释放socket锁,会使第五步中,新来的报文不再只能进入backlog队列。
5) 接到S1报文,将其加入prequeue队列中。
6) 插入到prequeue队列后,会唤醒在socket上休眠的进程。
7) 用户进程被唤醒后,重新获取socket锁,此后再接收到的报文只能进入backlog队列。
8) 进程先检查receive队列,当然仍然是空的;再去检查prequeue队列,发现有报文S1,正好是正在等待序号的报文,于是直接从prequeue队列中拷贝到用户内存,再释放内核中的这个报文。
9) 目前已经拷贝了一个字节的报文到用户内存,检查这个长度是否超过了最低阈值,也就是len和SO_RCVLOWAT的最小值。
10) 由于SO_RCVLOWAT使用了默认值1,拷贝字节数大于最低阈值,准备返回用户态,顺便会查看一下backlog队列中是否有数据,此时没有,所以准备放回,释放socket锁。
11) 返回用户已经拷贝的字节数。

接收报文场景三

 在第三个场景中,系统参数tcp_low_latency为1,socket上设置了SO_RCVLOWAT属性值。服务器先收到报文S1,但是其长度小于SO_RCVLOWAT。用户进程调用recv方法读取,虽然读取到了一部分,但是没有到达最小阈值,所以进程睡眠了。与此同时,在睡眠前接收的乱序的报文S3直接进入backlog队列。然后,报文S2到达,由于没有使用prequeue队列(因为设置了tcp_low_latency),而它起始序号正是下一个待拷贝的值,所以直接拷贝到用户内存中,总共拷贝字节数已满足SO_RCVLOWAT的要求!最后在返回用户前把backlog队列中S3报文也拷贝给用户。

场景三

1) 接收到报文S1,正是准备接收的报文序号,因此,将它直接加入到有序的receive队列中。
2) 将系统属性tcp_low_latency设置为1,表明服务器希望程序能够及时的接收到TCP报文。用户调用的recv接收阻塞socket上的报文,该socket的SO_RCVLOWAT值大于第一个报文的大小,并且用户分配了足够大的长度为len的内存。
3) 调用tcp_recvmsg方法来完成接收工作,先锁住socket。
4) 准备处理内核各个接收队列中的报文。
5) receive队列中有报文可以直接拷贝,其大小小于len,直接拷贝到用户内存。
6) 在进行第五步的同时,内核又接收到S3报文,此时socket被锁,报文直接进入backlog队列。这个报文并不是有序的。
7) 在第五步时,拷贝报文S1到用户内存,它的大小小于SO_RCVLOWAT的值。由于socket是阻塞型,所以用户进程进入睡眠状态。进入睡眠前,会先处理backlog队列的报文。因为S3报文是失序的,所以进入out_of_order队列。用户进程进入休眠状态前都会先处理一下backlog队列。
8) 进程休眠,直到超时或者receive队列不为空。
9) 内核接收到报文S2。注意,此时由于打开了tcp_low_latency标志位,所以报文是不会进入prequeue队列等待进程处理。
10) 由于报文S2正是要接收的报文,同时,一个用户进程在休眠等待该报文,所以直接将报文S2拷贝到用户内存。
11) 每处理完一个有序报文后,无论是拷贝到receive队列还是直接复制到用户内存,都会检查out_of_order队列,看看是否有报文可以处理。报文S3拷贝到用户内存,然后唤醒用户进程。
12) 唤醒用户进程。
13) 此时会检查已拷贝的字节数是否大于SO_RCVLOWAT,以及backlog队列是否为空。两者皆满足,准备返回。

 总结一下四个队列的作用。

  • receive队列是真正的接收队列,操作系统收到的TCP数据包经过检查和处理后,就会保存到这个队列中。
  • backlog是“备用队列”。当socket处于用户进程的上下文时(即用户正在对socket进行系统调用,如recv),操作系统收到数据包时会将数据包保存到backlog队列中,然后直接返回。
  • prequeue是“预存队列”。当socket没有正在被用户进程使用时,也就是用户进程调用了read或者recv系统调用,但是进入了睡眠状态时,操作系统直接将收到的报文保存在prequeue中,然后返回。
  • out_of_order是“乱序队列”。队列存储的是乱序的报文,操作系统收到的报文并不是TCP准备接收的下一个序号的报文,则放入out_of_order队列,等待后续处理。

后记

 如果你觉得本篇文章对你有帮助,请点个赞。同时欢迎订阅本人的微信公众号。

参考

Share

TCP拥塞控制算法简介

 最近花了些时间在学习TCP/IP协议上,首要原因是由于本人长期以来对TCP/IP的认识就只限于三次握手四次分手上,所以希望深入了解一下。再者,TCP/IP和Linux系统层级的很多设计都可以用于中间件系统架构上,比如说TCP 拥塞控制算法也可以用于以响应时间来限流的中件间。更深一层,像TCP/IP协议这种基础知识和原理性的技术,都是经过长时间的考验的,都是前人智慧的结晶,可以给大家很多启示和帮助。

 本文中会出现一些缩写,因为篇幅问题,无法每个都进行解释,如果你不明白它的含义,请自己去搜索了解,做一个主动寻求知识的人。

 TCP协议有两个比较重要的控制算法,一个是流量控制,另一个就是阻塞控制。

 TCP协议通过滑动窗口来进行流量控制,它是控制发送方的发送速度从而使接受者来得及接收并处理。而拥塞控制是作用于网络,它是防止过多的包被发送到网络中,避免出现网络负载过大,网络拥塞的情况。

 拥塞算法需要掌握其状态机和四种算法。拥塞控制状态机的状态有五种,分别是Open,Disorder,CWR,Recovery和Loss状态。四个算法为慢启动,拥塞避免,拥塞发生时算法和快速恢复。

Congestion Control State Machine

 和TCP一样,拥塞控制算法也有其状态机。当发送方收到一个Ack时,Linux TCP通过状态机(state)来决定其接下来的行为,是应该降低拥塞窗口cwnd大小,或者保持cwnd不变,还是继续增加cwnd。如果处理不当,可能会导致丢包或者超时。

状态机示意图

1 Open状态

 Open状态是拥塞控制状态机的默认状态。这种状态下,当ACK到达时,发送方根据拥塞窗口cwnd(Congestion Window)是小于还是大于慢启动阈值ssthresh(slow start threshold),来按照慢启动或者拥塞避免算法来调整拥塞窗口。

2 Disorder状态

 当发送方检测到DACK(重复确认)或者SACK(选择性确认)时,状态机将转变为Disorder状态。在此状态下,发送方遵循飞行(in-flight)包守恒原则,即一个新包只有在一个老包离开网络后才发送,也就是发送方收到老包的ACK后,才会再发送一个新包。

3 CWR状态

 发送方接收到一个拥塞通知时,并不会立刻减少拥塞窗口cwnd,而是每收到两个ACK就减少一个段,直到窗口的大小减半为止。当cwnd正在减小并且网络中有没有重传包时,这个状态就叫CWR(Congestion Window Reduced,拥塞窗口减少)状态。CWR状态可以转变成Recovery或者Loss状态。

4 Recovery状态

 当发送方接收到足够(推荐为三个)的DACK(重复确认)后,进入该状态。在该状态下,拥塞窗口cnwd每收到两个ACK就减少一个段(segment),直到cwnd等于慢启动阈值ssthresh,也就是刚进入Recover状态时cwnd的一半大小。
 发送方保持 Recovery 状态直到所有进入 Recovery状态时正在发送的数据段都成功地被确认,然后发送方恢复成Open状态,重传超时有可能中断 Recovery 状态,进入Loss状态。

5 Loss状态

 当一个RTO(重传超时时间)到期后,发送方进入Loss状态。所有正在发送的数据标记为丢失,拥塞窗口cwnd设置为一个段(segment),发送方再次以慢启动算法增大拥塞窗口cwnd。

 Loss 和 Recovery 状态的区别是:Loss状态下,拥塞窗口在发送方设置为一个段后增大,而 Recovery 状态下,拥塞窗口只能被减小。Loss 状态不能被其他的状态中断,因此,发送方只有在所有 Loss 开始时正在传输的数据都得到成功确认后,才能退到 Open 状态。

四大算法

 拥塞控制主要是四个算法:1)慢启动,2)拥塞避免,3)拥塞发生,4)快速恢复。这四个算法不是一天都搞出来的,这个四算法的发展经历了很多时间,到今天都还在优化中。

示意图

慢热启动算法 – Slow Start

 所谓慢启动,也就是TCP连接刚建立,一点一点地提速,试探一下网络的承受能力,以免直接扰乱了网络通道的秩序。

 慢启动算法:

1) 连接建好的开始先初始化拥塞窗口cwnd大小为1,表明可以传一个MSS大小的数据。
2) 每当收到一个ACK,cwnd大小加一,呈线性上升。
3) 每当过了一个往返延迟时间RTT(Round-Trip Time),cwnd大小直接翻倍,乘以2,呈指数让升。
4) 还有一个ssthresh(slow start threshold),是一个上限,当cwnd >= ssthresh时,就会进入“拥塞避免算法”(后面会说这个算法)

拥塞避免算法 – Congestion Avoidance

 如同前边说的,当拥塞窗口大小cwnd大于等于慢启动阈值ssthresh后,就进入拥塞避免算法。算法如下:

1) 收到一个ACK,则cwnd = cwnd + 1 / cwnd
2) 每当过了一个往返延迟时间RTT,cwnd大小加一。

 过了慢启动阈值后,拥塞避免算法可以避免窗口增长过快导致窗口拥塞,而是缓慢的增加调整到网络的最佳值。

拥塞状态时的算法

 一般来说,TCP拥塞控制默认认为网络丢包是由于网络拥塞导致的,所以一般的TCP拥塞控制算法以丢包为网络进入拥塞状态的信号。当丢包时,会有以下两种情况。对于丢包有两种判定方式,一种是超时重传RTO[Retransmission Timeout]超时,另一个是收到三个重复确认ACK。

 超时重传是TCP协议保证数据可靠性的一个重要机制,其原理是在发送一个数据以后就开启一个计时器,在一定时间内如果没有得到发送数据报的ACK报文,那么就重新发送数据,直到发送成功为止。

 但是如果发送端接收到3个以上的重复ACK,TCP就意识到数据发生丢失,需要重传。这个机制不需要等到重传定时器超时,所以叫
做快速重传,而快速重传后没有使用慢启动算法,而是拥塞避免算法,所以这又叫做快速恢复算法。

 超时重传RTO[Retransmission Timeout]超时,TCP会重传数据包。TCP认为这种情况比较糟糕,反应也比较强烈:

  • 由于发生丢包,将慢启动阈值ssthresh设置为当前cwnd的一半,即ssthresh = cwnd / 2.
  • cwnd重置为1
  • 进入慢启动过程

 最为早期的TCP Tahoe算法就只使用上述处理办法,但是由于一丢包就一切重来,导致cwnd又重置为1,十分不利于网络数据的稳定传递。

 所以,TCP Reno算法进行了优化。当收到三个重复确认ACK时,TCP开启快速重传Fast Retransmit算法,而不用等到RTO超时再进行重传:

  • cwnd大小缩小为当前的一半
  • ssthresh设置为缩小后的cwnd大小
  • 然后进入快速恢复算法Fast Recovery。

cwnd曲线示意图

快速恢复算法 – Fast Recovery

 TCP Tahoe是早期的算法,所以没有快速恢复算法,而Reno算法有。在进入快速恢复之前,cwnd和ssthresh已经被更改为原有cwnd的一般。快速恢复算法的逻辑如下:

  • cwnd = cwnd + 3 MSS,加3 MSS的原因是因为收到3个重复的ACK。
  • 重传DACKs指定的数据包。
  • 如果再收到DACKs,那么cwnd大小增加一。
  • 如果收到新的ACK,表明重传的包成功了,那么退出快速恢复算法。将cwnd设置为ssthresh,然后进入拥塞避免算法。

快速重传示意图

 如图所示,第五个包发生了丢失,所以导致接收方接收到三次重复ACK,也就是ACK5。所以将ssthresh设置当当时cwnd的一半,也就是6/2 = 3,cwnd设置为3 + 3 = 6。然后重传第五个包。当收到新的ACK时,也就是ACK11,则退出快速恢复阶段,将cwnd重新设置为当前的ssthresh,也就是3,然后进入拥塞避免算法阶段。

后记

 本文为大家大致描述了TCP拥塞控制的一些机制,但是这些拥塞控制还是有很多缺陷和待优化的地方,业界也在不断推出新的拥塞控制算法,比如说谷歌的BBR。这些我们后续也会继续探讨,请大家继续关注。

引用

Share

Spring AOP(二) 修饰者模式和JDK Proxy

 在上边一篇文章中我们介绍了Spring AOP的基本概念,今天我们就来学习一下与AOP实现相关的修饰者模式和Java Proxy相关的原理,为之后源码分析打下基础。

修饰者模式

 Java设计模式中的修饰者模式能动态地给目标对象增加额外的职责(Responsibility)。它使用组合(object composition),即将目标对象作为修饰者对象(代理)的成员变量,由修饰者对象决定调用目标对象的时机和调用前后所要增强的行为。

 装饰模式包含如下组成部分:

  • Component: 抽象构件,也就是目标对象所实现的接口,有operation函数
  • ConcreteComponent: 具体构件,也就是目标对象的类
  • Decorator: 抽象装饰类,也实现了抽象构件接口,也就是目标类和装饰类都实现了相同的接口
  • ConcreteDecorator: 具体装饰类,其中addBeavior函数就是增强的行为,装饰类可以自己决定addBeavior函数和目标对象函数operation函数的调用时机。

修饰者模式的类图

 修饰者模式调用的时序图如下图所示。程序首先创建目标对象,然后创建修饰者对象,并将目标对象传入作为其成员变量。当程序调用修饰者对象的operation函数时,修饰者对象会先调用目标对象的operation函数,然后再调用自己的addBehavior函数。这就是类似于AOP的后置增强器,在目标对象的行为之后添加新的行为。

修饰者模式的时序图

 Spring AOP的实现原理和修饰者模式类似。在上一篇文章中说到AOP的动态代理有两种实现方式,分别是JDK Proxy和cglib。

 如下图所示,JDK Proxy的类结构和上文中修饰者的类图结构类似,都是代理对象和目标对象都实现相同的接口,代理对象持有目标对象和切面对象,并且决定目标函数和切面增强函数的调用时机。
 而cglib的实现略有不同,它没有实现实现相同接口,而是代理对象继承目标对象类。
两种动态代理的对标

 本文后续就讲解一下JDK Proxy的相关源码分析。

JDK Proxy

 JDK提供了Proxy类来实现动态代理的,可通过它的newProxyInstance函数来获得代理对象。JDK还提供了InvocationHandler类,代理对象的函数被调用时,会调用它的invoke函数,程序员可以在其中实现所需的逻辑。

 JDK Proxy的基本语法如下所示。先构造一个InvocationHandler的实现类,然后调用ProxynewProxyInstance函数生成代理对象,传入类加载器,目标对象的接口和自定义的InvocationHandler实例。

public class CustomInvocationHandler implements InvocationHandler {
    private Object target;

    public CustomInvocationHandler(Object target) {
        this.target = target;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        System.out.println("Before invocation");
        Object retVal = method.invoke(target, args);
        System.out.println("After invocation");
        return retVal;
    }
}

CustomInvocationHandler customInvocationHandler = new CustomInvocationHandler(
        helloWord);
//通过Proxy.newProxyInstance生成代理对象
ProxyTest proxy = (ProxyTest) Proxy.newProxyInstance(
        ProxyTest.class.getClassLoader(),
       proxyObj.getClass().getInterfaces(), customInvocationHandler);

生成代理对象

 我们首先来看一下ProxynewProxyInstance函数。newProxyInstance函数的逻辑大致如下:

  • 首先根据传入的目标对象接口动态生成代理类
  • 然后获取代理类的构造函数实例
  • 最后将InvocationHandler作为参数通过反射调用构造函数实例,生成代理类对象。
     具体源码如下所示。
    public static Object newProxyInstance(ClassLoader loader,
                                            Class<?>[] interfaces,
                                            InvocationHandler h)
        throws IllegalArgumentException
    {
    
        final Class<?>[] intfs = interfaces.clone();
        final SecurityManager sm = System.getSecurityManager();
        if (sm != null) {
            checkProxyAccess(Reflection.getCallerClass(), loader, intfs);
        }
        // 1 动态生成代理对象的类
        Class<?> cl = getProxyClass0(loader, intfs);
    
        // ... 代码省略,下边代码其实是在try catch中的
        if (sm != null) {
            checkNewProxyPermission(Reflection.getCallerClass(), cl);
        }
        // 2 获取代理类的构造函数
        final Constructor<?> cons = cl.getConstructor(constructorParams);
        final InvocationHandler ih = h;
        if (!Modifier.isPublic(cl.getModifiers())) {
            AccessController.doPrivileged(new PrivilegedAction<Void>() {
                public Void run() {
                    cons.setAccessible(true);
                    return null;
                }
            });
        }
        // 3调用构造函数,传入InvocationHandler对象
        return cons.newInstance(new Object[]{h});
    }
    

getProxyClass0函数的源码如下所示,通过代理类缓存获取代理类信息,如果不存在则会生成代理类。

// 生成代理类
private static Class<?> getProxyClass0(ClassLoader loader,
                                        Class<?>... interfaces) {
    if (interfaces.length > 65535) {
        throw new IllegalArgumentException("interface limit exceeded");
    }

    // 如果已经有Proxy类的缓存则直接返回,否则要进行创建
    return proxyClassCache.get(loader, interfaces);
}

生成代理类

 JDK Proxy通过ProxyClassFactory生成代理类。其apply函数大致逻辑如下:

  • 校验接口是否符合规范
  • 生成代理类的名称和包名
  • 生成代理类字节码
  • 根据字节码生成代理类Class
// 生成代理类的工厂类
private static final class ProxyClassFactory
    implements BiFunction<ClassLoader, Class<?>[], Class<?>>
{
    // 所有代理类名的前缀
    private static final String proxyClassNamePrefix = "$Proxy";

    // 生成唯一类名的原子Long对象
    private static final AtomicLong nextUniqueNumber = new AtomicLong();

    @Override
    public Class<?> apply(ClassLoader loader, Class<?>[] interfaces) {

        Map<Class<?>, Boolean> interfaceSet = new IdentityHashMap<>(interfaces.length);
        for (Class<?> intf : interfaces) {
            // 通过loader找到接口对应的类信息。
            Class<?> interfaceClass = null;
            try {
                interfaceClass = Class.forName(intf.getName(), false, loader);
            } catch (ClassNotFoundException e) {
            }
            if (interfaceClass != intf) {
                throw new IllegalArgumentException(
                    intf + " is not visible from class loader");
            }
            // 判断找出来的类确实是一个接口
            if (!interfaceClass.isInterface()) {
                throw new IllegalArgumentException(
                    interfaceClass.getName() + " is not an interface");
            }
            // 判断接口是否重复
            if (interfaceSet.put(interfaceClass, Boolean.TRUE) != null) {
                throw new IllegalArgumentException(
                    "repeated interface: " + interfaceClass.getName());
            }
        }
        // 代理类的包路径
        String proxyPkg = null;
        int accessFlags = Modifier.PUBLIC | Modifier.FINAL;

        // 记录非公开的代理接口,以便于生成的代理类和原来的类在同一个路径下。 
        for (Class<?> intf : interfaces) {
            int flags = intf.getModifiers();
            if (!Modifier.isPublic(flags)) {
                accessFlags = Modifier.FINAL;
                String name = intf.getName();
                int n = name.lastIndexOf('.');
                String pkg = ((n == -1) ? "" : name.substring(0, n + 1));
                if (proxyPkg == null) {
                    proxyPkg = pkg;
                } else if (!pkg.equals(proxyPkg)) {
                    throw new IllegalArgumentException(
                        "non-public interfaces from different packages");
                }
            }
        }
        // 如果没有非公开的Proxy接口,使用com.sun.proxy报名
        if (proxyPkg == null) {
            proxyPkg = ReflectUtil.PROXY_PACKAGE + ".";
        }

        long num = nextUniqueNumber.getAndIncrement();
        // 默认情况下,代理类的完全限定名为:com.sun.proxy.$Proxy0,$Proxy1……依次递增  
        String proxyName = proxyPkg + proxyClassNamePrefix + num;

        // 生成代理类字节码
        byte[] proxyClassFile = ProxyGenerator.generateProxyClass(
            proxyName, interfaces, accessFlags);
        try {
            // 根据字节码返回相应的Class实例
            return defineClass0(loader, proxyName,
                                proxyClassFile, 0, proxyClassFile.length);
        } catch (ClassFormatError e) {
            throw new IllegalArgumentException(e.toString());
        }
    }
}

 其中关于字节码生成的部分逻辑我们就暂时不深入介绍了,感兴趣的同学可以自行研究。

$Proxy反编译

 我们来看一下生成的代理类的反编译代码。代理类实现了Object的基础函数,比如toStringhasCodeequals,也实现了目标接口中定义的函数,比如说ProxyTest接口的test函数。

$Proxy中函数的实现都是直接调用了InvocationHandlerinvoke函数。

public final class $Proxy0 extends Proxy
  implements ProxyTest 
// 会实现目标接口,但是由于集成了Proxy,所以无法再集成其他类
{
  private static Method m1;
  private static Method m0;
  private static Method m3;
  private static Method m2;
  // 构造函数要传入一个InvocationHandler对象
  public $Proxy0(InvocationHandler paramInvocationHandler)
    throws 
  {
    super(paramInvocationHandler);
  }
  // equal函数
  public final boolean equals(Object paramObject)
    throws 
  {
      try
    {
      return ((Boolean)this.h.invoke(this, m1, new Object[] { paramObject })).booleanValue();
    }
    catch (RuntimeException localRuntimeException)
    {
      throw localRuntimeException;
    }
    catch (Throwable localThrowable)
    {
    }
    throw new UndeclaredThrowableException(localThrowable);
  }

  public final int hashCode()
    throws 
  {
    try
    {
      return ((Integer)this.h.invoke(this, m0, null)).intValue();
    }
    catch (RuntimeException localRuntimeException)
    {
      throw localRuntimeException;
    }
    catch (Throwable localThrowable)
    {
    }
    throw new UndeclaredThrowableException(localThrowable);
  }
  // test函数,也就是ProxyTest接口中定义的函数
  public final void test(String paramString)
    throws 
  {
    try
    {
      // 调用InvocationHandler的invoke函数
      this.h.invoke(this, m3, new Object[] { paramString });
      return;
    }
    catch (RuntimeException localRuntimeException)
    {
      throw localRuntimeException;
    }
    catch (Throwable localThrowable)
    {
    }
    throw new UndeclaredThrowableException(localThrowable);
  }

  public final String toString()
    throws 
  {
    try
    {
      return (String)this.h.invoke(this, m2, null);
    }
    catch (RuntimeException localRuntimeException)
    {
      throw localRuntimeException;
    }
    catch (Throwable localThrowable)
    {
    }
    throw new UndeclaredThrowableException(localThrowable);
  }
  // 获取各个函数的Method对象
  static
  {
    try
    {
      m1 = Class.forName("java.lang.Object").getMethod("equals", new Class[] { Class.forName("java.lang.Object") });
      m0 = Class.forName("java.lang.Object").getMethod("hashCode", new Class[0]);
      m3 = Class.forName("com.proxy.test2.HelloTest").getMethod("say", new Class[] { Class.forName("java.lang.String") });
      m2 = Class.forName("java.lang.Object").getMethod("toString", new Class[0]);
      return;
    }
    catch (NoSuchMethodException localNoSuchMethodException)
    {
      throw new NoSuchMethodError(localNoSuchMethodException.getMessage());
    }
    catch (ClassNotFoundException localClassNotFoundException)
    {
    }
    throw new NoClassDefFoundError(localClassNotFoundException.getMessage());
  }
}

后记

 下一篇文章就是AOP的源码分析了,希望大家继续关注。

Share

Spring AOP(一) AOP基本概念

 Spring框架自诞生之日就拯救我等程序员于水火之中,它有两大法宝,一个是IoC控制反转,另一个便是AOP面向切面编程。今日我们就来破一下它的AOP法宝,以便以后也能自由使出一手AOP大法。

 AOP全名Aspect-oriented programming面向切面编程大法,它有很多兄弟,分别是经常见的面向对象编程,朴素的面向过程编程和神秘的函数式编程等。所谓AOP的具体解释,以及和OOP的区别不清楚的同学可以自行去了解。

 AOP实现的关键在于AOP框架自动创建的AOP代理,AOP代理主要分为静态代理和动态代理。本文就主要讲解AOP的基本术语,然后用一个例子让大家彻底搞懂这些名词,最后介绍一下AOP的两种代理方式:

  • 以AspectJ为代表的静态代理。
  • 以Spring AOP为代表的动态代理。

基本术语

(1)切面(Aspect)

 切面是一个横切关注点的模块化,一个切面能够包含同一个类型的不同增强方法,比如说事务处理和日志处理可以理解为两个切面。切面由切入点和通知组成,它既包含了横切逻辑的定义,也包括了切入点的定义。 Spring AOP就是负责实施切面的框架,它将切面所定义的横切逻辑织入到切面所指定的连接点中。

@Component
@Aspect
public class LogAspect {
}

可以简单地认为, 使用 @Aspect 注解的类就是切面

(2) 目标对象(Target)

 目标对象指将要被增强的对象,即包含主业务逻辑的类对象。或者说是被一个或者多个切面所通知的对象。

(3) 连接点(JoinPoint)

 程序执行过程中明确的点,如方法的调用或特定的异常被抛出。连接点由两个信息确定:

  • 方法(表示程序执行点,即在哪个目标方法)
  • 相对点(表示方位,即目标方法的什么位置,比如调用前,后等)

 简单来说,连接点就是被拦截到的程序执行点,因为Spring只支持方法类型的连接点,所以在Spring中连接点就是被拦截到的方法。

@Before("pointcut()")
public void log(JoinPoint joinPoint) { //这个JoinPoint参数就是连接点
}

(4) 切入点(PointCut)

 切入点是对连接点进行拦截的条件定义。切入点表达式如何和连接点匹配是AOP的核心,Spring缺省使用AspectJ切入点语法。
 一般认为,所有的方法都可以认为是连接点,但是我们并不希望在所有的方法上都添加通知,而切入点的作用就是提供一组规则(使用 AspectJ pointcut expression language 来描述) 来匹配连接点,给满足规则的连接点添加通知。

@Pointcut("execution(* com.remcarpediem.test.aop.service..*(..))")
public void pointcut() {
}

 上边切入点的匹配规则是com.remcarpediem.test.aop.service包下的所有类的所有函数。

(5) 通知(Advice)

 通知是指拦截到连接点之后要执行的代码,包括了“around”、“before”和“after”等不同类型的通知。Spring AOP框架以拦截器来实现通知模型,并维护一个以连接点为中心的拦截器链。

// @Before说明这是一个前置通知,log函数中是要前置执行的代码,JoinPoint是连接点,
@Before("pointcut()")
public void log(JoinPoint joinPoint) { 
}

(6) 织入(Weaving)

 织入是将切面和业务逻辑对象连接起来, 并创建通知代理的过程。织入可以在编译时,类加载时和运行时完成。在编译时进行织入就是静态代理,而在运行时进行织入则是动态代理。

(7) 增强器(Adviser)

 Advisor是切面的另外一种实现,能够将通知以更为复杂的方式织入到目标对象中,是将通知包装为更复杂切面的装配器。Advisor由切入点和Advice组成。
 Advisor这个概念来自于Spring对AOP的支撑,在AspectJ中是没有等价的概念的。Advisor就像是一个小的自包含的切面,这个切面只有一个通知。切面自身通过一个Bean表示,并且必须实现一个默认接口。

// AbstractPointcutAdvisor是默认接口
public class LogAdvisor extends AbstractPointcutAdvisor {
    private Advice advice; // Advice
    private Pointcut pointcut; // 切入点

    @PostConstruct
    public void init() {
        // AnnotationMatchingPointcut是依据修饰类和方法的注解进行拦截的切入点。
        this.pointcut = new AnnotationMatchingPointcut((Class) null, Log.class);
        // 通知
        this.advice = new LogMethodInterceptor();
    }
}

深入理解

 看完了上面的理论部分知识, 我相信还是会有不少朋友感觉AOP 的概念还是很模糊, 对 AOP 的术语理解的还不是很透彻。现在我们就找一个具体的案例来说明一下。
 简单来讲,整个 aspect 可以描述为: 满足 pointcut 规则的 joinpoint 会被添加相应的 advice 操作。我们来看下边这个例子。

@Component
@Aspect // 切面
public class LogAspect {
    private final static Logger LOGGER = LoggerFactory.getLogger(LogAspect.class.getName());
     // 切入点,表达式是指com.remcarpediem.test.aop.service
     // 包下的所有类的所有方法
    @Pointcut("execution(* com.remcarpediem.test.aop.service..*(..))")
    public void aspect() {}
    // 通知,在符合aspect切入点的方法前插入如下代码,并且将连接点作为参数传递
    @Before("aspect()")
    public void log(JoinPoint joinPoint) { //连接点作为参数传入
        if (LOGGER.isInfoEnabled()) {
            // 获得类名,方法名,参数和参数名称。
            Signature signature = joinPoint.getSignature();
            String className = joinPoint.getTarget().getClass().getName();
            String methodName = joinPoint.getSignature().getName();
            Object[] arguments = joinPoint.getArgs();
            MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature();

            String[] argumentNames = methodSignature.getParameterNames();

            StringBuilder sb = new StringBuilder(className + "." + methodName + "(");

            for (int i = 0; i< arguments.length; i++) {
                Object argument = arguments[i];
                sb.append(argumentNames[i] + "->");
                sb.append(argument != null ? argument.toString() : "null ");
            }
            sb.append(")");

            LOGGER.info(sb.toString());
        }
    }
}

 上边这段代码是一个简单的日志相关的切面,依次定义了切入点和通知,而连接点作为log的参数传入进来,进行一定的操作,比如说获取连接点函数的名称,参数等。

静态代理模式

 所谓静态代理就是AOP框架会在编译阶段生成AOP代理类,因此也称为编译时增强。ApsectJ是静态代理的实现之一,也是最为流行的。静态代理由于在编译时就生成了代理类,效率相比动态代理要高一些。AspectJ可以单独使用,也可以和Spring结合使用。

动态代理模式

 与静态代理不同,动态代理就是说AOP框架不会去修改编译时生成的字节码,而是在运行时在内存中生成一个AOP代理对象,这个AOP对象包含了目标对象的全部方法,并且在特定的切点做了增强处理,并回调原对象的方法。

 Spring AOP中的动态代理主要有两种方式:JDK动态代理和CGLIB动态代理。

 JDK代理通过反射来处理被代理的类,并且要求被代理类必须实现一个接口。核心类是 InvocationHandler接口 和 Proxy类。
 而当目标类没有实现接口时,Spring AOP框架会使用CGLIB来动态代理目标类。
 CGLIB(Code Generation Library),是一个代码生成的类库,可以在运行时动态的生成某个类的子类。CGLIB是通过继承的方式做的动态代理,因此如果某个类被标记为final,那么它是无法使用CGLIB做动态代理的。核心类是 MethodInterceptor 接口和Enhancer 类

两种动态代理的区别

后记

 AOP的基础知识都比较枯燥,本人也不擅长概念性的文章,不过下一篇文章就是AOP源码分析了,希望大家可以继续关注。

Share

AbstractQueuedSynchronizer超详细原理解析

 今天我们来研究学习一下AbstractQueuedSynchronizer类的相关原理,java.util.concurrent包中很多类都依赖于这个类所提供队列式同步器,比如说常用的ReentranLockSemaphoreCountDownLatch等。
 为了方便理解,我们以一段使用ReentranLock的代码为例,讲解ReentranLock每个方法中有关AQS的使用。

ReentranLock示例

 我们都知道ReentranLock的加锁行为和Synchronized类似,都是可重入的锁,但是二者的实现方式确实完全不同的,我们之后也会讲解Synchronized的原理。除此之外,Synchronized的阻塞无法被中断,而ReentrantLock则提供了可中断的阻塞。下面的代码是ReentranLock的函数,我们就以此为顺序,依次讲解这些函数背后的实现原理。

ReentrantLock lock = new ReentrantLock();
lock.lock();
lock.unlock();

公平锁和非公平锁

ReentranLock分为公平锁和非公平锁,二者的区别就在获取锁机会是否和排队顺序相关。我们都知道,如果锁被另一个线程持有,那么申请锁的其他线程会被挂起等待,加入等待队列。理论上,先调用lock函数被挂起等待的线程应该排在等待队列的前端,后调用的就排在后边。如果此时,锁被释放,需要通知等待线程再次尝试获取锁,公平锁会让最先进入队列的线程获得锁。而非公平锁则会唤醒所有线程,让它们再次尝试获取锁,所以可能会导致后来的线程先获得了锁,则就是非公平。

public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

 我们会发现FairSyncNonfairSync都继承了Sync类,而Sync的父类就是AbstractQueuedSynchronizer(后续简称AQS)。但是AQS的构造函数是空的,并没有任何操作。
 之后的源码分析,如果没有特别说明,就是指公平锁。

lock操作

ReentranLocklock函数如下所示,直接调用了synclock函数。也就是调用了FairSynclock函数。

    //ReentranLock
    public void lock() {
        sync.lock();
    }
    //FairSync
    final void lock() {
        //调用了AQS的acquire函数,这是关键函数之一
        acquire(1);
    }

 我们接下来就正式开始AQS相关的源码分析了,acquire函数的作用是获取同一时间段内只能被一个线程获取的量,这个量就是抽象化的锁概念。我们先分析代码,你慢慢就会明白其中的含义。

public final void acquire(int arg) {
	// tryAcquire先尝试获取"锁",获取了就不进入后续流程
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        //addWaiter是给当前线程创建一个节点,并将其加入等待队列
        //acquireQueued是当线程已经加入等待队列之后继续尝试获取锁.
        selfInterrupt();
}

tryAcquireaddWaiteracquireQueued都是十分重要的函数,我们接下来依次学习一下这些函数,理解它们的作用。

//AQS类中的变量.
private volatile int state;
//这是FairSync的实现,AQS中未实现,子类按照自己的需要实现该函数
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    //获取AQS中的state变量,代表抽象概念的锁.
    int c = getState();
    if (c == 0) { //值为0,那么当前独占性变量还未被线程占有
        //如果当前阻塞队列上没有先来的线程在等待,UnfairSync这里的实现就不一致
        if (!hasQueuedPredecessors() && 
            compareAndSetState(0, acquires)) {
            //成功cas,那么代表当前线程获得该变量的所有权,也就是说成功获得锁
            setExclusiveOwnerThread(current);
            // setExclusiveOwnerThread将本线程设置为独占性变量所有者线程
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        //如果该线程已经获取了独占性变量的所有权,那么根据重入性
        //原理,将state值进行加1,表示多次lock
        //由于已经获得锁,该段代码只会被一个线程同时执行,所以不需要
        //进行任何并行处理
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    //上述情况都不符合,说明获取锁失败
    return false;
}

 由上述代码我们可以发现,tryAcquire就是尝试获取那个线程独占的变量state。state的值表示其状态:如果是0,那么当前还没有线程独占此变量;否在就是已经有线程独占了这个变量,也就是代表已经有线程获得了锁。但是这个时候要再进行一次判断,看是否是当前线程自己获得的这个锁,如果是,就增加state的值。

ReentranLock获得锁

 这里有几点需要说明一下,首先是compareAndSetState函数,这是使用CAS操作来设置state的值,而且state值设置了volatile修饰符,通过这两点来确保修改state的值不会出现多线程问题。然后是公平锁和非公平锁的区别问题,在UnfairSyncnonfairTryAcquire函数中不会在相同的位置上调用hasQueuedPredecessors来判断当前是否已经有线程在排队等待获得锁。

 如果tryAcquire返回true,那么就是获取锁成功;如果返回false,那么就是未获得锁,需要加入阻塞等待队列。我们下面就来看一下addWaiter的相关操作。

等待锁的阻塞队列

 将保存当前线程信息的节点加入到等待队列的相关函数中涉及到了无锁队列的相关算法,由于在AQS中只是将节点添加到队尾,使用到的无锁算法也相对简单。真正的无锁队列的算法我们等到分析ConcurrentSkippedListMap时在进行讲解。

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    //先使用快速入列法来尝试一下,如果失败,则进行更加完备的入列算法.
    //只有在必要的情况下才会使用更加复杂耗时的算法,也就是乐观的态度
    Node pred = tail; //列尾指针
    if (pred != null) {
        node.prev = pred; //步骤1:该节点的前趋指针指向tail
        if (compareAndSetTail(pred, node)){ //步骤二:cas将尾指针指向该节点
            pred.next = node;//步骤三:如果成果,让旧列尾节点的next指针指向该节点.
            return node;
        }
    }
    //cas失败,或在pred == null时调用enq
    enq(node);
    return node;
}
private Node enq(final Node node) {
    for (;;) { //cas无锁算法的标准for循环,不停的尝试
        Node t = tail;
        if (t == null) { //初始化
            if (compareAndSetHead(new Node())) 
              //需要注意的是head是一个哨兵的作用,并不代表某个要获取锁的线程节点
                tail = head;
        } else {
            //和addWaiter中一致,不过有了外侧的无限循环,不停的尝试,自旋锁
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

 通过调用addWaiter函数,AQS将当前线程加入到了等待队列,但是还没有阻塞当前线程的执行,接下来我们就来分析一下acquireQueued函数。

等待队列节点的操作

 由于进入阻塞状态的操作会降低执行效率,所以,AQS会尽力避免试图获取独占性变量的线程进入阻塞状态。所以,当线程加入等待队列之后,acquireQueued会执行一个for循环,每次都判断当前节点是否应该获得这个变量(在队首了)。如果不应该获取或在再次尝试获取失败,那么就调用shouldParkAfterFailedAcquire判断是否应该进入阻塞状态。如果当前节点之前的节点已经进入阻塞状态了,那么就可以判定当前节点不可能获取到锁,为了防止CPU不停的执行for循环,消耗CPU资源,调用parkAndCheckInterrupt函数来进入阻塞状态。

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) { //一直执行,直到获取锁,返回.
            final Node p = node.predecessor(); 
            //node的前驱是head,就说明,node是将要获取锁的下一个节点.
            if (p == head && tryAcquire(arg)) { //所以再次尝试获取独占性变量
                setHead(node); //如果成果,那么就将自己设置为head
                p.next = null; // help GC
                failed = false;
                return interrupted;
                //此时,还没有进入阻塞状态,所以直接返回false,表示不需要中断调用selfInterrupt函数
            }
            //判断是否要进入阻塞状态.如果`shouldParkAfterFailedAcquire`
            //返回true,表示需要进入阻塞
            //调用parkAndCheckInterrupt;否则表示还可以再次尝试获取锁,继续进行for循环
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                //调用parkAndCheckInterrupt进行阻塞,然后返回是否为中断状态
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL) //前一个节点在等待独占性变量释放的通知,所以,当前节点可以阻塞
        return true;
    if (ws > 0) { //前一个节点处于取消获取独占性变量的状态,所以,可以跳过去
        //返回false
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        //将上一个节点的状态设置为signal,返回false,
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this); //将AQS对象自己传入
    return Thread.interrupted();
}

阻塞和中断

 由上述分析,我们知道了AQS通过调用LockSupportpark方法来执行阻塞当前进程的操作。其实,这里的阻塞就是线程不再执行的含义,通过调用这个函数,线程进入阻塞状态,上述的lock操作也就阻塞了,等待中断或在独占性变量被释放。

public static void park(Object blocker) {
    Thread t = Thread.currentThread();
    setBlocker(t, blocker);//设置阻塞对象,用来记录线程被谁阻塞的,用于线程监控和分析工具来定位
    UNSAFE.park(false, 0L);//让当前线程不再被线程调度,就是当前线程不再执行.
    setBlocker(t, null);
}

 关于中断的相关知识,我们以后再说,就继续沿着AQS的主线,看一下释放独占性变量的相关操作吧。

ReentrantLock未获得阻塞,加入队列

unlock操作

 与lock操作类似,unlock操作调用了AQSrelase方法,参数和调用acquire时一样,都是1。

public final boolean release(int arg) {
    if (tryRelease(arg)) { 
    //释放独占性变量,起始就是将status的值减1,因为acquire时是加1
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);//唤醒head的后继节点
        return true;
    }
    return false;
}

 由上述代码可知,release就是先调用tryRelease来释放独占性变量。如果成功,那么就看一下是否有等待锁的阻塞线程,如果有,就调用unparkSuccessor来唤醒他们。

protected final boolean tryRelease(int releases) {
    //由于只有一个线程可以获得独占先变量,所以,所有操作不需要考虑多线程
    int c = getState() - releases; 
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) { //如果等于0,那么说明锁应该被释放了,否则表示当前线程有多次lock操作.
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

 我们可以看到tryRelease中的逻辑也体现了可重入锁的概念,只有等到state的值为0时,才代表锁真正被释放了。所以独占性变量state的值就代表锁的有无。当state=0时,表示锁未被占有,否在表示当前锁已经被占有。

private void unparkSuccessor(Node node) {
    .....
     //一般来说,需要唤醒的线程就是head的下一个节点,但是如果它获取锁的操作被取消,或在节点为null时
     //就直接继续往后遍历,找到第一个未取消的后继节点.
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

 调用了unpark方法后,进行lock操作被阻塞的线程就恢复到运行状态,就会再次执行acquireQueued中的无限for循环中的操作,再次尝试获取锁。

ReentrantLock释放锁并通知阻塞线程恢复执行

后记

 有关AQSReentrantLock的分析就差不多结束了。不得不说,我第一次看到AQS的实现时真是震惊,以前都认为SynchronizedReentrantLock的实现原理是一致的,都是依靠java虚拟机的功能实现的。没有想到还有AQS这样一个背后大Boss在提供帮助啊。学习了这个类的原理,我们对JUC的很多类的分析就简单了很多。此外,AQS涉及的CAS操作和无锁队列的算法也为我们学习其他无锁算法提供了基础。知识的海洋是无限的啊!

Share

LongAdder原理完全解析

 对LongAdder的最初了解是从Coolshell上的一篇文章中获得的,但是一直都没有深入的了解过其实现,只知道它相较于AtomicLong来说,更加适合写多读少的并发情景。今天,我们就研究一下LongAdder的原理,探究一下它如此高效的原因。

基本原理和思想

 Java有很多并发控制机制,比如说以AQS为基础的锁或者以CAS为原理的自旋锁。不了解AQS的朋友可以阅读我之前的AQS源码解析文章。一般来说,CAS适合轻量级的并发操作,也就是并发量并不多,而且等待时间不长的情况,否则就应该使用普通锁,进入阻塞状态,避免CPU空转。

 所以,如果你有一个Long类型的值会被多线程修改,那么使用CAS进行并发控制比较好,但是如果你是需要锁住一些资源,然后进行数据库操作,那么还是使用阻塞锁比较好。

 第一种情况下,我们一般都使用AtomicLongAtomicLong是通过无限循环不停的采取CAS的方法去设置内部的value,直到成功为止。那么当并发数比较多或出现更新热点时,就会导致CAS的失败机率变高,重试次数更多,越多的线程重试,CAS失败的机率越高,形成恶性循环,从而降低了效率。

 而LongAdder的原理就是降低对value更新的并发数,也就是将对单一value的变更压力分散到多个value值上,降低单个value的“热度”。

 我们知道LongAdder的大致原理之后,再来详细的了解一下它的具体实现,其中也有很多值得借鉴的并发编程的技巧。

LongAdder的成员变量

LongAdderStriped64的子类,其有三个比较重要的成员函数,在之后的函数分析中需要使用到,这里先说明一下。

// CPU的数量
static final int NCPU = Runtime.getRuntime().availableProcessors();
// Cell对象的数组,长度一般是2的指数
transient volatile Cell[] cells;
// 基础value值,当并发较低时,只累加该值
transient volatile long base;
// 创建或者扩容Cells数组时使用的自旋锁变量
transient volatile int cellsBusy;

cellsLongAdder的父类Striped64中的Cell数组类型的成员变量。每个Cell对象中都包含一个value值,并提供对这个value值的CAS操作。

static final class Cell {
    volatile long value;
    Cell(long x) { value = x; }
    final boolean cas(long cmp, long val) {
        return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
    }
}

Add操作

 我们首先来看一下LongAdderadd函数,其会多次尝试CAS操作将值进行累加,如果成功了就直接返回,失败则继续执行。代码比较复杂,而且涉及的情况比较多,我们就以梳理历次尝试CAS操作为主线,讲清楚这些CAS操作的前提条件和场景。

public void add(long x) {
    Cell[] as; long b, v; int m; Cell a;
    // 当cells数组为null时,会进行第一次cas操作尝试。
    if ((as = cells) != null || !casBase(b = base, b + x)) {
        boolean uncontended = true;
        if (as == null || (m = as.length - 1) < 0 ||
            (a = as[getProbe() & m]) == null || 
            !(uncontended = a.cas(v = a.value, v + x)))
            // 当cells数组不为null,并且通过getProbe() & m
            // 定位的Cell对象不为null时进行第二次CAS操作。
            // 如果执行不成功,则进入longAccumulate函数。
            longAccumulate(x, null, uncontended); 
    }
}

 当并发量较少时,cell数组尚未初始化,所以只调用casBase函数,对base变量进行CAS累加。

第一个CAS操作

 我们来看一下casBase函数相关的源码吧。我们可以认为变量base就是第一个value值,也是基础value变量。先调用casBase函数来cas一下base变量,如果成功了,就不需要在进行下面比较复杂的算法,

final boolean casBase(long cmp, long val) {
    return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
}

 当并发量逐渐提高时,casBase函数会失败。如果cells数组为null或为空,就直接调用longAccumulate方法。因为cells为null或在为空,说明cells未初始化,所以调用longAccumulate进行初始化。否则继续判断。
 如果cells中已经初始化,就继续进行后续判断。我们先来理解一下getProbe() & m的这个操作吧,可以把这个操作当作一次计算”hash”值,然后将cells中这个位置的Cell对象赋值给变量a。如果变量a不为null,那么就调用该对象的cas方法去设置其value值。如果a为null,或在cas赋值发生冲突,那么调用longAccumulate方法。

第二个CAS操作

LongAccumulate方法

longAccumulate函数比较复杂,带有我的注释的代码已经贴在了文章后边,这里我们就只讲一下其中比较关键的一些技巧和思想。

 首先,我们都知道只有当对base的cas操作失败之后,LongAdder才引入Cell数组.所以在longAccumulate中就是对Cell数组进行操作,分别涉及了数组的初始化,扩容和设置某个位置的Cell对象等操作。

 在这段代码中,关于cellBusy的cas操作构成了一个SpinLock,这就是经典的SpinLock的编程技巧,大家可以学习一下。

 我们先来看一下longAccumulate的主体代码,首先是一个无限for循环,然后根据cells数组的状态来判断是要进行cells数组的初始化,还是进行对象添加或者扩容。

final void longAccumulate(long x, LongBinaryOperator fn,
                             boolean wasUncontended) {
       int h;
       if ((h = getProbe()) == 0) { 
           //获取PROBE变量,探针变量,与当前运行的线程相关,不同线程不同
           ThreadLocalRandom.current(); 
       //初始化PROBE变量,和getProbe都使用Unsafe类提供的原子性操作。
           h = getProbe();
           wasUncontended = true;
       }
       boolean collide = false;
       for (;;) { //cas经典无限循环,不断尝试
           Cell[] as; Cell a; int n; long v;
           if ((as = cells) != null && (n = as.length) > 0) { 
           // cells不为null,并且数组size大于0,表示cells已经初始化了
           // 初始化Cell对象并设置到数组中或者进行数组扩容
           }
           else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
           //cells数组未初始化,获得cellsBusy lock,进行cells数组的初始化
           // cells数组初始化操作
           }
          //如果初始化数组失败了,那就再次尝试一下直接cas base变量,
          // 如果成功了就直接返回,这是最后一个进行CAS操作的地方。
           else if (casBase(v = base, ((fn == null) ? v + x :
                                       fn.applyAsLong(v, x))))
               break;
       }
   }

 进行Cell数组代码如下所示,它首先调用casCellsBusy函数获取了cellsBusy‘锁’,然后进行数组的初始化操作,最后将cellBusy‘锁’释放掉。

// 注意在进入这段代码之前已经casCellsBusy获得cellsBusy这个锁变量了。
boolean init = false;
try {
    if (cells == as) {
        Cell[] rs = new Cell[2];
        rs[h & 1] = new Cell(x); //设置x的值为cell对象的value值
        cells = rs;
        init = true;
    }
} finally {
    cellsBusy = 0;
}
if (init)
    break;

第三个CAS操作

 如果Cell数组已经初始化过了,那么就进行Cell数组的设置或者扩容。这部分代码有一系列的if else的判断,如果前一个条件不成立,才会进入下一条判断。

 首先,当Cell数组中对应位置的cell对象为null时,表明该位置的Cell对象需要进行初始化,所以使用casCellsBusy函数获取’锁’,然后初始化Cell对象,并且设置进cells数组,最后释放掉’锁’。

 当Cell数组中对应位置的cell对象不为null,则直接调用其cas操作进行累加。

 当上述操作都失败后,认为多个线程在对同一个位置的Cell对象进行操作,这个Cell对象是一个“热点”,所以Cell数组需要进行扩容,将热点分散。

if ((a = as[(n - 1) & h]) == null) { //通过与操作计算出来需要操作的Cell对象的坐标
    if (cellsBusy == 0) { //volatile 变量,用来实现spinLock,来在初始化和resize cells数组时使用。
    //当cellsBusy为0时,表示当前可以对cells数组进行操作。 
        Cell r = new Cell(x);//将x值直接赋值给Cell对象
        if (cellsBusy == 0 && casCellsBusy()) {//如果这个时候cellsBusy还是0
        //就cas将其设置为非0,如果成功了就是获得了spinLock的锁.可以对cells数组进行操作.
        //如果失败了,就会再次执行一次循环
            boolean created = false;
            try {
                Cell[] rs; int m, j;
                //判断cells是否已经初始化,并且要操作的位置上没有cell对象.
                if ((rs = cells) != null &&
                    (m = rs.length) > 0 &&
                    rs[j = (m - 1) & h] == null) {
                    rs[j] = r; //将之前创建的值为x的cell对象赋值到cells数组的响应位置.
                    created = true;
                }
            } finally {
                //经典的spinLock编程技巧,先获得锁,然后try finally将锁释放掉
                //将cellBusy设置为0就是释放锁.
                cellsBusy = 0;
            }
            if (created)
                break; //如果创建成功了,就是使用x创建了新的cell对象,也就是新创建了一个分担热点的value
            continue; 
        }
    }
    collide = false; //未发生碰撞
}
else if (!wasUncontended)//是否已经发生过一次cas操作失败
    wasUncontended = true; //设置成true,以便第二次进入下一个else if 判断
else if (a.cas(v = a.value, ((fn == null) ? v + x :
                            fn.applyAsLong(v, x))))
     //fn是操作类型,如果是空,就是相加,所以让a这个cell对象中的value值和x相加,然后在cas设置,如果成果
    //就直接返回
    break;
else if (n >= NCPU || cells != as)
  //如果cells数组的大小大于系统的可获得处理器数量或在as不再和cells相等.
    collide = false;
else if (!collide)
    collide = true;
else if (cellsBusy == 0 && casCellsBusy()) {
  //再次获得cellsBusy这个spinLock,对数组进行resize
    try {
        if (cells == as) {//要再次检测as是否等于cells以免其他线程已经对cells进行了操作.
            Cell[] rs = new Cell[n << 1]; //扩容一倍
            for (int i = 0; i < n; ++i)
                rs[i] = as[i];
            cells = rs;//赋予cells一个新的数组对象
        }
    } finally {
        cellsBusy = 0;
    }
    collide = false;
    continue;
}
h = advanceProbe(h);//由于使用当前探针变量无法操作成功,所以重新设置一个,再次尝试

第四个CAS操作

后记

 本篇文章写的不是很好,我写完之后又看了一遍coolshell上的关于LongAdder的文章,感觉自己没有人家写的那么简洁明了。我对代码细节的注释和投入太多了。其实很多代码大家都可以看懂,并不需要大量的代码片段加注释。以后要注意一下。之后会接着研究一下JUC包中的其他类,希望大家多多关注。

Share