"""Tests for the generic query engine in muse/core/query_engine.py.

Also contains regression tests that prove the two dead walkers in
``muse.plugins.code._query`` (``walk_commits`` and ``walk_commits_range``) are
fully covered by the live walkers (``walk_commits_bfs`` and
``store.walk_commits_between``) before those dead functions are deleted.
"""

import datetime
import pathlib
import tempfile
from collections.abc import Callable

import pytest

from muse.core.query_engine import QueryMatch, format_matches, walk_history
from muse.core.ids import hash_commit as compute_commit_id, hash_snapshot as compute_snapshot_id
from muse.core.commits import (
    CommitRecord,
    walk_commits_between,
    write_commit,
)
from muse.plugins.code._query import walk_commits_bfs
from muse.core.types import Manifest
from muse.core.paths import heads_dir, muse_dir


# Shape of the evaluator callbacks these tests hand to walk_history.
_Evaluator = Callable[[CommitRecord, Manifest, pathlib.Path], list[QueryMatch]]


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------


def _make_repo(tmp_path: pathlib.Path) -> pathlib.Path:
    """Set up a minimal .muse/ structure for query_engine tests.

    Creates the muse directory under *tmp_path* with ``repo.json``, a ``HEAD``
    file pointing at ``refs/heads/main``, and empty ``commits``, ``snapshots``
    and ``refs/heads`` directories.  Returns *tmp_path* (the repo root).
    """
    muse = muse_dir(tmp_path)
    muse.mkdir()
    (muse / "repo.json").write_text('{"repo_id":"test-repo"}')
    (muse / "HEAD").write_text("ref: refs/heads/main")
    (muse / "commits").mkdir()
    (muse / "snapshots").mkdir()
    (muse / "refs" / "heads").mkdir(parents=True)
    return tmp_path


def _write_commit(root: pathlib.Path, label: str, parent_id: str | None = None) -> CommitRecord:
    """Write a content-addressed CommitRecord. *label* is used only in the message.

    The commit is stamped with the current UTC time, sits on branch ``main``
    and has at most one parent.  Returns the record that was written.
    """
    snap_id = compute_snapshot_id({})
    committed_at = datetime.datetime.now(datetime.timezone.utc)
    parent_ids = [parent_id] if parent_id else []
    commit_id = compute_commit_id(
        parent_ids=parent_ids,
        snapshot_id=snap_id,
        message=f"commit {label}",
        committed_at_iso=committed_at.isoformat(),
        author="test-author",
    )
    record = CommitRecord(
        commit_id=commit_id,
        branch="main",
        snapshot_id=snap_id,
        message=f"commit {label}",
        committed_at=committed_at,
        parent_commit_id=parent_id,
        author="test-author",
    )
    write_commit(root, record)
    return record


def _recording_evaluator(visited: list[str]) -> _Evaluator:
    """Return an evaluator that logs each commit it is called with.

    The returned callable appends ``commit.commit_id`` to *visited* (mutating
    the caller's list in call order) and always reports no matches.
    """

    def evaluator(commit: CommitRecord, manifest: Manifest, root: pathlib.Path) -> list[QueryMatch]:
        visited.append(commit.commit_id)
        return []

    return evaluator


def _numbered_matches(count: int) -> list[QueryMatch]:
    """Build *count* synthetic matches with details ``match 0`` .. ``match count-1``."""
    return [
        QueryMatch(
            commit_id=f"commit{i:04d}",
            author="x",
            committed_at="2026-01-01T00:00:00+00:00",
            branch="main",
            detail=f"match {i}",
            extra={},
        )
        for i in range(count)
    ]


# ---------------------------------------------------------------------------
# walk_history
# ---------------------------------------------------------------------------


class TestWalkHistory:
    """Basic walk_history behaviour on single-parent histories."""

    def test_empty_branch_returns_empty(self) -> None:
        """A branch with no head ref file yields no matches."""
        with tempfile.TemporaryDirectory() as tmp:
            root = _make_repo(pathlib.Path(tmp))
            results = walk_history(root, "main", lambda c, m, r: [])
            assert results == []

    def test_single_commit_visited(self) -> None:
        """The evaluator is called exactly once for a one-commit branch."""
        with tempfile.TemporaryDirectory() as tmp:
            root = _make_repo(pathlib.Path(tmp))
            c = _write_commit(root, "aaa111")
            (heads_dir(root) / "main").write_text(c.commit_id)

            visited: list[str] = []
            walk_history(root, "main", _recording_evaluator(visited), load_manifest=False)
            assert visited == [c.commit_id]

    def test_chain_walked_newest_first(self) -> None:
        """A two-commit chain is visited head first, then its parent."""
        with tempfile.TemporaryDirectory() as tmp:
            root = _make_repo(pathlib.Path(tmp))
            c_aaa = _write_commit(root, "aaa111")
            c_bbb = _write_commit(root, "bbb222", parent_id=c_aaa.commit_id)
            (heads_dir(root) / "main").write_text(c_bbb.commit_id)

            visited: list[str] = []
            walk_history(root, "main", _recording_evaluator(visited), load_manifest=False)
            assert visited == [c_bbb.commit_id, c_aaa.commit_id]

    def test_matches_collected(self) -> None:
        """Matches returned by the evaluator are surfaced in the result list."""
        with tempfile.TemporaryDirectory() as tmp:
            root = _make_repo(pathlib.Path(tmp))
            c = _write_commit(root, "ccc333")
            (heads_dir(root) / "main").write_text(c.commit_id)

            def evaluator(commit: CommitRecord, manifest: Manifest, r: pathlib.Path) -> list[QueryMatch]:
                return [QueryMatch(
                    commit_id=commit.commit_id,
                    author=commit.author,
                    committed_at=commit.committed_at.isoformat(),
                    branch=commit.branch,
                    detail="test match",
                    extra={},
                )]

            results = walk_history(root, "main", evaluator, load_manifest=False)
            assert len(results) == 1
            assert results[0]["detail"] == "test match"

    def test_max_commits_limits_walk(self) -> None:
        """max_commits=3 stops the walk after three commits of a ten-commit chain."""
        with tempfile.TemporaryDirectory() as tmp:
            root = _make_repo(pathlib.Path(tmp))
            records: list[CommitRecord] = []
            for i in range(10):
                # Chain each commit onto the previously written one.
                parent_id = records[-1].commit_id if records else None
                records.append(_write_commit(root, f"commit{i:03d}", parent_id=parent_id))
            (heads_dir(root) / "main").write_text(records[-1].commit_id)

            visited: list[str] = []
            walk_history(
                root, "main", _recording_evaluator(visited),
                max_commits=3, load_manifest=False,
            )
            assert len(visited) == 3

    def test_head_commit_id_override(self) -> None:
        """head_commit_id starts the walk there instead of at the branch ref."""
        with tempfile.TemporaryDirectory() as tmp:
            root = _make_repo(pathlib.Path(tmp))
            c_aaa = _write_commit(root, "aaa111")
            c_bbb = _write_commit(root, "bbb222", parent_id=c_aaa.commit_id)
            # HEAD points to bbb222 but we override to aaa111.
            (heads_dir(root) / "main").write_text(c_bbb.commit_id)

            visited: list[str] = []
            walk_history(
                root, "main", _recording_evaluator(visited),
                head_commit_id=c_aaa.commit_id, load_manifest=False,
            )
            assert visited == [c_aaa.commit_id]


# ---------------------------------------------------------------------------
# format_matches
# ---------------------------------------------------------------------------


class TestFormatMatches:
    """Rendering of QueryMatch lists by format_matches."""

    def test_empty_returns_no_matches(self) -> None:
        """An empty match list renders a 'No matches' message."""
        assert "No matches" in format_matches([])

    def test_single_match_formatted(self) -> None:
        """Output contains the 8-char commit prefix, the author and the detail."""
        m = QueryMatch(
            commit_id="a" * 64,
            author="gabriel",
            committed_at="2026-03-18T12:00:00+00:00",
            branch="main",
            detail="my_function (added)",
            extra={},
        )
        out = format_matches([m])
        assert ("a" * 64)[:8] in out
        assert "gabriel" in out
        assert "my_function (added)" in out

    def test_agent_id_shown_when_present(self) -> None:
        """An agent_id on the match appears in the rendered output."""
        m = QueryMatch(
            commit_id="a" * 64,
            author="bot",
            committed_at="2026-03-18T12:00:00+00:00",
            branch="main",
            detail="something",
            extra={},
            agent_id="claude-v4",
        )
        out = format_matches([m])
        assert "claude-v4" in out

    def test_max_results_truncation_message_updated(self) -> None:
        """format_matches uses '--limit' in the truncation hint (not '--max')."""
        out = format_matches(_numbered_matches(10), max_results=5)
        assert "--limit" in out

    def test_max_results_capped(self) -> None:
        """With 100 matches and max_results=5 the output reports '95 more'."""
        out = format_matches(_numbered_matches(100), max_results=5)
        assert "95 more" in out


# ---------------------------------------------------------------------------
# Regression tests: dead walkers covered by live walkers
#
# These tests prove that walk_commits_bfs and store.walk_commits_between
# fully cover the use-cases of the dead walk_commits and walk_commits_range
# before those functions are deleted.  If these tests pass, deletion is safe.
# ---------------------------------------------------------------------------


def _make_repo_for_walker(tmp_path: pathlib.Path) -> pathlib.Path:
    """Set up a minimal .muse/ structure for the walker regression tests.

    Same directory layout as ``_make_repo`` but with a different ``repo_id``
    and a ``HEAD`` file containing the bare branch name ``main``.  Returns
    *tmp_path* (the repo root).
    """
    muse = muse_dir(tmp_path)
    muse.mkdir()
    (muse / "repo.json").write_text('{"repo_id":"walker-test"}')
    (muse / "HEAD").write_text("main")
    (muse / "commits").mkdir()
    (muse / "snapshots").mkdir()
    (muse / "refs" / "heads").mkdir(parents=True)
    return tmp_path


def _commit(
    root: pathlib.Path,
    label: str,
    parent: str | None = None,
    parent2: str | None = None,
) -> CommitRecord:
    """Write a content-addressed CommitRecord. *label* is used only in the message.

    Unlike ``_write_commit`` this supports a second parent (merge commits) and
    pins ``committed_at`` to 2026-01-01 UTC, so every record it creates shares
    one timestamp and commit IDs differ only through label and parents.
    """
    snap_id = compute_snapshot_id({})
    committed_at = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc)
    parent_ids = [p for p in [parent, parent2] if p is not None]
    commit_id = compute_commit_id(
        parent_ids=parent_ids,
        snapshot_id=snap_id,
        message=f"msg {label}",
        committed_at_iso=committed_at.isoformat(),
        author="tester",
    )
    rec = CommitRecord(
        commit_id=commit_id,
        branch="main",
        snapshot_id=snap_id,
        message=f"msg {label}",
        committed_at=committed_at,
        parent_commit_id=parent,
        parent2_commit_id=parent2,
        author="tester",
    )
    write_commit(root, rec)
    return rec


class TestWalkHistoryFollowMerges:
    """Belt-and-suspenders tests for walk_history(follow_merges=True/False)."""

    def test_follow_merges_false_skips_parent2(self, tmp_path: pathlib.Path) -> None:
        """follow_merges=False (default) stays on the main parent chain only."""
        root = _make_repo_for_walker(tmp_path)
        c_main1 = _commit(root, "main1")
        c_feat1 = _commit(root, "feat1", parent=c_main1.commit_id)
        c_merge = _commit(root, "merge_c", parent=c_main1.commit_id, parent2=c_feat1.commit_id)
        (heads_dir(root) / "main").write_text(c_merge.commit_id)

        visited: list[str] = []
        walk_history(
            root, "main", _recording_evaluator(visited),
            follow_merges=False, load_manifest=False,
        )
        assert c_feat1.commit_id not in visited
        assert c_merge.commit_id in visited
        assert c_main1.commit_id in visited

    def test_follow_merges_true_visits_parent2(self, tmp_path: pathlib.Path) -> None:
        """follow_merges=True visits both parents of a merge commit."""
        root = _make_repo_for_walker(tmp_path)
        c_base = _commit(root, "base")
        c_feature = _commit(root, "feature", parent=c_base.commit_id)
        c_merge = _commit(root, "merge_c", parent=c_base.commit_id, parent2=c_feature.commit_id)
        (heads_dir(root) / "main").write_text(c_merge.commit_id)

        visited: list[str] = []
        walk_history(
            root, "main", _recording_evaluator(visited),
            follow_merges=True, load_manifest=False,
        )
        assert set(visited) == {c_merge.commit_id, c_base.commit_id, c_feature.commit_id}

    def test_follow_merges_true_linear_chain(self, tmp_path: pathlib.Path) -> None:
        """follow_merges=True on a linear chain behaves identically to False."""
        root = _make_repo_for_walker(tmp_path)
        c_a = _commit(root, "a")
        c_b = _commit(root, "b", parent=c_a.commit_id)
        c_c = _commit(root, "c", parent=c_b.commit_id)
        (heads_dir(root) / "main").write_text(c_c.commit_id)

        visited_ff: list[str] = []
        visited_ft: list[str] = []
        walk_history(
            root, "main", _recording_evaluator(visited_ff),
            follow_merges=False, load_manifest=False,
        )
        walk_history(
            root, "main", _recording_evaluator(visited_ft),
            follow_merges=True, load_manifest=False,
        )
        assert set(visited_ff) == set(visited_ft) == {c_a.commit_id, c_b.commit_id, c_c.commit_id}

    def test_follow_merges_since_filter_applies(self, tmp_path: pathlib.Path) -> None:
        """since filter still prunes commits even with follow_merges=True."""
        root = _make_repo_for_walker(tmp_path)
        # Pin explicit timestamps so since filter is deterministic.
        # (_commit cannot be used here: it stamps every record with one date.)
        t_old = datetime.datetime(2020, 1, 1, tzinfo=datetime.timezone.utc)
        t_new = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc)
        snap_id = compute_snapshot_id({})

        cid_old = compute_commit_id(
            parent_ids=[],
            snapshot_id=snap_id,
            message="old",
            committed_at_iso=t_old.isoformat(),
            author="tester",
        )
        rec_old = CommitRecord(
            commit_id=cid_old,
            branch="main",
            snapshot_id=snap_id,
            message="old",
            committed_at=t_old,
            author="tester",
        )
        write_commit(root, rec_old)

        cid_new = compute_commit_id(
            parent_ids=[cid_old],
            snapshot_id=snap_id,
            message="new",
            committed_at_iso=t_new.isoformat(),
            author="tester",
        )
        rec_new = CommitRecord(
            commit_id=cid_new,
            branch="main",
            snapshot_id=snap_id,
            message="new",
            committed_at=t_new,
            parent_commit_id=cid_old,
            author="tester",
        )
        write_commit(root, rec_new)
        (heads_dir(root) / "main").write_text(cid_new)

        visited: list[str] = []
        since = datetime.datetime(2025, 1, 1, tzinfo=datetime.timezone.utc)
        walk_history(
            root, "main", _recording_evaluator(visited),
            follow_merges=True, since=since, load_manifest=False,
        )
        assert cid_new in visited
        assert cid_old not in visited

    def test_follow_merges_true_diamond_dag_no_duplicates(self, tmp_path: pathlib.Path) -> None:
        """BFS never visits the same commit twice (diamond DAG case)."""
        root = _make_repo_for_walker(tmp_path)
        # Diamond: base ← left ← merge, base ← right ← merge
        c_base = _commit(root, "base")
        c_left = _commit(root, "left", parent=c_base.commit_id)
        c_right = _commit(root, "right", parent=c_base.commit_id)
        c_merge = _commit(root, "merge_c", parent=c_left.commit_id, parent2=c_right.commit_id)
        (heads_dir(root) / "main").write_text(c_merge.commit_id)

        visited: list[str] = []
        walk_history(
            root, "main", _recording_evaluator(visited),
            follow_merges=True, load_manifest=False,
        )
        # Each commit visited exactly once.
        assert len(visited) == len(set(visited))
        assert set(visited) == {
            c_base.commit_id, c_left.commit_id, c_right.commit_id, c_merge.commit_id,
        }

    def test_follow_merges_max_commits_respected(self, tmp_path: pathlib.Path) -> None:
        """max_commits caps BFS walk even with follow_merges=True."""
        root = _make_repo_for_walker(tmp_path)
        c1 = _commit(root, "c1")
        c2 = _commit(root, "c2", parent=c1.commit_id)
        c3 = _commit(root, "c3", parent=c2.commit_id)
        c4 = _commit(root, "c4", parent=c3.commit_id)
        (heads_dir(root) / "main").write_text(c4.commit_id)

        visited: list[str] = []
        walk_history(
            root, "main", _recording_evaluator(visited),
            follow_merges=True, max_commits=2, load_manifest=False,
        )
        assert len(visited) == 2

    def test_follow_merges_evaluator_sees_match(self, tmp_path: pathlib.Path) -> None:
        """Matches from parent2 commits are included in results."""
        root = _make_repo_for_walker(tmp_path)
        c_base = _commit(root, "base")
        c_feature = _commit(root, "feature", parent=c_base.commit_id)
        c_merge = _commit(root, "merge_c", parent=c_base.commit_id, parent2=c_feature.commit_id)
        (heads_dir(root) / "main").write_text(c_merge.commit_id)

        def ev(c: CommitRecord, m: Manifest, r: pathlib.Path) -> list[QueryMatch]:
            # Only the commit reachable via parent2 produces a match.
            if c.commit_id == c_feature.commit_id:
                return [QueryMatch(
                    commit_id=c.commit_id,
                    author=c.author,
                    committed_at=c.committed_at.isoformat(),
                    branch=c.branch,
                    detail="feature found",
                    extra={},
                )]
            return []

        results = walk_history(root, "main", ev, follow_merges=True, load_manifest=False)
        assert len(results) == 1
        assert results[0]["detail"] == "feature found"

    def test_follow_merges_false_misses_parent2_commit(self, tmp_path: pathlib.Path) -> None:
        """With follow_merges=False, parent2 commits are never evaluated."""
        root = _make_repo_for_walker(tmp_path)
        c_base = _commit(root, "base")
        c_feature = _commit(root, "feature", parent=c_base.commit_id)
        c_merge = _commit(root, "merge_c", parent=c_base.commit_id, parent2=c_feature.commit_id)
        (heads_dir(root) / "main").write_text(c_merge.commit_id)

        def ev(c: CommitRecord, m: Manifest, r: pathlib.Path) -> list[QueryMatch]:
            # Only the commit reachable via parent2 would produce a match.
            if c.commit_id == c_feature.commit_id:
                return [QueryMatch(
                    commit_id=c.commit_id,
                    author=c.author,
                    committed_at=c.committed_at.isoformat(),
                    branch=c.branch,
                    detail="feature found",
                    extra={},
                )]
            return []

        results = walk_history(root, "main", ev, follow_merges=False, load_manifest=False)
        assert results == []  # feature commit is never visited


class TestLiveWalkersContracts:
    """Regression: walk_commits_bfs and walk_commits_between cover deleted walkers.

    These tests lock down the contracts of the surviving walkers, proving the
    deleted walk_commits and walk_commits_range are fully superseded.
    """

    def test_walk_commits_bfs_linear_chain(self, tmp_path: pathlib.Path) -> None:
        """walk_commits_bfs on a linear chain returns every commit, untruncated."""
        root = _make_repo_for_walker(tmp_path)
        c_aaa = _commit(root, "aaa")
        c_bbb = _commit(root, "bbb", parent=c_aaa.commit_id)
        c_ccc = _commit(root, "ccc", parent=c_bbb.commit_id)

        live_commits, truncated = walk_commits_bfs(root, c_ccc.commit_id)
        live_ids = [c.commit_id for c in live_commits]

        assert truncated is False
        # Membership only: _commit gives all three records the same timestamp,
        # so this test deliberately does not pin the returned order.
        assert set(live_ids) == {c_aaa.commit_id, c_bbb.commit_id, c_ccc.commit_id}

    def test_walk_commits_bfs_follows_parent2(self, tmp_path: pathlib.Path) -> None:
        """walk_commits_bfs reaches parent2 branches — supersedes dead linear walker."""
        root = _make_repo_for_walker(tmp_path)
        c_base = _commit(root, "base")
        c_feature = _commit(root, "feature", parent=c_base.commit_id)
        c_merge = _commit(
            root, "merge_commit", parent=c_base.commit_id, parent2=c_feature.commit_id,
        )

        live_commits, _ = walk_commits_bfs(root, c_merge.commit_id)
        live_ids = {c.commit_id for c in live_commits}

        assert c_feature.commit_id in live_ids
        assert c_base.commit_id in live_ids
        assert c_merge.commit_id in live_ids

    def test_walk_commits_between_range(self, tmp_path: pathlib.Path) -> None:
        """walk_commits_between excludes from_commit_id — supersedes walk_commits_range."""
        root = _make_repo_for_walker(tmp_path)
        c1 = _commit(root, "c1")
        c2 = _commit(root, "c2", parent=c1.commit_id)
        c3 = _commit(root, "c3", parent=c2.commit_id)
        c4 = _commit(root, "c4", parent=c3.commit_id)

        result = walk_commits_between(
            root, to_commit_id=c4.commit_id, from_commit_id=c1.commit_id,
        )
        ids = [c.commit_id for c in result]

        assert ids == [c4.commit_id, c3.commit_id, c2.commit_id]
        # Implied by the equality above; kept to spell out the exclusive bound.
        assert c1.commit_id not in ids

    def test_walk_commits_between_none_from(self, tmp_path: pathlib.Path) -> None:
        """walk_commits_between with from_commit_id=None returns entire chain."""
        root = _make_repo_for_walker(tmp_path)
        c_x1 = _commit(root, "x1")
        c_x2 = _commit(root, "x2", parent=c_x1.commit_id)

        ids = [c.commit_id for c in walk_commits_between(root, c_x2.commit_id, None)]
        assert ids == [c_x2.commit_id, c_x1.commit_id]

    def test_walk_commits_bfs_stop_at_excludes_boundary(self, tmp_path: pathlib.Path) -> None:
        """walk_commits_bfs stop_at_commit_id excludes the boundary — same contract as walk_commits_between."""
        root = _make_repo_for_walker(tmp_path)
        c_p1 = _commit(root, "p1")
        c_p2 = _commit(root, "p2", parent=c_p1.commit_id)
        c_p3 = _commit(root, "p3", parent=c_p2.commit_id)

        bfs_commits, _ = walk_commits_bfs(
            root, c_p3.commit_id, stop_at_commit_id=c_p1.commit_id,
        )
        bfs_ids = [c.commit_id for c in bfs_commits]

        assert c_p1.commit_id not in bfs_ids
        assert set(bfs_ids) == {c_p3.commit_id, c_p2.commit_id}