"""Comprehensive tests for muse code lineage. Test layers ----------- Unit ``build_lineage`` exercised directly with synthetic ``CommitRecord`` objects carrying hand-crafted ``structured_delta`` data. No repo, no disk I/O. Integration CLI invocations via ``CliRunner`` against a real tmp-path repo with two Python commits (the shared ``code_repo`` fixture). Edge-case Empty history, deleted-then-re-created, address without ``::`` guard, unknown branch / ref, ``--filter`` narrowing, ``--since``/``--until`` date bounds, ``--count`` output, ``--stability`` output. Stress Programmatically generate N commits each carrying an InsertOp, ModifyOp, or DeleteOp and verify ``build_lineage`` produces the expected event count and kind sequence without error. """ from __future__ import annotations import datetime import json import pathlib import textwrap import pytest from tests.cli_test_helper import CliRunner from muse.cli.commands.lineage import _LineageEvent, _classify_replace, _stability, build_lineage from muse.core.commits import CommitRecord from muse.domain import DeleteOp, DomainOp, InsertOp, PatchOp, ReplaceOp cli = None # argparse migration — CliRunner ignores this arg runner = CliRunner() # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- _REPO_ID = "test-repo-id" _SEQ = [0] def _cid(tag: str) -> str: """Return a deterministic 64-char hex content_id from a short tag.""" return tag.ljust(64, "0")[:64] def _ts(offset_days: int = 0) -> datetime.datetime: base = datetime.datetime(2026, 1, 1, 12, 0, 0, tzinfo=datetime.timezone.utc) return base + datetime.timedelta(days=offset_days) def _commit( *, message: str = "commit", ops: list[DomainOp] | None = None, day: int = 0, commit_id: str | None = None, ) -> CommitRecord: """Build a synthetic CommitRecord with the given symbol-level ops.""" _SEQ[0] += 1 cid = commit_id or f"c{_SEQ[0]:063d}" return CommitRecord( 
commit_id=cid, branch="main", snapshot_id=f"snap-{cid}", message=message, committed_at=_ts(day), structured_delta={"ops": ops or [], "domain": "code", "summary": message}, ) def _insert(address: str, content_id: str) -> InsertOp: return InsertOp( op="insert", address=address, position=None, content_id=_cid(content_id), content_summary=f"function {address.split('::')[-1]}", ) def _delete(address: str, content_id: str) -> DeleteOp: return DeleteOp( op="delete", address=address, position=None, content_id=_cid(content_id), content_summary=f"function {address.split('::')[-1]}", ) def _replace(address: str, old_cid: str, new_cid: str, old_sum: str = "", new_sum: str = "") -> ReplaceOp: return ReplaceOp( op="replace", address=address, position=None, old_content_id=_cid(old_cid), new_content_id=_cid(new_cid), old_summary=old_sum, new_summary=new_sum, ) def _patch(*child_ops: DomainOp, file: str = "billing.py") -> PatchOp: """Wrap symbol-level ops in a PatchOp (as Muse emits for file changes).""" return PatchOp( op="patch", address=file, child_ops=list(child_ops), child_domain="code", child_summary="", ) ADDR = "billing.py::compute_total" OTHER = "billing.py::compute_total_v2" OTHER_FILE = "utils.py::compute_total" # --------------------------------------------------------------------------- # Unit: _classify_replace # --------------------------------------------------------------------------- class TestClassifyReplace: def test_signature_change_detected_in_old(self) -> None: assert _classify_replace("signature changed", "") == "signature_change" def test_signature_change_detected_in_new(self) -> None: assert _classify_replace("", "new signature") == "signature_change" def test_full_rewrite_when_no_signature(self) -> None: assert _classify_replace("impl updated", "impl updated v2") == "full_rewrite" def test_empty_summaries_full_rewrite(self) -> None: assert _classify_replace("", "") == "full_rewrite" # 
# ---------------------------------------------------------------------------
# Unit: _stability
# ---------------------------------------------------------------------------


class TestStability:
    """Pin the pair returned by ``_stability`` for small event lists."""

    def test_no_events(self) -> None:
        nothing: list[_LineageEvent] = []
        assert _stability(nothing) == (0, 0)

    def test_all_created(self) -> None:
        only_created = [_LineageEvent("c1", "2026-01-01", "init", "created")]
        assert _stability(only_created) == (0, 1)

    def test_mixed(self) -> None:
        """One creation followed by two modifications yields ``(2, 3)``."""
        history = [_LineageEvent("c1", "2026-01-01", "init", "created")]
        history.append(
            _LineageEvent("c2", "2026-01-02", "fix", "modified", detail="impl_only")
        )
        history.append(
            _LineageEvent("c3", "2026-01-03", "fix2", "modified", detail="full_rewrite")
        )
        assert _stability(history) == (2, 3)


# ---------------------------------------------------------------------------
# Unit: build_lineage — core event kinds
# ---------------------------------------------------------------------------


class TestBuildLineageCreated:
    """``created`` events and the inputs that must produce no event at all."""

    def test_no_commits(self) -> None:
        history: list[CommitRecord] = []
        assert build_lineage(ADDR, history) == []

    def test_no_structured_delta(self) -> None:
        """A commit whose ``structured_delta`` is ``None`` contributes nothing."""
        bare = CommitRecord(
            commit_id="c" * 64,
            branch="main",
            snapshot_id="snap",
            message="empty",
            committed_at=_ts(),
            structured_delta=None,
        )
        assert build_lineage(ADDR, [bare]) == []

    def test_single_insert_emits_created(self) -> None:
        added = _commit(ops=[_insert(ADDR, "aaa")], message="add fn", day=0)
        events = build_lineage(ADDR, [added])
        assert len(events) == 1
        first = events[0]
        assert first.kind == "created"
        assert first.message == "add fn"
        assert first.new_content_id == _cid("aaa")

    def test_insert_inside_patch_op(self) -> None:
        """flat_symbol_ops must recurse into PatchOp.child_ops."""
        wrapped = _patch(_insert(ADDR, "bbb"))
        patched = _commit(ops=[wrapped], message="patch add")
        events = build_lineage(ADDR, [patched])
        assert len(events) == 1
        assert events[0].kind == "created"

    def test_unrelated_insert_ignored(self) -> None:
        noise = _commit(ops=[_insert("billing.py::other_fn", "ccc")])
        assert build_lineage(ADDR, [noise]) == []


class TestBuildLineageModified:
    """``ReplaceOp`` on the tracked address surfaces as ``modified``."""

    def test_replace_emits_modified(self) -> None:
        born = _commit(ops=[_insert(ADDR, "v1")], day=0)
        fixed = _commit(ops=[_replace(ADDR, "v1", "v2")], day=1, message="fix")
        events = build_lineage(ADDR, [born, fixed])
        assert [ev.kind for ev in events] == ["created", "modified"]
        second = events[1]
        assert second.detail == "full_rewrite"
        assert second.message == "fix"

    def test_replace_with_signature_detail(self) -> None:
        born = _commit(ops=[_insert(ADDR, "v1")], day=0)
        resigned = _commit(
            ops=[_replace(ADDR, "v1", "v2", old_sum="signature changed")],
            day=1,
        )
        events = build_lineage(ADDR, [born, resigned])
        second = events[1]
        assert second.kind == "modified"
        assert second.detail == "signature_change"

    def test_multiple_modifications_in_sequence(self) -> None:
        """An insert followed by v1→v2→v3→v4 gives one created + three modified."""
        commits = [_commit(ops=[_insert(ADDR, "v1")], day=0)]
        for step in (1, 2, 3):
            commits.append(
                _commit(ops=[_replace(ADDR, f"v{step}", f"v{step + 1}")], day=step)
            )
        events = build_lineage(ADDR, commits)
        assert len(events) == 4
        kinds = [ev.kind for ev in events]
        assert kinds[0] == "created"
        assert all(kind == "modified" for kind in kinds[1:])


class TestBuildLineageDeleted:
    """``DeleteOp`` on the tracked address surfaces as ``deleted``."""

    def test_delete_emits_deleted(self) -> None:
        born = _commit(ops=[_insert(ADDR, "v1")], day=0)
        gone = _commit(ops=[_delete(ADDR, "v1")], day=1, message="remove fn")
        events = build_lineage(ADDR, [born, gone])
        last = events[-1]
        assert last.kind == "deleted"
        assert last.message == "remove fn"

    def test_delete_marks_address_not_live(self) -> None:
        """After delete, re-inserting the same content should emit 'created', not 'copied_from'."""
        born = _commit(ops=[_insert(ADDR, "v1")], day=0)
        gone = _commit(ops=[_delete(ADDR, "v1")], day=1)
        reborn = _commit(ops=[_insert(ADDR, "v1")], day=2)
        events = build_lineage(ADDR, [born, gone, reborn])
        assert [ev.kind for ev in events] == ["created", "deleted", "created"]


class TestBuildLineageRenamedMoved:
    """Insert + delete of the same content in one commit: rename vs. move."""

    def test_rename_within_same_file(self) -> None:
        """InsertOp at ADDR + DeleteOp at OTHER (same file, same content_id) → renamed_from."""
        origin = _commit(ops=[_insert(OTHER, "v1")], day=0)
        renamed = _commit(
            ops=[_insert(ADDR, "v1"), _delete(OTHER, "v1")],
            day=1,
            message="rename",
        )
        events = build_lineage(ADDR, [origin, renamed])
        hit = next(ev for ev in events if ev.kind == "renamed_from")
        assert hit.detail == OTHER
        assert hit.message == "rename"

    def test_move_across_files(self) -> None:
        """InsertOp at ADDR + DeleteOp at OTHER_FILE (different file) → moved_from."""
        origin = _commit(ops=[_insert(OTHER_FILE, "v1")], day=0)
        moved = _commit(
            ops=[_insert(ADDR, "v1"), _delete(OTHER_FILE, "v1")],
            day=1,
            message="move",
        )
        events = build_lineage(ADDR, [origin, moved])
        hit = next(ev for ev in events if ev.kind == "moved_from")
        assert hit.detail == OTHER_FILE
        assert hit.message == "move"

    def test_rename_file_correctly_classified(self) -> None:
        """Same file → renamed_from, not moved_from."""
        origin = _commit(ops=[_insert(OTHER, "v1")], day=0)
        renamed = _commit(ops=[_insert(ADDR, "v1"), _delete(OTHER, "v1")], day=1)
        seen_kinds = {ev.kind for ev in build_lineage(ADDR, [origin, renamed])}
        assert "renamed_from" in seen_kinds
        assert "moved_from" not in seen_kinds


class TestBuildLineageCopied:
    """Inserting content that is still live elsewhere counts as a copy."""

    def test_copied_from_living_symbol(self) -> None:
        """Insert at ADDR with content_id already live at OTHER → copied_from."""
        source = _commit(ops=[_insert(OTHER, "v1")], day=0)
        duplicate = _commit(ops=[_insert(ADDR, "v1")], day=1, message="copy fn")
        events = build_lineage(ADDR, [source, duplicate])
        first = events[0]
        assert first.kind == "copied_from"
        assert first.detail == OTHER
        assert first.message == "copy fn"

    def test_not_copied_when_no_living_symbol(self) -> None:
        """Insert with unique content_id → created, not copied_from."""
        lone = _commit(ops=[_insert(ADDR, "unique_content")], day=0)
        events = build_lineage(ADDR, [lone])
        assert events[0].kind == "created"


# ---------------------------------------------------------------------------
# Unit: build_lineage — complex lifecycle
# ---------------------------------------------------------------------------


class TestBuildLineageLifecycle:
    """Multi-step histories plus the serialised shape of a single event."""

    def test_full_lifecycle(self) -> None:
        """create → modify → rename_away → recreate → delete."""
        new_addr = "billing.py::compute_total_renamed"
        timeline = [
            # Phase 1: created at ADDR
            _commit(ops=[_insert(ADDR, "v1")], day=0, message="create"),
            # Phase 2: modified
            _commit(ops=[_replace(ADDR, "v1", "v2")], day=1, message="modify"),
            # Phase 3: renamed away — ADDR is deleted, NEW_ADDR is inserted
            _commit(
                ops=[_insert(new_addr, "v2"), _delete(ADDR, "v2")],
                day=2,
                message="rename away",
            ),
            # Phase 4: ADDR re-created with fresh content
            _commit(ops=[_insert(ADDR, "v3")], day=3, message="recreate"),
            # Phase 5: deleted
            _commit(ops=[_delete(ADDR, "v3")], day=4, message="delete"),
        ]
        observed = [ev.kind for ev in build_lineage(ADDR, timeline)]
        expected = ["created", "modified", "deleted", "created", "deleted"]
        assert observed == expected

    def test_commit_message_propagated(self) -> None:
        initial = _commit(ops=[_insert(ADDR, "v1")], message="Initial commit")
        events = build_lineage(ADDR, [initial])
        assert events[0].message == "Initial commit"

    def test_to_dict_has_full_commit_id(self) -> None:
        full_id = "a" * 64
        pinned = _commit(ops=[_insert(ADDR, "v1")], commit_id=full_id)
        events = build_lineage(ADDR, [pinned])
        payload = events[0].to_dict()
        assert payload["commit_id"] == "a" * 64  # not truncated

    def test_to_dict_has_message(self) -> None:
        noted = _commit(ops=[_insert(ADDR, "v1")], message="My message")
        events = build_lineage(ADDR, [noted])
        payload = events[0].to_dict()
        assert payload["message"] == "My message"

    def test_commits_without_symbol_ops_skipped(self) -> None:
        """File-level ops (no '::') must not generate any events."""
        # Address is the bare file name, so it cannot match a symbol address.
        file_level = _replace("billing.py", "old", "new")
        whole_file = _commit(ops=[file_level])
        assert build_lineage(ADDR, [whole_file]) == []


# ---------------------------------------------------------------------------
# Unit: build_lineage — date filtering via _gather_commits (tested indirectly
# through the CLI --since/--until flags in integration tests below)
# ---------------------------------------------------------------------------
# ---------------------------------------------------------------------------
# Stress: large commit sequence
# ---------------------------------------------------------------------------


class TestBuildLineageStress:
    """Long synthetic histories fed straight into ``build_lineage``."""

    def test_many_modifications(self) -> None:
        """500 sequential modifications produce 501 events without error."""
        n = 500
        commits: list[CommitRecord] = [_commit(ops=[_insert(ADDR, "v0")], day=0)]
        commits.extend(
            _commit(ops=[_replace(ADDR, f"v{i-1}", f"v{i}")], day=i)
            for i in range(1, n + 1)
        )
        events = build_lineage(ADDR, commits)
        assert len(events) == n + 1
        kinds = [ev.kind for ev in events]
        assert kinds[0] == "created"
        assert all(kind == "modified" for kind in kinds[1:])

    def test_many_unrelated_commits_skipped_efficiently(self) -> None:
        """1000 commits touching only unrelated symbols → 0 events for ADDR."""
        commits: list[CommitRecord] = []
        for i in range(1000):
            unrelated = _insert(f"billing.py::other_{i}", f"uid_{i}")
            commits.append(_commit(ops=[unrelated], day=i))
        assert build_lineage(ADDR, commits) == []

    def test_interleaved_symbol_and_unrelated_ops(self) -> None:
        """Mix of target-symbol ops and noise — only target events emitted."""
        # Commit index → the single ADDR op that rides along with that commit.
        target_ops: dict[int, DomainOp] = {
            50: _insert(ADDR, "start"),
            100: _replace(ADDR, "start", "mid"),
            150: _delete(ADDR, "mid"),
        }
        commits: list[CommitRecord] = []
        for i in range(200):
            ops: list[DomainOp] = [_insert(f"billing.py::noise_{i}", f"n{i}")]
            if i in target_ops:
                ops.append(target_ops[i])
            commits.append(_commit(ops=ops, day=i))
        observed = [ev.kind for ev in build_lineage(ADDR, commits)]
        assert observed == ["created", "modified", "deleted"]

    def test_delete_recreate_cycle(self) -> None:
        """Symbol deleted and recreated 10 times → 10 deletes + 11 creates."""
        commits: list[CommitRecord] = [_commit(ops=[_insert(ADDR, "v0")], day=0)]
        for generation in range(10):
            removal_day = 2 * generation + 1
            removal = _commit(
                ops=[_delete(ADDR, f"v{generation}")],
                day=removal_day,
            )
            rebirth = _commit(
                ops=[_insert(ADDR, f"v{generation + 1}")],
                day=removal_day + 1,
            )
            commits += [removal, rebirth]
        kinds = [ev.kind for ev in build_lineage(ADDR, commits)]
        assert len([kind for kind in kinds if kind == "created"]) == 11
        assert len([kind for kind in kinds if kind == "deleted"]) == 10


# ---------------------------------------------------------------------------
# Integration: CLI
# ---------------------------------------------------------------------------


@pytest.fixture
def repo(tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> pathlib.Path:
    """Initialise a ``code``-domain repo in *tmp_path* and make it the cwd.

    ``MUSE_REPO_ROOT`` is pointed at the same directory for the duration of
    the test.
    """
    monkeypatch.chdir(tmp_path)
    monkeypatch.setenv("MUSE_REPO_ROOT", str(tmp_path))
    outcome = runner.invoke(cli, ["init", "--domain", "code"])
    assert outcome.exit_code == 0, outcome.output
    return tmp_path


@pytest.fixture
def code_repo(repo: pathlib.Path) -> pathlib.Path:
    """Two-commit repo: billing.py created then function renamed."""
    revisions = (
        (
            "Initial billing module",
            textwrap.dedent("""\
                def compute_total(items):
                    return sum(items)

                def process_order(invoice, items):
                    return compute_total(items)
            """),
        ),
        (
            "Rename compute_total",
            textwrap.dedent("""\
                def compute_invoice_total(items):
                    return sum(items)

                def process_order(invoice, items):
                    return compute_invoice_total(items)
            """),
        ),
    )
    module_path = repo / "billing.py"
    for commit_message, source in revisions:
        module_path.write_text(source)
        runner.invoke(cli, ["code", "add", "billing.py"])
        outcome = runner.invoke(cli, ["commit", "-m", commit_message])
        assert outcome.exit_code == 0, outcome.output
    return repo


class TestLineageCLI:
    """End-to-end checks of ``code lineage`` against the ``code_repo`` fixture."""

    def test_exits_zero_for_existing_symbol(self, code_repo: pathlib.Path) -> None:
        argv = ["code", "lineage", "billing.py::process_order"]
        outcome = runner.invoke(cli, argv)
        assert outcome.exit_code == 0, outcome.output

    def test_json_schema(self, code_repo: pathlib.Path) -> None:
        """``--json`` output carries the summary keys and per-event fields."""
        argv = ["code", "lineage", "--json", "billing.py::process_order"]
        outcome = runner.invoke(cli, argv)
        assert outcome.exit_code == 0, outcome.output
        payload = json.loads(outcome.output)
        for top_key in ("events", "total", "stability_pct", "modified_count"):
            assert top_key in payload
        assert isinstance(payload["events"], list)
        for entry in payload["events"]:
            for field in ("commit_id", "committed_at", "event", "message"):
                assert field in entry
            # Full sha256:<64-hex> format — not truncated
            assert entry["commit_id"].startswith("sha256:")
            assert len(entry["commit_id"]) == 71

    def test_no_address_separator_rejected(self, code_repo: pathlib.Path) -> None:
        argv = ["code", "lineage", "billing.py"]
        outcome = runner.invoke(cli, argv)
        assert outcome.exit_code == 1

    def test_missing_symbol_returns_zero_events(self, code_repo: pathlib.Path) -> None:
        argv = ["code", "lineage", "billing.py::nonexistent_xyz"]
        outcome = runner.invoke(cli, argv)
        assert outcome.exit_code == 0
        assert "no events found" in outcome.output

    def test_count_only_outputs_integer(self, code_repo: pathlib.Path) -> None:
        argv = ["code", "lineage", "--count", "billing.py::process_order"]
        outcome = runner.invoke(cli, argv)
        assert outcome.exit_code == 0
        printed = outcome.output.strip()
        assert printed.isdigit()

    def test_filter_created_subset(self, code_repo: pathlib.Path) -> None:
        argv = [
            "code", "lineage", "--filter", "created", "--json",
            "billing.py::process_order",
        ]
        outcome = runner.invoke(cli, argv)
        assert outcome.exit_code == 0
        payload = json.loads(outcome.output)
        assert all(entry["event"] == "created" for entry in payload["events"])

    def test_filter_modified_subset(self, code_repo: pathlib.Path) -> None:
        argv = [
            "code", "lineage", "--filter", "modified", "--json",
            "billing.py::process_order",
        ]
        outcome = runner.invoke(cli, argv)
        assert outcome.exit_code == 0
        payload = json.loads(outcome.output)
        assert all(entry["event"] == "modified" for entry in payload["events"])

    def test_since_future_returns_empty(self, code_repo: pathlib.Path) -> None:
        argv = [
            "code", "lineage", "--since", "2099-01-01",
            "billing.py::process_order",
        ]
        outcome = runner.invoke(cli, argv)
        assert outcome.exit_code == 0
        assert "no events found" in outcome.output

    def test_since_invalid_date_rejected(self, code_repo: pathlib.Path) -> None:
        argv = [
            "code", "lineage", "--since", "not-a-date",
            "billing.py::process_order",
        ]
        outcome = runner.invoke(cli, argv)
        assert outcome.exit_code == 1

    def test_until_invalid_date_rejected(self, code_repo: pathlib.Path) -> None:
        argv = [
            "code", "lineage", "--until", "99/99/99",
            "billing.py::process_order",
        ]
        outcome = runner.invoke(cli, argv)
        assert outcome.exit_code == 1

    def test_commit_flag_accepted(self, code_repo: pathlib.Path) -> None:
        argv = [
            "code", "lineage", "--commit", "HEAD",
            "billing.py::process_order",
        ]
        outcome = runner.invoke(cli, argv)
        assert outcome.exit_code == 0

    def test_stability_flag_present_in_output(self, code_repo: pathlib.Path) -> None:
        argv = [
            "code", "lineage", "--stability",
            "billing.py::process_order",
        ]
        outcome = runner.invoke(cli, argv)
        assert outcome.exit_code == 0
        # Stability line only appears when there are events.
        # Just check no crash.

    def test_unknown_branch_rejected(self, code_repo: pathlib.Path) -> None:
        argv = [
            "code", "lineage", "--branch", "nonexistent-branch-xyz",
            "billing.py::process_order",
        ]
        outcome = runner.invoke(cli, argv)
        assert outcome.exit_code == 1

    def test_requires_repo(self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> None:
        """Running in a directory that was never initialised must fail."""
        monkeypatch.chdir(tmp_path)
        argv = ["code", "lineage", "src/a.py::f"]
        outcome = runner.invoke(cli, argv)
        assert outcome.exit_code != 0

    def test_json_events_have_full_commit_id(self, code_repo: pathlib.Path) -> None:
        """Every JSON event id is ``sha256:`` plus 64 characters (71 total)."""
        argv = ["code", "lineage", "--json", "billing.py::process_order"]
        outcome = runner.invoke(cli, argv)
        assert outcome.exit_code == 0
        payload = json.loads(outcome.output)
        for ev in payload["events"]:
            assert ev["commit_id"].startswith("sha256:"), (
                "commit_id must use sha256: prefix format"
            )
            assert len(ev["commit_id"]) == 71, (
                f"sha256:<64-hex> should be 71 chars, got {len(ev['commit_id'])}"
            )