Implement Leaky Bucket Rate Limiter in Spring Webflux

Denny Lesmana
3 min read · Dec 4, 2023

Case: implement a middleware that rate-limits incoming requests.

We can use a WebFilter to intercept each incoming request and check whether it is allowed through or has already hit the limit.

import java.time.Duration;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.HttpStatus;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilter;
import org.springframework.web.server.WebFilterChain;
import reactor.core.publisher.Mono;

@Slf4j
public class RateLimiterFilter implements WebFilter {

    private final Queue<Long> bucket;
    private final int capacity;
    private final int rate;

    public RateLimiterFilter(int capacity, int rate) {
        this.capacity = capacity;
        this.rate = rate;
        this.bucket = new ConcurrentLinkedQueue<>();
        startLeak();
    }

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
        // Let the request continue down the chain if there is room in the bucket,
        // otherwise short-circuit with a 429 response.
        return processRequest()
                .then(chain.filter(exchange))
                .onErrorResume(throwable -> handleRateLimitExceeded(exchange));
    }

    private Mono<Void> processRequest() {
        return Mono.defer(() -> {
            if (bucket.size() >= capacity) {
                return Mono.error(new RuntimeException("Rate limit exceeded"));
            }
            // Record the request's arrival time in the bucket.
            bucket.add(System.currentTimeMillis());
            return Mono.empty();
        });
    }

    private void startLeak() {
        // Trigger a leak every 1/rate second.
        Mono.delay(Duration.ofMillis(1000L / rate))
                .repeat()
                .subscribe(tick -> leak());
    }

    private void leak() {
        // Drop entries older than one second, freeing up capacity for new requests.
        long currentTime = System.currentTimeMillis();
        bucket.removeIf(timestamp -> currentTime - timestamp > 1000);
    }

    private Mono<Void> handleRateLimitExceeded(ServerWebExchange exchange) {
        log.warn("Rate limit exceeded, rejecting request");
        ServerHttpResponse response = exchange.getResponse();
        response.setStatusCode(HttpStatus.TOO_MANY_REQUESTS);
        return response.setComplete();
    }
}
  • The filter method first runs processRequest() and, if it completes successfully, continues with the rest of the chain via then(chain.filter(exchange)).
  • The processRequest method returns an empty Mono when there is still room in the bucket, or a Mono.error when the bucket has reached its capacity.
  • The handleRateLimitExceeded method is only invoked when the rate limit is exceeded: onErrorResume routes the error from processRequest there, and the response is completed with HTTP 429 Too Many Requests.
  • The startLeak method starts a scheduler that triggers a leak every 1/rate second.
  • The leak method removes expired entries from the bucket, simulating the leaky bucket draining over time.

I’m using ConcurrentLinkedQueue because it is a non-blocking, thread-safe implementation of the Queue interface provided in the java.util.concurrent package. It’s specifically designed for scenarios where multiple threads need to access and modify a queue concurrently (a small usage sketch follows the list below).

Here are some key characteristics and features of ConcurrentLinkedQueue:

  1. Non-blocking Operations: As a “concurrent” collection, ConcurrentLinkedQueue is designed to allow multiple threads to perform operations concurrently without the need for explicit locking. It achieves this through the use of lock-free algorithms and techniques.
  2. FIFO Ordering: Like other queue implementations, ConcurrentLinkedQueue maintains the First-In-First-Out (FIFO) ordering of elements. Elements are added at the tail of the queue and removed from the head.
  3. Scalability: It is designed for good scalability in high-concurrency scenarios. Multiple threads can concurrently perform operations such as add, poll, and peek without blocking each other.
  4. No Size Limit: Unlike some other concurrent collections, ConcurrentLinkedQueue does not have a fixed capacity. It can grow dynamically as elements are added.
  5. Iterators: Iterators provided by ConcurrentLinkedQueue are weakly consistent. They reflect some snapshot of the queue's state at some point in time and may not show the effects of concurrent modifications.
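
To make the non-blocking behavior concrete, here is a small standalone sketch (my own illustration, not from the original article): a few worker threads add timestamps while another removes stale ones, roughly mirroring what processRequest() and leak() do concurrently.

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ConcurrentLinkedQueueDemo {

    public static void main(String[] args) throws InterruptedException {
        Queue<Long> bucket = new ConcurrentLinkedQueue<>();
        ExecutorService pool = Executors.newFixedThreadPool(4);

        // Three "request" threads add timestamps, like processRequest() does.
        for (int i = 0; i < 3; i++) {
            pool.submit(() -> {
                bucket.add(System.currentTimeMillis());
            });
        }

        // One "leak" thread drops entries older than a second, like leak() does.
        pool.submit(() -> {
            long now = System.currentTimeMillis();
            bucket.removeIf(timestamp -> now - timestamp > 1000);
        });

        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.SECONDS);

        // No explicit locking was needed anywhere above.
        System.out.println("Entries currently in the bucket: " + bucket.size());
    }
}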

Finally, we have to add a configuration class to register the filter as a bean.

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.server.WebFilter;

@Configuration
public class WebConfig {

    @Bean
    public WebFilter rateLimiterFilter() {
        return new RateLimiterFilter(3, 1); // Adjust capacity and rate as needed
    }
}
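
If you prefer not to hard-code the numbers, one possible variation (my own sketch, not from the original article; the ratelimiter.capacity and ratelimiter.rate property names are made up) is to inject them from application properties:

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.server.WebFilter;

@Configuration
public class WebConfig {

    @Bean
    public WebFilter rateLimiterFilter(
            // Hypothetical property names; the defaults match the hard-coded values above.
            @Value("${ratelimiter.capacity:3}") int capacity,
            @Value("${ratelimiter.rate:1}") int rate) {
        return new RateLimiterFilter(capacity, rate);
    }
}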

Capacity: represents the maximum number of tokens (or requests) that the leaky bucket can hold at any given time.

  • Role in limiting bursts: the bucket can accumulate entries up to its capacity. When a request arrives, an entry is added to the bucket. If the bucket is already full, the incoming request is subject to the rate-limiting policy (in our filter it is rejected with 429 Too Many Requests; other policies might delay or queue it).

Rate: defines how quickly the leaky bucket releases or “leaks” tokens. It is often expressed as the number of tokens allowed per unit of time.

  • Role in Regulating Requests: the rate determines how frequently the bucket is drained. For example, if the rate is set to 10 per second, the bucket leaks every 100 milliseconds. This controls the pace at which requests are allowed to be processed over time.
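
To see both knobs in action, here is a minimal test sketch (my own, not from the original article, assuming JUnit 5 and spring-test's WebTestClient are on the classpath). It binds the filter to a trivial handler with capacity 3 and rate 1: the first three requests succeed and the fourth, arriving before any leak, is rejected with 429.

import org.junit.jupiter.api.Test;
import org.springframework.http.HttpStatus;
import org.springframework.test.web.reactive.server.WebTestClient;

class RateLimiterFilterTest {

    @Test
    void fourthRequestWithinOneSecondIsRejected() {
        // Bind the filter to a handler that simply answers 200 OK.
        WebTestClient client = WebTestClient
                .bindToWebHandler(exchange -> {
                    exchange.getResponse().setStatusCode(HttpStatus.OK);
                    return exchange.getResponse().setComplete();
                })
                .webFilter(new RateLimiterFilter(3, 1))
                .build();

        // The bucket has room for 3 entries, so these pass.
        for (int i = 0; i < 3; i++) {
            client.get().uri("/ping").exchange().expectStatus().isOk();
        }

        // Assuming all four calls happen within one second, nothing has leaked yet,
        // so the bucket is full and this request is rejected.
        client.get().uri("/ping").exchange()
                .expectStatus().isEqualTo(HttpStatus.TOO_MANY_REQUESTS);
    }
}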

Thanks for reading.

#NeverEndingImprovement

Reference:

https://www.baeldung.com/java-queue-linkedblocking-concurrentlinked

