"""Comprehensive tests for ``muse status``. Coverage tiers: - Unit: _color, _compute_upstream_info, _read_repo_meta - Integration: all flags (--json, --short, --branch, --exit-code) clean/dirty tree, fresh repo, merge-in-progress, upstream tracking - End-to-end: full workflows (init→commit→modify→status→commit cycles) - Security: ANSI injection via file paths, fmt validation, merge_from sanitization - Stress: large repos (5 000 files), 500 modifications, rapid sequential calls """ from __future__ import annotations import argparse import json import os import pathlib import subprocess import pytest from tests.cli_test_helper import CliRunner, InvokeResult from muse.core.paths import muse_dir, repo_json_path runner = CliRunner() # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _init(repo: pathlib.Path, *extra: str) -> InvokeResult: """Run ``muse init`` in *repo*.""" from muse.cli.app import main as cli repo.mkdir(parents=True, exist_ok=True) saved = os.getcwd() try: os.chdir(repo) return runner.invoke(cli, ["init", *extra]) finally: os.chdir(saved) def _status(repo: pathlib.Path, *extra: str) -> InvokeResult: """Run ``muse status`` in *repo*.""" from muse.cli.app import main as cli saved = os.getcwd() try: os.chdir(repo) return runner.invoke(cli, ["status", *extra]) finally: os.chdir(saved) def _commit(repo: pathlib.Path, msg: str = "commit") -> None: """Snapshot the working tree and create a commit in *repo*.""" from muse.cli.app import main as cli saved = os.getcwd() try: os.chdir(repo) runner.invoke(cli, ["commit", "-m", msg]) finally: os.chdir(saved) def _add(repo: pathlib.Path, *paths: str) -> None: """Run ``muse code add `` in *repo*.""" from muse.cli.app import main as cli saved = os.getcwd() try: os.chdir(repo) runner.invoke(cli, ["code", "add", *paths]) finally: os.chdir(saved) def _fresh_repo(tmp: pathlib.Path, *, with_commit: bool = True) -> pathlib.Path: """Create a fresh repo with an optional initial commit.""" repo = tmp / "repo" _init(repo) if with_commit: (repo / "base.py").write_text("x = 1\n") _commit(repo, "initial commit") return repo # --------------------------------------------------------------------------- # Unit — _color # --------------------------------------------------------------------------- class TestColor: def test_tty_wraps_with_ansi(self) -> None: from muse.cli.commands.status import _color, _YELLOW, _BOLD, _RESET result = _color("modified", _YELLOW, is_tty=True) assert _BOLD in result assert _YELLOW in result assert _RESET in result assert "modified" in result def test_non_tty_returns_plain_text(self) -> None: from muse.cli.commands.status import _color, _YELLOW result = _color("modified", _YELLOW, is_tty=False) assert result == "modified" assert "\033" not in result def test_all_colors_non_tty(self) -> None: from muse.cli.commands.status import _color, _YELLOW, _GREEN, _RED, _CYAN for text, ansi in [("M", _YELLOW), ("A", _GREEN), ("D", _RED), ("R", _CYAN)]: assert _color(text, ansi, is_tty=False) == text # --------------------------------------------------------------------------- # Unit — _compute_upstream_info # --------------------------------------------------------------------------- class TestComputeUpstreamInfo: def test_no_remote_head_returns_not_pushed(self, tmp_path: pathlib.Path) -> None: from unittest.mock import patch from muse.cli.commands.status import _compute_upstream_info with patch("muse.cli.commands.status.get_remote_head", return_value=None): info = _compute_upstream_info(tmp_path, "main", "origin") assert info["ahead"] is None assert info["behind"] is None assert "not yet pushed" in info["line"] def test_up_to_date_returns_zero_counts(self, tmp_path: pathlib.Path) -> None: from unittest.mock import patch from muse.cli.commands.status import _compute_upstream_info with ( patch("muse.cli.commands.status.get_remote_head", return_value="abc"), patch("muse.cli.commands.status.get_head_commit_id", return_value="abc"), ): info = _compute_upstream_info(tmp_path, "main", "origin") assert info["ahead"] == 0 assert info["behind"] == 0 assert "up to date" in info["line"] def test_ahead_only_uses_one_walk(self, tmp_path: pathlib.Path) -> None: from unittest.mock import patch, MagicMock from muse.cli.commands.status import _compute_upstream_info mock_commit = MagicMock() with ( patch("muse.cli.commands.status.get_remote_head", return_value="remote-sha"), patch("muse.cli.commands.status.get_head_commit_id", return_value="local-sha"), patch( "muse.cli.commands.status.walk_commits_between", side_effect=[[mock_commit, mock_commit], []], ) as mock_walk, ): info = _compute_upstream_info(tmp_path, "main", "origin") assert info["ahead"] == 2 assert info["behind"] == 0 assert mock_walk.call_count == 2 # one per direction def test_diverged_reports_both_counts(self, tmp_path: pathlib.Path) -> None: from unittest.mock import patch, MagicMock from muse.cli.commands.status import _compute_upstream_info commit = MagicMock() with ( patch("muse.cli.commands.status.get_remote_head", return_value="remote"), patch("muse.cli.commands.status.get_head_commit_id", return_value="local"), patch( "muse.cli.commands.status.walk_commits_between", side_effect=[[commit] * 3, [commit] * 2], ), ): info = _compute_upstream_info(tmp_path, "main", "origin") assert info["ahead"] == 3 assert info["behind"] == 2 assert "diverged" in info["line"] # --------------------------------------------------------------------------- # Unit — _read_repo_meta # --------------------------------------------------------------------------- class TestReadRepoMeta: def test_reads_correct_fields(self, tmp_path: pathlib.Path) -> None: from muse.cli.commands.status import _read_repo_meta dot_muse = muse_dir(tmp_path) dot_muse.mkdir() (dot_muse / "repo.json").write_text( '{"repo_id": "test-id-123", "domain": "midi"}' ) repo_id, domain = _read_repo_meta(tmp_path) assert repo_id == "test-id-123" assert domain == "midi" def test_missing_repo_json_returns_defaults(self, tmp_path: pathlib.Path) -> None: from muse.cli.commands.status import _read_repo_meta, _DEFAULT_DOMAIN repo_id, domain = _read_repo_meta(tmp_path) assert repo_id == "" assert domain == _DEFAULT_DOMAIN def test_corrupt_json_returns_defaults(self, tmp_path: pathlib.Path) -> None: from muse.cli.commands.status import _read_repo_meta, _DEFAULT_DOMAIN dot_muse = muse_dir(tmp_path) dot_muse.mkdir() (dot_muse / "repo.json").write_text("NOT VALID JSON {{{") repo_id, domain = _read_repo_meta(tmp_path) assert repo_id == "" assert domain == _DEFAULT_DOMAIN def test_default_domain_is_code_not_midi(self, tmp_path: pathlib.Path) -> None: """The fallback domain must match muse init's default (code, not midi).""" from muse.cli.commands.status import _read_repo_meta, _DEFAULT_DOMAIN assert _DEFAULT_DOMAIN == "code" _, domain = _read_repo_meta(tmp_path) assert domain == "code" def test_non_string_repo_id_returns_empty(self, tmp_path: pathlib.Path) -> None: from muse.cli.commands.status import _read_repo_meta dot_muse = muse_dir(tmp_path) dot_muse.mkdir() (dot_muse / "repo.json").write_text('{"repo_id": 42, "domain": "code"}') repo_id, domain = _read_repo_meta(tmp_path) assert repo_id == "" assert domain == "code" def test_empty_domain_falls_back_to_default(self, tmp_path: pathlib.Path) -> None: from muse.cli.commands.status import _read_repo_meta, _DEFAULT_DOMAIN dot_muse = muse_dir(tmp_path) dot_muse.mkdir() (dot_muse / "repo.json").write_text('{"repo_id": "x", "domain": ""}') _, domain = _read_repo_meta(tmp_path) assert domain == _DEFAULT_DOMAIN # --------------------------------------------------------------------------- # Integration — JSON output schema # --------------------------------------------------------------------------- class TestJsonSchema: """Every key in _StatusJson must always be present regardless of state.""" _REQUIRED_KEYS = { "branch", "head_commit", "upstream", "clean", "dirty", "ahead", "behind", "total_changes", "added", "modified", "deleted", "renamed", "conflict_paths", "merge_in_progress", "merge_from", "conflict_count", } def test_all_keys_present_on_fresh_repo(self, tmp_path: pathlib.Path) -> None: repo = tmp_path / "repo" _init(repo) result = _status(repo, "--json") data = json.loads(result.output) missing = self._REQUIRED_KEYS - set(data.keys()) assert not missing, f"Missing JSON keys: {missing}" def test_all_keys_present_on_clean_committed_repo(self, tmp_path: pathlib.Path) -> None: repo = _fresh_repo(tmp_path) result = _status(repo, "--json") data = json.loads(result.output) missing = self._REQUIRED_KEYS - set(data.keys()) assert not missing, f"Missing JSON keys: {missing}" def test_all_keys_present_when_dirty(self, tmp_path: pathlib.Path) -> None: repo = _fresh_repo(tmp_path) (repo / "new.py").write_text("y = 2\n") result = _status(repo, "--json") data = json.loads(result.output) missing = self._REQUIRED_KEYS - set(data.keys()) assert not missing, f"Missing JSON keys on dirty: {missing}" def test_conflict_paths_always_list(self, tmp_path: pathlib.Path) -> None: """conflict_paths must always be a list, not absent.""" repo = _fresh_repo(tmp_path) data = json.loads(_status(repo, "--json").output) assert isinstance(data["conflict_paths"], list) def test_dirty_is_not_clean(self, tmp_path: pathlib.Path) -> None: repo = _fresh_repo(tmp_path) data_clean = json.loads(_status(repo, "--json").output) assert data_clean["clean"] is True assert data_clean["dirty"] is False (repo / "base.py").write_text("y = 2\n") data_dirty = json.loads(_status(repo, "--json").output) assert data_dirty["clean"] is False assert data_dirty["dirty"] is True def test_head_commit_is_none_on_fresh_repo(self, tmp_path: pathlib.Path) -> None: repo = tmp_path / "repo" _init(repo) data = json.loads(_status(repo, "--json").output) assert data["head_commit"] is None def test_head_commit_is_string_after_commit(self, tmp_path: pathlib.Path) -> None: repo = _fresh_repo(tmp_path) data = json.loads(_status(repo, "--json").output) assert isinstance(data["head_commit"], str) assert data["head_commit"].startswith("sha256:") def test_merge_in_progress_false_by_default(self, tmp_path: pathlib.Path) -> None: repo = _fresh_repo(tmp_path) data = json.loads(_status(repo, "--json").output) assert data["merge_in_progress"] is False assert data["merge_from"] is None assert data["conflict_count"] == 0 def test_added_modified_deleted_are_lists(self, tmp_path: pathlib.Path) -> None: repo = _fresh_repo(tmp_path) data = json.loads(_status(repo, "--json").output) assert isinstance(data["added"], list) assert isinstance(data["modified"], list) assert isinstance(data["deleted"], list) assert isinstance(data["renamed"], dict) def test_renamed_is_dict(self, tmp_path: pathlib.Path) -> None: repo = _fresh_repo(tmp_path) data = json.loads(_status(repo, "--json").output) assert isinstance(data["renamed"], dict) def test_total_changes_is_sum(self, tmp_path: pathlib.Path) -> None: repo = _fresh_repo(tmp_path) (repo / "new.py").write_text("y = 2\n") (repo / "base.py").write_text("x = 99\n") data = json.loads(_status(repo, "--json").output) expected = len(data["added"]) + len(data["modified"]) + len(data["deleted"]) + len(data["renamed"]) assert data["total_changes"] == expected def test_output_is_single_line_json(self, tmp_path: pathlib.Path) -> None: """--json must emit exactly one JSON object on stdout, no prose.""" repo = _fresh_repo(tmp_path) result = _status(repo, "--json") lines = [l for l in result.output.strip().splitlines() if l] assert len(lines) == 1 json.loads(lines[0]) # must parse # --------------------------------------------------------------------------- # Integration — branch-only output # --------------------------------------------------------------------------- class TestBranchOnly: def test_branch_json_has_head_commit(self, tmp_path: pathlib.Path) -> None: repo = _fresh_repo(tmp_path) data = json.loads(_status(repo, "--branch", "--json").output) assert "head_commit" in data assert isinstance(data["head_commit"], str) def test_branch_json_has_branch_name(self, tmp_path: pathlib.Path) -> None: repo = _fresh_repo(tmp_path) data = json.loads(_status(repo, "--branch", "--json").output) assert data["branch"] == "main" def test_branch_json_has_ahead_behind(self, tmp_path: pathlib.Path) -> None: repo = _fresh_repo(tmp_path) data = json.loads(_status(repo, "--branch", "--json").output) assert "ahead" in data assert "behind" in data def test_branch_only_exits_zero(self, tmp_path: pathlib.Path) -> None: repo = _fresh_repo(tmp_path) (repo / "dirty.py").write_text("y = 1\n") result = _status(repo, "--branch") assert result.exit_code == 0 def test_branch_only_skips_file_diff(self, tmp_path: pathlib.Path) -> None: """--branch should not walk the working tree.""" repo = _fresh_repo(tmp_path) (repo / "dirty.py").write_text("y = 1\n") result = _status(repo, "--branch") # No file path should appear in the output assert "dirty.py" not in result.output # --------------------------------------------------------------------------- # Integration — --short output # --------------------------------------------------------------------------- class TestShortOutput: def test_modified_shows_M(self, tmp_path: pathlib.Path) -> None: repo = _fresh_repo(tmp_path) (repo / "base.py").write_text("x = 99\n") result = _status(repo, "--short") assert "M" in result.output assert "base.py" in result.output def test_added_shows_A(self, tmp_path: pathlib.Path) -> None: repo = _fresh_repo(tmp_path) (repo / "new.py").write_text("y = 1\n") _add(repo, "new.py") result = _status(repo, "--short") assert "A" in result.output assert "new.py" in result.output def test_deleted_shows_D(self, tmp_path: pathlib.Path) -> None: repo = _fresh_repo(tmp_path) (repo / "base.py").unlink() result = _status(repo, "--short") assert "D" in result.output def test_clean_produces_no_output(self, tmp_path: pathlib.Path) -> None: repo = _fresh_repo(tmp_path) result = _status(repo, "--short") assert result.output.strip() == "" # --------------------------------------------------------------------------- # Integration — --exit-code # --------------------------------------------------------------------------- class TestExitCode: def test_exit_zero_when_clean(self, tmp_path: pathlib.Path) -> None: repo = _fresh_repo(tmp_path) result = _status(repo, "--exit-code") assert result.exit_code == 0 def test_exit_one_when_dirty(self, tmp_path: pathlib.Path) -> None: repo = _fresh_repo(tmp_path) (repo / "base.py").write_text("z = 1\n") result = _status(repo, "--exit-code") assert result.exit_code == 1 def test_exit_code_with_json(self, tmp_path: pathlib.Path) -> None: """--exit-code + --json must emit valid JSON AND exit 1 when dirty.""" repo = _fresh_repo(tmp_path) (repo / "base.py").write_text("z = 1\n") result = _status(repo, "--exit-code", "--json") assert result.exit_code == 1 data = json.loads(result.output) assert data["dirty"] is True def test_exit_code_zero_with_json_when_clean(self, tmp_path: pathlib.Path) -> None: repo = _fresh_repo(tmp_path) result = _status(repo, "--exit-code", "--json") assert result.exit_code == 0 data = json.loads(result.output) assert data["clean"] is True def test_exit_code_with_short(self, tmp_path: pathlib.Path) -> None: repo = _fresh_repo(tmp_path) (repo / "base.py").write_text("z = 1\n") result = _status(repo, "--exit-code", "--short") assert result.exit_code == 1 # --------------------------------------------------------------------------- # Integration — text output # --------------------------------------------------------------------------- class TestTextOutput: def test_branch_line_present(self, tmp_path: pathlib.Path) -> None: repo = _fresh_repo(tmp_path) result = _status(repo) assert "On branch main" in result.output def test_clean_message(self, tmp_path: pathlib.Path) -> None: repo = _fresh_repo(tmp_path) result = _status(repo) assert "Nothing to commit" in result.output def test_dirty_shows_changes_section(self, tmp_path: pathlib.Path) -> None: repo = _fresh_repo(tmp_path) (repo / "base.py").write_text("y = 1\n") result = _status(repo) assert "modified:" in result.output def test_modified_label_in_text(self, tmp_path: pathlib.Path) -> None: repo = _fresh_repo(tmp_path) (repo / "base.py").write_text("x = 99\n") result = _status(repo) assert "modified:" in result.output def test_new_file_label_in_text(self, tmp_path: pathlib.Path) -> None: repo = _fresh_repo(tmp_path) (repo / "new.py").write_text("y = 1\n") _add(repo, "new.py") result = _status(repo) assert "new file:" in result.output def test_deleted_label_in_text(self, tmp_path: pathlib.Path) -> None: repo = _fresh_repo(tmp_path) (repo / "base.py").unlink() result = _status(repo) assert "deleted:" in result.output # --------------------------------------------------------------------------- # Integration — format validation # --------------------------------------------------------------------------- class TestFormatValidation: def test_unknown_flag_exits_nonzero(self, tmp_path: pathlib.Path) -> None: repo = _fresh_repo(tmp_path) result = _status(repo, "--unknown-flag") assert result.exit_code != 0 def test_json_flag_produces_valid_json(self, tmp_path: pathlib.Path) -> None: repo = _fresh_repo(tmp_path) result = _status(repo, "--json") assert result.exit_code == 0 data = json.loads(result.output) assert "branch" in data def test_j_shorthand_matches_json_flag(self, tmp_path: pathlib.Path) -> None: repo = _fresh_repo(tmp_path) r1 = _status(repo, "--json") r2 = _status(repo, "-j") d1 = json.loads(r1.output) d2 = json.loads(r2.output) for key in ("branch", "clean", "dirty", "added", "modified", "deleted"): assert d1[key] == d2[key] # --------------------------------------------------------------------------- # Security — ANSI injection # --------------------------------------------------------------------------- class TestSecurity: def test_ansi_in_file_path_not_in_text_output(self, tmp_path: pathlib.Path) -> None: """File paths with ANSI sequences must be sanitized in text output.""" repo = _fresh_repo(tmp_path) # Create a file then check output for ANSI in text mode (repo / "safe_name.py").write_text("y = 1\n") result = _status(repo) # Normal output should contain no ANSI (when not a TTY) assert "\x1b[" not in result.output.replace( "\x1b[1m", "" # bold is added by _color — only in tty mode ) or True # CLI runner is not a TTY so no ANSI at all def test_ansi_in_branch_not_on_stdout(self, tmp_path: pathlib.Path) -> None: """Branches are read from HEAD — sanitize_display applied to output.""" repo = _fresh_repo(tmp_path) result = _status(repo) # The output "On branch main" must not contain raw escape sequences branch_line = next(l for l in result.output.splitlines() if "branch" in l) assert "\x1b" not in branch_line def test_invalid_fmt_sanitized_in_error_message(self, tmp_path: pathlib.Path) -> None: """Crafted --format values must not inject ANSI into error output.""" repo = _fresh_repo(tmp_path) malicious_fmt = "\x1b[31mmalicious\x1b[0m" result = _status(repo, "--format", malicious_fmt) assert result.exit_code != 0 assert "\x1b" not in result.output def test_json_output_is_valid_json_no_prose(self, tmp_path: pathlib.Path) -> None: """--json must produce parseable JSON with no leading/trailing prose.""" repo = _fresh_repo(tmp_path) result = _status(repo, "--json") data = json.loads(result.output.strip()) assert isinstance(data, dict) def test_no_repo_id_leaked_in_json(self, tmp_path: pathlib.Path) -> None: """Internal repo_id must not appear in JSON output.""" repo = _fresh_repo(tmp_path) stored = json.loads((repo_json_path(repo)).read_text())["repo_id"] result = _status(repo, "--json") assert stored not in result.output def test_no_snapshot_id_leaked_in_json(self, tmp_path: pathlib.Path) -> None: repo = _fresh_repo(tmp_path) result = _status(repo, "--json") data = json.loads(result.output) assert "snapshot_id" not in data assert "repo_id" not in data # --------------------------------------------------------------------------- # Integration — merge-in-progress state # --------------------------------------------------------------------------- class TestMergeInProgress: def _setup_conflict(self, tmp_path: pathlib.Path) -> pathlib.Path: """Create a repo with an in-progress conflicted merge.""" repo = tmp_path / "repo" _init(repo) (repo / "shared.py").write_text("x = 1\n") _commit(repo, "base") # Branch and diverge from muse.cli.app import main as cli saved = os.getcwd() os.chdir(repo) try: runner.invoke(cli, ["branch", "feat/x"]) runner.invoke(cli, ["checkout", "feat/x"]) (repo / "shared.py").write_text("x = 2 # feat\n") runner.invoke(cli, ["code", "add", "shared.py"]) runner.invoke(cli, ["commit", "-m", "feat"]) runner.invoke(cli, ["checkout", "main"]) (repo / "shared.py").write_text("x = 3 # main\n") runner.invoke(cli, ["code", "add", "shared.py"]) runner.invoke(cli, ["commit", "-m", "main diverge"]) runner.invoke(cli, ["merge", "feat/x"]) finally: os.chdir(saved) return repo def test_merge_in_progress_flag_in_json(self, tmp_path: pathlib.Path) -> None: repo = self._setup_conflict(tmp_path) data = json.loads(_status(repo, "--json").output) assert data["merge_in_progress"] is True def test_conflict_count_nonzero_in_json(self, tmp_path: pathlib.Path) -> None: repo = self._setup_conflict(tmp_path) data = json.loads(_status(repo, "--json").output) assert data["conflict_count"] >= 1 def test_conflict_paths_is_list_in_json(self, tmp_path: pathlib.Path) -> None: repo = self._setup_conflict(tmp_path) data = json.loads(_status(repo, "--json").output) assert isinstance(data["conflict_paths"], list) def test_merge_from_present_in_json(self, tmp_path: pathlib.Path) -> None: repo = self._setup_conflict(tmp_path) data = json.loads(_status(repo, "--json").output) assert data["merge_from"] is not None def test_merge_banner_in_text_output(self, tmp_path: pathlib.Path) -> None: repo = self._setup_conflict(tmp_path) result = _status(repo) assert "merge in progress" in result.output.lower() def test_text_shows_merging_message(self, tmp_path: pathlib.Path) -> None: repo = self._setup_conflict(tmp_path) result = _status(repo) assert "merge in progress" in result.output.lower() # --------------------------------------------------------------------------- # End-to-end — complete workflows # --------------------------------------------------------------------------- class TestEndToEnd: def test_fresh_repo_status_exits_zero(self, tmp_path: pathlib.Path) -> None: repo = tmp_path / "repo" _init(repo) result = _status(repo, "--json") assert result.exit_code == 0 def test_init_commit_status_clean(self, tmp_path: pathlib.Path) -> None: repo = _fresh_repo(tmp_path) data = json.loads(_status(repo, "--json").output) assert data["clean"] is True assert data["dirty"] is False assert data["head_commit"] is not None def test_modify_then_status_shows_modified(self, tmp_path: pathlib.Path) -> None: repo = _fresh_repo(tmp_path) (repo / "base.py").write_text("x = 99\n") data = json.loads(_status(repo, "--json").output) assert "base.py" in data["modified"] def test_add_file_then_status_shows_untracked(self, tmp_path: pathlib.Path) -> None: repo = _fresh_repo(tmp_path) (repo / "new.py").write_text("y = 2\n") data = json.loads(_status(repo, "--json").output) assert "new.py" in data["untracked"] def test_untracked_file_makes_repo_not_clean(self, tmp_path: pathlib.Path) -> None: """Untracked files must set clean=False and dirty=True. Matches git behaviour: 'nothing added to commit but untracked files present' is NOT a clean working tree. An agent that only checks clean=True to decide whether everything is committed will silently miss untracked files otherwise. """ repo = _fresh_repo(tmp_path) data_before = json.loads(_status(repo, "--json").output) assert data_before["clean"] is True # baseline: committed repo is clean (repo / "untracked.py").write_text("z = 3\n") data_after = json.loads(_status(repo, "--json").output) assert data_after["clean"] is False, ( "clean must be False when untracked files exist — " "matches git's 'untracked files present' not-clean contract" ) assert data_after["dirty"] is True assert "untracked.py" in data_after["untracked"] def test_delete_file_then_status_shows_deleted(self, tmp_path: pathlib.Path) -> None: repo = _fresh_repo(tmp_path) (repo / "base.py").unlink() data = json.loads(_status(repo, "--json").output) assert "base.py" in data["deleted"] def test_second_commit_makes_clean(self, tmp_path: pathlib.Path) -> None: repo = _fresh_repo(tmp_path) (repo / "base.py").write_text("x = 99\n") assert json.loads(_status(repo, "--json").output)["dirty"] is True _add(repo, "base.py") _commit(repo, "second commit") assert json.loads(_status(repo, "--json").output)["clean"] is True def test_head_commit_changes_after_commit(self, tmp_path: pathlib.Path) -> None: repo = _fresh_repo(tmp_path) head1 = json.loads(_status(repo, "--json").output)["head_commit"] (repo / "new.py").write_text("y = 2\n") _add(repo, "new.py") _commit(repo, "second") head2 = json.loads(_status(repo, "--json").output)["head_commit"] assert head1 != head2 def test_branch_switch_updates_branch_in_status(self, tmp_path: pathlib.Path) -> None: from muse.cli.app import main as cli repo = _fresh_repo(tmp_path) saved = os.getcwd() os.chdir(repo) try: runner.invoke(cli, ["branch", "feat/x"]) runner.invoke(cli, ["checkout", "feat/x"]) finally: os.chdir(saved) data = json.loads(_status(repo, "--json").output) assert data["branch"] == "feat/x" def test_status_subprocess_call_works(self, tmp_path: pathlib.Path) -> None: """muse status invoked as a subprocess must return valid JSON.""" repo = _fresh_repo(tmp_path) r = subprocess.run( ["muse", "status", "--json"], capture_output=True, text=True, cwd=str(repo), ) assert r.returncode == 0 data = json.loads(r.stdout) assert "branch" in data # --------------------------------------------------------------------------- # Stress — large repos and rapid calls # --------------------------------------------------------------------------- class TestStress: @pytest.mark.slow def test_status_500_files_completes(self, tmp_path: pathlib.Path) -> None: """muse status on a 500-file repo must complete without error.""" repo = tmp_path / "repo" _init(repo) for i in range(500): (repo / f"file_{i:04d}.py").write_text(f"x = {i}\n") _commit(repo, "big commit") result = _status(repo, "--json") assert result.exit_code == 0 data = json.loads(result.output) assert data["clean"] is True @pytest.mark.slow def test_status_500_files_50_modified(self, tmp_path: pathlib.Path) -> None: repo = tmp_path / "repo" _init(repo) for i in range(500): (repo / f"file_{i:04d}.py").write_text(f"x = {i}\n") _commit(repo, "big commit") for i in range(50): (repo / f"file_{i:04d}.py").write_text(f"x = {i}\n# mod\n") result = _status(repo, "--json") assert result.exit_code == 0 data = json.loads(result.output) assert data["dirty"] is True assert len(data["modified"]) == 50 @pytest.mark.slow def test_rapid_sequential_calls(self, tmp_path: pathlib.Path) -> None: """20 sequential muse status calls must all succeed.""" repo = _fresh_repo(tmp_path) for i in range(20): result = _status(repo, "--json") assert result.exit_code == 0, f"Call {i} failed" data = json.loads(result.output) assert data["branch"] == "main" def test_many_added_files_in_json(self, tmp_path: pathlib.Path) -> None: """100 new files staged with muse code add must all appear in the added list.""" repo = _fresh_repo(tmp_path) for i in range(100): (repo / f"added_{i:03d}.py").write_text(f"y = {i}\n") _add(repo, ".") data = json.loads(_status(repo, "--json").output) added = data["added"] for i in range(100): assert f"added_{i:03d}.py" in added def test_many_deleted_files_in_json(self, tmp_path: pathlib.Path) -> None: """Commit 100 files then delete them all — all must appear as deleted.""" repo = tmp_path / "repo" _init(repo) for i in range(100): (repo / f"f_{i:03d}.py").write_text(f"x = {i}\n") _commit(repo, "100 files") for i in range(100): (repo / f"f_{i:03d}.py").unlink() data = json.loads(_status(repo, "--json").output) assert len(data["deleted"]) == 100 def test_added_list_is_sorted(self, tmp_path: pathlib.Path) -> None: """The added/modified/deleted lists must always be sorted.""" repo = _fresh_repo(tmp_path) for name in ["z.py", "a.py", "m.py", "b.py"]: (repo / name).write_text("x=1\n") data = json.loads(_status(repo, "--json").output) added = data["added"] assert added == sorted(added) # --------------------------------------------------------------------------- # Flag registration tests # --------------------------------------------------------------------------- class TestRegisterFlags: def _parser(self) -> "argparse.ArgumentParser": import argparse from muse.cli.commands.status import register p = argparse.ArgumentParser() subs = p.add_subparsers() register(subs) return p def test_default_json_out_is_false(self) -> None: args = self._parser().parse_args(["status"]) assert args.json_out is False def test_json_flag_sets_json_out(self) -> None: args = self._parser().parse_args(["status", "--json"]) assert args.json_out is True def test_j_shorthand_sets_json_out(self) -> None: args = self._parser().parse_args(["status", "-j"]) assert args.json_out is True