"""Section 24 — Workspace & Cross-Repo Intelligence: 7-layer test suite. Covers musehub/services/musehub_cross_repo.py and the /{owner}/search UI endpoint in musehub/api/routes/musehub/ui_symbols.py. Layer map --------- 1. Unit — pure functions, dataclasses 2. Integration — service functions against real PostgreSQL DB + symbol index 3. E2E — HTTP client against the full app 4. Stress — many repos, many symbols, concurrent requests 5. Data Integrity — sort order, exclusion rules, limit enforcement 6. Security — private repo visibility gating 7. Performance — timing budgets """ from __future__ import annotations import asyncio import secrets import time import pytest from httpx import AsyncClient from sqlalchemy.ext.asyncio import AsyncSession from musehub.types.json_types import JSONObject, StrDict, SymbolHistoryEntry from datetime import datetime, timezone from musehub.core.genesis import compute_identity_id, compute_repo_id from musehub.db.musehub_intel_models import MusehubSymbolHistoryEntry from musehub.db.musehub_repo_models import MusehubRepo type SymbolHistoryMap = dict[str, list[SymbolHistoryEntry]] from musehub.services.musehub_cross_repo import ( CrossRepoImpact, CrossRepoMatch, DepsEdge, DepsGraph, DepsNode, ExternalImpact, WorkspaceForecast, WorkspaceRiskEntry, _load_owner_repos, _module_prefix, _short_label, build_deps_graph, cross_repo_impact, search_symbol_across_repos, workspace_blast_risk_top_n, ) # --------------------------------------------------------------------------- # DB helpers # --------------------------------------------------------------------------- def _uid() -> str: return secrets.token_hex(16) async def _db_repo( session: AsyncSession, owner: str = "alice", *, name: str | None = None, visibility: str = "public", deleted: bool = False, ) -> MusehubRepo: slug = name or f"repo-{_uid()[:8]}" owner_id = compute_identity_id(owner.encode()) created_at = datetime.now(tz=timezone.utc) repo = MusehubRepo( repo_id=compute_repo_id(owner_id, 
slug, "code", created_at.isoformat()), name=slug, slug=slug, owner=owner, owner_user_id=owner_id, visibility=visibility, created_at=created_at, updated_at=created_at, ) session.add(repo) await session.flush() if deleted: await session.delete(repo) await session.flush() return repo def _entry(commit_id: str, *, op: str = "add", committed_at: str = "2026-01-01T00:00:00") -> JSONObject: return {"commit_id": commit_id, "op": op, "committed_at": committed_at} async def _db_symbol_index( session: AsyncSession, repo_id: str, symbol_history: SymbolHistoryMap, ) -> None: """Insert MusehubSymbolHistoryEntry rows (normalized schema).""" from datetime import timezone from sqlalchemy.dialects.postgresql import insert as pg_insert for address, entries in symbol_history.items(): for entry in entries: committed_at_raw = entry.get("committed_at", "2026-01-01T00:00:00") if isinstance(committed_at_raw, str): dt = datetime.fromisoformat(committed_at_raw) if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) else: dt = committed_at_raw await session.execute( pg_insert(MusehubSymbolHistoryEntry).values( repo_id=repo_id, address=address, commit_id=entry["commit_id"], committed_at=dt, author=entry.get("author"), op=entry.get("op", "add"), content_id=entry.get("content_id"), ).on_conflict_do_nothing() ) await session.flush() # =========================================================================== # Layer 1 — Unit # =========================================================================== class TestUnitModulePrefix: def test_returns_first_three_segments(self) -> None: assert _module_prefix("musehub.services.musehub_ci.enqueue_run") == "musehub.services.musehub_ci" def test_exactly_three_segments(self) -> None: assert _module_prefix("a.b.c") == "a.b.c" def test_fewer_than_depth_returns_address(self) -> None: assert _module_prefix("a.b") == "a.b" def test_single_segment_unchanged(self) -> None: assert _module_prefix("module") == "module" def test_custom_depth_two(self) -> None: 
assert _module_prefix("a.b.c.d", depth=2) == "a.b" def test_address_with_double_colon(self) -> None: # Dot-separated only; :: is ignored by _module_prefix result = _module_prefix("musehub.services.musehub_ci::fn_name") # Only splits on dots; the colons stay as-is assert result.startswith("musehub.services") class TestUnitShortLabel: def test_returns_last_two_segments(self) -> None: assert _short_label("musehub.services.musehub_ci") == "services.musehub_ci" def test_two_segments_unchanged(self) -> None: assert _short_label("services.musehub_ci") == "services.musehub_ci" def test_single_segment_unchanged(self) -> None: assert _short_label("module") == "module" def test_long_address(self) -> None: assert _short_label("a.b.c.d.e") == "d.e" class TestUnitDataclasses: def test_cross_repo_match_fields(self) -> None: m = CrossRepoMatch( repo_id="r1", repo_slug="my-repo", address="file.py::Foo", last_op="modify", co_change_count=3, ) assert m.co_change_count == 3 def test_external_impact_fields(self) -> None: ei = ExternalImpact( repo_id="r2", repo_slug="other", matches=[{"address": "a", "shared_commits": 2}] ) assert len(ei.matches) == 1 def test_cross_repo_impact_fields(self) -> None: cri = CrossRepoImpact( address="file.py::Foo", source_repo_id="r1", source_repo_slug="my-repo", local_co_changed=[], local_commit_count=5, external=[], ) assert cri.local_commit_count == 5 def test_workspace_risk_entry_fields(self) -> None: wre = WorkspaceRiskEntry( address="file.py::Bar", repo_id="r1", repo_slug="my-repo", co_change_count=10, commit_count=7, ) assert wre.commit_count == 7 def test_deps_node_fields(self) -> None: node = DepsNode( id="musehub.services.ci", label="services.ci", type="local", repo_id="r1", repo_slug="my-repo", address_count=5, ) assert node.type == "local" def test_deps_edge_fields(self) -> None: edge = DepsEdge(source="a", target="b", weight=3, type="co_change") assert edge.weight == 3 def test_deps_graph_default_empty(self) -> None: g = DepsGraph() assert 
g.nodes == [] assert g.edges == [] def test_workspace_forecast_fields(self) -> None: wf = WorkspaceForecast(owner="alice", repos=[], cross_repo_risk_symbols=[]) assert wf.owner == "alice" # =========================================================================== # Layer 2 — Integration # =========================================================================== class TestIntegrationLoadOwnerRepos: async def test_returns_public_repos_for_unauthenticated( self, db_session: AsyncSession ) -> None: pub = await _db_repo(db_session, "alice", visibility="public") priv = await _db_repo(db_session, "alice", visibility="private") await db_session.flush() repos = await _load_owner_repos(db_session, "alice", visible_to_user=None) ids = [r.repo_id for r in repos] assert pub.repo_id in ids assert priv.repo_id not in ids async def test_owner_sees_all_repos(self, db_session: AsyncSession) -> None: pub = await _db_repo(db_session, "alice", visibility="public") priv = await _db_repo(db_session, "alice", visibility="private") await db_session.flush() repos = await _load_owner_repos(db_session, "alice", visible_to_user="alice") ids = [r.repo_id for r in repos] assert pub.repo_id in ids assert priv.repo_id in ids async def test_deleted_repos_excluded(self, db_session: AsyncSession) -> None: active = await _db_repo(db_session, "alice", visibility="public") deleted = await _db_repo(db_session, "alice", visibility="public", deleted=True) await db_session.flush() repos = await _load_owner_repos(db_session, "alice", visible_to_user="alice") ids = [r.repo_id for r in repos] assert active.repo_id in ids assert deleted.repo_id not in ids async def test_other_owner_repos_excluded(self, db_session: AsyncSession) -> None: alice_repo = await _db_repo(db_session, "alice", visibility="public") bob_repo = await _db_repo(db_session, "bob", visibility="public") await db_session.flush() repos = await _load_owner_repos(db_session, "alice", visible_to_user="alice") ids = [r.repo_id for r in repos] 
assert alice_repo.repo_id in ids assert bob_repo.repo_id not in ids class TestIntegrationSearchSymbolAcrossRepos: async def test_finds_matching_symbol(self, db_session: AsyncSession) -> None: repo = await _db_repo(db_session, "alice", visibility="public") c_id = _uid() await _db_symbol_index( db_session, repo.repo_id, {"musehub.services.ci::enqueue_run": [_entry(c_id)]}, ) await db_session.flush() results = await search_symbol_across_repos( db_session, "alice", "enqueue_run", visible_to_user="alice" ) assert any("enqueue_run" in r.address for r in results) async def test_case_insensitive_match(self, db_session: AsyncSession) -> None: repo = await _db_repo(db_session, "alice", visibility="public") c_id = _uid() await _db_symbol_index( db_session, repo.repo_id, {"musehub.services.ci::EnqueueRun": [_entry(c_id)]}, ) await db_session.flush() results = await search_symbol_across_repos( db_session, "alice", "enqueuerun", visible_to_user="alice" ) assert any("EnqueueRun" in r.address for r in results) async def test_no_match_returns_empty(self, db_session: AsyncSession) -> None: repo = await _db_repo(db_session, "alice", visibility="public") c_id = _uid() await _db_symbol_index( db_session, repo.repo_id, {"file.py::Foo": [_entry(c_id)]} ) await db_session.flush() results = await search_symbol_across_repos( db_session, "alice", "no_such_symbol_xyz", visible_to_user="alice" ) assert results == [] async def test_limit_respected(self, db_session: AsyncSession) -> None: repo = await _db_repo(db_session, "alice", visibility="public") history = {f"file.py::Sym{i}": [_entry(_uid())] for i in range(20)} await _db_symbol_index(db_session, repo.repo_id, history) await db_session.flush() results = await search_symbol_across_repos( db_session, "alice", "Sym", limit=5, visible_to_user="alice" ) assert len(results) <= 5 async def test_private_repo_invisible_to_others( self, db_session: AsyncSession ) -> None: repo = await _db_repo(db_session, "alice", visibility="private") c_id = _uid() 
await _db_symbol_index( db_session, repo.repo_id, {"file.py::SecretFn": [_entry(c_id)]} ) await db_session.flush() results = await search_symbol_across_repos( db_session, "alice", "SecretFn", visible_to_user="bob" ) assert results == [] async def test_repo_without_index_skipped(self, db_session: AsyncSession) -> None: await _db_repo(db_session, "alice", visibility="public") await db_session.flush() results = await search_symbol_across_repos( db_session, "alice", "anything", visible_to_user="alice" ) assert results == [] class TestIntegrationCrossRepoImpact: async def test_returns_none_if_source_repo_not_in_workspace( self, db_session: AsyncSession ) -> None: await db_session.flush() result = await cross_repo_impact( db_session, "alice", "nonexistent-repo", "file.py::Foo", visible_to_user="alice" ) assert result is None async def test_returns_none_if_address_not_in_index( self, db_session: AsyncSession ) -> None: repo = await _db_repo(db_session, "alice", visibility="public") c_id = _uid() await _db_symbol_index(db_session, repo.repo_id, {"file.py::OtherFn": [_entry(c_id)]}) await db_session.flush() result = await cross_repo_impact( db_session, "alice", repo.repo_id, "file.py::Missing", visible_to_user="alice" ) assert result is None async def test_returns_impact_for_valid_address( self, db_session: AsyncSession ) -> None: repo = await _db_repo(db_session, "alice", visibility="public") c_id = _uid() await _db_symbol_index( db_session, repo.repo_id, { "file.py::Foo": [_entry(c_id)], "file.py::Bar": [_entry(c_id)], # co-changes with Foo }, ) await db_session.flush() result = await cross_repo_impact( db_session, "alice", repo.repo_id, "file.py::Foo", visible_to_user="alice" ) assert result is not None assert result.address == "file.py::Foo" assert result.source_repo_id == repo.repo_id # Bar co-changes with Foo in the same commit local_addresses = [e["address"] for e in result.local_co_changed] assert "file.py::Bar" in local_addresses class 
TestIntegrationWorkspaceBlastRisk: async def test_returns_top_n_symbols(self, db_session: AsyncSession) -> None: repo = await _db_repo(db_session, "alice", visibility="public") # sym_a: 5 commit entries; sym_b: 2 entries_a = [_entry(f"c{i}") for i in range(5)] entries_b = [_entry(f"d{i}") for i in range(2)] await _db_symbol_index( db_session, repo.repo_id, {"file.py::sym_a": entries_a, "file.py::sym_b": entries_b}, ) await db_session.flush() results = await workspace_blast_risk_top_n( db_session, "alice", top_n=1, visible_to_user="alice" ) assert len(results) == 1 assert results[0].address == "file.py::sym_a" async def test_sorted_by_co_change_count_desc( self, db_session: AsyncSession ) -> None: repo = await _db_repo(db_session, "alice", visibility="public") entries = {f"file.py::sym_{i}": [_entry(_uid())] * (10 - i) for i in range(5)} await _db_symbol_index(db_session, repo.repo_id, entries) await db_session.flush() results = await workspace_blast_risk_top_n( db_session, "alice", top_n=5, visible_to_user="alice" ) counts = [r.co_change_count for r in results] assert counts == sorted(counts, reverse=True) class TestIntegrationBuildDepsGraph: async def test_source_repo_not_in_workspace_returns_empty( self, db_session: AsyncSession ) -> None: await db_session.flush() g = await build_deps_graph( db_session, "alice", "nonexistent", visible_to_user="alice" ) assert g.nodes == [] assert g.edges == [] async def test_builds_nodes_from_symbol_history( self, db_session: AsyncSession ) -> None: repo = await _db_repo(db_session, "alice", visibility="public") c_id = _uid() # Use dot-only addresses so _module_prefix produces clean 3-segment node IDs await _db_symbol_index( db_session, repo.repo_id, { "musehub.services.ci.run": [_entry(c_id)], "musehub.services.ci.cancel": [_entry(c_id)], "musehub.services.auth.login": [_entry(c_id)], }, ) await db_session.flush() g = await build_deps_graph( db_session, "alice", repo.repo_id, visible_to_user="alice" ) node_ids = [n.id for n in 
g.nodes] # _module_prefix("musehub.services.ci.run") → "musehub.services.ci" assert "musehub.services.ci" in node_ids async def test_no_symbol_history_returns_empty_graph( self, db_session: AsyncSession ) -> None: repo = await _db_repo(db_session, "alice", visibility="public") await db_session.flush() g = await build_deps_graph( db_session, "alice", repo.repo_id, visible_to_user="alice" ) assert g.nodes == [] # =========================================================================== # Layer 3 — E2E # =========================================================================== class TestE2ESymbolSearch: async def test_search_page_200_with_query( self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession, ) -> None: repo = await _db_repo(db_session, "testuser", visibility="public") await _db_symbol_index( db_session, repo.repo_id, {"file.py::MyFunc": [_entry(_uid())]} ) await db_session.commit() r = await client.get("/testuser/search?q=MyFunc", headers=auth_headers) assert r.status_code == 200 async def test_search_page_200_empty_query( self, client: AsyncClient, auth_headers: StrDict, ) -> None: r = await client.get("/testuser/search", headers=auth_headers) assert r.status_code == 200 async def test_search_page_no_auth_public_owner( self, client: AsyncClient, db_session: AsyncSession, ) -> None: """Public symbol search is accessible without auth token.""" repo = await _db_repo(db_session, "testuser", visibility="public") await _db_symbol_index( db_session, repo.repo_id, {"file.py::PubFn": [_entry(_uid())]} ) await db_session.commit() r = await client.get("/testuser/search?q=PubFn") # UI route renders HTML; should succeed (200) assert r.status_code == 200 async def test_search_returns_html( self, client: AsyncClient, auth_headers: StrDict, ) -> None: r = await client.get("/testuser/search?q=foo", headers=auth_headers) assert r.status_code == 200 assert "text/html" in r.headers.get("content-type", "") # 
# ===========================================================================
# Layer 4 — Stress
# ===========================================================================


class TestStress:
    """Volume and concurrency checks for search and blast-risk queries."""

    async def test_search_across_10_repos(self, db_session: AsyncSession) -> None:
        for i in range(10):
            repo = await _db_repo(db_session, "alice", name=f"repo-{i}", visibility="public")
            history = {f"file.py::Sym{i}_{j}": [_entry(_uid())] for j in range(10)}
            await _db_symbol_index(db_session, repo.repo_id, history)
        await db_session.flush()
        results = await search_symbol_across_repos(
            db_session, "alice", "Sym", limit=30, visible_to_user="alice"
        )
        # 100 symbols match "Sym"; an empty result would pass `<= 30` vacuously.
        assert 0 < len(results) <= 30

    async def test_concurrent_workspace_blast_risk(
        self, db_session: AsyncSession
    ) -> None:
        repo = await _db_repo(db_session, "alice", visibility="public")
        # Three distinct commits per symbol (not one commit id repeated 3x).
        history = {
            f"file.py::sym_{i}": [_entry(_uid()) for _ in range(3)] for i in range(30)
        }
        await _db_symbol_index(db_session, repo.repo_id, history)
        await db_session.flush()
        # NOTE(review): every task shares one AsyncSession. SQLAlchemy documents
        # that concurrent tasks (e.g. asyncio.gather) need a separate
        # AsyncSession per task; confirm this passes reliably, or give each
        # task its own session once a session factory fixture is available.
        results = await asyncio.gather(
            *[
                workspace_blast_risk_top_n(
                    db_session, "alice", top_n=10, visible_to_user="alice"
                )
                for _ in range(5)
            ]
        )
        assert all(len(r) <= 10 for r in results)

    async def test_blast_risk_100_symbols(self, db_session: AsyncSession) -> None:
        repo = await _db_repo(db_session, "alice", visibility="public")
        # Module mod_i contributes 10 symbols, each with (i + 1) distinct commits.
        history = {
            f"musehub.services.mod_{i}::fn_{j}": [_entry(_uid()) for _ in range(i + 1)]
            for i in range(10)
            for j in range(10)
        }
        await _db_symbol_index(db_session, repo.repo_id, history)
        await db_session.flush()
        results = await workspace_blast_risk_top_n(
            db_session, "alice", top_n=20, visible_to_user="alice"
        )
        assert len(results) == 20


# ===========================================================================
# Layer 5 — Data Integrity
# ===========================================================================


class TestDataIntegrity:
    """Sort order, result caps and field population."""

    async def test_search_results_sorted_by_co_change_desc(
        self, db_session: AsyncSession
    ) -> None:
        repo = await _db_repo(db_session, "alice", visibility="public")
        # Distinct commits per entry so the three symbols really differ in
        # history size (1 / 8 / 3); repeating one entry would not.
        history = {
            "file.py::Rarely": [_entry(_uid())],
            "file.py::Often": [_entry(_uid()) for _ in range(8)],
            "file.py::Medium": [_entry(_uid()) for _ in range(3)],
        }
        await _db_symbol_index(db_session, repo.repo_id, history)
        await db_session.flush()
        results = await search_symbol_across_repos(
            db_session, "alice", "file.py", visible_to_user="alice"
        )
        assert len(results) == 3
        counts = [r.co_change_count for r in results]
        assert counts == sorted(counts, reverse=True)

    async def test_blast_risk_top_n_hard_cap(
        self, db_session: AsyncSession
    ) -> None:
        repo = await _db_repo(db_session, "alice", visibility="public")
        history = {f"file.py::sym_{i}": [_entry(_uid())] for i in range(50)}
        await _db_symbol_index(db_session, repo.repo_id, history)
        await db_session.flush()
        results = await workspace_blast_risk_top_n(
            db_session, "alice", top_n=10, visible_to_user="alice"
        )
        assert len(results) == 10

    async def test_cross_repo_match_fields_populated(
        self, db_session: AsyncSession
    ) -> None:
        repo = await _db_repo(db_session, "alice", visibility="public")
        c_id = _uid()
        await _db_symbol_index(
            db_session, repo.repo_id, {"a.b.c::MyFn": [_entry(c_id)]}
        )
        await db_session.flush()
        results = await search_symbol_across_repos(
            db_session, "alice", "MyFn", visible_to_user="alice"
        )
        assert len(results) == 1
        m = results[0]
        assert m.repo_id == repo.repo_id
        assert m.address == "a.b.c::MyFn"
        assert m.last_op in ("add", "modify", "delete")
        assert m.co_change_count == 1

    async def test_deps_graph_max_nodes_cap(
        self, db_session: AsyncSession
    ) -> None:
        repo = await _db_repo(db_session, "alice", visibility="public")
        history = {
            f"module_{i}.sub.fn::Sym": [_entry(_uid())] for i in range(80)
        }
        await _db_symbol_index(db_session, repo.repo_id, history)
        await db_session.flush()
        g = await build_deps_graph(
            db_session, "alice", repo.repo_id, visible_to_user="alice", max_nodes=60
        )
        assert len(g.nodes) <= 60
# ===========================================================================
# Layer 6 — Security
# ===========================================================================


class TestSecurity:
    """Private-repo visibility gating across the cross-repo services."""

    async def test_private_repo_symbols_invisible_to_non_owner(
        self, db_session: AsyncSession
    ) -> None:
        repo = await _db_repo(db_session, "alice", visibility="private")
        await _db_symbol_index(
            db_session, repo.repo_id, {"secret.py::SecretKey": [_entry(_uid())]}
        )
        await db_session.flush()
        results = await search_symbol_across_repos(
            db_session, "alice", "SecretKey", visible_to_user="bob"
        )
        assert results == []

    async def test_unauthenticated_only_sees_public(
        self, db_session: AsyncSession
    ) -> None:
        pub_repo = await _db_repo(db_session, "alice", visibility="public")
        priv_repo = await _db_repo(db_session, "alice", visibility="private")
        c1, c2 = _uid(), _uid()
        await _db_symbol_index(db_session, pub_repo.repo_id, {"pub.py::PubFn": [_entry(c1)]})
        await _db_symbol_index(db_session, priv_repo.repo_id, {"priv.py::PrivFn": [_entry(c2)]})
        await db_session.flush()
        results = await search_symbol_across_repos(
            db_session, "alice", "Fn", visible_to_user=None
        )
        addresses = [r.address for r in results]
        assert "pub.py::PubFn" in addresses
        assert "priv.py::PrivFn" not in addresses

    async def test_blast_risk_private_repo_invisible_to_others(
        self, db_session: AsyncSession
    ) -> None:
        priv = await _db_repo(db_session, "alice", visibility="private")
        # Ten distinct commits so the hidden symbol would rank high if leaked.
        await _db_symbol_index(
            db_session,
            priv.repo_id,
            {"file.py::Hidden": [_entry(_uid()) for _ in range(10)]},
        )
        await db_session.flush()
        results = await workspace_blast_risk_top_n(
            db_session, "alice", top_n=20, visible_to_user="bob"
        )
        assert all(r.repo_id != priv.repo_id for r in results)

    async def test_cross_repo_impact_private_source_invisible(
        self, db_session: AsyncSession
    ) -> None:
        """cross_repo_impact returns None when source repo is private and caller is not owner."""
        priv = await _db_repo(db_session, "alice", visibility="private")
        c_id = _uid()
        await _db_symbol_index(
            db_session, priv.repo_id, {"file.py::Fn": [_entry(c_id)]}
        )
        await db_session.flush()
        result = await cross_repo_impact(
            db_session, "alice", priv.repo_id, "file.py::Fn", visible_to_user="bob"
        )
        # bob can't see alice's private repo → source_repo is None → returns None
        assert result is None


# ===========================================================================
# Layer 7 — Performance
# ===========================================================================


class TestPerformance:
    """Wall-clock budgets for the main read paths (measured after seeding)."""

    async def test_search_across_5_repos_under_300ms(
        self, db_session: AsyncSession
    ) -> None:
        for i in range(5):
            repo = await _db_repo(db_session, "alice", name=f"perf-{i}", visibility="public")
            history = {f"file.py::Sym{i}_{j}": [_entry(_uid())] for j in range(20)}
            await _db_symbol_index(db_session, repo.repo_id, history)
        await db_session.flush()
        start = time.perf_counter()
        results = await search_symbol_across_repos(
            db_session, "alice", "Sym", limit=30, visible_to_user="alice"
        )
        elapsed = time.perf_counter() - start
        assert elapsed < 0.3, f"search took {elapsed:.3f}s, expected <0.3s"
        assert len(results) <= 30

    async def test_workspace_blast_risk_50_symbols_under_200ms(
        self, db_session: AsyncSession
    ) -> None:
        repo = await _db_repo(db_session, "alice", visibility="public")
        # Symbol i gets (i % 5 + 1) distinct commits, i.e. 1..5 rows each.
        history = {
            f"file.py::sym_{i}": [_entry(_uid()) for _ in range(i % 5 + 1)]
            for i in range(50)
        }
        await _db_symbol_index(db_session, repo.repo_id, history)
        await db_session.flush()
        start = time.perf_counter()
        results = await workspace_blast_risk_top_n(
            db_session, "alice", top_n=20, visible_to_user="alice"
        )
        elapsed = time.perf_counter() - start
        assert elapsed < 0.2, f"blast risk took {elapsed:.3f}s, expected <0.2s"
        # Guard against a fast-but-empty result satisfying the timing budget.
        assert len(results) == 20