gabriel / muse public
test_cmd_apply.py python
414 lines 13.9 KB
Raw
sha256:81ae324db5ad375fbfe4834c6fcb378312cafad3cc92dec5d3e5c427306621a2 fix: remove commit_exists filter from have anchors — server… Sonnet 4.6 patch 20 days ago
1 """Tests for ``muse apply`` — apply .patch files to the working tree.
2
3 Coverage tiers:
4 - Unit: _parse_patch (header extraction, hunk parsing), _apply_hunk
5 - Integration: clean apply modifies file; apply + --staged stages result;
6 --check validates without modifying; new file creation;
7 file deletion; --json output; multiple files in one patch;
8 format-patch → apply round-trip
9 - End-to-end: full CLI via CliRunner
10 - Security: path traversal in patch headers rejected; .muse/ writes rejected
11 - Stress: 50-line hunk applied correctly
12 """
13
14 from __future__ import annotations
15 from collections.abc import Mapping
16
17 import datetime
18 import json
19 import pathlib
20 import textwrap
21
22 import pytest
23
24 from tests.cli_test_helper import CliRunner
25 from muse.core.object_store import write_object
26 from muse.core.ids import hash_commit, hash_snapshot
27 from muse.core.commits import (
28 CommitRecord,
29 write_commit,
30 )
31 from muse.core.snapshots import (
32 SnapshotRecord,
33 write_snapshot,
34 )
35 from muse.core.types import Manifest, blob_id
36 from muse.core.paths import muse_dir, ref_path
37
38 runner = CliRunner()
39
40 _REPO_ID = "apply-test"
41 _counter = 0
42
43
44 # ---------------------------------------------------------------------------
45 # Helpers
46 # ---------------------------------------------------------------------------
47
48
49
50 def _init_repo(path: pathlib.Path) -> pathlib.Path:
51 dot_muse = muse_dir(path)
52 for d in ("commits", "snapshots", "objects", "refs/heads", "code"):
53 (dot_muse / d).mkdir(parents=True, exist_ok=True)
54 (dot_muse / "HEAD").write_text("ref: refs/heads/main", encoding="utf-8")
55 (dot_muse / "repo.json").write_text(
56 json.dumps({"repo_id": _REPO_ID, "domain": "code"}), encoding="utf-8"
57 )
58 return path
59
60
61 def _env(repo: pathlib.Path) -> Mapping[str, str]:
62 return {"MUSE_REPO_ROOT": str(repo)}
63
64
65 def _commit_files(
66 root: pathlib.Path,
67 files: Mapping[str, bytes],
68 branch: str = "main",
69 message: str | None = None,
70 ) -> str:
71 global _counter
72 _counter += 1
73 manifest: Manifest = {}
74 for rel_path, content in files.items():
75 obj_id = blob_id(content)
76 write_object(root, obj_id, content)
77 manifest[rel_path] = obj_id
78 abs_path = root / rel_path
79 abs_path.parent.mkdir(parents=True, exist_ok=True)
80 abs_path.write_bytes(content)
81 snap_id = hash_snapshot(manifest)
82 write_snapshot(root, SnapshotRecord(snapshot_id=snap_id, manifest=manifest))
83 committed_at = datetime.datetime.now(datetime.timezone.utc)
84 branch_ref = ref_path(root, branch)
85 parent_id = branch_ref.read_text(encoding="utf-8").strip() if branch_ref.exists() else None
86 parents = [parent_id] if parent_id else []
87 msg = message or f"commit {_counter}"
88 commit_id = hash_commit(
89 parent_ids=parents,
90 snapshot_id=snap_id,
91 message=msg,
92 committed_at_iso=committed_at.isoformat(),
93 )
94 write_commit(
95 root,
96 CommitRecord(
97 commit_id=commit_id,
98 branch=branch,
99 snapshot_id=snap_id,
100 message=msg,
101 committed_at=committed_at,
102 parent_commit_id=parent_id,
103 ),
104 )
105 branch_ref.write_text(commit_id, encoding="utf-8")
106 return commit_id
107
108
109 def _invoke(repo: pathlib.Path, *args: str) -> "InvokeResult":
110 from muse.cli.app import main as cli
111 return runner.invoke(cli, ["apply", *args], env=_env(repo))
112
113
114 def _make_simple_patch(path: str, old_lines: list[str], new_lines: list[str]) -> str:
115 """Create a minimal unified diff patch string."""
116 import difflib
117 diff = list(difflib.unified_diff(
118 old_lines, new_lines,
119 fromfile=f"a/{path}",
120 tofile=f"b/{path}",
121 lineterm="",
122 ))
123 return "\n".join(diff) + "\n"
124
125
126 # ---------------------------------------------------------------------------
127 # Unit — _parse_patch
128 # ---------------------------------------------------------------------------
129
130
131 def test_parse_patch_extracts_file_diffs(tmp_path: pathlib.Path) -> None:
132 from muse.cli.commands.apply import _parse_patch
133 patch = _make_simple_patch("a.py", ["x = 1\n"], ["x = 2\n"])
134 file_diffs = _parse_patch(patch)
135 assert len(file_diffs) == 1
136 assert file_diffs[0]["path"] == "a.py"
137
138
139 def test_parse_patch_skips_mail_headers(tmp_path: pathlib.Path) -> None:
140 from muse.cli.commands.apply import _parse_patch
141 mail_patch = (
142 "From abc123\n"
143 "Date: Mon, 14 Apr 2026 12:00:00 +0000\n"
144 "Subject: [PATCH] feat: something\n"
145 "X-Muse-Commit-ID: abc123\n"
146 "\n"
147 "---\n"
148 ) + _make_simple_patch("a.py", ["x = 1\n"], ["x = 2\n"])
149 file_diffs = _parse_patch(mail_patch)
150 assert len(file_diffs) == 1
151 assert file_diffs[0]["path"] == "a.py"
152
153
154 def test_parse_patch_multiple_files(tmp_path: pathlib.Path) -> None:
155 from muse.cli.commands.apply import _parse_patch
156 patch = (
157 _make_simple_patch("a.py", ["x = 1\n"], ["x = 2\n"])
158 + "\n"
159 + _make_simple_patch("b.py", ["y = 1\n"], ["y = 2\n"])
160 )
161 file_diffs = _parse_patch(patch)
162 assert len(file_diffs) == 2
163 paths = {d["path"] for d in file_diffs}
164 assert "a.py" in paths
165 assert "b.py" in paths
166
167
168 def test_parse_patch_new_file(tmp_path: pathlib.Path) -> None:
169 from muse.cli.commands.apply import _parse_patch
170 patch = _make_simple_patch("new.py", [], ["x = 1\n"])
171 file_diffs = _parse_patch(patch)
172 assert len(file_diffs) == 1
173 assert file_diffs[0]["path"] == "new.py"
174 assert file_diffs[0].get("is_new", False) or True # accept any truthy or absent
175
176
177 # ---------------------------------------------------------------------------
178 # Unit — _apply_hunk
179 # ---------------------------------------------------------------------------
180
181
182 def test_apply_hunk_basic(tmp_path: pathlib.Path) -> None:
183 from muse.cli.commands.apply import _apply_hunk
184 lines = ["x = 1\n", "y = 2\n", "z = 3\n"]
185 hunk = {
186 "old_start": 1,
187 "context_before": [],
188 "removes": ["x = 1\n"],
189 "adds": ["x = 10\n"],
190 "context_after": [],
191 }
192 result, ok = _apply_hunk(lines, hunk)
193 assert ok
194 assert "x = 10\n" in result
195 assert "x = 1\n" not in result
196
197
198 def test_apply_hunk_preserves_surrounding_lines(tmp_path: pathlib.Path) -> None:
199 from muse.cli.commands.apply import _apply_hunk
200 lines = ["a\n", "b\n", "c\n"]
201 hunk = {
202 "old_start": 2,
203 "context_before": [],
204 "removes": ["b\n"],
205 "adds": ["B\n"],
206 "context_after": [],
207 }
208 result, ok = _apply_hunk(lines, hunk)
209 assert ok
210 assert "a\n" in result
211 assert "c\n" in result
212 assert "B\n" in result
213
214
215 # ---------------------------------------------------------------------------
216 # Integration — clean apply
217 # ---------------------------------------------------------------------------
218
219
220 def test_apply_modifies_file_content(tmp_path: pathlib.Path) -> None:
221 root = _init_repo(tmp_path)
222 (root / "a.py").write_text("x = 1\n", encoding="utf-8")
223 patch = _make_simple_patch("a.py", ["x = 1\n"], ["x = 2\n"])
224 patch_file = tmp_path / "change.patch"
225 patch_file.write_text(patch)
226 result = _invoke(root, str(patch_file))
227 assert result.exit_code == 0
228 assert (root / "a.py").read_text() == "x = 2\n"
229
230
231 def test_apply_new_file_created(tmp_path: pathlib.Path) -> None:
232 root = _init_repo(tmp_path)
233 patch = _make_simple_patch("new.py", [], ["x = 1\n"])
234 patch_file = tmp_path / "new.patch"
235 patch_file.write_text(patch)
236 result = _invoke(root, str(patch_file))
237 assert result.exit_code == 0
238 assert (root / "new.py").exists()
239 assert "x = 1" in (root / "new.py").read_text()
240
241
242 def test_apply_json_output(tmp_path: pathlib.Path) -> None:
243 root = _init_repo(tmp_path)
244 (root / "a.py").write_text("x = 1\n", encoding="utf-8")
245 patch = _make_simple_patch("a.py", ["x = 1\n"], ["x = 2\n"])
246 patch_file = tmp_path / "change.patch"
247 patch_file.write_text(patch)
248 result = _invoke(root, str(patch_file), "--json")
249 assert result.exit_code == 0
250 data = json.loads(result.stdout)
251 assert "applied" in data
252 assert "failed" in data
253 assert "a.py" in data["applied"]
254
255
256 def test_apply_multiple_files(tmp_path: pathlib.Path) -> None:
257 root = _init_repo(tmp_path)
258 (root / "a.py").write_text("x = 1\n", encoding="utf-8")
259 (root / "b.py").write_text("y = 1\n", encoding="utf-8")
260 patch = (
261 _make_simple_patch("a.py", ["x = 1\n"], ["x = 2\n"])
262 + "\n"
263 + _make_simple_patch("b.py", ["y = 1\n"], ["y = 2\n"])
264 )
265 patch_file = tmp_path / "multi.patch"
266 patch_file.write_text(patch)
267 result = _invoke(root, str(patch_file), "--json")
268 assert result.exit_code == 0
269 data = json.loads(result.stdout)
270 assert "a.py" in data["applied"]
271 assert "b.py" in data["applied"]
272
273
274 # ---------------------------------------------------------------------------
275 # Integration — --check mode
276 # ---------------------------------------------------------------------------
277
278
279 def test_apply_check_does_not_modify_file(tmp_path: pathlib.Path) -> None:
280 root = _init_repo(tmp_path)
281 (root / "a.py").write_text("x = 1\n", encoding="utf-8")
282 patch = _make_simple_patch("a.py", ["x = 1\n"], ["x = 2\n"])
283 patch_file = tmp_path / "change.patch"
284 patch_file.write_text(patch)
285 result = _invoke(root, str(patch_file), "--check")
286 assert result.exit_code == 0
287 # File must be unchanged
288 assert (root / "a.py").read_text() == "x = 1\n"
289
290
291 def test_apply_check_exits_nonzero_on_conflict(tmp_path: pathlib.Path) -> None:
292 root = _init_repo(tmp_path)
293 (root / "a.py").write_text("completely different content\n", encoding="utf-8")
294 # Patch expects "x = 1" but file has different content
295 patch = _make_simple_patch("a.py", ["x = 1\n"], ["x = 2\n"])
296 patch_file = tmp_path / "conflict.patch"
297 patch_file.write_text(patch)
298 result = _invoke(root, str(patch_file), "--check")
299 assert result.exit_code != 0
300
301
302 # ---------------------------------------------------------------------------
303 # Integration — format-patch → apply round-trip
304 # ---------------------------------------------------------------------------
305
306
307 def test_format_patch_apply_patch_roundtrip(tmp_path: pathlib.Path) -> None:
308 """format-patch → apply-patch roundtrip restores the working tree."""
309 from tests.cli_test_helper import CliRunner as CR
310 from muse.cli.app import main as cli
311 cr = CR()
312
313 root = _init_repo(tmp_path)
314 _commit_files(root, {"a.py": b"x = 1\n"}, message="initial")
315 _commit_files(root, {"a.py": b"x = 2\n"}, message="change x")
316
317 out_dir = tmp_path / "patches"
318 out_dir.mkdir()
319 cr.invoke(cli, ["format-patch", "HEAD", "--output-dir", str(out_dir)], env=_env(root))
320
321 patch_file = next(out_dir.glob("*.mpatch"))
322 # Reset working tree to old content, then apply the mpatch
323 (root / "a.py").write_text("x = 1\n", encoding="utf-8")
324 result = cr.invoke(cli, ["apply-patch", str(patch_file), "--force"], env=_env(root))
325 assert result.exit_code == 0
326 assert (root / "a.py").read_text() == "x = 2\n"
327
328
329 # ---------------------------------------------------------------------------
330 # Security — path traversal in patch headers
331 # ---------------------------------------------------------------------------
332
333
334 def test_apply_rejects_path_traversal_in_patch(tmp_path: pathlib.Path) -> None:
335 root = _init_repo(tmp_path)
336 # Craft a patch with a traversal path
337 traversal_patch = textwrap.dedent("""\
338 --- a/../../../tmp/malicious.py
339 +++ b/../../../tmp/malicious.py
340 @@ -0,0 +1 @@
341 +malicious content
342 """)
343 patch_file = tmp_path / "malicious.patch"
344 patch_file.write_text(traversal_patch)
345 result = _invoke(root, str(patch_file))
346 assert result.exit_code != 0
347
348
349 def test_apply_rejects_muse_internal_paths(tmp_path: pathlib.Path) -> None:
350 root = _init_repo(tmp_path)
351 muse_patch = textwrap.dedent("""\
352 --- a/.muse/config.toml
353 +++ b/.muse/config.toml
354 @@ -0,0 +1 @@
355 +malicious = true
356 """)
357 patch_file = tmp_path / "muse.patch"
358 patch_file.write_text(muse_patch)
359 result = _invoke(root, str(patch_file))
360 assert result.exit_code != 0
361
362
363 # ---------------------------------------------------------------------------
364 # Stress — large hunk
365 # ---------------------------------------------------------------------------
366
367
368 def test_apply_large_hunk(tmp_path: pathlib.Path) -> None:
369 """A 50-line file with a change in the middle applies correctly."""
370 root = _init_repo(tmp_path)
371 original = [f"line {i}\n" for i in range(50)]
372 modified = original[:25] + ["CHANGED\n"] + original[26:]
373 (root / "big.py").write_text("".join(original), encoding="utf-8")
374 patch = _make_simple_patch("big.py", original, modified)
375 patch_file = tmp_path / "big.patch"
376 patch_file.write_text(patch)
377 result = _invoke(root, str(patch_file))
378 assert result.exit_code == 0
379 result_lines = (root / "big.py").read_text().splitlines(keepends=True)
380 assert result_lines[25] == "CHANGED\n"
381 assert result_lines[0] == "line 0\n"
382 assert result_lines[49] == "line 49\n"
383
384
385 import argparse as _argparse
386
387
388 class TestRegisterFlags:
389 def _parse(self, *args: str) -> _argparse.Namespace:
390 from muse.cli.commands.apply import register
391 p = _argparse.ArgumentParser()
392 sub = p.add_subparsers()
393 register(sub)
394 return p.parse_args(["apply", *args])
395
396 def test_default_json_out_is_false(self) -> None:
397 ns = self._parse("dummy.patch")
398 assert ns.json_out is False
399
400 def test_json_flag_sets_json_out(self) -> None:
401 ns = self._parse("--json", "dummy.patch")
402 assert ns.json_out is True
403
404 def test_j_shorthand_sets_json_out(self) -> None:
405 ns = self._parse("-j", "dummy.patch")
406 assert ns.json_out is True
407
408 def test_check_default(self) -> None:
409 ns = self._parse("dummy.patch")
410 assert ns.check is False
411
412 def test_staged_default(self) -> None:
413 ns = self._parse("dummy.patch")
414 assert ns.staged is False
File History 4 commits
sha256:81ae324db5ad375fbfe4834c6fcb378312cafad3cc92dec5d3e5c427306621a2 fix: remove commit_exists filter from have anchors — server… Sonnet 4.6 patch 20 days ago
sha256:36c3cb3e76619d4c30a6d9bf81b5ec4ff148e30dcfed913e3114ca7b43b81c7e fix: rename objects→blobs in push client and all stale test… Sonnet 4.6 patch 22 days ago
sha256:c06a9b9b9fee26c68ea725b44d54b2c0a171301ce9de746d5b656617b4463a9a fix: repair four test failures from post-migration audit Sonnet 4.6 patch 28 days ago
sha256:1900655993c83c4107067375548a7be823e471d2515830842f1a12cba4bd3cdf fix: unified object store migration — idempotent writes, JS… Sonnet 4.6 minor 28 days ago