"""Comprehensive tests for ``muse show-ref``. Audit findings addressed here ------------------------------ Security - Format error now goes to stderr (was stdout) — verified below. - ANSI injection in branch names and commit IDs stripped in text mode. - Symlink refs in .muse/refs/heads/ are silently skipped. - Ref files with non-hex content are silently skipped. Agent UX - ``--verify`` now emits JSON when combined with ``--json`` (was silent). - ``--count`` added for branch inventory without reading all commit IDs. - ``--pattern`` default changed from ``""`` to ``None`` (cleaner guard). Performance - Symlink check and commit-ID validation happen in ``_list_branch_refs`` before the output path, so corrupt refs never surface to callers. Coverage tiers -------------- - Unit: _list_branch_refs, _head_info, _ShowRefResult schema - Integration: JSON/text output, --head, --verify (json + exit), --count, --pattern, empty repo, no HEAD commit, multi-branch sorting - Security: ANSI stripped in text mode, symlinks skipped, invalid commit IDs skipped, format error to stderr, no traceback on errors - Stress: 200 sequential full-list calls, 100-branch repo listing """ from __future__ import annotations import json import os import pathlib import pytest from muse.core.types import long_id from muse.core.errors import ExitCode from muse.core.paths import heads_dir, ref_path, muse_dir from tests.cli_test_helper import CliRunner, InvokeResult runner = CliRunner() # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- _FAKE_OID = long_id("a" * 64) def _make_repo(tmp_path: pathlib.Path) -> pathlib.Path: repo = tmp_path / "repo" muse = muse_dir(repo) (muse / "objects").mkdir(parents=True) (muse / "commits").mkdir(parents=True) (muse / "snapshots").mkdir(parents=True) (muse / "refs" / "heads").mkdir(parents=True) (muse / "HEAD").write_text("ref: refs/heads/main") (muse / "repo.json").write_text(json.dumps({"repo_id": "r1", "domain": "code"})) return repo def _write_ref(repo: pathlib.Path, branch: str, commit_id: str = _FAKE_OID) -> None: branch_ref = ref_path(repo, branch) branch_ref.write_text(commit_id) def _sr(repo: pathlib.Path, *args: str) -> InvokeResult: from muse.cli.app import main as cli return runner.invoke(cli, ["show-ref", *args], env={"MUSE_REPO_ROOT": str(repo)}) # --------------------------------------------------------------------------- # Unit — private helpers # --------------------------------------------------------------------------- class TestListBranchRefs: def test_empty_heads_dir(self, tmp_path: pathlib.Path) -> None: from muse.cli.commands.show_ref import _list_branch_refs repo = _make_repo(tmp_path) assert _list_branch_refs(repo) == [] def test_single_ref(self, tmp_path: pathlib.Path) -> None: from muse.cli.commands.show_ref import _list_branch_refs repo = _make_repo(tmp_path) _write_ref(repo, "main") refs = _list_branch_refs(repo) assert len(refs) == 1 assert refs[0]["ref"] == "refs/heads/main" assert refs[0]["commit_id"] == _FAKE_OID def test_multiple_refs_sorted(self, tmp_path: pathlib.Path) -> None: from muse.cli.commands.show_ref import _list_branch_refs repo = _make_repo(tmp_path) _write_ref(repo, "zeta", long_id("b" * 64)) _write_ref(repo, "alpha", long_id("c" * 64)) _write_ref(repo, "main", long_id("d" * 64)) refs = _list_branch_refs(repo) names = [r["ref"] for r in refs] assert names == ["refs/heads/alpha", "refs/heads/main", "refs/heads/zeta"] def test_invalid_commit_id_skipped(self, tmp_path: pathlib.Path) -> None: from muse.cli.commands.show_ref import _list_branch_refs repo = _make_repo(tmp_path) _write_ref(repo, "good", long_id("a" * 64)) _write_ref(repo, "bad", "not-a-sha256") refs = _list_branch_refs(repo) assert len(refs) == 1 assert refs[0]["ref"] == "refs/heads/good" def test_empty_ref_file_skipped(self, tmp_path: pathlib.Path) -> None: from muse.cli.commands.show_ref import _list_branch_refs repo = _make_repo(tmp_path) _write_ref(repo, "empty", "") assert _list_branch_refs(repo) == [] def test_symlink_ref_skipped(self, tmp_path: pathlib.Path) -> None: from muse.cli.commands.show_ref import _list_branch_refs repo = _make_repo(tmp_path) _write_ref(repo, "real", long_id("a" * 64)) sym = heads_dir(repo) / "sym-branch" sym.symlink_to(heads_dir(repo) / "real") refs = _list_branch_refs(repo) # Only the real branch should appear; the symlink is skipped. assert len(refs) == 1 assert refs[0]["ref"] == "refs/heads/real" def test_nonexistent_heads_dir(self, tmp_path: pathlib.Path) -> None: from muse.cli.commands.show_ref import _list_branch_refs repo = _make_repo(tmp_path) import shutil shutil.rmtree(heads_dir(repo)) assert _list_branch_refs(repo) == [] class TestHeadInfo: def test_returns_none_on_empty_branch(self, tmp_path: pathlib.Path) -> None: from muse.cli.commands.show_ref import _head_info repo = _make_repo(tmp_path) # HEAD points to main but no ref file written → commit_id is None assert _head_info(repo) is None def test_returns_info_when_commit_present(self, tmp_path: pathlib.Path) -> None: from muse.cli.commands.show_ref import _head_info repo = _make_repo(tmp_path) _write_ref(repo, "main") info = _head_info(repo) assert info is not None assert info["branch"] == "main" assert info["commit_id"] == _FAKE_OID assert info["ref"] == "refs/heads/main" def test_schema_fields_present(self, tmp_path: pathlib.Path) -> None: from muse.cli.commands.show_ref import _HeadInfo fields = set(_HeadInfo.__annotations__) assert fields == {"ref", "branch", "commit_id"} class TestShowRefResultSchema: def test_schema_fields(self) -> None: from muse.cli.commands.show_ref import _ShowRefResult fields = set(_ShowRefResult.__annotations__) assert "refs" in fields assert "head" in fields assert "count" in fields assert "duration_ms" in fields assert "exit_code" in fields # --------------------------------------------------------------------------- # Integration — JSON output # --------------------------------------------------------------------------- class TestJsonOutput: def test_empty_repo_zero_refs(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) result = _sr(repo, "--json") assert result.exit_code == 0 data = json.loads(result.output) assert data["refs"] == [] assert data["count"] == 0 def test_single_branch_present(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_ref(repo, "main") data = json.loads(_sr(repo, "--json").output) assert data["count"] == 1 assert data["refs"][0]["ref"] == "refs/heads/main" assert data["refs"][0]["commit_id"] == _FAKE_OID def test_head_present_when_commit_exists(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_ref(repo, "main") data = json.loads(_sr(repo, "--json").output) assert data["head"] is not None assert data["head"]["branch"] == "main" def test_head_null_when_no_commit(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) data = json.loads(_sr(repo, "--json").output) assert data["head"] is None def test_json_shorthand_flag(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) result = _sr(repo, "--json") assert result.exit_code == 0 assert "refs" in json.loads(result.output) def test_multi_branch_sorted(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_ref(repo, "zeta", "b" * 64) _write_ref(repo, "alpha", "c" * 64) data = json.loads(_sr(repo, "--json").output) refs = [r["ref"] for r in data["refs"]] assert refs == sorted(refs) # --------------------------------------------------------------------------- # Integration — text output # --------------------------------------------------------------------------- class TestTextOutput: def test_commit_id_in_output(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_ref(repo, "main") result = _sr(repo) assert result.exit_code == 0 assert _FAKE_OID in result.output def test_head_marker_present(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_ref(repo, "main") result = _sr(repo) assert "* " in result.output assert "(HEAD)" in result.output def test_empty_repo_no_output(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) result = _sr(repo) assert result.exit_code == 0 assert result.output.strip() == "" # --------------------------------------------------------------------------- # Integration — --head mode # --------------------------------------------------------------------------- class TestHeadMode: def test_json_head_present(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_ref(repo, "main") data = json.loads(_sr(repo, "--head", "--json").output) assert data["head"]["branch"] == "main" def test_json_head_null(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) data = json.loads(_sr(repo, "--head", "--json").output) assert data["head"] is None def test_text_head_present(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_ref(repo, "main") result = _sr(repo, "--head") assert "(HEAD)" in result.output assert _FAKE_OID in result.output def test_text_no_head(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) result = _sr(repo, "--head") assert "no HEAD commit" in result.output # --------------------------------------------------------------------------- # Integration — --verify mode (agent UX supercharge) # --------------------------------------------------------------------------- class TestVerifyMode: def test_existing_ref_exits_0(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_ref(repo, "main") result = _sr(repo, "--verify", "refs/heads/main") assert result.exit_code == 0 def test_missing_ref_exits_1(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) result = _sr(repo, "--verify", "refs/heads/nonexistent") assert result.exit_code == ExitCode.USER_ERROR def test_json_verify_exists_true(self, tmp_path: pathlib.Path) -> None: """JSON output is now emitted — critical agent UX improvement.""" repo = _make_repo(tmp_path) _write_ref(repo, "main") result = _sr(repo, "--verify", "refs/heads/main", "--json") assert result.exit_code == 0 data = json.loads(result.output) assert data["exists"] is True assert data["ref"] == "refs/heads/main" def test_json_verify_exists_false(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) result = _sr(repo, "--verify", "refs/heads/ghost", "--json") assert result.exit_code == ExitCode.USER_ERROR data = json.loads(result.output) assert data["exists"] is False assert data["ref"] == "refs/heads/ghost" # --------------------------------------------------------------------------- # Integration — --count mode (new) # --------------------------------------------------------------------------- class TestCountMode: def test_json_count_zero(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) data = json.loads(_sr(repo, "--count", "--json").output) assert data["count"] == 0 def test_json_count_with_branches(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_ref(repo, "main") _write_ref(repo, "dev", long_id("b" * 64)) data = json.loads(_sr(repo, "--count", "--json").output) assert data["count"] == 2 def test_text_count(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_ref(repo, "main") result = _sr(repo, "--count") assert result.exit_code == 0 assert result.output.strip() == "1" def test_count_with_pattern(self, tmp_path: pathlib.Path) -> None: """--count respects --pattern filter.""" repo = _make_repo(tmp_path) _write_ref(repo, "main") _write_ref(repo, "feat-x", long_id("b" * 64)) _write_ref(repo, "feat-y", long_id("c" * 64)) data = json.loads(_sr(repo, "--count", "--pattern", "refs/heads/feat*", "--json").output) assert data["count"] == 2 # --------------------------------------------------------------------------- # Integration — --pattern filter # --------------------------------------------------------------------------- class TestPatternFilter: def test_pattern_matches(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_ref(repo, "feat-a", long_id("b" * 64)) _write_ref(repo, "feat-b", long_id("c" * 64)) _write_ref(repo, "main") data = json.loads(_sr(repo, "--pattern", "refs/heads/feat*", "--json").output) assert data["count"] == 2 for r in data["refs"]: assert r["ref"].startswith("refs/heads/feat") def test_pattern_no_match(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_ref(repo, "main") data = json.loads(_sr(repo, "--pattern", "refs/heads/release/*", "--json").output) assert data["count"] == 0 assert data["refs"] == [] # --------------------------------------------------------------------------- # Security # --------------------------------------------------------------------------- class TestSecurity: def test_ansi_in_branch_name_stripped_text(self, tmp_path: pathlib.Path) -> None: """Branch name with ANSI escape in ref path is sanitized in text output.""" repo = _make_repo(tmp_path) ansi_branch = "\x1b[31mmalicious\x1b[0m" branch_ref = ref_path(repo, ansi_branch) branch_ref.write_text(long_id("a" * 64)) result = _sr(repo) assert "\x1b" not in result.output def test_ansi_in_commit_id_stripped_text(self, tmp_path: pathlib.Path) -> None: """Commit ID with ANSI in ref file content is sanitized. (The ref is also skipped by validate_object_id — no ANSI ever reaches output.) """ repo = _make_repo(tmp_path) _write_ref(repo, "main", f"\x1b[31m{'a' * 60}") result = _sr(repo) assert "\x1b" not in result.output def test_format_error_to_stderr(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) result = _sr(repo, "--format", "yaml") assert result.exit_code != 0 assert "error" in result.stderr.lower() assert result.stdout_bytes == b"" def test_no_traceback_on_bad_format(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) result = _sr(repo, "--format", "xml") assert "Traceback" not in result.output def test_symlink_ref_not_included(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_ref(repo, "real", long_id("a" * 64)) sym = heads_dir(repo) / "sym" sym.symlink_to(heads_dir(repo) / "real") data = json.loads(_sr(repo, "--json").output) ref_names = [r["ref"] for r in data["refs"]] assert "refs/heads/sym" not in ref_names assert "refs/heads/real" in ref_names def test_invalid_commit_id_ref_not_included(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_ref(repo, "good", long_id("a" * 64)) _write_ref(repo, "corrupt", "not-a-sha256-at-all") data = json.loads(_sr(repo, "--json").output) ref_names = [r["ref"] for r in data["refs"]] assert "refs/heads/good" in ref_names assert "refs/heads/corrupt" not in ref_names def test_path_traversal_in_pattern_safe(self, tmp_path: pathlib.Path) -> None: """A crafted pattern cannot escape the ref listing via fnmatch.""" repo = _make_repo(tmp_path) _write_ref(repo, "main") result = _sr(repo, "--pattern", "../../../../etc/*", "--json") assert result.exit_code == 0 data = json.loads(result.output) assert data["count"] == 0 # --------------------------------------------------------------------------- # Stress # --------------------------------------------------------------------------- class TestStress: def test_200_sequential_calls(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_ref(repo, "main") for i in range(200): result = _sr(repo, "--json") assert result.exit_code == 0, f"failed at iteration {i}" assert json.loads(result.output)["count"] == 1 def test_100_branch_repo(self, tmp_path: pathlib.Path) -> None: """Listing 100 branches must complete and return the correct count.""" repo = _make_repo(tmp_path) hex_chars = "0123456789abcdef" for i in range(100): # Build a deterministic valid sha256:-prefixed commit ID. oid = long_id((hex_chars[i % 16]) * 64) _write_ref(repo, f"branch-{i:04d}", oid) data = json.loads(_sr(repo, "--json").output) assert data["count"] == 100 # All refs must be sorted lexicographically. names = [r["ref"] for r in data["refs"]] assert names == sorted(names) def test_100_verify_calls(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_ref(repo, "main") for i in range(100): result = _sr(repo, "--verify", "refs/heads/main", "--json") assert result.exit_code == 0, f"failed at iteration {i}" assert json.loads(result.output)["exists"] is True