gabriel / musehub public
test_intel_type.py python
644 lines 26.7 KB
Raw
sha256:ef10830ce231e0a20efcb0e2586cb879471247e916616e6fdd0d51df459e2595 fix: typing audit — 0 violations, 0 untyped defs across all… Sonnet 4.6 minor ⚠ breaking 21 days ago
1 """Type Health dashboard — full 7-tier test suite (issue #18).
2
3 Tests are written TDD-first: all tests in this file must be RED before
4 Phase 3–5 implementation begins, then GREEN after.
5
6 Tiers:
7 T01–T03 Layer T1 — DB extension (return_annotation column)
8 T04–T05 Layer T2 — Provider batch performance
9 T06–T14 Layer T3 — Route (unit / integration)
10 T15–T19 Layer T4 — E2E (HTML body assertions)
11 T20–T22 Layer T5 — State integrity
12 T23–T25 Layer T6 — Performance
13 T26–T30 Layer T7 — Security
14 """
15 from __future__ import annotations
16
17 import time
18 from unittest.mock import AsyncMock, patch
19
20 import typing
21
22 import pytest
23 import pytest_asyncio
24 import sqlalchemy as sa
25 from httpx import AsyncClient
26 from sqlalchemy.engine import CursorResult
27 from sqlalchemy.dialects.postgresql import insert as pg_insert
28 from sqlalchemy.ext.asyncio import AsyncSession
29
30 from musehub.db.musehub_intel_models import MusehubIntelType
31 from musehub.db.musehub_repo_models import MusehubRepo
32 from tests.factories import create_repo
33 from muse.core.types import long_id
34
35 _REF = long_id("a" * 64)
36
37
38 # ---------------------------------------------------------------------------
39 # Helpers
40 # ---------------------------------------------------------------------------
41
async def _insert_type_row(
    session: AsyncSession,
    repo_id: str,
    address: str,
    kind: str = "function",
    type_score: float = 1.0,
    return_is_any: bool = False,
    params_total: int = 2,
    params_annotated: int = 2,
    params_with_any: int = 0,
    return_annotation: str | None = "str",
) -> None:
    """Upsert a single ``MusehubIntelType`` row keyed on ``(repo_id, address)``.

    When a row with the same key already exists, only ``type_score`` and
    ``return_annotation`` are overwritten; every other column keeps the value
    written by the first insert.  The statement is executed on *session* but
    not committed — the caller commits.
    """
    new_row = {
        "repo_id": repo_id,
        "address": address,
        "kind": kind,
        "type_score": type_score,
        "return_is_any": return_is_any,
        "params_total": params_total,
        "params_annotated": params_annotated,
        "params_with_any": params_with_any,
        "return_annotation": return_annotation,
        "ref": _REF,
    }
    # Columns refreshed when the (repo_id, address) pair is already present.
    overwrite_on_conflict = {
        "type_score": type_score,
        "return_annotation": return_annotation,
    }

    upsert = pg_insert(MusehubIntelType).values(**new_row)
    upsert = upsert.on_conflict_do_update(
        index_elements=["repo_id", "address"],
        set_=overwrite_on_conflict,
    )
    await session.execute(upsert)
76
77
@pytest_asyncio.fixture
async def type_repo(db_session: AsyncSession) -> MusehubRepo:
    """Create ``typeuser/type-e2e`` seeded with four representative symbols.

    The rows cover one fully-typed, one partially-typed, one untyped and one
    any-polluted symbol.  They are inserted in that order and committed
    before the repo is handed to the test.
    """
    repo = await create_repo(db_session, owner="typeuser", slug="type-e2e")
    repo_key = str(repo.repo_id)

    # (address, keyword overrides passed to _insert_type_row)
    seed_rows = (
        # fully typed (score=1.0)
        ("src/a.py::fn_full",
         {"type_score": 1.0, "return_annotation": "str"}),
        # partial (score=0.75)
        ("src/b.py::fn_partial",
         {"kind": "method", "type_score": 0.75,
          "params_total": 4, "params_annotated": 3,
          "return_annotation": "None"}),
        # untyped (score=0.0)
        ("src/c.py::fn_untyped",
         {"type_score": 0.0, "params_annotated": 0,
          "return_annotation": None}),
        # any-polluted (has params_with_any)
        ("src/d.py::fn_any",
         {"type_score": 0.75, "return_is_any": False,
          "params_with_any": 1, "return_annotation": "Any"}),
    )
    for symbol_address, overrides in seed_rows:
        await _insert_type_row(db_session, repo_key, symbol_address, **overrides)

    await db_session.commit()
    return repo
103
104
105 # ─────────────────────────────────────────────────────────────────────────────
106 # Layer T1 — DB extension
107 # ─────────────────────────────────────────────────────────────────────────────
108
class TestDBExtension:
    """Layer T1 — schema checks for the ``return_annotation`` column."""

    def test_T01_return_annotation_column_exists_on_model(self) -> None:
        """The ORM mapper for MusehubIntelType exposes ``return_annotation``."""
        mapper = sa.inspect(MusehubIntelType).mapper
        mapped_keys = [attr.key for attr in mapper.column_attrs]
        assert "return_annotation" in mapped_keys, (
            "return_annotation column missing from MusehubIntelType"
        )

    def test_T02_return_annotation_is_nullable(self) -> None:
        """The column accepts NULL so rows without an annotation stay valid."""
        column = MusehubIntelType.__table__.columns["return_annotation"]
        assert column.nullable, "return_annotation must be nullable"

    @pytest.mark.asyncio
    async def test_T03_return_annotation_stored_and_retrieved(
        self, db_session: AsyncSession
    ) -> None:
        """A written ``return_annotation`` value is read back unchanged."""
        repo = await create_repo(db_session, owner="typeuser", slug="t03")
        repo_key = str(repo.repo_id)
        await _insert_type_row(
            db_session, repo_key, "src/x.py::fn", return_annotation="list[str]"
        )
        await db_session.commit()

        lookup = sa.select(MusehubIntelType).where(
            MusehubIntelType.repo_id == repo_key,
            MusehubIntelType.address == "src/x.py::fn",
        )
        stored = await db_session.scalar(lookup)
        assert stored is not None
        assert stored.return_annotation == "list[str]"
141
142
143 # ─────────────────────────────────────────────────────────────────────────────
144 # Layer T2 — Provider batch performance
145 # ─────────────────────────────────────────────────────────────────────────────
146
class TestProviderBatch:
    """Layer T2 — the type provider must persist symbols with batched upserts."""

    @staticmethod
    def _is_insert_statement(stmt: object) -> bool:
        """Return True when *stmt* is an INSERT (Core construct or raw text)."""
        if isinstance(stmt, sa.sql.expression.Insert):
            return True
        if isinstance(stmt, sa.sql.expression.TextClause):
            return stmt.text.lstrip().upper().startswith("INSERT")
        return False

    @pytest.mark.asyncio
    async def test_T04_type_provider_issues_one_sql_per_chunk(
        self, db_session: AsyncSession
    ) -> None:
        """TypeProvider must use batch upsert, not one execute per symbol."""
        import json

        from musehub.services.musehub_intel_providers import _PROVIDER_REGISTRY

        repo = await create_repo(db_session, owner="typeuser", slug="t04")
        ref = _REF

        symbols = [
            {
                "address": f"src/m{i}.py::fn",
                "kind": "function",
                "return_annotation": "str",
                "return_is_any": False,
                "params_total": 1,
                "params_annotated": 1,
                "params_with_any": 0,
                "type_score": 1.0,
            }
            for i in range(50)
        ]
        muse_out = json.dumps({"symbols": symbols})

        execute_calls: list[sa.Executable] = []
        original_execute = db_session.execute

        async def counting_execute(stmt: sa.Executable, *args: typing.Any, **kwargs: typing.Any) -> CursorResult[typing.Any]:
            execute_calls.append(stmt)
            return await original_execute(stmt, *args, **kwargs)

        # patch.object restores the real execute even when compute() raises,
        # so a failure here cannot leak the spy into the session.
        with patch.object(db_session, "execute", new=counting_execute):
            with patch("asyncio.create_subprocess_exec",
                       return_value=_mock_process(muse_out)):
                await _PROVIDER_REGISTRY["intel.code.type"].compute(
                    db_session, repo.repo_id, ref,
                    {"head": ref, "owner": repo.owner, "slug": repo.slug},
                )

        # 50 symbols fit in one chunk of 1000 — expect exactly 1 upsert execute.
        # Only INSERT statements are counted; a hasattr(stmt, "is_dml") check
        # would also match SELECTs and other statements.
        upsert_calls = [
            stmt for stmt in execute_calls if self._is_insert_statement(stmt)
        ]
        assert len(upsert_calls) == 1, (
            f"Expected 1 batch upsert execute for 50 symbols, got {len(upsert_calls)}"
        )

    @pytest.mark.asyncio
    async def test_T05_upsert_500_symbols_under_500ms(
        self, db_session: AsyncSession
    ) -> None:
        """Batch-upserting 500 symbols must complete in under 500ms."""
        import json

        from musehub.services.musehub_intel_providers import _PROVIDER_REGISTRY

        repo = await create_repo(db_session, owner="typeuser", slug="t05")
        symbols = [
            {
                "address": f"src/file{i}.py::fn_{i}",
                "kind": "function",
                "return_annotation": "int",
                "return_is_any": False,
                "params_total": 2,
                "params_annotated": 2,
                "params_with_any": 0,
                "type_score": 1.0,
            }
            for i in range(500)
        ]
        muse_out = json.dumps({"symbols": symbols})

        # The timed window covers the patched subprocess call plus compute().
        t0 = time.monotonic()
        with patch("asyncio.create_subprocess_exec",
                   return_value=_mock_process(muse_out)):
            await _PROVIDER_REGISTRY["intel.code.type"].compute(
                db_session, repo.repo_id, _REF,
                {"head": _REF, "owner": repo.owner, "slug": repo.slug},
            )
        elapsed = time.monotonic() - t0
        assert elapsed < 0.5, f"500-symbol batch took {elapsed:.3f}s (limit: 0.5s)"
232
233
234 # ─────────────────────────────────────────────────────────────────────────────
235 # Layer T3 — Route (unit / integration)
236 # ─────────────────────────────────────────────────────────────────────────────
237
class TestRoute:
    """Layer T3 — status codes, filters, sorting and limits of ``/intel/type``."""

    @pytest.mark.asyncio
    async def test_T06_returns_200_with_empty_repo(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """Route must return 200 even when musehub_intel_type has no rows."""
        await create_repo(db_session, owner="typeuser", slug="t06-empty")
        await db_session.commit()
        r = await client.get("/typeuser/t06-empty/intel/type")
        assert r.status_code == 200

    @pytest.mark.asyncio
    async def test_T07_returns_200_with_data(
        self, client: AsyncClient, type_repo: MusehubRepo
    ) -> None:
        """Route returns 200 when rows exist."""
        r = await client.get("/typeuser/type-e2e/intel/type")
        assert r.status_code == 200

    @pytest.mark.asyncio
    async def test_T08_summary_stats_match_db_counts(
        self, client: AsyncClient, type_repo: MusehubRepo
    ) -> None:
        """Coverage fraction and tier counts must be derived from DB, not hardcoded."""
        r = await client.get("/typeuser/type-e2e/intel/type")
        assert r.status_code == 200
        body = r.text
        # 1 fully typed out of 4 total → 25.0%
        assert "25" in body, "coverage_pct (25%) not found in response"
        # The single untyped symbol must be listed in the unfiltered view.
        # (An `or "1" in body` fallback would make this check always pass.)
        assert "fn_untyped" in body, "untyped symbol missing from response"

    @pytest.mark.asyncio
    async def test_T09_filter_tier_untyped(
        self, client: AsyncClient, type_repo: MusehubRepo
    ) -> None:
        """?tier=untyped returns only symbols with type_score < 0.5."""
        r = await client.get("/typeuser/type-e2e/intel/type?tier=untyped")
        assert r.status_code == 200
        assert "fn_untyped" in r.text
        assert "fn_full" not in r.text

    @pytest.mark.asyncio
    async def test_T10_filter_tier_partial(
        self, client: AsyncClient, type_repo: MusehubRepo
    ) -> None:
        """?tier=partial returns only symbols with 0.5 ≤ type_score < 1.0."""
        r = await client.get("/typeuser/type-e2e/intel/type?tier=partial")
        assert r.status_code == 200
        assert "fn_partial" in r.text
        assert "fn_untyped" not in r.text
        assert "fn_full" not in r.text

    @pytest.mark.asyncio
    async def test_T11_filter_tier_any(
        self, client: AsyncClient, type_repo: MusehubRepo
    ) -> None:
        """?tier=any returns only symbols with return_is_any or params_with_any > 0."""
        r = await client.get("/typeuser/type-e2e/intel/type?tier=any")
        assert r.status_code == 200
        assert "fn_any" in r.text
        assert "fn_full" not in r.text

    @pytest.mark.asyncio
    async def test_T12_filter_kind_function(
        self, client: AsyncClient, type_repo: MusehubRepo
    ) -> None:
        """?kind=function returns only function-kind symbols."""
        r = await client.get("/typeuser/type-e2e/intel/type?kind=function")
        assert r.status_code == 200
        # fn_full is kind=function; fn_partial is kind=method
        assert "fn_full" in r.text
        assert "fn_partial" not in r.text

    @pytest.mark.asyncio
    async def test_T13_default_sort_score_ascending(
        self, client: AsyncClient, type_repo: MusehubRepo
    ) -> None:
        """Default sort is type_score ASC (worst-typed first)."""
        r = await client.get("/typeuser/type-e2e/intel/type")
        assert r.status_code == 200
        body = r.text
        pos_untyped = body.find("fn_untyped")
        pos_full = body.find("fn_full")
        assert pos_untyped != -1 and pos_full != -1
        assert pos_untyped < pos_full, "Untyped symbol must appear before fully-typed"

    @pytest.mark.asyncio
    async def test_T14_top_param_limits_results(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """?top=20 returns at most 20 symbols even when 25 exist."""
        repo = await create_repo(db_session, owner="typeuser", slug="t14-top")
        rid = str(repo.repo_id)
        for i in range(25):
            await _insert_type_row(db_session, rid,
                                   f"src/f{i}.py::fn_{i}", type_score=float(i) / 24)
        await db_session.commit()

        r = await client.get("/typeuser/t14-top/intel/type?top=20")
        assert r.status_code == 200
        count = sum(1 for i in range(25) if f"src/f{i}.py::fn_{i}" in r.text)
        # The lower bound guards against a vacuous pass when the page renders
        # no symbols at all.
        assert 0 < count <= 20, f"Expected 1–20 results for ?top=20, got {count}"
342
343
344 # ─────────────────────────────────────────────────────────────────────────────
345 # Layer T4 — E2E (HTML body assertions)
346 # ─────────────────────────────────────────────────────────────────────────────
347
class TestE2E:
    """Layer T4 — assertions on the rendered HTML of the Type Health pages."""

    # Page under test for every case except the dashboard-card check.
    _TYPE_PAGE = "/typeuser/type-e2e/intel/type"

    @pytest.mark.asyncio
    async def test_T15_coverage_fraction_rendered_as_pct(
        self, client: AsyncClient, type_repo: MusehubRepo
    ) -> None:
        """The 0.25 coverage fraction shows up as a percentage in the page."""
        resp = await client.get(self._TYPE_PAGE)
        assert resp.status_code == 200
        html = resp.text
        # 1/4 fully typed = 25%
        assert "25" in html

    @pytest.mark.asyncio
    async def test_T16_symbol_address_in_html(
        self, client: AsyncClient, type_repo: MusehubRepo
    ) -> None:
        """A seeded symbol address is present verbatim in the HTML body."""
        resp = await client.get(self._TYPE_PAGE)
        assert resp.status_code == 200
        html = resp.text
        assert "src/c.py::fn_untyped" in html

    @pytest.mark.asyncio
    async def test_T17_return_annotation_in_html(
        self, client: AsyncClient, type_repo: MusehubRepo
    ) -> None:
        """A non-null return_annotation value is present in the rendered HTML."""
        resp = await client.get(self._TYPE_PAGE)
        assert resp.status_code == 200
        html = resp.text
        # NOTE(review): a bare "str" match is loose — confirm against the
        # template markup whether a stricter check is possible.
        annotation_shown = "str" in html or "list[str]" in html
        assert annotation_shown

    @pytest.mark.asyncio
    async def test_T18_any_badge_rendered_for_any_polluted_symbol(
        self, client: AsyncClient, type_repo: MusehubRepo
    ) -> None:
        """An any-pollution marker is rendered when a symbol has Any params."""
        resp = await client.get(self._TYPE_PAGE)
        assert resp.status_code == 200
        html = resp.text
        # The any-badge or warning marker must be in the HTML.
        # NOTE(review): the seeded address "fn_any" also satisfies the
        # substring check — confirm the intended badge markup.
        badge_shown = "⚠" in html or "any" in html.lower()
        assert badge_shown

    @pytest.mark.asyncio
    async def test_T19_dashboard_card_links_to_type_page(
        self, client: AsyncClient, type_repo: MusehubRepo
    ) -> None:
        """The Intel dashboard contains a link pointing at /intel/type."""
        resp = await client.get("/typeuser/type-e2e/intel")
        assert resp.status_code == 200
        raw = resp.content
        assert b"/intel/type" in raw
396
397
398 # ─────────────────────────────────────────────────────────────────────────────
399 # Layer T5 — State integrity
400 # ─────────────────────────────────────────────────────────────────────────────
401
class TestStateIntegrity:
    """Layer T5 — upsert idempotence, overwrite semantics and cascade delete."""

    @pytest.mark.asyncio
    async def test_T20_push_twice_produces_one_row_per_symbol(
        self, db_session: AsyncSession
    ) -> None:
        """Upserting the same address twice must not create duplicate rows."""
        repo = await create_repo(db_session, owner="typeuser", slug="t20-dup")
        rid = str(repo.repo_id)
        addr = "src/a.py::fn"

        for _ in range(2):
            await _insert_type_row(db_session, rid, addr, type_score=1.0)
        await db_session.commit()

        rows = (await db_session.execute(
            sa.select(MusehubIntelType).where(
                MusehubIntelType.repo_id == rid
            )
        )).scalars().all()
        assert len(rows) == 1, f"Expected 1 row, got {len(rows)} — upsert broken"

    @pytest.mark.asyncio
    async def test_T21_second_push_overwrites_type_score(
        self, db_session: AsyncSession
    ) -> None:
        """A second push with different type_score must overwrite the first."""
        repo = await create_repo(db_session, owner="typeuser", slug="t21-overwrite")
        rid = str(repo.repo_id)
        addr = "src/a.py::fn"

        await _insert_type_row(db_session, rid, addr, type_score=0.5)
        await _insert_type_row(db_session, rid, addr, type_score=1.0)
        await db_session.commit()

        row = await db_session.scalar(
            sa.select(MusehubIntelType).where(
                MusehubIntelType.repo_id == rid,
                MusehubIntelType.address == addr,
            )
        )
        assert row is not None
        assert row.type_score == pytest.approx(1.0), (
            f"Expected score 1.0 after second push, got {row.type_score}"
        )

    @pytest.mark.asyncio
    async def test_T22_repo_delete_cascades_type_rows(
        self, db_session: AsyncSession
    ) -> None:
        """Deleting the repo must cascade-delete all musehub_intel_type rows."""
        repo = await create_repo(db_session, owner="typeuser", slug="t22-cascade")
        rid = str(repo.repo_id)
        await _insert_type_row(db_session, rid, "src/a.py::fn")
        await db_session.commit()

        await db_session.delete(repo)
        await db_session.commit()

        # Query by the saved id string: the repo object itself is gone.
        remaining = (await db_session.execute(
            sa.select(MusehubIntelType).where(
                MusehubIntelType.repo_id == rid
            )
        )).scalars().all()
        assert not remaining, "Cascade delete failed — type rows remain after repo delete"
469
470
471 # ─────────────────────────────────────────────────────────────────────────────
472 # Layer T6 — Performance
473 # ─────────────────────────────────────────────────────────────────────────────
474
class TestPerformance:
    """Layer T6 — wall-clock budgets and query-plan checks."""

    @pytest.mark.asyncio
    async def test_T23_route_responds_under_200ms_for_10k_symbols(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """The page answers within 200ms when the repo holds 10,000 rows."""
        repo = await create_repo(db_session, owner="typeuser", slug="t23-perf")
        repo_key = str(repo.repo_id)

        def build_row(idx: int) -> dict[str, typing.Any]:
            # Every third symbol is half-typed; the rest are fully typed.
            return {
                "repo_id": repo_key,
                "address": f"src/file{idx}.py::fn_{idx}",
                "kind": "function",
                "type_score": 0.5 if idx % 3 == 0 else 1.0,
                "return_is_any": False,
                "params_total": 2,
                "params_annotated": 2,
                "params_with_any": 0,
                "return_annotation": "str",
                "ref": _REF,
            }

        # Seed 10k rows with ten batched inserts of 1000 rows each.
        batch_size = 1000
        for offset in range(0, 10_000, batch_size):
            batch = [build_row(idx) for idx in range(offset, offset + batch_size)]
            seed_stmt = pg_insert(MusehubIntelType).values(batch)
            await db_session.execute(seed_stmt.on_conflict_do_nothing())
        await db_session.commit()

        started = time.monotonic()
        resp = await client.get("/typeuser/t23-perf/intel/type")
        elapsed = time.monotonic() - started

        assert resp.status_code == 200
        assert elapsed < 0.2, f"Route took {elapsed:.3f}s for 10k symbols (limit: 0.2s)"

    @pytest.mark.asyncio
    async def test_T24_db_query_uses_repo_index(
        self, db_session: AsyncSession
    ) -> None:
        """The EXPLAIN output for a repo_id lookup mentions an index."""
        explain_sql = sa.text(
            "EXPLAIN SELECT * FROM musehub_intel_type WHERE repo_id = 'x'"
        )
        result = await db_session.execute(explain_sql)
        plan_lines = [line[0] for line in result.all()]
        plan = " ".join(plan_lines)
        # Either the named index or any index-based node counts as a pass.
        index_used = any(
            marker in plan for marker in ("ix_intel_type_repo", "Index")
        )
        assert index_used, (
            f"Query plan does not use ix_intel_type_repo:\n{plan}"
        )

    @pytest.mark.asyncio
    async def test_T25_batch_upsert_500_symbols_under_500ms(
        self, db_session: AsyncSession
    ) -> None:
        """One 500-row batched insert plus commit finishes within 500ms."""
        repo = await create_repo(db_session, owner="typeuser", slug="t25-batch")
        repo_key = str(repo.repo_id)

        batch = []
        for idx in range(500):
            batch.append({
                "repo_id": repo_key,
                "address": f"src/f{idx}.py::fn",
                "kind": "function",
                "type_score": 0.9,
                "return_is_any": False,
                "params_total": 1,
                "params_annotated": 1,
                "params_with_any": 0,
                "return_annotation": None,
                "ref": _REF,
            })

        # Only the execute + commit are timed; row construction is excluded.
        started = time.monotonic()
        insert_stmt = pg_insert(MusehubIntelType).values(batch)
        await db_session.execute(insert_stmt.on_conflict_do_nothing())
        await db_session.commit()
        elapsed = time.monotonic() - started
        assert elapsed < 0.5, f"500-row batch took {elapsed:.3f}s (limit: 0.5s)"
563
564
565 # ─────────────────────────────────────────────────────────────────────────────
566 # Layer T7 — Security
567 # ─────────────────────────────────────────────────────────────────────────────
568
class TestSecurity:
    """Layer T7 — output escaping, query-parameter handling and unknown repos."""

    @pytest.mark.asyncio
    async def test_T26_xss_in_address_is_escaped(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """A script tag stored in ``address`` does not reach the page unescaped."""
        repo = await create_repo(db_session, owner="typeuser", slug="t26-xss")
        repo_key = str(repo.repo_id)
        payload = '<script>alert(1)</script>'
        # Truncate to fit VARCHAR(512) and make it a valid-ish address
        hostile_address = f"src/x.py::{payload[:40]}"
        await _insert_type_row(
            db_session, repo_key, hostile_address, type_score=0.0
        )
        await db_session.commit()

        resp = await client.get("/typeuser/t26-xss/intel/type")
        assert resp.status_code == 200
        # The raw executable tag must not appear; an escaped form
        # (&lt;script&gt;) is acceptable.
        html = resp.text
        assert "<script>alert" not in html, "XSS in address not escaped"

    @pytest.mark.asyncio
    async def test_T27_xss_in_return_annotation_is_escaped(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """An img/onerror payload in ``return_annotation`` is not emitted raw."""
        repo = await create_repo(db_session, owner="typeuser", slug="t27-xss-ret")
        repo_key = str(repo.repo_id)
        hostile_annotation = '<img src=x onerror=alert(1)>'
        await _insert_type_row(
            db_session, repo_key, "src/x.py::fn",
            return_annotation=hostile_annotation,
        )
        await db_session.commit()

        resp = await client.get("/typeuser/t27-xss-ret/intel/type")
        assert resp.status_code == 200
        # Raw unescaped tag must not appear; &lt;img ... is safe.
        html = resp.text
        assert "<img src=x onerror" not in html, "XSS in return_annotation not escaped"

    @pytest.mark.asyncio
    async def test_T28_unknown_tier_param_treated_as_all(
        self, client: AsyncClient, type_repo: MusehubRepo
    ) -> None:
        """An unrecognised ``tier`` value still yields 200 rather than 400/500."""
        resp = await client.get("/typeuser/type-e2e/intel/type?tier=garbage")
        status = resp.status_code
        assert status == 200

    @pytest.mark.asyncio
    async def test_T29_non_integer_top_param_returns_422(
        self, client: AsyncClient, type_repo: MusehubRepo
    ) -> None:
        """A non-numeric ``top`` value is rejected with 422."""
        resp = await client.get("/typeuser/type-e2e/intel/type?top=notanumber")
        status = resp.status_code
        assert status == 422

    @pytest.mark.asyncio
    async def test_T30_private_repo_returns_403_or_404_unauthenticated(
        self, client: AsyncClient
    ) -> None:
        """A path for a repo that was never created answers 403 or 404."""
        resp = await client.get("/nobody/no-such-repo/intel/type")
        allowed_statuses = (403, 404)
        assert resp.status_code in allowed_statuses
629
630
631 # ---------------------------------------------------------------------------
632 # Internal helpers
633 # ---------------------------------------------------------------------------
634
635 def _mock_process(stdout: str, returncode: int = 0) -> AsyncMock:
636 proc = AsyncMock()
637 proc.returncode = returncode
638 proc.communicate = AsyncMock(return_value=(stdout.encode(), b""))
639 return proc
640
641
642 async def _make_repo_via_client(client: AsyncClient, owner: str, slug: str) -> str:
643 """Return slug — the caller constructs the path externally."""
644 return slug
File History 1 commit
sha256:ef10830ce231e0a20efcb0e2586cb879471247e916616e6fdd0d51df459e2595 fix: typing audit — 0 violations, 0 untyped defs across all… Sonnet 4.6 minor 21 days ago