"""TDD spec for Phase 1 — StableProvider migration + pure SQL rewrite (issue #12). Verifies that ``StableProvider`` derives stability records entirely from ``musehub_symbol_intel`` without any subprocess calls, and that the new ``last_changed_commit`` column is populated correctly. Seven test tiers ---------------- Unit P1_01 – P1_06 _days_stable_from_dt() helper Integration P1_07 – P1_14 Provider upserts, filtering, reruns E2E P1_15 – P1_18 Seed symbol_intel → run provider → verify DB Stress P1_19 – P1_21 500-row batch, idempotency Data Integrity P1_22 – P1_24 NULL exclusion, kind filter, uniqueness Performance P1_25 – P1_26 Batch timing bounds Security P1_27 – P1_28 Injection verbatim storage, repo isolation """ from __future__ import annotations import secrets import time from datetime import datetime, timedelta, timezone import pytest import pytest_asyncio from sqlalchemy.dialects.postgresql import insert as pg_insert from sqlalchemy.ext.asyncio import AsyncSession from muse.core.types import fake_id, long_id from musehub.db.musehub_intel_models import MusehubIntelStable, MusehubSymbolIntel from musehub.db.musehub_repo_models import MusehubRepo from musehub.services.musehub_intel_providers import StableProvider, _days_stable_from_dt from tests.factories import create_repo # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _uid() -> str: return fake_id(secrets.token_hex(16)) _OWNER = "testuser" _SLUG = "stableprovider" _REF = long_id("a" * 64) _REF2 = long_id("b" * 64) async def _seed_symbol( session: AsyncSession, repo_id: str, *, address: str, churn: int = 0, churn_30d: int = 0, churn_90d: int = 0, last_changed: datetime | None = None, last_commit_id: str | None = None, symbol_kind: str = "function", ) -> None: """Insert or upsert a ``musehub_symbol_intel`` row for test fixtures. Parameters ---------- session: Active async SQLAlchemy session. repo_id: Target repository ID. address: Symbol address (``file.py::fn``). churn: Lifetime change count (0 → since_start eligible). churn_30d: Changes in last 30 days (0 → stable candidate). churn_90d: Changes in last 90 days (0 → stable candidate). last_changed: UTC datetime of last modification; None excludes from stable. last_commit_id: Commit ID of last modification; stored as last_changed_commit. symbol_kind: Symbol kind string (function / class / etc.). """ stmt = ( pg_insert(MusehubSymbolIntel) .values( repo_id=repo_id, address=address, symbol_kind=symbol_kind, churn=churn, churn_30d=churn_30d, churn_90d=churn_90d, blast=0, blast_direct=0, blast_cross=0, last_changed=last_changed, last_commit_id=last_commit_id, author_count=1, gravity=0.0, weekly=[0] * 12, blast_top=[], ) .on_conflict_do_update( index_elements=["repo_id", "address"], set_={ "churn": churn, "churn_30d": churn_30d, "churn_90d": churn_90d, "last_changed": last_changed, "last_commit_id": last_commit_id, }, ) ) await session.execute(stmt) await session.flush() # --------------------------------------------------------------------------- # Fixtures # --------------------------------------------------------------------------- @pytest_asyncio.fixture async def stable_repo(db_session: AsyncSession) -> MusehubRepo: """Bare repo — no symbol_intel rows seeded.""" return await create_repo(db_session, owner=_OWNER, slug=_SLUG) @pytest_asyncio.fixture async def stable_repo_with_symbols(db_session: AsyncSession, stable_repo: MusehubRepo) -> MusehubRepo: """Repo seeded with a mix of stable and unstable symbols.""" repo_id = stable_repo.repo_id now = datetime.now(timezone.utc) await db_session.commit() # stable — untouched for 180 days, churn_30d=0, churn_90d=0 await _seed_symbol( db_session, repo_id, address="pkg/core.py::parse_frame", churn=3, churn_30d=0, churn_90d=0, last_changed=now - timedelta(days=180), last_commit_id=_REF, ) # eternal — never modified await _seed_symbol( db_session, repo_id, address="pkg/codec.py::pack", churn=0, churn_30d=0, churn_90d=0, last_changed=now - timedelta(days=365), last_commit_id=None, ) # unstable — active in last 30 days await _seed_symbol( db_session, repo_id, address="pkg/api.py::handler", churn=12, churn_30d=4, churn_90d=4, last_changed=now - timedelta(days=10), last_commit_id=_REF, ) # no last_changed — should be excluded await _seed_symbol( db_session, repo_id, address="pkg/init.py::bootstrap", churn=0, churn_30d=0, churn_90d=0, last_changed=None, last_commit_id=None, ) await db_session.commit() return stable_repo # --------------------------------------------------------------------------- # Tier 1 — Unit: _days_stable_from_dt() # --------------------------------------------------------------------------- class TestDaysStableHelper: """Unit tests for the ``_days_stable_from_dt`` pure helper function.""" def test_P1_01_none_returns_zero(self) -> None: """None input → 0 (no last_changed means no stability data).""" assert _days_stable_from_dt(None) == 0 def test_P1_02_exactly_90_days_ago(self) -> None: """Datetime 90 days ago → 90.""" dt = datetime.now(timezone.utc) - timedelta(days=90) assert _days_stable_from_dt(dt) == 90 def test_P1_03_future_timestamp_clamped_to_zero(self) -> None: """Future timestamp → 0, never negative.""" dt = datetime.now(timezone.utc) + timedelta(days=30) assert _days_stable_from_dt(dt) == 0 def test_P1_04_epoch_returns_large_positive(self) -> None: """Unix epoch → thousands of days (sanity check for ancient timestamps).""" epoch = datetime(1970, 1, 1, tzinfo=timezone.utc) assert _days_stable_from_dt(epoch) > 10_000 def test_P1_05_naive_datetime_treated_as_utc(self) -> None: """Timezone-naive datetime treated as UTC — no TypeError raised.""" naive = datetime.now(timezone.utc).replace(tzinfo=None) - timedelta(days=45) assert _days_stable_from_dt(naive) == 45 def test_P1_06_one_day_ago_returns_one(self) -> None: """One day ago → 1.""" dt = datetime.now(timezone.utc) - timedelta(days=1, seconds=1) assert _days_stable_from_dt(dt) == 1 # --------------------------------------------------------------------------- # Tier 2 — Integration: provider upserts and filtering # --------------------------------------------------------------------------- class TestStableProviderIntegration: """Integration tests — provider run against a real async DB session.""" @pytest.mark.asyncio async def test_P1_07_stable_symbol_upserted( self, db_session: AsyncSession, stable_repo_with_symbols: MusehubRepo ) -> None: """Stable symbol with churn_30d=0, churn_90d=0 → row written to intel_stable.""" repo_id = stable_repo_with_symbols.repo_id await StableProvider().compute(db_session, repo_id, _REF, {}) await db_session.flush() from sqlalchemy import select row = (await db_session.execute( select(MusehubIntelStable).where( MusehubIntelStable.repo_id == repo_id, MusehubIntelStable.address == "pkg/core.py::parse_frame", ) )).scalar_one_or_none() assert row is not None @pytest.mark.asyncio async def test_P1_08_days_stable_value_correct( self, db_session: AsyncSession, stable_repo_with_symbols: MusehubRepo ) -> None: """days_stable ≈ 180 for symbol last changed 180 days ago.""" repo_id = stable_repo_with_symbols.repo_id await StableProvider().compute(db_session, repo_id, _REF, {}) await db_session.flush() from sqlalchemy import select row = (await db_session.execute( select(MusehubIntelStable).where( MusehubIntelStable.repo_id == repo_id, MusehubIntelStable.address == "pkg/core.py::parse_frame", ) )).scalar_one() assert 178 <= row.days_stable <= 182 @pytest.mark.asyncio async def test_P1_09_since_start_true_for_zero_lifetime_churn( self, db_session: AsyncSession, stable_repo_with_symbols: MusehubRepo ) -> None: """Symbol with churn=0 → since_start=True.""" repo_id = stable_repo_with_symbols.repo_id await StableProvider().compute(db_session, repo_id, _REF, {}) await db_session.flush() from sqlalchemy import select row = (await db_session.execute( select(MusehubIntelStable).where( MusehubIntelStable.repo_id == repo_id, MusehubIntelStable.address == "pkg/codec.py::pack", ) )).scalar_one() assert row.since_start is True @pytest.mark.asyncio async def test_P1_10_since_start_false_for_nonzero_lifetime_churn( self, db_session: AsyncSession, stable_repo_with_symbols: MusehubRepo ) -> None: """Symbol with churn=3 (but churn_30d=0) → since_start=False.""" repo_id = stable_repo_with_symbols.repo_id await StableProvider().compute(db_session, repo_id, _REF, {}) await db_session.flush() from sqlalchemy import select row = (await db_session.execute( select(MusehubIntelStable).where( MusehubIntelStable.repo_id == repo_id, MusehubIntelStable.address == "pkg/core.py::parse_frame", ) )).scalar_one() assert row.since_start is False @pytest.mark.asyncio async def test_P1_11_last_changed_commit_populated( self, db_session: AsyncSession, stable_repo_with_symbols: MusehubRepo ) -> None: """last_changed_commit carries the last_commit_id from symbol_intel.""" repo_id = stable_repo_with_symbols.repo_id await StableProvider().compute(db_session, repo_id, _REF, {}) await db_session.flush() from sqlalchemy import select row = (await db_session.execute( select(MusehubIntelStable).where( MusehubIntelStable.repo_id == repo_id, MusehubIntelStable.address == "pkg/core.py::parse_frame", ) )).scalar_one() assert row.last_changed_commit == _REF @pytest.mark.asyncio async def test_P1_12_nonzero_churn_30d_excluded( self, db_session: AsyncSession, stable_repo_with_symbols: MusehubRepo ) -> None: """Symbol with churn_30d=4 must NOT appear in intel_stable.""" repo_id = stable_repo_with_symbols.repo_id await StableProvider().compute(db_session, repo_id, _REF, {}) await db_session.flush() from sqlalchemy import select row = (await db_session.execute( select(MusehubIntelStable).where( MusehubIntelStable.repo_id == repo_id, MusehubIntelStable.address == "pkg/api.py::handler", ) )).scalar_one_or_none() assert row is None @pytest.mark.asyncio async def test_P1_13_rerun_updates_days_stable_in_place( self, db_session: AsyncSession, stable_repo_with_symbols: MusehubRepo ) -> None: """Second provider run updates existing row — no duplicate.""" repo_id = stable_repo_with_symbols.repo_id await StableProvider().compute(db_session, repo_id, _REF, {}) await db_session.flush() await StableProvider().compute(db_session, repo_id, _REF2, {}) await db_session.flush() from sqlalchemy import select, func count = (await db_session.execute( select(func.count()).where( MusehubIntelStable.repo_id == repo_id, MusehubIntelStable.address == "pkg/core.py::parse_frame", ) )).scalar_one() assert count == 1 @pytest.mark.asyncio async def test_P1_14_ref_column_updated_on_rerun( self, db_session: AsyncSession, stable_repo_with_symbols: MusehubRepo ) -> None: """Second run with a different ref → ref column reflects the new value.""" repo_id = stable_repo_with_symbols.repo_id await StableProvider().compute(db_session, repo_id, _REF, {}) await db_session.flush() db_session.expire_all() await StableProvider().compute(db_session, repo_id, _REF2, {}) await db_session.flush() from sqlalchemy import select row = (await db_session.execute( select(MusehubIntelStable).where( MusehubIntelStable.repo_id == repo_id, MusehubIntelStable.address == "pkg/core.py::parse_frame", ).execution_options(populate_existing=True) )).scalar_one() assert row.ref == _REF2 # --------------------------------------------------------------------------- # Tier 3 — E2E: seed → provider → verify DB shape # --------------------------------------------------------------------------- class TestStableProviderE2E: """End-to-end tests — full seed-to-DB round-trip.""" @pytest.mark.asyncio async def test_P1_15_row_count_positive( self, db_session: AsyncSession, stable_repo_with_symbols: MusehubRepo ) -> None: """At least one row written after running provider on seeded data.""" repo_id = stable_repo_with_symbols.repo_id results = await StableProvider().compute(db_session, repo_id, _REF, {}) await db_session.flush() count = results[0][1]["count"] if results else 0 assert count > 0 @pytest.mark.asyncio async def test_P1_16_days_stable_positive( self, db_session: AsyncSession, stable_repo_with_symbols: MusehubRepo ) -> None: """All written rows have days_stable > 0.""" from sqlalchemy import select repo_id = stable_repo_with_symbols.repo_id await StableProvider().compute(db_session, repo_id, _REF, {}) await db_session.flush() rows = (await db_session.execute( select(MusehubIntelStable).where( MusehubIntelStable.repo_id == repo_id ) )).scalars().all() assert all(r.days_stable > 0 for r in rows) @pytest.mark.asyncio async def test_P1_17_last_changed_commit_is_sha256_prefixed_or_none( self, db_session: AsyncSession, stable_repo_with_symbols: MusehubRepo ) -> None: """last_changed_commit is either None or starts with 'sha256:'.""" from sqlalchemy import select repo_id = stable_repo_with_symbols.repo_id await StableProvider().compute(db_session, repo_id, _REF, {}) await db_session.flush() rows = (await db_session.execute( select(MusehubIntelStable).where( MusehubIntelStable.repo_id == repo_id ) )).scalars().all() for row in rows: assert row.last_changed_commit is None or row.last_changed_commit.startswith("sha256:") @pytest.mark.asyncio async def test_P1_18_since_start_only_when_lifetime_churn_zero( self, db_session: AsyncSession, stable_repo_with_symbols: MusehubRepo ) -> None: """since_start=True only for symbols whose lifetime churn is 0.""" from sqlalchemy import select repo_id = stable_repo_with_symbols.repo_id await StableProvider().compute(db_session, repo_id, _REF, {}) await db_session.flush() rows = (await db_session.execute( select(MusehubIntelStable).where( MusehubIntelStable.repo_id == repo_id, MusehubIntelStable.since_start == True, # noqa: E712 ) )).scalars().all() # Only pkg/codec.py::pack has churn=0 addresses = {r.address for r in rows} assert "pkg/api.py::handler" not in addresses assert "pkg/core.py::parse_frame" not in addresses # --------------------------------------------------------------------------- # Tier 4 — Stress: large batch, idempotency # --------------------------------------------------------------------------- class TestStableProviderStress: """Stress tests — large symbol counts and repeated runs.""" @pytest.mark.asyncio async def test_P1_19_500_symbols_all_upserted( self, db_session: AsyncSession, stable_repo: MusehubRepo ) -> None: """500 qualifying symbols all land in intel_stable after one run.""" from sqlalchemy import select, func repo_id = stable_repo.repo_id now = datetime.now(timezone.utc) await db_session.commit() for i in range(500): await _seed_symbol( db_session, repo_id, address=f"pkg/mod{i}.py::fn_{i}", churn=0, churn_30d=0, churn_90d=0, last_changed=now - timedelta(days=100 + i), last_commit_id=_REF, ) await db_session.commit() await StableProvider().compute(db_session, repo_id, _REF, {}) await db_session.flush() count = (await db_session.execute( select(func.count()).where(MusehubIntelStable.repo_id == repo_id) )).scalar_one() assert count == 500 @pytest.mark.asyncio async def test_P1_20_no_duplicates_after_single_run( self, db_session: AsyncSession, stable_repo: MusehubRepo ) -> None: """No duplicate (repo_id, address) pairs after a single run.""" from sqlalchemy import select, func repo_id = stable_repo.repo_id now = datetime.now(timezone.utc) await db_session.commit() for i in range(50): await _seed_symbol( db_session, repo_id, address=f"pkg/dup{i}.py::fn", churn=0, churn_30d=0, churn_90d=0, last_changed=now - timedelta(days=200), last_commit_id=_REF, ) await db_session.commit() await StableProvider().compute(db_session, repo_id, _REF, {}) await db_session.flush() total = (await db_session.execute( select(func.count()).where(MusehubIntelStable.repo_id == repo_id) )).scalar_one() assert total == 50 @pytest.mark.asyncio async def test_P1_21_upsert_is_idempotent( self, db_session: AsyncSession, stable_repo: MusehubRepo ) -> None: """Running the provider twice produces the same row count as running once.""" from sqlalchemy import select, func repo_id = stable_repo.repo_id now = datetime.now(timezone.utc) await db_session.commit() for i in range(20): await _seed_symbol( db_session, repo_id, address=f"pkg/idem{i}.py::fn", churn=0, churn_30d=0, churn_90d=0, last_changed=now - timedelta(days=150), last_commit_id=_REF, ) await db_session.commit() await StableProvider().compute(db_session, repo_id, _REF, {}) await db_session.flush() await StableProvider().compute(db_session, repo_id, _REF2, {}) await db_session.flush() count = (await db_session.execute( select(func.count()).where(MusehubIntelStable.repo_id == repo_id) )).scalar_one() assert count == 20 # --------------------------------------------------------------------------- # Tier 5 — Data Integrity # --------------------------------------------------------------------------- class TestStableProviderDataIntegrity: """Data integrity tests — exclusion rules and uniqueness guarantees.""" @pytest.mark.asyncio async def test_P1_22_null_last_changed_excluded( self, db_session: AsyncSession, stable_repo: MusehubRepo ) -> None: """Symbols with last_changed=NULL are not written to intel_stable.""" from sqlalchemy import select, func repo_id = stable_repo.repo_id await db_session.commit() await _seed_symbol( db_session, repo_id, address="pkg/null.py::fn", churn=0, churn_30d=0, churn_90d=0, last_changed=None, last_commit_id=None, ) await db_session.commit() await StableProvider().compute(db_session, repo_id, _REF, {}) await db_session.flush() count = (await db_session.execute( select(func.count()).where(MusehubIntelStable.repo_id == repo_id) )).scalar_one() assert count == 0 @pytest.mark.asyncio async def test_P1_23_nonzero_churn_90d_excluded( self, db_session: AsyncSession, stable_repo: MusehubRepo ) -> None: """Symbol with churn_90d > 0 is excluded even if churn_30d = 0.""" from sqlalchemy import select, func repo_id = stable_repo.repo_id now = datetime.now(timezone.utc) await db_session.commit() await _seed_symbol( db_session, repo_id, address="pkg/slow.py::fn", churn=5, churn_30d=0, churn_90d=2, last_changed=now - timedelta(days=45), last_commit_id=_REF, ) await db_session.commit() await StableProvider().compute(db_session, repo_id, _REF, {}) await db_session.flush() count = (await db_session.execute( select(func.count()).where(MusehubIntelStable.repo_id == repo_id) )).scalar_one() assert count == 0 @pytest.mark.asyncio async def test_P1_24_address_unique_per_repo( self, db_session: AsyncSession, stable_repo: MusehubRepo ) -> None: """(repo_id, address) primary key — two repos can share the same address.""" from sqlalchemy import select, func repo_id = stable_repo.repo_id repo2 = await create_repo(db_session, owner=_OWNER, slug="stableprovider2") repo_id2 = repo2.repo_id now = datetime.now(timezone.utc) await db_session.commit() for rid in (repo_id, repo_id2): await _seed_symbol( db_session, rid, address="shared/utils.py::parse", churn=0, churn_30d=0, churn_90d=0, last_changed=now - timedelta(days=200), last_commit_id=_REF, ) await db_session.commit() await StableProvider().compute(db_session, repo_id, _REF, {}) await StableProvider().compute(db_session, repo_id2, _REF, {}) await db_session.flush() count = (await db_session.execute( select(func.count()).where( MusehubIntelStable.address == "shared/utils.py::parse" ) )).scalar_one() assert count == 2 # --------------------------------------------------------------------------- # Tier 6 — Performance # --------------------------------------------------------------------------- class TestStableProviderPerformance: """Performance tests — batch timing bounds for production-scale data.""" @pytest.mark.asyncio async def test_P1_25_1000_row_batch_under_5s( self, db_session: AsyncSession, stable_repo: MusehubRepo ) -> None: """First-run upsert of 1000 symbols completes in under 5 seconds.""" repo_id = stable_repo.repo_id now = datetime.now(timezone.utc) await db_session.commit() for i in range(1000): await _seed_symbol( db_session, repo_id, address=f"pkg/perf{i}.py::fn", churn=0, churn_30d=0, churn_90d=0, last_changed=now - timedelta(days=100 + (i % 900)), last_commit_id=_REF, ) await db_session.commit() start = time.monotonic() await StableProvider().compute(db_session, repo_id, _REF, {}) await db_session.flush() elapsed = time.monotonic() - start assert elapsed < 5.0, f"First run took {elapsed:.2f}s — expected < 5s" @pytest.mark.asyncio async def test_P1_26_second_run_all_conflicts_under_5s( self, db_session: AsyncSession, stable_repo: MusehubRepo ) -> None: """Second run (all-conflict upsert path) also completes in under 5 seconds.""" repo_id = stable_repo.repo_id now = datetime.now(timezone.utc) await db_session.commit() for i in range(1000): await _seed_symbol( db_session, repo_id, address=f"pkg/perf2_{i}.py::fn", churn=0, churn_30d=0, churn_90d=0, last_changed=now - timedelta(days=100 + (i % 900)), last_commit_id=_REF, ) await db_session.commit() await StableProvider().compute(db_session, repo_id, _REF, {}) await db_session.flush() start = time.monotonic() await StableProvider().compute(db_session, repo_id, _REF2, {}) await db_session.flush() elapsed = time.monotonic() - start assert elapsed < 5.0, f"Second run took {elapsed:.2f}s — expected < 5s" # --------------------------------------------------------------------------- # Tier 7 — Security # --------------------------------------------------------------------------- class TestStableProviderSecurity: """Security tests — injection safety and repo isolation.""" @pytest.mark.asyncio async def test_P1_27_sql_injection_in_address_stored_verbatim( self, db_session: AsyncSession, stable_repo: MusehubRepo ) -> None: """Malicious address string is stored as plain text — not executed.""" from sqlalchemy import select repo_id = stable_repo.repo_id injection = "'; DROP TABLE musehub_intel_stable; --" now = datetime.now(timezone.utc) await db_session.commit() await _seed_symbol( db_session, repo_id, address=injection, churn=0, churn_30d=0, churn_90d=0, last_changed=now - timedelta(days=100), last_commit_id=_REF, ) await db_session.commit() await StableProvider().compute(db_session, repo_id, _REF, {}) await db_session.flush() # Table must still exist and contain the verbatim string row = (await db_session.execute( select(MusehubIntelStable).where( MusehubIntelStable.repo_id == repo_id, MusehubIntelStable.address == injection, ) )).scalar_one_or_none() assert row is not None assert row.address == injection @pytest.mark.asyncio async def test_P1_28_repo_isolation( self, db_session: AsyncSession, stable_repo: MusehubRepo ) -> None: """Running provider for repo A does not write rows for repo B.""" from sqlalchemy import select, func repo_id_a = stable_repo.repo_id repo_b = await create_repo(db_session, owner=_OWNER, slug="stableisolation") repo_id_b = repo_b.repo_id now = datetime.now(timezone.utc) await db_session.commit() # Seed only repo A await _seed_symbol( db_session, repo_id_a, address="shared/fn.py::do_work", churn=0, churn_30d=0, churn_90d=0, last_changed=now - timedelta(days=120), last_commit_id=_REF, ) await db_session.commit() await StableProvider().compute(db_session, repo_id_a, _REF, {}) await db_session.flush() # repo B must have zero rows count_b = (await db_session.execute( select(func.count()).where(MusehubIntelStable.repo_id == repo_id_b) )).scalar_one() assert count_b == 0