test_cmd_plan_merge.py
python
sha256:d11a87833d5fad6059b7662844bf5448a8911a17cce7a51811f71ad394f248eb
bump to v0.2.0rc13
Human
patch
6 days ago
| 1 | """Tests for ``muse coord plan-merge``. |
| 2 | |
| 3 | Coverage matrix |
| 4 | --------------- |
| 5 | Unit |
| 6 | ~~~~ |
| 7 | * :func:`_classify_change` — all six change classifications |
| 8 | * :func:`_classify_conflict` — all three-way matrix cells |
| 9 | (base/ours/theirs combinations: 8 cases + rename, symbol_edit_overlap) |
| 10 | * :func:`_find_renames_and_moves` — rename vs move discrimination, trivial body skipped |
| 11 | * :func:`_find_delete_use_conflicts` — new callers detected, call graph errors handled |
| 12 | * :func:`_find_dependency_conflicts` — transitive dependency detection, errors handled |
| 13 | * :class:`_MergeItem` — to_dict, slots |
| 14 | |
| 15 | Integration (mock-based — no real commits required) |
| 16 | ~~~~~~~~~~~ |
| 17 | * ``plan-merge OURS THEIRS`` — ref not found → exit 1 |
| 18 | * ``plan-merge OURS THEIRS`` — theirs ref not found → exit 1 |
| 19 | * ``plan-merge OURS THEIRS --base MISSING`` — base ref not found → exit 1 |
| 20 | * ``plan-merge OURS THEIRS`` — base auto-computed |
| 21 | * ``plan-merge OURS THEIRS --base BASE_REF`` |
| 22 | * ``plan-merge OURS THEIRS --skip-call-graph`` |
| 23 | * ``plan-merge OURS THEIRS --format json`` — schema complete |
| 24 | * ``plan-merge OURS THEIRS --json`` — shorthand |
| 25 | * JSON: ``base_auto_computed``, ``call_graph_available``, ``warnings``, |
| 26 | ``duration_ms``, full commit IDs, ``conflicts_by_type`` breakdown |
| 27 | * JSON output is compact (no indent — single line) |
| 28 | * Text output: conflict summary, warnings visible, elapsed present |
| 29 | * ``symbol_edit_overlap`` detected (both changed, content differs) |
| 30 | * ``rename_edit`` detected via body_hash matching |
| 31 | * ``move_edit`` detected via body_hash + file prefix mismatch |
| 32 | * ``delete_use`` detected via forward call graph |
| 33 | * ``dependency_conflict`` detected via reverse call graph |
| 34 | * ``no_conflict`` for unilateral changes (three-way correctness) |
| 35 | * call_graph unavailable → warning, not crash |
| 36 | * unexpected call graph exception propagates |
| 37 | |
| 38 | Error shapes |
| 39 | ~~~~~~~~~~~~ |
| 40 | * JSON error has ``{"error": ..., "status": "error"}`` |
| 41 | * Text error uses ``❌`` prefix on stderr |
| 42 | * theirs-ref not-found JSON error shape |
| 43 | * base-ref not-found JSON error shape |
| 44 | |
| 45 | Security |
| 46 | ~~~~~~~~ |
| 47 | * ANSI sequences in ref names, addresses, change descriptions stripped |
| 48 | * Path traversal in ref args → resolve_commit_ref handles it (no FS access) |
| 49 | |
| 50 | Stress |
| 51 | ~~~~~~ |
| 52 | * 1000 symbols across ours + theirs → Pass 1 in < 2 s |
| 53 | * 200 renames → Pass 2 in < 1 s |
| 54 | * delete_use with 100 deleted symbols → < 2 s |
| 55 | * conflicts_by_type counts correct with mixed conflict types at scale |
| 56 | |
| 57 | E2E |
| 58 | ~~~ |
| 59 | * Same commit for ours and theirs → 0 conflicts |
| 60 | * Three distinct conflict types in one plan → all appear in conflicts_by_type |
| 61 | """ |
| 62 | |
| 63 | from __future__ import annotations |
| 64 | |
| 65 | import argparse |
| 66 | import json |
| 67 | import os |
| 68 | import pathlib |
| 69 | import sys |
| 70 | import time |
| 71 | from unittest.mock import MagicMock, patch |
| 72 | |
| 73 | import pytest |
| 74 | |
| 75 | from muse.core.types import Manifest, fake_id |
| 76 | from muse.core.paths import muse_dir |
| 77 | from muse.plugins.code.ast_parser import SymbolRecord, SymbolTree |
| 78 | type SymbolMap = dict[str, SymbolRecord] |
| 79 | type SymbolsByFile = dict[str, SymbolMap] |
| 80 | from muse.plugins.code._callgraph import ForwardGraph |
| 81 | |
| 82 | |
| 83 | # ── Helpers ─────────────────────────────────────────────────────────────────── |
| 84 | |
| 85 | |
| 86 | def _sym( |
| 87 | name: str, |
| 88 | content_id: str | None = None, |
| 89 | body_hash: str | None = None, |
| 90 | signature_id: str | None = None, |
| 91 | metadata_id: str = "", |
| 92 | ) -> SymbolRecord: |
| 93 | """Build a minimal SymbolRecord for testing.""" |
| 94 | h = content_id or f"cid-{name}" |
| 95 | b = body_hash or f"bh-{name}" |
| 96 | s = signature_id or f"sid-{name}" |
| 97 | return { |
| 98 | "kind": "function", |
| 99 | "name": name, |
| 100 | "qualified_name": name, |
| 101 | "content_id": h, |
| 102 | "body_hash": b, |
| 103 | "signature_id": s, |
| 104 | "metadata_id": metadata_id, |
| 105 | "canonical_key": f"f.py##{name}", |
| 106 | } |
| 107 | |
| 108 | |
| 109 | def _make_repo(tmp_path: pathlib.Path) -> pathlib.Path: |
| 110 | dot_muse = muse_dir(tmp_path) |
| 111 | dot_muse.mkdir() |
| 112 | (dot_muse / "HEAD").write_text("ref: refs/heads/main\n") |
| 113 | (dot_muse / "repo.json").write_text( |
| 114 | json.dumps({"repo_id": fake_id("repo"), "name": "test-repo"}) |
| 115 | ) |
| 116 | return tmp_path |
| 117 | |
| 118 | |
| 119 | @pytest.fixture() |
| 120 | def repo(tmp_path: pathlib.Path) -> pathlib.Path: |
| 121 | return _make_repo(tmp_path) |
| 122 | |
| 123 | |
| 124 | def _mock_commit(cid: str | None = None) -> MagicMock: |
| 125 | m = MagicMock() |
| 126 | m.commit_id = cid or fake_id("commit") |
| 127 | return m |
| 128 | |
| 129 | |
| 130 | def _run_plan_merge( |
| 131 | repo: pathlib.Path, |
| 132 | ours_ref: str = "HEAD", |
| 133 | theirs_ref: str = "main", |
| 134 | base_ref: str | None = None, |
| 135 | skip_call_graph: bool = True, |
| 136 | fmt: str = "json", |
| 137 | mock_ours_commit: MagicMock | None = None, |
| 138 | mock_theirs_commit: MagicMock | None = None, |
| 139 | mock_base_cid: str | None = None, |
| 140 | mock_ours_syms: SymbolMap | None = None, |
| 141 | mock_theirs_syms: SymbolMap | None = None, |
| 142 | mock_base_syms: SymbolMap | None = None, |
| 143 | ) -> tuple[int, str]: |
| 144 | """Run plan-merge with mocked commits/symbols. Returns (exit_code, stdout).""" |
| 145 | from muse.cli.commands.plan_merge import run as pm_run |
| 146 | |
| 147 | ours_c = mock_ours_commit or _mock_commit(f"aaa{'0' * 61}") |
| 148 | theirs_c = mock_theirs_commit or _mock_commit(f"bbb{'0' * 61}") |
| 149 | base_cid = mock_base_cid or f"ccc{'0' * 61}" |
| 150 | |
| 151 | ours_syms = mock_ours_syms or {} |
| 152 | theirs_syms = mock_theirs_syms or {} |
| 153 | base_syms = mock_base_syms or {} |
| 154 | |
| 155 | def _sym_for_snapshot(root: pathlib.Path, manifest: Manifest, **kwargs: str) -> SymbolsByFile: |
| 156 | if manifest.get("_branch") == "ours": |
| 157 | return {"f.py": ours_syms} |
| 158 | if manifest.get("_branch") == "theirs": |
| 159 | return {"f.py": theirs_syms} |
| 160 | if manifest.get("_branch") == "base": |
| 161 | return {"f.py": base_syms} |
| 162 | return {} |
| 163 | |
| 164 | ns = argparse.Namespace( |
| 165 | ours_ref=ours_ref, |
| 166 | theirs_ref=theirs_ref, |
| 167 | base_ref=base_ref, |
| 168 | skip_call_graph=skip_call_graph, |
| 169 | fmt=fmt, |
| 170 | json_out=(fmt == "json"), |
| 171 | ) |
| 172 | old = os.getcwd() |
| 173 | os.chdir(repo) |
| 174 | import io, sys |
| 175 | captured = io.StringIO() |
| 176 | old_stdout = sys.stdout |
| 177 | sys.stdout = captured |
| 178 | |
| 179 | exit_code = 0 |
| 180 | try: |
| 181 | with ( |
| 182 | patch("muse.cli.commands.plan_merge.require_repo", return_value=repo), |
| 183 | patch("muse.cli.commands.plan_merge.read_current_branch", return_value="main"), |
| 184 | patch("muse.cli.commands.plan_merge.resolve_commit_ref", |
| 185 | side_effect=lambda root, branch, ref: ( |
| 186 | ours_c if ref in (ours_ref, None) else |
| 187 | theirs_c if ref == theirs_ref else |
| 188 | _mock_commit(mock_base_cid) if ref == base_ref else None |
| 189 | )), |
| 190 | patch("muse.cli.commands.plan_merge.find_merge_base", |
| 191 | return_value=base_cid), |
| 192 | patch("muse.cli.commands.plan_merge.get_commit_snapshot_manifest", |
| 193 | side_effect=lambda root, cid: ( |
| 194 | {"_branch": "ours"} if cid == ours_c.commit_id else |
| 195 | {"_branch": "theirs"} if cid == theirs_c.commit_id else |
| 196 | {"_branch": "base"} |
| 197 | )), |
| 198 | patch("muse.cli.commands.plan_merge.symbols_for_snapshot", |
| 199 | side_effect=_sym_for_snapshot), |
| 200 | ): |
| 201 | pm_run(ns) |
| 202 | except SystemExit as exc: |
| 203 | exit_code = exc.code or 0 |
| 204 | finally: |
| 205 | sys.stdout = old_stdout |
| 206 | os.chdir(old) |
| 207 | |
| 208 | return exit_code, captured.getvalue() |
| 209 | |
| 210 | |
| 211 | # ───────────────────────────────────────────────────────────────────────────── |
| 212 | # Unit tests — _classify_change |
| 213 | # ───────────────────────────────────────────────────────────────────────────── |
| 214 | |
| 215 | |
| 216 | class TestClassifyChange: |
| 217 | def test_unchanged(self) -> None: |
| 218 | from muse.cli.commands.plan_merge import _classify_change |
| 219 | s = _sym("fn", content_id="X") |
| 220 | assert _classify_change(s, s) == "unchanged" |
| 221 | |
| 222 | def test_metadata_only(self) -> None: |
| 223 | from muse.cli.commands.plan_merge import _classify_change |
| 224 | base = _sym("fn", content_id="X", body_hash="BH", signature_id="SIG") |
| 225 | target = _sym("fn", content_id="Y", body_hash="BH", signature_id="SIG") |
| 226 | assert _classify_change(base, target) == "metadata_only" |
| 227 | |
| 228 | def test_signature_only(self) -> None: |
| 229 | from muse.cli.commands.plan_merge import _classify_change |
| 230 | base = _sym("fn", body_hash="BH", signature_id="SIG1") |
| 231 | target = _sym("fn", content_id="X2", body_hash="BH", signature_id="SIG2") |
| 232 | assert _classify_change(base, target) == "signature_only" |
| 233 | |
| 234 | def test_impl_only(self) -> None: |
| 235 | from muse.cli.commands.plan_merge import _classify_change |
| 236 | base = _sym("fn", body_hash="BH1", signature_id="SIG") |
| 237 | target = _sym("fn", content_id="X2", body_hash="BH2", signature_id="SIG") |
| 238 | assert _classify_change(base, target) == "impl_only" |
| 239 | |
| 240 | def test_rename_modify(self) -> None: |
| 241 | from muse.cli.commands.plan_merge import _classify_change |
| 242 | base = {**_sym("fn_old"), "name": "fn_old"} |
| 243 | target = {**_sym("fn_new", content_id="X2", body_hash="BH2"), "name": "fn_new"} |
| 244 | assert _classify_change(base, target) == "rename+modify" |
| 245 | |
| 246 | def test_full_rewrite(self) -> None: |
| 247 | from muse.cli.commands.plan_merge import _classify_change |
| 248 | base = _sym("fn") |
| 249 | target = _sym("fn", content_id="X2", body_hash="BH2", signature_id="SIG2") |
| 250 | assert _classify_change(base, target) == "full_rewrite" |
| 251 | |
| 252 | |
| 253 | # ───────────────────────────────────────────────────────────────────────────── |
| 254 | # Unit tests — _classify_conflict (three-way matrix) |
| 255 | # ───────────────────────────────────────────────────────────────────────────── |
| 256 | |
| 257 | |
| 258 | class TestClassifyConflict: |
| 259 | def test_both_absent_no_conflict(self) -> None: |
| 260 | from muse.cli.commands.plan_merge import _classify_conflict |
| 261 | item = _classify_conflict("x.py::fn", None, None, None) |
| 262 | assert item.conflict_type == "no_conflict" |
| 263 | |
| 264 | def test_ours_new_no_base_no_conflict(self) -> None: |
| 265 | from muse.cli.commands.plan_merge import _classify_conflict |
| 266 | item = _classify_conflict("x.py::fn", None, _sym("fn"), None) |
| 267 | assert item.conflict_type == "no_conflict" |
| 268 | assert item.ours_change == "added" |
| 269 | |
| 270 | def test_theirs_new_no_base_no_conflict(self) -> None: |
| 271 | from muse.cli.commands.plan_merge import _classify_conflict |
| 272 | item = _classify_conflict("x.py::fn", None, None, _sym("fn")) |
| 273 | assert item.conflict_type == "no_conflict" |
| 274 | assert item.theirs_change == "added" |
| 275 | |
| 276 | def test_only_ours_changed_three_way(self) -> None: |
| 277 | """base=X, ours=Y, theirs=X → only ours changed → no_conflict.""" |
| 278 | from muse.cli.commands.plan_merge import _classify_conflict |
| 279 | base = _sym("fn", content_id="X", body_hash="BH1", signature_id="SIG") |
| 280 | ours = _sym("fn", content_id="Y", body_hash="BH2", signature_id="SIG") |
| 281 | theirs = _sym("fn", content_id="X", body_hash="BH1", signature_id="SIG") |
| 282 | item = _classify_conflict("x.py::fn", base, ours, theirs) |
| 283 | assert item.conflict_type == "no_conflict" |
| 284 | assert "fast-forward" in item.recommendation |
| 285 | |
| 286 | def test_only_theirs_changed_three_way(self) -> None: |
| 287 | """base=X, ours=X, theirs=Y → only theirs changed → no_conflict.""" |
| 288 | from muse.cli.commands.plan_merge import _classify_conflict |
| 289 | base = _sym("fn", content_id="X", body_hash="BH1", signature_id="SIG") |
| 290 | ours = _sym("fn", content_id="X", body_hash="BH1", signature_id="SIG") |
| 291 | theirs = _sym("fn", content_id="Y", body_hash="BH2", signature_id="SIG") |
| 292 | item = _classify_conflict("x.py::fn", base, ours, theirs) |
| 293 | assert item.conflict_type == "no_conflict" |
| 294 | |
| 295 | def test_both_changed_three_way_overlap(self) -> None: |
| 296 | """base=X, ours=Y, theirs=Z → both changed → symbol_edit_overlap.""" |
| 297 | from muse.cli.commands.plan_merge import _classify_conflict |
| 298 | base = _sym("fn", content_id="X", body_hash="BH0", signature_id="SIG0") |
| 299 | ours = _sym("fn", content_id="Y", body_hash="BH1", signature_id="SIG0") |
| 300 | theirs = _sym("fn", content_id="Z", body_hash="BH2", signature_id="SIG0") |
| 301 | item = _classify_conflict("x.py::fn", base, ours, theirs) |
| 302 | assert item.conflict_type == "symbol_edit_overlap" |
| 303 | |
| 304 | def test_identical_on_both_no_conflict(self) -> None: |
| 305 | from muse.cli.commands.plan_merge import _classify_conflict |
| 306 | s = _sym("fn", content_id="SAME") |
| 307 | item = _classify_conflict("x.py::fn", _sym("fn"), s, s) |
| 308 | assert item.conflict_type == "no_conflict" |
| 309 | assert "identical" in item.recommendation |
| 310 | |
| 311 | def test_ours_deleted_theirs_unchanged_no_conflict(self) -> None: |
| 312 | """base=X, ours=None, theirs=X (unchanged) → no_conflict.""" |
| 313 | from muse.cli.commands.plan_merge import _classify_conflict |
| 314 | base = _sym("fn", content_id="X", body_hash="BH", signature_id="SIG") |
| 315 | theirs = _sym("fn", content_id="X", body_hash="BH", signature_id="SIG") |
| 316 | item = _classify_conflict("x.py::fn", base, None, theirs) |
| 317 | assert item.conflict_type == "no_conflict" |
| 318 | |
| 319 | def test_ours_deleted_theirs_modified_review(self) -> None: |
| 320 | """base=X, ours=None, theirs=Y → potentially delete_use → review.""" |
| 321 | from muse.cli.commands.plan_merge import _classify_conflict |
| 322 | base = _sym("fn", content_id="X", body_hash="BH", signature_id="SIG") |
| 323 | theirs = _sym("fn", content_id="Y", body_hash="BH2", signature_id="SIG") |
| 324 | item = _classify_conflict("x.py::fn", base, None, theirs) |
| 325 | # delete_use is detected in Pass 3; this is a "review" no_conflict. |
| 326 | assert item.conflict_type == "no_conflict" |
| 327 | assert "review" in item.recommendation |
| 328 | |
| 329 | def test_rename_edit_detected_ours_renamed(self) -> None: |
| 330 | """Ours renamed (same body, different name) + theirs modified → rename_edit.""" |
| 331 | from muse.cli.commands.plan_merge import _classify_conflict |
| 332 | base_rec = {**_sym("fn_old", body_hash="BH", signature_id="SIG"), "name": "fn_old"} |
| 333 | # Ours: same body hash, but name changed (signature_only = rename) |
| 334 | ours_rec = { |
| 335 | **_sym("fn_new", content_id="C2", body_hash="BH", signature_id="SIG2"), |
| 336 | "name": "fn_new", |
| 337 | } |
| 338 | theirs_rec = { |
| 339 | **_sym("fn_old", content_id="C3", body_hash="BH3", signature_id="SIG"), |
| 340 | "name": "fn_old", |
| 341 | } |
| 342 | item = _classify_conflict("x.py::fn_old", base_rec, ours_rec, theirs_rec) |
| 343 | assert item.conflict_type == "rename_edit" |
| 344 | |
| 345 | def test_both_added_same_content_no_conflict(self) -> None: |
| 346 | """No base, both added same symbol → no_conflict.""" |
| 347 | from muse.cli.commands.plan_merge import _classify_conflict |
| 348 | s = _sym("fn", content_id="SAME") |
| 349 | item = _classify_conflict("x.py::fn", None, s, s) |
| 350 | assert item.conflict_type == "no_conflict" |
| 351 | |
| 352 | def test_both_added_different_content_overlap(self) -> None: |
| 353 | """No base, both added different content → symbol_edit_overlap.""" |
| 354 | from muse.cli.commands.plan_merge import _classify_conflict |
| 355 | item = _classify_conflict( |
| 356 | "x.py::fn", None, |
| 357 | _sym("fn", content_id="X"), |
| 358 | _sym("fn", content_id="Y"), |
| 359 | ) |
| 360 | assert item.conflict_type == "symbol_edit_overlap" |
| 361 | |
| 362 | def test_merge_item_slots_and_to_dict(self) -> None: |
| 363 | from muse.cli.commands.plan_merge import _MergeItem |
| 364 | m = _MergeItem("addr", "no_conflict", "a", "b", "rec") |
| 365 | d = m.to_dict() |
| 366 | assert set(d.keys()) == {"address", "conflict_type", "ours_change", "theirs_change", "recommendation"} |
| 367 | assert d["conflict_type"] == "no_conflict" |
| 368 | |
| 369 | |
| 370 | # ───────────────────────────────────────────────────────────────────────────── |
| 371 | # Unit tests — _find_renames_and_moves |
| 372 | # ───────────────────────────────────────────────────────────────────────────── |
| 373 | |
| 374 | |
| 375 | class TestFindRenamesAndMoves: |
| 376 | def test_rename_detected_same_file(self) -> None: |
| 377 | from muse.cli.commands.plan_merge import _find_renames_and_moves |
| 378 | base = {"file.py::fn_old": _sym("fn_old", body_hash="BODYHASH123456")} |
| 379 | branch = {"file.py::fn_new": {**_sym("fn_new", body_hash="BODYHASH123456"), "name": "fn_new"}} |
| 380 | renames, moves = _find_renames_and_moves(base, branch) |
| 381 | assert "file.py::fn_old" in renames |
| 382 | assert renames["file.py::fn_old"] == "file.py::fn_new" |
| 383 | assert not moves |
| 384 | |
| 385 | def test_move_detected_different_file(self) -> None: |
| 386 | from muse.cli.commands.plan_merge import _find_renames_and_moves |
| 387 | base = {"old/file.py::fn": _sym("fn", body_hash="BODYHASH123456")} |
| 388 | branch = {"new/file.py::fn": _sym("fn", body_hash="BODYHASH123456")} |
| 389 | renames, moves = _find_renames_and_moves(base, branch) |
| 390 | assert not renames |
| 391 | assert "old/file.py::fn" in moves |
| 392 | |
| 393 | def test_same_file_preferred_over_other_file(self) -> None: |
| 394 | from muse.cli.commands.plan_merge import _find_renames_and_moves |
| 395 | base = {"file.py::fn_old": _sym("fn_old", body_hash="BODYHASH123456")} |
| 396 | branch = { |
| 397 | "file.py::fn_new": _sym("fn_new", body_hash="BODYHASH123456"), |
| 398 | "other.py::fn_copy": _sym("fn_copy", body_hash="BODYHASH123456"), |
| 399 | } |
| 400 | renames, moves = _find_renames_and_moves(base, branch) |
| 401 | # Same file candidate should win → rename, not move. |
| 402 | assert "file.py::fn_old" in renames |
| 403 | assert "file.py::fn_old" not in moves |
| 404 | |
| 405 | def test_trivial_body_hash_skipped(self) -> None: |
| 406 | """Very short body hashes are skipped to avoid false positives.""" |
| 407 | from muse.cli.commands.plan_merge import _find_renames_and_moves |
| 408 | base = {"file.py::fn": _sym("fn", body_hash="X")} # Too short. |
| 409 | branch = {"file.py::fn_new": _sym("fn_new", body_hash="X")} |
| 410 | renames, moves = _find_renames_and_moves(base, branch) |
| 411 | assert not renames |
| 412 | assert not moves |
| 413 | |
| 414 | def test_still_present_not_a_rename(self) -> None: |
| 415 | from muse.cli.commands.plan_merge import _find_renames_and_moves |
| 416 | sym = _sym("fn", body_hash="BODY1234567890") |
| 417 | base = {"file.py::fn": sym} |
| 418 | branch = {"file.py::fn": sym, "file.py::fn2": _sym("fn2", body_hash="BODY1234567890")} |
| 419 | renames, moves = _find_renames_and_moves(base, branch) |
| 420 | # Original still present → not a rename. |
| 421 | assert "file.py::fn" not in renames |
| 422 | |
| 423 | |
| 424 | # ───────────────────────────────────────────────────────────────────────────── |
| 425 | # Unit tests — _find_delete_use_conflicts |
| 426 | # ───────────────────────────────────────────────────────────────────────────── |
| 427 | |
| 428 | |
| 429 | class TestFindDeleteUseConflicts: |
| 430 | def _make_manifests(self) -> tuple[Manifest, Manifest, Manifest]: |
| 431 | return {"base": "m"}, {"ours": "m"}, {"theirs": "m"} |
| 432 | |
| 433 | def test_delete_use_detected(self, repo: pathlib.Path) -> None: |
| 434 | from muse.cli.commands.plan_merge import _find_delete_use_conflicts |
| 435 | base_syms = {"src/api.py::fn": _sym("fn")} |
| 436 | ours_syms = {} # deleted on ours |
| 437 | theirs_syms = {"src/api.py::fn": _sym("fn")} # still in theirs |
| 438 | |
| 439 | # Theirs has a new caller of fn that wasn't in base. |
| 440 | base_fg = {"caller_base.py::existing": frozenset()} |
| 441 | ours_fg = {} |
| 442 | theirs_fg = { |
| 443 | "caller_new.py::new_caller": frozenset({"fn"}), # new! |
| 444 | } |
| 445 | with ( |
| 446 | patch("muse.plugins.code._callgraph.build_forward_graph", |
| 447 | side_effect=[base_fg, ours_fg, theirs_fg]), |
| 448 | ): |
| 449 | items, ok, warn = _find_delete_use_conflicts( |
| 450 | repo, {}, {}, {}, base_syms, ours_syms, theirs_syms, |
| 451 | ) |
| 452 | |
| 453 | assert ok is True |
| 454 | assert warn is None |
| 455 | assert len(items) == 1 |
| 456 | assert items[0].conflict_type == "delete_use" |
| 457 | assert "caller_new.py::new_caller" in items[0].theirs_change |
| 458 | |
| 459 | def test_no_delete_use_when_no_deletions(self, repo: pathlib.Path) -> None: |
| 460 | from muse.cli.commands.plan_merge import _find_delete_use_conflicts |
| 461 | base_syms = {"src/api.py::fn": _sym("fn")} |
| 462 | ours_syms = {"src/api.py::fn": _sym("fn")} |
| 463 | theirs_syms = {"src/api.py::fn": _sym("fn")} |
| 464 | items, ok, warn = _find_delete_use_conflicts( |
| 465 | repo, {}, {}, {}, base_syms, ours_syms, theirs_syms, |
| 466 | ) |
| 467 | assert ok is True |
| 468 | assert items == [] |
| 469 | |
| 470 | def test_call_graph_unavailable_warns(self, repo: pathlib.Path) -> None: |
| 471 | from muse.cli.commands.plan_merge import _find_delete_use_conflicts |
| 472 | base_syms = {"src/api.py::fn": _sym("fn")} |
| 473 | ours_syms = {} |
| 474 | theirs_syms = {"src/api.py::fn": _sym("fn")} |
| 475 | |
| 476 | with patch("muse.plugins.code._callgraph.build_forward_graph", |
| 477 | side_effect=OSError("no index")): |
| 478 | items, ok, warn = _find_delete_use_conflicts( |
| 479 | repo, {}, {}, {}, base_syms, ours_syms, theirs_syms, |
| 480 | ) |
| 481 | |
| 482 | assert ok is False |
| 483 | assert warn is not None |
| 484 | assert "call graph unavailable" in warn |
| 485 | assert items == [] |
| 486 | |
| 487 | def test_keyerror_from_call_graph_warns(self, repo: pathlib.Path) -> None: |
| 488 | from muse.cli.commands.plan_merge import _find_delete_use_conflicts |
| 489 | base_syms = {"src/api.py::fn": _sym("fn")} |
| 490 | ours_syms = {} |
| 491 | theirs_syms = {"src/api.py::fn": _sym("fn")} |
| 492 | |
| 493 | with patch("muse.plugins.code._callgraph.build_forward_graph", |
| 494 | side_effect=KeyError("missing")): |
| 495 | items, ok, warn = _find_delete_use_conflicts( |
| 496 | repo, {}, {}, {}, base_syms, ours_syms, theirs_syms, |
| 497 | ) |
| 498 | |
| 499 | assert ok is False |
| 500 | |
| 501 | def test_unexpected_exception_propagates(self, repo: pathlib.Path) -> None: |
| 502 | from muse.cli.commands.plan_merge import _find_delete_use_conflicts |
| 503 | base_syms = {"src/api.py::fn": _sym("fn")} |
| 504 | ours_syms = {} |
| 505 | theirs_syms = {"src/api.py::fn": _sym("fn")} |
| 506 | |
| 507 | with patch("muse.plugins.code._callgraph.build_forward_graph", |
| 508 | side_effect=MemoryError("OOM")): |
| 509 | with pytest.raises(MemoryError): |
| 510 | _find_delete_use_conflicts( |
| 511 | repo, {}, {}, {}, base_syms, ours_syms, theirs_syms, |
| 512 | ) |
| 513 | |
| 514 | def test_caller_preview_truncated_at_3(self, repo: pathlib.Path) -> None: |
| 515 | from muse.cli.commands.plan_merge import _find_delete_use_conflicts |
| 516 | base_syms = {"src/api.py::fn": _sym("fn")} |
| 517 | ours_syms = {} |
| 518 | theirs_syms = {"src/api.py::fn": _sym("fn")} |
| 519 | |
| 520 | theirs_fg = {f"mod{i}.py::caller{i}": frozenset({"fn"}) for i in range(10)} |
| 521 | base_fg = {} |
| 522 | ours_fg = {} |
| 523 | |
| 524 | with patch("muse.plugins.code._callgraph.build_forward_graph", |
| 525 | side_effect=[base_fg, ours_fg, theirs_fg]): |
| 526 | items, ok, _ = _find_delete_use_conflicts( |
| 527 | repo, {}, {}, {}, base_syms, ours_syms, theirs_syms, |
| 528 | ) |
| 529 | |
| 530 | assert len(items) == 1 |
| 531 | assert "+7 more" in items[0].theirs_change |
| 532 | |
| 533 | |
| 534 | # ───────────────────────────────────────────────────────────────────────────── |
| 535 | # Unit tests — _find_dependency_conflicts |
| 536 | # ───────────────────────────────────────────────────────────────────────────── |
| 537 | |
| 538 | |
| 539 | class TestFindDependencyConflicts: |
| 540 | def test_dependency_detected(self, repo: pathlib.Path) -> None: |
| 541 | from muse.cli.commands.plan_merge import _find_dependency_conflicts |
| 542 | # Ours changed fn_a; theirs changed fn_b which calls fn_a. |
| 543 | ours_changed = {"src/x.py::fn_a"} |
| 544 | theirs_changed = {"src/y.py::fn_b"} |
| 545 | reverse = {"fn_a": ["src/y.py::fn_b"]} # fn_b calls fn_a. |
| 546 | |
| 547 | with ( |
| 548 | patch("muse.plugins.code._callgraph.build_reverse_graph", return_value=reverse), |
| 549 | patch("muse.plugins.code._callgraph.transitive_callers", |
| 550 | return_value={1: ["src/y.py::fn_b"]}), |
| 551 | ): |
| 552 | items, ok, warn = _find_dependency_conflicts( |
| 553 | repo, {}, {}, ours_changed, theirs_changed, |
| 554 | ) |
| 555 | |
| 556 | assert ok is True |
| 557 | assert len(items) == 1 |
| 558 | assert items[0].conflict_type == "dependency_conflict" |
| 559 | assert "src/x.py::fn_a" == items[0].address |
| 560 | |
| 561 | def test_no_conflict_when_no_changes(self, repo: pathlib.Path) -> None: |
| 562 | from muse.cli.commands.plan_merge import _find_dependency_conflicts |
| 563 | items, ok, warn = _find_dependency_conflicts(repo, {}, {}, set(), set()) |
| 564 | assert items == [] |
| 565 | assert ok is True |
| 566 | |
| 567 | def test_call_graph_error_warns(self, repo: pathlib.Path) -> None: |
| 568 | from muse.cli.commands.plan_merge import _find_dependency_conflicts |
| 569 | with patch("muse.plugins.code._callgraph.build_reverse_graph", |
| 570 | side_effect=ValueError("bad data")): |
| 571 | items, ok, warn = _find_dependency_conflicts( |
| 572 | repo, {}, {}, {"x.py::fn"}, {"y.py::fn"}, |
| 573 | ) |
| 574 | assert ok is False |
| 575 | assert warn is not None |
| 576 | |
| 577 | def test_deduplication(self, repo: pathlib.Path) -> None: |
| 578 | """Same (ours_addr, theirs_addr) pair not added twice.""" |
| 579 | from muse.cli.commands.plan_merge import _find_dependency_conflicts |
| 580 | ours_changed = {"x.py::fn_a"} |
| 581 | theirs_changed = {"y.py::fn_b"} |
| 582 | reverse = {"fn_a": ["y.py::fn_b", "y.py::fn_b"]} # Duplicate. |
| 583 | |
| 584 | with ( |
| 585 | patch("muse.plugins.code._callgraph.build_reverse_graph", return_value=reverse), |
| 586 | patch("muse.plugins.code._callgraph.transitive_callers", |
| 587 | return_value={1: ["y.py::fn_b", "y.py::fn_b"]}), |
| 588 | ): |
| 589 | items, _, _ = _find_dependency_conflicts(repo, {}, {}, ours_changed, theirs_changed) |
| 590 | |
| 591 | assert len(items) == 1 # Not duplicated. |
| 592 | |
| 593 | |
| 594 | # ───────────────────────────────────────────────────────────────────────────── |
| 595 | # Integration tests — full CLI (mock-based) |
| 596 | # ───────────────────────────────────────────────────────────────────────────── |
| 597 | |
| 598 | |
| 599 | class TestPlanMergeIntegration: |
| 600 | def test_ref_not_found_exits_1(self, repo: pathlib.Path) -> None: |
| 601 | from muse.cli.commands.plan_merge import run as pm_run |
| 602 | ns = argparse.Namespace( |
| 603 | ours_ref="nonexistent", theirs_ref="main", |
| 604 | base_ref=None, skip_call_graph=True, fmt="json", |
| 605 | json_out=True, |
| 606 | ) |
| 607 | old = os.getcwd() |
| 608 | os.chdir(repo) |
| 609 | try: |
| 610 | with ( |
| 611 | patch("muse.cli.commands.plan_merge.require_repo", return_value=repo), |
| 612 | |
| 613 | patch("muse.cli.commands.plan_merge.read_current_branch", return_value="main"), |
| 614 | patch("muse.cli.commands.plan_merge.resolve_commit_ref", return_value=None), |
| 615 | ): |
| 616 | with pytest.raises(SystemExit) as exc: |
| 617 | pm_run(ns) |
| 618 | finally: |
| 619 | os.chdir(old) |
| 620 | assert exc.value.code == 1 |
| 621 | |
| 622 | def test_json_schema_complete(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: |
| 623 | code, out = _run_plan_merge(repo) |
| 624 | data = json.loads(out) |
| 625 | required = { |
| 626 | "schema", "ours", "theirs", "base", "base_auto_computed", |
| 627 | "call_graph_available", "call_graph_skipped", "warnings", |
| 628 | "total_symbols", "conflicts", "clean", "items", "duration_ms", |
| 629 | } |
| 630 | assert required.issubset(data.keys()) |
| 631 | |
| 632 | def test_duration_ms_is_non_negative_float(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: |
| 633 | _, out = _run_plan_merge(repo) |
| 634 | data = json.loads(out) |
| 635 | assert isinstance(data["duration_ms"], float) |
| 636 | assert data["duration_ms"] >= 0 |
| 637 | |
| 638 | def test_full_commit_ids_in_json(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: |
| 639 | ours_c = _mock_commit("a" * 64) |
| 640 | theirs_c = _mock_commit("b" * 64) |
| 641 | _, out = _run_plan_merge( |
| 642 | repo, |
| 643 | mock_ours_commit=ours_c, |
| 644 | mock_theirs_commit=theirs_c, |
| 645 | ) |
| 646 | data = json.loads(out) |
| 647 | assert data["ours"] == "a" * 64 |
| 648 | assert data["theirs"] == "b" * 64 |
| 649 | |
| 650 | def test_base_auto_computed_true_when_no_base_arg(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: |
| 651 | _, out = _run_plan_merge(repo) |
| 652 | data = json.loads(out) |
| 653 | assert data["base_auto_computed"] is True |
| 654 | |
| 655 | def test_base_auto_computed_false_when_base_arg_given(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: |
| 656 | from muse.cli.commands.plan_merge import run as pm_run |
| 657 | ours_c = _mock_commit("a" * 64) |
| 658 | theirs_c = _mock_commit("b" * 64) |
| 659 | base_c = _mock_commit("c" * 64) |
| 660 | |
| 661 | ns = argparse.Namespace( |
| 662 | ours_ref="HEAD", theirs_ref="main", |
| 663 | base_ref="base-ref", skip_call_graph=True, fmt="json", |
| 664 | json_out=True, |
| 665 | ) |
| 666 | import io, sys |
| 667 | captured = io.StringIO() |
| 668 | old_stdout = sys.stdout |
| 669 | sys.stdout = captured |
| 670 | old = os.getcwd() |
| 671 | os.chdir(repo) |
| 672 | try: |
| 673 | with ( |
| 674 | patch("muse.cli.commands.plan_merge.require_repo", return_value=repo), |
| 675 | |
| 676 | patch("muse.cli.commands.plan_merge.read_current_branch", return_value="main"), |
| 677 | patch("muse.cli.commands.plan_merge.resolve_commit_ref", |
| 678 | side_effect=lambda *a, **kw: ( |
| 679 | ours_c if a[2] in ("HEAD", None) else |
| 680 | theirs_c if a[2] == "main" else |
| 681 | base_c if a[2] == "base-ref" else None |
| 682 | )), |
| 683 | patch("muse.cli.commands.plan_merge.find_merge_base", return_value="c" * 64), |
| 684 | patch("muse.cli.commands.plan_merge.get_commit_snapshot_manifest", return_value={}), |
| 685 | patch("muse.cli.commands.plan_merge.symbols_for_snapshot", return_value={}), |
| 686 | ): |
| 687 | pm_run(ns) |
| 688 | except SystemExit: |
| 689 | pass |
| 690 | finally: |
| 691 | sys.stdout = old_stdout |
| 692 | os.chdir(old) |
| 693 | |
| 694 | data = json.loads(captured.getvalue()) |
| 695 | assert data["base_auto_computed"] is False |
| 696 | |
| 697 | def test_call_graph_skipped_flag(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: |
| 698 | _, out = _run_plan_merge(repo, skip_call_graph=True) |
| 699 | data = json.loads(out) |
| 700 | assert data["call_graph_skipped"] is True |
| 701 | |
| 702 | def test_warnings_list_in_json(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: |
| 703 | _, out = _run_plan_merge(repo) |
| 704 | data = json.loads(out) |
| 705 | assert isinstance(data["warnings"], list) |
| 706 | |
| 707 | def test_no_conflicts_empty_swarm(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: |
| 708 | _, out = _run_plan_merge(repo) |
| 709 | data = json.loads(out) |
| 710 | assert data["conflicts"] == 0 |
| 711 | assert data["items"] == [] |
| 712 | |
| 713 | def test_symbol_edit_overlap_detected(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: |
| 714 | """Three-way: base=X, ours=Y, theirs=Z → symbol_edit_overlap.""" |
| 715 | base = {"f.py::fn": _sym("fn", content_id="X", body_hash="BH0", signature_id="SIG")} |
| 716 | ours = {"f.py::fn": _sym("fn", content_id="Y", body_hash="BH1", signature_id="SIG")} |
| 717 | theirs = {"f.py::fn": _sym("fn", content_id="Z", body_hash="BH2", signature_id="SIG")} |
| 718 | _, out = _run_plan_merge( |
| 719 | repo, mock_ours_syms=ours, mock_theirs_syms=theirs, mock_base_syms=base, |
| 720 | ) |
| 721 | data = json.loads(out) |
| 722 | assert data["conflicts"] == 1 |
| 723 | assert data["items"][0]["conflict_type"] == "symbol_edit_overlap" |
| 724 | |
| 725 | def test_no_false_positive_unilateral_change(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: |
| 726 | """Three-way: base=X, ours=Y, theirs=X → only ours changed → no_conflict.""" |
| 727 | base = {"f.py::fn": _sym("fn", content_id="X", body_hash="BH0", signature_id="SIG")} |
| 728 | ours = {"f.py::fn": _sym("fn", content_id="Y", body_hash="BH1", signature_id="SIG")} |
| 729 | theirs = {"f.py::fn": _sym("fn", content_id="X", body_hash="BH0", signature_id="SIG")} |
| 730 | _, out = _run_plan_merge( |
| 731 | repo, mock_ours_syms=ours, mock_theirs_syms=theirs, mock_base_syms=base, |
| 732 | ) |
| 733 | data = json.loads(out) |
| 734 | assert data["conflicts"] == 0, ( |
| 735 | "Three-way should detect no conflict when only ours changed" |
| 736 | ) |
| 737 | |
| 738 | def test_rename_edit_via_pass2_body_hash(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: |
| 739 | """Pass 2 upgrades symbol_edit_overlap → rename_edit via body_hash matching.""" |
| 740 | body = "REAL_BODY_HASH_12345" |
| 741 | base = {"f.py::fn_old": {**_sym("fn_old", content_id="X", body_hash=body), "name": "fn_old"}} |
| 742 | # Ours: fn_old deleted, fn_new added with same body (rename) |
| 743 | ours = {"f.py::fn_new": {**_sym("fn_new", content_id="Y", body_hash=body), "name": "fn_new"}} |
| 744 | # Theirs: fn_old modified (impl change) |
| 745 | theirs = {"f.py::fn_old": {**_sym("fn_old", content_id="Z", body_hash="OTHER"), "name": "fn_old"}} |
| 746 | _, out = _run_plan_merge( |
| 747 | repo, mock_ours_syms=ours, mock_theirs_syms=theirs, mock_base_syms=base, |
| 748 | ) |
| 749 | data = json.loads(out) |
| 750 | conflict_types = [i["conflict_type"] for i in data["items"]] |
| 751 | assert "rename_edit" in conflict_types |
| 752 | |
| 753 | def test_move_edit_via_pass2(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: |
| 754 | """Pass 2 detects move_edit when ours moved to different file and theirs modified.""" |
| 755 | body = "MOVED_BODY_HASH_12345" |
| 756 | base = {"old/file.py::fn": _sym("fn", content_id="X", body_hash=body)} |
| 757 | # Ours: fn moved to new/file.py |
| 758 | ours = {"new/file.py::fn": _sym("fn", content_id="X", body_hash=body)} |
| 759 | # Theirs: fn modified in original location |
| 760 | theirs = {"old/file.py::fn": _sym("fn", content_id="Z", body_hash="OTHER")} |
| 761 | _, out = _run_plan_merge( |
| 762 | repo, mock_ours_syms=ours, mock_theirs_syms=theirs, mock_base_syms=base, |
| 763 | ) |
| 764 | data = json.loads(out) |
| 765 | conflict_types = [i["conflict_type"] for i in data["items"]] |
| 766 | assert "move_edit" in conflict_types |
| 767 | |
| 768 | def test_text_output_format(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: |
| 769 | _, out = _run_plan_merge(repo, fmt="text") |
| 770 | assert "Semantic merge plan" in out |
| 771 | assert "base:" in out |
| 772 | |
| 773 | def test_text_output_shows_conflicts(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: |
| 774 | base = {"f.py::fn": _sym("fn", content_id="X", body_hash="BH0", signature_id="SIG")} |
| 775 | ours = {"f.py::fn": _sym("fn", content_id="Y", body_hash="BH1", signature_id="SIG")} |
| 776 | theirs = {"f.py::fn": _sym("fn", content_id="Z", body_hash="BH2", signature_id="SIG")} |
| 777 | _, out = _run_plan_merge( |
| 778 | repo, fmt="text", |
| 779 | mock_ours_syms=ours, mock_theirs_syms=theirs, mock_base_syms=base, |
| 780 | ) |
| 781 | assert "symbol_edit_overlap" in out |
| 782 | assert "ours:" in out |
| 783 | assert "theirs:" in out |
| 784 | |
| 785 | def test_text_output_shows_elapsed(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: |
| 786 | _, out = _run_plan_merge(repo, fmt="text") |
| 787 | assert "s)" in out |
| 788 | |
| 789 | def test_skip_call_graph_omits_delete_use(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: |
| 790 | base = {"f.py::fn": _sym("fn")} |
| 791 | ours = {} # deleted |
| 792 | theirs = {"f.py::fn": _sym("fn")} |
| 793 | _, out = _run_plan_merge( |
| 794 | repo, skip_call_graph=True, |
| 795 | mock_ours_syms=ours, mock_theirs_syms=theirs, mock_base_syms=base, |
| 796 | ) |
| 797 | data = json.loads(out) |
| 798 | conflict_types = [i["conflict_type"] for i in data["items"]] |
| 799 | assert "delete_use" not in conflict_types |
| 800 | |
| 801 | def test_base_none_warning_in_json(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: |
| 802 | """When find_merge_base returns None, warnings includes a notice.""" |
| 803 | from muse.cli.commands.plan_merge import run as pm_run |
| 804 | ours_c = _mock_commit("a" * 64) |
| 805 | theirs_c = _mock_commit("b" * 64) |
| 806 | ns = argparse.Namespace( |
| 807 | ours_ref="HEAD", theirs_ref="main", |
| 808 | base_ref=None, skip_call_graph=True, fmt="json", |
| 809 | json_out=True, |
| 810 | ) |
| 811 | import io, sys |
| 812 | captured = io.StringIO() |
| 813 | old = os.getcwd() |
| 814 | os.chdir(repo) |
| 815 | sys.stdout = captured |
| 816 | try: |
| 817 | with ( |
| 818 | patch("muse.cli.commands.plan_merge.require_repo", return_value=repo), |
| 819 | |
| 820 | patch("muse.cli.commands.plan_merge.read_current_branch", return_value="main"), |
| 821 | patch("muse.cli.commands.plan_merge.resolve_commit_ref", |
| 822 | side_effect=lambda *a: ours_c if a[2] in ("HEAD", None) else theirs_c), |
| 823 | patch("muse.cli.commands.plan_merge.find_merge_base", return_value=None), |
| 824 | patch("muse.cli.commands.plan_merge.get_commit_snapshot_manifest", return_value={}), |
| 825 | patch("muse.cli.commands.plan_merge.symbols_for_snapshot", return_value={}), |
| 826 | ): |
| 827 | pm_run(ns) |
| 828 | except SystemExit: |
| 829 | pass |
| 830 | finally: |
| 831 | sys.stdout = sys.__stdout__ |
| 832 | os.chdir(old) |
| 833 | |
| 834 | data = json.loads(captured.getvalue()) |
| 835 | assert any("no common ancestor" in w for w in data["warnings"]) |
| 836 | assert data["base"] is None |
| 837 | |
| 838 | def test_format_json_shorthand(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: |
| 839 | """--json is equivalent to --format json.""" |
| 840 | from muse.cli.commands.plan_merge import run as pm_run |
| 841 | ours_c = _mock_commit("a" * 64) |
| 842 | theirs_c = _mock_commit("b" * 64) |
| 843 | ns = argparse.Namespace( |
| 844 | ours_ref="HEAD", theirs_ref="main", |
| 845 | base_ref=None, skip_call_graph=True, fmt="json", # same as --json |
| 846 | json_out=True, |
| 847 | ) |
| 848 | import io, sys |
| 849 | captured = io.StringIO() |
| 850 | old = os.getcwd() |
| 851 | os.chdir(repo) |
| 852 | sys.stdout = captured |
| 853 | try: |
| 854 | with ( |
| 855 | patch("muse.cli.commands.plan_merge.require_repo", return_value=repo), |
| 856 | |
| 857 | patch("muse.cli.commands.plan_merge.read_current_branch", return_value="main"), |
| 858 | patch("muse.cli.commands.plan_merge.resolve_commit_ref", |
| 859 | side_effect=lambda *a: ours_c if a[2] in ("HEAD", None) else theirs_c), |
| 860 | patch("muse.cli.commands.plan_merge.find_merge_base", return_value="c" * 64), |
| 861 | patch("muse.cli.commands.plan_merge.get_commit_snapshot_manifest", return_value={}), |
| 862 | patch("muse.cli.commands.plan_merge.symbols_for_snapshot", return_value={}), |
| 863 | ): |
| 864 | pm_run(ns) |
| 865 | except SystemExit: |
| 866 | pass |
| 867 | finally: |
| 868 | sys.stdout = sys.__stdout__ |
| 869 | os.chdir(old) |
| 870 | |
| 871 | data = json.loads(captured.getvalue()) |
| 872 | assert "conflicts" in data |
| 873 | |
| 874 | |
| 875 | # ───────────────────────────────────────────────────────────────────────────── |
| 876 | # Security tests |
| 877 | # ───────────────────────────────────────────────────────────────────────────── |
| 878 | |
| 879 | |
| 880 | class TestPlanMergeSecurity: |
| 881 | def test_ansi_in_address_stripped_text_output(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: |
| 882 | """ANSI escape codes in symbol addresses are stripped before display.""" |
| 883 | ansi_addr = "\x1b[31msrc/malicious.py::fn\x1b[0m" |
| 884 | base = {ansi_addr: _sym("fn", content_id="X", body_hash="BH0", signature_id="SIG")} |
| 885 | ours = {ansi_addr: _sym("fn", content_id="Y", body_hash="BH1", signature_id="SIG")} |
| 886 | theirs = {ansi_addr: _sym("fn", content_id="Z", body_hash="BH2", signature_id="SIG")} |
| 887 | _, out = _run_plan_merge( |
| 888 | repo, fmt="text", |
| 889 | mock_ours_syms=ours, mock_theirs_syms=theirs, mock_base_syms=base, |
| 890 | ) |
| 891 | assert "\x1b[" not in out |
| 892 | assert "src/malicious.py::fn" in out # sanitized content still shown |
| 893 | |
| 894 | def test_ansi_in_recommendation_stripped(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: |
| 895 | base = {"f.py::fn": _sym("fn", content_id="X", body_hash="BH0", signature_id="SIG")} |
| 896 | ours = {"f.py::fn": _sym("fn", content_id="Y", body_hash="BH1", signature_id="SIG")} |
| 897 | theirs = {"f.py::fn": _sym("fn", content_id="Z", body_hash="BH2", signature_id="SIG")} |
| 898 | _, out = _run_plan_merge( |
| 899 | repo, fmt="text", |
| 900 | mock_ours_syms=ours, mock_theirs_syms=theirs, mock_base_syms=base, |
| 901 | ) |
| 902 | assert "\x1b[" not in out |
| 903 | |
| 904 | def test_control_chars_in_ref_not_escape_fs(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: |
| 905 | """Control characters in ref names are sanitised before display in error output.""" |
| 906 | from muse.cli.commands.plan_merge import run as pm_run |
| 907 | import io, sys |
| 908 | captured = io.StringIO() |
| 909 | malicious_ref = "ref\x00/../../../etc" |
| 910 | ns = argparse.Namespace( |
| 911 | ours_ref=malicious_ref, theirs_ref="main", |
| 912 | base_ref=None, skip_call_graph=True, fmt="json", |
| 913 | json_out=True, |
| 914 | ) |
| 915 | old = os.getcwd() |
| 916 | os.chdir(repo) |
| 917 | sys.stdout = captured |
| 918 | try: |
| 919 | with ( |
| 920 | patch("muse.cli.commands.plan_merge.require_repo", return_value=repo), |
| 921 | |
| 922 | patch("muse.cli.commands.plan_merge.read_current_branch", return_value="main"), |
| 923 | patch("muse.cli.commands.plan_merge.resolve_commit_ref", return_value=None), |
| 924 | ): |
| 925 | with pytest.raises(SystemExit) as exc: |
| 926 | pm_run(ns) |
| 927 | finally: |
| 928 | sys.stdout = sys.__stdout__ |
| 929 | os.chdir(old) |
| 930 | assert exc.value.code == 1 |
| 931 | out = captured.getvalue() |
| 932 | # Output must not contain raw null bytes. |
| 933 | assert "\x00" not in out |
| 934 | |
| 935 | |
| 936 | # ───────────────────────────────────────────────────────────────────────────── |
| 937 | # Stress tests |
| 938 | # ───────────────────────────────────────────────────────────────────────────── |
| 939 | |
| 940 | |
| 941 | class TestPlanMergeStress: |
| 942 | def test_1000_symbols_pass1_under_2s(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: |
| 943 | """Pass 1 with 1000 symbols completes in < 2 s.""" |
| 944 | N = 1000 |
| 945 | base = {f"f{i}.py::fn{i}": _sym(f"fn{i}", content_id=f"X{i}") for i in range(N)} |
| 946 | # Ours and theirs both change half the symbols differently. |
| 947 | ours = { |
| 948 | f"f{i}.py::fn{i}": ( |
| 949 | _sym(f"fn{i}", content_id=f"Y{i}") if i % 2 == 0 |
| 950 | else _sym(f"fn{i}", content_id=f"X{i}") |
| 951 | ) |
| 952 | for i in range(N) |
| 953 | } |
| 954 | theirs = { |
| 955 | f"f{i}.py::fn{i}": ( |
| 956 | _sym(f"fn{i}", content_id=f"X{i}") if i % 2 == 0 |
| 957 | else _sym(f"fn{i}", content_id=f"Z{i}") |
| 958 | ) |
| 959 | for i in range(N) |
| 960 | } |
| 961 | |
| 962 | t0 = time.monotonic() |
| 963 | _, out = _run_plan_merge( |
| 964 | repo, |
| 965 | mock_ours_syms=ours, |
| 966 | mock_theirs_syms=theirs, |
| 967 | mock_base_syms=base, |
| 968 | ) |
| 969 | elapsed = time.monotonic() - t0 |
| 970 | |
| 971 | assert elapsed < 2.0, f"Pass 1 took {elapsed:.2f}s — too slow" |
| 972 | data = json.loads(out) |
| 973 | assert data["total_symbols"] == N |
| 974 | |
| 975 | def test_200_renames_pass2_under_1s(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: |
| 976 | """Pass 2 rename detection with 200 renames completes in < 1 s.""" |
| 977 | N = 200 |
| 978 | body_hashes = [f"BODYHASH{i:08d}" for i in range(N)] |
| 979 | base = { |
| 980 | f"f.py::fn_old_{i}": {**_sym(f"fn_old_{i}", body_hash=body_hashes[i]), "name": f"fn_old_{i}"} |
| 981 | for i in range(N) |
| 982 | } |
| 983 | # Ours: all renamed |
| 984 | ours = { |
| 985 | f"f.py::fn_new_{i}": {**_sym(f"fn_new_{i}", body_hash=body_hashes[i]), "name": f"fn_new_{i}"} |
| 986 | for i in range(N) |
| 987 | } |
| 988 | # Theirs: all modified (different body) |
| 989 | theirs = { |
| 990 | f"f.py::fn_old_{i}": {**_sym(f"fn_old_{i}", content_id=f"Z{i}", body_hash=f"DIFF{i}"), "name": f"fn_old_{i}"} |
| 991 | for i in range(N) |
| 992 | } |
| 993 | |
| 994 | t0 = time.monotonic() |
| 995 | _, out = _run_plan_merge( |
| 996 | repo, |
| 997 | mock_ours_syms=ours, |
| 998 | mock_theirs_syms=theirs, |
| 999 | mock_base_syms=base, |
| 1000 | ) |
| 1001 | elapsed = time.monotonic() - t0 |
| 1002 | |
| 1003 | assert elapsed < 1.0, f"Pass 2 took {elapsed:.2f}s — too slow" |
| 1004 | data = json.loads(out) |
| 1005 | conflict_types = [i["conflict_type"] for i in data["items"]] |
| 1006 | assert "rename_edit" in conflict_types |
| 1007 | |
| 1008 | def test_100_deletes_delete_use_detection_under_2s(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: |
| 1009 | """delete_use detection with 100 deleted symbols completes in < 2 s.""" |
| 1010 | from muse.cli.commands.plan_merge import _find_delete_use_conflicts |
| 1011 | |
| 1012 | N = 100 |
| 1013 | base_syms = {f"f{i}.py::fn{i}": _sym(f"fn{i}") for i in range(N)} |
| 1014 | ours_syms: SymbolTree = {} # All deleted on ours. |
| 1015 | theirs_syms = dict(base_syms) # All present on theirs. |
| 1016 | |
| 1017 | # Mock: each fn has a new caller on theirs. |
| 1018 | base_fg: ForwardGraph = {} |
| 1019 | ours_fg: ForwardGraph = {} |
| 1020 | theirs_fg = {f"new_caller{i}.py::caller{i}": frozenset({f"fn{i}"}) for i in range(N)} |
| 1021 | |
| 1022 | t0 = time.monotonic() |
| 1023 | with patch("muse.plugins.code._callgraph.build_forward_graph", |
| 1024 | side_effect=[base_fg, ours_fg, theirs_fg]): |
| 1025 | items, ok, warn = _find_delete_use_conflicts( |
| 1026 | repo, {}, {}, {}, base_syms, ours_syms, theirs_syms, |
| 1027 | ) |
| 1028 | elapsed = time.monotonic() - t0 |
| 1029 | |
| 1030 | assert elapsed < 2.0, f"delete_use took {elapsed:.2f}s — too slow" |
| 1031 | assert ok is True |
| 1032 | assert len(items) == N |
| 1033 | |
| 1034 | def test_correctness_three_way_no_false_positives(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: |
| 1035 | """Regression: three-way diff must not produce false positives on unilateral changes.""" |
| 1036 | N = 500 |
| 1037 | # Ours changes even-indexed symbols, theirs changes odd-indexed. |
| 1038 | # Each symbol is changed by only ONE side → all should be no_conflict. |
| 1039 | base = {f"f.py::fn{i}": _sym(f"fn{i}", content_id=f"BASE{i}") for i in range(N)} |
| 1040 | ours = { |
| 1041 | f"f.py::fn{i}": ( |
| 1042 | _sym(f"fn{i}", content_id=f"OUR{i}") if i % 2 == 0 |
| 1043 | else _sym(f"fn{i}", content_id=f"BASE{i}") |
| 1044 | ) |
| 1045 | for i in range(N) |
| 1046 | } |
| 1047 | theirs = { |
| 1048 | f"f.py::fn{i}": ( |
| 1049 | _sym(f"fn{i}", content_id=f"BASE{i}") if i % 2 == 0 |
| 1050 | else _sym(f"fn{i}", content_id=f"THEIR{i}") |
| 1051 | ) |
| 1052 | for i in range(N) |
| 1053 | } |
| 1054 | |
| 1055 | _, out = _run_plan_merge( |
| 1056 | repo, |
| 1057 | mock_ours_syms=ours, |
| 1058 | mock_theirs_syms=theirs, |
| 1059 | mock_base_syms=base, |
| 1060 | ) |
| 1061 | data = json.loads(out) |
| 1062 | assert data["conflicts"] == 0, ( |
| 1063 | f"Three-way correctness: expected 0 conflicts, got {data['conflicts']}" |
| 1064 | ) |
| 1065 | |
| 1066 | |
| 1067 | # ───────────────────────────────────────────────────────────────────────────── |
| 1068 | # Error shape tests |
| 1069 | # ───────────────────────────────────────────────────────────────────────────── |
| 1070 | |
| 1071 | |
| 1072 | class TestPlanMergeErrorShapes: |
| 1073 | """Verify error output shapes are consistent across text and JSON modes.""" |
| 1074 | |
| 1075 | def _run_with_no_ours(self, repo: pathlib.Path, fmt: str = "json") -> tuple[int, str, str]: |
| 1076 | """Run plan-merge where ours-ref resolves to None.""" |
| 1077 | from muse.cli.commands.plan_merge import run as pm_run |
| 1078 | ns = argparse.Namespace( |
| 1079 | ours_ref="missing-ref", theirs_ref="main", |
| 1080 | base_ref=None, skip_call_graph=True, fmt=fmt, |
| 1081 | json_out=(fmt == "json"), |
| 1082 | ) |
| 1083 | import io |
| 1084 | captured = io.StringIO() |
| 1085 | old_stdout = sys.stdout |
| 1086 | old_stderr = sys.stderr |
| 1087 | sys.stdout = captured |
| 1088 | err_captured = io.StringIO() |
| 1089 | sys.stderr = err_captured |
| 1090 | old = os.getcwd() |
| 1091 | os.chdir(repo) |
| 1092 | exit_code = 0 |
| 1093 | try: |
| 1094 | with ( |
| 1095 | patch("muse.cli.commands.plan_merge.require_repo", return_value=repo), |
| 1096 | |
| 1097 | patch("muse.cli.commands.plan_merge.read_current_branch", return_value="main"), |
| 1098 | patch("muse.cli.commands.plan_merge.resolve_commit_ref", return_value=None), |
| 1099 | ): |
| 1100 | with pytest.raises(SystemExit) as exc: |
| 1101 | pm_run(ns) |
| 1102 | exit_code = exc.value.code |
| 1103 | finally: |
| 1104 | sys.stdout = old_stdout |
| 1105 | sys.stderr = old_stderr |
| 1106 | os.chdir(old) |
| 1107 | return exit_code, captured.getvalue(), err_captured.getvalue() |
| 1108 | |
| 1109 | def _run_with_no_theirs(self, repo: pathlib.Path, fmt: str = "json") -> tuple[int, str, str]: |
| 1110 | from muse.cli.commands.plan_merge import run as pm_run |
| 1111 | ours_c = _mock_commit("a" * 64) |
| 1112 | ns = argparse.Namespace( |
| 1113 | ours_ref="HEAD", theirs_ref="missing-branch", |
| 1114 | base_ref=None, skip_call_graph=True, fmt=fmt, |
| 1115 | json_out=(fmt == "json"), |
| 1116 | ) |
| 1117 | import io |
| 1118 | captured = io.StringIO() |
| 1119 | err_captured = io.StringIO() |
| 1120 | old_stdout, old_stderr = sys.stdout, sys.stderr |
| 1121 | sys.stdout = captured |
| 1122 | sys.stderr = err_captured |
| 1123 | old = os.getcwd() |
| 1124 | os.chdir(repo) |
| 1125 | try: |
| 1126 | with ( |
| 1127 | patch("muse.cli.commands.plan_merge.require_repo", return_value=repo), |
| 1128 | |
| 1129 | patch("muse.cli.commands.plan_merge.read_current_branch", return_value="main"), |
| 1130 | patch("muse.cli.commands.plan_merge.resolve_commit_ref", |
| 1131 | side_effect=lambda *a: ours_c if a[2] == "HEAD" else None), |
| 1132 | ): |
| 1133 | with pytest.raises(SystemExit) as exc: |
| 1134 | pm_run(ns) |
| 1135 | exit_code = exc.value.code |
| 1136 | finally: |
| 1137 | sys.stdout = old_stdout |
| 1138 | sys.stderr = old_stderr |
| 1139 | os.chdir(old) |
| 1140 | return exit_code, captured.getvalue(), err_captured.getvalue() |
| 1141 | |
| 1142 | def _run_with_no_base(self, repo: pathlib.Path, fmt: str = "json") -> tuple[int, str, str]: |
| 1143 | from muse.cli.commands.plan_merge import run as pm_run |
| 1144 | ours_c = _mock_commit("a" * 64) |
| 1145 | theirs_c = _mock_commit("b" * 64) |
| 1146 | ns = argparse.Namespace( |
| 1147 | ours_ref="HEAD", theirs_ref="main", |
| 1148 | base_ref="missing-base", skip_call_graph=True, fmt=fmt, |
| 1149 | json_out=(fmt == "json"), |
| 1150 | ) |
| 1151 | import io |
| 1152 | captured = io.StringIO() |
| 1153 | err_captured = io.StringIO() |
| 1154 | old_stdout, old_stderr = sys.stdout, sys.stderr |
| 1155 | sys.stdout = captured |
| 1156 | sys.stderr = err_captured |
| 1157 | old = os.getcwd() |
| 1158 | os.chdir(repo) |
| 1159 | try: |
| 1160 | with ( |
| 1161 | patch("muse.cli.commands.plan_merge.require_repo", return_value=repo), |
| 1162 | |
| 1163 | patch("muse.cli.commands.plan_merge.read_current_branch", return_value="main"), |
| 1164 | patch("muse.cli.commands.plan_merge.resolve_commit_ref", |
| 1165 | side_effect=lambda *a: ( |
| 1166 | ours_c if a[2] == "HEAD" else |
| 1167 | theirs_c if a[2] == "main" else |
| 1168 | None # base-ref not found |
| 1169 | )), |
| 1170 | ): |
| 1171 | with pytest.raises(SystemExit) as exc: |
| 1172 | pm_run(ns) |
| 1173 | exit_code = exc.value.code |
| 1174 | finally: |
| 1175 | sys.stdout = old_stdout |
| 1176 | sys.stderr = old_stderr |
| 1177 | os.chdir(old) |
| 1178 | return exit_code, captured.getvalue(), err_captured.getvalue() |
| 1179 | |
| 1180 | # ── ours-ref not found ──────────────────────────────────────────────────── |
| 1181 | |
| 1182 | def test_ours_not_found_json_has_error_and_status(self, repo: pathlib.Path) -> None: |
| 1183 | code, out, _ = self._run_with_no_ours(repo, fmt="json") |
| 1184 | assert code == 1 |
| 1185 | data = json.loads(out.strip()) |
| 1186 | assert "error" in data |
| 1187 | assert data["status"] == "error" |
| 1188 | |
| 1189 | def test_ours_not_found_json_error_mentions_ref(self, repo: pathlib.Path) -> None: |
| 1190 | code, out, _ = self._run_with_no_ours(repo, fmt="json") |
| 1191 | data = json.loads(out.strip()) |
| 1192 | assert "missing-ref" in data["error"] |
| 1193 | |
| 1194 | def test_ours_not_found_text_uses_tick_prefix(self, repo: pathlib.Path) -> None: |
| 1195 | code, _, err = self._run_with_no_ours(repo, fmt="text") |
| 1196 | assert code == 1 |
| 1197 | assert "❌" in err |
| 1198 | |
| 1199 | def test_ours_not_found_text_no_output_on_stdout(self, repo: pathlib.Path) -> None: |
| 1200 | code, out, _ = self._run_with_no_ours(repo, fmt="text") |
| 1201 | assert out == "" |
| 1202 | |
| 1203 | # ── theirs-ref not found ────────────────────────────────────────────────── |
| 1204 | |
| 1205 | def test_theirs_not_found_exits_1(self, repo: pathlib.Path) -> None: |
| 1206 | code, _, _ = self._run_with_no_theirs(repo) |
| 1207 | assert code == 1 |
| 1208 | |
| 1209 | def test_theirs_not_found_json_has_status(self, repo: pathlib.Path) -> None: |
| 1210 | code, out, _ = self._run_with_no_theirs(repo, fmt="json") |
| 1211 | data = json.loads(out.strip()) |
| 1212 | assert data["status"] == "error" |
| 1213 | |
| 1214 | def test_theirs_not_found_error_mentions_ref(self, repo: pathlib.Path) -> None: |
| 1215 | code, out, _ = self._run_with_no_theirs(repo, fmt="json") |
| 1216 | data = json.loads(out.strip()) |
| 1217 | assert "missing-branch" in data["error"] |
| 1218 | |
| 1219 | def test_theirs_not_found_text_uses_tick_prefix(self, repo: pathlib.Path) -> None: |
| 1220 | code, _, err = self._run_with_no_theirs(repo, fmt="text") |
| 1221 | assert "❌" in err |
| 1222 | |
| 1223 | # ── base-ref not found ──────────────────────────────────────────────────── |
| 1224 | |
| 1225 | def test_base_not_found_exits_1(self, repo: pathlib.Path) -> None: |
| 1226 | code, _, _ = self._run_with_no_base(repo) |
| 1227 | assert code == 1 |
| 1228 | |
| 1229 | def test_base_not_found_json_has_status(self, repo: pathlib.Path) -> None: |
| 1230 | code, out, _ = self._run_with_no_base(repo, fmt="json") |
| 1231 | data = json.loads(out.strip()) |
| 1232 | assert data["status"] == "error" |
| 1233 | |
| 1234 | def test_base_not_found_error_mentions_ref(self, repo: pathlib.Path) -> None: |
| 1235 | code, out, _ = self._run_with_no_base(repo, fmt="json") |
| 1236 | data = json.loads(out.strip()) |
| 1237 | assert "missing-base" in data["error"] |
| 1238 | |
| 1239 | def test_base_not_found_text_uses_tick_prefix(self, repo: pathlib.Path) -> None: |
| 1240 | code, _, err = self._run_with_no_base(repo, fmt="text") |
| 1241 | assert "❌" in err |
| 1242 | |
| 1243 | |
| 1244 | # ───────────────────────────────────────────────────────────────────────────── |
| 1245 | # Compact JSON and conflicts_by_type |
| 1246 | # ───────────────────────────────────────────────────────────────────────────── |
| 1247 | |
| 1248 | |
| 1249 | class TestPlanMergeJsonOutput: |
| 1250 | """Verify JSON output shape, compactness, and new fields.""" |
| 1251 | |
| 1252 | def test_json_is_single_line(self, repo: pathlib.Path) -> None: |
| 1253 | """Output must be compact — no embedded newlines from indent=2.""" |
| 1254 | _, out = _run_plan_merge(repo) |
| 1255 | lines = [ln for ln in out.splitlines() if ln.strip()] |
| 1256 | assert len(lines) == 1, f"JSON output must be a single line, got {len(lines)} lines" |
| 1257 | |
| 1258 | def test_json_is_valid(self, repo: pathlib.Path) -> None: |
| 1259 | _, out = _run_plan_merge(repo) |
| 1260 | data = json.loads(out) # raises if invalid |
| 1261 | assert isinstance(data, dict) |
| 1262 | |
| 1263 | def test_json_includes_conflicts_by_type_empty(self, repo: pathlib.Path) -> None: |
| 1264 | """When no conflicts, conflicts_by_type is an empty dict.""" |
| 1265 | _, out = _run_plan_merge(repo) |
| 1266 | data = json.loads(out) |
| 1267 | assert "conflicts_by_type" in data |
| 1268 | assert data["conflicts_by_type"] == {} |
| 1269 | |
| 1270 | def test_json_conflicts_by_type_single_type(self, repo: pathlib.Path) -> None: |
| 1271 | """Single conflict type → one entry in conflicts_by_type.""" |
| 1272 | base = {"f.py::fn": _sym("fn", content_id="X", body_hash="BH0", signature_id="SIG")} |
| 1273 | ours = {"f.py::fn": _sym("fn", content_id="Y", body_hash="BH1", signature_id="SIG")} |
| 1274 | theirs = {"f.py::fn": _sym("fn", content_id="Z", body_hash="BH2", signature_id="SIG")} |
| 1275 | _, out = _run_plan_merge( |
| 1276 | repo, mock_ours_syms=ours, mock_theirs_syms=theirs, mock_base_syms=base, |
| 1277 | ) |
| 1278 | data = json.loads(out) |
| 1279 | assert data["conflicts_by_type"].get("symbol_edit_overlap", 0) == 1 |
| 1280 | |
| 1281 | def test_json_conflicts_by_type_count_matches_conflicts_field(self, repo: pathlib.Path) -> None: |
| 1282 | """Sum of conflicts_by_type values must equal the 'conflicts' field.""" |
| 1283 | N = 5 |
| 1284 | base = {f"f.py::fn{i}": _sym(f"fn{i}", content_id=f"X{i}") for i in range(N)} |
| 1285 | ours = {f"f.py::fn{i}": _sym(f"fn{i}", content_id=f"Y{i}") for i in range(N)} |
| 1286 | theirs = {f"f.py::fn{i}": _sym(f"fn{i}", content_id=f"Z{i}") for i in range(N)} |
| 1287 | _, out = _run_plan_merge( |
| 1288 | repo, mock_ours_syms=ours, mock_theirs_syms=theirs, mock_base_syms=base, |
| 1289 | ) |
| 1290 | data = json.loads(out) |
| 1291 | assert sum(data["conflicts_by_type"].values()) == data["conflicts"] |
| 1292 | |
| 1293 | def test_json_schema_includes_all_required_fields(self, repo: pathlib.Path) -> None: |
| 1294 | """Backward-compat check: all documented schema fields are present.""" |
| 1295 | _, out = _run_plan_merge(repo) |
| 1296 | data = json.loads(out) |
| 1297 | required = { |
| 1298 | "schema", "ours", "theirs", "base", "base_auto_computed", |
| 1299 | "call_graph_available", "call_graph_skipped", "warnings", |
| 1300 | "total_symbols", "conflicts", "clean", "conflicts_by_type", |
| 1301 | "items", "duration_ms", |
| 1302 | } |
| 1303 | missing = required - data.keys() |
| 1304 | assert not missing, f"Missing JSON fields: {missing}" |
| 1305 | |
| 1306 | def test_json_items_contains_only_conflicts(self, repo: pathlib.Path) -> None: |
| 1307 | """items must contain only conflicting symbols, not clean ones.""" |
| 1308 | base = {"f.py::fn": _sym("fn", content_id="X", body_hash="BH0", signature_id="SIG")} |
| 1309 | ours = {"f.py::fn": _sym("fn", content_id="Y", body_hash="BH1", signature_id="SIG")} |
| 1310 | theirs = {"f.py::fn": _sym("fn", content_id="X", body_hash="BH0", signature_id="SIG")} |
| 1311 | _, out = _run_plan_merge( |
| 1312 | repo, mock_ours_syms=ours, mock_theirs_syms=theirs, mock_base_syms=base, |
| 1313 | ) |
| 1314 | data = json.loads(out) |
| 1315 | assert data["conflicts"] == 0 |
| 1316 | assert data["items"] == [] |
| 1317 | |
| 1318 | def test_json_error_shape_has_status(self, repo: pathlib.Path) -> None: |
| 1319 | """Error JSON must include 'status' = 'error' (agent-parseable error shape).""" |
| 1320 | from muse.cli.commands.plan_merge import run as pm_run |
| 1321 | ns = argparse.Namespace( |
| 1322 | ours_ref="bad", theirs_ref="main", |
| 1323 | base_ref=None, skip_call_graph=True, fmt="json", |
| 1324 | json_out=True, |
| 1325 | ) |
| 1326 | import io |
| 1327 | captured = io.StringIO() |
| 1328 | old = os.getcwd() |
| 1329 | os.chdir(repo) |
| 1330 | old_stdout = sys.stdout |
| 1331 | sys.stdout = captured |
| 1332 | try: |
| 1333 | with ( |
| 1334 | patch("muse.cli.commands.plan_merge.require_repo", return_value=repo), |
| 1335 | |
| 1336 | patch("muse.cli.commands.plan_merge.read_current_branch", return_value="main"), |
| 1337 | patch("muse.cli.commands.plan_merge.resolve_commit_ref", return_value=None), |
| 1338 | ): |
| 1339 | with pytest.raises(SystemExit): |
| 1340 | pm_run(ns) |
| 1341 | finally: |
| 1342 | sys.stdout = old_stdout |
| 1343 | os.chdir(old) |
| 1344 | data = json.loads(captured.getvalue().strip()) |
| 1345 | assert data["status"] == "error" |
| 1346 | assert "error" in data |
| 1347 | |
| 1348 | |
| 1349 | # ───────────────────────────────────────────────────────────────────────────── |
| 1350 | # Stress tests — conflicts_by_type correctness at scale |
| 1351 | # ───────────────────────────────────────────────────────────────────────────── |
| 1352 | |
| 1353 | |
| 1354 | class TestPlanMergeConflictsByTypeStress: |
| 1355 | def test_conflicts_by_type_counts_correct_at_scale(self, repo: pathlib.Path) -> None: |
| 1356 | """100 symbol_edit_overlap conflicts → conflicts_by_type["symbol_edit_overlap"] == 100.""" |
| 1357 | N = 100 |
| 1358 | base = {f"f.py::fn{i}": _sym(f"fn{i}", content_id=f"X{i}") for i in range(N)} |
| 1359 | ours = {f"f.py::fn{i}": _sym(f"fn{i}", content_id=f"Y{i}") for i in range(N)} |
| 1360 | theirs = {f"f.py::fn{i}": _sym(f"fn{i}", content_id=f"Z{i}") for i in range(N)} |
| 1361 | _, out = _run_plan_merge( |
| 1362 | repo, mock_ours_syms=ours, mock_theirs_syms=theirs, mock_base_syms=base, |
| 1363 | ) |
| 1364 | data = json.loads(out) |
| 1365 | assert data["conflicts_by_type"]["symbol_edit_overlap"] == N |
| 1366 | assert sum(data["conflicts_by_type"].values()) == N |
| 1367 | |
| 1368 | def test_json_is_still_compact_with_many_items(self, repo: pathlib.Path) -> None: |
| 1369 | """Even with 500 conflict items, JSON output is a single line.""" |
| 1370 | N = 500 |
| 1371 | base = {f"f.py::fn{i}": _sym(f"fn{i}", content_id=f"X{i}") for i in range(N)} |
| 1372 | ours = {f"f.py::fn{i}": _sym(f"fn{i}", content_id=f"Y{i}") for i in range(N)} |
| 1373 | theirs = {f"f.py::fn{i}": _sym(f"fn{i}", content_id=f"Z{i}") for i in range(N)} |
| 1374 | _, out = _run_plan_merge( |
| 1375 | repo, mock_ours_syms=ours, mock_theirs_syms=theirs, mock_base_syms=base, |
| 1376 | ) |
| 1377 | lines = [ln for ln in out.splitlines() if ln.strip()] |
| 1378 | assert len(lines) == 1 |
| 1379 | |
| 1380 | def test_total_symbols_counts_union_of_all_three(self, repo: pathlib.Path) -> None: |
| 1381 | """total_symbols is |ours ∪ theirs ∪ base| — not just conflicts.""" |
| 1382 | # 5 shared (conflicts) + 5 ours-only (clean) + 5 theirs-only (clean) |
| 1383 | base = {f"f.py::shared{i}": _sym(f"shared{i}") for i in range(5)} |
| 1384 | ours_extra = {f"f.py::ours{i}": _sym(f"ours{i}") for i in range(5)} |
| 1385 | theirs_extra = {f"f.py::theirs{i}": _sym(f"theirs{i}") for i in range(5)} |
| 1386 | # Make shared symbols conflict |
| 1387 | ours_syms = { |
| 1388 | **{f"f.py::shared{i}": _sym(f"shared{i}", content_id=f"Y{i}") for i in range(5)}, |
| 1389 | **ours_extra, |
| 1390 | } |
| 1391 | theirs_syms = { |
| 1392 | **{f"f.py::shared{i}": _sym(f"shared{i}", content_id=f"Z{i}") for i in range(5)}, |
| 1393 | **theirs_extra, |
| 1394 | } |
| 1395 | _, out = _run_plan_merge( |
| 1396 | repo, mock_ours_syms=ours_syms, mock_theirs_syms=theirs_syms, mock_base_syms=base, |
| 1397 | ) |
| 1398 | data = json.loads(out) |
| 1399 | assert data["total_symbols"] == 15 |
| 1400 | assert data["conflicts"] == 5 |
| 1401 | assert data["clean"] == 10 |
| 1402 | |
| 1403 | |
| 1404 | # ───────────────────────────────────────────────────────────────────────────── |
| 1405 | # E2E tests — same-commit and multi-type plans |
| 1406 | # ───────────────────────────────────────────────────────────────────────────── |
| 1407 | |
| 1408 | |
| 1409 | class TestPlanMergeE2E: |
| 1410 | """E2E-style integration tests using mock commits that simulate real scenarios.""" |
| 1411 | |
| 1412 | def test_same_symbols_on_both_branches_zero_conflicts(self, repo: pathlib.Path) -> None: |
| 1413 | """Identical symbol trees → no conflicts, all clean.""" |
| 1414 | syms = {f"f.py::fn{i}": _sym(f"fn{i}") for i in range(20)} |
| 1415 | _, out = _run_plan_merge( |
| 1416 | repo, mock_ours_syms=syms, mock_theirs_syms=syms, mock_base_syms=syms, |
| 1417 | ) |
| 1418 | data = json.loads(out) |
| 1419 | assert data["conflicts"] == 0 |
| 1420 | assert data["total_symbols"] == 20 |
| 1421 | assert data["conflicts_by_type"] == {} |
| 1422 | |
| 1423 | def test_disjoint_changes_no_conflicts(self, repo: pathlib.Path) -> None: |
| 1424 | """Ours and theirs each modify different symbols → all no_conflict.""" |
| 1425 | base = {f"f.py::fn{i}": _sym(f"fn{i}", content_id=f"BASE{i}") for i in range(10)} |
| 1426 | ours = dict(base) |
| 1427 | theirs = dict(base) |
| 1428 | # Ours modifies 0–4, theirs modifies 5–9 |
| 1429 | for i in range(5): |
| 1430 | ours[f"f.py::fn{i}"] = _sym(f"fn{i}", content_id=f"OUR{i}") |
| 1431 | for i in range(5, 10): |
| 1432 | theirs[f"f.py::fn{i}"] = _sym(f"fn{i}", content_id=f"THEIR{i}") |
| 1433 | _, out = _run_plan_merge( |
| 1434 | repo, mock_ours_syms=ours, mock_theirs_syms=theirs, mock_base_syms=base, |
| 1435 | ) |
| 1436 | data = json.loads(out) |
| 1437 | assert data["conflicts"] == 0 |
| 1438 | |
| 1439 | def test_one_new_symbol_on_each_branch_no_conflict(self, repo: pathlib.Path) -> None: |
| 1440 | """Each branch adds a different new symbol → no overlap → no conflict.""" |
| 1441 | base: SymbolTree = {} |
| 1442 | ours = {"f.py::fn_ours": _sym("fn_ours")} |
| 1443 | theirs = {"f.py::fn_theirs": _sym("fn_theirs")} |
| 1444 | _, out = _run_plan_merge( |
| 1445 | repo, mock_ours_syms=ours, mock_theirs_syms=theirs, mock_base_syms=base, |
| 1446 | ) |
| 1447 | data = json.loads(out) |
| 1448 | assert data["conflicts"] == 0 |
| 1449 | |
| 1450 | def test_both_add_same_symbol_different_content_conflict(self, repo: pathlib.Path) -> None: |
| 1451 | """Both branches add the same address with different content → conflict.""" |
| 1452 | base: SymbolTree = {} |
| 1453 | ours = {"f.py::fn_new": _sym("fn_new", content_id="OUR", body_hash="BH1")} |
| 1454 | theirs = {"f.py::fn_new": _sym("fn_new", content_id="THEIR", body_hash="BH2")} |
| 1455 | _, out = _run_plan_merge( |
| 1456 | repo, mock_ours_syms=ours, mock_theirs_syms=theirs, mock_base_syms=base, |
| 1457 | ) |
| 1458 | data = json.loads(out) |
| 1459 | assert data["conflicts"] == 1 |
| 1460 | |
| 1461 | def test_both_add_same_symbol_same_content_no_conflict(self, repo: pathlib.Path) -> None: |
| 1462 | """Both branches add the same symbol with identical content → no conflict.""" |
| 1463 | base: SymbolTree = {} |
| 1464 | sym = _sym("fn_new", content_id="SAME") |
| 1465 | _, out = _run_plan_merge( |
| 1466 | repo, mock_ours_syms={"f.py::fn_new": sym}, |
| 1467 | mock_theirs_syms={"f.py::fn_new": sym}, |
| 1468 | mock_base_syms=base, |
| 1469 | ) |
| 1470 | data = json.loads(out) |
| 1471 | assert data["conflicts"] == 0 |
| 1472 | |
| 1473 | def test_both_delete_same_symbol_no_conflict(self, repo: pathlib.Path) -> None: |
| 1474 | """Both branches delete the same symbol → no conflict.""" |
| 1475 | base = {"f.py::fn": _sym("fn")} |
| 1476 | ours: SymbolTree = {} |
| 1477 | theirs: SymbolTree = {} |
| 1478 | _, out = _run_plan_merge( |
| 1479 | repo, mock_ours_syms=ours, mock_theirs_syms=theirs, mock_base_syms=base, |
| 1480 | ) |
| 1481 | data = json.loads(out) |
| 1482 | assert data["conflicts"] == 0 |
| 1483 | |
| 1484 | def test_rename_edit_and_overlap_in_same_plan(self, repo: pathlib.Path) -> None: |
| 1485 | """A plan with both rename_edit and symbol_edit_overlap appears correctly in conflicts_by_type.""" |
| 1486 | body = "UNIQUEBODY12345" |
| 1487 | # Symbol A: ours renames it, theirs modifies original → rename_edit |
| 1488 | base_a = {"f.py::fn_a_old": {**_sym("fn_a_old", body_hash=body), "name": "fn_a_old"}} |
| 1489 | ours_a = {"f.py::fn_a_new": {**_sym("fn_a_new", body_hash=body), "name": "fn_a_new"}} |
| 1490 | theirs_a = {"f.py::fn_a_old": {**_sym("fn_a_old", content_id="Z", body_hash="DIFF"), "name": "fn_a_old"}} |
| 1491 | # Symbol B: both change differently → symbol_edit_overlap |
| 1492 | base_b = {"f.py::fn_b": _sym("fn_b", content_id="X", body_hash="BH0", signature_id="SIG")} |
| 1493 | ours_b = {"f.py::fn_b": _sym("fn_b", content_id="Y", body_hash="BH1", signature_id="SIG")} |
| 1494 | theirs_b = {"f.py::fn_b": _sym("fn_b", content_id="Z2", body_hash="BH2", signature_id="SIG")} |
| 1495 | |
| 1496 | base = {**base_a, **base_b} |
| 1497 | ours = {**ours_a, **ours_b} |
| 1498 | theirs = {**theirs_a, **theirs_b} |
| 1499 | |
| 1500 | _, out = _run_plan_merge( |
| 1501 | repo, mock_ours_syms=ours, mock_theirs_syms=theirs, mock_base_syms=base, |
| 1502 | ) |
| 1503 | data = json.loads(out) |
| 1504 | cbt = data["conflicts_by_type"] |
| 1505 | assert cbt.get("rename_edit", 0) >= 1 |
| 1506 | assert cbt.get("symbol_edit_overlap", 0) >= 1 |
| 1507 | assert sum(cbt.values()) == data["conflicts"] |
| 1508 | |
| 1509 | |
| 1510 | class TestRegisterFlags: |
| 1511 | def _parse(self, *args: str) -> "argparse.Namespace": |
| 1512 | import argparse |
| 1513 | from muse.cli.commands.plan_merge import register |
| 1514 | p = argparse.ArgumentParser() |
| 1515 | subs = p.add_subparsers() |
| 1516 | register(subs) |
| 1517 | return p.parse_args(["plan-merge", "HEAD", "main", *args]) |
| 1518 | |
| 1519 | def test_json_short_flag(self) -> None: |
| 1520 | args = self._parse("-j") |
| 1521 | assert args.json_out is True |
| 1522 | |
| 1523 | def test_json_long_flag(self) -> None: |
| 1524 | args = self._parse("--json") |
| 1525 | assert args.json_out is True |
| 1526 | |
| 1527 | def test_default_no_json(self) -> None: |
| 1528 | args = self._parse() |
| 1529 | assert args.json_out is False |
File History
1 commit
sha256:d11a87833d5fad6059b7662844bf5448a8911a17cce7a51811f71ad394f248eb
bump to v0.2.0rc13
Human
patch
6 days ago