"""TDD tests for ``muse blame`` (core VCS line-level blame). Written *before* the implementation — all tests in this file define the target behaviour of the supercharged blame command: - ``--json`` / ``-j`` machine-readable single JSON object (replaces --porcelain) - ``--range START-END`` restrict output to a 1-based inclusive line range - ``--author PATTERN`` filter lines whose attributed author contains PATTERN (case-insensitive substring) - ``--ref REF`` blame at a named branch, tag, or commit prefix - ``--short N`` SHA display width in text output - All errors → stderr; JSON → stdout; exit codes 0/1/2 only JSON schema (``muse blame FILE --json``):: { "file": "README.md", "ref": "sha256:abc…", "line_count": 3, "lines": [ { "lineno": 1, "commit_id": "sha256:abc…", "short_id": "sha256:abc123456789", "author": "gabriel", "committed_at": "2026-01-01T00:00:00+00:00", "message": "initial commit", "content": "hello world" } ] } Seven test tiers ---------------- Unit — TypedDict shapes, helper isolation Integration — core blame engine via CLI E2E — flag combinations exercised end-to-end Security — null bytes, path traversal, ANSI sanitization Stress — large files, long histories Performance — wall-clock ceilings Data Integrity — lineno contiguity, sha256: prefixes, clean content """ from __future__ import annotations from collections.abc import Mapping import datetime import json import pathlib import time import pytest from muse.core.object_store import write_object from muse.core.ids import hash_commit, hash_snapshot from muse.core.commits import ( CommitRecord, write_commit, ) from muse.core.snapshots import ( SnapshotRecord, write_snapshot, ) from muse.core.types import Manifest, blob_id, fake_id from muse.core.paths import heads_dir, muse_dir from tests.cli_test_helper import CliRunner runner = CliRunner() # --------------------------------------------------------------------------- # Fixtures / helpers # 
# ---------------------------------------------------------------------------

# Base timestamp for every synthetic commit; ``dt_offset`` adds whole hours.
_BASE_DT = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc)


def _make_repo(tmp_path: pathlib.Path) -> pathlib.Path:
    """Minimal Muse repo structure — no muse init required.

    Creates the ``objects``, ``commits``, ``snapshots`` and ``refs/heads``
    directories under the muse dir, writes ``repo.json`` and a ``HEAD`` that
    points at ``refs/heads/main``, and returns *tmp_path* unchanged.
    """
    dot_muse = muse_dir(tmp_path)
    for subdir in ("objects", "commits", "snapshots", "refs/heads"):
        (dot_muse / subdir).mkdir(parents=True, exist_ok=True)
    repo_meta = {
        "repo_id": fake_id("repo"),
        "domain": "code",
        "default_branch": "main",
        "created_at": "2026-01-01T00:00:00+00:00",
    }
    (dot_muse / "repo.json").write_text(json.dumps(repo_meta), encoding="utf-8")
    (dot_muse / "HEAD").write_text("ref: refs/heads/main", encoding="utf-8")
    return tmp_path


def _obj_id(content: bytes) -> str:
    """Return the blob ID for *content* (delegates to ``blob_id``)."""
    return blob_id(content)


def _store_text(repo: pathlib.Path, text: str) -> str:
    """Write a text blob and return its object ID (with sha256: prefix)."""
    raw = text.encode("utf-8")
    oid = _obj_id(raw)
    write_object(repo, oid, raw)
    return oid


def _commit(
    repo: pathlib.Path,
    files: Mapping[str, str],
    *,
    message: str = "test commit",
    author: str = "gabriel",
    parent: str | None = None,
    dt_offset: int = 0,
) -> str:
    """Write a commit containing *files* (path → text) and return its commit_id.

    Stores each file as a blob, writes the snapshot and the commit record
    (always on branch ``main``), then points ``refs/heads/main`` at the new
    commit. *dt_offset* is the number of hours added to ``_BASE_DT`` for both
    the snapshot's ``created_at`` and the commit's ``committed_at``.
    """
    manifest: Manifest = {path: _store_text(repo, text) for path, text in files.items()}
    snap_id = hash_snapshot(manifest)
    # Snapshot and commit share one timestamp; compute it once.
    committed_at = _BASE_DT + datetime.timedelta(hours=dt_offset)
    write_snapshot(repo, SnapshotRecord(
        snapshot_id=snap_id,
        manifest=manifest,
        created_at=committed_at,
    ))
    commit_id = hash_commit(
        parent_ids=[parent] if parent else [],
        snapshot_id=snap_id,
        message=message,
        committed_at_iso=committed_at.isoformat(),
        author=author,
    )
    write_commit(repo, CommitRecord(
        commit_id=commit_id,
        branch="main",
        snapshot_id=snap_id,
        message=message,
        committed_at=committed_at,
        parent_commit_id=parent,
        author=author,
    ))
    (heads_dir(repo) / "main").write_text(commit_id, encoding="utf-8")
    return commit_id


def _invoke(repo: pathlib.Path, *args: str) -> "InvokeResult":
    """Run ``blame *args`` through the shared runner with MUSE_REPO_ROOT=*repo*."""
    return runner.invoke(None, ["blame", *args], env={"MUSE_REPO_ROOT": str(repo)})


def _parse_json(output: str) -> Mapping[str, object]:
    """Parse *output* (whitespace-stripped) as a single JSON document."""
    return json.loads(output.strip())


# ---------------------------------------------------------------------------
# Tier 1 — Unit: JSON schema shape
# ---------------------------------------------------------------------------


class TestBlameJsonSchema:
    """Verify the top-level JSON object has all required keys."""

    def test_top_level_keys(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        _commit(repo, {"f.txt": "hello\n"})
        result = _invoke(repo, "f.txt", "--json")
        assert result.exit_code == 0
        d = _parse_json(result.output)
        assert set(d.keys()) >= {"file", "ref", "line_count", "lines"}

    def test_file_key_matches_input(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        _commit(repo, {"readme.md": "# doc\n"})
        result = _invoke(repo, "readme.md", "--json")
        assert result.exit_code == 0
        assert _parse_json(result.output)["file"] == "readme.md"

    def test_ref_key_is_full_commit_id(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        cid = _commit(repo, {"f.txt": "x\n"})
        result = _invoke(repo, "f.txt", "--json")
        assert result.exit_code == 0
        assert _parse_json(result.output)["ref"] == cid

    def test_line_count_matches_lines_array(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        _commit(repo, {"f.txt": "a\nb\nc\n"})
        result = _invoke(repo, "f.txt", "--json")
        d = _parse_json(result.output)
        assert d["line_count"] == len(d["lines"])

    def test_line_entry_keys(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        _commit(repo, {"f.txt": "hello\n"})
        result = _invoke(repo, "f.txt", "--json")
        line = _parse_json(result.output)["lines"][0]
        assert set(line.keys()) >= {
            "lineno",
            "commit_id",
            "short_id",
            "author",
            "committed_at",
            "message",
            "content",
        }

    def test_short_id_is_prefix_of_commit_id(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        _commit(repo, {"f.txt": "x\n"})
        result = _invoke(repo, "f.txt", "--json")
        line = _parse_json(result.output)["lines"][0]
        assert line["commit_id"].startswith(line["short_id"])

    def test_short_id_default_length_is_12(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        _commit(repo, {"f.txt": "x\n"})
        result = _invoke(repo, "f.txt", "--json")
        line = _parse_json(result.output)["lines"][0]
        # 12 characters after the "sha256:" prefix.
        assert len(line["short_id"]) == len("sha256:") + 12
        assert line["short_id"].startswith("sha256:")

    def test_line_count_is_int(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        _commit(repo, {"f.txt": "one\ntwo\n"})
        result = _invoke(repo, "f.txt", "--json")
        assert isinstance(_parse_json(result.output)["line_count"], int)


# ---------------------------------------------------------------------------
# Tier 2 — Integration: core attribution correctness via CLI
# ---------------------------------------------------------------------------


class TestBlameJsonAttribution:
    """Verify blame correctly attributes lines to the right commit."""

    def test_single_commit_all_lines_attributed(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        cid = _commit(repo, {"f.txt": "a\nb\nc\n"}, author="alice")
        result = _invoke(repo, "f.txt", "--json")
        lines = _parse_json(result.output)["lines"]
        # Guard: all() is vacuously true on an empty list.
        assert len(lines) == 3
        assert all(entry["commit_id"] == cid for entry in lines)
        assert all(entry["author"] == "alice" for entry in lines)

    def test_older_lines_attributed_to_older_commit(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        c1 = _commit(repo, {"f.txt": "line1\nline2\n"}, message="init", dt_offset=0)
        _commit(
            repo,
            {"f.txt": "line1\nline2\nline3\n"},
            message="add line3",
            parent=c1,
            dt_offset=1,
        )
        result = _invoke(repo, "f.txt", "--json")
        lines = _parse_json(result.output)["lines"]
        assert lines[0]["commit_id"] == c1
        assert lines[1]["commit_id"] == c1

    def test_new_line_attributed_to_newer_commit(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        c1 = _commit(repo, {"f.txt": "line1\nline2\n"}, dt_offset=0)
        c2 = _commit(repo, {"f.txt": "line1\nline2\nline3\n"}, parent=c1, dt_offset=1)
        result = _invoke(repo, "f.txt", "--json")
        lines = _parse_json(result.output)["lines"]
        assert lines[2]["commit_id"] == c2

    def test_message_is_first_line_of_commit_message(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        _commit(repo, {"f.txt": "x\n"}, message="feat: add thing\n\nlong body")
        result = _invoke(repo, "f.txt", "--json")
        assert _parse_json(result.output)["lines"][0]["message"] == "feat: add thing"

    def test_content_has_no_trailing_newline(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        _commit(repo, {"f.txt": "hello\nworld\n"})
        result = _invoke(repo, "f.txt", "--json")
        lines = _parse_json(result.output)["lines"]
        # Guard: the loop below asserts nothing if no lines come back.
        assert len(lines) == 2
        for line in lines:
            assert not line["content"].endswith("\n")

    def test_committed_at_is_iso8601(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        _commit(repo, {"f.txt": "x\n"})
        result = _invoke(repo, "f.txt", "--json")
        ts = _parse_json(result.output)["lines"][0]["committed_at"]
        # Loose check: only the date/time separator is verified.
        assert "T" in ts

    def test_empty_file_returns_zero_lines(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        _commit(repo, {"empty.txt": ""})
        result = _invoke(repo, "empty.txt", "--json")
        assert result.exit_code == 0
        d = _parse_json(result.output)
        assert d["line_count"] == 0
        assert d["lines"] == []


# ---------------------------------------------------------------------------
# Tier 3 — E2E: flags and flag combinations
# ---------------------------------------------------------------------------


class TestBlameJsonFlag:
    """--json / -j flag behaviour."""

    def test_json_flag_exits_0(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        _commit(repo, {"f.txt": "x\n"})
        assert _invoke(repo, "f.txt", "--json").exit_code == 0

    def test_j_short_alias(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        _commit(repo, {"f.txt": "x\n"})
        result = _invoke(repo, "f.txt", "-j")
        assert result.exit_code == 0
        _parse_json(result.output)  # must be valid JSON

    def test_porcelain_flag_rejected(self, tmp_path: pathlib.Path) -> None:
        """--porcelain must no longer exist; argparse should reject it."""
        repo = _make_repo(tmp_path)
        _commit(repo, {"f.txt": "x\n"})
        result = _invoke(repo, "--porcelain", "f.txt")
        assert result.exit_code != 0

    def test_text_output_no_json(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        _commit(repo, {"f.txt": "hello\n"})
        result = _invoke(repo, "f.txt")
        assert result.exit_code == 0
        with pytest.raises((json.JSONDecodeError, ValueError)):
            json.loads(result.output.strip())

    def test_text_output_contains_content(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        _commit(repo, {"f.txt": "hello world\n"})
        result = _invoke(repo, "f.txt")
        assert "hello world" in result.output

    def test_text_output_contains_lineno(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        _commit(repo, {"f.txt": "a\nb\nc\n"})
        result = _invoke(repo, "f.txt")
        assert "1" in result.output
        assert "2" in result.output
        assert "3" in result.output

    def test_short_n_changes_sha_width(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        _commit(repo, {"f.txt": "x\n"})
        result8 = _invoke(repo, "f.txt", "--short", "8")
        result16 = _invoke(repo, "f.txt", "--short", "16")
        # Lines differ in width — just check both succeed
        assert result8.exit_code == 0
        assert result16.exit_code == 0

    def test_ref_branch_name(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        _commit(repo, {"f.txt": "at main\n"})
        result = _invoke(repo, "f.txt", "--ref", "main", "--json")
        assert result.exit_code == 0
        assert _parse_json(result.output)["lines"][0]["content"] == "at main"

    def test_json_is_compact(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        _commit(repo, {"f.txt": "x\n"})
        result = _invoke(repo, "f.txt", "--json")
        assert "\n" not in result.output.strip()  # compact JSON for agents


class TestBlameRange:
    """--range START-END flag."""

    def test_range_limits_lines_returned(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        _commit(repo, {"f.txt": "a\nb\nc\nd\ne\n"})
        result = _invoke(repo, "f.txt", "--range", "2-4", "--json")
        assert result.exit_code == 0
        lines = _parse_json(result.output)["lines"]
        assert len(lines) == 3
        assert lines[0]["lineno"] == 2
        assert lines[-1]["lineno"] == 4

    def test_range_single_line(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        _commit(repo, {"f.txt": "a\nb\nc\n"})
        result = _invoke(repo, "f.txt", "--range", "2-2", "--json")
        assert result.exit_code == 0
        lines = _parse_json(result.output)["lines"]
        assert len(lines) == 1
        assert lines[0]["lineno"] == 2
        assert lines[0]["content"] == "b"

    def test_range_full_file_explicit(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        _commit(repo, {"f.txt": "a\nb\nc\n"})
        result = _invoke(repo, "f.txt", "--range", "1-3", "--json")
        lines = _parse_json(result.output)["lines"]
        assert len(lines) == 3

    def test_range_start_gt_end_is_error(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        _commit(repo, {"f.txt": "a\nb\nc\n"})
        result = _invoke(repo, "f.txt", "--range", "4-2")
        assert result.exit_code != 0
        assert "❌" in result.stderr or "error" in result.stderr.lower()

    def test_range_zero_start_is_error(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        _commit(repo, {"f.txt": "a\n"})
        result = _invoke(repo, "f.txt", "--range", "0-1")
        assert result.exit_code != 0

    def test_range_clamped_to_file_length(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        _commit(repo, {"f.txt": "a\nb\nc\n"})
        result = _invoke(repo, "f.txt", "--range", "2-999", "--json")
        assert result.exit_code == 0
        lines = _parse_json(result.output)["lines"]
        # only lines 2 and 3 exist
        assert len(lines) == 2

    def test_range_text_output_respected(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        # Non-hex letters on purpose: "aa"/"bb"/"cc" are valid hex digit pairs
        # and could appear inside the displayed SHA, which would make the
        # negative substring assertions below fail for the wrong reason.
        _commit(repo, {"f.txt": "xx\nyy\nzz\n"})
        result = _invoke(repo, "f.txt", "--range", "2-2")
        assert result.exit_code == 0
        assert "yy" in result.output
        assert "xx" not in result.output
        assert "zz" not in result.output

    def test_range_line_count_reflects_filtered(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        _commit(repo, {"f.txt": "a\nb\nc\nd\n"})
        result = _invoke(repo, "f.txt", "--range", "1-2", "--json")
        d = _parse_json(result.output)
        assert d["line_count"] == 2


class TestBlameAuthor:
    """--author PATTERN flag."""

    def test_author_filter_matches(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        _commit(repo, {"f.txt": "by alice\n"}, author="alice")
        result = _invoke(repo, "f.txt", "--author", "alice", "--json")
        assert result.exit_code == 0
        lines = _parse_json(result.output)["lines"]
        assert len(lines) == 1

    def test_author_filter_case_insensitive(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        _commit(repo, {"f.txt": "x\n"}, author="Alice")
        result = _invoke(repo, "f.txt", "--author", "ALICE", "--json")
        assert result.exit_code == 0
        assert len(_parse_json(result.output)["lines"]) == 1

    def test_author_filter_no_match_returns_empty(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        _commit(repo, {"f.txt": "x\n"}, author="alice")
        result = _invoke(repo, "f.txt", "--author", "bob", "--json")
        assert result.exit_code == 0
        assert _parse_json(result.output)["lines"] == []

    def test_author_filter_substring_match(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        _commit(repo, {"f.txt": "x\n"}, author="gabriel cardona")
        result = _invoke(repo, "f.txt", "--author", "gabriel", "--json")
        assert result.exit_code == 0
        assert len(_parse_json(result.output)["lines"]) == 1

    def test_author_filter_with_two_authors(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        c1 = _commit(repo, {"f.txt": "alice line\n"}, author="alice", dt_offset=0)
        _commit(
            repo,
            {"f.txt": "alice line\nbob line\n"},
            author="bob",
            parent=c1,
            dt_offset=1,
        )
        result = _invoke(repo, "f.txt", "--author", "alice", "--json")
        assert result.exit_code == 0
        lines = _parse_json(result.output)["lines"]
        # Guard: without this, a filter that drops every line would pass,
        # because all() over an empty list is True. Exactly one line (the
        # unchanged first line from alice's commit) must survive the filter.
        assert len(lines) == 1
        assert all(entry["author"] == "alice" for entry in lines)

    def test_author_combined_with_range(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        _commit(repo, {"f.txt": "a\nb\nc\n"}, author="alice")
        result = _invoke(repo, "f.txt", "--author", "alice", "--range", "1-2", "--json")
        assert result.exit_code == 0
        lines = _parse_json(result.output)["lines"]
        assert len(lines) == 2

    def test_author_line_count_reflects_filter(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        c1 = _commit(repo, {"f.txt": "alice\n"}, author="alice", dt_offset=0)
        _commit(repo, {"f.txt": "alice\nbob\n"}, author="bob", parent=c1, dt_offset=1)
        result = _invoke(repo, "f.txt", "--author", "bob", "--json")
        d = _parse_json(result.output)
        assert d["line_count"] == len(d["lines"])


# ---------------------------------------------------------------------------
# Tier 4 — Security
# ---------------------------------------------------------------------------


class TestBlameSecurity:
    """Input validation and output sanitization."""

    def test_null_byte_in_path_is_error(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        _commit(repo, {"f.txt": "x\n"})
        result = _invoke(repo, "f.txt\x00malicious")
        assert result.exit_code != 0
        assert "❌" in result.stderr

    def test_unknown_file_exits_1(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        _commit(repo, {"f.txt": "x\n"})
        result = _invoke(repo, "does_not_exist.txt")
        assert result.exit_code == 1

    def test_unknown_file_error_on_stderr(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        _commit(repo, {"f.txt": "x\n"})
        result = _invoke(repo, "does_not_exist.txt")
        assert "❌" in result.stderr

    def test_unknown_ref_exits_1(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        _commit(repo, {"f.txt": "x\n"})
        result = _invoke(repo, "f.txt", "--ref", "nonexistent-branch")
        assert result.exit_code == 1

    def test_unknown_ref_error_on_stderr(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        _commit(repo, {"f.txt": "x\n"})
        result = _invoke(repo, "f.txt", "--ref", "no-such-ref")
        assert "❌" in result.stderr

    def test_ansi_in_content_sanitized_text(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        _commit(repo, {"f.txt": "normal\x1b[31mred\x1b[0m\n"})
        result = _invoke(repo, "f.txt")
        assert "\x1b" not in result.output

    def test_ansi_in_author_sanitized_text(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        _commit(repo, {"f.txt": "x\n"}, author="bad\x1b[31mactor\x1b[0m")
        result = _invoke(repo, "f.txt")
        assert "\x1b" not in result.output

    def test_json_output_no_ansi(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        _commit(
            repo,
            {"f.txt": "\x1b[31mcolor\x1b[0m\n"},
            author="\x1b[32mmalicious\x1b[0m",
        )
        result = _invoke(repo, "f.txt", "--json")
        # NOTE(review): this inspects the raw serialized text only. A standard
        # JSON encoder escapes ESC as \u001b, so this may hold even without
        # sanitization — confirm whether decoded fields should be checked too.
        assert "\x1b" not in result.output

    def test_no_repo_exits_2(self, tmp_path: pathlib.Path) -> None:
        empty = tmp_path / "not_a_repo"
        empty.mkdir()
        result = _invoke(empty, "f.txt")
        assert result.exit_code == 2

    def test_range_invalid_format_is_error(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        _commit(repo, {"f.txt": "x\n"})
        result = _invoke(repo, "f.txt", "--range", "abc-xyz")
        assert result.exit_code != 0


# ---------------------------------------------------------------------------
# Tier 5 — Stress
# ---------------------------------------------------------------------------


class TestBlameStress:
    """Correctness under scale."""

    def test_500_line_file(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        text = "\n".join(f"line {i}" for i in range(1, 501)) + "\n"
        _commit(repo, {"big.txt": text})
        result = _invoke(repo, "big.txt", "--json")
        assert result.exit_code == 0
        d = _parse_json(result.output)
        assert d["line_count"] == 500
        assert len(d["lines"]) == 500

    def test_20_commit_chain(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        parent: str | None = None
        # Commit i holds lines 0..i, so each commit appends exactly one line.
        for i in range(20):
            lines = "\n".join(f"line {j}" for j in range(i + 1)) + "\n"
            parent = _commit(
                repo,
                {"f.txt": lines},
                message=f"c{i}",
                parent=parent,
                dt_offset=i,
            )
        result = _invoke(repo, "f.txt", "--json")
        assert result.exit_code == 0
        assert _parse_json(result.output)["line_count"] == 20

    def test_single_line_file(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        _commit(repo, {"f.txt": "only line\n"})
        result = _invoke(repo, "f.txt", "--json")
        d = _parse_json(result.output)
        assert d["line_count"] == 1
        assert d["lines"][0]["content"] == "only line"

    def test_file_no_trailing_newline(self, tmp_path: pathlib.Path) -> None:
        """Files without a trailing newline must still blame correctly."""
        repo = _make_repo(tmp_path)
        _commit(repo, {"f.txt": "no newline"})
        result = _invoke(repo, "f.txt", "--json")
        assert result.exit_code == 0
        d = _parse_json(result.output)
        assert d["line_count"] == 1
        assert d["lines"][0]["content"] == "no newline"

    def test_range_on_large_file(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        text = "\n".join(f"line {i}" for i in range(1, 201)) + "\n"
        _commit(repo, {"big.txt": text})
        result = _invoke(repo, "big.txt", "--range", "50-100", "--json")
        assert result.exit_code == 0
        lines = _parse_json(result.output)["lines"]
        assert len(lines) == 51  # inclusive range: 100 - 50 + 1
        assert lines[0]["lineno"] == 50
        assert lines[-1]["lineno"] == 100


# ---------------------------------------------------------------------------
# Tier 6 — Performance
# ---------------------------------------------------------------------------


class TestBlamePerformance:
    """Wall-clock ceilings — fast enough not to block an agent loop."""

    def test_100_line_file_under_2s(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        text = "\n".join(f"line {i}" for i in range(100)) + "\n"
        _commit(repo, {"f.txt": text})
        # Only the blame invocation is timed, not the fixture setup.
        t0 = time.monotonic()
        result = _invoke(repo, "f.txt", "--json")
        elapsed = time.monotonic() - t0
        assert result.exit_code == 0
        assert elapsed < 2.0, f"blame took {elapsed:.2f}s on 100-line file"

    def test_10_commit_chain_under_3s(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        parent: str | None = None
        for i in range(10):
            parent = _commit(repo, {"f.txt": f"line{i}\n"}, parent=parent, dt_offset=i)
        t0 = time.monotonic()
        result = _invoke(repo, "f.txt", "--json")
        elapsed = time.monotonic() - t0
        assert result.exit_code == 0
        assert elapsed < 3.0, f"blame took {elapsed:.2f}s over 10 commits"


# ---------------------------------------------------------------------------
# Tier 7 — Data Integrity
# ---------------------------------------------------------------------------


class TestBlameDataIntegrity:
    """Structural invariants that must hold for every blame output."""

    def test_linenos_are_contiguous_from_1(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        _commit(repo, {"f.txt": "a\nb\nc\nd\n"})
        lines = _parse_json(_invoke(repo, "f.txt", "--json").output)["lines"]
        assert [entry["lineno"] for entry in lines] == [1, 2, 3, 4]

    def test_all_commit_ids_sha256_prefixed(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        _commit(repo, {"f.txt": "a\nb\n"})
        lines = _parse_json(_invoke(repo, "f.txt", "--json").output)["lines"]
        # Guard: all() is vacuously true on an empty list.
        assert len(lines) == 2
        assert all(entry["commit_id"].startswith("sha256:") for entry in lines)

    def test_author_never_empty_string(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        _commit(repo, {"f.txt": "x\n"}, author="gabriel")
        lines = _parse_json(_invoke(repo, "f.txt", "--json").output)["lines"]
        # Guard: all() is vacuously true on an empty list.
        assert len(lines) == 1
        assert all(entry["author"] for entry in lines)

    def test_content_no_trailing_newline(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        _commit(repo, {"f.txt": "hello\nworld\n"})
        lines = _parse_json(_invoke(repo, "f.txt", "--json").output)["lines"]
        # Guard: all() is vacuously true on an empty list.
        assert len(lines) == 2
        assert all(not entry["content"].endswith("\n") for entry in lines)

    def test_range_linenos_match_original_positions(self, tmp_path: pathlib.Path) -> None:
        """Lines filtered by --range must report their original file position."""
        repo = _make_repo(tmp_path)
        _commit(repo, {"f.txt": "a\nb\nc\nd\ne\n"})
        lines = _parse_json(
            _invoke(repo, "f.txt", "--range", "3-5", "--json").output
        )["lines"]
        assert [entry["lineno"] for entry in lines] == [3, 4, 5]
        assert lines[0]["content"] == "c"
        assert lines[1]["content"] == "d"
        assert lines[2]["content"] == "e"

    def test_line_count_equals_lines_length_always(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        _commit(repo, {"f.txt": "a\nb\nc\n"})
        for flags in ([], ["--range", "1-2"], ["--author", "gabriel"]):
            result = _invoke(repo, "f.txt", "--json", *flags)
            d = _parse_json(result.output)
            assert d["line_count"] == len(d["lines"])

    def test_json_is_valid_and_parseable(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        _commit(repo, {"f.txt": "x\ny\n"})
        result = _invoke(repo, "f.txt", "--json")
        assert result.exit_code == 0
        d = _parse_json(result.output)
        assert isinstance(d, dict)
        assert isinstance(d["lines"], list)