"""Section 27 — Collaborators & Permissions: 7-layer test suite.

Covers:
  - musehub/api/routes/musehub/collaborators.py  (CRUD + permission logic)
  - musehub/db/musehub_collaborator_models.py    (ORM model)
  - repos.py _guard_admin / check_collaborator_access (permission gating)

Endpoints:
  GET    /api/repos/{repo_id}/collaborators
  POST   /api/repos/{repo_id}/collaborators
  PUT    /api/repos/{repo_id}/collaborators/{handle}/permission
  DELETE /api/repos/{repo_id}/collaborators/{handle}
  GET    /api/repos/{repo_id}/collaborators/{username}/permission

Layer map
---------
1. Unit           — Permission enum, _PERMISSION_RANK, _has_permission, _orm_to_response
2. Integration    — DB-level collaborator CRUD via session
3. E2E            — HTTP client against full app
4. Stress         — 50 collaborators, concurrent list calls
5. Data Integrity — permission stored correctly, invited_by set, unique constraint
6. Security       — auth required, non-admin blocked, owner un-removable
7. Performance    — timing budgets
"""
from __future__ import annotations

import asyncio
import secrets
import time
from datetime import datetime, timezone

import pytest
from httpx import AsyncClient
from muse.core.types import fake_id
from musehub.core.genesis import compute_identity_id, compute_repo_id
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from musehub.types.json_types import StrDict
from musehub.api.routes.musehub.collaborators import (
    Permission,
    _PERMISSION_RANK,
    _has_permission,
    _orm_to_response,
)
from musehub.db.musehub_collaborator_models import MusehubCollaborator
from musehub.db.musehub_identity_models import MusehubIdentity
from musehub.db.musehub_repo_models import MusehubRepo


# ---------------------------------------------------------------------------
# Fixtures / helpers
# ---------------------------------------------------------------------------

_TEST_HANDLE = "testuser"  # matches auth_headers fixture's token.handle


def _uid() -> str:
    """Return a fresh random 32-character hexadecimal string."""
    return secrets.token_hex(16)


async def _db_repo(
    session: AsyncSession,
    owner: str = _TEST_HANDLE,
    *,
    visibility: str = "private",
) -> MusehubRepo:
    """Add a uniquely named repo owned by *owner* to *session* and flush it.

    The slug doubles as the repo name; the repo id is derived from the owner
    identity id, the slug, the literal domain ``"code"`` and the creation
    timestamp.
    """
    repo_slug = f"repo-{_uid()[:8]}"
    now = datetime.now(tz=timezone.utc)
    owner_identity_id = compute_identity_id(owner.encode())
    new_repo_id = compute_repo_id(owner_identity_id, repo_slug, "code", now.isoformat())

    row = MusehubRepo(
        repo_id=new_repo_id,
        name=repo_slug,
        slug=repo_slug,
        owner=owner,
        owner_user_id=owner_identity_id,
        visibility=visibility,
        created_at=now,
        updated_at=now,
    )
    session.add(row)
    await session.flush()
    return row


async def _db_collab(
    session: AsyncSession,
    repo_id: str,
    handle: str,
    *,
    permission: str = "write",
    invited_by: str | None = None,
    accepted: bool = False,
) -> MusehubCollaborator:
    """Add a collaborator row for *handle* on *repo_id* and flush it.

    ``accepted_at`` is stamped with the current UTC time only when *accepted*
    is true; otherwise it is left as ``None``.
    """
    if accepted:
        accepted_stamp = datetime.now(timezone.utc)
    else:
        accepted_stamp = None

    row = MusehubCollaborator(
        id=fake_id(f"{repo_id}-{handle}"),
        repo_id=repo_id,
        identity_handle=handle,
        permission=permission,
        invited_by_handle=invited_by,
        accepted_at=accepted_stamp,
    )
    session.add(row)
    await session.flush()
    return row


async def _db_identity(session: AsyncSession, handle: str) -> MusehubIdentity:
    """Create a human-type MusehubIdentity for *handle* and flush it into the session."""
    row = MusehubIdentity(
        identity_id=_uid(),
        handle=handle,
        display_name=handle.title(),
        identity_type="human",
    )
    session.add(row)
    await session.flush()
    return row


async def _api_repo(
    client: AsyncClient,
    auth_headers: StrDict,
    *,
    visibility: str = "private",
) -> str:
    """Create a repo owned by the test user through the HTTP API; return its id."""
    payload = {
        "name": f"collab-{_uid()[:8]}",
        "owner": _TEST_HANDLE,
        "visibility": visibility,
    }
    response = await client.post("/api/repos", json=payload, headers=auth_headers)
    assert response.status_code == 201, response.text
    return response.json()["repoId"]


# ===========================================================================
# Layer 1 — Unit
# ===========================================================================


class TestUnitPermissionEnum:
    """The Permission enum exposes exactly the four expected string values."""

    def test_values(self) -> None:
        """Every member compares equal to its lowercase string name."""
        expected_pairs = [
            (Permission.read, "read"),
            (Permission.write, "write"),
            (Permission.admin, "admin"),
            (Permission.owner, "owner"),
        ]
        for member, text in expected_pairs:
            assert member == text

    def test_four_levels(self) -> None:
        """Iterating the enum yields four members."""
        members = list(Permission)
        assert len(members) == 4


class TestUnitPermissionRank:
    """_PERMISSION_RANK orders read < write < admin < owner."""

    def test_read_is_lowest(self) -> None:
        lower, higher = _PERMISSION_RANK["read"], _PERMISSION_RANK["write"]
        assert lower < higher

    def test_write_lt_admin(self) -> None:
        lower, higher = _PERMISSION_RANK["write"], _PERMISSION_RANK["admin"]
        assert lower < higher

    def test_admin_lt_owner(self) -> None:
        lower, higher = _PERMISSION_RANK["admin"], _PERMISSION_RANK["owner"]
        assert lower < higher

    def test_all_levels_covered(self) -> None:
        """Every enum value has an entry in the rank table."""
        unranked = [p.value for p in Permission if p.value not in _PERMISSION_RANK]
        assert not unranked


class TestUnitHasPermission:
    """_has_permission(actual, required) grants when actual ranks >= required."""

    def test_exact_match(self) -> None:
        granted = _has_permission("write", Permission.write)
        assert granted is True

    def test_higher_grants_lower(self) -> None:
        for actual, required in (("admin", Permission.write), ("owner", Permission.read)):
            assert _has_permission(actual, required) is True

    def test_lower_denied_higher(self) -> None:
        for actual, required in (("read", Permission.write), ("write", Permission.admin)):
            assert _has_permission(actual, required) is False

    def test_unknown_permission_denied(self) -> None:
        """Unrecognised permission strings never satisfy even ``read``."""
        for bogus in ("", "superuser"):
            assert _has_permission(bogus, Permission.read) is False

    def test_read_satisfies_read(self) -> None:
        granted = _has_permission("read", Permission.read)
        assert granted is True

    def test_owner_satisfies_admin(self) -> None:
        granted = _has_permission("owner", Permission.admin)
        assert granted is True


class TestUnitOrmToResponse:
    """_orm_to_response copies ORM columns onto the response model."""

    async def test_fields_mapped_correctly(self, db_session: AsyncSession) -> None:
        repo_row = await _db_repo(db_session)
        collab_row = await _db_collab(
            db_session,
            repo_row.repo_id,
            "alice",
            permission="write",
            invited_by="bob",
        )

        response = _orm_to_response(collab_row)

        assert response.handle == "alice"
        assert response.permission == "write"
        assert response.invited_by == "bob"
        assert response.repo_id == repo_row.repo_id
        assert response.collaborator_id == collab_row.id

    async def test_invited_by_none_when_null(self, db_session: AsyncSession) -> None:
        repo_row = await _db_repo(db_session)
        collab_row = await _db_collab(
            db_session, repo_row.repo_id, "carol", invited_by=None
        )

        response = _orm_to_response(collab_row)

        assert response.invited_by is None
# ===========================================================================
# Layer 2 — Integration (DB-level)
# ===========================================================================


class TestIntegrationCollaboratorDB:
    """Collaborator rows behave correctly when driven directly through the session."""

    async def test_insert_and_query(self, db_session: AsyncSession) -> None:
        """A flushed collaborator row is returned by a select on its repo id."""
        repo = await _db_repo(db_session)
        # _db_collab flushes the row itself, so no extra flush is needed here.
        await _db_collab(db_session, repo.repo_id, "alice", permission="admin")

        result = await db_session.execute(
            select(MusehubCollaborator).where(
                MusehubCollaborator.repo_id == repo.repo_id
            )
        )
        rows = result.scalars().all()
        assert len(rows) == 1
        assert rows[0].identity_handle == "alice"
        assert rows[0].permission == "admin"

    async def test_unique_constraint_on_repo_handle(
        self, db_session: AsyncSession
    ) -> None:
        """A second row for the same (repo, handle) pair is rejected on flush."""
        from sqlalchemy.exc import IntegrityError

        repo = await _db_repo(db_session)
        # First row is flushed by the helper.
        await _db_collab(db_session, repo.repo_id, "alice")

        dup = MusehubCollaborator(
            id=_uid(),
            repo_id=repo.repo_id,
            identity_handle="alice",
            permission="read",
        )
        db_session.add(dup)
        with pytest.raises(IntegrityError):
            await db_session.flush()

    async def test_delete_collaborator_directly(self, db_session: AsyncSession) -> None:
        """A collaborator can be deleted explicitly and is gone afterwards."""
        repo = await _db_repo(db_session)
        collab = await _db_collab(db_session, repo.repo_id, "alice")
        await db_session.commit()

        await db_session.delete(collab)
        await db_session.commit()

        result = await db_session.execute(
            select(MusehubCollaborator).where(
                MusehubCollaborator.repo_id == repo.repo_id
            )
        )
        assert result.scalars().first() is None

    async def test_accepted_at_null_by_default(self, db_session: AsyncSession) -> None:
        """The helper leaves accepted_at unset unless accepted=True is passed."""
        repo = await _db_repo(db_session)
        collab = await _db_collab(db_session, repo.repo_id, "dave")
        assert collab.accepted_at is None

    async def test_permission_default_write(self, db_session: AsyncSession) -> None:
        """A row created without an explicit permission reads back as 'write'."""
        repo = await _db_repo(db_session)
        collab = MusehubCollaborator(
            id=_uid(),
            repo_id=repo.repo_id,
            identity_handle="eve",
        )
        db_session.add(collab)
        await db_session.flush()
        assert collab.permission == "write"


# ===========================================================================
# Layer 3 — E2E
# ===========================================================================


class TestE2EListCollaborators:
    """GET /api/repos/{repo_id}/collaborators."""

    async def test_list_returns_200(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        repo_id = await _api_repo(client, auth_headers)
        await _db_collab(db_session, repo_id, "alice")
        await db_session.commit()

        r = await client.get(f"/api/repos/{repo_id}/collaborators", headers=auth_headers)
        assert r.status_code == 200
        body = r.json()
        assert "collaborators" in body
        assert "total" in body
        assert body["total"] == 1

    async def test_list_requires_auth(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        repo = await _db_repo(db_session)
        await db_session.commit()

        r = await client.get(f"/api/repos/{repo.repo_id}/collaborators")
        assert r.status_code == 401

    async def test_list_unknown_repo_404(
        self, client: AsyncClient, auth_headers: StrDict
    ) -> None:
        r = await client.get("/api/repos/no-such-repo/collaborators", headers=auth_headers)
        assert r.status_code == 404

    async def test_list_empty_repo(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        """A freshly created repo lists zero collaborators."""
        repo_id = await _api_repo(client, auth_headers)
        await db_session.commit()

        r = await client.get(f"/api/repos/{repo_id}/collaborators", headers=auth_headers)
        assert r.status_code == 200
        assert r.json()["total"] == 0


class TestE2EInviteCollaborator:
    """POST /api/repos/{repo_id}/collaborators."""

    async def test_owner_can_invite_201(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        repo_id = await _api_repo(client, auth_headers)
        await _db_identity(db_session, "alice")
        await db_session.commit()

        r = await client.post(
            f"/api/repos/{repo_id}/collaborators",
            json={"handle": "alice", "permission": "write"},
            headers=auth_headers,
        )
        assert r.status_code == 201
        body = r.json()
        assert body["handle"] == "alice"
        assert body["permission"] == "write"
        assert body["invitedBy"] == _TEST_HANDLE

    async def test_invite_requires_auth(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        repo = await _db_repo(db_session)
        await db_session.commit()

        r = await client.post(
            f"/api/repos/{repo.repo_id}/collaborators",
            json={"handle": "bob", "permission": "read"},
        )
        assert r.status_code == 401

    async def test_non_admin_gets_403(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        """testuser is not owner (alice owns repo) and has only 'write' — gets 403."""
        repo = await _db_repo(db_session, owner="alice")
        await _db_collab(db_session, repo.repo_id, _TEST_HANDLE, permission="write")
        await db_session.commit()

        r = await client.post(
            f"/api/repos/{repo.repo_id}/collaborators",
            json={"handle": "bob", "permission": "read"},
            headers=auth_headers,
        )
        assert r.status_code == 403

    async def test_admin_collab_can_invite(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        """testuser has admin permission → can invite."""
        repo = await _db_repo(db_session, owner="alice")
        await _db_collab(db_session, repo.repo_id, _TEST_HANDLE, permission="admin")
        await _db_identity(db_session, "bob")
        await db_session.commit()

        r = await client.post(
            f"/api/repos/{repo.repo_id}/collaborators",
            json={"handle": "bob", "permission": "read"},
            headers=auth_headers,
        )
        assert r.status_code == 201

    async def test_duplicate_invite_409(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        """Inviting the same handle twice yields 409 on the second call."""
        repo_id = await _api_repo(client, auth_headers)
        await _db_identity(db_session, "alice")
        await db_session.commit()

        body = {"handle": "alice", "permission": "write"}
        r1 = await client.post(
            f"/api/repos/{repo_id}/collaborators", json=body, headers=auth_headers
        )
        assert r1.status_code == 201

        r2 = await client.post(
            f"/api/repos/{repo_id}/collaborators", json=body, headers=auth_headers
        )
        assert r2.status_code == 409
        assert "already a collaborator" in r2.json()["detail"]

    async def test_invite_unknown_repo_404(
        self, client: AsyncClient, auth_headers: StrDict
    ) -> None:
        r = await client.post(
            "/api/repos/no-such-repo/collaborators",
            json={"handle": "alice", "permission": "write"},
            headers=auth_headers,
        )
        assert r.status_code == 404

    async def test_default_permission_write(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        repo_id = await _api_repo(client, auth_headers)
        await _db_identity(db_session, "alice")
        await db_session.commit()

        r = await client.post(
            f"/api/repos/{repo_id}/collaborators",
            json={"handle": "alice"},  # no permission field → defaults to write
            headers=auth_headers,
        )
        assert r.status_code == 201
        assert r.json()["permission"] == "write"


class TestE2EUpdatePermission:
    """PUT /api/repos/{repo_id}/collaborators/{handle}/permission."""

    async def test_owner_can_update_200(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        repo_id = await _api_repo(client, auth_headers)
        await _db_collab(db_session, repo_id, "alice", permission="read")
        await db_session.commit()

        r = await client.put(
            f"/api/repos/{repo_id}/collaborators/alice/permission",
            json={"permission": "admin"},
            headers=auth_headers,
        )
        assert r.status_code == 200
        assert r.json()["permission"] == "admin"

    async def test_update_requires_auth(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        repo = await _db_repo(db_session)
        await _db_collab(db_session, repo.repo_id, "alice")
        await db_session.commit()

        r = await client.put(
            f"/api/repos/{repo.repo_id}/collaborators/alice/permission",
            json={"permission": "admin"},
        )
        assert r.status_code == 401

    async def test_non_admin_gets_403(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        """A write-level collaborator cannot change another collaborator's permission."""
        repo = await _db_repo(db_session, owner="alice")
        await _db_collab(db_session, repo.repo_id, _TEST_HANDLE, permission="write")
        await _db_collab(db_session, repo.repo_id, "bob", permission="read")
        await db_session.commit()

        r = await client.put(
            f"/api/repos/{repo.repo_id}/collaborators/bob/permission",
            json={"permission": "admin"},
            headers=auth_headers,
        )
        assert r.status_code == 403

    async def test_update_owner_permission_403(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        """Cannot change owner's permission via this endpoint."""
        repo = await _db_repo(db_session, owner="alice")
        # testuser has admin permission
        await _db_collab(db_session, repo.repo_id, _TEST_HANDLE, permission="admin")
        # alice has 'owner' permission in collaborators table
        await _db_collab(db_session, repo.repo_id, "alice", permission="owner")
        await db_session.commit()

        r = await client.put(
            f"/api/repos/{repo.repo_id}/collaborators/alice/permission",
            json={"permission": "write"},
            headers=auth_headers,
        )
        assert r.status_code == 403
        assert "Owner permission" in r.json()["detail"]

    async def test_update_nonexistent_collab_404(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        repo_id = await _api_repo(client, auth_headers)
        await db_session.commit()

        r = await client.put(
            f"/api/repos/{repo_id}/collaborators/nobody/permission",
            json={"permission": "read"},
            headers=auth_headers,
        )
        assert r.status_code == 404


class TestE2ERemoveCollaborator:
    """DELETE /api/repos/{repo_id}/collaborators/{handle}."""

    async def test_owner_can_remove_204(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        repo_id = await _api_repo(client, auth_headers)
        await _db_collab(db_session, repo_id, "alice")
        await db_session.commit()

        r = await client.delete(
            f"/api/repos/{repo_id}/collaborators/alice", headers=auth_headers
        )
        assert r.status_code == 204

    async def test_remove_requires_auth(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        repo = await _db_repo(db_session)
        await _db_collab(db_session, repo.repo_id, "alice")
        await db_session.commit()

        r = await client.delete(
            f"/api/repos/{repo.repo_id}/collaborators/alice"
        )
        assert r.status_code == 401

    async def test_non_admin_gets_403(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        """A write-level collaborator cannot remove another collaborator."""
        repo = await _db_repo(db_session, owner="alice")
        await _db_collab(db_session, repo.repo_id, _TEST_HANDLE, permission="write")
        await _db_collab(db_session, repo.repo_id, "bob", permission="read")
        await db_session.commit()

        r = await client.delete(
            f"/api/repos/{repo.repo_id}/collaborators/bob", headers=auth_headers
        )
        assert r.status_code == 403

    async def test_remove_owner_403(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        """Owner-permission collaborator cannot be removed."""
        repo = await _db_repo(db_session, owner="alice")
        await _db_collab(db_session, repo.repo_id, _TEST_HANDLE, permission="admin")
        await _db_collab(db_session, repo.repo_id, "alice", permission="owner")
        await db_session.commit()

        r = await client.delete(
            f"/api/repos/{repo.repo_id}/collaborators/alice", headers=auth_headers
        )
        assert r.status_code == 403
        assert "Owner cannot be removed" in r.json()["detail"]

    async def test_remove_nonexistent_404(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        repo_id = await _api_repo(client, auth_headers)
        await db_session.commit()

        r = await client.delete(
            f"/api/repos/{repo_id}/collaborators/nobody", headers=auth_headers
        )
        assert r.status_code == 404


class TestE2ECheckAccess:
    """GET /api/repos/{repo_id}/collaborators/{username}/permission."""

    async def test_owner_access_is_owner_permission(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        repo_id = await _api_repo(client, auth_headers)
        await db_session.commit()

        # testuser is the owner; check their own permission
        r = await client.get(
            f"/api/repos/{repo_id}/collaborators/{_TEST_HANDLE}/permission",
            headers=auth_headers,
        )
        assert r.status_code == 200
        body = r.json()
        assert body["permission"] == "owner"

    async def test_collab_access_returns_permission(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        repo_id = await _api_repo(client, auth_headers)
        await _db_collab(db_session, repo_id, "alice", permission="admin")
        await db_session.commit()

        r = await client.get(
            f"/api/repos/{repo_id}/collaborators/alice/permission",
            headers=auth_headers,
        )
        assert r.status_code == 200
        assert r.json()["permission"] == "admin"

    async def test_non_collab_404(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        repo_id = await _api_repo(client, auth_headers)
        await db_session.commit()

        r = await client.get(
            f"/api/repos/{repo_id}/collaborators/stranger/permission",
            headers=auth_headers,
        )
        assert r.status_code == 404

    async def test_check_requires_auth(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        repo = await _db_repo(db_session)
        await db_session.commit()

        r = await client.get(
            f"/api/repos/{repo.repo_id}/collaborators/{_TEST_HANDLE}/permission"
        )
        assert r.status_code == 401


# ===========================================================================
# Layer 4 — Stress
# ===========================================================================


class TestStress:
    """Listing keeps working with many rows and with concurrent callers."""

    async def test_list_50_collaborators(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        repo_id = await _api_repo(client, auth_headers)
        for i in range(50):
            await _db_collab(db_session, repo_id, f"user{i}", permission="read")
        await db_session.commit()

        r = await client.get(f"/api/repos/{repo_id}/collaborators", headers=auth_headers)
        assert r.status_code == 200
        assert r.json()["total"] == 50

    async def test_5_concurrent_list_calls(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        """Five list requests issued via asyncio.gather all see the same 10 rows."""
        repo_id = await _api_repo(client, auth_headers)
        for i in range(10):
            await _db_collab(db_session, repo_id, f"stress{i}")
        await db_session.commit()

        responses = await asyncio.gather(
            *[
                client.get(
                    f"/api/repos/{repo_id}/collaborators", headers=auth_headers
                )
                for _ in range(5)
            ]
        )
        assert all(r.status_code == 200 for r in responses)
        assert all(r.json()["total"] == 10 for r in responses)


# ===========================================================================
# Layer 5 — Data Integrity
# ===========================================================================


async def _fetch_collab_row(
    session: AsyncSession, repo_id: str, handle: str
) -> MusehubCollaborator | None:
    """Return the collaborator row for (*repo_id*, *handle*), or None if absent.

    Expires everything cached on *session* first so that the values come from
    the database rather than from objects the test created earlier.
    """
    session.expire_all()
    result = await session.execute(
        select(MusehubCollaborator).where(
            MusehubCollaborator.repo_id == repo_id,
            MusehubCollaborator.identity_handle == handle,
        )
    )
    return result.scalar_one_or_none()


class TestDataIntegrity:
    """What the API reports matches what ends up in the collaborators table."""

    async def test_invited_by_set_correctly(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        repo_id = await _api_repo(client, auth_headers)
        await _db_identity(db_session, "alice")
        await db_session.commit()

        r = await client.post(
            f"/api/repos/{repo_id}/collaborators",
            json={"handle": "alice", "permission": "read"},
            headers=auth_headers,
        )
        assert r.status_code == 201
        assert r.json()["invitedBy"] == _TEST_HANDLE

    async def test_permission_persisted_correctly(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        repo_id = await _api_repo(client, auth_headers)
        await _db_identity(db_session, "alice")
        await db_session.commit()

        r = await client.post(
            f"/api/repos/{repo_id}/collaborators",
            json={"handle": "alice", "permission": "admin"},
            headers=auth_headers,
        )
        # Fail here, with the response body, if the invite itself was rejected.
        assert r.status_code == 201, r.text

        row = await _fetch_collab_row(db_session, repo_id, "alice")
        assert row is not None
        assert row.permission == "admin"

    async def test_update_persisted_in_db(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        repo_id = await _api_repo(client, auth_headers)
        await _db_collab(db_session, repo_id, "alice", permission="read")
        await db_session.commit()

        r = await client.put(
            f"/api/repos/{repo_id}/collaborators/alice/permission",
            json={"permission": "admin"},
            headers=auth_headers,
        )
        # Fail here, with the response body, if the update itself was rejected.
        assert r.status_code == 200, r.text

        row = await _fetch_collab_row(db_session, repo_id, "alice")
        assert row is not None
        assert row.permission == "admin"

    async def test_remove_deletes_db_row(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        repo_id = await _api_repo(client, auth_headers)
        await _db_collab(db_session, repo_id, "alice")
        await db_session.commit()

        r = await client.delete(
            f"/api/repos/{repo_id}/collaborators/alice", headers=auth_headers
        )
        # Fail here, with the response body, if the removal itself was rejected.
        assert r.status_code == 204, r.text

        row = await _fetch_collab_row(db_session, repo_id, "alice")
        assert row is None

    async def test_response_total_matches_actual_count(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        repo_id = await _api_repo(client, auth_headers)
        for i in range(7):
            await _db_collab(db_session, repo_id, f"u{i}")
        await db_session.commit()

        r = await client.get(f"/api/repos/{repo_id}/collaborators", headers=auth_headers)
        assert r.status_code == 200, r.text
        body = r.json()
        # Pin the count as well, so an empty response cannot pass trivially.
        assert body["total"] == 7
        assert body["total"] == len(body["collaborators"])


# ===========================================================================
# Layer 6 — Security
# ===========================================================================


class TestSecurity:
    """Authentication and permission gates on every collaborator endpoint."""

    async def test_all_endpoints_require_auth(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """Every collaborator endpoint answers 401 when no auth header is sent."""
        repo = await _db_repo(db_session)
        await _db_collab(db_session, repo.repo_id, "alice")
        await db_session.commit()

        base = f"/api/repos/{repo.repo_id}/collaborators"
        endpoints = [
            ("GET", base),
            ("POST", base),
            ("PUT", f"{base}/alice/permission"),
            ("DELETE", f"{base}/alice"),
            # Permission-check endpoint: the fifth route this suite covers.
            ("GET", f"{base}/alice/permission"),
        ]
        for method, url in endpoints:
            r = await client.request(method, url, json={"handle": "x", "permission": "read"})
            assert r.status_code == 401, f"{method} {url} should require auth"

    async def test_read_only_collab_cannot_invite(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        repo = await _db_repo(db_session, owner="alice")
        await _db_collab(db_session, repo.repo_id, _TEST_HANDLE, permission="read")
        await db_session.commit()

        r = await client.post(
            f"/api/repos/{repo.repo_id}/collaborators",
            json={"handle": "bob", "permission": "read"},
            headers=auth_headers,
        )
        assert r.status_code == 403

    async def test_write_collab_cannot_remove(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        repo = await _db_repo(db_session, owner="alice")
        await _db_collab(db_session, repo.repo_id, _TEST_HANDLE, permission="write")
        await _db_collab(db_session, repo.repo_id, "bob", permission="read")
        await db_session.commit()

        r = await client.delete(
            f"/api/repos/{repo.repo_id}/collaborators/bob", headers=auth_headers
        )
        assert r.status_code == 403

    async def test_owner_permission_cannot_be_updated(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        repo = await _db_repo(db_session, owner="alice")
        await _db_collab(db_session, repo.repo_id, _TEST_HANDLE, permission="admin")
        await _db_collab(db_session, repo.repo_id, "alice", permission="owner")
        await db_session.commit()

        r = await client.put(
            f"/api/repos/{repo.repo_id}/collaborators/alice/permission",
            json={"permission": "read"},
            headers=auth_headers,
        )
        assert r.status_code == 403

    async def test_owner_collab_cannot_be_removed(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        repo = await _db_repo(db_session, owner="alice")
        await _db_collab(db_session, repo.repo_id, _TEST_HANDLE, permission="admin")
        await _db_collab(db_session, repo.repo_id, "alice", permission="owner")
        await db_session.commit()

        r = await client.delete(
            f"/api/repos/{repo.repo_id}/collaborators/alice", headers=auth_headers
        )
        assert r.status_code == 403

    async def test_check_access_requires_auth(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        repo = await _db_repo(db_session)
        await db_session.commit()

        r = await client.get(
            f"/api/repos/{repo.repo_id}/collaborators/alice/permission"
        )
        assert r.status_code == 401

    async def test_non_admin_cannot_update_permissions(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        repo = await _db_repo(db_session, owner="alice")
        await _db_collab(db_session, repo.repo_id, _TEST_HANDLE, permission="write")
        await _db_collab(db_session, repo.repo_id, "bob", permission="read")
        await db_session.commit()

        r = await client.put(
            f"/api/repos/{repo.repo_id}/collaborators/bob/permission",
            json={"permission": "admin"},
            headers=auth_headers,
        )
        assert r.status_code == 403


# ===========================================================================
# Layer 7 — Performance
# ===========================================================================


class TestPerformance:
    """Wall-clock budgets measured with time.perf_counter()."""

    async def test_list_20_collaborators_under_100ms(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        repo_id = await _api_repo(client, auth_headers)
        for i in range(20):
            await _db_collab(db_session, repo_id, f"perf{i}")
        await db_session.commit()

        # Only the list request is timed; the setup above is excluded.
        start = time.perf_counter()
        r = await client.get(f"/api/repos/{repo_id}/collaborators", headers=auth_headers)
        elapsed = time.perf_counter() - start

        assert r.status_code == 200
        assert elapsed < 0.1, f"list collaborators took {elapsed:.3f}s"

    def test_has_permission_1m_calls_fast(self) -> None:
        start = time.perf_counter()
        for _ in range(1_000_000):
            _has_permission("admin", Permission.write)
        elapsed = time.perf_counter() - start
        assert elapsed < 1.0, f"1M _has_permission calls took {elapsed:.3f}s"

    async def test_invite_10_collabs_under_500ms(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        """Ten sequential invite calls finish within the enforced budget.

        NOTE(review): the test name says 500 ms but the assertion below allows
        1.5 s — confirm which budget is intended and align the two.
        """
        repo_id = await _api_repo(client, auth_headers)
        for i in range(10):
            await _db_identity(db_session, f"batch{i}")
        await db_session.commit()

        # Only the invite requests are timed; the setup above is excluded.
        start = time.perf_counter()
        for i in range(10):
            r = await client.post(
                f"/api/repos/{repo_id}/collaborators",
                json={"handle": f"batch{i}", "permission": "read"},
                headers=auth_headers,
            )
            assert r.status_code == 201
        elapsed = time.perf_counter() - start
        assert elapsed < 1.5, f"10 invite calls took {elapsed:.3f}s"