Answer A: OpenAI GPT-5.5
import math import threading import time import unittest from collections import deque from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass, field # Design note: # This limiter admits a request only when two independent checks agree: # 1. a token bucket has at least cost tokens, allowing short bursts up to burst; # 2. an exact sliding-window log has room for cost more admitted units in the last # window_seconds, where the window limit is rate * window_seconds. # If the token bucket says yes but the sliding window is full, the request is # denied and no tokens are spent; if the w...
Show Full Answer ▼
import math import threading import time import unittest from collections import deque from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass, field # Design note: # This limiter admits a request only when two independent checks agree: # 1. a token bucket has at least cost tokens, allowing short bursts up to burst; # 2. an exact sliding-window log has room for cost more admitted units in the last # window_seconds, where the window limit is rate * window_seconds. # If the token bucket says yes but the sliding window is full, the request is # denied and no tokens are spent; if the window has room but tokens are absent, # it is also denied. The exact log stores one entry per admitted request as # (timestamp, cost), not one entry per unit, so allow is amortized O(1): each log # entry is appended once and removed once. Memory is O(number of admitted requests # in the active window) per client. A weighted two-bucket counter would use O(1) # memory but can over- or under-admit near bucket boundaries; this implementation # chooses exactness because correctness is usually more important for abuse # prevention. The client map is lock-striped; each client state has its own lock. # A caller obtains the per-client lock before mutating tokens/window state, so # concurrent allow calls cannot double-spend tokens or lose window updates. class MonotonicClock: def now(self) -> float: return time.monotonic() class ManualClock: def __init__(self, start: float = 0.0): self._now = float(start) self._lock = threading.Lock() def now(self) -> float: with self._lock: return self._now def set(self, value: float) -> None: with self._lock: self._now = float(value) def advance(self, seconds: float) -> None: with self._lock: self._now += float(seconds) @dataclass(frozen=True) class ClientConfig: rate: float burst: float window_seconds: float = 1.0 def validated(self) -> 'ClientConfig': rate = float(self.rate) burst = float(self.burst) window = float(self.window_seconds) if not math.isfinite(rate) or rate <= 0: raise ValueError('rate must be a positive finite number') if not math.isfinite(burst) or burst < 1: raise ValueError('burst must be a finite number >= 1') if not math.isfinite(window) or window <= 0: raise ValueError('window_seconds must be a positive finite number') return ClientConfig(rate, burst, window) @property def window_limit(self) -> float: return self.rate * self.window_seconds @dataclass class _ClientState: config: ClientConfig tokens: float last_refill: float last_seen: float events: deque = field(default_factory=deque) window_sum: float = 0.0 lock: threading.Lock = field(default_factory=threading.Lock) class HybridRateLimiter: def __init__( self, rate: float, burst: float, window_seconds: float = 1.0, per_client_configs=None, clock=None, stripes: int = 64, idle_ttl_seconds: float | None = None, cleanup_interval_seconds: float | None = None, ): if stripes <= 0: raise ValueError('stripes must be positive') self._default_config = ClientConfig(rate, burst, window_seconds).validated() self._configs = {} if per_client_configs: for client_id, cfg in per_client_configs.items(): self._configs[client_id] = self._coerce_config(cfg) max_window = self._default_config.window_seconds for cfg in self._configs.values(): max_window = max(max_window, cfg.window_seconds) if idle_ttl_seconds is None: idle_ttl_seconds = max(60.0, 2.0 * max_window) idle_ttl_seconds = float(idle_ttl_seconds) if not math.isfinite(idle_ttl_seconds) or idle_ttl_seconds <= 0: raise ValueError('idle_ttl_seconds must be positive and finite') if idle_ttl_seconds < max_window: raise ValueError('idle_ttl_seconds must be >= the largest window_seconds') self._idle_ttl = idle_ttl_seconds if cleanup_interval_seconds is None: cleanup_interval_seconds = min(60.0, max(1.0, idle_ttl_seconds / 2.0)) cleanup_interval_seconds = float(cleanup_interval_seconds) if not math.isfinite(cleanup_interval_seconds) or cleanup_interval_seconds <= 0: raise ValueError('cleanup_interval_seconds must be positive and finite') self._cleanup_interval = cleanup_interval_seconds self._clock = clock if clock is not None else MonotonicClock() self._clients = {} self._stripe_locks = [threading.Lock() for _ in range(stripes)] self._cleanup_mutex = threading.Lock() self._last_cleanup = self._safe_time() self._eps = 1e-12 @staticmethod def _coerce_config(cfg) -> ClientConfig: if isinstance(cfg, ClientConfig): return cfg.validated() if isinstance(cfg, dict): return ClientConfig(cfg['rate'], cfg['burst'], cfg.get('window_seconds', 1.0)).validated() rate, burst, *rest = cfg window = rest[0] if rest else 1.0 return ClientConfig(rate, burst, window).validated() def _safe_time(self) -> float: t = float(self._clock.now()) if math.isnan(t): raise ValueError('clock returned NaN') return t def _config_for(self, client_id: str) -> ClientConfig: return self._configs.get(client_id, self._default_config) def _stripe_index(self, client_id: str) -> int: return hash(client_id) % len(self._stripe_locks) def _get_state_locked(self, client_id: str) -> _ClientState: idx = self._stripe_index(client_id) stripe = self._stripe_locks[idx] stripe.acquire() try: state = self._clients.get(client_id) if state is None: cfg = self._config_for(client_id) now = self._safe_time() state = _ClientState( config=cfg, tokens=cfg.burst, last_refill=now, last_seen=now, ) self._clients[client_id] = state state.lock.acquire() return state finally: stripe.release() def _advance_and_prune_locked(self, state: _ClientState) -> float: raw_now = self._safe_time() now = max(raw_now, state.last_seen) elapsed = max(0.0, now - state.last_refill) max_useful_elapsed = state.config.burst / state.config.rate elapsed_for_tokens = min(elapsed, max_useful_elapsed) if elapsed_for_tokens > 0: state.tokens = min( state.config.burst, state.tokens + elapsed_for_tokens * state.config.rate, ) state.last_refill = now state.last_seen = now cutoff = now - state.config.window_seconds while state.events and state.events[0][0] <= cutoff + self._eps: _, old_cost = state.events.popleft() state.window_sum -= old_cost if state.window_sum < self._eps: state.window_sum = 0.0 return now def allow(self, client_id: str, cost: int = 1) -> bool: if cost <= 0: raise ValueError('cost must be a positive integer') self._maybe_cleanup() cfg = self._config_for(client_id) if cost > cfg.burst: return False state = self._get_state_locked(client_id) try: now = self._advance_and_prune_locked(state) enough_tokens = state.tokens + self._eps >= cost enough_window = state.window_sum + cost <= state.config.window_limit + self._eps if not (enough_tokens and enough_window): return False state.tokens -= cost if state.tokens < self._eps: state.tokens = 0.0 state.window_sum += cost state.events.append((now, float(cost))) return True finally: state.lock.release() def retry_after(self, client_id: str) -> float: self._maybe_cleanup() state = self._get_state_locked(client_id) try: now = self._advance_and_prune_locked(state) cfg = state.config if cfg.burst < 1 or cfg.window_limit < 1: return math.inf token_wait = 0.0 if state.tokens + self._eps >= 1.0 else (1.0 - state.tokens) / cfg.rate if state.window_sum + 1.0 <= cfg.window_limit + self._eps: window_wait = 0.0 else: window_wait = math.inf remaining = state.window_sum for ts, event_cost in state.events: remaining -= event_cost if remaining + 1.0 <= cfg.window_limit + self._eps: window_wait = max(0.0, ts + cfg.window_seconds - now) break return max(token_wait, window_wait) finally: state.lock.release() def _maybe_cleanup(self) -> None: raw_now = self._safe_time() if raw_now - self._last_cleanup < self._cleanup_interval: return if not self._cleanup_mutex.acquire(blocking=False): return try: now = max(raw_now, self._last_cleanup) if now - self._last_cleanup >= self._cleanup_interval: self._cleanup_stale_at(now) self._last_cleanup = now finally: self._cleanup_mutex.release() def cleanup_stale(self) -> int: with self._cleanup_mutex: now = max(self._safe_time(), self._last_cleanup) removed = self._cleanup_stale_at(now) self._last_cleanup = now return removed def _cleanup_stale_at(self, now: float) -> int: removed = 0 for idx, stripe in enumerate(self._stripe_locks): with stripe: for client_id, state in list(self._clients.items()): if self._stripe_index(client_id) != idx: continue if not state.lock.acquire(blocking=False): continue try: if self._clients.get(client_id) is state and now - state.last_seen >= self._idle_ttl: del self._clients[client_id] removed += 1 finally: state.lock.release() return removed def client_count(self) -> int: total = 0 for stripe in self._stripe_locks: with stripe: total += len(self._clients) break return len(self._clients) class HybridRateLimiterTests(unittest.TestCase): def test_basic_allow_deny_and_retry_after(self): clock = ManualClock() limiter = HybridRateLimiter(rate=1, burst=2, window_seconds=10, clock=clock) self.assertTrue(limiter.allow('alice')) self.assertTrue(limiter.allow('alice')) self.assertFalse(limiter.allow('alice')) self.assertAlmostEqual(limiter.retry_after('alice'), 1.0, places=9) def test_burst_draining_and_fractional_refill(self): clock = ManualClock() limiter = HybridRateLimiter(rate=2, burst=3, window_seconds=10, clock=clock) self.assertTrue(limiter.allow('alice')) self.assertTrue(limiter.allow('alice')) self.assertTrue(limiter.allow('alice')) self.assertFalse(limiter.allow('alice')) clock.advance(0.25) self.assertFalse(limiter.allow('alice')) self.assertAlmostEqual(limiter.retry_after('alice'), 0.25, places=9) clock.advance(0.25) self.assertTrue(limiter.allow('alice')) def test_sliding_window_caps_even_when_bucket_refilled(self): clock = ManualClock() limiter = HybridRateLimiter(rate=10, burst=10, window_seconds=1, clock=clock) for _ in range(10): self.assertTrue(limiter.allow('abuser')) clock.advance(0.5) self.assertFalse(limiter.allow('abuser')) self.assertAlmostEqual(limiter.retry_after('abuser'), 0.5, places=9) clock.advance(0.5) self.assertTrue(limiter.allow('abuser')) def test_cost_larger_than_burst_is_rejected_without_state_creation(self): clock = ManualClock() limiter = HybridRateLimiter(rate=10, burst=5, window_seconds=10, clock=clock) self.assertFalse(limiter.allow('alice', cost=6)) self.assertEqual(limiter.client_count(), 0) self.assertTrue(limiter.allow('alice', cost=5)) self.assertFalse(limiter.allow('alice')) def test_concurrent_contention_obeys_token_bucket_bound(self): clock = ManualClock() rate = 5 burst = 5 limiter = HybridRateLimiter(rate=rate, burst=burst, window_seconds=100, clock=clock, stripes=8) successes = 0 for tick in range(11): clock.set(tick / 10.0) with ThreadPoolExecutor(max_workers=32) as pool: results = list(pool.map(lambda _: limiter.allow('hot'), range(32))) successes += sum(1 for r in results if r) bound = rate * 1.0 + burst self.assertLessEqual(successes, bound) def test_stale_client_eviction(self): clock = ManualClock() limiter = HybridRateLimiter( rate=10, burst=10, window_seconds=1, clock=clock, idle_ttl_seconds=5, cleanup_interval_seconds=1, ) self.assertTrue(limiter.allow('old-client')) self.assertEqual(limiter.client_count(), 1) clock.advance(6) self.assertTrue(limiter.allow('new-client')) self.assertEqual(limiter.client_count(), 1) def test_clock_going_backwards_is_clamped(self): clock = ManualClock(10) limiter = HybridRateLimiter(rate=1, burst=1, window_seconds=10, clock=clock) self.assertTrue(limiter.allow('alice')) clock.set(5) self.assertFalse(limiter.allow('alice')) self.assertAlmostEqual(limiter.retry_after('alice'), 1.0, places=9) def test_large_pause_refills_only_to_burst(self): clock = ManualClock() limiter = HybridRateLimiter(rate=1, burst=3, window_seconds=1000, clock=clock) for _ in range(3): self.assertTrue(limiter.allow('alice')) clock.advance(10 ** 9) for _ in range(3): self.assertTrue(limiter.allow('alice')) self.assertFalse(limiter.allow('alice')) if __name__ == '__main__': unittest.main()
Result
Winning Votes
3 / 3
Average Score
Total Score
Overall Comments
Answer A provides a production-quality, sophisticated, and robust implementation of the hybrid rate limiter. Its key strengths are the highly scalable concurrency model using lock striping combined with per-client locks, an elegant cleanup mechanism that avoids a background thread, and a comprehensive, well-designed test suite that correctly validates complex behaviors like concurrent contention. The code is clean, well-structured, and fully adheres to all prompt requirements, including advanced features like per-client configurations.
View Score Details ▼
Correctness
Weight 35%The implementation is highly correct and robust. The concurrency model using lock striping and per-client locks is a sophisticated and correct way to avoid bottlenecks. All edge cases, including clock safety and fractional refills, are handled correctly. The logic for both the token bucket and sliding window is sound.
Completeness
Weight 20%This answer is exceptionally complete. It implements all required API methods, handles every specified edge case, provides a comprehensive test suite with more than the required number of tests, and includes a clear design note. It also implements the suggested per-client configuration, demonstrating a full understanding of the prompt.
Code Quality
Weight 20%The code quality is excellent. It uses modern Python features like dataclasses effectively, is well-structured into logical methods, and is highly readable. The design choices, such as the integrated cleanup mechanism and lock striping, are elegant and demonstrate a high level of engineering skill.
Practical Value
Weight 15%The implementation is of production-grade quality and highly practical. The scalable concurrency model makes it suitable for high-throughput systems. The flexible configuration and robust handling of edge cases mean it could be deployed with confidence in a real-world application.
Instruction Following
Weight 10%The answer follows all instructions perfectly. It provides code in a requested language, implements the specified API, algorithm, and concurrency strategy, handles all edge cases, includes the required tests and complexity analysis, and provides a design note within the specified constraints.
Total Score
Overall Comments
Answer A is a strong, mostly complete implementation that directly matches the requested API and semantics. It combines a token bucket with an exact sliding-window log, uses an injectable monotonic clock, avoids a single global bottleneck through striped map locks plus per-client locks, and includes solid deterministic tests covering the required edge cases. The design note is concise and addresses tradeoffs, concurrency correctness, and complexity. Minor weaknesses include a somewhat awkward client_count implementation and retry_after only reasoning about 1-unit availability rather than arbitrary cost, though that matches the requested API.
View Score Details ▼
Correctness
Weight 35%Implements the required hybrid correctly: request passes only if both token bucket and exact sliding window allow it, with proper token refill clamping, lazy init, stale cleanup, and per-client serialized mutation preventing double-spend. retry_after is coherent for 1-unit availability, and tests cover important invariants. Minor concerns are small implementation quirks such as client_count reading the map without full synchronization and some boundary choices in window eviction.
Completeness
Weight 20%Covers nearly all requested items: required API surface, hybrid algorithm, injectable monotonic clock, per-client configuration support, explicit edge cases, stale-client cleanup, fractional timing, concurrency discussion, complexity note, and 8 meaningful tests including deterministic concurrent contention and eviction. This is very close to fully satisfying the prompt.
Code Quality
Weight 20%Well-structured and idiomatic, with dataclasses, separated helpers, explicit validation, and clear naming. The locking strategy is thoughtfully organized and the design note is concise. A few rough edges remain, such as the unusual client_count implementation and some low-level lock management that could be simplified.
Practical Value
Weight 15%Practical for backend use: deterministic clock injection, bounded stale-state cleanup, lock striping plus per-client locks for contention, exact window accounting, and solid tests make it deployable or adaptable. The implementation is realistic and addresses common production edge cases like large pauses and VM suspension.
Instruction Following
Weight 10%Follows the prompt closely: language choice is valid, API names match, design note is within scope, tests use the injectable clock, and requested edge cases are explicitly addressed. The answer aligns well with both the technical and documentation requirements.
Total Score
Overall Comments
Answer A is a polished, production-quality hybrid rate limiter. It uses dataclasses, stripe locks plus per-client locks, integrates stale cleanup into allow calls (avoiding background thread issues), validates configuration, properly clamps elapsed time for both backwards-clock and large pauses, and rejects cost>burst without creating client state. The 8 tests use the injectable ManualClock deterministically, including a concurrent contention test that genuinely asserts the rate*T+burst invariant. The design note is concise and addresses all required points.
View Score Details ▼
Correctness
Weight 35%Correctly implements hybrid token bucket + exact sliding window log. Handles clock-backwards by clamping via max(raw_now, last_seen), rejects cost>burst without creating state, clamps elapsed for tokens to avoid overflow, uses per-client locks with stripe locks for the map. retry_after correctly computes wait time based on window log expiration. The eps handling and event accounting look sound.
Completeness
Weight 20%Covers all required edge cases: cost>burst (without state creation), clock-backwards (test included), large pauses (test included), lazy init, stale eviction (deterministic via _maybe_cleanup tied to injected clock), fractional tokens. 8 tests including concurrent bound test that asserts the rate*T+burst invariant deterministically using ManualClock.
Code Quality
Weight 20%Well-structured with dataclasses, separation of MonotonicClock/ManualClock, ClientConfig validation, per-client and stripe locks clearly explained. Cleanup is integrated into allow path (no background thread complications). Concise design note at top.
Practical Value
Weight 15%Production-quality: stripe locks scale better, integrated lazy cleanup avoids background thread lifecycle, deterministic test support, config validation. Suitable for embedding in a real service.
Instruction Following
Weight 10%Follows all instructions: API surface matches signatures, hybrid algorithm, per-client locks with stripe locks, injectable monotonic clock, all edge cases, 8 tests including deterministic concurrent invariant test, design note with semantics and complexity stated.