Detailed Explanation of Guava RateLimiter’s Throttling Mechanism

Throttling Algorithms

Guava RateLimiter

// RateLimiter exposes two kinds of factory method. Each ultimately delegates to one of the two
// overloads below, which construct RateLimiter's two subclasses.

/**
 * Builds a SmoothBursty limiter (maxBurstSeconds = 1.0) and sets its rate.
 *
 * @param stopwatch time source used by the limiter
 * @param permitsPerSecond the stable rate, in permits per second
 * @return the configured limiter
 */
static RateLimiter create(SleepingStopwatch stopwatch, double permitsPerSecond) {
  final double maxBurstSeconds = 1.0;
  RateLimiter limiter = new SmoothBursty(stopwatch, maxBurstSeconds);
  limiter.setRate(permitsPerSecond);
  return limiter;
}
/**
 * Builds a SmoothWarmingUp limiter with the given warm-up period and cold factor, then sets its
 * rate.
 *
 * @param stopwatch time source used by the limiter
 * @param permitsPerSecond the stable rate reached after warming up, in permits per second
 * @param warmupPeriod length of the warm-up period, in {@code unit}
 * @param unit time unit of {@code warmupPeriod}
 * @param coldFactor forwarded unchanged to the SmoothWarmingUp constructor
 * @return the configured limiter
 */
static RateLimiter create(
    SleepingStopwatch stopwatch,
    double permitsPerSecond,
    long warmupPeriod,
    TimeUnit unit,
    double coldFactor) {
  RateLimiter warmingUp = new SmoothWarmingUp(stopwatch, warmupPeriod, unit, coldFactor);
  warmingUp.setRate(permitsPerSecond);
  return warmingUp;
}

SmoothBursty

/**
 * Demo: a SmoothBursty limiter created at 5 permits/s hands out one permit roughly every 0.2 s.
 *
 * <p>Sample output (the value is the wait, in seconds, returned by acquire()):
 * <pre>
 * get 1 tokens: 0.0s
 * get 1 tokens: 0.182014s
 * get 1 tokens: 0.188464s
 * get 1 tokens: 0.198072s
 * get 1 tokens: 0.196048s
 * get 1 tokens: 0.197538s
 * get 1 tokens: 0.196049s
 * </pre>
 * The first call returns immediately; every later call waits about 1/5 s, matching the rate.
 */
public void testSmoothBursty() {
  final RateLimiter limiter = RateLimiter.create(5);
  for (;;) {
    final double waitedSeconds = limiter.acquire();
    System.out.println("get 1 tokens: " + waitedSeconds + "s");
  }
}
/**
 * Demo: after idling, a SmoothBursty limiter at 2 permits/s serves a short burst without waiting.
 *
 * <p>Sample output:
 * <pre>
 * get 1 tokens: 0.0s
 * get 1 tokens: 0.0s
 * get 1 tokens: 0.0s
 * get 1 tokens: 0.0s
 * end
 * get 1 tokens: 0.499796s
 * get 1 tokens: 0.0s
 * get 1 tokens: 0.0s
 * get 1 tokens: 0.0s
 * </pre>
 * During the 2 s sleep the limiter banks stored permits. The three acquires after the sleep all
 * return at once: the stored permits are spent first, and the last one is pre-consumed. The cost of
 * that pre-consumed permit is paid by the next acquire, which waits about 0.5 s.
 *
 * <p>If the thread is interrupted while sleeping, the interrupt status is restored and the demo ends.
 */
public void testSmoothBursty2() {
  RateLimiter r = RateLimiter.create(2);
  while (true) {
    System.out.println("get 1 tokens: " + r.acquire(1) + "s");
    try {
      // Stay idle so the limiter accumulates stored permits.
      Thread.sleep(2000);
    } catch (InterruptedException e) {
      // Do not swallow the interrupt: restore the flag and stop the otherwise endless demo.
      Thread.currentThread().interrupt();
      return;
    }
    System.out.println("get 1 tokens: " + r.acquire(1) + "s");
    System.out.println("get 1 tokens: " + r.acquire(1) + "s");
    System.out.println("get 1 tokens: " + r.acquire(1) + "s");
    System.out.println("end");
  }
}
/**
 * Demo: pre-consumption. acquire(5) on a 5 permits/s limiter returns immediately, and the following
 * acquire pays for it.
 *
 * <p>Sample output:
 * <pre>
 * get 5 tokens: 0.0s
 * get 1 tokens: 0.996766s
 * get 1 tokens: 0.194007s
 * get 1 tokens: 0.196267s
 * end
 * get 5 tokens: 0.195756s
 * get 1 tokens: 0.995625s
 * get 1 tokens: 0.194603s
 * get 1 tokens: 0.196866s
 * </pre>
 * The ~1 s wait right after each acquire(5) is the cost of the five permits taken by the preceding
 * call, which was pushed onto the next caller.
 */
public void testSmoothBursty3() {
  final RateLimiter limiter = RateLimiter.create(5);
  for (;;) {
    System.out.println("get 5 tokens: " + limiter.acquire(5) + "s");
    for (int i = 0; i < 3; i++) {
      System.out.println("get 1 tokens: " + limiter.acquire(1) + "s");
    }
    System.out.println("end");
  }
}

SmoothWarmingUp

/**
 * Demo: a SmoothWarmingUp limiter at 2 permits/s with a 3 s warm-up period.
 *
 * <p>Sample output:
 * <pre>
 * get 1 tokens: 0.0s
 * get 1 tokens: 1.329289s
 * get 1 tokens: 0.994375s
 * get 1 tokens: 0.662888s
 * end
 * get 1 tokens: 0.49764s
 * get 1 tokens: 0.497828s
 * get 1 tokens: 0.49449s
 * get 1 tokens: 0.497522s
 * </pre>
 * While cold, the waits shrink step by step and add up to roughly the 3 s warm-up period. After
 * that, permits are issued at the stable rate of 2 per second (about 0.5 s apart).
 */
public void testSmoothwarmingUp() {
  final RateLimiter limiter = RateLimiter.create(2, 3, TimeUnit.SECONDS);
  for (;;) {
    for (int i = 0; i < 4; i++) {
      System.out.println("get 1 tokens: " + limiter.acquire(1) + "s");
    }
    System.out.println("end");
  }
}

Source Code Analysis

//SmoothRateLimiter.java
// Number of permits currently banked. Spent first when a request is reserved; refilled from idle time.
double storedPermits;
// Upper bound on storedPermits; refilling never goes beyond this value.
double maxPermits;
// Microseconds charged per permit at the stable rate (the cost of each fresh, not-yet-stored permit).
double stableIntervalMicros;
/**
 * Stopwatch time, in microseconds, at which the next request can be granted.
 * RateLimiter allows pre-consumption: a request that needs more permits than are stored is still
 * granted at this time, and the cost of the missing permits moves this value forward. The next
 * request must therefore wait until the updated nextFreeTicketMicros. When the limiter has been
 * idle, this value lies in the past.
 */
private long nextFreeTicketMicros = 0L;

SmoothBursty

/**
 * Acquires {@code permits} permits, blocking (uninterruptibly) until the request can be granted.
 *
 * @param permits number of permits to acquire
 * @return the time spent sleeping, in seconds (0.0 if no wait was needed)
 */
public double acquire(int permits) {
  // Reserving first tells us how long this caller must wait.
  long waitMicros = reserve(permits);
  stopwatch.sleepMicrosUninterruptibly(waitMicros);
  // Convert the wait from microseconds to seconds for the caller.
  double microsPerSecond = SECONDS.toMicros(1L);
  return waitMicros / microsPerSecond;
}
/**
 * Validates {@code permits}, reserves them, and returns how long the caller must wait.
 *
 * @param permits number of permits to reserve
 * @return wait time in microseconds (never negative)
 */
final long reserve(int permits) {
  checkPermits(permits);
  // Reservations mutate shared limiter state, so they are serialized on the mutex() lock.
  final Object lock = mutex();
  synchronized (lock) {
    final long now = stopwatch.readMicros();
    return reserveAndGetWaitLength(permits, now);
  }
}
/**
 * Reserves {@code permits} at {@code nowMicros} and returns how long the caller must wait.
 *
 * @param permits number of permits to reserve
 * @param nowMicros current stopwatch reading, in microseconds
 * @return microseconds until the reservation can be granted, clamped at 0
 */
final long reserveAndGetWaitLength(int permits, long nowMicros) {
  long momentAvailable = reserveEarliestAvailable(permits, nowMicros);
  long delta = momentAvailable - nowMicros;
  // A grant time already in the past means no wait at all.
  return delta > 0 ? delta : 0;
}
/**
 * Reserves requiredPermits at nowMicros and returns the time at which this request may proceed.
 *
 * The caller is granted at the current nextFreeTicketMicros. The cost of the permits it takes is
 * added to nextFreeTicketMicros, so it is paid by the next caller. That cost has two parts: stored
 * permits are priced by storedPermitsToWaitTime, and fresh permits cost stableIntervalMicros each.
 *
 * @param requiredPermits number of permits requested
 * @param nowMicros current stopwatch reading, in microseconds
 * @return the value of nextFreeTicketMicros before this reservation advanced it
 */
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
// Refill stored permits from idle time and catch nextFreeTicketMicros up to now; runs on every reservation.
resync(nowMicros);
// Capture the grant time BEFORE it is advanced below: this caller only waits for earlier reservations.
long returnValue = nextFreeTicketMicros;
// Take as many of the requested permits as possible from the stored pool.
double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
// Whatever the pool cannot cover is pre-consumed as fresh permits.
double freshPermits = requiredPermits - storedPermitsToSpend;
// Cost of this reservation in microseconds. storedPermitsToWaitTime returns 0 in SmoothBursty; in
// SmoothWarmingUp it implements the warm-up curve. Each fresh permit costs stableIntervalMicros.
long waitMicros = storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
+ (long) (freshPermits * stableIntervalMicros);
try {
// Push the cost onto the next caller; saturate at Long.MAX_VALUE if the addition overflows.
this.nextFreeTicketMicros = LongMath.checkedAdd(nextFreeTicketMicros, waitMicros);
} catch (ArithmeticException e) {
this.nextFreeTicketMicros = Long.MAX_VALUE;
}
// storedPermitsToSpend <= storedPermits, so the pool never goes negative.
this.storedPermits -= storedPermitsToSpend;
// The caller waits only until the old nextFreeTicketMicros, not for its own pre-consumed permits.
return returnValue;
}
/**
 * SmoothBursty: spending stored permits is free, so they never add wait time.
 *
 * @param storedPermits number of currently stored permits (unused)
 * @param permitsToTake number of stored permits being spent (unused)
 * @return always 0
 */
long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
  return 0;
}
void resync(long nowMicros) {
// The current time is greater than nextFreeTicketMicros, so it refreshes tokens and nextFreeTicketMicros.
if (nowMicros > nextFreeTicketMicros) {
// The coolDownIntervalMicros function generates a token per a specified number of seconds. Its implementation in SmoothWarmingUp is different than that in SmoothBursty.
// The coolDownIntervalMicros of SmoothBursty directly returns stableIntervalMicros.
// Deduct the time for refreshing tokens from the current time to determine the difference. Divide the difference with the token-adding-interval to determine the number of tokens that need to be added during this period of time.
storedPermits = min(maxPermits,
storedPermits
+ (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros());
nextFreeTicketMicros = nowMicros;
}
// If the current time is earlier than nextFreeTicketMicros, the thread that needs to acquire the token must wait until nextFreeTicketMicros.
The additional wait time is added to the next thread that needs to acquire the token.
}
/**
 * SmoothBursty: idle time turns back into stored permits at the stable rate.
 *
 * @return microseconds of idle time needed to bank one permit (equal to stableIntervalMicros)
 */
double coolDownIntervalMicros() {
  final double refillIntervalMicros = stableIntervalMicros;
  return refillIntervalMicros;
}

SmoothWarmingUp

// For a SmoothWarmingUp rate limiter, the wait time is the area of the square or the trapezoid in the preceding figure.
long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
/**
* The number of permits in excess of the thresholdPermits
*/
double availablePermitsAboveThreshold = storedPermits - thresholdPermits;
long micros = 0;
/**
* If the number of the currently stored permits exceeds the thresholdPermits
*/
if (availablePermitsAboveThreshold > 0.0) {
/**
* The number of needed permits in excess of the thresholdPermits
*/
double permitsAboveThresholdToTake = min(availablePermitsAboveThreshold, permitsToTake);
/**
* The are of the trapezoid
*
* h × (b1 + b2)/2
*
* where h is permitsAboveThresholdToTake, which is equivalent to the number of permits to be preconsumed
* b2 is the longer base, which is permitsToTime(availablePermitsAboveThreshold)
* b1 is the shorter base, which is permitsToTime(availablePermitsAboveThreshold - permitsAboveThresholdToTake)
*/
micros = (long) (permitsAboveThresholdToTake
* (permitsToTime(availablePermitsAboveThreshold)
+ permitsToTime(availablePermitsAboveThreshold - permitsAboveThresholdToTake)) / 2.0);
/**
* Minus the number of permits in excess of the thresholdPermits (already acquired)
*/
permitsToTake -= permitsAboveThresholdToTake;
}
/**
The area of the stable period equals to length × width
*/
micros += (stableIntervalMicros × permitsToTake);
return micros;
}
/**
 * SmoothWarmingUp: microseconds of idle time needed to bank one stored permit.
 *
 * <p>This is an interval per permit, not a rate. Dividing warmupPeriodMicros by maxPermits means an
 * idle limiter refills from 0 to maxPermits stored permits in exactly the warm-up period.
 *
 * @return warmupPeriodMicros / maxPermits
 */
double coolDownIntervalMicros() {
  return warmupPeriodMicros / maxPermits;
}

Postscript

References

--

--

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store