Antwort A: OpenAI GPT-5.5
import math import threading import time import unittest from collections import deque from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass, field # Design note: # This limiter admits a request only when two independent checks agree: # 1. a token bucket has at least cost tokens, allowing short bursts up to burst; # 2. an exact sliding-window log has room for cost more admitted units in the last # window_seconds, where the window limit is rate * window_seconds. # If the token bucket says yes but the sliding window is full, the request is # denied and no tokens are spent; if the w...
Vollstandige Antwort anzeigen ▼
import math import threading import time import unittest from collections import deque from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass, field # Design note: # This limiter admits a request only when two independent checks agree: # 1. a token bucket has at least cost tokens, allowing short bursts up to burst; # 2. an exact sliding-window log has room for cost more admitted units in the last # window_seconds, where the window limit is rate * window_seconds. # If the token bucket says yes but the sliding window is full, the request is # denied and no tokens are spent; if the window has room but tokens are absent, # it is also denied. The exact log stores one entry per admitted request as # (timestamp, cost), not one entry per unit, so allow is amortized O(1): each log # entry is appended once and removed once. Memory is O(number of admitted requests # in the active window) per client. A weighted two-bucket counter would use O(1) # memory but can over- or under-admit near bucket boundaries; this implementation # chooses exactness because correctness is usually more important for abuse # prevention. The client map is lock-striped; each client state has its own lock. # A caller obtains the per-client lock before mutating tokens/window state, so # concurrent allow calls cannot double-spend tokens or lose window updates. class MonotonicClock: def now(self) -> float: return time.monotonic() class ManualClock: def __init__(self, start: float = 0.0): self._now = float(start) self._lock = threading.Lock() def now(self) -> float: with self._lock: return self._now def set(self, value: float) -> None: with self._lock: self._now = float(value) def advance(self, seconds: float) -> None: with self._lock: self._now += float(seconds) @dataclass(frozen=True) class ClientConfig: rate: float burst: float window_seconds: float = 1.0 def validated(self) -> 'ClientConfig': rate = float(self.rate) burst = float(self.burst) window = float(self.window_seconds) if not math.isfinite(rate) or rate <= 0: raise ValueError('rate must be a positive finite number') if not math.isfinite(burst) or burst < 1: raise ValueError('burst must be a finite number >= 1') if not math.isfinite(window) or window <= 0: raise ValueError('window_seconds must be a positive finite number') return ClientConfig(rate, burst, window) @property def window_limit(self) -> float: return self.rate * self.window_seconds @dataclass class _ClientState: config: ClientConfig tokens: float last_refill: float last_seen: float events: deque = field(default_factory=deque) window_sum: float = 0.0 lock: threading.Lock = field(default_factory=threading.Lock) class HybridRateLimiter: def __init__( self, rate: float, burst: float, window_seconds: float = 1.0, per_client_configs=None, clock=None, stripes: int = 64, idle_ttl_seconds: float | None = None, cleanup_interval_seconds: float | None = None, ): if stripes <= 0: raise ValueError('stripes must be positive') self._default_config = ClientConfig(rate, burst, window_seconds).validated() self._configs = {} if per_client_configs: for client_id, cfg in per_client_configs.items(): self._configs[client_id] = self._coerce_config(cfg) max_window = self._default_config.window_seconds for cfg in self._configs.values(): max_window = max(max_window, cfg.window_seconds) if idle_ttl_seconds is None: idle_ttl_seconds = max(60.0, 2.0 * max_window) idle_ttl_seconds = float(idle_ttl_seconds) if not math.isfinite(idle_ttl_seconds) or idle_ttl_seconds <= 0: raise ValueError('idle_ttl_seconds must be positive and finite') if idle_ttl_seconds < max_window: raise ValueError('idle_ttl_seconds must be >= the largest window_seconds') self._idle_ttl = idle_ttl_seconds if cleanup_interval_seconds is None: cleanup_interval_seconds = min(60.0, max(1.0, idle_ttl_seconds / 2.0)) cleanup_interval_seconds = float(cleanup_interval_seconds) if not math.isfinite(cleanup_interval_seconds) or cleanup_interval_seconds <= 0: raise ValueError('cleanup_interval_seconds must be positive and finite') self._cleanup_interval = cleanup_interval_seconds self._clock = clock if clock is not None else MonotonicClock() self._clients = {} self._stripe_locks = [threading.Lock() for _ in range(stripes)] self._cleanup_mutex = threading.Lock() self._last_cleanup = self._safe_time() self._eps = 1e-12 @staticmethod def _coerce_config(cfg) -> ClientConfig: if isinstance(cfg, ClientConfig): return cfg.validated() if isinstance(cfg, dict): return ClientConfig(cfg['rate'], cfg['burst'], cfg.get('window_seconds', 1.0)).validated() rate, burst, *rest = cfg window = rest[0] if rest else 1.0 return ClientConfig(rate, burst, window).validated() def _safe_time(self) -> float: t = float(self._clock.now()) if math.isnan(t): raise ValueError('clock returned NaN') return t def _config_for(self, client_id: str) -> ClientConfig: return self._configs.get(client_id, self._default_config) def _stripe_index(self, client_id: str) -> int: return hash(client_id) % len(self._stripe_locks) def _get_state_locked(self, client_id: str) -> _ClientState: idx = self._stripe_index(client_id) stripe = self._stripe_locks[idx] stripe.acquire() try: state = self._clients.get(client_id) if state is None: cfg = self._config_for(client_id) now = self._safe_time() state = _ClientState( config=cfg, tokens=cfg.burst, last_refill=now, last_seen=now, ) self._clients[client_id] = state state.lock.acquire() return state finally: stripe.release() def _advance_and_prune_locked(self, state: _ClientState) -> float: raw_now = self._safe_time() now = max(raw_now, state.last_seen) elapsed = max(0.0, now - state.last_refill) max_useful_elapsed = state.config.burst / state.config.rate elapsed_for_tokens = min(elapsed, max_useful_elapsed) if elapsed_for_tokens > 0: state.tokens = min( state.config.burst, state.tokens + elapsed_for_tokens * state.config.rate, ) state.last_refill = now state.last_seen = now cutoff = now - state.config.window_seconds while state.events and state.events[0][0] <= cutoff + self._eps: _, old_cost = state.events.popleft() state.window_sum -= old_cost if state.window_sum < self._eps: state.window_sum = 0.0 return now def allow(self, client_id: str, cost: int = 1) -> bool: if cost <= 0: raise ValueError('cost must be a positive integer') self._maybe_cleanup() cfg = self._config_for(client_id) if cost > cfg.burst: return False state = self._get_state_locked(client_id) try: now = self._advance_and_prune_locked(state) enough_tokens = state.tokens + self._eps >= cost enough_window = state.window_sum + cost <= state.config.window_limit + self._eps if not (enough_tokens and enough_window): return False state.tokens -= cost if state.tokens < self._eps: state.tokens = 0.0 state.window_sum += cost state.events.append((now, float(cost))) return True finally: state.lock.release() def retry_after(self, client_id: str) -> float: self._maybe_cleanup() state = self._get_state_locked(client_id) try: now = self._advance_and_prune_locked(state) cfg = state.config if cfg.burst < 1 or cfg.window_limit < 1: return math.inf token_wait = 0.0 if state.tokens + self._eps >= 1.0 else (1.0 - state.tokens) / cfg.rate if state.window_sum + 1.0 <= cfg.window_limit + self._eps: window_wait = 0.0 else: window_wait = math.inf remaining = state.window_sum for ts, event_cost in state.events: remaining -= event_cost if remaining + 1.0 <= cfg.window_limit + self._eps: window_wait = max(0.0, ts + cfg.window_seconds - now) break return max(token_wait, window_wait) finally: state.lock.release() def _maybe_cleanup(self) -> None: raw_now = self._safe_time() if raw_now - self._last_cleanup < self._cleanup_interval: return if not self._cleanup_mutex.acquire(blocking=False): return try: now = max(raw_now, self._last_cleanup) if now - self._last_cleanup >= self._cleanup_interval: self._cleanup_stale_at(now) self._last_cleanup = now finally: self._cleanup_mutex.release() def cleanup_stale(self) -> int: with self._cleanup_mutex: now = max(self._safe_time(), self._last_cleanup) removed = self._cleanup_stale_at(now) self._last_cleanup = now return removed def _cleanup_stale_at(self, now: float) -> int: removed = 0 for idx, stripe in enumerate(self._stripe_locks): with stripe: for client_id, state in list(self._clients.items()): if self._stripe_index(client_id) != idx: continue if not state.lock.acquire(blocking=False): continue try: if self._clients.get(client_id) is state and now - state.last_seen >= self._idle_ttl: del self._clients[client_id] removed += 1 finally: state.lock.release() return removed def client_count(self) -> int: total = 0 for stripe in self._stripe_locks: with stripe: total += len(self._clients) break return len(self._clients) class HybridRateLimiterTests(unittest.TestCase): def test_basic_allow_deny_and_retry_after(self): clock = ManualClock() limiter = HybridRateLimiter(rate=1, burst=2, window_seconds=10, clock=clock) self.assertTrue(limiter.allow('alice')) self.assertTrue(limiter.allow('alice')) self.assertFalse(limiter.allow('alice')) self.assertAlmostEqual(limiter.retry_after('alice'), 1.0, places=9) def test_burst_draining_and_fractional_refill(self): clock = ManualClock() limiter = HybridRateLimiter(rate=2, burst=3, window_seconds=10, clock=clock) self.assertTrue(limiter.allow('alice')) self.assertTrue(limiter.allow('alice')) self.assertTrue(limiter.allow('alice')) self.assertFalse(limiter.allow('alice')) clock.advance(0.25) self.assertFalse(limiter.allow('alice')) self.assertAlmostEqual(limiter.retry_after('alice'), 0.25, places=9) clock.advance(0.25) self.assertTrue(limiter.allow('alice')) def test_sliding_window_caps_even_when_bucket_refilled(self): clock = ManualClock() limiter = HybridRateLimiter(rate=10, burst=10, window_seconds=1, clock=clock) for _ in range(10): self.assertTrue(limiter.allow('abuser')) clock.advance(0.5) self.assertFalse(limiter.allow('abuser')) self.assertAlmostEqual(limiter.retry_after('abuser'), 0.5, places=9) clock.advance(0.5) self.assertTrue(limiter.allow('abuser')) def test_cost_larger_than_burst_is_rejected_without_state_creation(self): clock = ManualClock() limiter = HybridRateLimiter(rate=10, burst=5, window_seconds=10, clock=clock) self.assertFalse(limiter.allow('alice', cost=6)) self.assertEqual(limiter.client_count(), 0) self.assertTrue(limiter.allow('alice', cost=5)) self.assertFalse(limiter.allow('alice')) def test_concurrent_contention_obeys_token_bucket_bound(self): clock = ManualClock() rate = 5 burst = 5 limiter = HybridRateLimiter(rate=rate, burst=burst, window_seconds=100, clock=clock, stripes=8) successes = 0 for tick in range(11): clock.set(tick / 10.0) with ThreadPoolExecutor(max_workers=32) as pool: results = list(pool.map(lambda _: limiter.allow('hot'), range(32))) successes += sum(1 for r in results if r) bound = rate * 1.0 + burst self.assertLessEqual(successes, bound) def test_stale_client_eviction(self): clock = ManualClock() limiter = HybridRateLimiter( rate=10, burst=10, window_seconds=1, clock=clock, idle_ttl_seconds=5, cleanup_interval_seconds=1, ) self.assertTrue(limiter.allow('old-client')) self.assertEqual(limiter.client_count(), 1) clock.advance(6) self.assertTrue(limiter.allow('new-client')) self.assertEqual(limiter.client_count(), 1) def test_clock_going_backwards_is_clamped(self): clock = ManualClock(10) limiter = HybridRateLimiter(rate=1, burst=1, window_seconds=10, clock=clock) self.assertTrue(limiter.allow('alice')) clock.set(5) self.assertFalse(limiter.allow('alice')) self.assertAlmostEqual(limiter.retry_after('alice'), 1.0, places=9) def test_large_pause_refills_only_to_burst(self): clock = ManualClock() limiter = HybridRateLimiter(rate=1, burst=3, window_seconds=1000, clock=clock) for _ in range(3): self.assertTrue(limiter.allow('alice')) clock.advance(10 ** 9) for _ in range(3): self.assertTrue(limiter.allow('alice')) self.assertFalse(limiter.allow('alice')) if __name__ == '__main__': unittest.main()
Ergebnis
Siegstimmen
3 / 3
Durchschnittsscore
Gesamtpunktzahl
Gesamtkommentar
Antwort A bietet eine produktionsreife, hochentwickelte und robuste Implementierung des hybriden Ratenbegrenzers. Ihre wichtigsten Stärken sind das hochgradig skalierbare Concurrency-Modell mit Lock-Striping in Kombination mit Per-Client-Locks, ein eleganter Bereinigungsmechanismus, der einen Hintergrund-Thread vermeidet, und eine umfassende, gut durchdachte Testsuite, die komplexe Verhaltensweisen wie gleichzeitige Konflikte korrekt validiert. Der Code ist sauber, gut strukturiert und entspricht vollständig allen Anforderungen der Aufgabenstellung, einschließlich erweiterter Funktionen wie Per-Client-Konfigurationen.
Bewertungsdetails anzeigen ▼
Korrektheit
Gewichtung 35%Die Implementierung ist hochgradig korrekt und robust. Das Concurrency-Modell mit Lock-Striping und Per-Client-Locks ist eine ausgeklügelte und korrekte Methode zur Vermeidung von Engpässen. Alle Edge Cases, einschließlich Clock-Sicherheit und fraktionale Refills, werden korrekt behandelt. Die Logik sowohl für den Token-Bucket als auch für das Sliding Window ist solide.
Vollstandigkeit
Gewichtung 20%Diese Antwort ist außergewöhnlich vollständig. Sie implementiert alle erforderlichen API-Methoden, behandelt jeden spezifizierten Edge Case, bietet eine umfassende Testsuite mit mehr als der geforderten Anzahl von Tests und enthält eine klare Designnotiz. Sie implementiert auch die vorgeschlagene Per-Client-Konfiguration, was ein vollständiges Verständnis der Aufgabenstellung zeigt.
Codequalitat
Gewichtung 20%Die Codequalität ist ausgezeichnet. Sie verwendet moderne Python-Funktionen wie Dataclasses effektiv, ist gut in logische Methoden gegliedert und sehr gut lesbar. Die Designentscheidungen, wie der integrierte Bereinigungsmechanismus und das Lock-Striping, sind elegant und zeugen von einem hohen Ingenieurskönnen.
Praktischer Nutzen
Gewichtung 15%Die Implementierung ist von produktionsreifer Qualität und sehr praktisch. Das skalierbare Concurrency-Modell macht sie für Systeme mit hohem Durchsatz geeignet. Die flexible Konfiguration und die robuste Behandlung von Edge Cases bedeuten, dass sie mit Zuversicht in einer realen Anwendung eingesetzt werden könnte.
Befolgung der Anweisungen
Gewichtung 10%Die Antwort folgt allen Anweisungen perfekt. Sie liefert Code in der angeforderten Sprache, implementiert die spezifizierte API, den Algorithmus und die Concurrency-Strategie, behandelt alle Edge Cases, enthält die erforderlichen Tests und Komplexitätsanalysen und liefert eine Designnotiz innerhalb der vorgegebenen Einschränkungen.
Gesamtpunktzahl
Gesamtkommentar
Antwort A ist eine starke, weitgehend vollständige Implementierung, die direkt der angeforderten API und Semantik entspricht. Sie kombiniert einen Token-Eimer mit einem exakten Sliding-Window-Log, verwendet eine injizierbare monotone Uhr, vermeidet einen einzigen globalen Engpass durch gestreifte Map-Sperren plus Sperren pro Client und enthält solide deterministische Tests, die die erforderlichen Randfälle abdecken. Die Design-Notiz ist prägnant und behandelt Kompromisse, Korrektheit bei Nebenläufigkeit und Komplexität. Kleinere Schwächen sind eine etwas umständliche Implementierung von client_count und retry_after, die nur die Verfügbarkeit von 1 Einheit und nicht die willkürliche Kosten berücksichtigt, obwohl dies der angeforderten API entspricht.
Bewertungsdetails anzeigen ▼
Korrektheit
Gewichtung 35%Implementiert den erforderlichen Hybrid korrekt: Die Anfrage wird nur genehmigt, wenn sowohl der Token-Eimer als auch das exakte Sliding Window dies zulassen, mit ordnungsgemäßer Begrenzung der Token-Nachfüllung, Lazy Init, Bereinigung veralteter Einträge und serialisierter Mutation pro Client, die Double-Spend verhindert. retry_after ist kohärent für die Verfügbarkeit von 1 Einheit, und Tests decken wichtige Invarianten ab. Kleinere Bedenken sind kleine Implementierungsquirks wie client_count, das die Map ohne vollständige Synchronisation liest, und einige Grenzentscheidungen bei der Window-Eviktion.
Vollstandigkeit
Gewichtung 20%Deckt fast alle angeforderten Elemente ab: erforderliche API-Oberfläche, Hybrid-Algorithmus, injizierbare monotone Uhr, Unterstützung für Konfiguration pro Client, explizite Randfälle, Bereinigung veralteter Clients, fraktionale Zeitmessung, Diskussion über Nebenläufigkeit, Komplexitätsnotiz und 8 aussagekräftige Tests, einschließlich deterministischem gleichzeitigem Konflikt und Eviktion. Dies kommt der vollständigen Erfüllung der Aufforderung sehr nahe.
Codequalitat
Gewichtung 20%Gut strukturiert und idiomatisch, mit Datenklassen, getrennten Helfern, expliziter Validierung und klarer Benennung. Die Sperrstrategie ist durchdacht organisiert und die Design-Notiz ist prägnant. Einige Kanten sind noch rau, wie die ungewöhnliche client_count-Implementierung und einige Low-Level-Sperrverwaltung, die vereinfacht werden könnte.
Praktischer Nutzen
Gewichtung 15%Praktisch für den Backend-Einsatz: deterministische Uhreninjektion, begrenzte Bereinigung von veralteten Zuständen, Lock-Striping plus Sperren pro Client für Konflikte, exakte Fensterabrechnung und solide Tests machen es einsetzbar oder anpassbar. Die Implementierung ist realistisch und behandelt gängige Produktions-Randfälle wie große Pausen und VM-Suspension.
Befolgung der Anweisungen
Gewichtung 10%Folgt der Aufforderung genau: Die Sprachwahl ist gültig, die API-Namen stimmen überein, die Design-Notiz liegt im Rahmen, die Tests verwenden die injizierbare Uhr und die angeforderten Randfälle werden explizit behandelt. Die Antwort stimmt gut mit den technischen und dokumentarischen Anforderungen überein.
Gesamtpunktzahl
Gesamtkommentar
Antwort A ist ein ausgereifter, produktionsreifer Hybrid-Ratenbegrenzer. Er verwendet Datenklassen, Stripe-Sperren plus pro-Client-Sperren, integriert die Bereinigung veralteter Einträge in Allow-Aufrufe (vermeidet Probleme mit Hintergrund-Threads), validiert die Konfiguration, klemmt die verstrichene Zeit für Rückwärtsuhren und große Pausen ordnungsgemäß ab und lehnt Kosten>Burst ab, ohne Client-Status zu erstellen. Die 8 Tests verwenden die injizierbare ManualClock deterministisch, einschließlich eines gleichzeitigen Konflikttests, der die Invariante Rate*T+Burst tatsächlich überprüft. Die Design-Notiz ist prägnant und behandelt alle erforderlichen Punkte.
Bewertungsdetails anzeigen ▼
Korrektheit
Gewichtung 35%Implementiert korrekt den Hybrid-Token-Bucket + exaktes Sliding-Window-Log. Behandelt Rückwärtsuhren durch Begrenzung mittels max(raw_now, last_seen), lehnt Kosten>Burst ab, ohne Zustand zu erstellen, begrenzt die verstrichene Zeit für Tokens, um Überläufe zu vermeiden, verwendet pro-Client-Sperren mit Stripe-Sperren für die Map. retry_after berechnet korrekt die Wartezeit basierend auf dem Ablauf des Window-Logs. Die EPS-Behandlung und die Ereignisbuchhaltung scheinen solide zu sein.
Vollstandigkeit
Gewichtung 20%Behandelt alle erforderlichen Randfälle: Kosten>Burst (ohne Zustandsbildung), Rückwärtsuhr (Test enthalten), große Pausen (Test enthalten), Lazy-Initialisierung, veraltete Bereinigung (deterministisch über _maybe_cleanup, das an die injizierte Uhr gebunden ist), Bruchteile von Tokens. 8 Tests, einschließlich eines gleichzeitigen Grenzwerttests, der die Invariante Rate*T+Burst deterministisch mit ManualClock überprüft.
Codequalitat
Gewichtung 20%Gut strukturiert mit Datenklassen, Trennung von MonotonicClock/ManualClock, Validierung von ClientConfig, klar erklärten pro-Client- und Stripe-Sperren. Die Bereinigung ist in den Allow-Pfad integriert (keine Komplikationen mit Hintergrund-Threads). Prägnante Design-Notiz oben.
Praktischer Nutzen
Gewichtung 15%Produktionsqualität: Stripe-Sperren skalieren besser, integrierte Lazy-Bereinigung vermeidet Lebenszyklusprobleme mit Hintergrund-Threads, deterministische Testunterstützung, Konfigurationsvalidierung. Geeignet für die Einbettung in einen echten Dienst.
Befolgung der Anweisungen
Gewichtung 10%Folgt allen Anweisungen: API-Oberfläche entspricht Signaturen, Hybrid-Algorithmus, pro-Client-Sperren mit Stripe-Sperren, injizierbare Monoton-Uhr, alle Randfälle, 8 Tests einschließlich deterministischem gleichzeitigen Invariantentest, Design-Notiz mit Semantik und angegebener Komplexität.