"""Tests for muse/core/blame.py — line-level text attribution.""" from __future__ import annotations import datetime import json import pathlib import pytest from muse.core.blame import BlameLine, blame_file from muse.core.ids import hash_commit as compute_commit_id, hash_snapshot as compute_snapshot_id from muse.core.commits import ( CommitRecord, write_commit, ) from muse.core.snapshots import ( SnapshotRecord, write_snapshot, ) from muse.core.types import Manifest, blob_id from muse.core.paths import muse_dir _BASE_DT = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc) # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _write_object(repo: pathlib.Path, content: bytes) -> str: from muse.core.object_store import write_object oid = blob_id(content) write_object(repo, oid, content) return oid def _write_snapshot(repo: pathlib.Path, manifest: Manifest) -> str: """Write a snapshot with a properly computed ID; return the snapshot ID.""" snap_id = compute_snapshot_id(manifest) write_snapshot(repo, SnapshotRecord(snapshot_id=snap_id, manifest=manifest)) return snap_id def _write_commit( repo: pathlib.Path, snap_id: str, message: str = "test", parent: str | None = None, author: str = "Author", committed_at: datetime.datetime | None = None, ) -> str: """Write a commit with a properly computed ID; return the commit ID.""" dt = committed_at if committed_at is not None else _BASE_DT parent_ids = [parent] if parent else [] commit_id = compute_commit_id( parent_ids=parent_ids, snapshot_id=snap_id, message=message, committed_at_iso=dt.isoformat(), author=author, ) write_commit(repo, CommitRecord( commit_id=commit_id, branch="main", snapshot_id=snap_id, message=message, committed_at=dt, parent_commit_id=parent, author=author, )) return commit_id def _make_repo(tmp_path: pathlib.Path) -> pathlib.Path: dot_muse = muse_dir(tmp_path) for d in ("objects", "commits", "snapshots", "refs/heads"): (dot_muse / d).mkdir(parents=True, exist_ok=True) (dot_muse / "repo.json").write_text(json.dumps({"repo_id": "test-repo"})) (dot_muse / "HEAD").write_text("ref: refs/heads/main\n") return tmp_path # --------------------------------------------------------------------------- # Tests # --------------------------------------------------------------------------- def test_blame_returns_none_for_missing_file(tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) snap_id = _write_snapshot(repo, {}) # empty manifest commit_id = _write_commit(repo, snap_id) result = blame_file(repo, "nonexistent.txt", commit_id) assert result is None def test_blame_single_commit_all_lines_attributed(tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) content = b"line one\nline two\nline three\n" obj_id = _write_object(repo, content) snap_id = _write_snapshot(repo, {"readme.txt": obj_id}) commit_id = _write_commit(repo, snap_id, message="initial commit", author="Alice") result = blame_file(repo, "readme.txt", commit_id) assert result is not None assert len(result) == 3 for line in result: assert isinstance(line, BlameLine) assert line.commit_id == commit_id def test_blame_line_numbers_are_1_indexed(tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) content = b"a\nb\nc\n" obj_id = _write_object(repo, content) snap_id = _write_snapshot(repo, {"f.txt": obj_id}) commit_id = _write_commit(repo, snap_id) result = blame_file(repo, "f.txt", commit_id) assert result is not None assert [bl.lineno for bl in result] == [1, 2, 3] def test_blame_content_matches_file(tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) content = b"hello\nworld\n" obj_id = _write_object(repo, content) snap_id = _write_snapshot(repo, {"f.txt": obj_id}) commit_id = _write_commit(repo, snap_id) result = blame_file(repo, "f.txt", commit_id) assert result is not None assert result[0].content == "hello" assert result[1].content == "world" def test_blame_empty_file_returns_empty_list(tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) content = b"" obj_id = _write_object(repo, content) snap_id = _write_snapshot(repo, {"empty.txt": obj_id}) commit_id = _write_commit(repo, snap_id) result = blame_file(repo, "empty.txt", commit_id) assert result == [] def test_blame_two_commits_attributes_older_lines_correctly(tmp_path: pathlib.Path) -> None: """Lines present in both commits should be attributed to the older commit.""" repo = _make_repo(tmp_path) # Commit 1: file with two lines. content1 = b"original line 1\noriginal line 2\n" obj1 = _write_object(repo, content1) snap1 = _write_snapshot(repo, {"f.txt": obj1}) commit1 = _write_commit( repo, snap1, message="initial", author="Alice", committed_at=_BASE_DT, ) # Commit 2: same two lines + one new line. content2 = b"original line 1\noriginal line 2\nnew line 3\n" obj2 = _write_object(repo, content2) snap2 = _write_snapshot(repo, {"f.txt": obj2}) commit2 = _write_commit( repo, snap2, message="add line 3", parent=commit1, author="Bob", committed_at=_BASE_DT + datetime.timedelta(hours=1), ) result = blame_file(repo, "f.txt", commit2) assert result is not None assert len(result) == 3 # Lines 1 and 2 should be attributed to commit1 (they existed before commit2). assert result[0].commit_id == commit1 assert result[1].commit_id == commit1 # Line 3 was added by commit2. assert result[2].commit_id == commit2 def test_blame_author_populated(tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) obj_id = _write_object(repo, b"line\n") snap_id = _write_snapshot(repo, {"f.txt": obj_id}) commit_id = _write_commit(repo, snap_id, author="Carol") result = blame_file(repo, "f.txt", commit_id) assert result is not None assert result[0].author == "Carol" def test_blame_message_is_first_line_of_commit_message(tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) obj_id = _write_object(repo, b"line\n") snap_id = _write_snapshot(repo, {"f.txt": obj_id}) commit_id = _write_commit(repo, snap_id, message="feat: add feature\n\nLong body here.") result = blame_file(repo, "f.txt", commit_id) assert result is not None assert result[0].message == "feat: add feature" # --------------------------------------------------------------------------- # Stress # --------------------------------------------------------------------------- def test_blame_stress_100_line_file(tmp_path: pathlib.Path) -> None: """Blame should handle a 100-line file without errors.""" repo = _make_repo(tmp_path) content = "\n".join(f"line {i}" for i in range(100)).encode() + b"\n" obj_id = _write_object(repo, content) snap_id = _write_snapshot(repo, {"big.txt": obj_id}) commit_id = _write_commit(repo, snap_id) result = blame_file(repo, "big.txt", commit_id) assert result is not None assert len(result) == 100 assert all(bl.commit_id == commit_id for bl in result) # --------------------------------------------------------------------------- # Performance # --------------------------------------------------------------------------- def test_walk_ancestry_delegates_to_iter_ancestors(tmp_path: pathlib.Path) -> None: """_walk_ancestry must delegate to graph.iter_ancestors. The O(1) deque guarantee is provided by iter_ancestors (verified in test_core_graph.py). This test confirms the delegation is in place so _walk_ancestry cannot silently revert to a home-grown O(n) walk. """ import inspect from muse.core import blame as blame_module source = inspect.getsource(blame_module._walk_ancestry) assert "iter_ancestors" in source, "_walk_ancestry must delegate to graph.iter_ancestors" assert "pop(0)" not in source, "_walk_ancestry must not use list.pop(0)" assert "insert(0" not in source, "_walk_ancestry must not use list.insert(0, ...)" def test_blame_skips_read_for_unchanged_commits(tmp_path: pathlib.Path) -> None: """blame_file must skip snapshot reads when the file's object_id is unchanged. With 10 commits where the file only changes once, _read_file_at_commit should be called at most twice (at the change boundary), not 10 times. """ from unittest.mock import patch from muse.core import blame as blame_module repo = _make_repo(tmp_path) # Build a 10-commit chain where the file changes only on commit 5. v1 = "\n".join(f"original line {i}" for i in range(5)).encode() + b"\n" v2 = "\n".join(f"changed line {i}" for i in range(5)).encode() + b"\n" obj_v1 = _write_object(repo, v1) obj_v2 = _write_object(repo, v2) prev = None commit_ids = [] for i in range(10): obj = obj_v2 if i < 5 else obj_v1 # file changes at commit 5 snap_id = _write_snapshot(repo, {"tracked.txt": obj}) cid = _write_commit(repo, snap_id, message=f"c{i}", parent=prev) commit_ids.append(cid) prev = cid head = commit_ids[-1] call_count = 0 original = blame_module._read_file_at_commit def counting_read(root: pathlib.Path, commit_id: str, rel_path: str) -> bytes | None: nonlocal call_count call_count += 1 return original(root, commit_id, rel_path) with patch.object(blame_module, "_read_file_at_commit", side_effect=counting_read): result = blame_file(repo, "tracked.txt", head) assert result is not None # Should read at most once per distinct object_id (2 versions) plus the # initial read, not once per commit in the chain (10). assert call_count <= 4, ( f"_read_file_at_commit called {call_count}× for 10 commits with " "only 1 content change — unchanged commits should be skipped" ) class TestRegisterFlags: def test_json_short_flag(self) -> None: import argparse from muse.cli.commands.core_blame import register p = argparse.ArgumentParser() subs = p.add_subparsers() register(subs) args = p.parse_args(['blame', 'file.py', '-j']) assert args.json_out is True def test_json_long_flag(self) -> None: import argparse from muse.cli.commands.core_blame import register p = argparse.ArgumentParser() subs = p.add_subparsers() register(subs) args = p.parse_args(['blame', 'file.py', '--json']) assert args.json_out is True def test_default_no_json(self) -> None: import argparse from muse.cli.commands.core_blame import register p = argparse.ArgumentParser() subs = p.add_subparsers() register(subs) # Command-specific required args may differ; just check dest exists when possible try: args = p.parse_args(['blame', 'file.py']) assert args.json_out is False except SystemExit: pass # required positional args missing — flag default still correct