"""Tests for MuseHub repo, branch, and commit endpoints. Covers every acceptance criterion: - POST /musehub/repos returns 201 with correct fields - POST requires auth — unauthenticated requests return 401 - GET /repos/{repo_id} returns 200; 404 for unknown repo - GET /repos/{repo_id}/branches returns empty list on new repo - GET /repos/{repo_id}/commits returns newest first, respects ?limit Covers (compare view API endpoint): - test_compare_radar_data — compare endpoint returns 5 dimension scores - test_compare_commit_list — commits unique to head are listed - test_compare_unknown_ref_404 — unknown ref returns 422 All tests use the shared ``client`` and ``auth_headers`` fixtures from conftest.py. """ from __future__ import annotations from datetime import datetime, timezone import pytest from httpx import AsyncClient from sqlalchemy.ext.asyncio import AsyncSession from musehub.core.genesis import compute_collaborator_id, compute_identity_id, compute_repo_id from musehub.db.musehub_repo_models import MusehubCommit, MusehubCommitRef, MusehubRepo from musehub.services import musehub_repository from musehub.types.json_types import StrDict def _make_repo( slug: str, owner: str = "testuser", owner_user_id: str | None = None, visibility: str = "private", **kwargs: str | int | bool | None, ) -> MusehubRepo: if owner_user_id is None: owner_user_id = TEST_OWNER_USER_ID created_at = datetime.now(tz=timezone.utc) return MusehubRepo( repo_id=compute_repo_id(owner_user_id, slug, "code", created_at.isoformat()), name=slug, owner=owner, slug=slug, visibility=visibility, owner_user_id=owner_user_id, created_at=created_at, updated_at=created_at, **kwargs, ) # --------------------------------------------------------------------------- # POST /musehub/repos # --------------------------------------------------------------------------- async def test_create_repo_returns_201( client: AsyncClient, auth_headers: StrDict, ) -> None: """POST /musehub/repos creates a repo and returns all required fields.""" response = await client.post( "/api/repos", json={"name": "my-beats", "owner": "testuser", "visibility": "private"}, headers=auth_headers, ) assert response.status_code == 201 body = response.json() assert body["name"] == "my-beats" assert body["visibility"] == "private" assert "repoId" in body assert "cloneUrl" in body assert "ownerUserId" in body assert "createdAt" in body async def test_create_repo_requires_auth(client: AsyncClient) -> None: """POST /musehub/repos returns 401 without a MSign Authorization header.""" response = await client.post( "/api/repos", json={"name": "my-beats", "owner": "testuser"}, ) assert response.status_code == 401 async def test_create_repo_default_visibility_is_public( client: AsyncClient, auth_headers: StrDict, ) -> None: """Omitting visibility defaults to 'public'.""" response = await client.post( "/api/repos", json={"name": "silent-sessions", "owner": "testuser"}, headers=auth_headers, ) assert response.status_code == 201 assert response.json()["visibility"] == "public" # --------------------------------------------------------------------------- # GET /repos/{repo_id} # --------------------------------------------------------------------------- async def test_get_repo_returns_200( client: AsyncClient, auth_headers: StrDict, ) -> None: """GET /repos/{repo_id} returns the repo after creation.""" create = await client.post( "/api/repos", json={"name": "jazz-sessions", "owner": "testuser"}, headers=auth_headers, ) assert create.status_code == 201 repo_id = create.json()["repoId"] response = await client.get(f"/api/repos/{repo_id}", headers=auth_headers) assert response.status_code == 200 assert response.json()["repoId"] == repo_id assert response.json()["name"] == "jazz-sessions" async def test_get_repo_not_found_returns_404( client: AsyncClient, auth_headers: StrDict, ) -> None: """GET /repos/{repo_id} returns 404 for unknown repo.""" response = await client.get( "/api/repos/does-not-exist", headers=auth_headers, ) assert response.status_code == 404 async def test_get_nonexistent_repo_returns_404_without_auth(client: AsyncClient) -> None: """GET /repos/{repo_id} returns 404 for a non-existent repo without auth. Uses optional_token — auth is visibility-based; missing repo → 404 before auth check. """ response = await client.get("/api/repos/non-existent-repo-id") assert response.status_code == 404 # --------------------------------------------------------------------------- # GET /repos/{repo_id}/branches # --------------------------------------------------------------------------- async def test_list_branches_empty_on_new_repo( client: AsyncClient, auth_headers: StrDict, ) -> None: """A newly created repo has an empty branches list when not initialized.""" create = await client.post( "/api/repos", json={"name": "drum-patterns", "owner": "testuser", "initialize": False}, headers=auth_headers, ) repo_id = create.json()["repoId"] response = await client.get( f"/api/repos/{repo_id}/branches", headers=auth_headers, ) assert response.status_code == 200 assert response.json()["branches"] == [] async def test_list_branches_not_found_returns_404( client: AsyncClient, auth_headers: StrDict, ) -> None: """GET /branches returns 404 when the repo doesn't exist.""" response = await client.get( "/api/repos/ghost-repo/branches", headers=auth_headers, ) assert response.status_code == 404 # --------------------------------------------------------------------------- # GET /repos/{repo_id}/commits # --------------------------------------------------------------------------- async def test_list_commits_empty_on_new_repo( client: AsyncClient, auth_headers: StrDict, ) -> None: """A new repo has no commits when initialize=false.""" create = await client.post( "/api/repos", json={"name": "empty-repo", "owner": "testuser", "initialize": False}, headers=auth_headers, ) repo_id = create.json()["repoId"] response = await client.get( f"/api/repos/{repo_id}/commits", headers=auth_headers, ) assert response.status_code == 200 body = response.json() assert body["commits"] == [] assert body["total"] == 0 async def test_list_commits_returns_newest_first( client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession, ) -> None: """Commits are returned newest-first after being pushed.""" from datetime import datetime, timezone, timedelta # Create repo via API (no init commit so we control the full history) create = await client.post( "/api/repos", json={"name": "ordered-commits", "owner": "testuser", "initialize": False}, headers=auth_headers, ) repo_id = create.json()["repoId"] # Insert two commits directly with known timestamps now = datetime.now(tz=timezone.utc) older = MusehubCommit( commit_id="aaa111", branch="main", parent_ids=[], message="first", author="gabriel", timestamp=now - timedelta(hours=1), ) newer = MusehubCommit( commit_id="bbb222", branch="main", parent_ids=["aaa111"], message="second", author="gabriel", timestamp=now, ) db_session.add_all([older, newer]) db_session.add(MusehubCommitRef(repo_id=repo_id, commit_id="aaa111")) db_session.add(MusehubCommitRef(repo_id=repo_id, commit_id="bbb222")) await db_session.commit() response = await client.get( f"/api/repos/{repo_id}/commits", headers=auth_headers, ) assert response.status_code == 200 commits = response.json()["commits"] assert len(commits) == 2 assert commits[0]["commitId"] == "bbb222" assert commits[1]["commitId"] == "aaa111" async def test_list_commits_limit_param( client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession, ) -> None: """?limit=1 returns exactly 1 commit.""" from datetime import datetime, timezone, timedelta create = await client.post( "/api/repos", json={"name": "limited-repo", "owner": "testuser", "initialize": False}, headers=auth_headers, ) repo_id = create.json()["repoId"] now = datetime.now(tz=timezone.utc) for i in range(3): db_session.add( MusehubCommit( commit_id=f"commit-{i}", branch="main", parent_ids=[], message=f"commit {i}", author="gabriel", timestamp=now + timedelta(seconds=i), ) ) db_session.add(MusehubCommitRef(repo_id=repo_id, commit_id=f"commit-{i}")) await db_session.commit() response = await client.get( f"/api/repos/{repo_id}/commits?limit=1", headers=auth_headers, ) assert response.status_code == 200 body = response.json() assert len(body["commits"]) == 1 assert body["total"] == 3 # --------------------------------------------------------------------------- # Service layer — direct DB tests (no HTTP) # --------------------------------------------------------------------------- async def test_create_repo_service_persists_to_db(db_session: AsyncSession) -> None: """musehub_repository.create_repo() persists the row.""" repo = await musehub_repository.create_repo( db_session, name="service-test-repo", owner="testuser", visibility="public", owner_user_id=compute_identity_id(b"testuser"), ) await db_session.commit() fetched = await musehub_repository.get_repo(db_session, repo.repo_id) assert fetched is not None assert fetched.name == "service-test-repo" assert fetched.visibility == "public" async def test_get_repo_returns_none_when_missing(db_session: AsyncSession) -> None: """get_repo() returns None for an unknown repo_id.""" result = await musehub_repository.get_repo(db_session, "nonexistent-id") assert result is None async def test_list_branches_returns_empty_for_new_repo(db_session: AsyncSession) -> None: """list_branches() returns [] for a repo with no branches.""" repo = await musehub_repository.create_repo( db_session, name="branchless", owner="testuser", visibility="private", owner_user_id=compute_identity_id(b"testuser"), ) await db_session.commit() branches = await musehub_repository.list_branches(db_session, repo.repo_id) assert branches == [] # --------------------------------------------------------------------------- # GET /repos/{repo_id}/divergence # --------------------------------------------------------------------------- async def test_divergence_endpoint_returns_five_dimensions( client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession, ) -> None: """GET /divergence returns five dimension scores with level labels.""" from datetime import datetime, timezone, timedelta create = await client.post( "/api/repos", json={"name": "divergence-test-repo", "owner": "testuser"}, headers=auth_headers, ) assert create.status_code == 201 repo_id = create.json()["repoId"] now = datetime.now(tz=timezone.utc) db_session.add(MusehubCommit(commit_id="aaa-melody", branch="main", parent_ids=[], message="add lead melody line", author="alice", timestamp=now - timedelta(hours=2))) db_session.add(MusehubCommitRef(repo_id=repo_id, commit_id="aaa-melody")) db_session.add(MusehubCommit(commit_id="bbb-chord", branch="feature", parent_ids=[], message="update chord progression", author="bob", timestamp=now - timedelta(hours=1))) db_session.add(MusehubCommitRef(repo_id=repo_id, commit_id="bbb-chord")) await db_session.commit() response = await client.get( f"/api/repos/{repo_id}/divergence?branch_a=main&branch_b=feature", headers=auth_headers, ) assert response.status_code == 200 body = response.json() assert "dimensions" in body assert len(body["dimensions"]) == 5 dim_names = {d["dimension"] for d in body["dimensions"]} assert dim_names == {"melodic", "harmonic", "rhythmic", "structural", "dynamic"} for dim in body["dimensions"]: assert "level" in dim assert dim["level"] in {"NONE", "LOW", "MED", "HIGH"} assert "score" in dim assert 0.0 <= dim["score"] <= 1.0 async def test_divergence_overall_score_is_mean_of_dimensions( client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession, ) -> None: """Overall divergence score equals the mean of all five dimension scores.""" from datetime import datetime, timezone, timedelta create = await client.post( "/api/repos", json={"name": "divergence-mean-repo", "owner": "testuser"}, headers=auth_headers, ) repo_id = create.json()["repoId"] now = datetime.now(tz=timezone.utc) db_session.add(MusehubCommit(commit_id="c1-beat", branch="alpha", parent_ids=[], message="rework drum beat groove", author="producer-a", timestamp=now - timedelta(hours=3))) db_session.add(MusehubCommitRef(repo_id=repo_id, commit_id="c1-beat")) db_session.add(MusehubCommit(commit_id="c2-mix", branch="beta", parent_ids=[], message="fix master volume level", author="producer-b", timestamp=now - timedelta(hours=2))) db_session.add(MusehubCommitRef(repo_id=repo_id, commit_id="c2-mix")) await db_session.commit() response = await client.get( f"/api/repos/{repo_id}/divergence?branch_a=alpha&branch_b=beta", headers=auth_headers, ) assert response.status_code == 200 body = response.json() dims = body["dimensions"] computed_mean = round(sum(d["score"] for d in dims) / len(dims), 4) assert abs(body["overallScore"] - computed_mean) < 1e-6 async def test_divergence_json_response_structure( client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession, ) -> None: """JSON response has all required top-level fields and camelCase keys.""" from datetime import datetime, timezone, timedelta create = await client.post( "/api/repos", json={"name": "divergence-struct-repo", "owner": "testuser"}, headers=auth_headers, ) repo_id = create.json()["repoId"] now = datetime.now(tz=timezone.utc) for i, (branch, msg) in enumerate( [("main", "add melody riff"), ("dev", "update chorus section")] ): db_session.add(MusehubCommit(commit_id=f"struct-{i}", branch=branch, parent_ids=[], message=msg, author="test", timestamp=now + timedelta(seconds=i))) db_session.add(MusehubCommitRef(repo_id=repo_id, commit_id=f"struct-{i}")) await db_session.commit() response = await client.get( f"/api/repos/{repo_id}/divergence?branch_a=main&branch_b=dev", headers=auth_headers, ) assert response.status_code == 200 body = response.json() assert body["repoId"] == repo_id assert body["branchA"] == "main" assert body["branchB"] == "dev" assert "commonAncestor" in body assert "overallScore" in body assert isinstance(body["overallScore"], float) assert isinstance(body["dimensions"], list) assert len(body["dimensions"]) == 5 for dim in body["dimensions"]: assert "dimension" in dim assert "level" in dim assert "score" in dim assert "description" in dim assert "branchACommits" in dim assert "branchBCommits" in dim async def test_divergence_endpoint_returns_404_for_unknown_repo( client: AsyncClient, auth_headers: StrDict, ) -> None: """GET /divergence returns 404 for an unknown repo.""" response = await client.get( "/api/repos/no-such-repo/divergence?branch_a=a&branch_b=b", headers=auth_headers, ) assert response.status_code == 404 async def test_divergence_endpoint_returns_422_for_empty_branch( client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession, ) -> None: """GET /divergence returns 422 when a branch has no commits.""" create = await client.post( "/api/repos", json={"name": "empty-branch-repo", "owner": "testuser"}, headers=auth_headers, ) repo_id = create.json()["repoId"] response = await client.get( f"/api/repos/{repo_id}/divergence?branch_a=ghost&branch_b=also-ghost", headers=auth_headers, ) assert response.status_code == 422 # --------------------------------------------------------------------------- # GET /repos/{repo_id}/dag # --------------------------------------------------------------------------- async def test_graph_dag_endpoint_returns_empty_for_new_repo( client: AsyncClient, auth_headers: StrDict, ) -> None: """GET /dag returns empty nodes/edges for a repo with no commits (initialize=false).""" create = await client.post( "/api/repos", json={"name": "dag-empty", "owner": "testuser", "initialize": False}, headers=auth_headers, ) repo_id = create.json()["repoId"] response = await client.get( f"/api/repos/{repo_id}/dag", headers=auth_headers, ) assert response.status_code == 200 body = response.json() assert body["nodes"] == [] assert body["edges"] == [] assert body["headCommitId"] is None async def test_graph_dag_has_edges( client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession, ) -> None: """DAG endpoint returns correct edges representing parent relationships.""" from datetime import datetime, timezone, timedelta create = await client.post( "/api/repos", json={"name": "dag-edges", "owner": "testuser", "initialize": False}, headers=auth_headers, ) repo_id = create.json()["repoId"] now = datetime.now(tz=timezone.utc) root = MusehubCommit(commit_id="root111", branch="main", parent_ids=[], message="root commit", author="gabriel", timestamp=now - timedelta(hours=2)) child = MusehubCommit(commit_id="child222", branch="main", parent_ids=["root111"], message="child commit", author="gabriel", timestamp=now - timedelta(hours=1)) db_session.add_all([root, child]) db_session.add(MusehubCommitRef(repo_id=repo_id, commit_id="root111")) db_session.add(MusehubCommitRef(repo_id=repo_id, commit_id="child222")) await db_session.commit() response = await client.get( f"/api/repos/{repo_id}/dag", headers=auth_headers, ) assert response.status_code == 200 body = response.json() nodes = body["nodes"] edges = body["edges"] assert len(nodes) == 2 # Verify edge: child → root assert any(e["source"] == "child222" and e["target"] == "root111" for e in edges) async def test_graph_dag_endpoint_topological_order( client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession, ) -> None: """DAG endpoint returns nodes in topological order (oldest ancestor first).""" from datetime import datetime, timedelta, timezone create = await client.post( "/api/repos", json={"name": "dag-topo", "owner": "testuser"}, headers=auth_headers, ) repo_id = create.json()["repoId"] now = datetime.now(tz=timezone.utc) commits = [ MusehubCommit(commit_id="topo-a", branch="main", parent_ids=[], message="root", author="gabriel", timestamp=now - timedelta(hours=3)), MusehubCommit(commit_id="topo-b", branch="main", parent_ids=["topo-a"], message="second", author="gabriel", timestamp=now - timedelta(hours=2)), MusehubCommit(commit_id="topo-c", branch="main", parent_ids=["topo-b"], message="third", author="gabriel", timestamp=now - timedelta(hours=1)), ] db_session.add_all(commits) for cid in ("topo-a", "topo-b", "topo-c"): db_session.add(MusehubCommitRef(repo_id=repo_id, commit_id=cid)) await db_session.commit() response = await client.get( f"/api/repos/{repo_id}/dag", headers=auth_headers, ) assert response.status_code == 200 node_ids = [n["commitId"] for n in response.json()["nodes"]] # Root must appear before children in topological order assert node_ids.index("topo-a") < node_ids.index("topo-b") assert node_ids.index("topo-b") < node_ids.index("topo-c") async def test_graph_dag_nonexistent_repo_returns_404_without_auth(client: AsyncClient) -> None: """GET /dag returns 404 for a non-existent repo without a token. Uses optional_token — auth is visibility-based; missing repo → 404. """ response = await client.get("/api/repos/non-existent-repo/dag") assert response.status_code == 404 async def test_graph_dag_404_for_unknown_repo( client: AsyncClient, auth_headers: StrDict, ) -> None: """GET /dag returns 404 for a non-existent repo.""" response = await client.get( "/api/repos/ghost-repo-dag/dag", headers=auth_headers, ) assert response.status_code == 404 async def test_graph_json_response_has_required_fields( client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession, ) -> None: """DAG JSON response includes nodes (with required fields) and edges arrays.""" from datetime import datetime, timezone create = await client.post( "/api/repos", json={"name": "dag-fields", "owner": "testuser"}, headers=auth_headers, ) repo_id = create.json()["repoId"] db_session.add(MusehubCommit(commit_id="fields-aaa", branch="main", parent_ids=[], message="check fields", author="tester", timestamp=datetime.now(tz=timezone.utc))) db_session.add(MusehubCommitRef(repo_id=repo_id, commit_id="fields-aaa")) await db_session.commit() response = await client.get( f"/api/repos/{repo_id}/dag", headers=auth_headers, ) assert response.status_code == 200 body = response.json() assert "nodes" in body assert "edges" in body assert "headCommitId" in body node = body["nodes"][0] for field in ("commitId", "message", "author", "timestamp", "branch", "parentIds", "isHead"): assert field in node, f"Missing field '{field}' in DAG node" # --------------------------------------------------------------------------- # GET /repos/{repo_id}/credits # --------------------------------------------------------------------------- async def _seed_credits_repo(db_session: AsyncSession) -> str: """Create a repo with commits from two distinct authors and return repo_id.""" from datetime import timedelta repo = _make_repo("liner-notes", visibility="public") db_session.add(repo) await db_session.flush() repo_id = str(repo.repo_id) now = datetime.now(tz=timezone.utc) # Alice: 2 commits (most prolific), most recent 1 day ago db_session.add(MusehubCommit(commit_id="alice-001", branch="main", parent_ids=[], message="compose the main melody", author="Alice", timestamp=now - timedelta(days=3))) db_session.add(MusehubCommitRef(repo_id=repo_id, commit_id="alice-001")) db_session.add(MusehubCommit(commit_id="alice-002", branch="main", parent_ids=["alice-001"], message="mix the final arrangement", author="Alice", timestamp=now - timedelta(days=1))) db_session.add(MusehubCommitRef(repo_id=repo_id, commit_id="alice-002")) # Bob: 1 commit, last active 5 days ago db_session.add(MusehubCommit(commit_id="bob-001", branch="main", parent_ids=[], message="arrange the bridge section", author="Bob", timestamp=now - timedelta(days=5))) db_session.add(MusehubCommitRef(repo_id=repo_id, commit_id="bob-001")) await db_session.commit() return repo_id async def test_credits_aggregation( client: AsyncClient, db_session: AsyncSession, auth_headers: StrDict, ) -> None: """GET /api/repos/{repo_id}/credits aggregates contributors from commits.""" repo_id = await _seed_credits_repo(db_session) response = await client.get( f"/api/repos/{repo_id}/credits", headers=auth_headers, ) assert response.status_code == 200 body = response.json() assert body["totalContributors"] == 2 authors = {c["author"] for c in body["contributors"]} assert "Alice" in authors assert "Bob" in authors async def test_credits_sorted_by_count( client: AsyncClient, db_session: AsyncSession, auth_headers: StrDict, ) -> None: """Default sort (count) puts the most prolific contributor first.""" repo_id = await _seed_credits_repo(db_session) response = await client.get( f"/api/repos/{repo_id}/credits?sort=count", headers=auth_headers, ) assert response.status_code == 200 contributors = response.json()["contributors"] assert contributors[0]["author"] == "Alice" assert contributors[0]["sessionCount"] == 2 async def test_credits_sorted_by_recency( client: AsyncClient, db_session: AsyncSession, auth_headers: StrDict, ) -> None: """sort=recency puts the most recently active contributor first.""" repo_id = await _seed_credits_repo(db_session) response = await client.get( f"/api/repos/{repo_id}/credits?sort=recency", headers=auth_headers, ) assert response.status_code == 200 contributors = response.json()["contributors"] # Alice has a commit 1 day ago; Bob's last was 5 days ago assert contributors[0]["author"] == "Alice" async def test_credits_sorted_by_alpha( client: AsyncClient, db_session: AsyncSession, auth_headers: StrDict, ) -> None: """sort=alpha returns contributors in alphabetical order.""" repo_id = await _seed_credits_repo(db_session) response = await client.get( f"/api/repos/{repo_id}/credits?sort=alpha", headers=auth_headers, ) assert response.status_code == 200 contributors = response.json()["contributors"] authors = [c["author"] for c in contributors] assert authors == sorted(authors, key=str.lower) async def test_credits_contribution_types_inferred( client: AsyncClient, db_session: AsyncSession, auth_headers: StrDict, ) -> None: """Contribution types are inferred from commit messages.""" repo_id = await _seed_credits_repo(db_session) response = await client.get( f"/api/repos/{repo_id}/credits", headers=auth_headers, ) assert response.status_code == 200 contributors = response.json()["contributors"] alice = next(c for c in contributors if c["author"] == "Alice") # Alice's commits mention "compose" and "mix" types = set(alice["contributionTypes"]) assert len(types) > 0 async def test_credits_404_for_unknown_repo( client: AsyncClient, auth_headers: StrDict, ) -> None: """GET /api/repos/{unknown}/credits returns 404.""" response = await client.get( "/api/repos/does-not-exist/credits", headers=auth_headers, ) assert response.status_code == 404 async def test_credits_requires_auth( client: AsyncClient, db_session: AsyncSession, ) -> None: """GET /api/repos/{repo_id}/credits returns 401 without MSign auth.""" repo = _make_repo("auth-test-repo") db_session.add(repo) await db_session.commit() await db_session.refresh(repo) response = await client.get(f"/api/repos/{repo.repo_id}/credits") assert response.status_code == 401 async def test_credits_invalid_sort_param( client: AsyncClient, db_session: AsyncSession, auth_headers: StrDict, ) -> None: """GET /api/repos/{repo_id}/credits with invalid sort returns 422.""" repo = _make_repo("sort-test") db_session.add(repo) await db_session.commit() await db_session.refresh(repo) response = await client.get( f"/api/repos/{repo.repo_id}/credits?sort=invalid", headers=auth_headers, ) assert response.status_code == 422 async def test_credits_aggregation_service_direct(db_session: AsyncSession) -> None: """musehub_credits.aggregate_credits() returns correct data without HTTP layer.""" from musehub.services import musehub_credits repo = _make_repo("direct-test") db_session.add(repo) await db_session.flush() repo_id = str(repo.repo_id) now = datetime.now(tz=timezone.utc) db_session.add(MusehubCommit(commit_id="svc-001", branch="main", parent_ids=[], message="produce and mix the drop", author="Charlie", timestamp=now)) db_session.add(MusehubCommitRef(repo_id=repo_id, commit_id="svc-001")) await db_session.commit() result = await musehub_credits.aggregate_credits(db_session, repo_id, sort="count") assert result.total_contributors == 1 assert result.contributors[0].author == "Charlie" assert result.contributors[0].session_count == 1 # --------------------------------------------------------------------------- # Compare endpoint # --------------------------------------------------------------------------- async def _make_compare_repo( db_session: AsyncSession, client: AsyncClient, auth_headers: StrDict, ) -> str: """Seed a repo with commits on two branches and return repo_id.""" from datetime import datetime, timezone create = await client.post( "/api/repos", json={"name": "compare-test", "owner": "testuser", "visibility": "private"}, headers=auth_headers, ) assert create.status_code == 201 repo_id: str = str(create.json()["repoId"]) now = datetime.now(tz=timezone.utc) db_session.add(MusehubCommit(commit_id="base001", branch="main", parent_ids=[], message="add melody line", author="Alice", timestamp=now)) db_session.add(MusehubCommitRef(repo_id=repo_id, commit_id="base001")) db_session.add(MusehubCommit(commit_id="head001", branch="feature", parent_ids=["base001"], message="add chord progression", author="Bob", timestamp=now)) db_session.add(MusehubCommitRef(repo_id=repo_id, commit_id="head001")) await db_session.commit() return repo_id async def test_compare_radar_data( client: AsyncClient, db_session: AsyncSession, auth_headers: StrDict, ) -> None: """GET /api/repos/{id}/compare returns 5 dimension scores.""" repo_id = await _make_compare_repo(db_session, client, auth_headers) response = await client.get( f"/api/repos/{repo_id}/compare?base=main&head=feature", headers=auth_headers, ) assert response.status_code == 200 body = response.json() assert "dimensions" in body assert len(body["dimensions"]) == 5 expected_dims = {"melodic", "harmonic", "rhythmic", "structural", "dynamic"} found_dims = {d["dimension"] for d in body["dimensions"]} assert found_dims == expected_dims for dim in body["dimensions"]: assert 0.0 <= dim["score"] <= 1.0 assert dim["level"] in ("NONE", "LOW", "MED", "HIGH") assert "overallScore" in body assert 0.0 <= body["overallScore"] <= 1.0 async def test_compare_commit_list( client: AsyncClient, db_session: AsyncSession, auth_headers: StrDict, ) -> None: """Commits unique to head are listed in the compare response.""" repo_id = await _make_compare_repo(db_session, client, auth_headers) response = await client.get( f"/api/repos/{repo_id}/compare?base=main&head=feature", headers=auth_headers, ) assert response.status_code == 200 body = response.json() assert "commits" in body # head001 is on feature but not on main commit_ids = [c["commitId"] for c in body["commits"]] assert "head001" in commit_ids # base001 is on main so should NOT appear as unique to head assert "base001" not in commit_ids async def test_compare_unknown_ref_422( client: AsyncClient, db_session: AsyncSession, auth_headers: StrDict, ) -> None: """Unknown ref (branch with no commits) returns 422.""" create = await client.post( "/api/repos", json={"name": "empty-compare", "owner": "testuser", "visibility": "private"}, headers=auth_headers, ) assert create.status_code == 201 repo_id = create.json()["repoId"] response = await client.get( f"/api/repos/{repo_id}/compare?base=nonexistent&head=alsoabsent", headers=auth_headers, ) assert response.status_code == 422 async def test_compare_emotion_diff_fields( client: AsyncClient, db_session: AsyncSession, auth_headers: StrDict, ) -> None: """Compare response includes emotion diff with required delta fields.""" repo_id = await _make_compare_repo(db_session, client, auth_headers) response = await client.get( f"/api/repos/{repo_id}/compare?base=main&head=feature", headers=auth_headers, ) assert response.status_code == 200 body = response.json() assert "emotionDiff" in body ed = body["emotionDiff"] for field in ("energyDelta", "valenceDelta", "tensionDelta", "darknessDelta"): assert field in ed assert -1.0 <= ed[field] <= 1.0 for field in ("baseEnergy", "headEnergy", "baseValence", "headValence"): assert field in ed assert 0.0 <= ed[field] <= 1.0 # --------------------------------------------------------------------------- # --------------------------------------------------------------------------- # GET /repos/{repo_id}/settings # --------------------------------------------------------------------------- TEST_OWNER_USER_ID = compute_identity_id(b"testuser") async def test_get_repo_settings_returns_defaults( client: AsyncClient, db_session: AsyncSession, auth_headers: StrDict, ) -> None: """GET /repos/{repo_id}/settings returns full settings with canonical defaults.""" repo = _make_repo("settings-get-test") db_session.add(repo) await db_session.commit() await db_session.refresh(repo) resp = await client.get( f"/api/repos/{repo.repo_id}/settings", headers=auth_headers, ) assert resp.status_code == 200 body = resp.json() assert body["name"] == "settings-get-test" assert body["visibility"] == "private" assert body["hasIssues"] is True assert body["allowMergeCommit"] is True assert body["allowRebaseMerge"] is False assert body["deleteBranchOnMerge"] is True assert body["defaultBranch"] == "main" async def test_get_repo_settings_requires_auth( client: AsyncClient, db_session: AsyncSession, ) -> None: """GET /repos/{repo_id}/settings returns 401 without a MSign Authorization header.""" repo = _make_repo("settings-noauth") db_session.add(repo) await db_session.commit() await db_session.refresh(repo) resp = await client.get(f"/api/repos/{repo.repo_id}/settings") assert resp.status_code == 401 async def test_get_repo_settings_returns_403_for_non_admin( client: AsyncClient, db_session: AsyncSession, auth_headers: StrDict, ) -> None: """GET /repos/{repo_id}/settings returns 403 when caller is not owner or admin.""" repo = _make_repo("settings-403-test", owner="other-owner", owner_user_id=compute_identity_id(b"other-owner"), visibility="public") db_session.add(repo) await db_session.commit() await db_session.refresh(repo) resp = await client.get( f"/api/repos/{repo.repo_id}/settings", headers=auth_headers, ) assert resp.status_code == 403 async def test_get_repo_settings_returns_404_for_unknown_repo( client: AsyncClient, auth_headers: StrDict, ) -> None: """GET /repos/{repo_id}/settings returns 404 for a non-existent repo.""" resp = await client.get( "/api/repos/nonexistent-repo-id/settings", headers=auth_headers, ) assert resp.status_code == 404 # --------------------------------------------------------------------------- # PATCH /repos/{repo_id}/settings # --------------------------------------------------------------------------- async def test_patch_repo_settings_updates_fields( client: AsyncClient, db_session: AsyncSession, auth_headers: StrDict, ) -> None: """PATCH /repos/{repo_id}/settings owner can update dedicated and flag fields.""" repo = _make_repo("settings-patch-test") db_session.add(repo) await db_session.commit() await db_session.refresh(repo) resp = await client.patch( f"/api/repos/{repo.repo_id}/settings", json={ "description": "Updated description", "visibility": "public", "hasIssues": False, "allowRebaseMerge": True, "homepageUrl": "https://muse.app", "topics": ["classical", "baroque"], }, headers=auth_headers, ) assert resp.status_code == 200 body = resp.json() assert body["description"] == "Updated description" assert body["visibility"] == "public" assert body["hasIssues"] is False assert body["allowRebaseMerge"] is True assert body["homepageUrl"] == "https://muse.app" assert body["topics"] == ["classical", "baroque"] # Untouched field should retain its default assert body["allowMergeCommit"] is True async def test_patch_repo_settings_partial_update_preserves_other_fields( client: AsyncClient, db_session: AsyncSession, auth_headers: StrDict, ) -> None: """PATCH with a single field leaves all other settings unchanged.""" repo = _make_repo("settings-partial-test") db_session.add(repo) await db_session.commit() await db_session.refresh(repo) resp = await client.patch( f"/api/repos/{repo.repo_id}/settings", json={"defaultBranch": "develop"}, headers=auth_headers, ) assert resp.status_code == 200 body = resp.json() assert body["defaultBranch"] == "develop" # Other fields kept assert body["name"] == "settings-partial-test" assert body["visibility"] == "private" assert body["hasIssues"] is True async def test_patch_repo_settings_requires_auth( client: AsyncClient, db_session: AsyncSession, ) -> None: """PATCH /repos/{repo_id}/settings returns 401 without a MSign Authorization header.""" repo = _make_repo("settings-patch-noauth") db_session.add(repo) await db_session.commit() await db_session.refresh(repo) resp = await client.patch( f"/api/repos/{repo.repo_id}/settings", json={"visibility": "public"}, ) assert resp.status_code == 401 async def test_patch_repo_settings_returns_403_for_non_admin( client: AsyncClient, db_session: AsyncSession, auth_headers: StrDict, ) -> None: """PATCH /repos/{repo_id}/settings returns 403 when caller is not owner or admin.""" repo = _make_repo("settings-patch-403", owner="other-owner", owner_user_id=compute_identity_id(b"other-owner"), visibility="public") db_session.add(repo) await db_session.commit() await db_session.refresh(repo) resp = await client.patch( f"/api/repos/{repo.repo_id}/settings", json={"hasWiki": True}, headers=auth_headers, ) assert resp.status_code == 403 # --------------------------------------------------------------------------- # DELETE /repos/{repo_id} — soft-delete # --------------------------------------------------------------------------- async def test_delete_repo_returns_204( client: AsyncClient, auth_headers: StrDict, ) -> None: """DELETE /repos/{repo_id} soft-deletes a repo owned by the caller and returns 204.""" create = await client.post( "/api/repos", json={"name": "to-delete", "owner": "testuser", "visibility": "private"}, headers=auth_headers, ) assert create.status_code == 201 repo_id = create.json()["repoId"] resp = await client.delete(f"/api/repos/{repo_id}", headers=auth_headers) assert resp.status_code == 204 async def test_delete_repo_hides_repo_from_get( client: AsyncClient, auth_headers: StrDict, ) -> None: """After DELETE, GET /repos/{repo_id} returns 404.""" create = await client.post( "/api/repos", json={"name": "hidden-after-delete", "owner": "testuser", "visibility": "private"}, headers=auth_headers, ) repo_id = create.json()["repoId"] await client.delete(f"/api/repos/{repo_id}", headers=auth_headers) get_resp = await client.get(f"/api/repos/{repo_id}", headers=auth_headers) assert get_resp.status_code == 404 async def test_delete_repo_removes_row_from_db( client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession, ) -> None: """DELETE /repos/{repo_id} must hard-delete — no row must remain in the DB.""" from sqlalchemy import select as sa_select from musehub.db.musehub_repo_models import MusehubRepo create = await client.post( "/api/repos", json={"name": "hard-delete-me", "owner": "testuser", "visibility": "private"}, headers=auth_headers, ) assert create.status_code == 201 repo_id = create.json()["repoId"] resp = await client.delete(f"/api/repos/{repo_id}", headers=auth_headers) assert resp.status_code == 204 row = ( await db_session.execute( sa_select(MusehubRepo).where(MusehubRepo.repo_id == repo_id) ) ).scalar_one_or_none() assert row is None, "Hard delete must remove the row — soft delete is not acceptable" async def test_delete_repo_requires_auth( client: AsyncClient, db_session: AsyncSession, ) -> None: """DELETE /repos/{repo_id} returns 401 without a MSign Authorization header.""" repo = _make_repo("delete-noauth", visibility="public") db_session.add(repo) await db_session.commit() await db_session.refresh(repo) resp = await client.delete(f"/api/repos/{repo.repo_id}") assert resp.status_code == 401 async def test_delete_repo_returns_403_for_non_owner( client: AsyncClient, db_session: AsyncSession, auth_headers: StrDict, ) -> None: """DELETE /repos/{repo_id} returns 403 when caller is not the owner.""" repo = _make_repo("delete-403", owner="other-owner", owner_user_id=compute_identity_id(b"other-owner"), visibility="public") db_session.add(repo) await db_session.commit() await db_session.refresh(repo) resp = await client.delete( f"/api/repos/{repo.repo_id}", headers=auth_headers ) assert resp.status_code == 403 async def test_delete_repo_returns_404_for_unknown_repo( client: AsyncClient, auth_headers: StrDict, ) -> None: """DELETE /repos/{repo_id} returns 404 for a non-existent repo.""" resp = await client.delete( "/api/repos/nonexistent-repo-id", headers=auth_headers ) assert resp.status_code == 404 async def test_delete_repo_service_hard_deletes_row( db_session: AsyncSession, ) -> None: """delete_repo() service hard-deletes the row from the DB.""" repo = await musehub_repository.create_repo( db_session, name="svc-delete-test", owner="testuser", visibility="private", owner_user_id=compute_identity_id(b"testuser"), ) repo_id = repo.repo_id await db_session.commit() deleted = await musehub_repository.delete_repo(db_session, repo_id) await db_session.commit() assert deleted is True # Row must be completely gone from the DB row = await db_session.get(MusehubRepo, repo_id) assert row is None async def test_delete_repo_service_returns_false_for_unknown( db_session: AsyncSession, ) -> None: """delete_repo() returns False for a non-existent repo.""" result = await musehub_repository.delete_repo(db_session, "does-not-exist") assert result is False # --------------------------------------------------------------------------- # POST /repos/{repo_id}/transfer — transfer ownership # --------------------------------------------------------------------------- async def test_transfer_repo_ownership_returns_200( client: AsyncClient, auth_headers: StrDict, ) -> None: """POST /repos/{repo_id}/transfer returns 200 with updated ownerUserId.""" create = await client.post( "/api/repos", json={"name": "transfer-me", "owner": "testuser", "visibility": "private"}, headers=auth_headers, ) assert create.status_code == 201 repo_id = create.json()["repoId"] new_owner = "another-user-id-1234" resp = await client.post( f"/api/repos/{repo_id}/transfer", json={"newOwnerUserId": new_owner}, headers=auth_headers, ) assert resp.status_code == 200 body = resp.json() assert body["ownerUserId"] == new_owner assert body["repoId"] == repo_id async def test_transfer_repo_requires_auth( client: AsyncClient, db_session: AsyncSession, ) -> None: """POST /repos/{repo_id}/transfer returns 401 without an MSign token.""" repo = _make_repo("transfer-noauth", visibility="public") db_session.add(repo) await db_session.commit() await db_session.refresh(repo) resp = await client.post( f"/api/repos/{repo.repo_id}/transfer", json={"newOwnerUserId": "new-user-id"}, ) assert resp.status_code == 401 # --------------------------------------------------------------------------- # Wizard creation endpoint — # --------------------------------------------------------------------------- async def test_create_repo_wizard_initialize_creates_branch_and_commit( client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession, ) -> None: """POST /repos with initialize=true creates a default branch + initial commit.""" resp = await client.post( "/api/repos", json={ "name": "wizard-init-repo", "owner": "testuser", "visibility": "public", "initialize": True, "defaultBranch": "main", }, headers=auth_headers, ) assert resp.status_code == 201 repo_id = resp.json()["repoId"] branches_resp = await client.get( f"/api/repos/{repo_id}/branches", headers=auth_headers, ) assert branches_resp.status_code == 200 branches = branches_resp.json()["branches"] assert any(b["name"] == "main" for b in branches), "Expected 'main' branch to be created" commits_resp = await client.get( f"/api/repos/{repo_id}/commits", headers=auth_headers, ) assert commits_resp.status_code == 200 commits = commits_resp.json()["commits"] assert len(commits) == 1 assert commits[0]["message"] == "Initial commit" async def test_create_repo_wizard_no_initialize_stays_empty( client: AsyncClient, auth_headers: StrDict, ) -> None: """POST /repos with initialize=false leaves branches and commits empty.""" resp = await client.post( "/api/repos", json={ "name": "wizard-noinit-repo", "owner": "testuser", "initialize": False, }, headers=auth_headers, ) assert resp.status_code == 201 repo_id = resp.json()["repoId"] branches_resp = await client.get( f"/api/repos/{repo_id}/branches", headers=auth_headers, ) assert branches_resp.json()["branches"] == [] commits_resp = await client.get( f"/api/repos/{repo_id}/commits", headers=auth_headers, ) assert commits_resp.json()["commits"] == [] async def test_create_repo_wizard_topics_merged_into_tags( client: AsyncClient, auth_headers: StrDict, ) -> None: """POST /repos with topics merges them into the tag list (deduplicated).""" resp = await client.post( "/api/repos", json={ "name": "topics-test-repo", "owner": "testuser", "tags": ["jazz"], "topics": ["classical", "jazz"], # 'jazz' deduped "initialize": False, }, headers=auth_headers, ) assert resp.status_code == 201 body = resp.json() tags: list[str] = body["tags"] assert "jazz" in tags assert "classical" in tags assert tags.count("jazz") == 1, "Duplicate 'jazz' must be removed" async def test_create_repo_wizard_clone_url_uses_https_scheme( client: AsyncClient, auth_headers: StrDict, ) -> None: """Clone URL returned by POST /repos uses an http(s):// scheme, not musehub://. musehub:// is not a scheme the muse CLI understands. The clone URL must be a valid HTTP(S) URL so that `muse clone ` works without --hub. """ resp = await client.post( "/api/repos", json={"name": "clone-url-test", "owner": "testuser", "initialize": False}, headers=auth_headers, ) assert resp.status_code == 201 clone_url: str = resp.json()["cloneUrl"] assert clone_url.startswith("http"), f"Expected http(s):// prefix, got: {clone_url}" assert "musehub://" not in clone_url, "musehub:// is not a valid CLI scheme" assert "testuser" in clone_url async def test_create_repo_wizard_template_copies_description( client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession, ) -> None: """POST /repos with template_repo_id copies description from a public template.""" template = _make_repo( "template-source", owner="template-owner", owner_user_id=compute_identity_id(b"template-owner"), visibility="public", description="A great neo-baroque composition template", tags=["baroque", "piano"], ) db_session.add(template) await db_session.commit() await db_session.refresh(template) template_id = str(template.repo_id) resp = await client.post( "/api/repos", json={ "name": "from-template-repo", "owner": "testuser", "initialize": False, "templateRepoId": template_id, }, headers=auth_headers, ) assert resp.status_code == 201 body = resp.json() assert body["description"] == "A great neo-baroque composition template" assert "baroque" in body["tags"] assert "piano" in body["tags"] async def test_create_repo_wizard_private_template_not_copied( client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession, ) -> None: """Private template repo metadata is NOT copied (must be public).""" private_template = _make_repo( "private-template", owner="secret-owner", owner_user_id=compute_identity_id(b"secret-owner"), description="Secret description", tags=["secret"], ) db_session.add(private_template) await db_session.commit() await db_session.refresh(private_template) template_id = str(private_template.repo_id) resp = await client.post( "/api/repos", json={ "name": "refused-template-repo", "owner": "testuser", "description": "My own description", "initialize": False, "templateRepoId": template_id, }, headers=auth_headers, ) assert resp.status_code == 201 body = resp.json() # Private template must not override user's own description assert body["description"] == "My own description" assert "secret" not in body["tags"] async def test_create_repo_wizard_custom_default_branch( client: AsyncClient, auth_headers: StrDict, ) -> None: """POST /repos with initialize=true and custom defaultBranch creates the right branch.""" resp = await client.post( "/api/repos", json={ "name": "custom-branch-repo", "owner": "testuser", "initialize": True, "defaultBranch": "develop", }, headers=auth_headers, ) assert resp.status_code == 201 repo_id = resp.json()["repoId"] branches_resp = await client.get( f"/api/repos/{repo_id}/branches", headers=auth_headers, ) branch_names = [b["name"] for b in branches_resp.json()["branches"]] assert "develop" in branch_names assert "main" not in branch_names # --------------------------------------------------------------------------- # GET /repos — list repos for authenticated user # --------------------------------------------------------------------------- async def test_list_my_repos_returns_owned_repos( client: AsyncClient, auth_headers: StrDict, ) -> None: """GET /repos returns repos created by the authenticated user.""" # Create two repos for name in ("owned-repo-a", "owned-repo-b"): await client.post( "/api/repos", json={"name": name, "owner": "testuser", "initialize": False}, headers=auth_headers, ) resp = await client.get("/api/repos", headers=auth_headers) assert resp.status_code == 200 body = resp.json() assert "repos" in body assert "total" in body assert "nextCursor" in body names = [r["name"] for r in body["repos"]] assert "owned-repo-a" in names assert "owned-repo-b" in names async def test_list_my_repos_requires_auth(client: AsyncClient) -> None: """GET /repos returns 401 without an MSign token.""" resp = await client.get("/api/repos") assert resp.status_code == 401 async def test_transfer_repo_returns_403_for_non_owner( client: AsyncClient, db_session: AsyncSession, auth_headers: StrDict, ) -> None: """POST /repos/{repo_id}/transfer returns 403 when caller is not the owner.""" repo = _make_repo("transfer-403", owner="other-owner", owner_user_id=compute_identity_id(b"other-owner"), visibility="public") db_session.add(repo) await db_session.commit() await db_session.refresh(repo) resp = await client.post( f"/api/repos/{repo.repo_id}/transfer", json={"newOwnerUserId": "attacker-user-id"}, headers=auth_headers, ) assert resp.status_code == 403 async def test_transfer_repo_returns_404_for_unknown_repo( client: AsyncClient, auth_headers: StrDict, ) -> None: """POST /repos/{repo_id}/transfer returns 404 for a non-existent repo.""" resp = await client.post( "/api/repos/nonexistent-repo-id/transfer", json={"newOwnerUserId": "some-user"}, headers=auth_headers, ) assert resp.status_code == 404 async def test_transfer_repo_service_updates_owner_user_id( db_session: AsyncSession, ) -> None: """transfer_repo_ownership() service updates owner_user_id on the row.""" _new_owner_id = compute_identity_id(b"new-owner") repo = await musehub_repository.create_repo( db_session, name="svc-transfer-test", owner="testuser", visibility="private", owner_user_id=compute_identity_id(b"original-owner"), ) await db_session.commit() updated = await musehub_repository.transfer_repo_ownership( db_session, repo.repo_id, _new_owner_id ) await db_session.commit() assert updated is not None assert updated.owner_user_id == _new_owner_id # Verify persisted fetched = await musehub_repository.get_repo(db_session, repo.repo_id) assert fetched is not None assert fetched.owner_user_id == _new_owner_id async def test_transfer_repo_service_returns_none_for_unknown( db_session: AsyncSession, ) -> None: """transfer_repo_ownership() returns None for a non-existent repo.""" result = await musehub_repository.transfer_repo_ownership( db_session, "does-not-exist", "new-owner" ) assert result is None # --------------------------------------------------------------------------- # GET /repos — list repos for authenticated user # --------------------------------------------------------------------------- async def test_list_my_repos_total_matches_count( client: AsyncClient, auth_headers: StrDict, ) -> None: """total field in GET /repos matches the number of repos created.""" initial = await client.get("/api/repos", headers=auth_headers) initial_total: int = initial.json()["total"] await client.post( "/api/repos", json={"name": "total-count-test", "owner": "testuser", "initialize": False}, headers=auth_headers, ) resp = await client.get("/api/repos", headers=auth_headers) assert resp.status_code == 200 assert resp.json()["total"] == initial_total + 1 async def test_list_my_repos_pagination_cursor( client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession, ) -> None: """GET /repos with limit=1 returns a nextCursor that fetches the next page.""" from datetime import timedelta now = datetime.now(tz=timezone.utc) for i in range(3): slug = f"paged-repo-{i}" created_at = now - timedelta(seconds=i) repo = MusehubRepo( repo_id=compute_repo_id(TEST_OWNER_USER_ID, slug, "code", created_at.isoformat()), name=slug, owner="testuser", slug=slug, visibility="public", owner_user_id=TEST_OWNER_USER_ID, created_at=created_at, updated_at=created_at, ) db_session.add(repo) await db_session.commit() first_page = await client.get( "/api/repos?limit=1", headers=auth_headers, ) assert first_page.status_code == 200 body = first_page.json() assert len(body["repos"]) == 1 next_cursor = body["nextCursor"] assert next_cursor is not None second_page = await client.get( f"/api/repos?limit=1&cursor={next_cursor}", headers=auth_headers, ) assert second_page.status_code == 200 second_body = second_page.json() assert len(second_body["repos"]) == 1 # Pages must not overlap first_id = body["repos"][0]["repoId"] second_id = second_body["repos"][0]["repoId"] assert first_id != second_id async def test_list_my_repos_service_direct(db_session: AsyncSession) -> None: """list_repos_for_user() returns only repos owned by the given user.""" from musehub.services.musehub_repository import list_repos_for_user owner_handle = "user-list-direct" other_handle = "user-other-direct" repo_mine = _make_repo("mine-direct", owner=owner_handle, owner_user_id=compute_identity_id(owner_handle.encode())) repo_other = _make_repo("not-mine-direct", owner=other_handle, owner_user_id=compute_identity_id(other_handle.encode())) db_session.add_all([repo_mine, repo_other]) await db_session.commit() result = await list_repos_for_user(db_session, owner_handle) repo_ids = {r.repo_id for r in result.repos} assert str(repo_mine.repo_id) in repo_ids assert str(repo_other.repo_id) not in repo_ids # --------------------------------------------------------------------------- # GET /repos/{repo_id}/collaborators/{username}/permission # --------------------------------------------------------------------------- async def test_collab_access_owner_returns_owner_permission( client: AsyncClient, db_session: AsyncSession, auth_headers: StrDict, ) -> None: """Owner's username returns permission='owner' with accepted_at=null.""" from musehub.db.musehub_collaborator_models import MusehubCollaborator owner_id = TEST_OWNER_USER_ID repo = _make_repo("access-owner-test") db_session.add(repo) await db_session.commit() await db_session.refresh(repo) resp = await client.get( f"/api/repos/{repo.repo_id}/collaborators/{owner_id}/permission", headers=auth_headers, ) assert resp.status_code == 200 body = resp.json() assert body["username"] == owner_id assert body["permission"] == "owner" assert body["acceptedAt"] is None async def test_collab_access_collaborator_returns_permission( client: AsyncClient, db_session: AsyncSession, auth_headers: StrDict, ) -> None: """A known collaborator returns their permission level and accepted_at.""" from musehub.db.musehub_collaborator_models import MusehubCollaborator owner_id = TEST_OWNER_USER_ID collab_user_id = "collab-user-write" repo = _make_repo("access-collab-test") db_session.add(repo) await db_session.commit() await db_session.refresh(repo) accepted = datetime(2026, 1, 10, 10, 0, 0, tzinfo=timezone.utc) _rid = str(repo.repo_id) collab = MusehubCollaborator( id=compute_collaborator_id(_rid, collab_user_id, accepted.isoformat()), repo_id=_rid, identity_handle=collab_user_id, permission="write", accepted_at=accepted, ) db_session.add(collab) await db_session.commit() resp = await client.get( f"/api/repos/{repo.repo_id}/collaborators/{collab_user_id}/permission", headers=auth_headers, ) assert resp.status_code == 200 body = resp.json() assert body["username"] == collab_user_id assert body["permission"] == "write" assert body["acceptedAt"] is not None async def test_collab_access_non_collaborator_returns_404( client: AsyncClient, db_session: AsyncSession, auth_headers: StrDict, ) -> None: """A user who is not a collaborator returns 404 with an informative message.""" repo = _make_repo("access-404-test") db_session.add(repo) await db_session.commit() await db_session.refresh(repo) stranger = "total-stranger-user" resp = await client.get( f"/api/repos/{repo.repo_id}/collaborators/{stranger}/permission", headers=auth_headers, ) assert resp.status_code == 404 assert stranger in resp.json()["detail"] async def test_collab_access_unknown_repo_returns_404( client: AsyncClient, auth_headers: StrDict, ) -> None: """Querying an unknown repo_id returns 404.""" resp = await client.get( "/api/repos/nonexistent-repo/collaborators/anyone/permission", headers=auth_headers, ) assert resp.status_code == 404 async def test_collab_access_requires_auth( client: AsyncClient, db_session: AsyncSession, ) -> None: """GET /collaborators/{username}/permission returns 401 without an MSign token.""" repo = _make_repo("access-auth-test", visibility="public") db_session.add(repo) await db_session.commit() await db_session.refresh(repo) resp = await client.get( f"/api/repos/{repo.repo_id}/collaborators/anyone/permission" ) assert resp.status_code == 401 async def test_collab_access_admin_permission( client: AsyncClient, db_session: AsyncSession, auth_headers: StrDict, ) -> None: """A collaborator with admin permission returns permission='admin'.""" from musehub.db.musehub_collaborator_models import MusehubCollaborator repo = _make_repo("access-admin-test") db_session.add(repo) await db_session.commit() await db_session.refresh(repo) admin_user = "admin-collab-user" _rid = str(repo.repo_id) _now = datetime.now(tz=timezone.utc) collab = MusehubCollaborator( id=compute_collaborator_id(_rid, admin_user, _now.isoformat()), repo_id=_rid, identity_handle=admin_user, permission="admin", accepted_at=None, ) db_session.add(collab) await db_session.commit() resp = await client.get( f"/api/repos/{repo.repo_id}/collaborators/{admin_user}/permission", headers=auth_headers, ) assert resp.status_code == 200 body = resp.json() assert body["permission"] == "admin"