Orivel Orivel
Open menu

Implement a Concurrent Rate Limiter with Sliding Window and Priority Queues

Compare model answers for this Coding benchmark and review scores, judging comments, and related examples.

Login or register to use likes and favorites. Register

X f L

Contents

Task Overview

Benchmark Genres

Coding

Task Creator Model

Answering Models

Judge Models

Task Prompt

Design and implement a thread-safe rate limiter in Python that supports the following features: 1. **Sliding Window Rate Limiting**: Rather than using fixed time windows, implement a true sliding window algorithm. Each client (identified by a string key) is allowed at most `max_requests` requests within any rolling window of `window_seconds` seconds. 2. **Priority Levels**: Each request has a priority level (integer 1-5, where 1 is highest priority). When the rate limit is reached for a client, lower-priority req...

Show more

Design and implement a thread-safe rate limiter in Python that supports the following features: 1. **Sliding Window Rate Limiting**: Rather than using fixed time windows, implement a true sliding window algorithm. Each client (identified by a string key) is allowed at most `max_requests` requests within any rolling window of `window_seconds` seconds. 2. **Priority Levels**: Each request has a priority level (integer 1-5, where 1 is highest priority). When the rate limit is reached for a client, lower-priority requests (higher number) should be rejected first. Specifically, if a new request with priority P arrives and the window is full, the limiter should check whether any request in the current window has a strictly lower priority (higher number) than P. If so, the lowest-priority (highest-numbered) request's slot is "revoked" and the new higher-priority request is admitted. The revoked request should be recorded so it can be reported. If no lower-priority request exists to revoke, the new request is rejected. 3. **Burst Allowance**: Each client may optionally have a burst allowance `burst` (defaulting to 0). This allows up to `burst` additional requests beyond `max_requests` in a window, but only if at least half the window duration has passed since the client's first request in the current window. 4. **Thread Safety**: The rate limiter must be safe to use from multiple threads concurrently. Demonstrate this with a test scenario. 5. **Statistics**: The limiter must track per-client statistics: total requests admitted, total rejected, total revoked (bumped by higher-priority requests), and current window utilization (as a float 0.0 to 1.0). Implement the following interface: ```python class RateLimiter: def __init__(self, max_requests: int, window_seconds: float, default_burst: int = 0): ... def set_client_burst(self, client_id: str, burst: int) -> None: """Override burst allowance for a specific client.""" ... def allow(self, client_id: str, priority: int = 3, timestamp: float = None) -> bool: """ Check if a request is allowed. If timestamp is None, use current time. Returns True if the request is admitted, False if rejected. """ ... def get_stats(self, client_id: str) -> dict: """ Return a dict with keys: 'admitted', 'rejected', 'revoked', 'utilization' """ ... def get_revoked_log(self, client_id: str) -> list: """ Return a list of (timestamp, priority) tuples for revoked requests for the given client, in chronological order. """ ... ``` Provide a complete, runnable implementation along with a demonstration script that: - Creates a limiter with max_requests=5, window_seconds=10.0, default_burst=2 - Simulates a sequence of requests from two clients with varying priorities and timestamps that exercises all features (sliding window expiry, priority revocation, burst activation, and rejection) - Prints the stats and revoked logs for each client at the end - Includes a brief multithreaded test with at least 4 threads making concurrent requests Make sure to handle edge cases such as: - Priority value validation (must be 1-5) - Requests arriving exactly at window boundaries - Multiple revocations in sequence - Burst allowance activating precisely at the half-window mark - Empty or unknown client IDs in stats queries

Judging Policy

A strong answer should provide a complete, runnable Python implementation that correctly handles all five features. Key quality indicators include: (1) Correctness of the sliding window algorithm — requests should expire based on their individual timestamps, not fixed buckets. (2) Correct priority revocation logic — when the window is full and a higher-priority request arrives, the lowest-priority existing request should be revoked and replaced. (3) Burst allowance should only activate after half the window has ela...

Show more

A strong answer should provide a complete, runnable Python implementation that correctly handles all five features. Key quality indicators include: (1) Correctness of the sliding window algorithm — requests should expire based on their individual timestamps, not fixed buckets. (2) Correct priority revocation logic — when the window is full and a higher-priority request arrives, the lowest-priority existing request should be revoked and replaced. (3) Burst allowance should only activate after half the window has elapsed since the first request in the current window. (4) Thread safety should use appropriate synchronization primitives (locks) and the multithreaded demo should be meaningful, not trivial. (5) Statistics should be accurate and consistent. (6) Edge cases should be handled gracefully — invalid priorities should raise errors or be handled, unknown clients should return sensible defaults, and boundary timestamps should work correctly. (7) The demonstration script should clearly exercise all features and produce understandable output. (8) Code quality matters: clear structure, good naming, appropriate comments, and clean separation of concerns. Solutions that use a heap or sorted structure for efficient priority revocation are preferred over naive linear scans, though correctness is more important than optimization.

Answers

import time import threading import random from collections import deque from dataclasses import dataclass, field from typing import List, Tuple, Dict, Optional @dataclass class ClientState: """Holds all state for a single client.""" lock: threading.Lock = field(default_factory=threading.Lock, repr=False) requests: deque[Tuple[float, int]] = field(default_factory=deque) burst: int = 0 stats: Dict[str, int] = field(default_factory=lambda: {"admitted": 0, "rejected": 0, "revoked": 0}) revoked_log: List[Tuple[float, int]] = field(default_factory=list) class RateLimiter: """ A thread...

Show Full Answer

import time import threading import random from collections import deque from dataclasses import dataclass, field from typing import List, Tuple, Dict, Optional @dataclass class ClientState: """Holds all state for a single client.""" lock: threading.Lock = field(default_factory=threading.Lock, repr=False) requests: deque[Tuple[float, int]] = field(default_factory=deque) burst: int = 0 stats: Dict[str, int] = field(default_factory=lambda: {"admitted": 0, "rejected": 0, "revoked": 0}) revoked_log: List[Tuple[float, int]] = field(default_factory=list) class RateLimiter: """ A thread-safe rate limiter implementing a sliding window algorithm with priority queues and burst allowances. """ def __init__(self, max_requests: int, window_seconds: float, default_burst: int = 0): if max_requests <= 0 or window_seconds <= 0: raise ValueError("max_requests and window_seconds must be positive.") self.max_requests = max_requests self.window_seconds = window_seconds self.default_burst = default_burst self._clients: Dict[str, ClientState] = {} self._clients_lock = threading.Lock() def _get_or_create_client(self, client_id: str) -> ClientState: """Atomically get or create a client's state object.""" if client_id not in self._clients: with self._clients_lock: # Double-check locking pattern to avoid race conditions on creation if client_id not in self._clients: self._clients[client_id] = ClientState(burst=self.default_burst) return self._clients[client_id] def set_client_burst(self, client_id: str, burst: int) -> None: """Override burst allowance for a specific client.""" client = self._get_or_create_client(client_id) with client.lock: client.burst = burst def allow(self, client_id: str, priority: int = 3, timestamp: Optional[float] = None) -> bool: """ Check if a request is allowed. If timestamp is None, use current time. Returns True if the request is admitted, False if rejected. """ if not 1 <= priority <= 5: raise ValueError("Priority must be an integer between 1 and 5.") now = timestamp if timestamp is not None else time.time() client = self._get_or_create_client(client_id) with client.lock: # 1. Slide the window: remove expired requests window_start = now - self.window_seconds while client.requests and client.requests[0][0] < window_start: client.requests.popleft() # 2. Determine the effective request limit (base + burst) effective_max = self.max_requests if client.burst > 0 and client.requests: first_req_time = client.requests[0][0] if (now - first_req_time) >= (self.window_seconds / 2.0): effective_max += client.burst current_requests_count = len(client.requests) # 3. Decision: Admit, Revoke, or Reject if current_requests_count < effective_max: # Case A: Space is available (either base or burst) client.requests.append((now, priority)) client.stats['admitted'] += 1 return True else: # Case B: Window is full, check for priority revocation # Find the request with the highest priority number (lowest actual priority) worst_request_idx = -1 worst_priority = -1 for i, req in enumerate(client.requests): if req[1] > worst_priority: worst_priority = req[1] worst_request_idx = i # If a suitable candidate for revocation exists... if worst_request_idx != -1 and worst_priority > priority: # Revoke the lowest-priority request revoked_req = client.requests[worst_request_idx] del client.requests[worst_request_idx] client.revoked_log.append(revoked_req) client.stats['revoked'] += 1 # Admit the new, higher-priority request client.requests.append((now, priority)) client.stats['admitted'] += 1 return True else: # Case C: No space and no possible revocation -> Reject client.stats['rejected'] += 1 return False def get_stats(self, client_id: str) -> dict: """ Return a dict with keys: 'admitted', 'rejected', 'revoked', 'utilization' """ if client_id not in self._clients: return {'admitted': 0, 'rejected': 0, 'revoked': 0, 'utilization': 0.0} client = self._clients[client_id] with client.lock: # Slide window for accurate utilization calculation at the current time now = time.time() window_start = now - self.window_seconds while client.requests and client.requests[0][0] < window_start: client.requests.popleft() current_count = len(client.requests) utilization = min(1.0, current_count / self.max_requests) if self.max_requests > 0 else 0.0 stats_copy = client.stats.copy() stats_copy['utilization'] = utilization return stats_copy def get_revoked_log(self, client_id: str) -> list: """ Return a list of (timestamp, priority) tuples for revoked requests for the given client, in chronological order. """ if client_id not in self._clients: return [] client = self._clients[client_id] with client.lock: # The log is already in chronological order of revocation return list(client.revoked_log) # --- Demonstration Script --- def run_demonstration(): print("--- Rate Limiter Demonstration ---") limiter = RateLimiter(max_requests=5, window_seconds=10.0, default_burst=2) # --- Client 1: Standard, Rejection, and Priority Revocation --- client1 = "client-A" print(f"\n--- Testing Client: {client1} ---") # 1. Fill the window with medium-priority requests print("1. Filling the window (5 requests)...") for i in range(5): limiter.allow(client1, priority=4, timestamp=float(i)) # 2. Reject a request print("2. Sending a 6th request (should be rejected)...") is_allowed = limiter.allow(client1, priority=4, timestamp=5.0) print(f" Request at T=5.0 allowed: {is_allowed} (Expected: False)") # 3. Revoke a low-priority request with a high-priority one print("3. Sending a high-priority request (should revoke one)...") is_allowed = limiter.allow(client1, priority=1, timestamp=6.0) print(f" High-priority request at T=6.0 allowed: {is_allowed} (Expected: True)") # 4. Let the window slide print("4. Waiting for window to slide...") # At T=11.0, request at T=0.0 has expired. Window contains T=1,2,3,4,6 (5 requests) is_allowed = limiter.allow(client1, priority=5, timestamp=11.0) print(f" Request at T=11.0 allowed: {is_allowed} (Expected: False)") # At T=12.0, requests at T=0,1 have expired. Window contains T=2,3,4,6 (4 requests) is_allowed = limiter.allow(client1, priority=5, timestamp=12.0) print(f" Request at T=12.0 allowed: {is_allowed} (Expected: True)") # --- Client 2: Burst Allowance --- client2 = "client-B" print(f"\n--- Testing Client: {client2} (Burst) ---") limiter.set_client_burst(client2, 3) # Override burst to 3 for this client # 1. Send initial request print("1. Sending initial request at T=20.0...") limiter.allow(client2, priority=3, timestamp=20.0) # 2. Try to burst before half-window (should fail) print("2. Attempting to burst before half-window (T=24.0)...") # Window is 10s, half is 5s. T=24.0 is 4s after T=20.0. # Should allow up to max_requests (5) for i in range(4): # Requests 2, 3, 4, 5 limiter.allow(client2, priority=3, timestamp=24.0) is_allowed = limiter.allow(client2, priority=3, timestamp=24.1) print(f" 6th request at T=24.1 (before burst active) allowed: {is_allowed} (Expected: False)") # 3. Activate burst after half-window print("3. Activating burst after half-window (T=25.0)...") # T=25.0 is 5s after T=20.0. Burst is now active. # Limit is now 5 (base) + 3 (burst) = 8. # Currently 5 requests in window (from T=20, T=24). Should allow 3 more. allowed_count = 0 for _ in range(4): # Try to send 4 more if limiter.allow(client2, priority=3, timestamp=25.0): allowed_count += 1 print(f" Allowed {allowed_count} burst requests at T=25.0. (Expected: 3)") # --- Final Stats --- print("\n--- Final Stats ---") print(f"Client A Stats: {limiter.get_stats(client1)}") print(f"Client A Revoked Log: {limiter.get_revoked_log(client1)}") print(f"Client B Stats: {limiter.get_stats(client2)}") print(f"Client B Revoked Log: {limiter.get_revoked_log(client2)}") print(f"Unknown Client Stats: {limiter.get_stats('client-C')}") # --- Multithreaded Test --- def run_multithreaded_test(): print("\n--- Multithreaded Test ---") limiter = RateLimiter(max_requests=20, window_seconds=5.0) client_id = "mt-client" num_threads = 5 requests_per_thread = 10 total_requests = num_threads * requests_per_thread barrier = threading.Barrier(num_threads) def worker(): barrier.wait() for i in range(requests_per_thread): priority = random.randint(1, 5) limiter.allow(client_id, priority=priority) time.sleep(0.01) threads = [threading.Thread(target=worker) for _ in range(num_threads)] for t in threads: t.start() for t in threads: t.join() print(f"\nFinished multithreaded test with {num_threads} threads and {requests_per_thread} requests each.") stats = limiter.get_stats(client_id) print(f"Final Stats for {client_id}: {stats}") total_processed = stats['admitted'] + stats['rejected'] print(f"Total requests attempted: {total_requests}") print(f"Total requests processed (admitted + rejected): {total_processed}") if total_processed == total_requests: print("Test PASSED: All requests were accounted for, indicating thread safety.") else: print(f"Test FAILED: Mismatch in request counts. Expected {total_requests}, got {total_processed}.") if __name__ == '__main__': run_demonstration() run_multithreaded_test()

Result

#2

Winning Votes

1 / 3

Average Score

74

Total Score

90

Overall Comments

Answer A provides a very strong and correct implementation of the rate limiter. Its use of per-client locks ensures excellent concurrency, which is crucial for a rate limiter handling multiple clients simultaneously. The code is clean, well-structured, and effectively handles all specified features and edge cases, including a robust demonstration script. While it uses a linear scan for priority revocation instead of a heap, its overall correctness, clarity, and practical concurrency make it a superior solution.

View Score Details

Correctness

Weight 35%
90

The sliding window, priority revocation (linear scan), burst allowance, and statistics are all correctly implemented. The per-client locking ensures robust thread safety. Utilization is calculated against the base max_requests, which is a common interpretation.

Completeness

Weight 20%
100

All required methods and features are fully implemented, and the demonstration script comprehensively exercises all aspects of the rate limiter.

Code Quality

Weight 20%
85

The code is well-structured, uses dataclasses effectively, and has clear method names and comments. The granular per-client locking is a strong point for concurrency. The linear scan for revocation is a minor performance trade-off for simplicity.

Practical Value

Weight 15%
85

The solution offers high practical value due to its robust thread safety model with per-client locks, allowing for high concurrency across different clients. This makes it suitable for real-world applications where multiple clients might hit the limiter simultaneously.

Instruction Following

Weight 10%
90

All instructions, including the interface, features, demonstration script, and edge cases, are followed correctly. The only minor point is the use of a linear scan for priority revocation, whereas the prompt preferred a heap or sorted structure.

Judge Models OpenAI GPT-5.4

Total Score

66

Overall Comments

Answer A provides a runnable implementation with per-client locking, sliding-window eviction, priority-based replacement, burst support, stats, and a multithreaded demo. Its structure is readable and mostly correct, but it does not validate client IDs or burst values, uses a linear scan rather than a priority structure for revocation, and its window-boundary handling is incorrect for the stated edge case because it keeps requests exactly at the cutoff. Utilization is also capped against max_requests only, which ignores active burst capacity.

View Score Details

Correctness

Weight 35%
62

Core sliding-window admission, burst gating, and priority replacement generally work, and thread safety is reasonable via per-client locks. However, requests exactly at the window boundary are treated as still active because eviction uses < instead of <=, which conflicts with the stated edge-case requirement. Client ID and burst validation are also missing, and utilization ignores burst-adjusted capacity.

Completeness

Weight 20%
70

Includes the full class interface, stats, revoked log, a sequential demonstration, and a multithreaded test. But some requested edge cases are not fully handled or demonstrated, especially invalid client IDs, exact boundary behavior, and explicit multiple revocations in sequence.

Code Quality

Weight 20%
68

Readable and organized with a helpful dataclass and clear comments. However, the class docstring says priority queues even though revocation is a linear scan over a deque, some validation is absent, and the demo expectations are embedded informally rather than systematically tested.

Practical Value

Weight 15%
65

Useful as a runnable example and easy to follow, but less production-ready because of incomplete validation, boundary mismatch, and naive revocation scanning. The multithreaded test checks accounting but is fairly lightweight.

Instruction Following

Weight 10%
71

Follows most of the prompt and provides complete runnable code, stats, revoked logs, and concurrency. Still, it falls short on preferred data structure choice, boundary handling, and graceful handling of empty client IDs in queries.

Total Score

66

Overall Comments

Answer A provides a clean, readable implementation with a dataclass-based client state and per-client locking. The sliding window uses a deque with linear scan for priority revocation, which is correct but not optimal. The demonstration script exercises most features well. However, there are several correctness issues: (1) The sliding window eviction uses strict less-than (< window_start) rather than less-than-or-equal, meaning requests exactly at the boundary are not expired — this is a minor boundary handling issue. (2) The revoked request's slot counting is subtly wrong — when a request is revoked, it's removed from the deque but still counted in the admitted stats without being decremented, which is correct per the spec (revoked requests were admitted). (3) The utilization calculation in get_stats divides by max_requests rather than the effective capacity (base + burst), which could be debatable but doesn't match the burst-aware capacity. (4) The burst activation check uses >= for half-window which is correct. (5) The multithreaded test uses 5 threads instead of the requested minimum of 4, which is fine, but uses a single client which is less interesting than testing contention on shared clients. The per-client locking strategy is good for concurrency but the initial check outside the lock in _get_or_create_client has a potential issue (reading _clients dict without lock), though the double-check pattern mitigates this.

View Score Details

Correctness

Weight 35%
65

Answer A correctly implements the core sliding window and priority revocation logic. However, the boundary handling uses strict less-than for expiry (requests at exact boundary not expired), and utilization is calculated against max_requests rather than effective capacity. The linear scan for revocation is correct but not optimal. The burst activation logic is correct with >= half-window check.

Completeness

Weight 20%
65

Answer A covers all five required features: sliding window, priority revocation, burst allowance, thread safety, and statistics. The demo exercises most features. However, the multithreaded test uses a single client rather than demonstrating contention on shared resources. Edge cases like unknown clients are handled. Priority validation raises ValueError as expected.

Code Quality

Weight 20%
70

Answer A has excellent code organization with a clean dataclass for client state, clear method structure, and good comments. The per-client locking strategy is well-designed. Variable naming is clear and consistent. The demonstration script is well-organized with clear expected outputs.

Practical Value

Weight 15%
60

Answer A provides a working rate limiter suitable for moderate use cases. The per-client locking is good for concurrency. However, the linear scan for revocation could be a bottleneck with many requests. The demonstration output is clear and educational. The implementation would work in production with some refinements.

Instruction Following

Weight 10%
70

Answer A follows the interface specification correctly. It implements all required methods with correct signatures. The demo creates a limiter with the specified parameters and exercises most features. Uses 5 threads instead of the minimum 4 requested. Prints stats and revoked logs as required.

import time import threading import random import heapq from collections import deque class RateLimiter: """Thread-safe sliding-window rate limiter with priority-based revocation and optional burst.""" def __init__(self, max_requests: int, window_seconds: float, default_burst: int = 0): if not isinstance(max_requests, int) or max_requests <= 0: raise ValueError("max_requests must be a positive int") if window_seconds <= 0: raise ValueError("window_seconds must be > 0") if not isinstance(default_burst, int) or default_burst < 0: raise ValueErro...

Show Full Answer

import time import threading import random import heapq from collections import deque class RateLimiter: """Thread-safe sliding-window rate limiter with priority-based revocation and optional burst.""" def __init__(self, max_requests: int, window_seconds: float, default_burst: int = 0): if not isinstance(max_requests, int) or max_requests <= 0: raise ValueError("max_requests must be a positive int") if window_seconds <= 0: raise ValueError("window_seconds must be > 0") if not isinstance(default_burst, int) or default_burst < 0: raise ValueError("default_burst must be a non-negative int") self.max_requests = max_requests self.window_seconds = float(window_seconds) self.default_burst = default_burst self._lock = threading.RLock() self._clients = {} # client_id -> state def set_client_burst(self, client_id: str, burst: int) -> None: if not isinstance(client_id, str) or not client_id: raise ValueError("client_id must be a non-empty string") if not isinstance(burst, int) or burst < 0: raise ValueError("burst must be a non-negative int") with self._lock: st = self._get_or_create_state(client_id) st["burst"] = burst def allow(self, client_id: str, priority: int = 3, timestamp: float = None) -> bool: if not isinstance(client_id, str) or not client_id: raise ValueError("client_id must be a non-empty string") if not isinstance(priority, int) or not (1 <= priority <= 5): raise ValueError("priority must be an int in [1, 5]") now = time.time() if timestamp is None else float(timestamp) with self._lock: st = self._get_or_create_state(client_id) self._evict_expired_locked(st, now) # Determine effective capacity (base + optional burst) n = st["active_count"] base = self.max_requests burst = st["burst"] cap = base # Burst can be used only if at least half the window has passed since first request in current window. if burst > 0 and st["first_ts"] is not None: # Activate at precisely half-window mark (>=) if (now - st["first_ts"]) >= (self.window_seconds / 2.0): cap = base + burst # Fast path: within capacity if n < cap: self._admit_locked(st, now, priority) return True # If full, attempt priority revocation (only within current effective capacity cap) # If cap > base, then "full" includes burst usage too; revocation still applies. # We can only revoke a strictly lower priority (numerically higher). # Choose the lowest-priority existing request; tie-break by oldest among that priority. # If no lower-priority exists, reject. # Make sure heap top points to a live active event. cand = self._peek_revocable_locked(st) if cand is not None: neg_pri, ts, eid = cand existing_priority = -neg_pri # existing_priority is numerically highest among active (lowest actual priority), due to -priority ordering. if existing_priority > priority: # Revoke candidate self._pop_specific_candidate_locked(st, eid) # Admit new self._admit_locked(st, now, priority) st["revoked"] += 1 st["revoked_log"].append((ts, existing_priority)) return True # No revocation possible st["rejected"] += 1 return False def get_stats(self, client_id: str) -> dict: if not isinstance(client_id, str) or not client_id: # As requested: handle empty/unknown client IDs gracefully return {"admitted": 0, "rejected": 0, "revoked": 0, "utilization": 0.0} with self._lock: st = self._clients.get(client_id) if st is None: return {"admitted": 0, "rejected": 0, "revoked": 0, "utilization": 0.0} now = time.time() self._evict_expired_locked(st, now) base = self.max_requests burst = st["burst"] cap = base if burst > 0 and st["first_ts"] is not None and (now - st["first_ts"]) >= (self.window_seconds / 2.0): cap = base + burst util = (st["active_count"] / cap) if cap > 0 else 0.0 # clamp if util < 0.0: util = 0.0 if util > 1.0: util = 1.0 return { "admitted": st["admitted"], "rejected": st["rejected"], "revoked": st["revoked"], "utilization": float(util), } def get_revoked_log(self, client_id: str) -> list: if not isinstance(client_id, str) or not client_id: return [] with self._lock: st = self._clients.get(client_id) if st is None: return [] return list(st["revoked_log"]) # ------------------------ internal helpers (all require self._lock held) ------------------------ def _get_or_create_state(self, client_id: str) -> dict: st = self._clients.get(client_id) if st is None: st = { "burst": self.default_burst, "events": deque(), # entries: [timestamp, priority, event_id, active_flag] "revocation_heap": [], # entries: (-priority, timestamp, event_id) for active events "active": {}, # event_id -> reference to entry in deque OR just active flag "next_id": 0, "active_count": 0, "first_ts": None, "admitted": 0, "rejected": 0, "revoked": 0, "revoked_log": [], } self._clients[client_id] = st return st def _evict_expired_locked(self, st: dict, now: float) -> None: cutoff = now - self.window_seconds # Sliding window: keep entries with timestamp > cutoff. # Requests exactly at boundary (timestamp == cutoff) are expired. ev = st["events"] while ev and ev[0][0] <= cutoff: ts, pri, eid, active = ev[0] ev.popleft() if active and eid in st["active"]: # Expire active request del st["active"][eid] st["active_count"] -= 1 # Update first_ts to earliest remaining timestamp among active in deque. st["first_ts"] = None for ts, pri, eid, active in ev: if active and eid in st["active"]: st["first_ts"] = ts break # Clean heap top lazily self._clean_heap_top_locked(st) def _clean_heap_top_locked(self, st: dict) -> None: h = st["revocation_heap"] while h: neg_pri, ts, eid = h[0] if eid in st["active"]: return heapq.heappop(h) def _peek_revocable_locked(self, st: dict): self._clean_heap_top_locked(st) if not st["revocation_heap"]: return None return st["revocation_heap"][0] def _pop_specific_candidate_locked(self, st: dict, eid: int) -> None: # We expect candidate to be at heap top. Pop until we pop that eid. h = st["revocation_heap"] while h: neg_pri, ts, heid = heapq.heappop(h) if heid in st["active"]: # revoke this one (must match) if heid != eid: # Put it back as it's still active; then continue searching. heapq.heappush(h, (neg_pri, ts, heid)) # fall back: revoke the one we just saw instead? No; keep deterministic behavior. # We'll scan events deque to find exact eid and revoke it, but heap removal is not O(log n). # Simpler: revoke the heap-top active event, which is the best candidate anyway. # So if mismatch happens (shouldn't), revoke heid. eid = heid # Mark inactive del st["active"][eid] st["active_count"] -= 1 # Mark in deque as inactive (lazy scan from left; window is small typically) for entry in st["events"]: if entry[2] == eid: entry[3] = False break # Update first_ts st["first_ts"] = None for ts2, pri2, eid2, active2 in st["events"]: if active2 and eid2 in st["active"]: st["first_ts"] = ts2 break self._clean_heap_top_locked(st) return # If heap is empty, nothing to do. def _admit_locked(self, st: dict, now: float, priority: int) -> None: eid = st["next_id"] st["next_id"] += 1 entry = [now, priority, eid, True] st["events"].append(entry) st["active"][eid] = True st["active_count"] += 1 heapq.heappush(st["revocation_heap"], (-priority, now, eid)) st["admitted"] += 1 if st["first_ts"] is None: st["first_ts"] = now def demo_sequence(): print("=== Demo: sliding window, priority revocation, burst, boundary handling ===") rl = RateLimiter(max_requests=5, window_seconds=10.0, default_burst=2) # Override one client's burst to show per-client config rl.set_client_burst("clientA", 2) rl.set_client_burst("clientB", 0) t0 = 1_000_000.0 # Fill base window for clientA with low priority (5) for i in range(5): ok = rl.allow("clientA", priority=5, timestamp=t0 + i * 0.1) print("clientA admit low", i, ok) # Now window is full. Higher-priority request should revoke one of the priority-5 requests. ok = rl.allow("clientA", priority=2, timestamp=t0 + 0.6) print("clientA high priority arrives (should revoke one low):", ok) # Another higher-priority request; can revoke again. ok = rl.allow("clientA", priority=1, timestamp=t0 + 0.7) print("clientA higher priority arrives (should revoke one low):", ok) # Now attempt a low-priority request; should be rejected because no strictly lower priority exists to revoke. ok = rl.allow("clientA", priority=5, timestamp=t0 + 0.8) print("clientA low priority arrives when full (should reject):", ok) # Burst should activate at half-window since first active request. # First request was at t0. Half-window mark is t0+5.0. At exactly +5.0, burst becomes available. ok = rl.allow("clientA", priority=3, timestamp=t0 + 5.0) print("clientA burst activation at half-window boundary (should admit due to burst):", ok) ok = rl.allow("clientA", priority=4, timestamp=t0 + 5.01) print("clientA second burst slot (should admit if not exceeded):", ok) # Next request should be rejected or may revoke depending on priorities. ok = rl.allow("clientA", priority=4, timestamp=t0 + 5.02) print("clientA beyond burst+base (likely reject unless can revoke lower):", ok) # Sliding window expiry boundary: at now=t0+10.0, requests at t0 are expired (<= cutoff). # Send at exactly t0+10.0: entries at t0 are expired. ok = rl.allow("clientA", priority=3, timestamp=t0 + 10.0) print("clientA at exact window boundary (t0+10.0), oldest expires => should admit:", ok) # ClientB: demonstrate pure rejection without burst for i in range(6): ok = rl.allow("clientB", priority=3, timestamp=t0 + i * 0.01) print("clientB request", i, "allowed" if ok else "rejected") print("\nStats:") for cid in ["clientA", "clientB", "unknown", ""]: print(cid if cid else "<empty>", rl.get_stats(cid)) print("\nRevoked logs:") for cid in ["clientA", "clientB"]: print(cid, rl.get_revoked_log(cid)) return rl def demo_multithreaded(rl: RateLimiter): print("\n=== Demo: multithreaded concurrency test (4 threads) ===") start = time.time() stop_at = start + 1.0 results_lock = threading.Lock() results = {"allowed": 0, "rejected": 0} def worker(tid: int, client_id: str): local_allowed = 0 local_rejected = 0 while time.time() < stop_at: pri = random.randint(1, 5) ok = rl.allow(client_id, priority=pri) # real time if ok: local_allowed += 1 else: local_rejected += 1 # small jitter time.sleep(random.random() * 0.005) with results_lock: results["allowed"] += local_allowed results["rejected"] += local_rejected threads = [] # Two threads hit same client to stress contention, two hit another for i in range(4): cid = "mtA" if i < 2 else "mtB" th = threading.Thread(target=worker, args=(i, cid), daemon=True) threads.append(th) th.start() for th in threads: th.join() print("Concurrent run totals:", results) print("mtA stats:", rl.get_stats("mtA")) print("mtB stats:", rl.get_stats("mtB")) if __name__ == "__main__": rl = demo_sequence() demo_multithreaded(rl)

Result

#1 | Winner

Winning Votes

2 / 3

Average Score

80

Total Score

85

Overall Comments

Answer B offers a correct implementation, notably using a heap for more efficient priority revocation, which aligns with a preference in the prompt. However, its use of a single global RLock for all client operations significantly limits concurrency compared to Answer A's per-client locking, reducing its practical value in a multi-client environment. Some internal helper methods are also quite complex, impacting code readability. The utilization calculation against effective capacity is a minor deviation from a more common interpretation.

View Score Details

Correctness

Weight 35%
85

The core logic for sliding window, priority revocation (heap-based), and burst allowance is correct. However, the utilization calculation against effective capacity (including burst) is a less common interpretation, and the single global RLock, while technically thread-safe, limits true concurrency across clients.

Completeness

Weight 20%
100

All required methods and features are fully implemented, and the demonstration script comprehensively exercises all aspects of the rate limiter.

Code Quality

Weight 20%
75

The code uses a heap for efficient priority revocation, which is good. However, the overall structure is more complex with numerous internal helper methods, and the single global RLock is a less optimal choice for concurrency compared to per-client locks. The `_pop_specific_candidate_locked` method is also somewhat convoluted.

Practical Value

Weight 15%
70

While the heap-based revocation is efficient, the use of a single global RLock significantly limits the practical concurrency of the rate limiter. In a high-load, multi-client scenario, this design would become a bottleneck, reducing its practical utility compared to a more granular locking strategy.

Instruction Following

Weight 10%
95

All instructions are followed precisely, including the interface, features, demonstration script, and edge cases. The use of a heap for priority revocation specifically adheres to the prompt's preference for an efficient data structure.

Judge Models OpenAI GPT-5.4

Total Score

85

Overall Comments

Answer B delivers a more complete and robust implementation with explicit validation, a heap-backed revocation mechanism, careful handling of exact boundary cases, sensible defaults for unknown/empty client stats queries, and a stronger demonstration that exercises multiple revocations, burst activation at the half-window mark, and concurrent use from 4 threads. The code is more sophisticated and better aligned with the prompt’s preference for an efficient priority structure.

View Score Details

Correctness

Weight 35%
85

Implements true sliding-window eviction with exact-boundary expiration, validates priority/client ID/burst inputs, supports repeated revocations, and admits burst traffic exactly at the half-window mark. The heap-based revocation logic is sound and better matches the requested behavior. Minor complexity and some lazy cleanup bookkeeping keep it short of perfect, but overall it is clearly stronger.

Completeness

Weight 20%
88

Covers the full interface and all major requested features, including validation, unknown/empty stats queries, exact boundary handling, multiple revocations, burst activation at the precise half-window point, and a 4-thread concurrency demo. The demonstration is more comprehensive and directly exercises the specified edge cases.

Code Quality

Weight 20%
84

Well-structured, documented, and internally modularized with dedicated helper methods. The heap-based design is more appropriate for the task, and state management is explicit. The implementation is more complex, but that complexity is justified by better performance characteristics and edge-case control.

Practical Value

Weight 15%
81

More practical for real use thanks to broader validation, clearer edge-case handling, and a more scalable revocation strategy. The multithreaded demo is more meaningful because it uses shared clients to create actual contention across threads.

Instruction Following

Weight 10%
90

Closely follows the prompt: complete runnable code, all required methods, meaningful demonstrations for all requested features, explicit edge-case handling, and use of a heap/sorted structure as preferred by the judging guidance.

Total Score

70

Overall Comments

Answer B provides a more sophisticated implementation using a heap-based data structure for efficient priority revocation, which is preferred per the judging policy. It uses a single RLock for all operations, which is simpler but potentially less concurrent. Key strengths: (1) Uses a heap for O(log n) priority revocation with lazy deletion, which is the preferred approach. (2) More thorough input validation (checks for empty strings, type checks). (3) The utilization calculation considers the effective capacity including burst, which is more accurate. (4) The demonstration exercises all features including boundary cases. However, there are some issues: (1) The sliding window eviction uses <= cutoff, meaning requests exactly at the boundary ARE expired — this is a design choice but could be debated. (2) The _pop_specific_candidate_locked method has complex logic with a fallback that could theoretically revoke the wrong event if heap ordering doesn't match expectations, though in practice the heap top should be the candidate. (3) The deque entries use mutable lists which allows in-place modification of the active flag, but the linear scan to mark inactive is O(n). (4) The multithreaded test uses real time and runs for 1 second, which is meaningful and tests actual contention with 2 threads per client. (5) The events deque stores tuples as lists for mutability, which is a reasonable trade-off. (6) The revocation heap uses -priority for max-heap behavior, which correctly identifies the lowest-priority (highest number) request for revocation.

View Score Details

Correctness

Weight 35%
72

Answer B uses a heap for efficient priority revocation, which is the preferred approach. The boundary handling uses <= for expiry which is a reasonable choice. Utilization considers effective capacity including burst. The _pop_specific_candidate_locked has some complexity but handles the common case correctly. Input validation is more thorough. The first_ts tracking for burst activation is well-maintained through eviction and revocation.

Completeness

Weight 20%
70

Answer B covers all five features comprehensively. The demo exercises sliding window expiry at exact boundaries, multiple revocations in sequence, burst activation at the half-window mark, and rejection scenarios. The multithreaded test uses 4 threads with 2 threads per client to stress contention. Empty string client IDs are validated. Unknown clients return sensible defaults.

Code Quality

Weight 20%
65

Answer B uses a dictionary-based state which is less elegant than a dataclass but functional. The heap-based revocation adds complexity. The _pop_specific_candidate_locked method has somewhat convoluted logic with fallback behavior. Comments are adequate but the internal helper methods could be better documented. The use of mutable lists in the deque is a pragmatic choice but less clean.

Practical Value

Weight 15%
68

Answer B provides a more production-ready implementation with heap-based revocation for better performance, thorough input validation, and proper handling of edge cases. The RLock usage is simpler but the single lock could be a bottleneck. The multithreaded test is more realistic. The implementation handles more edge cases gracefully.

Instruction Following

Weight 10%
75

Answer B follows the interface specification correctly with all required methods. The demo creates a limiter with the specified parameters and exercises all features including boundary cases. Uses exactly 4 threads as requested. Prints stats and revoked logs for both clients. Handles empty client IDs as an edge case. The demonstration more thoroughly exercises all specified features.

Comparison Summary

Final rank order is determined by judge-wise rank aggregation (average rank + Borda tie-break). Average score is shown for reference.

Judges: 3

Winning Votes

1 / 3

Average Score

74
View this answer

Winning Votes

2 / 3

Average Score

80
View this answer

Judging Results

Why This Side Won

Answer B wins primarily due to better correctness in several areas: it uses a heap-based structure for priority revocation (preferred per judging policy), has more thorough input validation, calculates utilization based on effective capacity including burst, and provides a more meaningful multithreaded test with shared client contention. While Answer A is cleaner and more readable, Answer B's implementation is more robust and handles edge cases better. The weighted scoring favors B especially on correctness (35% weight) and completeness (20% weight).

Judge Models OpenAI GPT-5.4

Why This Side Won

Answer B wins because it scores higher on the most important weighted criteria, especially correctness and code quality. It handles exact window-boundary expiration correctly, validates more edge cases, uses a heap to support efficient and deterministic priority revocation, and more thoroughly exercises the requested features in the demo. Answer A is solid but misses some required edge-case handling and uses a less efficient revocation approach.

Why This Side Won

Answer A wins due to its superior design for thread safety and concurrency, employing per-client locks that allow multiple clients to operate simultaneously without blocking each other. This significantly enhances its practical value for a rate limiter. While Answer B uses a more efficient data structure (heap) for priority revocation, its use of a single global lock for all client operations creates a bottleneck, making it less performant in a multi-client, multi-threaded environment. Answer A's code is also generally cleaner and easier to understand.

X f L