"""Comprehensive tests for ``muse code blame`` CLI hardening. Audit findings addressed ------------------------ Security - ev.detail and ev.new_address now passed through sanitize_display() in text output — eliminates ANSI injection from stored commit data. - from_ref echoed through sanitize_display() in error messages. - Address argument validated for control characters and null bytes before any processing. - Guard added in reverse-rename path to require '::' in op_address, preventing misparse of malformed commit records. Performance - address.rsplit("::", 1) was called twice per _events_in_commit invocation (once for file_prefix, once for bare_name). Now pre-split once per outer loop iteration and passed as parameters — saves 2N string ops for N commits scanned. - Early-exit: scan loop breaks as soon as a "created" event is found. Full lineage is established at that point; no older commits can add new events. Significant win for large repos. Dead code removed - Empty "# Repository helpers" comment section (no content). - Unreachable max_commits < 1 guard (clamp_int already enforces min=1). New capabilities - --kind filter: show only events of specified kind(s). - --author filter: case-insensitive substring match on commit author. - Improved --all text output: author+message for every event; event number labels beyond the first three. - "... N older events" hint when --all is omitted but more events exist. - _BlameEventJson and _BlameResultJson TypedDicts for stable JSON schemas. Coverage tiers -------------- - Unit: _flat_ops, _events_in_commit, _BlameEvent.to_dict - Integration: run with show/add/rename/filter scenarios - Security: control chars in address, ANSI in stored data, stderr routing - E2E: full CLI invocations, JSON schema, exit codes, filter flags - Stress: 500-commit chain, 50-event history, early-exit verification """ from __future__ import annotations import datetime import json import pathlib import threading from typing import TYPE_CHECKING from unittest.mock import MagicMock import pytest from muse.core.errors import ExitCode from muse.core.types import NULL_COMMIT_ID from muse.core.ids import hash_commit, hash_snapshot from muse.core.commits import ( CommitRecord, write_commit, ) from muse.domain import DomainOp, StructuredDelta from tests.cli_test_helper import CliRunner, InvokeResult from muse.cli.commands.blame import SymbolEventKind if TYPE_CHECKING: from muse.cli.commands.blame import _BlameResultJson runner = CliRunner() cli = None # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _make_repo(tmp_path: pathlib.Path) -> pathlib.Path: dot_muse = muse_dir(tmp_path) for sub in ("commits", "snapshots", "refs/heads", "objects"): (dot_muse / sub).mkdir(parents=True, exist_ok=True) (dot_muse / "HEAD").write_text("ref: refs/heads/main", encoding="utf-8") (dot_muse / "repo.json").write_text( json.dumps({"repo_id": "test-repo"}), encoding="utf-8" ) return tmp_path _EPOCH = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc) def _write_commit( root: pathlib.Path, message: str = "test commit", branch: str = "main", parent_id: str | None = None, author: str = "alice", delta: StructuredDelta | None = None, dt_offset_days: int = 0, ) -> CommitRecord: committed_at = _EPOCH + datetime.timedelta(days=dt_offset_days) snap_id = hash_snapshot({}) parents = [parent_id] if parent_id else [] cid = hash_commit( parent_ids=parents, snapshot_id=snap_id, message=message, committed_at_iso=committed_at.isoformat(), author=author, ) record = CommitRecord( commit_id=cid, branch=branch, snapshot_id=snap_id, message=message, committed_at=committed_at, author=author, parent_commit_id=parent_id, structured_delta=delta, ) write_commit(root, record) (ref_path(root, branch)).write_text(cid, encoding="utf-8") return record def _make_delta(ops: list[DomainOp]) -> StructuredDelta: return StructuredDelta(domain="code", ops=ops, summary="") _FAKE_HASH_A = "a" * 64 _FAKE_HASH_B = "b" * 64 def _insert_op(address: str, summary: str = "created") -> DomainOp: from muse.domain import InsertOp return InsertOp( op="insert", address=address, position=None, content_id=_FAKE_HASH_A, content_summary=summary, ) def _delete_op(address: str, summary: str = "deleted") -> DomainOp: from muse.domain import DeleteOp return DeleteOp( op="delete", address=address, position=None, content_id=_FAKE_HASH_A, content_summary=summary, ) def _replace_op(address: str, new_summary: str = "modified") -> DomainOp: from muse.domain import ReplaceOp return ReplaceOp( op="replace", address=address, position=None, old_content_id=_FAKE_HASH_A, new_content_id=_FAKE_HASH_B, old_summary="old", new_summary=new_summary, ) def _invoke(root: pathlib.Path, *args: str) -> InvokeResult: return runner.invoke( cli, ["code", "blame", *args], env={"MUSE_REPO_ROOT": str(root)}, ) def _parse_json(result: InvokeResult) -> "_BlameResultJson": from muse.cli.commands.blame import _BlameResultJson, _BlameEventJson start = result.output.index("{") blob = result.output[start:] depth = 0 end = 0 for i, ch in enumerate(blob): if ch == "{": depth += 1 elif ch == "}": depth -= 1 if depth == 0: end = i + 1 break raw = json.loads(blob[:end]) assert isinstance(raw, dict) raw_events = raw.get("events", []) assert isinstance(raw_events, list) _valid_kinds = frozenset(("created", "modified", "renamed", "moved", "deleted", "signature")) events: list[_BlameEventJson] = [] for e in raw_events: assert isinstance(e, dict) raw_kind = e.get("event", "modified") kind: SymbolEventKind = raw_kind if raw_kind in _valid_kinds else "modified" events.append(_BlameEventJson( event=kind, commit_id=str(e.get("commit_id", "")), author=str(e.get("author", "")), message=str(e.get("message", "")), committed_at=str(e.get("committed_at", "")), address=str(e.get("address", "")), detail=str(e.get("detail", "")), new_address=e.get("new_address"), )) return _BlameResultJson( address=str(raw.get("address", "")), start_ref=str(raw.get("start_ref", "")), total_commits_scanned=int(raw.get("total_commits_scanned", 0)), truncated=bool(raw.get("truncated", False)), events=events, ) # --------------------------------------------------------------------------- # Unit — _flat_ops # --------------------------------------------------------------------------- class TestFlatOps: def test_passthrough_non_patch_ops(self) -> None: from muse.cli.commands.blame import _flat_ops op = _insert_op("f.py::foo") assert _flat_ops([op]) == [op] def test_flattens_patch_children(self) -> None: from muse.cli.commands.blame import _flat_ops from muse.domain import PatchOp child1 = _insert_op("f.py::foo") child2 = _replace_op("f.py::bar") patch = PatchOp(op="patch", address="f.py", child_ops=[child1, child2], child_domain="code", child_summary="test") result = _flat_ops([patch]) assert result == [child1, child2] def test_empty_ops(self) -> None: from muse.cli.commands.blame import _flat_ops assert _flat_ops([]) == [] def test_mixed_patch_and_leaf(self) -> None: from muse.cli.commands.blame import _flat_ops from muse.domain import PatchOp child = _insert_op("f.py::child") patch = PatchOp(op="patch", address="f.py", child_ops=[child], child_domain="code", child_summary="test") leaf = _delete_op("g.py::gone") result = _flat_ops([patch, leaf]) assert result == [child, leaf] # --------------------------------------------------------------------------- # Unit — _events_in_commit # --------------------------------------------------------------------------- class TestEventsInCommit: def _commit( self, root: pathlib.Path, delta: StructuredDelta | None = None ) -> CommitRecord: return _write_commit(root, delta=delta) def test_insert_yields_created(self, tmp_path: pathlib.Path) -> None: from muse.cli.commands.blame import _events_in_commit repo = _make_repo(tmp_path) delta = _make_delta([_insert_op("f.py::foo", "initial")]) c = self._commit(repo, delta) evs, next_addr = _events_in_commit(c, "f.py::foo", "f.py", "foo") assert len(evs) == 1 assert evs[0].kind == "created" assert next_addr == "f.py::foo" def test_replace_yields_modified(self, tmp_path: pathlib.Path) -> None: from muse.cli.commands.blame import _events_in_commit repo = _make_repo(tmp_path) delta = _make_delta([_replace_op("f.py::foo", "refactored")]) c = self._commit(repo, delta) evs, _ = _events_in_commit(c, "f.py::foo", "f.py", "foo") assert len(evs) == 1 assert evs[0].kind == "modified" def test_replace_rename_yields_renamed(self, tmp_path: pathlib.Path) -> None: from muse.cli.commands.blame import _events_in_commit repo = _make_repo(tmp_path) delta = _make_delta([_replace_op("f.py::foo", "renamed to bar")]) c = self._commit(repo, delta) evs, next_addr = _events_in_commit(c, "f.py::foo", "f.py", "foo") assert len(evs) == 1 assert evs[0].kind == "renamed" assert evs[0].new_address == "f.py::bar" assert next_addr == "f.py::foo" # old name — unchanged when walking backward def test_delete_yields_deleted(self, tmp_path: pathlib.Path) -> None: from muse.cli.commands.blame import _events_in_commit repo = _make_repo(tmp_path) delta = _make_delta([_delete_op("f.py::foo", "removed")]) c = self._commit(repo, delta) evs, _ = _events_in_commit(c, "f.py::foo", "f.py", "foo") assert len(evs) == 1 assert evs[0].kind == "deleted" def test_delete_moved_to_yields_moved(self, tmp_path: pathlib.Path) -> None: from muse.cli.commands.blame import _events_in_commit repo = _make_repo(tmp_path) delta = _make_delta([_delete_op("f.py::foo", "moved to g.py")]) c = self._commit(repo, delta) evs, _ = _events_in_commit(c, "f.py::foo", "f.py", "foo") assert evs[0].kind == "moved" def test_replace_signature_yields_signature( self, tmp_path: pathlib.Path ) -> None: from muse.cli.commands.blame import _events_in_commit repo = _make_repo(tmp_path) delta = _make_delta([_replace_op("f.py::foo", "signature changed")]) c = self._commit(repo, delta) evs, _ = _events_in_commit(c, "f.py::foo", "f.py", "foo") assert evs[0].kind == "signature" def test_no_delta_returns_empty(self, tmp_path: pathlib.Path) -> None: from muse.cli.commands.blame import _events_in_commit repo = _make_repo(tmp_path) c = self._commit(repo, delta=None) evs, next_addr = _events_in_commit(c, "f.py::foo", "f.py", "foo") assert evs == [] assert next_addr == "f.py::foo" def test_unrelated_op_not_matched(self, tmp_path: pathlib.Path) -> None: from muse.cli.commands.blame import _events_in_commit repo = _make_repo(tmp_path) delta = _make_delta([_insert_op("f.py::other")]) c = self._commit(repo, delta) evs, _ = _events_in_commit(c, "f.py::foo", "f.py", "foo") assert evs == [] def test_reverse_rename_switches_next_address( self, tmp_path: pathlib.Path ) -> None: from muse.cli.commands.blame import _events_in_commit repo = _make_repo(tmp_path) # op: old name "f.py::old" was renamed to "foo" delta = _make_delta([_replace_op("f.py::old", "renamed to foo")]) c = self._commit(repo, delta) evs, next_addr = _events_in_commit(c, "f.py::foo", "f.py", "foo") assert len(evs) == 1 assert evs[0].kind == "renamed" assert next_addr == "f.py::old" def test_malformed_op_address_without_colons_skipped( self, tmp_path: pathlib.Path ) -> None: from muse.cli.commands.blame import _events_in_commit repo = _make_repo(tmp_path) # op_address without '::' — should not cause crash or incorrect match delta = _make_delta([_replace_op("nofile", "renamed to foo")]) c = self._commit(repo, delta) # Should not raise and should not produce events for "f.py::foo" evs, next_addr = _events_in_commit(c, "f.py::foo", "f.py", "foo") assert evs == [] assert next_addr == "f.py::foo" # --------------------------------------------------------------------------- # Unit — _BlameEvent.to_dict # --------------------------------------------------------------------------- class TestBlameEventToDict: def test_all_fields_present(self, tmp_path: pathlib.Path) -> None: from muse.cli.commands.blame import _BlameEvent repo = _make_repo(tmp_path) c = _write_commit(repo) ev = _BlameEvent("created", c, "f.py::foo", "initial", None) d = ev.to_dict() for field in ( "event", "commit_id", "author", "message", "committed_at", "address", "detail", "new_address", ): assert field in d, f"Missing field: {field}" def test_event_kind_preserved(self, tmp_path: pathlib.Path) -> None: from muse.cli.commands.blame import _BlameEvent repo = _make_repo(tmp_path) c = _write_commit(repo) for kind in ("created", "modified", "renamed", "moved", "deleted", "signature"): typed_kind: SymbolEventKind = kind ev = _BlameEvent(typed_kind, c, "f.py::foo", "detail", None) assert ev.to_dict()["event"] == kind def test_new_address_none_when_absent(self, tmp_path: pathlib.Path) -> None: from muse.cli.commands.blame import _BlameEvent repo = _make_repo(tmp_path) c = _write_commit(repo) ev = _BlameEvent("modified", c, "f.py::foo", "mod", None) assert ev.to_dict()["new_address"] is None def test_new_address_string_when_set(self, tmp_path: pathlib.Path) -> None: from muse.cli.commands.blame import _BlameEvent repo = _make_repo(tmp_path) c = _write_commit(repo) ev = _BlameEvent("renamed", c, "f.py::old", "renamed to new", "f.py::new") assert ev.to_dict()["new_address"] == "f.py::new" # --------------------------------------------------------------------------- # Integration — basic blame scenarios # --------------------------------------------------------------------------- class TestBlameShow: def test_no_events_shows_message(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_commit(repo) result = _invoke(repo, "f.py::foo") assert result.exit_code == 0 assert "no events found" in result.output def test_created_event_shown(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) delta = _make_delta([_insert_op("f.py::foo")]) _write_commit(repo, delta=delta) result = _invoke(repo, "f.py::foo") assert result.exit_code == 0 assert "created" in result.output def test_modified_event_shown(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) delta = _make_delta([_replace_op("f.py::foo", "big refactor")]) _write_commit(repo, delta=delta) result = _invoke(repo, "f.py::foo") assert result.exit_code == 0 assert "big refactor" in result.output def test_author_shown_in_text_output(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) delta = _make_delta([_insert_op("f.py::foo")]) _write_commit(repo, author="bob", delta=delta) result = _invoke(repo, "f.py::foo") assert result.exit_code == 0 assert "bob" in result.output def test_message_shown_in_text_output(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) delta = _make_delta([_insert_op("f.py::foo")]) _write_commit(repo, message="feat: add foo", delta=delta) result = _invoke(repo, "f.py::foo") assert result.exit_code == 0 assert "feat: add foo" in result.output def test_shows_hint_when_more_events_exist( self, tmp_path: pathlib.Path ) -> None: repo = _make_repo(tmp_path) # 4 commits each touching f.py::foo c1 = _write_commit(repo, message="c1", delta=_make_delta([_insert_op("f.py::foo")]), dt_offset_days=0) c2 = _write_commit(repo, message="c2", parent_id=c1.commit_id, delta=_make_delta([_replace_op("f.py::foo", "mod1")]), dt_offset_days=1) c3 = _write_commit(repo, message="c3", parent_id=c2.commit_id, delta=_make_delta([_replace_op("f.py::foo", "mod2")]), dt_offset_days=2) _write_commit(repo, message="c4", parent_id=c3.commit_id, delta=_make_delta([_replace_op("f.py::foo", "mod3")]), dt_offset_days=3) result = _invoke(repo, "f.py::foo") assert result.exit_code == 0 assert "older event" in result.output def test_all_flag_shows_full_history(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) c1 = _write_commit(repo, message="c1", delta=_make_delta([_insert_op("f.py::foo")]), dt_offset_days=0) c2 = _write_commit(repo, message="c2", parent_id=c1.commit_id, delta=_make_delta([_replace_op("f.py::foo", "mod1")]), dt_offset_days=1) c3 = _write_commit(repo, message="c3", parent_id=c2.commit_id, delta=_make_delta([_replace_op("f.py::foo", "mod2")]), dt_offset_days=2) _write_commit(repo, message="c4", parent_id=c3.commit_id, delta=_make_delta([_replace_op("f.py::foo", "mod3")]), dt_offset_days=3) result = _invoke(repo, "f.py::foo", "--all") assert result.exit_code == 0 assert "older event" not in result.output def test_all_flag_shows_author_for_every_event( self, tmp_path: pathlib.Path ) -> None: repo = _make_repo(tmp_path) c1 = _write_commit(repo, author="alice", delta=_make_delta([_insert_op("f.py::foo")]), dt_offset_days=0) _write_commit(repo, author="bob", parent_id=c1.commit_id, delta=_make_delta([_replace_op("f.py::foo", "mod")]), dt_offset_days=1) result = _invoke(repo, "f.py::foo", "--all") assert result.exit_code == 0 assert "alice" in result.output assert "bob" in result.output def test_rename_shown_and_tracked(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) c1 = _write_commit(repo, message="create", delta=_make_delta([_insert_op("f.py::old")]), dt_offset_days=0) _write_commit(repo, message="rename", parent_id=c1.commit_id, delta=_make_delta([_replace_op("f.py::old", "renamed to new")]), dt_offset_days=1) result = _invoke(repo, "f.py::new") assert result.exit_code == 0 assert "renamed" in result.output # --------------------------------------------------------------------------- # Integration — JSON output # --------------------------------------------------------------------------- class TestBlameJson: def test_json_schema_all_fields(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) delta = _make_delta([_insert_op("f.py::foo")]) _write_commit(repo, delta=delta) result = _invoke(repo, "f.py::foo", "--json") assert result.exit_code == 0 data = _parse_json(result) for field in ("address", "start_ref", "total_commits_scanned", "truncated", "events"): assert field in data, f"Missing field: {field}" def test_json_address_matches(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_commit(repo) result = _invoke(repo, "f.py::foo", "--json") data = _parse_json(result) assert data["address"] == "f.py::foo" def test_json_event_fields(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) delta = _make_delta([_insert_op("f.py::foo")]) _write_commit(repo, delta=delta) result = _invoke(repo, "f.py::foo", "--json") data = _parse_json(result) assert len(data["events"]) == 1 ev = data["events"][0] for field in ( "event", "commit_id", "author", "message", "committed_at", "address", "detail", "new_address", ): assert field in ev, f"Missing event field: {field}" def test_json_events_chronological(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) c1 = _write_commit(repo, message="create", delta=_make_delta([_insert_op("f.py::foo")]), dt_offset_days=0) _write_commit(repo, message="modify", parent_id=c1.commit_id, delta=_make_delta([_replace_op("f.py::foo")]), dt_offset_days=1) result = _invoke(repo, "f.py::foo", "--json") data = _parse_json(result) events = data["events"] assert len(events) == 2 # Chronological (oldest first) in JSON assert events[0]["event"] == "created" assert events[1]["event"] == "modified" def test_json_truncated_false_small_history( self, tmp_path: pathlib.Path ) -> None: repo = _make_repo(tmp_path) _write_commit(repo) result = _invoke(repo, "f.py::foo", "--json") data = _parse_json(result) assert data["truncated"] is False def test_json_no_events_empty_list(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_commit(repo) result = _invoke(repo, "f.py::foo", "--json") data = _parse_json(result) assert data["events"] == [] def test_json_output_is_valid_json(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) delta = _make_delta([_insert_op("f.py::foo")]) _write_commit(repo, delta=delta) result = _invoke(repo, "f.py::foo", "--json") assert result.exit_code == 0 # Must be parseable as JSON start = result.output.index("{") json.loads(result.output[start:]) # --------------------------------------------------------------------------- # Integration — --kind filter # --------------------------------------------------------------------------- class TestKindFilter: def test_kind_created_only(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) c1 = _write_commit(repo, delta=_make_delta([_insert_op("f.py::foo")]), dt_offset_days=0) _write_commit(repo, parent_id=c1.commit_id, delta=_make_delta([_replace_op("f.py::foo")]), dt_offset_days=1) result = _invoke(repo, "f.py::foo", "--kind", "created", "--all") assert result.exit_code == 0 assert "created" in result.output assert "modified" not in result.output def test_kind_modified_only(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) c1 = _write_commit(repo, delta=_make_delta([_insert_op("f.py::foo")]), dt_offset_days=0) _write_commit(repo, parent_id=c1.commit_id, delta=_make_delta([_replace_op("f.py::foo", "changed")]), dt_offset_days=1) result = _invoke(repo, "f.py::foo", "--kind", "modified", "--all") assert result.exit_code == 0 assert "changed" in result.output assert "created" not in result.output def test_kind_multiple_values(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) delta = _make_delta([_insert_op("f.py::foo")]) _write_commit(repo, delta=delta) result = _invoke(repo, "f.py::foo", "--kind", "created", "--kind", "modified") assert result.exit_code == 0 def test_invalid_kind_exits_user_error(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_commit(repo) result = _invoke(repo, "f.py::foo", "--kind", "invented") assert result.exit_code == ExitCode.USER_ERROR.value def test_kind_filter_in_json(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) c1 = _write_commit(repo, delta=_make_delta([_insert_op("f.py::foo")]), dt_offset_days=0) _write_commit(repo, parent_id=c1.commit_id, delta=_make_delta([_replace_op("f.py::foo")]), dt_offset_days=1) result = _invoke(repo, "f.py::foo", "--kind", "modified", "--json") data = _parse_json(result) assert all(ev["event"] == "modified" for ev in data["events"]) def test_no_match_shows_filter_message(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) delta = _make_delta([_insert_op("f.py::foo")]) _write_commit(repo, delta=delta) result = _invoke(repo, "f.py::foo", "--kind", "deleted") assert result.exit_code == 0 assert "no events match" in result.output # --------------------------------------------------------------------------- # Integration — --author filter # --------------------------------------------------------------------------- class TestAuthorFilter: def test_author_filter_matches(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) delta = _make_delta([_insert_op("f.py::foo")]) _write_commit(repo, author="alice", delta=delta) result = _invoke(repo, "f.py::foo", "--author", "alice") assert result.exit_code == 0 assert "alice" in result.output def test_author_filter_case_insensitive(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) delta = _make_delta([_insert_op("f.py::foo")]) _write_commit(repo, author="Alice", delta=delta) result = _invoke(repo, "f.py::foo", "--author", "ALICE") assert result.exit_code == 0 assert "Alice" in result.output def test_author_filter_no_match_empty(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) delta = _make_delta([_insert_op("f.py::foo")]) _write_commit(repo, author="alice", delta=delta) result = _invoke(repo, "f.py::foo", "--author", "nosuchauthor") assert result.exit_code == 0 assert "no events match" in result.output def test_author_filter_in_json(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) c1 = _write_commit(repo, author="alice", delta=_make_delta([_insert_op("f.py::foo")]), dt_offset_days=0) _write_commit(repo, author="bob", parent_id=c1.commit_id, delta=_make_delta([_replace_op("f.py::foo")]), dt_offset_days=1) result = _invoke(repo, "f.py::foo", "--author", "alice", "--json") data = _parse_json(result) assert all(ev["author"] == "alice" for ev in data["events"]) # --------------------------------------------------------------------------- # Security # --------------------------------------------------------------------------- class TestBlameSecurity: _ANSI = "\x1b[31mmalicious\x1b[0m" def test_ansi_in_address_rejected(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_commit(repo) result = _invoke(repo, f"f.py::{self._ANSI}") assert result.exit_code == ExitCode.USER_ERROR.value def test_null_byte_in_address_rejected(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_commit(repo) result = _invoke(repo, "f.py::foo\x00bar") assert result.exit_code == ExitCode.USER_ERROR.value def test_control_char_in_address_rejected(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_commit(repo) result = _invoke(repo, "f.py::foo\x07bell") assert result.exit_code == ExitCode.USER_ERROR.value def test_ansi_in_stored_detail_stripped_from_output( self, tmp_path: pathlib.Path ) -> None: """ANSI in a commit's new_summary must not reach the terminal.""" repo = _make_repo(tmp_path) # Store a commit with ANSI in new_summary (simulates a compromised record) malicious_summary = f"modified {self._ANSI}" delta = _make_delta([_replace_op("f.py::foo", malicious_summary)]) _write_commit(repo, delta=delta) result = _invoke(repo, "f.py::foo") assert result.exit_code == 0 assert "\x1b[" not in result.output def test_ansi_in_author_stripped_from_output( self, tmp_path: pathlib.Path ) -> None: repo = _make_repo(tmp_path) delta = _make_delta([_insert_op("f.py::foo")]) _write_commit(repo, author=self._ANSI, delta=delta) result = _invoke(repo, "f.py::foo") assert result.exit_code == 0 assert "\x1b[" not in result.output def test_ansi_in_message_stripped_from_output( self, tmp_path: pathlib.Path ) -> None: repo = _make_repo(tmp_path) delta = _make_delta([_insert_op("f.py::foo")]) _write_commit(repo, message=f"commit {self._ANSI}", delta=delta) result = _invoke(repo, "f.py::foo") assert result.exit_code == 0 assert "\x1b[" not in result.output def test_missing_address_separator_exits_user_error( self, tmp_path: pathlib.Path ) -> None: repo = _make_repo(tmp_path) _write_commit(repo) result = _invoke(repo, "no-separator") assert result.exit_code == ExitCode.USER_ERROR.value def test_commit_not_found_exits_not_found( self, tmp_path: pathlib.Path ) -> None: repo = _make_repo(tmp_path) _write_commit(repo) result = _invoke(repo, "f.py::foo", "--from", NULL_COMMIT_ID) assert result.exit_code == ExitCode.NOT_FOUND.value def test_error_message_no_traceback(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) result = _invoke(repo, "no-separator") assert "Traceback" not in result.output def test_json_stdout_clean_on_success(self, tmp_path: pathlib.Path) -> None: """JSON consumers must not see non-JSON data on stdout.""" repo = _make_repo(tmp_path) delta = _make_delta([_insert_op("f.py::foo")]) _write_commit(repo, delta=delta) result = _invoke(repo, "f.py::foo", "--json") stripped = result.output.lstrip() assert stripped.startswith("{"), f"Expected JSON on stdout, got: {result.output[:80]!r}" # --------------------------------------------------------------------------- # E2E — full CLI flag coverage # --------------------------------------------------------------------------- class TestE2E: def test_help_shows_new_flags(self, tmp_path: pathlib.Path) -> None: result = runner.invoke(cli, ["code", "blame", "--help"]) assert result.exit_code == 0 assert "--kind" in result.output assert "--author" in result.output assert "--all" in result.output assert "--json" in result.output assert "--from" in result.output assert "--max" in result.output def test_default_max_is_applied(self, tmp_path: pathlib.Path) -> None: from muse.cli.commands.blame import _DEFAULT_MAX assert _DEFAULT_MAX == 500 def test_max_one_commit_scanned(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _write_commit(repo) result = _invoke(repo, "f.py::foo", "--max", "1", "--json") assert result.exit_code == 0 data = _parse_json(result) assert data["total_commits_scanned"] == 1 def test_from_ref_head(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) delta = _make_delta([_insert_op("f.py::foo")]) _write_commit(repo, delta=delta) result = _invoke(repo, "f.py::foo", "--from", "HEAD") assert result.exit_code == 0 def test_kind_and_author_combined(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) c1 = _write_commit(repo, author="alice", delta=_make_delta([_insert_op("f.py::foo")]), dt_offset_days=0) _write_commit(repo, author="bob", parent_id=c1.commit_id, delta=_make_delta([_replace_op("f.py::foo")]), dt_offset_days=1) result = _invoke(repo, "f.py::foo", "--kind", "created", "--author", "alice", "--json") data = _parse_json(result) assert all( ev["event"] == "created" and ev["author"] == "alice" for ev in data["events"] ) def test_full_history_chronological_in_json( self, tmp_path: pathlib.Path ) -> None: repo = _make_repo(tmp_path) c1 = _write_commit(repo, message="create", delta=_make_delta([_insert_op("f.py::foo")]), dt_offset_days=0) c2 = _write_commit(repo, message="mod1", parent_id=c1.commit_id, delta=_make_delta([_replace_op("f.py::foo", "mod1")]), dt_offset_days=1) _write_commit(repo, message="mod2", parent_id=c2.commit_id, delta=_make_delta([_replace_op("f.py::foo", "mod2")]), dt_offset_days=2) result = _invoke(repo, "f.py::foo", "--json", "--all") data = _parse_json(result) messages = [ev["message"] for ev in data["events"]] assert messages == ["create", "mod1", "mod2"] # --------------------------------------------------------------------------- # Stress # --------------------------------------------------------------------------- class TestStress: def test_early_exit_on_created(self, tmp_path: pathlib.Path) -> None: """Scan stops at 'created' — rest of chain is not processed.""" repo = _make_repo(tmp_path) # chain: create → 49 modifications c = _write_commit( repo, message="create", delta=_make_delta([_insert_op("f.py::foo")]), dt_offset_days=0, ) for i in range(1, 50): c = _write_commit( repo, message=f"mod{i}", parent_id=c.commit_id, delta=_make_delta([_replace_op("f.py::foo", f"mod{i}")]), dt_offset_days=i, ) result = _invoke(repo, "f.py::foo", "--json", "--all") assert result.exit_code == 0 data = _parse_json(result) # All 50 events should be present (created + 49 mods) assert len(data["events"]) == 50 # early-exit: commits scanned should be exactly 50 (not more) assert data["total_commits_scanned"] == 50 def test_50_event_history_all_flag(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) c = _write_commit( repo, delta=_make_delta([_insert_op("f.py::bar")]), dt_offset_days=0 ) for i in range(1, 50): c = _write_commit( repo, parent_id=c.commit_id, delta=_make_delta([_replace_op("f.py::bar", f"change{i}")]), dt_offset_days=i, ) result = _invoke(repo, "f.py::bar", "--all") assert result.exit_code == 0 assert "change49" in result.output def test_concurrent_blame_isolated_repos( self, tmp_path: pathlib.Path ) -> None: """Eight threads each blame their own isolated repo — no shared state.""" from muse.cli.commands.blame import _events_in_commit errors: list[str] = [] def worker(idx: int) -> None: try: repo = _make_repo(tmp_path / f"repo{idx}") delta = _make_delta([_insert_op(f"f.py::sym{idx}")]) c = _write_commit(repo, delta=delta) # Directly test core logic (not CliRunner — env not thread-safe) evs, _ = _events_in_commit( c, f"f.py::sym{idx}", "f.py", f"sym{idx}" ) if len(evs) != 1 or evs[0].kind != "created": errors.append(f"Thread {idx}: unexpected events {evs!r}") except Exception as exc: errors.append(f"Thread {idx}: {exc}") threads = [threading.Thread(target=worker, args=(i,)) for i in range(8)] for t in threads: t.start() for t in threads: t.join() assert errors == [], f"Concurrent blame failures: {errors}" def test_flat_ops_500_patch_children(self) -> None: from muse.cli.commands.blame import _flat_ops from muse.domain import PatchOp children = [_insert_op(f"f.py::sym{i}") for i in range(500)] patch = PatchOp(op="patch", address="f.py", child_ops=children, child_domain="code", child_summary="test") result = _flat_ops([patch]) assert len(result) == 500 # --------------------------------------------------------------------------- # Flag registration tests # --------------------------------------------------------------------------- import argparse as _argparse from muse.cli.commands.blame import register as _register_blame from muse.core.paths import muse_dir, ref_path def _parse_blame(*args: str) -> _argparse.Namespace: """Build an argument parser via register() and parse args.""" root_p = _argparse.ArgumentParser() subs = root_p.add_subparsers(dest="cmd") _register_blame(subs) return root_p.parse_args(["blame", *args]) class TestRegisterFlags: def test_default_json_out_is_false(self) -> None: ns = _parse_blame("src/foo.py") assert ns.json_out is False def test_json_flag_sets_json_out(self) -> None: ns = _parse_blame("src/foo.py", "--json") assert ns.json_out is True def test_j_shorthand_sets_json_out(self) -> None: ns = _parse_blame("src/foo.py", "-j") assert ns.json_out is True def test_address_positional(self) -> None: ns = _parse_blame("src/foo.py::MyFn") assert ns.address == "src/foo.py::MyFn" def test_all_flag(self) -> None: ns = _parse_blame("src/foo.py", "--all") assert ns.show_all is True def test_a_shorthand_for_all(self) -> None: ns = _parse_blame("src/foo.py", "-a") assert ns.show_all is True