Answer A: Google Gemini 2.5 Flash-Lite
```python
import time
import threading
import collections
import heapq
import uuid
from dataclasses import dataclass, field
from typing import Dict, List, Tuple, Any, Optional

# --- Configuration ---
# Define tiers with their respective rate limits and window sizes.
# Lower window_seconds means tighter rate limiting.
TIER_CONFIG = {
    "free": {"max_requests": 5, "window_seconds": 60},
    "standard": {"max_requests": 50, "window_seconds": 60},
    "premium": {"max_requests": 200, "window_seconds": 60},
}

# How long to keep client data before cleaning up if no activity.
CLEANUP_THRESHOLD_SECONDS = 300

# --- Data Structures ---

@dataclass(order=True)
class Request:
    """Represents a request with priority, for the priority queue."""
    # We use priority as the first element for heapq to sort by it.
    # Lower number means higher priority.
    priority: int
    # Use a unique ID to break ties in priority and ensure stable sorting.
    request_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    client_id: str
    timestamp: float
    payload: Any

    def __lt__(self, other):
        # Custom comparison for heapq: prioritize by priority, then by request_id for stability.
        if self.priority != other.priority:
            return self.priority < other.priority
        return self.request_id < other.request_id


class SlidingWindowCounter:
    """Implements a sliding window counter for rate limiting."""

    def __init__(self, max_requests: int, window_seconds: float):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        # Use a deque to store timestamps of requests within the window.
        # Deques offer efficient appends and pops from both ends.
        self.request_timestamps: collections.deque[float] = collections.deque()
        # Lock for thread-safe access to request_timestamps.
        self.lock = threading.Lock()

    def record_request(self) -> bool:
        """Records a request and returns True if it's within the limit, False otherwise."""
        with self.lock:
            current_time = time.time()
            # Remove timestamps older than the window.
            while self.request_timestamps and self.request_timestamps[0] <= current_time - self.window_seconds:
                self.request_timestamps.popleft()
            # Check if adding a new request exceeds the limit.
            if len(self.request_timestamps) < self.max_requests:
                self.request_timestamps.append(current_time)
                return True
            else:
                return False

    def get_current_count(self) -> int:
        """Returns the current number of requests within the window."""
        with self.lock:
            current_time = time.time()
            # Clean up old timestamps before returning count.
            while self.request_timestamps and self.request_timestamps[0] <= current_time - self.window_seconds:
                self.request_timestamps.popleft()
            return len(self.request_timestamps)


class RateLimiter:
    """A thread-safe rate limiter with multiple tiers, sliding window, and priority queues."""

    def __init__(self, tier_config: Dict[str, Dict[str, Any]], cleanup_threshold_seconds: float):
        self.tier_config = tier_config
        self.cleanup_threshold_seconds = cleanup_threshold_seconds
        # Stores the sliding window counters for each tier.
        self.tier_counters: Dict[str, SlidingWindowCounter] = {}
        # Stores the priority queue for deferred requests for each tier.
        self.tier_queues: Dict[str, List[Request]] = collections.defaultdict(list)
        # Stores the last activity timestamp for each client for cleanup.
        self.client_last_activity: Dict[str, float] = {}
        # Stores the tier assigned to each client.
        self.client_tiers: Dict[str, str] = {}
        # Main lock to protect shared data structures like client_tiers, client_last_activity, and tier_queues.
        # The SlidingWindowCounter has its own internal lock.
        self.global_lock = threading.Lock()

        # Initialize tier counters.
        for tier_name, config in tier_config.items():
            self.tier_counters[tier_name] = SlidingWindowCounter(
                max_requests=config["max_requests"],
                window_seconds=config["window_seconds"]
            )

        # Start a background thread for cleanup.
        self.cleanup_thread = threading.Thread(target=self._cleanup_task, daemon=True)
        self.cleanup_thread.start()

    def register_client(self, client_id: str, tier: str) -> bool:
        """Registers a client with a specific tier. Returns True if successful, False if client already exists or tier is invalid."""
        if tier not in self.tier_config:
            print(f"[!] Error: Tier '{tier}' does not exist.")
            return False
        with self.global_lock:
            if client_id in self.client_tiers:
                print(f"[!] Warning: Client '{client_id}' already registered.")
                return False
            self.client_tiers[client_id] = tier
            self.client_last_activity[client_id] = time.time()
            print(f"[*] Client '{client_id}' registered with tier '{tier}'.")
            return True

    def _get_client_tier(self, client_id: str) -> Optional[str]:
        """Safely retrieves the tier for a given client_id."""
        with self.global_lock:
            return self.client_tiers.get(client_id)

    def _update_client_activity(self, client_id: str):
        """Updates the last activity timestamp for a client."""
        with self.global_lock:
            self.client_last_activity[client_id] = time.time()

    def allow_request(self, client_id: str) -> bool:
        """Checks if a request from client_id is allowed based on their tier's rate limit."""
        tier = self._get_client_tier(client_id)
        if not tier:
            print(f"[!] Error: Client '{client_id}' not registered.")
            return False
        counter = self.tier_counters.get(tier)
        if not counter:
            # This should not happen if tier_config is valid and initialization is correct.
            print(f"[!] Internal Error: Counter not found for tier '{tier}'.")
            return False
        if counter.record_request():
            self._update_client_activity(client_id)
            # print(f"[*] Request ALLOWED for client '{client_id}' (Tier: {tier}). Current count: {counter.get_current_count()}")
            return True
        else:
            # print(f"[*] Request DENIED for client '{client_id}' (Tier: {tier}). Current count: {counter.get_current_count()}")
            return False

    def enqueue_request(self, client_id: str, priority: int, payload: Any):
        """Enqueues a rate-limited request into the appropriate priority queue."""
        tier = self._get_client_tier(client_id)
        if not tier:
            print(f"[!] Error: Client '{client_id}' not registered. Cannot enqueue.")
            return
        request = Request(priority=priority, client_id=client_id, timestamp=time.time(), payload=payload)
        with self.global_lock:
            heapq.heappush(self.tier_queues[tier], request)
            print(f"[*] Request ENQUEUED for client '{client_id}' (Tier: {tier}, Priority: {priority}). Queue size: {len(self.tier_queues[tier])}")
            self._update_client_activity(client_id)

    def dequeue_and_process(self, client_id: str) -> Optional[Tuple[Request, bool]]:
        """Attempts to dequeue and process the highest priority request for a client's tier if capacity is available."""
        tier = self._get_client_tier(client_id)
        if not tier:
            print(f"[!] Error: Client '{client_id}' not registered. Cannot dequeue.")
            return None
        counter = self.tier_counters.get(tier)
        if not counter:
            print(f"[!] Internal Error: Counter not found for tier '{tier}'.")
            return None
        # Check if capacity is available BEFORE trying to dequeue.
        # This prevents unnecessary locking and heap operations if no capacity exists.
        if counter.get_current_count() < counter.max_requests:
            with self.global_lock:
                queue = self.tier_queues[tier]
                if queue:
                    # Re-check capacity inside the lock to handle race conditions.
                    # It's possible another thread just filled the capacity.
                    if counter.get_current_count() < counter.max_requests:
                        highest_priority_request = heapq.heappop(queue)
                        # Attempt to record the request again. This should succeed if capacity is available.
                        if counter.record_request():
                            self._update_client_activity(highest_priority_request.client_id)
                            print(f"[*] Request DEQUEUED and PROCESSED for client '{highest_priority_request.client_id}' (Tier: {tier}, Priority: {highest_priority_request.priority}). Queue size: {len(queue)}")
                            return highest_priority_request, True
                        else:
                            # This is a rare race condition: capacity was available, but record_request failed.
                            # Push it back and log an error.
                            heapq.heappush(queue, highest_priority_request)
                            print(f"[!] Race condition: Failed to record dequeued request for {highest_priority_request.client_id}. Pushed back.")
                            return highest_priority_request, False
                    else:
                        # Capacity was taken by another thread between the outer check and this inner check.
                        # Push the request back if it was popped.
                        heapq.heappush(queue, highest_priority_request)
                        # print(f"[*] Dequeue attempt failed for {client_id}: Capacity filled by another thread.")
                        return highest_priority_request, False
                else:
                    # Queue is empty.
                    # print(f"[*] Dequeue attempt for {client_id}: Queue is empty.")
                    return None, False
        else:
            # No capacity available.
            # print(f"[*] Dequeue attempt for {client_id}: No capacity available.")
            return None, False

    def _cleanup_task(self):
        """Background task to remove inactive client data."""
        while True:
            time.sleep(self.cleanup_threshold_seconds / 2)  # Check periodically
            current_time = time.time()
            clients_to_remove = []
            with self.global_lock:
                for client_id, last_activity in self.client_last_activity.items():
                    if current_time - last_activity > self.cleanup_threshold_seconds:
                        clients_to_remove.append(client_id)
                for client_id in clients_to_remove:
                    tier = self.client_tiers.pop(client_id, None)
                    if tier:
                        self.client_last_activity.pop(client_id, None)
                        # Optionally, clear queues for this client if needed, though typically not done.
                        # For simplicity, we leave enqueued requests as they are.
                        print(f"[*] Cleaned up inactive client '{client_id}'.")


# --- Demonstration Script ---

def simulate_requests(rate_limiter: RateLimiter, client_id: str, num_requests: int, delay: float, priority_offset: int = 0):
    """Simulates a burst of requests from a single client."""
    print(f"--- Simulating {num_requests} requests from client '{client_id}' ---")
    for i in range(num_requests):
        # Simulate varying priorities for enqueued requests
        priority = i % 5 + priority_offset  # Lower number = higher priority
        payload = f"data_{client_id}_{i}"
        if rate_limiter.allow_request(client_id):
            print(f"[Thread {threading.current_thread().name}] Request {i+1}/{num_requests} ALLOWED for '{client_id}'. Payload: {payload}")
        else:
            print(f"[Thread {threading.current_thread().name}] Request {i+1}/{num_requests} DENIED for '{client_id}'. Enqueuing with priority {priority}.")
            rate_limiter.enqueue_request(client_id, priority, payload)
        time.sleep(delay)


def process_deferred_requests(rate_limiter: RateLimiter, client_id: str, num_attempts: int):
    """Periodically tries to process deferred requests for a client."""
    print(f"--- Attempting to process deferred requests for client '{client_id}' ---")
    for i in range(num_attempts):
        time.sleep(1.5)  # Wait a bit for capacity to potentially free up
        print(f"[Processor Thread] Attempt {i+1}/{num_attempts} to dequeue for '{client_id}'...")
        result = rate_limiter.dequeue_and_process(client_id)
        if result:
            request, success = result
            if success:
                print(f"[Processor Thread] Successfully processed: {request.payload} (Priority: {request.priority})")
            else:
                print(f"[Processor Thread] Failed to process (capacity issue?): {request.payload} (Priority: {request.priority}). Re-enqueued.")
        else:
            print(f"[Processor Thread] No deferred requests processed for '{client_id}' this attempt (queue empty or no capacity).")


if __name__ == "__main__":
    print("--- Rate Limiter Demonstration ---")

    # Initialize the rate limiter
    limiter = RateLimiter(tier_config=TIER_CONFIG, cleanup_threshold_seconds=CLEANUP_THRESHOLD_SECONDS)

    # Register clients
    clients = {
        "client_A": "free",
        "client_B": "standard",
        "client_C": "premium",
        "client_D": "free",
        "client_E": "standard",
    }
    for cid, tier in clients.items():
        limiter.register_client(cid, tier)

    print("\n--- Simulating concurrent requests ---")
    # Simulate a burst of requests from multiple threads
    threads = []

    # Client A (free tier) - likely to hit rate limit
    t1 = threading.Thread(target=simulate_requests, args=(limiter, "client_A", 15, 0.1, 0), name="Thread-A")
    threads.append(t1)
    # Client B (standard tier) - more capacity
    t2 = threading.Thread(target=simulate_requests, args=(limiter, "client_B", 60, 0.05, 1), name="Thread-B")
    threads.append(t2)
    # Client C (premium tier) - highest capacity
    t3 = threading.Thread(target=simulate_requests, args=(limiter, "client_C", 250, 0.01, 2), name="Thread-C")
    threads.append(t3)
    # Client D (free tier) - another client hitting the same limit
    t4 = threading.Thread(target=simulate_requests, args=(limiter, "client_D", 10, 0.15, 3), name="Thread-D")
    threads.append(t4)

    # Start simulation threads
    for t in threads:
        t.start()

    # Give some time for initial requests and enqueuing
    time.sleep(2)

    print("\n--- Attempting to process deferred requests ---")
    # Start threads to process deferred requests for clients that might have been rate-limited
    # We'll try processing for clients A and D (free tier) multiple times.
    processor_threads = []
    tp1 = threading.Thread(target=process_deferred_requests, args=(limiter, "client_A", 5), name="Processor-A")
    processor_threads.append(tp1)
    tp2 = threading.Thread(target=process_deferred_requests, args=(limiter, "client_D", 5), name="Processor-D")
    processor_threads.append(tp2)
    # Also try for client B, just in case
    tp3 = threading.Thread(target=process_deferred_requests, args=(limiter, "client_B", 3), name="Processor-B")
    processor_threads.append(tp3)

    for tp in processor_threads:
        tp.start()

    # Wait for all simulation threads to complete
    for t in threads:
        t.join()
    # Wait for processor threads to complete their attempts
    for tp in processor_threads:
        tp.join()

    print("\n--- Demonstration Complete ---")
    print("Final state check (counts are approximate due to timing):")
    for tier_name, counter in limiter.tier_counters.items():
        print(f"Tier '{tier_name}': Current requests in window: {counter.get_current_count()} (Max: {counter.max_requests})")
    for tier_name, queue in limiter.tier_queues.items():
        print(f"Tier '{tier_name}': Deferred requests in queue: {len(queue)}")


# --- Design Choices Explanation ---
#
# 1. Sliding Window Implementation:
#    - We use `collections.deque` to store timestamps of requests within the window.
#    - `deque` provides O(1) complexity for appending new timestamps and popping old ones from the left.
#    - When `record_request` is called, we first remove all timestamps older than `current_time - window_seconds`.
#    - This ensures that `len(self.request_timestamps)` accurately reflects the number of requests within the current sliding window.
#    - Trade-off: This approach is precise but can involve multiple pops in `record_request` if many requests arrive in quick succession and then stop. However, over time, the deque size is managed.
#    - Thread Safety: Each `SlidingWindowCounter` instance has its own `threading.Lock` to protect its `request_timestamps` deque, ensuring that concurrent calls to `record_request` or `get_current_count` for the same tier are serialized.
#
# 2. Multiple Tiers:
#    - The `RateLimiter` class maintains a dictionary `tier_config` mapping tier names to their rate limit parameters (`max_requests`, `window_seconds`).
#    - It also maintains separate `SlidingWindowCounter` instances and priority queues (`tier_queues`) for each tier.
#    - Client tier assignment is stored in `client_tiers`.
#    - This allows for flexible configuration and isolation of rate limits between different client types.
#
# 3. Priority Queue for Deferred Requests:
#    - For each tier, a standard Python list is used as a min-heap, managed by the `heapq` module.
#    - The `Request` dataclass is designed with `priority` as the primary sorting key (lower value = higher priority).
#    - A unique `request_id` is included to ensure stable sorting (FIFO for same priorities) and prevent comparison errors if timestamps are identical.
#    - `enqueue_request` uses `heapq.heappush` to add requests, maintaining the heap property.
#    - `dequeue_and_process` uses `heapq.heappop` to retrieve the highest priority request.
#    - Crucially, `dequeue_and_process` first checks available capacity *before* attempting to pop from the heap. If capacity is available, it then acquires the global lock, re-checks capacity (to handle race conditions), pops the request, and attempts to `record_request` again. If `record_request` succeeds, the request is processed. If it fails (due to a race condition where capacity was just filled), the request is pushed back onto the heap.
#    - Trade-off: Using Python's list + heapq is efficient for moderate numbers of deferred requests. For extremely high volumes, a more specialized concurrent priority queue implementation might be considered, but this adds significant complexity.
#
# 4. Thread Safety:
#    - `SlidingWindowCounter`: Uses an internal `threading.Lock` for its `request_timestamps` deque.
#    - `RateLimiter`:
#      - `global_lock` (`threading.Lock`): Protects shared client state (`client_tiers`, `client_last_activity`) and the tier priority queues (`tier_queues`). This lock is acquired for operations like `register_client`, `enqueue_request`, and within `dequeue_and_process` when accessing/modifying the queues.
#      - `allow_request`: Calls the thread-safe `counter.record_request()`. It also updates client activity under the global lock.
#      - `dequeue_and_process`: Carefully manages locks. It first checks capacity *without* the global lock. If capacity *might* be available, it acquires the global lock, re-checks capacity, performs the heap operation, and then calls the thread-safe `counter.record_request()` again.
#    - Trade-off: The use of multiple locks (one per counter + one global lock) aims to reduce contention compared to a single global lock for all operations. However, operations that span client state and tier state (like dequeueing) require careful lock ordering or acquisition of the global lock.
#
# 5. Cleanup:
#    - A background thread (`_cleanup_task`) periodically scans `client_last_activity`.
#    - It identifies clients whose last activity timestamp is older than `cleanup_threshold_seconds`.
#    - These clients are removed from `client_tiers` and `client_last_activity` under the `global_lock`.
#    - This prevents the `RateLimiter` from accumulating indefinitely large amounts of state for inactive clients.
#    - Trade-off: The cleanup is not instantaneous; it runs periodically. The `cleanup_threshold_seconds` determines the trade-off between memory usage and responsiveness of cleanup.
#
# 6. Edge Cases Handled:
#    - Duplicate client registration: `register_client` returns False and logs a warning.
#    - Unregistered clients: `allow_request` and `enqueue_request` check `client_tiers` and return/log errors.
#    - Empty priority queues: `dequeue_and_process` handles cases where `self.tier_queues[tier]` is empty.
#    - Concurrent modifications: Addressed by using `threading.Lock` appropriately.
#    - Clock precision: `time.time()` is used, which is generally sufficient. For extremely high-precision requirements, platform-specific monotonic clocks might be considered, but `time.time()` is standard for this type of application.
#    - Race conditions in dequeue: Handled by re-checking capacity and re-recording the request within the critical section of `dequeue_and_process`.
```
Result

Winning Votes: 1 / 3
Overall Comments
Answer A provides a complete implementation with all five features, good documentation, and a demonstration script. However, it has a critical design flaw: the sliding window counter is shared per-tier rather than per-client, meaning all clients in the same tier share a single request count. This means if client_A (free tier) uses 5 requests, client_D (also free tier) would be blocked even though they should have their own quota. This is a fundamental correctness issue. The priority queue is also per-tier rather than per-client, which means dequeue_and_process for a specific client might process another client's request. The dequeue_and_process method has a bug where it references `highest_priority_request` in the inner else branch (taken when capacity fills between the two checks) before the variable has been assigned, which would cause an UnboundLocalError. The code also uses `time.time()` instead of `time.monotonic()`, making it vulnerable to system clock changes. The demonstration script uses large tier configs (60-second windows) that make it hard to see the sliding window actually working within a reasonable demo time. Comments are extensive but somewhat verbose.
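A minimal sketch of the per-client alternative this critique points to: each client gets its own timestamp deque, with limits looked up from the client's tier. The class and method names here (`PerClientLimiter`) are illustrative, not taken from the answer.

```python
import collections
import threading
import time


class PerClientLimiter:
    """Sketch: one sliding-window deque per client; the tier only supplies the limits."""

    def __init__(self, tier_config):
        self.tier_config = tier_config
        self.client_tiers = {}
        # Each client gets an independent timestamp deque, so one
        # client's traffic never consumes another client's quota.
        self.client_windows = collections.defaultdict(collections.deque)
        self.lock = threading.Lock()

    def register_client(self, client_id, tier):
        with self.lock:
            self.client_tiers[client_id] = tier

    def allow_request(self, client_id, now=None):
        # `now` is injectable for testing; defaults to a monotonic clock.
        now = time.monotonic() if now is None else now
        with self.lock:
            cfg = self.tier_config[self.client_tiers[client_id]]
            window = self.client_windows[client_id]
            # Prune timestamps that have fallen out of this client's window.
            while window and window[0] <= now - cfg["window_seconds"]:
                window.popleft()
            if len(window) < cfg["max_requests"]:
                window.append(now)
                return True
            return False
```

With this layout, exhausting client_A's quota leaves client_D's untouched, which is the behavior the review says the answer should have had.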
Score Details
Correctness
Weight 35%
Answer A has a critical correctness flaw: the sliding window counter is per-tier, not per-client. All clients sharing a tier share one counter, so client_A's requests consume client_D's quota. The dequeue_and_process method has a potential UnboundLocalError (references highest_priority_request in an else branch where it may not be defined). Uses time.time() instead of time.monotonic(), vulnerable to clock jumps. The sliding window itself (deque of timestamps) is correctly implemented in isolation, but applied at the wrong granularity.
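The `time.monotonic()` point can be illustrated with a small pruning helper (a hypothetical function, not part of the answer) that takes the current time as a parameter, so callers can supply a monotonic reading that never jumps backwards the way the wall clock behind `time.time()` can:

```python
import collections


def prune_window(timestamps: collections.deque, window_seconds: float, now: float) -> None:
    """Drop timestamps that have fallen out of the sliding window.

    `now` should come from time.monotonic(): unlike time.time(), it is
    unaffected by NTP corrections or manual clock changes, so the
    interval arithmetic below stays consistent.
    """
    while timestamps and timestamps[0] <= now - window_seconds:
        timestamps.popleft()
```

Injecting the clock reading also makes the window logic unit-testable without sleeping.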
Completeness
Weight 20%
Answer A implements all five features: sliding window, multiple tiers, priority queue, thread safety, and cleanup (via background thread). However, the per-tier rather than per-client rate limiting means the sliding window feature doesn't work as intended. The demo script is present and uses multiple threads. The cleanup runs as a background daemon thread. All required components (Request dataclass, RateLimiter class, demo) are present.
Code Quality
Weight 20%
Answer A has reasonable structure with separate SlidingWindowCounter class, but the architectural decision to share counters per-tier is a design flaw. The Request dataclass defines both order=True and __lt__, which is redundant. The global_lock is used broadly, reducing concurrency benefits. Comments are extensive but verbose. Some commented-out print statements remain in the code. The dequeue_and_process method is complex with nested conditions and potential bugs.
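On the `order=True` plus `__lt__` redundancy: `dataclass(order=True)` already generates comparison methods over the fields in declaration order, so the tie-breaking the answer wrote a manual `__lt__` for comes for free. A minimal sketch (fields trimmed for brevity; not the answer's exact class):

```python
import uuid
from dataclasses import dataclass, field
from typing import Any


@dataclass(order=True)
class Request:
    # order=True auto-generates __lt__ comparing fields in declaration
    # order: priority first, then request_id as the tie-breaker.
    priority: int
    request_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    # compare=False keeps the payload out of the generated ordering.
    payload: Any = field(default=None, compare=False)


a = Request(priority=1, request_id="a")
b = Request(priority=2, request_id="a")
c = Request(priority=1, request_id="b")
```

Excluding non-orderable fields with `field(compare=False)` is the idiomatic way to make such objects safe for `heapq`.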
Practical Value
Weight 15%
Answer A's per-tier rate limiting makes it impractical for real use cases where each client should have independent rate limits. The demo uses 60-second windows which makes it hard to observe the sliding window behavior in a reasonable time. The background cleanup thread is a nice touch but the core functionality being incorrect limits practical value significantly.
Instruction Following
Weight 10%
Answer A follows most instructions: implements RateLimiter class, Request dataclass, handles edge cases (duplicate registration, unregistered clients), provides demo with multiple threads. However, the per-tier rather than per-client rate limiting arguably misinterprets the requirement that 'clients are assigned a tier upon registration' - the tier defines the limits but each client should be tracked independently. The demo shows requests being allowed and enqueued. Design choices are explained in comments.
Overall Comments
Implements a working sliding-window limiter using a deque of timestamps and uses heapq-based per-tier deferred queues, plus a multi-threaded demo and extensive design comments. However, the core design is incorrect for per-client rate limiting: the sliding window counter is per tier (shared across all clients in a tier), so one client’s traffic affects others, which violates typical and implied “clients are assigned a tier” semantics and the prompt’s per-client processing expectations. There are also concurrency/logic issues: enqueue_request calls _update_client_activity while holding global_lock, but _update_client_activity re-acquires global_lock (deadlock). dequeue_and_process has a bug path referencing highest_priority_request when capacity becomes unavailable after the inner check (variable may be undefined). Cleanup removes client registration but leaves queued requests that can no longer be dequeued (since client tier lookup fails).
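The deadlock this review describes arises because `threading.Lock` is not reentrant: a thread that already holds it blocks forever if it tries to acquire it again, which is exactly what happens when `enqueue_request` calls `_update_client_activity` under `global_lock`. One common fix, sketched below with hypothetical names, is a private `_locked` helper that assumes the lock is already held (switching to `threading.RLock` is the other option):

```python
import threading
import time


class ActivityTracker:
    """Sketch of a lock discipline that avoids the re-acquire deadlock:
    public methods take the lock once; `_locked` helpers assume it is held."""

    def __init__(self):
        # Non-reentrant: acquiring twice in the same thread blocks forever.
        self.lock = threading.Lock()
        self.last_activity = {}

    def _touch_locked(self, client_id, now):
        # Caller must already hold self.lock; no acquisition here.
        self.last_activity[client_id] = now

    def enqueue(self, client_id, now=None):
        now = time.monotonic() if now is None else now
        with self.lock:
            # ... push onto the deferred queue here ...
            self._touch_locked(client_id, now)  # no second acquire, no deadlock
```

The `_locked` naming convention makes the precondition visible at every call site, which `RLock` alone does not.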
Score Details
Correctness
Weight 35%
Sliding window logic itself is fine, but it is implemented per tier instead of per client, so clients in the same tier throttle each other. enqueue_request deadlocks (global_lock held then _update_client_activity re-acquires global_lock). dequeue_and_process contains an undefined-variable bug in one branch and cleanup can orphan queued requests by removing client registration.
Completeness
Weight 20%
Includes RateLimiter, Request dataclass, multi-tier config, per-tier priority queues, background cleanup thread, and a multi-thread demo. However, key operations are flawed (deadlock) and cleanup removes clients rather than just expired tracking, leaving deferred items inconsistent.
Code Quality
Weight 20%
Readable and heavily commented, but has serious structural issues: nested locking causing deadlock, inconsistent dequeue return types (None vs (None, False)), and a buggy branch referencing an uninitialized variable. Also mixes dataclass(order=True) with a custom __lt__ unnecessarily.
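The return-type inconsistency (`None` vs `(None, False)`) could be removed by collapsing both "nothing to do" cases into a single `None` sentinel. A simplified sketch, using a plain list and string stand-ins for requests rather than the answer's heap:

```python
from typing import List, Optional, Tuple


def dequeue(queue: List[str], has_capacity: bool) -> Optional[Tuple[str, bool]]:
    """Consistent contract: None means nothing was dequeued (empty queue
    or no capacity); otherwise always a (request, processed) pair."""
    if not queue or not has_capacity:
        return None  # one sentinel for both "empty" and "no capacity"
    request = queue.pop(0)
    return request, True
```

With a single sentinel, callers can safely write `if result is not None: request, ok = result` without the `None.payload` trap the answer's demo runs into.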
Practical Value
Weight 15%
In practice it can hang due to deadlock, and tier-wide limiting makes it unsuitable for real client-based rate limiting. Cleanup can make deferred queues unusable for cleaned clients. Demo is extensive but may not complete reliably.
Instruction Following
Weight 10%
Follows most instructions (sliding window, tiers, priority queues, thread safety claims, demo, comments), but violates key intent around client-based limiting and has edge-case handling that can break (deadlock, orphaned queued requests).
Overall Comments
Answer A provides a very strong, correct, and practical implementation of the concurrent rate limiter. Its design choices are sound, particularly the separation of the `SlidingWindowCounter` into its own class and the efficient logic for processing deferred requests from a shared tier queue. The implementation of thread safety is robust, with careful handling of race conditions in the dequeue method. The inclusion of an automated background cleanup thread and a comprehensive demonstration script further elevates its quality. While the use of a single global lock for client data and queues is a minor trade-off, the overall solution is well-engineered and production-ready.
Score Details
Correctness
Weight 35%
The implementation is correct across all features. The sliding window logic is standard and effective. The priority queue and dequeue logic are particularly well-handled, including a robust check for race conditions. The overall system behaves as expected under concurrency.
Completeness
Weight 20%
All five required features are fully implemented. The solution goes a step further by implementing the cleanup mechanism as an autonomous background thread, which is a more complete and robust solution than a simple manual method.
Code Quality
Weight 20%
The code quality is high, with excellent separation of concerns (e.g., the `SlidingWindowCounter` class). The API is clean and intuitive. The use of a single global lock for queues is a slight simplification but is implemented correctly. The code is well-commented and readable.
Practical Value
Weight 15%
The solution is highly practical. The design is scalable, efficient, and could be used in a production environment with minimal changes. The automated cleanup and robust dequeue logic make it a reliable component.
Instruction Following
Weight 10%
The answer perfectly follows all instructions in the prompt. It implements all required features, provides a `Request` dataclass, includes a comprehensive demonstration script, and offers a detailed explanation of its design choices.