回答A: OpenAI GPT-5.5
import math import threading import time import unittest from collections import deque from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass, field # 設計上の注意: # このリミッターは、2つの独立したチェックが一致した場合にのみリクエストを許可します。 # 1. トークンバケットに少なくともコストトークンがあり、バーストまで短いバーストを許可します。 # 2. 精密なスライディングウィンドウログに、最後のウィンドウ秒内にコスト以上の許可されたユニットの空きがあること。 # ウィンドウ制限は rate * window_seconds です。 # トークンバケットがはいと言ってもスライディングウィンドウがいっぱいの場合、リク...
全文を表示 ▼
import math import threading import time import unittest from collections import deque from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass, field # 設計上の注意: # このリミッターは、2つの独立したチェックが一致した場合にのみリクエストを許可します。 # 1. トークンバケットに少なくともコストトークンがあり、バーストまで短いバーストを許可します。 # 2. 精密なスライディングウィンドウログに、最後のウィンドウ秒内にコスト以上の許可されたユニットの空きがあること。 # ウィンドウ制限は rate * window_seconds です。 # トークンバケットがはいと言ってもスライディングウィンドウがいっぱいの場合、リクエストは拒否され、トークンは消費されません。 # ウィンドウに空きがあってもトークンがない場合も拒否されます。 # 精密ログは、許可されたリクエストごとに1つのエントリを (タイムスタンプ, コスト) として格納し、ユニットごとではありません。 # そのため、許可は償却 O(1) です。各ログエントリは一度追加され、一度削除されます。 # メモリは、アクティブウィンドウ内の許可されたリクエスト数あたり O(1) です。 # 重み付き2バケットカウンターは O(1) メモリを使用しますが、バケット境界付近で過剰または過小に許可する可能性があります。 # この実装は、正確性が通常、不正行為防止により重要であるため、正確性を選択します。 # クライアントマップはロックストライピングされています。各クライアント状態には独自のロックがあります。 # 呼び出し元は、トークン/ウィンドウ状態を変更する前にクライアントごとのロックを取得するため、 # 同時allow呼び出しはトークンを二重消費したり、ウィンドウ更新を失ったりすることはありません。 class MonotonicClock: def now(self) -> float: return time.monotonic() class ManualClock: def __init__(self, start: float = 0.0): self._now = float(start) self._lock = threading.Lock() def now(self) -> float: with self._lock: return self._now def set(self, value: float) -> None: with self._lock: self._now = float(value) def advance(self, seconds: float) -> None: with self._lock: self._now += float(seconds) @dataclass(frozen=True) class ClientConfig: rate: float burst: float window_seconds: float = 1.0 def validated(self) -> 'ClientConfig': rate = float(self.rate) burst = float(self.burst) window = float(self.window_seconds) if not math.isfinite(rate) or rate <= 0: raise ValueError('rate must be a positive finite number') if not math.isfinite(burst) or burst < 1: raise ValueError('burst must be a finite number >= 1') if not math.isfinite(window) or window <= 0: raise ValueError('window_seconds must be a positive finite number') return ClientConfig(rate, burst, window) @property def window_limit(self) -> float: return self.rate * self.window_seconds @dataclass class _ClientState: config: ClientConfig tokens: float last_refill: float last_seen: float events: deque = field(default_factory=deque) window_sum: float = 0.0 lock: threading.Lock = field(default_factory=threading.Lock) class HybridRateLimiter: def __init__( self, rate: float, burst: float, window_seconds: float = 1.0, per_client_configs=None, clock=None, stripes: int = 64, idle_ttl_seconds: float | None = None, cleanup_interval_seconds: float | None = None, ): if stripes <= 0: raise ValueError('stripes must be positive') self._default_config = ClientConfig(rate, burst, window_seconds).validated() self._configs = {} if per_client_configs: for client_id, cfg in per_client_configs.items(): self._configs[client_id] = self._coerce_config(cfg) max_window = self._default_config.window_seconds for cfg in self._configs.values(): max_window = max(max_window, cfg.window_seconds) if idle_ttl_seconds is None: idle_ttl_seconds = max(60.0, 2.0 * max_window) idle_ttl_seconds = float(idle_ttl_seconds) if not math.isfinite(idle_ttl_seconds) or idle_ttl_seconds <= 0: raise ValueError('idle_ttl_seconds must be positive and finite') if idle_ttl_seconds < max_window: raise ValueError('idle_ttl_seconds must be >= the largest window_seconds') self._idle_ttl = idle_ttl_seconds if cleanup_interval_seconds is None: cleanup_interval_seconds = min(60.0, max(1.0, idle_ttl_seconds / 2.0)) cleanup_interval_seconds = float(cleanup_interval_seconds) if not math.isfinite(cleanup_interval_seconds) or cleanup_interval_seconds <= 0: raise ValueError('cleanup_interval_seconds must be positive and finite') self._cleanup_interval = cleanup_interval_seconds self._clock = clock if clock is not None else MonotonicClock() self._clients = {} self._stripe_locks = [threading.Lock() for _ in range(stripes)] self._cleanup_mutex = threading.Lock() self._last_cleanup = self._safe_time() self._eps = 1e-12 @staticmethod def _coerce_config(cfg) -> ClientConfig: if isinstance(cfg, ClientConfig): return cfg.validated() if isinstance(cfg, dict): return ClientConfig(cfg['rate'], cfg['burst'], cfg.get('window_seconds', 1.0)).validated() rate, burst, *rest = cfg window = rest[0] if rest else 1.0 return ClientConfig(rate, burst, window).validated() def _safe_time(self) -> float: t = float(self._clock.now()) if math.isnan(t): raise ValueError('clock returned NaN') return t def _config_for(self, client_id: str) -> ClientConfig: return self._configs.get(client_id, self._default_config) def _stripe_index(self, client_id: str) -> int: return hash(client_id) % len(self._stripe_locks) def _get_state_locked(self, client_id: str) -> _ClientState: idx = self._stripe_index(client_id) stripe = self._stripe_locks[idx] stripe.acquire() try: state = self._clients.get(client_id) if state is None: cfg = self._config_for(client_id) now = self._safe_time() state = _ClientState( config=cfg, tokens=cfg.burst, last_refill=now, last_seen=now, ) self._clients[client_id] = state state.lock.acquire() return state finally: stripe.release() def _advance_and_prune_locked(self, state: _ClientState) -> float: raw_now = self._safe_time() now = max(raw_now, state.last_seen) elapsed = max(0.0, now - state.last_refill) max_useful_elapsed = state.config.burst / state.config.rate elapsed_for_tokens = min(elapsed, max_useful_elapsed) if elapsed_for_tokens > 0: state.tokens = min( state.config.burst, state.tokens + elapsed_for_tokens * state.config.rate, ) state.last_refill = now state.last_seen = now cutoff = now - state.config.window_seconds while state.events and state.events[0][0] <= cutoff + self._eps: _, old_cost = state.events.popleft() state.window_sum -= old_cost if state.window_sum < self._eps: state.window_sum = 0.0 return now def allow(self, client_id: str, cost: int = 1) -> bool: if cost <= 0: raise ValueError('cost must be a positive integer') self._maybe_cleanup() cfg = self._config_for(client_id) if cost > cfg.burst: return False state = self._get_state_locked(client_id) try: now = self._advance_and_prune_locked(state) enough_tokens = state.tokens + self._eps >= cost enough_window = state.window_sum + cost <= state.config.window_limit + self._eps if not (enough_tokens and enough_window): return False state.tokens -= cost if state.tokens < self._eps: state.tokens = 0.0 state.window_sum += cost state.events.append((now, float(cost))) return True finally: state.lock.release() def retry_after(self, client_id: str) -> float: self._maybe_cleanup() state = self._get_state_locked(client_id) try: now = self._advance_and_prune_locked(state) cfg = state.config if cfg.burst < 1 or cfg.window_limit < 1: return math.inf token_wait = 0.0 if state.tokens + self._eps >= 1.0 else (1.0 - state.tokens) / cfg.rate if state.window_sum + 1.0 <= cfg.window_limit + self._eps: window_wait = 0.0 else: window_wait = math.inf remaining = state.window_sum for ts, event_cost in state.events: remaining -= event_cost if remaining + 1.0 <= cfg.window_limit + self._eps: window_wait = max(0.0, ts + cfg.window_seconds - now) break return max(token_wait, window_wait) finally: state.lock.release() def _maybe_cleanup(self) -> None: raw_now = self._safe_time() if raw_now - self._last_cleanup < self._cleanup_interval: return if not self._cleanup_mutex.acquire(blocking=False): return try: now = max(raw_now, self._last_cleanup) if now - self._last_cleanup >= self._cleanup_interval: self._cleanup_stale_at(now) self._last_cleanup = now finally: self._cleanup_mutex.release() def cleanup_stale(self) -> int: with self._cleanup_mutex: now = max(self._safe_time(), self._last_cleanup) removed = self._cleanup_stale_at(now) self._last_cleanup = now return removed def _cleanup_stale_at(self, now: float) -> int: removed = 0 for idx, stripe in enumerate(self._stripe_locks): with stripe: for client_id, state in list(self._clients.items()): if self._stripe_index(client_id) != idx: continue if not state.lock.acquire(blocking=False): continue try: if self._clients.get(client_id) is state and now - state.last_seen >= self._idle_ttl: del self._clients[client_id] removed += 1 finally: state.lock.release() return removed def client_count(self) -> int: total = 0 for stripe in self._stripe_locks: with stripe: total += len(self._clients) break return len(self._clients) class HybridRateLimiterTests(unittest.TestCase): def test_basic_allow_deny_and_retry_after(self): clock = ManualClock() limiter = HybridRateLimiter(rate=1, burst=2, window_seconds=10, clock=clock) self.assertTrue(limiter.allow('alice')) self.assertTrue(limiter.allow('alice')) self.assertFalse(limiter.allow('alice')) self.assertAlmostEqual(limiter.retry_after('alice'), 1.0, places=9) def test_burst_draining_and_fractional_refill(self): clock = ManualClock() limiter = HybridRateLimiter(rate=2, burst=3, window_seconds=10, clock=clock) self.assertTrue(limiter.allow('alice')) self.assertTrue(limiter.allow('alice')) self.assertTrue(limiter.allow('alice')) self.assertFalse(limiter.allow('alice')) clock.advance(0.25) self.assertFalse(limiter.allow('alice')) self.assertAlmostEqual(limiter.retry_after('alice'), 0.25, places=9) clock.advance(0.25) self.assertTrue(limiter.allow('alice')) def test_sliding_window_caps_even_when_bucket_refilled(self): clock = ManualClock() limiter = HybridRateLimiter(rate=10, burst=10, window_seconds=1, clock=clock) for _ in range(10): self.assertTrue(limiter.allow('abuser')) clock.advance(0.5) self.assertFalse(limiter.allow('abuser')) self.assertAlmostEqual(limiter.retry_after('abuser'), 0.5, places=9) clock.advance(0.5) self.assertTrue(limiter.allow('abuser')) def test_cost_larger_than_burst_is_rejected_without_state_creation(self): clock = ManualClock() limiter = HybridRateLimiter(rate=10, burst=5, window_seconds=10, clock=clock) self.assertFalse(limiter.allow('alice', cost=6)) self.assertEqual(limiter.client_count(), 0) self.assertTrue(limiter.allow('alice', cost=5)) self.assertFalse(limiter.allow('alice')) def test_concurrent_contention_obeys_token_bucket_bound(self): clock = ManualClock() rate = 5 burst = 5 limiter = HybridRateLimiter(rate=rate, burst=burst, window_seconds=100, clock=clock, stripes=8) successes = 0 for tick in range(11): clock.set(tick / 10.0) with ThreadPoolExecutor(max_workers=32) as pool: results = list(pool.map(lambda _: limiter.allow('hot'), range(32))) successes += sum(1 for r in results if r) bound = rate * 1.0 + burst self.assertLessEqual(successes, bound) def test_stale_client_eviction(self): clock = ManualClock() limiter = HybridRateLimiter( rate=10, burst=10, window_seconds=1, clock=clock, idle_ttl_seconds=5, cleanup_interval_seconds=1, ) self.assertTrue(limiter.allow('old-client')) self.assertEqual(limiter.client_count(), 1) clock.advance(6) self.assertTrue(limiter.allow('new-client')) self.assertEqual(limiter.client_count(), 1) def test_clock_going_backwards_is_clamped(self): clock = ManualClock(10) limiter = HybridRateLimiter(rate=1, burst=1, window_seconds=10, clock=clock) self.assertTrue(limiter.allow('alice')) clock.set(5) self.assertFalse(limiter.allow('alice')) self.assertAlmostEqual(limiter.retry_after('alice'), 1.0, places=9) def test_large_pause_refills_only_to_burst(self): clock = ManualClock() limiter = HybridRateLimiter(rate=1, burst=3, window_seconds=1000, clock=clock) for _ in range(3): self.assertTrue(limiter.allow('alice')) clock.advance(10 ** 9) for _ in range(3): self.assertTrue(limiter.allow('alice')) self.assertFalse(limiter.allow('alice')) if __name__ == '__main__': unittest.main()
判定
勝利票
3 / 3
平均スコア
総合点
総評
回答Aは、ハイブリッドレートリミッターの本番品質で洗練された堅牢な実装を提供します。その主な強みは、ロックストライピングとクライアントごとのロックを組み合わせた高度にスケーラブルな並行処理モデル、バックグラウンドスレッドを回避するエレガントなクリーンアップメカニズム、および競合状態のような複雑な動作を正確に検証する包括的でよく設計されたテストスイートです。コードはクリーンで、構造化されており、クライアントごとの設定のような高度な機能を含む、すべてのプロンプト要件に完全に準拠しています。
採点詳細を表示 ▼
正確さ
重み 35%実装は非常に正確で堅牢です。ロックストライピングとクライアントごとのロックを使用した並行処理モデルは、ボトルネックを回避するための洗練された正確な方法です。クロックの安全性や端数リフィルを含むすべてのエッジケースが正しく処理されています。トークンバケットとスライディングウィンドウの両方のロジックは健全です。
完全性
重み 20%この回答は非常に包括的です。要求されたすべてのAPIメソッドを実装し、指定されたすべてのエッジケースを処理し、要求数を超える包括的なテストスイートを提供し、明確な設計ノートを含んでいます。また、提案されたクライアントごとの設定も実装しており、プロンプトの完全な理解を示しています。
コード品質
重み 20%コード品質は優れています。データクラスのような最新のPython機能を効果的に使用し、論理的なメソッドに構造化されており、非常に読みやすいです。統合されたクリーンアップメカニズムやロックストライピングなどの設計上の選択はエレガントで、高いレベルのエンジニアリングスキルを示しています。
実用性
重み 15%実装は本番グレードの品質であり、非常に実用的です。スケーラブルな並行処理モデルにより、高スループットシステムに適しています。柔軟な設定とエッジケースの堅牢な処理により、実際のアプリケーションで自信を持ってデプロイできます。
指示遵守
重み 10%回答はすべての指示に完全に準拠しています。要求された言語でコードを提供し、指定されたAPI、アルゴリズム、および並行処理戦略を実装し、すべてのエッジケースを処理し、要求されたテストと複雑さの分析を含み、指定された制約内で設計ノートを提供しています。
総合点
総評
回答Aは、要求されたAPIとセマンティクスに直接一致する、強力でほぼ完全な実装です。トークンバケットと正確なスライディングウィンドウログを組み合わせ、注入可能な単調クロックを使用し、ストライプ化されたマップロックとクライアントごとのロックにより単一のグローバルボトルネックを回避し、要求されたエッジケースを網羅する堅牢な決定論的テストが含まれています。設計ノートは簡潔で、トレードオフ、並行処理の正しさ、複雑性に対処しています。マイナーな弱点としては、やや扱いにくいclient_countの実装や、任意のコストではなく1単位の可用性のみを考慮するretry_afterがありますが、これは要求されたAPIに一致しています。
採点詳細を表示 ▼
正確さ
重み 35%要求されたハイブリッドを正しく実装しています:リクエストは、トークンバケットと正確なスライディングウィンドウの両方が許可した場合にのみ通過し、適切なトークン補充クランプ、遅延初期化、古い状態のクリーンアップ、およびクライアントごとのシリアル化されたミューテーションにより二重消費を防ぎます。retry_afterは1単位の可用性に対して一貫性があり、テストは重要な不変条件を網羅しています。マイナーな懸念事項は、client_countが完全な同期なしにマップを読み取ることや、ウィンドウエビクションにおける一部の境界選択などの小さな実装上の癖です。
完全性
重み 20%要求されたほぼすべての項目を網羅しています:必須のAPIサーフェス、ハイブリッドアルゴリズム、注入可能な単調クロック、クライアントごとの設定サポート、明示的なエッジケース、古いクライアントのクリーンアップ、分数タイミング、並行処理に関する議論、複雑性に関する注記、および決定論的な同時実行競合とエビクションを含む8つの意味のあるテスト。これはプロンプトをほぼ完全に満たしています。
コード品質
重み 20%データクラス、分離されたヘルパー、明示的な検証、明確な命名により、構造化され、イディオムに沿ったコードです。ロッキング戦略は慎重に構成されており、設計ノートは簡潔です。クライアントカウントの実装が独特であることや、簡略化できる可能性のある低レベルのロック管理など、いくつかの粗いエッジが残っています。
実用性
重み 15%バックエンドでの使用に実用的です:決定論的なクロック注入、境界のある古い状態のクリーンアップ、競合に対するロックストライピングとクライアントごとのロック、正確なウィンドウアカウンティング、および堅牢なテストにより、デプロイまたは適応可能です。実装は現実的であり、大きな一時停止やVMサスペンションなどの一般的な本番環境のエッジケースに対処しています。
指示遵守
重み 10%プロンプトに密接に従っています:言語選択は有効で、API名は一致し、設計ノートは範囲内であり、テストは注入可能なクロックを使用し、要求されたエッジケースは明示的に対処されています。回答は、技術的要件とドキュメント要件の両方にうまく適合しています。
総合点
総評
回答Aは、洗練された本番品質のハイブリッドレートリミッターです。データクラス、ストライプロック、およびクライアントごとのロックを使用し、stale cleanupをallow呼び出しに統合(バックグラウンドスレッドの問題を回避)、設定を検証し、バッククロックと大きな一時停止の両方に対して経過時間を適切にクランプし、クライアント状態を作成せずにcost>burstを拒否します。8つのテストは、決定論的にManualClockを注入し、rate*T+burst不変量を実際にアサートする同時競合テストを含みます。設計ノートは簡潔で、必要なすべてのポイントに対処しています。
採点詳細を表示 ▼
正確さ
重み 35%ハイブリッドトークンバケット+正確なスライディングウィンドウログを正しく実装しています。max(raw_now, last_seen)によるクランプでclock-backwardsを処理し、状態を作成せずにcost>burstを拒否し、オーバーフローを回避するためにトークンの経過時間をクランプし、マップに対してストライプロックを備えたクライアントごとのロックを使用します。retry_afterは、ウィンドウログの有効期限に基づいて待機時間を正しく計算します。eps処理とイベントアカウンティングは健全に見えます。
完全性
重み 20%すべての必須エッジケースをカバーしています:cost>burst(状態作成なし)、clock-backwards(テストを含む)、大きな一時停止(テストを含む)、遅延初期化、stale eviction(注入されたクロックに結び付けられた_maybe_cleanupによる決定論的)、小数トークン。8つのテストには、ManualClockを使用した決定論的な同時バウンドテストが含まれ、rate*T+burst不変量をアサートします。
コード品質
重み 20%データクラス、MonotonicClock/ManualClockの分離、ClientConfigの検証、クライアントごとおよびストライプロックが明確に説明されており、適切に構造化されています。Cleanupはallowパスに統合されており(バックグラウンドスレッドの複雑さはありません)。簡潔な設計ノートが冒頭にあります。
実用性
重み 15%本番品質:ストライプロックはよりスケーラブルであり、統合された遅延クリーンアップはバックグラウンドスレッドのライフサイクルを回避し、決定論的なテストサポート、設定検証。実際のサービスに組み込むのに適しています。
指示遵守
重み 10%すべての指示に従っています:APIサーフェスはシグネチャと一致し、ハイブリッドアルゴリズム、ストライプロックを備えたクライアントごとのロック、注入可能なモノトニッククロック、すべてエッジケース、決定論的な同時不変量テストを含む8つのテスト、セマンティクスと複雑さが述べられた設計ノート。