"""Supercharge tests for ``muse mv``. Coverage tiers -------------- - JSON envelope: exit_code and duration_ms present on success and dry_run - Error payload: errors go to stdout as JSON in --json mode, no dual stderr prose - Data integrity: sha256: OID prefix preserved; source/dest echoed verbatim - TypedDicts: _MvResultJson and _MvErrorJson with required annotations - Docstring: module docstring covers exit_code and duration_ms - No-prose pollution: JSON stdout is valid on all non-error paths - Stress: 50-file move-all with --json, correct stage state """ from __future__ import annotations from collections.abc import Mapping import datetime import argparse import json import pathlib from typing import get_type_hints from muse.core.object_store import write_object from muse.core.ids import hash_commit as compute_commit_id, hash_snapshot as compute_snapshot_id from muse.core.store import CommitRecord, SnapshotRecord, write_commit, write_snapshot from muse.core.types import Manifest, blob_id from muse.plugins.code.stage import make_entry, read_stage, write_stage from muse.core.paths import heads_dir, muse_dir from tests.cli_test_helper import CliRunner, InvokeResult runner = CliRunner() _REPO_ID = "mv-supercharge" _DT = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc) # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _init_repo(tmp_path: pathlib.Path) -> pathlib.Path: dot_muse = muse_dir(tmp_path) for d in ("commits", "snapshots", "objects", "refs/heads", "code"): (dot_muse / d).mkdir(parents=True, exist_ok=True) (dot_muse / "HEAD").write_text("ref: refs/heads/main", encoding="utf-8") (dot_muse / "repo.json").write_text( json.dumps({"repo_id": _REPO_ID, "domain": "code"}), encoding="utf-8" ) return tmp_path def _env(root: pathlib.Path) -> Mapping[str, str]: return {"MUSE_REPO_ROOT": str(root)} def _commit(root: pathlib.Path, files: 
Mapping[str, bytes]) -> str: manifest: Manifest = {} for rel, content in files.items(): oid = blob_id(content) write_object(root, oid, content) manifest[rel] = oid abs_p = root / rel abs_p.parent.mkdir(parents=True, exist_ok=True) abs_p.write_bytes(content) snap_id = compute_snapshot_id(manifest) write_snapshot(root, SnapshotRecord(snapshot_id=snap_id, manifest=manifest, created_at=_DT)) cid = compute_commit_id( parent_ids=[], snapshot_id=snap_id, message="test", committed_at_iso=_DT.isoformat(), ) write_commit(root, CommitRecord( commit_id=cid, branch="main", snapshot_id=snap_id, message="test", committed_at=_DT, )) (heads_dir(root) / "main").write_text(cid, encoding="utf-8") return cid def _mv(root: pathlib.Path, *args: str) -> InvokeResult: from muse.cli.app import main as cli return runner.invoke(cli, ["mv", *args], env=_env(root)) # --------------------------------------------------------------------------- # JSON envelope — exit_code # --------------------------------------------------------------------------- class TestJsonEnvelopeExitCode: def test_success_has_exit_code_zero(self, tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) _commit(root, {"a.py": b"# a\n"}) r = _mv(root, "--json", "a.py", "b.py") assert r.exit_code == 0 d = json.loads(r.output) assert "exit_code" in d, "exit_code missing from success envelope" assert d["exit_code"] == 0 def test_dry_run_has_exit_code_zero(self, tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) _commit(root, {"a.py": b"# a\n"}) r = _mv(root, "--json", "--dry-run", "a.py", "b.py") assert r.exit_code == 0 d = json.loads(r.output) assert "exit_code" in d, "exit_code missing from dry_run envelope" assert d["exit_code"] == 0 def test_error_payload_has_exit_code_nonzero(self, tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) _commit(root, {"anchor.py": b"# a\n"}) r = _mv(root, "--json", "ghost.py", "dest.py") d = json.loads(r.output) assert "exit_code" in d assert d["exit_code"] != 0 # 
# ---------------------------------------------------------------------------
# JSON envelope — duration_ms
# ---------------------------------------------------------------------------


class TestJsonEnvelopeDurationMs:
    """The JSON envelope must carry a ``duration_ms`` field."""

    def test_success_has_duration_ms(self, tmp_path: pathlib.Path) -> None:
        """A successful move reports a non-negative float ``duration_ms``."""
        root = _init_repo(tmp_path)
        _commit(root, {"a.py": b"# a\n"})
        r = _mv(root, "--json", "a.py", "b.py")
        d = json.loads(r.output)
        assert "duration_ms" in d, "duration_ms missing from success envelope"
        assert isinstance(d["duration_ms"], float)
        assert d["duration_ms"] >= 0.0

    def test_dry_run_has_duration_ms(self, tmp_path: pathlib.Path) -> None:
        """A dry run reports a float ``duration_ms``."""
        root = _init_repo(tmp_path)
        _commit(root, {"a.py": b"# a\n"})
        r = _mv(root, "--json", "--dry-run", "a.py", "b.py")
        d = json.loads(r.output)
        assert "duration_ms" in d, "duration_ms missing from dry_run envelope"
        assert isinstance(d["duration_ms"], float)

    def test_force_move_has_duration_ms(self, tmp_path: pathlib.Path) -> None:
        """A ``--force`` move onto a tracked destination still reports ``duration_ms``."""
        root = _init_repo(tmp_path)
        _commit(root, {"src.py": b"# s\n", "dst.py": b"# d\n"})
        r = _mv(root, "--json", "--force", "src.py", "dst.py")
        d = json.loads(r.output)
        assert "duration_ms" in d


# ---------------------------------------------------------------------------
# Error payload — errors route to stdout as JSON in --json mode
# ---------------------------------------------------------------------------


class TestErrorPayload:
    """In ``--json`` mode, failures are reported as a JSON payload on stdout."""

    def test_untracked_source_error_goes_to_stdout(self, tmp_path: pathlib.Path) -> None:
        """A source that exists on disk but was never committed fails with JSON only."""
        root = _init_repo(tmp_path)
        _commit(root, {"anchor.py": b"# a\n"})
        # Explicit encoding keeps the fixture bytes independent of the locale,
        # matching the other write_text calls in this module.
        (root / "ghost.py").write_text("# untracked\n", encoding="utf-8")
        r = _mv(root, "--json", "ghost.py", "dest.py")
        assert r.exit_code != 0
        assert not r.stderr.strip(), f"unexpected stderr: {r.stderr!r}"
        d = json.loads(r.output)
        assert d["status"] == "error"

    def test_missing_disk_source_error_goes_to_stdout(self, tmp_path: pathlib.Path) -> None:
        """A committed source that was deleted from disk fails with JSON only."""
        root = _init_repo(tmp_path)
        _commit(root, {"a.py": b"# a\n"})
        (root / "a.py").unlink()
        r = _mv(root, "--json", "a.py", "b.py")
        assert r.exit_code != 0
        assert not r.stderr.strip(), f"unexpected stderr: {r.stderr!r}"
        d = json.loads(r.output)
        assert d["status"] == "error"

    def test_dest_already_tracked_error_goes_to_stdout(self, tmp_path: pathlib.Path) -> None:
        """Moving onto a committed destination without ``--force`` fails with JSON only."""
        root = _init_repo(tmp_path)
        _commit(root, {"src.py": b"# s\n", "dst.py": b"# d\n"})
        r = _mv(root, "--json", "src.py", "dst.py")
        assert r.exit_code != 0
        assert not r.stderr.strip(), f"unexpected stderr: {r.stderr!r}"
        d = json.loads(r.output)
        assert d["status"] == "error"

    def test_dest_exists_on_disk_error_goes_to_stdout(self, tmp_path: pathlib.Path) -> None:
        """Moving onto an uncommitted file that exists on disk fails with JSON only."""
        root = _init_repo(tmp_path)
        _commit(root, {"src.py": b"# s\n"})
        # Explicit encoding keeps the fixture bytes independent of the locale,
        # matching the other write_text calls in this module.
        (root / "dst.py").write_text("# existing\n", encoding="utf-8")
        r = _mv(root, "--json", "src.py", "dst.py")
        assert r.exit_code != 0
        assert not r.stderr.strip(), f"unexpected stderr: {r.stderr!r}"
        d = json.loads(r.output)
        assert d["status"] == "error"

    def test_path_traversal_error_goes_to_stderr_in_text_mode(self, tmp_path: pathlib.Path) -> None:
        """Path traversal in text mode is caught before --json is parsed — stderr is OK."""
        root = _init_repo(tmp_path)
        _commit(root, {"anchor.py": b"# a\n"})
        r = _mv(root, "../../../etc/passwd", "dest.py")
        # Only the failure is asserted; the output stream is not checked here.
        assert r.exit_code != 0

    def test_error_payload_has_status_error(self, tmp_path: pathlib.Path) -> None:
        """The error payload sets ``status`` to ``"error"``."""
        root = _init_repo(tmp_path)
        _commit(root, {"anchor.py": b"# a\n"})
        r = _mv(root, "--json", "ghost.py", "dest.py")
        d = json.loads(r.output)
        assert d["status"] == "error"

    def test_error_payload_has_error_field(self, tmp_path: pathlib.Path) -> None:
        """The error payload carries a non-empty ``error`` field."""
        root = _init_repo(tmp_path)
        _commit(root, {"anchor.py": b"# a\n"})
        r = _mv(root, "--json", "ghost.py", "dest.py")
        d = json.loads(r.output)
        assert "error" in d
        assert d["error"]

    def test_no_emoji_on_stderr_in_json_mode(self, tmp_path: pathlib.Path) -> None:
        """The ❌ marker must not appear on stderr in ``--json`` mode."""
        root = _init_repo(tmp_path)
        _commit(root, {"anchor.py": b"# a\n"})
        r = _mv(root, "--json", "ghost.py", "dest.py")
        assert "❌" not in r.stderr
# ---------------------------------------------------------------------------
# Data integrity
# ---------------------------------------------------------------------------


class TestDataIntegrity:
    """Values echoed in the JSON response must match the repository state."""

    def test_object_id_is_sha256_prefixed(self, tmp_path: pathlib.Path) -> None:
        """object_id in JSON response must carry sha256: prefix."""
        root = _init_repo(tmp_path)
        content = b"# content\n"
        _commit(root, {"a.py": content})
        r = _mv(root, "--json", "a.py", "b.py")
        d = json.loads(r.output)
        assert d["object_id"].startswith("sha256:"), (
            f"object_id missing sha256: prefix: {d['object_id']!r}"
        )

    def test_source_dest_echoed_verbatim(self, tmp_path: pathlib.Path) -> None:
        """``source`` and ``dest`` in the response equal the arguments passed."""
        root = _init_repo(tmp_path)
        _commit(root, {"src/module.py": b"# m\n"})
        r = _mv(root, "--json", "src/module.py", "lib/module.py")
        d = json.loads(r.output)
        assert d["source"] == "src/module.py"
        assert d["dest"] == "lib/module.py"

    def test_object_id_matches_staged_entry(self, tmp_path: pathlib.Path) -> None:
        """object_id in JSON must match what was written to the stage."""
        root = _init_repo(tmp_path)
        content = b"# exact\n"
        _commit(root, {"x.py": content})
        r = _mv(root, "--json", "x.py", "y.py")
        d = json.loads(r.output)
        stage = read_stage(root)
        assert stage["y.py"]["object_id"] == d["object_id"]

    def test_staged_only_move_object_id_preserved(self, tmp_path: pathlib.Path) -> None:
        """Staged-only file move must echo the correct object_id."""
        root = _init_repo(tmp_path)
        _commit(root, {"anchor.py": b"# anchor\n"})
        content = b"# staged only\n"
        oid = blob_id(content)
        write_object(root, oid, content)
        (root / "new.py").write_bytes(content)
        # Stage the file as an addition without committing it.
        stage = read_stage(root)
        stage["new.py"] = make_entry(oid, "A")
        write_stage(root, stage)
        r = _mv(root, "--json", "new.py", "renamed.py")
        d = json.loads(r.output)
        assert d["object_id"] == oid


# ---------------------------------------------------------------------------
# No-prose pollution
# ---------------------------------------------------------------------------


class TestNoProsePollution:
    """stdout in ``--json`` mode must be parseable JSON with no decorative text."""

    def test_success_stdout_is_valid_json(self, tmp_path: pathlib.Path) -> None:
        """stdout of a successful move parses as JSON."""
        root = _init_repo(tmp_path)
        _commit(root, {"a.py": b"# a\n"})
        r = _mv(root, "--json", "a.py", "b.py")
        json.loads(r.output)  # must not raise

    def test_dry_run_stdout_is_valid_json(self, tmp_path: pathlib.Path) -> None:
        """stdout of a dry run parses as JSON."""
        root = _init_repo(tmp_path)
        _commit(root, {"a.py": b"# a\n"})
        r = _mv(root, "--json", "--dry-run", "a.py", "b.py")
        json.loads(r.output)  # must not raise

    def test_no_emoji_in_success_json(self, tmp_path: pathlib.Path) -> None:
        """Neither the ✅ nor the ❌ marker appears in successful JSON output."""
        root = _init_repo(tmp_path)
        _commit(root, {"a.py": b"# a\n"})
        r = _mv(root, "--json", "a.py", "b.py")
        assert "✅" not in r.output
        assert "❌" not in r.output


# ---------------------------------------------------------------------------
# TypedDicts
# ---------------------------------------------------------------------------


class TestTypedDicts:
    """``_MvResultJson`` and ``_MvErrorJson`` exist and annotate the envelope fields."""

    def test_mv_result_json_typeddict_exists(self) -> None:
        """``_MvResultJson`` is importable from the mv command module."""
        from muse.cli.commands.mv import _MvResultJson

        assert _MvResultJson is not None

    def test_mv_error_json_typeddict_exists(self) -> None:
        """``_MvErrorJson`` is importable from the mv command module."""
        from muse.cli.commands.mv import _MvErrorJson

        assert _MvErrorJson is not None

    def test_mv_result_json_has_exit_code_annotation(self) -> None:
        """``_MvResultJson`` annotates ``exit_code``."""
        from muse.cli.commands.mv import _MvResultJson

        hints = get_type_hints(_MvResultJson)
        assert "exit_code" in hints

    def test_mv_result_json_has_duration_ms_annotation(self) -> None:
        """``_MvResultJson`` annotates ``duration_ms``."""
        from muse.cli.commands.mv import _MvResultJson

        hints = get_type_hints(_MvResultJson)
        assert "duration_ms" in hints

    def test_mv_error_json_has_required_fields(self) -> None:
        """``_MvErrorJson`` annotates ``status``, ``error`` and ``exit_code``."""
        from muse.cli.commands.mv import _MvErrorJson

        hints = get_type_hints(_MvErrorJson)
        for field in ("status", "error", "exit_code"):
            assert field in hints, f"Missing annotation: {field!r}"


# ---------------------------------------------------------------------------
# Docstring coverage
# ---------------------------------------------------------------------------


class TestDocstring:
    """The mv command module docstring must mention the envelope fields."""

    def _doc(self) -> str:
        """Return the mv command module's docstring, or ``""`` if it has none."""
        import muse.cli.commands.mv as mod

        return mod.__doc__ or ""

    def test_docstring_documents_exit_code(self) -> None:
        """The module docstring mentions ``exit_code``."""
        assert "exit_code" in self._doc()

    def test_docstring_documents_duration_ms(self) -> None:
        """The module docstring mentions ``duration_ms``."""
        assert "duration_ms" in self._doc()


# ---------------------------------------------------------------------------
# Stress
# ---------------------------------------------------------------------------


class TestStress:
    """Repeated moves in one repository keep both the envelopes and the stage correct."""

    def test_50_files_move_all_json(self, tmp_path: pathlib.Path) -> None:
        """Move all 50 tracked files with --json; all envelopes must be valid."""
        root = _init_repo(tmp_path)
        n = 50
        files = {f"f{i:03d}.py": f"# {i}\n".encode() for i in range(n)}
        _commit(root, files)
        for i in range(n):
            r = _mv(root, "--json", f"f{i:03d}.py", f"moved/f{i:03d}.py")
            assert r.exit_code == 0, f"move {i} failed: {r.output}"
            d = json.loads(r.output)
            assert d["exit_code"] == 0
            assert "duration_ms" in d
            assert d["source"] == f"f{i:03d}.py"
            assert d["dest"] == f"moved/f{i:03d}.py"
        # After all moves, each source is staged with mode "D" and each
        # destination with mode "A".
        stage = read_stage(root)
        for i in range(n):
            assert stage[f"f{i:03d}.py"]["mode"] == "D"
            assert stage[f"moved/f{i:03d}.py"]["mode"] == "A"


# ---------------------------------------------------------------------------
# TestRegisterFlags — argparse-level verification
# ---------------------------------------------------------------------------


class TestRegisterFlags:
    """Verify that register() wires --json / -j correctly."""

    def _make_parser(self) -> argparse.ArgumentParser:
        """Build a fresh parser with only the ``mv`` subcommand registered."""
        # argparse comes from the module-level import; only the command's
        # register() is imported at function scope.
        from muse.cli.commands.mv import register

        ap = argparse.ArgumentParser()
        subs = ap.add_subparsers()
        register(subs)
        return ap

    def test_json_flag_long(self) -> None:
        """``--json`` sets ``json_out`` to True."""
        ns = self._make_parser().parse_args(["mv", "src.py", "dst.py", "--json"])
        assert ns.json_out is True

    def test_j_alias(self) -> None:
        """``-j`` sets ``json_out`` to True."""
        ns = self._make_parser().parse_args(["mv", "src.py", "dst.py", "-j"])
        assert ns.json_out is True

    def test_default_is_text(self) -> None:
        """Without a JSON flag, ``json_out`` is False."""
        ns = self._make_parser().parse_args(["mv", "src.py", "dst.py"])
        assert ns.json_out is False

    def test_dest_is_json_out(self) -> None:
        """The flag is stored under ``json_out`` and no ``fmt`` attribute is set."""
        ns = self._make_parser().parse_args(["mv", "src.py", "dst.py", "-j"])
        assert hasattr(ns, "json_out")
        assert not hasattr(ns, "fmt")