gabriel / muse public
test_ls_tree_supercharge.py python
445 lines 16.7 KB
Raw
sha256:248464b6a2f758985cbef90f864fa62c61842be699d975d6e00b6a9509ef919c fix(delta): detect blob-identical file renames for files wi… Sonnet 4.6 patch 24 days ago
1 """Supercharge tests for ``muse ls-tree``.
2
3 Coverage tiers
4 --------------
5 - JSON envelope schema: status, error, exit_code, duration_ms, entry_count,
6 path_prefix, recursive always present
7 - Error payload shape: exactly {status, error, exit_code} — no prose in --json mode
8 - OID integrity: blob object_ids sha256:-prefixed; synthetic tree object_ids sha256:-prefixed
9 - TypedDicts: _LsTreeJson and _LsTreeErrorJson exist and are annotated
10 - Docstring: module docstring covers all new envelope fields and error schema
11 - No-prose pollution: no emoji in JSON stdout, errors to stdout in --json mode
12 """
13 from __future__ import annotations
14 from collections.abc import Mapping
15
16 import datetime
17 import argparse
18 import json
19 import pathlib
20 from typing import get_type_hints
21
22 import pytest
23
24 from muse.core.errors import ExitCode
25 from muse.core.object_store import write_object
26 from muse.core.ids import hash_commit as compute_commit_id, hash_snapshot as compute_snapshot_id
27 from muse.core.commits import (
28 CommitRecord,
29 write_commit,
30 )
31 from muse.core.snapshots import (
32 SnapshotRecord,
33 write_snapshot,
34 )
35 from muse.core.types import Manifest, blob_id, split_id
36 from muse.core.paths import ref_path, muse_dir
37 from tests.cli_test_helper import CliRunner, InvokeResult
38
# One CLI runner instance is shared by every test in this module.
runner: CliRunner = CliRunner()

# ``repo_id`` value written into each scratch repository's ``repo.json``.
_REPO_ID: str = "ls-tree-sg-test"
# Bumped once per ``_commit_files`` call and embedded in the commit message.
_counter: int = 0
43
44
45 # ---------------------------------------------------------------------------
46 # Helpers
47 # ---------------------------------------------------------------------------
48
49
50
def _init_repo(path: pathlib.Path) -> pathlib.Path:
    """Create a bare-bones repository layout under *path* and return *path*.

    Inside the directory resolved by ``muse_dir(path)`` this creates the
    ``commits``, ``snapshots``, ``objects``, ``refs/heads`` and ``code``
    sub-directories, writes a ``HEAD`` file pointing at ``refs/heads/main``
    and a ``repo.json`` carrying the test repo id and the ``code`` domain.
    No commit is written, so the ``main`` ref does not exist yet.
    """
    store = muse_dir(path)
    for sub_dir in ("commits", "snapshots", "objects", "refs/heads", "code"):
        (store / sub_dir).mkdir(parents=True, exist_ok=True)

    head_file = store / "HEAD"
    head_file.write_text("ref: refs/heads/main", encoding="utf-8")

    repo_meta = {"repo_id": _REPO_ID, "domain": "code"}
    repo_file = store / "repo.json"
    repo_file.write_text(json.dumps(repo_meta), encoding="utf-8")
    return path
60
61
def _commit_files(root: pathlib.Path, files: Mapping[str, bytes], branch: str = "main") -> str:
    """Store *files* as a parentless commit on *branch* and return its id.

    For every ``relative path -> content`` pair the content is written to the
    object store and to the working tree under *root*. A snapshot and a commit
    record are then written, and the branch ref is overwritten with the new
    commit id. Each call increments the module-level ``_counter``, which is
    embedded in the commit message.
    """
    global _counter
    _counter += 1
    message = f"commit {_counter}"

    manifest: Manifest = {}
    for rel_path, content in files.items():
        oid = blob_id(content)
        write_object(root, oid, content)
        manifest[rel_path] = oid
        # Mirror the blob into the working tree as well.
        target = root / rel_path
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_bytes(content)

    snapshot_id = compute_snapshot_id(manifest)
    snapshot = SnapshotRecord(snapshot_id=snapshot_id, manifest=manifest)
    write_snapshot(root, snapshot)

    timestamp = datetime.datetime.now(datetime.timezone.utc)
    commit_id = compute_commit_id(
        parent_ids=[],
        snapshot_id=snapshot_id,
        message=message,
        committed_at_iso=timestamp.isoformat(),
    )
    record = CommitRecord(
        commit_id=commit_id,
        branch=branch,
        snapshot_id=snapshot_id,
        message=message,
        committed_at=timestamp,
    )
    write_commit(root, record)

    branch_ref = ref_path(root, branch)
    branch_ref.write_text(commit_id, encoding="utf-8")
    return commit_id
91
92
def _invoke(repo: pathlib.Path, *args: str) -> InvokeResult:
    """Run ``ls-tree`` with *args* through the CLI entry point.

    The repository is selected by passing ``MUSE_REPO_ROOT`` in the
    environment handed to the shared runner.
    """
    from muse.cli.app import main as cli

    argv = ["ls-tree", *args]
    environment = {"MUSE_REPO_ROOT": str(repo)}
    return runner.invoke(cli, argv, env=environment)
96
97
98 # ---------------------------------------------------------------------------
99 # JSON envelope schema
100 # ---------------------------------------------------------------------------
101
class TestJsonEnvelopeSchema:
    """Every required key is present in the success envelope."""

    _REQUIRED = {
        "status", "error", "treeish", "commit_id",
        "path_prefix", "recursive", "entry_count", "entries",
        "duration_ms", "exit_code",
    }

    @staticmethod
    def _ls_tree(tmp_path: pathlib.Path, files: Mapping[str, bytes], *args: str) -> InvokeResult:
        """Commit *files* into a fresh repo under *tmp_path*, then run ``ls-tree`` with *args*."""
        repo = _init_repo(tmp_path)
        _commit_files(repo, files)
        return _invoke(repo, *args)

    def test_all_required_keys_present(self, tmp_path: pathlib.Path) -> None:
        """The success envelope contains every key listed in ``_REQUIRED``."""
        result = self._ls_tree(tmp_path, {"a.py": b"# a\n"}, "HEAD", "--json")
        assert result.exit_code == 0
        envelope = json.loads(result.output)
        missing = self._REQUIRED.difference(envelope)
        assert not missing, f"Missing keys: {missing}"

    def test_status_ok_on_success(self, tmp_path: pathlib.Path) -> None:
        """``status`` is ``"ok"`` for a successful listing."""
        result = self._ls_tree(tmp_path, {"a.py": b"# a\n"}, "HEAD", "--json")
        envelope = json.loads(result.output)
        assert envelope["status"] == "ok"

    def test_error_empty_on_success(self, tmp_path: pathlib.Path) -> None:
        """``error`` is the empty string for a successful listing."""
        result = self._ls_tree(tmp_path, {"a.py": b"# a\n"}, "HEAD", "--json")
        envelope = json.loads(result.output)
        assert envelope["error"] == ""

    def test_exit_code_zero_on_success(self, tmp_path: pathlib.Path) -> None:
        """The envelope's own ``exit_code`` field is 0 on success."""
        result = self._ls_tree(tmp_path, {"a.py": b"# a\n"}, "HEAD", "--json")
        envelope = json.loads(result.output)
        assert envelope["exit_code"] == 0

    def test_duration_ms_is_nonneg_float(self, tmp_path: pathlib.Path) -> None:
        """``duration_ms`` decodes as a float and is not negative."""
        result = self._ls_tree(tmp_path, {"a.py": b"# a\n"}, "HEAD", "--json")
        duration = json.loads(result.output)["duration_ms"]
        assert isinstance(duration, float)
        assert duration >= 0.0

    def test_entry_count_matches_entries_length(self, tmp_path: pathlib.Path) -> None:
        """``entry_count`` equals the number of items in ``entries``."""
        files = {"a.py": b"a", "b.py": b"b", "src/c.py": b"c"}
        result = self._ls_tree(tmp_path, files, "HEAD", "--json")
        envelope = json.loads(result.output)
        assert envelope["entry_count"] == len(envelope["entries"])

    def test_path_prefix_null_when_not_given(self, tmp_path: pathlib.Path) -> None:
        """``path_prefix`` is JSON ``null`` when no path argument is passed."""
        result = self._ls_tree(tmp_path, {"a.py": b"a"}, "HEAD", "--json")
        envelope = json.loads(result.output)
        assert envelope["path_prefix"] is None

    def test_path_prefix_echoed_when_given(self, tmp_path: pathlib.Path) -> None:
        """A path argument is echoed back verbatim in ``path_prefix``."""
        result = self._ls_tree(tmp_path, {"src/a.py": b"a"}, "HEAD", "src/", "--json")
        envelope = json.loads(result.output)
        assert envelope["path_prefix"] == "src/"

    def test_recursive_false_by_default(self, tmp_path: pathlib.Path) -> None:
        """``recursive`` is ``false`` when ``-r`` is not passed."""
        result = self._ls_tree(tmp_path, {"src/a.py": b"a"}, "HEAD", "--json")
        envelope = json.loads(result.output)
        assert envelope["recursive"] is False

    def test_recursive_true_when_flag_given(self, tmp_path: pathlib.Path) -> None:
        """``recursive`` is ``true`` when ``-r`` is passed."""
        result = self._ls_tree(tmp_path, {"src/a.py": b"a"}, "-r", "HEAD", "--json")
        envelope = json.loads(result.output)
        assert envelope["recursive"] is True

    def test_treeish_echoed(self, tmp_path: pathlib.Path) -> None:
        """The tree-ish argument is echoed back in ``treeish``."""
        result = self._ls_tree(tmp_path, {"a.py": b"a"}, "HEAD", "--json")
        envelope = json.loads(result.output)
        assert envelope["treeish"] == "HEAD"

    def test_commit_id_sha256_prefixed(self, tmp_path: pathlib.Path) -> None:
        """``commit_id`` carries the ``sha256:`` prefix."""
        result = self._ls_tree(tmp_path, {"a.py": b"a"}, "HEAD", "--json")
        commit_id = json.loads(result.output)["commit_id"]
        assert commit_id.startswith("sha256:")
194
195
196 # ---------------------------------------------------------------------------
197 # Error payload shape
198 # ---------------------------------------------------------------------------
199
class TestErrorPayloadShape:
    """In --json mode, errors go to stdout as {status, error, exit_code}."""

    # Keys every error payload must carry, per the class contract above.
    _ERROR_KEYS = frozenset({"status", "error", "exit_code"})

    @staticmethod
    def _empty_repo_error(tmp_path: pathlib.Path) -> InvokeResult:
        """Run ``ls-tree HEAD --json`` in a repo that has no commits at all."""
        repo = _init_repo(tmp_path)
        return _invoke(repo, "HEAD", "--json")

    @staticmethod
    def _error_for_args(tmp_path: pathlib.Path, *args: str) -> InvokeResult:
        """Run ``ls-tree`` with *args* in a repo that holds one committed file."""
        repo = _init_repo(tmp_path)
        _commit_files(repo, {"a.py": b"a"})
        return _invoke(repo, *args)

    def test_error_on_empty_repo_is_json(self, tmp_path: pathlib.Path) -> None:
        """Resolving HEAD in a commit-less repo fails with a JSON error payload."""
        r = self._empty_repo_error(tmp_path)
        assert r.exit_code != 0
        d = json.loads(r.output)  # must be valid JSON
        assert d["status"] == "error"

    def test_error_payload_has_required_keys(self, tmp_path: pathlib.Path) -> None:
        """The error payload carries all of ``status``, ``error`` and ``exit_code``.

        ``status`` is part of the documented error shape, so it is checked
        here alongside the other two keys rather than only by value elsewhere.
        """
        r = self._empty_repo_error(tmp_path)
        d = json.loads(r.output)
        missing = self._ERROR_KEYS - set(d.keys())
        assert not missing, f"Missing error keys: {sorted(missing)}"

    def test_error_message_nonempty(self, tmp_path: pathlib.Path) -> None:
        """``error`` holds a non-empty message."""
        r = self._empty_repo_error(tmp_path)
        d = json.loads(r.output)
        assert d["error"]

    def test_exit_code_nonzero_on_error(self, tmp_path: pathlib.Path) -> None:
        """Both the process exit code and the payload's ``exit_code`` are non-zero."""
        r = self._empty_repo_error(tmp_path)
        assert r.exit_code != 0
        d = json.loads(r.output)
        assert d["exit_code"] != 0

    def test_ansi_in_ref_error_is_json(self, tmp_path: pathlib.Path) -> None:
        """A ref containing ANSI escape sequences still yields a JSON error."""
        r = self._error_for_args(tmp_path, "\x1b[31mbad\x1b[0m", "--json")
        assert r.exit_code != 0
        d = json.loads(r.output)
        assert d["status"] == "error"

    def test_bad_ref_error_is_json(self, tmp_path: pathlib.Path) -> None:
        """An unknown branch name yields a JSON error."""
        r = self._error_for_args(tmp_path, "no-such-branch", "--json")
        assert r.exit_code != 0
        d = json.loads(r.output)
        assert d["status"] == "error"

    def test_path_traversal_error_is_json(self, tmp_path: pathlib.Path) -> None:
        """A path prefix that climbs out of the repo yields a JSON error."""
        r = self._error_for_args(tmp_path, "HEAD", "../../../etc/", "--json")
        assert r.exit_code != 0
        d = json.loads(r.output)
        assert d["status"] == "error"
252
253
254 # ---------------------------------------------------------------------------
255 # OID data integrity
256 # ---------------------------------------------------------------------------
257
class TestOidIntegrity:
    """All object IDs in output carry the sha256: prefix.

    Each test first asserts that the listing actually produced entries of the
    kind under test; without that guard an empty or mistyped listing would
    let the per-entry loops pass without checking anything.
    """

    _HEX_DIGITS = frozenset("0123456789abcdef")

    @staticmethod
    def _entries_of_type(
        tmp_path: pathlib.Path,
        files: Mapping[str, bytes],
        kind: str,
        *args: str,
    ) -> list[Mapping[str, str]]:
        """Commit *files*, run ``ls-tree`` with *args*, return entries whose ``type`` is *kind*.

        Fails the calling test if no such entry is present.
        """
        repo = _init_repo(tmp_path)
        _commit_files(repo, files)
        r = _invoke(repo, *args)
        d = json.loads(r.output)
        selected = [e for e in d["entries"] if e["type"] == kind]
        assert selected, f"expected at least one {kind!r} entry in ls-tree output"
        return selected

    def _assert_hex_digest(self, object_id: str) -> None:
        """Assert that the part of *object_id* after the scheme is 64 lowercase hex chars."""
        _, hex_part = split_id(object_id)
        assert len(hex_part) == 64
        assert all(c in self._HEX_DIGITS for c in hex_part)

    def test_blob_object_ids_sha256_prefixed(self, tmp_path: pathlib.Path) -> None:
        """Every blob entry in a recursive JSON listing has a ``sha256:`` object id."""
        blobs = self._entries_of_type(
            tmp_path, {"a.py": b"content"}, "blob", "-r", "HEAD", "--json"
        )
        for e in blobs:
            assert e["object_id"].startswith("sha256:"), (
                f"blob OID not prefixed: {e['object_id']!r}"
            )

    def test_synthetic_tree_object_ids_sha256_prefixed(self, tmp_path: pathlib.Path) -> None:
        """Every tree entry in a non-recursive JSON listing has a ``sha256:`` object id."""
        trees = self._entries_of_type(
            tmp_path, {"src/a.py": b"a", "lib/b.py": b"b"}, "tree", "HEAD", "--json"
        )
        for e in trees:
            assert e["object_id"].startswith("sha256:"), (
                f"tree OID not prefixed: {e['object_id']!r}"
            )

    def test_blob_oid_hex_part_is_64_chars(self, tmp_path: pathlib.Path) -> None:
        """Blob object ids end in a 64-character lowercase hex digest."""
        blobs = self._entries_of_type(
            tmp_path, {"a.py": b"content"}, "blob", "-r", "HEAD", "--json"
        )
        for e in blobs:
            self._assert_hex_digest(e["object_id"])

    def test_tree_oid_hex_part_is_64_chars(self, tmp_path: pathlib.Path) -> None:
        """Tree object ids end in a 64-character lowercase hex digest."""
        trees = self._entries_of_type(
            tmp_path, {"src/a.py": b"a"}, "tree", "HEAD", "--json"
        )
        for e in trees:
            self._assert_hex_digest(e["object_id"])

    def test_text_format_blob_oid_sha256_prefixed(self, tmp_path: pathlib.Path) -> None:
        """In text mode, the third metadata column of each line is a ``sha256:`` id."""
        repo = _init_repo(tmp_path)
        _commit_files(repo, {"a.py": b"content"})
        r = _invoke(repo, "-r", "HEAD")
        assert r.exit_code == 0
        lines = r.output.strip().splitlines()
        assert lines, "text output listed no entries"
        for line in lines:
            # Each line is "<metadata columns>\t<path>"; the OID is column 3.
            meta, _ = line.split("\t", 1)
            parts = meta.split()
            oid = parts[2]
            assert oid.startswith("sha256:"), f"text OID not prefixed: {oid!r}"
315
316
317 # ---------------------------------------------------------------------------
318 # No-prose pollution
319 # ---------------------------------------------------------------------------
320
class TestNoProsePollution:
    """In --json mode stdout carries JSON only: no emoji, tracebacks or raw escapes."""

    @staticmethod
    def _repo_with(tmp_path: pathlib.Path, files: Mapping[str, bytes]) -> pathlib.Path:
        """Return a fresh repo under *tmp_path* with *files* committed."""
        repo = _init_repo(tmp_path)
        _commit_files(repo, files)
        return repo

    def test_stdout_valid_json_on_success(self, tmp_path: pathlib.Path) -> None:
        """Successful output parses as JSON."""
        repo = self._repo_with(tmp_path, {"a.py": b"a"})
        result = _invoke(repo, "HEAD", "--json")
        json.loads(result.output)  # must not raise

    def test_no_emoji_in_json_stdout(self, tmp_path: pathlib.Path) -> None:
        """The cross-mark emoji does not appear in JSON output."""
        repo = self._repo_with(tmp_path, {"a.py": b"a"})
        result = _invoke(repo, "HEAD", "--json")
        assert "❌" not in result.output

    def test_error_stdout_valid_json(self, tmp_path: pathlib.Path) -> None:
        """Output for a repo without commits still parses as JSON."""
        repo = _init_repo(tmp_path)
        result = _invoke(repo, "HEAD", "--json")
        json.loads(result.output)  # must not raise

    def test_no_traceback_on_bad_ref(self, tmp_path: pathlib.Path) -> None:
        """An unknown ref produces no Python traceback on stdout or stderr."""
        repo = self._repo_with(tmp_path, {"a.py": b"a"})
        result = _invoke(repo, "ghost-branch", "--json")
        for stream in (result.output, result.stderr):
            assert "Traceback" not in stream

    def test_ansi_in_output_encoded_in_json(self, tmp_path: pathlib.Path) -> None:
        """File paths with ANSI sequences must be JSON-encoded, not emitted raw."""
        malicious = "src/\x1b[31mmalicious\x1b[0m.py"
        repo = self._repo_with(tmp_path, {malicious: b"bad"})
        result = _invoke(repo, "-r", "HEAD", "--json")
        assert result.exit_code == 0
        assert "\x1b" not in result.output
354
355
356 # ---------------------------------------------------------------------------
357 # TypedDicts
358 # ---------------------------------------------------------------------------
359
class TestTypedDicts:
    """The ls-tree command module exposes annotated payload TypedDicts."""

    def test_ls_tree_json_typeddict_exists(self) -> None:
        """``_LsTreeJson`` can be imported from the command module."""
        from muse.cli.commands.ls_tree import _LsTreeJson

        assert _LsTreeJson is not None

    def test_ls_tree_error_json_typeddict_exists(self) -> None:
        """``_LsTreeErrorJson`` can be imported from the command module."""
        from muse.cli.commands.ls_tree import _LsTreeErrorJson

        assert _LsTreeErrorJson is not None

    def test_ls_tree_json_has_status_annotation(self) -> None:
        """``_LsTreeJson`` declares a ``status`` field."""
        from muse.cli.commands.ls_tree import _LsTreeJson

        annotations = get_type_hints(_LsTreeJson)
        assert "status" in annotations

    def test_ls_tree_json_has_all_new_fields(self) -> None:
        """``_LsTreeJson`` declares every envelope field added to the schema."""
        from muse.cli.commands.ls_tree import _LsTreeJson

        expected_fields = (
            "status",
            "error",
            "entry_count",
            "path_prefix",
            "recursive",
            "duration_ms",
            "exit_code",
        )
        annotations = get_type_hints(_LsTreeJson)
        for field in expected_fields:
            assert field in annotations, f"Missing annotation: {field!r}"
380
381
382 # ---------------------------------------------------------------------------
383 # Docstring coverage
384 # ---------------------------------------------------------------------------
385
class TestDocstring:
    """The command module's docstring mentions each envelope field by name."""

    def _doc(self) -> str:
        """Return the ls-tree command module docstring, or ``""`` if it has none."""
        import muse.cli.commands.ls_tree as mod

        text = mod.__doc__
        return text if text else ""

    def test_docstring_documents_status(self) -> None:
        doc = self._doc()
        assert "status" in doc

    def test_docstring_documents_error(self) -> None:
        doc = self._doc()
        assert "error" in doc

    def test_docstring_documents_entry_count(self) -> None:
        doc = self._doc()
        assert "entry_count" in doc

    def test_docstring_documents_path_prefix(self) -> None:
        doc = self._doc()
        assert "path_prefix" in doc

    def test_docstring_documents_duration_ms(self) -> None:
        doc = self._doc()
        assert "duration_ms" in doc

    def test_docstring_documents_exit_code(self) -> None:
        doc = self._doc()
        assert "exit_code" in doc

    def test_docstring_documents_error_schema(self) -> None:
        # Both error-payload keys must be mentioned somewhere in the docstring.
        doc = self._doc()
        assert all(key in doc for key in ("error", "exit_code"))
412
413
414 # ---------------------------------------------------------------------------
415 # TestRegisterFlags — argparse-level verification
416 # ---------------------------------------------------------------------------
417
418
class TestRegisterFlags:
    """Verify that register() wires --json / -j correctly."""

    def _make_parser(self) -> "argparse.ArgumentParser":
        """Build a throwaway top-level parser with the ls-tree subcommand registered."""
        import argparse
        from muse.cli.commands.ls_tree import register

        parser = argparse.ArgumentParser()
        sub_parsers = parser.add_subparsers()
        register(sub_parsers)
        return parser

    def _parse(self, *argv: str) -> "argparse.Namespace":
        """Parse *argv* with a freshly built parser and return the namespace."""
        return self._make_parser().parse_args(list(argv))

    def test_json_flag_long(self) -> None:
        """``--json`` sets ``json_out`` to ``True``."""
        namespace = self._parse("ls-tree", "--json")
        assert namespace.json_out is True

    def test_j_alias(self) -> None:
        """``-j`` behaves like ``--json``."""
        namespace = self._parse("ls-tree", "-j")
        assert namespace.json_out is True

    def test_default_is_text(self) -> None:
        """Without the flag, ``json_out`` is ``False``."""
        namespace = self._parse("ls-tree")
        assert namespace.json_out is False

    def test_dest_is_json_out(self) -> None:
        """The flag is stored under ``json_out`` and no ``fmt`` attribute is set."""
        namespace = self._parse("ls-tree", "-j")
        assert hasattr(namespace, "json_out")
        assert not hasattr(namespace, "fmt")
File History 1 commit
sha256:248464b6a2f758985cbef90f864fa62c61842be699d975d6e00b6a9509ef919c fix(delta): detect blob-identical file renames for files wi… Sonnet 4.6 patch 24 days ago