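Throttling method calls to M requests in N seconds

I need a component/class that throttles the execution of some method to a maximum of M calls in N seconds (or ms or nanos, it doesn't matter).

In other words, I need to make sure that my method is executed no more than M times in a sliding window of N seconds.

If you don't know of an existing class, feel free to post your solutions/ideas on how you would implement this.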

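Take a look at the token bucket algorithm. Basically, you have a bucket with tokens in it. Every time the method is executed, it takes a token. If there are no more tokens, it blocks until one becomes available. Meanwhile, some external actor replenishes the tokens at a fixed interval.

I'm not aware of a library that does this (or anything similar). You could write this logic into your code, or use AspectJ to add the behavior.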
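A minimal sketch of the token bucket described above, assuming a Semaphore holds the tokens and a ScheduledExecutorService plays the external actor that refills them. The class name TokenBucket and its constructor parameters are made up for illustration.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: a token bucket refilled by a background task.
class TokenBucket implements AutoCloseable {
    private final int capacity;
    private final Semaphore tokens;
    private final ScheduledExecutorService refiller;

    // Starts full and adds one token every refillPeriodMillis, up to capacity.
    TokenBucket(int capacity, long refillPeriodMillis) {
        this.capacity = capacity;
        this.tokens = new Semaphore(capacity);
        this.refiller = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "token-refiller");
            t.setDaemon(true); // do not keep the JVM alive just for refills
            return t;
        });
        refiller.scheduleAtFixedRate(this::refill, refillPeriodMillis,
                refillPeriodMillis, TimeUnit.MILLISECONDS);
    }

    private void refill() {
        // Only this single thread releases permits, so the count never exceeds capacity.
        if (tokens.availablePermits() < capacity) {
            tokens.release();
        }
    }

    // Blocks until a token is available.
    void acquire() throws InterruptedException {
        tokens.acquire();
    }

    // Takes a token if one is available right now.
    boolean tryAcquire() {
        return tokens.tryAcquire();
    }

    @Override
    public void close() {
        refiller.shutdownNow();
    }
}
```

For M calls per N seconds you would use something like new TokenBucket(M, N * 1000 / M). Note that a full bucket followed by steady refills can let more than M calls through in some N-second window, so this is not a strict sliding window.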

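I would use a ring buffer of timestamps with a fixed size of M. Each time the method is called, check the oldest entry: if it is more than N seconds in the past (or the buffer is not yet full), execute and add another entry; otherwise sleep for the time difference.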
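The ring buffer idea could be sketched roughly like this. The class and method names (SlidingWindowLimiter, reserve, acquire) are mine, and the current time is passed into reserve so the logic can be checked without sleeping.

```java
// Hypothetical sketch: sliding-window limiter backed by a ring buffer of timestamps.
class SlidingWindowLimiter {
    private final long[] stamps;      // permitted times of the last M calls
    private final long windowMillis;  // N, in milliseconds
    private int next = 0;             // slot holding the oldest entry once the buffer is full
    private int count = 0;            // number of filled slots

    SlidingWindowLimiter(int maxCalls, long windowMillis) {
        this.stamps = new long[maxCalls];
        this.windowMillis = windowMillis;
    }

    // Returns how long the caller must wait (0 means go now) and records
    // the call at the time it is allowed to run.
    synchronized long reserve(long nowMillis) {
        long wait = 0;
        if (count == stamps.length) {
            // Buffer full: the M-th previous call must be at least one window old.
            long oldest = stamps[next];
            wait = Math.max(0, oldest + windowMillis - nowMillis);
        } else {
            count++;
        }
        stamps[next] = nowMillis + wait;
        next = (next + 1) % stamps.length;
        return wait;
    }

    // Blocking convenience wrapper around reserve().
    void acquire() throws InterruptedException {
        long wait = reserve(System.currentTimeMillis());
        if (wait > 0) {
            Thread.sleep(wait);
        }
    }
}
```

With M = 2 and N = 1000 ms, calls at t = 0 and t = 100 pass immediately, while a third call at t = 200 is told to wait 800 ms, i.e. until the call from t = 0 leaves the window.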

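In concrete terms, you should be able to implement this with a DelayQueue. Initialize the queue with M Delayed instances with their delay initially set to zero. As requests to the method come in, take a token, which causes the method to block until the throttling requirement has been met. When a token has been taken, add a new token to the queue with a delay of N.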
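A rough sketch of the DelayQueue approach, under the assumption that each token simply records the instant it becomes usable. DelayQueueThrottle and Token are illustrative names, and tryAcquire is an extra non-blocking variant that the answer does not mention.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: M tokens in a DelayQueue, each reusable N after it was taken.
class DelayQueueThrottle {
    // A token that becomes available at a fixed point in time.
    private static final class Token implements Delayed {
        private final long readyAtNanos;

        Token(long delayNanos) {
            this.readyAtNanos = System.nanoTime() + delayNanos;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(readyAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS),
                    other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    private final DelayQueue<Token> queue = new DelayQueue<>();
    private final long windowNanos;

    DelayQueueThrottle(int maxCalls, long window, TimeUnit unit) {
        this.windowNanos = unit.toNanos(window);
        for (int i = 0; i < maxCalls; i++) {
            queue.add(new Token(0)); // M tokens, usable immediately
        }
    }

    // Blocks until a token has expired, then schedules its replacement N later.
    void acquire() throws InterruptedException {
        queue.take();
        queue.add(new Token(windowNanos));
    }

    // Non-blocking variant: returns false if no token is ready yet.
    boolean tryAcquire() {
        if (queue.poll() == null) {
            return false;
        }
        queue.add(new Token(windowNanos));
        return true;
    }
}
```

Call acquire() at the top of the throttled method. Because each replacement token only becomes ready N after the previous one was taken, at most M acquisitions can succeed in any window of N.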

Although it's not what you asked for, ThreadPoolExecutor could also be useful; it is designed to cap M simultaneous requests rather than M requests in N seconds.
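To illustrate the difference, here is a small sketch showing that a fixed-size pool (which Executors.newFixedThreadPool backs with a ThreadPoolExecutor) limits how many calls run at once, not how many start per time window. ConcurrencyCapDemo and peakConcurrency are made-up names.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ConcurrencyCapDemo {
    // Runs `tasks` short jobs on a pool of `maxConcurrent` threads and returns
    // the highest number of jobs observed running at the same time.
    static int peakConcurrency(int maxConcurrent, int tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(maxConcurrent);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                int now = running.incrementAndGet();
                peak.accumulateAndGet(now, Math::max);
                try {
                    Thread.sleep(20); // stand-in for the real work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    running.decrementAndGet();
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return peak.get();
    }
}
```

The observed peak never exceeds the pool size, but nothing stops the pool from starting many short jobs within one second.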

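I have implemented a simple throttling algorithm; see http://krishnaprasadas.blogspot.in/2012/05/throttling-algorithm.html

A brief description of the algorithm:

The algorithm uses the capabilities of the Java DelayQueue. Create a Delayed object with the expected delay (here 1000/M, for a millisecond TimeUnit). Put that object into the delay queue, which will provide the moving window for us. Then, before each method call, take the object from the queue. take is a blocking call that returns only after the specified delay has elapsed. After the method call, don't forget to put the object back into the queue with an updated time (here the current milliseconds).

We can also have multiple Delayed objects with different delays. This approach also provides high throughput.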

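It depends on the application.

Imagine the case in which multiple threads want a token to perform some globally rate-limited action with no bursts allowed (i.e. you want to limit to 10 actions per 10 seconds, but you don't want 10 actions to happen in the first second and then stay idle for 9 seconds).

The DelayQueue has a disadvantage: the order in which threads request tokens may differ from the order in which their requests are fulfilled. If multiple threads are blocked waiting for a token, it is not clear which one will take the next available token. As I see it, you could even have threads waiting forever.

One solution is to enforce a minimum time interval between two consecutive actions and to perform the actions in the order they were requested.

Here is an implementation: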

public class LeakyBucket {
    protected float maxRate;
    protected long minTime;
    // holds time of last action (past or future!)
    protected long lastSchedAction = System.currentTimeMillis();

    public LeakyBucket(float maxRate) throws Exception {
        if (maxRate <= 0.0f) {
            throw new Exception("Invalid rate");
        }
        this.maxRate = maxRate;
        this.minTime = (long) (1000.0f / maxRate);
    }

    public void consume() throws InterruptedException {
        long curTime = System.currentTimeMillis();
        long timeLeft;

        // calculate when we can do the action
        synchronized (this) {
            timeLeft = lastSchedAction + minTime - curTime;
            if (timeLeft > 0) {
                lastSchedAction += minTime;
            } else {
                lastSchedAction = curTime;
            }
        }

        // if needed, wait for our turn
        if (timeLeft <= 0) {
            return;
        } else {
            Thread.sleep(timeLeft);
        }
    }
}

What worked out of the box for me was Google Guava's RateLimiter.

// Allow one request per second
private RateLimiter throttle = RateLimiter.create(1.0);

private void someMethod() {
    throttle.acquire();
    // Do something
}

Try this simple approach:

import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class SimpleThrottler {

    private static final int T = 1; // min
    private static final int N = 345;

    private Lock lock = new ReentrantLock();
    private Condition newFrame = lock.newCondition();
    private volatile boolean currentFrame = true;

    public SimpleThrottler() {
        handleForGate();
    }

    /**
     * Payload
     */
    private void job() {
        try {
            Thread.sleep(Math.abs(ThreadLocalRandom.current().nextLong(12, 98)));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.err.print(" J. ");
    }

    public void doJob() throws InterruptedException {
        lock.lock();
        try {
            while (true) {
                int count = 0;

                // run at most N jobs while the current frame is open
                while (count < N && currentFrame) {
                    job();
                    count++;
                }

                // wait for the gate thread to open the next frame
                newFrame.await();
                currentFrame = true;
            }
        } finally {
            lock.unlock();
        }
    }

    public void handleForGate() {
        Thread handler = new Thread(() -> {
            while (true) {
                try {
                    Thread.sleep(1 * 900);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    // close the current frame, then wake the worker
                    currentFrame = false;

                    lock.lock();
                    try {
                        newFrame.signal();
                    } finally {
                        lock.unlock();
                    }
                }
            }
        });
        handler.start();
    }

}

Apache Camel also comes with a Throttler mechanism, used as follows:

from("seda:a").throttle(100).asyncDelayed().to("seda:b");

Here is an update to the LeakyBucket code above. It works for more than 1000 requests per second.

import lombok.SneakyThrows;
import java.util.concurrent.TimeUnit;

class LeakyBucket {
    private long minTimeNano; // minimum spacing between requests, in nanoseconds
    private long sched = System.nanoTime(); // schedule pointer, advanced by minTimeNano per request

    /**
     * Create a rate limiter using the leaky bucket algorithm.
     * @param perSec the number of requests per second
     */
    public LeakyBucket(double perSec) {
        if (perSec <= 0.0) {
            throw new RuntimeException("Invalid rate " + perSec);
        }
        this.minTimeNano = (long) (1_000_000_000.0 / perSec);
    }

    @SneakyThrows
    public void consume() {
        long curr = System.nanoTime();
        long timeLeft;

        synchronized (this) {
            timeLeft = sched - curr + minTimeNano;
            sched += minTimeNano;
        }
        // the scheduled slot is already in the past: no need to wait
        if (timeLeft <= minTimeNano) {
            return;
        }
        TimeUnit.NANOSECONDS.sleep(timeLeft);
    }
}

And a unit test for the above:

import com.google.common.base.Stopwatch;
import org.junit.Ignore;
import org.junit.Test;

import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

public class LeakyBucketTest {
    @Test @Ignore
    public void t() {
        double numberPerSec = 10000;
        LeakyBucket b = new LeakyBucket(numberPerSec);
        Stopwatch w = Stopwatch.createStarted();
        IntStream.range(0, (int) (numberPerSec * 5)).parallel().forEach(
                x -> b.consume());
        System.out.printf("%,d ms%n", w.elapsed(TimeUnit.MILLISECONDS));
    }
}

If you need a Java-based sliding-window rate limiter that works across a distributed system, you might want to take a look at the https://github.com/mokies/ratelimitj project.

A Redis-backed configuration that limits requests by IP to 50 per minute would look like this:

import com.lambdaworks.redis.RedisClient;
import es.moki.ratelimitj.core.LimitRule;

RedisClient client = RedisClient.create("redis://localhost");
Set<LimitRule> rules = Collections.singleton(LimitRule.of(1, TimeUnit.MINUTES, 50)); // 50 requests per minute, per key
RedisRateLimit requestRateLimiter = new RedisRateLimit(client, rules);

boolean overLimit = requestRateLimiter.overLimit("ip:127.0.0.2");

See https://github.com/mokies/ratelimitj/tree/master/ratelimitj-redis for further details on Redis configuration.

Here is a slightly more advanced version of a simple rate limiter:

import lombok.AllArgsConstructor;
import lombok.Getter;

import java.time.LocalTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Simple request limiter based on Thread.sleep method.
 * Create limiter instance via {@link #create(float)} and call {@link #consume()} before making any request.
 * If the limit is exceeded, the consume method blocks and waits for the current call rate to fall below the limit.
 */
public class RequestRateLimiter {

    private long minTime;

    private long lastSchedAction;
    private double avgSpent = 0;

    ArrayList<RatePeriod> periods;

    @AllArgsConstructor
    public static class RatePeriod {

        @Getter
        private LocalTime start;

        @Getter
        private LocalTime end;

        @Getter
        private float maxRate;
    }

    /**
     * Create request limiter with maxRate - maximum number of requests per second
     * @param maxRate - maximum number of requests per second
     * @return a new limiter that applies maxRate for the whole day
     */
    public static RequestRateLimiter create(float maxRate) {
        return new RequestRateLimiter(Arrays.asList(new RatePeriod(LocalTime.of(0, 0, 0),
                LocalTime.of(23, 59, 59), maxRate)));
    }

    /**
     * Create request limiter with ratePeriods calendar - maximum number of requests per second in every period
     * @param ratePeriods - rate calendar
     * @return a new limiter that applies the given calendar
     */
    public static RequestRateLimiter create(List<RatePeriod> ratePeriods) {
        return new RequestRateLimiter(ratePeriods);
    }

    private void checkArgs(List<RatePeriod> ratePeriods) {
        for (RatePeriod rp : ratePeriods) {
            if (null == rp || rp.maxRate <= 0.0f || null == rp.start || null == rp.end)
                throw new IllegalArgumentException("list contains null or rate is less than zero or period is zero length");
        }
    }

    private float getCurrentRate() {
        LocalTime now = LocalTime.now();

        for (RatePeriod rp : periods) {
            if (now.isAfter(rp.start) && now.isBefore(rp.end))
                return rp.maxRate;
        }

        // no period matches the current time: treat as unlimited
        return Float.MAX_VALUE;
    }

    private RequestRateLimiter(List<RatePeriod> ratePeriods) {
        checkArgs(ratePeriods);
        periods = new ArrayList<>(ratePeriods.size());
        periods.addAll(ratePeriods);

        this.minTime = (long) (1000.0f / getCurrentRate());
        this.lastSchedAction = System.currentTimeMillis() - minTime;
    }

    /**
     * Call this method before making actual request.
     * Method call blocks until the current rate falls below the limit
     * @throws InterruptedException
     */
    public void consume() throws InterruptedException {
        long timeLeft;

        synchronized (this) {
            long curTime = System.currentTimeMillis();

            minTime = (long) (1000.0f / getCurrentRate());
            timeLeft = lastSchedAction + minTime - curTime;

            long timeSpent = curTime - lastSchedAction + timeLeft;
            avgSpent = (avgSpent + timeSpent) / 2;

            if (timeLeft <= 0) {
                lastSchedAction = curTime;
                return;
            }

            lastSchedAction = curTime + timeLeft;
        }

        Thread.sleep(timeLeft);
    }

    public synchronized float getCurRate() {
        return (float) (1000d / avgSpent);
    }
}

And the unit tests:

import org.junit.Assert;
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class RequestRateLimiterTest {

    @Test(expected = IllegalArgumentException.class)
    public void checkSingleThreadZeroRate() {
        // Zero rate
        RequestRateLimiter limiter = RequestRateLimiter.create(0);
        try {
            limiter.consume();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Test
    public void checkSingleThreadUnlimitedRate() {
        // Unlimited
        RequestRateLimiter limiter = RequestRateLimiter.create(Float.MAX_VALUE);

        long started = System.currentTimeMillis();
        for (int i = 0; i < 1000; i++) {
            try {
                limiter.consume();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        long ended = System.currentTimeMillis();
        System.out.println("Current rate:" + limiter.getCurRate());
        Assert.assertTrue(((ended - started) < 1000));
    }

    @Test
    public void checkSingleThreadRate() {
        // 3 requests per minute
        RequestRateLimiter limiter = RequestRateLimiter.create(3f / 60f);

        long started = System.currentTimeMillis();
        for (int i = 0; i < 3; i++) {
            try {
                limiter.consume();
                Thread.sleep(20000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        long ended = System.currentTimeMillis();

        System.out.println("Current rate:" + limiter.getCurRate());
        Assert.assertTrue(((ended - started) >= 60000) && ((ended - started) < 61000));
    }

    @Test
    public void checkSingleThreadRateLimit() {
        // 100 requests per second
        RequestRateLimiter limiter = RequestRateLimiter.create(100);

        long started = System.currentTimeMillis();
        for (int i = 0; i < 1000; i++) {
            try {
                limiter.consume();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        long ended = System.currentTimeMillis();

        System.out.println("Current rate:" + limiter.getCurRate());
        Assert.assertTrue((ended - started) >= (10000 - 100));
    }

    @Test
    public void checkMultiThreadedRateLimit() {
        // 100 requests per second
        RequestRateLimiter limiter = RequestRateLimiter.create(100);
        long started = System.currentTimeMillis();

        List<Future<?>> tasks = new ArrayList<>(10);
        ExecutorService exec = Executors.newFixedThreadPool(10);

        for (int i = 0; i < 10; i++) {
            tasks.add(exec.submit(() -> {
                for (int i1 = 0; i1 < 100; i1++) {
                    try {
                        limiter.consume();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }));
        }

        tasks.stream().forEach(future -> {
            try {
                future.get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        });

        long ended = System.currentTimeMillis();
        System.out.println("Current rate:" + limiter.getCurRate());
        Assert.assertTrue((ended - started) >= (10000 - 100));
    }

    @Test
    public void checkMultiThreaded32RateLimit() {
        // 0.2 requests per second
        RequestRateLimiter limiter = RequestRateLimiter.create(0.2f);
        long started = System.currentTimeMillis();

        List<Future<?>> tasks = new ArrayList<>(8);
        ExecutorService exec = Executors.newFixedThreadPool(8);

        for (int i = 0; i < 8; i++) {
            tasks.add(exec.submit(() -> {
                for (int i1 = 0; i1 < 2; i1++) {
                    try {
                        limiter.consume();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }));
        }

        tasks.stream().forEach(future -> {
            try {
                future.get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        });

        long ended = System.currentTimeMillis();
        System.out.println("Current rate:" + limiter.getCurRate());
        Assert.assertTrue((ended - started) >= (10000 - 100));
    }

    @Test
    public void checkMultiThreadedRateLimitDynamicRate() {
        // 100 requests per second
        RequestRateLimiter limiter = RequestRateLimiter.create(100);
        long started = System.currentTimeMillis();

        List<Future<?>> tasks = new ArrayList<>(10);
        ExecutorService exec = Executors.newFixedThreadPool(10);

        for (int i = 0; i < 10; i++) {
            tasks.add(exec.submit(() -> {
                Random r = new Random();
                for (int i1 = 0; i1 < 100; i1++) {
                    try {
                        limiter.consume();
                        Thread.sleep(r.nextInt(1000));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }));
        }

        tasks.stream().forEach(future -> {
            try {
                future.get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        });

        long ended = System.currentTimeMillis();
        System.out.println("Current rate:" + limiter.getCurRate());
        Assert.assertTrue((ended - started) >= (10000 - 100));
    }

}

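The implementation below can handle arbitrary request time precision. It has O(1) time complexity per request and needs no additional buffer, i.e. O(1) space complexity. In addition, it does not require a background thread to release tokens; instead, tokens are released according to the time that has passed since the last request.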

class RateLimiter {
    int limit;
    double available;
    long interval;

    long lastTimeStamp;

    RateLimiter(int limit, long interval) {
        this.limit = limit;
        this.interval = interval;

        available = 0;
        lastTimeStamp = System.currentTimeMillis();
    }

    synchronized boolean canAdd() {
        long now = System.currentTimeMillis();
        // more tokens are released since the last call
        available += (now - lastTimeStamp) * 1.0 / interval * limit;
        // always advance the timestamp so elapsed time is never credited twice
        lastTimeStamp = now;
        if (available > limit)
            available = limit;

        if (available < 1)
            return false;
        else {
            available--;
            return true;
        }
    }
}

My solution: a simple utility method that you can modify to create a wrapper class.

public static Runnable throttle(Runnable realRunner, long delay) {
    Runnable throttleRunner = new Runnable() {
        // whether a call is already waiting to run
        private boolean _isWaiting = false;
        // target time to run realRunner
        private long _timeToRun;
        // specified delay time to wait
        private long _delay = delay;
        // Runnable that has the real task to run
        private Runnable _realRunner = realRunner;

        @Override
        public void run() {
            // current time
            long now;
            synchronized (this) {
                // another thread is waiting, skip
                if (_isWaiting) return;
                now = System.currentTimeMillis();
                // set the time to run once per waiting period;
                // it is not updated on every call, so the run
                // is never postponed indefinitely
                _timeToRun = now + _delay;
                // set waiting status
                _isWaiting = true;
            }
            try {
                Thread.sleep(_timeToRun - now);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                // clear waiting status before run, under the same lock
                // as the check above so other threads see the change
                synchronized (this) {
                    _isWaiting = false;
                }
                // do the real task
                _realRunner.run();
            }
        }
    };
    return throttleRunner;
}

See also: Thread Debounce and Throttle.