"""TDD spec — GravityProvider rewrite: SQL-derived gravity, no muse CLI.

GravityProvider must compute gravity scores directly from the blast columns
already written by intel.code, rather than calling `muse code gravity`.

Formula (mirrors muse/muse/cli/commands/gravity.py exactly):

    total       = count of tracked-kind symbols for this repo
    denom       = max(1, total - 1)          # exclude self, guard /0
    gravity_pct = round(blast / denom * 100, 1)

Column mapping:

    gravity_direct_dependents     ← blast_direct
    gravity_transitive_dependents ← blast  (blast_direct + blast_cross)
    gravity_pct                   ← round(blast / max(1, total - 1) * 100, 1)
    gravity_max_depth             — not derivable from blast; left NULL
    gravity_depth_distribution    — not derivable from blast; left NULL

Tracked kinds (denominator scope, matching gravity.py _TRACKED_KINDS):

    function, async_function, method, async_method, class

Layers:

    1.  No subprocess  — compute() never spawns a process
    2.  Formula        — gravity_pct matches gravity.py rounding + denominator
    3.  Mapping        — gravity_direct_dependents = blast_direct;
                         gravity_transitive_dependents = blast
    4.  Denominator    — untracked kinds (import, None) excluded from total
    5.  Edge: single   — denom = max(1, 1-1) = 1, no /0
    6.  Writes only    — rows that exist get updated; no new rows inserted
    7.  Preserve       — churn/blast columns untouched after compute()
    8.  Idempotent     — run twice yields identical rows, no duplicates
    9.  Empty          — no blast data → returns []
    10. Null max_depth — gravity_max_depth stays NULL (not derivable)
"""
from __future__ import annotations

import asyncio
from typing import Any
from unittest import mock

import pytest
import pytest_asyncio
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func

from musehub.db.musehub_intel_models import MusehubSymbolIntel
from tests.factories import create_repo

_TRACKED_KINDS = ("function", "async_function", "method", "async_method", "class")

# Registry key of the provider under test.
_JOB_TYPE = "intel.code.gravity"


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

# Column values applied to every seeded row unless the test overrides them.
_SYMBOL_DEFAULTS = {
    "churn": 0,
    "churn_30d": 0,
    "churn_90d": 0,
    "blast": 0,
    "blast_direct": 0,
    "blast_cross": 0,
    "blast_top": [],
    "author_count": 0,
    "gravity": 0.0,
    "weekly": [0] * 12,
}


async def _seed_symbols(
    session: AsyncSession,
    repo_id: str,
    symbols: list[dict[str, Any]],
) -> None:
    """Insert musehub_symbol_intel rows with blast + kind data.

    Each entry of *symbols* is merged over ``_SYMBOL_DEFAULTS`` and upserted
    on ``(repo_id, address)``, so seeding the same address twice overwrites
    the earlier values instead of raising.
    """
    for overrides in symbols:
        row = {**_SYMBOL_DEFAULTS, **overrides}
        stmt = (
            pg_insert(MusehubSymbolIntel)
            .values(repo_id=repo_id, **row)
            .on_conflict_do_update(
                index_elements=["repo_id", "address"],
                # "address" is part of the conflict target, so it is not updated.
                set_={k: v for k, v in row.items() if k != "address"},
            )
        )
        await session.execute(stmt)
    await session.flush()


async def _get_row(
    session: AsyncSession, repo_id: str, address: str
) -> MusehubSymbolIntel | None:
    """Return the symbol-intel row for ``(repo_id, address)``, or ``None``."""
    result = await session.execute(
        select(MusehubSymbolIntel).where(
            MusehubSymbolIntel.repo_id == repo_id,
            MusehubSymbolIntel.address == address,
        )
    )
    return result.scalar_one_or_none()


async def _require_row(
    session: AsyncSession, repo_id: str, address: str
) -> MusehubSymbolIntel:
    """Return the row for ``(repo_id, address)``, failing the test if absent.

    Turns a missing row into a readable assertion failure instead of an
    ``AttributeError`` on ``None`` further down the test body.
    """
    row = await _get_row(session, repo_id, address)
    assert row is not None, f"no musehub_symbol_intel row for {address!r}"
    return row


async def _count_rows(session: AsyncSession, repo_id: str) -> int:
    """Return how many symbol-intel rows exist for *repo_id*."""
    result = await session.execute(
        select(func.count())
        .select_from(MusehubSymbolIntel)
        .where(MusehubSymbolIntel.repo_id == repo_id)
    )
    return result.scalar_one()


async def _compute_gravity(
    session: AsyncSession,
    repo_id: str,
    options: dict[str, Any] | None = None,
) -> Any:
    """Run the gravity provider's ``compute()`` for *repo_id* and return its result.

    The provider registry is imported lazily so that an import problem in the
    provider module fails the individual test rather than collection of the
    whole file. *options* is forwarded as the fourth positional argument of
    ``compute()`` (an empty dict when omitted); the ref is always ``"ref"``.
    """
    from musehub.services.musehub_intel_providers import _PROVIDER_REGISTRY

    return await _PROVIDER_REGISTRY[_JOB_TYPE].compute(
        session, repo_id, "ref", {} if options is None else options,
    )


def _gravity_pct(blast: int, total: int) -> float:
    """Reference implementation — mirrors gravity.py exactly."""
    denom = max(1, total - 1)
    return round(blast / denom * 100, 1)


# ---------------------------------------------------------------------------
# Layer 1 — No subprocess
# ---------------------------------------------------------------------------

class TestNoSubprocess:
    @pytest.mark.asyncio
    async def test_P5_01_compute_never_spawns_subprocess(
        self, db_session: AsyncSession
    ) -> None:
        """GravityProvider must not call muse CLI — pure SQL derivation."""
        repo = await create_repo(db_session)
        await _seed_symbols(db_session, repo.repo_id, [
            {"address": "a.py::fn", "symbol_kind": "function",
             "blast": 5, "blast_direct": 2, "blast_cross": 3},
        ])

        spawned: list[tuple] = []
        original = asyncio.create_subprocess_exec

        async def _spy(*args: Any, **kwargs: Any) -> Any:
            # Record the attempt, then delegate so a (wrongly) spawning
            # provider still behaves as it would in production.
            spawned.append(args)
            return await original(*args, **kwargs)

        with mock.patch("asyncio.create_subprocess_exec", side_effect=_spy):
            await _compute_gravity(
                db_session, repo.repo_id,
                {"owner": repo.owner, "slug": repo.slug},
            )

        assert spawned == [], (
            f"GravityProvider spawned {len(spawned)} subprocess(es); expected 0. "
            "Gravity must be derived from blast columns, not from muse CLI."
        )


# ---------------------------------------------------------------------------
# Layer 2 — Formula: gravity_pct matches gravity.py exactly
# ---------------------------------------------------------------------------

class TestFormula:
    @pytest.mark.asyncio
    async def test_P5_02_gravity_pct_matches_formula(
        self, db_session: AsyncSession
    ) -> None:
        """gravity_pct = round(blast / max(1, total-1) * 100, 1)."""
        repo = await create_repo(db_session)
        # 10 tracked symbols; the target has blast=9
        symbols = [
            {"address": f"a.py::fn{i}", "symbol_kind": "function",
             "blast": 1, "blast_direct": 1, "blast_cross": 0}
            for i in range(9)
        ]
        symbols.append(
            {"address": "a.py::target", "symbol_kind": "function",
             "blast": 9, "blast_direct": 3, "blast_cross": 6}
        )
        await _seed_symbols(db_session, repo.repo_id, symbols)

        await _compute_gravity(db_session, repo.repo_id)

        row = await _require_row(db_session, repo.repo_id, "a.py::target")
        expected = _gravity_pct(blast=9, total=10)  # round(9/9*100, 1) = 100.0
        assert row.gravity_pct == pytest.approx(expected), (
            f"gravity_pct={row.gravity_pct}, expected {expected}"
        )

    @pytest.mark.asyncio
    async def test_P5_03_gravity_pct_fractional_rounding(
        self, db_session: AsyncSession
    ) -> None:
        """Rounding to 1 decimal place matches Python round()."""
        repo = await create_repo(db_session)
        # 7 symbols total; target has blast=4 → 4/6*100 = 66.666… → 66.7.
        # The quotient is deliberately non-terminating so that a provider
        # which skips the rounding step cannot pass.
        symbols = [
            {"address": f"a.py::fn{i}", "symbol_kind": "method",
             "blast": 0, "blast_direct": 0, "blast_cross": 0}
            for i in range(6)
        ]
        symbols.append(
            {"address": "a.py::target", "symbol_kind": "method",
             "blast": 4, "blast_direct": 1, "blast_cross": 3}
        )
        await _seed_symbols(db_session, repo.repo_id, symbols)

        await _compute_gravity(db_session, repo.repo_id)

        row = await _require_row(db_session, repo.repo_id, "a.py::target")
        expected = _gravity_pct(blast=4, total=7)  # round(4/6*100, 1) = 66.7
        assert row.gravity_pct == pytest.approx(expected), (
            f"gravity_pct={row.gravity_pct}, expected {expected}"
        )

    @pytest.mark.asyncio
    async def test_P5_04_all_symbols_get_gravity_pct(
        self, db_session: AsyncSession
    ) -> None:
        """Every tracked-kind row gets a gravity_pct, including blast=0 rows."""
        repo = await create_repo(db_session)
        await _seed_symbols(db_session, repo.repo_id, [
            {"address": "a.py::fn_a", "symbol_kind": "function",
             "blast": 3, "blast_direct": 1, "blast_cross": 2},
            {"address": "a.py::fn_b", "symbol_kind": "function",
             "blast": 0, "blast_direct": 0, "blast_cross": 0},
        ])

        await _compute_gravity(db_session, repo.repo_id)

        for addr in ("a.py::fn_a", "a.py::fn_b"):
            row = await _require_row(db_session, repo.repo_id, addr)
            assert row.gravity_pct is not None, f"{addr} missing gravity_pct"


# ---------------------------------------------------------------------------
# Layer 3 — Column mapping
# ---------------------------------------------------------------------------

class TestColumnMapping:
    @pytest.mark.asyncio
    async def test_P5_05_gravity_direct_dependents_equals_blast_direct(
        self, db_session: AsyncSession
    ) -> None:
        """gravity_direct_dependents is copied from blast_direct."""
        repo = await create_repo(db_session)
        await _seed_symbols(db_session, repo.repo_id, [
            {"address": "a.py::fn", "symbol_kind": "function",
             "blast": 7, "blast_direct": 3, "blast_cross": 4},
        ])

        await _compute_gravity(db_session, repo.repo_id)

        row = await _require_row(db_session, repo.repo_id, "a.py::fn")
        assert row.gravity_direct_dependents == 3

    @pytest.mark.asyncio
    async def test_P5_06_gravity_transitive_dependents_equals_blast(
        self, db_session: AsyncSession
    ) -> None:
        """gravity_transitive_dependents is copied from blast."""
        repo = await create_repo(db_session)
        await _seed_symbols(db_session, repo.repo_id, [
            {"address": "a.py::fn", "symbol_kind": "function",
             "blast": 7, "blast_direct": 3, "blast_cross": 4},
        ])

        await _compute_gravity(db_session, repo.repo_id)

        row = await _require_row(db_session, repo.repo_id, "a.py::fn")
        # blast = blast_direct + blast_cross
        assert row.gravity_transitive_dependents == 7


# ---------------------------------------------------------------------------
# Layer 4 — Denominator scope: only tracked kinds
# ---------------------------------------------------------------------------

class TestDenominator:
    @pytest.mark.asyncio
    async def test_P5_07_import_kind_excluded_from_denominator(
        self, db_session: AsyncSession
    ) -> None:
        """import-kind rows don't count toward total_prod_symbols."""
        repo = await create_repo(db_session)
        # 2 tracked + 5 import = 2 tracked total for denominator
        tracked = [
            {"address": "a.py::fn_a", "symbol_kind": "function",
             "blast": 1, "blast_direct": 1, "blast_cross": 0},
            {"address": "a.py::fn_b", "symbol_kind": "function",
             "blast": 1, "blast_direct": 1, "blast_cross": 0},
        ]
        imports = [
            {"address": f"a.py::import_{i}", "symbol_kind": "import",
             "blast": 0, "blast_direct": 0, "blast_cross": 0}
            for i in range(5)
        ]
        await _seed_symbols(db_session, repo.repo_id, tracked + imports)

        await _compute_gravity(db_session, repo.repo_id)

        row = await _require_row(db_session, repo.repo_id, "a.py::fn_a")
        # total tracked = 2, denom = max(1, 2-1) = 1
        expected = _gravity_pct(blast=1, total=2)
        assert row.gravity_pct == pytest.approx(expected)

    @pytest.mark.asyncio
    async def test_P5_08_null_kind_excluded_from_denominator(
        self, db_session: AsyncSession
    ) -> None:
        """Rows with symbol_kind=NULL don't count toward total."""
        repo = await create_repo(db_session)
        await _seed_symbols(db_session, repo.repo_id, [
            {"address": "a.py::fn", "symbol_kind": "function",
             "blast": 1, "blast_direct": 1, "blast_cross": 0},
            {"address": "a.py::unknown", "symbol_kind": None,
             "blast": 0, "blast_direct": 0, "blast_cross": 0},
        ])

        await _compute_gravity(db_session, repo.repo_id)

        row = await _require_row(db_session, repo.repo_id, "a.py::fn")
        # total tracked = 1, denom = max(1, 1-1) = 1
        expected = _gravity_pct(blast=1, total=1)
        assert row.gravity_pct == pytest.approx(expected)

    @pytest.mark.asyncio
    async def test_P5_09_all_tracked_kinds_count_in_denominator(
        self, db_session: AsyncSession
    ) -> None:
        """All 5 tracked kinds contribute to the denominator."""
        repo = await create_repo(db_session)
        symbols = [
            {"address": f"a.py::{kind}_sym", "symbol_kind": kind,
             "blast": 0, "blast_direct": 0, "blast_cross": 0}
            for kind in _TRACKED_KINDS
        ]
        symbols.append(
            {"address": "a.py::target", "symbol_kind": "function",
             "blast": 5, "blast_direct": 2, "blast_cross": 3}
        )
        await _seed_symbols(db_session, repo.repo_id, symbols)

        await _compute_gravity(db_session, repo.repo_id)

        row = await _require_row(db_session, repo.repo_id, "a.py::target")
        # 6 tracked symbols total (5 kinds + target itself)
        expected = _gravity_pct(blast=5, total=6)
        assert row.gravity_pct == pytest.approx(expected)


# ---------------------------------------------------------------------------
# Layer 5 — Edge: single symbol
# ---------------------------------------------------------------------------

class TestEdgeCases:
    @pytest.mark.asyncio
    async def test_P5_10_single_symbol_denom_is_one(
        self, db_session: AsyncSession
    ) -> None:
        """Single symbol: denom = max(1, 1-1) = 1, no ZeroDivisionError."""
        repo = await create_repo(db_session)
        await _seed_symbols(db_session, repo.repo_id, [
            {"address": "a.py::only", "symbol_kind": "function",
             "blast": 0, "blast_direct": 0, "blast_cross": 0},
        ])

        results = await _compute_gravity(db_session, repo.repo_id)

        row = await _require_row(db_session, repo.repo_id, "a.py::only")
        assert row.gravity_pct == pytest.approx(0.0)
        assert results != []

    @pytest.mark.asyncio
    async def test_P5_11_zero_blast_yields_zero_pct(
        self, db_session: AsyncSession
    ) -> None:
        """A symbol with blast=0 gets gravity_pct 0.0 even when others have blast."""
        repo = await create_repo(db_session)
        await _seed_symbols(db_session, repo.repo_id, [
            {"address": "a.py::leaf", "symbol_kind": "function",
             "blast": 0, "blast_direct": 0, "blast_cross": 0},
            {"address": "b.py::other", "symbol_kind": "function",
             "blast": 2, "blast_direct": 1, "blast_cross": 1},
        ])

        await _compute_gravity(db_session, repo.repo_id)

        row = await _require_row(db_session, repo.repo_id, "a.py::leaf")
        assert row.gravity_pct == pytest.approx(0.0)


# ---------------------------------------------------------------------------
# Layer 6 — Writes only existing rows, no new inserts
# ---------------------------------------------------------------------------

class TestWriteBehavior:
    @pytest.mark.asyncio
    async def test_P5_12_row_count_unchanged_after_compute(
        self, db_session: AsyncSession
    ) -> None:
        """compute() must not change the number of rows for the repo."""
        repo = await create_repo(db_session)
        await _seed_symbols(db_session, repo.repo_id, [
            {"address": "a.py::fn_a", "symbol_kind": "function",
             "blast": 3, "blast_direct": 1, "blast_cross": 2},
            {"address": "a.py::fn_b", "symbol_kind": "function",
             "blast": 1, "blast_direct": 1, "blast_cross": 0},
        ])

        before = await _count_rows(db_session, repo.repo_id)
        await _compute_gravity(db_session, repo.repo_id)
        after = await _count_rows(db_session, repo.repo_id)

        assert after == before, (
            f"Row count changed: {before} → {after}. "
            "GravityProvider must update existing rows, not insert new ones."
        )

    @pytest.mark.asyncio
    async def test_P5_13_returns_intel_results_tuple(
        self, db_session: AsyncSession
    ) -> None:
        """compute() returns a non-empty list of (job_type, data) pairs."""
        repo = await create_repo(db_session)
        await _seed_symbols(db_session, repo.repo_id, [
            {"address": "a.py::fn", "symbol_kind": "function",
             "blast": 1, "blast_direct": 1, "blast_cross": 0},
        ])

        results = await _compute_gravity(db_session, repo.repo_id)

        assert isinstance(results, list) and len(results) > 0
        job_type, data = results[0]
        assert job_type == "intel.code.gravity"
        assert "count" in data
        assert data["count"] >= 1


# ---------------------------------------------------------------------------
# Layer 7 — Preserve non-gravity columns
# ---------------------------------------------------------------------------

class TestPreserve:
    @pytest.mark.asyncio
    async def test_P5_14_churn_preserved_after_compute(
        self, db_session: AsyncSession
    ) -> None:
        """Churn columns keep their seeded values after compute()."""
        repo = await create_repo(db_session)
        await _seed_symbols(db_session, repo.repo_id, [
            {"address": "a.py::fn", "symbol_kind": "function",
             "blast": 5, "blast_direct": 2, "blast_cross": 3,
             "churn": 42, "churn_30d": 7},
        ])

        await _compute_gravity(db_session, repo.repo_id)

        row = await _require_row(db_session, repo.repo_id, "a.py::fn")
        assert row.churn == 42, "churn must not be overwritten by gravity compute"
        assert row.churn_30d == 7, "churn_30d must not be overwritten"

    @pytest.mark.asyncio
    async def test_P5_15_blast_columns_preserved_after_compute(
        self, db_session: AsyncSession
    ) -> None:
        """Blast columns keep their seeded values after compute()."""
        repo = await create_repo(db_session)
        await _seed_symbols(db_session, repo.repo_id, [
            {"address": "a.py::fn", "symbol_kind": "function",
             "blast": 5, "blast_direct": 2, "blast_cross": 3},
        ])

        await _compute_gravity(db_session, repo.repo_id)

        row = await _require_row(db_session, repo.repo_id, "a.py::fn")
        assert row.blast == 5
        assert row.blast_direct == 2
        assert row.blast_cross == 3


# ---------------------------------------------------------------------------
# Layer 8 — Idempotent
# ---------------------------------------------------------------------------

class TestIdempotent:
    @pytest.mark.asyncio
    async def test_P5_16_second_run_yields_same_pct(
        self, db_session: AsyncSession
    ) -> None:
        """Repeated compute() calls leave the same gravity_pct and one row."""
        repo = await create_repo(db_session)
        await _seed_symbols(db_session, repo.repo_id, [
            {"address": "a.py::fn", "symbol_kind": "function",
             "blast": 3, "blast_direct": 1, "blast_cross": 2},
        ])

        for _ in range(3):
            await _compute_gravity(db_session, repo.repo_id)

        row = await _require_row(db_session, repo.repo_id, "a.py::fn")
        expected = _gravity_pct(blast=3, total=1)
        assert row.gravity_pct == pytest.approx(expected)

        assert await _count_rows(db_session, repo.repo_id) == 1


# ---------------------------------------------------------------------------
# Layer 9 — Empty: no tracked rows → returns []
# ---------------------------------------------------------------------------

class TestEmpty:
    @pytest.mark.asyncio
    async def test_P5_17_no_rows_returns_empty(
        self, db_session: AsyncSession
    ) -> None:
        """A repo with no symbol rows yields an empty result list."""
        repo = await create_repo(db_session)

        results = await _compute_gravity(db_session, repo.repo_id)

        assert results == []

    @pytest.mark.asyncio
    async def test_P5_18_only_import_kind_rows_returns_empty(
        self, db_session: AsyncSession
    ) -> None:
        """No tracked-kind rows means no gravity to compute."""
        repo = await create_repo(db_session)
        await _seed_symbols(db_session, repo.repo_id, [
            {"address": f"a.py::import_{i}", "symbol_kind": "import",
             "blast": 0, "blast_direct": 0, "blast_cross": 0}
            for i in range(3)
        ])

        results = await _compute_gravity(db_session, repo.repo_id)

        assert results == []


# ---------------------------------------------------------------------------
# Layer 10 — max_depth and depth_distribution not derivable
# ---------------------------------------------------------------------------

class TestNotDerivable:
    @pytest.mark.asyncio
    async def test_P5_19_gravity_max_depth_stays_null(
        self, db_session: AsyncSession
    ) -> None:
        """gravity_max_depth is not derivable from blast columns."""
        repo = await create_repo(db_session)
        await _seed_symbols(db_session, repo.repo_id, [
            {"address": "a.py::fn", "symbol_kind": "function",
             "blast": 5, "blast_direct": 2, "blast_cross": 3},
        ])

        await _compute_gravity(db_session, repo.repo_id)

        row = await _require_row(db_session, repo.repo_id, "a.py::fn")
        assert row.gravity_max_depth is None

    @pytest.mark.asyncio
    async def test_P5_20_gravity_depth_distribution_stays_null(
        self, db_session: AsyncSession
    ) -> None:
        """gravity_depth_distribution is not derivable from blast columns."""
        repo = await create_repo(db_session)
        await _seed_symbols(db_session, repo.repo_id, [
            {"address": "a.py::fn", "symbol_kind": "function",
             "blast": 5, "blast_direct": 2, "blast_cross": 3},
        ])

        await _compute_gravity(db_session, repo.repo_id)

        row = await _require_row(db_session, repo.repo_id, "a.py::fn")
        assert row.gravity_depth_distribution is None