系统设计概念:限流(Rate Limiting)
系统设计概念:限流(Rate Limiting)

系统设计概念:限流(Rate Limiting)

Created time
Jun 24, 2022 10:14 AM
Tags
blog
系统设计
限流
Priority
Status
Date
Jun 27, 2022
我们为什么需要限流?
限流通常来说有以下几个目的:
  • 保护系统,避免 DDoS 攻击
  • 减少开销💰
  • 避免服务过载
限流可以认为是服务降级的一种。限流就是限制系统的输入和输出流量以达到保护系统的目的。
一般来说系统的吞吐量是可以被测算的,为了保证系统的稳定运行,一旦达到的需要限制的阈值,就需要限制流量并采取一些措施以完成限制流量的目的。比如:延迟处理,拒绝处理,或者部分拒绝处理等等。

限流在哪里做

通常,限流器可以在以下几个部分来做:
  • 客户端
  • 服务端
  • 中间件

限流算法

限流通常有以下几个算法:
  1. 令牌桶算法
  1. 漏斗算法
  1. 固定窗口计数器
  1. 滑动窗口计数器
接下来,让我们来一一揭开它们的面纱。

令牌桶算法

令牌桶算法
令牌桶算法
主要有两个关键的参数:
  • Bucket 的容量大小
  • Refiller 的恢复速率
当请求 req 到来,先看 Bucket 中是否还有 Token,如果有 Token,则正常处理请求 req;
如果 Bucket 中空了,那么丢弃该请求 req;
Refiller 根据设定的速率定期将 Bucket 填满 Token;

令牌桶算法实现

from time import time, sleep


class TokenBucket:
    """An implementation of a token bucket."""

    def __init__(
        self, tokens: int, time_unit: int, forward_callback, drop_callback
    ) -> None:
        self.tokens = tokens
        self.time_unit = time_unit
        self.fill_rate = tokens / time_unit
        self.last_check = time()
        self.bucket = tokens

        self.forward_callback = forward_callback
        self.drop_callback = drop_callback

    def handle(self, packet):
        """Handle a packet. Refill tokens according to fill rate"""
        current_time = time()
        time_elapsed = current_time - self.last_check
        self.last_check = current_time

        self.bucket = self.bucket + time_elapsed * self.fill_rate

        if self.bucket > self.tokens:
            self.bucket = self.tokens

        if self.bucket > 0:
            self.bucket -= 1
            self.forward_callback(packet)
        else:
            self.drop_callback(packet)


def forward(packet):
    print("Forwarding packet:", packet)


def drop(packet):
    print("Dropping packet:", packet)


if __name__ == "__main__":
    bucket = TokenBucket(1, 1, forward, drop)
    packet = 1
    while True:
        sleep(0.2)
        bucket.handle(packet)
        packet += 1

漏斗算法

顾名思义,漏斗算法就像一个漏斗,以恒定的流出速率来消费请求。
实现上,漏斗算法通常使用「队列」来实现。

固定窗口计数器算法

固定窗口计数器就是设定一个固定的时间窗口,然后统计在这个固定时间窗口的请求数,看是否超过阈值。
打个比方,就像一列火车,火车是由一节一节的车厢组成,每个乘客只能选择一个车厢上车;我们统计每一个车厢上的乘客,超出阈值的乘客就丢弃,在阈值范围内的乘客正常处理。
上面的例子中,车厢就是固定窗口,乘客就是请求。
假设我们设定时间窗口固定为 1 min,如下图所示:
固定窗口
固定窗口
  • 第一分钟请求没有超过阈值,全部正常处理
  • 第二分钟过了一半的时候就超出阈值,对于超出的请求,我们丢弃处理
  • 第三分钟刚好达到阈值,正常处理

固定窗口计数器算法实现

from time import time, sleep


class FixedWindow:
    def __init__(self, capacity, forward_callback, drop_callback) -> None:
        self.capacity = capacity
        self.allowance = capacity  # initiate this to allow the first request

        self.cur_time = int(time())  # seperate each second
        self.forward_callback = forward_callback
        self.drop_callback = drop_callback

    def handle(self, packet):
        if int(time()) != self.cur_time:
            self.cur_time = int(time())
            self.allowance = self.capacity

        if self.allowance == 0:
            return self.drop_callback(packet)

        self.allowance -= 1
        return self.forward_callback(packet)


def forward(packet):
    print(f"Forward Packet {packet}.")


def drop(packet):
    print(f"Drop Packet {packet}.")


throttle = FixedWindow(5, forward, drop)
packet = 0

while True:
    sleep(0.1)
    throttle.handle(packet)
    packet += 1

滑动窗口计数器算法

从名字上就能看出与固定窗口算法不同,滑动窗口使用的是变动的窗口然后根据权重计算出一个近似的结果。
固定窗口算法存在这样一个问题:当新旧两个时间窗口交接时如果出现突发流量,会导致限流失效,我们能够从监控系统中看到两个窗帘边缘部分存在明显的流量尖刺。
而滑动窗口为了解决固定窗口流量尖刺的问题,考虑将上一个窗口的计数器纳入限流的计算。
上一个窗口所占权重就是它在当前滑动时间窗口所占的面积
以下图为例,时间窗口为 1 min,rate limit 限制为 50,第一个时间窗口计数器为 50,在 00:01:20 来了一个新的请求,第二个计数器的数值为 20:
  • 在这个例子中,第一个时间窗口占比为
  • 当前的 rate limit 值为:
因为 53.5 > 50,所以 00:01:20 到达的请求会被丢弃。
滑动窗口-1
滑动窗口-1
 
而如果 00:01:40 秒的时候再次到达一个请求,此时计算的 rate limit 就是
因为 36.5 小于 50,因此 00:01:40 的请求可以正常被处理。
滑动窗口-2
滑动窗口-2

滑动窗口计数器算法实现

from rich import print
from time import time, sleep


class SlidingWindow:
    def __init__(self, capacity, time_unit, forward_callback, drop_callback) -> None:
        self.capacity = capacity
        self.time_unit = time_unit
        self.forward_callback = forward_callback
        self.drop_callback = drop_callback

        self.cur_time = time()  # current sliding window starts at
        self.pre_count = capacity
        self.cur_count = 0

    def handle(self, packet):
        if (time() - self.cur_time) > self.time_unit:
            self.cur_time = time()
            self.pre_count = self.cur_count
            self.cur_count = 0

        pre_weight = (self.time_unit - (time() - self.cur_time)) / self.time_unit
        estimate_count = self.pre_count * pre_weight + self.cur_count

        if estimate_count > self.capacity:
            return self.drop_callback(packet)

        self.cur_count += 1
        return self.forward_callback(packet)


def forward(packet):
    print(f"Forward Packet {packet}.")


def drop(packet):
    print(f"Drop Packet {packet}.")


throttle = SlidingWindow(10, 1, forward, drop)

packet = 0

while True:
    sleep(0.1)
    throttle.handle(packet)
    packet += 1
 

参考