"""Comprehensive tests for ``muse rev-parse``. Coverage tiers -------------- - Integration: branch, HEAD, SHA prefix, full SHA, --abbrev-ref, --format text - Edge cases: empty repo (no commits), empty ref, ambiguous prefix, HEAD→branch - Security: ANSI/control chars in ref → JSON-escaped, empty ref clean error - Stress: 200 rapid resolves """ from __future__ import annotations from collections.abc import Mapping import datetime import json import pathlib import pytest from muse.core.errors import ExitCode from muse.core.object_store import write_object from muse.core.ids import hash_commit, hash_snapshot from muse.core.commits import ( CommitRecord, write_commit, ) from muse.core.snapshots import ( SnapshotRecord, write_snapshot, ) from muse.core.types import Manifest, long_id, split_id from muse.core.paths import muse_dir, ref_path from tests.cli_test_helper import CliRunner, InvokeResult runner = CliRunner() # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _make_repo(tmp_path: pathlib.Path, branch: str = "main") -> pathlib.Path: repo = tmp_path / "repo" dot_muse = muse_dir(repo) for sub in ("objects", "commits", "snapshots", "refs/heads"): (dot_muse / sub).mkdir(parents=True) (dot_muse / "HEAD").write_text(f"ref: refs/heads/{branch}") (dot_muse / "repo.json").write_text(json.dumps({"repo_id": "test", "domain": "code"})) return repo _TS = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc) def _store_snap(repo: pathlib.Path, manifest: Manifest | None = None) -> str: sid = hash_snapshot(manifest or {}) write_snapshot(repo, SnapshotRecord( snapshot_id=sid, manifest=manifest or {}, created_at=_TS, )) return sid def _make_commit( repo: pathlib.Path, snapshot_id: str, *, branch: str = "main", parent: str | None = None, message: str = "test", ) -> str: parents = [parent] if parent else [] cid = hash_commit( parent_ids=parents, 
snapshot_id=snapshot_id, message=message, committed_at_iso=_TS.isoformat(), author="tester", ) rec = CommitRecord( commit_id=cid, branch=branch, snapshot_id=snapshot_id, message=message, committed_at=_TS, author="tester", parent_commit_id=parent, ) write_commit(repo, rec) return cid def _set_head(repo: pathlib.Path, branch: str, commit_id: str) -> None: ref = ref_path(repo, branch) ref.parent.mkdir(parents=True, exist_ok=True) ref.write_text(commit_id) def _rev(repo: pathlib.Path, *args: str) -> InvokeResult: from muse.cli.app import main as cli return runner.invoke( cli, ["rev-parse", *args], env={"MUSE_REPO_ROOT": str(repo)}, ) def _populated_repo(tmp_path: pathlib.Path) -> tuple[pathlib.Path, str]: """Return (repo, commit_id) with one commit on main, using real content-addressed IDs.""" repo = _make_repo(tmp_path) sid = _store_snap(repo) cid = _make_commit(repo, sid) _set_head(repo, "main", cid) return repo, cid # --------------------------------------------------------------------------- # Integration — branch resolution # --------------------------------------------------------------------------- # --------------------------------------------------------------------------- # New: default format is text, --json makes it meaningful # --------------------------------------------------------------------------- class TestDefaultFormat: def test_default_output_is_text(self, tmp_path: pathlib.Path) -> None: """Without --json the output is a plain commit ID.""" repo, cid = _populated_repo(tmp_path) result = _rev(repo, "main") assert result.exit_code == 0 assert result.output.strip() == cid def test_no_flags_output_is_not_json(self, tmp_path: pathlib.Path) -> None: """Default plain-text output is not parseable as JSON.""" repo, cid = _populated_repo(tmp_path) result = _rev(repo, "main") assert result.exit_code == 0 with pytest.raises((json.JSONDecodeError, ValueError)): json.loads(result.output) def test_json_flag_gives_dict_output(self, tmp_path: pathlib.Path) -> 
None: """With --json output is a dict.""" repo, cid = _populated_repo(tmp_path) result = _rev(repo, "--json", "main") assert result.exit_code == 0 data = json.loads(result.output) assert data["commit_id"] == cid assert data["ref"] == "main" def test_text_vs_json_differ(self, tmp_path: pathlib.Path) -> None: """Plain text and --json outputs differ in structure.""" repo, cid = _populated_repo(tmp_path) text_result = _rev(repo, "main") json_result = _rev(repo, "--json", "main") assert text_result.output.strip() == cid assert json.loads(json_result.output)["commit_id"] == cid # --------------------------------------------------------------------------- # New: sha256: prefix is required; bare hex is rejected # --------------------------------------------------------------------------- class TestSha256PrefixRequired: def test_bare_full_hex_rejected(self, tmp_path: pathlib.Path) -> None: """64-char bare hex without sha256: prefix must be rejected.""" repo, cid = _populated_repo(tmp_path) result = _rev(repo, split_id(cid)[1]) assert result.exit_code == ExitCode.USER_ERROR data = json.loads(result.output) assert "sha256:" in data["error"] def test_bare_short_hex_rejected(self, tmp_path: pathlib.Path) -> None: """Short bare hex without sha256: prefix must be rejected.""" repo, cid = _populated_repo(tmp_path) result = _rev(repo, split_id(cid)[1][:8]) # 8 bare hex chars, no prefix assert result.exit_code == ExitCode.USER_ERROR data = json.loads(result.output) assert "sha256:" in data["error"] def test_canonical_full_id_resolves(self, tmp_path: pathlib.Path) -> None: """sha256:<64hex> must resolve to the commit.""" repo, cid = _populated_repo(tmp_path) result = _rev(repo, "--json", cid) assert result.exit_code == 0 assert json.loads(result.output)["commit_id"] == cid def test_canonical_prefix_resolves(self, tmp_path: pathlib.Path) -> None: """sha256:<8hex> prefix must resolve to the commit.""" repo, cid = _populated_repo(tmp_path) prefix = long_id(split_id(cid)[1][:8])# sha256: 
+ 8 hex chars result = _rev(repo, "--json", prefix) assert result.exit_code == 0 assert json.loads(result.output)["commit_id"] == cid # --------------------------------------------------------------------------- # Integration — branch resolution # --------------------------------------------------------------------------- class TestBranchResolution: def test_resolve_branch_json(self, tmp_path: pathlib.Path) -> None: repo, cid = _populated_repo(tmp_path) result = _rev(repo, "--json", "main") assert result.exit_code == 0 data = json.loads(result.output) assert data["commit_id"] == cid assert data["ref"] == "main" def test_resolve_branch_text(self, tmp_path: pathlib.Path) -> None: repo, cid = _populated_repo(tmp_path) result = _rev(repo, "main") assert result.exit_code == 0 assert result.output.strip() == cid def test_json_flag_shorthand(self, tmp_path: pathlib.Path) -> None: repo, cid = _populated_repo(tmp_path) result = _rev(repo, "--json", "main") assert result.exit_code == 0 data = json.loads(result.output) assert data["commit_id"] == cid def test_unknown_branch_not_found(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) result = _rev(repo, "nonexistent-branch") assert result.exit_code == ExitCode.USER_ERROR data = json.loads(result.output) assert data["commit_id"] is None assert data["error"] == "not found" # --------------------------------------------------------------------------- # Integration — HEAD resolution # --------------------------------------------------------------------------- class TestHeadResolution: def test_resolve_head(self, tmp_path: pathlib.Path) -> None: repo, cid = _populated_repo(tmp_path) result = _rev(repo, "--json", "HEAD") assert result.exit_code == 0 data = json.loads(result.output) assert data["commit_id"] == cid def test_head_lowercase_also_resolves(self, tmp_path: pathlib.Path) -> None: """HEAD resolution is case-insensitive (matches git behaviour).""" repo, cid = _populated_repo(tmp_path) result = _rev(repo, 
"--json", "head") assert result.exit_code == 0 data = json.loads(result.output) assert data["commit_id"] == cid def test_head_on_empty_repo_errors(self, tmp_path: pathlib.Path) -> None: """HEAD on a repo with no commits should error cleanly.""" repo = _make_repo(tmp_path) result = _rev(repo, "HEAD") assert result.exit_code == ExitCode.USER_ERROR data = json.loads(result.output) assert data["commit_id"] is None assert "no commits" in data["error"] # --------------------------------------------------------------------------- # Integration — SHA prefix resolution # --------------------------------------------------------------------------- class TestShaResolution: def test_resolve_full_sha(self, tmp_path: pathlib.Path) -> None: repo, cid = _populated_repo(tmp_path) result = _rev(repo, "--json", cid) assert result.exit_code == 0 data = json.loads(result.output) assert data["commit_id"] == cid def test_resolve_8char_prefix(self, tmp_path: pathlib.Path) -> None: repo, cid = _populated_repo(tmp_path) prefix = long_id(split_id(cid)[1][:8])# sha256: + first 8 hex chars result = _rev(repo, "--json", prefix) assert result.exit_code == 0 data = json.loads(result.output) assert data["commit_id"] == cid def test_ambiguous_prefix_returns_candidates(self, tmp_path: pathlib.Path) -> None: """Two commits sharing a prefix → error with candidates list.""" # Messages "commit-search-143" and "commit-search-346" produce IDs # sharing the 4-char hex prefix "c1d3" (same snapshot, same timestamp). 
_AMBIG_MSG_1 = "commit-search-143" _AMBIG_MSG_2 = "commit-search-346" _AMBIG_PREFIX = "c1d3" repo = _make_repo(tmp_path) sid = _store_snap(repo) cid1 = _make_commit(repo, sid, branch="main", message=_AMBIG_MSG_1) cid2 = _make_commit(repo, sid, branch="dev", message=_AMBIG_MSG_2) # cid1/cid2 are sha256:; compare the hex portion only assert split_id(cid1)[1][:4] == split_id(cid2)[1][:4] == _AMBIG_PREFIX _set_head(repo, "main", cid1) _set_head(repo, "dev", cid2) result = _rev(repo, long_id(_AMBIG_PREFIX)) assert result.exit_code == ExitCode.USER_ERROR data = json.loads(result.output) assert data["error"] == "ambiguous" assert set(data["candidates"]) == {cid1, cid2} def test_nonexistent_full_sha_not_found(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) result = _rev(repo, long_id("f" * 64)) assert result.exit_code == ExitCode.USER_ERROR data = json.loads(result.output) assert data["error"] == "not found" # --------------------------------------------------------------------------- # Integration — --abbrev-ref # --------------------------------------------------------------------------- class TestAbbrevRef: def test_abbrev_ref_head_returns_branch_name(self, tmp_path: pathlib.Path) -> None: """The canonical agent UX: what branch am I on?""" repo = _make_repo(tmp_path, branch="feat/my-feature") result = _rev(repo, "--abbrev-ref", "--json", "HEAD") assert result.exit_code == 0 data = json.loads(result.output) assert data["branch"] == "feat/my-feature" assert data["ref"] == "HEAD" def test_abbrev_ref_text_format(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path, branch="dev") result = _rev(repo, "--abbrev-ref", "HEAD") assert result.exit_code == 0 assert result.output.strip() == "dev" def test_abbrev_ref_main(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path, branch="main") result = _rev(repo, "--abbrev-ref", "--json", "HEAD") assert result.exit_code == 0 assert json.loads(result.output)["branch"] == "main" # 
# ---------------------------------------------------------------------------
# Edge cases
# ---------------------------------------------------------------------------


class TestEdgeCases:
    """Unusual but legitimate inputs: empty refs, bad flags, nested branches."""

    def test_empty_ref_clean_error(self, tmp_path: pathlib.Path) -> None:
        """Empty string ref must give a clear 'ref must not be empty' error."""
        repo = _make_repo(tmp_path)
        outcome = _rev(repo, "")
        assert outcome.exit_code == ExitCode.USER_ERROR
        payload = json.loads(outcome.output)
        assert "empty" in payload["error"]

    def test_unrecognized_flag_errors(self, tmp_path: pathlib.Path) -> None:
        """An unknown option makes the command exit non-zero."""
        repo, _ = _populated_repo(tmp_path)
        outcome = _rev(repo, "--no-such-flag", "main")
        assert outcome.exit_code != 0

    def test_branch_with_slash_resolves(self, tmp_path: pathlib.Path) -> None:
        """A branch name containing ``/`` resolves like any other branch."""
        branch_name = "feat/my-feature"
        repo = _make_repo(tmp_path, branch=branch_name)
        snapshot_id = _store_snap(repo)
        commit_id = _make_commit(
            repo, snapshot_id, branch=branch_name, message="feat-init"
        )
        _set_head(repo, branch_name, commit_id)

        outcome = _rev(repo, "--json", branch_name)
        assert outcome.exit_code == 0
        payload = json.loads(outcome.output)
        assert payload["commit_id"] == commit_id


# ---------------------------------------------------------------------------
# Security
# ---------------------------------------------------------------------------


class TestSecurity:
    """Hostile ref strings must fail as user errors without leaking raw bytes."""

    def test_ansi_in_ref_is_json_escaped(self, tmp_path: pathlib.Path) -> None:
        """ANSI escape in ref is safely JSON-encoded, never echoed raw."""
        repo = _make_repo(tmp_path)
        hostile_ref = "\x1b[31mmalicious\x1b[0m"
        outcome = _rev(repo, hostile_ref)
        assert outcome.exit_code == ExitCode.USER_ERROR
        # Output is JSON — ANSI must be encoded as \u001b, not emitted raw
        assert "\x1b" not in outcome.output
        payload = json.loads(outcome.output)
        assert payload["error"] == "not found"

    def test_path_traversal_ref_gives_not_found(self, tmp_path: pathlib.Path) -> None:
        """A ``../`` style ref is rejected with a user error."""
        repo = _make_repo(tmp_path)
        outcome = _rev(repo, "../../../etc/passwd")
        assert outcome.exit_code == ExitCode.USER_ERROR

    def test_null_byte_in_ref(self, tmp_path: pathlib.Path) -> None:
        """A ref containing a NUL byte is rejected with a user error."""
        repo = _make_repo(tmp_path)
        outcome = _rev(repo, "branch\x00null")
        assert outcome.exit_code == ExitCode.USER_ERROR

    def test_no_traceback_on_bad_input(self, tmp_path: pathlib.Path) -> None:
        """Bad input must not surface a Python traceback in the output."""
        repo = _make_repo(tmp_path)
        outcome = _rev(repo, "")
        assert "Traceback" not in outcome.output


# ---------------------------------------------------------------------------
# JSON schema — duration_ms + exit_code on every output path
# ---------------------------------------------------------------------------


class TestJsonSchema:
    """Every JSON response must carry duration_ms (float ≥ 0) and exit_code (int)."""

    def _assert_schema(self, d: Mapping[str, object], expected_exit: int = 0) -> None:
        """Check that *d* has a non-negative numeric ``duration_ms`` and the expected ``exit_code``."""
        assert "duration_ms" in d, f"duration_ms missing: {d}"
        duration = d["duration_ms"]
        assert isinstance(duration, (int, float))
        assert duration >= 0
        assert "exit_code" in d, f"exit_code missing: {d}"
        assert d["exit_code"] == expected_exit

    def test_branch_resolution_has_schema(self, tmp_path: pathlib.Path) -> None:
        """Branch lookup output carries the schema fields."""
        repo, _ = _populated_repo(tmp_path)
        outcome = _rev(repo, "--json", "main")
        self._assert_schema(json.loads(outcome.output))

    def test_head_resolution_has_schema(self, tmp_path: pathlib.Path) -> None:
        """HEAD lookup output carries the schema fields."""
        repo, _ = _populated_repo(tmp_path)
        outcome = _rev(repo, "--json", "HEAD")
        self._assert_schema(json.loads(outcome.output))

    def test_sha_resolution_has_schema(self, tmp_path: pathlib.Path) -> None:
        """Full-ID lookup output carries the schema fields."""
        repo, commit_id = _populated_repo(tmp_path)
        outcome = _rev(repo, "--json", commit_id)
        self._assert_schema(json.loads(outcome.output))

    def test_abbrev_ref_has_schema(self, tmp_path: pathlib.Path) -> None:
        """``--abbrev-ref`` output carries the schema fields."""
        repo = _make_repo(tmp_path, branch="feat/x")
        outcome = _rev(repo, "--abbrev-ref", "--json", "HEAD")
        self._assert_schema(json.loads(outcome.output))

    def test_prefix_resolution_has_schema(self, tmp_path: pathlib.Path) -> None:
        """Prefix lookup output carries the schema fields."""
        repo, commit_id = _populated_repo(tmp_path)
        short_ref = long_id(split_id(commit_id)[1][:8])
        outcome = _rev(repo, "--json", short_ref)
        self._assert_schema(json.loads(outcome.output))
# ---------------------------------------------------------------------------
# Error JSON — all error paths emit structured JSON to stdout
# ---------------------------------------------------------------------------


class TestErrorJson:
    """Every error must emit a parseable JSON dict to stdout (not stderr)."""

    def _assert_error(self, result: InvokeResult) -> Mapping[str, object]:
        """Validate the common error envelope of *result* and return the parsed dict."""
        assert result.exit_code != 0, "expected non-zero exit"
        parsed = json.loads(result.output)  # stdout, not stderr
        assert "error" in parsed
        assert "duration_ms" in parsed, f"duration_ms missing from error: {parsed}"
        assert "exit_code" in parsed
        assert parsed["exit_code"] != 0
        return parsed

    def test_empty_ref_emits_json_to_stdout(self, tmp_path: pathlib.Path) -> None:
        """Empty ref error must land on stdout as JSON, not stderr."""
        repo = _make_repo(tmp_path)
        envelope = self._assert_error(_rev(repo, ""))
        assert "empty" in envelope["error"]

    def test_not_found_emits_json_to_stdout(self, tmp_path: pathlib.Path) -> None:
        """Not-found error must land on stdout as JSON, not stderr."""
        repo = _make_repo(tmp_path)
        envelope = self._assert_error(_rev(repo, "no-such-ref"))
        assert envelope["error"] == "not found"

    def test_not_found_has_schema(self, tmp_path: pathlib.Path) -> None:
        """The not-found error carries the full error envelope."""
        repo = _make_repo(tmp_path)
        envelope = self._assert_error(_rev(repo, "nonexistent-branch"))
        assert envelope["error"] == "not found"

    def test_ambiguous_prefix_has_schema(self, tmp_path: pathlib.Path) -> None:
        """The ambiguous-prefix error carries the full error envelope."""
        # These two messages yield commit IDs sharing the hex prefix below;
        # the assert after commit creation guards that assumption.
        first_message = "commit-search-143"
        second_message = "commit-search-346"
        common_prefix = "c1d3"

        repo = _make_repo(tmp_path)
        snapshot_id = _store_snap(repo)
        first_id = _make_commit(repo, snapshot_id, branch="main", message=first_message)
        second_id = _make_commit(repo, snapshot_id, branch="dev", message=second_message)
        assert split_id(first_id)[1][:4] == split_id(second_id)[1][:4] == common_prefix
        _set_head(repo, "main", first_id)
        _set_head(repo, "dev", second_id)

        envelope = self._assert_error(_rev(repo, long_id(common_prefix)))
        assert envelope["error"] == "ambiguous"

    def test_head_no_commits_has_schema(self, tmp_path: pathlib.Path) -> None:
        """HEAD on an empty repo carries the full error envelope."""
        repo = _make_repo(tmp_path)
        envelope = self._assert_error(_rev(repo, "HEAD"))
        assert "no commits" in envelope["error"]

    def test_bare_hex_has_schema(self, tmp_path: pathlib.Path) -> None:
        """The bare-hex rejection carries the full error envelope."""
        repo, commit_id = _populated_repo(tmp_path)
        envelope = self._assert_error(_rev(repo, split_id(commit_id)[1]))
        assert "sha256:" in envelope["error"]

    def test_error_json_has_ref_key(self, tmp_path: pathlib.Path) -> None:
        """Every error dict must echo back the ref the caller passed."""
        repo = _make_repo(tmp_path)
        outcome = _rev(repo, "missing-branch")
        envelope = json.loads(outcome.output)
        assert envelope["ref"] == "missing-branch"


# ---------------------------------------------------------------------------
# Flag registration
# ---------------------------------------------------------------------------


class TestRegisterFlags:
    """``register`` wires ``--json`` / ``-j`` onto the ``rev-parse`` subparser."""

    @staticmethod
    def _json_out(*argv: str) -> object:
        """Parse ``rev-parse *argv`` with a fresh parser and return ``json_out``."""
        import argparse

        from muse.cli.commands.rev_parse import register

        parser = argparse.ArgumentParser()
        register(parser.add_subparsers())
        return parser.parse_args(["rev-parse", *argv]).json_out

    def test_default_json_out_is_false(self) -> None:
        """Without any flag ``json_out`` defaults to ``False``."""
        assert self._json_out("HEAD") is False

    def test_json_flag_sets_json_out(self) -> None:
        """``--json`` switches ``json_out`` on."""
        assert self._json_out("HEAD", "--json") is True

    def test_j_shorthand_sets_json_out(self) -> None:
        """``-j`` is accepted as the short form of ``--json``."""
        assert self._json_out("HEAD", "-j") is True