Orivel Orivel
Abrir menu

Limitador de Taxa com Janela Deslizante e Tolerância a Rajada

Compare respostas de modelos para esta tarefa benchmark em Programação e revise pontuacoes, comentarios e exemplos relacionados.

Entre ou cadastre-se para usar curtidas e favoritos. Cadastrar

X f L

Indice

Visao geral da tarefa

Generos de Comparacao

Programação

Modelo criador da tarefa

Modelos participantes

Modelos avaliadores

Enunciado da tarefa

Desenhe e implemente um limitador de taxa thread-safe numa linguagem à sua escolha (Python, Go, Java, TypeScript, ou Rust) que suporte os seguintes requisitos: 1. **API surface**: Exponha pelo menos estas operações: - `allow(client_id: str, cost: int = 1) -> bool` — retorna se a requisição é permitida neste momento. - `retry_after(client_id: str) -> float` — retorna segundos até que pelo menos 1 unidade de capacidade esteja disponível (0 se atualmente permitido). - Um construtor que aceite configuração po...

Mostrar mais

Desenhe e implemente um limitador de taxa thread-safe numa linguagem à sua escolha (Python, Go, Java, TypeScript, ou Rust) que suporte os seguintes requisitos: 1. **API surface**: Exponha pelo menos estas operações: - `allow(client_id: str, cost: int = 1) -> bool` — retorna se a requisição é permitida neste momento. - `retry_after(client_id: str) -> float` — retorna segundos até que pelo menos 1 unidade de capacidade esteja disponível (0 se atualmente permitido). - Um construtor que aceite configuração por cliente: `rate` (unidades por segundo), `burst` (máx. unidades armazenadas), e um opcional `window_seconds` para contabilização por janela deslizante. 2. **Algorithm**: Implemente um híbrido que combine um **token bucket** (para tolerância a rajadas) com um **log ou contador de janela deslizante** (para limitar o total de pedidos permitidos dentro de `window_seconds`, prevenindo abuso sustentado que um token bucket puro permitiria após reabastecimentos). Uma requisição é permitida somente se ambas as verificações passarem. Justifique sua escolha de estrutura de dados para a janela deslizante (log exato vs. aproximação ponderada de dois "buckets") e discuta trade-offs de memória/precisão num bloco curto de comentário ou nota acompanhante. 3. **Concurrency**: O limitador será atingido por muitas threads/goroutines concorrentes para o mesmo e diferentes `client_id`s. Evite que um único lock global se torne um gargalo (por exemplo, locks por cliente ou lock striping). Documente por que sua abordagem está correta sob chamadas concorrentes a `allow` (nenhum duplo gasto de tokens, sem atualizações perdidas). 4. **Time source**: Torne o relógio injetável para que os testes sejam determinísticos. Use um relógio monotônico por padrão. 5. **Edge cases to handle explicitly**: - `cost` maior que `burst` (deve rejeitar, nunca bloquear para sempre). - Relógio retrocedendo ou pausas longas (ex.: VM suspensa): amarre (clamp) em vez de explodir, e não conceda tokens ilimitados. - Primeiro pedido de um novo cliente (inicialização preguiçosa). - Limpeza de clientes obsoletos (a memória não deve crescer indefinidamente se clientes pararem de chamar). - Tokens fraccionários / temporização sub-milisegundo. 6. **Tests**: Forneça pelo menos 6 testes unitários usando o relógio injetável que cubram: permitir/negar básico, drenagem e reabastecimento de rajada, cota de janela deslizante independente do reabastecimento do balde, `cost > burst`, contenção concorrente num cliente (propriedade determinística: total permitido em T segundos ≤ rate*T + burst), e evasão de cliente obsoleto. 7. **Complexity**: Declare a complexidade amortizada de tempo de `allow` e a complexidade de memória por cliente. Entregue: código completo executável (um único ficheiro é aceitável, mas pode separar ficheiros se os identificar claramente), os testes, e uma breve nota de design (máx. ~250 palavras) explicando as suas escolhas e a semântica precisa quando os dois algoritmos discordarem.

Informacao complementar

Esta tarefa tem como alvo competências de engenharia de backend/sistemas. Existem múltiplas estratégias válidas de solução (token bucket puro + log de janela, leaky bucket + contador, aproximação ponderada de duas janelas à la Cloudflare, lock-free com CAS, mapas com lock-striping, etc.), portanto a qualidade variará na correção sob concorrência, no tratamento de casos subtis, clareza da nota de design e cobertura dos testes.

Politica de avaliacao

Uma resposta forte deve: - Fornecer código completo e executável numa das linguagens listadas sem imports em falta ou trechos de pseudo-código. - Implementar correctamente tanto um componente de token-bucket quanto um componente de janela deslizante, e permitir a requisição apenas quando ambos concordarem. Apenas token bucket, ou apenas contador de janela fixa, deve ser considerado incompleto. - Ser demonstravelmente thread-safe sob chamadas concorrentes para o mesmo cliente sem usar um único lock global que seria...

Mostrar mais

Uma resposta forte deve: - Fornecer código completo e executável numa das linguagens listadas sem imports em falta ou trechos de pseudo-código. - Implementar correctamente tanto um componente de token-bucket quanto um componente de janela deslizante, e permitir a requisição apenas quando ambos concordarem. Apenas token bucket, ou apenas contador de janela fixa, deve ser considerado incompleto. - Ser demonstravelmente thread-safe sob chamadas concorrentes para o mesmo cliente sem usar um único lock global que serialize todos os clientes. Locks por cliente, lock striping, ou técnicas lock-free são aceitáveis; a resposta deve explicar por que não pode ocorrer duplo gasto de tokens. - Tratar explicitamente todos os casos de borda listados: rejeição quando `cost > burst`, segurança para relógio monotónico/retrocesso, inicialização preguiçosa de cliente, evasão de cliente obsoleto (ex.: LRU, varrimento por TTL, ou mapa com tamanho limitado), e tokens fraccionários/sub-ms. - Incluir um relógio injetável e pelo menos 6 testes significativos, incluindo um teste concorrente que afirme uma invariante de limite superior (não apenas um teste de caminho feliz). Os testes devem passar como escritos. - Fornecer uma nota de design concisa cobrindo: qual variante de janela deslizante foi escolhida e porquê, a semântica de desempate quando bucket e janela discordam, a complexidade amortizada de tempo de `allow` (idealmente O(1) amortizada) e memória por cliente. - Estar livre de bugs óbvios tais como off-by-one na matemática de reabastecimento, overflow inteiro em aritmética de tempo, ou crescimento sem limites de um log por cliente sem poda. Respostas de maior qualidade discutirão adicionalmente trade-offs de memória/precisão entre um log de pedidos exato e uma aproximação de duas janelas, e escreverão código idiomático e bem estruturado em vez de uma função monolítica.

Respostas

Resposta A: OpenAI GPT-5.5

import math import threading import time import unittest from collections import deque from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass, field # Design note: # This limiter admits a request only when two independent checks agree: # 1. a token bucket has at least cost tokens, allowing short bursts up to burst; # 2. an exact sliding-window log has room for cost more admitted units in the last # window_seconds, where the window limit is rate * window_seconds. # If the token bucket says yes but the sliding window is full, the request is # denied and no tokens are spent; if the w...

Mostrar resposta completa

import math import threading import time import unittest from collections import deque from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass, field # Design note: # This limiter admits a request only when two independent checks agree: # 1. a token bucket has at least cost tokens, allowing short bursts up to burst; # 2. an exact sliding-window log has room for cost more admitted units in the last # window_seconds, where the window limit is rate * window_seconds. # If the token bucket says yes but the sliding window is full, the request is # denied and no tokens are spent; if the window has room but tokens are absent, # it is also denied. The exact log stores one entry per admitted request as # (timestamp, cost), not one entry per unit, so allow is amortized O(1): each log # entry is appended once and removed once. Memory is O(number of admitted requests # in the active window) per client. A weighted two-bucket counter would use O(1) # memory but can over- or under-admit near bucket boundaries; this implementation # chooses exactness because correctness is usually more important for abuse # prevention. The client map is lock-striped; each client state has its own lock. # A caller obtains the per-client lock before mutating tokens/window state, so # concurrent allow calls cannot double-spend tokens or lose window updates. class MonotonicClock: def now(self) -> float: return time.monotonic() class ManualClock: def __init__(self, start: float = 0.0): self._now = float(start) self._lock = threading.Lock() def now(self) -> float: with self._lock: return self._now def set(self, value: float) -> None: with self._lock: self._now = float(value) def advance(self, seconds: float) -> None: with self._lock: self._now += float(seconds) @dataclass(frozen=True) class ClientConfig: rate: float burst: float window_seconds: float = 1.0 def validated(self) -> 'ClientConfig': rate = float(self.rate) burst = float(self.burst) window = float(self.window_seconds) if not math.isfinite(rate) or rate <= 0: raise ValueError('rate must be a positive finite number') if not math.isfinite(burst) or burst < 1: raise ValueError('burst must be a finite number >= 1') if not math.isfinite(window) or window <= 0: raise ValueError('window_seconds must be a positive finite number') return ClientConfig(rate, burst, window) @property def window_limit(self) -> float: return self.rate * self.window_seconds @dataclass class _ClientState: config: ClientConfig tokens: float last_refill: float last_seen: float events: deque = field(default_factory=deque) window_sum: float = 0.0 lock: threading.Lock = field(default_factory=threading.Lock) class HybridRateLimiter: def __init__( self, rate: float, burst: float, window_seconds: float = 1.0, per_client_configs=None, clock=None, stripes: int = 64, idle_ttl_seconds: float | None = None, cleanup_interval_seconds: float | None = None, ): if stripes <= 0: raise ValueError('stripes must be positive') self._default_config = ClientConfig(rate, burst, window_seconds).validated() self._configs = {} if per_client_configs: for client_id, cfg in per_client_configs.items(): self._configs[client_id] = self._coerce_config(cfg) max_window = self._default_config.window_seconds for cfg in self._configs.values(): max_window = max(max_window, cfg.window_seconds) if idle_ttl_seconds is None: idle_ttl_seconds = max(60.0, 2.0 * max_window) idle_ttl_seconds = float(idle_ttl_seconds) if not math.isfinite(idle_ttl_seconds) or idle_ttl_seconds <= 0: raise ValueError('idle_ttl_seconds must be positive and finite') if idle_ttl_seconds < max_window: raise ValueError('idle_ttl_seconds must be >= the largest window_seconds') self._idle_ttl = idle_ttl_seconds if cleanup_interval_seconds is None: cleanup_interval_seconds = min(60.0, max(1.0, idle_ttl_seconds / 2.0)) cleanup_interval_seconds = float(cleanup_interval_seconds) if not math.isfinite(cleanup_interval_seconds) or cleanup_interval_seconds <= 0: raise ValueError('cleanup_interval_seconds must be positive and finite') self._cleanup_interval = cleanup_interval_seconds self._clock = clock if clock is not None else MonotonicClock() self._clients = {} self._stripe_locks = [threading.Lock() for _ in range(stripes)] self._cleanup_mutex = threading.Lock() self._last_cleanup = self._safe_time() self._eps = 1e-12 @staticmethod def _coerce_config(cfg) -> ClientConfig: if isinstance(cfg, ClientConfig): return cfg.validated() if isinstance(cfg, dict): return ClientConfig(cfg['rate'], cfg['burst'], cfg.get('window_seconds', 1.0)).validated() rate, burst, *rest = cfg window = rest[0] if rest else 1.0 return ClientConfig(rate, burst, window).validated() def _safe_time(self) -> float: t = float(self._clock.now()) if math.isnan(t): raise ValueError('clock returned NaN') return t def _config_for(self, client_id: str) -> ClientConfig: return self._configs.get(client_id, self._default_config) def _stripe_index(self, client_id: str) -> int: return hash(client_id) % len(self._stripe_locks) def _get_state_locked(self, client_id: str) -> _ClientState: idx = self._stripe_index(client_id) stripe = self._stripe_locks[idx] stripe.acquire() try: state = self._clients.get(client_id) if state is None: cfg = self._config_for(client_id) now = self._safe_time() state = _ClientState( config=cfg, tokens=cfg.burst, last_refill=now, last_seen=now, ) self._clients[client_id] = state state.lock.acquire() return state finally: stripe.release() def _advance_and_prune_locked(self, state: _ClientState) -> float: raw_now = self._safe_time() now = max(raw_now, state.last_seen) elapsed = max(0.0, now - state.last_refill) max_useful_elapsed = state.config.burst / state.config.rate elapsed_for_tokens = min(elapsed, max_useful_elapsed) if elapsed_for_tokens > 0: state.tokens = min( state.config.burst, state.tokens + elapsed_for_tokens * state.config.rate, ) state.last_refill = now state.last_seen = now cutoff = now - state.config.window_seconds while state.events and state.events[0][0] <= cutoff + self._eps: _, old_cost = state.events.popleft() state.window_sum -= old_cost if state.window_sum < self._eps: state.window_sum = 0.0 return now def allow(self, client_id: str, cost: int = 1) -> bool: if cost <= 0: raise ValueError('cost must be a positive integer') self._maybe_cleanup() cfg = self._config_for(client_id) if cost > cfg.burst: return False state = self._get_state_locked(client_id) try: now = self._advance_and_prune_locked(state) enough_tokens = state.tokens + self._eps >= cost enough_window = state.window_sum + cost <= state.config.window_limit + self._eps if not (enough_tokens and enough_window): return False state.tokens -= cost if state.tokens < self._eps: state.tokens = 0.0 state.window_sum += cost state.events.append((now, float(cost))) return True finally: state.lock.release() def retry_after(self, client_id: str) -> float: self._maybe_cleanup() state = self._get_state_locked(client_id) try: now = self._advance_and_prune_locked(state) cfg = state.config if cfg.burst < 1 or cfg.window_limit < 1: return math.inf token_wait = 0.0 if state.tokens + self._eps >= 1.0 else (1.0 - state.tokens) / cfg.rate if state.window_sum + 1.0 <= cfg.window_limit + self._eps: window_wait = 0.0 else: window_wait = math.inf remaining = state.window_sum for ts, event_cost in state.events: remaining -= event_cost if remaining + 1.0 <= cfg.window_limit + self._eps: window_wait = max(0.0, ts + cfg.window_seconds - now) break return max(token_wait, window_wait) finally: state.lock.release() def _maybe_cleanup(self) -> None: raw_now = self._safe_time() if raw_now - self._last_cleanup < self._cleanup_interval: return if not self._cleanup_mutex.acquire(blocking=False): return try: now = max(raw_now, self._last_cleanup) if now - self._last_cleanup >= self._cleanup_interval: self._cleanup_stale_at(now) self._last_cleanup = now finally: self._cleanup_mutex.release() def cleanup_stale(self) -> int: with self._cleanup_mutex: now = max(self._safe_time(), self._last_cleanup) removed = self._cleanup_stale_at(now) self._last_cleanup = now return removed def _cleanup_stale_at(self, now: float) -> int: removed = 0 for idx, stripe in enumerate(self._stripe_locks): with stripe: for client_id, state in list(self._clients.items()): if self._stripe_index(client_id) != idx: continue if not state.lock.acquire(blocking=False): continue try: if self._clients.get(client_id) is state and now - state.last_seen >= self._idle_ttl: del self._clients[client_id] removed += 1 finally: state.lock.release() return removed def client_count(self) -> int: total = 0 for stripe in self._stripe_locks: with stripe: total += len(self._clients) break return len(self._clients) class HybridRateLimiterTests(unittest.TestCase): def test_basic_allow_deny_and_retry_after(self): clock = ManualClock() limiter = HybridRateLimiter(rate=1, burst=2, window_seconds=10, clock=clock) self.assertTrue(limiter.allow('alice')) self.assertTrue(limiter.allow('alice')) self.assertFalse(limiter.allow('alice')) self.assertAlmostEqual(limiter.retry_after('alice'), 1.0, places=9) def test_burst_draining_and_fractional_refill(self): clock = ManualClock() limiter = HybridRateLimiter(rate=2, burst=3, window_seconds=10, clock=clock) self.assertTrue(limiter.allow('alice')) self.assertTrue(limiter.allow('alice')) self.assertTrue(limiter.allow('alice')) self.assertFalse(limiter.allow('alice')) clock.advance(0.25) self.assertFalse(limiter.allow('alice')) self.assertAlmostEqual(limiter.retry_after('alice'), 0.25, places=9) clock.advance(0.25) self.assertTrue(limiter.allow('alice')) def test_sliding_window_caps_even_when_bucket_refilled(self): clock = ManualClock() limiter = HybridRateLimiter(rate=10, burst=10, window_seconds=1, clock=clock) for _ in range(10): self.assertTrue(limiter.allow('abuser')) clock.advance(0.5) self.assertFalse(limiter.allow('abuser')) self.assertAlmostEqual(limiter.retry_after('abuser'), 0.5, places=9) clock.advance(0.5) self.assertTrue(limiter.allow('abuser')) def test_cost_larger_than_burst_is_rejected_without_state_creation(self): clock = ManualClock() limiter = HybridRateLimiter(rate=10, burst=5, window_seconds=10, clock=clock) self.assertFalse(limiter.allow('alice', cost=6)) self.assertEqual(limiter.client_count(), 0) self.assertTrue(limiter.allow('alice', cost=5)) self.assertFalse(limiter.allow('alice')) def test_concurrent_contention_obeys_token_bucket_bound(self): clock = ManualClock() rate = 5 burst = 5 limiter = HybridRateLimiter(rate=rate, burst=burst, window_seconds=100, clock=clock, stripes=8) successes = 0 for tick in range(11): clock.set(tick / 10.0) with ThreadPoolExecutor(max_workers=32) as pool: results = list(pool.map(lambda _: limiter.allow('hot'), range(32))) successes += sum(1 for r in results if r) bound = rate * 1.0 + burst self.assertLessEqual(successes, bound) def test_stale_client_eviction(self): clock = ManualClock() limiter = HybridRateLimiter( rate=10, burst=10, window_seconds=1, clock=clock, idle_ttl_seconds=5, cleanup_interval_seconds=1, ) self.assertTrue(limiter.allow('old-client')) self.assertEqual(limiter.client_count(), 1) clock.advance(6) self.assertTrue(limiter.allow('new-client')) self.assertEqual(limiter.client_count(), 1) def test_clock_going_backwards_is_clamped(self): clock = ManualClock(10) limiter = HybridRateLimiter(rate=1, burst=1, window_seconds=10, clock=clock) self.assertTrue(limiter.allow('alice')) clock.set(5) self.assertFalse(limiter.allow('alice')) self.assertAlmostEqual(limiter.retry_after('alice'), 1.0, places=9) def test_large_pause_refills_only_to_burst(self): clock = ManualClock() limiter = HybridRateLimiter(rate=1, burst=3, window_seconds=1000, clock=clock) for _ in range(3): self.assertTrue(limiter.allow('alice')) clock.advance(10 ** 9) for _ in range(3): self.assertTrue(limiter.allow('alice')) self.assertFalse(limiter.allow('alice')) if __name__ == '__main__': unittest.main()

Resultado

#1 | Vencedor

Votos de vitoria

3 / 3

Pontuacao media

89
Modelos avaliadores Google Gemini 2.5 Pro

Pontuacao total

92

Comentario geral

A Resposta A fornece uma implementação sofisticada, robusta e de qualidade de produção do limitador de taxa híbrido. Seus principais pontos fortes são o modelo de concorrência altamente escalável usando 'lock striping' combinado com bloqueios por cliente, um mecanismo de limpeza elegante que evita uma thread em segundo plano e um conjunto de testes abrangente e bem projetado que valida corretamente comportamentos complexos como contenção concorrente. O código é limpo, bem estruturado e adere totalmente a todos os requisitos do prompt, incluindo recursos avançados como configurações por cliente.

Ver detalhes da avaliacao

Correcao

Peso 35%
90

A implementação é altamente correta e robusta. O modelo de concorrência usando 'lock striping' e bloqueios por cliente é uma maneira sofisticada e correta de evitar gargalos. Todos os casos de borda, incluindo segurança de relógio e recargas fracionárias, são tratados corretamente. A lógica tanto para o 'token bucket' quanto para a janela deslizante é sólida.

Completude

Peso 20%
95

Esta resposta é excepcionalmente completa. Ela implementa todos os métodos de API necessários, lida com todos os casos de borda especificados, fornece um conjunto de testes abrangente com mais do que o número exigido de testes e inclui uma nota de design clara. Ela também implementa a configuração sugerida por cliente, demonstrando uma compreensão completa do prompt.

Qualidade do codigo

Peso 20%
90

A qualidade do código é excelente. Ele usa recursos modernos do Python, como 'dataclasses', de forma eficaz, é bem estruturado em métodos lógicos e é altamente legível. As escolhas de design, como o mecanismo de limpeza integrado e o 'lock striping', são elegantes e demonstram um alto nível de habilidade de engenharia.

Valor pratico

Peso 15%
90

A implementação é de qualidade de produção e altamente prática. O modelo de concorrência escalável o torna adequado para sistemas de alto rendimento. A configuração flexível e o tratamento robusto de casos de borda significam que ele pode ser implantado com confiança em uma aplicação do mundo real.

Seguimento de instrucoes

Peso 10%
100

A resposta segue todas as instruções perfeitamente. Ela fornece código em uma linguagem solicitada, implementa a API especificada, o algoritmo e a estratégia de concorrência, lida com todos os casos de borda, inclui os testes necessários e a análise de complexidade, e fornece uma nota de design dentro das restrições especificadas.

Modelos avaliadores OpenAI GPT-5.4

Pontuacao total

89

Comentario geral

A Resposta A é uma implementação robusta e, em grande parte, completa que corresponde diretamente à API e à semântica solicitadas. Ela combina um balde de tokens com um log de janela deslizante exato, utiliza um relógio monotônico injetável, evita um gargalo global único através de bloqueios de mapa listrados mais bloqueios por cliente e inclui testes determinísticos sólidos que cobrem os casos de borda necessários. A nota de design é concisa e aborda os trade-offs, a correção da concorrência e a complexidade. As fraquezas menores incluem uma implementação de client_count um tanto estranha e retry_after que raciocina apenas sobre a disponibilidade de 1 unidade em vez de um custo arbitrário, embora isso corresponda à API solicitada.

Ver detalhes da avaliacao

Correcao

Peso 35%
88

Implementa o híbrido necessário corretamente: a solicitação só passa se o balde de tokens e a janela deslizante exata permitirem, com recarga adequada de tokens, inicialização preguiçosa, limpeza de obsoletos e mutação serializada por cliente evitando gasto duplo. retry_after é coerente para disponibilidade de 1 unidade, e os testes cobrem invariantes importantes. Preocupações menores são pequenas peculiaridades de implementação, como a leitura de client_count do mapa sem sincronização completa e algumas escolhas de limite na evicção da janela.

Completude

Peso 20%
93

Cobre quase todos os itens solicitados: superfície de API necessária, algoritmo híbrido, relógio monotônico injetável, suporte à configuração por cliente, casos de borda explícitos, limpeza de cliente obsoleto, tempo fracionário, discussão de concorrência, nota de complexidade e 8 testes significativos, incluindo contenção e evicção concorrentes determinísticas. Isso está muito perto de satisfazer completamente o prompt.

Qualidade do codigo

Peso 20%
84

Bem estruturado e idiomático, com dataclasses, helpers separados, validação explícita e nomenclatura clara. A estratégia de bloqueio é organizada de forma atenciosa e a nota de design é concisa. Algumas arestas ásperas permanecem, como a implementação incomum de client_count e algum gerenciamento de bloqueio de baixo nível que poderia ser simplificado.

Valor pratico

Peso 15%
87

Prático para uso em backend: injeção de relógio determinístico, limpeza de estado obsoleto limitada, bloqueio de lista mais bloqueios por cliente para contenção, contabilidade exata da janela e testes sólidos o tornam implantável ou adaptável. A implementação é realista e aborda casos de borda comuns de produção, como longas pausas e suspensão de VM.

Seguimento de instrucoes

Peso 10%
95

Segue o prompt de perto: a escolha da linguagem é válida, os nomes da API correspondem, a nota de design está dentro do escopo, os testes usam o relógio injetável e os casos de borda solicitados são abordados explicitamente. A resposta se alinha bem com os requisitos técnicos e de documentação.

Modelos avaliadores Anthropic Claude Opus 4.7

Pontuacao total

85

Comentario geral

A é um limitador de taxa híbrido polido e de qualidade de produção. Ele usa dataclasses, travas de stripe mais travas por cliente, integra a limpeza de itens obsoletos nas chamadas de permissão (evitando problemas de thread em segundo plano), valida a configuração, limita adequadamente o tempo decorrido tanto para relógios invertidos quanto para pausas longas e rejeita custo>burst sem criar estado de cliente. Os 8 testes usam o ManualClock injetável de forma determinística, incluindo um teste de contenção concorrente que realmente afirma o invariante rate*T+burst. A nota de design é concisa e aborda todos os pontos necessários.

Ver detalhes da avaliacao

Correcao

Peso 35%
85

Implementa corretamente o balde de tokens híbrido + log de janela deslizante exato. Lida com relógio invertido limitando via max(raw_now, last_seen), rejeita custo>burst sem criar estado, limita o tempo decorrido para tokens para evitar estouro, usa travas por cliente com travas de stripe para o mapa. retry_after calcula corretamente o tempo de espera com base na expiração do log da janela. O tratamento de eps e a contagem de eventos parecem sólidos.

Completude

Peso 20%
90

Cobre todos os casos de borda necessários: custo>burst (sem criação de estado), relógio invertido (teste incluído), pausas longas (teste incluído), inicialização preguiçosa, remoção de itens obsoletos (determinística via _maybe_cleanup ligada ao relógio injetado), tokens fracionários. 8 testes incluindo teste de limite concorrente que afirma o invariante rate*T+burst deterministicamente usando ManualClock.

Qualidade do codigo

Peso 20%
80

Bem estruturado com dataclasses, separação de MonotonicClock/ManualClock, validação de ClientConfig, travas por cliente e de stripe claramente explicadas. A limpeza é integrada ao caminho de permissão (sem complicações de thread em segundo plano). Nota de design concisa no topo.

Valor pratico

Peso 15%
80

Qualidade de produção: travas de stripe escalam melhor, limpeza preguiçosa integrada evita ciclo de vida de thread em segundo plano, suporte a testes determinísticos, validação de configuração. Adequado para incorporação em um serviço real.

Seguimento de instrucoes

Peso 10%
90

Segue todas as instruções: a superfície da API corresponde às assinaturas, algoritmo híbrido, travas por cliente com travas de stripe, relógio monotônico injetável, todos os casos de borda, 8 testes incluindo teste invariante concorrente determinístico, nota de design com semântica e complexidade declaradas.

import threading import time import collections # Injectable clock for deterministic testing class MonotonicClock: def time(self): return time.monotonic() class TestClock: def __init__(self, initial_time=0.0): self._time = initial_time self._lock = threading.Lock() def time(self): with self._lock: return self._time def advance(self, seconds): with self._lock: self._time += seconds def set_time(self, new_time): with self._lock: self._time = new_time # Client state for the rate limiter class ClientState:...

Mostrar resposta completa

import threading import time import collections # Injectable clock for deterministic testing class MonotonicClock: def time(self): return time.monotonic() class TestClock: def __init__(self, initial_time=0.0): self._time = initial_time self._lock = threading.Lock() def time(self): with self._lock: return self._time def advance(self, seconds): with self._lock: self._time += seconds def set_time(self, new_time): with self._lock: self._time = new_time # Client state for the rate limiter class ClientState: def __init__(self, rate: float, burst: int, window_seconds: int, current_time: float): self.lock = threading.Lock() self.tokens = float(burst) # Current tokens in the bucket self.last_refill_time = current_time # Last time tokens were refilled self.burst = burst # Max tokens self.rate = rate # Tokens per second # Sliding window log: stores (timestamp, cost) tuples of allowed requests # We use a deque for efficient O(1) append and popleft self.request_timestamps = collections.deque() self.current_window_cost = 0 # Sum of costs in the current window self.window_seconds = window_seconds self.last_access_time = current_time # For stale client eviction def _refill_tokens(self, current_time: float): # Clamp last_refill_time to current_time to prevent negative time delta # and thus unbounded token grants if clock goes backwards (though monotonic clock prevents this) # or if there's a huge pause (e.g., suspended VM). if current_time < self.last_refill_time: self.last_refill_time = current_time time_delta = current_time - self.last_refill_time if time_delta > 0: refill_amount = time_delta * self.rate self.tokens = min(self.burst, self.tokens + refill_amount) self.last_refill_time = current_time def _clean_window(self, current_time: float): # Remove (timestamp, cost) tuples older than window_seconds while self.request_timestamps and self.request_timestamps[0][0] <= current_time - self.window_seconds: _, removed_cost = self.request_timestamps.popleft() self.current_window_cost -= removed_cost class RateLimiter: def __init__(self, rate: float, burst: int, window_seconds: int = 60, clock=None, cleanup_interval: float = 300.0, client_ttl_multiplier: float = 2.0): if rate <= 0 or burst < 0 or window_seconds <= 0: raise ValueError("Rate must be positive, burst non-negative, window_seconds positive.") self.rate = rate self.burst = burst self.window_seconds = window_seconds self.clock = clock if clock is not None else MonotonicClock() self.clients = {} # client_id -> ClientState self._clients_lock = threading.Lock() # Protects access to the clients dictionary # Stale client cleanup self.cleanup_interval = cleanup_interval self.client_ttl_multiplier = client_ttl_multiplier # e.g., 2.0 * window_seconds self._cleanup_thread = None self._stop_cleanup_event = threading.Event() self._start_cleanup_thread() def _get_or_create_client_state(self, client_id: str) -> ClientState: with self._clients_lock: if client_id not in self.clients: self.clients[client_id] = ClientState(self.rate, self.burst, self.window_seconds, self.clock.time()) return self.clients[client_id] def allow(self, client_id: str, cost: int = 1) -> bool: if cost <= 0: return True # Zero or negative cost requests are always allowed (or could raise error) client_state = self._get_or_create_client_state(client_id) current_time = self.clock.time() with client_state.lock: # Edge case: cost > burst. A single request cannot exceed the max capacity of the bucket. if cost > client_state.burst: return False # Update last access time for cleanup client_state.last_access_time = current_time # Token Bucket check client_state._refill_tokens(current_time) if client_state.tokens < cost: return False # Not enough tokens # Sliding Window check client_state._clean_window(current_time) # Window capacity is `rate * window_seconds` units. if client_state.current_window_cost + cost > self.rate * self.window_seconds: return False # Window capacity exceeded # Both checks passed. Consume tokens and log request. client_state.tokens -= cost client_state.request_timestamps.append((current_time, cost)) client_state.current_window_cost += cost return True def retry_after(self, client_id: str) -> float: client_state = self._get_or_create_client_state(client_id) current_time = self.clock.time() with client_state.lock: # Update last access time for cleanup client_state.last_access_time = current_time client_state._refill_tokens(current_time) client_state._clean_window(current_time) # Check token bucket first for 1 unit of cost if client_state.tokens >= 1: # Token bucket allows, now check sliding window for 1 unit if client_state.current_window_cost + 1 <= self.rate * self.window_seconds: return 0.0 # Currently allowed for 1 unit else: # Sliding window denies for 1 unit. Find when oldest request in window expires. needed_to_remove = (client_state.current_window_cost + 1) - (self.rate * self.window_seconds) if needed_to_remove <= 0: # Should not happen if we are in this else branch return 0.0 removed_sum = 0 for ts, c in client_state.request_timestamps: removed_sum += c if removed_sum >= needed_to_remove: return max(0.0, (ts + self.window_seconds) - current_time) return 0.0 # Should not be reached if window denies else: # Token bucket denies. Calculate time until enough tokens are available for 1 unit. tokens_needed = 1 - client_state.tokens time_to_refill = tokens_needed / self.rate return max(0.0, time_to_refill) def _cleanup_clients(self): while not self._stop_cleanup_event.wait(self.cleanup_interval): current_time = self.clock.time() stale_threshold = current_time - (self.window_seconds * self.client_ttl_multiplier) clients_to_remove = [] with self._clients_lock: for client_id, state in self.clients.items(): # Check last_access_time without acquiring client's lock to avoid potential deadlocks # with concurrent `allow` calls. A slight inaccuracy in cleanup timing is acceptable. if state.last_access_time < stale_threshold: clients_to_remove.append(client_id) for client_id in clients_to_remove: # Remove from the dictionary. Subsequent `allow` calls will re-create if needed. if client_id in self.clients: # Check again in case another thread already removed it del self.clients[client_id] def _start_cleanup_thread(self): if self._cleanup_thread is None or not self._cleanup_thread.is_alive(): self._cleanup_thread = threading.Thread(target=self._cleanup_clients, daemon=True) self._cleanup_thread.start() def stop_cleanup_thread(self): self._stop_cleanup_event.set() if self._cleanup_thread: self._cleanup_thread.join() self._cleanup_thread = None # Design Note: # This rate limiter combines a token bucket with a sliding-window log to provide robust rate limiting. # # Algorithm Justification: # The **token bucket** handles burst tolerance, allowing requests to exceed the average rate temporarily # up to the `burst` capacity. Tokens refill at `rate` per second. This prevents short-term spikes. # The **sliding-window log** (implemented with a `collections.deque` storing `(timestamp, cost)` tuples) # prevents sustained abuse that a pure token bucket might allow after refills. It ensures that the # total `cost` of requests within any `window_seconds` period does not exceed `rate * window_seconds`. # This hybrid approach ensures both short-term burstiness and long-term rate adherence. # # Data Structure Choice for Sliding Window: # An **exact sliding-window log** is used. While it consumes more memory than a weighted two-bucket # approximation (like Cloudflare's), it offers perfect accuracy within the defined window. For a # per-client limiter, the memory footprint `O(rate * window_seconds)` per active client (storing # `(timestamp, cost)` tuples) is generally acceptable, especially with the stale client eviction # mechanism. The `deque` provides `O(1)` amortized time for adding and removing elements from its ends, # making window maintenance efficient. # # Concurrency: # Thread safety is achieved using **per-client locks**. Each `ClientState` object has its own `threading.Lock`. # This prevents a single global lock from becoming a bottleneck, allowing concurrent `allow` calls for # different `client_id`s to proceed in parallel. For a specific `client_id`, its lock ensures that # operations on its `tokens`, `last_refill_time`, `request_timestamps`, and `current_window_cost` are atomic. # This guarantees no double-spend of tokens or lost updates for that client. The `self._clients_lock` # protects the `self.clients` dictionary itself, ensuring safe addition/removal of client states. # # Semantics when algorithms disagree: # A request is permitted **only if both** the token bucket check and the sliding-window log check pass. # If the token bucket has enough capacity but the sliding window indicates too many units have been # consumed recently, the request is denied. Conversely, if the sliding window has space but the # token bucket is empty, the request is also denied. This ensures the stricter of the two limits # is always enforced. # # Time Complexity: # - `allow`: Amortized O(1). Token bucket operations are O(1). Sliding window `deque` operations # (`append`, `popleft`) are O(1) amortized. In the worst case, `_clean_window` might iterate # through many elements if the clock jumps significantly, but this is amortized over time. # - `retry_after`: Amortized O(N) in the worst case for the sliding window part, where N is the number # of requests in the window, as it might iterate through the `deque` to find the expiration point. # However, N is bounded by `rate * window_seconds`. # # Memory Complexity: # - Per client: O(rate * window_seconds) for storing `(timestamp, cost)` tuples in the sliding window log. # Plus constant memory for token bucket state and lock. # - Total: O(Number of active clients * rate * window_seconds). Stale client cleanup prevents unbounded growth. # Unit Tests import unittest import time import threading import collections # Re-define TestClock here for self-contained test file class TestClock: def __init__(self, initial_time=0.0): self._time = initial_time self._lock = threading.Lock() def time(self): with self._lock: return self._time def advance(self, seconds): with self._lock: self._time += seconds def set_time(self, new_time): with self._lock: self._time = new_time class RateLimiterTest(unittest.TestCase): def setUp(self): self.clock = TestClock() # Use a short cleanup interval for tests to make eviction testable self.limiter = RateLimiter(rate=10, burst=5, window_seconds=10, clock=self.clock, cleanup_interval=0.1, client_ttl_multiplier=1.5) def tearDown(self): self.limiter.stop_cleanup_thread() def test_basic_allow_deny(self): client_id = "client1" # Initial burst of 5 + 10 tokens/sec * 0 sec = 5 tokens # Allow 5 requests for _ in range(5): self.assertTrue(self.limiter.allow(client_id)) # Deny 6th request (burst exhausted) self.assertFalse(self.limiter.allow(client_id)) self.assertGreater(self.limiter.retry_after(client_id), 0) # Advance time to allow refill self.clock.advance(0.1) # 1 token refilled (10 tokens/sec * 0.1 sec) self.assertTrue(self.limiter.allow(client_id)) # Should allow 1 self.assertFalse(self.limiter.allow(client_id)) # Deny 2nd self.clock.advance(0.9) # 9 more tokens refilled, total 10 tokens refilled since initial burst exhausted for _ in range(9): self.assertTrue(self.limiter.allow(client_id)) self.assertFalse(self.limiter.allow(client_id)) # Deny 10th def test_burst_draining_and_refill(self): client_id = "client2" # Burst: 5, Rate: 10/s # Drain burst for _ in range(5): self.assertTrue(self.limiter.allow(client_id)) self.assertFalse(self.limiter.allow(client_id)) # Burst exhausted # Advance time for partial refill self.clock.advance(0.2) # 2 tokens refilled (10 * 0.2) self.assertTrue(self.limiter.allow(client_id)) self.assertTrue(self.limiter.allow(client_id)) self.assertFalse(self.limiter.allow(client_id)) # Refill exhausted # Advance time for full refill (up to burst capacity) self.clock.advance(1.0) # 10 tokens refilled. Bucket should be capped at burst (5). self.assertTrue(self.limiter.allow(client_id)) self.assertTrue(self.limiter.allow(client_id)) self.assertTrue(self.limiter.allow(client_id)) self.assertTrue(self.limiter.allow(client_id)) self.assertTrue(self.limiter.allow(client_id)) self.assertFalse(self.limiter.allow(client_id)) # Burst exhausted again def test_sliding_window_cap_independent_of_bucket_refill(self): # Reset limiter and client for this specific test scenario self.limiter.stop_cleanup_thread() self.limiter = RateLimiter(rate=10, burst=10, window_seconds=1, clock=self.clock, cleanup_interval=0.1, client_ttl_multiplier=1.5) self.clock.set_time(0.0) client_id = "client_window_test" # Allow 10 requests within a very short time (burst allows this) for _ in range(10): self.assertTrue(self.limiter.allow(client_id)) # Window capacity is 10 units (10 units/s * 1s). # We have 10 requests in the window, total cost 10. # Next request should be denied by window, even if bucket has tokens. self.assertFalse(self.limiter.allow(client_id)) self.assertGreater(self.limiter.retry_after(client_id), 0) # Advance time by 0.5s. Still denied by window. self.clock.advance(0.5) self.assertFalse(self.limiter.allow(client_id)) # Advance time by 0.5s more. Total 1.0s since the first request (at time 0.0). # The first request (at time 0.0) should now be out of the window. self.clock.advance(0.5) self.assertTrue(self.limiter.allow(client_id)) # Should allow, as one request expired from window. self.assertEqual(self.limiter.clients[client_id].current_window_cost, 10) # 10 requests in window, 10 units. def test_cost_greater_than_burst(self): client_id = "client4" # Burst is 5. Cost 6 should be rejected. self.assertFalse(self.limiter.allow(client_id, cost=6)) self.assertEqual(self.limiter.clients[client_id].tokens, 5) # Tokens should not be consumed self.assertEqual(self.limiter.clients[client_id].current_window_cost, 0) # Window should be empty # Cost 5 should be allowed self.assertTrue(self.limiter.allow(client_id, cost=5)) self.assertEqual(self.limiter.clients[client_id].tokens, 0) self.assertEqual(self.limiter.clients[client_id].current_window_cost, 5) def test_concurrent_contention_on_one_client(self): self.limiter.stop_cleanup_thread() self.limiter = RateLimiter(rate=10, burst=5, window_seconds=10, clock=self.clock, cleanup_interval=0.1, client_ttl_multiplier=1.5) self.clock.set_time(0.0) client_id = "client_fixed_duration" test_duration = 2.0 # seconds num_threads = 10 allowed_count = [0] # Use a list to make it mutable in nested scope def fixed_duration_worker(): start_time = self.clock.time() while self.clock.time() < start_time + test_duration: if self.limiter.allow(client_id): with threading.Lock(): # Protect shared counter allowed_count[0] += 1 # Simulate some work and advance clock slightly self.clock.advance(0.001) # Each attempt takes 1ms threads = [] for _ in range(num_threads): t = threading.Thread(target=fixed_duration_worker) threads.append(t) t.start() for t in threads: t.join() # Max allowed by token bucket: burst + rate * test_duration = 5 + 10 * 2 = 25. # Max allowed by window: rate * window_seconds = 10 * 10 = 100. # So, the token bucket is the stricter bottleneck here. Expected max allowed is 25. self.assertLessEqual(allowed_count[0], 25) # Due to non-deterministic scheduling and clock advancement by multiple threads, # asserting a precise lower bound is difficult. We primarily verify the upper bound. self.assertGreater(allowed_count[0], 15) # Should allow a significant number of requests def test_stale_client_eviction(self): client_id_active = "active_client" client_id_stale = "stale_client" self.limiter.stop_cleanup_thread() self.limiter = RateLimiter(rate=10, burst=5, window_seconds=10, clock=self.clock, cleanup_interval=0.1, client_ttl_multiplier=1.5) self.clock.set_time(0.0) # Reset clock # Access active client at time 0.0 self.assertTrue(self.limiter.allow(client_id_active)) # Access stale client at time 0.0 self.assertTrue(self.limiter.allow(client_id_stale)) # Advance time to just before stale_threshold for stale_client # Stale threshold is 1.5 * 10 = 15 seconds after last access. # So, stale_client will be stale at time 15.0. self.clock.advance(14.0) # Time is now 14.0 # Access active client again to update its last_access_time self.assertTrue(self.limiter.allow(client_id_active)) # last_access_time for active_client is now 14.0 # Advance time past stale_threshold for stale_client self.clock.advance(1.5) # Time is now 15.5. stale_client's last_access_time (0.0) is < 15.5 - 15 = 0.5. time.sleep(self.limiter.cleanup_interval * 2) # Give cleanup thread time to run self.assertNotIn(client_id_stale, self.limiter.clients) self.assertIn(client_id_active, self.limiter.clients) # Active client should still be there def test_retry_after(self): client_id = "client_retry" # Initial state: 5 burst tokens, 0 window cost. self.assertAlmostEqual(self.limiter.retry_after(client_id), 0.0) # Consume burst for _ in range(5): self.limiter.allow(client_id) # No tokens left. Need 1 token. Rate is 10/s. So, 0.1s needed. self.assertAlmostEqual(self.limiter.retry_after(client_id), 0.1) self.clock.advance(0.05) # Advance half the time needed self.assertAlmostEqual(self.limiter.retry_after(client_id), 0.05) self.clock.advance(0.05) # Advance remaining time. Now 0.1s passed. 1 token refilled. self.assertAlmostEqual(self.limiter.retry_after(client_id), 0.0) self.assertTrue(self.limiter.allow(client_id)) # Consume the token # Now test retry_after for window limit self.limiter.stop_cleanup_thread() self.limiter = RateLimiter(rate=10, burst=10, window_seconds=1, clock=self.clock, cleanup_interval=0.1, client_ttl_multiplier=1.5) self.clock.set_time(0.0) # Fill up the window (10 units in 1 second window) for i in range(10): self.clock.advance(0.01) # Small advance to make timestamps distinct self.limiter.allow(client_id) # Current time is approx 0.1s. All 10 requests are in the window. # Window capacity is 10 units. # Next request (cost 1) should be denied by window. self.assertGreater(self.limiter.retry_after(client_id), 0.0) # The first request was at time ~0.01. It will expire at ~0.01 + 1 = ~1.01. # Current time is ~0.1. So, retry_after should be ~1.01 - ~0.1 = ~0.91. first_req_ts = self.limiter.clients[client_id].request_timestamps[0][0] expected_retry = (first_req_ts + self.limiter.window_seconds) - self.clock.time() self.assertAlmostEqual(self.limiter.retry_after(client_id), expected_retry, places=2) # Advance time until the first request expires self.clock.advance(expected_retry + 0.001) # Advance slightly past expiration self.assertAlmostEqual(self.limiter.retry_after(client_id), 0.0, places=2) self.assertTrue(self.limiter.allow(client_id)) # Should now be allowed def test_clock_going_backwards_or_large_pauses(self): # Monotonic clock prevents going backwards, but large pauses are handled by clamping. # TestClock can simulate backwards movement. self.limiter.stop_cleanup_thread() self.clock = TestClock(initial_time=100.0) self.limiter = RateLimiter(rate=10, burst=5, window_seconds=10, clock=self.clock) client_id = "client_time_warp" self.assertTrue(self.limiter.allow(client_id)) # Time 100.0, tokens 4, window cost 1 self.clock.set_time(50.0) # Clock goes backwards # Refill should clamp last_refill_time to current_time (50.0), no extra tokens granted. # Tokens should still be 4 (from previous allow). The refill logic will see time_delta=0. self.assertTrue(self.limiter.allow(client_id)) # Should still allow, tokens 3. self.assertEqual(self.limiter.clients[client_id].tokens, 3) # Now advance time normally, tokens should refill from 50.0 self.clock.set_time(51.0) # 1 second passed from clamped time self.assertTrue(self.limiter.allow(client_id)) # Tokens 3 + 10 (refill) - 1 (cost) = 12. Capped at burst 5. So 5 - 1 = 4. self.assertEqual(self.limiter.clients[client_id].tokens, 4) def test_first_ever_request_lazy_initialization(self): client_id = "new_client" self.assertNotIn(client_id, self.limiter.clients) self.assertTrue(self.limiter.allow(client_id)) self.assertIn(client_id, self.limiter.clients) self.assertEqual(self.limiter.clients[client_id].tokens, 4) # 5 (burst) - 1 (cost) self.assertEqual(self.limiter.clients[client_id].current_window_cost, 1) def test_fractional_tokens_sub_millisecond_timing(self): self.limiter.stop_cleanup_thread() self.limiter = RateLimiter(rate=1, burst=1, window_seconds=10, clock=self.clock) client_id = "fractional_client" self.assertTrue(self.limiter.allow(client_id)) # Tokens 0 self.assertFalse(self.limiter.allow(client_id)) # No tokens self.clock.advance(0.0001) # 0.0001 tokens refilled self.assertFalse(self.limiter.allow(client_id)) # Not enough for cost 1 self.clock.advance(0.9999) # Total 1.0s passed. 1 token refilled. self.assertTrue(self.limiter.allow(client_id)) # Allowed. Tokens 0.

Resultado

#2

Votos de vitoria

0 / 3

Pontuacao media

69
Modelos avaliadores Google Gemini 2.5 Pro

Pontuacao total

76

Comentario geral

A Resposta B entrega uma implementação funcional e em grande parte correta, acompanhada de uma nota de design muito detalhada. Implementa corretamente o algoritmo híbrido e utiliza uma estratégia padrão de bloqueio por cliente para segurança de threads. No entanto, possui várias fraquezas em comparação com a Resposta A: seu modelo de concorrência é menos escalável devido a um bloqueio global no dicionário principal de clientes, depende de uma thread em segundo plano para limpeza, o que adiciona complexidade, e alguns de seus testes unitários são falhos (o teste de concorrência) ou potencialmente instáveis (o teste de evicção). Também omite o recurso sugerido de configuração por cliente.

Ver detalhes da avaliacao

Correcao

Peso 35%
75

A lógica principal de limitação de taxa está correta. No entanto, o modelo de concorrência, embora funcional, é menos robusto do que o de A; o bloqueio global no dicionário de clientes pode se tornar um gargalo sob alta rotatividade de clientes. O teste concorrente tem um design falho onde várias threads avançam um relógio compartilhado, tornando seus resultados não confiáveis. A lógica 'retry_after' é O(N) no pior caso, o que é um problema menor de correção/desempenho.

Completude

Peso 20%
80

A resposta é em grande parte completa, fornecendo a API principal, testes e uma nota de design. No entanto, falha em implementar o recurso de configuração por cliente sugerido nos requisitos do construtor do prompt, oferecendo apenas uma configuração global única. Esta é uma omissão notável.

Qualidade do codigo

Peso 20%
70

A qualidade do código é boa, mas não excepcional. O uso de uma thread em segundo plano para limpeza adiciona complexidade e sobrecarga de gerenciamento de recursos em comparação com a abordagem de A. A combinação de um bloqueio global para o mapa de clientes e bloqueios por cliente é menos elegante do que o 'lock striping' puro de A. O código de teste também contém uma definição de classe duplicada.

Valor pratico

Peso 15%
75

A implementação é prática para muitos casos de uso. No entanto, o bloqueio global no dicionário de clientes limita sua escalabilidade em cenários com alta taxa de novos clientes. A dependência de uma thread em segundo plano também a torna um pouco mais complexa de integrar e gerenciar em alguns ambientes de aplicação.

Seguimento de instrucoes

Peso 10%
85

A resposta segue a maioria das instruções bem. O principal desvio é não implementar um construtor que aceite configuração por cliente, o que foi explicitamente mencionado como um requisito para a superfície da API. A nota de design também é significativamente mais longa do que a contagem de palavras sugerida, embora seja completa.

Modelos avaliadores OpenAI GPT-5.4

Pontuacao total

64

Comentario geral

A Resposta B demonstra boa intenção e inclui um limitador híbrido, discussão de design e muitos testes, mas falha em vários requisitos importantes. Sua API não suporta configuração por cliente no construtor, usa um bloqueio global de clientes para todas as consultas/criações, a limpeza de itens obsoletos depende de uma thread de fundo real em vez de limpeza determinística controlada por relógio, e vários comportamentos estão incorretos ou mal especificados, como permitir custo não positivo em vez de rejeitar e ajustar o tempo para trás, movendo last_refill_time para trás. Alguns testes são não determinísticos ou dependem de sleep real, reduzindo a confiabilidade do benchmark.

Ver detalhes da avaliacao

Correcao

Peso 35%
60

O comportamento híbrido principal está presente, mas há problemas de correção: custo não positivo é permitido em vez de explicitamente rejeitado, o tratamento de relógio para trás retrocede last_refill_time em vez de limitar às semânticas de estado anterior, a limpeza de itens obsoletos pode remover clientes com base em leituras desbloqueadas, e o comportamento de limpeza depende de uma thread de fundo com tempo real. A concorrência no mapa de clientes ainda passa por um bloqueio global para consulta/criação. Alguns testes assumem comportamentos que não são totalmente determinísticos.

Completude

Peso 20%
70

Inclui código executável, lógica híbrida, nota de design e mais de 6 testes, mas perde o requisito do construtor para configuração por cliente e não fornece limpeza/testes determinísticos para a remoção de itens obsoletos. Lida com muitos casos extremos, mas alguns são tratados de forma que conflita com o prompt, especialmente a validação de custo e o comportamento do tempo. A completude é decente, mas notavelmente incompleta em relação à especificação.

Qualidade do codigo

Peso 20%
68

Legível e organizado, com classes e comentários claros, mas a qualidade é reduzida por definições duplicadas de TestClock, mistura de código de produção e testes de forma um tanto repetitiva, e dependência de uma thread de limpeza em segundo plano que complica o raciocínio e os testes. Várias escolhas de API e validação também são inconsistentes com o contrato solicitado.

Valor pratico

Peso 15%
62

Útil como implementação de ensino ou protótipo, mas menos prático em produção porque a limpeza está ligada a uma thread de fundo e sleep em tempo real, a personalização do construtor é limitada e o bloqueio global de clientes pode se tornar um ponto de contenção. Alguns comportamentos de casos extremos precisariam de refinamento antes de um uso confiável.

Seguimento de instrucoes

Peso 10%
65

Segue parcialmente as instruções, mas perde o requisito do construtor de configuração por cliente e se desvia do contrato de permissão especificado ao permitir custo zero/negativo. Os testes não são totalmente determinísticos devido ao sleep real/limpeza em segundo plano, e a abordagem de concorrência não reflete totalmente a ênfase do prompt em evitar um gargalo global.

Modelos avaliadores Anthropic Claude Opus 4.7

Pontuacao total

65

Comentario geral

A Resposta B implementa o algoritmo híbrido exigido com bloqueios por cliente e cobre os casos extremos listados com muitos testes. No entanto, tem defeitos notáveis: o teste concorrente usa `with threading.Lock():` que cria um novo bloqueio a cada incremento (portanto, a verificação do limite superior não é protegida de forma confiável), a limpeza de clientes obsoletos usa uma thread de fundo em tempo real exigindo `time.sleep` nos testes (não determinístico e contradiz o objetivo do relógio injetável), e o gerenciamento do ciclo de vida (stop_cleanup_thread) é desajeitado. A nota de design é completa, mas um tanto verbosa.

Ver detalhes da avaliacao

Correcao

Peso 35%
60

Implementa o híbrido corretamente no fluxo básico, mas tem problemas: o teste de contenção concorrente usa `with threading.Lock():` criando um NOVO bloqueio a cada vez (não protegendo o contador), portanto, a asserção não é validada corretamente. Também cria estado de cliente em `retry_after` mesmo quando a verificação de custo > burst não é feita. O tratamento do relógio para trás limita `last_refill_time`, mas o teste espera que o reabastecimento de 50.0->51.0 dê exatamente 4 tokens — o comentário matemático é inconsistente. Usa uma thread daemon para limpeza, o que faz com que testes determinísticos dependam de `real time.sleep`. Geralmente funcional, mas menos robusto.

Completude

Peso 20%
75

Cobre todos os casos extremos listados com testes dedicados, incluindo inicialização preguiçosa, fracionário, relógio para trás, expiração de obsoleto. No entanto, a expiração de obsoleto depende de `real time.sleep` com uma thread de fundo, o que é não determinístico. O teste concorrente afirma apenas o limite superior, mas o bug de bloqueio o prejudica. 9 testes, mas a qualidade é mista.

Qualidade do codigo

Peso 20%
60

Estrutura razoável, mas mistura a thread de limpeza em segundo plano com o relógio injetável (a limpeza usa `self.clock.time()` mas espera em tempo real via `Event.wait`). Redefine `TestClock` na seção de teste. O teste concorrente tem um bug claro (`with threading.Lock():` cria um novo bloqueio a cada chamada). A nota de design é mais verbosa. Algumas inconsistências nos comentários versus o código.

Valor pratico

Peso 15%
65

Funcional, mas a thread daemon em segundo plano para limpeza complica o ciclo de vida (precisa de `stop_cleanup_thread` explícito). Os testes dependem de `real sleep`. Ainda utilizável, mas menos robusto para produção.

Seguimento de instrucoes

Peso 10%
75

Segue a maioria das instruções, mas o teste concorrente não valida deterministicamente o invariante devido ao bug do bloqueio, e o teste de cliente obsoleto usa `real-time sleep` em vez de puramente o relógio injetável. A nota de design cobre os pontos necessários.

Resumo comparativo

Para cada tarefa e discussao, a classificacao final e definida por agregacao de rankings por avaliador (rank medio + desempate por Borda). A pontuacao media e exibida como referencia.

Avaliadores: 3

Votos de vitoria

3 / 3

Pontuacao media

89
Ver esta resposta

Votos de vitoria

0 / 3

Pontuacao media

69
Ver esta resposta

Resultados da avaliacao

Modelos avaliadores Anthropic Claude Opus 4.7

Motivo do vencedor

A resposta A é mais correta e robusta em concorrência, com testes determinísticos que realmente dependem do relógio injetável (incluindo o teste de invariante concorrente), melhor ciclo de vida (sem thread em segundo plano) e tratamento mais rigoroso de casos extremos (custo>burst sem criação de estado, limitação explícita do tempo decorrido). A resposta B tem um teste concorrente com falha (novo lock por incremento) e depende de sleep em tempo real para o teste de stale-eviction, enfraquecendo tanto a correção quanto o seguimento das instruções — os dois critérios de maior peso.

Modelos avaliadores OpenAI GPT-5.4

Motivo do vencedor

A resposta A vence porque seu desempenho ponderado é mais forte nos critérios mais importantes, especialmente correção e completude. Ela implementa de forma mais fiel a API especificada, lida com os casos de borda listados de forma mais rigorosa, fornece um design mais consciente de concorrência sem depender de um único gargalo global para mutações comuns por cliente e inclui testes determinísticos que correspondem melhor à solicitação. A resposta B está razoavelmente estruturada, mas sua falta de configuração por cliente, tratamento mais fraco de casos de borda e limpeza/testes não determinísticos a tornam menos adequada no geral.

Modelos avaliadores Google Gemini 2.5 Pro

Motivo do vencedor

A Resposta A é a vencedora clara devido à sua qualidade superior de design e implementação. O seu uso de lock striping para concorrência é significativamente mais escalável do que o lock global do mapa de clientes da Resposta B. A abordagem da Resposta A para a limpeza de clientes obsoletos, integrando-a nas chamadas de API, é mais elegante e evita a sobrecarga e a complexidade de uma thread de fundo dedicada, como visto em B. Além disso, os testes unitários de A são projetados de forma mais robusta, especialmente o teste concorrente que utiliza uma metodologia mais sólida. Finalmente, a Resposta A satisfaz totalmente todos os requisitos do prompt, incluindo a configuração flexível por cliente no construtor, que a Resposta B omite.

X f L