gabriel / musehub public
musehub_wire_fetch.py python
1,108 lines 41.4 KB Hotspot
Raw
sha256:1c5b7a0aba79472f4b10e52326dc010bdab1a498c9e195593d0707860478a034 feat(#92): phase 3 — cache lookup in wire_fetch_mpack (FMC_… Sonnet 4.6 patch 5 days ago
1 """Fetch path — wire_fetch_presign, wire_fetch_mpack, wire_fetch, process_mpack_gc_job."""
2
3 import asyncio
4 import hashlib
5 import logging
6 import msgpack as _msgpack
7 import time as _time_module
8 from datetime import datetime, timezone
9 from typing import TypedDict
10
11 from sqlalchemy import func, select, text as _sa_text
12 from sqlalchemy.dialects.postgresql import insert as _pg_insert
13 from sqlalchemy.ext.asyncio import AsyncSession
14
15 from musehub.db.musehub_repo_models import (
16 MusehubBranch,
17 MusehubCommit,
18 MusehubCommitGraph,
19 MusehubFetchMPackCache,
20 MusehubMPackIndex,
21 MusehubObject,
22 MusehubObjectRef,
23 MusehubRepo,
24 MusehubSnapshot,
25 )
26 from musehub.models.wire import WireFetchRequest
27 from muse.core.types import blob_id
28 from musehub.storage import get_backend
29
30 from musehub.services.musehub_wire_shared import (
31 FetchMPackResult,
32 FetchNotIndexedError,
33 FetchPresignResult,
34 MPackValidationError,
35 _reconstruct_manifest,
36 _snap_row_to_wire_s3,
37 _commit_to_wire_s3,
38 _to_wire_commit,
39 _utc_now,
40 logger,
41 )
42
43
# Result type of ``_walk_commit_delta``: commit id -> commit record.
# Values are either full ``MusehubCommit`` ORM rows or lightweight graph
# records that expose ``commit_id`` / ``snapshot_id`` / ``parent_ids``;
# callers tell the two apart with ``isinstance(row, MusehubCommit)``.
type _CommitDeltaMap = dict[str, MusehubCommitGraph | MusehubCommit]
45
46
async def _walk_commit_delta(
    session: AsyncSession,
    want: list[str] | set[str],
    have: list[str] | set[str],
) -> _CommitDeltaMap:
    """Collect the commits reachable from ``want`` that are not in ``have``.

    Loads the ``MusehubCommitGraph`` rows whose generation lies in
    ``(have bound, max want generation]`` with one range scan, then walks
    parent links in memory starting at the wanted commits and stopping at
    anything listed in ``have``.

    The former per-commit ORM fallback sat behind ``if True:`` and could
    never run, so it has been removed.

    Args:
        session: Async SQLAlchemy session used for the graph queries.
        want: Commit ids the client is asking for.
        have: Commit ids the client already holds.

    Returns:
        A dict keyed by commit id in ascending-generation order (parents
        before children).  Values are ``SimpleNamespace`` records exposing
        ``commit_id``, ``snapshot_id`` and ``parent_ids`` — not ORM rows.
        Empty when every wanted commit is already in ``have``.
    """
    from types import SimpleNamespace as _SN

    _wcd_t0 = _time_module.perf_counter()
    _want_list = list(want)
    _have_list = list(have)
    logger.info("[_walk_commit_delta] START want=%d have=%d want_ids=%s",
                len(_want_list), len(_have_list),
                [cid[:16] for cid in _want_list[:5]])

    have_set: frozenset[str] = frozenset(_have_list)
    starts = [cid for cid in _want_list if cid not in have_set]
    if not starts:
        logger.info("[_walk_commit_delta] SKIP — all want in have, 0ms")
        return {}

    # Upper bound of the scan window: the highest generation among the starts.
    want_gen_q = await session.execute(
        select(func.max(MusehubCommitGraph.generation))
        .where(MusehubCommitGraph.commit_id.in_(starts))
    )
    _want_gen_raw = want_gen_q.scalar()
    max_want_gen: int = _want_gen_raw or 0

    # Diagnostics only: which of the first few starts are present in the
    # graph table.  One batched query instead of one round trip per start.
    _sample = starts[:10]
    _sample_q = await session.execute(
        select(MusehubCommitGraph.commit_id, MusehubCommitGraph.generation)
        .where(MusehubCommitGraph.commit_id.in_(_sample))
    )
    _sample_gens: dict[str, int | None] = {cid: gen for cid, gen in _sample_q}
    _missing_from_graph: list[str] = []
    _found_in_graph: list[tuple[str, int]] = []
    for _scid in _sample:
        _sg_val = _sample_gens.get(_scid)
        if _sg_val is None:
            _missing_from_graph.append(_scid[:16])
        else:
            _found_in_graph.append((_scid[:16], _sg_val))
    logger.info(
        "[_walk_commit_delta] want_gen_raw=%s max_want_gen=%d "
        "starts_in_graph=%s starts_missing_from_graph=%s",
        _want_gen_raw, max_want_gen, _found_in_graph, _missing_from_graph,
    )

    # Lower bound of the scan window.  Despite the name this is the MAX
    # generation among ``have`` (or -1 when ``have`` is empty / unknown).
    # NOTE(review): commits on a wanted branch that forked below this
    # generation fall outside the window; the walk below then records them
    # with no parents and no snapshot and stops there — confirm intended.
    min_have_gen: int = -1
    if have_set:
        have_gen_q = await session.execute(
            select(func.max(MusehubCommitGraph.generation))
            .where(MusehubCommitGraph.commit_id.in_(list(have_set)))
        )
        min_have_gen = have_gen_q.scalar() or -1

    range_q = await session.execute(
        select(
            MusehubCommitGraph.commit_id,
            MusehubCommitGraph.parent_ids,
            MusehubCommitGraph.snapshot_id,
            MusehubCommitGraph.generation,
        )
        .where(MusehubCommitGraph.generation > min_have_gen)
        .where(MusehubCommitGraph.generation <= max_want_gen)
    )
    graph_map: dict[str, tuple[list[str], str | None, int]] = {
        cid: (pids or [], sid, gen) for cid, pids, sid, gen in range_q
    }
    logger.info(
        "[_walk_commit_delta] range_scan gen=(%d,%d] returned %d rows "
        "starts_in_map=%s",
        min_have_gen, max_want_gen, len(graph_map),
        [cid[:16] for cid in starts if cid in graph_map],
    )

    # Record used for commits that the range scan did not return.
    _absent: tuple[list[str], str | None, int] = ([], None, 0)

    # Breadth-first walk over parent links.  ``visited_mem`` is seeded with
    # ``have`` so the walk never crosses into history the client owns.
    visited_mem: set[str] = set(have_set)
    frontier_mem: list[str] = list(starts)
    reachable_cids: set[str] = set()
    while frontier_mem:
        next_mem: list[str] = []
        for cid in frontier_mem:
            if cid in visited_mem:
                continue
            visited_mem.add(cid)
            reachable_cids.add(cid)
            next_mem.extend(
                p for p in graph_map.get(cid, _absent)[0] if p not in visited_mem
            )
        frontier_mem = next_mem

    # Sort by generation ASC so parents appear before children in the dict.
    # reachable_cids is a set — iteration order is undefined — and the client
    # applies commits sequentially, skipping any whose parent hasn't been
    # applied yet.  Ascending generation = topological order.
    sorted_cids = sorted(reachable_cids, key=lambda c: graph_map.get(c, _absent)[2])
    needed_graph: dict[str, _SN] = {}
    for cid in sorted_cids:
        pids_ns, sid_ns, _ = graph_map.get(cid, _absent)
        needed_graph[cid] = _SN(commit_id=cid, snapshot_id=sid_ns, parent_ids=pids_ns)

    _wcd_elapsed = (_time_module.perf_counter() - _wcd_t0) * 1000
    logger.info(
        "[_walk_commit_delta] DONE (graph) commits=%d elapsed=%.1fms (%.3fms/commit) "
        "gen_range=(%d,%d] graph_rows=%d reachable=%d",
        len(needed_graph), _wcd_elapsed, _wcd_elapsed / max(len(needed_graph), 1),
        min_have_gen, max_want_gen, len(graph_map), len(reachable_cids),
    )
    return needed_graph  # type: ignore[return-value]
184
185
async def wire_fetch_presign(
    session: AsyncSession,
    repo_id: str,
    req: WireFetchRequest,
    ttl_seconds: int = 3600,
) -> FetchPresignResult:
    """Answer a fetch with commit/snapshot metadata plus presigned blob URLs.

    Walks the commit delta between ``req.want`` and ``req.have``, collects the
    object ids referenced by the needed snapshots' manifests, subtracts the
    object ids referenced by the ``have`` commits' snapshots, and presigns a
    GET URL for each remaining object.

    Args:
        session: Async SQLAlchemy session.
        repo_id: Repository whose branches and metadata are reported.
        req: Fetch request; only ``want`` and ``have`` are read here.
        ttl_seconds: Lifetime passed to the storage backend for each URL.

    Returns:
        A ``FetchPresignResult`` dict.  ``presign`` is ``False`` and all
        collections are empty when ``req.want`` is empty, the repo does not
        exist, or the client is already up to date.
    """
    from datetime import timedelta

    _empty: FetchPresignResult = {
        "presign": False,
        "blob_urls": {},
        "commits": [],
        "snapshots": [],
        "branch_heads": {},
        "repo_id": repo_id,
        "domain": "",
        "default_branch": "main",
        "expires_at": None,
        "commit_count": 0,
        "blob_count": 0,
    }

    if not req.want:
        return _empty

    repo_row = await session.get(MusehubRepo, repo_id)
    if repo_row is None:
        return _empty
    _domain: str = repo_row.domain_id or ""
    _default_branch: str = repo_row.default_branch if repo_row.default_branch else "main"

    async def _manifest_oids(snap_rows) -> set[str]:
        """Return every non-empty object id listed in the rows' manifests."""
        oids: set[str] = set()
        for snap in snap_rows:
            manifest = (
                _msgpack.unpackb(snap.manifest_blob, raw=False)
                if snap.manifest_blob
                else await _reconstruct_manifest(session, snap.snapshot_id)
            )
            oids.update(v for v in manifest.values() if v)
        return oids

    have_set = set(req.have)
    needed_rows = await _walk_commit_delta(session, req.want, have_set)

    if not needed_rows:
        return {**_empty, "domain": _domain, "default_branch": _default_branch}

    # The delta walk may hand back lightweight graph records instead of ORM
    # rows; in that case load the real MusehubCommit rows in batches.
    _presign_commit_rows: dict[str, MusehubCommit] = {}
    _presign_any = next(iter(needed_rows.values()))
    if isinstance(_presign_any, MusehubCommit):
        _presign_commit_rows = needed_rows  # type: ignore[assignment]
    else:
        _PRESIGN_WIRE_BATCH = 2000
        _presign_cids = list(needed_rows.keys())
        for _pi in range(0, len(_presign_cids), _PRESIGN_WIRE_BATCH):
            _pq = await session.execute(
                select(MusehubCommit).where(
                    MusehubCommit.commit_id.in_(_presign_cids[_pi : _pi + _PRESIGN_WIRE_BATCH])
                )
            )
            for _pr in _pq.scalars():
                _presign_commit_rows[_pr.commit_id] = _pr

    # Graph records can carry snapshot_id=None, so also take the snapshot ids
    # from the MusehubCommit rows — the same merge wire_fetch_mpack performs.
    _graph_snap_ids = [r.snapshot_id for r in needed_rows.values() if r.snapshot_id]
    _commit_row_snap_ids = [
        r.snapshot_id for r in _presign_commit_rows.values() if r.snapshot_id
    ]
    snap_ids = list(dict.fromkeys([*_graph_snap_ids, *_commit_row_snap_ids]))

    # Load the needed snapshot rows once; they feed both the object-id
    # collection and the wire snapshot list below.
    snap_rows: list[MusehubSnapshot] = []
    if snap_ids:
        snaps_q = await session.execute(
            select(MusehubSnapshot).where(MusehubSnapshot.snapshot_id.in_(snap_ids))
        )
        snap_rows = list(snaps_q.scalars().all())
    all_oids = await _manifest_oids(snap_rows)

    # Objects referenced by the client's own commits need no URL.
    have_oids: set[str] = set()
    if have_set:
        have_commits_q = await session.execute(
            select(MusehubCommit).where(MusehubCommit.commit_id.in_(list(have_set)))
        )
        have_snap_ids = [
            r.snapshot_id for r in have_commits_q.scalars().all() if r.snapshot_id
        ]
        if have_snap_ids:
            have_snaps_q = await session.execute(
                select(MusehubSnapshot).where(MusehubSnapshot.snapshot_id.in_(have_snap_ids))
            )
            have_oids = await _manifest_oids(have_snaps_q.scalars().all())

    new_oids = all_oids - have_oids
    n_objects = len(new_oids)
    n_commits = len(needed_rows)

    backend = get_backend()

    # Iterate needed_rows (ascending generation) rather than the DB result so
    # parents precede children on the wire, as in wire_fetch_mpack.
    wire_commits = [
        (await _commit_to_wire_s3(_presign_commit_rows[cid], backend)).model_dump()
        for cid in needed_rows
        if cid in _presign_commit_rows
    ]

    wire_snaps = [
        await _snap_row_to_wire_s3(snap, backend, session=session)
        for snap in snap_rows
    ]

    branch_rows_q = await session.execute(
        select(MusehubBranch).where(MusehubBranch.repo_id == repo_id)
    )
    branch_heads = {
        b.name: b.head_commit_id
        for b in branch_rows_q.scalars().all()
        if b.head_commit_id
    }

    # At most 50 presign calls in flight at a time.
    sem = asyncio.Semaphore(50)

    logger.info(
        "fetch/presign: generating %d presigned GET URLs repo=%s/%s",
        len(new_oids), repo_row.owner, repo_row.slug,
    )

    async def _presign_one(oid: str) -> tuple[str, str]:
        async with sem:
            url = await backend.presign_get(oid, ttl_seconds)
        logger.debug("fetch/presign: presigned oid=%s", oid)
        return oid, url

    pairs = await asyncio.gather(*(_presign_one(oid) for oid in new_oids))
    blob_urls = dict(pairs)
    expires_at = (_utc_now() + timedelta(seconds=ttl_seconds)).isoformat()

    return {
        "presign": True,
        "blob_urls": blob_urls,
        "commits": wire_commits,
        "snapshots": wire_snaps,
        "branch_heads": branch_heads,
        "repo_id": repo_id,
        "domain": _domain,
        "default_branch": _default_branch,
        "expires_at": expires_at,
        "commit_count": n_commits,
        "blob_count": n_objects,
    }
341
async def wire_fetch_mpack(
    session: AsyncSession,
    repo_id: str,
    want: list[str],
    have: list[str],
    ttl_seconds: int = 3600,
) -> FetchMPackResult:
    """Build (or reuse) one mpack covering ``want`` minus ``have``; presign it.

    Steps, matching the ``step=N`` markers in the log output:

    1. Walk the commit delta and load the ``MusehubCommit`` rows.
    2. Load the snapshots, then compute the blob set as *manifest of the
       highest-generation needed commit* minus *manifest of the
       highest-generation ``have`` commit*.
    3. Require every new blob to have an mpack index row.
    4. Read blob bytes from the DB content cache, then from indexed mpacks,
       then from plain object storage.
    5. Assemble the wire mpack.
    6. Store it, presign it and, for fresh clones, record it in the cache.

    Args:
        session: Async SQLAlchemy session.
        repo_id: Repository being fetched.
        want: Tip commit ids requested by the client.
        have: Commit ids the client already holds; empty means fresh clone.
        ttl_seconds: Lifetime passed to the backend for the presigned URL.

    Returns:
        A ``FetchMPackResult`` dict.  ``mpack_url`` / ``mpack_id`` are
        ``None`` when there is nothing to send.  On a cache hit the counts
        are reported as 0.

    Raises:
        FetchNotIndexedError: When at least one new blob has no object row in
            the mpack index.
    """
    import msgpack as _msgpack_local

    _ZSTD_MAGIC = b"\x28\xb5\x2f\xfd"
    _OID_CHUNK = 10000

    _t0 = _time_module.perf_counter()

    def _ms() -> float:
        return (_time_module.perf_counter() - _t0) * 1000

    logger.info("[wire_fetch_mpack] START repo_id=%s want=%d have=%d want_ids=%s",
                repo_id, len(want), len(have), [cid[:16] for cid in want[:5]])

    _up_to_date: FetchMPackResult = {
        "mpack_url": None,
        "mpack_id": None,
        "commit_count": 0,
        "blob_count": 0,
    }

    if not want:
        logger.info("[wire_fetch_mpack] SKIP — want is empty")
        return _up_to_date

    backend = get_backend()

    # De-duplicated tips, first-seen order.  Cache rows are keyed per tip, so
    # both the hit test and the cache write must count distinct ids.
    _distinct_want = list(dict.fromkeys(want))

    # FMC_09 / FMC_10 / FMC_12 — cache lookup for fresh clones (have=[]).
    # For multi-tip requests, all tips must map to the same mpack_id (built by
    # the prebuild as one combined mpack).
    if not have:
        _cache_t0 = _time_module.perf_counter()
        _cached_rows = (await session.execute(
            select(MusehubFetchMPackCache)
            .where(MusehubFetchMPackCache.repo_id == repo_id)
            .where(MusehubFetchMPackCache.tip_commit_id.in_(want))
            .where(MusehubFetchMPackCache.expires_at > _utc_now())
        )).scalars().all()
        _cache_ms = (_time_module.perf_counter() - _cache_t0) * 1000
        _cached_tips = {r.tip_commit_id: r.mpack_id for r in _cached_rows}
        _mpack_ids = set(_cached_tips.values())
        if len(_cached_tips) == len(_distinct_want) and len(_mpack_ids) == 1:
            _hit_mpack_id = next(iter(_mpack_ids))
            _cached_url = await backend.presign_mpack_get(_hit_mpack_id, ttl_seconds)
            logger.warning(
                "[wire_fetch_mpack] cache=HIT tips=%d mpack_id=%s t=%.1fms",
                len(want), _hit_mpack_id[:20], _cache_ms,
            )
            return {
                "mpack_url": _cached_url,
                "mpack_id": _hit_mpack_id,
                "commit_count": 0,
                "blob_count": 0,
            }
        logger.warning(
            "[wire_fetch_mpack] cache=MISS tips=%d cached_tips=%d t=%.1fms",
            len(want), len(_cached_tips), _cache_ms,
        )

    # ── step 1: commit delta ────────────────────────────────────────────────
    have_set = set(have)
    logger.info("[wire_fetch_mpack] step=1 DAG walk starting t=%.1fms", _ms())
    needed_rows = await _walk_commit_delta(session, want, have_set)
    logger.info("[wire_fetch_mpack] step=1 DAG walk done commits=%d t=%.1fms", len(needed_rows), _ms())

    if not needed_rows:
        logger.info("[wire_fetch_mpack] SKIP — client already up-to-date (needed_rows empty)")
        return _up_to_date

    commit_rows: dict[str, MusehubCommit] = {}
    _any = next(iter(needed_rows.values()))
    _is_proxy = not isinstance(_any, MusehubCommit)
    logger.info("[wire_fetch_mpack] step=1b needed_rows=%d is_proxy=%s t=%.1fms",
                len(needed_rows), _is_proxy, _ms())
    if _is_proxy:
        # The walk returned graph records; bulk-load the real commit rows.
        _cids = list(needed_rows.keys())
        _q = await session.execute(
            select(MusehubCommit).where(
                _sa_text("commit_id = ANY(:ids)").bindparams(ids=_cids)
            )
        )
        for _row in _q.scalars():
            commit_rows[_row.commit_id] = _row
        _missing_from_db = set(_cids) - set(commit_rows.keys())
        logger.info(
            "[wire_fetch_mpack] step=1b bulk fetch done commits_in_db=%d missing_from_db=%d "
            "missing_ids=%s t=%.1fms",
            len(commit_rows), len(_missing_from_db),
            [cid[:16] for cid in list(_missing_from_db)[:5]], _ms(),
        )
    else:
        commit_rows = needed_rows  # type: ignore[assignment]
        logger.info("[wire_fetch_mpack] step=1b using MusehubCommit rows directly commits=%d t=%.1fms",
                    len(commit_rows), _ms())

    # ── step 2: snapshots and blob set ──────────────────────────────────────
    _proxy_snap_ids_raw = [r.snapshot_id for r in needed_rows.values()]
    _graph_snap_ids = [sid for sid in _proxy_snap_ids_raw if sid]
    _proxy_snap_none_count = sum(1 for s in _proxy_snap_ids_raw if not s)
    _commit_row_snap_ids = [r.snapshot_id for r in commit_rows.values() if r.snapshot_id]
    # Defensive fallback: CommitGraph rows from server-side merges may have snapshot_id=None
    # (pre-fix state). The MusehubCommit row always has the correct snapshot_id — merge both.
    snap_ids = list({*_graph_snap_ids, *_commit_row_snap_ids})
    logger.info(
        "[wire_fetch_mpack] step=2 snap_ids_from_graph=%d snap_ids_none_in_graph=%d "
        "snap_ids_from_commit_rows=%d snap_ids_total=%d t=%.1fms",
        len(_graph_snap_ids), _proxy_snap_none_count, len(_commit_row_snap_ids), len(snap_ids), _ms(),
    )

    snap_map: dict[str, dict] = {}
    if snap_ids:
        snaps_q = await session.execute(
            select(MusehubSnapshot).where(
                _sa_text("snapshot_id = ANY(:ids)").bindparams(ids=snap_ids)
            )
        )
        for snap in snaps_q.scalars().all():
            snap_map[snap.snapshot_id] = await _snap_row_to_wire_s3(snap, backend, session=session)
    logger.info("[wire_fetch_mpack] step=2 snap_map loaded=%d t=%.1fms", len(snap_map), _ms())

    async def _tip_manifest_oids(snapshot_id: str) -> tuple[set[str], bool]:
        """Return (object ids in the snapshot's manifest, blob column set?).

        When the row exists but has no ``manifest_blob`` the manifest is
        rebuilt with ``_reconstruct_manifest`` — the same fallback
        ``wire_fetch_presign`` uses — instead of yielding no objects.
        """
        _row = (await session.execute(
            select(MusehubSnapshot.snapshot_id, MusehubSnapshot.manifest_blob)
            .where(MusehubSnapshot.snapshot_id == snapshot_id)
        )).first()
        if _row is None:
            return set(), False
        _blob = _row[1]
        _manifest = (
            _msgpack_local.unpackb(_blob, raw=False)
            if _blob
            else await _reconstruct_manifest(session, snapshot_id)
        )
        return {v for v in _manifest.values() if v}, _blob is not None

    all_oids: set[str] = set()
    _needed_cids = list(needed_rows.keys())
    logger.warning("[GRAPH-DEBUG] wire_fetch_mpack: needed_rows=%d needed_cids_sample=%s",
                   len(_needed_cids), [c[:16] for c in _needed_cids[:3]])
    # One query serves both the debug listing (top 5 by generation) and the
    # tip lookup (first row), so the logged tip is the tip actually used.
    _debug_graph_q = await session.execute(
        select(MusehubCommitGraph.commit_id, MusehubCommitGraph.generation, MusehubCommitGraph.snapshot_id)
        .where(MusehubCommitGraph.commit_id.in_(_needed_cids))
        .order_by(MusehubCommitGraph.generation.desc())
        .limit(5)
    )
    _debug_graph_rows = _debug_graph_q.all()
    logger.warning("[GRAPH-DEBUG] wire_fetch_mpack: CommitGraph has %d rows for needed_cids (top 5 by gen): %s",
                   len(_debug_graph_rows),
                   [(r[1], r[0][:16]) for r in _debug_graph_rows])
    want_tip_snap_id = _debug_graph_rows[0][2] if _debug_graph_rows else None
    logger.warning("[GRAPH-DEBUG] wire_fetch_mpack: want_tip_snap_id=%s",
                   want_tip_snap_id[:20] if want_tip_snap_id else "NONE")
    logger.info("[wire_fetch_mpack] step=2 want_tip_snap_id=%s (from CommitGraph) t=%.1fms",
                want_tip_snap_id[:16] if want_tip_snap_id else None, _ms())
    # NOTE(review): only the single highest-generation commit's manifest is
    # used, even when ``want`` lists several tips — confirm this is intended.
    if want_tip_snap_id:
        _wt_oids, _wt_blob_present = await _tip_manifest_oids(want_tip_snap_id)
        all_oids.update(_wt_oids)
        logger.warning("[GRAPH-DEBUG] wire_fetch_mpack: want_tip manifest all_oids=%d wt_blob_present=%s",
                       len(all_oids), _wt_blob_present)
        logger.info("[wire_fetch_mpack] step=2 want_tip manifest all_oids=%d wt_blob_present=%s t=%.1fms",
                    len(all_oids), _wt_blob_present, _ms())
    else:
        logger.warning(
            "[wire_fetch_mpack] step=2 WARN want_tip_snap_id=None — CommitGraph missing tip "
            "needed_cids=%s commit_rows_snap_ids=%s",
            [cid[:16] for cid in list(needed_rows.keys())[:5]],
            [sid[:16] for sid in _commit_row_snap_ids[:5]],
        )

    have_oids: set[str] = set()
    if have_set:
        ht_snap_q = await session.execute(
            select(MusehubCommitGraph.snapshot_id)
            .where(MusehubCommitGraph.commit_id.in_(list(have_set)))
            .order_by(MusehubCommitGraph.generation.desc())
            .limit(1)
        )
        have_tip_snap_id = ht_snap_q.scalar_one_or_none()
        if have_tip_snap_id:
            have_oids, _ = await _tip_manifest_oids(have_tip_snap_id)

    new_oids = all_oids - have_oids
    logger.info(
        "[wire_fetch_mpack] step=2 done snap_map=%d all_oids=%d have_oids=%d new_oids=%d t=%.1fms",
        len(snap_map), len(all_oids), len(have_oids), len(new_oids), _ms(),
    )

    # ── step 3: index coverage (also yields the oid → mpack mapping) ────────
    # Chunked like the content-cache lookup below so the IN list stays
    # bounded; one pass replaces the former separate coverage + mapping queries.
    _new_oid_list = list(new_oids)
    oid_to_mpack: dict[str, str] = {}
    for _ci in range(0, len(_new_oid_list), _OID_CHUNK):
        _chunk = _new_oid_list[_ci : _ci + _OID_CHUNK]
        _midx_q = await session.execute(
            select(MusehubMPackIndex.entity_id, MusehubMPackIndex.mpack_id)
            .where(MusehubMPackIndex.entity_id.in_(_chunk))
            .where(MusehubMPackIndex.entity_type == "object")
        )
        for _oid, _mid in _midx_q:
            oid_to_mpack[_oid] = _mid
    if new_oids:
        missing = new_oids.difference(oid_to_mpack)
        if missing:
            logger.warning(
                "[wire_fetch_mpack] step=3 NOT INDEXED %d/%d objects — raising FetchNotIndexedError t=%.1fms",
                len(missing), len(new_oids), _ms(),
            )
            raise FetchNotIndexedError(len(missing))
        logger.info("[wire_fetch_mpack] step=3 index coverage OK oids=%d t=%.1fms", len(new_oids), _ms())

    # ── step 4: blob bytes — DB cache, then mpacks, then object storage ─────
    cache_hits: dict[str, bytes] = {}
    for _ci in range(0, len(_new_oid_list), _OID_CHUNK):
        _chunk = _new_oid_list[_ci : _ci + _OID_CHUNK]
        _cache_q = await session.execute(
            select(MusehubObject.object_id, MusehubObject.content_cache)
            .where(MusehubObject.object_id.in_(_chunk))
            .where(MusehubObject.content_cache.isnot(None))
        )
        for _oid, _cached in _cache_q:
            if _cached:
                cache_hits[_oid] = bytes(_cached)

    cache_miss_oids = [oid for oid in new_oids if oid not in cache_hits]

    # Group the remaining oids by the mpack that the index says holds them.
    mpack_to_oids: dict[str, list[str]] = {}
    no_mpack_oids: list[str] = []
    for oid in cache_miss_oids:
        mid = oid_to_mpack.get(oid)
        if mid:
            mpack_to_oids.setdefault(mid, []).append(oid)
        else:
            no_mpack_oids.append(oid)

    mpack_hits: dict[str, bytes] = {}
    mpack_miss_oids: list[str] = []
    _sem_mpack = asyncio.Semaphore(8)  # at most 8 mpack downloads in flight

    async def _extract_from_mpack(mpack_id: str, oids: list[str]) -> None:
        """Download one mpack and pull the requested blobs out of it.

        Anything that cannot be obtained (mpack absent, unparsable, blob not
        inside, decompression failure) is appended to ``mpack_miss_oids``.
        """
        async with _sem_mpack:
            raw = await backend.get_mpack(mpack_id)
        if raw is None:
            mpack_miss_oids.extend(oids)
            return
        import zstandard as _zstd_phase1
        _dctx_phase1 = _zstd_phase1.ZstdDecompressor()
        try:
            if raw[:4] == b"MUSE":
                from muse.core.mpack import parse_wire_mpack as _parse_wire_fetch
                payload = _parse_wire_fetch(raw)
            else:
                payload = _msgpack_local.unpackb(raw, raw=False)
        except Exception as _parse_err:
            logger.warning(
                "[_extract_from_mpack] failed to parse mpack=%s: %s",
                mpack_id[:20], _parse_err,
            )
            mpack_miss_oids.extend(oids)
            return
        obj_index: dict[str, bytes] = {}
        for o in payload.get("blobs", []):
            oid_entry = o.get("object_id", "")
            content = o.get("content") or b""
            if not isinstance(content, bytes):
                content = bytes(content)
            # Decompress when flagged as zstd or when the zstd frame magic is present.
            if (o.get("encoding") == "zstd" or content[:4] == _ZSTD_MAGIC) and content:
                try:
                    content = _dctx_phase1.decompress(content)
                except Exception as _decomp_err:
                    logger.warning(
                        "[_extract_from_mpack] zstd decompress failed oid=%s: %s",
                        oid_entry[:20], _decomp_err,
                    )
                    continue
            obj_index[oid_entry] = content
        for oid in oids:
            content = obj_index.get(oid)
            if content is not None:
                mpack_hits[oid] = content
            else:
                mpack_miss_oids.append(oid)

    if mpack_to_oids:
        await asyncio.gather(
            *(_extract_from_mpack(mid, oids) for mid, oids in mpack_to_oids.items())
        )

    legacy_hits: dict[str, bytes] = {}
    _fallback_oids = no_mpack_oids + mpack_miss_oids
    if _fallback_oids:
        _sem_legacy = asyncio.Semaphore(50)

        async def _get_legacy(oid: str) -> None:
            async with _sem_legacy:
                data = await backend.get(oid)
            if data:
                legacy_hits[oid] = data

        await asyncio.gather(*(_get_legacy(oid) for oid in _fallback_oids))

    # Later sources win on key collisions: cache > mpack > legacy.
    _all_blob_bytes: dict[str, bytes] = {**legacy_hits, **mpack_hits, **cache_hits}
    blob_pairs = [(oid, _all_blob_bytes[oid]) for oid in new_oids if oid in _all_blob_bytes]
    logger.info(
        "[wire_fetch_mpack] step=4 fetched %d blobs (cache=%d mpack=%d legacy=%d) t=%.1fms",
        len(blob_pairs), len(cache_hits), len(mpack_hits), len(legacy_hits), _ms(),
    )
    _unresolved = len(new_oids) - len(blob_pairs)
    if _unresolved:
        # Previously silent: the mpack is still built, just without these blobs.
        logger.warning(
            "[wire_fetch_mpack] step=4 WARN %d/%d blobs could not be read from any source "
            "— mpack will not contain them t=%.1fms",
            _unresolved, len(new_oids), _ms(),
        )

    # ── step 5: assemble ────────────────────────────────────────────────────
    # Iterate needed_rows (ascending generation) so parents precede children.
    wire_commits = [
        (await _commit_to_wire_s3(commit_rows[cid], backend)).model_dump()
        for cid in needed_rows.keys()
        if cid in commit_rows
    ]
    wire_snaps = [snap_map[sid] for sid in snap_ids if sid in snap_map]
    # NOTE(review): ``if data`` also drops zero-length blobs — confirm the
    # client never expects an entry for an empty object.
    wire_blobs = [
        {"object_id": oid, "content": data}
        for oid, data in blob_pairs
        if data
    ]
    logger.info(
        "[wire_fetch_mpack] step=5 assembly: wire_commits=%d wire_snaps=%d wire_blobs=%d "
        "snap_ids_total=%d snap_ids_in_map=%d commit_rows=%d t=%.1fms",
        len(wire_commits), len(wire_snaps), len(wire_blobs),
        len(snap_ids), sum(1 for sid in snap_ids if sid in snap_map),
        len(commit_rows), _ms(),
    )

    from muse.core.mpack import build_wire_mpack as _build_wire_mpack
    _head_commit_id = want[0] if want else ""
    mpack_bytes = _build_wire_mpack(
        {
            "commits": wire_commits,
            "snapshots": wire_snaps,
            "blobs": wire_blobs,
            "tags": [],
        },
        meta={"repo_id": repo_id, "head_commit_id": _head_commit_id},
    )
    mpack_id = blob_id(mpack_bytes)

    n_commits = len(wire_commits)
    n_blobs = len(wire_blobs)
    logger.info(
        "[wire_fetch_mpack] step=5 assembled commits=%d snapshots=%d blobs=%d bytes=%d t=%.1fms",
        n_commits, len(wire_snaps), n_blobs, len(mpack_bytes), _ms(),
    )

    # ── step 6: store, presign, cache ───────────────────────────────────────
    await backend.put_mpack(mpack_id, mpack_bytes)
    mpack_url = await backend.presign_mpack_get(mpack_id, ttl_seconds)
    logger.info(
        "[wire_fetch_mpack] step=6 mpack_id=%s mpack_url=%s t=%.1fms",
        mpack_id[:20], mpack_url[:80] if mpack_url else None, _ms(),
    )
    logger.info("[wire_fetch_mpack] RETURN commits=%d blobs=%d TOTAL=%.1fms", n_commits, n_blobs, _ms())

    # FMC_11 — on a fresh-clone miss, write a cache row per tip so the next clone is a hit.
    # For multi-tip requests all tips get the same mpack_id so the multi-tip cache
    # check (all distinct tips cached and a single mpack_id) fires on the next request.
    if not have:
        from datetime import timedelta as _timedelta
        _miss_expires = _utc_now() + _timedelta(days=7)
        for _tip in _distinct_want:
            _miss_cache_id = blob_id((repo_id + _tip).encode()).replace("sha256:", "")
            await session.execute(
                _pg_insert(MusehubFetchMPackCache)
                .values(
                    cache_id=_miss_cache_id,
                    repo_id=repo_id,
                    tip_commit_id=_tip,
                    mpack_id=mpack_id,
                    created_at=_utc_now(),
                    expires_at=_miss_expires,
                )
                .on_conflict_do_update(
                    index_elements=["repo_id", "tip_commit_id"],
                    set_={"mpack_id": mpack_id, "expires_at": _miss_expires},
                )
            )

    return {
        "mpack_url": mpack_url,
        "mpack_id": mpack_id,
        "commit_count": n_commits,
        "blob_count": n_blobs,
    }
737
async def _check_missing_objects(
    session: AsyncSession,
    needs_check: set[str],
) -> set[str]:
    """Return the ids in ``needs_check`` that have no live object row.

    An id counts as present when a ``MusehubObject`` row with that
    ``object_id`` exists and its ``deleted_at`` is NULL.  An empty input
    short-circuits to an empty set without touching the database.
    """
    if needs_check:
        from musehub.db.musehub_repo_models import MusehubObject

        lookup = select(MusehubObject.object_id).where(
            MusehubObject.object_id.in_(list(needs_check)),
            MusehubObject.deleted_at.is_(None),
        )
        found = await session.execute(lookup)
        registered: set[str] = set(found.scalars().all())
        return needs_check - registered
    return set()
754
755
class MPackGCResult(TypedDict):
    """Summary returned by ``process_mpack_gc_job``."""

    # True when the job returned without consolidating anything.
    skipped: bool
    # Number of distinct mpack ids the index listed for the repo's objects.
    packs_before: int
    # Number of mpacks reported after the job (left at 0 on the skip path).
    packs_after: int
    # Id of the consolidated mpack; on the skip path the single existing
    # mpack id, or "" when there is none.
    consolidated_key: str
761
762
async def process_mpack_gc_job(session: AsyncSession, repo_id: str) -> MPackGCResult:
    """Consolidate the mpacks that hold a repo's objects into a single mpack.

    Finds every mpack the index lists for the repo's object refs, downloads
    them, merges their blobs (first occurrence of an object id wins), stores
    the merged pack under its SHA-256 key and repoints the index at it.

    Index rows are only removed for mpacks that were actually read and
    merged.  An mpack that is missing from storage keeps its index rows, so
    its objects do not silently lose index coverage.

    Args:
        session: Async SQLAlchemy session used for index reads and writes.
        repo_id: Repository whose objects are consolidated.

    Returns:
        An ``MPackGCResult``.  ``skipped`` is ``True`` when at most one mpack
        is indexed or when none of the indexed mpacks could be read.
    """
    import msgpack as _mp

    _skipped: MPackGCResult = {
        "skipped": True,
        "packs_before": 0,
        "packs_after": 0,
        "consolidated_key": "",
    }

    repo_oids_q = await session.execute(
        select(MusehubObjectRef.object_id)
        .where(MusehubObjectRef.repo_id == repo_id)
    )
    repo_oid_set = {row[0] for row in repo_oids_q}
    mpack_q = await session.execute(
        select(MusehubMPackIndex.mpack_id)
        .where(MusehubMPackIndex.entity_id.in_(list(repo_oid_set)))
        .where(MusehubMPackIndex.entity_type == "object")
        .distinct()
    )
    mpack_ids = [row[0] for row in mpack_q]
    packs_before = len(mpack_ids)

    # Nothing to consolidate with zero or one pack.
    if packs_before <= 1:
        _skipped["packs_before"] = packs_before
        if mpack_ids:
            _skipped["consolidated_key"] = mpack_ids[0]
        return _skipped

    import musehub.storage.backends as _backends_mod
    backend = _backends_mod.get_backend()

    merged_objects: dict[str, bytes] = {}
    merged_pack_ids: set[str] = set()  # packs that were read and merged
    # Same download cap wire_fetch_mpack applies to get_mpack calls.
    _sem_gc = asyncio.Semaphore(8)

    async def _download(pid: str) -> None:
        """Fetch one mpack and fold its blobs into ``merged_objects``."""
        async with _sem_gc:
            raw = await backend.get_mpack(pid)
        if not raw:
            logger.warning("[mpack_gc] mpack not found in storage: %s", pid)
            return
        if raw[:4] == b"MUSE":
            from muse.core.mpack import parse_wire_mpack as _parse_gc
            _parsed = _parse_gc(raw)
        else:
            _parsed = _mp.unpackb(raw, raw=False)
        for obj in _parsed.get("blobs", []):
            oid = obj.get("object_id", "")
            content = obj.get("content", b"")
            if oid and oid not in merged_objects:
                merged_objects[oid] = content
        merged_pack_ids.add(pid)

    await asyncio.gather(*(_download(pid) for pid in mpack_ids))

    if not merged_pack_ids:
        # Every download came back empty: leave storage and index untouched
        # rather than publishing an empty pack.
        logger.warning(
            "[mpack_gc] repo=%s none of %d mpacks could be read — index left untouched",
            repo_id, packs_before,
        )
        _skipped["packs_before"] = packs_before
        _skipped["packs_after"] = packs_before
        return _skipped

    from muse.core.mpack import build_wire_mpack as _build_gc_mpack
    consolidated_bytes = _build_gc_mpack({
        "commits": [],
        "snapshots": [],
        "blobs": [
            {"object_id": oid, "content": merged_objects[oid]}
            for oid in sorted(merged_objects)  # sorted for a stable pack hash
        ],
        "tags": [],
    })
    consolidated_key = "sha256:" + hashlib.sha256(consolidated_bytes).hexdigest()

    await backend.put_mpack(consolidated_key, consolidated_bytes)

    # Only retire packs whose contents now live in the consolidated pack.
    old_mpack_ids = [
        p for p in mpack_ids if p in merged_pack_ids and p != consolidated_key
    ]
    unread_mpack_ids = [p for p in mpack_ids if p not in merged_pack_ids]
    if old_mpack_ids:
        from sqlalchemy import delete as sa_delete
        await session.execute(
            sa_delete(MusehubMPackIndex)
            .where(MusehubMPackIndex.mpack_id.in_(old_mpack_ids))
            .where(MusehubMPackIndex.entity_type == "object")
        )
    _gc_now = datetime.now(timezone.utc)
    new_rows = [
        {
            "entity_id": oid,
            "mpack_id": consolidated_key,
            "entity_type": "object",
            "created_at": _gc_now,
        }
        for oid in merged_objects
    ]
    if new_rows:
        _GC_MIDX_CHUNK = 5000
        for _gmi in range(0, len(new_rows), _GC_MIDX_CHUNK):
            await session.execute(
                _pg_insert(MusehubMPackIndex)
                .values(new_rows[_gmi : _gmi + _GC_MIDX_CHUNK])
                .on_conflict_do_nothing(index_elements=["entity_id", "mpack_id"])
            )

    # The consolidated pack plus any pack that stayed indexed because it
    # could not be read.
    packs_after = len({consolidated_key, *unread_mpack_ids})
    if unread_mpack_ids:
        logger.warning(
            "[mpack_gc] repo=%s kept index rows for %d unreadable mpacks",
            repo_id, len(unread_mpack_ids),
        )

    logger.info(
        "[mpack_gc] repo=%s consolidated %d mpacks → 1 (objects=%d key=%s)",
        repo_id, len(merged_pack_ids), len(merged_objects), consolidated_key,
    )

    return {
        "skipped": False,
        "packs_before": packs_before,
        "packs_after": packs_after,
        "consolidated_key": consolidated_key,
    }
868
869
class FetchResult(TypedDict):
    """Fetch outcome: an mpack id, an optional URL for it, and two counts.

    NOTE(review): not referenced in the visible part of this file; the field
    meanings below are inferred from the names — confirm against the caller.
    """

    # Identifier of the mpack.
    mpack_id: str
    # URL for the mpack, or None.
    mpack_url: str | None
    # Presumably the number of commits in the mpack — TODO confirm.
    commit_count: int
    # Presumably the number of blobs in the mpack — TODO confirm.
    blob_count: int
875
876
class FetchMPackPrebuildResult(TypedDict):
    """Summary returned by ``process_fetch_mpack_prebuild_job``."""

    # Number of tip commit ids listed in the job payload.
    tips_requested: int
    # Number of tips covered by the mpack built during this run.
    tips_built: int
    # Number of tips that already had an unexpired cache entry.
    tips_skipped: int
    # Elapsed time in milliseconds (0.0 when the payload lists no tips).
    elapsed_ms: float
882
883
async def process_fetch_mpack_prebuild_job(
    session: AsyncSession,
    job_id: str,
) -> FetchMPackPrebuildResult:
    """Pre-build one combined fetch mpack for the uncached branch tips of a job.

    Loads the ``MusehubBackgroundJob`` row for ``job_id`` and reads
    ``payload["tip_commit_ids"]``.  Tips that already have a non-expired row
    in ``musehub_fetch_mpack_cache`` for the job's repo are skipped; all
    remaining tips are handed together to a single
    ``wire_fetch_mpack(session, repo_id, want=..., have=[])`` call.

    This function does not write cache rows itself: it relies on
    ``wire_fetch_mpack`` recording a cache row per tip (see the inline note
    at the call site).

    Duplicate tip ids in the payload are collapsed (first occurrence wins)
    before any work is done, so every counter in the result refers to unique
    tips and ``tips_built + tips_skipped == tips_requested`` whenever the
    build succeeds.

    Args:
        session: Async SQLAlchemy session used for the job lookup, the cache
            lookup, and the build.
        job_id: Identifier of the background job row to process.

    Returns:
        Counters for requested / built / skipped tips and the elapsed time in
        milliseconds.  A build failure is logged and reported as
        ``tips_built == 0``; it is not re-raised.

    Raises:
        ValueError: If no background job row exists for ``job_id``.
    """
    from musehub.db.musehub_jobs_models import MusehubBackgroundJob

    _t0 = _time_module.monotonic()

    def _ms() -> float:
        # Milliseconds elapsed since this job started.
        return (_time_module.monotonic() - _t0) * 1000

    job_row = (await session.execute(
        select(MusehubBackgroundJob).where(MusehubBackgroundJob.job_id == job_id)
    )).scalar_one_or_none()
    if job_row is None:
        raise ValueError(f"fetch.mpack.prebuild job not found: {job_id}")

    repo_id: str = job_row.repo_id
    payload = job_row.payload or {}
    # dict.fromkeys de-duplicates while keeping payload order.  Several
    # branches may share one tip commit; without this the skipped count
    # (a set) and the built/requested counts (lists) disagree, and the
    # duplicates are forwarded to wire_fetch_mpack.
    tip_commit_ids: list[str] = list(dict.fromkeys(
        str(t) for t in (payload.get("tip_commit_ids") or [])
    ))

    if not tip_commit_ids:
        logger.warning("[fetch.mpack.prebuild] job=%s repo=%s no tip_commit_ids in payload", job_id[:16], repo_id)
        return {"tips_requested": 0, "tips_built": 0, "tips_skipped": 0, "elapsed_ms": 0.0}

    # Find which tips already have a fresh (non-expired) cache entry.
    now = _utc_now()
    cached_q = await session.execute(
        select(MusehubFetchMPackCache.tip_commit_id)
        .where(MusehubFetchMPackCache.repo_id == repo_id)
        .where(MusehubFetchMPackCache.tip_commit_id.in_(tip_commit_ids))
        .where(MusehubFetchMPackCache.expires_at > now)
    )
    already_cached: set[str] = {row[0] for row in cached_q}

    tips_to_build = [t for t in tip_commit_ids if t not in already_cached]
    tips_skipped = len(already_cached)

    logger.warning(
        "[fetch.mpack.prebuild] job=%s repo=%s tips=%d cached=%d to_build=%d t=%.1fms",
        job_id[:16], repo_id, len(tip_commit_ids), tips_skipped, len(tips_to_build), _ms(),
    )

    tips_built = 0
    if tips_to_build:
        _build_t0 = _time_module.monotonic()
        try:
            # Build ONE combined mpack covering all uncached tips together.
            # wire_fetch_mpack writes a cache row per tip (all pointing to the same
            # mpack_id) so the multi-tip cache check fires on the next clone.
            result = await wire_fetch_mpack(session, repo_id, want=tips_to_build, have=[])
            mpack_id = result.get("mpack_id") or ""
            if not mpack_id:
                logger.warning(
                    "[fetch.mpack.prebuild] combined build produced no mpack_id — skipping cache write",
                )
            else:
                tips_built = len(tips_to_build)
                _build_ms = (_time_module.monotonic() - _build_t0) * 1000
                logger.warning(
                    "[fetch.mpack.prebuild] built tips=%d mpack_id=%s t=%.1fms",
                    tips_built, mpack_id[:20], _build_ms,
                )
        except Exception as exc:
            # Best-effort: a failed prebuild is logged with its traceback and
            # reported as tips_built == 0 instead of failing the job.
            logger.error(
                "[fetch.mpack.prebuild] FAILED: %s", exc, exc_info=True,
            )

    total_ms = _ms()
    logger.warning(
        "[fetch.mpack.prebuild] DONE job=%s repo=%s tips=%d built=%d skipped=%d TOTAL=%.1fms",
        job_id[:16], repo_id, len(tip_commit_ids), tips_built, tips_skipped, total_ms,
    )
    return {
        "tips_requested": len(tip_commit_ids),
        "tips_built": tips_built,
        "tips_skipped": tips_skipped,
        "elapsed_ms": total_ms,
    }
977
978
class FetchCommitNotFound(Exception):
    """Raised when a requested ``want`` commit id has no row in musehub_commits."""
981
982
class FetchNotReady(Exception):
    """Raised when required objects are missing from musehub_mpack_index.

    The condition is transient from the client's point of view: the client
    must retry the fetch.
    """
985
986
async def wire_fetch(
    session: AsyncSession,
    repo_id: str,
    want: list[str],
    have: list[str],
    ttl_seconds: int = 3600,
) -> FetchResult:
    """Build, store and presign one mpack covering ``want`` minus ``have``.

    Steps, in order:

    1. Check that every ``want`` / ``have`` entry is a ``sha256:`` string.
    2. Check that every ``want`` id exists in ``MusehubCommit``.
    3. Compute the commit delta with ``_walk_commit_delta``.
    4. Collect the snapshots referenced by the delta commits that are not the
       snapshot of a ``have`` commit, and the object ids in their manifests
       minus the object ids in the ``have`` snapshots' manifests.
    5. Require each of those object ids to have a ``MusehubMPackIndex`` row
       with ``entity_type == "object"``.
    6. Load the object contents, build the wire mpack, store it under
       ``blob_id(wire_bytes)`` and presign a GET URL for it.

    Every ``IN (...)`` lookup is issued in batches of 2000 ids so that no
    statement carries an unbounded number of bind parameters.

    NOTE(review): ``repo_id`` does not scope any query in this function;
    commits, snapshots and objects are looked up by id only.  Confirm that is
    intended.

    Args:
        session: Async SQLAlchemy session used for all lookups.
        repo_id: Repository the fetch is issued against (only used in log
            output here, see the note above).
        want: Commit ids the client asks for.
        have: Commit ids the client already holds.
        ttl_seconds: Lifetime passed to ``backend.presign_mpack_get``.

    Returns:
        The mpack id, its presigned URL and the commit / blob counts.  When
        the commit delta is empty, an all-empty result (``mpack_id == ""``,
        ``mpack_url is None``, zero counts) is returned and nothing is stored.

    Raises:
        MPackValidationError: A ``want`` / ``have`` entry is not a
            ``sha256:`` string.
        FetchCommitNotFound: A ``want`` id is absent from ``MusehubCommit``;
            carries the first missing id.
        FetchNotReady: At least one needed object id is absent from
            ``MusehubMPackIndex``.
    """
    from itertools import batched

    import msgpack as _mp

    # Max ids per IN (...) list; same size the commit-row lookup always used.
    in_batch = 2000

    async def _manifest_of(snap: MusehubSnapshot) -> dict:
        # Prefer the stored manifest blob; rebuild the manifest when it is absent.
        if snap.manifest_blob:
            return _mp.unpackb(snap.manifest_blob, raw=False)
        return await _reconstruct_manifest(session, snap.snapshot_id)

    _empty: FetchResult = {"mpack_id": "", "mpack_url": None, "commit_count": 0, "blob_count": 0}

    # All of ``want`` is validated before any of ``have``.
    for label, ids in (("want", want), ("have", have)):
        for entry in ids:
            if not (isinstance(entry, str) and entry.startswith("sha256:")):
                raise MPackValidationError(f"{label} entry is not a sha256: id: {entry!r}")

    if want:
        found: set[str] = set()
        for batch in batched(want, in_batch):
            existing_q = await session.execute(
                select(MusehubCommit.commit_id).where(MusehubCommit.commit_id.in_(list(batch)))
            )
            found.update(row[0] for row in existing_q)
        missing_want = [cid for cid in want if cid not in found]
        if missing_want:
            raise FetchCommitNotFound(missing_want[0])

    have_set = set(have)
    needed = await _walk_commit_delta(session, want, have_set)
    if not needed:
        return _empty

    # Full commit rows for every commit id in the delta.
    commit_rows: dict[str, MusehubCommit] = {}
    for batch in batched(needed, in_batch):
        q = await session.execute(
            select(MusehubCommit).where(MusehubCommit.commit_id.in_(list(batch)))
        )
        for row in q.scalars():
            commit_rows[row.commit_id] = row

    want_snap_ids = {r.snapshot_id for r in needed.values() if r.snapshot_id}
    have_snap_ids: set[str] = set()
    for batch in batched(have_set, in_batch):
        have_commits_q = await session.execute(
            select(MusehubCommit.snapshot_id).where(MusehubCommit.commit_id.in_(list(batch)))
        )
        have_snap_ids.update(row[0] for row in have_commits_q if row[0])

    new_snap_ids = want_snap_ids - have_snap_ids
    snap_map: dict[str, dict] = {}
    new_oids: set[str] = set()
    backend = get_backend()
    for batch in batched(new_snap_ids, in_batch):
        snaps_q = await session.execute(
            select(MusehubSnapshot).where(MusehubSnapshot.snapshot_id.in_(list(batch)))
        )
        for snap in snaps_q.scalars():
            manifest = await _manifest_of(snap)
            new_oids.update(v for v in manifest.values() if v)
            snap_map[snap.snapshot_id] = await _snap_row_to_wire_s3(snap, backend, session=session)

    # Objects referenced by a snapshot the client already has are not re-sent.
    for batch in batched(have_snap_ids, in_batch):
        have_snaps_q = await session.execute(
            select(MusehubSnapshot).where(MusehubSnapshot.snapshot_id.in_(list(batch)))
        )
        for snap in have_snaps_q.scalars():
            held = await _manifest_of(snap)
            new_oids -= {v for v in held.values() if v}

    if new_oids:
        indexed: set[str] = set()
        for batch in batched(new_oids, in_batch):
            idx_q = await session.execute(
                select(MusehubMPackIndex.entity_id).where(
                    MusehubMPackIndex.entity_id.in_(list(batch)),
                    MusehubMPackIndex.entity_type == "object",
                )
            )
            indexed.update(row[0] for row in idx_q)
        unindexed = new_oids - indexed
        if unindexed:
            raise FetchNotReady(f"{len(unindexed)} object(s) not yet in mpack_index")

    objects: list[dict] = []
    for batch in batched(new_oids, in_batch):
        obj_q = await session.execute(
            select(MusehubObject).where(MusehubObject.object_id.in_(list(batch)))
        )
        for obj_row in obj_q.scalars():
            content = obj_row.content_cache
            if content is None:
                fetched = await backend.get(obj_row.object_id)
                if fetched is None:
                    # Same output as before (empty content), but no longer silent.
                    logger.warning(
                        "[wire_fetch] repo=%s object %s has no content in backend — packing empty content",
                        repo_id, obj_row.object_id,
                    )
                content = fetched or b""
            objects.append({"object_id": obj_row.object_id, "content": content})

    wire_commits = [_to_wire_commit(r).model_dump() for r in commit_rows.values()]
    from muse.core.mpack import build_wire_mpack as _build_fetch_mpack
    wire_bytes = _build_fetch_mpack({
        "commits": wire_commits,
        "snapshots": list(snap_map.values()),
        "blobs": objects,
        "tags": [],
    })
    mpack_id = blob_id(wire_bytes)

    await backend.put_mpack(mpack_id, wire_bytes)
    mpack_url = await backend.presign_mpack_get(mpack_id, ttl_seconds)

    return {
        "mpack_id": mpack_id,
        "mpack_url": mpack_url,
        "commit_count": len(commit_rows),
        "blob_count": len(objects),
    }
File History 14 commits
sha256:1c5b7a0aba79472f4b10e52326dc010bdab1a498c9e195593d0707860478a034 feat(#92): phase 3 — cache lookup in wire_fetch_mpack (FMC_… Sonnet 4.6 patch 5 days ago
sha256:0e447fc3f6b7887d5d9e86b557c659ef7d0b05e2e09ddb0cb551ada240e48a51 feat(phase2): fetch.mpack.prebuild job handler + worker dis… Sonnet 4.6 patch 5 days ago
sha256:f4f731efef3f1cef7eeea99f8d5f49159110ec5c48137d9273b79e60a5aadf35 fix: fetch/presign tip-only blob coverage + timing instrume… Sonnet 4.6 patch 5 days ago
sha256:9acbc3291c00dc6dbad9685f88df517c29f130ac0238283278c1f2b46e6bc10b fix: send snapshots in full-manifest format; handle delta-o… Sonnet 4.6 patch 5 days ago
sha256:3f188050991b4e062a2dc0f2f99ee17cb85b7723dbeaedf5f766180582110d7b fix: reconstruct manifest for snapshots with null manifest_blob Sonnet 4.6 patch 9 days ago
sha256:e1171ab8958a8dc1f3a25fce5b7eedda123042373bc977a377c358ea429ef43e diag: add per-stage FETCH-DIAG logging to blob extraction path Sonnet 4.6 patch 9 days ago
sha256:9dbd6015bfeb02e1832f9bdbe80ed58ee9138566f0f317c5557bbd571d6946ca refactor: remove dual fetch paths in _walk_commit_delta and… Sonnet 4.6 patch 9 days ago
sha256:a68b91a9b1f1705e66730a0c328710192bc5e921c829892f0f9a11678341e148 fix: wire_fetch_mpack falls back to MusehubCommit.snapshot_… Sonnet 4.6 patch 13 days ago
sha256:ad616c6113d6c00f4efed6b2993734ca46d3e9b5bee25addd4ce8ae6b57136e5 chore: bump version to 0.2.0rc11; typing audit clean + all … Sonnet 4.6 minor 17 days ago
sha256:e35be48854f182f7bf02dc6cc0f58d22b3de3a544b570c0e2bc53f9e75a3607d feat(phase6): remove delta_blob path, dead imports, add fal… Sonnet 4.6 minor 19 days ago
sha256:9d0ffea20e344782dc6a969d4a240b3d7c96392b5dc03bbd9421890cb78c6f19 feat(phase4): wire fetch serves commits and snapshots from … Sonnet 4.6 patch 19 days ago
sha256:39e9c4e6f2134da0732e6983268a218178973936f8d7ca03c91f2b5ad42133c8 fix: use read_object_bytes in blob viewer; add zstd magic d… Sonnet 4.6 patch 19 days ago
sha256:ab9eda7b6479e1c35cdba9a54f62bacd2825de8faacec3ba67a9a8ef45914b7d fix: migration and wire protocol alignment Sonnet 4.6 minor 19 days ago
sha256:ef10830ce231e0a20efcb0e2586cb879471247e916616e6fdd0d51df459e2595 fix: typing audit — 0 violations, 0 untyped defs across all… Sonnet 4.6 minor 20 days ago