Resposta A: OpenAI GPT-5.5
import math import threading import time import unittest from collections import deque from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass, field # Design note: # This limiter admits a request only when two independent checks agree: # 1. a token bucket has at least cost tokens, allowing short bursts up to burst; # 2. an exact sliding-window log has room for cost more admitted units in the last # window_seconds, where the window limit is rate * window_seconds. # If the token bucket says yes but the sliding window is full, the request is # denied and no tokens are spent; if the w...
Mostrar resposta completa ▼
import math import threading import time import unittest from collections import deque from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass, field # Design note: # This limiter admits a request only when two independent checks agree: # 1. a token bucket has at least cost tokens, allowing short bursts up to burst; # 2. an exact sliding-window log has room for cost more admitted units in the last # window_seconds, where the window limit is rate * window_seconds. # If the token bucket says yes but the sliding window is full, the request is # denied and no tokens are spent; if the window has room but tokens are absent, # it is also denied. The exact log stores one entry per admitted request as # (timestamp, cost), not one entry per unit, so allow is amortized O(1): each log # entry is appended once and removed once. Memory is O(number of admitted requests # in the active window) per client. A weighted two-bucket counter would use O(1) # memory but can over- or under-admit near bucket boundaries; this implementation # chooses exactness because correctness is usually more important for abuse # prevention. The client map is lock-striped; each client state has its own lock. # A caller obtains the per-client lock before mutating tokens/window state, so # concurrent allow calls cannot double-spend tokens or lose window updates. class MonotonicClock: def now(self) -> float: return time.monotonic() class ManualClock: def __init__(self, start: float = 0.0): self._now = float(start) self._lock = threading.Lock() def now(self) -> float: with self._lock: return self._now def set(self, value: float) -> None: with self._lock: self._now = float(value) def advance(self, seconds: float) -> None: with self._lock: self._now += float(seconds) @dataclass(frozen=True) class ClientConfig: rate: float burst: float window_seconds: float = 1.0 def validated(self) -> 'ClientConfig': rate = float(self.rate) burst = float(self.burst) window = float(self.window_seconds) if not math.isfinite(rate) or rate <= 0: raise ValueError('rate must be a positive finite number') if not math.isfinite(burst) or burst < 1: raise ValueError('burst must be a finite number >= 1') if not math.isfinite(window) or window <= 0: raise ValueError('window_seconds must be a positive finite number') return ClientConfig(rate, burst, window) @property def window_limit(self) -> float: return self.rate * self.window_seconds @dataclass class _ClientState: config: ClientConfig tokens: float last_refill: float last_seen: float events: deque = field(default_factory=deque) window_sum: float = 0.0 lock: threading.Lock = field(default_factory=threading.Lock) class HybridRateLimiter: def __init__( self, rate: float, burst: float, window_seconds: float = 1.0, per_client_configs=None, clock=None, stripes: int = 64, idle_ttl_seconds: float | None = None, cleanup_interval_seconds: float | None = None, ): if stripes <= 0: raise ValueError('stripes must be positive') self._default_config = ClientConfig(rate, burst, window_seconds).validated() self._configs = {} if per_client_configs: for client_id, cfg in per_client_configs.items(): self._configs[client_id] = self._coerce_config(cfg) max_window = self._default_config.window_seconds for cfg in self._configs.values(): max_window = max(max_window, cfg.window_seconds) if idle_ttl_seconds is None: idle_ttl_seconds = max(60.0, 2.0 * max_window) idle_ttl_seconds = float(idle_ttl_seconds) if not math.isfinite(idle_ttl_seconds) or idle_ttl_seconds <= 0: raise ValueError('idle_ttl_seconds must be positive and finite') if idle_ttl_seconds < max_window: raise ValueError('idle_ttl_seconds must be >= the largest window_seconds') self._idle_ttl = idle_ttl_seconds if cleanup_interval_seconds is None: cleanup_interval_seconds = min(60.0, max(1.0, idle_ttl_seconds / 2.0)) cleanup_interval_seconds = float(cleanup_interval_seconds) if not math.isfinite(cleanup_interval_seconds) or cleanup_interval_seconds <= 0: raise ValueError('cleanup_interval_seconds must be positive and finite') self._cleanup_interval = cleanup_interval_seconds self._clock = clock if clock is not None else MonotonicClock() self._clients = {} self._stripe_locks = [threading.Lock() for _ in range(stripes)] self._cleanup_mutex = threading.Lock() self._last_cleanup = self._safe_time() self._eps = 1e-12 @staticmethod def _coerce_config(cfg) -> ClientConfig: if isinstance(cfg, ClientConfig): return cfg.validated() if isinstance(cfg, dict): return ClientConfig(cfg['rate'], cfg['burst'], cfg.get('window_seconds', 1.0)).validated() rate, burst, *rest = cfg window = rest[0] if rest else 1.0 return ClientConfig(rate, burst, window).validated() def _safe_time(self) -> float: t = float(self._clock.now()) if math.isnan(t): raise ValueError('clock returned NaN') return t def _config_for(self, client_id: str) -> ClientConfig: return self._configs.get(client_id, self._default_config) def _stripe_index(self, client_id: str) -> int: return hash(client_id) % len(self._stripe_locks) def _get_state_locked(self, client_id: str) -> _ClientState: idx = self._stripe_index(client_id) stripe = self._stripe_locks[idx] stripe.acquire() try: state = self._clients.get(client_id) if state is None: cfg = self._config_for(client_id) now = self._safe_time() state = _ClientState( config=cfg, tokens=cfg.burst, last_refill=now, last_seen=now, ) self._clients[client_id] = state state.lock.acquire() return state finally: stripe.release() def _advance_and_prune_locked(self, state: _ClientState) -> float: raw_now = self._safe_time() now = max(raw_now, state.last_seen) elapsed = max(0.0, now - state.last_refill) max_useful_elapsed = state.config.burst / state.config.rate elapsed_for_tokens = min(elapsed, max_useful_elapsed) if elapsed_for_tokens > 0: state.tokens = min( state.config.burst, state.tokens + elapsed_for_tokens * state.config.rate, ) state.last_refill = now state.last_seen = now cutoff = now - state.config.window_seconds while state.events and state.events[0][0] <= cutoff + self._eps: _, old_cost = state.events.popleft() state.window_sum -= old_cost if state.window_sum < self._eps: state.window_sum = 0.0 return now def allow(self, client_id: str, cost: int = 1) -> bool: if cost <= 0: raise ValueError('cost must be a positive integer') self._maybe_cleanup() cfg = self._config_for(client_id) if cost > cfg.burst: return False state = self._get_state_locked(client_id) try: now = self._advance_and_prune_locked(state) enough_tokens = state.tokens + self._eps >= cost enough_window = state.window_sum + cost <= state.config.window_limit + self._eps if not (enough_tokens and enough_window): return False state.tokens -= cost if state.tokens < self._eps: state.tokens = 0.0 state.window_sum += cost state.events.append((now, float(cost))) return True finally: state.lock.release() def retry_after(self, client_id: str) -> float: self._maybe_cleanup() state = self._get_state_locked(client_id) try: now = self._advance_and_prune_locked(state) cfg = state.config if cfg.burst < 1 or cfg.window_limit < 1: return math.inf token_wait = 0.0 if state.tokens + self._eps >= 1.0 else (1.0 - state.tokens) / cfg.rate if state.window_sum + 1.0 <= cfg.window_limit + self._eps: window_wait = 0.0 else: window_wait = math.inf remaining = state.window_sum for ts, event_cost in state.events: remaining -= event_cost if remaining + 1.0 <= cfg.window_limit + self._eps: window_wait = max(0.0, ts + cfg.window_seconds - now) break return max(token_wait, window_wait) finally: state.lock.release() def _maybe_cleanup(self) -> None: raw_now = self._safe_time() if raw_now - self._last_cleanup < self._cleanup_interval: return if not self._cleanup_mutex.acquire(blocking=False): return try: now = max(raw_now, self._last_cleanup) if now - self._last_cleanup >= self._cleanup_interval: self._cleanup_stale_at(now) self._last_cleanup = now finally: self._cleanup_mutex.release() def cleanup_stale(self) -> int: with self._cleanup_mutex: now = max(self._safe_time(), self._last_cleanup) removed = self._cleanup_stale_at(now) self._last_cleanup = now return removed def _cleanup_stale_at(self, now: float) -> int: removed = 0 for idx, stripe in enumerate(self._stripe_locks): with stripe: for client_id, state in list(self._clients.items()): if self._stripe_index(client_id) != idx: continue if not state.lock.acquire(blocking=False): continue try: if self._clients.get(client_id) is state and now - state.last_seen >= self._idle_ttl: del self._clients[client_id] removed += 1 finally: state.lock.release() return removed def client_count(self) -> int: total = 0 for stripe in self._stripe_locks: with stripe: total += len(self._clients) break return len(self._clients) class HybridRateLimiterTests(unittest.TestCase): def test_basic_allow_deny_and_retry_after(self): clock = ManualClock() limiter = HybridRateLimiter(rate=1, burst=2, window_seconds=10, clock=clock) self.assertTrue(limiter.allow('alice')) self.assertTrue(limiter.allow('alice')) self.assertFalse(limiter.allow('alice')) self.assertAlmostEqual(limiter.retry_after('alice'), 1.0, places=9) def test_burst_draining_and_fractional_refill(self): clock = ManualClock() limiter = HybridRateLimiter(rate=2, burst=3, window_seconds=10, clock=clock) self.assertTrue(limiter.allow('alice')) self.assertTrue(limiter.allow('alice')) self.assertTrue(limiter.allow('alice')) self.assertFalse(limiter.allow('alice')) clock.advance(0.25) self.assertFalse(limiter.allow('alice')) self.assertAlmostEqual(limiter.retry_after('alice'), 0.25, places=9) clock.advance(0.25) self.assertTrue(limiter.allow('alice')) def test_sliding_window_caps_even_when_bucket_refilled(self): clock = ManualClock() limiter = HybridRateLimiter(rate=10, burst=10, window_seconds=1, clock=clock) for _ in range(10): self.assertTrue(limiter.allow('abuser')) clock.advance(0.5) self.assertFalse(limiter.allow('abuser')) self.assertAlmostEqual(limiter.retry_after('abuser'), 0.5, places=9) clock.advance(0.5) self.assertTrue(limiter.allow('abuser')) def test_cost_larger_than_burst_is_rejected_without_state_creation(self): clock = ManualClock() limiter = HybridRateLimiter(rate=10, burst=5, window_seconds=10, clock=clock) self.assertFalse(limiter.allow('alice', cost=6)) self.assertEqual(limiter.client_count(), 0) self.assertTrue(limiter.allow('alice', cost=5)) self.assertFalse(limiter.allow('alice')) def test_concurrent_contention_obeys_token_bucket_bound(self): clock = ManualClock() rate = 5 burst = 5 limiter = HybridRateLimiter(rate=rate, burst=burst, window_seconds=100, clock=clock, stripes=8) successes = 0 for tick in range(11): clock.set(tick / 10.0) with ThreadPoolExecutor(max_workers=32) as pool: results = list(pool.map(lambda _: limiter.allow('hot'), range(32))) successes += sum(1 for r in results if r) bound = rate * 1.0 + burst self.assertLessEqual(successes, bound) def test_stale_client_eviction(self): clock = ManualClock() limiter = HybridRateLimiter( rate=10, burst=10, window_seconds=1, clock=clock, idle_ttl_seconds=5, cleanup_interval_seconds=1, ) self.assertTrue(limiter.allow('old-client')) self.assertEqual(limiter.client_count(), 1) clock.advance(6) self.assertTrue(limiter.allow('new-client')) self.assertEqual(limiter.client_count(), 1) def test_clock_going_backwards_is_clamped(self): clock = ManualClock(10) limiter = HybridRateLimiter(rate=1, burst=1, window_seconds=10, clock=clock) self.assertTrue(limiter.allow('alice')) clock.set(5) self.assertFalse(limiter.allow('alice')) self.assertAlmostEqual(limiter.retry_after('alice'), 1.0, places=9) def test_large_pause_refills_only_to_burst(self): clock = ManualClock() limiter = HybridRateLimiter(rate=1, burst=3, window_seconds=1000, clock=clock) for _ in range(3): self.assertTrue(limiter.allow('alice')) clock.advance(10 ** 9) for _ in range(3): self.assertTrue(limiter.allow('alice')) self.assertFalse(limiter.allow('alice')) if __name__ == '__main__': unittest.main()
Resultado
Votos de vitoria
3 / 3
Pontuacao media
Pontuacao total
Comentario geral
A Resposta A fornece uma implementação sofisticada, robusta e de qualidade de produção do limitador de taxa híbrido. Seus principais pontos fortes são o modelo de concorrência altamente escalável usando 'lock striping' combinado com bloqueios por cliente, um mecanismo de limpeza elegante que evita uma thread em segundo plano e um conjunto de testes abrangente e bem projetado que valida corretamente comportamentos complexos como contenção concorrente. O código é limpo, bem estruturado e adere totalmente a todos os requisitos do prompt, incluindo recursos avançados como configurações por cliente.
Ver detalhes da avaliacao ▼
Correcao
Peso 35%A implementação é altamente correta e robusta. O modelo de concorrência usando 'lock striping' e bloqueios por cliente é uma maneira sofisticada e correta de evitar gargalos. Todos os casos de borda, incluindo segurança de relógio e recargas fracionárias, são tratados corretamente. A lógica tanto para o 'token bucket' quanto para a janela deslizante é sólida.
Completude
Peso 20%Esta resposta é excepcionalmente completa. Ela implementa todos os métodos de API necessários, lida com todos os casos de borda especificados, fornece um conjunto de testes abrangente com mais do que o número exigido de testes e inclui uma nota de design clara. Ela também implementa a configuração sugerida por cliente, demonstrando uma compreensão completa do prompt.
Qualidade do codigo
Peso 20%A qualidade do código é excelente. Ele usa recursos modernos do Python, como 'dataclasses', de forma eficaz, é bem estruturado em métodos lógicos e é altamente legível. As escolhas de design, como o mecanismo de limpeza integrado e o 'lock striping', são elegantes e demonstram um alto nível de habilidade de engenharia.
Valor pratico
Peso 15%A implementação é de qualidade de produção e altamente prática. O modelo de concorrência escalável o torna adequado para sistemas de alto rendimento. A configuração flexível e o tratamento robusto de casos de borda significam que ele pode ser implantado com confiança em uma aplicação do mundo real.
Seguimento de instrucoes
Peso 10%A resposta segue todas as instruções perfeitamente. Ela fornece código em uma linguagem solicitada, implementa a API especificada, o algoritmo e a estratégia de concorrência, lida com todos os casos de borda, inclui os testes necessários e a análise de complexidade, e fornece uma nota de design dentro das restrições especificadas.
Pontuacao total
Comentario geral
A Resposta A é uma implementação robusta e, em grande parte, completa que corresponde diretamente à API e à semântica solicitadas. Ela combina um balde de tokens com um log de janela deslizante exato, utiliza um relógio monotônico injetável, evita um gargalo global único através de bloqueios de mapa listrados mais bloqueios por cliente e inclui testes determinísticos sólidos que cobrem os casos de borda necessários. A nota de design é concisa e aborda os trade-offs, a correção da concorrência e a complexidade. As fraquezas menores incluem uma implementação de client_count um tanto estranha e retry_after que raciocina apenas sobre a disponibilidade de 1 unidade em vez de um custo arbitrário, embora isso corresponda à API solicitada.
Ver detalhes da avaliacao ▼
Correcao
Peso 35%Implementa o híbrido necessário corretamente: a solicitação só passa se o balde de tokens e a janela deslizante exata permitirem, com recarga adequada de tokens, inicialização preguiçosa, limpeza de obsoletos e mutação serializada por cliente evitando gasto duplo. retry_after é coerente para disponibilidade de 1 unidade, e os testes cobrem invariantes importantes. Preocupações menores são pequenas peculiaridades de implementação, como a leitura de client_count do mapa sem sincronização completa e algumas escolhas de limite na evicção da janela.
Completude
Peso 20%Cobre quase todos os itens solicitados: superfície de API necessária, algoritmo híbrido, relógio monotônico injetável, suporte à configuração por cliente, casos de borda explícitos, limpeza de cliente obsoleto, tempo fracionário, discussão de concorrência, nota de complexidade e 8 testes significativos, incluindo contenção e evicção concorrentes determinísticas. Isso está muito perto de satisfazer completamente o prompt.
Qualidade do codigo
Peso 20%Bem estruturado e idiomático, com dataclasses, helpers separados, validação explícita e nomenclatura clara. A estratégia de bloqueio é organizada de forma atenciosa e a nota de design é concisa. Algumas arestas ásperas permanecem, como a implementação incomum de client_count e algum gerenciamento de bloqueio de baixo nível que poderia ser simplificado.
Valor pratico
Peso 15%Prático para uso em backend: injeção de relógio determinístico, limpeza de estado obsoleto limitada, bloqueio de lista mais bloqueios por cliente para contenção, contabilidade exata da janela e testes sólidos o tornam implantável ou adaptável. A implementação é realista e aborda casos de borda comuns de produção, como longas pausas e suspensão de VM.
Seguimento de instrucoes
Peso 10%Segue o prompt de perto: a escolha da linguagem é válida, os nomes da API correspondem, a nota de design está dentro do escopo, os testes usam o relógio injetável e os casos de borda solicitados são abordados explicitamente. A resposta se alinha bem com os requisitos técnicos e de documentação.
Pontuacao total
Comentario geral
A é um limitador de taxa híbrido polido e de qualidade de produção. Ele usa dataclasses, travas de stripe mais travas por cliente, integra a limpeza de itens obsoletos nas chamadas de permissão (evitando problemas de thread em segundo plano), valida a configuração, limita adequadamente o tempo decorrido tanto para relógios invertidos quanto para pausas longas e rejeita custo>burst sem criar estado de cliente. Os 8 testes usam o ManualClock injetável de forma determinística, incluindo um teste de contenção concorrente que realmente afirma o invariante rate*T+burst. A nota de design é concisa e aborda todos os pontos necessários.
Ver detalhes da avaliacao ▼
Correcao
Peso 35%Implementa corretamente o balde de tokens híbrido + log de janela deslizante exato. Lida com relógio invertido limitando via max(raw_now, last_seen), rejeita custo>burst sem criar estado, limita o tempo decorrido para tokens para evitar estouro, usa travas por cliente com travas de stripe para o mapa. retry_after calcula corretamente o tempo de espera com base na expiração do log da janela. O tratamento de eps e a contagem de eventos parecem sólidos.
Completude
Peso 20%Cobre todos os casos de borda necessários: custo>burst (sem criação de estado), relógio invertido (teste incluído), pausas longas (teste incluído), inicialização preguiçosa, remoção de itens obsoletos (determinística via _maybe_cleanup ligada ao relógio injetado), tokens fracionários. 8 testes incluindo teste de limite concorrente que afirma o invariante rate*T+burst deterministicamente usando ManualClock.
Qualidade do codigo
Peso 20%Bem estruturado com dataclasses, separação de MonotonicClock/ManualClock, validação de ClientConfig, travas por cliente e de stripe claramente explicadas. A limpeza é integrada ao caminho de permissão (sem complicações de thread em segundo plano). Nota de design concisa no topo.
Valor pratico
Peso 15%Qualidade de produção: travas de stripe escalam melhor, limpeza preguiçosa integrada evita ciclo de vida de thread em segundo plano, suporte a testes determinísticos, validação de configuração. Adequado para incorporação em um serviço real.
Seguimento de instrucoes
Peso 10%Segue todas as instruções: a superfície da API corresponde às assinaturas, algoritmo híbrido, travas por cliente com travas de stripe, relógio monotônico injetável, todos os casos de borda, 8 testes incluindo teste invariante concorrente determinístico, nota de design com semântica e complexidade declaradas.