"""TDD spec — Phase 1: coupling_count column on MusehubSymbolVitals. Problem ─────── The symbol list page needs a per-symbol coupling score (how many unique symbols this one co-changes with) without running a COUNT(*) on musehub_symbol_coupling at request time. Solution ──────── Add ``coupling_count INT DEFAULT 0`` to ``musehub_symbol_vitals``. After ``_upsert_symbol_coupling`` runs, update vitals with the count derived from the coupling table (SELECT COUNT(*) WHERE repo_id=? AND address=?). This keeps the symbol list query a single LEFT JOIN — no sub-selects, no aggregations at request time. Tier breakdown ────────────── V101 Schema — coupling_count column exists on musehub_symbol_vitals V102 Schema — default value is 0, not nullable V103 Indexer — coupling_count populated after build_symbol_index V104 Indexer — coupling_count accurate: matches distinct co_address count V105 Indexer — coupling_count idempotent: re-running build_symbol_index gives same result V106 Indexer — symbol with no coupling partners has coupling_count = 0 V107 Schema — cascade delete removes vitals row (coupling_count included) """ from __future__ import annotations import secrets from datetime import datetime, timezone import pytest from sqlalchemy import select, text from sqlalchemy.ext.asyncio import AsyncSession from musehub.db.musehub_intel_models import MusehubSymbolCoupling, MusehubSymbolVitals from musehub.db.musehub_repo_models import MusehubCommit, MusehubCommitRef from muse.core.types import blob_id, long_id from musehub.services.musehub_symbol_indexer import build_symbol_index from tests.factories import create_repo # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _now() -> datetime: return datetime.now(tz=timezone.utc) def _cid() -> str: return blob_id(secrets.token_bytes(32)) def _lid() -> str: return long_id(secrets.token_hex(32)) async def _make_commit( session: AsyncSession, repo_id: str, addresses: list[str], *, parent_id: str | None = None, branch: str = "dev", message: str = "feat: test", op: str = "insert", ) -> MusehubCommit: """Create a MusehubCommit with structured_delta so the indexer processes it.""" cid = _lid() commit = MusehubCommit( commit_id=cid, branch=branch, message=message, author="gabriel", parent_ids=[parent_id] if parent_id else [], timestamp=_now(), structured_delta={"ops": [ {"address": addr, "op": op, "new_content_id": _cid()} for addr in addresses ]}, ) session.add(commit) session.add(MusehubCommitRef(repo_id=repo_id, commit_id=cid)) await session.flush() return commit async def _push_index(session: AsyncSession, repo_id: str, head_commit_id: str) -> None: """Run build_symbol_index + backfill_coupling as the background job does at push time.""" from musehub.services.musehub_symbol_indexer import backfill_coupling await build_symbol_index(session, repo_id, head_commit_id) await backfill_coupling(session, repo_id, min_shared=1) await session.flush() # --------------------------------------------------------------------------- # V101 — coupling_count column exists on musehub_symbol_vitals # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_v101_coupling_count_column_exists(db_session: AsyncSession) -> None: """musehub_symbol_vitals must have a coupling_count column.""" result = await db_session.execute( text( "SELECT column_name FROM information_schema.columns " "WHERE table_name = 'musehub_symbol_vitals' AND column_name = 'coupling_count'" ) ) assert result.fetchone() is not None, "coupling_count column not found on musehub_symbol_vitals" # --------------------------------------------------------------------------- # V102 — default is 0, not nullable # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_v102_coupling_count_default_zero_not_nullable(db_session: AsyncSession) -> None: """coupling_count must default to 0 and be NOT NULL.""" result = await db_session.execute( text( "SELECT is_nullable, column_default " "FROM information_schema.columns " "WHERE table_name = 'musehub_symbol_vitals' AND column_name = 'coupling_count'" ) ) row = result.fetchone() assert row is not None is_nullable, column_default = row assert is_nullable == "NO", "coupling_count must be NOT NULL" assert column_default is not None and "0" in str(column_default), \ f"coupling_count must default to 0, got: {column_default}" # --------------------------------------------------------------------------- # V103 — coupling_count populated after build_symbol_index # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_v103_coupling_count_populated_after_index(db_session: AsyncSession) -> None: """After build_symbol_index, symbols with coupling partners have coupling_count > 0.""" repo = await create_repo(db_session) repo_id = repo.repo_id # Two symbols changed in the same commit → they are coupled commit = await _make_commit( db_session, repo_id, ["src/foo.py::alpha", "src/foo.py::beta"], ) await _push_index(db_session, repo_id, commit.commit_id) vitals_alpha = (await db_session.execute( select(MusehubSymbolVitals).where( MusehubSymbolVitals.repo_id == repo_id, MusehubSymbolVitals.address == "src/foo.py::alpha", ) )).scalar_one_or_none() assert vitals_alpha is not None assert vitals_alpha.coupling_count == 1, \ f"alpha coupled to beta → coupling_count should be 1, got {vitals_alpha.coupling_count}" # --------------------------------------------------------------------------- # V104 — coupling_count accurate: matches distinct co_address count # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_v104_coupling_count_matches_distinct_co_address_count(db_session: AsyncSession) -> None: """coupling_count must equal the number of distinct partners in musehub_symbol_coupling.""" repo = await create_repo(db_session) repo_id = repo.repo_id # Commit 1: alpha + beta + gamma change together c1 = await _make_commit( db_session, repo_id, ["src/a.py::alpha", "src/a.py::beta", "src/a.py::gamma"], message="feat: first", ) # Commit 2: alpha + delta change together (another partner for alpha) c2 = await _make_commit( db_session, repo_id, ["src/a.py::alpha", "src/a.py::delta"], parent_id=c1.commit_id, message="feat: second", op="replace", ) await _push_index(db_session, repo_id, c2.commit_id) # alpha is coupled to beta, gamma, delta → coupling_count = 3 vitals = (await db_session.execute( select(MusehubSymbolVitals).where( MusehubSymbolVitals.repo_id == repo_id, MusehubSymbolVitals.address == "src/a.py::alpha", ) )).scalar_one() coupling_rows = (await db_session.execute( select(MusehubSymbolCoupling).where( MusehubSymbolCoupling.repo_id == repo_id, MusehubSymbolCoupling.address == "src/a.py::alpha", ) )).scalars().all() assert vitals.coupling_count == len(coupling_rows), ( f"coupling_count {vitals.coupling_count} != distinct coupling rows {len(coupling_rows)}" ) assert vitals.coupling_count == 3 # --------------------------------------------------------------------------- # V105 — idempotent: re-running build_symbol_index gives same result # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_v105_coupling_count_idempotent(db_session: AsyncSession) -> None: """Running build_symbol_index twice produces the same coupling_count.""" repo = await create_repo(db_session) repo_id = repo.repo_id c1 = await _make_commit(db_session, repo_id, ["src/b.py::x", "src/b.py::y"]) await _push_index(db_session, repo_id, c1.commit_id) await _push_index(db_session, repo_id, c1.commit_id) vitals = (await db_session.execute( select(MusehubSymbolVitals).where( MusehubSymbolVitals.repo_id == repo_id, MusehubSymbolVitals.address == "src/b.py::x", ) )).scalar_one() assert vitals.coupling_count == 1 # --------------------------------------------------------------------------- # V106 — isolated symbol has coupling_count = 0 # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_v106_isolated_symbol_has_zero_coupling_count(db_session: AsyncSession) -> None: """A symbol that never changes with others must have coupling_count = 0.""" repo = await create_repo(db_session) repo_id = repo.repo_id # Single symbol in the commit → no coupling partners c1 = await _make_commit(db_session, repo_id, ["src/solo.py::lone_wolf"]) await _push_index(db_session, repo_id, c1.commit_id) vitals = (await db_session.execute( select(MusehubSymbolVitals).where( MusehubSymbolVitals.repo_id == repo_id, MusehubSymbolVitals.address == "src/solo.py::lone_wolf", ) )).scalar_one() assert vitals.coupling_count == 0 # --------------------------------------------------------------------------- # V107 — cascade delete removes vitals row (coupling_count included) # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_v107_cascade_delete_removes_vitals(db_session: AsyncSession) -> None: """Deleting the repo must cascade-delete all musehub_symbol_vitals rows.""" repo = await create_repo(db_session) repo_id = repo.repo_id row = MusehubSymbolVitals( repo_id=repo_id, address="src/c.py::fn", first_introduced=_now(), change_count=1, version_count=1, op_add=1, op_modify=0, op_delete=0, op_move=0, coupling_count=3, ) db_session.add(row) await db_session.flush() await db_session.delete(repo) await db_session.flush() remaining = (await db_session.execute( select(MusehubSymbolVitals).where(MusehubSymbolVitals.repo_id == repo_id) )).scalars().all() assert remaining == []