Designing a Rate Limiter in Java
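This is a production-grade rate limiter design that focuses on concurrency safety, testability, strategy flexibility, performance, and how to verify the result.
Two classic implementations are given here, along with the scenarios each one suits.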
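I. Core Requirements of a Rate Limiter
First, pin down the business scenario:
Limiting dimension: per user ID, per IP, per API path, or global?
Algorithm: fixed window, sliding window, token bucket, or leaky bucket?
Threshold unit: per second (QPS), per minute, or per hour?
Over-limit handling: reject the request, queue it, or return a degraded response?
Single node vs. distributed: a single node can keep its counters in local memory; a distributed deployment needs shared state such as Redis.
For example: "Assume a single node, limiting per key (e.g. user ID) by QPS, and rejecting excess requests outright."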
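II. Implementation 1: Sliding Window Log (most accurate, the preferred choice)
Core idea: keep a queue of request timestamps per key, and before each request evict the entries older than one second.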
```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Sliding window log rate limiter (deque-based)
 * Pros: exact window boundaries, no burst at window edges
 * Cons: stores one timestamp per admitted request (up to maxRequestsPerSecond per key);
 *       can be optimized with a ring buffer
 */
public class SlidingWindowRateLimiter {
    private static final long WINDOW_NANOS = 1_000_000_000L; // 1-second window

    private final int maxRequestsPerSecond;
    private final ConcurrentHashMap<String, Window> limiters = new ConcurrentHashMap<>();

    public SlidingWindowRateLimiter(int maxRequestsPerSecond) {
        this.maxRequestsPerSecond = maxRequestsPerSecond;
    }

    public boolean allowRequest(String key) {
        Window window = limiters.computeIfAbsent(key, k -> new Window());
        return window.tryAcquire();
    }

    private class Window {
        private final Deque<Long> timestamps = new ArrayDeque<>();
        // Every call mutates the deque, so a plain mutex suffices; a read-write lock adds nothing
        private final ReentrantLock lock = new ReentrantLock();

        boolean tryAcquire() {
            lock.lock();
            try {
                // Read the clock under the lock so timestamps enter the deque in order;
                // nanoTime is monotonic, so wall-clock (NTP) adjustments cannot distort the window
                long now = System.nanoTime();
                // Evict requests that are at least one window old
                while (!timestamps.isEmpty() && now - timestamps.peekFirst() >= WINDOW_NANOS) {
                    timestamps.pollFirst();
                }
                if (timestamps.size() < maxRequestsPerSecond) {
                    timestamps.addLast(now);
                    return true;
                }
                return false;
            } finally {
                lock.unlock();
            }
        }
    }
}
```

Where is the performance bottleneck?
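Each key has its own lock, so under high concurrency contention concentrates on hot keys.
Memory: a key's deque never holds more than maxRequestsPerSecond boxed Long timestamps, because a timestamp is only added while the size is below the limit. Memory therefore scales with limit × number of active keys, and keys are never removed from the map.
Possible optimization: replace the deque with a fixed-size ring array of primitive longs, optionally made lock-free with CAS.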
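A minimal sketch of the ring-array idea, assuming a per-key lock is acceptable (it does not attempt the CAS lock-free variant). The class name RingBufferRateLimiter and the injectable clock are illustrative assumptions. Each key stores exactly maxRequests primitive timestamps, and a request is admitted only if the slot it would overwrite, which holds the oldest of the last maxRequests admissions, is at least one window old.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongSupplier;

/**
 * Sliding window log backed by a fixed ring of primitive longs (hypothetical class name).
 * Memory per key is exactly maxRequests longs, no matter how bursty the traffic is.
 */
public class RingBufferRateLimiter {
    private final int maxRequests;
    private final long windowNanos;
    private final LongSupplier clock; // injectable so tests can control time
    private final ConcurrentHashMap<String, Ring> rings = new ConcurrentHashMap<>();

    public RingBufferRateLimiter(int maxRequests, long windowNanos) {
        this(maxRequests, windowNanos, System::nanoTime);
    }

    public RingBufferRateLimiter(int maxRequests, long windowNanos, LongSupplier clock) {
        this.maxRequests = maxRequests;
        this.windowNanos = windowNanos;
        this.clock = clock;
    }

    public boolean allowRequest(String key) {
        return rings.computeIfAbsent(key, k -> new Ring()).tryAcquire();
    }

    private final class Ring {
        private final long[] slots = new long[maxRequests];      // admitted timestamps
        private final boolean[] used = new boolean[maxRequests]; // slot written at least once?
        private int head; // next slot to overwrite = oldest of the last maxRequests admissions

        synchronized boolean tryAcquire() {
            long now = clock.getAsLong(); // read under the lock so slots stay in time order
            if (used[head] && now - slots[head] < windowNanos) {
                return false; // maxRequests admissions already fall inside the window
            }
            slots[head] = now;
            used[head] = true;
            head = (head + 1) % slots.length;
            return true;
        }
    }
}
```

Compared with the deque version, nothing is allocated per request after a key's ring is created, which also removes the boxed Long garbage.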
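III. Implementation 2: Token Bucket (the most common in production)
Core idea: tokens are added to the bucket at a fixed rate, and each request consumes one token.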
```java
import java.util.concurrent.ConcurrentHashMap;

/**
 * Token bucket rate limiter
 * Pros: tolerates bursts up to the bucket capacity, smooth long-run rate
 * Cons: slightly more complex; a full bucket releases a burst that may overload downstream services
 */
public class TokenBucketRateLimiter {
    private final int capacity;        // bucket capacity (maximum burst size)
    private final int tokensPerSecond; // refill rate
    private final ConcurrentHashMap<String, Bucket> buckets = new ConcurrentHashMap<>();

    public TokenBucketRateLimiter(int capacity, int tokensPerSecond) {
        this.capacity = capacity;
        this.tokensPerSecond = tokensPerSecond;
    }

    public boolean allowRequest(String key) {
        Bucket bucket = buckets.computeIfAbsent(key, k -> new Bucket(capacity, tokensPerSecond));
        return bucket.tryAcquire();
    }

    private static class Bucket {
        private final int capacity;
        private final double tokensPerNano;
        private double tokens;        // fractional tokens are kept, so slow rates still refill
        private long lastRefillNanos; // monotonic clock, immune to wall-clock adjustments

        Bucket(int capacity, int tokensPerSecond) {
            this.capacity = capacity;
            this.tokensPerNano = tokensPerSecond / 1_000_000_000.0;
            this.tokens = capacity; // start full
            this.lastRefillNanos = System.nanoTime();
        }

        // Refill and consume under one lock. Truncating refills to whole tokens while advancing
        // the timestamp would lose fractional tokens, and a refill that overwrites the counter
        // outside the consumers' CAS loop could erase concurrent decrements.
        synchronized boolean tryAcquire() {
            long now = System.nanoTime();
            tokens = Math.min(capacity, tokens + (now - lastRefillNanos) * tokensPerNano);
            lastRefillNanos = now;
            if (tokens >= 1.0) {
                tokens -= 1.0;
                return true;
            }
            return false;
        }
    }
}
```

IV. Testing Strategy
How do we verify that the rate limiter is correct?
Here is a test plan covering several dimensions:
1. Unit test (functional correctness)
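```java
// JUnit 5; the imports belong at the top of the test class
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import org.junit.jupiter.api.Test;

@Test
void testSlidingWindowExactLimit() throws InterruptedException {
    SlidingWindowRateLimiter limiter = new SlidingWindowRateLimiter(3);
    String key = "user1";
    // The first 3 requests should pass
    assertTrue(limiter.allowRequest(key));
    assertTrue(limiter.allowRequest(key));
    assertTrue(limiter.allowRequest(key));
    // The 4th should be rejected
    assertFalse(limiter.allowRequest(key));
    // Wait a full window plus a margin for timer granularity, then requests pass again
    Thread.sleep(1100);
    assertTrue(limiter.allowRequest(key));
}
```

2. Concurrency test (must have)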
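```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

@Test
void testConcurrentAccess() throws InterruptedException {
    SlidingWindowRateLimiter limiter = new SlidingWindowRateLimiter(100);
    String key = "concurrentTest";
    int threadCount = 200;
    CountDownLatch startGate = new CountDownLatch(1); // release all threads together to maximize contention
    CountDownLatch done = new CountDownLatch(threadCount);
    AtomicInteger successCount = new AtomicInteger();
    for (int i = 0; i < threadCount; i++) {
        new Thread(() -> {
            try {
                startGate.await();
                if (limiter.allowRequest(key)) successCount.incrementAndGet();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                done.countDown();
            }
        }).start();
    }
    startGate.countDown();
    done.await();
    assertTrue(successCount.get() <= 100); // must never exceed the threshold
}
```

3. Accuracy test (verifying the sliding window boundary)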
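```java
@Test
void testSlidingWindowBoundary() throws InterruptedException {
    SlidingWindowRateLimiter limiter = new SlidingWindowRateLimiter(5);
    String key = "boundaryTest";
    // Send 5 requests about 100 ms apart (at roughly 0, 100, 200, 300 and 400 ms)
    for (int i = 0; i < 5; i++) {
        assertTrue(limiter.allowRequest(key));
        Thread.sleep(100);
    }
    // At about 500 ms all 5 are still inside the window, so the 6th is rejected
    assertFalse(limiter.allowRequest(key));
    // At about 1100 ms the earliest request (at ~0 ms) has slid out of the window
    Thread.sleep(600);
    assertTrue(limiter.allowRequest(key));
}
```

4. Performance benchmark (must have)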
```java
@Test
void benchmarkThroughput() {
    SlidingWindowRateLimiter limiter = new SlidingWindowRateLimiter(10000);
    String key = "benchmark";
    int iterations = 1_000_000;
    long start = System.nanoTime();
    for (int i = 0; i < iterations; i++) {
        limiter.allowRequest(key);
    }
    long duration = System.nanoTime() - start;
    double opsPerSecond = iterations * 1_000_000_000.0 / duration;
    System.out.printf("Throughput: %.2f ops/sec%n", opsPerSecond);
    // Expected: > 5M ops/sec (hardware-dependent). This runs single-threaded without JIT warm-up,
    // so treat it as a rough smoke check; use JMH for reliable, multi-threaded numbers.
}
```

5. Chaos test
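```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

@Test
void testUnderExtremeLoad() throws InterruptedException {
    // Simulate a realistic scenario: many different keys arriving at the same time
    ExecutorService executor = Executors.newFixedThreadPool(100);
    SlidingWindowRateLimiter limiter = new SlidingWindowRateLimiter(100);
    long start = System.nanoTime();
    for (int i = 0; i < 1000; i++) {
        final String key = "user_" + (i % 50);
        executor.submit(() -> {
            for (int j = 0; j < 100; j++) {
                limiter.allowRequest(key);
            }
        });
    }
    executor.shutdown();
    // Finishing within the timeout rules out deadlock; the elapsed time tracks performance regressions
    assertTrue(executor.awaitTermination(30, TimeUnit.SECONDS), "tasks did not finish: possible deadlock");
    System.out.printf("Elapsed: %d ms%n", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
}
```

V. Advanced Questions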
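Q1: How do you implement distributed rate limiting?
A single-node limiter does not scale out, because each instance only sees its own traffic. In a distributed setup, use Redis plus a Lua script so that the check and the update execute atomically.
Approach:
Key format: ratelimit:{api}:{user_id}
Sliding window: one Redis ZSET per key with request timestamps as scores; remove expired entries with ZREMRANGEBYSCORE
Pair it with Sentinel for high availability
Drawback: every check costs a network round trip to Redis, so this suits lower-frequency APIs.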
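A sketch of the Redis side, assuming a client library that can run EVAL (the client call itself is left out). The Lua script, the class name RedisSlidingWindowScript and the argument layout are illustrative assumptions, not a particular library's API. The caller supplies the current time, so all nodes are assumed to have synchronized clocks; each ZSET member must be unique, otherwise two requests in the same millisecond would collapse into one entry.

```java
import java.util.UUID;

/** Hypothetical helper holding the Lua script for a Redis ZSET sliding window. */
public final class RedisSlidingWindowScript {
    // KEYS[1] = limiter key, ARGV[1] = now (ms), ARGV[2] = window (ms),
    // ARGV[3] = limit, ARGV[4] = unique member id. Returns 1 if admitted, 0 if rejected.
    public static final String LUA = String.join("\n",
        "local key = KEYS[1]",
        "local now = tonumber(ARGV[1])",
        "local window = tonumber(ARGV[2])",
        "local limit = tonumber(ARGV[3])",
        "redis.call('ZREMRANGEBYSCORE', key, 0, now - window)", // drop entries at or before window start
        "if redis.call('ZCARD', key) < limit then",
        "  redis.call('ZADD', key, now, ARGV[4])",
        "  redis.call('PEXPIRE', key, window)",                   // idle keys expire on their own
        "  return 1",
        "end",
        "return 0");

    private RedisSlidingWindowScript() {}

    /** Builds a key in the ratelimit:{api}:{user_id} format. */
    public static String key(String api, String userId) {
        return "ratelimit:" + api + ":" + userId;
    }

    /** Builds ARGV for one request; a random UUID keeps ZSET members unique. */
    public static String[] args(long nowMillis, long windowMillis, int limit) {
        return new String[] {
            Long.toString(nowMillis),
            Long.toString(windowMillis),
            Integer.toString(limit),
            nowMillis + "-" + UUID.randomUUID()
        };
    }
}
```

The script, the key and the arguments would then be passed to the client's EVAL or EVALSHA call; the exact method signature depends on the client in use.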
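Q2: How do you keep the rate limiter itself from becoming a performance bottleneck?
Lock-free design: use LongAdder counters plus a timing wheel (such as Netty's HashedWheelTimer) to reset windows. LongAdder.sum() is not an atomic snapshot, so limits enforced this way are approximate under contention.
Local quota plus asynchronous sync: in a distributed deployment, each node limits locally and periodically fetches a quota batch from Redis (e.g. every 100 ms).
Warm-up and ramp-up: make the initial token count configurable (for example, start below full capacity) so that a cold start does not instantly saturate the backend.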
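A minimal sketch of the local-quota idea, assuming a central store that hands out permits in batches. QuotaSource and InMemoryQuotaSource are hypothetical stand-ins for, say, a Redis counter reset every window. This version leases a new batch when the local one runs out rather than on a fixed 100 ms timer.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Per-node limiter that spends a locally cached batch of permits (hypothetical design). */
public class LocalQuotaRateLimiter {

    /** Hypothetical central quota store; returns how many permits were actually granted. */
    public interface QuotaSource {
        long lease(String key, long requested);
    }

    /** In-memory stand-in for the central store: a fixed pool of permits. */
    public static class InMemoryQuotaSource implements QuotaSource {
        private long remaining;

        public InMemoryQuotaSource(long total) {
            this.remaining = total;
        }

        @Override
        public synchronized long lease(String key, long requested) {
            long granted = Math.min(requested, remaining);
            remaining -= granted;
            return granted;
        }
    }

    private final QuotaSource source;
    private final String key;
    private final long batchSize;
    private final AtomicLong localPermits = new AtomicLong();

    public LocalQuotaRateLimiter(QuotaSource source, String key, long batchSize) {
        this.source = source;
        this.key = key;
        this.batchSize = batchSize;
    }

    public boolean allowRequest() {
        while (true) {
            long permits = localPermits.get();
            if (permits > 0) {
                // Fast path: lock-free decrement of the local batch, no network call
                if (localPermits.compareAndSet(permits, permits - 1)) {
                    return true;
                }
                continue; // lost a CAS race, re-read
            }
            // Slow path: one thread at a time refills from the central store
            synchronized (this) {
                if (localPermits.get() > 0) {
                    continue; // another thread already refilled
                }
                long granted = source.lease(key, batchSize);
                if (granted <= 0) {
                    return false; // central quota exhausted for now
                }
                localPermits.addAndGet(granted);
            }
        }
    }
}
```

The trade-off: permits leased by one node but left unused are unavailable to the others until the central counter resets, so the global limit can be under-used but never exceeded.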
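Q3: How would you test the rate limiter for memory leaks under high concurrency?
Write a load-test harness that runs continuously for 7 days and, every 10 minutes, records used heap as Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory() (totalMemory() alone is the heap currently reserved, not the part in use). Also take heap dumps at intervals (for example with jcmd <pid> GC.heap_dump <file>) and compare them over time.
At the same time, monitor the GC logs to confirm that neither the Deque instances nor the ConcurrentHashMap keep growing.
For inactive keys, add LRU eviction (for example by wrapping the limiters in a Guava Cache) so that memory does not grow indefinitely.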
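A standard-library sketch of the eviction idea, as an alternative to Guava's Cache: a LinkedHashMap in access order drops the least recently used key once maxKeys is exceeded. LruLimiterRegistry is a hypothetical name, and its single lock serializes lookups; Guava's CacheBuilder (maximumSize, expireAfterAccess) or Caffeine provide concurrent implementations of the same policy.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

/** Holds one limiter per key and evicts the least recently used key (hypothetical class). */
public class LruLimiterRegistry<V> {
    private final Map<String, V> map;
    private final Function<String, V> factory;

    public LruLimiterRegistry(int maxKeys, Function<String, V> factory) {
        this.factory = factory;
        // accessOrder = true: every get moves the entry to the tail, so the head is the LRU key
        this.map = new LinkedHashMap<String, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, V> eldest) {
                return size() > maxKeys; // evict after an insert pushes us over the cap
            }
        };
    }

    /** Returns the limiter for key, creating it on first use. */
    public synchronized V get(String key) {
        return map.computeIfAbsent(key, factory);
    }

    public synchronized boolean contains(String key) {
        return map.containsKey(key);
    }

    public synchronized int keyCount() {
        return map.size();
    }
}
```

Evicting a key discards its window state, so a returning key starts fresh; this is harmless for genuinely idle keys, which is why the cap should comfortably exceed the number of active keys.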
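VI. Final Deliverables Checklist
| Aspect | Key points |
|---|---|
| Interface | boolean allowRequest(String key) |
| Algorithm | Sliding window (exact) / token bucket (allows controlled bursts) |
| Concurrency | Per-key lock (e.g. ReentrantLock) / lock-free CAS |
| Memory | Automatic cleanup of expired keys + LRU eviction |
| Observability | Expose metrics (QPS, rejection count, available tokens) |
| Testability | Unit tests, concurrency tests, performance benchmarks, chaos tests |
| Configurability | Threshold, window size, algorithm type (strategy pattern) |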
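A sketch of how the observability and configurability rows might fit together, assuming a small RateLimiter interface (a hypothetical name) that each algorithm implements as a strategy; a decorator then counts outcomes without touching the algorithms. The counters are LongAdder, so recording stays cheap under contention; QPS can be derived by sampling allowedCount() periodically, and exporting to a metrics system is left out.

```java
import java.util.concurrent.atomic.LongAdder;

/** Hypothetical strategy interface: sliding window, token bucket, etc. would implement it. */
interface RateLimiter {
    boolean allowRequest(String key);
}

/** Decorator that records allowed/rejected counts for any RateLimiter strategy. */
public class MeteredRateLimiter implements RateLimiter {
    private final RateLimiter delegate;
    private final LongAdder allowed = new LongAdder();
    private final LongAdder rejected = new LongAdder();

    public MeteredRateLimiter(RateLimiter delegate) {
        this.delegate = delegate;
    }

    @Override
    public boolean allowRequest(String key) {
        boolean ok = delegate.allowRequest(key);
        (ok ? allowed : rejected).increment(); // one counter per outcome
        return ok;
    }

    public long allowedCount() {
        return allowed.sum();
    }

    public long rejectedCount() {
        return rejected.sum();
    }
}
```

Switching algorithms then means passing a different RateLimiter into the decorator, and the metrics code stays the same.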
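"A rate limiter may look simple, but it is the first line of defense for microservices. It must not only be logically correct but also stay stable under sudden traffic spikes such as Singles' Day (Double 11), while giving business teams fine-grained monitoring data for tuning thresholds dynamically."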
