Antwort A: Google Gemini 2.5 Flash
Antwort: import threading import random import time from typing import Any, Optional, List class _Node: """ Repräsentiert einen Knoten in der doppelt verketteten Liste, die für die LRU-Reihenfolge verwendet wird. Verwendet __slots__ für Speichereffizienz. """ __slots__ = 'key', 'value', 'prev', 'next' def __init__(self, key: str, value: Any): self.key = key self.value = value self.prev = None self.next = None class ReadWriteLock: """ Eine benutzerdefinierte Read-Write-Lock-Implementierung. Dieses Lock erlaubt mehreren Lesern, das Lock gleichze...
Vollstandige Antwort anzeigen ▼
Antwort: import threading import random import time from typing import Any, Optional, List class _Node: """ Repräsentiert einen Knoten in der doppelt verketteten Liste, die für die LRU-Reihenfolge verwendet wird. Verwendet __slots__ für Speichereffizienz. """ __slots__ = 'key', 'value', 'prev', 'next' def __init__(self, key: str, value: Any): self.key = key self.value = value self.prev = None self.next = None class ReadWriteLock: """ Eine benutzerdefinierte Read-Write-Lock-Implementierung. Dieses Lock erlaubt mehreren Lesern, das Lock gleichzeitig zu halten, aber jeweils nur einem Schreiber. Ein Schreiber muss warten, bis alle Leser fertig sind, und Leser müssen warten, bis ein Schreiber fertig ist. Dies ist eine leserbevorzugende Implementierung, was bedeutet, dass neue Leser das Leselock erwerben können, selbst wenn ein Schreiber wartet. Diese Klasse stellt außerdem Context Manager für einen saubereren Erwerb und die Freigabe von Locks bereit. """ def __init__(self): self._read_lock = threading.Lock() # Schützt den _readers-Zähler und die Bedingungsvariable self._write_lock = threading.Lock() # Das eigentliche Schreib-Lock self._readers = 0 # Anzahl aktiver Leser self._no_readers = threading.Condition(self._read_lock) # Bedingung, damit Schreiber auf Leser warten können def acquire_read(self): """Erwirbt ein Leselock. Mehrere Leser können dieses Lock gleichzeitig halten.""" with self._read_lock: # Warten, falls ein Schreiber derzeit das Schreib-Lock hält. # Neue Leser blockieren, wenn ein Schreiber aktiv ist. while self._write_lock.locked(): self._no_readers.wait() self._readers += 1 def release_read(self): """Gibt ein Leselock frei.""" with self._read_lock: self._readers -= 1 # Wenn keine Leser mehr übrig sind, alle wartenden Schreiber benachrichtigen if self._readers == 0: self._no_readers.notify_all() def acquire_write(self): """Erwirbt ein Schreib-Lock. Jeweils nur ein Schreiber kann dieses Lock gleichzeitig halten.""" # Zuerst das Haupt-Schreib-Lock erwerben, um die Schreibabsicht zu signalisieren self._write_lock.acquire() with self._read_lock: # Dann read_lock erwerben, um auf aktive Leser zu warten # Warten, bis alle aktiven Leser fertig sind while self._readers > 0: self._no_readers.wait() def release_write(self): """Gibt ein Schreib-Lock frei.""" with self._read_lock: # read_lock erwerben, um potenzielle Leser/Schreiber zu benachrichtigen self._no_readers.notify_all() # Alle wartenden Leser oder Schreiber benachrichtigen self._write_lock.release() # Das Haupt-Schreib-Lock freigeben class _ReadContextManager: def __init__(self, lock): self.lock = lock def __enter__(self): self.lock.acquire_read() def __exit__(self, exc_type, exc_val, exc_tb): self.lock.release_read() class _WriteContextManager: def __init__(self, lock): self.lock = lock def __enter__(self): self.lock.acquire_write() def __exit__(self, exc_type, exc_val, exc_tb): self.lock.release_write() def read_locked(self): """Gibt einen Context Manager zum Erwerb eines Leselocks zurück.""" return self._ReadContextManager(self) def write_locked(self): """Gibt einen Context Manager zum Erwerb eines Schreib-Locks zurück.""" return self._WriteContextManager(self) class LRUCache: """ Eine thread-sichere LRU-Cache-Implementierung (Least Recently Used). Dieser Cache verwendet ein ReadWriteLock, um gleichzeitigen Zugriff zu verwalten. Das Design zielt darauf ab, gleichzeitige Lesezugriffe für Cache-Misses und die Operation `keys()` zu ermöglichen. Operationen jedoch, die den Cache-Zustand oder die LRU-Reihenfolge verändern (`put`, `delete` und Cache-Hits in `get`), erwerben ein Schreib-Lock und gewährleisten damit Atomarität und Konsistenz für diese kritischen Abschnitte. Dieser Ansatz bietet bessere Parallelität als ein einzelner globaler Mutex für alle Operationen, insbesondere bei leseintensiven Workloads mit häufigen Cache-Misses oder `keys()`-Aufrufen. Die Operation `get` verwendet ein "Upgrade"-Muster: Sie versucht zunächst mit einem Leselock, die Existenz des Schlüssels zu prüfen. Wird er gefunden, gibt sie das Leselock frei und erwirbt ein Schreib-Lock, um die LRU-Reihenfolge zu aktualisieren und den neuesten Wert abzurufen. Dieses Muster ist sorgfältig implementiert, um Race Conditions zu behandeln, die während des Lock-Upgrades auftreten können (z. B. wenn der Schlüssel von einem anderen Thread gelöscht oder aktualisiert wird). """ def __init__(self, capacity: int): if capacity <= 0: raise ValueError("Capacity must be a positive integer.") self._capacity = capacity self._cache = {} # Speichert key -> _Node für O(1)-Lookup self._size = 0 # Dummy-Knoten für Kopf und Ende der doppelt verketteten Liste. # Das vereinfacht Randfälle beim Hinzufügen/Entfernen von Knoten, da sich echte Datenknoten # immer zwischen Kopf und Ende befinden. self._head = _Node("dummy_head", None) self._tail = _Node("dummy_tail", None) self._head.next = self._tail self._tail.prev = self._head # Parallelitätskontrolle: Ein einzelnes ReadWriteLock schützt den gesamten Cache-Zustand. self._rw_lock = ReadWriteLock() def _add_node(self, node: _Node): """Fügt einen Knoten direkt nach dem Kopf hinzu (Position des zuletzt verwendeten Elements). Geht davon aus, dass das Schreib-Lock gehalten wird.""" node.prev = self._head node.next = self._head.next self._head.next.prev = node self._head.next = node def _remove_node(self, node: _Node): """Entfernt einen Knoten aus der doppelt verketteten Liste. Geht davon aus, dass das Schreib-Lock gehalten wird.""" prev_node = node.prev next_node = node.next prev_node.next = next_node next_node.prev = prev_node def _move_to_front(self, node: _Node): """Verschiebt einen vorhandenen Knoten an den Anfang der Liste (zuletzt verwendet). Geht davon aus, dass das Schreib-Lock gehalten wird.""" self._remove_node(node) self._add_node(node) def _pop_tail(self) -> _Node: """Entfernt und gibt den am längsten nicht verwendeten Knoten zurück (den direkt vor dem Ende). Geht davon aus, dass das Schreib-Lock gehalten wird.""" node = self._tail.prev self._remove_node(node) return node def get(self, key: str) -> Optional[Any]: """ Gibt den mit dem Schlüssel verknüpften Wert zurück, falls er existiert, und markiert ihn als kürzlich verwendet. Gibt None zurück, wenn sich der Schlüssel nicht im Cache befindet. Diese Operation erwirbt zunächst ein Leselock, um die Existenz des Schlüssels zu prüfen. Wird der Schlüssel gefunden, gibt sie das Leselock frei und erwirbt dann ein Schreib-Lock, um die LRU-Reihenfolge zu aktualisieren und den neuesten Wert abzurufen. Dieses "Upgrade"-Muster erlaubt gleichzeitige `get`-Misses und `keys()`-Operationen, serialisiert aber `get`-Hits mit anderen Schreibvorgängen. """ node_found_under_read_lock = None with self._rw_lock.read_locked(): node_found_under_read_lock = self._cache.get(key) if not node_found_under_read_lock: return None # Wenn der Schlüssel gefunden wurde, Schreib-Lock erwerben, um die LRU-Reihenfolge zu aktualisieren # und den neuesten Wert zu erhalten. # Dies ist ein "Upgrade"-Muster. Hier kann eine Race Condition auftreten: Ein anderer Thread # könnte den Schlüssel zwischen dem Freigeben des Leselocks und dem Erwerb des Schreib-Locks löschen oder aktualisieren. with self._rw_lock.write_locked(): # Erneut prüfen, ob der Schlüssel noch existiert und auf exakt dasselbe Knotenobjekt verweist. # Das ist für Korrektheit unter Konkurrenz entscheidend. Wenn ein anderer Thread # ein `put` für denselben Schlüssel ausgeführt hat, würde `_cache.get(key)` einen *neuen* Knoten zurückgeben. # Wenn ein anderer Thread ein `delete` ausgeführt hat, würde `None` zurückgegeben. current_node = self._cache.get(key) if current_node is node_found_under_read_lock: # Sicherstellen, dass es exakt dasselbe Knotenobjekt ist self._move_to_front(node_found_under_read_lock) value = current_node.value # Nach dem Verschieben den neuesten Wert holen return value else: # Der Knoten wurde zwischen dem Erwerb unseres Lese-Locks und Schreib-Locks # entweder gelöscht oder durch einen anderen 'put'-Vorgang ersetzt. Als Miss behandeln. return None def put(self, key: str, value: Any) -> None: """ Fügt das Schlüssel-Wert-Paar ein oder aktualisiert es. Wenn der Cache nach dem Einfügen die Kapazität überschreitet, wird das am längsten nicht verwendete Element entfernt. Dies ist eine Schreiboperation und erwirbt ein Schreib-Lock. """ with self._rw_lock.write_locked(): node = self._cache.get(key) if node: # Schlüssel existiert: Wert aktualisieren und an den Anfang verschieben node.value = value self._move_to_front(node) else: # Schlüssel existiert nicht: neuen Knoten hinzufügen new_node = _Node(key, value) self._cache[key] = new_node self._add_node(new_node) self._size += 1 # Entfernen, wenn Kapazität überschritten wurde if self._size > self._capacity: lru_node = self._pop_tail() del self._cache[lru_node.key] self._size -= 1 def delete(self, key: str) -> bool: """ Entfernt den Schlüssel aus dem Cache. Gibt True zurück, wenn der Schlüssel vorhanden war, andernfalls False. Dies ist eine Schreiboperation und erwirbt ein Schreib-Lock. """ with self._rw_lock.write_locked(): node = self._cache.get(key) if node: self._remove_node(node) del self._cache[key] self._size -= 1 return True return False def keys(self) -> List[str]: """ Gibt eine Liste aller derzeit im Cache befindlichen Schlüssel zurück, geordnet von zuletzt verwendet bis am längsten nicht verwendet. Dies ist eine Leseoperation und erwirbt ein Leselock. """ with self._rw_lock.read_locked(): keys_list = [] current = self._head.next while current is not self._tail: keys_list.append(current.key) current = current.next return keys_list def run_tests(): """ Führt eine Reihe von Single-Thread- und Multi-Thread-Tests aus, um die LRUCache-Implementierung zu überprüfen. """ print("--- Starte LRU-Cache-Tests ---") # --- Single-Thread-Tests --- print("\n--- Single-Thread-Tests ---") cache = LRUCache(capacity=3) assert cache.get("a") is None, "ST: Abruf eines nicht existierenden Schlüssels sollte None zurückgeben" assert cache.keys() == [], "ST: Ein leerer Cache sollte eine leere Schlüsselliste haben" cache.put("a", 1) assert cache.get("a") == 1, "ST: Vorhandenen Schlüssel nach put abrufen" assert cache.keys() == ["a"], "ST: Schlüsselliste nach einem put" cache.put("b", 2) cache.put("c", 3) assert cache.keys() == ["c", "b", "a"], "ST: Schlüsselliste nach mehreren puts" assert cache._size == 3, "ST: Cache-Größe sollte 3 sein" # Test der Kapazitätsverdrängung cache.put("d", 4) # 'a' sollte entfernt werden, da es LRU ist assert cache.get("a") is None, "ST: Entferntem Schlüssel 'a' sollte None entsprechen" assert cache.get("d") == 4, "ST: Neuer Schlüssel 'd' sollte vorhanden sein" assert cache.keys() == ["d", "c", "b"], "ST: Schlüsselliste nach Verdrängung" assert cache._size == 3, "ST: Cache-Größe sollte nach Verdrängung 3 bleiben" # put mit vorhandenem Schlüssel testen (Wert aktualisieren und an den Anfang verschieben) cache.put("b", 22) # 'b' sollte nach vorn verschoben und der Wert aktualisiert werden assert cache.get("b") == 22, "ST: Aktualisierter Wert für 'b'" assert cache.keys() == ["b", "d", "c"], "ST: Schlüsselliste nach Aktualisierung und Verschiebung von 'b'" # delete testen assert cache.delete("c") is True, "ST: Vorhandenen Schlüssel 'c' löschen" assert cache.get("c") is None, "ST: Gelöschter Schlüssel 'c' sollte None sein" assert cache.keys() == ["b", "d"], "ST: Schlüsselliste nach dem Löschen von 'c'" assert cache._size == 2, "ST: Cache-Größe nach delete" assert cache.delete("z") is False, "ST: Nicht existierenden Schlüssel 'z' löschen" assert cache.keys() == ["b", "d"], "ST: Schlüsselliste nach dem Löschen eines nicht existierenden Schlüssels unverändert" # Kapazität 1 testen cache_cap1 = LRUCache(capacity=1) cache_cap1.put("x", 10) assert cache_cap1.get("x") == 10, "ST: Kapazität 1, 'x' abrufen" cache_cap1.put("y", 20) # 'x' sollte entfernt werden assert cache_cap1.get("x") is None, "ST: Kapazität 1, 'x' entfernt" assert cache_cap1.get("y") == 20, "ST: Kapazität 1, 'y' abrufen" assert cache_cap1.keys() == ["y"], "ST: Kapazität 1, Schlüsselliste" print("Single-Thread-Tests bestanden!") # --- Multi-Thread-Tests --- print("\n--- Multi-Thread-Tests ---") MT_CAPACITY = 10 MT_NUM_THREADS = 8 MT_OPS_PER_THREAD = 2000 MT_KEY_RANGE = 20 # Schlüssel von 0 bis 19 (z. B. "0", "1", ..., "19") mt_cache = LRUCache(capacity=MT_CAPACITY) # Barriere, um sicherzustellen, dass alle Worker-Threads ihre Operationen gleichzeitig starten barrier = threading.Barrier(MT_NUM_THREADS + 1) # +1 für den Haupt-Thread error_flag = threading.Event() # Wird gesetzt, wenn ein Thread auf einen Fehler stößt # Verfolgt Schlüssel, die *jemals* von irgendeinem Thread erfolgreich in den Cache eingefügt wurden. # Wird verwendet, um zu überprüfen, dass 'get' niemals einen Wert für einen Schlüssel zurückgibt, der tatsächlich nie eingefügt wurde. global_ever_inserted_keys = set() global_ever_inserted_keys_lock = threading.Lock() def mt_worker(cache_instance, thread_id, barrier_instance, error_event, ever_inserted_keys_set, ever_inserted_keys_lock): """Worker-Funktion für Multi-Thread-Tests.""" barrier_instance.wait() # Warten, bis alle Threads bereit sind, Operationen zu starten for i in range(MT_OPS_PER_THREAD): if error_event.is_set(): break # Stoppen, wenn ein anderer Thread bereits einen Fehler gefunden hat # Zufällig eine Operation auswählen, mit einer Gewichtung zugunsten von 'get' und 'put' op = random.choices(['get', 'put', 'delete'], weights=[0.5, 0.4, 0.1], k=1)[0] key = str(random.randint(0, MT_KEY_RANGE - 1)) value = f"value_T{thread_id}_K{key}_I{i}" if op == 'put': cache_instance.put(key, value) with ever_inserted_keys_lock: ever_inserted_keys_set.add(key) elif op == 'get': val = cache_instance.get(key) if val is not None: # Sicherstellen, dass der Schlüssel, wenn ein Wert zurückgegeben wird, irgendwann eingefügt worden sein muss. with ever_inserted_keys_lock: if key not in ever_inserted_keys_set: print(f"MT-Fehler: Thread {thread_id} erhielt Wert '{val}' für Schlüssel '{key}', " f"der global nie eingefügt wurde. Das sollte nicht passieren.") error_event.set() break # Sicherstellen, dass der Wert nicht beschädigt ist (einfache Formatprüfung) if not isinstance(val, str) or not val.startswith("value_T"): print(f"MT-Fehler: Thread {thread_id} erhielt beschädigten Wert '{val}' für Schlüssel '{key}'.") error_event.set() break elif op == 'delete': cache_instance.delete(key) # Optional: Eine kleine zufällige Schlafzeit einführen, um Kontextwechsel und Konkurrenz zu erhöhen # time.sleep(random.uniform(0.00001, 0.0001)) # print(f"Thread {thread_id} beendet.") # Für ausführliche Ausgabe einkommentieren threads = [] for i in range(MT_NUM_THREADS): thread = threading.Thread(target=mt_worker, args=(mt_cache, i, barrier, error_flag, global_ever_inserted_keys, global_ever_inserted_keys_lock)) threads.append(thread) thread.start() # Warten, bis alle Worker-Threads die Barriere erreicht haben und mit den Operationen beginnen barrier.wait() print(f"Alle {MT_NUM_THREADS} Threads haben ihre Operationen gestartet.") # Warten, bis alle Worker-Threads ihre Operationen abgeschlossen haben for thread in threads: thread.join() print("Alle Threads abgeschlossen.") # Abschließende Assertions für Multi-Thread-Tests assert not error_flag.is_set(), "MT: Während der Multi-Thread-Operationen wurden Fehler erkannt. Details in der Konsole prüfen." assert mt_cache._size <= MT_CAPACITY, f"MT: Cache-Größe ({mt_cache._size}) überschritt die Kapazität ({MT_CAPACITY})." # Prüfen, dass alle im Cache verbleibenden Schlüssel irgendwann eingefügt wurden final_keys = mt_cache.keys() with global_ever_inserted_keys_lock: for key in final_keys: assert key in global_ever_inserted_keys, f"MT-Fehler: Finaler Cache enthält Schlüssel '{key}', der global nie eingefügt wurde." print("Multi-Thread-Tests bestanden!") print("\n--- Alle LRU-Cache-Tests bestanden! ---") if __name__ == '__main__': run_tests()
Ergebnis
Siegstimmen
0 / 3
Durchschnittsscore
Gesamtpunktzahl
Gesamtkommentar
Antwort A bietet eine korrekte und gut strukturierte Implementierung eines nebenläufigen LRU-Caches. Sie verwendet ein benutzerdefiniertes ReadWriteLock und ein cleveres „Lock-Upgrade“-Muster in der `get`-Methode, um Race Conditions zu behandeln. Der Code ist sauber, gut kommentiert und enthält eine umfassende Testsuite. Seine Hauptschwäche liegt jedoch in seinem Nebenläufigkeitsmodell. Da ein `get`-Treffer eine vollständige Schreibsperre zum Aktualisieren der LRU-Reihenfolge erfordert, wird die Leistung des Caches bei leseintensiven Workloads mit einem warmen Cache stark beeinträchtigt, was seinen praktischen Vorteil gegenüber einer einfachen globalen Sperre marginal macht.
Bewertungsdetails anzeigen ▼
Korrektheit
Gewichtung 35%Die Implementierung ist korrekt und die Behandlung der Race Condition im Lock-Upgrade-Muster der `get`-Methode ist solide. Die Wahl einer leserbevorzugenden Sperre könnte jedoch zu Writer-Starvation führen, und das allgemeine Nebenläufigkeitsmodell weist inhärente Leistungsprobleme auf, die bei diesem speziellen Problem an einen Designfehler grenzen.
Vollstandigkeit
Gewichtung 20%Die Antwort ist vollständig. Sie implementiert alle erforderlichen Methoden der Schnittstelle und stellt eine `run_tests`-Funktion mit soliden Single-Threaded- und Multi-Threaded-Testfällen bereit, die die Anforderungen der Aufforderung erfüllen.
Codequalitat
Gewichtung 20%Die Codequalität ist sehr gut. Der Code ist gut strukturiert, lesbar und enthält klare Docstrings und Kommentare, die seine Nebenläufigkeitsstrategie erklären. Die Verwendung von `__slots__` in der `_Node`-Klasse ist eine nette Optimierung.
Praktischer Nutzen
Gewichtung 15%Der praktische Wert ist durch das Nebenläufigkeitsmodell begrenzt. Da `get`-Treffer (die häufigste Operation bei einem warmen Cache) eine vollständige Schreibsperre erfordern, serialisiert die Implementierung die meisten Operationen und bietet in vielen realen, leseintensiven Szenarien nur geringe Leistungsvorteile gegenüber einer einfachen globalen Sperre.
Befolgung der Anweisungen
Gewichtung 10%Die Antwort folgt perfekt allen Anweisungen in der Aufforderung, einschließlich der Schnittstelle, der Nebenläufigkeitsanforderungen, der Randfälle, der Testspezifikationen und der Einschränkung der Standardbibliothek.
Gesamtpunktzahl
Gesamtkommentar
Bietet einen lauffähigen LRU-Cache mit einem benutzerdefinierten RW-Sperrmechanismus und einer doppelt verketteten Liste. Das grundlegende Single-Thread-Verhalten und die Kapazitätsverwaltung scheinen korrekt zu sein. Die Concurrency-Strategie ist jedoch effektiv eine globale Schreibsperre für alle Hits, Puts und Deletes, und die RWLock-Implementierung ist fehlerhaft: Leser prüfen `write_lock.locked()` ohne diese zu halten, und Schreiber können die Schreibsperre erwerben, während neue Leser noch zugelassen werden, was die RW-Semantik potenziell verletzt. Der „Upgrade“-Pfad von `get` kann fälschlicherweise None zurückgeben, auch wenn der Schlüssel existiert (ersetzter Knoten), und kann Werte unter starker Beanspruchung übersehen; obwohl er argumentativ veraltete Daten vermeidet, opfert er die Korrektheitserwartungen für `get` unter gleichzeitigen Put-Operationen auf demselben Schlüssel. Die Tests sind anständig, verlassen sich aber auf die internen `_size` und validieren nicht die Listen-/Map-Integrität unter starker Beanspruchung.
Bewertungsdetails anzeigen ▼
Korrektheit
Gewichtung 35%Die LRU-Logik ist im Single-Thread-Betrieb größtenteils korrekt, aber die RWLock-Implementierung ist kein solides Readers-Writer-Lock (die Leser-Gating-Logik basierend auf `write_lock.locked()`/Condition-Nutzung ist anfällig für Race Conditions). Unter starker Beanspruchung kann `get` None für einen vorhandenen Schlüssel zurückgeben, wenn dieser zwischen den Sperren aktualisiert (Knoten ersetzt) wurde, was eine vermeidbare semantische Verschlechterung darstellt und das erwartete Verhalten für gleichzeitige Put/Get-Operationen auf demselben Schlüssel verletzen kann.
Vollstandigkeit
Gewichtung 20%Implementiert alle erforderlichen Methoden und bietet `run_tests` mit Single- und Multi-Thread-Szenarien unter Verwendung von 8 Threads und Invarianzprüfungen (Kapazität, nie eingefügter Schlüssel). Es fehlen jedoch tiefere Integritätsprüfungen der Struktur (Listen-/Map-Konsistenz) und es werden interne Felder in Tests verwendet.
Codequalitat
Gewichtung 20%Lesbare Struktur und Kommentare, aber die RWLock-Implementierung ist komplex und arguably inkorrekt; verwendet private Felder in Tests und gibt `_size` preis, das nicht unbedingt erforderlich ist. Einige Erklärungen (reader-preferring vs. tatsächliches Verhalten) stimmen nicht mit dem Code überein.
Praktischer Nutzen
Gewichtung 15%Funktioniert als grundlegender paralleler Cache, aber die Leistung unter starker Beanspruchung ist begrenzt, da alle Hits eine Schreibsperre erfordern und Bedenken hinsichtlich des RWLocks das Vertrauen für den Produktionseinsatz verringern.
Befolgung der Anweisungen
Gewichtung 10%Erfüllt die Schnittstellenanforderungen und enthält Docstrings/Kommentare sowie einen 8-Thread-Mixed-Test. Die Aufforderung, eine globale Sperre für jede Operation zu vermeiden, wird jedoch nicht erfüllt; in der Praxis werden die meisten sinnvollen Operationen über die Schreibsperre serialisiert, und die Vorteile der Nebenläufigkeit sind begrenzt.
Gesamtpunktzahl
Gesamtkommentar
Antwort A bietet eine vollständige, lauffähige Implementierung mit einem benutzerdefinierten ReadWriteLock und einem LRU-Cache, der eine doppelt verkettete Liste mit Sentinel-Knoten verwendet. Die Concurrency-Strategie verwendet ein einzelnes RWLock für den gesamten Cache, was besser ist als ein globaler Mutex, aber immer noch begrenzt. Die get()-Methode verwendet ein Read-then-Write-Upgrade-Muster, das ein subtiles Problem aufweist: Zwischen der Freigabe der Lesesperre und dem Erwerb der Schreibsperre könnte der Knoten gelöscht und der Schlüssel mit einem anderen Knoten neu eingefügt werden, wodurch get() None zurückgibt, obwohl der Schlüssel existiert (ein Korrektheitsproblem). Die ReadWriteLock-Implementierung hat ein potenzielles Problem: Sie prüft `self._write_lock.locked()` in acquire_read, aber dies ist eine Lesers-bevorzugende Sperre, die Schreiber aushungern könnte. Die Tests sind angemessen, könnten aber gründlicher sein. Der Code ist gut dokumentiert mit klaren Docstrings, die die Concurrency-Strategie erklären.
Bewertungsdetails anzeigen ▼
Korrektheit
Gewichtung 35%Antwort A's get() hat ein Korrektheitsproblem: Das Read-to-Write-Lock-Upgrade-Muster bedeutet, dass zwischen der Freigabe der Lesesperre und dem Erwerb der Schreibsperre ein anderer Thread denselben Schlüssel mit einem neuen Knoten löschen und neu einfügen könnte. Die Identitätsprüfung (current_node ist node_found_under_read_lock) würde dann fehlschlagen, wodurch get() None zurückgibt, obwohl der Schlüssel mit einem gültigen Wert existiert. Das ReadWriteLock's acquire_read prüft _write_lock.locked(), was nicht vollständig zuverlässig ist. Das Lesers-bevorzugende Design kann Schreiber aushungern.
Vollstandigkeit
Gewichtung 20%Antwort A implementiert alle erforderlichen Operationen (get, put, delete, keys) mit ordnungsgemäßen LRU-Semantiken. Sie behandelt Kapazitäten von 1, aktualisiert vorhandene Schlüssel und löscht nicht vorhandene Schlüssel. Die Tests decken Single-Threaded- und Multi-Threaded-Szenarien mit 8 Threads ab. Es fehlen jedoch interne Validierungsdienstprogramme und der Multi-Threaded-Test könnte gründlicher sein.
Codequalitat
Gewichtung 20%Antwort A hat eine gute Dokumentation mit klaren Docstrings, die die Concurrency-Strategie und Kompromisse erklären. Der Code ist gut strukturiert mit sauberer Trennung der Zuständigkeiten. Das ReadWriteLock verfügt über Kontextmanager für eine saubere Nutzung. Verwendet __slots__ auf _Node für Speichereffizienz. Das _size-Tracking ist jedoch manuell und könnte bei Fehlern abweichen, und die ReadWriteLock-Implementierung weist einige fragwürdige Designentscheidungen auf (Prüfung des gesperrten Zustands).
Praktischer Nutzen
Gewichtung 15%Antwort A bietet einen funktionsfähigen LRU-Cache, aber die Race Condition des get()-Upgrade-Musters reduziert die praktische Zuverlässigkeit. Das Lesers-bevorzugende RWLock könnte im Produktionsbetrieb zu Schreiber-Aushungerung führen. Der Concurrency-Vorteil ist begrenzt, da get-Hits immer noch eine Schreibsperre erfordern und das Upgrade-Muster zusätzlichen Overhead hinzufügt. Müsste vor dem Produktionseinsatz behoben werden.
Befolgung der Anweisungen
Gewichtung 10%Antwort A folgt den meisten Anweisungen: implementiert alle erforderlichen Operationen, verwendet eine Concurrency-Strategie über eine einzelne globale Sperre hinaus, enthält Docstrings und Kommentare, behandelt Randfälle und bietet run_tests() mit Single-Threaded- und Multi-Threaded-Tests mit 8 Threads. Die Assertions des Multi-Threaded-Tests könnten jedoch stärker sein, und die Erklärung des Concurrency-Designs könnte die Kompromisse besser berücksichtigen.