gabriel / muse public

test_mv_supercharge.py file-level

at sha256:f · View file ↗ · Intel ↗

History
1 files
1 commits
0 hotspots
0 🧊 dead
0 💥 blast risk
sha256:b adding issues docs to bust staging mpack prebuild cache. · gabriel · Jun 20, 2026
1 """Supercharge tests for ``muse mv``.
2
3 Coverage tiers
4 --------------
5 - JSON envelope: exit_code and duration_ms present on success and dry_run
6 - Error payload: errors go to stdout as JSON in --json mode, no dual stderr prose
7 - Data integrity: sha256: OID prefix preserved; source/dest echoed verbatim
8 - TypedDicts: _MvResultJson and _MvErrorJson with required annotations
9 - Docstring: module docstring covers exit_code and duration_ms
10 - No-prose pollution: JSON stdout is valid on all non-error paths
11 - Stress: 50-file move-all with --json, correct stage state
12 """
13 from __future__ import annotations
14 from collections.abc import Mapping
15
16 import datetime
17 import argparse
18 import json
19 import pathlib
20 from typing import get_type_hints
21
22 from muse.core.object_store import write_object
23 from muse.core.ids import hash_commit as compute_commit_id, hash_snapshot as compute_snapshot_id
24 from muse.core.commits import (
25 CommitRecord,
26 write_commit,
27 )
28 from muse.core.snapshots import (
29 SnapshotRecord,
30 write_snapshot,
31 )
32 from muse.core.types import Manifest, blob_id
33 from muse.plugins.code.stage import make_entry, read_stage, write_stage
34 from muse.core.paths import heads_dir, muse_dir
35 from tests.cli_test_helper import CliRunner, InvokeResult
36
37 runner = CliRunner()
38
39 _REPO_ID = "mv-supercharge"
40 _DT = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc)
41
42
43 # ---------------------------------------------------------------------------
44 # Helpers
45 # ---------------------------------------------------------------------------
46
47
48
49
50 def _init_repo(tmp_path: pathlib.Path) -> pathlib.Path:
51 dot_muse = muse_dir(tmp_path)
52 for d in ("commits", "snapshots", "objects", "refs/heads", "code"):
53 (dot_muse / d).mkdir(parents=True, exist_ok=True)
54 (dot_muse / "HEAD").write_text("ref: refs/heads/main", encoding="utf-8")
55 (dot_muse / "repo.json").write_text(
56 json.dumps({"repo_id": _REPO_ID, "domain": "code"}), encoding="utf-8"
57 )
58 return tmp_path
59
60
61 def _env(root: pathlib.Path) -> Mapping[str, str]:
62 return {"MUSE_REPO_ROOT": str(root)}
63
64
65 def _commit(root: pathlib.Path, files: Mapping[str, bytes]) -> str:
66 manifest: Manifest = {}
67 for rel, content in files.items():
68 oid = blob_id(content)
69 write_object(root, oid, content)
70 manifest[rel] = oid
71 abs_p = root / rel
72 abs_p.parent.mkdir(parents=True, exist_ok=True)
73 abs_p.write_bytes(content)
74 snap_id = compute_snapshot_id(manifest)
75 write_snapshot(root, SnapshotRecord(snapshot_id=snap_id, manifest=manifest, created_at=_DT))
76 cid = compute_commit_id( parent_ids=[],
77 snapshot_id=snap_id,
78 message="test",
79 committed_at_iso=_DT.isoformat(),
80 )
81 write_commit(root, CommitRecord(
82 commit_id=cid, branch="main",
83 snapshot_id=snap_id, message="test", committed_at=_DT,
84 ))
85 (heads_dir(root) / "main").write_text(cid, encoding="utf-8")
86 return cid
87
88
89 def _mv(root: pathlib.Path, *args: str) -> InvokeResult:
90 from muse.cli.app import main as cli
91 return runner.invoke(cli, ["mv", *args], env=_env(root))
92
93
94 # ---------------------------------------------------------------------------
95 # JSON envelope — exit_code
96 # ---------------------------------------------------------------------------
97
98
99 class TestJsonEnvelopeExitCode:
100 def test_success_has_exit_code_zero(self, tmp_path: pathlib.Path) -> None:
101 root = _init_repo(tmp_path)
102 _commit(root, {"a.py": b"# a\n"})
103 r = _mv(root, "--json", "a.py", "b.py")
104 assert r.exit_code == 0
105 d = json.loads(r.output)
106 assert "exit_code" in d, "exit_code missing from success envelope"
107 assert d["exit_code"] == 0
108
109 def test_dry_run_has_exit_code_zero(self, tmp_path: pathlib.Path) -> None:
110 root = _init_repo(tmp_path)
111 _commit(root, {"a.py": b"# a\n"})
112 r = _mv(root, "--json", "--dry-run", "a.py", "b.py")
113 assert r.exit_code == 0
114 d = json.loads(r.output)
115 assert "exit_code" in d, "exit_code missing from dry_run envelope"
116 assert d["exit_code"] == 0
117
118 def test_error_payload_has_exit_code_nonzero(self, tmp_path: pathlib.Path) -> None:
119 root = _init_repo(tmp_path)
120 _commit(root, {"anchor.py": b"# a\n"})
121 r = _mv(root, "--json", "ghost.py", "dest.py")
122 d = json.loads(r.output)
123 assert "exit_code" in d
124 assert d["exit_code"] != 0
125
126
127 # ---------------------------------------------------------------------------
128 # JSON envelope — duration_ms
129 # ---------------------------------------------------------------------------
130
131
132 class TestJsonEnvelopeDurationMs:
133 def test_success_has_duration_ms(self, tmp_path: pathlib.Path) -> None:
134 root = _init_repo(tmp_path)
135 _commit(root, {"a.py": b"# a\n"})
136 r = _mv(root, "--json", "a.py", "b.py")
137 d = json.loads(r.output)
138 assert "duration_ms" in d, "duration_ms missing from success envelope"
139 assert isinstance(d["duration_ms"], float)
140 assert d["duration_ms"] >= 0.0
141
142 def test_dry_run_has_duration_ms(self, tmp_path: pathlib.Path) -> None:
143 root = _init_repo(tmp_path)
144 _commit(root, {"a.py": b"# a\n"})
145 r = _mv(root, "--json", "--dry-run", "a.py", "b.py")
146 d = json.loads(r.output)
147 assert "duration_ms" in d, "duration_ms missing from dry_run envelope"
148 assert isinstance(d["duration_ms"], float)
149
150 def test_force_move_has_duration_ms(self, tmp_path: pathlib.Path) -> None:
151 root = _init_repo(tmp_path)
152 _commit(root, {"src.py": b"# s\n", "dst.py": b"# d\n"})
153 r = _mv(root, "--json", "--force", "src.py", "dst.py")
154 d = json.loads(r.output)
155 assert "duration_ms" in d
156
157
158 # ---------------------------------------------------------------------------
159 # Error payload — errors route to stdout as JSON in --json mode
160 # ---------------------------------------------------------------------------
161
162
163 class TestErrorPayload:
164 def test_untracked_source_error_goes_to_stdout(self, tmp_path: pathlib.Path) -> None:
165 root = _init_repo(tmp_path)
166 _commit(root, {"anchor.py": b"# a\n"})
167 (root / "ghost.py").write_text("# untracked\n")
168 r = _mv(root, "--json", "ghost.py", "dest.py")
169 assert r.exit_code != 0
170 assert not r.stderr.strip(), f"unexpected stderr: {r.stderr!r}"
171 d = json.loads(r.output)
172 assert d["status"] == "error"
173
174 def test_missing_disk_source_error_goes_to_stdout(self, tmp_path: pathlib.Path) -> None:
175 root = _init_repo(tmp_path)
176 _commit(root, {"a.py": b"# a\n"})
177 (root / "a.py").unlink()
178 r = _mv(root, "--json", "a.py", "b.py")
179 assert r.exit_code != 0
180 assert not r.stderr.strip(), f"unexpected stderr: {r.stderr!r}"
181 d = json.loads(r.output)
182 assert d["status"] == "error"
183
184 def test_dest_already_tracked_error_goes_to_stdout(self, tmp_path: pathlib.Path) -> None:
185 root = _init_repo(tmp_path)
186 _commit(root, {"src.py": b"# s\n", "dst.py": b"# d\n"})
187 r = _mv(root, "--json", "src.py", "dst.py")
188 assert r.exit_code != 0
189 assert not r.stderr.strip(), f"unexpected stderr: {r.stderr!r}"
190 d = json.loads(r.output)
191 assert d["status"] == "error"
192
193 def test_dest_exists_on_disk_error_goes_to_stdout(self, tmp_path: pathlib.Path) -> None:
194 root = _init_repo(tmp_path)
195 _commit(root, {"src.py": b"# s\n"})
196 (root / "dst.py").write_text("# existing\n")
197 r = _mv(root, "--json", "src.py", "dst.py")
198 assert r.exit_code != 0
199 assert not r.stderr.strip(), f"unexpected stderr: {r.stderr!r}"
200 d = json.loads(r.output)
201 assert d["status"] == "error"
202
203 def test_path_traversal_error_goes_to_stderr_in_text_mode(self, tmp_path: pathlib.Path) -> None:
204 """Path traversal in text mode is caught before --json is parsed — stderr is OK."""
205 root = _init_repo(tmp_path)
206 _commit(root, {"anchor.py": b"# a\n"})
207 r = _mv(root, "../../../etc/passwd", "dest.py")
208 assert r.exit_code != 0
209
210 def test_error_payload_has_status_error(self, tmp_path: pathlib.Path) -> None:
211 root = _init_repo(tmp_path)
212 _commit(root, {"anchor.py": b"# a\n"})
213 r = _mv(root, "--json", "ghost.py", "dest.py")
214 d = json.loads(r.output)
215 assert d["status"] == "error"
216
217 def test_error_payload_has_error_field(self, tmp_path: pathlib.Path) -> None:
218 root = _init_repo(tmp_path)
219 _commit(root, {"anchor.py": b"# a\n"})
220 r = _mv(root, "--json", "ghost.py", "dest.py")
221 d = json.loads(r.output)
222 assert "error" in d
223 assert d["error"]
224
225 def test_no_emoji_on_stderr_in_json_mode(self, tmp_path: pathlib.Path) -> None:
226 root = _init_repo(tmp_path)
227 _commit(root, {"anchor.py": b"# a\n"})
228 r = _mv(root, "--json", "ghost.py", "dest.py")
229 assert "❌" not in r.stderr
230
231
232 # ---------------------------------------------------------------------------
233 # Data integrity
234 # ---------------------------------------------------------------------------
235
236
237 class TestDataIntegrity:
238 def test_object_id_is_sha256_prefixed(self, tmp_path: pathlib.Path) -> None:
239 """object_id in JSON response must carry sha256: prefix."""
240 root = _init_repo(tmp_path)
241 content = b"# content\n"
242 _commit(root, {"a.py": content})
243 r = _mv(root, "--json", "a.py", "b.py")
244 d = json.loads(r.output)
245 assert d["object_id"].startswith("sha256:"), (
246 f"object_id missing sha256: prefix: {d['object_id']!r}"
247 )
248
249 def test_source_dest_echoed_verbatim(self, tmp_path: pathlib.Path) -> None:
250 root = _init_repo(tmp_path)
251 _commit(root, {"src/module.py": b"# m\n"})
252 r = _mv(root, "--json", "src/module.py", "lib/module.py")
253 d = json.loads(r.output)
254 assert d["source"] == "src/module.py"
255 assert d["dest"] == "lib/module.py"
256
257 def test_object_id_matches_staged_entry(self, tmp_path: pathlib.Path) -> None:
258 """object_id in JSON must match what was written to the stage."""
259 root = _init_repo(tmp_path)
260 content = b"# exact\n"
261 _commit(root, {"x.py": content})
262 r = _mv(root, "--json", "x.py", "y.py")
263 d = json.loads(r.output)
264 stage = read_stage(root)
265 assert stage["y.py"]["object_id"] == d["object_id"]
266
267 def test_staged_only_move_object_id_preserved(self, tmp_path: pathlib.Path) -> None:
268 """Staged-only file move must echo the correct object_id."""
269 root = _init_repo(tmp_path)
270 _commit(root, {"anchor.py": b"# anchor\n"})
271 content = b"# staged only\n"
272 oid = blob_id(content)
273 write_object(root, oid, content)
274 (root / "new.py").write_bytes(content)
275 stage = read_stage(root)
276 stage["new.py"] = make_entry(oid, "A")
277 write_stage(root, stage)
278 r = _mv(root, "--json", "new.py", "renamed.py")
279 d = json.loads(r.output)
280 assert d["object_id"] == oid
281
282
283 # ---------------------------------------------------------------------------
284 # No-prose pollution
285 # ---------------------------------------------------------------------------
286
287
288 class TestNoProsePollution:
289 def test_success_stdout_is_valid_json(self, tmp_path: pathlib.Path) -> None:
290 root = _init_repo(tmp_path)
291 _commit(root, {"a.py": b"# a\n"})
292 r = _mv(root, "--json", "a.py", "b.py")
293 json.loads(r.output) # must not raise
294
295 def test_dry_run_stdout_is_valid_json(self, tmp_path: pathlib.Path) -> None:
296 root = _init_repo(tmp_path)
297 _commit(root, {"a.py": b"# a\n"})
298 r = _mv(root, "--json", "--dry-run", "a.py", "b.py")
299 json.loads(r.output) # must not raise
300
301 def test_no_emoji_in_success_json(self, tmp_path: pathlib.Path) -> None:
302 root = _init_repo(tmp_path)
303 _commit(root, {"a.py": b"# a\n"})
304 r = _mv(root, "--json", "a.py", "b.py")
305 assert "✅" not in r.output
306 assert "❌" not in r.output
307
308
309 # ---------------------------------------------------------------------------
310 # TypedDicts
311 # ---------------------------------------------------------------------------
312
313
314 class TestTypedDicts:
315 def test_mv_result_json_typeddict_exists(self) -> None:
316 from muse.cli.commands.mv import _MvResultJson
317 assert _MvResultJson is not None
318
319 def test_mv_error_json_typeddict_exists(self) -> None:
320 from muse.cli.commands.mv import _MvErrorJson
321 assert _MvErrorJson is not None
322
323 def test_mv_result_json_has_exit_code_annotation(self) -> None:
324 from muse.cli.commands.mv import _MvResultJson
325 hints = get_type_hints(_MvResultJson)
326 assert "exit_code" in hints
327
328 def test_mv_result_json_has_duration_ms_annotation(self) -> None:
329 from muse.cli.commands.mv import _MvResultJson
330 hints = get_type_hints(_MvResultJson)
331 assert "duration_ms" in hints
332
333 def test_mv_error_json_has_required_fields(self) -> None:
334 from muse.cli.commands.mv import _MvErrorJson
335 hints = get_type_hints(_MvErrorJson)
336 for field in ("status", "error", "exit_code"):
337 assert field in hints, f"Missing annotation: {field!r}"
338
339
340 # ---------------------------------------------------------------------------
341 # Docstring coverage
342 # ---------------------------------------------------------------------------
343
344
345 class TestDocstring:
346 def _doc(self) -> str:
347 import muse.cli.commands.mv as mod
348 return mod.__doc__ or ""
349
350 def test_docstring_documents_exit_code(self) -> None:
351 assert "exit_code" in self._doc()
352
353 def test_docstring_documents_duration_ms(self) -> None:
354 assert "duration_ms" in self._doc()
355
356
357 # ---------------------------------------------------------------------------
358 # Stress
359 # ---------------------------------------------------------------------------
360
361
362 class TestStress:
363 def test_50_files_move_all_json(self, tmp_path: pathlib.Path) -> None:
364 """Move all 50 tracked files with --json; all envelopes must be valid."""
365 root = _init_repo(tmp_path)
366 n = 50
367 files = {f"f{i:03d}.py": f"# {i}\n".encode() for i in range(n)}
368 _commit(root, files)
369
370 for i in range(n):
371 r = _mv(root, "--json", f"f{i:03d}.py", f"moved/f{i:03d}.py")
372 assert r.exit_code == 0, f"move {i} failed: {r.output}"
373 d = json.loads(r.output)
374 assert d["exit_code"] == 0
375 assert "duration_ms" in d
376 assert d["source"] == f"f{i:03d}.py"
377 assert d["dest"] == f"moved/f{i:03d}.py"
378
379 stage = read_stage(root)
380 for i in range(n):
381 assert stage[f"f{i:03d}.py"]["mode"] == "D"
382 assert stage[f"moved/f{i:03d}.py"]["mode"] == "A"
383
384
385 # ---------------------------------------------------------------------------
386 # TestRegisterFlags — argparse-level verification
387 # ---------------------------------------------------------------------------
388
389
390 class TestRegisterFlags:
391 """Verify that register() wires --json / -j correctly."""
392
393 def _make_parser(self) -> "argparse.ArgumentParser":
394 import argparse
395 from muse.cli.commands.mv import register
396 ap = argparse.ArgumentParser()
397 subs = ap.add_subparsers()
398 register(subs)
399 return ap
400
401 def test_json_flag_long(self) -> None:
402 ns = self._make_parser().parse_args(["mv", "src.py", "dst.py", "--json"])
403 assert ns.json_out is True
404
405 def test_j_alias(self) -> None:
406 ns = self._make_parser().parse_args(["mv", "src.py", "dst.py", "-j"])
407 assert ns.json_out is True
408
409 def test_default_is_text(self) -> None:
410 ns = self._make_parser().parse_args(["mv", "src.py", "dst.py"])
411 assert ns.json_out is False
412
413 def test_dest_is_json_out(self) -> None:
414 ns = self._make_parser().parse_args(["mv", "src.py", "dst.py", "-j"])
415 assert hasattr(ns, "json_out")
416 assert not hasattr(ns, "fmt")