musehub_wire_push.py
python
sha256:80c35083d2abe19ef730693f2b35ad9a9342d2abbe81e802e6f397fe9ae75eb9
merge: pull staging/dev — asyncpg batch fix + content-addre…
Sonnet 4.6
patch
8 hours ago
| 1 | """Push path — wire_refs, wire_repair_*, wire_push_mpack_presign, wire_push_unpack_mpack.""" |
| 2 | |
| 3 | from typing import TypedDict |
| 4 | |
| 5 | import asyncio |
| 6 | import collections |
| 7 | import hashlib |
| 8 | import logging |
| 9 | import msgpack as _msgpack |
| 10 | import time as _time_module |
| 11 | from datetime import datetime, timezone |
| 12 | |
| 13 | from sqlalchemy import func, select, text as _sa_text |
| 14 | from sqlalchemy.dialects.postgresql import insert as _pg_insert |
| 15 | from sqlalchemy.ext.asyncio import AsyncSession |
| 16 | |
| 17 | from musehub.db.musehub_abuse_models import MusehubBlockedHash, MusehubDailyPushBytes, MusehubPushAnomaly |
| 18 | from musehub.db.musehub_jobs_models import MusehubBackgroundJob |
| 19 | from musehub.db.musehub_collaborator_models import MusehubCollaborator |
| 20 | from musehub.db.musehub_repo_models import ( |
| 21 | MusehubBranch, |
| 22 | MusehubCommit, |
| 23 | MusehubCommitGraph, |
| 24 | MusehubCommitRef, |
| 25 | MusehubObject, |
| 26 | MusehubObjectRef, |
| 27 | MusehubMPackIndex, |
| 28 | MusehubRepo, |
| 29 | MusehubSnapshot, |
| 30 | MusehubSnapshotRef, |
| 31 | ) |
| 32 | from musehub.models.wire import WireCommit, WireRefsResponse |
| 33 | from muse.core.types import blob_id, split_id |
| 34 | from musehub.core.genesis import compute_branch_id |
| 35 | from musehub.types.json_types import IntDict, JSONObject, JSONValue, StrDict |
| 36 | from musehub.config import settings |
| 37 | from musehub.storage import get_backend |
| 38 | from musehub.storage.backends import read_object_bytes |
| 39 | |
| 40 | from musehub.services.musehub_wire_shared import ( |
| 41 | MPackValidationError, |
| 42 | NonFastForwardError, |
| 43 | ObjectHashMismatch, |
| 44 | RepairResult, |
| 45 | _ChildMap, |
| 46 | _is_fast_forward, |
| 47 | _reconstruct_manifest, |
| 48 | _to_wire_commit, |
| 49 | _upsert_object_refs, |
| 50 | _utc_now, |
| 51 | _parse_iso, |
| 52 | _str_list, |
| 53 | _int_safe, |
| 54 | logger, |
| 55 | ) |
| 56 | |
# object_id -> (repo_id, path, storage_uri, content_cache); this is the tuple
# shape that _fetch_object_meta builds for every object it resolves.
_ObjFetchMap = dict[str, tuple[str, str, str | None, bytes | None]]
# A fully resolved snapshot manifest. Keys are paths (entries are removed by
# path when deltas are replayed); values are presumably object IDs — confirm.
_SnapshotManifest = dict[str, str]
# SHA-256 of zero bytes, i.e. the ID of the empty object. _fetch_object_meta
# answers for this ID without consulting the database.
_EMPTY_OID = "sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
# NOTE(review): not referenced in the visible part of this module; presumably
# the number of commits handled per batch further down — confirm.
_COMMIT_BATCH = 50
| 61 | |
| 62 | |
async def wire_refs(
    session: AsyncSession,
    repo_id: str,
) -> WireRefsResponse | None:
    """Describe the refs of one repository.

    Loads the repo row and all of its branch rows, then reports every branch
    that has a head commit, together with the repo's domain and default
    branch.  Each stage is logged at INFO level with the elapsed time.

    Args:
        session: Async SQLAlchemy session used for both lookups.
        repo_id: Primary key of the repository.

    Returns:
        A ``WireRefsResponse``, or ``None`` when no repo row exists for
        *repo_id*.
    """
    started = _time_module.perf_counter()

    def _elapsed_ms() -> float:
        return (_time_module.perf_counter() - started) * 1000

    logger.info("[wire_refs] START repo_id=%s", repo_id)

    repo = await session.get(MusehubRepo, repo_id)
    logger.info("[wire_refs] repo_row loaded found=%s t=%.1fms", repo is not None, _elapsed_ms())
    if repo is None:
        return None

    branch_query = select(MusehubBranch).where(MusehubBranch.repo_id == repo_id)
    branches = (await session.execute(branch_query)).scalars().all()
    logger.info("[wire_refs] branches loaded count=%d t=%.1fms", len(branches), _elapsed_ms())

    # Branches without a head commit are left out of the advertisement.
    heads: StrDict = {}
    for branch in branches:
        if branch.head_commit_id:
            heads[branch.name] = branch.head_commit_id

    # domain_meta is only trusted when it is a dict; anything else means "no metadata".
    meta: JSONObject = {}
    if isinstance(repo.domain_meta, dict):
        meta = repo.domain_meta
    domain_name = str(meta.get("domain", "code"))
    default_branch_name = getattr(repo, "default_branch", None) or "main"

    logger.info("[wire_refs] RETURN branches=%d TOTAL=%.1fms", len(heads), _elapsed_ms())
    return WireRefsResponse(
        repo_id=repo_id,
        domain=domain_name,
        default_branch=default_branch_name,
        branch_heads=heads,
    )
| 104 | |
async def wire_repair_object(
    session: AsyncSession,
    repo_id: str,
    object_id: str,
    content: bytes,
    caller_id: str | None,
) -> RepairResult:
    """Re-store the bytes of one object after checking the caller and the hash.

    The caller must be the repo owner, or an accepted collaborator holding
    ``write`` or ``admin`` permission.  The content is written to the storage
    backend, the object row is merged, the repo's object ref is upserted and
    the session is committed.

    Args:
        session: Async SQLAlchemy session; committed on success.
        repo_id: Repository the object is repaired for.
        object_id: Declared content-addressed ID of the object.
        content: Raw object bytes.
        caller_id: Identity of the requester, or ``None`` if unauthenticated.

    Returns:
        ``{"repaired": True}``.

    Raises:
        ValueError: No repo row exists for *repo_id*.
        PermissionError: *caller_id* is empty, or is neither the owner nor a
            qualifying collaborator.
        ObjectHashMismatch: ``blob_id(content)`` differs from *object_id*.
    """
    repo = await session.get(MusehubRepo, repo_id)
    if repo is None:
        raise ValueError("repo not found")

    if not caller_id:
        raise PermissionError("repair rejected: unauthenticated")

    is_owner = caller_id == repo.owner
    if not is_owner:
        grant_query = select(MusehubCollaborator).where(
            MusehubCollaborator.repo_id == repo_id,
            MusehubCollaborator.identity_handle == caller_id,
            MusehubCollaborator.accepted_at.isnot(None),
            MusehubCollaborator.permission.in_(["write", "admin"]),
        )
        grant = (await session.execute(grant_query)).scalar_one_or_none()
        if grant is None:
            raise PermissionError("repair rejected: not authorized")

    # Never trust the declared ID: recompute it from the bytes.
    computed_id = blob_id(content)
    if computed_id != object_id:
        _, expected_hex = split_id(object_id)
        _, computed_hex = split_id(computed_id)
        raise ObjectHashMismatch(
            f"repair content hash mismatch for {object_id!r}: "
            f"declared={expected_hex[:16]}… actual={computed_hex[:16]}…"
        )

    storage = get_backend()
    stored_uri = await storage.put(object_id, content)

    repaired_row = MusehubObject(
        object_id=object_id,
        path="",
        size_bytes=len(content),
        storage_uri=stored_uri,
        content_cache=None,
    )
    await session.merge(repaired_row)
    await _upsert_object_refs(session, repo_id, [object_id])
    await session.commit()

    logger.info(
        "✅ repair-object repo=%s object_id=%s size=%d",
        repo_id, object_id, len(content),
    )
    return {"repaired": True}
| 154 | |
async def wire_fetch_objects(
    session: AsyncSession,
    repo_id: str,
    object_ids: list[str],
) -> list[dict]:
    """Return the bytes of each requested object that can be read.

    IDs are processed in the order given.  An ID with no object row, or whose
    bytes ``read_object_bytes`` reports as ``None``, is skipped without error.

    Args:
        session: Async SQLAlchemy session used for row lookups and reads.
        repo_id: Repository the request is made for.
            NOTE(review): not used in the lookup — objects are resolved by ID
            alone; confirm that callers enforce repo scoping.
        object_ids: Object IDs to fetch.

    Returns:
        A list of ``{"object_id": ..., "content": ...}`` dicts, one per
        readable object.
    """
    fetched: list[dict] = []
    for requested_id in object_ids:
        row = await session.get(MusehubObject, requested_id)
        payload: bytes | None = None
        if row is not None:
            payload = await read_object_bytes(row, session=session)
        if payload is not None:
            fetched.append({"object_id": requested_id, "content": payload})
    return fetched
| 170 | |
async def wire_repair_snapshot(
    session: AsyncSession,
    repo_id: str,
    snapshot_id: str,
    manifest: dict[str, str],
    directories: list[str],
    caller_id: str | None,
) -> RepairResult:
    """Re-store one snapshot after checking the caller and the snapshot hash.

    The caller must be the repo owner, or an accepted collaborator holding
    ``write`` or ``admin`` permission.  The snapshot ID is recomputed from the
    manifest and directories; on a match the snapshot row is merged with a
    msgpack-encoded manifest, the repo's snapshot ref is inserted if absent,
    and the session is committed.

    Args:
        session: Async SQLAlchemy session; committed on success.
        repo_id: Repository the snapshot is repaired for.
        snapshot_id: Declared snapshot ID.
        manifest: Snapshot manifest to store.
        directories: Directory list; stored sorted, ``[]`` when empty or ``None``.
        caller_id: Identity of the requester, or ``None`` if unauthenticated.

    Returns:
        ``{"repaired": True}``.

    Raises:
        ValueError: No repo row exists for *repo_id*.
        PermissionError: *caller_id* is empty, or is neither the owner nor a
            qualifying collaborator.
        ObjectHashMismatch: The recomputed snapshot ID differs from
            *snapshot_id*.
    """
    # Function-local imports, as in the original layout.
    from musehub.muse_cli.snapshot import compute_snapshot_id
    import msgpack as _msgpack

    repo = await session.get(MusehubRepo, repo_id)
    if repo is None:
        raise ValueError("repo not found")

    if not caller_id:
        raise PermissionError("repair rejected: unauthenticated")

    is_owner = caller_id == repo.owner
    if not is_owner:
        grant_query = select(MusehubCollaborator).where(
            MusehubCollaborator.repo_id == repo_id,
            MusehubCollaborator.identity_handle == caller_id,
            MusehubCollaborator.accepted_at.isnot(None),
            MusehubCollaborator.permission.in_(["write", "admin"]),
        )
        grant = (await session.execute(grant_query)).scalar_one_or_none()
        if grant is None:
            raise PermissionError("repair rejected: not authorized")

    # The ID must be reproducible from the supplied manifest and directories.
    recomputed = compute_snapshot_id(manifest, directories or [])
    if recomputed != snapshot_id:
        raise ObjectHashMismatch(
            f"repair snapshot hash mismatch for {snapshot_id!r}: "
            f"declared={snapshot_id}… recomputed={recomputed}…"
        )

    packed_manifest = _msgpack.packb(manifest, use_bin_type=True)
    ordered_dirs = sorted(directories) if directories else []
    snapshot_row = MusehubSnapshot(
        snapshot_id=snapshot_id,
        directories=ordered_dirs,
        manifest_blob=packed_manifest,
        entry_count=len(manifest),
        created_at=_utc_now(),
    )
    await session.merge(snapshot_row)

    ref_values = [{"repo_id": repo_id, "snapshot_id": snapshot_id, "created_at": _utc_now()}]
    ref_insert = (
        _pg_insert(MusehubSnapshotRef)
        .values(ref_values)
        .on_conflict_do_nothing(index_elements=["repo_id", "snapshot_id"])
    )
    await session.execute(ref_insert)
    await session.commit()

    logger.info(
        "✅ repair-snapshot repo=%s snapshot_id=%s entries=%d",
        repo_id, snapshot_id, len(manifest),
    )
    return {"repaired": True}
| 228 | |
async def _fetch_commit(
    session: AsyncSession,
    commit_id: str,
) -> MusehubCommit | None:
    """Look up one commit row by primary key; ``None`` when it does not exist."""
    commit_row = await session.get(MusehubCommit, commit_id)
    return commit_row
| 234 | |
# Same alias as the one declared near the top of the module; retained so the
# module's top-level bindings are unchanged.
_ObjFetchMap = dict[str, tuple[str, str, str | None, bytes | None]]


async def _fetch_object_meta(
    session: AsyncSession,
    repo_id: str,
    object_ids: list[str],
) -> _ObjFetchMap:
    """Resolve storage metadata for a set of object IDs.

    The empty object (``_EMPTY_OID``) is answered without touching the
    database.  Every other ID is looked up in ``MusehubObject``; IDs with no
    row are simply absent from the result.

    The lookup is issued in chunks: ``in_()`` binds one query parameter per
    ID, and asyncpg rejects statements with more than 32767 parameters, so a
    single unbatched query would fail for very large requests.  Duplicate IDs
    are collapsed first since the result is keyed by ID anyway.

    Args:
        session: Async SQLAlchemy session used for the lookups.
        repo_id: Repository ID echoed into every result tuple.
        object_ids: Object IDs to resolve.

    Returns:
        ``{object_id: (repo_id, path, storage_uri, content_cache)}`` where a
        missing path is reported as ``""``.
    """
    # Comfortably below the asyncpg bind-parameter ceiling.
    _IN_CHUNK = 10_000

    found: _ObjFetchMap = {}

    # dict.fromkeys de-duplicates while keeping first-seen order.
    wanted = dict.fromkeys(object_ids)
    if _EMPTY_OID in wanted:
        found[_EMPTY_OID] = (repo_id, "", None, None)
        del wanted[_EMPTY_OID]

    pending = list(wanted)
    for start in range(0, len(pending), _IN_CHUNK):
        chunk = pending[start:start + _IN_CHUNK]
        rows = await session.execute(
            select(MusehubObject).where(MusehubObject.object_id.in_(chunk))
        )
        for row in rows.scalars().all():
            found[row.object_id] = (
                repo_id,
                row.path or "",
                row.storage_uri,
                row.content_cache,
            )
    return found
| 255 | |
def compute_object_byte_offsets(wire_bytes: bytes) -> "dict[str, tuple[int, int]]":
    """Return {object_id: (absolute_byte_offset, byte_length)} for every object
    in the OBJECTS section of a wire mpack binary.

    Parses the section table to find the OBJECTS section, then walks the
    ``_build_pack`` layout to compute each object's content start position
    within *wire_bytes*.  The returned offsets are absolute — callers can issue
    ``Range: bytes=offset-offset+length-1`` S3 GETs directly against the mpack.

    Layout as read by this parser:

    * file header: ``b"MUSE"``, one byte that is skipped, one byte section count;
    * section table: 17-byte entries — type (1 byte), little-endian u64
      offset, and 8 further bytes that are skipped (presumably the section
      length — confirm against the wire format spec);
    * pack: ``b"MUSE"``, one byte that is skipped, little-endian u64 object
      count, then per object a 71-byte ASCII ID, a little-endian u64 length
      and the content bytes.

    The input is an uploaded binary, so every read is bounds-checked.  A
    truncated header or section table yields an empty dict.  If the object
    list is truncated, or an object declares a length that runs past the end
    of *wire_bytes*, walking stops and only the entries validated so far are
    returned — a range that does not exist in the buffer is never reported.

    Returns an empty dict for wire mpacks with no OBJECTS section.
    """
    import struct as _struct

    _WIRE_SEC_BLOBS = 1
    _OID_BYTES = 71     # len("sha256:") + 64 hex chars
    _PACK_HEADER = 13   # magic(4) + version(1) + count(8)
    _TABLE_START = 6    # magic(4) + 1 skipped byte + section count(1)
    _TABLE_ENTRY = 17   # type(1) + offset(8) + 8 skipped bytes
    _ENTRY_PREFIX = 9   # the part of a table entry that is actually read
    _LEN_BYTES = 8

    total = len(wire_bytes)
    if total < _TABLE_START or wire_bytes[:4] != b"MUSE":
        return {}

    section_count = wire_bytes[5]
    objects_section_offset: int | None = None
    for index in range(section_count):
        cursor = _TABLE_START + index * _TABLE_ENTRY
        if cursor + _ENTRY_PREFIX > total:
            return {}  # section table is truncated
        if wire_bytes[cursor] == _WIRE_SEC_BLOBS:
            (objects_section_offset,) = _struct.unpack_from("<Q", wire_bytes, cursor + 1)
            break

    if objects_section_offset is None:
        return {}

    # Walk the _build_pack layout within the OBJECTS section
    pack_start = objects_section_offset
    if pack_start + _PACK_HEADER > total:
        return {}  # offset points outside the buffer or the pack header is cut off
    if wire_bytes[pack_start : pack_start + 4] != b"MUSE":
        return {}  # not a valid pack section

    (object_count,) = _struct.unpack_from("<Q", wire_bytes, pack_start + 5)
    pos = pack_start + _PACK_HEADER
    result: dict[str, tuple[int, int]] = {}

    # object_count is attacker-controlled; the bounds checks below end the
    # loop as soon as the buffer is exhausted, whatever the declared count.
    for _ in range(object_count):
        content_start = pos + _OID_BYTES + _LEN_BYTES
        if content_start > total:
            break  # ID or length field is truncated
        oid = wire_bytes[pos : pos + _OID_BYTES].decode("ascii", errors="replace")
        (length,) = _struct.unpack_from("<Q", wire_bytes, pos + _OID_BYTES)
        if length > total - content_start:
            break  # declared content would run past the end of the buffer
        result[oid] = (content_start, length)
        pos = content_start + length

    return result
| 308 | |
| 309 | |
def _topological_sort(commits: list[WireCommit]) -> list[WireCommit]:
    """Order *commits* so that parents come before their children.

    Only parent links that point at another commit in *commits* count as
    dependencies; parents outside the list are ignored.  Commits are released
    breadth-first once all of their in-list parents have been emitted.  Any
    commit that is never released (for example because of a parent cycle) is
    appended at the end in its original input order.

    Args:
        commits: Commits to order; read via ``commit_id``,
            ``parent_commit_id`` and ``parent2_commit_id``.

    Returns:
        A new list with the same commits, parents first.
    """
    if not commits:
        return []

    lookup = {commit.commit_id: commit for commit in commits}
    unmet_parents: IntDict = dict.fromkeys(lookup, 0)
    dependents: _ChildMap = {commit_id: [] for commit_id in lookup}

    for commit in commits:
        for parent_id in (commit.parent_commit_id, commit.parent2_commit_id):
            if parent_id and parent_id in lookup:
                unmet_parents[commit.commit_id] += 1
                dependents[parent_id].append(commit.commit_id)

    ready: collections.deque[str] = collections.deque(
        commit_id for commit_id, count in unmet_parents.items() if count == 0
    )
    ordered: list[WireCommit] = []
    emitted: set[str] = set()
    while ready:
        current_id = ready.popleft()
        ordered.append(lookup[current_id])
        emitted.add(current_id)
        for dependent_id in dependents[current_id]:
            unmet_parents[dependent_id] -= 1
            if unmet_parents[dependent_id] == 0:
                ready.append(dependent_id)

    # Whatever was never released keeps its input order at the tail.
    ordered.extend(commit for commit in commits if commit.commit_id not in emitted)
    return ordered
| 341 | |
async def record_mpack_bytes_uploaded(
    session: AsyncSession,
    identity_id: str,
    size_bytes: int,
) -> None:
    """Add *size_bytes* to today's upload counter for *identity_id*.

    Inserts a ``MusehubDailyPushBytes`` row for (identity, today); when one
    already exists its ``bytes_uploaded`` is incremented instead.  The
    statement is executed on *session* but not committed here.

    NOTE(review): "today" comes from ``date.today()``, i.e. the server's local
    date — confirm that this matches what the readers of this table expect.

    Args:
        session: Async SQLAlchemy session the upsert is executed on.
        identity_id: Identity the bytes are attributed to.
        size_bytes: Number of bytes to add.
    """
    from datetime import date as _date

    counters = MusehubDailyPushBytes
    insert_stmt = _pg_insert(counters).values(
        identity_id=identity_id,
        date=_date.today(),
        bytes_uploaded=size_bytes,
    )
    upsert_stmt = insert_stmt.on_conflict_do_update(
        index_elements=["identity_id", "date"],
        set_={"bytes_uploaded": counters.bytes_uploaded + size_bytes},
    )
    await session.execute(upsert_stmt)
| 357 | |
| 358 | |
async def check_push_anomaly(
    session: AsyncSession,
    identity_id: str,
    bytes_today: int,
) -> bool:
    """Flag an identity whose upload volume today dwarfs its recent average.

    Compares *bytes_today* with the average ``bytes_uploaded`` over the
    identity's ``MusehubDailyPushBytes`` rows from the previous 30 days
    (today excluded).  When today's volume is more than 10x that average, a
    warning is logged and a ``MusehubPushAnomaly`` row is added to the session
    (not flushed or committed here).

    Args:
        session: Async SQLAlchemy session used for the query and the insert.
        identity_id: Identity being checked.
        bytes_today: Bytes uploaded by the identity today.

    Returns:
        ``True`` when an anomaly was recorded; ``False`` when there is no
        history, the average is zero, or the ratio is within the threshold.

    NOTE(review): the anomaly ID is a hash of (identity, date, bytes_today),
    so a repeated call with identical values produces the same ID — confirm
    how the table handles that.
    """
    import datetime as _dt

    _WINDOW_DAYS = 30      # length of the look-back window
    _SPIKE_RATIO = 10.0    # today / average above this counts as a spike

    today = _dt.date.today()
    window_start = today - _dt.timedelta(days=_WINDOW_DAYS)

    history = await session.execute(
        select(func.avg(MusehubDailyPushBytes.bytes_uploaded)).where(
            MusehubDailyPushBytes.identity_id == identity_id,
            MusehubDailyPushBytes.date >= window_start,
            MusehubDailyPushBytes.date < today,
        )
    )
    avg_bytes = history.scalar()
    # No rows in the window (NULL average) or an all-zero history: nothing to compare against.
    if avg_bytes is None or avg_bytes == 0:
        return False

    ratio = bytes_today / float(avg_bytes)
    if ratio <= _SPIKE_RATIO:
        return False

    anomaly_id = "sha256:" + hashlib.sha256(
        f"{identity_id}:{today.isoformat()}:{bytes_today}".encode()
    ).hexdigest()
    logger.warning(
        "[anomaly] push spike detected identity_id=%s bytes_today=%d avg_bytes=%.0f ratio=%.1fx",
        identity_id[:20], bytes_today, avg_bytes, ratio,
    )
    session.add(MusehubPushAnomaly(
        anomaly_id=anomaly_id,
        identity_id=identity_id,
        bytes_today=bytes_today,
        rolling_avg_bytes=float(avg_bytes),
        ratio=ratio,
    ))
    return True
| 400 | |
| 401 | |
async def wire_push_mpack_presign(
    mpack_key: str,
    size_bytes: int,
    ttl_seconds: int = 3600,
) -> JSONObject:
    """Obtain a presigned upload URL for an mpack.

    Asks the storage backend for a presigned PUT URL for *mpack_key* that is
    valid for *ttl_seconds*.

    The URL is returned to the caller in full but is logged with its query
    string redacted: the query string of a presigned URL normally carries the
    signature and credential parameters, and anyone who can read the logs
    could otherwise upload under this key until the URL expires.

    Args:
        mpack_key: Storage key of the mpack to upload.
        size_bytes: Announced upload size; only logged here.
        ttl_seconds: Lifetime of the presigned URL in seconds.

    Returns:
        ``{"upload_url": ..., "mpack_key": ...}``.
    """
    logger.warning("[mpack-presign] START mpack_key=%s size_bytes=%d ttl=%ds", mpack_key, size_bytes, ttl_seconds)
    backend = get_backend()
    upload_url = await backend.presign_mpack_put(mpack_key, ttl_seconds)

    # Log scheme/host/path only; never the signed query parameters.
    url_base, has_query, _signed_params = str(upload_url).partition("?")
    loggable_url = (url_base + "?<redacted>") if has_query else url_base
    logger.warning("[mpack-presign] upload_url=%s", loggable_url)

    return {"upload_url": upload_url, "mpack_key": mpack_key}
| 412 | |
| 413 | |
| 414 | async def wire_push_unpack_mpack( |
| 415 | session: AsyncSession, |
| 416 | repo_id: str, |
| 417 | mpack_key: str, |
| 418 | pusher_id: str | None, |
| 419 | branch: str = "main", |
| 420 | head_commit_id: str = "", |
| 421 | commits_count: int = 0, |
| 422 | blobs_count: int = 0, |
| 423 | force: bool = False, |
| 424 | ) -> JSONObject: |
| 425 | t0 = _time_module.monotonic() |
| 426 | def _ms(ref: float = 0.0) -> float: |
| 427 | return (_time_module.monotonic() - (t0 if ref == 0.0 else ref)) * 1000 |
| 428 | |
| 429 | from musehub.storage.backends import get_backend |
| 430 | backend = get_backend() |
| 431 | |
| 432 | logger.warning( |
| 433 | "[SERVER step 0] receive repo=%s branch=%s head=%s", |
| 434 | repo_id[:20], branch, head_commit_id[:20] if head_commit_id else "none", |
| 435 | ) |
| 436 | |
| 437 | # step 1: fetch mpack bytes from MinIO by mpack_key |
| 438 | wire_bytes = await backend.get_mpack(mpack_key) |
| 439 | t_got = _time_module.monotonic() |
| 440 | if not wire_bytes: |
| 441 | raise ValueError(f"mpack {mpack_key[:20]} not found in storage") |
| 442 | mpack_bytes_len = len(wire_bytes) |
| 443 | logger.warning( |
| 444 | "[SERVER step 1] fetched mpack from object store %.1fms size=%d bytes (%.4f MB) key=%s", |
| 445 | _ms(), mpack_bytes_len, mpack_bytes_len / 1_048_576, mpack_key, |
| 446 | ) |
| 447 | |
| 448 | from musehub.config import get_settings as _get_settings |
| 449 | _max_bytes = _get_settings().mpack_max_bytes |
| 450 | if mpack_bytes_len > _max_bytes: |
| 451 | raise ValueError( |
| 452 | f"mpack {mpack_key[:20]} size {mpack_bytes_len:,} bytes " |
| 453 | f"exceeds limit {_max_bytes:,} bytes" |
| 454 | ) |
| 455 | |
| 456 | # step 2: verify blob_id(bytes) == mpack_key |
| 457 | actual_id = blob_id(wire_bytes) |
| 458 | t_verify = _time_module.monotonic() |
| 459 | _, expected_hex = split_id(mpack_key) |
| 460 | _, got_hex = split_id(actual_id) |
| 461 | match = actual_id == mpack_key |
| 462 | logger.warning( |
| 463 | "[SERVER step 2] integrity check %.1fms expected=%s got=%s match=%s", |
| 464 | (t_verify - t_got) * 1000, expected_hex[:20], got_hex[:20], match, |
| 465 | ) |
| 466 | if not match: |
| 467 | raise ValueError( |
| 468 | f"mpack integrity failure: sha256={got_hex[:20]}… ≠ key={expected_hex[:20]}…" |
| 469 | ) |
| 470 | |
| 471 | # ── 2. Inline index ─────────────────────────────────────────────────────── |
| 472 | import zstandard as _zstd_sync |
| 473 | |
| 474 | if wire_bytes[:4] != b"MUSE": |
| 475 | raise ValueError(f"mpack is not MUSE binary format (got {wire_bytes[:4]!r})") |
| 476 | from muse.core.mpack import parse_wire_mpack as _parse_wire_mpack |
| 477 | _mpack_sync: JSONObject = _parse_wire_mpack(wire_bytes) |
| 478 | t_parse = _time_module.monotonic() |
| 479 | _raw_blobs: list[JSONValue] = _mpack_sync.get("blobs") or [] |
| 480 | _raw_snaps: list[JSONValue] = _mpack_sync.get("snapshots") or [] |
| 481 | _raw_commits_inline: list[JSONValue] = _mpack_sync.get("commits") or [] |
| 482 | logger.warning("[SERVER step 3+4] verify MUSE magic + parse %.1fms commits=%d snapshots=%d blobs=%d", |
| 483 | (t_parse - t_verify) * 1000, len(_raw_commits_inline), len(_raw_snaps), len(_raw_blobs)) |
| 484 | |
| 485 | # 2b. zstd decompress all blobs — zip bomb guard runs inline |
| 486 | _dctx_sync = _zstd_sync.ZstdDecompressor() |
| 487 | _decompressed_sync: list[tuple[str, bytes]] = [] |
| 488 | _total_blob_bytes = 0 |
| 489 | _max_decompressed = settings.mpack_max_decompressed_bytes |
| 490 | for _obj in _raw_blobs: |
| 491 | _oid = _obj["object_id"] |
| 492 | _raw = _obj.get("content") or b"" |
| 493 | if _obj.get("encoding") == "zstd" and _raw: |
| 494 | _raw = _dctx_sync.decompress(_raw) |
| 495 | _total_blob_bytes += len(_raw) |
| 496 | if _total_blob_bytes > _max_decompressed: |
| 497 | _zip_err = MPackValidationError( |
| 498 | f"decompressed size {_total_blob_bytes:,} exceeds limit " |
| 499 | f"{_max_decompressed:,} — possible zip bomb" |
| 500 | ) |
| 501 | await backend.quarantine_mpack(mpack_key) |
| 502 | raise _zip_err |
| 503 | _decompressed_sync.append((_oid, _raw)) |
| 504 | t_decomp = _time_module.monotonic() |
| 505 | logger.warning("[SERVER step 5] decompress + zip-bomb check %.1fms blobs=%d uncompressed=%.2f MB", |
| 506 | (t_decomp - t_parse) * 1000, len(_decompressed_sync), _total_blob_bytes / 1_048_576) |
| 507 | |
| 508 | # step 6: check blocked hashes |
| 509 | if _decompressed_sync: |
| 510 | _all_check_oids = [_oid for _oid, _ in _decompressed_sync] |
| 511 | _blocked_oids = (await session.execute( |
| 512 | select(MusehubBlockedHash.object_id) |
| 513 | .where(MusehubBlockedHash.object_id.in_(_all_check_oids)) |
| 514 | )).scalars().all() |
| 515 | if _blocked_oids: |
| 516 | _blocked_err = MPackValidationError( |
| 517 | f"mpack contains {len(_blocked_oids)} blocked object(s): " |
| 518 | + ", ".join(oid[:20] for oid in _blocked_oids[:3]) |
| 519 | + ("..." if len(_blocked_oids) > 3 else "") |
| 520 | ) |
| 521 | await backend.quarantine_mpack(mpack_key) |
| 522 | raise _blocked_err |
| 523 | logger.warning("[SERVER step 6] check blocked hashes objects=%d blocked=0", len(_decompressed_sync)) |
| 524 | |
| 525 | # step 7: write objects |
| 526 | _new_oids: list[str] = [] |
| 527 | if _decompressed_sync: |
| 528 | _cc_oids = [_oid for _oid, _ in _decompressed_sync] |
| 529 | |
| 530 | t_obj_sel0 = _time_module.monotonic() |
| 531 | _existing_oids = set((await session.execute( |
| 532 | _sa_text("SELECT object_id FROM musehub_objects WHERE object_id = ANY(:ids)"), |
| 533 | {"ids": _cc_oids}, |
| 534 | )).scalars()) |
| 535 | t_obj_sel1 = _time_module.monotonic() |
| 536 | logger.warning("[SERVER step 7a] objects: dedup SELECT %.1fms total=%d existing=%d new=%d", |
| 537 | (t_obj_sel1 - t_obj_sel0) * 1000, len(_cc_oids), len(_existing_oids), |
| 538 | len(_cc_oids) - len(_existing_oids)) |
| 539 | |
| 540 | session.add_all([ |
| 541 | MusehubObject(object_id=_oid, path="", size_bytes=len(_data), |
| 542 | storage_uri=f"mpack://{mpack_key}", content_cache=None) |
| 543 | for _oid, _data in _decompressed_sync |
| 544 | if _oid not in _existing_oids |
| 545 | ]) |
| 546 | t_obj_add = _time_module.monotonic() |
| 547 | logger.warning("[SERVER step 7b] objects: INSERT new %.1fms", (t_obj_add - t_obj_sel1) * 1000) |
| 548 | |
| 549 | _new_oids = [_oid for _oid in _cc_oids if _oid not in _existing_oids] |
| 550 | t_ref0 = _time_module.monotonic() |
| 551 | await _upsert_object_refs(session, repo_id, _cc_oids) |
| 552 | t_ref1 = _time_module.monotonic() |
| 553 | logger.warning("[SERVER step 7c] objects: UPSERT object_refs %.1fms oids=%d", |
| 554 | (t_ref1 - t_ref0) * 1000, len(_cc_oids)) |
| 555 | |
| 556 | if _cc_oids: |
| 557 | _byte_offsets = compute_object_byte_offsets(wire_bytes) |
| 558 | _mpack_idx_rows = [ |
| 559 | { |
| 560 | "entity_id": _oid, "mpack_id": mpack_key, |
| 561 | "entity_type": "object", "created_at": _utc_now(), |
| 562 | "byte_offset": _byte_offsets.get(_oid, (None, None))[0], |
| 563 | "byte_length": _byte_offsets.get(_oid, (None, None))[1], |
| 564 | } |
| 565 | for _oid in _cc_oids |
| 566 | ] |
| 567 | # asyncpg caps query parameters at 32767; 6 columns per row → 5461 rows/batch |
| 568 | _MPACK_IDX_BATCH = 5461 |
| 569 | for _batch_start in range(0, len(_mpack_idx_rows), _MPACK_IDX_BATCH): |
| 570 | _batch = _mpack_idx_rows[_batch_start:_batch_start + _MPACK_IDX_BATCH] |
| 571 | _idx_stmt = _pg_insert(MusehubMPackIndex).values(_batch) |
| 572 | await session.execute( |
| 573 | _idx_stmt.on_conflict_do_update( |
| 574 | index_elements=["entity_id", "mpack_id"], |
| 575 | set_={ |
| 576 | "byte_offset": _idx_stmt.excluded.byte_offset, |
| 577 | "byte_length": _idx_stmt.excluded.byte_length, |
| 578 | }, |
| 579 | where=MusehubMPackIndex.byte_offset.is_(None), |
| 580 | ) |
| 581 | ) |
| 582 | t_idx_add = _time_module.monotonic() |
| 583 | logger.warning("[SERVER step 7d] objects: UPSERT mpack_index %.1fms total=%d", |
| 584 | (t_idx_add - t_ref1) * 1000, len(_cc_oids)) |
| 585 | else: |
| 586 | t_idx_add = t_decomp |
| 587 | |
| 588 | # step 7: write snapshots |
| 589 | _snap_ids_in_mpack = {_sd.get("snapshot_id") for _sd in _raw_snaps if _sd.get("snapshot_id")} |
| 590 | _external_parent_sids = { |
| 591 | _sd.get("parent_snapshot_id") |
| 592 | for _sd in _raw_snaps |
| 593 | if _sd.get("parent_snapshot_id") and _sd.get("parent_snapshot_id") not in _snap_ids_in_mpack |
| 594 | } |
| 595 | _parent_snap_manifests: dict[str, dict] = {} |
| 596 | t_snap_ext0 = _time_module.monotonic() |
| 597 | if _external_parent_sids: |
| 598 | _psnap_rows = (await session.execute( |
| 599 | select(MusehubSnapshot.snapshot_id, MusehubSnapshot.manifest_blob) |
| 600 | .where(MusehubSnapshot.snapshot_id.in_(_external_parent_sids)) |
| 601 | )).all() |
| 602 | for _psid, _pblob in _psnap_rows: |
| 603 | if _pblob: |
| 604 | _parent_snap_manifests[_psid] = dict(_msgpack.unpackb(_pblob, raw=False)) |
| 605 | t_snap_ext1 = _time_module.monotonic() |
| 606 | logger.warning("[SERVER step 8a] snapshots: fetch external parents %.1fms external=%d loaded=%d", |
| 607 | (t_snap_ext1 - t_snap_ext0) * 1000, len(_external_parent_sids), len(_parent_snap_manifests)) |
| 608 | |
| 609 | _head_snap_id: str | None = None |
| 610 | for _cd in _raw_commits_inline: |
| 611 | if _cd.get("commit_id") == head_commit_id: |
| 612 | _head_snap_id = _cd.get("snapshot_id") or None |
| 613 | break |
| 614 | |
| 615 | _snap_resolved: dict[str, _SnapshotManifest] = {} |
| 616 | _snap_rows_inline = [] |
| 617 | _n_snap_root = 0 |
| 618 | _n_snap_head = 0 |
| 619 | _n_snap_delta_only = 0 |
| 620 | _snap_manifest_bytes_total = 0 |
| 621 | _snap_delta_bytes_total = 0 |
| 622 | |
| 623 | t_snap_loop0 = _time_module.monotonic() |
| 624 | for _sd in _raw_snaps: |
| 625 | _sid = _sd.get("snapshot_id", "") |
| 626 | if not _sid: |
| 627 | continue |
| 628 | _parent_sid = _sd.get("parent_snapshot_id") |
| 629 | _delta_upsert: dict[str, str] = _sd.get("delta_upsert") or {} |
| 630 | _delta_remove: list[str] = _sd.get("delta_remove") or [] |
| 631 | if _parent_sid and _parent_sid in _snap_resolved: |
| 632 | _base = dict(_snap_resolved[_parent_sid]) |
| 633 | elif _parent_sid: |
| 634 | _base = dict(_parent_snap_manifests.get(_parent_sid) or {}) |
| 635 | else: |
| 636 | _base = {} |
| 637 | _base.update(_delta_upsert) |
| 638 | for _path in _delta_remove: |
| 639 | _base.pop(_path, None) |
| 640 | _snap_resolved[_sid] = _base |
| 641 | |
| 642 | _delta_encoded: JSONObject = {"add": _delta_upsert} |
| 643 | if _delta_remove: |
| 644 | _delta_encoded["rm"] = _delta_remove |
| 645 | _delta_blob = _msgpack.packb(_delta_encoded, use_bin_type=True) if (_delta_upsert or _delta_remove) else None |
| 646 | |
| 647 | _is_root = not _parent_sid |
| 648 | _is_head = _sid == _head_snap_id |
| 649 | # Only store full manifest for root and head — delta-only snapshots |
| 650 | # get manifest_blob=None here and are backfilled by the mpack.index job. |
| 651 | # Storing all 1000 manifests inline would push hundreds of MB into a single |
| 652 | # transaction, blocking snapshot_refs inserts (regression at scale). |
| 653 | _manifest_blob = _msgpack.packb(_base, use_bin_type=True) if (_is_root or _is_head) else None |
| 654 | |
| 655 | if _is_root: _n_snap_root += 1 |
| 656 | if _is_head: _n_snap_head += 1 |
| 657 | if not _is_root and not _is_head: _n_snap_delta_only += 1 |
| 658 | if _manifest_blob: _snap_manifest_bytes_total += len(_manifest_blob) |
| 659 | if _delta_blob: _snap_delta_bytes_total += len(_delta_blob) |
| 660 | |
| 661 | _snap_dirs = [str(d) for d in (_sd.get("directories") or []) if d] |
| 662 | _snap_rows_inline.append({ |
| 663 | "snapshot_id": _sid, |
| 664 | "directories": _snap_dirs, |
| 665 | "manifest_blob": _manifest_blob, |
| 666 | "entry_count": len(_base), |
| 667 | "created_at": _utc_now(), |
| 668 | "parent_snapshot_id": _parent_sid or None, |
| 669 | "delta_blob": _delta_blob, |
| 670 | }) |
| 671 | t_snap_loop1 = _time_module.monotonic() |
| 672 | logger.warning( |
| 673 | "[SERVER step 8b] snapshots: replay delta chain %.1fms " |
| 674 | "total=%d root=%d head=%d delta_only=%d", |
| 675 | (t_snap_loop1 - t_snap_loop0) * 1000, len(_snap_rows_inline), |
| 676 | _n_snap_root, _n_snap_head, _n_snap_delta_only, |
| 677 | ) |
| 678 | |
| 679 | if _snap_rows_inline: |
| 680 | _snap_ids_to_check = [r["snapshot_id"] for r in _snap_rows_inline] |
| 681 | t_snap_sel0 = _time_module.monotonic() |
| 682 | _existing_snap_rows = (await session.execute( |
| 683 | _sa_text( |
| 684 | "SELECT snapshot_id, (delta_blob IS NOT NULL) AS has_delta " |
| 685 | "FROM musehub_snapshots WHERE snapshot_id = ANY(:ids)" |
| 686 | ), |
| 687 | {"ids": _snap_ids_to_check}, |
| 688 | )).all() |
| 689 | _existing_sids: set[str] = {row[0] for row in _existing_snap_rows} |
| 690 | _sids_already_healed: set[str] = {row[0] for row in _existing_snap_rows if row[1]} |
| 691 | t_snap_sel1 = _time_module.monotonic() |
| 692 | logger.warning("[SERVER step 8c] snapshots: dedup SELECT %.1fms existing=%d new=%d", |
| 693 | (t_snap_sel1 - t_snap_sel0) * 1000, len(_existing_sids), |
| 694 | len(_snap_rows_inline) - len(_existing_sids)) |
| 695 | |
| 696 | _new_snap_dicts = [r for r in _snap_rows_inline if r["snapshot_id"] not in _existing_sids] |
| 697 | _heal_snap_dicts = [ |
| 698 | r for r in _snap_rows_inline |
| 699 | if r["snapshot_id"] in _existing_sids |
| 700 | and r["snapshot_id"] not in _sids_already_healed |
| 701 | and r.get("delta_blob") |
| 702 | ] |
| 703 | if _new_snap_dicts: |
| 704 | session.add_all([MusehubSnapshot(**_r) for _r in _new_snap_dicts]) |
| 705 | if _heal_snap_dicts: |
| 706 | _HEAL_CHUNK = 2000 |
| 707 | for _hi in range(0, len(_heal_snap_dicts), _HEAL_CHUNK): |
| 708 | _chunk = _heal_snap_dicts[_hi : _hi + _HEAL_CHUNK] |
| 709 | _heal_stmt = _pg_insert(MusehubSnapshot).values(_chunk) |
| 710 | await session.execute( |
| 711 | _heal_stmt.on_conflict_do_update( |
| 712 | index_elements=["snapshot_id"], |
| 713 | set_={ |
| 714 | "delta_blob": _heal_stmt.excluded.delta_blob, |
| 715 | "parent_snapshot_id": _heal_stmt.excluded.parent_snapshot_id, |
| 716 | }, |
| 717 | where=MusehubSnapshot.delta_blob.is_(None), |
| 718 | ) |
| 719 | ) |
| 720 | t_snap_add = _time_module.monotonic() |
| 721 | logger.warning("[SERVER step 8d] snapshots: INSERT new %.1fms new=%d healed=%d", |
| 722 | (t_snap_add - t_snap_sel1) * 1000, len(_new_snap_dicts), len(_heal_snap_dicts)) |
| 723 | |
| 724 | _new_snap_rows = _new_snap_dicts |
| 725 | if _new_snap_rows: |
| 726 | await session.execute( |
| 727 | _pg_insert(MusehubSnapshotRef) |
| 728 | .values([{"repo_id": repo_id, "snapshot_id": r["snapshot_id"], "created_at": _utc_now()} for r in _new_snap_rows]) |
| 729 | .on_conflict_do_nothing(index_elements=["repo_id", "snapshot_id"]) |
| 730 | ) |
| 731 | t_snap_ref = _time_module.monotonic() |
| 732 | logger.warning("[SERVER step 8e] snapshots: UPSERT snapshot_refs %.1fms new=%d", (t_snap_ref - t_snap_add) * 1000, len(_new_snap_rows)) |
| 733 | else: |
| 734 | t_snap_ref = t_snap_loop1 |
| 735 | |
| 736 | # step 8: write commits |
| 737 | from muse.core.commits import CommitRecord as _CR_inline |
| 738 | _commit_rows_inline = [] |
| 739 | _graph_rows_inline = [] |
| 740 | |
| 741 | _commit_ids_in_mpack = { |
| 742 | _cd.get("commit_id") for _cd in _raw_commits_inline if _cd.get("commit_id") |
| 743 | } |
| 744 | _external_parent_cids = { |
| 745 | pid |
| 746 | for _cd in _raw_commits_inline |
| 747 | for pid in ( |
| 748 | ([_cd["parent_commit_id"]] if _cd.get("parent_commit_id") else []) |
| 749 | + ([_cd["parent2_commit_id"]] if _cd.get("parent2_commit_id") else []) |
| 750 | ) |
| 751 | if pid not in _commit_ids_in_mpack |
| 752 | } |
| 753 | t_gen_sel0 = _time_module.monotonic() |
| 754 | _db_parent_gens: dict[str, int] = {} |
| 755 | if _external_parent_cids: |
| 756 | _gen_rows = (await session.execute( |
| 757 | select(MusehubCommitGraph.commit_id, MusehubCommitGraph.generation) |
| 758 | .where(MusehubCommitGraph.commit_id.in_(_external_parent_cids)) |
| 759 | )).all() |
| 760 | _db_parent_gens = {_cid: _gval for _cid, _gval in _gen_rows} |
| 761 | t_gen_sel1 = _time_module.monotonic() |
| 762 | logger.warning("[SERVER step 9a] commits: fetch parent generations %.1fms external_parents=%d found=%d", |
| 763 | (t_gen_sel1 - t_gen_sel0) * 1000, len(_external_parent_cids), len(_db_parent_gens)) |
| 764 | |
| 765 | t_commit_parse0 = _time_module.monotonic() |
| 766 | |
| 767 | _cid_set_inline = {_cd.get("commit_id") for _cd in _raw_commits_inline if _cd.get("commit_id")} |
| 768 | _cd_by_cid: dict[str, dict] = {_cd["commit_id"]: _cd for _cd in _raw_commits_inline if _cd.get("commit_id")} |
| 769 | _children: dict[str, list[dict]] = {cid: [] for cid in _cid_set_inline} |
| 770 | _in_degree: dict[str, int] = {cid: 0 for cid in _cid_set_inline} |
| 771 | for _cd in _raw_commits_inline: |
| 772 | for _pk in ("parent_commit_id", "parent2_commit_id"): |
| 773 | _p = _cd.get(_pk) or "" |
| 774 | if _p in _cid_set_inline: |
| 775 | _children[_p].append(_cd) |
| 776 | _in_degree[_cd["commit_id"]] = _in_degree.get(_cd["commit_id"], 0) + 1 |
| 777 | from collections import deque as _deque |
| 778 | _queue: _deque[dict] = _deque( |
| 779 | _cd_by_cid[cid] for cid, deg in _in_degree.items() if deg == 0 |
| 780 | ) |
| 781 | _topo_sorted: list[dict] = [] |
| 782 | while _queue: |
| 783 | _cd = _queue.popleft() |
| 784 | _topo_sorted.append(_cd) |
| 785 | for _child in _children.get(_cd.get("commit_id", ""), []): |
| 786 | _child_cid = _child.get("commit_id", "") |
| 787 | _in_degree[_child_cid] -= 1 |
| 788 | if _in_degree[_child_cid] == 0: |
| 789 | _queue.append(_child) |
| 790 | _topo_seen = {_cd.get("commit_id") for _cd in _topo_sorted} |
| 791 | _topo_sorted.extend(_cd for _cd in _raw_commits_inline if _cd.get("commit_id") not in _topo_seen) |
| 792 | |
| 793 | _inline_gen: dict[str, int] = {} |
| 794 | for _cd in _topo_sorted: |
| 795 | try: |
| 796 | _cr = _CR_inline.from_dict(_cd) |
| 797 | except Exception: |
| 798 | continue |
| 799 | if not _cr.commit_id: |
| 800 | continue |
| 801 | _pids: list[str] = [] |
| 802 | if _cr.parent_commit_id: |
| 803 | _pids.append(_cr.parent_commit_id) |
| 804 | if _cr.parent2_commit_id: |
| 805 | _pids.append(_cr.parent2_commit_id) |
| 806 | _parent_gens: list[int] = [] |
| 807 | for _pid in _pids: |
| 808 | if _pid in _inline_gen: |
| 809 | _parent_gens.append(_inline_gen[_pid]) |
| 810 | elif _pid in _db_parent_gens: |
| 811 | _parent_gens.append(_db_parent_gens[_pid]) |
| 812 | _gen = (max(_parent_gens) + 1) if _parent_gens else 0 |
| 813 | _inline_gen[_cr.commit_id] = _gen |
| 814 | _commit_rows_inline.append({ |
| 815 | "commit_id": _cr.commit_id, |
| 816 | "branch": _cr.branch or "main", |
| 817 | "message": _cr.message or "", |
| 818 | "author": _cr.author or "", |
| 819 | "timestamp": _cr.committed_at or _utc_now(), |
| 820 | "parent_ids": _pids, |
| 821 | "snapshot_id": _cr.snapshot_id or None, |
| 822 | "agent_id": _cr.agent_id or "", |
| 823 | "model_id": _cr.model_id or "", |
| 824 | "toolchain_id": _cr.toolchain_id or "", |
| 825 | "commit_branch": _cr.branch or None, |
| 826 | "signature": _cr.signature or "", |
| 827 | "signer_public_key": _cr.signer_public_key or "", |
| 828 | "signer_key_id": _cr.signer_key_id or "", |
| 829 | "sem_ver_bump": _cr.sem_ver_bump or "none", |
| 830 | "breaking_changes": _cr.breaking_changes or [], |
| 831 | "reviewed_by": [], |
| 832 | "test_runs": 0, |
| 833 | "prompt_hash": _cr.prompt_hash or "", |
| 834 | "structured_delta": _cr.structured_delta if isinstance(_cr.structured_delta, dict) else None, |
| 835 | }) |
| 836 | _graph_rows_inline.append({ |
| 837 | "commit_id": _cr.commit_id, |
| 838 | "parent_ids": _pids, |
| 839 | "generation": _gen, |
| 840 | "snapshot_id": _cr.snapshot_id or None, |
| 841 | }) |
| 842 | t_commit_parse1 = _time_module.monotonic() |
| 843 | _gen_values = sorted(_inline_gen.values()) |
| 844 | logger.warning( |
| 845 | "[SERVER step 9b] commits: topo sort + parse %.1fms parsed=%d gen_min=%s gen_max=%s", |
| 846 | (t_commit_parse1 - t_commit_parse0) * 1000, len(_commit_rows_inline), |
| 847 | _gen_values[0] if _gen_values else "NONE", |
| 848 | _gen_values[-1] if _gen_values else "NONE", |
| 849 | ) |
| 850 | |
| 851 | if _commit_rows_inline: |
| 852 | t_csel0 = _time_module.monotonic() |
| 853 | _commit_ids_to_check = [r["commit_id"] for r in _commit_rows_inline] |
| 854 | _existing_cids = set((await session.execute( |
| 855 | _sa_text("SELECT commit_id FROM musehub_commits WHERE commit_id = ANY(:ids)"), |
| 856 | {"ids": _commit_ids_to_check}, |
| 857 | )).scalars()) |
| 858 | t_csel1 = _time_module.monotonic() |
| 859 | logger.warning("[SERVER step 9c] commits: dedup SELECT %.1fms existing=%d new=%d", |
| 860 | (t_csel1 - t_csel0) * 1000, len(_existing_cids), |
| 861 | len(_commit_rows_inline) - len(_existing_cids)) |
| 862 | |
| 863 | _new_commit_rows = [r for r in _commit_rows_inline if r["commit_id"] not in _existing_cids] |
| 864 | session.add_all([MusehubCommit(**_r) for _r in _new_commit_rows]) |
| 865 | t_cadd = _time_module.monotonic() |
| 866 | logger.warning("[SERVER step 9d] commits: INSERT new %.1fms new=%d", (t_cadd - t_csel1) * 1000, len(_new_commit_rows)) |
| 867 | # Upsert refs for ALL commits in this push, not just globally-new ones. |
| 868 | # A commit may already exist in musehub_commits from another repo's push |
| 869 | # but still be missing its ref for THIS repo — using _new_commit_rows here |
| 870 | # silently drops those refs and leaves commit history invisible to intel. |
| 871 | if _commit_rows_inline: |
| 872 | await session.execute( |
| 873 | _pg_insert(MusehubCommitRef) |
| 874 | .values([{"repo_id": repo_id, "commit_id": r["commit_id"], "created_at": _utc_now()} for r in _commit_rows_inline]) |
| 875 | .on_conflict_do_nothing(index_elements=["repo_id", "commit_id"]) |
| 876 | ) |
| 877 | t_cref = _time_module.monotonic() |
| 878 | logger.warning("[SERVER step 9e] commits: UPSERT commit_refs %.1fms total=%d", (t_cref - t_cadd) * 1000, len(_commit_rows_inline)) |
| 879 | |
| 880 | if _graph_rows_inline: |
| 881 | _graph_stmt = _pg_insert(MusehubCommitGraph).values(_graph_rows_inline) |
| 882 | await session.execute(_graph_stmt.on_conflict_do_update( |
| 883 | index_elements=["commit_id"], |
| 884 | set_={"generation": _graph_stmt.excluded.generation, |
| 885 | "snapshot_id": _graph_stmt.excluded.snapshot_id}, |
| 886 | )) |
| 887 | t_graph = _time_module.monotonic() |
| 888 | logger.warning("[SERVER step 9f] commits: UPSERT commit_graph %.1fms rows=%d", |
| 889 | (t_graph - t_cref) * 1000, len(_graph_rows_inline)) |
| 890 | else: |
| 891 | t_graph = t_commit_parse1 |
| 892 | |
| 893 | # step 9: advance branch pointer |
| 894 | tip = head_commit_id |
| 895 | if tip and branch: |
| 896 | t_branch0 = _time_module.monotonic() |
| 897 | branch_row = (await session.execute( |
| 898 | select(MusehubBranch).where( |
| 899 | MusehubBranch.repo_id == repo_id, |
| 900 | MusehubBranch.name == branch, |
| 901 | ).with_for_update() |
| 902 | )).scalar_one_or_none() |
| 903 | |
| 904 | current_head = branch_row.head_commit_id if branch_row else None |
| 905 | if not force and not _is_fast_forward(tip, current_head or "", wire_bytes): |
| 906 | raise NonFastForwardError( |
| 907 | f"push rejected: {tip[:20]}… is not a fast-forward of " |
| 908 | f"{(current_head or '')[:20]}… — use --force to override" |
| 909 | ) |
| 910 | |
| 911 | if branch_row is None: |
| 912 | session.add(MusehubBranch( |
| 913 | branch_id=compute_branch_id(repo_id, branch), |
| 914 | repo_id=repo_id, |
| 915 | name=branch, |
| 916 | head_commit_id=tip, |
| 917 | )) |
| 918 | else: |
| 919 | branch_row.head_commit_id = tip |
| 920 | t_branch1 = _time_module.monotonic() |
| 921 | logger.warning("[SERVER step 10] advance branch pointer (atomic CAS) %.1fms", (t_branch1 - t_branch0) * 1000) |
| 922 | else: |
| 923 | t_branch1 = t_graph |
| 924 | |
| 925 | # commit db transaction |
| 926 | t_pre_commit = _time_module.monotonic() |
| 927 | await session.commit() |
| 928 | t_commit = _time_module.monotonic() |
| 929 | logger.warning("[SERVER step 10] commit db transaction %.1fms", (t_commit - t_pre_commit) * 1000) |
| 930 | |
| 931 | # step 11: enqueue async jobs — TODO: not yet implemented |
| 932 | |
| 933 | elapsed_ms = (t_commit - t0) * 1000 |
| 934 | logger.warning("[SERVER step 11] enqueue intel jobs (TODO) 0.0ms") |
| 935 | logger.warning("[SERVER ✅] TOTAL=%.1fms branch=%s head=%s", elapsed_ms, branch, head_commit_id[:20] if head_commit_id else "none") |
| 936 | return { |
| 937 | "head": tip, |
| 938 | "branch": branch, |
| 939 | "blobs_in_mpack": blobs_count, |
| 940 | "commits_in_mpack": commits_count, |
| 941 | "blobs_written": len(_new_oids), |
| 942 | "commits_written": len(_new_commit_rows) if _commit_rows_inline else 0, |
| 943 | "snapshots_written": len(_new_snap_dicts) if _snap_rows_inline else 0, |
| 944 | } |
| 945 | |
| 946 | |
class PurgeResult(TypedDict):
    """Row counts reported by ``purge_stale_mpack_index_entries``."""

    # Number of MusehubMPackIndex rows deleted because their mpack is gone
    # from the storage backend.
    removed: int
    # Number of MusehubMPackIndex rows left in place because their mpack
    # still exists in the storage backend.
    kept: int
| 950 | |
| 951 | |
async def purge_stale_mpack_index_entries(session: AsyncSession) -> PurgeResult:
    """Delete MusehubMPackIndex rows for mpacks no longer in MinIO.

    Left behind when mpack.gc deleted old mpacks but didn't prune the index.
    Safe to delete: objects with stale entries have valid s3:// storage_uri
    and are served directly from S3 without needing the mpack path.

    Each distinct ``mpack_id`` in the index is checked against the storage
    backend with ``exists_mpack``; every index row of a missing mpack is
    deleted.  The deletions are flushed but NOT committed here — the caller
    owns the transaction.

    Args:
        session: Async SQLAlchemy session used for the count and delete
            statements.

    Returns:
        {"removed": N, "kept": M} — index rows deleted / index rows retained.
    """
    from sqlalchemy import delete as _sa_delete
    from musehub.storage.backends import get_backend
    backend = get_backend()

    # One aggregate query yields every mpack_id together with its row count,
    # so no index rows have to be loaded into memory just to be counted.
    mpack_counts = (await session.execute(
        select(MusehubMPackIndex.mpack_id, func.count())
        .group_by(MusehubMPackIndex.mpack_id)
    )).all()

    removed = 0
    kept = 0
    for mpack_id, row_count in mpack_counts:
        if await backend.exists_mpack(mpack_id):
            kept += row_count
            continue
        # Dead mpack: drop all of its index entries with a single DELETE.
        # NOTE(review): a bulk DELETE bypasses ORM-level cascades/events —
        # confirm MusehubMPackIndex defines none.
        await session.execute(
            _sa_delete(MusehubMPackIndex).where(
                MusehubMPackIndex.mpack_id == mpack_id
            )
        )
        removed += row_count

    await session.flush()
    logger.info("purge_stale_mpack_index_entries: removed=%d kept=%d", removed, kept)
    return {"removed": removed, "kept": kept}
| 994 | |
| 995 | |
async def _build_commit_graph_from_raw(session: AsyncSession, raw_commits: list[dict]) -> int:
    """Compute generation numbers and upsert MusehubCommitGraph rows.

    Mirrors the generation logic in wire_push_unpack_mpack but operates on
    an already-parsed commit list (used by process_mpack_index_job).

    A commit's generation is ``max(generation of known parents) + 1``, or 0
    when none of its parents is known.  A parent is "known" when it is part
    of ``raw_commits`` or already has a MusehubCommitGraph row.

    Args:
        session: Async SQLAlchemy session used for the parent lookup and the
            upserts.  Nothing is committed here.
        raw_commits: Commit dicts carrying ``commit_id`` and optionally
            ``parent_commit_id``, ``parent2_commit_id`` and ``snapshot_id``.
            Entries without a ``commit_id`` are ignored; when the same
            ``commit_id`` appears more than once the last entry wins.

    Returns:
        The number of rows upserted.
    """
    if not raw_commits:
        return 0

    from collections import deque as _deque

    # Bounded batches keep each statement's bind-parameter count small
    # (asyncpg caps the number of parameters per query).
    _CHUNK = 2000

    # Index by commit_id.  Commits without an id can never produce a graph
    # row, so they are dropped up front instead of being indexed with
    # cd["commit_id"] later (which raised KeyError).  Keying by id also
    # guarantees one row per commit_id inside a single upsert statement.
    by_cid: dict[str, dict] = {
        cd["commit_id"]: cd for cd in raw_commits if cd.get("commit_id")
    }
    if not by_cid:
        return 0

    def _parent_ids(cd: dict) -> list[str]:
        # Non-empty parent ids, first parent before second parent.
        return [p for p in (cd.get("parent_commit_id"), cd.get("parent2_commit_id")) if p]

    # Topological sort (Kahn's algorithm — oldest first so parents resolve before children).
    children: dict[str, list[str]] = {cid: [] for cid in by_cid}
    in_degree: dict[str, int] = dict.fromkeys(by_cid, 0)
    external_pids: set[str] = set()
    for cid, cd in by_cid.items():
        for pid in _parent_ids(cd):
            if pid in by_cid:
                children[pid].append(cid)
                in_degree[cid] += 1
            else:
                # Parent lives outside this batch; its generation may be in the DB.
                external_pids.add(pid)

    queue: _deque[str] = _deque(cid for cid, deg in in_degree.items() if deg == 0)
    topo: list[str] = []
    while queue:
        cid = queue.popleft()
        topo.append(cid)
        for child_cid in children[cid]:
            in_degree[child_cid] -= 1
            if in_degree[child_cid] == 0:
                queue.append(child_cid)
    if len(topo) < len(by_cid):
        # Commits never released by Kahn's algorithm (a parent cycle) are
        # still emitted, in input order, so they get a graph row too.
        seen = set(topo)
        topo.extend(cid for cid in by_cid if cid not in seen)

    # Fetch generation numbers for any external parents already in the DB.
    db_parent_gens: dict[str, int] = {}
    if external_pids:
        pending = list(external_pids)
        for start in range(0, len(pending), _CHUNK):
            gen_rows = (await session.execute(
                select(MusehubCommitGraph.commit_id, MusehubCommitGraph.generation)
                .where(MusehubCommitGraph.commit_id.in_(pending[start: start + _CHUNK]))
            )).all()
            db_parent_gens.update((pcid, gval) for pcid, gval in gen_rows)

    inline_gen: dict[str, int] = {}
    graph_rows: list[dict] = []
    now = _utc_now()
    for cid in topo:
        cd = by_cid[cid]
        pids = _parent_ids(cd)
        # In-batch generations take precedence over DB values; parents known
        # to neither are skipped.
        parent_gens = [
            inline_gen[p] if p in inline_gen else db_parent_gens[p]
            for p in pids
            if p in inline_gen or p in db_parent_gens
        ]
        gen = (max(parent_gens) + 1) if parent_gens else 0
        inline_gen[cid] = gen
        graph_rows.append({
            "commit_id": cid,
            "parent_ids": pids,
            "generation": gen,
            "snapshot_id": cd.get("snapshot_id") or None,
            "created_at": now,
        })

    written = 0
    for i in range(0, len(graph_rows), _CHUNK):
        chunk = graph_rows[i: i + _CHUNK]
        stmt = _pg_insert(MusehubCommitGraph).values(chunk)
        # Existing rows only get generation and snapshot_id refreshed.
        await session.execute(stmt.on_conflict_do_update(
            index_elements=["commit_id"],
            set_={
                "generation": stmt.excluded.generation,
                "snapshot_id": stmt.excluded.snapshot_id,
            },
        ))
        written += len(chunk)
    return written
| 1086 | |
| 1087 | |
async def process_mpack_index_job(session: AsyncSession, job_id: str) -> JSONObject:
    """Populate MusehubMPackIndex byte-ranges for every object in a pushed mpack.

    Called by the background worker after every push.  Reads the mpack from
    MinIO, computes byte offsets for each blob, then bulk-upserts
    MusehubMPackIndex rows with byte_offset and byte_length so the fetch path
    can serve blobs via byte-range GETs instead of downloading the full mpack.

    Besides the index rows it also upserts a MusehubObject row per blob
    (``storage_uri`` = ``mpack://<mpack_key>``), writes object refs for the
    job's repo, and upserts commit-graph rows for the mpack's commits.
    Nothing is committed here.

    Args:
        session: Async SQLAlchemy session used for all reads and upserts.
        job_id: Id of the MusehubBackgroundJob whose payload carries
            ``mpack_key``.

    Returns:
        A result dict with counts for logging.

    Raises:
        ValueError: If the job row does not exist, its payload has no
            ``mpack_key``, or the mpack cannot be read from storage.
    """
    t0 = _time_module.monotonic()

    job_row = (await session.execute(
        select(MusehubBackgroundJob).where(MusehubBackgroundJob.job_id == job_id)
    )).scalar_one_or_none()
    if job_row is None:
        raise ValueError(f"mpack.index job not found: {job_id}")

    payload = job_row.payload or {}
    repo_id = job_row.repo_id
    # `or ""` so a stored None is reported as missing instead of becoming
    # the truthy string "None".
    mpack_key = str(payload.get("mpack_key") or "")
    if not mpack_key:
        raise ValueError(f"mpack.index job {job_id} missing mpack_key in payload")

    from musehub.storage.backends import get_backend
    backend = get_backend()
    wire_bytes = await backend.get_mpack(mpack_key)
    if not wire_bytes:
        raise ValueError(f"mpack {mpack_key[:20]} not found in storage")

    t_fetched = _time_module.monotonic()

    # Parse: handle both MUSE binary and legacy msgpack formats.
    is_muse_binary = wire_bytes[:4] == b"MUSE"
    if is_muse_binary:
        from muse.core.mpack import parse_wire_mpack as _parse_wire
        mpack: JSONObject = _parse_wire(wire_bytes)
    else:
        mpack = _msgpack.unpackb(wire_bytes, raw=False)

    raw_blobs: list[JSONObject] = mpack.get("blobs") or mpack.get("objects") or []
    t_parsed = _time_module.monotonic()

    logger.info(
        "[mpack.index] job=%s  fetch=%.3fs  parse=%.3fs  blobs=%d  mpack=%d KB",
        job_id[:16], t_fetched - t0, t_parsed - t_fetched,
        len(raw_blobs), len(wire_bytes) // 1024,
    )

    # Distinct, order-preserving blob ids.  A repeated id inside one
    # INSERT ... ON CONFLICT DO UPDATE statement makes PostgreSQL fail with
    # "command cannot affect row a second time".
    blob_oids: list[str] = list(dict.fromkeys(
        oid for oid in (obj.get("object_id", "") for obj in raw_blobs) if oid
    ))

    # Compute byte offsets for every object in the MUSE binary mpack.
    # Legacy msgpack payloads get no byte ranges.
    byte_offsets: dict[str, tuple[int, int]] = (
        compute_object_byte_offsets(wire_bytes) if is_muse_binary else {}
    )

    # Upsert MusehubMPackIndex rows — one per blob with byte_offset/byte_length.
    now = _utc_now()
    _CHUNK = 2000
    index_rows = [
        {
            "entity_id": oid,
            "mpack_id": mpack_key,
            "entity_type": "object",
            "created_at": now,
            "byte_offset": offset,
            "byte_length": length,
        }
        for oid, (offset, length) in byte_offsets.items()
    ]
    # Also index any blobs without byte-range data (fallback: full-mpack serve).
    # These rows carry explicit NULL ranges so every dict in a multi-row
    # VALUES insert has the same keys: SQLAlchemy derives the column list
    # from the first row only.
    index_rows.extend(
        {
            "entity_id": oid,
            "mpack_id": mpack_key,
            "entity_type": "object",
            "created_at": now,
            "byte_offset": None,
            "byte_length": None,
        }
        for oid in blob_oids
        if oid not in byte_offsets
    )

    written = 0
    for i in range(0, len(index_rows), _CHUNK):
        chunk = index_rows[i: i + _CHUNK]
        stmt = _pg_insert(MusehubMPackIndex).values(chunk)
        # On conflict: update byte_offset/byte_length if we now have them.
        await session.execute(
            stmt.on_conflict_do_update(
                index_elements=["entity_id", "mpack_id"],
                set_={
                    "byte_offset": stmt.excluded.byte_offset,
                    "byte_length": stmt.excluded.byte_length,
                },
                where=MusehubMPackIndex.byte_offset.is_(None),
            )
        )
        written += len(chunk)

    # Upsert MusehubObject rows — storage_uri points to covering mpack, no per-object S3 PUT.
    # Must happen before object_refs because object_refs has a FK on musehub_objects.
    if blob_oids:
        mpack_uri = f"mpack://{mpack_key}"
        obj_rows = [
            {"object_id": oid, "storage_uri": mpack_uri, "path": "", "created_at": now}
            for oid in blob_oids
        ]
        for i in range(0, len(obj_rows), _CHUNK):
            chunk = obj_rows[i: i + _CHUNK]
            obj_stmt = _pg_insert(MusehubObject).values(chunk)
            # Only fill storage_uri on rows that do not have one yet.
            await session.execute(
                obj_stmt.on_conflict_do_update(
                    index_elements=["object_id"],
                    set_={"storage_uri": obj_stmt.excluded.storage_uri},
                    where=MusehubObject.storage_uri.is_(None),
                )
            )

    # Write object_refs after musehub_objects so the FK constraint is satisfied.
    if blob_oids and repo_id:
        await _upsert_object_refs(session, repo_id, blob_oids)

    # Build commit graph rows from mpack commits.
    raw_commits: list[dict] = mpack.get("commits") or []
    graph_written = await _build_commit_graph_from_raw(session, raw_commits)

    t_done = _time_module.monotonic()
    logger.info(
        "✅ [mpack.index] done job=%s  index_rows=%d  graph_rows=%d  elapsed=%.3fs",
        job_id[:16], written, graph_written, t_done - t0,
    )
    return {
        "mpack_index_written": written,
        "byte_ranges_computed": len(byte_offsets),
        "commit_graph_written": graph_written,
        "mpack_size_bytes": len(wire_bytes),
        "elapsed_ms": (t_done - t0) * 1000,
    }
File History
18 commits
sha256:80c35083d2abe19ef730693f2b35ad9a9342d2abbe81e802e6f397fe9ae75eb9
merge: pull staging/dev — asyncpg batch fix + content-addre…
Sonnet 4.6
patch
8 hours ago
sha256:5667a3e21bf16fd2e6d6bd4a769bd1c0cf7634afa12cef6450cc77573196b7f9
asyncpg caps query parameters
Human
patch
10 hours ago
sha256:c3e12fae919d99c0e030595f58eb0a7b7df9ae4bd4ed6ce582a46c33784a54d0
fix(mpack-index-job): write object_refs after musehub_objec…
Sonnet 4.6
patch
3 days ago
sha256:8ba2515ec3cb2a4089d54422f342b5a47161df29d5db72cb7356591d128fba6a
fix(wire-push): write object_refs and mpack_index for all m…
Sonnet 4.6
patch
3 days ago
sha256:009b5a222314f47640a58d75ce5a1f428f1624cf0b51384dfcdfbdfab3cc42a4
feat: migration idempotency, file attribution DAG walk, mpa…
Sonnet 4.6
minor
⚠
7 days ago
sha256:92528ae07d0e1239d87fd5fd1f439e8fbb49c9778a9a400bc4a736073fb28316
feat: byte-range blob reads, file attribution DAG walk, bra…
Sonnet 4.6
minor
⚠
7 days ago
sha256:f3995ec2c05c9c34b0e4d6e96349a811d0117a1c51d78096d757998ccb3c0520
fix: blobs only in S3/mpack — remove commit/snapshot indivi…
Sonnet 4.6
patch
9 days ago
sha256:8a7ff43f27504c1f6abba59ffbab3dc89bd1d8bcfa4c57f6e280286d1a58195a
feat: resurrect process_mpack_index_job with byte-range ind…
Sonnet 4.6
patch
9 days ago
sha256:ad616c6113d6c00f4efed6b2993734ca46d3e9b5bee25addd4ce8ae6b57136e5
chore: bump version to 0.2.0rc11; typing audit clean + all …
Sonnet 4.6
minor
⚠
11 days ago
sha256:ecca645d94c5f39c88f4bc1283447ba0f4635ef3cbb11d0cd9b3759cba289d00
fix: compute_snapshot_id uses typed-object formula and wire…
Sonnet 4.6
minor
⚠
11 days ago
sha256:e35be48854f182f7bf02dc6cc0f58d22b3de3a544b570c0e2bc53f9e75a3607d
feat(phase6): remove delta_blob path, dead imports, add fal…
Sonnet 4.6
minor
⚠
12 days ago
sha256:450998d182617fa93b737cbbdb3fe956c61566051739acec8c63ec5e7b4587f8
feat(phase3): write snapshot objects to S3 at all 3 write s…
Sonnet 4.6
patch
12 days ago
sha256:e597c0b97ade9c3c52ac4735ceb437ee69d1b6f0db61b8d7caa6467c5866566d
feat(phase2): write commit objects to S3 at all 5 write sit…
Sonnet 4.6
patch
12 days ago
sha256:6b1949fc2797ca4c1936a637a4cbfec828ef56cf52398a2e74ca3c4f494e728f
fix: use wire_bytes not mpack_bytes_raw in compute_object_b…
Sonnet 4.6
patch
12 days ago
sha256:302574ddba13c9a20694c0fb051176eef4896f943b63bc458df886633b1bfcd6
feat: mpack byte-range index — store byte_offset/byte_lengt…
Sonnet 4.6
minor
⚠
12 days ago
sha256:ba425baae57a4223a95f2cf4841181d790d04d02b9c98d0371f7b43046d92ff4
fix: store directories from incoming snapshot deltas instea…
Sonnet 4.6
patch
12 days ago
sha256:4c039010d7b35b3bb7e9242df3e73ff50336264cb89a85a7509dc959e8516fc1
fix: store signer_public_key from incoming commits instead …
Sonnet 4.6
patch
12 days ago
sha256:1b52d53ca7bae6c2b32297aaa798cd9f25e8241f457e8de4186aded84ab9c4a1
debug: log full presign URL and get_mpack key/bytes to isol…
Sonnet 4.6
minor
⚠
14 days ago