gabriel / musehub public
test_mist_phase1_intel_pipeline.py python
378 lines 13.6 KB
Raw
sha256:3c58668648c7323bb9f5c6881cfe6a3f14fc93fcb73b537d253732952a5bf8bf chore: bump version to 0.2.0rc12 Sonnet 4.6 patch 9 days ago
1 """Phase 1 TDD: Mist domain intel pipeline — job dispatch + MistProvider.
2
3 Tests are written RED first. Run them before touching musehub_intel_providers.py
4 to confirm they fail for the right reason, then implement to make them green.
5
6 Coverage:
7 1. job_types_for_push("mist") includes "intel.mist"
8 2. MistProvider.compute extracts anchors and persists intel results
9 3. MistProvider.compute handles binary / anchor-free artifacts gracefully
10 4. Regression: code and midi dispatch are unaffected by the mist branch
11 5. MistProvider is registered in _PROVIDER_REGISTRY under "intel.mist"
12 """
13 from __future__ import annotations
14
15 import secrets
16 from datetime import datetime, timezone
17
18 import pytest
19 from sqlalchemy import select
20 from sqlalchemy.ext.asyncio import AsyncSession
21
22 from musehub.db.musehub_intel_models import MusehubIntelResult
23 from musehub.db.musehub_repo_models import MusehubMist, MusehubRepo
24 from musehub.core.genesis import compute_identity_id, compute_repo_id
25
26
27 # ---------------------------------------------------------------------------
28 # Helpers
29 # ---------------------------------------------------------------------------
30
def _now() -> datetime:
    """Return the current instant as a timezone-aware datetime in UTC."""
    utc = timezone.utc
    return datetime.now(utc)
33
34
def _uid() -> str:
    """Return a random identifier: 16 random bytes rendered as 32 hex characters."""
    raw = secrets.token_bytes(16)
    return raw.hex()
37
38
def _repo_id(owner: str, slug: str) -> str:
    """Compute a repo id for *owner* / *slug* with ``"mist"`` as the domain argument.

    The owner string is encoded and passed through ``compute_identity_id``;
    the timestamp argument is the ISO-8601 form of the current UTC time, so
    two calls with the same arguments are given different timestamps.
    """
    owner_identity = compute_identity_id(owner.encode())
    timestamp = _now().isoformat()
    return compute_repo_id(owner_identity, slug, "mist", timestamp)
46
47
async def _seed_mist(
    session: AsyncSession,
    *,
    owner: str = "testuser",
    filename: str = "snippet.py",
    content: str = "def hello():\n return 'world'\n",
    artifact_type: str = "code",
    symbol_anchors: list[str] | None = None,
    mist_id: str | None = None,
) -> tuple[MusehubRepo, MusehubMist]:
    """Insert a ``MusehubRepo`` with ``domain_id="mist"`` and one ``MusehubMist`` linked to it.

    Args:
        session: Async session used to add, flush, commit and refresh the rows.
        owner: Owner handle stored on both rows; its encoded form feeds
            ``compute_identity_id``.
        filename: Stored on the mist row. A ``.py`` suffix sets ``language``
            to ``"python"``; any other name leaves it as ``""``.
        content: Stored on the mist row; ``size_bytes`` is its encoded length.
        artifact_type: Stored on the mist row unchanged.
        symbol_anchors: Stored on the mist row; a falsy value becomes ``[]``.
        mist_id: When truthy, used as the repo slug/name *and* as the mist id.
            Otherwise a random ``mist-<hex>`` slug and a random ``Abc<hex>``
            mist id are generated independently.

    Returns:
        ``(repo, mist)`` after the commit, both refreshed from the session.
    """
    if mist_id:
        repo_slug = mist_id
    else:
        repo_slug = f"mist-{secrets.token_hex(4)}"
    identity = compute_identity_id(owner.encode())
    stamp = _now()
    new_repo_id = compute_repo_id(identity, repo_slug, "mist", stamp.isoformat())

    repo_row = MusehubRepo(
        repo_id=new_repo_id,
        name=repo_slug,
        owner=owner,
        slug=repo_slug,
        visibility="public",
        owner_user_id=identity,
        domain_id="mist",
        description="test mist repo",
        tags=[],
        created_at=stamp,
    )
    session.add(repo_row)
    # Flush the repo before the mist row that references its repo_id is built.
    await session.flush()

    if mist_id:
        resolved_mist_id = mist_id
    else:
        resolved_mist_id = f"Abc{secrets.token_hex(5)[:9]}"

    if filename.endswith(".py"):
        detected_language = "python"
    else:
        detected_language = ""

    mist_row = MusehubMist(
        mist_id=resolved_mist_id,
        repo_id=new_repo_id,
        owner=owner,
        filename=filename,
        content=content,
        artifact_type=artifact_type,
        language=detected_language,
        size_bytes=len(content.encode()),
        symbol_anchors=symbol_anchors if symbol_anchors else [],
    )
    session.add(mist_row)
    await session.commit()

    # Reload both rows from the session after the commit.
    await session.refresh(repo_row)
    await session.refresh(mist_row)
    return repo_row, mist_row
96
97
98 # ---------------------------------------------------------------------------
99 # 1. job_types_for_push dispatch
100 # ---------------------------------------------------------------------------
101
class TestJobTypesForPush:
    """Check which job types ``job_types_for_push`` returns for each domain."""

    @staticmethod
    def _dispatch(domain: str | None):
        """Import ``job_types_for_push`` at call time and invoke it for *domain*."""
        from musehub.services.musehub_intel_providers import job_types_for_push

        return job_types_for_push(domain)

    def test_mist_domain_dispatches_intel_mist(self) -> None:
        """The mist domain must schedule the ``intel.mist`` job."""
        types = self._dispatch("mist")
        assert "intel.mist" in types, (
            f"job_types_for_push('mist') must include 'intel.mist'; got {types}"
        )

    def test_mist_domain_always_includes_structural(self) -> None:
        """The mist domain must schedule ``intel.structural``."""
        scheduled = self._dispatch("mist")
        assert "intel.structural" in scheduled

    def test_mist_domain_always_includes_gc(self) -> None:
        """The mist domain must schedule ``gc``."""
        scheduled = self._dispatch("mist")
        assert "gc" in scheduled

    def test_mist_domain_does_not_include_intel_code(self) -> None:
        """The mist domain must not schedule ``intel.code``."""
        scheduled = self._dispatch("mist")
        assert "intel.code" not in scheduled, (
            "mist domain must not trigger code intel job"
        )

    def test_code_domain_unaffected(self) -> None:
        """The code domain schedules ``intel.code`` and not ``intel.mist``."""
        scheduled = self._dispatch("code")
        assert "intel.code" in scheduled
        assert "intel.mist" not in scheduled

    def test_midi_domain_unaffected(self) -> None:
        """The midi domain schedules ``intel.midi`` and not ``intel.mist``."""
        scheduled = self._dispatch("midi")
        assert "intel.midi" in scheduled
        assert "intel.mist" not in scheduled

    def test_none_domain_defaults_to_code(self) -> None:
        """A ``None`` domain schedules ``intel.code`` and not ``intel.mist``."""
        scheduled = self._dispatch(None)
        assert "intel.code" in scheduled
        assert "intel.mist" not in scheduled
151
152
153 # ---------------------------------------------------------------------------
154 # 2. _PROVIDER_REGISTRY contains "intel.mist"
155 # ---------------------------------------------------------------------------
156
class TestProviderRegistry:
    """Check the ``"intel.mist"`` entry of ``_PROVIDER_REGISTRY``."""

    def test_intel_mist_is_registered(self) -> None:
        """``_PROVIDER_REGISTRY`` must contain the ``"intel.mist"`` key."""
        from musehub.services.musehub_intel_providers import (
            _PROVIDER_REGISTRY as registry,
        )

        assert "intel.mist" in registry, (
            "'intel.mist' must be in _PROVIDER_REGISTRY"
        )

    def test_intel_mist_satisfies_protocol(self) -> None:
        """The registered object must pass ``isinstance(..., IntelProvider)``."""
        from musehub.services.musehub_intel_providers import (
            _PROVIDER_REGISTRY as registry,
            IntelProvider as protocol,
        )

        registered = registry["intel.mist"]
        assert isinstance(registered, protocol), (
            "MistProvider must satisfy the IntelProvider protocol"
        )
175
176
177 # ---------------------------------------------------------------------------
178 # 3. MistProvider.compute — anchor extraction
179 # ---------------------------------------------------------------------------
180
class TestMistProviderCompute:
    """Run the provider registered as ``"intel.mist"`` against seeded mist repos.

    NOTE(review): the ``db.MusehubIdentity`` annotations name a ``db`` module
    that the visible imports of this file do not provide. They stay unevaluated
    strings under ``from __future__ import annotations`` — confirm the
    intended import path.
    """

    @staticmethod
    def _mist_provider():
        """Fetch the ``"intel.mist"`` entry of ``_PROVIDER_REGISTRY`` at call time."""
        from musehub.services.musehub_intel_providers import _PROVIDER_REGISTRY

        return _PROVIDER_REGISTRY["intel.mist"]

    @pytest.mark.asyncio
    async def test_extracts_anchors_for_python_artifact(
        self, db_session: AsyncSession, test_user: db.MusehubIdentity
    ) -> None:
        """Provider returns one ``mist.anchors`` result naming both seeded functions."""
        provider = self._mist_provider()
        source = "def add(a, b):\n return a + b\n\ndef subtract(a, b):\n return a - b\n"
        repo, mist = await _seed_mist(
            db_session,
            owner=test_user.handle,
            filename="utils.py",
            content=source,
            artifact_type="code",
        )

        results = await provider.compute(db_session, repo.repo_id, "HEAD", {})

        assert len(results) == 1
        intel_type, data = results[0]
        assert intel_type == "mist.anchors"

        # Payload fields that must echo the seeded mist row.
        expected_fields = {
            "mist_id": mist.mist_id,
            "filename": "utils.py",
            "artifact_type": "code",
        }
        for key, expected in expected_fields.items():
            assert data[key] == expected

        anchors: list[str] = data["symbol_anchors"]
        assert any("add" in a for a in anchors), f"Expected 'add' anchor; got {anchors}"
        assert any("subtract" in a for a in anchors), f"Expected 'subtract' anchor; got {anchors}"
        assert data["anchor_count"] == len(anchors)

    @pytest.mark.asyncio
    async def test_anchor_count_matches_symbol_anchors_length(
        self, db_session: AsyncSession, test_user: db.MusehubIdentity
    ) -> None:
        """``anchor_count`` equals the length of ``symbol_anchors`` for a class with two methods."""
        provider = self._mist_provider()
        source = (
            "class Calc:\n"
            " def mul(self, a, b): return a * b\n"
            " def div(self, a, b): return a / b\n"
        )
        repo, _ = await _seed_mist(
            db_session,
            owner=test_user.handle,
            filename="calc.py",
            content=source,
        )

        results = await provider.compute(db_session, repo.repo_id, "HEAD", {})
        payload = results[0][1]
        assert payload["anchor_count"] == len(payload["symbol_anchors"])

    @pytest.mark.asyncio
    async def test_binary_artifact_produces_zero_anchors(
        self, db_session: AsyncSession, test_user: db.MusehubIdentity
    ) -> None:
        """Binary content (e.g. base64) with no parsable symbols → zero anchors, no crash."""
        provider = self._mist_provider()
        # Base64-encoded PNG header — not parseable as Python/JS/TS
        binary_content = "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR42mNk"
        repo, _ = await _seed_mist(
            db_session,
            owner=test_user.handle,
            filename="image.png",
            content=binary_content,
            artifact_type="image",
        )

        results = await provider.compute(db_session, repo.repo_id, "HEAD", {})

        assert len(results) == 1
        payload = results[0][1]
        assert payload["symbol_anchors"] == []
        assert payload["anchor_count"] == 0

    @pytest.mark.asyncio
    async def test_no_mist_for_repo_returns_empty(
        self, db_session: AsyncSession, test_user: db.MusehubIdentity
    ) -> None:
        """A repo with no mist row (edge case) → empty results, no crash."""
        provider = self._mist_provider()
        handle = test_user.handle
        owner_id = compute_identity_id(handle.encode())
        created_at = _now()
        orphan_slug = "orphan-repo"
        repo_id = compute_repo_id(owner_id, orphan_slug, "mist", created_at.isoformat())

        # Only the repo row is stored; no MusehubMist is created for it.
        orphan = MusehubRepo(
            repo_id=repo_id,
            name=orphan_slug,
            owner=handle,
            slug=orphan_slug,
            visibility="public",
            owner_user_id=owner_id,
            domain_id="mist",
            description="",
            tags=[],
            created_at=created_at,
        )
        db_session.add(orphan)
        await db_session.commit()

        results = await provider.compute(db_session, repo_id, "HEAD", {})
        assert results == []

    @pytest.mark.asyncio
    async def test_updates_symbol_anchors_on_mist_row(
        self, db_session: AsyncSession, test_user: db.MusehubIdentity
    ) -> None:
        """Provider refreshes mist.symbol_anchors in the DB if they were stale."""
        provider = self._mist_provider()
        # Seed with deliberately empty symbol_anchors
        repo, mist = await _seed_mist(
            db_session,
            owner=test_user.handle,
            filename="module.py",
            content="def process(data):\n return data\n",
            symbol_anchors=[],  # stale — will be refreshed by provider
        )

        await provider.compute(db_session, repo.repo_id, "HEAD", {})
        await db_session.commit()

        # Reload the row so the assertion reads the stored anchors.
        await db_session.refresh(mist)
        assert any("process" in a for a in mist.symbol_anchors), (
            f"mist.symbol_anchors should be refreshed; got {mist.symbol_anchors}"
        )

    @pytest.mark.asyncio
    async def test_results_persisted_via_persist_intel_results(
        self, db_session: AsyncSession, test_user: db.MusehubIdentity
    ) -> None:
        """Full pipeline: compute → persist_intel_results → row in musehub_intel_results."""
        import json

        from musehub.services.musehub_intel_providers import persist_intel_results

        provider = self._mist_provider()
        repo, _ = await _seed_mist(
            db_session,
            owner=test_user.handle,
            filename="api.py",
            content="async def handle(request):\n pass\n",
        )

        results = await provider.compute(db_session, repo.repo_id, "HEAD", {})
        await persist_intel_results(db_session, repo.repo_id, "HEAD", results)
        await db_session.commit()

        lookup = select(MusehubIntelResult).where(
            MusehubIntelResult.repo_id == repo.repo_id,
            MusehubIntelResult.intel_type == "mist.anchors",
        )
        outcome = await db_session.execute(lookup)
        row = outcome.scalar_one_or_none()

        assert row is not None, "intel result row must exist after persist_intel_results"
        data = json.loads(row.data_json)
        assert data["mist_id"] is not None
        assert "symbol_anchors" in data

    @pytest.mark.asyncio
    async def test_persist_is_idempotent(
        self, db_session: AsyncSession, test_user: db.MusehubIdentity
    ) -> None:
        """Running compute + persist twice for the same repo produces exactly one row."""
        from sqlalchemy import func

        from musehub.services.musehub_intel_providers import persist_intel_results

        provider = self._mist_provider()
        repo, _ = await _seed_mist(
            db_session,
            owner=test_user.handle,
            filename="idempotent.py",
            content="def noop(): pass\n",
        )

        passes = 0
        while passes < 2:
            results = await provider.compute(db_session, repo.repo_id, "HEAD", {})
            await persist_intel_results(db_session, repo.repo_id, "HEAD", results)
            await db_session.commit()
            passes += 1

        counting = (
            select(func.count())
            .select_from(MusehubIntelResult)
            .where(
                MusehubIntelResult.repo_id == repo.repo_id,
                MusehubIntelResult.intel_type == "mist.anchors",
            )
        )
        outcome = await db_session.execute(counting)
        count = outcome.scalar_one()

        assert count == 1, f"Idempotent upsert must produce exactly 1 row; got {count}"
File History 1 commit
sha256:35d76015db2541686c33edd44343ea2d9f751325b4a5556cc9c4c9c0f84edbbe chore: bump version to 0.2.0rc12 Sonnet 4.6 patch 7 days ago