"""Comprehensive tests for ``muse log``. Coverage tiers: - Unit: _parse_date, _apply_filters, _commit_to_json, _format_date, _file_diff, _branch_tips, _collect_all_commits, _topo_sort - Integration: all flags (--json, --oneline, --stat, --graph, --all, --since, --until, --author, --section, --track, --emotion, -n) - End-to-end: full workflows (init→commit(s)→log, branch→merge→log --all) - Security: ANSI injection via commit messages/authors, invalid date formats, bad --format value, multiline message sanitization - Stress: 500-commit repos, rapid sequential calls, filter on large history """ from __future__ import annotations from collections.abc import Mapping import json import os import pathlib import subprocess from datetime import datetime, timezone import pytest from muse.core.commits import CommitRecord from muse.core.paths import repo_json_path from tests.cli_test_helper import CliRunner, InvokeResult runner = CliRunner() # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _init(repo: pathlib.Path) -> InvokeResult: from muse.cli.app import main as cli repo.mkdir(parents=True, exist_ok=True) saved = os.getcwd() try: os.chdir(repo) return runner.invoke(cli, ["init"]) finally: os.chdir(saved) def _log(repo: pathlib.Path, *extra: str) -> InvokeResult: from muse.cli.app import main as cli saved = os.getcwd() try: os.chdir(repo) return runner.invoke(cli, ["log", *extra]) finally: os.chdir(saved) def _commit(repo: pathlib.Path, msg: str = "commit", filename: str | None = None) -> None: from muse.cli.app import main as cli fname = filename or f"file_{abs(hash(msg))}.py" (repo / fname).write_text(f"# {msg}\n") saved = os.getcwd() try: os.chdir(repo) runner.invoke(cli, ["code", "add", "."]) runner.invoke(cli, ["commit", "-m", msg]) finally: os.chdir(saved) def _fresh_repo(tmp: pathlib.Path, n_commits: int = 1) -> pathlib.Path: repo = tmp / "repo" 
_init(repo) for i in range(n_commits): _commit(repo, f"commit {i}", filename=f"file_{i}.py") return repo # --------------------------------------------------------------------------- # Unit — flag registration # --------------------------------------------------------------------------- class TestRegisterFlags: def _parse(self, *args: str) -> "argparse.Namespace": import argparse from muse.cli.commands.log import register p = argparse.ArgumentParser() sub = p.add_subparsers() register(sub) return p.parse_args(["log", *args]) def test_default_json_out_is_false(self) -> None: ns = self._parse() assert ns.json_out is False def test_json_flag_sets_json_out(self) -> None: ns = self._parse("--json") assert ns.json_out is True def test_j_shorthand_sets_json_out(self) -> None: ns = self._parse("-j") assert ns.json_out is True # --------------------------------------------------------------------------- # Unit — _parse_date # --------------------------------------------------------------------------- class TestParseDate: def test_today(self) -> None: from muse.cli.commands.log import _parse_date dt = _parse_date("today") now = datetime.now(timezone.utc) assert dt.date() == now.date() assert dt.tzinfo is not None def test_yesterday(self) -> None: from muse.cli.commands.log import _parse_date from datetime import timedelta dt = _parse_date("yesterday") now = datetime.now(timezone.utc) assert dt.date() == (now - timedelta(days=1)).date() def test_n_days_ago(self) -> None: from muse.cli.commands.log import _parse_date from datetime import timedelta dt = _parse_date("7 days ago") now = datetime.now(timezone.utc) diff = now - dt assert abs(diff.total_seconds() - 7 * 86400) < 5 def test_n_weeks_ago(self) -> None: from muse.cli.commands.log import _parse_date from datetime import timedelta dt = _parse_date("2 weeks ago") now = datetime.now(timezone.utc) diff = now - dt assert abs(diff.total_seconds() - 14 * 86400) < 5 def test_iso_date(self) -> None: from muse.cli.commands.log 
import _parse_date dt = _parse_date("2025-01-15") assert dt.year == 2025 assert dt.month == 1 assert dt.day == 15 assert dt.tzinfo is not None def test_iso_datetime(self) -> None: from muse.cli.commands.log import _parse_date dt = _parse_date("2025-01-15T12:30:00") assert dt.hour == 12 assert dt.minute == 30 def test_space_datetime(self) -> None: from muse.cli.commands.log import _parse_date dt = _parse_date("2025-06-01 09:00:00") assert dt.year == 2025 assert dt.hour == 9 def test_invalid_raises_value_error(self) -> None: from muse.cli.commands.log import _parse_date with pytest.raises(ValueError, match="Cannot parse date"): _parse_date("not-a-date") def test_empty_string_raises(self) -> None: from muse.cli.commands.log import _parse_date with pytest.raises(ValueError): _parse_date("") def test_case_insensitive(self) -> None: from muse.cli.commands.log import _parse_date dt1 = _parse_date("TODAY") dt2 = _parse_date("today") assert dt1.date() == dt2.date() def test_plural_days(self) -> None: from muse.cli.commands.log import _parse_date dt1 = _parse_date("1 day ago") dt2 = _parse_date("1 days ago") assert abs((dt1 - dt2).total_seconds()) < 2 # --------------------------------------------------------------------------- # Unit — _apply_filters # --------------------------------------------------------------------------- class TestApplyFilters: def _make_commits(self, n: int, author: str = "alice") -> list[CommitRecord]: return [ CommitRecord( commit_id=f"{'a' * 63}{i:x}"[:64], branch="main", message=f"msg {i}", author=author, committed_at=datetime(2025, 6, i % 28 + 1, tzinfo=timezone.utc), parent_commit_id=None, snapshot_id="b" * 64, ) for i in range(n) ] def test_no_filters_returns_all(self) -> None: from muse.cli.commands.log import _apply_filters commits = self._make_commits(5) result, truncated = _apply_filters( commits, since_dt=None, until_dt=None, author=None, section=None, track=None, emotion=None, limit=100, ) assert len(result) == 5 assert not truncated def 
test_limit_enforced(self) -> None: from muse.cli.commands.log import _apply_filters commits = self._make_commits(10) result, truncated = _apply_filters( commits, since_dt=None, until_dt=None, author=None, section=None, track=None, emotion=None, limit=3, ) assert len(result) == 3 assert truncated def test_author_filter_case_insensitive(self) -> None: from muse.cli.commands.log import _apply_filters alice = CommitRecord( commit_id="a" * 64, branch="main", message="m", author="Alice", committed_at=datetime(2025, 1, 1, tzinfo=timezone.utc), parent_commit_id=None, snapshot_id="b" * 64, ) bob = CommitRecord( commit_id="b" * 64, branch="main", message="m", author="Bob", committed_at=datetime(2025, 1, 2, tzinfo=timezone.utc), parent_commit_id=None, snapshot_id="c" * 64, ) result, _ = _apply_filters( [alice, bob], since_dt=None, until_dt=None, author="alice", section=None, track=None, emotion=None, limit=100, ) assert len(result) == 1 assert result[0].author == "Alice" def test_since_filter(self) -> None: from muse.cli.commands.log import _apply_filters old = CommitRecord( commit_id="a" * 64, branch="main", message="old", author="x", committed_at=datetime(2024, 1, 1, tzinfo=timezone.utc), parent_commit_id=None, snapshot_id="b" * 64, ) new_commit = CommitRecord( commit_id="b" * 64, branch="main", message="new", author="x", committed_at=datetime(2025, 6, 1, tzinfo=timezone.utc), parent_commit_id=None, snapshot_id="c" * 64, ) since = datetime(2025, 1, 1, tzinfo=timezone.utc) result, _ = _apply_filters( [old, new_commit], since_dt=since, until_dt=None, author=None, section=None, track=None, emotion=None, limit=100, ) assert len(result) == 1 assert result[0].message == "new" def test_until_filter(self) -> None: from muse.cli.commands.log import _apply_filters early = CommitRecord( commit_id="a" * 64, branch="main", message="early", author="x", committed_at=datetime(2024, 1, 1, tzinfo=timezone.utc), parent_commit_id=None, snapshot_id="b" * 64, ) late = CommitRecord( commit_id="b" 
* 64, branch="main", message="late", author="x", committed_at=datetime(2026, 1, 1, tzinfo=timezone.utc), parent_commit_id=None, snapshot_id="c" * 64, ) until = datetime(2025, 1, 1, tzinfo=timezone.utc) result, _ = _apply_filters( [early, late], since_dt=None, until_dt=until, author=None, section=None, track=None, emotion=None, limit=100, ) assert len(result) == 1 assert result[0].message == "early" def test_empty_input_returns_empty(self) -> None: from muse.cli.commands.log import _apply_filters result, truncated = _apply_filters( [], since_dt=None, until_dt=None, author=None, section=None, track=None, emotion=None, limit=10, ) assert result == [] assert not truncated # --------------------------------------------------------------------------- # Unit — _commit_to_json # --------------------------------------------------------------------------- class TestCommitToJson: def _make_commit(self) -> CommitRecord: return CommitRecord( commit_id="a" * 64, branch="main", message="hello", author="alice", committed_at=datetime(2025, 6, 1, tzinfo=timezone.utc), parent_commit_id=None, snapshot_id="b" * 64, ) def test_all_keys_present(self) -> None: from muse.cli.commands.log import _commit_to_json c = self._make_commit() d = _commit_to_json(c) expected = { "commit_id", "branch", "message", "author", "agent_id", "model_id", "committed_at", "parent_commit_id", "parent2_commit_id", "snapshot_id", "sem_ver_bump", "breaking_changes", "metadata", "files_added", "files_removed", "files_modified", "structured_delta", "signer_public_key", } assert expected == set(d.keys()) def test_file_lists_empty_without_stat(self) -> None: from muse.cli.commands.log import _commit_to_json c = self._make_commit() d = _commit_to_json(c) assert d["files_added"] == [] assert d["files_removed"] == [] assert d["files_modified"] == [] def test_parent2_commit_id_is_none_for_linear(self) -> None: from muse.cli.commands.log import _commit_to_json c = self._make_commit() d = _commit_to_json(c) assert 
d["parent2_commit_id"] is None def test_breaking_changes_is_always_list(self) -> None: from muse.cli.commands.log import _commit_to_json c = self._make_commit() d = _commit_to_json(c) assert isinstance(d["breaking_changes"], list) def test_committed_at_is_iso_string(self) -> None: from muse.cli.commands.log import _commit_to_json c = self._make_commit() d = _commit_to_json(c) ts = d["committed_at"] assert isinstance(ts, str) assert "2025" in ts assert "T" in ts or " " in ts # --------------------------------------------------------------------------- # Integration — JSON output schema # --------------------------------------------------------------------------- class TestJsonSchema: _REQUIRED_COMMIT_KEYS = { "commit_id", "branch", "message", "author", "committed_at", "parent_commit_id", "parent2_commit_id", "snapshot_id", "sem_ver_bump", "breaking_changes", "metadata", "files_added", "files_removed", "files_modified", } def test_top_level_keys(self, tmp_path: pathlib.Path) -> None: repo = _fresh_repo(tmp_path) data = json.loads(_log(repo, "--json").output) assert "commits" in data assert "truncated" in data def test_all_commit_keys_present(self, tmp_path: pathlib.Path) -> None: repo = _fresh_repo(tmp_path, n_commits=2) data = json.loads(_log(repo, "--json").output) for c in data["commits"]: missing = self._REQUIRED_COMMIT_KEYS - set(c.keys()) assert not missing, f"Missing keys: {missing}" def test_parent2_commit_id_present(self, tmp_path: pathlib.Path) -> None: repo = _fresh_repo(tmp_path) data = json.loads(_log(repo, "--json").output) assert "parent2_commit_id" in data["commits"][0] def test_breaking_changes_is_list(self, tmp_path: pathlib.Path) -> None: repo = _fresh_repo(tmp_path) data = json.loads(_log(repo, "--json").output) assert isinstance(data["commits"][0]["breaking_changes"], list) def test_committed_at_is_iso(self, tmp_path: pathlib.Path) -> None: repo = _fresh_repo(tmp_path) data = json.loads(_log(repo, "--json").output) ts = 
data["commits"][0]["committed_at"] assert "T" in ts or "+" in ts def test_truncated_false_by_default(self, tmp_path: pathlib.Path) -> None: repo = _fresh_repo(tmp_path, n_commits=3) data = json.loads(_log(repo, "--json").output) assert data["truncated"] is False def test_json_parseable_output(self, tmp_path: pathlib.Path) -> None: repo = _fresh_repo(tmp_path, n_commits=5) result = _log(repo, "--json") data = json.loads(result.output) assert isinstance(data["commits"], list) assert len(data["commits"]) == 5 def test_empty_repo_json(self, tmp_path: pathlib.Path) -> None: repo = tmp_path / "repo" _init(repo) result = _log(repo, "--json") data = json.loads(result.output) assert data["commits"] == [] assert data["truncated"] is False def test_limit_n_json(self, tmp_path: pathlib.Path) -> None: repo = _fresh_repo(tmp_path, n_commits=5) data = json.loads(_log(repo, "--json", "--limit", "2").output) assert len(data["commits"]) == 2 def test_commits_ordered_newest_first(self, tmp_path: pathlib.Path) -> None: repo = _fresh_repo(tmp_path, n_commits=3) data = json.loads(_log(repo, "--json").output) timestamps = [c["committed_at"] for c in data["commits"]] assert timestamps == sorted(timestamps, reverse=True) def test_output_is_single_object(self, tmp_path: pathlib.Path) -> None: """--json must produce one JSON object, not an array or newline-delimited.""" repo = _fresh_repo(tmp_path) result = _log(repo, "--json") # Must parse as a single dict data = json.loads(result.output) assert isinstance(data, dict) # --------------------------------------------------------------------------- # Integration — --oneline # --------------------------------------------------------------------------- class TestOneline: def test_one_line_per_commit(self, tmp_path: pathlib.Path) -> None: repo = _fresh_repo(tmp_path, n_commits=3) result = _log(repo, "--oneline") lines = [l for l in result.output.splitlines() if l.strip()] assert len(lines) == 3 def test_short_hash_in_output(self, tmp_path: 
pathlib.Path) -> None: repo = _fresh_repo(tmp_path) data = json.loads(_log(repo, "--json").output) commit_id = data["commits"][0]["commit_id"] result = _log(repo, "--oneline") assert commit_id[:8] in result.output def test_message_on_same_line(self, tmp_path: pathlib.Path) -> None: repo = _fresh_repo(tmp_path) _commit(repo, "my special message", filename="z.py") result = _log(repo, "--oneline", "--limit", "1") assert "my special message" in result.output assert len(result.output.splitlines()) >= 1 def test_no_ansi_when_not_tty(self, tmp_path: pathlib.Path) -> None: repo = _fresh_repo(tmp_path) result = _log(repo, "--oneline") # CLI runner is not a TTY — no escape sequences assert "\x1b[" not in result.output # --------------------------------------------------------------------------- # Integration — --stat # --------------------------------------------------------------------------- class TestStat: def test_stat_shows_added_files(self, tmp_path: pathlib.Path) -> None: repo = _fresh_repo(tmp_path, n_commits=1) result = _log(repo, "--stat") assert "added" in result.output assert "+" in result.output def test_stat_shows_summary_line(self, tmp_path: pathlib.Path) -> None: repo = _fresh_repo(tmp_path, n_commits=1) result = _log(repo, "--stat") assert "added" in result.output assert "removed" in result.output def test_stat_shows_modified_marker(self, tmp_path: pathlib.Path) -> None: repo = _fresh_repo(tmp_path, n_commits=1) # Modify the same file in a second commit so "modified" fires. 
(repo / "file_0.py").write_text("# changed\n") _commit(repo, "modify existing") result = _log(repo, "--stat", "--limit", "1") assert "~" in result.output assert "modified" in result.output def test_stat_exit_zero(self, tmp_path: pathlib.Path) -> None: repo = _fresh_repo(tmp_path) result = _log(repo, "--stat") assert result.exit_code == 0 def test_stat_json_file_lists_populated(self, tmp_path: pathlib.Path) -> None: repo = _fresh_repo(tmp_path, n_commits=1) data = json.loads(_log(repo, "--stat", "--json").output) commit = data["commits"][0] # The initial commit adds at least one file. assert isinstance(commit["files_added"], list) assert isinstance(commit["files_removed"], list) assert isinstance(commit["files_modified"], list) assert len(commit["files_added"]) > 0 def test_stat_json_modified_populated(self, tmp_path: pathlib.Path) -> None: repo = _fresh_repo(tmp_path, n_commits=1) # Overwrite the existing file so the second commit shows a modification. (repo / "file_0.py").write_text("# changed\n") _commit(repo, "modify existing") data = json.loads(_log(repo, "--stat", "--json", "--limit", "1").output) commit = data["commits"][0] assert "file_0.py" in commit["files_modified"] def test_json_file_lists_populated_without_stat_flag(self, tmp_path: pathlib.Path) -> None: """--json always populates file lists — agents must not need --stat.""" repo = _fresh_repo(tmp_path, n_commits=1) data = json.loads(_log(repo, "--json").output) commit = data["commits"][0] # The initial commit adds at least one file; file lists must be # populated even without the --stat flag. 
assert isinstance(commit["files_added"], list) assert isinstance(commit["files_removed"], list) assert isinstance(commit["files_modified"], list) assert len(commit["files_added"]) > 0 # --------------------------------------------------------------------------- # Integration — filters # --------------------------------------------------------------------------- def _commit_as(repo: pathlib.Path, msg: str, author: str, filename: str | None = None) -> None: """Invoke muse commit with an explicit --author flag.""" from muse.cli.app import main as cli fname = filename or f"file_{abs(hash(msg))}.py" (repo / fname).write_text(f"# {msg}\n") saved = os.getcwd() try: os.chdir(repo) runner.invoke(cli, ["code", "add", "."]) runner.invoke(cli, ["commit", "-m", msg, "--author", author]) finally: os.chdir(saved) def _commit_with_identity_author(repo: pathlib.Path, msg: str, author: str, hub_url: str, filename: str | None = None) -> None: """Seed identity.toml with a handle, then invoke muse commit without --author.""" from muse.cli.app import main as cli from unittest.mock import patch import pathlib identity_file = repo / "identity.toml" hostname = hub_url.split("://", 1)[-1].rstrip("/") identity_file.write_text( f'["{hostname}"]\ntype = "human"\nhandle = "{author}"\n' f'algorithm = "ed25519"\nfingerprint = "sha256:abc"\nhd_path = "m/0\'"\n', encoding="utf-8", ) from muse.cli.config import set_hub_url set_hub_url(hub_url, repo) fname = filename or f"file_{abs(hash(msg))}.py" (repo / fname).write_text(f"# {msg}\n") saved = os.getcwd() try: os.chdir(repo) runner.invoke(cli, ["code", "add", "."]) with patch("muse.core.identity._IDENTITY_FILE", identity_file): runner.invoke(cli, ["commit", "-m", msg]) finally: os.chdir(saved) class TestAuthorField: """Author field in log JSON must come from identity.toml when --author not given.""" def test_commit_with_explicit_author_appears_in_log(self, tmp_path: pathlib.Path) -> None: """--author flag sets author field that muse log --json 
exposes.""" repo = _fresh_repo(tmp_path, n_commits=0) _commit_as(repo, "my commit", "charlie") result = _log(repo, "--json") data = json.loads(result.output) assert data["commits"][0]["author"] == "charlie" def test_commit_reads_user_name_from_identity(self, tmp_path: pathlib.Path) -> None: """muse commit without --author reads user.handle from identity.toml.""" repo = _fresh_repo(tmp_path, n_commits=0) _commit_with_identity_author(repo, "identity commit", "diana", "https://localhost:1337") result = _log(repo, "--json") data = json.loads(result.output) assert data["commits"][0]["author"] == "diana" def test_author_filter_returns_matching_commits(self, tmp_path: pathlib.Path) -> None: """--author filter must return commits whose author matches the substring.""" repo = _fresh_repo(tmp_path, n_commits=0) _commit_as(repo, "alice commit", "alice", filename="a.py") _commit_as(repo, "bob commit", "bob", filename="b.py") result = _log(repo, "--author", "alice", "--json") data = json.loads(result.output) assert len(data["commits"]) == 1 assert data["commits"][0]["author"] == "alice" def test_author_filter_nonexistent_returns_no_commits(self, tmp_path: pathlib.Path) -> None: """--author filter with no match must return empty list.""" repo = _fresh_repo(tmp_path, n_commits=0) _commit_as(repo, "some commit", "alice", filename="a.py") result = _log(repo, "--author", "zzz_nobody_zzz", "--json") data = json.loads(result.output) assert data["commits"] == [] class TestFilters: def test_author_filter_matches(self, tmp_path: pathlib.Path) -> None: repo = _fresh_repo(tmp_path, n_commits=2) # The author will be whatever muse uses by default # We just verify that filtering by nonexistent author returns none result = _log(repo, "--author", "zzz_nobody_zzz") assert "(no commits)" in result.output def test_since_filters_old_commits(self, tmp_path: pathlib.Path) -> None: repo = _fresh_repo(tmp_path, n_commits=2) result = _log(repo, "--since", "2099-01-01") # Future date — should return no 
commits assert "(no commits)" in result.output def test_until_filters_future_commits(self, tmp_path: pathlib.Path) -> None: repo = _fresh_repo(tmp_path, n_commits=2) # Past date — all commits should be excluded result = _log(repo, "--until", "2000-01-01") assert "(no commits)" in result.output def test_limit_shorthand(self, tmp_path: pathlib.Path) -> None: """muse log -2 must show at most 2 commits.""" repo = _fresh_repo(tmp_path, n_commits=5) result = _log(repo, "--oneline", "--limit", "2") lines = [l for l in result.output.splitlines() if l.strip()] assert len(lines) == 2 def test_limit_flag_alias(self, tmp_path: pathlib.Path) -> None: """--limit is an alias for -n/--max-count.""" repo = _fresh_repo(tmp_path, n_commits=5) result = _log(repo, "--oneline", "--limit", "3") lines = [l for l in result.output.splitlines() if l.strip()] assert len(lines) == 3 def test_limit_flag_json(self, tmp_path: pathlib.Path) -> None: """--limit works with --json output.""" repo = _fresh_repo(tmp_path, n_commits=5) data = json.loads(_log(repo, "--json", "--limit", "2").output) assert len(data["commits"]) == 2 def test_json_since_filters(self, tmp_path: pathlib.Path) -> None: repo = _fresh_repo(tmp_path, n_commits=2) data = json.loads(_log(repo, "--json", "--since", "2099-01-01").output) assert data["commits"] == [] def test_invalid_since_exits_nonzero(self, tmp_path: pathlib.Path) -> None: repo = _fresh_repo(tmp_path) result = _log(repo, "--since", "not-a-date") assert result.exit_code != 0 def test_invalid_until_exits_nonzero(self, tmp_path: pathlib.Path) -> None: repo = _fresh_repo(tmp_path) result = _log(repo, "--until", "not-a-date") assert result.exit_code != 0 def test_invalid_since_no_traceback(self, tmp_path: pathlib.Path) -> None: repo = _fresh_repo(tmp_path) result = _log(repo, "--since", "baddate") assert "Traceback" not in result.output def test_invalid_until_clean_error(self, tmp_path: pathlib.Path) -> None: repo = _fresh_repo(tmp_path) result = _log(repo, "--until", 
"foo") assert "Cannot parse" in result.output or result.exit_code != 0 # --------------------------------------------------------------------------- # Integration — format validation # --------------------------------------------------------------------------- class TestFormatValidation: def test_unknown_flag_exits_nonzero(self, tmp_path: pathlib.Path) -> None: repo = _fresh_repo(tmp_path) result = _log(repo, "--format", "xml") assert result.exit_code != 0 def test_unknown_flag_no_traceback(self, tmp_path: pathlib.Path) -> None: repo = _fresh_repo(tmp_path) result = _log(repo, "--format", "yaml") assert "Traceback" not in result.output def test_j_shorthand_same_as_json_flag(self, tmp_path: pathlib.Path) -> None: repo = _fresh_repo(tmp_path, n_commits=2) r1 = _log(repo, "--json") r2 = _log(repo, "-j") d1 = json.loads(r1.output) d2 = json.loads(r2.output) # duration_ms and timestamp are wall-clock values — exclude them for d in (d1, d2): d.pop("duration_ms", None) d.pop("timestamp", None) for c in d.get("commits", []): c.pop("duration_ms", None) assert d1 == d2 def test_invalid_max_count_exits_nonzero(self, tmp_path: pathlib.Path) -> None: repo = _fresh_repo(tmp_path) result = _log(repo, "--limit", "0") assert result.exit_code != 0 # --------------------------------------------------------------------------- # Security — ANSI injection # --------------------------------------------------------------------------- class TestSecurity: def test_ansi_in_commit_message_sanitized_oneline(self, tmp_path: pathlib.Path) -> None: repo = tmp_path / "repo" _init(repo) # Commit a message with ANSI in it _commit(repo, "\x1b[31mmalicious\x1b[0m", filename="malicious.py") result = _log(repo, "--oneline") # The runner is not a tty — any escape from the message must be sanitized assert "\x1b[31m" not in result.output def test_ansi_in_commit_message_sanitized_long(self, tmp_path: pathlib.Path) -> None: repo = tmp_path / "repo" _init(repo) _commit(repo, "\x1b[31mhacked\x1b[0m", 
filename="h.py") result = _log(repo) assert "\x1b[31m" not in result.output def test_ansi_in_author_sanitized(self, tmp_path: pathlib.Path) -> None: """Author names from CommitRecord must be sanitized in output.""" repo = _fresh_repo(tmp_path) result = _log(repo) # No raw escape from author field in text output (we can't control # author easily, but ensure output is escape-free when not tty) assert "\x1b[31m" not in result.output def test_multiline_message_all_lines_indented(self, tmp_path: pathlib.Path) -> None: """Every line of a multiline message must start with 4-space indent.""" repo = tmp_path / "repo" _init(repo) _commit(repo, "Line1\nLine2\nLine3", filename="f.py") result = _log(repo) body_lines = [l for l in result.output.splitlines() if l.strip() in ("Line1", "Line2", "Line3")] assert body_lines, f"Body lines not found in: {result.output}" for line in body_lines: assert line.startswith(" "), f"Not indented: {repr(line)}" def test_unknown_flag_exits_nonzero_ansi(self, tmp_path: pathlib.Path) -> None: repo = _fresh_repo(tmp_path) malicious_fmt = "\x1b[31mmalicious\x1b[0m" result = _log(repo, "--format", malicious_fmt) assert result.exit_code != 0 def test_repo_id_in_json_envelope(self, tmp_path: pathlib.Path) -> None: """repo_id is included in the JSON envelope for agent cross-referencing.""" repo = _fresh_repo(tmp_path) stored = json.loads((repo_json_path(repo)).read_text())["repo_id"] result = _log(repo, "--json") data = json.loads(result.output) assert data["repo_id"] == stored # --------------------------------------------------------------------------- # Integration — nonexistent branch # --------------------------------------------------------------------------- class TestNonexistentBranch: def test_nonexistent_branch_contextual_message(self, tmp_path: pathlib.Path) -> None: repo = _fresh_repo(tmp_path) result = _log(repo, "bogus-branch") assert "bogus-branch" in result.output def test_nonexistent_branch_exits_zero(self, tmp_path: pathlib.Path) -> 
None: """log on a nonexistent branch is not a fatal error.""" repo = _fresh_repo(tmp_path) result = _log(repo, "bogus-branch") assert result.exit_code == 0 def test_nonexistent_branch_json_empty_commits(self, tmp_path: pathlib.Path) -> None: repo = _fresh_repo(tmp_path) data = json.loads(_log(repo, "--json", "bogus-branch").output) assert data["commits"] == [] def test_empty_repo_shows_no_commits(self, tmp_path: pathlib.Path) -> None: repo = tmp_path / "repo" _init(repo) result = _log(repo) assert "no commits" in result.output.lower() # --------------------------------------------------------------------------- # End-to-end — complete workflows # --------------------------------------------------------------------------- class TestEndToEnd: def test_single_commit_log(self, tmp_path: pathlib.Path) -> None: repo = _fresh_repo(tmp_path, n_commits=1) result = _log(repo) assert result.exit_code == 0 assert "commit" in result.output.lower() def test_multiple_commits_ordered_newest_first(self, tmp_path: pathlib.Path) -> None: repo = _fresh_repo(tmp_path, n_commits=3) result = _log(repo, "--oneline") lines = [l for l in result.output.strip().splitlines() if l] assert len(lines) == 3 def test_head_decoration_on_latest(self, tmp_path: pathlib.Path) -> None: repo = _fresh_repo(tmp_path, n_commits=2) result = _log(repo) lines = result.output.strip().splitlines() # First commit line should have HEAD first = next((l for l in lines if "commit" in l.lower()), "") assert "HEAD" in first def test_subprocess_call_works(self, tmp_path: pathlib.Path) -> None: repo = _fresh_repo(tmp_path, n_commits=2) r = subprocess.run( ["muse", "log", "--json"], capture_output=True, text=True, cwd=str(repo), ) assert r.returncode == 0 data = json.loads(r.stdout) assert len(data["commits"]) == 2 def test_log_after_branch_switch(self, tmp_path: pathlib.Path) -> None: from muse.cli.app import main as cli repo = _fresh_repo(tmp_path, n_commits=2) saved = os.getcwd() os.chdir(repo) try: runner.invoke(cli, 
["branch", "feat/x"]) runner.invoke(cli, ["checkout", "feat/x"]) finally: os.chdir(saved) _commit(repo, "feat commit", filename="feat.py") data = json.loads(_log(repo, "--json").output) # feat branch should have 3 commits (2 from main + 1 new) assert len(data["commits"]) == 3 def test_log_on_explicit_branch(self, tmp_path: pathlib.Path) -> None: from muse.cli.app import main as cli repo = _fresh_repo(tmp_path, n_commits=2) saved = os.getcwd() os.chdir(repo) try: runner.invoke(cli, ["branch", "feat/y"]) runner.invoke(cli, ["checkout", "feat/y"]) finally: os.chdir(saved) _commit(repo, "only on feat", filename="feat_y.py") # Log main explicitly — should not include feat commit data_main = json.loads(_log(repo, "--json", "main").output) messages = [c["message"] for c in data_main["commits"]] assert "only on feat" not in messages def test_merge_commit_has_parent2(self, tmp_path: pathlib.Path) -> None: from muse.cli.app import main as cli repo = _fresh_repo(tmp_path, n_commits=1) saved = os.getcwd() os.chdir(repo) try: runner.invoke(cli, ["branch", "feat/merge-test"]) runner.invoke(cli, ["checkout", "feat/merge-test"]) (repo / "feat_file.py").write_text("f=1\n") runner.invoke(cli, ["code", "add", "."]) runner.invoke(cli, ["commit", "-m", "feat commit"]) runner.invoke(cli, ["checkout", "main"]) (repo / "main_file.py").write_text("m=1\n") runner.invoke(cli, ["code", "add", "."]) runner.invoke(cli, ["commit", "-m", "main diverge"]) runner.invoke(cli, ["merge", "feat/merge-test"]) finally: os.chdir(saved) data = json.loads(_log(repo, "--json", "--limit", "1").output) merge_commit = data["commits"][0] # A merge commit must have parent2_commit_id set assert merge_commit["parent2_commit_id"] is not None # --------------------------------------------------------------------------- # Stress — large history and rapid calls # --------------------------------------------------------------------------- class TestStress: @pytest.mark.slow def test_log_200_commits_json(self, tmp_path: 
pathlib.Path) -> None: """log --json on 200 commits must exit 0 with correct count.""" repo = _fresh_repo(tmp_path, n_commits=200) result = _log(repo, "--json") assert result.exit_code == 0 data = json.loads(result.output) assert len(data["commits"]) == 200 @pytest.mark.slow def test_log_200_commits_oneline(self, tmp_path: pathlib.Path) -> None: repo = _fresh_repo(tmp_path, n_commits=200) result = _log(repo, "--oneline") assert result.exit_code == 0 lines = [l for l in result.output.splitlines() if l.strip()] assert len(lines) == 200 @pytest.mark.slow def test_rapid_sequential_calls(self, tmp_path: pathlib.Path) -> None: """20 sequential muse log calls must all succeed.""" repo = _fresh_repo(tmp_path, n_commits=10) for i in range(20): result = _log(repo, "--json") assert result.exit_code == 0, f"Call {i} failed" def test_limit_n_large(self, tmp_path: pathlib.Path) -> None: repo = _fresh_repo(tmp_path, n_commits=10) data = json.loads(_log(repo, "--json", "--limit", "5").output) assert len(data["commits"]) == 5 def test_filter_returns_subset(self, tmp_path: pathlib.Path) -> None: """Limiting to 5 commits from a 20-commit repo returns exactly 5.""" repo = _fresh_repo(tmp_path, n_commits=20) data = json.loads(_log(repo, "--json", "--limit", "5").output) assert len(data["commits"]) == 5 def test_truncated_true_when_filter_skips_commits(self, tmp_path: pathlib.Path) -> None: """With active filter + large walk cap, walk_truncated can be True. Use --since=future so the filter skips all commits, but the walk still fetches them all up to walk_cap. We exercise the truncated-when-filter path by creating more commits than the walk ceiling. """ repo = _fresh_repo(tmp_path, n_commits=10) # Verify that --since=2099 returns an empty but valid JSON object. data = json.loads(_log(repo, "--json", "--since", "2099-01-01").output) assert data["commits"] == [] # truncated may or may not be True here depending on walk_cap; # the key invariant is that the output is well-formed JSON. 
class TestManifestCache:
    """get_commit_snapshot_manifest must not be called more than once per commit_id.

    Before the fix, _commit_touches_path and _file_diff each called
    get_commit_snapshot_manifest independently. With a pathspec filter plus
    JSON output (which always runs _file_diff), the same commit_id was read
    4× per commit (current + parent in each function).

    After the fix, a shared manifest_cache dict deduplicates reads so each
    commit_id is read at most once regardless of how many callers need it.
    """

    def test_manifest_cache_used_structurally(self) -> None:
        """manifest_cache dict must be threaded through the log pipeline."""
        import inspect

        from muse.cli.commands import log as log_module

        # Purely textual check: the identifier must appear in the module source.
        source = inspect.getsource(log_module)
        assert "manifest_cache" in source, (
            "log.py must use a manifest_cache dict to deduplicate snapshot reads"
        )

    def test_each_commit_id_read_at_most_once(self, tmp_path: pathlib.Path) -> None:
        """With pathspec + JSON mode, each commit's snapshot read ≤ 1×.

        JSON mode always calls _file_diff (stat=True). Pathspec filter calls
        _commit_touches_path. Without a shared cache, the same manifest is
        loaded 4× per commit. With a shared cache it is loaded exactly once.
        """
        from collections import Counter
        from unittest.mock import patch

        import muse.cli.commands.log as log_mod
        from muse.core.snapshots import get_commit_snapshot_manifest

        repo = tmp_path / "r"
        _init(repo)
        (repo / "src").mkdir(exist_ok=True)
        # Create 3 commits each touching a distinct file.
        for i in range(3):
            _commit(repo, f"msg{i}", filename=f"src/file{i}.py")

        seen_ids: list[str] = []
        original_fn = get_commit_snapshot_manifest

        def tracking_fn(root: pathlib.Path, commit_id: str) -> Mapping[str, str]:
            # Record every requested commit id, then delegate to the real reader.
            seen_ids.append(commit_id)
            return original_fn(root, commit_id)

        # Patch the name as looked up on the log module so calls made from
        # that module are routed through the tracker.
        with patch.object(log_mod, "get_commit_snapshot_manifest", side_effect=tracking_fn):
            result = _log(repo, "--json", "--", "src/")

        assert result.exit_code == 0
        data = json.loads(result.output)
        assert len(data["commits"]) > 0

        # If the tracker never fired, the duplicate check below would pass
        # without having observed a single read.
        assert seen_ids, (
            "get_commit_snapshot_manifest was never called through the log module; "
            "the at-most-once check would be vacuous."
        )

        # Each commit_id must appear at most once in the call log.
        counts = Counter(seen_ids)
        duplicates = {cid: n for cid, n in counts.items() if n > 1}
        assert not duplicates, (
            f"get_commit_snapshot_manifest called >1× for commit IDs: {duplicates}. "
            "Manifest cache not working."
        )

    def test_pathspec_filter_correct_with_cache(self, tmp_path: pathlib.Path) -> None:
        """Pathspec filter returns correct commits when manifest cache is active."""
        repo = tmp_path / "r"
        _init(repo)
        _commit(repo, "add alpha", filename="alpha.py")
        _commit(repo, "add beta", filename="beta.py")
        _commit(repo, "add gamma", filename="gamma.py")

        result = _log(repo, "--json", "--", "alpha.py")
        assert result.exit_code == 0
        data = json.loads(result.output)
        messages = [c["message"] for c in data["commits"]]

        assert any("alpha" in m for m in messages), (
            "alpha.py pathspec should include the 'add alpha' commit"
        )
        # The other two commits each add a different file, so neither may
        # appear under the alpha.py pathspec.
        assert not any("beta" in m for m in messages), (
            "alpha.py pathspec should NOT include the 'add beta' commit"
        )
        assert not any("gamma" in m for m in messages), (
            "alpha.py pathspec should NOT include the 'add gamma' commit"
        )