"""Comprehensive tests for ``muse merge-base`` and ``snapshot-diff``. Coverage tiers -------------- - Integration: linear ancestor, diverged branches, no common ancestor, branch name resolution, HEAD resolution, JSON/text format - Security: ANSI in paths stripped in text mode, errors to stderr - Stress: 10-commit chain merge-base, 50-path manifest diff """ from __future__ import annotations import datetime import json import pathlib from muse.core.errors import ExitCode from muse.core.ids import hash_commit, hash_snapshot from muse.core.commits import ( CommitRecord, write_commit, ) from muse.core.snapshots import ( SnapshotRecord, write_snapshot, ) from muse.core.types import Manifest from muse.core.paths import head_path, muse_dir, ref_path from tests.cli_test_helper import CliRunner, InvokeResult runner = CliRunner() _DT = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc) # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _make_repo(tmp_path: pathlib.Path) -> pathlib.Path: repo = tmp_path / "repo" dot_muse = muse_dir(repo) for sub in ("objects", "commits", "snapshots", "refs/heads"): (dot_muse / sub).mkdir(parents=True) (dot_muse / "HEAD").write_text("ref: refs/heads/main") (dot_muse / "repo.json").write_text(json.dumps({"repo_id": "test-repo", "domain": "code"})) return repo def _snap( repo: pathlib.Path, *, manifest: Manifest | None = None, ) -> str: """Write a snapshot with a real content-addressed ID; return the ID.""" m = manifest if manifest is not None else {} sid = hash_snapshot(m) write_snapshot(repo, SnapshotRecord( snapshot_id=sid, manifest=m, created_at=_DT, )) return sid def _commit( repo: pathlib.Path, snap_id: str, *, message: str = "test", parent: str | None = None, parent2: str | None = None, branch: str = "main", ) -> str: """Write a commit with a real content-addressed ID; return the ID.""" parent_ids: list[str] = [p for p in [parent, parent2] if p is not None] cid = hash_commit( parent_ids=parent_ids, snapshot_id=snap_id, message=message, committed_at_iso=_DT.isoformat(), ) write_commit(repo, CommitRecord( commit_id=cid, branch=branch, snapshot_id=snap_id, message=message, committed_at=_DT, parent_commit_id=parent, parent2_commit_id=parent2, )) return cid def _set_head(repo: pathlib.Path, branch: str, commit_id: str) -> None: ref = ref_path(repo, branch) ref.parent.mkdir(parents=True, exist_ok=True) ref.write_text(commit_id) (head_path(repo)).write_text(f"ref: refs/heads/{branch}") def _mb(repo: pathlib.Path, *args: str) -> InvokeResult: from muse.cli.app import main as cli return runner.invoke( cli, ["merge-base", *args], env={"MUSE_REPO_ROOT": str(repo)}, ) def _sd(repo: pathlib.Path, *args: str) -> InvokeResult: from muse.cli.app import main as cli return runner.invoke( cli, ["snapshot-diff", *args], env={"MUSE_REPO_ROOT": str(repo)}, ) def _fake_oid(n: int) -> str: return format(n, "064x") # =========================================================================== # merge-base tests # =========================================================================== class TestMergeBase: def test_same_commit_is_its_own_base(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) sid = _snap(repo) cid = _commit(repo, sid, message="solo") result = _mb(repo, "--json", cid, cid) assert result.exit_code == 0 data = json.loads(result.output) assert data["merge_base"] == cid def test_linear_chain_base_is_parent(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) sid = _snap(repo) c1 = _commit(repo, sid, message="c1") c2 = _commit(repo, sid, message="c2", parent=c1) result = _mb(repo, "--json", c1, c2) assert result.exit_code == 0 data = json.loads(result.output) assert data["merge_base"] == c1 def test_diverged_branches_find_common_ancestor(self, tmp_path: pathlib.Path) -> None: """ base → left → right merge-base(left, right) == base """ repo = _make_repo(tmp_path) sid = _snap(repo) base = _commit(repo, sid, message="base") left = _commit(repo, sid, message="left", parent=base) right = _commit(repo, sid, message="right", parent=base) result = _mb(repo, "--json", left, right) assert result.exit_code == 0 data = json.loads(result.output) assert data["merge_base"] == base def test_unrelated_commits_no_common_ancestor(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) sid = _snap(repo) c1 = _commit(repo, sid, message="unrelated-c1") c2 = _commit(repo, sid, message="unrelated-c2") result = _mb(repo, "--json", c1, c2) assert result.exit_code == 0 data = json.loads(result.output) assert data["merge_base"] is None assert "error" in data def test_branch_name_resolution(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) sid = _snap(repo) cid = _commit(repo, sid, message="branch-res") _set_head(repo, "main", cid) result = _mb(repo, "--json", "main", cid) assert result.exit_code == 0 data = json.loads(result.output) assert data["merge_base"] == cid def test_head_resolution(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) sid = _snap(repo) cid = _commit(repo, sid, message="head-res") _set_head(repo, "main", cid) result = _mb(repo, "--json", "HEAD", cid) assert result.exit_code == 0 data = json.loads(result.output) assert data["merge_base"] == cid def test_text_format_prints_bare_id(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) sid = _snap(repo) cid = _commit(repo, sid, message="text-bare") result = _mb(repo, cid, cid) assert result.exit_code == 0 assert cid in result.output def test_text_format_no_ancestor(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) sid = _snap(repo) c1 = _commit(repo, sid, message="no-anc-c1") c2 = _commit(repo, sid, message="no-anc-c2") result = _mb(repo, c1, c2) assert result.exit_code == 0 assert "no common ancestor" in result.output def test_invalid_ref_errors(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) sid = _snap(repo) cid = _commit(repo, sid, message="inv-ref") result = _mb(repo, cid, "nonexistent-branch") assert result.exit_code == ExitCode.USER_ERROR def test_no_traceback_on_bad_ref(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) result = _mb(repo, "bad", "refs") assert "Traceback" not in result.output def test_10_commit_chain(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) sid = _snap(repo) ids: list[str] = [] for i in range(10): parent = ids[i - 1] if i > 0 else None cid = _commit(repo, sid, message=f"chain-{i}", parent=parent) ids.append(cid) result = _mb(repo, "--json", ids[-1], ids[5]) assert result.exit_code == 0 data = json.loads(result.output) assert data["merge_base"] == ids[5] # =========================================================================== # snapshot-diff tests # =========================================================================== class TestSnapshotDiff: def test_identical_snapshots_zero_changes(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) sid = _snap(repo, manifest={"a.py": _fake_oid(1)}) result = _sd(repo, "--json", sid, sid) assert result.exit_code == 0 data = json.loads(result.output) assert data["total_changes"] == 0 assert data["added"] == [] assert data["modified"] == [] assert data["deleted"] == [] def test_added_files(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) sa = _snap(repo, manifest={}) sb = _snap(repo, manifest={"new.py": _fake_oid(1)}) data = json.loads(_sd(repo, "--json", sa, sb).output) assert len(data["added"]) == 1 assert data["added"][0]["path"] == "new.py" def test_deleted_files(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) sa = _snap(repo, manifest={"old.py": _fake_oid(1)}) sb = _snap(repo, manifest={}) data = json.loads(_sd(repo, "--json", sa, sb).output) assert len(data["deleted"]) == 1 assert data["deleted"][0]["path"] == "old.py" def test_modified_files(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) sa = _snap(repo, manifest={"main.py": _fake_oid(1)}) sb = _snap(repo, manifest={"main.py": _fake_oid(2)}) data = json.loads(_sd(repo, "--json", sa, sb).output) assert len(data["modified"]) == 1 assert data["modified"][0]["path"] == "main.py" def test_text_format_prefixes(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) sa = _snap(repo, manifest={"old.py": _fake_oid(1)}) sb = _snap(repo, manifest={"new.py": _fake_oid(2)}) result = _sd(repo, sa, sb) assert result.exit_code == 0 assert "A new.py" in result.output assert "D old.py" in result.output def test_stat_flag_appends_summary(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) sa = _snap(repo, manifest={}) sb = _snap(repo, manifest={"x.py": _fake_oid(1), "y.py": _fake_oid(2)}) result = _sd(repo, "--stat", sa, sb) assert "2 added" in result.output def test_commit_id_resolution(self, tmp_path: pathlib.Path) -> None: """snapshot-diff should accept a commit ID and resolve its snapshot.""" repo = _make_repo(tmp_path) sid = _snap(repo, manifest={"x.py": _fake_oid(1)}) cid = _commit(repo, sid, message="cid-res") data = json.loads(_sd(repo, "--json", sid, cid).output) assert data["total_changes"] == 0 def test_invalid_ref_errors(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) result = _sd(repo, "notexist", "also-not") assert result.exit_code == ExitCode.USER_ERROR def test_no_traceback_on_bad_ref(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) result = _sd(repo, "bad", "also-bad") assert "Traceback" not in result.output def test_ansi_in_paths_stripped_text_mode(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) malicious_path = "\x1b[31mmalicious.py\x1b[0m" sa = _snap(repo, manifest={}) sb = _snap(repo, manifest={malicious_path: _fake_oid(3)}) result = _sd(repo, sa, sb) assert result.exit_code == 0 assert "\x1b" not in result.output def test_50_path_manifest_diff(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) manifest_a = {f"src/file{i:03d}.py": _fake_oid(i) for i in range(50)} manifest_b = {f"src/file{i:03d}.py": _fake_oid(i + 100) for i in range(50)} sa = _snap(repo, manifest=manifest_a) sb = _snap(repo, manifest=manifest_b) data = json.loads(_sd(repo, "--json", sa, sb).output) assert data["total_changes"] == 50 assert len(data["modified"]) == 50 # =========================================================================== # Unit tests for private helpers # =========================================================================== class TestMergeBaseUnit: def test_resolve_ref_branch_name(self, tmp_path: pathlib.Path) -> None: from muse.cli.commands.merge_base import _resolve_ref repo = _make_repo(tmp_path) sid = _snap(repo) cid = _commit(repo, sid, message="branch-resolve", branch="main") _set_head(repo, "main", cid) result = _resolve_ref(repo, "main") assert result == cid def test_resolve_ref_head(self, tmp_path: pathlib.Path) -> None: from muse.cli.commands.merge_base import _resolve_ref repo = _make_repo(tmp_path) sid = _snap(repo) cid = _commit(repo, sid, message="head-resolve", branch="main") _set_head(repo, "main", cid) assert _resolve_ref(repo, "HEAD") == cid def test_resolve_ref_commit_id(self, tmp_path: pathlib.Path) -> None: from muse.cli.commands.merge_base import _resolve_ref repo = _make_repo(tmp_path) sid = _snap(repo) cid = _commit(repo, sid, message="cid-resolve", branch="main") assert _resolve_ref(repo, cid) == cid def test_resolve_ref_nonexistent_returns_none(self, tmp_path: pathlib.Path) -> None: from muse.cli.commands.merge_base import _resolve_ref repo = _make_repo(tmp_path) assert _resolve_ref(repo, f"deadbeef{'0' * 56}") is None def test_resolve_ref_invalid_hex_returns_none(self, tmp_path: pathlib.Path) -> None: from muse.cli.commands.merge_base import _resolve_ref repo = _make_repo(tmp_path) assert _resolve_ref(repo, "not-valid") is None class TestSnapshotDiffUnit: def test_added_entry_fields(self) -> None: from muse.cli.commands.snapshot_diff import _AddedEntry fields = set(_AddedEntry.__annotations__.keys()) assert "path" in fields assert "object_id" in fields def test_modified_entry_fields(self) -> None: from muse.cli.commands.snapshot_diff import _ModifiedEntry fields = set(_ModifiedEntry.__annotations__.keys()) assert "path" in fields assert "object_id_a" in fields assert "object_id_b" in fields def test_deleted_entry_fields(self) -> None: from muse.cli.commands.snapshot_diff import _DeletedEntry fields = set(_DeletedEntry.__annotations__.keys()) assert "path" in fields assert "object_id" in fields def test_diff_result_fields(self) -> None: from muse.cli.commands.snapshot_diff import _DiffResult fields = set(_DiffResult.__annotations__.keys()) assert "snapshot_a" in fields assert "snapshot_b" in fields assert "added" in fields assert "modified" in fields assert "deleted" in fields assert "total_changes" in fields def test_resolve_to_snapshot_id_branch(self, tmp_path: pathlib.Path) -> None: from muse.cli.commands.snapshot_diff import _resolve_to_snapshot_id repo = _make_repo(tmp_path) sid = _snap(repo) cid = _commit(repo, sid, message="branch-snap-res", branch="main") _set_head(repo, "main", cid) result = _resolve_to_snapshot_id(repo, "main") assert result == sid def test_resolve_to_snapshot_id_head(self, tmp_path: pathlib.Path) -> None: from muse.cli.commands.snapshot_diff import _resolve_to_snapshot_id repo = _make_repo(tmp_path) sid = _snap(repo) cid = _commit(repo, sid, message="head-snap-res", branch="main") _set_head(repo, "main", cid) result = _resolve_to_snapshot_id(repo, "HEAD") assert result == sid def test_resolve_to_snapshot_id_direct(self, tmp_path: pathlib.Path) -> None: from muse.cli.commands.snapshot_diff import _resolve_to_snapshot_id repo = _make_repo(tmp_path) sid = _snap(repo) result = _resolve_to_snapshot_id(repo, sid) assert result == sid def test_resolve_to_snapshot_id_invalid_returns_none(self, tmp_path: pathlib.Path) -> None: from muse.cli.commands.snapshot_diff import _resolve_to_snapshot_id repo = _make_repo(tmp_path) assert _resolve_to_snapshot_id(repo, "not-valid") is None def test_resolve_to_snapshot_id_missing_returns_none(self, tmp_path: pathlib.Path) -> None: from muse.cli.commands.snapshot_diff import _resolve_to_snapshot_id repo = _make_repo(tmp_path) assert _resolve_to_snapshot_id(repo, f"ab{'0' * 62}") is None # =========================================================================== # Additional security & format tests # =========================================================================== class TestMergeBaseSecurity: def test_format_error_to_stderr(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) r = _mb(repo, "--format", "xml", "main", "dev") assert r.exit_code != 0 assert r.stdout_bytes == b"" assert r.stderr.strip() # some message emitted to stderr def test_no_traceback_on_bad_format(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) r = _mb(repo, "--format", "bad", "main", "dev") assert "Traceback" not in r.output def test_json_shorthand(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) sid = _snap(repo) cid = _commit(repo, sid, message="json-sh") _set_head(repo, "main", cid) r = _mb(repo, "--json", cid, cid) assert r.exit_code == 0 d = json.loads(r.output) assert d["merge_base"] == cid def test_200_sequential_merge_base_calls(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) sid = _snap(repo) c1 = _commit(repo, sid, message="seq-mb-c1") c2 = _commit(repo, sid, message="seq-mb-c2", parent=c1) _set_head(repo, "main", c2) for i in range(200): r = _mb(repo, c1, c2) assert r.exit_code == 0, f"failed at {i}" class TestSnapshotDiffSecurity: def test_format_error_to_stderr(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) sid = _snap(repo) r = _sd(repo, "--format", "xml", sid, sid) assert r.exit_code != 0 assert r.stdout_bytes == b"" assert "error" in r.stderr.lower() def test_no_traceback_on_bad_format(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) sid = _snap(repo) r = _sd(repo, "--format", "bad", sid, sid) assert "Traceback" not in r.output def test_json_shorthand(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) sa = _snap(repo, manifest={"a.py": _fake_oid(1)}) sb = _snap(repo, manifest={"b.py": _fake_oid(2)}) r = _sd(repo, "--json", sa, sb) assert r.exit_code == 0 d = json.loads(r.output) assert "added" in d assert "deleted" in d assert "modified" in d def test_200_sequential_snapshot_diff_calls(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) sid = _snap(repo) for i in range(200): r = _sd(repo, sid, sid) assert r.exit_code == 0, f"failed at {i}"