"""TDD spec for Phase 1 — intel indexing schema expansion (issue #8). New normalized tables replace JSON blobs for 11 new muse code intel types, plus two new columns on musehub_symbol_intel. New tables: musehub_intel_coupling — co-changing file pairs musehub_intel_entangle — symbol entanglement pairs musehub_intel_dead — dead-code candidates musehub_intel_blast_risk — composite pre-release risk per symbol musehub_intel_stable — long-stable symbols musehub_intel_velocity — module growth velocity musehub_intel_clones — duplicate code clusters musehub_intel_type — per-symbol type health musehub_intel_api_surface — public API surface entries musehub_intel_languages — language breakdown per push musehub_intel_refactor_events — detected refactoring events Extended columns on musehub_symbol_intel: last_commit_id VARCHAR(128) — most recent commit that touched this symbol op VARCHAR(16) — latest op (add/modify/delete) Layers: 1. Schema — ORM model shape, column types, PK, indexes, FK cascade 2. Write — upsert helpers insert and overwrite correctly 3. Cascade — deleting repo removes all intel rows 4. Extended — new columns on musehub_symbol_intel """ from __future__ import annotations import secrets from datetime import datetime, timezone import pytest from sqlalchemy import inspect, select, text from sqlalchemy.ext.asyncio import AsyncSession from muse.core.types import fake_id from tests.factories import create_repo def _uid() -> str: return fake_id(secrets.token_hex(16)) def _now() -> datetime: return datetime.now(tz=timezone.utc) # ───────────────────────────────────────────────────────────────────────────── # Layer 1 — Schema: ORM models exist, correct tablename, correct columns # ───────────────────────────────────────────────────────────────────────────── class TestPhase1SchemaModels: def test_P1_01_coupling_model_importable(self) -> None: from musehub.db.musehub_intel_models import MusehubIntelCoupling assert MusehubIntelCoupling.__tablename__ == "musehub_intel_coupling" def test_P1_02_coupling_columns(self) -> None: from musehub.db.musehub_intel_models import MusehubIntelCoupling cols = {c.name for c in MusehubIntelCoupling.__table__.columns} assert cols >= {"repo_id", "file_a", "file_b", "co_changes", "ref"} def test_P1_03_entangle_model_importable(self) -> None: from musehub.db.musehub_intel_models import MusehubIntelEntangle assert MusehubIntelEntangle.__tablename__ == "musehub_intel_entangle" def test_P1_04_entangle_columns(self) -> None: from musehub.db.musehub_intel_models import MusehubIntelEntangle cols = {c.name for c in MusehubIntelEntangle.__table__.columns} assert cols >= { "repo_id", "symbol_a", "symbol_b", "co_change_rate", "co_changes", "structurally_linked", "ref", } def test_P1_05_dead_model_importable(self) -> None: from musehub.db.musehub_intel_models import MusehubIntelDead assert MusehubIntelDead.__tablename__ == "musehub_intel_dead" def test_P1_06_dead_columns(self) -> None: from musehub.db.musehub_intel_models import MusehubIntelDead cols = {c.name for c in MusehubIntelDead.__table__.columns} assert cols >= {"repo_id", "address", "kind", "confidence", "reason", "ref"} def test_P1_07_blast_risk_model_importable(self) -> None: from musehub.db.musehub_intel_models import MusehubIntelBlastRisk assert MusehubIntelBlastRisk.__tablename__ == "musehub_intel_blast_risk" def test_P1_08_blast_risk_columns(self) -> None: from musehub.db.musehub_intel_models import MusehubIntelBlastRisk cols = {c.name for c in MusehubIntelBlastRisk.__table__.columns} assert cols >= { "repo_id", "address", "kind", "risk", "risk_score", "impact_score", "churn_score", "test_gap_score", "coupling_score", "ref", } def test_P1_09_stable_model_importable(self) -> None: from musehub.db.musehub_intel_models import MusehubIntelStable assert MusehubIntelStable.__tablename__ == "musehub_intel_stable" def test_P1_10_stable_columns(self) -> None: from musehub.db.musehub_intel_models import MusehubIntelStable cols = {c.name for c in MusehubIntelStable.__table__.columns} assert cols >= {"repo_id", "address", "days_stable", "since_start", "ref"} def test_P1_11_velocity_model_importable(self) -> None: from musehub.db.musehub_intel_models import MusehubIntelVelocity assert MusehubIntelVelocity.__tablename__ == "musehub_intel_velocity" def test_P1_12_velocity_columns(self) -> None: from musehub.db.musehub_intel_models import MusehubIntelVelocity cols = {c.name for c in MusehubIntelVelocity.__table__.columns} assert cols >= { "repo_id", "module", "added", "removed", "net", "modified", "active_commits", "acceleration", "ref", } def test_P1_13_clones_model_importable(self) -> None: from musehub.db.musehub_intel_models import MusehubIntelClones assert MusehubIntelClones.__tablename__ == "musehub_intel_clones" def test_P1_14_clones_columns(self) -> None: from musehub.db.musehub_intel_models import MusehubIntelClones cols = {c.name for c in MusehubIntelClones.__table__.columns} assert cols >= {"repo_id", "cluster_hash", "tier", "member_count", "members_json", "ref"} def test_P1_15_type_model_importable(self) -> None: from musehub.db.musehub_intel_models import MusehubIntelType assert MusehubIntelType.__tablename__ == "musehub_intel_type" def test_P1_16_type_columns(self) -> None: from musehub.db.musehub_intel_models import MusehubIntelType cols = {c.name for c in MusehubIntelType.__table__.columns} assert cols >= { "repo_id", "address", "kind", "type_score", "params_total", "params_annotated", "params_with_any", "ref", } def test_P1_17_api_surface_model_importable(self) -> None: from musehub.db.musehub_intel_models import MusehubIntelApiSurface assert MusehubIntelApiSurface.__tablename__ == "musehub_intel_api_surface" def test_P1_18_api_surface_columns(self) -> None: from musehub.db.musehub_intel_models import MusehubIntelApiSurface cols = {c.name for c in MusehubIntelApiSurface.__table__.columns} assert cols >= {"repo_id", "address", "kind", "signature_id", "visibility", "ref"} def test_P1_19_languages_model_importable(self) -> None: from musehub.db.musehub_intel_models import MusehubIntelLanguages assert MusehubIntelLanguages.__tablename__ == "musehub_intel_languages" def test_P1_20_languages_columns(self) -> None: from musehub.db.musehub_intel_models import MusehubIntelLanguages cols = {c.name for c in MusehubIntelLanguages.__table__.columns} assert cols >= {"repo_id", "language", "symbol_count", "file_count", "pct", "ref"} def test_P1_21_refactor_events_model_importable(self) -> None: from musehub.db.musehub_intel_models import MusehubIntelRefactorEvent assert MusehubIntelRefactorEvent.__tablename__ == "musehub_intel_refactor_events" def test_P1_22_refactor_events_columns(self) -> None: from musehub.db.musehub_intel_models import MusehubIntelRefactorEvent cols = {c.name for c in MusehubIntelRefactorEvent.__table__.columns} assert cols >= { "repo_id", "event_id", "kind", "address", "detail", "commit_id", "committed_at", } def test_P1_23_symbol_intel_extended_columns(self) -> None: from musehub.db.musehub_intel_models import MusehubSymbolIntel cols = {c.name for c in MusehubSymbolIntel.__table__.columns} assert "last_commit_id" in cols, "missing last_commit_id on musehub_symbol_intel" assert "op" in cols, "missing op on musehub_symbol_intel" # ───────────────────────────────────────────────────────────────────────────── # Layer 2 — Write: rows insert and upsert correctly # ───────────────────────────────────────────────────────────────────────────── class TestPhase1Write: @pytest.mark.asyncio async def test_P1_24_coupling_insert(self, db_session: AsyncSession) -> None: from musehub.db import musehub_intel_models as db repo = await create_repo(db_session) row = db.MusehubIntelCoupling( repo_id=repo.repo_id, file_a="musehub/services/musehub_jobs.py", file_b="musehub/services/musehub_wire.py", co_changes=12, ref=_uid(), ) db_session.add(row) await db_session.flush() result = await db_session.execute( select(db.MusehubIntelCoupling).where( db.MusehubIntelCoupling.repo_id == repo.repo_id ) ) assert result.scalars().first() is not None @pytest.mark.asyncio async def test_P1_25_entangle_insert(self, db_session: AsyncSession) -> None: from musehub.db import musehub_intel_models as db repo = await create_repo(db_session) row = db.MusehubIntelEntangle( repo_id=repo.repo_id, symbol_a="services/jobs.py::enqueue_push_intel", symbol_b="services/wire.py::wire_push_unpack_mpack", co_change_rate=0.85, co_changes=17, structurally_linked=False, ref=_uid(), ) db_session.add(row) await db_session.flush() result = await db_session.execute( select(db.MusehubIntelEntangle).where( db.MusehubIntelEntangle.repo_id == repo.repo_id ) ) assert result.scalars().first() is not None @pytest.mark.asyncio async def test_P1_26_dead_insert(self, db_session: AsyncSession) -> None: from musehub.db import musehub_intel_models as db repo = await create_repo(db_session) row = db.MusehubIntelDead( repo_id=repo.repo_id, address="musehub/utils/legacy.py::old_helper", kind="function", confidence="high", reason="no callers found", ref=_uid(), ) db_session.add(row) await db_session.flush() result = await db_session.execute( select(db.MusehubIntelDead).where(db.MusehubIntelDead.repo_id == repo.repo_id) ) assert result.scalars().first() is not None @pytest.mark.asyncio async def test_P1_27_blast_risk_insert(self, db_session: AsyncSession) -> None: from musehub.db import musehub_intel_models as db repo = await create_repo(db_session) row = db.MusehubIntelBlastRisk( repo_id=repo.repo_id, address="musehub/services/musehub_jobs.py::enqueue_push_intel", kind="function", risk="high", risk_score=87, impact_score=0.9, churn_score=0.7, test_gap_score=0.5, coupling_score=0.6, ref=_uid(), ) db_session.add(row) await db_session.flush() result = await db_session.execute( select(db.MusehubIntelBlastRisk).where( db.MusehubIntelBlastRisk.repo_id == repo.repo_id ) ) assert result.scalars().first() is not None @pytest.mark.asyncio async def test_P1_28_stable_insert(self, db_session: AsyncSession) -> None: from musehub.db import musehub_intel_models as db repo = await create_repo(db_session) row = db.MusehubIntelStable( repo_id=repo.repo_id, address="musehub/core/genesis.py::compute_identity_id", days_stable=90, since_start=False, ref=_uid(), ) db_session.add(row) await db_session.flush() result = await db_session.execute( select(db.MusehubIntelStable).where( db.MusehubIntelStable.repo_id == repo.repo_id ) ) assert result.scalars().first() is not None @pytest.mark.asyncio async def test_P1_29_velocity_insert(self, db_session: AsyncSession) -> None: from musehub.db import musehub_intel_models as db repo = await create_repo(db_session) row = db.MusehubIntelVelocity( repo_id=repo.repo_id, module="musehub/services", added=5, removed=1, net=4, modified=3, active_commits=10, prior_added=3, prior_net=2, acceleration=1.5, stagnant_commits=0, ref=_uid(), ) db_session.add(row) await db_session.flush() result = await db_session.execute( select(db.MusehubIntelVelocity).where( db.MusehubIntelVelocity.repo_id == repo.repo_id ) ) assert result.scalars().first() is not None @pytest.mark.asyncio async def test_P1_30_clones_insert(self, db_session: AsyncSession) -> None: from musehub.db import musehub_intel_models as db repo = await create_repo(db_session) row = db.MusehubIntelClones( repo_id=repo.repo_id, cluster_hash=_uid(), tier="exact", member_count=3, members_json='["a.py::fn", "b.py::fn", "c.py::fn"]', ref=_uid(), ) db_session.add(row) await db_session.flush() result = await db_session.execute( select(db.MusehubIntelClones).where( db.MusehubIntelClones.repo_id == repo.repo_id ) ) assert result.scalars().first() is not None @pytest.mark.asyncio async def test_P1_31_type_insert(self, db_session: AsyncSession) -> None: from musehub.db import musehub_intel_models as db repo = await create_repo(db_session) row = db.MusehubIntelType( repo_id=repo.repo_id, address="musehub/services/musehub_jobs.py::enqueue_push_intel", kind="function", return_is_any=False, params_total=4, params_annotated=4, params_with_any=0, type_score=1.0, ref=_uid(), ) db_session.add(row) await db_session.flush() result = await db_session.execute( select(db.MusehubIntelType).where( db.MusehubIntelType.repo_id == repo.repo_id ) ) assert result.scalars().first() is not None @pytest.mark.asyncio async def test_P1_32_api_surface_insert(self, db_session: AsyncSession) -> None: from musehub.db import musehub_intel_models as db repo = await create_repo(db_session) row = db.MusehubIntelApiSurface( repo_id=repo.repo_id, address="musehub/api/routes/musehub/ui_commits.py::commits_page", kind="function", signature_id=_uid(), visibility="public", ref=_uid(), ) db_session.add(row) await db_session.flush() result = await db_session.execute( select(db.MusehubIntelApiSurface).where( db.MusehubIntelApiSurface.repo_id == repo.repo_id ) ) assert result.scalars().first() is not None @pytest.mark.asyncio async def test_P1_33_languages_insert(self, db_session: AsyncSession) -> None: from musehub.db import musehub_intel_models as db repo = await create_repo(db_session) row = db.MusehubIntelLanguages( repo_id=repo.repo_id, language="Python", symbol_count=1240, file_count=88, pct=97.5, ref=_uid(), ) db_session.add(row) await db_session.flush() result = await db_session.execute( select(db.MusehubIntelLanguages).where( db.MusehubIntelLanguages.repo_id == repo.repo_id ) ) assert result.scalars().first() is not None @pytest.mark.asyncio async def test_P1_34_refactor_event_insert(self, db_session: AsyncSession) -> None: from musehub.db import musehub_intel_models as db repo = await create_repo(db_session) commit_id = _uid() row = db.MusehubIntelRefactorEvent( event_id=_uid(), repo_id=repo.repo_id, kind="rename", address="musehub/services/jobs.py::enqueue", detail="→ enqueue_push_intel", commit_id=commit_id, committed_at=_now(), ) db_session.add(row) await db_session.flush() result = await db_session.execute( select(db.MusehubIntelRefactorEvent).where( db.MusehubIntelRefactorEvent.repo_id == repo.repo_id ) ) assert result.scalars().first() is not None @pytest.mark.asyncio async def test_P1_35_symbol_intel_extended_columns_write(self, db_session: AsyncSession) -> None: from musehub.db import musehub_intel_models as db repo = await create_repo(db_session) commit_id = _uid() row = db.MusehubSymbolIntel( repo_id=repo.repo_id, address="musehub/services/musehub_jobs.py::enqueue_push_intel", last_commit_id=commit_id, op="modify", ) db_session.add(row) await db_session.flush() result = await db_session.execute( select(db.MusehubSymbolIntel).where( db.MusehubSymbolIntel.repo_id == repo.repo_id ) ) row_back = result.scalars().first() assert row_back is not None assert row_back.last_commit_id == commit_id assert row_back.op == "modify" # ───────────────────────────────────────────────────────────────────────────── # Layer 3 — Cascade: deleting repo removes all intel rows # ───────────────────────────────────────────────────────────────────────────── class TestPhase1Cascade: @pytest.mark.asyncio async def test_P1_36_cascade_delete_removes_coupling(self, db_session: AsyncSession) -> None: from musehub.db import musehub_intel_models as db repo = await create_repo(db_session) ref = _uid() db_session.add(db.MusehubIntelCoupling( repo_id=repo.repo_id, file_a="a.py", file_b="b.py", co_changes=5, ref=ref, )) await db_session.flush() await db_session.delete(repo) await db_session.flush() result = await db_session.execute( select(db.MusehubIntelCoupling).where( db.MusehubIntelCoupling.repo_id == repo.repo_id ) ) assert result.scalars().first() is None @pytest.mark.asyncio async def test_P1_37_cascade_delete_removes_dead(self, db_session: AsyncSession) -> None: from musehub.db import musehub_intel_models as db repo = await create_repo(db_session) db_session.add(db.MusehubIntelDead( repo_id=repo.repo_id, address="a.py::fn", kind="function", confidence="high", reason="no callers", ref=_uid(), )) await db_session.flush() await db_session.delete(repo) await db_session.flush() result = await db_session.execute( select(db.MusehubIntelDead).where( db.MusehubIntelDead.repo_id == repo.repo_id ) ) assert result.scalars().first() is None @pytest.mark.asyncio async def test_P1_38_cascade_delete_removes_refactor_events(self, db_session: AsyncSession) -> None: from musehub.db import musehub_intel_models as db repo = await create_repo(db_session) db_session.add(db.MusehubIntelRefactorEvent( event_id=_uid(), repo_id=repo.repo_id, kind="rename", address="a.py::old", detail="→ new", commit_id=_uid(), committed_at=_now(), )) await db_session.flush() await db_session.delete(repo) await db_session.flush() result = await db_session.execute( select(db.MusehubIntelRefactorEvent).where( db.MusehubIntelRefactorEvent.repo_id == repo.repo_id ) ) assert result.scalars().first() is None # ───────────────────────────────────────────────────────────────────────────── # Layer 4 — DB-level: tables exist in Postgres (not just ORM) # ───────────────────────────────────────────────────────────────────────────── class TestPhase1DatabaseTables: @pytest.mark.asyncio async def test_P1_39_all_new_tables_exist_in_db(self, db_session: AsyncSession) -> None: expected = { "musehub_intel_coupling", "musehub_intel_entangle", "musehub_intel_dead", "musehub_intel_blast_risk", "musehub_intel_stable", "musehub_intel_velocity", "musehub_intel_clones", "musehub_intel_type", "musehub_intel_api_surface", "musehub_intel_languages", "musehub_intel_refactor_events", } result = await db_session.execute( text("SELECT tablename FROM pg_tables WHERE schemaname = 'public'") ) existing = {row[0] for row in result} missing = expected - existing assert not missing, f"Tables missing from DB: {missing}" @pytest.mark.asyncio async def test_P1_40_symbol_intel_extended_columns_in_db(self, db_session: AsyncSession) -> None: result = await db_session.execute( text(""" SELECT column_name FROM information_schema.columns WHERE table_name = 'musehub_symbol_intel' AND column_name IN ('last_commit_id', 'op') """) ) found = {row[0] for row in result} assert "last_commit_id" in found, "last_commit_id missing from musehub_symbol_intel" assert "op" in found, "op missing from musehub_symbol_intel"