"""TDD Phase 2 — migrate inline BFS/DFS implementations to walk_dag / iter_ancestors. Structural tests FAIL now (inline patterns still present); PASS after migration. Behavioral tests PASS before and after (pin the public contract). Target sites ------------ bisect._ancestors — DFS list[str] oldest-first bisect._reachable_from_good — DFS set[str] branch._commit_ancestors — DFS set[str] pack.build_mpack BFS block — BFS with have-set exclusion pack.collect_blob_ids BFS block — BFS with have-set exclusion _query.walk_commits_bfs — BFS with stop_at_commit_id midi/_query.walk_commits_for_track — linear first-parent walk """ from __future__ import annotations import datetime import inspect import json import pathlib import types import pytest from muse.core.commits import ( CommitRecord, write_commit, ) from muse.core.snapshots import ( SnapshotRecord, write_snapshot, ) from muse.core.ids import hash_commit as compute_commit_id, hash_snapshot as compute_snapshot_id from muse.core.paths import muse_dir _BASE_DT = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc) # --------------------------------------------------------------------------- # Shared helpers (same pattern as test_core_graph.py) # --------------------------------------------------------------------------- def _make_repo(tmp_path: pathlib.Path) -> pathlib.Path: dot_muse = muse_dir(tmp_path) for d in ("objects", "commits", "snapshots", "refs/heads"): (dot_muse / d).mkdir(parents=True, exist_ok=True) (dot_muse / "repo.json").write_text(json.dumps({"repo_id": "test-repo"})) (dot_muse / "HEAD").write_text("ref: refs/heads/main\n") return tmp_path def _write_commit( repo: pathlib.Path, message: str = "commit", parent: str | None = None, parent2: str | None = None, author: str = "Author", dt: datetime.datetime = _BASE_DT, ) -> str: snap_id = compute_snapshot_id({}) write_snapshot(repo, SnapshotRecord(snapshot_id=snap_id, manifest={})) parent_ids = [p for p in (parent, parent2) if p is not None] commit_id = compute_commit_id( parent_ids=parent_ids, snapshot_id=snap_id, message=message, committed_at_iso=dt.isoformat(), author=author, ) write_commit( repo, CommitRecord( commit_id=commit_id, branch="main", snapshot_id=snap_id, message=message, committed_at=dt, parent_commit_id=parent, author=author, parent2_commit_id=parent2, ), ) return commit_id def _no_inline_bfs(source: str) -> bool: """True when source has no standalone BFS queue patterns.""" return "while queue:" not in source and "queue.popleft()" not in source and "queue.pop()" not in source # --------------------------------------------------------------------------- # bisect._ancestors # --------------------------------------------------------------------------- class TestBisectAncestors: def _fn(self) -> types.FunctionType: from muse.core import bisect return bisect._ancestors def test_structural_no_inline_bfs(self) -> None: """_ancestors must not use an inline BFS/DFS queue after migration.""" src = inspect.getsource(self._fn()) assert _no_inline_bfs(src), ( "bisect._ancestors still uses an inline queue — migrate to ancestor_ids" ) def test_linear_chain_all_ancestors_returned(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) root = _write_commit(repo, "root") mid = _write_commit(repo, "mid", parent=root) tip = _write_commit(repo, "tip", parent=mid) result = self._fn()(repo, tip) assert set(result) == {root, mid, tip} def test_returns_oldest_first(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) root = _write_commit(repo, "root") mid = _write_commit(repo, "mid", parent=root) tip = _write_commit(repo, "tip", parent=mid) result = self._fn()(repo, tip) # oldest-first: root then mid then tip assert result.index(root) < result.index(mid) < result.index(tip) def test_single_commit(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) cid = _write_commit(repo, "solo") assert self._fn()(repo, cid) == [cid] def test_diamond_no_duplicates(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) d = _write_commit(repo, "D") b = _write_commit(repo, "B", parent=d) c = _write_commit(repo, "C", parent=d) a = _write_commit(repo, "A", parent=b, parent2=c) result = self._fn()(repo, a) assert result.count(d) == 1 assert set(result) == {a, b, c, d} # --------------------------------------------------------------------------- # bisect._reachable_from_good # --------------------------------------------------------------------------- class TestBisectReachableFromGood: def _fn(self) -> types.FunctionType: from muse.core import bisect return bisect._reachable_from_good def test_structural_no_inline_bfs(self) -> None: """_reachable_from_good must not use an inline BFS/DFS queue.""" src = inspect.getsource(self._fn()) assert _no_inline_bfs(src), ( "bisect._reachable_from_good still uses an inline queue — migrate to ancestor_ids" ) def test_single_good_includes_ancestors(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) root = _write_commit(repo, "root") mid = _write_commit(repo, "mid", parent=root) tip = _write_commit(repo, "tip", parent=mid) result = self._fn()(repo, [tip]) assert result == {tip, mid, root} def test_multi_source_union(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) shared = _write_commit(repo, "shared") a = _write_commit(repo, "a", parent=shared) b = _write_commit(repo, "b", parent=shared) result = self._fn()(repo, [a, b]) assert result == {a, b, shared} def test_shared_ancestor_once(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) shared = _write_commit(repo, "shared") a = _write_commit(repo, "a", parent=shared) b = _write_commit(repo, "b", parent=shared) result = self._fn()(repo, [a, b]) assert shared in result def test_empty_good_ids(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) result = self._fn()(repo, []) assert result == set() # --------------------------------------------------------------------------- # branch._commit_ancestors # --------------------------------------------------------------------------- class TestBranchCommitAncestors: def _fn(self) -> types.FunctionType: from muse.cli.commands import branch as branch_module return branch_module._commit_ancestors def test_structural_no_inline_bfs(self) -> None: """_commit_ancestors must not use an inline BFS/DFS queue.""" src = inspect.getsource(self._fn()) assert _no_inline_bfs(src), ( "branch._commit_ancestors still uses an inline queue — migrate to ancestor_ids" ) def test_returns_set_including_self(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) root = _write_commit(repo, "root") tip = _write_commit(repo, "tip", parent=root) result = self._fn()(repo, tip) assert tip in result assert root in result assert isinstance(result, set) def test_diamond_no_duplicates(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) d = _write_commit(repo, "D") b = _write_commit(repo, "B", parent=d) c = _write_commit(repo, "C", parent=d) a = _write_commit(repo, "A", parent=b, parent2=c) result = self._fn()(repo, a) assert result == {a, b, c, d} def test_single_commit(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) cid = _write_commit(repo, "solo") assert self._fn()(repo, cid) == {cid} # --------------------------------------------------------------------------- # pack.build_mpack BFS block # --------------------------------------------------------------------------- class TestPackBuildMpackBFS: def test_structural_no_inline_bfs(self) -> None: """build_mpack must not contain its own inline BFS queue.""" from muse.core import mpack as pack_module src = inspect.getsource(pack_module.build_mpack) assert _no_inline_bfs(src), ( "pack.build_mpack still has an inline BFS queue — migrate to iter_ancestors" ) def test_have_set_excludes_old_commits(self, tmp_path: pathlib.Path) -> None: """build_mpack with have=[old] must not include old commit in mpack.""" from muse.core.mpack import build_mpack from muse.core.object_store import write_object from muse.core.types import blob_id repo = _make_repo(tmp_path) old_content = b"old-object" old_oid = blob_id(old_content) write_object(repo, old_oid, old_content) old_snap_id = compute_snapshot_id({"f.txt": old_oid}) write_snapshot(repo, SnapshotRecord(snapshot_id=old_snap_id, manifest={"f.txt": old_oid})) old_cid = compute_commit_id( parent_ids=[], snapshot_id=old_snap_id, message="old", committed_at_iso=_BASE_DT.isoformat(), author="A", ) write_commit(repo, CommitRecord( commit_id=old_cid, branch="main", snapshot_id=old_snap_id, message="old", committed_at=_BASE_DT, author="A", )) new_content = b"new-object" new_oid = blob_id(new_content) write_object(repo, new_oid, new_content) new_snap_id = compute_snapshot_id({"g.txt": new_oid}) write_snapshot(repo, SnapshotRecord(snapshot_id=new_snap_id, manifest={"g.txt": new_oid})) new_cid = compute_commit_id( parent_ids=[old_cid], snapshot_id=new_snap_id, message="new", committed_at_iso=_BASE_DT.isoformat(), author="A", ) write_commit(repo, CommitRecord( commit_id=new_cid, branch="main", snapshot_id=new_snap_id, message="new", committed_at=_BASE_DT, parent_commit_id=old_cid, author="A", )) mpack = build_mpack(repo, [new_cid], have=[old_cid]) sent_ids = {c["commit_id"] for c in mpack["commits"]} assert new_cid in sent_ids assert old_cid not in sent_ids # --------------------------------------------------------------------------- # pack.collect_blob_ids BFS block # --------------------------------------------------------------------------- class TestPackCollectObjectIds: def test_structural_no_inline_bfs(self) -> None: """collect_blob_ids must not contain its own inline BFS queue.""" from muse.core import mpack as pack_module src = inspect.getsource(pack_module.collect_blob_ids) assert _no_inline_bfs(src), ( "pack.collect_blob_ids still has an inline BFS queue — migrate to iter_ancestors" ) def test_have_excludes_known_objects(self, tmp_path: pathlib.Path) -> None: """Objects reachable only via have-commits are excluded from result.""" from muse.core.mpack import collect_blob_ids from muse.core.object_store import write_object from muse.core.types import blob_id repo = _make_repo(tmp_path) old_content = b"old" old_oid = blob_id(old_content) write_object(repo, old_oid, old_content) old_snap_id = compute_snapshot_id({"a.txt": old_oid}) write_snapshot(repo, SnapshotRecord(snapshot_id=old_snap_id, manifest={"a.txt": old_oid})) old_cid = compute_commit_id( parent_ids=[], snapshot_id=old_snap_id, message="old", committed_at_iso=_BASE_DT.isoformat(), author="A", ) write_commit(repo, CommitRecord( commit_id=old_cid, branch="main", snapshot_id=old_snap_id, message="old", committed_at=_BASE_DT, author="A", )) new_content = b"new" new_oid = blob_id(new_content) write_object(repo, new_oid, new_content) new_snap_id = compute_snapshot_id({"b.txt": new_oid}) write_snapshot(repo, SnapshotRecord(snapshot_id=new_snap_id, manifest={"b.txt": new_oid})) new_cid = compute_commit_id( parent_ids=[old_cid], snapshot_id=new_snap_id, message="new", committed_at_iso=_BASE_DT.isoformat(), author="A", ) write_commit(repo, CommitRecord( commit_id=new_cid, branch="main", snapshot_id=new_snap_id, message="new", committed_at=_BASE_DT, parent_commit_id=old_cid, author="A", )) result = collect_blob_ids(repo, [new_cid], have=[old_cid]) assert new_oid in result assert old_oid not in result # --------------------------------------------------------------------------- # code._query.walk_commits_bfs # --------------------------------------------------------------------------- class TestWalkCommitsBfs: def _fn(self) -> types.FunctionType: from muse.plugins.code import _query return _query.walk_commits_bfs def test_structural_no_inline_bfs(self) -> None: """walk_commits_bfs must not use an inline BFS queue after migration.""" src = inspect.getsource(self._fn()) assert _no_inline_bfs(src), ( "code._query.walk_commits_bfs still has an inline queue — migrate to iter_ancestors" ) def test_returns_all_commits_no_stop(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) root = _write_commit(repo, "root") mid = _write_commit(repo, "mid", parent=root) tip = _write_commit(repo, "tip", parent=mid) commits, truncated = self._fn()(repo, tip) assert {c.commit_id for c in commits} == {root, mid, tip} assert truncated is False def test_stop_at_commit_id_excludes_boundary(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) root = _write_commit(repo, "root") mid = _write_commit(repo, "mid", parent=root) tip = _write_commit(repo, "tip", parent=mid) commits, truncated = self._fn()(repo, tip, stop_at_commit_id=mid) ids = {c.commit_id for c in commits} assert tip in ids assert mid not in ids assert root not in ids def test_max_commits_returns_truncated_true(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) prev = None for i in range(5): prev = _write_commit(repo, f"c{i}", parent=prev) tip = prev commits, truncated = self._fn()(repo, tip, max_commits=3) assert len(commits) <= 3 assert truncated is True def test_merge_commit_visits_both_parents(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) root = _write_commit(repo, "root") left = _write_commit(repo, "left", parent=root) right = _write_commit(repo, "right", parent=root) merge = _write_commit(repo, "merge", parent=left, parent2=right) commits, _ = self._fn()(repo, merge) ids = {c.commit_id for c in commits} assert ids == {merge, left, right, root} # --------------------------------------------------------------------------- # midi._query.walk_commits_for_track (first-parent walk) # --------------------------------------------------------------------------- class TestWalkCommitsForTrack: def _fn(self) -> types.FunctionType: from muse.plugins.midi import _query return _query.walk_commits_for_track def test_structural_no_inline_while(self) -> None: """walk_commits_for_track must not use an inline while-loop walk.""" src = inspect.getsource(self._fn()) assert "while current_id" not in src and "while commit_id" not in src, ( "midi._query.walk_commits_for_track still uses an inline while-loop — " "migrate to iter_ancestors(first_parent_only=True)" ) def test_linear_chain_all_returned(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) root = _write_commit(repo, "root") mid = _write_commit(repo, "mid", parent=root) tip = _write_commit(repo, "tip", parent=mid) result = self._fn()(repo, tip, track_path="song.mid") commit_ids = {r[0].commit_id for r in result} assert commit_ids == {root, mid, tip} def test_first_parent_only_skips_second_parent(self, tmp_path: pathlib.Path) -> None: """walk_commits_for_track must follow only first-parent at merge commits.""" repo = _make_repo(tmp_path) root = _write_commit(repo, "root") branch = _write_commit(repo, "branch", parent=root) main_tip = _write_commit(repo, "main_tip", parent=root) merge = _write_commit(repo, "merge", parent=main_tip, parent2=branch) result = self._fn()(repo, merge, track_path="song.mid") commit_ids = {r[0].commit_id for r in result} assert merge in commit_ids assert main_tip in commit_ids assert root in commit_ids assert branch not in commit_ids # second-parent branch skipped def test_max_commits_cap(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) prev = None for i in range(10): prev = _write_commit(repo, f"c{i}", parent=prev) tip = prev result = self._fn()(repo, tip, track_path="song.mid", max_commits=3) assert len(result) <= 3