gabriel / musehub public
test_repo_card_integrity.py python
244 lines 10.3 KB
Raw
sha256:ef10830ce231e0a20efcb0e2586cb879471247e916616e6fdd0d51df459e2595 fix: typing audit — 0 violations, 0 untyped defs across all… Sonnet 4.6 minor ⚠ breaking 20 days ago
1 """
2 Tier 6 — Integrity tests for enrich_repo_cards().
3
4 Integrity tests verify structural invariants that must hold for every
5 enrichment result regardless of input shape — bucket count, field ranges,
6 type contracts, and cross-field consistency.
7
8 Test IDs
9 --------
10 T600 — pulse_buckets is always exactly _PULSE_DAYS entries
11 T601 — pulse bucket dates are strictly ascending with no gaps or duplicates
12 T602 — pulse bucket counts are always non-negative integers
13 T603 — pulse bucket h values are in range [0, _SPARKLINE_HEIGHT]
14 T604 — autonomy_pct is always in range [0, 100]
15 T605 — dead_count, error_count, warning_count are always non-negative
16 T606 — health_status is always one of {'clean', 'warn', 'risk'}
17 T607 — hottest_symbol.churn_30d is always > 0 (never a zero-churn symbol)
18 T608 — blast_leader.blast is always > 0 (never a zero-blast symbol)
19 T609 — result always contains exactly the requested repo_ids as keys
20 T610 — pulse bucket colors are all valid hex strings
21 """
22 from __future__ import annotations
23
24 import re
25 from datetime import datetime, timedelta, timezone
26
27 import pytest
28 from sqlalchemy.ext.asyncio import AsyncSession
29
30 from musehub.db.musehub_intel_models import MusehubIntelBreakageMeta, MusehubIntelDead, MusehubSymbolIntel
31 from musehub.services.repo_card_enrichment import (
32 _PULSE_DAYS,
33 _SPARKLINE_HEIGHT,
34 enrich_repo_cards,
35 )
36 from tests.factories import create_commit, create_repo
37
38
def _utc_now() -> datetime:
    """Return the current instant as a timezone-aware datetime in UTC."""
    utc = timezone.utc
    return datetime.now(utc)
41
42
# Whole-string match for a colour literal: '#' followed by exactly six
# hexadecimal digits (upper- or lower-case).
_HEX_RE = re.compile(
    r"^#[0-9a-fA-F]{6}$"
)
44
45
46 # ---------------------------------------------------------------------------
47 # Shared fixture: one richly-populated repo and one empty repo
48 # ---------------------------------------------------------------------------
49
async def _seed_two_repos(db: AsyncSession) -> tuple[str, str]:
    """Seed one richly-populated repo and one empty repo.

    Both repos are created with ``visibility="public"``.  The "rich" repo
    receives five commits (one per day, going back from now), two
    ``MusehubSymbolIntel`` rows, one ``MusehubIntelDead`` row and one
    ``MusehubIntelBreakageMeta`` row.  The "empty" repo receives nothing.
    The session is committed before returning.

    Args:
        db: Async session used for all inserts and the final commit.

    Returns:
        ``(rich_repo_id, empty_repo_id)``.
    """
    # NOTE(review): the return annotation assumes ``repo_id`` is a ``str`` —
    # confirm against the repo model returned by ``create_repo``.
    rich = await create_repo(db, visibility="public")
    empty = await create_repo(db, visibility="public")

    # Populate rich repo with every signal type.
    # Read the clock once so the commits are spaced exactly one day apart.
    now = _utc_now()
    for days_ago in range(5):
        await create_commit(
            db, rich.repo_id, timestamp=now - timedelta(days=days_ago)
        )
    db.add(MusehubSymbolIntel(
        repo_id=rich.repo_id, address="src/a.py::fast_fn", churn_30d=10, blast=50
    ))
    db.add(MusehubSymbolIntel(
        repo_id=rich.repo_id, address="src/b.py::slow_fn", churn_30d=2, blast=200
    ))
    db.add(MusehubIntelDead(
        repo_id=rich.repo_id, address="src/old.py::dead_fn",
        kind="function", confidence="high", ref="main"
    ))
    db.add(MusehubIntelBreakageMeta(
        repo_id=rich.repo_id, total_issues=1,
        error_count=0, warning_count=1, file_count=1, ref="main"
    ))
    await db.commit()
    return rich.repo_id, empty.repo_id
74
75
76 # ---------------------------------------------------------------------------
77 # T600 — exactly _PULSE_DAYS buckets
78 # ---------------------------------------------------------------------------
79
@pytest.mark.asyncio
async def test_t600_pulse_bucket_count_invariant(db_session: AsyncSession) -> None:
    """T600: each enrichment carries exactly _PULSE_DAYS pulse buckets.

    Exercised against both repos produced by ``_seed_two_repos``.
    """
    seeded_ids = await _seed_two_repos(db_session)
    result = await enrich_repo_cards(db_session, list(seeded_ids))
    for repo_id in result:
        enc = result[repo_id]
        assert len(enc.pulse_buckets) == _PULSE_DAYS, (
            f"repo {repo_id}: expected {_PULSE_DAYS} buckets, got {len(enc.pulse_buckets)}"
        )
89
90
91 # ---------------------------------------------------------------------------
92 # T601 — strictly ascending dates, no gaps or duplicates
93 # ---------------------------------------------------------------------------
94
@pytest.mark.asyncio
async def test_t601_pulse_dates_strictly_ascending(db_session: AsyncSession) -> None:
    """T601: bucket dates are strictly ascending ISO strings with no gaps.

    For every enrichment returned for the two seeded repos, each bucket's
    ``date`` is parsed with ``date.fromisoformat`` and the resulting sequence
    must be sorted, free of duplicates, and advance by exactly one day
    between neighbours.
    """
    # Only `date` is needed here; it is not among the module-level imports.
    from datetime import date

    rich_id, empty_id = await _seed_two_repos(db_session)
    result = await enrich_repo_cards(db_session, [rich_id, empty_id])

    for repo_id, enc in result.items():
        dates = [date.fromisoformat(bucket.date) for bucket in enc.pulse_buckets]
        # Strictly ascending
        assert dates == sorted(dates), f"repo {repo_id}: dates not sorted"
        assert len(dates) == len(set(dates)), f"repo {repo_id}: duplicate dates"
        # No gaps: each consecutive pair differs by exactly 1 day
        for a, b in zip(dates, dates[1:]):
            assert (b - a).days == 1, f"repo {repo_id}: gap between {a} and {b}"
110
111
112 # ---------------------------------------------------------------------------
113 # T602 — non-negative counts
114 # ---------------------------------------------------------------------------
115
@pytest.mark.asyncio
async def test_t602_pulse_counts_non_negative(db_session: AsyncSession) -> None:
    """T602: no pulse bucket ever reports a count below zero."""
    rich_id, empty_id = await _seed_two_repos(db_session)
    result = await enrich_repo_cards(db_session, [rich_id, empty_id])
    # Flatten (repo, bucket) pairs so a single loop covers every bucket.
    bucket_pairs = (
        (repo_id, b)
        for repo_id, enc in result.items()
        for b in enc.pulse_buckets
    )
    for repo_id, b in bucket_pairs:
        assert b.count >= 0, f"repo {repo_id}: negative count {b.count} on {b.date}"
124
125
126 # ---------------------------------------------------------------------------
127 # T603 — h in [0, _SPARKLINE_HEIGHT]
128 # ---------------------------------------------------------------------------
129
@pytest.mark.asyncio
async def test_t603_pulse_h_within_range(db_session: AsyncSession) -> None:
    """T603: each bucket's h lies between 0 and _SPARKLINE_HEIGHT inclusive."""
    seeded_ids = await _seed_two_repos(db_session)
    result = await enrich_repo_cards(db_session, list(seeded_ids))
    for repo_id in result:
        for b in result[repo_id].pulse_buckets:
            within_range = 0 <= b.h <= _SPARKLINE_HEIGHT
            assert within_range, (
                f"repo {repo_id}: h={b.h} out of [0,{_SPARKLINE_HEIGHT}] on {b.date}"
            )
140
141
142 # ---------------------------------------------------------------------------
143 # T604 — autonomy_pct in [0, 100]
144 # ---------------------------------------------------------------------------
145
@pytest.mark.asyncio
async def test_t604_autonomy_pct_bounded(db_session: AsyncSession) -> None:
    """T604: autonomy_pct never falls outside the inclusive range 0..100."""
    seeded_ids = await _seed_two_repos(db_session)
    result = await enrich_repo_cards(db_session, list(seeded_ids))
    for repo_id in result:
        enc = result[repo_id]
        within_bounds = 0 <= enc.autonomy_pct <= 100
        assert within_bounds, (
            f"repo {repo_id}: autonomy_pct={enc.autonomy_pct} out of bounds"
        )
155
156
157 # ---------------------------------------------------------------------------
158 # T605 — dead/error/warning counts non-negative
159 # ---------------------------------------------------------------------------
160
@pytest.mark.asyncio
async def test_t605_intel_counts_non_negative(db_session: AsyncSession) -> None:
    """T605: dead_count, error_count, and warning_count are always >= 0.

    Checked on every enrichment returned for the two seeded repos.  Each
    assertion names the repo and the offending value so a failure can be
    traced without re-running under a debugger.
    """
    rich_id, empty_id = await _seed_two_repos(db_session)
    result = await enrich_repo_cards(db_session, [rich_id, empty_id])
    for repo_id, enc in result.items():
        assert enc.dead_count >= 0, (
            f"repo {repo_id}: negative dead_count {enc.dead_count}"
        )
        assert enc.error_count >= 0, (
            f"repo {repo_id}: negative error_count {enc.error_count}"
        )
        assert enc.warning_count >= 0, (
            f"repo {repo_id}: negative warning_count {enc.warning_count}"
        )
170
171
172 # ---------------------------------------------------------------------------
173 # T606 — health_status is a known literal
174 # ---------------------------------------------------------------------------
175
@pytest.mark.asyncio
async def test_t606_health_status_is_valid_literal(db_session: AsyncSession) -> None:
    """T606: health_status is restricted to 'clean', 'warn', or 'risk'."""
    valid = frozenset(("clean", "warn", "risk"))
    seeded_ids = await _seed_two_repos(db_session)
    result = await enrich_repo_cards(db_session, list(seeded_ids))
    for repo_id in result:
        enc = result[repo_id]
        assert enc.health_status in valid, (
            f"repo {repo_id}: unexpected health_status={enc.health_status!r}"
        )
186
187
188 # ---------------------------------------------------------------------------
189 # T607 — hottest_symbol.churn_30d > 0
190 # ---------------------------------------------------------------------------
191
@pytest.mark.asyncio
async def test_t607_hottest_symbol_has_positive_churn(db_session: AsyncSession) -> None:
    """T607: a reported hottest_symbol always has churn_30d above zero.

    Nothing is asserted when the enrichment has no hottest_symbol.
    """
    seeded_ids = await _seed_two_repos(db_session)
    rich_id = seeded_ids[0]
    enc = (await enrich_repo_cards(db_session, [rich_id]))[rich_id]
    if enc.hottest_symbol is None:
        return
    assert enc.hottest_symbol.churn_30d > 0, (
        f"hottest_symbol has churn_30d=0: {enc.hottest_symbol.address}"
    )
202
203
204 # ---------------------------------------------------------------------------
205 # T608 — blast_leader.blast > 0
206 # ---------------------------------------------------------------------------
207
@pytest.mark.asyncio
async def test_t608_blast_leader_has_positive_blast(db_session: AsyncSession) -> None:
    """T608: a reported blast_leader always has a blast score above zero.

    Nothing is asserted when the enrichment has no blast_leader.
    """
    rich_id, _empty_id = await _seed_two_repos(db_session)
    cards = await enrich_repo_cards(db_session, [rich_id])
    enc = cards[rich_id]
    if enc.blast_leader is None:
        return
    assert enc.blast_leader.blast > 0, (
        f"blast_leader has blast=0: {enc.blast_leader.address}"
    )
218
219
220 # ---------------------------------------------------------------------------
221 # T609 — result keys match requested repo_ids exactly
222 # ---------------------------------------------------------------------------
223
@pytest.mark.asyncio
async def test_t609_result_keys_match_requested_ids(db_session: AsyncSession) -> None:
    """T609: enrichment keys are exactly the repo_ids that were requested.

    Ten public repos are created and all ten ids are passed in one call.
    """
    repos = []
    for _ in range(10):
        repos.append(await create_repo(db_session, visibility="public"))
    repo_ids = [created.repo_id for created in repos]

    result = await enrich_repo_cards(db_session, repo_ids)
    returned_keys = set(result)
    assert returned_keys == set(repo_ids)
232
233
234 # ---------------------------------------------------------------------------
235 # T610 — bucket colors are valid hex strings
236 # ---------------------------------------------------------------------------
237
@pytest.mark.asyncio
async def test_t610_pulse_bucket_colors_are_valid_hex(db_session: AsyncSession) -> None:
    """T610: every bucket.color matches _HEX_RE ('#' plus six hex digits).

    Run against a freshly created public repo with no seeded data.
    """
    repo = await create_repo(db_session, visibility="public")
    result = await enrich_repo_cards(db_session, [repo.repo_id])
    buckets = result[repo.repo_id].pulse_buckets
    for b in buckets:
        assert _HEX_RE.match(b.color), f"invalid color {b.color!r} on {b.date}"
File History 1 commit
sha256:ef10830ce231e0a20efcb0e2586cb879471247e916616e6fdd0d51df459e2595 fix: typing audit — 0 violations, 0 untyped defs across all… Sonnet 4.6 minor 20 days ago