# 常见的限流算法分析
限流在我们日常生活中经常见到,如火车站门口的栏杆、一些景点的门票只出售一定的数量 等等。在我们的开发中也用到了这种思想。
# 为什么要限流
🏫在保证可用的情况下尽可能多增加进入的人数,其余的人在排队等待,或者返回友好提示,保证里面的进行系统的用户可以正常使用, 防止系统雪崩。
# 限流算法
🌴🌴限流算法很多,常见的有三类,分别是 计数器算法 、漏桶算法、令牌桶算法 。
(1)计数器:
在一段时间间隔内,处理请求的最大数量固定,超过部分不做处理。
(2)漏桶:
漏桶大小固定,处理速度固定,但请求进入速度不固定(在突发情况请求过多时,会丢弃过多的请求)。
(3)令牌桶:
令牌桶的大小固定,令牌的产生速度固定,但是消耗令牌(即请求)速度不固定(可以应对一些某些时间请求过多的情况);每个请求都会从令牌桶中取出令牌,如果没有令牌则丢弃该次请求。
# 计数器限流
🍺在一段时间间隔内,处理请求的最大数量固定,超过部分不做处理。
举个🌰, 比如我们规定对于 A 接口,我们 1 分钟的访问次数不能超过 100 次。
那么我们可以这么做:
🎈在一开 始的时候,我们可以设置一个计数器 counter,每当一个请求过来的时候,counter 就加 1,如果 counter 的值大于 100 并且该请求与第一个请求的间隔时间还在 1 分钟之内,那么说明请求数过多,拒绝访问;
🍬如果该请求与第一个请求的间隔时间大于 1 分钟,且 counter 的值还在限流范围内,那么就重置 counter, 就是这么简单粗暴。
代码实现: 😎
import java.util.concurrent.CountDownLatch; | |
import java.util.concurrent.ExecutorService; | |
import java.util.concurrent.Executors; | |
import java.util.concurrent.atomic.AtomicInteger; | |
import java.util.concurrent.atomic.AtomicLong; | |
// 计数器 限流 | |
public class CounterLimiter { | |
// 起始时间 | |
private static long startTime = System.currentTimeMillis(); | |
// 时间间隔 1000ms | |
private static long interval = 1000; | |
// 每个时间间隔内,限制数量 | |
private static long limit = 3; | |
// 累加器 | |
private static AtomicLong accumulator = new AtomicLong(); | |
/** | |
* true 代表放行,请求可已通过 | |
* false 代表限制,不让请求通过 | |
*/ | |
public static boolean tryAcquire() { | |
long nowTime = System.currentTimeMillis(); | |
// 判断是否在上一个时间间隔内 | |
if (nowTime < startTime + interval) { | |
// 如果还在上个时间间隔内 | |
long count = accumulator.incrementAndGet(); | |
if (count <= limit) { | |
return true; | |
} else { | |
return false; | |
} | |
} else { | |
// 如果不在上一个时间间隔内 | |
synchronized (CounterLimiter.class) { | |
// 防止重复初始化 | |
if (nowTime > startTime + interval) { | |
startTime = nowTime; | |
accumulator.set(0); | |
} | |
} | |
// 再次进行判断 | |
long count = accumulator.incrementAndGet(); | |
if (count <= limit) { | |
return true; | |
} else { | |
return false; | |
} | |
} | |
} | |
// 测试 | |
public static void main(String[] args) { | |
// 线程池,用于多线程模拟测试 | |
ExecutorService pool = Executors.newFixedThreadPool(10); | |
// 被限制的次数 | |
AtomicInteger limited = new AtomicInteger(0); | |
// 线程数 | |
final int threads = 2; | |
// 每条线程的执行轮数 | |
final int turns = 20; | |
// 同步器 | |
CountDownLatch countDownLatch = new CountDownLatch(threads); | |
long start = System.currentTimeMillis(); | |
for (int i = 0; i < threads; i++) { | |
pool.submit(() -> | |
{ | |
try { | |
for (int j = 0; j < turns; j++) { | |
boolean flag = tryAcquire(); | |
if (!flag) { | |
// 被限制的次数累积 | |
limited.getAndIncrement(); | |
} | |
Thread.sleep(200); | |
} | |
} catch (Exception e) { | |
e.printStackTrace(); | |
} | |
// 等待所有线程结束 | |
countDownLatch.countDown(); | |
}); | |
} | |
try { | |
countDownLatch.await(); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
float time = (System.currentTimeMillis() - start) / 1000F; | |
// 输出统计结果 | |
System.out.println("限制的次数为:" + limited.get() + | |
",通过的次数为:" + (threads * turns - limited.get())); | |
System.out.println("限制的比例为:" + (float) limited.get() / (float) (threads * turns)); | |
System.out.println("运行的时长为:" + time + "s"); | |
} | |
} |
计数器限流的不足: 🌴
这个算法虽然简单,但是存在临界问题,我们看下图:
👉🏻从上图中我们可以看到,假设有一个恶意用户,他在 0:59 时,瞬间发送了 100 个请求,并且 1:00 又瞬间发送了 100 个请求,那么其实这个用户在 1 秒里面,瞬间发送了 200 个请求。
🍦我们刚才规定的是 1 分钟最多 100 个请求(规划的吞吐量),也就是每秒钟最多 1.7 个请求,用户通过在时间窗口的重置节点处突发请求, 可以瞬间超过我们的速率限制。
用户有可能通过算法的这个漏洞,瞬间压垮我们的应用。🙇🏻♀️
# 漏桶限流
✨漏桶算法限流的基本原理为:水(对应请求)从进水口进入到漏桶里,漏桶以一定的速度出水(请求放行),当水流入速度过大,桶内的总水量大于桶容量会直接溢出,请求被拒绝。
大致的漏桶限流规则如下:📍
(1)进水口(对应客户端请求)以任意速率流入进入漏桶。
(2)漏桶的容量是固定的,出水(放行)速率也是固定的。
(3)漏桶容量是不变的,如果处理速度太慢,桶内水量会超出了桶的容量,则后面流入的水滴会溢出,表示请求拒绝。
⭐漏桶算法其实很简单,可以粗略的认为就是注水漏水过程,往桶中以任意速率流入水,以一定速率流出水,当水超过桶容量(capacity)则丢弃,因为桶容量是不变的,保证了整体的速率。
以一定速率流出水,
削峰📍: 有大量流量进入时,会发生溢出,从而限流保护服务可用
缓冲📍: 不至于直接请求到服务器,缓冲压力
代码实现: 😎
import java.util.concurrent.CountDownLatch; | |
import java.util.concurrent.ExecutorService; | |
import java.util.concurrent.Executors; | |
import java.util.concurrent.atomic.AtomicInteger; | |
import java.util.concurrent.atomic.AtomicLong; | |
// 漏斗限流 | |
public class LeakBucketLimiter { | |
// 桶的大小 | |
private static long capacity = 10; | |
// 流出速率,每秒两个 | |
private static long rate = 2; | |
// 开始时间 | |
private static long startTime = System.currentTimeMillis(); | |
// 桶中剩余的水 | |
private static AtomicLong water = new AtomicLong(); | |
/** | |
* true 代表放行,请求可已通过 | |
* false 代表限制,不让请求通过 | |
*/ | |
public synchronized static boolean tryAcquire() { | |
// 如果桶的余量问 0,直接放行 | |
if (water.get() == 0) { | |
startTime = System.currentTimeMillis(); | |
water.set(1); | |
return true; | |
} | |
// 计算从当前时间到开始时间流出的水,和现在桶中剩余的水 | |
// 桶中剩余的水 | |
water.set(water.get() - (System.currentTimeMillis() - startTime) / 1000 * rate); | |
// 防止出现 & lt;0 的情况 | |
water.set(Math.max(0, water.get())); | |
// 设置新的开始时间 | |
startTime += (System.currentTimeMillis() - startTime) / 1000 * 1000; | |
// 如果当前水小于容量,表示可以放行 | |
if (water.get() < capacity) { | |
water.incrementAndGet(); | |
return true; | |
} else { | |
return false; | |
} | |
} | |
// 测试 | |
public static void main(String[] args) { | |
// 线程池,用于多线程模拟测试 | |
ExecutorService pool = Executors.newFixedThreadPool(10); | |
// 被限制的次数 | |
AtomicInteger limited = new AtomicInteger(0); | |
// 线程数 | |
final int threads = 2; | |
// 每条线程的执行轮数 | |
final int turns = 20; | |
// 同步器 | |
CountDownLatch countDownLatch = new CountDownLatch(threads); | |
long start = System.currentTimeMillis(); | |
for (int i = 0; i < threads; i++) { | |
pool.submit(() -> | |
{ | |
try { | |
for (int j = 0; j < turns; j++) { | |
boolean flag = tryAcquire(); | |
if (!flag) { | |
// 被限制的次数累积 | |
limited.getAndIncrement(); | |
} | |
Thread.sleep(200); | |
} | |
} catch (Exception e) { | |
e.printStackTrace(); | |
} | |
// 等待所有线程结束 | |
countDownLatch.countDown(); | |
}); | |
} | |
try { | |
countDownLatch.await(); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
float time = (System.currentTimeMillis() - start) / 1000F; | |
// 输出统计结果 | |
System.out.println("限制的次数为:" + limited.get() + | |
",通过的次数为:" + (threads * turns - limited.get())); | |
System.out.println("限制的比例为:" + (float) limited.get() / (float) (threads * turns)); | |
System.out.println("运行的时长为:" + time + "s"); | |
} | |
} |
漏桶的不足: 🤷♀️
漏桶的出水速度固定,也就是请求放行速度是固定的。
漏桶出口的速度固定,不能灵活的应对后端能力提升。比如,通过动态扩容,后端流量从 1000QPS 提升到 1WQPS,漏桶没有办法。
# 令牌桶限流
🍬令牌桶算法中新请求到来时会从桶里拿走一个令牌,如果桶内没有令牌可拿,就拒绝服务。 当然,令牌的数量也是有上限的。令牌的数量与时间和发放速率强相关,时间流逝的时间越长,会不断往桶里加入越多的令牌,如果令牌发放的速度比申请速度快,令牌桶会放满令牌,直到令牌占满整个令牌桶。
令牌桶限流大致的规则如下:🙇🏻
(1)进水口按照某个速度,向桶中放入令牌。
(2)令牌的容量是固定的,但是放行的速度不是固定的,只要桶中还有剩余令牌,一旦请求过来就能申请成功,然后放行。
(3)如果令牌的发放速度,慢于请求到来速度,桶内就无牌可领,请求就会被拒绝。
总之,令牌的发送速率可以设置,从而可以对突发的出口流量进行有效的应对。
代码实现: 😎
import java.util.concurrent.CountDownLatch; | |
import java.util.concurrent.ExecutorService; | |
import java.util.concurrent.Executors; | |
import java.util.concurrent.atomic.AtomicInteger; | |
import java.util.concurrent.atomic.AtomicLong; | |
// 令牌桶 | |
public class TokenBucketLimiter { | |
// 桶的容量 | |
private static long capacity = 10; | |
// 放入令牌的速率,每秒 2 个 | |
private static long rate = 2; | |
// 上次放置令牌的时间 | |
private static long lastTime = System.currentTimeMillis(); | |
// 桶中令牌的余量 | |
private static AtomicLong tokenNum = new AtomicLong(); | |
/** | |
* true 代表放行,请求可已通过 | |
* false 代表限制,不让请求通过 | |
*/ | |
public synchronized static boolean tryAcquire() { | |
// 更新桶中剩余令牌的数量 | |
long now = System.currentTimeMillis(); | |
tokenNum.addAndGet((now - lastTime) / 1000 * rate); | |
tokenNum.set(Math.min(capacity, tokenNum.get())); | |
// 更新时间 | |
lastTime += (now - lastTime) / 1000 * 1000; | |
// 桶中还有令牌就放行 | |
if (tokenNum.get() > 0) { | |
tokenNum.decrementAndGet(); | |
return true; | |
} else { | |
return false; | |
} | |
} | |
// 测试 | |
public static void main(String[] args) { | |
// 线程池,用于多线程模拟测试 | |
ExecutorService pool = Executors.newFixedThreadPool(10); | |
// 被限制的次数 | |
AtomicInteger limited = new AtomicInteger(0); | |
// 线程数 | |
final int threads = 2; | |
// 每条线程的执行轮数 | |
final int turns = 20; | |
// 同步器 | |
CountDownLatch countDownLatch = new CountDownLatch(threads); | |
long start = System.currentTimeMillis(); | |
for (int i = 0; i < threads; i++) { | |
pool.submit(() -> | |
{ | |
try { | |
for (int j = 0; j < turns; j++) { | |
boolean flag = tryAcquire(); | |
if (!flag) { | |
// 被限制的次数累积 | |
limited.getAndIncrement(); | |
} | |
Thread.sleep(200); | |
} | |
} catch (Exception e) { | |
e.printStackTrace(); | |
} | |
// 等待所有线程结束 | |
countDownLatch.countDown(); | |
}); | |
} | |
try { | |
countDownLatch.await(); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
float time = (System.currentTimeMillis() - start) / 1000F; | |
// 输出统计结果 | |
System.out.println("限制的次数为:" + limited.get() + | |
",通过的次数为:" + (threads * turns - limited.get())); | |
System.out.println("限制的比例为:" + (float) limited.get() / (float) (threads * turns)); | |
System.out.println("运行的时长为:" + time + "s"); | |
} | |
} |
令牌桶的好处: 🚀🚀🚀
令牌桶的好处之一就是可以方便地应对 突发出口流量(后端能力的提升)。
比如,可以改变令牌的发放速度,算法能按照新的发送速率调大令牌的发放数量,使得出口突发流量能被处理。