Implement Leaky Bucket Rate Limiter in Spring WebFlux
Case: Implement middleware to rate-limit incoming requests.
We can use a WebFilter to intercept each request and check whether it is allowed through or has hit the rate limit.
import java.time.Duration;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.HttpStatus;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilter;
import org.springframework.web.server.WebFilterChain;
import reactor.core.publisher.Mono;
@Slf4j
public class RateLimiterFilter implements WebFilter {

    private final Queue<Long> bucket;
    private final int capacity;
    private final int rate;

    public RateLimiterFilter(int capacity, int rate) {
        this.capacity = capacity;
        this.rate = rate;
        this.bucket = new ConcurrentLinkedQueue<>();
        startLeak();
    }

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
        return processRequest()
                .then(chain.filter(exchange))
                .onErrorResume(RateLimitExceededException.class,
                        throwable -> handleRateLimitExceeded(exchange));
    }

    // Rejects the request when the bucket is full; otherwise records its timestamp.
    private Mono<Void> processRequest() {
        return Mono.defer(() -> {
            if (bucket.size() >= capacity) {
                return Mono.error(new RateLimitExceededException());
            }
            bucket.add(System.currentTimeMillis());
            return Mono.empty();
        });
    }

    // Re-subscribes to the delay after each completion, ticking every 1/rate second.
    private void startLeak() {
        Mono.delay(Duration.ofMillis(1000L / rate))
                .repeat()
                .subscribe(tick -> leak());
    }

    // Drops entries older than one second so new requests can pass again.
    private void leak() {
        long currentTime = System.currentTimeMillis();
        bucket.removeIf(timestamp -> currentTime - timestamp > 1000);
    }

    private Mono<Void> handleRateLimitExceeded(ServerWebExchange exchange) {
        ServerHttpResponse response = exchange.getResponse();
        response.setStatusCode(HttpStatus.TOO_MANY_REQUESTS);
        return response.setComplete();
    }

    // Dedicated exception type so only rate-limit errors are mapped to 429,
    // rather than turning every downstream error into a 429 response.
    private static class RateLimitExceededException extends RuntimeException {
        RateLimitExceededException() {
            super("Rate limit exceeded");
        }
    }
}
- The filter method calls processRequest() and, when it completes normally, passes the request to the next filter in the chain via then(chain.filter(exchange)).
- The processRequest method returns a Mono<Void>: it signals a RateLimitExceededException when the bucket is already at capacity; otherwise it adds the request's timestamp to the bucket and completes empty.
- The handleRateLimitExceeded method is only called when the rate limit is exceeded; it sets the response status to 429 Too Many Requests and completes the response.
- The startLeak method starts a ticker that triggers a leak every 1/rate second.
- The leak method removes expired elements from the bucket, simulating the leaky bucket behavior.
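A note on the ticker: Mono.delay(...).repeat() works, but it never exposes a handle to stop the loop. Below is a minimal standalone sketch of the same periodic leak using Flux.interval, which returns a Disposable you can dispose of when the filter is no longer needed. The class name LeakTickerSketch and the stubbed leak() method are placeholders for illustration only.
import java.time.Duration;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;

public class LeakTickerSketch {

    // Stand-in for the filter's leak() method.
    private static void leak() {
        System.out.println("leak tick");
    }

    public static void main(String[] args) throws InterruptedException {
        int rate = 2; // leaks per second (example value)

        // Flux.interval emits 0, 1, 2, ... on a parallel scheduler at a fixed period,
        // equivalent to Mono.delay(...).repeat() but with a handle for cleanup.
        Disposable ticker = Flux.interval(Duration.ofMillis(1000L / rate))
                .subscribe(tick -> leak());

        Thread.sleep(3000);   // let a few ticks fire
        ticker.dispose();     // stop the ticker when it is no longer needed
    }
}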
I’m using ConcurrentLinkedQueue because it is a non-blocking, thread-safe implementation of the Queue interface provided in the java.util.concurrent package. It's specifically designed for scenarios where multiple threads need to access and modify a queue concurrently.
Here are some key characteristics and features of ConcurrentLinkedQueue:
- Non-blocking Operations: As a “concurrent” collection, ConcurrentLinkedQueue is designed to allow multiple threads to perform operations concurrently without the need for explicit locking. It achieves this through the use of lock-free algorithms and techniques.
- FIFO Ordering: Like other queue implementations, ConcurrentLinkedQueue maintains the First-In-First-Out (FIFO) ordering of elements. Elements are added at the tail of the queue and removed from the head.
- Scalability: It is designed for good scalability in high-concurrency scenarios. Multiple threads can concurrently perform operations such as add, poll, and peek without blocking each other.
- No Size Limit: Unlike some other concurrent collections, ConcurrentLinkedQueue does not have a fixed capacity. It can grow dynamically as elements are added.
- Iterators: Iterators provided by ConcurrentLinkedQueue are weakly consistent. They reflect some snapshot of the queue's state at some point in time and may not show the effects of concurrent modifications.
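To make those properties concrete, here is a small standalone sketch; the class name QueueSketch is just a placeholder. Two threads add timestamps to the same ConcurrentLinkedQueue without any explicit locking, and peek()/poll() operate on the head in FIFO order.
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class QueueSketch {

    public static void main(String[] args) throws InterruptedException {
        Queue<Long> bucket = new ConcurrentLinkedQueue<>();

        // Two writer threads add timestamps concurrently, no explicit locking needed.
        Runnable writer = () -> {
            for (int i = 0; i < 1_000; i++) {
                bucket.add(System.currentTimeMillis());
            }
        };
        Thread t1 = new Thread(writer);
        Thread t2 = new Thread(writer);
        t1.start();
        t2.start();
        t1.join();
        t2.join();

        // The queue grew dynamically; no capacity had to be declared up front.
        System.out.println("size after concurrent adds: " + bucket.size()); // 2000

        // peek() inspects the head without removing it; poll() removes it (FIFO).
        Long head = bucket.peek();
        Long removed = bucket.poll();
        System.out.println(head.equals(removed)); // true
    }
}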
Finally, we have to add a configuration class that registers the filter as a bean to make it work.
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.server.WebFilter;
@Configuration
public class WebConfig {

    @Bean
    public WebFilter rateLimiterFilter() {
        return new RateLimiterFilter(3, 1); // Adjust capacity and rate as needed
    }
}
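To sanity-check the wiring, a test along these lines could be used, assuming JUnit 5 and spring-test (WebTestClient) are on the classpath; the test class name and the /ping path are arbitrary. With a capacity of 3, the first three requests pass and a fourth immediate request is rejected with 429.
import org.junit.jupiter.api.Test;
import org.springframework.http.HttpStatus;
import org.springframework.test.web.reactive.server.WebTestClient;

class RateLimiterFilterSketchTest {

    @Test
    void fourthRequestIsRejected() {
        // Bind a trivial WebHandler that just completes the response,
        // with the rate limiter applied as a WebFilter.
        WebTestClient client = WebTestClient
                .bindToWebHandler(exchange -> exchange.getResponse().setComplete())
                .webFilter(new RateLimiterFilter(3, 1))
                .build();

        // The first three requests fit within the capacity of 3 ...
        for (int i = 0; i < 3; i++) {
            client.get().uri("/ping").exchange().expectStatus().isOk();
        }

        // ... the fourth immediate request exceeds it and gets 429,
        // until the leak removes entries older than one second.
        client.get().uri("/ping").exchange()
                .expectStatus().isEqualTo(HttpStatus.TOO_MANY_REQUESTS);
    }
}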
Capacity: represents the maximum number of tokens (or requests) that the leaky bucket can hold at any given time.
- Role in Rate Limiting: The bucket can accumulate tokens up to its capacity. When a request arrives, a token is added to the bucket. If the bucket is already at its capacity, the incoming request might be subject to a rate-limiting policy (e.g., delayed, rejected, or handled differently).
Rate: defines how quickly the leaky bucket releases or “leaks” tokens. It is often expressed as the number of tokens allowed per unit of time.
- Role in Regulating Requests: The rate determines how frequently new tokens are added to the bucket. For example, if the rate is set to 10 tokens per second, a new token will be added every 100 milliseconds. This controls the rate at which requests are allowed to be processed.
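As a quick worked example of that relationship (the class name LeakIntervalSketch and the chosen values are only illustrative):
public class LeakIntervalSketch {

    public static void main(String[] args) {
        int capacity = 3; // bucket holds at most 3 outstanding requests
        int rate = 10;    // tokens leaked per second (example value)

        // At 10 tokens per second, one token leaks every 100 milliseconds.
        long leakIntervalMs = 1000L / rate;
        System.out.println("one token leaks every " + leakIntervalMs + " ms");

        // A completely full bucket drains after capacity * leakIntervalMs,
        // i.e. 300 ms with these values.
        System.out.println("full bucket drains in " + (capacity * leakIntervalMs) + " ms");
    }
}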
Thanks for reading.
#NeverEndingImprovement
Reference:
https://www.baeldung.com/java-queue-linkedblocking-concurrentlinked