"""Tests for ``muse coord plan-merge``. Coverage matrix --------------- Unit ~~~~ * :func:`_classify_change` — all six change classifications * :func:`_classify_conflict` — all three-way matrix cells (base/ours/theirs combinations: 8 cases + rename, symbol_edit_overlap) * :func:`_find_renames_and_moves` — rename vs move discrimination, trivial body skipped * :func:`_find_delete_use_conflicts` — new callers detected, call graph errors handled * :func:`_find_dependency_conflicts` — transitive dependency detection, errors handled * :class:`_MergeItem` — to_dict, slots Integration (mock-based — no real commits required) ~~~~~~~~~~~ * ``plan-merge OURS THEIRS`` — ref not found → exit 1 * ``plan-merge OURS THEIRS`` — theirs ref not found → exit 1 * ``plan-merge OURS THEIRS --base MISSING`` — base ref not found → exit 1 * ``plan-merge OURS THEIRS`` — base auto-computed * ``plan-merge OURS THEIRS --base BASE_REF`` * ``plan-merge OURS THEIRS --skip-call-graph`` * ``plan-merge OURS THEIRS --format json`` — schema complete * ``plan-merge OURS THEIRS --json`` — shorthand * JSON: ``base_auto_computed``, ``call_graph_available``, ``warnings``, ``duration_ms``, full commit IDs, ``conflicts_by_type`` breakdown * JSON output is compact (no indent — single line) * Text output: conflict summary, warnings visible, elapsed present * ``symbol_edit_overlap`` detected (both changed, content differs) * ``rename_edit`` detected via body_hash matching * ``move_edit`` detected via body_hash + file prefix mismatch * ``delete_use`` detected via forward call graph * ``dependency_conflict`` detected via reverse call graph * ``no_conflict`` for unilateral changes (three-way correctness) * call_graph unavailable → warning, not crash * unexpected call graph exception propagates Error shapes ~~~~~~~~~~~~ * JSON error has ``{"error": ..., "status": "error"}`` * Text error uses ``❌`` prefix on stderr * theirs-ref not-found JSON error shape * base-ref not-found JSON error shape Security ~~~~~~~~ * ANSI sequences in ref names, addresses, change descriptions stripped * Path traversal in ref args → resolve_commit_ref handles it (no FS access) Stress ~~~~~~ * 1000 symbols across ours + theirs → Pass 1 in < 2 s * 200 renames → Pass 2 in < 1 s * delete_use with 100 deleted symbols → < 2 s * conflicts_by_type counts correct with mixed conflict types at scale E2E ~~~ * Same commit for ours and theirs → 0 conflicts * Three distinct conflict types in one plan → all appear in conflicts_by_type """ from __future__ import annotations import argparse import json import os import pathlib import sys import time from unittest.mock import MagicMock, patch import pytest from muse.core.types import Manifest, fake_id from muse.core.paths import muse_dir from muse.plugins.code.ast_parser import SymbolRecord, SymbolTree type SymbolMap = dict[str, SymbolRecord] type SymbolsByFile = dict[str, SymbolMap] from muse.plugins.code._callgraph import ForwardGraph # ── Helpers ─────────────────────────────────────────────────────────────────── def _sym( name: str, content_id: str | None = None, body_hash: str | None = None, signature_id: str | None = None, metadata_id: str = "", ) -> SymbolRecord: """Build a minimal SymbolRecord for testing.""" h = content_id or f"cid-{name}" b = body_hash or f"bh-{name}" s = signature_id or f"sid-{name}" return { "kind": "function", "name": name, "qualified_name": name, "content_id": h, "body_hash": b, "signature_id": s, "metadata_id": metadata_id, "canonical_key": f"f.py##{name}", } def _make_repo(tmp_path: pathlib.Path) -> pathlib.Path: dot_muse = muse_dir(tmp_path) dot_muse.mkdir() (dot_muse / "HEAD").write_text("ref: refs/heads/main\n") (dot_muse / "repo.json").write_text( json.dumps({"repo_id": fake_id("repo"), "name": "test-repo"}) ) return tmp_path @pytest.fixture() def repo(tmp_path: pathlib.Path) -> pathlib.Path: return _make_repo(tmp_path) def _mock_commit(cid: str | None = None) -> MagicMock: m = MagicMock() m.commit_id = cid or fake_id("commit") return m def _run_plan_merge( repo: pathlib.Path, ours_ref: str = "HEAD", theirs_ref: str = "main", base_ref: str | None = None, skip_call_graph: bool = True, fmt: str = "json", mock_ours_commit: MagicMock | None = None, mock_theirs_commit: MagicMock | None = None, mock_base_cid: str | None = None, mock_ours_syms: SymbolMap | None = None, mock_theirs_syms: SymbolMap | None = None, mock_base_syms: SymbolMap | None = None, ) -> tuple[int, str]: """Run plan-merge with mocked commits/symbols. Returns (exit_code, stdout).""" from muse.cli.commands.plan_merge import run as pm_run ours_c = mock_ours_commit or _mock_commit(f"aaa{'0' * 61}") theirs_c = mock_theirs_commit or _mock_commit(f"bbb{'0' * 61}") base_cid = mock_base_cid or f"ccc{'0' * 61}" ours_syms = mock_ours_syms or {} theirs_syms = mock_theirs_syms or {} base_syms = mock_base_syms or {} def _sym_for_snapshot(root: pathlib.Path, manifest: Manifest, **kwargs: str) -> SymbolsByFile: if manifest.get("_branch") == "ours": return {"f.py": ours_syms} if manifest.get("_branch") == "theirs": return {"f.py": theirs_syms} if manifest.get("_branch") == "base": return {"f.py": base_syms} return {} ns = argparse.Namespace( ours_ref=ours_ref, theirs_ref=theirs_ref, base_ref=base_ref, skip_call_graph=skip_call_graph, fmt=fmt, json_out=(fmt == "json"), ) old = os.getcwd() os.chdir(repo) import io, sys captured = io.StringIO() old_stdout = sys.stdout sys.stdout = captured exit_code = 0 try: with ( patch("muse.cli.commands.plan_merge.require_repo", return_value=repo), patch("muse.cli.commands.plan_merge.read_current_branch", return_value="main"), patch("muse.cli.commands.plan_merge.resolve_commit_ref", side_effect=lambda root, branch, ref: ( ours_c if ref in (ours_ref, None) else theirs_c if ref == theirs_ref else _mock_commit(mock_base_cid) if ref == base_ref else None )), patch("muse.cli.commands.plan_merge.find_merge_base", return_value=base_cid), patch("muse.cli.commands.plan_merge.get_commit_snapshot_manifest", side_effect=lambda root, cid: ( {"_branch": "ours"} if cid == ours_c.commit_id else {"_branch": "theirs"} if cid == theirs_c.commit_id else {"_branch": "base"} )), patch("muse.cli.commands.plan_merge.symbols_for_snapshot", side_effect=_sym_for_snapshot), ): pm_run(ns) except SystemExit as exc: exit_code = exc.code or 0 finally: sys.stdout = old_stdout os.chdir(old) return exit_code, captured.getvalue() # ───────────────────────────────────────────────────────────────────────────── # Unit tests — _classify_change # ───────────────────────────────────────────────────────────────────────────── class TestClassifyChange: def test_unchanged(self) -> None: from muse.cli.commands.plan_merge import _classify_change s = _sym("fn", content_id="X") assert _classify_change(s, s) == "unchanged" def test_metadata_only(self) -> None: from muse.cli.commands.plan_merge import _classify_change base = _sym("fn", content_id="X", body_hash="BH", signature_id="SIG") target = _sym("fn", content_id="Y", body_hash="BH", signature_id="SIG") assert _classify_change(base, target) == "metadata_only" def test_signature_only(self) -> None: from muse.cli.commands.plan_merge import _classify_change base = _sym("fn", body_hash="BH", signature_id="SIG1") target = _sym("fn", content_id="X2", body_hash="BH", signature_id="SIG2") assert _classify_change(base, target) == "signature_only" def test_impl_only(self) -> None: from muse.cli.commands.plan_merge import _classify_change base = _sym("fn", body_hash="BH1", signature_id="SIG") target = _sym("fn", content_id="X2", body_hash="BH2", signature_id="SIG") assert _classify_change(base, target) == "impl_only" def test_rename_modify(self) -> None: from muse.cli.commands.plan_merge import _classify_change base = {**_sym("fn_old"), "name": "fn_old"} target = {**_sym("fn_new", content_id="X2", body_hash="BH2"), "name": "fn_new"} assert _classify_change(base, target) == "rename+modify" def test_full_rewrite(self) -> None: from muse.cli.commands.plan_merge import _classify_change base = _sym("fn") target = _sym("fn", content_id="X2", body_hash="BH2", signature_id="SIG2") assert _classify_change(base, target) == "full_rewrite" # ───────────────────────────────────────────────────────────────────────────── # Unit tests — _classify_conflict (three-way matrix) # ───────────────────────────────────────────────────────────────────────────── class TestClassifyConflict: def test_both_absent_no_conflict(self) -> None: from muse.cli.commands.plan_merge import _classify_conflict item = _classify_conflict("x.py::fn", None, None, None) assert item.conflict_type == "no_conflict" def test_ours_new_no_base_no_conflict(self) -> None: from muse.cli.commands.plan_merge import _classify_conflict item = _classify_conflict("x.py::fn", None, _sym("fn"), None) assert item.conflict_type == "no_conflict" assert item.ours_change == "added" def test_theirs_new_no_base_no_conflict(self) -> None: from muse.cli.commands.plan_merge import _classify_conflict item = _classify_conflict("x.py::fn", None, None, _sym("fn")) assert item.conflict_type == "no_conflict" assert item.theirs_change == "added" def test_only_ours_changed_three_way(self) -> None: """base=X, ours=Y, theirs=X → only ours changed → no_conflict.""" from muse.cli.commands.plan_merge import _classify_conflict base = _sym("fn", content_id="X", body_hash="BH1", signature_id="SIG") ours = _sym("fn", content_id="Y", body_hash="BH2", signature_id="SIG") theirs = _sym("fn", content_id="X", body_hash="BH1", signature_id="SIG") item = _classify_conflict("x.py::fn", base, ours, theirs) assert item.conflict_type == "no_conflict" assert "fast-forward" in item.recommendation def test_only_theirs_changed_three_way(self) -> None: """base=X, ours=X, theirs=Y → only theirs changed → no_conflict.""" from muse.cli.commands.plan_merge import _classify_conflict base = _sym("fn", content_id="X", body_hash="BH1", signature_id="SIG") ours = _sym("fn", content_id="X", body_hash="BH1", signature_id="SIG") theirs = _sym("fn", content_id="Y", body_hash="BH2", signature_id="SIG") item = _classify_conflict("x.py::fn", base, ours, theirs) assert item.conflict_type == "no_conflict" def test_both_changed_three_way_overlap(self) -> None: """base=X, ours=Y, theirs=Z → both changed → symbol_edit_overlap.""" from muse.cli.commands.plan_merge import _classify_conflict base = _sym("fn", content_id="X", body_hash="BH0", signature_id="SIG0") ours = _sym("fn", content_id="Y", body_hash="BH1", signature_id="SIG0") theirs = _sym("fn", content_id="Z", body_hash="BH2", signature_id="SIG0") item = _classify_conflict("x.py::fn", base, ours, theirs) assert item.conflict_type == "symbol_edit_overlap" def test_identical_on_both_no_conflict(self) -> None: from muse.cli.commands.plan_merge import _classify_conflict s = _sym("fn", content_id="SAME") item = _classify_conflict("x.py::fn", _sym("fn"), s, s) assert item.conflict_type == "no_conflict" assert "identical" in item.recommendation def test_ours_deleted_theirs_unchanged_no_conflict(self) -> None: """base=X, ours=None, theirs=X (unchanged) → no_conflict.""" from muse.cli.commands.plan_merge import _classify_conflict base = _sym("fn", content_id="X", body_hash="BH", signature_id="SIG") theirs = _sym("fn", content_id="X", body_hash="BH", signature_id="SIG") item = _classify_conflict("x.py::fn", base, None, theirs) assert item.conflict_type == "no_conflict" def test_ours_deleted_theirs_modified_review(self) -> None: """base=X, ours=None, theirs=Y → potentially delete_use → review.""" from muse.cli.commands.plan_merge import _classify_conflict base = _sym("fn", content_id="X", body_hash="BH", signature_id="SIG") theirs = _sym("fn", content_id="Y", body_hash="BH2", signature_id="SIG") item = _classify_conflict("x.py::fn", base, None, theirs) # delete_use is detected in Pass 3; this is a "review" no_conflict. assert item.conflict_type == "no_conflict" assert "review" in item.recommendation def test_rename_edit_detected_ours_renamed(self) -> None: """Ours renamed (same body, different name) + theirs modified → rename_edit.""" from muse.cli.commands.plan_merge import _classify_conflict base_rec = {**_sym("fn_old", body_hash="BH", signature_id="SIG"), "name": "fn_old"} # Ours: same body hash, but name changed (signature_only = rename) ours_rec = { **_sym("fn_new", content_id="C2", body_hash="BH", signature_id="SIG2"), "name": "fn_new", } theirs_rec = { **_sym("fn_old", content_id="C3", body_hash="BH3", signature_id="SIG"), "name": "fn_old", } item = _classify_conflict("x.py::fn_old", base_rec, ours_rec, theirs_rec) assert item.conflict_type == "rename_edit" def test_both_added_same_content_no_conflict(self) -> None: """No base, both added same symbol → no_conflict.""" from muse.cli.commands.plan_merge import _classify_conflict s = _sym("fn", content_id="SAME") item = _classify_conflict("x.py::fn", None, s, s) assert item.conflict_type == "no_conflict" def test_both_added_different_content_overlap(self) -> None: """No base, both added different content → symbol_edit_overlap.""" from muse.cli.commands.plan_merge import _classify_conflict item = _classify_conflict( "x.py::fn", None, _sym("fn", content_id="X"), _sym("fn", content_id="Y"), ) assert item.conflict_type == "symbol_edit_overlap" def test_merge_item_slots_and_to_dict(self) -> None: from muse.cli.commands.plan_merge import _MergeItem m = _MergeItem("addr", "no_conflict", "a", "b", "rec") d = m.to_dict() assert set(d.keys()) == {"address", "conflict_type", "ours_change", "theirs_change", "recommendation"} assert d["conflict_type"] == "no_conflict" # ───────────────────────────────────────────────────────────────────────────── # Unit tests — _find_renames_and_moves # ───────────────────────────────────────────────────────────────────────────── class TestFindRenamesAndMoves: def test_rename_detected_same_file(self) -> None: from muse.cli.commands.plan_merge import _find_renames_and_moves base = {"file.py::fn_old": _sym("fn_old", body_hash="BODYHASH123456")} branch = {"file.py::fn_new": {**_sym("fn_new", body_hash="BODYHASH123456"), "name": "fn_new"}} renames, moves = _find_renames_and_moves(base, branch) assert "file.py::fn_old" in renames assert renames["file.py::fn_old"] == "file.py::fn_new" assert not moves def test_move_detected_different_file(self) -> None: from muse.cli.commands.plan_merge import _find_renames_and_moves base = {"old/file.py::fn": _sym("fn", body_hash="BODYHASH123456")} branch = {"new/file.py::fn": _sym("fn", body_hash="BODYHASH123456")} renames, moves = _find_renames_and_moves(base, branch) assert not renames assert "old/file.py::fn" in moves def test_same_file_preferred_over_other_file(self) -> None: from muse.cli.commands.plan_merge import _find_renames_and_moves base = {"file.py::fn_old": _sym("fn_old", body_hash="BODYHASH123456")} branch = { "file.py::fn_new": _sym("fn_new", body_hash="BODYHASH123456"), "other.py::fn_copy": _sym("fn_copy", body_hash="BODYHASH123456"), } renames, moves = _find_renames_and_moves(base, branch) # Same file candidate should win → rename, not move. assert "file.py::fn_old" in renames assert "file.py::fn_old" not in moves def test_trivial_body_hash_skipped(self) -> None: """Very short body hashes are skipped to avoid false positives.""" from muse.cli.commands.plan_merge import _find_renames_and_moves base = {"file.py::fn": _sym("fn", body_hash="X")} # Too short. branch = {"file.py::fn_new": _sym("fn_new", body_hash="X")} renames, moves = _find_renames_and_moves(base, branch) assert not renames assert not moves def test_still_present_not_a_rename(self) -> None: from muse.cli.commands.plan_merge import _find_renames_and_moves sym = _sym("fn", body_hash="BODY1234567890") base = {"file.py::fn": sym} branch = {"file.py::fn": sym, "file.py::fn2": _sym("fn2", body_hash="BODY1234567890")} renames, moves = _find_renames_and_moves(base, branch) # Original still present → not a rename. assert "file.py::fn" not in renames # ───────────────────────────────────────────────────────────────────────────── # Unit tests — _find_delete_use_conflicts # ───────────────────────────────────────────────────────────────────────────── class TestFindDeleteUseConflicts: def _make_manifests(self) -> tuple[Manifest, Manifest, Manifest]: return {"base": "m"}, {"ours": "m"}, {"theirs": "m"} def test_delete_use_detected(self, repo: pathlib.Path) -> None: from muse.cli.commands.plan_merge import _find_delete_use_conflicts base_syms = {"src/api.py::fn": _sym("fn")} ours_syms = {} # deleted on ours theirs_syms = {"src/api.py::fn": _sym("fn")} # still in theirs # Theirs has a new caller of fn that wasn't in base. base_fg = {"caller_base.py::existing": frozenset()} ours_fg = {} theirs_fg = { "caller_new.py::new_caller": frozenset({"fn"}), # new! } with ( patch("muse.plugins.code._callgraph.build_forward_graph", side_effect=[base_fg, ours_fg, theirs_fg]), ): items, ok, warn = _find_delete_use_conflicts( repo, {}, {}, {}, base_syms, ours_syms, theirs_syms, ) assert ok is True assert warn is None assert len(items) == 1 assert items[0].conflict_type == "delete_use" assert "caller_new.py::new_caller" in items[0].theirs_change def test_no_delete_use_when_no_deletions(self, repo: pathlib.Path) -> None: from muse.cli.commands.plan_merge import _find_delete_use_conflicts base_syms = {"src/api.py::fn": _sym("fn")} ours_syms = {"src/api.py::fn": _sym("fn")} theirs_syms = {"src/api.py::fn": _sym("fn")} items, ok, warn = _find_delete_use_conflicts( repo, {}, {}, {}, base_syms, ours_syms, theirs_syms, ) assert ok is True assert items == [] def test_call_graph_unavailable_warns(self, repo: pathlib.Path) -> None: from muse.cli.commands.plan_merge import _find_delete_use_conflicts base_syms = {"src/api.py::fn": _sym("fn")} ours_syms = {} theirs_syms = {"src/api.py::fn": _sym("fn")} with patch("muse.plugins.code._callgraph.build_forward_graph", side_effect=OSError("no index")): items, ok, warn = _find_delete_use_conflicts( repo, {}, {}, {}, base_syms, ours_syms, theirs_syms, ) assert ok is False assert warn is not None assert "call graph unavailable" in warn assert items == [] def test_keyerror_from_call_graph_warns(self, repo: pathlib.Path) -> None: from muse.cli.commands.plan_merge import _find_delete_use_conflicts base_syms = {"src/api.py::fn": _sym("fn")} ours_syms = {} theirs_syms = {"src/api.py::fn": _sym("fn")} with patch("muse.plugins.code._callgraph.build_forward_graph", side_effect=KeyError("missing")): items, ok, warn = _find_delete_use_conflicts( repo, {}, {}, {}, base_syms, ours_syms, theirs_syms, ) assert ok is False def test_unexpected_exception_propagates(self, repo: pathlib.Path) -> None: from muse.cli.commands.plan_merge import _find_delete_use_conflicts base_syms = {"src/api.py::fn": _sym("fn")} ours_syms = {} theirs_syms = {"src/api.py::fn": _sym("fn")} with patch("muse.plugins.code._callgraph.build_forward_graph", side_effect=MemoryError("OOM")): with pytest.raises(MemoryError): _find_delete_use_conflicts( repo, {}, {}, {}, base_syms, ours_syms, theirs_syms, ) def test_caller_preview_truncated_at_3(self, repo: pathlib.Path) -> None: from muse.cli.commands.plan_merge import _find_delete_use_conflicts base_syms = {"src/api.py::fn": _sym("fn")} ours_syms = {} theirs_syms = {"src/api.py::fn": _sym("fn")} theirs_fg = {f"mod{i}.py::caller{i}": frozenset({"fn"}) for i in range(10)} base_fg = {} ours_fg = {} with patch("muse.plugins.code._callgraph.build_forward_graph", side_effect=[base_fg, ours_fg, theirs_fg]): items, ok, _ = _find_delete_use_conflicts( repo, {}, {}, {}, base_syms, ours_syms, theirs_syms, ) assert len(items) == 1 assert "+7 more" in items[0].theirs_change # ───────────────────────────────────────────────────────────────────────────── # Unit tests — _find_dependency_conflicts # ───────────────────────────────────────────────────────────────────────────── class TestFindDependencyConflicts: def test_dependency_detected(self, repo: pathlib.Path) -> None: from muse.cli.commands.plan_merge import _find_dependency_conflicts # Ours changed fn_a; theirs changed fn_b which calls fn_a. ours_changed = {"src/x.py::fn_a"} theirs_changed = {"src/y.py::fn_b"} reverse = {"fn_a": ["src/y.py::fn_b"]} # fn_b calls fn_a. with ( patch("muse.plugins.code._callgraph.build_reverse_graph", return_value=reverse), patch("muse.plugins.code._callgraph.transitive_callers", return_value={1: ["src/y.py::fn_b"]}), ): items, ok, warn = _find_dependency_conflicts( repo, {}, {}, ours_changed, theirs_changed, ) assert ok is True assert len(items) == 1 assert items[0].conflict_type == "dependency_conflict" assert "src/x.py::fn_a" == items[0].address def test_no_conflict_when_no_changes(self, repo: pathlib.Path) -> None: from muse.cli.commands.plan_merge import _find_dependency_conflicts items, ok, warn = _find_dependency_conflicts(repo, {}, {}, set(), set()) assert items == [] assert ok is True def test_call_graph_error_warns(self, repo: pathlib.Path) -> None: from muse.cli.commands.plan_merge import _find_dependency_conflicts with patch("muse.plugins.code._callgraph.build_reverse_graph", side_effect=ValueError("bad data")): items, ok, warn = _find_dependency_conflicts( repo, {}, {}, {"x.py::fn"}, {"y.py::fn"}, ) assert ok is False assert warn is not None def test_deduplication(self, repo: pathlib.Path) -> None: """Same (ours_addr, theirs_addr) pair not added twice.""" from muse.cli.commands.plan_merge import _find_dependency_conflicts ours_changed = {"x.py::fn_a"} theirs_changed = {"y.py::fn_b"} reverse = {"fn_a": ["y.py::fn_b", "y.py::fn_b"]} # Duplicate. with ( patch("muse.plugins.code._callgraph.build_reverse_graph", return_value=reverse), patch("muse.plugins.code._callgraph.transitive_callers", return_value={1: ["y.py::fn_b", "y.py::fn_b"]}), ): items, _, _ = _find_dependency_conflicts(repo, {}, {}, ours_changed, theirs_changed) assert len(items) == 1 # Not duplicated. # ───────────────────────────────────────────────────────────────────────────── # Integration tests — full CLI (mock-based) # ───────────────────────────────────────────────────────────────────────────── class TestPlanMergeIntegration: def test_ref_not_found_exits_1(self, repo: pathlib.Path) -> None: from muse.cli.commands.plan_merge import run as pm_run ns = argparse.Namespace( ours_ref="nonexistent", theirs_ref="main", base_ref=None, skip_call_graph=True, fmt="json", json_out=True, ) old = os.getcwd() os.chdir(repo) try: with ( patch("muse.cli.commands.plan_merge.require_repo", return_value=repo), patch("muse.cli.commands.plan_merge.read_current_branch", return_value="main"), patch("muse.cli.commands.plan_merge.resolve_commit_ref", return_value=None), ): with pytest.raises(SystemExit) as exc: pm_run(ns) finally: os.chdir(old) assert exc.value.code == 1 def test_json_schema_complete(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: code, out = _run_plan_merge(repo) data = json.loads(out) required = { "schema", "ours", "theirs", "base", "base_auto_computed", "call_graph_available", "call_graph_skipped", "warnings", "total_symbols", "conflicts", "clean", "items", "duration_ms", } assert required.issubset(data.keys()) def test_duration_ms_is_non_negative_float(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: _, out = _run_plan_merge(repo) data = json.loads(out) assert isinstance(data["duration_ms"], float) assert data["duration_ms"] >= 0 def test_full_commit_ids_in_json(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: ours_c = _mock_commit("a" * 64) theirs_c = _mock_commit("b" * 64) _, out = _run_plan_merge( repo, mock_ours_commit=ours_c, mock_theirs_commit=theirs_c, ) data = json.loads(out) assert data["ours"] == "a" * 64 assert data["theirs"] == "b" * 64 def test_base_auto_computed_true_when_no_base_arg(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: _, out = _run_plan_merge(repo) data = json.loads(out) assert data["base_auto_computed"] is True def test_base_auto_computed_false_when_base_arg_given(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: from muse.cli.commands.plan_merge import run as pm_run ours_c = _mock_commit("a" * 64) theirs_c = _mock_commit("b" * 64) base_c = _mock_commit("c" * 64) ns = argparse.Namespace( ours_ref="HEAD", theirs_ref="main", base_ref="base-ref", skip_call_graph=True, fmt="json", json_out=True, ) import io, sys captured = io.StringIO() old_stdout = sys.stdout sys.stdout = captured old = os.getcwd() os.chdir(repo) try: with ( patch("muse.cli.commands.plan_merge.require_repo", return_value=repo), patch("muse.cli.commands.plan_merge.read_current_branch", return_value="main"), patch("muse.cli.commands.plan_merge.resolve_commit_ref", side_effect=lambda *a, **kw: ( ours_c if a[2] in ("HEAD", None) else theirs_c if a[2] == "main" else base_c if a[2] == "base-ref" else None )), patch("muse.cli.commands.plan_merge.find_merge_base", return_value="c" * 64), patch("muse.cli.commands.plan_merge.get_commit_snapshot_manifest", return_value={}), patch("muse.cli.commands.plan_merge.symbols_for_snapshot", return_value={}), ): pm_run(ns) except SystemExit: pass finally: sys.stdout = old_stdout os.chdir(old) data = json.loads(captured.getvalue()) assert data["base_auto_computed"] is False def test_call_graph_skipped_flag(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: _, out = _run_plan_merge(repo, skip_call_graph=True) data = json.loads(out) assert data["call_graph_skipped"] is True def test_warnings_list_in_json(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: _, out = _run_plan_merge(repo) data = json.loads(out) assert isinstance(data["warnings"], list) def test_no_conflicts_empty_swarm(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: _, out = _run_plan_merge(repo) data = json.loads(out) assert data["conflicts"] == 0 assert data["items"] == [] def test_symbol_edit_overlap_detected(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: """Three-way: base=X, ours=Y, theirs=Z → symbol_edit_overlap.""" base = {"f.py::fn": _sym("fn", content_id="X", body_hash="BH0", signature_id="SIG")} ours = {"f.py::fn": _sym("fn", content_id="Y", body_hash="BH1", signature_id="SIG")} theirs = {"f.py::fn": _sym("fn", content_id="Z", body_hash="BH2", signature_id="SIG")} _, out = _run_plan_merge( repo, mock_ours_syms=ours, mock_theirs_syms=theirs, mock_base_syms=base, ) data = json.loads(out) assert data["conflicts"] == 1 assert data["items"][0]["conflict_type"] == "symbol_edit_overlap" def test_no_false_positive_unilateral_change(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: """Three-way: base=X, ours=Y, theirs=X → only ours changed → no_conflict.""" base = {"f.py::fn": _sym("fn", content_id="X", body_hash="BH0", signature_id="SIG")} ours = {"f.py::fn": _sym("fn", content_id="Y", body_hash="BH1", signature_id="SIG")} theirs = {"f.py::fn": _sym("fn", content_id="X", body_hash="BH0", signature_id="SIG")} _, out = _run_plan_merge( repo, mock_ours_syms=ours, mock_theirs_syms=theirs, mock_base_syms=base, ) data = json.loads(out) assert data["conflicts"] == 0, ( "Three-way should detect no conflict when only ours changed" ) def test_rename_edit_via_pass2_body_hash(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: """Pass 2 upgrades symbol_edit_overlap → rename_edit via body_hash matching.""" body = "REAL_BODY_HASH_12345" base = {"f.py::fn_old": {**_sym("fn_old", content_id="X", body_hash=body), "name": "fn_old"}} # Ours: fn_old deleted, fn_new added with same body (rename) ours = {"f.py::fn_new": {**_sym("fn_new", content_id="Y", body_hash=body), "name": "fn_new"}} # Theirs: fn_old modified (impl change) theirs = {"f.py::fn_old": {**_sym("fn_old", content_id="Z", body_hash="OTHER"), "name": "fn_old"}} _, out = _run_plan_merge( repo, mock_ours_syms=ours, mock_theirs_syms=theirs, mock_base_syms=base, ) data = json.loads(out) conflict_types = [i["conflict_type"] for i in data["items"]] assert "rename_edit" in conflict_types def test_move_edit_via_pass2(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: """Pass 2 detects move_edit when ours moved to different file and theirs modified.""" body = "MOVED_BODY_HASH_12345" base = {"old/file.py::fn": _sym("fn", content_id="X", body_hash=body)} # Ours: fn moved to new/file.py ours = {"new/file.py::fn": _sym("fn", content_id="X", body_hash=body)} # Theirs: fn modified in original location theirs = {"old/file.py::fn": _sym("fn", content_id="Z", body_hash="OTHER")} _, out = _run_plan_merge( repo, mock_ours_syms=ours, mock_theirs_syms=theirs, mock_base_syms=base, ) data = json.loads(out) conflict_types = [i["conflict_type"] for i in data["items"]] assert "move_edit" in conflict_types def test_text_output_format(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: _, out = _run_plan_merge(repo, fmt="text") assert "Semantic merge plan" in out assert "base:" in out def test_text_output_shows_conflicts(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: base = {"f.py::fn": _sym("fn", content_id="X", body_hash="BH0", signature_id="SIG")} ours = {"f.py::fn": _sym("fn", content_id="Y", body_hash="BH1", signature_id="SIG")} theirs = {"f.py::fn": _sym("fn", content_id="Z", body_hash="BH2", signature_id="SIG")} _, out = _run_plan_merge( repo, fmt="text", mock_ours_syms=ours, mock_theirs_syms=theirs, mock_base_syms=base, ) assert "symbol_edit_overlap" in out assert "ours:" in out assert "theirs:" in out def test_text_output_shows_elapsed(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: _, out = _run_plan_merge(repo, fmt="text") assert "s)" in out def test_skip_call_graph_omits_delete_use(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: base = {"f.py::fn": _sym("fn")} ours = {} # deleted theirs = {"f.py::fn": _sym("fn")} _, out = _run_plan_merge( repo, skip_call_graph=True, mock_ours_syms=ours, mock_theirs_syms=theirs, mock_base_syms=base, ) data = json.loads(out) conflict_types = [i["conflict_type"] for i in data["items"]] assert "delete_use" not in conflict_types def test_base_none_warning_in_json(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: """When find_merge_base returns None, warnings includes a notice.""" from muse.cli.commands.plan_merge import run as pm_run ours_c = _mock_commit("a" * 64) theirs_c = _mock_commit("b" * 64) ns = argparse.Namespace( ours_ref="HEAD", theirs_ref="main", base_ref=None, skip_call_graph=True, fmt="json", json_out=True, ) import io, sys captured = io.StringIO() old = os.getcwd() os.chdir(repo) sys.stdout = captured try: with ( patch("muse.cli.commands.plan_merge.require_repo", return_value=repo), patch("muse.cli.commands.plan_merge.read_current_branch", return_value="main"), patch("muse.cli.commands.plan_merge.resolve_commit_ref", side_effect=lambda *a: ours_c if a[2] in ("HEAD", None) else theirs_c), patch("muse.cli.commands.plan_merge.find_merge_base", return_value=None), patch("muse.cli.commands.plan_merge.get_commit_snapshot_manifest", return_value={}), patch("muse.cli.commands.plan_merge.symbols_for_snapshot", return_value={}), ): pm_run(ns) except SystemExit: pass finally: sys.stdout = sys.__stdout__ os.chdir(old) data = json.loads(captured.getvalue()) assert any("no common ancestor" in w for w in data["warnings"]) assert data["base"] is None def test_format_json_shorthand(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: """--json is equivalent to --format json.""" from muse.cli.commands.plan_merge import run as pm_run ours_c = _mock_commit("a" * 64) theirs_c = _mock_commit("b" * 64) ns = argparse.Namespace( ours_ref="HEAD", theirs_ref="main", base_ref=None, skip_call_graph=True, fmt="json", # same as --json json_out=True, ) import io, sys captured = io.StringIO() old = os.getcwd() os.chdir(repo) sys.stdout = captured try: with ( patch("muse.cli.commands.plan_merge.require_repo", return_value=repo), patch("muse.cli.commands.plan_merge.read_current_branch", return_value="main"), patch("muse.cli.commands.plan_merge.resolve_commit_ref", side_effect=lambda *a: ours_c if a[2] in ("HEAD", None) else theirs_c), patch("muse.cli.commands.plan_merge.find_merge_base", return_value="c" * 64), patch("muse.cli.commands.plan_merge.get_commit_snapshot_manifest", return_value={}), patch("muse.cli.commands.plan_merge.symbols_for_snapshot", return_value={}), ): pm_run(ns) except SystemExit: pass finally: sys.stdout = sys.__stdout__ os.chdir(old) data = json.loads(captured.getvalue()) assert "conflicts" in data # ───────────────────────────────────────────────────────────────────────────── # Security tests # ───────────────────────────────────────────────────────────────────────────── class TestPlanMergeSecurity: def test_ansi_in_address_stripped_text_output(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: """ANSI escape codes in symbol addresses are stripped before display.""" ansi_addr = "\x1b[31msrc/malicious.py::fn\x1b[0m" base = {ansi_addr: _sym("fn", content_id="X", body_hash="BH0", signature_id="SIG")} ours = {ansi_addr: _sym("fn", content_id="Y", body_hash="BH1", signature_id="SIG")} theirs = {ansi_addr: _sym("fn", content_id="Z", body_hash="BH2", signature_id="SIG")} _, out = _run_plan_merge( repo, fmt="text", mock_ours_syms=ours, mock_theirs_syms=theirs, mock_base_syms=base, ) assert "\x1b[" not in out assert "src/malicious.py::fn" in out # sanitized content still shown def test_ansi_in_recommendation_stripped(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: base = {"f.py::fn": _sym("fn", content_id="X", body_hash="BH0", signature_id="SIG")} ours = {"f.py::fn": _sym("fn", content_id="Y", body_hash="BH1", signature_id="SIG")} theirs = {"f.py::fn": _sym("fn", content_id="Z", body_hash="BH2", signature_id="SIG")} _, out = _run_plan_merge( repo, fmt="text", mock_ours_syms=ours, mock_theirs_syms=theirs, mock_base_syms=base, ) assert "\x1b[" not in out def test_control_chars_in_ref_not_escape_fs(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: """Control characters in ref names are sanitised before display in error output.""" from muse.cli.commands.plan_merge import run as pm_run import io, sys captured = io.StringIO() malicious_ref = "ref\x00/../../../etc" ns = argparse.Namespace( ours_ref=malicious_ref, theirs_ref="main", base_ref=None, skip_call_graph=True, fmt="json", json_out=True, ) old = os.getcwd() os.chdir(repo) sys.stdout = captured try: with ( patch("muse.cli.commands.plan_merge.require_repo", return_value=repo), patch("muse.cli.commands.plan_merge.read_current_branch", return_value="main"), patch("muse.cli.commands.plan_merge.resolve_commit_ref", return_value=None), ): with pytest.raises(SystemExit) as exc: pm_run(ns) finally: sys.stdout = sys.__stdout__ os.chdir(old) assert exc.value.code == 1 out = captured.getvalue() # Output must not contain raw null bytes. assert "\x00" not in out # ───────────────────────────────────────────────────────────────────────────── # Stress tests # ───────────────────────────────────────────────────────────────────────────── class TestPlanMergeStress: def test_1000_symbols_pass1_under_2s(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: """Pass 1 with 1000 symbols completes in < 2 s.""" N = 1000 base = {f"f{i}.py::fn{i}": _sym(f"fn{i}", content_id=f"X{i}") for i in range(N)} # Ours and theirs both change half the symbols differently. ours = { f"f{i}.py::fn{i}": ( _sym(f"fn{i}", content_id=f"Y{i}") if i % 2 == 0 else _sym(f"fn{i}", content_id=f"X{i}") ) for i in range(N) } theirs = { f"f{i}.py::fn{i}": ( _sym(f"fn{i}", content_id=f"X{i}") if i % 2 == 0 else _sym(f"fn{i}", content_id=f"Z{i}") ) for i in range(N) } t0 = time.monotonic() _, out = _run_plan_merge( repo, mock_ours_syms=ours, mock_theirs_syms=theirs, mock_base_syms=base, ) elapsed = time.monotonic() - t0 assert elapsed < 2.0, f"Pass 1 took {elapsed:.2f}s — too slow" data = json.loads(out) assert data["total_symbols"] == N def test_200_renames_pass2_under_1s(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: """Pass 2 rename detection with 200 renames completes in < 1 s.""" N = 200 body_hashes = [f"BODYHASH{i:08d}" for i in range(N)] base = { f"f.py::fn_old_{i}": {**_sym(f"fn_old_{i}", body_hash=body_hashes[i]), "name": f"fn_old_{i}"} for i in range(N) } # Ours: all renamed ours = { f"f.py::fn_new_{i}": {**_sym(f"fn_new_{i}", body_hash=body_hashes[i]), "name": f"fn_new_{i}"} for i in range(N) } # Theirs: all modified (different body) theirs = { f"f.py::fn_old_{i}": {**_sym(f"fn_old_{i}", content_id=f"Z{i}", body_hash=f"DIFF{i}"), "name": f"fn_old_{i}"} for i in range(N) } t0 = time.monotonic() _, out = _run_plan_merge( repo, mock_ours_syms=ours, mock_theirs_syms=theirs, mock_base_syms=base, ) elapsed = time.monotonic() - t0 assert elapsed < 1.0, f"Pass 2 took {elapsed:.2f}s — too slow" data = json.loads(out) conflict_types = [i["conflict_type"] for i in data["items"]] assert "rename_edit" in conflict_types def test_100_deletes_delete_use_detection_under_2s(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: """delete_use detection with 100 deleted symbols completes in < 2 s.""" from muse.cli.commands.plan_merge import _find_delete_use_conflicts N = 100 base_syms = {f"f{i}.py::fn{i}": _sym(f"fn{i}") for i in range(N)} ours_syms: SymbolTree = {} # All deleted on ours. theirs_syms = dict(base_syms) # All present on theirs. # Mock: each fn has a new caller on theirs. base_fg: ForwardGraph = {} ours_fg: ForwardGraph = {} theirs_fg = {f"new_caller{i}.py::caller{i}": frozenset({f"fn{i}"}) for i in range(N)} t0 = time.monotonic() with patch("muse.plugins.code._callgraph.build_forward_graph", side_effect=[base_fg, ours_fg, theirs_fg]): items, ok, warn = _find_delete_use_conflicts( repo, {}, {}, {}, base_syms, ours_syms, theirs_syms, ) elapsed = time.monotonic() - t0 assert elapsed < 2.0, f"delete_use took {elapsed:.2f}s — too slow" assert ok is True assert len(items) == N def test_correctness_three_way_no_false_positives(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: """Regression: three-way diff must not produce false positives on unilateral changes.""" N = 500 # Ours changes even-indexed symbols, theirs changes odd-indexed. # Each symbol is changed by only ONE side → all should be no_conflict. base = {f"f.py::fn{i}": _sym(f"fn{i}", content_id=f"BASE{i}") for i in range(N)} ours = { f"f.py::fn{i}": ( _sym(f"fn{i}", content_id=f"OUR{i}") if i % 2 == 0 else _sym(f"fn{i}", content_id=f"BASE{i}") ) for i in range(N) } theirs = { f"f.py::fn{i}": ( _sym(f"fn{i}", content_id=f"BASE{i}") if i % 2 == 0 else _sym(f"fn{i}", content_id=f"THEIR{i}") ) for i in range(N) } _, out = _run_plan_merge( repo, mock_ours_syms=ours, mock_theirs_syms=theirs, mock_base_syms=base, ) data = json.loads(out) assert data["conflicts"] == 0, ( f"Three-way correctness: expected 0 conflicts, got {data['conflicts']}" ) # ───────────────────────────────────────────────────────────────────────────── # Error shape tests # ───────────────────────────────────────────────────────────────────────────── class TestPlanMergeErrorShapes: """Verify error output shapes are consistent across text and JSON modes.""" def _run_with_no_ours(self, repo: pathlib.Path, fmt: str = "json") -> tuple[int, str, str]: """Run plan-merge where ours-ref resolves to None.""" from muse.cli.commands.plan_merge import run as pm_run ns = argparse.Namespace( ours_ref="missing-ref", theirs_ref="main", base_ref=None, skip_call_graph=True, fmt=fmt, json_out=(fmt == "json"), ) import io captured = io.StringIO() old_stdout = sys.stdout old_stderr = sys.stderr sys.stdout = captured err_captured = io.StringIO() sys.stderr = err_captured old = os.getcwd() os.chdir(repo) exit_code = 0 try: with ( patch("muse.cli.commands.plan_merge.require_repo", return_value=repo), patch("muse.cli.commands.plan_merge.read_current_branch", return_value="main"), patch("muse.cli.commands.plan_merge.resolve_commit_ref", return_value=None), ): with pytest.raises(SystemExit) as exc: pm_run(ns) exit_code = exc.value.code finally: sys.stdout = old_stdout sys.stderr = old_stderr os.chdir(old) return exit_code, captured.getvalue(), err_captured.getvalue() def _run_with_no_theirs(self, repo: pathlib.Path, fmt: str = "json") -> tuple[int, str, str]: from muse.cli.commands.plan_merge import run as pm_run ours_c = _mock_commit("a" * 64) ns = argparse.Namespace( ours_ref="HEAD", theirs_ref="missing-branch", base_ref=None, skip_call_graph=True, fmt=fmt, json_out=(fmt == "json"), ) import io captured = io.StringIO() err_captured = io.StringIO() old_stdout, old_stderr = sys.stdout, sys.stderr sys.stdout = captured sys.stderr = err_captured old = os.getcwd() os.chdir(repo) try: with ( patch("muse.cli.commands.plan_merge.require_repo", return_value=repo), patch("muse.cli.commands.plan_merge.read_current_branch", return_value="main"), patch("muse.cli.commands.plan_merge.resolve_commit_ref", side_effect=lambda *a: ours_c if a[2] == "HEAD" else None), ): with pytest.raises(SystemExit) as exc: pm_run(ns) exit_code = exc.value.code finally: sys.stdout = old_stdout sys.stderr = old_stderr os.chdir(old) return exit_code, captured.getvalue(), err_captured.getvalue() def _run_with_no_base(self, repo: pathlib.Path, fmt: str = "json") -> tuple[int, str, str]: from muse.cli.commands.plan_merge import run as pm_run ours_c = _mock_commit("a" * 64) theirs_c = _mock_commit("b" * 64) ns = argparse.Namespace( ours_ref="HEAD", theirs_ref="main", base_ref="missing-base", skip_call_graph=True, fmt=fmt, json_out=(fmt == "json"), ) import io captured = io.StringIO() err_captured = io.StringIO() old_stdout, old_stderr = sys.stdout, sys.stderr sys.stdout = captured sys.stderr = err_captured old = os.getcwd() os.chdir(repo) try: with ( patch("muse.cli.commands.plan_merge.require_repo", return_value=repo), patch("muse.cli.commands.plan_merge.read_current_branch", return_value="main"), patch("muse.cli.commands.plan_merge.resolve_commit_ref", side_effect=lambda *a: ( ours_c if a[2] == "HEAD" else theirs_c if a[2] == "main" else None # base-ref not found )), ): with pytest.raises(SystemExit) as exc: pm_run(ns) exit_code = exc.value.code finally: sys.stdout = old_stdout sys.stderr = old_stderr os.chdir(old) return exit_code, captured.getvalue(), err_captured.getvalue() # ── ours-ref not found ──────────────────────────────────────────────────── def test_ours_not_found_json_has_error_and_status(self, repo: pathlib.Path) -> None: code, out, _ = self._run_with_no_ours(repo, fmt="json") assert code == 1 data = json.loads(out.strip()) assert "error" in data assert data["status"] == "error" def test_ours_not_found_json_error_mentions_ref(self, repo: pathlib.Path) -> None: code, out, _ = self._run_with_no_ours(repo, fmt="json") data = json.loads(out.strip()) assert "missing-ref" in data["error"] def test_ours_not_found_text_uses_tick_prefix(self, repo: pathlib.Path) -> None: code, _, err = self._run_with_no_ours(repo, fmt="text") assert code == 1 assert "❌" in err def test_ours_not_found_text_no_output_on_stdout(self, repo: pathlib.Path) -> None: code, out, _ = self._run_with_no_ours(repo, fmt="text") assert out == "" # ── theirs-ref not found ────────────────────────────────────────────────── def test_theirs_not_found_exits_1(self, repo: pathlib.Path) -> None: code, _, _ = self._run_with_no_theirs(repo) assert code == 1 def test_theirs_not_found_json_has_status(self, repo: pathlib.Path) -> None: code, out, _ = self._run_with_no_theirs(repo, fmt="json") data = json.loads(out.strip()) assert data["status"] == "error" def test_theirs_not_found_error_mentions_ref(self, repo: pathlib.Path) -> None: code, out, _ = self._run_with_no_theirs(repo, fmt="json") data = json.loads(out.strip()) assert "missing-branch" in data["error"] def test_theirs_not_found_text_uses_tick_prefix(self, repo: pathlib.Path) -> None: code, _, err = self._run_with_no_theirs(repo, fmt="text") assert "❌" in err # ── base-ref not found ──────────────────────────────────────────────────── def test_base_not_found_exits_1(self, repo: pathlib.Path) -> None: code, _, _ = self._run_with_no_base(repo) assert code == 1 def test_base_not_found_json_has_status(self, repo: pathlib.Path) -> None: code, out, _ = self._run_with_no_base(repo, fmt="json") data = json.loads(out.strip()) assert data["status"] == "error" def test_base_not_found_error_mentions_ref(self, repo: pathlib.Path) -> None: code, out, _ = self._run_with_no_base(repo, fmt="json") data = json.loads(out.strip()) assert "missing-base" in data["error"] def test_base_not_found_text_uses_tick_prefix(self, repo: pathlib.Path) -> None: code, _, err = self._run_with_no_base(repo, fmt="text") assert "❌" in err # ───────────────────────────────────────────────────────────────────────────── # Compact JSON and conflicts_by_type # ───────────────────────────────────────────────────────────────────────────── class TestPlanMergeJsonOutput: """Verify JSON output shape, compactness, and new fields.""" def test_json_is_single_line(self, repo: pathlib.Path) -> None: """Output must be compact — no embedded newlines from indent=2.""" _, out = _run_plan_merge(repo) lines = [ln for ln in out.splitlines() if ln.strip()] assert len(lines) == 1, f"JSON output must be a single line, got {len(lines)} lines" def test_json_is_valid(self, repo: pathlib.Path) -> None: _, out = _run_plan_merge(repo) data = json.loads(out) # raises if invalid assert isinstance(data, dict) def test_json_includes_conflicts_by_type_empty(self, repo: pathlib.Path) -> None: """When no conflicts, conflicts_by_type is an empty dict.""" _, out = _run_plan_merge(repo) data = json.loads(out) assert "conflicts_by_type" in data assert data["conflicts_by_type"] == {} def test_json_conflicts_by_type_single_type(self, repo: pathlib.Path) -> None: """Single conflict type → one entry in conflicts_by_type.""" base = {"f.py::fn": _sym("fn", content_id="X", body_hash="BH0", signature_id="SIG")} ours = {"f.py::fn": _sym("fn", content_id="Y", body_hash="BH1", signature_id="SIG")} theirs = {"f.py::fn": _sym("fn", content_id="Z", body_hash="BH2", signature_id="SIG")} _, out = _run_plan_merge( repo, mock_ours_syms=ours, mock_theirs_syms=theirs, mock_base_syms=base, ) data = json.loads(out) assert data["conflicts_by_type"].get("symbol_edit_overlap", 0) == 1 def test_json_conflicts_by_type_count_matches_conflicts_field(self, repo: pathlib.Path) -> None: """Sum of conflicts_by_type values must equal the 'conflicts' field.""" N = 5 base = {f"f.py::fn{i}": _sym(f"fn{i}", content_id=f"X{i}") for i in range(N)} ours = {f"f.py::fn{i}": _sym(f"fn{i}", content_id=f"Y{i}") for i in range(N)} theirs = {f"f.py::fn{i}": _sym(f"fn{i}", content_id=f"Z{i}") for i in range(N)} _, out = _run_plan_merge( repo, mock_ours_syms=ours, mock_theirs_syms=theirs, mock_base_syms=base, ) data = json.loads(out) assert sum(data["conflicts_by_type"].values()) == data["conflicts"] def test_json_schema_includes_all_required_fields(self, repo: pathlib.Path) -> None: """Backward-compat check: all documented schema fields are present.""" _, out = _run_plan_merge(repo) data = json.loads(out) required = { "schema", "ours", "theirs", "base", "base_auto_computed", "call_graph_available", "call_graph_skipped", "warnings", "total_symbols", "conflicts", "clean", "conflicts_by_type", "items", "duration_ms", } missing = required - data.keys() assert not missing, f"Missing JSON fields: {missing}" def test_json_items_contains_only_conflicts(self, repo: pathlib.Path) -> None: """items must contain only conflicting symbols, not clean ones.""" base = {"f.py::fn": _sym("fn", content_id="X", body_hash="BH0", signature_id="SIG")} ours = {"f.py::fn": _sym("fn", content_id="Y", body_hash="BH1", signature_id="SIG")} theirs = {"f.py::fn": _sym("fn", content_id="X", body_hash="BH0", signature_id="SIG")} _, out = _run_plan_merge( repo, mock_ours_syms=ours, mock_theirs_syms=theirs, mock_base_syms=base, ) data = json.loads(out) assert data["conflicts"] == 0 assert data["items"] == [] def test_json_error_shape_has_status(self, repo: pathlib.Path) -> None: """Error JSON must include 'status' = 'error' (agent-parseable error shape).""" from muse.cli.commands.plan_merge import run as pm_run ns = argparse.Namespace( ours_ref="bad", theirs_ref="main", base_ref=None, skip_call_graph=True, fmt="json", json_out=True, ) import io captured = io.StringIO() old = os.getcwd() os.chdir(repo) old_stdout = sys.stdout sys.stdout = captured try: with ( patch("muse.cli.commands.plan_merge.require_repo", return_value=repo), patch("muse.cli.commands.plan_merge.read_current_branch", return_value="main"), patch("muse.cli.commands.plan_merge.resolve_commit_ref", return_value=None), ): with pytest.raises(SystemExit): pm_run(ns) finally: sys.stdout = old_stdout os.chdir(old) data = json.loads(captured.getvalue().strip()) assert data["status"] == "error" assert "error" in data # ───────────────────────────────────────────────────────────────────────────── # Stress tests — conflicts_by_type correctness at scale # ───────────────────────────────────────────────────────────────────────────── class TestPlanMergeConflictsByTypeStress: def test_conflicts_by_type_counts_correct_at_scale(self, repo: pathlib.Path) -> None: """100 symbol_edit_overlap conflicts → conflicts_by_type["symbol_edit_overlap"] == 100.""" N = 100 base = {f"f.py::fn{i}": _sym(f"fn{i}", content_id=f"X{i}") for i in range(N)} ours = {f"f.py::fn{i}": _sym(f"fn{i}", content_id=f"Y{i}") for i in range(N)} theirs = {f"f.py::fn{i}": _sym(f"fn{i}", content_id=f"Z{i}") for i in range(N)} _, out = _run_plan_merge( repo, mock_ours_syms=ours, mock_theirs_syms=theirs, mock_base_syms=base, ) data = json.loads(out) assert data["conflicts_by_type"]["symbol_edit_overlap"] == N assert sum(data["conflicts_by_type"].values()) == N def test_json_is_still_compact_with_many_items(self, repo: pathlib.Path) -> None: """Even with 500 conflict items, JSON output is a single line.""" N = 500 base = {f"f.py::fn{i}": _sym(f"fn{i}", content_id=f"X{i}") for i in range(N)} ours = {f"f.py::fn{i}": _sym(f"fn{i}", content_id=f"Y{i}") for i in range(N)} theirs = {f"f.py::fn{i}": _sym(f"fn{i}", content_id=f"Z{i}") for i in range(N)} _, out = _run_plan_merge( repo, mock_ours_syms=ours, mock_theirs_syms=theirs, mock_base_syms=base, ) lines = [ln for ln in out.splitlines() if ln.strip()] assert len(lines) == 1 def test_total_symbols_counts_union_of_all_three(self, repo: pathlib.Path) -> None: """total_symbols is |ours ∪ theirs ∪ base| — not just conflicts.""" # 5 shared (conflicts) + 5 ours-only (clean) + 5 theirs-only (clean) base = {f"f.py::shared{i}": _sym(f"shared{i}") for i in range(5)} ours_extra = {f"f.py::ours{i}": _sym(f"ours{i}") for i in range(5)} theirs_extra = {f"f.py::theirs{i}": _sym(f"theirs{i}") for i in range(5)} # Make shared symbols conflict ours_syms = { **{f"f.py::shared{i}": _sym(f"shared{i}", content_id=f"Y{i}") for i in range(5)}, **ours_extra, } theirs_syms = { **{f"f.py::shared{i}": _sym(f"shared{i}", content_id=f"Z{i}") for i in range(5)}, **theirs_extra, } _, out = _run_plan_merge( repo, mock_ours_syms=ours_syms, mock_theirs_syms=theirs_syms, mock_base_syms=base, ) data = json.loads(out) assert data["total_symbols"] == 15 assert data["conflicts"] == 5 assert data["clean"] == 10 # ───────────────────────────────────────────────────────────────────────────── # E2E tests — same-commit and multi-type plans # ───────────────────────────────────────────────────────────────────────────── class TestPlanMergeE2E: """E2E-style integration tests using mock commits that simulate real scenarios.""" def test_same_symbols_on_both_branches_zero_conflicts(self, repo: pathlib.Path) -> None: """Identical symbol trees → no conflicts, all clean.""" syms = {f"f.py::fn{i}": _sym(f"fn{i}") for i in range(20)} _, out = _run_plan_merge( repo, mock_ours_syms=syms, mock_theirs_syms=syms, mock_base_syms=syms, ) data = json.loads(out) assert data["conflicts"] == 0 assert data["total_symbols"] == 20 assert data["conflicts_by_type"] == {} def test_disjoint_changes_no_conflicts(self, repo: pathlib.Path) -> None: """Ours and theirs each modify different symbols → all no_conflict.""" base = {f"f.py::fn{i}": _sym(f"fn{i}", content_id=f"BASE{i}") for i in range(10)} ours = dict(base) theirs = dict(base) # Ours modifies 0–4, theirs modifies 5–9 for i in range(5): ours[f"f.py::fn{i}"] = _sym(f"fn{i}", content_id=f"OUR{i}") for i in range(5, 10): theirs[f"f.py::fn{i}"] = _sym(f"fn{i}", content_id=f"THEIR{i}") _, out = _run_plan_merge( repo, mock_ours_syms=ours, mock_theirs_syms=theirs, mock_base_syms=base, ) data = json.loads(out) assert data["conflicts"] == 0 def test_one_new_symbol_on_each_branch_no_conflict(self, repo: pathlib.Path) -> None: """Each branch adds a different new symbol → no overlap → no conflict.""" base: SymbolTree = {} ours = {"f.py::fn_ours": _sym("fn_ours")} theirs = {"f.py::fn_theirs": _sym("fn_theirs")} _, out = _run_plan_merge( repo, mock_ours_syms=ours, mock_theirs_syms=theirs, mock_base_syms=base, ) data = json.loads(out) assert data["conflicts"] == 0 def test_both_add_same_symbol_different_content_conflict(self, repo: pathlib.Path) -> None: """Both branches add the same address with different content → conflict.""" base: SymbolTree = {} ours = {"f.py::fn_new": _sym("fn_new", content_id="OUR", body_hash="BH1")} theirs = {"f.py::fn_new": _sym("fn_new", content_id="THEIR", body_hash="BH2")} _, out = _run_plan_merge( repo, mock_ours_syms=ours, mock_theirs_syms=theirs, mock_base_syms=base, ) data = json.loads(out) assert data["conflicts"] == 1 def test_both_add_same_symbol_same_content_no_conflict(self, repo: pathlib.Path) -> None: """Both branches add the same symbol with identical content → no conflict.""" base: SymbolTree = {} sym = _sym("fn_new", content_id="SAME") _, out = _run_plan_merge( repo, mock_ours_syms={"f.py::fn_new": sym}, mock_theirs_syms={"f.py::fn_new": sym}, mock_base_syms=base, ) data = json.loads(out) assert data["conflicts"] == 0 def test_both_delete_same_symbol_no_conflict(self, repo: pathlib.Path) -> None: """Both branches delete the same symbol → no conflict.""" base = {"f.py::fn": _sym("fn")} ours: SymbolTree = {} theirs: SymbolTree = {} _, out = _run_plan_merge( repo, mock_ours_syms=ours, mock_theirs_syms=theirs, mock_base_syms=base, ) data = json.loads(out) assert data["conflicts"] == 0 def test_rename_edit_and_overlap_in_same_plan(self, repo: pathlib.Path) -> None: """A plan with both rename_edit and symbol_edit_overlap appears correctly in conflicts_by_type.""" body = "UNIQUEBODY12345" # Symbol A: ours renames it, theirs modifies original → rename_edit base_a = {"f.py::fn_a_old": {**_sym("fn_a_old", body_hash=body), "name": "fn_a_old"}} ours_a = {"f.py::fn_a_new": {**_sym("fn_a_new", body_hash=body), "name": "fn_a_new"}} theirs_a = {"f.py::fn_a_old": {**_sym("fn_a_old", content_id="Z", body_hash="DIFF"), "name": "fn_a_old"}} # Symbol B: both change differently → symbol_edit_overlap base_b = {"f.py::fn_b": _sym("fn_b", content_id="X", body_hash="BH0", signature_id="SIG")} ours_b = {"f.py::fn_b": _sym("fn_b", content_id="Y", body_hash="BH1", signature_id="SIG")} theirs_b = {"f.py::fn_b": _sym("fn_b", content_id="Z2", body_hash="BH2", signature_id="SIG")} base = {**base_a, **base_b} ours = {**ours_a, **ours_b} theirs = {**theirs_a, **theirs_b} _, out = _run_plan_merge( repo, mock_ours_syms=ours, mock_theirs_syms=theirs, mock_base_syms=base, ) data = json.loads(out) cbt = data["conflicts_by_type"] assert cbt.get("rename_edit", 0) >= 1 assert cbt.get("symbol_edit_overlap", 0) >= 1 assert sum(cbt.values()) == data["conflicts"] class TestRegisterFlags: def _parse(self, *args: str) -> "argparse.Namespace": import argparse from muse.cli.commands.plan_merge import register p = argparse.ArgumentParser() subs = p.add_subparsers() register(subs) return p.parse_args(["plan-merge", "HEAD", "main", *args]) def test_json_short_flag(self) -> None: args = self._parse("-j") assert args.json_out is True def test_json_long_flag(self) -> None: args = self._parse("--json") assert args.json_out is True def test_default_no_json(self) -> None: args = self._parse() assert args.json_out is False