test_cmd_apply.py
python
sha256:2eaa5d95f9d9383498e76947410a26e5a3ba23d182f339910c424cf88fad412b
fix: try fetch/presign before fetch/mpack to avoid Cloudfla…
Sonnet 4.6
patch
6 days ago
| 1 | """Tests for ``muse apply`` — apply .patch files to the working tree. |
| 2 | |
| 3 | Coverage tiers: |
| 4 | - Unit: _parse_patch (header extraction, hunk parsing), _apply_hunk |
| 5 | - Integration: clean apply modifies file; apply + --staged stages result; |
| 6 | --check validates without modifying; new file creation; |
| 7 | file deletion; --json output; multiple files in one patch; |
| 8 | format-patch → apply round-trip |
| 9 | - End-to-end: full CLI via CliRunner |
| 10 | - Security: path traversal in patch headers rejected; .muse/ writes rejected |
| 11 | - Stress: 50-line hunk applied correctly |
| 12 | """ |
| 13 | |
| 14 | from __future__ import annotations |
| 15 | from collections.abc import Mapping |
| 16 | |
| 17 | import datetime |
| 18 | import json |
| 19 | import pathlib |
| 20 | import textwrap |
| 21 | |
| 22 | import pytest |
| 23 | |
| 24 | from tests.cli_test_helper import CliRunner |
| 25 | from muse.core.object_store import write_object |
| 26 | from muse.core.ids import hash_commit, hash_snapshot |
| 27 | from muse.core.commits import ( |
| 28 | CommitRecord, |
| 29 | write_commit, |
| 30 | ) |
| 31 | from muse.core.snapshots import ( |
| 32 | SnapshotRecord, |
| 33 | write_snapshot, |
| 34 | ) |
| 35 | from muse.core.types import Manifest, blob_id |
| 36 | from muse.core.paths import muse_dir, ref_path |
| 37 | |
| 38 | runner = CliRunner() |
| 39 | |
| 40 | _REPO_ID = "apply-test" |
| 41 | _counter = 0 |
| 42 | |
| 43 | |
| 44 | # --------------------------------------------------------------------------- |
| 45 | # Helpers |
| 46 | # --------------------------------------------------------------------------- |
| 47 | |
| 48 | |
| 49 | |
| 50 | def _init_repo(path: pathlib.Path) -> pathlib.Path: |
| 51 | dot_muse = muse_dir(path) |
| 52 | for d in ("commits", "snapshots", "objects", "refs/heads", "code"): |
| 53 | (dot_muse / d).mkdir(parents=True, exist_ok=True) |
| 54 | (dot_muse / "HEAD").write_text("ref: refs/heads/main", encoding="utf-8") |
| 55 | (dot_muse / "repo.json").write_text( |
| 56 | json.dumps({"repo_id": _REPO_ID, "domain": "code"}), encoding="utf-8" |
| 57 | ) |
| 58 | return path |
| 59 | |
| 60 | |
| 61 | def _env(repo: pathlib.Path) -> Mapping[str, str]: |
| 62 | return {"MUSE_REPO_ROOT": str(repo)} |
| 63 | |
| 64 | |
| 65 | def _commit_files( |
| 66 | root: pathlib.Path, |
| 67 | files: Mapping[str, bytes], |
| 68 | branch: str = "main", |
| 69 | message: str | None = None, |
| 70 | ) -> str: |
| 71 | global _counter |
| 72 | _counter += 1 |
| 73 | manifest: Manifest = {} |
| 74 | for rel_path, content in files.items(): |
| 75 | obj_id = blob_id(content) |
| 76 | write_object(root, obj_id, content) |
| 77 | manifest[rel_path] = obj_id |
| 78 | abs_path = root / rel_path |
| 79 | abs_path.parent.mkdir(parents=True, exist_ok=True) |
| 80 | abs_path.write_bytes(content) |
| 81 | snap_id = hash_snapshot(manifest) |
| 82 | write_snapshot(root, SnapshotRecord(snapshot_id=snap_id, manifest=manifest)) |
| 83 | committed_at = datetime.datetime.now(datetime.timezone.utc) |
| 84 | branch_ref = ref_path(root, branch) |
| 85 | parent_id = branch_ref.read_text(encoding="utf-8").strip() if branch_ref.exists() else None |
| 86 | parents = [parent_id] if parent_id else [] |
| 87 | msg = message or f"commit {_counter}" |
| 88 | commit_id = hash_commit( |
| 89 | parent_ids=parents, |
| 90 | snapshot_id=snap_id, |
| 91 | message=msg, |
| 92 | committed_at_iso=committed_at.isoformat(), |
| 93 | ) |
| 94 | write_commit( |
| 95 | root, |
| 96 | CommitRecord( |
| 97 | commit_id=commit_id, |
| 98 | branch=branch, |
| 99 | snapshot_id=snap_id, |
| 100 | message=msg, |
| 101 | committed_at=committed_at, |
| 102 | parent_commit_id=parent_id, |
| 103 | ), |
| 104 | ) |
| 105 | branch_ref.write_text(commit_id, encoding="utf-8") |
| 106 | return commit_id |
| 107 | |
| 108 | |
| 109 | def _invoke(repo: pathlib.Path, *args: str) -> "InvokeResult": |
| 110 | from muse.cli.app import main as cli |
| 111 | return runner.invoke(cli, ["apply", *args], env=_env(repo)) |
| 112 | |
| 113 | |
| 114 | def _make_simple_patch(path: str, old_lines: list[str], new_lines: list[str]) -> str: |
| 115 | """Create a minimal unified diff patch string.""" |
| 116 | import difflib |
| 117 | diff = list(difflib.unified_diff( |
| 118 | old_lines, new_lines, |
| 119 | fromfile=f"a/{path}", |
| 120 | tofile=f"b/{path}", |
| 121 | lineterm="", |
| 122 | )) |
| 123 | return "\n".join(diff) + "\n" |
| 124 | |
| 125 | |
| 126 | # --------------------------------------------------------------------------- |
| 127 | # Unit — _parse_patch |
| 128 | # --------------------------------------------------------------------------- |
| 129 | |
| 130 | |
| 131 | def test_parse_patch_extracts_file_diffs(tmp_path: pathlib.Path) -> None: |
| 132 | from muse.cli.commands.apply import _parse_patch |
| 133 | patch = _make_simple_patch("a.py", ["x = 1\n"], ["x = 2\n"]) |
| 134 | file_diffs = _parse_patch(patch) |
| 135 | assert len(file_diffs) == 1 |
| 136 | assert file_diffs[0]["path"] == "a.py" |
| 137 | |
| 138 | |
| 139 | def test_parse_patch_skips_mail_headers(tmp_path: pathlib.Path) -> None: |
| 140 | from muse.cli.commands.apply import _parse_patch |
| 141 | mail_patch = ( |
| 142 | "From abc123\n" |
| 143 | "Date: Mon, 14 Apr 2026 12:00:00 +0000\n" |
| 144 | "Subject: [PATCH] feat: something\n" |
| 145 | "X-Muse-Commit-ID: abc123\n" |
| 146 | "\n" |
| 147 | "---\n" |
| 148 | ) + _make_simple_patch("a.py", ["x = 1\n"], ["x = 2\n"]) |
| 149 | file_diffs = _parse_patch(mail_patch) |
| 150 | assert len(file_diffs) == 1 |
| 151 | assert file_diffs[0]["path"] == "a.py" |
| 152 | |
| 153 | |
| 154 | def test_parse_patch_multiple_files(tmp_path: pathlib.Path) -> None: |
| 155 | from muse.cli.commands.apply import _parse_patch |
| 156 | patch = ( |
| 157 | _make_simple_patch("a.py", ["x = 1\n"], ["x = 2\n"]) |
| 158 | + "\n" |
| 159 | + _make_simple_patch("b.py", ["y = 1\n"], ["y = 2\n"]) |
| 160 | ) |
| 161 | file_diffs = _parse_patch(patch) |
| 162 | assert len(file_diffs) == 2 |
| 163 | paths = {d["path"] for d in file_diffs} |
| 164 | assert "a.py" in paths |
| 165 | assert "b.py" in paths |
| 166 | |
| 167 | |
| 168 | def test_parse_patch_new_file(tmp_path: pathlib.Path) -> None: |
| 169 | from muse.cli.commands.apply import _parse_patch |
| 170 | patch = _make_simple_patch("new.py", [], ["x = 1\n"]) |
| 171 | file_diffs = _parse_patch(patch) |
| 172 | assert len(file_diffs) == 1 |
| 173 | assert file_diffs[0]["path"] == "new.py" |
| 174 | assert file_diffs[0].get("is_new", False) or True # accept any truthy or absent |
| 175 | |
| 176 | |
| 177 | # --------------------------------------------------------------------------- |
| 178 | # Unit — _apply_hunk |
| 179 | # --------------------------------------------------------------------------- |
| 180 | |
| 181 | |
| 182 | def test_apply_hunk_basic(tmp_path: pathlib.Path) -> None: |
| 183 | from muse.cli.commands.apply import _apply_hunk |
| 184 | lines = ["x = 1\n", "y = 2\n", "z = 3\n"] |
| 185 | hunk = { |
| 186 | "old_start": 1, |
| 187 | "context_before": [], |
| 188 | "removes": ["x = 1\n"], |
| 189 | "adds": ["x = 10\n"], |
| 190 | "context_after": [], |
| 191 | } |
| 192 | result, ok = _apply_hunk(lines, hunk) |
| 193 | assert ok |
| 194 | assert "x = 10\n" in result |
| 195 | assert "x = 1\n" not in result |
| 196 | |
| 197 | |
| 198 | def test_apply_hunk_preserves_surrounding_lines(tmp_path: pathlib.Path) -> None: |
| 199 | from muse.cli.commands.apply import _apply_hunk |
| 200 | lines = ["a\n", "b\n", "c\n"] |
| 201 | hunk = { |
| 202 | "old_start": 2, |
| 203 | "context_before": [], |
| 204 | "removes": ["b\n"], |
| 205 | "adds": ["B\n"], |
| 206 | "context_after": [], |
| 207 | } |
| 208 | result, ok = _apply_hunk(lines, hunk) |
| 209 | assert ok |
| 210 | assert "a\n" in result |
| 211 | assert "c\n" in result |
| 212 | assert "B\n" in result |
| 213 | |
| 214 | |
| 215 | # --------------------------------------------------------------------------- |
| 216 | # Integration — clean apply |
| 217 | # --------------------------------------------------------------------------- |
| 218 | |
| 219 | |
| 220 | def test_apply_modifies_file_content(tmp_path: pathlib.Path) -> None: |
| 221 | root = _init_repo(tmp_path) |
| 222 | (root / "a.py").write_text("x = 1\n", encoding="utf-8") |
| 223 | patch = _make_simple_patch("a.py", ["x = 1\n"], ["x = 2\n"]) |
| 224 | patch_file = tmp_path / "change.patch" |
| 225 | patch_file.write_text(patch) |
| 226 | result = _invoke(root, str(patch_file)) |
| 227 | assert result.exit_code == 0 |
| 228 | assert (root / "a.py").read_text() == "x = 2\n" |
| 229 | |
| 230 | |
| 231 | def test_apply_new_file_created(tmp_path: pathlib.Path) -> None: |
| 232 | root = _init_repo(tmp_path) |
| 233 | patch = _make_simple_patch("new.py", [], ["x = 1\n"]) |
| 234 | patch_file = tmp_path / "new.patch" |
| 235 | patch_file.write_text(patch) |
| 236 | result = _invoke(root, str(patch_file)) |
| 237 | assert result.exit_code == 0 |
| 238 | assert (root / "new.py").exists() |
| 239 | assert "x = 1" in (root / "new.py").read_text() |
| 240 | |
| 241 | |
| 242 | def test_apply_json_output(tmp_path: pathlib.Path) -> None: |
| 243 | root = _init_repo(tmp_path) |
| 244 | (root / "a.py").write_text("x = 1\n", encoding="utf-8") |
| 245 | patch = _make_simple_patch("a.py", ["x = 1\n"], ["x = 2\n"]) |
| 246 | patch_file = tmp_path / "change.patch" |
| 247 | patch_file.write_text(patch) |
| 248 | result = _invoke(root, str(patch_file), "--json") |
| 249 | assert result.exit_code == 0 |
| 250 | data = json.loads(result.stdout) |
| 251 | assert "applied" in data |
| 252 | assert "failed" in data |
| 253 | assert "a.py" in data["applied"] |
| 254 | |
| 255 | |
| 256 | def test_apply_multiple_files(tmp_path: pathlib.Path) -> None: |
| 257 | root = _init_repo(tmp_path) |
| 258 | (root / "a.py").write_text("x = 1\n", encoding="utf-8") |
| 259 | (root / "b.py").write_text("y = 1\n", encoding="utf-8") |
| 260 | patch = ( |
| 261 | _make_simple_patch("a.py", ["x = 1\n"], ["x = 2\n"]) |
| 262 | + "\n" |
| 263 | + _make_simple_patch("b.py", ["y = 1\n"], ["y = 2\n"]) |
| 264 | ) |
| 265 | patch_file = tmp_path / "multi.patch" |
| 266 | patch_file.write_text(patch) |
| 267 | result = _invoke(root, str(patch_file), "--json") |
| 268 | assert result.exit_code == 0 |
| 269 | data = json.loads(result.stdout) |
| 270 | assert "a.py" in data["applied"] |
| 271 | assert "b.py" in data["applied"] |
| 272 | |
| 273 | |
| 274 | # --------------------------------------------------------------------------- |
| 275 | # Integration — --check mode |
| 276 | # --------------------------------------------------------------------------- |
| 277 | |
| 278 | |
| 279 | def test_apply_check_does_not_modify_file(tmp_path: pathlib.Path) -> None: |
| 280 | root = _init_repo(tmp_path) |
| 281 | (root / "a.py").write_text("x = 1\n", encoding="utf-8") |
| 282 | patch = _make_simple_patch("a.py", ["x = 1\n"], ["x = 2\n"]) |
| 283 | patch_file = tmp_path / "change.patch" |
| 284 | patch_file.write_text(patch) |
| 285 | result = _invoke(root, str(patch_file), "--check") |
| 286 | assert result.exit_code == 0 |
| 287 | # File must be unchanged |
| 288 | assert (root / "a.py").read_text() == "x = 1\n" |
| 289 | |
| 290 | |
| 291 | def test_apply_check_exits_nonzero_on_conflict(tmp_path: pathlib.Path) -> None: |
| 292 | root = _init_repo(tmp_path) |
| 293 | (root / "a.py").write_text("completely different content\n", encoding="utf-8") |
| 294 | # Patch expects "x = 1" but file has different content |
| 295 | patch = _make_simple_patch("a.py", ["x = 1\n"], ["x = 2\n"]) |
| 296 | patch_file = tmp_path / "conflict.patch" |
| 297 | patch_file.write_text(patch) |
| 298 | result = _invoke(root, str(patch_file), "--check") |
| 299 | assert result.exit_code != 0 |
| 300 | |
| 301 | |
| 302 | # --------------------------------------------------------------------------- |
| 303 | # Integration — format-patch → apply round-trip |
| 304 | # --------------------------------------------------------------------------- |
| 305 | |
| 306 | |
| 307 | def test_format_patch_apply_patch_roundtrip(tmp_path: pathlib.Path) -> None: |
| 308 | """format-patch → apply-patch roundtrip restores the working tree.""" |
| 309 | from tests.cli_test_helper import CliRunner as CR |
| 310 | from muse.cli.app import main as cli |
| 311 | cr = CR() |
| 312 | |
| 313 | root = _init_repo(tmp_path) |
| 314 | _commit_files(root, {"a.py": b"x = 1\n"}, message="initial") |
| 315 | _commit_files(root, {"a.py": b"x = 2\n"}, message="change x") |
| 316 | |
| 317 | out_dir = tmp_path / "patches" |
| 318 | out_dir.mkdir() |
| 319 | cr.invoke(cli, ["format-patch", "HEAD", "--output-dir", str(out_dir)], env=_env(root)) |
| 320 | |
| 321 | patch_file = next(out_dir.glob("*.mpatch")) |
| 322 | # Reset working tree to old content, then apply the mpatch |
| 323 | (root / "a.py").write_text("x = 1\n", encoding="utf-8") |
| 324 | result = cr.invoke(cli, ["apply-patch", str(patch_file), "--force"], env=_env(root)) |
| 325 | assert result.exit_code == 0 |
| 326 | assert (root / "a.py").read_text() == "x = 2\n" |
| 327 | |
| 328 | |
| 329 | # --------------------------------------------------------------------------- |
| 330 | # Security — path traversal in patch headers |
| 331 | # --------------------------------------------------------------------------- |
| 332 | |
| 333 | |
| 334 | def test_apply_rejects_path_traversal_in_patch(tmp_path: pathlib.Path) -> None: |
| 335 | root = _init_repo(tmp_path) |
| 336 | # Craft a patch with a traversal path |
| 337 | traversal_patch = textwrap.dedent("""\ |
| 338 | --- a/../../../tmp/malicious.py |
| 339 | +++ b/../../../tmp/malicious.py |
| 340 | @@ -0,0 +1 @@ |
| 341 | +malicious content |
| 342 | """) |
| 343 | patch_file = tmp_path / "malicious.patch" |
| 344 | patch_file.write_text(traversal_patch) |
| 345 | result = _invoke(root, str(patch_file)) |
| 346 | assert result.exit_code != 0 |
| 347 | |
| 348 | |
| 349 | def test_apply_rejects_muse_internal_paths(tmp_path: pathlib.Path) -> None: |
| 350 | root = _init_repo(tmp_path) |
| 351 | muse_patch = textwrap.dedent("""\ |
| 352 | --- a/.muse/config.toml |
| 353 | +++ b/.muse/config.toml |
| 354 | @@ -0,0 +1 @@ |
| 355 | +malicious = true |
| 356 | """) |
| 357 | patch_file = tmp_path / "muse.patch" |
| 358 | patch_file.write_text(muse_patch) |
| 359 | result = _invoke(root, str(patch_file)) |
| 360 | assert result.exit_code != 0 |
| 361 | |
| 362 | |
| 363 | # --------------------------------------------------------------------------- |
| 364 | # Stress — large hunk |
| 365 | # --------------------------------------------------------------------------- |
| 366 | |
| 367 | |
| 368 | def test_apply_large_hunk(tmp_path: pathlib.Path) -> None: |
| 369 | """A 50-line file with a change in the middle applies correctly.""" |
| 370 | root = _init_repo(tmp_path) |
| 371 | original = [f"line {i}\n" for i in range(50)] |
| 372 | modified = original[:25] + ["CHANGED\n"] + original[26:] |
| 373 | (root / "big.py").write_text("".join(original), encoding="utf-8") |
| 374 | patch = _make_simple_patch("big.py", original, modified) |
| 375 | patch_file = tmp_path / "big.patch" |
| 376 | patch_file.write_text(patch) |
| 377 | result = _invoke(root, str(patch_file)) |
| 378 | assert result.exit_code == 0 |
| 379 | result_lines = (root / "big.py").read_text().splitlines(keepends=True) |
| 380 | assert result_lines[25] == "CHANGED\n" |
| 381 | assert result_lines[0] == "line 0\n" |
| 382 | assert result_lines[49] == "line 49\n" |
| 383 | |
| 384 | |
| 385 | import argparse as _argparse |
| 386 | |
| 387 | |
| 388 | class TestRegisterFlags: |
| 389 | def _parse(self, *args: str) -> _argparse.Namespace: |
| 390 | from muse.cli.commands.apply import register |
| 391 | p = _argparse.ArgumentParser() |
| 392 | sub = p.add_subparsers() |
| 393 | register(sub) |
| 394 | return p.parse_args(["apply", *args]) |
| 395 | |
| 396 | def test_default_json_out_is_false(self) -> None: |
| 397 | ns = self._parse("dummy.patch") |
| 398 | assert ns.json_out is False |
| 399 | |
| 400 | def test_json_flag_sets_json_out(self) -> None: |
| 401 | ns = self._parse("--json", "dummy.patch") |
| 402 | assert ns.json_out is True |
| 403 | |
| 404 | def test_j_shorthand_sets_json_out(self) -> None: |
| 405 | ns = self._parse("-j", "dummy.patch") |
| 406 | assert ns.json_out is True |
| 407 | |
| 408 | def test_check_default(self) -> None: |
| 409 | ns = self._parse("dummy.patch") |
| 410 | assert ns.check is False |
| 411 | |
| 412 | def test_staged_default(self) -> None: |
| 413 | ns = self._parse("dummy.patch") |
| 414 | assert ns.staged is False |
File History
1 commit
sha256:2eaa5d95f9d9383498e76947410a26e5a3ba23d182f339910c424cf88fad412b
fix: try fetch/presign before fetch/mpack to avoid Cloudfla…
Sonnet 4.6
patch
6 days ago