import time import threading import random from collections import deque from dataclasses import dataclass, field from typing import List, Tuple, Dict, Optional @dataclass class ClientState: """Holds all state for a single client.""" lock: threading.Lock = field(default_factory=threading.Lock, repr=False) requests: deque[Tuple[float, int]] = field(default_factory=deque) burst: int = 0 stats: Dict[str, int] = field(default_factory=lambda: {"admitted": 0, "rejected": 0, "revoked": 0}) revoked_log: List[Tuple[float, int]] = field(default_factory=list) class RateLimiter: """ A thread...
全文を表示 ▼
import time import threading import random from collections import deque from dataclasses import dataclass, field from typing import List, Tuple, Dict, Optional @dataclass class ClientState: """Holds all state for a single client.""" lock: threading.Lock = field(default_factory=threading.Lock, repr=False) requests: deque[Tuple[float, int]] = field(default_factory=deque) burst: int = 0 stats: Dict[str, int] = field(default_factory=lambda: {"admitted": 0, "rejected": 0, "revoked": 0}) revoked_log: List[Tuple[float, int]] = field(default_factory=list) class RateLimiter: """ A thread-safe rate limiter implementing a sliding window algorithm with priority queues and burst allowances. """ def __init__(self, max_requests: int, window_seconds: float, default_burst: int = 0): if max_requests <= 0 or window_seconds <= 0: raise ValueError("max_requests and window_seconds must be positive.") self.max_requests = max_requests self.window_seconds = window_seconds self.default_burst = default_burst self._clients: Dict[str, ClientState] = {} self._clients_lock = threading.Lock() def _get_or_create_client(self, client_id: str) -> ClientState: """Atomically get or create a client's state object.""" if client_id not in self._clients: with self._clients_lock: # Double-check locking pattern to avoid race conditions on creation if client_id not in self._clients: self._clients[client_id] = ClientState(burst=self.default_burst) return self._clients[client_id] def set_client_burst(self, client_id: str, burst: int) -> None: """Override burst allowance for a specific client.""" client = self._get_or_create_client(client_id) with client.lock: client.burst = burst def allow(self, client_id: str, priority: int = 3, timestamp: Optional[float] = None) -> bool: """ Check if a request is allowed. If timestamp is None, use current time. Returns True if the request is admitted, False if rejected. """ if not 1 <= priority <= 5: raise ValueError("Priority must be an integer between 1 and 5.") now = timestamp if timestamp is not None else time.time() client = self._get_or_create_client(client_id) with client.lock: # 1. Slide the window: remove expired requests window_start = now - self.window_seconds while client.requests and client.requests[0][0] < window_start: client.requests.popleft() # 2. Determine the effective request limit (base + burst) effective_max = self.max_requests if client.burst > 0 and client.requests: first_req_time = client.requests[0][0] if (now - first_req_time) >= (self.window_seconds / 2.0): effective_max += client.burst current_requests_count = len(client.requests) # 3. Decision: Admit, Revoke, or Reject if current_requests_count < effective_max: # Case A: Space is available (either base or burst) client.requests.append((now, priority)) client.stats['admitted'] += 1 return True else: # Case B: Window is full, check for priority revocation # Find the request with the highest priority number (lowest actual priority) worst_request_idx = -1 worst_priority = -1 for i, req in enumerate(client.requests): if req[1] > worst_priority: worst_priority = req[1] worst_request_idx = i # If a suitable candidate for revocation exists... if worst_request_idx != -1 and worst_priority > priority: # Revoke the lowest-priority request revoked_req = client.requests[worst_request_idx] del client.requests[worst_request_idx] client.revoked_log.append(revoked_req) client.stats['revoked'] += 1 # Admit the new, higher-priority request client.requests.append((now, priority)) client.stats['admitted'] += 1 return True else: # Case C: No space and no possible revocation -> Reject client.stats['rejected'] += 1 return False def get_stats(self, client_id: str) -> dict: """ Return a dict with keys: 'admitted', 'rejected', 'revoked', 'utilization' """ if client_id not in self._clients: return {'admitted': 0, 'rejected': 0, 'revoked': 0, 'utilization': 0.0} client = self._clients[client_id] with client.lock: # Slide window for accurate utilization calculation at the current time now = time.time() window_start = now - self.window_seconds while client.requests and client.requests[0][0] < window_start: client.requests.popleft() current_count = len(client.requests) utilization = min(1.0, current_count / self.max_requests) if self.max_requests > 0 else 0.0 stats_copy = client.stats.copy() stats_copy['utilization'] = utilization return stats_copy def get_revoked_log(self, client_id: str) -> list: """ Return a list of (timestamp, priority) tuples for revoked requests for the given client, in chronological order. """ if client_id not in self._clients: return [] client = self._clients[client_id] with client.lock: # The log is already in chronological order of revocation return list(client.revoked_log) # --- Demonstration Script --- def run_demonstration(): print("--- Rate Limiter Demonstration ---") limiter = RateLimiter(max_requests=5, window_seconds=10.0, default_burst=2) # --- Client 1: Standard, Rejection, and Priority Revocation --- client1 = "client-A" print(f"\n--- Testing Client: {client1} ---") # 1. Fill the window with medium-priority requests print("1. Filling the window (5 requests)...") for i in range(5): limiter.allow(client1, priority=4, timestamp=float(i)) # 2. Reject a request print("2. Sending a 6th request (should be rejected)...") is_allowed = limiter.allow(client1, priority=4, timestamp=5.0) print(f" Request at T=5.0 allowed: {is_allowed} (Expected: False)") # 3. Revoke a low-priority request with a high-priority one print("3. Sending a high-priority request (should revoke one)...") is_allowed = limiter.allow(client1, priority=1, timestamp=6.0) print(f" High-priority request at T=6.0 allowed: {is_allowed} (Expected: True)") # 4. Let the window slide print("4. Waiting for window to slide...") # At T=11.0, request at T=0.0 has expired. Window contains T=1,2,3,4,6 (5 requests) is_allowed = limiter.allow(client1, priority=5, timestamp=11.0) print(f" Request at T=11.0 allowed: {is_allowed} (Expected: False)") # At T=12.0, requests at T=0,1 have expired. Window contains T=2,3,4,6 (4 requests) is_allowed = limiter.allow(client1, priority=5, timestamp=12.0) print(f" Request at T=12.0 allowed: {is_allowed} (Expected: True)") # --- Client 2: Burst Allowance --- client2 = "client-B" print(f"\n--- Testing Client: {client2} (Burst) ---") limiter.set_client_burst(client2, 3) # Override burst to 3 for this client # 1. Send initial request print("1. Sending initial request at T=20.0...") limiter.allow(client2, priority=3, timestamp=20.0) # 2. Try to burst before half-window (should fail) print("2. Attempting to burst before half-window (T=24.0)...") # Window is 10s, half is 5s. T=24.0 is 4s after T=20.0. # Should allow up to max_requests (5) for i in range(4): # Requests 2, 3, 4, 5 limiter.allow(client2, priority=3, timestamp=24.0) is_allowed = limiter.allow(client2, priority=3, timestamp=24.1) print(f" 6th request at T=24.1 (before burst active) allowed: {is_allowed} (Expected: False)") # 3. Activate burst after half-window print("3. Activating burst after half-window (T=25.0)...") # T=25.0 is 5s after T=20.0. Burst is now active. # Limit is now 5 (base) + 3 (burst) = 8. # Currently 5 requests in window (from T=20, T=24). Should allow 3 more. allowed_count = 0 for _ in range(4): # Try to send 4 more if limiter.allow(client2, priority=3, timestamp=25.0): allowed_count += 1 print(f" Allowed {allowed_count} burst requests at T=25.0. (Expected: 3)") # --- Final Stats --- print("\n--- Final Stats ---") print(f"Client A Stats: {limiter.get_stats(client1)}") print(f"Client A Revoked Log: {limiter.get_revoked_log(client1)}") print(f"Client B Stats: {limiter.get_stats(client2)}") print(f"Client B Revoked Log: {limiter.get_revoked_log(client2)}") print(f"Unknown Client Stats: {limiter.get_stats('client-C')}") # --- Multithreaded Test --- def run_multithreaded_test(): print("\n--- Multithreaded Test ---") limiter = RateLimiter(max_requests=20, window_seconds=5.0) client_id = "mt-client" num_threads = 5 requests_per_thread = 10 total_requests = num_threads * requests_per_thread barrier = threading.Barrier(num_threads) def worker(): barrier.wait() for i in range(requests_per_thread): priority = random.randint(1, 5) limiter.allow(client_id, priority=priority) time.sleep(0.01) threads = [threading.Thread(target=worker) for _ in range(num_threads)] for t in threads: t.start() for t in threads: t.join() print(f"\nFinished multithreaded test with {num_threads} threads and {requests_per_thread} requests each.") stats = limiter.get_stats(client_id) print(f"Final Stats for {client_id}: {stats}") total_processed = stats['admitted'] + stats['rejected'] print(f"Total requests attempted: {total_requests}") print(f"Total requests processed (admitted + rejected): {total_processed}") if total_processed == total_requests: print("Test PASSED: All requests were accounted for, indicating thread safety.") else: print(f"Test FAILED: Mismatch in request counts. Expected {total_requests}, got {total_processed}.") if __name__ == '__main__': run_demonstration() run_multithreaded_test()
判定
勝利票
1 / 3
平均スコア
総合点
総評
回答Aはレートリミッターの非常に強力で正確な実装を提供しています。クライアントごとのロックの使用は、複数のクライアントを同時に処理するレートリミッターにとって重要な優れた並行性を保証します。コードはクリーンで、よく構造化されており、堅牢なデモスクリプトを含む、指定されたすべての機能とエッジケースを効果的に処理します。優先度取り消しのためにヒープではなく線形スキャンを使用していますが、全体的な正確さ、明瞭さ、および実用的な並行性により、優れたソリューションとなっています。
採点詳細を表示 ▼
正確さ
重み 35%スライディングウィンドウ、優先度取り消し(線形スキャン)、バースト許容量、および統計はすべて正しく実装されています。クライアントごとのロックは、堅牢なスレッドセーフティを保証します。利用率は、一般的な解釈である基本のmax_requestsに対して計算されます。
完全性
重み 20%必要なすべてのメソッドと機能が完全に実装されており、デモスクリプトはレートリミッターのすべての側面を包括的にテストしています。
コード品質
重み 20%コードはよく構造化されており、データクラスを効果的に使用し、メソッド名とコメントが明確です。詳細なクライアントごとのロックは、並行性にとって強力な点です。取り消しのための線形スキャンは、単純さのためのマイナーなパフォーマンスのトレードオフです。
実用性
重み 15%このソリューションは、クライアントごとのロックを備えた堅牢なスレッドセーフティモデルにより、高い実用的な価値を提供し、異なるクライアント間での高い並行性を可能にします。これにより、複数のクライアントが同時にリミッターにヒットする可能性のある実世界のアプリケーションに適しています。
指示遵守
重み 10%インターフェース、機能、デモスクリプト、およびエッジケースを含むすべての指示が正しく従われています。唯一のマイナーな点は、プロンプトがヒープまたはソート済み構造を好んでいたのに対し、優先度取り消しに線形スキャンを使用していることです。
総合点
総評
回答Aは、クライアントごとのロック、スライディングウィンドウの削除、優先度ベースの置換、バーストサポート、統計情報、マルチスレッドデモを備えた実行可能な実装を提供します。構造は読みやすく、ほとんど正しいですが、クライアントIDやバースト値を検証しておらず、取り消しのために優先度構造ではなく線形スキャンを使用しており、指定されたエッジケースに対してウィンドウ境界の処理が不正確です。これは、カットオフちょうどのリクエストを保持するためです。また、利用率はmax_requestsに対してのみキャップされており、アクティブなバースト容量を無視しています。
採点詳細を表示 ▼
正確さ
重み 35%コアのスライディングウィンドウアドミッション、バーストゲーティング、優先度置換は一般的に機能しており、スレッドセーフティはクライアントごとのロックによって合理的に確保されています。しかし、ウィンドウ境界ちょうどのリクエストは、削除が `<=` ではなく `<` を使用するため、アクティブと見なされます。これは、指定されたエッジケースの要件と矛盾します。クライアントIDとバーストの検証も欠落しており、利用率はバースト調整された容量を無視しています。
完全性
重み 20%完全なクラスインターフェイス、統計情報、取り消されたログ、シーケンシャルデモンストレーション、マルチスレッドテストが含まれています。しかし、特に無効なクライアントID、正確な境界の動作、およびシーケンスでの明示的な複数回の取り消しなど、要求されたエッジケースの一部は完全には処理または実証されていません。
コード品質
重み 20%読みやすく整理されており、便利なデータクラスと明確なコメントが付いています。ただし、クラスのドキュメンテーションでは優先度キューが言及されていますが、取り消しはデックに対する線形スキャンであり、検証が一部欠落しており、デモの期待値が体系的にテストされるのではなく、非公式に埋め込まれています。
実用性
重み 15%実行可能な例として有用で、理解しやすいですが、検証の不完全さ、境界の不一致、および単純な取り消しスキャンにより、本番環境での使用にはやや不向きです。マルチスレッドテストはアカウンティングをチェックしますが、かなり軽量です。
指示遵守
重み 10%プロンプトのほとんどに従っており、完全な実行可能なコード、統計情報、取り消されたログ、および並行処理を提供しています。それでも、優先データ構造の選択、境界処理、およびクエリでの空のクライアントIDの正常な処理については不十分です。
総合点
総評
回答Aは、データクラスベースのクライアント状態とクライアントごとのロックを備えた、クリーンで読みやすい実装を提供しています。スライディングウィンドウは、優先度取り消しのために線形スキャンを行うdequeを使用しており、これは正しいですが最適ではありません。デモスクリプトはほとんどの機能をうまく実行しています。しかし、いくつかの正確性の問題があります。(1) スライディングウィンドウの削除は、厳密な小なり(< window_start)を使用しており、小なりイコールではないため、境界線上のリクエストは期限切れになりません。これは軽微な境界処理の問題です。(2) 取り消されたリクエストのスロットカウントは微妙に間違っています。リクエストが取り消されると、dequeから削除されますが、デクリメントされずに(取り消されたリクエストは許可されたため正しい)アドミテッド統計でカウントされ続けます。(3) get_statsでの利用率計算は、最大リクエスト数(base + burst)ではなく、最大リクエスト数で除算しており、これは議論の余地がありますが、バーストを考慮した容量とは一致しません。(4) バーストアクティベーションチェックは、半ウィンドウに対して>=を使用しており、これは正しいです。(5) マルチスレッドテストは、要求された最小の4ではなく5スレッドを使用していますが、これは問題ありません。しかし、共有クライアントでの競合をテストするよりも面白みのない単一クライアントを使用しています。クライアントごとのロック戦略は並行処理には適していますが、_get_or_create_client内のロックの外側にある最初のチェックには潜在的な問題があります(ロックなしで_clients辞書を読み取る)。ただし、二重チェックパターンがこれを緩和します。
採点詳細を表示 ▼
正確さ
重み 35%回答Aは、コアのスライディングウィンドウと優先度取り消しロジックを正しく実装しています。しかし、境界処理では有効期限の厳密な小なり(境界上のリクエストは期限切れにならない)を使用しており、利用率は実効容量ではなく最大リクエスト数に対して計算されています。取り消しのための線形スキャンは正しいですが、最適ではありません。バーストアクティベーションロジックは、>=半ウィンドウチェックで正しいです。
完全性
重み 20%回答Aは、スライディングウィンドウ、優先度取り消し、バースト許可、スレッドセーフティ、統計の5つの必須機能をすべてカバーしています。デモはほとんどの機能を実行します。しかし、マルチスレッドテストは、共有リソースでの競合を実証するのではなく、単一のクライアントを使用しています。未知のクライアントなどのエッジケースは処理されます。優先度検証は、予想通りValueErrorを発生させます。
コード品質
重み 20%回答Aは、クライアント状態のためのクリーンなデータクラス、明確なメソッド構造、および良好なコメントを備えた優れたコード構成を持っています。クライアントごとのロック戦略はよく設計されています。変数名は明確で一貫性があります。デモスクリプトは、明確な期待される出力とともに、よく整理されています。
実用性
重み 15%回答Aは、中程度のユースケースに適した動作するレートリミッターを提供します。クライアントごとのロックは並行処理に適しています。しかし、取り消しのための線形スキャンは、多くのリクエストがある場合にボトルネックになる可能性があります。デモ出力は明確で教育的です。この実装は、いくつかの改良を加えれば本番環境でも機能するでしょう。
指示遵守
重み 10%回答Aは、インターフェース仕様に正しく従っています。すべての必須メソッドを正しいシグネチャで実装しています。デモは指定されたパラメータでリミッターを作成し、ほとんどの機能を実行します。要求された最小の4スレッドではなく5スレッドを使用します。必要に応じて統計と取り消されたログを出力します。