mpack.py
python
sha256:889892bcd79c5b77cfb314c90ff6caaeb9f6575dc0cab81b85259d5bbb235971
type fix
Human
patch
3 days ago
| 1 | """Muse MPack format — a pack of commits, snapshots, and blobs for wire transfer. |
| 2 | |
| 3 | An :class:`MPack` is the unit of exchange between the Muse CLI and a remote |
| 4 | (e.g. MuseHub). It carries everything needed to reconstruct a slice of commit |
| 5 | history locally: |
| 6 | |
| 7 | - :class:`CommitDict` records (full metadata + agent provenance) |
| 8 | - :class:`SnapshotDict` records (file manifests) |
| 9 | - :class:`BlobPayload` entries (raw blob bytes) |
| 10 | - ``summary`` (:class:`MPackSummary`) — advisory counts for agent routing |
| 11 | |
| 12 | :func:`build_mpack` collects all data reachable from a set of commit IDs and |
| 13 | populates the summary field. |
| 14 | :func:`apply_mpack` writes an mpack into a local ``.muse/`` directory. |
| 15 | |
| 16 | MPack wire encoding |
| 17 | --------------------- |
| 18 | An MPack is encoded in the MPack binary wire format (``b"MUSE"`` magic, |
| 19 | section table, SHA-256 footer) and transmitted with |
| 20 | ``Content-Type: application/x-muse-pack``. |
| 21 | |
| 22 | Agent contract |
| 23 | -------------- |
| 24 | |
| 25 | - ``exit_code`` 0: all data applied successfully. |
| 26 | - ``exit_code`` 1: validation error (malformed object ID, path traversal). |
| 27 | - ``exit_code`` 3: I/O error reading from local store. |
| 28 | - ``duration_ms``: wall-clock milliseconds for build or apply. |
| 29 | """ |
| 30 | |
| 31 | import collections |
| 32 | import datetime |
| 33 | import hashlib as _hashlib |
| 34 | import logging |
| 35 | import os |
| 36 | import pathlib |
| 37 | import struct as _struct |
| 38 | from typing import TypedDict |
| 39 | |
| 40 | from muse.core.graph import iter_ancestors |
| 41 | from muse.core.object_availability import ObjectState, load_promisor_remotes, object_state |
| 42 | from muse.core.object_store import has_object, read_object |
| 43 | from muse.core.pack_store import write_pack |
| 44 | from muse.core.ids import hash_snapshot |
| 45 | from muse.core.validation import ( |
| 46 | MAX_OBJECT_WRITE_BYTES, |
| 47 | MAX_PACK_OBJECTS, |
| 48 | validate_object_id, |
| 49 | validate_workspace_path, |
| 50 | ) |
| 51 | from muse.core.types import BranchHeads, blob_id, short_id |
| 52 | from muse.core.commits import ( |
| 53 | CommitDict, |
| 54 | CommitRecord, |
| 55 | MissingParentError, |
| 56 | read_commit, |
| 57 | write_commit, |
| 58 | ) |
| 59 | from muse.core.snapshots import ( |
| 60 | SnapshotDict, |
| 61 | SnapshotRecord, |
| 62 | read_snapshot, |
| 63 | write_snapshot, |
| 64 | ) |
| 65 | from muse.core.tags import ( |
| 66 | TagDict, |
| 67 | TagRecord, |
| 68 | get_all_tags, |
| 69 | get_tags_for_commit, |
| 70 | write_tag, |
| 71 | ) |
| 72 | |
| 73 | logger = logging.getLogger(__name__) |
| 74 | |
| 75 | # --------------------------------------------------------------------------- |
| 76 | # Type aliases — avoid bare dict[str, X] at boundaries |
| 77 | # --------------------------------------------------------------------------- |
| 78 | |
| 79 | _Manifest = dict[str, str] # path → object_id |
| 80 | _JsonValue = str | int | float | bool | None |
| 81 | _MetaDict = dict[str, _JsonValue] # loose metadata (dynamic keys, JSON-serialisable) |
| 82 | _SnapshotResolvedMap = dict[str, tuple[_Manifest, list[str]]] # sid → (manifest, dirs) |
| 83 | |
| 84 | # --------------------------------------------------------------------------- |
| 85 | # Wire-format TypedDicts |
| 86 | # --------------------------------------------------------------------------- |
| 87 | |
class _BlobPayloadBase(TypedDict):
    """Required fields for every blob payload in an MPack."""

    object_id: str
    content: bytes


class BlobPayload(_BlobPayloadBase, total=False):
    """A single content-addressed blob with encoding metadata for mpack transfer.

    Required fields (always present):
        object_id: Content-addressed SHA-256 identifier (``sha256:<hex>``).
        content:   Raw or encoded bytes — see *encoding*.

    Optional fields (omit for ``"raw"`` with no base):
        path:      Repository path of this blob; used by the server to look
                   up delta base candidates for the next push.
        encoding:  ``"raw"`` (default) | ``"zstd"`` | ``"zlib"`` |
                   ``"delta+zlib"``. ``"zstd"`` is emitted by
                   :func:`build_mpack_from_walk` when called with
                   ``compress=True`` and compression actually shrinks the
                   blob; *content* then holds the zstd frame.
        base_id:   Base blob ID for ``"delta+zlib"`` encoding.
        sz:        Uncompressed byte count of the target blob. Required when
                   ``encoding`` is ``"delta+zlib"`` so the server can
                   pre-allocate before decompression. Ignored for ``"raw"``
                   payloads.
    """

    path: str
    encoding: str
    base_id: str
    sz: int
| 115 | |
class WireTag(TypedDict):
    """Wire form of one tag record carried in an :class:`MPack`'s ``tags`` list.

    Every key is required. ``created_at`` is a timestamp string; the builder
    in this module fills it with ``created_at.isoformat()`` of the tag record.
    """

    tag_id: str      # identifier of the tag record
    repo_id: str     # repository the tag belongs to
    commit_id: str   # commit the tag is attached to
    tag: str         # the tag text itself
    created_at: str  # ISO 8601 timestamp string
| 124 | |
class MPackMeta(TypedDict, total=False):
    """Scope declaration embedded in an :class:`MPack`.

    Lets an agent learn what an mpack covers without looking at its commits
    or objects.

    Keys (all optional at the type level):
        mode: ``"full"`` — every referenced object must be present in the
            mpack or already in the receiver's local store.
            ``"incremental"`` — some objects are assumed to exist at the
            receiver's base (named in ``base_commits``) and were left out.
        base_commits: Commit IDs given as ``--have`` when the mpack was
            built; empty for full bundles.
        created_at: ISO 8601 UTC timestamp of when the mpack was assembled.
    """

    mode: str                # "full" or "incremental"
    base_commits: list[str]  # sha256:-prefixed commit IDs
    created_at: str          # ISO 8601, e.g. "2026-01-01T00:00:00Z"
| 144 | |
class MPackSummary(TypedDict, total=False):
    """Advisory digest embedded in an :class:`MPack`.

    Agents consult it for routing / accept / reject decisions before touching
    commits or objects. Everything here is advisory: receivers may use it to
    optimise, but must never depend on it for correctness.

    Keys:
        commits_count: How many commits the mpack carries.
        blobs_count: How many unique blobs the mpack carries.
        blobs_bytes: Sum of the uncompressed sizes of those blobs.
        branches: Branch name → tip commit_id at build time.
        agent_ids: Every distinct ``agent_id`` found on the mpack's commits.
    """

    commits_count: int
    blobs_count: int
    blobs_bytes: int
    branches: BranchHeads  # branch name -> commit_id
    agent_ids: list[str]   # distinct agent_ids across commits
| 165 | |
| 166 | class SnapshotDeltaDict(TypedDict, total=False): |
| 167 | """Wire representation of a snapshot as a delta from its parent snapshot. |
| 168 | |
| 169 | Guiding principle: content-addressing is a proof, not a label. |
| 170 | ``snapshot_id = sha256(sorted path-NUL-oid pairs)``. A receiver who |
| 171 | holds ``snapshot_id`` and the delta can reconstruct the full manifest |
| 172 | and verify it by hashing — no external store needed. |
| 173 | |
| 174 | Fields: |
| 175 | snapshot_id: sha256 of the *full* manifest (the proof). |
| 176 | parent_snapshot_id: snapshot_id of the parent, or ``None`` for root. |
| 177 | delta_upsert: Paths added or changed relative to parent. |
| 178 | delta_remove: Paths removed relative to parent. |
| 179 | |
| 180 | Reconstruction:: |
| 181 | |
| 182 | manifest = dict(resolved[parent_snapshot_id]) # or {} if None |
| 183 | manifest.update(delta_upsert) |
| 184 | for path in delta_remove: |
| 185 | del manifest[path] |
| 186 | assert hash_snapshot(manifest) == snapshot_id # the math IS the proof |
| 187 | """ |
| 188 | |
| 189 | snapshot_id: str |
| 190 | parent_snapshot_id: str | None |
| 191 | delta_upsert: dict[str, str] # path → object_id |
| 192 | delta_remove: list[str] # paths removed |
| 193 | |
| 194 | |
class MPack(TypedDict, total=False):
    """Wire-level unit exchanged between the Muse CLI and a remote.

    Every key is optional, so partial bundles (fetch-only, objects-only) are
    still valid messages; consumers must check that a key is present before
    reading it.

    ``summary`` holds advisory routing data that agents can act on without
    deserialising commits or objects. ``meta`` declares the mpack's scope
    (full vs incremental) and its base commits, so receivers can verify it
    without out-of-band knowledge of how it was built.

    ``snapshots`` holds :class:`SnapshotDeltaDict` entries in commit-graph
    order, oldest first. The first entry has ``parent_snapshot_id=None`` and
    a ``delta_upsert`` equal to the full manifest; each later entry lists
    only the paths that changed. Receivers rebuild full manifests by
    replaying the delta chain and verify each result by hashing it.
    """

    commits: list[CommitDict]
    snapshots: list[SnapshotDeltaDict]
    blobs: list[BlobPayload]
    #: Tags attached to any commit carried by this mpack.
    tags: list[WireTag]
    #: Advisory digest — filled in by :func:`build_mpack`.
    summary: MPackSummary
    #: Scope declaration — always written by :func:`build_mpack`.
    meta: MPackMeta
| 224 | |
class RemoteInfo(TypedDict, total=False):
    """Repository metadata served by ``GET {url}/refs``.

    Each key below is annotated as always present in a response. The class is
    nevertheless declared ``total=False``, so type checkers treat every key
    as optional.
    """

    repo_id: str         # always present
    domain: str          # always present
    #: Branch name → commit ID for every branch on the remote (always present).
    branch_heads: BranchHeads
    default_branch: str  # always present
| 233 | |
class PushResult(TypedDict):
    """Reply a remote sends back after a push attempt.

    Keys (all required):
        ok: Whether the push was accepted.
        message: Message text supplied by the server.
        branch_heads: The remote's branch heads after the push (meaningful
            when the push succeeded).
    """

    ok: bool
    message: str
    #: Branch heads on the remote after the push (if it succeeded).
    branch_heads: BranchHeads
| 241 | |
class FetchRequest(TypedDict, total=False):
    """Request body for ``POST {url}/fetch``, negotiating which commits to transfer.

    Keys:
        want: Commit IDs the client asks to receive.
        have: Commit IDs the client already holds locally, letting the server
            send only the commits the client is missing (delta negotiation).
    """

    want: list[str]  # commits requested
    have: list[str]  # commits already present on the client
| 252 | |
class ApplyResult(TypedDict):
    """Write counts reported by :func:`apply_mpack`.

    Every count covers *new* writes only, except ``blobs_skipped``. That one
    counts blobs already in the store, which are left untouched so that
    re-applying is idempotent.

    Keys:
        commits_written: Commit records newly written.
        snapshots_written: Snapshot records newly written.
        blobs_written: Blobs newly written.
        blobs_skipped: Blobs already present in the store.
        tags_written: Tag records written from the mpack's ``tags`` section
            (0 for bundles built without tag data).
        failed_blobs: IDs of blobs that failed integrity or write checks.
        skipped_snapshots: IDs of snapshots skipped because a blob they
            reference failed.
    """

    commits_written: int
    snapshots_written: int
    blobs_written: int
    blobs_skipped: int
    tags_written: int
    failed_blobs: list[str]
    skipped_snapshots: list[str]
| 271 | |
| 272 | # --------------------------------------------------------------------------- |
| 273 | # Pack building |
| 274 | # --------------------------------------------------------------------------- |
| 275 | |
class _WalkResult(TypedDict):
    """Cached result of a BFS commit-graph walk.

    Produced once by :func:`walk_commits` and consumed by both
    :func:`collect_blob_ids_from_walk` (object ID collection) and
    :func:`build_mpack_from_walk` (loading blobs for transmission).

    Sharing this avoids two identical BFS traversals per push: the first to
    gather object IDs for client-side deduplication, and the second to load
    blobs and assemble the mpack.

    ``missing_snapshots`` is populated by :func:`walk_commits` with the
    snapshot_ids of any reachable commit whose snapshot file is absent from
    the local store. Whenever it is non-empty, ``snapshot_deltas`` is empty.
    So are ``manifest_blobs``, ``all_blob_ids`` and ``oid_to_path``, because
    they are derived from the deltas. Callers should surface this to the
    user before pushing. A pack that contains a commit but not its snapshot
    creates a dangling reference on the remote, and
    :func:`build_mpack_from_walk` raises ``ValueError`` for such a walk.
    """

    # Commits to send, in iter_ancestors order (walk_commits reverses this
    # list to obtain oldest-first).
    commits: list[CommitRecord]
    # Snapshot IDs referenced by ``commits``.
    snapshot_ids: set[str]
    # Sorted, deduplicated: manifest_blobs - have_blobs.
    all_blob_ids: list[str]
    # Blob IDs in the manifests of the have-commits themselves (not their
    # ancestors) — assumed to already exist on the remote.
    have_blobs: set[str]
    # Every blob ID referenced by any manifest of the commits being sent,
    # including blobs the remote may already have.
    manifest_blobs: set[str]
    # blob_id → a repository path it appears at (from delta_upsert entries;
    # when one blob appears at several paths, the last one seen wins).
    oid_to_path: dict[str, str]
    # Snapshot IDs referenced by commits but absent on disk.
    missing_snapshots: set[str]
    # Oldest-first delta chain; reused as-is by build_mpack_from_walk.
    snapshot_deltas: list[SnapshotDeltaDict]
| 302 | |
def walk_commits(
    repo_root: pathlib.Path,
    commit_ids: list[str],
    *,
    have: list[str] | None = None,
) -> _WalkResult:
    """BFS-walk the commit graph from *commit_ids*, stopping at *have*.

    Returns a :class:`_WalkResult` that can be passed to both
    :func:`collect_blob_ids_from_walk` and :func:`build_mpack_from_walk`
    to avoid repeating the traversal.

    This is the **single source of truth** for what goes into a push mpack.
    Callers that need both the object ID list and the full pack should call
    this once and pass the result to both downstream functions.

    Uses ``prune=lambda cid: cid in have_set`` so the walk terminates the
    moment it reaches a commit the server already has — no ancestor subgraph
    is expanded beyond the boundary.

    Args:
        repo_root: Root of the Muse repository.
        commit_ids: Tip commit IDs to walk from.
        have: Commit IDs the receiver already holds. The walk stops at them,
            and blobs referenced by their own snapshots are excluded from
            ``all_blob_ids``.

    Returns:
        The walk result. If any reachable snapshot is absent locally,
        ``missing_snapshots`` lists the absent IDs, the delta/blob fields
        are empty, and one warning is logged per missing snapshot.

    Raises:
        ValueError: Re-raised from delta construction when it fails for a
            reason other than a missing snapshot. Swallowing it would yield
            a walk with commits but no snapshots or blobs and no indication
            that anything went wrong.
    """
    have_set: set[str] = set(have or [])
    commits_to_send: list[CommitRecord] = list(
        iter_ancestors(repo_root, commit_ids, prune=lambda cid: cid in have_set)
    )

    # Collect blobs already on the remote (have-commits' snapshots).
    # Subtracting these gives us only genuinely new blobs to send.
    have_blobs: set[str] = set()
    for cid in have_set:
        have_commit = read_commit(repo_root, cid)
        if have_commit is None:
            continue
        have_snap = read_snapshot(repo_root, have_commit.snapshot_id)
        if have_snap is not None:
            have_blobs.update(have_snap.manifest.values())

    snapshot_ids: set[str] = {c.snapshot_id for c in commits_to_send}
    commits_oldest_first = list(reversed(commits_to_send))

    # One pass: compute deltas (one read_snapshot per commit) then derive
    # object IDs from delta_upsert — no separate manifest scan needed.
    missing_snapshots: set[str] = set()
    try:
        snapshot_deltas = _build_snapshot_deltas(repo_root, commits_oldest_first)
    except ValueError:
        # Find out which snapshots are actually absent.
        missing_snapshots = {
            sid for sid in snapshot_ids if read_snapshot(repo_root, sid) is None
        }
        if not missing_snapshots:
            # Not a missing-snapshot failure: propagate instead of silently
            # producing an empty delta chain.
            raise
        snapshot_deltas = []

    manifest_blobs: set[str] = set(collect_blob_ids_from_deltas(snapshot_deltas))

    # Build oid→path from delta_upsert entries (path → oid in each delta).
    oid_to_path: dict[str, str] = {}
    for delta in snapshot_deltas:
        for path, oid in (delta.get("delta_upsert") or {}).items():
            oid_to_path[oid] = path

    blobs_to_send: set[str] = manifest_blobs - have_blobs

    for sid in sorted(missing_snapshots):
        logger.warning(
            "⚠️ walk_commits: snapshot %s is missing from the local store — "
            "no pack can be built from this walk until it is restored. "
            "Run `muse verify` to audit store integrity.",
            sid,
        )

    return _WalkResult(
        commits=commits_to_send,
        snapshot_ids=snapshot_ids,
        all_blob_ids=sorted(blobs_to_send),
        have_blobs=have_blobs,
        manifest_blobs=manifest_blobs,
        oid_to_path=oid_to_path,
        missing_snapshots=missing_snapshots,
        snapshot_deltas=snapshot_deltas,
    )
| 383 | |
def stream_blob_chunks(
    repo_root: pathlib.Path,
    blob_ids: list[str],
    chunk_size: int,
) -> "collections.abc.Iterator[list[BlobPayload]]":
    """Yield blobs from *blob_ids* in lists of at most *chunk_size* payloads.

    This is the hot path for ``muse push``. Blobs are read lazily, one chunk
    at a time, so peak RAM is one chunk rather than the full set, and the
    caller can start uploading the first chunk before the remaining blobs
    have been read. Reading is driven by the consumer: the next chunk is
    assembled only when the caller asks for it, not concurrently with an
    upload.

    Blobs absent from the local store are logged and skipped. This matches
    :func:`build_mpack_from_walk`; :func:`build_mpack` instead raises for
    absent blobs that are not promised by a promisor remote. The yielded
    payloads may therefore cover fewer IDs than *blob_ids*.

    Args:
        repo_root: Root of the Muse repository.
        blob_ids: Blob IDs to read, in the order they should be yielded.
        chunk_size: Maximum payloads per yielded list. Values below 1 behave
            like 1 (each blob is yielded in its own list).

    Yields:
        Non-empty lists of raw (unencoded) :class:`BlobPayload` entries.
    """
    chunk: list[BlobPayload] = []
    for oid in blob_ids:
        raw = read_object(repo_root, oid)
        if raw is None:
            logger.warning("⚠️ stream_blob_chunks: blob %s absent — skipping", oid)
            continue
        chunk.append(BlobPayload(object_id=oid, content=raw))
        if len(chunk) >= chunk_size:
            yield chunk
            chunk = []
    if chunk:
        yield chunk
| 412 | |
def collect_blob_ids_from_walk(walk: "_WalkResult") -> list[str]:
    """Return the sorted blob ID list from a pre-computed :func:`walk_commits` result.

    Zero disk I/O — the walk already read all snapshots.

    A fresh list is returned rather than the walk's own list. The same walk
    is typically passed to :func:`build_mpack_from_walk` afterwards, which
    reads ``walk["all_blob_ids"]`` again, so callers must be able to filter
    or mutate the result without silently changing the walk.
    """
    return list(walk["all_blob_ids"])
| 419 | |
| 420 | |
def collect_blob_ids_from_deltas(deltas: "list[SnapshotDeltaDict]") -> list[str]:
    """Return all unique object IDs from a pre-computed delta list, sorted.

    Zero additional disk I/O — extracts oids from ``delta_upsert.values()``
    only. The first delta encodes the full base manifest; subsequent deltas
    encode only changed files. Every oid in any reconstructed manifest
    entered the chain through *at least one* ``delta_upsert`` entry. An oid
    can appear in several, e.g. when a file is reverted or shared by two
    paths. The union of upsert values therefore equals the union of all full
    manifests.

    Entries without ``delta_upsert``, or with it set to ``None``, contribute
    nothing. This matches how :func:`walk_commits` reads the same field.

    Use this instead of collect_blob_ids on the mpack path. The deltas are
    already computed by :func:`_build_snapshot_deltas` (one read per
    snapshot), so this is a pure in-memory operation.
    """
    seen: set[str] = set()
    for delta in deltas:
        seen.update((delta.get("delta_upsert") or {}).values())
    return sorted(seen)
| 438 | |
| 439 | |
def _build_snapshot_deltas(
    repo_root: pathlib.Path,
    commits_oldest_first: list[CommitRecord],
) -> list[SnapshotDeltaDict]:
    """Delta-encode the snapshots of a commit chain, oldest first.

    Each distinct snapshot is emitted once, in first-seen order. An entry
    records only the paths that differ from the entry emitted just before
    it. The first entry has ``parent_snapshot_id=None`` and therefore
    carries its whole manifest in ``delta_upsert``. Every entry also carries
    the snapshot's full directory list under ``directories``.

    Correctness invariant (content-addressing as proof)::

        manifest = apply_delta(prev_manifest, entry)
        assert hash_snapshot(manifest) == entry["snapshot_id"]

    Storing O(changed_files) per snapshot instead of full manifests shrinks
    typical chains by 10–100×.

    Raises:
        ValueError: If a commit's snapshot cannot be read from the store.
    """
    result: list[SnapshotDeltaDict] = []
    visited: set[str] = set()
    parent_manifest: dict[str, str] = {}
    parent_id: str | None = None

    for commit in commits_oldest_first:
        snapshot_id = commit.snapshot_id
        if snapshot_id in visited:
            continue  # several commits may share one snapshot
        visited.add(snapshot_id)

        snapshot = read_snapshot(repo_root, snapshot_id)
        if snapshot is None:
            raise ValueError(
                f"Push aborted: snapshot {snapshot_id} is missing from the local store "
                f"but is required by a commit being sent. "
                f"Run 'muse verify' to audit store integrity."
            )
        current = snapshot.manifest

        upserts = {
            path: oid
            for path, oid in current.items()
            if parent_manifest.get(path) != oid
        }
        removals = [path for path in parent_manifest if path not in current]

        result.append(
            SnapshotDeltaDict(
                snapshot_id=snapshot_id,
                parent_snapshot_id=parent_id,
                delta_upsert=upserts,
                delta_remove=removals,
                directories=snapshot.directories or [],
            )
        )
        parent_manifest, parent_id = current, snapshot_id

    return result
| 492 | |
| 493 | |
def _apply_snapshot_deltas(
    raw_snapshots: list[SnapshotDeltaDict],
) -> _SnapshotResolvedMap:
    """Reconstruct full manifests from a delta chain.

    Applies each entry in order and verifies the result by hashing. The hash
    check IS the integrity proof — no external validation needed.

    Two snapshot formats are accepted:

    - Delta format (build_mpack): ``{snapshot_id, parent_snapshot_id,
      delta_upsert, delta_remove, directories}``.
    - Full-manifest format: ``{snapshot_id, manifest, directories, ...}``.

    ``directories`` is honoured in both formats. The sender
    (:func:`_build_snapshot_deltas`) writes it on every delta entry, and it
    is passed to ``hash_snapshot`` together with the reconstructed manifest.

    Entries are untrusted wire data:

    - Non-mapping entries and entries without a non-empty string
      ``snapshot_id`` are ignored.
    - Non-string paths and object IDs, and malformed ``delta_upsert``,
      ``delta_remove`` or ``directories`` values, are dropped rather than
      crashing the whole apply. The resulting manifest then fails the hash
      check.
    - Corrupt or hash-mismatched entries are logged and skipped. They do not
      block independent valid entries with ``parent_snapshot_id=None``.
    - Entries whose parent was skipped (or did not appear earlier in the
      list) start from an empty base, fail the hash check, and are skipped
      too.

    Args:
        raw_snapshots: Snapshot entries in chain order (parents first).

    Returns:
        ``{snapshot_id: (full_manifest, directories)}`` for every entry that
        verified.
    """
    resolved: _SnapshotResolvedMap = {}
    for snap in raw_snapshots:
        if not isinstance(snap, dict):
            continue
        sid = snap.get("snapshot_id", "")
        if not isinstance(sid, str) or not sid:
            continue

        # Directories are sent in full (not delta-encoded) in both formats.
        dirs_raw = snap.get("directories")
        directories: list[str] = (
            [d for d in dirs_raw if isinstance(d, str)]
            if isinstance(dirs_raw, (list, tuple))
            else []
        )

        manifest_raw = snap.get("manifest")
        if isinstance(manifest_raw, dict):
            # Full-manifest format: use the manifest directly.
            base = {
                k: v for k, v in manifest_raw.items()
                if isinstance(k, str) and isinstance(v, str)
            }
        else:
            # Delta format: start from the already-verified parent manifest.
            parent_sid = snap.get("parent_snapshot_id")
            parent_entry = (
                resolved.get(parent_sid)
                if isinstance(parent_sid, str) and parent_sid
                else None
            )
            base = dict(parent_entry[0]) if parent_entry else {}

            upsert_raw = snap.get("delta_upsert")
            if isinstance(upsert_raw, dict):
                base.update({
                    k: v for k, v in upsert_raw.items()
                    if isinstance(k, str) and isinstance(v, str)
                })
            remove_raw = snap.get("delta_remove")
            if isinstance(remove_raw, (list, tuple)):
                for path in remove_raw:
                    if isinstance(path, str):
                        base.pop(path, None)

        # Content-addressing IS the proof — hash the result.
        try:
            got = hash_snapshot(base, directories or None)
        except ValueError as _hash_exc:
            logger.warning(
                "⚠️ apply_mpack: snapshot %s has invalid object IDs in delta — skipped: %s",
                sid[:20], _hash_exc,
            )
            continue
        if got != sid:
            logger.warning(
                "⚠️ apply_mpack: snapshot %s hash mismatch "
                "(reconstructed=%s) — skipped",
                sid[:20], got[:20],
            )
            continue
        resolved[sid] = (base, directories)
    return resolved
| 561 | |
| 562 | |
def build_mpack_from_walk(
    repo_root: pathlib.Path,
    walk: _WalkResult,
    *,
    only_blobs: set[str] | None = None,
    repo_id: str = "",
    compress: bool = False,
) -> MPack:
    """Assemble an :class:`MPack` from a pre-computed :func:`walk_commits` result.

    Avoids the second BFS traversal that :func:`build_mpack` would otherwise
    perform. Only reads blob bytes for blobs in *only_blobs* (or all blobs
    when *only_blobs* is ``None``).

    When *compress* is ``True``, each blob is zstd-compressed (level 3) and
    sent with ``encoding="zstd"``. A blob whose compressed form is not
    strictly smaller than the raw bytes is sent raw instead.
    ``summary["blobs_bytes"]`` always counts *uncompressed* bytes, as
    :class:`MPackSummary` documents.

    Blobs absent from the local store are logged and skipped.

    Args:
        repo_root: Root of the Muse repository.
        walk: Result of :func:`walk_commits`.
        only_blobs: When set, only blobs whose IDs are in this set are sent.
        repo_id: Repository ID used to look up tags; when empty, no tags
            are included.
        compress: zstd-compress blobs when that makes them smaller.

    Returns:
        An :class:`MPack` ready for serialisation and transfer. Its ``meta``
        always reports ``mode="full"`` with empty ``base_commits``, because
        the walk result does not record which have-commits bounded it.

    Raises:
        ValueError: If the walk recorded snapshots missing from the local
            store.
    """
    missing_snapshots: set[str] = walk.get("missing_snapshots") or set()

    # Hard failure on any missing snapshot — silently skipping would push
    # commits without their snapshots, creating dangling references on the
    # remote that can never be healed without rewriting history.
    if missing_snapshots:
        sample = sorted(missing_snapshots)[:3]
        sample_str = ", ".join(sample)
        raise ValueError(
            f"Push aborted: {len(missing_snapshots)} snapshot(s) are missing from "
            f"the local store but are required by commits being sent "
            f"({sample_str}{'…' if len(missing_snapshots) > 3 else ''}). "
            f"Run 'muse verify' to audit store integrity."
        )

    commits_to_send = list(walk["commits"])
    snapshot_deltas = walk["snapshot_deltas"]
    all_blob_ids: set[str] = set(walk["all_blob_ids"])
    candidate_blob_ids = (
        all_blob_ids & only_blobs if only_blobs is not None else all_blob_ids
    )

    compressor = None
    if compress:
        import zstandard as _zstd  # imported lazily; only needed when compressing
        compressor = _zstd.ZstdCompressor(level=3)

    blob_payloads: list[BlobPayload] = []
    uncompressed_bytes = 0
    for oid in sorted(candidate_blob_ids):
        raw = read_object(repo_root, oid)
        if raw is None:
            logger.warning("⚠️ build_mpack_from_walk: blob %s absent — skipping", oid)
            continue
        # Count the raw size regardless of the encoding chosen below.
        uncompressed_bytes += len(raw)
        if compressor is not None:
            compressed = compressor.compress(raw)
            if len(compressed) < len(raw):
                blob_payloads.append(
                    BlobPayload(object_id=oid, content=compressed, encoding="zstd")
                )
                continue
        blob_payloads.append(BlobPayload(object_id=oid, content=raw))

    sent_commit_ids = [c.commit_id for c in commits_to_send]
    wire_tags = _tags_for_commits(repo_root, sent_commit_ids, repo_id) if repo_id else []

    # Serialise each commit once; reuse the dicts for agent_ids and payload.
    commit_dicts = [c.to_dict() for c in commits_to_send]
    agent_ids = sorted({
        d.get("agent_id", "") for d in commit_dicts if d.get("agent_id")
    })
    summary = MPackSummary(
        commits_count=len(commits_to_send),
        blobs_count=len(blob_payloads),
        blobs_bytes=uncompressed_bytes,
        branches={},
        agent_ids=agent_ids,
    )
    mpack: MPack = {
        "commits": commit_dicts,
        "snapshots": snapshot_deltas,
        "blobs": blob_payloads,
        "summary": summary,
        "meta": MPackMeta(
            mode="full",
            base_commits=[],
            created_at=datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        ),
    }
    if wire_tags:
        mpack["tags"] = wire_tags

    logger.info(
        "✅ Built MPack (from walk): %d commits, %d snapshots, %d blobs, %d tags",
        len(commits_to_send),
        len(snapshot_deltas),
        len(blob_payloads),
        len(wire_tags),
    )
    return mpack
| 667 | |
def _tags_for_commits(
    repo_root: pathlib.Path, commit_ids: list[str], repo_id: str
) -> list[WireTag]:
    """Serialise every tag attached to any of *commit_ids* as :class:`WireTag` dicts.

    Each tag is emitted once (deduplicated by ``tag_id``), in the order it
    is first met while visiting *commit_ids* in order. ``created_at`` is
    rendered with ``isoformat()``.
    """
    emitted: set[str] = set()
    result: list[WireTag] = []
    for commit_id in commit_ids:
        for record in get_tags_for_commit(repo_root, repo_id, commit_id):
            if record.tag_id in emitted:
                continue
            emitted.add(record.tag_id)
            result.append(
                WireTag(
                    tag_id=record.tag_id,
                    repo_id=record.repo_id,
                    commit_id=record.commit_id,
                    tag=record.tag,
                    created_at=record.created_at.isoformat(),
                )
            )
    return result
| 686 | |
| 687 | def build_mpack( |
| 688 | repo_root: pathlib.Path, |
| 689 | commit_ids: list[str], |
| 690 | *, |
| 691 | have: list[str] | None = None, |
| 692 | only_blobs: set[str] | None = None, |
| 693 | repo_id: str = "", |
| 694 | ) -> MPack: |
| 695 | """Assemble an :class:`MPack` from *commit_ids*, excluding commits in *have*. |
| 696 | |
| 697 | Performs a BFS walk of the commit graph from every ID in *commit_ids*, |
| 698 | stopping at any commit already in *have*. Collects all snapshot manifests |
| 699 | and blobs reachable from the selected commits. |
| 700 | |
| 701 | Missing blobs or snapshots are logged and skipped — the caller decides |
| 702 | whether that constitutes an error. |
| 703 | |
| 704 | Args: |
| 705 | repo_root: Root of the Muse repository. |
| 706 | commit_ids: Tip commit IDs to include (e.g. current branch HEAD). |
| 707 | have: Commit IDs already known to the receiver. The BFS stops |
| 708 | at these, reducing mpack size. Pass ``None`` or ``[]`` |
| 709 | to send the full history. |
| 710 | only_blobs: When set, only include blobs whose IDs are in this set. |
| 711 | Pass the set of blobs missing from the remote so the |
| 712 | client only uploads what the remote actually needs. |
| 713 | repo_id: Repository content ID used to look up tags. When omitted, |
| 714 | tags are not included in the mpack. |
| 715 | |
| 716 | Returns: |
| 717 | An :class:`MPack` ready for serialisation and transfer. |
| 718 | """ |
| 719 | walk = walk_commits(repo_root, commit_ids, have=have) |
| 720 | if walk["missing_snapshots"]: |
| 721 | sample = sorted(walk["missing_snapshots"])[:3] |
| 722 | sample_str = ", ".join(sample) |
| 723 | raise ValueError( |
| 724 | f"Push aborted: {len(walk['missing_snapshots'])} snapshot(s) are missing from " |
| 725 | f"the local store but are required by commits being sent " |
| 726 | f"({sample_str}{'…' if len(walk['missing_snapshots']) > 3 else ''}). " |
| 727 | f"Run 'muse verify' to audit store integrity." |
| 728 | ) |
| 729 | |
| 730 | commits_to_send: list[CommitRecord] = list(walk["commits"]) |
| 731 | snapshot_deltas = walk["snapshot_deltas"] |
| 732 | all_blob_ids: set[str] = set(walk["all_blob_ids"]) |
| 733 | |
| 734 | # When only_blobs is provided skip any blob the remote already has — |
| 735 | # only transmit the missing delta. |
| 736 | candidate_blob_ids = ( |
| 737 | all_blob_ids & only_blobs if only_blobs is not None else all_blob_ids |
| 738 | ) |
| 739 | |
| 740 | promisor_remotes = load_promisor_remotes(repo_root) |
| 741 | blob_payloads: list[BlobPayload] = [] |
| 742 | for oid in sorted(candidate_blob_ids): |
| 743 | raw = read_object(repo_root, oid) |
| 744 | if raw is None: |
| 745 | state = object_state(repo_root, oid, promisor_remotes) |
| 746 | if state == ObjectState.PROMISED: |
| 747 | logger.debug("build_mpack: blob %s is PROMISED — skipping", short_id(oid)) |
| 748 | continue |
| 749 | raise ValueError( |
| 750 | f"Pack aborted: blob {oid} is missing from the local store " |
| 751 | f"and no promisor remote is configured. " |
| 752 | f"Run 'muse verify' to audit store integrity." |
| 753 | ) |
| 754 | blob_payloads.append(BlobPayload(object_id=oid, content=raw)) |
| 755 | |
| 756 | sent_commit_ids = [c.commit_id for c in commits_to_send] |
| 757 | wire_tags = _tags_for_commits(repo_root, sent_commit_ids, repo_id) if repo_id else [] |
| 758 | |
| 759 | total_bytes = sum(len(b["content"]) for b in blob_payloads) |
| 760 | agent_ids = sorted({ |
| 761 | c.to_dict().get("agent_id", "") for c in commits_to_send |
| 762 | if c.to_dict().get("agent_id") |
| 763 | }) |
| 764 | summary = MPackSummary( |
| 765 | commits_count=len(commits_to_send), |
| 766 | blobs_count=len(blob_payloads), |
| 767 | blobs_bytes=total_bytes, |
| 768 | branches={}, |
| 769 | agent_ids=agent_ids, |
| 770 | ) |
| 771 | _have_list = list(have or []) |
| 772 | mpack: MPack = { |
| 773 | "commits": [c.to_dict() for c in commits_to_send], |
| 774 | "snapshots": snapshot_deltas, |
| 775 | "blobs": blob_payloads, |
| 776 | "summary": summary, |
| 777 | "meta": MPackMeta( |
| 778 | mode="incremental" if _have_list else "full", |
| 779 | base_commits=_have_list, |
| 780 | created_at=datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"), |
| 781 | ), |
| 782 | } |
| 783 | if wire_tags: |
| 784 | mpack["tags"] = wire_tags |
| 785 | |
| 786 | logger.info( |
| 787 | "✅ Built MPack: %d commits, %d snapshots, %d blobs, %d tags", |
| 788 | len(commits_to_send), |
| 789 | len(snapshot_deltas), |
| 790 | len(blob_payloads), |
| 791 | len(wire_tags), |
| 792 | ) |
| 793 | return mpack |
| 794 | |
| 795 | # --------------------------------------------------------------------------- |
| 796 | # Object ID collection — for pre-push deduplication negotiation |
| 797 | # --------------------------------------------------------------------------- |
| 798 | |
def collect_blob_ids(
    repo_root: pathlib.Path,
    commit_ids: list[str],
    *,
    have: list[str] | None = None,
) -> list[str]:
    """Return the sorted blob IDs the receiver still needs for *commit_ids*.

    Walks the ancestors of *commit_ids* with ``iter_ancestors``, pruning the
    walk at any commit listed in *have* so the receiver's known history is
    never expanded. Only commit and snapshot records are read — never blob
    bytes — so this is cheap enough to run before deciding what to upload:
    ``muse push`` compares the result against the remote's known objects and
    passes the missing subset to :func:`build_mpack` as ``only_blobs``.

    Blobs referenced by the snapshots of the *have* commits themselves are
    treated as already present on the receiver and excluded. Commits or
    snapshots that cannot be read from the local store contribute nothing.

    Args:
        repo_root:  Root of the Muse repository.
        commit_ids: Tip commit IDs to examine.
        have:       Commit IDs already known to the receiver; the walk stops
                    at these.

    Returns:
        Sorted, de-duplicated blob IDs referenced by the walked commits'
        snapshots, minus those referenced by the *have* commits' snapshots.
    """
    known_commits: set[str] = set(have or [])

    def _is_known(commit_id: str) -> bool:
        # Prune callback: do not expand history the receiver already has.
        return commit_id in known_commits

    delta_commits: list[CommitRecord] = list(
        iter_ancestors(repo_root, commit_ids, prune=_is_known)
    )

    def _manifest_blobs(snapshot_id: str) -> set[str]:
        # A snapshot absent from the local store contributes no blobs.
        snapshot = read_snapshot(repo_root, snapshot_id)
        return set() if snapshot is None else set(snapshot.manifest.values())

    # Blobs the receiver already holds: those in its known commits' snapshots.
    remote_blobs: set[str] = set()
    for known_id in known_commits:
        known_record = read_commit(repo_root, known_id)
        if known_record is None:
            continue
        remote_blobs |= _manifest_blobs(known_record.snapshot_id)

    candidate_blobs: set[str] = set()
    for snapshot_id in {record.snapshot_id for record in delta_commits}:
        candidate_blobs |= _manifest_blobs(snapshot_id)

    return sorted(candidate_blobs - remote_blobs)
| 847 | |
| 848 | |
def compute_snapshot_delta(
    base: _Manifest,
    new: _Manifest,
) -> tuple[_Manifest, list[str]]:
    """Diff two snapshot manifests.

    Returns:
        ``(added_or_modified, removed)`` where:

        - *added_or_modified* maps every path of *new* whose object ID is
          missing from, or different in, *base* (iteration order of *new*);
        - *removed* lists the paths of *base* that *new* no longer contains
          (iteration order of *base*).
    """
    changed = {}
    for path, object_id in new.items():
        if base.get(path) != object_id:
            changed[path] = object_id
    dropped = [path for path in base if path not in new]
    return changed, dropped
| 862 | |
| 863 | |
def apply_snapshot_delta(
    base: _Manifest,
    added: _Manifest,
    removed: list[str],
) -> _Manifest:
    """Rebuild a full manifest from *base* and a delta.

    Entries of *added* override or extend *base*; every path in *removed* is
    then dropped, so a path listed in both ends up absent. *base* itself is
    not mutated. This undoes :func:`compute_snapshot_delta`::

        apply_snapshot_delta(base, *compute_snapshot_delta(base, new)) == new
    """
    dropped_paths = frozenset(removed)
    merged = dict(base)
    merged.update(added)
    return {path: oid for path, oid in merged.items() if path not in dropped_paths}
| 879 | |
| 880 | |
| 881 | # --------------------------------------------------------------------------- |
| 882 | # Presign helpers |
| 883 | # --------------------------------------------------------------------------- |
| 884 | |
| 885 | class _PresignPayload(TypedDict): |
| 886 | mpack_key: str |
| 887 | size_bytes: int |
| 888 | |
| 889 | |
| 890 | class _UnpackPayload(TypedDict, total=False): |
| 891 | mpack_key: str |
| 892 | branch: str |
| 893 | head: str |
| 894 | commits_count: int |
| 895 | blobs_count: int |
| 896 | force: bool |
| 897 | |
| 898 | |
def build_presign_payload(mpack_bytes: bytes) -> _PresignPayload:
    """Build the JSON body for ``POST /push/mpack-presign``.

    The server names the MinIO object after ``mpack_key`` and uses it to
    verify integrity once the client has PUT the bytes directly to MinIO.

    Args:
        mpack_bytes: The fully encoded wire MPack.

    Returns:
        ``{"mpack_key": blob_id(mpack_bytes), "size_bytes": len(mpack_bytes)}``.
    """
    key = blob_id(mpack_bytes)
    size = len(mpack_bytes)
    return _PresignPayload(mpack_key=key, size_bytes=size)
| 906 | |
| 907 | |
def build_unpack_payload(
    mpack_key: str,
    *,
    branch: str = "main",
    head: str = "",
    commits_count: int = 0,
    blobs_count: int = 0,
    force: bool = False,
) -> _UnpackPayload:
    """Build the JSON body for ``POST /push/unpack-mpack`` (Step 3).

    The two counts are coerced with ``int()``; every other argument is
    passed through unchanged. All six keys are always present.
    """
    payload = _UnpackPayload(mpack_key=mpack_key, branch=branch, head=head)
    payload["commits_count"] = int(commits_count)
    payload["blobs_count"] = int(blobs_count)
    payload["force"] = force
    return payload
| 926 | |
| 927 | |
| 928 | # --------------------------------------------------------------------------- |
| 929 | # Wire MPack encode / decode (Phase 3) |
| 930 | # --------------------------------------------------------------------------- |
| 931 | # |
| 932 | # Wire format: |
| 933 | # [4B] magic: b"MUSE" |
| 934 | # [1B] version: 1 |
| 935 | # [1B] section_count: N |
| 936 | # [N*17B] section table: each entry is (1B type, 8B offset LE, 8B length LE) |
| 937 | # [...] section data (concatenated, no padding) |
| 938 | # [32B] SHA-256 of every byte above (footer, not included in its own hash) |
| 939 | # |
| 940 | # Section types: |
| 941 | # 1 = BLOBS — raw _build_pack() bytes (byte-identical to Phase 1 .mpack) |
| 942 | # 2 = COMMITS — [8B count] + N × [8B record_len + JSON bytes] |
| 943 | # 3 = SNAPSHOTS — same length-prefixed JSON layout as COMMITS |
| 944 | # 4 = TAGS — same layout |
| 945 | # 5 = META — [8B json_len + JSON bytes] key-value pairs (repo_id, branch, head_commit_id) |
| 946 | |
| 947 | _WIRE_VERSION = 1 |
| 948 | _WIRE_SEC_BLOBS = 1 |
| 949 | _WIRE_SEC_COMMITS = 2 |
| 950 | _WIRE_SEC_SNAPSHOTS = 3 |
| 951 | _WIRE_SEC_TAGS = 4 |
| 952 | _WIRE_SEC_META = 5 |
| 953 | |
| 954 | |
| 955 | def _encode_records(records: list[dict]) -> bytes: |
| 956 | """Encode a list of dicts as [8B count] + N × [8B len + JSON bytes].""" |
| 957 | import json as _json |
| 958 | parts = [_struct.pack("<Q", len(records))] |
| 959 | for rec in records: |
| 960 | enc = _json.dumps(rec, separators=(",", ":")).encode() |
| 961 | parts.append(_struct.pack("<Q", len(enc))) |
| 962 | parts.append(enc) |
| 963 | return b"".join(parts) |
| 964 | |
| 965 | |
| 966 | def _decode_records(data: bytes) -> list[dict]: |
| 967 | """Decode [8B count] + N × [8B len + JSON bytes] into a list of dicts.""" |
| 968 | if len(data) < 8: |
| 969 | return [] |
| 970 | import json as _json |
| 971 | count = _struct.unpack_from("<Q", data, 0)[0] |
| 972 | cursor = 8 |
| 973 | records: list[dict] = [] |
| 974 | for _ in range(count): |
| 975 | if cursor + 8 > len(data): |
| 976 | break |
| 977 | rec_len = _struct.unpack_from("<Q", data, cursor)[0] |
| 978 | cursor += 8 |
| 979 | rec = _json.loads(data[cursor: cursor + rec_len]) |
| 980 | cursor += rec_len |
| 981 | records.append(rec) |
| 982 | return records |
| 983 | |
| 984 | |
def _encode_meta(meta: _MetaDict) -> bytes:
    """Encode the META section: an 8-byte length prefix plus one JSON object.

    Layout: an 8-byte little-endian byte length, followed by the compact
    JSON encoding (``separators=(",", ":")``, UTF-8) of *meta*. This is the
    layout :func:`_decode_meta` reads back. (The previous docstring described
    a per-key length-prefixed layout the code never produced.)
    """
    import json as _json

    encoded = _json.dumps(meta, separators=(",", ":")).encode()
    return _struct.pack("<Q", len(encoded)) + encoded
| 990 | |
| 991 | |
def _decode_meta(data: bytes) -> _MetaDict:
    """Decode a META section written by :func:`_encode_meta`.

    Layout: an 8-byte little-endian length followed by that many bytes of
    JSON. Empty *data* (the section was absent) decodes to ``{}``.

    Raises:
        ValueError: *data* is non-empty but shorter than its length prefix
            requires, is not valid JSON, or does not hold a JSON object.
            A non-object value would otherwise be returned as metadata and
            stored on the parsed MPack.
    """
    import json as _json

    if not data:
        return {}
    if len(data) < 8:
        raise ValueError(
            f"Wire MPack META section truncated: {len(data)} byte(s), need at least 8"
        )
    (json_len,) = _struct.unpack_from("<Q", data, 0)
    end = 8 + json_len
    if end > len(data):
        raise ValueError(
            f"Wire MPack META section truncated: declares {json_len} byte(s), "
            f"only {len(data) - 8} available"
        )
    meta = _json.loads(data[8:end])
    if not isinstance(meta, dict):
        raise ValueError(
            f"Wire MPack META section must be a JSON object, got {type(meta).__name__}"
        )
    return meta
| 999 | |
| 1000 | |
def build_wire_mpack(mpack: MPack, *, meta: _MetaDict | None = None) -> bytes:
    """Serialise *mpack* into a self-verifying wire MPack bundle.

    Sections, in table order:

    - BLOBS: ``pack_store._build_pack`` output over ``(object_id, content)``
      pairs. Payloads marked ``encoding == "zstd"`` are decompressed first,
      so the section holds the same bytes :func:`write_pack` writes to disk
      in Phase 1 and the receiver can store it without per-object decoding.
      Empty when the mpack carries no blobs.
    - COMMITS / SNAPSHOTS / TAGS: :func:`_encode_records` streams.
    - META: :func:`_encode_meta` of *meta*, or empty when *meta* is falsy.

    Args:
        mpack: The :class:`MPack` to encode; missing or ``None`` lists are
            treated as empty.
        meta:  Optional key/value metadata for the META section.

    Returns:
        Bundle bytes: ``b"MUSE"`` magic, version byte, section count,
        section table, section payloads, then a 32-byte SHA-256 footer over
        everything before it.
    """
    from muse.core.pack_store import _build_pack as _ps_build_pack

    def _raw_content(blob: BlobPayload) -> bytes:
        # The BLOBS section always carries uncompressed object bytes.
        content = blob["content"]
        if blob.get("encoding") != "zstd":
            return content
        import zstandard as _zstd
        return _zstd.ZstdDecompressor().decompress(content)

    blob_pairs = [
        (blob["object_id"], _raw_content(blob))
        for blob in (mpack.get("blobs") or [])
    ]
    section_payloads: list[tuple[int, bytes]] = [
        (_WIRE_SEC_BLOBS, _ps_build_pack(blob_pairs) if blob_pairs else b""),
        (_WIRE_SEC_COMMITS, _encode_records(mpack.get("commits") or [])),
        (_WIRE_SEC_SNAPSHOTS, _encode_records(mpack.get("snapshots") or [])),
        (_WIRE_SEC_TAGS, _encode_records(mpack.get("tags") or [])),
        (_WIRE_SEC_META, _encode_meta(meta) if meta else b""),
    ]

    entry_size = _struct.calcsize("<BQQ")  # 17 bytes per section-table entry
    preamble = b"MUSE" + _struct.pack("<BB", _WIRE_VERSION, len(section_payloads))
    # Section payloads start right after the preamble and the full table.
    next_offset = len(preamble) + entry_size * len(section_payloads)
    table = bytearray()
    for section_type, payload in section_payloads:
        table += _struct.pack("<BQQ", section_type, next_offset, len(payload))
        next_offset += len(payload)

    chunks: list[bytes] = [preamble, bytes(table)]
    chunks.extend(payload for _, payload in section_payloads)
    footer = _hashlib.sha256()
    for chunk in chunks:
        footer.update(chunk)
    chunks.append(footer.digest())  # digest covers every byte before it
    return b"".join(chunks)
| 1066 | |
| 1067 | |
def parse_wire_mpack(data: bytes) -> MPack:
    """Parse a wire MPack bundle produced by :func:`build_wire_mpack`.

    Checks run in this order: minimum length, magic, version, SHA-256 footer,
    then the section table. The footer proves integrity, not authenticity —
    a crafted bundle can carry a valid digest — so every table entry is
    bounds-checked. Each section's ``[offset, offset + length)`` range must
    lie between the end of the section table and the start of the footer.
    Section types absent from the table decode as empty. If a type appears
    more than once, the last entry wins.

    Raises:
        ValueError: Too short, bad magic bytes, unknown version, truncated
            section table, or an out-of-bounds section. JSON decode errors
            from the record sections also surface as ``ValueError``.
        OSError: Footer integrity check failed.
    """
    from muse.core.pack_store import _parse_pack_bytes as _ps_parse

    entry_format = "<BQQ"  # 1B type, 8B offset, 8B length
    entry_size = _struct.calcsize(entry_format)  # 17
    header_size = 6  # 4B magic + 1B version + 1B section_count
    footer_size = 32  # SHA-256 digest

    # A bundle with zero sections is the smallest legal input. The table
    # length is validated below once section_count is known.
    if len(data) < header_size + footer_size:
        raise ValueError("Wire MPack too short")
    if data[:4] != b"MUSE":
        raise ValueError(f"Wire MPack bad magic: {data[:4]!r}")
    version = data[4]
    if version != _WIRE_VERSION:
        raise ValueError(f"Wire MPack unknown version: {version}")

    body_end = len(data) - footer_size
    # Hash through a memoryview so the (possibly large) body is not copied.
    if _hashlib.sha256(memoryview(data)[:body_end]).digest() != data[body_end:]:
        raise OSError("Wire MPack failed SHA-256 integrity check")

    section_count = data[5]
    table_end = header_size + section_count * entry_size
    if table_end > body_end:
        raise ValueError(
            f"Wire MPack section table truncated: {section_count} entries need "
            f"{table_end} bytes, body has {body_end}"
        )

    sections: dict[int, bytes] = {}
    for entry_offset in range(header_size, table_end, entry_size):
        sec_type, sec_offset, sec_length = _struct.unpack_from(
            entry_format, data, entry_offset
        )
        sec_end = sec_offset + sec_length
        # Reject ranges that overlap the header/table or reach into the
        # footer, rather than silently slicing whatever bytes are there.
        if sec_offset < table_end or sec_end > body_end:
            raise ValueError(
                f"Wire MPack section {sec_type} out of bounds: "
                f"[{sec_offset}, {sec_end}) not within [{table_end}, {body_end})"
            )
        sections[sec_type] = data[sec_offset:sec_end]

    obj_pairs = _ps_parse(sections.get(_WIRE_SEC_BLOBS, b""))
    blobs: list[BlobPayload] = [
        BlobPayload(object_id=oid, content=content)
        for oid, content in obj_pairs
    ]
    commits = _decode_records(sections.get(_WIRE_SEC_COMMITS, b""))
    snapshots = _decode_records(sections.get(_WIRE_SEC_SNAPSHOTS, b""))
    tags = _decode_records(sections.get(_WIRE_SEC_TAGS, b""))
    meta = _decode_meta(sections.get(_WIRE_SEC_META, b""))

    result = MPack(
        blobs=blobs,
        commits=commits,
        snapshots=snapshots,
        tags=tags,
    )
    if meta:
        result["meta"] = meta
    return result
| 1120 | |
| 1121 | |
| 1122 | # --------------------------------------------------------------------------- |
| 1123 | # Pack applying |
| 1124 | # --------------------------------------------------------------------------- |
| 1125 | |
def apply_mpack(
    repo_root: pathlib.Path,
    mpack: MPack,
    *,
    shallow_commits: "collections.abc.Container[str] | None" = None,
) -> ApplyResult:
    """Write the contents of *mpack* into a local ``.muse/`` directory.

    Writes in dependency order: blobs first, then snapshots (which reference
    blob IDs), then commits (which reference snapshot IDs), then tags. Blobs,
    snapshots and commits already in the store are not counted as newly
    written. Malformed entries are logged and skipped rather than aborting
    the whole apply. Snapshots that reference blobs which failed validation,
    and commits whose snapshot is unavailable, are skipped too, so no dangling
    reference chain enters the store.

    Args:
        repo_root:       Root of the Muse repository to write into.
        mpack:           :class:`MPack` received from the remote.
        shallow_commits: Optional set of boundary commit IDs (shallow clone).
                         These commits are written even if their parents are
                         absent from the local store.

    Returns:
        :class:`ApplyResult` with counts of newly written and skipped items,
        plus the IDs of blobs that failed validation and of snapshots skipped
        because of them.

    Raises:
        ValueError: The mpack carries more than ``MAX_PACK_OBJECTS`` items
            (blobs + snapshots + commits + tags).
        OSError: Propagated from ``write_pack`` or ``write_snapshot``. A
            ``write_pack`` failure happens before any snapshot or commit is
            written.
    """
    blobs_written = 0
    blobs_skipped = 0
    snapshots_written = 0
    commits_written = 0
    tags_written = 0

    raw_blobs = mpack.get("blobs") or []
    raw_snapshots = mpack.get("snapshots") or []
    raw_commits = mpack.get("commits") or []
    raw_tags = mpack.get("tags") or []

    # Pack-bomb guard: cap the total number of items accepted per call.
    # A legitimate push carries at most tens of thousands of blobs per chunk;
    # a pack claiming millions is an adversarial input. Tags count too: each
    # one triggers its own write_tag call below.
    total_items = len(raw_blobs) + len(raw_snapshots) + len(raw_commits) + len(raw_tags)
    if total_items > MAX_PACK_OBJECTS:
        raise ValueError(
            f"Pack rejected: {total_items:,} total items exceeds the "
            f"{MAX_PACK_OBJECTS:,} item limit per apply_mpack call. "
            "Split the pack into smaller chunks."
        )

    # Deduplicate blob IDs before the write loop, so a sender repeating the
    # same blob ID N times cannot force N sha256 hashes.
    seen_blob_ids: set[str] = set()
    # Track blob IDs that failed validation so dependent snapshots and commits
    # can be skipped — prevents dangling reference chains in the store.
    failed_blob_ids: set[str] = set()
    # Blobs that pass all checks and are new to this store — written as a pack.
    pack_blobs: list[tuple[str, bytes]] = []

    for obj in raw_blobs:
        oid = obj.get("object_id", "")
        raw = obj.get("content", b"")
        if not oid or not isinstance(raw, bytes):
            logger.warning("⚠️ apply_mpack: blob entry missing fields — skipped")
            continue
        if oid in seen_blob_ids:
            logger.debug("⚠️ apply_mpack: duplicate blob_id %s — skipped", short_id(oid))
            blobs_skipped += 1
            continue
        seen_blob_ids.add(oid)
        if len(raw) > MAX_OBJECT_WRITE_BYTES:
            logger.warning(
                "⚠️ apply_mpack: blob %s is %d bytes, exceeding %d MiB limit — skipped",
                oid, len(raw), MAX_OBJECT_WRITE_BYTES // (1024 * 1024),
            )
            failed_blob_ids.add(oid)
            continue
        try:
            validate_object_id(oid)
            actual = blob_id(raw)
            if actual != oid:
                raise ValueError(
                    f"Content integrity failure: expected {oid} got {actual}"
                )
        except ValueError as exc:
            logger.warning("⚠️ apply_mpack: malformed blob entry — skipped: %s", exc)
            failed_blob_ids.add(oid)
            continue
        if has_object(repo_root, oid):
            blobs_skipped += 1
            continue
        pack_blobs.append((oid, raw))
        blobs_written += 1

    # Write all new blobs as a single MPack file + index — O(1) file writes
    # regardless of blob count. Raises OSError on disk failure, which
    # propagates before any snapshot or commit is written. Skipped when no
    # blob is new, so an apply that brings no blobs does not depend on how
    # write_pack handles an empty list.
    if pack_blobs:
        write_pack(repo_root, pack_blobs)

    # Reconstruct full manifests from the delta chain, then write snapshots.
    # The hash IS the proof: hash_snapshot(reconstructed) == snapshot_id.
    # _apply_snapshot_deltas logs and skips corrupt entries — never raises.
    resolved_manifests = _apply_snapshot_deltas(raw_snapshots)

    # Snapshot IDs skipped because they reference failed blobs; dependent
    # commits are skipped too (no dangling commit -> snapshot -> object).
    skipped_snapshot_ids: set[str] = set()
    # Snapshot IDs successfully written from this mpack. Used below to refuse
    # commits whose snapshots were not sent (snaps=0 bug guard).
    written_snapshot_ids: set[str] = set()

    for snap_entry in raw_snapshots:
        sid = snap_entry.get("snapshot_id", "")
        if not sid or sid not in resolved_manifests:
            continue
        try:
            manifest, directories = resolved_manifests[sid]
            # Guard against zip-slip: manifest keys are later used to build
            # checkout paths, and a malicious mpack could inject keys like
            # "../../etc/cron.d/malicious". Validate every key before writing
            # so no traversal path ever enters the store.
            for key in manifest:
                validate_workspace_path(key)
            # Manifest values are object IDs — validate they are safe hex strings.
            for oid in manifest.values():
                validate_object_id(oid)
            # Refuse a snapshot whose objects were not fully written — that
            # would create a dangling snapshot -> object reference.
            missing = failed_blob_ids.intersection(manifest.values())
            if missing:
                logger.warning(
                    "⚠️ apply_mpack: snapshot %s skipped — references %d object(s) "
                    "that failed to write",
                    short_id(sid), len(missing),
                )
                skipped_snapshot_ids.add(sid)
                continue
            snap = SnapshotRecord(snapshot_id=sid, manifest=manifest, directories=directories)
            is_new = read_snapshot(repo_root, snap.snapshot_id) is None
            write_snapshot(repo_root, snap, sync=False)
            written_snapshot_ids.add(sid)
            if is_new:
                snapshots_written += 1
        except (KeyError, ValueError) as exc:
            logger.warning("⚠️ apply_mpack: malformed snapshot — skipped: %s", exc)

    # Parse and pre-validate every commit before any commit write begins.
    parsed_commits: list[CommitRecord] = []
    for commit_dict in raw_commits:
        try:
            commit = CommitRecord.from_dict(commit_dict)
            if not commit.commit_id:
                logger.warning("⚠️ apply_mpack: commit missing commit_id — skipped")
                continue
            if not commit.snapshot_id:
                logger.warning(
                    "⚠️ apply_mpack: commit %s missing snapshot_id — skipped",
                    commit.commit_id,
                )
                continue
            # Refuse a commit whose snapshot was skipped due to failed
            # objects — that would create a dangling commit -> snapshot reference.
            if commit.snapshot_id in skipped_snapshot_ids:
                logger.warning(
                    "⚠️ apply_mpack: commit %s skipped — its snapshot %s was not written "
                    "(referenced object(s) failed)",
                    short_id(commit.commit_id), short_id(commit.snapshot_id),
                )
                continue
            # Refuse a commit whose snapshot is neither in this mpack nor in
            # the local store. Writing it would put the commit into `have` on
            # the next pull; the server would then send nothing new and pull
            # would abort forever with "snapshot missing". Skipping it lets
            # the next pull re-request the commit together with its snapshot.
            if (
                commit.snapshot_id not in written_snapshot_ids
                and read_snapshot(repo_root, commit.snapshot_id) is None
            ):
                logger.warning(
                    "⚠️ apply_mpack: commit %s skipped — snapshot %s was not in this "
                    "mpack and is not in the local store (server sent snaps=0). "
                    "The next pull will re-request this commit with its snapshot.",
                    short_id(commit.commit_id), short_id(commit.snapshot_id),
                )
                continue
            parsed_commits.append(commit)
        except (KeyError, ValueError, TypeError) as exc:
            logger.warning("⚠️ apply_mpack: malformed commit — skipped: %s", exc)

    # Write commits in dependency order by retrying until stable.
    # write_commit raises MissingParentError when a commit's parent is not
    # yet in the store. Such commits are deferred and retried until every
    # parent carried by the mpack has been written, or until a full pass
    # makes no progress (the missing parent is not in this mpack — log and
    # skip).
    _shallow: "collections.abc.Container[str]" = shallow_commits or ()
    pending = parsed_commits
    while pending:
        deferred: list[CommitRecord] = []
        for commit in pending:
            # Shallow boundary commits bypass the parent check so they can be
            # written even when their parents are not in the local store.
            is_shallow = commit.commit_id in _shallow
            try:
                is_new = not has_object(repo_root, commit.commit_id)
                write_commit(repo_root, commit, skip_parent_check=is_shallow, sync=False)
                if is_new:
                    commits_written += 1
            except MissingParentError:
                deferred.append(commit)
            except OSError as exc:
                logger.critical(
                    "❌ apply_mpack: store integrity violation for commit — skipped: %s", exc
                )
            except (ValueError, TypeError) as exc:
                logger.warning("⚠️ apply_mpack: malformed commit — skipped: %s", exc)
        if len(deferred) == len(pending):
            # No progress in this pass — the missing parents are not in the mpack.
            for commit in deferred:
                logger.warning(
                    "⚠️ apply_mpack: commit %s skipped — parent not in mpack or local store",
                    commit.commit_id,
                )
            break
        # Retry in the opposite direction. With a newest-first bundle only the
        # oldest commit of each chain lands in the first pass. Walking the
        # deferred list backwards lets each child follow its just-written
        # parent in the same pass, instead of needing one pass per commit
        # (quadratic in chain length).
        deferred.reverse()
        pending = deferred

    # Bulk-fsync: snapshot and commit files were written with sync=False (no
    # per-file fsync) for throughput, and one directory fsync here acts as the
    # durability barrier. The ref is not updated until after apply_mpack
    # returns, so a crash before this point leaves a consistent (if
    # incomplete) store that is safe to re-apply.
    # NOTE(review): an fsync of these two directories only covers entries
    # directly inside them. If write_snapshot/write_commit shard into
    # subdirectories or skip fsyncing file contents, this barrier may not
    # cover them — confirm.
    if snapshots_written > 0 or commits_written > 0:
        from muse.core.paths import muse_dir as _muse_dir
        _dot = _muse_dir(repo_root)
        for _dir in (_dot / "snapshots", _dot / "commits"):
            if _dir.exists():
                try:
                    _fd = os.open(str(_dir), os.O_RDONLY)
                    try:
                        os.fsync(_fd)
                    finally:
                        os.close(_fd)
                except OSError:
                    pass  # best-effort — some filesystems (tmpfs) reject dir fsync

    for wire_tag in raw_tags:
        try:
            tag_record = TagRecord.from_dict(TagDict(
                tag_id=wire_tag["tag_id"],
                repo_id=wire_tag["repo_id"],
                commit_id=wire_tag["commit_id"],
                tag=wire_tag["tag"],
                created_at=wire_tag["created_at"],
            ))
            write_tag(repo_root, tag_record)
            tags_written += 1
        except (KeyError, ValueError, TypeError) as exc:
            # TypeError covers decoded tag entries that are not JSON objects.
            logger.warning("⚠️ apply_mpack: malformed tag — skipped: %s", exc)

    logger.info(
        "✅ Applied pack: %d new blobs, %d new snapshots, %d new commits, %d tags (%d blobs skipped)",
        blobs_written,
        snapshots_written,
        commits_written,
        tags_written,
        blobs_skipped,
    )
    return ApplyResult(
        commits_written=commits_written,
        snapshots_written=snapshots_written,
        blobs_written=blobs_written,
        blobs_skipped=blobs_skipped,
        tags_written=tags_written,
        failed_blobs=list(failed_blob_ids),
        skipped_snapshots=list(skipped_snapshot_ids),
    )
File History
13 commits
sha256:889892bcd79c5b77cfb314c90ff6caaeb9f6575dc0cab81b85259d5bbb235971
type fix
Human
patch
3 days ago
sha256:4d09a52c06fbc389006963ad1e5ca6ee48c3cb72799f1a322561035b263db67d
merge conflict resolve
Human
patch
3 days ago
sha256:b5ec4e4a3a73cae0cd08224f32090f2a4836afa0a804cb3231e70c42a3e89295
fix adapter for agent config
Human
patch
3 days ago
sha256:8e790785ceed39352c123388b8680542965afb7b2305a48827a9fbf4f3ae75fb
fix: build_wire_mpack decompresses zstd-encoded blobs befor…
Sonnet 4.6
patch
3 days ago
sha256:2c59968e5fd34f1740180d630338fddfb8c465b71e150a0965f11dbdcba5dec7
fix: apply_mpack refuses commits when their snapshot is abs…
Sonnet 4.6
patch
3 days ago
sha256:31316ea2e76b244dea3dd67bd0e608d56279ab9fdd2d4ecacbf41060d91f323f
merge dev → main: rc11, urllib migration, object store fixe…
Sonnet 4.6
minor
⚠
5 days ago
sha256:6a70ad4088ca3c07a6b1d85b257a735aa0927db2fc017ed046b982fe95805450
perf: remove debug print statements from mpack hot paths; f…
Sonnet 4.6
patch
5 days ago
sha256:35855b7bbce81b93612c655e23587bdebbb5fc09856ff5cbf5cd5b195d0547d4
merge: dev → main (rc11, urllib migration, object store inv…
Sonnet 4.6
patch
8 days ago
sha256:633dfa2940e97bf1a3d04996c772027a57d70d103f1693c96da04969613dba6c
fix: urllib migration regressions — force flag, job_id, Con…
Sonnet 4.6
minor
⚠
8 days ago
sha256:8ca6bcf5ebccd1b36d8ec716358b2b634d55e75db19c1d5d339082664c773006
fix: include directories in snapshot deltas sent in push mpack
Sonnet 4.6
9 days ago
sha256:85a465e59c29055c46f6a2a9efd5e2036f66857c638240b528e63b64b5d1d5af
fix: topo-sort snapshot delta chain in apply_mpack; decompr…
Sonnet 4.6
10 days ago
sha256:dbebcbb60d67ec6ee0f09918db3266576c7ca66e93381a69b67ba40fe74174f3
fix: decompress zstd blobs in build_wire_mpack — pack secti…
Sonnet 4.6
patch
10 days ago
sha256:36c3cb3e76619d4c30a6d9bf81b5ec4ff148e30dcfed913e3114ca7b43b81c7e
fix: rename objects→blobs in push client and all stale test…
Sonnet 4.6
patch
11 days ago