Answer A: Google Gemini 2.5 Flash
import threading
import random
import time
from typing import Any, Optional, List


class _Node:
    """
    Represents a node in the doubly linked list used for LRU ordering.
    Uses __slots__ for memory efficiency.
    """
    __slots__ = 'key', 'value', 'prev', 'next'

    def __init__(self, key: str, value: Any):
        self.key = key
        self.value = value
        self.prev = None
        self.next = None


class ReadWriteLock:
    """
    A custom Read-Write Lock implementation.

    This lock allows multiple readers to hold the lock concurrently, but only
    one writer at a time. A writer must wait for all readers to finish, and
    readers must wait for a writer to finish.

    This is a readers-preferring implementation, meaning new readers can
    acquire the read lock even if a writer is waiting.

    This class also provides context managers for cleaner lock acquisition
    and release.
    """

    def __init__(self):
        self._read_lock = threading.Lock()   # Protects _readers count and condition variable
        self._write_lock = threading.Lock()  # The actual write lock
        self._readers = 0                    # Number of active readers
        self._no_readers = threading.Condition(self._read_lock)  # Condition for writers to wait for readers

    def acquire_read(self):
        """Acquire a read lock. Multiple readers can hold this lock concurrently."""
        with self._read_lock:
            # Wait if a writer currently holds the write lock.
            # New readers will block if a writer is active.
            while self._write_lock.locked():
                self._no_readers.wait()
            self._readers += 1

    def release_read(self):
        """Release a read lock."""
        with self._read_lock:
            self._readers -= 1
            # If no readers left, notify any waiting writers
            if self._readers == 0:
                self._no_readers.notify_all()

    def acquire_write(self):
        """Acquire a write lock. Only one writer can hold this lock at a time."""
        # Acquire the main write lock first to signal intent to write
        self._write_lock.acquire()
        with self._read_lock:  # Then acquire read_lock to wait for active readers
            # Wait until all active readers have finished
            while self._readers > 0:
                self._no_readers.wait()

    def release_write(self):
        """Release a write lock."""
        with self._read_lock:  # Acquire read_lock to notify potential readers/writers
            self._no_readers.notify_all()  # Notify any waiting readers or writers
            self._write_lock.release()     # Release the main write lock

    class _ReadContextManager:
        def __init__(self, lock):
            self.lock = lock

        def __enter__(self):
            self.lock.acquire_read()

        def __exit__(self, exc_type, exc_val, exc_tb):
            self.lock.release_read()

    class _WriteContextManager:
        def __init__(self, lock):
            self.lock = lock

        def __enter__(self):
            self.lock.acquire_write()

        def __exit__(self, exc_type, exc_val, exc_tb):
            self.lock.release_write()

    def read_locked(self):
        """Returns a context manager for acquiring a read lock."""
        return self._ReadContextManager(self)

    def write_locked(self):
        """Returns a context manager for acquiring a write lock."""
        return self._WriteContextManager(self)


class LRUCache:
    """
    A thread-safe LRU (Least Recently Used) cache implementation.

    This cache uses a ReadWriteLock to manage concurrent access. The design
    aims to allow concurrent reads for cache misses and the `keys()`
    operation. However, operations that modify the cache state or LRU order
    (`put`, `delete`, and cache hits in `get`) acquire a write lock, ensuring
    atomicity and consistency for these critical sections.

    This approach provides better concurrency than a single global mutex for
    all operations, especially for read-heavy workloads with frequent cache
    misses or `keys()` calls.

    The `get` operation employs an "upgrade" pattern: it first attempts a
    read lock to check for key existence. If found, it releases the read lock
    and acquires a write lock to update the LRU order and retrieve the latest
    value. This pattern is carefully implemented to handle race conditions
    that might occur during the lock upgrade (e.g., key being deleted or
    updated by another thread).
    """

    def __init__(self, capacity: int):
        if capacity <= 0:
            raise ValueError("Capacity must be a positive integer.")
        self._capacity = capacity
        self._cache = {}  # Stores key -> _Node for O(1) lookup
        self._size = 0
        # Dummy head and tail nodes for the doubly linked list.
        # This simplifies edge cases for adding/removing nodes, as actual
        # data nodes are always between head and tail.
        self._head = _Node("dummy_head", None)
        self._tail = _Node("dummy_tail", None)
        self._head.next = self._tail
        self._tail.prev = self._head
        # Concurrency control: A single ReadWriteLock protects the entire cache state.
        self._rw_lock = ReadWriteLock()

    def _add_node(self, node: _Node):
        """Adds a node right after the head (most recently used position).
        Assumes write lock is held."""
        node.prev = self._head
        node.next = self._head.next
        self._head.next.prev = node
        self._head.next = node

    def _remove_node(self, node: _Node):
        """Removes a node from the doubly linked list. Assumes write lock is held."""
        prev_node = node.prev
        next_node = node.next
        prev_node.next = next_node
        next_node.prev = prev_node

    def _move_to_front(self, node: _Node):
        """Moves an existing node to the front of the list (most recently used).
        Assumes write lock is held."""
        self._remove_node(node)
        self._add_node(node)

    def _pop_tail(self) -> _Node:
        """Removes and returns the least recently used node (the one just
        before the tail). Assumes write lock is held."""
        node = self._tail.prev
        self._remove_node(node)
        return node

    def get(self, key: str) -> Optional[Any]:
        """
        Returns the value associated with the key if it exists, and marks it
        as recently used. Returns None if the key is not in the cache.

        This operation first acquires a read lock to check for key existence.
        If the key is found, it releases the read lock and then acquires a
        write lock to update the LRU order and retrieve the latest value.
        This "upgrade" pattern allows concurrent `get` misses and `keys()`
        operations, but serializes `get` hits with other writes.
        """
        node_found_under_read_lock = None
        with self._rw_lock.read_locked():
            node_found_under_read_lock = self._cache.get(key)

        if not node_found_under_read_lock:
            return None

        # If key found, acquire write lock to update LRU order and get the
        # latest value. This is an "upgrade" pattern. A race condition can
        # occur here: another thread might delete or update the key between
        # releasing the read lock and acquiring the write lock.
        with self._rw_lock.write_locked():
            # Re-check if the key still exists and refers to the exact same
            # node object. This is crucial for correctness under contention.
            # If another thread performed a `put` for the same key,
            # `_cache.get(key)` would return a *new* node. If another thread
            # performed a `delete`, it would return `None`.
            current_node = self._cache.get(key)
            if current_node is node_found_under_read_lock:  # Ensure it's the exact same node object
                self._move_to_front(node_found_under_read_lock)
                value = current_node.value  # Get the latest value after moving
                return value
            else:
                # The node was either deleted or replaced by another 'put'
                # operation between our read-lock and write-lock acquisition.
                # Treat as a miss.
                return None

    def put(self, key: str, value: Any) -> None:
        """
        Inserts or updates the key-value pair. If the cache exceeds capacity
        after insertion, evicts the least recently used item.
        This is a write operation and acquires a write lock.
        """
        with self._rw_lock.write_locked():
            node = self._cache.get(key)
            if node:
                # Key exists: update value and move to front
                node.value = value
                self._move_to_front(node)
            else:
                # Key does not exist: add new node
                new_node = _Node(key, value)
                self._cache[key] = new_node
                self._add_node(new_node)
                self._size += 1
                # Evict if capacity exceeded
                if self._size > self._capacity:
                    lru_node = self._pop_tail()
                    del self._cache[lru_node.key]
                    self._size -= 1

    def delete(self, key: str) -> bool:
        """
        Removes the key from the cache. Returns True if the key was present,
        False otherwise. This is a write operation and acquires a write lock.
        """
        with self._rw_lock.write_locked():
            node = self._cache.get(key)
            if node:
                self._remove_node(node)
                del self._cache[key]
                self._size -= 1
                return True
            return False

    def keys(self) -> List[str]:
        """
        Returns a list of all keys currently in the cache, ordered from most
        recently used to least recently used.
        This is a read operation and acquires a read lock.
        """
        with self._rw_lock.read_locked():
            keys_list = []
            current = self._head.next
            while current is not self._tail:
                keys_list.append(current.key)
                current = current.next
            return keys_list


def run_tests():
    """
    Runs a series of single-threaded and multi-threaded tests to verify the
    LRUCache implementation.
    """
    print("--- Starting LRU Cache Tests ---")

    # --- Single-threaded Tests ---
    print("\n--- Single-threaded Tests ---")
    cache = LRUCache(capacity=3)

    assert cache.get("a") is None, "ST: Get non-existent key should return None"
    assert cache.keys() == [], "ST: Empty cache should have empty keys list"

    cache.put("a", 1)
    assert cache.get("a") == 1, "ST: Get existing key after put"
    assert cache.keys() == ["a"], "ST: Keys list after one put"

    cache.put("b", 2)
    cache.put("c", 3)
    assert cache.keys() == ["c", "b", "a"], "ST: Keys list after multiple puts"
    assert cache._size == 3, "ST: Cache size should be 3"

    # Test capacity eviction
    cache.put("d", 4)  # 'a' should be evicted as it's LRU
    assert cache.get("a") is None, "ST: Evicted key 'a' should be None"
    assert cache.get("d") == 4, "ST: New key 'd' should be present"
    assert cache.keys() == ["d", "c", "b"], "ST: Keys list after eviction"
    assert cache._size == 3, "ST: Cache size should remain 3 after eviction"

    # Test put with existing key (update value and move to front)
    cache.put("b", 22)  # 'b' should move to front, value updated
    assert cache.get("b") == 22, "ST: Updated value for 'b'"
    assert cache.keys() == ["b", "d", "c"], "ST: Keys list after update and move 'b'"

    # Test delete
    assert cache.delete("c") is True, "ST: Delete existing key 'c'"
    assert cache.get("c") is None, "ST: Deleted key 'c' should be None"
    assert cache.keys() == ["b", "d"], "ST: Keys list after deleting 'c'"
    assert cache._size == 2, "ST: Cache size after delete"
    assert cache.delete("z") is False, "ST: Delete non-existent key 'z'"
    assert cache.keys() == ["b", "d"], "ST: Keys list unchanged after deleting non-existent key"

    # Test capacity 1
    cache_cap1 = LRUCache(capacity=1)
    cache_cap1.put("x", 10)
    assert cache_cap1.get("x") == 10, "ST: Capacity 1, get 'x'"
    cache_cap1.put("y", 20)  # 'x' should be evicted
    assert cache_cap1.get("x") is None, "ST: Capacity 1, 'x' evicted"
    assert cache_cap1.get("y") == 20, "ST: Capacity 1, get 'y'"
    assert cache_cap1.keys() == ["y"], "ST: Capacity 1, keys list"

    print("Single-threaded tests passed!")

    # --- Multi-threaded Tests ---
    print("\n--- Multi-threaded Tests ---")
    MT_CAPACITY = 10
    MT_NUM_THREADS = 8
    MT_OPS_PER_THREAD = 2000
    MT_KEY_RANGE = 20  # Keys from 0 to 19 (e.g., "0", "1", ..., "19")

    mt_cache = LRUCache(capacity=MT_CAPACITY)

    # Barrier to ensure all worker threads start their operations simultaneously
    barrier = threading.Barrier(MT_NUM_THREADS + 1)  # +1 for the main thread
    error_flag = threading.Event()  # Set if any thread encounters an error

    # Tracks keys that have *ever* been successfully put into the cache by
    # any thread. Used to verify that 'get' never returns a value for a key
    # that was truly never inserted.
    global_ever_inserted_keys = set()
    global_ever_inserted_keys_lock = threading.Lock()

    def mt_worker(cache_instance, thread_id, barrier_instance, error_event,
                  ever_inserted_keys_set, ever_inserted_keys_lock):
        """Worker function for multi-threaded tests."""
        barrier_instance.wait()  # Wait for all threads to be ready to start operations
        for i in range(MT_OPS_PER_THREAD):
            if error_event.is_set():
                break  # Stop if another thread already found an error

            # Randomly choose an operation with a bias towards 'get' and 'put'
            op = random.choices(['get', 'put', 'delete'], weights=[0.5, 0.4, 0.1], k=1)[0]
            key = str(random.randint(0, MT_KEY_RANGE - 1))
            value = f"value_T{thread_id}_K{key}_I{i}"

            if op == 'put':
                cache_instance.put(key, value)
                with ever_inserted_keys_lock:
                    ever_inserted_keys_set.add(key)
            elif op == 'get':
                val = cache_instance.get(key)
                if val is not None:
                    # Assert that if a value is returned, the key must have
                    # been inserted at some point.
                    with ever_inserted_keys_lock:
                        if key not in ever_inserted_keys_set:
                            print(f"MT Error: Thread {thread_id} got value '{val}' for key '{key}' "
                                  f"which was never inserted globally. This should not happen.")
                            error_event.set()
                            break
                    # Assert that the value is not corrupted (simple format check)
                    if not isinstance(val, str) or not val.startswith("value_T"):
                        print(f"MT Error: Thread {thread_id} got corrupted value '{val}' for key '{key}'.")
                        error_event.set()
                        break
            elif op == 'delete':
                cache_instance.delete(key)

            # Optional: Introduce a small random sleep to increase context
            # switching and contention
            # time.sleep(random.uniform(0.00001, 0.0001))

        # print(f"Thread {thread_id} finished.")  # Uncomment for verbose output

    threads = []
    for i in range(MT_NUM_THREADS):
        thread = threading.Thread(target=mt_worker,
                                  args=(mt_cache, i, barrier, error_flag,
                                        global_ever_inserted_keys,
                                        global_ever_inserted_keys_lock))
        threads.append(thread)
        thread.start()

    # Wait for all worker threads to reach the barrier and start operations
    barrier.wait()
    print(f"All {MT_NUM_THREADS} threads started operations.")

    # Wait for all worker threads to complete their operations
    for thread in threads:
        thread.join()
    print("All threads completed.")

    # Final assertions for multi-threaded tests
    assert not error_flag.is_set(), "MT: Errors detected during multi-threaded operations. Check console for details."
    assert mt_cache._size <= MT_CAPACITY, f"MT: Cache size ({mt_cache._size}) exceeded capacity ({MT_CAPACITY})."

    # Check that all keys remaining in the cache were at some point inserted
    final_keys = mt_cache.keys()
    with global_ever_inserted_keys_lock:
        for key in final_keys:
            assert key in global_ever_inserted_keys, \
                f"MT Error: Final cache contains key '{key}' that was never globally inserted."

    print("Multi-threaded tests passed!")
    print("\n--- All LRU Cache Tests Passed! ---")


if __name__ == '__main__':
    run_tests()
Result
Winning Votes
0 / 3
Average Score
Total Score
Overall Comments
Answer A provides a correct and well-structured implementation of a concurrent LRU cache. It uses a custom ReadWriteLock and a clever "lock upgrade" pattern in the `get` method to handle race conditions. The code is clean, well-commented, and includes a comprehensive test suite. However, its primary weakness lies in its concurrency model. Because a `get` hit requires acquiring a full write lock to update the LRU order, the cache's performance is severely bottlenecked under read-heavy workloads with a warm cache, making its practical advantage over a simple global lock marginal.
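To make the comparison concrete, the "simple global lock" baseline alluded to here can be sketched in a few lines with `collections.OrderedDict`. This is an illustrative reference point, not code from the answer; the class name `GlobalLockLRUCache` is hypothetical.

```python
import threading
from collections import OrderedDict
from typing import Any, Optional

class GlobalLockLRUCache:
    """Baseline for comparison: one mutex serializes every operation."""

    def __init__(self, capacity: int):
        self._capacity = capacity
        self._data = OrderedDict()       # insertion order doubles as LRU order
        self._lock = threading.Lock()    # the single global lock

    def get(self, key: str) -> Optional[Any]:
        with self._lock:
            if key not in self._data:
                return None
            self._data.move_to_end(key)  # most recently used lives at the end
            return self._data[key]

    def put(self, key: str, value: Any) -> None:
        with self._lock:
            self._data[key] = value
            self._data.move_to_end(key)  # refresh recency on update
            if len(self._data) > self._capacity:
                self._data.popitem(last=False)  # evict the least recently used
```

Since a `get` hit in Answer A takes the write lock anyway, its hit path has roughly the same serialization as this baseline, just with more lock machinery on top.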
Correctness
Weight 35%
The implementation is correct, and the handling of the race condition in the `get` method's lock-upgrade pattern is sound. However, the choice of a readers-preferring lock could lead to writer starvation, and the overall concurrency model has inherent performance issues that border on a design flaw for this specific problem.
Completeness
Weight 20%
The answer is fully complete. It implements all required methods of the interface and provides a `run_tests` function with solid single-threaded and multi-threaded test cases that meet the prompt's requirements.
Code Quality
Weight 20%
The code quality is very good. The code is well-structured, readable, and includes clear docstrings and comments explaining its concurrency strategy. The use of `__slots__` in the `_Node` class is a nice optimization.
Practical Value
Weight 15%
The practical value is limited by the concurrency model. Since `get` hits (the most common operation on a warm cache) require a full write lock, the implementation serializes most operations and offers little performance benefit over a simple global lock in many real-world, read-heavy scenarios.
Instruction Following
Weight 10%
The answer perfectly follows all instructions in the prompt, including the interface, concurrency requirements, edge cases, testing specifications, and standard library constraint.
Overall Comments
Provides a runnable LRU cache with a custom RW lock and a doubly linked list. Basic single-threaded behavior and capacity handling look correct. However, the concurrency strategy is effectively a single cache-wide writer lock for all hits, puts, and deletes, and the RWLock implementation is flawed: readers check `write_lock.locked()` without holding it, and writers can acquire the write lock while new readers are still allowed, potentially violating RW semantics. The `get` “upgrade” path can incorrectly return None even if the key exists (replaced node) and can miss values under contention; while arguably avoiding stale data, it sacrifices correctness expectations for `get` under concurrent put on the same key. Tests are decent but rely on internal `_size` and don’t validate list/map integrity under contention.
Correctness
Weight 35%
LRU logic is mostly correct in single-threaded use, but the RWLock implementation is not a sound readers-writer lock: readers are gated on `write_lock.locked()`, a check made without synchronization, and the condition-variable usage is race-prone. Under contention, `get` may return None for an existing key if it was updated (node replaced) between the two lock acquisitions, which is an avoidable semantic degradation and can violate expected behavior for concurrent put/get on the same key.
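For contrast with the gating criticized above, a minimal readers-writer lock can keep both the reader count and a writer-active flag under one mutex, so every check happens while the lock is held. This is a sketch of the standard pattern, not a drop-in fix for the answer's code; the class and attribute names are illustrative.

```python
import threading

class SimpleRWLock:
    """Readers-writer lock sketch: all shared state (_readers,
    _writer_active) is read and written only while holding _cond's
    underlying mutex, avoiding any unsynchronized .locked() check."""

    def __init__(self):
        self._cond = threading.Condition()
        self._readers = 0
        self._writer_active = False

    def acquire_read(self):
        with self._cond:
            while self._writer_active:   # checked under the mutex
                self._cond.wait()
            self._readers += 1

    def release_read(self):
        with self._cond:
            self._readers -= 1
            if self._readers == 0:
                self._cond.notify_all()  # wake a waiting writer

    def acquire_write(self):
        with self._cond:
            while self._writer_active or self._readers > 0:
                self._cond.wait()
            self._writer_active = True

    def release_write(self):
        with self._cond:
            self._writer_active = False
            self._cond.notify_all()      # wake readers and writers alike
```

This variant is still readers-preferring; a writer-preferring version would additionally count waiting writers and make `acquire_read` wait while that count is nonzero.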
Completeness
Weight 20%
Implements all required methods and provides run_tests with single- and multi-threaded scenarios using 8 threads and invariant checks (capacity, never-inserted key). However, it lacks deeper structural integrity checks (list/map consistency) and uses internal fields in tests.
Code Quality
Weight 20%
Readable structure and comments, but the RWLock implementation is complex and arguably incorrect; the tests reach into private fields, and the `_size` attribute is not strictly necessary. Some explanations (readers-preferring vs. actual behavior) are inconsistent with the code.
Practical Value
Weight 15%
Works as a baseline concurrent cache, but contention performance is limited because all hits require write locking, and RWLock concerns reduce confidence for production use.
Instruction Following
Weight 10%
Meets interface requirements and includes docstrings/comments and an 8-thread mixed test. However, the prompt asks to avoid a global lock for every operation; in practice most meaningful operations serialize via the write lock, and the concurrency benefits are limited.
Overall Comments
Answer A provides a complete, runnable implementation with a custom ReadWriteLock and an LRU cache using a doubly linked list with sentinel nodes. The concurrency strategy uses a single RWLock for the entire cache, which is better than a global mutex but still limited. The get() method uses a read-then-write upgrade pattern with a subtle issue: between releasing the read lock and acquiring the write lock, the node could be deleted and the key re-inserted with a different node, causing get() to return None even though the key exists (a correctness concern). The ReadWriteLock implementation has two weaknesses: acquire_read checks `self._write_lock.locked()` without synchronization, and the readers-preferring design could starve writers. The tests are reasonable but could be more thorough. The code is well-documented, with clear docstrings explaining the concurrency strategy.
Correctness
Weight 35%
Answer A's get() has a correctness issue: the read-to-write lock upgrade pattern means that between releasing the read lock and acquiring the write lock, another thread could delete and re-insert the same key with a new node. The identity check (`current_node is node_found_under_read_lock`) would then fail, causing get() to return None even though the key exists with a valid value. The ReadWriteLock's `acquire_read` checks `_write_lock.locked()`, which is not fully reliable, and the readers-preferring design can starve writers.
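The spurious miss described above goes away if, after the upgrade, the cache re-checks by key rather than by node identity: a replaced node still yields the current value, and None means the key was genuinely deleted. The sketch below isolates just that decision; the `_MiniRW` stand-in lock, `KeyRecheckCache`, and the dict-only storage (LRU ordering omitted) are all assumptions for the sake of a runnable fragment, not the answer's code.

```python
import threading

class _MiniRW:
    """Minimal stand-in for the answer's ReadWriteLock so the pattern
    below runs in isolation (both paths share one mutex)."""
    def __init__(self):
        self._lock = threading.Lock()
    def read_locked(self):
        return self._lock
    def write_locked(self):
        return self._lock

class KeyRecheckCache:
    """After the read-to-write upgrade, re-check by *key*, not node
    identity, so a concurrent delete-and-reinsert is served the current
    value instead of being reported as a miss."""

    def __init__(self):
        self._rw_lock = _MiniRW()
        self._cache = {}  # key -> value; LRU list omitted for brevity

    def get(self, key):
        with self._rw_lock.read_locked():
            if key not in self._cache:
                return None              # miss decided under the read lock
        with self._rw_lock.write_locked():
            value = self._cache.get(key)  # may differ from the first look
            # None here means the key was truly deleted between the locks;
            # a replaced value is still returned (no spurious miss).
            return value
```

The trade-off is that a `get` may return a value written after its read-lock check; whether that is acceptable depends on the consistency semantics one expects from concurrent get/put on the same key.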
Completeness
Weight 20%
Answer A implements all required operations (get, put, delete, keys) with proper LRU semantics. It handles capacity of 1, updating existing keys, and deleting nonexistent keys. The tests cover single-threaded and multi-threaded scenarios with 8 threads. However, it lacks internal validation utilities and the multi-threaded test could be more thorough.
Code Quality
Weight 20%
Answer A has good documentation, with clear docstrings explaining the concurrency strategy and trade-offs. The code is well-structured with clean separation of concerns. The ReadWriteLock has context managers for clean usage, and `__slots__` on _Node improves memory efficiency. However, the `_size` tracking is manual and could drift if bugs exist, and the ReadWriteLock implementation has some questionable design choices (checking `locked()` state).
Practical Value
Weight 15%
Answer A provides a functional LRU cache, but the get() upgrade-pattern race reduces practical reliability. The readers-preferring RWLock could cause writer starvation in production. The concurrency benefit is limited since get hits still require a write lock, and the upgrade pattern adds overhead. It would need fixes before production use.
Instruction Following
Weight 10%
Answer A follows most instructions: it implements all required operations, uses a concurrency strategy beyond a single global lock, includes docstrings and comments, handles edge cases, and provides run_tests() with single-threaded and multi-threaded tests using 8 threads. However, the multi-threaded test assertions could be stronger, and the concurrency design explanation could better address trade-offs.