Reponse A: OpenAI GPT-5.5
import math import threading import time import unittest from collections import deque from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass, field # Conception : # Ce limiteur n'autorise une requête que lorsque deux vérifications indépendantes concordent : # 1. un seau de jetons contient au moins `cost` jetons, permettant des rafales courtes jusqu'à `burst` ; # 2. un journal exact à fenêtre glissante a de la place pour `cost` unités supplémentaires autorisées dans la dernière # fenêtre `window_seconds`, où la limite de fenêtre est `rate * window_seconds`. # Si le seau de jetons...
Afficher la reponse complete ▼
import math import threading import time import unittest from collections import deque from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass, field # Conception : # Ce limiteur n'autorise une requête que lorsque deux vérifications indépendantes concordent : # 1. un seau de jetons contient au moins `cost` jetons, permettant des rafales courtes jusqu'à `burst` ; # 2. un journal exact à fenêtre glissante a de la place pour `cost` unités supplémentaires autorisées dans la dernière # fenêtre `window_seconds`, où la limite de fenêtre est `rate * window_seconds`. # Si le seau de jetons dit oui mais que la fenêtre glissante est pleine, la requête est # refusée et aucun jeton n'est dépensé ; si la fenêtre a de la place mais que les jetons sont absents, # elle est également refusée. Le journal exact stocke une entrée par requête autorisée sous forme de # (timestamp, cost), et non une entrée par unité, de sorte que l'autorisation est amortie en O(1) : chaque entrée de journal # est ajoutée une fois et supprimée une fois. La mémoire est O(nombre de requêtes autorisées # dans la fenêtre active) par client. Un compteur à deux seaux pondérés utiliserait une mémoire O(1) # mais peut autoriser trop ou pas assez près des limites des seaux ; cette implémentation # choisit l'exactitude car la correction est généralement plus importante pour la prévention des abus. # La carte client est verrouillée par bandes ; l'état de chaque client a son propre verrou. # Un appelant obtient le verrou par client avant de modifier l'état des jetons/fenêtres, de sorte que les appels d'autorisation concurrents ne peuvent pas dépenser deux fois les jetons ni perdre les mises à jour de la fenêtre. class MonotonicClock: def now(self) -> float: return time.monotonic() class ManualClock: def __init__(self, start: float = 0.0): self._now = float(start) self._lock = threading.Lock() def now(self) -> float: with self._lock: return self._now def set(self, value: float) -> None: with self._lock: self._now = float(value) def advance(self, seconds: float) -> None: with self._lock: self._now += float(seconds) @dataclass(frozen=True) class ClientConfig: rate: float burst: float window_seconds: float = 1.0 def validated(self) -> 'ClientConfig': rate = float(self.rate) burst = float(self.burst) window = float(self.window_seconds) if not math.isfinite(rate) or rate <= 0: raise ValueError('rate must be a positive finite number') if not math.isfinite(burst) or burst < 1: raise ValueError('burst must be a finite number >= 1') if not math.isfinite(window) or window <= 0: raise ValueError('window_seconds must be a positive finite number') return ClientConfig(rate, burst, window) @property def window_limit(self) -> float: return self.rate * self.window_seconds @dataclass class _ClientState: config: ClientConfig tokens: float last_refill: float last_seen: float events: deque = field(default_factory=deque) window_sum: float = 0.0 lock: threading.Lock = field(default_factory=threading.Lock) class HybridRateLimiter: def __init__( self, rate: float, burst: float, window_seconds: float = 1.0, per_client_configs=None, clock=None, stripes: int = 64, idle_ttl_seconds: float | None = None, cleanup_interval_seconds: float | None = None, ): if stripes <= 0: raise ValueError('stripes must be positive') self._default_config = ClientConfig(rate, burst, window_seconds).validated() self._configs = {} if per_client_configs: for client_id, cfg in per_client_configs.items(): self._configs[client_id] = self._coerce_config(cfg) max_window = self._default_config.window_seconds for cfg in self._configs.values(): max_window = max(max_window, cfg.window_seconds) if idle_ttl_seconds is None: idle_ttl_seconds = max(60.0, 2.0 * max_window) idle_ttl_seconds = float(idle_ttl_seconds) if not math.isfinite(idle_ttl_seconds) or idle_ttl_seconds <= 0: raise ValueError('idle_ttl_seconds must be positive and finite') if idle_ttl_seconds < max_window: raise ValueError('idle_ttl_seconds must be >= the largest window_seconds') self._idle_ttl = idle_ttl_seconds if cleanup_interval_seconds is None: cleanup_interval_seconds = min(60.0, max(1.0, idle_ttl_seconds / 2.0)) cleanup_interval_seconds = float(cleanup_interval_seconds) if not math.isfinite(cleanup_interval_seconds) or cleanup_interval_seconds <= 0: raise ValueError('cleanup_interval_seconds must be positive and finite') self._cleanup_interval = cleanup_interval_seconds self._clock = clock if clock is not None else MonotonicClock() self._clients = {} self._stripe_locks = [threading.Lock() for _ in range(stripes)] self._cleanup_mutex = threading.Lock() self._last_cleanup = self._safe_time() self._eps = 1e-12 @staticmethod def _coerce_config(cfg) -> ClientConfig: if isinstance(cfg, ClientConfig): return cfg.validated() if isinstance(cfg, dict): return ClientConfig(cfg['rate'], cfg['burst'], cfg.get('window_seconds', 1.0)).validated() rate, burst, *rest = cfg window = rest[0] if rest else 1.0 return ClientConfig(rate, burst, window).validated() def _safe_time(self) -> float: t = float(self._clock.now()) if math.isnan(t): raise ValueError('clock returned NaN') return t def _config_for(self, client_id: str) -> ClientConfig: return self._configs.get(client_id, self._default_config) def _stripe_index(self, client_id: str) -> int: return hash(client_id) % len(self._stripe_locks) def _get_state_locked(self, client_id: str) -> _ClientState: idx = self._stripe_index(client_id) stripe = self._stripe_locks[idx] stripe.acquire() try: state = self._clients.get(client_id) if state is None: cfg = self._config_for(client_id) now = self._safe_time() state = _ClientState( config=cfg, tokens=cfg.burst, last_refill=now, last_seen=now, ) self._clients[client_id] = state state.lock.acquire() return state finally: stripe.release() def _advance_and_prune_locked(self, state: _ClientState) -> float: raw_now = self._safe_time() now = max(raw_now, state.last_seen) elapsed = max(0.0, now - state.last_refill) max_useful_elapsed = state.config.burst / state.config.rate elapsed_for_tokens = min(elapsed, max_useful_elapsed) if elapsed_for_tokens > 0: state.tokens = min( state.config.burst, state.tokens + elapsed_for_tokens * state.config.rate, ) state.last_refill = now state.last_seen = now cutoff = now - state.config.window_seconds while state.events and state.events[0][0] <= cutoff + self._eps: _, old_cost = state.events.popleft() state.window_sum -= old_cost if state.window_sum < self._eps: state.window_sum = 0.0 return now def allow(self, client_id: str, cost: int = 1) -> bool: if cost <= 0: raise ValueError('cost must be a positive integer') self._maybe_cleanup() cfg = self._config_for(client_id) if cost > cfg.burst: return False state = self._get_state_locked(client_id) try: now = self._advance_and_prune_locked(state) enough_tokens = state.tokens + self._eps >= cost enough_window = state.window_sum + cost <= state.config.window_limit + self._eps if not (enough_tokens and enough_window): return False state.tokens -= cost if state.tokens < self._eps: state.tokens = 0.0 state.window_sum += cost state.events.append((now, float(cost))) return True finally: state.lock.release() def retry_after(self, client_id: str) -> float: self._maybe_cleanup() state = self._get_state_locked(client_id) try: now = self._advance_and_prune_locked(state) cfg = state.config if cfg.burst < 1 or cfg.window_limit < 1: return math.inf token_wait = 0.0 if state.tokens + self._eps >= 1.0 else (1.0 - state.tokens) / cfg.rate if state.window_sum + 1.0 <= cfg.window_limit + self._eps: window_wait = 0.0 else: window_wait = math.inf remaining = state.window_sum for ts, event_cost in state.events: remaining -= event_cost if remaining + 1.0 <= cfg.window_limit + self._eps: window_wait = max(0.0, ts + cfg.window_seconds - now) break return max(token_wait, window_wait) finally: state.lock.release() def _maybe_cleanup(self) -> None: raw_now = self._safe_time() if raw_now - self._last_cleanup < self._cleanup_interval: return if not self._cleanup_mutex.acquire(blocking=False): return try: now = max(raw_now, self._last_cleanup) if now - self._last_cleanup >= self._cleanup_interval: self._cleanup_stale_at(now) self._last_cleanup = now finally: self._cleanup_mutex.release() def cleanup_stale(self) -> int: with self._cleanup_mutex: now = max(self._safe_time(), self._last_cleanup) removed = self._cleanup_stale_at(now) self._last_cleanup = now return removed def _cleanup_stale_at(self, now: float) -> int: removed = 0 for idx, stripe in enumerate(self._stripe_locks): with stripe: for client_id, state in list(self._clients.items()): if self._stripe_index(client_id) != idx: continue if not state.lock.acquire(blocking=False): continue try: if self._clients.get(client_id) is state and now - state.last_seen >= self._idle_ttl: del self._clients[client_id] removed += 1 finally: state.lock.release() return removed def client_count(self) -> int: total = 0 for stripe in self._stripe_locks: with stripe: total += len(self._clients) break return len(self._clients) class HybridRateLimiterTests(unittest.TestCase): def test_basic_allow_deny_and_retry_after(self): clock = ManualClock() limiter = HybridRateLimiter(rate=1, burst=2, window_seconds=10, clock=clock) self.assertTrue(limiter.allow('alice')) self.assertTrue(limiter.allow('alice')) self.assertFalse(limiter.allow('alice')) self.assertAlmostEqual(limiter.retry_after('alice'), 1.0, places=9) def test_burst_draining_and_fractional_refill(self): clock = ManualClock() limiter = HybridRateLimiter(rate=2, burst=3, window_seconds=10, clock=clock) self.assertTrue(limiter.allow('alice')) self.assertTrue(limiter.allow('alice')) self.assertTrue(limiter.allow('alice')) self.assertFalse(limiter.allow('alice')) clock.advance(0.25) self.assertFalse(limiter.allow('alice')) self.assertAlmostEqual(limiter.retry_after('alice'), 0.25, places=9) clock.advance(0.25) self.assertTrue(limiter.allow('alice')) def test_sliding_window_caps_even_when_bucket_refilled(self): clock = ManualClock() limiter = HybridRateLimiter(rate=10, burst=10, window_seconds=1, clock=clock) for _ in range(10): self.assertTrue(limiter.allow('abuser')) clock.advance(0.5) self.assertFalse(limiter.allow('abuser')) self.assertAlmostEqual(limiter.retry_after('abuser'), 0.5, places=9) clock.advance(0.5) self.assertTrue(limiter.allow('abuser')) def test_cost_larger_than_burst_is_rejected_without_state_creation(self): clock = ManualClock() limiter = HybridRateLimiter(rate=10, burst=5, window_seconds=10, clock=clock) self.assertFalse(limiter.allow('alice', cost=6)) self.assertEqual(limiter.client_count(), 0) self.assertTrue(limiter.allow('alice', cost=5)) self.assertFalse(limiter.allow('alice')) def test_concurrent_contention_obeys_token_bucket_bound(self): clock = ManualClock() rate = 5 burst = 5 limiter = HybridRateLimiter(rate=rate, burst=burst, window_seconds=100, clock=clock, stripes=8) successes = 0 for tick in range(11): clock.set(tick / 10.0) with ThreadPoolExecutor(max_workers=32) as pool: results = list(pool.map(lambda _: limiter.allow('hot'), range(32))) successes += sum(1 for r in results if r) bound = rate * 1.0 + burst self.assertLessEqual(successes, bound) def test_stale_client_eviction(self): clock = ManualClock() limiter = HybridRateLimiter( rate=10, burst=10, window_seconds=1, clock=clock, idle_ttl_seconds=5, cleanup_interval_seconds=1, ) self.assertTrue(limiter.allow('old-client')) self.assertEqual(limiter.client_count(), 1) clock.advance(6) self.assertTrue(limiter.allow('new-client')) self.assertEqual(limiter.client_count(), 1) def test_clock_going_backwards_is_clamped(self): clock = ManualClock(10) limiter = HybridRateLimiter(rate=1, burst=1, window_seconds=10, clock=clock) self.assertTrue(limiter.allow('alice')) clock.set(5) self.assertFalse(limiter.allow('alice')) self.assertAlmostEqual(limiter.retry_after('alice'), 1.0, places=9) def test_large_pause_refills_only_to_burst(self): clock = ManualClock() limiter = HybridRateLimiter(rate=1, burst=3, window_seconds=1000, clock=clock) for _ in range(3): self.assertTrue(limiter.allow('alice')) clock.advance(10 ** 9) for _ in range(3): self.assertTrue(limiter.allow('alice')) self.assertFalse(limiter.allow('alice')) if __name__ == '__main__': unittest.main()
Resultat
Votes gagnants
3 / 3
Score moyen
Score total
Commentaire global
La réponse A fournit une implémentation sophistiquée, robuste et de qualité production du limiteur de débit hybride. Ses principaux atouts sont le modèle de concurrence hautement évolutif utilisant le verrouillage par bandes combiné à des verrous par client, un mécanisme de nettoyage élégant qui évite un thread d'arrière-plan, et une suite de tests complète et bien conçue qui valide correctement les comportements complexes tels que la contention concurrente. Le code est propre, bien structuré et respecte pleinement toutes les exigences de l'invite, y compris les fonctionnalités avancées telles que les configurations par client.
Afficher le detail de l evaluation ▼
Exactitude
Poids 35%L'implémentation est hautement correcte et robuste. Le modèle de concurrence utilisant le verrouillage par bandes et les verrous par client est une manière sophistiquée et correcte d'éviter les goulots d'étranglement. Tous les cas limites, y compris la sécurité de l'horloge et les recharges fractionnaires, sont gérés correctement. La logique du seau de jetons et de la fenêtre glissante est saine.
Completude
Poids 20%Cette réponse est exceptionnellement complète. Elle implémente toutes les méthodes d'API requises, gère tous les cas limites spécifiés, fournit une suite de tests complète avec plus que le nombre requis de tests, et inclut une note de conception claire. Elle implémente également la configuration suggérée par client, démontrant une compréhension complète de l'invite.
Qualite du code
Poids 20%La qualité du code est excellente. Il utilise efficacement les fonctionnalités modernes de Python telles que les dataclasses, est bien structuré en méthodes logiques et est très lisible. Les choix de conception, tels que le mécanisme de nettoyage intégré et le verrouillage par bandes, sont élégants et démontrent un haut niveau de compétence en ingénierie.
Valeur pratique
Poids 15%L'implémentation est de qualité production et très pratique. Le modèle de concurrence évolutif la rend adaptée aux systèmes à haut débit. La configuration flexible et la gestion robuste des cas limites signifient qu'elle pourrait être déployée en toute confiance dans une application réelle.
Respect des consignes
Poids 10%La réponse suit parfaitement toutes les instructions. Elle fournit du code dans une langue demandée, implémente l'API spécifiée, l'algorithme et la stratégie de concurrence, gère tous les cas limites, inclut les tests requis et l'analyse de complexité, et fournit une note de conception dans les limites spécifiées.
Score total
Commentaire global
La réponse A est une implémentation solide et largement complète qui correspond directement à l'API et à la sémantique demandées. Elle combine un seau à jetons avec un journal exact à fenêtre glissante, utilise une horloge monotone injectable, évite un goulot d'étranglement global unique grâce à des verrous de carte rayés plus des verrous par client, et inclut des tests déterministes solides couvrant les cas limites requis. La note de conception est concise et aborde les compromis, la correction de la concurrence et la complexité. Les faiblesses mineures incluent une implémentation de client_count quelque peu maladroite et un retry_after ne raisonnant que sur la disponibilité de 1 unité plutôt que sur un coût arbitraire, bien que cela corresponde à l'API demandée.
Afficher le detail de l evaluation ▼
Exactitude
Poids 35%Implémente correctement l'hybride requis : la requête n'est autorisée que si le seau à jetons et la fenêtre glissante exacte le permettent, avec un remplissage correct des jetons, une initialisation paresseuse, un nettoyage des obsolètes et une mutation sérialisée par client empêchant le double dépense. retry_after est cohérent pour une disponibilité de 1 unité, et les tests couvrent les invariants importants. Les préoccupations mineures concernent de petites bizarreries d'implémentation telles que la lecture de la carte par client_count sans synchronisation complète et certains choix de limites dans l'éviction de la fenêtre.
Completude
Poids 20%Couvre presque tous les éléments demandés : surface d'API requise, algorithme hybride, horloge monotone injectable, prise en charge de la configuration par client, cas limites explicites, nettoyage des clients obsolètes, synchronisation fractionnée, discussion sur la concurrence, note sur la complexité et 8 tests significatifs incluant la contention concurrente déterministe et l'éviction. Ceci est très proche de satisfaire pleinement la consigne.
Qualite du code
Poids 20%Bien structuré et idiomatique, avec des dataclasses, des fonctions d'aide séparées, une validation explicite et une nomenclature claire. La stratégie de verrouillage est organisée de manière réfléchie et la note de conception est concise. Quelques aspérités subsistent, telles que l'implémentation inhabituelle de client_count et une gestion des verrous de bas niveau qui pourrait être simplifiée.
Valeur pratique
Poids 15%Pratique pour une utilisation backend : injection d'horloge déterministe, nettoyage borné de l'état obsolète, rayage des verrous plus verrous par client pour la contention, comptage exact de la fenêtre et tests solides le rendent déployable ou adaptable. L'implémentation est réaliste et aborde les cas limites courants en production tels que les longues pauses et la suspension de VM.
Respect des consignes
Poids 10%Suit la consigne de près : le choix de la langue est valide, les noms d'API correspondent, la note de conception est dans le cadre, les tests utilisent l'horloge injectable et les cas limites demandés sont explicitement abordés. La réponse correspond bien aux exigences techniques et de documentation.
Score total
Commentaire global
La réponse A est un limiteur de débit hybride peaufiné et de qualité professionnelle. Il utilise des dataclasses, des verrous de bande plus des verrous par client, intègre le nettoyage des éléments obsolètes dans les appels d'autorisation (évitant les problèmes de thread d'arrière-plan), valide la configuration, limite correctement le temps écoulé pour les horloges inversées et les pauses importantes, et rejette les coûts supérieurs à la rafale sans créer d'état client. Les 8 tests utilisent le ManualClock injectable de manière déterministe, y compris un test de contention concurrent qui vérifie réellement l'invariant taux*T+rafale. La note de conception est concise et aborde tous les points requis.
Afficher le detail de l evaluation ▼
Exactitude
Poids 35%Implémente correctement le seau à jetons hybride + le journal de fenêtre glissante exact. Gère l'horloge inversée en limitant via max(raw_now, last_seen), rejette coût>rafale sans créer d'état, limite le temps écoulé pour les jetons afin d'éviter le dépassement, utilise des verrous par client avec des verrous de bande pour la carte. retry_after calcule correctement le temps d'attente en fonction de l'expiration du journal de fenêtre. La gestion des eps et la comptabilisation des événements semblent solides.
Completude
Poids 20%Couvre tous les cas limites requis : coût>rafale (sans création d'état), horloge inversée (test inclus), pauses importantes (test inclus), initialisation paresseuse, éviction des éléments obsolètes (déterministe via _maybe_cleanup lié à l'horloge injectée), jetons fractionnaires. 8 tests, y compris un test de limite concurrent qui vérifie l'invariant taux*T+rafale de manière déterministe en utilisant ManualClock.
Qualite du code
Poids 20%Bien structuré avec des dataclasses, séparation de MonotonicClock/ManualClock, validation de ClientConfig, verrous par client et par bande clairement expliqués. Le nettoyage est intégré au chemin d'autorisation (pas de complications de thread d'arrière-plan). Note de conception concise en haut.
Valeur pratique
Poids 15%Qualité professionnelle : les verrous de bande sont plus évolutifs, le nettoyage paresseux intégré évite le cycle de vie du thread d'arrière-plan, support de test déterministe, validation de la configuration. Convient pour être intégré dans un service réel.
Respect des consignes
Poids 10%Suit toutes les instructions : la surface de l'API correspond aux signatures, algorithme hybride, verrous par client avec verrous de bande, horloge monotone injectable, tous les cas limites, 8 tests, y compris le test d'invariant concurrent déterministe, note de conception avec sémantique et complexité énoncées.