"""Comprehensive tests for ``muse reflog`` CLI and ``muse/core/reflog.py`` hardening. Audit findings addressed ------------------------ Security - append_reflog now sanitizes author (strips \\n, \\r, \\t to prevent line injection and tab-separator corruption) and operation (strips \\n, \\r to prevent line injection). - _fmt_entry sanitizes new_id[:12] and old_id[:12] in addition to operation — ANSI in stored commit IDs can no longer reach the terminal. - author is now shown in text output (was hidden; ANSI in author would have been invisible but present in stored data). - list_reflog_refs skips symlinks — symlinks cannot be used to escape the .muse/logs/refs/heads/ directory. - Branch names validated via validate_branch_name (path traversal blocked). - SystemExit(1) replaced with ExitCode.USER_ERROR. Performance - read_reflog now checks file size before read_text(); files larger than _MAX_REFLOG_BYTES (10 MiB) log a warning but still attempt to read, preventing silent OOM for huge reflog files. New capabilities - _ReflogEntryJson and _ReflogResultJson TypedDicts for stable schema. - --operation PATTERN filter (case-insensitive substring). - --author PATTERN filter (case-insensitive substring). - --since YYYY-MM-DD and --until YYYY-MM-DD date range filters. - --limit applied after all filters. - JSON output wrapped in _ReflogResultJson (ref, total, limit, entries). - --all --json returns _ReflogAllJson (refs, count). - "... N older entries" hint when limit is smaller than filtered total. 
Coverage tiers -------------- - Unit: _sanitize_author, _sanitize_operation, _parse_line, _fmt_entry - Integration: append_reflog injection, read_reflog size cap, list_reflog_refs - Security: ANSI in commit IDs/author/operation, tab injection, newline injection, path traversal branch, symlink guard - E2E: full CLI flags, JSON schema, filter combinations, exit codes - Stress: 10k-entry reflog, concurrent isolated repos """ from __future__ import annotations import datetime import json import pathlib import threading import pytest from muse.cli.commands.reflog import _ReflogAllJson, _ReflogEntryJson, _ReflogResultJson from muse.core.errors import ExitCode from muse.core.reflog import ( ReflogEntry, _MAX_REFLOG_BYTES, _parse_line, _sanitize_author, _sanitize_operation, append_reflog, list_reflog_refs, read_reflog, ) from muse.core.types import NULL_COMMIT_ID, fake_id from muse.core.paths import muse_dir, logs_dir from tests.cli_test_helper import CliRunner, InvokeResult runner = CliRunner() cli = None _NULL_ID = NULL_COMMIT_ID _SHA_A = "a" * 64 _SHA_B = "b" * 64 _SHA_C = "c" * 64 # --------------------------------------------------------------------------- # Repo / reflog helpers # --------------------------------------------------------------------------- def _make_repo(tmp_path: pathlib.Path) -> pathlib.Path: muse = muse_dir(tmp_path) for sub in ("commits", "snapshots", "refs/heads", "objects"): (muse / sub).mkdir(parents=True, exist_ok=True) (muse / "HEAD").write_text("ref: refs/heads/main", encoding="utf-8") repo_id = fake_id("repo") (muse / "repo.json").write_text( json.dumps({"repo_id": repo_id}), encoding="utf-8" ) return tmp_path def _append( root: pathlib.Path, branch: str = "main", old_id: str | None = None, new_id: str = _SHA_A, author: str = "alice", operation: str = "commit: init", ) -> None: append_reflog(root, branch, old_id=old_id, new_id=new_id, author=author, operation=operation) def _invoke(root: pathlib.Path, *args: str) -> InvokeResult: return 
runner.invoke( cli, ["reflog", *args], env={"MUSE_REPO_ROOT": str(root)}, ) def _parse_json_blob(output: str) -> str: """Extract the first complete JSON object from CLI output.""" start = output.index("{") blob = output[start:] depth = 0 end = 0 for i, ch in enumerate(blob): if ch == "{": depth += 1 elif ch == "}": depth -= 1 if depth == 0: end = i + 1 break return blob[:end] def _parse_reflog_result(result: InvokeResult) -> _ReflogResultJson: raw = json.loads(_parse_json_blob(result.output)) assert isinstance(raw, dict) entries: list[_ReflogEntryJson] = [] for e in raw.get("entries", []): assert isinstance(e, dict) entries.append( _ReflogEntryJson( index=int(e.get("index", 0)), new_id=str(e.get("new_id", "")), old_id=str(e.get("old_id", "")), timestamp=str(e.get("timestamp", "")), operation=str(e.get("operation", "")), author=str(e.get("author", "")), ) ) return _ReflogResultJson( ref=str(raw.get("ref", "")), total=int(raw.get("total", 0)), limit=int(raw.get("limit", 0)), entries=entries, ) def _parse_reflog_all(result: InvokeResult) -> _ReflogAllJson: raw = json.loads(_parse_json_blob(result.output)) assert isinstance(raw, dict) raw_refs = raw.get("refs", []) assert isinstance(raw_refs, list) return _ReflogAllJson( refs=[str(r) for r in raw_refs], count=int(raw.get("count", 0)), ) def _parse_reflog_json( result: InvokeResult, ) -> "tuple[str, int, int, list[_ReflogEntryJson]]": parsed = _parse_reflog_result(result) return parsed["ref"], parsed["total"], parsed["limit"], parsed["entries"] # --------------------------------------------------------------------------- # Unit — _sanitize_author # --------------------------------------------------------------------------- class TestSanitizeAuthor: def test_strips_newline(self) -> None: assert "\n" not in _sanitize_author("alice\nbob") def test_strips_cr(self) -> None: assert "\r" not in _sanitize_author("alice\rbob") def test_strips_tab(self) -> None: assert "\t" not in _sanitize_author("alice\tbob") def 
test_preserves_spaces(self) -> None: assert _sanitize_author("Alice Smith ") == "Alice Smith " def test_empty_string(self) -> None: assert _sanitize_author("") == "" def test_combined_strips(self) -> None: result = _sanitize_author("a\n\r\tb") assert result == "ab" # --------------------------------------------------------------------------- # Unit — _sanitize_operation # --------------------------------------------------------------------------- class TestSanitizeOperation: def test_strips_newline(self) -> None: assert "\n" not in _sanitize_operation("commit: init\nINJECTED") def test_strips_cr(self) -> None: assert "\r" not in _sanitize_operation("commit: init\rINJECTED") def test_preserves_tab(self) -> None: # tabs inside operation body are preserved (operation is already past # the tab boundary in the file) assert "\t" in _sanitize_operation("commit:\tinit") def test_empty_string(self) -> None: assert _sanitize_operation("") == "" # --------------------------------------------------------------------------- # Unit — _parse_line # --------------------------------------------------------------------------- class TestParseLine: def test_valid_line(self) -> None: ts = int(datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc).timestamp()) line = f"{_NULL_ID} {_SHA_A} alice {ts} +0000\tcommit: init" entry = _parse_line(line) assert entry is not None assert entry.old_id == _NULL_ID assert entry.new_id == _SHA_A assert entry.author == "alice" assert entry.operation == "commit: init" def test_no_tab_returns_none(self) -> None: line = f"{_NULL_ID} {_SHA_A} alice 1000000 +0000 commit: init" assert _parse_line(line) is None def test_too_few_tokens_returns_none(self) -> None: line = f"{_NULL_ID} {_SHA_A}\tcommit: init" assert _parse_line(line) is None def test_author_with_spaces(self) -> None: ts = int(datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc).timestamp()) line = f"{_NULL_ID} {_SHA_A} Alice Smith {ts} +0000\tcommit: init" entry = _parse_line(line) 
assert entry is not None assert "Alice Smith" in entry.author def test_bad_timestamp_defaults_to_now(self) -> None: before = datetime.datetime.now(tz=datetime.timezone.utc) line = f"{_NULL_ID} {_SHA_A} alice NOTANUMBER +0000\tcommit: init" entry = _parse_line(line) assert entry is not None assert entry.timestamp >= before def test_trailing_newline_stripped(self) -> None: ts = int(datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc).timestamp()) line = f"{_NULL_ID} {_SHA_A} alice {ts} +0000\tcommit: init\n" entry = _parse_line(line) assert entry is not None assert entry.operation == "commit: init" # --------------------------------------------------------------------------- # Unit — _fmt_entry # --------------------------------------------------------------------------- class TestFmtEntry: _ANSI = "\x1b[31mRED\x1b[0m" def _entry( self, operation: str = "commit: test", new_id: str = _SHA_A, old_id: str = _NULL_ID, author: str = "alice", ) -> ReflogEntry: return ReflogEntry( old_id=old_id, new_id=new_id, author=author, timestamp=datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc), operation=operation, ) def test_sanitizes_operation(self) -> None: from muse.cli.commands.reflog import _fmt_entry entry = self._entry(operation=f"commit: {self._ANSI}") result = _fmt_entry(0, entry) assert "\x1b" not in result def test_sanitizes_new_id(self) -> None: from muse.cli.commands.reflog import _fmt_entry # First 12 chars of new_id could theoretically contain control chars new_id = f"\x1b[31m{'a' * 60}" entry = self._entry(new_id=new_id) result = _fmt_entry(0, entry) assert "\x1b" not in result def test_sanitizes_author(self) -> None: from muse.cli.commands.reflog import _fmt_entry entry = self._entry(author=f"malicious{self._ANSI}") result = _fmt_entry(0, entry) assert "\x1b" not in result def test_initial_shown_for_null_old_id(self) -> None: from muse.cli.commands.reflog import _fmt_entry entry = self._entry(old_id=_NULL_ID) result = _fmt_entry(0, entry) assert 
"initial" in result def test_non_null_old_id_shown_as_short(self) -> None: from muse.cli.commands.reflog import _fmt_entry entry = self._entry(old_id=_SHA_B) result = _fmt_entry(0, entry) assert _SHA_B[:12] in result def test_author_shown_in_output(self) -> None: from muse.cli.commands.reflog import _fmt_entry entry = self._entry(author="bob-agent") result = _fmt_entry(0, entry) assert "bob-agent" in result def test_index_shown(self) -> None: from muse.cli.commands.reflog import _fmt_entry entry = self._entry() result = _fmt_entry(7, entry) assert "@{7" in result # --------------------------------------------------------------------------- # Integration — append_reflog injection prevention # --------------------------------------------------------------------------- class TestAppendReflogInjection: def test_newline_in_author_stripped(self, tmp_path: pathlib.Path) -> None: append_reflog(tmp_path, "main", None, _SHA_A, "alice\nmalicious", "commit: test") entries = read_reflog(tmp_path, "main") assert all("\n" not in e.author for e in entries) def test_tab_in_author_stripped(self, tmp_path: pathlib.Path) -> None: """Tab in author would corrupt the metadata/operation split.""" append_reflog(tmp_path, "main", None, _SHA_A, "alice\tmalicious", "commit: test") entries = read_reflog(tmp_path, "main") assert all("\t" not in e.author for e in entries) def test_newline_in_operation_stripped(self, tmp_path: pathlib.Path) -> None: append_reflog(tmp_path, "main", None, _SHA_A, "alice", "commit: init\nINJECTED_LINE") entries = read_reflog(tmp_path, "main") assert len(entries) == 1 # no fake second entry assert "\n" not in entries[0].operation def test_cr_in_author_stripped(self, tmp_path: pathlib.Path) -> None: append_reflog(tmp_path, "main", None, _SHA_A, "alice\rmalicious", "commit: test") entries = read_reflog(tmp_path, "main") assert all("\r" not in e.author for e in entries) def test_multiple_entries_after_injection_attempt( self, tmp_path: pathlib.Path ) -> None: 
append_reflog(tmp_path, "main", None, _SHA_A, "alice", "commit: first\nINJECTED") append_reflog(tmp_path, "main", _SHA_A, _SHA_B, "bob", "commit: second") entries = read_reflog(tmp_path, "main") assert len(entries) == 2 def test_entries_readable_after_injection( self, tmp_path: pathlib.Path ) -> None: append_reflog(tmp_path, "main", None, _SHA_A, "a\tb", "commit: init") entries = read_reflog(tmp_path, "main") assert len(entries) == 1 assert entries[0].new_id == _SHA_A # --------------------------------------------------------------------------- # Integration — read_reflog size cap # --------------------------------------------------------------------------- class TestReadReflogSizeCap: def test_size_cap_constant_exists(self) -> None: assert _MAX_REFLOG_BYTES > 0 assert _MAX_REFLOG_BYTES <= 50 * 1024 * 1024 # sanity: ≤ 50 MiB def test_large_file_still_reads_with_warning( self, tmp_path: pathlib.Path, caplog: pytest.LogCaptureFixture ) -> None: """Files over the cap log a warning but still attempt to read.""" import logging repo = _make_repo(tmp_path) log_dir = logs_dir(repo) / "refs" / "heads" log_dir.mkdir(parents=True, exist_ok=True) log_path = log_dir / "main" # Write a small valid entry, then pad the file to exceed the cap ts = int(datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc).timestamp()) entry_line = f"{_NULL_ID} {_SHA_A} alice {ts} +0000\tcommit: init\n" # We can't write 10 MiB in a test feasibly, so we patch the cap import muse.core.reflog as reflog_mod original = reflog_mod._MAX_REFLOG_BYTES try: reflog_mod._MAX_REFLOG_BYTES = len(entry_line) - 1 # one byte under log_path.write_text(entry_line, encoding="utf-8") with caplog.at_level(logging.WARNING, logger="muse.core.reflog"): entries = read_reflog(repo, "main") assert any("exceeds cap" in r.message for r in caplog.records) # Still returns entries despite the warning assert len(entries) == 1 finally: reflog_mod._MAX_REFLOG_BYTES = original # 
# ---------------------------------------------------------------------------
# Integration — list_reflog_refs symlink guard
# ---------------------------------------------------------------------------


class TestListReflogRefsSymlink:
    """``list_reflog_refs`` lists regular log files and ignores symlinks."""

    def test_symlink_excluded(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        log_dir = logs_dir(repo) / "refs" / "heads"
        log_dir.mkdir(parents=True, exist_ok=True)
        # Create a real log file
        (log_dir / "main").write_text("line\n", encoding="utf-8")
        # Create a symlink (points to a real file — but should be excluded)
        target = tmp_path / "external_file"
        target.write_text("external\n", encoding="utf-8")
        try:
            (log_dir / "malicious-branch").symlink_to(target)
        except (NotImplementedError, OSError):
            # Only the link creation is guarded: platforms without symlink
            # support raise NotImplementedError, and Windows raises OSError
            # when the user lacks the privilege to create symlinks.
            pytest.skip("symlinks not supported on this platform")
        refs = list_reflog_refs(repo)
        assert "malicious-branch" not in refs
        assert "main" in refs

    def test_regular_files_included(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        _append(repo, branch="main")
        _append(repo, branch="dev")
        refs = list_reflog_refs(repo)
        assert "main" in refs
        assert "dev" in refs


# ---------------------------------------------------------------------------
# Security — full CLI
# ---------------------------------------------------------------------------


class TestReflogSecurity:
    """CLI-level checks: escape sequences, path traversal, error hygiene."""

    _ANSI = "\x1b[31mmalicious\x1b[0m"

    @staticmethod
    def _write_raw_logs(repo: pathlib.Path, raw_line: str) -> None:
        """Write *raw_line* verbatim to both the ``main`` branch log and the HEAD log.

        Bypasses ``append_reflog`` so that unsanitized content can be planted
        directly in the stored files.
        """
        log_dir = logs_dir(repo) / "refs" / "heads"
        log_dir.mkdir(parents=True, exist_ok=True)
        head_log = logs_dir(repo) / "HEAD"
        head_log.parent.mkdir(parents=True, exist_ok=True)
        (log_dir / "main").write_text(raw_line, encoding="utf-8")
        head_log.write_text(raw_line, encoding="utf-8")

    def test_ansi_in_stored_operation_sanitized(
        self, tmp_path: pathlib.Path
    ) -> None:
        repo = _make_repo(tmp_path)
        # Write a raw reflog line with ANSI in the operation field
        ts = int(datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc).timestamp())
        raw_line = f"{_NULL_ID} {_SHA_A} alice {ts} +0000\tcommit: {self._ANSI}\n"
        self._write_raw_logs(repo, raw_line)
        result = _invoke(repo)
        assert result.exit_code == 0
        assert "\x1b[" not in result.output

    def test_ansi_in_stored_new_id_sanitized(
        self, tmp_path: pathlib.Path
    ) -> None:
        repo = _make_repo(tmp_path)
        ts = int(datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc).timestamp())
        # new_id starts with ANSI (first 12 chars shown in text output)
        malicious_id = f"\x1b[31m{'a' * 60}"
        raw_line = f"{_NULL_ID} {malicious_id} alice {ts} +0000\tcommit: test\n"
        self._write_raw_logs(repo, raw_line)
        result = _invoke(repo)
        assert result.exit_code == 0
        assert "\x1b[" not in result.output

    def test_path_traversal_branch_rejected(
        self, tmp_path: pathlib.Path
    ) -> None:
        repo = _make_repo(tmp_path)
        _append(repo)
        result = _invoke(repo, "--branch", "../../../etc/passwd")
        assert result.exit_code == ExitCode.USER_ERROR.value

    def test_dotdot_branch_rejected(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        result = _invoke(repo, "--branch", "..")
        assert result.exit_code == ExitCode.USER_ERROR.value

    def test_unknown_flag_exits_nonzero(
        self, tmp_path: pathlib.Path
    ) -> None:
        repo = _make_repo(tmp_path)
        result = _invoke(repo, "--format", "xml")
        assert result.exit_code != 0

    def test_error_messages_no_traceback(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        result = _invoke(repo, "--branch", "../etc/passwd")
        assert "Traceback" not in result.output

    def test_json_stdout_clean(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        _append(repo)
        result = _invoke(repo, "--json")
        stripped = result.output.lstrip()
        assert stripped.startswith("{"), f"Expected JSON on stdout: {result.output[:80]!r}"
# ---------------------------------------------------------------------------
# E2E — JSON schema
# ---------------------------------------------------------------------------


class TestReflogJsonSchema:
    """Shape of the ``--json`` and ``--all --json`` payloads."""

    def test_result_fields_present(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        _append(repo)
        result = _invoke(repo, "--json")
        assert result.exit_code == 0
        ref, total, limit, entries = _parse_reflog_json(result)
        assert ref == "HEAD"
        assert total >= 1
        assert limit == 20  # default
        assert len(entries) == 1

    def test_entry_fields_present(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        _append(repo, author="alice", operation="commit: initial")
        result = _invoke(repo, "--json")
        assert result.exit_code == 0
        # Key presence is checked on the raw payload: the typed parsing
        # helpers substitute defaults for missing keys, which would make a
        # presence check on their output pass unconditionally.
        raw = json.loads(_parse_json_blob(result.output))
        raw_entries = raw["entries"]
        assert len(raw_entries) == 1
        for field in ("index", "new_id", "old_id", "timestamp", "operation", "author"):
            assert field in raw_entries[0], f"Missing JSON field: {field}"
        _, _, _, entries = _parse_reflog_json(result)
        assert len(entries) == 1
        ev = entries[0]
        assert ev["author"] == "alice"
        assert ev["operation"] == "commit: initial"

    def test_branch_flag_sets_ref_in_json(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        _append(repo, branch="main")
        result = _invoke(repo, "--branch", "main", "--json")
        assert result.exit_code == 0
        ref, _, _, _ = _parse_reflog_json(result)
        assert ref == "refs/heads/main"

    def test_all_json_returns_refs_and_count(
        self, tmp_path: pathlib.Path
    ) -> None:
        repo = _make_repo(tmp_path)
        _append(repo, branch="main")
        _append(repo, branch="dev")
        result = _invoke(repo, "--all", "--json")
        assert result.exit_code == 0
        parsed = _parse_reflog_all(result)
        assert parsed["count"] >= 2
        assert any("main" in r for r in parsed["refs"])

    def test_total_reflects_filtered_count(
        self, tmp_path: pathlib.Path
    ) -> None:
        repo = _make_repo(tmp_path)
        for i in range(5):
            _append(repo, operation=f"commit: c{i}")
        _append(repo, operation="checkout: moving to dev")
        result = _invoke(repo, "--operation", "commit", "--limit", "2", "--json")
        _, total, limit, entries = _parse_reflog_json(result)
        assert total == 5  # 5 commit events total
        assert limit == 2
        assert len(entries) == 2  # only 2 returned

    def test_empty_reflog_json(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        result = _invoke(repo, "--json")
        assert result.exit_code == 0
        _, total, _, entries = _parse_reflog_json(result)
        assert total == 0
        assert entries == []


# ---------------------------------------------------------------------------
# E2E — filters
# ---------------------------------------------------------------------------


class TestReflogFilters:
    """``--operation``, ``--author``, ``--since``, ``--until`` and ``--limit``."""

    def test_operation_filter(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        _append(repo, operation="commit: first")
        _append(repo, operation="checkout: switch to dev")
        _append(repo, operation="commit: second")
        result = _invoke(repo, "--operation", "commit", "--json")
        _, _, _, entries = _parse_reflog_json(result)
        assert all("commit" in str(e["operation"]) for e in entries)
        assert len(entries) == 2

    def test_operation_filter_case_insensitive(
        self, tmp_path: pathlib.Path
    ) -> None:
        repo = _make_repo(tmp_path)
        _append(repo, operation="Commit: first")
        result = _invoke(repo, "--operation", "COMMIT", "--json")
        _, _, _, entries = _parse_reflog_json(result)
        assert len(entries) == 1

    def test_author_filter(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        _append(repo, author="alice", operation="commit: a1")
        _append(repo, author="bob", operation="commit: b1")
        _append(repo, author="alice", operation="commit: a2")
        result = _invoke(repo, "--author", "alice", "--json")
        _, _, _, entries = _parse_reflog_json(result)
        assert all(str(e["author"]) == "alice" for e in entries)
        assert len(entries) == 2

    def test_author_filter_case_insensitive(
        self, tmp_path: pathlib.Path
    ) -> None:
        repo = _make_repo(tmp_path)
        _append(repo, author="Alice")
        result = _invoke(repo, "--author", "alice", "--json")
        _, _, _, entries = _parse_reflog_json(result)
        assert len(entries) == 1

    def test_since_filter(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        # Write entries with specific timestamps via raw log
        log_dir = logs_dir(repo)
        log_dir.mkdir(parents=True, exist_ok=True)
        head_log = log_dir / "HEAD"
        branch_log_dir = log_dir / "refs" / "heads"
        branch_log_dir.mkdir(parents=True, exist_ok=True)

        old_ts = int(datetime.datetime(2025, 1, 1, tzinfo=datetime.timezone.utc).timestamp())
        new_ts = int(datetime.datetime(2026, 6, 1, tzinfo=datetime.timezone.utc).timestamp())
        old_line = f"{_NULL_ID} {_SHA_A} alice {old_ts} +0000\tcommit: old\n"
        new_line = f"{_SHA_A} {_SHA_B} alice {new_ts} +0000\tcommit: new\n"
        head_log.write_text(old_line + new_line, encoding="utf-8")

        result = _invoke(repo, "--since", "2026-01-01", "--json")
        assert result.exit_code == 0
        _, _, _, entries = _parse_reflog_json(result)
        assert len(entries) == 1
        assert str(entries[0]["operation"]) == "commit: new"

    def test_until_filter(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        log_dir = logs_dir(repo)
        log_dir.mkdir(parents=True, exist_ok=True)
        head_log = log_dir / "HEAD"

        old_ts = int(datetime.datetime(2025, 1, 1, tzinfo=datetime.timezone.utc).timestamp())
        new_ts = int(datetime.datetime(2026, 6, 1, tzinfo=datetime.timezone.utc).timestamp())
        old_line = f"{_NULL_ID} {_SHA_A} alice {old_ts} +0000\tcommit: old\n"
        new_line = f"{_SHA_A} {_SHA_B} alice {new_ts} +0000\tcommit: new\n"
        head_log.write_text(old_line + new_line, encoding="utf-8")

        result = _invoke(repo, "--until", "2025-12-31", "--json")
        assert result.exit_code == 0
        _, _, _, entries = _parse_reflog_json(result)
        assert len(entries) == 1
        assert str(entries[0]["operation"]) == "commit: old"

    def test_since_after_until_rejected(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        result = _invoke(repo, "--since", "2026-12-01", "--until", "2026-01-01")
        assert result.exit_code == ExitCode.USER_ERROR.value

    def test_invalid_since_date_rejected(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        result = _invoke(repo, "--since", "not-a-date")
        assert result.exit_code == ExitCode.USER_ERROR.value

    def test_invalid_until_date_rejected(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        result = _invoke(repo, "--until", "2026/06/01")
        assert result.exit_code == ExitCode.USER_ERROR.value

    def test_filter_combination(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        _append(repo, author="alice", operation="commit: a")
        _append(repo, author="bob", operation="commit: b")
        _append(repo, author="alice", operation="checkout: switch")
        result = _invoke(repo, "--author", "alice", "--operation", "commit", "--json")
        _, _, _, entries = _parse_reflog_json(result)
        assert len(entries) == 1
        assert str(entries[0]["author"]) == "alice"

    def test_no_match_shows_filter_message(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        _append(repo)
        result = _invoke(repo, "--operation", "no-such-op")
        assert result.exit_code == 0
        assert "filter" in result.output.lower() or "No reflog" in result.output

    def test_limit_applied_after_filter(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        for _ in range(10):
            _append(repo, operation="commit: c", author="alice")
        for _ in range(5):
            _append(repo, operation="checkout: x", author="bob")
        result = _invoke(repo, "--operation", "commit", "--limit", "3", "--json")
        _, total, limit, entries = _parse_reflog_json(result)
        assert total == 10
        assert limit == 3
        assert len(entries) == 3


# ---------------------------------------------------------------------------
# E2E — general CLI behaviour
# ---------------------------------------------------------------------------


class TestReflogE2E:
    """General text-mode and help behaviour of the command."""

    def test_help_shows_new_flags(self) -> None:
        result = runner.invoke(cli, ["reflog", "--help"])
        assert result.exit_code == 0
        for flag in ("--operation", "--author", "--since", "--until", "--json", "--all"):
            assert flag in result.output

    def test_text_output_shows_author(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        _append(repo, author="ci-bot", operation="commit: init")
        result = _invoke(repo)
        assert result.exit_code == 0
        assert "ci-bot" in result.output

    def test_hint_shown_when_more_entries_exist(
        self, tmp_path: pathlib.Path
    ) -> None:
        repo = _make_repo(tmp_path)
        for i in range(25):
            _append(repo, operation=f"commit: c{i}")
        result = _invoke(repo, "--limit", "5")
        assert result.exit_code == 0
        assert "older" in result.output

    def test_all_flag_lists_branches(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        _append(repo, branch="main")
        _append(repo, branch="dev")
        result = _invoke(repo, "--all")
        assert result.exit_code == 0
        assert "main" in result.output
        assert "dev" in result.output

    def test_empty_head_reflog(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        result = _invoke(repo)
        assert result.exit_code == 0
        assert "No reflog" in result.output

    def test_json_is_valid(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        _append(repo)
        result = _invoke(repo, "--json")
        assert result.exit_code == 0
        # Everything from the first "{" to the end of output must decode.
        start = result.output.index("{")
        json.loads(result.output[start:])


# ---------------------------------------------------------------------------
# Stress
# ---------------------------------------------------------------------------


class TestReflogStress:
    """Large reflogs and multi-threaded use against per-thread repos."""

    def test_10k_entries_limit_respected(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        log_dir = logs_dir(repo)
        log_dir.mkdir(parents=True, exist_ok=True)
        head_log = log_dir / "HEAD"
        ts = int(datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc).timestamp())
        lines = [
            f"{_NULL_ID} {_SHA_A} alice {ts + i} +0000\tcommit: c{i}\n"
            for i in range(10_000)
        ]
        head_log.write_text("".join(lines), encoding="utf-8")
        result = _invoke(repo, "--limit", "20", "--json")
        assert result.exit_code == 0
        _, _, _, entries = _parse_reflog_json(result)
        assert len(entries) == 20

    def test_concurrent_reads_isolated_repos(
        self, tmp_path: pathlib.Path
    ) -> None:
        """Eight threads read their own isolated reflog — no shared state."""
        errors: list[str] = []

        def worker(idx: int) -> None:
            # Failures are collected rather than raised: an exception inside
            # a thread would not fail the test on its own.
            try:
                repo = _make_repo(tmp_path / f"repo{idx}")
                _append(repo, author=f"agent-{idx}", operation=f"commit: c{idx}")
                entries = read_reflog(repo)
                if len(entries) != 1:
                    errors.append(f"Thread {idx}: got {len(entries)} entries")
                    return
                if entries[0].author != f"agent-{idx}":
                    errors.append(f"Thread {idx}: wrong author {entries[0].author!r}")
            except Exception as exc:
                errors.append(f"Thread {idx}: {exc}")

        threads = [threading.Thread(target=worker, args=(i,)) for i in range(8)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()
        assert errors == [], f"Concurrent reflog failures: {errors}"

    def test_concurrent_appends_to_same_repo(
        self, tmp_path: pathlib.Path
    ) -> None:
        """Eight threads each append ten entries to their own isolated repo.

        Despite the test name, no repo is shared between threads: every
        worker creates ``repo{idx}`` under *tmp_path* and only touches that.
        """
        errors: list[str] = []

        def worker(idx: int) -> None:
            # Failures are collected rather than raised: an exception inside
            # a thread would not fail the test on its own.
            try:
                repo = _make_repo(tmp_path / f"repo{idx}")
                for i in range(10):
                    append_reflog(
                        repo,
                        "main",
                        old_id=None if i == 0 else _SHA_A,
                        new_id=_SHA_A,
                        author=f"agent-{idx}",
                        operation=f"commit: c{i}",
                    )
                entries = read_reflog(repo, "main")
                if len(entries) != 10:
                    errors.append(f"Thread {idx}: expected 10, got {len(entries)}")
            except Exception as exc:
                errors.append(f"Thread {idx}: {exc}")

        threads = [threading.Thread(target=worker, args=(i,)) for i in range(8)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()
        assert errors == [], f"Concurrent append failures: {errors}"