Orivel Orivel
メニューを開く

スライディングウィンドウと優先度付きキューを備えた並行レートリミッタを実装する

このプログラミングベンチマークに対する各AIの回答と比較結果を確認できます。

いいね・お気に入り機能を使うにはログインまたは新規登録が必要です。 新規登録

X f L

目次

お題概要

比較ジャンル

プログラミング

お題作成モデル

回答モデル

採点モデル

お題本文

スレッドセーフなレートリミッタを Python で設計および実装してください。以下の機能をサポートすること。 1. **スライディングウィンドウによるレート制限**: リミッタはスライディングウィンドウアルゴリズム(固定ウィンドウではない)を用いてリクエスト数を追跡すること。`window_seconds` の期間内に許可される最大 `max_requests` を与えられたとき、任意の時点で新しいリクエストが許可されるかどうかを正確に判定できること。 2. **複数の階層(ティア)**: レートリミッタは名前付きの複数のティア(例: "free", "standard", "premium")をサポー...

さらに表示

スレッドセーフなレートリミッタを Python で設計および実装してください。以下の機能をサポートすること。 1. **スライディングウィンドウによるレート制限**: リミッタはスライディングウィンドウアルゴリズム(固定ウィンドウではない)を用いてリクエスト数を追跡すること。`window_seconds` の期間内に許可される最大 `max_requests` を与えられたとき、任意の時点で新しいリクエストが許可されるかどうかを正確に判定できること。 2. **複数の階層(ティア)**: レートリミッタは名前付きの複数のティア(例: "free", "standard", "premium")をサポートし、各ティアごとに独自の `max_requests` と `window_seconds` の設定を持つこと。クライアントは登録時にティアが割り当てられる。 3. **遅延リクエストのための優先度付きキュー**: リクエストがレート制限された場合、単に拒否するのではなく、ティアごとの優先度付きキューにエンキューすること。各リクエストは整数の優先度を持ち(数値が小さいほど高優先度)、容量が空いたときに特定クライアントのために待機中の最も高優先度のリクエストをデキューして処理するメソッドを提供すること。 4. **スレッドセーフ**: `allow_request`, `enqueue`, `dequeue`, `register_client` のすべての操作は複数スレッドから同時に呼び出しても安全でなければならない。 5. **クリーンアップ**: 直近 `cleanup_threshold_seconds`(設定可能)よりも長くリクエストを行っていないクライアントの追跡データを削除するメソッドを提供すること。 実装には以下を含めること: - 説明されたインターフェースを持つ `RateLimiter` クラス。 - 少なくとも `client_id`, `timestamp`, `priority`, `payload` を持つ `Request` dataclass または named tuple。 - 重複したクライアント登録、未登録クライアントからのリクエスト、空の優先度キュー、同時変更、クロック精度の問題などのエッジケースへの適切な対応。 さらに、デモ用スクリプトを `if __name__ == "__main__"` ブロック内に書くこと。デモでは以下を行うこと: - 少なくとも 2 つのティアを持つレートリミッタを作成する。 - 複数のクライアントを登録する。 - 複数スレッドからのバーストリクエストをシミュレートし、一部が許可され、他がエンキューされる様子を示す。 - 容量が空いたときに遅延リクエストが処理される様子を示す。 - イベントのシーケンスがわかるように明確な出力を表示する。 コメント内で設計上の選択を説明すること。特にスライディングウィンドウの実装、同期プリミティブの選択、および精度と性能の間のトレードオフについて記述すること。

補足情報

このタスクは、スライディングウィンドウカウンタ、優先度付きキュー、スレッドセーフな状態管理といった複数のアルゴリズム的概念を組み合わせた非自明な並行データ構造を設計する能力をテストする。優れた解答は、クリーンなAPI設計、過剰なロックを避けた正しい同期、適切なエッジケース処理、およびトレードオフに関する明確なドキュメントを示す。

採点方針

高品質な応答は以下の次元で評価されるべきである: 正確性: スライディングウィンドウアルゴリズムは実際にスライドしていること(リセットする固定時間バケットを使用していないこと)。優先度付きキューは優先度順に正しく並べ替えること。スレッド安全性は単なる主張ではなく、本当に共有可変状態に対する適切なロックや他の同期手段の使用が見られること。 完全性: プロンプトに列挙された 5 つの機能がすべて実装されていること。デモスクリプトは実行可能であり、複数スレッドでレートリミッタが実際に機能していることを明確に示すこと。 エッジケー...

さらに表示

高品質な応答は以下の次元で評価されるべきである: 正確性: スライディングウィンドウアルゴリズムは実際にスライドしていること(リセットする固定時間バケットを使用していないこと)。優先度付きキューは優先度順に正しく並べ替えること。スレッド安全性は単なる主張ではなく、本当に共有可変状態に対する適切なロックや他の同期手段の使用が見られること。 完全性: プロンプトに列挙された 5 つの機能がすべて実装されていること。デモスクリプトは実行可能であり、複数スレッドでレートリミッタが実際に機能していることを明確に示すこと。 エッジケースの取り扱い: 未登録クライアント、重複登録、空のキュー、および同時アクセスパターンをコードが優雅に扱うこと。これらの状況でクラッシュしたり誤った結果を出したりしてはならない。 設計品質: 関心事の分離が明確であること、合理的なクラス/メソッド構造、標準ライブラリ(threading, heapq, dataclasses, time)の適切な使用が見られること。不要に複雑な解決策は減点の対象となり、優雅で単純な解決は評価される。 ドキュメンテーション: コメントは選択したスライディングウィンドウアプローチ、同期プリミティブの選択、およびトレードオフ(例: メモリ対精度、ロック粒度)を説明するべきである。あいまいなまたは欠如したコメントは品質を下げる。 デモンストレーション: デモスクリプトは許可されたリクエスト、レート制限されたリクエスト、エンキューされたリクエスト、および後でデキューされたリクエストをわかりやすく表示すること。複数スレッドを使用して実際に並行性を試すこと。 減点対象: レースコンディション、誤ったスライディングウィンドウロジック、欠落した機能、デモで実際にスレッドを使用していない、または Python 3.8+ でそのまま実行できないコードなど。

回答

```python import time import threading import collections import heapq import uuid from dataclasses import dataclass, field from typing import Dict, List, Tuple, Any, Optional # --- 設定 --- # 各ティアのレート制限とウィンドウサイズを定義します。 # window_seconds が小さいほど、レート制限は厳しくなります。 TIER_CONFIG = { "free": {"max_requests": 5, "window_seconds": 60}, "standard": {"max_requests": 50, "window_seconds": 60}, "premium": {"max_requests": 200, "window_seconds": 60}, } # アクティビティがない場合にクライアントデータを削除するまでの保持期間。 CLEANUP_THRESHOLD_SECONDS = 300 # --- データ...

全文を表示

```python import time import threading import collections import heapq import uuid from dataclasses import dataclass, field from typing import Dict, List, Tuple, Any, Optional # --- 設定 --- # 各ティアのレート制限とウィンドウサイズを定義します。 # window_seconds が小さいほど、レート制限は厳しくなります。 TIER_CONFIG = { "free": {"max_requests": 5, "window_seconds": 60}, "standard": {"max_requests": 50, "window_seconds": 60}, "premium": {"max_requests": 200, "window_seconds": 60}, } # アクティビティがない場合にクライアントデータを削除するまでの保持期間。 CLEANUP_THRESHOLD_SECONDS = 300 # --- データ構造 --- @dataclass(order=True) class Request: """優先度付きリクエストを表します。優先度キュー用です。""" # heapq が優先度でソートできるように、優先度を最初の要素として使用します。 # 数値が小さいほど優先度が高くなります。 priority: int # 優先度のタイブレークと安定ソートを保証するために、一意のIDを使用します。 request_id: str = field(default_factory=lambda: str(uuid.uuid4())) client_id: str timestamp: float payload: Any def __lt__(self, other): """heapq のカスタム比較: 優先度でソートし、次に request_id で安定ソートします。""" if self.priority != other.priority: return self.priority < other.priority return self.request_id < other.request_id class SlidingWindowCounter: """スライディングウィンドウカウンターをレート制限のために実装します。""" def __init__(self, max_requests: int, window_seconds: float): self.max_requests = max_requests self.window_seconds = window_seconds # ウィンドウ内のリクエストのタイムスタンプを格納するために deque を使用します。 # deque は両端からの追加と削除が効率的です。 self.request_timestamps: collections.deque[float] = collections.deque() # request_timestamps へのスレッドセーフなアクセス用のロック。 self.lock = threading.Lock() def record_request(self) -> bool: """リクエストを記録し、制限内であれば True を、そうでなければ False を返します。""" with self.lock: current_time = time.time() # ウィンドウより古いタイムスタンプを削除します。 while self.request_timestamps and self.request_timestamps[0] <= current_time - self.window_seconds: self.request_timestamps.popleft() # 新しいリクエストを追加すると制限を超えるかどうかを確認します。 if len(self.request_timestamps) < self.max_requests: self.request_timestamps.append(current_time) return True else: return False def get_current_count(self) -> int: """ウィンドウ内の現在リクエスト数を返します。""" with self.lock: current_time = time.time() # カウントを返す前に古いタイムスタンプをクリーンアップします。 while self.request_timestamps and self.request_timestamps[0] <= current_time - self.window_seconds: self.request_timestamps.popleft() return len(self.request_timestamps) class RateLimiter: """複数のティア、スライディングウィンドウ、優先度付きキューを備えたスレッドセーフなレートリミッターです。""" def __init__(self, tier_config: Dict[str, Dict[str, Any]], cleanup_threshold_seconds: float): self.tier_config = tier_config self.cleanup_threshold_seconds = cleanup_threshold_seconds # 各ティアのリクエストキュー(遅延リクエスト用)を格納します。 self.tier_queues: Dict[str, List[Request]] = collections.defaultdict(list) # クリーンアップ用の各クライアントの最終アクティビティタイムスタンプを格納します。 self.client_last_activity: Dict[str, float] = {} # 各クライアントに割り当てられたティアを格納します。 self.client_tiers: Dict[str, str] = {} # スライディングウィンドウカウンターを格納します。 self.tier_counters: Dict[str, SlidingWindowCounter] = {} # クライアントティア、クライアント最終アクティビティ、ティアキューなどの共有データ構造を保護するためのメインロック。 # SlidingWindowCounter は独自の内部ロックを持っています。 self.global_lock = threading.Lock() # ティアカウンターを初期化します。 for tier_name, config in tier_config.items(): self.tier_counters[tier_name] = SlidingWindowCounter( max_requests=config["max_requests"], window_seconds=config["window_seconds"] ) # クリーンアップ用のバックグラウンドスレッドを開始します。 self.cleanup_thread = threading.Thread(target=self._cleanup_task, daemon=True) self.cleanup_thread.start() def register_client(self, client_id: str, tier: str) -> bool: """クライアントを指定されたティアで登録します。成功した場合は True を、クライアントが既に存在するかティアが無効な場合は False を返します。""" if tier not in self.tier_config: print(f"[!] エラー: ティア '{tier}' は存在しません。") return False with self.global_lock: if client_id in self.client_tiers: print(f"[!] 警告: クライアント '{client_id}' は既に登録されています。") return False self.client_tiers[client_id] = tier self.client_last_activity[client_id] = time.time() print(f"[*] クライアント '{client_id}' がティア '{tier}' で登録されました。") return True def _get_client_tier(self, client_id: str) -> Optional[str]: """指定された client_id のティアを安全に取得します。""" with self.global_lock: return self.client_tiers.get(client_id) def _update_client_activity(self, client_id: str): """クライアントの最終アクティビティタイムスタンプを更新します。""" with self.global_lock: self.client_last_activity[client_id] = time.time() def allow_request(self, client_id: str) -> bool: """クライアントのティアのレート制限に基づいて、client_id からのリクエストが許可されるかどうかを確認します。""" tier = self._get_client_tier(client_id) if not tier: print(f"[!] エラー: クライアント '{client_id}' は登録されていません。") return False counter = self.tier_counters.get(tier) if not counter: # tier_config が有効で初期化が正しければ、これは発生しないはずです。 print(f"[!] 内部エラー: ティア '{tier}' のカウンターが見つかりません。") return False if counter.record_request(): self._update_client_activity(client_id) # print(f"[*] リクエスト許可: クライアント '{client_id}' (ティア: {tier})。現在のカウント: {counter.get_current_count()}") return True else: # print(f"[*] リクエスト拒否: クライアント '{client_id}' (ティア: {tier})。現在のカウント: {counter.get_current_count()}") return False def enqueue_request(self, client_id: str, priority: int, payload: Any): """レート制限されたリクエストを適切な優先度付きキューにエンキューします。""" tier = self._get_client_tier(client_id) if not tier: print(f"[!] エラー: クライアント '{client_id}' は登録されていません。エンキューできません。") return request = Request(priority=priority, client_id=client_id, timestamp=time.time(), payload=payload) with self.global_lock: heapq.heappush(self.tier_queues[tier], request) print(f"[*] リクエストエンキュー: クライアント '{client_id}' (ティア: {tier}, 優先度: {priority})。キューサイズ: {len(self.tier_queues[tier])}") self._update_client_activity(client_id) def dequeue_and_process(self, client_id: str) -> Optional[Tuple[Request, bool]]: """クライアントのティアで利用可能な容量がある場合、最優先度リクエストをデキューして処理しようとします。""" tier = self._get_client_tier(client_id) if not tier: print(f"[!] エラー: クライアント '{client_id}' は登録されていません。デキューできません。") return None counter = self.tier_counters.get(tier) if not counter: print(f"[!] 内部エラー: ティア '{tier}' のカウンターが見つかりません。") return None # デキューを試みる前に、容量が利用可能かどうかを確認します。 # 容量がない場合に不要なロックとヒープ操作を防ぎます。 if counter.get_current_count() < counter.max_requests: with self.global_lock: queue = self.tier_queues[tier] if queue: # ロック内で容量を再確認します。競合状態を処理するためです。 # 他のスレッドがちょうど容量を使い切った可能性があります。 if counter.get_current_count() < counter.max_requests: highest_priority_request = heapq.heappop(queue) # リクエストを再度記録しようとします。容量があれば成功するはずです。 if counter.record_request(): self._update_client_activity(highest_priority_request.client_id) print(f"[*] リクエストデキューと処理完了: クライアント '{highest_priority_request.client_id}' (ティア: {tier}, 優先度: {highest_priority_request.priority})。キューサイズ: {len(queue)}") return highest_priority_request, True else: # これはまれな競合状態です: 容量は利用可能でしたが、record_request が失敗しました。 # リクエストを戻し、エラーを記録します。 heapq.heappush(queue, highest_priority_request) print(f"[!] 競合状態: {highest_priority_request.client_id} のデキューされたリクエストの記録に失敗しました。戻しました。") return highest_priority_request, False else: # 外側のチェックと内側のチェックの間に、別のスレッドが容量を占有しました。 # ポップされた場合、リクエストをキューに戻します。 heapq.heappush(queue, highest_priority_request) # print(f"[*] {client_id} のデキュー試行失敗: 容量が他のスレッドによって満たされました。") return highest_priority_request, False else: # キューは空です。 # print(f"[*] {client_id} のデキュー試行: キューは空です。") return None, False else: # 容量が利用できません。 # print(f"[*] {client_id} のデキュー試行: 容量が利用できません。") return None, False def _cleanup_task(self): """非アクティブなクライアントデータを削除するバックグラウンドタスクです。""" while True: time.sleep(self.cleanup_threshold_seconds / 2) # 定期的にチェック current_time = time.time() clients_to_remove = [] with self.global_lock: for client_id, last_activity in self.client_last_activity.items(): if current_time - last_activity > self.cleanup_threshold_seconds: clients_to_remove.append(client_id) for client_id in clients_to_remove: tier = self.client_tiers.pop(client_id, None) if tier: self.client_last_activity.pop(client_id, None) # 必要に応じて、このクライアントのキューをクリアすることもできますが、通常は行われません。 # 簡単のため、エンキューされたリクエストはそのままにしておきます。 print(f"[*] 非アクティブなクライアント '{client_id}' をクリーンアップしました。") # --- デモンストレーションスクリプト --- def simulate_requests(rate_limiter: RateLimiter, client_id: str, num_requests: int, delay: float, priority_offset: int = 0): """単一クライアントからのリクエストのバーストをシミュレートします。""" print(f"--- クライアント '{client_id}' から {num_requests} 件のリクエストをシミュレート中 ---") for i in range(num_requests): # エンキューされたリクエストの優先度を変化させます priority = i % 5 + priority_offset # 数値が小さいほど優先度が高い payload = f"data_{client_id}_{i}" if rate_limiter.allow_request(client_id): print(f"[スレッド {threading.current_thread().name}] リクエスト {i+1}/{num_requests} 許可: '{client_id}'。ペイロード: {payload}") else: print(f"[スレッド {threading.current_thread().name}] リクエスト {i+1}/{num_requests} 拒否: '{client_id}'。優先度 {priority} でエンキューします。") rate_limiter.enqueue_request(client_id, priority, payload) time.sleep(delay) def process_deferred_requests(rate_limiter: RateLimiter, client_id: str, num_attempts: int): """クライアントの遅延リクエストを処理しようと定期的に試みます。""" print(f"--- クライアント '{client_id}' の遅延リクエストの処理を試行中 ---") for i in range(num_attempts): time.sleep(1.5) # 容量が解放されるのを少し待ちます print(f"[プロセッサースレッド] '{client_id}' のデキュー試行 {i+1}/{num_attempts}...") result = rate_limiter.dequeue_and_process(client_id) if result: request, success = result if success: print(f"[プロセッサースレッド] 正常に処理されました: {request.payload} (優先度: {request.priority})") else: print(f"[プロセッサースレッド] 処理に失敗しました (容量の問題?): {request.payload} (優先度: {request.priority})。再エンキューしました。") else: print(f"[プロセッサースレッド] '{client_id}' の遅延リクエストはこの試行では処理されませんでした (キュー空または容量なし)。") if __name__ == "__main__": print("--- レートリミッターデモンストレーション ---") # レートリミッターを初期化します limiter = RateLimiter(tier_config=TIER_CONFIG, cleanup_threshold_seconds=CLEANUP_THRESHOLD_SECONDS) # クライアントを登録します clients = { "client_A": "free", "client_B": "standard", "client_C": "premium", "client_D": "free", "client_E": "standard", } for cid, tier in clients.items(): limiter.register_client(cid, tier) print("\n--- 同時リクエストのシミュレーション ---") # 複数のスレッドからリクエストのバーストをシミュレートします threads = [] # クライアント A (無料ティア) - レート制限に達する可能性が高い t1 = threading.Thread(target=simulate_requests, args=(limiter, "client_A", 15, 0.1, 0), name="Thread-A") threads.append(t1) # クライアント B (標準ティア) - より多くの容量 t2 = threading.Thread(target=simulate_requests, args=(limiter, "client_B", 60, 0.05, 1), name="Thread-B") threads.append(t2) # クライアント C (プレミアムティア) - 最も高い容量 t3 = threading.Thread(target=simulate_requests, args=(limiter, "client_C", 250, 0.01, 2), name="Thread-C") threads.append(t3) # クライアント D (無料ティア) - 同じ制限に達する別のクライアント t4 = threading.Thread(target=simulate_requests, args=(limiter, "client_D", 10, 0.15, 3), name="Thread-D") threads.append(t4) # シミュレーションスレッドを開始します for t in threads: t.start() # 初期リクエストとエンキューに時間を置きます time.sleep(2) print("\n--- 遅延リクエストの処理を試行中 ---") # レート制限された可能性のあるクライアントの遅延リクエストを処理するスレッドを開始します # クライアント A と D (無料ティア) について複数回試行します。 processor_threads = [] tp1 = threading.Thread(target=process_deferred_requests, args=(limiter, "client_A", 5), name="Processor-A") processor_threads.append(tp1) tp2 = threading.Thread(target=process_deferred_requests, args=(limiter, "client_D", 5), name="Processor-D") processor_threads.append(tp2) # クライアント B についても試行します。 tp3 = threading.Thread(target=process_deferred_requests, args=(limiter, "client_B", 3), name="Processor-B") processor_threads.append(tp3) for tp in processor_threads: tp.start() # すべてのシミュレーションスレッドが完了するのを待ちます for t in threads: t.join() # プロセッサースレッドが試行を完了するのを待ちます for tp in processor_threads: tp.join() print("\n--- デモンストレーション完了 ---") print("最終状態チェック (カウントはタイミングにより概算です):") for tier_name, counter in limiter.tier_counters.items(): print(f"ティア '{tier_name}': 現在のウィンドウ内のリクエスト数: {counter.get_current_count()} (最大: {counter.max_requests})") for tier_name, queue in limiter.tier_queues.items(): print(f"ティア '{tier_name}': キュー内の遅延リクエスト数: {len(queue)}") # --- 設計上の選択に関する説明 --- # 1. スライディングウィンドウの実装: # - ウィンドウ内のリクエストのタイムスタンプを格納するために `collections.deque` を使用します。 # - `deque` は、新しいタイムスタンプの追加と古いタイムスタンプの左からの削除に O(1) の計算量を提供します。 # - `record_request` が呼び出されると、まず `current_time - window_seconds` より古いすべてのタイムスタンプを削除します。 # - これにより、`len(self.request_timestamps)` が現在のスライディングウィンドウ内のリクエスト数を正確に反映することが保証されます。 # - トレードオフ: このアプローチは正確ですが、多数のリクエストが短時間で到着し、その後停止した場合、`record_request` で複数のポップが発生する可能性があります。ただし、時間の経過とともに deque のサイズは管理されます。 # - スレッドセーフティ: 各 `SlidingWindowCounter` インスタンスは、`request_timestamps` deque を保護するための独自の `threading.Lock` を持ち、同じティアに対する `record_request` または `get_current_count` の同時呼び出しがシリアル化されることを保証します。 # 2. 複数のティア: # - `RateLimiter` クラスは、ティア名をレート制限パラメータ (`max_requests`, `window_seconds`) にマッピングする辞書 `tier_config` を保持します。 # - また、各ティアに対して個別の `SlidingWindowCounter` インスタンスと優先度付きキュー (`tier_queues`) を保持します。 # - クライアントティアの割り当ては `client_tiers` に格納されます。 # - これにより、異なるクライアントタイプ間で柔軟な設定とレート制限の分離が可能になります。 # 3. 遅延リクエスト用の優先度付きキュー: # - 各ティアについて、標準の Python リストが `heapq` モジュールによって管理される最小ヒープとして使用されます。 # - `Request` dataclass は、`priority` をプライマリソートキーとして設計されています (値が小さいほど優先度が高い)。 # - 一意の `request_id` が含まれており、安定ソート (同じ優先度の場合は FIFO) を保証し、タイムスタンプが同一の場合の比較エラーを防ぎます。 # - `enqueue_request` は `heapq.heappush` を使用してリクエストを追加し、ヒーププロパティを維持します。 # - `dequeue_and_process` は `heapq.heappop` を使用して最優先度リクエストを取得します。 # - 重要なのは、`dequeue_and_process` はヒープからポップしようとする前に、まず利用可能な容量を確認することです。容量が利用可能な場合、グローバルロックを取得し、容量を再確認し (競合状態を処理するため)、リクエストをポップし、`record_request` を再度呼び出します。`record_request` が成功すると、リクエストが処理されます。失敗した場合 (ちょうど容量が埋められた競合状態のため)、リクエストはヒープに戻されます。 # - トレードオフ: Python のリスト + heapq を使用することは、適度な数の遅延リクエストに対して効率的です。非常に高いボリュームの場合、より専門的な並列優先度キュー実装が検討されるかもしれませんが、それは複雑さを増します。 # 4. スレッドセーフティ: # - `SlidingWindowCounter`: 内部の `threading.Lock` を使用して `request_timestamps` deque を保護します。 # - `RateLimiter`: # - `global_lock` (`threading.Lock`): 共有クライアント状態 (`client_tiers`, `client_last_activity`) とティア優先度キュー (`tier_queues`) を保護します。このロックは、`register_client`、`enqueue_request`、および `dequeue_and_process` 内でキューにアクセス/変更する操作のために取得されます。 # - `allow_request`: スレッドセーフな `counter.record_request()` を呼び出します。また、グローバルロックの下でクライアントアクティビティを更新します。 # - `dequeue_and_process`: ロックを慎重に管理します。まずグローバルロックなしで容量を確認します。容量が利用可能である可能性がある場合、グローバルロックを取得し、容量を再確認し、ヒープ操作を実行し、次にスレッドセーフな `counter.record_request()` を再度呼び出します。 # - トレードオフ: 複数のロック (カウンターごとに 1 つ + グローバルロック 1 つ) の使用は、すべての操作に対して単一のグローバルロックを使用する場合と比較して、競合を減らすことを目的としています。ただし、クライアント状態とティア状態にまたがる操作 (デキューなど) では、慎重なロック順序付けまたはグローバルロックの取得が必要です。 # 5. クリーンアップ: # - バックグラウンドスレッド (`_cleanup_task`) が `client_last_activity` を定期的にスキャンします。 # - 最終アクティビティタイムスタンプが `cleanup_threshold_seconds` より古いクライアントを特定します。 # - これらのクライアントは、`global_lock` の下で `client_tiers` と `client_last_activity` から削除されます。 # - これにより、`RateLimiter` が非アクティブなクライアントの状態を無限に蓄積するのを防ぎます。 # - トレードオフ: クリーンアップは即時ではありません。定期的に実行されます。`cleanup_threshold_seconds` は、メモリ使用量とクリーンアップの応答性の間のトレードオフを決定します。 # 6. エッジケースの処理: # - クライアントの重複登録: `register_client` は False を返し、警告をログに記録します。 # - 未登録クライアント: `allow_request` および `enqueue_request` は `client_tiers` をチェックし、エラーを返します/ログに記録します。 # - 空の優先度付きキュー: `dequeue_and_process` は、`self.tier_queues[tier]` が空の場合のケースを処理します。 # - 同時変更: 適切な `threading.Lock` の使用によって対処されます。 # - 時計の精度: `time.time()` が使用されますが、これは一般的に十分です。非常に高い精度が要求される場合は、プラットフォーム固有の単調クロックが検討されるかもしれませんが、`time.time()` はこの種のアプリケーションでは標準です。 # - デキュー時の競合状態: 容量の再確認と、`dequeue_and_process` のクリティカルセクション内でのリクエストの再記録によって処理されます。 ```

判定

2位

勝利票

1 / 3

平均スコア

61

総合点

44

総評

回答Aは、5つの機能すべて、優れたドキュメント、デモ用スクリプトを備えた完全な実装を提供しています。しかし、重大な設計上の欠陥があります。スライディングウィンドウカウンターがクライアントごとではなくティアごとに共有されているため、同じティアのすべてのクライアントが単一のリクエストカウントを共有します。これにより、クライアントA(無料ティア)が5つのリクエストを使用した場合、クライアントD(同じく無料ティア)は独自のクォータを持つはずなのにブロックされてしまいます。これは根本的な正しさの問題です。優先度付きキューもクライアントごとではなくティアごとに存在するため、特定のクライアントのdequeue_and_processが別のクライアントのリクエストを処理する可能性があります。dequeue_and_processメソッドには、else句の`if queue:`ブロックの外で`highest_priority_request`を参照するバグがあり、UnboundLocalErrorが発生します。また、`time.monotonic()`の代わりに`time.time()`を使用しているため、システムクロックの変更に対して脆弱です。デモ用スクリプトは、合理的なデモ時間内にスライディングウィンドウが実際に機能していることを確認するのが難しい、大きなティア設定(60秒ウィンドウ)を使用しています。コメントは豊富ですが、やや冗長です。

採点詳細を表示

正確さ

重み 35%
35

回答Aには重大な正しさの欠陥があります。スライディングウィンドウカウンターがクライアントごとではなくティアごとであることです。ティアを共有するすべてのクライアントが1つのカウンターを共有するため、クライアントAのリクエストがクライアントDのクォータを消費します。dequeue_and_processメソッドには、潜在的なUnboundLocalErrorがあります(else節で定義されていない可能性のあるhighest_priority_requestを参照しています)。time.monotonic()の代わりにtime.time()を使用しており、クロックジャンプに対して脆弱です。スライディングウィンドウ自体(タイムスタンプのdeque)は単独では正しく実装されていますが、誤った粒度で適用されています。

完全性

重み 20%
55

回答Aは、スライディングウィンドウ、複数のティア、優先度付きキュー、スレッドセーフ、クリーンアップ(バックグラウンドスレッド経由)の5つの機能をすべて実装しています。しかし、クライアントごとではなくティアごとのレート制限により、スライディングウィンドウ機能が意図したとおりに機能しません。デモ用スクリプトが存在し、複数のスレッドを使用しています。クリーンアップはバックグラウンドデーモンスレッドとして実行されます。必要なすべてのコンポーネント(Requestデータクラス、RateLimiterクラス、デモ)が存在します。

コード品質

重み 20%
50

回答Aは、個別のSlidingWindowCounterクラスを備えた合理的な構造を持っていますが、カウンターをティアごとに共有するというアーキテクチャ上の決定は設計上の欠陥です。Requestデータクラスは、order=Trueと__lt__の両方を定義しており、冗長です。global_lockは広範囲に使用されており、並行性の利点を低下させています。コメントは豊富ですが冗長です。コードには、コメントアウトされたprint文がいくつか残っています。dequeue_and_processメソッドは、ネストされた条件と潜在的なバグを備えた複雑なものです。

実用性

重み 15%
35

回答Aのティアごとのレート制限は、各クライアントが独立したレート制限を持つべき実際のユースケースには実用的ではありません。デモでは60秒のウィンドウを使用しているため、合理的な時間でスライディングウィンドウの動作を観察するのが困難です。バックグラウンドクリーンアップスレッドは良い点ですが、コア機能が不正確であるため、実用的な価値が大幅に制限されます。

指示遵守

重み 10%
55

回答Aはほとんどの指示に従っています。RateLimiterクラス、Requestデータクラスを実装し、エッジケース(重複登録、未登録クライアント)を処理し、複数のスレッドを使用したデモを提供します。しかし、クライアントごとではなくティアごとのレート制限は、「クライアントは登録時にティアが割り当てられる」という要件を、解釈を誤っている可能性があります。ティアは制限を定義しますが、各クライアントは個別に追跡されるべきです。デモでは、リクエストが許可され、エンキューされる様子が示されています。設計上の選択はコメントで説明されています。

採点モデル OpenAI GPT-5.2

総合点

46

総評

dequeを使用したワーキングスライディングウィンドウリミッターを実装し、heapqベースのティアごとの遅延キュー、マルチスレッドデモ、および詳細な設計コメントを備えています。しかし、クライアントごとのレート制限のコア設計は誤っています。スライディングウィンドウカウンターはティアごと(同じティアのすべてのクライアントで共有)であるため、あるクライアントのトラフィックが他のクライアントに影響を与え、これは典型的な「クライアントはティアに割り当てられる」というセマンティクスや、プロンプトのクライアントごとの処理の期待に違反します。また、競合状態/ロジックの問題もあります。enqueue_requestはglobal_lockを保持したまま_update_client_activityを呼び出しますが、_update_client_activityはglobal_lockを再取得します(デッドロック)。dequeue_and_processには、内部チェック後に容量が利用できなくなった場合のhighest_priority_requestを参照するバグパスがあります(変数が未定義の可能性があります)。クリーンアップはクライアント登録を削除しますが、キューに入れられたリクエストは(クライアントティアのルックアップが失敗するため)デキューできなくなり、残ります。

採点詳細を表示

正確さ

重み 35%
30

スライディングウィンドウロジック自体は問題ありませんが、クライアントごとではなくティアごとに実装されているため、同じティアのクライアントが互いにスロットリングします。enqueue_requestはデッドロックします(global_lockが保持されたまま_update_client_activityがglobal_lockを再取得します)。dequeue_and_processには、あるブランチに未定義変数バグがあり、クリーンアップはクライアント登録を削除することでキューに入れられたリクエストを孤立させる可能性があります。

完全性

重み 20%
65

RateLimiter、Request dataclass、マルチティア設定、ティアごとの優先度付きキュー、バックグラウンドクリーンアップスレッド、マルチスレッドデモが含まれています。しかし、主要な操作は(デッドロックにより)不完全であり、クリーンアップは期限切れの追跡だけでなくクライアントを削除するため、遅延アイテムに一貫性がなくなります。

コード品質

重み 20%
50

可読性が高く、コメントも豊富ですが、深刻な構造上の問題があります。ネストされたロッキングによるデッドロック、一貫性のないデキュー戻り値(None vs (None, False))、未初期化変数に参照するバグのあるブランチがあります。また、dataclass(order=True)とカスタム__lt__が不必要に混在しています。

実用性

重み 15%
45

実際にはデッドロックによりハングする可能性があり、ティア全体の制限はクライアントベースのレート制限には不向きです。クリーンアップは、削除されたクライアントに対して遅延キューを使用不能にする可能性があります。デモは豊富ですが、確実に完了しない可能性があります。

指示遵守

重み 10%
60

ほとんどの指示(スライディングウィンドウ、ティア、優先度付きキュー、スレッドセーフ性の主張、デモ、コメント)に従っていますが、クライアントベースの制限に関する主要な意図に違反しており、エッジケースの処理が壊れる可能性があります(デッドロック、孤立したキューに入れられたリクエスト)。

採点モデル Google Gemini 2.5 Pro

総合点

92

総評

回答Aは、同時実行レートリミッターの非常に強力で、正確かつ実用的な実装を提供しています。特に`SlidingWindowCounter`を独自のクラスに分離した設計上の選択や、共有ティアキューからの遅延リクエストを処理するための効率的なロジックは堅実です。スレッドセーフの実装は堅牢で、dequeueメソッドにおける競合状態の処理も慎重に行われています。自動化されたバックグラウンドクリーンアップスレッドと包括的なデモスクリプトの組み込みは、その品質をさらに向上させています。クライアントデータとキューに単一のグローバルロックを使用している点はわずかなトレードオフですが、全体的なソリューションはよく設計されており、本番環境での使用に適しています。

採点詳細を表示

正確さ

重み 35%
90

すべての機能にわたって実装は正確です。スライディングウィンドウロジックは標準的で効果的です。優先度付きキューとdequeueロジックは特にうまく処理されており、競合状態に対する堅牢なチェックも含まれています。全体的なシステムは、同時実行下で期待どおりに動作します。

完全性

重み 20%
100

要求された5つの機能はすべて完全に実装されています。このソリューションは、クリーンアップメカニズムを自律的なバックグラウンドスレッドとして実装することで、さらに一歩進んでおり、単純な手動メソッドよりも完全で堅牢なソリューションとなっています。

コード品質

重み 20%
85

関心の分離(例:`SlidingWindowCounter`クラス)が優れており、コード品質は高いです。APIはクリーンで直感的です。キューに単一のグローバルロックを使用している点はわずかな簡略化ですが、正しく実装されています。コードはよくコメントされており、読みやすいです。

実用性

重み 15%
90

このソリューションは非常に実用的です。設計はスケーラブルで効率的であり、最小限の変更で本番環境で使用できる可能性があります。自動化されたクリーンアップと堅牢なdequeueロジックにより、信頼性の高いコンポーネントとなっています。

指示遵守

重み 10%
100

回答は、プロンプトのすべての指示に完全に準拠しています。要求されたすべての機能を実装し、`Request`データクラスを提供し、包括的なデモスクリプトを含み、設計上の選択に関する詳細な説明を提供しています。

import threading import time import heapq from collections import deque from dataclasses import dataclass from typing import Any, Dict, Optional, Tuple import itertools import random # Request dataclass holds minimal required fields @dataclass(order=True) class Request: # ordering will be driven primarily by the priority in heaps priority: int client_id: str timestamp: float payload: Any = None class RateLimiter: """ Thread-safe rate limiter with sliding window per client and per-tier priority queues. Design choices (high-level): - Sliding window implemented per client usin...

全文を表示

import threading import time import heapq from collections import deque from dataclasses import dataclass from typing import Any, Dict, Optional, Tuple import itertools import random # Request dataclass holds minimal required fields @dataclass(order=True) class Request: # ordering will be driven primarily by the priority in heaps priority: int client_id: str timestamp: float payload: Any = None class RateLimiter: """ Thread-safe rate limiter with sliding window per client and per-tier priority queues. Design choices (high-level): - Sliding window implemented per client using a deque of request timestamps. This gives exact counts for the sliding window (precision) at the cost of storing one timestamp per request. - Per-tier priority queues implemented with heapq. Each queued Request is pushed as (priority, counter, request) to preserve FIFO among same-priority items. Lower priority number = higher priority. - Concurrency: use a combination of locks: * self.clients_lock protects the client registry structure. * each client has its own lock to protect its timestamp deque and last_seen metadata (reduces contention). * each tier has its own lock to protect its priority queue. Lock ordering is important to avoid deadlocks: when both client lock and tier lock need to be acquired, the implementation always acquires the client lock first, then the tier lock. All methods follow this order. - For dequeue_for_client, we must find the highest-priority waiting request for that client in the per-tier heap. Because heapq does not support efficient arbitrary-item removal, we pop items until we find one for the client, buffering others and pushing them back afterwards. This can be O(n) in the queue size in the worst case — a trade-off favoring simpler data structures and correctness over per-operation worst-case performance. - Time functions use time.monotonic() for durations to avoid system clock jumps affecting rate limiting. Trade-offs: - Precision vs space: deque per client is precise for sliding window but stores each request timestamp. - Performance vs complexity: scanning heaps for dequeue_for_client is simple but can be costly if queues are large. """ def __init__(self, tiers: Dict[str, Dict[str, float]]): """ tiers: mapping tier_name -> { 'max_requests': int, 'window_seconds': float } """ self.tiers = {} self.tiers_lock = threading.Lock() self._counter = itertools.count() # Initialize tier structures for name, conf in tiers.items(): if 'max_requests' not in conf or 'window_seconds' not in conf: raise ValueError(f"Tier {name} missing configuration") self.tiers[name] = { 'max_requests': int(conf['max_requests']), 'window_seconds': float(conf['window_seconds']), 'queue': [], # heap: elements (priority, counter, Request) 'lock': threading.Lock(), } # Clients registry: client_id -> { 'tier': str, 'timestamps': deque, 'lock': Lock, 'last_seen': float } self.clients: Dict[str, Dict[str, Any]] = {} self.clients_lock = threading.Lock() def register_client(self, client_id: str, tier_name: str): """Register a client into a tier. Raises ValueError for duplicate registration or unknown tier. Thread-safe. """ with self.clients_lock: if client_id in self.clients: raise ValueError(f"Client {client_id} already registered") if tier_name not in self.tiers: raise ValueError(f"Unknown tier {tier_name}") self.clients[client_id] = { 'tier': tier_name, 'timestamps': deque(), 'lock': threading.Lock(), 'last_seen': time.monotonic(), } def _remove_old_timestamps(self, client_entry: Dict[str, Any], window_seconds: float, now: float): # remove timestamps older than sliding window timestamps: deque = client_entry['timestamps'] cutoff = now - window_seconds while timestamps and timestamps[0] <= cutoff: timestamps.popleft() def allow_request(self, client_id: str, priority: int = 100, payload: Any = None, timestamp: Optional[float] = None) -> Tuple[bool, Optional[Request]]: """ Attempt to allow a request for client_id. If under the sliding window limit, the request is recorded and allowed (returns (True, None)). If the client is at capacity, the request is enqueued into the client's tier priority queue and (False, request) is returned. Raises KeyError if client is not registered. Thread-safe. """ if timestamp is None: now = time.monotonic() else: # If caller supplied an external timestamp, convert it to monotonic-relative by using monotonic() now # and assuming external timestamp is in same epoch; for demonstration we accept timestamp as-is. now = timestamp # Get client entry with self.clients_lock: client_entry = self.clients.get(client_id) if client_entry is None: raise KeyError(f"Client {client_id} is not registered") tier_name = client_entry['tier'] tier = self.tiers[tier_name] # Acquire client lock first (lock ordering rule), then possibly tier lock. with client_entry['lock']: self._remove_old_timestamps(client_entry, tier['window_seconds'], now) if len(client_entry['timestamps']) < tier['max_requests']: # allow request client_entry['timestamps'].append(now) client_entry['last_seen'] = now return True, None else: # at capacity -> enqueue into tier priority queue req = Request(priority=priority, client_id=client_id, timestamp=now, payload=payload) # Acquire tier lock to push into heap with tier['lock']: count = next(self._counter) heapq.heappush(tier['queue'], (req.priority, count, req)) client_entry['last_seen'] = now return False, req def enqueue_request(self, req: Request): """Directly enqueue a Request into its client's tier queue. Raises if client not registered. Thread-safe. """ with self.clients_lock: client_entry = self.clients.get(req.client_id) if client_entry is None: raise KeyError(f"Client {req.client_id} is not registered") tier_name = client_entry['tier'] tier = self.tiers[tier_name] with tier['lock']: count = next(self._counter) heapq.heappush(tier['queue'], (req.priority, count, req)) def dequeue_for_client(self, client_id: str, timestamp: Optional[float] = None) -> Optional[Request]: """ If capacity is available for client_id, dequeue and return the highest-priority queued Request for the client's tier. The returned Request is considered processed and will be counted toward the window. If no queued request is present for the client or capacity isn't available, return None. Raises KeyError if client not registered. Thread-safe. """ if timestamp is None: now = time.monotonic() else: now = timestamp with self.clients_lock: client_entry = self.clients.get(client_id) if client_entry is None: raise KeyError(f"Client {client_id} is not registered") tier_name = client_entry['tier'] tier = self.tiers[tier_name] # Acquire client lock first, then tier lock (consistent ordering) with client_entry['lock']: self._remove_old_timestamps(client_entry, tier['window_seconds'], now) if len(client_entry['timestamps']) >= tier['max_requests']: # no capacity return None # There is capacity; try to find highest-priority queued request for this client with tier['lock']: queue = tier['queue'] if not queue: return None temp = [] found_req = None try: while queue: priority, count, req = heapq.heappop(queue) if req.client_id == client_id: found_req = req break else: temp.append((priority, count, req)) finally: # push back any popped non-matching requests for item in temp: heapq.heappush(queue, item) if found_req is None: return None # mark the request as processed: append timestamp to client's timestamps client_entry['timestamps'].append(now) client_entry['last_seen'] = now return found_req def cleanup(self, cleanup_threshold_seconds: float): """ Remove tracking data for clients who have not made requests in the last cleanup_threshold_seconds. Does not unregister the client (keeps registration), but removes timestamp history to free memory. If a client has queued deferred requests, we keep their tracking data. Thread-safe. """ now = time.monotonic() cutoff = now - cleanup_threshold_seconds removed = [] # iterate clients safely with self.clients_lock: for client_id, client_entry in list(self.clients.items()): # acquire client lock to inspect safely with client_entry['lock']: # if the client has no activity recently and no queued requests, clear timestamps last_seen = client_entry.get('last_seen', 0) tier_name = client_entry['tier'] tier = self.tiers[tier_name] has_queued = False # check if any queued requests exist for this client in tier queue with tier['lock']: for _, _, req in tier['queue']: if req.client_id == client_id: has_queued = True break if last_seen < cutoff and not has_queued: # clear timestamps to free memory client_entry['timestamps'].clear() removed.append(client_id) return removed # Demonstration script if __name__ == "__main__": # Create a rate limiter with two tiers: free and premium tiers_conf = { 'free': {'max_requests': 3, 'window_seconds': 5.0}, 'premium': {'max_requests': 10, 'window_seconds': 5.0}, } rl = RateLimiter(tiers_conf) # Register clients clients = [ ('alice', 'free'), ('bob', 'free'), ('carol', 'premium'), ] for cid, tier in clients: rl.register_client(cid, tier) print(f"Registered client {cid} as tier {tier}") stop_event = threading.Event() # Worker threads simulate bursts of requests def client_worker(client_id: str, burst: int, sleep_between: float): for i in range(burst): allowed, req = rl.allow_request(client_id, priority=random.randint(1, 10), payload={'i': i}) ts = time.monotonic() if allowed: print(f"[{ts:.3f}] {client_id}: request {i} ALLOWED") else: print(f"[{ts:.3f}] {client_id}: request {i} ENQUEUED (priority={req.priority})") time.sleep(sleep_between) # Background processor that periodically tries to dequeue queued requests for all clients def deferred_processor(): while not stop_event.is_set(): # iterate all clients and try to dequeue one request if capacity exists with rl.clients_lock: client_ids = list(rl.clients.keys()) progressed = False for cid in client_ids: try: req = rl.dequeue_for_client(cid) except KeyError: continue if req is not None: ts = time.monotonic() print(f"[{ts:.3f}] Processed deferred request for {cid} (priority={req.priority}, payload={req.payload})") progressed = True if not progressed: # nothing processed; sleep a bit time.sleep(0.2) # Start threads: bursty clients threads = [] t1 = threading.Thread(target=client_worker, args=('alice', 8, 0.4)) t2 = threading.Thread(target=client_worker, args=('bob', 6, 0.3)) t3 = threading.Thread(target=client_worker, args=('carol', 12, 0.1)) threads.extend([t1, t2, t3]) processor_thread = threading.Thread(target=deferred_processor) processor_thread.daemon = True processor_thread.start() for t in threads: t.start() # Let threads finish for t in threads: t.join() # Allow background processor to handle remaining queued items for some time time.sleep(3.0) stop_event.set() processor_thread.join(timeout=1.0) # Show cleanup removed = rl.cleanup(cleanup_threshold_seconds=1.0) print(f"Cleanup removed timestamp history for clients: {removed}") # Show queued items left in each tier for tier_name, tier in rl.tiers.items(): with tier['lock']: qsize = len(tier['queue']) print(f"Tier '{tier_name}' has {qsize} queued requests left") print("Demo complete")

判定

1位 | 勝者

勝利票

2 / 3

平均スコア

76

総合点

73

総評

回答Bは、クライアントごとのスライディングウィンドウを正しく実装し、クロックの堅牢性のためにtime.monotonic()を使用し、デッドロックを防ぐためにドキュメント化されたロック順序を持つきめ細かなロッキングを採用し、妥当な時間で実行されるよく設計されたデモンストレーションを提供しています。クライアントごとのレート制限は、問題の正しい解釈です。dequeue_for_clientメソッドは、ティアキュー内で特定のクライアントに属するリクエストを正しく検索しますが、O(n)のスキャンは文書化されたトレードオフです。cleanupメソッドは、バックグラウンドスレッドではなく明示的な呼び出しとして実装されており、これはわずかな逸脱ですが、要件を満たしています。allow_requestメソッドは、許可チェックとエンキューを1つの呼び出しに結合しており、プロンプトの別々のメソッドとはわずかに異なりますが、enqueue_requestメソッドも別途提供されています。コードは適切なロック順序(クライアントロック、次にティアロック)を一貫して使用しています。1つの軽微な問題:Request dataclassは、順序=Trueで混合型を使用しており、比較の問題を引き起こす可能性がありますが、ヒープタプルは(priority, counter, request)を使用しており、直接のRequest比較を回避しています。デモは簡潔で、イベントのシーケンスを明確に示しています。

採点詳細を表示

正確さ

重み 35%
75

回答Bは、クライアントごとのタイムスタンプのdequeを使用して、クライアントごとのスライディングウィンドウを正しく実装しています。クロック変更に対する堅牢性のためにtime.monotonic()を使用しています。ロック順序は文書化され、一貫して従われています(クライアントロック、次にティアロック)。dequeue_for_clientは、ティアキュー内の特定のクライアントのリクエストを正しく検索します。ヒープは、安定した順序付けを保証するために(priority, counter, request)タプルを使用しています。軽微な懸念:dequeue内のO(n)スキャンは文書化されたトレードオフです。全体的な正しさは堅固です。

完全性

重み 20%
70

回答Bは、5つの機能すべてを実装しています。スライディングウィンドウはクライアントごと(正しい)、複数のティアがサポートされ、優先度付きキュー(heapqを使用)が実装され、スレッドセーフ(きめ細かなロッキング)が提供され、クリーンアップが明示的なメソッドとして提供されています。cleanupメソッドはクライアント全体を削除するのではなくタイムスタンプをクリアしますが、これは妥当な解釈です。デモスクリプトは、複数のスレッドとバックグラウンドプロセッサースレッドを使用しています。Request dataclassには、必要なフィールドがすべて含まれています。

コード品質

重み 20%
75

回答Bは、クライアントごとおよびティアごとのきめ細かなロッキングにより、クリーンな分離を実現しています。ロック順序は明示的に文書化され、従われています。ヒープのタイブレークにitertools.count()を使用しているのはエレガントです。_remove_old_timestampsヘルパーはうまくファクタリングされています。allow_requestがチェックとエンキューを結合しているのは実用的です。コードは、明確なメソッドの責任を持つように整理されています。cleanupメソッドがキューに入れられたリクエストを確認してからクリアするのは思慮深いです。軽微な問題:順序=TrueのRequest dataclassは、比較不可能なペイロード型で問題を引き起こす可能性がありますが、ヒープタプルは直接のRequest比較を回避しています。

実用性

重み 15%
70

回答Bのクライアントごとのレート制限は、正しく実用的なアプローチです。デモでは5秒のウィンドウを使用してスライディングウィンドウの動作を明確に示しています。きめ細かなロッキングにより、本番環境での同時実行性が向上します。dequeue_for_clientのO(n)スキャンは、大規模な環境では懸念事項となる可能性がありますが、文書化されています。cleanupメソッドは実用的であり、デモではそれが呼び出されていることが示されています。全体として、この実装は実際のレートリミッターの合理的な出発点として機能する可能性があります。

指示遵守

重み 10%
70

回答Bは指示にうまく従っています。説明されたインターフェースを持つRateLimiterクラス、必要なフィールドを持つRequest dataclassを実装し、エッジケース(ValueError/KeyErrorを適切に発生させる)を処理し、マルチスレッドデモを提供します。allow_requestメソッドは、プロンプトの別々のallow_requestおよびenqueueメソッドとはわずかに異なりますが、チェックとエンキューを結合しており、enqueue_requestも別途提供されています。デモは、許可されたリクエスト、キューに入れられたリクエスト、および遅延処理を明確に示しています。設計上の選択はクラスのdocstringに文書化されています。

採点モデル OpenAI GPT-5.2

総合点

82

総評

クライアントごとの真のスライディングウィンドウ(タイムスタンプのdeque)、安定したFIFOタイブレークを備えたティアごとの優先度付きキュー、および一貫したロッキング戦略(clients_lockに加えて、明示的な順序付けを備えたクライアントごとおよびティアごとのロック)を備えた、明確で実行可能な実装を提供します。allow_requestは、制限時にエンキューし、構造化された結果を返し、dequeue_for_clientは、クライアントごとの容量を正しくチェックしてから、その特定のクライアントの最も優先度の高いキューに入れられたリクエスト(明示的なドキュメント化されたO(n)ヒープスキャントレードオフ付き)をプルします。clock-jumpの問題を回避するためにtime.monotonicを使用します。クリーンアップは実装されており、スレッドセーフですが、クライアントを完全に削除するのではなくタイムスタンプ履歴をクリアします。これは合理的な解釈ですが、「期限切れの追跡データを削除する」とはわずかに異なります。デモでは、複数のスレッドとバックグラウンドプロセッサを使用して、遅延処理を読みやすい出力で示します。

採点詳細を表示

正確さ

重み 35%
85

eviction by cutoffを備えたdequeを使用したクライアントごとの正しいスライディングウィンドウ。ティアごとのヒープは(優先度、カウンター)で順序付けられます。明示的なロック順序付けにより、スレッドセーフティは大部分健全です。dequeue_for_clientは機能的には正しいですが、O(n)スキャンです。モノトニック時間を使用してクロックの問題を回避します。

完全性

重み 20%
85

要求されたすべての機能(スライディングウィンドウ、複数のティア、ティアごとの優先度付きキュー、スレッドセーフメソッド、およびクリーンアップメソッド)を実装しています。デモは、同時バースト、エンキュー、およびバックグラウンドプロセッサによる後続の処理を示しています。クリーンアップの動作はわずかに異なります(クライアントを削除するのではなくタイムスタンプをクリアします)が、期限切れの追跡データを削除するという点では同じです。

コード品質

重み 20%
75

クリーンな構造、トレードオフを説明する優れたドキュメンテーション文字列、一貫したロッキング戦略、および標準ライブラリの妥当な使用。いくつかの非効率性(クライアントごとのデキューのためのヒープスキャン)と、わずかに混乱を招くオプションのタイムスタンプコメントがありますが、全体的には堅実で保守可能です。

実用性

重み 15%
75

クライアントごとのスロットリング、クロックジャンプに対する堅牢性、および実際の同時実行性と遅延処理のデモンストレーションを備えた、参照実装として実用的で利用可能です。クリーンアップはメモリ制御に役立ちますが、クライアントの登録解除は行いません。

指示遵守

重み 10%
85

要求されたインターフェイス(RateLimiter、Request dataclass、必須メソッド、例外/None戻り値によるエッジケース、および明確なデモ出力)に非常に近いです。わずかな逸脱:クリーンアップはクライアントを削除するのではなく追跡をクリアしますが、期限切れの追跡データを削除するという点では依然として一致しています。

採点モデル Google Gemini 2.5 Pro

総合点

73

総評

回答Bは、きめ細かなロック戦略を採用し、精度向上のために`time.monotonic()`を正しく使用するなど、並行処理に関する高度な理解を示しています。docstring内のドキュメントも優れています。しかし、遅延リクエストのデキュー設計には致命的な欠陥があります。`dequeue_for_client`メソッドは、特定のクライアントのリクエストを見つけるために、優先度付きキュー全体をスキャンする必要があり、これはO(n)の操作で非常に非効率的であり、スケーリングしません。このアルゴリズム上の弱点が、他の領域での強みにもかかわらず、ソリューションの実用的な価値を著しく損なっています。

採点詳細を表示

正確さ

重み 35%
65

スライディングウィンドウと基本的なスレッドセーフティ(きめ細かなロックと明示的な順序付けを使用)は正しく、うまく実装されています。`time.monotonic`の使用はプラスです。しかし、`dequeue_for_client`のコアロジックはアルゴリズム的に非効率的(ヒープのO(n)スキャン)であり、高性能データ構造のコンテキストでは重大な正しさの欠陥です。これにより、意図された目的に対して設計が不適切になります。

完全性

重み 20%
90

要求された5つの機能はすべて実装されています。クリーンアップ機能は手動メソッドとして提供されており、プロンプトの「メソッドを提供する」という要件を満たしていますが、回答Aが提供する自動バックグラウンドタスクよりも不完全なソリューションです。

コード品質

重み 20%
75

コードは、きめ細かなロックや明示的なロック順序付けルールなど、高品質な並行プログラミングプラクティスを示しています。例外を使用したエラー処理も良いプラクティスです。しかし、設計は非効率的なデキューアルゴリズムによって損なわれており、メインクラスはネストされた辞書ですべての状態を保持しているため、ややモノリシックです。

実用性

重み 15%
50

非効率的な`dequeue_for_client`実装により、実用的な価値は著しく制限されています。多数の遅延リクエストがある実際のシナリオでは、優先度付きキューの線形スキャンが主要なパフォーマンスボトルネックとなり、システム全体が使用不能になります。高度なロックでは、この根本的な設計上の欠陥を克服できません。

指示遵守

重み 10%
100

回答はプロンプトのすべての指示に完全に準拠しています。要求されたすべての機能を実装し、`Request`データクラスを提供し、包括的なデモンストレーションスクリプトを含み、クラスdocstringで設計上の選択について詳細な説明を提供しています。

比較結果サマリー

最終順位は、採点者ごとの順位集約(平均順位 + ボルダ方式の同点処理)で決定します。平均点は参考表示です。

採点者数: 3

勝利票

2 / 3

平均点

76
この回答を見る

採点結果

採点モデル Google Gemini 2.5 Pro

勝者理由

回答Aは、より優れた、より実用的なコア設計により、勝者となります。このタスクの最も重要な部分は、基本的なスライディングウィンドウを超えて、遅延リクエストの処理です。回答Aは、容量が利用可能になったときに、ティアのキュー内の最優先リクエストが処理される効率的なシステムを実装しており、これは標準的でスケーラブルなアプローチです。対照的に、回答Bのデキュー設計は根本的に非効率であり、優先度付きキューの線形スキャンが必要です。このアルゴリズム上の欠陥により、回答Bのソリューションは実際の使用には実用的ではありません。回答Bはより高度なロッキングメカニズムを備えていますが、これはコアロジックにおける重大なパフォーマンスの問題を補うものではありません。回答Aのソリューションは、より堅牢で、より構造化され、全体的により効率的です。

採点モデル OpenAI GPT-5.2

勝者理由

回答Bは、クライアントごとのスライディングウィンドウレート制限を、真の並行安全性と動作する遅延優先度メカニズムを備えて正しく実装しているため、加重スコアリングで勝利します。一方、回答Aは、機能豊富でコメントも充実しているにもかかわらず、コア機能の基盤を損なう重大な正誤性および並行性のバグ(クライアント間干渉を引き起こすティア全体で共有されるカウンター、enqueue_requestでの確実なデッドロック、およびバグのあるdequeueパス)を抱えています。

勝者理由

回答Bは主に正しさ(重み35%)で勝利します。回答Aには、スライディングウィンドウカウンターがクライアントごとではなくティアごとに共有されるという根本的な設計上の欠陥があり、同じティアのクライアントが互いのレート制限に干渉します。回答Bは、クライアントごとのスライディングウィンドウを正しく実装しています。また、回答Bはクロックの堅牢性のためにtime.monotonic()を使用し、適切な細粒度ロックと文書化されたロック順序を持ち、妥当な時間で実行され、レートリミッターが機能していることを明確に示すデモンストレーションを提供します。回答Aはより広範なコメントを持っていますが、その中核となるアルゴリズム設計は、指定された要件に対して不正確です。

X f L