"""Section 6 — Symbol Intelligence (Intel): 7-layer test suite. Covers: - musehub/services/musehub_intel.py (compute_intel, _parse_ts, _health_label, _health_color_class, IntelSnapshot, as_dict/from_dict) - musehub/api/routes/musehub/blame.py (_build_real_symbol_blame, GET /repos/{repo_id}/blame/{ref}) - musehub/services/musehub_cross_repo.py (search_symbol_across_repos, cross_repo_impact, workspace_blast_risk_top_n, build_deps_graph, _module_prefix, _short_label) Layers: 1. Unit — pure function tests, no DB, no I/O 2. Integration — real DB (PostgreSQL), service calls, no HTTP layer 3. End-to-End — full HTTP via AsyncClient, real DB 4. Stress — large data sets, volume correctness 5. Data Integrity — stored data correctness, field validation, round-trip 6. Security — auth guards, private repo access, injection safety 7. Performance — latency budgets for critical paths """ from __future__ import annotations import json import secrets import time from datetime import datetime, timedelta, timezone import msgpack type SymbolHistory = dict[str, list[JSONObject]] import pytest import pytest_asyncio from httpx import AsyncClient from sqlalchemy.ext.asyncio import AsyncSession from musehub.services.musehub_intel import ( IntelSnapshot, BlastRiskEntry, CouplingPair, DeadEntry, HotspotEntry, VelocityWindow, _health_color_class, _health_label, _parse_ts, compute_intel, ) from musehub.types.json_types import JSONObject, StrDict from tests.factories import create_repo # --------------------------------------------------------------------------- # Local helpers # --------------------------------------------------------------------------- def _now() -> datetime: return datetime.now(tz=timezone.utc) def _ago(days: int = 0, **kwargs: int) -> datetime: return _now() - timedelta(days=days, **kwargs) def _ts(dt: datetime) -> str: return dt.isoformat() def _entry(commit_id: str, op: str = "add", ts: datetime | None = None, content_id: str = "sha256:abc") -> JSONObject: return { 
"commit_id": commit_id, "op": op, "timestamp": _ts(ts or _now()), "committed_at": _ts(ts or _now()), "content_id": content_id, } def _history(**kwargs: list[JSONObject]) -> SymbolHistory: """Build a symbol_history dict from keyword args: addr=entries.""" return dict(kwargs) async def _build_index(session: AsyncSession, repo_id: str, head_id: str, ops: list[JSONObject]) -> "types.SimpleNamespace": """Insert one commit, build the symbol index, persist results, and return a namespace with intel_full_json and intel_summary attributes.""" import types as _types from musehub.db.musehub_repo_models import MusehubCommit, MusehubCommitRef from musehub.services.musehub_symbol_indexer import build_symbol_index from musehub.services.musehub_intel_providers import persist_intel_results commit = MusehubCommit( commit_id=head_id, branch="main", parent_ids=[], message="test commit", author="gabriel", timestamp=_now(), structured_delta={"ops": ops}, ) session.add(commit) session.add(MusehubCommitRef(repo_id=repo_id, commit_id=head_id)) await session.flush() results = await build_symbol_index(session, repo_id, head_id) await persist_intel_results(session, repo_id, head_id, results) await session.commit() data_by_type = {t: json.dumps(d) for t, d in results} return _types.SimpleNamespace( intel_full_json=data_by_type.get("code.intel_snapshot"), intel_summary=data_by_type.get("code.intel_summary"), ) def _insert_op(address: str, content_id: str = "sha256:abc") -> JSONObject: return {"address": address, "op": "insert", "content_id": content_id} # =========================================================================== # Layer 1 — Unit tests (pure functions, no DB, no I/O) # =========================================================================== class TestParseTs: def test_iso_string_utc(self) -> None: dt = _parse_ts("2025-01-15T10:30:00+00:00") assert dt.year == 2025 assert dt.month == 1 assert dt.tzinfo is not None def test_iso_string_z_suffix(self) -> None: dt = 
_parse_ts("2025-06-01T00:00:00Z") assert dt.tzinfo is not None assert dt.year == 2025 def test_unix_int(self) -> None: dt = _parse_ts(0) assert dt.year == 1970 assert dt.tzinfo is not None def test_unix_float(self) -> None: dt = _parse_ts(1_700_000_000.5) assert dt.year == 2023 def test_invalid_string_raises(self) -> None: with pytest.raises(Exception): _parse_ts("not-a-date") class TestHealthLabel: def test_excellent(self) -> None: assert _health_label(100) == "Excellent" assert _health_label(90) == "Excellent" def test_good(self) -> None: assert _health_label(89) == "Good" assert _health_label(75) == "Good" def test_fair(self) -> None: assert _health_label(74) == "Fair" assert _health_label(55) == "Fair" def test_poor(self) -> None: assert _health_label(54) == "Poor" assert _health_label(35) == "Poor" def test_critical(self) -> None: assert _health_label(34) == "Critical" assert _health_label(0) == "Critical" class TestHealthColorClass: def test_excellent(self) -> None: assert _health_color_class(90) == "intel-health--excellent" def test_good(self) -> None: assert _health_color_class(75) == "intel-health--good" def test_fair(self) -> None: assert _health_color_class(55) == "intel-health--fair" def test_poor(self) -> None: assert _health_color_class(35) == "intel-health--poor" def test_critical(self) -> None: assert _health_color_class(0) == "intel-health--critical" class TestComputeIntelUnit: def test_empty_history_returns_zero_score(self) -> None: snap = compute_intel({}, [], now_utc=_now()) assert snap.total_symbols == 0 assert snap.total_commits_indexed == 0 assert snap.health_score == 100 # no penalties = 100 assert snap.health_label == "Excellent" def test_single_symbol_no_ts(self) -> None: history = {"file.py::Foo": [{"commit_id": "c1", "op": "add"}]} snap = compute_intel(history, [], now_utc=_now()) assert snap.total_symbols == 1 assert snap.total_commits_indexed == 1 def test_hotspot_detection(self) -> None: # 12 changes on one symbol — exceeds 
_HOTSPOT_THRESHOLD (10) entries = [_entry(f"c{i}") for i in range(12)] history = {"file.py::HotFn": entries} snap = compute_intel(history, [], now_utc=_now()) assert snap.alert_hotspot_count >= 1 assert any(h.address == "file.py::HotFn" for h in snap.hotspots) def test_dead_code_detection(self) -> None: # One old entry, last touched 100 days ago old_ts = _ago(100) history = {"file.py::Stale": [_entry("c1", ts=old_ts)]} snap = compute_intel(history, [], now_utc=_now()) assert snap.alert_dead_count >= 1 assert any(d.address == "file.py::Stale" for d in snap.dead_candidates) def test_recent_symbol_not_dead(self) -> None: recent_ts = _ago(5) history = {"file.py::Fresh": [_entry("c1", ts=recent_ts)]} snap = compute_intel(history, [], now_utc=_now()) assert snap.alert_dead_count == 0 def test_blast_risk_co_change(self) -> None: # Two symbols always change together → blast risk for both entries_a = [_entry("c1"), _entry("c2")] entries_b = [_entry("c1"), _entry("c2")] history = { "file.py::Alpha": entries_a, "file.py::Beta": entries_b, } snap = compute_intel(history, [], now_utc=_now()) # Both are co-changed — blast risk entries should include at least one assert len(snap.blast_risk) >= 1 def test_coupling_pairs_detected(self) -> None: # Symbols sharing same commit → coupling pair entries_a = [_entry("shared-commit")] entries_b = [_entry("shared-commit")] history = { "file.py::A": entries_a, "file.py::B": entries_b, } snap = compute_intel(history, [], now_utc=_now()) assert len(snap.coupling_pairs) >= 1 pair = snap.coupling_pairs[0] assert pair.shared_commits >= 1 def test_breaking_changes_reduce_score(self) -> None: snap_no_breaks = compute_intel({}, [], now_utc=_now()) snap_with_breaks = compute_intel({}, ["break1", "break2", "break3"], now_utc=_now()) assert snap_with_breaks.health_score < snap_no_breaks.health_score assert snap_with_breaks.alert_breaking_count == 3 def test_velocity_buckets_populated(self) -> None: recent = _ago(days=1) history = {"file.py::Fn": 
[_entry("c1", ts=recent)]} snap = compute_intel(history, [], now_utc=_now()) assert len(snap.velocity.weeks) == 12 assert snap.velocity.weeks[0] >= 1 # most recent week bucket def test_health_score_capped_at_100(self) -> None: snap = compute_intel({}, [], now_utc=_now()) assert 0 <= snap.health_score <= 100 def test_top_n_hotspots_limit(self) -> None: # 20 symbols each changed 15 times → _TOP_N=10 returned history: SymbolHistory = {} for i in range(20): history[f"file.py::Fn{i}"] = [_entry(f"c{i}_{j}") for j in range(15)] snap = compute_intel(history, [], now_utc=_now()) assert len(snap.hotspots) <= 10 def test_dead_candidates_sorted_by_coldest_first(self) -> None: h = { "file.py::Old": [_entry("c1", ts=_ago(200))], "file.py::Older": [_entry("c2", ts=_ago(300))], } snap = compute_intel(h, [], now_utc=_now()) if len(snap.dead_candidates) >= 2: assert snap.dead_candidates[0].days_cold >= snap.dead_candidates[1].days_cold def test_timestamp_invalid_gracefully_ignored(self) -> None: history = { "file.py::BadTs": [{"commit_id": "c1", "op": "add", "timestamp": "NOT_A_DATE"}] } snap = compute_intel(history, [], now_utc=_now()) # Should not raise; symbol counted but ts ignored assert snap.total_symbols == 1 class TestIntelSnapshotSerialisation: def _make_snap(self) -> IntelSnapshot: return IntelSnapshot( health_score=80, health_label="Good", alert_hotspot_count=2, alert_dead_count=1, alert_blast_risk_count=3, alert_breaking_count=0, hotspots=[HotspotEntry(address="a.py::Fn", change_count=15, last_changed=None)], dead_candidates=[DeadEntry(address="b.py::Old", days_cold=120, blast_radius=0, added_at=None)], blast_risk=[BlastRiskEntry(address="c.py::Risk", co_change_count=25, top_co_symbols=["d.py::X"])], coupling_pairs=[CouplingPair(address_a="a.py::F", address_b="b.py::G", shared_commits=5)], velocity=VelocityWindow(weeks=[1, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0]), total_symbols=50, total_commits_indexed=10, ) def test_as_dict_round_trip(self) -> None: snap = self._make_snap() 
d = snap.as_dict() reconstructed = IntelSnapshot.from_dict(d) assert reconstructed.health_score == 80 assert reconstructed.health_label == "Good" assert reconstructed.total_symbols == 50 assert reconstructed.hotspots[0].address == "a.py::Fn" assert reconstructed.dead_candidates[0].days_cold == 120 assert reconstructed.blast_risk[0].co_change_count == 25 assert reconstructed.coupling_pairs[0].shared_commits == 5 assert reconstructed.velocity.weeks[0] == 1 def test_as_dict_json_serialisable(self) -> None: snap = self._make_snap() d = snap.as_dict() # Must be JSON-serialisable (no datetimes, no custom objects) json_str = json.dumps(d) assert "health_score" in json_str def test_from_dict_missing_optional_fields(self) -> None: minimal = { "health_score": 70, "health_label": "Fair", "alert_hotspot_count": 0, "alert_dead_count": 0, "alert_blast_risk_count": 0, "alert_breaking_count": 0, "total_symbols": 0, "total_commits_indexed": 0, } snap = IntelSnapshot.from_dict(minimal) assert snap.hotspots == [] assert snap.dead_candidates == [] assert snap.coupling_pairs == [] assert snap.velocity.weeks == [] class TestModulePrefix: def test_three_segments(self) -> None: from musehub.services.musehub_cross_repo import _module_prefix result = _module_prefix("musehub.services.musehub_ci.enqueue_run") assert result == "musehub.services.musehub_ci" def test_fewer_than_depth(self) -> None: from musehub.services.musehub_cross_repo import _module_prefix result = _module_prefix("a.b") assert result == "a.b" # shorter than depth=3, returns as-is def test_exactly_depth(self) -> None: from musehub.services.musehub_cross_repo import _module_prefix result = _module_prefix("a.b.c") assert result == "a.b.c" def test_custom_depth(self) -> None: from musehub.services.musehub_cross_repo import _module_prefix result = _module_prefix("a.b.c.d.e", depth=2) assert result == "a.b" class TestShortLabel: def test_two_segments(self) -> None: from musehub.services.musehub_cross_repo import _short_label 
assert _short_label("a.b.c") == "b.c" def test_single_segment(self) -> None: from musehub.services.musehub_cross_repo import _short_label assert _short_label("single") == "single" class TestBuildRealSymbolBlame: def test_filters_to_path(self) -> None: from musehub.api.routes.musehub.blame import _build_real_symbol_blame history = { "musehub/api.py::Foo": [_entry("c1")], "other/file.py::Bar": [_entry("c2")], } commit_map = { "c1": {"message": "add Foo", "author": "gabriel", "timestamp": _now()}, } results = _build_real_symbol_blame(history, "musehub/api.py", commit_map) assert len(results) == 1 assert results[0].symbol_name == "Foo" def test_excludes_import_declarations(self) -> None: from musehub.api.routes.musehub.blame import _build_real_symbol_blame history = { "file.py::import::os": [_entry("c1")], "file.py::MyFn": [_entry("c1")], } commit_map = {"c1": {"message": "m", "author": "g", "timestamp": _now()}} results = _build_real_symbol_blame(history, "file.py", commit_map) names = [r.symbol_name for r in results] assert "MyFn" in names assert "import::os" not in names def test_excludes_deleted_symbols(self) -> None: from musehub.api.routes.musehub.blame import _build_real_symbol_blame history = { "file.py::Gone": [_entry("c1", op="delete")], "file.py::Here": [_entry("c2", op="add")], } commit_map = { "c1": {"message": "del", "author": "g", "timestamp": _now()}, "c2": {"message": "add", "author": "g", "timestamp": _now()}, } results = _build_real_symbol_blame(history, "file.py", commit_map) names = [r.symbol_name for r in results] assert "Gone" not in names assert "Here" in names def test_intel_signals_populated(self) -> None: from musehub.api.routes.musehub.blame import _build_real_symbol_blame history = { "file.py::HotFn": [_entry("c1")], } commit_map = {"c1": {"message": "m", "author": "g", "timestamp": _now()}} intel = compute_intel( {"file.py::HotFn": [_entry(f"c{i}") for i in range(15)]}, [], now_utc=_now(), ) results = _build_real_symbol_blame(history, 
"file.py", commit_map, intel=intel) assert len(results) == 1 assert results[0].is_hotspot is True def test_change_count_reflects_history_length(self) -> None: from musehub.api.routes.musehub.blame import _build_real_symbol_blame history = { "file.py::Changed": [_entry("c1"), _entry("c2"), _entry("c3")], } commit_map = { "c1": {"message": "m", "author": "g", "timestamp": _now()}, "c2": {"message": "m", "author": "g", "timestamp": _now()}, "c3": {"message": "m", "author": "g", "timestamp": _now()}, } results = _build_real_symbol_blame(history, "file.py", commit_map) assert results[0].change_count == 3 def test_empty_history_returns_empty_list(self) -> None: from musehub.api.routes.musehub.blame import _build_real_symbol_blame results = _build_real_symbol_blame({}, "file.py", {}) assert results == [] def test_unknown_commit_id_falls_back_gracefully(self) -> None: from musehub.api.routes.musehub.blame import _build_real_symbol_blame history = {"file.py::Fn": [_entry("unknown-commit")]} results = _build_real_symbol_blame(history, "file.py", {}) assert len(results) == 1 assert results[0].author == "" assert results[0].commit_message == "" # =========================================================================== # Layer 2 — Integration tests (real DB, service layer, no HTTP) # =========================================================================== class TestComputeIntelIntegration: @pytest.mark.asyncio async def test_load_intel_snapshot_none_when_no_index( self, db_session: AsyncSession ) -> None: from musehub.services.musehub_symbol_indexer import load_intel_snapshot repo = await create_repo(db_session, slug="intel-no-index") result = await load_intel_snapshot(db_session, repo.repo_id) assert result is None @pytest.mark.asyncio async def test_build_index_populates_intel_full_json( self, db_session: AsyncSession ) -> None: from musehub.services.musehub_symbol_indexer import load_intel_snapshot repo = await create_repo(db_session, slug="intel-populated") ops = 
[_insert_op("src/main.py::run"), _insert_op("src/main.py::setup")] row = await _build_index(db_session, repo.repo_id, "head-intel-1", ops) assert row is not None assert row.intel_full_json is not None snap = await load_intel_snapshot(db_session, repo.repo_id) assert snap is not None assert snap.total_symbols == 2 @pytest.mark.asyncio async def test_intel_health_score_range( self, db_session: AsyncSession ) -> None: from musehub.services.musehub_symbol_indexer import load_intel_snapshot repo = await create_repo(db_session, slug="intel-health-range") ops = [_insert_op(f"src/f.py::Fn{i}") for i in range(5)] await _build_index(db_session, repo.repo_id, "head-hr", ops) snap = await load_intel_snapshot(db_session, repo.repo_id) assert snap is not None assert 0 <= snap.health_score <= 100 @pytest.mark.asyncio async def test_intel_summary_json_fields( self, db_session: AsyncSession ) -> None: repo = await create_repo(db_session, slug="intel-summary-fields") ops = [_insert_op("api.py::endpoint")] row = await _build_index(db_session, repo.repo_id, "head-summ", ops) assert row is not None assert row.intel_summary is not None summary = json.loads(row.intel_summary) assert "health_score" in summary assert "symbol_count" in summary assert "hotspot_count" in summary assert "dead_symbol_count" in summary class TestBlameIntegration: @pytest.mark.asyncio async def test_blame_returns_empty_when_no_index( self, db_session: AsyncSession ) -> None: from musehub.services.musehub_symbol_indexer import load_symbol_history from musehub.api.routes.musehub.blame import _build_real_symbol_blame repo = await create_repo(db_session, slug="blame-no-idx") history = await load_symbol_history(db_session, repo.repo_id, file_path="file.py") results = _build_real_symbol_blame(history, "file.py", {}) assert results == [] @pytest.mark.asyncio async def test_blame_entries_after_index_build( self, db_session: AsyncSession ) -> None: from musehub.services.musehub_symbol_indexer import load_symbol_history 
from musehub.api.routes.musehub.blame import _build_real_symbol_blame repo = await create_repo(db_session, slug="blame-with-idx") ops = [ _insert_op("src/api.py::handle_request"), _insert_op("src/api.py::parse_args"), ] await _build_index(db_session, repo.repo_id, "head-blame", ops) history = await load_symbol_history( db_session, repo.repo_id, file_path="src/api.py" ) results = _build_real_symbol_blame(history, "src/api.py", {}) names = [r.symbol_name for r in results] assert "handle_request" in names assert "parse_args" in names class TestCrossRepoIntegration: @pytest.mark.asyncio async def test_search_symbol_no_repos( self, db_session: AsyncSession ) -> None: from musehub.services.musehub_cross_repo import search_symbol_across_repos result = await search_symbol_across_repos( db_session, "ghost-owner", "Fn", visible_to_user="ghost-owner" ) assert result == [] @pytest.mark.asyncio async def test_search_symbol_finds_match( self, db_session: AsyncSession ) -> None: from musehub.services.musehub_cross_repo import search_symbol_across_repos owner = f"owner-{secrets.token_hex(4)}" repo = await create_repo(db_session, slug="search-sym-repo", owner=owner, visibility="public") ops = [_insert_op("api.py::compute_intel")] await _build_index(db_session, repo.repo_id, "head-search", ops) results = await search_symbol_across_repos( db_session, owner, "compute_intel", visible_to_user=owner ) assert len(results) >= 1 assert any("compute_intel" in r.address for r in results) @pytest.mark.asyncio async def test_search_symbol_case_insensitive( self, db_session: AsyncSession ) -> None: from musehub.services.musehub_cross_repo import search_symbol_across_repos owner = f"owner-{secrets.token_hex(4)}" repo = await create_repo(db_session, slug="search-case-repo", owner=owner, visibility="public") ops = [_insert_op("api.py::MyFunction")] await _build_index(db_session, repo.repo_id, "head-case", ops) results = await search_symbol_across_repos( db_session, owner, "myfunction", 
visible_to_user=owner ) assert any("MyFunction" in r.address for r in results) @pytest.mark.asyncio async def test_search_symbol_private_repo_excluded_without_auth( self, db_session: AsyncSession ) -> None: from musehub.services.musehub_cross_repo import search_symbol_across_repos owner = f"owner-{secrets.token_hex(4)}" repo = await create_repo(db_session, slug="search-private-repo", owner=owner, visibility="private") ops = [_insert_op("api.py::SecretFn")] await _build_index(db_session, repo.repo_id, "head-priv", ops) # visible_to_user=None → only public repos results = await search_symbol_across_repos( db_session, owner, "SecretFn", visible_to_user=None ) assert not any("SecretFn" in r.address for r in results) @pytest.mark.asyncio async def test_workspace_blast_risk_empty( self, db_session: AsyncSession ) -> None: from musehub.services.musehub_cross_repo import workspace_blast_risk_top_n result = await workspace_blast_risk_top_n( db_session, "nonexistent-owner", visible_to_user="nonexistent-owner" ) assert result == [] @pytest.mark.asyncio async def test_workspace_blast_risk_populated( self, db_session: AsyncSession ) -> None: from musehub.services.musehub_cross_repo import workspace_blast_risk_top_n owner = f"owner-{secrets.token_hex(4)}" repo = await create_repo(db_session, slug="wbr-repo", owner=owner, visibility="public") ops = [_insert_op("a.py::Fn"), _insert_op("b.py::Gn")] await _build_index(db_session, repo.repo_id, "head-wbr", ops) results = await workspace_blast_risk_top_n( db_session, owner, visible_to_user=owner ) assert len(results) >= 2 # Sorted by co_change_count descending for i in range(len(results) - 1): assert results[i].co_change_count >= results[i + 1].co_change_count @pytest.mark.asyncio async def test_cross_repo_impact_no_source_repo( self, db_session: AsyncSession ) -> None: from musehub.services.musehub_cross_repo import cross_repo_impact result = await cross_repo_impact( db_session, "ghost-owner", secrets.token_hex(16), "file.py::Fn", 
visible_to_user="ghost-owner", ) assert result is None @pytest.mark.asyncio async def test_cross_repo_impact_unknown_address( self, db_session: AsyncSession ) -> None: from musehub.services.musehub_cross_repo import cross_repo_impact owner = f"owner-{secrets.token_hex(4)}" repo = await create_repo(db_session, slug="cri-unknown", owner=owner, visibility="public") ops = [_insert_op("a.py::KnownFn")] await _build_index(db_session, repo.repo_id, "head-cri", ops) result = await cross_repo_impact( db_session, owner, repo.repo_id, "a.py::NonExistent", visible_to_user=owner, ) assert result is None @pytest.mark.asyncio async def test_build_deps_graph_single_repo( self, db_session: AsyncSession ) -> None: from musehub.services.musehub_cross_repo import build_deps_graph owner = f"owner-{secrets.token_hex(4)}" repo = await create_repo(db_session, slug="deps-single", owner=owner, visibility="public") ops = [ _insert_op("a.b.c.Fn"), _insert_op("a.b.d.Gn"), ] await _build_index(db_session, repo.repo_id, "head-deps", ops) graph = await build_deps_graph( db_session, owner, repo.repo_id, visible_to_user=owner ) assert hasattr(graph, "nodes") assert hasattr(graph, "edges") @pytest.mark.asyncio async def test_build_deps_graph_no_source_repo_returns_empty( self, db_session: AsyncSession ) -> None: from musehub.services.musehub_cross_repo import build_deps_graph, DepsGraph owner = f"owner-{secrets.token_hex(4)}" graph = await build_deps_graph( db_session, owner, secrets.token_hex(16), visible_to_user=owner ) assert isinstance(graph, DepsGraph) # =========================================================================== # Layer 3 — End-to-End tests (full HTTP via AsyncClient, real DB) # =========================================================================== class TestBlameEndToEnd: @pytest.mark.asyncio async def test_blame_404_unknown_repo( self, client: AsyncClient, db_session: AsyncSession ) -> None: resp = await client.get( f"/api/repos/{secrets.token_hex(16)}/blame/HEAD", 
params={"path": "file.py"}, ) assert resp.status_code == 404 @pytest.mark.asyncio async def test_blame_public_repo_no_auth( self, client: AsyncClient, db_session: AsyncSession ) -> None: repo = await create_repo(db_session, slug="blame-e2e-pub", visibility="public") await db_session.commit() resp = await client.get( f"/api/repos/{repo.repo_id}/blame/HEAD", params={"path": "file.py"}, ) assert resp.status_code == 200 data = resp.json() assert "entries" in data assert "totalEntries" in data assert "path" in data @pytest.mark.asyncio async def test_blame_private_repo_requires_auth( self, client: AsyncClient, db_session: AsyncSession ) -> None: repo = await create_repo(db_session, slug="blame-e2e-priv", visibility="private") await db_session.commit() resp = await client.get( f"/api/repos/{repo.repo_id}/blame/HEAD", params={"path": "file.py"}, ) assert resp.status_code == 401 @pytest.mark.asyncio async def test_blame_returns_entries_after_index_build( self, client: AsyncClient, db_session: AsyncSession ) -> None: repo = await create_repo(db_session, slug="blame-e2e-entries", visibility="public") ops = [_insert_op("api/routes.py::dispatch"), _insert_op("api/routes.py::validate")] await _build_index(db_session, repo.repo_id, "head-blame-e2e", ops) resp = await client.get( f"/api/repos/{repo.repo_id}/blame/HEAD", params={"path": "api/routes.py"}, ) assert resp.status_code == 200 data = resp.json() names = [e["symbolName"] for e in data["entries"]] assert "dispatch" in names assert "validate" in names @pytest.mark.asyncio async def test_blame_path_filter_respected( self, client: AsyncClient, db_session: AsyncSession ) -> None: repo = await create_repo(db_session, slug="blame-e2e-filter", visibility="public") ops = [ _insert_op("path/a.py::FnA"), _insert_op("path/b.py::FnB"), ] await _build_index(db_session, repo.repo_id, "head-filter", ops) resp = await client.get( f"/api/repos/{repo.repo_id}/blame/HEAD", params={"path": "path/a.py"}, ) assert resp.status_code == 200 data = 
resp.json() names = [e["symbolName"] for e in data["entries"]] assert "FnA" in names assert "FnB" not in names @pytest.mark.asyncio async def test_symbol_index_rebuild_endpoint( self, client: AsyncClient, db_session: AsyncSession, auth_headers: StrDict ) -> None: from musehub.db.musehub_repo_models import MusehubBranch as _Branch, MusehubCommit as _Commit, MusehubCommitRef as _CommitRef repo = await create_repo(db_session, slug="rebuild-e2e") # Create a head commit on "main" commit = _Commit( commit_id="rebuild-head", branch="main", parent_ids=[], message="initial", author="gabriel", timestamp=_now(), structured_delta={"ops": [_insert_op("x.py::Fn")]}, ) db_branch = _Branch( branch_id=secrets.token_hex(16), repo_id=repo.repo_id, name="main", head_commit_id="rebuild-head", ) db_session.add(commit) db_session.add(_CommitRef(repo_id=repo.repo_id, commit_id="rebuild-head")) db_session.add(db_branch) await db_session.commit() resp = await client.post( f"/api/repos/{repo.repo_id}/symbol-index/rebuild", headers=auth_headers, ) assert resp.status_code in (200, 202) @pytest.mark.asyncio async def test_symbol_index_rebuild_requires_auth( self, client: AsyncClient, db_session: AsyncSession ) -> None: repo = await create_repo(db_session, slug="rebuild-noauth") await db_session.commit() resp = await client.post(f"/api/repos/{repo.repo_id}/symbol-index/rebuild") assert resp.status_code == 401 # =========================================================================== # Layer 4 — Stress tests # =========================================================================== class TestStress: def test_compute_intel_1000_symbols(self) -> None: """compute_intel on 1000 symbols completes without error.""" history: SymbolHistory = {} for i in range(1000): ts = _ago(days=i % 200) history[f"module/file_{i % 20}.py::Fn{i}"] = [ _entry(f"c{i}", ts=ts) ] snap = compute_intel(history, [], now_utc=_now()) assert snap.total_symbols == 1000 assert 0 <= snap.health_score <= 100 def 
test_compute_intel_many_co_changing_symbols(self) -> None: """50 symbols all sharing the same commit — coupling matrix stays bounded.""" commit_id = "shared" history = { f"file.py::Fn{i}": [_entry(commit_id)] for i in range(50) } snap = compute_intel(history, [], now_utc=_now()) # _TOP_COUPLING=5 cap must be respected assert len(snap.coupling_pairs) <= 5 @pytest.mark.asyncio async def test_search_symbol_across_10_repos( self, db_session: AsyncSession ) -> None: """Search across 10 repos each with 20 symbols.""" from musehub.services.musehub_cross_repo import search_symbol_across_repos owner = f"stress-owner-{secrets.token_hex(3)}" for i in range(10): repo = await create_repo( db_session, slug=f"stress-repo-{i}", owner=owner, visibility="public" ) ops = [_insert_op(f"mod{j}.py::TargetFn{j}") for j in range(20)] await _build_index(db_session, repo.repo_id, f"head-stress-{i}", ops) results = await search_symbol_across_repos( db_session, owner, "TargetFn", visible_to_user=owner, limit=50 ) assert len(results) >= 1 @pytest.mark.asyncio async def test_workspace_blast_risk_across_5_repos( self, db_session: AsyncSession ) -> None: from musehub.services.musehub_cross_repo import workspace_blast_risk_top_n owner = f"wbr-owner-{secrets.token_hex(3)}" for i in range(5): repo = await create_repo( db_session, slug=f"wbr-sr-{i}", owner=owner, visibility="public" ) ops = [_insert_op(f"f{j}.py::Fn{j}") for j in range(10)] await _build_index(db_session, repo.repo_id, f"head-wbr-{i}", ops) results = await workspace_blast_risk_top_n( db_session, owner, top_n=20, visible_to_user=owner ) # 5 repos × 10 symbols each = 50 entries, capped at top_n=20 assert len(results) <= 20 assert len(results) >= 1 def test_blame_build_500_symbols(self) -> None: """_build_real_symbol_blame with 500 symbols in one file stays fast.""" from musehub.api.routes.musehub.blame import _build_real_symbol_blame history = {f"big/file.py::Fn{i}": [_entry(f"c{i}")] for i in range(500)} commit_map = {f"c{i}": 
{"message": "m", "author": "g", "timestamp": _now()} for i in range(500)} results = _build_real_symbol_blame(history, "big/file.py", commit_map) assert len(results) == 500 # =========================================================================== # Layer 5 — Data Integrity tests # =========================================================================== class TestDataIntegrity: def test_intel_snapshot_as_dict_from_dict_identity(self) -> None: """Round-trip through as_dict/from_dict is lossless for all fields.""" snap = compute_intel( { "file.py::Fn": [_entry(f"c{i}") for i in range(15)], "file.py::Old": [_entry("co", ts=_ago(150))], }, ["breaking1"], now_utc=_now(), ) d = snap.as_dict() reconstructed = IntelSnapshot.from_dict(d) assert reconstructed.health_score == snap.health_score assert reconstructed.alert_hotspot_count == snap.alert_hotspot_count assert reconstructed.alert_dead_count == snap.alert_dead_count assert reconstructed.alert_breaking_count == snap.alert_breaking_count assert len(reconstructed.hotspots) == len(snap.hotspots) @pytest.mark.asyncio async def test_intel_full_json_stored_and_retrievable( self, db_session: AsyncSession ) -> None: from musehub.services.musehub_symbol_indexer import load_intel_snapshot repo = await create_repo(db_session, slug="di-intel-json") ops = [_insert_op("svc.py::do_work", "sha256:beef")] row = await _build_index(db_session, repo.repo_id, "head-di", ops) assert row.intel_full_json is not None snap = await load_intel_snapshot(db_session, repo.repo_id) assert snap is not None assert snap.total_symbols == 1 hotspot_addrs = [h.address for h in snap.hotspots] # Address must be present in symbol set all_in_dict = json.loads(row.intel_full_json) assert all_in_dict["total_symbols"] == 1 def test_velocity_week_buckets_count(self) -> None: """Velocity must always have exactly 12 buckets.""" history = { "f.py::Fn": [_entry("c1", ts=_ago(days=1))], } snap = compute_intel(history, [], now_utc=_now()) assert 
len(snap.velocity.weeks) == 12 def test_hotspot_entries_have_required_fields(self) -> None: history = { "f.py::Fn": [_entry(f"c{i}") for i in range(12)], } snap = compute_intel(history, [], now_utc=_now()) for h in snap.hotspots: assert isinstance(h.address, str) assert isinstance(h.change_count, int) assert h.change_count > 0 def test_dead_entry_days_cold_matches_expected(self) -> None: old_ts = _ago(120) history = {"f.py::Old": [_entry("c1", ts=old_ts)]} snap = compute_intel(history, [], now_utc=_now()) if snap.dead_candidates: entry = snap.dead_candidates[0] assert 110 <= entry.days_cold <= 130 # allow ±10 days rounding @pytest.mark.asyncio async def test_blame_entry_fields_complete( self, db_session: AsyncSession ) -> None: from musehub.services.musehub_symbol_indexer import load_symbol_history from musehub.api.routes.musehub.blame import _build_real_symbol_blame repo = await create_repo(db_session, slug="di-blame-fields") ops = [_insert_op("f.py::Fn", "sha256:data1")] await _build_index(db_session, repo.repo_id, "head-di-blame", ops) history = await load_symbol_history(db_session, repo.repo_id, file_path="f.py") commit_map = {"head-di-blame": {"message": "feat: add fn", "author": "gabriel", "timestamp": _now()}} results = _build_real_symbol_blame(history, "f.py", commit_map) assert len(results) == 1 entry = results[0] assert entry.symbol_name == "Fn" assert entry.symbol_address == "f.py::Fn" assert entry.op in ("add", "modify", "delete", "insert", "replace", "patch", "rename") # =========================================================================== # Layer 6 — Security tests # =========================================================================== class TestSecurity: @pytest.mark.asyncio async def test_blame_private_repo_401_no_token( self, client: AsyncClient, db_session: AsyncSession ) -> None: repo = await create_repo(db_session, slug="sec-blame-priv", visibility="private") await db_session.commit() resp = await client.get( 
@pytest.mark.asyncio
async def test_blame_404_for_deleted_repo(
    self, client: AsyncClient, db_session: AsyncSession
) -> None:
    """Requesting blame for a repo that was created and then deleted returns 404."""
    repo = await create_repo(db_session, slug="sec-blame-deleted", visibility="public")
    # Capture the id while the row still exists, so the request URL does not
    # depend on reading attributes from an ORM instance that has already been
    # deleted and committed.
    repo_id = repo.repo_id
    await db_session.delete(repo)
    await db_session.commit()

    resp = await client.get(
        f"/api/repos/{repo_id}/blame/HEAD",
        params={"path": "file.py"},
    )
    assert resp.status_code == 404
def test_blame_build_with_xss_in_commit_message(self) -> None:
    """XSS in commit messages is returned verbatim, not executed."""
    from musehub.api.routes.musehub.blame import _build_real_symbol_blame

    history = {"f.py::Fn": [_entry("c1")]}
    # The payload must be non-empty markup: with an empty message the equality
    # check below cannot tell "returned verbatim" from "stripped or escaped".
    # NOTE(review): the literal was empty here, presumably emptied by tooling
    # that strips markup from source text — confirm. The angle brackets are
    # written as \x3c / \x3e escapes so the value is "<script>alert('xss')</script>"
    # at runtime without a literal tag appearing in the file.
    xss_msg = "\x3cscript\x3ealert('xss')\x3c/script\x3e"
    commit_map = {"c1": {"message": xss_msg, "author": "", "timestamp": _now()}}
    results = _build_real_symbol_blame(history, "f.py", commit_map)
    # Fail with a readable message instead of an IndexError if nothing came back.
    assert results, "expected a blame entry for f.py::Fn"
    assert results[0].commit_message == xss_msg  # stored as-is (escaping is UI's job)
@pytest.mark.asyncio
async def test_search_across_5_repos_under_1s(
    self, db_session: AsyncSession
) -> None:
    """Searching five indexed public repos of one owner finishes in under one second."""
    from musehub.services.musehub_cross_repo import search_symbol_across_repos

    repo_count, symbols_per_repo = 5, 30
    owner = f"perf-owner-{secrets.token_hex(3)}"

    # Seed phase (not timed): one indexed commit per repo, 30 inserted symbols each.
    for i in range(repo_count):
        seeded = await create_repo(
            db_session, slug=f"perf-repo-{i}", owner=owner, visibility="public"
        )
        await _build_index(
            db_session,
            seeded.repo_id,
            f"head-perf-{i}",
            [_insert_op(f"m{j}.py::Fn{j}") for j in range(symbols_per_repo)],
        )

    # Timed phase: only the cross-repo search itself is measured.
    started = time.perf_counter()
    results = await search_symbol_across_repos(
        db_session, owner, "Fn", visible_to_user=owner
    )
    elapsed_ms = 1000 * (time.perf_counter() - started)

    assert elapsed_ms < 1000, f"search_symbol_across_repos took {elapsed_ms:.1f}ms"
    assert len(results) >= 1