Respuesta A: Google Gemini 2.5 Flash
import threading import random import time from typing import Any, Optional, List class _Node: """ Representa un nodo en la lista doblemente enlazada utilizada para el ordenamiento LRU. Usa __slots__ para eficiencia de memoria. """ __slots__ = 'key', 'value', 'prev', 'next' def __init__(self, key: str, value: Any): self.key = key self.value = value self.prev = None self.next = None class ReadWriteLock: """ Una implementación personalizada de bloqueo de lectura-escritura. Este bloqueo permite que múltiples lectores mantengan el bloqueo de forma...
Mostrar respuesta completa ▼
import threading import random import time from typing import Any, Optional, List class _Node: """ Representa un nodo en la lista doblemente enlazada utilizada para el ordenamiento LRU. Usa __slots__ para eficiencia de memoria. """ __slots__ = 'key', 'value', 'prev', 'next' def __init__(self, key: str, value: Any): self.key = key self.value = value self.prev = None self.next = None class ReadWriteLock: """ Una implementación personalizada de bloqueo de lectura-escritura. Este bloqueo permite que múltiples lectores mantengan el bloqueo de forma concurrente, pero solo un escritor a la vez. Un escritor debe esperar a que todos los lectores terminen, y los lectores deben esperar a que un escritor termine. Esta es una implementación con preferencia por los lectores, lo que significa que nuevos lectores pueden adquirir el bloqueo de lectura incluso si un escritor está esperando. Esta clase también proporciona administradores de contexto para una adquisición y liberación de bloqueos más limpias. """ def __init__(self): self._read_lock = threading.Lock() # Protege el conteo de _readers y la variable de condición self._write_lock = threading.Lock() # El bloqueo de escritura real self._readers = 0 # Número de lectores activos self._no_readers = threading.Condition(self._read_lock) # Condición para que los escritores esperen a los lectores def acquire_read(self): """Adquiere un bloqueo de lectura. Múltiples lectores pueden mantener este bloqueo concurrentemente.""" with self._read_lock: # Espera si un escritor mantiene actualmente el bloqueo de escritura. # Los nuevos lectores se bloquearán si un escritor está activo. while self._write_lock.locked(): self._no_readers.wait() self._readers += 1 def release_read(self): """Libera un bloqueo de lectura.""" with self._read_lock: self._readers -= 1 # Si no quedan lectores, notifica a cualquier escritor en espera if self._readers == 0: self._no_readers.notify_all() def acquire_write(self): """Adquiere un bloqueo de escritura. Solo un escritor puede mantener este bloqueo a la vez.""" # Adquiere primero el bloqueo principal de escritura para señalar la intención de escribir self._write_lock.acquire() with self._read_lock: # Luego adquiere read_lock para esperar a los lectores activos # Espera hasta que todos los lectores activos hayan terminado while self._readers > 0: self._no_readers.wait() def release_write(self): """Libera un bloqueo de escritura.""" with self._read_lock: # Adquiere read_lock para notificar a posibles lectores/escritores self._no_readers.notify_all() # Notifica a cualquier lector o escritor en espera self._write_lock.release() # Libera el bloqueo principal de escritura class _ReadContextManager: def __init__(self, lock): self.lock = lock def __enter__(self): self.lock.acquire_read() def __exit__(self, exc_type, exc_val, exc_tb): self.lock.release_read() class _WriteContextManager: def __init__(self, lock): self.lock = lock def __enter__(self): self.lock.acquire_write() def __exit__(self, exc_type, exc_val, exc_tb): self.lock.release_write() def read_locked(self): """Devuelve un administrador de contexto para adquirir un bloqueo de lectura.""" return self._ReadContextManager(self) def write_locked(self): """Devuelve un administrador de contexto para adquirir un bloqueo de escritura.""" return self._WriteContextManager(self) class LRUCache: """ Una implementación de caché LRU (Least Recently Used) segura para hilos. Esta caché usa un ReadWriteLock para gestionar el acceso concurrente. El diseño busca permitir lecturas concurrentes para fallos de caché y la operación `keys()`. Sin embargo, las operaciones que modifican el estado de la caché o el orden LRU (`put`, `delete`, y aciertos de caché en `get`) adquieren un bloqueo de escritura, garantizando atomicidad y consistencia para estas secciones críticas. Este enfoque proporciona mejor concurrencia que un único mutex global para todas las operaciones, especialmente para cargas de trabajo intensivas en lectura con fallos de caché frecuentes o llamadas a `keys()`. La operación `get` emplea un patrón de "actualización": primero intenta adquirir un bloqueo de lectura para comprobar la existencia de la clave. Si la encuentra, libera el bloqueo de lectura y adquiere un bloqueo de escritura para actualizar el orden LRU y recuperar el valor más reciente. Este patrón se implementa cuidadosamente para manejar las condiciones de carrera que pueden ocurrir durante la actualización del bloqueo (p. ej., que otra hebra elimine o actualice la clave). """ def __init__(self, capacity: int): if capacity <= 0: raise ValueError("La capacidad debe ser un entero positivo.") self._capacity = capacity self._cache = {} # Almacena key -> _Node para búsqueda O(1) self._size = 0 # Nodos ficticios de cabeza y cola para la lista doblemente enlazada. # Esto simplifica los casos límite para agregar/eliminar nodos, ya que los nodos de datos reales # siempre están entre la cabeza y la cola. self._head = _Node("dummy_head", None) self._tail = _Node("dummy_tail", None) self._head.next = self._tail self._tail.prev = self._head # Control de concurrencia: un único ReadWriteLock protege todo el estado de la caché. self._rw_lock = ReadWriteLock() def _add_node(self, node: _Node): """Agrega un nodo justo después de la cabeza (posición de uso más reciente). Asume que se mantiene el bloqueo de escritura.""" node.prev = self._head node.next = self._head.next self._head.next.prev = node self._head.next = node def _remove_node(self, node: _Node): """Elimina un nodo de la lista doblemente enlazada. Asume que se mantiene el bloqueo de escritura.""" prev_node = node.prev next_node = node.next prev_node.next = next_node next_node.prev = prev_node def _move_to_front(self, node: _Node): """Mueve un nodo existente al frente de la lista (más recientemente usado). Asume que se mantiene el bloqueo de escritura.""" self._remove_node(node) self._add_node(node) def _pop_tail(self) -> _Node: """Elimina y devuelve el nodo menos recientemente usado (el que está justo antes de la cola). Asume que se mantiene el bloqueo de escritura.""" node = self._tail.prev self._remove_node(node) return node def get(self, key: str) -> Optional[Any]: """ Devuelve el valor asociado con la clave si existe, y la marca como usada recientemente. Devuelve None si la clave no está en la caché. Esta operación primero adquiere un bloqueo de lectura para comprobar la existencia de la clave. Si la clave se encuentra, libera el bloqueo de lectura y luego adquiere un bloqueo de escritura para actualizar el orden LRU y recuperar el valor más reciente. Este patrón de "actualización" permite operaciones concurrentes de fallo en `get` y operaciones `keys()`, pero serializa los aciertos de `get` con otras escrituras. """ node_found_under_read_lock = None with self._rw_lock.read_locked(): node_found_under_read_lock = self._cache.get(key) if not node_found_under_read_lock: return None # Si se encontró la clave, adquiere un bloqueo de escritura para actualizar el orden LRU y obtener el valor más reciente. # Este es un patrón de "actualización". Aquí puede ocurrir una condición de carrera: otra hebra # podría eliminar o actualizar la clave entre la liberación del bloqueo de lectura y la adquisición del bloqueo de escritura. with self._rw_lock.write_locked(): # Vuelve a comprobar si la clave aún existe y se refiere exactamente al mismo objeto nodo. # Esto es crucial para la corrección bajo contención. Si otra hebra realizó # un `put` para la misma clave, `_cache.get(key)` devolvería un nodo *nuevo*. # Si otra hebra realizó un `delete`, devolvería `None`. current_node = self._cache.get(key) if current_node is node_found_under_read_lock: # Asegura que sea exactamente el mismo objeto nodo self._move_to_front(node_found_under_read_lock) value = current_node.value # Obtiene el valor más reciente después de moverlo return value else: # El nodo fue eliminado o reemplazado por otra operación 'put' # entre nuestra adquisición del bloqueo de lectura y el de escritura. Trátalo como un fallo. return None def put(self, key: str, value: Any) -> None: """ Inserta o actualiza el par clave-valor. Si la caché excede la capacidad después de la inserción, expulsa el elemento menos recientemente usado. Esta es una operación de escritura y adquiere un bloqueo de escritura. """ with self._rw_lock.write_locked(): node = self._cache.get(key) if node: # La clave existe: actualiza el valor y la mueve al frente node.value = value self._move_to_front(node) else: # La clave no existe: agrega un nodo nuevo new_node = _Node(key, value) self._cache[key] = new_node self._add_node(new_node) self._size += 1 # Expulsa si se excedió la capacidad if self._size > self._capacity: lru_node = self._pop_tail() del self._cache[lru_node.key] self._size -= 1 def delete(self, key: str) -> bool: """ Elimina la clave de la caché. Devuelve True si la clave estaba presente, False en caso contrario. Esta es una operación de escritura y adquiere un bloqueo de escritura. """ with self._rw_lock.write_locked(): node = self._cache.get(key) if node: self._remove_node(node) del self._cache[key] self._size -= 1 return True return False def keys(self) -> List[str]: """ Devuelve una lista de todas las claves actualmente en la caché, ordenadas de la más recientemente usada a la menos recientemente usada. Esta es una operación de lectura y adquiere un bloqueo de lectura. """ with self._rw_lock.read_locked(): keys_list = [] current = self._head.next while current is not self._tail: keys_list.append(current.key) current = current.next return keys_list def run_tests(): """ Ejecuta una serie de pruebas de un solo hilo y de múltiples hilos para verificar la implementación de LRUCache. """ print("--- Iniciando pruebas de la caché LRU ---") # --- Pruebas de un solo hilo --- print("\n--- Pruebas de un solo hilo ---") cache = LRUCache(capacity=3) assert cache.get("a") is None, "ST: Obtener una clave inexistente debe devolver None" assert cache.keys() == [], "ST: Una caché vacía debe tener una lista de claves vacía" cache.put("a", 1) assert cache.get("a") == 1, "ST: Obtener una clave existente después de put" assert cache.keys() == ["a"], "ST: Lista de claves después de un put" cache.put("b", 2) cache.put("c", 3) assert cache.keys() == ["c", "b", "a"], "ST: Lista de claves después de múltiples puts" assert cache._size == 3, "ST: El tamaño de la caché debe ser 3" # Probar la expulsión por capacidad cache.put("d", 4) # 'a' debe ser expulsada por ser la LRU assert cache.get("a") is None, "ST: La clave expulsada 'a' debe ser None" assert cache.get("d") == 4, "ST: La nueva clave 'd' debe estar presente" assert cache.keys() == ["d", "c", "b"], "ST: Lista de claves después de la expulsión" assert cache._size == 3, "ST: El tamaño de la caché debe seguir siendo 3 después de la expulsión" # Probar put con una clave existente (actualizar valor y mover al frente) cache.put("b", 22) # 'b' debe moverse al frente, valor actualizado assert cache.get("b") == 22, "ST: Valor actualizado para 'b'" assert cache.keys() == ["b", "d", "c"], "ST: Lista de claves después de actualizar y mover 'b'" # Probar delete assert cache.delete("c") is True, "ST: Eliminar la clave existente 'c'" assert cache.get("c") is None, "ST: La clave eliminada 'c' debe ser None" assert cache.keys() == ["b", "d"], "ST: Lista de claves después de eliminar 'c'" assert cache._size == 2, "ST: Tamaño de la caché después de delete" assert cache.delete("z") is False, "ST: Eliminar la clave inexistente 'z'" assert cache.keys() == ["b", "d"], "ST: Lista de claves sin cambios después de eliminar una clave inexistente" # Probar capacidad 1 cache_cap1 = LRUCache(capacity=1) cache_cap1.put("x", 10) assert cache_cap1.get("x") == 10, "ST: Capacidad 1, obtener 'x'" cache_cap1.put("y", 20) # 'x' debe ser expulsada assert cache_cap1.get("x") is None, "ST: Capacidad 1, 'x' expulsada" assert cache_cap1.get("y") == 20, "ST: Capacidad 1, obtener 'y'" assert cache_cap1.keys() == ["y"], "ST: Capacidad 1, lista de claves" print("¡Las pruebas de un solo hilo pasaron!") # --- Pruebas de múltiples hilos --- print("\n--- Pruebas de múltiples hilos ---") MT_CAPACITY = 10 MT_NUM_THREADS = 8 MT_OPS_PER_THREAD = 2000 MT_KEY_RANGE = 20 # Claves de 0 a 19 (p. ej., "0", "1", ..., "19") mt_cache = LRUCache(capacity=MT_CAPACITY) # Barrera para asegurar que todos los hilos trabajadores inicien sus operaciones simultáneamente barrier = threading.Barrier(MT_NUM_THREADS + 1) # +1 para el hilo principal error_flag = threading.Event() # Se activa si algún hilo encuentra un error # Rastrea claves que *alguna vez* fueron insertadas con éxito en la caché por cualquier hilo. # Se usa para verificar que 'get' nunca devuelva un valor para una clave que realmente nunca fue insertada. global_ever_inserted_keys = set() global_ever_inserted_keys_lock = threading.Lock() def mt_worker(cache_instance, thread_id, barrier_instance, error_event, ever_inserted_keys_set, ever_inserted_keys_lock): """Función trabajadora para pruebas de múltiples hilos.""" barrier_instance.wait() # Espera a que todos los hilos estén listos para iniciar operaciones for i in range(MT_OPS_PER_THREAD): if error_event.is_set(): break # Detenerse si otro hilo ya encontró un error # Elegir aleatoriamente una operación con sesgo hacia 'get' y 'put' op = random.choices(['get', 'put', 'delete'], weights=[0.5, 0.4, 0.1], k=1)[0] key = str(random.randint(0, MT_KEY_RANGE - 1)) value = f"value_T{thread_id}_K{key}_I{i}" if op == 'put': cache_instance.put(key, value) with ever_inserted_keys_lock: ever_inserted_keys_set.add(key) elif op == 'get': val = cache_instance.get(key) if val is not None: # Verifica que si se devuelve un valor, la clave debe haberse insertado en algún momento. with ever_inserted_keys_lock: if key not in ever_inserted_keys_set: print(f"MT Error: El hilo {thread_id} obtuvo el valor '{val}' para la clave '{key}' " f"que nunca fue insertada globalmente. Esto no debería ocurrir.") error_event.set() break # Verifica que el valor no esté corrupto (comprobación simple del formato) if not isinstance(val, str) or not val.startswith("value_T"): print(f"MT Error: El hilo {thread_id} obtuvo un valor corrupto '{val}' para la clave '{key}'.") error_event.set() break elif op == 'delete': cache_instance.delete(key) # Opcional: Introducir una pequeña pausa aleatoria para aumentar el cambio de contexto y la contención # time.sleep(random.uniform(0.00001, 0.0001)) # print(f"El hilo {thread_id} terminó.") # Descomentar para salida detallada threads = [] for i in range(MT_NUM_THREADS): thread = threading.Thread(target=mt_worker, args=(mt_cache, i, barrier, error_flag, global_ever_inserted_keys, global_ever_inserted_keys_lock)) threads.append(thread) thread.start() # Esperar a que todos los hilos trabajadores lleguen a la barrera e inicien operaciones barrier.wait() print(f"Todos los {MT_NUM_THREADS} hilos iniciaron operaciones.") # Esperar a que todos los hilos trabajadores completen sus operaciones for thread in threads: thread.join() print("Todos los hilos completaron.") # Aserciones finales para pruebas de múltiples hilos assert not error_flag.is_set(), "MT: Se detectaron errores durante las operaciones multihilo. Revise la consola para más detalles." assert mt_cache._size <= MT_CAPACITY, f"MT: El tamaño de la caché ({mt_cache._size}) excedió la capacidad ({MT_CAPACITY})." # Verificar que todas las claves que permanecen en la caché fueron insertadas en algún momento final_keys = mt_cache.keys() with global_ever_inserted_keys_lock: for key in final_keys: assert key in global_ever_inserted_keys, f"MT Error: La caché final contiene la clave '{key}' que nunca fue insertada globalmente." print("¡Las pruebas multihilo pasaron!") print("\n--- ¡Todas las pruebas de la caché LRU pasaron! ---") if __name__ == '__main__': run_tests()
Resultado
Votos ganadores
0 / 3
Puntuacion media
Puntuacion total
Comentario general
La Respuesta A proporciona una implementación correcta y bien estructurada de una caché LRU concurrente. Utiliza un ReadWriteLock personalizado y un ingenioso patrón de "actualización de bloqueo" en el método `get` para manejar las condiciones de carrera. El código es limpio, está bien comentado e incluye un conjunto de pruebas completo. Sin embargo, su principal debilidad radica en su modelo de concurrencia. Dado que un acierto en `get` requiere adquirir un bloqueo de escritura completo para actualizar el orden LRU, el rendimiento de la caché se ve gravemente limitado en cargas de trabajo con muchas lecturas y una caché caliente, lo que hace que su ventaja práctica sobre un simple bloqueo global sea marginal.
Ver detalle de evaluacion ▼
Correccion
Peso 35%La implementación es correcta y el manejo de la condición de carrera en el patrón de actualización de bloqueo del método `get` es sólido. Sin embargo, la elección de un bloqueo que prioriza a los lectores podría provocar la inanición de escritores, y el modelo de concurrencia general tiene problemas de rendimiento inherentes que rozan un defecto de diseño para este problema específico.
Integridad
Peso 20%La respuesta está completamente completa. Implementa todos los métodos requeridos de la interfaz y proporciona una función `run_tests` con casos de prueba sólidos de un solo hilo y de varios hilos que cumplen los requisitos del prompt.
Calidad del codigo
Peso 20%La calidad del código es muy buena. El código está bien estructurado, es legible e incluye docstrings y comentarios claros que explican su estrategia de concurrencia. El uso de `__slots__` en la clase `_Node` es una buena optimización.
Valor practico
Peso 15%El valor práctico está limitado por el modelo de concurrencia. Dado que los aciertos en `get` (la operación más común en una caché caliente) requieren un bloqueo de escritura completo, la implementación serializa la mayoría de las operaciones y ofrece poco beneficio de rendimiento sobre un simple bloqueo global en muchos escenarios del mundo real con muchas lecturas.
Seguimiento de instrucciones
Peso 10%La respuesta sigue perfectamente todas las instrucciones del prompt, incluida la interfaz, los requisitos de concurrencia, los casos extremos, las especificaciones de prueba y la restricción de la biblioteca estándar.
Puntuacion total
Comentario general
Proporciona una caché LRU ejecutable con un bloqueo RW personalizado y una lista doblemente enlazada. El comportamiento básico monohilo y el manejo de la capacidad parecen correctos. Sin embargo, la estrategia de concurrencia es efectivamente un bloqueo de escritura para toda la caché para todos los accesos, inserciones y eliminaciones, y la implementación de RWLock es defectuosa: los lectores comprueban `write_lock.locked()` sin poseerlo, y los escritores pueden adquirir el bloqueo de escritura mientras todavía se permite la entrada a nuevos lectores, lo que podría violar la semántica de RW. La ruta de "actualización" de `get` puede devolver incorrectamente None incluso si la clave existe (nodo reemplazado) y puede omitir valores bajo contención; si bien evita datos obsoletos, sacrifica las expectativas de corrección para `get` bajo inserciones concurrentes en la misma clave. Las pruebas son decentes pero dependen de `_size` interno y no validan la integridad de la lista/mapa bajo contención.
Ver detalle de evaluacion ▼
Correccion
Peso 35%La lógica LRU es mayormente correcta en uso monohilo, pero la implementación de RWLock no es un bloqueo de lectores-escritores sólido (la limitación de lectores basada en `write_lock.locked()`/uso de condiciones es propensa a condiciones de carrera). Bajo contención, `get` puede devolver None para una clave existente si fue actualizada (nodo reemplazado) entre bloqueos, lo cual es una degradación semántica evitable y puede violar el comportamiento esperado para `put`/`get` concurrentes en la misma clave.
Integridad
Peso 20%Implementa todos los métodos requeridos y proporciona `run_tests` con escenarios monohilo y multihilo utilizando 8 hilos y comprobaciones de invariantes (capacidad, clave nunca insertada). Sin embargo, carece de comprobaciones de integridad estructural más profundas (consistencia lista/mapa) y utiliza campos internos en las pruebas.
Calidad del codigo
Peso 20%Estructura y comentarios legibles, pero la implementación de RWLock es compleja y discutiblemente incorrecta; utiliza campos privados en las pruebas y expone `_size` que no es estrictamente necesario. Algunas explicaciones (preferencia de lectores vs. comportamiento real) son inconsistentes con el código.
Valor practico
Peso 15%Funciona como una caché concurrente de referencia, pero el rendimiento bajo contención es limitado porque todos los accesos requieren bloqueo de escritura y las preocupaciones sobre RWLock reducen la confianza para uso en producción.
Seguimiento de instrucciones
Peso 10%Cumple los requisitos de la interfaz e incluye docstrings/comentarios y una prueba mixta de 8 hilos. Sin embargo, la indicación pide evitar un bloqueo global para cada operación; en la práctica, la mayoría de las operaciones significativas se serializan a través del bloqueo de escritura, y los beneficios de concurrencia son limitados.
Puntuacion total
Comentario general
La Respuesta A proporciona una implementación completa y ejecutable con un ReadWriteLock personalizado y una caché LRU utilizando una lista doblemente enlazada con nodos centinela. La estrategia de concurrencia utiliza un único RWLock para toda la caché, lo que es mejor que un mutex global pero aún limitado. El método get() utiliza un patrón de mejora de lectura a escritura, que tiene un problema sutil: entre la liberación del bloqueo de lectura y la adquisición del bloqueo de escritura, el nodo podría ser eliminado y la clave reinserida con un nodo diferente, lo que haría que get() devolviera None aunque la clave exista (un problema de corrección). La implementación de ReadWriteLock tiene un problema potencial: comprueba `self._write_lock.locked()` dentro de acquire_read, pero este es un bloqueo que prefiere lectores y podría provocar la inanición de escritores. Las pruebas son razonables pero podrían ser más exhaustivas. El código está bien documentado con docstrings claras que explican la estrategia de concurrencia.
Ver detalle de evaluacion ▼
Correccion
Peso 35%El get() de la Respuesta A tiene un problema de corrección: el patrón de mejora de bloqueo de lectura a escritura significa que entre la liberación del bloqueo de lectura y la adquisición del bloqueo de escritura, otro hilo podría eliminar y volver a insertar la misma clave con un nuevo nodo. La comprobación de identidad (current_node is node_found_under_read_lock) fallaría entonces, haciendo que get() devuelva None aunque la clave exista con un valor válido. El acquire_read de ReadWriteLock comprueba _write_lock.locked() lo cual no es completamente fiable. El diseño que prefiere lectores puede provocar la inanición de escritores.
Integridad
Peso 20%La Respuesta A implementa todas las operaciones requeridas (get, put, delete, keys) con semántica LRU adecuada. Maneja la capacidad de 1, la actualización de claves existentes y la eliminación de claves inexistentes. Las pruebas cubren escenarios de un solo hilo y de varios hilos con 8 hilos. Sin embargo, carece de utilidades de validación interna y la prueba de varios hilos podría ser más exhaustiva.
Calidad del codigo
Peso 20%La Respuesta A tiene buena documentación con docstrings claras que explican la estrategia de concurrencia y los compromisos. El código está bien estructurado con una clara separación de responsabilidades. El ReadWriteLock tiene gestores de contexto para un uso limpio. Utiliza __slots__ en _Node para eficiencia de memoria. Sin embargo, el seguimiento del tamaño es manual y podría desviarse si existen errores, y la implementación de ReadWriteLock tiene algunas opciones de diseño cuestionables (comprobación del estado locked()).
Valor practico
Peso 15%La Respuesta A proporciona una caché LRU funcional, pero la condición de carrera del patrón de mejora de get() reduce la fiabilidad práctica. El RWLock que prefiere lectores podría causar inanición de escritores en producción. El beneficio de la concurrencia es limitado ya que los accesos a get() todavía requieren un bloqueo de escritura, y el patrón de mejora añade sobrecarga. Necesitaría correcciones antes de su uso en producción.
Seguimiento de instrucciones
Peso 10%La Respuesta A sigue la mayoría de las instrucciones: implementa todas las operaciones requeridas, utiliza una estrategia de concurrencia más allá de un único bloqueo global, incluye docstrings y comentarios, maneja casos extremos y proporciona run_tests() con pruebas de un solo hilo y de varios hilos utilizando 8 hilos. Sin embargo, las aserciones de la prueba de varios hilos podrían ser más sólidas, y la explicación del diseño de concurrencia podría abordar mejor los compromisos.