test_symbol_intelligence.py
file-level
1
files
1
commits
0
hotspots
0
🧊 dead
0
💥 blast risk
| 1 | """Section 6 — Symbol Intelligence (Intel): 7-layer test suite. |
| 2 | |
| 3 | Covers: |
| 4 | - musehub/services/musehub_intel.py (compute_intel, _parse_ts, _health_label, |
| 5 | _health_color_class, IntelSnapshot, as_dict/from_dict) |
| 6 | - musehub/api/routes/musehub/blame.py (_build_real_symbol_blame, GET /repos/{repo_id}/blame/{ref}) |
| 7 | - musehub/services/musehub_cross_repo.py (search_symbol_across_repos, cross_repo_impact, |
| 8 | workspace_blast_risk_top_n, build_deps_graph, |
| 9 | _module_prefix, _short_label) |
| 10 | |
| 11 | Layers: |
| 12 | 1. Unit — pure function tests, no DB, no I/O |
| 13 | 2. Integration — real DB (PostgreSQL), service calls, no HTTP layer |
| 14 | 3. End-to-End — full HTTP via AsyncClient, real DB |
| 15 | 4. Stress — large data sets, volume correctness |
| 16 | 5. Data Integrity — stored data correctness, field validation, round-trip |
| 17 | 6. Security — auth guards, private repo access, injection safety |
| 18 | 7. Performance — latency budgets for critical paths |
| 19 | """ |
| 20 | from __future__ import annotations |
| 21 | |
| 22 | import json |
| 23 | import secrets |
| 24 | import time |
| 25 | from datetime import datetime, timedelta, timezone |
| 26 | |
| 27 | import msgpack |
| 28 | |
| 29 | type SymbolHistory = dict[str, list[JSONObject]] |
| 30 | import pytest |
| 31 | import pytest_asyncio |
| 32 | from httpx import AsyncClient |
| 33 | from sqlalchemy.ext.asyncio import AsyncSession |
| 34 | |
| 35 | from musehub.services.musehub_intel import ( |
| 36 | IntelSnapshot, |
| 37 | BlastRiskEntry, |
| 38 | CouplingPair, |
| 39 | DeadEntry, |
| 40 | HotspotEntry, |
| 41 | VelocityWindow, |
| 42 | _health_color_class, |
| 43 | _health_label, |
| 44 | _parse_ts, |
| 45 | compute_intel, |
| 46 | ) |
| 47 | from musehub.types.json_types import JSONObject, StrDict |
| 48 | from tests.factories import create_repo |
| 49 | |
| 50 | # --------------------------------------------------------------------------- |
| 51 | # Local helpers |
| 52 | # --------------------------------------------------------------------------- |
| 53 | |
| 54 | def _now() -> datetime: |
| 55 | return datetime.now(tz=timezone.utc) |
| 56 | |
| 57 | |
def _ago(days: int = 0, **kwargs: int) -> datetime:
    """Return the UTC datetime lying the requested span before the current moment.

    ``days`` and any extra keyword arguments are forwarded unchanged to
    ``timedelta`` (for example ``hours=3``).
    """
    span = timedelta(days=days, **kwargs)
    return _now() - span
| 60 | |
| 61 | |
| 62 | def _ts(dt: datetime) -> str: |
| 63 | return dt.isoformat() |
| 64 | |
| 65 | |
def _entry(commit_id: str, op: str = "add", ts: datetime | None = None,
           content_id: str = "sha256:abc") -> JSONObject:
    """Build one symbol-history entry dict for a commit.

    Args:
        commit_id: Identifier stored under ``"commit_id"``.
        op: Operation name stored under ``"op"``.
        ts: Moment used for both time fields; the current UTC time when omitted.
        content_id: Value stored under ``"content_id"``.

    Returns:
        A dict with ``commit_id``, ``op``, ``timestamp``, ``committed_at`` and
        ``content_id`` keys; both time fields hold the same ISO string.
    """
    # Resolve the stamp once: calling _now() separately for each field would
    # let "timestamp" and "committed_at" drift apart when ts is omitted.
    stamp = _ts(ts or _now())
    return {
        "commit_id": commit_id,
        "op": op,
        "timestamp": stamp,
        "committed_at": stamp,
        "content_id": content_id,
    }
| 75 | |
| 76 | |
| 77 | def _history(**kwargs: list[JSONObject]) -> SymbolHistory: |
| 78 | """Build a symbol_history dict from keyword args: addr=entries.""" |
| 79 | return dict(kwargs) |
| 80 | |
| 81 | |
async def _build_index(session: AsyncSession, repo_id: str, head_id: str,
                       ops: list[JSONObject]) -> "types.SimpleNamespace":
    """Store a single commit, index it, persist the results, and expose the payloads.

    Steps, in order: add a ``MusehubCommit`` (branch ``main``, no parents,
    ``structured_delta={"ops": ops}``) plus its ``MusehubCommitRef``, flush,
    run ``build_symbol_index``, hand the results to ``persist_intel_results``,
    and commit the session.

    Returns:
        A ``SimpleNamespace`` whose ``intel_full_json`` and ``intel_summary``
        attributes hold the JSON-encoded ``code.intel_snapshot`` and
        ``code.intel_summary`` results, or ``None`` when a type is absent.
    """
    import types as _types
    from musehub.db.musehub_repo_models import MusehubCommit, MusehubCommitRef
    from musehub.services.musehub_symbol_indexer import build_symbol_index
    from musehub.services.musehub_intel_providers import persist_intel_results

    head_commit = MusehubCommit(
        commit_id=head_id,
        branch="main",
        parent_ids=[],
        message="test commit",
        author="gabriel",
        timestamp=_now(),
        structured_delta={"ops": ops},
    )
    head_ref = MusehubCommitRef(repo_id=repo_id, commit_id=head_id)
    session.add(head_commit)
    session.add(head_ref)
    await session.flush()

    index_results = await build_symbol_index(session, repo_id, head_id)
    await persist_intel_results(session, repo_id, head_id, index_results)
    await session.commit()

    # Re-encode every (type, data) pair so callers can inspect the JSON text.
    encoded = {}
    for result_type, payload in index_results:
        encoded[result_type] = json.dumps(payload)

    return _types.SimpleNamespace(
        intel_full_json=encoded.get("code.intel_snapshot"),
        intel_summary=encoded.get("code.intel_summary"),
    )
| 111 | |
| 112 | |
| 113 | def _insert_op(address: str, content_id: str = "sha256:abc") -> JSONObject: |
| 114 | return {"address": address, "op": "insert", "content_id": content_id} |
| 115 | |
| 116 | |
| 117 | # =========================================================================== |
| 118 | # Layer 1 — Unit tests (pure functions, no DB, no I/O) |
| 119 | # =========================================================================== |
| 120 | |
class TestParseTs:
    """Unit checks for ``_parse_ts`` over string and numeric timestamp inputs."""

    def test_iso_string_utc(self) -> None:
        """An ISO string with an explicit +00:00 offset yields an aware datetime."""
        parsed = _parse_ts("2025-01-15T10:30:00+00:00")
        assert (parsed.year, parsed.month) == (2025, 1)
        assert parsed.tzinfo is not None

    def test_iso_string_z_suffix(self) -> None:
        """The trailing ``Z`` form is accepted and keeps timezone information."""
        parsed = _parse_ts("2025-06-01T00:00:00Z")
        assert parsed.tzinfo is not None
        assert parsed.year == 2025

    def test_unix_int(self) -> None:
        """Integer zero is read as the Unix epoch with timezone information."""
        epoch = _parse_ts(0)
        assert epoch.year == 1970
        assert epoch.tzinfo is not None

    def test_unix_float(self) -> None:
        """A fractional Unix timestamp is accepted."""
        parsed = _parse_ts(1_700_000_000.5)
        assert parsed.year == 2023

    def test_invalid_string_raises(self) -> None:
        """Unparseable text must raise rather than return a value."""
        # NOTE(review): the concrete exception type is not visible here, so the
        # broad Exception match is kept as-is.
        with pytest.raises(Exception):
            _parse_ts("not-a-date")
| 145 | |
| 146 | |
class TestHealthLabel:
    """Boundary checks for the score-to-label mapping of ``_health_label``.

    Each test probes the highest and lowest score expected in one band.
    """

    def test_excellent(self) -> None:
        for score in (100, 90):
            assert _health_label(score) == "Excellent"

    def test_good(self) -> None:
        for score in (89, 75):
            assert _health_label(score) == "Good"

    def test_fair(self) -> None:
        for score in (74, 55):
            assert _health_label(score) == "Fair"

    def test_poor(self) -> None:
        for score in (54, 35):
            assert _health_label(score) == "Poor"

    def test_critical(self) -> None:
        for score in (34, 0):
            assert _health_label(score) == "Critical"
| 167 | |
| 168 | |
class TestHealthColorClass:
    """Checks the CSS class returned by ``_health_color_class`` at one score per band."""

    def test_excellent(self) -> None:
        css_class = _health_color_class(90)
        assert css_class == "intel-health--excellent"

    def test_good(self) -> None:
        css_class = _health_color_class(75)
        assert css_class == "intel-health--good"

    def test_fair(self) -> None:
        css_class = _health_color_class(55)
        assert css_class == "intel-health--fair"

    def test_poor(self) -> None:
        css_class = _health_color_class(35)
        assert css_class == "intel-health--poor"

    def test_critical(self) -> None:
        css_class = _health_color_class(0)
        assert css_class == "intel-health--critical"
| 184 | |
| 185 | |
class TestComputeIntelUnit:
    """Pure-function checks of ``compute_intel`` on hand-built symbol histories."""

    def test_empty_history_returns_zero_score(self) -> None:
        """With no symbols and no breaking changes the snapshot is empty and healthy."""
        result = compute_intel({}, [], now_utc=_now())
        assert (result.total_symbols, result.total_commits_indexed) == (0, 0)
        assert result.health_score == 100  # no penalties = 100
        assert result.health_label == "Excellent"

    def test_single_symbol_no_ts(self) -> None:
        """An entry without any timestamp field is still counted."""
        bare_entry = {"commit_id": "c1", "op": "add"}
        result = compute_intel({"file.py::Foo": [bare_entry]}, [], now_utc=_now())
        assert result.total_symbols == 1
        assert result.total_commits_indexed == 1

    def test_hotspot_detection(self) -> None:
        """A symbol changed many times is reported as a hotspot."""
        # 12 changes on one symbol — exceeds _HOTSPOT_THRESHOLD (10)
        changes = [_entry(f"c{n}") for n in range(12)]
        result = compute_intel({"file.py::HotFn": changes}, [], now_utc=_now())
        assert result.alert_hotspot_count >= 1
        assert "file.py::HotFn" in [spot.address for spot in result.hotspots]

    def test_dead_code_detection(self) -> None:
        """A symbol last touched long ago becomes a dead-code candidate."""
        # One old entry, last touched 100 days ago
        stale_entry = _entry("c1", ts=_ago(100))
        result = compute_intel({"file.py::Stale": [stale_entry]}, [], now_utc=_now())
        assert result.alert_dead_count >= 1
        assert "file.py::Stale" in [cand.address for cand in result.dead_candidates]

    def test_recent_symbol_not_dead(self) -> None:
        """A symbol touched five days ago is not flagged as dead."""
        fresh_entry = _entry("c1", ts=_ago(5))
        result = compute_intel({"file.py::Fresh": [fresh_entry]}, [], now_utc=_now())
        assert result.alert_dead_count == 0

    def test_blast_risk_co_change(self) -> None:
        """Symbols that always change in the same commits produce blast-risk entries."""
        # Two symbols always change together → blast risk for both
        co_changed = {
            address: [_entry("c1"), _entry("c2")]
            for address in ("file.py::Alpha", "file.py::Beta")
        }
        result = compute_intel(co_changed, [], now_utc=_now())
        # Both are co-changed — blast risk entries should include at least one
        assert len(result.blast_risk) >= 1

    def test_coupling_pairs_detected(self) -> None:
        """Two symbols sharing one commit yield at least one coupling pair."""
        # Symbols sharing same commit → coupling pair
        shared = {
            address: [_entry("shared-commit")]
            for address in ("file.py::A", "file.py::B")
        }
        result = compute_intel(shared, [], now_utc=_now())
        assert len(result.coupling_pairs) >= 1
        first_pair = result.coupling_pairs[0]
        assert first_pair.shared_commits >= 1

    def test_breaking_changes_reduce_score(self) -> None:
        """Breaking changes lower the health score and are counted one by one."""
        baseline = compute_intel({}, [], now_utc=_now())
        penalised = compute_intel({}, ["break1", "break2", "break3"], now_utc=_now())
        assert penalised.health_score < baseline.health_score
        assert penalised.alert_breaking_count == 3

    def test_velocity_buckets_populated(self) -> None:
        """A change from yesterday lands in the first of twelve weekly buckets."""
        yesterday = _ago(days=1)
        result = compute_intel(
            {"file.py::Fn": [_entry("c1", ts=yesterday)]}, [], now_utc=_now()
        )
        weekly = result.velocity.weeks
        assert len(weekly) == 12
        assert weekly[0] >= 1  # most recent week bucket

    def test_health_score_capped_at_100(self) -> None:
        """The score of an empty snapshot stays inside the 0..100 range."""
        result = compute_intel({}, [], now_utc=_now())
        assert 0 <= result.health_score <= 100

    def test_top_n_hotspots_limit(self) -> None:
        """The hotspot list is truncated even when many symbols qualify."""
        # 20 symbols each changed 15 times → _TOP_N=10 returned
        busy: SymbolHistory = {
            f"file.py::Fn{i}": [_entry(f"c{i}_{j}") for j in range(15)]
            for i in range(20)
        }
        result = compute_intel(busy, [], now_utc=_now())
        assert len(result.hotspots) <= 10

    def test_dead_candidates_sorted_by_coldest_first(self) -> None:
        """When two candidates are reported, the colder one comes first."""
        cold = {
            "file.py::Old": [_entry("c1", ts=_ago(200))],
            "file.py::Older": [_entry("c2", ts=_ago(300))],
        }
        candidates = compute_intel(cold, [], now_utc=_now()).dead_candidates
        # NOTE(review): this guard lets the test pass vacuously when fewer than
        # two candidates are returned — confirm whether that is intended.
        if len(candidates) >= 2:
            assert candidates[0].days_cold >= candidates[1].days_cold

    def test_timestamp_invalid_gracefully_ignored(self) -> None:
        """An unparseable timestamp must not make ``compute_intel`` raise."""
        broken_entry = {"commit_id": "c1", "op": "add", "timestamp": "NOT_A_DATE"}
        result = compute_intel({"file.py::BadTs": [broken_entry]}, [], now_utc=_now())
        # Should not raise; symbol counted but ts ignored
        assert result.total_symbols == 1
| 288 | |
| 289 | |
class TestIntelSnapshotSerialisation:
    """Round-trip and JSON-compatibility checks for ``IntelSnapshot``."""

    def _make_snap(self) -> IntelSnapshot:
        """Build a fully populated snapshot with one entry in every list field."""
        hotspot = HotspotEntry(address="a.py::Fn", change_count=15, last_changed=None)
        dead = DeadEntry(address="b.py::Old", days_cold=120, blast_radius=0, added_at=None)
        risk = BlastRiskEntry(
            address="c.py::Risk", co_change_count=25, top_co_symbols=["d.py::X"]
        )
        pair = CouplingPair(address_a="a.py::F", address_b="b.py::G", shared_commits=5)
        window = VelocityWindow(weeks=[1, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0])
        return IntelSnapshot(
            health_score=80,
            health_label="Good",
            alert_hotspot_count=2,
            alert_dead_count=1,
            alert_blast_risk_count=3,
            alert_breaking_count=0,
            hotspots=[hotspot],
            dead_candidates=[dead],
            blast_risk=[risk],
            coupling_pairs=[pair],
            velocity=window,
            total_symbols=50,
            total_commits_indexed=10,
        )

    def test_as_dict_round_trip(self) -> None:
        """``from_dict(as_dict())`` restores scalar fields and nested entries."""
        restored = IntelSnapshot.from_dict(self._make_snap().as_dict())
        assert restored.health_score == 80
        assert restored.health_label == "Good"
        assert restored.total_symbols == 50
        assert restored.hotspots[0].address == "a.py::Fn"
        assert restored.dead_candidates[0].days_cold == 120
        assert restored.blast_risk[0].co_change_count == 25
        assert restored.coupling_pairs[0].shared_commits == 5
        assert restored.velocity.weeks[0] == 1

    def test_as_dict_json_serialisable(self) -> None:
        """The dict form can be passed straight to ``json.dumps``."""
        payload = self._make_snap().as_dict()
        # Must be JSON-serialisable (no datetimes, no custom objects)
        encoded = json.dumps(payload)
        assert "health_score" in encoded

    def test_from_dict_missing_optional_fields(self) -> None:
        """List fields and the velocity window default to empty when omitted."""
        zero_counters = (
            "alert_hotspot_count",
            "alert_dead_count",
            "alert_blast_risk_count",
            "alert_breaking_count",
            "total_symbols",
            "total_commits_indexed",
        )
        minimal = {
            "health_score": 70,
            "health_label": "Fair",
            **dict.fromkeys(zero_counters, 0),
        }
        restored = IntelSnapshot.from_dict(minimal)
        assert restored.hotspots == []
        assert restored.dead_candidates == []
        assert restored.coupling_pairs == []
        assert restored.velocity.weeks == []
| 344 | |
| 345 | |
class TestModulePrefix:
    """Checks how ``_module_prefix`` truncates dotted addresses."""

    def test_three_segments(self) -> None:
        """A four-segment address is cut to its first three segments."""
        from musehub.services.musehub_cross_repo import _module_prefix

        assert (
            _module_prefix("musehub.services.musehub_ci.enqueue_run")
            == "musehub.services.musehub_ci"
        )

    def test_fewer_than_depth(self) -> None:
        """An address with fewer segments than the depth comes back unchanged."""
        from musehub.services.musehub_cross_repo import _module_prefix

        # shorter than depth=3, returns as-is
        assert _module_prefix("a.b") == "a.b"

    def test_exactly_depth(self) -> None:
        """An address with exactly three segments comes back unchanged."""
        from musehub.services.musehub_cross_repo import _module_prefix

        assert _module_prefix("a.b.c") == "a.b.c"

    def test_custom_depth(self) -> None:
        """An explicit ``depth`` overrides the default segment count."""
        from musehub.services.musehub_cross_repo import _module_prefix

        assert _module_prefix("a.b.c.d.e", depth=2) == "a.b"
| 366 | |
| 367 | |
class TestShortLabel:
    """Checks the shortened display label produced by ``_short_label``."""

    def test_two_segments(self) -> None:
        """A three-segment name keeps only its last two segments."""
        from musehub.services.musehub_cross_repo import _short_label

        label = _short_label("a.b.c")
        assert label == "b.c"

    def test_single_segment(self) -> None:
        """A name without dots is returned unchanged."""
        from musehub.services.musehub_cross_repo import _short_label

        label = _short_label("single")
        assert label == "single"
| 376 | |
| 377 | |
class TestBuildRealSymbolBlame:
    """Unit checks for ``_build_real_symbol_blame`` using in-memory histories."""

    def test_filters_to_path(self) -> None:
        """Only symbols belonging to the requested file are returned."""
        from musehub.api.routes.musehub.blame import _build_real_symbol_blame

        symbol_history = {
            "musehub/api.py::Foo": [_entry("c1")],
            "other/file.py::Bar": [_entry("c2")],
        }
        commits = {
            "c1": {"message": "add Foo", "author": "gabriel", "timestamp": _now()},
        }
        blame = _build_real_symbol_blame(symbol_history, "musehub/api.py", commits)
        assert len(blame) == 1
        assert blame[0].symbol_name == "Foo"

    def test_excludes_import_declarations(self) -> None:
        """``import::`` addresses are left out of the blame output."""
        from musehub.api.routes.musehub.blame import _build_real_symbol_blame

        symbol_history = {
            "file.py::import::os": [_entry("c1")],
            "file.py::MyFn": [_entry("c1")],
        }
        commits = {"c1": {"message": "m", "author": "g", "timestamp": _now()}}
        blame = _build_real_symbol_blame(symbol_history, "file.py", commits)
        reported = {row.symbol_name for row in blame}
        assert "MyFn" in reported
        assert "import::os" not in reported

    def test_excludes_deleted_symbols(self) -> None:
        """A symbol whose only entry is a delete does not appear."""
        from musehub.api.routes.musehub.blame import _build_real_symbol_blame

        symbol_history = {
            "file.py::Gone": [_entry("c1", op="delete")],
            "file.py::Here": [_entry("c2", op="add")],
        }
        commits = {
            "c1": {"message": "del", "author": "g", "timestamp": _now()},
            "c2": {"message": "add", "author": "g", "timestamp": _now()},
        }
        blame = _build_real_symbol_blame(symbol_history, "file.py", commits)
        reported = {row.symbol_name for row in blame}
        assert "Gone" not in reported
        assert "Here" in reported

    def test_intel_signals_populated(self) -> None:
        """Passing an intel snapshot marks a hot symbol as a hotspot."""
        from musehub.api.routes.musehub.blame import _build_real_symbol_blame

        symbol_history = {"file.py::HotFn": [_entry("c1")]}
        commits = {"c1": {"message": "m", "author": "g", "timestamp": _now()}}
        hot_history = {"file.py::HotFn": [_entry(f"c{n}") for n in range(15)]}
        snapshot = compute_intel(hot_history, [], now_utc=_now())
        blame = _build_real_symbol_blame(
            symbol_history, "file.py", commits, intel=snapshot
        )
        assert len(blame) == 1
        assert blame[0].is_hotspot is True

    def test_change_count_reflects_history_length(self) -> None:
        """``change_count`` equals the number of history entries for the symbol."""
        from musehub.api.routes.musehub.blame import _build_real_symbol_blame

        commit_ids = ("c1", "c2", "c3")
        symbol_history = {"file.py::Changed": [_entry(cid) for cid in commit_ids]}
        commits = {
            cid: {"message": "m", "author": "g", "timestamp": _now()}
            for cid in commit_ids
        }
        blame = _build_real_symbol_blame(symbol_history, "file.py", commits)
        assert blame[0].change_count == 3

    def test_empty_history_returns_empty_list(self) -> None:
        """No history yields an empty result list."""
        from musehub.api.routes.musehub.blame import _build_real_symbol_blame

        assert _build_real_symbol_blame({}, "file.py", {}) == []

    def test_unknown_commit_id_falls_back_gracefully(self) -> None:
        """A commit missing from the commit map yields empty author and message."""
        from musehub.api.routes.musehub.blame import _build_real_symbol_blame

        symbol_history = {"file.py::Fn": [_entry("unknown-commit")]}
        blame = _build_real_symbol_blame(symbol_history, "file.py", {})
        assert len(blame) == 1
        only_row = blame[0]
        assert only_row.author == ""
        assert only_row.commit_message == ""
| 466 | |
| 467 | |
| 468 | # =========================================================================== |
| 469 | # Layer 2 — Integration tests (real DB, service layer, no HTTP) |
| 470 | # =========================================================================== |
| 471 | |
class TestComputeIntelIntegration:
    """DB-backed checks that building an index stores a loadable intel snapshot."""

    @pytest.mark.asyncio
    async def test_load_intel_snapshot_none_when_no_index(
        self, db_session: AsyncSession
    ) -> None:
        """A repo that was never indexed has no snapshot."""
        from musehub.services.musehub_symbol_indexer import load_intel_snapshot

        repo = await create_repo(db_session, slug="intel-no-index")
        snapshot = await load_intel_snapshot(db_session, repo.repo_id)
        assert snapshot is None

    @pytest.mark.asyncio
    async def test_build_index_populates_intel_full_json(
        self, db_session: AsyncSession
    ) -> None:
        """Indexing two inserted symbols stores a snapshot that counts both."""
        from musehub.services.musehub_symbol_indexer import load_intel_snapshot

        repo = await create_repo(db_session, slug="intel-populated")
        inserted = [
            _insert_op(address) for address in ("src/main.py::run", "src/main.py::setup")
        ]
        built = await _build_index(db_session, repo.repo_id, "head-intel-1", inserted)
        assert built is not None
        assert built.intel_full_json is not None

        snapshot = await load_intel_snapshot(db_session, repo.repo_id)
        assert snapshot is not None
        assert snapshot.total_symbols == 2

    @pytest.mark.asyncio
    async def test_intel_health_score_range(
        self, db_session: AsyncSession
    ) -> None:
        """The stored health score stays inside the 0..100 range."""
        from musehub.services.musehub_symbol_indexer import load_intel_snapshot

        repo = await create_repo(db_session, slug="intel-health-range")
        inserted = [_insert_op(f"src/f.py::Fn{n}") for n in range(5)]
        await _build_index(db_session, repo.repo_id, "head-hr", inserted)

        snapshot = await load_intel_snapshot(db_session, repo.repo_id)
        assert snapshot is not None
        assert 0 <= snapshot.health_score <= 100

    @pytest.mark.asyncio
    async def test_intel_summary_json_fields(
        self, db_session: AsyncSession
    ) -> None:
        """The summary payload carries the four headline counters."""
        repo = await create_repo(db_session, slug="intel-summary-fields")
        built = await _build_index(
            db_session, repo.repo_id, "head-summ", [_insert_op("api.py::endpoint")]
        )
        assert built is not None
        assert built.intel_summary is not None
        summary = json.loads(built.intel_summary)
        for field in (
            "health_score",
            "symbol_count",
            "hotspot_count",
            "dead_symbol_count",
        ):
            assert field in summary
| 527 | |
| 528 | |
class TestBlameIntegration:
    """DB-backed checks feeding ``load_symbol_history`` into the blame builder."""

    @pytest.mark.asyncio
    async def test_blame_returns_empty_when_no_index(
        self, db_session: AsyncSession
    ) -> None:
        """Without an index the blame list is empty."""
        from musehub.services.musehub_symbol_indexer import load_symbol_history
        from musehub.api.routes.musehub.blame import _build_real_symbol_blame

        repo = await create_repo(db_session, slug="blame-no-idx")
        loaded = await load_symbol_history(
            db_session, repo.repo_id, file_path="file.py"
        )
        assert _build_real_symbol_blame(loaded, "file.py", {}) == []

    @pytest.mark.asyncio
    async def test_blame_entries_after_index_build(
        self, db_session: AsyncSession
    ) -> None:
        """Symbols inserted for a file show up in that file's blame output."""
        from musehub.services.musehub_symbol_indexer import load_symbol_history
        from musehub.api.routes.musehub.blame import _build_real_symbol_blame

        repo = await create_repo(db_session, slug="blame-with-idx")
        inserted = [
            _insert_op("src/api.py::handle_request"),
            _insert_op("src/api.py::parse_args"),
        ]
        await _build_index(db_session, repo.repo_id, "head-blame", inserted)

        loaded = await load_symbol_history(
            db_session, repo.repo_id, file_path="src/api.py"
        )
        blame = _build_real_symbol_blame(loaded, "src/api.py", {})
        reported = {row.symbol_name for row in blame}
        assert "handle_request" in reported
        assert "parse_args" in reported
| 563 | |
| 564 | |
class TestCrossRepoIntegration:
    """DB-backed checks for the cross-repo search, impact, blast-risk and graph services.

    Most tests create a repo under a freshly generated random owner name.
    """

    @pytest.mark.asyncio
    async def test_search_symbol_no_repos(
        self, db_session: AsyncSession
    ) -> None:
        """Searching for an owner with no repos returns an empty list."""
        from musehub.services.musehub_cross_repo import search_symbol_across_repos

        matches = await search_symbol_across_repos(
            db_session, "ghost-owner", "Fn", visible_to_user="ghost-owner"
        )
        assert matches == []

    @pytest.mark.asyncio
    async def test_search_symbol_finds_match(
        self, db_session: AsyncSession
    ) -> None:
        """An indexed symbol is found by its exact name."""
        from musehub.services.musehub_cross_repo import search_symbol_across_repos

        owner = f"owner-{secrets.token_hex(4)}"
        repo = await create_repo(
            db_session, slug="search-sym-repo", owner=owner, visibility="public"
        )
        await _build_index(
            db_session, repo.repo_id, "head-search", [_insert_op("api.py::compute_intel")]
        )

        matches = await search_symbol_across_repos(
            db_session, owner, "compute_intel", visible_to_user=owner
        )
        assert len(matches) >= 1
        assert any("compute_intel" in match.address for match in matches)

    @pytest.mark.asyncio
    async def test_search_symbol_case_insensitive(
        self, db_session: AsyncSession
    ) -> None:
        """A lower-case query matches a mixed-case symbol name."""
        from musehub.services.musehub_cross_repo import search_symbol_across_repos

        owner = f"owner-{secrets.token_hex(4)}"
        repo = await create_repo(
            db_session, slug="search-case-repo", owner=owner, visibility="public"
        )
        await _build_index(
            db_session, repo.repo_id, "head-case", [_insert_op("api.py::MyFunction")]
        )

        matches = await search_symbol_across_repos(
            db_session, owner, "myfunction", visible_to_user=owner
        )
        assert any("MyFunction" in match.address for match in matches)

    @pytest.mark.asyncio
    async def test_search_symbol_private_repo_excluded_without_auth(
        self, db_session: AsyncSession
    ) -> None:
        """Symbols in a private repo are hidden when no viewing user is given."""
        from musehub.services.musehub_cross_repo import search_symbol_across_repos

        owner = f"owner-{secrets.token_hex(4)}"
        repo = await create_repo(
            db_session, slug="search-private-repo", owner=owner, visibility="private"
        )
        await _build_index(
            db_session, repo.repo_id, "head-priv", [_insert_op("api.py::SecretFn")]
        )

        # visible_to_user=None → only public repos
        matches = await search_symbol_across_repos(
            db_session, owner, "SecretFn", visible_to_user=None
        )
        assert not any("SecretFn" in match.address for match in matches)

    @pytest.mark.asyncio
    async def test_workspace_blast_risk_empty(
        self, db_session: AsyncSession
    ) -> None:
        """An owner without repos has no blast-risk entries."""
        from musehub.services.musehub_cross_repo import workspace_blast_risk_top_n

        ranking = await workspace_blast_risk_top_n(
            db_session, "nonexistent-owner", visible_to_user="nonexistent-owner"
        )
        assert ranking == []

    @pytest.mark.asyncio
    async def test_workspace_blast_risk_populated(
        self, db_session: AsyncSession
    ) -> None:
        """Indexed symbols appear in the ranking, ordered by co-change count."""
        from musehub.services.musehub_cross_repo import workspace_blast_risk_top_n

        owner = f"owner-{secrets.token_hex(4)}"
        repo = await create_repo(
            db_session, slug="wbr-repo", owner=owner, visibility="public"
        )
        inserted = [_insert_op("a.py::Fn"), _insert_op("b.py::Gn")]
        await _build_index(db_session, repo.repo_id, "head-wbr", inserted)

        ranking = await workspace_blast_risk_top_n(
            db_session, owner, visible_to_user=owner
        )
        assert len(ranking) >= 2
        # Sorted by co_change_count descending
        for position in range(1, len(ranking)):
            previous, current = ranking[position - 1], ranking[position]
            assert previous.co_change_count >= current.co_change_count

    @pytest.mark.asyncio
    async def test_cross_repo_impact_no_source_repo(
        self, db_session: AsyncSession
    ) -> None:
        """Impact analysis for a repo id that does not exist returns ``None``."""
        from musehub.services.musehub_cross_repo import cross_repo_impact

        missing_repo_id = secrets.token_hex(16)
        impact = await cross_repo_impact(
            db_session, "ghost-owner", missing_repo_id, "file.py::Fn",
            visible_to_user="ghost-owner",
        )
        assert impact is None

    @pytest.mark.asyncio
    async def test_cross_repo_impact_unknown_address(
        self, db_session: AsyncSession
    ) -> None:
        """Impact analysis for an address absent from the index returns ``None``."""
        from musehub.services.musehub_cross_repo import cross_repo_impact

        owner = f"owner-{secrets.token_hex(4)}"
        repo = await create_repo(
            db_session, slug="cri-unknown", owner=owner, visibility="public"
        )
        await _build_index(
            db_session, repo.repo_id, "head-cri", [_insert_op("a.py::KnownFn")]
        )

        impact = await cross_repo_impact(
            db_session, owner, repo.repo_id, "a.py::NonExistent",
            visible_to_user=owner,
        )
        assert impact is None

    @pytest.mark.asyncio
    async def test_build_deps_graph_single_repo(
        self, db_session: AsyncSession
    ) -> None:
        """The graph built for one indexed repo exposes ``nodes`` and ``edges``."""
        from musehub.services.musehub_cross_repo import build_deps_graph

        owner = f"owner-{secrets.token_hex(4)}"
        repo = await create_repo(
            db_session, slug="deps-single", owner=owner, visibility="public"
        )
        inserted = [_insert_op(address) for address in ("a.b.c.Fn", "a.b.d.Gn")]
        await _build_index(db_session, repo.repo_id, "head-deps", inserted)

        graph = await build_deps_graph(
            db_session, owner, repo.repo_id, visible_to_user=owner
        )
        for attribute in ("nodes", "edges"):
            assert hasattr(graph, attribute)

    @pytest.mark.asyncio
    async def test_build_deps_graph_no_source_repo_returns_empty(
        self, db_session: AsyncSession
    ) -> None:
        """A missing source repo still yields a ``DepsGraph`` instance."""
        from musehub.services.musehub_cross_repo import build_deps_graph, DepsGraph

        owner = f"owner-{secrets.token_hex(4)}"
        missing_repo_id = secrets.token_hex(16)
        graph = await build_deps_graph(
            db_session, owner, missing_repo_id, visible_to_user=owner
        )
        assert isinstance(graph, DepsGraph)
| 723 | |
| 724 | |
| 725 | # =========================================================================== |
| 726 | # Layer 3 — End-to-End tests (full HTTP via AsyncClient, real DB) |
| 727 | # =========================================================================== |
| 728 | |
class TestBlameEndToEnd:
    """HTTP-level checks for the blame route and the symbol-index rebuild route."""

    @pytest.mark.asyncio
    async def test_blame_404_unknown_repo(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """A random repo id that was never created answers 404."""
        unknown_id = secrets.token_hex(16)
        response = await client.get(
            f"/api/repos/{unknown_id}/blame/HEAD",
            params={"path": "file.py"},
        )
        assert response.status_code == 404

    @pytest.mark.asyncio
    async def test_blame_public_repo_no_auth(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """A public repo can be blamed without credentials; the payload has its top-level keys."""
        repo = await create_repo(db_session, slug="blame-e2e-pub", visibility="public")
        await db_session.commit()

        response = await client.get(
            f"/api/repos/{repo.repo_id}/blame/HEAD",
            params={"path": "file.py"},
        )

        assert response.status_code == 200
        payload = response.json()
        for expected_key in ("entries", "totalEntries", "path"):
            assert expected_key in payload

    @pytest.mark.asyncio
    async def test_blame_private_repo_requires_auth(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """An unauthenticated request against a private repo is rejected with 401."""
        repo = await create_repo(db_session, slug="blame-e2e-priv", visibility="private")
        await db_session.commit()

        response = await client.get(
            f"/api/repos/{repo.repo_id}/blame/HEAD",
            params={"path": "file.py"},
        )

        assert response.status_code == 401

    @pytest.mark.asyncio
    async def test_blame_returns_entries_after_index_build(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """Symbols inserted into the index show up as blame entries for their file."""
        repo = await create_repo(db_session, slug="blame-e2e-entries", visibility="public")
        inserted = [
            _insert_op("api/routes.py::dispatch"),
            _insert_op("api/routes.py::validate"),
        ]
        await _build_index(db_session, repo.repo_id, "head-blame-e2e", inserted)

        response = await client.get(
            f"/api/repos/{repo.repo_id}/blame/HEAD",
            params={"path": "api/routes.py"},
        )

        assert response.status_code == 200
        symbol_names = [item["symbolName"] for item in response.json()["entries"]]
        assert "dispatch" in symbol_names
        assert "validate" in symbol_names

    @pytest.mark.asyncio
    async def test_blame_path_filter_respected(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """Only symbols from the requested ``path`` are returned."""
        repo = await create_repo(db_session, slug="blame-e2e-filter", visibility="public")
        inserted = [_insert_op("path/a.py::FnA"), _insert_op("path/b.py::FnB")]
        await _build_index(db_session, repo.repo_id, "head-filter", inserted)

        response = await client.get(
            f"/api/repos/{repo.repo_id}/blame/HEAD",
            params={"path": "path/a.py"},
        )

        assert response.status_code == 200
        symbol_names = [item["symbolName"] for item in response.json()["entries"]]
        assert "FnA" in symbol_names
        assert "FnB" not in symbol_names

    @pytest.mark.asyncio
    async def test_symbol_index_rebuild_endpoint(
        self, client: AsyncClient, db_session: AsyncSession, auth_headers: StrDict
    ) -> None:
        """An authenticated rebuild request on a repo with a head commit is accepted."""
        from musehub.db.musehub_repo_models import (
            MusehubBranch as _Branch,
            MusehubCommit as _Commit,
            MusehubCommitRef as _CommitRef,
        )

        repo = await create_repo(db_session, slug="rebuild-e2e")

        # Seed a single head commit on "main" plus the rows that tie it to the repo.
        head_commit = _Commit(
            commit_id="rebuild-head",
            branch="main",
            parent_ids=[],
            message="initial",
            author="gabriel",
            timestamp=_now(),
            structured_delta={"ops": [_insert_op("x.py::Fn")]},
        )
        main_branch = _Branch(
            branch_id=secrets.token_hex(16),
            repo_id=repo.repo_id,
            name="main",
            head_commit_id="rebuild-head",
        )
        commit_ref = _CommitRef(repo_id=repo.repo_id, commit_id="rebuild-head")
        for row in (head_commit, commit_ref, main_branch):
            db_session.add(row)
        await db_session.commit()

        response = await client.post(
            f"/api/repos/{repo.repo_id}/symbol-index/rebuild",
            headers=auth_headers,
        )

        assert response.status_code in (200, 202)

    @pytest.mark.asyncio
    async def test_symbol_index_rebuild_requires_auth(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """A rebuild request without auth headers is rejected with 401."""
        repo = await create_repo(db_session, slug="rebuild-noauth")
        await db_session.commit()

        rebuild_url = f"/api/repos/{repo.repo_id}/symbol-index/rebuild"
        response = await client.post(rebuild_url)

        assert response.status_code == 401
| 849 | |
| 850 | |
| 851 | # =========================================================================== |
| 852 | # Layer 4 — Stress tests |
| 853 | # =========================================================================== |
| 854 | |
class TestStress:
    """Volume tests: larger inputs must still produce correctly sized results."""

    def test_compute_intel_1000_symbols(self) -> None:
        """compute_intel handles 1000 symbols and reports a score in 0..100."""
        history: SymbolHistory = {
            f"module/file_{idx % 20}.py::Fn{idx}": [
                _entry(f"c{idx}", ts=_ago(days=idx % 200))
            ]
            for idx in range(1000)
        }

        snapshot = compute_intel(history, [], now_utc=_now())

        assert snapshot.total_symbols == 1000
        assert 0 <= snapshot.health_score <= 100

    def test_compute_intel_many_co_changing_symbols(self) -> None:
        """50 symbols sharing one commit must not blow up the coupling list."""
        shared_commit = "shared"
        history = {}
        for idx in range(50):
            history[f"file.py::Fn{idx}"] = [_entry(shared_commit)]

        snapshot = compute_intel(history, [], now_utc=_now())

        # _TOP_COUPLING=5 cap must be respected
        assert len(snapshot.coupling_pairs) <= 5

    @pytest.mark.asyncio
    async def test_search_symbol_across_10_repos(
        self, db_session: AsyncSession
    ) -> None:
        """Searching an owner with 10 public repos of 20 symbols each finds matches."""
        from musehub.services.musehub_cross_repo import search_symbol_across_repos

        owner = f"stress-owner-{secrets.token_hex(3)}"
        for repo_no in range(10):
            repo = await create_repo(
                db_session,
                slug=f"stress-repo-{repo_no}",
                owner=owner,
                visibility="public",
            )
            inserted = [_insert_op(f"mod{sym}.py::TargetFn{sym}") for sym in range(20)]
            await _build_index(
                db_session, repo.repo_id, f"head-stress-{repo_no}", inserted
            )

        matches = await search_symbol_across_repos(
            db_session, owner, "TargetFn", visible_to_user=owner, limit=50
        )

        assert len(matches) >= 1

    @pytest.mark.asyncio
    async def test_workspace_blast_risk_across_5_repos(
        self, db_session: AsyncSession
    ) -> None:
        """The workspace blast-risk list is non-empty and honours ``top_n``."""
        from musehub.services.musehub_cross_repo import workspace_blast_risk_top_n

        owner = f"wbr-owner-{secrets.token_hex(3)}"
        for repo_no in range(5):
            repo = await create_repo(
                db_session,
                slug=f"wbr-sr-{repo_no}",
                owner=owner,
                visibility="public",
            )
            inserted = [_insert_op(f"f{sym}.py::Fn{sym}") for sym in range(10)]
            await _build_index(
                db_session, repo.repo_id, f"head-wbr-{repo_no}", inserted
            )

        ranked = await workspace_blast_risk_top_n(
            db_session, owner, top_n=20, visible_to_user=owner
        )

        # 5 repos × 10 symbols each = 50 entries, capped at top_n=20
        assert 1 <= len(ranked) <= 20

    def test_blame_build_500_symbols(self) -> None:
        """_build_real_symbol_blame returns one entry per symbol for a 500-symbol file."""
        from musehub.api.routes.musehub.blame import _build_real_symbol_blame

        symbol_count = 500
        history = {
            f"big/file.py::Fn{idx}": [_entry(f"c{idx}")]
            for idx in range(symbol_count)
        }
        commit_map = {
            f"c{idx}": {"message": "m", "author": "g", "timestamp": _now()}
            for idx in range(symbol_count)
        }

        blame_rows = _build_real_symbol_blame(history, "big/file.py", commit_map)

        assert len(blame_rows) == 500
| 928 | |
| 929 | |
| 930 | # =========================================================================== |
| 931 | # Layer 5 — Data Integrity tests |
| 932 | # =========================================================================== |
| 933 | |
class TestDataIntegrity:
    """Checks that computed and stored intel data keeps its expected shape and values."""

    def test_intel_snapshot_as_dict_from_dict_identity(self) -> None:
        """Round-trip through as_dict/from_dict preserves the headline fields.

        Only the health score, the three alert counters and the number of
        hotspots are compared; other fields are not inspected here.
        """
        snap = compute_intel(
            {
                "file.py::Fn": [_entry(f"c{i}") for i in range(15)],
                "file.py::Old": [_entry("co", ts=_ago(150))],
            },
            ["breaking1"],
            now_utc=_now(),
        )
        d = snap.as_dict()
        reconstructed = IntelSnapshot.from_dict(d)
        assert reconstructed.health_score == snap.health_score
        assert reconstructed.alert_hotspot_count == snap.alert_hotspot_count
        assert reconstructed.alert_dead_count == snap.alert_dead_count
        assert reconstructed.alert_breaking_count == snap.alert_breaking_count
        assert len(reconstructed.hotspots) == len(snap.hotspots)

    @pytest.mark.asyncio
    async def test_intel_full_json_stored_and_retrievable(
        self, db_session: AsyncSession
    ) -> None:
        """The index row stores intel JSON that loads back with one symbol."""
        from musehub.services.musehub_symbol_indexer import load_intel_snapshot

        repo = await create_repo(db_session, slug="di-intel-json")
        ops = [_insert_op("svc.py::do_work", "sha256:beef")]
        row = await _build_index(db_session, repo.repo_id, "head-di", ops)

        assert row.intel_full_json is not None
        snap = await load_intel_snapshot(db_session, repo.repo_id)
        assert snap is not None
        assert snap.total_symbols == 1

        # Only one symbol was indexed, so every reported hotspot (if any)
        # must refer to that symbol's address.
        hotspot_addrs = {h.address for h in snap.hotspots}
        assert hotspot_addrs <= {"svc.py::do_work"}

        # The raw stored JSON must agree with the loaded snapshot.
        stored = json.loads(row.intel_full_json)
        assert stored["total_symbols"] == 1

    def test_velocity_week_buckets_count(self) -> None:
        """Velocity must always have exactly 12 buckets."""
        history = {
            "f.py::Fn": [_entry("c1", ts=_ago(days=1))],
        }
        snap = compute_intel(history, [], now_utc=_now())
        assert len(snap.velocity.weeks) == 12

    def test_hotspot_entries_have_required_fields(self) -> None:
        """Every hotspot carries a string address and a positive integer change count."""
        history = {
            "f.py::Fn": [_entry(f"c{i}") for i in range(12)],
        }
        snap = compute_intel(history, [], now_utc=_now())
        for h in snap.hotspots:
            assert isinstance(h.address, str)
            assert isinstance(h.change_count, int)
            assert h.change_count > 0

    def test_dead_entry_days_cold_matches_expected(self) -> None:
        """A symbol last touched about 120 days ago reports a matching ``days_cold``.

        If compute_intel does not flag the symbol as a dead candidate there is
        nothing to check; the test is then reported as skipped instead of
        silently passing without running any assertion.
        """
        old_ts = _ago(120)
        history = {"f.py::Old": [_entry("c1", ts=old_ts)]}
        snap = compute_intel(history, [], now_utc=_now())
        if not snap.dead_candidates:
            pytest.skip("compute_intel reported no dead candidates for this input")
        entry = snap.dead_candidates[0]
        assert 110 <= entry.days_cold <= 130  # allow ±10 days rounding

    @pytest.mark.asyncio
    async def test_blame_entry_fields_complete(
        self, db_session: AsyncSession
    ) -> None:
        """A blame entry built from stored history exposes name, address and a known op."""
        from musehub.services.musehub_symbol_indexer import load_symbol_history
        from musehub.api.routes.musehub.blame import _build_real_symbol_blame

        repo = await create_repo(db_session, slug="di-blame-fields")
        ops = [_insert_op("f.py::Fn", "sha256:data1")]
        await _build_index(db_session, repo.repo_id, "head-di-blame", ops)

        history = await load_symbol_history(db_session, repo.repo_id, file_path="f.py")
        commit_map = {
            "head-di-blame": {
                "message": "feat: add fn",
                "author": "gabriel",
                "timestamp": _now(),
            }
        }
        results = _build_real_symbol_blame(history, "f.py", commit_map)
        assert len(results) == 1
        entry = results[0]
        assert entry.symbol_name == "Fn"
        assert entry.symbol_address == "f.py::Fn"
        assert entry.op in ("add", "modify", "delete", "insert", "replace", "patch", "rename")
| 1018 | |
| 1019 | |
| 1020 | # =========================================================================== |
| 1021 | # Layer 6 — Security tests |
| 1022 | # =========================================================================== |
| 1023 | |
class TestSecurity:
    """Auth guards, visibility rules and hostile-input handling."""

    @pytest.mark.asyncio
    async def test_blame_private_repo_401_no_token(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """Blame on a private repo without a token is rejected with 401."""
        repo = await create_repo(db_session, slug="sec-blame-priv", visibility="private")
        await db_session.commit()
        resp = await client.get(
            f"/api/repos/{repo.repo_id}/blame/HEAD",
            params={"path": "file.py"},
        )
        assert resp.status_code == 401

    @pytest.mark.asyncio
    async def test_blame_404_for_deleted_repo(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """Blame on a repo that has been deleted answers 404."""
        repo = await create_repo(db_session, slug="sec-blame-deleted", visibility="public")
        # Capture the id first so the request URL does not depend on reading
        # attributes from an ORM instance after it was deleted and committed.
        repo_id = repo.repo_id
        await db_session.delete(repo)
        await db_session.commit()

        resp = await client.get(
            f"/api/repos/{repo_id}/blame/HEAD",
            params={"path": "file.py"},
        )
        assert resp.status_code == 404

    @pytest.mark.asyncio
    async def test_search_private_repo_not_visible_to_other_user(
        self, db_session: AsyncSession
    ) -> None:
        """Symbols of a private repo are not returned when searching as another user."""
        from musehub.services.musehub_cross_repo import search_symbol_across_repos

        owner = f"sec-owner-{secrets.token_hex(3)}"
        repo = await create_repo(
            db_session, slug="sec-priv-search", owner=owner, visibility="private"
        )
        ops = [_insert_op("secret.py::TopSecretFn")]
        await _build_index(db_session, repo.repo_id, "head-sec-priv", ops)

        # Different user can't see private repo
        results = await search_symbol_across_repos(
            db_session, owner, "TopSecretFn", visible_to_user="other-user"
        )
        assert not any("TopSecretFn" in r.address for r in results)

    @pytest.mark.asyncio
    async def test_blame_path_with_traversal_chars_no_crash(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """A ``../`` style path is answered with 200 and an empty entry list."""
        repo = await create_repo(db_session, slug="sec-traversal", visibility="public")
        await db_session.commit()
        # Path with traversal attempt — server should return 200 with empty entries
        resp = await client.get(
            f"/api/repos/{repo.repo_id}/blame/HEAD",
            params={"path": "../../../etc/passwd"},
        )
        assert resp.status_code == 200
        data = resp.json()
        assert data["entries"] == []

    def test_compute_intel_with_injected_commit_ids(self) -> None:
        """Malformed commit IDs in history do not cause exceptions."""
        history = {
            "f.py::Fn": [
                {"commit_id": "'; DROP TABLE commits; --", "op": "add"},
                {"commit_id": "", "op": "modify"},
                {"commit_id": None, "op": "add"},
            ]
        }
        snap = compute_intel(history, [], now_utc=_now())
        assert snap.total_symbols == 1

    def test_blame_build_with_xss_in_commit_message(self) -> None:
        """XSS in commit messages is returned verbatim, not executed."""
        from musehub.api.routes.musehub.blame import _build_real_symbol_blame

        history = {"f.py::Fn": [_entry("c1")]}
        xss_msg = "<script>alert('xss')</script>"
        commit_map = {
            "c1": {
                "message": xss_msg,
                "author": "<img onerror=alert()>",
                "timestamp": _now(),
            }
        }
        results = _build_real_symbol_blame(history, "f.py", commit_map)
        assert results[0].commit_message == xss_msg  # stored as-is (escaping is UI's job)
| 1106 | |
| 1107 | |
| 1108 | # =========================================================================== |
| 1109 | # Layer 7 — Performance tests |
| 1110 | # =========================================================================== |
| 1111 | |
class TestPerformance:
    """Wall-clock budgets for the hot paths, measured with ``time.perf_counter``."""

    def test_compute_intel_500_symbols_under_200ms(self) -> None:
        """compute_intel on 100 symbols × 5 entries (500 entries) finishes in under 200 ms."""
        history = {}
        for i in range(100):
            history[f"pkg/mod_{i}.py::Symbol{i}"] = [
                _entry(f"c{i}_{j}", ts=_ago(j % 300)) for j in range(5)
            ]

        started = time.perf_counter()
        snapshot = compute_intel(history, [], now_utc=_now())
        elapsed_ms = (time.perf_counter() - started) * 1000

        assert elapsed_ms < 200, f"compute_intel took {elapsed_ms:.1f}ms"
        assert snapshot.total_symbols == 100

    def test_intel_as_dict_from_dict_1000_entries_under_50ms(self) -> None:
        """Serialising and re-loading a 1000-symbol snapshot finishes in under 50 ms."""
        history = {}
        for i in range(1000):
            history[f"f.py::Fn{i}"] = [_entry(f"c{i}")]
        snapshot = compute_intel(history, [], now_utc=_now())

        # Only the as_dict/from_dict round trip is inside the timed window.
        started = time.perf_counter()
        IntelSnapshot.from_dict(snapshot.as_dict())
        elapsed_ms = (time.perf_counter() - started) * 1000

        assert elapsed_ms < 50, f"as_dict/from_dict took {elapsed_ms:.1f}ms"

    def test_blame_build_1000_symbols_under_200ms(self) -> None:
        """Building blame for 1000 symbols in one file finishes in under 200 ms."""
        from musehub.api.routes.musehub.blame import _build_real_symbol_blame

        symbol_count = 1000
        history = {
            f"big/file.py::Fn{i}": [_entry(f"c{i}")] for i in range(symbol_count)
        }
        commit_map = {
            f"c{i}": {"message": "m", "author": "g", "timestamp": _now()}
            for i in range(symbol_count)
        }

        started = time.perf_counter()
        blame_rows = _build_real_symbol_blame(history, "big/file.py", commit_map)
        elapsed_ms = (time.perf_counter() - started) * 1000

        assert elapsed_ms < 200, f"_build_real_symbol_blame took {elapsed_ms:.1f}ms"
        assert len(blame_rows) == 1000

    @pytest.mark.asyncio
    async def test_search_across_5_repos_under_1s(
        self, db_session: AsyncSession
    ) -> None:
        """A search over 5 public repos with 30 symbols each returns within one second."""
        from musehub.services.musehub_cross_repo import search_symbol_across_repos

        owner = f"perf-owner-{secrets.token_hex(3)}"
        for repo_no in range(5):
            repo = await create_repo(
                db_session,
                slug=f"perf-repo-{repo_no}",
                owner=owner,
                visibility="public",
            )
            inserted = [_insert_op(f"m{sym}.py::Fn{sym}") for sym in range(30)]
            await _build_index(
                db_session, repo.repo_id, f"head-perf-{repo_no}", inserted
            )

        # Fixture setup above is excluded from the measurement.
        started = time.perf_counter()
        matches = await search_symbol_across_repos(
            db_session, owner, "Fn", visible_to_user=owner
        )
        elapsed_ms = (time.perf_counter() - started) * 1000

        assert elapsed_ms < 1000, f"search_symbol_across_repos took {elapsed_ms:.1f}ms"
        assert len(matches) >= 1