"""Comprehensive tests for ``muse diff``. Coverage tiers -------------- Unit — parser flags, _classify_patch_op, _op_category, _filter_manifest, _use_color, dead-code removal. Integration — HEAD vs working tree, staged, unstaged, two-commit diff, path filtering, --stat, --text, added/deleted/modified counts. End-to-end — CLI invocations: text and JSON output, --exit-code, --json. Security — ANSI injection in paths, commit refs. Stress — 500-file repos, many changes, concurrent reads. """ from __future__ import annotations import json import os import pathlib import subprocess import threading import time from collections.abc import Mapping from typing import TYPE_CHECKING import pytest from tests.cli_test_helper import CliRunner, InvokeResult if TYPE_CHECKING: import argparse from muse.domain import DeleteOp, DomainOp, InsertOp, MoveOp, PatchOp, ReplaceOp runner = CliRunner() # ────────────────────────────────────────────────────────────────────────────── # Helpers # ────────────────────────────────────────────────────────────────────────────── def _invoke(repo: pathlib.Path, args: list[str]) -> InvokeResult: saved = os.getcwd() try: os.chdir(repo) return runner.invoke(None, args) finally: os.chdir(saved) def _diff(repo: pathlib.Path, *extra: str) -> InvokeResult: return _invoke(repo, ["diff", *extra]) def _commit(repo: pathlib.Path, msg: str) -> InvokeResult: _invoke(repo, ["code", "add", "."]) return _invoke(repo, ["commit", "-m", msg]) @pytest.fixture() def repo(tmp_path: pathlib.Path) -> pathlib.Path: """Initialised repo with one tracked file and one commit.""" saved = os.getcwd() try: os.chdir(tmp_path) runner.invoke(None, ["init"]) finally: os.chdir(saved) (tmp_path / "a.py").write_text("x = 1\n") _commit(tmp_path, "first") return tmp_path # ────────────────────────────────────────────────────────────────────────────── # Unit — parser flags # ────────────────────────────────────────────────────────────────────────────── class TestRegisterFlags: def _parse(self, *args: str) -> "argparse.Namespace": import argparse from muse.cli.commands.diff import register p = argparse.ArgumentParser() sub = p.add_subparsers() register(sub) return p.parse_args(["diff", *args]) def test_default_json_out_is_false(self) -> None: ns = self._parse() assert ns.json_out is False def test_json_flag_sets_json_out(self) -> None: ns = self._parse("--json") assert ns.json_out is True def test_j_shorthand_sets_json_out(self) -> None: ns = self._parse("-j") assert ns.json_out is True def test_exit_code_long_flag(self) -> None: ns = self._parse("--exit-code") assert ns.exit_code is True def test_exit_code_short_flag(self) -> None: ns = self._parse("-z") assert ns.exit_code is True def test_exit_code_default_false(self) -> None: ns = self._parse() assert ns.exit_code is False def test_staged_flag(self) -> None: ns = self._parse("--staged") assert ns.staged is True def test_unstaged_flag(self) -> None: ns = self._parse("--unstaged") assert ns.unstaged is True def test_stat_flag(self) -> None: ns = self._parse("--stat") assert ns.stat is True def test_text_flag(self) -> None: ns = self._parse("--text") assert ns.text is True def test_path_flag(self) -> None: ns = self._parse("-p", "foo.py") assert "foo.py" in ns.paths def test_path_flag_repeatable(self) -> None: ns = self._parse("-p", "a.py", "-p", "b.py") assert "a.py" in ns.paths and "b.py" in ns.paths # ────────────────────────────────────────────────────────────────────────────── # Unit — dead-code removal # ────────────────────────────────────────────────────────────────────────────── class TestDeadCodeRemoved: def test_read_branch_removed(self) -> None: import muse.cli.commands.diff as m assert not hasattr(m, "_read_branch"), ( "_read_branch was a dead wrapper; it should have been deleted" ) # ────────────────────────────────────────────────────────────────────────────── # Unit — _filter_manifest # ────────────────────────────────────────────────────────────────────────────── class TestFilterManifest: def test_empty_paths_returns_all(self) -> None: from muse.cli.commands.diff import _filter_manifest m = {"a.py": "oid1", "b.py": "oid2"} assert _filter_manifest(m, []) == m def test_exact_file_match(self) -> None: from muse.cli.commands.diff import _filter_manifest m = {"a.py": "oid1", "b.py": "oid2"} result = _filter_manifest(m, ["a.py"]) assert result == {"a.py": "oid1"} def test_directory_prefix_match(self) -> None: from muse.cli.commands.diff import _filter_manifest m = { "src/foo.py": "oid1", "src/bar.py": "oid2", "tests/test_foo.py": "oid3", } result = _filter_manifest(m, ["src"]) assert set(result) == {"src/foo.py", "src/bar.py"} def test_trailing_slash_normalised(self) -> None: from muse.cli.commands.diff import _filter_manifest m = {"src/foo.py": "oid1", "other.py": "oid2"} assert _filter_manifest(m, ["src/"]) == {"src/foo.py": "oid1"} def test_multiple_paths(self) -> None: from muse.cli.commands.diff import _filter_manifest m = {"a.py": "oid1", "b.py": "oid2", "c.py": "oid3"} result = _filter_manifest(m, ["a.py", "c.py"]) assert set(result) == {"a.py", "c.py"} def test_no_match_returns_empty(self) -> None: from muse.cli.commands.diff import _filter_manifest m = {"a.py": "oid1"} assert _filter_manifest(m, ["z.py"]) == {} # ────────────────────────────────────────────────────────────────────────────── # Unit — _use_color # ────────────────────────────────────────────────────────────────────────────── class TestUseColor: def test_no_color_env_disables_color( self, monkeypatch: pytest.MonkeyPatch ) -> None: from muse.cli.commands.diff import _use_color monkeypatch.setenv("NO_COLOR", "1") assert _use_color() is False def test_dumb_term_disables_color( self, monkeypatch: pytest.MonkeyPatch ) -> None: from muse.cli.commands.diff import _use_color monkeypatch.setenv("TERM", "dumb") assert _use_color() is False def test_no_color_env_unset_does_not_force_color( self, monkeypatch: pytest.MonkeyPatch ) -> None: from muse.cli.commands.diff import _use_color monkeypatch.delenv("NO_COLOR", raising=False) monkeypatch.delenv("TERM", raising=False) # stdout is not a TTY in test; just verify the function returns a bool assert isinstance(_use_color(), bool) # ────────────────────────────────────────────────────────────────────────────── # Integration — HEAD vs working tree # ────────────────────────────────────────────────────────────────────────────── class TestHeadVsWorkingTree: def test_clean_tree_exits_0(self, repo: pathlib.Path) -> None: result = _diff(repo) assert result.exit_code == 0 assert "No differences" in result.output def test_modified_file_detected(self, repo: pathlib.Path) -> None: (repo / "a.py").write_text("x = 99\n") result = _diff(repo) assert result.exit_code == 0 assert "a.py" in result.output def test_added_file_detected(self, repo: pathlib.Path) -> None: (repo / "new.py").write_text("z = 0\n") result = _diff(repo) assert "new.py" in result.output def test_deleted_file_detected(self, repo: pathlib.Path) -> None: (repo / "b.py").write_text("b = 1\n") _commit(repo, "add b") (repo / "b.py").unlink() result = _diff(repo) assert "b.py" in result.output # ────────────────────────────────────────────────────────────────────────────── # Integration — JSON schema (including critical bug fix) # ────────────────────────────────────────────────────────────────────────────── class TestJsonSchema: """All keys agents depend on must be present.""" REQUIRED_KEYS = { "from_ref", "to_ref", "from_commit_id", "to_commit_id", "has_changes", "summary", "added", "deleted", "modified", "total_changes", "duration_ms", "exit_code", } def test_clean_tree_json_keys(self, repo: pathlib.Path) -> None: result = _diff(repo, "--json") assert result.exit_code == 0 data = json.loads(result.output) missing = self.REQUIRED_KEYS - set(data) assert not missing, f"Missing keys: {missing}" def test_has_changes_false_on_clean_tree(self, repo: pathlib.Path) -> None: result = _diff(repo, "--json") data = json.loads(result.output) assert data["has_changes"] is False def test_has_changes_true_on_modified(self, repo: pathlib.Path) -> None: (repo / "a.py").write_text("x = 99\n") result = _diff(repo, "--json") data = json.loads(result.output) assert data["has_changes"] is True def test_from_commit_id_present(self, repo: pathlib.Path) -> None: result = _diff(repo, "--json") data = json.loads(result.output) # from_commit_id should be the HEAD commit SHA assert data["from_commit_id"] is not None assert data["from_commit_id"].startswith("sha256:") assert len(data["from_commit_id"]) == len("sha256:") + 64 def test_to_commit_id_null_for_workdir_diff(self, repo: pathlib.Path) -> None: result = _diff(repo, "--json") data = json.loads(result.output) assert data["to_commit_id"] is None # ── Critical bug fix: deleted files must be in "deleted", not "modified" ── def test_deleted_file_in_deleted_list_not_modified(self, repo: pathlib.Path) -> None: """Regression test: files deleted from the working tree must appear in ``deleted``, not ``modified``. The plugin emits a ``patch`` op with all-delete child ops for file deletions; the JSON categorizer must recognise this.""" (repo / "b.py").write_text("b = 2\n") _commit(repo, "add b") (repo / "b.py").unlink() result = _diff(repo, "--json") data = json.loads(result.output) assert "b.py" in data["deleted"], f"b.py not in deleted: {data}" assert "b.py" not in data["modified"], f"b.py wrongly in modified: {data}" def test_added_file_in_added_list_not_modified(self, repo: pathlib.Path) -> None: """New files must appear in ``added``, not ``modified``.""" (repo / "new.py").write_text("n = 1\n") result = _diff(repo, "--json") data = json.loads(result.output) assert "new.py" in data["added"], f"new.py not in added: {data}" assert "new.py" not in data["modified"], f"new.py wrongly in modified: {data}" def test_modified_file_in_modified_list(self, repo: pathlib.Path) -> None: (repo / "a.py").write_text("x = 999\n") result = _diff(repo, "--json") data = json.loads(result.output) assert "a.py" in data["modified"] assert "a.py" not in data["added"] assert "a.py" not in data["deleted"] def test_combined_add_delete_modify(self, repo: pathlib.Path) -> None: """All three categories correct simultaneously.""" (repo / "b.py").write_text("b = 2\n") (repo / "c.py").write_text("c = 3\n") _commit(repo, "add b and c") (repo / "b.py").unlink() # deleted (repo / "c.py").write_text("c = 99\n") # modified (repo / "d.py").write_text("d = 4\n") # added result = _diff(repo, "--json") data = json.loads(result.output) assert "b.py" in data["deleted"] assert "c.py" in data["modified"] assert "d.py" in data["added"] def test_added_list_is_sorted(self, repo: pathlib.Path) -> None: for name in ["z.py", "a2.py", "m.py"]: (repo / name).write_text(f"x=1\n") result = _diff(repo, "--json") data = json.loads(result.output) assert data["added"] == sorted(data["added"]) def test_from_ref_is_head(self, repo: pathlib.Path) -> None: result = _diff(repo, "--json") data = json.loads(result.output) assert data["from_ref"] == "HEAD" def test_to_ref_is_working_tree(self, repo: pathlib.Path) -> None: result = _diff(repo, "--json") data = json.loads(result.output) assert data["to_ref"] == "working tree" def test_total_changes_matches_op_count(self, repo: pathlib.Path) -> None: (repo / "a.py").write_text("x = 50\n") (repo / "b.py").write_text("b = 1\n") result = _diff(repo, "--json") data = json.loads(result.output) total = len(data["added"]) + len(data["deleted"]) + len(data["modified"]) # total_changes counts plugin ops, not files; it can exceed the file count # if a file has multiple symbol ops, but it should be >= file count. assert data["total_changes"] >= total def test_two_commit_diff_has_commit_ids(self, repo: pathlib.Path) -> None: from muse.core.refs import get_head_commit_id cid1 = get_head_commit_id(repo, "main") (repo / "b.py").write_text("b = 1\n") _commit(repo, "second") cid2 = get_head_commit_id(repo, "main") result = _diff(repo, cid1 or "", cid2 or "", "--json") data = json.loads(result.output) assert data["from_commit_id"] == cid1 assert data["to_commit_id"] == cid2 # ────────────────────────────────────────────────────────────────────────────── # Integration — --exit-code # ────────────────────────────────────────────────────────────────────────────── class TestExitCode: def test_exit_code_0_on_clean_tree(self, repo: pathlib.Path) -> None: result = _diff(repo, "--exit-code") assert result.exit_code == 0 def test_exit_code_1_when_changes(self, repo: pathlib.Path) -> None: (repo / "a.py").write_text("x = 99\n") result = _diff(repo, "--exit-code") assert result.exit_code == 1 def test_exit_code_with_json_clean(self, repo: pathlib.Path) -> None: result = _diff(repo, "--exit-code", "--json") assert result.exit_code == 0 data = json.loads(result.output) assert data["has_changes"] is False def test_exit_code_with_json_dirty(self, repo: pathlib.Path) -> None: (repo / "a.py").write_text("x = 99\n") result = _diff(repo, "--exit-code", "--json") assert result.exit_code == 1 data = json.loads(result.output) assert data["has_changes"] is True def test_exit_code_with_stat_clean(self, repo: pathlib.Path) -> None: result = _diff(repo, "--exit-code", "--stat") assert result.exit_code == 0 def test_exit_code_with_stat_dirty(self, repo: pathlib.Path) -> None: (repo / "a.py").write_text("x = 99\n") result = _diff(repo, "--exit-code", "--stat") assert result.exit_code == 1 def test_exit_code_with_text_dirty(self, repo: pathlib.Path) -> None: (repo / "a.py").write_text("x = 99\n") result = _diff(repo, "--exit-code", "--text") assert result.exit_code == 1 def test_exit_code_with_text_clean(self, repo: pathlib.Path) -> None: result = _diff(repo, "--exit-code", "--text") assert result.exit_code == 0 # ────────────────────────────────────────────────────────────────────────────── # Integration — two-commit diff # ────────────────────────────────────────────────────────────────────────────── class TestTwoCommitDiff: def test_two_commits_exits_0(self, repo: pathlib.Path) -> None: from muse.core.refs import get_head_commit_id cid1 = get_head_commit_id(repo, "main") (repo / "b.py").write_text("b=1\n") _commit(repo, "second") cid2 = get_head_commit_id(repo, "main") result = _diff(repo, cid1 or "", cid2 or "") assert result.exit_code == 0 def test_two_identical_commits_no_differences(self, repo: pathlib.Path) -> None: from muse.core.refs import get_head_commit_id cid = get_head_commit_id(repo, "main") result = _diff(repo, cid or "", cid or "") assert "No differences" in result.output def test_invalid_commit_ref_exits_1(self, repo: pathlib.Path) -> None: result = _diff(repo, "deadbeefdeadbeef") assert result.exit_code == 1 # ────────────────────────────────────────────────────────────────────────────── # Integration — --stat # ────────────────────────────────────────────────────────────────────────────── class TestStat: def test_stat_clean_tree(self, repo: pathlib.Path) -> None: result = _diff(repo, "--stat") assert result.exit_code == 0 assert "No differences" in result.output def test_stat_shows_summary(self, repo: pathlib.Path) -> None: (repo / "a.py").write_text("x = 50\n") result = _diff(repo, "--stat") assert result.exit_code == 0 # Should contain a human-readable summary (not empty) assert result.output.strip() != "" assert "No differences" not in result.output # ────────────────────────────────────────────────────────────────────────────── # Integration — --text (unified diff) # ────────────────────────────────────────────────────────────────────────────── class TestTextDiff: def test_text_clean_tree(self, repo: pathlib.Path) -> None: result = _diff(repo, "--text") assert result.exit_code == 0 assert "No differences" in result.output def test_text_modified_file_shows_diff(self, repo: pathlib.Path) -> None: (repo / "a.py").write_text("x = 99\n") result = _diff(repo, "--text") assert "a.py" in result.output def test_text_added_file_shown(self, repo: pathlib.Path) -> None: (repo / "new.py").write_text("n = 1\n") result = _diff(repo, "--text") assert "new.py" in result.output def test_text_deleted_file_shown(self, repo: pathlib.Path) -> None: (repo / "b.py").write_text("b=1\n") _commit(repo, "add b") (repo / "b.py").unlink() result = _diff(repo, "--text") assert "b.py" in result.output # ────────────────────────────────────────────────────────────────────────────── # Integration — --path filter # ────────────────────────────────────────────────────────────────────────────── class TestPathFilter: def test_path_filter_limits_output(self, repo: pathlib.Path) -> None: (repo / "a.py").write_text("x = 99\n") (repo / "b.py").write_text("b = 1\n") result = _diff(repo, "--json", "-p", "a.py") data = json.loads(result.output) # Should show a.py changes, not b.py all_paths = data["added"] + data["deleted"] + data["modified"] assert all(p.startswith("a") for p in all_paths) def test_directory_prefix_filter(self, repo: pathlib.Path) -> None: (repo / "src").mkdir() (repo / "src" / "foo.py").write_text("f = 1\n") (repo / "other.py").write_text("o = 1\n") result = _diff(repo, "--json", "-p", "src") data = json.loads(result.output) all_paths = data["added"] + data["deleted"] + data["modified"] assert all(p.startswith("src") for p in all_paths) def test_path_filter_with_nonexistent_path_returns_clean( self, repo: pathlib.Path ) -> None: (repo / "a.py").write_text("x = 99\n") result = _diff(repo, "--json", "-p", "nonexistent.py") data = json.loads(result.output) assert data["has_changes"] is False # ────────────────────────────────────────────────────────────────────────────── # Integration — validation # ────────────────────────────────────────────────────────────────────────────── class TestDiffShelf: """muse diff --shelf shows the shelved changes vs HEAD.""" def test_shelf_flag_shows_shelved_changes(self, repo: pathlib.Path) -> None: (repo / "a.py").write_text("x = 999\n") _invoke(repo, ["shelf", "save", "-m", "test shelf"]) result = _diff(repo, "--shelf") assert result.exit_code == 0 assert "a.py" in result.output def test_shelf_flag_json_schema(self, repo: pathlib.Path) -> None: (repo / "a.py").write_text("x = 999\n") _invoke(repo, ["shelf", "save", "-m", "test shelf"]) result = _diff(repo, "--shelf", "--json") assert result.exit_code == 0 data = json.loads(result.output) assert "from_ref" in data assert "to_ref" in data assert data["from_ref"] == "HEAD" assert "shelf" in data["to_ref"] def test_shelf_flag_no_shelf_exits_1(self, repo: pathlib.Path) -> None: result = _diff(repo, "--shelf") assert result.exit_code == 1 def test_shelf_flag_with_index(self, repo: pathlib.Path) -> None: # Create two shelf entries, diff the second (index 1). (repo / "a.py").write_text("x = 10\n") _invoke(repo, ["shelf", "save", "-m", "first"]) (repo / "a.py").write_text("x = 20\n") _invoke(repo, ["shelf", "save", "-m", "second"]) # shelf/0 = second (newest), shelf/1 = first (oldest) result = _diff(repo, "--shelf", "1") assert result.exit_code == 0 def test_shelf_mutually_exclusive_with_staged(self, repo: pathlib.Path) -> None: result = _diff(repo, "--shelf", "--staged") assert result.exit_code == 1 def test_shelf_mutually_exclusive_with_unstaged(self, repo: pathlib.Path) -> None: result = _diff(repo, "--shelf", "--unstaged") assert result.exit_code == 1 class TestValidation: def test_staged_and_unstaged_mutually_exclusive(self, repo: pathlib.Path) -> None: result = _diff(repo, "--staged", "--unstaged") assert result.exit_code == 1 def test_unknown_flag_exits_nonzero(self, repo: pathlib.Path) -> None: result = _diff(repo, "--format", "xml") assert result.exit_code != 0 # ────────────────────────────────────────────────────────────────────────────── # Security — ANSI injection prevention # ────────────────────────────────────────────────────────────────────────────── class TestSecurityAnsi: """Text output must never emit raw ANSI sequences from user-controlled input.""" def _has_ansi(self, s: str) -> bool: return "\x1b[" in s or "\x1b]" in s def test_ansi_in_commit_ref_sanitized(self, repo: pathlib.Path) -> None: """An ANSI escape in a commit ref must not leak into terminal output.""" malicious = "\x1b[31mmalicious\x1b[0m" result = _diff(repo, malicious) assert not self._has_ansi(result.output) def test_ansi_in_path_filter_handled(self, repo: pathlib.Path) -> None: """An ANSI escape in --path must not leak into output.""" result = _diff(repo, "--json", "-p", "\x1b[31mmalicious\x1b[0m") assert not self._has_ansi(result.output) def test_text_diff_path_headers_sanitized(self, repo: pathlib.Path) -> None: """The a/path and b/path headers in unified diff must be sanitized.""" # We can't create files with ESC in names on most OS, so test via # the sanitize_display path indirectly by verifying clean output (repo / "normal.py").write_text("n = 1\n") result = _diff(repo, "--text") assert not self._has_ansi(result.output) # ────────────────────────────────────────────────────────────────────────────── # End-to-end — text output # ────────────────────────────────────────────────────────────────────────────── class TestTextOutput: def test_no_differences_on_clean_tree(self, repo: pathlib.Path) -> None: result = _diff(repo) assert "No differences" in result.output def test_summary_line_on_changes(self, repo: pathlib.Path) -> None: (repo / "a.py").write_text("x = 50\n") result = _diff(repo) # Summary line should appear after the file listing assert result.output.strip() != "" assert "No differences" not in result.output def test_deleted_file_shows_d_prefix(self, repo: pathlib.Path) -> None: (repo / "b.py").write_text("b=1\n") _commit(repo, "add b") (repo / "b.py").unlink() result = _diff(repo) assert "b.py" in result.output # Should show D (delete) status, not A or M assert "D" in result.output or "removed" in result.output.lower() # ────────────────────────────────────────────────────────────────────────────── # Stress — large repos # ────────────────────────────────────────────────────────────────────────────── @pytest.mark.slow class TestStressLargeRepo: def test_diff_500_files_10_changes_under_1s(self, repo: pathlib.Path) -> None: for i in range(500): (repo / f"f{i:04d}.py").write_text(f"x = {i}\n") _commit(repo, "base") for i in range(10): (repo / f"f{i:04d}.py").write_text(f"x = {i * 100}\n") t0 = time.perf_counter() result = _diff(repo, "--json") elapsed = (time.perf_counter() - t0) * 1000 assert result.exit_code == 0 data = json.loads(result.output) assert data["has_changes"] is True assert elapsed < 1000, f"diff took {elapsed:.0f}ms (limit 1000ms)" def test_diff_1000_added_files(self, repo: pathlib.Path) -> None: _commit(repo, "base") for i in range(1000): (repo / f"g{i:04d}.py").write_text(f"y = {i}\n") t0 = time.perf_counter() result = _diff(repo, "--json") elapsed = (time.perf_counter() - t0) * 1000 assert result.exit_code == 0 data = json.loads(result.output) assert data["has_changes"] is True assert elapsed < 3000, f"diff took {elapsed:.0f}ms (limit 3000ms)" def test_diff_with_100_deleted_files_correct_categorization( self, repo: pathlib.Path ) -> None: for i in range(100): (repo / f"h{i:04d}.py").write_text(f"h = {i}\n") _commit(repo, "base with 100 files") for i in range(100): (repo / f"h{i:04d}.py").unlink() result = _diff(repo, "--json") data = json.loads(result.output) # All 100 must be in deleted, not modified assert len(data["deleted"]) == 100 assert len(data["modified"]) == 0 @pytest.mark.slow class TestStressConcurrent: def test_concurrent_diffs_to_separate_repos(self, tmp_path: pathlib.Path) -> None: errors: list[str] = [] def do_diff(idx: int) -> None: repo_dir = tmp_path / f"repo_{idx}" repo_dir.mkdir() subprocess.run( ["muse", "init"], cwd=str(repo_dir), capture_output=True ) (repo_dir / "x.py").write_text(f"x = {idx}\n") subprocess.run( ["muse", "commit", "-m", "base"], cwd=str(repo_dir), capture_output=True, ) (repo_dir / "x.py").write_text(f"x = {idx + 100}\n") r = subprocess.run( ["muse", "diff", "--json"], cwd=str(repo_dir), capture_output=True, text=True, ) if r.returncode != 0: errors.append(f"repo_{idx}: diff failed") return data = json.loads(r.stdout) if not data.get("has_changes"): errors.append(f"repo_{idx}: expected has_changes=true") threads = [threading.Thread(target=do_diff, args=(i,)) for i in range(8)] for t in threads: t.start() for t in threads: t.join() # ────────────────────────────────────────────────────────────────────────────── # TestDiffConflict — muse diff --conflict (Cohen Transform labeled diff) # ────────────────────────────────────────────────────────────────────────────── def _make_conflict_repo(tmp_path: pathlib.Path) -> pathlib.Path: """Return a repo on *main* with an in-progress conflicting checkout -m. The repo has: - ``shared.py`` on main: line1 / line2 / line3 - branch ``other``: line1 / LINE2 / line3 (other changed line2) - dirty workdir on main: line1 / OURS2 / line3 (ours changed line2) Running ``checkout -m other`` produces a conflict on shared.py and writes MERGE_STATE.json. The caller receives the repo in that conflicted state. """ saved = os.getcwd() try: os.chdir(tmp_path) runner.invoke(None, ["init"]) finally: os.chdir(saved) (tmp_path / "shared.py").write_text("line1\nline2\nline3\n") _commit(tmp_path, "initial") _invoke(tmp_path, ["branch", "other"]) _invoke(tmp_path, ["checkout", "other"]) (tmp_path / "shared.py").write_text("line1\nLINE2\nline3\n") _commit(tmp_path, "other changes line2") _invoke(tmp_path, ["checkout", "main"]) (tmp_path / "shared.py").write_text("line1\nOURS2\nline3\n") # Trigger the conflicting checkout -m to create MERGE_STATE _invoke(tmp_path, ["checkout", "-m", "other"]) return tmp_path class TestDiffConflictParser: """Parser-level tests for the ``--conflict`` flag.""" def _parse(self, *args: str) -> "argparse.Namespace": import argparse from muse.cli.commands.diff import register p = argparse.ArgumentParser() sub = p.add_subparsers() register(sub) return p.parse_args(["diff", *args]) def test_conflict_flag_parsed(self) -> None: ns = self._parse("--conflict") assert ns.conflict is True def test_conflict_false_by_default(self) -> None: ns = self._parse() assert ns.conflict is False def test_conflict_and_json_coexist(self) -> None: ns = self._parse("--conflict", "--json") assert ns.conflict is True assert ns.json_out is True def test_conflict_and_path_coexist(self) -> None: ns = self._parse("--conflict", "--path", "src/") assert ns.conflict is True assert "src/" in ns.paths class TestDiffConflictNoMerge: """--conflict when no merge is in progress must error cleanly.""" def test_no_merge_in_progress_exits_1(self, repo: pathlib.Path) -> None: r = _diff(repo, "--conflict") assert r.exit_code == 1 def test_no_merge_error_message_on_stderr(self, repo: pathlib.Path) -> None: r = _diff(repo, "--conflict") assert "MERGE_STATE" in r.stderr or "merge" in r.stderr.lower() def test_no_merge_json_also_exits_1(self, repo: pathlib.Path) -> None: r = _diff(repo, "--conflict", "--json") assert r.exit_code == 1 class TestDiffConflictOutput: """--conflict with an active merge in progress.""" def test_exits_nonzero_when_conflicts_exist(self, tmp_path: pathlib.Path) -> None: repo = _make_conflict_repo(tmp_path) r = _diff(repo, "--conflict") assert r.exit_code != 0 def test_output_mentions_conflict_file(self, tmp_path: pathlib.Path) -> None: repo = _make_conflict_repo(tmp_path) r = _diff(repo, "--conflict") assert "shared.py" in r.output def test_output_contains_ours_side(self, tmp_path: pathlib.Path) -> None: repo = _make_conflict_repo(tmp_path) r = _diff(repo, "--conflict") assert "[ours]" in r.output or "ours" in r.output.lower() def test_output_contains_theirs_side(self, tmp_path: pathlib.Path) -> None: repo = _make_conflict_repo(tmp_path) r = _diff(repo, "--conflict") assert "[theirs]" in r.output or "theirs" in r.output.lower() def test_output_contains_cohen_action_labels(self, tmp_path: pathlib.Path) -> None: """Cohen-style hunk labels (e.g. [branchname: modified]) must appear in @@-headers.""" repo = _make_conflict_repo(tmp_path) r = _diff(repo, "--conflict") combined = r.output + r.stderr # annotate_hunk_action produces [side_label: action] suffixes on @@ headers assert any( suffix in combined for suffix in (": modified]", ": inserted]", ": deleted]") ) def test_json_status_is_conflict(self, tmp_path: pathlib.Path) -> None: repo = _make_conflict_repo(tmp_path) r = _diff(repo, "--conflict", "--json") data = json.loads(r.output) assert data["status"] == "conflict" def test_json_conflicts_list_non_empty(self, tmp_path: pathlib.Path) -> None: repo = _make_conflict_repo(tmp_path) r = _diff(repo, "--conflict", "--json") data = json.loads(r.output) assert len(data["conflicts"]) >= 1 def test_json_conflict_entry_has_path_and_diffs(self, tmp_path: pathlib.Path) -> None: repo = _make_conflict_repo(tmp_path) r = _diff(repo, "--conflict", "--json") data = json.loads(r.output) entry = data["conflicts"][0] assert "path" in entry assert "ours_diff" in entry assert "theirs_diff" in entry def test_json_labels_match_branches(self, tmp_path: pathlib.Path) -> None: repo = _make_conflict_repo(tmp_path) r = _diff(repo, "--conflict", "--json") data = json.loads(r.output) assert data["ours_label"] in ("other", "main") # one of the branch names assert data["theirs_label"] in ("other", "main") def test_path_filter_limits_output(self, tmp_path: pathlib.Path) -> None: """``--path`` with a non-matching prefix must produce empty conflicts.""" repo = _make_conflict_repo(tmp_path) r = _diff(repo, "--conflict", "--json", "--path", "nonexistent/") data = json.loads(r.output) assert len(data["conflicts"]) == 0 def test_path_filter_matching_includes_file(self, tmp_path: pathlib.Path) -> None: """``--path shared.py`` must include the conflicting file.""" repo = _make_conflict_repo(tmp_path) r = _diff(repo, "--conflict", "--json", "--path", "shared.py") data = json.loads(r.output) assert any(e["path"] == "shared.py" for e in data["conflicts"]) class TestDiffConflictSecurity: """Security: ANSI injection via branch names / paths must be sanitized.""" def test_ansi_in_conflict_path_sanitized(self, tmp_path: pathlib.Path) -> None: """ANSI escape sequences in a conflict file path must not reach the output. The CliRunner strips ANSI from all output, so the raw escape code must not appear in the combined output string. """ repo = _make_conflict_repo(tmp_path) r = _diff(repo, "--conflict") # CliRunner already strips ANSI; double-check no raw escape CSI leaks through assert "\x1b[" not in r.output # ────────────────────────────────────────────────────────────────────────────── # Unit — PatchOp.file_change field # ────────────────────────────────────────────────────────────────────────────── class TestPatchOpFileChangeField: """PatchOp gains a file_change field: 'added' | 'deleted' | 'modified'. This field is set by build_diff_ops based on which path bucket the file belongs to — not inferred from child op direction after the fact. """ def _make_patch(self, file_change: str | None = None) -> "PatchOp": from muse.domain import PatchOp kwargs = dict( op="patch", address="file.py", child_ops=[], child_domain="code", child_summary="", ) if file_change is not None: kwargs["file_change"] = file_change return PatchOp(**kwargs) def test_patch_op_accepts_file_change_added(self) -> None: op = self._make_patch("added") assert op["file_change"] == "added" def test_patch_op_accepts_file_change_deleted(self) -> None: op = self._make_patch("deleted") assert op["file_change"] == "deleted" def test_patch_op_accepts_file_change_modified(self) -> None: op = self._make_patch("modified") assert op["file_change"] == "modified" def test_patch_op_file_change_is_optional(self) -> None: """Existing call sites without file_change must still work.""" op = self._make_patch() assert "file_change" not in op # ────────────────────────────────────────────────────────────────────────────── # Unit — build_diff_ops sets file_change from path bucket # ────────────────────────────────────────────────────────────────────────────── class TestBuildDiffOpsFileChange: """build_diff_ops sets PatchOp.file_change from the path bucket (added / removed / modified), never from child op direction. """ def _trees_with_symbols(self, path: str, names: list[str]) -> Mapping[str, object]: """Minimal SymbolTree with one entry per name.""" return { f"{path}::{name}": { "name": name, "kind": "function", "qualified_name": f"{path}::{name}", "lineno": 1, "end_lineno": 2, "content_id": f"c_{name}", "body_hash": f"b_{name}", "signature_id": f"s_{name}", } for name in names } def test_added_file_patch_has_file_change_added(self) -> None: from muse.plugins.code.symbol_diff import build_diff_ops base_files = {} target_files = {"new.py": "oid1"} base_trees = {} target_trees = {"new.py": self._trees_with_symbols("new.py", ["alpha", "beta"])} ops = build_diff_ops(base_files, target_files, base_trees, target_trees) patch_ops = [o for o in ops if o["op"] == "patch"] assert len(patch_ops) == 1 assert patch_ops[0]["file_change"] == "added" def test_removed_file_patch_has_file_change_deleted(self) -> None: from muse.plugins.code.symbol_diff import build_diff_ops base_files = {"old.py": "oid1"} target_files = {} base_trees = {"old.py": self._trees_with_symbols("old.py", ["alpha", "beta"])} target_trees = {} ops = build_diff_ops(base_files, target_files, base_trees, target_trees) patch_ops = [o for o in ops if o["op"] == "patch"] assert len(patch_ops) == 1 assert patch_ops[0]["file_change"] == "deleted" def test_modified_file_patch_has_file_change_modified(self) -> None: from muse.plugins.code.symbol_diff import build_diff_ops base_files = {"mod.py": "oid1"} target_files = {"mod.py": "oid2"} base_trees = {"mod.py": self._trees_with_symbols("mod.py", ["alpha"])} target_trees = {"mod.py": self._trees_with_symbols("mod.py", ["alpha", "beta"])} ops = build_diff_ops(base_files, target_files, base_trees, target_trees) patch_ops = [o for o in ops if o["op"] == "patch"] assert len(patch_ops) == 1 assert patch_ops[0]["file_change"] == "modified" def test_modified_file_all_symbol_deletions_still_file_change_modified(self) -> None: """The critical case: a living file that lost all its symbols must carry file_change='modified', not 'deleted'. Child op direction must NOT determine file status.""" from muse.plugins.code.symbol_diff import build_diff_ops base_files = {"shrunk.py": "oid1"} target_files = {"shrunk.py": "oid2"} # file still exists base_trees = {"shrunk.py": self._trees_with_symbols("shrunk.py", ["alpha", "beta", "gamma"])} target_trees = {"shrunk.py": {}} # all symbols gone, but file lives ops = build_diff_ops(base_files, target_files, base_trees, target_trees) patch_ops = [o for o in ops if o["op"] == "patch"] assert len(patch_ops) == 1, f"Expected 1 PatchOp, got: {ops}" assert patch_ops[0]["file_change"] == "modified", ( f"Living file with all-delete children must be 'modified', " f"got {patch_ops[0].get('file_change')!r}" ) def test_modified_file_all_symbol_additions_still_file_change_modified(self) -> None: """A living file that gained all new symbols is 'modified', not 'added'.""" from muse.plugins.code.symbol_diff import build_diff_ops base_files = {"grew.py": "oid1"} target_files = {"grew.py": "oid2"} base_trees = {"grew.py": {}} # was empty (no symbols) target_trees = {"grew.py": self._trees_with_symbols("grew.py", ["alpha", "beta"])} ops = build_diff_ops(base_files, target_files, base_trees, target_trees) patch_ops = [o for o in ops if o["op"] == "patch"] assert len(patch_ops) == 1 assert patch_ops[0]["file_change"] == "modified", ( f"Living file with all-insert children must be 'modified', " f"got {patch_ops[0].get('file_change')!r}" ) # ────────────────────────────────────────────────────────────────────────────── # Integration — file-level sigil correctness (the AX bug fix) # ────────────────────────────────────────────────────────────────────────────── class TestFileSignilCorrectnessTextOutput: """Text output: the file-level sigil (A/D/M/R) must reflect whether the file was added, deleted, or modified — never inferred from child op counts. Historically, a modified file that lost all its functions showed 'D' in the diff output (misread as 'file deleted' by agents). After the fix, that file must show 'M'. """ def test_modified_file_losing_all_symbols_shows_M_sigil( self, repo: pathlib.Path ) -> None: """Living file that lost all named functions → M, not D.""" (repo / "funcs.py").write_text( "def alpha():\n return 1\n\ndef beta():\n return 2\n" ) _commit(repo, "add funcs") # Overwrite with content that has no recognised symbols. (repo / "funcs.py").write_text("# no functions here\nPLACEHOLDER = True\n") result = _diff(repo) lines = result.output.splitlines() file_line = next( (l for l in lines if "funcs.py" in l and not l.strip().startswith("├─") and not l.strip().startswith("└─")), None, ) assert file_line is not None, f"funcs.py not in diff:\n{result.output}" assert file_line.strip().startswith("M"), ( f"Expected 'M funcs.py' (modified), got: {file_line!r}\n" f"Full output:\n{result.output}" ) def test_modified_file_gaining_all_symbols_shows_M_sigil( self, repo: pathlib.Path ) -> None: """Living file that gained functions → M, not A.""" # Start with a file that has no functions (repo / "empty.py").write_text("PLACEHOLDER = True\n") _commit(repo, "add empty.py") # Add functions to it (repo / "empty.py").write_text( "def alpha():\n return 1\n\ndef beta():\n return 2\n" ) result = _diff(repo) lines = result.output.splitlines() file_line = next( (l for l in lines if "empty.py" in l and not l.strip().startswith("├─") and not l.strip().startswith("└─")), None, ) assert file_line is not None, f"empty.py not in diff:\n{result.output}" assert file_line.strip().startswith("M"), ( f"Expected 'M empty.py' (modified), got: {file_line!r}\n" f"Full output:\n{result.output}" ) def test_actually_deleted_file_still_shows_D_sigil( self, repo: pathlib.Path ) -> None: """Sanity check: a file that is truly gone still shows D.""" (repo / "gone.py").write_text("def foo(): pass\n") _commit(repo, "add gone") (repo / "gone.py").unlink() result = _diff(repo) lines = result.output.splitlines() file_line = next( (l for l in lines if "gone.py" in l and not l.strip().startswith("├─") and not l.strip().startswith("└─")), None, ) assert file_line is not None assert file_line.strip().startswith("D"), ( f"Expected 'D gone.py' (deleted), got: {file_line!r}" ) def test_newly_added_file_still_shows_A_sigil( self, repo: pathlib.Path ) -> None: """Sanity check: a brand-new file still shows A.""" (repo / "brand_new.py").write_text("def foo(): pass\n") result = _diff(repo) lines = result.output.splitlines() file_line = next( (l for l in lines if "brand_new.py" in l and not l.strip().startswith("├─") and not l.strip().startswith("└─")), None, ) assert file_line is not None assert file_line.strip().startswith("A"), ( f"Expected 'A brand_new.py' (added), got: {file_line!r}" ) class TestFileSignilCorrectnessJsonOutput: """JSON output must categorize files by actual existence, not child op direction.""" def test_modified_file_losing_all_symbols_in_modified_not_deleted( self, repo: pathlib.Path ) -> None: (repo / "funcs.py").write_text( "def alpha():\n return 1\n\ndef beta():\n return 2\n" ) _commit(repo, "add funcs") (repo / "funcs.py").write_text("# no functions here\nPLACEHOLDER = True\n") result = _diff(repo, "--json") data = json.loads(result.output) assert "funcs.py" in data["modified"], ( f"funcs.py must be in modified, got: {data}" ) assert "funcs.py" not in data["deleted"], ( f"funcs.py must NOT be in deleted (file still exists), got: {data}" ) def test_modified_file_gaining_all_symbols_in_modified_not_added( self, repo: pathlib.Path ) -> None: (repo / "empty.py").write_text("PLACEHOLDER = True\n") _commit(repo, "add empty.py") (repo / "empty.py").write_text( "def alpha():\n return 1\n\ndef beta():\n return 2\n" ) result = _diff(repo, "--json") data = json.loads(result.output) assert "empty.py" in data["modified"], ( f"empty.py must be in modified, got: {data}" ) assert "empty.py" not in data["added"], ( f"empty.py must NOT be in added (file pre-existed), got: {data}" ) # ────────────────────────────────────────────────────────────────────────────── # Dead-code removal — _classify_patch_op and _op_category must be deleted # ────────────────────────────────────────────────────────────────────────────── class TestInferenceHelpersRemoved: """_classify_patch_op and _op_category exist only to infer file status from child op counts. Once PatchOp.file_change carries authoritative status, both helpers are dead code and must be deleted. """ def test_classify_patch_op_deleted(self) -> None: import muse.cli.commands.diff as m assert not hasattr(m, "_classify_patch_op"), ( "_classify_patch_op is dead code — file status is now read from " "PatchOp.file_change, not inferred from child ops" ) def test_op_category_deleted(self) -> None: import muse.cli.commands.diff as m assert not hasattr(m, "_op_category"), ( "_op_category is dead code — it existed only to wrap _classify_patch_op" )