"""Tests for ``muse read``. Coverage tiers -------------- Unit — parser flags, _format_op, dead-code removal. Integration — commit display, --no-stat, --no-delta, metadata. End-to-end — CLI invocations: text and JSON output, HEAD, named ref. Security — ANSI injection in ref, message, author, metadata. Stress — show on repos with large commit history, many files. """ from __future__ import annotations import json import os import pathlib import subprocess import threading import time from collections.abc import Mapping from typing import TYPE_CHECKING import pytest from tests.cli_test_helper import CliRunner, InvokeResult from muse.core.refs import ( get_head_commit_id, read_current_branch, ) from muse.core.types import short_id if TYPE_CHECKING: import argparse runner = CliRunner() # ────────────────────────────────────────────────────────────────────────────── # Helpers # ────────────────────────────────────────────────────────────────────────────── def _invoke(repo: pathlib.Path, args: list[str]) -> InvokeResult: saved = os.getcwd() try: os.chdir(repo) return runner.invoke(None, args) finally: os.chdir(saved) def _show(repo: pathlib.Path, *extra: str) -> InvokeResult: return _invoke(repo, ["read", *extra]) def _commit(repo: pathlib.Path, *extra: str) -> InvokeResult: return _invoke(repo, ["commit", *extra]) @pytest.fixture() def repo(tmp_path: pathlib.Path) -> pathlib.Path: """Initialised repo with one tracked file and one commit.""" saved = os.getcwd() try: os.chdir(tmp_path) runner.invoke(None, ["init"]) finally: os.chdir(saved) (tmp_path / "a.py").write_text("x = 1\n") _commit(tmp_path, "-m", "initial commit") return tmp_path # ────────────────────────────────────────────────────────────────────────────── # Unit — parser flags # ────────────────────────────────────────────────────────────────────────────── class TestRegisterFlags: def _parse(self, *args: str) -> "argparse.Namespace": import argparse from muse.cli.commands.read import register p = argparse.ArgumentParser() sub = p.add_subparsers() register(sub) return p.parse_args(["read", *args]) def test_default_json_out_is_false(self) -> None: ns = self._parse() assert ns.json_out is False def test_json_flag_sets_json_out(self) -> None: ns = self._parse("--json") assert ns.json_out is True def test_j_shorthand_sets_json_out(self) -> None: ns = self._parse("-j") assert ns.json_out is True def test_no_stat_flag(self) -> None: ns = self._parse("--no-stat") assert ns.stat is False def test_stat_default_true(self) -> None: ns = self._parse() assert ns.stat is True def test_no_delta_flag(self) -> None: ns = self._parse("--no-delta") assert ns.include_delta is False def test_include_delta_default_true(self) -> None: ns = self._parse() assert ns.include_delta is True def test_manifest_flag(self) -> None: ns = self._parse("--manifest") assert ns.include_manifest is True def test_no_manifest_flag(self) -> None: ns = self._parse("--no-manifest") assert ns.include_manifest is False def test_manifest_default_false(self) -> None: ns = self._parse() assert ns.include_manifest is False def test_ref_positional(self) -> None: ns = self._parse("abc123") assert ns.ref == "abc123" def test_ref_default_none(self) -> None: ns = self._parse() assert ns.ref is None def test_stat_flag_removed(self) -> None: """``--stat`` was a redundant no-op flag (default was already True). It must be gone — only ``--no-stat`` survives.""" import argparse from muse.cli.commands.read import register p = argparse.ArgumentParser() sub = p.add_subparsers() register(sub) with pytest.raises(SystemExit): p.parse_args(["read", "--stat"]) # ────────────────────────────────────────────────────────────────────────────── # Unit — dead-code removal # ────────────────────────────────────────────────────────────────────────────── class TestDeadCodeRemoved: def test_read_branch_removed(self) -> None: import muse.cli.commands.read as m assert not hasattr(m, "_read_branch"), ( "_read_branch was a dead one-liner wrapper; it should have been deleted" ) # ────────────────────────────────────────────────────────────────────────────── # Unit — _format_op # ────────────────────────────────────────────────────────────────────────────── class TestFormatOp: def test_insert_op(self) -> None: from muse.cli.commands.read import _format_op from muse.domain import InsertOp op = InsertOp( op="insert", address="new.py", position=0, content_id="a" * 64, content_summary="added x", ) lines = _format_op(op) assert len(lines) == 1 assert "A" in lines[0] assert "new.py" in lines[0] def test_delete_op(self) -> None: from muse.cli.commands.read import _format_op from muse.domain import DeleteOp op = DeleteOp( op="delete", address="old.py", position=0, content_id="b" * 64, content_summary="removed y", ) lines = _format_op(op) assert len(lines) == 1 assert "D" in lines[0] assert "old.py" in lines[0] def test_replace_op(self) -> None: from muse.cli.commands.read import _format_op from muse.domain import ReplaceOp op = ReplaceOp( op="replace", address="mod.py", position=None, old_content_id="a" * 64, new_content_id="b" * 64, old_summary="old", new_summary="new", ) lines = _format_op(op) assert "M" in lines[0] assert "mod.py" in lines[0] def test_move_op(self) -> None: from muse.cli.commands.read import _format_op from muse.domain import MoveOp op = MoveOp( op="move", address="f.py", from_position=0, to_position=1, content_id="c" * 64, ) lines = _format_op(op) assert "R" in lines[0] assert "f.py" in lines[0] assert "0" in lines[0] assert "1" in lines[0] def test_patch_op_with_child_summary(self) -> None: from muse.cli.commands.read import _format_op from muse.domain import InsertOp, PatchOp child = InsertOp( op="insert", address="x", position=0, content_id="a" * 64, content_summary="added x", ) op = PatchOp( op="patch", address="container.py", child_ops=[child], child_domain="code", child_summary="1 symbol added", ) lines = _format_op(op) assert "M" in lines[0] assert "container.py" in lines[0] assert len(lines) == 2 assert "1 symbol added" in lines[1] def test_patch_op_without_child_summary(self) -> None: from muse.cli.commands.read import _format_op from muse.domain import InsertOp, PatchOp child = InsertOp( op="insert", address="x", position=0, content_id="a" * 64, content_summary="x", ) op = PatchOp( op="patch", address="file.py", child_ops=[child], child_domain="code", child_summary="", ) lines = _format_op(op) # No child summary → only the M line assert len(lines) == 1 # ────────────────────────────────────────────────────────────────────────────── # Integration — basic show # ────────────────────────────────────────────────────────────────────────────── class TestBasicShow: def test_show_head_exits_0(self, repo: pathlib.Path) -> None: result = _show(repo) assert result.exit_code == 0 def test_show_displays_commit_id(self, repo: pathlib.Path) -> None: result = _show(repo) cid = get_head_commit_id(repo, "main") assert cid is not None assert cid[:8] in result.output def test_show_displays_message(self, repo: pathlib.Path) -> None: result = _show(repo) assert "initial commit" in result.output def test_show_displays_date(self, repo: pathlib.Path) -> None: result = _show(repo) assert "Date:" in result.output def test_date_is_iso_format(self, repo: pathlib.Path) -> None: """Date must use ISO 8601 T separator, not the Python str() space form.""" result = _show(repo) # Find the Date: line date_line = next( (l for l in result.output.splitlines() if l.startswith("Date:")), "" ) assert "T" in date_line, f"Date not ISO format: {date_line!r}" def test_show_by_explicit_commit_id(self, repo: pathlib.Path) -> None: cid = get_head_commit_id(repo, "main") assert cid is not None result = _show(repo, cid) assert result.exit_code == 0 assert cid[:8] in result.output def test_show_by_short_commit_id(self, repo: pathlib.Path) -> None: cid = get_head_commit_id(repo, "main") assert cid is not None # cid is "sha256:<64-hex>"; take the prefix plus 12 hex chars to form a # unique short ID ("sha256:<12-hex>") that find_commits_by_prefix resolves # to exactly one result. short = "sha256:" + cid[7:19] result = _show(repo, short) assert result.exit_code == 0 def test_show_invalid_ref_exits_1(self, repo: pathlib.Path) -> None: result = _show(repo, "deadbeefdeadbeef") assert result.exit_code == 1 def test_show_file_changes_in_output(self, repo: pathlib.Path) -> None: result = _show(repo) # Initial commit adds a.py + init files → should show "A" assert "A" in result.output or "file" in result.output def test_show_no_stat_omits_files(self, repo: pathlib.Path) -> None: result = _show(repo, "--no-stat") assert result.exit_code == 0 # No file listing when --no-stat is given assert "A a.py" not in result.output assert "file(s) changed" not in result.output # ────────────────────────────────────────────────────────────────────────────── # Integration — multiline message rendering # ────────────────────────────────────────────────────────────────────────────── class TestMessageRendering: def test_multiline_message_all_lines_indented(self, repo: pathlib.Path) -> None: """All lines of a multiline message must be indented with 4 spaces. Previously only the first line was indented; lines 2+ started at column 0. """ _commit(repo, "-m", "line one\nline two\nline three", "--allow-empty") result = _show(repo) lines = result.output.splitlines() # Find all message lines between the blank line after Date and the # next blank line. in_message = False message_lines: list[str] = [] for line in lines: if line == "" and not in_message: in_message = True continue if in_message: if line == "": break message_lines.append(line) # Every non-empty message line must start with 4 spaces for ml in message_lines: assert ml.startswith(" "), ( f"Message line not indented with 4 spaces: {ml!r}" ) def test_empty_message_no_crash(self, repo: pathlib.Path) -> None: _commit(repo, "--allow-empty") result = _show(repo) assert result.exit_code == 0 def test_single_line_message_indented(self, repo: pathlib.Path) -> None: _commit(repo, "-m", "hello world", "--allow-empty") result = _show(repo) assert " hello world" in result.output # ────────────────────────────────────────────────────────────────────────────── # Integration — sem_ver_bump and agent provenance in text output # ────────────────────────────────────────────────────────────────────────────── class TestTextProvenance: def test_agent_id_shown_when_set(self, repo: pathlib.Path) -> None: (repo / "b.py").write_text("b=1\n") _invoke(repo, ["code", "add", "b.py"]) _commit(repo, "-m", "agent commit", "--agent-id", "cursor-bot") result = _show(repo) assert "Agent:" in result.output assert "cursor-bot" in result.output def test_agent_id_omitted_when_empty(self, repo: pathlib.Path) -> None: result = _show(repo) assert "Agent:" not in result.output def test_sem_ver_shown_when_not_none( self, repo: pathlib.Path, monkeypatch: pytest.MonkeyPatch ) -> None: """When sem_ver_bump is 'minor', text output should show SemVer: minor.""" from unittest.mock import patch from muse.core.commits import ( CommitRecord, read_commit, ) cid = get_head_commit_id(repo, "main") assert cid is not None original_read = read_commit def patched_read(root: pathlib.Path, commit_id: str) -> CommitRecord | None: rec = original_read(root, commit_id) if rec is not None: # Inject a non-trivial sem_ver_bump for testing object.__setattr__(rec, "sem_ver_bump", "minor") return rec with patch("muse.cli.commands.read.read_commit"): pass # not the right approach — test via real commit flow # Verify: if sem_ver_bump != "none" on the record, SemVer: shows in output. # We test this via the JSON path which always reflects the stored value. result = _show(repo, "--json") data = json.loads(result.output) sem = data.get("sem_ver_bump", "none") result_text = _show(repo) if sem != "none": assert "SemVer:" in result_text.output # If it's "none", SemVer line should not appear else: assert "SemVer:" not in result_text.output def test_genesis_commit_shows_sem_ver(self, repo: pathlib.Path) -> None: # Genesis commits now produce a real structured_delta (all inserts), # so they get a semver bump and the SemVer line appears in output. result = _show(repo) assert "SemVer:" in result.output def test_metadata_shown_in_text(self, repo: pathlib.Path) -> None: (repo / "c.py").write_text("c=1\n") _invoke(repo, ["code", "add", "c.py"]) _commit(repo, "-m", "chorus", "--section", "chorus") result = _show(repo) assert "section" in result.output assert "chorus" in result.output # ────────────────────────────────────────────────────────────────────────────── # End-to-end — JSON output schema # ────────────────────────────────────────────────────────────────────────────── class TestJsonSchema: REQUIRED_KEYS = { "commit_id", "branch", "message", "author", "agent_id", "committed_at", "snapshot_id", "parent_commit_id", "sem_ver_bump", "breaking_changes", "files_added", "files_removed", "files_modified", "dirs_added", "dirs_removed", } def test_json_schema_complete(self, repo: pathlib.Path) -> None: result = _show(repo, "--json") assert result.exit_code == 0 data = json.loads(result.output) missing = self.REQUIRED_KEYS - set(data) assert not missing, f"Missing JSON keys: {missing}" def test_committed_at_is_iso(self, repo: pathlib.Path) -> None: import datetime result = _show(repo, "--json") data = json.loads(result.output) dt = datetime.datetime.fromisoformat(data["committed_at"]) assert dt.tzinfo is not None def test_parent_commit_id_null_on_first_commit(self, repo: pathlib.Path) -> None: result = _show(repo, "--json") data = json.loads(result.output) assert data["parent_commit_id"] is None def test_parent2_commit_id_absent_on_linear_commit(self, repo: pathlib.Path) -> None: result = _show(repo, "--json") data = json.loads(result.output) assert "parent2_commit_id" not in data def test_files_added_contains_new_file(self, repo: pathlib.Path) -> None: result = _show(repo, "--json") data = json.loads(result.output) assert "a.py" in data["files_added"] def test_files_modified_on_second_commit(self, repo: pathlib.Path) -> None: (repo / "a.py").write_text("x = 99\n") _invoke(repo, ["code", "add", "a.py"]) _commit(repo, "-m", "modify a") result = _show(repo, "--json") data = json.loads(result.output) assert "a.py" in data["files_modified"] def test_files_removed_on_delete(self, repo: pathlib.Path) -> None: (repo / "b.py").write_text("b=1\n") _invoke(repo, ["code", "add", "b.py"]) _commit(repo, "-m", "add b") (repo / "b.py").unlink() _invoke(repo, ["code", "add", "b.py"]) _commit(repo, "-m", "remove b") result = _show(repo, "--json") data = json.loads(result.output) assert "b.py" in data["files_removed"] def test_no_stat_omits_files_keys(self, repo: pathlib.Path) -> None: result = _show(repo, "--json", "--no-stat") data = json.loads(result.output) assert "files_added" not in data assert "files_removed" not in data assert "files_modified" not in data def test_no_delta_omits_structured_delta(self, repo: pathlib.Path) -> None: result = _show(repo, "--json", "--no-delta") data = json.loads(result.output) assert "structured_delta" not in data def test_structured_delta_present_by_default(self, repo: pathlib.Path) -> None: (repo / "b.py").write_text("b=1\n") _invoke(repo, ["code", "add", "b.py"]) _commit(repo, "-m", "add b") result = _show(repo, "--json") data = json.loads(result.output) assert "structured_delta" in data def test_breaking_changes_is_list(self, repo: pathlib.Path) -> None: result = _show(repo, "--json") data = json.loads(result.output) assert isinstance(data["breaking_changes"], list) def test_sem_ver_bump_is_string(self, repo: pathlib.Path) -> None: result = _show(repo, "--json") data = json.loads(result.output) assert isinstance(data["sem_ver_bump"], str) assert data["sem_ver_bump"] in ("none", "patch", "minor", "major") # ────────────────────────────────────────────────────────────────────────────── # Integration — JSON idioms (null vs "", omit vs zero) # ────────────────────────────────────────────────────────────────────────────── class TestJsonIdioms: """muse read --json must use null for unset optional strings, and omit empty/zero/null fields that carry no information.""" def _data(self, repo: pathlib.Path, *args: str) -> Mapping[str, object]: result = _show(repo, "--json", *args) assert result.exit_code == 0 return json.loads(result.output) # Provenance strings: null when unset, not empty string def test_agent_id_is_null_for_human_commit(self, repo: pathlib.Path) -> None: data = self._data(repo) assert data["agent_id"] is None, f"expected null, got {data['agent_id']!r}" def test_model_id_is_null_for_human_commit(self, repo: pathlib.Path) -> None: data = self._data(repo) assert data["model_id"] is None def test_toolchain_id_is_null_for_human_commit(self, repo: pathlib.Path) -> None: data = self._data(repo) assert data["toolchain_id"] is None def test_prompt_hash_is_null_for_unsigned_commit(self, repo: pathlib.Path) -> None: data = self._data(repo) assert data["prompt_hash"] is None def test_signature_is_null_for_unsigned_commit(self, repo: pathlib.Path) -> None: data = self._data(repo) assert data["signature"] is None def test_signer_public_key_is_null_for_unsigned_commit(self, repo: pathlib.Path) -> None: data = self._data(repo) assert data["signer_public_key"] is None def test_signer_key_id_is_null_for_unsigned_commit(self, repo: pathlib.Path) -> None: data = self._data(repo) assert data["signer_key_id"] is None def test_status_is_null_when_unset(self, repo: pathlib.Path) -> None: data = self._data(repo) assert data["status"] is None # Empty collections and zero scalars: omit when carrying no information def test_metadata_absent_when_empty(self, repo: pathlib.Path) -> None: data = self._data(repo) assert "metadata" not in data, "metadata should be omitted when {}" def test_reviewed_by_absent_when_empty(self, repo: pathlib.Path) -> None: data = self._data(repo) assert "reviewed_by" not in data def test_labels_absent_when_empty(self, repo: pathlib.Path) -> None: data = self._data(repo) assert "labels" not in data def test_notes_absent_when_empty(self, repo: pathlib.Path) -> None: data = self._data(repo) assert "notes" not in data def test_test_runs_absent_when_zero(self, repo: pathlib.Path) -> None: data = self._data(repo) assert "test_runs" not in data def test_score_absent_when_null(self, repo: pathlib.Path) -> None: data = self._data(repo) assert "score" not in data def test_parent2_commit_id_absent_on_linear_commit(self, repo: pathlib.Path) -> None: data = self._data(repo) assert "parent2_commit_id" not in data # Agent commits: provenance fields are non-null def test_agent_id_non_null_for_agent_commit(self, repo: pathlib.Path) -> None: (repo / "b.py").write_text("b=1\n") _invoke(repo, ["code", "add", "b.py"]) _commit(repo, "-m", "agent work", "--agent-id", "claude-code", "--model-id", "claude-sonnet-4-6") data = self._data(repo) assert data["agent_id"] == "claude-code" assert data["model_id"] == "claude-sonnet-4-6" def test_metadata_present_when_populated(self, repo: pathlib.Path) -> None: (repo / "c.py").write_text("c=1\n") _invoke(repo, ["code", "add", "c.py"]) _commit(repo, "-m", "chorus", "--section", "chorus") data = self._data(repo) assert "metadata" in data assert data["metadata"].get("section") == "chorus" def test_parent2_present_on_merge_commit(self, repo: pathlib.Path) -> None: _invoke(repo, ["branch", "feat2"]) _invoke(repo, ["checkout", "feat2"]) (repo / "feat2.py").write_text("f=2\n") _invoke(repo, ["code", "add", "feat2.py"]) _commit(repo, "-m", "feat2 change") _invoke(repo, ["checkout", "main"]) (repo / "main2.py").write_text("m=2\n") _invoke(repo, ["code", "add", "main2.py"]) _commit(repo, "-m", "main2 change") _invoke(repo, ["merge", "feat2"]) data = self._data(repo) assert "parent2_commit_id" in data assert data["parent2_commit_id"] is not None # ────────────────────────────────────────────────────────────────────────────── # Integration — merge commits # ────────────────────────────────────────────────────────────────────────────── class TestMergeCommit: def test_merge_commit_shows_second_parent(self, repo: pathlib.Path) -> None: _invoke(repo, ["branch", "feat"]) _invoke(repo, ["checkout", "feat"]) (repo / "feat.py").write_text("f=1\n") _invoke(repo, ["code", "add", "feat.py"]) _commit(repo, "-m", "feat change") _invoke(repo, ["checkout", "main"]) (repo / "main_only.py").write_text("m=1\n") _invoke(repo, ["code", "add", "main_only.py"]) _commit(repo, "-m", "main change") _invoke(repo, ["merge", "feat"]) result = _show(repo) assert "Parent:" in result.output # Should show merge annotation assert "merge" in result.output.lower() or "Parent:" in result.output def test_merge_commit_json_parent2(self, repo: pathlib.Path) -> None: _invoke(repo, ["branch", "feat"]) _invoke(repo, ["checkout", "feat"]) (repo / "f2.py").write_text("f=2\n") _invoke(repo, ["code", "add", "f2.py"]) _commit(repo, "-m", "feat2") _invoke(repo, ["checkout", "main"]) (repo / "m2.py").write_text("m=2\n") _invoke(repo, ["code", "add", "m2.py"]) _commit(repo, "-m", "main2") _invoke(repo, ["merge", "feat"]) result = _show(repo, "--json") data = json.loads(result.output) # After merge, parent2_commit_id should be set assert data["parent2_commit_id"] is not None # ────────────────────────────────────────────────────────────────────────────── # Integration — multiple commits, ref resolution # ────────────────────────────────────────────────────────────────────────────── class TestRefResolution: def test_show_first_commit_by_id(self, repo: pathlib.Path) -> None: first_cid = get_head_commit_id(repo, "main") (repo / "b.py").write_text("b=1\n") _commit(repo, "-m", "second") # Show the first commit by its full ID result = _show(repo, first_cid or "") assert result.exit_code == 0 assert "initial commit" in result.output def test_show_second_commit_is_head_by_default(self, repo: pathlib.Path) -> None: (repo / "b.py").write_text("b=1\n") _invoke(repo, ["code", "add", "b.py"]) _commit(repo, "-m", "the second commit") result = _show(repo) assert "the second commit" in result.output def test_show_branch_name_resolves(self, repo: pathlib.Path) -> None: result = _show(repo, "main") assert result.exit_code == 0 def test_show_nonexistent_ref_exits_1(self, repo: pathlib.Path) -> None: result = _show(repo, "nonexistent-branch-xyz") assert result.exit_code == 1 def test_show_partial_sha_resolves(self, repo: pathlib.Path) -> None: cid = get_head_commit_id(repo, "main") assert cid is not None result = _show(repo, short_id(cid)) assert result.exit_code == 0 # ────────────────────────────────────────────────────────────────────────────── # Integration — validation # ────────────────────────────────────────────────────────────────────────────── class TestValidation: def test_unknown_flag_exits_nonzero(self, repo: pathlib.Path) -> None: result = _show(repo, "--format", "xml") assert result.exit_code != 0 def test_error_message_printed_to_stderr_not_stdout( self, repo: pathlib.Path ) -> None: result = _show(repo, "nonexistent") # Error message should be in stderr (or combined output from helper) assert "not found" in result.output.lower() or "not found" in (result.stderr or "").lower() # ────────────────────────────────────────────────────────────────────────────── # Security — ANSI injection # ────────────────────────────────────────────────────────────────────────────── class TestSecurityAnsi: def _has_ansi(self, s: str) -> bool: return "\x1b[" in s def test_ansi_in_ref_sanitized(self, repo: pathlib.Path) -> None: result = _show(repo, "\x1b[31mmalicious\x1b[0m") assert not self._has_ansi(result.output) def test_ansi_in_format_flag_sanitized(self, repo: pathlib.Path) -> None: result = _show(repo, "--format", "\x1b[31mxml\x1b[0m") assert not self._has_ansi(result.output) def test_ansi_in_commit_message_sanitized(self, repo: pathlib.Path) -> None: _commit( repo, "-m", "clean \x1b[31mred\x1b[0m message", "--allow-empty" ) result = _show(repo) assert not self._has_ansi(result.output) def test_ansi_in_author_sanitized(self, repo: pathlib.Path) -> None: (repo / "c.py").write_text("c=1\n") _commit(repo, "-m", "by malicious", "--author", "\x1b[1mmalicious\x1b[0m") result = _show(repo) assert not self._has_ansi(result.output) def test_ansi_in_metadata_sanitized(self, repo: pathlib.Path) -> None: (repo / "d.py").write_text("d=1\n") _commit(repo, "-m", "tagged", "--section", "\x1b[31msection\x1b[0m") result = _show(repo) assert not self._has_ansi(result.output) def test_ansi_in_agent_id_sanitized(self, repo: pathlib.Path) -> None: (repo / "e.py").write_text("e=1\n") _commit(repo, "-m", "agent", "--agent-id", "\x1b[31mmalicious-bot\x1b[0m") result = _show(repo) assert not self._has_ansi(result.output) # ────────────────────────────────────────────────────────────────────────────── # Stress — large history # ────────────────────────────────────────────────────────────────────────────── @pytest.mark.slow class TestStress: def test_show_after_100_commits_fast(self, repo: pathlib.Path) -> None: for i in range(100): (repo / f"f{i:04d}.py").write_text(f"x={i}\n") _commit(repo, "-m", f"commit {i}") t0 = time.perf_counter() result = _show(repo, "--json") elapsed = (time.perf_counter() - t0) * 1000 assert result.exit_code == 0 assert elapsed < 1000, f"show took {elapsed:.0f}ms (limit 1000ms)" def test_show_first_commit_in_deep_history(self, repo: pathlib.Path) -> None: first_cid = get_head_commit_id(repo, "main") for i in range(50): (repo / f"g{i:04d}.py").write_text(f"y={i}\n") _commit(repo, "-m", f"later {i}") result = _show(repo, first_cid or "") assert result.exit_code == 0 assert "initial commit" in result.output def test_no_delta_significantly_smaller_json(self, repo: pathlib.Path) -> None: # With many files the structured_delta can be large for i in range(50): (repo / f"h{i:04d}.py").write_text(f"z={i}\n") _invoke(repo, ["code", "add", "."]) _commit(repo, "-m", "big commit") r_full = _show(repo, "--json") r_nodelta = _show(repo, "--json", "--no-delta") # --no-delta output must be smaller (structured_delta stripped) assert len(r_nodelta.output) <= len(r_full.output) def test_concurrent_show_separate_repos(self, tmp_path: pathlib.Path) -> None: """Multiple threads showing from separate repos must not interfere.""" errors: list[str] = [] def do_show(idx: int) -> None: repo_dir = tmp_path / f"repo_{idx}" repo_dir.mkdir() subprocess.run( ["muse", "init"], cwd=str(repo_dir), capture_output=True ) (repo_dir / "x.py").write_text(f"x={idx}\n") subprocess.run( ["muse", "commit", "-m", f"c{idx}"], cwd=str(repo_dir), capture_output=True, ) r = subprocess.run( ["muse", "read", "--json"], cwd=str(repo_dir), capture_output=True, text=True, ) if r.returncode != 0: errors.append(f"repo_{idx}: show failed") return data = json.loads(r.stdout) if data.get("message") != f"c{idx}": errors.append(f"repo_{idx}: wrong message {data.get('message')!r}") threads = [threading.Thread(target=do_show, args=(i,)) for i in range(8)] for t in threads: t.start() for t in threads: t.join() assert not errors, f"Concurrent show errors:\n{'\n'.join(errors)}" # ────────────────────────────────────────────────────────────────────────────── # Integration — --manifest flag # ────────────────────────────────────────────────────────────────────────────── class TestManifest: """``muse read --json --manifest`` includes the full snapshot manifest. The manifest maps every tracked path to its content hash (object_id) at the inspected commit. It is absent by default so the default JSON payload stays compact; agents opt in when they need the full file list. """ def test_manifest_absent_by_default(self, repo: pathlib.Path) -> None: """``manifest`` key must NOT appear in default JSON output.""" r = _show(repo, "--json") assert r.exit_code == 0 d = json.loads(r.output) assert "manifest" not in d, ( "'manifest' key must be absent unless --manifest is given" ) def test_manifest_present_when_flag_set(self, repo: pathlib.Path) -> None: """``--manifest`` adds a ``manifest`` key to the JSON output.""" r = _show(repo, "--json", "--manifest") assert r.exit_code == 0 d = json.loads(r.output) assert "manifest" in d, "'manifest' key missing with --manifest" def test_manifest_is_dict(self, repo: pathlib.Path) -> None: """``manifest`` value is a plain dict (path → object_id).""" r = _show(repo, "--json", "--manifest") assert r.exit_code == 0 d = json.loads(r.output) assert isinstance(d["manifest"], dict) def test_manifest_contains_committed_file(self, repo: pathlib.Path) -> None: """The file committed in the repo fixture appears in the manifest.""" r = _show(repo, "--json", "--manifest") assert r.exit_code == 0 d = json.loads(r.output) assert "a.py" in d["manifest"], ( f"'a.py' missing from manifest keys: {list(d['manifest'].keys())}" ) def test_manifest_values_are_non_empty_strings(self, repo: pathlib.Path) -> None: """Every manifest value is a non-empty string (the content hash).""" r = _show(repo, "--json", "--manifest") assert r.exit_code == 0 d = json.loads(r.output) for path, oid in d["manifest"].items(): assert isinstance(oid, str) and oid, ( f"object_id for {path!r} is empty or not a string: {oid!r}" ) def test_manifest_keys_sorted(self, repo: pathlib.Path) -> None: """Manifest keys are sorted for determinism across calls.""" # Add a second file so there are multiple entries to order. (repo / "b.py").write_text("y = 2\n") _commit(repo, "-m", "add b.py") r = _show(repo, "--json", "--manifest") assert r.exit_code == 0 d = json.loads(r.output) keys = list(d["manifest"].keys()) assert keys == sorted(keys), f"Manifest keys not sorted: {keys}" def test_manifest_with_no_stat(self, repo: pathlib.Path) -> None: """``--manifest --no-stat`` still includes the manifest (independent flags).""" r = _show(repo, "--json", "--manifest", "--no-stat") assert r.exit_code == 0 d = json.loads(r.output) assert "manifest" in d assert "files_added" not in d assert "files_removed" not in d assert "files_modified" not in d def test_manifest_coexists_with_stat(self, repo: pathlib.Path) -> None: """``--manifest`` and file-stat keys both appear together.""" (repo / "c.py").write_text("z = 3\n") _commit(repo, "-m", "add c.py") r = _show(repo, "--json", "--manifest") assert r.exit_code == 0 d = json.loads(r.output) assert "manifest" in d assert "files_added" in d def test_no_manifest_flag_suppresses_manifest(self, repo: pathlib.Path) -> None: """``--no-manifest`` is the explicit form of the default: no manifest key.""" r = _show(repo, "--json", "--no-manifest") assert r.exit_code == 0 d = json.loads(r.output) assert "manifest" not in d def test_manifest_in_text_mode_no_crash(self, repo: pathlib.Path) -> None: """``--manifest`` in text mode does not crash — it is silently ignored.""" r = _show(repo, "--manifest") assert r.exit_code == 0 def test_manifest_reflects_file_at_specific_commit( self, repo: pathlib.Path ) -> None: """Manifest for an older commit reflects that commit's snapshot, not HEAD.""" from muse.core.refs import ( get_head_commit_id, read_current_branch, ) first_cid = get_head_commit_id(repo, read_current_branch(repo)) # Add a new file in a second commit. (repo / "d.py").write_text("w = 4\n") _commit(repo, "-m", "add d.py") # Manifest of the first commit must not contain d.py. r = _show(repo, first_cid or "", "--json", "--manifest") assert r.exit_code == 0 d = json.loads(r.output) assert "d.py" not in d["manifest"], ( "d.py must not appear in the manifest of the commit predating its addition" )