"""Supercharge tests for ``muse ls-tree``. Coverage tiers -------------- - JSON envelope schema: status, error, exit_code, duration_ms, entry_count, path_prefix, recursive always present - Error payload shape: exactly {status, error, exit_code} — no prose in --json mode - OID integrity: blob object_ids sha256:-prefixed; synthetic tree object_ids sha256:-prefixed - TypedDicts: _LsTreeJson and _LsTreeErrorJson exist and are annotated - Docstring: module docstring covers all new envelope fields and error schema - No-prose pollution: no emoji in JSON stdout, errors to stdout in --json mode """ from __future__ import annotations from collections.abc import Mapping import datetime import argparse import json import pathlib from typing import get_type_hints import pytest from muse.core.errors import ExitCode from muse.core.object_store import write_object from muse.core.ids import hash_commit as compute_commit_id, hash_snapshot as compute_snapshot_id from muse.core.commits import ( CommitRecord, write_commit, ) from muse.core.snapshots import ( SnapshotRecord, write_snapshot, ) from muse.core.types import Manifest, blob_id, split_id from muse.core.paths import ref_path, muse_dir from tests.cli_test_helper import CliRunner, InvokeResult runner = CliRunner() _REPO_ID = "ls-tree-sg-test" _counter = 0 # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _init_repo(path: pathlib.Path) -> pathlib.Path: dot_muse = muse_dir(path) for d in ("commits", "snapshots", "objects", "refs/heads", "code"): (dot_muse / d).mkdir(parents=True, exist_ok=True) (dot_muse / "HEAD").write_text("ref: refs/heads/main", encoding="utf-8") (dot_muse / "repo.json").write_text( json.dumps({"repo_id": _REPO_ID, "domain": "code"}), encoding="utf-8" ) return path def _commit_files(root: pathlib.Path, files: Mapping[str, bytes], branch: str = "main") -> str: global _counter _counter += 1 manifest: Manifest = {} for rel_path, content in files.items(): obj_id = blob_id(content) write_object(root, obj_id, content) manifest[rel_path] = obj_id abs_path = root / rel_path abs_path.parent.mkdir(parents=True, exist_ok=True) abs_path.write_bytes(content) snap_id = compute_snapshot_id(manifest) write_snapshot(root, SnapshotRecord(snapshot_id=snap_id, manifest=manifest)) committed_at = datetime.datetime.now(datetime.timezone.utc) commit_id = compute_commit_id( parent_ids=[], snapshot_id=snap_id, message=f"commit {_counter}", committed_at_iso=committed_at.isoformat(), ) write_commit(root, CommitRecord( commit_id=commit_id, branch=branch, snapshot_id=snap_id, message=f"commit {_counter}", committed_at=committed_at, )) (ref_path(root, branch)).write_text(commit_id, encoding="utf-8") return commit_id def _invoke(repo: pathlib.Path, *args: str) -> InvokeResult: from muse.cli.app import main as cli return runner.invoke(cli, ["ls-tree", *args], env={"MUSE_REPO_ROOT": str(repo)}) # --------------------------------------------------------------------------- # JSON envelope schema # --------------------------------------------------------------------------- class TestJsonEnvelopeSchema: """Every required key is present in the success envelope.""" _REQUIRED = { "status", "error", "treeish", "commit_id", "path_prefix", "recursive", "entry_count", "entries", "duration_ms", "exit_code", } def test_all_required_keys_present(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path) _commit_files(repo, {"a.py": b"# a\n"}) r = _invoke(repo, "HEAD", "--json") assert r.exit_code == 0 d = json.loads(r.output) missing = self._REQUIRED - d.keys() assert not missing, f"Missing keys: {missing}" def test_status_ok_on_success(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path) _commit_files(repo, {"a.py": b"# a\n"}) r = _invoke(repo, "HEAD", "--json") assert json.loads(r.output)["status"] == "ok" def test_error_empty_on_success(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path) _commit_files(repo, {"a.py": b"# a\n"}) r = _invoke(repo, "HEAD", "--json") assert json.loads(r.output)["error"] == "" def test_exit_code_zero_on_success(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path) _commit_files(repo, {"a.py": b"# a\n"}) r = _invoke(repo, "HEAD", "--json") assert json.loads(r.output)["exit_code"] == 0 def test_duration_ms_is_nonneg_float(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path) _commit_files(repo, {"a.py": b"# a\n"}) r = _invoke(repo, "HEAD", "--json") d = json.loads(r.output) assert isinstance(d["duration_ms"], float) assert d["duration_ms"] >= 0.0 def test_entry_count_matches_entries_length(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path) _commit_files(repo, {"a.py": b"a", "b.py": b"b", "src/c.py": b"c"}) r = _invoke(repo, "HEAD", "--json") d = json.loads(r.output) assert d["entry_count"] == len(d["entries"]) def test_path_prefix_null_when_not_given(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path) _commit_files(repo, {"a.py": b"a"}) r = _invoke(repo, "HEAD", "--json") d = json.loads(r.output) assert d["path_prefix"] is None def test_path_prefix_echoed_when_given(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path) _commit_files(repo, {"src/a.py": b"a"}) r = _invoke(repo, "HEAD", "src/", "--json") d = json.loads(r.output) assert d["path_prefix"] == "src/" def test_recursive_false_by_default(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path) _commit_files(repo, {"src/a.py": b"a"}) r = _invoke(repo, "HEAD", "--json") d = json.loads(r.output) assert d["recursive"] is False def test_recursive_true_when_flag_given(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path) _commit_files(repo, {"src/a.py": b"a"}) r = _invoke(repo, "-r", "HEAD", "--json") d = json.loads(r.output) assert d["recursive"] is True def test_treeish_echoed(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path) _commit_files(repo, {"a.py": b"a"}) r = _invoke(repo, "HEAD", "--json") d = json.loads(r.output) assert d["treeish"] == "HEAD" def test_commit_id_sha256_prefixed(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path) _commit_files(repo, {"a.py": b"a"}) r = _invoke(repo, "HEAD", "--json") d = json.loads(r.output) assert d["commit_id"].startswith("sha256:") # --------------------------------------------------------------------------- # Error payload shape # --------------------------------------------------------------------------- class TestErrorPayloadShape: """In --json mode, errors go to stdout as {status, error, exit_code}.""" def test_error_on_empty_repo_is_json(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path) r = _invoke(repo, "HEAD", "--json") assert r.exit_code != 0 d = json.loads(r.output) # must be valid JSON assert d["status"] == "error" def test_error_payload_has_required_keys(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path) r = _invoke(repo, "HEAD", "--json") d = json.loads(r.output) assert {"error", "exit_code"} <= set(d.keys()) def test_error_message_nonempty(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path) r = _invoke(repo, "HEAD", "--json") d = json.loads(r.output) assert d["error"] def test_exit_code_nonzero_on_error(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path) r = _invoke(repo, "HEAD", "--json") assert r.exit_code != 0 d = json.loads(r.output) assert d["exit_code"] != 0 def test_ansi_in_ref_error_is_json(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path) _commit_files(repo, {"a.py": b"a"}) r = _invoke(repo, "\x1b[31mbad\x1b[0m", "--json") assert r.exit_code != 0 d = json.loads(r.output) assert d["status"] == "error" def test_bad_ref_error_is_json(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path) _commit_files(repo, {"a.py": b"a"}) r = _invoke(repo, "no-such-branch", "--json") assert r.exit_code != 0 d = json.loads(r.output) assert d["status"] == "error" def test_path_traversal_error_is_json(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path) _commit_files(repo, {"a.py": b"a"}) r = _invoke(repo, "HEAD", "../../../etc/", "--json") assert r.exit_code != 0 d = json.loads(r.output) assert d["status"] == "error" # --------------------------------------------------------------------------- # OID data integrity # --------------------------------------------------------------------------- class TestOidIntegrity: """All object IDs in output carry the sha256: prefix.""" def test_blob_object_ids_sha256_prefixed(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path) _commit_files(repo, {"a.py": b"content"}) r = _invoke(repo, "-r", "HEAD", "--json") d = json.loads(r.output) for e in d["entries"]: if e["type"] == "blob": assert e["object_id"].startswith("sha256:"), ( f"blob OID not prefixed: {e['object_id']!r}" ) def test_synthetic_tree_object_ids_sha256_prefixed(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path) _commit_files(repo, {"src/a.py": b"a", "lib/b.py": b"b"}) r = _invoke(repo, "HEAD", "--json") d = json.loads(r.output) for e in d["entries"]: if e["type"] == "tree": assert e["object_id"].startswith("sha256:"), ( f"tree OID not prefixed: {e['object_id']!r}" ) def test_blob_oid_hex_part_is_64_chars(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path) _commit_files(repo, {"a.py": b"content"}) r = _invoke(repo, "-r", "HEAD", "--json") d = json.loads(r.output) for e in d["entries"]: if e["type"] == "blob": _, hex_part = split_id(e["object_id"]) assert len(hex_part) == 64 assert all(c in "0123456789abcdef" for c in hex_part) def test_tree_oid_hex_part_is_64_chars(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path) _commit_files(repo, {"src/a.py": b"a"}) r = _invoke(repo, "HEAD", "--json") d = json.loads(r.output) for e in d["entries"]: if e["type"] == "tree": _, hex_part = split_id(e["object_id"]) assert len(hex_part) == 64 assert all(c in "0123456789abcdef" for c in hex_part) def test_text_format_blob_oid_sha256_prefixed(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path) _commit_files(repo, {"a.py": b"content"}) r = _invoke(repo, "-r", "HEAD") assert r.exit_code == 0 for line in r.output.strip().splitlines(): meta, _ = line.split("\t", 1) parts = meta.split() oid = parts[2] assert oid.startswith("sha256:"), f"text OID not prefixed: {oid!r}" # --------------------------------------------------------------------------- # No-prose pollution # --------------------------------------------------------------------------- class TestNoProsePollution: def test_stdout_valid_json_on_success(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path) _commit_files(repo, {"a.py": b"a"}) r = _invoke(repo, "HEAD", "--json") json.loads(r.output) # must not raise def test_no_emoji_in_json_stdout(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path) _commit_files(repo, {"a.py": b"a"}) r = _invoke(repo, "HEAD", "--json") assert "❌" not in r.output def test_error_stdout_valid_json(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path) r = _invoke(repo, "HEAD", "--json") json.loads(r.output) # must not raise def test_no_traceback_on_bad_ref(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path) _commit_files(repo, {"a.py": b"a"}) r = _invoke(repo, "ghost-branch", "--json") assert "Traceback" not in r.output assert "Traceback" not in r.stderr def test_ansi_in_output_encoded_in_json(self, tmp_path: pathlib.Path) -> None: """File paths with ANSI sequences must be JSON-encoded, not emitted raw.""" repo = _init_repo(tmp_path) malicious = "src/\x1b[31mmalicious\x1b[0m.py" _commit_files(repo, {malicious: b"bad"}) r = _invoke(repo, "-r", "HEAD", "--json") assert r.exit_code == 0 assert "\x1b" not in r.output # --------------------------------------------------------------------------- # TypedDicts # --------------------------------------------------------------------------- class TestTypedDicts: def test_ls_tree_json_typeddict_exists(self) -> None: from muse.cli.commands.ls_tree import _LsTreeJson assert _LsTreeJson is not None def test_ls_tree_error_json_typeddict_exists(self) -> None: from muse.cli.commands.ls_tree import _LsTreeErrorJson assert _LsTreeErrorJson is not None def test_ls_tree_json_has_status_annotation(self) -> None: from muse.cli.commands.ls_tree import _LsTreeJson hints = get_type_hints(_LsTreeJson) assert "status" in hints def test_ls_tree_json_has_all_new_fields(self) -> None: from muse.cli.commands.ls_tree import _LsTreeJson hints = get_type_hints(_LsTreeJson) for field in ("status", "error", "entry_count", "path_prefix", "recursive", "duration_ms", "exit_code"): assert field in hints, f"Missing annotation: {field!r}" # --------------------------------------------------------------------------- # Docstring coverage # --------------------------------------------------------------------------- class TestDocstring: def _doc(self) -> str: import muse.cli.commands.ls_tree as mod return mod.__doc__ or "" def test_docstring_documents_status(self) -> None: assert "status" in self._doc() def test_docstring_documents_error(self) -> None: assert "error" in self._doc() def test_docstring_documents_entry_count(self) -> None: assert "entry_count" in self._doc() def test_docstring_documents_path_prefix(self) -> None: assert "path_prefix" in self._doc() def test_docstring_documents_duration_ms(self) -> None: assert "duration_ms" in self._doc() def test_docstring_documents_exit_code(self) -> None: assert "exit_code" in self._doc() def test_docstring_documents_error_schema(self) -> None: doc = self._doc() assert "error" in doc and "exit_code" in doc # --------------------------------------------------------------------------- # TestRegisterFlags — argparse-level verification # --------------------------------------------------------------------------- class TestRegisterFlags: """Verify that register() wires --json / -j correctly.""" def _make_parser(self) -> "argparse.ArgumentParser": import argparse from muse.cli.commands.ls_tree import register ap = argparse.ArgumentParser() subs = ap.add_subparsers() register(subs) return ap def test_json_flag_long(self) -> None: ns = self._make_parser().parse_args(["ls-tree", "--json"]) assert ns.json_out is True def test_j_alias(self) -> None: ns = self._make_parser().parse_args(["ls-tree", "-j"]) assert ns.json_out is True def test_default_is_text(self) -> None: ns = self._make_parser().parse_args(["ls-tree"]) assert ns.json_out is False def test_dest_is_json_out(self) -> None: ns = self._make_parser().parse_args(["ls-tree", "-j"]) assert hasattr(ns, "json_out") assert not hasattr(ns, "fmt")