test_repo_card_integrity.py
python
sha256:ef10830ce231e0a20efcb0e2586cb879471247e916616e6fdd0d51df459e2595
fix: typing audit — 0 violations, 0 untyped defs across all…
Sonnet 4.6
minor
⚠ breaking
20 days ago
| 1 | """ |
| 2 | Tier 6 — Integrity tests for enrich_repo_cards(). |
| 3 | |
| 4 | Integrity tests verify structural invariants that must hold for every |
| 5 | enrichment result regardless of input shape — bucket count, field ranges, |
| 6 | type contracts, and cross-field consistency. |
| 7 | |
| 8 | Test IDs |
| 9 | -------- |
| 10 | T600 — pulse_buckets is always exactly _PULSE_DAYS entries |
| 11 | T601 — pulse bucket dates are strictly ascending with no gaps or duplicates |
| 12 | T602 — pulse bucket counts are always non-negative integers |
| 13 | T603 — pulse bucket h values are in range [0, _SPARKLINE_HEIGHT] |
| 14 | T604 — autonomy_pct is always in range [0, 100] |
| 15 | T605 — dead_count, error_count, warning_count are always non-negative |
| 16 | T606 — health_status is always one of {'clean', 'warn', 'risk'} |
| 17 | T607 — hottest_symbol.churn_30d is always > 0 (never a zero-churn symbol) |
| 18 | T608 — blast_leader.blast is always > 0 (never a zero-blast symbol) |
| 19 | T609 — result always contains exactly the requested repo_ids as keys |
| 20 | T610 — pulse bucket colors are all valid hex strings |
| 21 | """ |
| 22 | from __future__ import annotations |
| 23 | |
| 24 | import re |
| 25 | from datetime import datetime, timedelta, timezone |
| 26 | |
| 27 | import pytest |
| 28 | from sqlalchemy.ext.asyncio import AsyncSession |
| 29 | |
| 30 | from musehub.db.musehub_intel_models import MusehubIntelBreakageMeta, MusehubIntelDead, MusehubSymbolIntel |
| 31 | from musehub.services.repo_card_enrichment import ( |
| 32 | _PULSE_DAYS, |
| 33 | _SPARKLINE_HEIGHT, |
| 34 | enrich_repo_cards, |
| 35 | ) |
| 36 | from tests.factories import create_commit, create_repo |
| 37 | |
| 38 | |
def _utc_now() -> datetime:
    """Return the current moment as a timezone-aware datetime in UTC."""
    utc = timezone.utc
    return datetime.now(utc)
| 41 | |
| 42 | |
# Pattern for a "#" followed by exactly six hexadecimal digits; both upper- and
# lower-case digits are accepted.
# Note: when used with .match(), "$" also matches just before a trailing newline.
_HEX_RE = re.compile(r"^#[0-9a-fA-F]{6}$")
| 44 | |
| 45 | |
| 46 | # --------------------------------------------------------------------------- |
| 47 | # Shared seeding helper: one richly populated repo and one empty repo |
| 48 | # --------------------------------------------------------------------------- |
| 49 | |
async def _seed_two_repos(db: AsyncSession) -> tuple[str, str]:
    """Seed one populated ("rich") repo and one empty repo.

    The rich repo receives five commits (timestamps stepping back one day at
    a time from now), two symbol-intel rows, one dead-code row, and one
    breakage-meta row. The empty repo is created with no associated rows.

    Args:
        db: Session the rows are added to; it is committed before returning.

    Returns:
        ``(rich_repo_id, empty_repo_id)``.
    """
    # NOTE(review): repo_id is annotated as ``str`` — confirm against the
    # model returned by create_repo.
    rich = await create_repo(db, visibility="public")
    empty = await create_repo(db, visibility="public")

    # Populate rich repo with every signal type
    for i in range(5):
        # The created commit object itself is not needed, only its row.
        await create_commit(db, rich.repo_id, timestamp=_utc_now() - timedelta(days=i))
    db.add(MusehubSymbolIntel(
        repo_id=rich.repo_id, address="src/a.py::fast_fn", churn_30d=10, blast=50
    ))
    db.add(MusehubSymbolIntel(
        repo_id=rich.repo_id, address="src/b.py::slow_fn", churn_30d=2, blast=200
    ))
    db.add(MusehubIntelDead(
        repo_id=rich.repo_id, address="src/old.py::dead_fn",
        kind="function", confidence="high", ref="main"
    ))
    db.add(MusehubIntelBreakageMeta(
        repo_id=rich.repo_id, total_issues=1,
        error_count=0, warning_count=1, file_count=1, ref="main"
    ))
    await db.commit()
    return rich.repo_id, empty.repo_id
| 74 | |
| 75 | |
| 76 | # --------------------------------------------------------------------------- |
| 77 | # T600 — exactly _PULSE_DAYS buckets |
| 78 | # --------------------------------------------------------------------------- |
| 79 | |
@pytest.mark.asyncio
async def test_t600_pulse_bucket_count_invariant(db_session: AsyncSession) -> None:
    """T600: every enrichment carries exactly _PULSE_DAYS pulse buckets."""
    populated_id, bare_id = await _seed_two_repos(db_session)
    requested = [populated_id, bare_id]
    enriched = await enrich_repo_cards(db_session, requested)
    for repo_id, card in enriched.items():
        bucket_total = len(card.pulse_buckets)
        assert bucket_total == _PULSE_DAYS, (
            f"repo {repo_id}: expected {_PULSE_DAYS} buckets, got {bucket_total}"
        )
| 89 | |
| 90 | |
| 91 | # --------------------------------------------------------------------------- |
| 92 | # T601 — strictly ascending dates, no gaps or duplicates |
| 93 | # --------------------------------------------------------------------------- |
| 94 | |
@pytest.mark.asyncio
async def test_t601_pulse_dates_strictly_ascending(db_session: AsyncSession) -> None:
    """T601: bucket dates are strictly ascending ISO strings with no gaps.

    Each bucket's ``date`` is parsed with ``date.fromisoformat``; the parsed
    sequence must be sorted, free of duplicates, and advance by exactly one
    day between neighbours.
    """
    # Only ``date`` is needed here; re-importing ``timedelta`` locally was
    # unused and merely shadowed the module-level name.
    from datetime import date

    rich_id, empty_id = await _seed_two_repos(db_session)
    result = await enrich_repo_cards(db_session, [rich_id, empty_id])

    for repo_id, enc in result.items():
        dates = [date.fromisoformat(b.date) for b in enc.pulse_buckets]
        # Sorted + no duplicates together imply strictly ascending.
        assert dates == sorted(dates), f"repo {repo_id}: dates not sorted"
        assert len(dates) == len(set(dates)), f"repo {repo_id}: duplicate dates"
        # No gaps: each consecutive pair differs by exactly 1 day
        for a, b in zip(dates, dates[1:]):
            assert (b - a).days == 1, f"repo {repo_id}: gap between {a} and {b}"
| 110 | |
| 111 | |
| 112 | # --------------------------------------------------------------------------- |
| 113 | # T602 — non-negative counts |
| 114 | # --------------------------------------------------------------------------- |
| 115 | |
@pytest.mark.asyncio
async def test_t602_pulse_counts_non_negative(db_session: AsyncSession) -> None:
    """T602: no pulse bucket ever reports a count below zero."""
    populated_id, bare_id = await _seed_two_repos(db_session)
    enriched = await enrich_repo_cards(db_session, [populated_id, bare_id])
    for repo_id, card in enriched.items():
        for bucket in card.pulse_buckets:
            assert bucket.count >= 0, (
                f"repo {repo_id}: negative count {bucket.count} on {bucket.date}"
            )
| 124 | |
| 125 | |
| 126 | # --------------------------------------------------------------------------- |
| 127 | # T603 — h in [0, _SPARKLINE_HEIGHT] |
| 128 | # --------------------------------------------------------------------------- |
| 129 | |
@pytest.mark.asyncio
async def test_t603_pulse_h_within_range(db_session: AsyncSession) -> None:
    """T603: each bucket's h lies between 0 and _SPARKLINE_HEIGHT inclusive."""
    populated_id, bare_id = await _seed_two_repos(db_session)
    enriched = await enrich_repo_cards(db_session, [populated_id, bare_id])
    for repo_id, card in enriched.items():
        for bucket in card.pulse_buckets:
            within_bounds = 0 <= bucket.h <= _SPARKLINE_HEIGHT
            assert within_bounds, (
                f"repo {repo_id}: h={bucket.h} out of [0,{_SPARKLINE_HEIGHT}] on {bucket.date}"
            )
| 140 | |
| 141 | |
| 142 | # --------------------------------------------------------------------------- |
| 143 | # T604 — autonomy_pct in [0, 100] |
| 144 | # --------------------------------------------------------------------------- |
| 145 | |
@pytest.mark.asyncio
async def test_t604_autonomy_pct_bounded(db_session: AsyncSession) -> None:
    """T604: autonomy_pct never leaves the closed interval [0, 100]."""
    populated_id, bare_id = await _seed_two_repos(db_session)
    enriched = await enrich_repo_cards(db_session, [populated_id, bare_id])
    for repo_id, card in enriched.items():
        within_bounds = 0 <= card.autonomy_pct <= 100
        assert within_bounds, (
            f"repo {repo_id}: autonomy_pct={card.autonomy_pct} out of bounds"
        )
| 155 | |
| 156 | |
| 157 | # --------------------------------------------------------------------------- |
| 158 | # T605 — dead/error/warning counts non-negative |
| 159 | # --------------------------------------------------------------------------- |
| 160 | |
@pytest.mark.asyncio
async def test_t605_intel_counts_non_negative(db_session: AsyncSession) -> None:
    """T605: dead_count, error_count, and warning_count are always >= 0.

    Checked for both the populated and the empty seeded repo. Each failure
    message names the repo and the offending field, matching the other
    integrity tests in this module.
    """
    rich_id, empty_id = await _seed_two_repos(db_session)
    result = await enrich_repo_cards(db_session, [rich_id, empty_id])
    for repo_id, enc in result.items():
        for field in ("dead_count", "error_count", "warning_count"):
            value = getattr(enc, field)
            assert value >= 0, f"repo {repo_id}: {field}={value} is negative"
| 170 | |
| 171 | |
| 172 | # --------------------------------------------------------------------------- |
| 173 | # T606 — health_status is a known literal |
| 174 | # --------------------------------------------------------------------------- |
| 175 | |
@pytest.mark.asyncio
async def test_t606_health_status_is_valid_literal(db_session: AsyncSession) -> None:
    """T606: health_status must be 'clean', 'warn', or 'risk' — nothing else."""
    allowed = {"clean", "warn", "risk"}
    populated_id, bare_id = await _seed_two_repos(db_session)
    enriched = await enrich_repo_cards(db_session, [populated_id, bare_id])
    for repo_id, card in enriched.items():
        status = card.health_status
        assert status in allowed, (
            f"repo {repo_id}: unexpected health_status={status!r}"
        )
| 186 | |
| 187 | |
| 188 | # --------------------------------------------------------------------------- |
| 189 | # T607 — hottest_symbol.churn_30d > 0 |
| 190 | # --------------------------------------------------------------------------- |
| 191 | |
@pytest.mark.asyncio
async def test_t607_hottest_symbol_has_positive_churn(db_session: AsyncSession) -> None:
    """T607: a reported hottest_symbol always has churn_30d above zero."""
    populated_id, _ = await _seed_two_repos(db_session)
    enriched = await enrich_repo_cards(db_session, [populated_id])
    hottest = enriched[populated_id].hottest_symbol
    if hottest is None:
        # Nothing reported, so there is nothing to check.
        return
    assert hottest.churn_30d > 0, (
        f"hottest_symbol has churn_30d=0: {hottest.address}"
    )
| 202 | |
| 203 | |
| 204 | # --------------------------------------------------------------------------- |
| 205 | # T608 — blast_leader.blast > 0 |
| 206 | # --------------------------------------------------------------------------- |
| 207 | |
@pytest.mark.asyncio
async def test_t608_blast_leader_has_positive_blast(db_session: AsyncSession) -> None:
    """T608: a reported blast_leader always has a blast score above zero."""
    populated_id, _ = await _seed_two_repos(db_session)
    enriched = await enrich_repo_cards(db_session, [populated_id])
    leader = enriched[populated_id].blast_leader
    if leader is None:
        # Nothing reported, so there is nothing to check.
        return
    assert leader.blast > 0, (
        f"blast_leader has blast=0: {leader.address}"
    )
| 218 | |
| 219 | |
| 220 | # --------------------------------------------------------------------------- |
| 221 | # T609 — result keys match requested repo_ids exactly |
| 222 | # --------------------------------------------------------------------------- |
| 223 | |
@pytest.mark.asyncio
async def test_t609_result_keys_match_requested_ids(db_session: AsyncSession) -> None:
    """T609: enrichment returns one entry per requested repo_id and no others."""
    requested = []
    for _ in range(10):
        repo = await create_repo(db_session, visibility="public")
        requested.append(repo.repo_id)

    enriched = await enrich_repo_cards(db_session, requested)
    returned_keys = set(enriched)
    assert returned_keys == set(requested)
| 232 | |
| 233 | |
| 234 | # --------------------------------------------------------------------------- |
| 235 | # T610 — bucket colors are valid hex strings |
| 236 | # --------------------------------------------------------------------------- |
| 237 | |
@pytest.mark.asyncio
async def test_t610_pulse_bucket_colors_are_valid_hex(db_session: AsyncSession) -> None:
    """T610: every bucket.color is a '#' followed by exactly six hex digits.

    Hex digits of either case are accepted, as defined by ``_HEX_RE``. The
    check runs against a freshly created repo with no seeded data.
    """
    repo = await create_repo(db_session, visibility="public")
    result = await enrich_repo_cards(db_session, [repo.repo_id])
    for b in result[repo.repo_id].pulse_buckets:
        # fullmatch (not match): with match(), the pattern's "$" would also
        # accept a color string that ends in a newline.
        assert _HEX_RE.fullmatch(b.color), f"invalid color {b.color!r} on {b.date}"
File History
1 commit
sha256:ef10830ce231e0a20efcb0e2586cb879471247e916616e6fdd0d51df459e2595
fix: typing audit — 0 violations, 0 untyped defs across all…
Sonnet 4.6
minor
⚠
20 days ago