"""Section 38 — UI / SSR Routes: 7-layer test suite.
Existing coverage (347 tests across 27 files) focuses on E2E page rendering for
issues, commits, labels, milestones, notifications, forks, topics, user profiles,
and settings. This file adds the missing layers and fills coverage gaps:
1. Unit — pure utility functions: is_htmx, htmx_trigger, _infer_sym_kind,
_fmt_relative, licenses_for_viewer_type (no DB, no HTTP)
2. Integration — repo home page, proposals list, agents swarm, blob/blame pages
with real DB state
3. E2E — pages with no existing test files: repo home, symbols, proposals,
agents, intel, blob/raw; plus HTMX fragment routing
4. Stress — sequential loads across multiple pages; empty-state fallbacks
5. Data Integrity — nav counts correct, filter accuracy, XSS content escaped
6. Security — auth-required pages return 401 without token; injected content
escaped; debug info not leaked
7. Performance — key pages respond under 500ms
"""
from __future__ import annotations
import secrets
import time
from datetime import datetime, timezone
import pytest
from httpx import AsyncClient
from sqlalchemy.ext.asyncio import AsyncSession
from musehub.core.genesis import compute_identity_id, compute_issue_id, compute_proposal_id, compute_repo_id
from musehub.types.json_types import StrDict
from musehub.db.musehub_identity_models import MusehubIdentity
from musehub.db.musehub_repo_models import MusehubRepo
from musehub.db.musehub_social_models import MusehubIssue, MusehubProposal
# ─────────────────────────────────────────────────────────────────────────────
# Helpers shared across layers
# ─────────────────────────────────────────────────────────────────────────────
_OWNER = "ssr-tester"
_SLUG = "ssr-test-repo"
async def _seed_identity(db: AsyncSession, handle: str = _OWNER) -> MusehubIdentity:
identity = MusehubIdentity(handle=handle, identity_type="human", display_name=handle)
db.add(identity)
await db.commit()
await db.refresh(identity)
return identity
async def _seed_repo(
db: AsyncSession,
owner: str = _OWNER,
slug: str = _SLUG,
visibility: str = "public",
) -> MusehubRepo:
created_at = datetime.now(tz=timezone.utc)
owner_id = compute_identity_id(owner.encode())
repo = MusehubRepo(
repo_id=compute_repo_id(owner_id, slug, "code", created_at.isoformat()),
name=slug,
owner=owner,
slug=slug,
visibility=visibility,
owner_user_id=owner_id,
created_at=created_at,
updated_at=created_at,
)
db.add(repo)
await db.commit()
await db.refresh(repo)
return repo
async def _seed_issue(
db: AsyncSession,
repo_id: str,
*,
number: int = 1,
title: str = "Test issue",
state: str = "open",
) -> MusehubIssue:
created_at = datetime.now(tz=timezone.utc)
author_id = compute_identity_id(_OWNER.encode())
issue = MusehubIssue(
issue_id=compute_issue_id(repo_id, author_id, created_at.isoformat()),
repo_id=repo_id,
number=number,
title=title,
body="Body text.",
state=state,
labels=[],
author=_OWNER,
created_at=created_at,
updated_at=created_at,
)
db.add(issue)
await db.commit()
await db.refresh(issue)
return issue
async def _seed_proposal(
db: AsyncSession,
repo_id: str,
*,
proposal_number: int = 1,
title: str = "Add harmony voice",
state: str = "open",
) -> MusehubProposal:
created_at = datetime.now(tz=timezone.utc)
author_id = compute_identity_id(_OWNER.encode())
proposal = MusehubProposal(
proposal_id=compute_proposal_id(
repo_id, author_id, "feat/harmony", "main", created_at.isoformat()
),
repo_id=repo_id,
proposal_number=proposal_number,
title=title,
body="Proposal body.",
state=state,
from_branch="feat/harmony",
to_branch="main",
author=_OWNER,
created_at=created_at,
updated_at=created_at,
)
db.add(proposal)
await db.commit()
await db.refresh(proposal)
return proposal
# ─────────────────────────────────────────────────────────────────────────────
# LAYER 1 — UNIT
# ─────────────────────────────────────────────────────────────────────────────
class TestIsHtmx:
"""Unit tests for is_htmx / is_htmx_boosted helpers."""
def _req(self, headers: StrDict) -> None:
from starlette.testclient import TestClient
from starlette.applications import Starlette
from starlette.routing import Route
from starlette.requests import Request as StarletteRequest
from starlette.responses import PlainTextResponse
captured: list[StarletteRequest] = []
def handler(req: StarletteRequest) -> None:
captured.append(req)
return PlainTextResponse("ok")
app = Starlette(routes=[Route("/", handler)])
client = TestClient(app, raise_server_exceptions=False)
client.get("/", headers=headers)
return captured[0]
def test_no_header_is_not_htmx(self) -> None:
from musehub.api.routes.musehub.htmx_helpers import is_htmx
req = self._req({})
assert is_htmx(req) is False
def test_hx_request_true_is_htmx(self) -> None:
from musehub.api.routes.musehub.htmx_helpers import is_htmx
req = self._req({"HX-Request": "true"})
assert is_htmx(req) is True
def test_hx_request_false_is_not_htmx(self) -> None:
from musehub.api.routes.musehub.htmx_helpers import is_htmx
req = self._req({"HX-Request": "false"})
assert is_htmx(req) is False
def test_hx_boosted_false_when_absent(self) -> None:
from musehub.api.routes.musehub.htmx_helpers import is_htmx_boosted
req = self._req({})
assert is_htmx_boosted(req) is False
def test_hx_boosted_true_when_present(self) -> None:
from musehub.api.routes.musehub.htmx_helpers import is_htmx_boosted
req = self._req({"HX-Boosted": "true"})
assert is_htmx_boosted(req) is True
class TestHtmxTrigger:
"""Unit tests for htmx_trigger helper."""
def test_sets_hx_trigger_header(self) -> None:
from starlette.responses import Response
from musehub.api.routes.musehub.htmx_helpers import htmx_trigger
import json
r = Response()
htmx_trigger(r, "toast", {"message": "saved"})
payload = json.loads(r.headers["HX-Trigger"])
assert payload == {"toast": {"message": "saved"}}
def test_sets_hx_trigger_header_no_detail(self) -> None:
from starlette.responses import Response
from musehub.api.routes.musehub.htmx_helpers import htmx_trigger
import json
r = Response()
htmx_trigger(r, "refresh")
payload = json.loads(r.headers["HX-Trigger"])
assert payload == {"refresh": True}
def test_multiple_events_possible(self) -> None:
"""Calling twice overwrites but last write wins — not a bug, just a spec."""
from starlette.responses import Response
from musehub.api.routes.musehub.htmx_helpers import htmx_trigger
r = Response()
htmx_trigger(r, "reload")
htmx_trigger(r, "scroll")
# Only one value in header (last write wins)
assert "HX-Trigger" in r.headers
class TestHtmxRedirect:
"""Unit tests for htmx_redirect helper."""
def test_returns_200(self) -> None:
from musehub.api.routes.musehub.htmx_helpers import htmx_redirect
r = htmx_redirect("/some/url")
assert r.status_code == 200
def test_sets_hx_redirect_header(self) -> None:
from musehub.api.routes.musehub.htmx_helpers import htmx_redirect
r = htmx_redirect("/dashboard")
assert r.headers["HX-Redirect"] == "/dashboard"
def test_url_preserved_exactly(self) -> None:
from musehub.api.routes.musehub.htmx_helpers import htmx_redirect
url = "/gabriel/my-repo?welcome=1"
r = htmx_redirect(url)
assert r.headers["HX-Redirect"] == url
class TestInferSymKind:
"""Unit tests for _infer_sym_kind."""
def _infer(self, addr: str) -> str:
from musehub.api.routes.musehub.ui_symbols import _infer_sym_kind
return _infer_sym_kind(addr)
def test_camel_case_is_class(self) -> None:
assert self._infer("file.py::MyClass") == "class"
def test_snake_case_is_function(self) -> None:
assert self._infer("file.py::my_function") == "function"
def test_all_caps_is_variable(self) -> None:
assert self._infer("file.py::MAX_RETRIES") == "variable"
def test_no_namespace_camel(self) -> None:
assert self._infer("CamelCase") == "file"
def test_no_namespace_snake(self) -> None:
assert self._infer("my_func") == "file"
def test_private_fn_underscore_prefix(self) -> None:
assert self._infer("_private_func") == "file"
def test_dunder_is_file_without_namespace(self) -> None:
assert self._infer("__init__") == "file"
def test_empty_addr_is_file(self) -> None:
assert self._infer("") == "file"
def test_private_class_without_namespace(self) -> None:
assert self._infer("_PrivateClass") == "file"
class TestLicensesForViewerType:
"""Unit tests for licenses_for_viewer_type."""
def test_symbol_graph_returns_code_licenses(self) -> None:
from musehub.api.routes.musehub.ui_new_repo import licenses_for_viewer_type
result = licenses_for_viewer_type("symbol_graph")
assert isinstance(result, list)
assert len(result) > 0
assert all(isinstance(item, tuple) and len(item) == 2 for item in result)
def test_default_returns_generic_licenses(self) -> None:
from musehub.api.routes.musehub.ui_new_repo import licenses_for_viewer_type
default = licenses_for_viewer_type("audio")
code = licenses_for_viewer_type("symbol_graph")
# They may overlap but should be distinct lists
assert isinstance(default, list)
def test_unknown_type_returns_list(self) -> None:
from musehub.api.routes.musehub.ui_new_repo import licenses_for_viewer_type
result = licenses_for_viewer_type("totally_unknown")
assert isinstance(result, list)
# ─────────────────────────────────────────────────────────────────────────────
# LAYER 2 — INTEGRATION
# ─────────────────────────────────────────────────────────────────────────────
class TestRepoPageIntegration:
"""Integration: repo home page with real DB (no HTTP)."""
async def test_repo_page_200_with_empty_repo(
self, client: AsyncClient, db_session: AsyncSession
) -> None:
repo = await _seed_repo(db_session, owner="int-owner", slug="int-repo")
resp = await client.get(f"/int-owner/int-repo")
assert resp.status_code == 200
async def test_repo_page_404_for_unknown_slug(
self, client: AsyncClient, db_session: AsyncSession
) -> None:
resp = await client.get("/nobody/totally-unknown-repo-xyz")
assert resp.status_code == 404
async def test_repo_page_shows_owner_in_html(
self, client: AsyncClient, db_session: AsyncSession
) -> None:
await _seed_repo(db_session, owner="int-owner2", slug="int-repo2")
resp = await client.get("/int-owner2/int-repo2")
assert resp.status_code == 200
assert "int-owner2" in resp.text
class TestProposalListIntegration:
"""Integration: proposals list page with real DB."""
async def test_proposal_list_200_empty(
self, client: AsyncClient, db_session: AsyncSession
) -> None:
await _seed_repo(db_session, owner="prop-owner", slug="prop-repo")
resp = await client.get("/prop-owner/prop-repo/proposals")
assert resp.status_code == 200
async def test_proposal_list_shows_pr_title(
self, client: AsyncClient, db_session: AsyncSession
) -> None:
repo = await _seed_repo(db_session, owner="prop-owner2", slug="prop-repo2")
await _seed_proposal(db_session, str(repo.repo_id), title="Reverb on chorus")
resp = await client.get("/prop-owner2/prop-repo2/proposals")
assert resp.status_code == 200
assert "Reverb on chorus" in resp.text
async def test_proposal_list_404_for_unknown_repo(
self, client: AsyncClient, db_session: AsyncSession
) -> None:
resp = await client.get("/nobody/unknown-proposals-repo/proposals")
assert resp.status_code == 404
class TestAgentsPageIntegration:
"""Integration: agents swarm / coord pages with real DB."""
async def test_agents_swarm_page_200(
self, client: AsyncClient, db_session: AsyncSession
) -> None:
await _seed_repo(db_session, owner="agent-owner", slug="agent-repo")
resp = await client.get("/agent-owner/agent-repo/agents/swarm")
assert resp.status_code in (200, 404) # 404 if page not registered at /agents/swarm
async def test_agents_coord_page_200(
self, client: AsyncClient, db_session: AsyncSession
) -> None:
await _seed_repo(db_session, owner="agent-owner2", slug="agent-repo2")
resp = await client.get("/agent-owner2/agent-repo2/agents/coord")
assert resp.status_code in (200, 404)
# ─────────────────────────────────────────────────────────────────────────────
# LAYER 3 — E2E
# ─────────────────────────────────────────────────────────────────────────────
class TestRepoHomeE2E:
"""E2E: repo home page (/{owner}/{slug}) — no existing test file."""
async def test_get_repo_home_returns_200(
self, client: AsyncClient, db_session: AsyncSession
) -> None:
await _seed_repo(db_session, owner="e2e-home", slug="home-repo")
resp = await client.get("/e2e-home/home-repo")
assert resp.status_code == 200
async def test_repo_home_returns_html(
self, client: AsyncClient, db_session: AsyncSession
) -> None:
await _seed_repo(db_session, owner="e2e-html", slug="html-repo")
resp = await client.get("/e2e-html/html-repo")
assert "text/html" in resp.headers.get("content-type", "")
async def test_repo_home_no_auth_required(
self, client: AsyncClient, db_session: AsyncSession
) -> None:
"""Public repos should be accessible without authentication."""
await _seed_repo(db_session, owner="e2e-pub", slug="pub-repo", visibility="public")
resp = await client.get("/e2e-pub/pub-repo")
assert resp.status_code != 401
async def test_repo_home_json_format_returns_json(
self, client: AsyncClient, db_session: AsyncSession
) -> None:
"""?format=json should return JSON instead of HTML."""
await _seed_repo(db_session, owner="e2e-json", slug="json-repo")
resp = await client.get("/e2e-json/json-repo", params={"format": "json"})
assert resp.status_code == 200
assert "application/json" in resp.headers.get("content-type", "")
async def test_repo_home_accept_json_returns_json(
self, client: AsyncClient, db_session: AsyncSession
) -> None:
"""Accept: application/json triggers the JSON shortcut."""
await _seed_repo(db_session, owner="e2e-acc", slug="acc-repo")
resp = await client.get(
"/e2e-acc/acc-repo",
headers={"Accept": "application/json"},
)
assert resp.status_code == 200
data = resp.json()
assert "slug" in data or "repoId" in data
async def test_repo_home_htmx_returns_fragment(
self, client: AsyncClient, db_session: AsyncSession
) -> None:
"""HX-Request returns file_tree fragment, not full page."""
await _seed_repo(db_session, owner="e2e-htmx", slug="htmx-repo")
resp = await client.get(
"/e2e-htmx/htmx-repo",
headers={"HX-Request": "true"},
)
# Fragment should NOT contain the full wrapper
assert resp.status_code == 200
body = resp.text
assert " None:
resp = await client.get("/nobody/repo-that-does-not-exist-xyz-abc")
assert resp.status_code == 404
class TestSymbolsPageE2E:
"""E2E: symbols page (/{owner}/{slug}/symbols) — no existing test file."""
async def test_symbols_list_page_returns_200(
self, client: AsyncClient, db_session: AsyncSession
) -> None:
await _seed_repo(db_session, owner="sym-owner", slug="sym-repo")
resp = await client.get("/sym-owner/sym-repo/symbols")
assert resp.status_code == 200
async def test_symbols_list_no_auth_required(
self, client: AsyncClient, db_session: AsyncSession
) -> None:
await _seed_repo(db_session, owner="sym-noauth", slug="sym-noauth-repo")
resp = await client.get("/sym-noauth/sym-noauth-repo/symbols")
assert resp.status_code != 401
async def test_symbols_list_unknown_repo_404(
self, client: AsyncClient, db_session: AsyncSession
) -> None:
resp = await client.get("/nobody/unknown-sym-repo-xyz/symbols")
assert resp.status_code == 404
async def test_symbol_detail_unknown_repo_404(
self, client: AsyncClient, db_session: AsyncSession
) -> None:
resp = await client.get("/nobody/unknown-sym-repo-xyz/symbol/file.py::MyFn")
assert resp.status_code == 404
class TestProposalsPageE2E:
"""E2E: proposals pages — supplementing the minimal ssr file."""
async def test_proposal_list_returns_200(
self, client: AsyncClient, db_session: AsyncSession
) -> None:
await _seed_repo(db_session, owner="proposal-e2e", slug="proposal-e2e-repo")
resp = await client.get("/proposal-e2e/proposal-e2e-repo/proposals")
assert resp.status_code == 200
async def test_proposal_list_html(
self, client: AsyncClient, db_session: AsyncSession
) -> None:
await _seed_repo(db_session, owner="proposal-e2e2", slug="proposal-e2e-repo2")
resp = await client.get("/proposal-e2e2/proposal-e2e-repo2/proposals")
assert "text/html" in resp.headers.get("content-type", "")
async def test_proposal_detail_404_for_unknown_pr_id(
self, client: AsyncClient, db_session: AsyncSession
) -> None:
await _seed_repo(db_session, owner="proposal-e2e3", slug="proposal-e2e-repo3")
fake_id = secrets.token_hex(16)
resp = await client.get(f"/proposal-e2e3/proposal-e2e-repo3/proposals/{fake_id}")
assert resp.status_code == 404
async def test_proposal_detail_renders_pr_title(
self, client: AsyncClient, db_session: AsyncSession
) -> None:
repo = await _seed_repo(db_session, owner="proposal-detail", slug="proposal-detail-repo")
proposal = await _seed_proposal(
db_session, str(repo.repo_id), title="Unique proposal XYZ"
)
# URL uses proposal_id, not proposal_number
resp = await client.get(
f"/proposal-detail/proposal-detail-repo/proposals/{proposal.proposal_id}"
)
assert resp.status_code == 200
assert "Unique proposal XYZ" in resp.text
class TestIntelPageE2E:
"""E2E: intel pages — no existing test file."""
async def test_intel_page_returns_200_or_empty(
self, client: AsyncClient, db_session: AsyncSession
) -> None:
await _seed_repo(db_session, owner="intel-owner", slug="intel-repo")
resp = await client.get("/intel-owner/intel-repo/intel")
assert resp.status_code in (200, 404)
async def test_intel_dead_page_no_500(
self, client: AsyncClient, db_session: AsyncSession
) -> None:
await _seed_repo(db_session, owner="intel-dead", slug="intel-dead-repo")
resp = await client.get("/intel-dead/intel-dead-repo/intel/dead")
assert resp.status_code != 500
class TestBlobAndRawE2E:
"""E2E: blob and raw file pages — no existing test file."""
async def test_blob_page_404_for_unknown_repo(
self, client: AsyncClient, db_session: AsyncSession
) -> None:
resp = await client.get("/nobody/unknown-blob-repo/blob/HEAD/README.md")
assert resp.status_code == 404
async def test_raw_file_404_for_unknown_repo(
self, client: AsyncClient, db_session: AsyncSession
) -> None:
resp = await client.get("/nobody/unknown-raw-repo/raw/HEAD/README.md")
assert resp.status_code == 404
async def test_blob_page_404_for_unknown_file(
self, client: AsyncClient, db_session: AsyncSession
) -> None:
await _seed_repo(db_session, owner="blob-owner", slug="blob-repo")
resp = await client.get("/blob-owner/blob-repo/blob/HEAD/nonexistent.md")
assert resp.status_code in (200, 404) # empty repo may return 404 or empty page
class TestHTMXFragmentRoutingE2E:
"""E2E: HTMX fragment routing for pages that support it."""
async def test_issue_list_htmx_returns_fragment(
self, client: AsyncClient, db_session: AsyncSession
) -> None:
repo = await _seed_repo(db_session, owner="htmx-frag", slug="htmx-frag-repo")
await _seed_issue(db_session, str(repo.repo_id), title="HTMX test issue")
resp = await client.get(
"/htmx-frag/htmx-frag-repo/issues",
headers={"HX-Request": "true"},
)
assert resp.status_code == 200
assert " None:
await _seed_repo(db_session, owner="htmx-boost", slug="htmx-boost-repo")
resp = await client.get(
"/htmx-boost/htmx-boost-repo/issues",
headers={"HX-Request": "true", "HX-Boosted": "true"},
)
# Boosted requests must get the full page
assert resp.status_code == 200
async def test_search_page_returns_200(
self, client: AsyncClient, db_session: AsyncSession
) -> None:
resp = await client.get("/search", params={"q": "test"})
assert resp.status_code == 200
async def test_explore_page_returns_200(
self, client: AsyncClient, db_session: AsyncSession
) -> None:
resp = await client.get("/explore")
assert resp.status_code == 200
# ─────────────────────────────────────────────────────────────────────────────
# LAYER 4 — STRESS
# ─────────────────────────────────────────────────────────────────────────────
class TestUISSRStress:
"""Stress: sequential page loads and empty-state robustness."""
async def test_multiple_repo_pages_sequential(
self, client: AsyncClient, db_session: AsyncSession
) -> None:
"""Load 5 different repos' pages in sequence — none should 500."""
for i in range(5):
owner = f"stress-owner-{i}"
slug = f"stress-repo-{i}"
await _seed_repo(db_session, owner=owner, slug=slug)
resp = await client.get(f"/{owner}/{slug}")
assert resp.status_code == 200, f"Repo {i} returned {resp.status_code}"
async def test_issue_list_with_many_issues(
self, client: AsyncClient, db_session: AsyncSession
) -> None:
"""Issue list renders correctly with 20 seeded issues."""
repo = await _seed_repo(db_session, owner="stress-issues", slug="stress-iss-repo")
for i in range(20):
await _seed_issue(
db_session, str(repo.repo_id), number=i + 1, title=f"Issue {i}"
)
resp = await client.get("/stress-issues/stress-iss-repo/issues")
assert resp.status_code == 200
async def test_empty_state_repo_home(
self, client: AsyncClient, db_session: AsyncSession
) -> None:
"""Repo with no commits/files renders without 500."""
await _seed_repo(db_session, owner="empty-owner", slug="empty-repo")
resp = await client.get("/empty-owner/empty-repo")
assert resp.status_code == 200
async def test_empty_state_proposals_page(
self, client: AsyncClient, db_session: AsyncSession
) -> None:
"""Proposals page with no proposals renders without 500."""
await _seed_repo(db_session, owner="empty-proposal", slug="empty-proposal-repo")
resp = await client.get("/empty-proposal/empty-proposal-repo/proposals")
assert resp.status_code == 200
async def test_empty_state_symbols_page(
self, client: AsyncClient, db_session: AsyncSession
) -> None:
"""Symbols page with no symbol index renders without 500."""
await _seed_repo(db_session, owner="empty-sym", slug="empty-sym-repo")
resp = await client.get("/empty-sym/empty-sym-repo/symbols")
assert resp.status_code == 200
async def test_topics_page_returns_200(
self, client: AsyncClient, db_session: AsyncSession
) -> None:
resp = await client.get("/topics")
assert resp.status_code == 200
async def test_search_page_empty_query_200(
self, client: AsyncClient, db_session: AsyncSession
) -> None:
resp = await client.get("/search")
assert resp.status_code == 200
async def test_concurrent_issue_list_requests(
self, client: AsyncClient, db_session: AsyncSession
) -> None:
"""3 sequential requests to the same issues page succeed."""
repo = await _seed_repo(db_session, owner="conc-owner", slug="conc-repo")
await _seed_issue(db_session, str(repo.repo_id), title="Concurrent test")
for _ in range(3):
resp = await client.get("/conc-owner/conc-repo/issues")
assert resp.status_code == 200
# ─────────────────────────────────────────────────────────────────────────────
# LAYER 5 — DATA INTEGRITY
# ─────────────────────────────────────────────────────────────────────────────
class TestUISSRDataIntegrity:
"""Data integrity: nav counts correct, filter accuracy, XSS escaping."""
async def test_issue_open_count_matches_db(
self, client: AsyncClient, db_session: AsyncSession
) -> None:
"""Nav tab shows the correct open issue count seeded in DB."""
repo = await _seed_repo(db_session, owner="count-owner", slug="count-repo")
for i in range(3):
await _seed_issue(
db_session, str(repo.repo_id), number=i + 1, state="open"
)
resp = await client.get("/count-owner/count-repo/issues")
assert resp.status_code == 200
# The number 3 should appear in the open tab count
assert "3" in resp.text
async def test_closed_issues_not_shown_on_open_tab(
self, client: AsyncClient, db_session: AsyncSession
) -> None:
repo = await _seed_repo(db_session, owner="filter-owner", slug="filter-repo")
await _seed_issue(
db_session, str(repo.repo_id), number=1, title="Open issue", state="open"
)
await _seed_issue(
db_session,
str(repo.repo_id),
number=2,
title="Closed issue xyz",
state="closed",
)
resp = await client.get("/filter-owner/filter-repo/issues", params={"state": "open"})
assert resp.status_code == 200
assert "Open issue" in resp.text
assert "Closed issue xyz" not in resp.text
async def test_open_issues_not_shown_on_closed_tab(
self, client: AsyncClient, db_session: AsyncSession
) -> None:
repo = await _seed_repo(db_session, owner="filter-owner2", slug="filter-repo2")
await _seed_issue(
db_session, str(repo.repo_id), number=1, title="Open issue abc", state="open"
)
await _seed_issue(
db_session,
str(repo.repo_id),
number=2,
title="Closed issue only",
state="closed",
)
resp = await client.get(
"/filter-owner2/filter-repo2/issues", params={"state": "closed"}
)
assert resp.status_code == 200
assert "Closed issue only" in resp.text
assert "Open issue abc" not in resp.text
async def test_proposal_title_rendered_in_list(
self, client: AsyncClient, db_session: AsyncSession
) -> None:
repo = await _seed_repo(db_session, owner="proposal-render", slug="proposal-render-repo")
await _seed_proposal(
db_session, str(repo.repo_id), title="Piano roll refactor"
)
resp = await client.get("/proposal-render/proposal-render-repo/proposals")
assert "Piano roll refactor" in resp.text
async def test_xss_title_escaped_in_issue_list(
self, client: AsyncClient, db_session: AsyncSession
) -> None:
"""Issue titles with HTML special chars must be escaped."""
repo = await _seed_repo(db_session, owner="xss-owner", slug="xss-repo")
xss_title = ''
await _seed_issue(db_session, str(repo.repo_id), title=xss_title)
resp = await client.get("/xss-owner/xss-repo/issues")
assert resp.status_code == 200
# The raw script tag must not appear unescaped
assert "'})
assert "" not in resp.text
async def test_no_debug_info_in_production_errors(
self, client: AsyncClient, db_session: AsyncSession
) -> None:
"""404 responses must not contain Python module paths."""
resp = await client.get("/nobody/sec-unknown-repo-xyz")
assert "/Users/" not in resp.text
assert "musehub/" not in resp.text or resp.status_code != 500
async def test_sql_injection_in_owner_does_not_500(
self, client: AsyncClient, db_session: AsyncSession
) -> None:
"""URL path parameters passed as SQL injection should result in 404, not 500."""
resp = await client.get("/'; DROP TABLE musehub_repos; --/repo")
assert resp.status_code != 500
# ─────────────────────────────────────────────────────────────────────────────
# LAYER 7 — PERFORMANCE
# ─────────────────────────────────────────────────────────────────────────────
class TestUISSRPerformance:
"""Performance: page responses under time budgets."""
async def test_repo_home_under_500ms(
self, client: AsyncClient, db_session: AsyncSession
) -> None:
await _seed_repo(db_session, owner="perf-owner", slug="perf-repo")
# Warm-up
await client.get("/perf-owner/perf-repo")
start = time.perf_counter()
resp = await client.get("/perf-owner/perf-repo")
elapsed = time.perf_counter() - start
assert resp.status_code == 200
assert elapsed < 0.500, f"Repo home took {elapsed*1000:.1f}ms (limit 500ms)"
async def test_issue_list_under_500ms(
self, client: AsyncClient, db_session: AsyncSession
) -> None:
repo = await _seed_repo(db_session, owner="perf-iss", slug="perf-iss-repo")
for i in range(5):
await _seed_issue(
db_session, str(repo.repo_id), number=i + 1, title=f"Issue {i}"
)
await client.get("/perf-iss/perf-iss-repo/issues")
start = time.perf_counter()
resp = await client.get("/perf-iss/perf-iss-repo/issues")
elapsed = time.perf_counter() - start
assert resp.status_code == 200
assert elapsed < 0.500, f"Issue list took {elapsed*1000:.1f}ms (limit 500ms)"
async def test_proposals_page_under_500ms(
self, client: AsyncClient, db_session: AsyncSession
) -> None:
repo = await _seed_repo(db_session, owner="perf-proposal", slug="perf-proposal-repo")
await _seed_proposal(db_session, str(repo.repo_id), title="Perf test proposal")
await client.get("/perf-proposal/perf-proposal-repo/proposals")
start = time.perf_counter()
resp = await client.get("/perf-proposal/perf-proposal-repo/proposals")
elapsed = time.perf_counter() - start
assert resp.status_code == 200
assert elapsed < 0.500, f"Proposals page took {elapsed*1000:.1f}ms (limit 500ms)"
async def test_explore_page_under_500ms(
self, client: AsyncClient, db_session: AsyncSession
) -> None:
await client.get("/explore") # warm-up
start = time.perf_counter()
resp = await client.get("/explore")
elapsed = time.perf_counter() - start
assert resp.status_code == 200
assert elapsed < 0.500, f"Explore page took {elapsed*1000:.1f}ms (limit 500ms)"
async def test_search_page_under_500ms(
self, client: AsyncClient, db_session: AsyncSession
) -> None:
await client.get("/search")
start = time.perf_counter()
resp = await client.get("/search", params={"q": "test"})
elapsed = time.perf_counter() - start
assert resp.status_code == 200
assert elapsed < 0.500, f"Search page took {elapsed*1000:.1f}ms (limit 500ms)"
def test_to_camel_via_htmx_helpers_import_fast(self) -> None:
"""Importing htmx_helpers is fast (module already loaded)."""
start = time.perf_counter()
from musehub.api.routes.musehub.htmx_helpers import is_htmx, htmx_trigger # noqa: F401
elapsed = time.perf_counter() - start
assert elapsed < 0.050, f"Import took {elapsed*1000:.1f}ms (limit 50ms)"