test_cmd_apply_patch.py
file-level
1
files
1
commits
0
hotspots
0
🧊 dead
0
💥 blast risk
| 1 | """Tests for ``muse apply-patch`` — apply a Muse .mpatch file to the working tree. |
| 2 | |
| 3 | Test tiers |
| 4 | ---------- |
| 5 | - Unit: output schema, exit_code/duration_ms in JSON |
| 6 | - Integration: apply restores files, --dry-run reports without writing, --check |
| 7 | - Data integrity: patch_id verified before apply, tampered patch rejected |
| 8 | - Security: path traversal in manifest rejected, error to stderr |
| 9 | - Edge: empty diff applies cleanly, already-applied patch detectable via snapshot check |
| 10 | """ |
| 11 | from __future__ import annotations |
| 12 | from collections.abc import Mapping |
| 13 | |
| 14 | import datetime |
| 15 | import json |
| 16 | import pathlib |
| 17 | |
| 18 | import pytest |
| 19 | |
| 20 | from tests.cli_test_helper import CliRunner, InvokeResult |
| 21 | from muse.core.ids import hash_commit, hash_snapshot |
| 22 | from muse.core.commits import ( |
| 23 | CommitRecord, |
| 24 | write_commit, |
| 25 | ) |
| 26 | from muse.core.snapshots import ( |
| 27 | SnapshotRecord, |
| 28 | write_snapshot, |
| 29 | ) |
| 30 | from muse.core.object_store import write_object |
| 31 | from muse.core.types import long_id, blob_id, fake_id |
| 32 | from muse.core.paths import muse_dir, ref_path |
| 33 | |
# Module-level CLI runner shared by the ``_make_patch`` and ``_ap`` helpers below.
runner = CliRunner()
| 35 | |
| 36 | |
| 37 | # --------------------------------------------------------------------------- |
| 38 | # Helpers |
| 39 | # --------------------------------------------------------------------------- |
| 40 | |
| 41 | |
def _init_repo(path: pathlib.Path) -> pathlib.Path:
    """Lay out a minimal repository skeleton for *path* and return *path*.

    Inside the directory reported by ``muse_dir(path)`` this creates the
    ``commits``, ``snapshots``, ``objects`` and ``refs/heads`` directories,
    a ``HEAD`` file pointing at ``refs/heads/main`` and a ``repo.json``
    descriptor.
    """
    store = muse_dir(path)
    layout = ("commits", "snapshots", "objects", "refs/heads")
    for name in layout:
        directory = store / name
        directory.mkdir(parents=True, exist_ok=True)

    repo_meta = {"repo_id": "test-repo", "domain": "code"}
    head_file = store / "HEAD"
    head_file.write_text("ref: refs/heads/main\n")
    meta_file = store / "repo.json"
    meta_file.write_text(json.dumps(repo_meta))
    return path
| 49 | |
| 50 | |
def _write_object(repo: pathlib.Path, content: bytes) -> str:
    """Store *content* in *repo*'s object store and return its object ID.

    The ID is whatever ``blob_id`` computes for the bytes.
    """
    object_id = blob_id(content)
    write_object(repo, object_id, content)
    return object_id
| 56 | |
| 57 | |
def _commit(
    repo: pathlib.Path,
    msg: str,
    manifest: dict[str, str],
    branch: str = "main",
    parent: str | None = None,
    ts: datetime.datetime | None = None,
) -> str:
    """Write a snapshot and a commit for *manifest*, point *branch* at it, return the commit ID.

    Args:
        repo: Root of the repository to write into.
        msg: Commit message.
        manifest: Mapping of path to object ID that is hashed into the snapshot.
        branch: Branch whose ref file is overwritten with the new commit ID.
        parent: Optional parent commit ID; a falsy value means a root commit.
        ts: Commit timestamp; defaults to 2026-01-01 UTC.
    """
    if ts is None:
        ts = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc)
    author = "gabriel"

    # The snapshot is written first so the commit references an existing record.
    snapshot_id = hash_snapshot(manifest)
    snapshot = SnapshotRecord(snapshot_id=snapshot_id, manifest=manifest, created_at=ts)
    write_snapshot(repo, snapshot)

    parents: list[str] = []
    if parent:
        parents.append(parent)
    commit_id = hash_commit(
        parent_ids=parents,
        snapshot_id=snapshot_id,
        message=msg,
        committed_at_iso=ts.isoformat(),
        author=author,
    )
    record = CommitRecord(
        commit_id=commit_id,
        branch=branch,
        snapshot_id=snapshot_id,
        message=msg,
        committed_at=ts,
        author=author,
        parent_commit_id=parent,
        parent2_commit_id=None,
    )
    write_commit(repo, record)

    # Advance the branch ref to the commit just written.
    ref_file = ref_path(repo, branch)
    ref_file.parent.mkdir(parents=True, exist_ok=True)
    ref_file.write_text(commit_id)
    return commit_id
| 86 | |
| 87 | |
def _make_patch(repo: pathlib.Path, tmp_path: pathlib.Path) -> pathlib.Path:
    """Run ``format-patch --output-dir`` against *repo* and return the single .mpatch produced.

    The patch is written under ``tmp_path / "patches"``. The helper asserts
    that the command exits with 0 and that exactly one ``*.mpatch`` file
    exists in that directory afterwards.
    """
    patch_dir = tmp_path / "patches"
    patch_dir.mkdir(exist_ok=True)

    argv = ["format-patch", "--output-dir", str(patch_dir)]
    result = runner.invoke(None, argv, env={"MUSE_REPO_ROOT": str(repo)})
    assert result.exit_code == 0, f"format-patch failed: {result.output}"

    produced = list(patch_dir.glob("*.mpatch"))
    assert len(produced) == 1
    return produced[0]
| 98 | |
| 99 | |
def _ap(repo: pathlib.Path, patch_file: pathlib.Path, *args: str) -> InvokeResult:
    """Invoke ``apply-patch`` on *patch_file* with ``MUSE_REPO_ROOT`` set to *repo*.

    Any extra *args* are appended to the command line after the patch path.
    """
    argv = ["apply-patch", str(patch_file)]
    argv.extend(args)
    environment = {"MUSE_REPO_ROOT": str(repo)}
    return runner.invoke(None, argv, env=environment)
| 103 | |
| 104 | |
| 105 | def _json(r: InvokeResult) -> Mapping[str, object]: |
| 106 | return json.loads(r.output) |
| 107 | |
| 108 | |
| 109 | # --------------------------------------------------------------------------- |
| 110 | # JSON output schema |
| 111 | # --------------------------------------------------------------------------- |
| 112 | |
| 113 | |
class TestJsonSchema:
    """Schema checks for the ``--json`` output of a successful ``apply-patch``."""

    @staticmethod
    def _apply_json(tmp_path: pathlib.Path) -> InvokeResult:
        """Apply a one-file patch with ``--json`` and return the raw result.

        A source repo with a single commit (``a.py`` = ``x = 1``) is exported
        via ``format-patch``; the patch is then applied to a target repo that
        holds a commit with the identical manifest. ``_commit`` already writes
        the snapshot record, so no separate ``write_snapshot`` call is needed.
        """
        src = _init_repo(tmp_path / "src")
        _commit(src, "c1", {"a.py": _write_object(src, b"x = 1\n")})
        patch = _make_patch(src, tmp_path)

        target = _init_repo(tmp_path / "target")
        _commit(target, "c1", {"a.py": _write_object(target, b"x = 1\n")})
        return _ap(target, patch, "--json")

    def test_exits_zero_on_success(self, tmp_path: pathlib.Path) -> None:
        result = self._apply_json(tmp_path)
        assert result.exit_code == 0, result.output

    def test_json_has_exit_code_zero(self, tmp_path: pathlib.Path) -> None:
        data = _json(self._apply_json(tmp_path))
        assert data["exit_code"] == 0

    def test_exit_code_is_int_not_bool(self, tmp_path: pathlib.Path) -> None:
        data = _json(self._apply_json(tmp_path))
        # ``type(...) is int`` is deliberate: isinstance() would accept bool.
        assert type(data["exit_code"]) is int

    def test_json_has_duration_ms(self, tmp_path: pathlib.Path) -> None:
        data = _json(self._apply_json(tmp_path))
        assert "duration_ms" in data

    def test_duration_ms_is_float(self, tmp_path: pathlib.Path) -> None:
        data = _json(self._apply_json(tmp_path))
        duration = data["duration_ms"]
        # Bind to a local so the isinstance check narrows the type used below.
        assert isinstance(duration, float)
        assert duration >= 0.0

    def test_json_output_is_compact(self, tmp_path: pathlib.Path) -> None:
        result = self._apply_json(tmp_path)
        # Compact JSON means the whole document fits on a single line.
        assert len(result.output.strip().splitlines()) == 1

    def test_json_has_patch_id(self, tmp_path: pathlib.Path) -> None:
        data = _json(self._apply_json(tmp_path))
        assert "patch_id" in data
        patch_id = data["patch_id"]
        # Fail with a clear assertion instead of AttributeError on a non-string.
        assert isinstance(patch_id, str)
        assert patch_id.startswith("sha256:")

    def test_json_has_files_applied(self, tmp_path: pathlib.Path) -> None:
        data = _json(self._apply_json(tmp_path))
        assert "files_applied" in data
        assert isinstance(data["files_applied"], list)
| 229 | |
| 230 | |
| 231 | # --------------------------------------------------------------------------- |
| 232 | # Files are restored to disk |
| 233 | # --------------------------------------------------------------------------- |
| 234 | |
| 235 | |
class TestFileRestoration:
    """Applying a patch must materialise its changes in the target working tree."""

    def test_added_file_exists_after_apply(self, tmp_path: pathlib.Path) -> None:
        source = _init_repo(tmp_path / "src")
        blob = _write_object(source, b"x = 1\n")
        _commit(source, "c1", {"a.py": blob})
        mpatch = _make_patch(source, tmp_path)

        dest = _init_repo(tmp_path / "target")
        _commit(dest, "empty", {})
        _ap(dest, mpatch)

        added = dest / "a.py"
        assert added.exists()

    def test_added_file_has_correct_content(self, tmp_path: pathlib.Path) -> None:
        source = _init_repo(tmp_path / "src")
        blob = _write_object(source, b"x = 42\n")
        _commit(source, "c1", {"a.py": blob})
        mpatch = _make_patch(source, tmp_path)

        dest = _init_repo(tmp_path / "target")
        _commit(dest, "empty", {})
        _ap(dest, mpatch)

        added = dest / "a.py"
        assert added.read_bytes() == b"x = 42\n"

    def test_deleted_file_removed_after_apply(self, tmp_path: pathlib.Path) -> None:
        # Source history: c1 holds old.py + keep.py, c2 drops old.py.
        source = _init_repo(tmp_path / "src")
        src_old = _write_object(source, b"x = 1\n")
        src_keep = _write_object(source, b"y = 2\n")
        first = _commit(source, "c1", {"old.py": src_old, "keep.py": src_keep})
        _commit(source, "c2", {"keep.py": src_keep}, parent=first)
        mpatch = _make_patch(source, tmp_path)

        # Target mirrors c1, both in its store and on disk.
        dest = _init_repo(tmp_path / "target")
        dest_old = _write_object(dest, b"x = 1\n")
        dest_keep = _write_object(dest, b"y = 2\n")
        _commit(dest, "c1", {"old.py": dest_old, "keep.py": dest_keep})
        old_file = dest / "old.py"
        keep_file = dest / "keep.py"
        old_file.write_bytes(b"x = 1\n")
        keep_file.write_bytes(b"y = 2\n")

        _ap(dest, mpatch)

        # The patch removes old.py and leaves keep.py alone.
        assert not old_file.exists()
        assert keep_file.exists()
| 278 | |
| 279 | |
| 280 | # --------------------------------------------------------------------------- |
| 281 | # --dry-run does not modify disk |
| 282 | # --------------------------------------------------------------------------- |
| 283 | |
| 284 | |
class TestDryRun:
    """``--dry-run`` must succeed and report without touching the working tree."""

    @staticmethod
    def _dry_run(tmp_path: pathlib.Path, *flags: str) -> tuple[pathlib.Path, InvokeResult]:
        """Run ``apply-patch --dry-run`` and return ``(target_repo, result)``.

        The patch adds ``a.py`` and comes from a one-commit source repo; the
        target repo holds a single commit with an empty manifest. Extra
        *flags* are appended after ``--dry-run``.
        """
        src = _init_repo(tmp_path / "src")
        _commit(src, "c1", {"a.py": _write_object(src, b"x = 1\n")})
        patch = _make_patch(src, tmp_path)

        target = _init_repo(tmp_path / "target")
        _commit(target, "empty", {})
        return target, _ap(target, patch, "--dry-run", *flags)

    def test_dry_run_exits_zero(self, tmp_path: pathlib.Path) -> None:
        _, result = self._dry_run(tmp_path)
        assert result.exit_code == 0

    def test_dry_run_does_not_write_files(self, tmp_path: pathlib.Path) -> None:
        target, result = self._dry_run(tmp_path)
        # Without this guard the test would pass vacuously whenever the
        # command fails early and therefore never reaches the write step.
        assert result.exit_code == 0, f"dry run failed: {result.output}"
        assert not (target / "a.py").exists()

    def test_dry_run_json_has_dry_run_true(self, tmp_path: pathlib.Path) -> None:
        _, result = self._dry_run(tmp_path, "--json")
        assert _json(result).get("dry_run") is True
| 318 | |
| 319 | |
| 320 | # --------------------------------------------------------------------------- |
| 321 | # --check (applicability only) |
| 322 | # --------------------------------------------------------------------------- |
| 323 | |
| 324 | |
class TestCheck:
    """``--check`` reports applicability only and must never write files."""

    @staticmethod
    def _check(tmp_path: pathlib.Path, *flags: str) -> tuple[pathlib.Path, InvokeResult]:
        """Run ``apply-patch --check`` and return ``(target_repo, result)``.

        The patch adds ``a.py`` and comes from a one-commit source repo; the
        target repo holds a single commit with an empty manifest. Extra
        *flags* are appended after ``--check``.
        """
        src = _init_repo(tmp_path / "src")
        _commit(src, "c1", {"a.py": _write_object(src, b"x = 1\n")})
        patch = _make_patch(src, tmp_path)

        target = _init_repo(tmp_path / "target")
        _commit(target, "empty", {})
        return target, _ap(target, patch, "--check", *flags)

    def test_check_exits_zero_when_applicable(self, tmp_path: pathlib.Path) -> None:
        _, result = self._check(tmp_path)
        assert result.exit_code == 0

    def test_check_does_not_write_files(self, tmp_path: pathlib.Path) -> None:
        target, result = self._check(tmp_path)
        # Without this guard the test would pass vacuously whenever the
        # command fails early and therefore never reaches the write step.
        assert result.exit_code == 0, f"check failed: {result.output}"
        assert not (target / "a.py").exists()

    def test_check_json_has_applicable_field(self, tmp_path: pathlib.Path) -> None:
        _, result = self._check(tmp_path, "--json")
        data = _json(result)
        assert "applicable" in data
        assert isinstance(data["applicable"], bool)
| 359 | |
| 360 | |
| 361 | # --------------------------------------------------------------------------- |
| 362 | # Integrity verification |
| 363 | # --------------------------------------------------------------------------- |
| 364 | |
| 365 | |
class TestIntegrity:
    """Malformed, missing or tampered patch files must make ``apply-patch`` fail."""

    def test_tampered_patch_id_rejected(self, tmp_path: pathlib.Path) -> None:
        source = _init_repo(tmp_path / "src")
        blob = _write_object(source, b"x = 1\n")
        _commit(source, "c1", {"a.py": blob})
        mpatch = _make_patch(source, tmp_path)

        # Rewrite the patch on disk with a patch_id that no longer matches.
        document = json.loads(mpatch.read_bytes())
        document["patch_id"] = fake_id("tampered-patch")
        tampered = json.dumps(document).encode()
        mpatch.write_bytes(tampered)

        dest = _init_repo(tmp_path / "target")
        _commit(dest, "empty", {})
        outcome = _ap(dest, mpatch)
        assert outcome.exit_code != 0

    def test_missing_patch_file_exits_nonzero(self, tmp_path: pathlib.Path) -> None:
        repo = _init_repo(tmp_path)
        blob = _write_object(repo, b"x = 1\n")
        _commit(repo, "init", {"a.py": blob})

        absent = tmp_path / "nonexistent.mpatch"
        outcome = _ap(repo, absent)
        assert outcome.exit_code != 0

    def test_corrupt_json_exits_nonzero(self, tmp_path: pathlib.Path) -> None:
        repo = _init_repo(tmp_path)
        blob = _write_object(repo, b"x = 1\n")
        _commit(repo, "init", {"a.py": blob})

        garbage = tmp_path / "bad.mpatch"
        garbage.write_bytes(b"not valid json!!!")
        outcome = _ap(repo, garbage)
        assert outcome.exit_code != 0
| 398 | |
| 399 | |
| 400 | import argparse as _argparse |
| 401 | |
| 402 | |
class TestRegisterFlags:
    """Argument-parsing checks for the flags wired up by ``apply_patch.register``."""

    def _parse(self, *args: str) -> _argparse.Namespace:
        """Parse ``apply-patch`` followed by *args* with a freshly registered parser."""
        from muse.cli.commands.apply_patch import register

        parser = _argparse.ArgumentParser()
        subcommands = parser.add_subparsers()
        register(subcommands)
        argv = ["apply-patch"]
        argv.extend(args)
        return parser.parse_args(argv)

    def test_default_json_out_is_false(self) -> None:
        assert self._parse("dummy.mpatch").json_out is False

    def test_json_flag_sets_json_out(self) -> None:
        assert self._parse("--json", "dummy.mpatch").json_out is True

    def test_j_shorthand_sets_json_out(self) -> None:
        assert self._parse("-j", "dummy.mpatch").json_out is True

    def test_dry_run_default(self) -> None:
        assert self._parse("dummy.mpatch").dry_run is False

    def test_dry_run_flag(self) -> None:
        assert self._parse("--dry-run", "dummy.mpatch").dry_run is True

    def test_dry_run_n_shorthand(self) -> None:
        assert self._parse("-n", "dummy.mpatch").dry_run is True

    def test_force_default(self) -> None:
        assert self._parse("dummy.mpatch").force is False

    def test_force_flag(self) -> None:
        assert self._parse("--force", "dummy.mpatch").force is True

    def test_force_f_shorthand(self) -> None:
        assert self._parse("-f", "dummy.mpatch").force is True