"""Tests for the fork-a-repo feature. Covers: POST /api/repos/{repo_id}/fork - Happy path: fork a public repo - 404 when source repo does not exist - 403 when source repo is private - 403 when caller tries to fork their own repo - 409 when caller has already forked the same repo - 401 when unauthenticated - Optional name / description / visibility fields GET /api/repos/{repo_id}/forks - Returns empty list when no forks exist - Returns all direct forks with source attribution - 404 when source repo does not exist - Public endpoint (no auth required) GET /api/repos/{repo_id}/fork-network - Returns root node with children - Total_forks count is correct - Public endpoint (no auth required) GET /api/users/{username}/forks - Returns empty list when user has no forks - Returns forks with source attribution after forking - 404 when username does not exist Service layer - fork_repo raises ValueError for business rule violations - get_user_forks returns real data after forks are created - list_repo_forks_flat returns real data after forks are created All tests use the shared ``client``, ``auth_headers``, ``test_user``, and ``db_session`` fixtures from conftest.py. """ from __future__ import annotations from datetime import datetime, timezone import pytest from httpx import AsyncClient from sqlalchemy.ext.asyncio import AsyncSession from musehub.core.genesis import compute_fork_id, compute_identity_id, compute_repo_id from musehub.db.musehub_identity_models import MusehubIdentity from musehub.db.musehub_repo_models import MusehubRepo from musehub.types.json_types import StrDict # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- _TEST_HANDLE = "testuser" # matches conftest._TEST_HANDLE async def _create_public_repo( client: AsyncClient, auth_headers: StrDict, name: str = "upstream-beats", ) -> str: """Create a public repo via the API and return its repo_id.""" resp = await client.post( "/api/repos", json={"name": name, "owner": _TEST_HANDLE, "visibility": "public", "initialize": False}, headers=auth_headers, ) assert resp.status_code == 201, resp.text return str(resp.json()["repoId"]) async def _create_private_repo( client: AsyncClient, auth_headers: StrDict, name: str = "secret-project", ) -> str: """Create a private repo via the API and return its repo_id.""" resp = await client.post( "/api/repos", json={"name": name, "owner": _TEST_HANDLE, "visibility": "private", "initialize": False}, headers=auth_headers, ) assert resp.status_code == 201, resp.text return str(resp.json()["repoId"]) async def _seed_identity(db: AsyncSession, handle: str) -> MusehubIdentity: """Seed a secondary identity in the DB (simulating a different user).""" identity = MusehubIdentity( identity_id=compute_identity_id(handle.encode()), handle=handle, display_name=handle.title(), identity_type="human", ) db.add(identity) await db.commit() await db.refresh(identity) return identity async def _seed_source_repo( db: AsyncSession, owner: str, slug: str, visibility: str = "public", **kwargs: str | list[str], ) -> str: """Seed owner identity + source repo; return repo_id string.""" await _seed_identity(db, owner) created_at = datetime.now(tz=timezone.utc) owner_id = compute_identity_id(owner.encode()) repo = MusehubRepo( repo_id=compute_repo_id(owner_id, slug, "code", created_at.isoformat()), name=slug, owner=owner, slug=slug, visibility=visibility, owner_user_id=owner_id, created_at=created_at, updated_at=created_at, **kwargs, ) db.add(repo) await db.commit() await db.refresh(repo) return str(repo.repo_id) # --------------------------------------------------------------------------- # POST /api/repos/{repo_id}/fork — happy path # --------------------------------------------------------------------------- async def test_fork_public_repo_returns_201( client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession, ) -> None: """Forking a public repo returns 201 with fork metadata.""" # Seed a public source repo owned by a different identity so the caller # can fork it. source_id = await _seed_source_repo(db_session, "alice", "shared-beats", description="Alice's public beats") resp = await client.post( f"/api/repos/{source_id}/fork", json={}, headers=auth_headers, ) assert resp.status_code == 201, resp.text body = resp.json() assert "forkId" in body assert body["sourceOwner"] == "alice" assert body["sourceSlug"] == "shared-beats" assert "forkRepo" in body fork_repo = body["forkRepo"] assert fork_repo["owner"] == _TEST_HANDLE assert fork_repo["visibility"] == "public" assert "forkedAt" in body async def test_fork_sets_description_with_attribution( client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession, ) -> None: """Fork description defaults to 'Fork of {owner}/{slug}: {source description}'.""" source_id = await _seed_source_repo(db_session, "bob", "groove-box", description="Bob's groove box") resp = await client.post( f"/api/repos/{source_id}/fork", json={}, headers=auth_headers, ) assert resp.status_code == 201, resp.text description = resp.json()["forkRepo"]["description"] assert "bob" in description assert "groove-box" in description assert "Bob's groove box" in description async def test_fork_with_custom_name( client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession, ) -> None: """Fork accepts an optional custom name for the new repo.""" source_id = await _seed_source_repo(db_session, "carol", "jazz-trio", description="Carol's jazz trio") resp = await client.post( f"/api/repos/{source_id}/fork", json={"name": "my-jazz-experiment"}, headers=auth_headers, ) assert resp.status_code == 201, resp.text fork_repo = resp.json()["forkRepo"] assert "jazz-experiment" in fork_repo["slug"] async def test_fork_with_private_visibility( client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession, ) -> None: """Fork accepts visibility='private' to create a private fork.""" source_id = await _seed_source_repo(db_session, "dave", "open-source-beats", description="Dave's open source beats") resp = await client.post( f"/api/repos/{source_id}/fork", json={"visibility": "private"}, headers=auth_headers, ) assert resp.status_code == 201, resp.text assert resp.json()["forkRepo"]["visibility"] == "private" async def test_fork_with_custom_description( client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession, ) -> None: """Fork accepts a custom description that overrides the default attribution.""" source_id = await _seed_source_repo(db_session, "eve", "synth-lab", description="") resp = await client.post( f"/api/repos/{source_id}/fork", json={"description": "My custom synth fork"}, headers=auth_headers, ) assert resp.status_code == 201, resp.text assert resp.json()["forkRepo"]["description"] == "My custom synth fork" # --------------------------------------------------------------------------- # POST /api/repos/{repo_id}/fork — error cases # --------------------------------------------------------------------------- async def test_fork_nonexistent_repo_returns_404( client: AsyncClient, auth_headers: StrDict, ) -> None: """Forking a repo that doesn't exist returns 404.""" resp = await client.post( "/api/repos/00000000-0000-0000-0000-000000000000/fork", json={}, headers=auth_headers, ) assert resp.status_code == 404 async def test_fork_private_repo_returns_403( client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession, ) -> None: """Forking a private repo returns 403.""" source_id = await _seed_source_repo(db_session, "frank", "private-session", visibility="private", description="Frank's private work") resp = await client.post( f"/api/repos/{source_id}/fork", json={}, headers=auth_headers, ) assert resp.status_code == 403 assert "public" in resp.json()["detail"].lower() async def test_fork_own_repo_returns_403( client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession, ) -> None: """Caller cannot fork a repository they already own — returns 403.""" source_id = await _create_public_repo(client, auth_headers, name="my-own-beats") resp = await client.post( f"/api/repos/{source_id}/fork", json={}, headers=auth_headers, ) assert resp.status_code == 403 assert "own" in resp.json()["detail"].lower() async def test_fork_same_repo_twice_returns_409( client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession, ) -> None: """Forking the same repo twice returns 409 Conflict.""" source_id = await _seed_source_repo(db_session, "grace", "shared-vibes", description="Grace's vibes") # First fork succeeds resp1 = await client.post( f"/api/repos/{source_id}/fork", json={}, headers=auth_headers, ) assert resp1.status_code == 201, resp1.text # Second fork of the same repo → 409 resp2 = await client.post( f"/api/repos/{source_id}/fork", json={"name": "another-fork"}, headers=auth_headers, ) assert resp2.status_code == 409 async def test_fork_requires_auth(client: AsyncClient, db_session: AsyncSession) -> None: """Unauthenticated fork request returns 401.""" source_id = await _seed_source_repo(db_session, "henry", "open-beats", description="") resp = await client.post(f"/api/repos/{source_id}/fork", json={}) assert resp.status_code == 401 # --------------------------------------------------------------------------- # GET /api/repos/{repo_id}/forks # --------------------------------------------------------------------------- async def test_list_forks_empty_when_no_forks( client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession, ) -> None: """A repo with no forks returns an empty list.""" source_id = await _seed_source_repo(db_session, "iris", "unfork-able", description="") resp = await client.get(f"/api/repos/{source_id}/forks") assert resp.status_code == 200 body = resp.json() assert body["forks"] == [] assert body["total"] == 0 async def test_list_forks_shows_fork_after_creation( client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession, ) -> None: """A fork appears in the list after being created.""" source_id = await _seed_source_repo(db_session, "jack", "popular-track", description="Jack's popular track") # Fork it fork_resp = await client.post( f"/api/repos/{source_id}/fork", json={}, headers=auth_headers, ) assert fork_resp.status_code == 201, fork_resp.text # List forks resp = await client.get(f"/api/repos/{source_id}/forks") assert resp.status_code == 200 body = resp.json() assert body["total"] == 1 fork = body["forks"][0] assert fork["sourceOwner"] == "jack" assert fork["sourceSlug"] == "popular-track" assert fork["forkRepo"]["owner"] == _TEST_HANDLE async def test_list_forks_no_auth_required( client: AsyncClient, db_session: AsyncSession, ) -> None: """List forks endpoint is publicly accessible without authentication.""" source_id = await _seed_source_repo(db_session, "kate", "public-beats", description="") # No auth_headers passed resp = await client.get(f"/api/repos/{source_id}/forks") assert resp.status_code == 200 async def test_list_forks_returns_404_for_missing_repo( client: AsyncClient, ) -> None: """List forks for a non-existent repo returns 404.""" resp = await client.get("/api/repos/00000000-0000-0000-0000-000000000000/forks") assert resp.status_code == 404 # --------------------------------------------------------------------------- # GET /api/repos/{repo_id}/fork-network # --------------------------------------------------------------------------- async def test_fork_network_has_root_and_children( client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession, ) -> None: """Fork network returns root with forked repo as a child.""" source_id = await _seed_source_repo(db_session, "liam", "groove-machine", description="Liam's groove machine") # Fork it fork_resp = await client.post( f"/api/repos/{source_id}/fork", json={}, headers=auth_headers, ) assert fork_resp.status_code == 201, fork_resp.text # Get fork network resp = await client.get(f"/api/repos/{source_id}/fork-network") assert resp.status_code == 200 body = resp.json() assert "root" in body assert body["totalForks"] == 1 root = body["root"] assert root["owner"] == "liam" assert root["repoSlug"] == "groove-machine" assert len(root["children"]) == 1 child = root["children"][0] assert child["owner"] == _TEST_HANDLE assert child["forkedBy"] == _TEST_HANDLE async def test_fork_network_empty_children_when_no_forks( client: AsyncClient, db_session: AsyncSession, ) -> None: """Fork network for a repo with no forks has an empty children list.""" source_id = await _seed_source_repo(db_session, "mia", "solo-track", description="") resp = await client.get(f"/api/repos/{source_id}/fork-network") assert resp.status_code == 200 body = resp.json() assert body["totalForks"] == 0 assert body["root"]["children"] == [] async def test_fork_network_returns_404_for_missing_repo( client: AsyncClient, ) -> None: """Fork network for a non-existent repo returns 404.""" resp = await client.get("/api/repos/00000000-0000-0000-0000-000000000000/fork-network") assert resp.status_code == 404 async def test_fork_network_no_auth_required( client: AsyncClient, db_session: AsyncSession, ) -> None: """Fork network endpoint is publicly accessible without authentication.""" source_id = await _seed_source_repo(db_session, "noah", "collab-beats", description="") resp = await client.get(f"/api/repos/{source_id}/fork-network") assert resp.status_code == 200 # --------------------------------------------------------------------------- # GET /api/users/{username}/forks # --------------------------------------------------------------------------- async def test_get_user_forks_empty_for_new_user( client: AsyncClient, test_user: MusehubIdentity, ) -> None: """A user with no forks returns an empty list.""" resp = await client.get(f"/api/users/{_TEST_HANDLE}/forks") assert resp.status_code == 200 body = resp.json() assert body["forks"] == [] assert body["total"] == 0 async def test_get_user_forks_shows_fork_after_creation( client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession, test_user: MusehubIdentity, ) -> None: """User's forks list is populated after forking a repo.""" source_id = await _seed_source_repo(db_session, "olivia", "soul-session", description="Olivia's soul session") # Fork it fork_resp = await client.post( f"/api/repos/{source_id}/fork", json={}, headers=auth_headers, ) assert fork_resp.status_code == 201, fork_resp.text # Get user forks resp = await client.get(f"/api/users/{_TEST_HANDLE}/forks") assert resp.status_code == 200 body = resp.json() assert body["total"] == 1 entry = body["forks"][0] assert entry["sourceOwner"] == "olivia" assert entry["sourceSlug"] == "soul-session" assert entry["forkRepo"]["owner"] == _TEST_HANDLE assert "forkId" in entry assert "forkedAt" in entry async def test_get_user_forks_404_for_unknown_user( client: AsyncClient, ) -> None: """Requesting forks for an unknown user returns 404.""" resp = await client.get("/api/users/nonexistent-user-xyz/forks") assert resp.status_code == 404 async def test_get_user_forks_no_auth_required( client: AsyncClient, test_user: MusehubIdentity, ) -> None: """User forks endpoint is publicly accessible without authentication.""" resp = await client.get(f"/api/users/{_TEST_HANDLE}/forks") assert resp.status_code == 200 # --------------------------------------------------------------------------- # Fork response shape # --------------------------------------------------------------------------- async def test_fork_response_contains_all_required_fields( client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession, ) -> None: """Fork creation response contains all documented fields.""" source_id = await _seed_source_repo(db_session, "peter", "field-check-beats", description="Peter's beats", tags=["jazz", "soul"]) resp = await client.post( f"/api/repos/{source_id}/fork", json={}, headers=auth_headers, ) assert resp.status_code == 201, resp.text body = resp.json() # Top-level fork entry fields for field in ("forkId", "forkRepo", "sourceOwner", "sourceSlug", "forkedAt"): assert field in body, f"Missing field: {field}" # Fork repo fields fork_repo = body["forkRepo"] for field in ("repoId", "name", "owner", "slug", "visibility", "description", "tags", "createdAt"): assert field in fork_repo, f"Missing forkRepo field: {field}" # Tags are copied from source assert "jazz" in fork_repo["tags"] assert "soul" in fork_repo["tags"] # --------------------------------------------------------------------------- # Multiple forks of the same source # --------------------------------------------------------------------------- async def test_multiple_forks_appear_in_list( client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession, ) -> None: """Multiple forks by different users all appear in the source repo's fork list.""" source_id = await _seed_source_repo(db_session, "quinn", "viral-track", description="Quinn's viral track") # testuser forks it fork_resp = await client.post( f"/api/repos/{source_id}/fork", json={}, headers=auth_headers, ) assert fork_resp.status_code == 201, fork_resp.text # Seed a second forker via direct DB insert (bypasses auth) _rachel_id = compute_identity_id(b"rachel") await _seed_identity(db_session, "rachel") _fr2_created = datetime.now(tz=timezone.utc) _fr2_id = compute_repo_id(_rachel_id, "viral-track", "code", _fr2_created.isoformat()) fork_repo_2 = MusehubRepo( repo_id=_fr2_id, name="viral-track", owner="rachel", slug="viral-track", visibility="public", owner_user_id=_rachel_id, description="Fork of quinn/viral-track: Quinn's viral track", created_at=_fr2_created, updated_at=_fr2_created, ) db_session.add(fork_repo_2) await db_session.commit() await db_session.refresh(fork_repo_2) from musehub.db.musehub_social_models import MusehubFork _fork_now = datetime.now(tz=timezone.utc) fork_record = MusehubFork( fork_id=compute_fork_id(source_id, _fr2_id, _fork_now.isoformat()), source_repo_id=source_id, fork_repo_id=fork_repo_2.repo_id, forked_by="rachel", ) db_session.add(fork_record) await db_session.commit() resp = await client.get(f"/api/repos/{source_id}/forks") assert resp.status_code == 200 body = resp.json() assert body["total"] == 2 owners = {f["forkRepo"]["owner"] for f in body["forks"]} assert _TEST_HANDLE in owners assert "rachel" in owners # --------------------------------------------------------------------------- # Private fork visibility — security hardening # --------------------------------------------------------------------------- async def test_private_fork_hidden_from_source_forks_list( client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession, ) -> None: """A private fork must NOT appear in the public GET /repos/{id}/forks list.""" source_id = await _seed_source_repo(db_session, "sam", "secret-upstream", description="Sam's upstream") # Fork with private visibility resp = await client.post( f"/api/repos/{source_id}/fork", json={"visibility": "private"}, headers=auth_headers, ) assert resp.status_code == 201, resp.text # Public listing must be empty — the fork is private list_resp = await client.get(f"/api/repos/{source_id}/forks") assert list_resp.status_code == 200 body = list_resp.json() assert body["total"] == 0 assert body["forks"] == [] async def test_private_fork_hidden_from_fork_network( client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession, ) -> None: """A private fork must NOT appear in the public fork-network tree.""" source_id = await _seed_source_repo(db_session, "tara", "silent-upstream", description="Tara's upstream") # Fork with private visibility resp = await client.post( f"/api/repos/{source_id}/fork", json={"visibility": "private"}, headers=auth_headers, ) assert resp.status_code == 201, resp.text # Fork network must show 0 forks — private fork is not in tree net_resp = await client.get(f"/api/repos/{source_id}/fork-network") assert net_resp.status_code == 200 body = net_resp.json() assert body["totalForks"] == 0 assert body["root"]["children"] == [] async def test_private_fork_hidden_from_public_user_forks( client: AsyncClient, test_user: MusehubIdentity, db_session: AsyncSession, ) -> None: """A private fork must NOT appear when an unauthenticated caller views a user's forks. Note: this test does NOT request the ``auth_headers`` fixture because that fixture globally overrides ``optional_signed_request`` to return the test context, making every request in the test look authenticated. Instead we seed the fork directly in the DB so we can make a genuinely anonymous call. """ _uma_id = compute_identity_id(b"uma") _test_id = compute_identity_id(_TEST_HANDLE.encode()) await _seed_identity(db_session, "uma") _src_ts = datetime.now(tz=timezone.utc) _src_id = compute_repo_id(_uma_id, "covert-upstream", "code", _src_ts.isoformat()) source = MusehubRepo( repo_id=_src_id, name="covert-upstream", owner="uma", slug="covert-upstream", visibility="public", owner_user_id=_uma_id, description="Uma's upstream", created_at=_src_ts, updated_at=_src_ts, ) _frk_ts = datetime.now(tz=timezone.utc) _frk_id = compute_repo_id(_test_id, "covert-upstream", "code", _frk_ts.isoformat()) fork_repo = MusehubRepo( repo_id=_frk_id, name="covert-upstream", owner=_TEST_HANDLE, slug="covert-upstream", visibility="private", # private fork owner_user_id=_test_id, description="Fork of uma/covert-upstream: Uma's upstream", created_at=_frk_ts, updated_at=_frk_ts, ) db_session.add(source) db_session.add(fork_repo) await db_session.commit() await db_session.refresh(source) await db_session.refresh(fork_repo) from musehub.db.musehub_social_models import MusehubFork _fk_ts = datetime.now(tz=timezone.utc) fork_record = MusehubFork( fork_id=compute_fork_id(_src_id, _frk_id, _fk_ts.isoformat()), source_repo_id=str(source.repo_id), fork_repo_id=str(fork_repo.repo_id), forked_by=_TEST_HANDLE, ) db_session.add(fork_record) await db_session.commit() # Genuinely unauthenticated GET — no auth_headers fixture, no dep override anon_resp = await client.get(f"/api/users/{_TEST_HANDLE}/forks") assert anon_resp.status_code == 200 body = anon_resp.json() assert body["total"] == 0 assert body["forks"] == [] async def test_private_fork_visible_to_owner( client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession, ) -> None: """The fork owner can see their own private fork via the authenticated forks endpoint.""" source_id = await _seed_source_repo(db_session, "vera", "owner-visible-upstream", description="Vera's upstream") # Fork with private visibility resp = await client.post( f"/api/repos/{source_id}/fork", json={"visibility": "private"}, headers=auth_headers, ) assert resp.status_code == 201, resp.text # Authenticated as owner — private fork IS visible auth_resp = await client.get(f"/api/users/{_TEST_HANDLE}/forks", headers=auth_headers) assert auth_resp.status_code == 200 body = auth_resp.json() assert body["total"] == 1 assert body["forks"][0]["forkRepo"]["visibility"] == "private" async def test_invalid_visibility_returns_422( client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession, ) -> None: """Fork request with invalid visibility value returns 422 Unprocessable Entity.""" source_id = await _seed_source_repo(db_session, "walter", "valid-upstream", description="Walter's upstream") resp = await client.post( f"/api/repos/{source_id}/fork", json={"visibility": "superadmin"}, headers=auth_headers, ) assert resp.status_code == 422 async def test_slug_collision_auto_resolved( client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession, ) -> None: """Forking when the caller already owns a repo with the same name auto-suffixes the slug.""" # testuser already owns a repo with the same name as the source existing_resp = await client.post( "/api/repos", json={"name": "classic-track", "owner": _TEST_HANDLE, "visibility": "public", "initialize": False}, headers=auth_headers, ) assert existing_resp.status_code == 201, existing_resp.text source_id = await _seed_source_repo(db_session, "xavier", "classic-track", description="Xavier's classic track") # Fork should succeed despite slug collision — gets auto-suffixed slug fork_resp = await client.post( f"/api/repos/{source_id}/fork", json={}, headers=auth_headers, ) assert fork_resp.status_code == 201, fork_resp.text fork_slug = fork_resp.json()["forkRepo"]["slug"] # Slug must differ from the existing one (auto-suffixed) assert fork_slug == "classic-track-2"