"""Section 38 — UI / SSR Routes: 7-layer test suite. Existing coverage (347 tests across 27 files) focuses on E2E page rendering for issues, commits, labels, milestones, notifications, forks, topics, user profiles, and settings. This file adds the missing layers and fills coverage gaps: 1. Unit — pure utility functions: is_htmx, htmx_trigger, _infer_sym_kind, _fmt_relative, licenses_for_viewer_type (no DB, no HTTP) 2. Integration — repo home page, proposals list, agents swarm, blob/blame pages with real DB state 3. E2E — pages with no existing test files: repo home, symbols, proposals, agents, intel, blob/raw; plus HTMX fragment routing 4. Stress — sequential loads across multiple pages; empty-state fallbacks 5. Data Integrity — nav counts correct, filter accuracy, XSS content escaped 6. Security — auth-required pages return 401 without token; injected content escaped; debug info not leaked 7. Performance — key pages respond under 500ms """ from __future__ import annotations import secrets import time from datetime import datetime, timezone import pytest from httpx import AsyncClient from sqlalchemy.ext.asyncio import AsyncSession from musehub.core.genesis import compute_identity_id, compute_issue_id, compute_proposal_id, compute_repo_id from musehub.types.json_types import StrDict from musehub.db.musehub_identity_models import MusehubIdentity from musehub.db.musehub_repo_models import MusehubRepo from musehub.db.musehub_social_models import MusehubIssue, MusehubProposal # ───────────────────────────────────────────────────────────────────────────── # Helpers shared across layers # ───────────────────────────────────────────────────────────────────────────── _OWNER = "ssr-tester" _SLUG = "ssr-test-repo" async def _seed_identity(db: AsyncSession, handle: str = _OWNER) -> MusehubIdentity: identity = MusehubIdentity(handle=handle, identity_type="human", display_name=handle) db.add(identity) await db.commit() await db.refresh(identity) return identity async def _seed_repo( db: AsyncSession, owner: str = _OWNER, slug: str = _SLUG, visibility: str = "public", ) -> MusehubRepo: created_at = datetime.now(tz=timezone.utc) owner_id = compute_identity_id(owner.encode()) repo = MusehubRepo( repo_id=compute_repo_id(owner_id, slug, "code", created_at.isoformat()), name=slug, owner=owner, slug=slug, visibility=visibility, owner_user_id=owner_id, created_at=created_at, updated_at=created_at, ) db.add(repo) await db.commit() await db.refresh(repo) return repo async def _seed_issue( db: AsyncSession, repo_id: str, *, number: int = 1, title: str = "Test issue", state: str = "open", ) -> MusehubIssue: created_at = datetime.now(tz=timezone.utc) author_id = compute_identity_id(_OWNER.encode()) issue = MusehubIssue( issue_id=compute_issue_id(repo_id, author_id, created_at.isoformat()), repo_id=repo_id, number=number, title=title, body="Body text.", state=state, labels=[], author=_OWNER, created_at=created_at, updated_at=created_at, ) db.add(issue) await db.commit() await db.refresh(issue) return issue async def _seed_proposal( db: AsyncSession, repo_id: str, *, proposal_number: int = 1, title: str = "Add harmony voice", state: str = "open", ) -> MusehubProposal: created_at = datetime.now(tz=timezone.utc) author_id = compute_identity_id(_OWNER.encode()) proposal = MusehubProposal( proposal_id=compute_proposal_id( repo_id, author_id, "feat/harmony", "main", created_at.isoformat() ), repo_id=repo_id, proposal_number=proposal_number, title=title, body="Proposal body.", state=state, from_branch="feat/harmony", to_branch="main", author=_OWNER, created_at=created_at, updated_at=created_at, ) db.add(proposal) await db.commit() await db.refresh(proposal) return proposal # ───────────────────────────────────────────────────────────────────────────── # LAYER 1 — UNIT # ───────────────────────────────────────────────────────────────────────────── class TestIsHtmx: """Unit tests for is_htmx / is_htmx_boosted helpers.""" def _req(self, headers: StrDict) -> None: from starlette.testclient import TestClient from starlette.applications import Starlette from starlette.routing import Route from starlette.requests import Request as StarletteRequest from starlette.responses import PlainTextResponse captured: list[StarletteRequest] = [] def handler(req: StarletteRequest) -> None: captured.append(req) return PlainTextResponse("ok") app = Starlette(routes=[Route("/", handler)]) client = TestClient(app, raise_server_exceptions=False) client.get("/", headers=headers) return captured[0] def test_no_header_is_not_htmx(self) -> None: from musehub.api.routes.musehub.htmx_helpers import is_htmx req = self._req({}) assert is_htmx(req) is False def test_hx_request_true_is_htmx(self) -> None: from musehub.api.routes.musehub.htmx_helpers import is_htmx req = self._req({"HX-Request": "true"}) assert is_htmx(req) is True def test_hx_request_false_is_not_htmx(self) -> None: from musehub.api.routes.musehub.htmx_helpers import is_htmx req = self._req({"HX-Request": "false"}) assert is_htmx(req) is False def test_hx_boosted_false_when_absent(self) -> None: from musehub.api.routes.musehub.htmx_helpers import is_htmx_boosted req = self._req({}) assert is_htmx_boosted(req) is False def test_hx_boosted_true_when_present(self) -> None: from musehub.api.routes.musehub.htmx_helpers import is_htmx_boosted req = self._req({"HX-Boosted": "true"}) assert is_htmx_boosted(req) is True class TestHtmxTrigger: """Unit tests for htmx_trigger helper.""" def test_sets_hx_trigger_header(self) -> None: from starlette.responses import Response from musehub.api.routes.musehub.htmx_helpers import htmx_trigger import json r = Response() htmx_trigger(r, "toast", {"message": "saved"}) payload = json.loads(r.headers["HX-Trigger"]) assert payload == {"toast": {"message": "saved"}} def test_sets_hx_trigger_header_no_detail(self) -> None: from starlette.responses import Response from musehub.api.routes.musehub.htmx_helpers import htmx_trigger import json r = Response() htmx_trigger(r, "refresh") payload = json.loads(r.headers["HX-Trigger"]) assert payload == {"refresh": True} def test_multiple_events_possible(self) -> None: """Calling twice overwrites but last write wins — not a bug, just a spec.""" from starlette.responses import Response from musehub.api.routes.musehub.htmx_helpers import htmx_trigger r = Response() htmx_trigger(r, "reload") htmx_trigger(r, "scroll") # Only one value in header (last write wins) assert "HX-Trigger" in r.headers class TestHtmxRedirect: """Unit tests for htmx_redirect helper.""" def test_returns_200(self) -> None: from musehub.api.routes.musehub.htmx_helpers import htmx_redirect r = htmx_redirect("/some/url") assert r.status_code == 200 def test_sets_hx_redirect_header(self) -> None: from musehub.api.routes.musehub.htmx_helpers import htmx_redirect r = htmx_redirect("/dashboard") assert r.headers["HX-Redirect"] == "/dashboard" def test_url_preserved_exactly(self) -> None: from musehub.api.routes.musehub.htmx_helpers import htmx_redirect url = "/gabriel/my-repo?welcome=1" r = htmx_redirect(url) assert r.headers["HX-Redirect"] == url class TestInferSymKind: """Unit tests for _infer_sym_kind.""" def _infer(self, addr: str) -> str: from musehub.api.routes.musehub.ui_symbols import _infer_sym_kind return _infer_sym_kind(addr) def test_camel_case_is_class(self) -> None: assert self._infer("file.py::MyClass") == "class" def test_snake_case_is_function(self) -> None: assert self._infer("file.py::my_function") == "function" def test_all_caps_is_variable(self) -> None: assert self._infer("file.py::MAX_RETRIES") == "variable" def test_no_namespace_camel(self) -> None: assert self._infer("CamelCase") == "file" def test_no_namespace_snake(self) -> None: assert self._infer("my_func") == "file" def test_private_fn_underscore_prefix(self) -> None: assert self._infer("_private_func") == "file" def test_dunder_is_file_without_namespace(self) -> None: assert self._infer("__init__") == "file" def test_empty_addr_is_file(self) -> None: assert self._infer("") == "file" def test_private_class_without_namespace(self) -> None: assert self._infer("_PrivateClass") == "file" class TestLicensesForViewerType: """Unit tests for licenses_for_viewer_type.""" def test_symbol_graph_returns_code_licenses(self) -> None: from musehub.api.routes.musehub.ui_new_repo import licenses_for_viewer_type result = licenses_for_viewer_type("symbol_graph") assert isinstance(result, list) assert len(result) > 0 assert all(isinstance(item, tuple) and len(item) == 2 for item in result) def test_default_returns_generic_licenses(self) -> None: from musehub.api.routes.musehub.ui_new_repo import licenses_for_viewer_type default = licenses_for_viewer_type("audio") code = licenses_for_viewer_type("symbol_graph") # They may overlap but should be distinct lists assert isinstance(default, list) def test_unknown_type_returns_list(self) -> None: from musehub.api.routes.musehub.ui_new_repo import licenses_for_viewer_type result = licenses_for_viewer_type("totally_unknown") assert isinstance(result, list) # ───────────────────────────────────────────────────────────────────────────── # LAYER 2 — INTEGRATION # ───────────────────────────────────────────────────────────────────────────── class TestRepoPageIntegration: """Integration: repo home page with real DB (no HTTP).""" async def test_repo_page_200_with_empty_repo( self, client: AsyncClient, db_session: AsyncSession ) -> None: repo = await _seed_repo(db_session, owner="int-owner", slug="int-repo") resp = await client.get(f"/int-owner/int-repo") assert resp.status_code == 200 async def test_repo_page_404_for_unknown_slug( self, client: AsyncClient, db_session: AsyncSession ) -> None: resp = await client.get("/nobody/totally-unknown-repo-xyz") assert resp.status_code == 404 async def test_repo_page_shows_owner_in_html( self, client: AsyncClient, db_session: AsyncSession ) -> None: await _seed_repo(db_session, owner="int-owner2", slug="int-repo2") resp = await client.get("/int-owner2/int-repo2") assert resp.status_code == 200 assert "int-owner2" in resp.text class TestProposalListIntegration: """Integration: proposals list page with real DB.""" async def test_proposal_list_200_empty( self, client: AsyncClient, db_session: AsyncSession ) -> None: await _seed_repo(db_session, owner="prop-owner", slug="prop-repo") resp = await client.get("/prop-owner/prop-repo/proposals") assert resp.status_code == 200 async def test_proposal_list_shows_pr_title( self, client: AsyncClient, db_session: AsyncSession ) -> None: repo = await _seed_repo(db_session, owner="prop-owner2", slug="prop-repo2") await _seed_proposal(db_session, str(repo.repo_id), title="Reverb on chorus") resp = await client.get("/prop-owner2/prop-repo2/proposals") assert resp.status_code == 200 assert "Reverb on chorus" in resp.text async def test_proposal_list_404_for_unknown_repo( self, client: AsyncClient, db_session: AsyncSession ) -> None: resp = await client.get("/nobody/unknown-proposals-repo/proposals") assert resp.status_code == 404 class TestAgentsPageIntegration: """Integration: agents swarm / coord pages with real DB.""" async def test_agents_swarm_page_200( self, client: AsyncClient, db_session: AsyncSession ) -> None: await _seed_repo(db_session, owner="agent-owner", slug="agent-repo") resp = await client.get("/agent-owner/agent-repo/agents/swarm") assert resp.status_code in (200, 404) # 404 if page not registered at /agents/swarm async def test_agents_coord_page_200( self, client: AsyncClient, db_session: AsyncSession ) -> None: await _seed_repo(db_session, owner="agent-owner2", slug="agent-repo2") resp = await client.get("/agent-owner2/agent-repo2/agents/coord") assert resp.status_code in (200, 404) # ───────────────────────────────────────────────────────────────────────────── # LAYER 3 — E2E # ───────────────────────────────────────────────────────────────────────────── class TestRepoHomeE2E: """E2E: repo home page (/{owner}/{slug}) — no existing test file.""" async def test_get_repo_home_returns_200( self, client: AsyncClient, db_session: AsyncSession ) -> None: await _seed_repo(db_session, owner="e2e-home", slug="home-repo") resp = await client.get("/e2e-home/home-repo") assert resp.status_code == 200 async def test_repo_home_returns_html( self, client: AsyncClient, db_session: AsyncSession ) -> None: await _seed_repo(db_session, owner="e2e-html", slug="html-repo") resp = await client.get("/e2e-html/html-repo") assert "text/html" in resp.headers.get("content-type", "") async def test_repo_home_no_auth_required( self, client: AsyncClient, db_session: AsyncSession ) -> None: """Public repos should be accessible without authentication.""" await _seed_repo(db_session, owner="e2e-pub", slug="pub-repo", visibility="public") resp = await client.get("/e2e-pub/pub-repo") assert resp.status_code != 401 async def test_repo_home_json_format_returns_json( self, client: AsyncClient, db_session: AsyncSession ) -> None: """?format=json should return JSON instead of HTML.""" await _seed_repo(db_session, owner="e2e-json", slug="json-repo") resp = await client.get("/e2e-json/json-repo", params={"format": "json"}) assert resp.status_code == 200 assert "application/json" in resp.headers.get("content-type", "") async def test_repo_home_accept_json_returns_json( self, client: AsyncClient, db_session: AsyncSession ) -> None: """Accept: application/json triggers the JSON shortcut.""" await _seed_repo(db_session, owner="e2e-acc", slug="acc-repo") resp = await client.get( "/e2e-acc/acc-repo", headers={"Accept": "application/json"}, ) assert resp.status_code == 200 data = resp.json() assert "slug" in data or "repoId" in data async def test_repo_home_htmx_returns_fragment( self, client: AsyncClient, db_session: AsyncSession ) -> None: """HX-Request returns file_tree fragment, not full page.""" await _seed_repo(db_session, owner="e2e-htmx", slug="htmx-repo") resp = await client.get( "/e2e-htmx/htmx-repo", headers={"HX-Request": "true"}, ) # Fragment should NOT contain the full wrapper assert resp.status_code == 200 body = resp.text assert " None: resp = await client.get("/nobody/repo-that-does-not-exist-xyz-abc") assert resp.status_code == 404 class TestSymbolsPageE2E: """E2E: symbols page (/{owner}/{slug}/symbols) — no existing test file.""" async def test_symbols_list_page_returns_200( self, client: AsyncClient, db_session: AsyncSession ) -> None: await _seed_repo(db_session, owner="sym-owner", slug="sym-repo") resp = await client.get("/sym-owner/sym-repo/symbols") assert resp.status_code == 200 async def test_symbols_list_no_auth_required( self, client: AsyncClient, db_session: AsyncSession ) -> None: await _seed_repo(db_session, owner="sym-noauth", slug="sym-noauth-repo") resp = await client.get("/sym-noauth/sym-noauth-repo/symbols") assert resp.status_code != 401 async def test_symbols_list_unknown_repo_404( self, client: AsyncClient, db_session: AsyncSession ) -> None: resp = await client.get("/nobody/unknown-sym-repo-xyz/symbols") assert resp.status_code == 404 async def test_symbol_detail_unknown_repo_404( self, client: AsyncClient, db_session: AsyncSession ) -> None: resp = await client.get("/nobody/unknown-sym-repo-xyz/symbol/file.py::MyFn") assert resp.status_code == 404 class TestProposalsPageE2E: """E2E: proposals pages — supplementing the minimal ssr file.""" async def test_proposal_list_returns_200( self, client: AsyncClient, db_session: AsyncSession ) -> None: await _seed_repo(db_session, owner="proposal-e2e", slug="proposal-e2e-repo") resp = await client.get("/proposal-e2e/proposal-e2e-repo/proposals") assert resp.status_code == 200 async def test_proposal_list_html( self, client: AsyncClient, db_session: AsyncSession ) -> None: await _seed_repo(db_session, owner="proposal-e2e2", slug="proposal-e2e-repo2") resp = await client.get("/proposal-e2e2/proposal-e2e-repo2/proposals") assert "text/html" in resp.headers.get("content-type", "") async def test_proposal_detail_404_for_unknown_pr_id( self, client: AsyncClient, db_session: AsyncSession ) -> None: await _seed_repo(db_session, owner="proposal-e2e3", slug="proposal-e2e-repo3") fake_id = secrets.token_hex(16) resp = await client.get(f"/proposal-e2e3/proposal-e2e-repo3/proposals/{fake_id}") assert resp.status_code == 404 async def test_proposal_detail_renders_pr_title( self, client: AsyncClient, db_session: AsyncSession ) -> None: repo = await _seed_repo(db_session, owner="proposal-detail", slug="proposal-detail-repo") proposal = await _seed_proposal( db_session, str(repo.repo_id), title="Unique proposal XYZ" ) # URL uses proposal_id, not proposal_number resp = await client.get( f"/proposal-detail/proposal-detail-repo/proposals/{proposal.proposal_id}" ) assert resp.status_code == 200 assert "Unique proposal XYZ" in resp.text class TestIntelPageE2E: """E2E: intel pages — no existing test file.""" async def test_intel_page_returns_200_or_empty( self, client: AsyncClient, db_session: AsyncSession ) -> None: await _seed_repo(db_session, owner="intel-owner", slug="intel-repo") resp = await client.get("/intel-owner/intel-repo/intel") assert resp.status_code in (200, 404) async def test_intel_dead_page_no_500( self, client: AsyncClient, db_session: AsyncSession ) -> None: await _seed_repo(db_session, owner="intel-dead", slug="intel-dead-repo") resp = await client.get("/intel-dead/intel-dead-repo/intel/dead") assert resp.status_code != 500 class TestBlobAndRawE2E: """E2E: blob and raw file pages — no existing test file.""" async def test_blob_page_404_for_unknown_repo( self, client: AsyncClient, db_session: AsyncSession ) -> None: resp = await client.get("/nobody/unknown-blob-repo/blob/HEAD/README.md") assert resp.status_code == 404 async def test_raw_file_404_for_unknown_repo( self, client: AsyncClient, db_session: AsyncSession ) -> None: resp = await client.get("/nobody/unknown-raw-repo/raw/HEAD/README.md") assert resp.status_code == 404 async def test_blob_page_404_for_unknown_file( self, client: AsyncClient, db_session: AsyncSession ) -> None: await _seed_repo(db_session, owner="blob-owner", slug="blob-repo") resp = await client.get("/blob-owner/blob-repo/blob/HEAD/nonexistent.md") assert resp.status_code in (200, 404) # empty repo may return 404 or empty page class TestHTMXFragmentRoutingE2E: """E2E: HTMX fragment routing for pages that support it.""" async def test_issue_list_htmx_returns_fragment( self, client: AsyncClient, db_session: AsyncSession ) -> None: repo = await _seed_repo(db_session, owner="htmx-frag", slug="htmx-frag-repo") await _seed_issue(db_session, str(repo.repo_id), title="HTMX test issue") resp = await client.get( "/htmx-frag/htmx-frag-repo/issues", headers={"HX-Request": "true"}, ) assert resp.status_code == 200 assert " None: await _seed_repo(db_session, owner="htmx-boost", slug="htmx-boost-repo") resp = await client.get( "/htmx-boost/htmx-boost-repo/issues", headers={"HX-Request": "true", "HX-Boosted": "true"}, ) # Boosted requests must get the full page assert resp.status_code == 200 async def test_search_page_returns_200( self, client: AsyncClient, db_session: AsyncSession ) -> None: resp = await client.get("/search", params={"q": "test"}) assert resp.status_code == 200 async def test_explore_page_returns_200( self, client: AsyncClient, db_session: AsyncSession ) -> None: resp = await client.get("/explore") assert resp.status_code == 200 # ───────────────────────────────────────────────────────────────────────────── # LAYER 4 — STRESS # ───────────────────────────────────────────────────────────────────────────── class TestUISSRStress: """Stress: sequential page loads and empty-state robustness.""" async def test_multiple_repo_pages_sequential( self, client: AsyncClient, db_session: AsyncSession ) -> None: """Load 5 different repos' pages in sequence — none should 500.""" for i in range(5): owner = f"stress-owner-{i}" slug = f"stress-repo-{i}" await _seed_repo(db_session, owner=owner, slug=slug) resp = await client.get(f"/{owner}/{slug}") assert resp.status_code == 200, f"Repo {i} returned {resp.status_code}" async def test_issue_list_with_many_issues( self, client: AsyncClient, db_session: AsyncSession ) -> None: """Issue list renders correctly with 20 seeded issues.""" repo = await _seed_repo(db_session, owner="stress-issues", slug="stress-iss-repo") for i in range(20): await _seed_issue( db_session, str(repo.repo_id), number=i + 1, title=f"Issue {i}" ) resp = await client.get("/stress-issues/stress-iss-repo/issues") assert resp.status_code == 200 async def test_empty_state_repo_home( self, client: AsyncClient, db_session: AsyncSession ) -> None: """Repo with no commits/files renders without 500.""" await _seed_repo(db_session, owner="empty-owner", slug="empty-repo") resp = await client.get("/empty-owner/empty-repo") assert resp.status_code == 200 async def test_empty_state_proposals_page( self, client: AsyncClient, db_session: AsyncSession ) -> None: """Proposals page with no proposals renders without 500.""" await _seed_repo(db_session, owner="empty-proposal", slug="empty-proposal-repo") resp = await client.get("/empty-proposal/empty-proposal-repo/proposals") assert resp.status_code == 200 async def test_empty_state_symbols_page( self, client: AsyncClient, db_session: AsyncSession ) -> None: """Symbols page with no symbol index renders without 500.""" await _seed_repo(db_session, owner="empty-sym", slug="empty-sym-repo") resp = await client.get("/empty-sym/empty-sym-repo/symbols") assert resp.status_code == 200 async def test_topics_page_returns_200( self, client: AsyncClient, db_session: AsyncSession ) -> None: resp = await client.get("/topics") assert resp.status_code == 200 async def test_search_page_empty_query_200( self, client: AsyncClient, db_session: AsyncSession ) -> None: resp = await client.get("/search") assert resp.status_code == 200 async def test_concurrent_issue_list_requests( self, client: AsyncClient, db_session: AsyncSession ) -> None: """3 sequential requests to the same issues page succeed.""" repo = await _seed_repo(db_session, owner="conc-owner", slug="conc-repo") await _seed_issue(db_session, str(repo.repo_id), title="Concurrent test") for _ in range(3): resp = await client.get("/conc-owner/conc-repo/issues") assert resp.status_code == 200 # ───────────────────────────────────────────────────────────────────────────── # LAYER 5 — DATA INTEGRITY # ───────────────────────────────────────────────────────────────────────────── class TestUISSRDataIntegrity: """Data integrity: nav counts correct, filter accuracy, XSS escaping.""" async def test_issue_open_count_matches_db( self, client: AsyncClient, db_session: AsyncSession ) -> None: """Nav tab shows the correct open issue count seeded in DB.""" repo = await _seed_repo(db_session, owner="count-owner", slug="count-repo") for i in range(3): await _seed_issue( db_session, str(repo.repo_id), number=i + 1, state="open" ) resp = await client.get("/count-owner/count-repo/issues") assert resp.status_code == 200 # The number 3 should appear in the open tab count assert "3" in resp.text async def test_closed_issues_not_shown_on_open_tab( self, client: AsyncClient, db_session: AsyncSession ) -> None: repo = await _seed_repo(db_session, owner="filter-owner", slug="filter-repo") await _seed_issue( db_session, str(repo.repo_id), number=1, title="Open issue", state="open" ) await _seed_issue( db_session, str(repo.repo_id), number=2, title="Closed issue xyz", state="closed", ) resp = await client.get("/filter-owner/filter-repo/issues", params={"state": "open"}) assert resp.status_code == 200 assert "Open issue" in resp.text assert "Closed issue xyz" not in resp.text async def test_open_issues_not_shown_on_closed_tab( self, client: AsyncClient, db_session: AsyncSession ) -> None: repo = await _seed_repo(db_session, owner="filter-owner2", slug="filter-repo2") await _seed_issue( db_session, str(repo.repo_id), number=1, title="Open issue abc", state="open" ) await _seed_issue( db_session, str(repo.repo_id), number=2, title="Closed issue only", state="closed", ) resp = await client.get( "/filter-owner2/filter-repo2/issues", params={"state": "closed"} ) assert resp.status_code == 200 assert "Closed issue only" in resp.text assert "Open issue abc" not in resp.text async def test_proposal_title_rendered_in_list( self, client: AsyncClient, db_session: AsyncSession ) -> None: repo = await _seed_repo(db_session, owner="proposal-render", slug="proposal-render-repo") await _seed_proposal( db_session, str(repo.repo_id), title="Piano roll refactor" ) resp = await client.get("/proposal-render/proposal-render-repo/proposals") assert "Piano roll refactor" in resp.text async def test_xss_title_escaped_in_issue_list( self, client: AsyncClient, db_session: AsyncSession ) -> None: """Issue titles with HTML special chars must be escaped.""" repo = await _seed_repo(db_session, owner="xss-owner", slug="xss-repo") xss_title = '' await _seed_issue(db_session, str(repo.repo_id), title=xss_title) resp = await client.get("/xss-owner/xss-repo/issues") assert resp.status_code == 200 # The raw script tag must not appear unescaped assert "'}) assert "" not in resp.text async def test_no_debug_info_in_production_errors( self, client: AsyncClient, db_session: AsyncSession ) -> None: """404 responses must not contain Python module paths.""" resp = await client.get("/nobody/sec-unknown-repo-xyz") assert "/Users/" not in resp.text assert "musehub/" not in resp.text or resp.status_code != 500 async def test_sql_injection_in_owner_does_not_500( self, client: AsyncClient, db_session: AsyncSession ) -> None: """URL path parameters passed as SQL injection should result in 404, not 500.""" resp = await client.get("/'; DROP TABLE musehub_repos; --/repo") assert resp.status_code != 500 # ───────────────────────────────────────────────────────────────────────────── # LAYER 7 — PERFORMANCE # ───────────────────────────────────────────────────────────────────────────── class TestUISSRPerformance: """Performance: page responses under time budgets.""" async def test_repo_home_under_500ms( self, client: AsyncClient, db_session: AsyncSession ) -> None: await _seed_repo(db_session, owner="perf-owner", slug="perf-repo") # Warm-up await client.get("/perf-owner/perf-repo") start = time.perf_counter() resp = await client.get("/perf-owner/perf-repo") elapsed = time.perf_counter() - start assert resp.status_code == 200 assert elapsed < 0.500, f"Repo home took {elapsed*1000:.1f}ms (limit 500ms)" async def test_issue_list_under_500ms( self, client: AsyncClient, db_session: AsyncSession ) -> None: repo = await _seed_repo(db_session, owner="perf-iss", slug="perf-iss-repo") for i in range(5): await _seed_issue( db_session, str(repo.repo_id), number=i + 1, title=f"Issue {i}" ) await client.get("/perf-iss/perf-iss-repo/issues") start = time.perf_counter() resp = await client.get("/perf-iss/perf-iss-repo/issues") elapsed = time.perf_counter() - start assert resp.status_code == 200 assert elapsed < 0.500, f"Issue list took {elapsed*1000:.1f}ms (limit 500ms)" async def test_proposals_page_under_500ms( self, client: AsyncClient, db_session: AsyncSession ) -> None: repo = await _seed_repo(db_session, owner="perf-proposal", slug="perf-proposal-repo") await _seed_proposal(db_session, str(repo.repo_id), title="Perf test proposal") await client.get("/perf-proposal/perf-proposal-repo/proposals") start = time.perf_counter() resp = await client.get("/perf-proposal/perf-proposal-repo/proposals") elapsed = time.perf_counter() - start assert resp.status_code == 200 assert elapsed < 0.500, f"Proposals page took {elapsed*1000:.1f}ms (limit 500ms)" async def test_explore_page_under_500ms( self, client: AsyncClient, db_session: AsyncSession ) -> None: await client.get("/explore") # warm-up start = time.perf_counter() resp = await client.get("/explore") elapsed = time.perf_counter() - start assert resp.status_code == 200 assert elapsed < 0.500, f"Explore page took {elapsed*1000:.1f}ms (limit 500ms)" async def test_search_page_under_500ms( self, client: AsyncClient, db_session: AsyncSession ) -> None: await client.get("/search") start = time.perf_counter() resp = await client.get("/search", params={"q": "test"}) elapsed = time.perf_counter() - start assert resp.status_code == 200 assert elapsed < 0.500, f"Search page took {elapsed*1000:.1f}ms (limit 500ms)" def test_to_camel_via_htmx_helpers_import_fast(self) -> None: """Importing htmx_helpers is fast (module already loaded).""" start = time.perf_counter() from musehub.api.routes.musehub.htmx_helpers import is_htmx, htmx_trigger # noqa: F401 elapsed = time.perf_counter() - start assert elapsed < 0.050, f"Import took {elapsed*1000:.1f}ms (limit 50ms)"