test_core_blame.py
file-level
1
files
1
commits
0
hotspots
0
🧊 dead
0
💥 blast risk
| 1 | """Tests for muse/core/blame.py — line-level text attribution.""" |
| 2 | |
| 3 | from __future__ import annotations |
| 4 | |
| 5 | import datetime |
| 6 | import json |
| 7 | import pathlib |
| 8 | |
| 9 | import pytest |
| 10 | |
| 11 | from muse.core.blame import BlameLine, blame_file |
| 12 | from muse.core.ids import hash_commit as compute_commit_id, hash_snapshot as compute_snapshot_id |
| 13 | from muse.core.commits import ( |
| 14 | CommitRecord, |
| 15 | write_commit, |
| 16 | ) |
| 17 | from muse.core.snapshots import ( |
| 18 | SnapshotRecord, |
| 19 | write_snapshot, |
| 20 | ) |
| 21 | from muse.core.types import Manifest, blob_id |
| 22 | from muse.core.paths import muse_dir |
| 23 | |
| 24 | _BASE_DT = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc) |
| 25 | |
| 26 | |
| 27 | # --------------------------------------------------------------------------- |
| 28 | # Helpers |
| 29 | # --------------------------------------------------------------------------- |
| 30 | |
| 31 | |
def _write_object(repo: pathlib.Path, content: bytes) -> str:
    """Write *content* into the repo via ``write_object`` and return its blob ID.

    The ID is derived from the bytes with ``blob_id`` before the write, so the
    returned value is exactly the key the object was stored under.
    """
    from muse.core.object_store import write_object

    object_id = blob_id(content)
    write_object(repo, object_id, content)
    return object_id
| 37 | |
| 38 | |
def _write_snapshot(repo: pathlib.Path, manifest: Manifest) -> str:
    """Persist *manifest* as a snapshot record and return the snapshot ID.

    The ID is computed from the manifest itself (``compute_snapshot_id``)
    rather than being made up by the test.
    """
    snapshot_id = compute_snapshot_id(manifest)
    record = SnapshotRecord(snapshot_id=snapshot_id, manifest=manifest)
    write_snapshot(repo, record)
    return snapshot_id
| 44 | |
| 45 | |
def _write_commit(
    repo: pathlib.Path,
    snap_id: str,
    message: str = "test",
    parent: str | None = None,
    author: str = "Author",
    committed_at: datetime.datetime | None = None,
) -> str:
    """Write a commit on branch ``main`` and return its computed commit ID.

    Args:
        repo: Repository root the commit is written into.
        snap_id: ID of the snapshot the commit points at.
        message: Commit message.
        parent: Parent commit ID, or ``None`` for a root commit.
        author: Author name recorded on the commit.
        committed_at: Commit timestamp; falls back to ``_BASE_DT`` when omitted.

    Returns:
        The commit ID produced by ``compute_commit_id`` for these fields.
    """
    timestamp = _BASE_DT if committed_at is None else committed_at
    # A falsy parent (None or "") contributes no parent IDs to the hash.
    ancestors = [parent] if parent else []
    commit_id = compute_commit_id(
        parent_ids=ancestors,
        snapshot_id=snap_id,
        message=message,
        committed_at_iso=timestamp.isoformat(),
        author=author,
    )
    record = CommitRecord(
        commit_id=commit_id,
        branch="main",
        snapshot_id=snap_id,
        message=message,
        committed_at=timestamp,
        parent_commit_id=parent,
        author=author,
    )
    write_commit(repo, record)
    return commit_id
| 74 | |
| 75 | |
def _make_repo(tmp_path: pathlib.Path) -> pathlib.Path:
    """Lay out a minimal repository skeleton under *tmp_path* and return *tmp_path*.

    Creates the ``objects``, ``commits``, ``snapshots`` and ``refs/heads``
    directories inside ``muse_dir(tmp_path)``, then writes ``repo.json`` and a
    ``HEAD`` file pointing at ``refs/heads/main``.
    """
    store = muse_dir(tmp_path)
    subdirs = ("objects", "commits", "snapshots", "refs/heads")
    for name in subdirs:
        (store / name).mkdir(parents=True, exist_ok=True)

    repo_config = json.dumps({"repo_id": "test-repo"})
    (store / "repo.json").write_text(repo_config)
    (store / "HEAD").write_text("ref: refs/heads/main\n")
    return tmp_path
| 83 | |
| 84 | |
| 85 | # --------------------------------------------------------------------------- |
| 86 | # Tests |
| 87 | # --------------------------------------------------------------------------- |
| 88 | |
| 89 | |
def test_blame_returns_none_for_missing_file(tmp_path: pathlib.Path) -> None:
    """blame_file yields None when the path is not in the commit's manifest."""
    repo = _make_repo(tmp_path)
    empty_snapshot = _write_snapshot(repo, {})  # manifest tracks no files
    head = _write_commit(repo, empty_snapshot)

    assert blame_file(repo, "nonexistent.txt", head) is None
| 97 | |
| 98 | |
def test_blame_single_commit_all_lines_attributed(tmp_path: pathlib.Path) -> None:
    """With a single commit, every blamed line is a BlameLine owned by that commit."""
    repo = _make_repo(tmp_path)
    blob = _write_object(repo, b"line one\nline two\nline three\n")
    snapshot = _write_snapshot(repo, {"readme.txt": blob})
    head = _write_commit(repo, snapshot, message="initial commit", author="Alice")

    blamed = blame_file(repo, "readme.txt", head)

    assert blamed is not None
    assert len(blamed) == 3
    for entry in blamed:
        assert isinstance(entry, BlameLine)
        assert entry.commit_id == head
| 112 | |
| 113 | |
def test_blame_line_numbers_are_1_indexed(tmp_path: pathlib.Path) -> None:
    """Blamed lines of a three-line file are numbered 1, 2, 3."""
    repo = _make_repo(tmp_path)
    blob = _write_object(repo, b"a\nb\nc\n")
    snapshot = _write_snapshot(repo, {"f.txt": blob})
    head = _write_commit(repo, snapshot)

    blamed = blame_file(repo, "f.txt", head)

    assert blamed is not None
    line_numbers = [entry.lineno for entry in blamed]
    assert line_numbers == [1, 2, 3]
| 124 | |
| 125 | |
def test_blame_content_matches_file(tmp_path: pathlib.Path) -> None:
    """Each blamed line carries the file's text for that line, without the newline."""
    repo = _make_repo(tmp_path)
    blob = _write_object(repo, b"hello\nworld\n")
    snapshot = _write_snapshot(repo, {"f.txt": blob})
    head = _write_commit(repo, snapshot)

    blamed = blame_file(repo, "f.txt", head)

    assert blamed is not None
    first_line = blamed[0]
    assert first_line.content == "hello"
    second_line = blamed[1]
    assert second_line.content == "world"
| 137 | |
| 138 | |
def test_blame_empty_file_returns_empty_list(tmp_path: pathlib.Path) -> None:
    """A tracked zero-byte file blames to an empty list (not None)."""
    repo = _make_repo(tmp_path)
    empty_blob = _write_object(repo, b"")
    snapshot = _write_snapshot(repo, {"empty.txt": empty_blob})
    head = _write_commit(repo, snapshot)

    blamed = blame_file(repo, "empty.txt", head)

    assert blamed == []
| 148 | |
| 149 | |
def test_blame_two_commits_attributes_older_lines_correctly(tmp_path: pathlib.Path) -> None:
    """Lines carried over unchanged keep the older commit; the added line gets the newer one."""
    repo = _make_repo(tmp_path)

    # First commit (Alice): the file starts out with two lines.
    first_blob = _write_object(repo, b"original line 1\noriginal line 2\n")
    first_snapshot = _write_snapshot(repo, {"f.txt": first_blob})
    first_commit = _write_commit(
        repo,
        first_snapshot,
        message="initial",
        author="Alice",
        committed_at=_BASE_DT,
    )

    # Second commit (Bob), one hour later: same two lines plus an appended third.
    one_hour_later = _BASE_DT + datetime.timedelta(hours=1)
    second_blob = _write_object(repo, b"original line 1\noriginal line 2\nnew line 3\n")
    second_snapshot = _write_snapshot(repo, {"f.txt": second_blob})
    second_commit = _write_commit(
        repo,
        second_snapshot,
        message="add line 3",
        parent=first_commit,
        author="Bob",
        committed_at=one_hour_later,
    )

    blamed = blame_file(repo, "f.txt", second_commit)

    assert blamed is not None
    assert len(blamed) == 3
    # The two pre-existing lines belong to the first commit ...
    assert blamed[0].commit_id == first_commit
    assert blamed[1].commit_id == first_commit
    # ... and only the appended line belongs to the second.
    assert blamed[2].commit_id == second_commit
| 180 | |
| 181 | |
def test_blame_author_populated(tmp_path: pathlib.Path) -> None:
    """The blamed line exposes the author recorded on its commit."""
    repo = _make_repo(tmp_path)
    blob = _write_object(repo, b"line\n")
    snapshot = _write_snapshot(repo, {"f.txt": blob})
    head = _write_commit(repo, snapshot, author="Carol")

    blamed = blame_file(repo, "f.txt", head)

    assert blamed is not None
    only_line = blamed[0]
    assert only_line.author == "Carol"
| 191 | |
| 192 | |
def test_blame_message_is_first_line_of_commit_message(tmp_path: pathlib.Path) -> None:
    """Only the subject line of a multi-line commit message appears in blame output."""
    repo = _make_repo(tmp_path)
    blob = _write_object(repo, b"line\n")
    snapshot = _write_snapshot(repo, {"f.txt": blob})
    full_message = "feat: add feature\n\nLong body here."
    head = _write_commit(repo, snapshot, message=full_message)

    blamed = blame_file(repo, "f.txt", head)

    assert blamed is not None
    only_line = blamed[0]
    assert only_line.message == "feat: add feature"
| 202 | |
| 203 | |
| 204 | # --------------------------------------------------------------------------- |
| 205 | # Stress |
| 206 | # --------------------------------------------------------------------------- |
| 207 | |
| 208 | |
def test_blame_stress_100_line_file(tmp_path: pathlib.Path) -> None:
    """A 100-line single-commit file blames to 100 lines, all owned by that commit."""
    repo = _make_repo(tmp_path)
    # Every line, including the last, is newline-terminated.
    text = "".join(f"line {i}\n" for i in range(100))
    blob = _write_object(repo, text.encode())
    snapshot = _write_snapshot(repo, {"big.txt": blob})
    head = _write_commit(repo, snapshot)

    blamed = blame_file(repo, "big.txt", head)

    assert blamed is not None
    assert len(blamed) == 100
    assert all(entry.commit_id == head for entry in blamed)
| 221 | |
| 222 | |
| 223 | # --------------------------------------------------------------------------- |
| 224 | # Performance |
| 225 | # --------------------------------------------------------------------------- |
| 226 | |
| 227 | |
def test_walk_ancestry_delegates_to_iter_ancestors(tmp_path: pathlib.Path) -> None:
    """Guard against _walk_ancestry growing its own list-based traversal.

    This is a source-level check: the text of ``blame._walk_ancestry`` must
    mention ``iter_ancestors`` and must not contain ``pop(0)`` or
    ``insert(0`` — the front-of-list operations a hand-rolled walk would
    typically use. The traversal's own performance properties are expected to
    be covered by the graph module's tests, not here.
    """
    import inspect
    from muse.core import blame as blame_module

    walker_source = inspect.getsource(blame_module._walk_ancestry)

    delegates = "iter_ancestors" in walker_source
    assert delegates, "_walk_ancestry must delegate to graph.iter_ancestors"
    uses_pop_front = "pop(0)" in walker_source
    assert not uses_pop_front, "_walk_ancestry must not use list.pop(0)"
    uses_insert_front = "insert(0" in walker_source
    assert not uses_insert_front, "_walk_ancestry must not use list.insert(0, ...)"
| 242 | |
| 243 | |
def test_blame_skips_read_for_unchanged_commits(tmp_path: pathlib.Path) -> None:
    """blame_file must not re-read the file for commits that leave it unchanged.

    Builds a linear 10-commit chain in which the tracked file's content
    changes exactly once: commits 0-4 hold the original text and commits 5-9
    hold the changed text. While blaming HEAD, calls to
    ``blame._read_file_at_commit`` are counted; the total must stay at or
    below 4 (a small constant tied to the two distinct versions) instead of
    growing to one read per commit in the chain (10).
    """
    from unittest.mock import patch
    from muse.core import blame as blame_module

    repo = _make_repo(tmp_path)

    original_text = "\n".join(f"original line {i}" for i in range(5)).encode() + b"\n"
    changed_text = "\n".join(f"changed line {i}" for i in range(5)).encode() + b"\n"
    obj_original = _write_object(repo, original_text)
    obj_changed = _write_object(repo, changed_text)

    prev: str | None = None
    commit_ids: list[str] = []
    for i in range(10):
        # Chronological order: the original blob for commits 0-4, then the
        # single content change lands at commit 5 and persists through HEAD.
        obj = obj_original if i < 5 else obj_changed
        snap_id = _write_snapshot(repo, {"tracked.txt": obj})
        cid = _write_commit(repo, snap_id, message=f"c{i}", parent=prev)
        commit_ids.append(cid)
        prev = cid

    head = commit_ids[-1]

    call_count = 0
    original = blame_module._read_file_at_commit

    def counting_read(root: pathlib.Path, commit_id: str, rel_path: str) -> bytes | None:
        """Count the call, then defer to the real ``_read_file_at_commit``."""
        nonlocal call_count
        call_count += 1
        return original(root, commit_id, rel_path)

    with patch.object(blame_module, "_read_file_at_commit", side_effect=counting_read):
        result = blame_file(repo, "tracked.txt", head)

    assert result is not None
    # Budget: roughly one read per distinct object_id (2 versions) plus the
    # initial read, with a little slack — never one per commit (10).
    assert call_count <= 4, (
        f"_read_file_at_commit called {call_count}× for 10 commits with "
        "only 1 content change — unchanged commits should be skipped"
    )
| 291 | |
| 292 | |
class TestRegisterFlags:
    """Argparse wiring checks for the ``blame`` subcommand's JSON output flag."""

    @staticmethod
    def _parse_json_out(argv: list[str]) -> bool:
        """Register ``blame`` on a fresh parser, parse *argv*, return ``json_out``.

        A parse error surfaces as ``SystemExit`` from argparse and is left to
        propagate so the calling test fails instead of passing vacuously.
        """
        import argparse
        from muse.cli.commands.core_blame import register

        parser = argparse.ArgumentParser()
        register(parser.add_subparsers())
        return parser.parse_args(argv).json_out

    def test_json_short_flag(self) -> None:
        """``-j`` switches JSON output on."""
        assert self._parse_json_out(["blame", "file.py", "-j"]) is True

    def test_json_long_flag(self) -> None:
        """``--json`` switches JSON output on."""
        assert self._parse_json_out(["blame", "file.py", "--json"]) is True

    def test_default_no_json(self) -> None:
        """Without a JSON flag, ``json_out`` defaults to False."""
        # The same positional arguments parse successfully in the flag tests
        # above, so a SystemExit here would be a real regression; it is
        # deliberately not swallowed.
        assert self._parse_json_out(["blame", "file.py"]) is False