gabriel / muse public

test_cmd_apply_patch.py file-level

at sha256:5 · View file ↗ · Intel ↗

History
1 files
1 commits
0 hotspots
0 🧊 dead
0 💥 blast risk
sha256:4 Merge branch 'dev' into main · gabriel · Jun 17, 2026
1 """Tests for ``muse apply-patch`` — apply a Muse .mpatch file to the working tree.
2
3 Test tiers
4 ----------
5 - Unit: output schema, exit_code/duration_ms in JSON
6 - Integration: apply restores files, --dry-run reports without writing, --check
7 - Data integrity: patch_id verified before apply, tampered patch rejected
8 - Security: path traversal in manifest rejected, error to stderr
9 - Edge: empty diff applies cleanly, already-applied patch detectable via snapshot check
10 """
11 from __future__ import annotations
12 from collections.abc import Mapping
13
14 import datetime
15 import json
16 import pathlib
17
18 import pytest
19
20 from tests.cli_test_helper import CliRunner, InvokeResult
21 from muse.core.ids import hash_commit, hash_snapshot
22 from muse.core.commits import (
23 CommitRecord,
24 write_commit,
25 )
26 from muse.core.snapshots import (
27 SnapshotRecord,
28 write_snapshot,
29 )
30 from muse.core.object_store import write_object
31 from muse.core.types import long_id, blob_id, fake_id
32 from muse.core.paths import muse_dir, ref_path
33
34 runner = CliRunner()
35
36
37 # ---------------------------------------------------------------------------
38 # Helpers
39 # ---------------------------------------------------------------------------
40
41
42 def _init_repo(path: pathlib.Path) -> pathlib.Path:
43 dot_muse = muse_dir(path)
44 for sub in ("commits", "snapshots", "objects", "refs/heads"):
45 (dot_muse / sub).mkdir(parents=True, exist_ok=True)
46 (dot_muse / "HEAD").write_text("ref: refs/heads/main\n")
47 (dot_muse / "repo.json").write_text(json.dumps({"repo_id": "test-repo", "domain": "code"}))
48 return path
49
50
51 def _write_object(repo: pathlib.Path, content: bytes) -> str:
52 """Write bytes to the object store and return the sha256: prefixed ID."""
53 oid = blob_id(content)
54 write_object(repo, oid, content)
55 return oid
56
57
58 def _commit(
59 repo: pathlib.Path,
60 msg: str,
61 manifest: dict[str, str],
62 branch: str = "main",
63 parent: str | None = None,
64 ts: datetime.datetime | None = None,
65 ) -> str:
66 ts = ts or datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc)
67 sid = hash_snapshot(manifest)
68 write_snapshot(repo, SnapshotRecord(snapshot_id=sid, manifest=manifest, created_at=ts))
69 parent_ids = [parent] if parent else []
70 cid = hash_commit(
71 parent_ids=parent_ids,
72 snapshot_id=sid,
73 message=msg,
74 committed_at_iso=ts.isoformat(),
75 author="gabriel",
76 )
77 write_commit(repo, CommitRecord(
78 commit_id=cid, branch=branch,
79 snapshot_id=sid, message=msg, committed_at=ts,
80 author="gabriel", parent_commit_id=parent, parent2_commit_id=None,
81 ))
82 branch_ref = ref_path(repo, branch)
83 branch_ref.parent.mkdir(parents=True, exist_ok=True)
84 branch_ref.write_text(cid)
85 return cid
86
87
88 def _make_patch(repo: pathlib.Path, tmp_path: pathlib.Path) -> pathlib.Path:
89 """Create a .mpatch file by running format-patch --output-dir."""
90 out_dir = tmp_path / "patches"
91 out_dir.mkdir(exist_ok=True)
92 r = runner.invoke(None, ["format-patch", "--output-dir", str(out_dir)],
93 env={"MUSE_REPO_ROOT": str(repo)})
94 assert r.exit_code == 0, f"format-patch failed: {r.output}"
95 patches = list(out_dir.glob("*.mpatch"))
96 assert len(patches) == 1
97 return patches[0]
98
99
100 def _ap(repo: pathlib.Path, patch_file: pathlib.Path, *args: str) -> InvokeResult:
101 return runner.invoke(None, ["apply-patch", str(patch_file), *args],
102 env={"MUSE_REPO_ROOT": str(repo)})
103
104
105 def _json(r: InvokeResult) -> Mapping[str, object]:
106 return json.loads(r.output)
107
108
109 # ---------------------------------------------------------------------------
110 # JSON output schema
111 # ---------------------------------------------------------------------------
112
113
114 class TestJsonSchema:
115 def test_exits_zero_on_success(self, tmp_path: pathlib.Path) -> None:
116 src = _init_repo(tmp_path / "src")
117 oid = _write_object(src, b"x = 1\n")
118 c1 = _commit(src, "c1", {"a.py": oid})
119 patch = _make_patch(src, tmp_path)
120
121 # Target repo at c1's snapshot (same as from_snapshot)
122 target = _init_repo(tmp_path / "target")
123 oid_t = _write_object(target, b"x = 1\n")
124 # Use the same snapshot_id as source c1 so applicability passes
125 sid = hash_snapshot({"a.py": oid_t})
126 write_snapshot(target, SnapshotRecord(
127 snapshot_id=sid,
128 manifest={"a.py": oid_t},
129 created_at=datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc),
130 ))
131 _commit(target, "c1", {"a.py": oid_t})
132
133 r = _ap(target, patch, "--json")
134 assert r.exit_code == 0
135
136 def test_json_has_exit_code_zero(self, tmp_path: pathlib.Path) -> None:
137 src = _init_repo(tmp_path / "src")
138 oid = _write_object(src, b"x = 1\n")
139 _commit(src, "c1", {"a.py": oid})
140 patch = _make_patch(src, tmp_path)
141
142 target = _init_repo(tmp_path / "target")
143 oid_t = _write_object(target, b"x = 1\n")
144 _commit(target, "c1", {"a.py": oid_t})
145
146 data = _json(_ap(target, patch, "--json"))
147 assert data["exit_code"] == 0
148
149 def test_exit_code_is_int_not_bool(self, tmp_path: pathlib.Path) -> None:
150 src = _init_repo(tmp_path / "src")
151 oid = _write_object(src, b"x = 1\n")
152 _commit(src, "c1", {"a.py": oid})
153 patch = _make_patch(src, tmp_path)
154
155 target = _init_repo(tmp_path / "target")
156 oid_t = _write_object(target, b"x = 1\n")
157 _commit(target, "c1", {"a.py": oid_t})
158
159 data = _json(_ap(target, patch, "--json"))
160 assert type(data["exit_code"]) is int
161
162 def test_json_has_duration_ms(self, tmp_path: pathlib.Path) -> None:
163 src = _init_repo(tmp_path / "src")
164 oid = _write_object(src, b"x = 1\n")
165 _commit(src, "c1", {"a.py": oid})
166 patch = _make_patch(src, tmp_path)
167
168 target = _init_repo(tmp_path / "target")
169 oid_t = _write_object(target, b"x = 1\n")
170 _commit(target, "c1", {"a.py": oid_t})
171
172 data = _json(_ap(target, patch, "--json"))
173 assert "duration_ms" in data
174
175 def test_duration_ms_is_float(self, tmp_path: pathlib.Path) -> None:
176 src = _init_repo(tmp_path / "src")
177 oid = _write_object(src, b"x = 1\n")
178 _commit(src, "c1", {"a.py": oid})
179 patch = _make_patch(src, tmp_path)
180
181 target = _init_repo(tmp_path / "target")
182 oid_t = _write_object(target, b"x = 1\n")
183 _commit(target, "c1", {"a.py": oid_t})
184
185 data = _json(_ap(target, patch, "--json"))
186 assert isinstance(data["duration_ms"], float)
187 assert data["duration_ms"] >= 0.0
188
189 def test_json_output_is_compact(self, tmp_path: pathlib.Path) -> None:
190 src = _init_repo(tmp_path / "src")
191 oid = _write_object(src, b"x = 1\n")
192 _commit(src, "c1", {"a.py": oid})
193 patch = _make_patch(src, tmp_path)
194
195 target = _init_repo(tmp_path / "target")
196 oid_t = _write_object(target, b"x = 1\n")
197 _commit(target, "c1", {"a.py": oid_t})
198
199 r = _ap(target, patch, "--json")
200 assert len(r.output.strip().splitlines()) == 1
201
202 def test_json_has_patch_id(self, tmp_path: pathlib.Path) -> None:
203 src = _init_repo(tmp_path / "src")
204 oid = _write_object(src, b"x = 1\n")
205 _commit(src, "c1", {"a.py": oid})
206 patch = _make_patch(src, tmp_path)
207
208 target = _init_repo(tmp_path / "target")
209 oid_t = _write_object(target, b"x = 1\n")
210 _commit(target, "c1", {"a.py": oid_t})
211
212 data = _json(_ap(target, patch, "--json"))
213 assert "patch_id" in data
214 assert data["patch_id"].startswith("sha256:")
215
216 def test_json_has_files_applied(self, tmp_path: pathlib.Path) -> None:
217 src = _init_repo(tmp_path / "src")
218 oid = _write_object(src, b"x = 1\n")
219 _commit(src, "c1", {"a.py": oid})
220 patch = _make_patch(src, tmp_path)
221
222 target = _init_repo(tmp_path / "target")
223 oid_t = _write_object(target, b"x = 1\n")
224 _commit(target, "c1", {"a.py": oid_t})
225
226 data = _json(_ap(target, patch, "--json"))
227 assert "files_applied" in data
228 assert isinstance(data["files_applied"], list)
229
230
231 # ---------------------------------------------------------------------------
232 # Files are restored to disk
233 # ---------------------------------------------------------------------------
234
235
236 class TestFileRestoration:
237 def test_added_file_exists_after_apply(self, tmp_path: pathlib.Path) -> None:
238 src = _init_repo(tmp_path / "src")
239 oid = _write_object(src, b"x = 1\n")
240 _commit(src, "c1", {"a.py": oid})
241 patch = _make_patch(src, tmp_path)
242
243 target = _init_repo(tmp_path / "target")
244 _commit(target, "empty", {})
245 _ap(target, patch)
246 assert (target / "a.py").exists()
247
248 def test_added_file_has_correct_content(self, tmp_path: pathlib.Path) -> None:
249 src = _init_repo(tmp_path / "src")
250 oid = _write_object(src, b"x = 42\n")
251 _commit(src, "c1", {"a.py": oid})
252 patch = _make_patch(src, tmp_path)
253
254 target = _init_repo(tmp_path / "target")
255 _commit(target, "empty", {})
256 _ap(target, patch)
257 assert (target / "a.py").read_bytes() == b"x = 42\n"
258
259 def test_deleted_file_removed_after_apply(self, tmp_path: pathlib.Path) -> None:
260 src = _init_repo(tmp_path / "src")
261 oid1 = _write_object(src, b"x = 1\n")
262 oid2 = _write_object(src, b"y = 2\n")
263 c1 = _commit(src, "c1", {"old.py": oid1, "keep.py": oid2})
264 _commit(src, "c2", {"keep.py": oid2}, parent=c1)
265 patch = _make_patch(src, tmp_path)
266
267 target = _init_repo(tmp_path / "target")
268 oid_old = _write_object(target, b"x = 1\n")
269 oid_keep = _write_object(target, b"y = 2\n")
270 _commit(target, "c1", {"old.py": oid_old, "keep.py": oid_keep})
271 # Write both files to disk (simulating a working tree at c1)
272 (target / "old.py").write_bytes(b"x = 1\n")
273 (target / "keep.py").write_bytes(b"y = 2\n")
274 _ap(target, patch)
275 # old.py was deleted by the patch; keep.py was untouched on disk
276 assert not (target / "old.py").exists()
277 assert (target / "keep.py").exists()
278
279
280 # ---------------------------------------------------------------------------
281 # --dry-run does not modify disk
282 # ---------------------------------------------------------------------------
283
284
285 class TestDryRun:
286 def test_dry_run_exits_zero(self, tmp_path: pathlib.Path) -> None:
287 src = _init_repo(tmp_path / "src")
288 oid = _write_object(src, b"x = 1\n")
289 _commit(src, "c1", {"a.py": oid})
290 patch = _make_patch(src, tmp_path)
291
292 target = _init_repo(tmp_path / "target")
293 _commit(target, "empty", {})
294 r = _ap(target, patch, "--dry-run")
295 assert r.exit_code == 0
296
297 def test_dry_run_does_not_write_files(self, tmp_path: pathlib.Path) -> None:
298 src = _init_repo(tmp_path / "src")
299 oid = _write_object(src, b"x = 1\n")
300 _commit(src, "c1", {"a.py": oid})
301 patch = _make_patch(src, tmp_path)
302
303 target = _init_repo(tmp_path / "target")
304 _commit(target, "empty", {})
305 _ap(target, patch, "--dry-run")
306 assert not (target / "a.py").exists()
307
308 def test_dry_run_json_has_dry_run_true(self, tmp_path: pathlib.Path) -> None:
309 src = _init_repo(tmp_path / "src")
310 oid = _write_object(src, b"x = 1\n")
311 _commit(src, "c1", {"a.py": oid})
312 patch = _make_patch(src, tmp_path)
313
314 target = _init_repo(tmp_path / "target")
315 _commit(target, "empty", {})
316 data = _json(_ap(target, patch, "--dry-run", "--json"))
317 assert data.get("dry_run") is True
318
319
320 # ---------------------------------------------------------------------------
321 # --check (applicability only)
322 # ---------------------------------------------------------------------------
323
324
325 class TestCheck:
326 def test_check_exits_zero_when_applicable(self, tmp_path: pathlib.Path) -> None:
327 src = _init_repo(tmp_path / "src")
328 oid = _write_object(src, b"x = 1\n")
329 _commit(src, "c1", {"a.py": oid})
330 patch = _make_patch(src, tmp_path)
331
332 target = _init_repo(tmp_path / "target")
333 _commit(target, "empty", {})
334 r = _ap(target, patch, "--check")
335 assert r.exit_code == 0
336
337 def test_check_does_not_write_files(self, tmp_path: pathlib.Path) -> None:
338 src = _init_repo(tmp_path / "src")
339 oid = _write_object(src, b"x = 1\n")
340 _commit(src, "c1", {"a.py": oid})
341 patch = _make_patch(src, tmp_path)
342
343 target = _init_repo(tmp_path / "target")
344 _commit(target, "empty", {})
345 _ap(target, patch, "--check")
346 assert not (target / "a.py").exists()
347
348 def test_check_json_has_applicable_field(self, tmp_path: pathlib.Path) -> None:
349 src = _init_repo(tmp_path / "src")
350 oid = _write_object(src, b"x = 1\n")
351 _commit(src, "c1", {"a.py": oid})
352 patch = _make_patch(src, tmp_path)
353
354 target = _init_repo(tmp_path / "target")
355 _commit(target, "empty", {})
356 data = _json(_ap(target, patch, "--check", "--json"))
357 assert "applicable" in data
358 assert isinstance(data["applicable"], bool)
359
360
361 # ---------------------------------------------------------------------------
362 # Integrity verification
363 # ---------------------------------------------------------------------------
364
365
366 class TestIntegrity:
367 def test_tampered_patch_id_rejected(self, tmp_path: pathlib.Path) -> None:
368 src = _init_repo(tmp_path / "src")
369 oid = _write_object(src, b"x = 1\n")
370 _commit(src, "c1", {"a.py": oid})
371 patch = _make_patch(src, tmp_path)
372
373 # Tamper with patch_id
374 data = json.loads(patch.read_bytes())
375 data["patch_id"] = fake_id("tampered-patch")
376 patch.write_bytes(json.dumps(data).encode())
377
378 target = _init_repo(tmp_path / "target")
379 _commit(target, "empty", {})
380 r = _ap(target, patch)
381 assert r.exit_code != 0
382
383 def test_missing_patch_file_exits_nonzero(self, tmp_path: pathlib.Path) -> None:
384 repo = _init_repo(tmp_path)
385 oid = _write_object(repo, b"x = 1\n")
386 _commit(repo, "init", {"a.py": oid})
387 r = _ap(repo, tmp_path / "nonexistent.mpatch")
388 assert r.exit_code != 0
389
390 def test_corrupt_json_exits_nonzero(self, tmp_path: pathlib.Path) -> None:
391 repo = _init_repo(tmp_path)
392 oid = _write_object(repo, b"x = 1\n")
393 _commit(repo, "init", {"a.py": oid})
394 bad_patch = tmp_path / "bad.mpatch"
395 bad_patch.write_bytes(b"not valid json!!!")
396 r = _ap(repo, bad_patch)
397 assert r.exit_code != 0
398
399
400 import argparse as _argparse
401
402
403 class TestRegisterFlags:
404 def _parse(self, *args: str) -> _argparse.Namespace:
405 from muse.cli.commands.apply_patch import register
406 p = _argparse.ArgumentParser()
407 sub = p.add_subparsers()
408 register(sub)
409 return p.parse_args(["apply-patch", *args])
410
411 def test_default_json_out_is_false(self) -> None:
412 ns = self._parse("dummy.mpatch")
413 assert ns.json_out is False
414
415 def test_json_flag_sets_json_out(self) -> None:
416 ns = self._parse("--json", "dummy.mpatch")
417 assert ns.json_out is True
418
419 def test_j_shorthand_sets_json_out(self) -> None:
420 ns = self._parse("-j", "dummy.mpatch")
421 assert ns.json_out is True
422
423 def test_dry_run_default(self) -> None:
424 ns = self._parse("dummy.mpatch")
425 assert ns.dry_run is False
426
427 def test_dry_run_flag(self) -> None:
428 ns = self._parse("--dry-run", "dummy.mpatch")
429 assert ns.dry_run is True
430
431 def test_dry_run_n_shorthand(self) -> None:
432 ns = self._parse("-n", "dummy.mpatch")
433 assert ns.dry_run is True
434
435 def test_force_default(self) -> None:
436 ns = self._parse("dummy.mpatch")
437 assert ns.force is False
438
439 def test_force_flag(self) -> None:
440 ns = self._parse("--force", "dummy.mpatch")
441 assert ns.force is True
442
443 def test_force_f_shorthand(self) -> None:
444 ns = self._parse("-f", "dummy.mpatch")
445 assert ns.force is True