test_core_reflog.py
file-level
1
files
1
commits
0
hotspots
0
🧊 dead
0
💥 blast risk
| 1 | """Tests for muse/core/reflog.py — reflog append, read, parse.""" |
| 2 | |
| 3 | from __future__ import annotations |
| 4 | |
| 5 | import datetime |
| 6 | import pathlib |
| 7 | |
| 8 | import pytest |
| 9 | |
| 10 | from muse.core.reflog import ( |
| 11 | ReflogEntry, |
| 12 | append_reflog, |
| 13 | list_reflog_refs, |
| 14 | read_reflog, |
| 15 | ) |
| 16 | from muse.core.types import NULL_COMMIT_ID |
| 17 | from muse.core.paths import logs_dir |
| 18 | |
# Placeholder commit ids shared by the tests in this module: an alias for the
# imported NULL_COMMIT_ID plus three distinct 64-character ids.
_NULL_ID = NULL_COMMIT_ID
_SHA_A, _SHA_B, _SHA_C = (letter * 64 for letter in "abc")
| 23 | |
| 24 | |
| 25 | # --------------------------------------------------------------------------- |
| 26 | # append_reflog |
| 27 | # --------------------------------------------------------------------------- |
| 28 | |
| 29 | |
def test_append_creates_log_files(tmp_path: pathlib.Path) -> None:
    """One append must leave both the branch log file and the HEAD log file on disk."""
    append_reflog(
        tmp_path,
        "main",
        old_id=None,
        new_id=_SHA_A,
        author="Alice",
        operation="commit: init",
    )
    branch_log = logs_dir(tmp_path) / "refs" / "heads" / "main"
    assert branch_log.exists()
    head_log = logs_dir(tmp_path) / "HEAD"
    assert head_log.exists()
| 34 | |
| 35 | |
def test_append_null_old_id(tmp_path: pathlib.Path) -> None:
    """An entry appended with ``old_id=None`` reads back carrying the null commit id."""
    append_reflog(
        tmp_path,
        "main",
        old_id=None,
        new_id=_SHA_A,
        author="Alice",
        operation="commit: init",
    )
    log = read_reflog(tmp_path, "main")
    assert len(log) == 1
    only = log[0]
    assert only.old_id == _NULL_ID
    assert only.new_id == _SHA_A
| 42 | |
| 43 | |
def test_append_multiple_entries(tmp_path: pathlib.Path) -> None:
    """Three chained appends read back as three entries, newest first."""
    # (old_id, new_id, author, operation) for each append, in call order.
    steps = (
        (None, _SHA_A, "A", "commit: first"),
        (_SHA_A, _SHA_B, "B", "commit: second"),
        (_SHA_B, _SHA_C, "C", "commit: third"),
    )
    for previous, current, who, what in steps:
        append_reflog(tmp_path, "main", old_id=previous, new_id=current, author=who, operation=what)

    log = read_reflog(tmp_path, "main")
    assert len(log) == 3
    # Newest first.
    newest, middle, oldest = log[0], log[1], log[2]
    assert newest.new_id == _SHA_C
    assert middle.new_id == _SHA_B
    assert oldest.new_id == _SHA_A
| 54 | |
| 55 | |
def test_append_head_log_also_updated(tmp_path: pathlib.Path) -> None:
    """An append on a branch is also visible when reading with ``branch=None``."""
    append_reflog(
        tmp_path,
        "dev",
        old_id=_SHA_A,
        new_id=_SHA_B,
        author="X",
        operation="checkout: moving",
    )
    head_log = read_reflog(tmp_path, branch=None)
    assert len(head_log) == 1
    latest = head_log[0]
    assert latest.new_id == _SHA_B
| 61 | |
| 62 | |
def test_append_operation_preserved(tmp_path: pathlib.Path) -> None:
    """The operation string written by an append is read back unchanged."""
    expected = "merge: feat/audio into main"
    append_reflog(
        tmp_path,
        "main",
        old_id=_SHA_A,
        new_id=_SHA_B,
        author="Alice",
        operation=expected,
    )
    latest = read_reflog(tmp_path, "main")[0]
    assert latest.operation == expected
| 68 | |
| 69 | |
def test_append_author_preserved(tmp_path: pathlib.Path) -> None:
    """The author's name is still present in the entry read back from the log."""
    append_reflog(
        tmp_path,
        "main",
        old_id=None,
        new_id=_SHA_A,
        author="Alice <[email protected]>",
        operation="commit: x",
    )
    recorded_author = read_reflog(tmp_path, "main")[0].author
    # Substring check only: the test does not require the full author string back.
    assert "Alice" in recorded_author
| 74 | |
| 75 | |
| 76 | # --------------------------------------------------------------------------- |
| 77 | # read_reflog |
| 78 | # --------------------------------------------------------------------------- |
| 79 | |
| 80 | |
def test_read_returns_empty_for_missing_log(tmp_path: pathlib.Path) -> None:
    """Reading a branch that was never appended to yields an empty list."""
    assert read_reflog(tmp_path, "nonexistent") == []
| 84 | |
| 85 | |
def test_read_limit(tmp_path: pathlib.Path) -> None:
    """With ten entries on disk, ``limit=3`` returns exactly three of them."""
    total_written = 10
    for index in range(total_written):
        append_reflog(
            tmp_path,
            "main",
            old_id=None,
            new_id=_SHA_A,
            author="A",
            operation=f"commit: {index}",
        )
    limited = read_reflog(tmp_path, "main", limit=3)
    assert len(limited) == 3
| 91 | |
| 92 | |
def test_read_head_log(tmp_path: pathlib.Path) -> None:
    """Reading with ``branch=None`` returns the single entry appended via a branch."""
    append_reflog(
        tmp_path,
        "main",
        old_id=None,
        new_id=_SHA_A,
        author="A",
        operation="commit: x",
    )
    head_log = read_reflog(tmp_path, branch=None)
    assert len(head_log) == 1
| 97 | |
| 98 | |
def test_read_timestamp_is_utc_datetime(tmp_path: pathlib.Path) -> None:
    """The timestamp read back is a timezone-aware ``datetime`` expressed in UTC."""
    append_reflog(tmp_path, "main", old_id=None, new_id=_SHA_A, author="A", operation="commit: x")
    entries = read_reflog(tmp_path, "main")
    stamp = entries[0].timestamp
    assert isinstance(stamp, datetime.datetime)
    assert stamp.tzinfo is not None
    # Being timezone-aware is not enough to honour the test's name: any fixed
    # offset would satisfy the check above. UTC means a zero offset.
    assert stamp.utcoffset() == datetime.timedelta(0)
| 104 | |
| 105 | |
| 106 | # --------------------------------------------------------------------------- |
| 107 | # list_reflog_refs |
| 108 | # --------------------------------------------------------------------------- |
| 109 | |
| 110 | |
def test_list_reflog_refs_empty(tmp_path: pathlib.Path) -> None:
    """With nothing appended, the ref listing is an empty list."""
    listed = list_reflog_refs(tmp_path)
    assert listed == []
| 113 | |
| 114 | |
def test_list_reflog_refs_returns_branch_names(tmp_path: pathlib.Path) -> None:
    """Every branch that received an append shows up by name in the listing."""
    append_reflog(
        tmp_path,
        "main",
        old_id=None,
        new_id=_SHA_A,
        author="A",
        operation="commit: x",
    )
    append_reflog(
        tmp_path,
        "dev",
        old_id=None,
        new_id=_SHA_B,
        author="B",
        operation="commit: y",
    )
    listed = list_reflog_refs(tmp_path)
    assert "main" in listed
    assert "dev" in listed
| 121 | |
| 122 | |
def test_list_reflog_refs_sorted(tmp_path: pathlib.Path) -> None:
    """Refs created out of alphabetical order are all listed, in sorted order."""
    names = ("zzz", "aaa", "mmm")
    for name in names:
        append_reflog(tmp_path, name, old_id=None, new_id=_SHA_A, author="A", operation="commit: x")
    refs = list_reflog_refs(tmp_path)
    # Guard against a vacuous pass: an empty or partial listing is trivially
    # "sorted", so first require that every branch written above is present.
    assert all(name in refs for name in names)
    assert refs == sorted(refs)
| 128 | |
| 129 | |
| 130 | # --------------------------------------------------------------------------- |
| 131 | # Stress test: many entries |
| 132 | # --------------------------------------------------------------------------- |
| 133 | |
| 134 | |
def test_stress_many_entries(tmp_path: pathlib.Path) -> None:
    """500 entries must round-trip correctly."""
    n = 500
    # Distinct, zero-padded 64-hex-digit ids so every entry is identifiable.
    shas = [format(i, "064x") for i in range(n)]
    for i, sha in enumerate(shas):
        append_reflog(tmp_path, "main", old_id=None, new_id=sha, author="A", operation=f"commit: {i}")
    entries = read_reflog(tmp_path, "main", limit=n)
    assert len(entries) == n
    # Newest first — last appended sha should be entries[0].
    assert entries[0].new_id == shas[-1]
    # Round-trip means every entry, not only the newest: the ids read back
    # must be exactly the ids written, in reverse append order.
    assert [entry.new_id for entry in entries] == shas[::-1]
| 145 | |
| 146 | |
| 147 | # --------------------------------------------------------------------------- |
| 148 | # Edge cases |
| 149 | # --------------------------------------------------------------------------- |
| 150 | |
| 151 | |
def test_entry_with_tab_in_operation(tmp_path: pathlib.Path) -> None:
    """The operation text of an appended entry is read back unchanged.

    NOTE(review): the test name refers to tab characters, but the operation
    literal below, as it appears here, contains only spaces. Confirm whether
    it was meant to contain tabs.
    """
    expected = "commit: message with some text"
    append_reflog(
        tmp_path,
        "main",
        old_id=None,
        new_id=_SHA_A,
        author="A",
        operation=expected,
    )
    latest = read_reflog(tmp_path, "main")[0]
    assert latest.operation == expected
| 158 | |
| 159 | |
def test_multiple_branches_isolated(tmp_path: pathlib.Path) -> None:
    """Each branch's log holds only the entry appended to that branch."""
    append_reflog(
        tmp_path,
        "main",
        old_id=None,
        new_id=_SHA_A,
        author="A",
        operation="commit: main",
    )
    append_reflog(
        tmp_path,
        "dev",
        old_id=None,
        new_id=_SHA_B,
        author="B",
        operation="commit: dev",
    )
    main_log = read_reflog(tmp_path, "main")
    dev_log = read_reflog(tmp_path, "dev")
    # Check both lengths before indexing, as one entry per branch is expected.
    assert len(main_log) == 1
    assert len(dev_log) == 1
    assert main_log[0].new_id == _SHA_A
    assert dev_log[0].new_id == _SHA_B