"""
Tier 6 — Integrity tests for enrich_repo_cards().

Integrity tests verify structural invariants that must hold for every
enrichment result regardless of input shape — bucket count, field ranges,
type contracts, and cross-field consistency.

Test IDs
--------
T600 — pulse_buckets is always exactly _PULSE_DAYS entries
T601 — pulse bucket dates are strictly ascending with no gaps or duplicates
T602 — pulse bucket counts are always non-negative integers
T603 — pulse bucket h values are in range [0, _SPARKLINE_HEIGHT]
T604 — autonomy_pct is always in range [0, 100]
T605 — dead_count, error_count, warning_count are always non-negative
T606 — health_status is always one of {'clean', 'warn', 'risk'}
T607 — hottest_symbol.churn_30d is always > 0 (never a zero-churn symbol)
T608 — blast_leader.blast is always > 0 (never a zero-blast symbol)
T609 — result always contains exactly the requested repo_ids as keys
T610 — pulse bucket colors are all valid hex strings
"""
from __future__ import annotations

import re
from datetime import date, datetime, timedelta, timezone

import pytest
from sqlalchemy.ext.asyncio import AsyncSession

from musehub.db.musehub_intel_models import MusehubIntelBreakageMeta, MusehubIntelDead, MusehubSymbolIntel
from musehub.services.repo_card_enrichment import (
    _PULSE_DAYS,
    _SPARKLINE_HEIGHT,
    enrich_repo_cards,
)
from tests.factories import create_commit, create_repo


def _utc_now() -> datetime:
    """Return the current time as a timezone-aware UTC datetime."""
    return datetime.now(tz=timezone.utc)


# Matches a '#' followed by exactly six hex digits (either case).
_HEX_RE = re.compile(r"^#[0-9a-fA-F]{6}$")


# ---------------------------------------------------------------------------
# Shared fixture: one richly-populated repo and one empty repo
# ---------------------------------------------------------------------------

async def _seed_two_repos(db: AsyncSession) -> tuple[str, str]:
    """Create a populated repo and an empty repo, and return their ids.

    The "rich" repo receives five commits (one per day, going back from now),
    two symbol-intel rows, one dead-code row, and one breakage-meta row.
    The "empty" repo receives nothing beyond its own creation.

    Args:
        db: Async session used to create and persist the rows.

    Returns:
        ``(rich_repo_id, empty_repo_id)``.

    NOTE(review): the ``str`` element type assumes ``repo_id`` is a string
    key — confirm against the repo model.
    """
    rich = await create_repo(db, visibility="public")
    empty = await create_repo(db, visibility="public")

    # Populate rich repo with every signal type.
    # Take "now" once so all five commits are offset from the same instant.
    now = _utc_now()
    for days_ago in range(5):
        await create_commit(db, rich.repo_id, timestamp=now - timedelta(days=days_ago))

    db.add(MusehubSymbolIntel(
        repo_id=rich.repo_id, address="src/a.py::fast_fn", churn_30d=10, blast=50
    ))
    db.add(MusehubSymbolIntel(
        repo_id=rich.repo_id, address="src/b.py::slow_fn", churn_30d=2, blast=200
    ))
    db.add(MusehubIntelDead(
        repo_id=rich.repo_id, address="src/old.py::dead_fn",
        kind="function", confidence="high", ref="main"
    ))
    db.add(MusehubIntelBreakageMeta(
        repo_id=rich.repo_id, total_issues=1, error_count=0,
        warning_count=1, file_count=1, ref="main"
    ))
    await db.commit()
    return rich.repo_id, empty.repo_id


# ---------------------------------------------------------------------------
# T600 — exactly _PULSE_DAYS buckets
# ---------------------------------------------------------------------------

@pytest.mark.asyncio
async def test_t600_pulse_bucket_count_invariant(db_session: AsyncSession) -> None:
    """T600: pulse_buckets always has exactly _PULSE_DAYS entries."""
    rich_id, empty_id = await _seed_two_repos(db_session)
    result = await enrich_repo_cards(db_session, [rich_id, empty_id])
    for repo_id, enc in result.items():
        assert len(enc.pulse_buckets) == _PULSE_DAYS, (
            f"repo {repo_id}: expected {_PULSE_DAYS} buckets, got {len(enc.pulse_buckets)}"
        )


# ---------------------------------------------------------------------------
# T601 — strictly ascending dates, no gaps or duplicates
# ---------------------------------------------------------------------------

@pytest.mark.asyncio
async def test_t601_pulse_dates_strictly_ascending(db_session: AsyncSession) -> None:
    """T601: bucket dates are strictly ascending ISO strings with no gaps."""
    rich_id, empty_id = await _seed_two_repos(db_session)
    result = await enrich_repo_cards(db_session, [rich_id, empty_id])
    for repo_id, enc in result.items():
        dates = [date.fromisoformat(b.date) for b in enc.pulse_buckets]
        # Strictly ascending: sorted order plus uniqueness. These are checked
        # separately from the gap check so a failure names the exact problem.
        assert dates == sorted(dates), f"repo {repo_id}: dates not sorted"
        assert len(dates) == len(set(dates)), f"repo {repo_id}: duplicate dates"
        # No gaps: each consecutive pair differs by exactly 1 day
        for a, b in zip(dates, dates[1:]):
            assert (b - a).days == 1, f"repo {repo_id}: gap between {a} and {b}"


# ---------------------------------------------------------------------------
# T602 — non-negative counts
# ---------------------------------------------------------------------------

@pytest.mark.asyncio
async def test_t602_pulse_counts_non_negative(db_session: AsyncSession) -> None:
    """T602: every bucket.count is >= 0."""
    rich_id, empty_id = await _seed_two_repos(db_session)
    result = await enrich_repo_cards(db_session, [rich_id, empty_id])
    for repo_id, enc in result.items():
        for b in enc.pulse_buckets:
            assert b.count >= 0, f"repo {repo_id}: negative count {b.count} on {b.date}"


# ---------------------------------------------------------------------------
# T603 — h in [0, _SPARKLINE_HEIGHT]
# ---------------------------------------------------------------------------

@pytest.mark.asyncio
async def test_t603_pulse_h_within_range(db_session: AsyncSession) -> None:
    """T603: every bucket.h is in [0, _SPARKLINE_HEIGHT]."""
    rich_id, empty_id = await _seed_two_repos(db_session)
    result = await enrich_repo_cards(db_session, [rich_id, empty_id])
    for repo_id, enc in result.items():
        for b in enc.pulse_buckets:
            assert 0 <= b.h <= _SPARKLINE_HEIGHT, (
                f"repo {repo_id}: h={b.h} out of [0,{_SPARKLINE_HEIGHT}] on {b.date}"
            )


# ---------------------------------------------------------------------------
# T604 — autonomy_pct in [0, 100]
# ---------------------------------------------------------------------------

@pytest.mark.asyncio
async def test_t604_autonomy_pct_bounded(db_session: AsyncSession) -> None:
    """T604: autonomy_pct is always in [0, 100]."""
    rich_id, empty_id = await _seed_two_repos(db_session)
    result = await enrich_repo_cards(db_session, [rich_id, empty_id])
    for repo_id, enc in result.items():
        assert 0 <= enc.autonomy_pct <= 100, (
            f"repo {repo_id}: autonomy_pct={enc.autonomy_pct} out of bounds"
        )


# ---------------------------------------------------------------------------
# T605 — dead/error/warning counts non-negative
# ---------------------------------------------------------------------------

@pytest.mark.asyncio
async def test_t605_intel_counts_non_negative(db_session: AsyncSession) -> None:
    """T605: dead_count, error_count, and warning_count are always >= 0."""
    rich_id, empty_id = await _seed_two_repos(db_session)
    result = await enrich_repo_cards(db_session, [rich_id, empty_id])
    for repo_id, enc in result.items():
        assert enc.dead_count >= 0, (
            f"repo {repo_id}: negative dead_count={enc.dead_count}"
        )
        assert enc.error_count >= 0, (
            f"repo {repo_id}: negative error_count={enc.error_count}"
        )
        assert enc.warning_count >= 0, (
            f"repo {repo_id}: negative warning_count={enc.warning_count}"
        )


# ---------------------------------------------------------------------------
# T606 — health_status is a known literal
# ---------------------------------------------------------------------------

@pytest.mark.asyncio
async def test_t606_health_status_is_valid_literal(db_session: AsyncSession) -> None:
    """T606: health_status is always one of {'clean', 'warn', 'risk'}."""
    rich_id, empty_id = await _seed_two_repos(db_session)
    result = await enrich_repo_cards(db_session, [rich_id, empty_id])
    valid = {"clean", "warn", "risk"}
    for repo_id, enc in result.items():
        assert enc.health_status in valid, (
            f"repo {repo_id}: unexpected health_status={enc.health_status!r}"
        )


# ---------------------------------------------------------------------------
# T607 — hottest_symbol.churn_30d > 0
# ---------------------------------------------------------------------------

@pytest.mark.asyncio
async def test_t607_hottest_symbol_has_positive_churn(db_session: AsyncSession) -> None:
    """T607: when hottest_symbol is not None its churn_30d is > 0."""
    rich_id, _ = await _seed_two_repos(db_session)
    result = await enrich_repo_cards(db_session, [rich_id])
    enc = result[rich_id]
    # The invariant is conditional: a None hottest_symbol passes vacuously.
    if enc.hottest_symbol is not None:
        assert enc.hottest_symbol.churn_30d > 0, (
            f"hottest_symbol has churn_30d=0: {enc.hottest_symbol.address}"
        )


# ---------------------------------------------------------------------------
# T608 — blast_leader.blast > 0
# ---------------------------------------------------------------------------

@pytest.mark.asyncio
async def test_t608_blast_leader_has_positive_blast(db_session: AsyncSession) -> None:
    """T608: when blast_leader is not None its blast score is > 0."""
    rich_id, _ = await _seed_two_repos(db_session)
    result = await enrich_repo_cards(db_session, [rich_id])
    enc = result[rich_id]
    # The invariant is conditional: a None blast_leader passes vacuously.
    if enc.blast_leader is not None:
        assert enc.blast_leader.blast > 0, (
            f"blast_leader has blast=0: {enc.blast_leader.address}"
        )


# ---------------------------------------------------------------------------
# T609 — result keys match requested repo_ids exactly
# ---------------------------------------------------------------------------

@pytest.mark.asyncio
async def test_t609_result_keys_match_requested_ids(db_session: AsyncSession) -> None:
    """T609: the returned dict has exactly the requested repo_ids as keys."""
    repos = [await create_repo(db_session, visibility="public") for _ in range(10)]
    repo_ids = [r.repo_id for r in repos]
    result = await enrich_repo_cards(db_session, repo_ids)
    assert set(result) == set(repo_ids)


# ---------------------------------------------------------------------------
# T610 — bucket colors are valid hex strings
# ---------------------------------------------------------------------------

@pytest.mark.asyncio
async def test_t610_pulse_bucket_colors_are_valid_hex(db_session: AsyncSession) -> None:
    """T610: every bucket.color is a valid 6-digit hex color string (#RRGGBB)."""
    repo = await create_repo(db_session, visibility="public")
    result = await enrich_repo_cards(db_session, [repo.repo_id])
    for b in result[repo.repo_id].pulse_buckets:
        # fullmatch rather than match: with match, the pattern's '$' would
        # also accept a color followed by a trailing newline.
        assert _HEX_RE.fullmatch(b.color), f"invalid color {b.color!r} on {b.date}"