gabriel / musehub public
musehub_snapshot.py python
723 lines 26.2 KB
Raw
sha256:f3995ec2c05c9c34b0e4d6e96349a811d0117a1c51d78096d757998ccb3c0520 fix: blobs only in S3/mpack — remove commit/snapshot indivi… Sonnet 4.6 patch 17 days ago
1 """Snapshot service — manifest reads, writes, and paginated queries.
2
3 Storage model
4 -------------
5 Every snapshot stores its ``{path: object_id}`` manifest as a single msgpack
6 BYTEA column (``manifest_blob``). A companion ``entry_count`` integer column
7 holds ``len(manifest)`` so that list and summary reads return accurate file
8 counts in O(1) without blob decoding.
9
10 The read hierarchy is therefore:
11
12 - **Count only** (list / summary): read ``entry_count`` directly — no blob
13 decode, no secondary query.
14 - **Manifest needed** (full detail, diff, tree, entries page): decode
15 ``manifest_blob`` in Python once and work with the resulting dict.
16
17 Public API
18 ----------
19 - ``upsert_snapshot_entries`` — single-snapshot write (merge, sync)
20 - ``bulk_upsert_snapshot_entries`` — multi-row ``INSERT … ON CONFLICT DO NOTHING`` bulk write for push mpacks
21 - ``get_snapshot_manifest`` — ``{path: object_id}`` for one snapshot
22 - ``get_snapshot_manifests_batch`` — bulk manifest fetch for N snapshot IDs
23 - ``get_snapshot`` — full ``SnapshotResponse`` (manifest decoded)
24 - ``get_snapshot_summary`` — lightweight ``SnapshotSummaryResponse``
25 - ``list_snapshots`` — cursor-paginated summary list
26 - ``count_snapshot_entries`` — O(1) entry count via ``entry_count`` column
27 - ``get_snapshot_entries_page`` — cursor-paginated entries from decoded blob
28 - ``get_snapshot_for_commit`` — resolve commit → full snapshot (commit lookup, then ``get_snapshot``)
29 - ``diff_snapshots`` — ``SnapshotDiffResponse`` between two snapshots
30 - ``batch_get_snapshots`` — resolve up to 100 snapshot IDs in one query
31 """
32
33 import logging
34 from datetime import datetime, timezone
35
36 import msgpack
37 from sqlalchemy import ColumnElement, select
38 from sqlalchemy.dialects.postgresql import insert as pg_insert
39 from sqlalchemy.ext.asyncio import AsyncSession
40
41 from musehub.db.musehub_repo_models import MusehubCommit, MusehubSnapshot, MusehubSnapshotRef
42 from musehub.types.json_types import StrDict
43
44 type _SizeMap = dict[str, int]
45 type _SnapMap = dict[str, MusehubSnapshot]
46 from musehub.models.musehub import (
47 SnapshotDiffEntry,
48 SnapshotDiffResponse,
49 SnapshotEntryListResponse,
50 SnapshotEntryResponse,
51 SnapshotListResponse,
52 SnapshotResponse,
53 SnapshotSummaryResponse,
54 )
55
56 type ManifestDict = dict[str, str]
57 type BatchManifestDict = dict[str, ManifestDict]
58
59 logger = logging.getLogger(__name__)
60
61 # Maximum snapshot_id length accepted at write time. SHA-256 hex = 64 chars;
62 # allow longer IDs for future algorithm agility but cap to prevent abuse.
63 _MAX_SNAPSHOT_ID_LEN = 128
64
65 # Hard cap on batch lookups — prevents a single request from triggering
66 # O(n) snapshot header queries.
67 _MAX_BATCH_SIZE = 100
68
69 # msgpack encoding of {} — used as the canonical empty manifest.
70 _EMPTY_BLOB: bytes = msgpack.packb({}, use_bin_type=True)
71
72 # ---------------------------------------------------------------------------
73 # Low-level helpers
74 # ---------------------------------------------------------------------------
75
76 def _utc_now() -> datetime:
77 return datetime.now(timezone.utc)
78
def _decode_manifest_blob(blob: bytes | None) -> ManifestDict:
    """Decode a msgpack manifest blob into a ``{path: object_id}`` dict.

    Returns an empty dict when *blob* is ``None`` or zero-length, when msgpack
    cannot decode it, or when it decodes to something other than a map. This
    way callers never have to handle ``None`` or decoding errors. Undecodable
    blobs are logged at WARNING level because they indicate a corrupt row
    rather than a legitimately empty snapshot.

    Keys and values are coerced with ``str()`` so the result always matches
    ``ManifestDict``.
    """
    if not blob:
        return {}
    try:
        raw = msgpack.unpackb(blob, raw=False)
    except (ValueError, msgpack.exceptions.UnpackException) as exc:
        # msgpack signals truncated/garbage input with ValueError subclasses
        # (ExtraData, FormatError, StackError, UnicodeDecodeError) or other
        # UnpackException subclasses; honour the "never raises" contract.
        logger.warning("Undecodable manifest blob (%d bytes): %s", len(blob), exc)
        return {}
    if not isinstance(raw, dict):
        return {}
    return {str(k): str(v) for k, v in raw.items()}
91
def _entry_response(path: str, object_id: str) -> SnapshotEntryResponse:
    """Wrap one manifest ``(path, object_id)`` pair as a ``SnapshotEntryResponse``.

    The manifest blob persists only ``{path: object_id}``, so ``size_bytes``
    is always reported as 0 (the model's "sizes were not recorded" value).
    """
    return SnapshotEntryResponse(
        object_id=object_id,
        path=path,
        size_bytes=0,
    )
100
def _to_summary_response(snap: MusehubSnapshot, repo_id: str) -> SnapshotSummaryResponse:
    """Build a ``SnapshotSummaryResponse`` from a snapshot header row only.

    The file count comes straight from the ``entry_count`` column, so the
    manifest blob is never decoded. ``total_size_bytes`` is 0 because sizes
    are not stored. The caller supplies *repo_id*: snapshots are shared
    across repos and carry no repo column of their own (membership lives in
    ``MusehubSnapshotRef``).
    """
    dirs = snap.directories or []
    return SnapshotSummaryResponse(
        repo_id=repo_id,
        snapshot_id=snap.snapshot_id,
        created_at=snap.created_at,
        directories=dirs,
        entry_count=snap.entry_count,
        total_size_bytes=0,
    )
117
def _to_full_response(snap: MusehubSnapshot, manifest: ManifestDict, repo_id: str) -> SnapshotResponse:
    """Build a full ``SnapshotResponse`` from a header row plus its decoded manifest.

    Entries are emitted in ascending path order, the canonical order for every
    snapshot read. ``entry_count`` is taken from the header column and
    ``total_size_bytes`` is 0 because per-file sizes are not persisted. The
    caller supplies *repo_id* because snapshots carry no repo column (per-repo
    membership lives in ``MusehubSnapshotRef``).
    """
    # Paths are unique dict keys, so sorting the items orders purely by path.
    entry_list = [_entry_response(path, oid) for path, oid in sorted(manifest.items())]
    dirs = snap.directories or []
    return SnapshotResponse(
        repo_id=repo_id,
        snapshot_id=snap.snapshot_id,
        created_at=snap.created_at,
        directories=dirs,
        entry_count=snap.entry_count,
        entries=entry_list,
        total_size_bytes=0,
    )
138
async def _snap_has_ref(session: AsyncSession, repo_id: str, snapshot_id: str) -> bool:
    """Report whether *repo_id* holds a ``MusehubSnapshotRef`` to *snapshot_id*.

    Performs a primary-key lookup on the composite ``(repo_id, snapshot_id)``.
    """
    membership = await session.get(MusehubSnapshotRef, (repo_id, snapshot_id))
    return membership is not None
143
144 # ---------------------------------------------------------------------------
145 # Write path
146 # ---------------------------------------------------------------------------
147
async def upsert_snapshot_entries(
    session: AsyncSession,
    repo_id: str,
    snapshot_id: str,
    manifest: StrDict,
    directories: list[str] | None = None,
    size_map: _SizeMap | None = None,
) -> None:
    """Ensure the snapshot row and this repo's ref both exist.

    ``MusehubSnapshot`` rows are content-addressed and globally shared:
    identical content yields the same *snapshot_id*, so an existing row is
    never rewritten. ``MusehubSnapshotRef`` records per-repo membership and is
    what every read path checks. It is therefore added whenever it is
    missing, including when another repo already created the snapshot row.
    Only missing rows are added, and the session is flushed only if
    something was added.

    Args:
        session: Async DB session (caller owns the transaction).
        repo_id: ID of the repo this snapshot belongs to (written to MusehubSnapshotRef).
        snapshot_id: Content-addressed snapshot ID.
        manifest: ``{path: object_id}`` mapping for all tracked files.
        directories: Sorted workspace-relative directory paths. Must
            round-trip faithfully so that client verification passes
            on fetch/clone.
        size_map: Ignored — not persisted. Retained so call-sites that
            pass size information do not need to be updated.

    Raises:
        ValueError: When *snapshot_id* exceeds ``_MAX_SNAPSHOT_ID_LEN``.
    """
    if len(snapshot_id) > _MAX_SNAPSHOT_ID_LEN:
        raise ValueError(
            f"snapshot_id too long: {len(snapshot_id)} > {_MAX_SNAPSHOT_ID_LEN}"
        )

    snapshot_exists = await session.get(MusehubSnapshot, snapshot_id) is not None
    # A ref can only already exist if the snapshot does, so the ref lookup is
    # skipped for brand-new snapshots. An existing snapshot may have been
    # pushed by a *different* repo, though, so its presence alone does not
    # mean this repo can see it.
    if snapshot_exists and await _snap_has_ref(session, repo_id, snapshot_id):
        return

    now = _utc_now()
    if not snapshot_exists:
        session.add(
            MusehubSnapshot(
                snapshot_id=snapshot_id,
                directories=sorted(directories) if directories else [],
                manifest_blob=msgpack.packb(manifest, use_bin_type=True),
                entry_count=len(manifest),
                created_at=now,
            )
        )
    session.add(
        MusehubSnapshotRef(
            repo_id=repo_id,
            snapshot_id=snapshot_id,
            created_at=now,
        )
    )
    await session.flush()
203
async def bulk_upsert_snapshot_entries(
    session: AsyncSession,
    repo_id: str,
    snapshots: list[tuple[str, ManifestDict, list[str]]],
) -> None:
    """Insert every snapshot row and repo ref for a push mpack in bulk.

    Snapshots are content-addressed and globally shared — no repo_id column.
    Per-repo membership is tracked in musehub_snapshot_refs (mirrors
    object_refs).

    Both tables are written with multi-row ``INSERT … ON CONFLICT DO NOTHING``,
    so rows that already exist are left untouched. Rows are sent in chunks of
    ``rows_per_statement`` to keep each statement's bind-parameter count
    bounded (one parameter per column per row). Duplicate snapshot IDs inside
    *snapshots* are collapsed, keeping the first occurrence.

    Args:
        session: Async DB session (caller owns the transaction).
        repo_id: ID of the repo pushing these snapshots (used for ref upsert).
        snapshots: List of ``(snapshot_id, manifest, directories)`` tuples.

    Raises:
        ValueError: When any snapshot_id exceeds ``_MAX_SNAPSHOT_ID_LEN``
            (the same limit ``upsert_snapshot_entries`` enforces). Raised
            before any SQL is issued.
    """
    if not snapshots:
        return

    # PostgreSQL's wire protocol allows at most 65535 bind parameters per
    # statement and some drivers cap lower (e.g. asyncpg at 32767); the
    # snapshot table binds 5 per row, so 1000 rows stays far below either.
    rows_per_statement = 1000

    unique: dict[str, tuple[ManifestDict, list[str]]] = {}
    for snapshot_id, manifest, directories in snapshots:
        if len(snapshot_id) > _MAX_SNAPSHOT_ID_LEN:
            raise ValueError(
                f"snapshot_id too long: {len(snapshot_id)} > {_MAX_SNAPSHOT_ID_LEN}"
            )
        unique.setdefault(snapshot_id, (manifest, directories))

    now = _utc_now()
    rows = [
        {
            "snapshot_id": snapshot_id,
            "directories": sorted(directories) if directories else [],
            "manifest_blob": msgpack.packb(manifest, use_bin_type=True),
            "entry_count": len(manifest),
            "created_at": now,
        }
        for snapshot_id, (manifest, directories) in unique.items()
    ]
    ref_rows = [
        {"repo_id": repo_id, "snapshot_id": snapshot_id, "created_at": now}
        for snapshot_id in unique
    ]

    # Snapshot rows first so refs never point at a row not yet inserted.
    for start in range(0, len(rows), rows_per_statement):
        await session.execute(
            pg_insert(MusehubSnapshot)
            .values(rows[start:start + rows_per_statement])
            .on_conflict_do_nothing(index_elements=["snapshot_id"])
        )
    for start in range(0, len(ref_rows), rows_per_statement):
        await session.execute(
            pg_insert(MusehubSnapshotRef)
            .values(ref_rows[start:start + rows_per_statement])
            .on_conflict_do_nothing(index_elements=["repo_id", "snapshot_id"])
        )
241
242 # ---------------------------------------------------------------------------
243 # Read path — manifest helpers
244 # ---------------------------------------------------------------------------
245
async def get_snapshot_manifest(
    session: AsyncSession,
    snapshot_id: str,
) -> StrDict:
    """Look up *snapshot_id* and return its decoded ``{path: object_id}`` map.

    This is the lightweight read used where only the path → object_id mapping
    matters (no sizes, no ``SnapshotResponse`` envelope). No repo-membership
    check is performed here.

    Args:
        session: Async DB session.
        snapshot_id: Snapshot to resolve.

    Returns:
        The decoded manifest, or ``{}`` when no such snapshot row exists.
    """
    row = await session.get(MusehubSnapshot, snapshot_id)
    return {} if row is None else _decode_manifest_blob(row.manifest_blob)
268
async def get_snapshot_manifests_batch(
    session: AsyncSession,
    snapshot_ids: list[str],
) -> BatchManifestDict:
    """Decode the manifests of many snapshots with a single SELECT.

    Every requested ID appears as a key in the result, in request order.
    IDs with no matching row map to a fresh empty dict, so callers must not
    treat key presence as proof that the snapshot exists.

    Args:
        session: Async DB session.
        snapshot_ids: Up to ``_MAX_BATCH_SIZE`` snapshot IDs to resolve.

    Returns:
        ``{snapshot_id: {path: object_id}}`` covering every requested ID.

    Raises:
        ValueError: When ``len(snapshot_ids) > _MAX_BATCH_SIZE``.
    """
    if not snapshot_ids:
        return {}
    if len(snapshot_ids) > _MAX_BATCH_SIZE:
        raise ValueError(
            f"batch size {len(snapshot_ids)} exceeds limit {_MAX_BATCH_SIZE}"
        )

    query = select(MusehubSnapshot).where(MusehubSnapshot.snapshot_id.in_(snapshot_ids))
    found = (await session.execute(query)).scalars()
    decoded = {row.snapshot_id: _decode_manifest_blob(row.manifest_blob) for row in found}
    # ``decoded.get(sid, {})`` builds a distinct empty dict per missing ID.
    return {sid: decoded.get(sid, {}) for sid in snapshot_ids}
309
310 # ---------------------------------------------------------------------------
311 # Read API — single snapshot
312 # ---------------------------------------------------------------------------
313
async def get_snapshot(
    session: AsyncSession,
    repo_id: str,
    snapshot_id: str,
) -> SnapshotResponse | None:
    """Fetch a snapshot with its full, path-sorted list of file entries.

    The whole manifest blob is decoded. Callers that only need a slice of a
    large tree should prefer ``get_snapshot_entries_page``.

    A snapshot that exists but has no ``MusehubSnapshotRef`` for *repo_id* is
    reported exactly like a missing one, so IDs from other repos cannot be
    probed.

    Args:
        session: Async DB session.
        repo_id: Repo that must hold a ref to the snapshot.
        snapshot_id: Snapshot to fetch.

    Returns:
        The full ``SnapshotResponse``, or ``None``.
    """
    row = await session.get(MusehubSnapshot, snapshot_id)
    if row is None:
        return None
    if not await _snap_has_ref(session, repo_id, snapshot_id):
        return None
    manifest = _decode_manifest_blob(row.manifest_blob)
    return _to_full_response(row, manifest, repo_id)
341
async def get_snapshot_summary(
    session: AsyncSession,
    repo_id: str,
    snapshot_id: str,
) -> SnapshotSummaryResponse | None:
    """Fetch a snapshot's header-level summary without decoding its manifest.

    The file count is read from the ``entry_count`` column.

    Args:
        session: Async DB session.
        repo_id: Repo that must hold a ref to the snapshot.
        snapshot_id: Snapshot to fetch.

    Returns:
        A ``SnapshotSummaryResponse``, or ``None`` when the snapshot is
        unknown or *repo_id* has no ref to it.
    """
    row = await session.get(MusehubSnapshot, snapshot_id)
    if row is None:
        return None
    if not await _snap_has_ref(session, repo_id, snapshot_id):
        return None
    return _to_summary_response(row, repo_id)
367
368 # ---------------------------------------------------------------------------
369 # Read API — list and paginate
370 # ---------------------------------------------------------------------------
371
async def list_snapshots(
    session: AsyncSession,
    repo_id: str,
    cursor: str | None = None,
    limit: int = 20,
) -> SnapshotListResponse:
    """Return snapshot summaries with cursor-based keyset pagination (newest first).

    Each summary is assembled from the header row alone — ``entry_count`` is
    read directly from the column, so no blob decode or secondary query is
    needed per row.

    Rows are ordered by ``(created_at DESC, snapshot_id DESC)``. The
    ``snapshot_id`` tie-breaker matters because ``bulk_upsert_snapshot_entries``
    stamps every snapshot of a push with the same ``created_at``. A
    timestamp-only cursor would silently skip the rest of such a group.

    Args:
        session: Async DB session.
        repo_id: Repo whose snapshots to list.
        cursor: Opaque token from a previous response's ``nextCursor``.
            Tokens are ``"<created_at ISO 8601>~<snapshot_id>"``; a bare
            ISO 8601 timestamp (the earlier format) is still accepted.
            Omit to start from the most recent snapshot.
        limit: Page size (default 20, capped by the route at 200). Values
            below 1 are treated as 1.

    Returns:
        ``SnapshotListResponse`` with summaries newest-first, ``total``, and
        ``nextCursor`` when more pages exist.

    Raises:
        ValueError: When the cursor's timestamp part is not valid ISO 8601.
    """
    from sqlalchemy import and_, func, or_

    # limit < 1 would make ``snaps[limit - 1]`` index from the end below.
    limit = max(limit, 1)

    total: int = (
        await session.execute(
            select(func.count(MusehubSnapshotRef.snapshot_id)).where(
                MusehubSnapshotRef.repo_id == repo_id
            )
        )
    ).scalar_one()

    if total == 0:
        return SnapshotListResponse(snapshots=[], total=0, next_cursor=None)

    conditions: list[ColumnElement[bool]] = [MusehubSnapshotRef.repo_id == repo_id]
    if cursor is not None:
        # ISO 8601 timestamps never contain "~", so the first "~" is the
        # separator and the remainder is the full snapshot_id.
        ts_text, sep, last_sid = cursor.partition("~")
        cursor_ts = datetime.fromisoformat(ts_text)
        if sep:
            conditions.append(
                or_(
                    MusehubSnapshot.created_at < cursor_ts,
                    and_(
                        MusehubSnapshot.created_at == cursor_ts,
                        MusehubSnapshot.snapshot_id < last_sid,
                    ),
                )
            )
        else:
            # Legacy timestamp-only cursor: best effort, original semantics.
            conditions.append(MusehubSnapshot.created_at < cursor_ts)

    snaps = list(
        (
            await session.execute(
                select(MusehubSnapshot)
                .join(MusehubSnapshotRef, MusehubSnapshotRef.snapshot_id == MusehubSnapshot.snapshot_id)
                .where(*conditions)
                .order_by(MusehubSnapshot.created_at.desc(), MusehubSnapshot.snapshot_id.desc())
                .limit(limit + 1)
            )
        ).scalars()
    )

    next_cursor: str | None = None
    if len(snaps) > limit:
        # The cursor is the last row *on this page*; the extra row only
        # proves another page exists.
        last = snaps[limit - 1]
        next_cursor = f"{last.created_at.isoformat()}~{last.snapshot_id}"
        snaps = snaps[:limit]

    return SnapshotListResponse(
        snapshots=[_to_summary_response(s, repo_id) for s in snaps],
        total=total,
        next_cursor=next_cursor,
    )
437
async def count_snapshot_entries(
    session: AsyncSession,
    snapshot_id: str,
) -> int:
    """Return how many files *snapshot_id* tracks, read from ``entry_count``.

    The manifest blob is not decoded. No repo-membership check is made.

    Args:
        session: Async DB session.
        snapshot_id: Snapshot to count.

    Returns:
        The stored entry count, or 0 when the snapshot does not exist.
    """
    row = await session.get(MusehubSnapshot, snapshot_id)
    if row is None:
        return 0
    return row.entry_count
456
async def get_snapshot_entries_page(
    session: AsyncSession,
    repo_id: str,
    snapshot_id: str,
    cursor: str | None = None,
    limit: int = 100,
) -> SnapshotEntryListResponse | None:
    """Return one page of a snapshot's file entries, ordered by path.

    The manifest blob is decoded once. Paths strictly greater than *cursor*
    are sorted, and the first *limit* of them form the page. ``total`` comes
    from the ``entry_count`` column, so it always reports the whole snapshot,
    not the remainder after the cursor.

    Args:
        session: Async DB session.
        repo_id: Repo that must hold a ref to the snapshot.
        snapshot_id: Snapshot whose entries to paginate.
        cursor: Path from a previous response's ``nextCursor``; omit to
            start at the alphabetically first path.
        limit: Maximum entries per page (default 100).

    Returns:
        ``SnapshotEntryListResponse`` for the page, or ``None`` when the
        snapshot is unknown or *repo_id* has no ref to it.
    """
    row = await session.get(MusehubSnapshot, snapshot_id)
    if row is None:
        return None
    if not await _snap_has_ref(session, repo_id, snapshot_id):
        return None

    manifest = _decode_manifest_blob(row.manifest_blob)
    if cursor is None:
        remaining = sorted(manifest)
    else:
        remaining = sorted(path for path in manifest if path > cursor)

    has_more = len(remaining) > limit
    # The next page resumes after the last path included on this one.
    next_cursor: str | None = remaining[limit - 1] if has_more else None
    page = remaining[:limit] if has_more else remaining

    return SnapshotEntryListResponse(
        snapshot_id=snapshot_id,
        entries=[_entry_response(path, manifest[path]) for path in page],
        total=row.entry_count,
        next_cursor=next_cursor,
    )
509
510 # ---------------------------------------------------------------------------
511 # Read API — commit → snapshot shortcut
512 # ---------------------------------------------------------------------------
513
async def get_snapshot_for_commit(
    session: AsyncSession,
    repo_id: str,
    commit_id: str,
) -> SnapshotResponse | None:
    """Resolve a commit ID to its full snapshot.

    Selects only the commit's ``snapshot_id`` column, then delegates to
    ``get_snapshot``. That helper enforces repo membership through
    ``MusehubSnapshotRef`` and decodes the manifest. The whole call costs
    the commit lookup plus ``get_snapshot``'s own queries.

    NOTE(review): the commit lookup is not filtered by *repo_id*. A commit
    from another repo resolves here whenever its snapshot is also referenced
    by *repo_id*. Confirm whether commit membership should be checked too.

    Args:
        session: Async DB session.
        repo_id: Repo that must hold a ref to the commit's snapshot.
        commit_id: Commit whose snapshot to fetch.

    Returns:
        Full ``SnapshotResponse``, or ``None`` when the commit is not found,
        has no snapshot attached, or its snapshot is not visible to
        *repo_id*.
    """
    # Only the snapshot_id is needed; a missing commit and a commit with a
    # NULL snapshot_id both yield None here.
    snapshot_id = (
        await session.execute(
            select(MusehubCommit.snapshot_id).where(
                MusehubCommit.commit_id == commit_id,
            )
        )
    ).scalar_one_or_none()
    if snapshot_id is None:
        return None
    return await get_snapshot(session, repo_id, snapshot_id)
543
544 # ---------------------------------------------------------------------------
545 # Read API — diff
546 # ---------------------------------------------------------------------------
547
async def diff_snapshots(
    session: AsyncSession,
    repo_id: str,
    snapshot_id: str,
    base_snapshot_id: str,
    include_unchanged: bool = False,
) -> SnapshotDiffResponse | None:
    """Compare two snapshots file-by-file and report what changed.

    Each path in either manifest is classified as ``added`` (only in the new
    snapshot), ``removed`` (only in the base), ``modified`` (object_id
    differs) or ``unchanged``. Changes are listed in ascending path order.
    Size fields are 0 because the manifest stores no sizes.

    Args:
        session: Async DB session.
        repo_id: Repo that must hold refs to both snapshots.
        snapshot_id: The "new" snapshot.
        base_snapshot_id: The "base" snapshot to compare against.
        include_unchanged: Also list ``status="unchanged"`` entries. Off by
            default because unchanged files dominate large repos; they are
            counted either way.

    Returns:
        ``SnapshotDiffResponse``, or ``None`` if either snapshot is missing
        or lacks a ref in *repo_id*.
    """
    new_row = await session.get(MusehubSnapshot, snapshot_id)
    base_row = await session.get(MusehubSnapshot, base_snapshot_id)
    if new_row is None or not await _snap_has_ref(session, repo_id, snapshot_id):
        return None
    if base_row is None or not await _snap_has_ref(session, repo_id, base_snapshot_id):
        return None

    after = _decode_manifest_blob(new_row.manifest_blob)
    before = _decode_manifest_blob(base_row.manifest_blob)

    tally = {"added": 0, "removed": 0, "modified": 0, "unchanged": 0}
    changes: list[SnapshotDiffEntry] = []

    for path in sorted(after.keys() | before.keys()):
        old_oid = before.get(path)  # None when the path is new
        new_oid = after.get(path)   # None when the path was deleted
        if path not in before:
            status = "added"
        elif path not in after:
            status = "removed"
        elif new_oid != old_oid:
            status = "modified"
        else:
            status = "unchanged"

        tally[status] += 1
        if status == "unchanged" and not include_unchanged:
            continue
        changes.append(
            SnapshotDiffEntry(
                path=path,
                status=status,
                base_object_id=old_oid,
                new_object_id=new_oid,
                base_size_bytes=0,
                new_size_bytes=0,
            )
        )

    return SnapshotDiffResponse(
        snapshot_id=snapshot_id,
        base_snapshot_id=base_snapshot_id,
        added_count=tally["added"],
        removed_count=tally["removed"],
        modified_count=tally["modified"],
        unchanged_count=tally["unchanged"],
        bytes_added=0,
        bytes_removed=0,
        changes=changes,
    )
657
658 # ---------------------------------------------------------------------------
659 # Read API — batch
660 # ---------------------------------------------------------------------------
661
async def batch_get_snapshots(
    session: AsyncSession,
    repo_id: str,
    snapshot_ids: list[str],
    include_entries: bool = False,
) -> list[SnapshotResponse | SnapshotSummaryResponse]:
    """Resolve many snapshot IDs visible to *repo_id* with one joined SELECT.

    Only snapshots that have a ``MusehubSnapshotRef`` for *repo_id* are
    loaded. Requested IDs that are unknown or belong to another repo are
    dropped without error. Compare the result length with the request if
    completeness matters. Results follow the order of *snapshot_ids*;
    a repeated ID yields a repeated response.

    Args:
        session: Async DB session.
        repo_id: Repo every returned snapshot must belong to.
        snapshot_ids: Up to ``_MAX_BATCH_SIZE`` IDs to resolve.
        include_entries: ``True`` decodes manifests into full
            ``SnapshotResponse`` objects; ``False`` returns lightweight
            ``SnapshotSummaryResponse`` objects built from ``entry_count``.

    Returns:
        The responses, in request order.

    Raises:
        ValueError: When ``len(snapshot_ids) > _MAX_BATCH_SIZE``.
    """
    if len(snapshot_ids) > _MAX_BATCH_SIZE:
        raise ValueError(
            f"batch size {len(snapshot_ids)} exceeds limit {_MAX_BATCH_SIZE}"
        )
    if not snapshot_ids:
        return []

    stmt = (
        select(MusehubSnapshot)
        .join(MusehubSnapshotRef, MusehubSnapshotRef.snapshot_id == MusehubSnapshot.snapshot_id)
        .where(
            MusehubSnapshot.snapshot_id.in_(snapshot_ids),
            MusehubSnapshotRef.repo_id == repo_id,
        )
    )
    by_id: _SnapMap = {row.snapshot_id: row for row in (await session.execute(stmt)).scalars()}
    ordered = [by_id[sid] for sid in snapshot_ids if sid in by_id]

    if include_entries:
        return [
            _to_full_response(row, _decode_manifest_blob(row.manifest_blob), repo_id)
            for row in ordered
        ]
    return [_to_summary_response(row, repo_id) for row in ordered]
File History 2 commits
sha256:f3995ec2c05c9c34b0e4d6e96349a811d0117a1c51d78096d757998ccb3c0520 fix: blobs only in S3/mpack — remove commit/snapshot indivi… Sonnet 4.6 patch 17 days ago
sha256:450998d182617fa93b737cbbdb3fe956c61566051739acec8c63ec5e7b4587f8 feat(phase3): write snapshot objects to S3 at all 3 write s… Sonnet 4.6 patch 20 days ago