test_reflog_supercharge.py
python
sha256:d11a87833d5fad6059b7662844bf5448a8911a17cce7a51811f71ad394f248eb
bump to v0.2.0rc13
Human
patch
6 days ago
| 1 | """Supercharge tests for ``muse reflog``. |
| 2 | |
| 3 | Coverage tiers |
| 4 | -------------- |
| 5 | - Unit: _short_id helper — bare hex and sha256:-prefixed inputs |
| 6 | - Integration: duration_ms + exit_code in both JSON output paths |
| 7 | - Data integrity: new_id/old_id sha256:-prefixed in JSON; text short IDs |
| 8 | - Filter behaviour: total reflects post-filter count; date range edge cases |
| 9 | - Security: null-ID shown as sha256:000…, ANSI in IDs sanitised in text |
| 10 | - Performance: empty reflog and 100-entry reflog timing |
| 11 | """ |
| 12 | from __future__ import annotations |
| 13 | |
| 14 | import datetime |
| 15 | import json |
| 16 | import pathlib |
| 17 | import re |
| 18 | import time |
| 19 | |
| 20 | from muse.core.errors import ExitCode |
| 21 | from muse.core.reflog import append_reflog |
| 22 | from tests.cli_test_helper import CliRunner, InvokeResult |
| 23 | from muse.core.types import NULL_COMMIT_ID, NULL_LONG_ID, long_id, fake_id, short_id as _short_id |
| 24 | from muse.core.paths import logs_dir, muse_dir |
| 25 | |
# Module-wide CLI runner; ``_invoke`` routes every command through it.
runner = CliRunner()

# Fixture IDs reused across the tests below.
_NULL_ID = NULL_COMMIT_ID
_SHA_A, _SHA_B = (long_id(letter * 64) for letter in "ab")

# Patterns for the full (64 hex digits) and short (12 hex digits) ID renderings.
_SHA256_FULL = re.compile(r"^sha256:[0-9a-f]{64}$")
_SHA256_SHORT_19 = re.compile(r"^sha256:[0-9a-f]{12}$")

# Fixed, timezone-aware base timestamp for entries written with an explicit time.
_TS = datetime.datetime(
    year=2026, month=1, day=15, hour=12, minute=0,
    tzinfo=datetime.timezone.utc,
)
| 36 | |
| 37 | |
| 38 | # --------------------------------------------------------------------------- |
| 39 | # Helpers |
| 40 | # --------------------------------------------------------------------------- |
| 41 | |
| 42 | |
def _make_repo(tmp_path: pathlib.Path) -> pathlib.Path:
    """Create a minimal repository skeleton under *tmp_path* and return its root.

    The skeleton lives in the directory returned by ``muse_dir`` and consists
    of the object/commit/snapshot/ref/log subdirectories, a ``HEAD`` file
    pointing at ``refs/heads/main`` and a ``repo.json`` descriptor.
    """
    root = tmp_path / "repo"
    meta = muse_dir(root)

    subdirs = (
        "objects", "commits", "snapshots", "refs/heads",
        "logs/refs/heads", "logs",
    )
    for name in subdirs:
        (meta / name).mkdir(parents=True, exist_ok=True)

    descriptor = {"repo_id": "test-repo", "domain": "code"}
    (meta / "HEAD").write_text("ref: refs/heads/main")
    (meta / "repo.json").write_text(json.dumps(descriptor))
    return root
| 52 | |
| 53 | |
def _append(
    repo: pathlib.Path,
    *,
    branch: str = "main",
    old_id: str = _NULL_ID,
    new_id: str = _SHA_A,
    author: str = "gabriel",
    operation: str = "commit: test",
    timestamp: datetime.datetime | None = None,
) -> None:
    """Write one reflog entry.

    When *timestamp* is given, write the raw log line directly so tests can
    control the stored timestamp precisely. Otherwise delegate to
    ``append_reflog`` which stamps with the current time.

    Args:
        repo: Repository root, as returned by ``_make_repo``.
        branch: Branch whose log also receives the line; an empty string
            writes to the ``HEAD`` log only (raw path).
        old_id: ID recorded as the previous value.
        new_id: ID recorded as the new value.
        author: Author field; CR, LF and TAB are stripped on the raw path.
        operation: Operation text; CR and LF are stripped on the raw path.
        timestamp: Explicit entry time. It is written with a literal
            ``+0000`` offset, so pass a UTC-aware datetime.
    """
    if timestamp is None:
        append_reflog(repo, branch, old_id=old_id, new_id=new_id,
                      author=author, operation=operation)
        return

    # Build the raw log line (intended to mirror append_reflog's format).
    ts_unix = int(timestamp.timestamp())
    safe_op = operation.replace("\n", "").replace("\r", "")
    safe_author = author.replace("\n", "").replace("\r", "").replace("\t", "")
    line = f"{old_id} {new_id} {safe_author} {ts_unix} +0000\t{safe_op}\n"

    # The line goes to the HEAD log and, when a branch is named, to that
    # branch's log as well.
    log_dir = logs_dir(repo)
    targets = [log_dir / "HEAD"]
    if branch:
        targets.append(log_dir / "refs" / "heads" / branch)

    for target in targets:
        # Branch names may contain "/" (e.g. "feature/x"); make sure the
        # containing directory exists before opening the file.
        target.parent.mkdir(parents=True, exist_ok=True)
        # Append mode avoids re-reading and rewriting the whole log for
        # every entry, which was quadratic for the multi-entry tests.
        with target.open("a", encoding="utf-8") as handle:
            handle.write(line)
| 89 | |
| 90 | |
def _invoke(repo: pathlib.Path, *args: str) -> InvokeResult:
    """Run the ``reflog`` subcommand with *args* against *repo*.

    The CLI entry point is imported at call time, and the repository is
    selected through the ``MUSE_REPO_ROOT`` environment variable.
    """
    from muse.cli.app import main as entry_point

    argv = ["reflog"]
    argv.extend(args)
    environment = {"MUSE_REPO_ROOT": str(repo)}
    return runner.invoke(entry_point, argv, env=environment)
| 94 | |
| 95 | |
| 96 | # --------------------------------------------------------------------------- |
| 97 | # Unit — _short_id |
| 98 | # --------------------------------------------------------------------------- |
| 99 | |
| 100 | |
class TestShortId:
    """Unit checks for ``short_id`` (imported as ``_short_id``).

    The helper is expected to cope with bare-hex input (the reflog on-disk
    format) as well as sha256:-prefixed input.
    """

    # NOTE(review): _SHA_A and _SHA_B are built with long_id(...) at module
    # level, so the "bare hex" tests below may actually be passing prefixed
    # input — confirm against long_id/short_id.

    def test_bare_hex_prepends_sha256_prefix(self) -> None:
        assert _short_id(_SHA_A).startswith("sha256:")

    def test_bare_hex_12_hex_chars_after_prefix(self) -> None:
        shortened = _short_id(_SHA_A)
        expected = long_id("a" * 12)
        assert shortened == expected

    def test_bare_hex_total_length_is_19(self) -> None:
        shortened = _short_id(_SHA_B)
        assert len(shortened) == 19

    def test_sha256_prefixed_input_handled(self) -> None:
        shortened = _short_id(long_id("deadbeef" * 8))
        assert shortened.startswith("sha256:")
        assert len(shortened) == 19

    def test_null_id_shows_zeros(self) -> None:
        assert _short_id(_NULL_ID) == "0" * 12

    def test_matches_short_regex(self) -> None:
        shortened = _short_id(_SHA_A)
        assert _SHA256_SHORT_19.match(shortened)
| 127 | |
| 128 | |
| 129 | # --------------------------------------------------------------------------- |
| 130 | # Integration — text format short IDs |
| 131 | # --------------------------------------------------------------------------- |
| 132 | |
| 133 | |
class TestTextFormatShortId:
    """Text format must show sha256:<12-hex> for new_id and old_id."""

    def _short_tokens(self, line: str) -> list[str]:
        """Return the whitespace-separated tokens of *line* matching the short-ID pattern."""
        return [tok for tok in line.split() if _SHA256_SHORT_19.match(tok)]

    def test_new_id_shown_as_sha256_short_in_text(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        _append(repo, new_id=_SHA_A)
        result = _invoke(repo)
        assert result.exit_code == 0
        tokens = self._short_tokens(result.output)
        assert any(t.startswith(long_id("a" * 12)) for t in tokens), \
            f"no sha256:aaa… token in text output:\n{result.output}"

    def test_old_id_shown_as_sha256_short_in_text(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        _append(repo, old_id=_SHA_B, new_id=_SHA_A)
        result = _invoke(repo)
        assert result.exit_code == 0
        assert long_id("b" * 12) in result.output, \
            f"sha256:bbb… not in text output:\n{result.output}"

    def test_initial_entry_shows_initial_keyword(self, tmp_path: pathlib.Path) -> None:
        """Null old_id must render as 'initial', not sha256:000…."""
        repo = _make_repo(tmp_path)
        _append(repo, old_id=_NULL_ID)
        result = _invoke(repo)
        # Check the exit code first so a failed command is reported as such
        # instead of as a missing keyword.
        assert result.exit_code == 0
        assert "initial" in result.output

    def test_text_short_id_length_is_19(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        _append(repo, new_id=_SHA_A)
        result = _invoke(repo)
        assert result.exit_code == 0
        tokens = self._short_tokens(result.output)
        # Without this guard the loop below passes vacuously when the output
        # contains no short-ID token at all.
        assert tokens, f"no short ID token in text output:\n{result.output}"
        for tok in tokens:
            assert len(tok) == 19, f"short ID token has wrong length: {tok!r}"
| 171 | |
| 172 | |
| 173 | # --------------------------------------------------------------------------- |
| 174 | # Data integrity — JSON IDs |
| 175 | # --------------------------------------------------------------------------- |
| 176 | |
| 177 | |
class TestJsonIds:
    """JSON new_id / old_id must be in the canonical sha256:<64-hex> form."""

    @staticmethod
    def _first_entry(repo: pathlib.Path) -> dict:
        """Run ``reflog --json`` on *repo* and return the first listed entry."""
        payload = json.loads(_invoke(repo, "--json").output)
        return payload["entries"][0]

    def test_new_id_sha256_prefixed_in_json(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        _append(repo, new_id=_SHA_A)
        first = self._first_entry(repo)
        assert first["new_id"].startswith("sha256:"), \
            f"new_id must have sha256: prefix, got {first['new_id']!r}"

    def test_new_id_is_full_sha256_in_json(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        _append(repo, new_id=_SHA_A)
        first = self._first_entry(repo)
        assert _SHA256_FULL.match(first["new_id"]), \
            f"new_id must be sha256:<64hex>, got {first['new_id']!r}"

    def test_old_id_sha256_prefixed_in_json(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        _append(repo, old_id=_SHA_B, new_id=_SHA_A)
        first = self._first_entry(repo)
        assert first["old_id"].startswith("sha256:")

    def test_old_id_is_full_sha256_in_json(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        _append(repo, old_id=_SHA_B, new_id=_SHA_A)
        first = self._first_entry(repo)
        assert _SHA256_FULL.match(first["old_id"])

    def test_null_old_id_sha256_zeros_in_json(self, tmp_path: pathlib.Path) -> None:
        """For an initial entry, old_id must equal NULL_LONG_ID."""
        repo = _make_repo(tmp_path)
        _append(repo, old_id=_NULL_ID)
        first = self._first_entry(repo)
        assert first["old_id"] == NULL_LONG_ID

    def test_new_id_value_round_trips(self, tmp_path: pathlib.Path) -> None:
        """The new_id reported in JSON equals long_id() of the ID that was appended."""
        repo = _make_repo(tmp_path)
        _append(repo, new_id=_SHA_B)
        first = self._first_entry(repo)
        assert first["new_id"] == long_id(_SHA_B)
| 221 | |
| 222 | |
| 223 | # --------------------------------------------------------------------------- |
| 224 | # Integration — duration_ms and exit_code |
| 225 | # --------------------------------------------------------------------------- |
| 226 | |
| 227 | |
class TestDurationAndExitCode:
    """JSON output must carry ``duration_ms`` and ``exit_code`` on every path exercised here."""

    @staticmethod
    def _seeded_repo(tmp_path: pathlib.Path) -> pathlib.Path:
        """Return a fresh repository holding one default reflog entry."""
        repo = _make_repo(tmp_path)
        _append(repo)
        return repo

    @staticmethod
    def _json(repo: pathlib.Path, *args: str) -> dict:
        """Invoke ``reflog`` with *args* and decode its output as JSON."""
        return json.loads(_invoke(repo, *args).output)

    def test_duration_ms_present_in_json(self, tmp_path: pathlib.Path) -> None:
        payload = self._json(self._seeded_repo(tmp_path), "--json")
        assert "duration_ms" in payload

    def test_exit_code_zero_on_success(self, tmp_path: pathlib.Path) -> None:
        payload = self._json(self._seeded_repo(tmp_path), "--json")
        assert payload["exit_code"] == 0

    def test_duration_ms_is_float(self, tmp_path: pathlib.Path) -> None:
        payload = self._json(self._seeded_repo(tmp_path), "--json")
        assert isinstance(payload["duration_ms"], float)

    def test_duration_ms_non_negative(self, tmp_path: pathlib.Path) -> None:
        payload = self._json(self._seeded_repo(tmp_path), "--json")
        assert payload["duration_ms"] >= 0.0

    def test_duration_ms_3dp_precision(self, tmp_path: pathlib.Path) -> None:
        elapsed = self._json(self._seeded_repo(tmp_path), "--json")["duration_ms"]
        # Rounding to three decimals must be a no-op.
        assert round(elapsed, 3) == elapsed

    def test_all_json_has_duration_ms(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        _append(repo, branch="main")
        payload = self._json(repo, "--all", "--json")
        assert "duration_ms" in payload
        assert payload["exit_code"] == 0

    def test_filtered_json_has_duration_ms(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        _append(repo, operation="commit: feature")
        later = _TS + datetime.timedelta(seconds=1)
        _append(repo, operation="checkout: dev", timestamp=later)
        payload = self._json(repo, "--json", "--operation", "commit")
        assert "duration_ms" in payload
        assert payload["exit_code"] == 0

    def test_empty_reflog_json_has_duration_ms(self, tmp_path: pathlib.Path) -> None:
        """Even with no entries the JSON output must include duration_ms."""
        payload = self._json(_make_repo(tmp_path), "--json")
        assert "duration_ms" in payload
        assert payload["exit_code"] == 0
| 280 | |
| 281 | |
| 282 | # --------------------------------------------------------------------------- |
| 283 | # Filter behaviour |
| 284 | # --------------------------------------------------------------------------- |
| 285 | |
| 286 | |
class TestFilterBehaviour:
    """Filter semantics of the JSON output: totals, date ranges, limits, combined filters."""

    def test_total_reflects_post_filter_count(self, tmp_path: pathlib.Path) -> None:
        """total in JSON counts the entries passing all filters, before --limit applies."""
        repo = _make_repo(tmp_path)
        for offset in range(5):
            stamp = _TS + datetime.timedelta(seconds=offset)
            _append(repo, operation="commit: work", timestamp=stamp)
        for offset in range(10, 13):
            stamp = _TS + datetime.timedelta(seconds=offset)
            _append(repo, operation="checkout: branch", timestamp=stamp)
        result = _invoke(repo, "--json", "--operation", "commit", "--limit", "2")
        data = json.loads(result.output)
        assert data["total"] == 5, "total must count all matching entries, not just displayed"
        assert len(data["entries"]) == 2, "entries must be capped by --limit"

    def test_since_until_single_day(self, tmp_path: pathlib.Path) -> None:
        """--since and --until set to the same day return the entries on that day."""
        repo = _make_repo(tmp_path)
        day = datetime.datetime(2026, 3, 10, tzinfo=datetime.timezone.utc)
        one_day = datetime.timedelta(days=1)
        _append(repo, operation="commit: on-day", timestamp=day)
        _append(repo, operation="commit: day-before", timestamp=day - one_day)
        _append(repo, operation="commit: day-after", timestamp=day + one_day)
        result = _invoke(repo, "--json", "--since", "2026-03-10", "--until", "2026-03-10")
        data = json.loads(result.output)
        assert data["total"] == 1
        assert data["entries"][0]["operation"] == "commit: on-day"

    def test_since_after_until_errors(self, tmp_path: pathlib.Path) -> None:
        """--since after --until must exit USER_ERROR."""
        repo = _make_repo(tmp_path)
        outcome = _invoke(repo, "--since", "2026-06-01", "--until", "2026-01-01")
        assert outcome.exit_code == ExitCode.USER_ERROR

    def test_limit_applied_after_all_filters(self, tmp_path: pathlib.Path) -> None:
        """--limit caps displayed entries but total reflects the full filtered count."""
        repo = _make_repo(tmp_path)
        for offset in range(10):
            stamp = _TS + datetime.timedelta(seconds=offset)
            _append(repo, operation="commit: x", timestamp=stamp)
        result = _invoke(repo, "--json", "--limit", "3")
        data = json.loads(result.output)
        assert data["total"] == 10
        assert len(data["entries"]) == 3
        assert data["limit"] == 3

    def test_operation_and_author_filters_combined(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        _append(repo, author="alice", operation="commit: feature")
        _append(repo, author="bob", operation="commit: feature",
                timestamp=_TS + datetime.timedelta(seconds=1))
        _append(repo, author="alice", operation="checkout: main",
                timestamp=_TS + datetime.timedelta(seconds=2))
        result = _invoke(repo, "--json", "--operation", "commit", "--author", "alice")
        data = json.loads(result.output)
        assert data["total"] == 1
        only = data["entries"][0]
        assert only["author"] == "alice"
        assert "commit" in only["operation"]
| 347 | |
| 348 | |
| 349 | # --------------------------------------------------------------------------- |
| 350 | # Security |
| 351 | # --------------------------------------------------------------------------- |
| 352 | |
| 353 | |
class TestSecuritySupercharge:
    """Hostile or malformed input must not reach the terminal raw or surface a traceback."""

    def test_ansi_in_new_id_sanitized_in_text(self, tmp_path: pathlib.Path) -> None:
        """An ANSI escape in a new_id must not appear in the text output."""
        repo = _make_repo(tmp_path)
        # An escape sequence followed by 60 hex-looking characters.
        malicious_id = "\x1b[31m" + "a" * 60
        _append(repo, new_id=malicious_id)
        outcome = _invoke(repo)
        assert outcome.exit_code == 0
        assert "\x1b" not in outcome.output

    def test_no_traceback_on_bad_format(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        outcome = _invoke(repo, "--format", "msgpack")
        assert outcome.exit_code in (ExitCode.USER_ERROR, 2)
        assert "Traceback" not in outcome.output

    def test_no_traceback_on_bad_date(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        outcome = _invoke(repo, "--since", "not-a-date")
        assert outcome.exit_code == ExitCode.USER_ERROR
        assert "Traceback" not in outcome.output
| 375 | |
| 376 | |
| 377 | # --------------------------------------------------------------------------- |
| 378 | # Performance |
| 379 | # --------------------------------------------------------------------------- |
| 380 | |
| 381 | |
class TestPerformanceSupercharge:
    """Coarse wall-clock budgets for ``reflog --json`` on small reflogs."""

    @staticmethod
    def _timed_invoke(repo: pathlib.Path, *args: str) -> tuple[InvokeResult, float]:
        """Invoke ``reflog`` with *args* and return ``(result, elapsed_ms)``.

        ``_invoke`` imports ``muse.cli.app`` lazily, so the module is loaded
        here before the clock starts. Otherwise the one-off import cost would
        be charged to whichever timed test happens to run first.
        """
        import muse.cli.app  # noqa: F401  (warm the import cache only)

        # perf_counter is the clock intended for measuring short durations.
        start = time.perf_counter()
        result = _invoke(repo, *args)
        elapsed_ms = (time.perf_counter() - start) * 1000
        return result, elapsed_ms

    def test_empty_reflog_under_100ms(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        result, duration_ms = self._timed_invoke(repo, "--json")
        assert result.exit_code == 0
        assert duration_ms < 100

    def test_100_entries_under_500ms(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        for i in range(100):
            _append(repo, operation=f"commit: entry {i}",
                    timestamp=_TS + datetime.timedelta(seconds=i))
        result, duration_ms = self._timed_invoke(repo, "--json", "--limit", "100")
        assert result.exit_code == 0
        assert duration_ms < 500

    def test_duration_ms_plausible(self, tmp_path: pathlib.Path) -> None:
        """The self-reported duration_ms must stay below 500."""
        repo = _make_repo(tmp_path)
        _append(repo)
        data = json.loads(_invoke(repo, "--json").output)
        assert data["duration_ms"] < 500
File History
1 commit
sha256:d11a87833d5fad6059b7662844bf5448a8911a17cce7a51811f71ad394f248eb
bump to v0.2.0rc13
Human
patch
6 days ago