Respuesta A: OpenAI GPT-5.5
import math import threading import time import unittest from collections import deque from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass, field # Design note: # This limiter admits a request only when two independent checks agree: # 1. a token bucket has at least cost tokens, allowing short bursts up to burst; # 2. an exact sliding-window log has room for cost more admitted units in the last # window_seconds, where the window limit is rate * window_seconds. # If the token bucket says yes but the sliding window is full, the request is # denied and no tokens are spent; if the w...
Mostrar respuesta completa ▼
import math import threading import time import unittest from collections import deque from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass, field # Design note: # This limiter admits a request only when two independent checks agree: # 1. a token bucket has at least cost tokens, allowing short bursts up to burst; # 2. an exact sliding-window log has room for cost more admitted units in the last # window_seconds, where the window limit is rate * window_seconds. # If the token bucket says yes but the sliding window is full, the request is # denied and no tokens are spent; if the window has room but tokens are absent, # it is also denied. The exact log stores one entry per admitted request as # (timestamp, cost), not one entry per unit, so allow is amortized O(1): each log # entry is appended once and removed once. Memory is O(number of admitted requests # in the active window) per client. A weighted two-bucket counter would use O(1) # memory but can over- or under-admit near bucket boundaries; this implementation # chooses exactness because correctness is usually more important for abuse # prevention. The client map is lock-striped; each client state has its own lock. # A caller obtains the per-client lock before mutating tokens/window state, so # concurrent allow calls cannot double-spend tokens or lose window updates. class MonotonicClock: def now(self) -> float: return time.monotonic() class ManualClock: def __init__(self, start: float = 0.0): self._now = float(start) self._lock = threading.Lock() def now(self) -> float: with self._lock: return self._now def set(self, value: float) -> None: with self._lock: self._now = float(value) def advance(self, seconds: float) -> None: with self._lock: self._now += float(seconds) @dataclass(frozen=True) class ClientConfig: rate: float burst: float window_seconds: float = 1.0 def validated(self) -> 'ClientConfig': rate = float(self.rate) burst = float(self.burst) window = float(self.window_seconds) if not math.isfinite(rate) or rate <= 0: raise ValueError('rate must be a positive finite number') if not math.isfinite(burst) or burst < 1: raise ValueError('burst must be a finite number >= 1') if not math.isfinite(window) or window <= 0: raise ValueError('window_seconds must be a positive finite number') return ClientConfig(rate, burst, window) @property def window_limit(self) -> float: return self.rate * self.window_seconds @dataclass class _ClientState: config: ClientConfig tokens: float last_refill: float last_seen: float events: deque = field(default_factory=deque) window_sum: float = 0.0 lock: threading.Lock = field(default_factory=threading.Lock) class HybridRateLimiter: def __init__( self, rate: float, burst: float, window_seconds: float = 1.0, per_client_configs=None, clock=None, stripes: int = 64, idle_ttl_seconds: float | None = None, cleanup_interval_seconds: float | None = None, ): if stripes <= 0: raise ValueError('stripes must be positive') self._default_config = ClientConfig(rate, burst, window_seconds).validated() self._configs = {} if per_client_configs: for client_id, cfg in per_client_configs.items(): self._configs[client_id] = self._coerce_config(cfg) max_window = self._default_config.window_seconds for cfg in self._configs.values(): max_window = max(max_window, cfg.window_seconds) if idle_ttl_seconds is None: idle_ttl_seconds = max(60.0, 2.0 * max_window) idle_ttl_seconds = float(idle_ttl_seconds) if not math.isfinite(idle_ttl_seconds) or idle_ttl_seconds <= 0: raise ValueError('idle_ttl_seconds must be positive and finite') if idle_ttl_seconds < max_window: raise ValueError('idle_ttl_seconds must be >= the largest window_seconds') self._idle_ttl = idle_ttl_seconds if cleanup_interval_seconds is None: cleanup_interval_seconds = min(60.0, max(1.0, idle_ttl_seconds / 2.0)) cleanup_interval_seconds = float(cleanup_interval_seconds) if not math.isfinite(cleanup_interval_seconds) or cleanup_interval_seconds <= 0: raise ValueError('cleanup_interval_seconds must be positive and finite') self._cleanup_interval = cleanup_interval_seconds self._clock = clock if clock is not None else MonotonicClock() self._clients = {} self._stripe_locks = [threading.Lock() for _ in range(stripes)] self._cleanup_mutex = threading.Lock() self._last_cleanup = self._safe_time() self._eps = 1e-12 @staticmethod def _coerce_config(cfg) -> ClientConfig: if isinstance(cfg, ClientConfig): return cfg.validated() if isinstance(cfg, dict): return ClientConfig(cfg['rate'], cfg['burst'], cfg.get('window_seconds', 1.0)).validated() rate, burst, *rest = cfg window = rest[0] if rest else 1.0 return ClientConfig(rate, burst, window).validated() def _safe_time(self) -> float: t = float(self._clock.now()) if math.isnan(t): raise ValueError('clock returned NaN') return t def _config_for(self, client_id: str) -> ClientConfig: return self._configs.get(client_id, self._default_config) def _stripe_index(self, client_id: str) -> int: return hash(client_id) % len(self._stripe_locks) def _get_state_locked(self, client_id: str) -> _ClientState: idx = self._stripe_index(client_id) stripe = self._stripe_locks[idx] stripe.acquire() try: state = self._clients.get(client_id) if state is None: cfg = self._config_for(client_id) now = self._safe_time() state = _ClientState( config=cfg, tokens=cfg.burst, last_refill=now, last_seen=now, ) self._clients[client_id] = state state.lock.acquire() return state finally: stripe.release() def _advance_and_prune_locked(self, state: _ClientState) -> float: raw_now = self._safe_time() now = max(raw_now, state.last_seen) elapsed = max(0.0, now - state.last_refill) max_useful_elapsed = state.config.burst / state.config.rate elapsed_for_tokens = min(elapsed, max_useful_elapsed) if elapsed_for_tokens > 0: state.tokens = min( state.config.burst, state.tokens + elapsed_for_tokens * state.config.rate, ) state.last_refill = now state.last_seen = now cutoff = now - state.config.window_seconds while state.events and state.events[0][0] <= cutoff + self._eps: _, old_cost = state.events.popleft() state.window_sum -= old_cost if state.window_sum < self._eps: state.window_sum = 0.0 return now def allow(self, client_id: str, cost: int = 1) -> bool: if cost <= 0: raise ValueError('cost must be a positive integer') self._maybe_cleanup() cfg = self._config_for(client_id) if cost > cfg.burst: return False state = self._get_state_locked(client_id) try: now = self._advance_and_prune_locked(state) enough_tokens = state.tokens + self._eps >= cost enough_window = state.window_sum + cost <= state.config.window_limit + self._eps if not (enough_tokens and enough_window): return False state.tokens -= cost if state.tokens < self._eps: state.tokens = 0.0 state.window_sum += cost state.events.append((now, float(cost))) return True finally: state.lock.release() def retry_after(self, client_id: str) -> float: self._maybe_cleanup() state = self._get_state_locked(client_id) try: now = self._advance_and_prune_locked(state) cfg = state.config if cfg.burst < 1 or cfg.window_limit < 1: return math.inf token_wait = 0.0 if state.tokens + self._eps >= 1.0 else (1.0 - state.tokens) / cfg.rate if state.window_sum + 1.0 <= cfg.window_limit + self._eps: window_wait = 0.0 else: window_wait = math.inf remaining = state.window_sum for ts, event_cost in state.events: remaining -= event_cost if remaining + 1.0 <= cfg.window_limit + self._eps: window_wait = max(0.0, ts + cfg.window_seconds - now) break return max(token_wait, window_wait) finally: state.lock.release() def _maybe_cleanup(self) -> None: raw_now = self._safe_time() if raw_now - self._last_cleanup < self._cleanup_interval: return if not self._cleanup_mutex.acquire(blocking=False): return try: now = max(raw_now, self._last_cleanup) if now - self._last_cleanup >= self._cleanup_interval: self._cleanup_stale_at(now) self._last_cleanup = now finally: self._cleanup_mutex.release() def cleanup_stale(self) -> int: with self._cleanup_mutex: now = max(self._safe_time(), self._last_cleanup) removed = self._cleanup_stale_at(now) self._last_cleanup = now return removed def _cleanup_stale_at(self, now: float) -> int: removed = 0 for idx, stripe in enumerate(self._stripe_locks): with stripe: for client_id, state in list(self._clients.items()): if self._stripe_index(client_id) != idx: continue if not state.lock.acquire(blocking=False): continue try: if self._clients.get(client_id) is state and now - state.last_seen >= self._idle_ttl: del self._clients[client_id] removed += 1 finally: state.lock.release() return removed def client_count(self) -> int: total = 0 for stripe in self._stripe_locks: with stripe: total += len(self._clients) break return len(self._clients) class HybridRateLimiterTests(unittest.TestCase): def test_basic_allow_deny_and_retry_after(self): clock = ManualClock() limiter = HybridRateLimiter(rate=1, burst=2, window_seconds=10, clock=clock) self.assertTrue(limiter.allow('alice')) self.assertTrue(limiter.allow('alice')) self.assertFalse(limiter.allow('alice')) self.assertAlmostEqual(limiter.retry_after('alice'), 1.0, places=9) def test_burst_draining_and_fractional_refill(self): clock = ManualClock() limiter = HybridRateLimiter(rate=2, burst=3, window_seconds=10, clock=clock) self.assertTrue(limiter.allow('alice')) self.assertTrue(limiter.allow('alice')) self.assertTrue(limiter.allow('alice')) self.assertFalse(limiter.allow('alice')) clock.advance(0.25) self.assertFalse(limiter.allow('alice')) self.assertAlmostEqual(limiter.retry_after('alice'), 0.25, places=9) clock.advance(0.25) self.assertTrue(limiter.allow('alice')) def test_sliding_window_caps_even_when_bucket_refilled(self): clock = ManualClock() limiter = HybridRateLimiter(rate=10, burst=10, window_seconds=1, clock=clock) for _ in range(10): self.assertTrue(limiter.allow('abuser')) clock.advance(0.5) self.assertFalse(limiter.allow('abuser')) self.assertAlmostEqual(limiter.retry_after('abuser'), 0.5, places=9) clock.advance(0.5) self.assertTrue(limiter.allow('abuser')) def test_cost_larger_than_burst_is_rejected_without_state_creation(self): clock = ManualClock() limiter = HybridRateLimiter(rate=10, burst=5, window_seconds=10, clock=clock) self.assertFalse(limiter.allow('alice', cost=6)) self.assertEqual(limiter.client_count(), 0) self.assertTrue(limiter.allow('alice', cost=5)) self.assertFalse(limiter.allow('alice')) def test_concurrent_contention_obeys_token_bucket_bound(self): clock = ManualClock() rate = 5 burst = 5 limiter = HybridRateLimiter(rate=rate, burst=burst, window_seconds=100, clock=clock, stripes=8) successes = 0 for tick in range(11): clock.set(tick / 10.0) with ThreadPoolExecutor(max_workers=32) as pool: results = list(pool.map(lambda _: limiter.allow('hot'), range(32))) successes += sum(1 for r in results if r) bound = rate * 1.0 + burst self.assertLessEqual(successes, bound) def test_stale_client_eviction(self): clock = ManualClock() limiter = HybridRateLimiter( rate=10, burst=10, window_seconds=1, clock=clock, idle_ttl_seconds=5, cleanup_interval_seconds=1, ) self.assertTrue(limiter.allow('old-client')) self.assertEqual(limiter.client_count(), 1) clock.advance(6) self.assertTrue(limiter.allow('new-client')) self.assertEqual(limiter.client_count(), 1) def test_clock_going_backwards_is_clamped(self): clock = ManualClock(10) limiter = HybridRateLimiter(rate=1, burst=1, window_seconds=10, clock=clock) self.assertTrue(limiter.allow('alice')) clock.set(5) self.assertFalse(limiter.allow('alice')) self.assertAlmostEqual(limiter.retry_after('alice'), 1.0, places=9) def test_large_pause_refills_only_to_burst(self): clock = ManualClock() limiter = HybridRateLimiter(rate=1, burst=3, window_seconds=1000, clock=clock) for _ in range(3): self.assertTrue(limiter.allow('alice')) clock.advance(10 ** 9) for _ in range(3): self.assertTrue(limiter.allow('alice')) self.assertFalse(limiter.allow('alice')) if __name__ == '__main__': unittest.main()
Resultado
Votos ganadores
3 / 3
Puntuacion media
Puntuacion total
Comentario general
La Respuesta A proporciona una implementación sofisticada, robusta y de calidad de producción del limitador de tasa híbrido. Sus puntos fuertes clave son el modelo de concurrencia altamente escalable que utiliza el bloqueo por franjas (lock striping) combinado con bloqueos por cliente, un elegante mecanismo de limpieza que evita un hilo en segundo plano y un conjunto de pruebas completo y bien diseñado que valida correctamente comportamientos complejos como la contención concurrente. El código está limpio, bien estructurado y cumple plenamente con todos los requisitos del prompt, incluidas funciones avanzadas como la configuración por cliente.
Ver detalle de evaluacion ▼
Correccion
Peso 35%La implementación es altamente correcta y robusta. El modelo de concurrencia que utiliza el bloqueo por franjas y los bloqueos por cliente es una forma sofisticada y correcta de evitar cuellos de botella. Todos los casos extremos, incluida la seguridad del reloj y los reabastecimientos fraccionarios, se manejan correctamente. La lógica tanto para el cubo de tokens como para la ventana deslizante es sólida.
Integridad
Peso 20%Esta respuesta es excepcionalmente completa. Implementa todos los métodos de API requeridos, maneja todos los casos extremos especificados, proporciona un conjunto de pruebas completo con más del número requerido de pruebas e incluye una nota de diseño clara. También implementa la configuración sugerida por cliente, lo que demuestra una comprensión completa del prompt.
Calidad del codigo
Peso 20%La calidad del código es excelente. Utiliza características modernas de Python como las clases de datos (dataclasses) de manera efectiva, está bien estructurado en métodos lógicos y es muy legible. Las decisiones de diseño, como el mecanismo de limpieza integrado y el bloqueo por franjas, son elegantes y demuestran un alto nivel de habilidad de ingeniería.
Valor practico
Peso 15%La implementación es de calidad de grado de producción y muy práctica. El modelo de concurrencia escalable la hace adecuada para sistemas de alto rendimiento. La configuración flexible y el manejo robusto de los casos extremos significan que podría implementarse con confianza en una aplicación del mundo real.
Seguimiento de instrucciones
Peso 10%La respuesta sigue todas las instrucciones a la perfección. Proporciona código en un idioma solicitado, implementa la API especificada, el algoritmo y la estrategia de concurrencia, maneja todos los casos extremos, incluye las pruebas requeridas y el análisis de complejidad, y proporciona una nota de diseño dentro de las restricciones especificadas.
Puntuacion total
Comentario general
La respuesta A es una implementación sólida y mayormente completa que coincide directamente con la API y la semántica solicitadas. Combina un cubo de tokens con un registro de ventana deslizante exacto, utiliza un reloj monótono inyectable, evita un cuello de botella global único a través de bloqueos de mapas rayados más bloqueos por cliente, e incluye pruebas deterministas sólidas que cubren los casos de borde requeridos. La nota de diseño es concisa y aborda las compensaciones, la corrección de la concurrencia y la complejidad. Las debilidades menores incluyen una implementación de client_count algo torpe y retry_after que solo considera la disponibilidad de 1 unidad en lugar de un costo arbitrario, aunque esto coincide con la API solicitada.
Ver detalle de evaluacion ▼
Correccion
Peso 35%Implementa correctamente el híbrido requerido: la solicitud se aprueba solo si tanto el cubo de tokens como la ventana deslizante exacta lo permiten, con recarga de tokens adecuada, inicialización perezosa, limpieza de obsoletos y mutación serializada por cliente que evita el doble gasto. retry_after es coherente para la disponibilidad de 1 unidad, y las pruebas cubren invariantes importantes. Las preocupaciones menores son pequeñas peculiaridades de implementación como la lectura de client_count del mapa sin sincronización completa y algunas elecciones de límites en el desalojo de la ventana.
Integridad
Peso 20%Cubre casi todos los elementos solicitados: superficie de API requerida, algoritmo híbrido, reloj monótono inyectable, soporte de configuración por cliente, casos de borde explícitos, limpieza de clientes obsoletos, temporización fraccionaria, discusión de concurrencia, nota de complejidad y 8 pruebas significativas que incluyen contención concurrente determinista y desalojo. Esto está muy cerca de satisfacer completamente la solicitud.
Calidad del codigo
Peso 20%Bien estructurado e idiomático, con dataclasses, ayudantes separados, validación explícita y nombres claros. La estrategia de bloqueo está organizada de manera reflexiva y la nota de diseño es concisa. Quedan algunos bordes ásperos, como la inusual implementación de client_count y alguna gestión de bloqueo de bajo nivel que podría simplificarse.
Valor practico
Peso 15%Práctico para uso en backend: inyección de reloj determinista, limpieza de estado obsoleto acotada, rayado de bloqueos más bloqueos por cliente para contención, contabilidad de ventana exacta y pruebas sólidas lo hacen desplegable o adaptable. La implementación es realista y aborda casos de borde comunes de producción como pausas largas y suspensión de VM.
Seguimiento de instrucciones
Peso 10%Sigue la solicitud de cerca: la elección del idioma es válida, los nombres de la API coinciden, la nota de diseño está dentro del alcance, las pruebas utilizan el reloj inyectable y se abordan explícitamente los casos de borde solicitados. La respuesta se alinea bien tanto con los requisitos técnicos como de documentación.
Puntuacion total
Comentario general
La respuesta A es un limitador de tasa híbrido pulido y de calidad de producción. Utiliza dataclasses, bloqueos de stripe y bloqueos por cliente, integra la limpieza de elementos obsoletos en las llamadas allow (evitando problemas de hilos en segundo plano), valida la configuración, ajusta adecuadamente el tiempo transcurrido tanto para relojes inversos como para pausas largas, y rechaza cost>burst sin crear estado de cliente. Las 8 pruebas utilizan el ManualClock inyectable de forma determinista, incluida una prueba de contención concurrente que afirma genuinamente el invariante rate*T+burst. La nota de diseño es concisa y aborda todos los puntos requeridos.
Ver detalle de evaluacion ▼
Correccion
Peso 35%Implementa correctamente el cubo de tokens híbrido + registro de ventana deslizante exacto. Maneja el reloj inverso ajustando mediante max(raw_now, last_seen), rechaza cost>burst sin crear estado, ajusta el tiempo transcurrido para los tokens para evitar desbordamientos, utiliza bloqueos por cliente con bloqueos de stripe para el mapa. retry_after calcula correctamente el tiempo de espera basándose en la expiración del registro de la ventana. El manejo de eps y la contabilidad de eventos parecen sólidos.
Integridad
Peso 20%Cubre todos los casos extremos requeridos: cost>burst (sin creación de estado), reloj inverso (prueba incluida), pausas largas (prueba incluida), inicialización diferida, eliminación de elementos obsoletos (determinista a través de _maybe_cleanup vinculado al reloj inyectado), tokens fraccionarios. 8 pruebas, incluida la prueba de límite concurrente que afirma el invariante rate*T+burst de forma determinista utilizando ManualClock.
Calidad del codigo
Peso 20%Bien estructurado con dataclasses, separación de MonotonicClock/ManualClock, validación de ClientConfig, bloqueos por cliente y de stripe claramente explicados. La limpieza se integra en la ruta de allow (sin complicaciones de hilos en segundo plano). Nota de diseño concisa en la parte superior.
Valor practico
Peso 15%Calidad de producción: los bloqueos de stripe escalan mejor, la limpieza diferida integrada evita el ciclo de vida del hilo en segundo plano, soporte de pruebas deterministas, validación de configuración. Adecuado para incrustar en un servicio real.
Seguimiento de instrucciones
Peso 10%Sigue todas las instrucciones: la superficie de la API coincide con las firmas, algoritmo híbrido, bloqueos por cliente con bloqueos de stripe, reloj monotónico inyectable, todos los casos extremos, 8 pruebas, incluida la prueba de invariante concurrente determinista, nota de diseño con semántica y complejidad declaradas.