test_phase3_strategy_matrix.py
python
sha256:f02589f8e157757da430d82f35a64c0b7eee5033f6d13076ea395f9942151790
test(phase3): full strategy matrix — 24 SM tests, rebase→linear
Sonnet 4.6
1 day ago
| 1 | """TDD tests for Phase 3 — Full strategy × history × granularity matrix. |
| 2 | |
| 3 | Issue #86 Phase 3 deliverables: |
| 4 | SM_01–04: convergent (untouched file) under each strategy → no conflict |
| 5 | SM_05–08: convergent (convergent edit) under each strategy → no conflict |
| 6 | SM_09–12: file divergence: recursive/overlay/snapshot/replay |
| 7 | SM_13–15: add/add collision: recursive/overlay/snapshot |
| 8 | SM_16–18: delete/modify: recursive/overlay/snapshot |
| 9 | SM_19–21: history mode → correct commit graph shape |
| 10 | SM_22–24: PHANTOM regression guard under all 4 strategies |
| 11 | """ |
| 12 | from __future__ import annotations |
| 13 | |
| 14 | import datetime |
| 15 | import json |
| 16 | import pathlib |
| 17 | |
| 18 | import pytest |
| 19 | from tests.cli_test_helper import CliRunner |
| 20 | from muse.core.types import blob_id, fake_id |
| 21 | from muse.core.object_store import write_object |
| 22 | from muse.core.paths import heads_dir, muse_dir, ref_path |
| 23 | |
# Single CLI runner instance shared by every test in this module.
runner = CliRunner()
# Passed as the application argument to ``runner.invoke`` in every test below.
# NOTE(review): presumably the project's CliRunner helper resolves the real
# CLI entry point itself and ignores this value — confirm against
# tests/cli_test_helper.
cli = None
| 26 | |
| 27 | |
| 28 | # --------------------------------------------------------------------------- |
| 29 | # Shared helpers |
| 30 | # --------------------------------------------------------------------------- |
| 31 | |
| 32 | def _env(root: pathlib.Path) -> dict: |
| 33 | return {"MUSE_REPO_ROOT": str(root)} |
| 34 | |
| 35 | |
def _init_repo(tmp_path: pathlib.Path) -> tuple[pathlib.Path, str]:
    """Lay out an empty repository skeleton under *tmp_path*.

    Creates the muse directory containing ``repo.json``, a ``HEAD`` file
    pointing at ``refs/heads/main``, and the ``refs/heads``, ``snapshots``,
    ``commits`` and ``objects`` subdirectories.

    Returns:
        ``(tmp_path, repo_id)`` where ``repo_id`` is the generated id that
        was written into ``repo.json``.
    """
    meta_dir = muse_dir(tmp_path)
    meta_dir.mkdir()

    repo_id = fake_id("repo")
    repo_meta = {
        "repo_id": repo_id,
        "domain": "code",
        "default_branch": "main",
        "created_at": "2025-01-01T00:00:00+00:00",
    }
    (meta_dir / "repo.json").write_text(json.dumps(repo_meta), encoding="utf-8")
    (meta_dir / "HEAD").write_text("ref: refs/heads/main", encoding="utf-8")

    (meta_dir / "refs" / "heads").mkdir(parents=True)
    for subdir in ("snapshots", "commits", "objects"):
        (meta_dir / subdir).mkdir()

    return tmp_path, repo_id
| 52 | |
| 53 | |
def _write_obj(root: pathlib.Path, content: bytes) -> str:
    """Store *content* in the object store under *root* and return its blob id."""
    object_id = blob_id(content)
    write_object(root, object_id, content)
    return object_id
| 58 | |
| 59 | |
def _make_commit(
    root: pathlib.Path,
    repo_id: str,
    branch: str = "main",
    message: str = "test",
    manifest: dict | None = None,
    parent_id: str | None = None,
) -> str:
    """Write a snapshot and a commit for *manifest*, then point *branch* at it.

    Args:
        root: Repository root directory.
        repo_id: Repository id. Kept for call-site compatibility; this helper
            does not read it.
        branch: Branch whose ref file is updated to the new commit.
        message: Commit message.
        manifest: Mapping stored as the snapshot manifest (callers in this
            file pass ``path -> object id``). ``None`` means an empty manifest.
        parent_id: Explicit parent commit id. When ``None``, the current
            content of the branch ref file is used if that file exists;
            otherwise the commit is written without a parent.

    Returns:
        The id of the newly written commit.
    """
    from muse.core.commits import CommitRecord, write_commit
    from muse.core.snapshots import SnapshotRecord, write_snapshot
    from muse.core.ids import hash_snapshot, hash_commit

    ref_file = ref_path(root, branch)
    if parent_id is None and ref_file.exists():
        # Read with the same explicit encoding the ref is written with below,
        # so the helper does not depend on the platform's default encoding.
        parent_id = ref_file.read_text(encoding="utf-8").strip()

    m = manifest or {}
    snap_id = hash_snapshot(m)
    committed_at = datetime.datetime.now(datetime.timezone.utc)
    commit_id = hash_commit(
        # An empty/absent parent id yields a parentless commit hash input.
        parent_ids=[parent_id] if parent_id else [],
        snapshot_id=snap_id,
        message=message,
        committed_at_iso=committed_at.isoformat(),
    )

    write_snapshot(root, SnapshotRecord(snapshot_id=snap_id, manifest=m))
    write_commit(root, CommitRecord(
        commit_id=commit_id,
        branch=branch,
        snapshot_id=snap_id,
        message=message,
        committed_at=committed_at,
        parent_commit_id=parent_id,
    ))

    # Advance the branch ref, creating nested ref directories when needed.
    ref_file.parent.mkdir(parents=True, exist_ok=True)
    ref_file.write_text(commit_id, encoding="utf-8")
    return commit_id
| 96 | |
| 97 | |
def _checkout(root: pathlib.Path, branch: str, manifest: dict) -> None:
    """Set HEAD to *branch* and write the manifest's file contents to disk.

    Args:
        root: Repository root; manifest paths are resolved relative to it.
        branch: Branch name written into ``HEAD`` as ``ref: refs/heads/<branch>``.
        manifest: Mapping of relative path to object id. Each object is read
            from the object store and written to ``root / path``.

    Entries whose object cannot be read (``read_object`` returns ``None``)
    are skipped. Files already on disk that are absent from *manifest* are
    left untouched.
    """
    # Imported once up front instead of re-executing the import statement on
    # every loop iteration.
    from muse.core.object_store import read_object

    (muse_dir(root) / "HEAD").write_text(f"ref: refs/heads/{branch}", encoding="utf-8")
    for path, oid in manifest.items():
        content = read_object(root, oid)
        if content is None:
            continue
        dest = root / path
        dest.parent.mkdir(parents=True, exist_ok=True)
        dest.write_bytes(content)
| 108 | |
| 109 | |
def _merged_snapshot(root: pathlib.Path, branch: str = "main") -> dict:
    """Return the snapshot manifest of the commit that *branch* resolves to.

    Asserts at each step that the ref, the commit record and the snapshot
    record could all be loaded.
    """
    from muse.core.commits import read_commit
    from muse.core.snapshots import read_snapshot
    from muse.core.refs import resolve_any_ref

    tip_id = resolve_any_ref(root, branch)
    assert tip_id is not None

    commit = read_commit(root, tip_id)
    assert commit is not None

    snapshot = read_snapshot(root, commit.snapshot_id)
    assert snapshot is not None

    return snapshot.manifest
| 122 | |
| 123 | |
def _untouched_repo(tmp_path: pathlib.Path) -> tuple[pathlib.Path, str, str, str]:
    """Build a repo in which only ``feat`` edits ``file_x.py``.

    ``shared.py`` carries the same object in the base commit and on ``feat``;
    ``main`` stays at the base commit and is the checked-out branch.

    Returns:
        ``(root, repo_id, x_feat, shared_id)`` — the last two are the object
        ids of feat's ``file_x.py`` and of the untouched ``shared.py``.
    """
    root, repo_id = _init_repo(tmp_path)

    shared_id = _write_obj(root, b"shared unchanged on both sides")
    x_base = _write_obj(root, b"file_x base version")
    x_feat = _write_obj(root, b"file_x modified by feat")

    base_manifest = {"file_x.py": x_base, "shared.py": shared_id}
    feat_manifest = {"file_x.py": x_feat, "shared.py": shared_id}

    base_id = _make_commit(root, repo_id, "main", "base", base_manifest)

    # Fork feat from the base commit, then move it one commit ahead.
    (heads_dir(root) / "feat").write_text(base_id, encoding="utf-8")
    _make_commit(root, repo_id, "feat", "feat changes x", feat_manifest,
                 parent_id=base_id)

    _checkout(root, "main", base_manifest)
    return root, repo_id, x_feat, shared_id
| 137 | |
| 138 | |
def _convergent_repo(tmp_path: pathlib.Path) -> tuple[pathlib.Path, str, str]:
    """Build a repo where ``main`` and ``feat`` each commit the same new ``config.py``.

    Both branches fork from a base commit holding v1 and each adds one commit
    holding the identical v2 object. ``main`` is the checked-out branch.

    Returns:
        ``(root, repo_id, cfg_v2)`` where ``cfg_v2`` is the shared object id.
    """
    root, repo_id = _init_repo(tmp_path)

    cfg_v1 = _write_obj(root, b"config = 1")
    cfg_v2 = _write_obj(root, b"config = 2 # same on both sides")
    v2_manifest = {"config.py": cfg_v2}

    base_id = _make_commit(root, repo_id, "main", "base", {"config.py": cfg_v1})

    (heads_dir(root) / "feat").write_text(base_id, encoding="utf-8")
    _make_commit(root, repo_id, "feat", "feat to v2", v2_manifest,
                 parent_id=base_id)
    # main independently lands on the very same content.
    _make_commit(root, repo_id, "main", "main to v2", v2_manifest,
                 parent_id=base_id)

    _checkout(root, "main", v2_manifest)
    return root, repo_id, cfg_v2
| 153 | |
| 154 | |
def _divergent_repo(tmp_path: pathlib.Path) -> tuple[pathlib.Path, str, str, str]:
    """Build a repo where ``main`` and ``feat`` change ``config.py`` differently.

    Every version is valid Python assigning the same symbol (``config``) a
    different value, so that the CodePlugin sees a symbol-level conflict
    instead of auto-merging through its independence path.

    Returns:
        ``(root, repo_id, cfg_ours, cfg_theirs)`` — object ids of main's and
        feat's ``config.py`` respectively.
    """
    root, repo_id = _init_repo(tmp_path)

    # One assignment per version keeps the contested symbol unambiguous.
    cfg_base = _write_obj(root, b"config = 1\n")
    cfg_ours = _write_obj(root, b"config = 2\n")
    cfg_theirs = _write_obj(root, b"config = 3\n")

    base_id = _make_commit(root, repo_id, "main", "base", {"config.py": cfg_base})

    (heads_dir(root) / "feat").write_text(base_id, encoding="utf-8")
    _make_commit(root, repo_id, "feat", "feat changes config",
                 {"config.py": cfg_theirs}, parent_id=base_id)
    _make_commit(root, repo_id, "main", "main changes config",
                 {"config.py": cfg_ours}, parent_id=base_id)

    _checkout(root, "main", {"config.py": cfg_ours})
    return root, repo_id, cfg_ours, cfg_theirs
| 175 | |
| 176 | |
def _addadd_repo(tmp_path: pathlib.Path) -> tuple[pathlib.Path, str, str, str]:
    """Build a repo where both branches add ``new.py`` with different content.

    Each side defines a function of the same name with a different body, so
    that the CodePlugin can report a symbol-level conflict rather than
    auto-merging through independence.

    Returns:
        ``(root, repo_id, new_ours, new_theirs)`` — object ids of main's and
        feat's ``new.py`` respectively.
    """
    root, repo_id = _init_repo(tmp_path)

    base_file = _write_obj(root, b"x = 1\n")
    # Same path, same function name, different bodies on each side.
    new_ours = _write_obj(root, b"def helper():\n return 'ours'\n")
    new_theirs = _write_obj(root, b"def helper():\n return 'theirs'\n")

    ours_manifest = {"existing.py": base_file, "new.py": new_ours}
    theirs_manifest = {"existing.py": base_file, "new.py": new_theirs}

    base_id = _make_commit(root, repo_id, "main", "base", {"existing.py": base_file})

    (heads_dir(root) / "feat").write_text(base_id, encoding="utf-8")
    _make_commit(root, repo_id, "feat", "feat adds new.py", theirs_manifest,
                 parent_id=base_id)
    _make_commit(root, repo_id, "main", "main adds new.py", ours_manifest,
                 parent_id=base_id)

    _checkout(root, "main", ours_manifest)
    return root, repo_id, new_ours, new_theirs
| 196 | |
| 197 | |
def _deletemodify_repo(tmp_path: pathlib.Path) -> tuple[pathlib.Path, str, str]:
    """Build a repo where ``main`` deletes ``target.py`` while ``feat`` modifies it.

    Returns:
        ``(root, repo_id, target_modified)`` where ``target_modified`` is the
        object id of feat's version of ``target.py``.
    """
    root, repo_id = _init_repo(tmp_path)

    # Valid Python so the CodePlugin does not accidentally auto-merge.
    target_base = _write_obj(root, b"value = 0\n")
    target_modified = _write_obj(root, b"value = 42\n")

    base_id = _make_commit(root, repo_id, "main", "base", {"target.py": target_base})

    (heads_dir(root) / "feat").write_text(base_id, encoding="utf-8")
    _make_commit(root, repo_id, "feat", "feat modifies target",
                 {"target.py": target_modified}, parent_id=base_id)
    # An empty manifest on main is what expresses the deletion.
    _make_commit(root, repo_id, "main", "main deletes target",
                 {}, parent_id=base_id)

    _checkout(root, "main", {})
    return root, repo_id, target_modified
| 213 | |
| 214 | |
def _clean_twobranch_repo(tmp_path: pathlib.Path) -> tuple[pathlib.Path, str]:
    """Build a conflict-free fork: ``main`` edits ``file_a.py``, ``feat`` edits ``file_b.py``.

    Returns:
        ``(root, repo_id)`` with ``main`` checked out.
    """
    root, repo_id = _init_repo(tmp_path)

    a_id = _write_obj(root, b"file_a base")
    b_id = _write_obj(root, b"file_b base")
    a_v2 = _write_obj(root, b"file_a v2 main only")
    b_v2 = _write_obj(root, b"file_b v2 feat only")

    main_manifest = {"file_a.py": a_v2, "file_b.py": b_id}
    feat_manifest = {"file_a.py": a_id, "file_b.py": b_v2}

    base_id = _make_commit(root, repo_id, "main", "base",
                           {"file_a.py": a_id, "file_b.py": b_id})

    (heads_dir(root) / "feat").write_text(base_id, encoding="utf-8")
    _make_commit(root, repo_id, "feat", "feat changes b", feat_manifest,
                 parent_id=base_id)
    _make_commit(root, repo_id, "main", "main changes a", main_manifest,
                 parent_id=base_id)

    _checkout(root, "main", main_manifest)
    return root, repo_id
| 231 | |
| 232 | |
| 233 | # --------------------------------------------------------------------------- |
| 234 | # Group 1 — Convergent cases under every strategy (SM_01–SM_08) |
| 235 | # All must produce conflicts == [] |
| 236 | # --------------------------------------------------------------------------- |
| 237 | |
class TestConvergentUntouched:
    """SM_01–04: untouched file under each strategy must not produce a conflict."""

    @staticmethod
    def _merge_feat(root: pathlib.Path, strategy: str) -> dict:
        """Run ``merge feat --strategy <strategy> --json`` against *root*.

        Returns the JSON document parsed from the last line of CLI output.
        Not collected by pytest because the name lacks the ``test`` prefix.
        """
        result = runner.invoke(
            cli,
            ["merge", "feat", "--strategy", strategy, "--json"],
            env=_env(root),
            catch_exceptions=False,
        )
        return json.loads(result.output.strip().splitlines()[-1])

    def test_SM_01_recursive_untouched_no_conflict(self, tmp_path: pathlib.Path) -> None:
        """SM_01 — recursive + untouched file → no conflict."""
        # Only the repo root is needed; the remaining fixture values are unused.
        root, *_ = _untouched_repo(tmp_path)
        data = self._merge_feat(root, "recursive")
        assert "shared.py" not in data.get("conflicts", []), "SM_01: shared.py must not conflict"
        assert data.get("exit_code") == 0

    def test_SM_02_overlay_untouched_no_conflict(self, tmp_path: pathlib.Path) -> None:
        """SM_02 — overlay + untouched file → no conflict."""
        root, *_ = _untouched_repo(tmp_path)
        data = self._merge_feat(root, "overlay")
        assert "shared.py" not in data.get("conflicts", []), "SM_02: shared.py must not conflict"
        assert data.get("exit_code") == 0

    def test_SM_03_snapshot_untouched_no_conflict(self, tmp_path: pathlib.Path) -> None:
        """SM_03 — snapshot + untouched file → no conflict."""
        root, *_ = _untouched_repo(tmp_path)
        data = self._merge_feat(root, "snapshot")
        assert "shared.py" not in data.get("conflicts", []), "SM_03: shared.py must not conflict"
        assert data.get("exit_code") == 0

    def test_SM_04_replay_untouched_no_conflict(self, tmp_path: pathlib.Path) -> None:
        """SM_04 — replay + untouched file → no conflict."""
        root, *_ = _untouched_repo(tmp_path)
        data = self._merge_feat(root, "replay")
        assert "shared.py" not in data.get("conflicts", []), "SM_04: shared.py must not conflict"
        assert data.get("exit_code") == 0
| 276 | |
| 277 | |
class TestConvergentEdit:
    """SM_05–08: convergent edit under each strategy → no conflict, convergent content in snapshot."""

    @staticmethod
    def _merge_feat(root: pathlib.Path, strategy: str) -> dict:
        """Run ``merge feat --strategy <strategy> --json`` against *root*.

        Returns the JSON document parsed from the last line of CLI output.
        """
        argv = ["merge", "feat", "--strategy", strategy, "--json"]
        outcome = runner.invoke(cli, argv, env=_env(root), catch_exceptions=False)
        last_line = outcome.output.strip().splitlines()[-1]
        return json.loads(last_line)

    def test_SM_05_recursive_convergent_no_conflict(self, tmp_path: pathlib.Path) -> None:
        """SM_05 — recursive + convergent edit → no conflict, convergent content in merged snapshot."""
        root, _, cfg_v2 = _convergent_repo(tmp_path)
        payload = self._merge_feat(root, "recursive")
        assert payload.get("conflicts", []) == [], "SM_05: convergent edit must not conflict"
        assert payload.get("exit_code") == 0
        merged = _merged_snapshot(root, "main")
        assert merged.get("config.py") == cfg_v2, "SM_05: merged snapshot must have convergent content"

    def test_SM_06_overlay_convergent_no_conflict(self, tmp_path: pathlib.Path) -> None:
        """SM_06 — overlay + convergent edit → no conflict, convergent content in merged snapshot."""
        root, _, cfg_v2 = _convergent_repo(tmp_path)
        payload = self._merge_feat(root, "overlay")
        assert payload.get("conflicts", []) == [], "SM_06: convergent edit must not conflict"
        assert payload.get("exit_code") == 0
        merged = _merged_snapshot(root, "main")
        assert merged.get("config.py") == cfg_v2, "SM_06: merged snapshot must have convergent content"

    def test_SM_07_snapshot_convergent_no_conflict(self, tmp_path: pathlib.Path) -> None:
        """SM_07 — snapshot + convergent edit → no conflict, convergent content in merged snapshot."""
        root, _, cfg_v2 = _convergent_repo(tmp_path)
        payload = self._merge_feat(root, "snapshot")
        assert payload.get("conflicts", []) == [], "SM_07: convergent edit must not conflict"
        assert payload.get("exit_code") == 0
        merged = _merged_snapshot(root, "main")
        assert merged.get("config.py") == cfg_v2, "SM_07: merged snapshot must have convergent content"

    def test_SM_08_replay_convergent_no_conflict(self, tmp_path: pathlib.Path) -> None:
        """SM_08 — replay + convergent edit → no conflict, convergent content in merged snapshot."""
        root, _, cfg_v2 = _convergent_repo(tmp_path)
        payload = self._merge_feat(root, "replay")
        assert payload.get("conflicts", []) == [], "SM_08: convergent edit must not conflict"
        assert payload.get("exit_code") == 0
        merged = _merged_snapshot(root, "main")
        assert merged.get("config.py") == cfg_v2, "SM_08: merged snapshot must have convergent content"
| 324 | |
| 325 | |
| 326 | # --------------------------------------------------------------------------- |
| 327 | # Group 2 — Divergent cases: conflict surfacing vs auto-resolution (SM_09–SM_18) |
| 328 | # --------------------------------------------------------------------------- |
| 329 | |
class TestFileDivergence:
    """SM_09–12: file divergence under each strategy."""

    @staticmethod
    def _merge_feat(root: pathlib.Path, strategy: str) -> dict:
        """Run ``merge feat --strategy <strategy> --json`` against *root*.

        Returns the JSON document parsed from the last line of CLI output.
        Not collected by pytest because the name lacks the ``test`` prefix.
        """
        result = runner.invoke(
            cli,
            ["merge", "feat", "--strategy", strategy, "--json"],
            env=_env(root),
            catch_exceptions=False,
        )
        return json.loads(result.output.strip().splitlines()[-1])

    def test_SM_09_recursive_file_divergence_conflicts(self, tmp_path: pathlib.Path) -> None:
        """SM_09 — recursive + file divergence → conflicts non-empty."""
        root, *_ = _divergent_repo(tmp_path)
        data = self._merge_feat(root, "recursive")
        assert len(data.get("conflicts", [])) > 0, "SM_09: recursive must surface file divergence conflict"

    def test_SM_10_overlay_file_divergence_no_conflict_theirs_wins(self, tmp_path: pathlib.Path) -> None:
        """SM_10 — overlay + file divergence → conflicts == [], theirs wins in merged snapshot."""
        # main's object id is not needed here: the check is that feat's version won.
        root, _, _, cfg_theirs = _divergent_repo(tmp_path)
        data = self._merge_feat(root, "overlay")
        assert data.get("conflicts", []) == [], "SM_10: overlay must auto-resolve file divergence"
        assert data.get("exit_code") == 0
        snap = _merged_snapshot(root, "main")
        assert snap.get("config.py") == cfg_theirs, "SM_10: overlay must keep theirs (feat) version"

    def test_SM_11_snapshot_file_divergence_conflicts(self, tmp_path: pathlib.Path) -> None:
        """SM_11 — snapshot + file divergence → conflicts non-empty."""
        root, *_ = _divergent_repo(tmp_path)
        data = self._merge_feat(root, "snapshot")
        assert len(data.get("conflicts", [])) > 0, "SM_11: snapshot must surface file divergence conflict"

    def test_SM_12_replay_file_divergence_conflicts(self, tmp_path: pathlib.Path) -> None:
        """SM_12 — replay + file divergence → conflicts non-empty."""
        root, *_ = _divergent_repo(tmp_path)
        data = self._merge_feat(root, "replay")
        assert len(data.get("conflicts", [])) > 0, "SM_12: replay must surface file divergence conflict"
| 367 | |
| 368 | |
class TestAddAddCollision:
    """SM_13–15: add/add collision under recursive, overlay, snapshot."""

    @staticmethod
    def _merge_feat(root: pathlib.Path, strategy: str) -> dict:
        """Run ``merge feat --strategy <strategy> --json`` against *root*.

        Returns the JSON document parsed from the last line of CLI output.
        Not collected by pytest because the name lacks the ``test`` prefix.
        """
        result = runner.invoke(
            cli,
            ["merge", "feat", "--strategy", strategy, "--json"],
            env=_env(root),
            catch_exceptions=False,
        )
        return json.loads(result.output.strip().splitlines()[-1])

    def test_SM_13_recursive_addadd_conflicts(self, tmp_path: pathlib.Path) -> None:
        """SM_13 — recursive + add/add collision → conflicts non-empty."""
        root, *_ = _addadd_repo(tmp_path)
        data = self._merge_feat(root, "recursive")
        assert len(data.get("conflicts", [])) > 0, "SM_13: recursive must surface add/add collision"

    def test_SM_14_overlay_addadd_no_conflict_theirs_wins(self, tmp_path: pathlib.Path) -> None:
        """SM_14 — overlay + add/add collision → conflicts == [], theirs content wins."""
        # main's object id is not needed here: the check is that feat's version won.
        root, _, _, new_theirs = _addadd_repo(tmp_path)
        data = self._merge_feat(root, "overlay")
        assert data.get("conflicts", []) == [], "SM_14: overlay must auto-resolve add/add collision"
        assert data.get("exit_code") == 0
        snap = _merged_snapshot(root, "main")
        assert snap.get("new.py") == new_theirs, "SM_14: overlay must keep theirs (feat) version of new.py"

    def test_SM_15_snapshot_addadd_conflicts(self, tmp_path: pathlib.Path) -> None:
        """SM_15 — snapshot + add/add collision → conflicts non-empty."""
        root, *_ = _addadd_repo(tmp_path)
        data = self._merge_feat(root, "snapshot")
        assert len(data.get("conflicts", [])) > 0, "SM_15: snapshot must surface add/add collision"
| 398 | |
| 399 | |
class TestDeleteModify:
    """SM_16–18: delete/modify under recursive, overlay, snapshot."""

    @staticmethod
    def _merge_feat(root: pathlib.Path, strategy: str) -> dict:
        """Run ``merge feat --strategy <strategy> --json`` against *root*.

        Returns the JSON document parsed from the last line of CLI output.
        """
        argv = ["merge", "feat", "--strategy", strategy, "--json"]
        outcome = runner.invoke(cli, argv, env=_env(root), catch_exceptions=False)
        last_line = outcome.output.strip().splitlines()[-1]
        return json.loads(last_line)

    def test_SM_16_recursive_deletemodify_conflicts(self, tmp_path: pathlib.Path) -> None:
        """SM_16 — recursive + delete/modify → conflicts non-empty."""
        root, _, _ = _deletemodify_repo(tmp_path)
        payload = self._merge_feat(root, "recursive")
        assert len(payload.get("conflicts", [])) > 0, "SM_16: recursive must surface delete/modify conflict"

    def test_SM_17_overlay_deletemodify_no_conflict_modified_survives(self, tmp_path: pathlib.Path) -> None:
        """SM_17 — overlay + delete/modify → conflicts == [], modified version survives."""
        root, _, target_modified = _deletemodify_repo(tmp_path)
        payload = self._merge_feat(root, "overlay")
        assert payload.get("conflicts", []) == [], "SM_17: overlay must auto-resolve delete/modify"
        assert payload.get("exit_code") == 0
        merged = _merged_snapshot(root, "main")
        assert "target.py" in merged, "SM_17: target.py must survive (theirs/feat modified it)"
        assert merged["target.py"] == target_modified, "SM_17: theirs modified version must win"

    def test_SM_18_snapshot_deletemodify_conflicts(self, tmp_path: pathlib.Path) -> None:
        """SM_18 — snapshot + delete/modify → conflicts non-empty."""
        root, _, _ = _deletemodify_repo(tmp_path)
        payload = self._merge_feat(root, "snapshot")
        assert len(payload.get("conflicts", [])) > 0, "SM_18: snapshot must surface delete/modify conflict"
| 430 | |
| 431 | |
| 432 | # --------------------------------------------------------------------------- |
| 433 | # Group 3 — History mode produces correct commit graph shape (SM_19–SM_21) |
| 434 | # --------------------------------------------------------------------------- |
| 435 | |
class TestHistoryMode:
    """SM_19–21: --history flag controls commit graph structure."""

    @staticmethod
    def _merge_feat(root: pathlib.Path, history: str) -> dict:
        """Run ``merge feat --history <history> --json`` against *root*.

        Returns the JSON document parsed from the last line of CLI output.
        """
        argv = ["merge", "feat", "--history", history, "--json"]
        outcome = runner.invoke(cli, argv, env=_env(root), catch_exceptions=False)
        last_line = outcome.output.strip().splitlines()[-1]
        return json.loads(last_line)

    @staticmethod
    def _load_commit(root: pathlib.Path, commit_id: str):
        """Read the commit record for *commit_id*, asserting that it exists."""
        from muse.core.commits import read_commit

        record = read_commit(root, commit_id)
        assert record is not None
        return record

    def test_SM_19_history_merge_two_parent_commit(self, tmp_path: pathlib.Path) -> None:
        """SM_19 — --history merge → commit has two parents (parent2_commit_id set)."""
        root, _ = _clean_twobranch_repo(tmp_path)
        payload = self._merge_feat(root, "merge")
        assert payload.get("exit_code") == 0, "SM_19: clean merge must succeed"
        merge_commit_id = payload.get("commit_id")
        assert merge_commit_id is not None

        record = self._load_commit(root, merge_commit_id)
        assert record.parent2_commit_id is not None, "SM_19: --history merge must produce two-parent commit"

    def test_SM_20_history_squash_single_parent_commit(self, tmp_path: pathlib.Path) -> None:
        """SM_20 — --history squash → commit has one parent (parent2_commit_id is None)."""
        root, _ = _clean_twobranch_repo(tmp_path)
        payload = self._merge_feat(root, "squash")
        assert payload.get("exit_code") == 0, "SM_20: clean squash merge must succeed"
        merge_commit_id = payload.get("commit_id")
        assert merge_commit_id is not None

        record = self._load_commit(root, merge_commit_id)
        assert record.parent2_commit_id is None, "SM_20: --history squash must produce single-parent commit"

    def test_SM_21_history_rebase_single_parent_commit(self, tmp_path: pathlib.Path) -> None:
        """SM_21 — --history rebase → commits are linear; no merge commit created.

        Full commit-by-commit replay is Phase 4 scope. Until then rebase is
        expected to yield a single-parent commit, i.e. the same graph shape
        as squash.
        """
        root, _ = _clean_twobranch_repo(tmp_path)
        payload = self._merge_feat(root, "rebase")
        assert payload.get("exit_code") == 0, "SM_21: clean rebase merge must succeed"
        merge_commit_id = payload.get("commit_id")
        assert merge_commit_id is not None

        record = self._load_commit(root, merge_commit_id)
        assert record.parent2_commit_id is None, "SM_21: --history rebase must produce linear (single-parent) commit"
| 487 | |
| 488 | |
| 489 | # --------------------------------------------------------------------------- |
| 490 | # Group 4 — PHANTOM regressions under all 4 strategies (SM_22–SM_24) |
| 491 | # --------------------------------------------------------------------------- |
| 492 | |
class TestPhantomRegressions:
    """SM_22–24: PHANTOM_01/02/05 scenarios must pass under every strategy."""

    # Strategies every regression guard below is run against. Declared once so
    # the three tests cannot drift apart.
    _STRATEGIES = ("recursive", "overlay", "snapshot", "replay")

    @staticmethod
    def _merge_feat(root: pathlib.Path, strategy: str) -> dict:
        """Run ``merge feat --strategy <strategy> --json`` against *root*.

        Returns the JSON document parsed from the last line of CLI output.
        Not collected by pytest because the name lacks the ``test`` prefix.
        """
        result = runner.invoke(
            cli,
            ["merge", "feat", "--strategy", strategy, "--json"],
            env=_env(root),
            catch_exceptions=False,
        )
        return json.loads(result.output.strip().splitlines()[-1])

    def test_SM_22_phantom01_untouched_all_strategies(self, tmp_path: pathlib.Path) -> None:
        """SM_22 — PHANTOM_01 (untouched file) passes under recursive, overlay, snapshot, replay."""
        for strategy in self._STRATEGIES:
            # Each strategy gets its own fresh repository directory.
            sub = tmp_path / strategy
            sub.mkdir()
            root, *_ = _untouched_repo(sub)
            data = self._merge_feat(root, strategy)
            assert "shared.py" not in data.get("conflicts", []), (
                f"SM_22: untouched shared.py must not conflict under --strategy {strategy}"
            )
            assert data.get("exit_code") == 0, (
                f"SM_22: merge must succeed under --strategy {strategy}"
            )

    def test_SM_23_phantom02_convergent_edit_all_strategies(self, tmp_path: pathlib.Path) -> None:
        """SM_23 — PHANTOM_02 (convergent edit) passes under all 4 strategies; merged snapshot has convergent content."""
        for strategy in self._STRATEGIES:
            sub = tmp_path / strategy
            sub.mkdir()
            root, _, cfg_v2 = _convergent_repo(sub)
            data = self._merge_feat(root, strategy)
            assert data.get("conflicts", []) == [], (
                f"SM_23: convergent edit must not conflict under --strategy {strategy}"
            )
            assert data.get("exit_code") == 0, (
                f"SM_23: merge must succeed under --strategy {strategy}"
            )
            snap = _merged_snapshot(root, "main")
            assert snap.get("config.py") == cfg_v2, (
                f"SM_23: convergent content must be in merged snapshot under --strategy {strategy}"
            )

    def test_SM_24_phantom05_both_changes_in_merged_snapshot_all_strategies(self, tmp_path: pathlib.Path) -> None:
        """SM_24 — PHANTOM_05 (clean merge has changes from both branches) under all 4 strategies."""
        for strategy in self._STRATEGIES:
            sub = tmp_path / strategy
            sub.mkdir()

            # Built inline because the assertions below need the v2 object ids.
            root, repo_id = _init_repo(sub)
            a_id = _write_obj(root, b"file_a base")
            b_id = _write_obj(root, b"file_b base")
            base_id = _make_commit(root, repo_id, "main", "base",
                                   {"file_a.py": a_id, "file_b.py": b_id})
            a_v2 = _write_obj(root, b"file_a v2 main only changes this")
            b_v2 = _write_obj(root, b"file_b v2 feat only changes this")
            (heads_dir(root) / "feat").write_text(base_id, encoding="utf-8")
            _make_commit(root, repo_id, "feat", "feat changes b",
                         {"file_a.py": a_id, "file_b.py": b_v2}, parent_id=base_id)
            _make_commit(root, repo_id, "main", "main changes a",
                         {"file_a.py": a_v2, "file_b.py": b_id}, parent_id=base_id)
            _checkout(root, "main", {"file_a.py": a_v2, "file_b.py": b_id})

            data = self._merge_feat(root, strategy)
            assert data.get("conflicts", []) == [], (
                f"SM_24: clean merge must not conflict under --strategy {strategy}"
            )
            assert data.get("exit_code") == 0, (
                f"SM_24: clean merge must succeed under --strategy {strategy}"
            )
            snap = _merged_snapshot(root, "main")
            assert snap.get("file_a.py") == a_v2, (
                f"SM_24: main's file_a change must survive under --strategy {strategy}"
            )
            assert snap.get("file_b.py") == b_v2, (
                f"SM_24: feat's file_b change must survive under --strategy {strategy}"
            )
File History
1 commit
sha256:f02589f8e157757da430d82f35a64c0b7eee5033f6d13076ea395f9942151790
test(phase3): full strategy matrix — 24 SM tests, rebase→linear
Sonnet 4.6
1 day ago