gabriel / musehub public
musehub_wire_push.py python
1,224 lines 47.9 KB Hotspot
Raw
sha256:80c35083d2abe19ef730693f2b35ad9a9342d2abbe81e802e6f397fe9ae75eb9 merge: pull staging/dev — asyncpg batch fix + content-addre… Sonnet 4.6 patch 8 hours ago
1 """Push path — wire_refs, wire_repair_*, wire_push_mpack_presign, wire_push_unpack_mpack."""
2
3 from typing import TypedDict
4
5 import asyncio
6 import collections
7 import hashlib
8 import logging
9 import msgpack as _msgpack
10 import time as _time_module
11 from datetime import datetime, timezone
12
13 from sqlalchemy import func, select, text as _sa_text
14 from sqlalchemy.dialects.postgresql import insert as _pg_insert
15 from sqlalchemy.ext.asyncio import AsyncSession
16
17 from musehub.db.musehub_abuse_models import MusehubBlockedHash, MusehubDailyPushBytes, MusehubPushAnomaly
18 from musehub.db.musehub_jobs_models import MusehubBackgroundJob
19 from musehub.db.musehub_collaborator_models import MusehubCollaborator
20 from musehub.db.musehub_repo_models import (
21 MusehubBranch,
22 MusehubCommit,
23 MusehubCommitGraph,
24 MusehubCommitRef,
25 MusehubObject,
26 MusehubObjectRef,
27 MusehubMPackIndex,
28 MusehubRepo,
29 MusehubSnapshot,
30 MusehubSnapshotRef,
31 )
32 from musehub.models.wire import WireCommit, WireRefsResponse
33 from muse.core.types import blob_id, split_id
34 from musehub.core.genesis import compute_branch_id
35 from musehub.types.json_types import IntDict, JSONObject, JSONValue, StrDict
36 from musehub.config import settings
37 from musehub.storage import get_backend
38 from musehub.storage.backends import read_object_bytes
39
40 from musehub.services.musehub_wire_shared import (
41 MPackValidationError,
42 NonFastForwardError,
43 ObjectHashMismatch,
44 RepairResult,
45 _ChildMap,
46 _is_fast_forward,
47 _reconstruct_manifest,
48 _to_wire_commit,
49 _upsert_object_refs,
50 _utc_now,
51 _parse_iso,
52 _str_list,
53 _int_safe,
54 logger,
55 )
56
# Maps object_id -> (repo_id, path, storage_uri, content_cache), the tuple
# shape produced by _fetch_object_meta in this module.
_ObjFetchMap = dict[str, tuple[str, str, str | None, bytes | None]]
# A resolved snapshot manifest: string keys to string values.
# NOTE(review): presumably file path -> object id — confirm against the snapshot model.
_SnapshotManifest = dict[str, str]
# Content address of the zero-length blob (SHA-256 of empty input).
_EMPTY_OID = "sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
# NOTE(review): not referenced in the visible part of this file — confirm it is still used.
_COMMIT_BATCH = 50
61
62
async def wire_refs(
    session: AsyncSession,
    repo_id: str,
) -> WireRefsResponse | None:
    """Build the refs advertisement for a repository.

    Loads the repo row and all of its branches, then reports every branch
    that has a head commit together with the repo's domain and default
    branch.

    Returns:
        A ``WireRefsResponse``, or ``None`` when no repo with *repo_id* exists.
    """
    started = _time_module.perf_counter()

    def _elapsed_ms() -> float:
        # Milliseconds since this call began; used only for timing logs.
        return (_time_module.perf_counter() - started) * 1000

    logger.info("[wire_refs] START repo_id=%s", repo_id)

    repo = await session.get(MusehubRepo, repo_id)
    logger.info("[wire_refs] repo_row loaded found=%s t=%.1fms", repo is not None, _elapsed_ms())
    if repo is None:
        return None

    branch_query = select(MusehubBranch).where(MusehubBranch.repo_id == repo_id)
    branches = (await session.execute(branch_query)).scalars().all()
    logger.info("[wire_refs] branches loaded count=%d t=%.1fms", len(branches), _elapsed_ms())

    # Branches without a head commit are left out of the advertisement.
    heads: StrDict = {}
    for branch_row in branches:
        if branch_row.head_commit_id:
            heads[branch_row.name] = branch_row.head_commit_id

    # domain_meta may be missing or not a dict; fall back to an empty mapping.
    raw_meta = repo.domain_meta
    meta: JSONObject = raw_meta if isinstance(raw_meta, dict) else {}
    domain_name = str(meta.get("domain", "code"))
    default_branch_name = getattr(repo, "default_branch", None) or "main"

    logger.info("[wire_refs] RETURN branches=%d TOTAL=%.1fms", len(heads), _elapsed_ms())
    return WireRefsResponse(
        repo_id=repo_id,
        domain=domain_name,
        default_branch=default_branch_name,
        branch_heads=heads,
    )
104
async def wire_repair_object(
    session: AsyncSession,
    repo_id: str,
    object_id: str,
    content: bytes,
    caller_id: str | None,
) -> RepairResult:
    """Re-upload the bytes of a single object after verifying its hash.

    The caller must be the repo owner or an accepted collaborator with
    ``write`` or ``admin`` permission. The content is hashed with
    ``blob_id`` and must match *object_id* before anything is stored.

    Raises:
        ValueError: the repo does not exist.
        PermissionError: the caller is unauthenticated or not authorized.
        ObjectHashMismatch: ``blob_id(content)`` differs from *object_id*.
    """
    repo = await session.get(MusehubRepo, repo_id)
    if repo is None:
        raise ValueError("repo not found")

    # Authorization: owner passes directly, everyone else needs a collaborator row.
    if not caller_id:
        raise PermissionError("repair rejected: unauthenticated")
    if caller_id != repo.owner:
        collaborator_query = select(MusehubCollaborator).where(
            MusehubCollaborator.repo_id == repo_id,
            MusehubCollaborator.identity_handle == caller_id,
            MusehubCollaborator.accepted_at.isnot(None),
            MusehubCollaborator.permission.in_(["write", "admin"]),
        )
        collaborator = (await session.execute(collaborator_query)).scalar_one_or_none()
        if collaborator is None:
            raise PermissionError("repair rejected: not authorized")

    # Integrity: the declared id must be the content address of the bytes.
    computed_id = blob_id(content)
    if computed_id != object_id:
        _, declared_hex = split_id(object_id)
        _, computed_hex = split_id(computed_id)
        raise ObjectHashMismatch(
            f"repair content hash mismatch for {object_id!r}: "
            f"declared={declared_hex[:16]}… actual={computed_hex[:16]}…"
        )

    # Store the bytes first, then record the object row and the repo ref.
    storage = get_backend()
    stored_uri = await storage.put(object_id, content)

    object_row = MusehubObject(
        object_id=object_id,
        path="",
        size_bytes=len(content),
        storage_uri=stored_uri,
        content_cache=None,
    )
    await session.merge(object_row)
    await _upsert_object_refs(session, repo_id, [object_id])
    await session.commit()

    logger.info(
        "✅ repair-object repo=%s object_id=%s size=%d",
        repo_id, object_id, len(content),
    )
    return {"repaired": True}
154
async def wire_fetch_objects(
    session: AsyncSession,
    repo_id: str,
    object_ids: list[str],
) -> list[dict]:
    """Return the stored bytes for each requested object that can be read.

    Objects are looked up one at a time, in the order given. An id with no
    object row, or whose bytes cannot be read, is silently omitted.

    Returns:
        A list of ``{"object_id": ..., "content": ...}`` dicts.
    """
    # NOTE(review): repo_id is accepted but never used to scope the lookup —
    # confirm that callers enforce repository access before calling this.
    fetched: list[dict] = []
    for object_id in object_ids:
        row = await session.get(MusehubObject, object_id)
        payload: bytes | None = (
            await read_object_bytes(row, session=session) if row is not None else None
        )
        if payload is not None:
            fetched.append({"object_id": object_id, "content": payload})
    return fetched
170
async def wire_repair_snapshot(
    session: AsyncSession,
    repo_id: str,
    snapshot_id: str,
    manifest: dict[str, str],
    directories: list[str],
    caller_id: str | None,
) -> RepairResult:
    """Re-upload a snapshot manifest after verifying its id.

    The caller must be the repo owner or an accepted collaborator with
    ``write`` or ``admin`` permission. The snapshot id is recomputed from
    *manifest* and *directories* and must equal *snapshot_id* before the
    snapshot row is merged and the repo's snapshot ref is inserted.

    Raises:
        ValueError: the repo does not exist.
        PermissionError: the caller is unauthenticated or not authorized.
        ObjectHashMismatch: the recomputed id differs from *snapshot_id*.
    """
    from musehub.muse_cli.snapshot import compute_snapshot_id

    repo = await session.get(MusehubRepo, repo_id)
    if repo is None:
        raise ValueError("repo not found")

    # Authorization: owner passes directly, everyone else needs a collaborator row.
    if not caller_id:
        raise PermissionError("repair rejected: unauthenticated")
    if caller_id != repo.owner:
        collaborator_query = select(MusehubCollaborator).where(
            MusehubCollaborator.repo_id == repo_id,
            MusehubCollaborator.identity_handle == caller_id,
            MusehubCollaborator.accepted_at.isnot(None),
            MusehubCollaborator.permission.in_(["write", "admin"]),
        )
        collaborator = (await session.execute(collaborator_query)).scalar_one_or_none()
        if collaborator is None:
            raise PermissionError("repair rejected: not authorized")

    # Integrity: the declared id must match the id derived from the content.
    expected_id = compute_snapshot_id(manifest, directories or [])
    if expected_id != snapshot_id:
        raise ObjectHashMismatch(
            f"repair snapshot hash mismatch for {snapshot_id!r}: "
            f"declared={snapshot_id}… recomputed={expected_id}…"
        )

    import msgpack as _msgpack

    packed_manifest = _msgpack.packb(manifest, use_bin_type=True)
    ordered_dirs = sorted(directories) if directories else []
    snapshot_row = MusehubSnapshot(
        snapshot_id=snapshot_id,
        directories=ordered_dirs,
        manifest_blob=packed_manifest,
        entry_count=len(manifest),
        created_at=_utc_now(),
    )
    await session.merge(snapshot_row)

    # Link the snapshot to this repo; an existing link is left untouched.
    ref_values = [{"repo_id": repo_id, "snapshot_id": snapshot_id, "created_at": _utc_now()}]
    ref_insert = (
        _pg_insert(MusehubSnapshotRef)
        .values(ref_values)
        .on_conflict_do_nothing(index_elements=["repo_id", "snapshot_id"])
    )
    await session.execute(ref_insert)
    await session.commit()

    logger.info(
        "✅ repair-snapshot repo=%s snapshot_id=%s entries=%d",
        repo_id, snapshot_id, len(manifest),
    )
    return {"repaired": True}
228
async def _fetch_commit(
    session: AsyncSession,
    commit_id: str,
) -> MusehubCommit | None:
    """Look up one commit row by primary key; ``None`` when it does not exist."""
    commit_row = await session.get(MusehubCommit, commit_id)
    return commit_row
234
# object_id -> (repo_id, path, storage_uri, content_cache)
_ObjFetchMap = dict[str, tuple[str, str, str | None, bytes | None]]


async def _fetch_object_meta(
    session: AsyncSession,
    repo_id: str,
    object_ids: list[str],
) -> _ObjFetchMap:
    """Collect storage metadata for the given object ids.

    The empty-blob id is answered without touching the database; every
    other id is resolved with a single ``IN`` query. Ids with no object row
    are absent from the result.

    Returns:
        Mapping of object id to ``(repo_id, path, storage_uri, content_cache)``.
    """
    meta: _ObjFetchMap = {}
    db_ids: list[str] = []
    for oid in object_ids:
        if oid == _EMPTY_OID:
            # The empty blob has no path, storage location or cached content.
            meta[_EMPTY_OID] = (repo_id, "", None, None)
        else:
            db_ids.append(oid)

    if not db_ids:
        return meta

    lookup = select(MusehubObject).where(MusehubObject.object_id.in_(db_ids))
    rows = (await session.execute(lookup)).scalars().all()
    for row in rows:
        meta[row.object_id] = (repo_id, row.path or "", row.storage_uri, row.content_cache)
    return meta
255
def compute_object_byte_offsets(wire_bytes: bytes) -> "dict[str, tuple[int, int]]":
    """Return {object_id: (absolute_byte_offset, byte_length)} for every object
    in the OBJECTS section of a wire mpack binary.

    Parses the section table to find the OBJECTS section, then walks the
    ``_build_pack`` layout to compute each object's content start position
    within *wire_bytes*. The returned offsets are absolute — callers can issue
    ``Range: bytes=offset-offset+length-1`` S3 GETs directly against the mpack.

    Every read is bounds-checked against ``len(wire_bytes)``: an entry is only
    reported when its id, length field and full content lie inside the buffer.
    Parsing stops at the first entry that does not fit, so a truncated or
    corrupt pack yields the entries that precede the damage instead of raising
    or reporting ranges that point past the end of the data.

    Returns an empty dict for wire mpacks with no OBJECTS section, for input
    that does not start with the ``MUSE`` magic, and for a section table or
    pack header that is cut short.
    """
    import struct as _struct

    _WIRE_SEC_BLOBS = 1
    _OID_BYTES = 71       # len("sha256:") + 64 hex chars
    _PACK_HEADER = 13     # magic(4) + version(1) + count(8)
    _TABLE_START = 6      # magic(4) + one byte + section count(1)
    _SECTION_ENTRY = 17   # type(1) + offset(8) + 8 further bytes not read here
    _u64 = _struct.Struct("<Q")

    total = len(wire_bytes)
    if total < _TABLE_START or wire_bytes[:4] != b"MUSE":
        return {}

    # Scan the section table for the first OBJECTS entry.
    section_count = wire_bytes[5]
    objects_section_offset: int | None = None
    for index in range(section_count):
        entry = _TABLE_START + index * _SECTION_ENTRY
        if entry + 1 + _u64.size > total:
            return {}  # section table is truncated
        if wire_bytes[entry] == _WIRE_SEC_BLOBS:
            (objects_section_offset,) = _u64.unpack_from(wire_bytes, entry + 1)
            break

    if objects_section_offset is None:
        return {}

    # Walk the _build_pack layout within the OBJECTS section.
    pack_start = objects_section_offset
    if pack_start + _PACK_HEADER > total:
        return {}  # pack header does not fit in the buffer
    if wire_bytes[pack_start : pack_start + 4] != b"MUSE":
        return {}  # not a valid pack section

    (object_count,) = _u64.unpack_from(wire_bytes, pack_start + 5)
    pos = pack_start + _PACK_HEADER
    result: dict[str, tuple[int, int]] = {}

    # object_count comes from the file itself, so the loop is additionally
    # bounded by the buffer: it ends as soon as an entry would overrun it.
    for _ in range(object_count):
        content_start = pos + _OID_BYTES + _u64.size
        if content_start > total:
            break  # id or length field is cut off
        oid = wire_bytes[pos : pos + _OID_BYTES].decode("ascii", errors="replace")
        (length,) = _u64.unpack_from(wire_bytes, pos + _OID_BYTES)
        if content_start + length > total:
            break  # declared content runs past the end of the buffer
        result[oid] = (content_start, length)
        pos = content_start + length

    return result
308
309
def _topological_sort(commits: list[WireCommit]) -> list[WireCommit]:
    """Order commits so that each one comes after its parents in the batch.

    Only parents that are themselves part of *commits* create an ordering
    constraint; parents outside the batch are ignored. Commits that can
    never be released (for example because they form a cycle) are appended
    at the end in their original input order.
    """
    if not commits:
        return []

    lookup = {}
    for commit in commits:
        lookup[commit.commit_id] = commit
    # Number of in-batch parents each commit is still waiting for.
    pending: IntDict = dict.fromkeys(lookup, 0)
    # Parent id -> ids of the commits that list it as a parent.
    dependents: _ChildMap = {cid: [] for cid in lookup}

    for commit in commits:
        for parent_id in (commit.parent_commit_id, commit.parent2_commit_id):
            if parent_id and parent_id in lookup:
                pending[commit.commit_id] += 1
                dependents[parent_id].append(commit.commit_id)

    # Start from the commits with no in-batch parent and release dependents
    # as their last outstanding parent is emitted.
    ready: collections.deque[str] = collections.deque(
        cid for cid, waiting in pending.items() if waiting == 0
    )
    ordered: list[WireCommit] = []
    emitted = set()
    while ready:
        current = ready.popleft()
        ordered.append(lookup[current])
        emitted.add(current)
        for dependent_id in dependents[current]:
            pending[dependent_id] -= 1
            if pending[dependent_id] == 0:
                ready.append(dependent_id)

    # Whatever was never released keeps its input order at the tail.
    ordered.extend(c for c in commits if c.commit_id not in emitted)
    return ordered
341
async def record_mpack_bytes_uploaded(
    session: AsyncSession,
    identity_id: str,
    size_bytes: int,
) -> None:
    """Add *size_bytes* to the identity's upload counter for today.

    Inserts a row for ``(identity_id, today)`` or, when one already exists,
    increments its ``bytes_uploaded`` in the same statement. "Today" is
    ``datetime.date.today()``. The statement is executed on *session*; this
    function does not commit.
    """
    from datetime import date as _date

    upsert = _pg_insert(MusehubDailyPushBytes).values(
        identity_id=identity_id,
        date=_date.today(),
        bytes_uploaded=size_bytes,
    )
    # On a key collision, accumulate instead of overwriting.
    upsert = upsert.on_conflict_do_update(
        index_elements=["identity_id", "date"],
        set_={"bytes_uploaded": MusehubDailyPushBytes.bytes_uploaded + size_bytes},
    )
    await session.execute(upsert)
357
358
async def check_push_anomaly(
    session: AsyncSession,
    identity_id: str,
    bytes_today: int,
) -> bool:
    """Flag a push volume that is far above the identity's recent average.

    Compares *bytes_today* with the average ``bytes_uploaded`` over the 30
    days before today (today excluded). When the ratio exceeds 10x, a
    warning is logged and a ``MusehubPushAnomaly`` row is added to
    *session* (not committed here).

    Returns:
        ``True`` when an anomaly was recorded; ``False`` when there is no
        history, the average is zero, or the ratio is at most 10.
    """
    import datetime as _dt

    today = _dt.date.today()
    window_start = today - _dt.timedelta(days=30)

    average_query = select(func.avg(MusehubDailyPushBytes.bytes_uploaded)).where(
        MusehubDailyPushBytes.identity_id == identity_id,
        MusehubDailyPushBytes.date >= window_start,
        MusehubDailyPushBytes.date < today,
    )
    avg_bytes = (await session.execute(average_query)).scalar()

    # No baseline to compare against.
    if avg_bytes is None or avg_bytes == 0:
        return False

    ratio = bytes_today / float(avg_bytes)
    if ratio <= 10.0:
        return False

    # The id is derived from identity, date and byte count.
    fingerprint = f"{identity_id}:{today.isoformat()}:{bytes_today}".encode()
    anomaly_id = "sha256:" + hashlib.sha256(fingerprint).hexdigest()

    logger.warning(
        "[anomaly] push spike detected identity_id=%s bytes_today=%d avg_bytes=%.0f ratio=%.1fx",
        identity_id[:20], bytes_today, avg_bytes, ratio,
    )
    anomaly_row = MusehubPushAnomaly(
        anomaly_id=anomaly_id,
        identity_id=identity_id,
        bytes_today=bytes_today,
        rolling_avg_bytes=float(avg_bytes),
        ratio=ratio,
    )
    session.add(anomaly_row)
    return True
400
401
async def wire_push_mpack_presign(
    mpack_key: str,
    size_bytes: int,
    ttl_seconds: int = 3600,
) -> JSONObject:
    """Ask the storage backend for a presigned URL to upload an mpack.

    Args:
        mpack_key: Key the mpack will be stored under.
        size_bytes: Announced upload size; only logged here.
        ttl_seconds: Lifetime passed to the backend's ``presign_mpack_put``.

    Returns:
        ``{"upload_url": <presigned url>, "mpack_key": mpack_key}``. The
        returned URL is complete and unmodified.
    """
    logger.warning("[mpack-presign] START mpack_key=%s size_bytes=%d ttl=%ds", mpack_key, size_bytes, ttl_seconds)
    backend = get_backend()
    upload_url = await backend.presign_mpack_put(mpack_key, ttl_seconds)

    # Never write the full URL to the log: a presigned URL is a bearer
    # credential for the upload. Only the part before the query string is
    # logged. NOTE(review): assumes the signature travels in the query
    # string, as with S3-style presigned URLs — confirm for other backends.
    url_text = str(upload_url)
    base, separator, _query = url_text.partition("?")
    loggable_url = base + "?<redacted>" if separator else base
    logger.warning("[mpack-presign] upload_url=%s", loggable_url)

    return {"upload_url": upload_url, "mpack_key": mpack_key}
412
413
414 async def wire_push_unpack_mpack(
415 session: AsyncSession,
416 repo_id: str,
417 mpack_key: str,
418 pusher_id: str | None,
419 branch: str = "main",
420 head_commit_id: str = "",
421 commits_count: int = 0,
422 blobs_count: int = 0,
423 force: bool = False,
424 ) -> JSONObject:
425 t0 = _time_module.monotonic()
426 def _ms(ref: float = 0.0) -> float:
427 return (_time_module.monotonic() - (t0 if ref == 0.0 else ref)) * 1000
428
429 from musehub.storage.backends import get_backend
430 backend = get_backend()
431
432 logger.warning(
433 "[SERVER step 0] receive repo=%s branch=%s head=%s",
434 repo_id[:20], branch, head_commit_id[:20] if head_commit_id else "none",
435 )
436
437 # step 1: fetch mpack bytes from MinIO by mpack_key
438 wire_bytes = await backend.get_mpack(mpack_key)
439 t_got = _time_module.monotonic()
440 if not wire_bytes:
441 raise ValueError(f"mpack {mpack_key[:20]} not found in storage")
442 mpack_bytes_len = len(wire_bytes)
443 logger.warning(
444 "[SERVER step 1] fetched mpack from object store %.1fms size=%d bytes (%.4f MB) key=%s",
445 _ms(), mpack_bytes_len, mpack_bytes_len / 1_048_576, mpack_key,
446 )
447
448 from musehub.config import get_settings as _get_settings
449 _max_bytes = _get_settings().mpack_max_bytes
450 if mpack_bytes_len > _max_bytes:
451 raise ValueError(
452 f"mpack {mpack_key[:20]} size {mpack_bytes_len:,} bytes "
453 f"exceeds limit {_max_bytes:,} bytes"
454 )
455
456 # step 2: verify blob_id(bytes) == mpack_key
457 actual_id = blob_id(wire_bytes)
458 t_verify = _time_module.monotonic()
459 _, expected_hex = split_id(mpack_key)
460 _, got_hex = split_id(actual_id)
461 match = actual_id == mpack_key
462 logger.warning(
463 "[SERVER step 2] integrity check %.1fms expected=%s got=%s match=%s",
464 (t_verify - t_got) * 1000, expected_hex[:20], got_hex[:20], match,
465 )
466 if not match:
467 raise ValueError(
468 f"mpack integrity failure: sha256={got_hex[:20]}… ≠ key={expected_hex[:20]}…"
469 )
470
471 # ── 2. Inline index ───────────────────────────────────────────────────────
472 import zstandard as _zstd_sync
473
474 if wire_bytes[:4] != b"MUSE":
475 raise ValueError(f"mpack is not MUSE binary format (got {wire_bytes[:4]!r})")
476 from muse.core.mpack import parse_wire_mpack as _parse_wire_mpack
477 _mpack_sync: JSONObject = _parse_wire_mpack(wire_bytes)
478 t_parse = _time_module.monotonic()
479 _raw_blobs: list[JSONValue] = _mpack_sync.get("blobs") or []
480 _raw_snaps: list[JSONValue] = _mpack_sync.get("snapshots") or []
481 _raw_commits_inline: list[JSONValue] = _mpack_sync.get("commits") or []
482 logger.warning("[SERVER step 3+4] verify MUSE magic + parse %.1fms commits=%d snapshots=%d blobs=%d",
483 (t_parse - t_verify) * 1000, len(_raw_commits_inline), len(_raw_snaps), len(_raw_blobs))
484
485 # 2b. zstd decompress all blobs — zip bomb guard runs inline
486 _dctx_sync = _zstd_sync.ZstdDecompressor()
487 _decompressed_sync: list[tuple[str, bytes]] = []
488 _total_blob_bytes = 0
489 _max_decompressed = settings.mpack_max_decompressed_bytes
490 for _obj in _raw_blobs:
491 _oid = _obj["object_id"]
492 _raw = _obj.get("content") or b""
493 if _obj.get("encoding") == "zstd" and _raw:
494 _raw = _dctx_sync.decompress(_raw)
495 _total_blob_bytes += len(_raw)
496 if _total_blob_bytes > _max_decompressed:
497 _zip_err = MPackValidationError(
498 f"decompressed size {_total_blob_bytes:,} exceeds limit "
499 f"{_max_decompressed:,} — possible zip bomb"
500 )
501 await backend.quarantine_mpack(mpack_key)
502 raise _zip_err
503 _decompressed_sync.append((_oid, _raw))
504 t_decomp = _time_module.monotonic()
505 logger.warning("[SERVER step 5] decompress + zip-bomb check %.1fms blobs=%d uncompressed=%.2f MB",
506 (t_decomp - t_parse) * 1000, len(_decompressed_sync), _total_blob_bytes / 1_048_576)
507
508 # step 6: check blocked hashes
509 if _decompressed_sync:
510 _all_check_oids = [_oid for _oid, _ in _decompressed_sync]
511 _blocked_oids = (await session.execute(
512 select(MusehubBlockedHash.object_id)
513 .where(MusehubBlockedHash.object_id.in_(_all_check_oids))
514 )).scalars().all()
515 if _blocked_oids:
516 _blocked_err = MPackValidationError(
517 f"mpack contains {len(_blocked_oids)} blocked object(s): "
518 + ", ".join(oid[:20] for oid in _blocked_oids[:3])
519 + ("..." if len(_blocked_oids) > 3 else "")
520 )
521 await backend.quarantine_mpack(mpack_key)
522 raise _blocked_err
523 logger.warning("[SERVER step 6] check blocked hashes objects=%d blocked=0", len(_decompressed_sync))
524
525 # step 7: write objects
526 _new_oids: list[str] = []
527 if _decompressed_sync:
528 _cc_oids = [_oid for _oid, _ in _decompressed_sync]
529
530 t_obj_sel0 = _time_module.monotonic()
531 _existing_oids = set((await session.execute(
532 _sa_text("SELECT object_id FROM musehub_objects WHERE object_id = ANY(:ids)"),
533 {"ids": _cc_oids},
534 )).scalars())
535 t_obj_sel1 = _time_module.monotonic()
536 logger.warning("[SERVER step 7a] objects: dedup SELECT %.1fms total=%d existing=%d new=%d",
537 (t_obj_sel1 - t_obj_sel0) * 1000, len(_cc_oids), len(_existing_oids),
538 len(_cc_oids) - len(_existing_oids))
539
540 session.add_all([
541 MusehubObject(object_id=_oid, path="", size_bytes=len(_data),
542 storage_uri=f"mpack://{mpack_key}", content_cache=None)
543 for _oid, _data in _decompressed_sync
544 if _oid not in _existing_oids
545 ])
546 t_obj_add = _time_module.monotonic()
547 logger.warning("[SERVER step 7b] objects: INSERT new %.1fms", (t_obj_add - t_obj_sel1) * 1000)
548
549 _new_oids = [_oid for _oid in _cc_oids if _oid not in _existing_oids]
550 t_ref0 = _time_module.monotonic()
551 await _upsert_object_refs(session, repo_id, _cc_oids)
552 t_ref1 = _time_module.monotonic()
553 logger.warning("[SERVER step 7c] objects: UPSERT object_refs %.1fms oids=%d",
554 (t_ref1 - t_ref0) * 1000, len(_cc_oids))
555
556 if _cc_oids:
557 _byte_offsets = compute_object_byte_offsets(wire_bytes)
558 _mpack_idx_rows = [
559 {
560 "entity_id": _oid, "mpack_id": mpack_key,
561 "entity_type": "object", "created_at": _utc_now(),
562 "byte_offset": _byte_offsets.get(_oid, (None, None))[0],
563 "byte_length": _byte_offsets.get(_oid, (None, None))[1],
564 }
565 for _oid in _cc_oids
566 ]
567 # asyncpg caps query parameters at 32767; 6 columns per row → 5461 rows/batch
568 _MPACK_IDX_BATCH = 5461
569 for _batch_start in range(0, len(_mpack_idx_rows), _MPACK_IDX_BATCH):
570 _batch = _mpack_idx_rows[_batch_start:_batch_start + _MPACK_IDX_BATCH]
571 _idx_stmt = _pg_insert(MusehubMPackIndex).values(_batch)
572 await session.execute(
573 _idx_stmt.on_conflict_do_update(
574 index_elements=["entity_id", "mpack_id"],
575 set_={
576 "byte_offset": _idx_stmt.excluded.byte_offset,
577 "byte_length": _idx_stmt.excluded.byte_length,
578 },
579 where=MusehubMPackIndex.byte_offset.is_(None),
580 )
581 )
582 t_idx_add = _time_module.monotonic()
583 logger.warning("[SERVER step 7d] objects: UPSERT mpack_index %.1fms total=%d",
584 (t_idx_add - t_ref1) * 1000, len(_cc_oids))
585 else:
586 t_idx_add = t_decomp
587
588 # step 7: write snapshots
589 _snap_ids_in_mpack = {_sd.get("snapshot_id") for _sd in _raw_snaps if _sd.get("snapshot_id")}
590 _external_parent_sids = {
591 _sd.get("parent_snapshot_id")
592 for _sd in _raw_snaps
593 if _sd.get("parent_snapshot_id") and _sd.get("parent_snapshot_id") not in _snap_ids_in_mpack
594 }
595 _parent_snap_manifests: dict[str, dict] = {}
596 t_snap_ext0 = _time_module.monotonic()
597 if _external_parent_sids:
598 _psnap_rows = (await session.execute(
599 select(MusehubSnapshot.snapshot_id, MusehubSnapshot.manifest_blob)
600 .where(MusehubSnapshot.snapshot_id.in_(_external_parent_sids))
601 )).all()
602 for _psid, _pblob in _psnap_rows:
603 if _pblob:
604 _parent_snap_manifests[_psid] = dict(_msgpack.unpackb(_pblob, raw=False))
605 t_snap_ext1 = _time_module.monotonic()
606 logger.warning("[SERVER step 8a] snapshots: fetch external parents %.1fms external=%d loaded=%d",
607 (t_snap_ext1 - t_snap_ext0) * 1000, len(_external_parent_sids), len(_parent_snap_manifests))
608
609 _head_snap_id: str | None = None
610 for _cd in _raw_commits_inline:
611 if _cd.get("commit_id") == head_commit_id:
612 _head_snap_id = _cd.get("snapshot_id") or None
613 break
614
615 _snap_resolved: dict[str, _SnapshotManifest] = {}
616 _snap_rows_inline = []
617 _n_snap_root = 0
618 _n_snap_head = 0
619 _n_snap_delta_only = 0
620 _snap_manifest_bytes_total = 0
621 _snap_delta_bytes_total = 0
622
623 t_snap_loop0 = _time_module.monotonic()
624 for _sd in _raw_snaps:
625 _sid = _sd.get("snapshot_id", "")
626 if not _sid:
627 continue
628 _parent_sid = _sd.get("parent_snapshot_id")
629 _delta_upsert: dict[str, str] = _sd.get("delta_upsert") or {}
630 _delta_remove: list[str] = _sd.get("delta_remove") or []
631 if _parent_sid and _parent_sid in _snap_resolved:
632 _base = dict(_snap_resolved[_parent_sid])
633 elif _parent_sid:
634 _base = dict(_parent_snap_manifests.get(_parent_sid) or {})
635 else:
636 _base = {}
637 _base.update(_delta_upsert)
638 for _path in _delta_remove:
639 _base.pop(_path, None)
640 _snap_resolved[_sid] = _base
641
642 _delta_encoded: JSONObject = {"add": _delta_upsert}
643 if _delta_remove:
644 _delta_encoded["rm"] = _delta_remove
645 _delta_blob = _msgpack.packb(_delta_encoded, use_bin_type=True) if (_delta_upsert or _delta_remove) else None
646
647 _is_root = not _parent_sid
648 _is_head = _sid == _head_snap_id
649 # Only store full manifest for root and head — delta-only snapshots
650 # get manifest_blob=None here and are backfilled by the mpack.index job.
651 # Storing all 1000 manifests inline would push hundreds of MB into a single
652 # transaction, blocking snapshot_refs inserts (regression at scale).
653 _manifest_blob = _msgpack.packb(_base, use_bin_type=True) if (_is_root or _is_head) else None
654
655 if _is_root: _n_snap_root += 1
656 if _is_head: _n_snap_head += 1
657 if not _is_root and not _is_head: _n_snap_delta_only += 1
658 if _manifest_blob: _snap_manifest_bytes_total += len(_manifest_blob)
659 if _delta_blob: _snap_delta_bytes_total += len(_delta_blob)
660
661 _snap_dirs = [str(d) for d in (_sd.get("directories") or []) if d]
662 _snap_rows_inline.append({
663 "snapshot_id": _sid,
664 "directories": _snap_dirs,
665 "manifest_blob": _manifest_blob,
666 "entry_count": len(_base),
667 "created_at": _utc_now(),
668 "parent_snapshot_id": _parent_sid or None,
669 "delta_blob": _delta_blob,
670 })
671 t_snap_loop1 = _time_module.monotonic()
672 logger.warning(
673 "[SERVER step 8b] snapshots: replay delta chain %.1fms "
674 "total=%d root=%d head=%d delta_only=%d",
675 (t_snap_loop1 - t_snap_loop0) * 1000, len(_snap_rows_inline),
676 _n_snap_root, _n_snap_head, _n_snap_delta_only,
677 )
678
679 if _snap_rows_inline:
680 _snap_ids_to_check = [r["snapshot_id"] for r in _snap_rows_inline]
681 t_snap_sel0 = _time_module.monotonic()
682 _existing_snap_rows = (await session.execute(
683 _sa_text(
684 "SELECT snapshot_id, (delta_blob IS NOT NULL) AS has_delta "
685 "FROM musehub_snapshots WHERE snapshot_id = ANY(:ids)"
686 ),
687 {"ids": _snap_ids_to_check},
688 )).all()
689 _existing_sids: set[str] = {row[0] for row in _existing_snap_rows}
690 _sids_already_healed: set[str] = {row[0] for row in _existing_snap_rows if row[1]}
691 t_snap_sel1 = _time_module.monotonic()
692 logger.warning("[SERVER step 8c] snapshots: dedup SELECT %.1fms existing=%d new=%d",
693 (t_snap_sel1 - t_snap_sel0) * 1000, len(_existing_sids),
694 len(_snap_rows_inline) - len(_existing_sids))
695
696 _new_snap_dicts = [r for r in _snap_rows_inline if r["snapshot_id"] not in _existing_sids]
697 _heal_snap_dicts = [
698 r for r in _snap_rows_inline
699 if r["snapshot_id"] in _existing_sids
700 and r["snapshot_id"] not in _sids_already_healed
701 and r.get("delta_blob")
702 ]
703 if _new_snap_dicts:
704 session.add_all([MusehubSnapshot(**_r) for _r in _new_snap_dicts])
705 if _heal_snap_dicts:
706 _HEAL_CHUNK = 2000
707 for _hi in range(0, len(_heal_snap_dicts), _HEAL_CHUNK):
708 _chunk = _heal_snap_dicts[_hi : _hi + _HEAL_CHUNK]
709 _heal_stmt = _pg_insert(MusehubSnapshot).values(_chunk)
710 await session.execute(
711 _heal_stmt.on_conflict_do_update(
712 index_elements=["snapshot_id"],
713 set_={
714 "delta_blob": _heal_stmt.excluded.delta_blob,
715 "parent_snapshot_id": _heal_stmt.excluded.parent_snapshot_id,
716 },
717 where=MusehubSnapshot.delta_blob.is_(None),
718 )
719 )
720 t_snap_add = _time_module.monotonic()
721 logger.warning("[SERVER step 8d] snapshots: INSERT new %.1fms new=%d healed=%d",
722 (t_snap_add - t_snap_sel1) * 1000, len(_new_snap_dicts), len(_heal_snap_dicts))
723
724 _new_snap_rows = _new_snap_dicts
725 if _new_snap_rows:
726 await session.execute(
727 _pg_insert(MusehubSnapshotRef)
728 .values([{"repo_id": repo_id, "snapshot_id": r["snapshot_id"], "created_at": _utc_now()} for r in _new_snap_rows])
729 .on_conflict_do_nothing(index_elements=["repo_id", "snapshot_id"])
730 )
731 t_snap_ref = _time_module.monotonic()
732 logger.warning("[SERVER step 8e] snapshots: UPSERT snapshot_refs %.1fms new=%d", (t_snap_ref - t_snap_add) * 1000, len(_new_snap_rows))
733 else:
734 t_snap_ref = t_snap_loop1
735
736 # step 8: write commits
737 from muse.core.commits import CommitRecord as _CR_inline
738 _commit_rows_inline = []
739 _graph_rows_inline = []
740
741 _commit_ids_in_mpack = {
742 _cd.get("commit_id") for _cd in _raw_commits_inline if _cd.get("commit_id")
743 }
744 _external_parent_cids = {
745 pid
746 for _cd in _raw_commits_inline
747 for pid in (
748 ([_cd["parent_commit_id"]] if _cd.get("parent_commit_id") else [])
749 + ([_cd["parent2_commit_id"]] if _cd.get("parent2_commit_id") else [])
750 )
751 if pid not in _commit_ids_in_mpack
752 }
753 t_gen_sel0 = _time_module.monotonic()
754 _db_parent_gens: dict[str, int] = {}
755 if _external_parent_cids:
756 _gen_rows = (await session.execute(
757 select(MusehubCommitGraph.commit_id, MusehubCommitGraph.generation)
758 .where(MusehubCommitGraph.commit_id.in_(_external_parent_cids))
759 )).all()
760 _db_parent_gens = {_cid: _gval for _cid, _gval in _gen_rows}
761 t_gen_sel1 = _time_module.monotonic()
762 logger.warning("[SERVER step 9a] commits: fetch parent generations %.1fms external_parents=%d found=%d",
763 (t_gen_sel1 - t_gen_sel0) * 1000, len(_external_parent_cids), len(_db_parent_gens))
764
765 t_commit_parse0 = _time_module.monotonic()
766
767 _cid_set_inline = {_cd.get("commit_id") for _cd in _raw_commits_inline if _cd.get("commit_id")}
768 _cd_by_cid: dict[str, dict] = {_cd["commit_id"]: _cd for _cd in _raw_commits_inline if _cd.get("commit_id")}
769 _children: dict[str, list[dict]] = {cid: [] for cid in _cid_set_inline}
770 _in_degree: dict[str, int] = {cid: 0 for cid in _cid_set_inline}
771 for _cd in _raw_commits_inline:
772 for _pk in ("parent_commit_id", "parent2_commit_id"):
773 _p = _cd.get(_pk) or ""
774 if _p in _cid_set_inline:
775 _children[_p].append(_cd)
776 _in_degree[_cd["commit_id"]] = _in_degree.get(_cd["commit_id"], 0) + 1
777 from collections import deque as _deque
778 _queue: _deque[dict] = _deque(
779 _cd_by_cid[cid] for cid, deg in _in_degree.items() if deg == 0
780 )
781 _topo_sorted: list[dict] = []
782 while _queue:
783 _cd = _queue.popleft()
784 _topo_sorted.append(_cd)
785 for _child in _children.get(_cd.get("commit_id", ""), []):
786 _child_cid = _child.get("commit_id", "")
787 _in_degree[_child_cid] -= 1
788 if _in_degree[_child_cid] == 0:
789 _queue.append(_child)
790 _topo_seen = {_cd.get("commit_id") for _cd in _topo_sorted}
791 _topo_sorted.extend(_cd for _cd in _raw_commits_inline if _cd.get("commit_id") not in _topo_seen)
792
793 _inline_gen: dict[str, int] = {}
794 for _cd in _topo_sorted:
795 try:
796 _cr = _CR_inline.from_dict(_cd)
797 except Exception:
798 continue
799 if not _cr.commit_id:
800 continue
801 _pids: list[str] = []
802 if _cr.parent_commit_id:
803 _pids.append(_cr.parent_commit_id)
804 if _cr.parent2_commit_id:
805 _pids.append(_cr.parent2_commit_id)
806 _parent_gens: list[int] = []
807 for _pid in _pids:
808 if _pid in _inline_gen:
809 _parent_gens.append(_inline_gen[_pid])
810 elif _pid in _db_parent_gens:
811 _parent_gens.append(_db_parent_gens[_pid])
812 _gen = (max(_parent_gens) + 1) if _parent_gens else 0
813 _inline_gen[_cr.commit_id] = _gen
814 _commit_rows_inline.append({
815 "commit_id": _cr.commit_id,
816 "branch": _cr.branch or "main",
817 "message": _cr.message or "",
818 "author": _cr.author or "",
819 "timestamp": _cr.committed_at or _utc_now(),
820 "parent_ids": _pids,
821 "snapshot_id": _cr.snapshot_id or None,
822 "agent_id": _cr.agent_id or "",
823 "model_id": _cr.model_id or "",
824 "toolchain_id": _cr.toolchain_id or "",
825 "commit_branch": _cr.branch or None,
826 "signature": _cr.signature or "",
827 "signer_public_key": _cr.signer_public_key or "",
828 "signer_key_id": _cr.signer_key_id or "",
829 "sem_ver_bump": _cr.sem_ver_bump or "none",
830 "breaking_changes": _cr.breaking_changes or [],
831 "reviewed_by": [],
832 "test_runs": 0,
833 "prompt_hash": _cr.prompt_hash or "",
834 "structured_delta": _cr.structured_delta if isinstance(_cr.structured_delta, dict) else None,
835 })
836 _graph_rows_inline.append({
837 "commit_id": _cr.commit_id,
838 "parent_ids": _pids,
839 "generation": _gen,
840 "snapshot_id": _cr.snapshot_id or None,
841 })
842 t_commit_parse1 = _time_module.monotonic()
843 _gen_values = sorted(_inline_gen.values())
844 logger.warning(
845 "[SERVER step 9b] commits: topo sort + parse %.1fms parsed=%d gen_min=%s gen_max=%s",
846 (t_commit_parse1 - t_commit_parse0) * 1000, len(_commit_rows_inline),
847 _gen_values[0] if _gen_values else "NONE",
848 _gen_values[-1] if _gen_values else "NONE",
849 )
850
851 if _commit_rows_inline:
852 t_csel0 = _time_module.monotonic()
853 _commit_ids_to_check = [r["commit_id"] for r in _commit_rows_inline]
854 _existing_cids = set((await session.execute(
855 _sa_text("SELECT commit_id FROM musehub_commits WHERE commit_id = ANY(:ids)"),
856 {"ids": _commit_ids_to_check},
857 )).scalars())
858 t_csel1 = _time_module.monotonic()
859 logger.warning("[SERVER step 9c] commits: dedup SELECT %.1fms existing=%d new=%d",
860 (t_csel1 - t_csel0) * 1000, len(_existing_cids),
861 len(_commit_rows_inline) - len(_existing_cids))
862
863 _new_commit_rows = [r for r in _commit_rows_inline if r["commit_id"] not in _existing_cids]
864 session.add_all([MusehubCommit(**_r) for _r in _new_commit_rows])
865 t_cadd = _time_module.monotonic()
866 logger.warning("[SERVER step 9d] commits: INSERT new %.1fms new=%d", (t_cadd - t_csel1) * 1000, len(_new_commit_rows))
867 # Upsert refs for ALL commits in this push, not just globally-new ones.
868 # A commit may already exist in musehub_commits from another repo's push
869 # but still be missing its ref for THIS repo — using _new_commit_rows here
870 # silently drops those refs and leaves commit history invisible to intel.
871 if _commit_rows_inline:
872 await session.execute(
873 _pg_insert(MusehubCommitRef)
874 .values([{"repo_id": repo_id, "commit_id": r["commit_id"], "created_at": _utc_now()} for r in _commit_rows_inline])
875 .on_conflict_do_nothing(index_elements=["repo_id", "commit_id"])
876 )
877 t_cref = _time_module.monotonic()
878 logger.warning("[SERVER step 9e] commits: UPSERT commit_refs %.1fms total=%d", (t_cref - t_cadd) * 1000, len(_commit_rows_inline))
879
880 if _graph_rows_inline:
881 _graph_stmt = _pg_insert(MusehubCommitGraph).values(_graph_rows_inline)
882 await session.execute(_graph_stmt.on_conflict_do_update(
883 index_elements=["commit_id"],
884 set_={"generation": _graph_stmt.excluded.generation,
885 "snapshot_id": _graph_stmt.excluded.snapshot_id},
886 ))
887 t_graph = _time_module.monotonic()
888 logger.warning("[SERVER step 9f] commits: UPSERT commit_graph %.1fms rows=%d",
889 (t_graph - t_cref) * 1000, len(_graph_rows_inline))
890 else:
891 t_graph = t_commit_parse1
892
893 # step 9: advance branch pointer
894 tip = head_commit_id
895 if tip and branch:
896 t_branch0 = _time_module.monotonic()
897 branch_row = (await session.execute(
898 select(MusehubBranch).where(
899 MusehubBranch.repo_id == repo_id,
900 MusehubBranch.name == branch,
901 ).with_for_update()
902 )).scalar_one_or_none()
903
904 current_head = branch_row.head_commit_id if branch_row else None
905 if not force and not _is_fast_forward(tip, current_head or "", wire_bytes):
906 raise NonFastForwardError(
907 f"push rejected: {tip[:20]}… is not a fast-forward of "
908 f"{(current_head or '')[:20]}… — use --force to override"
909 )
910
911 if branch_row is None:
912 session.add(MusehubBranch(
913 branch_id=compute_branch_id(repo_id, branch),
914 repo_id=repo_id,
915 name=branch,
916 head_commit_id=tip,
917 ))
918 else:
919 branch_row.head_commit_id = tip
920 t_branch1 = _time_module.monotonic()
921 logger.warning("[SERVER step 10] advance branch pointer (atomic CAS) %.1fms", (t_branch1 - t_branch0) * 1000)
922 else:
923 t_branch1 = t_graph
924
925 # commit db transaction
926 t_pre_commit = _time_module.monotonic()
927 await session.commit()
928 t_commit = _time_module.monotonic()
929 logger.warning("[SERVER step 10] commit db transaction %.1fms", (t_commit - t_pre_commit) * 1000)
930
931 # step 11: enqueue async jobs — TODO: not yet implemented
932
933 elapsed_ms = (t_commit - t0) * 1000
934 logger.warning("[SERVER step 11] enqueue intel jobs (TODO) 0.0ms")
935 logger.warning("[SERVER ✅] TOTAL=%.1fms branch=%s head=%s", elapsed_ms, branch, head_commit_id[:20] if head_commit_id else "none")
936 return {
937 "head": tip,
938 "branch": branch,
939 "blobs_in_mpack": blobs_count,
940 "commits_in_mpack": commits_count,
941 "blobs_written": len(_new_oids),
942 "commits_written": len(_new_commit_rows) if _commit_rows_inline else 0,
943 "snapshots_written": len(_new_snap_dicts) if _snap_rows_inline else 0,
944 }
945
946
class PurgeResult(TypedDict):
    """Row counts returned by ``purge_stale_mpack_index_entries``."""

    # Number of MusehubMPackIndex rows deleted because their mpack was not found in storage.
    removed: int
    # Number of MusehubMPackIndex rows left in place because their mpack still exists.
    kept: int
950
951
async def purge_stale_mpack_index_entries(session: AsyncSession) -> PurgeResult:
    """Delete MusehubMPackIndex rows whose mpack no longer exists in storage.

    Every distinct ``mpack_id`` present in the index is probed with
    ``backend.exists_mpack``; index rows belonging to an mpack that the
    backend reports as missing are deleted, all others are left untouched.

    Per the original author's note, stale rows are left behind when mpack GC
    deletes old mpacks without pruning the index, and the affected objects
    remain servable from their own ``s3://`` storage_uri.

    The session is flushed but NOT committed; committing is left to the caller.

    Args:
        session: Async SQLAlchemy session used for the queries and deletes.

    Returns:
        ``{"removed": N, "kept": M}`` — index row counts, taken from the
        grouped count that is read before any delete is issued.
    """
    from sqlalchemy import delete as _sa_delete

    from musehub.storage.backends import get_backend

    backend = get_backend()

    # A single grouped query returns each distinct mpack_id together with its
    # row count, so no index rows have to be loaded just to be counted.
    rows_per_mpack = (await session.execute(
        select(MusehubMPackIndex.mpack_id, func.count())
        .group_by(MusehubMPackIndex.mpack_id)
    )).all()

    removed = 0
    kept = 0
    for mpack_id, row_count in rows_per_mpack:
        if await backend.exists_mpack(mpack_id):
            kept += row_count
            continue
        # Dead mpack: drop all of its index entries with one bulk DELETE
        # instead of loading each row and deleting it individually.
        await session.execute(
            _sa_delete(MusehubMPackIndex).where(MusehubMPackIndex.mpack_id == mpack_id)
        )
        removed += row_count

    await session.flush()
    logger.info("purge_stale_mpack_index_entries: removed=%d kept=%d", removed, kept)
    return {"removed": removed, "kept": kept}
994
995
async def _build_commit_graph_from_raw(session: AsyncSession, raw_commits: list[dict]) -> int:
    """Compute generation numbers and upsert MusehubCommitGraph rows.

    Operates on an already-parsed list of commit dicts (used by
    ``process_mpack_index_job``). Each dict is read for ``commit_id``,
    ``parent_commit_id``, ``parent2_commit_id`` and ``snapshot_id``.

    A commit's generation is ``max(parent generations) + 1``, where a parent's
    generation comes from this batch if the parent is in it, otherwise from an
    existing MusehubCommitGraph row. A commit with no resolvable parent gets
    generation 0.

    Dicts without a ``commit_id`` are ignored. If the same ``commit_id``
    appears more than once, the last occurrence wins and a single row is
    written, so one INSERT never targets the same conflict key twice.

    Args:
        session: Async SQLAlchemy session used for the lookup and upserts.
        raw_commits: Parsed commit dicts from an mpack.

    Returns:
        The number of rows sent to the upsert (0 when there is nothing to write).
    """
    if not raw_commits:
        return 0

    from collections import deque as _deque

    def _parents_of(commit: dict) -> list[str]:
        # Non-empty parent ids in (first parent, second parent) order.
        candidates = (commit.get("parent_commit_id") or "", commit.get("parent2_commit_id") or "")
        return [p for p in candidates if p]

    # One entry per commit_id. Everything below iterates this map rather than
    # raw_commits, so id-less dicts cannot raise KeyError and duplicated ids
    # cannot inflate in-degrees or emit duplicate rows.
    commit_by_id: dict[str, dict] = {
        cd["commit_id"]: cd for cd in raw_commits if cd.get("commit_id")
    }
    if not commit_by_id:
        return 0

    # Topological sort (Kahn's algorithm) — parents are emitted before children
    # so a parent's generation is known when its child is processed.
    children: dict[str, list[str]] = {cid: [] for cid in commit_by_id}
    in_degree: dict[str, int] = dict.fromkeys(commit_by_id, 0)
    for cid, cd in commit_by_id.items():
        for parent in _parents_of(cd):
            if parent in commit_by_id:
                children[parent].append(cid)
                in_degree[cid] += 1

    ready: _deque[str] = _deque(cid for cid, deg in in_degree.items() if deg == 0)
    order: list[str] = []
    while ready:
        cid = ready.popleft()
        order.append(cid)
        for child in children[cid]:
            in_degree[child] -= 1
            if in_degree[child] == 0:
                ready.append(child)

    # Commits never released by Kahn's algorithm (part of, or downstream of, a
    # parent cycle) are appended in input order so they still get a row.
    placed = set(order)
    order.extend(cid for cid in commit_by_id if cid not in placed)

    # Fetch generation numbers for any external parents already in the DB.
    external_pids = {
        parent
        for cd in commit_by_id.values()
        for parent in _parents_of(cd)
        if parent not in commit_by_id
    }
    db_parent_gens: dict[str, int] = {}
    if external_pids:
        gen_rows = (await session.execute(
            select(MusehubCommitGraph.commit_id, MusehubCommitGraph.generation)
            .where(MusehubCommitGraph.commit_id.in_(list(external_pids)))
        )).all()
        db_parent_gens = {pid: gval for pid, gval in gen_rows}

    inline_gen: dict[str, int] = {}
    graph_rows: list[dict] = []
    now = _utc_now()
    for cid in order:
        cd = commit_by_id[cid]
        pids = _parents_of(cd)
        # A generation computed in this batch takes precedence over the DB value.
        parent_gens = [
            gval
            for gval in (inline_gen.get(p, db_parent_gens.get(p)) for p in pids)
            if gval is not None
        ]
        gen = (max(parent_gens) + 1) if parent_gens else 0
        inline_gen[cid] = gen
        graph_rows.append({
            "commit_id": cid,
            "parent_ids": pids,
            "generation": gen,
            "snapshot_id": cd.get("snapshot_id") or None,
            "created_at": now,
        })

    # Upsert in bounded chunks to keep the bind-parameter count per statement small.
    _CHUNK = 2000
    written = 0
    for start in range(0, len(graph_rows), _CHUNK):
        chunk = graph_rows[start: start + _CHUNK]
        stmt = _pg_insert(MusehubCommitGraph).values(chunk)
        await session.execute(stmt.on_conflict_do_update(
            index_elements=["commit_id"],
            set_={
                "generation": stmt.excluded.generation,
                "snapshot_id": stmt.excluded.snapshot_id,
            },
        ))
        written += len(chunk)
    return written
1086
1087
async def process_mpack_index_job(session: AsyncSession, job_id: str) -> JSONObject:
    """Populate MusehubMPackIndex (and related rows) for every object in a pushed mpack.

    Steps, in order:

    1. Load the MusehubBackgroundJob row and read ``mpack_key`` from its payload.
    2. Fetch the mpack bytes from the storage backend and parse them (MUSE
       binary via ``parse_wire_mpack``, anything else via msgpack).
    3. For MUSE binary mpacks, compute per-object byte ranges and upsert one
       MusehubMPackIndex row per object. Blobs without a computed range are
       indexed with NULL ``byte_offset``/``byte_length``.
    4. Upsert MusehubObject rows pointing at ``mpack://<mpack_key>``.
    5. Upsert object refs for the job's repo (after step 4, because object_refs
       has a foreign key on musehub_objects).
    6. Upsert commit-graph rows for the commits contained in the mpack.

    Per the original author's note, the byte ranges let the fetch path serve
    blobs via byte-range GETs instead of downloading the whole mpack.

    This function does not commit the session.

    Args:
        session: Async SQLAlchemy session used for all reads and upserts.
        job_id: Identifier of the ``mpack.index`` background job to process.

    Returns:
        A dict of counters and timings for logging: ``mpack_index_written``,
        ``byte_ranges_computed``, ``commit_graph_written``,
        ``mpack_size_bytes`` and ``elapsed_ms``.

    Raises:
        ValueError: If the job row does not exist, its payload has no
            ``mpack_key``, or the mpack cannot be read from storage.
    """
    t0 = _time_module.monotonic()

    job_row = (await session.execute(
        select(MusehubBackgroundJob).where(MusehubBackgroundJob.job_id == job_id)
    )).scalar_one_or_none()
    if job_row is None:
        raise ValueError(f"mpack.index job not found: {job_id}")

    payload = job_row.payload or {}
    repo_id = job_row.repo_id
    mpack_key = str(payload.get("mpack_key", ""))
    if not mpack_key:
        raise ValueError(f"mpack.index job {job_id} missing mpack_key in payload")

    from musehub.storage.backends import get_backend
    backend = get_backend()
    wire_bytes = await backend.get_mpack(mpack_key)
    if not wire_bytes:
        raise ValueError(f"mpack {mpack_key[:20]} not found in storage")

    t_fetched = _time_module.monotonic()

    # Parse: handle both MUSE binary and legacy msgpack formats.
    is_muse_binary = wire_bytes[:4] == b"MUSE"
    if is_muse_binary:
        from muse.core.mpack import parse_wire_mpack as _parse_wire
        mpack: JSONObject = _parse_wire(wire_bytes)
    else:
        mpack = _msgpack.unpackb(wire_bytes, raw=False)

    raw_blobs: list[JSONObject] = mpack.get("blobs") or mpack.get("objects") or []
    t_parsed = _time_module.monotonic()

    logger.info(
        "[mpack.index] job=%s fetch=%.3fs parse=%.3fs blobs=%d mpack=%d KB",
        job_id[:16], t_fetched - t0, t_parsed - t_fetched,
        len(raw_blobs), len(wire_bytes) // 1024,
    )

    # Byte ranges are only computed for the MUSE binary format.
    byte_offsets: dict[str, tuple[int, int]] = (
        compute_object_byte_offsets(wire_bytes) if is_muse_binary else {}
    )

    # Distinct, order-preserving blob ids. De-duplicating keeps a single
    # INSERT ... ON CONFLICT DO UPDATE from targeting the same key twice.
    all_blob_oids: list[str] = list(dict.fromkeys(
        obj["object_id"] for obj in raw_blobs if obj.get("object_id")
    ))

    now = _utc_now()
    _CHUNK = 2000

    # One index row per object with a computed byte range ...
    index_rows = [
        {
            "entity_id": oid,
            "mpack_id": mpack_key,
            "entity_type": "object",
            "created_at": now,
            "byte_offset": offset,
            "byte_length": length,
        }
        for oid, (offset, length) in byte_offsets.items()
    ]
    # ... plus any blob without byte-range data (fallback: full-mpack serve).
    # These rows carry explicit NULLs so every dict in a multi-row VALUES
    # clause has the same key set.
    index_rows.extend(
        {
            "entity_id": oid,
            "mpack_id": mpack_key,
            "entity_type": "object",
            "created_at": now,
            "byte_offset": None,
            "byte_length": None,
        }
        for oid in all_blob_oids
        if oid not in byte_offsets
    )

    written = 0
    for start in range(0, len(index_rows), _CHUNK):
        chunk = index_rows[start: start + _CHUNK]
        stmt = _pg_insert(MusehubMPackIndex).values(chunk)
        # On conflict: fill in byte_offset/byte_length only where the existing
        # row does not have them yet.
        await session.execute(
            stmt.on_conflict_do_update(
                index_elements=["entity_id", "mpack_id"],
                set_={
                    "byte_offset": stmt.excluded.byte_offset,
                    "byte_length": stmt.excluded.byte_length,
                },
                where=MusehubMPackIndex.byte_offset.is_(None),
            )
        )
        written += len(chunk)

    # Upsert MusehubObject rows — storage_uri points to the covering mpack, no per-object S3 PUT.
    # Must happen before object_refs because object_refs has a FK on musehub_objects.
    if all_blob_oids:
        mpack_uri = f"mpack://{mpack_key}"
        obj_rows = [
            {"object_id": oid, "storage_uri": mpack_uri, "path": "", "created_at": now}
            for oid in all_blob_oids
        ]
        for start in range(0, len(obj_rows), _CHUNK):
            chunk = obj_rows[start: start + _CHUNK]
            obj_stmt = _pg_insert(MusehubObject).values(chunk)
            # An existing non-NULL storage_uri is left untouched.
            await session.execute(
                obj_stmt.on_conflict_do_update(
                    index_elements=["object_id"],
                    set_={"storage_uri": obj_stmt.excluded.storage_uri},
                    where=MusehubObject.storage_uri.is_(None),
                )
            )

    # Write object_refs after musehub_objects so the FK constraint is satisfied.
    if all_blob_oids and repo_id:
        await _upsert_object_refs(session, repo_id, all_blob_oids)

    # Build commit graph rows from mpack commits.
    raw_commits: list[dict] = mpack.get("commits") or []
    graph_written = await _build_commit_graph_from_raw(session, raw_commits)

    t_done = _time_module.monotonic()
    logger.info(
        "✅ [mpack.index] done job=%s index_rows=%d graph_rows=%d elapsed=%.3fs",
        job_id[:16], written, graph_written, t_done - t0,
    )
    return {
        "mpack_index_written": written,
        "byte_ranges_computed": len(byte_offsets),
        "commit_graph_written": graph_written,
        "mpack_size_bytes": len(wire_bytes),
        "elapsed_ms": (t_done - t0) * 1000,
    }
File History 18 commits
sha256:80c35083d2abe19ef730693f2b35ad9a9342d2abbe81e802e6f397fe9ae75eb9 merge: pull staging/dev — asyncpg batch fix + content-addre… Sonnet 4.6 patch 8 hours ago
sha256:5667a3e21bf16fd2e6d6bd4a769bd1c0cf7634afa12cef6450cc77573196b7f9 asyncpg caps query parameters Human patch 10 hours ago
sha256:c3e12fae919d99c0e030595f58eb0a7b7df9ae4bd4ed6ce582a46c33784a54d0 fix(mpack-index-job): write object_refs after musehub_objec… Sonnet 4.6 patch 3 days ago
sha256:8ba2515ec3cb2a4089d54422f342b5a47161df29d5db72cb7356591d128fba6a fix(wire-push): write object_refs and mpack_index for all m… Sonnet 4.6 patch 3 days ago
sha256:009b5a222314f47640a58d75ce5a1f428f1624cf0b51384dfcdfbdfab3cc42a4 feat: migration idempotency, file attribution DAG walk, mpa… Sonnet 4.6 minor 7 days ago
sha256:92528ae07d0e1239d87fd5fd1f439e8fbb49c9778a9a400bc4a736073fb28316 feat: byte-range blob reads, file attribution DAG walk, bra… Sonnet 4.6 minor 7 days ago
sha256:f3995ec2c05c9c34b0e4d6e96349a811d0117a1c51d78096d757998ccb3c0520 fix: blobs only in S3/mpack — remove commit/snapshot indivi… Sonnet 4.6 patch 9 days ago
sha256:8a7ff43f27504c1f6abba59ffbab3dc89bd1d8bcfa4c57f6e280286d1a58195a feat: resurrect process_mpack_index_job with byte-range ind… Sonnet 4.6 patch 9 days ago
sha256:ad616c6113d6c00f4efed6b2993734ca46d3e9b5bee25addd4ce8ae6b57136e5 chore: bump version to 0.2.0rc11; typing audit clean + all … Sonnet 4.6 minor 11 days ago
sha256:ecca645d94c5f39c88f4bc1283447ba0f4635ef3cbb11d0cd9b3759cba289d00 fix: compute_snapshot_id uses typed-object formula and wire… Sonnet 4.6 minor 11 days ago
sha256:e35be48854f182f7bf02dc6cc0f58d22b3de3a544b570c0e2bc53f9e75a3607d feat(phase6): remove delta_blob path, dead imports, add fal… Sonnet 4.6 minor 12 days ago
sha256:450998d182617fa93b737cbbdb3fe956c61566051739acec8c63ec5e7b4587f8 feat(phase3): write snapshot objects to S3 at all 3 write s… Sonnet 4.6 patch 12 days ago
sha256:e597c0b97ade9c3c52ac4735ceb437ee69d1b6f0db61b8d7caa6467c5866566d feat(phase2): write commit objects to S3 at all 5 write sit… Sonnet 4.6 patch 12 days ago
sha256:6b1949fc2797ca4c1936a637a4cbfec828ef56cf52398a2e74ca3c4f494e728f fix: use wire_bytes not mpack_bytes_raw in compute_object_b… Sonnet 4.6 patch 12 days ago
sha256:302574ddba13c9a20694c0fb051176eef4896f943b63bc458df886633b1bfcd6 feat: mpack byte-range index — store byte_offset/byte_lengt… Sonnet 4.6 minor 12 days ago
sha256:ba425baae57a4223a95f2cf4841181d790d04d02b9c98d0371f7b43046d92ff4 fix: store directories from incoming snapshot deltas instea… Sonnet 4.6 patch 12 days ago
sha256:4c039010d7b35b3bb7e9242df3e73ff50336264cb89a85a7509dc959e8516fc1 fix: store signer_public_key from incoming commits instead … Sonnet 4.6 patch 12 days ago
sha256:1b52d53ca7bae6c2b32297aaa798cd9f25e8241f457e8de4186aded84ab9c4a1 debug: log full presign URL and get_mpack key/bytes to isol… Sonnet 4.6 minor 14 days ago