gabriel / muse public
test_cmd_restore.py python
464 lines 15.9 KB
Raw
1 """Tests for ``muse restore`` — working-tree and stage file restoration.
2
3 Coverage tiers:
4 - Unit: _resolve_source_manifest, _resolve_file_path helpers
5 - Integration: restore worktree from HEAD (default), restore --staged (unstage),
6 restore --staged --worktree (full reset), --source <ref>,
7 multiple paths, glob patterns, --dry-run, --json
8 - End-to-end: full CLI via CliRunner
9 - Security: path traversal rejected, outside-repo paths rejected
10 - Edge cases: file not in HEAD/source, staged-only file restore
11 - Stress: restore 100 modified files
12 """
13
14 from __future__ import annotations
15 from collections.abc import Mapping
16
17 import datetime
18 import json
19 import pathlib
20
21 import pytest
22
23 from tests.cli_test_helper import CliRunner
24
25 from muse.core.object_store import write_object
26 from muse.core.ids import hash_commit, hash_snapshot
27 from muse.core.commits import (
28 CommitRecord,
29 write_commit,
30 )
31 from muse.core.snapshots import (
32 SnapshotRecord,
33 write_snapshot,
34 )
35 from muse.core.types import Manifest, blob_id
36 from muse.plugins.code.stage import StagedFileMap, make_entry, read_stage, write_stage
37 from muse.core.paths import muse_dir, ref_path
38
39 runner = CliRunner()
40
41 _REPO_ID = "restore-test"
42
43
44 # ---------------------------------------------------------------------------
45 # Helpers
46 # ---------------------------------------------------------------------------
47
48
49
50
51 _counter = 0
52
53
54 def _init_repo(path: pathlib.Path) -> pathlib.Path:
55 dot_muse = muse_dir(path)
56 for d in ("commits", "snapshots", "objects", "refs/heads", "code"):
57 (dot_muse / d).mkdir(parents=True, exist_ok=True)
58 (dot_muse / "HEAD").write_text("ref: refs/heads/main", encoding="utf-8")
59 (dot_muse / "repo.json").write_text(
60 json.dumps({"repo_id": _REPO_ID, "domain": "code"}), encoding="utf-8"
61 )
62 return path
63
64
65 def _env(repo: pathlib.Path) -> Mapping[str, str]:
66 return {"MUSE_REPO_ROOT": str(repo)}
67
68
69 def _commit_files(root: pathlib.Path, files: Mapping[str, bytes], branch: str = "main") -> str:
70 global _counter
71 _counter += 1
72 manifest: Manifest = {}
73 for rel_path, content in files.items():
74 obj_id = blob_id(content)
75 write_object(root, obj_id, content)
76 manifest[rel_path] = obj_id
77 abs_path = root / rel_path
78 abs_path.parent.mkdir(parents=True, exist_ok=True)
79 abs_path.write_bytes(content)
80 snap_id = hash_snapshot(manifest)
81 write_snapshot(root, SnapshotRecord(snapshot_id=snap_id, manifest=manifest))
82 committed_at = datetime.datetime.now(datetime.timezone.utc)
83 commit_id = hash_commit(
84 parent_ids=[],
85 snapshot_id=snap_id,
86 message=f"commit {_counter}",
87 committed_at_iso=committed_at.isoformat(),
88 )
89 write_commit(
90 root,
91 CommitRecord(
92 commit_id=commit_id,
93 branch=branch,
94 snapshot_id=snap_id,
95 message=f"commit {_counter}",
96 committed_at=committed_at,
97 ),
98 )
99 (ref_path(root, branch)).write_text(commit_id, encoding="utf-8")
100 return commit_id
101
102
103 def _invoke(repo: pathlib.Path, *args: str) -> "InvokeResult":
104 from muse.cli.app import main as cli
105 return runner.invoke(cli, ["restore", *args], env=_env(repo))
106
107
108 # ---------------------------------------------------------------------------
109 # Unit — helpers
110 # ---------------------------------------------------------------------------
111
112
113 def test_resolve_source_manifest_from_head(tmp_path: pathlib.Path) -> None:
114 from muse.cli.commands.restore import _resolve_source_manifest
115 root = _init_repo(tmp_path)
116 content = b"hello\n"
117 _commit_files(root, {"f.py": content})
118 manifest = _resolve_source_manifest(root, source_ref=None)
119 assert "f.py" in manifest
120 assert manifest["f.py"] == blob_id(content)
121
122
123 def test_resolve_source_manifest_empty_repo(tmp_path: pathlib.Path) -> None:
124 from muse.cli.commands.restore import _resolve_source_manifest
125 root = _init_repo(tmp_path)
126 # No commits yet — should return empty dict, not raise
127 manifest = _resolve_source_manifest(root, source_ref=None)
128 assert manifest == {}
129
130
131 def test_resolve_file_path_inside_repo(tmp_path: pathlib.Path) -> None:
132 from muse.cli.commands.restore import _resolve_file_path
133 root = _init_repo(tmp_path)
134 rel = _resolve_file_path(root, "src/main.py")
135 assert rel == "src/main.py"
136
137
138 def test_resolve_file_path_traversal_raises(tmp_path: pathlib.Path) -> None:
139 from muse.cli.commands.restore import _resolve_file_path
140 root = _init_repo(tmp_path)
141 with pytest.raises(SystemExit) as exc:
142 _resolve_file_path(root, "../../../etc/passwd")
143 assert exc.value.code != 0
144
145
146 # ---------------------------------------------------------------------------
147 # Integration — restore worktree (default)
148 # ---------------------------------------------------------------------------
149
150
151 def test_restore_worktree_overwrites_modified_file(tmp_path: pathlib.Path) -> None:
152 root = _init_repo(tmp_path)
153 original = b"# original\n"
154 _commit_files(root, {"a.py": original})
155 # Modify on disk
156 (root / "a.py").write_bytes(b"# dirty\n")
157
158 result = _invoke(root, "a.py")
159 assert result.exit_code == 0
160 assert (root / "a.py").read_bytes() == original
161
162
163 def test_restore_worktree_does_not_touch_stage(tmp_path: pathlib.Path) -> None:
164 root = _init_repo(tmp_path)
165 _commit_files(root, {"a.py": b"# orig\n"})
166 # Stage a modification
167 new_content = b"# staged\n"
168 obj_id = blob_id(new_content)
169 write_object(root, obj_id, new_content)
170 stage = read_stage(root)
171 stage["a.py"] = make_entry(obj_id, "M")
172 write_stage(root, stage)
173 # Dirty the disk
174 (root / "a.py").write_bytes(b"# dirty\n")
175
176 _invoke(root, "a.py")
177 # Stage must be untouched
178 stage_after = read_stage(root)
179 assert "a.py" in stage_after
180 assert stage_after["a.py"]["mode"] == "M"
181
182
183 def test_restore_worktree_from_staged_content(tmp_path: pathlib.Path) -> None:
184 """When a file is staged, default restore pulls from the staged object_id."""
185 root = _init_repo(tmp_path)
186 _commit_files(root, {"b.py": b"# head\n"})
187 staged_content = b"# staged version\n"
188 obj_id = blob_id(staged_content)
189 write_object(root, obj_id, staged_content)
190 stage = read_stage(root)
191 stage["b.py"] = make_entry(obj_id, "M")
192 write_stage(root, stage)
193 (root / "b.py").write_bytes(b"# dirty\n")
194
195 _invoke(root, "b.py")
196 assert (root / "b.py").read_bytes() == staged_content
197
198
199 def test_restore_exit_zero_on_success(tmp_path: pathlib.Path) -> None:
200 root = _init_repo(tmp_path)
201 _commit_files(root, {"a.py": b"# a\n"})
202 (root / "a.py").write_bytes(b"# dirty\n")
203 result = _invoke(root, "a.py")
204 assert result.exit_code == 0
205
206
207 def test_restore_file_not_in_head_exits_nonzero(tmp_path: pathlib.Path) -> None:
208 root = _init_repo(tmp_path)
209 _commit_files(root, {"other.py": b"# o\n"})
210 result = _invoke(root, "ghost.py")
211 assert result.exit_code != 0
212
213
214 # ---------------------------------------------------------------------------
215 # Integration — restore --staged
216 # ---------------------------------------------------------------------------
217
218
219 def test_restore_staged_removes_modification(tmp_path: pathlib.Path) -> None:
220 root = _init_repo(tmp_path)
221 _commit_files(root, {"a.py": b"# orig\n"})
222 obj_id = blob_id(b"# modified\n")
223 write_object(root, obj_id, b"# modified\n")
224 stage: StagedFileMap = {"a.py": make_entry(obj_id, "M")}
225 write_stage(root, stage)
226
227 result = _invoke(root, "--staged", "a.py")
228 assert result.exit_code == 0
229 stage_after = read_stage(root)
230 assert "a.py" not in stage_after
231
232
233 def test_restore_staged_does_not_touch_disk(tmp_path: pathlib.Path) -> None:
234 root = _init_repo(tmp_path)
235 _commit_files(root, {"a.py": b"# orig\n"})
236 modified_content = b"# modified\n"
237 obj_id = blob_id(modified_content)
238 write_object(root, obj_id, modified_content)
239 stage: StagedFileMap = {"a.py": make_entry(obj_id, "M")}
240 write_stage(root, stage)
241 (root / "a.py").write_bytes(modified_content)
242
243 _invoke(root, "--staged", "a.py")
244 # Disk still has the modified content
245 assert (root / "a.py").read_bytes() == modified_content
246
247
248 def test_restore_staged_removes_added_file(tmp_path: pathlib.Path) -> None:
249 """Unstaging a brand-new file (mode 'A', not in HEAD) removes it from stage."""
250 root = _init_repo(tmp_path)
251 _commit_files(root, {"anchor.py": b"# anchor\n"})
252 content = b"# new\n"
253 obj_id = blob_id(content)
254 write_object(root, obj_id, content)
255 (root / "new.py").write_bytes(content)
256 stage: StagedFileMap = {"new.py": make_entry(obj_id, "A")}
257 write_stage(root, stage)
258
259 result = _invoke(root, "--staged", "new.py")
260 assert result.exit_code == 0
261 stage_after = read_stage(root)
262 assert "new.py" not in stage_after
263 # Disk file untouched
264 assert (root / "new.py").exists()
265
266
267 def test_restore_staged_undeletes_from_stage(tmp_path: pathlib.Path) -> None:
268 """Restoring --staged a deleted file removes the 'D' tombstone."""
269 root = _init_repo(tmp_path)
270 _commit_files(root, {"gone.py": b"# original\n"})
271 stage: StagedFileMap = {"gone.py": make_entry("", "D")}
272 write_stage(root, stage)
273
274 result = _invoke(root, "--staged", "gone.py")
275 assert result.exit_code == 0
276 stage_after = read_stage(root)
277 assert "gone.py" not in stage_after
278
279
280 def test_restore_staged_not_staged_is_noop(tmp_path: pathlib.Path) -> None:
281 """Restoring --staged a file that isn't staged is a clean no-op."""
282 root = _init_repo(tmp_path)
283 _commit_files(root, {"a.py": b"# a\n"})
284 result = _invoke(root, "--staged", "a.py")
285 assert result.exit_code == 0
286
287
288 # ---------------------------------------------------------------------------
289 # Integration — restore --staged --worktree (full reset)
290 # ---------------------------------------------------------------------------
291
292
293 def test_restore_staged_worktree_resets_both(tmp_path: pathlib.Path) -> None:
294 """--staged --worktree restores disk and clears stage entry."""
295 root = _init_repo(tmp_path)
296 original = b"# original\n"
297 _commit_files(root, {"f.py": original})
298 modified = b"# modified\n"
299 obj_id = blob_id(modified)
300 write_object(root, obj_id, modified)
301 stage: StagedFileMap = {"f.py": make_entry(obj_id, "M")}
302 write_stage(root, stage)
303 (root / "f.py").write_bytes(modified)
304
305 result = _invoke(root, "--staged", "--worktree", "f.py")
306 assert result.exit_code == 0
307 assert (root / "f.py").read_bytes() == original
308 stage_after = read_stage(root)
309 assert "f.py" not in stage_after
310
311
312 # ---------------------------------------------------------------------------
313 # Integration — --source <ref>
314 # ---------------------------------------------------------------------------
315
316
317 def test_restore_source_ref_restores_from_commit(tmp_path: pathlib.Path) -> None:
318 """--source <commit_id> restores file from that commit's manifest."""
319 root = _init_repo(tmp_path)
320 v1 = b"# version 1\n"
321 commit_v1 = _commit_files(root, {"versioned.py": v1})
322 # Now update the file in HEAD
323 v2 = b"# version 2\n"
324 _commit_files(root, {"versioned.py": v2})
325 # Disk now has v2; restore to v1 using the first commit id
326 (root / "versioned.py").write_bytes(b"# dirty\n")
327
328 result = _invoke(root, "--source", commit_v1, "versioned.py")
329 assert result.exit_code == 0
330 assert (root / "versioned.py").read_bytes() == v1
331
332
333 def test_restore_source_ref_not_found_exits_nonzero(tmp_path: pathlib.Path) -> None:
334 root = _init_repo(tmp_path)
335 _commit_files(root, {"a.py": b"# a\n"})
336 result = _invoke(root, "--source", "nonexistent-ref", "a.py")
337 assert result.exit_code != 0
338
339
340 def test_restore_source_file_not_in_that_commit_exits_nonzero(tmp_path: pathlib.Path) -> None:
341 root = _init_repo(tmp_path)
342 v1_commit = _commit_files(root, {"only_in_v1.py": b"# v1\n"})
343 _commit_files(root, {"v2_only.py": b"# v2\n"})
344
345 result = _invoke(root, "--source", v1_commit, "v2_only.py")
346 assert result.exit_code != 0
347
348
349 # ---------------------------------------------------------------------------
350 # Integration -- multiple paths
351 # ---------------------------------------------------------------------------
352
353
354 def test_restore_multiple_paths(tmp_path: pathlib.Path) -> None:
355 root = _init_repo(tmp_path)
356 orig_a = b"# a orig\n"
357 orig_b = b"# b orig\n"
358 _commit_files(root, {"a.py": orig_a, "b.py": orig_b})
359 (root / "a.py").write_bytes(b"# a dirty\n")
360 (root / "b.py").write_bytes(b"# b dirty\n")
361
362 result = _invoke(root, "a.py", "b.py")
363 assert result.exit_code == 0
364 assert (root / "a.py").read_bytes() == orig_a
365 assert (root / "b.py").read_bytes() == orig_b
366
367
368 # ---------------------------------------------------------------------------
369 # Integration — --dry-run
370 # ---------------------------------------------------------------------------
371
372
373 def test_restore_dry_run_no_disk_change(tmp_path: pathlib.Path) -> None:
374 root = _init_repo(tmp_path)
375 _commit_files(root, {"a.py": b"# orig\n"})
376 dirty = b"# dirty\n"
377 (root / "a.py").write_bytes(dirty)
378
379 result = _invoke(root, "--dry-run", "a.py")
380 assert result.exit_code == 0
381 assert (root / "a.py").read_bytes() == dirty
382
383
384 def test_restore_dry_run_no_stage_change(tmp_path: pathlib.Path) -> None:
385 root = _init_repo(tmp_path)
386 _commit_files(root, {"a.py": b"# orig\n"})
387 obj_id = blob_id(b"# modified\n")
388 write_object(root, obj_id, b"# modified\n")
389 stage: StagedFileMap = {"a.py": make_entry(obj_id, "M")}
390 write_stage(root, stage)
391
392 _invoke(root, "--dry-run", "--staged", "a.py")
393 stage_after = read_stage(root)
394 assert "a.py" in stage_after # stage unchanged
395
396
397 def test_restore_dry_run_json(tmp_path: pathlib.Path) -> None:
398 root = _init_repo(tmp_path)
399 _commit_files(root, {"a.py": b"# orig\n"})
400 (root / "a.py").write_bytes(b"# dirty\n")
401
402 result = _invoke(root, "--dry-run", "--json", "a.py")
403 assert result.exit_code == 0
404 data = json.loads(result.stdout)
405 assert data["dry_run"] is True
406 assert "a.py" in data.get("restored", []) or len(data.get("paths", [])) >= 1
407
408
409 # ---------------------------------------------------------------------------
410 # Integration — --json
411 # ---------------------------------------------------------------------------
412
413
414 def test_restore_json_output_structure(tmp_path: pathlib.Path) -> None:
415 root = _init_repo(tmp_path)
416 _commit_files(root, {"j.py": b"# j\n"})
417 (root / "j.py").write_bytes(b"# dirty\n")
418
419 result = _invoke(root, "--json", "j.py")
420 assert result.exit_code == 0
421 data = json.loads(result.stdout)
422 assert "restored" in data
423 assert "j.py" in data["restored"]
424 assert data["dry_run"] is False
425
426
427 # ---------------------------------------------------------------------------
428 # Security
429 # ---------------------------------------------------------------------------
430
431
432 def test_restore_path_traversal_rejected(tmp_path: pathlib.Path) -> None:
433 root = _init_repo(tmp_path)
434 _commit_files(root, {"anchor.py": b"# a\n"})
435 result = _invoke(root, "../../../etc/passwd")
436 assert result.exit_code != 0
437
438
439 def test_restore_staged_path_traversal_rejected(tmp_path: pathlib.Path) -> None:
440 root = _init_repo(tmp_path)
441 _commit_files(root, {"anchor.py": b"# a\n"})
442 result = _invoke(root, "--staged", "../../malicious.py")
443 assert result.exit_code != 0
444
445
446 # ---------------------------------------------------------------------------
447 # Stress
448 # ---------------------------------------------------------------------------
449
450
451 def test_restore_100_modified_files(tmp_path: pathlib.Path) -> None:
452 """Restore 100 modified files in one invocation."""
453 root = _init_repo(tmp_path)
454 originals = {f"f{i}.py": f"# orig {i}\n".encode() for i in range(100)}
455 _commit_files(root, originals)
456
457 # Dirty all 100
458 for name in originals:
459 (root / name).write_bytes(b"# dirty\n")
460
461 result = _invoke(root, *originals.keys())
462 assert result.exit_code == 0
463 for name, orig_content in originals.items():
464 assert (root / name).read_bytes() == orig_content, f"{name} not restored"
File History 1 commit