我们为什么需要限流?
限流通常来说有以下几个目的:
- 保护系统,避免 DDoS 攻击
- 减少开销💰
- 避免服务过载
限流可以认为是服务降级的一种。限流就是限制系统的输入和输出流量以达到保护系统的目的。
一般来说系统的吞吐量是可以被测算的,为了保证系统的稳定运行,一旦达到的需要限制的阈值,就需要限制流量并采取一些措施以完成限制流量的目的。比如:延迟处理,拒绝处理,或者部分拒绝处理等等。
限流在哪里做
通常,限流器可以在以下几个部分来做:
- 客户端
- 服务端
- 中间件
限流算法
限流通常有以下几个算法:
- 令牌桶算法
- 漏斗算法
- 固定窗口计数器
- 滑动窗口计数器
接下来,让我们来一一揭开它们的面纱。
令牌桶算法

主要有两个关键的参数:
- Bucket 的容量大小
- Refiller 的恢复速率
当请求 req 到来,先看 Bucket 中是否还有 Token,如果有 Token,则正常处理请求 req;
如果 Bucket 中空了,那么丢弃该请求 req;
Refiller 根据设定的速率定期将 Bucket 填满 Token;
令牌桶算法实现
from time import time, sleep
class TokenBucket:
"""An implementation of a token bucket."""
def __init__(
self, tokens: int, time_unit: int, forward_callback, drop_callback
) -> None:
self.tokens = tokens
self.time_unit = time_unit
self.fill_rate = tokens / time_unit
self.last_check = time()
self.bucket = tokens
self.forward_callback = forward_callback
self.drop_callback = drop_callback
def handle(self, packet):
"""Handle a packet. Refill tokens according to fill rate"""
current_time = time()
time_elapsed = current_time - self.last_check
self.last_check = current_time
self.bucket = self.bucket + time_elapsed * self.fill_rate
if self.bucket > self.tokens:
self.bucket = self.tokens
if self.bucket > 0:
self.bucket -= 1
self.forward_callback(packet)
else:
self.drop_callback(packet)
def forward(packet):
print("Forwarding packet:", packet)
def drop(packet):
print("Dropping packet:", packet)
if __name__ == "__main__":
bucket = TokenBucket(1, 1, forward, drop)
packet = 1
while True:
sleep(0.2)
bucket.handle(packet)
packet += 1漏斗算法
顾名思义,漏斗算法就像一个漏斗,以恒定的流出速率来消费请求。
实现上,漏斗算法通常使用「队列」来实现。
固定窗口计数器算法
固定窗口计数器就是设定一个固定的时间窗口,然后统计在这个固定时间窗口的请求数,看是否超过阈值。
打个比方,就像一列火车,火车是由一节一节的车厢组成,每个乘客只能选择一个车厢上车;我们统计每一个车厢上的乘客,超出阈值的乘客就丢弃,在阈值范围内的乘客正常处理。
上面的例子中,车厢就是固定窗口,乘客就是请求。
假设我们设定时间窗口固定为 1 min,如下图所示:

- 第一分钟请求没有超过阈值,全部正常处理
- 第二分钟过了一半的时候就超出阈值,对于超出的请求,我们丢弃处理
- 第三分钟刚好达到阈值,正常处理
固定窗口计数器算法实现
from time import time, sleep
class FixedWindow:
def __init__(self, capacity, forward_callback, drop_callback) -> None:
self.capacity = capacity
self.allowance = capacity # initiate this to allow the first request
self.cur_time = int(time()) # seperate each second
self.forward_callback = forward_callback
self.drop_callback = drop_callback
def handle(self, packet):
if int(time()) != self.cur_time:
self.cur_time = int(time())
self.allowance = self.capacity
if self.allowance == 0:
return self.drop_callback(packet)
self.allowance -= 1
return self.forward_callback(packet)
def forward(packet):
print(f"Forward Packet {packet}.")
def drop(packet):
print(f"Drop Packet {packet}.")
throttle = FixedWindow(5, forward, drop)
packet = 0
while True:
sleep(0.1)
throttle.handle(packet)
packet += 1滑动窗口计数器算法
从名字上就能看出与固定窗口算法不同,滑动窗口使用的是变动的窗口然后根据权重计算出一个近似的结果。
固定窗口算法存在这样一个问题:当新旧两个时间窗口交接时如果出现突发流量,会导致限流失效,我们能够从监控系统中看到两个窗帘边缘部分存在明显的流量尖刺。
而滑动窗口为了解决固定窗口流量尖刺的问题,考虑将上一个窗口的计数器纳入限流的计算。
上一个窗口所占权重就是它在当前滑动时间窗口所占的面积。
以下图为例,时间窗口为 1 min,rate limit 限制为 50,第一个时间窗口计数器为 50,在 00:01:20 来了一个新的请求,第二个计数器的数值为 20:
- 在这个例子中,第一个时间窗口占比为
- 当前的 rate limit 值为:
因为 53.5 > 50,所以 00:01:20 到达的请求会被丢弃。

而如果 00:01:40 秒的时候再次到达一个请求,此时计算的 rate limit 就是
因为 36.5 小于 50,因此 00:01:40 的请求可以正常被处理。

滑动窗口计数器算法实现
from rich import print
from time import time, sleep
class SlidingWindow:
def __init__(self, capacity, time_unit, forward_callback, drop_callback) -> None:
self.capacity = capacity
self.time_unit = time_unit
self.forward_callback = forward_callback
self.drop_callback = drop_callback
self.cur_time = time() # current sliding window starts at
self.pre_count = capacity
self.cur_count = 0
def handle(self, packet):
if (time() - self.cur_time) > self.time_unit:
self.cur_time = time()
self.pre_count = self.cur_count
self.cur_count = 0
pre_weight = (self.time_unit - (time() - self.cur_time)) / self.time_unit
estimate_count = self.pre_count * pre_weight + self.cur_count
if estimate_count > self.capacity:
return self.drop_callback(packet)
self.cur_count += 1
return self.forward_callback(packet)
def forward(packet):
print(f"Forward Packet {packet}.")
def drop(packet):
print(f"Drop Packet {packet}.")
throttle = SlidingWindow(10, 1, forward, drop)
packet = 0
while True:
sleep(0.1)
throttle.handle(packet)
packet += 1.png?table=block&id=ea171914-8323-42bc-b059-9223aaafd481&cache=v2)