"""Fidelity tests — providers and routes must match CLI data shape.

Verifies three gaps found in the CLI-vs-DB sweep:

  1. StableProvider   — days_stable must be calendar days, not commit-walk index
  2. EntangleProvider — co_change_rate must use Jaccard (co / |union|), not min
  3. Hotspots route   — must read MusehubSymbolIntel, not the legacy snapshot blob

Cases:
  F01  StableProvider: symbol untouched for 30 calendar days → days_stable ≈ 30
  F02  StableProvider: symbol changed today → days_stable = 0
  F03  EntangleProvider: rate = co_changes / |union(commits_a, commits_b)|
  F04  EntangleProvider: commits_both_active stores union cardinality
  F05  Hotspots route: 200 with no legacy snapshot row (no longer depends on it)
  F06  Hotspots route: symbols ranked by churn_30d descending
  F07  Hotspots route: address and change_count present in HTML body
"""
from __future__ import annotations

from datetime import datetime, timedelta, timezone

import pytest
import pytest_asyncio
import sqlalchemy as sa
from httpx import AsyncClient
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.ext.asyncio import AsyncSession

from musehub.db.musehub_intel_models import (
    MusehubIntelEntangle,
    MusehubIntelStable,
    MusehubSymbolHistoryEntry,
    MusehubSymbolIntel,
)
from musehub.db.musehub_repo_models import MusehubCommit, MusehubCommitRef, MusehubRepo
from musehub.services.musehub_intel_providers import EntangleProvider, StableProvider
from tests.factories import create_repo

# Single reference instant captured at import time so every timestamp built
# by ``_ts`` in this module is measured from the same "now".
_NOW = datetime.now(tz=timezone.utc)

# URL requested by every hotspots route test (owner "fid", slug "hotspots-fid").
_HOTSPOTS_URL = "/fid/hotspots-fid/intel/hotspots"


# ---------------------------------------------------------------------------
# Shared helpers
# ---------------------------------------------------------------------------


def _ts(days_ago: int) -> datetime:
    """Return the timezone-aware UTC instant *days_ago* days before ``_NOW``."""
    return _NOW - timedelta(days=days_ago)


async def _insert_symbol_intel(
    session: AsyncSession,
    repo_id: str,
    address: str,
    churn_30d: int = 0,
    last_changed: datetime | None = None,
) -> None:
    """Upsert one ``MusehubSymbolIntel`` row keyed on ``(repo_id, address)``.

    On insert, ``churn`` and ``churn_30d`` are both seeded from *churn_30d* and
    every other metric column gets a zero / empty placeholder.  On conflict the
    same three caller-controlled values (``churn``, ``churn_30d``,
    ``last_changed``) are refreshed, so a re-seeded row never keeps a stale
    ``churn`` that disagrees with ``churn_30d``.

    The statement is executed but not committed; the caller owns the
    transaction.
    """
    stmt = pg_insert(MusehubSymbolIntel).values(
        repo_id=repo_id,
        address=address,
        churn=churn_30d,
        churn_30d=churn_30d,
        churn_90d=0,
        blast=0,
        blast_direct=0,
        blast_cross=0,
        blast_top=[],
        last_changed=last_changed,
        author_count=1,
        gravity=0.0,
        weekly=[],
    )
    stmt = stmt.on_conflict_do_update(
        index_elements=["repo_id", "address"],
        set_={
            # Keep ``churn`` in step with ``churn_30d``, mirroring the insert path.
            "churn": churn_30d,
            "churn_30d": churn_30d,
            "last_changed": last_changed,
        },
    )
    await session.execute(stmt)


async def _insert_history_entry(
    session: AsyncSession,
    repo_id: str,
    address: str,
    commit_id: str,
    committed_at: datetime,
    op: str = "modify",
) -> None:
    """Insert one ``MusehubSymbolHistoryEntry`` row, ignoring conflicts.

    Uses ``ON CONFLICT DO NOTHING`` so repeating an identical entry is a no-op.
    The statement is executed but not committed.
    """
    stmt = (
        pg_insert(MusehubSymbolHistoryEntry)
        .values(
            repo_id=repo_id,
            address=address,
            commit_id=commit_id,
            committed_at=committed_at,
            op=op,
        )
        .on_conflict_do_nothing()
    )
    await session.execute(stmt)


async def _compute_stable_row(
    session: AsyncSession,
    repo: MusehubRepo,
    head: str,
    address: str,
) -> MusehubIntelStable | None:
    """Run ``StableProvider.compute`` at *head* and fetch the row for *address*."""
    repo_id = str(repo.repo_id)
    await StableProvider().compute(session, repo_id, head, {})
    return await session.scalar(
        sa.select(MusehubIntelStable).where(
            MusehubIntelStable.repo_id == repo_id,
            MusehubIntelStable.address == address,
        )
    )


async def _compute_entangle_row(
    session: AsyncSession,
    repo: MusehubRepo,
    head: str,
) -> MusehubIntelEntangle | None:
    """Run ``EntangleProvider.compute`` at *head* and fetch a row for the repo."""
    repo_id = str(repo.repo_id)
    await EntangleProvider().compute(session, repo_id, head, {})
    return await session.scalar(
        sa.select(MusehubIntelEntangle).where(
            MusehubIntelEntangle.repo_id == repo_id,
        )
    )


# ---------------------------------------------------------------------------
# F01 / F02 — StableProvider: calendar days, not commit-walk index
# ---------------------------------------------------------------------------


@pytest_asyncio.fixture
async def stable_repo(db_session: AsyncSession) -> tuple[MusehubRepo, str]:
    """Repo with two commits (today and 30 days ago) and two symbols.

    Returns the repo together with the id of the newest commit (the head).
    """
    from muse.core.types import blob_id

    repo = await create_repo(db_session, owner="fid", slug="stable-fid")
    repo_id = str(repo.repo_id)

    c_old_id = blob_id(b"commit-30d")
    c_new_id = blob_id(b"commit-today")

    # Chain: today's commit's parent is the 30-day-old commit
    c_old = MusehubCommit(
        commit_id=c_old_id,
        message="old",
        author="a",
        branch="main",
        parent_ids=[],
        timestamp=_ts(30),
    )
    c_new = MusehubCommit(
        commit_id=c_new_id,
        message="new",
        author="a",
        branch="main",
        parent_ids=[c_old_id],
        timestamp=_ts(0),
    )
    db_session.add_all([c_old, c_new])
    await db_session.flush()

    db_session.add_all([
        MusehubCommitRef(repo_id=repo_id, commit_id=c_old_id),
        MusehubCommitRef(repo_id=repo_id, commit_id=c_new_id),
    ])
    await db_session.flush()

    # symbol_a was last touched today; symbol_b was last touched 30 days ago
    await _insert_history_entry(db_session, repo_id, "src/a.py::fn_a", c_new_id, _ts(0))
    await _insert_history_entry(db_session, repo_id, "src/b.py::fn_b", c_old_id, _ts(30))

    # Both symbols must exist in MusehubSymbolIntel (provider reads current symbols from here)
    await _insert_symbol_intel(db_session, repo_id, "src/a.py::fn_a", last_changed=_ts(0))
    await _insert_symbol_intel(db_session, repo_id, "src/b.py::fn_b", last_changed=_ts(30))

    await db_session.commit()
    return repo, c_new_id


class TestStableCalendarDays:
    """F01 / F02 — ``days_stable`` is measured in calendar days."""

    @pytest.mark.asyncio
    async def test_F01_symbol_30d_stale_has_days_stable_approx_30(
        self, db_session: AsyncSession, stable_repo: tuple[MusehubRepo, str]
    ) -> None:
        """Symbol last touched 30 calendar days ago → days_stable ≈ 30, not 1."""
        repo, head = stable_repo
        row = await _compute_stable_row(db_session, repo, head, "src/b.py::fn_b")

        assert row is not None
        # Must be close to 30 calendar days — definitely not the commit index (1)
        assert row.days_stable >= 28, f"Expected ~30, got {row.days_stable}"
        assert row.days_stable <= 32, f"Expected ~30, got {row.days_stable}"

    @pytest.mark.asyncio
    async def test_F02_symbol_changed_today_has_days_stable_zero(
        self, db_session: AsyncSession, stable_repo: tuple[MusehubRepo, str]
    ) -> None:
        """Symbol changed today → days_stable = 0, not the commit index."""
        repo, head = stable_repo
        row = await _compute_stable_row(db_session, repo, head, "src/a.py::fn_a")

        assert row is not None
        assert row.days_stable == 0, f"Expected 0, got {row.days_stable}"


# ---------------------------------------------------------------------------
# F03 / F04 — EntangleProvider: Jaccard co_change_rate
# ---------------------------------------------------------------------------


@pytest_asyncio.fixture
async def entangle_repo(db_session: AsyncSession) -> tuple[MusehubRepo, str]:
    """Repo whose commit graph gives a clear Jaccard vs min distinction.

    symbol_a touched in: c1, c2, c3, c4, c5  →  5 commits
    symbol_b touched in: c3, c4, c5, c6, c7  →  5 commits
    co_changes = 3 (c3, c4, c5)
    union      = 7 (c1..c7)
    Jaccard rate = 3/7 ≈ 0.4286
    min rate     = 3/5 = 0.6   (the wrong answer)

    Returns the repo together with the id of c7 (the head).
    """
    from muse.core.types import blob_id

    repo = await create_repo(db_session, owner="fid", slug="entangle-fid")
    repo_id = str(repo.repo_id)

    # Build a linear chain c1 → c2 → … → c7 (c7 = HEAD)
    commit_ids = [blob_id(f"entangle-c{i}".encode()) for i in range(1, 8)]
    for i, cid in enumerate(commit_ids):
        parent = [commit_ids[i - 1]] if i > 0 else []
        db_session.add(MusehubCommit(
            commit_id=cid,
            message=f"c{i+1}",
            author="a",
            branch="main",
            parent_ids=parent,
            timestamp=_ts(7 - i),
        ))
    await db_session.flush()

    db_session.add_all([
        MusehubCommitRef(repo_id=repo_id, commit_id=cid)
        for cid in commit_ids
    ])
    await db_session.flush()

    # symbol_a in c1–c5, symbol_b in c3–c7
    sym_a = "src/a.py::fn_a"
    sym_b = "src/b.py::fn_b"
    ts = _ts(1)
    for cid in commit_ids[:5]:  # c1-c5 → symbol_a
        await _insert_history_entry(db_session, repo_id, sym_a, cid, ts)
    for cid in commit_ids[2:]:  # c3-c7 → symbol_b
        await _insert_history_entry(db_session, repo_id, sym_b, cid, ts)

    await db_session.commit()
    return repo, commit_ids[-1]  # HEAD = c7


class TestEntangleJaccard:
    """F03 / F04 — entanglement rate is Jaccard over the commit-set union."""

    @pytest.mark.asyncio
    async def test_F03_co_change_rate_is_jaccard(
        self, db_session: AsyncSession, entangle_repo: tuple[MusehubRepo, str]
    ) -> None:
        """co_change_rate = co_changes / |union| (Jaccard), not co / min."""
        repo, head = entangle_repo
        row = await _compute_entangle_row(db_session, repo, head)

        assert row is not None, "Expected one entangle pair to be stored"
        expected_jaccard = 3 / 7
        expected_min_rate = 3 / 5
        # Report the actual value against both candidates; the failure may be
        # caused by something other than the min-based formula.
        assert abs(row.co_change_rate - expected_jaccard) < 0.001, (
            f"co_change_rate {row.co_change_rate:.4f} != Jaccard "
            f"{expected_jaccard:.4f} (co/min would give {expected_min_rate:.4f})"
        )

    @pytest.mark.asyncio
    async def test_F04_commits_both_active_is_union_cardinality(
        self, db_session: AsyncSession, entangle_repo: tuple[MusehubRepo, str]
    ) -> None:
        """commits_both_active stores |union(commits_a, commits_b)| = 7."""
        repo, head = entangle_repo
        row = await _compute_entangle_row(db_session, repo, head)

        assert row is not None
        assert row.commits_both_active == 7, (
            f"Expected union cardinality 7, got {row.commits_both_active}"
        )


# ---------------------------------------------------------------------------
# F05 / F06 / F07 — Hotspots route: reads MusehubSymbolIntel, not snapshot
# ---------------------------------------------------------------------------


@pytest_asyncio.fixture
async def hotspots_repo(db_session: AsyncSession) -> MusehubRepo:
    """Repo with symbol intel rows but NO legacy snapshot."""
    repo = await create_repo(db_session, owner="fid", slug="hotspots-fid")
    repo_id = str(repo.repo_id)

    # Three symbols with different churn_30d values
    for addr, churn in [
        ("src/hot.py::fn_hot", 42),
        ("src/med.py::fn_med", 15),
        ("src/cold.py::fn_cold", 3),
    ]:
        await _insert_symbol_intel(db_session, repo_id, addr, churn_30d=churn)

    await db_session.commit()
    return repo


class TestHotspotsRoute:
    """F05 / F06 / F07 — hotspots page is served from symbol intel rows."""

    @pytest.mark.asyncio
    async def test_F05_hotspots_returns_200_without_legacy_snapshot(
        self, client: AsyncClient, hotspots_repo: MusehubRepo
    ) -> None:
        """Route must not 500 when there is no legacy snapshot row."""
        r = await client.get(_HOTSPOTS_URL)
        assert r.status_code == 200

    @pytest.mark.asyncio
    async def test_F06_hotspots_ranked_by_churn_30d_descending(
        self, client: AsyncClient, hotspots_repo: MusehubRepo
    ) -> None:
        """Symbols appear highest-churn first (42, 15, 3)."""
        r = await client.get(_HOTSPOTS_URL)
        assert r.status_code == 200

        body = r.text
        # Compare the offset of each symbol's first occurrence in the body.
        pos_hot = body.find("fn_hot")
        pos_med = body.find("fn_med")
        pos_cold = body.find("fn_cold")

        assert pos_hot != -1 and pos_med != -1 and pos_cold != -1, (
            "Not all symbols found in response"
        )
        assert pos_hot < pos_med < pos_cold, (
            "Symbols not in churn-descending order"
        )

    @pytest.mark.asyncio
    async def test_F07_hotspots_renders_address_and_change_count(
        self, client: AsyncClient, hotspots_repo: MusehubRepo
    ) -> None:
        """Address and change count appear in the rendered HTML."""
        r = await client.get(_HOTSPOTS_URL)
        assert r.status_code == 200
        assert "src/hot.py::fn_hot" in r.text
        # NOTE(review): a bare "42" substring can match unrelated markup;
        # tighten against the template's change-count element once its shape
        # is confirmed.
        assert "42" in r.text