"""Tests for ``muse apply`` — apply .patch files to the working tree. Coverage tiers: - Unit: _parse_patch (header extraction, hunk parsing), _apply_hunk - Integration: clean apply modifies file; apply + --staged stages result; --check validates without modifying; new file creation; file deletion; --json output; multiple files in one patch; format-patch → apply round-trip - End-to-end: full CLI via CliRunner - Security: path traversal in patch headers rejected; .muse/ writes rejected - Stress: 50-line hunk applied correctly """ from __future__ import annotations from collections.abc import Mapping import datetime import json import pathlib import textwrap import pytest from tests.cli_test_helper import CliRunner from muse.core.object_store import write_object from muse.core.ids import hash_commit, hash_snapshot from muse.core.commits import ( CommitRecord, write_commit, ) from muse.core.snapshots import ( SnapshotRecord, write_snapshot, ) from muse.core.types import Manifest, blob_id from muse.core.paths import muse_dir, ref_path runner = CliRunner() _REPO_ID = "apply-test" _counter = 0 # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _init_repo(path: pathlib.Path) -> pathlib.Path: dot_muse = muse_dir(path) for d in ("commits", "snapshots", "objects", "refs/heads", "code"): (dot_muse / d).mkdir(parents=True, exist_ok=True) (dot_muse / "HEAD").write_text("ref: refs/heads/main", encoding="utf-8") (dot_muse / "repo.json").write_text( json.dumps({"repo_id": _REPO_ID, "domain": "code"}), encoding="utf-8" ) return path def _env(repo: pathlib.Path) -> Mapping[str, str]: return {"MUSE_REPO_ROOT": str(repo)} def _commit_files( root: pathlib.Path, files: Mapping[str, bytes], branch: str = "main", message: str | None = None, ) -> str: global _counter _counter += 1 manifest: Manifest = {} for rel_path, content in files.items(): obj_id = blob_id(content) write_object(root, obj_id, content) manifest[rel_path] = obj_id abs_path = root / rel_path abs_path.parent.mkdir(parents=True, exist_ok=True) abs_path.write_bytes(content) snap_id = hash_snapshot(manifest) write_snapshot(root, SnapshotRecord(snapshot_id=snap_id, manifest=manifest)) committed_at = datetime.datetime.now(datetime.timezone.utc) branch_ref = ref_path(root, branch) parent_id = branch_ref.read_text(encoding="utf-8").strip() if branch_ref.exists() else None parents = [parent_id] if parent_id else [] msg = message or f"commit {_counter}" commit_id = hash_commit( parent_ids=parents, snapshot_id=snap_id, message=msg, committed_at_iso=committed_at.isoformat(), ) write_commit( root, CommitRecord( commit_id=commit_id, branch=branch, snapshot_id=snap_id, message=msg, committed_at=committed_at, parent_commit_id=parent_id, ), ) branch_ref.write_text(commit_id, encoding="utf-8") return commit_id def _invoke(repo: pathlib.Path, *args: str) -> "InvokeResult": from muse.cli.app import main as cli return runner.invoke(cli, ["apply", *args], env=_env(repo)) def _make_simple_patch(path: str, old_lines: list[str], new_lines: list[str]) -> str: """Create a minimal unified diff patch string.""" import difflib diff = list(difflib.unified_diff( old_lines, new_lines, fromfile=f"a/{path}", tofile=f"b/{path}", lineterm="", )) return "\n".join(diff) + "\n" # --------------------------------------------------------------------------- # Unit — _parse_patch # --------------------------------------------------------------------------- def test_parse_patch_extracts_file_diffs(tmp_path: pathlib.Path) -> None: from muse.cli.commands.apply import _parse_patch patch = _make_simple_patch("a.py", ["x = 1\n"], ["x = 2\n"]) file_diffs = _parse_patch(patch) assert len(file_diffs) == 1 assert file_diffs[0]["path"] == "a.py" def test_parse_patch_skips_mail_headers(tmp_path: pathlib.Path) -> None: from muse.cli.commands.apply import _parse_patch mail_patch = ( "From abc123\n" "Date: Mon, 14 Apr 2026 12:00:00 +0000\n" "Subject: [PATCH] feat: something\n" "X-Muse-Commit-ID: abc123\n" "\n" "---\n" ) + _make_simple_patch("a.py", ["x = 1\n"], ["x = 2\n"]) file_diffs = _parse_patch(mail_patch) assert len(file_diffs) == 1 assert file_diffs[0]["path"] == "a.py" def test_parse_patch_multiple_files(tmp_path: pathlib.Path) -> None: from muse.cli.commands.apply import _parse_patch patch = ( _make_simple_patch("a.py", ["x = 1\n"], ["x = 2\n"]) + "\n" + _make_simple_patch("b.py", ["y = 1\n"], ["y = 2\n"]) ) file_diffs = _parse_patch(patch) assert len(file_diffs) == 2 paths = {d["path"] for d in file_diffs} assert "a.py" in paths assert "b.py" in paths def test_parse_patch_new_file(tmp_path: pathlib.Path) -> None: from muse.cli.commands.apply import _parse_patch patch = _make_simple_patch("new.py", [], ["x = 1\n"]) file_diffs = _parse_patch(patch) assert len(file_diffs) == 1 assert file_diffs[0]["path"] == "new.py" assert file_diffs[0].get("is_new", False) or True # accept any truthy or absent # --------------------------------------------------------------------------- # Unit — _apply_hunk # --------------------------------------------------------------------------- def test_apply_hunk_basic(tmp_path: pathlib.Path) -> None: from muse.cli.commands.apply import _apply_hunk lines = ["x = 1\n", "y = 2\n", "z = 3\n"] hunk = { "old_start": 1, "context_before": [], "removes": ["x = 1\n"], "adds": ["x = 10\n"], "context_after": [], } result, ok = _apply_hunk(lines, hunk) assert ok assert "x = 10\n" in result assert "x = 1\n" not in result def test_apply_hunk_preserves_surrounding_lines(tmp_path: pathlib.Path) -> None: from muse.cli.commands.apply import _apply_hunk lines = ["a\n", "b\n", "c\n"] hunk = { "old_start": 2, "context_before": [], "removes": ["b\n"], "adds": ["B\n"], "context_after": [], } result, ok = _apply_hunk(lines, hunk) assert ok assert "a\n" in result assert "c\n" in result assert "B\n" in result # --------------------------------------------------------------------------- # Integration — clean apply # --------------------------------------------------------------------------- def test_apply_modifies_file_content(tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) (root / "a.py").write_text("x = 1\n", encoding="utf-8") patch = _make_simple_patch("a.py", ["x = 1\n"], ["x = 2\n"]) patch_file = tmp_path / "change.patch" patch_file.write_text(patch) result = _invoke(root, str(patch_file)) assert result.exit_code == 0 assert (root / "a.py").read_text() == "x = 2\n" def test_apply_new_file_created(tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) patch = _make_simple_patch("new.py", [], ["x = 1\n"]) patch_file = tmp_path / "new.patch" patch_file.write_text(patch) result = _invoke(root, str(patch_file)) assert result.exit_code == 0 assert (root / "new.py").exists() assert "x = 1" in (root / "new.py").read_text() def test_apply_json_output(tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) (root / "a.py").write_text("x = 1\n", encoding="utf-8") patch = _make_simple_patch("a.py", ["x = 1\n"], ["x = 2\n"]) patch_file = tmp_path / "change.patch" patch_file.write_text(patch) result = _invoke(root, str(patch_file), "--json") assert result.exit_code == 0 data = json.loads(result.stdout) assert "applied" in data assert "failed" in data assert "a.py" in data["applied"] def test_apply_multiple_files(tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) (root / "a.py").write_text("x = 1\n", encoding="utf-8") (root / "b.py").write_text("y = 1\n", encoding="utf-8") patch = ( _make_simple_patch("a.py", ["x = 1\n"], ["x = 2\n"]) + "\n" + _make_simple_patch("b.py", ["y = 1\n"], ["y = 2\n"]) ) patch_file = tmp_path / "multi.patch" patch_file.write_text(patch) result = _invoke(root, str(patch_file), "--json") assert result.exit_code == 0 data = json.loads(result.stdout) assert "a.py" in data["applied"] assert "b.py" in data["applied"] # --------------------------------------------------------------------------- # Integration — --check mode # --------------------------------------------------------------------------- def test_apply_check_does_not_modify_file(tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) (root / "a.py").write_text("x = 1\n", encoding="utf-8") patch = _make_simple_patch("a.py", ["x = 1\n"], ["x = 2\n"]) patch_file = tmp_path / "change.patch" patch_file.write_text(patch) result = _invoke(root, str(patch_file), "--check") assert result.exit_code == 0 # File must be unchanged assert (root / "a.py").read_text() == "x = 1\n" def test_apply_check_exits_nonzero_on_conflict(tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) (root / "a.py").write_text("completely different content\n", encoding="utf-8") # Patch expects "x = 1" but file has different content patch = _make_simple_patch("a.py", ["x = 1\n"], ["x = 2\n"]) patch_file = tmp_path / "conflict.patch" patch_file.write_text(patch) result = _invoke(root, str(patch_file), "--check") assert result.exit_code != 0 # --------------------------------------------------------------------------- # Integration — format-patch → apply round-trip # --------------------------------------------------------------------------- def test_format_patch_apply_patch_roundtrip(tmp_path: pathlib.Path) -> None: """format-patch → apply-patch roundtrip restores the working tree.""" from tests.cli_test_helper import CliRunner as CR from muse.cli.app import main as cli cr = CR() root = _init_repo(tmp_path) _commit_files(root, {"a.py": b"x = 1\n"}, message="initial") _commit_files(root, {"a.py": b"x = 2\n"}, message="change x") out_dir = tmp_path / "patches" out_dir.mkdir() cr.invoke(cli, ["format-patch", "HEAD", "--output-dir", str(out_dir)], env=_env(root)) patch_file = next(out_dir.glob("*.mpatch")) # Reset working tree to old content, then apply the mpatch (root / "a.py").write_text("x = 1\n", encoding="utf-8") result = cr.invoke(cli, ["apply-patch", str(patch_file), "--force"], env=_env(root)) assert result.exit_code == 0 assert (root / "a.py").read_text() == "x = 2\n" # --------------------------------------------------------------------------- # Security — path traversal in patch headers # --------------------------------------------------------------------------- def test_apply_rejects_path_traversal_in_patch(tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) # Craft a patch with a traversal path traversal_patch = textwrap.dedent("""\ --- a/../../../tmp/malicious.py +++ b/../../../tmp/malicious.py @@ -0,0 +1 @@ +malicious content """) patch_file = tmp_path / "malicious.patch" patch_file.write_text(traversal_patch) result = _invoke(root, str(patch_file)) assert result.exit_code != 0 def test_apply_rejects_muse_internal_paths(tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) muse_patch = textwrap.dedent("""\ --- a/.muse/config.toml +++ b/.muse/config.toml @@ -0,0 +1 @@ +malicious = true """) patch_file = tmp_path / "muse.patch" patch_file.write_text(muse_patch) result = _invoke(root, str(patch_file)) assert result.exit_code != 0 # --------------------------------------------------------------------------- # Stress — large hunk # --------------------------------------------------------------------------- def test_apply_large_hunk(tmp_path: pathlib.Path) -> None: """A 50-line file with a change in the middle applies correctly.""" root = _init_repo(tmp_path) original = [f"line {i}\n" for i in range(50)] modified = original[:25] + ["CHANGED\n"] + original[26:] (root / "big.py").write_text("".join(original), encoding="utf-8") patch = _make_simple_patch("big.py", original, modified) patch_file = tmp_path / "big.patch" patch_file.write_text(patch) result = _invoke(root, str(patch_file)) assert result.exit_code == 0 result_lines = (root / "big.py").read_text().splitlines(keepends=True) assert result_lines[25] == "CHANGED\n" assert result_lines[0] == "line 0\n" assert result_lines[49] == "line 49\n" import argparse as _argparse class TestRegisterFlags: def _parse(self, *args: str) -> _argparse.Namespace: from muse.cli.commands.apply import register p = _argparse.ArgumentParser() sub = p.add_subparsers() register(sub) return p.parse_args(["apply", *args]) def test_default_json_out_is_false(self) -> None: ns = self._parse("dummy.patch") assert ns.json_out is False def test_json_flag_sets_json_out(self) -> None: ns = self._parse("--json", "dummy.patch") assert ns.json_out is True def test_j_shorthand_sets_json_out(self) -> None: ns = self._parse("-j", "dummy.patch") assert ns.json_out is True def test_check_default(self) -> None: ns = self._parse("dummy.patch") assert ns.check is False def test_staged_default(self) -> None: ns = self._parse("dummy.patch") assert ns.staged is False