Resposta A: Google Gemini 2.5 Flash
Resposta: import threading import random import time from typing import Any, Optional, List class _Node: """ Representa um nó na lista duplamente ligada usada para a ordenação LRU. Usa __slots__ para eficiência de memória. """ __slots__ = 'key', 'value', 'prev', 'next' def __init__(self, key: str, value: Any): self.key = key self.value = value self.prev = None self.next = None class ReadWriteLock: """ Uma implementação personalizada de bloqueio de Leitura-Escrita. Este bloqueio permite que múltiplos leitores mantenham o bloqueio simultaneament...
Mostrar resposta completa ▼
Resposta: import threading import random import time from typing import Any, Optional, List class _Node: """ Representa um nó na lista duplamente ligada usada para a ordenação LRU. Usa __slots__ para eficiência de memória. """ __slots__ = 'key', 'value', 'prev', 'next' def __init__(self, key: str, value: Any): self.key = key self.value = value self.prev = None self.next = None class ReadWriteLock: """ Uma implementação personalizada de bloqueio de Leitura-Escrita. Este bloqueio permite que múltiplos leitores mantenham o bloqueio simultaneamente, mas apenas um escritor por vez. Um escritor deve esperar que todos os leitores terminem, e os leitores devem esperar que um escritor termine. Esta é uma implementação com preferência para leitores, o que significa que novos leitores podem adquirir o bloqueio de leitura mesmo que um escritor esteja à espera. Esta classe também fornece gestores de contexto para tornar a aquisição e libertação de bloqueios mais limpas. """ def __init__(self): self._read_lock = threading.Lock() # Protege a contagem de _readers e a variável de condição self._write_lock = threading.Lock() # O bloqueio de escrita real self._readers = 0 # Número de leitores ativos self._no_readers = threading.Condition(self._read_lock) # Condição para escritores esperarem pelos leitores def acquire_read(self): """Adquire um bloqueio de leitura. Múltiplos leitores podem manter este bloqueio simultaneamente.""" with self._read_lock: # Espera se um escritor estiver atualmente a manter o bloqueio de escrita. # Novos leitores serão bloqueados se um escritor estiver ativo. while self._write_lock.locked(): self._no_readers.wait() self._readers += 1 def release_read(self): """Liberta um bloqueio de leitura.""" with self._read_lock: self._readers -= 1 # Se não restarem leitores, notifica quaisquer escritores à espera if self._readers == 0: self._no_readers.notify_all() def acquire_write(self): """Adquire um bloqueio de escrita. Apenas um escritor pode manter este bloqueio de cada vez.""" # Adquire primeiro o bloqueio principal de escrita para sinalizar intenção de escrever self._write_lock.acquire() with self._read_lock: # Depois adquire read_lock para esperar pelos leitores ativos # Espera até que todos os leitores ativos terminem while self._readers > 0: self._no_readers.wait() def release_write(self): """Liberta um bloqueio de escrita.""" with self._read_lock: # Adquire read_lock para notificar potenciais leitores/escritores self._no_readers.notify_all() # Notifica quaisquer leitores ou escritores à espera self._write_lock.release() # Liberta o bloqueio principal de escrita class _ReadContextManager: def __init__(self, lock): self.lock = lock def __enter__(self): self.lock.acquire_read() def __exit__(self, exc_type, exc_val, exc_tb): self.lock.release_read() class _WriteContextManager: def __init__(self, lock): self.lock = lock def __enter__(self): self.lock.acquire_write() def __exit__(self, exc_type, exc_val, exc_tb): self.lock.release_write() def read_locked(self): """Retorna um gestor de contexto para adquirir um bloqueio de leitura.""" return self._ReadContextManager(self) def write_locked(self): """Retorna um gestor de contexto para adquirir um bloqueio de escrita.""" return self._WriteContextManager(self) class LRUCache: """ Uma implementação de cache LRU (Least Recently Used) segura para threads. Este cache usa um ReadWriteLock para gerir o acesso concorrente. O design procura permitir leituras concorrentes para falhas de cache e para a operação `keys()`. No entanto, operações que modificam o estado do cache ou a ordem LRU (`put`, `delete`, e acertos de cache em `get`) adquirem um bloqueio de escrita, garantindo atomicidade e consistência para estas secções críticas. Esta abordagem oferece melhor concorrência do que um único mutex global para todas as operações, especialmente para cargas de trabalho com muitas leituras com falhas de cache frequentes ou chamadas a `keys()`. A operação `get` emprega um padrão de "upgrade": primeiro tenta um bloqueio de leitura para verificar a existência da chave. Se encontrada, liberta o bloqueio de leitura e adquire um bloqueio de escrita para atualizar a ordem LRU e obter o valor mais recente. Este padrão é cuidadosamente implementado para lidar com condições de corrida que possam ocorrer durante o upgrade do bloqueio (por exemplo, a chave ser apagada ou atualizada por outra thread). """ def __init__(self, capacity: int): if capacity <= 0: raise ValueError("A capacidade deve ser um inteiro positivo.") self._capacity = capacity self._cache = {} # Armazena key -> _Node para pesquisa O(1) self._size = 0 # Nós fictícios de cabeça e cauda para a lista duplamente ligada. # Isto simplifica casos extremos para adicionar/remover nós, já que os nós de dados reais # estão sempre entre a cabeça e a cauda. self._head = _Node("dummy_head", None) self._tail = _Node("dummy_tail", None) self._head.next = self._tail self._tail.prev = self._head # Controlo de concorrência: um único ReadWriteLock protege todo o estado do cache. self._rw_lock = ReadWriteLock() def _add_node(self, node: _Node): """Adiciona um nó logo após a cabeça (posição mais recentemente usada). Assume que o bloqueio de escrita está ativo.""" node.prev = self._head node.next = self._head.next self._head.next.prev = node self._head.next = node def _remove_node(self, node: _Node): """Remove um nó da lista duplamente ligada. Assume que o bloqueio de escrita está ativo.""" prev_node = node.prev next_node = node.next prev_node.next = next_node next_node.prev = prev_node def _move_to_front(self, node: _Node): """Move um nó existente para a frente da lista (mais recentemente usado). Assume que o bloqueio de escrita está ativo.""" self._remove_node(node) self._add_node(node) def _pop_tail(self) -> _Node: """Remove e retorna o nó menos recentemente usado (o que está imediatamente antes da cauda). Assume que o bloqueio de escrita está ativo.""" node = self._tail.prev self._remove_node(node) return node def get(self, key: str) -> Optional[Any]: """ Retorna o valor associado à chave se ela existir, e marca-a como recentemente usada. Retorna None se a chave não estiver no cache. Esta operação primeiro adquire um bloqueio de leitura para verificar a existência da chave. Se a chave for encontrada, liberta o bloqueio de leitura e depois adquire um bloqueio de escrita para atualizar a ordem LRU e obter o valor mais recente. Este padrão de "upgrade" permite falhas de `get` e operações `keys()` concorrentes, mas serializa acertos de `get` com outras escritas. """ node_found_under_read_lock = None with self._rw_lock.read_locked(): node_found_under_read_lock = self._cache.get(key) if not node_found_under_read_lock: return None # Se a chave foi encontrada, adquire um bloqueio de escrita para atualizar a ordem LRU e obter o valor mais recente. # Este é um padrão de "upgrade". Pode ocorrer uma condição de corrida aqui: outra thread # pode apagar ou atualizar a chave entre a libertação do bloqueio de leitura e a aquisição do bloqueio de escrita. with self._rw_lock.write_locked(): # Verifica novamente se a chave ainda existe e se se refere exatamente ao mesmo objeto nó. # Isto é crucial para a correção sob contenção. Se outra thread tiver executado # um `put` para a mesma chave, `_cache.get(key)` retornaria um *novo* nó. # Se outra thread tiver executado um `delete`, retornaria `None`. current_node = self._cache.get(key) if current_node is node_found_under_read_lock: # Garante que é exatamente o mesmo objeto nó self._move_to_front(node_found_under_read_lock) value = current_node.value # Obtém o valor mais recente depois de mover return value else: # O nó foi apagado ou substituído por outra operação `put` # entre a nossa aquisição do bloqueio de leitura e a do bloqueio de escrita. Trata como falha. return None def put(self, key: str, value: Any) -> None: """ Insere ou atualiza o par chave-valor. Se o cache exceder a capacidade após a inserção, remove o item menos recentemente usado. Esta é uma operação de escrita e adquire um bloqueio de escrita. """ with self._rw_lock.write_locked(): node = self._cache.get(key) if node: # A chave existe: atualiza o valor e move para a frente node.value = value self._move_to_front(node) else: # A chave não existe: adiciona um novo nó new_node = _Node(key, value) self._cache[key] = new_node self._add_node(new_node) self._size += 1 # Remove se a capacidade for excedida if self._size > self._capacity: lru_node = self._pop_tail() del self._cache[lru_node.key] self._size -= 1 def delete(self, key: str) -> bool: """ Remove a chave do cache. Retorna True se a chave estava presente, False caso contrário. Esta é uma operação de escrita e adquire um bloqueio de escrita. """ with self._rw_lock.write_locked(): node = self._cache.get(key) if node: self._remove_node(node) del self._cache[key] self._size -= 1 return True return False def keys(self) -> List[str]: """ Retorna uma lista de todas as chaves atualmente no cache, ordenadas da mais recentemente usada para a menos recentemente usada. Esta é uma operação de leitura e adquire um bloqueio de leitura. """ with self._rw_lock.read_locked(): keys_list = [] current = self._head.next while current is not self._tail: keys_list.append(current.key) current = current.next return keys_list def run_tests(): """ Executa uma série de testes de thread única e multithread para verificar a implementação de LRUCache. """ print("--- A iniciar testes da cache LRU ---") # --- Testes de thread única --- print("\n--- Testes de thread única ---") cache = LRUCache(capacity=3) assert cache.get("a") is None, "ST: Obter chave inexistente deve retornar None" assert cache.keys() == [], "ST: Cache vazio deve ter lista de chaves vazia" cache.put("a", 1) assert cache.get("a") == 1, "ST: Obter chave existente após put" assert cache.keys() == ["a"], "ST: Lista de chaves após um put" cache.put("b", 2) cache.put("c", 3) assert cache.keys() == ["c", "b", "a"], "ST: Lista de chaves após vários puts" assert cache._size == 3, "ST: O tamanho do cache deve ser 3" # Testar remoção por capacidade cache.put("d", 4) # 'a' deve ser removida por ser a LRU assert cache.get("a") is None, "ST: A chave removida 'a' deve ser None" assert cache.get("d") == 4, "ST: A nova chave 'd' deve estar presente" assert cache.keys() == ["d", "c", "b"], "ST: Lista de chaves após remoção" assert cache._size == 3, "ST: O tamanho do cache deve continuar a ser 3 após remoção" # Testar put com chave existente (atualizar valor e mover para a frente) cache.put("b", 22) # 'b' deve mover-se para a frente, valor atualizado assert cache.get("b") == 22, "ST: Valor atualizado para 'b'" assert cache.keys() == ["b", "d", "c"], "ST: Lista de chaves após atualizar e mover 'b'" # Testar delete assert cache.delete("c") is True, "ST: Apagar chave existente 'c'" assert cache.get("c") is None, "ST: A chave apagada 'c' deve ser None" assert cache.keys() == ["b", "d"], "ST: Lista de chaves após apagar 'c'" assert cache._size == 2, "ST: Tamanho do cache após delete" assert cache.delete("z") is False, "ST: Apagar chave inexistente 'z'" assert cache.keys() == ["b", "d"], "ST: Lista de chaves inalterada após apagar chave inexistente" # Testar capacidade 1 cache_cap1 = LRUCache(capacity=1) cache_cap1.put("x", 10) assert cache_cap1.get("x") == 10, "ST: Capacidade 1, obter 'x'" cache_cap1.put("y", 20) # 'x' deve ser removida assert cache_cap1.get("x") is None, "ST: Capacidade 1, 'x' removida" assert cache_cap1.get("y") == 20, "ST: Capacidade 1, obter 'y'" assert cache_cap1.keys() == ["y"], "ST: Capacidade 1, lista de chaves" print("Testes de thread única aprovados!") # --- Testes multithread --- print("\n--- Testes multithread ---") MT_CAPACITY = 10 MT_NUM_THREADS = 8 MT_OPS_PER_THREAD = 2000 MT_KEY_RANGE = 20 # Chaves de 0 a 19 (por ex., "0", "1", ..., "19") mt_cache = LRUCache(capacity=MT_CAPACITY) # Barreira para garantir que todas as threads de trabalho iniciam as suas operações simultaneamente barrier = threading.Barrier(MT_NUM_THREADS + 1) # +1 para a thread principal error_flag = threading.Event() # Definida se alguma thread encontrar um erro # Rastreia chaves que *alguma vez* foram inseridas com sucesso no cache por qualquer thread. # Usado para verificar que `get` nunca retorna um valor para uma chave que realmente nunca foi inserida. global_ever_inserted_keys = set() global_ever_inserted_keys_lock = threading.Lock() def mt_worker(cache_instance, thread_id, barrier_instance, error_event, ever_inserted_keys_set, ever_inserted_keys_lock): """Função worker para testes multithread.""" barrier_instance.wait() # Espera que todas as threads estejam prontas para iniciar operações for i in range(MT_OPS_PER_THREAD): if error_event.is_set(): break # Para se outra thread já encontrou um erro # Escolhe aleatoriamente uma operação com viés para 'get' e 'put' op = random.choices(['get', 'put', 'delete'], weights=[0.5, 0.4, 0.1], k=1)[0] key = str(random.randint(0, MT_KEY_RANGE - 1)) value = f"value_T{thread_id}_K{key}_I{i}" if op == 'put': cache_instance.put(key, value) with ever_inserted_keys_lock: ever_inserted_keys_set.add(key) elif op == 'get': val = cache_instance.get(key) if val is not None: # Afirma que, se um valor for retornado, a chave deve ter sido inserida em algum momento. with ever_inserted_keys_lock: if key not in ever_inserted_keys_set: print(f"MT Error: Thread {thread_id} obteve o valor '{val}' para a chave '{key}' " f"que nunca foi inserida globalmente. Isto não deveria acontecer.") error_event.set() break # Afirma que o valor não está corrompido (verificação simples de formato) if not isinstance(val, str) or not val.startswith("value_T"): print(f"MT Error: Thread {thread_id} obteve um valor corrompido '{val}' para a chave '{key}'.") error_event.set() break elif op == 'delete': cache_instance.delete(key) # Opcional: introduzir um pequeno sleep aleatório para aumentar a troca de contexto e a contenção # time.sleep(random.uniform(0.00001, 0.0001)) # print(f"Thread {thread_id} terminou.") # Descomente para saída detalhada threads = [] for i in range(MT_NUM_THREADS): thread = threading.Thread(target=mt_worker, args=(mt_cache, i, barrier, error_flag, global_ever_inserted_keys, global_ever_inserted_keys_lock)) threads.append(thread) thread.start() # Espera que todas as threads de trabalho cheguem à barreira e iniciem as operações barrier.wait() print(f"Todas as {MT_NUM_THREADS} threads iniciaram as operações.") # Espera que todas as threads de trabalho completem as suas operações for thread in threads: thread.join() print("Todas as threads terminaram.") # Afirmações finais para testes multithread assert not error_flag.is_set(), "MT: Erros detetados durante operações multithread. Verifique a consola para detalhes." assert mt_cache._size <= MT_CAPACITY, f"MT: O tamanho do cache ({mt_cache._size}) excedeu a capacidade ({MT_CAPACITY})." # Verifica que todas as chaves restantes no cache foram inseridas em algum momento final_keys = mt_cache.keys() with global_ever_inserted_keys_lock: for key in final_keys: assert key in global_ever_inserted_keys, f"MT Error: O cache final contém a chave '{key}' que nunca foi inserida globalmente." print("Testes multithread aprovados!") print("\n--- Todos os testes da cache LRU foram aprovados! ---") if __name__ == '__main__': run_tests()
Resultado
Votos de vitoria
0 / 3
Pontuacao media
Pontuacao total
Comentario geral
A Resposta A fornece uma implementação correta e bem estruturada de um cache LRU concorrente. Utiliza um ReadWriteLock personalizado e um padrão inteligente de "atualização de bloqueio" no método `get` para lidar com condições de corrida. O código é limpo, bem comentado e inclui um conjunto de testes abrangente. No entanto, sua principal fraqueza reside em seu modelo de concorrência. Como um acerto em `get` requer a aquisição de um bloqueio de escrita completo para atualizar a ordem LRU, o desempenho do cache é severamente gargalado sob cargas de trabalho com muitas leituras e um cache aquecido, tornando sua vantagem prática sobre um bloqueio global simples marginal.
Ver detalhes da avaliacao ▼
Correcao
Peso 35%A implementação está correta e o tratamento da condição de corrida no padrão de atualização de bloqueio do método `get` é sólido. No entanto, a escolha de um bloqueio que prioriza leitores pode levar à inanição de escritores, e o modelo de concorrência geral tem problemas de desempenho inerentes que beiram uma falha de design para este problema específico.
Completude
Peso 20%A resposta está totalmente completa. Implementa todos os métodos necessários da interface e fornece uma função `run_tests` com casos de teste sólidos de thread única e multi-thread que atendem aos requisitos do prompt.
Qualidade do codigo
Peso 20%A qualidade do código é muito boa. O código é bem estruturado, legível e inclui docstrings e comentários claros explicando sua estratégia de concorrência. O uso de `__slots__` na classe `_Node` é uma otimização interessante.
Valor pratico
Peso 15%O valor prático é limitado pelo modelo de concorrência. Como os acertos em `get` (a operação mais comum em um cache aquecido) exigem um bloqueio de escrita completo, a implementação serializa a maioria das operações e oferece pouco benefício de desempenho em comparação com um bloqueio global simples em muitos cenários do mundo real com muitas leituras.
Seguimento de instrucoes
Peso 10%A resposta segue perfeitamente todas as instruções do prompt, incluindo a interface, os requisitos de concorrência, os casos de ponta, as especificações de teste e a restrição de biblioteca padrão.
Pontuacao total
Comentario geral
Fornece um cache LRU executável com um RW lock personalizado e uma lista duplamente ligada. O comportamento básico single-threaded e o manuseio de capacidade parecem corretos. No entanto, a estratégia de concorrência é efetivamente um bloqueio de escrita global para todos os hits, puts e deletes, e a implementação do RWLock é falha: leitores verificam `write_lock.locked()` sem o possuir, e escritores podem adquirir o bloqueio de escrita enquanto novos leitores ainda são permitidos, violando potencialmente a semântica RW. O caminho de "upgrade" do `get` pode retornar None incorretamente mesmo que a chave exista (nó substituído) e pode perder valores sob contenção; embora argumentavelmente evitando dados obsoletos, sacrifica as expectativas de correção para `get` sob put concorrente na mesma chave. Os testes são decentes, mas dependem do `_size` interno e não validam a integridade da lista/mapa sob contenção.
Ver detalhes da avaliacao ▼
Correcao
Peso 35%A lógica LRU está em grande parte correta em uso single-threaded, mas a implementação do RWLock não é um bloqueio de leitores-escritores sólido (a sinalização do leitor baseada em `write_lock.locked()`/uso de condição é propensa a race conditions). Sob contenção, `get` pode retornar None para uma chave existente se ela foi atualizada (nó substituído) entre os bloqueios, o que é uma degradação semântica evitável e pode violar o comportamento esperado para put/get concorrente na mesma chave.
Completude
Peso 20%Implementa todos os métodos necessários e fornece `run_tests` com cenários single- e multi-threaded usando 8 threads e verificações de invariantes (capacidade, chave nunca inserida). No entanto, carece de verificações mais profundas de integridade estrutural (consistência lista/mapa) e usa campos internos em testes.
Qualidade do codigo
Peso 20%Estrutura e comentários legíveis, mas a implementação do RWLock é complexa e argumentavelmente incorreta; usa campos privados em testes e expõe `_size` que não é estritamente necessário. Algumas explicações (preferência por leitores vs comportamento real) são inconsistentes com o código.
Valor pratico
Peso 15%Funciona como um cache concorrente de base, mas o desempenho sob contenção é limitado porque todos os hits exigem bloqueio de escrita e as preocupações com o RWLock reduzem a confiança para uso em produção.
Seguimento de instrucoes
Peso 10%Atende aos requisitos da interface e inclui docstrings/comentários e um teste misto de 8 threads. No entanto, o prompt pede para evitar um bloqueio global para cada operação; na prática, a maioria das operações significativas serializa através do bloqueio de escrita, e os benefícios de concorrência são limitados.
Pontuacao total
Comentario geral
A Resposta A fornece uma implementação completa e executável com um ReadWriteLock personalizado e cache LRU usando uma lista duplamente ligada com nós sentinela. A estratégia de concorrência usa um único RWLock para todo o cache, o que é melhor do que um mutex global, mas ainda limitado. O método get() usa um padrão de atualização de leitura para gravação, que tem um problema sutil: entre liberar o bloqueio de leitura e adquirir o bloqueio de gravação, o nó pode ser excluído e a chave reinserida com um nó diferente, fazendo com que o get() retorne None, embora a chave exista (uma preocupação de correção). A implementação do ReadWriteLock tem um problema potencial: ela verifica `self._write_lock.locked()` dentro de acquire_read, mas este é um bloqueio que prioriza leitores e pode causar inanição de escritores. Os testes são razoáveis, mas poderiam ser mais completos. O código é bem documentado com docstrings claras explicando a estratégia de concorrência.
Ver detalhes da avaliacao ▼
Correcao
Peso 35%O get() da Resposta A tem um problema de correção: o padrão de atualização de bloqueio de leitura para gravação significa que, entre liberar o bloqueio de leitura e adquirir o bloqueio de gravação, outra thread pode excluir e reinserir a mesma chave com um novo nó. A verificação de identidade (current_node é node_found_under_read_lock) falharia então, fazendo com que get() retornasse None, embora a chave exista com um valor válido. O acquire_read do ReadWriteLock verifica _write_lock.locked(), o que não é totalmente confiável. O design que prioriza leitores pode causar inanição de escritores.
Completude
Peso 20%A Resposta A implementa todas as operações necessárias (get, put, delete, keys) com semântica LRU adequada. Lida com capacidade de 1, atualizando chaves existentes e excluindo chaves inexistentes. Os testes cobrem cenários single-threaded e multi-threaded com 8 threads. No entanto, faltam utilitários de validação interna e o teste multi-threaded poderia ser mais completo.
Qualidade do codigo
Peso 20%A Resposta A tem boa documentação com docstrings claras explicando a estratégia de concorrência e os trade-offs. O código é bem estruturado com clara separação de responsabilidades. O ReadWriteLock possui gerenciadores de contexto para uso limpo. Usa __slots__ em _Node para eficiência de memória. No entanto, o rastreamento de _size é manual e pode desviar se houver bugs, e a implementação do ReadWriteLock tem algumas escolhas de design questionáveis (verificação do estado locked()).
Valor pratico
Peso 15%A Resposta A fornece um cache LRU funcional, mas a condição de corrida do padrão de atualização do get() reduz a confiabilidade prática. O RWLock que prioriza leitores pode causar inanição de escritores em produção. O benefício da concorrência é limitado, pois os acertos de get() ainda exigem um bloqueio de gravação, e o padrão de atualização adiciona sobrecarga. Precisaria de correções antes do uso em produção.
Seguimento de instrucoes
Peso 10%A Resposta A segue a maioria das instruções: implementa todas as operações necessárias, usa uma estratégia de concorrência além de um único bloqueio global, inclui docstrings e comentários, lida com casos extremos e fornece run_tests() com testes single-threaded e multi-threaded usando 8 threads. No entanto, as asserções do teste multi-threaded poderiam ser mais fortes, e a explicação do design de concorrência poderia abordar melhor os trade-offs.