"""Section 39 — Cursor Pagination: 7-layer test suite. All list endpoints share a single, uniform cursor-based pagination contract: - Query params: ?cursor=&limit=N - Response body: next_cursor: str | null, total: int - Response header: Link: ; rel="next" when more pages exist Coverage layers: 1. Unit — PaginationParams bounds (limit 1–200, cursor optional), build_cursor_link_header URL encoding, param override 2. Integration — empty dataset → no cursor, filter+pagination combined, sort stability, label filter scoped to paginated results 3. E2E — full HTTP round-trips: total field consistent, Link header present/absent correctly, cursor echoed into Link URL 4. Stress — 100-item dataset full sequential cursor walk, verify no item skipped or duplicated, last page detected 5. Data Integrity — total stable across all pages, all items recovered by walking cursor chain, no duplicates 6. Security — limit=0 rejected (422), limit=201 rejected (422), cursor with special chars URL-safe (no 422) 7. Performance — HTTP issue list with 50 items under 500ms """ from __future__ import annotations import time import pytest from httpx import AsyncClient from sqlalchemy.ext.asyncio import AsyncSession from starlette.requests import Request as StarletteRequest from musehub.types.json_types import StrDict type _IssueJson = dict[str, str | int] from musehub.api.routes.musehub.pagination import ( PaginationParams, build_cursor_link_header, ) from musehub.services import musehub_issues as issues_svc # --------------------------------------------------------------------------- # Shared helpers # --------------------------------------------------------------------------- def _make_request(url: str) -> StarletteRequest: scope = { "type": "http", "method": "GET", "path": url.split("?")[0], "query_string": url.split("?")[1].encode() if "?" in url else b"", "headers": [], } return StarletteRequest(scope) async def _create_repo( client: AsyncClient, auth_headers: StrDict, name: str = "repo" ) -> str: r = await client.post( "/api/repos", json={"name": name, "owner": "testuser"}, headers=auth_headers, ) assert r.status_code == 201 return r.json()["repoId"] async def _create_issue( client: AsyncClient, auth_headers: StrDict, repo_id: str, title: str = "Issue", labels: list[str] | None = None, ) -> _IssueJson: r = await client.post( f"/api/repos/{repo_id}/issues", json={"title": title, "body": "", "labels": labels or []}, headers=auth_headers, ) assert r.status_code == 201 return r.json() async def _walk_all_issues( client: AsyncClient, auth_headers: StrDict, repo_id: str, limit: int = 10, state: str = "open", ) -> list[dict]: """Walk all pages via next_cursor and collect every issue in order.""" all_items: list[dict] = [] cursor: str | None = None while True: params = f"limit={limit}&state={state}" if cursor: params += f"&cursor={cursor}" r = await client.get( f"/api/repos/{repo_id}/issues?{params}", headers=auth_headers, ) assert r.status_code == 200 body = r.json() all_items.extend(body["issues"]) cursor = body["nextCursor"] if cursor is None: break return all_items # --------------------------------------------------------------------------- # Layer 1 — Unit tests # --------------------------------------------------------------------------- def test_pagination_params_defaults() -> None: """PaginationParams stores cursor=None and limit=20 when passed explicitly.""" p = PaginationParams(cursor=None, limit=20) assert p.cursor is None assert p.limit == 20 def test_pagination_params_min_limit() -> None: """PaginationParams accepts limit=1 (the minimum).""" p = PaginationParams(cursor=None, limit=1) assert p.limit == 1 def test_pagination_params_max_limit() -> None: """PaginationParams accepts limit=200 (the maximum).""" p = PaginationParams(cursor=None, limit=200) assert p.limit == 200 def test_build_cursor_link_header_contains_rel_next() -> None: """build_cursor_link_header always emits rel='next'.""" req = _make_request("http://test/api/repos/r1/issues") header = build_cursor_link_header(req, next_cursor="tok", limit=20) assert 'rel="next"' in header def test_build_cursor_link_header_encodes_special_chars() -> None: """Cursor with special chars is URL-encoded in the Link header.""" req = _make_request("http://test/api/repos/r1/issues") header = build_cursor_link_header(req, next_cursor="a=b&c=d", limit=20) # The cursor value must appear encoded in the link assert "rel=\"next\"" in header # Raw & would break URL parsing; encoded form is acceptable assert "a=b&c=d" not in header.split(";")[0] def test_build_cursor_link_header_overrides_existing_cursor() -> None: """build_cursor_link_header replaces any existing cursor param.""" req = _make_request("http://test/api/repos/r1/issues?cursor=old&limit=5") header = build_cursor_link_header(req, next_cursor="new", limit=5) link_url = header.split(";")[0].strip("<>") assert "cursor=new" in link_url assert "cursor=old" not in link_url def test_build_cursor_link_header_preserves_filter_params() -> None: """build_cursor_link_header carries forward non-pagination query params.""" req = _make_request("http://test/api/repos/r1/issues?state=open&label=bug") header = build_cursor_link_header(req, next_cursor="abc", limit=10) assert "state=open" in header assert "label=bug" in header # --------------------------------------------------------------------------- # Layer 2 — Integration tests # --------------------------------------------------------------------------- async def test_empty_repo_returns_empty_list_not_404( client: AsyncClient, auth_headers: StrDict, ) -> None: """GET /issues on an empty repo returns 200 with empty list, not 404.""" repo_id = await _create_repo(client, auth_headers, "empty-l2") r = await client.get(f"/api/repos/{repo_id}/issues", headers=auth_headers) assert r.status_code == 200 body = r.json() assert body["issues"] == [] assert body["total"] == 0 assert body["nextCursor"] is None async def test_label_filter_combined_with_pagination( client: AsyncClient, auth_headers: StrDict, ) -> None: """Label filter is applied before pagination; total reflects filtered count.""" repo_id = await _create_repo(client, auth_headers, "label-filter-l2") for i in range(6): label = "bug" if i % 2 == 0 else "enhancement" await _create_issue(client, auth_headers, repo_id, title=f"Issue {i + 1}", labels=[label]) r = await client.get( f"/api/repos/{repo_id}/issues?label=bug&limit=2", headers=auth_headers, ) assert r.status_code == 200 body = r.json() assert body["total"] == 3 # 3 bug issues assert len(body["issues"]) == 2 for issue in body["issues"]: assert "bug" in issue["labels"] async def test_sort_stability_ascending_by_number( client: AsyncClient, auth_headers: StrDict, ) -> None: """Issues are returned in ascending number order across pages.""" repo_id = await _create_repo(client, auth_headers, "sort-stable-l2") for i in range(5): await _create_issue(client, auth_headers, repo_id, title=f"Issue {i + 1}") all_issues = await _walk_all_issues(client, auth_headers, repo_id, limit=2) numbers = [issue["number"] for issue in all_issues] assert numbers == sorted(numbers) async def test_state_filter_combined_with_pagination( client: AsyncClient, auth_headers: StrDict, ) -> None: """?state=closed filters to closed issues only and total reflects that.""" repo_id = await _create_repo(client, auth_headers, "state-filter-l2") issues = [] for i in range(4): issue = await _create_issue(client, auth_headers, repo_id, title=f"Issue {i + 1}") issues.append(issue) # Close the first two for issue in issues[:2]: await client.post( f"/api/repos/{repo_id}/issues/{issue['number']}/close", headers=auth_headers, ) r = await client.get( f"/api/repos/{repo_id}/issues?state=closed&limit=10", headers=auth_headers, ) assert r.status_code == 200 body = r.json() assert body["total"] == 2 assert all(i["state"] == "closed" for i in body["issues"]) # --------------------------------------------------------------------------- # Layer 3 — E2E tests (full HTTP round-trips) # --------------------------------------------------------------------------- async def test_e2e_total_consistent_across_pages( client: AsyncClient, auth_headers: StrDict, ) -> None: """total field is identical across all pages of the same request.""" repo_id = await _create_repo(client, auth_headers, "e2e-total") for i in range(7): await _create_issue(client, auth_headers, repo_id, title=f"Issue {i + 1}") totals: list[int] = [] cursor: str | None = None while True: params = "limit=3" if cursor: params += f"&cursor={cursor}" r = await client.get(f"/api/repos/{repo_id}/issues?{params}", headers=auth_headers) body = r.json() totals.append(body["total"]) cursor = body["nextCursor"] if cursor is None: break assert all(t == 7 for t in totals) async def test_e2e_link_header_present_only_when_more_pages( client: AsyncClient, auth_headers: StrDict, ) -> None: """Link header is present on non-last pages and absent on the last page.""" repo_id = await _create_repo(client, auth_headers, "e2e-link") for i in range(4): await _create_issue(client, auth_headers, repo_id, title=f"Issue {i + 1}") # First page — more exist → Link present r1 = await client.get( f"/api/repos/{repo_id}/issues?limit=3", headers=auth_headers, ) assert r1.json()["nextCursor"] is not None assert "Link" in r1.headers # Second page — last page → Link absent cursor = r1.json()["nextCursor"] r2 = await client.get( f"/api/repos/{repo_id}/issues?limit=3&cursor={cursor}", headers=auth_headers, ) assert r2.json()["nextCursor"] is None assert "Link" not in r2.headers async def test_e2e_cursor_in_link_header_matches_body( client: AsyncClient, auth_headers: StrDict, ) -> None: """The cursor in the Link header matches next_cursor in the body.""" repo_id = await _create_repo(client, auth_headers, "e2e-cursor-match") for i in range(5): await _create_issue(client, auth_headers, repo_id, title=f"Issue {i + 1}") r = await client.get( f"/api/repos/{repo_id}/issues?limit=3", headers=auth_headers, ) next_cursor = r.json()["nextCursor"] link_header = r.headers.get("Link", "") assert f"cursor={next_cursor}" in link_header # --------------------------------------------------------------------------- # Layer 4 — Stress tests # --------------------------------------------------------------------------- async def test_stress_100_item_full_cursor_walk( client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession, ) -> None: """Walking 100 issues with limit=10 requires exactly 10 pages. Issues are created via the service layer to bypass the HTTP rate limiter. """ repo_id = await _create_repo(client, auth_headers, "stress-100") for i in range(100): await issues_svc.create_issue( db_session, repo_id=repo_id, title=f"Issue {i + 1}", body="", labels=[], author="testuser", ) await db_session.commit() all_issues = await _walk_all_issues(client, auth_headers, repo_id, limit=10) assert len(all_issues) == 100 async def test_stress_last_page_has_correct_remainder( client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession, ) -> None: """The last page carries exactly total % limit items when not evenly divisible. Issues are created via the service layer to bypass the HTTP rate limiter. """ repo_id = await _create_repo(client, auth_headers, "stress-remainder") for i in range(23): await issues_svc.create_issue( db_session, repo_id=repo_id, title=f"Issue {i + 1}", body="", labels=[], author="testuser", ) await db_session.commit() all_issues = await _walk_all_issues(client, auth_headers, repo_id, limit=10) assert len(all_issues) == 23 # --------------------------------------------------------------------------- # Layer 5 — Data Integrity # --------------------------------------------------------------------------- async def test_integrity_no_items_skipped_across_pages( client: AsyncClient, auth_headers: StrDict, ) -> None: """Walking all pages recovers every issue with no gaps.""" repo_id = await _create_repo(client, auth_headers, "integrity-no-skip") for i in range(15): await _create_issue(client, auth_headers, repo_id, title=f"Issue {i + 1}") all_issues = await _walk_all_issues(client, auth_headers, repo_id, limit=4) numbers = sorted(issue["number"] for issue in all_issues) assert numbers == list(range(1, 16)) async def test_integrity_no_duplicates_across_pages( client: AsyncClient, auth_headers: StrDict, ) -> None: """No issue appears on more than one page.""" repo_id = await _create_repo(client, auth_headers, "integrity-no-dupe") for i in range(12): await _create_issue(client, auth_headers, repo_id, title=f"Issue {i + 1}") all_issues = await _walk_all_issues(client, auth_headers, repo_id, limit=5) numbers = [issue["number"] for issue in all_issues] assert len(numbers) == len(set(numbers)), "Duplicate issues found across pages" async def test_integrity_filter_preserved_across_pages( client: AsyncClient, auth_headers: StrDict, ) -> None: """Label filter is applied consistently on every page of a paginated walk.""" repo_id = await _create_repo(client, auth_headers, "integrity-filter") for i in range(10): label = "bug" if i < 6 else "other" await _create_issue(client, auth_headers, repo_id, title=f"Issue {i + 1}", labels=[label]) cursor: str | None = None all_issues: list[dict] = [] while True: params = "limit=3&label=bug" if cursor: params += f"&cursor={cursor}" r = await client.get(f"/api/repos/{repo_id}/issues?{params}", headers=auth_headers) body = r.json() all_issues.extend(body["issues"]) cursor = body["nextCursor"] if cursor is None: break assert len(all_issues) == 6 assert all("bug" in issue["labels"] for issue in all_issues) # --------------------------------------------------------------------------- # Layer 6 — Security # --------------------------------------------------------------------------- async def test_security_limit_zero_rejected( client: AsyncClient, auth_headers: StrDict, ) -> None: """limit=0 is rejected with 422 Unprocessable Entity.""" repo_id = await _create_repo(client, auth_headers, "sec-limit-zero") r = await client.get(f"/api/repos/{repo_id}/issues?limit=0", headers=auth_headers) assert r.status_code == 422 async def test_security_limit_over_max_rejected( client: AsyncClient, auth_headers: StrDict, ) -> None: """limit=201 is rejected with 422 Unprocessable Entity.""" repo_id = await _create_repo(client, auth_headers, "sec-limit-over") r = await client.get(f"/api/repos/{repo_id}/issues?limit=201", headers=auth_headers) assert r.status_code == 422 async def test_security_cursor_with_special_chars_accepted( client: AsyncClient, auth_headers: StrDict, ) -> None: """A cursor with URL-safe base64 chars is accepted without a 422.""" repo_id = await _create_repo(client, auth_headers, "sec-cursor-chars") # A cursor that won't match any real cursor — returns empty page, not 422. r = await client.get( f"/api/repos/{repo_id}/issues?cursor=9999999&limit=10", headers=auth_headers, ) assert r.status_code == 200 # --------------------------------------------------------------------------- # Layer 7 — Performance # --------------------------------------------------------------------------- async def test_performance_issue_list_50_items_under_500ms( client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession, ) -> None: """GET /issues with 50 items in the repo responds in under 500ms. Issues are created via the service layer to bypass the HTTP rate limiter. """ repo_id = await _create_repo(client, auth_headers, "perf-50") for i in range(50): await issues_svc.create_issue( db_session, repo_id=repo_id, title=f"Issue {i + 1}", body="", labels=[], author="testuser", ) await db_session.commit() start = time.monotonic() r = await client.get( f"/api/repos/{repo_id}/issues?limit=50", headers=auth_headers, ) elapsed = time.monotonic() - start assert r.status_code == 200 assert elapsed < 0.5, f"Response took {elapsed:.3f}s (limit: 0.5s)"