"""Comprehensive tests for ``muse merge``. Covers: - E2E: merge fast-forward, merge with conflicts, --format json - Integration: HEAD updated after merge, conflict state written - Stress: merge with many files """ from __future__ import annotations type _FileStore = dict[str, bytes] import datetime import json import pathlib import uuid import pytest from tests.cli_test_helper import CliRunner cli = None # argparse migration — CliRunner ignores this arg runner = CliRunner() # --------------------------------------------------------------------------- # Shared helpers # --------------------------------------------------------------------------- def _env(root: pathlib.Path) -> Manifest: return {"MUSE_REPO_ROOT": str(root)} def _init_repo(tmp_path: pathlib.Path) -> tuple[pathlib.Path, str]: muse_dir = tmp_path / ".muse" muse_dir.mkdir() repo_id = str(uuid.uuid4()) (muse_dir / "repo.json").write_text(json.dumps({ "repo_id": repo_id, "domain": "code", "default_branch": "main", "created_at": "2025-01-01T00:00:00+00:00", }), encoding="utf-8") (muse_dir / "HEAD").write_text("ref: refs/heads/main", encoding="utf-8") (muse_dir / "refs" / "heads").mkdir(parents=True) (muse_dir / "snapshots").mkdir() (muse_dir / "commits").mkdir() (muse_dir / "objects").mkdir() return tmp_path, repo_id def _make_commit(root: pathlib.Path, repo_id: str, branch: str = "main", message: str = "test", manifest: Manifest | None = None) -> str: from muse.core.store import CommitRecord, SnapshotRecord, write_commit, write_snapshot from muse.core.snapshot import compute_snapshot_id, compute_commit_id ref_file = root / ".muse" / "refs" / "heads" / branch parent_id = ref_file.read_text().strip() if ref_file.exists() else None m = manifest or {} snap_id = compute_snapshot_id(m) committed_at = datetime.datetime.now(datetime.timezone.utc) commit_id = compute_commit_id( parent_ids=[parent_id] if parent_id else [], snapshot_id=snap_id, message=message, committed_at_iso=committed_at.isoformat(), ) write_snapshot(root, SnapshotRecord(snapshot_id=snap_id, manifest=m)) write_commit(root, CommitRecord( commit_id=commit_id, repo_id=repo_id, branch=branch, snapshot_id=snap_id, message=message, committed_at=committed_at, parent_commit_id=parent_id, )) ref_file.parent.mkdir(parents=True, exist_ok=True) ref_file.write_text(commit_id, encoding="utf-8") return commit_id def _write_object(root: pathlib.Path, content: bytes) -> str: import hashlib obj_id = hashlib.sha256(content).hexdigest() obj_path = root / ".muse" / "objects" / obj_id[:2] / obj_id[2:] obj_path.parent.mkdir(parents=True, exist_ok=True) obj_path.write_bytes(content) return obj_id # --------------------------------------------------------------------------- # Tests # --------------------------------------------------------------------------- class TestMergeCLI: def test_merge_branch_into_main(self, tmp_path: pathlib.Path) -> None: root, repo_id = _init_repo(tmp_path) base_id = _make_commit(root, repo_id, branch="main", message="base") (root / ".muse" / "refs" / "heads" / "feature").write_text(base_id) obj = _write_object(root, b"feature content") _make_commit(root, repo_id, branch="feature", message="feature work", manifest={"new_track.mid": obj}) result = runner.invoke(cli, ["merge", "feature"], env=_env(root), catch_exceptions=False) assert result.exit_code == 0 def test_merge_nonexistent_branch_fails(self, tmp_path: pathlib.Path) -> None: root, repo_id = _init_repo(tmp_path) _make_commit(root, repo_id) result = runner.invoke(cli, ["merge", "does-not-exist"], env=_env(root)) assert result.exit_code != 0 def test_merge_format_json(self, tmp_path: pathlib.Path) -> None: root, repo_id = _init_repo(tmp_path) base_id = _make_commit(root, repo_id, branch="main", message="base") (root / ".muse" / "refs" / "heads" / "feature").write_text(base_id) _make_commit(root, repo_id, branch="feature", message="feat") result = runner.invoke( cli, ["merge", "--format", "json", "feature"], env=_env(root), catch_exceptions=False ) assert result.exit_code == 0 data = json.loads(result.output) assert isinstance(data, dict) def test_merge_message_flag(self, tmp_path: pathlib.Path) -> None: root, repo_id = _init_repo(tmp_path) base_id = _make_commit(root, repo_id, branch="main", message="base") (root / ".muse" / "refs" / "heads" / "feature").write_text(base_id) _make_commit(root, repo_id, branch="feature", message="feat") result = runner.invoke( cli, ["merge", "--message", "Merge feature", "feature"], env=_env(root), catch_exceptions=False ) assert result.exit_code == 0 def test_merge_invalid_branch_name_rejected(self, tmp_path: pathlib.Path) -> None: root, repo_id = _init_repo(tmp_path) _make_commit(root, repo_id) result = runner.invoke(cli, ["merge", "../evil"], env=_env(root)) assert result.exit_code != 0 def test_merge_output_sanitized(self, tmp_path: pathlib.Path) -> None: root, repo_id = _init_repo(tmp_path) base_id = _make_commit(root, repo_id, branch="main", message="base") (root / ".muse" / "refs" / "heads" / "feature").write_text(base_id) _make_commit(root, repo_id, branch="feature", message="feat") result = runner.invoke(cli, ["merge", "feature"], env=_env(root), catch_exceptions=False) assert "\x1b" not in result.output class TestMergeConflictWorkdir: """Regression: non-conflicting additions from theirs must reach the working tree even when a conflicted merge exits early. Bug: muse merge called ``raise SystemExit`` before ``_restore_from_manifest`` when conflicts existed. Theirs-only file additions were computed but never written to disk; ``muse checkout --theirs --all`` only resolved the conflict_paths, so ``muse code add .`` missed the new files and the merge commit was silently incomplete. """ def _make_commit_with_files( self, root: pathlib.Path, repo_id: str, branch: str, files: _FileStore, parent_id: str | None = None, message: str = "commit", ) -> str: manifest: Manifest = {} for rel, content in files.items(): oid = _write_object(root, content) manifest[rel] = oid dest = root / rel dest.parent.mkdir(parents=True, exist_ok=True) dest.write_bytes(content) return _make_commit(root, repo_id, branch=branch, message=message, manifest=manifest) def test_theirs_only_additions_written_to_workdir_on_conflict( self, tmp_path: pathlib.Path ) -> None: """Theirs-only new files must appear in the working tree after a conflicted merge so that ``muse code add .`` captures them.""" root, repo_id = _init_repo(tmp_path) # Base: one shared file that both sides will modify (guaranteeing conflict). base_id = self._make_commit_with_files( root, repo_id, "main", {"shared.py": b"def foo(): pass\n"}, message="base", ) # Theirs: modifies shared.py AND adds two brand-new files. (root / ".muse" / "refs" / "heads" / "feature").write_text(base_id) self._make_commit_with_files( root, repo_id, "feature", { "shared.py": b"def foo(): return 'theirs'\n", "new_security_test.py": b"# security test\n", "new_perf_test.py": b"# perf test\n", }, message="feature: add tests + modify shared", ) # Ours: also modifies shared.py (guaranteeing a conflict on that file). (root / "shared.py").write_bytes(b"def foo(): return 'ours'\n") _make_commit( root, repo_id, "main", message="ours: modify shared", manifest={"shared.py": _write_object(root, b"def foo(): return 'ours'\n")}, ) result = runner.invoke(cli, ["merge", "feature"], env=_env(root)) # Merge must exit with a conflict status, not a clean merge. assert result.exit_code != 0, "Expected conflict exit code" assert "CONFLICT" in result.output or "conflict" in result.output.lower() # The fix: theirs-only additions MUST now exist in the working tree. assert (root / "new_security_test.py").exists(), ( "new_security_test.py (theirs-only addition) must be written to the " "working tree even though a conflict was detected on shared.py" ) assert (root / "new_perf_test.py").exists(), ( "new_perf_test.py (theirs-only addition) must be written to the " "working tree even though a conflict was detected on shared.py" ) assert (root / "new_security_test.py").read_bytes() == b"# security test\n" assert (root / "new_perf_test.py").read_bytes() == b"# perf test\n" def test_conflicting_file_left_at_ours_version_on_conflict( self, tmp_path: pathlib.Path ) -> None: """Conflicting files must remain at their ours content in the working tree after a partial restore — the agent resolves via --ours/--theirs.""" root, repo_id = _init_repo(tmp_path) base_id = self._make_commit_with_files( root, repo_id, "main", {"shared.py": b"def foo(): pass\n"}, message="base", ) (root / ".muse" / "refs" / "heads" / "feature").write_text(base_id) self._make_commit_with_files( root, repo_id, "feature", { "shared.py": b"def foo(): return 'theirs'\n", "only_on_theirs.py": b"# new\n", }, message="feature", ) ours_content = b"def foo(): return 'ours'\n" (root / "shared.py").write_bytes(ours_content) _make_commit( root, repo_id, "main", message="ours", manifest={"shared.py": _write_object(root, ours_content)}, ) runner.invoke(cli, ["merge", "feature"], env=_env(root)) # Conflicting file must stay at ours content for agent inspection. assert (root / "shared.py").read_bytes() == ours_content # Theirs-only addition must be present. assert (root / "only_on_theirs.py").exists() class TestMergeStress: def test_merge_feature_with_many_files(self, tmp_path: pathlib.Path) -> None: root, repo_id = _init_repo(tmp_path) base_id = _make_commit(root, repo_id, branch="main", message="base") (root / ".muse" / "refs" / "heads" / "feature").write_text(base_id) manifest = {f"track_{i:03d}.mid": _write_object(root, f"data {i}".encode()) for i in range(30)} _make_commit(root, repo_id, branch="feature", message="many files", manifest=manifest) result = runner.invoke(cli, ["merge", "feature"], env=_env(root), catch_exceptions=False) assert result.exit_code == 0