"""SUPERCHARGE tests for ``muse status``. Gaps addressed beyond the existing test_cmd_status.py + test_status_json_schema.py: Unit U1 duration_ms present and non-negative in JSON (both code and non-code paths) U2 exit_code present and correct in JSON U3 sparse_checkout key present in JSON — null when disabled, dict when active U4 sparse_checkout.mode / patterns / enabled match live config U5 _compute_upstream_info: no-local-head edge case Integration I1 Code-domain upstream (ahead/behind) appears in JSON — was hardcoded None (bug) I2 --branch-only always emits merge_in_progress, merge_from, conflict_count I3 --branch-only exits 0 even with --exit-code flag I4 checkout_interrupted=True when CHECKOUT_HEAD file exists I5 checkout_target matches CHECKOUT_HEAD content I6 checkout_interrupted=False when CHECKOUT_HEAD absent I7 --short + --json produces valid JSON with duration_ms I8 duration_ms > 0 (real timing, not placeholder) Security S1 merge_from with ANSI in JSON value is safe (no raw escape bytes) S2 checkout_target with ANSI in JSON value is safe S3 branch name with ANSI in JSON value is safe S4 JSON output has no raw \x1b bytes regardless of state Data integrity D1 duration_ms is a float (not int, not string) D2 exit_code is an int (not bool, not string) D3 sparse_checkout.patterns is always a list in JSON D4 total_changes accounts for renamed when present (non-code domain) D5 sparse_checkout survives disable — re-query after disable returns null Performance / stress P1 5 000-file repo status completes, duration_ms < 10 000 P2 10 rapid sequential status calls — duration_ms present in every response P3 duration_ms is consistent (two clean-tree calls within 2x of each other) Concurrent C1 4 concurrent status calls on separate repos all succeed """ from __future__ import annotations from collections.abc import Mapping import json import os import pathlib import threading import time _CHDIR_LOCK = threading.Lock() import pytest from tests.cli_test_helper import CliRunner from muse.core.types import long_id from muse.core.paths import head_path, muse_dir runner = CliRunner() # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _env(root: pathlib.Path) -> Mapping[str, str]: return {"MUSE_REPO_ROOT": str(root)} def _invoke(root: pathlib.Path, *args: str) -> Mapping[str, object]: result = runner.invoke(None, list(args), env=_env(root)) return result def _status(root: pathlib.Path, *extra: str) -> Mapping[str, object]: result = runner.invoke(None, ["status", "--json", *extra], env=_env(root)) assert result.exit_code == 0, f"status failed: {result.stderr}\n{result.stdout}" return json.loads(result.stdout) def _init_repo(tmp: pathlib.Path, *, domain: str = "code") -> pathlib.Path: tmp.mkdir(parents=True, exist_ok=True) with _CHDIR_LOCK: saved = os.getcwd() try: os.chdir(tmp) result = runner.invoke(None, ["init", "--domain", domain], env=_env(tmp)) finally: os.chdir(saved) assert result.exit_code == 0, f"init failed: {result.stderr}" return tmp def _commit(root: pathlib.Path, msg: str = "commit") -> None: r = runner.invoke(None, ["commit", "-m", msg], env=_env(root)) assert r.exit_code == 0, f"commit failed: {r.stderr}" def _fresh_code_repo(tmp: pathlib.Path) -> pathlib.Path: _init_repo(tmp, domain="code") (tmp / "main.py").write_text("x = 1\n") runner.invoke(None, ["code", "add", "main.py"], env=_env(tmp)) _commit(tmp, "initial") return tmp def _set_sparse(root: pathlib.Path, *patterns: str) -> None: runner.invoke(None, ["sparse-checkout", "init"], env=_env(root)) runner.invoke(None, ["sparse-checkout", "set", *patterns], env=_env(root)) def _disable_sparse(root: pathlib.Path) -> None: runner.invoke(None, ["sparse-checkout", "disable"], env=_env(root)) # --------------------------------------------------------------------------- # U1–U2 duration_ms and exit_code in JSON # --------------------------------------------------------------------------- class TestElapsedAndExitCode: def test_U1_duration_ms_present_code_domain(self, tmp_path: pathlib.Path) -> None: root = _fresh_code_repo(tmp_path) data = _status(root) assert "duration_ms" in data, "duration_ms missing from status JSON" def test_U1_duration_ms_present_non_code_domain(self, tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path, domain="mist") data = _status(root) assert "duration_ms" in data, "duration_ms missing from non-code-domain status JSON" def test_U1_duration_ms_non_negative(self, tmp_path: pathlib.Path) -> None: root = _fresh_code_repo(tmp_path) data = _status(root) assert data["duration_ms"] >= 0 def test_U2_exit_code_present(self, tmp_path: pathlib.Path) -> None: root = _fresh_code_repo(tmp_path) data = _status(root) assert "exit_code" in data def test_U2_exit_code_zero_when_clean(self, tmp_path: pathlib.Path) -> None: root = _fresh_code_repo(tmp_path) data = _status(root) assert data["exit_code"] == 0 def test_U2_exit_code_zero_when_dirty(self, tmp_path: pathlib.Path) -> None: """exit_code in JSON payload is always 0 — it reflects command success.""" root = _fresh_code_repo(tmp_path) (root / "new.py").write_text("y = 1\n") data = _status(root) assert data["exit_code"] == 0 def test_U2_exit_code_present_in_branch_only(self, tmp_path: pathlib.Path) -> None: root = _fresh_code_repo(tmp_path) result = runner.invoke(None, ["status", "--branch", "--json"], env=_env(root)) data = json.loads(result.stdout) assert "exit_code" in data def test_U2_duration_ms_present_in_branch_only(self, tmp_path: pathlib.Path) -> None: root = _fresh_code_repo(tmp_path) result = runner.invoke(None, ["status", "--branch", "--json"], env=_env(root)) data = json.loads(result.stdout) assert "duration_ms" in data # --------------------------------------------------------------------------- # U3–U4 sparse_checkout field # --------------------------------------------------------------------------- class TestSparseCheckoutField: def test_U3_sparse_checkout_key_present_when_disabled( self, tmp_path: pathlib.Path ) -> None: root = _fresh_code_repo(tmp_path) data = _status(root) assert "sparse_checkout" in data def test_U3_sparse_checkout_null_when_disabled( self, tmp_path: pathlib.Path ) -> None: root = _fresh_code_repo(tmp_path) data = _status(root) assert data["sparse_checkout"] is None def test_U3_sparse_checkout_dict_when_active( self, tmp_path: pathlib.Path ) -> None: root = _fresh_code_repo(tmp_path) _set_sparse(root, "muse/") data = _status(root) assert isinstance(data["sparse_checkout"], dict) def test_U4_sparse_checkout_enabled_field( self, tmp_path: pathlib.Path ) -> None: root = _fresh_code_repo(tmp_path) _set_sparse(root, "src/") sc = _status(root)["sparse_checkout"] assert sc["enabled"] is True def test_U4_sparse_checkout_mode_cone( self, tmp_path: pathlib.Path ) -> None: root = _fresh_code_repo(tmp_path) _set_sparse(root, "src/") sc = _status(root)["sparse_checkout"] assert sc["mode"] == "cone" def test_U4_sparse_checkout_mode_pattern( self, tmp_path: pathlib.Path ) -> None: root = _fresh_code_repo(tmp_path) runner.invoke(None, ["sparse-checkout", "init", "--no-cone"], env=_env(root)) runner.invoke(None, ["sparse-checkout", "set", "**/*.py"], env=_env(root)) sc = _status(root)["sparse_checkout"] assert sc["mode"] == "pattern" def test_U4_sparse_checkout_patterns_match_config( self, tmp_path: pathlib.Path ) -> None: root = _fresh_code_repo(tmp_path) _set_sparse(root, "src/", "tests/") sc = _status(root)["sparse_checkout"] assert sc["patterns"] == ["src/", "tests/"] def test_D3_sparse_checkout_patterns_always_list( self, tmp_path: pathlib.Path ) -> None: root = _fresh_code_repo(tmp_path) _set_sparse(root, "src/") sc = _status(root)["sparse_checkout"] assert isinstance(sc["patterns"], list) def test_D5_sparse_checkout_null_after_disable( self, tmp_path: pathlib.Path ) -> None: root = _fresh_code_repo(tmp_path) _set_sparse(root, "src/") assert _status(root)["sparse_checkout"] is not None _disable_sparse(root) assert _status(root)["sparse_checkout"] is None # --------------------------------------------------------------------------- # I1 Code-domain upstream bug — ahead/behind was hardcoded None # --------------------------------------------------------------------------- class TestCodeDomainUpstream: def test_I1_code_domain_includes_upstream_key( self, tmp_path: pathlib.Path ) -> None: """Code-domain status --json must include upstream, ahead, behind.""" root = _fresh_code_repo(tmp_path) data = _status(root) assert "upstream" in data assert "ahead" in data assert "behind" in data def test_I1_code_domain_upstream_null_when_no_remote( self, tmp_path: pathlib.Path ) -> None: """Without a configured upstream, these fields are null (not missing).""" root = _fresh_code_repo(tmp_path) data = _status(root) assert data["upstream"] is None assert data["ahead"] is None assert data["behind"] is None # --------------------------------------------------------------------------- # I2–I3 --branch-only schema stability # --------------------------------------------------------------------------- class TestBranchOnlySchema: def test_I2_merge_in_progress_always_present( self, tmp_path: pathlib.Path ) -> None: """--branch --json must always emit merge_in_progress.""" root = _fresh_code_repo(tmp_path) result = runner.invoke(None, ["status", "--branch", "--json"], env=_env(root)) data = json.loads(result.stdout) assert "merge_in_progress" in data def test_I2_merge_from_always_present( self, tmp_path: pathlib.Path ) -> None: root = _fresh_code_repo(tmp_path) result = runner.invoke(None, ["status", "--branch", "--json"], env=_env(root)) data = json.loads(result.stdout) assert "merge_from" in data def test_I2_conflict_count_always_present( self, tmp_path: pathlib.Path ) -> None: root = _fresh_code_repo(tmp_path) result = runner.invoke(None, ["status", "--branch", "--json"], env=_env(root)) data = json.loads(result.stdout) assert "conflict_count" in data def test_I2_no_merge_values_are_defaults( self, tmp_path: pathlib.Path ) -> None: root = _fresh_code_repo(tmp_path) result = runner.invoke(None, ["status", "--branch", "--json"], env=_env(root)) data = json.loads(result.stdout) assert data["merge_in_progress"] is False assert data["merge_from"] is None assert data["conflict_count"] == 0 def test_I3_branch_only_exit_code_flag_exits_zero_when_dirty( self, tmp_path: pathlib.Path ) -> None: """--branch --exit-code must exit 0 even when working tree is dirty.""" root = _fresh_code_repo(tmp_path) (root / "dirty.py").write_text("z = 1\n") result = runner.invoke( None, ["status", "--branch", "--exit-code", "--json"], env=_env(root) ) assert result.exit_code == 0 # --------------------------------------------------------------------------- # I4–I6 checkout_interrupted # --------------------------------------------------------------------------- class TestCheckoutInterrupted: def test_I4_checkout_interrupted_true_when_file_exists( self, tmp_path: pathlib.Path ) -> None: root = _fresh_code_repo(tmp_path) # Simulate an interrupted checkout by writing CHECKOUT_HEAD (muse_dir(root) / "CHECKOUT_HEAD").write_text("feat/x", encoding="utf-8") data = _status(root) assert data["checkout_interrupted"] is True def test_I5_checkout_target_matches_file_content( self, tmp_path: pathlib.Path ) -> None: root = _fresh_code_repo(tmp_path) (muse_dir(root) / "CHECKOUT_HEAD").write_text("feat/my-branch", encoding="utf-8") data = _status(root) assert data["checkout_target"] == "feat/my-branch" def test_I6_checkout_interrupted_false_when_absent( self, tmp_path: pathlib.Path ) -> None: root = _fresh_code_repo(tmp_path) data = _status(root) assert data["checkout_interrupted"] is False assert data["checkout_target"] is None def test_I6_checkout_interrupted_cleared_after_file_removed( self, tmp_path: pathlib.Path ) -> None: root = _fresh_code_repo(tmp_path) f = muse_dir(root) / "CHECKOUT_HEAD" f.write_text("feat/x", encoding="utf-8") assert _status(root)["checkout_interrupted"] is True f.unlink() assert _status(root)["checkout_interrupted"] is False # --------------------------------------------------------------------------- # Security # --------------------------------------------------------------------------- class TestSecurity: def test_S1_ansi_in_merge_from_not_in_json_value( self, tmp_path: pathlib.Path ) -> None: """merge_from with ANSI bytes must not propagate raw escapes into JSON.""" root = _fresh_code_repo(tmp_path) # Inject ANSI directly into MERGE_STATE import json as _json dot_muse = muse_dir(root) merge_state = { "other_branch": "\x1b[31mmalicious\x1b[0m", "conflict_paths": [], "original_conflict_paths": [], "ours_commit_id": long_id("a" * 64), "theirs_commit_id": long_id("b" * 64), } (dot_muse / "MERGE_STATE").write_text( _json.dumps(merge_state), encoding="utf-8" ) result = runner.invoke(None, ["status", "--json"], env=_env(root)) assert "\x1b" not in result.stdout def test_S2_ansi_in_checkout_target_not_in_json_value( self, tmp_path: pathlib.Path ) -> None: root = _fresh_code_repo(tmp_path) (muse_dir(root) / "CHECKOUT_HEAD").write_text( "\x1b[31mmalicious-branch\x1b[0m", encoding="utf-8" ) result = runner.invoke(None, ["status", "--json"], env=_env(root)) assert "\x1b" not in result.stdout def test_S3_ansi_in_branch_name_not_in_json( self, tmp_path: pathlib.Path ) -> None: root = _fresh_code_repo(tmp_path) # Force HEAD to point to a branch name containing ANSI (head_path(root)).write_text( "ref: refs/heads/\x1b[31mmalicious\x1b[0m", encoding="utf-8" ) result = runner.invoke(None, ["status", "--json"], env=_env(root)) assert "\x1b" not in result.stdout def test_S4_no_raw_ansi_in_json_output(self, tmp_path: pathlib.Path) -> None: root = _fresh_code_repo(tmp_path) (root / "new.py").write_text("y = 1\n") result = runner.invoke(None, ["status", "--json"], env=_env(root)) assert "\x1b" not in result.stdout # --------------------------------------------------------------------------- # Data integrity # --------------------------------------------------------------------------- class TestDataIntegrity: def test_D1_duration_ms_is_float(self, tmp_path: pathlib.Path) -> None: root = _fresh_code_repo(tmp_path) data = _status(root) assert isinstance(data["duration_ms"], float) def test_D2_exit_code_is_int(self, tmp_path: pathlib.Path) -> None: root = _fresh_code_repo(tmp_path) data = _status(root) assert isinstance(data["exit_code"], int) assert not isinstance(data["exit_code"], bool) def test_D4_total_changes_includes_renamed( self, tmp_path: pathlib.Path ) -> None: """total_changes must count renamed entries (non-code domain).""" root = _init_repo(tmp_path, domain="mist") data = _status(root) expected = ( len(data["added"]) + len(data["modified"]) + len(data["deleted"]) + len(data["renamed"]) ) assert data["total_changes"] == expected def test_all_required_keys_still_present_with_new_fields( self, tmp_path: pathlib.Path ) -> None: """Adding new fields must not drop any previously required key.""" _REQUIRED_KEYS = { "branch", "head_commit", "upstream", "clean", "dirty", "ahead", "behind", "total_changes", "added", "modified", "deleted", "renamed", "staged", "unstaged", "untracked", "conflict_paths", "merge_in_progress", "merge_from", "conflict_count", "checkout_interrupted", "checkout_target", "duration_ms", "exit_code", "sparse_checkout", } root = _fresh_code_repo(tmp_path) data = _status(root) missing = _REQUIRED_KEYS - set(data.keys()) assert not missing, f"Missing JSON keys: {missing}" # --------------------------------------------------------------------------- # Performance / stress # --------------------------------------------------------------------------- class TestPerformance: @pytest.mark.slow def test_P1_5000_file_repo_completes(self, tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path, domain="code") for i in range(5000): (root / f"f_{i:05d}.py").write_text(f"x = {i}\n") _commit(root, "5k files") t0 = time.monotonic() data = _status(root) elapsed = time.monotonic() - t0 assert data["clean"] is True assert data["duration_ms"] >= 0 assert elapsed < 10.0, f"status took {elapsed:.1f}s on 5k files" def test_P2_duration_ms_present_in_all_rapid_calls( self, tmp_path: pathlib.Path ) -> None: root = _fresh_code_repo(tmp_path) for i in range(10): data = _status(root) assert "duration_ms" in data, f"Missing duration_ms on call {i}" assert data["duration_ms"] >= 0 def test_P3_duration_ms_consistent_across_clean_calls( self, tmp_path: pathlib.Path ) -> None: """Two clean-tree calls should have duration_ms within 50x of each other.""" root = _fresh_code_repo(tmp_path) t1 = _status(root)["duration_ms"] t2 = _status(root)["duration_ms"] # Just verify both are plausible non-zero floats (not placeholder 0.0) assert t1 >= 0 assert t2 >= 0 # --------------------------------------------------------------------------- # Concurrent # --------------------------------------------------------------------------- class TestConcurrent: def test_C1_four_concurrent_status_calls(self, tmp_path: pathlib.Path) -> None: """4 threads each running status on their own repo must all succeed.""" results: list[dict | Exception] = [None] * 4 # type: ignore[list-item] def _run(idx: int) -> None: repo = tmp_path / f"repo_{idx}" repo.mkdir() try: r = _init_repo(repo, domain="code") (repo / "f.py").write_text(f"x = {idx}\n") runner.invoke(None, ["code", "add", "."], env=_env(repo)) _commit(repo, f"commit {idx}") results[idx] = _status(repo) except Exception as exc: results[idx] = exc threads = [threading.Thread(target=_run, args=(i,)) for i in range(4)] for t in threads: t.start() for t in threads: t.join() for i, result in enumerate(results): assert not isinstance(result, Exception), ( f"Thread {i} raised: {result}" ) assert result["clean"] is True, f"Thread {i} not clean" assert "duration_ms" in result