摘要
限流算法在分布式领域是一个经常被提起的话题,当系统的处理能力有限时,如何阻止计划外的请求继续对系统施压,这是一个需要重视的问题。在这里用 “断尾求生” 形容限流背后的思想,当然还有很多成语也表达了类似的意思,如弃卒保车、壮士断腕等等。除了控制流量,限流还有一个应用目的是用于控制用户行为,避免垃圾请求。如在UGC 社区,用户的发帖、回复、点赞等行为都要严格受控,一般要严格限定某行为在规定时间内允许的次数,超过次数那就是非法行为。对非法行为,业务必须规定适当的惩处策略。
一、限流算法原理
代码的实现:https://gitee.com/xjl/RedisPrinciple/tree/master/springboot-redis/src/main/java/com/zhuangxiaoyan/springbootredis/cell
1.1 计数器算法
计数器算法是使用计数器在周期内累加访问次数,当达到设定的限流值时,触发限流策略。下一个周期开始时,进行清零,重新计数。此算法在单机还是分布式环境下实现都非常简单,使用redis的incr原子自增性和线程安全即可轻松实现。
这个算法通常用于QPS限流和统计总访问量,对于秒级以上的时间周期来说,会存在一个非常严重的问题,那就是临界问题,如下图:

1.2 滑动窗口的算法
滑动窗口算法是将时间周期分为N个小周期,分别记录每个小周期内访问次数,并且根据时间滑动删除过期的小周期。如下图,假设时间周期为1min,将1min再分为2个小周期,统计每个小周期的访问数量,则可以看到,第一个时间周期内,访问数量为75,第二个时间周期内,访问数量为100,超过100的访问则被限流掉了

由此可见,当滑动窗口的格子划分的越多,那么滑动窗口的滚动就越平滑,限流的统计就会越精确。此算法可以很好的解决固定窗口算法的临界问题。
1.3 漏桶算法
漏桶算法是访问请求到达时直接放入漏桶,如当前容量已达到上限(限流值),则进行丢弃(触发限流策略)。漏桶以固定的速率进行释放访问请求(即请求通过),直到漏桶为空。
1.4 令牌桶算法
令牌桶算法是程序以r(r=时间周期/限流值)的速度向令牌桶中增加令牌,直到令牌桶满,请求到达时向令牌桶请求令牌,如获取到令牌则通过请求,否则触发限流策略

二、限流算法实现与实战
2.1 Redis的zset滑动时间窗口实战
这个限流需求中存在一个滑动时间窗口,想想zset 数据结构的 score 值,是不是可以通过 score 来圈出这个时间窗口来。而且我们只需要保留这个时间窗口,窗口之外的数据都可以砍掉。那这个 zset 的 value 填什么比较合适呢?它只需要保证唯一性即可,用 uuid 会比较浪费空间,那就改用毫秒时间戳吧。如图所示,用一个 zset 结构记录用户的行为历史,每一个行为都会作为 zset 中的一个key 保存下来。同一个用户同一种行为用一个 zset 记录。为节省内存,我们只需要保留时间窗口内的行为记录,同时如果用户是冷用户,滑动时间窗口内的行为是空记录,那么这个 zset 就可以从内存中移除,不再占用空间。zset 集合中只有 score 值非常重要, value 值没有特别的意义,只需要保证它是唯一的就可以了。因为这几个连续的 Redis 操作都是针对同一个 key 的,使用 pipeline 可以显著提升Redis 存取效率。但这种方案也有缺点,因为它要记录时间窗口内所有的行为记录,如果这个量很大,比如限定 60s 内操作不得超过 100w 次这样的参数,它是不适合做这样的限流的,因为会消耗大量的存储空间。

public class SimpleRateLimiter { private Jedis jedis; public SimpleRateLimiter(Jedis jedis) { this.jedis = jedis; } public boolean isActionAllowed(String userId, String actionKey, int period, int maxCount) { String key = String.format("hist:%s:%s", userId, actionKey); long nowTs = System.currentTimeMillis(); Pipeline pipe = jedis.pipelined(); pipe.multi(); pipe.zadd(key, nowTs, "" + nowTs); pipe.zremrangeByScore(key, 0, nowTs - period * 1000); Response<Long> count = pipe.zcard(key); pipe.expire(key, period + 1); pipe.exec(); pipe.close(); return count.get() <= maxCount; } public static void main(String[] args) { Jedis jedis = new Jedis(); SimpleRateLimiter limiter = new SimpleRateLimiter(jedis); for(int i=0;i<20;i++) { System.out.println(limiter.isActionAllowed("laoqian", "reply", 60, 5)); } } }
讯享网
整体思路就是:每一个行为到来时,都维护一次时间窗口。将时间窗口外的记录全部清理掉,只保留窗口内的记录。zset 集合中只有 score 值非常重要,value 值没有特别的意义,只需要保证它是唯一的就可以了。因为这几个连续的 Redis 操作都是针对同一个 key 的,使用 pipeline 可以显著提升 Redis 存取效率。但这种方案也有缺点,因为它要记录时间窗口内所有的行为记录,如果这个量很大,比如限定 60s 内操作不得超过 100w 次这样的参数,它是不适合做这样的限流的,因为会消耗大量的存储空间。
2.2 Redis令牌桶限流实现与实战
讯享网# coding: utf8 import time class Funnel(object): def __init__(self, capacity, leaking_rate): self.capacity = capacity # 漏斗容量 self.leaking_rate = leaking_rate # 漏嘴流水速率 self.left_quota = capacity # 漏斗剩余空间 self.leaking_ts = time.time() # 上一次漏水时间 def make_space(self): now_ts = time.time() delta_ts = now_ts - self.leaking_ts # 距离上一次漏水过去了多久 delta_quota = delta_ts * self.leaking_rate # 又可以腾出不少空间了 if delta_quota < 1: # 腾的空间太少,那就等下次吧 return self.left_quota += delta_quota # 增加剩余空间 self.leaking_ts = now_ts # 记录漏水时间 if self.left_quota > self.capacity: # 剩余空间不得高于容量 self.left_quota = self.capacity def watering(self, quota): self.make_space() if self.left_quota >= quota: # 判断剩余空间是否足够 self.left_quota -= quota return True return False funnels = {} # 所有的漏斗 # capacity 漏斗容量 # leaking_rate 漏嘴流水速率 quota/s def is_action_allowed(user_id, action_key, capacity, leaking_rate): key = '%s:%s' % (user_id, action_key) funnel = funnels.get(key) if not funnel: funnel = Funnel(capacity, leaking_rate) funnels[key] = funnel return funnel.watering(1) for i in range(20): print(is_action_allowed('laoqian', 'reply', 15, 0.5))
class FunnelRateLimiter { static class Funnel { int capacity; float leakingRate; int leftQuota; long leakingTs; public Funnel(int capacity, float leakingRate) { this.capacity = capacity; this.leakingRate = leakingRate; this.leftQuota = capacity; this.leakingTs = System.currentTimeMillis(); } void makeSpace() { long nowTs = System.currentTimeMillis(); long deltaTs = nowTs - leakingTs; int deltaQuota = (int) (deltaTs * leakingRate); if (deltaQuota < 0) { // 间隔时间太长,整数数字过大溢出 this.leftQuota = capacity; this.leakingTs = nowTs; return; } if (deltaQuota < 1) { // 腾出空间太小,最小单位是1 return; } this.leftQuota += deltaQuota; this.leakingTs = nowTs; if (this.leftQuota > this.capacity) { this.leftQuota = this.capacity; } } boolean watering(int quota) { makeSpace(); if (this.leftQuota >= quota) { this.leftQuota -= quota; return true; } return false; } } private Map<String, Funnel> funnels = new HashMap<>(); public boolean isActionAllowed(String userId, String actionKey, int capacity, float leakingRate) { String key = String.format("%s:%s", userId, actionKey); Funnel funnel = funnels.get(key); if (funnel == null) { funnel = new Funnel(capacity, leakingRate); funnels.put(key, funnel); } return funnel.watering(1); // 需要1个quota } }
Funnel 对象的 make_space 方法是漏斗算法的核心,其在每次灌水前都会被调用以触发漏水,给漏斗腾出空间来。能腾出多少空间取决于过去了多久以及流水的速率。Funnel 对象占据的空间大小不再和行为的频率成正比,它的空间占用是一个常量。

问题来了,分布式的漏斗算法该如何实现?能不能使用 Redis 的基础数据结构来搞定?我们观察 Funnel 对象的几个字段,我们发现可以将 Funnel 对象的内容按字段存储到一个 hash 结构中,灌水的时候将 hash 结构的字段取出来进行逻辑运算后,再将新值回填到 hash 结构中就完成了一次行为频度的检测。但是有个问题,我们无法保证整个过程的原子性。从 hash 结构中取值,然后在内存里运算,再回填到 hash 结构,这三个过程无法原子化,意味着需要进行适当的加锁控制。而一旦加锁,就意味着会有加锁失败,加锁失败就需要选择重试或者放弃。如果重试的话,就会导致性能下降。如果放弃的话,就会影响用户体验。同时,代码的复杂度也跟着升高很多。这真是个艰难的选择。
讯享网cl.throttle laoqian:reply 15 30 60 1) (integer) 0 # 0 表示允许,1表示拒绝 2) (integer) 15 # 漏斗容量capacity 3) (integer) 14 # 漏斗剩余空间left_quota 4) (integer) -1 # 如果拒绝了,需要多长时间后再试(漏斗有空间了,单位秒) 5) (integer) 2 # 多长时间后,漏斗完全空出来(left_quota==capacity,单位秒)
在执行限流指令时,如果被拒绝了,就需要丢弃或重试。cl.throttle 指令考虑的非常周到,连重试时间都帮你算好了,直接取返回结果数组的第四个值进行 sleep 即可,如果不想阻塞线程,也可以异步定时任务来重试。
package com.zhuangxiaoyan.springbootredis.cell; import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.PropertyAccessor; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.jsontype.impl.LaissezFaireSubTypeValidator; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.core.script.DefaultRedisScript; import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer; import org.springframework.data.redis.serializer.StringRedisSerializer; import java.util.Arrays; import java.util.List; / * @Classname RedisCell * @Description TODO * @Date 2022/4/13 22:37 * @Created by xjl */ public class RedisCell { / * redis 客户端 */ @Autowired private StringRedisTemplate redisTemplate; / * 请求是否被允许 * @param key 限流的key * @param maxBurst 最大请求 * @param tokens 令牌数量 每过seconds时间就往桶里放入tokens的令牌数量 * @param seconds 时间 每过seconds时间就往桶里放入tokens的令牌数量 * @param apply 本次获取几个令牌 * @return */ public boolean isActionAllowed(String key, int maxBurst, int tokens, int seconds, int apply){ DefaultRedisScript<List> script = new DefaultRedisScript<>("return redis.call('cl.throttle',KEYS[1],ARGV[1],ARGV[2],ARGV[3],ARGV[4])", List.class); List<Long> rst = redisTemplate.execute(script, Arrays.asList(key), maxBurst, tokens, seconds, apply); //响应结果 / * 1) (integer) 0 # 当前请求是否被允许,0表示允许,1表示不允许 * 2) (integer) 11 # 令牌桶的最大容量,令牌桶中令牌数的最大值 * 3) (integer) 10 # 令牌桶中当前的令牌数 * 4) (integer) -1 # 如果被拒绝,需要多长时间后在重试,如果当前被允许则为-1 * 5) (integer) 12 # 多长时间后令牌桶中的令牌会满 */ return rst.get(0) == 0; } }
三、 分布式限流解决方案
在高并发系统中常见的限流方式有很多种,但是总的来说限流的分类如下所示:
- 合法性验证限流:比如验证码、IP 黑名单等,这些手段可以有效的防止恶意攻击和爬虫采集;
- 容器限流:比如 Tomcat、Nginx 等限流手段,其中 Tomcat 可以设置最大线程数(maxThreads),当并发超过最大线程数会排队等待执行;而 Nginx 提供了两种限流手段:一是控制速率,二是控制并发连接数;
- 服务端限流:比如我们在服务器端通过限流算法实现限流,此项也是我们本文介绍的重点。
3.1 合法性验证限流
合法性验证限流为最常规的业务代码,就是普通的验证码和 IP 黑名单系统。可以利用的redis中布隆过滤器来实现的IP黑白名单。

3.2 容器限流设计
3.2.1 Tomcat 限流设置
讯享网<Connector port="8080" protocol="HTTP/1.1" connectionTimeout="20000" maxThreads="150" redirectPort="8443" />
其中 maxThreads 就是 Tomcat 的最大线程数,当请求的并发大于此值(maxThreads)时,请求就会排队执行,这样就完成了限流的目的。 maxThreads 的值可以适当的调大一些,此值默认为 150(Tomcat 版本 8.5.42),但这个值也不是越大越好,要看具体的硬件配置,需要注意的是每开启一个线程需要耗用 1MB 的 JVM 内存空间用于作为线程栈之用,并且线程越多 GC 的负担也越重。需要注意一下,操作系统对于进程中的线程数有一定的限制,Windows 每个进程中的线程数不允许超过 2000,Linux 每个进程中的线程数不允许超过 1000。
3.2.2 Nginx 限流
Nginx 提供了两种限流手段:一是控制速率,二是控制并发连接数。
我们需要使用 limit_req_zone 用来限制单位时间内的请求数,即速率限制。
limit_req_zone $binary_remote_addr zone=mylimit:10m rate=2r/s; server { location / { limit_req zone=mylimit; } } 以上配置表示,限制每个 IP 访问的速度为 2r/s,因为 Nginx 的限流统计是基于毫秒的,我们设置的速度是 2r/s,转换一下就是 500ms 内单个 IP 只允许通过 1 个请求,从 501ms 开始才允许通过第 2 个请求。
讯享网limit_req_zone $binary_remote_addr zone=mylimit:10m rate=2r/s; server { location / { limit_req zone=mylimit burst=4; } } burst=4 表示每个 IP 最多允许4个突发请求,
利用 limit_conn_zone 和 limit_conn 两个指令即可控制并发数
limit_conn_zone $binary_remote_addr zone=perip:10m; limit_conn_zone $server_name zone=perserver:10m; server { ... limit_conn perip 10; limit_conn perserver 100; } 其中 limit_conn perip 10 表示限制单个 IP 同时最多能持有 10 个连接;limit_conn perserver 100 表示 server 同时能处理并发连接的总数为 100 个。
3.3 服务端限流
3.3.1 连接池、线程限流方案
可以使用池化技术来限制总资源数:连接池、线程池。比如分配给每个应用的数据库连接是 100,那么本应用最多可以使用 100 个资源,超出了可以 等待 或者 抛异常。
讯享网//获取当前机器的核数 public static final int cpuNum = Runtime.getRuntime().availableProcessors(); @Override public Executor getAsyncExecutor() { ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); taskExecutor.setCorePoolSize(cpuNum);//核心线程大小 taskExecutor.setMaxPoolSize(cpuNum * 2);//最大线程大小 taskExecutor.setQueueCapacity(500);//队列最大容量 //当提交的任务个数大于QueueCapacity,就需要设置该参数,但spring提供的都不太满足业务场景,可以自定义一个,也可以注意不要超过QueueCapacity即可 taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); taskExecutor.setWaitForTasksToCompleteOnShutdown(true); taskExecutor.setAwaitTerminationSeconds(60); taskExecutor.setThreadNamePrefix("BCarLogo-Thread-"); taskExecutor.initialize(); return taskExecutor; }
corePoolSize:核心线程数;maximunPoolSize:最大线程数
每当有新的任务到线程池时,
- 第一步:先判断线程池中当前线程数量是否达到了corePoolSize,若未达到,则新建线程运行此任务,且任务结束后将该线程保留在线程池中,不做销毁处理,若当前线程数量已达到corePoolSize,则进入下一步;
- 第二步:判断工作队列(workQueue)是否已满,未满则将新的任务提交到工作队列中,满了则进入下一步;
- 第三步:判断线程池中的线程数量是否达到了maxumunPoolSize,如果未达到,则新建一个工作线程来执行这个任务,如果达到了则使用饱和策略来处理这个任务。注意: 在线程池中的线程数量超过corePoolSize时,每当有线程的空闲时间超过了keepAliveTime,这个线程就会被终止。直到线程池中线程的数量不大于corePoolSize为止。
当工作队列满且线程个数达到maximunPoolSize后所采取的策略
AbortPolicy:默认策略;新任务提交时直接抛出未检查的异常RejectedExecutionException,该异常可由调用者捕获。CallerRunsPolicy:既不抛弃任务也不抛出异常,使用调用者所在线程运行新的任务。DiscardPolicy:丢弃新的任务,且不抛出异常。DiscardOldestPolicy:调用poll方法丢弃工作队列队头的任务,然后尝试提交新任务自定义策略:根据用户需要定制。
3.3.2 限流某个接口的总并发/请求数
使用 Java 中的 AtomicLong,示意代码: try{ if(atomic.incrementAndGet() > 限流数) { //拒绝请求 } else { //处理请求 } } finally { atomic.decrementAndGet(); }
讯享网LoadingCache counter = CacheBuilder.newBuilder() .expireAfterWrite(2, TimeUnit.SECONDS) .build(newCacheLoader() { @Override public AtomicLong load(Long seconds) throws Exception { return newAtomicLong(0); } }); longlimit =1000; while(true) { // 得到当前秒 long currentSeconds = System.currentTimeMillis() /1000; if(counter.get(currentSeconds).incrementAndGet() > limit) { System.out.println("限流了: " + currentSeconds); continue; } // 业务处理 }
3.3.3 利用的MQ来实现限流

博文参考
限流的一些解决方案_夜风_BLOG的博客-CSDN博客

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容,请联系我们,一经查实,本站将立刻删除。
如需转载请保留出处:https://51itzy.com/kjqy/11819.html