Orivel Orivel
Abrir menu

Implemente um cache LRU concorrente sem bloqueios

Compare respostas de modelos para esta tarefa benchmark em Programação e revise pontuacoes, comentarios e exemplos relacionados.

Entre ou cadastre-se para usar curtidas e favoritos. Cadastrar

X f L

Indice

Visao geral da tarefa

Generos de Comparacao

Programação

Modelo criador da tarefa

Modelos participantes

Modelos avaliadores

Enunciado da tarefa

Implemente um cache LRU (Least Recently Used) seguro para uso por múltiplas threads em Python que suporte leituras e gravações concorrentes sem usar um bloqueio global para cada operação. Sua implementação deve satisfazer os seguintes requisitos: 1. **Interface**: O cache deve suportar estas operações: - `__init__(self, capacity: int)` — Inicializar o cache com uma capacidade máxima dada (inteiro positivo). - `get(self, key: str) -> Optional[Any]` — Retornar o valor associado à chave se ela existir (e marcá-...

Mostrar mais

Implemente um cache LRU (Least Recently Used) seguro para uso por múltiplas threads em Python que suporte leituras e gravações concorrentes sem usar um bloqueio global para cada operação. Sua implementação deve satisfazer os seguintes requisitos: 1. **Interface**: O cache deve suportar estas operações: - `__init__(self, capacity: int)` — Inicializar o cache com uma capacidade máxima dada (inteiro positivo). - `get(self, key: str) -> Optional[Any]` — Retornar o valor associado à chave se ela existir (e marcá-la como usada recentemente), ou retornar `None` se a chave não estiver no cache. - `put(self, key: str, value: Any) -> None` — Inserir ou atualizar o par chave-valor. Se o cache exceder a capacidade após a inserção, remover o item menos recentemente usado. - `delete(self, key: str) -> bool` — Remover a chave do cache. Retornar `True` se a chave estava presente, `False` caso contrário. - `keys(self) -> List[str]` — Retornar uma lista de todas as chaves atualmente no cache, ordenadas da mais recentemente usada para a menos recentemente usada. 2. **Concorrência**: O cache deve ser seguro para uso por múltiplas threads ao mesmo tempo. Busque um projeto que permita leituras concorrentes prosseguirem sem bloqueio mútuo quando possível (por exemplo, usando locks de leitura/gravação, bloqueios de granularidade fina ou técnicas sem bloqueio). Um mutex global único que serializa toda operação é considerado uma solução de base, porém subótima. 3. **Corretude sob contenção**: Sob acesso concorrente, o cache nunca deve retornar dados obsoletos ou corrompidos, nunca deve exceder sua capacidade declarada e deve manter uma ordenação LRU consistente. 4. **Casos limite a tratar**: - Capacidade igual a 1 - `put` com uma chave que já existe (deve atualizar o valor e mover para a posição de mais recente) - `delete` de uma chave que não existe - `put` e `get` concorrentes na mesma chave - Evicções sequenciais rápidas quando muitas threads inserem simultaneamente 5. **Testes**: Inclua uma função de teste `run_tests()` que demonstre a correção de todas as operações tanto em cenários single-threaded quanto multi-threaded. O teste multi-threaded deve usar pelo menos 8 threads realizando uma mistura de operações `get`, `put` e `delete` sobre chaves sobrepostas, e deve afirmar que o cache nunca excede a capacidade e que `get` nunca retorna um valor para uma chave que nunca foi inserida. Forneça sua implementação completa em Python. Use apenas a biblioteca padrão (nenhum pacote de terceiros). Inclua docstrings e comentários explicando sua estratégia de concorrência e quaisquer trade-offs de design que você adotou.

Politica de avaliacao

Uma resposta forte deve fornecer uma implementação completa e executável em Python que atenda a todos os cinco requisitos. Dimensões chave de qualidade para avaliar: Corretude: Todas as operações (get, put, delete, keys) devem se comportar corretamente segundo a semântica LRU tanto em contexto single-threaded quanto multi-threaded. O cache não deve nunca exceder a capacidade e não deve retornar dados corrompidos. Projeto de concorrência: A solução deve ir além de um único lock global. Respostas de maior qualidade...

Mostrar mais

Uma resposta forte deve fornecer uma implementação completa e executável em Python que atenda a todos os cinco requisitos. Dimensões chave de qualidade para avaliar: Corretude: Todas as operações (get, put, delete, keys) devem se comportar corretamente segundo a semântica LRU tanto em contexto single-threaded quanto multi-threaded. O cache não deve nunca exceder a capacidade e não deve retornar dados corrompidos. Projeto de concorrência: A solução deve ir além de um único lock global. Respostas de maior qualidade usarão locks de leitura/gravação, locks particionados (sharding) ou outras estratégias de granularidade fina que permitam leituras concorrentes. A resposta deve explicar claramente a estratégia escolhida e seus trade-offs. Tratamento de casos limite: A implementação deve lidar corretamente com capacidade 1, atualização de chaves existentes, exclusão de chaves inexistentes e operações concorrentes sobre a mesma chave. Testes: A função run_tests deve incluir testes single-threaded significativos e um teste de estresse multi-threaded com pelo menos 8 threads, afirmando invariantes como limite de capacidade e integridade dos dados. Qualidade do código: O código deve ser bem estruturado, incluir docstrings e comentários explicando a abordagem de concorrência, e usar apenas a biblioteca padrão do Python. Legibilidade e manutenibilidade importam. Robustez: A implementação não deve produzir deadlocks, deve manejar exceções de forma razoável e deve funcionar corretamente sob alta contenção. Soluções parciais ou incompletas, soluções com condições de corrida ou soluções que usem apenas um lock global trivial devem receber avaliação inferior.

Respostas

Resposta: import threading import random import time from typing import Any, Optional, List class _Node: """ Representa um nó na lista duplamente ligada usada para a ordenação LRU. Usa __slots__ para eficiência de memória. """ __slots__ = 'key', 'value', 'prev', 'next' def __init__(self, key: str, value: Any): self.key = key self.value = value self.prev = None self.next = None class ReadWriteLock: """ Uma implementação personalizada de bloqueio de Leitura-Escrita. Este bloqueio permite que múltiplos leitores mantenham o bloqueio simultaneament...

Mostrar resposta completa

Resposta: import threading import random import time from typing import Any, Optional, List class _Node: """ Representa um nó na lista duplamente ligada usada para a ordenação LRU. Usa __slots__ para eficiência de memória. """ __slots__ = 'key', 'value', 'prev', 'next' def __init__(self, key: str, value: Any): self.key = key self.value = value self.prev = None self.next = None class ReadWriteLock: """ Uma implementação personalizada de bloqueio de Leitura-Escrita. Este bloqueio permite que múltiplos leitores mantenham o bloqueio simultaneamente, mas apenas um escritor por vez. Um escritor deve esperar que todos os leitores terminem, e os leitores devem esperar que um escritor termine. Esta é uma implementação com preferência para leitores, o que significa que novos leitores podem adquirir o bloqueio de leitura mesmo que um escritor esteja à espera. Esta classe também fornece gestores de contexto para tornar a aquisição e libertação de bloqueios mais limpas. """ def __init__(self): self._read_lock = threading.Lock() # Protege a contagem de _readers e a variável de condição self._write_lock = threading.Lock() # O bloqueio de escrita real self._readers = 0 # Número de leitores ativos self._no_readers = threading.Condition(self._read_lock) # Condição para escritores esperarem pelos leitores def acquire_read(self): """Adquire um bloqueio de leitura. Múltiplos leitores podem manter este bloqueio simultaneamente.""" with self._read_lock: # Espera se um escritor estiver atualmente a manter o bloqueio de escrita. # Novos leitores serão bloqueados se um escritor estiver ativo. while self._write_lock.locked(): self._no_readers.wait() self._readers += 1 def release_read(self): """Liberta um bloqueio de leitura.""" with self._read_lock: self._readers -= 1 # Se não restarem leitores, notifica quaisquer escritores à espera if self._readers == 0: self._no_readers.notify_all() def acquire_write(self): """Adquire um bloqueio de escrita. Apenas um escritor pode manter este bloqueio de cada vez.""" # Adquire primeiro o bloqueio principal de escrita para sinalizar intenção de escrever self._write_lock.acquire() with self._read_lock: # Depois adquire read_lock para esperar pelos leitores ativos # Espera até que todos os leitores ativos terminem while self._readers > 0: self._no_readers.wait() def release_write(self): """Liberta um bloqueio de escrita.""" with self._read_lock: # Adquire read_lock para notificar potenciais leitores/escritores self._no_readers.notify_all() # Notifica quaisquer leitores ou escritores à espera self._write_lock.release() # Liberta o bloqueio principal de escrita class _ReadContextManager: def __init__(self, lock): self.lock = lock def __enter__(self): self.lock.acquire_read() def __exit__(self, exc_type, exc_val, exc_tb): self.lock.release_read() class _WriteContextManager: def __init__(self, lock): self.lock = lock def __enter__(self): self.lock.acquire_write() def __exit__(self, exc_type, exc_val, exc_tb): self.lock.release_write() def read_locked(self): """Retorna um gestor de contexto para adquirir um bloqueio de leitura.""" return self._ReadContextManager(self) def write_locked(self): """Retorna um gestor de contexto para adquirir um bloqueio de escrita.""" return self._WriteContextManager(self) class LRUCache: """ Uma implementação de cache LRU (Least Recently Used) segura para threads. Este cache usa um ReadWriteLock para gerir o acesso concorrente. O design procura permitir leituras concorrentes para falhas de cache e para a operação `keys()`. No entanto, operações que modificam o estado do cache ou a ordem LRU (`put`, `delete`, e acertos de cache em `get`) adquirem um bloqueio de escrita, garantindo atomicidade e consistência para estas secções críticas. Esta abordagem oferece melhor concorrência do que um único mutex global para todas as operações, especialmente para cargas de trabalho com muitas leituras com falhas de cache frequentes ou chamadas a `keys()`. A operação `get` emprega um padrão de "upgrade": primeiro tenta um bloqueio de leitura para verificar a existência da chave. Se encontrada, liberta o bloqueio de leitura e adquire um bloqueio de escrita para atualizar a ordem LRU e obter o valor mais recente. Este padrão é cuidadosamente implementado para lidar com condições de corrida que possam ocorrer durante o upgrade do bloqueio (por exemplo, a chave ser apagada ou atualizada por outra thread). """ def __init__(self, capacity: int): if capacity <= 0: raise ValueError("A capacidade deve ser um inteiro positivo.") self._capacity = capacity self._cache = {} # Armazena key -> _Node para pesquisa O(1) self._size = 0 # Nós fictícios de cabeça e cauda para a lista duplamente ligada. # Isto simplifica casos extremos para adicionar/remover nós, já que os nós de dados reais # estão sempre entre a cabeça e a cauda. self._head = _Node("dummy_head", None) self._tail = _Node("dummy_tail", None) self._head.next = self._tail self._tail.prev = self._head # Controlo de concorrência: um único ReadWriteLock protege todo o estado do cache. self._rw_lock = ReadWriteLock() def _add_node(self, node: _Node): """Adiciona um nó logo após a cabeça (posição mais recentemente usada). Assume que o bloqueio de escrita está ativo.""" node.prev = self._head node.next = self._head.next self._head.next.prev = node self._head.next = node def _remove_node(self, node: _Node): """Remove um nó da lista duplamente ligada. Assume que o bloqueio de escrita está ativo.""" prev_node = node.prev next_node = node.next prev_node.next = next_node next_node.prev = prev_node def _move_to_front(self, node: _Node): """Move um nó existente para a frente da lista (mais recentemente usado). Assume que o bloqueio de escrita está ativo.""" self._remove_node(node) self._add_node(node) def _pop_tail(self) -> _Node: """Remove e retorna o nó menos recentemente usado (o que está imediatamente antes da cauda). Assume que o bloqueio de escrita está ativo.""" node = self._tail.prev self._remove_node(node) return node def get(self, key: str) -> Optional[Any]: """ Retorna o valor associado à chave se ela existir, e marca-a como recentemente usada. Retorna None se a chave não estiver no cache. Esta operação primeiro adquire um bloqueio de leitura para verificar a existência da chave. Se a chave for encontrada, liberta o bloqueio de leitura e depois adquire um bloqueio de escrita para atualizar a ordem LRU e obter o valor mais recente. Este padrão de "upgrade" permite falhas de `get` e operações `keys()` concorrentes, mas serializa acertos de `get` com outras escritas. """ node_found_under_read_lock = None with self._rw_lock.read_locked(): node_found_under_read_lock = self._cache.get(key) if not node_found_under_read_lock: return None # Se a chave foi encontrada, adquire um bloqueio de escrita para atualizar a ordem LRU e obter o valor mais recente. # Este é um padrão de "upgrade". Pode ocorrer uma condição de corrida aqui: outra thread # pode apagar ou atualizar a chave entre a libertação do bloqueio de leitura e a aquisição do bloqueio de escrita. with self._rw_lock.write_locked(): # Verifica novamente se a chave ainda existe e se se refere exatamente ao mesmo objeto nó. # Isto é crucial para a correção sob contenção. Se outra thread tiver executado # um `put` para a mesma chave, `_cache.get(key)` retornaria um *novo* nó. # Se outra thread tiver executado um `delete`, retornaria `None`. current_node = self._cache.get(key) if current_node is node_found_under_read_lock: # Garante que é exatamente o mesmo objeto nó self._move_to_front(node_found_under_read_lock) value = current_node.value # Obtém o valor mais recente depois de mover return value else: # O nó foi apagado ou substituído por outra operação `put` # entre a nossa aquisição do bloqueio de leitura e a do bloqueio de escrita. Trata como falha. return None def put(self, key: str, value: Any) -> None: """ Insere ou atualiza o par chave-valor. Se o cache exceder a capacidade após a inserção, remove o item menos recentemente usado. Esta é uma operação de escrita e adquire um bloqueio de escrita. """ with self._rw_lock.write_locked(): node = self._cache.get(key) if node: # A chave existe: atualiza o valor e move para a frente node.value = value self._move_to_front(node) else: # A chave não existe: adiciona um novo nó new_node = _Node(key, value) self._cache[key] = new_node self._add_node(new_node) self._size += 1 # Remove se a capacidade for excedida if self._size > self._capacity: lru_node = self._pop_tail() del self._cache[lru_node.key] self._size -= 1 def delete(self, key: str) -> bool: """ Remove a chave do cache. Retorna True se a chave estava presente, False caso contrário. Esta é uma operação de escrita e adquire um bloqueio de escrita. """ with self._rw_lock.write_locked(): node = self._cache.get(key) if node: self._remove_node(node) del self._cache[key] self._size -= 1 return True return False def keys(self) -> List[str]: """ Retorna uma lista de todas as chaves atualmente no cache, ordenadas da mais recentemente usada para a menos recentemente usada. Esta é uma operação de leitura e adquire um bloqueio de leitura. """ with self._rw_lock.read_locked(): keys_list = [] current = self._head.next while current is not self._tail: keys_list.append(current.key) current = current.next return keys_list def run_tests(): """ Executa uma série de testes de thread única e multithread para verificar a implementação de LRUCache. """ print("--- A iniciar testes da cache LRU ---") # --- Testes de thread única --- print("\n--- Testes de thread única ---") cache = LRUCache(capacity=3) assert cache.get("a") is None, "ST: Obter chave inexistente deve retornar None" assert cache.keys() == [], "ST: Cache vazio deve ter lista de chaves vazia" cache.put("a", 1) assert cache.get("a") == 1, "ST: Obter chave existente após put" assert cache.keys() == ["a"], "ST: Lista de chaves após um put" cache.put("b", 2) cache.put("c", 3) assert cache.keys() == ["c", "b", "a"], "ST: Lista de chaves após vários puts" assert cache._size == 3, "ST: O tamanho do cache deve ser 3" # Testar remoção por capacidade cache.put("d", 4) # 'a' deve ser removida por ser a LRU assert cache.get("a") is None, "ST: A chave removida 'a' deve ser None" assert cache.get("d") == 4, "ST: A nova chave 'd' deve estar presente" assert cache.keys() == ["d", "c", "b"], "ST: Lista de chaves após remoção" assert cache._size == 3, "ST: O tamanho do cache deve continuar a ser 3 após remoção" # Testar put com chave existente (atualizar valor e mover para a frente) cache.put("b", 22) # 'b' deve mover-se para a frente, valor atualizado assert cache.get("b") == 22, "ST: Valor atualizado para 'b'" assert cache.keys() == ["b", "d", "c"], "ST: Lista de chaves após atualizar e mover 'b'" # Testar delete assert cache.delete("c") is True, "ST: Apagar chave existente 'c'" assert cache.get("c") is None, "ST: A chave apagada 'c' deve ser None" assert cache.keys() == ["b", "d"], "ST: Lista de chaves após apagar 'c'" assert cache._size == 2, "ST: Tamanho do cache após delete" assert cache.delete("z") is False, "ST: Apagar chave inexistente 'z'" assert cache.keys() == ["b", "d"], "ST: Lista de chaves inalterada após apagar chave inexistente" # Testar capacidade 1 cache_cap1 = LRUCache(capacity=1) cache_cap1.put("x", 10) assert cache_cap1.get("x") == 10, "ST: Capacidade 1, obter 'x'" cache_cap1.put("y", 20) # 'x' deve ser removida assert cache_cap1.get("x") is None, "ST: Capacidade 1, 'x' removida" assert cache_cap1.get("y") == 20, "ST: Capacidade 1, obter 'y'" assert cache_cap1.keys() == ["y"], "ST: Capacidade 1, lista de chaves" print("Testes de thread única aprovados!") # --- Testes multithread --- print("\n--- Testes multithread ---") MT_CAPACITY = 10 MT_NUM_THREADS = 8 MT_OPS_PER_THREAD = 2000 MT_KEY_RANGE = 20 # Chaves de 0 a 19 (por ex., "0", "1", ..., "19") mt_cache = LRUCache(capacity=MT_CAPACITY) # Barreira para garantir que todas as threads de trabalho iniciam as suas operações simultaneamente barrier = threading.Barrier(MT_NUM_THREADS + 1) # +1 para a thread principal error_flag = threading.Event() # Definida se alguma thread encontrar um erro # Rastreia chaves que *alguma vez* foram inseridas com sucesso no cache por qualquer thread. # Usado para verificar que `get` nunca retorna um valor para uma chave que realmente nunca foi inserida. global_ever_inserted_keys = set() global_ever_inserted_keys_lock = threading.Lock() def mt_worker(cache_instance, thread_id, barrier_instance, error_event, ever_inserted_keys_set, ever_inserted_keys_lock): """Função worker para testes multithread.""" barrier_instance.wait() # Espera que todas as threads estejam prontas para iniciar operações for i in range(MT_OPS_PER_THREAD): if error_event.is_set(): break # Para se outra thread já encontrou um erro # Escolhe aleatoriamente uma operação com viés para 'get' e 'put' op = random.choices(['get', 'put', 'delete'], weights=[0.5, 0.4, 0.1], k=1)[0] key = str(random.randint(0, MT_KEY_RANGE - 1)) value = f"value_T{thread_id}_K{key}_I{i}" if op == 'put': cache_instance.put(key, value) with ever_inserted_keys_lock: ever_inserted_keys_set.add(key) elif op == 'get': val = cache_instance.get(key) if val is not None: # Afirma que, se um valor for retornado, a chave deve ter sido inserida em algum momento. with ever_inserted_keys_lock: if key not in ever_inserted_keys_set: print(f"MT Error: Thread {thread_id} obteve o valor '{val}' para a chave '{key}' " f"que nunca foi inserida globalmente. Isto não deveria acontecer.") error_event.set() break # Afirma que o valor não está corrompido (verificação simples de formato) if not isinstance(val, str) or not val.startswith("value_T"): print(f"MT Error: Thread {thread_id} obteve um valor corrompido '{val}' para a chave '{key}'.") error_event.set() break elif op == 'delete': cache_instance.delete(key) # Opcional: introduzir um pequeno sleep aleatório para aumentar a troca de contexto e a contenção # time.sleep(random.uniform(0.00001, 0.0001)) # print(f"Thread {thread_id} terminou.") # Descomente para saída detalhada threads = [] for i in range(MT_NUM_THREADS): thread = threading.Thread(target=mt_worker, args=(mt_cache, i, barrier, error_flag, global_ever_inserted_keys, global_ever_inserted_keys_lock)) threads.append(thread) thread.start() # Espera que todas as threads de trabalho cheguem à barreira e iniciem as operações barrier.wait() print(f"Todas as {MT_NUM_THREADS} threads iniciaram as operações.") # Espera que todas as threads de trabalho completem as suas operações for thread in threads: thread.join() print("Todas as threads terminaram.") # Afirmações finais para testes multithread assert not error_flag.is_set(), "MT: Erros detetados durante operações multithread. Verifique a consola para detalhes." assert mt_cache._size <= MT_CAPACITY, f"MT: O tamanho do cache ({mt_cache._size}) excedeu a capacidade ({MT_CAPACITY})." # Verifica que todas as chaves restantes no cache foram inseridas em algum momento final_keys = mt_cache.keys() with global_ever_inserted_keys_lock: for key in final_keys: assert key in global_ever_inserted_keys, f"MT Error: O cache final contém a chave '{key}' que nunca foi inserida globalmente." print("Testes multithread aprovados!") print("\n--- Todos os testes da cache LRU foram aprovados! ---") if __name__ == '__main__': run_tests()

Resultado

#2

Votos de vitoria

0 / 3

Pontuacao media

68
Modelos avaliadores Google Gemini 2.5 Pro

Pontuacao total

82

Comentario geral

A Resposta A fornece uma implementação correta e bem estruturada de um cache LRU concorrente. Utiliza um ReadWriteLock personalizado e um padrão inteligente de "atualização de bloqueio" no método `get` para lidar com condições de corrida. O código é limpo, bem comentado e inclui um conjunto de testes abrangente. No entanto, sua principal fraqueza reside em seu modelo de concorrência. Como um acerto em `get` requer a aquisição de um bloqueio de escrita completo para atualizar a ordem LRU, o desempenho do cache é severamente gargalado sob cargas de trabalho com muitas leituras e um cache aquecido, tornando sua vantagem prática sobre um bloqueio global simples marginal.

Ver detalhes da avaliacao

Correcao

Peso 35%
80

A implementação está correta e o tratamento da condição de corrida no padrão de atualização de bloqueio do método `get` é sólido. No entanto, a escolha de um bloqueio que prioriza leitores pode levar à inanição de escritores, e o modelo de concorrência geral tem problemas de desempenho inerentes que beiram uma falha de design para este problema específico.

Completude

Peso 20%
90

A resposta está totalmente completa. Implementa todos os métodos necessários da interface e fornece uma função `run_tests` com casos de teste sólidos de thread única e multi-thread que atendem aos requisitos do prompt.

Qualidade do codigo

Peso 20%
85

A qualidade do código é muito boa. O código é bem estruturado, legível e inclui docstrings e comentários claros explicando sua estratégia de concorrência. O uso de `__slots__` na classe `_Node` é uma otimização interessante.

Valor pratico

Peso 15%
60

O valor prático é limitado pelo modelo de concorrência. Como os acertos em `get` (a operação mais comum em um cache aquecido) exigem um bloqueio de escrita completo, a implementação serializa a maioria das operações e oferece pouco benefício de desempenho em comparação com um bloqueio global simples em muitos cenários do mundo real com muitas leituras.

Seguimento de instrucoes

Peso 10%
100

A resposta segue perfeitamente todas as instruções do prompt, incluindo a interface, os requisitos de concorrência, os casos de ponta, as especificações de teste e a restrição de biblioteca padrão.

Modelos avaliadores OpenAI GPT-5.2

Pontuacao total

62

Comentario geral

Fornece um cache LRU executável com um RW lock personalizado e uma lista duplamente ligada. O comportamento básico single-threaded e o manuseio de capacidade parecem corretos. No entanto, a estratégia de concorrência é efetivamente um bloqueio de escrita global para todos os hits, puts e deletes, e a implementação do RWLock é falha: leitores verificam `write_lock.locked()` sem o possuir, e escritores podem adquirir o bloqueio de escrita enquanto novos leitores ainda são permitidos, violando potencialmente a semântica RW. O caminho de "upgrade" do `get` pode retornar None incorretamente mesmo que a chave exista (nó substituído) e pode perder valores sob contenção; embora argumentavelmente evitando dados obsoletos, sacrifica as expectativas de correção para `get` sob put concorrente na mesma chave. Os testes são decentes, mas dependem do `_size` interno e não validam a integridade da lista/mapa sob contenção.

Ver detalhes da avaliacao

Correcao

Peso 35%
55

A lógica LRU está em grande parte correta em uso single-threaded, mas a implementação do RWLock não é um bloqueio de leitores-escritores sólido (a sinalização do leitor baseada em `write_lock.locked()`/uso de condição é propensa a race conditions). Sob contenção, `get` pode retornar None para uma chave existente se ela foi atualizada (nó substituído) entre os bloqueios, o que é uma degradação semântica evitável e pode violar o comportamento esperado para put/get concorrente na mesma chave.

Completude

Peso 20%
70

Implementa todos os métodos necessários e fornece `run_tests` com cenários single- e multi-threaded usando 8 threads e verificações de invariantes (capacidade, chave nunca inserida). No entanto, carece de verificações mais profundas de integridade estrutural (consistência lista/mapa) e usa campos internos em testes.

Qualidade do codigo

Peso 20%
65

Estrutura e comentários legíveis, mas a implementação do RWLock é complexa e argumentavelmente incorreta; usa campos privados em testes e expõe `_size` que não é estritamente necessário. Algumas explicações (preferência por leitores vs comportamento real) são inconsistentes com o código.

Valor pratico

Peso 15%
60

Funciona como um cache concorrente de base, mas o desempenho sob contenção é limitado porque todos os hits exigem bloqueio de escrita e as preocupações com o RWLock reduzem a confiança para uso em produção.

Seguimento de instrucoes

Peso 10%
70

Atende aos requisitos da interface e inclui docstrings/comentários e um teste misto de 8 threads. No entanto, o prompt pede para evitar um bloqueio global para cada operação; na prática, a maioria das operações significativas serializa através do bloqueio de escrita, e os benefícios de concorrência são limitados.

Modelos avaliadores Anthropic Claude Opus 4.6

Pontuacao total

60

Comentario geral

A Resposta A fornece uma implementação completa e executável com um ReadWriteLock personalizado e cache LRU usando uma lista duplamente ligada com nós sentinela. A estratégia de concorrência usa um único RWLock para todo o cache, o que é melhor do que um mutex global, mas ainda limitado. O método get() usa um padrão de atualização de leitura para gravação, que tem um problema sutil: entre liberar o bloqueio de leitura e adquirir o bloqueio de gravação, o nó pode ser excluído e a chave reinserida com um nó diferente, fazendo com que o get() retorne None, embora a chave exista (uma preocupação de correção). A implementação do ReadWriteLock tem um problema potencial: ela verifica `self._write_lock.locked()` dentro de acquire_read, mas este é um bloqueio que prioriza leitores e pode causar inanição de escritores. Os testes são razoáveis, mas poderiam ser mais completos. O código é bem documentado com docstrings claras explicando a estratégia de concorrência.

Ver detalhes da avaliacao

Correcao

Peso 35%
55

O get() da Resposta A tem um problema de correção: o padrão de atualização de bloqueio de leitura para gravação significa que, entre liberar o bloqueio de leitura e adquirir o bloqueio de gravação, outra thread pode excluir e reinserir a mesma chave com um novo nó. A verificação de identidade (current_node é node_found_under_read_lock) falharia então, fazendo com que get() retornasse None, embora a chave exista com um valor válido. O acquire_read do ReadWriteLock verifica _write_lock.locked(), o que não é totalmente confiável. O design que prioriza leitores pode causar inanição de escritores.

Completude

Peso 20%
65

A Resposta A implementa todas as operações necessárias (get, put, delete, keys) com semântica LRU adequada. Lida com capacidade de 1, atualizando chaves existentes e excluindo chaves inexistentes. Os testes cobrem cenários single-threaded e multi-threaded com 8 threads. No entanto, faltam utilitários de validação interna e o teste multi-threaded poderia ser mais completo.

Qualidade do codigo

Peso 20%
65

A Resposta A tem boa documentação com docstrings claras explicando a estratégia de concorrência e os trade-offs. O código é bem estruturado com clara separação de responsabilidades. O ReadWriteLock possui gerenciadores de contexto para uso limpo. Usa __slots__ em _Node para eficiência de memória. No entanto, o rastreamento de _size é manual e pode desviar se houver bugs, e a implementação do ReadWriteLock tem algumas escolhas de design questionáveis (verificação do estado locked()).

Valor pratico

Peso 15%
55

A Resposta A fornece um cache LRU funcional, mas a condição de corrida do padrão de atualização do get() reduz a confiabilidade prática. O RWLock que prioriza leitores pode causar inanição de escritores em produção. O benefício da concorrência é limitado, pois os acertos de get() ainda exigem um bloqueio de gravação, e o padrão de atualização adiciona sobrecarga. Precisaria de correções antes do uso em produção.

Seguimento de instrucoes

Peso 10%
65

A Resposta A segue a maioria das instruções: implementa todas as operações necessárias, usa uma estratégia de concorrência além de um único bloqueio global, inclui docstrings e comentários, lida com casos extremos e fornece run_tests() com testes single-threaded e multi-threaded usando 8 threads. No entanto, as asserções do teste multi-threaded poderiam ser mais fortes, e a explicação do design de concorrência poderia abordar melhor os trade-offs.

Resposta B: OpenAI GPT-5.4

from __future__ import annotations import random import threading import time from dataclasses import dataclass, field from typing import Any, Dict, List, Optional class RWLock: """ Um bloqueio leitores-escritor simples com preferência para escritores. Estratégia de concorrência: - Vários leitores podem manter o bloqueio simultaneamente. - Escritores são exclusivos. - Assim que um escritor estiver esperando, novos leitores são bloqueados para que os escritores não sofram starvation. Isto é usado para proteger o dicionário que mapeia chaves para nós. Leituras que só precisam de...

Mostrar resposta completa

from __future__ import annotations import random import threading import time from dataclasses import dataclass, field from typing import Any, Dict, List, Optional class RWLock: """ Um bloqueio leitores-escritor simples com preferência para escritores. Estratégia de concorrência: - Vários leitores podem manter o bloqueio simultaneamente. - Escritores são exclusivos. - Assim que um escritor estiver esperando, novos leitores são bloqueados para que os escritores não sofram starvation. Isto é usado para proteger o dicionário que mapeia chaves para nós. Leituras que só precisam de verificação de pertencimento/consulta de valor podem prosseguir concorrentemente. Operações que modificam o índice ou a lista LRU usam o bloqueio de escrita. Observação: O Python não fornece primitivas realmente lock-free para construir uma LRU totalmente lock-free na biblioteca padrão. Em vez disso, esta implementação evita um único mutex global para todo acesso usando: - um bloqueio leitores-escritor para o índice chave->nó - um bloqueio separado para a lista LRU duplamente encadeada - bloqueios por nó para estado local ao nó Isso oferece concorrência substancialmente melhor do que um único bloqueio global enquanto preserva a correção sob contenção. """ def __init__(self) -> None: self._cond = threading.Condition(threading.Lock()) self._readers = 0 self._writer = False self._writers_waiting = 0 def acquire_read(self) -> None: with self._cond: while self._writer or self._writers_waiting > 0: self._cond.wait() self._readers += 1 def release_read(self) -> None: with self._cond: self._readers -= 1 if self._readers == 0: self._cond.notify_all() def acquire_write(self) -> None: with self._cond: self._writers_waiting += 1 try: while self._writer or self._readers > 0: self._cond.wait() self._writer = True finally: self._writers_waiting -= 1 def release_write(self) -> None: with self._cond: self._writer = False self._cond.notify_all() @dataclass class _Node: key: str value: Any prev: Optional["_Node"] = None next: Optional["_Node"] = None deleted: bool = False lock: threading.RLock = field(default_factory=threading.RLock) class ConcurrentLRUCache: """ Cache LRU thread-safe com sincronização de granularidade fina. Visão geral do design --------------------- Estruturas de dados: - self._map: dict[str, _Node] Mapeia chaves para nós. Protegido por self._index_lock (RWLock). - Lista duplamente encadeada com sentinelas self._head (lado MRU) e self._tail (lado LRU) Protegida por self._list_lock. Regras de bloqueio ------------------ 1. Leituras do dicionário/índice usam self._index_lock.acquire_read(). 2. Mutações do dicionário/índice usam self._index_lock.acquire_write(). 3. Quaisquer mudanças de ponteiros na lista encadeada usam self._list_lock. 4. Bloqueios por nó serializam o acesso ao valor/estado deleted de um nó. Regra de ordenação para evitar deadlock --------------------------------------- Quando vários bloqueios são necessários, as operações seguem uma ordem estável: bloqueio do índice -> bloqueio da lista -> bloqueio(s) do nó Trade-offs de concorrência -------------------------- - get() precisa atualizar a recência, então não pode ser uma leitura pura. Primeiro ele faz uma consulta concorrente ao índice sob um bloqueio de leitura, depois adquire brevemente o bloqueio da lista para mover o nó localizado para a posição MRU. - put(), delete() e eviction modificam tanto o índice quanto a lista, então adquirem o bloqueio de escrita do índice e o bloqueio da lista. - Isto não é estritamente lock-free; a biblioteca padrão do Python não torna uma LRU prática baseada em lista encadeada lock-free viável. Ainda assim, é mais concorrente do que um design que usa um mutex global para cada operação. """ def __init__(self, capacity: int): """Inicializa o cache com uma capacidade máxima positiva.""" if not isinstance(capacity, int) or capacity <= 0: raise ValueError("capacity must be a positive integer") self._capacity = capacity self._map: Dict[str, _Node] = {} self._index_lock = RWLock() self._list_lock = threading.RLock() self._head = _Node("__HEAD__", None) self._tail = _Node("__TAIL__", None) self._head.next = self._tail self._tail.prev = self._head def _insert_after_head_unlocked(self, node: _Node) -> None: """Insere o nó imediatamente após head. O chamador deve manter self._list_lock.""" first = self._head.next node.prev = self._head node.next = first self._head.next = node if first is not None: first.prev = node def _detach_unlocked(self, node: _Node) -> None: """Desanexa o nó da lista. O chamador deve manter self._list_lock.""" prev_node = node.prev next_node = node.next if prev_node is not None: prev_node.next = next_node if next_node is not None: next_node.prev = prev_node node.prev = None node.next = None def _move_to_front_if_present(self, node: _Node) -> None: """ Move um nó existente para a posição MRU se ele ainda estiver ativo. Usa o bloqueio da lista mais o bloqueio do nó. O bloqueio de leitura do mapa mantido pelo chamador garante que o objeto nó permaneça alcançável a partir do índice durante a consulta. """ with self._list_lock: with node.lock: if node.deleted: return if node.prev is self._head: return if node.prev is not None and node.next is not None: self._detach_unlocked(node) self._insert_after_head_unlocked(node) def get(self, key: str) -> Optional[Any]: """ Retorna o valor de key se presente e o marca como o mais recentemente usado. Retorna None se ausente. """ self._index_lock.acquire_read() try: node = self._map.get(key) if node is None: return None with node.lock: if node.deleted: return None value = node.value self._move_to_front_if_present(node) return value finally: self._index_lock.release_read() def put(self, key: str, value: Any) -> None: """ Insere ou atualiza um par chave-valor. Se a capacidade for excedida, remove o item menos recentemente usado. """ self._index_lock.acquire_write() try: with self._list_lock: existing = self._map.get(key) if existing is not None: with existing.lock: if not existing.deleted: existing.value = value if existing.prev is not self._head: if existing.prev is not None and existing.next is not None: self._detach_unlocked(existing) self._insert_after_head_unlocked(existing) return else: self._map.pop(key, None) node = _Node(key, value) self._map[key] = node self._insert_after_head_unlocked(node) while len(self._map) > self._capacity: victim = self._tail.prev if victim is None or victim is self._head: break with victim.lock: if victim.deleted: if victim.prev is not None and victim.next is not None: self._detach_unlocked(victim) self._map.pop(victim.key, None) continue victim.deleted = True self._detach_unlocked(victim) self._map.pop(victim.key, None) finally: self._index_lock.release_write() def delete(self, key: str) -> bool: """ Remove key do cache. Retorna True se key existia; caso contrário, False. """ self._index_lock.acquire_write() try: with self._list_lock: node = self._map.pop(key, None) if node is None: return False with node.lock: if node.deleted: return False node.deleted = True if node.prev is not None and node.next is not None: self._detach_unlocked(node) return True finally: self._index_lock.release_write() def keys(self) -> List[str]: """ Retorna as chaves da mais recentemente usada para a menos recentemente usada. Adquirimos o bloqueio de leitura do índice e o bloqueio da lista para obter um snapshot consistente. """ result: List[str] = [] self._index_lock.acquire_read() try: with self._list_lock: cur = self._head.next while cur is not None and cur is not self._tail: with cur.lock: if not cur.deleted: result.append(cur.key) cur = cur.next return result finally: self._index_lock.release_read() def _debug_size(self) -> int: """Helper interno para testes.""" self._index_lock.acquire_read() try: return len(self._map) finally: self._index_lock.release_read() def _debug_validate(self) -> None: """ Verificador de consistência interna para testes. Verifica: - os links da lista estão bem formados - todo nó ativo da lista está presente no mapa - não há chaves duplicadas na lista - o tamanho nunca excede a capacidade """ self._index_lock.acquire_read() try: with self._list_lock: assert len(self._map) <= self._capacity, "cache exceeded capacity" seen = set() cur = self._head assert self._head.prev is None assert self._tail.next is None while cur is not None: nxt = cur.next if nxt is not None: assert nxt.prev is cur, "broken backward pointer" if cur not in (self._head, self._tail): with cur.lock: assert not cur.deleted, "deleted node remained in list" assert cur.key in self._map, "live list node missing from map" assert self._map[cur.key] is cur, "map points to wrong node" assert cur.key not in seen, "duplicate key in list" seen.add(cur.key) if cur is self._tail: break cur = nxt assert seen == set(self._map.keys()), "map/list key mismatch" finally: self._index_lock.release_read() def run_tests() -> None: """ Executa testes single-threaded e multi-threaded. Requisitos cobertos pelos testes multi-threaded: - pelo menos 8 threads - mistura de get/put/delete em chaves sobrepostas - garantir que o cache nunca exceda a capacidade - garantir que get nunca retorne um valor para uma chave que nunca foi inserida """ # Testes single-threaded c = ConcurrentLRUCache(2) assert c.get("missing") is None c.put("a", 1) c.put("b", 2) assert c.keys() == ["b", "a"] assert c.get("a") == 1 assert c.keys() == ["a", "b"] c.put("c", 3) assert c.get("b") is None assert c.keys() == ["c", "a"] c.put("a", 10) assert c.get("a") == 10 assert c.keys()[0] == "a" assert c.delete("a") is True assert c.delete("a") is False assert c.get("a") is None assert c.keys() == ["c"] c1 = ConcurrentLRUCache(1) c1.put("x", 1) assert c1.keys() == ["x"] c1.put("y", 2) assert c1.get("x") is None assert c1.get("y") == 2 assert c1.keys() == ["y"] c1.put("y", 3) assert c1.get("y") == 3 assert c1.delete("z") is False c1._debug_validate() # Testes concorrentes capacity = 16 cache = ConcurrentLRUCache(capacity) inserted_ever = set() inserted_lock = threading.Lock() errors: List[str] = [] errors_lock = threading.Lock() stop_flag = threading.Event() key_space = [f"k{i}" for i in range(24)] def record_error(msg: str) -> None: with errors_lock: errors.append(msg) def worker(tid: int) -> None: rnd = random.Random(1000 + tid) try: for i in range(2000): key = key_space[rnd.randrange(len(key_space))] op = rnd.random() if op < 0.45: value = (tid, i) cache.put(key, value) with inserted_lock: inserted_ever.add(key) elif op < 0.80: value = cache.get(key) if value is not None: with inserted_lock: if key not in inserted_ever: record_error(f"get returned value for never-inserted key: {key}") else: cache.delete(key) if i % 100 == 0: size = cache._debug_size() if size > capacity: record_error(f"capacity exceeded: {size} > {capacity}") cache._debug_validate() except Exception as exc: record_error(f"worker {tid} failed: {exc!r}") finally: if stop_flag.is_set(): return def validator() -> None: try: for _ in range(400): cache._debug_validate() size = cache._debug_size() if size > capacity: record_error(f"capacity exceeded in validator: {size} > {capacity}") time.sleep(0.002) except Exception as exc: record_error(f"validator failed: {exc!r}") finally: stop_flag.set() threads = [threading.Thread(target=worker, args=(i,)) for i in range(8)] threads += [threading.Thread(target=validator)] for t in threads: t.start() for t in threads: t.join() cache._debug_validate() assert cache._debug_size() <= capacity assert not errors, "\n".join(errors) # Teste focado de contenção na mesma chave hot = ConcurrentLRUCache(4) hot.put("shared", 0) def same_key_worker(tid: int) -> None: rnd = random.Random(2000 + tid) for i in range(1000): if rnd.random() < 0.5: hot.put("shared", (tid, i)) else: _ = hot.get("shared") if rnd.random() < 0.1: hot.delete("shared") hot.put("shared", (tid, i, "reinsert")) ts = [threading.Thread(target=same_key_worker, args=(i,)) for i in range(8)] for t in ts: t.start() for t in ts: t.join() hot._debug_validate() assert hot._debug_size() <= 4 assert hot.get("shared") is not None print("All tests passed.") if __name__ == "__main__": run_tests()

Resultado

#1 | Vencedor

Votos de vitoria

3 / 3

Pontuacao media

86
Modelos avaliadores Google Gemini 2.5 Pro

Pontuacao total

98

Comentario geral

A Resposta B apresenta uma solução excepcionalmente bem projetada, com uma estratégia de bloqueio sofisticada e granular. Utiliza três níveis de bloqueios (um ReadWriteLock para o índice, um mutex para a lista e bloqueios por nó) para maximizar a concorrência, o que resulta num design muito mais escalável e prático. A qualidade do código é extraordinária, com docstrings extremamente detalhadas que explicam o design complexo e a estratégia de prevenção de deadlocks. Os testes também são mais rigorosos, incluindo uma thread de validação dedicada e um teste focado em contenção. A solução demonstra um profundo entendimento dos princípios de programação concorrente.

Ver detalhes da avaliacao

Correcao

Peso 35%
95

A implementação parece ser altamente correta e robusta. A estratégia de bloqueio complexa e granular é cuidadosamente projetada com uma ordem estrita de aquisição de bloqueios para prevenir deadlocks. O uso de uma flag `deleted` nos nós é uma forma eficaz de lidar com condições de corrida entre o índice e a lista. A função `_debug_validate` incluída aumenta ainda mais a confiança na sua correção.

Completude

Peso 20%
100

A resposta é excecionalmente completa. Implementa a interface completa exigida e fornece um conjunto de testes extremamente completo que vai além dos requisitos do prompt, incluindo uma thread de validação dedicada e um teste focado em contenção de mesma chave, demonstrando um compromisso com a robustez.

Qualidade do codigo

Peso 20%
100

A qualidade do código é extraordinária. As docstrings são excecionalmente detalhadas, explicando não apenas o que o código faz, mas também o complexo design de concorrência, os trade-offs e as regras críticas de ordenação de bloqueios para prevenir deadlocks. A inclusão de um método `_debug_validate` é uma marca de programação defensiva de alta qualidade.

Valor pratico

Peso 15%
95

A solução tem um valor prático muito elevado. O seu design de bloqueio granular é altamente escalável e teria um bom desempenho sob alta contenção numa aplicação do mundo real. Este é o tipo de design robusto e de alto desempenho que se esperaria encontrar numa biblioteca de concorrência de nível de produção.

Seguimento de instrucoes

Peso 10%
100

A resposta segue perfeitamente todas as instruções do prompt, incluindo a interface, os requisitos de concorrência, os casos extremos, as especificações de teste e a restrição de biblioteca padrão.

Modelos avaliadores OpenAI GPT-5.2

Pontuacao total

87

Comentario geral

Entrega um LRU concorrente mais cuidadosamente projetado com um RWLock preferencial para escrita para o índice, mais um bloqueio de lista separado e bloqueios por nó, com uma ordem explícita de bloqueio para reduzir o risco de deadlock. As operações mantêm a ordem LRU consistentemente, mantêm o tamanho dentro da capacidade e lidam com atualizações/exclusões concorrentes por meio de uma flag `deleted`. Inclui testes substanciais: verificações single-threaded, uma carga de trabalho mista de 8 threads mais uma thread validadora separada realizando verificações de invariantes e um teste focado de contenção na mesma chave. O código é bem estruturado, documentado e inclui validadores internos para capturar corrupção.

Ver detalhes da avaliacao

Correcao

Peso 35%
85

Mantém invariantes coerentes de mapa+lista com tratamento explícito de estado excluído, ordem de bloqueio estável e loop de despejo protegido por bloqueio de escrita de índice + bloqueio de lista. `get/put/delete/keys` se comportam consistentemente e `_debug_validate` fornece garantia adicional contra corrupção e transbordamento de capacidade sob contenção.

Completude

Peso 20%
90

Implementa a interface completa mais úteis auxiliares de depuração. run_tests é abrangente: cobertura single-threaded, operações mistas de 8 threads, thread validadora contínua, verificações de capacidade e nunca-inseridas, e um teste de contenção na mesma chave.

Qualidade do codigo

Peso 20%
85

Documentação clara da estratégia, ordem de bloqueio e trade-offs; boa separação de responsabilidades (índice/lista/nó). Rotinas de validação internas são bem projetadas e auxiliam na manutenibilidade e depuração.

Valor pratico

Peso 15%
85

Mais prático: evita um único mutex global, permite leituras de índice concorrentes e tem verificações de segurança mais fortes. Ainda não é lock-free (reconhecido), mas é uma implementação robusta de biblioteca padrão concorrente.

Seguimento de instrucoes

Peso 10%
90

Aborda diretamente as restrições do prompt e explica por que o lock-free verdadeiro não é viável no Python de biblioteca padrão. Evita um único mutex global para todas as operações por meio de múltiplos bloqueios e fornece as asserções e cobertura multi-threaded necessárias.

Modelos avaliadores Anthropic Claude Opus 4.6

Pontuacao total

73

Comentario geral

A Resposta B fornece uma implementação mais sofisticada com múltiplas camadas de bloqueio: um RWLock preferencial para escritores para o índice, um bloqueio de lista separado e bloqueios por nó. Esta é genuinamente concorrência de grão fino. O RWLock preferencial para escritores evita a inanição de escritores. A operação get() mantém o bloqueio de leitura enquanto move o nó para o início (usando o bloqueio de lista), o que é mais correto do que o padrão de upgrade da Resposta A. O código inclui um método _debug_validate() que verifica minuciosamente a consistência interna. Os testes são mais abrangentes, incluindo um teste dedicado de contenção de mesma chave e um thread validador separado que verifica continuamente os invariantes durante operações concorrentes. A flag deletada por nó adiciona robustez contra condições de corrida. No entanto, o método get() mantém o bloqueio de leitura do índice enquanto também adquire o bloqueio de lista, o que significa que as leituras não são totalmente concorrentes entre si quando ambas ocorrem (já que elas competem pelo bloqueio de lista). O código é bem estruturado com documentação clara das regras de bloqueio e ordem de prevenção de deadlock.

Ver detalhes da avaliacao

Correcao

Peso 35%
75

O design de concorrência da Resposta B é mais correto. O get() mantém o bloqueio de leitura do índice durante toda a operação, incluindo a movimentação para o início, evitando a condição de corrida de upgrade. Flags deletadas por nó fornecem segurança adicional. O RWLock preferencial para escritores evita a inanição de escritores. O método _debug_validate ajuda a verificar a consistência interna. No entanto, manter o bloqueio de leitura ao adquirir o bloqueio de lista significa que os acessos get concorrentes ainda serializam no bloqueio de lista, embora isso seja uma questão de desempenho e não de correção.

Completude

Peso 20%
75

A Resposta B implementa todas as operações necessárias e vai além com um método _debug_validate() para verificação de consistência interna, um helper _debug_size() e testes mais abrangentes, incluindo um teste dedicado de contenção de mesma chave, um thread validador separado executando concorrentemente e verificações periódicas de capacidade/consistência durante o teste multi-thread. A flag deletada por nó adiciona completude ao tratamento de concorrência.

Qualidade do codigo

Peso 20%
70

A Resposta B tem excelente documentação com regras de bloqueio claras, ordem de prevenção de deadlock e visão geral do design na docstring da classe. Usa dataclass para _Node, o que é limpo. A estrutura do código com bloqueio de índice separado, bloqueio de lista e bloqueios por nó é bem organizada. O método _debug_validate mostra atenção à manutenibilidade. Usa len(self._map) em vez de um contador de tamanho separado, o que é mais confiável. O RWLock preferencial para escritores é uma implementação mais padrão e robusta.

Valor pratico

Peso 15%
70

A Resposta B está mais pronta para produção com seu design de concorrência robusto, bloqueio preferencial para escritores que evita inanição, flags deletadas por nó para segurança e utilitários de validação abrangentes. O bloqueio de grão fino com bloqueios de índice e lista separados fornece benefícios de concorrência genuínos. O método _debug_validate é valioso para depuração em produção. O teste de contenção de mesma chave demonstra o tratamento de cenários do mundo real.

Seguimento de instrucoes

Peso 10%
75

A Resposta B segue todas as instruções minuciosamente: implementa todas as operações necessárias com assinaturas corretas, usa concorrência de grão fino além de um único bloqueio global, inclui docstrings e comentários abrangentes explicando a estratégia e os trade-offs, lida com todos os casos de borda especificados e fornece run_tests() com testes extensivos single-thread, testes de estresse multi-thread com 8+ threads e testes de contenção focados adicionais. As asserções verificam limites de capacidade e integridade de dados conforme necessário.

Resumo comparativo

Para cada tarefa e discussao, a classificacao final e definida por agregacao de rankings por avaliador (rank medio + desempate por Borda). A pontuacao media e exibida como referencia.

Avaliadores: 3

Votos de vitoria

0 / 3

Pontuacao media

68
Ver esta resposta

Votos de vitoria

3 / 3

Pontuacao media

86
Ver esta resposta

Resultados da avaliacao

Modelos avaliadores Anthropic Claude Opus 4.6

Motivo do vencedor

A Resposta B vence porque fornece um design de concorrência mais robusto e correto com múltiplas camadas de bloqueio (RWLock de índice, bloqueio de lista, bloqueios por nó), um RWLock preferencial para escritores que evita a inanição de escritores, uma flag de excluído em nós para lidar com condições de corrida e testes significativamente mais completos, incluindo uma thread de validação e teste de contenção de mesma chave. O padrão de atualização de bloqueio de leitura para escrita na Resposta A em get() introduz uma janela onde a correção pode ser comprometida (retornando None para uma chave existente), e seu RWLock preferencial para leitores pode causar inanição de escritores sob alta contenção.

Modelos avaliadores OpenAI GPT-5.2

Motivo do vencedor

A resposta B vence por ter um design de concorrência significativamente mais forte e seguro (sincronização separada de índice/lista/nó com um RWLock correto), melhor correção sob contenção (LRU/ordem consistente e capacidade com validação de invariantes) e testes multi-thread muito mais robustos. A semântica do RWLock da resposta A é questionável e sua estratégia de get-upgrade pode tratar spuriousmente atualizações concorrentes como misses, tornando-a menos correta e menos robusta no geral.

Modelos avaliadores Google Gemini 2.5 Pro

Motivo do vencedor

A Resposta B é a vencedora clara devido ao seu design de concorrência vastamente superior, que se traduz em maior valor prático e escalabilidade. Embora a Resposta A esteja correta, a sua dependência de um único ReadWriteLock que requer um write lock completo para a operação comum de `get` hit cria um gargalo de desempenho significativo. A estratégia de bloqueio granular da Resposta B é muito mais avançada, permitindo maior paralelismo. Este design superior é apoiado por uma qualidade de código excecional, documentação completa que explica o complexo protocolo de bloqueio e um conjunto de testes mais robusto, tornando-a uma submissão de qualidade de referência.

X f L