"""Supplemental tests for the Repository Service — Section 4. This file fills the gaps left by the existing test_musehub_repos.py (170 tests). It does NOT duplicate what is already covered there. Focus: Coverage layers ─────────────── Unit — _generate_slug (all edge cases), _guard_visibility, _guard_owner as pure-function unit tests; resolve_head_ref logic; list_branches_with_detail ahead/behind computation. Integration — get_repo_home_stats (commit counts, 14-day activity array, file count from snapshot); get_recently_pushed_branches; collaborator repos appearing in list_repos_for_user; template copy (private template silently skipped); transfer on soft-deleted repo → None. E2E — GET /api/repos/{repo_id}/stats; GET /api/repos/{repo_id}/branches/detail; GET /api/repos/{repo_id}/snapshots/{snapshot_id}; private repo branches/commits → 401 without auth; invalid owner pattern → 422; stats on private repo → 401. Stress — Create and list 50 repos; cursor pagination through 100 repos; 200-commit history paging. Data — Soft-delete preserves data in DB; double soft-delete is idempotent; get_repo skips soft-deleted rows; transfer on deleted repo → None; duplicate (owner, slug) → 409 on HTTP, IntegrityError at service level. Security — Invalid owner pattern (spaces, uppercase, leading hyphen) → 422; private branches endpoint → 401; private commits endpoint → 401; private stats endpoint → 401; non-owner delete → 403; non-owner transfer → 403. Performance — _generate_slug 1 000 calls < 100 ms; list_repos_for_user 50 repos < 500 ms; get_repo_home_stats with 200 commits < 500 ms; list_commits 200-row page < 200 ms. """ from __future__ import annotations import secrets import time from datetime import datetime, timezone from pathlib import Path import pytest from httpx import AsyncClient from sqlalchemy.ext.asyncio import AsyncSession from musehub.core.genesis import compute_branch_id, compute_collaborator_id, compute_identity_id, compute_repo_id from musehub.db.musehub_repo_models import MusehubBranch, MusehubObject, MusehubObjectRef, MusehubRepo, MusehubSnapshot, MusehubSnapshotRef from musehub.models.musehub import RepoResponse from tests.factories import create_repo, create_branch, create_commit from musehub.types.json_types import StrDict # ───────────────────────────────────────────────────────────────────────────── # Layer 1 — Unit: pure functions (no DB, no HTTP) # ───────────────────────────────────────────────────────────────────────────── class TestGenerateSlug: """_generate_slug must produce valid URL-safe slugs from arbitrary names.""" def _slug(self, name: str) -> str: from musehub.services.musehub_repository import _generate_slug return _generate_slug(name) def test_lowercase(self) -> None: assert self._slug("Neo Soul Experiment") == "neo-soul-experiment" def test_special_chars_collapsed_to_hyphens(self) -> None: assert self._slug("jazz & blues / 2024") == "jazz-blues-2024" def test_leading_trailing_hyphens_stripped(self) -> None: assert self._slug("---beats---") == "beats" def test_all_symbols_falls_back_to_repo(self) -> None: assert self._slug("!!!@@@###") == "repo" def test_empty_string_falls_back_to_repo(self) -> None: assert self._slug("") == "repo" def test_max_64_chars(self) -> None: long_name = "a" * 100 result = self._slug(long_name) assert len(result) <= 64 def test_truncation_does_not_leave_trailing_hyphen(self) -> None: # Name that would produce a hyphen right at position 64 name = "a" * 63 + "-b" * 10 result = self._slug(name) assert not result.endswith("-") assert len(result) <= 64 def test_numbers_preserved(self) -> None: assert self._slug("track-01") == "track-01" def test_consecutive_special_chars_single_hyphen(self) -> None: assert self._slug("a -- b") == "a-b" def test_unicode_non_ascii_collapsed(self) -> None: result = self._slug("café") # "café" → "caf-" → "caf" (stripped) or similar — must be alphanumeric+hyphen only assert all(c.isascii() and (c.isalnum() or c == "-") for c in result) class TestGuardVisibility: """_guard_visibility raises correct HTTP exceptions.""" def test_raises_404_when_repo_is_none(self) -> None: from fastapi import HTTPException from musehub.api.routes.musehub.repos import _guard_visibility with pytest.raises(HTTPException) as exc_info: _guard_visibility(None, None) assert exc_info.value.status_code == 404 def test_raises_401_for_private_repo_without_auth(self) -> None: from fastapi import HTTPException from musehub.api.routes.musehub.repos import _guard_visibility from musehub.models.musehub import RepoResponse from datetime import datetime, timezone _alice_id = compute_identity_id(b"alice") _ts = datetime.now(tz=timezone.utc) repo = RepoResponse( repo_id=compute_repo_id(_alice_id, "secret", "code", _ts.isoformat()), name="secret", owner="alice", slug="secret", visibility="private", owner_user_id=_alice_id, description="", tags=[], clone_url="musehub://alice/secret", created_at=_ts, updated_at=_ts, default_branch="main", ) with pytest.raises(HTTPException) as exc_info: _guard_visibility(repo, None) assert exc_info.value.status_code == 401 def test_no_raise_for_public_repo_without_auth(self) -> None: from musehub.api.routes.musehub.repos import _guard_visibility from musehub.models.musehub import RepoResponse _alice_id2 = compute_identity_id(b"alice") _ts2 = datetime.now(tz=timezone.utc) repo = RepoResponse( repo_id=compute_repo_id(_alice_id2, "open", "code", _ts2.isoformat()), name="open", owner="alice", slug="open", visibility="public", owner_user_id=_alice_id2, description="", tags=[], clone_url="musehub://alice/open", created_at=_ts2, updated_at=_ts2, default_branch="main", ) _guard_visibility(repo, None) # must not raise class TestGuardOwner: """_guard_owner raises correct HTTP exceptions.""" def _repo(self, owner: str = "alice") -> RepoResponse: _owner_id = compute_identity_id(owner.encode()) _ts = datetime.now(tz=timezone.utc) return RepoResponse( repo_id=compute_repo_id(_owner_id, "r", "code", _ts.isoformat()), name="r", owner=owner, slug="r", visibility="public", owner_user_id=_owner_id, description="", tags=[], clone_url=f"musehub://{owner}/r", created_at=_ts, updated_at=_ts, default_branch="main", ) def test_raises_404_when_repo_is_none(self) -> None: from fastapi import HTTPException from musehub.api.routes.musehub.repos import _guard_owner with pytest.raises(HTTPException) as exc_info: _guard_owner(None, "alice") assert exc_info.value.status_code == 404 def test_raises_403_for_non_owner(self) -> None: from fastapi import HTTPException from musehub.api.routes.musehub.repos import _guard_owner with pytest.raises(HTTPException) as exc_info: _guard_owner(self._repo("alice"), "bob") assert exc_info.value.status_code == 403 def test_no_raise_for_owner(self) -> None: from musehub.api.routes.musehub.repos import _guard_owner _guard_owner(self._repo("alice"), "alice") # must not raise class TestResolveHeadRef: """resolve_head_ref prefers 'main', falls back to first alphabetically.""" @pytest.mark.asyncio async def test_empty_repo_returns_main(self, db_session: AsyncSession) -> None: from musehub.services import musehub_repository repo = await create_repo(db_session, slug="rhr-empty") result = await musehub_repository.resolve_head_ref(db_session, repo.repo_id) assert result == "main" @pytest.mark.asyncio async def test_prefers_main_branch(self, db_session: AsyncSession) -> None: from musehub.services import musehub_repository repo = await create_repo(db_session, slug="rhr-main") await create_branch(db_session, repo.repo_id, name="dev") await create_branch(db_session, repo.repo_id, name="main") result = await musehub_repository.resolve_head_ref(db_session, repo.repo_id) assert result == "main" @pytest.mark.asyncio async def test_falls_back_to_first_alpha_when_no_main( self, db_session: AsyncSession ) -> None: from musehub.services import musehub_repository repo = await create_repo(db_session, slug="rhr-alpha") await create_branch(db_session, repo.repo_id, name="dev") await create_branch(db_session, repo.repo_id, name="alpha") result = await musehub_repository.resolve_head_ref(db_session, repo.repo_id) assert result == "alpha" # first alphabetically class TestListBranchesWithDetail: """list_branches_with_detail computes ahead/behind counts correctly.""" @pytest.mark.asyncio async def test_empty_repo_returns_empty(self, db_session: AsyncSession) -> None: from musehub.services import musehub_repository repo = await create_repo(db_session, slug="bwd-empty") result = await musehub_repository.list_branches_with_detail(db_session, repo.repo_id) assert result.branches == [] @pytest.mark.asyncio async def test_default_branch_has_zero_ahead_behind( self, db_session: AsyncSession ) -> None: from musehub.services import musehub_repository repo = await create_repo(db_session, slug="bwd-default") await create_branch(db_session, repo.repo_id, name="main") await create_commit(db_session, repo.repo_id, branch="main") result = await musehub_repository.list_branches_with_detail(db_session, repo.repo_id) main_detail = next(b for b in result.branches if b.name == "main") assert main_detail.is_default is True assert main_detail.ahead_count == 0 assert main_detail.behind_count == 0 @pytest.mark.asyncio async def test_feature_branch_ahead_count(self, db_session: AsyncSession) -> None: from musehub.services import musehub_repository repo = await create_repo(db_session, slug="bwd-ahead") await create_branch(db_session, repo.repo_id, name="main") await create_branch(db_session, repo.repo_id, name="feat") # 1 commit on main, 3 on feat await create_commit(db_session, repo.repo_id, branch="main") for _ in range(3): await create_commit(db_session, repo.repo_id, branch="feat") result = await musehub_repository.list_branches_with_detail(db_session, repo.repo_id) feat = next(b for b in result.branches if b.name == "feat") # feat has 3 commits not in main → ahead=3; main has 1 commit not in feat → behind=1 assert feat.ahead_count == 3 assert feat.behind_count == 1 # ───────────────────────────────────────────────────────────────────────────── # Layer 2 — Integration: service layer with real DB # ───────────────────────────────────────────────────────────────────────────── class TestGetRepoHomeStats: @pytest.mark.asyncio async def test_empty_repo_returns_zeros(self, db_session: AsyncSession) -> None: from musehub.services import musehub_repository repo = await create_repo(db_session, slug="stats-empty") stats = await musehub_repository.get_repo_home_stats(db_session, repo.repo_id, "main") assert stats["total_commits"] == 0 assert stats["total_objects"] == 0 assert stats["total_size_bytes"] == 0 assert stats["commit_activity"] == [0] * 14 @pytest.mark.asyncio async def test_commit_count_reflects_actual_commits( self, db_session: AsyncSession ) -> None: from musehub.services import musehub_repository repo = await create_repo(db_session, slug="stats-commits") for _ in range(5): await create_commit(db_session, repo.repo_id, branch="main") stats = await musehub_repository.get_repo_home_stats(db_session, repo.repo_id, "main") assert stats["total_commits"] == 5 @pytest.mark.asyncio async def test_activity_array_has_14_entries(self, db_session: AsyncSession) -> None: from musehub.services import musehub_repository repo = await create_repo(db_session, slug="stats-activity") stats = await musehub_repository.get_repo_home_stats(db_session, repo.repo_id, "main") assert len(stats["commit_activity"]) == 14 @pytest.mark.asyncio async def test_object_count_and_size_bytes( self, db_session: AsyncSession, tmp_path: Path ) -> None: from musehub.services import musehub_repository repo = await create_repo(db_session, slug="stats-objects") for i in range(3): oid = f"sha256:stats{i}" obj = MusehubObject( object_id=oid, path=f"f{i}.bin", size_bytes=100, ) db_session.add(obj) db_session.add(MusehubObjectRef(repo_id=repo.repo_id, object_id=oid)) await db_session.commit() stats = await musehub_repository.get_repo_home_stats(db_session, repo.repo_id, "main") assert stats["total_objects"] == 3 assert stats["total_size_bytes"] == 300 class TestGetRecentlyPushedBranches: @pytest.mark.asyncio async def test_no_recent_branches_returns_empty( self, db_session: AsyncSession ) -> None: from musehub.services import musehub_repository repo = await create_repo(db_session, slug="recent-empty") result = await musehub_repository.get_recently_pushed_branches( db_session, repo.repo_id, "main" ) assert result == [] @pytest.mark.asyncio async def test_current_ref_excluded(self, db_session: AsyncSession) -> None: from musehub.services import musehub_repository repo = await create_repo(db_session, slug="recent-exclude") commit = await create_commit(db_session, repo.repo_id, branch="main") await create_branch(db_session, repo.repo_id, name="main", head_commit_id=commit.commit_id) result = await musehub_repository.get_recently_pushed_branches( db_session, repo.repo_id, "main" ) assert all(b["name"] != "main" for b in result) @pytest.mark.asyncio async def test_recent_branch_appears(self, db_session: AsyncSession) -> None: from musehub.services import musehub_repository repo = await create_repo(db_session, slug="recent-feat") commit = await create_commit(db_session, repo.repo_id, branch="feat") feat = MusehubBranch( branch_id=compute_branch_id(repo.repo_id, "feat"), repo_id=repo.repo_id, name="feat", head_commit_id=commit.commit_id, ) db_session.add(feat) await db_session.commit() result = await musehub_repository.get_recently_pushed_branches( db_session, repo.repo_id, "main", within_hours=72 ) assert any(b["name"] == "feat" for b in result) class TestListReposForUserWithCollaborators: @pytest.mark.asyncio async def test_collab_repos_included_in_list( self, db_session: AsyncSession ) -> None: from musehub.services import musehub_repository from musehub.db.musehub_collaborator_models import MusehubCollaborator owner_repo = await create_repo(db_session, slug="collab-owned", owner="alice", owner_user_id=compute_identity_id(b"alice")) other_repo = await create_repo(db_session, slug="collab-shared", owner="bob", owner_user_id=compute_identity_id(b"bob")) # alice is an accepted collaborator on bob's repo _accepted_at = datetime.now(tz=timezone.utc) collab = MusehubCollaborator( id=compute_collaborator_id(other_repo.repo_id, compute_identity_id(b"alice"), _accepted_at.isoformat()), repo_id=other_repo.repo_id, identity_handle="alice", permission="read", accepted_at=_accepted_at, ) db_session.add(collab) await db_session.commit() result = await musehub_repository.list_repos_for_user(db_session, "alice") repo_ids = [r.repo_id for r in result.repos] assert owner_repo.repo_id in repo_ids assert other_repo.repo_id in repo_ids @pytest.mark.asyncio async def test_unaccepted_collab_not_included( self, db_session: AsyncSession ) -> None: from musehub.services import musehub_repository from musehub.db.musehub_collaborator_models import MusehubCollaborator other_repo = await create_repo(db_session, slug="collab-pending", owner="carol", owner_user_id=compute_identity_id(b"carol")) _invited_at = datetime.now(tz=timezone.utc) collab = MusehubCollaborator( id=compute_collaborator_id(other_repo.repo_id, compute_identity_id(b"dave"), _invited_at.isoformat()), repo_id=other_repo.repo_id, identity_handle="dave", permission="read", accepted_at=None, # invitation not yet accepted ) db_session.add(collab) await db_session.commit() result = await musehub_repository.list_repos_for_user(db_session, "dave") assert all(r.repo_id != other_repo.repo_id for r in result.repos) class TestTemplateRepoCopy: @pytest.mark.asyncio async def test_private_template_not_copied(self, db_session: AsyncSession) -> None: from musehub.services import musehub_repository tmpl = await create_repo(db_session, slug="tmpl-priv", visibility="private", owner="alice", owner_user_id=compute_identity_id(b"alice")) # Give template a description tmpl_row = await db_session.get(MusehubRepo, tmpl.repo_id) assert tmpl_row is not None tmpl_row.description = "Private description" await db_session.commit() new_repo = await musehub_repository.create_repo( db_session, name="my-new-repo", owner="bob", visibility="public", owner_user_id=compute_identity_id(b"bob"), template_repo_id=tmpl.repo_id, ) await db_session.commit() assert new_repo.description == "" # private template not applied # ───────────────────────────────────────────────────────────────────────────── # Layer 3 — E2E: HTTP endpoints not covered in test_musehub_repos.py # ───────────────────────────────────────────────────────────────────────────── class TestRepoStatsEndpoint: @pytest.mark.asyncio async def test_empty_repo_returns_zero_counts( self, client: AsyncClient, db_session: AsyncSession ) -> None: repo = await create_repo(db_session, slug="e2e-stats-empty", visibility="public") resp = await client.get(f"/api/repos/{repo.repo_id}/stats") assert resp.status_code == 200 body = resp.json() assert body["commitCount"] == 0 assert body["branchCount"] == 0 assert body["releaseCount"] == 0 @pytest.mark.asyncio async def test_counts_reflect_data( self, client: AsyncClient, db_session: AsyncSession ) -> None: repo = await create_repo(db_session, slug="e2e-stats-data", visibility="public") await create_branch(db_session, repo.repo_id, name="main") await create_branch(db_session, repo.repo_id, name="dev") await create_commit(db_session, repo.repo_id, branch="main") resp = await client.get(f"/api/repos/{repo.repo_id}/stats") assert resp.status_code == 200 body = resp.json() assert body["commitCount"] == 1 assert body["branchCount"] == 2 @pytest.mark.asyncio async def test_unknown_repo_returns_404( self, client: AsyncClient, db_session: AsyncSession ) -> None: resp = await client.get(f"/api/repos/{secrets.token_hex(16)}/stats") assert resp.status_code == 404 @pytest.mark.asyncio async def test_private_repo_without_auth_returns_401( self, client: AsyncClient, db_session: AsyncSession ) -> None: repo = await create_repo(db_session, slug="e2e-stats-priv", visibility="private") resp = await client.get(f"/api/repos/{repo.repo_id}/stats") assert resp.status_code == 401 class TestBranchDetailEndpoint: @pytest.mark.asyncio async def test_returns_branch_list_with_detail( self, client: AsyncClient, db_session: AsyncSession ) -> None: repo = await create_repo(db_session, slug="e2e-bwd-ok", visibility="public") await create_branch(db_session, repo.repo_id, name="main") await create_commit(db_session, repo.repo_id, branch="main") resp = await client.get(f"/api/repos/{repo.repo_id}/branches/detail") assert resp.status_code == 200 body = resp.json() assert "branches" in body assert "defaultBranch" in body assert len(body["branches"]) == 1 branch = body["branches"][0] assert branch["name"] == "main" assert branch["isDefault"] is True assert branch["aheadCount"] == 0 assert branch["behindCount"] == 0 @pytest.mark.asyncio async def test_unknown_repo_returns_404( self, client: AsyncClient, db_session: AsyncSession ) -> None: resp = await client.get(f"/api/repos/{secrets.token_hex(16)}/branches/detail") assert resp.status_code == 404 @pytest.mark.asyncio async def test_private_repo_without_auth_returns_401( self, client: AsyncClient, db_session: AsyncSession ) -> None: repo = await create_repo(db_session, slug="e2e-bwd-priv", visibility="private") resp = await client.get(f"/api/repos/{repo.repo_id}/branches/detail") assert resp.status_code == 401 class TestSnapshotManifestEndpoint: @pytest.mark.asyncio async def test_returns_manifest( self, client: AsyncClient, db_session: AsyncSession ) -> None: import msgpack repo = await create_repo(db_session, slug="e2e-snap-ok", visibility="public") snap_id = f"snap-{secrets.token_hex(4)}" manifest = {"main.py": "sha256:abc"} manifest_blob = msgpack.packb(manifest, use_bin_type=True) snap = MusehubSnapshot( snapshot_id=snap_id, manifest_blob=manifest_blob, entry_count=1, ) db_session.add(snap) db_session.add(MusehubSnapshotRef(repo_id=repo.repo_id, snapshot_id=snap_id)) await db_session.commit() resp = await client.get(f"/api/repos/{repo.repo_id}/snapshots/{snap_id}") assert resp.status_code == 200 body = resp.json() assert body["snapshotId"] == snap_id entry_paths = [e["path"] for e in body.get("entries", [])] assert "main.py" in entry_paths @pytest.mark.asyncio async def test_unknown_snapshot_returns_404( self, client: AsyncClient, db_session: AsyncSession ) -> None: repo = await create_repo(db_session, slug="e2e-snap-404", visibility="public") resp = await client.get(f"/api/repos/{repo.repo_id}/snapshots/ghost-snap") assert resp.status_code == 404 class TestPrivateRepoBranchesAndCommits: @pytest.mark.asyncio async def test_private_repo_branches_without_auth_returns_401( self, client: AsyncClient, db_session: AsyncSession ) -> None: repo = await create_repo(db_session, slug="e2e-priv-branches", visibility="private") resp = await client.get(f"/api/repos/{repo.repo_id}/branches") assert resp.status_code == 401 @pytest.mark.asyncio async def test_private_repo_commits_without_auth_returns_401( self, client: AsyncClient, db_session: AsyncSession ) -> None: repo = await create_repo(db_session, slug="e2e-priv-commits", visibility="private") resp = await client.get(f"/api/repos/{repo.repo_id}/commits") assert resp.status_code == 401 class TestCreateRepoValidation: @pytest.mark.asyncio async def test_invalid_owner_with_spaces_returns_422( self, client: AsyncClient, auth_headers: StrDict, ) -> None: resp = await client.post( "/api/repos", json={"name": "my-repo", "owner": "alice bob"}, headers=auth_headers, ) assert resp.status_code == 422 @pytest.mark.asyncio async def test_invalid_owner_uppercase_returns_422( self, client: AsyncClient, auth_headers: StrDict, ) -> None: resp = await client.post( "/api/repos", json={"name": "my-repo", "owner": "Alice"}, headers=auth_headers, ) assert resp.status_code == 422 @pytest.mark.asyncio async def test_invalid_owner_leading_hyphen_returns_422( self, client: AsyncClient, auth_headers: StrDict, ) -> None: resp = await client.post( "/api/repos", json={"name": "my-repo", "owner": "-alice"}, headers=auth_headers, ) assert resp.status_code == 422 @pytest.mark.asyncio async def test_empty_name_returns_422( self, client: AsyncClient, auth_headers: StrDict, ) -> None: resp = await client.post( "/api/repos", json={"name": "", "owner": "testuser"}, headers=auth_headers, ) assert resp.status_code == 422 # ───────────────────────────────────────────────────────────────────────────── # Layer 4 — Stress # ───────────────────────────────────────────────────────────────────────────── class TestRepositoryServiceStress: @pytest.mark.asyncio async def test_create_50_repos_and_list_all( self, client: AsyncClient, db_session: AsyncSession, auth_headers: StrDict ) -> None: """Create 50 repos via HTTP; list must report total=50.""" COUNT = 50 for i in range(COUNT): resp = await client.post( "/api/repos", json={"name": f"stress-repo-{i:03d}", "owner": "testuser", "visibility": "public"}, headers=auth_headers, ) assert resp.status_code == 201 resp = await client.get("/api/repos?limit=100", headers=auth_headers) assert resp.status_code == 200 body = resp.json() assert body["total"] >= COUNT @pytest.mark.asyncio async def test_cursor_pagination_traverses_all_repos( self, db_session: AsyncSession ) -> None: """Insert 100 repos; cursor pagination must visit all of them.""" from musehub.services import musehub_repository TOTAL = 100 owner_id = f"paginator-{secrets.token_hex(4)}" for i in range(TOTAL): await create_repo(db_session, slug=f"page-{i:03d}", owner=owner_id, owner_user_id=owner_id) collected: list[str] = [] cursor: str | None = None while True: page = await musehub_repository.list_repos_for_user( db_session, owner_id, limit=10, cursor=cursor ) collected.extend(r.repo_id for r in page.repos) cursor = page.next_cursor if cursor is None: break assert len(collected) == TOTAL assert len(set(collected)) == TOTAL # no duplicates @pytest.mark.asyncio async def test_200_commit_history_pageable(self, db_session: AsyncSession) -> None: """Push 200 commits; paging through them must yield all without duplicates.""" from musehub.services import musehub_repository repo = await create_repo(db_session, slug="stress-200c") for _ in range(200): await create_commit(db_session, repo.repo_id, branch="main") all_ids: list[str] = [] cursor: str | None = None per_page = 50 for _ in range(1, 5): result = await musehub_repository.list_commits( db_session, repo.repo_id, limit=per_page, cursor=cursor ) all_ids.extend(c.commit_id for c in result.commits) cursor = result.next_cursor if cursor is None: break assert result.total == 200 assert len(all_ids) == 200 assert len(set(all_ids)) == 200 # no duplicates across pages # ───────────────────────────────────────────────────────────────────────────── # Layer 5 — Data Integrity # ───────────────────────────────────────────────────────────────────────────── class TestDataIntegrity: @pytest.mark.asyncio async def test_delete_hard_deletes_row( self, db_session: AsyncSession ) -> None: from musehub.services import musehub_repository repo = await create_repo(db_session, slug="del-preserve") repo_id = repo.repo_id deleted = await musehub_repository.delete_repo(db_session, repo_id) await db_session.commit() assert deleted is True # Row must be completely gone from the DB row = await db_session.get(MusehubRepo, repo_id) assert row is None @pytest.mark.asyncio async def test_double_soft_delete_is_idempotent( self, db_session: AsyncSession ) -> None: from musehub.services import musehub_repository repo = await create_repo(db_session, slug="del-idempotent") first = await musehub_repository.delete_repo(db_session, repo.repo_id) await db_session.commit() second = await musehub_repository.delete_repo(db_session, repo.repo_id) await db_session.commit() assert first is True assert second is False # already deleted @pytest.mark.asyncio async def test_transfer_on_deleted_repo_returns_none( self, db_session: AsyncSession ) -> None: from musehub.services import musehub_repository repo = await create_repo(db_session, slug="del-transfer") await musehub_repository.delete_repo(db_session, repo.repo_id) await db_session.commit() result = await musehub_repository.transfer_repo_ownership( db_session, repo.repo_id, "new-owner" ) assert result is None @pytest.mark.asyncio async def test_duplicate_owner_slug_returns_409_via_http( self, client: AsyncClient, db_session: AsyncSession, auth_headers: StrDict, ) -> None: payload = {"name": "duplicate-name", "owner": "testuser"} resp1 = await client.post("/api/repos", json=payload, headers=auth_headers) assert resp1.status_code == 201 resp2 = await client.post("/api/repos", json=payload, headers=auth_headers) assert resp2.status_code == 409 @pytest.mark.asyncio async def test_create_repo_service_sets_correct_slug( self, db_session: AsyncSession ) -> None: from musehub.services import musehub_repository repo = await musehub_repository.create_repo( db_session, name="My Jazz Experiment!", owner="gabriel", visibility="public", owner_user_id=compute_identity_id(b"gabriel"), ) await db_session.commit() assert repo.slug == "my-jazz-experiment" # ───────────────────────────────────────────────────────────────────────────── # Layer 6 — Security # ───────────────────────────────────────────────────────────────────────────── class TestSecurity: @pytest.mark.asyncio async def test_non_owner_delete_returns_403( self, client: AsyncClient, db_session: AsyncSession, auth_headers: StrDict, ) -> None: """Only owner may delete — authenticated non-owner gets 403.""" # Create a repo owned by someone else _other_id = compute_identity_id(b"other-user") _now = datetime.now(tz=timezone.utc) repo_row = MusehubRepo( repo_id=compute_repo_id(_other_id, "not-mine", "code", _now.isoformat()), name="not-mine", owner="other-user", slug="not-mine", visibility="public", owner_user_id=_other_id, description="", tags=[], created_at=_now, updated_at=_now, ) db_session.add(repo_row) await db_session.commit() resp = await client.delete( f"/api/repos/{repo_row.repo_id}", headers=auth_headers ) assert resp.status_code == 403 @pytest.mark.asyncio async def test_non_owner_transfer_returns_403( self, client: AsyncClient, db_session: AsyncSession, auth_headers: StrDict, ) -> None: _stranger_id = compute_identity_id(b"stranger") _now2 = datetime.now(tz=timezone.utc) repo_row = MusehubRepo( repo_id=compute_repo_id(_stranger_id, "no-transfer", "code", _now2.isoformat()), name="no-transfer", owner="stranger", slug="no-transfer", visibility="public", owner_user_id=_stranger_id, description="", tags=[], created_at=_now2, updated_at=_now2, ) db_session.add(repo_row) await db_session.commit() resp = await client.post( f"/api/repos/{repo_row.repo_id}/transfer", json={"newOwnerUserId": "hacker"}, headers=auth_headers, ) assert resp.status_code == 403 @pytest.mark.asyncio async def test_private_repo_get_without_auth_returns_401( self, client: AsyncClient, db_session: AsyncSession ) -> None: repo = await create_repo(db_session, slug="sec-priv-get", visibility="private") resp = await client.get(f"/api/repos/{repo.repo_id}") assert resp.status_code == 401 @pytest.mark.asyncio async def test_delete_requires_auth( self, client: AsyncClient, db_session: AsyncSession ) -> None: repo = await create_repo(db_session, slug="sec-del-noauth", visibility="public") resp = await client.delete(f"/api/repos/{repo.repo_id}") assert resp.status_code == 401 @pytest.mark.asyncio async def test_transfer_requires_auth( self, client: AsyncClient, db_session: AsyncSession ) -> None: repo = await create_repo(db_session, slug="sec-xfer-noauth", visibility="public") resp = await client.post( f"/api/repos/{repo.repo_id}/transfer", json={"newOwnerUserId": "anyone"}, ) assert resp.status_code == 401 # ───────────────────────────────────────────────────────────────────────────── # Layer 7 — Performance # ───────────────────────────────────────────────────────────────────────────── class TestPerformance: def test_generate_slug_1000_calls_under_100ms(self) -> None: from musehub.services.musehub_repository import _generate_slug names = [f"My Repo Number {i} — Special Édition!" for i in range(1000)] t0 = time.perf_counter() for name in names: _generate_slug(name) elapsed_ms = (time.perf_counter() - t0) * 1000 assert elapsed_ms < 100, f"1000 slug calls took {elapsed_ms:.1f}ms > 100ms" @pytest.mark.asyncio async def test_list_repos_50_users_under_500ms( self, db_session: AsyncSession ) -> None: from musehub.services import musehub_repository uid = f"perf-user-{secrets.token_hex(4)}" for i in range(50): await create_repo(db_session, slug=f"perf-r{i:02d}", owner=uid, owner_user_id=uid) # Warm-up await musehub_repository.list_repos_for_user(db_session, uid, limit=50) t0 = time.perf_counter() result = await musehub_repository.list_repos_for_user(db_session, uid, limit=50) elapsed_ms = (time.perf_counter() - t0) * 1000 assert len(result.repos) == 50 assert elapsed_ms < 500, f"list_repos 50 items took {elapsed_ms:.1f}ms > 500ms" @pytest.mark.asyncio async def test_get_repo_home_stats_200_commits_under_500ms( self, db_session: AsyncSession ) -> None: from musehub.services import musehub_repository repo = await create_repo(db_session, slug="perf-stats-200c") for _ in range(200): await create_commit(db_session, repo.repo_id, branch="main") # Warm-up await musehub_repository.get_repo_home_stats(db_session, repo.repo_id, "main") t0 = time.perf_counter() stats = await musehub_repository.get_repo_home_stats(db_session, repo.repo_id, "main") elapsed_ms = (time.perf_counter() - t0) * 1000 assert stats["total_commits"] == 200 assert elapsed_ms < 500, f"get_repo_home_stats took {elapsed_ms:.1f}ms > 500ms" @pytest.mark.asyncio async def test_list_commits_200_rows_under_200ms( self, db_session: AsyncSession ) -> None: from musehub.services import musehub_repository repo = await create_repo(db_session, slug="perf-commits-200") for _ in range(200): await create_commit(db_session, repo.repo_id, branch="main") # Warm-up await musehub_repository.list_commits(db_session, repo.repo_id, limit=200) t0 = time.perf_counter() result = await musehub_repository.list_commits( db_session, repo.repo_id, limit=200 ) elapsed_ms = (time.perf_counter() - t0) * 1000 assert result.total == 200 assert len(result.commits) == 200 assert elapsed_ms < 200, f"list_commits 200 rows took {elapsed_ms:.1f}ms > 200ms" # ───────────────────────────────────────────────────────────────────────────── # Layer 8 — Regression: soft-deleted repos must not surface via owner/slug path # ───────────────────────────────────────────────────────────────────────────── class TestDeleteOwnerSlugRegression: """Regression suite: hard-deleted repos must not surface via owner/slug path.""" @pytest.mark.asyncio async def test_owner_slug_http_returns_404_for_deleted_repo( self, client: AsyncClient, db_session: AsyncSession ) -> None: """HTTP /{owner}/{slug} must return 404 for a hard-deleted repo.""" repo = await create_repo(db_session, slug="http-deleted", owner="httpuser", owner_user_id=compute_identity_id(b"httpuser"), visibility="public") await db_session.delete(repo) await db_session.commit() resp = await client.get("/api/httpuser/http-deleted") assert resp.status_code == 404, ( f"Expected 404 for hard-deleted repo via /owner/slug path, got {resp.status_code}" ) # ───────────────────────────────────────────────────────────────────────────── # Layer 9 — domain_id always persisted on creation # ───────────────────────────────────────────────────────────────────────────── class TestDomainIdAlwaysPersisted: """domain_id must be written to the DB row on every creation path. Previously create_repo() used the domain arg only for compute_repo_id() but never stored it — leaving domain_id NULL for every normal repo. """ @pytest.mark.asyncio async def test_create_repo_explicit_domain_stored( self, db_session: AsyncSession ) -> None: """When caller passes domain='midi', domain_id='midi' is on the DB row.""" from musehub.services.musehub_repository import create_repo as svc_create_repo result = await svc_create_repo( db_session, name="midi-test", owner="testuser", visibility="public", owner_user_id="testuser", owner_identity_id="testuser", domain="midi", ) await db_session.commit() row = await db_session.get(MusehubRepo, result.repo_id) assert row is not None assert row.domain_id == "midi" @pytest.mark.asyncio async def test_create_repo_no_domain_defaults_to_code( self, db_session: AsyncSession ) -> None: """When no domain is passed, domain_id='code' is stored — never NULL.""" from musehub.services.musehub_repository import create_repo as svc_create_repo result = await svc_create_repo( db_session, name="no-domain-test", owner="testuser", visibility="public", owner_user_id="testuser", owner_identity_id="testuser", ) await db_session.commit() row = await db_session.get(MusehubRepo, result.repo_id) assert row is not None assert row.domain_id == "code" @pytest.mark.asyncio async def test_create_repo_empty_domain_defaults_to_code( self, db_session: AsyncSession ) -> None: """Explicit empty-string domain also resolves to 'code', never NULL.""" from musehub.services.musehub_repository import create_repo as svc_create_repo result = await svc_create_repo( db_session, name="empty-domain-test", owner="testuser", visibility="public", owner_user_id="testuser", owner_identity_id="testuser", domain="", ) await db_session.commit() row = await db_session.get(MusehubRepo, result.repo_id) assert row is not None assert row.domain_id == "code" @pytest.mark.asyncio async def test_fork_inherits_source_domain( self, db_session: AsyncSession ) -> None: """Forking a midi repo produces a fork with domain_id='midi'.""" from musehub.services.musehub_repository import create_repo as svc_create_repo, fork_repo from musehub.models.musehub import ForkRepoRequest source = await svc_create_repo( db_session, name="source-midi", owner="sourceuser", visibility="public", owner_user_id="sourceuser", owner_identity_id="sourceuser", domain="midi", ) await db_session.commit() fork_result = await fork_repo( db_session, source_repo_id=source.repo_id, forked_by_handle="forkuser", request=ForkRepoRequest(), ) await db_session.commit() fork_row = await db_session.get(MusehubRepo, fork_result.fork_repo.repo_id) assert fork_row is not None assert fork_row.domain_id == "midi" @pytest.mark.asyncio async def test_fork_source_null_domain_becomes_code( self, db_session: AsyncSession ) -> None: """Forking a repo whose domain_id is NULL in DB produces fork with domain_id='code'.""" from musehub.services.musehub_repository import fork_repo from musehub.models.musehub import ForkRepoRequest # Simulate a legacy row with NULL domain_id created_at = datetime.now(tz=timezone.utc) legacy = MusehubRepo( repo_id=compute_repo_id("legacyuser", "legacy-repo", "muse/generic", created_at.isoformat()), name="legacy-repo", owner="legacyuser", slug="legacy-repo", visibility="public", owner_user_id="legacyuser", domain_id=None, created_at=created_at, updated_at=created_at, ) db_session.add(legacy) await db_session.commit() fork_result = await fork_repo( db_session, source_repo_id=legacy.repo_id, forked_by_handle="forkuser2", request=ForkRepoRequest(), ) await db_session.commit() fork_row = await db_session.get(MusehubRepo, fork_result.fork_repo.repo_id) assert fork_row is not None assert fork_row.domain_id == "code" @pytest.mark.asyncio async def test_api_create_repo_domain_stored( self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession ) -> None: """POST /api/repos with domain='midi' stores domain_id='midi' on the DB row.""" resp = await client.post( "/api/repos", json={"name": "api-midi-repo", "owner": "testuser", "visibility": "public", "domain": "midi"}, headers=auth_headers, ) assert resp.status_code == 201 repo_id = resp.json()["repoId"] row = await db_session.get(MusehubRepo, repo_id) assert row is not None assert row.domain_id == "midi" @pytest.mark.asyncio async def test_api_create_repo_no_domain_defaults_to_code( self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession ) -> None: """POST /api/repos without domain field stores domain_id='code' on the DB row.""" resp = await client.post( "/api/repos", json={"name": "api-no-domain-repo", "owner": "testuser", "visibility": "public"}, headers=auth_headers, ) assert resp.status_code == 201 repo_id = resp.json()["repoId"] row = await db_session.get(MusehubRepo, repo_id) assert row is not None assert row.domain_id == "code"