什么是好的速率限制算法?

我可以使用一些伪代码,或者更好的 Python。我试图为 Python IRC 机器人实现一个速率限制队列,它部分工作,但如果有人触发的消息少于限制(例如,速率限制是每8秒5条消息,而人只触发4条) ,下一个触发超过8秒(例如,16秒后) ,机器人发送消息,但队列变满,机器人等待8秒,即使它不需要,因为8秒周期已经过去。

112573 次浏览

令牌桶实现起来相当简单。

从一个装有5个代币的桶开始。

每5/8秒: 如果桶中少于5个代币,则添加一个。

每次要发送消息时: 如果 bucket 有≥1个令牌,取出一个令牌并发送消息。否则,等待/删除消息/随便什么。

(显然,在实际的代码中,您将使用整数计数器而不是实际的令牌,并且您可以通过存储时间戳来优化每5/8秒的步骤)


再读一遍问题,如果速率限制每8秒完全重置一次,那么这里有一个修改:

从一个时间戳 last_send开始,在很久以前(例如,在那个时代)。另外,从相同的5令牌桶开始。

每5/8秒执行一次规则。

每次发送消息时: 首先,检查 last_send≥8秒前。如果是这样,填满桶(设置为5个令牌)。其次,如果 bucket 中有令牌,则发送消息(否则,删除/等待/等等)。第三,将 last_send设置为现在。

这个方案应该可行。


我实际上已经使用这样的策略(第一种方法)编写了一个 IRC bot。它在佩尔,而不是 Python,但这里有一些代码可以说明:

这里的第一部分处理向桶中添加令牌。您可以看到基于时间(从第2行到最后一行)添加令牌的优化,然后最后一行将 bucket 内容限制到最大值(MESSAGE _ BURST)

    my $start_time = time;
...
# Bucket handling
my $bucket = $conn->{fujiko_limit_bucket};
my $lasttx = $conn->{fujiko_limit_lasttx};
$bucket += ($start_time-$lasttx)/MESSAGE_INTERVAL;
($bucket <= MESSAGE_BURST) or $bucket = MESSAGE_BURST;

$con 是一个数据结构,它可以传递。这在一个常规运行的方法中(它计算下一次有事情要做的时间,并且睡眠那么长时间或者直到获得网络流量)。该方法的下一部分处理发送。这相当复杂,因为消息具有与之相关联的优先级。

    # Queue handling. Start with the ultimate queue.
my $queues = $conn->{fujiko_queues};
foreach my $entry (@{$queues->[PRIORITY_ULTIMATE]}) {
# Ultimate is special. We run ultimate no matter what. Even if
# it sends the bucket negative.
--$bucket;
$entry->{code}(@{$entry->{args}});
}
$queues->[PRIORITY_ULTIMATE] = [];

这是第一个队列,无论如何都要运行。即使这会让我们的连接因为洪水而中断。用于非常重要的事情,比如响应服务器的 PING。接下来,剩下的队列:

    # Continue to the other queues, in order of priority.
QRUN: for (my $pri = PRIORITY_HIGH; $pri >= PRIORITY_JUNK; --$pri) {
my $queue = $queues->[$pri];
while (scalar(@$queue)) {
if ($bucket < 1) {
# continue later.
$need_more_time = 1;
last QRUN;
} else {
--$bucket;
my $entry = shift @$queue;
$entry->{code}(@{$entry->{args}});
}
}
}

最后,bucket 状态被保存回 $con 数据结构(实际上在方法中稍后一点; 它首先计算多久会有更多的工作)

    # Save status.
$conn->{fujiko_limit_bucket} = $bucket;
$conn->{fujiko_limit_lasttx} = $start_time;

正如您所看到的,实际的桶处理代码非常小,大约有四行。代码的其余部分是优先级队列处理。机器人有优先级队列,例如,与它聊天的人不能阻止它执行重要的踢/禁止任务。

一种解决方案是为每个队列项添加一个时间戳,并在8秒钟之后丢弃该项。每次添加队列时,都可以执行此检查。

只有在队列已满时,将队列大小限制为5并放弃任何添加的内容时,这种方法才有效。

保持最后五行发送的时间。保持队列中的消息,直到第五个最近消息(如果存在的话)在过去至少8秒(last _ five 作为时间数组) :

now = time.time()
if len(last_five) == 0 or (now - last_five[-1]) >= 8.0:
last_five.insert(0, now)
send_message(msg)
if len(last_five) > 5:
last_five.pop()

在排队的函数之前使用这个 decorator@RateLimited (ratepersec)。

基本上,它检查自上次以来1/rate sec 是否已经过去,如果没有,则等待剩余的时间,否则不等待。这实际上将您限制为速率/秒。这个修饰符可以应用于任何你想要的限速函数。

在您的示例中,如果希望每8秒钟最多发送5条消息,请在 sendToQueue 函数之前使用@RateLimited (0.625)。

import time


def RateLimited(maxPerSecond):
minInterval = 1.0 / float(maxPerSecond)
def decorate(func):
lastTimeCalled = [0.0]
def rateLimitedFunction(*args,**kargs):
elapsed = time.clock() - lastTimeCalled[0]
leftToWait = minInterval - elapsed
if leftToWait>0:
time.sleep(leftToWait)
ret = func(*args,**kargs)
lastTimeCalled[0] = time.clock()
return ret
return rateLimitedFunction
return decorate


@RateLimited(2)  # 2 per second at most
def PrintNumber(num):
print num


if __name__ == "__main__":
print "This should print 1,2,3... at about 2 per second."
for i in range(1,100):
PrintNumber(i)

这里是 最简单的算法,如果你想在消息到达得太快时删除它们(而不是排队,这是有意义的,因为队列可能会变得任意大) :

rate = 5.0; // unit: messages
per  = 8.0; // unit: seconds
allowance = rate; // unit: messages
last_check = now(); // floating-point, e.g. usec accuracy. Unit: seconds


when (message_received):
current = now();
time_passed = current - last_check;
last_check = current;
allowance += time_passed * (rate / per);
if (allowance > rate):
allowance = rate; // throttle
if (allowance < 1.0):
discard_message();
else:
forward_message();
allowance -= 1.0;

在这个解决方案中没有数据结构、计时器等等,而且工作得很干净:)为了看到这一点,“津贴”最多以每秒5/8个单位的速度增长,也就是说最多每8秒增长5个单位。每转发一条消息就扣除一个单位,所以每8秒钟不能发送超过5条消息。

请注意,rate应该是一个整数,即没有非零小数部分,否则算法将无法正常工作(实际速率将不是 rate/per)。例如,rate=0.5; per=1.0;不工作,因为 allowance永远不会增长到1.0。但是 rate=1.0; per=2.0;可以正常工作。

这样吧:

long check_time = System.currentTimeMillis();
int msgs_sent_count = 0;


private boolean isRateLimited(int msgs_per_sec) {
if (System.currentTimeMillis() - check_time > 1000) {
check_time = System.currentTimeMillis();
msgs_sent_count = 0;
}


if (msgs_sent_count > (msgs_per_sec - 1)) {
return true;
} else {
msgs_sent_count++;
}


return false;
}

阻塞处理,直到消息可以发送,从而排队进一步的消息,antti 的漂亮的解决方案也可能被修改为这样:

rate = 5.0; // unit: messages
per  = 8.0; // unit: seconds
allowance = rate; // unit: messages
last_check = now(); // floating-point, e.g. usec accuracy. Unit: seconds


when (message_received):
current = now();
time_passed = current - last_check;
last_check = current;
allowance += time_passed * (rate / per);
if (allowance > rate):
allowance = rate; // throttle
if (allowance < 1.0):
time.sleep( (1-allowance) * (per/rate))
forward_message();
allowance = 0.0;
else:
forward_message();
allowance -= 1.0;

它只是等待,直到有足够的津贴发送消息。为了不以两倍的速率启动,允许值也可以初始化为0。

如果有人仍然感兴趣,我使用这个简单的可调用类结合定时 LRU 键值存储来限制每个 IP 的请求速率。使用 deque,但可以重写为与列表一起使用。

from collections import deque
import time




class RateLimiter:
def __init__(self, maxRate=5, timeUnit=1):
self.timeUnit = timeUnit
self.deque = deque(maxlen=maxRate)


def __call__(self):
if self.deque.maxlen == len(self.deque):
cTime = time.time()
if cTime - self.deque[0] > self.timeUnit:
self.deque.append(cTime)
return False
else:
return True
self.deque.append(time.time())
return False


r = RateLimiter()
for i in range(0,100):
time.sleep(0.1)
print(i, "block" if r() else "pass")

我需要 Scala 的一个变体,这就是:

case class Limiter[-A, +B](callsPerSecond: (Double, Double), f: A ⇒ B) extends (A ⇒ B) {


import Thread.sleep
private def now = System.currentTimeMillis / 1000.0
private val (calls, sec) = callsPerSecond
private var allowance  = 1.0
private var last = now


def apply(a: A): B = {
synchronized {
val t = now
val delta_t = t - last
last = t
allowance += delta_t * (calls / sec)
if (allowance > calls)
allowance = calls
if (allowance < 1d) {
sleep(((1 - allowance) * (sec / calls) * 1000d).toLong)
}
allowance -= 1
}
f(a)
}


}

以下是使用方法:

val f = Limiter((5d, 8d), {
_: Unit ⇒
println(System.currentTimeMillis)
})
while(true){f(())}

只是一个来自已接受答案的代码的 Python 实现。

import time


class Object(object):
pass


def get_throttler(rate, per):
scope = Object()
scope.allowance = rate
scope.last_check = time.time()
def throttler(fn):
current = time.time()
time_passed = current - scope.last_check;
scope.last_check = current;
scope.allowance = scope.allowance + time_passed * (rate / per)
if (scope.allowance > rate):
scope.allowance = rate
if (scope.allowance < 1):
pass
else:
fn()
scope.allowance = scope.allowance - 1
return throttler