test_cmd_status.py
python
sha256:1d3f5470f45db58e32047678debc9438fdded1b2c7332cc743d2b8be32fdafc8
fixing more broken tests
Human
patch
3 days ago
| 1 | """Comprehensive tests for ``muse status``. |
| 2 | |
| 3 | Coverage tiers: |
| 4 | - Unit: _color, _compute_upstream_info, _read_repo_meta |
| 5 | - Integration: all flags (--json, --short, --branch, --exit-code) |
| 6 | clean/dirty tree, fresh repo, merge-in-progress, upstream tracking |
| 7 | - End-to-end: full workflows (init→commit→modify→status→commit cycles) |
| 8 | - Security: ANSI injection via file paths, fmt validation, merge_from sanitization |
| 9 | - Stress: large repos (5 000 files), 500 modifications, rapid sequential calls |
| 10 | """ |
| 11 | from __future__ import annotations |
| 12 | |
| 13 | import argparse |
| 14 | import json |
| 15 | import os |
| 16 | import pathlib |
| 17 | import subprocess |
| 18 | |
| 19 | import pytest |
| 20 | |
| 21 | from tests.cli_test_helper import CliRunner, InvokeResult |
| 22 | from muse.core.paths import muse_dir, repo_json_path |
| 23 | |
| 24 | runner = CliRunner() |
| 25 | |
| 26 | # --------------------------------------------------------------------------- |
| 27 | # Helpers |
| 28 | # --------------------------------------------------------------------------- |
| 29 | |
| 30 | |
def _init(repo: pathlib.Path, *extra: str) -> InvokeResult:
    """Invoke ``muse init`` with *repo* as the working directory.

    The directory (and any missing parents) is created first.  Any *extra*
    strings are appended to the argument list after ``init``.  The caller's
    working directory is restored before returning, even when the
    invocation raises.
    """
    from muse.cli.app import main as entry_point

    repo.mkdir(parents=True, exist_ok=True)
    argv = ["init"]
    argv.extend(extra)
    previous_cwd = os.getcwd()
    try:
        os.chdir(repo)
        outcome = runner.invoke(entry_point, argv)
    finally:
        # Always hop back so later tests are not affected by the chdir.
        os.chdir(previous_cwd)
    return outcome
| 42 | |
| 43 | |
def _status(repo: pathlib.Path, *extra: str) -> InvokeResult:
    """Invoke ``muse status`` with *repo* as the working directory.

    Any *extra* strings are appended to the argument list after ``status``.
    The caller's working directory is restored before returning, even when
    the invocation raises.
    """
    from muse.cli.app import main as entry_point

    argv = ["status"]
    argv.extend(extra)
    previous_cwd = os.getcwd()
    try:
        os.chdir(repo)
        outcome = runner.invoke(entry_point, argv)
    finally:
        # Always hop back so later tests are not affected by the chdir.
        os.chdir(previous_cwd)
    return outcome
| 54 | |
| 55 | |
def _commit(repo: pathlib.Path, msg: str = "commit") -> None:
    """Invoke ``muse commit -m <msg>`` with *repo* as the working directory.

    The invocation result is discarded; the caller's working directory is
    restored afterwards, even when the invocation raises.
    """
    from muse.cli.app import main as entry_point

    argv = ["commit", "-m", msg]
    previous_cwd = os.getcwd()
    try:
        os.chdir(repo)
        runner.invoke(entry_point, argv)
    finally:
        os.chdir(previous_cwd)
| 66 | |
| 67 | |
def _add(repo: pathlib.Path, *paths: str) -> None:
    """Invoke ``muse code add <paths>`` with *repo* as the working directory.

    The invocation result is discarded; the caller's working directory is
    restored afterwards, even when the invocation raises.
    """
    from muse.cli.app import main as entry_point

    argv = ["code", "add"]
    argv.extend(paths)
    previous_cwd = os.getcwd()
    try:
        os.chdir(repo)
        runner.invoke(entry_point, argv)
    finally:
        os.chdir(previous_cwd)
| 78 | |
| 79 | |
def _fresh_repo(tmp: pathlib.Path, *, with_commit: bool = True) -> pathlib.Path:
    """Initialise a repository at ``tmp / "repo"`` and return its path.

    Unless *with_commit* is false, a ``base.py`` file is written into the
    repository and committed with the message ``initial commit``.
    """
    root = tmp / "repo"
    _init(root)
    if not with_commit:
        return root
    root.joinpath("base.py").write_text("x = 1\n")
    _commit(root, "initial commit")
    return root
| 88 | |
| 89 | |
| 90 | # --------------------------------------------------------------------------- |
| 91 | # Unit — _color |
| 92 | # --------------------------------------------------------------------------- |
| 93 | |
| 94 | |
class TestColor:
    """Unit tests for the ``_color`` helper of the status command."""

    def test_tty_wraps_with_ansi(self) -> None:
        """With ``is_tty=True`` the result contains bold, colour, reset and the text."""
        from muse.cli.commands.status import _color, _YELLOW, _BOLD, _RESET

        rendered = _color("modified", _YELLOW, is_tty=True)
        for fragment in (_BOLD, _YELLOW, _RESET, "modified"):
            assert fragment in rendered

    def test_non_tty_returns_plain_text(self) -> None:
        """With ``is_tty=False`` the text comes back unchanged, without ESC bytes."""
        from muse.cli.commands.status import _color, _YELLOW

        plain = _color("modified", _YELLOW, is_tty=False)
        assert plain == "modified"
        assert "\033" not in plain

    def test_all_colors_non_tty(self) -> None:
        """Every colour constant is a no-op when ``is_tty=False``."""
        from muse.cli.commands.status import _color, _YELLOW, _GREEN, _RED, _CYAN

        cases = (
            ("M", _YELLOW),
            ("A", _GREEN),
            ("D", _RED),
            ("R", _CYAN),
        )
        for label, code in cases:
            assert _color(label, code, is_tty=False) == label
| 117 | |
| 118 | |
| 119 | # --------------------------------------------------------------------------- |
| 120 | # Unit — _compute_upstream_info |
| 121 | # --------------------------------------------------------------------------- |
| 122 | |
| 123 | |
class TestComputeUpstreamInfo:
    """Unit tests for ``_compute_upstream_info`` with its collaborators patched."""

    def test_no_remote_head_returns_not_pushed(self, tmp_path: pathlib.Path) -> None:
        """A missing remote head yields ``None`` counts and a "not yet pushed" line."""
        from unittest.mock import patch
        from muse.cli.commands.status import _compute_upstream_info

        with patch("muse.cli.commands.status.get_remote_head", return_value=None):
            upstream = _compute_upstream_info(tmp_path, "main", "origin")

        assert upstream["ahead"] is None
        assert upstream["behind"] is None
        assert "not yet pushed" in upstream["line"]

    def test_up_to_date_returns_zero_counts(self, tmp_path: pathlib.Path) -> None:
        """Identical local and remote heads yield zero counts and "up to date"."""
        from unittest.mock import patch
        from muse.cli.commands.status import _compute_upstream_info

        with (
            patch("muse.cli.commands.status.get_remote_head", return_value="abc"),
            patch("muse.cli.commands.status.get_head_commit_id", return_value="abc"),
        ):
            upstream = _compute_upstream_info(tmp_path, "main", "origin")

        assert upstream["ahead"] == 0
        assert upstream["behind"] == 0
        assert "up to date" in upstream["line"]

    def test_ahead_only_uses_one_walk(self, tmp_path: pathlib.Path) -> None:
        """Two commits from the first walk and none from the second means ahead=2.

        Despite the test name, the commit walker is expected to be called
        twice (once per direction).
        """
        from unittest.mock import patch, MagicMock
        from muse.cli.commands.status import _compute_upstream_info

        fake_commit = MagicMock()
        first_walk = [fake_commit, fake_commit]
        second_walk = []
        with (
            patch("muse.cli.commands.status.get_remote_head", return_value="remote-sha"),
            patch("muse.cli.commands.status.get_head_commit_id", return_value="local-sha"),
            patch(
                "muse.cli.commands.status.walk_commits_between",
                side_effect=[first_walk, second_walk],
            ) as walker,
        ):
            upstream = _compute_upstream_info(tmp_path, "main", "origin")

        assert upstream["ahead"] == 2
        assert upstream["behind"] == 0
        assert walker.call_count == 2  # one per direction

    def test_diverged_reports_both_counts(self, tmp_path: pathlib.Path) -> None:
        """Non-empty walks in both directions are reported as "diverged"."""
        from unittest.mock import patch, MagicMock
        from muse.cli.commands.status import _compute_upstream_info

        fake_commit = MagicMock()
        first_walk = [fake_commit] * 3
        second_walk = [fake_commit] * 2
        with (
            patch("muse.cli.commands.status.get_remote_head", return_value="remote"),
            patch("muse.cli.commands.status.get_head_commit_id", return_value="local"),
            patch(
                "muse.cli.commands.status.walk_commits_between",
                side_effect=[first_walk, second_walk],
            ),
        ):
            upstream = _compute_upstream_info(tmp_path, "main", "origin")

        assert upstream["ahead"] == 3
        assert upstream["behind"] == 2
        assert "diverged" in upstream["line"]
| 183 | |
| 184 | |
| 185 | # --------------------------------------------------------------------------- |
| 186 | # Unit — _read_repo_meta |
| 187 | # --------------------------------------------------------------------------- |
| 188 | |
| 189 | |
class TestReadRepoMeta:
    """Unit tests for ``_read_repo_meta``, which returns ``(repo_id, domain)``."""

    @staticmethod
    def _seed_repo_json(root: pathlib.Path, text: str) -> None:
        """Create the muse directory under *root* and write *text* to its ``repo.json``."""
        meta_dir = muse_dir(root)
        meta_dir.mkdir()
        (meta_dir / "repo.json").write_text(text)

    def test_reads_correct_fields(self, tmp_path: pathlib.Path) -> None:
        """String ``repo_id`` and ``domain`` values are returned as stored."""
        from muse.cli.commands.status import _read_repo_meta

        self._seed_repo_json(
            tmp_path, '{"repo_id": "test-id-123", "domain": "midi"}'
        )
        found_id, found_domain = _read_repo_meta(tmp_path)
        assert found_id == "test-id-123"
        assert found_domain == "midi"

    def test_missing_repo_json_returns_defaults(self, tmp_path: pathlib.Path) -> None:
        """Without any ``repo.json`` the result is an empty id and the default domain."""
        from muse.cli.commands.status import _read_repo_meta, _DEFAULT_DOMAIN

        found_id, found_domain = _read_repo_meta(tmp_path)
        assert found_id == ""
        assert found_domain == _DEFAULT_DOMAIN

    def test_corrupt_json_returns_defaults(self, tmp_path: pathlib.Path) -> None:
        """Unparseable ``repo.json`` content falls back to the defaults."""
        from muse.cli.commands.status import _read_repo_meta, _DEFAULT_DOMAIN

        self._seed_repo_json(tmp_path, "NOT VALID JSON {{{")
        found_id, found_domain = _read_repo_meta(tmp_path)
        assert found_id == ""
        assert found_domain == _DEFAULT_DOMAIN

    def test_default_domain_is_code_not_midi(self, tmp_path: pathlib.Path) -> None:
        """The fallback domain must match muse init's default (code, not midi)."""
        from muse.cli.commands.status import _read_repo_meta, _DEFAULT_DOMAIN

        assert _DEFAULT_DOMAIN == "code"
        found_domain = _read_repo_meta(tmp_path)[1]
        assert found_domain == "code"

    def test_non_string_repo_id_returns_empty(self, tmp_path: pathlib.Path) -> None:
        """A numeric ``repo_id`` is replaced by an empty string; the domain is kept."""
        from muse.cli.commands.status import _read_repo_meta

        self._seed_repo_json(tmp_path, '{"repo_id": 42, "domain": "code"}')
        found_id, found_domain = _read_repo_meta(tmp_path)
        assert found_id == ""
        assert found_domain == "code"

    def test_empty_domain_falls_back_to_default(self, tmp_path: pathlib.Path) -> None:
        """An empty ``domain`` string is replaced by the default domain."""
        from muse.cli.commands.status import _read_repo_meta, _DEFAULT_DOMAIN

        self._seed_repo_json(tmp_path, '{"repo_id": "x", "domain": ""}')
        found_domain = _read_repo_meta(tmp_path)[1]
        assert found_domain == _DEFAULT_DOMAIN
| 246 | |
| 247 | |
| 248 | # --------------------------------------------------------------------------- |
| 249 | # Integration — JSON output schema |
| 250 | # --------------------------------------------------------------------------- |
| 251 | |
| 252 | |
class TestJsonSchema:
    """Every key in _StatusJson must always be present regardless of state."""

    # Keys every ``muse status --json`` payload is expected to carry.
    _REQUIRED_KEYS = {
        "branch",
        "head_commit",
        "upstream",
        "clean",
        "dirty",
        "ahead",
        "behind",
        "total_changes",
        "added",
        "modified",
        "deleted",
        "renamed",
        "conflict_paths",
        "merge_in_progress",
        "merge_from",
        "conflict_count",
    }

    @staticmethod
    def _payload(repo: pathlib.Path) -> dict:
        """Run ``muse status --json`` in *repo* and return the decoded output."""
        outcome = _status(repo, "--json")
        return json.loads(outcome.output)

    def test_all_keys_present_on_fresh_repo(self, tmp_path: pathlib.Path) -> None:
        """An initialised repo without commits still reports every required key."""
        repo = tmp_path / "repo"
        _init(repo)
        payload = self._payload(repo)
        missing = self._REQUIRED_KEYS.difference(payload.keys())
        assert not missing, f"Missing JSON keys: {missing}"

    def test_all_keys_present_on_clean_committed_repo(self, tmp_path: pathlib.Path) -> None:
        """A committed repo with no changes reports every required key."""
        repo = _fresh_repo(tmp_path)
        payload = self._payload(repo)
        missing = self._REQUIRED_KEYS.difference(payload.keys())
        assert not missing, f"Missing JSON keys: {missing}"

    def test_all_keys_present_when_dirty(self, tmp_path: pathlib.Path) -> None:
        """A repo with an extra file on disk reports every required key."""
        repo = _fresh_repo(tmp_path)
        repo.joinpath("new.py").write_text("y = 2\n")
        payload = self._payload(repo)
        missing = self._REQUIRED_KEYS.difference(payload.keys())
        assert not missing, f"Missing JSON keys on dirty: {missing}"

    def test_conflict_paths_always_list(self, tmp_path: pathlib.Path) -> None:
        """conflict_paths must always be a list, not absent."""
        repo = _fresh_repo(tmp_path)
        payload = self._payload(repo)
        assert isinstance(payload["conflict_paths"], list)

    def test_dirty_is_not_clean(self, tmp_path: pathlib.Path) -> None:
        """``clean`` and ``dirty`` flip together once a committed file is rewritten."""
        repo = _fresh_repo(tmp_path)
        before = self._payload(repo)
        assert before["clean"] is True
        assert before["dirty"] is False

        repo.joinpath("base.py").write_text("y = 2\n")
        after = self._payload(repo)
        assert after["clean"] is False
        assert after["dirty"] is True

    def test_head_commit_is_none_on_fresh_repo(self, tmp_path: pathlib.Path) -> None:
        """Before any commit, ``head_commit`` is JSON ``null``."""
        repo = tmp_path / "repo"
        _init(repo)
        payload = self._payload(repo)
        assert payload["head_commit"] is None

    def test_head_commit_is_string_after_commit(self, tmp_path: pathlib.Path) -> None:
        """After a commit, ``head_commit`` is a string starting with ``sha256:``."""
        repo = _fresh_repo(tmp_path)
        head = self._payload(repo)["head_commit"]
        assert isinstance(head, str)
        assert head.startswith("sha256:")

    def test_merge_in_progress_false_by_default(self, tmp_path: pathlib.Path) -> None:
        """Without a merge the merge fields hold their idle values."""
        repo = _fresh_repo(tmp_path)
        payload = self._payload(repo)
        assert payload["merge_in_progress"] is False
        assert payload["merge_from"] is None
        assert payload["conflict_count"] == 0

    def test_added_modified_deleted_are_lists(self, tmp_path: pathlib.Path) -> None:
        """The three path collections are lists and ``renamed`` is a dict."""
        repo = _fresh_repo(tmp_path)
        payload = self._payload(repo)
        for key in ("added", "modified", "deleted"):
            assert isinstance(payload[key], list)
        assert isinstance(payload["renamed"], dict)

    def test_renamed_is_dict(self, tmp_path: pathlib.Path) -> None:
        """``renamed`` is a JSON object."""
        repo = _fresh_repo(tmp_path)
        payload = self._payload(repo)
        assert isinstance(payload["renamed"], dict)

    def test_total_changes_is_sum(self, tmp_path: pathlib.Path) -> None:
        """``total_changes`` equals the combined size of the four change collections."""
        repo = _fresh_repo(tmp_path)
        repo.joinpath("new.py").write_text("y = 2\n")
        repo.joinpath("base.py").write_text("x = 99\n")
        payload = self._payload(repo)
        buckets = ("added", "modified", "deleted", "renamed")
        expected = sum(len(payload[name]) for name in buckets)
        assert payload["total_changes"] == expected

    def test_output_is_single_line_json(self, tmp_path: pathlib.Path) -> None:
        """--json must emit exactly one JSON object on stdout, no prose."""
        repo = _fresh_repo(tmp_path)
        outcome = _status(repo, "--json")
        non_empty = [row for row in outcome.output.strip().splitlines() if row]
        assert len(non_empty) == 1
        json.loads(non_empty[0])  # must parse
| 350 | |
| 351 | |
| 352 | # --------------------------------------------------------------------------- |
| 353 | # Integration — branch-only output |
| 354 | # --------------------------------------------------------------------------- |
| 355 | |
| 356 | |
class TestBranchOnly:
    """Integration tests for ``muse status --branch``."""

    def test_branch_json_has_head_commit(self, tmp_path: pathlib.Path) -> None:
        """``--branch --json`` includes a string ``head_commit``."""
        repo = _fresh_repo(tmp_path)
        payload = json.loads(_status(repo, "--branch", "--json").output)
        assert "head_commit" in payload
        assert isinstance(payload["head_commit"], str)

    def test_branch_json_has_branch_name(self, tmp_path: pathlib.Path) -> None:
        """``--branch --json`` reports ``main`` for a freshly created repo."""
        repo = _fresh_repo(tmp_path)
        payload = json.loads(_status(repo, "--branch", "--json").output)
        assert payload["branch"] == "main"

    def test_branch_json_has_ahead_behind(self, tmp_path: pathlib.Path) -> None:
        """``--branch --json`` includes both ``ahead`` and ``behind`` keys."""
        repo = _fresh_repo(tmp_path)
        payload = json.loads(_status(repo, "--branch", "--json").output)
        for key in ("ahead", "behind"):
            assert key in payload

    def test_branch_only_exits_zero(self, tmp_path: pathlib.Path) -> None:
        """``--branch`` exits 0 even with an extra file in the working tree."""
        repo = _fresh_repo(tmp_path)
        repo.joinpath("dirty.py").write_text("y = 1\n")
        outcome = _status(repo, "--branch")
        assert outcome.exit_code == 0

    def test_branch_only_skips_file_diff(self, tmp_path: pathlib.Path) -> None:
        """--branch should not walk the working tree."""
        repo = _fresh_repo(tmp_path)
        repo.joinpath("dirty.py").write_text("y = 1\n")
        outcome = _status(repo, "--branch")
        # The extra file's name must be absent from the output.
        assert "dirty.py" not in outcome.output
| 388 | |
| 389 | |
| 390 | # --------------------------------------------------------------------------- |
| 391 | # Integration — --short output |
| 392 | # --------------------------------------------------------------------------- |
| 393 | |
| 394 | |
class TestShortOutput:
    """Integration tests for ``muse status --short``."""

    def test_modified_shows_M(self, tmp_path: pathlib.Path) -> None:
        """Rewriting a committed file puts ``M`` and the file name in the output."""
        repo = _fresh_repo(tmp_path)
        repo.joinpath("base.py").write_text("x = 99\n")
        short = _status(repo, "--short").output
        assert "M" in short
        assert "base.py" in short

    def test_added_shows_A(self, tmp_path: pathlib.Path) -> None:
        """A new file passed to ``muse code add`` puts ``A`` and its name in the output."""
        repo = _fresh_repo(tmp_path)
        repo.joinpath("new.py").write_text("y = 1\n")
        _add(repo, "new.py")
        short = _status(repo, "--short").output
        assert "A" in short
        assert "new.py" in short

    def test_deleted_shows_D(self, tmp_path: pathlib.Path) -> None:
        """Removing a committed file puts ``D`` in the output."""
        repo = _fresh_repo(tmp_path)
        repo.joinpath("base.py").unlink()
        short = _status(repo, "--short").output
        assert "D" in short

    def test_clean_produces_no_output(self, tmp_path: pathlib.Path) -> None:
        """A clean repo prints nothing but whitespace."""
        repo = _fresh_repo(tmp_path)
        short = _status(repo, "--short").output
        assert short.strip() == ""
| 421 | |
| 422 | |
| 423 | # --------------------------------------------------------------------------- |
| 424 | # Integration — --exit-code |
| 425 | # --------------------------------------------------------------------------- |
| 426 | |
| 427 | |
class TestExitCode:
    """Integration tests for ``muse status --exit-code``."""

    def test_exit_zero_when_clean(self, tmp_path: pathlib.Path) -> None:
        """A clean repo exits with status 0."""
        repo = _fresh_repo(tmp_path)
        outcome = _status(repo, "--exit-code")
        assert outcome.exit_code == 0

    def test_exit_one_when_dirty(self, tmp_path: pathlib.Path) -> None:
        """A rewritten committed file makes the command exit with status 1."""
        repo = _fresh_repo(tmp_path)
        repo.joinpath("base.py").write_text("z = 1\n")
        outcome = _status(repo, "--exit-code")
        assert outcome.exit_code == 1

    def test_exit_code_with_json(self, tmp_path: pathlib.Path) -> None:
        """--exit-code + --json must emit valid JSON AND exit 1 when dirty."""
        repo = _fresh_repo(tmp_path)
        repo.joinpath("base.py").write_text("z = 1\n")
        outcome = _status(repo, "--exit-code", "--json")
        assert outcome.exit_code == 1
        payload = json.loads(outcome.output)
        assert payload["dirty"] is True

    def test_exit_code_zero_with_json_when_clean(self, tmp_path: pathlib.Path) -> None:
        """``--exit-code --json`` exits 0 and reports ``clean`` on a clean repo."""
        repo = _fresh_repo(tmp_path)
        outcome = _status(repo, "--exit-code", "--json")
        assert outcome.exit_code == 0
        payload = json.loads(outcome.output)
        assert payload["clean"] is True

    def test_exit_code_with_short(self, tmp_path: pathlib.Path) -> None:
        """``--exit-code --short`` exits 1 when a committed file was rewritten."""
        repo = _fresh_repo(tmp_path)
        repo.joinpath("base.py").write_text("z = 1\n")
        outcome = _status(repo, "--exit-code", "--short")
        assert outcome.exit_code == 1
| 461 | |
| 462 | # --------------------------------------------------------------------------- |
| 463 | # Integration — text output |
| 464 | # --------------------------------------------------------------------------- |
| 465 | |
| 466 | |
class TestTextOutput:
    """Integration tests for the default (human-readable) ``muse status`` output."""

    def test_branch_line_present(self, tmp_path: pathlib.Path) -> None:
        """The output names the current branch."""
        repo = _fresh_repo(tmp_path)
        text = _status(repo).output
        assert "On branch main" in text

    def test_clean_message(self, tmp_path: pathlib.Path) -> None:
        """A clean repo prints the "Nothing to commit" message."""
        repo = _fresh_repo(tmp_path)
        text = _status(repo).output
        assert "Nothing to commit" in text

    def test_dirty_shows_changes_section(self, tmp_path: pathlib.Path) -> None:
        """Rewriting a committed file produces a ``modified:`` entry."""
        repo = _fresh_repo(tmp_path)
        repo.joinpath("base.py").write_text("y = 1\n")
        text = _status(repo).output
        assert "modified:" in text

    def test_modified_label_in_text(self, tmp_path: pathlib.Path) -> None:
        """The ``modified:`` label appears for a changed committed file."""
        repo = _fresh_repo(tmp_path)
        repo.joinpath("base.py").write_text("x = 99\n")
        text = _status(repo).output
        assert "modified:" in text

    def test_new_file_label_in_text(self, tmp_path: pathlib.Path) -> None:
        """The ``new file:`` label appears after ``muse code add`` on a new file."""
        repo = _fresh_repo(tmp_path)
        repo.joinpath("new.py").write_text("y = 1\n")
        _add(repo, "new.py")
        text = _status(repo).output
        assert "new file:" in text

    def test_deleted_label_in_text(self, tmp_path: pathlib.Path) -> None:
        """The ``deleted:`` label appears after removing a committed file."""
        repo = _fresh_repo(tmp_path)
        repo.joinpath("base.py").unlink()
        text = _status(repo).output
        assert "deleted:" in text
| 502 | |
| 503 | |
| 504 | # --------------------------------------------------------------------------- |
| 505 | # Integration — format validation |
| 506 | # --------------------------------------------------------------------------- |
| 507 | |
| 508 | |
class TestFormatValidation:
    """Integration tests for flag parsing of ``muse status``."""

    def test_unknown_flag_exits_nonzero(self, tmp_path: pathlib.Path) -> None:
        """An unrecognised flag produces a non-zero exit status."""
        repo = _fresh_repo(tmp_path)
        outcome = _status(repo, "--unknown-flag")
        assert outcome.exit_code != 0

    def test_json_flag_produces_valid_json(self, tmp_path: pathlib.Path) -> None:
        """``--json`` exits 0 and emits parseable JSON containing ``branch``."""
        repo = _fresh_repo(tmp_path)
        outcome = _status(repo, "--json")
        assert outcome.exit_code == 0
        payload = json.loads(outcome.output)
        assert "branch" in payload

    def test_j_shorthand_matches_json_flag(self, tmp_path: pathlib.Path) -> None:
        """``-j`` and ``--json`` agree on the compared subset of keys."""
        repo = _fresh_repo(tmp_path)
        long_run = _status(repo, "--json")
        short_run = _status(repo, "-j")
        long_payload = json.loads(long_run.output)
        short_payload = json.loads(short_run.output)
        compared = ("branch", "clean", "dirty", "added", "modified", "deleted")
        for name in compared:
            assert long_payload[name] == short_payload[name]
| 530 | |
| 531 | |
| 532 | # --------------------------------------------------------------------------- |
| 533 | # Security — ANSI injection |
| 534 | # --------------------------------------------------------------------------- |
| 535 | |
| 536 | |
class TestSecurity:
    """Security-oriented checks on what ``muse status`` writes to its output."""

    def test_ansi_in_file_path_not_in_text_output(self, tmp_path: pathlib.Path) -> None:
        """Text output must contain no ANSI escape sequences when not on a TTY.

        The previous assertion ended in ``or True`` and therefore could never
        fail.  The CLI runner is not a TTY, so no escape sequence (including
        the bold prefix) is expected in the output at all.

        NOTE(review): the file created here has a benign name, so this guards
        against colour codes leaking into non-TTY output; it does not plant an
        escape sequence inside a path.
        """
        repo = _fresh_repo(tmp_path)
        (repo / "safe_name.py").write_text("y = 1\n")
        result = _status(repo)
        assert "\x1b[" not in result.output

    def test_ansi_in_branch_not_on_stdout(self, tmp_path: pathlib.Path) -> None:
        """Branches are read from HEAD — sanitize_display applied to output."""
        repo = _fresh_repo(tmp_path)
        result = _status(repo)
        # The first line mentioning "branch" must not contain raw escape bytes.
        branch_line = next(
            line for line in result.output.splitlines() if "branch" in line
        )
        assert "\x1b" not in branch_line

    def test_invalid_fmt_sanitized_in_error_message(self, tmp_path: pathlib.Path) -> None:
        """Crafted --format values must not inject ANSI into error output."""
        repo = _fresh_repo(tmp_path)
        malicious_fmt = "\x1b[31mmalicious\x1b[0m"
        result = _status(repo, "--format", malicious_fmt)
        assert result.exit_code != 0
        assert "\x1b" not in result.output

    def test_json_output_is_valid_json_no_prose(self, tmp_path: pathlib.Path) -> None:
        """--json must produce parseable JSON with no leading/trailing prose."""
        repo = _fresh_repo(tmp_path)
        result = _status(repo, "--json")
        data = json.loads(result.output.strip())
        assert isinstance(data, dict)

    def test_no_repo_id_leaked_in_json(self, tmp_path: pathlib.Path) -> None:
        """Internal repo_id must not appear in JSON output."""
        repo = _fresh_repo(tmp_path)
        stored = json.loads(repo_json_path(repo).read_text())["repo_id"]
        result = _status(repo, "--json")
        assert stored not in result.output

    def test_no_snapshot_id_leaked_in_json(self, tmp_path: pathlib.Path) -> None:
        """Neither ``snapshot_id`` nor ``repo_id`` may be a top-level JSON key."""
        repo = _fresh_repo(tmp_path)
        result = _status(repo, "--json")
        data = json.loads(result.output)
        assert "snapshot_id" not in data
        assert "repo_id" not in data
| 585 | |
| 586 | |
| 587 | # --------------------------------------------------------------------------- |
| 588 | # Integration — merge-in-progress state |
| 589 | # --------------------------------------------------------------------------- |
| 590 | |
| 591 | |
class TestMergeInProgress:
    """Integration tests for ``muse status`` while a merge is unresolved."""

    def _setup_conflict(self, tmp_path: pathlib.Path) -> pathlib.Path:
        """Create a repo with an in-progress conflicted merge.

        ``shared.py`` is committed on ``main``, rewritten and committed on
        ``feat/x``, rewritten differently and committed on ``main``, and then
        ``feat/x`` is merged into ``main``.  Returns the repository path.
        """
        repo = tmp_path / "repo"
        _init(repo)
        shared = repo / "shared.py"
        shared.write_text("x = 1\n")
        _commit(repo, "base")

        from muse.cli.app import main as entry_point

        def muse(*argv: str) -> None:
            # Results are intentionally not inspected, as in the helpers above.
            runner.invoke(entry_point, list(argv))

        previous_cwd = os.getcwd()
        os.chdir(repo)
        try:
            # Diverge on the feature branch ...
            muse("branch", "feat/x")
            muse("checkout", "feat/x")
            shared.write_text("x = 2 # feat\n")
            muse("code", "add", "shared.py")
            muse("commit", "-m", "feat")
            # ... then on main, and finally attempt the merge.
            muse("checkout", "main")
            shared.write_text("x = 3 # main\n")
            muse("code", "add", "shared.py")
            muse("commit", "-m", "main diverge")
            muse("merge", "feat/x")
        finally:
            os.chdir(previous_cwd)
        return repo

    def test_merge_in_progress_flag_in_json(self, tmp_path: pathlib.Path) -> None:
        """``merge_in_progress`` is ``true`` during the conflicted merge."""
        repo = self._setup_conflict(tmp_path)
        payload = json.loads(_status(repo, "--json").output)
        assert payload["merge_in_progress"] is True

    def test_conflict_count_nonzero_in_json(self, tmp_path: pathlib.Path) -> None:
        """``conflict_count`` is at least 1 during the conflicted merge."""
        repo = self._setup_conflict(tmp_path)
        payload = json.loads(_status(repo, "--json").output)
        assert payload["conflict_count"] >= 1

    def test_conflict_paths_is_list_in_json(self, tmp_path: pathlib.Path) -> None:
        """``conflict_paths`` is a list during the conflicted merge."""
        repo = self._setup_conflict(tmp_path)
        payload = json.loads(_status(repo, "--json").output)
        assert isinstance(payload["conflict_paths"], list)

    def test_merge_from_present_in_json(self, tmp_path: pathlib.Path) -> None:
        """``merge_from`` is populated during the conflicted merge."""
        repo = self._setup_conflict(tmp_path)
        payload = json.loads(_status(repo, "--json").output)
        assert payload["merge_from"] is not None

    def test_merge_banner_in_text_output(self, tmp_path: pathlib.Path) -> None:
        """Text output mentions "merge in progress" (case-insensitive)."""
        repo = self._setup_conflict(tmp_path)
        text = _status(repo).output
        assert "merge in progress" in text.lower()

    def test_text_shows_merging_message(self, tmp_path: pathlib.Path) -> None:
        """Text output mentions "merge in progress" (case-insensitive)."""
        repo = self._setup_conflict(tmp_path)
        text = _status(repo).output
        assert "merge in progress" in text.lower()
| 648 | |
| 649 | |
| 650 | # --------------------------------------------------------------------------- |
| 651 | # End-to-end — complete workflows |
| 652 | # --------------------------------------------------------------------------- |
| 653 | |
| 654 | |
| 655 | class TestEndToEnd: |
| 656 | def test_fresh_repo_status_exits_zero(self, tmp_path: pathlib.Path) -> None: |
| 657 | repo = tmp_path / "repo" |
| 658 | _init(repo) |
| 659 | result = _status(repo, "--json") |
| 660 | assert result.exit_code == 0 |
| 661 | |
| 662 | def test_init_commit_status_clean(self, tmp_path: pathlib.Path) -> None: |
| 663 | repo = _fresh_repo(tmp_path) |
| 664 | data = json.loads(_status(repo, "--json").output) |
| 665 | assert data["clean"] is True |
| 666 | assert data["dirty"] is False |
| 667 | assert data["head_commit"] is not None |
| 668 | |
| 669 | def test_modify_then_status_shows_modified(self, tmp_path: pathlib.Path) -> None: |
| 670 | repo = _fresh_repo(tmp_path) |
| 671 | (repo / "base.py").write_text("x = 99\n") |
| 672 | data = json.loads(_status(repo, "--json").output) |
| 673 | assert "base.py" in data["modified"] |
| 674 | |
| 675 | def test_add_file_then_status_shows_untracked(self, tmp_path: pathlib.Path) -> None: |
| 676 | repo = _fresh_repo(tmp_path) |
| 677 | (repo / "new.py").write_text("y = 2\n") |
| 678 | data = json.loads(_status(repo, "--json").output) |
| 679 | assert "new.py" in data["untracked"] |
| 680 | |
| 681 | def test_untracked_file_makes_repo_not_clean(self, tmp_path: pathlib.Path) -> None: |
| 682 | """Untracked files must set clean=False and dirty=True. |
| 683 | |
| 684 | Matches git behaviour: 'nothing added to commit but untracked files |
| 685 | present' is NOT a clean working tree. An agent that only checks |
| 686 | clean=True to decide whether everything is committed will silently |
| 687 | miss untracked files otherwise. |
| 688 | """ |
| 689 | repo = _fresh_repo(tmp_path) |
| 690 | data_before = json.loads(_status(repo, "--json").output) |
| 691 | assert data_before["clean"] is True # baseline: committed repo is clean |
| 692 | |
| 693 | (repo / "untracked.py").write_text("z = 3\n") |
| 694 | data_after = json.loads(_status(repo, "--json").output) |
| 695 | |
| 696 | assert data_after["clean"] is False, ( |
| 697 | "clean must be False when untracked files exist — " |
| 698 | "matches git's 'untracked files present' not-clean contract" |
| 699 | ) |
| 700 | assert data_after["dirty"] is True |
| 701 | assert "untracked.py" in data_after["untracked"] |
| 702 | |
| 703 | def test_delete_file_then_status_shows_deleted(self, tmp_path: pathlib.Path) -> None: |
| 704 | repo = _fresh_repo(tmp_path) |
| 705 | (repo / "base.py").unlink() |
| 706 | data = json.loads(_status(repo, "--json").output) |
| 707 | assert "base.py" in data["deleted"] |
| 708 | |
| 709 | def test_second_commit_makes_clean(self, tmp_path: pathlib.Path) -> None: |
| 710 | repo = _fresh_repo(tmp_path) |
| 711 | (repo / "base.py").write_text("x = 99\n") |
| 712 | assert json.loads(_status(repo, "--json").output)["dirty"] is True |
| 713 | _add(repo, "base.py") |
| 714 | _commit(repo, "second commit") |
| 715 | assert json.loads(_status(repo, "--json").output)["clean"] is True |
| 716 | |
def test_head_commit_changes_after_commit(self, tmp_path: pathlib.Path) -> None:
    """``head_commit`` in the JSON report must differ before and after a new commit."""
    repo = _fresh_repo(tmp_path)
    before = json.loads(_status(repo, "--json").output)

    # Create, stage and commit a brand-new file to move HEAD forward.
    new_file = repo / "new.py"
    new_file.write_text("y = 2\n")
    _add(repo, "new.py")
    _commit(repo, "second")

    after = json.loads(_status(repo, "--json").output)
    assert before["head_commit"] != after["head_commit"]
| 725 | |
def test_branch_switch_updates_branch_in_status(self, tmp_path: pathlib.Path) -> None:
    """After creating and checking out ``feat/x``, status must report that branch.

    The ``branch`` and ``checkout`` invocations are setup steps.  Their exit
    codes are asserted so that a setup failure is reported as such, rather
    than as a confusing branch-name mismatch in the final assertion.
    """
    from muse.cli.app import main as cli

    repo = _fresh_repo(tmp_path)
    saved = os.getcwd()
    os.chdir(repo)
    try:
        # Both commands run with the repo as cwd; the previous cwd is
        # always restored, even if an invocation raises.
        created = runner.invoke(cli, ["branch", "feat/x"])
        switched = runner.invoke(cli, ["checkout", "feat/x"])
    finally:
        os.chdir(saved)

    assert created.exit_code == 0, f"'branch feat/x' failed: {created.output}"
    assert switched.exit_code == 0, f"'checkout feat/x' failed: {switched.output}"

    data = json.loads(_status(repo, "--json").output)
    assert data["branch"] == "feat/x"
| 738 | |
def test_status_subprocess_call_works(self, tmp_path: pathlib.Path) -> None:
    """muse status invoked as a subprocess must return valid JSON.

    Runs the ``muse`` executable (resolved via PATH) with the repo as its
    working directory and checks that stdout parses as JSON containing a
    ``branch`` key.
    """
    repo = _fresh_repo(tmp_path)
    r = subprocess.run(
        ["muse", "status", "--json"],
        capture_output=True,
        text=True,
        cwd=str(repo),
        # Raise TimeoutExpired instead of hanging the whole suite if the
        # CLI ever blocks (e.g. waiting on input).
        timeout=60,
    )
    # Surface stderr on failure so the cause is visible in the test report.
    assert r.returncode == 0, f"muse status exited {r.returncode}: {r.stderr}"
    data = json.loads(r.stdout)
    assert "branch" in data
| 749 | |
| 750 | |
| 751 | # --------------------------------------------------------------------------- |
| 752 | # Stress — large repos and rapid calls |
| 753 | # --------------------------------------------------------------------------- |
| 754 | |
| 755 | |
class TestStress:
    """Stress tests: ``muse status --json`` on larger repos and under repeated calls."""

    @pytest.mark.slow
    def test_status_500_files_completes(self, tmp_path: pathlib.Path) -> None:
        """muse status on a 500-file repo must complete without error."""
        repo = tmp_path / "repo"
        _init(repo)
        for i in range(500):
            (repo / f"file_{i:04d}.py").write_text(f"x = {i}\n")
        _commit(repo, "big commit")
        result = _status(repo, "--json")
        assert result.exit_code == 0
        data = json.loads(result.output)
        assert data["clean"] is True

    @pytest.mark.slow
    def test_status_500_files_50_modified(self, tmp_path: pathlib.Path) -> None:
        """Modifying 50 of 500 committed files must report exactly 50 as modified."""
        repo = tmp_path / "repo"
        _init(repo)
        for i in range(500):
            (repo / f"file_{i:04d}.py").write_text(f"x = {i}\n")
        _commit(repo, "big commit")
        # Rewrite the first 50 files with different content.
        for i in range(50):
            (repo / f"file_{i:04d}.py").write_text(f"x = {i}\n# mod\n")

        result = _status(repo, "--json")
        assert result.exit_code == 0
        data = json.loads(result.output)
        assert data["dirty"] is True
        assert len(data["modified"]) == 50

    @pytest.mark.slow
    def test_rapid_sequential_calls(self, tmp_path: pathlib.Path) -> None:
        """20 sequential muse status calls must all succeed."""
        repo = _fresh_repo(tmp_path)
        for i in range(20):
            result = _status(repo, "--json")
            assert result.exit_code == 0, f"Call {i} failed"
            data = json.loads(result.output)
            assert data["branch"] == "main"

    def test_many_added_files_in_json(self, tmp_path: pathlib.Path) -> None:
        """100 new files staged with muse code add must all appear in the added list."""
        repo = _fresh_repo(tmp_path)
        for i in range(100):
            (repo / f"added_{i:03d}.py").write_text(f"y = {i}\n")
        _add(repo, ".")
        data = json.loads(_status(repo, "--json").output)
        added = data["added"]
        for i in range(100):
            assert f"added_{i:03d}.py" in added

    def test_many_deleted_files_in_json(self, tmp_path: pathlib.Path) -> None:
        """Commit 100 files then delete them all — all must appear as deleted."""
        repo = tmp_path / "repo"
        _init(repo)
        for i in range(100):
            (repo / f"f_{i:03d}.py").write_text(f"x = {i}\n")
        _commit(repo, "100 files")
        for i in range(100):
            (repo / f"f_{i:03d}.py").unlink()
        data = json.loads(_status(repo, "--json").output)
        assert len(data["deleted"]) == 100

    def test_added_list_is_sorted(self, tmp_path: pathlib.Path) -> None:
        """The ``added`` list must be reported in sorted order.

        Files are created in non-alphabetical order and then staged, so the
        ``added`` list is actually populated.  Without staging, the sortedness
        check could pass vacuously on an empty list.
        """
        repo = _fresh_repo(tmp_path)
        names = ["z.py", "a.py", "m.py", "b.py"]
        for name in names:
            (repo / name).write_text("x=1\n")
        # Stage the new files so they are reported under "added".
        _add(repo, ".")
        data = json.loads(_status(repo, "--json").output)
        added = data["added"]
        # Guard against a vacuous pass: every staged file must be listed.
        for name in names:
            assert name in added
        assert added == sorted(added)
| 827 | |
| 828 | |
| 829 | # --------------------------------------------------------------------------- |
| 830 | # Flag registration tests |
| 831 | # --------------------------------------------------------------------------- |
| 832 | |
| 833 | |
class TestRegisterFlags:
    """Checks that ``register`` wires ``--json`` / ``-j`` onto the ``status`` subcommand."""

    def _parser(self) -> "argparse.ArgumentParser":
        """Return a fresh parser whose subparsers were populated by ``register``."""
        import argparse
        from muse.cli.commands.status import register

        root = argparse.ArgumentParser()
        subparsers = root.add_subparsers()
        register(subparsers)
        return root

    def test_default_json_out_is_false(self) -> None:
        """With no flag given, ``json_out`` must be False."""
        parsed = self._parser().parse_args(["status"])
        assert parsed.json_out is False

    def test_json_flag_sets_json_out(self) -> None:
        """The long ``--json`` flag must set ``json_out`` to True."""
        parsed = self._parser().parse_args(["status", "--json"])
        assert parsed.json_out is True

    def test_j_shorthand_sets_json_out(self) -> None:
        """The short ``-j`` flag must set ``json_out`` to True."""
        parsed = self._parser().parse_args(["status", "-j"])
        assert parsed.json_out is True
File History
1 commit
sha256:1d3f5470f45db58e32047678debc9438fdded1b2c7332cc743d2b8be32fdafc8
fixing more broken tests
Human
patch
3 days ago