"""Tests for the Snapshots REST API. Endpoints covered ----------------- GET /api/repos/{repo_id}/snapshots GET /api/repos/{repo_id}/snapshots/{snapshot_id} GET /api/repos/{repo_id}/snapshots/{snapshot_id}/entries GET /api/repos/{repo_id}/commits/{commit_id}/snapshot GET /api/repos/{repo_id}/snapshots/{snapshot_id}/diff POST /api/repos/{repo_id}/snapshots/batch Test strategy ------------- - Every happy path is covered with a seeded DB fixture. - Every 401 / 404 / 422 error path is covered. - Cursor-based pagination (Link header + limit cap) is verified. - The X-Snapshot-Entry-Count response header is verified on detail + entries. - Diff counts (added / removed / modified / unchanged) are verified. - Batch lookup is tested for both summary and full-entry modes. - Private-repo visibility is tested end-to-end. - Security: cross-repo snapshot access is blocked at the service layer. """ from __future__ import annotations import secrets from datetime import datetime, timezone import pytest from httpx import AsyncClient from sqlalchemy.ext.asyncio import AsyncSession import msgpack from muse.core.types import fake_id from musehub.core.genesis import compute_identity_id, compute_repo_id from musehub.db.musehub_repo_models import MusehubCommit, MusehubRepo, MusehubSnapshot, MusehubSnapshotRef from musehub.types.json_types import StrDict # --------------------------------------------------------------------------- # Seed helpers # --------------------------------------------------------------------------- def _uid() -> str: return secrets.token_hex(16) async def _make_repo( db: AsyncSession, *, visibility: str = "public", name: str | None = None, ) -> MusehubRepo: """Seed a repo row, commit, and return it.""" slug = name or f"repo-{_uid()[:8]}" created_at = datetime.now(tz=timezone.utc) owner_id = compute_identity_id(b"testuser") repo_id = compute_repo_id(owner_id, slug, "code", created_at.isoformat()) repo = MusehubRepo( repo_id=repo_id, name=slug, owner="testuser", slug=slug, visibility=visibility, owner_user_id=owner_id, created_at=created_at, updated_at=created_at, ) db.add(repo) await db.commit() return repo async def _make_snapshot( db: AsyncSession, repo_id: str, *, manifest: StrDict | None = None, directories: list[str] | None = None, snapshot_id: str | None = None, created_at: datetime | None = None, ) -> MusehubSnapshot: """Seed a snapshot row with manifest_blob and entry_count, then commit.""" effective_manifest: StrDict = manifest or {} sid = snapshot_id or fake_id(_uid()) now = created_at or datetime.now(timezone.utc) snap = MusehubSnapshot( snapshot_id=sid, directories=sorted(directories or []), manifest_blob=msgpack.packb(effective_manifest, use_bin_type=True), entry_count=len(effective_manifest), created_at=now, ) db.add(snap) db.add(MusehubSnapshotRef(repo_id=repo_id, snapshot_id=sid, created_at=now)) await db.commit() return snap async def _make_commit( db: AsyncSession, repo_id: str, *, snapshot_id: str | None = None, branch: str = "main", ) -> MusehubCommit: """Seed a commit row, commit, and return it.""" commit_id = fake_id(_uid()) commit = MusehubCommit( commit_id=commit_id, branch=branch, parent_ids=[], message="test commit", author="testuser", timestamp=datetime.now(timezone.utc), snapshot_id=snapshot_id, ) db.add(commit) await db.commit() return commit _MANIFEST_A = { "muse/core/store.py": fake_id("oid-store-001"), "muse/core/snapshot.py": fake_id("oid-snap-001"), "tests/test_store.py": fake_id("oid-test-001"), } _MANIFEST_B = { "muse/core/store.py": fake_id("oid-store-002"), # modified "muse/core/pack.py": fake_id("oid-pack-001"), # added "tests/test_store.py": fake_id("oid-test-001"), # unchanged # muse/core/snapshot.py removed } # --------------------------------------------------------------------------- # GET /api/repos/{repo_id}/snapshots — list # --------------------------------------------------------------------------- class TestListSnapshots: """GET /api/repos/{repo_id}/snapshots""" async def test_empty_repo_returns_empty_list( self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession, ) -> None: """A repo with no snapshots returns an empty list and total=0.""" repo = await _make_repo(db_session) await db_session.commit() resp = await client.get( f"/api/repos/{repo.repo_id}/snapshots", headers=auth_headers, ) assert resp.status_code == 200 body = resp.json() assert body["snapshots"] == [] assert body["total"] == 0 async def test_returns_snapshots_newest_first( self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession, ) -> None: """Snapshots are returned newest-first by created_at.""" import asyncio from datetime import timedelta repo = await _make_repo(db_session) now = datetime.now(timezone.utc) snap_old = await _make_snapshot( db_session, repo.repo_id, created_at=now - timedelta(hours=2) ) snap_new = await _make_snapshot(db_session, repo.repo_id, created_at=now) resp = await client.get( f"/api/repos/{repo.repo_id}/snapshots", headers=auth_headers, ) assert resp.status_code == 200 body = resp.json() assert body["total"] == 2 ids = [s["snapshotId"] for s in body["snapshots"]] assert ids[0] == snap_new.snapshot_id assert ids[1] == snap_old.snapshot_id async def test_summary_includes_entry_count_and_size( self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession, ) -> None: """Each summary carries entry_count and total_size_bytes without full manifest.""" repo = await _make_repo(db_session) snap = await _make_snapshot(db_session, repo.repo_id, manifest=_MANIFEST_A) resp = await client.get( f"/api/repos/{repo.repo_id}/snapshots", headers=auth_headers, ) assert resp.status_code == 200 summary = resp.json()["snapshots"][0] assert summary["snapshotId"] == snap.snapshot_id assert summary["entryCount"] == len(_MANIFEST_A) assert summary["totalSizeBytes"] == 0 assert "entries" not in summary async def test_pagination_link_header_present( self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession, ) -> None: """Link header with rel=next is set when more pages exist.""" repo = await _make_repo(db_session) for _ in range(5): await _make_snapshot(db_session, repo.repo_id) await db_session.commit() resp = await client.get( f"/api/repos/{repo.repo_id}/snapshots?limit=2", headers=auth_headers, ) assert resp.status_code == 200 assert "Link" in resp.headers assert 'rel="next"' in resp.headers["Link"] assert resp.json()["nextCursor"] is not None async def test_limit_capped_at_200( self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession, ) -> None: """Requesting limit > 200 returns 422.""" repo = await _make_repo(db_session) await db_session.commit() resp = await client.get( f"/api/repos/{repo.repo_id}/snapshots?limit=201", headers=auth_headers, ) assert resp.status_code == 422 async def test_unknown_repo_returns_404( self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession, ) -> None: """Non-existent repo_id returns 404.""" resp = await client.get( f"/api/repos/{_uid()}/snapshots", headers=auth_headers, ) assert resp.status_code == 404 async def test_private_repo_requires_auth( self, client: AsyncClient, db_session: AsyncSession, ) -> None: """Unauthenticated access to a private repo returns 401.""" repo = await _make_repo(db_session, visibility="private") await db_session.commit() resp = await client.get(f"/api/repos/{repo.repo_id}/snapshots") assert resp.status_code == 401 assert resp.headers.get("WWW-Authenticate", "").startswith("MSign") async def test_public_repo_allows_unauthenticated( self, client: AsyncClient, db_session: AsyncSession, ) -> None: """Public repos can be read without auth.""" repo = await _make_repo(db_session, visibility="public") await db_session.commit() resp = await client.get(f"/api/repos/{repo.repo_id}/snapshots") assert resp.status_code == 200 # --------------------------------------------------------------------------- # GET /api/repos/{repo_id}/snapshots/{snapshot_id} — detail # --------------------------------------------------------------------------- class TestGetSnapshot: """GET /api/repos/{repo_id}/snapshots/{snapshot_id}""" async def test_returns_full_manifest_sorted_by_path( self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession, ) -> None: """Entries are present and sorted alphabetically by path.""" repo = await _make_repo(db_session) snap = await _make_snapshot(db_session, repo.repo_id, manifest=_MANIFEST_A) await db_session.commit() resp = await client.get( f"/api/repos/{repo.repo_id}/snapshots/{snap.snapshot_id}", headers=auth_headers, ) assert resp.status_code == 200 body = resp.json() assert body["snapshotId"] == snap.snapshot_id paths = [e["path"] for e in body["entries"]] assert paths == sorted(_MANIFEST_A.keys()) async def test_entry_count_header_matches_body( self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession, ) -> None: """X-Snapshot-Entry-Count header equals len(entries) in the body.""" repo = await _make_repo(db_session) snap = await _make_snapshot(db_session, repo.repo_id, manifest=_MANIFEST_A) await db_session.commit() resp = await client.get( f"/api/repos/{repo.repo_id}/snapshots/{snap.snapshot_id}", headers=auth_headers, ) assert resp.status_code == 200 assert resp.headers["X-Snapshot-Entry-Count"] == str(len(_MANIFEST_A)) assert resp.json()["entryCount"] == len(_MANIFEST_A) async def test_directories_round_trip( self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession, ) -> None: """directories field is stored and returned faithfully.""" dirs = ["muse", "muse/core", "tests"] repo = await _make_repo(db_session) snap = await _make_snapshot( db_session, repo.repo_id, manifest=_MANIFEST_A, directories=dirs ) await db_session.commit() resp = await client.get( f"/api/repos/{repo.repo_id}/snapshots/{snap.snapshot_id}", headers=auth_headers, ) assert resp.status_code == 200 assert resp.json()["directories"] == sorted(dirs) async def test_cross_repo_snapshot_returns_404( self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession, ) -> None: """Snapshot owned by a different repo returns 404, not the snapshot.""" repo_a = await _make_repo(db_session) repo_b = await _make_repo(db_session) snap = await _make_snapshot(db_session, repo_a.repo_id, manifest=_MANIFEST_A) await db_session.commit() # Request via repo_b's ID — must not leak the snapshot resp = await client.get( f"/api/repos/{repo_b.repo_id}/snapshots/{snap.snapshot_id}", headers=auth_headers, ) assert resp.status_code == 404 async def test_unknown_snapshot_returns_404( self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession, ) -> None: """Non-existent snapshot_id returns 404.""" repo = await _make_repo(db_session) await db_session.commit() resp = await client.get( f"/api/repos/{repo.repo_id}/snapshots/{fake_id(_uid())}", headers=auth_headers, ) assert resp.status_code == 404 async def test_empty_snapshot_has_zero_entry_count( self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession, ) -> None: """A snapshot with no entries returns entryCount=0 and entries=[].""" repo = await _make_repo(db_session) snap = await _make_snapshot(db_session, repo.repo_id, manifest={}) await db_session.commit() resp = await client.get( f"/api/repos/{repo.repo_id}/snapshots/{snap.snapshot_id}", headers=auth_headers, ) assert resp.status_code == 200 body = resp.json() assert body["entries"] == [] assert body["entryCount"] == 0 assert body["totalSizeBytes"] == 0 # --------------------------------------------------------------------------- # GET /api/repos/{repo_id}/snapshots/{snapshot_id}/entries — paginated # --------------------------------------------------------------------------- class TestListSnapshotEntries: """GET /api/repos/{repo_id}/snapshots/{snapshot_id}/entries""" async def test_paginated_entries_sorted_by_path( self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession, ) -> None: """Entries are sorted by path and paginated correctly.""" # Build a 5-entry manifest manifest = {f"file_{i:02d}.py": f"oid-{i:03d}" for i in range(5)} repo = await _make_repo(db_session) snap = await _make_snapshot(db_session, repo.repo_id, manifest=manifest) await db_session.commit() resp = await client.get( f"/api/repos/{repo.repo_id}/snapshots/{snap.snapshot_id}/entries" "?limit=2", headers=auth_headers, ) assert resp.status_code == 200 body = resp.json() assert body["total"] == 5 assert len(body["entries"]) == 2 # First two alphabetically assert body["entries"][0]["path"] == "file_00.py" assert body["entries"][1]["path"] == "file_01.py" async def test_entry_count_header_on_entries_endpoint( self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession, ) -> None: """X-Snapshot-Entry-Count is set and second page loads via cursor.""" manifest = {f"file_{i:02d}.py": f"oid-{i:03d}" for i in range(6)} repo = await _make_repo(db_session) snap = await _make_snapshot(db_session, repo.repo_id, manifest=manifest) await db_session.commit() # Get first page to obtain cursor r1 = await client.get( f"/api/repos/{repo.repo_id}/snapshots/{snap.snapshot_id}/entries" "?limit=3", headers=auth_headers, ) assert r1.status_code == 200 assert r1.headers["X-Snapshot-Entry-Count"] == "6" cursor = r1.json()["nextCursor"] assert cursor is not None # Get second page via cursor r2 = await client.get( f"/api/repos/{repo.repo_id}/snapshots/{snap.snapshot_id}/entries" f"?limit=3&cursor={cursor}", headers=auth_headers, ) assert r2.status_code == 200 assert r2.headers["X-Snapshot-Entry-Count"] == "6" assert len(r2.json()["entries"]) == 3 async def test_link_header_on_entries( self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession, ) -> None: """RFC 8288 Link header is present when entries span multiple pages.""" manifest = {f"file_{i:02d}.py": f"oid-{i:03d}" for i in range(10)} repo = await _make_repo(db_session) snap = await _make_snapshot(db_session, repo.repo_id, manifest=manifest) await db_session.commit() resp = await client.get( f"/api/repos/{repo.repo_id}/snapshots/{snap.snapshot_id}/entries" "?limit=3", headers=auth_headers, ) assert resp.status_code == 200 assert 'rel="next"' in resp.headers["Link"] async def test_limit_capped_at_200_for_entries( self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession, ) -> None: """limit > 200 returns 422 for entries endpoint.""" repo = await _make_repo(db_session) snap = await _make_snapshot(db_session, repo.repo_id) await db_session.commit() resp = await client.get( f"/api/repos/{repo.repo_id}/snapshots/{snap.snapshot_id}/entries" "?limit=201", headers=auth_headers, ) assert resp.status_code == 422 async def test_cross_repo_entries_returns_404( self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession, ) -> None: """Entries request via wrong repo_id returns 404.""" repo_a = await _make_repo(db_session) repo_b = await _make_repo(db_session) snap = await _make_snapshot(db_session, repo_a.repo_id, manifest=_MANIFEST_A) await db_session.commit() resp = await client.get( f"/api/repos/{repo_b.repo_id}/snapshots/{snap.snapshot_id}/entries", headers=auth_headers, ) assert resp.status_code == 404 # --------------------------------------------------------------------------- # GET /api/repos/{repo_id}/commits/{commit_id}/snapshot — commit shortcut # --------------------------------------------------------------------------- class TestGetCommitSnapshot: """GET /api/repos/{repo_id}/commits/{commit_id}/snapshot""" async def test_resolves_commit_to_snapshot( self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession, ) -> None: """Returns the snapshot attached to the commit.""" repo = await _make_repo(db_session) snap = await _make_snapshot(db_session, repo.repo_id, manifest=_MANIFEST_A) commit = await _make_commit(db_session, repo.repo_id, snapshot_id=snap.snapshot_id) await db_session.commit() resp = await client.get( f"/api/repos/{repo.repo_id}/commits/{commit.commit_id}/snapshot", headers=auth_headers, ) assert resp.status_code == 200 body = resp.json() assert body["snapshotId"] == snap.snapshot_id paths = {e["path"] for e in body["entries"]} assert paths == set(_MANIFEST_A.keys()) async def test_commit_without_snapshot_returns_404( self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession, ) -> None: """Commit with snapshot_id=None returns 404.""" repo = await _make_repo(db_session) commit = await _make_commit(db_session, repo.repo_id, snapshot_id=None) await db_session.commit() resp = await client.get( f"/api/repos/{repo.repo_id}/commits/{commit.commit_id}/snapshot", headers=auth_headers, ) assert resp.status_code == 404 async def test_unknown_commit_returns_404( self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession, ) -> None: """Non-existent commit_id returns 404.""" repo = await _make_repo(db_session) await db_session.commit() resp = await client.get( f"/api/repos/{repo.repo_id}/commits/{fake_id(_uid())}/snapshot", headers=auth_headers, ) assert resp.status_code == 404 async def test_entry_count_header_is_set( self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession, ) -> None: """X-Snapshot-Entry-Count is set on the commit → snapshot shortcut.""" repo = await _make_repo(db_session) snap = await _make_snapshot(db_session, repo.repo_id, manifest=_MANIFEST_A) commit = await _make_commit(db_session, repo.repo_id, snapshot_id=snap.snapshot_id) await db_session.commit() resp = await client.get( f"/api/repos/{repo.repo_id}/commits/{commit.commit_id}/snapshot", headers=auth_headers, ) assert resp.status_code == 200 assert resp.headers["X-Snapshot-Entry-Count"] == str(len(_MANIFEST_A)) async def test_cross_repo_commit_returns_404( self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession, ) -> None: """Commit from a different repo returns 404.""" repo_a = await _make_repo(db_session) repo_b = await _make_repo(db_session) snap = await _make_snapshot(db_session, repo_a.repo_id, manifest=_MANIFEST_A) commit = await _make_commit(db_session, repo_a.repo_id, snapshot_id=snap.snapshot_id) await db_session.commit() resp = await client.get( f"/api/repos/{repo_b.repo_id}/commits/{commit.commit_id}/snapshot", headers=auth_headers, ) assert resp.status_code == 404 # --------------------------------------------------------------------------- # GET /api/repos/{repo_id}/snapshots/{snapshot_id}/diff — diff # --------------------------------------------------------------------------- class TestDiffSnapshots: """GET /api/repos/{repo_id}/snapshots/{snapshot_id}/diff?base={base_id}""" async def test_diff_counts_added_removed_modified( self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession, ) -> None: """Diff between MANIFEST_A (base) and MANIFEST_B (new) is computed correctly. MANIFEST_B vs MANIFEST_A: added: muse/core/pack.py removed: muse/core/snapshot.py modified: muse/core/store.py unchanged: tests/test_store.py """ repo = await _make_repo(db_session) snap_base = await _make_snapshot(db_session, repo.repo_id, manifest=_MANIFEST_A) snap_new = await _make_snapshot(db_session, repo.repo_id, manifest=_MANIFEST_B) await db_session.commit() resp = await client.get( f"/api/repos/{repo.repo_id}/snapshots/{snap_new.snapshot_id}/diff" f"?base={snap_base.snapshot_id}", headers=auth_headers, ) assert resp.status_code == 200 body = resp.json() assert body["snapshotId"] == snap_new.snapshot_id assert body["baseSnapshotId"] == snap_base.snapshot_id assert body["addedCount"] == 1 assert body["removedCount"] == 1 assert body["modifiedCount"] == 1 assert body["unchangedCount"] == 1 # tests/test_store.py async def test_diff_changes_list_excludes_unchanged_by_default( self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession, ) -> None: """changes list does not include unchanged entries unless includeUnchanged=true.""" repo = await _make_repo(db_session) snap_base = await _make_snapshot(db_session, repo.repo_id, manifest=_MANIFEST_A) snap_new = await _make_snapshot(db_session, repo.repo_id, manifest=_MANIFEST_B) await db_session.commit() resp = await client.get( f"/api/repos/{repo.repo_id}/snapshots/{snap_new.snapshot_id}/diff" f"?base={snap_base.snapshot_id}", headers=auth_headers, ) assert resp.status_code == 200 statuses = {e["status"] for e in resp.json()["changes"]} assert "unchanged" not in statuses async def test_diff_include_unchanged( self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession, ) -> None: """includeUnchanged=true emits unchanged entries.""" repo = await _make_repo(db_session) snap_base = await _make_snapshot(db_session, repo.repo_id, manifest=_MANIFEST_A) snap_new = await _make_snapshot(db_session, repo.repo_id, manifest=_MANIFEST_B) await db_session.commit() resp = await client.get( f"/api/repos/{repo.repo_id}/snapshots/{snap_new.snapshot_id}/diff" f"?base={snap_base.snapshot_id}&includeUnchanged=true", headers=auth_headers, ) assert resp.status_code == 200 statuses = [e["status"] for e in resp.json()["changes"]] assert "unchanged" in statuses async def test_diff_same_snapshot_returns_422( self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession, ) -> None: """Diffing a snapshot against itself returns 422.""" repo = await _make_repo(db_session) snap = await _make_snapshot(db_session, repo.repo_id, manifest=_MANIFEST_A) await db_session.commit() resp = await client.get( f"/api/repos/{repo.repo_id}/snapshots/{snap.snapshot_id}/diff" f"?base={snap.snapshot_id}", headers=auth_headers, ) assert resp.status_code == 422 async def test_diff_missing_base_returns_404( self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession, ) -> None: """Non-existent base snapshot returns 404.""" repo = await _make_repo(db_session) snap = await _make_snapshot(db_session, repo.repo_id, manifest=_MANIFEST_A) await db_session.commit() resp = await client.get( f"/api/repos/{repo.repo_id}/snapshots/{snap.snapshot_id}/diff" f"?base={fake_id(_uid())}", headers=auth_headers, ) assert resp.status_code == 404 async def test_diff_missing_base_query_param_returns_422( self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession, ) -> None: """Omitting the required 'base' query parameter returns 422.""" repo = await _make_repo(db_session) snap = await _make_snapshot(db_session, repo.repo_id, manifest=_MANIFEST_A) await db_session.commit() resp = await client.get( f"/api/repos/{repo.repo_id}/snapshots/{snap.snapshot_id}/diff", headers=auth_headers, ) assert resp.status_code == 422 async def test_diff_bytes_delta_is_accurate( self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession, ) -> None: """bytes_added and bytes_removed are computed from size_bytes of changed entries.""" repo = await _make_repo(db_session) snap_base = await _make_snapshot(db_session, repo.repo_id, manifest=_MANIFEST_A) snap_new = await _make_snapshot(db_session, repo.repo_id, manifest=_MANIFEST_B) await db_session.commit() resp = await client.get( f"/api/repos/{repo.repo_id}/snapshots/{snap_new.snapshot_id}/diff" f"?base={snap_base.snapshot_id}", headers=auth_headers, ) assert resp.status_code == 200 body = resp.json() # bytes_added / bytes_removed are non-negative assert body["bytesAdded"] >= 0 assert body["bytesRemoved"] >= 0 async def test_diff_cross_repo_returns_404( self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession, ) -> None: """A base snapshot owned by a different repo returns 404.""" repo_a = await _make_repo(db_session) repo_b = await _make_repo(db_session) snap_a = await _make_snapshot(db_session, repo_a.repo_id, manifest=_MANIFEST_A) snap_b = await _make_snapshot(db_session, repo_b.repo_id, manifest=_MANIFEST_B) await db_session.commit() # snap_b belongs to repo_b — using it as a base against repo_a's snapshot must fail resp = await client.get( f"/api/repos/{repo_a.repo_id}/snapshots/{snap_a.snapshot_id}/diff" f"?base={snap_b.snapshot_id}", headers=auth_headers, ) assert resp.status_code == 404 # --------------------------------------------------------------------------- # POST /api/repos/{repo_id}/snapshots/batch — bulk lookup # --------------------------------------------------------------------------- class TestBatchGetSnapshots: """POST /api/repos/{repo_id}/snapshots/batch""" async def test_batch_summary_mode( self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession, ) -> None: """Batch without include_entries returns lightweight summaries.""" repo = await _make_repo(db_session) snap_a = await _make_snapshot(db_session, repo.repo_id, manifest=_MANIFEST_A) snap_b = await _make_snapshot(db_session, repo.repo_id, manifest=_MANIFEST_B) await db_session.commit() resp = await client.post( f"/api/repos/{repo.repo_id}/snapshots/batch", json={"snapshotIds": [snap_a.snapshot_id, snap_b.snapshot_id]}, headers=auth_headers, ) assert resp.status_code == 200 results = resp.json() assert len(results) == 2 ids = {r["snapshotId"] for r in results} assert snap_a.snapshot_id in ids assert snap_b.snapshot_id in ids # Summaries do not include entries for r in results: assert "entries" not in r async def test_batch_full_mode_includes_entries( self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession, ) -> None: """include_entries=true returns full SnapshotResponse with entries.""" repo = await _make_repo(db_session) snap = await _make_snapshot(db_session, repo.repo_id, manifest=_MANIFEST_A) await db_session.commit() resp = await client.post( f"/api/repos/{repo.repo_id}/snapshots/batch", json={"snapshotIds": [snap.snapshot_id], "includeEntries": True}, headers=auth_headers, ) assert resp.status_code == 200 result = resp.json()[0] assert result["snapshotId"] == snap.snapshot_id assert "entries" in result assert len(result["entries"]) == len(_MANIFEST_A) async def test_batch_unknown_ids_are_omitted( self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession, ) -> None: """Unknown snapshot IDs are silently omitted from the result.""" repo = await _make_repo(db_session) snap = await _make_snapshot(db_session, repo.repo_id, manifest=_MANIFEST_A) await db_session.commit() resp = await client.post( f"/api/repos/{repo.repo_id}/snapshots/batch", json={"snapshotIds": [snap.snapshot_id, fake_id(_uid())]}, headers=auth_headers, ) assert resp.status_code == 200 assert len(resp.json()) == 1 # only the known snapshot async def test_batch_cross_repo_ids_are_omitted( self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession, ) -> None: """Snapshot IDs from a different repo are silently omitted.""" repo_a = await _make_repo(db_session) repo_b = await _make_repo(db_session) snap_a = await _make_snapshot(db_session, repo_a.repo_id, manifest=_MANIFEST_A) snap_b = await _make_snapshot(db_session, repo_b.repo_id, manifest=_MANIFEST_B) await db_session.commit() # Ask repo_a for both IDs — snap_b belongs to repo_b and must be omitted resp = await client.post( f"/api/repos/{repo_a.repo_id}/snapshots/batch", json={"snapshotIds": [snap_a.snapshot_id, snap_b.snapshot_id]}, headers=auth_headers, ) assert resp.status_code == 200 result_ids = {r["snapshotId"] for r in resp.json()} assert snap_a.snapshot_id in result_ids assert snap_b.snapshot_id not in result_ids async def test_batch_exceeds_100_returns_422( self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession, ) -> None: """More than 100 snapshot IDs returns 422.""" repo = await _make_repo(db_session) await db_session.commit() resp = await client.post( f"/api/repos/{repo.repo_id}/snapshots/batch", json={"snapshotIds": [fake_id(str(i)) for i in range(101)]}, headers=auth_headers, ) assert resp.status_code == 422 async def test_batch_empty_list_returns_422( self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession, ) -> None: """Empty snapshot_ids list returns 422 (min_length=1).""" repo = await _make_repo(db_session) await db_session.commit() resp = await client.post( f"/api/repos/{repo.repo_id}/snapshots/batch", json={"snapshotIds": []}, headers=auth_headers, ) assert resp.status_code == 422 async def test_batch_private_repo_requires_auth( self, client: AsyncClient, db_session: AsyncSession, ) -> None: """Batch on a private repo without auth returns 401.""" repo = await _make_repo(db_session, visibility="private") snap = await _make_snapshot(db_session, repo.repo_id, manifest=_MANIFEST_A) await db_session.commit() resp = await client.post( f"/api/repos/{repo.repo_id}/snapshots/batch", json={"snapshotIds": [snap.snapshot_id]}, ) assert resp.status_code == 401