"""Supercharge tests for ``muse reflog``. Coverage tiers -------------- - Unit: _short_id helper — bare hex and sha256:-prefixed inputs - Integration: duration_ms + exit_code in both JSON output paths - Data integrity: new_id/old_id sha256:-prefixed in JSON; text short IDs - Filter behaviour: total reflects post-filter count; date range edge cases - Security: null-ID shown as sha256:000…, ANSI in IDs sanitised in text - Performance: empty reflog and 100-entry reflog timing """ from __future__ import annotations import datetime import json import pathlib import re import time from muse.core.errors import ExitCode from muse.core.reflog import append_reflog from tests.cli_test_helper import CliRunner, InvokeResult from muse.core.types import NULL_COMMIT_ID, NULL_LONG_ID, long_id, fake_id, short_id as _short_id from muse.core.paths import logs_dir, muse_dir runner = CliRunner() _NULL_ID = NULL_COMMIT_ID _SHA_A = long_id("a" * 64) _SHA_B = long_id("b" * 64) _SHA256_FULL = re.compile(r"^sha256:[0-9a-f]{64}$") _SHA256_SHORT_19 = re.compile(r"^sha256:[0-9a-f]{12}$") _TS = datetime.datetime(2026, 1, 15, 12, 0, tzinfo=datetime.timezone.utc) # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _make_repo(tmp_path: pathlib.Path) -> pathlib.Path: repo = tmp_path / "repo" dot_muse = muse_dir(repo) for sub in ("objects", "commits", "snapshots", "refs/heads", "logs/refs/heads", "logs"): (dot_muse / sub).mkdir(parents=True, exist_ok=True) (dot_muse / "HEAD").write_text("ref: refs/heads/main") (dot_muse / "repo.json").write_text(json.dumps({"repo_id": "test-repo", "domain": "code"})) return repo def _append( repo: pathlib.Path, *, branch: str = "main", old_id: str = _NULL_ID, new_id: str = _SHA_A, author: str = "gabriel", operation: str = "commit: test", timestamp: datetime.datetime | None = None, ) -> None: """Write one reflog entry. When *timestamp* is given, write the raw log line directly so tests can control the stored timestamp precisely. Otherwise delegate to ``append_reflog`` which stamps with the current time. """ if timestamp is None: append_reflog(repo, branch, old_id=old_id, new_id=new_id, author=author, operation=operation) return # Write raw log line to both HEAD and branch logs (mirrors append_reflog). ts_unix = int(timestamp.timestamp()) safe_op = operation.replace("\n", "").replace("\r", "") safe_author = author.replace("\n", "").replace("\r", "").replace("\t", "") line = f"{old_id} {new_id} {safe_author} {ts_unix} +0000\t{safe_op}\n" log_dir = logs_dir(repo) head_log = log_dir / "HEAD" head_log.write_text((head_log.read_text(encoding="utf-8") if head_log.exists() else "") + line, encoding="utf-8") if branch: branch_log = log_dir / "refs" / "heads" / branch branch_log.write_text( (branch_log.read_text(encoding="utf-8") if branch_log.exists() else "") + line, encoding="utf-8", ) def _invoke(repo: pathlib.Path, *args: str) -> InvokeResult: from muse.cli.app import main as cli return runner.invoke(cli, ["reflog", *args], env={"MUSE_REPO_ROOT": str(repo)}) # --------------------------------------------------------------------------- # Unit — _short_id # --------------------------------------------------------------------------- class TestShortId: """_short_id handles both bare-hex (reflog on-disk format) and sha256:-prefixed input.""" def test_bare_hex_prepends_sha256_prefix(self) -> None: result = _short_id(_SHA_A) assert result.startswith("sha256:") def test_bare_hex_12_hex_chars_after_prefix(self) -> None: result = _short_id(_SHA_A) assert result == long_id("a" * 12) def test_bare_hex_total_length_is_19(self) -> None: assert len(_short_id(_SHA_B)) == 19 def test_sha256_prefixed_input_handled(self) -> None: prefixed = long_id("deadbeef" * 8) result = _short_id(prefixed) assert result.startswith("sha256:") assert len(result) == 19 def test_null_id_shows_zeros(self) -> None: result = _short_id(_NULL_ID) assert result == "0" * 12 def test_matches_short_regex(self) -> None: assert _SHA256_SHORT_19.match(_short_id(_SHA_A)) # --------------------------------------------------------------------------- # Integration — text format short IDs # --------------------------------------------------------------------------- class TestTextFormatShortId: """Text format must show sha256:<12-hex> for new_id and old_id.""" def _short_tokens(self, line: str) -> list[str]: return [tok for tok in line.split() if _SHA256_SHORT_19.match(tok)] def test_new_id_shown_as_sha256_short_in_text(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _append(repo, new_id=_SHA_A) result = _invoke(repo) assert result.exit_code == 0 tokens = self._short_tokens(result.output) assert any(t.startswith(long_id("a" * 12)) for t in tokens), \ f"no sha256:aaa… token in text output:\n{result.output}" def test_old_id_shown_as_sha256_short_in_text(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _append(repo, old_id=_SHA_B, new_id=_SHA_A) result = _invoke(repo) assert result.exit_code == 0 assert long_id("b" * 12) in result.output, \ f"sha256:bbb… not in text output:\n{result.output}" def test_initial_entry_shows_initial_keyword(self, tmp_path: pathlib.Path) -> None: """Null old_id must render as 'initial', not sha256:000….""" repo = _make_repo(tmp_path) _append(repo, old_id=_NULL_ID) result = _invoke(repo) assert "initial" in result.output def test_text_short_id_length_is_19(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _append(repo, new_id=_SHA_A) result = _invoke(repo) tokens = self._short_tokens(result.output) for tok in tokens: assert len(tok) == 19, f"short ID token has wrong length: {tok!r}" # --------------------------------------------------------------------------- # Data integrity — JSON IDs # --------------------------------------------------------------------------- class TestJsonIds: """JSON new_id / old_id must be sha256:<64-hex> canonical form.""" def test_new_id_sha256_prefixed_in_json(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _append(repo, new_id=_SHA_A) data = json.loads(_invoke(repo, "--json").output) entry = data["entries"][0] assert entry["new_id"].startswith("sha256:"), \ f"new_id must have sha256: prefix, got {entry['new_id']!r}" def test_new_id_is_full_sha256_in_json(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _append(repo, new_id=_SHA_A) entry = json.loads(_invoke(repo, "--json").output)["entries"][0] assert _SHA256_FULL.match(entry["new_id"]), \ f"new_id must be sha256:<64hex>, got {entry['new_id']!r}" def test_old_id_sha256_prefixed_in_json(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _append(repo, old_id=_SHA_B, new_id=_SHA_A) entry = json.loads(_invoke(repo, "--json").output)["entries"][0] assert entry["old_id"].startswith("sha256:") def test_old_id_is_full_sha256_in_json(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _append(repo, old_id=_SHA_B, new_id=_SHA_A) entry = json.loads(_invoke(repo, "--json").output)["entries"][0] assert _SHA256_FULL.match(entry["old_id"]) def test_null_old_id_sha256_zeros_in_json(self, tmp_path: pathlib.Path) -> None: """Initial commit: old_id = sha256:0000…0000 (64 zeros).""" repo = _make_repo(tmp_path) _append(repo, old_id=_NULL_ID) entry = json.loads(_invoke(repo, "--json").output)["entries"][0] assert entry["old_id"] == NULL_LONG_ID def test_new_id_value_round_trips(self, tmp_path: pathlib.Path) -> None: """sha256: prefix wraps the exact bare hex stored in the reflog.""" repo = _make_repo(tmp_path) _append(repo, new_id=_SHA_B) entry = json.loads(_invoke(repo, "--json").output)["entries"][0] assert entry["new_id"] == long_id(_SHA_B) # --------------------------------------------------------------------------- # Integration — duration_ms and exit_code # --------------------------------------------------------------------------- class TestDurationAndExitCode: def test_duration_ms_present_in_json(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _append(repo) data = json.loads(_invoke(repo, "--json").output) assert "duration_ms" in data def test_exit_code_zero_on_success(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _append(repo) data = json.loads(_invoke(repo, "--json").output) assert data["exit_code"] == 0 def test_duration_ms_is_float(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _append(repo) data = json.loads(_invoke(repo, "--json").output) assert isinstance(data["duration_ms"], float) def test_duration_ms_non_negative(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _append(repo) assert json.loads(_invoke(repo, "--json").output)["duration_ms"] >= 0.0 def test_duration_ms_3dp_precision(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _append(repo) ms = json.loads(_invoke(repo, "--json").output)["duration_ms"] assert round(ms, 3) == ms def test_all_json_has_duration_ms(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _append(repo, branch="main") data = json.loads(_invoke(repo, "--all", "--json").output) assert "duration_ms" in data assert data["exit_code"] == 0 def test_filtered_json_has_duration_ms(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _append(repo, operation="commit: feature") _append(repo, operation="checkout: dev", timestamp=_TS + datetime.timedelta(seconds=1)) data = json.loads(_invoke(repo, "--json", "--operation", "commit").output) assert "duration_ms" in data assert data["exit_code"] == 0 def test_empty_reflog_json_has_duration_ms(self, tmp_path: pathlib.Path) -> None: """Even with no entries the JSON output must include duration_ms.""" repo = _make_repo(tmp_path) data = json.loads(_invoke(repo, "--json").output) assert "duration_ms" in data assert data["exit_code"] == 0 # --------------------------------------------------------------------------- # Filter behaviour # --------------------------------------------------------------------------- class TestFilterBehaviour: def test_total_reflects_post_filter_count(self, tmp_path: pathlib.Path) -> None: """total in JSON is the number of entries that pass all filters, before --limit is applied.""" repo = _make_repo(tmp_path) for i in range(5): _append(repo, operation="commit: work", timestamp=_TS + datetime.timedelta(seconds=i)) for i in range(3): _append(repo, operation="checkout: branch", timestamp=_TS + datetime.timedelta(seconds=10 + i)) data = json.loads(_invoke(repo, "--json", "--operation", "commit", "--limit", "2").output) assert data["total"] == 5, "total must count all matching entries, not just displayed" assert len(data["entries"]) == 2, "entries must be capped by --limit" def test_since_until_single_day(self, tmp_path: pathlib.Path) -> None: """--since and --until set to same day returns entries on that day.""" repo = _make_repo(tmp_path) day = datetime.datetime(2026, 3, 10, tzinfo=datetime.timezone.utc) _append(repo, operation="commit: on-day", timestamp=day) _append(repo, operation="commit: day-before", timestamp=day - datetime.timedelta(days=1)) _append(repo, operation="commit: day-after", timestamp=day + datetime.timedelta(days=1)) data = json.loads( _invoke(repo, "--json", "--since", "2026-03-10", "--until", "2026-03-10").output ) assert data["total"] == 1 assert data["entries"][0]["operation"] == "commit: on-day" def test_since_after_until_errors(self, tmp_path: pathlib.Path) -> None: """--since after --until must exit USER_ERROR.""" repo = _make_repo(tmp_path) result = _invoke(repo, "--since", "2026-06-01", "--until", "2026-01-01") assert result.exit_code == ExitCode.USER_ERROR def test_limit_applied_after_all_filters(self, tmp_path: pathlib.Path) -> None: """--limit caps displayed entries but total reflects full filtered count.""" repo = _make_repo(tmp_path) for i in range(10): _append(repo, operation="commit: x", timestamp=_TS + datetime.timedelta(seconds=i)) data = json.loads(_invoke(repo, "--json", "--limit", "3").output) assert data["total"] == 10 assert len(data["entries"]) == 3 assert data["limit"] == 3 def test_operation_and_author_filters_combined(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _append(repo, author="alice", operation="commit: feature") _append(repo, author="bob", operation="commit: feature", timestamp=_TS + datetime.timedelta(seconds=1)) _append(repo, author="alice", operation="checkout: main", timestamp=_TS + datetime.timedelta(seconds=2)) data = json.loads( _invoke(repo, "--json", "--operation", "commit", "--author", "alice").output ) assert data["total"] == 1 assert data["entries"][0]["author"] == "alice" assert "commit" in data["entries"][0]["operation"] # --------------------------------------------------------------------------- # Security # --------------------------------------------------------------------------- class TestSecuritySupercharge: def test_ansi_in_new_id_sanitized_in_text(self, tmp_path: pathlib.Path) -> None: """ANSI in a stored new_id is stripped before terminal output.""" repo = _make_repo(tmp_path) malicious_id = f"\x1b[31m{'a' * 60}" # starts with ANSI, then hex _append(repo, new_id=malicious_id) result = _invoke(repo) assert result.exit_code == 0 assert "\x1b" not in result.output def test_no_traceback_on_bad_format(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) result = _invoke(repo, "--format", "msgpack") assert result.exit_code in (ExitCode.USER_ERROR, 2) assert "Traceback" not in result.output def test_no_traceback_on_bad_date(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) result = _invoke(repo, "--since", "not-a-date") assert result.exit_code == ExitCode.USER_ERROR assert "Traceback" not in result.output # --------------------------------------------------------------------------- # Performance # --------------------------------------------------------------------------- class TestPerformanceSupercharge: def test_empty_reflog_under_100ms(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) t0 = time.monotonic() result = _invoke(repo, "--json") duration_ms = (time.monotonic() - t0) * 1000 assert result.exit_code == 0 assert duration_ms < 100 def test_100_entries_under_500ms(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) for i in range(100): _append(repo, operation=f"commit: entry {i}", timestamp=_TS + datetime.timedelta(seconds=i)) t0 = time.monotonic() result = _invoke(repo, "--json", "--limit", "100") duration_ms = (time.monotonic() - t0) * 1000 assert result.exit_code == 0 assert duration_ms < 500 def test_duration_ms_plausible(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _append(repo) data = json.loads(_invoke(repo, "--json").output) assert data["duration_ms"] < 500