""" Tier 2 — Integration tests for enrich_repo_cards() against a real test database. These tests exercise the full service call — SQL queries run against the test postgres instance populated via factory helpers and direct ORM inserts. Test IDs -------- T200 — repo with commits on every day of the 30-day window gets correct daily counts T201 — autonomy_pct is 100 when all commits carry a non-empty agent_id T202 — autonomy_pct is 0 when no commits carry an agent_id T203 — autonomy_pct is rounded correctly for a mixed repo (e.g. 3/4 = 75%) T204 — hottest_symbol matches the symbol with the highest churn_30d T205 — blast_leader matches the symbol with the highest blast score T206 — dead_count counts only high-confidence dead symbols (medium/low excluded) T207 — health_status is 'risk' when breakage_meta has error_count > 0 T208 — health_status is 'warn' when dead_count > 0, error_count == 0 T209 — health_status is 'clean' when no dead symbols and no breakage meta row T210 — enrich_repo_cards batches two repos correctly in a single call T211 — repos with no intel rows return safe zero-value enrichment (no crash) T212 — pulse_buckets always has exactly 30 entries regardless of commit pattern T213 — commits older than 30 days do not appear in pulse_buckets T214 — hottest_symbol is None when symbol_intel has no rows for the repo T215 — blast_leader is None when all blast scores are zero """ from __future__ import annotations import secrets from datetime import datetime, timedelta, timezone import pytest import pytest_asyncio from sqlalchemy.ext.asyncio import AsyncSession from musehub.db.musehub_intel_models import MusehubIntelBreakageMeta, MusehubIntelDead, MusehubSymbolIntel from musehub.db.musehub_repo_models import MusehubCommit from musehub.services.repo_card_enrichment import ( _PULSE_DAYS, enrich_repo_cards, ) from tests.factories import create_commit, create_repo # --------------------------------------------------------------------------- # Helpers # 
# ---------------------------------------------------------------------------


def _utc_now() -> datetime:
    """Return the current time as a timezone-aware UTC datetime."""
    return datetime.now(tz=timezone.utc)


def _days_ago(n: int) -> datetime:
    """Return the UTC instant exactly ``n`` days (n * 24h) before now."""
    return _utc_now() - timedelta(days=n)


def _commit_id() -> str:
    """Return a random commit-id-shaped string: ``sha256:`` plus 64 hex chars."""
    return f"sha256:{secrets.token_hex(32)}"


async def _persist(session: AsyncSession, row: object) -> None:
    """Add ``row`` to ``session`` and commit immediately.

    Shared by the ``_insert_*`` helpers so each of them persists rows the
    same way.
    """
    session.add(row)
    await session.commit()


async def _insert_symbol_intel(
    session: AsyncSession,
    repo_id: str,
    address: str,
    churn_30d: int = 0,
    blast: int = 0,
) -> MusehubSymbolIntel:
    """Insert a MusehubSymbolIntel row and commit.

    Args:
        session: Session used for the insert.
        repo_id: Repo the symbol belongs to.
        address: Symbol address, e.g. ``"src/a.py::fn"``.
        churn_30d: Value stored in the row's ``churn_30d`` column.
        blast: Value stored in the row's ``blast`` column.

    Returns:
        The row object that was added to the session.
    """
    row = MusehubSymbolIntel(
        repo_id=repo_id,
        address=address,
        churn_30d=churn_30d,
        blast=blast,
    )
    await _persist(session, row)
    return row


async def _insert_dead(
    session: AsyncSession,
    repo_id: str,
    address: str,
    confidence: str = "high",
) -> MusehubIntelDead:
    """Insert a MusehubIntelDead row and commit.

    The row is always created with ``kind="function"`` and ``ref="main"``;
    only the repo, address and confidence vary between calls.

    Returns:
        The row object that was added to the session.
    """
    row = MusehubIntelDead(
        repo_id=repo_id,
        address=address,
        kind="function",
        confidence=confidence,
        ref="main",
    )
    await _persist(session, row)
    return row


async def _insert_breakage_meta(
    session: AsyncSession,
    repo_id: str,
    error_count: int = 0,
    warning_count: int = 0,
) -> MusehubIntelBreakageMeta:
    """Insert a MusehubIntelBreakageMeta row and commit.

    ``total_issues`` is derived as ``error_count + warning_count``;
    ``file_count`` is fixed at 1 and ``ref`` at ``"main"``.

    Returns:
        The row object that was added to the session.
    """
    row = MusehubIntelBreakageMeta(
        repo_id=repo_id,
        total_issues=error_count + warning_count,
        error_count=error_count,
        warning_count=warning_count,
        file_count=1,
        ref="main",
    )
    await _persist(session, row)
    return row


async def _add_agent_commit(
    session: AsyncSession,
    repo_id: str,
    timestamp: datetime | None = None,
    agent_id: str = "claude-code",
) -> MusehubCommit:
    """Create a commit with a non-empty agent_id (agent commit).

    Args:
        session: Session used to create and then update the commit.
        repo_id: Repo the commit belongs to.
        timestamp: Commit timestamp; the current UTC time when omitted.
        agent_id: Value written to the commit's ``agent_id`` column.

    Returns:
        The object returned by ``create_commit``. The ``agent_id`` is written
        with a raw SQL UPDATE and the object is not refreshed here, so read
        the column back from the database rather than from this object.
    """
    # Function-scope import: ``text`` is only needed by this helper.
    from sqlalchemy import text

    commit = await create_commit(session, repo_id, timestamp=timestamp or _utc_now())
    # MusehubCommit.agent_id is not in CommitFactory; set it directly via update.
    await session.execute(
        text("UPDATE musehub_commits SET agent_id = :aid WHERE commit_id = :cid"),
        {"aid": agent_id, "cid": commit.commit_id},
    )
    await session.commit()
    return commit


async def _add_human_commit(
    session: AsyncSession,
    repo_id: str,
    timestamp: datetime | None = None,
) -> MusehubCommit:
    """Create a commit with an empty agent_id (human commit).

    NOTE(review): this relies on ``create_commit`` leaving ``agent_id`` empty
    by default; that default is not visible in this file — confirm against
    tests/factories.
    """
    return await create_commit(session, repo_id, timestamp=timestamp or _utc_now())


# ---------------------------------------------------------------------------
# T200: correct daily pulse counts
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_t200_pulse_correct_daily_counts(db_session: AsyncSession) -> None:
    """T200: commits on known days produce the correct count in pulse_buckets."""
    repo = await create_repo(db_session, visibility="public")
    # Pinned to 12:00 UTC — presumably to keep both timestamps well away from
    # a date boundary; the bucketing rule itself lives in the service.
    today = _utc_now().replace(hour=12, minute=0, second=0, microsecond=0)
    yesterday = today - timedelta(days=1)

    # 3 commits today, 2 commits yesterday
    for _ in range(3):
        await create_commit(db_session, repo.repo_id, timestamp=today)
    for _ in range(2):
        await create_commit(db_session, repo.repo_id, timestamp=yesterday)

    result = await enrich_repo_cards(db_session, [repo.repo_id])
    enc = result[repo.repo_id]

    # Index buckets by date rather than calling next() without a default:
    # a StopIteration escaping a coroutine is turned into an opaque
    # RuntimeError, which hides the real problem (a missing bucket).
    # setdefault keeps the first bucket per date, matching next()'s semantics.
    count_by_date = {}
    for bucket in enc.pulse_buckets:
        count_by_date.setdefault(bucket.date, bucket.count)

    today_key = today.date().isoformat()
    yesterday_key = yesterday.date().isoformat()
    assert today_key in count_by_date, f"no pulse bucket for {today_key}"
    assert yesterday_key in count_by_date, f"no pulse bucket for {yesterday_key}"
    assert count_by_date[today_key] == 3
    assert count_by_date[yesterday_key] == 2


# ---------------------------------------------------------------------------
# T201–T203: autonomy_pct
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_t201_autonomy_pct_100_all_agent(db_session: AsyncSession) -> None:
    """T201: autonomy_pct is 100 when every commit has a non-empty agent_id."""
    repo = await create_repo(db_session, visibility="public")
    for _ in range(4):
        await _add_agent_commit(db_session, repo.repo_id)

    result = await enrich_repo_cards(db_session, [repo.repo_id])
    assert result[repo.repo_id].autonomy_pct == 100


@pytest.mark.asyncio
async def test_t202_autonomy_pct_0_all_human(db_session: AsyncSession) -> None:
    """T202: autonomy_pct is 0 when no commits have an agent_id set."""
    repo = await create_repo(db_session, visibility="public")
    for _ in range(3):
        await _add_human_commit(db_session, repo.repo_id)

    result = await enrich_repo_cards(db_session, [repo.repo_id])
    assert result[repo.repo_id].autonomy_pct == 0


@pytest.mark.asyncio
async def test_t203_autonomy_pct_mixed(db_session: AsyncSession) -> None:
    """T203: autonomy_pct rounds correctly for a 3-agent / 1-human repo (75%)."""
    repo = await create_repo(db_session, visibility="public")
    for _ in range(3):
        await _add_agent_commit(db_session, repo.repo_id)
    await _add_human_commit(db_session, repo.repo_id)

    result = await enrich_repo_cards(db_session, [repo.repo_id])
    assert result[repo.repo_id].autonomy_pct == 75


# ---------------------------------------------------------------------------
# T204–T205: hottest_symbol and blast_leader
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_t204_hottest_symbol_highest_churn(db_session: AsyncSession) -> None:
    """T204: hottest_symbol is the symbol with the highest churn_30d."""
    repo = await create_repo(db_session, visibility="public")
    await _insert_symbol_intel(db_session, repo.repo_id, "src/a.py::slow_fn", churn_30d=2)
    await _insert_symbol_intel(db_session, repo.repo_id, "src/b.py::hot_fn", churn_30d=9)
    await _insert_symbol_intel(db_session, repo.repo_id, "src/c.py::mid_fn", churn_30d=5)

    result = await enrich_repo_cards(db_session, [repo.repo_id])
    enc = result[repo.repo_id]
    assert enc.hottest_symbol is not None
    assert enc.hottest_symbol.address == "src/b.py::hot_fn"
    assert enc.hottest_symbol.churn_30d == 9


@pytest.mark.asyncio
async def test_t205_blast_leader_highest_blast(db_session: AsyncSession) -> None:
    """T205: blast_leader is the symbol with the highest blast score."""
    repo = await create_repo(db_session, visibility="public")
    await _insert_symbol_intel(db_session, repo.repo_id, "src/a.py::small", blast=10)
    await _insert_symbol_intel(db_session, repo.repo_id, "src/b.py::large", blast=847)

    result = await enrich_repo_cards(db_session, [repo.repo_id])
    enc = result[repo.repo_id]
    assert enc.blast_leader is not None
    assert enc.blast_leader.address == "src/b.py::large"
    assert enc.blast_leader.blast == 847


# ---------------------------------------------------------------------------
# T206: dead_count confidence filtering
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_t206_dead_count_only_high_confidence(db_session: AsyncSession) -> None:
    """T206: dead_count excludes medium and low confidence dead symbols."""
    repo = await create_repo(db_session, visibility="public")
    await _insert_dead(db_session, repo.repo_id, "src/a.py::fn_high", confidence="high")
    await _insert_dead(db_session, repo.repo_id, "src/b.py::fn_medium", confidence="medium")
    await _insert_dead(db_session, repo.repo_id, "src/c.py::fn_low", confidence="low")

    result = await enrich_repo_cards(db_session, [repo.repo_id])
    assert result[repo.repo_id].dead_count == 1


# ---------------------------------------------------------------------------
# T207–T209: health_status via breakage + dead data
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_t207_health_risk_when_breakage_errors(db_session: AsyncSession) -> None:
    """T207: health_status is 'risk' when breakage_meta has error_count > 0."""
    repo = await create_repo(db_session, visibility="public")
    await _insert_breakage_meta(db_session, repo.repo_id, error_count=2, warning_count=1)

    result = await enrich_repo_cards(db_session, [repo.repo_id])
    assert result[repo.repo_id].health_status == "risk"


@pytest.mark.asyncio
async def test_t208_health_warn_when_dead_no_errors(db_session: AsyncSession) -> None:
    """T208: health_status is 'warn' when dead symbols exist but no errors."""
    repo = await create_repo(db_session, visibility="public")
    await _insert_dead(db_session, repo.repo_id, "src/a.py::old_fn", confidence="high")

    result = await enrich_repo_cards(db_session, [repo.repo_id])
    assert result[repo.repo_id].health_status == "warn"


@pytest.mark.asyncio
async def test_t209_health_clean_no_data(db_session: AsyncSession) -> None:
    """T209: health_status is 'clean' when intel tables have no rows for repo."""
    repo = await create_repo(db_session, visibility="public")

    result = await enrich_repo_cards(db_session, [repo.repo_id])
    assert result[repo.repo_id].health_status == "clean"


# ---------------------------------------------------------------------------
# T210: batching multiple repos
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_t210_batches_multiple_repos(db_session: AsyncSession) -> None:
    """T210: enrich_repo_cards correctly enriches two repos in one call."""
    repo_a = await create_repo(db_session, visibility="public")
    repo_b = await create_repo(db_session, visibility="public")

    # Give each repo a different kind of data so leakage between them shows up.
    await _add_agent_commit(db_session, repo_a.repo_id)
    await _insert_dead(db_session, repo_b.repo_id, "src/b.py::fn", confidence="high")

    result = await enrich_repo_cards(db_session, [repo_a.repo_id, repo_b.repo_id])

    assert result[repo_a.repo_id].autonomy_pct == 100
    assert result[repo_b.repo_id].dead_count == 1
    # cross-repo isolation
    assert result[repo_a.repo_id].dead_count == 0
    assert result[repo_b.repo_id].autonomy_pct == 0


# ---------------------------------------------------------------------------
# T211: safe zero-value for repos with no intel data
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_t211_safe_zero_value_no_intel(db_session: AsyncSession) -> None:
    """T211: a repo with no intel rows returns a zero-value enrichment without crashing."""
    repo = await create_repo(db_session, visibility="public")

    result = await enrich_repo_cards(db_session, [repo.repo_id])
    enc = result[repo.repo_id]

    assert enc.autonomy_pct == 0
    assert enc.hottest_symbol is None
    assert enc.blast_leader is None
    assert enc.dead_count == 0
    assert enc.error_count == 0
    assert enc.warning_count == 0
    # Same expectation T209 makes for a repo with no data; checked here too so
    # the zero-value assertion covers every summary field.
    assert enc.health_status == "clean"
    assert len(enc.pulse_buckets) == _PULSE_DAYS


# ---------------------------------------------------------------------------
# T212: pulse always 30 buckets
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_t212_pulse_always_30_buckets(db_session: AsyncSession) -> None:
    """T212: pulse_buckets always has exactly 30 entries regardless of commit pattern."""
    repo = await create_repo(db_session, visibility="public")
    # Scatter commits across fixed, evenly spaced days in the window
    # (deterministic offsets, not random ones).
    for n in [0, 5, 10, 15, 20, 25]:
        await create_commit(db_session, repo.repo_id, timestamp=_days_ago(n))

    result = await enrich_repo_cards(db_session, [repo.repo_id])
    assert len(result[repo.repo_id].pulse_buckets) == _PULSE_DAYS


# ---------------------------------------------------------------------------
# T213: old commits excluded from pulse
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_t213_commits_older_than_30d_excluded_from_pulse(db_session: AsyncSession) -> None:
    """T213: commits older than 30 days do not appear in pulse_buckets."""
    repo = await create_repo(db_session, visibility="public")
    await create_commit(db_session, repo.repo_id, timestamp=_days_ago(31))
    await create_commit(db_session, repo.repo_id, timestamp=_days_ago(60))

    result = await enrich_repo_cards(db_session, [repo.repo_id])
    enc = result[repo.repo_id]

    # Both commits fall outside the window, so every bucket must be empty.
    total_counted = sum(b.count for b in enc.pulse_buckets)
    assert total_counted == 0


# ---------------------------------------------------------------------------
# T214–T215: None when no qualifying intel rows
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_t214_hottest_symbol_none_when_no_symbol_intel(db_session: AsyncSession) -> None:
    """T214: hottest_symbol is None when musehub_symbol_intel has no rows for repo."""
    repo = await create_repo(db_session, visibility="public")

    result = await enrich_repo_cards(db_session, [repo.repo_id])
    assert result[repo.repo_id].hottest_symbol is None


@pytest.mark.asyncio
async def test_t215_blast_leader_none_when_all_blast_zero(db_session: AsyncSession) -> None:
    """T215: blast_leader is None when all blast scores are zero."""
    repo = await create_repo(db_session, visibility="public")
    # Non-zero churn with zero blast: the row exists but must not qualify as a
    # blast leader.
    await _insert_symbol_intel(db_session, repo.repo_id, "src/a.py::fn", churn_30d=5, blast=0)

    result = await enrich_repo_cards(db_session, [repo.repo_id])
    assert result[repo.repo_id].blast_leader is None