"""Comprehensive supercharge tests for ``muse snapshot-diff``. Covers gaps in test_cmd_snapshot_diff.py: * JSON envelope — duration_ms / exit_code / added_count / modified_count / deleted_count on every successful JSON result * JSON schema completeness — all documented fields, correct types * Short prefix ID resolution — bare hex and sha256: both accepted * --only filter — restricts output to one category; suppressed lists are empty * --path-prefix filter — scopes diff to a subdirectory; counts are filtered * --only + --path-prefix combined * Batch mode (--stdin) with envelope fields per line * Security — ANSI injection in file paths sanitized in text output * Security — path traversal in path_prefix (no escape outside manifest keys) * Idempotency — diffing a snapshot against itself always yields zero changes * Symmetric diff — (A→B) and (B→A) produce complementary add/delete counts * Large manifest stress — 500-file diff completes and counts correctly * Concurrent batch stress — 10 threads each diffing independently * Empty snapshot edge cases — both empty, one empty * All-modified edge case — every file changed between snapshots * HEAD resolution — snapshot-diff HEAD HEAD produces zero changes * Commit ID resolution — snapshot-diff * _resolve_to_snapshot_id unit tests — branch / HEAD / snap_id / commit_id / bad * _compute_diff unit tests — only/path_prefix interaction """ from __future__ import annotations from collections.abc import Mapping import datetime import json import pathlib import threading import pytest from tests.cli_test_helper import CliRunner from muse.core.errors import ExitCode from muse.core.object_store import write_object from muse.core.paths import muse_dir, ref_path from muse.core.ids import hash_commit as compute_commit_id, hash_snapshot as compute_snapshot_id from muse.core.commits import ( CommitRecord, write_commit, ) from muse.core.snapshots import ( SnapshotRecord, write_snapshot, ) from muse.core.types import Manifest, blob_id, 
short_id  # final name of the ``from muse.core.types import ...`` statement begun on the previous line

cli = None  # argparse migration — CliRunner ignores this arg
runner = CliRunner()

# Single fixed timestamp used for every snapshot and commit fixture. The
# commit ID is computed from its ISO form, so keeping it constant keeps the
# commit IDs produced by ``_commit`` reproducible.
_FIXED_TS = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc)


# ---------------------------------------------------------------------------
# Shared helpers (identical to test_cmd_snapshot_diff.py — not imported to
# keep each file self-contained)
# ---------------------------------------------------------------------------


def _init_repo(path: pathlib.Path) -> pathlib.Path:
    """Create a minimal muse repository layout under *path* and return *path*.

    Creates the ``commits``, ``snapshots``, ``objects`` and ``refs/heads``
    directories inside the muse directory, points ``HEAD`` at
    ``refs/heads/main`` and writes a ``repo.json`` descriptor.
    """
    dot_muse = muse_dir(path)
    for parts in (("commits",), ("snapshots",), ("objects",), ("refs", "heads")):
        # No ``exist_ok``: a pre-existing layout should fail loudly.
        dot_muse.joinpath(*parts).mkdir(parents=True)
    (dot_muse / "HEAD").write_text("ref: refs/heads/main", encoding="utf-8")
    (dot_muse / "repo.json").write_text(
        json.dumps({"repo_id": "supercharge-diff", "domain": "midi"}),
        encoding="utf-8",
    )
    return path


def _env(repo: pathlib.Path) -> Mapping[str, str]:
    """Return the environment mapping that points the CLI at *repo*."""
    return {"MUSE_REPO_ROOT": str(repo)}


def _obj(repo: pathlib.Path, content: bytes) -> str:
    """Store *content* in the object store of *repo* and return its object ID."""
    oid = blob_id(content)
    write_object(repo, oid, content)
    return oid


def _snap(repo: pathlib.Path, manifest: Manifest) -> str:
    """Write a snapshot record for *manifest* and return its snapshot ID."""
    sid = compute_snapshot_id(manifest)
    record = SnapshotRecord(
        snapshot_id=sid,
        manifest=manifest,
        created_at=_FIXED_TS,
    )
    write_snapshot(repo, record)
    return sid


def _commit(repo: pathlib.Path, tag: str, sid: str, branch: str = "main") -> str:
    """Write a parentless commit for snapshot *sid* and point *branch* at it.

    Args:
        repo: Repository root.
        tag: Used as the commit message.
        sid: Snapshot ID the commit refers to.
        branch: Branch whose ref file is (over)written with the new commit ID.

    Returns:
        The ID of the commit that was written.
    """
    cid = compute_commit_id(
        parent_ids=[],
        snapshot_id=sid,
        message=tag,
        committed_at_iso=_FIXED_TS.isoformat(),
        author="tester",
    )
    write_commit(
        repo,
        CommitRecord(
            commit_id=cid,
            branch=branch,
            snapshot_id=sid,
            message=tag,
            committed_at=_FIXED_TS,
            author="tester",
            parent_commit_id=None,
        ),
    )
    branch_ref = ref_path(repo, branch)
    # Namespaced branch names (e.g. "feature/x") need their parent directory;
    # for plain names this is a no-op because _init_repo created refs/heads.
    branch_ref.parent.mkdir(parents=True, exist_ok=True)
    branch_ref.write_text(cid, encoding="utf-8")
    return cid


# ---------------------------------------------------------------------------
# JSON envelope — duration_ms / exit_code / per-category counts
# ---------------------------------------------------------------------------


class TestJsonEnvelope:
    """The JSON result must include duration_ms, exit_code, and per-category counts."""

    def test_duration_ms_present(self, tmp_path: pathlib.Path) -> None:
        """``duration_ms`` is present, numeric and non-negative."""
        repo = _init_repo(tmp_path)
        sid = _snap(repo, {"f.mid": _obj(repo, b"x")})
        result = runner.invoke(cli, ["snapshot-diff", "--json", sid, sid], env=_env(repo))
        assert result.exit_code == 0
        data = json.loads(result.stdout)
        assert "duration_ms" in data
        assert isinstance(data["duration_ms"], (int, float))
        assert data["duration_ms"] >= 0

    def test_exit_code_zero_on_success(self, tmp_path: pathlib.Path) -> None:
        """The envelope's ``exit_code`` field is 0 for a successful diff."""
        repo = _init_repo(tmp_path)
        sid = _snap(repo, {})
        result = runner.invoke(cli, ["snapshot-diff", "--json", sid, sid], env=_env(repo))
        data = json.loads(result.stdout)
        assert data["exit_code"] == 0

    def test_added_count_matches_list_length(self, tmp_path: pathlib.Path) -> None:
        """``added_count`` equals the length of the ``added`` list."""
        repo = _init_repo(tmp_path)
        sid_a = _snap(repo, {})
        sid_b = _snap(repo, {
            "a.mid": _obj(repo, b"a"),
            "b.mid": _obj(repo, b"b"),
            "c.mid": _obj(repo, b"c"),
        })
        result = runner.invoke(cli, ["snapshot-diff", "--json", sid_a, sid_b], env=_env(repo))
        data = json.loads(result.stdout)
        assert data["added_count"] == 3
        assert data["added_count"] == len(data["added"])

    def test_modified_count_matches_list_length(self, tmp_path: pathlib.Path) -> None:
        """``modified_count`` equals the length of the ``modified`` list."""
        repo = _init_repo(tmp_path)
        v1 = _obj(repo, b"v1")
        v2 = _obj(repo, b"v2")
        sid_a = _snap(repo, {"t.mid": v1, "u.mid": v1})
        sid_b = _snap(repo, {"t.mid": v2, "u.mid": v2})
        result = runner.invoke(cli, ["snapshot-diff", "--json", sid_a, sid_b], env=_env(repo))
        data = json.loads(result.stdout)
        assert data["modified_count"] == 2
        assert data["modified_count"] == len(data["modified"])

    def test_deleted_count_matches_list_length(self, tmp_path: pathlib.Path) -> None:
        """``deleted_count`` equals the length of the ``deleted`` list."""
        repo = _init_repo(tmp_path)
        oid = _obj(repo, b"gone")
        sid_a = _snap(repo, {"x.mid": oid, "y.mid": oid})
        sid_b = _snap(repo, {})
        result = runner.invoke(cli, ["snapshot-diff", "--json", sid_a, sid_b], env=_env(repo))
        data = json.loads(result.stdout)
        assert data["deleted_count"] == 2
        assert data["deleted_count"] == len(data["deleted"])

    def test_total_changes_equals_sum_of_counts(self, tmp_path: pathlib.Path) -> None:
        """``total_changes`` is the sum of the three per-category counts."""
        repo = _init_repo(tmp_path)
        v1 = _obj(repo, b"v1")
        v2 = _obj(repo, b"v2")
        sid_a = _snap(repo, {"gone.mid": v1, "same.mid": v1, "changed.mid": v1})
        sid_b = _snap(repo, {"new.mid": v2, "same.mid": v1, "changed.mid": v2})
        result = runner.invoke(cli, ["snapshot-diff", "--json", sid_a, sid_b], env=_env(repo))
        data = json.loads(result.stdout)
        assert data["total_changes"] == (
            data["added_count"] + data["modified_count"] + data["deleted_count"]
        )
        assert data["total_changes"] == 3  # 1 added, 1 modified, 1 deleted


# ---------------------------------------------------------------------------
# JSON schema completeness
# ---------------------------------------------------------------------------


class TestJsonSchema:
    """All documented fields must be present with correct types."""

    def test_all_fields_present(self, tmp_path: pathlib.Path) -> None:
        """Every documented top-level key appears in the JSON result."""
        repo = _init_repo(tmp_path)
        v1 = _obj(repo, b"v1")
        v2 = _obj(repo, b"v2")
        sid_a = _snap(repo, {"a.mid": v1, "b.mid": v1})
        sid_b = _snap(repo, {"b.mid": v2, "c.mid": v2})
        result = runner.invoke(cli, ["snapshot-diff", "--json", sid_a, sid_b], env=_env(repo))
        assert result.exit_code == 0
        data = json.loads(result.stdout)
        for field in (
            "snapshot_a", "snapshot_b",
            "added", "modified", "deleted",
            "added_count", "modified_count", "deleted_count",
            "total_changes", "duration_ms", "exit_code",
        ):
            assert field in data, f"Missing field: {field}"

    def test_snapshot_ids_are_sha256_prefixed(self, tmp_path: pathlib.Path) -> None:
        """Both reported snapshot IDs carry the ``sha256:`` type tag."""
        repo = _init_repo(tmp_path)
        sid = _snap(repo, {})
        result = runner.invoke(cli, ["snapshot-diff", "--json", sid, sid], env=_env(repo))
        data = json.loads(result.stdout)
        assert data["snapshot_a"].startswith("sha256:")
        assert data["snapshot_b"].startswith("sha256:")

    def test_added_entry_schema(self, tmp_path: pathlib.Path) -> None:
        """An ``added`` entry has string ``path`` and ``object_id`` fields."""
        repo = _init_repo(tmp_path)
        oid = _obj(repo, b"new")
        sid_a = _snap(repo, {})
        sid_b = _snap(repo, {"new.mid": oid})
        result = runner.invoke(cli, ["snapshot-diff", "--json", sid_a, sid_b], env=_env(repo))
        data = json.loads(result.stdout)
        entry = data["added"][0]
        assert isinstance(entry["path"], str)
        assert isinstance(entry["object_id"], str)

    def test_modified_entry_schema(self, tmp_path: pathlib.Path) -> None:
        """A ``modified`` entry has ``path`` plus two distinct object IDs."""
        repo = _init_repo(tmp_path)
        v1 = _obj(repo, b"v1")
        v2 = _obj(repo, b"v2")
        sid_a = _snap(repo, {"t.mid": v1})
        sid_b = _snap(repo, {"t.mid": v2})
        result = runner.invoke(cli, ["snapshot-diff", "--json", sid_a, sid_b], env=_env(repo))
        data = json.loads(result.stdout)
        entry = data["modified"][0]
        assert isinstance(entry["path"], str)
        assert isinstance(entry["object_id_a"], str)
        assert isinstance(entry["object_id_b"], str)
        assert entry["object_id_a"] != entry["object_id_b"]

    def test_deleted_entry_schema(self, tmp_path: pathlib.Path) -> None:
        """A ``deleted`` entry has string ``path`` and ``object_id`` fields."""
        repo = _init_repo(tmp_path)
        oid = _obj(repo, b"gone")
        sid_a = _snap(repo, {"gone.mid": oid})
        sid_b = _snap(repo, {})
        result = runner.invoke(cli, ["snapshot-diff", "--json", sid_a, sid_b], env=_env(repo))
        data = json.loads(result.stdout)
        entry = data["deleted"][0]
        assert isinstance(entry["path"], str)
        assert isinstance(entry["object_id"], str)

    def test_counts_are_integers(self, tmp_path: pathlib.Path) -> None:
        """All count fields are JSON integers."""
        repo = _init_repo(tmp_path)
        sid = _snap(repo, {})
        result = runner.invoke(cli, ["snapshot-diff", "--json", sid, sid], env=_env(repo))
        data = json.loads(result.stdout)
        assert isinstance(data["added_count"], int)
        assert isinstance(data["modified_count"], int)
        assert isinstance(data["deleted_count"], int)
        assert isinstance(data["total_changes"], int)


# ---------------------------------------------------------------------------
# --only filter
# ---------------------------------------------------------------------------


class TestOnlyFilter:
    """--only restricts output to one category; suppressed lists are empty."""

    def _mixed_diff(self, repo: pathlib.Path) -> tuple[str, str]:
        """Return a snapshot pair with one added, one modified, one deleted
        and one unchanged file."""
        v1 = _obj(repo, b"v1")
        v2 = _obj(repo, b"v2")
        sid_a = _snap(repo, {"gone.mid": v1, "same.mid": v1, "changed.mid": v1})
        sid_b = _snap(repo, {"new.mid": v2, "same.mid": v1, "changed.mid": v2})
        return sid_a, sid_b

    def test_only_added_suppresses_modified_and_deleted(self, tmp_path: pathlib.Path) -> None:
        """``--only added`` empties the modified and deleted lists and counts."""
        repo = _init_repo(tmp_path)
        sid_a, sid_b = self._mixed_diff(repo)
        result = runner.invoke(
            cli, ["snapshot-diff", "--json", "--only", "added", sid_a, sid_b], env=_env(repo)
        )
        assert result.exit_code == 0
        data = json.loads(result.stdout)
        assert data["added_count"] >= 1
        assert data["modified_count"] == 0
        assert data["deleted_count"] == 0
        assert data["modified"] == []
        assert data["deleted"] == []

    def test_only_modified_suppresses_added_and_deleted(self, tmp_path: pathlib.Path) -> None:
        """``--only modified`` zeroes the added and deleted counts."""
        repo = _init_repo(tmp_path)
        sid_a, sid_b = self._mixed_diff(repo)
        result = runner.invoke(
            cli, ["snapshot-diff", "--json", "--only", "modified", sid_a, sid_b], env=_env(repo)
        )
        data = json.loads(result.stdout)
        assert data["modified_count"] >= 1
        assert data["added_count"] == 0
        assert data["deleted_count"] == 0

    def test_only_deleted_suppresses_added_and_modified(self, tmp_path: pathlib.Path) -> None:
        """``--only deleted`` zeroes the added and modified counts."""
        repo = _init_repo(tmp_path)
        sid_a, sid_b = self._mixed_diff(repo)
        result = runner.invoke(
            cli, ["snapshot-diff", "--json", "--only", "deleted", sid_a, sid_b], env=_env(repo)
        )
        data = json.loads(result.stdout)
        assert data["deleted_count"] >= 1
        assert data["added_count"] == 0
        assert data["modified_count"] == 0

    def test_only_added_total_changes_reflects_filter(self, tmp_path: pathlib.Path) -> None:
        """With ``--only added``, ``total_changes`` equals ``added_count``."""
        repo = _init_repo(tmp_path)
        sid_a, sid_b = self._mixed_diff(repo)
        result = runner.invoke(
            cli, ["snapshot-diff", "--json", "--only", "added", sid_a, sid_b], env=_env(repo)
        )
        data = json.loads(result.stdout)
        assert data["total_changes"] == data["added_count"]

    def test_only_text_mode_added(self, tmp_path: pathlib.Path) -> None:
        """Text output under ``--only added`` shows ``A`` rows only."""
        repo = _init_repo(tmp_path)
        sid_a, sid_b = self._mixed_diff(repo)
        result = runner.invoke(
            cli,
            ["snapshot-diff", "--only", "added", sid_a, sid_b],
            env=_env(repo),
        )
        assert result.exit_code == 0
        assert "A " in result.stdout
        assert "M " not in result.stdout
        assert "D " not in result.stdout

    def test_only_text_mode_deleted(self, tmp_path: pathlib.Path) -> None:
        """Text output under ``--only deleted`` shows ``D`` rows only."""
        repo = _init_repo(tmp_path)
        sid_a, sid_b = self._mixed_diff(repo)
        result = runner.invoke(
            cli,
            ["snapshot-diff", "--only", "deleted", sid_a, sid_b],
            env=_env(repo),
        )
        assert result.exit_code == 0
        assert "D " in result.stdout
        assert "A " not in result.stdout
        assert "M " not in result.stdout

    def test_only_invalid_value_rejected(self, tmp_path: pathlib.Path) -> None:
        """An unsupported ``--only`` value produces a non-zero exit code."""
        repo = _init_repo(tmp_path)
        sid = _snap(repo, {})
        result = runner.invoke(
            cli, ["snapshot-diff", "--only", "unchanged", sid, sid], env=_env(repo)
        )
        assert result.exit_code != 0

    def test_only_short_flag(self, tmp_path: pathlib.Path) -> None:
        """``--only added`` leaves the modified and deleted lists empty.

        NOTE(review): despite the name, this invokes the long ``--only``
        spelling. If the parser defines a short alias it should be used
        here — confirm against the CLI definition.
        """
        repo = _init_repo(tmp_path)
        sid_a, sid_b = self._mixed_diff(repo)
        result = runner.invoke(
            cli, ["snapshot-diff", "--json", "--only", "added", sid_a, sid_b], env=_env(repo)
        )
        assert result.exit_code == 0
        data = json.loads(result.stdout)
        assert data["modified"] == []
        assert data["deleted"] == []


# ---------------------------------------------------------------------------
# --path-prefix filter
# ---------------------------------------------------------------------------


class TestPathPrefixFilter:
    """--path-prefix scopes the diff to a subdirectory."""

    def _multi_dir_diff(self, repo: pathlib.Path) -> tuple[str, str]:
        """Return a snapshot pair whose changes all live under ``src/``."""
        v1 = _obj(repo, b"v1")
        v2 = _obj(repo, b"v2")
        sid_a = _snap(repo, {
            "src/a.mid": v1,
            "src/b.mid": v1,  # absent from B → deleted
            "docs/guide.md": v1,
        })
        sid_b = _snap(repo, {
            "src/a.mid": v2,       # modified
            "src/c.mid": v2,       # added
            "docs/guide.md": v1,   # unchanged
        })
        return sid_a, sid_b

    def test_prefix_scopes_to_src(self, tmp_path: pathlib.Path) -> None:
        """Every reported path starts with the requested prefix."""
        repo = _init_repo(tmp_path)
        sid_a, sid_b = self._multi_dir_diff(repo)
        result = runner.invoke(
            cli, ["snapshot-diff", "--json", "--path-prefix", "src/", sid_a, sid_b], env=_env(repo)
        )
        assert result.exit_code == 0
        data = json.loads(result.stdout)
        all_paths = (
            [e["path"] for e in data["added"]]
            + [e["path"] for e in data["modified"]]
            + [e["path"] for e in data["deleted"]]
        )
        assert all(p.startswith("src/") for p in all_paths), all_paths

    def test_prefix_excludes_docs(self, tmp_path: pathlib.Path) -> None:
        """No ``docs/`` path is reported when scoping to ``src/``."""
        repo = _init_repo(tmp_path)
        sid_a, sid_b = self._multi_dir_diff(repo)
        result = runner.invoke(
            cli, ["snapshot-diff", "--json", "--path-prefix", "src/", sid_a, sid_b], env=_env(repo)
        )
        data = json.loads(result.stdout)
        all_paths = (
            [e["path"] for e in data["added"]]
            + [e["path"] for e in data["modified"]]
            + [e["path"] for e in data["deleted"]]
        )
        assert not any(p.startswith("docs/") for p in all_paths)

    def test_prefix_counts_are_filtered(self, tmp_path: pathlib.Path) -> None:
        """A scoped diff never reports more changes than the full diff."""
        repo = _init_repo(tmp_path)
        sid_a, sid_b = self._multi_dir_diff(repo)
        # Full diff
        full = json.loads(
            runner.invoke(cli, ["snapshot-diff", "--json", sid_a, sid_b], env=_env(repo)).stdout
        )
        # Scoped to src/
        scoped = json.loads(runner.invoke(
            cli, ["snapshot-diff", "--json", "--path-prefix", "src/", sid_a, sid_b], env=_env(repo)
        ).stdout)
        # The scoped diff is a subset of the full diff, so its total can be
        # equal (all changes live under src/ in this fixture) but never larger.
        assert scoped["total_changes"] <= full["total_changes"]

    def test_nonmatching_prefix_yields_zero_changes(self, tmp_path: pathlib.Path) -> None:
        """A prefix matching no manifest key yields an empty diff."""
        repo = _init_repo(tmp_path)
        sid_a, sid_b = self._multi_dir_diff(repo)
        result = runner.invoke(
            cli,
            ["snapshot-diff", "--json", "--path-prefix", "nonexistent/", sid_a, sid_b],
            env=_env(repo),
        )
        data = json.loads(result.stdout)
        assert data["total_changes"] == 0

    def test_prefix_and_only_combined(self, tmp_path: pathlib.Path) -> None:
        """``--path-prefix`` and ``--only`` can be applied together."""
        repo = _init_repo(tmp_path)
        sid_a, sid_b = self._multi_dir_diff(repo)
        result = runner.invoke(
            cli,
            ["snapshot-diff", "--json", "--path-prefix", "src/", "--only", "added", sid_a, sid_b],
            env=_env(repo),
        )
        data = json.loads(result.stdout)
        assert data["modified"] == []
        assert data["deleted"] == []
        assert all(e["path"].startswith("src/") for e in data["added"])


# ---------------------------------------------------------------------------
# Batch mode (--stdin) with envelope
# ---------------------------------------------------------------------------


class TestStdinEnvelope:
    """Batch mode results must include duration_ms/exit_code/count fields."""

    def test_batch_json_has_duration_ms(self, tmp_path: pathlib.Path) -> None:
        """A batch result line carries a numeric ``duration_ms``."""
        repo = _init_repo(tmp_path)
        sid = _snap(repo, {})
        stdin = f"{sid} {sid}\n"
        result = runner.invoke(
            cli, ["snapshot-diff", "--json", "--stdin"], env=_env(repo), input=stdin
        )
        assert result.exit_code == 0
        data = json.loads(result.stdout.strip())
        assert "duration_ms" in data
        assert isinstance(data["duration_ms"], (int, float))

    def test_batch_json_has_count_fields(self, tmp_path: pathlib.Path) -> None:
        """A batch result line carries all three per-category counts."""
        repo = _init_repo(tmp_path)
        oid = _obj(repo, b"x")
        sid_a = _snap(repo, {"a.mid": oid})
        sid_b = _snap(repo, {})
        stdin = f"{sid_a} {sid_b}\n"
        result = runner.invoke(
            cli, ["snapshot-diff", "--json", "--stdin"], env=_env(repo), input=stdin
        )
        data = json.loads(result.stdout.strip())
        assert "added_count" in data
        assert "modified_count" in data
        assert "deleted_count" in data
        assert data["deleted_count"] == 1

    def test_batch_with_only_filter(self, tmp_path: pathlib.Path) -> None:
        """``--only`` applies to batch results as well."""
        repo = _init_repo(tmp_path)
        v1 = _obj(repo, b"v1")
        v2 = _obj(repo, b"v2")
        sid_a = _snap(repo, {"gone.mid": v1, "changed.mid": v1})
        sid_b = _snap(repo, {"new.mid": v2, "changed.mid": v2})
        stdin = f"{sid_a} {sid_b}\n"
        result = runner.invoke(
            cli,
            ["snapshot-diff", "--json", "--stdin", "--only", "added"],
            env=_env(repo),
            input=stdin,
        )
        data = json.loads(result.stdout.strip())
        assert data["modified"] == []
        assert data["deleted"] == []

    def test_batch_with_path_prefix(self, tmp_path: pathlib.Path) -> None:
        """``--path-prefix`` applies to batch results as well."""
        repo = _init_repo(tmp_path)
        oid = _obj(repo, b"x")
        sid_a = _snap(repo, {"src/a.mid": oid, "docs/b.mid": oid})
        sid_b = _snap(repo, {})
        stdin = f"{sid_a} {sid_b}\n"
        result = runner.invoke(
            cli,
            ["snapshot-diff", "--json", "--stdin", "--path-prefix", "src/"],
            env=_env(repo),
            input=stdin,
        )
        data = json.loads(result.stdout.strip())
        assert all(e["path"].startswith("src/") for e in data["deleted"])


# ---------------------------------------------------------------------------
# Security
# ---------------------------------------------------------------------------


class TestSecurity:
    """Hostile file paths and path prefixes must not leak into the output."""

    def test_ansi_in_path_sanitized_in_text_output(self, tmp_path: pathlib.Path) -> None:
        """File paths with ANSI escapes must be sanitized in text output."""
        repo = _init_repo(tmp_path)
        malicious_path = "\x1b[31msrc/malicious.mid\x1b[0m"
        oid = _obj(repo, b"malicious")
        sid_a = _snap(repo, {})
        sid_b = _snap(repo, {malicious_path: oid})
        # Deliberately no --json: JSON serialization always escapes the ESC
        # control character, which would make the assertion below vacuous.
        result = runner.invoke(
            cli, ["snapshot-diff", sid_a, sid_b], env=_env(repo)
        )
        assert result.exit_code == 0
        assert "\x1b" not in result.stdout

    def test_ansi_in_path_not_sanitized_in_json(self, tmp_path: pathlib.Path) -> None:
        """JSON output preserves raw path strings — callers must sanitize for display."""
        repo = _init_repo(tmp_path)
        malicious_path = "\x1b[31mmalicious.mid\x1b[0m"
        oid = _obj(repo, b"content")
        sid_a = _snap(repo, {})
        sid_b = _snap(repo, {malicious_path: oid})
        result = runner.invoke(cli, ["snapshot-diff", "--json", sid_a, sid_b], env=_env(repo))
        assert result.exit_code == 0
        data = json.loads(result.stdout)
        # JSON preserves the raw path for programmatic use.
        assert data["added"][0]["path"] == malicious_path

    def test_path_prefix_cannot_escape_manifest(self, tmp_path: pathlib.Path) -> None:
        """A crafted --path-prefix with ../ cannot expose paths outside the filter."""
        repo = _init_repo(tmp_path)
        oid = _obj(repo, b"safe")
        sid_a = _snap(repo, {"safe/file.mid": oid})
        sid_b = _snap(repo, {})
        # path_prefix is applied as a startswith filter against manifest keys —
        # "../" will simply not match any key, so zero changes are returned.
        result = runner.invoke(
            cli, ["snapshot-diff", "--json", "--path-prefix", "../", sid_a, sid_b], env=_env(repo)
        )
        assert result.exit_code == 0
        data = json.loads(result.stdout)
        assert data["total_changes"] == 0


# ---------------------------------------------------------------------------
# Idempotency and symmetry
# ---------------------------------------------------------------------------


class TestDiffProperties:
    """Algebraic properties of the diff: identity, symmetry, empty inputs."""

    def test_self_diff_always_zero(self, tmp_path: pathlib.Path) -> None:
        """Diffing a snapshot against itself reports no changes."""
        repo = _init_repo(tmp_path)
        oid = _obj(repo, b"content")
        sid = _snap(repo, {"a.mid": oid, "b.mid": oid})
        result = runner.invoke(cli, ["snapshot-diff", "--json", sid, sid], env=_env(repo))
        data = json.loads(result.stdout)
        assert data["total_changes"] == 0
        assert data["added"] == []
        assert data["modified"] == []
        assert data["deleted"] == []

    def test_symmetric_add_delete_counts(self, tmp_path: pathlib.Path) -> None:
        """A→B adds N files; B→A deletes N files."""
        repo = _init_repo(tmp_path)
        oid_a = _obj(repo, b"a")
        oid_b = _obj(repo, b"b")
        sid_a = _snap(repo, {"x.mid": oid_a})
        sid_b = _snap(repo, {"y.mid": oid_b})
        fwd = json.loads(
            runner.invoke(cli, ["snapshot-diff", "--json", sid_a, sid_b], env=_env(repo)).stdout
        )
        rev = json.loads(
            runner.invoke(cli, ["snapshot-diff", "--json", sid_b, sid_a], env=_env(repo)).stdout
        )
        assert fwd["added_count"] == rev["deleted_count"]
        assert fwd["deleted_count"] == rev["added_count"]

    def test_all_modified_no_add_delete(self, tmp_path: pathlib.Path) -> None:
        """Same paths, all different OIDs → modified only, zero added/deleted."""
        repo = _init_repo(tmp_path)
        n = 10
        manifest_a = {f"track_{i}.mid": _obj(repo, f"v1_{i}".encode()) for i in range(n)}
        manifest_b = {f"track_{i}.mid": _obj(repo, f"v2_{i}".encode()) for i in range(n)}
        sid_a = _snap(repo, manifest_a)
        sid_b = _snap(repo, manifest_b)
        result = runner.invoke(cli, ["snapshot-diff", "--json", sid_a, sid_b], env=_env(repo))
        data = json.loads(result.stdout)
        assert data["modified_count"] == n
        assert data["added_count"] == 0
        assert data["deleted_count"] == 0
        assert data["total_changes"] == n

    def test_both_empty_zero_changes(self, tmp_path: pathlib.Path) -> None:
        """Two empty manifests produce an empty diff."""
        repo = _init_repo(tmp_path)
        sid_a = _snap(repo, {})
        sid_b = _snap(repo, {})
        result = runner.invoke(cli, ["snapshot-diff", "--json", sid_a, sid_b], env=_env(repo))
        data = json.loads(result.stdout)
        assert data["total_changes"] == 0

    def test_a_empty_all_added(self, tmp_path: pathlib.Path) -> None:
        """Empty A against populated B reports every file as added."""
        repo = _init_repo(tmp_path)
        oid = _obj(repo, b"content")
        sid_a = _snap(repo, {})
        sid_b = _snap(repo, {"a.mid": oid, "b.mid": oid, "c.mid": oid})
        result = runner.invoke(cli, ["snapshot-diff", "--json", sid_a, sid_b], env=_env(repo))
        data = json.loads(result.stdout)
        assert data["added_count"] == 3
        assert data["modified_count"] == 0
        assert data["deleted_count"] == 0

    def test_b_empty_all_deleted(self, tmp_path: pathlib.Path) -> None:
        """Populated A against empty B reports every file as deleted."""
        repo = _init_repo(tmp_path)
        oid = _obj(repo, b"content")
        sid_a = _snap(repo, {"a.mid": oid, "b.mid": oid})
        sid_b = _snap(repo, {})
        result = runner.invoke(cli, ["snapshot-diff", "--json", sid_a, sid_b], env=_env(repo))
        data = json.loads(result.stdout)
        assert data["deleted_count"] == 2
        assert data["added_count"] == 0
        assert data["modified_count"] == 0


# ---------------------------------------------------------------------------
# Resolution — HEAD / commit ID / branch
# ---------------------------------------------------------------------------


class TestResolution:
    """The CLI accepts HEAD, commit IDs and branch names as diff operands."""

    def test_head_vs_head_zero_changes(self, tmp_path: pathlib.Path) -> None:
        """``HEAD`` against ``HEAD`` reports no changes."""
        repo = _init_repo(tmp_path)
        sid = _snap(repo, {"f.mid": _obj(repo, b"x")})
        _commit(repo, "init", sid, branch="main")
        result = runner.invoke(cli, ["snapshot-diff", "--json", "HEAD", "HEAD"], env=_env(repo))
        assert result.exit_code == 0
        data = json.loads(result.stdout)
        assert data["total_changes"] == 0

    def test_commit_id_resolution(self, tmp_path: pathlib.Path) -> None:
        """Two commit IDs are resolved to their snapshots before diffing."""
        repo = _init_repo(tmp_path)
        oid_a = _obj(repo, b"v1")
        oid_b = _obj(repo, b"v2")
        sid_a = _snap(repo, {"f.mid": oid_a})
        sid_b = _snap(repo, {"f.mid": oid_b})
        cid_a = _commit(repo, "cmt-a", sid_a, branch="main")
        cid_b = _commit(repo, "cmt-b", sid_b, branch="dev")
        result = runner.invoke(cli, ["snapshot-diff", "--json", cid_a, cid_b], env=_env(repo))
        assert result.exit_code == 0
        data = json.loads(result.stdout)
        assert data["modified_count"] == 1

    def test_branch_vs_snapshot_id(self, tmp_path: pathlib.Path) -> None:
        """A branch name and a raw snapshot ID can be mixed as operands."""
        repo = _init_repo(tmp_path)
        oid = _obj(repo, b"x")
        sid_a = _snap(repo, {"f.mid": oid})
        sid_b = _snap(repo, {})
        _commit(repo, "cmt", sid_a, branch="main")
        result = runner.invoke(cli, ["snapshot-diff", "--json", "main", sid_b], env=_env(repo))
        assert result.exit_code == 0
        data = json.loads(result.stdout)
        assert data["deleted_count"] == 1


# ---------------------------------------------------------------------------
# _resolve_to_snapshot_id unit
# ---------------------------------------------------------------------------


class TestResolveToSnapshotId:
    """Unit tests for ``_resolve_to_snapshot_id``."""

    def test_resolves_snapshot_id_directly(self, tmp_path: pathlib.Path) -> None:
        """A full snapshot ID resolves to itself."""
        from muse.cli.commands.snapshot_diff import _resolve_to_snapshot_id
        repo = _init_repo(tmp_path)
        sid = _snap(repo, {})
        resolved = _resolve_to_snapshot_id(repo, sid)
        assert resolved == sid

    def test_resolves_branch_name(self, tmp_path: pathlib.Path) -> None:
        """A branch name resolves to the snapshot of its tip commit."""
        from muse.cli.commands.snapshot_diff import _resolve_to_snapshot_id
        repo = _init_repo(tmp_path)
        sid = _snap(repo, {"f.mid": _obj(repo, b"x")})
        _commit(repo, "cmt", sid, branch="feature")
        resolved = _resolve_to_snapshot_id(repo, "feature")
        assert resolved == sid

    def test_resolves_head(self, tmp_path: pathlib.Path) -> None:
        """``HEAD`` resolves to the snapshot of the current branch tip."""
        from muse.cli.commands.snapshot_diff import _resolve_to_snapshot_id
        repo = _init_repo(tmp_path)
        sid = _snap(repo, {})
        _commit(repo, "cmt", sid, branch="main")
        resolved = _resolve_to_snapshot_id(repo, "HEAD")
        assert resolved == sid

    def test_head_case_insensitive(self, tmp_path: pathlib.Path) -> None:
        """``head`` and ``Head`` are accepted as spellings of ``HEAD``."""
        from muse.cli.commands.snapshot_diff import _resolve_to_snapshot_id
        repo = _init_repo(tmp_path)
        sid = _snap(repo, {})
        _commit(repo, "cmt", sid, branch="main")
        assert _resolve_to_snapshot_id(repo, "head") == sid
        assert _resolve_to_snapshot_id(repo, "Head") == sid

    def test_returns_none_for_unknown_branch(self, tmp_path: pathlib.Path) -> None:
        """An unknown branch name resolves to ``None``."""
        from muse.cli.commands.snapshot_diff import _resolve_to_snapshot_id
        repo = _init_repo(tmp_path)
        assert _resolve_to_snapshot_id(repo, "no-such-branch") is None

    def test_returns_none_for_bad_ref(self, tmp_path: pathlib.Path) -> None:
        """A string that is neither ref nor ID resolves to ``None``."""
        from muse.cli.commands.snapshot_diff import _resolve_to_snapshot_id
        repo = _init_repo(tmp_path)
        assert _resolve_to_snapshot_id(repo, "not-an-id-at-all") is None

    def test_returns_none_for_head_with_no_commits(self, tmp_path: pathlib.Path) -> None:
        """``HEAD`` resolves to ``None`` in a repository without commits."""
        from muse.cli.commands.snapshot_diff import _resolve_to_snapshot_id
        repo = _init_repo(tmp_path)
        # No commits written — HEAD cannot be resolved.
        assert _resolve_to_snapshot_id(repo, "HEAD") is None


# ---------------------------------------------------------------------------
# _compute_diff unit
# ---------------------------------------------------------------------------


class TestComputeDiff:
    """Unit tests for ``_compute_diff`` and its ``only`` / ``path_prefix`` options."""

    def test_only_filter_zeroes_suppressed_lists(self, tmp_path: pathlib.Path) -> None:
        """``only="added"`` keeps the added entry and empties the other lists."""
        from muse.cli.commands.snapshot_diff import _compute_diff
        repo = _init_repo(tmp_path)
        v1 = _obj(repo, b"v1")
        v2 = _obj(repo, b"v2")
        sid_a = _snap(repo, {"gone.mid": v1, "changed.mid": v1})
        sid_b = _snap(repo, {"new.mid": v2, "changed.mid": v2})
        result = _compute_diff(repo, sid_a, sid_b, only="added")
        assert result["modified"] == []
        assert result["deleted"] == []
        assert len(result["added"]) == 1

    def test_path_prefix_filters_entries(self, tmp_path: pathlib.Path) -> None:
        """``path_prefix`` restricts both the entries and the counts."""
        from muse.cli.commands.snapshot_diff import _compute_diff
        repo = _init_repo(tmp_path)
        oid = _obj(repo, b"x")
        sid_a = _snap(repo, {"src/a.mid": oid, "docs/b.mid": oid})
        sid_b = _snap(repo, {})
        result = _compute_diff(repo, sid_a, sid_b, path_prefix="src/")
        assert all(e["path"].startswith("src/") for e in result["deleted"])
        assert result["deleted_count"] == 1

    def test_error_on_bad_ref(self, tmp_path: pathlib.Path) -> None:
        """Unresolvable refs yield a result carrying an ``error`` key."""
        from muse.cli.commands.snapshot_diff import _compute_diff
        repo = _init_repo(tmp_path)
        result = _compute_diff(repo, "bad-ref", "also-bad")
        assert "error" in result


# ---------------------------------------------------------------------------
# Large manifest stress
# ---------------------------------------------------------------------------


class TestLargeManifestStress:
    """Diff correctness on manifests with hundreds of entries."""

    def test_500_file_diff_counts_correctly(self, tmp_path: pathlib.Path) -> None:
        """500 adds, 250 modifies, 250 unchanged, 0 deletes — counts must be exact."""
        repo = _init_repo(tmp_path)
        n = 500
        # Build manifest A: 500 files (the first 250 are modified in B, the
        # remaining 250 are carried over unchanged).
        manifest_a: Manifest = {}
        for i in range(n):
            manifest_a[f"track_{i:04d}.mid"] = _obj(repo, f"v1_{i}".encode())

        # Build manifest B: 250 modified + 250 unchanged + 500 new
        manifest_b: Manifest = {}
        for i in range(250):
            manifest_b[f"track_{i:04d}.mid"] = _obj(repo, f"v2_{i}".encode())  # modified
        for i in range(250, n):
            # unchanged (kept same OID)
            manifest_b[f"track_{i:04d}.mid"] = manifest_a[f"track_{i:04d}.mid"]
        for i in range(n):
            manifest_b[f"new_{i:04d}.mid"] = _obj(repo, f"new_{i}".encode())  # added

        sid_a = _snap(repo, manifest_a)
        sid_b = _snap(repo, manifest_b)

        result = runner.invoke(cli, ["snapshot-diff", "--json", sid_a, sid_b], env=_env(repo))
        assert result.exit_code == 0
        data = json.loads(result.stdout)
        assert data["added_count"] == n        # 500 new files
        assert data["modified_count"] == 250   # first 250 modified
        assert data["deleted_count"] == 0      # none deleted (250 unchanged kept same OID)
        assert data["total_changes"] == n + 250


# ---------------------------------------------------------------------------
# Concurrent stress
# ---------------------------------------------------------------------------


class TestConcurrentStress:
    """Concurrent invocations against the same repository."""

    def test_10_threads_diff_independently(self, tmp_path: pathlib.Path) -> None:
        """10 threads diffing the same pair concurrently must all succeed."""
        repo = _init_repo(tmp_path)
        v1 = _obj(repo, b"v1")
        v2 = _obj(repo, b"v2")
        sid_a = _snap(repo, {"f.mid": v1})
        sid_b = _snap(repo, {"f.mid": v2})

        errors: list[str] = []
        results: list[dict] = []
        lock = threading.Lock()  # guards ``errors`` and ``results``

        def _diff() -> None:
            r = runner.invoke(cli, ["snapshot-diff", "--json", sid_a, sid_b], env=_env(repo))
            with lock:
                if r.exit_code != 0:
                    errors.append(r.output)
                else:
                    results.append(json.loads(r.stdout))

        threads = [threading.Thread(target=_diff) for _ in range(10)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

        assert not errors, errors
        assert len(results) == 10
        for r in results:
            assert r["modified_count"] == 1
            assert r["exit_code"] == 0


# ---------------------------------------------------------------------------
# Short prefix ID resolution
# ---------------------------------------------------------------------------


class TestPrefixIdResolution:
    """snapshot-diff must accept ``sha256:``-tagged short IDs and reject bare hex."""

    def test_bare_hex_prefix_rejected(self, tmp_path: pathlib.Path) -> None:
        """Bare hex prefix (no sha256: type tag) must be rejected at the CLI boundary."""
        repo = _init_repo(tmp_path)
        oid = _obj(repo, b"x")
        sid_a = _snap(repo, {"f.mid": oid})
        sid_b = _snap(repo, {})
        # Strip "sha256:" — bare hex must be rejected, not resolved.
        prefix_a = sid_a[len("sha256:"):len("sha256:") + 12]
        prefix_b = sid_b[len("sha256:"):len("sha256:") + 12]
        result = runner.invoke(cli, ["snapshot-diff", prefix_a, prefix_b], env=_env(repo))
        assert result.exit_code != 0, "bare hex must be rejected, not resolved"

    def test_sha256_prefixed_short_id_resolves(self, tmp_path: pathlib.Path) -> None:
        """A truncated ID that keeps its ``sha256:`` tag resolves."""
        repo = _init_repo(tmp_path)
        oid = _obj(repo, b"y")
        sid_a = _snap(repo, {})
        sid_b = _snap(repo, {"g.mid": oid})
        # Keep the "sha256:" prefix but truncate the hex portion.
        short_b = sid_b[:len("sha256:") + 16]
        result = runner.invoke(cli, ["snapshot-diff", "--json", sid_a, short_b], env=_env(repo))
        assert result.exit_code == 0, result.output
        data = json.loads(result.stdout)
        assert data["added_count"] == 1

    def test_full_id_still_resolves(self, tmp_path: pathlib.Path) -> None:
        """Full snapshot IDs keep working alongside prefix resolution."""
        repo = _init_repo(tmp_path)
        sid = _snap(repo, {})
        result = runner.invoke(cli, ["snapshot-diff", "--json", sid, sid], env=_env(repo))
        assert result.exit_code == 0
        data = json.loads(result.stdout)
        assert data["total_changes"] == 0

    def test_prefix_resolves_correct_snapshot(self, tmp_path: pathlib.Path) -> None:
        """sha256:-prefixed short IDs resolve to the correct full snapshot IDs."""
        repo = _init_repo(tmp_path)
        oid_a = _obj(repo, b"v_a")
        oid_b = _obj(repo, b"v_b")
        sid_a = _snap(repo, {"a.mid": oid_a})
        sid_b = _snap(repo, {"b.mid": oid_b})
        # Short prefix must carry the sha256: type tag.
        prefix_a = short_id(sid_a)
        prefix_b = short_id(sid_b)
        result = runner.invoke(
            cli, ["snapshot-diff", "--json", prefix_a, prefix_b], env=_env(repo)
        )
        assert result.exit_code == 0
        data = json.loads(result.stdout)
        # snapshot_a and snapshot_b in output must be the full resolved IDs.
        assert data["snapshot_a"] == sid_a
        assert data["snapshot_b"] == sid_b

    def test_nonexistent_prefix_returns_error(self, tmp_path: pathlib.Path) -> None:
        """A well-formed prefix that matches no snapshot is an error."""
        repo = _init_repo(tmp_path)
        sid = _snap(repo, {})
        # The prefix carries the sha256: tag so the failure comes from the
        # missing snapshot, not from the bare-hex rejection covered above.
        result = runner.invoke(
            cli, ["snapshot-diff", "sha256:000000000000", sid], env=_env(repo)
        )
        assert result.exit_code != 0

    def test_resolve_snapshot_prefix_unit(self, tmp_path: pathlib.Path) -> None:
        """Bare hex (no sha256:) must return None — rejected at the function level."""
        from muse.cli.commands.snapshot_diff import _resolve_snapshot_prefix
        repo = _init_repo(tmp_path)
        sid = _snap(repo, {"f.mid": _obj(repo, b"x")})
        bare_prefix = sid[len("sha256:"):len("sha256:") + 10]
        resolved = _resolve_snapshot_prefix(repo, bare_prefix)
        assert resolved is None, "bare hex must not resolve — sha256: prefix required"

    def test_resolve_snapshot_prefix_with_sha256_prefix_unit(self, tmp_path: pathlib.Path) -> None:
        """A ``sha256:``-tagged prefix resolves to the full snapshot ID."""
        from muse.cli.commands.snapshot_diff import _resolve_snapshot_prefix
        repo = _init_repo(tmp_path)
        sid = _snap(repo, {})
        short = sid[:len("sha256:") + 8]
        resolved = _resolve_snapshot_prefix(repo, short)
        assert resolved == sid

    def test_resolve_snapshot_prefix_returns_none_for_no_match(self, tmp_path: pathlib.Path) -> None:
        """A well-formed prefix with no matching snapshot resolves to ``None``."""
        from muse.cli.commands.snapshot_diff import _resolve_snapshot_prefix
        repo = _init_repo(tmp_path)
        # Tagged prefix: exercises the no-match path rather than the bare-hex
        # rejection already covered by test_resolve_snapshot_prefix_unit.
        assert _resolve_snapshot_prefix(repo, "sha256:000000000000") is None

    def test_prefix_in_batch_stdin_mode(self, tmp_path: pathlib.Path) -> None:
        """sha256:-prefixed short IDs must resolve correctly in batch stdin mode."""
        repo = _init_repo(tmp_path)
        oid = _obj(repo, b"batch")
        sid_a = _snap(repo, {"f.mid": oid})
        sid_b = _snap(repo, {})
        # Short prefixes must carry the sha256: type tag.
        prefix_a = short_id(sid_a)
        prefix_b = short_id(sid_b)
        stdin = f"{prefix_a} {prefix_b}\n"
        result = runner.invoke(
            cli, ["snapshot-diff", "--json", "--stdin"], env=_env(repo), input=stdin
        )
        assert result.exit_code == 0
        data = json.loads(result.stdout.strip())
        assert data["deleted_count"] == 1