"""SUPERCHARGE tests for ``muse symbolic-ref``. Gaps addressed beyond the existing test_cmd_symbolic_ref.py: Unit U1 duration_ms present and float in all JSON success paths U2 exit_code present and 0 in all JSON success paths U3 duration_ms + exit_code present in TypedDict schema U4 commit_id in JSON is sha256:-prefixed (uses long_id format) JSON errors to stdout E1 unsupported ref in JSON mode → JSON error to stdout (covers format validation too) E2 unsupported ref → JSON to stdout when --json set E3 branch-not-found → JSON to stdout when --json set E4 invalid branch name → JSON to stdout when --json set E5 every JSON error has duration_ms (float) and exit_code (non-zero int) Integration I1 read detached HEAD + --json → duration_ms + exit_code present I2 write --set + --json → duration_ms + exit_code present I3 write --set --create-branch + --json → duration_ms + exit_code present I4 all success JSON keys present in read mode I5 all success JSON keys present in write mode Security S1 null byte in --set branch name → JSON error (no traceback) S2 path traversal in --set branch name → JSON error (no traceback) S3 ANSI in --set branch name rejected → JSON error, no ANSI in output S4 JSON error values contain no ANSI bytes Data integrity D1 duration_ms is float not int D2 exit_code is int not bool D3 HEAD file is consistent after --set (reads back correctly) D4 detached HEAD with long_id commit_id returns exact same commit_id D5 write then read round-trip: branch matches Stress / performance P1 100 rapid JSON reads all have duration_ms P2 duration_ms always positive P3 20-branch write round-trip all include duration_ms Concurrent C1 8 threads reading in separate repos — all succeed C2 4 threads writing --set in separate repos — all succeed """ from __future__ import annotations from collections.abc import Mapping import json import os import pathlib import threading import pytest from tests.cli_test_helper import CliRunner, InvokeResult from muse.core.types import 
long_id from muse.core.ids import hash_commit as compute_commit_id, hash_snapshot as compute_snapshot_id from muse.core.commits import ( CommitRecord, write_commit, ) from muse.core.snapshots import ( SnapshotRecord, write_snapshot, ) from muse.core.paths import head_path, muse_dir, ref_path import datetime runner = CliRunner() _TS = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc) _CHDIR_LOCK = threading.Lock() # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _env(repo: pathlib.Path) -> Mapping[str, str]: return {"MUSE_REPO_ROOT": str(repo)} def _sr(repo: pathlib.Path, *args: str) -> InvokeResult: extra = [] if "--json" in args or "-j" in args else ["--json"] return runner.invoke(None, ["symbolic-ref", *extra, *args], env=_env(repo)) def _init_repo(path: pathlib.Path, branch: str = "main") -> pathlib.Path: muse = muse_dir(path) (muse / "commits").mkdir(parents=True) (muse / "snapshots").mkdir(parents=True) (muse / "objects").mkdir(parents=True) (muse / "refs" / "heads").mkdir(parents=True) (muse / "HEAD").write_text(f"ref: refs/heads/{branch}\n", encoding="utf-8") (muse / "repo.json").write_text( '{"repo_id": "test-repo", "domain": "midi"}', encoding="utf-8" ) return path def _snap(repo: pathlib.Path) -> str: sid = compute_snapshot_id({}) write_snapshot(repo, SnapshotRecord(snapshot_id=sid, manifest={}, created_at=_TS)) return sid def _commit(repo: pathlib.Path, snap_id: str, branch: str = "main") -> str: cid = compute_commit_id( parent_ids=[], snapshot_id=snap_id, message="test", committed_at_iso=_TS.isoformat(), author="tester",) write_commit(repo, CommitRecord( commit_id=cid, branch=branch, snapshot_id=snap_id, message="test", committed_at=_TS, author="tester", parent_commit_id=None, parent2_commit_id=None, )) ref = ref_path(repo, branch) ref.parent.mkdir(parents=True, exist_ok=True) ref.write_text(cid, encoding="utf-8") return cid 
@pytest.fixture()
def repo(tmp_path: pathlib.Path) -> pathlib.Path:
    """Repository with one snapshot and one commit on ``main``."""
    r = _init_repo(tmp_path)
    sid = _snap(r)
    _commit(r, sid)
    return r


@pytest.fixture()
def two_branch_repo(tmp_path: pathlib.Path) -> pathlib.Path:
    """Repository whose ``main`` and ``dev`` refs both exist; HEAD is on ``main``."""
    r = _init_repo(tmp_path)
    sid = _snap(r)
    _commit(r, sid, "main")
    _commit(r, sid, "dev")
    return r


def _detach_head(path: pathlib.Path, hex_digit: str) -> str:
    """Initialise a repo at *path* with a detached HEAD and return the commit id.

    The id is ``long_id`` applied to *hex_digit* repeated 64 times; no commit
    record is written, only the HEAD file.
    """
    _init_repo(path)
    fake_cid = long_id(hex_digit * 64)
    head_path(path).write_text(f"commit: {fake_cid}\n", encoding="utf-8")
    return fake_cid


# ---------------------------------------------------------------------------
# U1–U4  duration_ms, exit_code, TypedDict schema, commit_id format
# ---------------------------------------------------------------------------


class TestElapsedAndExitCode:
    """Success payloads carry ``duration_ms`` and ``exit_code``."""

    def test_U1_duration_ms_read_mode(self, repo: pathlib.Path) -> None:
        r = _sr(repo, "HEAD")
        assert r.exit_code == 0
        data = json.loads(r.output)
        assert "duration_ms" in data, f"duration_ms missing; keys: {list(data)}"

    def test_U1_duration_ms_write_mode(self, two_branch_repo: pathlib.Path) -> None:
        r = _sr(two_branch_repo, "--set", "dev", "HEAD")
        assert r.exit_code == 0
        data = json.loads(r.output)
        assert "duration_ms" in data

    def test_U1_duration_ms_create_branch(self, repo: pathlib.Path) -> None:
        r = _sr(repo, "--set", "orphan", "--create-branch", "HEAD")
        assert r.exit_code == 0
        data = json.loads(r.output)
        assert "duration_ms" in data

    def test_U1_duration_ms_detached_head(self, tmp_path: pathlib.Path) -> None:
        _detach_head(tmp_path, "f")
        r = _sr(tmp_path, "HEAD")
        assert r.exit_code == 0
        data = json.loads(r.output)
        assert "duration_ms" in data

    def test_U2_exit_code_read_mode(self, repo: pathlib.Path) -> None:
        r = _sr(repo, "HEAD")
        data = json.loads(r.output)
        assert "exit_code" in data
        assert data["exit_code"] == 0

    def test_U2_exit_code_write_mode(self, two_branch_repo: pathlib.Path) -> None:
        r = _sr(two_branch_repo, "--set", "dev", "HEAD")
        data = json.loads(r.output)
        assert "exit_code" in data
        assert data["exit_code"] == 0

    def test_U3_typeddict_has_duration_ms(self) -> None:
        from muse.cli.commands.symbolic_ref import _SymbolicRefResult

        keys = _SymbolicRefResult.__annotations__
        assert "duration_ms" in keys, "duration_ms missing from _SymbolicRefResult"

    def test_U3_typeddict_has_exit_code(self) -> None:
        from muse.cli.commands.symbolic_ref import _SymbolicRefResult

        keys = _SymbolicRefResult.__annotations__
        assert "exit_code" in keys, "exit_code missing from _SymbolicRefResult"

    def test_U4_commit_id_sha256_prefixed(self, repo: pathlib.Path) -> None:
        r = _sr(repo, "HEAD")
        data = json.loads(r.output)
        assert data["commit_id"].startswith("sha256:")


# ---------------------------------------------------------------------------
# E1–E5  JSON errors to stdout when --json set
# ---------------------------------------------------------------------------


class TestJsonErrors:
    """In JSON mode, user errors are emitted as a JSON object on stdout."""

    def test_E1_bad_format_json_error_to_stdout(self, repo: pathlib.Path) -> None:
        # Despite the name, this does not pass an invalid --format value.
        # It exercises the JSON error path with the simplest user error
        # available: asking for a ref other than HEAD while --json is set.
        r = _sr(repo, "--json", "MERGE_HEAD")
        assert r.exit_code != 0
        data = json.loads(r.stdout)
        assert "error" in data

    def test_E2_unsupported_ref_json_error_to_stdout(self, repo: pathlib.Path) -> None:
        r = _sr(repo, "--json", "MERGE_HEAD")
        assert r.exit_code != 0
        data = json.loads(r.stdout)
        assert "error" in data

    def test_E3_branch_not_found_json_error_to_stdout(self, repo: pathlib.Path) -> None:
        r = _sr(repo, "--json", "--set", "ghost", "HEAD")
        assert r.exit_code != 0
        data = json.loads(r.stdout)
        assert "error" in data

    def test_E4_invalid_branch_name_json_error_to_stdout(
        self, repo: pathlib.Path
    ) -> None:
        r = _sr(repo, "--json", "--set", "bad\x00name", "HEAD")
        assert r.exit_code != 0
        data = json.loads(r.stdout)
        assert "error" in data

    def test_E5_json_error_has_duration_ms(self, repo: pathlib.Path) -> None:
        r = _sr(repo, "--json", "--set", "ghost", "HEAD")
        data = json.loads(r.stdout)
        assert "duration_ms" in data
        assert isinstance(data["duration_ms"], float)

    def test_E5_json_error_has_exit_code(self, repo: pathlib.Path) -> None:
        r = _sr(repo, "--json", "--set", "ghost", "HEAD")
        data = json.loads(r.stdout)
        assert "exit_code" in data
        assert data["exit_code"] != 0
        assert isinstance(data["exit_code"], int)
        # bool is a subclass of int, so rule it out explicitly.
        assert not isinstance(data["exit_code"], bool)

    def test_E5_format_error_has_duration_ms(self, repo: pathlib.Path) -> None:
        # Trigger a user error in JSON mode — unsupported ref is simplest.
        r = _sr(repo, "--json", "MERGE_HEAD")
        data = json.loads(r.stdout)
        assert "duration_ms" in data


# ---------------------------------------------------------------------------
# I1–I5  Integration — all paths include new fields
# ---------------------------------------------------------------------------


class TestIntegration:
    """Every success path returns the full set of JSON keys."""

    # Keys every successful JSON payload must contain (read and write mode).
    _REQUIRED_KEYS = frozenset(
        {
            "ref",
            "symbolic_target",
            "branch",
            "commit_id",
            "detached",
            "duration_ms",
            "exit_code",
        }
    )

    def test_I1_detached_head_json_has_all_fields(
        self, tmp_path: pathlib.Path
    ) -> None:
        fake_cid = _detach_head(tmp_path, "a")
        r = _sr(tmp_path, "HEAD")
        assert r.exit_code == 0
        data = json.loads(r.output)
        assert data["detached"] is True
        assert data["commit_id"] == fake_cid
        assert "duration_ms" in data
        assert "exit_code" in data

    def test_I2_write_set_json_has_all_fields(
        self, two_branch_repo: pathlib.Path
    ) -> None:
        r = _sr(two_branch_repo, "--set", "dev", "HEAD")
        assert r.exit_code == 0
        data = json.loads(r.output)
        assert data["branch"] == "dev"
        assert "duration_ms" in data
        assert "exit_code" in data
        assert data["exit_code"] == 0

    def test_I3_create_branch_json_has_all_fields(
        self, repo: pathlib.Path
    ) -> None:
        r = _sr(repo, "--set", "orphan", "--create-branch", "HEAD")
        assert r.exit_code == 0
        data = json.loads(r.output)
        assert data["branch"] == "orphan"
        assert data["commit_id"] is None
        assert "duration_ms" in data
        assert "exit_code" in data

    def test_I4_all_read_mode_keys_present(self, repo: pathlib.Path) -> None:
        r = _sr(repo, "HEAD")
        data = json.loads(r.output)
        missing = set(self._REQUIRED_KEYS) - set(data)
        assert not missing, f"Missing keys: {missing}"

    def test_I5_all_write_mode_keys_present(
        self, two_branch_repo: pathlib.Path
    ) -> None:
        r = _sr(two_branch_repo, "--set", "dev", "HEAD")
        data = json.loads(r.output)
        missing = set(self._REQUIRED_KEYS) - set(data)
        assert not missing, f"Missing keys: {missing}"


# ---------------------------------------------------------------------------
# Security
# ---------------------------------------------------------------------------


class TestSecurity:
    """Hostile ``--set`` branch names are rejected cleanly."""

    def test_S1_null_byte_in_set_branch_json_error(self, repo: pathlib.Path) -> None:
        r = _sr(repo, "--json", "--set", "bad\x00branch", "HEAD")
        assert r.exit_code != 0
        assert "Traceback" not in r.output
        data = json.loads(r.stdout)
        assert "error" in data

    def test_S2_path_traversal_in_set_branch_rejected(
        self, repo: pathlib.Path
    ) -> None:
        r = _sr(repo, "--json", "--set", "../traversal", "HEAD")
        assert r.exit_code != 0
        # Same no-traceback guarantee that S1 checks for the null-byte case.
        assert "Traceback" not in r.output
        data = json.loads(r.stdout)
        assert "error" in data

    def test_S3_ansi_in_set_branch_rejected(self, repo: pathlib.Path) -> None:
        r = _sr(repo, "--json", "--set", "\x1b[31mbad\x1b[0m", "HEAD")
        assert r.exit_code != 0
        # The escape byte from the input must not be echoed back.
        assert "\x1b" not in r.output

    def test_S4_json_error_values_no_ansi(self, repo: pathlib.Path) -> None:
        r = _sr(repo, "--json", "--set", "ghost", "HEAD")
        assert "\x1b" not in r.output
        assert "\x1b" not in r.stdout


# ---------------------------------------------------------------------------
# Data integrity
# ---------------------------------------------------------------------------


class TestDataIntegrity:
    """JSON value types and HEAD round-trips."""

    def test_D1_duration_ms_is_float(self, repo: pathlib.Path) -> None:
        data = json.loads(_sr(repo, "HEAD").output)
        assert isinstance(data["duration_ms"], float)

    def test_D2_exit_code_is_int_not_bool(self, repo: pathlib.Path) -> None:
        data = json.loads(_sr(repo, "HEAD").output)
        assert isinstance(data["exit_code"], int)
        assert not isinstance(data["exit_code"], bool)

    def test_D3_head_consistent_after_set(
        self, two_branch_repo: pathlib.Path
    ) -> None:
        written = _sr(two_branch_repo, "--set", "dev", "HEAD")
        # Fail on the write itself rather than on a confusing read-back mismatch.
        assert written.exit_code == 0, written.output
        r = _sr(two_branch_repo, "HEAD")
        data = json.loads(r.output)
        assert data["branch"] == "dev"

    def test_D4_detached_commit_id_exact_roundtrip(
        self, tmp_path: pathlib.Path
    ) -> None:
        fake_cid = _detach_head(tmp_path, "1")
        data = json.loads(_sr(tmp_path, "HEAD").output)
        assert data["commit_id"] == fake_cid

    def test_D5_write_then_read_roundtrip(
        self, two_branch_repo: pathlib.Path
    ) -> None:
        written = _sr(two_branch_repo, "--set", "dev", "HEAD")
        # Fail on the write itself rather than on a confusing read-back mismatch.
        assert written.exit_code == 0, written.output
        data = json.loads(_sr(two_branch_repo, "HEAD").output)
        assert data["branch"] == "dev"
        assert data["symbolic_target"] == "refs/heads/dev"
        assert data["detached"] is False


# ---------------------------------------------------------------------------
# Stress / performance
# ---------------------------------------------------------------------------


class TestStress:
    """Repeated invocations keep reporting ``duration_ms``."""

    def test_P1_100_rapid_reads_all_have_duration_ms(
        self, repo: pathlib.Path
    ) -> None:
        for i in range(100):
            r = _sr(repo, "HEAD")
            assert r.exit_code == 0
            data = json.loads(r.output)
            assert "duration_ms" in data, f"Missing duration_ms on call {i}"
            assert isinstance(data["duration_ms"], float)

    def test_P2_duration_ms_always_positive(self, repo: pathlib.Path) -> None:
        # Zero is accepted: the check is "never negative", not strictly positive.
        for _ in range(20):
            data = json.loads(_sr(repo, "HEAD").output)
            assert data["duration_ms"] >= 0.0

    def test_P3_20_branch_writes_all_include_duration_ms(
        self, tmp_path: pathlib.Path
    ) -> None:
        r = _init_repo(tmp_path)
        sid = _snap(r)
        branches = [f"branch-{i:02d}" for i in range(20)]
        for name in branches:
            _commit(r, sid, name)
        for i, name in enumerate(branches):
            result = _sr(r, "--set", name, "HEAD")
            assert result.exit_code == 0
            data = json.loads(result.output)
            assert "duration_ms" in data, f"Missing duration_ms on branch {i}"


# ---------------------------------------------------------------------------
# Concurrent
# ---------------------------------------------------------------------------


class TestConcurrent:
    """Threads operating on separate repositories all succeed."""

    @staticmethod
    def _assert_all_succeeded(outcomes: list[int | Exception | None]) -> None:
        """Fail with the worker's own exception, or its non-zero exit code."""
        for i, outcome in enumerate(outcomes):
            assert not isinstance(outcome, Exception), f"Thread {i}: {outcome!r}"
            assert outcome == 0, f"Thread {i} exit code: {outcome}"

    def test_C1_8_concurrent_reads(self, tmp_path: pathlib.Path) -> None:
        """8 threads reading symbolic-ref in separate repos — all succeed."""
        outcomes: list[int | Exception | None] = [None] * 8

        def _work(idx: int) -> None:
            # An exception raised in a thread never reaches the main thread,
            # so record it in the slot for the assertions below to report.
            try:
                root = tmp_path / f"repo_{idx}"
                root.mkdir()
                r = _init_repo(root)
                sid = _snap(r)
                _commit(r, sid)
                outcomes[idx] = _sr(r, "HEAD").exit_code
            except Exception as exc:
                outcomes[idx] = exc

        threads = [threading.Thread(target=_work, args=(i,)) for i in range(8)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

        self._assert_all_succeeded(outcomes)

    def test_C2_4_concurrent_writes(self, tmp_path: pathlib.Path) -> None:
        """4 threads writing --set in separate repos — all succeed."""
        outcomes: list[int | Exception | None] = [None] * 4

        def _work(idx: int) -> None:
            # An exception raised in a thread never reaches the main thread,
            # so record it in the slot for the assertions below to report.
            try:
                root = tmp_path / f"repo_{idx}"
                root.mkdir()
                r = _init_repo(root)
                sid = _snap(r)
                _commit(r, sid, "main")
                _commit(r, sid, "dev")
                outcomes[idx] = _sr(r, "--set", "dev", "HEAD").exit_code
            except Exception as exc:
                outcomes[idx] = exc

        threads = [threading.Thread(target=_work, args=(i,)) for i in range(4)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

        self._assert_all_succeeded(outcomes)