"""Tests for ``muse code add`` / ``muse code reset`` and stage-aware commit/status. Coverage matrix: Unit tests (pure functions): - _split_into_hunks: empty diff, single hunk, multi-hunk, trailing newlines - _apply_hunks_to_bytes: accept all, accept none, accept partial, new-file - _infer_mode: all three modes (A / M / D) - _colorize_hunk: color escape codes present for +/- lines Integration tests (CLI round-trips): - muse code add — stages modified file as mode M - muse code add — stages new file as mode A - muse code add . — stages everything - muse code add -A — stages all including new files - muse code add -u — stages tracked files only (excludes untracked) - muse code add -u — stages deleted files as mode D - muse code add — expands directory recursively - muse code add --dry-run — shows intent without writing - muse code add -v — verbose per-file output - muse code add (re-stage) — updates object_id when file changes again - nonexistent path — exits non-zero - wrong domain — exits non-zero Stage-aware commit: - Only staged files appear in the committed snapshot - Unstaged changes do NOT appear in the committed snapshot - Stage is cleared after a successful commit - Staged deletion removes file from next commit muse status — three-bucket view: - "Changes staged for commit" section present - "Changes not staged" section present - Untracked files listed - --format json includes staged/unstaged/untracked keys - --json format muse code reset: - reset — unstages that file only - reset HEAD — Git-syntax alias works - reset (no args) — clears everything - reset when nothing staged — exits cleanly Resilience: - Corrupt stage.json degrades gracefully (read_stage returns {}) - Staging a file outside the repo root is rejected Stress: - Staging 100 files in one shot """ from __future__ import annotations import json import os import pathlib from collections.abc import Mapping import pytest from muse.core.types import fake_id from muse.plugins.code.stage import 
read_stage, stage_path, StagedEntry, StagedFileMap from muse.core.paths import code_dir, muse_dir from tests.cli_test_helper import CliRunner cli = None # argparse migration — CliRunner ignores this arg runner = CliRunner() def _read_stage_raw(root: pathlib.Path) -> StagedFileMap: """Read the current stage index using the production API.""" return read_stage(root) # --------------------------------------------------------------------------- # Unit tests — pure functions # --------------------------------------------------------------------------- class TestSplitIntoHunks: """Unit tests for _split_into_hunks (no I/O).""" def _run(self, diff_text: str) -> list[list[str]]: from muse.cli.commands.code_stage import _split_into_hunks lines = [f"{l}\n" for l in diff_text.splitlines()] return _split_into_hunks(lines) def test_empty_diff_returns_no_hunks(self) -> None: assert self._run("") == [] def test_single_hunk(self) -> None: diff = ( "--- a/foo.py\n" "+++ b/foo.py\n" "@@ -1,2 +1,3 @@\n" " def f():\n" "- pass\n" "+ return 1\n" ) hunks = self._run(diff) assert len(hunks) == 1 assert any("@@" in l for l in hunks[0]) def test_multi_hunk_has_header_on_each(self) -> None: diff = ( "--- a/foo.py\n" "+++ b/foo.py\n" "@@ -1,2 +1,3 @@\n" " line1\n" "-old\n" "+new\n" "@@ -10,2 +11,3 @@\n" " line10\n" "-old10\n" "+new10\n" ) hunks = self._run(diff) assert len(hunks) == 2 # Each hunk starts with the file header (--- / +++), then @@ for h in hunks: assert any(l.startswith("---") for l in h) assert any(l.startswith("+++") for l in h) assert any(l.startswith("@@") for l in h) def test_no_header_lines_before_first_hunk_is_still_valid(self) -> None: diff = ( "@@ -1,1 +1,1 @@\n" "-old\n" "+new\n" ) hunks = self._run(diff) assert len(hunks) == 1 class TestApplyHunksToBytes: """Unit tests for _apply_hunks_to_bytes.""" def _run(self, before: str, diff_text: str, accept_all: bool = True) -> str: from muse.cli.commands.code_stage import _split_into_hunks, _apply_hunks_to_bytes before_lines = 
before.splitlines(keepends=True) after_lines = diff_text.splitlines(keepends=True) import difflib diff = list(difflib.unified_diff( before_lines, after_lines, fromfile="a/f", tofile="b/f", lineterm="" )) diff_nl = [f"{l}\n" for l in diff] hunks = _split_into_hunks(diff_nl) accepted = hunks if accept_all else [] result = _apply_hunks_to_bytes(before.encode(), accepted) return result.decode() def test_accept_all_hunks_produces_after_content(self) -> None: before = "def f():\n pass\n" after = "def f():\n return 1\n" result = self._run(before, after, accept_all=True) assert "return 1" in result def test_accept_no_hunks_preserves_original(self) -> None: before = "def f():\n pass\n" after = "def f():\n return 1\n" result = self._run(before, after, accept_all=False) assert result == before def test_new_file_from_empty(self) -> None: """Staging a new file from empty before-bytes produces after-content.""" before = "" after = "x = 1\ny = 2\n" result = self._run(before, after, accept_all=True) assert "x = 1" in result def test_binary_safe_with_replacement(self) -> None: from muse.cli.commands.code_stage import _apply_hunks_to_bytes result = _apply_hunks_to_bytes(b"\xff\xfe", []) assert isinstance(result, bytes) class TestInferMode: """Unit tests for _infer_mode.""" def _run(self, rel: str, head: Manifest, exists: bool) -> str: from muse.cli.commands.code_stage import _infer_mode return _infer_mode(rel, head, exists) def test_existing_tracked_is_M(self) -> None: assert self._run("src/a.py", {"src/a.py": "abc"}, True) == "M" def test_new_untracked_is_A(self) -> None: assert self._run("src/new.py", {}, True) == "A" def test_missing_from_disk_is_D(self) -> None: assert self._run("src/gone.py", {"src/gone.py": "abc"}, False) == "D" def test_missing_and_not_tracked_is_D(self) -> None: # Shouldn't normally occur, but must not crash. 
assert self._run("ghost.py", {}, False) == "D" class TestColorizeHunk: """Unit tests for _colorize_hunk.""" def test_added_lines_get_green(self) -> None: from muse.cli.commands.code_stage import _colorize_hunk result = _colorize_hunk(["+new line\n"]) assert "\x1b[32m" in result # green def test_removed_lines_get_red(self) -> None: from muse.cli.commands.code_stage import _colorize_hunk result = _colorize_hunk(["-old line\n"]) assert "\x1b[31m" in result # red def test_file_header_not_colored(self) -> None: from muse.cli.commands.code_stage import _colorize_hunk result = _colorize_hunk(["--- a/foo.py\n", "+++ b/foo.py\n"]) # file header lines should not get red/green assert "\x1b[31m" not in result assert "\x1b[32m" not in result def test_at_at_header_gets_cyan(self) -> None: from muse.cli.commands.code_stage import _colorize_hunk result = _colorize_hunk(["@@ -1,2 +1,3 @@\n"]) assert "\x1b[36m" in result # cyan # --------------------------------------------------------------------------- # Fixtures # --------------------------------------------------------------------------- def _env(root: pathlib.Path) -> Manifest: return {"MUSE_REPO_ROOT": str(root)} @pytest.fixture() def code_repo(tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> pathlib.Path: """Initialise a fresh code-domain Muse repo with one initial commit.""" monkeypatch.chdir(tmp_path) result = runner.invoke(cli, ["init", "--domain", "code"], env=_env(tmp_path)) assert result.exit_code == 0, result.output (tmp_path / "auth.py").write_text("def authenticate():\n pass\n") (tmp_path / "models.py").write_text("class User:\n pass\n") r = runner.invoke(cli, ["commit", "-m", "initial"], env=_env(tmp_path)) assert r.exit_code == 0, r.output return tmp_path # --------------------------------------------------------------------------- # muse code add — integration tests # --------------------------------------------------------------------------- class TestCodeAdd: def 
test_stage_modified_file_is_mode_M(self, code_repo: pathlib.Path) -> None: (code_repo / "auth.py").write_text("def authenticate():\n return True\n") result = runner.invoke(cli, ["code", "add", "auth.py"], env=_env(code_repo)) assert result.exit_code == 0, result.output assert "modified" in result.output stage = _read_stage_raw(code_repo) assert stage["auth.py"]["mode"] == "M" def test_stage_new_file_is_mode_A(self, code_repo: pathlib.Path) -> None: (code_repo / "new_module.py").write_text("x = 1\n") runner.invoke(cli, ["code", "add", "new_module.py"], env=_env(code_repo)) stage = _read_stage_raw(code_repo) assert stage["new_module.py"]["mode"] == "A" def test_stage_dot_stages_everything(self, code_repo: pathlib.Path) -> None: (code_repo / "auth.py").write_text("# changed\n") runner.invoke(cli, ["code", "add", "."], env=_env(code_repo)) stage = _read_stage_raw(code_repo) assert "auth.py" in stage def test_stage_A_includes_new_files(self, code_repo: pathlib.Path) -> None: (code_repo / "auth.py").write_text("# changed\n") (code_repo / "new.py").write_text("x = 1\n") runner.invoke(cli, ["code", "add", "-A"], env=_env(code_repo)) stage = _read_stage_raw(code_repo) assert "auth.py" in stage assert "new.py" in stage def test_stage_u_excludes_new_untracked_files( self, code_repo: pathlib.Path ) -> None: """-u stages only tracked files; new/untracked files are NOT staged.""" (code_repo / "auth.py").write_text("# tracked change\n") (code_repo / "brand_new.py").write_text("x = 1\n") runner.invoke(cli, ["code", "add", "-u"], env=_env(code_repo)) assert stage_path(code_repo).exists() stage = _read_stage_raw(code_repo) assert "auth.py" in stage assert "brand_new.py" not in stage def test_stage_u_includes_deleted_files(self, code_repo: pathlib.Path) -> None: (code_repo / "models.py").unlink() runner.invoke(cli, ["code", "add", "-u"], env=_env(code_repo)) stage = _read_stage_raw(code_repo) assert "models.py" in stage assert stage["models.py"]["mode"] == "D" def 
test_stage_directory_expands_recursively( self, code_repo: pathlib.Path ) -> None: src = code_repo / "src" src.mkdir() (src / "a.py").write_text("x = 1\n") (src / "b.py").write_text("y = 2\n") runner.invoke(cli, ["code", "add", "src"], env=_env(code_repo)) stage = _read_stage_raw(code_repo) assert "src/a.py" in stage assert "src/b.py" in stage def test_dry_run_does_not_write_stage(self, code_repo: pathlib.Path) -> None: (code_repo / "auth.py").write_text("# dry\n") runner.invoke( cli, ["code", "add", "--dry-run", "auth.py"], env=_env(code_repo) ) assert not stage_path(code_repo).exists() def test_dry_run_output_shows_files(self, code_repo: pathlib.Path) -> None: (code_repo / "auth.py").write_text("# dry\n") result = runner.invoke( cli, ["code", "add", "--dry-run", "auth.py"], env=_env(code_repo) ) assert "auth.py" in result.output def test_verbose_shows_per_file_output(self, code_repo: pathlib.Path) -> None: (code_repo / "auth.py").write_text("# verbose\n") result = runner.invoke( cli, ["code", "add", "-v", "auth.py"], env=_env(code_repo) ) assert result.exit_code == 0 assert "auth.py" in result.output def test_restage_updates_object_id(self, code_repo: pathlib.Path) -> None: """Staging a file twice with different content updates the object_id.""" (code_repo / "auth.py").write_text("# version 1\n") runner.invoke(cli, ["code", "add", "auth.py"], env=_env(code_repo)) oid_v1 = _read_stage_raw(code_repo)["auth.py"]["object_id"] (code_repo / "auth.py").write_text("# version 2\n") runner.invoke(cli, ["code", "add", "auth.py"], env=_env(code_repo)) oid_v2 = _read_stage_raw(code_repo)["auth.py"]["object_id"] assert oid_v1 != oid_v2 def test_staging_unchanged_file_is_idempotent( self, code_repo: pathlib.Path ) -> None: """Staging a file that has not changed since last staging is a no-op.""" (code_repo / "auth.py").write_text("# same\n") runner.invoke(cli, ["code", "add", "auth.py"], env=_env(code_repo)) result = runner.invoke(cli, ["code", "add", "auth.py"], 
env=_env(code_repo)) assert result.exit_code == 0 assert "already up to date" in result.output def test_nonexistent_path_exits_error(self, code_repo: pathlib.Path) -> None: result = runner.invoke( cli, ["code", "add", "does_not_exist.py"], env=_env(code_repo) ) assert result.exit_code != 0 def test_wrong_domain_exits_error( self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch ) -> None: monkeypatch.chdir(tmp_path) runner.invoke(cli, ["init", "--domain", "midi"], env=_env(tmp_path)) result = runner.invoke(cli, ["code", "add", "file.py"], env=_env(tmp_path)) assert result.exit_code != 0 # --------------------------------------------------------------------------- # Stage-aware commit # --------------------------------------------------------------------------- class TestStageAwareCommit: def test_only_staged_file_is_committed(self, code_repo: pathlib.Path) -> None: (code_repo / "auth.py").write_text("def authenticate():\n return True\n") (code_repo / "models.py").write_text("class User:\n name = 'anon'\n") runner.invoke(cli, ["code", "add", "auth.py"], env=_env(code_repo)) r = runner.invoke( cli, ["commit", "-m", "auth only", "--json"], env=_env(code_repo), ) assert r.exit_code == 0, r.output data = json.loads(r.output.strip()) from muse.core.commits import read_commit from muse.core.snapshots import read_snapshot from muse.core.object_store import read_object commit = read_commit(code_repo, data["commit_id"]) assert commit is not None snap = read_snapshot(code_repo, commit.snapshot_id) assert snap is not None auth_bytes = read_object(code_repo, snap.manifest["auth.py"]) assert auth_bytes is not None assert b"return True" in auth_bytes models_bytes = read_object(code_repo, snap.manifest["models.py"]) assert models_bytes is not None # models.py was NOT staged — should have old content (pass, not name='anon') assert b"name = 'anon'" not in models_bytes assert b"pass" in models_bytes def test_stage_cleared_after_commit(self, code_repo: pathlib.Path) -> None: 
(code_repo / "auth.py").write_text("# cleared after commit\n") runner.invoke(cli, ["code", "add", "auth.py"], env=_env(code_repo)) assert stage_path(code_repo).exists() runner.invoke(cli, ["commit", "-m", "clear stage test"], env=_env(code_repo)) assert not stage_path(code_repo).exists() def test_staged_deletion_removes_file_from_commit( self, code_repo: pathlib.Path ) -> None: (code_repo / "models.py").unlink() runner.invoke(cli, ["code", "add", "-u"], env=_env(code_repo)) r = runner.invoke( cli, ["commit", "-m", "delete models", "--json"], env=_env(code_repo), ) assert r.exit_code == 0, r.output data = json.loads(r.output.strip()) from muse.core.commits import read_commit from muse.core.snapshots import read_snapshot commit = read_commit(code_repo, data["commit_id"]) assert commit is not None snap = read_snapshot(code_repo, commit.snapshot_id) assert snap is not None assert "models.py" not in snap.manifest def test_commit_refuses_when_only_unstaged_changes_exist(self, code_repo: pathlib.Path) -> None: """If there are tracked modified files but NOTHING staged, commit must refuse. Matches git behaviour: unstaged changes alone are not enough to commit. The user must run `muse code add` first. 
""" (code_repo / "auth.py").write_text("unstaged change\n") r = runner.invoke(cli, ["commit", "-m", "should not commit", "--json"], env=_env(code_repo)) assert r.exit_code != 0, f"commit must refuse with only unstaged changes; got exit=0" data = json.loads(r.output.strip()) assert data.get("error") == "nothing_staged" def test_commit_with_staged_and_unstaged_only_commits_staged(self, code_repo: pathlib.Path) -> None: """When some files are staged and others not, only staged changes are committed.""" (code_repo / "auth.py").write_text("staged change\n") runner.invoke(cli, ["code", "add", "auth.py"], env=_env(code_repo)) (code_repo / "models.py").write_text("unstaged change\n") r = runner.invoke(cli, ["commit", "-m", "only staged", "--json"], env=_env(code_repo)) assert r.exit_code == 0, r.output data = json.loads(r.output.strip()) from muse.core.commits import read_commit from muse.core.snapshots import read_snapshot commit = read_commit(code_repo, data["commit_id"]) snap = read_snapshot(code_repo, commit.snapshot_id) # The unstaged change to models.py must not appear in the commit original_models = "class User:\n pass\n" assert snap.manifest.get("models.py") is not None # Verify the committed models.py still has the original content (not the unstaged change) from muse.core.object_store import read_object committed_content = read_object(code_repo, snap.manifest["models.py"]) assert committed_content is not None assert b"unstaged change" not in committed_content def test_first_commit_no_stage_required( self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch ) -> None: """First commit on a brand-new repo requires no staging — full working tree used.""" monkeypatch.chdir(tmp_path) r = runner.invoke(cli, ["init", "--domain", "code"], env=_env(tmp_path)) assert r.exit_code == 0, r.output (tmp_path / "file.py").write_text("x = 1\n") r = runner.invoke(cli, ["commit", "-m", "init", "--json"], env=_env(tmp_path)) assert r.exit_code == 0, r.output data = 
json.loads(r.output.strip()) from muse.core.commits import read_commit from muse.core.snapshots import read_snapshot commit = read_commit(tmp_path, data["commit_id"]) snap = read_snapshot(tmp_path, commit.snapshot_id) assert "file.py" in snap.manifest # --------------------------------------------------------------------------- # muse status — staged view # --------------------------------------------------------------------------- class TestStageStatus: def test_shows_staged_section_when_stage_active( self, code_repo: pathlib.Path ) -> None: (code_repo / "auth.py").write_text("# staged change\n") runner.invoke(cli, ["code", "add", "auth.py"], env=_env(code_repo)) result = runner.invoke(cli, ["status"], env=_env(code_repo)) assert result.exit_code == 0, result.output assert "staged for commit" in result.output assert "auth.py" in result.output def test_shows_unstaged_section_for_unmodified_tracked_with_changes( self, code_repo: pathlib.Path ) -> None: (code_repo / "auth.py").write_text("# staged\n") (code_repo / "models.py").write_text("# NOT staged\n") runner.invoke(cli, ["code", "add", "auth.py"], env=_env(code_repo)) result = runner.invoke(cli, ["status"], env=_env(code_repo)) assert "not staged" in result.output assert "models.py" in result.output def test_shows_untracked_section(self, code_repo: pathlib.Path) -> None: (code_repo / "auth.py").write_text("# staged\n") (code_repo / "brand_new.py").write_text("x = 1\n") runner.invoke(cli, ["code", "add", "auth.py"], env=_env(code_repo)) result = runner.invoke(cli, ["status"], env=_env(code_repo)) assert "Untracked" in result.output assert "brand_new.py" in result.output def test_json_format_has_all_buckets(self, code_repo: pathlib.Path) -> None: (code_repo / "auth.py").write_text("# json stage\n") (code_repo / "new_file.py").write_text("x = 1\n") runner.invoke(cli, ["code", "add", "auth.py"], env=_env(code_repo)) result = runner.invoke( cli, ["status", "--json"], env=_env(code_repo) ) assert result.exit_code == 0, 
result.output data = json.loads(result.output.strip()) assert "staged" in data assert "unstaged" in data assert "untracked" in data assert "auth.py" in data["staged"]["modified"] assert "new_file.py" in data["untracked"] def test_json_format_with_stage(self, code_repo: pathlib.Path) -> None: (code_repo / "auth.py").write_text("# staged\n") runner.invoke(cli, ["code", "add", "auth.py"], env=_env(code_repo)) result = runner.invoke(cli, ["status", "--json"], env=_env(code_repo)) assert result.exit_code == 0 assert "auth.py" in result.output def test_short_format_with_stage(self, code_repo: pathlib.Path) -> None: (code_repo / "auth.py").write_text("# short\n") runner.invoke(cli, ["code", "add", "auth.py"], env=_env(code_repo)) result = runner.invoke(cli, ["status", "--short"], env=_env(code_repo)) assert result.exit_code == 0 assert "auth.py" in result.output def test_clean_tree_after_commit_clears_stage( self, code_repo: pathlib.Path ) -> None: """After staging and committing, status should show clean tree.""" (code_repo / "auth.py").write_text("# committed\n") runner.invoke(cli, ["code", "add", "auth.py"], env=_env(code_repo)) runner.invoke(cli, ["commit", "-m", "staged commit"], env=_env(code_repo)) result = runner.invoke(cli, ["status"], env=_env(code_repo)) assert result.exit_code == 0 # No stage file → falls back to normal drift-based status. 
assert "staged for commit" not in result.output # --------------------------------------------------------------------------- # muse code reset # --------------------------------------------------------------------------- class TestCodeReset: def test_reset_specific_file(self, code_repo: pathlib.Path) -> None: (code_repo / "auth.py").write_text("# staged\n") (code_repo / "models.py").write_text("# also staged\n") runner.invoke(cli, ["code", "add", "-A"], env=_env(code_repo)) result = runner.invoke( cli, ["code", "reset", "auth.py"], env=_env(code_repo) ) assert result.exit_code == 0 stage = _read_stage_raw(code_repo) assert "auth.py" not in stage assert "models.py" in stage def test_reset_HEAD_syntax(self, code_repo: pathlib.Path) -> None: (code_repo / "auth.py").write_text("# head\n") runner.invoke(cli, ["code", "add", "auth.py"], env=_env(code_repo)) result = runner.invoke( cli, ["code", "reset", "HEAD", "auth.py"], env=_env(code_repo) ) assert result.exit_code == 0 assert not stage_path(code_repo).exists() def test_reset_no_args_clears_all(self, code_repo: pathlib.Path) -> None: (code_repo / "auth.py").write_text("# a\n") (code_repo / "models.py").write_text("# b\n") runner.invoke(cli, ["code", "add", "-A"], env=_env(code_repo)) result = runner.invoke(cli, ["code", "reset"], env=_env(code_repo)) assert result.exit_code == 0 assert not stage_path(code_repo).exists() def test_reset_when_nothing_staged(self, code_repo: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "reset"], env=_env(code_repo)) assert result.exit_code == 0 assert "Nothing staged" in result.output def test_reset_nonexistent_file_does_not_crash( self, code_repo: pathlib.Path ) -> None: (code_repo / "auth.py").write_text("# staged\n") runner.invoke(cli, ["code", "add", "auth.py"], env=_env(code_repo)) result = runner.invoke( cli, ["code", "reset", "not_in_stage.py"], env=_env(code_repo) ) assert result.exit_code == 0 assert "not staged" in result.output # 
# ---------------------------------------------------------------------------
# Resilience
# ---------------------------------------------------------------------------


class TestResilience:
    """The stage index must tolerate corrupt, missing, and empty state."""

    def test_corrupt_stage_json_returns_empty(
        self, code_repo: pathlib.Path
    ) -> None:
        """Corrupt stage.json must degrade gracefully — returns {} on read."""
        stage_dir = code_dir(code_repo)
        stage_dir.mkdir(parents=True, exist_ok=True)
        (stage_dir / "stage.json").write_bytes(b"\xde\xad\xbe\xef garbage")
        entries = read_stage(code_repo)
        assert entries == {}

    def test_truncated_stage_json_returns_empty(
        self, code_repo: pathlib.Path
    ) -> None:
        stage_dir = code_dir(code_repo)
        stage_dir.mkdir(parents=True, exist_ok=True)
        (stage_dir / "stage.json").write_bytes(b"\x00\x01\x02")
        entries = read_stage(code_repo)
        assert entries == {}

    def test_stage_json_is_readable_directly(
        self, code_repo: pathlib.Path
    ) -> None:
        """stage.json is the canonical format — existing files are read directly."""
        stage_dir = code_dir(code_repo)
        stage_dir.mkdir(parents=True, exist_ok=True)
        stage_dir.joinpath("stage.json").write_text(json.dumps({
            "version": 3,
            "entries": {
                "auth.py": {
                    "object_id": f"{'abc123' * 10}ab12",
                    "mode": "M",
                    "staged_at": "2026-01-01T00:00:00+00:00",
                },
            },
        }))
        entries = read_stage(code_repo)
        assert "auth.py" in entries
        assert entries["auth.py"]["mode"] == "M"
        # stage.json is the canonical path — it stays on disk.
        assert stage_path(code_repo).exists()

    def test_missing_stage_returns_empty(self, code_repo: pathlib.Path) -> None:
        entries = read_stage(code_repo)
        assert entries == {}

    def test_write_empty_entries_removes_file(
        self, code_repo: pathlib.Path
    ) -> None:
        from muse.plugins.code.stage import write_stage

        path = stage_path(code_repo)
        path.parent.mkdir(parents=True, exist_ok=True)
        # Create a non-empty JSON stage file first.
        payload = {
            "version": 3,
            "entries": {
                "f.py": {"object_id": "a" * 64, "mode": "M", "staged_at": "x"},
            },
        }
        path.write_bytes(json.dumps(payload).encode())
        write_stage(code_repo, {})
        assert not path.exists()

    def test_clear_stage_idempotent(self, code_repo: pathlib.Path) -> None:
        from muse.plugins.code.stage import clear_stage

        clear_stage(code_repo)  # no stage to clear — must not raise
        clear_stage(code_repo)  # idempotent


# ---------------------------------------------------------------------------
# Stress test
# ---------------------------------------------------------------------------


class TestStageStress:
    """Bulk staging and committing."""

    def test_stage_100_files(
        self, code_repo: pathlib.Path
    ) -> None:
        """Staging 100 files must complete without error and write all entries."""
        names = [f"module_{i:03d}.py" for i in range(100)]
        for i, name in enumerate(names):
            (code_repo / name).write_text(f"X_{i} = {i}\n")
        result = runner.invoke(cli, ["code", "add", "-A"], env=_env(code_repo))
        assert result.exit_code == 0, result.output
        stage = _read_stage_raw(code_repo)
        # At least the 100 new files; whether the two unchanged tracked files
        # (auth.py, models.py) are also listed is not asserted here.
        assert len(stage) >= 100
        missing = [name for name in names if name not in stage]
        assert not missing, f"files missing from stage: {missing}"

    def test_commit_100_staged_files(
        self, code_repo: pathlib.Path
    ) -> None:
        """Committing 100 staged files produces a correct manifest."""
        names = [f"mod_{i:03d}.py" for i in range(100)]
        for i, name in enumerate(names):
            (code_repo / name).write_text(f"V = {i}\n")
        runner.invoke(cli, ["code", "add", "-A"], env=_env(code_repo))
        r = runner.invoke(
            cli,
            ["commit", "-m", "100 files", "--json"],
            env=_env(code_repo),
        )
        assert r.exit_code == 0, r.output
        data = json.loads(r.output.strip())

        from muse.core.commits import read_commit
        from muse.core.snapshots import read_snapshot

        commit = read_commit(code_repo, data["commit_id"])
        assert commit is not None
        snap = read_snapshot(code_repo, commit.snapshot_id)
        assert snap is not None
        assert len(snap.manifest) >= 100
        missing = [name for name in names if name not in snap.manifest]
        assert not missing, f"files missing from committed manifest: {missing}"


def test_add_all_stages_deletions(
    code_repo: pathlib.Path,
) -> None:
    """``muse code add -A`` must stage tracked files that have been deleted.

    Regression test: before the fix, ``-A`` used ``_walk_tree`` which only
    returns files present on disk. Deleted tracked files were therefore
    silently omitted and the deletion was never recorded in the stage.
    """
    # code_repo already has auth.py and models.py committed.
    (code_repo / "auth.py").unlink()

    r = runner.invoke(cli, ["code", "add", "-A"], env=_env(code_repo))
    assert r.exit_code == 0, r.output

    stage = read_stage(code_repo)
    assert "auth.py" in stage, "deleted tracked file must appear in stage"
    assert stage["auth.py"]["mode"] == "D", "deleted file must have mode D"


def test_add_dot_stages_museattributes(
    code_repo: pathlib.Path,
) -> None:
    """`muse code add .` must stage `.museattributes` when it exists.

    Regression test: before the fix, ``_walk_tree`` skipped all files whose
    name started with ``.``, so ``.museattributes`` and ``.museignore`` could
    never be staged with ``muse code add .`` — they required an explicit path.
    """
    (code_repo / ".museattributes").write_text("[*.py]\nmerge = python\n")

    r = runner.invoke(cli, ["code", "add", "."], env=_env(code_repo))
    assert r.exit_code == 0, r.output

    stage = read_stage(code_repo)
    assert ".museattributes" in stage, ".museattributes must be staged by `muse code add .`"


def test_add_dot_stages_museignore(
    code_repo: pathlib.Path,
) -> None:
    """`muse code add .` must stage `.museignore` itself when it exists.

    The file that controls ignore patterns should be version-controlled just
    like ``.gitignore`` is — ``muse code add .`` must include it.

    Note: the test uses empty patterns so the file doesn't suppress itself.
    """
    (code_repo / ".museignore").write_text('[global]\npatterns = []\n')

    r = runner.invoke(cli, ["code", "add", "."], env=_env(code_repo))
    assert r.exit_code == 0, r.output

    stage = read_stage(code_repo)
    assert ".museignore" in stage, ".museignore itself must be staged by `muse code add .`"


def test_add_dot_does_not_stage_museignore_files(
    code_repo: pathlib.Path,
) -> None:
    """``muse code add .`` must not stage files matched by ``.museignore``.

    Regression test: before the fix, ``_walk_tree`` never consulted
    ``.museignore``, so any file on disk — including ones the user explicitly
    excluded — could be silently staged and committed.
    """
    (code_repo / ".museignore").write_text('[global]\npatterns = ["*.log"]\n')
    (code_repo / "debug.log").write_text("ignored content\n")
    (code_repo / "app.py").write_text("# new code\n")

    r = runner.invoke(cli, ["code", "add", "."], env=_env(code_repo))
    assert r.exit_code == 0, r.output

    stage = read_stage(code_repo)
    assert "debug.log" not in stage, ".museignore'd file must NOT be staged"
    assert "app.py" in stage, "non-ignored new file must be staged"


def test_add_dot_does_not_stage_unchanged_files(
    code_repo: pathlib.Path,
) -> None:
    """``muse code add .`` must only stage files whose content differs from HEAD.

    Regression test for the bug where ``muse code add .`` staged every file in
    the working tree regardless of whether it had changed, because the
    "skip-if-already-staged" guard was only consulted (and only correct) after
    a second ``add`` run. On a fresh stage the check was vacuously false for
    all files, so even unchanged files were staged.
    """
    # Make an initial commit so HEAD has a manifest (must stage first).
    (code_repo / "alpha.py").write_text("x = 1\n")
    (code_repo / "beta.py").write_text("y = 2\n")
    runner.invoke(cli, ["code", "add", "alpha.py", "beta.py"], env=_env(code_repo))
    runner.invoke(cli, ["commit", "-m", "initial"], env=_env(code_repo))

    # Modify only one file; leave the other untouched.
    (code_repo / "alpha.py").write_text("x = 99\n")

    # Stage everything.
    r = runner.invoke(cli, ["code", "add", "."], env=_env(code_repo))
    assert r.exit_code == 0, r.output

    # Only the changed file must be staged — NOT the unchanged beta.py.
    stage = read_stage(code_repo)
    assert "alpha.py" in stage, "modified file must be staged"
    assert "beta.py" not in stage, "unchanged file must NOT appear in stage"


def test_add_dot_stages_deletions(
    code_repo: pathlib.Path,
) -> None:
    """``muse code add .`` must stage tracked files that have been deleted from disk.

    Regression test: before the fix, ``muse code add .`` (no flags) only walked
    the working tree, so deleted files were silently omitted. Users had to know
    to pass ``-A`` or explicitly name each deleted file — a significant
    ergonomic gap vs ``git add .`` which has staged deletions since Git 2.0.
    """
    # code_repo already has auth.py and models.py committed.
    (code_repo / "auth.py").unlink()

    r = runner.invoke(cli, ["code", "add", "."], env=_env(code_repo))
    assert r.exit_code == 0, r.output

    stage = read_stage(code_repo)
    assert "auth.py" in stage, "deleted tracked file must appear in stage with `muse code add .`"
    assert stage["auth.py"]["mode"] == "D", "deleted file must have mode D"


def test_add_explicit_path_stages_deletion(
    code_repo: pathlib.Path,
) -> None:
    """``muse code add <path>`` must stage a deletion when the file is gone from disk.

    Mirrors ``git add <path>`` which stages the deletion regardless of whether
    the file still exists on disk. Before the fix, naming a non-existent path
    emitted ``❌ Path not found`` and silently skipped the deletion.
    """
    # code_repo already has auth.py committed — delete it from disk.
    (code_repo / "auth.py").unlink()

    r = runner.invoke(cli, ["code", "add", "auth.py"], env=_env(code_repo))
    assert r.exit_code == 0, r.output

    stage = read_stage(code_repo)
    assert "auth.py" in stage, "deleted tracked file must appear in stage when named explicitly"
    assert stage["auth.py"]["mode"] == "D", "deleted file must have mode D"


# ---------------------------------------------------------------------------
# Regression tests — _head_manifest branch resolution (Bug A)
#
# Written BEFORE the fix to document expected behaviour. Both tests verify
# that _head_manifest resolves the branch through the store abstraction
# (get_head_commit_id), not by reading the ref file directly.
# ---------------------------------------------------------------------------


class TestHeadManifestResolution:
    """_head_manifest must use the store abstraction, not the raw ref file."""

    @staticmethod
    def _make_bare_repo(root: pathlib.Path) -> pathlib.Path:
        """Create a minimal repo layout under *root* with HEAD on ``main``.

        No ref file is written, so the branch has no commits.  Returns the
        repo's muse directory.
        """
        dot_muse = muse_dir(root)
        dot_muse.mkdir()
        (dot_muse / "repo.json").write_text('{"repo_id":"test"}')
        (dot_muse / "HEAD").write_text("ref: refs/heads/main")
        (dot_muse / "refs" / "heads").mkdir(parents=True)
        (dot_muse / "commits").mkdir()
        (dot_muse / "snapshots").mkdir()
        return dot_muse

    def test_empty_branch_returns_empty_dict(
        self, tmp_path: pathlib.Path
    ) -> None:
        """With no commits on the branch, _head_manifest returns {}."""
        from muse.cli.commands.code_stage import _head_manifest

        # No ref file written — branch has no commits.
        self._make_bare_repo(tmp_path)

        result = _head_manifest(tmp_path)
        assert result == {}

    def test_branch_with_commit_returns_manifest(
        self, tmp_path: pathlib.Path
    ) -> None:
        """With a real commit on the branch, _head_manifest returns its manifest."""
        import datetime

        from muse.cli.commands.code_stage import _head_manifest
        from muse.core.commits import CommitRecord, write_commit
        from muse.core.snapshots import SnapshotRecord, write_snapshot
        from muse.core.ids import (
            hash_commit as compute_commit_id,
            hash_snapshot as compute_snapshot_id,
        )

        dot_muse = self._make_bare_repo(tmp_path)

        _hello_id = fake_id("hello.py-content")
        manifest = {"hello.py": _hello_id}
        snap_id = compute_snapshot_id(manifest)
        snap = SnapshotRecord(snapshot_id=snap_id, manifest=manifest)
        write_snapshot(tmp_path, snap)

        committed_at = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc)
        commit_id = compute_commit_id(
            parent_ids=[],
            snapshot_id=snap_id,
            message="init",
            committed_at_iso=committed_at.isoformat(),
            author="tester",
        )
        commit = CommitRecord(
            commit_id=commit_id,
            branch="main",
            snapshot_id=snap_id,
            message="init",
            committed_at=committed_at,
            author="tester",
        )
        write_commit(tmp_path, commit)
        # Point the branch ref at the new commit.
        (dot_muse / "refs" / "heads" / "main").write_text(commit_id)

        result = _head_manifest(tmp_path)
        assert result == {"hello.py": _hello_id}


# ---------------------------------------------------------------------------
# TestRegisterFlags
# ---------------------------------------------------------------------------
# Regression tests — staging idempotency (Bug B)
#
# muse code add . on an already-staged repo must be a no-op.
# Before the fix, deletions and empty-dir sentinels were re-staged on every
# invocation, producing wrong counts and "Staged N" when nothing had changed.
# ---------------------------------------------------------------------------


def _env(tmp: pathlib.Path) -> Mapping[str, str]:
    """Build the ``env`` mapping handed to ``runner.invoke``.

    The mapping has a single key, ``MUSE_REPO_ROOT``, set to ``str(tmp)``.
    """
    repo_root = str(tmp)
    return {"MUSE_REPO_ROOT": repo_root}


class TestStageIdempotency:
    """Running muse code add . twice must be a no-op on the second call."""

    def _run(self, root: pathlib.Path, *args: str) -> str:
        """Invoke the CLI with *args* against *root* and return stripped output.

        Fails the calling test if the command exits non-zero.
        """
        argv = list(args)
        result = runner.invoke(cli, argv, env=_env(root))
        assert result.exit_code == 0, f"{argv} failed:\n{result.output}"
        return result.output.strip()

    def _setup_repo(self, tmp: pathlib.Path) -> pathlib.Path:
        """Initialise a code-domain repo in *tmp* with ``keep.py`` committed."""
        self._run(tmp, "init", "--domain", "code")
        keep_file = tmp / "keep.py"
        keep_file.write_text("x = 1\n")
        for command in (("code", "add", "."), ("commit", "-m", "initial")):
            self._run(tmp, *command)
        return tmp

    def test_second_add_after_deletion_staged_is_noop(
        self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch
    ) -> None:
        """Re-running ``muse code add .`` after a staged deletion reports 'Nothing'."""
        monkeypatch.chdir(tmp_path)
        root = self._setup_repo(tmp_path)

        (root / "keep.py").unlink()

        # First add stages the deletion; the second has nothing left to do.
        first = self._run(root, "code", "add", ".")
        assert "deleted" in first

        out2 = self._run(root, "code", "add", ".")
        assert "Nothing" in out2, f"second add should be no-op, got: {out2!r}"

    def test_second_add_after_modification_staged_is_noop(
        self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch
    ) -> None:
        """Re-running ``muse code add .`` after a staged modification reports 'Nothing'."""
        monkeypatch.chdir(tmp_path)
        root = self._setup_repo(tmp_path)

        (root / "keep.py").write_text("x = 2\n")

        # First add stages the modification; the second has nothing left to do.
        first = self._run(root, "code", "add", ".")
        assert "modified" in first

        out2 = self._run(root, "code", "add", ".")
        assert "Nothing" in out2, f"second add should be no-op, got: {out2!r}"

    def test_committed_empty_dir_not_staged_on_add(
        self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch
    ) -> None:
        """muse code add . must not sentinel-stage an already-committed empty dir."""
        monkeypatch.chdir(tmp_path)
        root = self._setup_repo(tmp_path)

        empty_dir = root / "emptydir"
        empty_dir.mkdir()
        self._run(root, "code", "add", ".")
        self._run(root, "commit", "-m", "add emptydir")

        # Clean state — now run code add . again
        out = self._run(root, "code", "add", ".")
        assert "Nothing" in out, (
            f"committed empty dir must not be re-staged: {out!r}"
        )

    def test_total_matches_sum_of_categories(
        self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch
    ) -> None:
        """The staging summary is ``Staged ….`` and every count in it is positive.

        NOTE(review): the test name refers to a total matching the sum of the
        categories, but the assertions below only check the summary's
        prefix/suffix and that each number printed is greater than zero.
        """
        import re

        monkeypatch.chdir(tmp_path)
        root = self._setup_repo(tmp_path)

        # Commit a state containing new.py, a modified keep.py, and gone.py.
        for filename, content in (
            ("new.py", "y = 1\n"),
            ("keep.py", "x = 99\n"),
            ("gone.py", "z = 0\n"),
        ):
            (root / filename).write_text(content)
        self._run(root, "code", "add", ".")
        self._run(root, "commit", "-m", "add gone.py")

        # Delete one committed file and stage again.
        (root / "gone.py").unlink()
        out = self._run(root, "code", "add", ".")

        # Format: "Staged {parts}." where parts are "N added files", "N modified",
        # "N deleted", "N directories" — all numbers must be positive.
        has_prefix = out.startswith("Staged ")
        has_suffix = out.rstrip().endswith(".")
        assert has_prefix and has_suffix, (
            f"unexpected output format: {out!r}"
        )
        counts = list(map(int, re.findall(r"\d+", out)))
        assert counts and min(counts) > 0, (
            f"all counts must be positive in: {out!r}"
        )


# ---------------------------------------------------------------------------
# TestRegisterFlags
# ---------------------------------------------------------------------------

import argparse as _argparse


class TestRegisterFlags:
    """register_add() and register_reset() wire --json / -j correctly."""

    def _parse_add(self, *args: str) -> _argparse.Namespace:
        """Parse ``add *args`` with a parser populated only by ``register_add``."""
        from muse.cli.commands.code_stage import register_add

        parser = _argparse.ArgumentParser()
        register_add(parser.add_subparsers())
        return parser.parse_args(["add", *args])

    def _parse_reset(self, *args: str) -> _argparse.Namespace:
        """Parse ``reset *args`` with a parser populated only by ``register_reset``."""
        from muse.cli.commands.code_stage import register_reset

        parser = _argparse.ArgumentParser()
        register_reset(parser.add_subparsers())
        return parser.parse_args(["reset", *args])

    def test_add_default_json_out_is_false(self) -> None:
        """Without a JSON flag, ``add`` leaves ``json_out`` as ``False``."""
        assert self._parse_add("foo.py").json_out is False

    def test_add_json_flag_sets_json_out(self) -> None:
        """``add --json`` sets ``json_out`` to ``True``."""
        assert self._parse_add("--json", "foo.py").json_out is True

    def test_add_j_shorthand_sets_json_out(self) -> None:
        """``add -j`` sets ``json_out`` to ``True``."""
        assert self._parse_add("-j", "foo.py").json_out is True

    def test_reset_default_json_out_is_false(self) -> None:
        """Without a JSON flag, ``reset`` leaves ``json_out`` as ``False``."""
        assert self._parse_reset().json_out is False

    def test_reset_json_flag_sets_json_out(self) -> None:
        """``reset --json`` sets ``json_out`` to ``True``."""
        assert self._parse_reset("--json").json_out is True

    def test_reset_j_shorthand_sets_json_out(self) -> None:
        """``reset -j`` sets ``json_out`` to ``True``."""
        assert self._parse_reset("-j").json_out is True