gabriel / musehub public

test_symbol_intelligence.py file-level

at sha256:3 · View file ↗ · Intel ↗

History
1 files
1 commits
0 hotspots
0 🧊 dead
0 💥 blast risk
sha256:0 fix: fall back to any indexed mpack in read_object_bytes when push mpac… · gabriel · Jun 17, 2026
1 """Section 6 — Symbol Intelligence (Intel): 7-layer test suite.
2
3 Covers:
4 - musehub/services/musehub_intel.py (compute_intel, _parse_ts, _health_label,
5 _health_color_class, IntelSnapshot, as_dict/from_dict)
6 - musehub/api/routes/musehub/blame.py (_build_real_symbol_blame, GET /repos/{repo_id}/blame/{ref})
7 - musehub/services/musehub_cross_repo.py (search_symbol_across_repos, cross_repo_impact,
8 workspace_blast_risk_top_n, build_deps_graph,
9 _module_prefix, _short_label)
10
11 Layers:
12 1. Unit — pure function tests, no DB, no I/O
13 2. Integration — real DB (PostgreSQL), service calls, no HTTP layer
14 3. End-to-End — full HTTP via AsyncClient, real DB
15 4. Stress — large data sets, volume correctness
16 5. Data Integrity — stored data correctness, field validation, round-trip
17 6. Security — auth guards, private repo access, injection safety
18 7. Performance — latency budgets for critical paths
19 """
from __future__ import annotations

import json
import secrets
import time
import types
from datetime import datetime, timedelta, timezone

import msgpack
import pytest
import pytest_asyncio
from httpx import AsyncClient
from sqlalchemy.ext.asyncio import AsyncSession

from musehub.services.musehub_intel import (
    BlastRiskEntry,
    CouplingPair,
    DeadEntry,
    HotspotEntry,
    IntelSnapshot,
    VelocityWindow,
    _health_color_class,
    _health_label,
    _parse_ts,
    compute_intel,
)
from musehub.types.json_types import JSONObject, StrDict
from tests.factories import create_repo

type SymbolHistory = dict[str, list[JSONObject]]
49
50 # ---------------------------------------------------------------------------
51 # Local helpers
52 # ---------------------------------------------------------------------------
53
54 def _now() -> datetime:
55 return datetime.now(tz=timezone.utc)
56
57
def _ago(days: int = 0, **kwargs: int) -> datetime:
    """Return the UTC instant lying the given span before the present.

    ``days`` and any extra keyword arguments are forwarded unchanged to
    ``timedelta`` (for example ``hours=3``).
    """
    span = timedelta(days=days, **kwargs)
    return _now() - span
60
61
62 def _ts(dt: datetime) -> str:
63 return dt.isoformat()
64
65
def _entry(commit_id: str, op: str = "add", ts: datetime | None = None,
           content_id: str = "sha256:abc") -> JSONObject:
    """Build one symbol-history entry dict for use in test fixtures.

    Args:
        commit_id: Identifier of the commit the entry belongs to.
        op: Operation recorded for the symbol (defaults to ``"add"``).
        ts: Instant to stamp the entry with; the current UTC time is used
            when omitted.
        content_id: Content hash stored on the entry.

    Returns:
        A dict with ``commit_id``, ``op``, ``timestamp``, ``committed_at``
        and ``content_id`` keys; both time fields carry the same ISO string.
    """
    # Resolve the instant exactly once: reading the clock separately for each
    # field would let "timestamp" and "committed_at" drift apart by a few
    # microseconds whenever ``ts`` is not supplied.
    stamp = _ts(ts or _now())
    return {
        "commit_id": commit_id,
        "op": op,
        "timestamp": stamp,
        "committed_at": stamp,
        "content_id": content_id,
    }
75
76
77 def _history(**kwargs: list[JSONObject]) -> SymbolHistory:
78 """Build a symbol_history dict from keyword args: addr=entries."""
79 return dict(kwargs)
80
81
async def _build_index(session: AsyncSession, repo_id: str, head_id: str,
                       ops: list[JSONObject]) -> types.SimpleNamespace:
    """Insert one commit, build the symbol index, persist results, and return
    a namespace with ``intel_full_json`` and ``intel_summary`` attributes.

    Args:
        session: Async DB session used for the inserts, the index build and
            the final commit.
        repo_id: Repository the commit ref and index are attached to.
        head_id: Commit id for the single commit that is created.
        ops: Operations stored under ``structured_delta["ops"]`` of the commit.

    Returns:
        A ``types.SimpleNamespace`` whose ``intel_full_json`` and
        ``intel_summary`` attributes hold the JSON-encoded
        ``"code.intel_snapshot"`` and ``"code.intel_summary"`` results, or
        ``None`` for a result type the index build did not produce.
    """
    from musehub.db.musehub_repo_models import MusehubCommit, MusehubCommitRef
    from musehub.services.musehub_symbol_indexer import build_symbol_index
    from musehub.services.musehub_intel_providers import persist_intel_results

    commit = MusehubCommit(
        commit_id=head_id,
        branch="main",
        parent_ids=[],
        message="test commit",
        author="gabriel",
        timestamp=_now(),
        structured_delta={"ops": ops},
    )
    session.add(commit)
    session.add(MusehubCommitRef(repo_id=repo_id, commit_id=head_id))
    # Flush before indexing — presumably so the indexer can query the new
    # rows within the same transaction; confirm against build_symbol_index.
    await session.flush()
    results = await build_symbol_index(session, repo_id, head_id)
    await persist_intel_results(session, repo_id, head_id, results)
    await session.commit()
    # ``results`` iterates as (result_type, data) pairs; index them by type.
    data_by_type = {t: json.dumps(d) for t, d in results}
    return types.SimpleNamespace(
        intel_full_json=data_by_type.get("code.intel_snapshot"),
        intel_summary=data_by_type.get("code.intel_summary"),
    )
111
112
113 def _insert_op(address: str, content_id: str = "sha256:abc") -> JSONObject:
114 return {"address": address, "op": "insert", "content_id": content_id}
115
116
117 # ===========================================================================
118 # Layer 1 — Unit tests (pure functions, no DB, no I/O)
119 # ===========================================================================
120
class TestParseTs:
    """Unit checks for ``_parse_ts`` with string and numeric inputs."""

    def test_iso_string_utc(self) -> None:
        """An ISO string with an explicit UTC offset yields an aware datetime."""
        parsed = _parse_ts("2025-01-15T10:30:00+00:00")
        assert (parsed.year, parsed.month) == (2025, 1)
        assert parsed.tzinfo is not None

    def test_iso_string_z_suffix(self) -> None:
        """The ``Z`` suffix is accepted and produces an aware datetime."""
        parsed = _parse_ts("2025-06-01T00:00:00Z")
        assert parsed.tzinfo is not None
        assert parsed.year == 2025

    def test_unix_int(self) -> None:
        """Integer ``0`` maps to 1970 with tzinfo attached."""
        epoch = _parse_ts(0)
        assert epoch.year == 1970
        assert epoch.tzinfo is not None

    def test_unix_float(self) -> None:
        """A float number of seconds is accepted."""
        parsed = _parse_ts(1_700_000_000.5)
        assert parsed.year == 2023

    def test_invalid_string_raises(self) -> None:
        """A string that is not a date makes ``_parse_ts`` raise."""
        bogus = "not-a-date"
        with pytest.raises(Exception):
            _parse_ts(bogus)
145
146
class TestHealthLabel:
    """Boundary checks for the score-to-label mapping of ``_health_label``."""

    def test_excellent(self) -> None:
        """Scores 100 and 90 are labelled ``Excellent``."""
        for score in (100, 90):
            assert _health_label(score) == "Excellent"

    def test_good(self) -> None:
        """Scores 89 and 75 are labelled ``Good``."""
        for score in (89, 75):
            assert _health_label(score) == "Good"

    def test_fair(self) -> None:
        """Scores 74 and 55 are labelled ``Fair``."""
        for score in (74, 55):
            assert _health_label(score) == "Fair"

    def test_poor(self) -> None:
        """Scores 54 and 35 are labelled ``Poor``."""
        for score in (54, 35):
            assert _health_label(score) == "Poor"

    def test_critical(self) -> None:
        """Scores 34 and 0 are labelled ``Critical``."""
        for score in (34, 0):
            assert _health_label(score) == "Critical"
167
168
class TestHealthColorClass:
    """Checks the CSS class returned by ``_health_color_class`` per band."""

    def test_excellent(self) -> None:
        """Score 90 maps to the ``excellent`` modifier."""
        css = _health_color_class(90)
        assert css == "intel-health--excellent"

    def test_good(self) -> None:
        """Score 75 maps to the ``good`` modifier."""
        css = _health_color_class(75)
        assert css == "intel-health--good"

    def test_fair(self) -> None:
        """Score 55 maps to the ``fair`` modifier."""
        css = _health_color_class(55)
        assert css == "intel-health--fair"

    def test_poor(self) -> None:
        """Score 35 maps to the ``poor`` modifier."""
        css = _health_color_class(35)
        assert css == "intel-health--poor"

    def test_critical(self) -> None:
        """Score 0 maps to the ``critical`` modifier."""
        css = _health_color_class(0)
        assert css == "intel-health--critical"
184
185
class TestComputeIntelUnit:
    """Pure-function checks for ``compute_intel`` using in-memory histories.

    Every test passes a symbol-history dict (address -> list of entries) and a
    list of breaking changes, then inspects the returned snapshot.
    """

    def test_empty_history_returns_zero_score(self) -> None:
        """Empty inputs give zero counts and an unpenalised health score."""
        snap = compute_intel({}, [], now_utc=_now())
        assert snap.total_symbols == 0
        assert snap.total_commits_indexed == 0
        assert snap.health_score == 100  # no penalties = 100
        assert snap.health_label == "Excellent"

    def test_single_symbol_no_ts(self) -> None:
        """An entry without any timestamp field is still counted."""
        history = {"file.py::Foo": [{"commit_id": "c1", "op": "add"}]}
        snap = compute_intel(history, [], now_utc=_now())
        assert snap.total_symbols == 1
        assert snap.total_commits_indexed == 1

    def test_hotspot_detection(self) -> None:
        """A symbol changed 12 times is reported as a hotspot."""
        # 12 changes on one symbol — exceeds _HOTSPOT_THRESHOLD (10)
        entries = [_entry(f"c{i}") for i in range(12)]
        history = {"file.py::HotFn": entries}
        snap = compute_intel(history, [], now_utc=_now())
        assert snap.alert_hotspot_count >= 1
        assert any(h.address == "file.py::HotFn" for h in snap.hotspots)

    def test_dead_code_detection(self) -> None:
        """A symbol last touched 100 days ago is listed as a dead candidate."""
        # One old entry, last touched 100 days ago
        old_ts = _ago(100)
        history = {"file.py::Stale": [_entry("c1", ts=old_ts)]}
        snap = compute_intel(history, [], now_utc=_now())
        assert snap.alert_dead_count >= 1
        assert any(d.address == "file.py::Stale" for d in snap.dead_candidates)

    def test_recent_symbol_not_dead(self) -> None:
        """A symbol touched 5 days ago raises no dead-code alert."""
        recent_ts = _ago(5)
        history = {"file.py::Fresh": [_entry("c1", ts=recent_ts)]}
        snap = compute_intel(history, [], now_utc=_now())
        assert snap.alert_dead_count == 0

    def test_blast_risk_co_change(self) -> None:
        """Symbols that always change together produce blast-risk entries."""
        # Two symbols always change together → blast risk for both
        entries_a = [_entry("c1"), _entry("c2")]
        entries_b = [_entry("c1"), _entry("c2")]
        history = {
            "file.py::Alpha": entries_a,
            "file.py::Beta": entries_b,
        }
        snap = compute_intel(history, [], now_utc=_now())
        # Both are co-changed — blast risk entries should include at least one
        assert len(snap.blast_risk) >= 1

    def test_coupling_pairs_detected(self) -> None:
        """Two symbols sharing a commit id yield a coupling pair."""
        # Symbols sharing same commit → coupling pair
        entries_a = [_entry("shared-commit")]
        entries_b = [_entry("shared-commit")]
        history = {
            "file.py::A": entries_a,
            "file.py::B": entries_b,
        }
        snap = compute_intel(history, [], now_utc=_now())
        assert len(snap.coupling_pairs) >= 1
        pair = snap.coupling_pairs[0]
        assert pair.shared_commits >= 1

    def test_breaking_changes_reduce_score(self) -> None:
        """Breaking changes lower the score and are counted in the alert."""
        snap_no_breaks = compute_intel({}, [], now_utc=_now())
        snap_with_breaks = compute_intel({}, ["break1", "break2", "break3"], now_utc=_now())
        assert snap_with_breaks.health_score < snap_no_breaks.health_score
        assert snap_with_breaks.alert_breaking_count == 3

    def test_velocity_buckets_populated(self) -> None:
        """Velocity has 12 weekly buckets; a 1-day-old change lands in the first."""
        recent = _ago(days=1)
        history = {"file.py::Fn": [_entry("c1", ts=recent)]}
        snap = compute_intel(history, [], now_utc=_now())
        assert len(snap.velocity.weeks) == 12
        assert snap.velocity.weeks[0] >= 1  # most recent week bucket

    def test_health_score_capped_at_100(self) -> None:
        """The health score stays within the inclusive 0..100 range."""
        snap = compute_intel({}, [], now_utc=_now())
        assert 0 <= snap.health_score <= 100

    def test_top_n_hotspots_limit(self) -> None:
        """No more than 10 hotspots are returned for 20 qualifying symbols."""
        # 20 symbols each changed 15 times → _TOP_N=10 returned
        history: SymbolHistory = {}
        for i in range(20):
            history[f"file.py::Fn{i}"] = [_entry(f"c{i}_{j}") for j in range(15)]
        snap = compute_intel(history, [], now_utc=_now())
        assert len(snap.hotspots) <= 10

    def test_dead_candidates_sorted_by_coldest_first(self) -> None:
        """Dead candidates are ordered with the coldest symbol first."""
        h = {
            "file.py::Old": [_entry("c1", ts=_ago(200))],
            "file.py::Older": [_entry("c2", ts=_ago(300))],
        }
        snap = compute_intel(h, [], now_utc=_now())
        # Both symbols are colder than the 100-day case that
        # test_dead_code_detection already expects to be flagged, so both must
        # be present. Asserting this (instead of guarding with ``if``) keeps
        # the ordering check from silently passing on a short list.
        assert len(snap.dead_candidates) >= 2
        assert snap.dead_candidates[0].days_cold >= snap.dead_candidates[1].days_cold

    def test_timestamp_invalid_gracefully_ignored(self) -> None:
        """An unparseable timestamp does not raise; the symbol is still counted."""
        history = {
            "file.py::BadTs": [{"commit_id": "c1", "op": "add", "timestamp": "NOT_A_DATE"}]
        }
        snap = compute_intel(history, [], now_utc=_now())
        # Should not raise; symbol counted but ts ignored
        assert snap.total_symbols == 1
288
289
class TestIntelSnapshotSerialisation:
    """Round-trip and JSON-compatibility checks for ``IntelSnapshot``."""

    def _make_snap(self) -> IntelSnapshot:
        """Build a fully populated snapshot with one entry per list field."""
        hot = HotspotEntry(address="a.py::Fn", change_count=15, last_changed=None)
        dead = DeadEntry(address="b.py::Old", days_cold=120, blast_radius=0, added_at=None)
        risk = BlastRiskEntry(address="c.py::Risk", co_change_count=25, top_co_symbols=["d.py::X"])
        pair = CouplingPair(address_a="a.py::F", address_b="b.py::G", shared_commits=5)
        window = VelocityWindow(weeks=[1, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0])
        return IntelSnapshot(
            health_score=80,
            health_label="Good",
            alert_hotspot_count=2,
            alert_dead_count=1,
            alert_blast_risk_count=3,
            alert_breaking_count=0,
            hotspots=[hot],
            dead_candidates=[dead],
            blast_risk=[risk],
            coupling_pairs=[pair],
            velocity=window,
            total_symbols=50,
            total_commits_indexed=10,
        )

    def test_as_dict_round_trip(self) -> None:
        """``from_dict(as_dict())`` preserves scalar and nested fields."""
        payload = self._make_snap().as_dict()
        restored = IntelSnapshot.from_dict(payload)
        assert restored.health_score == 80
        assert restored.health_label == "Good"
        assert restored.total_symbols == 50
        assert restored.hotspots[0].address == "a.py::Fn"
        assert restored.dead_candidates[0].days_cold == 120
        assert restored.blast_risk[0].co_change_count == 25
        assert restored.coupling_pairs[0].shared_commits == 5
        assert restored.velocity.weeks[0] == 1

    def test_as_dict_json_serialisable(self) -> None:
        """``as_dict`` output can be passed straight to ``json.dumps``."""
        payload = self._make_snap().as_dict()
        # Must be JSON-serialisable (no datetimes, no custom objects)
        encoded = json.dumps(payload)
        assert "health_score" in encoded

    def test_from_dict_missing_optional_fields(self) -> None:
        """List fields and velocity weeks default to empty when absent."""
        minimal = dict(
            health_score=70,
            health_label="Fair",
            alert_hotspot_count=0,
            alert_dead_count=0,
            alert_blast_risk_count=0,
            alert_breaking_count=0,
            total_symbols=0,
            total_commits_indexed=0,
        )
        snap = IntelSnapshot.from_dict(minimal)
        assert snap.hotspots == []
        assert snap.dead_candidates == []
        assert snap.coupling_pairs == []
        assert snap.velocity.weeks == []
344
345
class TestModulePrefix:
    """Checks how ``_module_prefix`` truncates dotted addresses."""

    def test_three_segments(self) -> None:
        """With the default depth, a four-segment address keeps its first three."""
        from musehub.services.musehub_cross_repo import _module_prefix

        prefix = _module_prefix("musehub.services.musehub_ci.enqueue_run")
        assert prefix == "musehub.services.musehub_ci"

    def test_fewer_than_depth(self) -> None:
        """An address shorter than the depth comes back unchanged."""
        from musehub.services.musehub_cross_repo import _module_prefix

        prefix = _module_prefix("a.b")
        assert prefix == "a.b"  # shorter than depth=3, returns as-is

    def test_exactly_depth(self) -> None:
        """An address with exactly three segments comes back unchanged."""
        from musehub.services.musehub_cross_repo import _module_prefix

        prefix = _module_prefix("a.b.c")
        assert prefix == "a.b.c"

    def test_custom_depth(self) -> None:
        """``depth=2`` keeps only the first two segments."""
        from musehub.services.musehub_cross_repo import _module_prefix

        prefix = _module_prefix("a.b.c.d.e", depth=2)
        assert prefix == "a.b"
366
367
class TestShortLabel:
    """Checks the shortened label produced by ``_short_label``."""

    def test_two_segments(self) -> None:
        """``"a.b.c"`` is shortened to its last two segments."""
        from musehub.services.musehub_cross_repo import _short_label

        label = _short_label("a.b.c")
        assert label == "b.c"

    def test_single_segment(self) -> None:
        """A name without dots is returned unchanged."""
        from musehub.services.musehub_cross_repo import _short_label

        label = _short_label("single")
        assert label == "single"
376
377
class TestBuildRealSymbolBlame:
    """Unit checks for ``_build_real_symbol_blame`` on in-memory inputs."""

    def test_filters_to_path(self) -> None:
        """Only symbols whose address belongs to the requested path are returned."""
        from musehub.api.routes.musehub.blame import _build_real_symbol_blame

        symbol_history = {
            "musehub/api.py::Foo": [_entry("c1")],
            "other/file.py::Bar": [_entry("c2")],
        }
        commits = {
            "c1": {"message": "add Foo", "author": "gabriel", "timestamp": _now()},
        }
        blame = _build_real_symbol_blame(symbol_history, "musehub/api.py", commits)
        assert len(blame) == 1
        assert blame[0].symbol_name == "Foo"

    def test_excludes_import_declarations(self) -> None:
        """``import::`` addresses are left out of the blame result."""
        from musehub.api.routes.musehub.blame import _build_real_symbol_blame

        symbol_history = {
            "file.py::import::os": [_entry("c1")],
            "file.py::MyFn": [_entry("c1")],
        }
        commits = {"c1": {"message": "m", "author": "g", "timestamp": _now()}}
        blame = _build_real_symbol_blame(symbol_history, "file.py", commits)
        symbol_names = [row.symbol_name for row in blame]
        assert "MyFn" in symbol_names
        assert "import::os" not in symbol_names

    def test_excludes_deleted_symbols(self) -> None:
        """A symbol whose only entry is a ``delete`` op is not reported."""
        from musehub.api.routes.musehub.blame import _build_real_symbol_blame

        symbol_history = {
            "file.py::Gone": [_entry("c1", op="delete")],
            "file.py::Here": [_entry("c2", op="add")],
        }
        commits = {
            "c1": {"message": "del", "author": "g", "timestamp": _now()},
            "c2": {"message": "add", "author": "g", "timestamp": _now()},
        }
        blame = _build_real_symbol_blame(symbol_history, "file.py", commits)
        symbol_names = [row.symbol_name for row in blame]
        assert "Gone" not in symbol_names
        assert "Here" in symbol_names

    def test_intel_signals_populated(self) -> None:
        """A symbol flagged as a hotspot in ``intel`` gets ``is_hotspot`` set."""
        from musehub.api.routes.musehub.blame import _build_real_symbol_blame

        symbol_history = {
            "file.py::HotFn": [_entry("c1")],
        }
        commits = {"c1": {"message": "m", "author": "g", "timestamp": _now()}}
        # The snapshot is computed from a separate 15-change history so the
        # symbol is reported as a hotspot there.
        hot_history = {"file.py::HotFn": [_entry(f"c{i}") for i in range(15)]}
        intel = compute_intel(
            hot_history,
            [],
            now_utc=_now(),
        )
        blame = _build_real_symbol_blame(symbol_history, "file.py", commits, intel=intel)
        assert len(blame) == 1
        assert blame[0].is_hotspot is True

    def test_change_count_reflects_history_length(self) -> None:
        """``change_count`` equals the number of history entries for the symbol."""
        from musehub.api.routes.musehub.blame import _build_real_symbol_blame

        symbol_history = {
            "file.py::Changed": [_entry("c1"), _entry("c2"), _entry("c3")],
        }
        commits = {
            "c1": {"message": "m", "author": "g", "timestamp": _now()},
            "c2": {"message": "m", "author": "g", "timestamp": _now()},
            "c3": {"message": "m", "author": "g", "timestamp": _now()},
        }
        blame = _build_real_symbol_blame(symbol_history, "file.py", commits)
        assert blame[0].change_count == 3

    def test_empty_history_returns_empty_list(self) -> None:
        """Empty history and commit map give an empty list."""
        from musehub.api.routes.musehub.blame import _build_real_symbol_blame

        blame = _build_real_symbol_blame({}, "file.py", {})
        assert blame == []

    def test_unknown_commit_id_falls_back_gracefully(self) -> None:
        """A commit id missing from the map yields empty author and message."""
        from musehub.api.routes.musehub.blame import _build_real_symbol_blame

        symbol_history = {"file.py::Fn": [_entry("unknown-commit")]}
        blame = _build_real_symbol_blame(symbol_history, "file.py", {})
        assert len(blame) == 1
        only_row = blame[0]
        assert only_row.author == ""
        assert only_row.commit_message == ""
466
467
468 # ===========================================================================
469 # Layer 2 — Integration tests (real DB, service layer, no HTTP)
470 # ===========================================================================
471
class TestComputeIntelIntegration:
    """Service-level intel checks against a real DB session (no HTTP)."""

    @pytest.mark.asyncio
    async def test_load_intel_snapshot_none_when_no_index(
        self, db_session: AsyncSession
    ) -> None:
        """A repo that was never indexed has no intel snapshot."""
        from musehub.services.musehub_symbol_indexer import load_intel_snapshot

        repo = await create_repo(db_session, slug="intel-no-index")
        snapshot = await load_intel_snapshot(db_session, repo.repo_id)
        assert snapshot is None

    @pytest.mark.asyncio
    async def test_build_index_populates_intel_full_json(
        self, db_session: AsyncSession
    ) -> None:
        """Indexing two inserted symbols yields a loadable two-symbol snapshot."""
        from musehub.services.musehub_symbol_indexer import load_intel_snapshot

        repo = await create_repo(db_session, slug="intel-populated")
        inserted = [_insert_op("src/main.py::run"), _insert_op("src/main.py::setup")]
        built = await _build_index(db_session, repo.repo_id, "head-intel-1", inserted)
        assert built is not None
        assert built.intel_full_json is not None

        snapshot = await load_intel_snapshot(db_session, repo.repo_id)
        assert snapshot is not None
        assert snapshot.total_symbols == 2

    @pytest.mark.asyncio
    async def test_intel_health_score_range(
        self, db_session: AsyncSession
    ) -> None:
        """The persisted health score lies within 0..100 inclusive."""
        from musehub.services.musehub_symbol_indexer import load_intel_snapshot

        repo = await create_repo(db_session, slug="intel-health-range")
        inserted = [_insert_op(f"src/f.py::Fn{i}") for i in range(5)]
        await _build_index(db_session, repo.repo_id, "head-hr", inserted)

        snapshot = await load_intel_snapshot(db_session, repo.repo_id)
        assert snapshot is not None
        assert 0 <= snapshot.health_score <= 100

    @pytest.mark.asyncio
    async def test_intel_summary_json_fields(
        self, db_session: AsyncSession
    ) -> None:
        """The summary JSON carries the four expected count/score keys."""
        repo = await create_repo(db_session, slug="intel-summary-fields")
        inserted = [_insert_op("api.py::endpoint")]
        built = await _build_index(db_session, repo.repo_id, "head-summ", inserted)
        assert built is not None
        assert built.intel_summary is not None
        summary = json.loads(built.intel_summary)
        assert "health_score" in summary
        assert "symbol_count" in summary
        assert "hotspot_count" in summary
        assert "dead_symbol_count" in summary
527
528
class TestBlameIntegration:
    """Blame construction from symbol history loaded out of a real DB."""

    @pytest.mark.asyncio
    async def test_blame_returns_empty_when_no_index(
        self, db_session: AsyncSession
    ) -> None:
        """Without an index, the loaded history produces no blame rows."""
        from musehub.services.musehub_symbol_indexer import load_symbol_history
        from musehub.api.routes.musehub.blame import _build_real_symbol_blame

        repo = await create_repo(db_session, slug="blame-no-idx")
        loaded = await load_symbol_history(db_session, repo.repo_id, file_path="file.py")
        blame = _build_real_symbol_blame(loaded, "file.py", {})
        assert blame == []

    @pytest.mark.asyncio
    async def test_blame_entries_after_index_build(
        self, db_session: AsyncSession
    ) -> None:
        """Both indexed symbols of the file appear in the blame rows."""
        from musehub.services.musehub_symbol_indexer import load_symbol_history
        from musehub.api.routes.musehub.blame import _build_real_symbol_blame

        repo = await create_repo(db_session, slug="blame-with-idx")
        inserted = [
            _insert_op("src/api.py::handle_request"),
            _insert_op("src/api.py::parse_args"),
        ]
        await _build_index(db_session, repo.repo_id, "head-blame", inserted)

        loaded = await load_symbol_history(
            db_session, repo.repo_id, file_path="src/api.py"
        )
        blame = _build_real_symbol_blame(loaded, "src/api.py", {})
        symbol_names = [row.symbol_name for row in blame]
        assert "handle_request" in symbol_names
        assert "parse_args" in symbol_names
563
564
class TestCrossRepoIntegration:
    """Cross-repo service calls (search, blast risk, impact, deps graph) on a real DB."""

    @pytest.mark.asyncio
    async def test_search_symbol_no_repos(
        self, db_session: AsyncSession
    ) -> None:
        """Searching under an owner with no repos returns an empty list."""
        from musehub.services.musehub_cross_repo import search_symbol_across_repos

        matches = await search_symbol_across_repos(
            db_session, "ghost-owner", "Fn", visible_to_user="ghost-owner"
        )
        assert matches == []

    @pytest.mark.asyncio
    async def test_search_symbol_finds_match(
        self, db_session: AsyncSession
    ) -> None:
        """An indexed symbol is found when searched by its exact name."""
        from musehub.services.musehub_cross_repo import search_symbol_across_repos

        owner = f"owner-{secrets.token_hex(4)}"
        repo = await create_repo(db_session, slug="search-sym-repo", owner=owner,
                                 visibility="public")
        inserted = [_insert_op("api.py::compute_intel")]
        await _build_index(db_session, repo.repo_id, "head-search", inserted)

        matches = await search_symbol_across_repos(
            db_session, owner, "compute_intel", visible_to_user=owner
        )
        assert len(matches) >= 1
        assert any("compute_intel" in hit.address for hit in matches)

    @pytest.mark.asyncio
    async def test_search_symbol_case_insensitive(
        self, db_session: AsyncSession
    ) -> None:
        """A lower-case query matches a mixed-case symbol name."""
        from musehub.services.musehub_cross_repo import search_symbol_across_repos

        owner = f"owner-{secrets.token_hex(4)}"
        repo = await create_repo(db_session, slug="search-case-repo", owner=owner,
                                 visibility="public")
        inserted = [_insert_op("api.py::MyFunction")]
        await _build_index(db_session, repo.repo_id, "head-case", inserted)

        matches = await search_symbol_across_repos(
            db_session, owner, "myfunction", visible_to_user=owner
        )
        assert any("MyFunction" in hit.address for hit in matches)

    @pytest.mark.asyncio
    async def test_search_symbol_private_repo_excluded_without_auth(
        self, db_session: AsyncSession
    ) -> None:
        """Symbols of a private repo are hidden when no user is supplied."""
        from musehub.services.musehub_cross_repo import search_symbol_across_repos

        owner = f"owner-{secrets.token_hex(4)}"
        repo = await create_repo(db_session, slug="search-private-repo", owner=owner,
                                 visibility="private")
        inserted = [_insert_op("api.py::SecretFn")]
        await _build_index(db_session, repo.repo_id, "head-priv", inserted)

        # visible_to_user=None → only public repos
        matches = await search_symbol_across_repos(
            db_session, owner, "SecretFn", visible_to_user=None
        )
        leaked = any("SecretFn" in hit.address for hit in matches)
        assert not leaked

    @pytest.mark.asyncio
    async def test_workspace_blast_risk_empty(
        self, db_session: AsyncSession
    ) -> None:
        """An unknown owner yields an empty blast-risk list."""
        from musehub.services.musehub_cross_repo import workspace_blast_risk_top_n

        ranking = await workspace_blast_risk_top_n(
            db_session, "nonexistent-owner", visible_to_user="nonexistent-owner"
        )
        assert ranking == []

    @pytest.mark.asyncio
    async def test_workspace_blast_risk_populated(
        self, db_session: AsyncSession
    ) -> None:
        """Two indexed symbols produce at least two entries, sorted descending."""
        from musehub.services.musehub_cross_repo import workspace_blast_risk_top_n

        owner = f"owner-{secrets.token_hex(4)}"
        repo = await create_repo(db_session, slug="wbr-repo", owner=owner,
                                 visibility="public")
        inserted = [_insert_op("a.py::Fn"), _insert_op("b.py::Gn")]
        await _build_index(db_session, repo.repo_id, "head-wbr", inserted)

        ranking = await workspace_blast_risk_top_n(
            db_session, owner, visible_to_user=owner
        )
        assert len(ranking) >= 2
        # Sorted by co_change_count descending
        for earlier, later in zip(ranking, ranking[1:]):
            assert earlier.co_change_count >= later.co_change_count

    @pytest.mark.asyncio
    async def test_cross_repo_impact_no_source_repo(
        self, db_session: AsyncSession
    ) -> None:
        """Impact analysis for a non-existent source repo returns ``None``."""
        from musehub.services.musehub_cross_repo import cross_repo_impact

        missing_repo_id = secrets.token_hex(16)
        impact = await cross_repo_impact(
            db_session, "ghost-owner", missing_repo_id, "file.py::Fn",
            visible_to_user="ghost-owner",
        )
        assert impact is None

    @pytest.mark.asyncio
    async def test_cross_repo_impact_unknown_address(
        self, db_session: AsyncSession
    ) -> None:
        """Impact analysis for an address absent from the index returns ``None``."""
        from musehub.services.musehub_cross_repo import cross_repo_impact

        owner = f"owner-{secrets.token_hex(4)}"
        repo = await create_repo(db_session, slug="cri-unknown", owner=owner,
                                 visibility="public")
        inserted = [_insert_op("a.py::KnownFn")]
        await _build_index(db_session, repo.repo_id, "head-cri", inserted)

        impact = await cross_repo_impact(
            db_session, owner, repo.repo_id, "a.py::NonExistent",
            visible_to_user=owner,
        )
        assert impact is None

    @pytest.mark.asyncio
    async def test_build_deps_graph_single_repo(
        self, db_session: AsyncSession
    ) -> None:
        """The graph built for one indexed repo exposes ``nodes`` and ``edges``."""
        from musehub.services.musehub_cross_repo import build_deps_graph

        owner = f"owner-{secrets.token_hex(4)}"
        repo = await create_repo(db_session, slug="deps-single", owner=owner,
                                 visibility="public")
        inserted = [
            _insert_op("a.b.c.Fn"),
            _insert_op("a.b.d.Gn"),
        ]
        await _build_index(db_session, repo.repo_id, "head-deps", inserted)

        graph = await build_deps_graph(
            db_session, owner, repo.repo_id, visible_to_user=owner
        )
        assert hasattr(graph, "nodes")
        assert hasattr(graph, "edges")

    @pytest.mark.asyncio
    async def test_build_deps_graph_no_source_repo_returns_empty(
        self, db_session: AsyncSession
    ) -> None:
        """A missing source repo still yields a ``DepsGraph`` instance."""
        from musehub.services.musehub_cross_repo import build_deps_graph, DepsGraph

        owner = f"owner-{secrets.token_hex(4)}"
        missing_repo_id = secrets.token_hex(16)
        graph = await build_deps_graph(
            db_session, owner, missing_repo_id, visible_to_user=owner
        )
        assert isinstance(graph, DepsGraph)
723
724
725 # ===========================================================================
726 # Layer 3 — End-to-End tests (full HTTP via AsyncClient, real DB)
727 # ===========================================================================
728
class TestBlameEndToEnd:
    """HTTP-level checks for the blame and symbol-index rebuild endpoints."""

    @pytest.mark.asyncio
    async def test_blame_404_unknown_repo(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """Blame on a random, non-existent repo id answers 404."""
        url = f"/api/repos/{secrets.token_hex(16)}/blame/HEAD"
        resp = await client.get(url, params={"path": "file.py"})
        assert resp.status_code == 404

    @pytest.mark.asyncio
    async def test_blame_public_repo_no_auth(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """A public repo serves blame without credentials, with the expected keys."""
        repo = await create_repo(db_session, slug="blame-e2e-pub", visibility="public")
        await db_session.commit()
        url = f"/api/repos/{repo.repo_id}/blame/HEAD"
        resp = await client.get(url, params={"path": "file.py"})
        assert resp.status_code == 200
        payload = resp.json()
        assert "entries" in payload
        assert "totalEntries" in payload
        assert "path" in payload

    @pytest.mark.asyncio
    async def test_blame_private_repo_requires_auth(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """An unauthenticated blame request on a private repo answers 401."""
        repo = await create_repo(db_session, slug="blame-e2e-priv", visibility="private")
        await db_session.commit()
        url = f"/api/repos/{repo.repo_id}/blame/HEAD"
        resp = await client.get(url, params={"path": "file.py"})
        assert resp.status_code == 401

    @pytest.mark.asyncio
    async def test_blame_returns_entries_after_index_build(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """After indexing, both symbols of the file are listed in ``entries``."""
        repo = await create_repo(db_session, slug="blame-e2e-entries", visibility="public")
        inserted = [_insert_op("api/routes.py::dispatch"), _insert_op("api/routes.py::validate")]
        await _build_index(db_session, repo.repo_id, "head-blame-e2e", inserted)

        url = f"/api/repos/{repo.repo_id}/blame/HEAD"
        resp = await client.get(url, params={"path": "api/routes.py"})
        assert resp.status_code == 200
        payload = resp.json()
        symbol_names = [item["symbolName"] for item in payload["entries"]]
        assert "dispatch" in symbol_names
        assert "validate" in symbol_names

    @pytest.mark.asyncio
    async def test_blame_path_filter_respected(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """Only symbols from the requested path are returned."""
        repo = await create_repo(db_session, slug="blame-e2e-filter", visibility="public")
        inserted = [
            _insert_op("path/a.py::FnA"),
            _insert_op("path/b.py::FnB"),
        ]
        await _build_index(db_session, repo.repo_id, "head-filter", inserted)

        url = f"/api/repos/{repo.repo_id}/blame/HEAD"
        resp = await client.get(url, params={"path": "path/a.py"})
        assert resp.status_code == 200
        payload = resp.json()
        symbol_names = [item["symbolName"] for item in payload["entries"]]
        assert "FnA" in symbol_names
        assert "FnB" not in symbol_names

    @pytest.mark.asyncio
    async def test_symbol_index_rebuild_endpoint(
        self, client: AsyncClient, db_session: AsyncSession, auth_headers: StrDict
    ) -> None:
        """An authenticated rebuild request is accepted with 200 or 202."""
        from musehub.db.musehub_repo_models import (
            MusehubBranch as _Branch,
            MusehubCommit as _Commit,
            MusehubCommitRef as _CommitRef,
        )

        repo = await create_repo(db_session, slug="rebuild-e2e")
        head_id = "rebuild-head"
        # Create a head commit on "main"
        commit = _Commit(
            commit_id=head_id,
            branch="main",
            parent_ids=[],
            message="initial",
            author="gabriel",
            timestamp=_now(),
            structured_delta={"ops": [_insert_op("x.py::Fn")]},
        )
        db_branch = _Branch(
            branch_id=secrets.token_hex(16),
            repo_id=repo.repo_id,
            name="main",
            head_commit_id=head_id,
        )
        db_session.add(commit)
        db_session.add(_CommitRef(repo_id=repo.repo_id, commit_id=head_id))
        db_session.add(db_branch)
        await db_session.commit()

        url = f"/api/repos/{repo.repo_id}/symbol-index/rebuild"
        resp = await client.post(url, headers=auth_headers)
        assert resp.status_code in (200, 202)

    @pytest.mark.asyncio
    async def test_symbol_index_rebuild_requires_auth(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """A rebuild request without auth headers answers 401."""
        repo = await create_repo(db_session, slug="rebuild-noauth")
        await db_session.commit()
        url = f"/api/repos/{repo.repo_id}/symbol-index/rebuild"
        resp = await client.post(url)
        assert resp.status_code == 401
849
850
851 # ===========================================================================
852 # Layer 4 — Stress tests
853 # ===========================================================================
854
class TestStress:
    """Volume checks: larger inputs must still produce correct, bounded results."""

    def test_compute_intel_1000_symbols(self) -> None:
        """compute_intel handles 1000 single-entry symbols spread over 20 files."""
        history: SymbolHistory = {
            f"module/file_{i % 20}.py::Fn{i}": [_entry(f"c{i}", ts=_ago(days=i % 200))]
            for i in range(1000)
        }
        snap = compute_intel(history, [], now_utc=_now())
        assert snap.total_symbols == 1000
        assert 0 <= snap.health_score <= 100

    def test_compute_intel_many_co_changing_symbols(self) -> None:
        """50 symbols sharing one commit still yield at most 5 coupling pairs."""
        shared_id = "shared"
        history = {}
        for i in range(50):
            history[f"file.py::Fn{i}"] = [_entry(shared_id)]
        snap = compute_intel(history, [], now_utc=_now())
        # _TOP_COUPLING=5 cap must be respected
        assert len(snap.coupling_pairs) <= 5

    @pytest.mark.asyncio
    async def test_search_symbol_across_10_repos(
        self, db_session: AsyncSession
    ) -> None:
        """Search across 10 repos each with 20 symbols finds at least one hit."""
        from musehub.services.musehub_cross_repo import search_symbol_across_repos

        owner = f"stress-owner-{secrets.token_hex(3)}"
        for repo_no in range(10):
            repo = await create_repo(
                db_session, slug=f"stress-repo-{repo_no}", owner=owner, visibility="public"
            )
            inserted = [_insert_op(f"mod{j}.py::TargetFn{j}") for j in range(20)]
            await _build_index(db_session, repo.repo_id, f"head-stress-{repo_no}", inserted)

        matches = await search_symbol_across_repos(
            db_session, owner, "TargetFn", visible_to_user=owner, limit=50
        )
        assert len(matches) >= 1

    @pytest.mark.asyncio
    async def test_workspace_blast_risk_across_5_repos(
        self, db_session: AsyncSession
    ) -> None:
        """Blast-risk ranking over 5 repos respects the ``top_n`` cap."""
        from musehub.services.musehub_cross_repo import workspace_blast_risk_top_n

        owner = f"wbr-owner-{secrets.token_hex(3)}"
        for repo_no in range(5):
            repo = await create_repo(
                db_session, slug=f"wbr-sr-{repo_no}", owner=owner, visibility="public"
            )
            inserted = [_insert_op(f"f{j}.py::Fn{j}") for j in range(10)]
            await _build_index(db_session, repo.repo_id, f"head-wbr-{repo_no}", inserted)

        ranking = await workspace_blast_risk_top_n(
            db_session, owner, top_n=20, visible_to_user=owner
        )
        # 5 repos × 10 symbols each = 50 entries, capped at top_n=20
        assert len(ranking) <= 20
        assert len(ranking) >= 1

    def test_blame_build_500_symbols(self) -> None:
        """_build_real_symbol_blame returns one row per symbol for 500 symbols in one file."""
        from musehub.api.routes.musehub.blame import _build_real_symbol_blame

        symbol_ids = range(500)
        history = {f"big/file.py::Fn{i}": [_entry(f"c{i}")] for i in symbol_ids}
        commit_map = {
            f"c{i}": {"message": "m", "author": "g", "timestamp": _now()}
            for i in symbol_ids
        }
        blame = _build_real_symbol_blame(history, "big/file.py", commit_map)
        assert len(blame) == 500
928
929
930 # ===========================================================================
931 # Layer 5 — Data Integrity tests
932 # ===========================================================================
933
class TestDataIntegrity:
    """Layer 5: stored-data correctness, field validation and round-trips."""

    def test_intel_snapshot_as_dict_from_dict_identity(self) -> None:
        """Round-trip through as_dict/from_dict preserves the headline fields.

        Compared after the round-trip: health score, the three alert counters
        and the number of hotspot entries.
        """
        snap = compute_intel(
            {
                "file.py::Fn": [_entry(f"c{i}") for i in range(15)],
                "file.py::Old": [_entry("co", ts=_ago(150))],
            },
            ["breaking1"],
            now_utc=_now(),
        )
        d = snap.as_dict()
        reconstructed = IntelSnapshot.from_dict(d)
        assert reconstructed.health_score == snap.health_score
        assert reconstructed.alert_hotspot_count == snap.alert_hotspot_count
        assert reconstructed.alert_dead_count == snap.alert_dead_count
        assert reconstructed.alert_breaking_count == snap.alert_breaking_count
        assert len(reconstructed.hotspots) == len(snap.hotspots)

    @pytest.mark.asyncio
    async def test_intel_full_json_stored_and_retrievable(
        self, db_session: AsyncSession
    ) -> None:
        """The index row stores intel JSON that load_intel_snapshot can read back."""
        from musehub.services.musehub_symbol_indexer import load_intel_snapshot

        repo = await create_repo(db_session, slug="di-intel-json")
        ops = [_insert_op("svc.py::do_work", "sha256:beef")]
        row = await _build_index(db_session, repo.repo_id, "head-di", ops)

        assert row.intel_full_json is not None
        snap = await load_intel_snapshot(db_session, repo.repo_id)
        assert snap is not None
        assert snap.total_symbols == 1

        # Address must be present in symbol set: only one symbol was indexed,
        # so every hotspot that is reported has to point at it.  (Passes
        # trivially when no hotspot is reported.)
        hotspot_addrs = [h.address for h in snap.hotspots]
        assert all(addr == "svc.py::do_work" for addr in hotspot_addrs)

        # The raw JSON column must agree with the loaded snapshot.
        stored = json.loads(row.intel_full_json)
        assert stored["total_symbols"] == 1

    def test_velocity_week_buckets_count(self) -> None:
        """Velocity must always have exactly 12 buckets."""
        history = {
            "f.py::Fn": [_entry("c1", ts=_ago(days=1))],
        }
        snap = compute_intel(history, [], now_utc=_now())
        assert len(snap.velocity.weeks) == 12

    def test_hotspot_entries_have_required_fields(self) -> None:
        """Each reported hotspot has a str address and a positive int change count."""
        history = {
            "f.py::Fn": [_entry(f"c{i}") for i in range(12)],
        }
        snap = compute_intel(history, [], now_utc=_now())
        # NOTE(review): the loop body never runs if no hotspot is reported, so
        # the test then passes without checking anything.
        for h in snap.hotspots:
            assert isinstance(h.address, str)
            assert isinstance(h.change_count, int)
            assert h.change_count > 0

    def test_dead_entry_days_cold_matches_expected(self) -> None:
        """A symbol last touched 120 days ago reports roughly 120 cold days."""
        old_ts = _ago(120)
        history = {"f.py::Old": [_entry("c1", ts=old_ts)]}
        snap = compute_intel(history, [], now_utc=_now())
        # NOTE(review): the range check only runs when a dead candidate is
        # reported; with an empty list the test passes without asserting.
        if snap.dead_candidates:
            entry = snap.dead_candidates[0]
            assert 110 <= entry.days_cold <= 130  # allow ±10 days rounding

    @pytest.mark.asyncio
    async def test_blame_entry_fields_complete(
        self, db_session: AsyncSession
    ) -> None:
        """Blame built from stored history exposes symbol name, address and op."""
        from musehub.services.musehub_symbol_indexer import load_symbol_history
        from musehub.api.routes.musehub.blame import _build_real_symbol_blame

        repo = await create_repo(db_session, slug="di-blame-fields")
        ops = [_insert_op("f.py::Fn", "sha256:data1")]
        await _build_index(db_session, repo.repo_id, "head-di-blame", ops)

        history = await load_symbol_history(db_session, repo.repo_id, file_path="f.py")
        commit_map = {
            "head-di-blame": {
                "message": "feat: add fn",
                "author": "gabriel",
                "timestamp": _now(),
            }
        }
        results = _build_real_symbol_blame(history, "f.py", commit_map)
        assert len(results) == 1
        entry = results[0]
        assert entry.symbol_name == "Fn"
        assert entry.symbol_address == "f.py::Fn"
        assert entry.op in ("add", "modify", "delete", "insert", "replace", "patch", "rename")
1018
1019
1020 # ===========================================================================
1021 # Layer 6 — Security tests
1022 # ===========================================================================
1023
class TestSecurity:
    """Layer 6: auth guards, private-repo visibility and hostile-input handling."""

    @pytest.mark.asyncio
    async def test_blame_private_repo_401_no_token(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """Blame on a private repo without any auth headers returns 401."""
        repo = await create_repo(db_session, slug="sec-blame-priv", visibility="private")
        await db_session.commit()
        resp = await client.get(
            f"/api/repos/{repo.repo_id}/blame/HEAD",
            params={"path": "file.py"},
        )
        assert resp.status_code == 401

    @pytest.mark.asyncio
    async def test_blame_404_for_deleted_repo(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """Blame on a repo that was created and then deleted returns 404."""
        repo = await create_repo(db_session, slug="sec-blame-deleted", visibility="public")
        # Capture the id while the instance is still attached to the session.
        # After delete + commit the ORM object is detached, and building the
        # URL should not depend on attribute access still working on it.
        repo_id = repo.repo_id
        await db_session.delete(repo)
        await db_session.commit()

        resp = await client.get(
            f"/api/repos/{repo_id}/blame/HEAD",
            params={"path": "file.py"},
        )
        assert resp.status_code == 404

    @pytest.mark.asyncio
    async def test_search_private_repo_not_visible_to_other_user(
        self, db_session: AsyncSession
    ) -> None:
        """Symbols of a private repo are not returned to a different user."""
        from musehub.services.musehub_cross_repo import search_symbol_across_repos

        owner = f"sec-owner-{secrets.token_hex(3)}"
        repo = await create_repo(
            db_session, slug="sec-priv-search", owner=owner, visibility="private"
        )
        ops = [_insert_op("secret.py::TopSecretFn")]
        await _build_index(db_session, repo.repo_id, "head-sec-priv", ops)

        # Different user can't see private repo
        results = await search_symbol_across_repos(
            db_session, owner, "TopSecretFn", visible_to_user="other-user"
        )
        assert not any("TopSecretFn" in r.address for r in results)

    @pytest.mark.asyncio
    async def test_blame_path_with_traversal_chars_no_crash(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """A ``../`` traversal path is answered with 200 and an empty entry list."""
        repo = await create_repo(db_session, slug="sec-traversal", visibility="public")
        await db_session.commit()
        # Path with traversal attempt — server should return 200 with empty entries
        resp = await client.get(
            f"/api/repos/{repo.repo_id}/blame/HEAD",
            params={"path": "../../../etc/passwd"},
        )
        assert resp.status_code == 200
        data = resp.json()
        assert data["entries"] == []

    def test_compute_intel_with_injected_commit_ids(self) -> None:
        """Malformed commit IDs in history do not cause exceptions."""
        history = {
            "f.py::Fn": [
                {"commit_id": "'; DROP TABLE commits; --", "op": "add"},
                {"commit_id": "", "op": "modify"},
                {"commit_id": None, "op": "add"},
            ]
        }
        snap = compute_intel(history, [], now_utc=_now())
        assert snap.total_symbols == 1

    def test_blame_build_with_xss_in_commit_message(self) -> None:
        """XSS in commit messages is returned verbatim, not executed."""
        from musehub.api.routes.musehub.blame import _build_real_symbol_blame

        history = {"f.py::Fn": [_entry("c1")]}
        xss_msg = "<script>alert('xss')</script>"
        commit_map = {
            "c1": {
                "message": xss_msg,
                "author": "<img onerror=alert()>",
                "timestamp": _now(),
            }
        }
        results = _build_real_symbol_blame(history, "f.py", commit_map)
        assert results[0].commit_message == xss_msg  # stored as-is (escaping is UI's job)
1106
1107
1108 # ===========================================================================
1109 # Layer 7 — Performance tests
1110 # ===========================================================================
1111
class TestPerformance:
    """Layer 7: wall-clock budgets for the critical intel, blame and search paths."""

    def test_compute_intel_500_symbols_under_200ms(self) -> None:
        """compute_intel over 100 symbols × 5 entries (500 entries) takes < 200 ms."""
        history: SymbolHistory = {}
        for i in range(100):
            history[f"pkg/mod_{i}.py::Symbol{i}"] = [
                _entry(f"c{i}_{j}", ts=_ago(j % 300)) for j in range(5)
            ]

        started = time.perf_counter()
        snapshot = compute_intel(history, [], now_utc=_now())
        elapsed_ms = 1000 * (time.perf_counter() - started)

        assert elapsed_ms < 200, f"compute_intel took {elapsed_ms:.1f}ms"
        assert snapshot.total_symbols == 100

    def test_intel_as_dict_from_dict_1000_entries_under_50ms(self) -> None:
        """Serialising and re-parsing a 1000-symbol snapshot takes < 50 ms."""
        history: SymbolHistory = {}
        for i in range(1000):
            history[f"f.py::Fn{i}"] = [_entry(f"c{i}")]
        snapshot = compute_intel(history, [], now_utc=_now())

        # Only the as_dict/from_dict pair is inside the timed window.
        started = time.perf_counter()
        payload = snapshot.as_dict()
        IntelSnapshot.from_dict(payload)
        elapsed_ms = 1000 * (time.perf_counter() - started)

        assert elapsed_ms < 50, f"as_dict/from_dict took {elapsed_ms:.1f}ms"

    def test_blame_build_1000_symbols_under_200ms(self) -> None:
        """Blame for 1000 symbols in one file is built in < 200 ms."""
        from musehub.api.routes.musehub.blame import _build_real_symbol_blame

        history: SymbolHistory = {}
        for i in range(1000):
            history[f"big/file.py::Fn{i}"] = [_entry(f"c{i}")]

        commit_map = {}
        for i in range(1000):
            commit_map[f"c{i}"] = {
                "message": "m",
                "author": "g",
                "timestamp": _now(),
            }

        started = time.perf_counter()
        blame_entries = _build_real_symbol_blame(history, "big/file.py", commit_map)
        elapsed_ms = 1000 * (time.perf_counter() - started)

        assert elapsed_ms < 200, f"_build_real_symbol_blame took {elapsed_ms:.1f}ms"
        assert len(blame_entries) == 1000

    @pytest.mark.asyncio
    async def test_search_across_5_repos_under_1s(
        self, db_session: AsyncSession
    ) -> None:
        """Searching "Fn" across 5 indexed repos of 30 symbols each takes < 1 s."""
        from musehub.services.musehub_cross_repo import search_symbol_across_repos

        owner = f"perf-owner-{secrets.token_hex(3)}"
        for repo_no in range(5):
            repo = await create_repo(
                db_session,
                slug=f"perf-repo-{repo_no}",
                owner=owner,
                visibility="public",
            )
            symbol_ops = [
                _insert_op(f"m{sym_no}.py::Fn{sym_no}") for sym_no in range(30)
            ]
            await _build_index(
                db_session, repo.repo_id, f"head-perf-{repo_no}", symbol_ops
            )

        # Repo creation and indexing above stay outside the timed window.
        started = time.perf_counter()
        hits = await search_symbol_across_repos(
            db_session, owner, "Fn", visible_to_user=owner
        )
        elapsed_ms = 1000 * (time.perf_counter() - started)

        assert elapsed_ms < 1000, f"search_symbol_across_repos took {elapsed_ms:.1f}ms"
        assert len(hits) > 0