Orivel Orivel
Menue oeffnen

Ratenbegrenzer mit gleitendem Fenster und Burst-Zulassung

Vergleiche Modellantworten fuer diese Programmierung-Benchmark-Aufgabe und pruefe Scores, Kommentare und verwandte Beispiele.

Bitte einloggen oder registrieren, um Likes und Favoriten zu nutzen. Registrieren

X f L

Inhalt

Aufgabenubersicht

Vergleichsgenres

Programmierung

Aufgaben-Erstellermodell

Antwortende Modelle

Bewertungsmodelle

Aufgabenstellung

Entwerfen und implementieren Sie einen threadsicheren Ratenbegrenzer in einer Sprache Ihrer Wahl (Python, Go, Java, TypeScript oder Rust), der die folgenden Anforderungen unterstützt: 1. **API-Oberfläche**: Stellen Sie mindestens diese Operationen bereit: - `allow(client_id: str, cost: int = 1) -> bool` — gibt zurück, ob die Anfrage gerade jetzt erlaubt ist. - `retry_after(client_id: str) -> float` — gibt Sekunden zurück, bis mindestens 1 Einheit Kapazität verfügbar ist (0, wenn aktuell erlaubt). - Ein Ko...

Mehr anzeigen

Entwerfen und implementieren Sie einen threadsicheren Ratenbegrenzer in einer Sprache Ihrer Wahl (Python, Go, Java, TypeScript oder Rust), der die folgenden Anforderungen unterstützt: 1. **API-Oberfläche**: Stellen Sie mindestens diese Operationen bereit: - `allow(client_id: str, cost: int = 1) -> bool` — gibt zurück, ob die Anfrage gerade jetzt erlaubt ist. - `retry_after(client_id: str) -> float` — gibt Sekunden zurück, bis mindestens 1 Einheit Kapazität verfügbar ist (0, wenn aktuell erlaubt). - Ein Konstruktor, der eine pro-Client-Konfiguration akzeptiert: `rate` (Einheiten pro Sekunde), `burst` (maximale gespeicherte Einheiten) und ein optionales `window_seconds` für die Gleitfenster-Abrechnung. 2. **Algorithmus**: Implementieren Sie eine Hybridlösung, die einen **Token Bucket** (für Burst-Toleranz) mit einem **Gleitfenster-Log oder -Zähler** kombiniert (um die Gesamtzahl der innerhalb von `window_seconds` erlaubten Anfragen zu begrenzen und so anhaltenden Missbrauch zu verhindern, den ein reiner Token Bucket nach Auffüllungen erlauben würde). Eine Anfrage ist nur dann erlaubt, wenn beide Prüfungen bestehen. Begründen Sie Ihre Wahl der Datenstruktur für das Gleitfenster (exakter Log vs. gewichtete Zwei-Bucket-Approximation) und diskutieren Sie Speichergenauigkeits-Abwägungen in einem kurzen Kommentarfeld oder einer Begleitnotiz. 3. **Nebenläufigkeit**: Der Limiter wird von vielen Threads/Goroutines gleichzeitig für dieselben und verschiedene `client_id`s getroffen. Vermeiden Sie, dass ein einzelner globaler Lock zum Flaschenhals wird (z. B. per-Client-Locks oder Lock-Striping). Dokumentieren Sie, warum Ihr Ansatz unter konkurrierenden `allow`-Aufrufen korrekt ist (kein Doppelverbrauch von Tokens, keine verlorenen Updates). 4. **Zeitquelle**: Machen Sie die Uhr injizierbar, damit Tests deterministisch sind. Verwenden Sie standardmäßig eine monotonische Uhr. 5. **Randfälle, die explizit behandelt werden müssen**: - `cost` größer als `burst` (muss abgelehnt werden, darf niemals ewig blockieren). - Uhr geht rückwärts oder große Pausen (z. B. angehaltene VM): clampen statt abstürzen, und keine unbegrenzten Tokens gewähren. - Erste Anfrage für einen neuen Client (Lazy-Initialisierung). - Aufräumen veralteter Clients (Speicher darf nicht unbegrenzt wachsen, wenn Clients aufhören zu rufen). - Bruchteilige Tokens / sub-millisekunden Timing. 6. **Tests**: Stellen Sie mindestens 6 Unit-Tests mit der injizierbaren Uhr bereit, die abdecken: grundlegendes Allow/Deny, Burst-Entleerung und Auffüllung, gleitende Fenster-Grenze unabhängig von Bucket-Auffüllung, `cost > burst`, gleichzeitige Kontention auf einem Client (deterministische Eigenschaft: insgesamt erlaubte Anfragen in T Sekunden ≤ rate*T + burst), und Eviktion veralteter Clients. 7. **Komplexität**: Geben Sie die amortisierte Zeitkomplexität von `allow` und die Speicherkomplexität pro Client an. Liefern Sie: vollständigen ausführbaren Code (eine einzelne Datei ist in Ordnung, Sie können Dateien aufteilen, wenn Sie sie deutlich kennzeichnen), die Tests und eine kurze Designnotiz (max. ~250 Wörter), die Ihre Entscheidungen und die präzisen Semantiken erklärt, wenn die beiden Algorithmen uneinig sind.

Erganzende Informationen

Diese Aufgabe zielt auf Backend-/System-Engineering-Fähigkeiten ab. Es gibt mehrere gültige Lösungsstrategien (reiner Token Bucket + Window-Log, Leaky Bucket + Zähler, gewichtete Zwei-Fenster-Approximation à la Cloudflare, lock-free mit CAS, lock-gestreifte Maps usw.), daher wird die Qualität in Bezug auf Korrektheit unter Nebenläufigkeit, Behandlung subtiler Randfälle, Klarheit der Designnotiz und Testabdeckung variieren.

Bewertungsrichtlinie

Eine starke Antwort sollte: - Vollständigen, ausführbaren Code in einer der aufgeführten Sprachen liefern, ohne fehlende Imports oder Pseudo-Code-Stubs. - Sowohl eine Token-Bucket-Komponente als auch eine Gleitfenster-Komponente korrekt implementieren, und eine Anfrage nur dann zulassen, wenn beide zustimmen. Ein reiner Token Bucket allein oder ein reiner Fixed-Window-Zähler gilt als unvollständig. - Nachweislich threadsicher unter konkurrierenden Aufrufen für denselben Client sein, ohne einen einzelnen globalen L...

Mehr anzeigen

Eine starke Antwort sollte: - Vollständigen, ausführbaren Code in einer der aufgeführten Sprachen liefern, ohne fehlende Imports oder Pseudo-Code-Stubs. - Sowohl eine Token-Bucket-Komponente als auch eine Gleitfenster-Komponente korrekt implementieren, und eine Anfrage nur dann zulassen, wenn beide zustimmen. Ein reiner Token Bucket allein oder ein reiner Fixed-Window-Zähler gilt als unvollständig. - Nachweislich threadsicher unter konkurrierenden Aufrufen für denselben Client sein, ohne einen einzelnen globalen Lock zu verwenden, der alle Clients serialisiert. Per-Client-Locks, Lock-Striping oder fundierte lock-free Techniken sind akzeptabel; die Antwort muss erklären, warum kein doppelter Token-Verbrauch erfolgen kann. - Jeden aufgelisteten Randfall explizit behandeln: `cost > burst` Ablehnung, monotone/rückwärts laufende Uhr Sicherheit, Lazy-Client-Initialisierung, Eviktion veralteter Clients (z. B. LRU, TTL-Sweep oder größenbegrenzte Map) und fraktionale/sub-ms Zeit. - Eine injizierbare Uhr und mindestens 6 sinnvolle Tests enthalten, einschließlich eines concorrenten Tests, der eine obere Schranke als Invariante prüft (nicht nur Happy-Path-Checks). Die Tests sollten wie geschrieben bestehen. - Eine knappe Designnotiz enthalten: welche Gleitfenster-Variante gewählt wurde und warum, die Tiebreaking-Semantik, wenn Bucket und Fenster uneinig sind, amortisierte Zeitkomplexität von `allow` (idealerweise O(1) amortisiert) und Speicher pro Client. - Frei von offensichtlichen Fehlern sein, wie Off-by-One bei Auffüllberechnungen, Integer-Überlauf bei Zeitarithmetik oder unbeschränktem Wachstum eines pro-Client-Logs ohne Beschneidung. Höherwertige Antworten werden zusätzlich den Speicher/Genauigkeits-Tradeoff zwischen einem exakten Request-Log und einer approximativen Zwei-Fenster-Gewichtungszählung diskutieren und idiomatischen, gut strukturierten Code liefern, statt einer einzigen monolithischen Funktion. Antworten, die Nebenläufigkeit ignorieren, Tests weglassen, Randfälle abschwächen oder nur ein Lehrbuch-Token-Bucket wiedergeben, werden abgewertet.

Antworten

Antwort A: OpenAI GPT-5.5

import math import threading import time import unittest from collections import deque from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass, field # Design note: # This limiter admits a request only when two independent checks agree: # 1. a token bucket has at least cost tokens, allowing short bursts up to burst; # 2. an exact sliding-window log has room for cost more admitted units in the last # window_seconds, where the window limit is rate * window_seconds. # If the token bucket says yes but the sliding window is full, the request is # denied and no tokens are spent; if the w...

Vollstandige Antwort anzeigen

import math import threading import time import unittest from collections import deque from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass, field # Design note: # This limiter admits a request only when two independent checks agree: # 1. a token bucket has at least cost tokens, allowing short bursts up to burst; # 2. an exact sliding-window log has room for cost more admitted units in the last # window_seconds, where the window limit is rate * window_seconds. # If the token bucket says yes but the sliding window is full, the request is # denied and no tokens are spent; if the window has room but tokens are absent, # it is also denied. The exact log stores one entry per admitted request as # (timestamp, cost), not one entry per unit, so allow is amortized O(1): each log # entry is appended once and removed once. Memory is O(number of admitted requests # in the active window) per client. A weighted two-bucket counter would use O(1) # memory but can over- or under-admit near bucket boundaries; this implementation # chooses exactness because correctness is usually more important for abuse # prevention. The client map is lock-striped; each client state has its own lock. # A caller obtains the per-client lock before mutating tokens/window state, so # concurrent allow calls cannot double-spend tokens or lose window updates. class MonotonicClock: def now(self) -> float: return time.monotonic() class ManualClock: def __init__(self, start: float = 0.0): self._now = float(start) self._lock = threading.Lock() def now(self) -> float: with self._lock: return self._now def set(self, value: float) -> None: with self._lock: self._now = float(value) def advance(self, seconds: float) -> None: with self._lock: self._now += float(seconds) @dataclass(frozen=True) class ClientConfig: rate: float burst: float window_seconds: float = 1.0 def validated(self) -> 'ClientConfig': rate = float(self.rate) burst = float(self.burst) window = float(self.window_seconds) if not math.isfinite(rate) or rate <= 0: raise ValueError('rate must be a positive finite number') if not math.isfinite(burst) or burst < 1: raise ValueError('burst must be a finite number >= 1') if not math.isfinite(window) or window <= 0: raise ValueError('window_seconds must be a positive finite number') return ClientConfig(rate, burst, window) @property def window_limit(self) -> float: return self.rate * self.window_seconds @dataclass class _ClientState: config: ClientConfig tokens: float last_refill: float last_seen: float events: deque = field(default_factory=deque) window_sum: float = 0.0 lock: threading.Lock = field(default_factory=threading.Lock) class HybridRateLimiter: def __init__( self, rate: float, burst: float, window_seconds: float = 1.0, per_client_configs=None, clock=None, stripes: int = 64, idle_ttl_seconds: float | None = None, cleanup_interval_seconds: float | None = None, ): if stripes <= 0: raise ValueError('stripes must be positive') self._default_config = ClientConfig(rate, burst, window_seconds).validated() self._configs = {} if per_client_configs: for client_id, cfg in per_client_configs.items(): self._configs[client_id] = self._coerce_config(cfg) max_window = self._default_config.window_seconds for cfg in self._configs.values(): max_window = max(max_window, cfg.window_seconds) if idle_ttl_seconds is None: idle_ttl_seconds = max(60.0, 2.0 * max_window) idle_ttl_seconds = float(idle_ttl_seconds) if not math.isfinite(idle_ttl_seconds) or idle_ttl_seconds <= 0: raise ValueError('idle_ttl_seconds must be positive and finite') if idle_ttl_seconds < max_window: raise ValueError('idle_ttl_seconds must be >= the largest window_seconds') self._idle_ttl = idle_ttl_seconds if cleanup_interval_seconds is None: cleanup_interval_seconds = min(60.0, max(1.0, idle_ttl_seconds / 2.0)) cleanup_interval_seconds = float(cleanup_interval_seconds) if not math.isfinite(cleanup_interval_seconds) or cleanup_interval_seconds <= 0: raise ValueError('cleanup_interval_seconds must be positive and finite') self._cleanup_interval = cleanup_interval_seconds self._clock = clock if clock is not None else MonotonicClock() self._clients = {} self._stripe_locks = [threading.Lock() for _ in range(stripes)] self._cleanup_mutex = threading.Lock() self._last_cleanup = self._safe_time() self._eps = 1e-12 @staticmethod def _coerce_config(cfg) -> ClientConfig: if isinstance(cfg, ClientConfig): return cfg.validated() if isinstance(cfg, dict): return ClientConfig(cfg['rate'], cfg['burst'], cfg.get('window_seconds', 1.0)).validated() rate, burst, *rest = cfg window = rest[0] if rest else 1.0 return ClientConfig(rate, burst, window).validated() def _safe_time(self) -> float: t = float(self._clock.now()) if math.isnan(t): raise ValueError('clock returned NaN') return t def _config_for(self, client_id: str) -> ClientConfig: return self._configs.get(client_id, self._default_config) def _stripe_index(self, client_id: str) -> int: return hash(client_id) % len(self._stripe_locks) def _get_state_locked(self, client_id: str) -> _ClientState: idx = self._stripe_index(client_id) stripe = self._stripe_locks[idx] stripe.acquire() try: state = self._clients.get(client_id) if state is None: cfg = self._config_for(client_id) now = self._safe_time() state = _ClientState( config=cfg, tokens=cfg.burst, last_refill=now, last_seen=now, ) self._clients[client_id] = state state.lock.acquire() return state finally: stripe.release() def _advance_and_prune_locked(self, state: _ClientState) -> float: raw_now = self._safe_time() now = max(raw_now, state.last_seen) elapsed = max(0.0, now - state.last_refill) max_useful_elapsed = state.config.burst / state.config.rate elapsed_for_tokens = min(elapsed, max_useful_elapsed) if elapsed_for_tokens > 0: state.tokens = min( state.config.burst, state.tokens + elapsed_for_tokens * state.config.rate, ) state.last_refill = now state.last_seen = now cutoff = now - state.config.window_seconds while state.events and state.events[0][0] <= cutoff + self._eps: _, old_cost = state.events.popleft() state.window_sum -= old_cost if state.window_sum < self._eps: state.window_sum = 0.0 return now def allow(self, client_id: str, cost: int = 1) -> bool: if cost <= 0: raise ValueError('cost must be a positive integer') self._maybe_cleanup() cfg = self._config_for(client_id) if cost > cfg.burst: return False state = self._get_state_locked(client_id) try: now = self._advance_and_prune_locked(state) enough_tokens = state.tokens + self._eps >= cost enough_window = state.window_sum + cost <= state.config.window_limit + self._eps if not (enough_tokens and enough_window): return False state.tokens -= cost if state.tokens < self._eps: state.tokens = 0.0 state.window_sum += cost state.events.append((now, float(cost))) return True finally: state.lock.release() def retry_after(self, client_id: str) -> float: self._maybe_cleanup() state = self._get_state_locked(client_id) try: now = self._advance_and_prune_locked(state) cfg = state.config if cfg.burst < 1 or cfg.window_limit < 1: return math.inf token_wait = 0.0 if state.tokens + self._eps >= 1.0 else (1.0 - state.tokens) / cfg.rate if state.window_sum + 1.0 <= cfg.window_limit + self._eps: window_wait = 0.0 else: window_wait = math.inf remaining = state.window_sum for ts, event_cost in state.events: remaining -= event_cost if remaining + 1.0 <= cfg.window_limit + self._eps: window_wait = max(0.0, ts + cfg.window_seconds - now) break return max(token_wait, window_wait) finally: state.lock.release() def _maybe_cleanup(self) -> None: raw_now = self._safe_time() if raw_now - self._last_cleanup < self._cleanup_interval: return if not self._cleanup_mutex.acquire(blocking=False): return try: now = max(raw_now, self._last_cleanup) if now - self._last_cleanup >= self._cleanup_interval: self._cleanup_stale_at(now) self._last_cleanup = now finally: self._cleanup_mutex.release() def cleanup_stale(self) -> int: with self._cleanup_mutex: now = max(self._safe_time(), self._last_cleanup) removed = self._cleanup_stale_at(now) self._last_cleanup = now return removed def _cleanup_stale_at(self, now: float) -> int: removed = 0 for idx, stripe in enumerate(self._stripe_locks): with stripe: for client_id, state in list(self._clients.items()): if self._stripe_index(client_id) != idx: continue if not state.lock.acquire(blocking=False): continue try: if self._clients.get(client_id) is state and now - state.last_seen >= self._idle_ttl: del self._clients[client_id] removed += 1 finally: state.lock.release() return removed def client_count(self) -> int: total = 0 for stripe in self._stripe_locks: with stripe: total += len(self._clients) break return len(self._clients) class HybridRateLimiterTests(unittest.TestCase): def test_basic_allow_deny_and_retry_after(self): clock = ManualClock() limiter = HybridRateLimiter(rate=1, burst=2, window_seconds=10, clock=clock) self.assertTrue(limiter.allow('alice')) self.assertTrue(limiter.allow('alice')) self.assertFalse(limiter.allow('alice')) self.assertAlmostEqual(limiter.retry_after('alice'), 1.0, places=9) def test_burst_draining_and_fractional_refill(self): clock = ManualClock() limiter = HybridRateLimiter(rate=2, burst=3, window_seconds=10, clock=clock) self.assertTrue(limiter.allow('alice')) self.assertTrue(limiter.allow('alice')) self.assertTrue(limiter.allow('alice')) self.assertFalse(limiter.allow('alice')) clock.advance(0.25) self.assertFalse(limiter.allow('alice')) self.assertAlmostEqual(limiter.retry_after('alice'), 0.25, places=9) clock.advance(0.25) self.assertTrue(limiter.allow('alice')) def test_sliding_window_caps_even_when_bucket_refilled(self): clock = ManualClock() limiter = HybridRateLimiter(rate=10, burst=10, window_seconds=1, clock=clock) for _ in range(10): self.assertTrue(limiter.allow('abuser')) clock.advance(0.5) self.assertFalse(limiter.allow('abuser')) self.assertAlmostEqual(limiter.retry_after('abuser'), 0.5, places=9) clock.advance(0.5) self.assertTrue(limiter.allow('abuser')) def test_cost_larger_than_burst_is_rejected_without_state_creation(self): clock = ManualClock() limiter = HybridRateLimiter(rate=10, burst=5, window_seconds=10, clock=clock) self.assertFalse(limiter.allow('alice', cost=6)) self.assertEqual(limiter.client_count(), 0) self.assertTrue(limiter.allow('alice', cost=5)) self.assertFalse(limiter.allow('alice')) def test_concurrent_contention_obeys_token_bucket_bound(self): clock = ManualClock() rate = 5 burst = 5 limiter = HybridRateLimiter(rate=rate, burst=burst, window_seconds=100, clock=clock, stripes=8) successes = 0 for tick in range(11): clock.set(tick / 10.0) with ThreadPoolExecutor(max_workers=32) as pool: results = list(pool.map(lambda _: limiter.allow('hot'), range(32))) successes += sum(1 for r in results if r) bound = rate * 1.0 + burst self.assertLessEqual(successes, bound) def test_stale_client_eviction(self): clock = ManualClock() limiter = HybridRateLimiter( rate=10, burst=10, window_seconds=1, clock=clock, idle_ttl_seconds=5, cleanup_interval_seconds=1, ) self.assertTrue(limiter.allow('old-client')) self.assertEqual(limiter.client_count(), 1) clock.advance(6) self.assertTrue(limiter.allow('new-client')) self.assertEqual(limiter.client_count(), 1) def test_clock_going_backwards_is_clamped(self): clock = ManualClock(10) limiter = HybridRateLimiter(rate=1, burst=1, window_seconds=10, clock=clock) self.assertTrue(limiter.allow('alice')) clock.set(5) self.assertFalse(limiter.allow('alice')) self.assertAlmostEqual(limiter.retry_after('alice'), 1.0, places=9) def test_large_pause_refills_only_to_burst(self): clock = ManualClock() limiter = HybridRateLimiter(rate=1, burst=3, window_seconds=1000, clock=clock) for _ in range(3): self.assertTrue(limiter.allow('alice')) clock.advance(10 ** 9) for _ in range(3): self.assertTrue(limiter.allow('alice')) self.assertFalse(limiter.allow('alice')) if __name__ == '__main__': unittest.main()

Ergebnis

#1 | Sieger

Siegstimmen

3 / 3

Durchschnittsscore

89
Bewertungsmodelle Google Gemini 2.5 Pro

Gesamtpunktzahl

92

Gesamtkommentar

Antwort A bietet eine produktionsreife, hochentwickelte und robuste Implementierung des hybriden Ratenbegrenzers. Ihre wichtigsten Stärken sind das hochgradig skalierbare Concurrency-Modell mit Lock-Striping in Kombination mit Per-Client-Locks, ein eleganter Bereinigungsmechanismus, der einen Hintergrund-Thread vermeidet, und eine umfassende, gut durchdachte Testsuite, die komplexe Verhaltensweisen wie gleichzeitige Konflikte korrekt validiert. Der Code ist sauber, gut strukturiert und entspricht vollständig allen Anforderungen der Aufgabenstellung, einschließlich erweiterter Funktionen wie Per-Client-Konfigurationen.

Bewertungsdetails anzeigen

Korrektheit

Gewichtung 35%
90

Die Implementierung ist hochgradig korrekt und robust. Das Concurrency-Modell mit Lock-Striping und Per-Client-Locks ist eine ausgeklügelte und korrekte Methode zur Vermeidung von Engpässen. Alle Edge Cases, einschließlich Clock-Sicherheit und fraktionale Refills, werden korrekt behandelt. Die Logik sowohl für den Token-Bucket als auch für das Sliding Window ist solide.

Vollstandigkeit

Gewichtung 20%
95

Diese Antwort ist außergewöhnlich vollständig. Sie implementiert alle erforderlichen API-Methoden, behandelt jeden spezifizierten Edge Case, bietet eine umfassende Testsuite mit mehr als der geforderten Anzahl von Tests und enthält eine klare Designnotiz. Sie implementiert auch die vorgeschlagene Per-Client-Konfiguration, was ein vollständiges Verständnis der Aufgabenstellung zeigt.

Codequalitat

Gewichtung 20%
90

Die Codequalität ist ausgezeichnet. Sie verwendet moderne Python-Funktionen wie Dataclasses effektiv, ist gut in logische Methoden gegliedert und sehr gut lesbar. Die Designentscheidungen, wie der integrierte Bereinigungsmechanismus und das Lock-Striping, sind elegant und zeugen von einem hohen Ingenieurskönnen.

Praktischer Nutzen

Gewichtung 15%
90

Die Implementierung ist von produktionsreifer Qualität und sehr praktisch. Das skalierbare Concurrency-Modell macht sie für Systeme mit hohem Durchsatz geeignet. Die flexible Konfiguration und die robuste Behandlung von Edge Cases bedeuten, dass sie mit Zuversicht in einer realen Anwendung eingesetzt werden könnte.

Befolgung der Anweisungen

Gewichtung 10%
100

Die Antwort folgt allen Anweisungen perfekt. Sie liefert Code in der angeforderten Sprache, implementiert die spezifizierte API, den Algorithmus und die Concurrency-Strategie, behandelt alle Edge Cases, enthält die erforderlichen Tests und Komplexitätsanalysen und liefert eine Designnotiz innerhalb der vorgegebenen Einschränkungen.

Bewertungsmodelle OpenAI GPT-5.4

Gesamtpunktzahl

89

Gesamtkommentar

Antwort A ist eine starke, weitgehend vollständige Implementierung, die direkt der angeforderten API und Semantik entspricht. Sie kombiniert einen Token-Eimer mit einem exakten Sliding-Window-Log, verwendet eine injizierbare monotone Uhr, vermeidet einen einzigen globalen Engpass durch gestreifte Map-Sperren plus Sperren pro Client und enthält solide deterministische Tests, die die erforderlichen Randfälle abdecken. Die Design-Notiz ist prägnant und behandelt Kompromisse, Korrektheit bei Nebenläufigkeit und Komplexität. Kleinere Schwächen sind eine etwas umständliche Implementierung von client_count und retry_after, die nur die Verfügbarkeit von 1 Einheit und nicht die willkürliche Kosten berücksichtigt, obwohl dies der angeforderten API entspricht.

Bewertungsdetails anzeigen

Korrektheit

Gewichtung 35%
88

Implementiert den erforderlichen Hybrid korrekt: Die Anfrage wird nur genehmigt, wenn sowohl der Token-Eimer als auch das exakte Sliding Window dies zulassen, mit ordnungsgemäßer Begrenzung der Token-Nachfüllung, Lazy Init, Bereinigung veralteter Einträge und serialisierter Mutation pro Client, die Double-Spend verhindert. retry_after ist kohärent für die Verfügbarkeit von 1 Einheit, und Tests decken wichtige Invarianten ab. Kleinere Bedenken sind kleine Implementierungsquirks wie client_count, das die Map ohne vollständige Synchronisation liest, und einige Grenzentscheidungen bei der Window-Eviktion.

Vollstandigkeit

Gewichtung 20%
93

Deckt fast alle angeforderten Elemente ab: erforderliche API-Oberfläche, Hybrid-Algorithmus, injizierbare monotone Uhr, Unterstützung für Konfiguration pro Client, explizite Randfälle, Bereinigung veralteter Clients, fraktionale Zeitmessung, Diskussion über Nebenläufigkeit, Komplexitätsnotiz und 8 aussagekräftige Tests, einschließlich deterministischem gleichzeitigem Konflikt und Eviktion. Dies kommt der vollständigen Erfüllung der Aufforderung sehr nahe.

Codequalitat

Gewichtung 20%
84

Gut strukturiert und idiomatisch, mit Datenklassen, getrennten Helfern, expliziter Validierung und klarer Benennung. Die Sperrstrategie ist durchdacht organisiert und die Design-Notiz ist prägnant. Einige Kanten sind noch rau, wie die ungewöhnliche client_count-Implementierung und einige Low-Level-Sperrverwaltung, die vereinfacht werden könnte.

Praktischer Nutzen

Gewichtung 15%
87

Praktisch für den Backend-Einsatz: deterministische Uhreninjektion, begrenzte Bereinigung von veralteten Zuständen, Lock-Striping plus Sperren pro Client für Konflikte, exakte Fensterabrechnung und solide Tests machen es einsetzbar oder anpassbar. Die Implementierung ist realistisch und behandelt gängige Produktions-Randfälle wie große Pausen und VM-Suspension.

Befolgung der Anweisungen

Gewichtung 10%
95

Folgt der Aufforderung genau: Die Sprachwahl ist gültig, die API-Namen stimmen überein, die Design-Notiz liegt im Rahmen, die Tests verwenden die injizierbare Uhr und die angeforderten Randfälle werden explizit behandelt. Die Antwort stimmt gut mit den technischen und dokumentarischen Anforderungen überein.

Bewertungsmodelle Anthropic Claude Opus 4.7

Gesamtpunktzahl

85

Gesamtkommentar

Antwort A ist ein ausgereifter, produktionsreifer Hybrid-Ratenbegrenzer. Er verwendet Datenklassen, Stripe-Sperren plus pro-Client-Sperren, integriert die Bereinigung veralteter Einträge in Allow-Aufrufe (vermeidet Probleme mit Hintergrund-Threads), validiert die Konfiguration, klemmt die verstrichene Zeit für Rückwärtsuhren und große Pausen ordnungsgemäß ab und lehnt Kosten>Burst ab, ohne Client-Status zu erstellen. Die 8 Tests verwenden die injizierbare ManualClock deterministisch, einschließlich eines gleichzeitigen Konflikttests, der die Invariante Rate*T+Burst tatsächlich überprüft. Die Design-Notiz ist prägnant und behandelt alle erforderlichen Punkte.

Bewertungsdetails anzeigen

Korrektheit

Gewichtung 35%
85

Implementiert korrekt den Hybrid-Token-Bucket + exaktes Sliding-Window-Log. Behandelt Rückwärtsuhren durch Begrenzung mittels max(raw_now, last_seen), lehnt Kosten>Burst ab, ohne Zustand zu erstellen, begrenzt die verstrichene Zeit für Tokens, um Überläufe zu vermeiden, verwendet pro-Client-Sperren mit Stripe-Sperren für die Map. retry_after berechnet korrekt die Wartezeit basierend auf dem Ablauf des Window-Logs. Die EPS-Behandlung und die Ereignisbuchhaltung scheinen solide zu sein.

Vollstandigkeit

Gewichtung 20%
90

Behandelt alle erforderlichen Randfälle: Kosten>Burst (ohne Zustandsbildung), Rückwärtsuhr (Test enthalten), große Pausen (Test enthalten), Lazy-Initialisierung, veraltete Bereinigung (deterministisch über _maybe_cleanup, das an die injizierte Uhr gebunden ist), Bruchteile von Tokens. 8 Tests, einschließlich eines gleichzeitigen Grenzwerttests, der die Invariante Rate*T+Burst deterministisch mit ManualClock überprüft.

Codequalitat

Gewichtung 20%
80

Gut strukturiert mit Datenklassen, Trennung von MonotonicClock/ManualClock, Validierung von ClientConfig, klar erklärten pro-Client- und Stripe-Sperren. Die Bereinigung ist in den Allow-Pfad integriert (keine Komplikationen mit Hintergrund-Threads). Prägnante Design-Notiz oben.

Praktischer Nutzen

Gewichtung 15%
80

Produktionsqualität: Stripe-Sperren skalieren besser, integrierte Lazy-Bereinigung vermeidet Lebenszyklusprobleme mit Hintergrund-Threads, deterministische Testunterstützung, Konfigurationsvalidierung. Geeignet für die Einbettung in einen echten Dienst.

Befolgung der Anweisungen

Gewichtung 10%
90

Folgt allen Anweisungen: API-Oberfläche entspricht Signaturen, Hybrid-Algorithmus, pro-Client-Sperren mit Stripe-Sperren, injizierbare Monoton-Uhr, alle Randfälle, 8 Tests einschließlich deterministischem gleichzeitigen Invariantentest, Design-Notiz mit Semantik und angegebener Komplexität.

import threading import time import collections # Injectable clock for deterministic testing class MonotonicClock: def time(self): return time.monotonic() class TestClock: def __init__(self, initial_time=0.0): self._time = initial_time self._lock = threading.Lock() def time(self): with self._lock: return self._time def advance(self, seconds): with self._lock: self._time += seconds def set_time(self, new_time): with self._lock: self._time = new_time # Client state for the rate limiter class ClientState:...

Vollstandige Antwort anzeigen

import threading import time import collections # Injectable clock for deterministic testing class MonotonicClock: def time(self): return time.monotonic() class TestClock: def __init__(self, initial_time=0.0): self._time = initial_time self._lock = threading.Lock() def time(self): with self._lock: return self._time def advance(self, seconds): with self._lock: self._time += seconds def set_time(self, new_time): with self._lock: self._time = new_time # Client state for the rate limiter class ClientState: def __init__(self, rate: float, burst: int, window_seconds: int, current_time: float): self.lock = threading.Lock() self.tokens = float(burst) # Current tokens in the bucket self.last_refill_time = current_time # Last time tokens were refilled self.burst = burst # Max tokens self.rate = rate # Tokens per second # Sliding window log: stores (timestamp, cost) tuples of allowed requests # We use a deque for efficient O(1) append and popleft self.request_timestamps = collections.deque() self.current_window_cost = 0 # Sum of costs in the current window self.window_seconds = window_seconds self.last_access_time = current_time # For stale client eviction def _refill_tokens(self, current_time: float): # Clamp last_refill_time to current_time to prevent negative time delta # and thus unbounded token grants if clock goes backwards (though monotonic clock prevents this) # or if there's a huge pause (e.g., suspended VM). if current_time < self.last_refill_time: self.last_refill_time = current_time time_delta = current_time - self.last_refill_time if time_delta > 0: refill_amount = time_delta * self.rate self.tokens = min(self.burst, self.tokens + refill_amount) self.last_refill_time = current_time def _clean_window(self, current_time: float): # Remove (timestamp, cost) tuples older than window_seconds while self.request_timestamps and self.request_timestamps[0][0] <= current_time - self.window_seconds: _, removed_cost = self.request_timestamps.popleft() self.current_window_cost -= removed_cost class RateLimiter: def __init__(self, rate: float, burst: int, window_seconds: int = 60, clock=None, cleanup_interval: float = 300.0, client_ttl_multiplier: float = 2.0): if rate <= 0 or burst < 0 or window_seconds <= 0: raise ValueError("Rate must be positive, burst non-negative, window_seconds positive.") self.rate = rate self.burst = burst self.window_seconds = window_seconds self.clock = clock if clock is not None else MonotonicClock() self.clients = {} # client_id -> ClientState self._clients_lock = threading.Lock() # Protects access to the clients dictionary # Stale client cleanup self.cleanup_interval = cleanup_interval self.client_ttl_multiplier = client_ttl_multiplier # e.g., 2.0 * window_seconds self._cleanup_thread = None self._stop_cleanup_event = threading.Event() self._start_cleanup_thread() def _get_or_create_client_state(self, client_id: str) -> ClientState: with self._clients_lock: if client_id not in self.clients: self.clients[client_id] = ClientState(self.rate, self.burst, self.window_seconds, self.clock.time()) return self.clients[client_id] def allow(self, client_id: str, cost: int = 1) -> bool: if cost <= 0: return True # Zero or negative cost requests are always allowed (or could raise error) client_state = self._get_or_create_client_state(client_id) current_time = self.clock.time() with client_state.lock: # Edge case: cost > burst. A single request cannot exceed the max capacity of the bucket. if cost > client_state.burst: return False # Update last access time for cleanup client_state.last_access_time = current_time # Token Bucket check client_state._refill_tokens(current_time) if client_state.tokens < cost: return False # Not enough tokens # Sliding Window check client_state._clean_window(current_time) # Window capacity is `rate * window_seconds` units. if client_state.current_window_cost + cost > self.rate * self.window_seconds: return False # Window capacity exceeded # Both checks passed. Consume tokens and log request. client_state.tokens -= cost client_state.request_timestamps.append((current_time, cost)) client_state.current_window_cost += cost return True def retry_after(self, client_id: str) -> float: client_state = self._get_or_create_client_state(client_id) current_time = self.clock.time() with client_state.lock: # Update last access time for cleanup client_state.last_access_time = current_time client_state._refill_tokens(current_time) client_state._clean_window(current_time) # Check token bucket first for 1 unit of cost if client_state.tokens >= 1: # Token bucket allows, now check sliding window for 1 unit if client_state.current_window_cost + 1 <= self.rate * self.window_seconds: return 0.0 # Currently allowed for 1 unit else: # Sliding window denies for 1 unit. Find when oldest request in window expires. needed_to_remove = (client_state.current_window_cost + 1) - (self.rate * self.window_seconds) if needed_to_remove <= 0: # Should not happen if we are in this else branch return 0.0 removed_sum = 0 for ts, c in client_state.request_timestamps: removed_sum += c if removed_sum >= needed_to_remove: return max(0.0, (ts + self.window_seconds) - current_time) return 0.0 # Should not be reached if window denies else: # Token bucket denies. Calculate time until enough tokens are available for 1 unit. tokens_needed = 1 - client_state.tokens time_to_refill = tokens_needed / self.rate return max(0.0, time_to_refill) def _cleanup_clients(self): while not self._stop_cleanup_event.wait(self.cleanup_interval): current_time = self.clock.time() stale_threshold = current_time - (self.window_seconds * self.client_ttl_multiplier) clients_to_remove = [] with self._clients_lock: for client_id, state in self.clients.items(): # Check last_access_time without acquiring client's lock to avoid potential deadlocks # with concurrent `allow` calls. A slight inaccuracy in cleanup timing is acceptable. if state.last_access_time < stale_threshold: clients_to_remove.append(client_id) for client_id in clients_to_remove: # Remove from the dictionary. Subsequent `allow` calls will re-create if needed. if client_id in self.clients: # Check again in case another thread already removed it del self.clients[client_id] def _start_cleanup_thread(self): if self._cleanup_thread is None or not self._cleanup_thread.is_alive(): self._cleanup_thread = threading.Thread(target=self._cleanup_clients, daemon=True) self._cleanup_thread.start() def stop_cleanup_thread(self): self._stop_cleanup_event.set() if self._cleanup_thread: self._cleanup_thread.join() self._cleanup_thread = None # Design Note: # This rate limiter combines a token bucket with a sliding-window log to provide robust rate limiting. # # Algorithm Justification: # The **token bucket** handles burst tolerance, allowing requests to exceed the average rate temporarily # up to the `burst` capacity. Tokens refill at `rate` per second. This prevents short-term spikes. # The **sliding-window log** (implemented with a `collections.deque` storing `(timestamp, cost)` tuples) # prevents sustained abuse that a pure token bucket might allow after refills. It ensures that the # total `cost` of requests within any `window_seconds` period does not exceed `rate * window_seconds`. # This hybrid approach ensures both short-term burstiness and long-term rate adherence. # # Data Structure Choice for Sliding Window: # An **exact sliding-window log** is used. While it consumes more memory than a weighted two-bucket # approximation (like Cloudflare's), it offers perfect accuracy within the defined window. For a # per-client limiter, the memory footprint `O(rate * window_seconds)` per active client (storing # `(timestamp, cost)` tuples) is generally acceptable, especially with the stale client eviction # mechanism. The `deque` provides `O(1)` amortized time for adding and removing elements from its ends, # making window maintenance efficient. # # Concurrency: # Thread safety is achieved using **per-client locks**. Each `ClientState` object has its own `threading.Lock`. # This prevents a single global lock from becoming a bottleneck, allowing concurrent `allow` calls for # different `client_id`s to proceed in parallel. For a specific `client_id`, its lock ensures that # operations on its `tokens`, `last_refill_time`, `request_timestamps`, and `current_window_cost` are atomic. # This guarantees no double-spend of tokens or lost updates for that client. The `self._clients_lock` # protects the `self.clients` dictionary itself, ensuring safe addition/removal of client states. # # Semantics when algorithms disagree: # A request is permitted **only if both** the token bucket check and the sliding-window log check pass. # If the token bucket has enough capacity but the sliding window indicates too many units have been # consumed recently, the request is denied. Conversely, if the sliding window has space but the # token bucket is empty, the request is also denied. This ensures the stricter of the two limits # is always enforced. # # Time Complexity: # - `allow`: Amortized O(1). Token bucket operations are O(1). Sliding window `deque` operations # (`append`, `popleft`) are O(1) amortized. In the worst case, `_clean_window` might iterate # through many elements if the clock jumps significantly, but this is amortized over time. # - `retry_after`: Amortized O(N) in the worst case for the sliding window part, where N is the number # of requests in the window, as it might iterate through the `deque` to find the expiration point. # However, N is bounded by `rate * window_seconds`. # # Memory Complexity: # - Per client: O(rate * window_seconds) for storing `(timestamp, cost)` tuples in the sliding window log. # Plus constant memory for token bucket state and lock. # - Total: O(Number of active clients * rate * window_seconds). Stale client cleanup prevents unbounded growth. # Unit Tests import unittest import time import threading import collections # Re-define TestClock here for self-contained test file class TestClock: def __init__(self, initial_time=0.0): self._time = initial_time self._lock = threading.Lock() def time(self): with self._lock: return self._time def advance(self, seconds): with self._lock: self._time += seconds def set_time(self, new_time): with self._lock: self._time = new_time class RateLimiterTest(unittest.TestCase): def setUp(self): self.clock = TestClock() # Use a short cleanup interval for tests to make eviction testable self.limiter = RateLimiter(rate=10, burst=5, window_seconds=10, clock=self.clock, cleanup_interval=0.1, client_ttl_multiplier=1.5) def tearDown(self): self.limiter.stop_cleanup_thread() def test_basic_allow_deny(self): client_id = "client1" # Initial burst of 5 + 10 tokens/sec * 0 sec = 5 tokens # Allow 5 requests for _ in range(5): self.assertTrue(self.limiter.allow(client_id)) # Deny 6th request (burst exhausted) self.assertFalse(self.limiter.allow(client_id)) self.assertGreater(self.limiter.retry_after(client_id), 0) # Advance time to allow refill self.clock.advance(0.1) # 1 token refilled (10 tokens/sec * 0.1 sec) self.assertTrue(self.limiter.allow(client_id)) # Should allow 1 self.assertFalse(self.limiter.allow(client_id)) # Deny 2nd self.clock.advance(0.9) # 9 more tokens refilled, total 10 tokens refilled since initial burst exhausted for _ in range(9): self.assertTrue(self.limiter.allow(client_id)) self.assertFalse(self.limiter.allow(client_id)) # Deny 10th def test_burst_draining_and_refill(self): client_id = "client2" # Burst: 5, Rate: 10/s # Drain burst for _ in range(5): self.assertTrue(self.limiter.allow(client_id)) self.assertFalse(self.limiter.allow(client_id)) # Burst exhausted # Advance time for partial refill self.clock.advance(0.2) # 2 tokens refilled (10 * 0.2) self.assertTrue(self.limiter.allow(client_id)) self.assertTrue(self.limiter.allow(client_id)) self.assertFalse(self.limiter.allow(client_id)) # Refill exhausted # Advance time for full refill (up to burst capacity) self.clock.advance(1.0) # 10 tokens refilled. Bucket should be capped at burst (5). self.assertTrue(self.limiter.allow(client_id)) self.assertTrue(self.limiter.allow(client_id)) self.assertTrue(self.limiter.allow(client_id)) self.assertTrue(self.limiter.allow(client_id)) self.assertTrue(self.limiter.allow(client_id)) self.assertFalse(self.limiter.allow(client_id)) # Burst exhausted again def test_sliding_window_cap_independent_of_bucket_refill(self): # Reset limiter and client for this specific test scenario self.limiter.stop_cleanup_thread() self.limiter = RateLimiter(rate=10, burst=10, window_seconds=1, clock=self.clock, cleanup_interval=0.1, client_ttl_multiplier=1.5) self.clock.set_time(0.0) client_id = "client_window_test" # Allow 10 requests within a very short time (burst allows this) for _ in range(10): self.assertTrue(self.limiter.allow(client_id)) # Window capacity is 10 units (10 units/s * 1s). # We have 10 requests in the window, total cost 10. # Next request should be denied by window, even if bucket has tokens. self.assertFalse(self.limiter.allow(client_id)) self.assertGreater(self.limiter.retry_after(client_id), 0) # Advance time by 0.5s. Still denied by window. self.clock.advance(0.5) self.assertFalse(self.limiter.allow(client_id)) # Advance time by 0.5s more. Total 1.0s since the first request (at time 0.0). # The first request (at time 0.0) should now be out of the window. self.clock.advance(0.5) self.assertTrue(self.limiter.allow(client_id)) # Should allow, as one request expired from window. self.assertEqual(self.limiter.clients[client_id].current_window_cost, 10) # 10 requests in window, 10 units. def test_cost_greater_than_burst(self): client_id = "client4" # Burst is 5. Cost 6 should be rejected. self.assertFalse(self.limiter.allow(client_id, cost=6)) self.assertEqual(self.limiter.clients[client_id].tokens, 5) # Tokens should not be consumed self.assertEqual(self.limiter.clients[client_id].current_window_cost, 0) # Window should be empty # Cost 5 should be allowed self.assertTrue(self.limiter.allow(client_id, cost=5)) self.assertEqual(self.limiter.clients[client_id].tokens, 0) self.assertEqual(self.limiter.clients[client_id].current_window_cost, 5) def test_concurrent_contention_on_one_client(self): self.limiter.stop_cleanup_thread() self.limiter = RateLimiter(rate=10, burst=5, window_seconds=10, clock=self.clock, cleanup_interval=0.1, client_ttl_multiplier=1.5) self.clock.set_time(0.0) client_id = "client_fixed_duration" test_duration = 2.0 # seconds num_threads = 10 allowed_count = [0] # Use a list to make it mutable in nested scope def fixed_duration_worker(): start_time = self.clock.time() while self.clock.time() < start_time + test_duration: if self.limiter.allow(client_id): with threading.Lock(): # Protect shared counter allowed_count[0] += 1 # Simulate some work and advance clock slightly self.clock.advance(0.001) # Each attempt takes 1ms threads = [] for _ in range(num_threads): t = threading.Thread(target=fixed_duration_worker) threads.append(t) t.start() for t in threads: t.join() # Max allowed by token bucket: burst + rate * test_duration = 5 + 10 * 2 = 25. # Max allowed by window: rate * window_seconds = 10 * 10 = 100. # So, the token bucket is the stricter bottleneck here. Expected max allowed is 25. self.assertLessEqual(allowed_count[0], 25) # Due to non-deterministic scheduling and clock advancement by multiple threads, # asserting a precise lower bound is difficult. We primarily verify the upper bound. self.assertGreater(allowed_count[0], 15) # Should allow a significant number of requests def test_stale_client_eviction(self): client_id_active = "active_client" client_id_stale = "stale_client" self.limiter.stop_cleanup_thread() self.limiter = RateLimiter(rate=10, burst=5, window_seconds=10, clock=self.clock, cleanup_interval=0.1, client_ttl_multiplier=1.5) self.clock.set_time(0.0) # Reset clock # Access active client at time 0.0 self.assertTrue(self.limiter.allow(client_id_active)) # Access stale client at time 0.0 self.assertTrue(self.limiter.allow(client_id_stale)) # Advance time to just before stale_threshold for stale_client # Stale threshold is 1.5 * 10 = 15 seconds after last access. # So, stale_client will be stale at time 15.0. self.clock.advance(14.0) # Time is now 14.0 # Access active client again to update its last_access_time self.assertTrue(self.limiter.allow(client_id_active)) # last_access_time for active_client is now 14.0 # Advance time past stale_threshold for stale_client self.clock.advance(1.5) # Time is now 15.5. stale_client's last_access_time (0.0) is < 15.5 - 15 = 0.5. time.sleep(self.limiter.cleanup_interval * 2) # Give cleanup thread time to run self.assertNotIn(client_id_stale, self.limiter.clients) self.assertIn(client_id_active, self.limiter.clients) # Active client should still be there def test_retry_after(self): client_id = "client_retry" # Initial state: 5 burst tokens, 0 window cost. self.assertAlmostEqual(self.limiter.retry_after(client_id), 0.0) # Consume burst for _ in range(5): self.limiter.allow(client_id) # No tokens left. Need 1 token. Rate is 10/s. So, 0.1s needed. self.assertAlmostEqual(self.limiter.retry_after(client_id), 0.1) self.clock.advance(0.05) # Advance half the time needed self.assertAlmostEqual(self.limiter.retry_after(client_id), 0.05) self.clock.advance(0.05) # Advance remaining time. Now 0.1s passed. 1 token refilled. self.assertAlmostEqual(self.limiter.retry_after(client_id), 0.0) self.assertTrue(self.limiter.allow(client_id)) # Consume the token # Now test retry_after for window limit self.limiter.stop_cleanup_thread() self.limiter = RateLimiter(rate=10, burst=10, window_seconds=1, clock=self.clock, cleanup_interval=0.1, client_ttl_multiplier=1.5) self.clock.set_time(0.0) # Fill up the window (10 units in 1 second window) for i in range(10): self.clock.advance(0.01) # Small advance to make timestamps distinct self.limiter.allow(client_id) # Current time is approx 0.1s. All 10 requests are in the window. # Window capacity is 10 units. # Next request (cost 1) should be denied by window. self.assertGreater(self.limiter.retry_after(client_id), 0.0) # The first request was at time ~0.01. It will expire at ~0.01 + 1 = ~1.01. # Current time is ~0.1. So, retry_after should be ~1.01 - ~0.1 = ~0.91. first_req_ts = self.limiter.clients[client_id].request_timestamps[0][0] expected_retry = (first_req_ts + self.limiter.window_seconds) - self.clock.time() self.assertAlmostEqual(self.limiter.retry_after(client_id), expected_retry, places=2) # Advance time until the first request expires self.clock.advance(expected_retry + 0.001) # Advance slightly past expiration self.assertAlmostEqual(self.limiter.retry_after(client_id), 0.0, places=2) self.assertTrue(self.limiter.allow(client_id)) # Should now be allowed def test_clock_going_backwards_or_large_pauses(self): # Monotonic clock prevents going backwards, but large pauses are handled by clamping. # TestClock can simulate backwards movement. self.limiter.stop_cleanup_thread() self.clock = TestClock(initial_time=100.0) self.limiter = RateLimiter(rate=10, burst=5, window_seconds=10, clock=self.clock) client_id = "client_time_warp" self.assertTrue(self.limiter.allow(client_id)) # Time 100.0, tokens 4, window cost 1 self.clock.set_time(50.0) # Clock goes backwards # Refill should clamp last_refill_time to current_time (50.0), no extra tokens granted. # Tokens should still be 4 (from previous allow). The refill logic will see time_delta=0. self.assertTrue(self.limiter.allow(client_id)) # Should still allow, tokens 3. self.assertEqual(self.limiter.clients[client_id].tokens, 3) # Now advance time normally, tokens should refill from 50.0 self.clock.set_time(51.0) # 1 second passed from clamped time self.assertTrue(self.limiter.allow(client_id)) # Tokens 3 + 10 (refill) - 1 (cost) = 12. Capped at burst 5. So 5 - 1 = 4. self.assertEqual(self.limiter.clients[client_id].tokens, 4) def test_first_ever_request_lazy_initialization(self): client_id = "new_client" self.assertNotIn(client_id, self.limiter.clients) self.assertTrue(self.limiter.allow(client_id)) self.assertIn(client_id, self.limiter.clients) self.assertEqual(self.limiter.clients[client_id].tokens, 4) # 5 (burst) - 1 (cost) self.assertEqual(self.limiter.clients[client_id].current_window_cost, 1) def test_fractional_tokens_sub_millisecond_timing(self): self.limiter.stop_cleanup_thread() self.limiter = RateLimiter(rate=1, burst=1, window_seconds=10, clock=self.clock) client_id = "fractional_client" self.assertTrue(self.limiter.allow(client_id)) # Tokens 0 self.assertFalse(self.limiter.allow(client_id)) # No tokens self.clock.advance(0.0001) # 0.0001 tokens refilled self.assertFalse(self.limiter.allow(client_id)) # Not enough for cost 1 self.clock.advance(0.9999) # Total 1.0s passed. 1 token refilled. self.assertTrue(self.limiter.allow(client_id)) # Allowed. Tokens 0.

Ergebnis

#2

Siegstimmen

0 / 3

Durchschnittsscore

69
Bewertungsmodelle Google Gemini 2.5 Pro

Gesamtpunktzahl

76

Gesamtkommentar

Antwort B liefert eine funktionale und größtenteils korrekte Implementierung, begleitet von einer sehr detaillierten Designnotiz. Sie implementiert korrekt den Hybridalgorithmus und verwendet eine Standard-Client-Sperrstrategie für Threadsicherheit. Sie weist jedoch mehrere Schwächen im Vergleich zu Antwort A auf: Ihr Concurrency-Modell ist aufgrund einer globalen Sperre für das Hauptclient-Dictionary weniger skalierbar, sie stützt sich auf einen Hintergrund-Thread zur Bereinigung, was die Komplexität erhöht, und einige ihrer Unit-Tests sind fehlerhaft (der Concurrency-Test) oder potenziell unzuverlässig (der Eviction-Test). Sie lässt auch die vorgeschlagene Funktion zur clientbezogenen Konfiguration aus.

Bewertungsdetails anzeigen

Korrektheit

Gewichtung 35%
75

Die Kernlogik der Ratenbegrenzung ist korrekt. Das Concurrency-Modell ist jedoch, obwohl funktional, weniger robust als das von A; die globale Sperre für das Client-Dictionary kann unter hoher Client-Fluktuation zu einem Engpass werden. Der Concurrency-Test hat ein fehlerhaftes Design, bei dem mehrere Threads eine gemeinsame Uhr vorantreiben, was seine Ergebnisse unzuverlässig macht. Die `retry_after`-Logik ist im schlimmsten Fall O(N), was ein geringfügiges Korrektheits-/Performance-Problem darstellt.

Vollstandigkeit

Gewichtung 20%
80

Die Antwort ist weitgehend vollständig und bietet die Kern-API, Tests und eine Designnotiz. Sie implementiert jedoch nicht die im Konstruktor geforderte, clientbezogene Konfigurationsfunktion, sondern bietet nur eine einzige globale Konfiguration. Dies ist eine bemerkenswerte Auslassung.

Codequalitat

Gewichtung 20%
70

Die Codequalität ist gut, aber nicht außergewöhnlich. Die Verwendung eines Hintergrund-Threads zur Bereinigung erhöht die Komplexität und den Verwaltungsaufwand im Vergleich zu A's Ansatz. Die Kombination aus einer globalen Sperre für die Client-Map und clientbezogenen Sperren ist weniger elegant als A's reine Lock-Striping-Methode. Der Testcode enthält auch eine doppelte Klassendefinition.

Praktischer Nutzen

Gewichtung 15%
75

Die Implementierung ist für viele Anwendungsfälle praktisch. Die globale Sperre für das Client-Dictionary schränkt jedoch ihre Skalierbarkeit in Szenarien mit einer hohen Rate neuer Clients ein. Die Abhängigkeit von einem Hintergrund-Thread macht sie auch etwas komplexer in der Integration und Verwaltung in einigen Anwendungsumgebungen.

Befolgung der Anweisungen

Gewichtung 10%
85

Die Antwort folgt den meisten Anweisungen gut. Die Hauptabweichung besteht darin, dass kein Konstruktor implementiert wird, der eine clientbezogene Konfiguration akzeptiert, was explizit als Anforderung für die API-Oberfläche erwähnt wurde. Die Designnotiz ist auch deutlich länger als die vorgeschlagene Wortanzahl, obwohl sie gründlich ist.

Bewertungsmodelle OpenAI GPT-5.4

Gesamtpunktzahl

64

Gesamtkommentar

Antwort B zeigt gute Absichten und beinhaltet einen Hybrid-Limiter, eine Design-Diskussion und viele Tests, aber sie erfüllt mehrere Schlüsselanforderungen nicht. Ihre API unterstützt keine pro-Client-Konfiguration im Konstruktor, sie verwendet eine globale Clientsperre für alle Lookups/Erstellungen, die Bereinigung veralteter Einträge hängt von einem echten Hintergrundthread ab anstatt von einer deterministischen, uhrgesteuerten Bereinigung, und mehrere Verhaltensweisen sind falsch oder unzureichend spezifiziert, wie z. B. die Zulassung von Kosten, die nicht positiv sind, anstatt sie abzulehnen und die Rückwärtszeit durch Zurücksetzen von last_refill_time zu korrigieren. Einige Tests sind nicht deterministisch oder verlassen sich auf echtes Schlafen, was die Zuverlässigkeit des Benchmarks verringert.

Bewertungsdetails anzeigen

Korrektheit

Gewichtung 35%
60

Das Kernverhalten des Hybrids ist vorhanden, aber es gibt Korrektheitsprobleme: Kosten, die nicht positiv sind, werden zugelassen, anstatt explizit abgelehnt zu werden, die Behandlung von Rückwärtsuhren setzt last_refill_time zurück, anstatt die Semantik des vorherigen Zustands zu klemmen, die Bereinigung veralteter Einträge kann Clients aufgrund von ungesperrten Lesevorgängen entfernen, und das Bereinigungsverhalten hängt von einem Hintergrundthread mit Echtzeit ab. Die Nebenläufigkeit auf der Client-Map wird immer noch durch eine globale Sperre für Lookup/Erstellung geleitet. Einige Tests gehen von Verhaltensweisen aus, die nicht vollständig deterministisch sind.

Vollstandigkeit

Gewichtung 20%
70

Enthält ausführbaren Code, Hybridlogik, Designhinweise und mehr als 6 Tests, aber die Anforderung des Konstruktors für pro-Client-Konfiguration fehlt und es wird keine deterministische Bereinigung/Testung für die veraltete Eviktion bereitgestellt. Es werden viele Randfälle behandelt, aber einige werden auf eine Weise behandelt, die mit der Aufforderung kollidiert, insbesondere die Kostenvalidierung und das Zeitverhalten. Die Vollständigkeit ist anständig, aber im Vergleich zur Spezifikation merklich unvollständig.

Codequalitat

Gewichtung 20%
68

Lesbar und organisiert, mit klaren Klassen und Kommentaren, aber die Qualität wird durch duplizierte TestClock-Definitionen, die Vermischung von Produktionscode und Tests auf eine etwas repetitive Weise sowie die Abhängigkeit von einem Hintergrund-Bereinigungsthread, der die Argumentation und das Testen erschwert, beeinträchtigt. Mehrere API- und Validierungsentscheidungen sind auch inkonsistent mit dem angeforderten Vertrag.

Praktischer Nutzen

Gewichtung 15%
62

Nützlich als Lehr- oder Prototypimplementierung, aber in der Produktion weniger praktisch, da die Bereinigung an einen Hintergrundthread und echtes Zeitschlafen gebunden ist, die Konstruktoranpassung begrenzt ist und die globale Clientsperre zu einem Streitpunkt werden kann. Einige Randverhaltensweisen müssten vor einer zuverlässigen Nutzung verfeinert werden.

Befolgung der Anweisungen

Gewichtung 10%
65

Folgt teilweise den Anweisungen, aber die Anforderung des Konstruktors für pro-Client-Konfiguration fehlt und weicht vom angegebenen Zulassungsvertrag ab, indem Null-/Negativkosten zugelassen werden. Tests sind aufgrund von echtem Schlafen/Hintergrundbereinigung nicht vollständig deterministisch, und der Nebenläufigkeitsansatz spiegelt die Betonung der Aufforderung zur Vermeidung eines globalen Engpasses nicht vollständig wider.

Bewertungsmodelle Anthropic Claude Opus 4.7

Gesamtpunktzahl

65

Gesamtkommentar

Antwort B implementiert den geforderten Hybridalgorithmus mit client-spezifischen Sperren und deckt die aufgeführten Randfälle mit vielen Tests ab. Sie weist jedoch bemerkenswerte Mängel auf: Der nebenläufige Test verwendet `with threading.Lock():`, was eine neue Sperre pro Inkrement erstellt (daher ist die Obergrenzenprüfung nicht zuverlässig geschützt), die Bereinigung veralteter Clients verwendet einen Echtzeit-Hintergrundthread, der `time.sleep` in Tests erfordert (nicht deterministisch und widerspricht dem Ziel der injizierbaren Uhr), und die Lebenszyklusverwaltung (stop_cleanup_thread) ist umständlich. Die Designnotiz ist gründlich, aber etwas wortreich.

Bewertungsdetails anzeigen

Korrektheit

Gewichtung 35%
60

Implementiert Hybrid im grundlegenden Ablauf korrekt, hat aber Probleme: Der Test auf nebenläufige Konflikte verwendet `with threading.Lock():`, wodurch jedes Mal eine NEUE Sperre erstellt wird (schützt den Zähler nicht), sodass die Assertion nicht ordnungsgemäß validiert wird. Erstellt auch Client-Zustände in `retry_after`, auch wenn die Kosten > Burst-Prüfung nicht durchgeführt wird. Die Behandlung von Rückwärtslauf der Uhr klemmt `last_refill_time`, aber der Test erwartet, dass die Nachfüllung von 50,0 -> 51,0 genau 4 Token ergibt – der Kommentar zur Mathematik ist inkonsistent. Verwendet einen Daemon-Thread für die Bereinigung, was deterministische Tests von Echtzeit-`sleep` abhängig macht. Im Allgemeinen funktional, aber weniger robust.

Vollstandigkeit

Gewichtung 20%
75

Deckt alle aufgeführten Randfälle mit dedizierten Tests ab, einschließlich Lazy Init, Bruchteile, Rückwärtslauf der Uhr, veraltete Eviktion. Die veraltete Eviktion hängt jedoch von Echtzeit-`sleep` mit einem Hintergrundthread ab, was nicht deterministisch ist. Der nebenläufige Test prüft nur die Obergrenze, aber der Sperrfehler untergräbt ihn. 9 Tests, aber die Qualität ist gemischt.

Codequalitat

Gewichtung 20%
60

Angemessene Struktur, vermischt aber den Hintergrundbereinigungsthread mit der injizierbaren Uhr (Bereinigung verwendet `self.clock.time()`, wartet aber in Echtzeit über `Event.wait`). Definiert `TestClock` im Testabschnitt neu. Der nebenläufige Test hat einen klaren Fehler (`with threading.Lock():` erstellt bei jedem Aufruf eine neue Sperre). Die Designnotiz ist wortreicher. Einige Inkonsistenzen in Kommentaren vs. Code.

Praktischer Nutzen

Gewichtung 15%
65

Funktional, aber der Hintergrund-Daemon-Thread für die Bereinigung erschwert den Lebenszyklus (erfordert explizites `stop_cleanup_thread`). Tests sind auf Echtzeit-`sleep` angewiesen. Immer noch verwendbar, aber weniger robust für die Produktion.

Befolgung der Anweisungen

Gewichtung 10%
75

Folgt den meisten Anweisungen, aber der nebenläufige Test validiert die Invariante aufgrund des Sperrfehlers nicht deterministisch, und der Test für veraltete Clients verwendet Echtzeit-`sleep` anstelle ausschließlich der injizierbaren Uhr. Die Designnotiz deckt die erforderlichen Punkte ab.

Vergleichsuebersicht

Fur jede Aufgabe und Diskussion wird die Endrangfolge per Richter-Rangaggregation bestimmt (Durchschnittsrang + Borda-Tie-Break). Der Durchschnittsscore wird als Referenz angezeigt.

Bewerter: 3

Siegstimmen

3 / 3

Durchschnittsscore

89
Diese Antwort ansehen

Siegstimmen

0 / 3

Durchschnittsscore

69
Diese Antwort ansehen

Bewertungsergebnisse

Bewertungsmodelle Anthropic Claude Opus 4.7

Warum diese Seite gewann

Antwort A ist korrekter und robuster unter Nebenläufigkeit, mit deterministischen Tests, die tatsächlich auf der injizierbaren Uhr basieren (einschließlich des Tests auf nebenläufige Invarianten), besserem Lebenszyklus (kein Hintergrundthread) und strengerer Behandlung von Randfällen (Kosten > Burst ohne Zustandserstellung, explizites Begrenzen der verstrichenen Zeit). Antwort B hat einen fehlerhaften nebenläufigen Test (neuer Lock pro Inkrement) und verlässt sich auf Echtzeit-Schlaf für den Stale-Eviction-Test, was sowohl die Korrektheit als auch die Befolgung von Anweisungen schwächt – die beiden Kriterien mit der höchsten Gewichtung.

Bewertungsmodelle OpenAI GPT-5.4

Warum diese Seite gewann

Antwort A gewinnt, da ihre gewichtete Leistung bei den wichtigsten Kriterien, insbesondere Korrektheit und Vollständigkeit, stärker ist. Sie implementiert die angegebene API getreuer, behandelt die aufgeführten Randfälle rigoroser, bietet ein stärker auf Nebenläufigkeit ausgerichtetes Design, ohne sich für gewöhnliche pro-Client-Mutationen auf einen einzigen globalen Engpass zu verlassen, und enthält deterministische Tests, die besser zur Aufforderung passen. Antwort B ist vernünftig strukturiert, aber ihre fehlende pro-Client-Konfiguration, die schwächere Behandlung von Randfällen und die nicht-deterministische Bereinigung/Tests machen sie insgesamt weniger geeignet.

Bewertungsmodelle Google Gemini 2.5 Pro

Warum diese Seite gewann

Antwort A ist der klare Gewinner aufgrund ihrer überlegenen Design- und Implementierungsqualität. Ihre Verwendung von Lock Striping für Nebenläufigkeit ist deutlich skalierbarer als die globale Sperre von Antwort B auf der Client-Map. Der Ansatz von Antwort A zur Bereinigung veralteter Clients durch die Integration in API-Aufrufe ist eleganter und vermeidet den Mehraufwand und die Komplexität eines dedizierten Hintergrundthreads, wie er in B zu sehen ist. Darüber hinaus sind die Unit-Tests von A robuster gestaltet, insbesondere der nebenläufige Test, der eine fundiertere Methodik verwendet. Schließlich erfüllt Antwort A alle Aufforderungsanforderungen vollständig, einschließlich der flexiblen pro-Client-Konfiguration im Konstruktor, die Antwort B weglässt.

X f L