"""Tests for ``muse commit``. Coverage tiers -------------- Unit — parser flags, pure-logic helpers, sanitization. Integration — actual repo operations: commits, snapshots, reflog, harmony. End-to-end — CLI invocations, text and JSON output paths. Security — ANSI injection, author impersonation, provenance field caps. Stress — 100 sequential commits, large manifests, concurrent writes. """ from __future__ import annotations import argparse import json import os import pathlib import subprocess import threading import time from unittest.mock import patch import pytest from tests.cli_test_helper import CliRunner, InvokeResult from muse.core.refs import ( get_head_commit_id, read_current_branch, ) from muse.core.commits import read_commit from muse.core.snapshots import read_snapshot runner = CliRunner() # ────────────────────────────────────────────────────────────────────────────── # Helpers # ────────────────────────────────────────────────────────────────────────────── def _invoke(repo: pathlib.Path, args: list[str]) -> InvokeResult: """Run a muse command in *repo* and return the result.""" saved = os.getcwd() try: os.chdir(repo) return runner.invoke(None, args) finally: os.chdir(saved) def _commit(repo: pathlib.Path, *extra: str) -> InvokeResult: _invoke(repo, ["code", "add", "."]) return _invoke(repo, ["commit", *extra]) def _init_repo(repo: pathlib.Path) -> InvokeResult: repo.mkdir(parents=True, exist_ok=True) return _invoke(repo, ["init"]) # ────────────────────────────────────────────────────────────────────────────── # Fixtures # ────────────────────────────────────────────────────────────────────────────── @pytest.fixture() def repo(tmp_path: pathlib.Path) -> pathlib.Path: """Initialised repo with one tracked file ready to commit.""" _init_repo(tmp_path) (tmp_path / "a.py").write_text("x = 1\n") return tmp_path # ────────────────────────────────────────────────────────────────────────────── # Unit — parser flags # 
# ──────────────────────────────────────────────────────────────────────────────


class TestRegisterFlags:
    """Each expected CLI flag parses into the matching namespace attribute."""

    def _parse(self, *args: str) -> argparse.Namespace:
        """Parse ``commit <args>`` with a fresh parser built via ``register``."""
        from muse.cli.commands.commit import register

        parser = argparse.ArgumentParser()
        subparsers = parser.add_subparsers()
        register(subparsers)
        argv = ["commit", *args]
        return parser.parse_args(argv)

    def test_message_flag(self) -> None:
        parsed = self._parse("-m", "hello")
        assert parsed.message == "hello"

    def test_allow_empty_flag(self) -> None:
        parsed = self._parse("-m", "x", "--allow-empty")
        assert parsed.allow_empty is True

    def test_dry_run_short_flag(self) -> None:
        parsed = self._parse("-m", "x", "-n")
        assert parsed.dry_run is True

    def test_dry_run_long_flag(self) -> None:
        parsed = self._parse("-m", "x", "--dry-run")
        assert parsed.dry_run is True

    def test_json_flag(self) -> None:
        parsed = self._parse("-m", "x", "--json")
        assert parsed.json_out is True

    def test_j_shorthand(self) -> None:
        parsed = self._parse("-m", "x", "-j")
        assert parsed.json_out is True

    def test_default_json_out_is_false(self) -> None:
        parsed = self._parse("-m", "x")
        assert parsed.json_out is False

    def test_agent_id_flag(self) -> None:
        parsed = self._parse("-m", "x", "--agent-id", "bot-1")
        assert parsed.agent_id == "bot-1"

    def test_model_id_flag(self) -> None:
        parsed = self._parse("-m", "x", "--model-id", "claude-4")
        assert parsed.model_id == "claude-4"

    def test_toolchain_id_flag(self) -> None:
        parsed = self._parse("-m", "x", "--toolchain-id", "cursor-v1")
        assert parsed.toolchain_id == "cursor-v1"

    def test_section_flag(self) -> None:
        parsed = self._parse("-m", "x", "--section", "chorus")
        assert parsed.section == "chorus"

    def test_track_flag(self) -> None:
        parsed = self._parse("-m", "x", "--track", "bass")
        assert parsed.track == "bass"

    def test_emotion_flag(self) -> None:
        parsed = self._parse("-m", "x", "--emotion", "joyful")
        assert parsed.emotion == "joyful"

    def test_author_flag(self) -> None:
        parsed = self._parse("-m", "x", "--author", "alice")
        assert parsed.author == "alice"

    def test_sign_flag(self) -> None:
        parsed = self._parse("-m", "x", "--sign")
        assert parsed.sign is True


# ──────────────────────────────────────────────────────────────────────────────
# Unit — _MAX_FIELD_LEN constant
# ──────────────────────────────────────────────────────────────────────────────


class TestMaxFieldLen:
    """The commit module exposes one field cap, ``_MAX_FIELD_LEN``."""

    def test_constant_exists_and_is_256(self) -> None:
        from muse.cli.commands.commit import _MAX_FIELD_LEN

        assert _MAX_FIELD_LEN == 256

    def test_no_separate_max_author_constant(self) -> None:
        import muse.cli.commands.commit as commit_module

        assert not hasattr(
            commit_module, "_MAX_AUTHOR"
        ), "_MAX_AUTHOR should be replaced by _MAX_FIELD_LEN"
        assert not hasattr(
            commit_module, "_MAX_PROV"
        ), "_MAX_PROV should be replaced by _MAX_FIELD_LEN"


# ──────────────────────────────────────────────────────────────────────────────
# Unit — dead-code removal
# ──────────────────────────────────────────────────────────────────────────────


class TestDeadCodeRemoved:
    """Former wrapper helpers must no longer exist on the commit module."""

    def test_read_branch_removed(self) -> None:
        import muse.cli.commands.commit as commit_module

        still_present = hasattr(commit_module, "_read_branch")
        assert not still_present, (
            "_read_branch was a dead wrapper; it should have been deleted"
        )

    def test_read_parent_id_removed(self) -> None:
        import muse.cli.commands.commit as commit_module

        still_present = hasattr(commit_module, "_read_parent_id")
        assert not still_present, (
            "_read_parent_id was a dead wrapper; it should have been deleted"
        )


# ──────────────────────────────────────────────────────────────────────────────
# Unit — inline imports removed
# ──────────────────────────────────────────────────────────────────────────────


class TestNoInlineImports:
    """The source text of ``run`` must not contain specific import statements."""

    def test_sign_commit_record_is_module_level_import(self) -> None:
        import inspect
        import muse.cli.commands.commit as commit_module

        run_source = inspect.getsource(commit_module.run)
        forbidden = "from muse.core.provenance import sign_commit_record"
        assert forbidden not in run_source, (
            "sign_commit_record import must be at module level, not inside run()"
        )

    def test_no_inline_store_imports(self) -> None:
        import inspect
        import muse.cli.commands.commit as commit_module

        run_source = inspect.getsource(commit_module.run)
        forbidden = "from muse.core.store import"
        assert forbidden not in run_source, (
            "store imports inside run() should be at module level"
        )


# ──────────────────────────────────────────────────────────────────────────────
# Integration — basic commit lifecycle
# ──────────────────────────────────────────────────────────────────────────────


class TestBasicCommit:
    """Commits write a record and snapshot, advance the ref, and keep metadata."""

    def test_first_commit_succeeds(self, repo: pathlib.Path) -> None:
        outcome = _commit(repo, "-m", "init")
        assert outcome.exit_code == 0
        assert "init" in outcome.output

    def test_commit_creates_commit_record(self, repo: pathlib.Path) -> None:
        _commit(repo, "-m", "first")
        head_id = get_head_commit_id(repo, read_current_branch(repo))
        assert head_id is not None
        record = read_commit(repo, head_id)
        assert record is not None
        assert record.message == "first"

    def test_commit_creates_snapshot(self, repo: pathlib.Path) -> None:
        _commit(repo, "-m", "snap")
        head_id = get_head_commit_id(repo, read_current_branch(repo))
        assert head_id is not None
        record = read_commit(repo, head_id)
        assert record is not None
        snapshot = read_snapshot(repo, record.snapshot_id)
        assert snapshot is not None
        assert len(snapshot.manifest) >= 1

    def test_commit_advances_branch_ref(self, repo: pathlib.Path) -> None:
        _commit(repo, "-m", "first")
        first_id = get_head_commit_id(repo, "main")
        (repo / "b.py").write_text("y = 2\n")
        _commit(repo, "-m", "second")
        second_id = get_head_commit_id(repo, "main")
        assert first_id != second_id

    def test_second_commit_has_parent(self, repo: pathlib.Path) -> None:
        _commit(repo, "-m", "first")
        first_id = get_head_commit_id(repo, "main")
        (repo / "b.py").write_text("y = 2\n")
        _commit(repo, "-m", "second")
        second_id = get_head_commit_id(repo, "main")
        assert second_id is not None
        second_record = read_commit(repo, second_id)
        assert second_record is not None
        assert second_record.parent_commit_id == first_id

    def test_nothing_to_commit_exits_0(self, repo: pathlib.Path) -> None:
        _commit(repo, "-m", "first")
        # No file changed between the two commits.
        outcome = _commit(repo, "-m", "second")
        assert outcome.exit_code == 0
        assert "Nothing to commit" in outcome.output

    def test_metadata_section_stored(self, repo: pathlib.Path) -> None:
        _commit(repo, "-m", "chorus", "--section", "chorus")
        head_id = get_head_commit_id(repo, read_current_branch(repo))
        assert head_id is not None
        record = read_commit(repo, head_id)
        assert record is not None
        assert record.metadata.get("section") == "chorus"

    def test_metadata_track_stored(self, repo: pathlib.Path) -> None:
        _commit(repo, "-m", "bass", "--track", "bass")
        head_id = get_head_commit_id(repo, read_current_branch(repo))
        assert head_id is not None
        record = read_commit(repo, head_id)
        assert record is not None
        assert record.metadata.get("track") == "bass"

    def test_metadata_emotion_stored(self, repo: pathlib.Path) -> None:
        _commit(repo, "-m", "joy", "--emotion", "joyful")
        head_id = get_head_commit_id(repo, read_current_branch(repo))
        assert head_id is not None
        record = read_commit(repo, head_id)
        assert record is not None
        assert record.metadata.get("emotion") == "joyful"


# ──────────────────────────────────────────────────────────────────────────────
# Integration — allow-empty
# ──────────────────────────────────────────────────────────────────────────────


class TestAllowEmpty:
    """Behaviour of ``--allow-empty`` with and without a message."""

    def test_allow_empty_creates_commit(self, repo: pathlib.Path) -> None:
        outcome = _commit(repo, "-m", "empty", "--allow-empty")
        assert outcome.exit_code == 0

    def test_allow_empty_without_message_warns(
        self, repo: pathlib.Path, caplog: pytest.LogCaptureFixture
    ) -> None:
        import logging

        with caplog.at_level(logging.WARNING, logger="muse.cli.commands.commit"):
            _commit(repo, "--allow-empty")
        warned = any(
            "empty message" in entry.message or "--allow-empty" in entry.message
            for entry in caplog.records
        )
        assert warned

    def test_allow_empty_without_message_exits_0(self, repo: pathlib.Path) -> None:
        outcome = _commit(repo, "--allow-empty")
        assert outcome.exit_code == 0

    def test_allow_empty_json_message_is_empty_string(self, repo: pathlib.Path) -> None:
        outcome = _commit(repo, "--allow-empty", "--json")
        payload = json.loads(outcome.output)
        assert payload["message"] == ""


# ──────────────────────────────────────────────────────────────────────────────
# Integration — validation errors
# ──────────────────────────────────────────────────────────────────────────────


class TestValidation:
    """Invalid invocations exit non-zero and tell the user what is missing."""

    def test_missing_message_exits_1(self, repo: pathlib.Path) -> None:
        result = _commit(repo)
        assert result.exit_code == 1

    def test_missing_message_prints_hint(self, repo: pathlib.Path) -> None:
        result = _commit(repo)
        assert "-m" in result.stderr or "message" in result.stderr.lower()

    def test_unknown_flag_exits_nonzero(self, repo: pathlib.Path) -> None:
        result = _commit(repo, "-m", "x", "--no-such-flag")
        assert result.exit_code != 0

    def test_empty_tree_without_allow_empty_exits_1(self, tmp_path: pathlib.Path) -> None:
        # Create a bare .muse structure with no tracked files at all (pre-init state).
        # This is the only scenario where the "empty tree" guard fires, because
        # muse init always writes .museattributes and .museignore as tracked files.
        #
        # NOTE(review): muse_dir, head_path, heads_dir and repo_json_path are not
        # among the imports visible at the top of this file — confirm they are in
        # scope, otherwise this test fails with NameError before reaching commit.
        bare = tmp_path / "bare"
        bare.mkdir()
        muse_dir(bare).mkdir()
        (head_path(bare)).write_text("ref: refs/heads/main\n")
        (muse_dir(bare) / "refs").mkdir()
        (heads_dir(bare)).mkdir()
        (repo_json_path(bare)).write_text(
            f'{{"repo_id": "{"a" * 36}", "schema_version": 1, "domain": "code"}}'
        )
        result = _invoke(bare, ["commit", "-m", "empty tree"])
        # Either exits 1 (empty tree guard) or 0 (domain plugin tracks no files).
        # The point is that it must not crash.
        assert result.exit_code in (0, 1)


# ──────────────────────────────────────────────────────────────────────────────
# End-to-end — JSON output schema
# ──────────────────────────────────────────────────────────────────────────────


class TestJsonSchema:
    """All keys agents depend on must be present in every JSON response."""

    REQUIRED_KEYS = {
        "commit_id",
        "branch",
        "snapshot_id",
        "message",
        "parent_commit_id",
        "parent2_commit_id",
        "committed_at",
        "author",
        "agent_id",
        "sem_ver_bump",
        "breaking_changes",
        "files_changed",
        "dry_run",
    }

    def test_first_commit_json_keys(self, repo: pathlib.Path) -> None:
        result = _commit(repo, "-m", "first", "--json")
        assert result.exit_code == 0
        data = json.loads(result.output)
        missing = self.REQUIRED_KEYS - set(data)
        assert not missing, f"Missing keys: {missing}"

    def test_parent_commit_id_null_on_first_commit(self, repo: pathlib.Path) -> None:
        result = _commit(repo, "-m", "first", "--json")
        data = json.loads(result.output)
        assert data["parent_commit_id"] is None

    def test_parent_commit_id_populated_on_second_commit(self, repo: pathlib.Path) -> None:
        _commit(repo, "-m", "first")
        cid1 = get_head_commit_id(repo, "main")
        (repo / "b.py").write_text("y=2\n")
        result = _commit(repo, "-m", "second", "--json")
        data = json.loads(result.output)
        assert data["parent_commit_id"] == cid1

    def test_parent2_commit_id_null_on_regular_commit(self, repo: pathlib.Path) -> None:
        result = _commit(repo, "-m", "first", "--json")
        data = json.loads(result.output)
        assert data["parent2_commit_id"] is None

    def test_breaking_changes_is_list(self, repo: pathlib.Path) -> None:
        result = _commit(repo, "-m", "first", "--json")
        data = json.loads(result.output)
        assert isinstance(data["breaking_changes"], list)

    def test_sem_ver_bump_is_string(self, repo: pathlib.Path) -> None:
        result = _commit(repo, "-m", "first", "--json")
        data = json.loads(result.output)
        assert isinstance(data["sem_ver_bump"], str)

    def test_agent_id_default_empty_string(self, repo: pathlib.Path) -> None:
        result = _commit(repo, "-m", "first", "--json")
        data = json.loads(result.output)
        assert data["agent_id"] == ""

    def test_agent_id_from_flag(self, repo: pathlib.Path) -> None:
        result = _commit(repo, "-m", "x", "--agent-id", "bot-42", "--json")
        data = json.loads(result.output)
        assert data["agent_id"] == "bot-42"

    def test_agent_id_from_env(self, repo: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> None:
        monkeypatch.setenv("MUSE_AGENT_ID", "env-bot")
        result = _invoke(repo, ["commit", "-m", "x", "--json"])
        data = json.loads(result.output)
        assert data["agent_id"] == "env-bot"

    def test_dry_run_false_on_real_commit(self, repo: pathlib.Path) -> None:
        result = _commit(repo, "-m", "x", "--json")
        data = json.loads(result.output)
        assert data["dry_run"] is False

    def test_files_changed_structure(self, repo: pathlib.Path) -> None:
        result = _commit(repo, "-m", "x", "--json")
        data = json.loads(result.output)
        fc = data["files_changed"]
        assert isinstance(fc, dict)
        assert {"added", "modified", "deleted", "total"} <= set(fc.keys())

    def test_files_added_counted(self, repo: pathlib.Path) -> None:
        result = _commit(repo, "-m", "x", "--json")
        data = json.loads(result.output)
        assert data["files_changed"]["added"] >= 1

    def test_files_modified_counted(self, repo: pathlib.Path) -> None:
        _commit(repo, "-m", "first")
        (repo / "a.py").write_text("x = 99\n")
        result = _commit(repo, "-m", "mod", "--json")
        data = json.loads(result.output)
        assert data["files_changed"]["modified"] == 1
        assert data["files_changed"]["added"] == 0

    def test_files_deleted_counted(self, repo: pathlib.Path) -> None:
        (repo / "del.py").write_text("z = 3\n")
        _commit(repo, "-m", "add del.py")
        (repo / "del.py").unlink()
        result = _commit(repo, "-m", "remove", "--json")
        data = json.loads(result.output)
        assert data["files_changed"]["deleted"] == 1

    def test_committed_at_is_utc_iso(self, repo: pathlib.Path) -> None:
        import datetime

        result = _commit(repo, "-m", "x", "--json")
        data = json.loads(result.output)
        dt = datetime.datetime.fromisoformat(data["committed_at"])
        # Only timezone-awareness is checked here, not the specific offset.
        assert dt.tzinfo is not None


# ──────────────────────────────────────────────────────────────────────────────
# End-to-end — dry-run
# ──────────────────────────────────────────────────────────────────────────────


class TestDryRun:
    """``--dry-run`` reports what would be committed without moving the branch."""

    def test_dry_run_no_commit_written(self, repo: pathlib.Path) -> None:
        result = _commit(repo, "-m", "dr", "--dry-run")
        assert result.exit_code == 0
        assert get_head_commit_id(repo, "main") is None

    def test_dry_run_json_schema(self, repo: pathlib.Path) -> None:
        result = _commit(repo, "-m", "dr", "--dry-run", "--json")
        assert result.exit_code == 0
        data = json.loads(result.output)
        assert data["dry_run"] is True
        assert data["clean"] is False
        assert "commit_id" in data
        assert "files_changed" in data

    def test_dry_run_snapshot_id_stable(self, repo: pathlib.Path) -> None:
        """Same tree content → same snapshot_id on repeated dry-runs."""
        r1 = _commit(repo, "-m", "dr", "--dry-run", "--json")
        r2 = _commit(repo, "-m", "dr", "--dry-run", "--json")
        d1 = json.loads(r1.output)
        d2 = json.loads(r2.output)
        assert d1["snapshot_id"] == d2["snapshot_id"]

    def test_dry_run_clean_tree_exits_1(self, repo: pathlib.Path) -> None:
        _commit(repo, "-m", "first")
        result = _commit(repo, "-m", "no changes", "--dry-run")
        assert result.exit_code == 1

    def test_dry_run_clean_tree_json_clean_flag(self, repo: pathlib.Path) -> None:
        _commit(repo, "-m", "first")
        result = _commit(repo, "-m", "no changes", "--dry-run", "--json")
        data = json.loads(result.output)
        assert data["clean"] is True

    def test_dry_run_text_output_prefix(self, repo: pathlib.Path) -> None:
        result = _commit(repo, "-m", "preview", "--dry-run")
        assert "dry-run" in result.output

    def test_dry_run_text_output_nothing_written_note(self, repo: pathlib.Path) -> None:
        result = _commit(repo, "-m", "preview", "--dry-run")
        assert "nothing written" in result.output

    def test_dry_run_shows_sem_ver_in_json(self, repo: pathlib.Path) -> None:
        result = _commit(repo, "-m", "dr", "--dry-run", "--json")
        data = json.loads(result.output)
        assert "sem_ver_bump" in data

    def test_dry_run_does_not_advance_branch(self, repo: pathlib.Path) -> None:
        _commit(repo, "-m", "first")
        cid_before = get_head_commit_id(repo, "main")
        (repo / "b.py").write_text("z=9\n")
        _commit(repo, "-m", "second", "--dry-run")
        cid_after = get_head_commit_id(repo, "main")
        assert cid_before == cid_after

    def test_dry_run_parent_commit_id_in_json(self, repo: pathlib.Path) -> None:
        _commit(repo, "-m", "first")
        cid1 = get_head_commit_id(repo, "main")
        (repo / "b.py").write_text("z=9\n")
        result = _commit(repo, "-m", "second", "--dry-run", "--json")
        data = json.loads(result.output)
        assert data["parent_commit_id"] == cid1


# ──────────────────────────────────────────────────────────────────────────────
# End-to-end — text output
# ──────────────────────────────────────────────────────────────────────────────


class TestTextOutput:
    """Human-readable output mentions branch, commit ID, message and files."""

    def test_text_shows_branch_and_short_id(self, repo: pathlib.Path) -> None:
        import re

        result = _commit(repo, "-m", "hello")
        assert "main" in result.output
        # Output format: "[main sha256:X...] message"
        # The sha256: prefix is canonical — check for it directly.
        assert re.search(r"sha256:[0-9a-f]+", result.output), (
            f"No sha256:-prefixed commit ID found in: {result.output!r}"
        )

    def test_text_shows_message(self, repo: pathlib.Path) -> None:
        result = _commit(repo, "-m", "verse melody")
        assert "verse melody" in result.output

    def test_text_shows_files_changed(self, repo: pathlib.Path) -> None:
        result = _commit(repo, "-m", "x")
        assert "file" in result.output

    def test_text_nothing_to_commit_message(self, repo: pathlib.Path) -> None:
        _commit(repo, "-m", "first")
        result = _commit(repo, "-m", "second")
        assert "Nothing to commit" in result.output


# ──────────────────────────────────────────────────────────────────────────────
# Security — ANSI injection prevention
# ──────────────────────────────────────────────────────────────────────────────


class TestSecurityAnsi:
    """Text output must never emit raw ANSI escape sequences from user input."""

    def _has_ansi(self, s: str) -> bool:
        """Return True if *s* contains an ``ESC[`` or ``ESC]`` sequence start."""
        return "\x1b[" in s or "\x1b]" in s

    def test_ansi_in_message_stripped_from_text_output(self, repo: pathlib.Path) -> None:
        msg = "hello \x1b[31mred\x1b[0m world"
        result = _commit(repo, "-m", msg)
        assert not self._has_ansi(result.output), "ANSI in message leaked to text output"

    def test_ansi_in_message_flag_sanitized(self, repo: pathlib.Path) -> None:
        result = _commit(repo, "-m", "\x1b[31mmalicious\x1b[0m message")
        assert not self._has_ansi(result.output)

    def test_ansi_in_author_sanitized(self, repo: pathlib.Path) -> None:
        result = _commit(repo, "-m", "x", "--author", "\x1b[1mmalicious\x1b[0m")
        assert not self._has_ansi(result.output)


# ──────────────────────────────────────────────────────────────────────────────
# Security — author / provenance field caps
# ──────────────────────────────────────────────────────────────────────────────


class TestSecurityProvenance:
    """Author and agent fields are length-capped, sanitised and overridable."""

    def test_author_capped_at_256_chars(self, repo: pathlib.Path) -> None:
        long_author = "a" * 500
        _commit(repo, "-m", "x", "--author", long_author)
        branch = read_current_branch(repo)
        cid = get_head_commit_id(repo, branch)
        assert cid is not None
        rec = read_commit(repo, cid)
        assert rec is not None
        assert len(rec.author) <= 256

    def test_agent_id_capped_at_256_chars(self, repo: pathlib.Path) -> None:
        long_id = "b" * 500
        _commit(repo, "-m", "x", "--agent-id", long_id)
        branch = read_current_branch(repo)
        cid = get_head_commit_id(repo, branch)
        assert cid is not None
        rec = read_commit(repo, cid)
        assert rec is not None
        assert len(rec.agent_id) <= 256

    def test_author_control_chars_stripped(self, repo: pathlib.Path) -> None:
        _commit(repo, "-m", "x", "--author", "alice\x00\x01\x02")
        branch = read_current_branch(repo)
        cid = get_head_commit_id(repo, branch)
        assert cid is not None
        rec = read_commit(repo, cid)
        assert rec is not None
        assert "\x00" not in rec.author
        assert "\x01" not in rec.author

    def test_author_override_emits_warning(
        self, repo: pathlib.Path, caplog: pytest.LogCaptureFixture
    ) -> None:
        import logging

        with caplog.at_level(logging.WARNING, logger="muse.cli.commands.commit"):
            _commit(repo, "-m", "x", "--author", "malicious-impersonator")
        assert any(
            "impersonation" in r.message or "--author" in r.message
            for r in caplog.records
        )

    def test_agent_id_from_flag_overrides_env(
        self, repo: pathlib.Path, monkeypatch: pytest.MonkeyPatch
    ) -> None:
        monkeypatch.setenv("MUSE_AGENT_ID", "env-agent")
        result = _invoke(repo, ["commit", "-m", "x", "--agent-id", "flag-agent", "--json"])
        data = json.loads(result.output)
        assert data["agent_id"] == "flag-agent"


# ──────────────────────────────────────────────────────────────────────────────
# Integration — merge-parent recording
# ──────────────────────────────────────────────────────────────────────────────


class TestMergeParent:
    """When a merge commit is created, parent2_commit_id must be set."""

    def test_merge_commit_has_two_parents(self, repo: pathlib.Path) -> None:
        _commit(repo, "-m", "base")
        _invoke(repo, ["branch", "feat"])
        _invoke(repo, ["checkout", "feat"])
        (repo / "feat.py").write_text("f = 1\n")
        _commit(repo, "-m", "feat commit")
        _invoke(repo, ["checkout", "main"])
        # Both branches now carry a commit the other lacks.
        (repo / "main_only.py").write_text("m = 1\n")
        _commit(repo, "-m", "main commit")
        _invoke(repo, ["merge", "feat"])
        cid = get_head_commit_id(repo, "main")
        assert cid is not None
        rec = read_commit(repo, cid)
        assert rec is not None
        assert rec.parent2_commit_id is not None

    def test_regular_commit_parent2_is_none(self, repo: pathlib.Path) -> None:
        _commit(repo, "-m", "first")
        (repo / "b.py").write_text("b=1\n")
        _commit(repo, "-m", "second")
        branch = read_current_branch(repo)
        cid = get_head_commit_id(repo, branch)
        assert cid is not None
        rec = read_commit(repo, cid)
        assert rec is not None
        assert rec.parent2_commit_id is None


# ──────────────────────────────────────────────────────────────────────────────
# Integration — SemVer bump inference
# ──────────────────────────────────────────────────────────────────────────────


class TestSemVerBump:
    """``sem_ver_bump`` is one of the known levels; breaking changes are a list."""

    def test_first_commit_sem_ver_bump_valid(self, repo: pathlib.Path) -> None:
        _commit(repo, "-m", "init")
        cid = get_head_commit_id(repo, "main")
        assert cid is not None
        rec = read_commit(repo, cid)
        assert rec is not None
        assert rec.sem_ver_bump in ("none", "patch", "minor", "major")

    def test_json_sem_ver_bump_is_valid_value(self, repo: pathlib.Path) -> None:
        result = _commit(repo, "-m", "x", "--json")
        data = json.loads(result.output)
        assert data["sem_ver_bump"] in ("none", "patch", "minor", "major")

    def test_breaking_changes_list_in_record(self, repo: pathlib.Path) -> None:
        _commit(repo, "-m", "first")
        branch = read_current_branch(repo)
        cid = get_head_commit_id(repo, branch)
        assert cid is not None
        rec = read_commit(repo, cid)
        assert rec is not None
        assert isinstance(rec.breaking_changes, list)


# ──────────────────────────────────────────────────────────────────────────────
# Integration — reflog
# ──────────────────────────────────────────────────────────────────────────────


class TestReflog:
    """A commit appends a reflog entry that points at the new commit ID."""

    def test_commit_appends_reflog_entry(self, repo: pathlib.Path) -> None:
        from muse.core.reflog import read_reflog

        _commit(repo, "-m", "logged")
        entries = read_reflog(repo, "main")
        assert len(entries) >= 1
        assert any(
            "logged" in e.operation or "commit" in e.operation for e in entries
        )

    def test_reflog_contains_commit_id(self, repo: pathlib.Path) -> None:
        from muse.core.reflog import read_reflog

        _commit(repo, "-m", "ref-entry")
        cid = get_head_commit_id(repo, "main")
        entries = read_reflog(repo, "main")
        assert any(e.new_id == cid for e in entries)


# ──────────────────────────────────────────────────────────────────────────────
# Integration — stage cleared after commit
# ──────────────────────────────────────────────────────────────────────────────


class TestStageClearAfterCommit:
    """If a ``stage.json`` file remains after a commit, it must be empty."""

    def test_stage_is_cleared(self, repo: pathlib.Path) -> None:
        _invoke(repo, ["code", "add", "."])
        _commit(repo, "-m", "staged")
        # NOTE(review): muse_dir is not among the imports visible at the top of
        # this file — confirm it is in scope.
        stage_path = muse_dir(repo) / "stage.json"
        # A missing stage file is accepted; only a leftover file is inspected.
        if stage_path.exists():
            data = json.loads(stage_path.read_text())
            assert data == {} or data.get("files") == {}


# ──────────────────────────────────────────────────────────────────────────────
# End-to-end — provenance env vars
# ──────────────────────────────────────────────────────────────────────────────


class TestProvenanceEnvVars:
    """``MUSE_*`` environment variables feed provenance fields; flags win."""

    def test_model_id_from_env(
        self, repo: pathlib.Path, monkeypatch: pytest.MonkeyPatch
    ) -> None:
        monkeypatch.setenv("MUSE_MODEL_ID", "gpt-5")
        _invoke(repo, ["commit", "-m", "x"])
        branch = read_current_branch(repo)
        cid = get_head_commit_id(repo, branch)
        assert cid is not None
        rec = read_commit(repo, cid)
        assert rec is not None
        assert rec.model_id == "gpt-5"

    def test_toolchain_id_from_env(
        self, repo: pathlib.Path, monkeypatch: pytest.MonkeyPatch
    ) -> None:
        monkeypatch.setenv("MUSE_TOOLCHAIN_ID", "cursor-v42")
        _invoke(repo, ["commit", "-m", "x"])
        branch = read_current_branch(repo)
        cid = get_head_commit_id(repo, branch)
        assert cid is not None
        rec = read_commit(repo, cid)
        assert rec is not None
        assert rec.toolchain_id == "cursor-v42"

    def test_prompt_hash_bare_hex_gets_prefixed(
        self, repo: pathlib.Path, monkeypatch: pytest.MonkeyPatch
    ) -> None:
        bare = "a" * 64
        monkeypatch.setenv("MUSE_PROMPT_HASH", bare)
        _invoke(repo, ["commit", "-m", "x"])
        branch = read_current_branch(repo)
        cid = get_head_commit_id(repo, branch)
        assert cid is not None
        rec = read_commit(repo, cid)
        assert rec is not None
        assert rec.prompt_hash == f"sha256:{bare}"

    def test_prompt_hash_already_prefixed_unchanged(
        self, repo: pathlib.Path, monkeypatch: pytest.MonkeyPatch
    ) -> None:
        prefixed = f"sha256:{'b' * 64}"
        monkeypatch.setenv("MUSE_PROMPT_HASH", prefixed)
        _invoke(repo, ["commit", "-m", "x"])
        branch = read_current_branch(repo)
        cid = get_head_commit_id(repo, branch)
        assert cid is not None
        rec = read_commit(repo, cid)
        assert rec is not None
        assert rec.prompt_hash == prefixed

    def test_prompt_hash_invalid_not_stored(
        self, repo: pathlib.Path, monkeypatch: pytest.MonkeyPatch
    ) -> None:
        monkeypatch.setenv("MUSE_PROMPT_HASH", "abc123")
        _invoke(repo, ["commit", "-m", "x"])
        branch = read_current_branch(repo)
        cid = get_head_commit_id(repo, branch)
        assert cid is not None
        rec = read_commit(repo, cid)
        assert rec is not None
        assert rec.prompt_hash == ""

    def test_flag_overrides_env_for_model_id(
        self, repo: pathlib.Path, monkeypatch: pytest.MonkeyPatch
    ) -> None:
        monkeypatch.setenv("MUSE_MODEL_ID", "env-model")
        _invoke(repo, ["commit", "-m", "x", "--model-id", "flag-model"])
        branch = read_current_branch(repo)
        cid = get_head_commit_id(repo, branch)
        assert cid is not None
        rec = read_commit(repo, cid)
        assert rec is not None
        assert rec.model_id == "flag-model"


# ──────────────────────────────────────────────────────────────────────────────
# Integration — parent manifest not double-read
# ──────────────────────────────────────────────────────────────────────────────


class TestParentManifestSingleRead:
    """
    The parent snapshot must be loaded only once per commit, not twice.
    We verify via call counts on read_snapshot.
    """

    def test_parent_snapshot_read_at_most_once(self, repo: pathlib.Path) -> None:
        _commit(repo, "-m", "first")
        (repo / "b.py").write_text("b=1\n")

        # One-element list so the nested function can mutate the counter.
        call_count: list[int] = [0]
        original_read_snapshot = read_snapshot

        from muse.core.snapshots import SnapshotRecord

        def counting_read_snapshot(
            root: pathlib.Path, sid: str
        ) -> SnapshotRecord | None:
            call_count[0] += 1
            return original_read_snapshot(root, sid)

        with patch(
            "muse.cli.commands.commit.read_snapshot",
            side_effect=counting_read_snapshot,
        ):
            _commit(repo, "-m", "second")

        # Should be ≤1 (one read of the parent snapshot).
        # Previously the bug caused 2 reads: one for structured_delta, one for file counts.
        assert call_count[0] <= 1, (
            f"read_snapshot called {call_count[0]} times; expected ≤1 (parent double-read bug)"
        )


# ──────────────────────────────────────────────────────────────────────────────
# Stress — sequential commits
# ──────────────────────────────────────────────────────────────────────────────


@pytest.mark.slow
class TestStressSequential:
    """100 back-to-back commits all succeed and each yields a new head ID."""

    def test_100_commits_all_succeed(self, repo: pathlib.Path) -> None:
        for i in range(100):
            (repo / f"f{i:04d}.py").write_text(f"x = {i}\n")
            result = _commit(repo, "-m", f"commit {i}")
            assert result.exit_code == 0, f"Commit {i} failed: {result.output}"

    def test_100_commits_branch_advances(self, repo: pathlib.Path) -> None:
        seen_ids: set[str] = set()
        for i in range(100):
            (repo / f"g{i:04d}.py").write_text(f"y = {i}\n")
            _commit(repo, "-m", f"c{i}")
            cid = get_head_commit_id(repo, "main")
            assert cid not in seen_ids, f"Duplicate commit ID at commit {i}"
            if cid:
                seen_ids.add(cid)
        assert len(seen_ids) == 100


@pytest.mark.slow
class TestStressLargeManifest:
    """Commits over a 500-file tree succeed within a wall-clock budget."""

    def test_500_file_commit_succeeds(self, repo: pathlib.Path) -> None:
        for i in range(500):
            (repo / f"h{i:04d}.py").write_text(f"z = {i}\n")
        # Only the commit itself is timed, not the file creation above.
        t0 = time.perf_counter()
        result = _commit(repo, "-m", "big")
        elapsed = (time.perf_counter() - t0) * 1000
        assert result.exit_code == 0
        assert elapsed < 3000, f"Commit too slow: {elapsed:.0f}ms"

    def test_500_file_single_change_commit(self, repo: pathlib.Path) -> None:
        for i in range(500):
            (repo / f"k{i:04d}.py").write_text(f"a = {i}\n")
        _commit(repo, "-m", "base")
        (repo / "k0000.py").write_text("a = 999\n")
        t0 = time.perf_counter()
        result = _commit(repo, "-m", "one change")
        elapsed = (time.perf_counter() - t0) * 1000
        assert result.exit_code == 0
        assert elapsed < 2000, f"Single-file commit too slow: {elapsed:.0f}ms"


# ──────────────────────────────────────────────────────────────────────────────
# Stress — concurrent commits to different repos
# ──────────────────────────────────────────────────────────────────────────────


@pytest.mark.slow
class TestStressConcurrent:
    """Parallel ``muse`` subprocesses in separate repos must all succeed."""

    def test_concurrent_commits_to_separate_repos(self, tmp_path: pathlib.Path) -> None:
        """16 threads each commit to their own isolated repo — no interference."""
        errors: list[str] = []

        def do_commit(idx: int) -> None:
            """Init and commit in ``repo_<idx>``, recording any failure in *errors*."""
            try:
                repo_dir = tmp_path / f"repo_{idx}"
                repo_dir.mkdir()
                init = subprocess.run(
                    ["muse", "init"],
                    cwd=str(repo_dir),
                    capture_output=True,
                    text=True,
                )
                if init.returncode != 0:
                    errors.append(f"repo_{idx}: init failed: {init.stderr}")
                    return
                (repo_dir / "x.py").write_text(f"x = {idx}\n")
                r = subprocess.run(
                    ["muse", "commit", "-m", f"c{idx}", "--json"],
                    cwd=str(repo_dir),
                    capture_output=True,
                    text=True,
                )
                if r.returncode != 0:
                    errors.append(f"repo_{idx}: {r.stderr}")
                    return
                data = json.loads(r.stdout)
                if "commit_id" not in data:
                    errors.append(f"repo_{idx}: no commit_id in output")
            except (OSError, ValueError) as exc:
                # An exception raised inside a thread never reaches the test
                # body, so record it instead (missing binary, bad JSON, ...).
                errors.append(f"repo_{idx}: {exc!r}")

        threads = [threading.Thread(target=do_commit, args=(i,)) for i in range(16)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

        # Without this assertion the collected failures were silently dropped
        # and the test could never fail.
        assert not errors, "Concurrent commits failed:\n" + "\n".join(errors)


# ---------------------------------------------------------------------------
# commit must NOT touch the working tree (Git-compatible behaviour)
#
# muse commit writes to the object store and advances the branch ref.
# It must never call apply_manifest — that belongs only in checkout/merge/pull.
# Unstaged changes on disk survive a commit unchanged.
# See tests/test_commit_workdir_preservation.py for the full regression suite.
# ---------------------------------------------------------------------------


# ──────────────────────────────────────────────────────────────────────────────
# Bug: commit refuses when only staged deletions remain (empty snapshot)
# ──────────────────────────────────────────────────────────────────────────────


class TestCommitAllDeletions:
    """muse commit must succeed when the only staged changes are deletions.

    The previous bug: plugin.snapshot() returns an empty manifest when all
    on-disk files are gone, and the guard ``if not manifest and not allow_empty``
    fired — refusing the commit with "nothing tracked".  But staged deletions
    ARE meaningful changes; the snapshot is intentionally empty.
    """

    def _committed_repo(self, tmp_path: pathlib.Path) -> pathlib.Path:
        """Init repo, add files, make a first commit. Returns the repo path."""
        _init_repo(tmp_path)
        (tmp_path / "a.txt").write_text("alpha\n")
        (tmp_path / "b.txt").write_text("beta\n")
        _invoke(tmp_path, ["code", "add", "."])
        _commit(tmp_path, "-m", "initial")
        return tmp_path

    def test_commit_after_rm_all_succeeds(self, tmp_path: pathlib.Path) -> None:
        """muse commit must exit 0 after muse rm removes all tracked files."""
        repo = self._committed_repo(tmp_path)
        _invoke(repo, ["rm", "a.txt"])
        _invoke(repo, ["rm", "b.txt"])
        result = _commit(repo, "-m", "remove everything")
        assert result.exit_code == 0, result.output

    def test_commit_after_rm_all_creates_second_commit(self, tmp_path: pathlib.Path) -> None:
        """HEAD must point at the all-deletions commit and carry its message."""
        repo = self._committed_repo(tmp_path)
        _invoke(repo, ["rm", "a.txt"])
        _invoke(repo, ["rm", "b.txt"])
        _commit(repo, "-m", "remove everything")
        branch = read_current_branch(repo)
        commit_id = get_head_commit_id(repo, branch)
        assert commit_id is not None
        commit = read_commit(repo, commit_id)
        assert commit is not None
        assert commit.message == "remove everything"

    def test_commit_after_rm_all_snapshot_is_empty(self, tmp_path: pathlib.Path) -> None:
        """The snapshot produced by an all-deletions commit must be empty."""
        repo = self._committed_repo(tmp_path)
        _invoke(repo, ["rm", "a.txt"])
        _invoke(repo, ["rm", "b.txt"])
        _commit(repo, "-m", "remove everything")
        branch = read_current_branch(repo)
        commit_id = get_head_commit_id(repo, branch)
        # Guard before dereferencing so a missing HEAD/commit fails with a
        # clear assertion rather than an AttributeError.
        assert commit_id is not None
        commit = read_commit(repo, commit_id)
        assert commit is not None
        snap = read_snapshot(repo, commit.snapshot_id)
        assert snap is not None
        # muse init creates .museattributes and .museignore — these are tracked
        # alongside user files and remain in the snapshot after user files are removed.
        assert "a.txt" not in snap.manifest
        assert "b.txt" not in snap.manifest

    def test_commit_after_rm_one_file_leaves_one_in_snapshot(
        self, tmp_path: pathlib.Path
    ) -> None:
        """Removing one of two files produces a one-entry snapshot."""
        repo = self._committed_repo(tmp_path)
        _invoke(repo, ["rm", "a.txt"])
        _commit(repo, "-m", "remove a.txt")
        branch = read_current_branch(repo)
        commit_id = get_head_commit_id(repo, branch)
        assert commit_id is not None
        commit = read_commit(repo, commit_id)
        assert commit is not None
        snap = read_snapshot(repo, commit.snapshot_id)
        assert snap is not None
        assert "a.txt" not in snap.manifest
        assert "b.txt" in snap.manifest

    def test_json_output_on_all_deletions_commit(self, tmp_path: pathlib.Path) -> None:
        """--json output must be valid and show exit_code 0 for an all-deletions commit."""
        repo = self._committed_repo(tmp_path)
        _invoke(repo, ["rm", "a.txt"])
        _invoke(repo, ["rm", "b.txt"])
        result = _commit(repo, "-m", "rm all", "--json")
        assert result.exit_code == 0, result.output
        data = json.loads(result.output)
        # NOTE(review): this passes when neither "exit_code" nor "code" is in
        # the payload (the default is 0) — confirm the JSON schema and tighten.
        assert data.get("exit_code", data.get("code", 0)) == 0 or "commit_id" in data

    def test_status_clean_after_all_deletions_commit(self, tmp_path: pathlib.Path) -> None:
        """After committing all deletions, muse status must show clean=True."""
        repo = self._committed_repo(tmp_path)
        _invoke(repo, ["rm", "a.txt"])
        _invoke(repo, ["rm", "b.txt"])
        result = _commit(repo, "-m", "remove everything")
        assert result.exit_code == 0, result.output
        status = _invoke(repo, ["status", "--json"])
        data = json.loads(status.output)
        assert data["clean"] is True
        assert data["staged"]["deleted"] == []

    def test_recursive_rm_then_commit_succeeds(self, tmp_path: pathlib.Path) -> None:
        """muse rm -r then commit must succeed even if all files were in that dir."""
        _init_repo(tmp_path)
        (tmp_path / "src").mkdir()
        (tmp_path / "src" / "main.py").write_text("main()\n")
        (tmp_path / "src" / "utils.py").write_text("pass\n")
        _invoke(tmp_path, ["code", "add", "."])
        _commit(tmp_path, "-m", "initial")
        _invoke(tmp_path, ["rm", "-r", "src"])
        result = _commit(tmp_path, "-m", "remove src/")
        assert result.exit_code == 0, result.output

    def test_dry_run_with_all_deletions_staged(self, tmp_path: pathlib.Path) -> None:
        """--dry-run must exit 0 (changes pending) when deletions are staged."""
        repo = self._committed_repo(tmp_path)
        _invoke(repo, ["rm", "a.txt"])
        _invoke(repo, ["rm", "b.txt"])
        result = _commit(repo, "-m", "rm all", "--dry-run")
        assert result.exit_code == 0, result.output

    def test_cached_rm_file_stays_on_disk_after_commit(self, tmp_path: pathlib.Path) -> None:
        """muse rm --cached keeps the file on disk; after commit it is untracked."""
        repo = self._committed_repo(tmp_path)
        # Stage deletion of a.txt but keep it on disk; delete b.txt from disk too.
        _invoke(repo, ["rm", "--cached", "a.txt"])
        _invoke(repo, ["rm", "b.txt"])
        result = _commit(repo, "-m", "untrack a.txt, delete b.txt")
        assert result.exit_code == 0, result.output
        # a.txt must still exist on disk (it was --cached)
        assert (repo / "a.txt").exists(), "a.txt should remain on disk after --cached rm"
        # b.txt was deleted from disk by muse rm
        assert not (repo / "b.txt").exists()
        # a.txt is now untracked
        status = json.loads(_invoke(repo, ["status", "--json"]).output)
        assert "a.txt" in status["untracked"]

    def test_all_cached_rm_then_commit_leaves_files_on_disk(
        self, tmp_path: pathlib.Path
    ) -> None:
        """All files removed with --cached must survive on disk after commit."""
        repo = self._committed_repo(tmp_path)
        _invoke(repo, ["rm", "--cached", "a.txt"])
        _invoke(repo, ["rm", "--cached", "b.txt"])
        result = _commit(repo, "-m", "untrack everything")
        assert result.exit_code == 0, result.output
        assert (repo / "a.txt").exists(), "a.txt must stay on disk"
        assert (repo / "b.txt").exists(), "b.txt must stay on disk"
        status = json.loads(_invoke(repo, ["status", "--json"]).output)
        # Untracked files make the repo dirty (mirrors git behaviour)
        assert status["clean"] is False
        assert "a.txt" in status["untracked"]
        assert "b.txt" in status["untracked"]


# ---------------------------------------------------------------------------
# Flag registration tests
# ---------------------------------------------------------------------------

import argparse as _argparse

from muse.cli.commands.commit import register as _register_commit
from muse.core.paths import head_path, heads_dir, muse_dir, repo_json_path


def _parse_commit(*args: str) -> _argparse.Namespace:
    """Parse ``commit <args>`` with a fresh parser that has only ``commit`` registered."""
    root_p = _argparse.ArgumentParser()
    subs = root_p.add_subparsers(dest="cmd")
    _register_commit(subs)
    return root_p.parse_args(["commit", *args])


class TestRegisterJsonAndMessageFlags:
    """``--json`` / ``-j`` / ``-m`` registration, parsed through ``_parse_commit``.

    Deliberately not named ``TestRegisterFlags``: a class of that name is
    already defined earlier in this module, and a second module-level
    definition would rebind the name, so the earlier class's tests would no
    longer be collected.
    """

    def test_default_json_out_is_false(self) -> None:
        ns = _parse_commit("-m", "msg")
        assert ns.json_out is False

    def test_json_flag_sets_json_out(self) -> None:
        ns = _parse_commit("-m", "msg", "--json")
        assert ns.json_out is True

    def test_j_shorthand_sets_json_out(self) -> None:
        ns = _parse_commit("-m", "msg", "-j")
        assert ns.json_out is True

    def test_m_shorthand_for_message(self) -> None:
        ns = _parse_commit("-m", "hello")
        assert ns.message == "hello"


# ---------------------------------------------------------------------------
# Genesis commit — structured_delta must be populated (TDD)
# ---------------------------------------------------------------------------


class TestGenesisStructuredDelta:
    """The very first commit (no parent) must produce a structured_delta with
    insert ops for every tracked symbol, so indexers can record the birth op
    as ``add`` rather than ``modify``.

    Prior to the fix, ``structured_delta`` was ``None`` for genesis commits
    because the diff path was guarded by ``if parent_id is not None``.
    """

    def test_genesis_commit_has_structured_delta(self, repo: pathlib.Path) -> None:
        """structured_delta must not be None on the first commit."""
        _invoke(repo, ["code", "add", "."])
        result = _commit(repo, "-m", "init: genesis", "--json")
        assert result.exit_code == 0, result.output
        data = json.loads(result.output)
        cid = data.get("commit_id")
        assert cid is not None
        rec = read_commit(repo, cid)
        assert rec is not None
        assert rec.structured_delta is not None, (
            "Genesis commit must carry a structured_delta so indexers can "
            "record symbol births as op=add"
        )

    def test_genesis_structured_delta_has_insert_ops(self, repo: pathlib.Path) -> None:
        """All symbols in a genesis commit must have op=insert (not replace)."""
        _invoke(repo, ["code", "add", "."])
        result = _commit(repo, "-m", "init", "--json")
        # Surface a failed commit directly instead of as a JSON/KeyError below.
        assert result.exit_code == 0, result.output
        data = json.loads(result.output)
        cid = data["commit_id"]
        rec = read_commit(repo, cid)
        assert rec is not None
        delta = rec.structured_delta
        assert delta is not None
        ops = delta.get("ops", [])
        assert ops, "Genesis delta must have at least one op"
        op_types = {op.get("op") for op in ops}
        assert "insert" in op_types, (
            f"Expected insert ops in genesis delta; got: {op_types}"
        )
        assert "replace" not in op_types, (
            f"Genesis delta must not contain replace ops; got: {op_types}"
        )

    def test_genesis_structured_delta_content_ids_present(
        self, repo: pathlib.Path
    ) -> None:
        """Each insert op in the genesis delta must carry a new_content_id."""
        _invoke(repo, ["code", "add", "."])
        result = _commit(repo, "-m", "init", "--json")
        # Surface a failed commit directly instead of as a JSON/KeyError below.
        assert result.exit_code == 0, result.output
        data = json.loads(result.output)
        cid = data["commit_id"]
        rec = read_commit(repo, cid)
        assert rec is not None
        delta = rec.structured_delta
        assert delta is not None
        ops = delta.get("ops", [])
        for op in ops:
            if op.get("op") == "insert":
                # Either key is accepted as the content identifier.
                assert op.get("new_content_id") or op.get("content_id"), (
                    f"Insert op missing content id: {op}"
                )