回答: import threading import random import time from typing import Any, Optional, List class _Node: """ LRU の順序付けに使用される双方向連結リスト内のノードを表します。 メモリ効率のために __slots__ を使用します。 """ __slots__ = 'key', 'value', 'prev', 'next' def __init__(self, key: str, value: Any): self.key = key self.value = value self.prev = None self.next = None class ReadWriteLock: """ カスタムの読み取り・書き込みロック実装です。このロックは複数のリーダーが 同時にロックを保持できる一方で、ライターは一度に 1 つだけしか保持できません。ラ...
全文を表示 ▼
回答: import threading import random import time from typing import Any, Optional, List class _Node: """ LRU の順序付けに使用される双方向連結リスト内のノードを表します。 メモリ効率のために __slots__ を使用します。 """ __slots__ = 'key', 'value', 'prev', 'next' def __init__(self, key: str, value: Any): self.key = key self.value = value self.prev = None self.next = None class ReadWriteLock: """ カスタムの読み取り・書き込みロック実装です。このロックは複数のリーダーが 同時にロックを保持できる一方で、ライターは一度に 1 つだけしか保持できません。ライターは すべてのリーダーが終了するのを待つ必要があり、リーダーはライターが 終了するのを待つ必要があります。これはリーダー優先の実装であり、新しいリーダーは ライターが待機中であっても読み取りロックを取得できます。 このクラスは、よりクリーンにロックを取得・解放するためのコンテキストマネージャも提供します。 """ def __init__(self): self._read_lock = threading.Lock() # _readers のカウントと条件変数を保護 self._write_lock = threading.Lock() # 実際の書き込みロック self._readers = 0 # アクティブなリーダー数 self._no_readers = threading.Condition(self._read_lock) # ライターがリーダーを待つための条件 def acquire_read(self): """読み取りロックを取得します。複数のリーダーが同時にこのロックを保持できます。""" with self._read_lock: # 現在ライターが書き込みロックを保持している場合は待機します。 # ライターがアクティブな間、新しいリーダーはブロックされます。 while self._write_lock.locked(): self._no_readers.wait() self._readers += 1 def release_read(self): """読み取りロックを解放します。""" with self._read_lock: self._readers -= 1 # リーダーがいなくなった場合は、待機中のライターに通知します if self._readers == 0: self._no_readers.notify_all() def acquire_write(self): """書き込みロックを取得します。一度に 1 つのライターのみがこのロックを保持できます。""" # まずメインの書き込みロックを取得して、書き込みの意図を示します self._write_lock.acquire() with self._read_lock: # 次に read_lock を取得し、アクティブなリーダーを待ちます # すべてのアクティブなリーダーが終了するまで待機します while self._readers > 0: self._no_readers.wait() def release_write(self): """書き込みロックを解放します。""" with self._read_lock: # read_lock を取得して、待機中のリーダー/ライターに通知します self._no_readers.notify_all() # 待機中のリーダーまたはライターに通知します self._write_lock.release() # メインの書き込みロックを解放します class _ReadContextManager: def __init__(self, lock): self.lock = lock def __enter__(self): self.lock.acquire_read() def __exit__(self, exc_type, exc_val, exc_tb): self.lock.release_read() class _WriteContextManager: def __init__(self, lock): self.lock = lock def __enter__(self): self.lock.acquire_write() def __exit__(self, exc_type, exc_val, exc_tb): self.lock.release_write() def read_locked(self): """読み取りロックを取得するためのコンテキストマネージャを返します。""" return self._ReadContextManager(self) def write_locked(self): """書き込みロックを取得するためのコンテキストマネージャを返します。""" return self._WriteContextManager(self) class LRUCache: """ スレッドセーフな LRU(Least Recently Used: 最近最も使われていないものを削除)キャッシュ実装です。 このキャッシュは、同時アクセスを管理するために ReadWriteLock を使用します。この設計は キャッシュミス時の並行読み取りと `keys()` 操作を可能にすることを目指しています。 一方で、キャッシュ状態または LRU 順序を変更する操作(`put`、`delete`、 および `get` におけるキャッシュヒット)は書き込みロックを取得し、これらの重要区間に対して 原子性と一貫性を保証します。このアプローチは、すべての操作に対して単一のグローバルミューテックスを 用いる場合よりも高い並行性を提供します。特に、キャッシュミスや `keys()` 呼び出しが頻繁な 読み取り中心のワークロードで効果的です。 `get` 操作は「アップグレード」パターンを採用しています。まず読み取りロックを試みて キーの存在を確認します。見つかった場合、読み取りロックを解放してから書き込みロックを取得し、 LRU 順序を更新して最新の値を取得します。このパターンは、ロックのアップグレード中に発生しうる 競合状態(たとえば、他のスレッドによってキーが削除または更新されるなど)を処理するように 慎重に実装されています。 """ def __init__(self, capacity: int): if capacity <= 0: raise ValueError("容量は正の整数でなければなりません。") self._capacity = capacity self._cache = {} # O(1) で検索できるように key -> _Node を保持 self._size = 0 # 双方向連結リスト用のダミーの先頭ノードと末尾ノード。 # 実データノードは常に head と tail の間にあるため、ノードの追加/削除における # 境界ケースが簡略化されます。 self._head = _Node("dummy_head", None) self._tail = _Node("dummy_tail", None) self._head.next = self._tail self._tail.prev = self._head # 並行性制御: 単一の ReadWriteLock がキャッシュ全体の状態を保護します。 self._rw_lock = ReadWriteLock() def _add_node(self, node: _Node): """head の直後(最も最近使われた位置)にノードを追加します。 書き込みロックが保持されていることを前提とします。""" node.prev = self._head node.next = self._head.next self._head.next.prev = node self._head.next = node def _remove_node(self, node: _Node): """双方向連結リストからノードを削除します。 書き込みロックが保持されていることを前提とします。""" prev_node = node.prev next_node = node.next prev_node.next = next_node next_node.prev = prev_node def _move_to_front(self, node: _Node): """既存のノードをリスト先頭(最も最近使われた位置)に移動します。 書き込みロックが保持されていることを前提とします。""" self._remove_node(node) self._add_node(node) def _pop_tail(self) -> _Node: """最も最近使われていないノード(tail の直前のノード)を削除して返します。 書き込みロックが保持されていることを前提とします。""" node = self._tail.prev self._remove_node(node) return node def get(self, key: str) -> Optional[Any]: """ キーが存在する場合は、そのキーに関連付けられた値を返し、最近使われたものとしてマークします。 キーがキャッシュ内にない場合は None を返します。 この操作はまず読み取りロックを取得してキーの存在を確認します。キーが 見つかった場合は読み取りロックを解放し、その後書き込みロックを取得して LRU 順序を更新し、最新の値を取得します。この「アップグレード」パターンにより、 `get` のミスや `keys()` 操作は並行実行できますが、`get` のヒットは他の書き込みと直列化されます。 """ node_found_under_read_lock = None with self._rw_lock.read_locked(): node_found_under_read_lock = self._cache.get(key) if not node_found_under_read_lock: return None # キーが見つかった場合は、LRU 順序を更新して最新の値を得るために書き込みロックを取得します。 # これは「アップグレード」パターンです。ここでは競合状態が発生する可能性があります: 別スレッドが # 読み取りロックの解放から書き込みロック取得までの間に、そのキーを削除または更新するかもしれません。 with self._rw_lock.write_locked(): # そのキーが依然として存在し、かつまったく同じノードオブジェクトを参照しているかを再確認します。 # これは競合下での正しさのために非常に重要です。別スレッドが同じキーに対して # `put` を実行していた場合、`_cache.get(key)` は *新しい* ノードを返します。 # 別スレッドが `delete` を実行していた場合は `None` を返します。 current_node = self._cache.get(key) if current_node is node_found_under_read_lock: # まったく同じノードオブジェクトであることを確認 self._move_to_front(node_found_under_read_lock) value = current_node.value # 移動後に最新の値を取得 return value else: # ノードは、読み取りロックと書き込みロック取得の間に、別の 'put' 操作によって # 削除されたか置き換えられました。ミスとして扱います。 return None def put(self, key: str, value: Any) -> None: """ キーと値のペアを挿入または更新します。挿入後にキャッシュが容量を超えた場合は、 最も最近使われていない項目を追い出します。これは書き込み操作であり、書き込みロックを取得します。 """ with self._rw_lock.write_locked(): node = self._cache.get(key) if node: # キーが存在する場合: 値を更新して先頭へ移動 node.value = value self._move_to_front(node) else: # キーが存在しない場合: 新しいノードを追加 new_node = _Node(key, value) self._cache[key] = new_node self._add_node(new_node) self._size += 1 # 容量超過時は追い出し if self._size > self._capacity: lru_node = self._pop_tail() del self._cache[lru_node.key] self._size -= 1 def delete(self, key: str) -> bool: """ キャッシュからキーを削除します。キーが存在した場合は True、そうでない場合は False を返します。 これは書き込み操作であり、書き込みロックを取得します。 """ with self._rw_lock.write_locked(): node = self._cache.get(key) if node: self._remove_node(node) del self._cache[key] self._size -= 1 return True return False def keys(self) -> List[str]: """ 現在キャッシュ内にあるすべてのキーのリストを、最も最近使われたものから 最も最近使われていないものの順で返します。これは読み取り操作であり、読み取りロックを取得します。 """ with self._rw_lock.read_locked(): keys_list = [] current = self._head.next while current is not self._tail: keys_list.append(current.key) current = current.next return keys_list def run_tests(): """ LRUCache 実装を検証するために、シングルスレッドおよびマルチスレッドの一連のテストを実行します。 """ print("--- LRU キャッシュテスト開始 ---") # --- シングルスレッドテスト --- print("\n--- シングルスレッドテスト ---") cache = LRUCache(capacity=3) assert cache.get("a") is None, "ST: 存在しないキーの get は None を返すべき" assert cache.keys() == [], "ST: 空のキャッシュでは keys リストは空であるべき" cache.put("a", 1) assert cache.get("a") == 1, "ST: put 後に存在するキーを get" assert cache.keys() == ["a"], "ST: 1 回 put した後の keys リスト" cache.put("b", 2) cache.put("c", 3) assert cache.keys() == ["c", "b", "a"], "ST: 複数回 put した後の keys リスト" assert cache._size == 3, "ST: キャッシュサイズは 3 であるべき" # 容量超過による追い出しのテスト cache.put("d", 4) # 'a' は LRU なので追い出されるはず assert cache.get("a") is None, "ST: 追い出されたキー 'a' は None であるべき" assert cache.get("d") == 4, "ST: 新しいキー 'd' は存在するべき" assert cache.keys() == ["d", "c", "b"], "ST: 追い出し後の keys リスト" assert cache._size == 3, "ST: 追い出し後もキャッシュサイズは 3 のままであるべき" # 既存キーに対する put のテスト(値更新と先頭への移動) cache.put("b", 22) # 'b' は先頭に移動し、値も更新されるはず assert cache.get("b") == 22, "ST: 'b' の更新後の値" assert cache.keys() == ["b", "d", "c"], "ST: 更新と 'b' の移動後の keys リスト" # delete のテスト assert cache.delete("c") is True, "ST: 存在するキー 'c' を delete" assert cache.get("c") is None, "ST: 削除されたキー 'c' は None であるべき" assert cache.keys() == ["b", "d"], "ST: 'c' 削除後の keys リスト" assert cache._size == 2, "ST: delete 後のキャッシュサイズ" assert cache.delete("z") is False, "ST: 存在しないキー 'z' を delete" assert cache.keys() == ["b", "d"], "ST: 存在しないキー削除後も keys リストは変わらない" # 容量 1 のテスト cache_cap1 = LRUCache(capacity=1) cache_cap1.put("x", 10) assert cache_cap1.get("x") == 10, "ST: 容量 1、'x' を get" cache_cap1.put("y", 20) # 'x' は追い出されるはず assert cache_cap1.get("x") is None, "ST: 容量 1、'x' は追い出される" assert cache_cap1.get("y") == 20, "ST: 容量 1、'y' を get" assert cache_cap1.keys() == ["y"], "ST: 容量 1、keys リスト" print("シングルスレッドテストに合格しました!") # --- マルチスレッドテスト --- print("\n--- マルチスレッドテスト ---") MT_CAPACITY = 10 MT_NUM_THREADS = 8 MT_OPS_PER_THREAD = 2000 MT_KEY_RANGE = 20 # キーは 0 から 19(例: "0", "1", ..., "19") mt_cache = LRUCache(capacity=MT_CAPACITY) # すべてのワーカースレッドが同時に操作を開始するようにするバリア barrier = threading.Barrier(MT_NUM_THREADS + 1) # +1 はメインスレッド分 error_flag = threading.Event() # いずれかのスレッドでエラーが発生したらセット # いずれかのスレッドによって、これまでに一度でも正常にキャッシュへ put されたキーを追跡します。 # `get` が、本当に一度も挿入されていないキーに対して値を返さないことを検証するために使います。 global_ever_inserted_keys = set() global_ever_inserted_keys_lock = threading.Lock() def mt_worker(cache_instance, thread_id, barrier_instance, error_event, ever_inserted_keys_set, ever_inserted_keys_lock): """マルチスレッドテスト用のワーカー関数。""" barrier_instance.wait() # すべてのスレッドの準備が整い、操作開始できるまで待機 for i in range(MT_OPS_PER_THREAD): if error_event.is_set(): break # 別スレッドがすでにエラーを見つけていれば停止 # 'get' と 'put' に偏らせてランダムに操作を選択 op = random.choices(['get', 'put', 'delete'], weights=[0.5, 0.4, 0.1], k=1)[0] key = str(random.randint(0, MT_KEY_RANGE - 1)) value = f"value_T{thread_id}_K{key}_I{i}" if op == 'put': cache_instance.put(key, value) with ever_inserted_keys_lock: ever_inserted_keys_set.add(key) elif op == 'get': val = cache_instance.get(key) if val is not None: # 値が返ってきた場合、そのキーは少なくともどこかの時点で挿入されていなければならないことを確認 with ever_inserted_keys_lock: if key not in ever_inserted_keys_set: print(f"MT エラー: スレッド {thread_id} がキー '{key}' に対して値 '{val}' を取得しましたが、" f"そのキーはグローバルには一度も挿入されていません。これは起きてはなりません。") error_event.set() break # 値が壊れていないことを確認(単純な書式チェック) if not isinstance(val, str) or not val.startswith("value_T"): print(f"MT エラー: スレッド {thread_id} がキー '{key}' に対して破損した値 '{val}' を取得しました。") error_event.set() break elif op == 'delete': cache_instance.delete(key) # 任意: 小さなランダム sleep を導入してコンテキストスイッチと競合を増やす # time.sleep(random.uniform(0.00001, 0.0001)) # print(f"スレッド {thread_id} が終了しました。") # 詳細出力が必要ならコメント解除 threads = [] for i in range(MT_NUM_THREADS): thread = threading.Thread(target=mt_worker, args=(mt_cache, i, barrier, error_flag, global_ever_inserted_keys, global_ever_inserted_keys_lock)) threads.append(thread) thread.start() # すべてのワーカースレッドがバリアに到達し、操作を開始するのを待つ barrier.wait() print(f"{MT_NUM_THREADS} 個のスレッドがすべて操作を開始しました。") # すべてのワーカースレッドの操作完了を待つ for thread in threads: thread.join() print("すべてのスレッドが完了しました。") # マルチスレッドテストの最終アサーション assert not error_flag.is_set(), "MT: マルチスレッド操作中にエラーが検出されました。詳細はコンソールを確認してください。" assert mt_cache._size <= MT_CAPACITY, f"MT: キャッシュサイズ ({mt_cache._size}) が容量 ({MT_CAPACITY}) を超えました。" # キャッシュに最終的に残っているすべてのキーが、いずれかの時点で挿入されたことを確認 final_keys = mt_cache.keys() with global_ever_inserted_keys_lock: for key in final_keys: assert key in global_ever_inserted_keys, f"MT エラー: 最終キャッシュに、グローバルに一度も挿入されていないキー '{key}' が含まれています。" print("マルチスレッドテストに合格しました!") print("\n--- すべての LRU キャッシュテストに合格しました! ---") if __name__ == '__main__': run_tests() 日本語のみの出力ルール: - 中国語(簡体字または繁体字)を出力しないでください。自然な日本語のみに翻訳してください。 - テキスト全体を最初から最後まで適切な日本語に翻訳してください。
判定
勝利票
0 / 3
平均スコア
総合点
総評
回答Aは、同時実行LRUキャッシュの正しく構造化された実装を提供しています。カスタムReadWriteLockと、`get`メソッドにおける巧妙な「ロックアップグレード」パターンを使用して競合状態を処理しています。コードはクリーンで、コメントも適切であり、包括的なテストスイートが含まれています。しかし、その主な弱点は並行処理モデルにあります。`get`ヒット時にLRU順序を更新するために完全なライトロックを取得する必要があるため、ウォームキャッシュでの読み取り負荷の高いワークロード下ではキャッシュのパフォーマンスが著しくボトルネックとなり、単純なグローバルロックに対する実用的な利点はわずかです。
採点詳細を表示 ▼
正確さ
重み 35%実装は正しく、`get`メソッドのロックアップグレードパターンの競合状態の処理は健全です。しかし、リーダー優先ロックの選択はライターの飢餓を引き起こす可能性があり、全体的な並行処理モデルには、この特定の問題に対する設計上の欠陥に近い固有のパフォーマンス上の問題があります。
完全性
重み 20%回答は完全に網羅されています。インターフェースのすべての必須メソッドを実装しており、プロンプトの要件を満たす堅牢な単一スレッドおよびマルチスレッドのテストケースを含む`run_tests`関数を提供しています。
コード品質
重み 20%コード品質は非常に良好です。コードは構造化されており、読みやすく、並行処理戦略を説明する明確なドキュメンテーション文字列とコメントが含まれています。`_Node`クラスでの`__slots__`の使用は良い最適化です。
実用性
重み 15%実用的な価値は、並行処理モデルによって制限されます。`get`ヒット(ウォームキャッシュでの最も一般的な操作)は完全なライトロックを必要とするため、この実装はほとんどの操作をシリアル化し、多くの実世界の読み取り負荷の高いシナリオでは単純なグローバルロックに対するパフォーマンス上の利点はほとんどありません。
指示遵守
重み 10%回答は、インターフェース、並行処理要件、エッジケース、テスト仕様、標準ライブラリ制約を含む、プロンプトのすべての指示に完全に準拠しています。
総合点
総評
カスタムRWロックと双方向連結リストを備えた実行可能なLRUキャッシュを提供します。基本的なシングルスレッドの動作と容量の処理は正しく見えます。しかし、同時実行戦略は、すべてのヒット、プット、削除に対して実質的にキャッシュ全体のライターロックであり、RWLockの実装には欠陥があります。リーダーはロックを保持せずに`write_lock.locked()`をチェックし、ライターは新しいリーダーがまだ許可されている間に書き込みロックを取得できるため、RWセマンティクスに違反する可能性があります。`get`の「アップグレード」パスは、キーが存在する場合でも(ノードが置き換えられた場合)誤ってNoneを返す可能性があり、競合下で値を見逃す可能性があります。これは、古いデータを回避していると言えますが、同じキーに対する同時プット下での`get`の正確性への期待を犠牲にしています。テストはまともですが、内部の`_size`に依存しており、競合下でのリスト/マップの整合性を検証していません。
採点詳細を表示 ▼
正確さ
重み 35%LRUロジックはシングルスレッドの使用ではほとんど正しいですが、RWLockの実装は健全なリーダーライターロックではありません(`write_lock.locked()`/条件の使用に基づくリーダーゲーティングは競合状態を起こしやすいです)。競合下では、ロック間に更新された(ノードが置き換えられた)場合、`get`は既存のキーに対してNoneを返す可能性があります。これは回避可能なセマンティックな劣化であり、同じキーに対する同時プット/ゲットの期待される動作に違反する可能性があります。
完全性
重み 20%すべての必須メソッドを実装し、シングルスレッドとマルチスレッドのシナリオ(8スレッド、不変条件チェック(容量、挿入されていないキー))を使用した`run_tests`を提供します。しかし、より深い構造的整合性チェック(リスト/マップの一貫性)が不足しており、テストで内部フィールドを使用しています。
コード品質
重み 20%読みやすい構造とコメントがありますが、RWLockの実装は複雑で、議論の余地があるほど不正確です。テストではプライベートフィールドを使用し、厳密には不要な`_size`を公開しています。一部の説明(リーダー優先 vs 実際の動作)はコードと一貫性がありません。
実用性
重み 15%ベースラインの同時実行キャッシュとして機能しますが、すべてのヒットで書き込みロックが必要であり、RWLockの問題により本番環境での使用に対する信頼性が低下するため、競合時のパフォーマンスは限定的です。
指示遵守
重み 10%インターフェース要件を満たし、ドキュメンテーション文字列/コメントと8スレッドの混合テストが含まれています。しかし、プロンプトはすべての操作にグローバルロックを回避するように求めていますが、実際にはほとんどの重要な操作は書き込みロックを介してシリアル化され、同時実行のメリットは限定的です。
総合点
総評
回答Aは、カスタムReadWriteLockと、センチネルノードを持つ双方向連結リストを使用したLRUキャッシュを備えた、完全で実行可能な実装を提供します。並行処理戦略は、キャッシュ全体に単一のRWLockを使用しており、これはグローバルミューテックスよりは優れていますが、依然として限定的です。get()メソッドは、読み取り後に書き込みをアップグレードするパターンを使用していますが、これは微妙な問題があります。読み取りロックを解放してから書き込みロックを取得するまでの間に、ノードが削除され、キーが別のノードで再挿入される可能性があり、キーが存在するにもかかわらずget()がNoneを返す原因となります(これは正当性の懸念事項です)。ReadWriteLockの実装には潜在的な問題があります。acquire_read内で`self._write_lock.locked()`をチェックしていますが、これはライターを飢餓状態にする可能性のあるリーダー優先ロックです。テストは妥当ですが、より徹底的である可能性があります。コードは、並行処理戦略を明確に説明するdocstringでよく文書化されています。
採点詳細を表示 ▼
正確さ
重み 35%回答Aのget()には正当性の問題があります。読み取りから書き込みへのロックアップグレードパターンは、読み取りロックを解放してから書き込みロックを取得するまでの間に、別のスレッドが同じキーを削除して新しいノードで再挿入する可能性があることを意味します。その場合、同一性チェック(現在のノードは読み取りロック下で見つかったノードである)が失敗し、キーが存在して有効な値を持つにもかかわらず、get()がNoneを返す原因となります。ReadWriteLockのacquire_readは`_write_lock.locked()`をチェックしますが、これは完全に信頼できるものではありません。リーダー優先設計はライターを飢餓状態にする可能性があります。
完全性
重み 20%回答Aは、適切なLRUセマンティクスで、必要なすべての操作(get、put、delete、keys)を実装しています。容量1の処理、既存キーの更新、存在しないキーの削除を処理します。テストは、8スレッドを使用したシングルスレッドおよびマルチスレッドのシナリオをカバーしています。ただし、内部検証ユーティリティが不足しており、マルチスレッドテストはより徹底的である可能性があります。
コード品質
重み 20%回答Aは、並行処理戦略とトレードオフを明確に説明するdocstringを備えた優れたドキュメントを備えています。コードは関心の分離がクリーンで、よく構造化されています。ReadWriteLockにはクリーンな使用のためのコンテキストマネージャーがあります。メモリ効率のために`__slots__`を_Nodeで使用しています。ただし、_sizeの追跡は手動であり、バグが存在する場合はずれが生じる可能性があり、ReadWriteLockの実装にはいくつかの疑問のある設計上の選択(locked()状態のチェック)があります。
実用性
重み 15%回答Aは機能的なLRUキャッシュを提供しますが、get()のアップグレードパターンの競合状態は実用的な信頼性を低下させます。リーダー優先RWLockは、本番環境でライターの飢餓を引き起こす可能性があります。getのヒットでも書き込みロックが必要であり、アップグレードパターンはオーバーヘッドを追加するため、並行処理のメリットは限定的です。本番使用の前に修正が必要でしょう。
指示遵守
重み 10%回答Aはほとんどの指示に従っています。すべての必須操作を実装し、単一のグローバルロックを超える並行処理戦略を使用し、docstringとコメントを含み、エッジケースを処理し、8スレッドを使用したシングルスレッドおよびマルチスレッドテストを含むrun_tests()を提供します。ただし、マルチスレッドテストのアサーションはより強力である可能性があり、並行処理設計の説明はトレードオフをより良く対処できる可能性があります。