"""Tests for ``muse apply-patch`` — apply a Muse .mpatch file to the working tree. Test tiers ---------- - Unit: output schema, exit_code/duration_ms in JSON - Integration: apply restores files, --dry-run reports without writing, --check - Data integrity: patch_id verified before apply, tampered patch rejected - Security: path traversal in manifest rejected, error to stderr - Edge: empty diff applies cleanly, already-applied patch detectable via snapshot check """ from __future__ import annotations from collections.abc import Mapping import datetime import json import pathlib import pytest from tests.cli_test_helper import CliRunner, InvokeResult from muse.core.ids import hash_commit, hash_snapshot from muse.core.commits import ( CommitRecord, write_commit, ) from muse.core.snapshots import ( SnapshotRecord, write_snapshot, ) from muse.core.object_store import write_object from muse.core.types import long_id, blob_id, fake_id from muse.core.paths import muse_dir, ref_path runner = CliRunner() # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _init_repo(path: pathlib.Path) -> pathlib.Path: dot_muse = muse_dir(path) for sub in ("commits", "snapshots", "objects", "refs/heads"): (dot_muse / sub).mkdir(parents=True, exist_ok=True) (dot_muse / "HEAD").write_text("ref: refs/heads/main\n") (dot_muse / "repo.json").write_text(json.dumps({"repo_id": "test-repo", "domain": "code"})) return path def _write_object(repo: pathlib.Path, content: bytes) -> str: """Write bytes to the object store and return the sha256: prefixed ID.""" oid = blob_id(content) write_object(repo, oid, content) return oid def _commit( repo: pathlib.Path, msg: str, manifest: dict[str, str], branch: str = "main", parent: str | None = None, ts: datetime.datetime | None = None, ) -> str: ts = ts or datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc) sid = hash_snapshot(manifest) write_snapshot(repo, SnapshotRecord(snapshot_id=sid, manifest=manifest, created_at=ts)) parent_ids = [parent] if parent else [] cid = hash_commit( parent_ids=parent_ids, snapshot_id=sid, message=msg, committed_at_iso=ts.isoformat(), author="gabriel", ) write_commit(repo, CommitRecord( commit_id=cid, branch=branch, snapshot_id=sid, message=msg, committed_at=ts, author="gabriel", parent_commit_id=parent, parent2_commit_id=None, )) branch_ref = ref_path(repo, branch) branch_ref.parent.mkdir(parents=True, exist_ok=True) branch_ref.write_text(cid) return cid def _make_patch(repo: pathlib.Path, tmp_path: pathlib.Path) -> pathlib.Path: """Create a .mpatch file by running format-patch --output-dir.""" out_dir = tmp_path / "patches" out_dir.mkdir(exist_ok=True) r = runner.invoke(None, ["format-patch", "--output-dir", str(out_dir)], env={"MUSE_REPO_ROOT": str(repo)}) assert r.exit_code == 0, f"format-patch failed: {r.output}" patches = list(out_dir.glob("*.mpatch")) assert len(patches) == 1 return patches[0] def _ap(repo: pathlib.Path, patch_file: pathlib.Path, *args: str) -> InvokeResult: return runner.invoke(None, ["apply-patch", str(patch_file), *args], env={"MUSE_REPO_ROOT": str(repo)}) def _json(r: InvokeResult) -> Mapping[str, object]: return json.loads(r.output) # --------------------------------------------------------------------------- # JSON output schema # --------------------------------------------------------------------------- class TestJsonSchema: def test_exits_zero_on_success(self, tmp_path: pathlib.Path) -> None: src = _init_repo(tmp_path / "src") oid = _write_object(src, b"x = 1\n") c1 = _commit(src, "c1", {"a.py": oid}) patch = _make_patch(src, tmp_path) # Target repo at c1's snapshot (same as from_snapshot) target = _init_repo(tmp_path / "target") oid_t = _write_object(target, b"x = 1\n") # Use the same snapshot_id as source c1 so applicability passes sid = hash_snapshot({"a.py": oid_t}) write_snapshot(target, SnapshotRecord( snapshot_id=sid, manifest={"a.py": oid_t}, created_at=datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc), )) _commit(target, "c1", {"a.py": oid_t}) r = _ap(target, patch, "--json") assert r.exit_code == 0 def test_json_has_exit_code_zero(self, tmp_path: pathlib.Path) -> None: src = _init_repo(tmp_path / "src") oid = _write_object(src, b"x = 1\n") _commit(src, "c1", {"a.py": oid}) patch = _make_patch(src, tmp_path) target = _init_repo(tmp_path / "target") oid_t = _write_object(target, b"x = 1\n") _commit(target, "c1", {"a.py": oid_t}) data = _json(_ap(target, patch, "--json")) assert data["exit_code"] == 0 def test_exit_code_is_int_not_bool(self, tmp_path: pathlib.Path) -> None: src = _init_repo(tmp_path / "src") oid = _write_object(src, b"x = 1\n") _commit(src, "c1", {"a.py": oid}) patch = _make_patch(src, tmp_path) target = _init_repo(tmp_path / "target") oid_t = _write_object(target, b"x = 1\n") _commit(target, "c1", {"a.py": oid_t}) data = _json(_ap(target, patch, "--json")) assert type(data["exit_code"]) is int def test_json_has_duration_ms(self, tmp_path: pathlib.Path) -> None: src = _init_repo(tmp_path / "src") oid = _write_object(src, b"x = 1\n") _commit(src, "c1", {"a.py": oid}) patch = _make_patch(src, tmp_path) target = _init_repo(tmp_path / "target") oid_t = _write_object(target, b"x = 1\n") _commit(target, "c1", {"a.py": oid_t}) data = _json(_ap(target, patch, "--json")) assert "duration_ms" in data def test_duration_ms_is_float(self, tmp_path: pathlib.Path) -> None: src = _init_repo(tmp_path / "src") oid = _write_object(src, b"x = 1\n") _commit(src, "c1", {"a.py": oid}) patch = _make_patch(src, tmp_path) target = _init_repo(tmp_path / "target") oid_t = _write_object(target, b"x = 1\n") _commit(target, "c1", {"a.py": oid_t}) data = _json(_ap(target, patch, "--json")) assert isinstance(data["duration_ms"], float) assert data["duration_ms"] >= 0.0 def test_json_output_is_compact(self, tmp_path: pathlib.Path) -> None: src = _init_repo(tmp_path / "src") oid = _write_object(src, b"x = 1\n") _commit(src, "c1", {"a.py": oid}) patch = _make_patch(src, tmp_path) target = _init_repo(tmp_path / "target") oid_t = _write_object(target, b"x = 1\n") _commit(target, "c1", {"a.py": oid_t}) r = _ap(target, patch, "--json") assert len(r.output.strip().splitlines()) == 1 def test_json_has_patch_id(self, tmp_path: pathlib.Path) -> None: src = _init_repo(tmp_path / "src") oid = _write_object(src, b"x = 1\n") _commit(src, "c1", {"a.py": oid}) patch = _make_patch(src, tmp_path) target = _init_repo(tmp_path / "target") oid_t = _write_object(target, b"x = 1\n") _commit(target, "c1", {"a.py": oid_t}) data = _json(_ap(target, patch, "--json")) assert "patch_id" in data assert data["patch_id"].startswith("sha256:") def test_json_has_files_applied(self, tmp_path: pathlib.Path) -> None: src = _init_repo(tmp_path / "src") oid = _write_object(src, b"x = 1\n") _commit(src, "c1", {"a.py": oid}) patch = _make_patch(src, tmp_path) target = _init_repo(tmp_path / "target") oid_t = _write_object(target, b"x = 1\n") _commit(target, "c1", {"a.py": oid_t}) data = _json(_ap(target, patch, "--json")) assert "files_applied" in data assert isinstance(data["files_applied"], list) # --------------------------------------------------------------------------- # Files are restored to disk # --------------------------------------------------------------------------- class TestFileRestoration: def test_added_file_exists_after_apply(self, tmp_path: pathlib.Path) -> None: src = _init_repo(tmp_path / "src") oid = _write_object(src, b"x = 1\n") _commit(src, "c1", {"a.py": oid}) patch = _make_patch(src, tmp_path) target = _init_repo(tmp_path / "target") _commit(target, "empty", {}) _ap(target, patch) assert (target / "a.py").exists() def test_added_file_has_correct_content(self, tmp_path: pathlib.Path) -> None: src = _init_repo(tmp_path / "src") oid = _write_object(src, b"x = 42\n") _commit(src, "c1", {"a.py": oid}) patch = _make_patch(src, tmp_path) target = _init_repo(tmp_path / "target") _commit(target, "empty", {}) _ap(target, patch) assert (target / "a.py").read_bytes() == b"x = 42\n" def test_deleted_file_removed_after_apply(self, tmp_path: pathlib.Path) -> None: src = _init_repo(tmp_path / "src") oid1 = _write_object(src, b"x = 1\n") oid2 = _write_object(src, b"y = 2\n") c1 = _commit(src, "c1", {"old.py": oid1, "keep.py": oid2}) _commit(src, "c2", {"keep.py": oid2}, parent=c1) patch = _make_patch(src, tmp_path) target = _init_repo(tmp_path / "target") oid_old = _write_object(target, b"x = 1\n") oid_keep = _write_object(target, b"y = 2\n") _commit(target, "c1", {"old.py": oid_old, "keep.py": oid_keep}) # Write both files to disk (simulating a working tree at c1) (target / "old.py").write_bytes(b"x = 1\n") (target / "keep.py").write_bytes(b"y = 2\n") _ap(target, patch) # old.py was deleted by the patch; keep.py was untouched on disk assert not (target / "old.py").exists() assert (target / "keep.py").exists() # --------------------------------------------------------------------------- # --dry-run does not modify disk # --------------------------------------------------------------------------- class TestDryRun: def test_dry_run_exits_zero(self, tmp_path: pathlib.Path) -> None: src = _init_repo(tmp_path / "src") oid = _write_object(src, b"x = 1\n") _commit(src, "c1", {"a.py": oid}) patch = _make_patch(src, tmp_path) target = _init_repo(tmp_path / "target") _commit(target, "empty", {}) r = _ap(target, patch, "--dry-run") assert r.exit_code == 0 def test_dry_run_does_not_write_files(self, tmp_path: pathlib.Path) -> None: src = _init_repo(tmp_path / "src") oid = _write_object(src, b"x = 1\n") _commit(src, "c1", {"a.py": oid}) patch = _make_patch(src, tmp_path) target = _init_repo(tmp_path / "target") _commit(target, "empty", {}) _ap(target, patch, "--dry-run") assert not (target / "a.py").exists() def test_dry_run_json_has_dry_run_true(self, tmp_path: pathlib.Path) -> None: src = _init_repo(tmp_path / "src") oid = _write_object(src, b"x = 1\n") _commit(src, "c1", {"a.py": oid}) patch = _make_patch(src, tmp_path) target = _init_repo(tmp_path / "target") _commit(target, "empty", {}) data = _json(_ap(target, patch, "--dry-run", "--json")) assert data.get("dry_run") is True # --------------------------------------------------------------------------- # --check (applicability only) # --------------------------------------------------------------------------- class TestCheck: def test_check_exits_zero_when_applicable(self, tmp_path: pathlib.Path) -> None: src = _init_repo(tmp_path / "src") oid = _write_object(src, b"x = 1\n") _commit(src, "c1", {"a.py": oid}) patch = _make_patch(src, tmp_path) target = _init_repo(tmp_path / "target") _commit(target, "empty", {}) r = _ap(target, patch, "--check") assert r.exit_code == 0 def test_check_does_not_write_files(self, tmp_path: pathlib.Path) -> None: src = _init_repo(tmp_path / "src") oid = _write_object(src, b"x = 1\n") _commit(src, "c1", {"a.py": oid}) patch = _make_patch(src, tmp_path) target = _init_repo(tmp_path / "target") _commit(target, "empty", {}) _ap(target, patch, "--check") assert not (target / "a.py").exists() def test_check_json_has_applicable_field(self, tmp_path: pathlib.Path) -> None: src = _init_repo(tmp_path / "src") oid = _write_object(src, b"x = 1\n") _commit(src, "c1", {"a.py": oid}) patch = _make_patch(src, tmp_path) target = _init_repo(tmp_path / "target") _commit(target, "empty", {}) data = _json(_ap(target, patch, "--check", "--json")) assert "applicable" in data assert isinstance(data["applicable"], bool) # --------------------------------------------------------------------------- # Integrity verification # --------------------------------------------------------------------------- class TestIntegrity: def test_tampered_patch_id_rejected(self, tmp_path: pathlib.Path) -> None: src = _init_repo(tmp_path / "src") oid = _write_object(src, b"x = 1\n") _commit(src, "c1", {"a.py": oid}) patch = _make_patch(src, tmp_path) # Tamper with patch_id data = json.loads(patch.read_bytes()) data["patch_id"] = fake_id("tampered-patch") patch.write_bytes(json.dumps(data).encode()) target = _init_repo(tmp_path / "target") _commit(target, "empty", {}) r = _ap(target, patch) assert r.exit_code != 0 def test_missing_patch_file_exits_nonzero(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path) oid = _write_object(repo, b"x = 1\n") _commit(repo, "init", {"a.py": oid}) r = _ap(repo, tmp_path / "nonexistent.mpatch") assert r.exit_code != 0 def test_corrupt_json_exits_nonzero(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path) oid = _write_object(repo, b"x = 1\n") _commit(repo, "init", {"a.py": oid}) bad_patch = tmp_path / "bad.mpatch" bad_patch.write_bytes(b"not valid json!!!") r = _ap(repo, bad_patch) assert r.exit_code != 0 import argparse as _argparse class TestRegisterFlags: def _parse(self, *args: str) -> _argparse.Namespace: from muse.cli.commands.apply_patch import register p = _argparse.ArgumentParser() sub = p.add_subparsers() register(sub) return p.parse_args(["apply-patch", *args]) def test_default_json_out_is_false(self) -> None: ns = self._parse("dummy.mpatch") assert ns.json_out is False def test_json_flag_sets_json_out(self) -> None: ns = self._parse("--json", "dummy.mpatch") assert ns.json_out is True def test_j_shorthand_sets_json_out(self) -> None: ns = self._parse("-j", "dummy.mpatch") assert ns.json_out is True def test_dry_run_default(self) -> None: ns = self._parse("dummy.mpatch") assert ns.dry_run is False def test_dry_run_flag(self) -> None: ns = self._parse("--dry-run", "dummy.mpatch") assert ns.dry_run is True def test_dry_run_n_shorthand(self) -> None: ns = self._parse("-n", "dummy.mpatch") assert ns.dry_run is True def test_force_default(self) -> None: ns = self._parse("dummy.mpatch") assert ns.force is False def test_force_flag(self) -> None: ns = self._parse("--force", "dummy.mpatch") assert ns.force is True def test_force_f_shorthand(self) -> None: ns = self._parse("-f", "dummy.mpatch") assert ns.force is True