gabriel / musehub public

test_collaborators.py file-level

at sha256:3 · View file ↗ · Intel ↗

History
1 files
1 commits
0 hotspots
0 🧊 dead
0 💥 blast risk
sha256:0 fix: fall back to any indexed mpack in read_object_bytes when push mpac… · gabriel · Jun 17, 2026
1 """Section 27 — Collaborators & Permissions: 7-layer test suite.
2
3 Covers:
4 - musehub/api/routes/musehub/collaborators.py (CRUD + permission logic)
5 - musehub/db/musehub_collaborator_models.py (ORM model)
6 - repos.py _guard_admin / check_collaborator_access (permission gating)
7
8 Endpoints:
9 GET /api/repos/{repo_id}/collaborators
10 POST /api/repos/{repo_id}/collaborators
11 PUT /api/repos/{repo_id}/collaborators/{handle}/permission
12 DELETE /api/repos/{repo_id}/collaborators/{handle}
13 GET /api/repos/{repo_id}/collaborators/{username}/permission
14
15 Layer map
16 ---------
17 1. Unit — Permission enum, _PERMISSION_RANK, _has_permission, _orm_to_response
18 2. Integration — DB-level collaborator CRUD via session
19 3. E2E — HTTP client against full app
20 4. Stress — 50 collaborators, concurrent list calls
21 5. Data Integrity — permission stored correctly, invited_by set, unique constraint
22 6. Security — auth required, non-admin blocked, owner un-removable
23 7. Performance — timing budgets
24 """
25 from __future__ import annotations
26
27 import asyncio
28 import secrets
29 import time
30 from datetime import datetime, timezone
31
32 import pytest
33 from httpx import AsyncClient
34 from muse.core.types import fake_id
35 from musehub.core.genesis import compute_identity_id, compute_repo_id
36 from sqlalchemy import select
37 from sqlalchemy.ext.asyncio import AsyncSession
38
39 from musehub.types.json_types import StrDict
40 from musehub.api.routes.musehub.collaborators import (
41 Permission,
42 _PERMISSION_RANK,
43 _has_permission,
44 _orm_to_response,
45 )
46 from musehub.db.musehub_collaborator_models import MusehubCollaborator
47 from musehub.db.musehub_identity_models import MusehubIdentity
48 from musehub.db.musehub_repo_models import MusehubRepo
49
50
51 # ---------------------------------------------------------------------------
52 # Fixtures / helpers
53 # ---------------------------------------------------------------------------
54
# Handle used as the default repo owner by the helpers below and expected as
# the inviter ("invitedBy") in authenticated API responses.
_TEST_HANDLE = "testuser"  # matches auth_headers fixture's token.handle
56
57
def _uid() -> str:
    """Return a fresh random identifier: 16 random bytes as 32 lowercase hex chars."""
    raw = secrets.token_bytes(16)
    return raw.hex()
60
61
async def _db_repo(
    session: AsyncSession,
    owner: str = _TEST_HANDLE,
    *,
    visibility: str = "private",
) -> MusehubRepo:
    """Add a MusehubRepo row owned by *owner* to *session* and flush it.

    The slug (also used as the name) is ``repo-`` followed by the first eight
    characters of a random id.  The owner id is derived from the encoded owner
    handle via ``compute_identity_id`` and the repo id from
    ``compute_repo_id``.  The row is flushed but not committed; callers commit
    when they need it.
    """
    now = datetime.now(tz=timezone.utc)
    repo_slug = f"repo-{_uid()[:8]}"
    owner_identity = compute_identity_id(owner.encode())
    new_repo_id = compute_repo_id(owner_identity, repo_slug, "code", now.isoformat())

    row = MusehubRepo(
        repo_id=new_repo_id,
        name=repo_slug,
        slug=repo_slug,
        owner=owner,
        owner_user_id=owner_identity,
        visibility=visibility,
        created_at=now,
        updated_at=now,
    )
    session.add(row)
    await session.flush()
    return row
84
85
async def _db_collab(
    session: AsyncSession,
    repo_id: str,
    handle: str,
    *,
    permission: str = "write",
    invited_by: str | None = None,
    accepted: bool = False,
) -> MusehubCollaborator:
    """Add a MusehubCollaborator row for *handle* on *repo_id* and flush it.

    The row id is built with ``fake_id`` from the repo id and handle.
    ``accepted_at`` is set to the current UTC time only when *accepted* is
    true; otherwise it stays ``None``.  The row is flushed, not committed.
    """
    if accepted:
        accepted_timestamp = datetime.now(timezone.utc)
    else:
        accepted_timestamp = None

    row = MusehubCollaborator(
        id=fake_id(f"{repo_id}-{handle}"),
        repo_id=repo_id,
        identity_handle=handle,
        permission=permission,
        invited_by_handle=invited_by,
        accepted_at=accepted_timestamp,
    )
    session.add(row)
    await session.flush()
    return row
106
107
async def _db_identity(session: AsyncSession, handle: str) -> MusehubIdentity:
    """Add a "human" MusehubIdentity for *handle* to *session* and flush it.

    The identity id is a fresh random hex id and the display name is the
    title-cased handle.
    """
    fields = {
        "identity_id": _uid(),
        "handle": handle,
        "display_name": handle.title(),
        "identity_type": "human",
    }
    row = MusehubIdentity(**fields)
    session.add(row)
    await session.flush()
    return row
119
120
async def _api_repo(
    client: AsyncClient,
    auth_headers: StrDict,
    *,
    visibility: str = "private",
) -> str:
    """Create a repo through ``POST /api/repos`` and return its ``repoId``.

    The repo is owned by the test handle and gets a random ``collab-`` name.
    Fails the calling test if the API does not answer 201.
    """
    payload = {
        "name": f"collab-{_uid()[:8]}",
        "owner": _TEST_HANDLE,
        "visibility": visibility,
    }
    response = await client.post("/api/repos", json=payload, headers=auth_headers)
    assert response.status_code == 201, response.text
    created = response.json()
    return created["repoId"]
134
135
136 # ===========================================================================
137 # Layer 1 — Unit
138 # ===========================================================================
139
140
class TestUnitPermissionEnum:
    """Layer 1: pin the members of the Permission enum."""

    def test_values(self) -> None:
        """Every member compares equal to its lowercase string value."""
        expected_pairs = (
            (Permission.read, "read"),
            (Permission.write, "write"),
            (Permission.admin, "admin"),
            (Permission.owner, "owner"),
        )
        for member, literal in expected_pairs:
            assert member == literal

    def test_four_levels(self) -> None:
        """Iterating Permission yields exactly four members."""
        member_count = sum(1 for _ in Permission)
        assert member_count == 4
150
151
class TestUnitPermissionRank:
    """Layer 1: _PERMISSION_RANK orders read < write < admin < owner."""

    def test_read_is_lowest(self) -> None:
        """read ranks strictly below write."""
        read_rank = _PERMISSION_RANK["read"]
        write_rank = _PERMISSION_RANK["write"]
        assert read_rank < write_rank

    def test_write_lt_admin(self) -> None:
        """write ranks strictly below admin."""
        write_rank = _PERMISSION_RANK["write"]
        admin_rank = _PERMISSION_RANK["admin"]
        assert write_rank < admin_rank

    def test_admin_lt_owner(self) -> None:
        """admin ranks strictly below owner."""
        admin_rank = _PERMISSION_RANK["admin"]
        owner_rank = _PERMISSION_RANK["owner"]
        assert admin_rank < owner_rank

    def test_all_levels_covered(self) -> None:
        """Every Permission value has an entry in the rank table."""
        unranked = [
            level.value for level in Permission if level.value not in _PERMISSION_RANK
        ]
        assert not unranked
165
166
class TestUnitHasPermission:
    """Layer 1: _has_permission(held, required) follows the rank ordering."""

    def test_exact_match(self) -> None:
        """Holding exactly the required level is sufficient."""
        granted = _has_permission("write", Permission.write)
        assert granted is True

    def test_higher_grants_lower(self) -> None:
        """A higher level satisfies any lower requirement."""
        cases = (("admin", Permission.write), ("owner", Permission.read))
        for held, required in cases:
            assert _has_permission(held, required) is True

    def test_lower_denied_higher(self) -> None:
        """A lower level never satisfies a higher requirement."""
        cases = (("read", Permission.write), ("write", Permission.admin))
        for held, required in cases:
            assert _has_permission(held, required) is False

    def test_unknown_permission_denied(self) -> None:
        """Strings that are not known levels are denied even for read."""
        for held in ("", "superuser"):
            assert _has_permission(held, Permission.read) is False

    def test_read_satisfies_read(self) -> None:
        """The lowest level satisfies the lowest requirement."""
        granted = _has_permission("read", Permission.read)
        assert granted is True

    def test_owner_satisfies_admin(self) -> None:
        """owner satisfies an admin requirement."""
        granted = _has_permission("owner", Permission.admin)
        assert granted is True
188
189
class TestUnitOrmToResponse:
    """Layer 1: _orm_to_response copies ORM columns onto the response model."""

    async def test_fields_mapped_correctly(self, db_session: AsyncSession) -> None:
        """handle, permission, inviter, repo id and row id all carry over."""
        repo = await _db_repo(db_session)
        row = await _db_collab(
            db_session,
            repo.repo_id,
            "alice",
            permission="write",
            invited_by="bob",
        )

        converted = _orm_to_response(row)

        assert converted.handle == "alice"
        assert converted.permission == "write"
        assert converted.invited_by == "bob"
        assert converted.repo_id == repo.repo_id
        assert converted.collaborator_id == row.id

    async def test_invited_by_none_when_null(self, db_session: AsyncSession) -> None:
        """A row without an inviter yields invited_by=None."""
        repo = await _db_repo(db_session)
        row = await _db_collab(db_session, repo.repo_id, "carol", invited_by=None)

        converted = _orm_to_response(row)

        assert converted.invited_by is None
209
210
211 # ===========================================================================
212 # Layer 2 — Integration (DB-level)
213 # ===========================================================================
214
215
class TestIntegrationCollaboratorDB:
    """Layer 2: exercise MusehubCollaborator rows directly through the session."""

    async def test_insert_and_query(self, db_session: AsyncSession) -> None:
        """An inserted collaborator is returned by a repo-scoped SELECT."""
        repo = await _db_repo(db_session)
        # _db_collab already flushes, so no extra flush is needed before querying.
        await _db_collab(db_session, repo.repo_id, "alice", permission="admin")

        result = await db_session.execute(
            select(MusehubCollaborator).where(
                MusehubCollaborator.repo_id == repo.repo_id
            )
        )
        rows = result.scalars().all()
        assert len(rows) == 1
        assert rows[0].identity_handle == "alice"
        assert rows[0].permission == "admin"

    async def test_unique_constraint_on_repo_handle(
        self, db_session: AsyncSession
    ) -> None:
        """A second row for the same (repo, handle) is rejected on flush."""
        from sqlalchemy.exc import IntegrityError

        repo = await _db_repo(db_session)
        await _db_collab(db_session, repo.repo_id, "alice")

        dup = MusehubCollaborator(
            id=_uid(),
            repo_id=repo.repo_id,
            identity_handle="alice",
            permission="read",
        )
        db_session.add(dup)
        with pytest.raises(IntegrityError):
            await db_session.flush()

        # A failed flush leaves the session's transaction unusable until it is
        # rolled back; do it here so fixture teardown gets a usable session
        # rather than a PendingRollbackError.
        await db_session.rollback()

    async def test_delete_collaborator_directly(self, db_session: AsyncSession) -> None:
        """A collaborator deleted through the session is gone afterwards."""
        repo = await _db_repo(db_session)
        collab = await _db_collab(db_session, repo.repo_id, "alice")
        await db_session.commit()

        await db_session.delete(collab)
        await db_session.commit()

        result = await db_session.execute(
            select(MusehubCollaborator).where(
                MusehubCollaborator.repo_id == repo.repo_id
            )
        )
        assert result.scalars().first() is None

    async def test_accepted_at_null_by_default(self, db_session: AsyncSession) -> None:
        """A collaborator created without accepted=True has no acceptance time."""
        repo = await _db_repo(db_session)
        collab = await _db_collab(db_session, repo.repo_id, "dave")
        assert collab.accepted_at is None

    async def test_permission_default_write(self, db_session: AsyncSession) -> None:
        """A row inserted without an explicit permission reads back as "write"."""
        repo = await _db_repo(db_session)
        collab = MusehubCollaborator(
            id=_uid(),
            repo_id=repo.repo_id,
            identity_handle="eve",
        )
        db_session.add(collab)
        # The value is only asserted after the flush has sent the INSERT.
        await db_session.flush()
        assert collab.permission == "write"
282
283
284 # ===========================================================================
285 # Layer 3 — E2E
286 # ===========================================================================
287
288
class TestE2EListCollaborators:
    """Layer 3: GET /api/repos/{repo_id}/collaborators."""

    async def test_list_returns_200(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        """The owner gets a 200 with a collaborators list and matching total."""
        repo_id = await _api_repo(client, auth_headers)
        await _db_collab(db_session, repo_id, "alice")
        await db_session.commit()

        response = await client.get(
            f"/api/repos/{repo_id}/collaborators", headers=auth_headers
        )

        assert response.status_code == 200
        payload = response.json()
        for key in ("collaborators", "total"):
            assert key in payload
        assert payload["total"] == 1

    async def test_list_requires_auth(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """Without credentials the list endpoint answers 401."""
        repo = await _db_repo(db_session)
        await db_session.commit()

        response = await client.get(f"/api/repos/{repo.repo_id}/collaborators")

        assert response.status_code == 401

    async def test_list_unknown_repo_404(
        self, client: AsyncClient, auth_headers: StrDict
    ) -> None:
        """An unknown repo id answers 404."""
        response = await client.get(
            "/api/repos/no-such-repo/collaborators", headers=auth_headers
        )

        assert response.status_code == 404

    async def test_list_empty_repo(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        """A repo with no collaborator rows reports total 0."""
        repo_id = await _api_repo(client, auth_headers)
        await db_session.commit()

        response = await client.get(
            f"/api/repos/{repo_id}/collaborators", headers=auth_headers
        )

        assert response.status_code == 200
        payload = response.json()
        assert payload["total"] == 0
328
329
class TestE2EInviteCollaborator:
    """Layer 3: POST /api/repos/{repo_id}/collaborators."""

    async def test_owner_can_invite_201(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        """The repo owner can invite an existing identity and gets it echoed back."""
        repo_id = await _api_repo(client, auth_headers)
        await _db_identity(db_session, "alice")
        await db_session.commit()

        invite = {"handle": "alice", "permission": "write"}
        response = await client.post(
            f"/api/repos/{repo_id}/collaborators",
            json=invite,
            headers=auth_headers,
        )

        assert response.status_code == 201
        payload = response.json()
        assert payload["handle"] == "alice"
        assert payload["permission"] == "write"
        assert payload["invitedBy"] == _TEST_HANDLE

    async def test_invite_requires_auth(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """Without credentials an invite answers 401."""
        repo = await _db_repo(db_session)
        await db_session.commit()

        invite = {"handle": "bob", "permission": "read"}
        response = await client.post(
            f"/api/repos/{repo.repo_id}/collaborators",
            json=invite,
        )

        assert response.status_code == 401

    async def test_non_admin_gets_403(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        """testuser is not owner (alice owns repo) and has only 'write' — gets 403."""
        repo = await _db_repo(db_session, owner="alice")
        await _db_collab(db_session, repo.repo_id, _TEST_HANDLE, permission="write")
        await db_session.commit()

        invite = {"handle": "bob", "permission": "read"}
        response = await client.post(
            f"/api/repos/{repo.repo_id}/collaborators",
            json=invite,
            headers=auth_headers,
        )

        assert response.status_code == 403

    async def test_admin_collab_can_invite(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        """testuser has admin permission → can invite."""
        repo = await _db_repo(db_session, owner="alice")
        await _db_collab(db_session, repo.repo_id, _TEST_HANDLE, permission="admin")
        await _db_identity(db_session, "bob")
        await db_session.commit()

        invite = {"handle": "bob", "permission": "read"}
        response = await client.post(
            f"/api/repos/{repo.repo_id}/collaborators",
            json=invite,
            headers=auth_headers,
        )

        assert response.status_code == 201

    async def test_duplicate_invite_409(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        """Inviting the same handle twice answers 409 with an explanatory detail."""
        repo_id = await _api_repo(client, auth_headers)
        await _db_identity(db_session, "alice")
        await db_session.commit()

        url = f"/api/repos/{repo_id}/collaborators"
        invite = {"handle": "alice", "permission": "write"}

        first = await client.post(url, json=invite, headers=auth_headers)
        assert first.status_code == 201

        second = await client.post(url, json=invite, headers=auth_headers)
        assert second.status_code == 409
        assert "already a collaborator" in second.json()["detail"]

    async def test_invite_unknown_repo_404(
        self, client: AsyncClient, auth_headers: StrDict
    ) -> None:
        """Inviting into an unknown repo answers 404."""
        invite = {"handle": "alice", "permission": "write"}
        response = await client.post(
            "/api/repos/no-such-repo/collaborators",
            json=invite,
            headers=auth_headers,
        )

        assert response.status_code == 404

    async def test_default_permission_write(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        """An invite that omits the permission field is created as "write"."""
        repo_id = await _api_repo(client, auth_headers)
        await _db_identity(db_session, "alice")
        await db_session.commit()

        # no permission field → defaults to write
        invite = {"handle": "alice"}
        response = await client.post(
            f"/api/repos/{repo_id}/collaborators",
            json=invite,
            headers=auth_headers,
        )

        assert response.status_code == 201
        assert response.json()["permission"] == "write"
435
436
class TestE2EUpdatePermission:
    """Layer 3: PUT /api/repos/{repo_id}/collaborators/{handle}/permission."""

    async def test_owner_can_update_200(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        """The repo owner can raise a collaborator from read to admin."""
        repo_id = await _api_repo(client, auth_headers)
        await _db_collab(db_session, repo_id, "alice", permission="read")
        await db_session.commit()

        change = {"permission": "admin"}
        response = await client.put(
            f"/api/repos/{repo_id}/collaborators/alice/permission",
            json=change,
            headers=auth_headers,
        )

        assert response.status_code == 200
        assert response.json()["permission"] == "admin"

    async def test_update_requires_auth(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """Without credentials an update answers 401."""
        repo = await _db_repo(db_session)
        await _db_collab(db_session, repo.repo_id, "alice")
        await db_session.commit()

        change = {"permission": "admin"}
        response = await client.put(
            f"/api/repos/{repo.repo_id}/collaborators/alice/permission",
            json=change,
        )

        assert response.status_code == 401

    async def test_non_admin_gets_403(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        """A write-level collaborator cannot change someone else's permission."""
        repo = await _db_repo(db_session, owner="alice")
        await _db_collab(db_session, repo.repo_id, _TEST_HANDLE, permission="write")
        await _db_collab(db_session, repo.repo_id, "bob", permission="read")
        await db_session.commit()

        change = {"permission": "admin"}
        response = await client.put(
            f"/api/repos/{repo.repo_id}/collaborators/bob/permission",
            json=change,
            headers=auth_headers,
        )

        assert response.status_code == 403

    async def test_update_owner_permission_403(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        """Cannot change owner's permission via this endpoint."""
        repo = await _db_repo(db_session, owner="alice")
        # The caller (testuser) is an admin collaborator ...
        await _db_collab(db_session, repo.repo_id, _TEST_HANDLE, permission="admin")
        # ... and alice holds an 'owner' row in the collaborators table.
        await _db_collab(db_session, repo.repo_id, "alice", permission="owner")
        await db_session.commit()

        change = {"permission": "write"}
        response = await client.put(
            f"/api/repos/{repo.repo_id}/collaborators/alice/permission",
            json=change,
            headers=auth_headers,
        )

        assert response.status_code == 403
        assert "Owner permission" in response.json()["detail"]

    async def test_update_nonexistent_collab_404(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        """Updating a handle with no collaborator row answers 404."""
        repo_id = await _api_repo(client, auth_headers)
        await db_session.commit()

        change = {"permission": "read"}
        response = await client.put(
            f"/api/repos/{repo_id}/collaborators/nobody/permission",
            json=change,
            headers=auth_headers,
        )

        assert response.status_code == 404
512
513
class TestE2ERemoveCollaborator:
    """Layer 3: DELETE /api/repos/{repo_id}/collaborators/{handle}."""

    async def test_owner_can_remove_204(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        """The repo owner can remove a collaborator and gets 204."""
        repo_id = await _api_repo(client, auth_headers)
        await _db_collab(db_session, repo_id, "alice")
        await db_session.commit()

        url = f"/api/repos/{repo_id}/collaborators/alice"
        response = await client.delete(url, headers=auth_headers)

        assert response.status_code == 204

    async def test_remove_requires_auth(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """Without credentials a removal answers 401."""
        repo = await _db_repo(db_session)
        await _db_collab(db_session, repo.repo_id, "alice")
        await db_session.commit()

        url = f"/api/repos/{repo.repo_id}/collaborators/alice"
        response = await client.delete(url)

        assert response.status_code == 401

    async def test_non_admin_gets_403(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        """A write-level collaborator cannot remove another collaborator."""
        repo = await _db_repo(db_session, owner="alice")
        await _db_collab(db_session, repo.repo_id, _TEST_HANDLE, permission="write")
        await _db_collab(db_session, repo.repo_id, "bob", permission="read")
        await db_session.commit()

        url = f"/api/repos/{repo.repo_id}/collaborators/bob"
        response = await client.delete(url, headers=auth_headers)

        assert response.status_code == 403

    async def test_remove_owner_403(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        """Owner-permission collaborator cannot be removed."""
        repo = await _db_repo(db_session, owner="alice")
        await _db_collab(db_session, repo.repo_id, _TEST_HANDLE, permission="admin")
        await _db_collab(db_session, repo.repo_id, "alice", permission="owner")
        await db_session.commit()

        url = f"/api/repos/{repo.repo_id}/collaborators/alice"
        response = await client.delete(url, headers=auth_headers)

        assert response.status_code == 403
        assert "Owner cannot be removed" in response.json()["detail"]

    async def test_remove_nonexistent_404(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        """Removing a handle with no collaborator row answers 404."""
        repo_id = await _api_repo(client, auth_headers)
        await db_session.commit()

        url = f"/api/repos/{repo_id}/collaborators/nobody"
        response = await client.delete(url, headers=auth_headers)

        assert response.status_code == 404
577
578
class TestE2ECheckAccess:
    """Layer 3: GET /api/repos/{repo_id}/collaborators/{username}/permission."""

    async def test_owner_access_is_owner_permission(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        """The repo owner checking their own handle sees "owner"."""
        repo_id = await _api_repo(client, auth_headers)
        await db_session.commit()

        # testuser created the repo, so the handle being checked is the owner's.
        url = f"/api/repos/{repo_id}/collaborators/{_TEST_HANDLE}/permission"
        response = await client.get(url, headers=auth_headers)

        assert response.status_code == 200
        payload = response.json()
        assert payload["permission"] == "owner"

    async def test_collab_access_returns_permission(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        """Checking a collaborator returns the permission stored on their row."""
        repo_id = await _api_repo(client, auth_headers)
        await _db_collab(db_session, repo_id, "alice", permission="admin")
        await db_session.commit()

        url = f"/api/repos/{repo_id}/collaborators/alice/permission"
        response = await client.get(url, headers=auth_headers)

        assert response.status_code == 200
        assert response.json()["permission"] == "admin"

    async def test_non_collab_404(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        """Checking a handle with no collaborator row answers 404."""
        repo_id = await _api_repo(client, auth_headers)
        await db_session.commit()

        url = f"/api/repos/{repo_id}/collaborators/stranger/permission"
        response = await client.get(url, headers=auth_headers)

        assert response.status_code == 404

    async def test_check_requires_auth(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """Without credentials the permission check answers 401."""
        repo = await _db_repo(db_session)
        await db_session.commit()

        url = f"/api/repos/{repo.repo_id}/collaborators/{_TEST_HANDLE}/permission"
        response = await client.get(url)

        assert response.status_code == 401
631
632
633 # ===========================================================================
634 # Layer 4 — Stress
635 # ===========================================================================
636
637
class TestStress:
    """Layer 4: the list endpoint with more rows and with concurrent callers."""

    async def test_list_50_collaborators(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        """Fifty seeded collaborators are all counted in one list response."""
        repo_id = await _api_repo(client, auth_headers)
        for handle in (f"user{i}" for i in range(50)):
            await _db_collab(db_session, repo_id, handle, permission="read")
        await db_session.commit()

        response = await client.get(
            f"/api/repos/{repo_id}/collaborators", headers=auth_headers
        )

        assert response.status_code == 200
        assert response.json()["total"] == 50

    async def test_5_concurrent_list_calls(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        """Five list calls gathered together all succeed with the same total."""
        repo_id = await _api_repo(client, auth_headers)
        for handle in (f"stress{i}" for i in range(10)):
            await _db_collab(db_session, repo_id, handle)
        await db_session.commit()

        pending = [
            client.get(f"/api/repos/{repo_id}/collaborators", headers=auth_headers)
            for _ in range(5)
        ]
        responses = await asyncio.gather(*pending)

        # Check every status first, then every total, like two all(...) passes.
        for response in responses:
            assert response.status_code == 200
        for response in responses:
            assert response.json()["total"] == 10
669
670
671 # ===========================================================================
672 # Layer 5 — Data Integrity
673 # ===========================================================================
674
675
class TestDataIntegrity:
    """Layer 5: what the API reports matches what is stored in the database.

    Each test checks the status of the mutating request before inspecting the
    database, so a rejected request fails at the request rather than at a
    later row assertion.
    """

    async def test_invited_by_set_correctly(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        """The invite response names the authenticated caller as inviter."""
        repo_id = await _api_repo(client, auth_headers)
        await _db_identity(db_session, "alice")
        await db_session.commit()

        r = await client.post(
            f"/api/repos/{repo_id}/collaborators",
            json={"handle": "alice", "permission": "read"},
            headers=auth_headers,
        )
        assert r.status_code == 201
        assert r.json()["invitedBy"] == _TEST_HANDLE

    async def test_permission_persisted_correctly(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        """The permission sent in an invite is the one stored on the row."""
        repo_id = await _api_repo(client, auth_headers)
        await _db_identity(db_session, "alice")
        await db_session.commit()

        r = await client.post(
            f"/api/repos/{repo_id}/collaborators",
            json={"handle": "alice", "permission": "admin"},
            headers=auth_headers,
        )
        assert r.status_code == 201, r.text
        # Drop cached ORM state so the SELECT below reads current DB values.
        db_session.expire_all()

        row = (
            await db_session.execute(
                select(MusehubCollaborator).where(
                    MusehubCollaborator.repo_id == repo_id,
                    MusehubCollaborator.identity_handle == "alice",
                )
            )
        ).scalar_one_or_none()
        assert row is not None
        assert row.permission == "admin"

    async def test_update_persisted_in_db(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        """A permission update through the API is visible on the stored row."""
        repo_id = await _api_repo(client, auth_headers)
        await _db_collab(db_session, repo_id, "alice", permission="read")
        await db_session.commit()

        r = await client.put(
            f"/api/repos/{repo_id}/collaborators/alice/permission",
            json={"permission": "admin"},
            headers=auth_headers,
        )
        assert r.status_code == 200, r.text
        # Drop cached ORM state so the SELECT below reads current DB values.
        db_session.expire_all()

        row = (
            await db_session.execute(
                select(MusehubCollaborator).where(
                    MusehubCollaborator.repo_id == repo_id,
                    MusehubCollaborator.identity_handle == "alice",
                )
            )
        ).scalar_one_or_none()
        assert row is not None
        assert row.permission == "admin"

    async def test_remove_deletes_db_row(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        """Removing a collaborator through the API deletes the stored row."""
        repo_id = await _api_repo(client, auth_headers)
        await _db_collab(db_session, repo_id, "alice")
        await db_session.commit()

        r = await client.delete(
            f"/api/repos/{repo_id}/collaborators/alice", headers=auth_headers
        )
        assert r.status_code == 204, r.text
        # Drop cached ORM state so the SELECT below reads current DB values.
        db_session.expire_all()

        row = (
            await db_session.execute(
                select(MusehubCollaborator).where(
                    MusehubCollaborator.repo_id == repo_id,
                    MusehubCollaborator.identity_handle == "alice",
                )
            )
        ).scalar_one_or_none()
        assert row is None

    async def test_response_total_matches_actual_count(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        """The reported total equals both the list length and the seeded count."""
        seeded = 7
        repo_id = await _api_repo(client, auth_headers)
        for i in range(seeded):
            await _db_collab(db_session, repo_id, f"u{i}")
        await db_session.commit()

        r = await client.get(f"/api/repos/{repo_id}/collaborators", headers=auth_headers)
        assert r.status_code == 200, r.text
        body = r.json()
        assert body["total"] == len(body["collaborators"])
        # Internal consistency alone would also pass for an empty list.
        assert body["total"] == seeded
775
776
777 # ===========================================================================
778 # Layer 6 — Security
779 # ===========================================================================
780
781
class TestSecurity:
    """Layer 6: authentication and permission gating on every endpoint."""

    async def test_all_endpoints_require_auth(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """Each of the five collaborator endpoints answers 401 without credentials."""
        repo = await _db_repo(db_session)
        await _db_collab(db_session, repo.repo_id, "alice")
        await db_session.commit()

        base = f"/api/repos/{repo.repo_id}/collaborators"
        endpoints = [
            ("GET", base),
            ("POST", base),
            ("PUT", f"{base}/alice/permission"),
            ("DELETE", f"{base}/alice"),
            # Permission-check endpoint, so the loop covers all five routes.
            ("GET", f"{base}/alice/permission"),
        ]
        for method, url in endpoints:
            r = await client.request(method, url, json={"handle": "x", "permission": "read"})
            assert r.status_code == 401, f"{method} {url} should require auth"

    async def test_read_only_collab_cannot_invite(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        """A read-level collaborator gets 403 when inviting."""
        repo = await _db_repo(db_session, owner="alice")
        await _db_collab(db_session, repo.repo_id, _TEST_HANDLE, permission="read")
        await db_session.commit()

        r = await client.post(
            f"/api/repos/{repo.repo_id}/collaborators",
            json={"handle": "bob", "permission": "read"},
            headers=auth_headers,
        )
        assert r.status_code == 403

    async def test_write_collab_cannot_remove(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        """A write-level collaborator gets 403 when removing someone."""
        repo = await _db_repo(db_session, owner="alice")
        await _db_collab(db_session, repo.repo_id, _TEST_HANDLE, permission="write")
        await _db_collab(db_session, repo.repo_id, "bob", permission="read")
        await db_session.commit()

        r = await client.delete(
            f"/api/repos/{repo.repo_id}/collaborators/bob", headers=auth_headers
        )
        assert r.status_code == 403

    async def test_owner_permission_cannot_be_updated(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        """An admin collaborator gets 403 when changing an owner-level row."""
        repo = await _db_repo(db_session, owner="alice")
        await _db_collab(db_session, repo.repo_id, _TEST_HANDLE, permission="admin")
        await _db_collab(db_session, repo.repo_id, "alice", permission="owner")
        await db_session.commit()

        r = await client.put(
            f"/api/repos/{repo.repo_id}/collaborators/alice/permission",
            json={"permission": "read"},
            headers=auth_headers,
        )
        assert r.status_code == 403

    async def test_owner_collab_cannot_be_removed(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        """An admin collaborator gets 403 when removing an owner-level row."""
        repo = await _db_repo(db_session, owner="alice")
        await _db_collab(db_session, repo.repo_id, _TEST_HANDLE, permission="admin")
        await _db_collab(db_session, repo.repo_id, "alice", permission="owner")
        await db_session.commit()

        r = await client.delete(
            f"/api/repos/{repo.repo_id}/collaborators/alice", headers=auth_headers
        )
        assert r.status_code == 403

    async def test_check_access_requires_auth(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """The permission-check endpoint answers 401 without credentials."""
        repo = await _db_repo(db_session)
        await db_session.commit()

        r = await client.get(
            f"/api/repos/{repo.repo_id}/collaborators/alice/permission"
        )
        assert r.status_code == 401

    async def test_non_admin_cannot_update_permissions(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        """A write-level collaborator gets 403 when changing a permission."""
        repo = await _db_repo(db_session, owner="alice")
        await _db_collab(db_session, repo.repo_id, _TEST_HANDLE, permission="write")
        await _db_collab(db_session, repo.repo_id, "bob", permission="read")
        await db_session.commit()

        r = await client.put(
            f"/api/repos/{repo.repo_id}/collaborators/bob/permission",
            json={"permission": "admin"},
            headers=auth_headers,
        )
        assert r.status_code == 403
880
881
882 # ===========================================================================
883 # Layer 7 — Performance
884 # ===========================================================================
885
886
class TestPerformance:
    """Layer 7: wall-clock budgets measured with time.perf_counter()."""

    async def test_list_20_collaborators_under_100ms(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        """Listing 20 collaborators completes within 0.1 seconds."""
        repo_id = await _api_repo(client, auth_headers)
        for handle in (f"perf{i}" for i in range(20)):
            await _db_collab(db_session, repo_id, handle)
        await db_session.commit()

        # Only the GET itself is inside the timed region.
        began = time.perf_counter()
        r = await client.get(f"/api/repos/{repo_id}/collaborators", headers=auth_headers)
        elapsed = time.perf_counter() - began

        assert r.status_code == 200
        assert elapsed < 0.1, f"list collaborators took {elapsed:.3f}s"

    def test_has_permission_1m_calls_fast(self) -> None:
        """One million _has_permission calls complete within one second."""
        began = time.perf_counter()
        for _ in range(1_000_000):
            _has_permission("admin", Permission.write)
        elapsed = time.perf_counter() - began

        assert elapsed < 1.0, f"1M _has_permission calls took {elapsed:.3f}s"

    async def test_invite_10_collabs_under_500ms(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        """Ten sequential invites each return 201 and finish within the budget.

        NOTE(review): the test name says 500ms but the asserted budget is
        1.5 seconds — confirm which one is intended.
        """
        repo_id = await _api_repo(client, auth_headers)
        for handle in (f"batch{i}" for i in range(10)):
            await _db_identity(db_session, handle)
        await db_session.commit()

        # The status assertions run inside the timed region.
        began = time.perf_counter()
        for i in range(10):
            r = await client.post(
                f"/api/repos/{repo_id}/collaborators",
                json={"handle": f"batch{i}", "permission": "read"},
                headers=auth_headers,
            )
            assert r.status_code == 201
        elapsed = time.perf_counter() - began

        assert elapsed < 1.5, f"10 invite calls took {elapsed:.3f}s"