"""TDD spec for Phase 3, Part 1 — gravity schema extension (issue #9). Extends musehub_symbol_intel with 6 new columns that power /intel/gravity: gravity_pct FLOAT — gravity_pct from muse code gravity gravity_direct_dependents INTEGER — direct_dependents count gravity_transitive_dependents INTEGER — transitive_dependents count gravity_max_depth SMALLINT — deepest dependency chain gravity_depth_distribution JSONB — {depth_level: count} for the sparkline symbol_kind VARCHAR(64) — method/function/class/async_method New index: (repo_id, gravity_pct DESC) — primary sort key for the page. Layers: 1. Schema — column types, nullability, ORM model 2. Index — (repo_id, gravity_pct DESC) exists in DB metadata 3. Write — insert and read back all 6 new fields 4. JSONB — depth_dist round-trips as a Python dict 5. Upsert — gravity update leaves churn/blast columns untouched 6. Null-safe — existing rows without gravity data remain valid 7. Ordering — rows ordered by gravity_pct DESC via SQL 8. Kind — all four symbol_kind values round-trip correctly """ from __future__ import annotations import secrets from datetime import datetime, timezone import pytest from sqlalchemy import inspect, select, text from sqlalchemy.ext.asyncio import AsyncSession from muse.core.types import fake_id from tests.factories import create_repo def _uid() -> str: return fake_id(secrets.token_hex(16)) def _now() -> datetime: return datetime.now(tz=timezone.utc) # ───────────────────────────────────────────────────────────────────────────── # Layer 1 — Schema: 6 new columns on MusehubSymbolIntel # ───────────────────────────────────────────────────────────────────────────── class TestGravitySchemaColumns: def test_P3_01_gravity_pct_column_exists(self) -> None: from musehub.db.musehub_intel_models import MusehubSymbolIntel cols = {c.name for c in MusehubSymbolIntel.__table__.columns} assert "gravity_pct" in cols def test_P3_02_gravity_direct_dependents_column_exists(self) -> None: from 
musehub.db.musehub_intel_models import MusehubSymbolIntel cols = {c.name for c in MusehubSymbolIntel.__table__.columns} assert "gravity_direct_dependents" in cols def test_P3_03_gravity_transitive_dependents_column_exists(self) -> None: from musehub.db.musehub_intel_models import MusehubSymbolIntel cols = {c.name for c in MusehubSymbolIntel.__table__.columns} assert "gravity_transitive_dependents" in cols def test_P3_04_gravity_max_depth_column_exists(self) -> None: from musehub.db.musehub_intel_models import MusehubSymbolIntel cols = {c.name for c in MusehubSymbolIntel.__table__.columns} assert "gravity_max_depth" in cols def test_P3_05_gravity_depth_distribution_column_exists(self) -> None: from musehub.db.musehub_intel_models import MusehubSymbolIntel cols = {c.name for c in MusehubSymbolIntel.__table__.columns} assert "gravity_depth_distribution" in cols def test_P3_06_symbol_kind_column_exists(self) -> None: from musehub.db.musehub_intel_models import MusehubSymbolIntel cols = {c.name for c in MusehubSymbolIntel.__table__.columns} assert "symbol_kind" in cols def test_P3_07_all_six_columns_present(self) -> None: from musehub.db.musehub_intel_models import MusehubSymbolIntel cols = {c.name for c in MusehubSymbolIntel.__table__.columns} assert cols >= { "gravity_pct", "gravity_direct_dependents", "gravity_transitive_dependents", "gravity_max_depth", "gravity_depth_distribution", "symbol_kind", } def test_P3_08_gravity_pct_is_nullable(self) -> None: from musehub.db.musehub_intel_models import MusehubSymbolIntel col = MusehubSymbolIntel.__table__.c.gravity_pct assert col.nullable is True, "gravity_pct must be nullable — existing rows have no gravity data" def test_P3_09_gravity_depth_distribution_is_nullable(self) -> None: from musehub.db.musehub_intel_models import MusehubSymbolIntel col = MusehubSymbolIntel.__table__.c.gravity_depth_distribution assert col.nullable is True, "gravity_depth_distribution must be nullable" def 
test_P3_10_symbol_kind_is_nullable(self) -> None: from musehub.db.musehub_intel_models import MusehubSymbolIntel col = MusehubSymbolIntel.__table__.c.symbol_kind assert col.nullable is True, "symbol_kind must be nullable — backfilled on next push" # ───────────────────────────────────────────────────────────────────────────── # Layer 2 — Index: (repo_id, gravity_pct DESC) in table args # ───────────────────────────────────────────────────────────────────────────── class TestGravityIndex: def test_P3_11_gravity_pct_index_defined(self) -> None: from musehub.db.musehub_intel_models import MusehubSymbolIntel index_names = {idx.name for idx in MusehubSymbolIntel.__table__.indexes} assert any("gravity_pct" in name for name in index_names), ( f"Expected an index on gravity_pct, got: {index_names}" ) # ───────────────────────────────────────────────────────────────────────────── # Layer 3 — Write: all 6 new fields insert and read back # ───────────────────────────────────────────────────────────────────────────── class TestGravityWrite: @pytest.mark.asyncio async def test_P3_12_gravity_full_write(self, db_session: AsyncSession) -> None: from musehub.db import musehub_intel_models as db repo = await create_repo(db_session) dist = {"1": 11, "2": 484, "3": 197, "4": 35, "5": 5, "6": 1} row = db.MusehubSymbolIntel( repo_id=repo.repo_id, address="musehub/storage/backends.py::S3Backend._key", gravity_pct=38.9, gravity_direct_dependents=11, gravity_transitive_dependents=733, gravity_max_depth=6, gravity_depth_distribution=dist, symbol_kind="method", ) db_session.add(row) await db_session.flush() result = await db_session.execute( select(db.MusehubSymbolIntel).where( db.MusehubSymbolIntel.repo_id == repo.repo_id ) ) back = result.scalars().first() assert back is not None assert back.gravity_pct == pytest.approx(38.9) assert back.gravity_direct_dependents == 11 assert back.gravity_transitive_dependents == 733 assert back.gravity_max_depth == 6 assert back.symbol_kind == "method" 
@pytest.mark.asyncio async def test_P3_13_gravity_write_without_optional_fields(self, db_session: AsyncSession) -> None: from musehub.db import musehub_intel_models as db repo = await create_repo(db_session) row = db.MusehubSymbolIntel( repo_id=repo.repo_id, address="musehub/services/musehub_jobs.py::enqueue_push_intel", ) db_session.add(row) await db_session.flush() result = await db_session.execute( select(db.MusehubSymbolIntel).where( db.MusehubSymbolIntel.repo_id == repo.repo_id ) ) back = result.scalars().first() assert back is not None assert back.gravity_pct is None assert back.gravity_depth_distribution is None assert back.symbol_kind is None # ───────────────────────────────────────────────────────────────────────────── # Layer 4 — JSONB: depth_distribution round-trips as a Python dict # ───────────────────────────────────────────────────────────────────────────── class TestGravityDepthDistJsonb: @pytest.mark.asyncio async def test_P3_14_depth_dist_shallow_broad(self, db_session: AsyncSession) -> None: from musehub.db import musehub_intel_models as db repo = await create_repo(db_session) dist = {"1": 424, "2": 206, "3": 46, "4": 5, "5": 1} row = db.MusehubSymbolIntel( repo_id=repo.repo_id, address="musehub/storage/backends.py::StorageBackend.get", gravity_depth_distribution=dist, ) db_session.add(row) await db_session.flush() await db_session.refresh(row) assert row.gravity_depth_distribution == dist @pytest.mark.asyncio async def test_P3_15_depth_dist_deep_narrow(self, db_session: AsyncSession) -> None: from musehub.db import musehub_intel_models as db repo = await create_repo(db_session) dist = {"1": 1, "2": 17, "3": 27, "4": 406, "5": 189, "6": 42, "7": 5, "8": 1} row = db.MusehubSymbolIntel( repo_id=repo.repo_id, address="musehub/storage/backends.py::BlobBackend", gravity_depth_distribution=dist, ) db_session.add(row) await db_session.flush() await db_session.refresh(row) assert row.gravity_depth_distribution == dist @pytest.mark.asyncio async def 
test_P3_16_depth_dist_single_depth(self, db_session: AsyncSession) -> None: from musehub.db import musehub_intel_models as db repo = await create_repo(db_session) dist = {"1": 3} row = db.MusehubSymbolIntel( repo_id=repo.repo_id, address="musehub/services/some_leaf.py::leaf_fn", gravity_depth_distribution=dist, ) db_session.add(row) await db_session.flush() await db_session.refresh(row) assert row.gravity_depth_distribution == dist @pytest.mark.asyncio async def test_P3_17_depth_dist_nine_levels(self, db_session: AsyncSession) -> None: from musehub.db import musehub_intel_models as db repo = await create_repo(db_session) dist = {"1": 1, "2": 8, "3": 155, "4": 71, "5": 11, "6": 4, "7": 8, "8": 5, "9": 3} row = db.MusehubSymbolIntel( repo_id=repo.repo_id, address="musehub/models/musehub.py::RepoResponse", gravity_depth_distribution=dist, ) db_session.add(row) await db_session.flush() await db_session.refresh(row) assert row.gravity_depth_distribution == dist assert len(row.gravity_depth_distribution) == 9 # ───────────────────────────────────────────────────────────────────────────── # Layer 5 — Upsert: gravity update does not touch churn/blast columns # ───────────────────────────────────────────────────────────────────────────── class TestGravityUpsert: @pytest.mark.asyncio async def test_P3_18_upsert_gravity_preserves_churn(self, db_session: AsyncSession) -> None: from sqlalchemy.dialects.postgresql import insert as pg_insert from musehub.db import musehub_intel_models as db repo = await create_repo(db_session) address = "musehub/storage/backends.py::S3Backend._key" # Insert base row with churn data await db_session.execute( pg_insert(db.MusehubSymbolIntel).values( repo_id=repo.repo_id, address=address, churn=42, gravity=0.0, blast_top=[], weekly=[], ).on_conflict_do_update( index_elements=["repo_id", "address"], set_={"churn": 42}, ) ) await db_session.flush() # Now upsert with gravity fields only await db_session.execute( pg_insert(db.MusehubSymbolIntel).values( 
repo_id=repo.repo_id, address=address, blast_top=[], weekly=[], gravity_pct=38.9, gravity_direct_dependents=11, gravity_transitive_dependents=733, gravity_max_depth=6, gravity_depth_distribution={"1": 11, "2": 484, "3": 197, "4": 35, "5": 5, "6": 1}, symbol_kind="method", ).on_conflict_do_update( index_elements=["repo_id", "address"], set_={ "gravity_pct": 38.9, "gravity_direct_dependents": 11, "gravity_transitive_dependents": 733, "gravity_max_depth": 6, "gravity_depth_distribution": {"1": 11, "2": 484, "3": 197, "4": 35, "5": 5, "6": 1}, "symbol_kind": "method", }, ) ) await db_session.flush() result = await db_session.execute( select(db.MusehubSymbolIntel).where( db.MusehubSymbolIntel.repo_id == repo.repo_id, db.MusehubSymbolIntel.address == address, ) ) back = result.scalars().first() assert back is not None assert back.churn == 42, "churn must be preserved after gravity upsert" assert back.gravity_pct == pytest.approx(38.9) assert back.gravity_direct_dependents == 11 @pytest.mark.asyncio async def test_P3_19_upsert_gravity_idempotent(self, db_session: AsyncSession) -> None: from sqlalchemy.dialects.postgresql import insert as pg_insert from musehub.db import musehub_intel_models as db repo = await create_repo(db_session) address = "musehub/storage/backends.py::get_backend" dist = {"1": 17, "2": 27, "3": 406, "4": 189, "5": 42, "6": 5, "7": 1} for _ in range(3): await db_session.execute( pg_insert(db.MusehubSymbolIntel).values( repo_id=repo.repo_id, address=address, blast_top=[], weekly=[], gravity_pct=36.5, gravity_direct_dependents=17, gravity_transitive_dependents=687, gravity_max_depth=7, gravity_depth_distribution=dist, symbol_kind="function", ).on_conflict_do_update( index_elements=["repo_id", "address"], set_={ "gravity_pct": 36.5, "gravity_direct_dependents": 17, "gravity_transitive_dependents": 687, "gravity_max_depth": 7, "gravity_depth_distribution": dist, "symbol_kind": "function", }, ) ) await db_session.flush() result = await db_session.execute( 
select(db.MusehubSymbolIntel).where( db.MusehubSymbolIntel.repo_id == repo.repo_id ) ) rows = result.scalars().all() assert len(rows) == 1, "idempotent upsert must not create duplicate rows" assert rows[0].gravity_pct == pytest.approx(36.5) # ───────────────────────────────────────────────────────────────────────────── # Layer 6 — Null-safe: rows without gravity data are valid # ───────────────────────────────────────────────────────────────────────────── class TestGravityNullSafe: @pytest.mark.asyncio async def test_P3_20_churn_only_row_valid(self, db_session: AsyncSession) -> None: from musehub.db import musehub_intel_models as db repo = await create_repo(db_session) row = db.MusehubSymbolIntel( repo_id=repo.repo_id, address="musehub/services/musehub_jobs.py::some_fn", churn=5, blast=2, ) db_session.add(row) await db_session.flush() result = await db_session.execute( select(db.MusehubSymbolIntel).where( db.MusehubSymbolIntel.repo_id == repo.repo_id ) ) back = result.scalars().first() assert back is not None assert back.gravity_pct is None assert back.gravity_depth_distribution is None assert back.symbol_kind is None assert back.churn == 5 @pytest.mark.asyncio async def test_P3_21_gravity_pct_filter_excludes_nulls(self, db_session: AsyncSession) -> None: from musehub.db import musehub_intel_models as db repo = await create_repo(db_session) # One row with gravity, one without db_session.add(db.MusehubSymbolIntel( repo_id=repo.repo_id, address="backends.py::S3Backend._key", gravity_pct=38.9, )) db_session.add(db.MusehubSymbolIntel( repo_id=repo.repo_id, address="backends.py::some_utility", )) await db_session.flush() result = await db_session.execute( select(db.MusehubSymbolIntel).where( db.MusehubSymbolIntel.repo_id == repo.repo_id, db.MusehubSymbolIntel.gravity_pct.is_not(None), ) ) rows = result.scalars().all() assert len(rows) == 1 assert rows[0].address == "backends.py::S3Backend._key" # 
# ─────────────────────────────────────────────────────────────────────────────
# Layer 7 — Ordering: gravity_pct DESC gives correct rank order
# ─────────────────────────────────────────────────────────────────────────────


class TestGravityOrdering:
    """Queries sorted by ``gravity_pct`` descending return rows in rank order."""

    @pytest.mark.asyncio
    async def test_P3_22_ordered_by_gravity_pct_desc(self, db_session: AsyncSession) -> None:
        from musehub.db import musehub_intel_models as db

        intel = db.MusehubSymbolIntel
        repo = await create_repo(db_session)

        # (address, gravity_pct) pairs, added in this order.
        seeded = (
            ("backends.py::_key", 38.9),
            ("backends.py::_get_client", 38.8),
            ("backends.py::get_backend", 36.5),
            ("models.py::RepoResponse", 14.1),
        )
        db_session.add_all(
            [
                intel(repo_id=repo.repo_id, address=addr, gravity_pct=value)
                for addr, value in seeded
            ]
        )
        await db_session.flush()

        stmt = select(intel).where(
            intel.repo_id == repo.repo_id,
            intel.gravity_pct.is_not(None),
        )
        stmt = stmt.order_by(intel.gravity_pct.desc())
        fetched = (await db_session.execute(stmt)).scalars().all()

        ranked = [record.gravity_pct for record in fetched]
        assert ranked == sorted(ranked, reverse=True)
        assert ranked[0] == pytest.approx(38.9)

    @pytest.mark.asyncio
    async def test_P3_23_top_n_query(self, db_session: AsyncSession) -> None:
        from musehub.db import musehub_intel_models as db

        intel = db.MusehubSymbolIntel
        repo = await create_repo(db_session)

        # Ten symbols with gravity_pct 0.0 .. 9.0.
        db_session.add_all(
            [
                intel(
                    repo_id=repo.repo_id,
                    address=f"backends.py::sym_{i:02d}",
                    gravity_pct=float(i),
                )
                for i in range(10)
            ]
        )
        await db_session.flush()

        top_three = (
            select(intel)
            .where(
                intel.repo_id == repo.repo_id,
                intel.gravity_pct.is_not(None),
            )
            .order_by(intel.gravity_pct.desc())
            .limit(3)
        )
        fetched = (await db_session.execute(top_three)).scalars().all()

        assert len(fetched) == 3
        assert fetched[0].gravity_pct == pytest.approx(9.0)


# ─────────────────────────────────────────────────────────────────────────────
# Layer 8 — Kind: all four symbol_kind values round-trip
# ─────────────────────────────────────────────────────────────────────────────
class TestGravitySymbolKind:
    """``symbol_kind`` values persist unchanged and can be used as a filter."""

    @pytest.mark.asyncio
    @pytest.mark.parametrize("kind", ["method", "function", "class", "async_method"])
    async def test_P3_24_symbol_kind_roundtrip(self, db_session: AsyncSession, kind: str) -> None:
        from musehub.db import musehub_intel_models as db

        repo = await create_repo(db_session)
        record = db.MusehubSymbolIntel(
            repo_id=repo.repo_id,
            address=f"backends.py::sym_for_{kind}",
            symbol_kind=kind,
            gravity_pct=10.0,
        )
        db_session.add(record)
        await db_session.flush()
        # Reload the attributes from the database before comparing.
        await db_session.refresh(record)

        assert record.symbol_kind == kind

    @pytest.mark.asyncio
    async def test_P3_28_kind_filter_query(self, db_session: AsyncSession) -> None:
        from musehub.db import musehub_intel_models as db

        intel = db.MusehubSymbolIntel
        repo = await create_repo(db_session)

        # One row per kind, so filtering on a kind must match exactly one row.
        db_session.add_all(
            [
                intel(
                    repo_id=repo.repo_id,
                    address=f"backends.py::sym_{kind}",
                    symbol_kind=kind,
                    gravity_pct=10.0,
                )
                for kind in ("method", "function", "class", "async_method")
            ]
        )
        await db_session.flush()

        only_methods = select(intel).where(
            intel.repo_id == repo.repo_id,
            intel.symbol_kind == "method",
        )
        matches = (await db_session.execute(only_methods)).scalars().all()

        assert len(matches) == 1
        assert matches[0].symbol_kind == "method"