"""TDD spec — fast symbol detail reads via push-time pre-computation. Problem ─────── The symbol detail page takes 2-3 s because it computes expensive data at request time: loading all history rows for an entire file, joining commit metadata for every entry, and running a large GROUP BY coupling query. Solution ──────── At push time a background job populates three structures: 1. ``MusehubSymbolHistoryEntry`` — add ``message`` + ``commit_branch`` columns so the timeline read needs no join to ``MusehubCommit``. 2. ``MusehubSymbolVitals`` — new table: one pre-computed row per symbol with first_introduced, change_count, version_count, op breakdown. Eliminates loading the full history to derive vitals at request time. 3. ``MusehubSymbolCoupling`` — new table: one row per (symbol, co_symbol) pair with shared_commits count. Replaces the request-time GROUP BY across a large commit_id IN (...) set. At page load the route becomes: SELECT * FROM musehub_symbol_history_entries WHERE repo_id=X AND address=Y SELECT * FROM musehub_symbol_vitals WHERE repo_id=X AND address=Y SELECT * FROM musehub_symbol_intel WHERE repo_id=X AND address=Y SELECT * FROM musehub_symbol_coupling WHERE repo_id=X AND address=Y LIMIT N OFFSET M + 4 small per-table intel reads (type, blast_risk, dead, api) Tier breakdown ────────────── D1xx Schema — new columns / tables exist and have correct types D2xx Indexer — background job populates data correctly at push time D3xx Route — detail page uses pre-computed data (no heavy queries) D4xx Edge — empty history, single entry, rename/lineage, rebuild idempotency """ from __future__ import annotations import datetime as _dt import secrets from datetime import timezone import pytest from httpx import AsyncClient from sqlalchemy import select, text from sqlalchemy.ext.asyncio import AsyncSession from musehub.db.musehub_intel_models import MusehubSymbolCoupling, MusehubSymbolHistoryEntry, MusehubSymbolVitals from musehub.db.musehub_repo_models import 
MusehubCommit, MusehubCommitRef from muse.core.types import blob_id, long_id from tests.factories import create_repo # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _now(offset_days: int = 0) -> _dt.datetime: return _dt.datetime.now(tz=timezone.utc) + _dt.timedelta(days=offset_days) def _cid() -> str: return blob_id(secrets.token_bytes(32)) def _lid() -> str: return long_id(secrets.token_hex(32)) async def _insert_history( session: AsyncSession, repo_id: str, address: str, commit_id: str, op: str = "insert", content_id: str | None = None, message: str = "feat: test", commit_branch: str = "dev", committed_at: _dt.datetime | None = None, author: str = "gabriel", ) -> MusehubSymbolHistoryEntry: entry = MusehubSymbolHistoryEntry( repo_id=repo_id, address=address, commit_id=commit_id, op=op, content_id=content_id or _cid(), message=message, commit_branch=commit_branch, committed_at=committed_at or _now(), author=author, ) session.add(entry) await session.flush() return entry async def _insert_vitals( session: AsyncSession, repo_id: str, address: str, *, first_introduced: _dt.datetime | None = None, change_count: int = 1, version_count: int = 1, op_add: int = 1, op_modify: int = 0, op_delete: int = 0, op_move: int = 0, ) -> "MusehubSymbolVitals": row = MusehubSymbolVitals( repo_id=repo_id, address=address, first_introduced=first_introduced or _now(), change_count=change_count, version_count=version_count, op_add=op_add, op_modify=op_modify, op_delete=op_delete, op_move=op_move, ) session.add(row) await session.flush() return row async def _insert_coupling( session: AsyncSession, repo_id: str, address: str, co_address: str, shared_commits: int, ) -> "MusehubSymbolCoupling": row = MusehubSymbolCoupling( repo_id=repo_id, address=address, co_address=co_address, shared_commits=shared_commits, ) session.add(row) await session.flush() return row # 
# ---------------------------------------------------------------------------
# D1xx — Schema: new columns / tables exist
# ---------------------------------------------------------------------------

class TestHistoryEntrySchema:
    """D101–D103: MusehubSymbolHistoryEntry has message + commit_branch."""

    async def test_D101_message_column_exists(self, db_session: AsyncSession) -> None:
        """D101: MusehubSymbolHistoryEntry.message column is present."""
        repo = await create_repo(db_session, owner="gabriel")
        entry = MusehubSymbolHistoryEntry(
            repo_id=repo.repo_id,
            address="src/foo.py::bar",
            commit_id=_lid(),
            op="insert",
            content_id=_cid(),
            message="feat: add bar",
            commit_branch="dev",
            committed_at=_now(),
        )
        db_session.add(entry)
        await db_session.flush()

        fetched = (await db_session.execute(
            select(MusehubSymbolHistoryEntry).where(
                MusehubSymbolHistoryEntry.repo_id == repo.repo_id,
                MusehubSymbolHistoryEntry.address == "src/foo.py::bar",
            )
        )).scalar_one()
        assert fetched.message == "feat: add bar"

    async def test_D102_commit_branch_column_exists(self, db_session: AsyncSession) -> None:
        """D102: MusehubSymbolHistoryEntry.commit_branch column is present."""
        repo = await create_repo(db_session, owner="gabriel")
        entry = MusehubSymbolHistoryEntry(
            repo_id=repo.repo_id,
            address="src/foo.py::bar",
            commit_id=_lid(),
            op="insert",
            content_id=_cid(),
            message="feat: x",
            commit_branch="feat/my-thing",
            committed_at=_now(),
        )
        db_session.add(entry)
        await db_session.flush()

        fetched = (await db_session.execute(
            select(MusehubSymbolHistoryEntry).where(
                MusehubSymbolHistoryEntry.repo_id == repo.repo_id,
                MusehubSymbolHistoryEntry.address == "src/foo.py::bar",
            )
        )).scalar_one()
        assert fetched.commit_branch == "feat/my-thing"

    async def test_D103_message_and_branch_nullable(self, db_session: AsyncSession) -> None:
        """D103: message and commit_branch accept None (backward compat for old rows)."""
        repo = await create_repo(db_session, owner="gabriel")
        entry = MusehubSymbolHistoryEntry(
            repo_id=repo.repo_id,
            address="src/foo.py::baz",
            commit_id=_lid(),
            op="insert",
            content_id=_cid(),
            message=None,
            commit_branch=None,
            committed_at=_now(),
        )
        db_session.add(entry)
        await db_session.flush()

        # Scope the lookup to this repo (as D101/D102 do) so scalar_one() cannot
        # trip over a same-address row that belongs to another repository.
        fetched = (await db_session.execute(
            select(MusehubSymbolHistoryEntry).where(
                MusehubSymbolHistoryEntry.repo_id == repo.repo_id,
                MusehubSymbolHistoryEntry.address == "src/foo.py::baz",
            )
        )).scalar_one()
        assert fetched.message is None
        assert fetched.commit_branch is None


class TestSymbolVitalsSchema:
    """D110–D116: MusehubSymbolVitals table exists with correct columns."""

    async def test_D110_table_exists(self, db_session: AsyncSession) -> None:
        """D110: MusehubSymbolVitals ORM class is importable and maps to a table."""
        assert MusehubSymbolVitals is not None

    async def test_D111_insert_and_fetch(self, db_session: AsyncSession) -> None:
        """D111: can insert and retrieve a vitals row."""
        repo = await create_repo(db_session, owner="gabriel")
        await _insert_vitals(
            db_session,
            repo.repo_id,
            "src/core.py::process",
            first_introduced=_now(offset_days=-30),
            change_count=42,
            version_count=7,
            op_add=1,
            op_modify=40,
            op_delete=0,
            op_move=1,
        )

        fetched = (await db_session.execute(
            select(MusehubSymbolVitals).where(
                MusehubSymbolVitals.repo_id == repo.repo_id,
                MusehubSymbolVitals.address == "src/core.py::process",
            )
        )).scalar_one()
        assert fetched.change_count == 42
        assert fetched.version_count == 7
        assert fetched.op_add == 1
        assert fetched.op_modify == 40
        assert fetched.op_delete == 0
        assert fetched.op_move == 1

    async def test_D112_first_introduced_is_datetime(self, db_session: AsyncSession) -> None:
        """D112: first_introduced stores a timezone-aware datetime."""
        repo = await create_repo(db_session, owner="gabriel")
        await _insert_vitals(
            db_session, repo.repo_id, "src/a.py::fn", first_introduced=_now(offset_days=-10)
        )

        fetched = (await db_session.execute(
            select(MusehubSymbolVitals).where(
                MusehubSymbolVitals.repo_id == repo.repo_id,
                MusehubSymbolVitals.address == "src/a.py::fn",
            )
        )).scalar_one()
        assert fetched.first_introduced is not None
        assert fetched.first_introduced.tzinfo is not None

    async def test_D113_primary_key_is_repo_and_address(self, db_session: AsyncSession) -> None:
        """D113: upsert on (repo_id, address) replaces the row."""
        from sqlalchemy.dialects.postgresql import insert as pg_insert

        repo = await create_repo(db_session, owner="gabriel")
        await _insert_vitals(db_session, repo.repo_id, "src/a.py::fn", change_count=5)
        await db_session.commit()

        # Re-insert the same key with updated counters; the conflict clause
        # must update the existing row rather than add a second one.
        updated = {"change_count": 10, "version_count": 2}
        stmt = pg_insert(MusehubSymbolVitals).values(
            repo_id=repo.repo_id,
            address="src/a.py::fn",
            first_introduced=_now(),
            op_add=1,
            op_modify=9,
            op_delete=0,
            op_move=0,
            **updated,
        ).on_conflict_do_update(
            index_elements=["repo_id", "address"],
            set_=updated,
        )
        await db_session.execute(stmt)
        await db_session.commit()

        rows = (await db_session.execute(
            select(MusehubSymbolVitals).where(
                MusehubSymbolVitals.repo_id == repo.repo_id,
                MusehubSymbolVitals.address == "src/a.py::fn",
            )
        )).scalars().all()
        assert len(rows) == 1
        assert rows[0].change_count == 10

    async def test_D114_cascade_delete_with_repo(self, db_session: AsyncSession) -> None:
        """D114: vitals rows are deleted when the repo is deleted."""
        repo = await create_repo(db_session, owner="gabriel")
        # Capture the id up front: after the delete is committed the instance
        # is no longer persistent and should not be read from.
        repo_id = repo.repo_id
        await _insert_vitals(db_session, repo_id, "src/a.py::fn")
        await db_session.commit()

        await db_session.delete(repo)
        await db_session.commit()

        rows = (await db_session.execute(
            select(MusehubSymbolVitals).where(MusehubSymbolVitals.repo_id == repo_id)
        )).scalars().all()
        assert rows == []


class TestSymbolCouplingSchema:
    """D120–D126: MusehubSymbolCoupling table exists with correct columns."""

    async def test_D120_table_exists(self, db_session: AsyncSession) -> None:
        """D120: MusehubSymbolCoupling ORM class is importable."""
        assert MusehubSymbolCoupling is not None

    async def test_D121_insert_and_fetch(self, db_session: AsyncSession) -> None:
        """D121: can insert and retrieve a coupling row."""
        repo = await create_repo(db_session, owner="gabriel")
        await _insert_coupling(
            db_session, repo.repo_id, "src/a.py::fn", "src/b.py::helper", shared_commits=12,
        )

        fetched = (await db_session.execute(
            select(MusehubSymbolCoupling).where(
                MusehubSymbolCoupling.repo_id == repo.repo_id,
                MusehubSymbolCoupling.address == "src/a.py::fn",
            )
        )).scalar_one()
        assert fetched.co_address == "src/b.py::helper"
        assert fetched.shared_commits == 12

    async def test_D122_ordered_by_shared_commits_desc(self, db_session: AsyncSession) -> None:
        """D122: coupling rows can be fetched ordered by shared_commits descending."""
        repo = await create_repo(db_session, owner="gabriel")
        await _insert_coupling(db_session, repo.repo_id, "src/a.py::fn", "src/b.py::b", 3)
        await _insert_coupling(db_session, repo.repo_id, "src/a.py::fn", "src/c.py::c", 10)
        await _insert_coupling(db_session, repo.repo_id, "src/a.py::fn", "src/d.py::d", 7)

        rows = (await db_session.execute(
            select(MusehubSymbolCoupling)
            .where(
                MusehubSymbolCoupling.repo_id == repo.repo_id,
                MusehubSymbolCoupling.address == "src/a.py::fn",
            )
            .order_by(MusehubSymbolCoupling.shared_commits.desc())
        )).scalars().all()
        # Exact sequence: pins both the ordering and that all three rows came back.
        assert [r.shared_commits for r in rows] == [10, 7, 3]

    async def test_D123_pagination_with_limit_offset(self, db_session: AsyncSession) -> None:
        """D123: coupling supports limit/offset for cursor pagination."""
        repo = await create_repo(db_session, owner="gabriel")
        for i in range(20):
            await _insert_coupling(
                db_session, repo.repo_id, "src/a.py::fn", f"src/x{i}.py::sym",
                shared_commits=20 - i,
            )

        ranked = (
            select(MusehubSymbolCoupling)
            .where(
                MusehubSymbolCoupling.repo_id == repo.repo_id,
                MusehubSymbolCoupling.address == "src/a.py::fn",
            )
            .order_by(MusehubSymbolCoupling.shared_commits.desc())
        )
        page1 = (await db_session.execute(ranked.limit(15).offset(0))).scalars().all()
        page2 = (await db_session.execute(ranked.limit(15).offset(15))).scalars().all()

        assert len(page1) == 15
        assert len(page2) == 5
        # The two pages together must cover every row exactly once.
        all_addrs = {r.co_address for r in page1} | {r.co_address for r in page2}
        assert len(all_addrs) == 20

    async def test_D124_cascade_delete_with_repo(self, db_session: AsyncSession) -> None:
        """D124: coupling rows are deleted when the repo is deleted."""
        repo = await create_repo(db_session, owner="gabriel")
        # Capture the id up front: after the delete is committed the instance
        # is no longer persistent and should not be read from.
        repo_id = repo.repo_id
        await _insert_coupling(db_session, repo_id, "src/a.py::fn", "src/b.py::g", 5)
        await db_session.commit()

        await db_session.delete(repo)
        await db_session.commit()

        rows = (await db_session.execute(
            select(MusehubSymbolCoupling).where(MusehubSymbolCoupling.repo_id == repo_id)
        )).scalars().all()
        assert rows == []

    async def test_D125_primary_key_is_repo_address_co_address(self, db_session: AsyncSession) -> None:
        """D125: (repo_id, address, co_address) is the primary key — no duplicates."""
        from sqlalchemy.dialects.postgresql import insert as pg_insert

        repo = await create_repo(db_session, owner="gabriel")
        await _insert_coupling(db_session, repo.repo_id, "src/a.py::fn", "src/b.py::g", 5)
        await db_session.commit()

        stmt = pg_insert(MusehubSymbolCoupling).values(
            repo_id=repo.repo_id,
            address="src/a.py::fn",
            co_address="src/b.py::g",
            shared_commits=99,
        ).on_conflict_do_update(
            index_elements=["repo_id", "address", "co_address"],
            set_={"shared_commits": 99},
        )
        await db_session.execute(stmt)
        await db_session.commit()

        rows = (await db_session.execute(
            select(MusehubSymbolCoupling).where(
                MusehubSymbolCoupling.repo_id == repo.repo_id,
                MusehubSymbolCoupling.address == "src/a.py::fn",
            )
        )).scalars().all()
        assert len(rows) == 1
        assert rows[0].shared_commits == 99


# ---------------------------------------------------------------------------
# D2xx — Indexer: background job populates data at push time
# ---------------------------------------------------------------------------

def _op(address: str, op: str = "insert", content_id: str | None = None) -> dict[str, str]:
    """Build one structured-delta op dict; a random content id is used if none is given."""
    return {"address": address, "op": op, "content_id": content_id or _cid()}


def _stage_commit(
    session: AsyncSession,
    repo_id: str,
    ops: list[dict[str, str]],
    *,
    message: str = "change",
    branch: str = "dev",
    timestamp: _dt.datetime | None = None,
    parent_id: str | None = None,
) -> str:
    """Add a commit carrying *ops* plus its repo ref to *session*; return the commit id.

    Nothing is flushed here — callers flush once after staging all commits.
    A falsy ``parent_id`` (``None`` or ``""``) produces a root commit.
    """
    commit_id = _lid()
    session.add(MusehubCommit(
        commit_id=commit_id,
        branch=branch,
        parent_ids=[parent_id] if parent_id else [],
        message=message,
        author="gabriel",
        timestamp=timestamp or _now(),
        structured_delta={"ops": ops},
    ))
    session.add(MusehubCommitRef(repo_id=repo_id, commit_id=commit_id))
    return commit_id


async def _get_vitals(
    session: AsyncSession, repo_id: str, address: str
) -> MusehubSymbolVitals | None:
    """Return the vitals row for (*repo_id*, *address*), or ``None`` if absent."""
    return (await session.execute(
        select(MusehubSymbolVitals).where(
            MusehubSymbolVitals.repo_id == repo_id,
            MusehubSymbolVitals.address == address,
        )
    )).scalar_one_or_none()


async def _get_coupling(
    session: AsyncSession, repo_id: str, address: str, co_address: str
) -> MusehubSymbolCoupling | None:
    """Return the directed coupling row *address* → *co_address*, or ``None``."""
    return (await session.execute(
        select(MusehubSymbolCoupling).where(
            MusehubSymbolCoupling.repo_id == repo_id,
            MusehubSymbolCoupling.address == address,
            MusehubSymbolCoupling.co_address == co_address,
        )
    )).scalar_one_or_none()


class TestIndexerPopulatesVitals:
    """D201–D207: build_symbol_index writes MusehubSymbolVitals rows."""

    async def test_D201_vitals_written_after_index_build(self, db_session: AsyncSession) -> None:
        """D201: after build_symbol_index, a vitals row exists for each indexed symbol."""
        from musehub.services.musehub_symbol_indexer import build_symbol_index

        repo = await create_repo(db_session, owner="gabriel")
        cid = _stage_commit(
            db_session, repo.repo_id, [_op("src/core.py::process")], message="feat: add fn",
        )
        await db_session.flush()

        await build_symbol_index(db_session, repo.repo_id, cid)
        await db_session.flush()

        row = await _get_vitals(db_session, repo.repo_id, "src/core.py::process")
        assert row is not None
        assert row.change_count == 1
        assert row.op_add == 1

    async def test_D202_vitals_change_count_increments_on_rebuild(self, db_session: AsyncSession) -> None:
        """D202: change_count reflects all commits for the symbol after rebuild."""
        from musehub.services.musehub_symbol_indexer import build_symbol_index

        repo = await create_repo(db_session, owner="gabriel")
        content = _cid()
        head = ""  # id of the most recently staged commit; "" means no parent yet
        for i in range(5):
            head = _stage_commit(
                db_session,
                repo.repo_id,
                [_op("src/core.py::process", "replace" if i else "insert", content)],
                message=f"feat: change {i}",
                timestamp=_now(offset_days=i),
                parent_id=head,
            )
        await db_session.flush()

        await build_symbol_index(db_session, repo.repo_id, head)
        await db_session.flush()

        row = await _get_vitals(db_session, repo.repo_id, "src/core.py::process")
        assert row is not None
        assert row.change_count == 5

    async def test_D203_vitals_first_introduced_is_earliest_commit(self, db_session: AsyncSession) -> None:
        """D203: first_introduced matches the committed_at of the symbol's first entry."""
        from musehub.services.musehub_symbol_indexer import build_symbol_index

        repo = await create_repo(db_session, owner="gabriel")
        first_ts = _now(offset_days=-10)
        head = ""
        for i, ts in enumerate([first_ts, _now(offset_days=-5), _now()]):
            head = _stage_commit(
                db_session,
                repo.repo_id,
                [_op("src/a.py::fn", "insert" if i == 0 else "replace")],
                timestamp=ts,
                parent_id=head,
            )
        await db_session.flush()

        await build_symbol_index(db_session, repo.repo_id, head)
        await db_session.flush()

        row = await _get_vitals(db_session, repo.repo_id, "src/a.py::fn")
        assert row is not None
        # Tolerate sub-second differences between the Python and stored timestamps.
        assert abs((row.first_introduced - first_ts).total_seconds()) < 2


class TestIndexerPopulatesCoupling:
    """D210–D215: build_symbol_index writes MusehubSymbolCoupling rows."""

    async def test_D210_coupling_written_for_co_changed_symbols(self, db_session: AsyncSession) -> None:
        """D210: symbols changed in the same commit get coupling rows."""
        from musehub.services.musehub_symbol_indexer import build_symbol_index, backfill_coupling

        repo = await create_repo(db_session, owner="gabriel")
        cid = _stage_commit(
            db_session,
            repo.repo_id,
            [_op("src/a.py::fn_a"), _op("src/b.py::fn_b")],
            message="feat: big change",
        )
        await db_session.flush()

        await build_symbol_index(db_session, repo.repo_id, cid)
        await backfill_coupling(db_session, repo.repo_id, min_shared=1)
        await db_session.flush()

        coupling_a = await _get_coupling(
            db_session, repo.repo_id, "src/a.py::fn_a", "src/b.py::fn_b"
        )
        assert coupling_a is not None
        assert coupling_a.shared_commits == 1

    async def test_D211_coupling_is_symmetric(self, db_session: AsyncSession) -> None:
        """D211: if A couples with B, there is also a row for B→A."""
        from musehub.services.musehub_symbol_indexer import build_symbol_index, backfill_coupling

        repo = await create_repo(db_session, owner="gabriel")
        cid = _stage_commit(
            db_session, repo.repo_id, [_op("src/a.py::fn_a"), _op("src/b.py::fn_b")],
        )
        await db_session.flush()

        await build_symbol_index(db_session, repo.repo_id, cid)
        await backfill_coupling(db_session, repo.repo_id, min_shared=1)
        await db_session.flush()

        b_to_a = await _get_coupling(
            db_session, repo.repo_id, "src/b.py::fn_b", "src/a.py::fn_a"
        )
        assert b_to_a is not None

    async def test_D212_coupling_count_accumulates_across_commits(self, db_session: AsyncSession) -> None:
        """D212: shared_commits increments each time both symbols appear in a commit."""
        from musehub.services.musehub_symbol_indexer import build_symbol_index, backfill_coupling

        repo = await create_repo(db_session, owner="gabriel")
        head = ""
        for i in range(3):
            kind = "insert" if i == 0 else "replace"
            head = _stage_commit(
                db_session,
                repo.repo_id,
                [_op("src/a.py::fn_a", kind), _op("src/b.py::fn_b", kind)],
                message=f"change {i}",
                timestamp=_now(offset_days=i),
                parent_id=head,
            )
        await db_session.flush()

        await build_symbol_index(db_session, repo.repo_id, head)
        await backfill_coupling(db_session, repo.repo_id, min_shared=1)
        await db_session.flush()

        row = await _get_coupling(
            db_session, repo.repo_id, "src/a.py::fn_a", "src/b.py::fn_b"
        )
        assert row is not None
        assert row.shared_commits == 3

    async def test_D213_rebuild_is_idempotent(self, db_session: AsyncSession) -> None:
        """D213: running build_symbol_index twice produces the same coupling counts."""
        from musehub.services.musehub_symbol_indexer import build_symbol_index, backfill_coupling

        repo = await create_repo(db_session, owner="gabriel")
        cid = _stage_commit(
            db_session, repo.repo_id, [_op("src/a.py::fn_a"), _op("src/b.py::fn_b")],
        )
        await db_session.flush()

        for _ in range(2):
            await build_symbol_index(db_session, repo.repo_id, cid)
            await backfill_coupling(db_session, repo.repo_id, min_shared=1)
            await db_session.flush()

        # Fetch all matches (not scalar_one) so a duplicate row fails the
        # length assertion instead of raising.
        rows = (await db_session.execute(
            select(MusehubSymbolCoupling).where(
                MusehubSymbolCoupling.repo_id == repo.repo_id,
                MusehubSymbolCoupling.address == "src/a.py::fn_a",
                MusehubSymbolCoupling.co_address == "src/b.py::fn_b",
            )
        )).scalars().all()
        assert len(rows) == 1
        assert rows[0].shared_commits == 1


class TestIndexerDenormalizesCommitMessage:
    """D220–D223: build_symbol_index stores message + commit_branch on history entries."""

    async def test_D220_history_entry_has_message(self, db_session: AsyncSession) -> None:
        """D220: history entry written by indexer carries the commit message."""
        from musehub.services.musehub_symbol_indexer import build_symbol_index

        repo = await create_repo(db_session, owner="gabriel")
        cid = _stage_commit(
            db_session,
            repo.repo_id,
            [_op("src/core.py::important_fn")],
            message="feat: add important fn",
        )
        await db_session.flush()

        await build_symbol_index(db_session, repo.repo_id, cid)
        await db_session.flush()

        entry = (await db_session.execute(
            select(MusehubSymbolHistoryEntry).where(
                MusehubSymbolHistoryEntry.repo_id == repo.repo_id,
                MusehubSymbolHistoryEntry.address == "src/core.py::important_fn",
            )
        )).scalar_one_or_none()
        assert entry is not None
        assert entry.message == "feat: add important fn"

    async def test_D221_history_entry_has_commit_branch(self, db_session: AsyncSession) -> None:
        """D221: history entry written by indexer carries the commit_branch."""
        from musehub.services.musehub_symbol_indexer import build_symbol_index

        repo = await create_repo(db_session, owner="gabriel")
        cid = _stage_commit(
            db_session,
            repo.repo_id,
            [_op("src/audio.py::encode")],
            message="feat: audio fn",
            branch="feat/audio",
        )
        await db_session.flush()

        await build_symbol_index(db_session, repo.repo_id, cid)
        await db_session.flush()

        entry = (await db_session.execute(
            select(MusehubSymbolHistoryEntry).where(
                MusehubSymbolHistoryEntry.repo_id == repo.repo_id,
                MusehubSymbolHistoryEntry.address == "src/audio.py::encode",
            )
        )).scalar_one_or_none()
        assert entry is not None
        assert entry.commit_branch == "feat/audio"


# ---------------------------------------------------------------------------
# D3xx — Route: detail page reads pre-computed data, no heavy queries
# ---------------------------------------------------------------------------

class TestRouteUsesPrecomputedData:
    """D301–D305: symbol detail route reads vitals + coupling tables."""

    async def test_D301_page_renders_change_count_from_vitals(
        self, db_session: AsyncSession, client: AsyncClient
    ) -> None:
        """D301: change_count on the page comes from MusehubSymbolVitals, not history scan."""
        repo = await create_repo(db_session, owner="gabriel", slug="myrepo")
        await db_session.commit()

        address = "src/core.py::process"
        # One history row but change_count=99 in vitals: the page can only show
        # 99 if it reads the pre-computed row instead of counting history.
        await _insert_history(
            db_session, repo.repo_id, address, _lid(), message="feat: add process",
        )
        await _insert_vitals(
            db_session, repo.repo_id, address,
            change_count=99, version_count=5, op_add=1, op_modify=98,
        )
        await db_session.commit()

        r = await client.get(f"/gabriel/myrepo/symbol/{address}")
        assert r.status_code == 200
        assert b"99" in r.content

    async def test_D302_page_renders_coupling_from_coupling_table(
        self, db_session: AsyncSession, client: AsyncClient
    ) -> None:
        """D302: co-change section is populated from MusehubSymbolCoupling rows."""
        repo = await create_repo(db_session, owner="gabriel", slug="myrepo2")
        await db_session.commit()

        address = "src/a.py::fn_a"
        await _insert_history(
            db_session, repo.repo_id, address, _lid(), message="feat: add fn_a",
        )
        await _insert_coupling(db_session, repo.repo_id, address, "src/b.py::fn_b", 7)
        await db_session.commit()

        r = await client.get(f"/gabriel/myrepo2/symbol/{address}")
        assert r.status_code == 200
        assert b"src/b.py::fn_b" in r.content

    async def test_D303_history_timeline_needs_no_commit_join(
        self, db_session: AsyncSession, client: AsyncClient
    ) -> None:
        """D303: timeline message comes from history entry itself, not a join."""
        repo = await create_repo(db_session, owner="gabriel", slug="myrepo3")
        await db_session.commit()

        address = "src/core.py::do_thing"
        # No MusehubCommit row is created, so the message can only come from
        # the denormalized column on the history entry.
        await _insert_history(
            db_session, repo.repo_id, address, _lid(), message="feat: do thing implemented",
        )
        await db_session.commit()

        r = await client.get(f"/gabriel/myrepo3/symbol/{address}")
        assert r.status_code == 200
        assert b"feat: do thing implemented" in r.content
# ---------------------------------------------------------------------------
# D4xx — Edge cases
# ---------------------------------------------------------------------------

class TestEdgeCases:
    """D401–D405: empty history, single entry, no coupling."""

    async def test_D401_no_vitals_row_falls_back_gracefully(
        self, db_session: AsyncSession, client: AsyncClient
    ) -> None:
        """D401: page renders even when MusehubSymbolVitals row is absent (old data)."""
        repo = await create_repo(db_session, owner="gabriel", slug="oldrepo")
        await db_session.commit()

        address = "src/old.py::legacy"
        # Built inline on purpose: a legacy row has NULL message / commit_branch
        # and no accompanying vitals row.
        entry = MusehubSymbolHistoryEntry(
            repo_id=repo.repo_id,
            address=address,
            commit_id=_lid(),
            op="insert",
            content_id=_cid(),
            message=None,
            commit_branch=None,
            committed_at=_now(),
            author="gabriel",
        )
        db_session.add(entry)
        await db_session.commit()

        r = await client.get(f"/gabriel/oldrepo/symbol/{address}")
        assert r.status_code == 200

    async def test_D402_no_coupling_rows_renders_empty_section(
        self, db_session: AsyncSession, client: AsyncClient
    ) -> None:
        """D402: coupling section renders without error when table has no rows for symbol."""
        repo = await create_repo(db_session, owner="gabriel", slug="lonerepo")
        await db_session.commit()

        address = "src/lone.py::solo"
        await _insert_history(db_session, repo.repo_id, address, _lid(), message="add solo")
        await db_session.commit()

        r = await client.get(f"/gabriel/lonerepo/symbol/{address}")
        assert r.status_code == 200

    async def test_D403_coupling_pagination_second_page(
        self, db_session: AsyncSession, client: AsyncClient
    ) -> None:
        """D403: coupling_cursor offset into MusehubSymbolCoupling works correctly."""
        repo = await create_repo(db_session, owner="gabriel", slug="bigcoupling")
        await db_session.commit()

        address = "src/hub.py::central"
        await _insert_history(db_session, repo.repo_id, address, _lid(), message="add central")
        # 20 partners with strictly decreasing weights; offset 15 leaves five rows.
        for i in range(20):
            await _insert_coupling(
                db_session, repo.repo_id, address, f"src/dep{i}.py::fn",
                shared_commits=20 - i,
            )
        await db_session.commit()

        r = await client.get(f"/gabriel/bigcoupling/symbol/{address}?coupling_cursor=15")
        assert r.status_code == 200
        assert b"src/dep" in r.content

    async def test_D404_vitals_op_breakdown_correct(self, db_session: AsyncSession) -> None:
        """D404: op_add/modify/delete/move on vitals row sum to change_count."""
        repo = await create_repo(db_session, owner="gabriel")
        await _insert_vitals(
            db_session,
            repo.repo_id,
            "src/x.py::fn",
            change_count=10,
            version_count=4,
            op_add=1,
            op_modify=7,
            op_delete=1,
            op_move=1,
        )

        # Select the columns (not the entity) so the values come from the
        # database row rather than from the in-memory object just created.
        stored = (await db_session.execute(
            select(
                MusehubSymbolVitals.op_add,
                MusehubSymbolVitals.op_modify,
                MusehubSymbolVitals.op_delete,
                MusehubSymbolVitals.op_move,
                MusehubSymbolVitals.change_count,
            ).where(
                MusehubSymbolVitals.repo_id == repo.repo_id,
                MusehubSymbolVitals.address == "src/x.py::fn",
            )
        )).one()
        assert (
            stored.op_add + stored.op_modify + stored.op_delete + stored.op_move
            == stored.change_count
        )

    async def test_D405_coupling_page_beyond_end_returns_200(
        self, db_session: AsyncSession, client: AsyncClient
    ) -> None:
        """D405: requesting a coupling offset past the end renders empty section, not 500."""
        repo = await create_repo(db_session, owner="gabriel", slug="smallcoupling")
        await db_session.commit()

        address = "src/tiny.py::fn"
        await _insert_history(db_session, repo.repo_id, address, _lid(), message="add tiny fn")
        await _insert_coupling(db_session, repo.repo_id, address, "src/b.py::other", 3)
        await db_session.commit()

        r = await client.get(f"/gabriel/smallcoupling/symbol/{address}?coupling_cursor=9999")
        assert r.status_code == 200