"""Section 37 — API Contracts & OpenAPI: 7-layer test suite. Layers: 1. Unit – to_camel, CamelModel config, model field validation 2. Integration – OpenAPI schema generation, all paths have 2xx + summaries 3. E2E – /api/openapi.json, /docs, component schemas via test client 4. Stress – idempotent generation, bulk serialization 5. Data Integrity – camelCase round-trips, required fields, alias correctness 6. Security – error shapes, 401 not 500, no stack traces in responses 7. Performance – schema under 1 second, model validation under 500ms """ from __future__ import annotations import time import warnings from datetime import datetime, timezone from typing import Any import pytest # ── suppress known alias warnings from Pydantic union fields ────────────────── warnings.filterwarnings("ignore", message=".*alias.*Field.*no effect.*") # ── imports ─────────────────────────────────────────────────────────────────── from muse.core.types import fake_id from musehub.core.genesis import compute_identity_id from musehub.models.base import CamelModel, to_camel from musehub.models.musehub import ( BranchResponse, CommitResponse, IssueResponse, ReleaseResponse, RepoResponse, TagResponse, ) # ───────────────────────────────────────────────────────────────────────────── # LAYER 1 — UNIT # ───────────────────────────────────────────────────────────────────────────── class TestToCamel: """Unit tests for the to_camel converter.""" def test_single_word_unchanged(self) -> None: assert to_camel("name") == "name" def test_two_word_snake(self) -> None: assert to_camel("repo_id") == "repoId" def test_three_word_snake(self) -> None: assert to_camel("head_commit_id") == "headCommitId" def test_is_prefix(self) -> None: assert to_camel("is_verified") == "isVerified" def test_all_caps_word(self) -> None: # Each segment after split capitalised by .capitalize() assert to_camel("snake_url") == "snakeUrl" def test_empty_string(self) -> None: assert to_camel("") == "" def test_already_camel_unchanged(self) -> None: # No underscores → parts[0] only assert to_camel("repoId") == "repoId" def test_trailing_underscore(self) -> None: # trailing underscore adds empty segment → no change after capitalize assert to_camel("name_") == "name" def test_multiple_underscores(self) -> None: assert to_camel("a_b_c_d") == "aBCD" def test_return_type_is_str(self) -> None: assert isinstance(to_camel("some_field"), str) class TestCamelModelConfig: """Unit tests for CamelModel base class configuration.""" def test_alias_generator_set(self) -> None: assert CamelModel.model_config.get("alias_generator") is to_camel def test_populate_by_name_enabled(self) -> None: assert CamelModel.model_config.get("populate_by_name") is True def test_subclass_inherits_alias_generator(self) -> None: class MyModel(CamelModel): my_field: str m = MyModel(my_field="x") dumped = m.model_dump(by_alias=True) assert "myField" in dumped assert dumped["myField"] == "x" def test_populate_by_snake_name(self) -> None: class MyModel(CamelModel): repo_id: str m = MyModel(repo_id="abc") assert m.repo_id == "abc" def test_populate_by_camel_alias(self) -> None: class MyModel(CamelModel): repo_id: str m = MyModel(**{"repoId": "abc"}) assert m.repo_id == "abc" def test_snake_dump_default(self) -> None: class MyModel(CamelModel): repo_id: str m = MyModel(repo_id="x") assert "repo_id" in m.model_dump() assert "repoId" not in m.model_dump() def test_camel_dump_by_alias(self) -> None: class MyModel(CamelModel): repo_id: str m = MyModel(repo_id="x") assert "repoId" in m.model_dump(by_alias=True) class TestModelFieldValidation: """Unit tests for required fields on key response models.""" def test_repo_response_required_fields(self) -> None: fields = RepoResponse.model_fields for f in ("repo_id", "name", "owner", "slug", "visibility", "owner_user_id", "created_at", "updated_at"): assert f in fields, f"{f} missing from RepoResponse" def test_commit_response_required_fields(self) -> None: fields = CommitResponse.model_fields for f in ("commit_id", "branch", "message", "author", "timestamp"): assert f in fields, f"{f} missing from CommitResponse" def test_issue_response_required_fields(self) -> None: fields = IssueResponse.model_fields for f in ("issue_id", "number", "title", "state", "author"): assert f in fields, f"{f} missing from IssueResponse" def test_release_response_required_fields(self) -> None: fields = ReleaseResponse.model_fields for f in ("release_id", "tag", "title", "commit_id"): assert f in fields, f"{f} missing from ReleaseResponse" def test_tag_response_required_fields(self) -> None: fields = TagResponse.model_fields for f in ("tag", "namespace", "commit_id", "created_at"): assert f in fields, f"{f} missing from TagResponse" def test_branch_response_required_fields(self) -> None: fields = BranchResponse.model_fields for f in ("branch_id", "name", "head_commit_id"): assert f in fields, f"{f} missing from BranchResponse" def test_repo_response_json_schema_has_properties(self) -> None: schema = RepoResponse.model_json_schema() assert "properties" in schema or "$defs" in schema def test_commit_response_json_schema_generated(self) -> None: schema = CommitResponse.model_json_schema() assert schema.get("type") == "object" or "properties" in schema or "allOf" in schema # ───────────────────────────────────────────────────────────────────────────── # LAYER 2 — INTEGRATION # ───────────────────────────────────────────────────────────────────────────── @pytest.fixture(scope="module") def openapi_schema() -> None: """Generate the OpenAPI schema once per module.""" from musehub.main import app return app.openapi() class TestOpenAPISchemaGeneration: """Integration tests for the generated OpenAPI schema.""" def test_openapi_version_is_3_1(self, openapi_schema: None) -> None: assert openapi_schema["openapi"].startswith("3.1") def test_has_info_block(self, openapi_schema: None) -> None: assert "info" in openapi_schema assert "title" in openapi_schema["info"] def test_path_count_at_least_170(self, openapi_schema: None) -> None: assert len(openapi_schema["paths"]) >= 170 def test_component_schema_count_at_least_130(self, openapi_schema: None) -> None: schemas = openapi_schema.get("components", {}).get("schemas", {}) assert len(schemas) >= 130 def test_all_operations_have_summary(self, openapi_schema: None) -> None: HTTP_METHODS = {"get", "post", "put", "patch", "delete", "head", "options"} missing = [] for path, methods in openapi_schema["paths"].items(): for method, op in methods.items(): if method in HTTP_METHODS and not op.get("summary"): missing.append(f"{method.upper()} {path}") assert missing == [], f"Operations missing summary: {missing[:5]}" def test_all_operations_have_2xx_response(self, openapi_schema: None) -> None: HTTP_METHODS = {"get", "post", "put", "patch", "delete", "head", "options"} missing = [] for path, methods in openapi_schema["paths"].items(): for method, op in methods.items(): if method in HTTP_METHODS: has_2xx = any( str(code).startswith("2") for code in op.get("responses", {}) ) if not has_2xx: missing.append(f"{method.upper()} {path}") assert missing == [], f"Operations missing 2xx response: {missing[:5]}" def test_schema_is_dict(self, openapi_schema: None) -> None: assert isinstance(openapi_schema, dict) def test_schema_has_paths_key(self, openapi_schema: None) -> None: assert "paths" in openapi_schema def test_schema_has_components_key(self, openapi_schema: None) -> None: assert "components" in openapi_schema def test_total_operation_count(self, openapi_schema: None) -> None: HTTP_METHODS = {"get", "post", "put", "patch", "delete", "head", "options"} total = sum( 1 for methods in openapi_schema["paths"].values() for m in methods if m in HTTP_METHODS ) assert total >= 190 def test_key_response_models_in_components(self, openapi_schema: None) -> None: schemas = openapi_schema.get("components", {}).get("schemas", {}) for model_name in ("RepoResponse", "CommitResponse", "IssueResponse"): assert model_name in schemas, f"{model_name} missing from component schemas" def test_validation_error_schema_present(self, openapi_schema: None) -> None: schemas = openapi_schema.get("components", {}).get("schemas", {}) assert "HTTPValidationError" in schemas def test_schema_serialisable_to_json(self, openapi_schema: None) -> None: import json raw = json.dumps(openapi_schema) assert len(raw) > 100_000 # schema is large # ───────────────────────────────────────────────────────────────────────────── # LAYER 3 — E2E (via test client) # ───────────────────────────────────────────────────────────────────────────── class TestOpenAPIEndpointsE2E: """E2E tests hitting /api/openapi.json and /docs via the test client.""" async def test_openapi_json_returns_200(self, client: AsyncClient) -> None: resp = await client.get("/api/openapi.json") assert resp.status_code == 200 async def test_openapi_json_content_type(self, client: AsyncClient) -> None: resp = await client.get("/api/openapi.json") assert "application/json" in resp.headers["content-type"] async def test_openapi_json_has_paths(self, client: AsyncClient) -> None: resp = await client.get("/api/openapi.json") body = resp.json() assert "paths" in body assert len(body["paths"]) >= 170 async def test_openapi_json_version_3_1(self, client: AsyncClient) -> None: resp = await client.get("/api/openapi.json") assert resp.json()["openapi"].startswith("3.1") async def test_openapi_json_has_components(self, client: AsyncClient) -> None: resp = await client.get("/api/openapi.json") body = resp.json() assert "components" in body assert "schemas" in body["components"] async def test_docs_returns_200(self, client: AsyncClient) -> None: resp = await client.get("/docs") assert resp.status_code == 200 async def test_docs_returns_html(self, client: AsyncClient) -> None: resp = await client.get("/docs") assert "text/html" in resp.headers["content-type"] async def test_redoc_returns_200(self, client: AsyncClient) -> None: resp = await client.get("/redoc") assert resp.status_code == 200 async def test_openapi_json_repo_response_in_schemas(self, client: AsyncClient) -> None: resp = await client.get("/api/openapi.json") schemas = resp.json()["components"]["schemas"] assert "RepoResponse" in schemas async def test_openapi_json_commit_response_in_schemas(self, client: AsyncClient) -> None: resp = await client.get("/api/openapi.json") schemas = resp.json()["components"]["schemas"] assert "CommitResponse" in schemas async def test_openapi_json_issue_response_in_schemas(self, client: AsyncClient) -> None: resp = await client.get("/api/openapi.json") schemas = resp.json()["components"]["schemas"] assert "IssueResponse" in schemas async def test_openapi_json_release_response_in_schemas(self, client: AsyncClient) -> None: resp = await client.get("/api/openapi.json") schemas = resp.json()["components"]["schemas"] assert "ReleaseResponse" in schemas # ───────────────────────────────────────────────────────────────────────────── # LAYER 4 — STRESS # ───────────────────────────────────────────────────────────────────────────── class TestOpenAPIStress: """Stress tests: idempotency and bulk serialisation.""" def test_schema_generation_idempotent(self) -> None: from musehub.main import app s1 = app.openapi() s2 = app.openapi() assert s1 == s2 def test_schema_path_count_stable_across_calls(self) -> None: from musehub.main import app counts = [len(app.openapi()["paths"]) for _ in range(3)] assert len(set(counts)) == 1 def test_bulk_repo_response_serialisation(self) -> None: """Serialise 500 RepoResponse objects — must not raise.""" ts = datetime(2025, 1, 1, tzinfo=timezone.utc) # Build one valid instance first to discover required fields sample = dict( repo_id=fake_id("rid"), name="repo", owner="gabriel", slug="repo", visibility="public", owner_user_id=compute_identity_id(b"gabriel"), clone_url="https://localhost:1337/gabriel/repo", tags=[], created_at=ts, updated_at=ts, ) # Try with optional fields omitted — Pydantic will apply defaults proto = RepoResponse.model_validate(sample) for i in range(500): r = RepoResponse.model_validate({**sample, "repo_id": fake_id(f"rid-{i}")}) r.model_dump(by_alias=True) def test_bulk_commit_response_serialisation(self) -> None: """Serialise 500 CommitResponse objects.""" ts = datetime(2025, 1, 1, tzinfo=timezone.utc) for i in range(500): c = CommitResponse( commit_id=f"cid-{i:040d}", branch="main", parent_ids=[], message=f"commit {i}", author="gabriel", timestamp=ts, snapshot_id=f"sid-{i}", ) c.model_dump(by_alias=True) def test_to_camel_bulk(self) -> None: """Convert 10 000 snake_case strings without error.""" samples = [ "repo_id", "head_commit_id", "is_verified", "owner_user_id", "created_at", "updated_at", "download_urls", "semver_major", ] for _ in range(1250): for s in samples: to_camel(s) def test_model_json_schema_repeated(self) -> None: """model_json_schema() called 50 times produces consistent output.""" schemas = [RepoResponse.model_json_schema() for _ in range(50)] first = schemas[0] for s in schemas[1:]: assert s == first # ───────────────────────────────────────────────────────────────────────────── # LAYER 5 — DATA INTEGRITY # ───────────────────────────────────────────────────────────────────────────── class TestCamelCaseRoundTrip: """Data integrity: camelCase alias round-trips.""" def _make_repo(self) -> RepoResponse: ts = datetime(2025, 6, 1, tzinfo=timezone.utc) return RepoResponse.model_validate(dict( repo_id=fake_id("repo-abc"), name="my-repo", owner="gabriel", slug="my-repo", visibility="public", owner_user_id=compute_identity_id(b"gabriel"), clone_url="https://localhost:1337/gabriel/my-repo", tags=["music", "demo"], tempo_bpm=120.0, created_at=ts, updated_at=ts, )) def test_repo_camel_keys(self) -> None: wire = self._make_repo().model_dump(by_alias=True) assert "repoId" in wire assert "ownerId" not in wire # field is owner_user_id → ownerUserId assert "ownerUserId" in wire assert "createdAt" in wire def test_repo_round_trip(self) -> None: original = self._make_repo() wire = original.model_dump(by_alias=True) restored = RepoResponse.model_validate(wire) assert restored.model_dump(by_alias=True) == wire def test_repo_snake_dump_no_camel(self) -> None: wire = self._make_repo().model_dump() assert "repo_id" in wire assert "repoId" not in wire def test_commit_camel_keys(self) -> None: ts = datetime(2025, 1, 1, tzinfo=timezone.utc) c = CommitResponse( commit_id="abc123", branch="main", parent_ids=["p1"], message="init", author="gabriel", timestamp=ts, snapshot_id="snap1", ) wire = c.model_dump(by_alias=True) assert "commitId" in wire assert "parentIds" in wire assert "snapshotId" in wire def test_issue_camel_keys(self) -> None: ts = datetime(2025, 1, 1, tzinfo=timezone.utc) i = IssueResponse.model_validate(dict( issue_id=fake_id("iss-1"), number=1, title="Bug", body="", state="open", labels=[], author="gabriel", created_at=ts, updated_at=ts, comment_count=0, )) wire = i.model_dump(by_alias=True) assert "issueId" in wire assert "commentCount" in wire def test_tag_camel_keys(self) -> None: ts = datetime(2025, 1, 1, tzinfo=timezone.utc) t = TagResponse.model_validate(dict( tag="v1.0", namespace="release", commit_id="abc123", created_at=ts, )) wire = t.model_dump(by_alias=True) assert "commitId" in wire assert "createdAt" in wire def test_branch_camel_keys(self) -> None: b = BranchResponse(branch_id="bid-1", name="main", head_commit_id="abc") wire = b.model_dump(by_alias=True) assert "branchId" in wire assert "headCommitId" in wire def test_camelmodel_requires_snake_or_camel_on_input(self) -> None: class M(CamelModel): owner_id: str m1 = M(owner_id="a") m2 = M(**{"ownerId": "a"}) assert m1.owner_id == m2.owner_id def test_alias_generator_consistent(self) -> None: """All model fields with underscores produce valid camelCase aliases.""" for field_name in RepoResponse.model_fields: alias = to_camel(field_name) assert "_" not in alias or field_name == alias # no underscores in result # ───────────────────────────────────────────────────────────────────────────── # LAYER 6 — SECURITY # ───────────────────────────────────────────────────────────────────────────── class TestAPIContractsSecurity: """Security: proper error shapes, no stack traces, auth enforced.""" async def test_invalid_json_body_returns_422_not_500(self, client: AsyncClient, auth_headers: StrDict) -> None: # Sending invalid JSON to a POST endpoint should yield 422 resp = await client.post( "/api/repos", content=b"not-json", headers={**auth_headers, "content-type": "application/json"}, ) assert resp.status_code in (400, 422), f"Expected 4xx, got {resp.status_code}" async def test_422_response_has_detail_key(self, client: AsyncClient, auth_headers: StrDict) -> None: resp = await client.post( "/api/repos", content=b"not-json", headers={**auth_headers, "content-type": "application/json"}, ) if resp.status_code == 422: body = resp.json() assert "detail" in body async def test_missing_required_body_field_returns_422(self, client: AsyncClient, auth_headers: StrDict) -> None: # POST /repos with empty JSON object — missing required fields resp = await client.post( "/api/repos", json={}, headers=auth_headers, ) assert resp.status_code == 422 async def test_openapi_json_no_stack_trace(self, client: AsyncClient) -> None: resp = await client.get("/api/openapi.json") body = resp.text assert "Traceback" not in body # Check for Python exception raise syntax (not natural English "raise") assert "raise Exception" not in body assert "raise ValueError" not in body assert "raise HTTPException" not in body async def test_error_body_no_python_exception_class(self, client: AsyncClient, auth_headers: StrDict) -> None: resp = await client.post( "/api/repos", json={"x": 1}, headers=auth_headers, ) # Should be 422, body should not leak Python exception names if resp.status_code != 200: body = resp.text assert "Exception" not in body or "detail" in resp.json() async def test_unknown_api_route_returns_404(self, client: AsyncClient) -> None: # /api/* routes are strict — unknown API paths return 404 resp = await client.get("/api/this-route-does-not-exist-xyz-abc-12345") assert resp.status_code == 404 async def test_404_response_no_traceback(self, client: AsyncClient) -> None: resp = await client.get("/api/totally-unknown-endpoint-xyzabc-99999") assert "Traceback" not in resp.text async def test_auth_required_route_returns_401_not_500(self, client: AsyncClient) -> None: # Access a protected route WITHOUT auth headers — must yield 401, not 500 resp = await client.get("/api/repos") assert resp.status_code == 401, f"Expected 401, got {resp.status_code}" async def test_method_not_allowed_returns_405(self, client: AsyncClient, auth_headers: StrDict) -> None: # DELETE on a GET-only endpoint should yield 405 resp = await client.delete("/api/openapi.json", headers=auth_headers) assert resp.status_code in (405, 404) # FastAPI returns 405 for wrong method async def test_openapi_json_not_exposing_server_internals(self, client: AsyncClient) -> None: resp = await client.get("/api/openapi.json") body = resp.text # Should not contain local filesystem paths assert "/Users/" not in body assert "/home/" not in body # ───────────────────────────────────────────────────────────────────────────── # LAYER 7 — PERFORMANCE # ───────────────────────────────────────────────────────────────────────────── class TestAPIContractsPerformance: """Performance: schema generation and model validation within time budgets.""" def test_openapi_schema_generation_under_1_second(self) -> None: from musehub.main import app # Clear cached schema to force regeneration app.openapi_schema = None start = time.perf_counter() app.openapi() elapsed = time.perf_counter() - start assert elapsed < 1.0, f"Schema generation took {elapsed:.3f}s (limit 1.0s)" def test_openapi_cached_retrieval_under_10ms(self) -> None: from musehub.main import app # Ensure schema is cached app.openapi() start = time.perf_counter() for _ in range(10): app.openapi() elapsed = time.perf_counter() - start avg = elapsed / 10 assert avg < 0.010, f"Cached schema avg retrieval {avg*1000:.1f}ms (limit 10ms)" def test_to_camel_10k_strings_under_100ms(self) -> None: samples = [ "repo_id", "head_commit_id", "is_verified", "owner_user_id", "created_at", "updated_at", "download_urls", "semver_major", ] start = time.perf_counter() for _ in range(1250): for s in samples: to_camel(s) elapsed = time.perf_counter() - start assert elapsed < 0.100, f"to_camel 10K calls took {elapsed*1000:.1f}ms (limit 100ms)" def test_1k_repo_responses_validated_under_500ms(self) -> None: ts = datetime(2025, 1, 1, tzinfo=timezone.utc) base = { "repo_id": fake_id("rid"), "name": "repo", "owner": "gabriel", "slug": "repo", "visibility": "public", "owner_user_id": "uid", "clone_url": "https://localhost:1337/gabriel/repo", "tags": [], "created_at": ts.isoformat(), "updated_at": ts.isoformat(), } data = [{**base, "repo_id": fake_id(f"rid-{i}")} for i in range(1000)] start = time.perf_counter() for d in data: RepoResponse.model_validate(d) elapsed = time.perf_counter() - start assert elapsed < 0.500, f"1K RepoResponse validates took {elapsed*1000:.1f}ms (limit 500ms)" async def test_openapi_json_endpoint_under_200ms(self, client: AsyncClient) -> None: # First call may be slower; measure after warmup await client.get("/api/openapi.json") start = time.perf_counter() resp = await client.get("/api/openapi.json") elapsed = time.perf_counter() - start assert resp.status_code == 200 assert elapsed < 0.200, f"/api/openapi.json took {elapsed*1000:.1f}ms (limit 200ms)" def test_model_dump_by_alias_1k_under_200ms(self) -> None: ts = datetime(2025, 1, 1, tzinfo=timezone.utc) instances = [ CommitResponse( commit_id=f"cid-{i:040d}", branch="main", parent_ids=[], message=f"msg {i}", author="gabriel", timestamp=ts, snapshot_id=f"sid-{i}", ) for i in range(1000) ] start = time.perf_counter() for inst in instances: inst.model_dump(by_alias=True) elapsed = time.perf_counter() - start assert elapsed < 0.200, f"1K model_dump(by_alias) took {elapsed*1000:.1f}ms (limit 200ms)"