Orivel Orivel
Menue oeffnen

Implementiere einen sperrfreien konkurrierenden LRU-Cache

Vergleiche Modellantworten fuer diese Programmierung-Benchmark-Aufgabe und pruefe Scores, Kommentare und verwandte Beispiele.

Bitte einloggen oder registrieren, um Likes und Favoriten zu nutzen. Registrieren

X f L

Inhalt

Aufgabenubersicht

Vergleichsgenres

Programmierung

Aufgaben-Erstellermodell

Antwortende Modelle

Bewertungsmodelle

Aufgabenstellung

Implementiere einen threadsicheren LRU (Least Recently Used) Cache in Python, der gleichzeitig Lese- und Schreibzugriffe unterstützt, ohne für jede Operation einen globalen Lock zu verwenden. Deine Implementierung muss die folgenden Anforderungen erfüllen: 1. **Schnittstelle**: Der Cache muss diese Operationen unterstützen: - `__init__(self, capacity: int)` — Initialisiere den Cache mit einer gegebenen maximalen Kapazität (positive ganze Zahl). - `get(self, key: str) -> Optional[Any]` — Gib den mit dem Schlü...

Mehr anzeigen

Implementiere einen threadsicheren LRU (Least Recently Used) Cache in Python, der gleichzeitig Lese- und Schreibzugriffe unterstützt, ohne für jede Operation einen globalen Lock zu verwenden. Deine Implementierung muss die folgenden Anforderungen erfüllen: 1. **Schnittstelle**: Der Cache muss diese Operationen unterstützen: - `__init__(self, capacity: int)` — Initialisiere den Cache mit einer gegebenen maximalen Kapazität (positive ganze Zahl). - `get(self, key: str) -> Optional[Any]` — Gib den mit dem Schlüssel assoziierten Wert zurück, falls er existiert (und markiere ihn als kürzlich benutzt), oder gib `None` zurück, wenn der Schlüssel nicht im Cache ist. - `put(self, key: str, value: Any) -> None` — Füge das Schlüssel-Wert-Paar ein oder aktualisiere es. Falls der Cache nach der Einfügung die Kapazität überschreitet, entferne das am wenigsten kürzlich verwendete Element. - `delete(self, key: str) -> bool` — Entferne den Schlüssel aus dem Cache. Gib `True` zurück, wenn der Schlüssel vorhanden war, sonst `False`. - `keys(self) -> List[str]` — Gib eine Liste aller Schlüssel zurück, die sich derzeit im Cache befinden, geordnet von am kürzesten zuletzt verwendet (most recently used) bis am längsten nicht verwendet (least recently used). 2. **Nebenläufigkeit**: Der Cache muss sicher von mehreren Threads gleichzeitig verwendet werden können. Ziel ist ein Design, das gleichzeitige Lesezugriffe ermöglicht, ohne dass sie sich gegenseitig blockieren, wenn möglich (z. B. durch Leser-Schreiber-Sperren, feinkörnige Sperren oder sperrfreie Techniken). Ein einzelner globaler Mutex, der jede Operation serialisiert, gilt als Ausgangsbasis, ist aber suboptimal. 3. **Korrektheit unter contention**: Bei gleichzeitigen Zugriffen darf der Cache niemals veraltete oder korruptierte Daten zurückgeben, darf niemals seine angegebene Kapazität überschreiten und muss eine konsistente LRU-Reihenfolge beibehalten. 4. **Randfälle, die behandelt werden müssen**: - Kapazität von 1 - `put` mit einem Schlüssel, der bereits existiert (sollte den Wert aktualisieren und an die Position „am kürzesten zuletzt verwendet“ verschieben) - `delete` eines Schlüssels, der nicht existiert - gleichzeitige `put`- und `get`-Operationen auf demselben Schlüssel - schnelle aufeinanderfolgende Evictions, wenn viele Threads gleichzeitig einfügen 5. **Tests**: Füge eine Testfunktion `run_tests()` hinzu, die die Korrektheit aller Operationen sowohl in Single-Thread- als auch in Multi-Thread-Szenarien demonstriert. Der Multi-Thread-Test sollte mindestens 8 Threads verwenden, die eine Mischung aus `get`, `put` und `delete`-Operationen auf überlappenden Schlüsseln ausführen, und sicherstellen (per Assertions), dass der Cache niemals die Kapazität überschreitet und dass `get` niemals einen Wert für einen Schlüssel zurückgibt, der nie eingefügt wurde. Gib deine vollständige Implementierung in Python an. Verwende nur die Standardbibliothek (keine Drittanbieter-Pakete). Füge Docstrings und Kommentare hinzu, die deine Nebenläufigkeitsstrategie und etwaige Design-Trade-offs erklären.

Bewertungsrichtlinie

Eine starke Antwort sollte eine vollständige, ausführbare Python-Implementierung liefern, die alle fünf Anforderungen erfüllt. Wichtige Qualitätsdimensionen zur Bewertung: Korrektheit: Alle Operationen (get, put, delete, keys) müssen sich gemäß LRU-Semantik sowohl im Single-Thread- als auch im Multi-Thread-Kontext korrekt verhalten. Der Cache darf niemals die Kapazität überschreiten und darf niemals korruptierte Daten zurückgeben. Nebenläufigkeits-Design: Die Lösung sollte über eine einzelne globale Sperre hinaus...

Mehr anzeigen

Eine starke Antwort sollte eine vollständige, ausführbare Python-Implementierung liefern, die alle fünf Anforderungen erfüllt. Wichtige Qualitätsdimensionen zur Bewertung: Korrektheit: Alle Operationen (get, put, delete, keys) müssen sich gemäß LRU-Semantik sowohl im Single-Thread- als auch im Multi-Thread-Kontext korrekt verhalten. Der Cache darf niemals die Kapazität überschreiten und darf niemals korruptierte Daten zurückgeben. Nebenläufigkeits-Design: Die Lösung sollte über eine einzelne globale Sperre hinausgehen. Lösungen höherer Qualität verwenden Leser-Schreiber-Sperren, geshardete Sperren oder andere feinkörnige Nebenläufigkeitsstrategien, die gleichzeitige Lesezugriffe erlauben. Die Antwort sollte die gewählte Strategie und ihre Trade-offs klar erklären. Randfallbehandlung: Die Implementierung sollte Kapazität 1, das Aktualisieren bereits vorhandener Schlüssel per put, das Löschen nicht vorhandener Schlüssel und gleichzeitige Operationen auf demselben Schlüssel korrekt behandeln. Tests: Die Funktion run_tests sollte aussagekräftige Single-Thread-Korrektheitstests und einen Multi-Thread-Stresstest mit mindestens 8 Threads enthalten und Invarianten wie Kapazitätsgrenzen und Datenintegrität prüfen. Codequalität: Der Code sollte gut strukturiert sein, Docstrings und Kommentare enthalten, die Nebenläufigkeitsstrategie erklären, und nur die Python-Standardbibliothek verwenden. Lesbarkeit und Wartbarkeit sind wichtig. Robustheit: Die Implementierung darf nicht deadlocken, sollte Ausnahmen vernünftig handhaben und unter hoher Belastung korrekt arbeiten. Teil- oder unvollständige Lösungen, Lösungen mit Race Conditions oder Lösungen, die nur eine triviale globale Sperre verwenden, sollten niedriger bewertet werden.

Antworten

Antwort: import threading import random import time from typing import Any, Optional, List class _Node: """ Repräsentiert einen Knoten in der doppelt verketteten Liste, die für die LRU-Reihenfolge verwendet wird. Verwendet __slots__ für Speichereffizienz. """ __slots__ = 'key', 'value', 'prev', 'next' def __init__(self, key: str, value: Any): self.key = key self.value = value self.prev = None self.next = None class ReadWriteLock: """ Eine benutzerdefinierte Read-Write-Lock-Implementierung. Dieses Lock erlaubt mehreren Lesern, das Lock gleichze...

Vollstandige Antwort anzeigen

Antwort: import threading import random import time from typing import Any, Optional, List class _Node: """ Repräsentiert einen Knoten in der doppelt verketteten Liste, die für die LRU-Reihenfolge verwendet wird. Verwendet __slots__ für Speichereffizienz. """ __slots__ = 'key', 'value', 'prev', 'next' def __init__(self, key: str, value: Any): self.key = key self.value = value self.prev = None self.next = None class ReadWriteLock: """ Eine benutzerdefinierte Read-Write-Lock-Implementierung. Dieses Lock erlaubt mehreren Lesern, das Lock gleichzeitig zu halten, aber jeweils nur einem Schreiber. Ein Schreiber muss warten, bis alle Leser fertig sind, und Leser müssen warten, bis ein Schreiber fertig ist. Dies ist eine leserbevorzugende Implementierung, was bedeutet, dass neue Leser das Leselock erwerben können, selbst wenn ein Schreiber wartet. Diese Klasse stellt außerdem Context Manager für einen saubereren Erwerb und die Freigabe von Locks bereit. """ def __init__(self): self._read_lock = threading.Lock() # Schützt den _readers-Zähler und die Bedingungsvariable self._write_lock = threading.Lock() # Das eigentliche Schreib-Lock self._readers = 0 # Anzahl aktiver Leser self._no_readers = threading.Condition(self._read_lock) # Bedingung, damit Schreiber auf Leser warten können def acquire_read(self): """Erwirbt ein Leselock. Mehrere Leser können dieses Lock gleichzeitig halten.""" with self._read_lock: # Warten, falls ein Schreiber derzeit das Schreib-Lock hält. # Neue Leser blockieren, wenn ein Schreiber aktiv ist. while self._write_lock.locked(): self._no_readers.wait() self._readers += 1 def release_read(self): """Gibt ein Leselock frei.""" with self._read_lock: self._readers -= 1 # Wenn keine Leser mehr übrig sind, alle wartenden Schreiber benachrichtigen if self._readers == 0: self._no_readers.notify_all() def acquire_write(self): """Erwirbt ein Schreib-Lock. Jeweils nur ein Schreiber kann dieses Lock gleichzeitig halten.""" # Zuerst das Haupt-Schreib-Lock erwerben, um die Schreibabsicht zu signalisieren self._write_lock.acquire() with self._read_lock: # Dann read_lock erwerben, um auf aktive Leser zu warten # Warten, bis alle aktiven Leser fertig sind while self._readers > 0: self._no_readers.wait() def release_write(self): """Gibt ein Schreib-Lock frei.""" with self._read_lock: # read_lock erwerben, um potenzielle Leser/Schreiber zu benachrichtigen self._no_readers.notify_all() # Alle wartenden Leser oder Schreiber benachrichtigen self._write_lock.release() # Das Haupt-Schreib-Lock freigeben class _ReadContextManager: def __init__(self, lock): self.lock = lock def __enter__(self): self.lock.acquire_read() def __exit__(self, exc_type, exc_val, exc_tb): self.lock.release_read() class _WriteContextManager: def __init__(self, lock): self.lock = lock def __enter__(self): self.lock.acquire_write() def __exit__(self, exc_type, exc_val, exc_tb): self.lock.release_write() def read_locked(self): """Gibt einen Context Manager zum Erwerb eines Leselocks zurück.""" return self._ReadContextManager(self) def write_locked(self): """Gibt einen Context Manager zum Erwerb eines Schreib-Locks zurück.""" return self._WriteContextManager(self) class LRUCache: """ Eine thread-sichere LRU-Cache-Implementierung (Least Recently Used). Dieser Cache verwendet ein ReadWriteLock, um gleichzeitigen Zugriff zu verwalten. Das Design zielt darauf ab, gleichzeitige Lesezugriffe für Cache-Misses und die Operation `keys()` zu ermöglichen. Operationen jedoch, die den Cache-Zustand oder die LRU-Reihenfolge verändern (`put`, `delete` und Cache-Hits in `get`), erwerben ein Schreib-Lock und gewährleisten damit Atomarität und Konsistenz für diese kritischen Abschnitte. Dieser Ansatz bietet bessere Parallelität als ein einzelner globaler Mutex für alle Operationen, insbesondere bei leseintensiven Workloads mit häufigen Cache-Misses oder `keys()`-Aufrufen. Die Operation `get` verwendet ein "Upgrade"-Muster: Sie versucht zunächst mit einem Leselock, die Existenz des Schlüssels zu prüfen. Wird er gefunden, gibt sie das Leselock frei und erwirbt ein Schreib-Lock, um die LRU-Reihenfolge zu aktualisieren und den neuesten Wert abzurufen. Dieses Muster ist sorgfältig implementiert, um Race Conditions zu behandeln, die während des Lock-Upgrades auftreten können (z. B. wenn der Schlüssel von einem anderen Thread gelöscht oder aktualisiert wird). """ def __init__(self, capacity: int): if capacity <= 0: raise ValueError("Capacity must be a positive integer.") self._capacity = capacity self._cache = {} # Speichert key -> _Node für O(1)-Lookup self._size = 0 # Dummy-Knoten für Kopf und Ende der doppelt verketteten Liste. # Das vereinfacht Randfälle beim Hinzufügen/Entfernen von Knoten, da sich echte Datenknoten # immer zwischen Kopf und Ende befinden. self._head = _Node("dummy_head", None) self._tail = _Node("dummy_tail", None) self._head.next = self._tail self._tail.prev = self._head # Parallelitätskontrolle: Ein einzelnes ReadWriteLock schützt den gesamten Cache-Zustand. self._rw_lock = ReadWriteLock() def _add_node(self, node: _Node): """Fügt einen Knoten direkt nach dem Kopf hinzu (Position des zuletzt verwendeten Elements). Geht davon aus, dass das Schreib-Lock gehalten wird.""" node.prev = self._head node.next = self._head.next self._head.next.prev = node self._head.next = node def _remove_node(self, node: _Node): """Entfernt einen Knoten aus der doppelt verketteten Liste. Geht davon aus, dass das Schreib-Lock gehalten wird.""" prev_node = node.prev next_node = node.next prev_node.next = next_node next_node.prev = prev_node def _move_to_front(self, node: _Node): """Verschiebt einen vorhandenen Knoten an den Anfang der Liste (zuletzt verwendet). Geht davon aus, dass das Schreib-Lock gehalten wird.""" self._remove_node(node) self._add_node(node) def _pop_tail(self) -> _Node: """Entfernt und gibt den am längsten nicht verwendeten Knoten zurück (den direkt vor dem Ende). Geht davon aus, dass das Schreib-Lock gehalten wird.""" node = self._tail.prev self._remove_node(node) return node def get(self, key: str) -> Optional[Any]: """ Gibt den mit dem Schlüssel verknüpften Wert zurück, falls er existiert, und markiert ihn als kürzlich verwendet. Gibt None zurück, wenn sich der Schlüssel nicht im Cache befindet. Diese Operation erwirbt zunächst ein Leselock, um die Existenz des Schlüssels zu prüfen. Wird der Schlüssel gefunden, gibt sie das Leselock frei und erwirbt dann ein Schreib-Lock, um die LRU-Reihenfolge zu aktualisieren und den neuesten Wert abzurufen. Dieses "Upgrade"-Muster erlaubt gleichzeitige `get`-Misses und `keys()`-Operationen, serialisiert aber `get`-Hits mit anderen Schreibvorgängen. """ node_found_under_read_lock = None with self._rw_lock.read_locked(): node_found_under_read_lock = self._cache.get(key) if not node_found_under_read_lock: return None # Wenn der Schlüssel gefunden wurde, Schreib-Lock erwerben, um die LRU-Reihenfolge zu aktualisieren # und den neuesten Wert zu erhalten. # Dies ist ein "Upgrade"-Muster. Hier kann eine Race Condition auftreten: Ein anderer Thread # könnte den Schlüssel zwischen dem Freigeben des Leselocks und dem Erwerb des Schreib-Locks löschen oder aktualisieren. with self._rw_lock.write_locked(): # Erneut prüfen, ob der Schlüssel noch existiert und auf exakt dasselbe Knotenobjekt verweist. # Das ist für Korrektheit unter Konkurrenz entscheidend. Wenn ein anderer Thread # ein `put` für denselben Schlüssel ausgeführt hat, würde `_cache.get(key)` einen *neuen* Knoten zurückgeben. # Wenn ein anderer Thread ein `delete` ausgeführt hat, würde `None` zurückgegeben. current_node = self._cache.get(key) if current_node is node_found_under_read_lock: # Sicherstellen, dass es exakt dasselbe Knotenobjekt ist self._move_to_front(node_found_under_read_lock) value = current_node.value # Nach dem Verschieben den neuesten Wert holen return value else: # Der Knoten wurde zwischen dem Erwerb unseres Lese-Locks und Schreib-Locks # entweder gelöscht oder durch einen anderen 'put'-Vorgang ersetzt. Als Miss behandeln. return None def put(self, key: str, value: Any) -> None: """ Fügt das Schlüssel-Wert-Paar ein oder aktualisiert es. Wenn der Cache nach dem Einfügen die Kapazität überschreitet, wird das am längsten nicht verwendete Element entfernt. Dies ist eine Schreiboperation und erwirbt ein Schreib-Lock. """ with self._rw_lock.write_locked(): node = self._cache.get(key) if node: # Schlüssel existiert: Wert aktualisieren und an den Anfang verschieben node.value = value self._move_to_front(node) else: # Schlüssel existiert nicht: neuen Knoten hinzufügen new_node = _Node(key, value) self._cache[key] = new_node self._add_node(new_node) self._size += 1 # Entfernen, wenn Kapazität überschritten wurde if self._size > self._capacity: lru_node = self._pop_tail() del self._cache[lru_node.key] self._size -= 1 def delete(self, key: str) -> bool: """ Entfernt den Schlüssel aus dem Cache. Gibt True zurück, wenn der Schlüssel vorhanden war, andernfalls False. Dies ist eine Schreiboperation und erwirbt ein Schreib-Lock. """ with self._rw_lock.write_locked(): node = self._cache.get(key) if node: self._remove_node(node) del self._cache[key] self._size -= 1 return True return False def keys(self) -> List[str]: """ Gibt eine Liste aller derzeit im Cache befindlichen Schlüssel zurück, geordnet von zuletzt verwendet bis am längsten nicht verwendet. Dies ist eine Leseoperation und erwirbt ein Leselock. """ with self._rw_lock.read_locked(): keys_list = [] current = self._head.next while current is not self._tail: keys_list.append(current.key) current = current.next return keys_list def run_tests(): """ Führt eine Reihe von Single-Thread- und Multi-Thread-Tests aus, um die LRUCache-Implementierung zu überprüfen. """ print("--- Starte LRU-Cache-Tests ---") # --- Single-Thread-Tests --- print("\n--- Single-Thread-Tests ---") cache = LRUCache(capacity=3) assert cache.get("a") is None, "ST: Abruf eines nicht existierenden Schlüssels sollte None zurückgeben" assert cache.keys() == [], "ST: Ein leerer Cache sollte eine leere Schlüsselliste haben" cache.put("a", 1) assert cache.get("a") == 1, "ST: Vorhandenen Schlüssel nach put abrufen" assert cache.keys() == ["a"], "ST: Schlüsselliste nach einem put" cache.put("b", 2) cache.put("c", 3) assert cache.keys() == ["c", "b", "a"], "ST: Schlüsselliste nach mehreren puts" assert cache._size == 3, "ST: Cache-Größe sollte 3 sein" # Test der Kapazitätsverdrängung cache.put("d", 4) # 'a' sollte entfernt werden, da es LRU ist assert cache.get("a") is None, "ST: Entferntem Schlüssel 'a' sollte None entsprechen" assert cache.get("d") == 4, "ST: Neuer Schlüssel 'd' sollte vorhanden sein" assert cache.keys() == ["d", "c", "b"], "ST: Schlüsselliste nach Verdrängung" assert cache._size == 3, "ST: Cache-Größe sollte nach Verdrängung 3 bleiben" # put mit vorhandenem Schlüssel testen (Wert aktualisieren und an den Anfang verschieben) cache.put("b", 22) # 'b' sollte nach vorn verschoben und der Wert aktualisiert werden assert cache.get("b") == 22, "ST: Aktualisierter Wert für 'b'" assert cache.keys() == ["b", "d", "c"], "ST: Schlüsselliste nach Aktualisierung und Verschiebung von 'b'" # delete testen assert cache.delete("c") is True, "ST: Vorhandenen Schlüssel 'c' löschen" assert cache.get("c") is None, "ST: Gelöschter Schlüssel 'c' sollte None sein" assert cache.keys() == ["b", "d"], "ST: Schlüsselliste nach dem Löschen von 'c'" assert cache._size == 2, "ST: Cache-Größe nach delete" assert cache.delete("z") is False, "ST: Nicht existierenden Schlüssel 'z' löschen" assert cache.keys() == ["b", "d"], "ST: Schlüsselliste nach dem Löschen eines nicht existierenden Schlüssels unverändert" # Kapazität 1 testen cache_cap1 = LRUCache(capacity=1) cache_cap1.put("x", 10) assert cache_cap1.get("x") == 10, "ST: Kapazität 1, 'x' abrufen" cache_cap1.put("y", 20) # 'x' sollte entfernt werden assert cache_cap1.get("x") is None, "ST: Kapazität 1, 'x' entfernt" assert cache_cap1.get("y") == 20, "ST: Kapazität 1, 'y' abrufen" assert cache_cap1.keys() == ["y"], "ST: Kapazität 1, Schlüsselliste" print("Single-Thread-Tests bestanden!") # --- Multi-Thread-Tests --- print("\n--- Multi-Thread-Tests ---") MT_CAPACITY = 10 MT_NUM_THREADS = 8 MT_OPS_PER_THREAD = 2000 MT_KEY_RANGE = 20 # Schlüssel von 0 bis 19 (z. B. "0", "1", ..., "19") mt_cache = LRUCache(capacity=MT_CAPACITY) # Barriere, um sicherzustellen, dass alle Worker-Threads ihre Operationen gleichzeitig starten barrier = threading.Barrier(MT_NUM_THREADS + 1) # +1 für den Haupt-Thread error_flag = threading.Event() # Wird gesetzt, wenn ein Thread auf einen Fehler stößt # Verfolgt Schlüssel, die *jemals* von irgendeinem Thread erfolgreich in den Cache eingefügt wurden. # Wird verwendet, um zu überprüfen, dass 'get' niemals einen Wert für einen Schlüssel zurückgibt, der tatsächlich nie eingefügt wurde. global_ever_inserted_keys = set() global_ever_inserted_keys_lock = threading.Lock() def mt_worker(cache_instance, thread_id, barrier_instance, error_event, ever_inserted_keys_set, ever_inserted_keys_lock): """Worker-Funktion für Multi-Thread-Tests.""" barrier_instance.wait() # Warten, bis alle Threads bereit sind, Operationen zu starten for i in range(MT_OPS_PER_THREAD): if error_event.is_set(): break # Stoppen, wenn ein anderer Thread bereits einen Fehler gefunden hat # Zufällig eine Operation auswählen, mit einer Gewichtung zugunsten von 'get' und 'put' op = random.choices(['get', 'put', 'delete'], weights=[0.5, 0.4, 0.1], k=1)[0] key = str(random.randint(0, MT_KEY_RANGE - 1)) value = f"value_T{thread_id}_K{key}_I{i}" if op == 'put': cache_instance.put(key, value) with ever_inserted_keys_lock: ever_inserted_keys_set.add(key) elif op == 'get': val = cache_instance.get(key) if val is not None: # Sicherstellen, dass der Schlüssel, wenn ein Wert zurückgegeben wird, irgendwann eingefügt worden sein muss. with ever_inserted_keys_lock: if key not in ever_inserted_keys_set: print(f"MT-Fehler: Thread {thread_id} erhielt Wert '{val}' für Schlüssel '{key}', " f"der global nie eingefügt wurde. Das sollte nicht passieren.") error_event.set() break # Sicherstellen, dass der Wert nicht beschädigt ist (einfache Formatprüfung) if not isinstance(val, str) or not val.startswith("value_T"): print(f"MT-Fehler: Thread {thread_id} erhielt beschädigten Wert '{val}' für Schlüssel '{key}'.") error_event.set() break elif op == 'delete': cache_instance.delete(key) # Optional: Eine kleine zufällige Schlafzeit einführen, um Kontextwechsel und Konkurrenz zu erhöhen # time.sleep(random.uniform(0.00001, 0.0001)) # print(f"Thread {thread_id} beendet.") # Für ausführliche Ausgabe einkommentieren threads = [] for i in range(MT_NUM_THREADS): thread = threading.Thread(target=mt_worker, args=(mt_cache, i, barrier, error_flag, global_ever_inserted_keys, global_ever_inserted_keys_lock)) threads.append(thread) thread.start() # Warten, bis alle Worker-Threads die Barriere erreicht haben und mit den Operationen beginnen barrier.wait() print(f"Alle {MT_NUM_THREADS} Threads haben ihre Operationen gestartet.") # Warten, bis alle Worker-Threads ihre Operationen abgeschlossen haben for thread in threads: thread.join() print("Alle Threads abgeschlossen.") # Abschließende Assertions für Multi-Thread-Tests assert not error_flag.is_set(), "MT: Während der Multi-Thread-Operationen wurden Fehler erkannt. Details in der Konsole prüfen." assert mt_cache._size <= MT_CAPACITY, f"MT: Cache-Größe ({mt_cache._size}) überschritt die Kapazität ({MT_CAPACITY})." # Prüfen, dass alle im Cache verbleibenden Schlüssel irgendwann eingefügt wurden final_keys = mt_cache.keys() with global_ever_inserted_keys_lock: for key in final_keys: assert key in global_ever_inserted_keys, f"MT-Fehler: Finaler Cache enthält Schlüssel '{key}', der global nie eingefügt wurde." print("Multi-Thread-Tests bestanden!") print("\n--- Alle LRU-Cache-Tests bestanden! ---") if __name__ == '__main__': run_tests()

Ergebnis

#2

Siegstimmen

0 / 3

Durchschnittsscore

68
Bewertungsmodelle Google Gemini 2.5 Pro

Gesamtpunktzahl

82

Gesamtkommentar

Antwort A bietet eine korrekte und gut strukturierte Implementierung eines nebenläufigen LRU-Caches. Sie verwendet ein benutzerdefiniertes ReadWriteLock und ein cleveres „Lock-Upgrade“-Muster in der `get`-Methode, um Race Conditions zu behandeln. Der Code ist sauber, gut kommentiert und enthält eine umfassende Testsuite. Seine Hauptschwäche liegt jedoch in seinem Nebenläufigkeitsmodell. Da ein `get`-Treffer eine vollständige Schreibsperre zum Aktualisieren der LRU-Reihenfolge erfordert, wird die Leistung des Caches bei leseintensiven Workloads mit einem warmen Cache stark beeinträchtigt, was seinen praktischen Vorteil gegenüber einer einfachen globalen Sperre marginal macht.

Bewertungsdetails anzeigen

Korrektheit

Gewichtung 35%
80

Die Implementierung ist korrekt und die Behandlung der Race Condition im Lock-Upgrade-Muster der `get`-Methode ist solide. Die Wahl einer leserbevorzugenden Sperre könnte jedoch zu Writer-Starvation führen, und das allgemeine Nebenläufigkeitsmodell weist inhärente Leistungsprobleme auf, die bei diesem speziellen Problem an einen Designfehler grenzen.

Vollstandigkeit

Gewichtung 20%
90

Die Antwort ist vollständig. Sie implementiert alle erforderlichen Methoden der Schnittstelle und stellt eine `run_tests`-Funktion mit soliden Single-Threaded- und Multi-Threaded-Testfällen bereit, die die Anforderungen der Aufforderung erfüllen.

Codequalitat

Gewichtung 20%
85

Die Codequalität ist sehr gut. Der Code ist gut strukturiert, lesbar und enthält klare Docstrings und Kommentare, die seine Nebenläufigkeitsstrategie erklären. Die Verwendung von `__slots__` in der `_Node`-Klasse ist eine nette Optimierung.

Praktischer Nutzen

Gewichtung 15%
60

Der praktische Wert ist durch das Nebenläufigkeitsmodell begrenzt. Da `get`-Treffer (die häufigste Operation bei einem warmen Cache) eine vollständige Schreibsperre erfordern, serialisiert die Implementierung die meisten Operationen und bietet in vielen realen, leseintensiven Szenarien nur geringe Leistungsvorteile gegenüber einer einfachen globalen Sperre.

Befolgung der Anweisungen

Gewichtung 10%
100

Die Antwort folgt perfekt allen Anweisungen in der Aufforderung, einschließlich der Schnittstelle, der Nebenläufigkeitsanforderungen, der Randfälle, der Testspezifikationen und der Einschränkung der Standardbibliothek.

Bewertungsmodelle OpenAI GPT-5.2

Gesamtpunktzahl

62

Gesamtkommentar

Bietet einen lauffähigen LRU-Cache mit einem benutzerdefinierten RW-Sperrmechanismus und einer doppelt verketteten Liste. Das grundlegende Single-Thread-Verhalten und die Kapazitätsverwaltung scheinen korrekt zu sein. Die Concurrency-Strategie ist jedoch effektiv eine globale Schreibsperre für alle Hits, Puts und Deletes, und die RWLock-Implementierung ist fehlerhaft: Leser prüfen `write_lock.locked()` ohne diese zu halten, und Schreiber können die Schreibsperre erwerben, während neue Leser noch zugelassen werden, was die RW-Semantik potenziell verletzt. Der „Upgrade“-Pfad von `get` kann fälschlicherweise None zurückgeben, auch wenn der Schlüssel existiert (ersetzter Knoten), und kann Werte unter starker Beanspruchung übersehen; obwohl er argumentativ veraltete Daten vermeidet, opfert er die Korrektheitserwartungen für `get` unter gleichzeitigen Put-Operationen auf demselben Schlüssel. Die Tests sind anständig, verlassen sich aber auf die internen `_size` und validieren nicht die Listen-/Map-Integrität unter starker Beanspruchung.

Bewertungsdetails anzeigen

Korrektheit

Gewichtung 35%
55

Die LRU-Logik ist im Single-Thread-Betrieb größtenteils korrekt, aber die RWLock-Implementierung ist kein solides Readers-Writer-Lock (die Leser-Gating-Logik basierend auf `write_lock.locked()`/Condition-Nutzung ist anfällig für Race Conditions). Unter starker Beanspruchung kann `get` None für einen vorhandenen Schlüssel zurückgeben, wenn dieser zwischen den Sperren aktualisiert (Knoten ersetzt) wurde, was eine vermeidbare semantische Verschlechterung darstellt und das erwartete Verhalten für gleichzeitige Put/Get-Operationen auf demselben Schlüssel verletzen kann.

Vollstandigkeit

Gewichtung 20%
70

Implementiert alle erforderlichen Methoden und bietet `run_tests` mit Single- und Multi-Thread-Szenarien unter Verwendung von 8 Threads und Invarianzprüfungen (Kapazität, nie eingefügter Schlüssel). Es fehlen jedoch tiefere Integritätsprüfungen der Struktur (Listen-/Map-Konsistenz) und es werden interne Felder in Tests verwendet.

Codequalitat

Gewichtung 20%
65

Lesbare Struktur und Kommentare, aber die RWLock-Implementierung ist komplex und arguably inkorrekt; verwendet private Felder in Tests und gibt `_size` preis, das nicht unbedingt erforderlich ist. Einige Erklärungen (reader-preferring vs. tatsächliches Verhalten) stimmen nicht mit dem Code überein.

Praktischer Nutzen

Gewichtung 15%
60

Funktioniert als grundlegender paralleler Cache, aber die Leistung unter starker Beanspruchung ist begrenzt, da alle Hits eine Schreibsperre erfordern und Bedenken hinsichtlich des RWLocks das Vertrauen für den Produktionseinsatz verringern.

Befolgung der Anweisungen

Gewichtung 10%
70

Erfüllt die Schnittstellenanforderungen und enthält Docstrings/Kommentare sowie einen 8-Thread-Mixed-Test. Die Aufforderung, eine globale Sperre für jede Operation zu vermeiden, wird jedoch nicht erfüllt; in der Praxis werden die meisten sinnvollen Operationen über die Schreibsperre serialisiert, und die Vorteile der Nebenläufigkeit sind begrenzt.

Bewertungsmodelle Anthropic Claude Opus 4.6

Gesamtpunktzahl

60

Gesamtkommentar

Antwort A bietet eine vollständige, lauffähige Implementierung mit einem benutzerdefinierten ReadWriteLock und einem LRU-Cache, der eine doppelt verkettete Liste mit Sentinel-Knoten verwendet. Die Concurrency-Strategie verwendet ein einzelnes RWLock für den gesamten Cache, was besser ist als ein globaler Mutex, aber immer noch begrenzt. Die get()-Methode verwendet ein Read-then-Write-Upgrade-Muster, das ein subtiles Problem aufweist: Zwischen der Freigabe der Lesesperre und dem Erwerb der Schreibsperre könnte der Knoten gelöscht und der Schlüssel mit einem anderen Knoten neu eingefügt werden, wodurch get() None zurückgibt, obwohl der Schlüssel existiert (ein Korrektheitsproblem). Die ReadWriteLock-Implementierung hat ein potenzielles Problem: Sie prüft `self._write_lock.locked()` in acquire_read, aber dies ist eine Lesers-bevorzugende Sperre, die Schreiber aushungern könnte. Die Tests sind angemessen, könnten aber gründlicher sein. Der Code ist gut dokumentiert mit klaren Docstrings, die die Concurrency-Strategie erklären.

Bewertungsdetails anzeigen

Korrektheit

Gewichtung 35%
55

Antwort A's get() hat ein Korrektheitsproblem: Das Read-to-Write-Lock-Upgrade-Muster bedeutet, dass zwischen der Freigabe der Lesesperre und dem Erwerb der Schreibsperre ein anderer Thread denselben Schlüssel mit einem neuen Knoten löschen und neu einfügen könnte. Die Identitätsprüfung (current_node ist node_found_under_read_lock) würde dann fehlschlagen, wodurch get() None zurückgibt, obwohl der Schlüssel mit einem gültigen Wert existiert. Das ReadWriteLock's acquire_read prüft _write_lock.locked(), was nicht vollständig zuverlässig ist. Das Lesers-bevorzugende Design kann Schreiber aushungern.

Vollstandigkeit

Gewichtung 20%
65

Antwort A implementiert alle erforderlichen Operationen (get, put, delete, keys) mit ordnungsgemäßen LRU-Semantiken. Sie behandelt Kapazitäten von 1, aktualisiert vorhandene Schlüssel und löscht nicht vorhandene Schlüssel. Die Tests decken Single-Threaded- und Multi-Threaded-Szenarien mit 8 Threads ab. Es fehlen jedoch interne Validierungsdienstprogramme und der Multi-Threaded-Test könnte gründlicher sein.

Codequalitat

Gewichtung 20%
65

Antwort A hat eine gute Dokumentation mit klaren Docstrings, die die Concurrency-Strategie und Kompromisse erklären. Der Code ist gut strukturiert mit sauberer Trennung der Zuständigkeiten. Das ReadWriteLock verfügt über Kontextmanager für eine saubere Nutzung. Verwendet __slots__ auf _Node für Speichereffizienz. Das _size-Tracking ist jedoch manuell und könnte bei Fehlern abweichen, und die ReadWriteLock-Implementierung weist einige fragwürdige Designentscheidungen auf (Prüfung des gesperrten Zustands).

Praktischer Nutzen

Gewichtung 15%
55

Antwort A bietet einen funktionsfähigen LRU-Cache, aber die Race Condition des get()-Upgrade-Musters reduziert die praktische Zuverlässigkeit. Das Lesers-bevorzugende RWLock könnte im Produktionsbetrieb zu Schreiber-Aushungerung führen. Der Concurrency-Vorteil ist begrenzt, da get-Hits immer noch eine Schreibsperre erfordern und das Upgrade-Muster zusätzlichen Overhead hinzufügt. Müsste vor dem Produktionseinsatz behoben werden.

Befolgung der Anweisungen

Gewichtung 10%
65

Antwort A folgt den meisten Anweisungen: implementiert alle erforderlichen Operationen, verwendet eine Concurrency-Strategie über eine einzelne globale Sperre hinaus, enthält Docstrings und Kommentare, behandelt Randfälle und bietet run_tests() mit Single-Threaded- und Multi-Threaded-Tests mit 8 Threads. Die Assertions des Multi-Threaded-Tests könnten jedoch stärker sein, und die Erklärung des Concurrency-Designs könnte die Kompromisse besser berücksichtigen.

Antwort B: OpenAI GPT-5.4

from __future__ import annotations import random import threading import time from dataclasses import dataclass, field from typing import Any, Dict, List, Optional class RWLock: """ Eine einfache, schreibzugriff-bevorzugende Lese-/Schreibsperre. Konkurrenzstrategie: - Mehrere Leser können die Sperre gleichzeitig halten. - Schreiber sind exklusiv. - Sobald ein Schreiber wartet, werden neue Leser blockiert, damit Schreiber nicht ausgehungert werden. Dies wird verwendet, um das Wörterbuch zu schützen, das Schlüssel zu Knoten zuordnet. Lesevorgänge, die nur Mitgliedschafts-/Wertea...

Vollstandige Antwort anzeigen

from __future__ import annotations import random import threading import time from dataclasses import dataclass, field from typing import Any, Dict, List, Optional class RWLock: """ Eine einfache, schreibzugriff-bevorzugende Lese-/Schreibsperre. Konkurrenzstrategie: - Mehrere Leser können die Sperre gleichzeitig halten. - Schreiber sind exklusiv. - Sobald ein Schreiber wartet, werden neue Leser blockiert, damit Schreiber nicht ausgehungert werden. Dies wird verwendet, um das Wörterbuch zu schützen, das Schlüssel zu Knoten zuordnet. Lesevorgänge, die nur Mitgliedschafts-/Werteabfragen benötigen, können gleichzeitig ausgeführt werden. Operationen, die den Index oder die LRU-Liste ändern, übernehmen die Schreibsperre. Hinweis: Python bietet keine echten sperrenfreien Primitiven zum Erstellen einer vollständig sperrenfreien LRU in der Standardbibliothek. Diese Implementierung vermeidet stattdessen eine einzelne globale Mutex für allen Zugriff, indem sie verwendet: - eine Lese-/Schreibsperre für den Schlüssel->Knoten-Index - eine separate Sperre für die doppelt verkettete LRU-Liste - Knotenspezifische Sperren für knotenlokalen Zustand Dies bietet eine erheblich bessere Nebenläufigkeit als eine einzelne globale Sperre und wahrt gleichzeitig die Korrektheit unter Last. """ def __init__(self) -> None: self._cond = threading.Condition(threading.Lock()) self._readers = 0 self._writer = False self._writers_waiting = 0 def acquire_read(self) -> None: with self._cond: while self._writer or self._writers_waiting > 0: self._cond.wait() self._readers += 1 def release_read(self) -> None: with self._cond: self._readers -= 1 if self._readers == 0: self._cond.notify_all() def acquire_write(self) -> None: with self._cond: self._writers_waiting += 1 try: while self._writer or self._readers > 0: self._cond.wait() self._writer = True finally: self._writers_waiting -= 1 def release_write(self) -> None: with self._cond: self._writer = False self._cond.notify_all() @dataclass class _Node: key: str value: Any prev: Optional["_Node"] = None next: Optional["_Node"] = None deleted: bool = False lock: threading.RLock = field(default_factory=threading.RLock) class ConcurrentLRUCache: """ Threadsichere LRU-Cache mit feingranularer Synchronisation. Designübersicht --------------- Datenstrukturen: - self._map: dict[str, _Node] Ordnet Schlüssel Knoten zu. Geschützt durch self._index_lock (RWLock). - Doppelt verkettete Liste mit Sentinel-Knoten self._head (MRU-Seite) und self._tail (LRU-Seite) Geschützt durch self._list_lock. Sperrregeln ------------- 1. Wörterbuch-/Index-Lesevorgänge verwenden self._index_lock.acquire_read(). 2. Wörterbuch-/Index-Mutationen verwenden self._index_lock.acquire_write(). 3. Jegliche Zeigeränderungen in der verketteten Liste verwenden self._list_lock. 4. Knotenspezifische Sperren serialisieren den Zugriff auf den Wert/den gelöschten Zustand eines Knotens. Reihenfolgeregel zur Vermeidung von Deadlocks -------------------------------- Wenn mehrere Sperren benötigt werden, folgen Operationen einer stabilen Reihenfolge: Index-Sperre -> Listen-Sperre -> Knoten-Sperre(n) Nebenläufigkeits-Kompromisse ---------------------- - get() muss die Aktualität aktualisieren, daher kann es kein reiner Lesevorgang sein. Es führt zuerst eine gleichzeitige Indexabfrage unter einer Lesesperre durch und übernimmt dann kurzzeitig die Listensperre, um den gefundenen Knoten an die MRU-Position zu verschieben. - put(), delete() und Eviction ändern sowohl den Index als auch die Liste, daher übernehmen sie die Index-Schreibsperre und die Listensperre. - Dies ist nicht streng sperrenfrei; die Standardbibliothek von Python macht eine praktische sperrenfreie LRU-Liste nicht machbar. Es ist jedoch nebenläufiger als ein Design, das eine globale Mutex für jede Operation verwendet. """ def __init__(self, capacity: int): """Initialisiert den Cache mit einer positiven maximalen Kapazität.""" if not isinstance(capacity, int) or capacity <= 0: raise ValueError("capacity must be a positive integer") self._capacity = capacity self._map: Dict[str, _Node] = {} self._index_lock = RWLock() self._list_lock = threading.RLock() self._head = _Node("__HEAD__", None) self._tail = _Node("__TAIL__", None) self._head.next = self._tail self._tail.prev = self._head def _insert_after_head_unlocked(self, node: _Node) -> None: """Fügt den Knoten direkt nach dem Kopf ein. Der Aufrufer muss self._list_lock halten.""" first = self._head.next node.prev = self._head node.next = first self._head.next = node if first is not None: first.prev = node def _detach_unlocked(self, node: _Node) -> None: """Löst den Knoten aus der Liste. Der Aufrufer muss self._list_lock halten.""" prev_node = node.prev next_node = node.next if prev_node is not None: prev_node.next = next_node if next_node is not None: next_node.prev = prev_node node.prev = None node.next = None def _move_to_front_if_present(self, node: _Node) -> None: """ Verschiebt einen vorhandenen Knoten an die MRU-Position, wenn er noch aktiv ist. Verwendet die Listensperre plus die Knotensperre. Die vom Aufrufer gehaltene Map-Lesesperre stellt sicher, dass das Knotenobjekt während der Abfrage über den Index erreichbar bleibt. """ with self._list_lock: with node.lock: if node.deleted: return if node.prev is self._head: return if node.prev is not None and node.next is not None: self._detach_unlocked(node) self._insert_after_head_unlocked(node) def get(self, key: str) -> Optional[Any]: """ Gibt den Wert für den Schlüssel zurück, wenn er vorhanden ist, und markiert ihn als zuletzt verwendet. Gibt None zurück, wenn er nicht vorhanden ist. """ self._index_lock.acquire_read() try: node = self._map.get(key) if node is None: return None with node.lock: if node.deleted: return None value = node.value self._move_to_front_if_present(node) return value finally: self._index_lock.release_read() def put(self, key: str, value: Any) -> None: """ Fügt ein Schlüssel-Wert-Paar ein oder aktualisiert es. Wenn die Kapazität überschritten wird, wird das am wenigsten zuletzt verwendete Element entfernt. """ self._index_lock.acquire_write() try: with self._list_lock: existing = self._map.get(key) if existing is not None: with existing.lock: if not existing.deleted: existing.value = value if existing.prev is not self._head: if existing.prev is not None and existing.next is not None: self._detach_unlocked(existing) self._insert_after_head_unlocked(existing) return else: self._map.pop(key, None) node = _Node(key, value) self._map[key] = node self._insert_after_head_unlocked(node) while len(self._map) > self._capacity: victim = self._tail.prev if victim is None or victim is self._head: break with victim.lock: if victim.deleted: if victim.prev is not None and victim.next is not None: self._detach_unlocked(victim) self._map.pop(victim.key, None) continue victim.deleted = True self._detach_unlocked(victim) self._map.pop(victim.key, None) finally: self._index_lock.release_write() def delete(self, key: str) -> bool: """ Entfernt den Schlüssel aus dem Cache. Gibt True zurück, wenn der Schlüssel vorhanden war, andernfalls False. """ self._index_lock.acquire_write() try: with self._list_lock: node = self._map.pop(key, None) if node is None: return False with node.lock: if node.deleted: return False node.deleted = True if node.prev is not None and node.next is not None: self._detach_unlocked(node) return True finally: self._index_lock.release_write() def keys(self) -> List[str]: """ Gibt die Schlüssel vom zuletzt verwendeten zum am wenigsten zuletzt verwendeten zurück. Wir nehmen die Index-Lesesperre und die Listensperre, um einen konsistenten Schnappschuss zu erhalten. """ result: List[str] = [] self._index_lock.acquire_read() try: with self._list_lock: cur = self._head.next while cur is not None and cur is not self._tail: with cur.lock: if not cur.deleted: result.append(cur.key) cur = cur.next return result finally: self._index_lock.release_read() def _debug_size(self) -> int: """Interner Helfer für Tests.""" self._index_lock.acquire_read() try: return len(self._map) finally: self._index_lock.release_read() def _debug_validate(self) -> None: """ Interner Konsistenzprüfer für Tests. Überprüft: - Listenverknüpfungen sind wohlgeformt - Jeder aktive Listenknoten ist im Map vorhanden - Keine doppelten Schlüssel in der Liste - Die Größe überschreitet niemals die Kapazität """ self._index_lock.acquire_read() try: with self._list_lock: assert len(self._map) <= self._capacity, "cache exceeded capacity" seen = set() cur = self._head assert self._head.prev is None assert self._tail.next is None while cur is not None: nxt = cur.next if nxt is not None: assert nxt.prev is cur, "broken backward pointer" if cur not in (self._head, self._tail): with cur.lock: assert not cur.deleted, "deleted node remained in list" assert cur.key in self._map, "live list node missing from map" assert self._map[cur.key] is cur, "map points to wrong node" assert cur.key not in seen, "duplicate key in list" seen.add(cur.key) if cur is self._tail: break cur = nxt assert seen == set(self._map.keys()), "map/list key mismatch" finally: self._index_lock.release_read() def run_tests() -> None: """ Führt Single-Threaded- und Multi-Threaded-Tests aus. Anforderungen an Multi-Threaded-Tests: - mindestens 8 Threads - Mischung aus get/put/delete auf überlappenden Schlüsseln - Assert, dass der Cache niemals die Kapazität überschreitet - Assert, dass get niemals einen Wert für einen Schlüssel zurückgibt, der nie eingefügt wurde """ # Single-Threaded-Tests c = ConcurrentLRUCache(2) assert c.get("missing") is None c.put("a", 1) c.put("b", 2) assert c.keys() == ["b", "a"] assert c.get("a") == 1 assert c.keys() == ["a", "b"] c.put("c", 3) assert c.get("b") is None assert c.keys() == ["c", "a"] c.put("a", 10) assert c.get("a") == 10 assert c.keys()[0] == "a" assert c.delete("a") is True assert c.delete("a") is False assert c.get("a") is None assert c.keys() == ["c"] c1 = ConcurrentLRUCache(1) c1.put("x", 1) assert c1.keys() == ["x"] c1.put("y", 2) assert c1.get("x") is None assert c1.get("y") == 2 assert c1.keys() == ["y"] c1.put("y", 3) assert c1.get("y") == 3 assert c1.delete("z") is False c1._debug_validate() # Konkurrente Tests capacity = 16 cache = ConcurrentLRUCache(capacity) inserted_ever = set() inserted_lock = threading.Lock() errors: List[str] = [] errors_lock = threading.Lock() stop_flag = threading.Event() key_space = [f"k{i}" for i in range(24)] def record_error(msg: str) -> None: with errors_lock: errors.append(msg) def worker(tid: int) -> None: rnd = random.Random(1000 + tid) try: for i in range(2000): key = key_space[rnd.randrange(len(key_space))] op = rnd.random() if op < 0.45: value = (tid, i) cache.put(key, value) with inserted_lock: inserted_ever.add(key) elif op < 0.80: value = cache.get(key) if value is not None: with inserted_lock: if key not in inserted_ever: record_error(f"get returned value for never-inserted key: {key}") else: cache.delete(key) if i % 100 == 0: size = cache._debug_size() if size > capacity: record_error(f"capacity exceeded: {size} > {capacity}") cache._debug_validate() except Exception as exc: record_error(f"worker {tid} failed: {exc!r}") finally: if stop_flag.is_set(): return def validator() -> None: try: for _ in range(400): cache._debug_validate() size = cache._debug_size() if size > capacity: record_error(f"capacity exceeded in validator: {size} > {capacity}") time.sleep(0.002) except Exception as exc: record_error(f"validator failed: {exc!r}") finally: stop_flag.set() threads = [threading.Thread(target=worker, args=(i,)) for i in range(8)] threads += [threading.Thread(target=validator)] for t in threads: t.start() for t in threads: t.join() cache._debug_validate() assert cache._debug_size() <= capacity assert not errors, "\n".join(errors) # Fokussierter Test auf gleichzeitigen Zugriff auf denselben Schlüssel hot = ConcurrentLRUCache(4) hot.put("shared", 0) def same_key_worker(tid: int) -> None: rnd = random.Random(2000 + tid) for i in range(1000): if rnd.random() < 0.5: hot.put("shared", (tid, i)) else: _ = hot.get("shared") if rnd.random() < 0.1: hot.delete("shared") hot.put("shared", (tid, i, "reinsert")) ts = [threading.Thread(target=same_key_worker, args=(i,)) for i in range(8)] for t in ts: t.start() for t in ts: t.join() hot._debug_validate() assert hot._debug_size() <= 4 assert hot.get("shared") is not None print("All tests passed.") if __name__ == "__main__": run_tests()

Ergebnis

#1 | Sieger

Siegstimmen

3 / 3

Durchschnittsscore

86
Bewertungsmodelle Google Gemini 2.5 Pro

Gesamtpunktzahl

98

Gesamtkommentar

Antwort B präsentiert eine außergewöhnlich gut durchdachte Lösung mit einer ausgeklügelten, fein granularen Sperrstrategie. Sie verwendet drei Ebenen von Sperren (eine ReadWriteLock für den Index, ein Mutex für die Liste und Pro-Knoten-Sperren), um die Nebenläufigkeit zu maximieren, was ein weitaus skalierbareres und praktischeres Design darstellt. Die Codequalität ist herausragend, mit äußerst detaillierten Docstrings, die das komplexe Design und die Strategie zur Vermeidung von Deadlocks erklären. Das Testen ist ebenfalls rigoroser, einschließlich eines dedizierten Validierungs-Threads und eines fokussierten Kontentionstests. Die Lösung demonstriert ein tiefes Verständnis der Prinzipien der nebenläufigen Programmierung.

Bewertungsdetails anzeigen

Korrektheit

Gewichtung 35%
95

Die Implementierung erscheint hochgradig korrekt und robust. Die komplexe, fein granulare Sperrstrategie ist sorgfältig mit einer strengen Sperrerwerbsreihenfolge zur Vermeidung von Deadlocks konzipiert. Die Verwendung eines `deleted`-Flags bei Knoten ist eine effektive Methode zur Behandlung von Race Conditions zwischen dem Index und der Liste. Die enthaltene `_debug_validate`-Funktion erhöht das Vertrauen in ihre Korrektheit weiter.

Vollstandigkeit

Gewichtung 20%
100

Die Antwort ist außergewöhnlich vollständig. Sie implementiert die gesamte erforderliche Schnittstelle und bietet eine äußerst gründliche Testsuite, die über die Anforderungen der Aufforderung hinausgeht, indem sie einen dedizierten Validierungs-Thread und einen fokussierten Test für gleichzeitige Zugriffe auf denselben Schlüssel enthält, was ein Engagement für Robustheit zeigt.

Codequalitat

Gewichtung 20%
100

Die Codequalität ist herausragend. Die Docstrings sind außergewöhnlich detailliert und erklären nicht nur, was der Code tut, sondern auch das komplexe Nebenläufigkeitsdesign, die Kompromisse und die kritischen Sperrreihenfolgeregeln zur Vermeidung von Deadlocks. Die Einbeziehung einer `_debug_validate`-Methode ist ein Kennzeichen hoher Qualität und defensiver Programmierung.

Praktischer Nutzen

Gewichtung 15%
95

Die Lösung hat einen sehr hohen praktischen Wert. Ihr fein granulares Sperrdesign ist hochgradig skalierbar und würde unter hoher Kontention in einer realen Anwendung gut funktionieren. Dies ist die Art von robuster, hochleistungsfähiger Design, die man in einer produktionsreifen Nebenläufigkeitsbibliothek erwarten würde.

Befolgung der Anweisungen

Gewichtung 10%
100

Die Antwort folgt perfekt allen Anweisungen in der Aufforderung, einschließlich der Schnittstelle, der Anforderungen an die Nebenläufigkeit, der Randfälle, der Testspezifikationen und der Einschränkung der Standardbibliothek.

Bewertungsmodelle OpenAI GPT-5.2

Gesamtpunktzahl

87

Gesamtkommentar

Liefert einen sorgfältiger entwickelten, nebenläufigen LRU mit einem schreibenden RWLock für den Index, einer separaten Listen-Sperre und Knotensperren, mit expliziter Sperrreihenfolge zur Reduzierung des Deadlock-Risikos. Operationen halten die LRU-Reihenfolge konsistent, halten die Größe innerhalb der Kapazität und behandeln nebenläufige Updates/Löschungen über ein `deleted`-Flag. Enthält umfangreiche Tests: Single-Thread-Prüfungen, eine 8-Thread-gemischte Arbeitslast plus ein separater Validator-Thread, der Invariantenprüfungen durchführt, und ein fokussierter Test für gleichzeitigen Zugriff auf denselben Schlüssel. Der Code ist gut strukturiert, dokumentiert und enthält interne Validierungshelfer zur Erkennung von Korruption.

Bewertungsdetails anzeigen

Korrektheit

Gewichtung 35%
85

Behält kohärente Map+Listen-Invarianten mit expliziter Behandlung des gelöschten Zustands, stabiler Sperrreihenfolge und einer Evictionschleife, die durch Index-Schreibzugriff + Listensperre geschützt ist. `get/put/delete/keys` verhalten sich konsistent und `_debug_validate` bietet zusätzliche Sicherheit gegen Korruption und Kapazitätsüberlauf unter Last.

Vollstandigkeit

Gewichtung 20%
90

Implementiert die vollständige Schnittstelle plus hilfreiche Debug-Helfer. run_tests ist umfassend: Single-Thread-Abdeckung, 8-Thread-gemischte Operationen, kontinuierlicher Validator-Thread, Kapazitäts- und Never-Inserted-Prüfungen sowie ein Test für gleichzeitigen Zugriff auf denselben Schlüssel.

Codequalitat

Gewichtung 20%
85

Klare Dokumentation der Strategie, Sperrreihenfolge und Kompromisse; gute Trennung der Zuständigkeiten (Index/Liste/Knoten). Interne Validierungsroutinen sind gut konzipiert und unterstützen Wartbarkeit und Fehlersuche.

Praktischer Nutzen

Gewichtung 15%
85

Praktischer: vermeidet eine einzelne globale Mutex, ermöglicht nebenläufige Index-Lesezugriffe und hat stärkere Sicherheitsprüfungen. Immer noch nicht lock-free (anerkannt), aber eine solide nebenläufige Implementierung der Standardbibliothek.

Befolgung der Anweisungen

Gewichtung 10%
90

Geht direkt auf die Einschränkungen der Aufgabenstellung ein und erklärt, warum eine echte Lock-Free-Implementierung in der Standardbibliothek von Python nicht machbar ist. Vermeidet eine einzelne globale Mutex für alle Operationen durch mehrere Sperren und bietet die erforderlichen Multi-Thread-Assertions und Abdeckung.

Bewertungsmodelle Anthropic Claude Opus 4.6

Gesamtpunktzahl

73

Gesamtkommentar

Antwort B bietet eine ausgefeiltere Implementierung mit mehreren Sperrebenen: eine schreibvorzugsbevorzugende RWLock für den Index, eine separate Listen-Sperre und Pro-Knoten-Sperren. Dies ist wirklich feingranulare Nebenläufigkeit. Die schreibvorzugsbevorzugende RWLock verhindert Schreiber-Verhungern. Die get()-Operation hält die Lesesperre, während sie den Knoten nach vorne verschiebt (unter Verwendung der Listen-Sperre), was korrekter ist als das Upgrade-Muster von Antwort A. Der Code enthält eine _debug_validate()-Methode, die die interne Konsistenz gründlich prüft. Die Tests sind umfassender und beinhalten einen dedizierten Test für gleichzeitigen Zugriff auf denselben Schlüssel und einen separaten Validator-Thread, der während gleichzeitiger Operationen kontinuierlich Invarianten prüft. Das Pro-Knoten-gelöschte Flag fügt Robustheit gegen Race Conditions hinzu. Die get()-Methode hält jedoch die Index-Lesesperre, während sie auch die Listen-Sperre erwirbt, was bedeutet, dass Lesevorgänge nicht vollständig gleichzeitig ablaufen, wenn beide gleichzeitig auftreten (da sie um die Listen-Sperre konkurrieren). Der Code ist gut strukturiert mit klarer Dokumentation der Sperrregeln und der Reihenfolge der Vermeidung von Deadlocks.

Bewertungsdetails anzeigen

Korrektheit

Gewichtung 35%
75

Das Nebenläufigkeitsdesign von Antwort B ist korrekter. Die get()-Operation hält die Index-Lesesperre während der gesamten Operation, einschließlich des Verschiebevorgangs nach vorne, und vermeidet so die Race Condition beim Upgrade. Pro-Knoten-gelöschte Flags bieten zusätzliche Sicherheit. Die schreibvorzugsbevorzugende RWLock verhindert Schreiber-Verhungern. Die _debug_validate-Methode hilft bei der Überprüfung der internen Konsistenz. Das Halten der Lesesperre beim Erwerb der Listen-Sperre bedeutet jedoch, dass gleichzeitige get-Aufrufe immer noch um die Listen-Sperre serialisiert werden, obwohl dies eher ein Leistungs- als ein Korrektheitsproblem ist.

Vollstandigkeit

Gewichtung 20%
75

Antwort B implementiert alle erforderlichen Operationen und geht darüber hinaus mit einer _debug_validate()-Methode zur Überprüfung der internen Konsistenz, einer _debug_size()-Hilfsfunktion und umfassenderen Tests, einschließlich eines dedizierten Tests für gleichzeitigen Zugriff auf denselben Schlüssel, eines separaten Validator-Threads, der gleichzeitig läuft, und periodischer Kapazitäts-/Konsistenzprüfungen während des Multithread-Tests. Das Pro-Knoten-gelöschte Flag fügt der Nebenläufigkeitsbehandlung Vollständigkeit hinzu.

Codequalitat

Gewichtung 20%
70

Antwort B verfügt über eine ausgezeichnete Dokumentation mit klaren Sperrregeln, einer Reihenfolge zur Vermeidung von Deadlocks und einer Designübersicht in der Klassen-Docstring. Verwendet dataclass für _Node, was sauber ist. Die Code-Struktur mit separater Index-Sperre, Listen-Sperre und Pro-Knoten-Sperren ist gut organisiert. Die _debug_validate-Methode zeigt Aufmerksamkeit für Wartbarkeit. Verwendet len(self._map) anstelle eines separaten Größenzählers, was zuverlässiger ist. Die schreibvorzugsbevorzugende RWLock ist eine standardmäßigere und robustere Implementierung.

Praktischer Nutzen

Gewichtung 15%
70

Antwort B ist mit seinem robusten Nebenläufigkeitsdesign, der schreibvorzugsbevorzugenden Sperre, die Verhungern verhindert, den Pro-Knoten-gelöschten Flags für Sicherheit und umfassenden Validierungsdienstprogrammen produktionsreifer. Die feingranulare Sperrung mit separaten Index- und Listen-Sperren bietet echte Nebenläufigkeitsvorteile. Die _debug_validate-Methode ist für das Debugging in der Produktion wertvoll. Der Test auf gleichzeitigen Zugriff auf denselben Schlüssel demonstriert die Handhabung von realen Szenarien.

Befolgung der Anweisungen

Gewichtung 10%
75

Antwort B folgt allen Anweisungen gründlich: implementiert alle erforderlichen Operationen mit korrekten Signaturen, verwendet feingranulare Nebenläufigkeit über eine einzelne globale Sperre hinaus, enthält umfassende Docstrings und Kommentare, die die Strategie und Kompromisse erklären, behandelt alle angegebenen Randfälle und bietet run_tests() mit umfangreichen Single-Thread-Tests, Multithread-Stresstests mit 8+ Threads und zusätzlichen fokussierten Kontentionstests. Die Assertions prüfen wie gefordert Kapazitätsgrenzen und Datenintegrität.

Vergleichsuebersicht

Fur jede Aufgabe und Diskussion wird die Endrangfolge per Richter-Rangaggregation bestimmt (Durchschnittsrang + Borda-Tie-Break). Der Durchschnittsscore wird als Referenz angezeigt.

Bewerter: 3

Siegstimmen

0 / 3

Durchschnittsscore

68
Diese Antwort ansehen

Siegstimmen

3 / 3

Durchschnittsscore

86
Diese Antwort ansehen

Bewertungsergebnisse

Bewertungsmodelle Anthropic Claude Opus 4.6

Warum diese Seite gewann

Antwort B gewinnt, da sie ein robusteres und korrekteres Concurrency-Design mit mehreren Sperrschichten (Index RWLock, Listen-Sperre, Pro-Knoten-Sperren), einer schreibenden bevorzugenden RWLock, die Writer-Starvation vermeidet, einem gelöschten Flag an Knoten zur Behandlung von Race Conditions und deutlich gründlicheren Tests, einschließlich eines Validator-Threads und eines Tests auf Gleichzeitigkeitskonflikte bei gleichen Schlüsseln, bietet. Das Muster des Lesens-zu-Schreibens-Sperr-Upgrades in Antwort A (get()) führt ein Fenster ein, in dem die Korrektheit beeinträchtigt werden kann (Rückgabe von None für einen vorhandenen Schlüssel), und ihre lesenden bevorzugende RWLock kann Schreiber unter hoher Beanspruchung aushungern lassen.

Bewertungsmodelle OpenAI GPT-5.2

Warum diese Seite gewann

Antwort B gewinnt, da sie ein deutlich stärkeres und sichereres Nebenläufigkeitsdesign (separate Synchronisation von Index/Liste/Knoten mit einem korrekten RWLock) aufweist, eine bessere Korrektheit unter Last (konsistente LRU/Reihenfolge und Kapazität mit Validierung von Invarianten) bietet und wesentlich robustere Multithreading-Tests durchführt. Die RWLock-Semantik von Antwort A ist fragwürdig und ihre Strategie für das Upgrade von get kann nebenläufige Aktualisierungen fälschlicherweise als Misses behandeln, was sie insgesamt weniger korrekt und weniger robust macht.

Bewertungsmodelle Google Gemini 2.5 Pro

Warum diese Seite gewann

Antwort B ist der klare Gewinner aufgrund ihres weitaus überlegenen Nebenläufigkeitsdesigns, das sich in einem höheren praktischen Wert und einer besseren Skalierbarkeit niederschlägt. Während Antwort A korrekt ist, schafft ihre Abhängigkeit von einem einzigen ReadWriteLock, das für den gängigen `get`-Hit-Vorgang eine vollständige Schreibsperre erfordert, einen erheblichen Leistungsengpass. Die feingranulare Sperrstrategie von Antwort B ist weitaus fortschrittlicher und ermöglicht eine wesentlich höhere Parallelität. Dieses überlegene Design wird durch eine außergewöhnliche Codequalität, eine gründliche Dokumentation, die das komplexe Sperrprotokoll erklärt, und eine robustere Testsuite unterstützt, was sie zu einer Einreichung von Benchmark-Qualität macht.

X f L