gabriel / musehub public

test_repository_service.py file-level

at sha256:3 · View file ↗ · Intel ↗

History
1 files
1 commits
0 hotspots
0 🧊 dead
0 πŸ’₯ blast risk
sha256:0 fix: fall back to any indexed mpack in read_object_bytes when push mpac… · gabriel · Jun 17, 2026
1 """Supplemental tests for the Repository Service β€” Section 4.
2
3 This file fills the gaps left by the existing test_musehub_repos.py (170 tests).
4 It does NOT duplicate what is already covered there. Focus:
5
6 Coverage layers
7 ───────────────
8 Unit β€” _generate_slug (all edge cases), _guard_visibility, _guard_owner
9 as pure-function unit tests; resolve_head_ref logic;
10 list_branches_with_detail ahead/behind computation.
11 Integration β€” get_repo_home_stats (commit counts, 14-day activity array, file
12 count from snapshot); get_recently_pushed_branches; collaborator
13 repos appearing in list_repos_for_user; template copy (private
14 template silently skipped); transfer on soft-deleted repo β†’ None.
15 E2E β€” GET /api/repos/{repo_id}/stats; GET /api/repos/{repo_id}/branches/detail;
16 GET /api/repos/{repo_id}/snapshots/{snapshot_id};
17 private repo branches/commits β†’ 401 without auth;
18 invalid owner pattern β†’ 422; stats on private repo β†’ 401.
19 Stress β€” Create and list 50 repos; cursor pagination through 100 repos;
20 200-commit history paging.
21 Data β€” Soft-delete preserves data in DB; double soft-delete is idempotent;
22 get_repo skips soft-deleted rows; transfer on deleted repo β†’ None;
23 duplicate (owner, slug) β†’ 409 on HTTP, IntegrityError at service level.
24 Security β€” Invalid owner pattern (spaces, uppercase, leading hyphen) β†’ 422;
25 private branches endpoint β†’ 401; private commits endpoint β†’ 401;
26 private stats endpoint β†’ 401; non-owner delete β†’ 403;
27 non-owner transfer β†’ 403.
28 Performance β€” _generate_slug 1 000 calls < 100 ms; list_repos_for_user 50 repos
29 < 500 ms; get_repo_home_stats with 200 commits < 500 ms;
30 list_commits 200-row page < 200 ms.
31 """
32 from __future__ import annotations
33
34 import secrets
35 import time
36 from datetime import datetime, timezone
37 from pathlib import Path
38
39 import pytest
40 from httpx import AsyncClient
41 from sqlalchemy.ext.asyncio import AsyncSession
42
43 from musehub.core.genesis import compute_branch_id, compute_collaborator_id, compute_identity_id, compute_repo_id
44 from musehub.db.musehub_repo_models import MusehubBranch, MusehubObject, MusehubObjectRef, MusehubRepo, MusehubSnapshot, MusehubSnapshotRef
45 from musehub.models.musehub import RepoResponse
46 from tests.factories import create_repo, create_branch, create_commit
47 from musehub.types.json_types import StrDict
48
49
50 # ─────────────────────────────────────────────────────────────────────────────
51 # Layer 1 β€” Unit: pure functions (no DB, no HTTP)
52 # ─────────────────────────────────────────────────────────────────────────────
53
54 class TestGenerateSlug:
55 """_generate_slug must produce valid URL-safe slugs from arbitrary names."""
56
57 def _slug(self, name: str) -> str:
58 from musehub.services.musehub_repository import _generate_slug
59 return _generate_slug(name)
60
61 def test_lowercase(self) -> None:
62 assert self._slug("Neo Soul Experiment") == "neo-soul-experiment"
63
64 def test_special_chars_collapsed_to_hyphens(self) -> None:
65 assert self._slug("jazz & blues / 2024") == "jazz-blues-2024"
66
67 def test_leading_trailing_hyphens_stripped(self) -> None:
68 assert self._slug("---beats---") == "beats"
69
70 def test_all_symbols_falls_back_to_repo(self) -> None:
71 assert self._slug("!!!@@@###") == "repo"
72
73 def test_empty_string_falls_back_to_repo(self) -> None:
74 assert self._slug("") == "repo"
75
76 def test_max_64_chars(self) -> None:
77 long_name = "a" * 100
78 result = self._slug(long_name)
79 assert len(result) <= 64
80
81 def test_truncation_does_not_leave_trailing_hyphen(self) -> None:
82 # Name that would produce a hyphen right at position 64
83 name = "a" * 63 + "-b" * 10
84 result = self._slug(name)
85 assert not result.endswith("-")
86 assert len(result) <= 64
87
88 def test_numbers_preserved(self) -> None:
89 assert self._slug("track-01") == "track-01"
90
91 def test_consecutive_special_chars_single_hyphen(self) -> None:
92 assert self._slug("a -- b") == "a-b"
93
94 def test_unicode_non_ascii_collapsed(self) -> None:
95 result = self._slug("cafΓ©")
96 # "cafΓ©" β†’ "caf-" β†’ "caf" (stripped) or similar β€” must be alphanumeric+hyphen only
97 assert all(c.isascii() and (c.isalnum() or c == "-") for c in result)
98
99
100 class TestGuardVisibility:
101 """_guard_visibility raises correct HTTP exceptions."""
102
103 def test_raises_404_when_repo_is_none(self) -> None:
104 from fastapi import HTTPException
105 from musehub.api.routes.musehub.repos import _guard_visibility
106 with pytest.raises(HTTPException) as exc_info:
107 _guard_visibility(None, None)
108 assert exc_info.value.status_code == 404
109
110 def test_raises_401_for_private_repo_without_auth(self) -> None:
111 from fastapi import HTTPException
112 from musehub.api.routes.musehub.repos import _guard_visibility
113 from musehub.models.musehub import RepoResponse
114 from datetime import datetime, timezone
115
116 _alice_id = compute_identity_id(b"alice")
117 _ts = datetime.now(tz=timezone.utc)
118 repo = RepoResponse(
119 repo_id=compute_repo_id(_alice_id, "secret", "code", _ts.isoformat()),
120 name="secret",
121 owner="alice",
122 slug="secret",
123 visibility="private",
124 owner_user_id=_alice_id,
125 description="",
126 tags=[],
127 clone_url="musehub://alice/secret",
128 created_at=_ts,
129 updated_at=_ts,
130 default_branch="main",
131 )
132 with pytest.raises(HTTPException) as exc_info:
133 _guard_visibility(repo, None)
134 assert exc_info.value.status_code == 401
135
136 def test_no_raise_for_public_repo_without_auth(self) -> None:
137 from musehub.api.routes.musehub.repos import _guard_visibility
138 from musehub.models.musehub import RepoResponse
139
140 _alice_id2 = compute_identity_id(b"alice")
141 _ts2 = datetime.now(tz=timezone.utc)
142 repo = RepoResponse(
143 repo_id=compute_repo_id(_alice_id2, "open", "code", _ts2.isoformat()),
144 name="open",
145 owner="alice",
146 slug="open",
147 visibility="public",
148 owner_user_id=_alice_id2,
149 description="",
150 tags=[],
151 clone_url="musehub://alice/open",
152 created_at=_ts2,
153 updated_at=_ts2,
154 default_branch="main",
155 )
156 _guard_visibility(repo, None) # must not raise
157
158
159 class TestGuardOwner:
160 """_guard_owner raises correct HTTP exceptions."""
161
162 def _repo(self, owner: str = "alice") -> RepoResponse:
163 _owner_id = compute_identity_id(owner.encode())
164 _ts = datetime.now(tz=timezone.utc)
165 return RepoResponse(
166 repo_id=compute_repo_id(_owner_id, "r", "code", _ts.isoformat()),
167 name="r",
168 owner=owner,
169 slug="r",
170 visibility="public",
171 owner_user_id=_owner_id,
172 description="",
173 tags=[],
174 clone_url=f"musehub://{owner}/r",
175 created_at=_ts,
176 updated_at=_ts,
177 default_branch="main",
178 )
179
180 def test_raises_404_when_repo_is_none(self) -> None:
181 from fastapi import HTTPException
182 from musehub.api.routes.musehub.repos import _guard_owner
183 with pytest.raises(HTTPException) as exc_info:
184 _guard_owner(None, "alice")
185 assert exc_info.value.status_code == 404
186
187 def test_raises_403_for_non_owner(self) -> None:
188 from fastapi import HTTPException
189 from musehub.api.routes.musehub.repos import _guard_owner
190 with pytest.raises(HTTPException) as exc_info:
191 _guard_owner(self._repo("alice"), "bob")
192 assert exc_info.value.status_code == 403
193
194 def test_no_raise_for_owner(self) -> None:
195 from musehub.api.routes.musehub.repos import _guard_owner
196 _guard_owner(self._repo("alice"), "alice") # must not raise
197
198
199 class TestResolveHeadRef:
200 """resolve_head_ref prefers 'main', falls back to first alphabetically."""
201
202 @pytest.mark.asyncio
203 async def test_empty_repo_returns_main(self, db_session: AsyncSession) -> None:
204 from musehub.services import musehub_repository
205 repo = await create_repo(db_session, slug="rhr-empty")
206 result = await musehub_repository.resolve_head_ref(db_session, repo.repo_id)
207 assert result == "main"
208
209 @pytest.mark.asyncio
210 async def test_prefers_main_branch(self, db_session: AsyncSession) -> None:
211 from musehub.services import musehub_repository
212 repo = await create_repo(db_session, slug="rhr-main")
213 await create_branch(db_session, repo.repo_id, name="dev")
214 await create_branch(db_session, repo.repo_id, name="main")
215 result = await musehub_repository.resolve_head_ref(db_session, repo.repo_id)
216 assert result == "main"
217
218 @pytest.mark.asyncio
219 async def test_falls_back_to_first_alpha_when_no_main(
220 self, db_session: AsyncSession
221 ) -> None:
222 from musehub.services import musehub_repository
223 repo = await create_repo(db_session, slug="rhr-alpha")
224 await create_branch(db_session, repo.repo_id, name="dev")
225 await create_branch(db_session, repo.repo_id, name="alpha")
226 result = await musehub_repository.resolve_head_ref(db_session, repo.repo_id)
227 assert result == "alpha" # first alphabetically
228
229
230 class TestListBranchesWithDetail:
231 """list_branches_with_detail computes ahead/behind counts correctly."""
232
233 @pytest.mark.asyncio
234 async def test_empty_repo_returns_empty(self, db_session: AsyncSession) -> None:
235 from musehub.services import musehub_repository
236 repo = await create_repo(db_session, slug="bwd-empty")
237 result = await musehub_repository.list_branches_with_detail(db_session, repo.repo_id)
238 assert result.branches == []
239
240 @pytest.mark.asyncio
241 async def test_default_branch_has_zero_ahead_behind(
242 self, db_session: AsyncSession
243 ) -> None:
244 from musehub.services import musehub_repository
245 repo = await create_repo(db_session, slug="bwd-default")
246 await create_branch(db_session, repo.repo_id, name="main")
247 await create_commit(db_session, repo.repo_id, branch="main")
248
249 result = await musehub_repository.list_branches_with_detail(db_session, repo.repo_id)
250 main_detail = next(b for b in result.branches if b.name == "main")
251 assert main_detail.is_default is True
252 assert main_detail.ahead_count == 0
253 assert main_detail.behind_count == 0
254
255 @pytest.mark.asyncio
256 async def test_feature_branch_ahead_count(self, db_session: AsyncSession) -> None:
257 from musehub.services import musehub_repository
258 repo = await create_repo(db_session, slug="bwd-ahead")
259 await create_branch(db_session, repo.repo_id, name="main")
260 await create_branch(db_session, repo.repo_id, name="feat")
261 # 1 commit on main, 3 on feat
262 await create_commit(db_session, repo.repo_id, branch="main")
263 for _ in range(3):
264 await create_commit(db_session, repo.repo_id, branch="feat")
265
266 result = await musehub_repository.list_branches_with_detail(db_session, repo.repo_id)
267 feat = next(b for b in result.branches if b.name == "feat")
268 # feat has 3 commits not in main β†’ ahead=3; main has 1 commit not in feat β†’ behind=1
269 assert feat.ahead_count == 3
270 assert feat.behind_count == 1
271
272
273 # ─────────────────────────────────────────────────────────────────────────────
274 # Layer 2 β€” Integration: service layer with real DB
275 # ─────────────────────────────────────────────────────────────────────────────
276
277 class TestGetRepoHomeStats:
278 @pytest.mark.asyncio
279 async def test_empty_repo_returns_zeros(self, db_session: AsyncSession) -> None:
280 from musehub.services import musehub_repository
281 repo = await create_repo(db_session, slug="stats-empty")
282 stats = await musehub_repository.get_repo_home_stats(db_session, repo.repo_id, "main")
283 assert stats["total_commits"] == 0
284 assert stats["total_objects"] == 0
285 assert stats["total_size_bytes"] == 0
286 assert stats["commit_activity"] == [0] * 14
287
288 @pytest.mark.asyncio
289 async def test_commit_count_reflects_actual_commits(
290 self, db_session: AsyncSession
291 ) -> None:
292 from musehub.services import musehub_repository
293 repo = await create_repo(db_session, slug="stats-commits")
294 for _ in range(5):
295 await create_commit(db_session, repo.repo_id, branch="main")
296
297 stats = await musehub_repository.get_repo_home_stats(db_session, repo.repo_id, "main")
298 assert stats["total_commits"] == 5
299
300 @pytest.mark.asyncio
301 async def test_activity_array_has_14_entries(self, db_session: AsyncSession) -> None:
302 from musehub.services import musehub_repository
303 repo = await create_repo(db_session, slug="stats-activity")
304 stats = await musehub_repository.get_repo_home_stats(db_session, repo.repo_id, "main")
305 assert len(stats["commit_activity"]) == 14
306
307 @pytest.mark.asyncio
308 async def test_object_count_and_size_bytes(
309 self, db_session: AsyncSession, tmp_path: Path
310 ) -> None:
311 from musehub.services import musehub_repository
312 repo = await create_repo(db_session, slug="stats-objects")
313 for i in range(3):
314 oid = f"sha256:stats{i}"
315 obj = MusehubObject(
316 object_id=oid,
317 path=f"f{i}.bin",
318 size_bytes=100,
319 )
320 db_session.add(obj)
321 db_session.add(MusehubObjectRef(repo_id=repo.repo_id, object_id=oid))
322 await db_session.commit()
323
324 stats = await musehub_repository.get_repo_home_stats(db_session, repo.repo_id, "main")
325 assert stats["total_objects"] == 3
326 assert stats["total_size_bytes"] == 300
327
328
329 class TestGetRecentlyPushedBranches:
330 @pytest.mark.asyncio
331 async def test_no_recent_branches_returns_empty(
332 self, db_session: AsyncSession
333 ) -> None:
334 from musehub.services import musehub_repository
335 repo = await create_repo(db_session, slug="recent-empty")
336 result = await musehub_repository.get_recently_pushed_branches(
337 db_session, repo.repo_id, "main"
338 )
339 assert result == []
340
341 @pytest.mark.asyncio
342 async def test_current_ref_excluded(self, db_session: AsyncSession) -> None:
343 from musehub.services import musehub_repository
344 repo = await create_repo(db_session, slug="recent-exclude")
345 commit = await create_commit(db_session, repo.repo_id, branch="main")
346 await create_branch(db_session, repo.repo_id, name="main",
347 head_commit_id=commit.commit_id)
348 result = await musehub_repository.get_recently_pushed_branches(
349 db_session, repo.repo_id, "main"
350 )
351 assert all(b["name"] != "main" for b in result)
352
353 @pytest.mark.asyncio
354 async def test_recent_branch_appears(self, db_session: AsyncSession) -> None:
355 from musehub.services import musehub_repository
356 repo = await create_repo(db_session, slug="recent-feat")
357 commit = await create_commit(db_session, repo.repo_id, branch="feat")
358 feat = MusehubBranch(
359 branch_id=compute_branch_id(repo.repo_id, "feat"),
360 repo_id=repo.repo_id,
361 name="feat",
362 head_commit_id=commit.commit_id,
363 )
364 db_session.add(feat)
365 await db_session.commit()
366
367 result = await musehub_repository.get_recently_pushed_branches(
368 db_session, repo.repo_id, "main", within_hours=72
369 )
370 assert any(b["name"] == "feat" for b in result)
371
372
373 class TestListReposForUserWithCollaborators:
374 @pytest.mark.asyncio
375 async def test_collab_repos_included_in_list(
376 self, db_session: AsyncSession
377 ) -> None:
378 from musehub.services import musehub_repository
379 from musehub.db.musehub_collaborator_models import MusehubCollaborator
380
381 owner_repo = await create_repo(db_session, slug="collab-owned",
382 owner="alice", owner_user_id=compute_identity_id(b"alice"))
383 other_repo = await create_repo(db_session, slug="collab-shared",
384 owner="bob", owner_user_id=compute_identity_id(b"bob"))
385 # alice is an accepted collaborator on bob's repo
386 _accepted_at = datetime.now(tz=timezone.utc)
387 collab = MusehubCollaborator(
388 id=compute_collaborator_id(other_repo.repo_id, compute_identity_id(b"alice"), _accepted_at.isoformat()),
389 repo_id=other_repo.repo_id,
390 identity_handle="alice",
391 permission="read",
392 accepted_at=_accepted_at,
393 )
394 db_session.add(collab)
395 await db_session.commit()
396
397 result = await musehub_repository.list_repos_for_user(db_session, "alice")
398 repo_ids = [r.repo_id for r in result.repos]
399 assert owner_repo.repo_id in repo_ids
400 assert other_repo.repo_id in repo_ids
401
402 @pytest.mark.asyncio
403 async def test_unaccepted_collab_not_included(
404 self, db_session: AsyncSession
405 ) -> None:
406 from musehub.services import musehub_repository
407 from musehub.db.musehub_collaborator_models import MusehubCollaborator
408
409 other_repo = await create_repo(db_session, slug="collab-pending",
410 owner="carol", owner_user_id=compute_identity_id(b"carol"))
411 _invited_at = datetime.now(tz=timezone.utc)
412 collab = MusehubCollaborator(
413 id=compute_collaborator_id(other_repo.repo_id, compute_identity_id(b"dave"), _invited_at.isoformat()),
414 repo_id=other_repo.repo_id,
415 identity_handle="dave",
416 permission="read",
417 accepted_at=None, # invitation not yet accepted
418 )
419 db_session.add(collab)
420 await db_session.commit()
421
422 result = await musehub_repository.list_repos_for_user(db_session, "dave")
423 assert all(r.repo_id != other_repo.repo_id for r in result.repos)
424
425
426 class TestTemplateRepoCopy:
427 @pytest.mark.asyncio
428 async def test_private_template_not_copied(self, db_session: AsyncSession) -> None:
429 from musehub.services import musehub_repository
430 tmpl = await create_repo(db_session, slug="tmpl-priv",
431 visibility="private", owner="alice",
432 owner_user_id=compute_identity_id(b"alice"))
433 # Give template a description
434 tmpl_row = await db_session.get(MusehubRepo, tmpl.repo_id)
435 assert tmpl_row is not None
436 tmpl_row.description = "Private description"
437 await db_session.commit()
438
439 new_repo = await musehub_repository.create_repo(
440 db_session,
441 name="my-new-repo",
442 owner="bob",
443 visibility="public",
444 owner_user_id=compute_identity_id(b"bob"),
445 template_repo_id=tmpl.repo_id,
446 )
447 await db_session.commit()
448 assert new_repo.description == "" # private template not applied
449
450
451 # ─────────────────────────────────────────────────────────────────────────────
452 # Layer 3 β€” E2E: HTTP endpoints not covered in test_musehub_repos.py
453 # ─────────────────────────────────────────────────────────────────────────────
454
455 class TestRepoStatsEndpoint:
456 @pytest.mark.asyncio
457 async def test_empty_repo_returns_zero_counts(
458 self, client: AsyncClient, db_session: AsyncSession
459 ) -> None:
460 repo = await create_repo(db_session, slug="e2e-stats-empty", visibility="public")
461 resp = await client.get(f"/api/repos/{repo.repo_id}/stats")
462 assert resp.status_code == 200
463 body = resp.json()
464 assert body["commitCount"] == 0
465 assert body["branchCount"] == 0
466 assert body["releaseCount"] == 0
467
468 @pytest.mark.asyncio
469 async def test_counts_reflect_data(
470 self, client: AsyncClient, db_session: AsyncSession
471 ) -> None:
472 repo = await create_repo(db_session, slug="e2e-stats-data", visibility="public")
473 await create_branch(db_session, repo.repo_id, name="main")
474 await create_branch(db_session, repo.repo_id, name="dev")
475 await create_commit(db_session, repo.repo_id, branch="main")
476
477 resp = await client.get(f"/api/repos/{repo.repo_id}/stats")
478 assert resp.status_code == 200
479 body = resp.json()
480 assert body["commitCount"] == 1
481 assert body["branchCount"] == 2
482
483 @pytest.mark.asyncio
484 async def test_unknown_repo_returns_404(
485 self, client: AsyncClient, db_session: AsyncSession
486 ) -> None:
487 resp = await client.get(f"/api/repos/{secrets.token_hex(16)}/stats")
488 assert resp.status_code == 404
489
490 @pytest.mark.asyncio
491 async def test_private_repo_without_auth_returns_401(
492 self, client: AsyncClient, db_session: AsyncSession
493 ) -> None:
494 repo = await create_repo(db_session, slug="e2e-stats-priv", visibility="private")
495 resp = await client.get(f"/api/repos/{repo.repo_id}/stats")
496 assert resp.status_code == 401
497
498
499 class TestBranchDetailEndpoint:
500 @pytest.mark.asyncio
501 async def test_returns_branch_list_with_detail(
502 self, client: AsyncClient, db_session: AsyncSession
503 ) -> None:
504 repo = await create_repo(db_session, slug="e2e-bwd-ok", visibility="public")
505 await create_branch(db_session, repo.repo_id, name="main")
506 await create_commit(db_session, repo.repo_id, branch="main")
507
508 resp = await client.get(f"/api/repos/{repo.repo_id}/branches/detail")
509 assert resp.status_code == 200
510 body = resp.json()
511 assert "branches" in body
512 assert "defaultBranch" in body
513 assert len(body["branches"]) == 1
514 branch = body["branches"][0]
515 assert branch["name"] == "main"
516 assert branch["isDefault"] is True
517 assert branch["aheadCount"] == 0
518 assert branch["behindCount"] == 0
519
520 @pytest.mark.asyncio
521 async def test_unknown_repo_returns_404(
522 self, client: AsyncClient, db_session: AsyncSession
523 ) -> None:
524 resp = await client.get(f"/api/repos/{secrets.token_hex(16)}/branches/detail")
525 assert resp.status_code == 404
526
527 @pytest.mark.asyncio
528 async def test_private_repo_without_auth_returns_401(
529 self, client: AsyncClient, db_session: AsyncSession
530 ) -> None:
531 repo = await create_repo(db_session, slug="e2e-bwd-priv", visibility="private")
532 resp = await client.get(f"/api/repos/{repo.repo_id}/branches/detail")
533 assert resp.status_code == 401
534
535
536 class TestSnapshotManifestEndpoint:
537 @pytest.mark.asyncio
538 async def test_returns_manifest(
539 self, client: AsyncClient, db_session: AsyncSession
540 ) -> None:
541 import msgpack
542 repo = await create_repo(db_session, slug="e2e-snap-ok", visibility="public")
543 snap_id = f"snap-{secrets.token_hex(4)}"
544 manifest = {"main.py": "sha256:abc"}
545 manifest_blob = msgpack.packb(manifest, use_bin_type=True)
546 snap = MusehubSnapshot(
547 snapshot_id=snap_id,
548 manifest_blob=manifest_blob,
549 entry_count=1,
550 )
551 db_session.add(snap)
552 db_session.add(MusehubSnapshotRef(repo_id=repo.repo_id, snapshot_id=snap_id))
553 await db_session.commit()
554
555 resp = await client.get(f"/api/repos/{repo.repo_id}/snapshots/{snap_id}")
556 assert resp.status_code == 200
557 body = resp.json()
558 assert body["snapshotId"] == snap_id
559 entry_paths = [e["path"] for e in body.get("entries", [])]
560 assert "main.py" in entry_paths
561
562 @pytest.mark.asyncio
563 async def test_unknown_snapshot_returns_404(
564 self, client: AsyncClient, db_session: AsyncSession
565 ) -> None:
566 repo = await create_repo(db_session, slug="e2e-snap-404", visibility="public")
567 resp = await client.get(f"/api/repos/{repo.repo_id}/snapshots/ghost-snap")
568 assert resp.status_code == 404
569
570
571 class TestPrivateRepoBranchesAndCommits:
572 @pytest.mark.asyncio
573 async def test_private_repo_branches_without_auth_returns_401(
574 self, client: AsyncClient, db_session: AsyncSession
575 ) -> None:
576 repo = await create_repo(db_session, slug="e2e-priv-branches", visibility="private")
577 resp = await client.get(f"/api/repos/{repo.repo_id}/branches")
578 assert resp.status_code == 401
579
580 @pytest.mark.asyncio
581 async def test_private_repo_commits_without_auth_returns_401(
582 self, client: AsyncClient, db_session: AsyncSession
583 ) -> None:
584 repo = await create_repo(db_session, slug="e2e-priv-commits", visibility="private")
585 resp = await client.get(f"/api/repos/{repo.repo_id}/commits")
586 assert resp.status_code == 401
587
588
589 class TestCreateRepoValidation:
590 @pytest.mark.asyncio
591 async def test_invalid_owner_with_spaces_returns_422(
592 self,
593 client: AsyncClient,
594 auth_headers: StrDict,
595 ) -> None:
596 resp = await client.post(
597 "/api/repos",
598 json={"name": "my-repo", "owner": "alice bob"},
599 headers=auth_headers,
600 )
601 assert resp.status_code == 422
602
603 @pytest.mark.asyncio
604 async def test_invalid_owner_uppercase_returns_422(
605 self,
606 client: AsyncClient,
607 auth_headers: StrDict,
608 ) -> None:
609 resp = await client.post(
610 "/api/repos",
611 json={"name": "my-repo", "owner": "Alice"},
612 headers=auth_headers,
613 )
614 assert resp.status_code == 422
615
616 @pytest.mark.asyncio
617 async def test_invalid_owner_leading_hyphen_returns_422(
618 self,
619 client: AsyncClient,
620 auth_headers: StrDict,
621 ) -> None:
622 resp = await client.post(
623 "/api/repos",
624 json={"name": "my-repo", "owner": "-alice"},
625 headers=auth_headers,
626 )
627 assert resp.status_code == 422
628
629 @pytest.mark.asyncio
630 async def test_empty_name_returns_422(
631 self,
632 client: AsyncClient,
633 auth_headers: StrDict,
634 ) -> None:
635 resp = await client.post(
636 "/api/repos",
637 json={"name": "", "owner": "testuser"},
638 headers=auth_headers,
639 )
640 assert resp.status_code == 422
641
642
643 # ─────────────────────────────────────────────────────────────────────────────
644 # Layer 4 β€” Stress
645 # ─────────────────────────────────────────────────────────────────────────────
646
647 class TestRepositoryServiceStress:
648 @pytest.mark.asyncio
649 async def test_create_50_repos_and_list_all(
650 self, client: AsyncClient, db_session: AsyncSession, auth_headers: StrDict
651 ) -> None:
652 """Create 50 repos via HTTP; list must report total=50."""
653 COUNT = 50
654 for i in range(COUNT):
655 resp = await client.post(
656 "/api/repos",
657 json={"name": f"stress-repo-{i:03d}", "owner": "testuser",
658 "visibility": "public"},
659 headers=auth_headers,
660 )
661 assert resp.status_code == 201
662
663 resp = await client.get("/api/repos?limit=100", headers=auth_headers)
664 assert resp.status_code == 200
665 body = resp.json()
666 assert body["total"] >= COUNT
667
668 @pytest.mark.asyncio
669 async def test_cursor_pagination_traverses_all_repos(
670 self, db_session: AsyncSession
671 ) -> None:
672 """Insert 100 repos; cursor pagination must visit all of them."""
673 from musehub.services import musehub_repository
674
675 TOTAL = 100
676 owner_id = f"paginator-{secrets.token_hex(4)}"
677 for i in range(TOTAL):
678 await create_repo(db_session, slug=f"page-{i:03d}",
679 owner=owner_id, owner_user_id=owner_id)
680
681 collected: list[str] = []
682 cursor: str | None = None
683 while True:
684 page = await musehub_repository.list_repos_for_user(
685 db_session, owner_id, limit=10, cursor=cursor
686 )
687 collected.extend(r.repo_id for r in page.repos)
688 cursor = page.next_cursor
689 if cursor is None:
690 break
691
692 assert len(collected) == TOTAL
693 assert len(set(collected)) == TOTAL # no duplicates
694
695 @pytest.mark.asyncio
696 async def test_200_commit_history_pageable(self, db_session: AsyncSession) -> None:
697 """Push 200 commits; paging through them must yield all without duplicates."""
698 from musehub.services import musehub_repository
699
700 repo = await create_repo(db_session, slug="stress-200c")
701 for _ in range(200):
702 await create_commit(db_session, repo.repo_id, branch="main")
703
704 all_ids: list[str] = []
705 cursor: str | None = None
706 per_page = 50
707 for _ in range(1, 5):
708 result = await musehub_repository.list_commits(
709 db_session, repo.repo_id, limit=per_page, cursor=cursor
710 )
711 all_ids.extend(c.commit_id for c in result.commits)
712 cursor = result.next_cursor
713 if cursor is None:
714 break
715
716 assert result.total == 200
717 assert len(all_ids) == 200
718 assert len(set(all_ids)) == 200 # no duplicates across pages
719
720
721 # ─────────────────────────────────────────────────────────────────────────────
722 # Layer 5 β€” Data Integrity
723 # ─────────────────────────────────────────────────────────────────────────────
724
725 class TestDataIntegrity:
726 @pytest.mark.asyncio
727 async def test_delete_hard_deletes_row(
728 self, db_session: AsyncSession
729 ) -> None:
730 from musehub.services import musehub_repository
731
732 repo = await create_repo(db_session, slug="del-preserve")
733 repo_id = repo.repo_id
734 deleted = await musehub_repository.delete_repo(db_session, repo_id)
735 await db_session.commit()
736
737 assert deleted is True
738 # Row must be completely gone from the DB
739 row = await db_session.get(MusehubRepo, repo_id)
740 assert row is None
741
742 @pytest.mark.asyncio
743 async def test_double_soft_delete_is_idempotent(
744 self, db_session: AsyncSession
745 ) -> None:
746 from musehub.services import musehub_repository
747
748 repo = await create_repo(db_session, slug="del-idempotent")
749 first = await musehub_repository.delete_repo(db_session, repo.repo_id)
750 await db_session.commit()
751 second = await musehub_repository.delete_repo(db_session, repo.repo_id)
752 await db_session.commit()
753
754 assert first is True
755 assert second is False # already deleted
756
757 @pytest.mark.asyncio
758 async def test_transfer_on_deleted_repo_returns_none(
759 self, db_session: AsyncSession
760 ) -> None:
761 from musehub.services import musehub_repository
762
763 repo = await create_repo(db_session, slug="del-transfer")
764 await musehub_repository.delete_repo(db_session, repo.repo_id)
765 await db_session.commit()
766
767 result = await musehub_repository.transfer_repo_ownership(
768 db_session, repo.repo_id, "new-owner"
769 )
770 assert result is None
771
772 @pytest.mark.asyncio
773 async def test_duplicate_owner_slug_returns_409_via_http(
774 self,
775 client: AsyncClient,
776 db_session: AsyncSession,
777 auth_headers: StrDict,
778 ) -> None:
779 payload = {"name": "duplicate-name", "owner": "testuser"}
780 resp1 = await client.post("/api/repos", json=payload, headers=auth_headers)
781 assert resp1.status_code == 201
782 resp2 = await client.post("/api/repos", json=payload, headers=auth_headers)
783 assert resp2.status_code == 409
784
785 @pytest.mark.asyncio
786 async def test_create_repo_service_sets_correct_slug(
787 self, db_session: AsyncSession
788 ) -> None:
789 from musehub.services import musehub_repository
790
791 repo = await musehub_repository.create_repo(
792 db_session,
793 name="My Jazz Experiment!",
794 owner="gabriel",
795 visibility="public",
796 owner_user_id=compute_identity_id(b"gabriel"),
797 )
798 await db_session.commit()
799 assert repo.slug == "my-jazz-experiment"
800
801
802 # ─────────────────────────────────────────────────────────────────────────────
803 # Layer 6 β€” Security
804 # ─────────────────────────────────────────────────────────────────────────────
805
806 class TestSecurity:
807 @pytest.mark.asyncio
808 async def test_non_owner_delete_returns_403(
809 self,
810 client: AsyncClient,
811 db_session: AsyncSession,
812 auth_headers: StrDict,
813 ) -> None:
814 """Only owner may delete β€” authenticated non-owner gets 403."""
815 # Create a repo owned by someone else
816 _other_id = compute_identity_id(b"other-user")
817 _now = datetime.now(tz=timezone.utc)
818 repo_row = MusehubRepo(
819 repo_id=compute_repo_id(_other_id, "not-mine", "code", _now.isoformat()),
820 name="not-mine",
821 owner="other-user",
822 slug="not-mine",
823 visibility="public",
824 owner_user_id=_other_id,
825 description="",
826 tags=[],
827 created_at=_now,
828 updated_at=_now,
829 )
830 db_session.add(repo_row)
831 await db_session.commit()
832
833 resp = await client.delete(
834 f"/api/repos/{repo_row.repo_id}", headers=auth_headers
835 )
836 assert resp.status_code == 403
837
838 @pytest.mark.asyncio
839 async def test_non_owner_transfer_returns_403(
840 self,
841 client: AsyncClient,
842 db_session: AsyncSession,
843 auth_headers: StrDict,
844 ) -> None:
845 _stranger_id = compute_identity_id(b"stranger")
846 _now2 = datetime.now(tz=timezone.utc)
847 repo_row = MusehubRepo(
848 repo_id=compute_repo_id(_stranger_id, "no-transfer", "code", _now2.isoformat()),
849 name="no-transfer",
850 owner="stranger",
851 slug="no-transfer",
852 visibility="public",
853 owner_user_id=_stranger_id,
854 description="",
855 tags=[],
856 created_at=_now2,
857 updated_at=_now2,
858 )
859 db_session.add(repo_row)
860 await db_session.commit()
861
862 resp = await client.post(
863 f"/api/repos/{repo_row.repo_id}/transfer",
864 json={"newOwnerUserId": "hacker"},
865 headers=auth_headers,
866 )
867 assert resp.status_code == 403
868
869 @pytest.mark.asyncio
870 async def test_private_repo_get_without_auth_returns_401(
871 self, client: AsyncClient, db_session: AsyncSession
872 ) -> None:
873 repo = await create_repo(db_session, slug="sec-priv-get", visibility="private")
874 resp = await client.get(f"/api/repos/{repo.repo_id}")
875 assert resp.status_code == 401
876
877 @pytest.mark.asyncio
878 async def test_delete_requires_auth(
879 self, client: AsyncClient, db_session: AsyncSession
880 ) -> None:
881 repo = await create_repo(db_session, slug="sec-del-noauth", visibility="public")
882 resp = await client.delete(f"/api/repos/{repo.repo_id}")
883 assert resp.status_code == 401
884
885 @pytest.mark.asyncio
886 async def test_transfer_requires_auth(
887 self, client: AsyncClient, db_session: AsyncSession
888 ) -> None:
889 repo = await create_repo(db_session, slug="sec-xfer-noauth", visibility="public")
890 resp = await client.post(
891 f"/api/repos/{repo.repo_id}/transfer",
892 json={"newOwnerUserId": "anyone"},
893 )
894 assert resp.status_code == 401
895
896
897 # ─────────────────────────────────────────────────────────────────────────────
898 # Layer 7 β€” Performance
899 # ─────────────────────────────────────────────────────────────────────────────
900
901 class TestPerformance:
902 def test_generate_slug_1000_calls_under_100ms(self) -> None:
903 from musehub.services.musehub_repository import _generate_slug
904 names = [f"My Repo Number {i} β€” Special Γ‰dition!" for i in range(1000)]
905 t0 = time.perf_counter()
906 for name in names:
907 _generate_slug(name)
908 elapsed_ms = (time.perf_counter() - t0) * 1000
909 assert elapsed_ms < 100, f"1000 slug calls took {elapsed_ms:.1f}ms > 100ms"
910
911 @pytest.mark.asyncio
912 async def test_list_repos_50_users_under_500ms(
913 self, db_session: AsyncSession
914 ) -> None:
915 from musehub.services import musehub_repository
916
917 uid = f"perf-user-{secrets.token_hex(4)}"
918 for i in range(50):
919 await create_repo(db_session, slug=f"perf-r{i:02d}",
920 owner=uid, owner_user_id=uid)
921
922 # Warm-up
923 await musehub_repository.list_repos_for_user(db_session, uid, limit=50)
924
925 t0 = time.perf_counter()
926 result = await musehub_repository.list_repos_for_user(db_session, uid, limit=50)
927 elapsed_ms = (time.perf_counter() - t0) * 1000
928 assert len(result.repos) == 50
929 assert elapsed_ms < 500, f"list_repos 50 items took {elapsed_ms:.1f}ms > 500ms"
930
931 @pytest.mark.asyncio
932 async def test_get_repo_home_stats_200_commits_under_500ms(
933 self, db_session: AsyncSession
934 ) -> None:
935 from musehub.services import musehub_repository
936
937 repo = await create_repo(db_session, slug="perf-stats-200c")
938 for _ in range(200):
939 await create_commit(db_session, repo.repo_id, branch="main")
940
941 # Warm-up
942 await musehub_repository.get_repo_home_stats(db_session, repo.repo_id, "main")
943
944 t0 = time.perf_counter()
945 stats = await musehub_repository.get_repo_home_stats(db_session, repo.repo_id, "main")
946 elapsed_ms = (time.perf_counter() - t0) * 1000
947 assert stats["total_commits"] == 200
948 assert elapsed_ms < 500, f"get_repo_home_stats took {elapsed_ms:.1f}ms > 500ms"
949
950 @pytest.mark.asyncio
951 async def test_list_commits_200_rows_under_200ms(
952 self, db_session: AsyncSession
953 ) -> None:
954 from musehub.services import musehub_repository
955
956 repo = await create_repo(db_session, slug="perf-commits-200")
957 for _ in range(200):
958 await create_commit(db_session, repo.repo_id, branch="main")
959
960 # Warm-up
961 await musehub_repository.list_commits(db_session, repo.repo_id, limit=200)
962
963 t0 = time.perf_counter()
964 result = await musehub_repository.list_commits(
965 db_session, repo.repo_id, limit=200
966 )
967 elapsed_ms = (time.perf_counter() - t0) * 1000
968 assert result.total == 200
969 assert len(result.commits) == 200
970 assert elapsed_ms < 200, f"list_commits 200 rows took {elapsed_ms:.1f}ms > 200ms"
971
972
973 # ─────────────────────────────────────────────────────────────────────────────
974 # Layer 8 β€” Regression: soft-deleted repos must not surface via owner/slug path
975 # ─────────────────────────────────────────────────────────────────────────────
976
977 class TestDeleteOwnerSlugRegression:
978 """Regression suite: hard-deleted repos must not surface via owner/slug path."""
979
980 @pytest.mark.asyncio
981 async def test_owner_slug_http_returns_404_for_deleted_repo(
982 self, client: AsyncClient, db_session: AsyncSession
983 ) -> None:
984 """HTTP /{owner}/{slug} must return 404 for a hard-deleted repo."""
985 repo = await create_repo(db_session, slug="http-deleted", owner="httpuser",
986 owner_user_id=compute_identity_id(b"httpuser"), visibility="public")
987 await db_session.delete(repo)
988 await db_session.commit()
989
990 resp = await client.get("/api/httpuser/http-deleted")
991 assert resp.status_code == 404, (
992 f"Expected 404 for hard-deleted repo via /owner/slug path, got {resp.status_code}"
993 )
994
995
996 # ─────────────────────────────────────────────────────────────────────────────
997 # Layer 9 β€” domain_id always persisted on creation
998 # ─────────────────────────────────────────────────────────────────────────────
999
1000 class TestDomainIdAlwaysPersisted:
1001 """domain_id must be written to the DB row on every creation path.
1002
1003 Previously create_repo() used the domain arg only for compute_repo_id()
1004 but never stored it β€” leaving domain_id NULL for every normal repo.
1005 """
1006
1007 @pytest.mark.asyncio
1008 async def test_create_repo_explicit_domain_stored(
1009 self, db_session: AsyncSession
1010 ) -> None:
1011 """When caller passes domain='midi', domain_id='midi' is on the DB row."""
1012 from musehub.services.musehub_repository import create_repo as svc_create_repo
1013 result = await svc_create_repo(
1014 db_session,
1015 name="midi-test",
1016 owner="testuser",
1017 visibility="public",
1018 owner_user_id="testuser",
1019 owner_identity_id="testuser",
1020 domain="midi",
1021 )
1022 await db_session.commit()
1023 row = await db_session.get(MusehubRepo, result.repo_id)
1024 assert row is not None
1025 assert row.domain_id == "midi"
1026
1027 @pytest.mark.asyncio
1028 async def test_create_repo_no_domain_defaults_to_code(
1029 self, db_session: AsyncSession
1030 ) -> None:
1031 """When no domain is passed, domain_id='code' is stored β€” never NULL."""
1032 from musehub.services.musehub_repository import create_repo as svc_create_repo
1033 result = await svc_create_repo(
1034 db_session,
1035 name="no-domain-test",
1036 owner="testuser",
1037 visibility="public",
1038 owner_user_id="testuser",
1039 owner_identity_id="testuser",
1040 )
1041 await db_session.commit()
1042 row = await db_session.get(MusehubRepo, result.repo_id)
1043 assert row is not None
1044 assert row.domain_id == "code"
1045
1046 @pytest.mark.asyncio
1047 async def test_create_repo_empty_domain_defaults_to_code(
1048 self, db_session: AsyncSession
1049 ) -> None:
1050 """Explicit empty-string domain also resolves to 'code', never NULL."""
1051 from musehub.services.musehub_repository import create_repo as svc_create_repo
1052 result = await svc_create_repo(
1053 db_session,
1054 name="empty-domain-test",
1055 owner="testuser",
1056 visibility="public",
1057 owner_user_id="testuser",
1058 owner_identity_id="testuser",
1059 domain="",
1060 )
1061 await db_session.commit()
1062 row = await db_session.get(MusehubRepo, result.repo_id)
1063 assert row is not None
1064 assert row.domain_id == "code"
1065
1066 @pytest.mark.asyncio
1067 async def test_fork_inherits_source_domain(
1068 self, db_session: AsyncSession
1069 ) -> None:
1070 """Forking a midi repo produces a fork with domain_id='midi'."""
1071 from musehub.services.musehub_repository import create_repo as svc_create_repo, fork_repo
1072 from musehub.models.musehub import ForkRepoRequest
1073
1074 source = await svc_create_repo(
1075 db_session,
1076 name="source-midi",
1077 owner="sourceuser",
1078 visibility="public",
1079 owner_user_id="sourceuser",
1080 owner_identity_id="sourceuser",
1081 domain="midi",
1082 )
1083 await db_session.commit()
1084
1085 fork_result = await fork_repo(
1086 db_session,
1087 source_repo_id=source.repo_id,
1088 forked_by_handle="forkuser",
1089 request=ForkRepoRequest(),
1090 )
1091 await db_session.commit()
1092 fork_row = await db_session.get(MusehubRepo, fork_result.fork_repo.repo_id)
1093 assert fork_row is not None
1094 assert fork_row.domain_id == "midi"
1095
1096 @pytest.mark.asyncio
1097 async def test_fork_source_null_domain_becomes_code(
1098 self, db_session: AsyncSession
1099 ) -> None:
1100 """Forking a repo whose domain_id is NULL in DB produces fork with domain_id='code'."""
1101 from musehub.services.musehub_repository import fork_repo
1102 from musehub.models.musehub import ForkRepoRequest
1103
1104 # Simulate a legacy row with NULL domain_id
1105 created_at = datetime.now(tz=timezone.utc)
1106 legacy = MusehubRepo(
1107 repo_id=compute_repo_id("legacyuser", "legacy-repo", "muse/generic", created_at.isoformat()),
1108 name="legacy-repo",
1109 owner="legacyuser",
1110 slug="legacy-repo",
1111 visibility="public",
1112 owner_user_id="legacyuser",
1113 domain_id=None,
1114 created_at=created_at,
1115 updated_at=created_at,
1116 )
1117 db_session.add(legacy)
1118 await db_session.commit()
1119
1120 fork_result = await fork_repo(
1121 db_session,
1122 source_repo_id=legacy.repo_id,
1123 forked_by_handle="forkuser2",
1124 request=ForkRepoRequest(),
1125 )
1126 await db_session.commit()
1127 fork_row = await db_session.get(MusehubRepo, fork_result.fork_repo.repo_id)
1128 assert fork_row is not None
1129 assert fork_row.domain_id == "code"
1130
1131 @pytest.mark.asyncio
1132 async def test_api_create_repo_domain_stored(
1133 self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
1134 ) -> None:
1135 """POST /api/repos with domain='midi' stores domain_id='midi' on the DB row."""
1136 resp = await client.post(
1137 "/api/repos",
1138 json={"name": "api-midi-repo", "owner": "testuser", "visibility": "public", "domain": "midi"},
1139 headers=auth_headers,
1140 )
1141 assert resp.status_code == 201
1142 repo_id = resp.json()["repoId"]
1143 row = await db_session.get(MusehubRepo, repo_id)
1144 assert row is not None
1145 assert row.domain_id == "midi"
1146
1147 @pytest.mark.asyncio
1148 async def test_api_create_repo_no_domain_defaults_to_code(
1149 self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
1150 ) -> None:
1151 """POST /api/repos without domain field stores domain_id='code' on the DB row."""
1152 resp = await client.post(
1153 "/api/repos",
1154 json={"name": "api-no-domain-repo", "owner": "testuser", "visibility": "public"},
1155 headers=auth_headers,
1156 )
1157 assert resp.status_code == 201
1158 repo_id = resp.json()["repoId"]
1159 row = await db_session.get(MusehubRepo, repo_id)
1160 assert row is not None
1161 assert row.domain_id == "code"
1162