Orivel

Implement a Lock-Free Concurrent LRU Cache

Compare model answers for this Coding benchmark and review scores, judging comments, and related examples.



Contents

- Task Overview
- Benchmark Genres: Coding
- Task Creator Model
- Answering Models
- Judge Models

Task Prompt


Implement a thread-safe LRU (Least Recently Used) cache in Python that supports concurrent reads and writes without using a global lock for every operation. Your implementation must satisfy the following requirements:

1. **Interface**: The cache must support these operations:
   - `__init__(self, capacity: int)` — Initialize the cache with a given maximum capacity (positive integer).
   - `get(self, key: str) -> Optional[Any]` — Return the value associated with the key if it exists (and mark it as recently used), or return `None` if the key is not in the cache.
   - `put(self, key: str, value: Any) -> None` — Insert or update the key-value pair. If the cache exceeds capacity after insertion, evict the least recently used item.
   - `delete(self, key: str) -> bool` — Remove the key from the cache. Return `True` if the key was present, `False` otherwise.
   - `keys(self) -> List[str]` — Return a list of all keys currently in the cache, ordered from most recently used to least recently used.
2. **Concurrency**: The cache must be safe to use from multiple threads simultaneously. Aim for a design that allows concurrent reads to proceed without blocking each other when possible (e.g., using read-write locks, fine-grained locking, or lock-free techniques). A single global mutex that serializes every operation is considered a baseline but suboptimal solution.
3. **Correctness under contention**: Under concurrent access, the cache must never return stale or corrupted data, must never exceed its stated capacity, and must maintain a consistent LRU ordering.
4. **Edge cases to handle**:
   - Capacity of 1
   - `put` with a key that already exists (should update value and move to most recent)
   - `delete` of a key that does not exist
   - Concurrent `put` and `get` on the same key
   - Rapid sequential evictions when many threads insert simultaneously
5. **Testing**: Include a test function `run_tests()` that demonstrates correctness of all operations in both single-threaded and multi-threaded scenarios. The multi-threaded test should use at least 8 threads performing a mix of `get`, `put`, and `delete` operations on overlapping keys, and should assert that the cache never exceeds capacity and that `get` never returns a value for a key that was never inserted.

Provide your complete implementation in Python. Use only the standard library (no third-party packages). Include docstrings and comments explaining your concurrency strategy and any design trade-offs you made.
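For orientation, the "baseline but suboptimal" design the prompt refers to (a single mutex serializing every operation) can be sketched in a few lines with `collections.OrderedDict`. The class and method layout below is illustrative, not part of the task statement:

```python
import threading
from collections import OrderedDict
from typing import Any, List, Optional

class GlobalLockLRU:
    """Baseline sketch: one global mutex serializes every operation.
    MRU items are kept at the front of the OrderedDict."""

    def __init__(self, capacity: int):
        if capacity <= 0:
            raise ValueError("capacity must be positive")
        self._capacity = capacity
        self._data: "OrderedDict[str, Any]" = OrderedDict()
        self._lock = threading.Lock()  # every call funnels through here

    def get(self, key: str) -> Optional[Any]:
        with self._lock:
            if key not in self._data:
                return None
            self._data.move_to_end(key, last=False)  # promote to MRU
            return self._data[key]

    def put(self, key: str, value: Any) -> None:
        with self._lock:
            self._data[key] = value
            self._data.move_to_end(key, last=False)
            if len(self._data) > self._capacity:
                self._data.popitem(last=True)  # evict LRU from the back

    def delete(self, key: str) -> bool:
        with self._lock:
            return self._data.pop(key, None) is not None

    def keys(self) -> List[str]:
        with self._lock:
            return list(self._data)  # MRU -> LRU order
```

This is correct under contention but allows zero read concurrency, which is exactly why the prompt asks for something better.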

Judging Policy


A strong answer should provide a complete, runnable Python implementation that meets all five requirements. Key quality dimensions to evaluate:

- **Correctness**: All operations (get, put, delete, keys) must behave correctly per LRU semantics in both single-threaded and multi-threaded contexts. The cache must never exceed capacity and must never return corrupted data.
- **Concurrency design**: The solution should go beyond a single global lock. Higher-quality answers will use read-write locks, sharded locks, or other fine-grained concurrency strategies that allow concurrent reads. The answer should clearly explain the chosen strategy and its trade-offs.
- **Edge case handling**: The implementation should correctly handle capacity of 1, updating existing keys, deleting nonexistent keys, and concurrent operations on the same key.
- **Testing**: The `run_tests` function should include meaningful single-threaded correctness tests and a multi-threaded stress test with at least 8 threads, asserting invariants like capacity limits and data integrity.
- **Code quality**: The code should be well-structured, include docstrings and comments explaining the concurrency approach, and use only the Python standard library. Readability and maintainability matter.
- **Robustness**: The implementation should not deadlock, should handle exceptions gracefully, and should work correctly under high contention.

Partial or incomplete solutions, solutions with race conditions, or solutions that only use a trivial global lock should be rated lower.

Answers


```python
import threading
import random
import time
from typing import Any, Optional, List


class _Node:
    """
    Represents a node in the doubly linked list used for LRU ordering.
    Uses __slots__ for memory efficiency.
    """
    __slots__ = 'key', 'value', 'prev', 'next'

    def __init__(self, key: str, value: Any):
        self.key = key
        self.value = value
        self.prev = None
        self.next = None


class ReadWriteLock:
    """
    A custom Read-Write Lock implementation.

    This lock allows multiple readers to hold the lock concurrently, but only
    one writer at a time. A writer must wait for all readers to finish, and
    readers must wait for a writer to finish. This is a readers-preferring
    implementation, meaning new readers can acquire the read lock even if a
    writer is waiting.

    This class also provides context managers for cleaner lock acquisition
    and release.
    """

    def __init__(self):
        self._read_lock = threading.Lock()    # Protects _readers count and condition variable
        self._write_lock = threading.Lock()   # The actual write lock
        self._readers = 0                     # Number of active readers
        self._no_readers = threading.Condition(self._read_lock)  # Condition for writers to wait for readers

    def acquire_read(self):
        """Acquire a read lock. Multiple readers can hold this lock concurrently."""
        with self._read_lock:
            # Wait if a writer currently holds the write lock.
            # New readers will block if a writer is active.
            while self._write_lock.locked():
                self._no_readers.wait()
            self._readers += 1

    def release_read(self):
        """Release a read lock."""
        with self._read_lock:
            self._readers -= 1
            # If no readers left, notify any waiting writers
            if self._readers == 0:
                self._no_readers.notify_all()

    def acquire_write(self):
        """Acquire a write lock. Only one writer can hold this lock at a time."""
        # Acquire the main write lock first to signal intent to write
        self._write_lock.acquire()
        with self._read_lock:  # Then acquire read_lock to wait for active readers
            # Wait until all active readers have finished
            while self._readers > 0:
                self._no_readers.wait()

    def release_write(self):
        """Release a write lock."""
        with self._read_lock:  # Acquire read_lock to notify potential readers/writers
            self._no_readers.notify_all()  # Notify any waiting readers or writers
            self._write_lock.release()     # Release the main write lock

    class _ReadContextManager:
        def __init__(self, lock):
            self.lock = lock

        def __enter__(self):
            self.lock.acquire_read()

        def __exit__(self, exc_type, exc_val, exc_tb):
            self.lock.release_read()

    class _WriteContextManager:
        def __init__(self, lock):
            self.lock = lock

        def __enter__(self):
            self.lock.acquire_write()

        def __exit__(self, exc_type, exc_val, exc_tb):
            self.lock.release_write()

    def read_locked(self):
        """Returns a context manager for acquiring a read lock."""
        return self._ReadContextManager(self)

    def write_locked(self):
        """Returns a context manager for acquiring a write lock."""
        return self._WriteContextManager(self)


class LRUCache:
    """
    A thread-safe LRU (Least Recently Used) cache implementation.

    This cache uses a ReadWriteLock to manage concurrent access. The design
    aims to allow concurrent reads for cache misses and the `keys()`
    operation. However, operations that modify the cache state or LRU order
    (`put`, `delete`, and cache hits in `get`) acquire a write lock, ensuring
    atomicity and consistency for these critical sections. This approach
    provides better concurrency than a single global mutex for all
    operations, especially for read-heavy workloads with frequent cache
    misses or `keys()` calls.

    The `get` operation employs an "upgrade" pattern: it first attempts a
    read lock to check for key existence. If found, it releases the read lock
    and acquires a write lock to update the LRU order and retrieve the latest
    value. This pattern is carefully implemented to handle race conditions
    that might occur during the lock upgrade (e.g., key being deleted or
    updated by another thread).
    """

    def __init__(self, capacity: int):
        if capacity <= 0:
            raise ValueError("Capacity must be a positive integer.")
        self._capacity = capacity
        self._cache = {}  # Stores key -> _Node for O(1) lookup
        self._size = 0
        # Dummy head and tail nodes for the doubly linked list.
        # This simplifies edge cases for adding/removing nodes, as actual
        # data nodes are always between head and tail.
        self._head = _Node("dummy_head", None)
        self._tail = _Node("dummy_tail", None)
        self._head.next = self._tail
        self._tail.prev = self._head
        # Concurrency control: A single ReadWriteLock protects the entire cache state.
        self._rw_lock = ReadWriteLock()

    def _add_node(self, node: _Node):
        """Adds a node right after the head (most recently used position).
        Assumes write lock is held."""
        node.prev = self._head
        node.next = self._head.next
        self._head.next.prev = node
        self._head.next = node

    def _remove_node(self, node: _Node):
        """Removes a node from the doubly linked list. Assumes write lock is held."""
        prev_node = node.prev
        next_node = node.next
        prev_node.next = next_node
        next_node.prev = prev_node

    def _move_to_front(self, node: _Node):
        """Moves an existing node to the front of the list (most recently
        used). Assumes write lock is held."""
        self._remove_node(node)
        self._add_node(node)

    def _pop_tail(self) -> _Node:
        """Removes and returns the least recently used node (the one just
        before the tail). Assumes write lock is held."""
        node = self._tail.prev
        self._remove_node(node)
        return node

    def get(self, key: str) -> Optional[Any]:
        """
        Returns the value associated with the key if it exists, and marks it
        as recently used. Returns None if the key is not in the cache.

        This operation first acquires a read lock to check for key existence.
        If the key is found, it releases the read lock and then acquires a
        write lock to update the LRU order and retrieve the latest value.
        This "upgrade" pattern allows concurrent `get` misses and `keys()`
        operations, but serializes `get` hits with other writes.
        """
        node_found_under_read_lock = None
        with self._rw_lock.read_locked():
            node_found_under_read_lock = self._cache.get(key)
        if not node_found_under_read_lock:
            return None

        # If key found, acquire write lock to update LRU order and get the
        # latest value. This is an "upgrade" pattern. A race condition can
        # occur here: another thread might delete or update the key between
        # releasing the read lock and acquiring the write lock.
        with self._rw_lock.write_locked():
            # Re-check if the key still exists and refers to the exact same
            # node object. This is crucial for correctness under contention.
            # If another thread performed a `put` for the same key,
            # `_cache.get(key)` would return a *new* node. If another thread
            # performed a `delete`, it would return `None`.
            current_node = self._cache.get(key)
            if current_node is node_found_under_read_lock:  # Ensure it's the exact same node object
                self._move_to_front(node_found_under_read_lock)
                value = current_node.value  # Get the latest value after moving
                return value
            else:
                # The node was either deleted or replaced by another 'put'
                # operation between our read-lock and write-lock acquisition.
                # Treat as a miss.
                return None

    def put(self, key: str, value: Any) -> None:
        """
        Inserts or updates the key-value pair. If the cache exceeds capacity
        after insertion, evicts the least recently used item.

        This is a write operation and acquires a write lock.
        """
        with self._rw_lock.write_locked():
            node = self._cache.get(key)
            if node:
                # Key exists: update value and move to front
                node.value = value
                self._move_to_front(node)
            else:
                # Key does not exist: add new node
                new_node = _Node(key, value)
                self._cache[key] = new_node
                self._add_node(new_node)
                self._size += 1
                # Evict if capacity exceeded
                if self._size > self._capacity:
                    lru_node = self._pop_tail()
                    del self._cache[lru_node.key]
                    self._size -= 1

    def delete(self, key: str) -> bool:
        """
        Removes the key from the cache. Returns True if the key was present,
        False otherwise.

        This is a write operation and acquires a write lock.
        """
        with self._rw_lock.write_locked():
            node = self._cache.get(key)
            if node:
                self._remove_node(node)
                del self._cache[key]
                self._size -= 1
                return True
            return False

    def keys(self) -> List[str]:
        """
        Returns a list of all keys currently in the cache, ordered from most
        recently used to least recently used.

        This is a read operation and acquires a read lock.
        """
        with self._rw_lock.read_locked():
            keys_list = []
            current = self._head.next
            while current is not self._tail:
                keys_list.append(current.key)
                current = current.next
            return keys_list


def run_tests():
    """
    Runs a series of single-threaded and multi-threaded tests to verify the
    LRUCache implementation.
    """
    print("--- Starting LRU Cache Tests ---")

    # --- Single-threaded Tests ---
    print("\n--- Single-threaded Tests ---")
    cache = LRUCache(capacity=3)
    assert cache.get("a") is None, "ST: Get non-existent key should return None"
    assert cache.keys() == [], "ST: Empty cache should have empty keys list"

    cache.put("a", 1)
    assert cache.get("a") == 1, "ST: Get existing key after put"
    assert cache.keys() == ["a"], "ST: Keys list after one put"

    cache.put("b", 2)
    cache.put("c", 3)
    assert cache.keys() == ["c", "b", "a"], "ST: Keys list after multiple puts"
    assert cache._size == 3, "ST: Cache size should be 3"

    # Test capacity eviction
    cache.put("d", 4)  # 'a' should be evicted as it's LRU
    assert cache.get("a") is None, "ST: Evicted key 'a' should be None"
    assert cache.get("d") == 4, "ST: New key 'd' should be present"
    assert cache.keys() == ["d", "c", "b"], "ST: Keys list after eviction"
    assert cache._size == 3, "ST: Cache size should remain 3 after eviction"

    # Test put with existing key (update value and move to front)
    cache.put("b", 22)  # 'b' should move to front, value updated
    assert cache.get("b") == 22, "ST: Updated value for 'b'"
    assert cache.keys() == ["b", "d", "c"], "ST: Keys list after update and move 'b'"

    # Test delete
    assert cache.delete("c") is True, "ST: Delete existing key 'c'"
    assert cache.get("c") is None, "ST: Deleted key 'c' should be None"
    assert cache.keys() == ["b", "d"], "ST: Keys list after deleting 'c'"
    assert cache._size == 2, "ST: Cache size after delete"
    assert cache.delete("z") is False, "ST: Delete non-existent key 'z'"
    assert cache.keys() == ["b", "d"], "ST: Keys list unchanged after deleting non-existent key"

    # Test capacity 1
    cache_cap1 = LRUCache(capacity=1)
    cache_cap1.put("x", 10)
    assert cache_cap1.get("x") == 10, "ST: Capacity 1, get 'x'"
    cache_cap1.put("y", 20)  # 'x' should be evicted
    assert cache_cap1.get("x") is None, "ST: Capacity 1, 'x' evicted"
    assert cache_cap1.get("y") == 20, "ST: Capacity 1, get 'y'"
    assert cache_cap1.keys() == ["y"], "ST: Capacity 1, keys list"
    print("Single-threaded tests passed!")

    # --- Multi-threaded Tests ---
    print("\n--- Multi-threaded Tests ---")
    MT_CAPACITY = 10
    MT_NUM_THREADS = 8
    MT_OPS_PER_THREAD = 2000
    MT_KEY_RANGE = 20  # Keys from 0 to 19 (e.g., "0", "1", ..., "19")

    mt_cache = LRUCache(capacity=MT_CAPACITY)
    # Barrier to ensure all worker threads start their operations simultaneously
    barrier = threading.Barrier(MT_NUM_THREADS + 1)  # +1 for the main thread
    error_flag = threading.Event()  # Set if any thread encounters an error
    # Tracks keys that have *ever* been successfully put into the cache by
    # any thread. Used to verify that 'get' never returns a value for a key
    # that was truly never inserted.
    global_ever_inserted_keys = set()
    global_ever_inserted_keys_lock = threading.Lock()

    def mt_worker(cache_instance, thread_id, barrier_instance, error_event,
                  ever_inserted_keys_set, ever_inserted_keys_lock):
        """Worker function for multi-threaded tests."""
        barrier_instance.wait()  # Wait for all threads to be ready to start operations
        for i in range(MT_OPS_PER_THREAD):
            if error_event.is_set():
                break  # Stop if another thread already found an error
            # Randomly choose an operation with a bias towards 'get' and 'put'
            op = random.choices(['get', 'put', 'delete'], weights=[0.5, 0.4, 0.1], k=1)[0]
            key = str(random.randint(0, MT_KEY_RANGE - 1))
            value = f"value_T{thread_id}_K{key}_I{i}"
            if op == 'put':
                cache_instance.put(key, value)
                with ever_inserted_keys_lock:
                    ever_inserted_keys_set.add(key)
            elif op == 'get':
                val = cache_instance.get(key)
                if val is not None:
                    # Assert that if a value is returned, the key must have
                    # been inserted at some point.
                    with ever_inserted_keys_lock:
                        if key not in ever_inserted_keys_set:
                            print(f"MT Error: Thread {thread_id} got value '{val}' for key '{key}' "
                                  f"which was never inserted globally. This should not happen.")
                            error_event.set()
                            break
                    # Assert that the value is not corrupted (simple format check)
                    if not isinstance(val, str) or not val.startswith("value_T"):
                        print(f"MT Error: Thread {thread_id} got corrupted value '{val}' for key '{key}'.")
                        error_event.set()
                        break
            elif op == 'delete':
                cache_instance.delete(key)
            # Optional: Introduce a small random sleep to increase context
            # switching and contention
            # time.sleep(random.uniform(0.00001, 0.0001))
        # print(f"Thread {thread_id} finished.")  # Uncomment for verbose output

    threads = []
    for i in range(MT_NUM_THREADS):
        thread = threading.Thread(target=mt_worker,
                                  args=(mt_cache, i, barrier, error_flag,
                                        global_ever_inserted_keys,
                                        global_ever_inserted_keys_lock))
        threads.append(thread)
        thread.start()

    # Wait for all worker threads to reach the barrier and start operations
    barrier.wait()
    print(f"All {MT_NUM_THREADS} threads started operations.")

    # Wait for all worker threads to complete their operations
    for thread in threads:
        thread.join()
    print("All threads completed.")

    # Final assertions for multi-threaded tests
    assert not error_flag.is_set(), "MT: Errors detected during multi-threaded operations. Check console for details."
    assert mt_cache._size <= MT_CAPACITY, f"MT: Cache size ({mt_cache._size}) exceeded capacity ({MT_CAPACITY})."

    # Check that all keys remaining in the cache were at some point inserted
    final_keys = mt_cache.keys()
    with global_ever_inserted_keys_lock:
        for key in final_keys:
            assert key in global_ever_inserted_keys, \
                f"MT Error: Final cache contains key '{key}' that was never globally inserted."
    print("Multi-threaded tests passed!")

    print("\n--- All LRU Cache Tests Passed! ---")


if __name__ == '__main__':
    run_tests()
```

Result: #2

Winning Votes: 0 / 3
Average Score: 68
Total Score: 82

Overall Comments

Answer A provides a correct and well-structured implementation of a concurrent LRU cache. It uses a custom ReadWriteLock and a clever "lock upgrade" pattern in the `get` method to handle race conditions. The code is clean, well-commented, and includes a comprehensive test suite. However, its primary weakness lies in its concurrency model. Because a `get` hit requires acquiring a full write lock to update the LRU order, the cache's performance is severely bottlenecked under read-heavy workloads with a warm cache, making its practical advantage over a simple global lock marginal.


Correctness (weight 35%): 80

The implementation is correct, and the handling of the race condition in the `get` method's lock-upgrade pattern is sound. However, the choice of a readers-preferring lock could lead to writer starvation, and the overall concurrency model has inherent performance issues that border on a design flaw for this specific problem.
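The writer-starvation concern can be addressed with a writer-preferred lock: once a writer is waiting, new readers block until it has run. The sketch below is a standard condition-variable formulation of that idea (the second answer on this page uses the same approach); the class name is illustrative:

```python
import threading

class WriterPreferredRWLock:
    """Sketch of a writer-preferred readers-writer lock: once a writer
    is waiting, new readers block, so writers cannot starve."""

    def __init__(self):
        self._cond = threading.Condition()
        self._readers = 0
        self._writer_active = False
        self._writers_waiting = 0

    def acquire_read(self):
        with self._cond:
            # New readers yield to any active or waiting writer.
            while self._writer_active or self._writers_waiting:
                self._cond.wait()
            self._readers += 1

    def release_read(self):
        with self._cond:
            self._readers -= 1
            if self._readers == 0:
                self._cond.notify_all()  # wake a waiting writer

    def acquire_write(self):
        with self._cond:
            self._writers_waiting += 1
            while self._writer_active or self._readers:
                self._cond.wait()
            self._writers_waiting -= 1
            self._writer_active = True

    def release_write(self):
        with self._cond:
            self._writer_active = False
            self._cond.notify_all()  # wake readers and writers alike
```

The trade-off is symmetric: a steady stream of writers can now starve readers, which is why some designs alternate between reader and writer phases.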

Completeness (weight 20%): 90

The answer is fully complete. It implements all required methods of the interface and provides a `run_tests` function with solid single-threaded and multi-threaded test cases that meet the prompt's requirements.

Code Quality (weight 20%): 85

The code quality is very good. The code is well-structured, readable, and includes clear docstrings and comments explaining its concurrency strategy. The use of `__slots__` in the `_Node` class is a nice optimization.

Practical Value (weight 15%): 60

The practical value is limited by the concurrency model. Since `get` hits (the most common operation on a warm cache) require a full write lock, the implementation serializes most operations and offers little performance benefit over a simple global lock in many real-world, read-heavy scenarios.
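The sharded-lock strategy that the judging policy mentions avoids this bottleneck by splitting keys across independently locked segments, so a `get` hit only contends with operations on the same shard. A minimal sketch, with illustrative names and with `delete`/`keys` omitted for brevity (each shard is a lock-protected OrderedDict, so recency and eviction are per-shard rather than global, which is the usual trade-off):

```python
import threading
from collections import OrderedDict
from typing import Any, Optional

class ShardedLRU:
    """Illustrative sketch: N independently locked LRU shards.
    Trade-off: recency and eviction are per-shard, so the global
    LRU order is only approximate."""

    def __init__(self, capacity: int, shards: int = 8):
        self._per_shard = max(1, capacity // shards)
        self._shards = [OrderedDict() for _ in range(shards)]
        self._locks = [threading.Lock() for _ in range(shards)]

    def _idx(self, key: str) -> int:
        # Stable within a process; hash() of str varies across runs.
        return hash(key) % len(self._shards)

    def get(self, key: str) -> Optional[Any]:
        i = self._idx(key)
        with self._locks[i]:  # contention is confined to one shard
            shard = self._shards[i]
            if key not in shard:
                return None
            shard.move_to_end(key)  # mark most recently used
            return shard[key]

    def put(self, key: str, value: Any) -> None:
        i = self._idx(key)
        with self._locks[i]:
            shard = self._shards[i]
            shard[key] = value
            shard.move_to_end(key)
            if len(shard) > self._per_shard:
                shard.popitem(last=False)  # evict shard-local LRU
```

Note that per-shard capacity means the task's strict global LRU ordering for `keys()` would need extra work (e.g., timestamps merged across shards), which is why sharding is usually paired with relaxed ordering guarantees.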

Instruction Following (weight 10%): 100

The answer perfectly follows all instructions in the prompt, including the interface, concurrency requirements, edge cases, testing specifications, and standard library constraint.

Judge Model: OpenAI GPT-5.2

Total Score: 62

Overall Comments

Provides a runnable LRU cache with a custom RW lock and a doubly linked list. Basic single-threaded behavior and capacity handling look correct. However, the concurrency strategy is effectively a single cache-wide writer lock for all hits, puts, and deletes, and the RWLock implementation is flawed: readers check `write_lock.locked()` without holding it, and writers can acquire the write lock while new readers are still allowed, potentially violating RW semantics. The `get` “upgrade” path can incorrectly return None even if the key exists (replaced node) and can miss values under contention; while arguably avoiding stale data, it sacrifices correctness expectations for `get` under concurrent put on the same key. Tests are decent but rely on internal `_size` and don’t validate list/map integrity under contention.


Correctness (weight 35%): 55

LRU logic is mostly correct in single-threaded use, but the RWLock implementation is not a sound readers-writer lock (reader gating based on `write_lock.locked()`/condition usage is race-prone). Under contention, `get` may return None for an existing key if it was updated (node replaced) between locks, which is an avoidable semantic degradation and can violate expected behavior for concurrent put/get on same key.

Completeness (weight 20%): 70

Implements all required methods and provides run_tests with single- and multi-threaded scenarios using 8 threads and invariant checks (capacity, never-inserted key). However, it lacks deeper structural integrity checks (list/map consistency) and uses internal fields in tests.

Code Quality (weight 20%): 65

Readable structure and comments, but RWLock implementation is complex and arguably incorrect; uses private fields in tests and exposes `_size` that is not strictly necessary. Some explanations (readers-preferring vs actual behavior) are inconsistent with code.

Practical Value (weight 15%): 60

Works as a baseline concurrent cache, but contention performance is limited because all hits require write locking and RWLock concerns reduce confidence for production use.

Instruction Following (weight 10%): 70

Meets interface requirements and includes docstrings/comments and an 8-thread mixed test. However, the prompt asks to avoid a global lock for every operation; in practice most meaningful operations serialize via the write lock, and concurrency benefits are limited.

Total Score: 60

Overall Comments

Answer A provides a complete, runnable implementation with a custom ReadWriteLock and LRU cache using a doubly linked list with sentinel nodes. The concurrency strategy uses a single RWLock for the entire cache, which is better than a global mutex but still limited. The get() method uses a read-then-write upgrade pattern, which has a subtle issue: between releasing the read lock and acquiring the write lock, the node could be deleted and the key re-inserted with a different node, causing the get to return None even though the key exists (a correctness concern). The ReadWriteLock implementation has a potential issue: it checks `self._write_lock.locked()` inside acquire_read, but this is a readers-preferring lock that could starve writers. The tests are reasonable but could be more thorough. The code is well-documented with clear docstrings explaining the concurrency strategy.


Correctness (weight 35%): 55

Answer A's get() has a correctness issue: the read-to-write lock upgrade pattern means that between releasing the read lock and acquiring the write lock, another thread could delete and re-insert the same key with a new node. The identity check (current_node is node_found_under_read_lock) would then fail, causing get() to return None even though the key exists with a valid value. The ReadWriteLock's acquire_read checks _write_lock.locked() which is not fully reliable. The readers-preferring design can starve writers.
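One way to repair the spurious miss the judge describes, shown here as an illustrative fragment rather than code from either answer, is to act on whatever node the map currently holds after the write lock is reacquired, instead of requiring object identity:

    with self._rw_lock.write_locked():
        current_node = self._cache.get(key)
        if current_node is None:
            return None          # genuinely deleted: a real miss
        # Even if another put replaced the node, the key exists now,
        # so treat it as a hit and promote the current node.
        self._move_to_front(current_node)
        return current_node.value

This still serializes hits behind the write lock, but it removes the window in which `get` returns `None` for a key that is present.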

Completeness (weight 20%): 65

Answer A implements all required operations (get, put, delete, keys) with proper LRU semantics. It handles capacity of 1, updating existing keys, and deleting nonexistent keys. The tests cover single-threaded and multi-threaded scenarios with 8 threads. However, it lacks internal validation utilities and the multi-threaded test could be more thorough.

Code Quality (weight 20%): 65

Answer A has good documentation with clear docstrings explaining the concurrency strategy and trade-offs. The code is well-structured with clean separation of concerns. The ReadWriteLock has context managers for clean usage. Uses __slots__ on _Node for memory efficiency. However, the _size tracking is manual and could drift if bugs exist, and the ReadWriteLock implementation has some questionable design choices (checking locked() state).

Practical Value (weight 15%): 55

Answer A provides a functional LRU cache but the get() upgrade pattern race condition reduces practical reliability. The readers-preferring RWLock could cause writer starvation in production. The concurrency benefit is limited since get hits still require a write lock, and the upgrade pattern adds overhead. Would need fixes before production use.

Instruction Following (weight 10%): 65

Answer A follows most instructions: implements all required operations, uses a concurrency strategy beyond a single global lock, includes docstrings and comments, handles edge cases, and provides run_tests() with single-threaded and multi-threaded tests using 8 threads. However, the multi-threaded test assertions could be stronger, and the concurrency design explanation could better address trade-offs.


```python
from __future__ import annotations

import random
import threading
import time
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


class RWLock:
    """
    A simple writer-preferred readers-writer lock.

    Concurrency strategy:
    - Multiple readers may hold the lock concurrently.
    - Writers are exclusive.
    - Once a writer is waiting, new readers are blocked so writers are not starved.

    This is used to protect the dictionary that maps keys to nodes. Reads
    that only need membership/value lookup can proceed concurrently.
    Operations that mutate the index or the LRU list take the write lock.

    Note: Python does not provide true lock-free primitives for building a
    fully lock-free LRU in the standard library. This implementation instead
    avoids a single global mutex for all access by using:
    - a readers-writer lock for the key->node index
    - a separate lock for the doubly linked LRU list
    - per-node locks for node-local state
    That gives substantially better concurrency than a single global lock
    while preserving correctness under contention.
    """

    def __init__(self) -> None:
        self._cond = threading.Condition(threading.Lock())
        self._readers = 0
        self._writer = False
        self._writers_waiting = 0

    def acquire_read(self) -> None:
        with self._cond:
            while self._writer or self._writers_waiting > 0:
                self._cond.wait()
            self._readers += 1

    def release_read(self) -> None:
        with self._cond:
            self._readers -= 1
            if self._readers == 0:
                self._cond.notify_all()

    def acquire_write(self) -> None:
        with self._cond:
            self._writers_waiting += 1
            try:
                while self._writer or self._readers > 0:
                    self._cond.wait()
                self._writer = True
            finally:
                self._writers_waiting -= 1

    def release_write(self) -> None:
        with self._cond:
            self._writer = False
            self._cond.notify_all()


@dataclass
class _Node:
    key: str
    value: Any
    prev: Optional["_Node"] = None
    next: Optional["_Node"] = None
    deleted: bool = False
    lock: threading.RLock = field(default_factory=threading.RLock)


class ConcurrentLRUCache:
    """
    Thread-safe LRU cache with fine-grained synchronization.

    Design overview
    ---------------
    Data structures:
    - self._map: dict[str, _Node]
      Maps keys to nodes. Protected by self._index_lock (RWLock).
    - Doubly linked list with sentinels self._head (MRU side) and
      self._tail (LRU side). Protected by self._list_lock.

    Locking rules
    -------------
    1. Dictionary/index reads use self._index_lock.acquire_read().
    2. Dictionary/index mutations use self._index_lock.acquire_write().
    3. Any pointer changes in the linked list use self._list_lock.
    4. Per-node locks serialize access to a node's value/deleted state.

    Ordering rule to avoid deadlock
    -------------------------------
    When multiple locks are needed, operations follow a stable order:
    index lock -> list lock -> node lock(s)

    Concurrency trade-offs
    ----------------------
    - get() needs to update recency, so it cannot be a pure read. It first
      performs a concurrent index lookup under a read lock, then briefly
      takes the list lock to move the located node to the MRU position.
    - put(), delete(), and eviction modify both index and list, so they take
      the index write lock and the list lock.
    - This is not strictly lock-free; Python's standard library does not
      make a practical lock-free linked-list LRU feasible. It is, however,
      more concurrent than a design using one global mutex for every
      operation.
    """

    def __init__(self, capacity: int):
        """Initialize the cache with a positive maximum capacity."""
        if not isinstance(capacity, int) or capacity <= 0:
            raise ValueError("capacity must be a positive integer")
        self._capacity = capacity
        self._map: Dict[str, _Node] = {}
        self._index_lock = RWLock()
        self._list_lock = threading.RLock()
        self._head = _Node("__HEAD__", None)
        self._tail = _Node("__TAIL__", None)
        self._head.next = self._tail
        self._tail.prev = self._head

    def _insert_after_head_unlocked(self, node: _Node) -> None:
        """Insert node immediately after head. Caller must hold self._list_lock."""
        first = self._head.next
        node.prev = self._head
        node.next = first
        self._head.next = node
        if first is not None:
            first.prev = node

    def _detach_unlocked(self, node: _Node) -> None:
        """Detach node from list. Caller must hold self._list_lock."""
        prev_node = node.prev
        next_node = node.next
        if prev_node is not None:
            prev_node.next = next_node
        if next_node is not None:
            next_node.prev = prev_node
        node.prev = None
        node.next = None

    def _move_to_front_if_present(self, node: _Node) -> None:
        """
        Move an existing node to MRU position if it is still live.

        Uses the list lock plus the node lock. The map read lock held by the
        caller ensures the node object remains reachable from the index
        during lookup.
        """
        with self._list_lock:
            with node.lock:
                if node.deleted:
                    return
                if node.prev is self._head:
                    return
                if node.prev is not None and node.next is not None:
                    self._detach_unlocked(node)
                    self._insert_after_head_unlocked(node)

    def get(self, key: str) -> Optional[Any]:
        """
        Return the value for key if present and mark it most recently used.
        Return None if absent.
        """
        self._index_lock.acquire_read()
        try:
            node = self._map.get(key)
            if node is None:
                return None
            with node.lock:
                if node.deleted:
                    return None
                value = node.value
            self._move_to_front_if_present(node)
            return value
        finally:
            self._index_lock.release_read()

    def put(self, key: str, value: Any) -> None:
        """
        Insert or update a key-value pair. If capacity is exceeded, evict
        the least recently used item.
        """
        self._index_lock.acquire_write()
        try:
            with self._list_lock:
                existing = self._map.get(key)
                if existing is not None:
                    with existing.lock:
                        if not existing.deleted:
                            existing.value = value
                            if existing.prev is not self._head:
                                if existing.prev is not None and existing.next is not None:
                                    self._detach_unlocked(existing)
                                self._insert_after_head_unlocked(existing)
                            return
                        else:
                            self._map.pop(key, None)
                node = _Node(key, value)
                self._map[key] = node
                self._insert_after_head_unlocked(node)
                while len(self._map) > self._capacity:
                    victim = self._tail.prev
                    if victim is None or victim is self._head:
                        break
                    with victim.lock:
                        if victim.deleted:
                            if victim.prev is not None and victim.next is not None:
                                self._detach_unlocked(victim)
                            self._map.pop(victim.key, None)
                            continue
                        victim.deleted = True
                        self._detach_unlocked(victim)
                        self._map.pop(victim.key, None)
        finally:
            self._index_lock.release_write()

    def delete(self, key: str) -> bool:
        """
        Remove key from the cache. Return True if key existed, otherwise False.
        """
        self._index_lock.acquire_write()
        try:
            with self._list_lock:
                node = self._map.pop(key, None)
                if node is None:
                    return False
                with node.lock:
                    if node.deleted:
                        return False
                    node.deleted = True
                    if node.prev is not None and node.next is not None:
                        self._detach_unlocked(node)
                    return True
        finally:
            self._index_lock.release_write()

    def keys(self) -> List[str]:
        """
        Return keys from most recently used to least recently used.

        We take the index read lock and list lock to obtain a consistent
        snapshot.
        """
        result: List[str] = []
        self._index_lock.acquire_read()
        try:
            with self._list_lock:
                cur = self._head.next
                while cur is not None and cur is not self._tail:
                    with cur.lock:
                        if not cur.deleted:
                            result.append(cur.key)
                    cur = cur.next
            return result
        finally:
            self._index_lock.release_read()

    def _debug_size(self) -> int:
        """Internal helper for tests."""
        self._index_lock.acquire_read()
        try:
            return len(self._map)
        finally:
            self._index_lock.release_read()

    def _debug_validate(self) -> None:
        """
        Internal consistency checker for tests.

        Verifies:
        - list links are well-formed
        - every live list node is present in the map
        - no duplicate keys in the list
        - size never exceeds capacity
        """
        self._index_lock.acquire_read()
        try:
            with self._list_lock:
                assert len(self._map) <= self._capacity, "cache exceeded capacity"
                seen = set()
                cur = self._head
                assert self._head.prev is None
                assert self._tail.next is None
                while cur is not None:
                    nxt = cur.next
                    if nxt is not None:
                        assert nxt.prev is cur, "broken backward pointer"
                    if cur not in (self._head, self._tail):
                        with cur.lock:
                            assert not cur.deleted, "deleted node remained in list"
                            assert cur.key in self._map, "live list node missing from map"
                            assert self._map[cur.key] is cur, "map points to wrong node"
                            assert cur.key not in seen, "duplicate key in list"
                            seen.add(cur.key)
                    if cur is self._tail:
                        break
                    cur = nxt
                assert seen == set(self._map.keys()), "map/list key mismatch"
        finally:
            self._index_lock.release_read()


def run_tests() -> None:
    """
    Run single-threaded and multi-threaded tests.
```
Multi-threaded test requirements covered: - at least 8 threads - mix of get/put/delete on overlapping keys - assert cache never exceeds capacity - assert get never returns a value for a key that was never inserted """ # Single-threaded tests c = ConcurrentLRUCache(2) assert c.get("missing") is None c.put("a", 1) c.put("b", 2) assert c.keys() == ["b", "a"] assert c.get("a") == 1 assert c.keys() == ["a", "b"] c.put("c", 3) assert c.get("b") is None assert c.keys() == ["c", "a"] c.put("a", 10) assert c.get("a") == 10 assert c.keys()[0] == "a" assert c.delete("a") is True assert c.delete("a") is False assert c.get("a") is None assert c.keys() == ["c"] c1 = ConcurrentLRUCache(1) c1.put("x", 1) assert c1.keys() == ["x"] c1.put("y", 2) assert c1.get("x") is None assert c1.get("y") == 2 assert c1.keys() == ["y"] c1.put("y", 3) assert c1.get("y") == 3 assert c1.delete("z") is False c1._debug_validate() # Concurrent tests capacity = 16 cache = ConcurrentLRUCache(capacity) inserted_ever = set() inserted_lock = threading.Lock() errors: List[str] = [] errors_lock = threading.Lock() stop_flag = threading.Event() key_space = [f"k{i}" for i in range(24)] def record_error(msg: str) -> None: with errors_lock: errors.append(msg) def worker(tid: int) -> None: rnd = random.Random(1000 + tid) try: for i in range(2000): key = key_space[rnd.randrange(len(key_space))] op = rnd.random() if op < 0.45: value = (tid, i) cache.put(key, value) with inserted_lock: inserted_ever.add(key) elif op < 0.80: value = cache.get(key) if value is not None: with inserted_lock: if key not in inserted_ever: record_error(f"get returned value for never-inserted key: {key}") else: cache.delete(key) if i % 100 == 0: size = cache._debug_size() if size > capacity: record_error(f"capacity exceeded: {size} > {capacity}") cache._debug_validate() except Exception as exc: record_error(f"worker {tid} failed: {exc!r}") finally: if stop_flag.is_set(): return def validator() -> None: try: for _ in range(400): 
cache._debug_validate() size = cache._debug_size() if size > capacity: record_error(f"capacity exceeded in validator: {size} > {capacity}") time.sleep(0.002) except Exception as exc: record_error(f"validator failed: {exc!r}") finally: stop_flag.set() threads = [threading.Thread(target=worker, args=(i,)) for i in range(8)] threads += [threading.Thread(target=validator)] for t in threads: t.start() for t in threads: t.join() cache._debug_validate() assert cache._debug_size() <= capacity assert not errors, "\n".join(errors) # Focused same-key contention test hot = ConcurrentLRUCache(4) hot.put("shared", 0) def same_key_worker(tid: int) -> None: rnd = random.Random(2000 + tid) for i in range(1000): if rnd.random() < 0.5: hot.put("shared", (tid, i)) else: _ = hot.get("shared") if rnd.random() < 0.1: hot.delete("shared") hot.put("shared", (tid, i, "reinsert")) ts = [threading.Thread(target=same_key_worker, args=(i,)) for i in range(8)] for t in ts: t.start() for t in ts: t.join() hot._debug_validate() assert hot._debug_size() <= 4 assert hot.get("shared") is not None print("All tests passed.") if __name__ == "__main__": run_tests()

Result

#1 | Winner

Winning Votes

3 / 3

Average Score

86

Total Score

98

Overall Comments

Answer B presents an exceptionally well-engineered solution with a sophisticated, fine-grained locking strategy. It uses three levels of locks (a ReadWriteLock for the index, a mutex for the list, and per-node locks) to maximize concurrency, which is a much more scalable and practical design. The code quality is outstanding, with extremely detailed docstrings that explain the complex design and deadlock avoidance strategy. The testing is also more rigorous, including a dedicated validator thread and a focused contention test. The solution demonstrates a deep understanding of concurrent programming principles.


Correctness

Weight 35%
95

The implementation appears to be highly correct and robust. The complex, fine-grained locking strategy is carefully designed with a strict lock acquisition order to prevent deadlocks. The use of a `deleted` flag on nodes is an effective way to handle race conditions between the index and the list. The included `_debug_validate` function further increases confidence in its correctness.
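The `deleted`-flag pattern this comment praises can be shown in miniature. The sketch below is illustrative only (the names `Node`, `read`, and `evict` are hypothetical, not taken from the answer): a thread that looked a node up earlier must re-check the flag after acquiring the node's lock, because an evictor may have removed the node in between.

```python
import threading

class Node:
    def __init__(self, value):
        self.value = value
        self.deleted = False        # set exactly once, by the evictor
        self.lock = threading.Lock()

index = {"k": Node(42)}

def read(key):
    node = index.get(key)           # lookup may race with eviction
    if node is None:
        return None
    with node.lock:
        if node.deleted:            # re-validate under the node lock
            return None
        return node.value

def evict(key):
    node = index.pop(key, None)
    if node is not None:
        with node.lock:
            node.deleted = True     # readers holding a stale reference see this

assert read("k") == 42
evict("k")
assert read("k") is None            # stale reference cannot resurrect the entry
```

The flag converts the "looked up, then evicted" race into a clean miss instead of returning a value for a key that no longer exists.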

Completeness

Weight 20%
100

The answer is exceptionally complete. It implements the full required interface and provides an extremely thorough test suite that goes beyond the prompt's requirements by including a dedicated validator thread and a focused test for same-key contention, demonstrating a commitment to robustness.

Code Quality

Weight 20%
100

The code quality is outstanding. The docstrings are exceptionally detailed, explaining not just what the code does but also the complex concurrency design, trade-offs, and the critical lock-ordering rules to prevent deadlocks. The inclusion of a `_debug_validate` method is a hallmark of high-quality, defensive programming.

Practical Value

Weight 15%
95

The solution has very high practical value. Its fine-grained locking design is highly scalable and would perform well under high contention in a real-world application. This is the kind of robust, high-performance design one would expect to find in a production-grade concurrency library.

Instruction Following

Weight 10%
100

The answer perfectly follows all instructions in the prompt, including the interface, concurrency requirements, edge cases, testing specifications, and standard library constraint.

Judge Models OpenAI GPT-5.2

Total Score

87

Overall Comments

Delivers a more carefully engineered concurrent LRU with a writer-preferred RWLock for the index plus a separate list lock and per-node locks, with an explicit lock ordering to reduce deadlock risk. Operations maintain LRU order consistently, keep size within capacity, and handle concurrent update/delete via a `deleted` flag. Includes substantial testing: single-threaded checks, an 8-thread mixed workload plus a separate validator thread performing invariant checks, and a focused same-key contention test. Code is well-structured, documented, and includes internal validation helpers to catch corruption.
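The writer-preference behavior described here can be demonstrated with a condensed stand-in (a `TinyRWLock` sketch with the same structure as the answer's `RWLock`, not its code verbatim): once a writer is waiting, newly arriving readers block, so a stream of readers cannot starve the writer.

```python
import threading
import time

class TinyRWLock:
    def __init__(self):
        self._cond = threading.Condition()
        self._readers = 0
        self._writer = False
        self._writers_waiting = 0

    def acquire_read(self):
        with self._cond:
            # New readers wait while a writer holds OR is merely waiting.
            while self._writer or self._writers_waiting:
                self._cond.wait()
            self._readers += 1

    def release_read(self):
        with self._cond:
            self._readers -= 1
            if self._readers == 0:
                self._cond.notify_all()

    def acquire_write(self):
        with self._cond:
            self._writers_waiting += 1
            try:
                while self._writer or self._readers:
                    self._cond.wait()
                self._writer = True
            finally:
                self._writers_waiting -= 1

    def release_write(self):
        with self._cond:
            self._writer = False
            self._cond.notify_all()

events = []
lock = TinyRWLock()
lock.acquire_read()                      # reader 1 holds the lock

def writer():
    lock.acquire_write()                 # blocks: reader 1 is active
    events.append("writer")
    lock.release_write()

def late_reader():
    lock.acquire_read()                  # blocks: a writer is already waiting
    events.append("late_reader")
    lock.release_read()

w = threading.Thread(target=writer); w.start()
time.sleep(0.1)                          # let the writer register as waiting
r = threading.Thread(target=late_reader); r.start()
time.sleep(0.1)                          # the late reader must not jump the queue
lock.release_read()                      # reader 1 leaves; the writer runs first
w.join(); r.join()
print(events)
```

With a readers-preferring lock, the late reader would slip in ahead and `events` could begin with `"late_reader"`; the waiting-writer check closes that door.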


Correctness

Weight 35%
85

Maintains coherent map+list invariants with explicit deleted-state handling, stable lock ordering, and eviction loop guarded by index write + list lock. `get/put/delete/keys` behave consistently and `_debug_validate` provides additional assurance against corruption and capacity overflow under contention.

Completeness

Weight 20%
90

Implements the full interface plus helpful debug helpers. run_tests is comprehensive: single-threaded coverage, 8-thread mixed ops, continuous validator thread, capacity and never-inserted checks, and a same-key contention test.

Code Quality

Weight 20%
85

Clear documentation of strategy, lock ordering, and trade-offs; good separation of concerns (index/list/node). Internal validation routines are well-designed and aid maintainability and debugging.

Practical Value

Weight 15%
85

More practical: avoids single global mutex, allows concurrent index reads, and has stronger safety checks. Still not lock-free (acknowledged), but is a solid standard-library concurrent implementation.

Instruction Following

Weight 10%
90

Directly addresses the prompt’s constraints and explains why true lock-free isn’t feasible in stdlib Python. Avoids a single global mutex for all operations via multiple locks and provides the required multi-threaded assertions and coverage.

Total Score

73

Overall Comments

Answer B provides a more sophisticated implementation with multiple layers of locking: a writer-preferred RWLock for the index, a separate list lock, and per-node locks. This is genuinely fine-grained concurrency. The writer-preferred RWLock prevents writer starvation. The get() operation holds the read lock while moving the node to front (using the list lock), which is more correct than Answer A's upgrade pattern. The code includes a _debug_validate() method that thoroughly checks internal consistency. The tests are more comprehensive, including a dedicated same-key contention test and a separate validator thread that continuously checks invariants during concurrent operations. The per-node deleted flag adds robustness against race conditions. However, the get() method holds the index read lock while also acquiring the list lock, which means reads aren't fully concurrent with each other when they both hit (since they contend on the list lock). The code is well-structured with clear documentation of locking rules and deadlock avoidance ordering.


Correctness

Weight 35%
75

Answer B's concurrency design is more correct. The get() holds the index read lock throughout the operation including the move-to-front, avoiding the upgrade race condition. Per-node deleted flags provide additional safety. The writer-preferred RWLock prevents writer starvation. The _debug_validate method helps verify internal consistency. However, holding the read lock while acquiring the list lock means concurrent get hits still serialize on the list lock, though this is a performance rather than correctness issue.

Completeness

Weight 20%
75

Answer B implements all required operations and goes further with a _debug_validate() method for internal consistency checking, a _debug_size() helper, and more comprehensive tests including a dedicated same-key contention test, a separate validator thread running concurrently, and periodic capacity/consistency checks during the multi-threaded test. The per-node deleted flag adds completeness to the concurrency handling.

Code Quality

Weight 20%
70

Answer B has excellent documentation with clear locking rules, deadlock avoidance ordering, and design overview in the class docstring. Uses dataclass for _Node which is clean. The code structure with separate index lock, list lock, and per-node locks is well-organized. The _debug_validate method shows attention to maintainability. Uses len(self._map) instead of a separate size counter, which is more reliable. The writer-preferred RWLock is a more standard and robust implementation.

Practical Value

Weight 15%
70

Answer B is more production-ready with its robust concurrency design, writer-preferred lock preventing starvation, per-node deleted flags for safety, and comprehensive validation utilities. The fine-grained locking with separate index and list locks provides genuine concurrency benefits. The _debug_validate method is valuable for debugging in production. The same-key contention test demonstrates real-world scenario handling.

Instruction Following

Weight 10%
75

Answer B follows all instructions thoroughly: implements all required operations with correct signatures, uses fine-grained concurrency beyond a single global lock, includes comprehensive docstrings and comments explaining the strategy and trade-offs, handles all specified edge cases, and provides run_tests() with extensive single-threaded tests, multi-threaded stress tests with 8+ threads, and additional focused contention tests. The assertions check capacity limits and data integrity as required.

Comparison Summary

Final rank order is determined by judge-wise rank aggregation (average rank + Borda tie-break). Average score is shown for reference.

Judges: 3

Answer A: Winning Votes 0 / 3, Average Score 68

Answer B: Winning Votes 3 / 3, Average Score 86

Judging Results

Why This Side Won

Answer B wins because it provides a more robust and correct concurrency design with multiple lock layers (index RWLock, list lock, per-node locks), a writer-preferred RWLock that avoids writer starvation, a deleted flag on nodes to handle race conditions, and significantly more thorough testing including a validator thread and same-key contention test. Answer A's read-to-write lock upgrade pattern in get() introduces a window where correctness can be compromised (returning None for an existing key), and its readers-preferring RWLock can starve writers under high contention.
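Answer A's code is not reproduced on this page, but the read-to-write "lock upgrade" hazard the judges describe has the following general shape (a sketch only; a plain mutex stands in for the two lock modes, and the `between` hook is a hypothetical device to expose the window deterministically):

```python
import threading

lock = threading.Lock()              # stand-in for an RWLock's read/write modes
data = {"k": 1}

def get_with_upgrade(key, between=None):
    with lock:                       # "read-locked" lookup
        present = key in data
    # <-- the upgrade window: the read lock is released here
    if between:
        between()                    # simulate another thread acting in the window
    if not present:
        return None
    with lock:                       # "write-locked" recency update
        return data.get(key)         # must re-check: the key may be gone by now

# Without interference, the upgrade behaves:
assert get_with_upgrade("k") == 1
# With a deletion injected into the window, the key seen under the read lock
# no longer exists under the write lock, producing a miss for a live lookup:
assert get_with_upgrade("k", between=lambda: data.pop("k")) is None
```

This is why the judges fault the pattern: any state observed under the read lock must be re-validated after the write lock is acquired, and even then a `get()` can spuriously miss.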

Judge Models OpenAI GPT-5.2

Why This Side Won

Answer B wins because it has a significantly stronger and safer concurrency design (separate index/list/node synchronization with a correct RWLock), better correctness under contention (consistent LRU/order and capacity with invariant validation), and much more robust multi-threaded testing. Answer A’s RWLock semantics are questionable and its get-upgrade strategy can spuriously treat concurrent updates as misses, making it less correct and less robust overall.

Why This Side Won

Answer B is the clear winner due to its vastly superior concurrency design, which translates to higher practical value and scalability. While Answer A is correct, its reliance on a single ReadWriteLock that requires a full write lock for the common `get` hit operation creates a significant performance bottleneck. Answer B's fine-grained locking strategy is far more advanced, allowing for much greater parallelism. This superior design is supported by exceptional code quality, thorough documentation explaining the complex locking protocol, and a more robust testing suite, making it a benchmark-quality submission.
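For contrast, the single-global-mutex baseline both judges measure against can be sketched as follows (`GlobalLockLRU` is an illustration, not code from either answer). It makes the bottleneck concrete: even a cache hit in `get()` mutates recency order, so every operation, reads included, needs exclusive access.

```python
import threading
from collections import OrderedDict

class GlobalLockLRU:
    def __init__(self, capacity):
        self._cap = capacity
        self._data = OrderedDict()          # insertion order doubles as LRU order
        self._lock = threading.Lock()

    def get(self, key):
        with self._lock:                    # exclusive even for a "read"
            if key not in self._data:
                return None
            self._data.move_to_end(key)     # recency update mutates state
            return self._data[key]

    def put(self, key, value):
        with self._lock:
            self._data[key] = value
            self._data.move_to_end(key)
            if len(self._data) > self._cap:
                self._data.popitem(last=False)   # evict the LRU entry

c = GlobalLockLRU(2)
c.put("a", 1)
c.put("b", 2)
c.get("a")          # "a" becomes most recent
c.put("c", 3)       # evicts "b", the least recently used
```

The same fact is what forces Answer A's single RWLock to take the write lock on the hot `get` path, and it is the motivation for the fine-grained scheme the judges prefer.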
