"""TDD spec for normalized symbol intel schema — SI1–SI40.

Current architecture stores per-symbol data as unbounded JSON blobs in
musehub_intel_results.data_json.  Every symbol page load deserializes
megabytes of JSON to return one entry.

This test file defines the correct normalized architecture.  All tests are
RED until the implementation is complete.

New tables (replaces code.symbol_history / code.per_symbol_intel /
code.hash_occurrence blobs):

  musehub_symbol_history_entries   — one row per (repo_id, address, commit_id)
  musehub_symbol_intel             — one row per (repo_id, address)
  musehub_hash_occurrence_entries  — one row per (content_id, repo_id, address)

musehub_intel_results keeps only:

  code.intel_summary   — small scalar aggregate, fine as blob
  code.intel_snapshot  — computed panel data, fine as blob

Layers:
  1. Schema      — ORM model shape: table names, columns, primary keys
  2. Write       — build_symbol_index upserts normalized rows
  3. Read        — helpers return correct data from normalized tables
  4. Incremental — second push merges without duplication
  5. Integrity   — unknown refs, commits without deltas, empty repos
  6. Performance — point and file-scoped lookups within a 50 ms budget;
                   no full-table deserialize
  7. Stress      — 500 symbols across 10 commits; one symbol changed in
                   50 consecutive commits
  8. Aggregates  — intel_summary and intel_snapshot still produced as blobs
"""
from __future__ import annotations

import json
import secrets
import time
from datetime import datetime, timezone

import pytest
from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession

from muse.core.types import long_id
from tests.factories import create_repo
from musehub.types.json_types import JSONObject

# NOTE(review): the models and service functions under test are imported
# inside each test rather than at module level — presumably so this module
# still collects while the implementation is missing and each test fails
# (RED) individually.  Confirm before hoisting those imports.


# ─────────────────────────────────────────────────────────────────────────────
# Helpers
# ─────────────────────────────────────────────────────────────────────────────

def _now() -> datetime:
    """Return the current time as a timezone-aware UTC datetime."""
    return datetime.now(tz=timezone.utc)


def _uid() -> str:
    """Return a fresh random commit id (64 random hex chars passed through ``long_id``)."""
    return long_id(secrets.token_hex(32))


def _cid() -> str:
    """Return a fresh random content id.

    Generated exactly like :func:`_uid`; the separate name only signals at the
    call site that the value is used as a ``content_id``.
    """
    return long_id(secrets.token_hex(32))


def _insert_op(address: str, content_id: str | None = None) -> JSONObject:
    """Build an ``insert`` delta op for *address*.

    A random content id is generated when *content_id* is omitted (or falsy).
    """
    return {
        "address": address,
        "op": "insert",
        "content_id": content_id or _cid(),
    }


def _patch_op(file_addr: str, children: list[JSONObject]) -> JSONObject:
    """Build a ``patch`` delta op for *file_addr* carrying *children* as ``child_ops``."""
    return {"address": file_addr, "op": "patch", "child_ops": children}


async def _commit_with_delta(
    session: AsyncSession,
    repo_id: str,
    commit_id: str,
    ops: list[JSONObject],
    parent_ids: list[str] | None = None,
    author: str = "gabriel",
) -> None:
    """Add a commit on ``main`` whose ``structured_delta`` is ``{"ops": ops}``.

    Also adds the ``MusehubCommitRef`` linking the commit to *repo_id*, then
    flushes the session.  ``parent_ids`` defaults to an empty list.
    """
    from musehub.db.musehub_repo_models import MusehubCommit, MusehubCommitRef

    c = MusehubCommit(
        commit_id=commit_id,
        branch="main",
        parent_ids=parent_ids or [],
        message="test commit",
        author=author,
        timestamp=_now(),
        structured_delta={"ops": ops},
    )
    session.add(c)
    session.add(MusehubCommitRef(repo_id=repo_id, commit_id=commit_id))
    await session.flush()


async def _commit_chain(
    session: AsyncSession,
    repo_id: str,
    ops_per_commit: list[list[JSONObject]],
) -> str:
    """Create a linear chain of commits, one per entry of *ops_per_commit*.

    The first commit has no parents; every later commit has the previously
    created commit as its only parent.  Returns the id of the last commit
    created, or ``""`` when *ops_per_commit* is empty.
    """
    parent_ids: list[str] = []
    head_id = ""
    for ops in ops_per_commit:
        head_id = _uid()
        await _commit_with_delta(session, repo_id, head_id, ops, parent_ids=parent_ids)
        parent_ids = [head_id]
    return head_id


async def _build_and_persist(
    session: AsyncSession,
    repo_id: str,
    commit_id: str,
) -> list[tuple[str, dict]]:
    """Run ``build_symbol_index`` for *commit_id* and persist its results.

    ``persist_intel_results`` is only called when the build returned a
    non-empty result; the build result is returned either way.
    """
    from musehub.services.musehub_symbol_indexer import build_symbol_index
    from musehub.services.musehub_intel_providers import persist_intel_results

    results = await build_symbol_index(session, repo_id, commit_id)
    if results:
        await persist_intel_results(session, repo_id, commit_id, results)
    return results


# ─────────────────────────────────────────────────────────────────────────────
# Layer 1 — Schema: ORM model shape
# ─────────────────────────────────────────────────────────────────────────────

class TestNormalizedSchemaModels:
    """SI1–SI8: ORM models for the three new normalized tables exist and have
    the expected table names, columns, and primary keys."""

    def test_SI1_symbol_history_entry_model_importable(self) -> None:
        """The history-entry model is importable and maps to the expected table."""
        from musehub.db.musehub_intel_models import MusehubSymbolHistoryEntry

        assert MusehubSymbolHistoryEntry.__tablename__ == "musehub_symbol_history_entries"

    def test_SI2_symbol_history_entry_columns(self) -> None:
        """The history-entry table has at least the required columns."""
        from musehub.db.musehub_intel_models import MusehubSymbolHistoryEntry

        cols = {c.name for c in MusehubSymbolHistoryEntry.__table__.columns}
        assert {"repo_id", "address", "commit_id", "committed_at",
                "author", "op", "content_id"} <= cols

    def test_SI3_symbol_history_entry_pk(self) -> None:
        """The history-entry primary key is exactly (repo_id, address, commit_id)."""
        from musehub.db.musehub_intel_models import MusehubSymbolHistoryEntry

        pk_cols = {c.name for c in MusehubSymbolHistoryEntry.__table__.primary_key}
        assert pk_cols == {"repo_id", "address", "commit_id"}

    def test_SI4_symbol_intel_model_importable(self) -> None:
        """The symbol-intel model is importable and maps to the expected table."""
        from musehub.db.musehub_intel_models import MusehubSymbolIntel

        assert MusehubSymbolIntel.__tablename__ == "musehub_symbol_intel"

    def test_SI5_symbol_intel_columns(self) -> None:
        """The symbol-intel table has at least the required metric columns."""
        from musehub.db.musehub_intel_models import MusehubSymbolIntel

        cols = {c.name for c in MusehubSymbolIntel.__table__.columns}
        assert {"repo_id", "address", "churn", "churn_30d", "churn_90d",
                "blast", "blast_direct", "blast_cross", "blast_top",
                "last_changed", "last_author", "author_count",
                "gravity", "weekly"} <= cols

    def test_SI6_symbol_intel_pk(self) -> None:
        """The symbol-intel primary key is exactly (repo_id, address)."""
        from musehub.db.musehub_intel_models import MusehubSymbolIntel

        pk_cols = {c.name for c in MusehubSymbolIntel.__table__.primary_key}
        assert pk_cols == {"repo_id", "address"}

    def test_SI7_hash_occurrence_entry_model_importable(self) -> None:
        """The hash-occurrence model is importable and maps to the expected table."""
        from musehub.db.musehub_intel_models import MusehubHashOccurrenceEntry

        assert MusehubHashOccurrenceEntry.__tablename__ == "musehub_hash_occurrence_entries"

    def test_SI8_hash_occurrence_entry_pk(self) -> None:
        """The hash-occurrence primary key is exactly (content_id, repo_id, address)."""
        from musehub.db.musehub_intel_models import MusehubHashOccurrenceEntry

        pk_cols = {c.name for c in MusehubHashOccurrenceEntry.__table__.primary_key}
        assert pk_cols == {"content_id", "repo_id", "address"}


# ─────────────────────────────────────────────────────────────────────────────
# Layer 2 — Write: build_symbol_index upserts normalized rows
# ─────────────────────────────────────────────────────────────────────────────

class TestBuildWritesNormalizedRows:
    """SI9–SI16: build_symbol_index + persist_intel_results write to the
    normalized tables, not just to intel_results blobs."""

    @pytest.mark.asyncio
    async def test_SI9_single_commit_writes_history_entry_rows(
        self, db_session: AsyncSession,
    ) -> None:
        """Every symbol inserted by a commit gets a history-entry row."""
        from musehub.db.musehub_intel_models import MusehubSymbolHistoryEntry

        repo = await create_repo(db_session)
        commit_id = _uid()
        await _commit_with_delta(db_session, repo.repo_id, commit_id, [
            _insert_op("src/main.py::parse"),
            _insert_op("src/main.py::render"),
        ])
        await _build_and_persist(db_session, repo.repo_id, commit_id)

        rows = (await db_session.execute(
            select(MusehubSymbolHistoryEntry).where(
                MusehubSymbolHistoryEntry.repo_id == repo.repo_id
            )
        )).scalars().all()
        addresses = {r.address for r in rows}
        assert "src/main.py::parse" in addresses
        assert "src/main.py::render" in addresses

    @pytest.mark.asyncio
    async def test_SI10_history_entry_commit_id_stored(
        self, db_session: AsyncSession,
    ) -> None:
        """The history row records the originating commit id and an add-like op."""
        from musehub.db.musehub_intel_models import MusehubSymbolHistoryEntry

        repo = await create_repo(db_session)
        commit_id = _uid()
        await _commit_with_delta(db_session, repo.repo_id, commit_id, [
            _insert_op("src/auth.py::login"),
        ])
        await _build_and_persist(db_session, repo.repo_id, commit_id)

        # scalar_one() also asserts that exactly one row exists for the address.
        row = (await db_session.execute(
            select(MusehubSymbolHistoryEntry).where(
                MusehubSymbolHistoryEntry.repo_id == repo.repo_id,
                MusehubSymbolHistoryEntry.address == "src/auth.py::login",
            )
        )).scalar_one()
        assert row.commit_id == commit_id
        assert row.op in ("add", "insert", "modify")

    @pytest.mark.asyncio
    async def test_SI11_single_commit_writes_symbol_intel_rows(
        self, db_session: AsyncSession,
    ) -> None:
        """An inserted symbol gets a symbol-intel row with churn of at least 1."""
        from musehub.db.musehub_intel_models import MusehubSymbolIntel

        repo = await create_repo(db_session)
        commit_id = _uid()
        await _commit_with_delta(db_session, repo.repo_id, commit_id, [
            _insert_op("src/core.py::Engine"),
        ])
        await _build_and_persist(db_session, repo.repo_id, commit_id)

        row = (await db_session.execute(
            select(MusehubSymbolIntel).where(
                MusehubSymbolIntel.repo_id == repo.repo_id,
                MusehubSymbolIntel.address == "src/core.py::Engine",
            )
        )).scalar_one_or_none()
        assert row is not None
        assert row.churn >= 1

    @pytest.mark.asyncio
    async def test_SI12_hash_occurrence_rows_written(
        self, db_session: AsyncSession,
    ) -> None:
        """Two symbols sharing one content id both appear as occurrence rows."""
        from musehub.db.musehub_intel_models import MusehubHashOccurrenceEntry

        repo = await create_repo(db_session)
        commit_id = _uid()
        shared_content_id = _cid()
        await _commit_with_delta(db_session, repo.repo_id, commit_id, [
            _insert_op("src/a.py::foo", shared_content_id),
            _insert_op("src/b.py::bar", shared_content_id),
        ])
        await _build_and_persist(db_session, repo.repo_id, commit_id)

        rows = (await db_session.execute(
            select(MusehubHashOccurrenceEntry).where(
                MusehubHashOccurrenceEntry.repo_id == repo.repo_id,
                MusehubHashOccurrenceEntry.content_id == shared_content_id,
            )
        )).scalars().all()
        addrs = {r.address for r in rows}
        assert "src/a.py::foo" in addrs
        assert "src/b.py::bar" in addrs

    @pytest.mark.asyncio
    async def test_SI13_intel_summary_still_written_to_intel_results(
        self, db_session: AsyncSession,
    ) -> None:
        """``code.intel_summary`` is still stored as a blob containing ``health_score``."""
        from musehub.db.musehub_intel_models import MusehubIntelResult

        repo = await create_repo(db_session)
        commit_id = _uid()
        await _commit_with_delta(db_session, repo.repo_id, commit_id, [
            _insert_op("src/main.py::run"),
        ])
        await _build_and_persist(db_session, repo.repo_id, commit_id)

        row = (await db_session.execute(
            select(MusehubIntelResult).where(
                MusehubIntelResult.repo_id == repo.repo_id,
                MusehubIntelResult.intel_type == "code.intel_summary",
            )
        )).scalar_one_or_none()
        assert row is not None
        data = json.loads(row.data_json)
        assert "health_score" in data

    @pytest.mark.asyncio
    async def test_SI14_intel_snapshot_still_written_to_intel_results(
        self, db_session: AsyncSession,
    ) -> None:
        """``code.intel_snapshot`` is still stored as an intel_results row."""
        from musehub.db.musehub_intel_models import MusehubIntelResult

        repo = await create_repo(db_session)
        commit_id = _uid()
        await _commit_with_delta(db_session, repo.repo_id, commit_id, [
            _insert_op("src/main.py::run"),
        ])
        await _build_and_persist(db_session, repo.repo_id, commit_id)

        row = (await db_session.execute(
            select(MusehubIntelResult).where(
                MusehubIntelResult.repo_id == repo.repo_id,
                MusehubIntelResult.intel_type == "code.intel_snapshot",
            )
        )).scalar_one_or_none()
        assert row is not None

    @pytest.mark.asyncio
    async def test_SI15_blob_types_not_written_to_intel_results(
        self, db_session: AsyncSession,
    ) -> None:
        """code.symbol_history, code.per_symbol_intel, code.hash_occurrence
        must NOT be written as blobs anymore."""
        from musehub.db.musehub_intel_models import MusehubIntelResult

        repo = await create_repo(db_session)
        commit_id = _uid()
        await _commit_with_delta(db_session, repo.repo_id, commit_id, [
            _insert_op("src/main.py::run"),
        ])
        await _build_and_persist(db_session, repo.repo_id, commit_id)

        blob_types = (await db_session.execute(
            select(MusehubIntelResult.intel_type).where(
                MusehubIntelResult.repo_id == repo.repo_id,
                MusehubIntelResult.intel_type.in_([
                    "code.symbol_history",
                    "code.per_symbol_intel",
                    "code.hash_occurrence",
                ])
            )
        )).scalars().all()
        assert blob_types == [], f"blob types still written: {blob_types}"

    @pytest.mark.asyncio
    async def test_SI16_author_stored_in_history_entry(
        self, db_session: AsyncSession,
    ) -> None:
        """The commit author is copied onto the history-entry row."""
        from musehub.db.musehub_intel_models import MusehubSymbolHistoryEntry

        repo = await create_repo(db_session)
        commit_id = _uid()
        await _commit_with_delta(
            db_session, repo.repo_id, commit_id,
            [_insert_op("src/auth.py::validate")],
            author="gabriel",
        )
        await _build_and_persist(db_session, repo.repo_id, commit_id)

        row = (await db_session.execute(
            select(MusehubSymbolHistoryEntry).where(
                MusehubSymbolHistoryEntry.repo_id == repo.repo_id,
                MusehubSymbolHistoryEntry.address == "src/auth.py::validate",
            )
        )).scalar_one()
        assert row.author == "gabriel"


# ─────────────────────────────────────────────────────────────────────────────
# Layer 3 — Read: helpers return correct data from normalized tables
# ─────────────────────────────────────────────────────────────────────────────

class TestReadHelpers:
    """SI17–SI24: read helpers query normalized tables, not blobs."""

    @pytest.mark.asyncio
    async def test_SI17_load_symbol_history_returns_entries_for_address(
        self, db_session: AsyncSession,
    ) -> None:
        """``load_symbol_history`` is keyed by every indexed symbol address."""
        from musehub.services.musehub_symbol_indexer import load_symbol_history

        repo = await create_repo(db_session)
        commit_id = _uid()
        await _commit_with_delta(db_session, repo.repo_id, commit_id, [
            _insert_op("src/auth.py::login"),
            _insert_op("src/core.py::Engine"),
        ])
        await _build_and_persist(db_session, repo.repo_id, commit_id)

        history = await load_symbol_history(db_session, repo.repo_id)
        assert "src/auth.py::login" in history
        assert "src/core.py::Engine" in history

    @pytest.mark.asyncio
    async def test_SI18_load_symbol_history_file_path_filter(
        self, db_session: AsyncSession,
    ) -> None:
        """``file_path`` restricts the result to symbols of that file."""
        from musehub.services.musehub_symbol_indexer import load_symbol_history

        repo = await create_repo(db_session)
        commit_id = _uid()
        await _commit_with_delta(db_session, repo.repo_id, commit_id, [
            _insert_op("src/auth.py::login"),
            _insert_op("src/auth.py::logout"),
            _insert_op("src/core.py::Engine"),
        ])
        await _build_and_persist(db_session, repo.repo_id, commit_id)

        history = await load_symbol_history(db_session, repo.repo_id, file_path="src/auth.py")
        assert "src/auth.py::login" in history
        assert "src/auth.py::logout" in history
        assert "src/core.py::Engine" not in history

    @pytest.mark.asyncio
    async def test_SI19_load_symbol_history_empty_when_no_index(
        self, db_session: AsyncSession,
    ) -> None:
        """A repo that was never indexed yields an empty history mapping."""
        from musehub.services.musehub_symbol_indexer import load_symbol_history

        repo = await create_repo(db_session)
        history = await load_symbol_history(db_session, repo.repo_id)
        assert history == {}

    @pytest.mark.asyncio
    async def test_SI20_lookup_symbol_intel_returns_metrics(
        self, db_session: AsyncSession,
    ) -> None:
        """``lookup_symbol_intel`` returns churn, gravity and blast for a known address."""
        from musehub.services.musehub_symbol_indexer import lookup_symbol_intel

        repo = await create_repo(db_session)
        commit_id = _uid()
        await _commit_with_delta(db_session, repo.repo_id, commit_id, [
            _insert_op("src/billing.py::compute_total"),
        ])
        await _build_and_persist(db_session, repo.repo_id, commit_id)

        result = await lookup_symbol_intel(
            db_session, repo.repo_id, ["src/billing.py::compute_total"]
        )
        assert "src/billing.py::compute_total" in result
        intel = result["src/billing.py::compute_total"]
        assert "churn" in intel
        assert "gravity" in intel
        assert "blast" in intel

    @pytest.mark.asyncio
    async def test_SI21_lookup_symbol_intel_missing_address_excluded(
        self, db_session: AsyncSession,
    ) -> None:
        """Unknown addresses are omitted from the result instead of raising."""
        from musehub.services.musehub_symbol_indexer import lookup_symbol_intel

        repo = await create_repo(db_session)
        result = await lookup_symbol_intel(db_session, repo.repo_id, ["nonexistent::fn"])
        assert result == {}

    @pytest.mark.asyncio
    async def test_SI22_load_hash_occurrence_returns_clone_pairs(
        self, db_session: AsyncSession,
    ) -> None:
        """``load_hash_occurrence`` maps a content id to all addresses sharing it."""
        from musehub.services.musehub_symbol_indexer import load_hash_occurrence

        repo = await create_repo(db_session)
        commit_id = _uid()
        content_id = _cid()
        await _commit_with_delta(db_session, repo.repo_id, commit_id, [
            _insert_op("src/a.py::foo", content_id),
            _insert_op("src/b.py::bar", content_id),
        ])
        await _build_and_persist(db_session, repo.repo_id, commit_id)

        occurrence = await load_hash_occurrence(db_session, repo.repo_id)
        assert content_id in occurrence
        assert set(occurrence[content_id]) == {"src/a.py::foo", "src/b.py::bar"}

    @pytest.mark.asyncio
    async def test_SI23_load_intel_snapshot_still_works(
        self, db_session: AsyncSession,
    ) -> None:
        """``load_intel_snapshot`` returns a non-None value after indexing."""
        from musehub.services.musehub_symbol_indexer import load_intel_snapshot

        repo = await create_repo(db_session)
        commit_id = _uid()
        await _commit_with_delta(db_session, repo.repo_id, commit_id, [
            _insert_op("src/main.py::run"),
        ])
        await _build_and_persist(db_session, repo.repo_id, commit_id)

        snap = await load_intel_snapshot(db_session, repo.repo_id)
        assert snap is not None

    @pytest.mark.asyncio
    async def test_SI24_get_index_meta_returns_correct_ref(
        self, db_session: AsyncSession,
    ) -> None:
        """``get_index_meta`` reports the indexed commit id under ``"ref"``."""
        from musehub.services.musehub_symbol_indexer import get_index_meta

        repo = await create_repo(db_session)
        commit_id = _uid()
        await _commit_with_delta(db_session, repo.repo_id, commit_id, [
            _insert_op("src/main.py::run"),
        ])
        await _build_and_persist(db_session, repo.repo_id, commit_id)

        meta = await get_index_meta(db_session, repo.repo_id)
        assert meta is not None
        assert meta["ref"] == commit_id


# ─────────────────────────────────────────────────────────────────────────────
# Layer 4 — Incremental: second push merges without duplication
# ─────────────────────────────────────────────────────────────────────────────

class TestIncrementalUpdates:
    """SI25–SI29: second push adds new rows, does not duplicate existing ones."""

    @pytest.mark.asyncio
    async def test_SI25_second_push_adds_new_history_entries(
        self, db_session: AsyncSession,
    ) -> None:
        """Symbols from both the first and the second push have history rows."""
        from musehub.db.musehub_intel_models import MusehubSymbolHistoryEntry

        repo = await create_repo(db_session)
        c1 = _uid()
        await _commit_with_delta(db_session, repo.repo_id, c1, [
            _insert_op("src/auth.py::login"),
        ])
        await _build_and_persist(db_session, repo.repo_id, c1)

        c2 = _uid()
        await _commit_with_delta(db_session, repo.repo_id, c2, [
            _insert_op("src/auth.py::logout"),
        ], parent_ids=[c1])
        await _build_and_persist(db_session, repo.repo_id, c2)

        rows = (await db_session.execute(
            select(MusehubSymbolHistoryEntry).where(
                MusehubSymbolHistoryEntry.repo_id == repo.repo_id
            )
        )).scalars().all()
        addresses = {r.address for r in rows}
        assert "src/auth.py::login" in addresses
        assert "src/auth.py::logout" in addresses

    @pytest.mark.asyncio
    async def test_SI26_second_push_does_not_duplicate_existing_entries(
        self, db_session: AsyncSession,
    ) -> None:
        """Re-indexing the same head leaves exactly one history row per entry."""
        from musehub.db.musehub_intel_models import MusehubSymbolHistoryEntry

        repo = await create_repo(db_session)
        c1 = _uid()
        await _commit_with_delta(db_session, repo.repo_id, c1, [
            _insert_op("src/core.py::Engine"),
        ])
        await _build_and_persist(db_session, repo.repo_id, c1)
        # Re-build with same head — no new rows
        await _build_and_persist(db_session, repo.repo_id, c1)

        count = (await db_session.execute(
            select(func.count()).select_from(MusehubSymbolHistoryEntry).where(
                MusehubSymbolHistoryEntry.repo_id == repo.repo_id,
                MusehubSymbolHistoryEntry.address == "src/core.py::Engine",
            )
        )).scalar_one()
        assert count == 1

    @pytest.mark.asyncio
    async def test_SI27_modify_op_updates_symbol_intel_churn(
        self, db_session: AsyncSession,
    ) -> None:
        """An insert followed by a ``replace`` of the same symbol yields churn == 2."""
        from musehub.services.musehub_symbol_indexer import lookup_symbol_intel

        repo = await create_repo(db_session)
        c1 = _uid()
        await _commit_with_delta(db_session, repo.repo_id, c1, [
            _insert_op("src/core.py::Engine"),
        ])
        await _build_and_persist(db_session, repo.repo_id, c1)

        c2 = _uid()
        await _commit_with_delta(db_session, repo.repo_id, c2, [
            {"address": "src/core.py::Engine", "op": "replace", "content_id": _cid()},
        ], parent_ids=[c1])
        await _build_and_persist(db_session, repo.repo_id, c2)

        intel = await lookup_symbol_intel(db_session, repo.repo_id, ["src/core.py::Engine"])
        assert intel["src/core.py::Engine"]["churn"] == 2

    @pytest.mark.asyncio
    async def test_SI28_second_push_history_has_both_commit_ids(
        self, db_session: AsyncSession,
    ) -> None:
        """A symbol touched by two commits has a history row for each commit id."""
        from musehub.db.musehub_intel_models import MusehubSymbolHistoryEntry

        repo = await create_repo(db_session)
        c1, c2 = _uid(), _uid()
        await _commit_with_delta(db_session, repo.repo_id, c1, [
            _insert_op("src/auth.py::login"),
        ])
        await _build_and_persist(db_session, repo.repo_id, c1)

        await _commit_with_delta(db_session, repo.repo_id, c2, [
            {"address": "src/auth.py::login", "op": "replace", "content_id": _cid()},
        ], parent_ids=[c1])
        await _build_and_persist(db_session, repo.repo_id, c2)

        rows = (await db_session.execute(
            select(MusehubSymbolHistoryEntry).where(
                MusehubSymbolHistoryEntry.repo_id == repo.repo_id,
                MusehubSymbolHistoryEntry.address == "src/auth.py::login",
            )
        )).scalars().all()
        commit_ids = {r.commit_id for r in rows}
        assert c1 in commit_ids
        assert c2 in commit_ids

    @pytest.mark.asyncio
    async def test_SI29_intel_summary_ref_advances_after_second_push(
        self, db_session: AsyncSession,
    ) -> None:
        """After a second push, index metadata points at the newer commit."""
        from musehub.services.musehub_symbol_indexer import get_index_meta

        repo = await create_repo(db_session)
        c1, c2 = _uid(), _uid()
        await _commit_with_delta(db_session, repo.repo_id, c1, [_insert_op("src/a.py::f")])
        await _build_and_persist(db_session, repo.repo_id, c1)

        await _commit_with_delta(
            db_session, repo.repo_id, c2, [_insert_op("src/b.py::g")], parent_ids=[c1]
        )
        await _build_and_persist(db_session, repo.repo_id, c2)

        meta = await get_index_meta(db_session, repo.repo_id)
        assert meta is not None
        assert meta["ref"] == c2


# ─────────────────────────────────────────────────────────────────────────────
# Layer 5 — Integrity: unknown refs, commits without deltas, empty repos
# ─────────────────────────────────────────────────────────────────────────────

class TestDataIntegrity:
    """SI30–SI33: edge cases that must not raise or corrupt state."""

    @pytest.mark.asyncio
    async def test_SI30_empty_repo_returns_empty_history(
        self, db_session: AsyncSession,
    ) -> None:
        """A repo with no commits yields an empty history mapping."""
        from musehub.services.musehub_symbol_indexer import load_symbol_history

        repo = await create_repo(db_session)
        assert await load_symbol_history(db_session, repo.repo_id) == {}

    @pytest.mark.asyncio
    async def test_SI31_unknown_head_commit_returns_empty_results(
        self, db_session: AsyncSession,
    ) -> None:
        """Indexing a commit id that does not exist returns an empty list."""
        from musehub.services.musehub_symbol_indexer import build_symbol_index

        repo = await create_repo(db_session)
        results = await build_symbol_index(db_session, repo.repo_id, _uid())
        assert results == []

    @pytest.mark.asyncio
    async def test_SI32_commit_with_no_structured_delta_skipped(
        self, db_session: AsyncSession,
    ) -> None:
        """A commit created without ``structured_delta`` produces no history rows."""
        from musehub.db.musehub_intel_models import MusehubSymbolHistoryEntry
        from musehub.db.musehub_repo_models import MusehubCommit, MusehubCommitRef

        repo = await create_repo(db_session)
        commit_id = _uid()
        # Built by hand (not via _commit_with_delta) so that structured_delta
        # is left unset.
        c = MusehubCommit(
            commit_id=commit_id,
            branch="main",
            parent_ids=[],
            message="no delta",
            author="gabriel",
            timestamp=_now(),
        )
        db_session.add(c)
        db_session.add(MusehubCommitRef(repo_id=repo.repo_id, commit_id=commit_id))
        await db_session.flush()

        await _build_and_persist(db_session, repo.repo_id, commit_id)

        count = (await db_session.execute(
            select(func.count()).select_from(MusehubSymbolHistoryEntry).where(
                MusehubSymbolHistoryEntry.repo_id == repo.repo_id
            )
        )).scalar_one()
        assert count == 0

    @pytest.mark.asyncio
    async def test_SI33_lookup_symbol_intel_empty_address_list(
        self, db_session: AsyncSession,
    ) -> None:
        """Looking up an empty address list returns an empty mapping."""
        from musehub.services.musehub_symbol_indexer import lookup_symbol_intel

        repo = await create_repo(db_session)
        result = await lookup_symbol_intel(db_session, repo.repo_id, [])
        assert result == {}


# ─────────────────────────────────────────────────────────────────────────────
# Layer 6 — Performance: point lookups do not deserialize blobs
# ─────────────────────────────────────────────────────────────────────────────

class TestPerformance:
    """SI34–SI36: normalized reads are fast regardless of repo size.

    These budgets would be impossible with the blob approach at scale."""

    @pytest.mark.asyncio
    async def test_SI34_single_symbol_lookup_under_50ms(
        self, db_session: AsyncSession,
    ) -> None:
        """A single-address intel lookup on a 50-commit index takes under 50 ms."""
        from musehub.services.musehub_symbol_indexer import lookup_symbol_intel

        repo = await create_repo(db_session)
        # Build a 50-commit index
        last_id = await _commit_chain(db_session, repo.repo_id, [
            [
                _insert_op(f"src/file_{i}.py::fn_{i}"),
                _insert_op(f"src/file_{i}.py::helper_{i}"),
            ]
            for i in range(50)
        ])
        await _build_and_persist(db_session, repo.repo_id, last_id)

        target = "src/file_25.py::fn_25"
        # Only the lookup itself is timed, not the index build.
        t0 = time.perf_counter()
        result = await lookup_symbol_intel(db_session, repo.repo_id, [target])
        elapsed_ms = (time.perf_counter() - t0) * 1000

        assert target in result
        assert elapsed_ms < 50, f"point lookup took {elapsed_ms:.1f}ms — too slow"

    @pytest.mark.asyncio
    async def test_SI35_file_scoped_history_lookup_under_50ms(
        self, db_session: AsyncSession,
    ) -> None:
        """A file-scoped history lookup on a 50-commit index takes under 50 ms."""
        from musehub.services.musehub_symbol_indexer import load_symbol_history

        repo = await create_repo(db_session)
        # Every commit touches the target symbol plus one unrelated file.
        last_id = await _commit_chain(db_session, repo.repo_id, [
            [
                _insert_op(f"src/other_{i}.py::fn"),
                _insert_op("src/target.py::hot_fn"),
            ]
            for i in range(50)
        ])
        await _build_and_persist(db_session, repo.repo_id, last_id)

        t0 = time.perf_counter()
        history = await load_symbol_history(db_session, repo.repo_id, file_path="src/target.py")
        elapsed_ms = (time.perf_counter() - t0) * 1000

        assert "src/target.py::hot_fn" in history
        assert elapsed_ms < 50, f"file-scoped lookup took {elapsed_ms:.1f}ms"

    @pytest.mark.asyncio
    async def test_SI36_load_symbol_history_no_file_filter_returns_all(
        self, db_session: AsyncSession,
    ) -> None:
        """Without ``file_path`` the history covers symbols from every file."""
        from musehub.services.musehub_symbol_indexer import load_symbol_history

        repo = await create_repo(db_session)
        cid = _uid()
        await _commit_with_delta(db_session, repo.repo_id, cid, [
            _insert_op("src/a.py::fn_a"),
            _insert_op("src/b.py::fn_b"),
            _insert_op("src/c.py::fn_c"),
        ])
        await _build_and_persist(db_session, repo.repo_id, cid)

        history = await load_symbol_history(db_session, repo.repo_id)
        assert {"src/a.py::fn_a", "src/b.py::fn_b", "src/c.py::fn_c"} <= set(history.keys())


# ─────────────────────────────────────────────────────────────────────────────
# Layer 7 — Stress: 500 symbols × realistic commit volume
# ─────────────────────────────────────────────────────────────────────────────

class TestStress:
    """SI37–SI38: large repos index without timeout or corruption."""

    @pytest.mark.asyncio
    async def test_SI37_index_500_symbols_across_10_commits(
        self, db_session: AsyncSession,
    ) -> None:
        """500 symbols inserted over 10 commits index to 500 rows in under 10 s."""
        from musehub.db.musehub_intel_models import MusehubSymbolHistoryEntry

        repo = await create_repo(db_session)
        symbols = [f"src/module_{i // 10}.py::fn_{i}" for i in range(500)]
        chunk = len(symbols) // 10  # 50 symbols per commit
        last_id = await _commit_chain(db_session, repo.repo_id, [
            [_insert_op(s) for s in symbols[batch_idx * chunk:(batch_idx + 1) * chunk]]
            for batch_idx in range(10)
        ])

        # Only the index build is timed, not the commit setup.
        t0 = time.perf_counter()
        await _build_and_persist(db_session, repo.repo_id, last_id)
        elapsed = time.perf_counter() - t0

        count = (await db_session.execute(
            select(func.count()).select_from(MusehubSymbolHistoryEntry).where(
                MusehubSymbolHistoryEntry.repo_id == repo.repo_id
            )
        )).scalar_one()
        assert count == 500
        assert elapsed < 10.0, f"500-symbol index took {elapsed:.1f}s"

    @pytest.mark.asyncio
    async def test_SI38_same_symbol_modified_50_times(
        self, db_session: AsyncSession,
    ) -> None:
        """One symbol changed in 50 commits yields 50 history rows and churn == 50."""
        from musehub.db.musehub_intel_models import MusehubSymbolHistoryEntry
        from musehub.services.musehub_symbol_indexer import lookup_symbol_intel

        repo = await create_repo(db_session)
        addr = "src/hot.py::hot_fn"
        # The first commit inserts the symbol; the other 49 replace it.
        last_id = await _commit_chain(db_session, repo.repo_id, [
            [{
                "address": addr,
                "op": "insert" if i == 0 else "replace",
                "content_id": _cid(),
            }]
            for i in range(50)
        ])
        await _build_and_persist(db_session, repo.repo_id, last_id)

        rows = (await db_session.execute(
            select(MusehubSymbolHistoryEntry).where(
                MusehubSymbolHistoryEntry.repo_id == repo.repo_id,
                MusehubSymbolHistoryEntry.address == addr,
            )
        )).scalars().all()
        assert len(rows) == 50

        intel = await lookup_symbol_intel(db_session, repo.repo_id, [addr])
        assert intel[addr]["churn"] == 50


# ─────────────────────────────────────────────────────────────────────────────
# Layer 8 — Aggregates: intel_summary and intel_snapshot still produced
# ─────────────────────────────────────────────────────────────────────────────

class TestAggregatesStillWork:
    """SI39–SI40: aggregate outputs (summary, snapshot) are unaffected."""

    @pytest.mark.asyncio
    async def test_SI39_intel_summary_fields_correct(
        self, db_session: AsyncSession,
    ) -> None:
        """The summary blob has symbol_count >= 3 plus health_score and health_label."""
        from musehub.db.musehub_intel_models import MusehubIntelResult

        repo = await create_repo(db_session)
        cid = _uid()
        await _commit_with_delta(db_session, repo.repo_id, cid, [
            _insert_op("src/a.py::fn1"),
            _insert_op("src/b.py::fn2"),
            _insert_op("src/c.py::fn3"),
        ])
        await _build_and_persist(db_session, repo.repo_id, cid)

        row = (await db_session.execute(
            select(MusehubIntelResult).where(
                MusehubIntelResult.repo_id == repo.repo_id,
                MusehubIntelResult.intel_type == "code.intel_summary",
            )
        )).scalar_one()
        data = json.loads(row.data_json)
        assert data.get("symbol_count", 0) >= 3
        assert "health_score" in data
        assert "health_label" in data

    @pytest.mark.asyncio
    async def test_SI40_rebuild_updates_summary_symbol_count(
        self, db_session: AsyncSession,
    ) -> None:
        """After a second push, the single summary row counts symbols from both pushes."""
        from musehub.db.musehub_intel_models import MusehubIntelResult

        repo = await create_repo(db_session)
        c1, c2 = _uid(), _uid()
        await _commit_with_delta(db_session, repo.repo_id, c1, [_insert_op("src/a.py::fn1")])
        await _build_and_persist(db_session, repo.repo_id, c1)

        await _commit_with_delta(db_session, repo.repo_id, c2, [
            _insert_op("src/b.py::fn2"),
            _insert_op("src/c.py::fn3"),
        ], parent_ids=[c1])
        await _build_and_persist(db_session, repo.repo_id, c2)

        # scalar_one() also asserts the summary was updated in place rather
        # than written as a second row.
        row = (await db_session.execute(
            select(MusehubIntelResult).where(
                MusehubIntelResult.repo_id == repo.repo_id,
                MusehubIntelResult.intel_type == "code.intel_summary",
            )
        )).scalar_one()
        data = json.loads(row.data_json)
        assert data.get("symbol_count", 0) >= 3