"""Tests for the Snapshot & Symbol Indexer — Section 5 of test-coverage-checklist.md. Complements test_snapshot_entries.py (14 tests on the snapshot write/read path). This file focuses on the symbol indexer and the gaps not covered there. Coverage layers ─────────────── Unit — _extract_ops (flat/nested child_ops, missing address, non-dict delta); _op_to_muse_op (all mapping keys, unknown passthrough). Integration — build_symbol_index: empty list when no structured_delta; returns results for repos with structured_delta; correct symbol_history/hash_occurrence content; upsert semantics (only one row per repo/intel_type); BFS excludes orphaned commits. load_symbol_history: empty when no index; with/without file_path filter. load_hash_occurrence: empty when no index; correct content. get_index_meta: None/present states. load_intel_snapshot: None/present states. get_snapshot_manifests_batch: empty list, single, multi-snapshot. Data — upsert_snapshot_entries atomic replace (stale entries removed); build_symbol_index + persist_intel_results upserts on rebuild; BFS reachability excludes orphaned branches. Security — Corrupt JSON blob returns {} not exception; build_symbol_index with unknown head_commit_id returns empty list. Stress — upsert_snapshot_entries with 1 000-file manifest; get_snapshot_manifests_batch with 50 snapshots in one query; build_symbol_index with 100 commits (10 ops each); load_symbol_history file_path filter on large index. Performance — _extract_ops 1 000 calls < 100 ms; build_symbol_index 100 commits < 3 s. E2E — Full pipeline: commits with structured_delta → build_symbol_index → persist_intel_results → get_index_meta returns correct ref; rebuild replaces previous result; symbol list HTTP page returns 200. 
""" from __future__ import annotations import json import secrets import time from datetime import datetime, timezone import pytest from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from musehub.db.musehub_intel_models import MusehubIntelResult, MusehubSymbolHistoryEntry from musehub.db.musehub_repo_models import MusehubCommit, MusehubCommitRef, MusehubSnapshot, MusehubSnapshotRef from tests.factories import create_repo from musehub.types.json_types import JSONObject from muse.core.types import long_id, blob_id # ───────────────────────────────────────────────────────────────────────────── # Helpers # ───────────────────────────────────────────────────────────────────────────── def _now() -> datetime: return datetime.now(tz=timezone.utc) async def _commit_with_delta( session: AsyncSession, repo_id: str, commit_id: str, ops: list[JSONObject], parent_ids: list[str] | None = None, branch: str = "main", author: str = "gabriel", ) -> MusehubCommit: """Insert a commit with a structured_delta.""" commit = MusehubCommit( commit_id=commit_id, branch=branch, parent_ids=parent_ids or [], message="feat: test commit", author=author, timestamp=_now(), structured_delta={"ops": ops}, ) session.add(commit) session.add(MusehubCommitRef(repo_id=repo_id, commit_id=commit_id)) await session.flush() return commit def _insert_op(address: str, content_id: str = "sha256:abc") -> JSONObject: return {"address": address, "op": "insert", "content_id": content_id} def _move_op(address: str, from_address: str, content_id: str = "sha256:abc") -> JSONObject: return {"address": address, "op": "move", "from_address": from_address, "content_id": content_id} def _patch_op(file_addr: str, children: list[JSONObject]) -> JSONObject: return {"address": file_addr, "op": "patch", "child_ops": children} async def _build_and_persist( session: AsyncSession, repo_id: str, commit_id: str, ) -> list[tuple[str, dict]]: """Build symbol index and persist results; returns the result 
list.""" from musehub.services.musehub_symbol_indexer import build_symbol_index from musehub.services.musehub_intel_providers import persist_intel_results results = await build_symbol_index(session, repo_id, commit_id) if results: await persist_intel_results(session, repo_id, commit_id, results) return results def _get_result_data(results: list[tuple[str, JSONObject]], intel_type: str) -> JSONObject: """Extract data dict for a specific intel_type from the results list.""" for t, data in results: if t == intel_type: return data return {} # ───────────────────────────────────────────────────────────────────────────── # Layer 1 — Unit: pure functions # ───────────────────────────────────────────────────────────────────────────── class TestExtractOps: """_extract_ops pulls a flat list of ops including child_ops.""" def _run(self, structured_delta: JSONObject | None) -> list[JSONObject]: from musehub.services.musehub_symbol_indexer import _extract_ops return _extract_ops(structured_delta) def test_no_structured_delta_returns_empty(self) -> None: assert self._run(None) == [] def test_none_delta_returns_empty(self) -> None: assert self._run(None) == [] def test_non_dict_delta_returns_empty(self) -> None: assert self._run("bad") == [] # type: ignore[arg-type] def test_flat_ops_without_child_ops(self) -> None: delta = { "ops": [ {"address": "main.py::Foo", "op": "insert"}, {"address": "main.py::Bar", "op": "delete"}, ] } result = self._run(delta) assert len(result) == 2 assert result[0]["address"] == "main.py::Foo" assert result[1]["address"] == "main.py::Bar" def test_patch_op_with_child_ops_flattened(self) -> None: delta = { "ops": [ { "address": "src/app.py", "op": "patch", "child_ops": [ {"address": "src/app.py::MyClass", "op": "insert"}, {"address": "src/app.py::MyClass.run", "op": "insert"}, ], } ] } result = self._run(delta) # 1 top-level + 2 child_ops assert len(result) == 3 addresses = [op["address"] for op in result] assert "src/app.py" in addresses assert 
"src/app.py::MyClass" in addresses assert "src/app.py::MyClass.run" in addresses def test_op_without_address_skipped(self) -> None: delta = { "ops": [ {"op": "insert"}, # no address {"address": "ok.py", "op": "insert"}, ] } result = self._run(delta) assert len(result) == 1 assert result[0]["address"] == "ok.py" def test_child_op_without_address_skipped(self) -> None: delta = { "ops": [ { "address": "file.py", "op": "patch", "child_ops": [ {"op": "insert"}, # no address — must be skipped {"address": "file.py::Good", "op": "insert"}, ], } ] } result = self._run(delta) addresses = [op["address"] for op in result] assert "file.py::Good" in addresses for op in result: assert "address" in op def test_non_dict_op_skipped(self) -> None: delta = {"ops": ["not-a-dict", {"address": "f.py", "op": "add"}]} result = self._run(delta) assert len(result) == 1 class TestRawOpStorage: """Raw DomainOp types are stored verbatim in op; full payload in op_payload.""" @pytest.mark.asyncio async def test_insert_op_stored_raw(self, db_session: AsyncSession) -> None: from musehub.services.musehub_symbol_indexer import build_symbol_index from sqlalchemy import select repo = await create_repo(db_session, slug="raw-insert") commit = await _commit_with_delta( db_session, repo.repo_id, "raw-c001", ops=[{ "address": "main.py::Foo", "op": "insert", "content_id": "sha256:aaa", "content_summary": "added function Foo", "position": 0, }], ) await build_symbol_index(db_session, repo.repo_id, commit.commit_id) row = (await db_session.execute( select(MusehubSymbolHistoryEntry).where( MusehubSymbolHistoryEntry.repo_id == repo.repo_id, MusehubSymbolHistoryEntry.address == "main.py::Foo", ) )).scalar_one() assert row.op == "insert" assert row.op_payload is not None assert row.op_payload["content_summary"] == "added function Foo" assert row.op_payload["position"] == 0 assert "op" not in row.op_payload assert "address" not in row.op_payload @pytest.mark.asyncio async def test_replace_op_stored_raw(self, 
db_session: AsyncSession) -> None: from musehub.services.musehub_symbol_indexer import build_symbol_index from sqlalchemy import select repo = await create_repo(db_session, slug="raw-replace") commit = await _commit_with_delta( db_session, repo.repo_id, "raw-c002", ops=[{ "address": "main.py::Foo", "op": "replace", "old_content_id": "sha256:old", "new_content_id": "sha256:new", "old_summary": "function Foo v1", "new_summary": "function Foo v2", "position": None, }], ) await build_symbol_index(db_session, repo.repo_id, commit.commit_id) row = (await db_session.execute( select(MusehubSymbolHistoryEntry).where( MusehubSymbolHistoryEntry.repo_id == repo.repo_id, MusehubSymbolHistoryEntry.address == "main.py::Foo", ) )).scalar_one() assert row.op == "replace" assert row.content_id == "sha256:new" assert row.op_payload["old_content_id"] == "sha256:old" assert row.op_payload["new_content_id"] == "sha256:new" assert row.op_payload["old_summary"] == "function Foo v1" assert row.op_payload["new_summary"] == "function Foo v2" @pytest.mark.asyncio async def test_patch_op_stored_raw_with_child_summary( self, db_session: AsyncSession ) -> None: from musehub.services.musehub_symbol_indexer import build_symbol_index from sqlalchemy import select repo = await create_repo(db_session, slug="raw-patch") commit = await _commit_with_delta( db_session, repo.repo_id, "raw-c003", ops=[{ "address": "src/app.py", "op": "patch", "child_domain": "python", "child_summary": "2 symbols changed", "child_ops": [ {"address": "src/app.py::MyClass", "op": "insert", "content_id": "sha256:cls", "content_summary": "added class", "position": 0}, ], }], ) await build_symbol_index(db_session, repo.repo_id, commit.commit_id) rows = (await db_session.execute( select(MusehubSymbolHistoryEntry).where( MusehubSymbolHistoryEntry.repo_id == repo.repo_id, ) )).scalars().all() by_addr = {r.address: r for r in rows} # Parent patch entry patch_row = by_addr["src/app.py"] assert patch_row.op == "patch" assert 
patch_row.op_payload["child_summary"] == "2 symbols changed" assert patch_row.op_payload["child_domain"] == "python" assert "child_ops" not in patch_row.op_payload # stripped — those are separate rows # Child entry child_row = by_addr["src/app.py::MyClass"] assert child_row.op == "insert" @pytest.mark.asyncio async def test_mutate_op_stored_raw_with_fields( self, db_session: AsyncSession ) -> None: from musehub.services.musehub_symbol_indexer import build_symbol_index from sqlalchemy import select repo = await create_repo(db_session, slug="raw-mutate") commit = await _commit_with_delta( db_session, repo.repo_id, "raw-c004", ops=[{ "address": "track.mid::note@bar4", "op": "mutate", "entity_id": "test-note-42", "old_content_id": "sha256:old", "new_content_id": "sha256:new", "fields": {"velocity": {"old": "80", "new": "100"}}, "old_summary": "velocity 80", "new_summary": "velocity 100", "position": 3, }], ) await build_symbol_index(db_session, repo.repo_id, commit.commit_id) row = (await db_session.execute( select(MusehubSymbolHistoryEntry).where( MusehubSymbolHistoryEntry.repo_id == repo.repo_id, ) )).scalar_one() assert row.op == "mutate" assert row.op_payload["entity_id"] == "test-note-42" assert row.op_payload["fields"] == {"velocity": {"old": "80", "new": "100"}} assert row.op_payload["new_summary"] == "velocity 100" @pytest.mark.asyncio async def test_patch_with_from_address_is_rename( self, db_session: AsyncSession ) -> None: """PatchOp with from_address is a file rename+modify; from_address in payload.""" from musehub.services.musehub_symbol_indexer import build_symbol_index from sqlalchemy import select repo = await create_repo(db_session, slug="raw-rename") commit = await _commit_with_delta( db_session, repo.repo_id, "raw-c005", ops=[{ "address": "src/new.py", "op": "patch", "from_address": "src/old.py", "child_domain": "python", "child_summary": "file renamed", "child_ops": [], }], ) await build_symbol_index(db_session, repo.repo_id, commit.commit_id) row = 
(await db_session.execute( select(MusehubSymbolHistoryEntry).where( MusehubSymbolHistoryEntry.repo_id == repo.repo_id, MusehubSymbolHistoryEntry.address == "src/new.py", ) )).scalar_one() assert row.op == "patch" assert row.op_payload["from_address"] == "src/old.py" @pytest.mark.asyncio async def test_op_payload_excludes_op_and_address_keys( self, db_session: AsyncSession ) -> None: from musehub.services.musehub_symbol_indexer import build_symbol_index from sqlalchemy import select repo = await create_repo(db_session, slug="raw-exclude") commit = await _commit_with_delta( db_session, repo.repo_id, "raw-c006", ops=[{ "address": "util.py::helper", "op": "insert", "content_id": "sha256:ccc", "content_summary": "added helper", "position": 1, }], ) await build_symbol_index(db_session, repo.repo_id, commit.commit_id) row = (await db_session.execute( select(MusehubSymbolHistoryEntry).where( MusehubSymbolHistoryEntry.repo_id == repo.repo_id, ) )).scalar_one() assert "op" not in row.op_payload assert "address" not in row.op_payload # ───────────────────────────────────────────────────────────────────────────── # Layer 2 — Integration: build_symbol_index + read functions # ───────────────────────────────────────────────────────────────────────────── class TestBuildSymbolIndex: @pytest.mark.asyncio async def test_returns_empty_when_no_structured_delta( self, db_session: AsyncSession ) -> None: from musehub.services.musehub_symbol_indexer import build_symbol_index from tests.factories import create_commit repo = await create_repo(db_session, slug="idx-nodelta") commit = await create_commit(db_session, repo.repo_id, branch="main") results = await build_symbol_index(db_session, repo.repo_id, commit.commit_id) assert results == [] @pytest.mark.asyncio async def test_returns_results_for_structured_delta( self, db_session: AsyncSession ) -> None: from musehub.services.musehub_symbol_indexer import load_symbol_history repo = await create_repo(db_session, slug="idx-creates") commit = 
await _commit_with_delta( db_session, repo.repo_id, "c001", ops=[_insert_op("main.py::Foo", "sha256:aaa")], ) results = await _build_and_persist(db_session, repo.repo_id, commit.commit_id) await db_session.commit() assert results types = {t for t, _ in results} # Aggregate blobs are still produced. assert "code.intel_summary" in types assert "code.intel_snapshot" in types # Per-symbol data now lives in normalized tables, not in blobs. assert "code.symbol_history" not in types assert "code.hash_occurrence" not in types assert "code.per_symbol_intel" not in types # Confirm normalized rows were written. history = await load_symbol_history(db_session, repo.repo_id) assert "main.py::Foo" in history @pytest.mark.asyncio async def test_symbol_history_contains_correct_entries( self, db_session: AsyncSession ) -> None: from musehub.services.musehub_symbol_indexer import load_symbol_history repo = await create_repo(db_session, slug="idx-symhist") commit = await _commit_with_delta( db_session, repo.repo_id, "c002", ops=[ _insert_op("src/app.py::MyClass", "sha256:class"), _insert_op("src/app.py::my_func", "sha256:func"), ], ) await _build_and_persist(db_session, repo.repo_id, commit.commit_id) await db_session.commit() entries = await load_symbol_history(db_session, repo.repo_id) assert "src/app.py::MyClass" in entries assert "src/app.py::my_func" in entries assert entries["src/app.py::MyClass"][0]["op"] == "insert" @pytest.mark.asyncio async def test_hash_occurrence_tracks_shared_content( self, db_session: AsyncSession ) -> None: repo = await create_repo(db_session, slug="idx-hashoc") shared_hash = "sha256:shared" commit = await _commit_with_delta( db_session, repo.repo_id, "c003", ops=[ _insert_op("a.py::Foo", shared_hash), _insert_op("b.py::Bar", shared_hash), ], ) from musehub.services.musehub_symbol_indexer import load_hash_occurrence await _build_and_persist(db_session, repo.repo_id, commit.commit_id) await db_session.commit() entries = await 
load_hash_occurrence(db_session, repo.repo_id) assert shared_hash in entries assert set(entries[shared_hash]) == {"a.py::Foo", "b.py::Bar"} @pytest.mark.asyncio async def test_rebuild_upserts_one_row_per_intel_type( self, db_session: AsyncSession ) -> None: from sqlalchemy import select, func repo = await create_repo(db_session, slug="idx-prune") c1 = await _commit_with_delta(db_session, repo.repo_id, "c100", ops=[_insert_op("f.py::A")]) await _build_and_persist(db_session, repo.repo_id, c1.commit_id) await db_session.commit() c2 = await _commit_with_delta(db_session, repo.repo_id, "c101", ops=[_insert_op("f.py::B")]) await _build_and_persist(db_session, repo.repo_id, c2.commit_id) await db_session.commit() # code.symbol_history is no longer a blob — it lives in normalized rows. # intel_summary/intel_snapshot are the only blobs, each upserted once. blob_count = (await db_session.execute( select(func.count()).select_from(MusehubIntelResult).where( MusehubIntelResult.repo_id == repo.repo_id, MusehubIntelResult.intel_type == "code.intel_summary", ) )).scalar_one() assert blob_count == 1 @pytest.mark.asyncio async def test_bfs_excludes_orphaned_commits( self, db_session: AsyncSession ) -> None: """Commits not reachable from head must not appear in the symbol index.""" from musehub.services.musehub_symbol_indexer import load_symbol_history repo = await create_repo(db_session, slug="idx-bfs") await _commit_with_delta( db_session, repo.repo_id, "orphan", ops=[_insert_op("orphan.py::OrphanSym", "sha256:orphan")], parent_ids=[], ) head = await _commit_with_delta( db_session, repo.repo_id, "head", ops=[_insert_op("main.py::RealSym", "sha256:real")], parent_ids=[], ) await _build_and_persist(db_session, repo.repo_id, head.commit_id) await db_session.commit() history = await load_symbol_history(db_session, repo.repo_id) assert "main.py::RealSym" in history assert "orphan.py::OrphanSym" not in history class TestLoadFunctions: @pytest.mark.asyncio async def 
test_load_symbol_history_empty_when_no_index( self, db_session: AsyncSession ) -> None: from musehub.services.musehub_symbol_indexer import load_symbol_history repo = await create_repo(db_session, slug="load-noindex") result = await load_symbol_history(db_session, repo.repo_id) assert result == {} @pytest.mark.asyncio async def test_load_symbol_history_with_file_path_filter( self, db_session: AsyncSession ) -> None: from musehub.services.musehub_symbol_indexer import load_symbol_history repo = await create_repo(db_session, slug="load-filter") commit = await _commit_with_delta( db_session, repo.repo_id, "cF01", ops=[ _insert_op("a.py::Foo", "sha256:x"), _insert_op("a.py", "sha256:file"), _insert_op("b.py::Bar", "sha256:y"), ], ) await _build_and_persist(db_session, repo.repo_id, commit.commit_id) await db_session.commit() result = await load_symbol_history(db_session, repo.repo_id, file_path="a.py") assert "a.py::Foo" in result assert "a.py" in result assert "b.py::Bar" not in result @pytest.mark.asyncio async def test_load_hash_occurrence_empty_when_no_index( self, db_session: AsyncSession ) -> None: from musehub.services.musehub_symbol_indexer import load_hash_occurrence repo = await create_repo(db_session, slug="hash-noindex") assert await load_hash_occurrence(db_session, repo.repo_id) == {} @pytest.mark.asyncio async def test_load_hash_occurrence_returns_correct_entries( self, db_session: AsyncSession ) -> None: from musehub.services.musehub_symbol_indexer import load_hash_occurrence repo = await create_repo(db_session, slug="hash-entries") commit = await _commit_with_delta( db_session, repo.repo_id, "cH01", ops=[_insert_op("x.py::X", "sha256:hash1"), _insert_op("y.py::Y", "sha256:hash1")], ) await _build_and_persist(db_session, repo.repo_id, commit.commit_id) await db_session.commit() result = await load_hash_occurrence(db_session, repo.repo_id) assert "sha256:hash1" in result assert set(result["sha256:hash1"]) == {"x.py::X", "y.py::Y"} @pytest.mark.asyncio 
async def test_get_index_meta_none_when_no_index( self, db_session: AsyncSession ) -> None: from musehub.services.musehub_symbol_indexer import get_index_meta repo = await create_repo(db_session, slug="meta-none") assert await get_index_meta(db_session, repo.repo_id) is None @pytest.mark.asyncio async def test_get_index_meta_returns_ref_and_symbol_count( self, db_session: AsyncSession ) -> None: from musehub.services.musehub_symbol_indexer import get_index_meta repo = await create_repo(db_session, slug="meta-ok") commit = await _commit_with_delta( db_session, repo.repo_id, "cM01", ops=[_insert_op("f.py::A"), _insert_op("f.py::B")], ) await _build_and_persist(db_session, repo.repo_id, commit.commit_id) await db_session.commit() meta = await get_index_meta(db_session, repo.repo_id) assert meta is not None assert meta["ref"] == commit.commit_id assert meta["built_at"] is not None assert meta["symbol_count"] >= 2 @pytest.mark.asyncio async def test_load_intel_snapshot_none_when_no_index( self, db_session: AsyncSession ) -> None: from musehub.services.musehub_symbol_indexer import load_intel_snapshot repo = await create_repo(db_session, slug="intel-none") assert await load_intel_snapshot(db_session, repo.repo_id) is None @pytest.mark.asyncio async def test_load_intel_snapshot_returns_snapshot_when_built( self, db_session: AsyncSession ) -> None: from musehub.services.musehub_symbol_indexer import load_intel_snapshot repo = await create_repo(db_session, slug="intel-ok") commit = await _commit_with_delta( db_session, repo.repo_id, "cI01", ops=[_insert_op("app.py::Handler", "sha256:h1")], ) results = await _build_and_persist(db_session, repo.repo_id, commit.commit_id) await db_session.commit() assert results, "build_symbol_index returned empty results" assert any(t == "code.intel_snapshot" for t, _ in results), "code.intel_snapshot not in results" snap = await load_intel_snapshot(db_session, repo.repo_id) assert snap is not None class TestGetSnapshotManifestsBatch: 
@pytest.mark.asyncio async def test_empty_list_returns_empty_dict( self, db_session: AsyncSession ) -> None: from musehub.services.musehub_snapshot import get_snapshot_manifests_batch result = await get_snapshot_manifests_batch(db_session, []) assert result == {} @pytest.mark.asyncio async def test_single_snapshot_manifest( self, db_session: AsyncSession ) -> None: from musehub.services.musehub_snapshot import ( get_snapshot_manifests_batch, upsert_snapshot_entries, ) repo = await create_repo(db_session, slug="batch-single") snap_id = "snap-batch-01" await upsert_snapshot_entries( db_session, repo.repo_id, snap_id, {"a.py": "sha256:a", "b.py": "sha256:b"} ) await db_session.commit() result = await get_snapshot_manifests_batch(db_session, [snap_id]) assert snap_id in result assert result[snap_id]["a.py"] == "sha256:a" assert result[snap_id]["b.py"] == "sha256:b" @pytest.mark.asyncio async def test_multiple_snapshots_grouped_correctly( self, db_session: AsyncSession ) -> None: from musehub.services.musehub_snapshot import ( get_snapshot_manifests_batch, upsert_snapshot_entries, ) repo = await create_repo(db_session, slug="batch-multi") for i in range(5): snap_id = f"snap-multi-{i:02d}" await upsert_snapshot_entries( db_session, repo.repo_id, snap_id, {f"file{i}.py": long_id(f"{i}")} ) await db_session.commit() ids = [f"snap-multi-{i:02d}" for i in range(5)] result = await get_snapshot_manifests_batch(db_session, ids) assert len(result) == 5 for i, sid in enumerate(ids): assert f"file{i}.py" in result[sid] @pytest.mark.asyncio async def test_unknown_snapshot_id_returns_empty_manifest( self, db_session: AsyncSession ) -> None: from musehub.services.musehub_snapshot import get_snapshot_manifests_batch result = await get_snapshot_manifests_batch(db_session, ["ghost-snap"]) assert result == {"ghost-snap": {}} # ───────────────────────────────────────────────────────────────────────────── # Layer 3 — E2E: full pipeline via direct service calls with real DB # 
───────────────────────────────────────────────────────────────────────────── class TestSymbolIndexPipeline: @pytest.mark.asyncio async def test_build_then_meta_reflects_head_commit( self, db_session: AsyncSession ) -> None: from musehub.services.musehub_symbol_indexer import get_index_meta repo = await create_repo(db_session, slug="e2e-pipeline") c1 = await _commit_with_delta( db_session, repo.repo_id, "pipe-c001", ops=[_insert_op("service.py::APIHandler", "sha256:h1")], ) await _build_and_persist(db_session, repo.repo_id, c1.commit_id) await db_session.commit() meta = await get_index_meta(db_session, repo.repo_id) assert meta is not None assert meta["ref"] == c1.commit_id @pytest.mark.asyncio async def test_rebuild_updates_ref_to_latest_commit( self, db_session: AsyncSession ) -> None: from musehub.services.musehub_symbol_indexer import get_index_meta repo = await create_repo(db_session, slug="e2e-rebuild") c1 = await _commit_with_delta(db_session, repo.repo_id, "rb-c001", ops=[_insert_op("a.py::Old")]) await _build_and_persist(db_session, repo.repo_id, c1.commit_id) await db_session.commit() c2 = await _commit_with_delta(db_session, repo.repo_id, "rb-c002", ops=[_insert_op("b.py::New")], parent_ids=[c1.commit_id]) await _build_and_persist(db_session, repo.repo_id, c2.commit_id) await db_session.commit() meta = await get_index_meta(db_session, repo.repo_id) assert meta is not None assert meta["ref"] == c2.commit_id @pytest.mark.asyncio async def test_multi_commit_chain_all_symbols_indexed( self, db_session: AsyncSession ) -> None: """3-commit chain — every symbol from every commit must appear in the index.""" from musehub.services.musehub_symbol_indexer import load_symbol_history repo = await create_repo(db_session, slug="e2e-chain") c1 = await _commit_with_delta(db_session, repo.repo_id, "chain-c001", ops=[_insert_op("a.py::A1")]) c2 = await _commit_with_delta(db_session, repo.repo_id, "chain-c002", ops=[_insert_op("b.py::B1")], parent_ids=[c1.commit_id]) c3 = 
await _commit_with_delta(db_session, repo.repo_id, "chain-c003", ops=[_insert_op("c.py::C1")], parent_ids=[c2.commit_id]) await _build_and_persist(db_session, repo.repo_id, c3.commit_id) await db_session.commit() history = await load_symbol_history(db_session, repo.repo_id) assert "a.py::A1" in history assert "b.py::B1" in history assert "c.py::C1" in history # ───────────────────────────────────────────────────────────────────────────── # Layer 4 — Data Integrity # ───────────────────────────────────────────────────────────────────────────── class TestDataIntegrity: @pytest.mark.asyncio async def test_upsert_atomic_replace_removes_stale_entries( self, db_session: AsyncSession ) -> None: """Different snap_ids store different manifests independently.""" from musehub.services.musehub_snapshot import ( get_snapshot_manifest, upsert_snapshot_entries, ) repo = await create_repo(db_session, slug="di-atomic") snap_id_a = "snap-atomic-a" snap_id_b = "snap-atomic-b" await upsert_snapshot_entries( db_session, repo.repo_id, snap_id_a, {"old_file.py": "sha256:old", "shared.py": "sha256:shared"}, ) await db_session.commit() await upsert_snapshot_entries( db_session, repo.repo_id, snap_id_b, {"new_file.py": "sha256:new"}, ) await db_session.commit() manifest_b = await get_snapshot_manifest(db_session, snap_id_b) assert "new_file.py" in manifest_b assert "old_file.py" not in manifest_b manifest_a = await get_snapshot_manifest(db_session, snap_id_a) assert "old_file.py" in manifest_a @pytest.mark.asyncio async def test_only_one_result_per_intel_type_after_multiple_builds( self, db_session: AsyncSession ) -> None: from sqlalchemy import select, func repo = await create_repo(db_session, slug="di-onerow") for i in range(3): c = await _commit_with_delta( db_session, repo.repo_id, f"di-c{i:03d}", ops=[_insert_op(f"f{i}.py::Sym")], ) await _build_and_persist(db_session, repo.repo_id, c.commit_id) await db_session.commit() # code.symbol_history is no longer a blob. 
# intel_summary must exist with exactly one row (upserted each push). count = (await db_session.execute( select(func.count()).select_from(MusehubIntelResult).where( MusehubIntelResult.repo_id == repo.repo_id, MusehubIntelResult.intel_type == "code.intel_summary", ) )).scalar_one() assert count == 1 @pytest.mark.asyncio async def test_symbol_history_includes_commit_id_and_timestamp( self, db_session: AsyncSession ) -> None: repo = await create_repo(db_session, slug="di-fields") commit = await _commit_with_delta( db_session, repo.repo_id, "di-field-001", ops=[_insert_op("service.py::MyFn", "sha256:myfn")], ) from musehub.services.musehub_symbol_indexer import load_symbol_history await _build_and_persist(db_session, repo.repo_id, commit.commit_id) await db_session.commit() entries = await load_symbol_history(db_session, repo.repo_id) entry = entries["service.py::MyFn"][0] assert entry["commit_id"] == commit.commit_id assert entry["committed_at"] != "" assert entry["op"] == "insert" assert entry["content_id"] == "sha256:myfn" # ───────────────────────────────────────────────────────────────────────────── # Layer 5 — Security # ───────────────────────────────────────────────────────────────────────────── class TestSecurity: @pytest.mark.asyncio async def test_corrupt_json_returns_empty_not_exception( self, db_session: AsyncSession ) -> None: """A corrupt code.symbol_history data_json must return {} — not raise.""" from musehub.services.musehub_symbol_indexer import load_symbol_history from musehub.core.genesis import compute_intel_result_id repo = await create_repo(db_session, slug="sec-corrupt") # Manually insert a row with garbage JSON result_id = compute_intel_result_id(repo.repo_id, "code.symbol_history", "bad-ref") from sqlalchemy.dialects.postgresql import insert as pg_insert await db_session.execute( pg_insert(MusehubIntelResult).values( result_id=result_id, repo_id=repo.repo_id, intel_type="code.symbol_history", domain="code", ref="bad-ref", data_json="not valid 
json {{{{", schema_version=1, computed_at=_now(), ).on_conflict_do_nothing() ) await db_session.commit() result = await load_symbol_history(db_session, repo.repo_id) assert result == {} @pytest.mark.asyncio async def test_build_with_unknown_head_commit_returns_empty( self, db_session: AsyncSession ) -> None: """Unknown head_commit_id must return [], not raise.""" from musehub.services.musehub_symbol_indexer import build_symbol_index repo = await create_repo(db_session, slug="sec-unknown-head") results = await build_symbol_index( db_session, repo.repo_id, "nonexistent-commit-id" ) assert results == [] @pytest.mark.asyncio async def test_corrupt_hash_occurrence_returns_empty( self, db_session: AsyncSession ) -> None: from musehub.services.musehub_symbol_indexer import load_hash_occurrence from musehub.core.genesis import compute_intel_result_id from sqlalchemy.dialects.postgresql import insert as pg_insert repo = await create_repo(db_session, slug="sec-corrupt-hash") result_id = compute_intel_result_id(repo.repo_id, "code.hash_occurrence", "bad-ref") await db_session.execute( pg_insert(MusehubIntelResult).values( result_id=result_id, repo_id=repo.repo_id, intel_type="code.hash_occurrence", domain="code", ref="bad-ref", data_json="} invalid {", schema_version=1, computed_at=_now(), ).on_conflict_do_nothing() ) await db_session.commit() result = await load_hash_occurrence(db_session, repo.repo_id) assert result == {} # ───────────────────────────────────────────────────────────────────────────── # Layer 5B — Per-symbol intel # ───────────────────────────────────────────────────────────────────────────── class TestPerSymbolIntel: @pytest.mark.asyncio async def test_early_return_when_already_current( self, db_session: AsyncSession ) -> None: """When the index is current and code.per_symbol_intel exists, build_symbol_index must return [] (early exit, no recompute).""" from musehub.services.musehub_symbol_indexer import build_symbol_index repo = await 
create_repo(db_session, slug="bfil-current") commit = await _commit_with_delta( db_session, repo.repo_id, "bfil-c001", ops=[_insert_op("svc.py::Handler", "sha256:h1")], ) await _build_and_persist(db_session, repo.repo_id, commit.commit_id) await db_session.commit() # Second call with same head: must early-return (empty list). results2 = await build_symbol_index(db_session, repo.repo_id, commit.commit_id) assert results2 == [], ( "build_symbol_index must return [] when index is current " "and per_symbol_intel result exists." ) @pytest.mark.asyncio async def test_per_symbol_intel_populated_on_first_build( self, db_session: AsyncSession ) -> None: from musehub.services.musehub_symbol_indexer import lookup_symbol_intel repo = await create_repo(db_session, slug="bfil-fresh") commit = await _commit_with_delta( db_session, repo.repo_id, "bfil-fresh-c001", ops=[_insert_op("api.py::Router", "sha256:r1")], ) await _build_and_persist(db_session, repo.repo_id, commit.commit_id) await db_session.commit() psi_data = await lookup_symbol_intel(db_session, repo.repo_id, ["api.py::Router"]) assert "api.py::Router" in psi_data @pytest.mark.asyncio async def test_per_symbol_intel_contains_expected_fields( self, db_session: AsyncSession ) -> None: from musehub.services.musehub_symbol_indexer import lookup_symbol_intel repo = await create_repo(db_session, slug="bfil-fields") commit = await _commit_with_delta( db_session, repo.repo_id, "bfil-fields-c001", ops=[_insert_op("lib.py::Parser", "sha256:p1")], ) await _build_and_persist(db_session, repo.repo_id, commit.commit_id) await db_session.commit() psi_data = await lookup_symbol_intel(db_session, repo.repo_id, ["lib.py::Parser"]) entry = psi_data["lib.py::Parser"] for field in ("churn", "churn_30d", "churn_90d", "blast", "blast_direct", "blast_cross", "blast_top", "last_changed", "last_author", "author_count", "gravity", "weekly"): assert field in entry, f"Missing field '{field}' in per_symbol intel entry." 
# TestPerSymbolIntel (continued): the three methods below take ``self`` and
# belong to the class opened earlier in this file.
# NOTE(review): layout here is reconstructed from whitespace-collapsed source;
# places where the nesting could not be determined from the text alone are
# marked NOTE(review) inline.

    @pytest.mark.asyncio
    async def test_author_count_reflects_unique_authors(
        self, db_session: AsyncSession
    ) -> None:
        """author_count counts distinct authors; churn counts every change.

        Seeds a three-commit chain (alice, bob, alice) in which every commit
        inserts ``lib.py::Widget`` with a different content id, builds the
        index at the last commit, and expects author_count == 2 and churn == 3.
        """
        repo = await create_repo(db_session, slug="bfil-authors")
        authors_seq = [("alice", "bfil-authors-c001"), ("bob", "bfil-authors-c002"), ("alice", "bfil-authors-c003")]
        prev_id: list[str] = []
        for i, (author, cid) in enumerate(authors_seq, start=1):
            commit = await _commit_with_delta(
                db_session, repo.repo_id, cid,
                ops=[_insert_op("lib.py::Widget", f"sha256:w{i}")],
                parent_ids=prev_id,
                author=author,
            )
            # Each commit becomes the sole parent of the next one.
            prev_id = [cid]
        # NOTE(review): taken to run once, after the chain is seeded (``commit``
        # is then the last commit of the chain) — confirm the loop extent.
        await _build_and_persist(db_session, repo.repo_id, commit.commit_id)
        await db_session.commit()
        from musehub.services.musehub_symbol_indexer import lookup_symbol_intel
        psi_data = await lookup_symbol_intel(db_session, repo.repo_id, ["lib.py::Widget"])
        entry = psi_data["lib.py::Widget"]
        assert entry["author_count"] == 2, (
            f"Expected 2 unique authors (alice, bob), got {entry['author_count']}"
        )
        assert entry["churn"] == 3

    @pytest.mark.asyncio
    async def test_lookup_symbol_intel_returns_matching_addresses(
        self, db_session: AsyncSession
    ) -> None:
        """lookup_symbol_intel returns exactly the requested addresses.

        Three symbols are indexed; two are requested; the result's key set
        must be those two and must not include the third.
        """
        from musehub.services.musehub_symbol_indexer import lookup_symbol_intel
        repo = await create_repo(db_session, slug="bfil-lookup")
        commit = await _commit_with_delta(
            db_session, repo.repo_id, "bfil-lookup-c001",
            ops=[
                _insert_op("a.py::Foo", "sha256:f1"),
                _insert_op("b.py::Bar", "sha256:b1"),
                _insert_op("c.py::Baz", "sha256:z1"),
            ],
        )
        await _build_and_persist(db_session, repo.repo_id, commit.commit_id)
        await db_session.commit()
        result = await lookup_symbol_intel(db_session, repo.repo_id, ["a.py::Foo", "c.py::Baz"])
        assert set(result.keys()) == {"a.py::Foo", "c.py::Baz"}
        assert "b.py::Bar" not in result

    @pytest.mark.asyncio
    async def test_lookup_symbol_intel_returns_empty_when_no_index(
        self, db_session: AsyncSession
    ) -> None:
        """With no commits seeded and no index built, the lookup returns {}."""
        from musehub.services.musehub_symbol_indexer import lookup_symbol_intel
        repo = await create_repo(db_session, slug="bfil-lookup-null")
        result = await lookup_symbol_intel(db_session, repo.repo_id, ["core.py::Engine"])
        assert result == {}


# ─────────────────────────────────────────────────────────────────────────────
# Layer 6 — Stress
# ─────────────────────────────────────────────────────────────────────────────

class TestStress:
    """Larger-volume runs of the snapshot and symbol-index paths.

    Covers a 1 000-file manifest, a 50-snapshot batch read, a 100-commit
    chain, and a file_path filter over a 500-op index.
    """

    @pytest.mark.asyncio
    async def test_upsert_1000_file_manifest(self, db_session: AsyncSession) -> None:
        """A 1 000-entry manifest round-trips through upsert and read-back."""
        from musehub.services.musehub_snapshot import (
            get_snapshot_manifest,
            upsert_snapshot_entries,
        )
        repo = await create_repo(db_session, slug="stress-1k-snap")
        snap_id = "snap-1k"
        manifest = {f"src/file_{i:04d}.py": long_id(f"{i:04d}") for i in range(1000)}
        await upsert_snapshot_entries(db_session, repo.repo_id, snap_id, manifest)
        await db_session.commit()
        result = await get_snapshot_manifest(db_session, snap_id)
        assert len(result) == 1000
        # NOTE(review): assumes long_id("0500") renders as "sha256:0500" —
        # confirm against muse.core.types.long_id.
        assert result["src/file_0500.py"] == "sha256:0500"

    @pytest.mark.asyncio
    async def test_batch_manifest_50_snapshots(self, db_session: AsyncSession) -> None:
        """get_snapshot_manifests_batch returns all 50 requested snapshots.

        Snapshot ``i`` holds the single path ``f{i}.py``; each returned
        manifest must contain its own path.
        """
        from musehub.services.musehub_snapshot import (
            get_snapshot_manifests_batch,
            upsert_snapshot_entries,
        )
        repo = await create_repo(db_session, slug="stress-batch-50")
        ids: list[str] = []
        for i in range(50):
            sid = f"stress-snap-{i:02d}"
            ids.append(sid)
            await upsert_snapshot_entries(
                db_session, repo.repo_id, sid,
                {f"f{i}.py": long_id(f"{i}")},
            )
        # NOTE(review): commit taken to run once after all upserts — confirm
        # the loop extent.
        await db_session.commit()
        result = await get_snapshot_manifests_batch(db_session, ids)
        assert len(result) == 50
        for i, sid in enumerate(ids):
            assert f"f{i}.py" in result[sid]

    @pytest.mark.asyncio
    async def test_build_symbol_index_100_commits(
        self, db_session: AsyncSession
    ) -> None:
        """100-commit chain with 5 ops each — indexer must complete successfully.

        Commit ``i`` inserts ``file{i}.py::Sym0`` … ``Sym4``; the last commit
        of the chain is given the id ``stress-head`` and the index is built
        from it.
        """
        from musehub.services.musehub_symbol_indexer import load_symbol_history
        repo = await create_repo(db_session, slug="stress-100-commits")
        prev_id: str | None = None
        head_id = "stress-head"
        for i in range(100):
            # The final iteration (i == 99) uses the well-known head id.
            cid = f"stress-{i:04d}" if i < 99 else head_id
            ops = [_insert_op(f"file{i}.py::Sym{j}", long_id(f"{i}{j}")) for j in range(5)]
            commit = await _commit_with_delta(
                db_session, repo.repo_id, cid,
                ops=ops,
                parent_ids=[prev_id] if prev_id else [],
            )
            prev_id = commit.commit_id
        await _build_and_persist(db_session, repo.repo_id, head_id)
        await db_session.commit()
        history = await load_symbol_history(db_session, repo.repo_id)
        # 100 files × 5 symbols each = 500 top-level symbol entries
        assert len(history) == 500

    @pytest.mark.asyncio
    async def test_load_symbol_history_file_filter_on_large_index(
        self, db_session: AsyncSession
    ) -> None:
        """Filter on large index returns only matching addresses.

        One commit carries 50 files × 10 symbols; filtering on
        ``src/module_05.py`` must return exactly that file's 10 symbols.
        """
        from musehub.services.musehub_symbol_indexer import load_symbol_history
        repo = await create_repo(db_session, slug="stress-filter-large")
        ops: list[JSONObject] = []
        for i in range(50):
            for j in range(10):
                ops.append(_insert_op(f"src/module_{i:02d}.py::Sym{j}", long_id(f"{i}{j}")))
        commit = await _commit_with_delta(db_session, repo.repo_id, "stress-fl-head", ops=ops)
        await _build_and_persist(db_session, repo.repo_id, commit.commit_id)
        await db_session.commit()
        result = await load_symbol_history(db_session, repo.repo_id, file_path="src/module_05.py")
        assert len(result) == 10
        for key in result:
            assert key.startswith("src/module_05.py")


# ─────────────────────────────────────────────────────────────────────────────
# Layer: backfill_genesis_ops
# ─────────────────────────────────────────────────────────────────────────────

class TestBackfillGenesisOps:
    """backfill_genesis_ops corrects birth entries that were indexed as
    op='modify' because the genesis commit had no structured_delta."""

    async def _seed_bad_birth(
        self,
        session: AsyncSession,
        repo_id: str,
        address: str = "src/a.py::my_fn",
        op: str = "modify",
    ) -> MusehubSymbolHistoryEntry:
        """Insert a history entry that simulates a mis-indexed birth op.

        Args:
            session: Session the row is added to and flushed on.
            repo_id: Repository the entry belongs to.
            address: Symbol address stored on the entry.
            op: Op value stored on the entry ("modify" simulates the bad case).

        Returns:
            The flushed entry. Its commit_id and content_id are random blob
            ids and committed_at is ten days before now.
        """
        from datetime import timedelta
        entry = MusehubSymbolHistoryEntry(
            repo_id=repo_id,
            address=address,
            commit_id=blob_id(secrets.token_bytes(16)),
            committed_at=_now() - timedelta(days=10),
            author="gabriel",
            op=op,
            content_id=blob_id(secrets.token_bytes(16)),
        )
        session.add(entry)
        await session.flush()
        return entry

    @pytest.mark.asyncio
    async def test_dry_run_returns_count_without_writing(
        self, db_session: AsyncSession
    ) -> None:
        """dry_run=True reports one correctable row and leaves its op as 'modify'."""
        from musehub.services.musehub_symbol_indexer import backfill_genesis_ops
        from sqlalchemy import select
        repo = await create_repo(db_session, slug="bf-dry-run")
        await self._seed_bad_birth(db_session, repo.repo_id, op="modify")
        await db_session.flush()
        count = await backfill_genesis_ops(db_session, repo_id=repo.repo_id, dry_run=True)
        assert count == 1
        # Nothing written — row still has op='modify'
        rows = (await db_session.execute(
            select(MusehubSymbolHistoryEntry).where(
                MusehubSymbolHistoryEntry.repo_id == repo.repo_id,
            )
        )).scalars().all()
        assert all(r.op == "modify" for r in rows)

    @pytest.mark.asyncio
    async def test_corrects_modify_to_add(self, db_session: AsyncSession) -> None:
        """A lone 'modify' entry is rewritten to 'add' and counted once."""
        from musehub.services.musehub_symbol_indexer import backfill_genesis_ops
        from sqlalchemy import select
        repo = await create_repo(db_session, slug="bf-modify")
        entry = await self._seed_bad_birth(db_session, repo.repo_id, op="modify")
        await db_session.flush()
        updated = await backfill_genesis_ops(db_session, repo_id=repo.repo_id)
        assert updated == 1
        # Re-select by (repo, address, commit) and check the stored op.
        refreshed = (await db_session.execute(
            select(MusehubSymbolHistoryEntry).where(
                MusehubSymbolHistoryEntry.repo_id == repo.repo_id,
                MusehubSymbolHistoryEntry.address == entry.address,
                MusehubSymbolHistoryEntry.commit_id == entry.commit_id,
            )
        )).scalar_one()
        assert refreshed.op == "add"

    @pytest.mark.asyncio
    async def test_skips_entries_already_add(self, db_session: AsyncSession) -> None:
        """An entry seeded with op='add' yields an update count of 0."""
        from musehub.services.musehub_symbol_indexer import backfill_genesis_ops
        repo = await create_repo(db_session, slug="bf-already-add")
        await self._seed_bad_birth(db_session, repo.repo_id, op="add")
        await db_session.flush()
        updated = await backfill_genesis_ops(db_session, repo_id=repo.repo_id)
        assert updated == 0

    @pytest.mark.asyncio
    async def test_only_corrects_oldest_entry_not_later_modifies(
        self, db_session: AsyncSession
    ) -> None:
        """A subsequent modify on the same symbol must not be changed.

        Two 'modify' entries share one address, dated five days and one day
        ago; only the older one may become 'add'.
        """
        from datetime import timedelta
        from musehub.services.musehub_symbol_indexer import backfill_genesis_ops
        from sqlalchemy import select
        repo = await create_repo(db_session, slug="bf-oldest-only")
        address = "src/b.py::helper"
        birth = MusehubSymbolHistoryEntry(
            repo_id=repo.repo_id,
            address=address,
            commit_id=blob_id(secrets.token_bytes(16)),
            committed_at=_now() - timedelta(days=5),
            author="gabriel",
            op="modify",
            content_id=blob_id(secrets.token_bytes(16)),
        )
        later = MusehubSymbolHistoryEntry(
            repo_id=repo.repo_id,
            address=address,
            commit_id=blob_id(secrets.token_bytes(16)),
            committed_at=_now() - timedelta(days=1),
            author="gabriel",
            op="modify",
            content_id=blob_id(secrets.token_bytes(16)),
        )
        session = db_session
        session.add(birth)
        session.add(later)
        await session.flush()
        updated = await backfill_genesis_ops(session, repo_id=repo.repo_id)
        assert updated == 1
        # Oldest first, so rows[0] is the birth entry and rows[1] the later one.
        rows = (await session.execute(
            select(MusehubSymbolHistoryEntry)
            .where(
                MusehubSymbolHistoryEntry.repo_id == repo.repo_id,
                MusehubSymbolHistoryEntry.address == address,
            )
            .order_by(MusehubSymbolHistoryEntry.committed_at.asc())
        )).scalars().all()
        assert rows[0].op == "add"  # birth corrected
        assert rows[1].op == "modify"  # later change untouched

    @pytest.mark.asyncio
    async def test_repo_id_none_corrects_all_repos(
        self, db_session: AsyncSession
    ) -> None:
        """With repo_id=None, the bad births seeded in two repos are both counted."""
        from musehub.services.musehub_symbol_indexer import backfill_genesis_ops
        repo_a = await create_repo(db_session, slug="bf-all-a")
        repo_b = await create_repo(db_session, slug="bf-all-b")
        await self._seed_bad_birth(db_session, repo_a.repo_id, op="modify")
        await self._seed_bad_birth(db_session, repo_b.repo_id, op="modify")
        await db_session.flush()
        updated = await backfill_genesis_ops(db_session, repo_id=None)
        # presumably >= rather than == because an unscoped run may also count
        # rows from other repos present in the same database — confirm.
        assert updated >= 2

    @pytest.mark.asyncio
    async def test_idempotent(self, db_session: AsyncSession) -> None:
        """Running twice returns 0 on the second pass."""
        from musehub.services.musehub_symbol_indexer import backfill_genesis_ops
        repo = await create_repo(db_session, slug="bf-idempotent")
        await self._seed_bad_birth(db_session, repo.repo_id, op="modify")
        await db_session.flush()
        first = await backfill_genesis_ops(db_session, repo_id=repo.repo_id)
        assert first == 1
        second = await backfill_genesis_ops(db_session, repo_id=repo.repo_id)
        assert second == 0


# ─────────────────────────────────────────────────────────────────────────────
# Layer: backfill_content_ids_from_snapshots
# ─────────────────────────────────────────────────────────────────────────────

class TestBackfillContentIdsFromSnapshots:
    """backfill_content_ids_from_snapshots fills missing content_id values on
    file-level history entries by reading snapshot manifests from the DAG."""

    async def _seed_snapshot_and_commit(
        self,
        session: AsyncSession,
        repo_id: str,
        manifest: dict[str, str],
        commit_id: str | None = None,
    ) -> tuple[MusehubSnapshot, MusehubCommit]:
        """Insert a snapshot (msgpack manifest) and a commit pointing to it.

        Args:
            session: Session the rows are added to and flushed on.
            repo_id: Repository both ref rows are attached to.
            manifest: Mapping packed with msgpack into the snapshot's
                manifest_blob; its length becomes entry_count.
            commit_id: Commit id to use; a random blob id when omitted.

        Returns:
            The (snapshot, commit) pair after flushing. The commit has no
            parents and is on branch "main".
        """
        import msgpack
        cid = commit_id or blob_id(secrets.token_bytes(16))
        snap_id = blob_id(secrets.token_bytes(16))
        snapshot = MusehubSnapshot(
            snapshot_id=snap_id,
            directories=[],
            manifest_blob=msgpack.packb(manifest, use_bin_type=True),
            entry_count=len(manifest),
        )
        session.add(snapshot)
        session.add(MusehubSnapshotRef(repo_id=repo_id, snapshot_id=snap_id))
        commit = MusehubCommit(
            commit_id=cid,
            branch="main",
            parent_ids=[],
            message="test commit",
            author="gabriel",
            timestamp=_now(),
            snapshot_id=snap_id,
        )
        session.add(commit)
        session.add(MusehubCommitRef(repo_id=repo_id, commit_id=cid))
        await session.flush()
        return snapshot, commit

    async def _seed_missing_entry(
        self,
        session: AsyncSession,
        repo_id: str,
        address: str,
        commit_id: str,
    ) -> MusehubSymbolHistoryEntry:
        """Insert a file-level history entry with content_id=None.

        The entry is stored with op="add" and committed_at set to now, then
        flushed and returned.
        """
        entry = MusehubSymbolHistoryEntry(
            repo_id=repo_id,
            address=address,
            commit_id=commit_id,
            committed_at=_now(),
            author="gabriel",
            op="add",
            content_id=None,
        )
        session.add(entry)
        await session.flush()
        return entry

    @pytest.mark.asyncio
    async def test_dry_run_returns_count_without_writing(
        self, db_session: AsyncSession
    ) -> None:
        """dry_run=True reports one fillable row and leaves content_id as None."""
        from musehub.services.musehub_symbol_indexer import backfill_content_ids_from_snapshots
        from sqlalchemy import select
        repo = await create_repo(db_session, slug="bcid-dry")
        address = "src/app.ts"
        content_id = blob_id(secrets.token_bytes(16))
        _, commit = await self._seed_snapshot_and_commit(
            db_session, repo.repo_id, {address: content_id}
        )
        await self._seed_missing_entry(db_session, repo.repo_id, address, commit.commit_id)
        await db_session.flush()
        count = await backfill_content_ids_from_snapshots(
            db_session, repo_id=repo.repo_id, dry_run=True
        )
        assert count == 1
        # Nothing written — content_id still None
        rows = (await db_session.execute(
            select(MusehubSymbolHistoryEntry).where(
                MusehubSymbolHistoryEntry.repo_id == repo.repo_id,
            )
        )).scalars().all()
        assert all(r.content_id is None for r in rows)

    @pytest.mark.asyncio
    async def test_fills_content_id_from_manifest(
        self, db_session: AsyncSession
    ) -> None:
        """The entry's content_id becomes the manifest value for its path."""
        from musehub.services.musehub_symbol_indexer import backfill_content_ids_from_snapshots
        from sqlalchemy import select
        repo = await create_repo(db_session, slug="bcid-fill")
        address = "src/app.ts"
        expected_cid = blob_id(secrets.token_bytes(16))
        _, commit = await self._seed_snapshot_and_commit(
            db_session, repo.repo_id, {address: expected_cid}
        )
        await self._seed_missing_entry(db_session, repo.repo_id, address, commit.commit_id)
        await db_session.flush()
        updated = await backfill_content_ids_from_snapshots(db_session, repo_id=repo.repo_id)
        assert updated == 1
        row = (await db_session.execute(
            select(MusehubSymbolHistoryEntry).where(
                MusehubSymbolHistoryEntry.repo_id == repo.repo_id,
                MusehubSymbolHistoryEntry.address == address,
                MusehubSymbolHistoryEntry.commit_id == commit.commit_id,
            )
        )).scalar_one()
        assert row.content_id == expected_cid

    @pytest.mark.asyncio
    async def test_skips_symbol_level_addresses(
        self, db_session: AsyncSession
    ) -> None:
        """Entries with '::' in the address are symbol-level and must be skipped.

        The manifest holds the file path ``src/app.ts`` while the entry's
        address is ``src/app.ts::MyClass``; the update count must be 0.
        """
        from musehub.services.musehub_symbol_indexer import backfill_content_ids_from_snapshots
        repo = await create_repo(db_session, slug="bcid-sym")
        address = "src/app.ts::MyClass"
        content_id = blob_id(secrets.token_bytes(16))
        _, commit = await self._seed_snapshot_and_commit(
            db_session, repo.repo_id, {"src/app.ts": content_id}
        )
        entry = MusehubSymbolHistoryEntry(
            repo_id=repo.repo_id,
            address=address,
            commit_id=commit.commit_id,
            committed_at=_now(),
            author="gabriel",
            op="add",
            content_id=None,
        )
        db_session.add(entry)
        await db_session.flush()
        updated = await backfill_content_ids_from_snapshots(db_session, repo_id=repo.repo_id)
        assert updated == 0

    @pytest.mark.asyncio
    async def test_skips_entries_already_with_content_id(
        self, db_session: AsyncSession
    ) -> None:
        """An entry that already has a content_id is not overwritten.

        The manifest carries a different id for the same path; the stored
        value must remain the original one and the update count must be 0.
        """
        from musehub.services.musehub_symbol_indexer import backfill_content_ids_from_snapshots
        from sqlalchemy import select
        repo = await create_repo(db_session, slug="bcid-skip")
        address = "src/keep.py"
        existing_cid = blob_id(secrets.token_bytes(16))
        manifest_cid = blob_id(secrets.token_bytes(16))
        _, commit = await self._seed_snapshot_and_commit(
            db_session, repo.repo_id, {address: manifest_cid}
        )
        entry = MusehubSymbolHistoryEntry(
            repo_id=repo.repo_id,
            address=address,
            commit_id=commit.commit_id,
            committed_at=_now(),
            author="gabriel",
            op="add",
            content_id=existing_cid,
        )
        db_session.add(entry)
        await db_session.flush()
        updated = await backfill_content_ids_from_snapshots(db_session, repo_id=repo.repo_id)
        assert updated == 0
        # Original content_id preserved
        row = (await db_session.execute(
            select(MusehubSymbolHistoryEntry).where(
                MusehubSymbolHistoryEntry.repo_id == repo.repo_id,
            )
        )).scalar_one()
        assert row.content_id == existing_cid

    @pytest.mark.asyncio
    async def test_skips_entry_when_path_absent_from_manifest(
        self, db_session: AsyncSession
    ) -> None:
        """If the manifest doesn't contain the address, the entry is left alone."""
        from musehub.services.musehub_symbol_indexer import backfill_content_ids_from_snapshots
        from sqlalchemy import select
        repo = await create_repo(db_session, slug="bcid-absent")
        address = "src/ghost.py"
        # The manifest only knows a different path, so there is nothing to copy.
        _, commit = await self._seed_snapshot_and_commit(
            db_session, repo.repo_id, {"src/other.py": blob_id(secrets.token_bytes(16))}
        )
        await self._seed_missing_entry(db_session, repo.repo_id, address, commit.commit_id)
        await db_session.flush()
        updated = await backfill_content_ids_from_snapshots(db_session, repo_id=repo.repo_id)
        assert updated == 0
        row = (await db_session.execute(
            select(MusehubSymbolHistoryEntry).where(
                MusehubSymbolHistoryEntry.repo_id == repo.repo_id,
            )
        )).scalar_one()
        assert row.content_id is None

    @pytest.mark.asyncio
    async def test_repo_id_none_fills_all_repos(
        self, db_session: AsyncSession
    ) -> None:
        """With repo_id=None, the missing entries seeded in two repos are both counted."""
        from musehub.services.musehub_symbol_indexer import backfill_content_ids_from_snapshots
        repo_a = await create_repo(db_session, slug="bcid-all-a")
        repo_b = await create_repo(db_session, slug="bcid-all-b")
        cid_a = blob_id(secrets.token_bytes(16))
        cid_b = blob_id(secrets.token_bytes(16))
        _, commit_a = await self._seed_snapshot_and_commit(
            db_session, repo_a.repo_id, {"src/a.py": cid_a}
        )
        _, commit_b = await self._seed_snapshot_and_commit(
            db_session, repo_b.repo_id, {"src/b.py": cid_b}
        )
        await self._seed_missing_entry(db_session, repo_a.repo_id, "src/a.py", commit_a.commit_id)
        await self._seed_missing_entry(db_session, repo_b.repo_id, "src/b.py", commit_b.commit_id)
        await db_session.flush()
        updated = await backfill_content_ids_from_snapshots(db_session, repo_id=None)
        # presumably >= rather than == because an unscoped run may also count
        # rows from other repos present in the same database — confirm.
        assert updated >= 2

    @pytest.mark.asyncio
    async def test_idempotent(self, db_session: AsyncSession) -> None:
        """Running twice returns 0 on the second pass."""
        from musehub.services.musehub_symbol_indexer import backfill_content_ids_from_snapshots
        repo = await create_repo(db_session, slug="bcid-idem")
        address = "src/main.py"
        cid = blob_id(secrets.token_bytes(16))
        _, commit = await self._seed_snapshot_and_commit(
            db_session, repo.repo_id, {address: cid}
        )
        await self._seed_missing_entry(db_session, repo.repo_id, address, commit.commit_id)
        await db_session.flush()
        first = await backfill_content_ids_from_snapshots(db_session, repo_id=repo.repo_id)
        assert first == 1
        second = await backfill_content_ids_from_snapshots(db_session, repo_id=repo.repo_id)
        assert second == 0

    @pytest.mark.asyncio
    async def test_corrupt_manifest_blob_is_skipped_gracefully(
        self, db_session: AsyncSession
    ) -> None:
        """A corrupt manifest blob must not raise — entry is left with content_id=None.

        The snapshot is built by hand (not via _seed_snapshot_and_commit) so
        that manifest_blob can hold bytes that are not a packed manifest.
        """
        from musehub.services.musehub_symbol_indexer import backfill_content_ids_from_snapshots
        from sqlalchemy import select
        repo = await create_repo(db_session, slug="bcid-corrupt")
        snap_id = blob_id(secrets.token_bytes(16))
        commit_id = blob_id(secrets.token_bytes(16))
        address = "src/broken.py"
        snapshot = MusehubSnapshot(
            snapshot_id=snap_id,
            directories=[],
            manifest_blob=b"\xff\xfe not msgpack",
            entry_count=0,
        )
        db_session.add(snapshot)
        db_session.add(MusehubSnapshotRef(repo_id=repo.repo_id, snapshot_id=snap_id))
        commit = MusehubCommit(
            commit_id=commit_id,
            branch="main",
            parent_ids=[],
            message="corrupt test",
            author="gabriel",
            timestamp=_now(),
            snapshot_id=snap_id,
        )
        db_session.add(commit)
        db_session.add(MusehubCommitRef(repo_id=repo.repo_id, commit_id=commit_id))
        await self._seed_missing_entry(db_session, repo.repo_id, address, commit_id)
        await db_session.flush()
        # Must not raise
        updated = await backfill_content_ids_from_snapshots(db_session, repo_id=repo.repo_id)
        assert updated == 0
        row = (await db_session.execute(
            select(MusehubSymbolHistoryEntry).where(
                MusehubSymbolHistoryEntry.repo_id == repo.repo_id,
            )
        )).scalar_one()
        assert row.content_id is None


#
───────────────────────────────────────────────────────────────────────────── # Layer: backfill_raw_ops_from_commits # ───────────────────────────────────────────────────────────────────────────── class TestBackfillRawOpsFromCommits: """backfill_raw_ops_from_commits re-indexes stale coarse-op rows by reading the original structured_delta from commit_meta.""" async def _seed_commit_with_meta( self, session: AsyncSession, repo_id: str, ops: list[dict], commit_id: str | None = None, ) -> MusehubCommit: cid = commit_id or blob_id(secrets.token_bytes(16)) commit = MusehubCommit( commit_id=cid, branch="main", parent_ids=[], message="test", author="gabriel", timestamp=_now(), structured_delta={"ops": ops}, ) session.add(commit) session.add(MusehubCommitRef(repo_id=repo_id, commit_id=cid)) await session.flush() return commit async def _seed_stale_entry( self, session: AsyncSession, repo_id: str, address: str, commit_id: str, coarse_op: str, content_id: str | None = None, ) -> MusehubSymbolHistoryEntry: entry = MusehubSymbolHistoryEntry( repo_id=repo_id, address=address, commit_id=commit_id, committed_at=_now(), author="gabriel", op=coarse_op, op_payload=None, content_id=content_id, ) session.add(entry) await session.flush() return entry @pytest.mark.asyncio async def test_dry_run_returns_count_without_writing( self, db_session: AsyncSession ) -> None: from musehub.services.musehub_symbol_indexer import backfill_raw_ops_from_commits from sqlalchemy import select repo = await create_repo(db_session, slug="bro-dry") commit = await self._seed_commit_with_meta(db_session, repo.repo_id, [ {"address": "src/a.py::Fn", "op": "insert", "content_id": "sha256:aaa", "content_summary": "added Fn", "position": 0}, ]) await self._seed_stale_entry(db_session, repo.repo_id, "src/a.py::Fn", commit.commit_id, "add") await db_session.flush() count = await backfill_raw_ops_from_commits(db_session, repo_id=repo.repo_id, dry_run=True) assert count == 1 row = (await db_session.execute( 
select(MusehubSymbolHistoryEntry).where( MusehubSymbolHistoryEntry.repo_id == repo.repo_id, ) )).scalar_one() assert row.op == "add" assert row.op_payload is None @pytest.mark.asyncio async def test_add_becomes_insert_with_payload( self, db_session: AsyncSession ) -> None: from musehub.services.musehub_symbol_indexer import backfill_raw_ops_from_commits repo = await create_repo(db_session, slug="bro-insert") commit = await self._seed_commit_with_meta(db_session, repo.repo_id, [ {"address": "src/a.py::Fn", "op": "insert", "content_id": "sha256:aaa", "content_summary": "added function Fn", "position": 0}, ]) await self._seed_stale_entry(db_session, repo.repo_id, "src/a.py::Fn", commit.commit_id, "add", "sha256:aaa") await db_session.flush() updated = await backfill_raw_ops_from_commits(db_session, repo_id=repo.repo_id) assert updated == 1 row = (await db_session.execute( select(MusehubSymbolHistoryEntry).where( MusehubSymbolHistoryEntry.repo_id == repo.repo_id, ) )).scalar_one() assert row.op == "insert" assert row.op_payload["content_summary"] == "added function Fn" assert row.op_payload["position"] == 0 assert "op" not in row.op_payload assert "address" not in row.op_payload @pytest.mark.asyncio async def test_modify_becomes_replace_with_payload( self, db_session: AsyncSession ) -> None: from musehub.services.musehub_symbol_indexer import backfill_raw_ops_from_commits repo = await create_repo(db_session, slug="bro-replace") commit = await self._seed_commit_with_meta(db_session, repo.repo_id, [ {"address": "src/b.py::Bar", "op": "replace", "old_content_id": "sha256:old", "new_content_id": "sha256:new", "old_summary": "Bar v1", "new_summary": "Bar v2", "position": None}, ]) await self._seed_stale_entry(db_session, repo.repo_id, "src/b.py::Bar", commit.commit_id, "modify", "sha256:new") await db_session.flush() updated = await backfill_raw_ops_from_commits(db_session, repo_id=repo.repo_id) assert updated == 1 row = (await db_session.execute( 
select(MusehubSymbolHistoryEntry).where( MusehubSymbolHistoryEntry.repo_id == repo.repo_id, ) )).scalar_one() assert row.op == "replace" assert row.op_payload["old_content_id"] == "sha256:old" assert row.op_payload["new_content_id"] == "sha256:new" assert row.op_payload["old_summary"] == "Bar v1" @pytest.mark.asyncio async def test_modify_becomes_patch_for_file_level( self, db_session: AsyncSession ) -> None: from musehub.services.musehub_symbol_indexer import backfill_raw_ops_from_commits repo = await create_repo(db_session, slug="bro-patch") commit = await self._seed_commit_with_meta(db_session, repo.repo_id, [ {"address": "src/c.py", "op": "patch", "child_domain": "python", "child_summary": "3 symbols changed", "child_ops": [ {"address": "src/c.py::Cls", "op": "replace", "old_content_id": "sha256:o", "new_content_id": "sha256:n", "old_summary": "Cls v1", "new_summary": "Cls v2", "position": 0}, ]}, ]) # Both file-level and symbol-level stale entries await self._seed_stale_entry(db_session, repo.repo_id, "src/c.py", commit.commit_id, "modify") await self._seed_stale_entry(db_session, repo.repo_id, "src/c.py::Cls", commit.commit_id, "modify", "sha256:n") await db_session.flush() updated = await backfill_raw_ops_from_commits(db_session, repo_id=repo.repo_id) assert updated == 2 rows = {r.address: r for r in (await db_session.execute( select(MusehubSymbolHistoryEntry).where( MusehubSymbolHistoryEntry.repo_id == repo.repo_id, ) )).scalars().all()} assert rows["src/c.py"].op == "patch" assert rows["src/c.py"].op_payload["child_summary"] == "3 symbols changed" assert "child_ops" not in rows["src/c.py"].op_payload assert rows["src/c.py::Cls"].op == "replace" assert rows["src/c.py::Cls"].op_payload["old_content_id"] == "sha256:o" @pytest.mark.asyncio async def test_already_correct_ops_not_touched( self, db_session: AsyncSession ) -> None: """delete and move are already correct raw values — must be skipped.""" from musehub.services.musehub_symbol_indexer import 
backfill_raw_ops_from_commits repo = await create_repo(db_session, slug="bro-skip") commit = await self._seed_commit_with_meta(db_session, repo.repo_id, [ {"address": "src/d.py::Gone", "op": "delete", "content_id": "sha256:gone", "content_summary": "removed Gone", "position": 0}, ]) await self._seed_stale_entry(db_session, repo.repo_id, "src/d.py::Gone", commit.commit_id, "delete", "sha256:gone") await db_session.flush() updated = await backfill_raw_ops_from_commits(db_session, repo_id=repo.repo_id) assert updated == 0 @pytest.mark.asyncio async def test_entry_missing_from_delta_left_alone( self, db_session: AsyncSession ) -> None: """If the delta has no matching address, the row is left untouched.""" from musehub.services.musehub_symbol_indexer import backfill_raw_ops_from_commits repo = await create_repo(db_session, slug="bro-missing") commit = await self._seed_commit_with_meta(db_session, repo.repo_id, [ {"address": "src/other.py::X", "op": "insert", "content_id": "sha256:x", "content_summary": "added X", "position": 0}, ]) await self._seed_stale_entry(db_session, repo.repo_id, "src/ghost.py::Y", commit.commit_id, "add") await db_session.flush() updated = await backfill_raw_ops_from_commits(db_session, repo_id=repo.repo_id) assert updated == 0 row = (await db_session.execute( select(MusehubSymbolHistoryEntry).where( MusehubSymbolHistoryEntry.repo_id == repo.repo_id, ) )).scalar_one() assert row.op == "add" @pytest.mark.asyncio async def test_repo_id_none_fixes_all_repos( self, db_session: AsyncSession ) -> None: from musehub.services.musehub_symbol_indexer import backfill_raw_ops_from_commits repo_a = await create_repo(db_session, slug="bro-all-a") repo_b = await create_repo(db_session, slug="bro-all-b") for repo in (repo_a, repo_b): commit = await self._seed_commit_with_meta(db_session, repo.repo_id, [ {"address": "f.py::Fn", "op": "insert", "content_id": "sha256:x", "content_summary": "added Fn", "position": 0}, ]) await self._seed_stale_entry(db_session, 
repo.repo_id, "f.py::Fn", commit.commit_id, "add") await db_session.flush() updated = await backfill_raw_ops_from_commits(db_session, repo_id=None) assert updated >= 2 @pytest.mark.asyncio async def test_idempotent(self, db_session: AsyncSession) -> None: from musehub.services.musehub_symbol_indexer import backfill_raw_ops_from_commits repo = await create_repo(db_session, slug="bro-idem") commit = await self._seed_commit_with_meta(db_session, repo.repo_id, [ {"address": "src/e.py::E", "op": "insert", "content_id": "sha256:e", "content_summary": "added E", "position": 0}, ]) await self._seed_stale_entry(db_session, repo.repo_id, "src/e.py::E", commit.commit_id, "add") await db_session.flush() first = await backfill_raw_ops_from_commits(db_session, repo_id=repo.repo_id) assert first == 1 second = await backfill_raw_ops_from_commits(db_session, repo_id=repo.repo_id) assert second == 0 # ───────────────────────────────────────────────────────────────────────────── # Layer 2 — Snapshot-diff backfill # ───────────────────────────────────────────────────────────────────────────── import msgpack # type: ignore[import] async def _seed_commit_with_snapshot( session: AsyncSession, repo_id: str, commit_id: str, manifest: dict[str, str], parent_ids: list[str] | None = None, branch: str = "main", timestamp: datetime | None = None, ) -> MusehubCommit: """Seed a commit + snapshot row. manifest maps path → object_id. Snapshot is content-addressed; two commits with identical manifests share one snapshot row (INSERT ... ON CONFLICT DO NOTHING). 
""" from sqlalchemy.dialects.postgresql import insert as pg_insert snap_id = blob_id(msgpack.packb(sorted(manifest.items()), use_bin_type=True)) await session.execute( pg_insert(MusehubSnapshot).values( snapshot_id=snap_id, directories=[], manifest_blob=msgpack.packb(manifest, use_bin_type=True), entry_count=len(manifest), created_at=timestamp or _now(), ).on_conflict_do_nothing(index_elements=["snapshot_id"]) ) await session.execute( pg_insert(MusehubSnapshotRef).values( repo_id=repo_id, snapshot_id=snap_id, ).on_conflict_do_nothing() ) commit = MusehubCommit( commit_id=commit_id, branch=branch, parent_ids=parent_ids or [], message="test", author="gabriel", timestamp=timestamp or _now(), snapshot_id=snap_id, ) session.add(commit) session.add(MusehubCommitRef(repo_id=repo_id, commit_id=commit_id)) await session.flush() return commit class TestBackfillHistoryFromSnapshots: """backfill_history_from_snapshots walks the commit graph, diffs adjacent snapshot manifests, and creates history entries for any address/commit pair not already covered by structured_delta indexing.""" @pytest.mark.asyncio async def test_genesis_commit_all_inserts(self, db_session: AsyncSession) -> None: """Every file in the first commit (no parent) is recorded as insert.""" from musehub.services.musehub_symbol_indexer import backfill_history_from_snapshots repo = await create_repo(db_session, slug="sdb-genesis") await _seed_commit_with_snapshot(db_session, repo.repo_id, "c1", {"a.py": "sha256:aaa", "b.py": "sha256:bbb"}) await db_session.commit() count = await backfill_history_from_snapshots(db_session, repo_id=repo.repo_id) assert count == 2 rows = (await db_session.execute( select(MusehubSymbolHistoryEntry) .where(MusehubSymbolHistoryEntry.repo_id == repo.repo_id) )).scalars().all() ops = {r.address: r.op for r in rows} assert ops == {"a.py": "insert", "b.py": "insert"} @pytest.mark.asyncio async def test_new_file_in_child_is_insert(self, db_session: AsyncSession) -> None: """A file present in 
commit N but absent from commit N-1 is an insert.""" from musehub.services.musehub_symbol_indexer import backfill_history_from_snapshots repo = await create_repo(db_session, slug="sdb-insert") t1 = datetime(2026, 1, 1, tzinfo=timezone.utc) t2 = datetime(2026, 1, 2, tzinfo=timezone.utc) await _seed_commit_with_snapshot(db_session, repo.repo_id, "c1", {"a.py": "sha256:aaa"}, timestamp=t1) await _seed_commit_with_snapshot(db_session, repo.repo_id, "c2", {"a.py": "sha256:aaa", "b.py": "sha256:bbb"}, parent_ids=["c1"], timestamp=t2) await db_session.commit() await backfill_history_from_snapshots(db_session, repo_id=repo.repo_id) rows = (await db_session.execute( select(MusehubSymbolHistoryEntry) .where(MusehubSymbolHistoryEntry.repo_id == repo.repo_id, MusehubSymbolHistoryEntry.commit_id == "c2") )).scalars().all() ops = {r.address: r.op for r in rows} assert "b.py" in ops assert ops["b.py"] == "insert" # a.py content unchanged — no entry needed for c2 assert "a.py" not in ops @pytest.mark.asyncio async def test_changed_content_is_replace(self, db_session: AsyncSession) -> None: """A file with a different object_id in the child commit is a replace.""" from musehub.services.musehub_symbol_indexer import backfill_history_from_snapshots repo = await create_repo(db_session, slug="sdb-replace") t1 = datetime(2026, 1, 1, tzinfo=timezone.utc) t2 = datetime(2026, 1, 2, tzinfo=timezone.utc) await _seed_commit_with_snapshot(db_session, repo.repo_id, "c1", {"a.py": "sha256:v1"}, timestamp=t1) await _seed_commit_with_snapshot(db_session, repo.repo_id, "c2", {"a.py": "sha256:v2"}, parent_ids=["c1"], timestamp=t2) await db_session.commit() await backfill_history_from_snapshots(db_session, repo_id=repo.repo_id) rows = (await db_session.execute( select(MusehubSymbolHistoryEntry) .where(MusehubSymbolHistoryEntry.repo_id == repo.repo_id, MusehubSymbolHistoryEntry.commit_id == "c2") )).scalars().all() assert len(rows) == 1 assert rows[0].address == "a.py" assert rows[0].op == "replace" 
assert rows[0].content_id == "sha256:v2" @pytest.mark.asyncio async def test_removed_file_is_delete(self, db_session: AsyncSession) -> None: """A file absent from the child but present in the parent is a delete.""" from musehub.services.musehub_symbol_indexer import backfill_history_from_snapshots repo = await create_repo(db_session, slug="sdb-delete") t1 = datetime(2026, 1, 1, tzinfo=timezone.utc) t2 = datetime(2026, 1, 2, tzinfo=timezone.utc) await _seed_commit_with_snapshot(db_session, repo.repo_id, "c1", {"a.py": "sha256:v1", "b.py": "sha256:vb"}, timestamp=t1) await _seed_commit_with_snapshot(db_session, repo.repo_id, "c2", {"a.py": "sha256:v1"}, parent_ids=["c1"], timestamp=t2) await db_session.commit() await backfill_history_from_snapshots(db_session, repo_id=repo.repo_id) rows = (await db_session.execute( select(MusehubSymbolHistoryEntry) .where(MusehubSymbolHistoryEntry.repo_id == repo.repo_id, MusehubSymbolHistoryEntry.commit_id == "c2") )).scalars().all() ops = {r.address: r.op for r in rows} assert ops.get("b.py") == "delete" assert "a.py" not in ops # unchanged @pytest.mark.asyncio async def test_unambiguous_rename_is_move(self, db_session: AsyncSession) -> None: """When exactly one file disappears and one appears with the same object_id, the appearance is recorded as move with from_address in op_payload.""" from musehub.services.musehub_symbol_indexer import backfill_history_from_snapshots repo = await create_repo(db_session, slug="sdb-move") t1 = datetime(2026, 1, 1, tzinfo=timezone.utc) t2 = datetime(2026, 1, 2, tzinfo=timezone.utc) await _seed_commit_with_snapshot(db_session, repo.repo_id, "c1", {"old.py": "sha256:content"}, timestamp=t1) await _seed_commit_with_snapshot(db_session, repo.repo_id, "c2", {"new.py": "sha256:content"}, parent_ids=["c1"], timestamp=t2) await db_session.commit() await backfill_history_from_snapshots(db_session, repo_id=repo.repo_id) rows = (await db_session.execute( select(MusehubSymbolHistoryEntry) 
.where(MusehubSymbolHistoryEntry.repo_id == repo.repo_id, MusehubSymbolHistoryEntry.commit_id == "c2") )).scalars().all() by_addr = {r.address: r for r in rows} assert "new.py" in by_addr assert by_addr["new.py"].op == "move" assert (by_addr["new.py"].op_payload or {}).get("from_address") == "old.py" # old.py emits a delete with to_address pointing to new location assert "old.py" in by_addr assert by_addr["old.py"].op == "delete" assert (by_addr["old.py"].op_payload or {}).get("to_address") == "new.py" @pytest.mark.asyncio async def test_ambiguous_rename_falls_back_to_insert_delete( self, db_session: AsyncSession ) -> None: """Same object_id disappears from two paths → ambiguous rename. Fall back: record inserts for new paths, deletes for old paths.""" from musehub.services.musehub_symbol_indexer import backfill_history_from_snapshots repo = await create_repo(db_session, slug="sdb-ambig") shared = "sha256:shared" t1 = datetime(2026, 1, 1, tzinfo=timezone.utc) t2 = datetime(2026, 1, 2, tzinfo=timezone.utc) await _seed_commit_with_snapshot(db_session, repo.repo_id, "c1", {"a.py": shared, "b.py": shared}, timestamp=t1) await _seed_commit_with_snapshot(db_session, repo.repo_id, "c2", {"c.py": shared}, parent_ids=["c1"], timestamp=t2) await db_session.commit() await backfill_history_from_snapshots(db_session, repo_id=repo.repo_id) rows = (await db_session.execute( select(MusehubSymbolHistoryEntry) .where(MusehubSymbolHistoryEntry.repo_id == repo.repo_id, MusehubSymbolHistoryEntry.commit_id == "c2") )).scalars().all() ops = {r.address: r.op for r in rows} # c.py cannot be a move — two candidates for origin assert ops.get("c.py") == "insert" assert ops.get("a.py") == "delete" assert ops.get("b.py") == "delete" def test_diff_manifests_move_emits_delete_with_to_address(self) -> None: """_diff_manifests includes a delete tuple with to_address for move sources.""" from musehub.services.musehub_symbol_indexer import _diff_manifests parent = {"old.py": "sha256:content"} child = 
{"new.py": "sha256:content"} ops = _diff_manifests(parent, child) by_addr = {addr: (op, extra) for addr, op, extra in ops} # move destination carries from_address assert by_addr["new.py"] == ("move", "old.py") # move source carries to_address (not None) assert "old.py" in by_addr assert by_addr["old.py"][0] == "delete" assert by_addr["old.py"][1] == "new.py" # to_address def test_diff_manifests_ambiguous_delete_has_no_to_address(self) -> None: """Ambiguous renames fall back to plain delete (no to_address).""" from musehub.services.musehub_symbol_indexer import _diff_manifests shared = "sha256:shared" parent = {"a.py": shared, "b.py": shared} child = {"c.py": shared} ops = _diff_manifests(parent, child) by_addr = {addr: (op, extra) for addr, op, extra in ops} # c.py is an insert (ambiguous — two possible sources) assert by_addr["c.py"] == ("insert", None) # plain deletes: no to_address assert by_addr["a.py"] == ("delete", None) assert by_addr["b.py"] == ("delete", None) @pytest.mark.asyncio async def test_move_delete_op_payload_has_to_address( self, db_session: AsyncSession ) -> None: """DELETE entry for a move-source path carries to_address in op_payload.""" from musehub.services.musehub_symbol_indexer import backfill_history_from_snapshots repo = await create_repo(db_session, slug="sdb-move-payload") t1 = datetime(2026, 1, 1, tzinfo=timezone.utc) t2 = datetime(2026, 1, 2, tzinfo=timezone.utc) await _seed_commit_with_snapshot(db_session, repo.repo_id, "c1", {"old.py": "sha256:content"}, timestamp=t1) await _seed_commit_with_snapshot(db_session, repo.repo_id, "c2", {"new.py": "sha256:content"}, parent_ids=["c1"], timestamp=t2) await db_session.commit() await backfill_history_from_snapshots(db_session, repo_id=repo.repo_id) rows = (await db_session.execute( select(MusehubSymbolHistoryEntry) .where( MusehubSymbolHistoryEntry.repo_id == repo.repo_id, MusehubSymbolHistoryEntry.address == "old.py", MusehubSymbolHistoryEntry.commit_id == "c2", ) )).scalars().all() assert 
len(rows) == 1 row = rows[0] assert row.op == "delete" assert (row.op_payload or {}).get("to_address") == "new.py" assert (row.op_payload or {}).get("inferred_from") == "snapshot_diff" @pytest.mark.asyncio async def test_skips_addresses_already_covered_by_structured_delta( self, db_session: AsyncSession ) -> None: """Addresses that already have a history entry for the commit are not overwritten.""" from musehub.services.musehub_symbol_indexer import backfill_history_from_snapshots repo = await create_repo(db_session, slug="sdb-skip") t1 = datetime(2026, 1, 1, tzinfo=timezone.utc) await _seed_commit_with_snapshot(db_session, repo.repo_id, "c1", {"a.py": "sha256:v1"}, timestamp=t1) # Pre-existing entry from structured_delta (e.g. 'patch' — richer semantics) session_entry = MusehubSymbolHistoryEntry( repo_id=repo.repo_id, address="a.py", commit_id="c1", op="patch", op_payload={"from_address": "old/a.py"}, content_id="sha256:v1", committed_at=t1, author="gabriel", ) db_session.add(session_entry) await db_session.commit() count = await backfill_history_from_snapshots(db_session, repo_id=repo.repo_id) assert count == 0 # nothing to do rows = (await db_session.execute( select(MusehubSymbolHistoryEntry) .where(MusehubSymbolHistoryEntry.repo_id == repo.repo_id) )).scalars().all() assert len(rows) == 1 assert rows[0].op == "patch" # original preserved @pytest.mark.asyncio async def test_unchanged_files_produce_no_entries( self, db_session: AsyncSession ) -> None: """Files with identical object_ids across parent and child produce no entry.""" from musehub.services.musehub_symbol_indexer import backfill_history_from_snapshots repo = await create_repo(db_session, slug="sdb-nochange") t1 = datetime(2026, 1, 1, tzinfo=timezone.utc) t2 = datetime(2026, 1, 2, tzinfo=timezone.utc) await _seed_commit_with_snapshot(db_session, repo.repo_id, "c1", {"a.py": "sha256:same", "b.py": "sha256:same2"}, timestamp=t1) await _seed_commit_with_snapshot(db_session, repo.repo_id, "c2", {"a.py": 
"sha256:same", "b.py": "sha256:same2"}, parent_ids=["c1"], timestamp=t2) await db_session.commit() await backfill_history_from_snapshots(db_session, repo_id=repo.repo_id) c2_rows = (await db_session.execute( select(MusehubSymbolHistoryEntry) .where(MusehubSymbolHistoryEntry.repo_id == repo.repo_id, MusehubSymbolHistoryEntry.commit_id == "c2") )).scalars().all() assert c2_rows == [] @pytest.mark.asyncio async def test_dry_run_returns_count_without_writing( self, db_session: AsyncSession ) -> None: from musehub.services.musehub_symbol_indexer import backfill_history_from_snapshots repo = await create_repo(db_session, slug="sdb-dry") await _seed_commit_with_snapshot(db_session, repo.repo_id, "c1", {"a.py": "sha256:v1", "b.py": "sha256:v2"}) await db_session.commit() count = await backfill_history_from_snapshots( db_session, repo_id=repo.repo_id, dry_run=True ) assert count == 2 existing = (await db_session.execute( select(MusehubSymbolHistoryEntry) .where(MusehubSymbolHistoryEntry.repo_id == repo.repo_id) )).scalars().all() assert existing == [] @pytest.mark.asyncio async def test_idempotent(self, db_session: AsyncSession) -> None: from musehub.services.musehub_symbol_indexer import backfill_history_from_snapshots repo = await create_repo(db_session, slug="sdb-idem") await _seed_commit_with_snapshot(db_session, repo.repo_id, "c1", {"a.py": "sha256:v1"}) await db_session.commit() first = await backfill_history_from_snapshots(db_session, repo_id=repo.repo_id) await db_session.commit() second = await backfill_history_from_snapshots(db_session, repo_id=repo.repo_id) assert first == 1 assert second == 0 @pytest.mark.asyncio async def test_repo_id_filter(self, db_session: AsyncSession) -> None: from musehub.services.musehub_symbol_indexer import backfill_history_from_snapshots repo_a = await create_repo(db_session, slug="sdb-filter-a") repo_b = await create_repo(db_session, slug="sdb-filter-b") await _seed_commit_with_snapshot(db_session, repo_a.repo_id, "ca1", {"a.py": 
"sha256:a"}) await _seed_commit_with_snapshot(db_session, repo_b.repo_id, "cb1", {"b.py": "sha256:b"}) await db_session.commit() count = await backfill_history_from_snapshots(db_session, repo_id=repo_a.repo_id) assert count == 1 a_rows = (await db_session.execute( select(MusehubSymbolHistoryEntry) .where(MusehubSymbolHistoryEntry.repo_id == repo_a.repo_id) )).scalars().all() b_rows = (await db_session.execute( select(MusehubSymbolHistoryEntry) .where(MusehubSymbolHistoryEntry.repo_id == repo_b.repo_id) )).scalars().all() assert len(a_rows) == 1 assert len(b_rows) == 0 @pytest.mark.asyncio async def test_inferred_op_payload_marks_source( self, db_session: AsyncSession ) -> None: """Entries created by snapshot-diff carry inferred_from='snapshot_diff' in op_payload so callers can distinguish them from structured_delta entries.""" from musehub.services.musehub_symbol_indexer import backfill_history_from_snapshots repo = await create_repo(db_session, slug="sdb-mark") await _seed_commit_with_snapshot(db_session, repo.repo_id, "c1", {"a.py": "sha256:v1"}) await db_session.commit() await backfill_history_from_snapshots(db_session, repo_id=repo.repo_id) rows = (await db_session.execute( select(MusehubSymbolHistoryEntry) .where(MusehubSymbolHistoryEntry.repo_id == repo.repo_id) )).scalars().all() assert len(rows) == 1 assert (rows[0].op_payload or {}).get("inferred_from") == "snapshot_diff" # ───────────────────────────────────────────────────────────────────────────── # Layer 3 — Lineage walk: load_symbol_history follows from_address chains # ───────────────────────────────────────────────────────────────────────────── async def _seed_history_entry( session: AsyncSession, repo_id: str, address: str, commit_id: str, op: str, op_payload: JSONObject | None = None, content_id: str | None = None, committed_at: datetime | None = None, ) -> MusehubSymbolHistoryEntry: """Write a single history row directly (bypasses the indexer).""" row = MusehubSymbolHistoryEntry( repo_id=repo_id, 
address=address, commit_id=commit_id, op=op, op_payload=op_payload or {}, content_id=content_id, committed_at=committed_at or _now(), author="gabriel", ) session.add(row) await session.flush() return row class TestLoadSymbolHistoryLineage: """load_symbol_history follows from_address chains in op_payload to build full symbol lineage across renames and moves.""" @pytest.mark.asyncio async def test_no_from_address_unchanged(self, db_session: AsyncSession) -> None: """A symbol with no move history is returned as-is.""" from musehub.services.musehub_symbol_indexer import load_symbol_history repo = await create_repo(db_session, slug="lin-noop") await _seed_history_entry(db_session, repo.repo_id, "src/a.py::Foo", "c1", "insert", content_id="sha256:v1") await _seed_history_entry(db_session, repo.repo_id, "src/a.py::Foo", "c2", "replace", content_id="sha256:v2") await db_session.commit() history = await load_symbol_history(db_session, repo.repo_id) assert "src/a.py::Foo" in history assert len(history["src/a.py::Foo"]) == 2 assert history["src/a.py::Foo"][0]["op"] == "insert" @pytest.mark.asyncio async def test_single_rename_prepends_origin_history( self, db_session: AsyncSession ) -> None: """History for new.py::Foo should include the insert at old.py::Foo.""" from musehub.services.musehub_symbol_indexer import load_symbol_history repo = await create_repo(db_session, slug="lin-single") # old.py::Foo was inserted, then modified await _seed_history_entry(db_session, repo.repo_id, "old.py::Foo", "c1", "insert", content_id="sha256:v1") await _seed_history_entry(db_session, repo.repo_id, "old.py::Foo", "c2", "replace", content_id="sha256:v2") # new.py::Foo was born via a move from old.py::Foo await _seed_history_entry( db_session, repo.repo_id, "new.py::Foo", "c3", "move", op_payload={"from_address": "old.py::Foo"}, content_id="sha256:v2", ) await db_session.commit() history = await load_symbol_history(db_session, repo.repo_id) # The new address should have the full chain: 
insert → replace → move assert "new.py::Foo" in history ops = [e["op"] for e in history["new.py::Foo"]] assert ops[0] == "insert", f"Expected insert first, got: {ops}" assert ops[-1] == "move", f"Expected move last, got: {ops}" assert len(ops) == 3 @pytest.mark.asyncio async def test_origin_address_excluded_from_top_level_keys( self, db_session: AsyncSession ) -> None: """After a rename, the old address should not appear as a top-level key.""" from musehub.services.musehub_symbol_indexer import load_symbol_history repo = await create_repo(db_session, slug="lin-noold") await _seed_history_entry(db_session, repo.repo_id, "old.py::Foo", "c1", "insert", content_id="sha256:v1") await _seed_history_entry( db_session, repo.repo_id, "new.py::Foo", "c2", "move", op_payload={"from_address": "old.py::Foo"}, content_id="sha256:v1", ) await db_session.commit() history = await load_symbol_history(db_session, repo.repo_id) assert "old.py::Foo" not in history, ( "Origin address should be folded into new.py::Foo's lineage, " "not kept as a separate top-level key" ) @pytest.mark.asyncio async def test_multi_hop_rename_walks_full_chain( self, db_session: AsyncSession ) -> None: """A→B→C chain: history for C includes all entries from A, B, and C.""" from musehub.services.musehub_symbol_indexer import load_symbol_history repo = await create_repo(db_session, slug="lin-multi") await _seed_history_entry(db_session, repo.repo_id, "a.py::Fn", "c1", "insert", content_id="sha256:v1") await _seed_history_entry( db_session, repo.repo_id, "b.py::Fn", "c2", "move", op_payload={"from_address": "a.py::Fn"}, content_id="sha256:v1", ) await _seed_history_entry( db_session, repo.repo_id, "c.py::Fn", "c3", "move", op_payload={"from_address": "b.py::Fn"}, content_id="sha256:v1", ) await db_session.commit() history = await load_symbol_history(db_session, repo.repo_id) assert "c.py::Fn" in history assert "b.py::Fn" not in history assert "a.py::Fn" not in history ops = [e["op"] for e in 
history["c.py::Fn"]] assert ops[0] == "insert" assert ops[-1] == "move" assert len(ops) == 3 @pytest.mark.asyncio async def test_lineage_walk_is_bounded_on_missing_origin( self, db_session: AsyncSession ) -> None: """If from_address has no rows, lineage walk stops gracefully.""" from musehub.services.musehub_symbol_indexer import load_symbol_history repo = await create_repo(db_session, slug="lin-bound") # new.py::Foo claims to have come from ghost.py::Foo, which has no rows await _seed_history_entry( db_session, repo.repo_id, "new.py::Foo", "c1", "move", op_payload={"from_address": "ghost.py::Foo"}, content_id="sha256:v1", ) await db_session.commit() # Must not raise, must not loop history = await load_symbol_history(db_session, repo.repo_id) assert "new.py::Foo" in history assert len(history["new.py::Foo"]) == 1 @pytest.mark.asyncio async def test_file_path_filter_includes_lineage( self, db_session: AsyncSession ) -> None: """file_path filter on new.py returns the full lineage including old.py origin.""" from musehub.services.musehub_symbol_indexer import load_symbol_history repo = await create_repo(db_session, slug="lin-filter") await _seed_history_entry(db_session, repo.repo_id, "old.py::Foo", "c1", "insert", content_id="sha256:v1") await _seed_history_entry( db_session, repo.repo_id, "new.py::Foo", "c2", "move", op_payload={"from_address": "old.py::Foo"}, content_id="sha256:v1", ) # unrelated symbol in another file await _seed_history_entry(db_session, repo.repo_id, "other.py::Bar", "c3", "insert", content_id="sha256:vx") await db_session.commit() history = await load_symbol_history(db_session, repo.repo_id, file_path="new.py") assert "new.py::Foo" in history assert "other.py::Bar" not in history ops = [e["op"] for e in history["new.py::Foo"]] assert ops[0] == "insert" @pytest.mark.asyncio async def test_lineage_entries_carry_original_address( self, db_session: AsyncSession ) -> None: """Each entry in the merged lineage carries its original address so the UI 
can show where the symbol lived at that point in time.""" from musehub.services.musehub_symbol_indexer import load_symbol_history repo = await create_repo(db_session, slug="lin-addr") await _seed_history_entry(db_session, repo.repo_id, "old.py::Foo", "c1", "insert", content_id="sha256:v1") await _seed_history_entry( db_session, repo.repo_id, "new.py::Foo", "c2", "move", op_payload={"from_address": "old.py::Foo"}, content_id="sha256:v1", ) await db_session.commit() history = await load_symbol_history(db_session, repo.repo_id) entries = history["new.py::Foo"] insert_entries = [e for e in entries if e["op"] == "insert"] assert insert_entries, "Expected at least one insert entry in lineage" assert insert_entries[0].get("address") == "old.py::Foo", ( "Lineage entries must carry their original address for UI rendering" )