gabriel / muse public
mpack.py python
1,401 lines 54.0 KB Hotspot
Raw
1 """Muse MPack format — mpack of commits, snapshots, and blobs for wire transfer.
2
3 An :class:`MPack` is the unit of exchange between the Muse CLI and a remote
4 (e.g. MuseHub). It carries everything needed to reconstruct a slice of commit
5 history locally:
6
7 - :class:`CommitDict` records (full metadata + agent provenance)
8 - :class:`SnapshotDict` records (file manifests)
9 - :class:`BlobPayload` entries (raw blob bytes)
10 - ``summary`` (:class:`MPackSummary`) — advisory counts for agent routing
11
12 :func:`build_mpack` collects all data reachable from a set of commit IDs and
13 populates the summary field.
14 :func:`apply_mpack` writes an mpack into a local ``.muse/`` directory.
15
16 MPack wire encoding
17 ---------------------
18 An MPack is encoded in the MPack binary wire format (``b"MUSE"`` magic,
19 section table, SHA-256 footer) and transmitted with
20 ``Content-Type: application/x-muse-pack``.
21
22 Agent contract
23 --------------
24
25 - ``exit_code`` 0: all data applied successfully.
26 - ``exit_code`` 1: validation error (malformed object ID, path traversal).
27 - ``exit_code`` 3: I/O error reading from local store.
28 - ``duration_ms``: wall-clock milliseconds for build or apply.
29 """
30
31 import collections
32 import datetime
33 import hashlib as _hashlib
34 import logging
35 import os
36 import pathlib
37 import struct as _struct
38 from typing import TypedDict
39
40 from muse.core.graph import iter_ancestors
41 from muse.core.object_availability import ObjectState, load_promisor_remotes, object_state
42 from muse.core.object_store import has_object, read_object
43 from muse.core.pack_store import write_pack
44 from muse.core.ids import hash_snapshot
45 from muse.core.validation import (
46 MAX_OBJECT_WRITE_BYTES,
47 MAX_PACK_OBJECTS,
48 validate_object_id,
49 validate_workspace_path,
50 )
51 from muse.core.types import BranchHeads, blob_id, short_id
52 from muse.core.commits import (
53 CommitDict,
54 CommitRecord,
55 MissingParentError,
56 read_commit,
57 write_commit,
58 )
59 from muse.core.snapshots import (
60 SnapshotDict,
61 SnapshotRecord,
62 read_snapshot,
63 write_snapshot,
64 )
65 from muse.core.tags import (
66 TagDict,
67 TagRecord,
68 get_all_tags,
69 get_tags_for_commit,
70 write_tag,
71 )
72
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
74
75 # ---------------------------------------------------------------------------
76 # Type aliases — avoid bare dict[str, X] at boundaries
77 # ---------------------------------------------------------------------------
78
# Snapshot manifest: repository path → object_id.
_Manifest = dict[str, str]
# Scalar value types permitted in loose metadata.
_JsonValue = str | int | float | bool | None
# Loose metadata (dynamic keys, JSON-serialisable values).
_MetaDict = dict[str, _JsonValue]
# snapshot_id → (full manifest, directories).
_SnapshotResolvedMap = dict[str, tuple[_Manifest, list[str]]]
83
84 # ---------------------------------------------------------------------------
85 # Wire-format TypedDicts
86 # ---------------------------------------------------------------------------
87
class _BlobPayloadBase(TypedDict):
    """Mandatory keys shared by every blob entry carried in an MPack.

    Split out from :class:`BlobPayload` so that these two keys stay required
    while the encoding metadata declared there remains optional.
    """

    # Identifier of the blob this entry carries.
    object_id: str
    # Blob bytes as carried in the pack.
    content: bytes
93
class BlobPayload(_BlobPayloadBase, total=False):
    """One content-addressed blob plus optional transfer-encoding metadata.

    Always present (inherited from :class:`_BlobPayloadBase`):
        object_id: Content-addressed SHA-256 identifier (``sha256:<hex>``).
        content:   Raw or encoded bytes, depending on *encoding*.

    May be omitted (leave them out for a ``"raw"`` blob with no base):
        path:     Repository path of the blob; the server uses it to look up
                  delta base candidates for the next push.
        encoding: ``"raw"`` (default) | ``"zlib"`` | ``"delta+zlib"``.
                  :func:`build_mpack_from_walk` additionally emits ``"zstd"``
                  when called with ``compress=True``.
        base_id:  Base blob ID for the ``"delta+zlib"`` encoding.
        sz:       Uncompressed byte count of the target blob.  Required with
                  ``"delta+zlib"`` so the server can pre-allocate before
                  decompression; ignored for ``"raw"`` payloads.
    """

    # Repository path of the blob.
    path: str
    # Transfer encoding of ``content``.
    encoding: str
    # Delta base blob ID.
    base_id: str
    # Uncompressed size in bytes.
    sz: int
115
class WireTag(TypedDict):
    """Wire form of a tag record, carried in the ``tags`` section of an :class:`MPack`.

    All five keys are required.
    """

    # Identifier of the tag record itself.
    tag_id: str
    # Repository the tag belongs to.
    repo_id: str
    # Commit the tag is attached to.
    commit_id: str
    # The tag value.
    tag: str
    # Creation timestamp as a string (``_tags_for_commits`` writes ``isoformat()`` output).
    created_at: str
124
class MPackMeta(TypedDict, total=False):
    """Scope declaration embedded in every :class:`MPack`.

    Lets an agent learn what the pack covers without inspecting its commits
    or objects.  Every key is optional (``total=False``).

    Keys:
        mode:         ``"full"`` or ``"incremental"``.  A full pack expects
                      every referenced object to be in the pack or in the
                      local store.  An incremental pack leaves out objects the
                      receiver is expected to hold at its base, declared in
                      ``base_commits``.
        base_commits: Commit IDs passed as ``--have`` when the pack was built;
                      empty for full packs.
        created_at:   ISO 8601 UTC timestamp of when the pack was assembled.
    """

    #: "full" | "incremental"
    mode: str
    #: sha256:-prefixed commit IDs
    base_commits: list[str]
    #: ISO 8601 — e.g. "2026-01-01T00:00:00Z"
    created_at: str
144
class MPackSummary(TypedDict, total=False):
    """Advisory counters embedded in every :class:`MPack`.

    Intended for routing / accept / reject decisions made before commits or
    objects are touched.  Nothing here is authoritative: receivers may use it
    to optimise but must not depend on it for correctness.

    Keys:
        commits_count: Number of commits in the pack.
        blobs_count:   Number of unique blobs in the pack.
        blobs_bytes:   Total uncompressed blob bytes.
        branches:      Branch name → tip commit_id at build time.
        agent_ids:     Every ``agent_id`` value found on commits in the pack.
    """

    commits_count: int
    blobs_count: int
    blobs_bytes: int
    #: branch_name → commit_id
    branches: BranchHeads
    #: distinct agent_ids in commits
    agent_ids: list[str]
165
166 class SnapshotDeltaDict(TypedDict, total=False):
167 """Wire representation of a snapshot as a delta from its parent snapshot.
168
169 Guiding principle: content-addressing is a proof, not a label.
170 ``snapshot_id = sha256(sorted path-NUL-oid pairs)``. A receiver who
171 holds ``snapshot_id`` and the delta can reconstruct the full manifest
172 and verify it by hashing — no external store needed.
173
174 Fields:
175 snapshot_id: sha256 of the *full* manifest (the proof).
176 parent_snapshot_id: snapshot_id of the parent, or ``None`` for root.
177 delta_upsert: Paths added or changed relative to parent.
178 delta_remove: Paths removed relative to parent.
179
180 Reconstruction::
181
182 manifest = dict(resolved[parent_snapshot_id]) # or {} if None
183 manifest.update(delta_upsert)
184 for path in delta_remove:
185 del manifest[path]
186 assert hash_snapshot(manifest) == snapshot_id # the math IS the proof
187 """
188
189 snapshot_id: str
190 parent_snapshot_id: str | None
191 delta_upsert: dict[str, str] # path → object_id
192 delta_remove: list[str] # paths removed
193
194
class MPack(TypedDict, total=False):
    """The unit of exchange between the Muse CLI and a remote.

    Every key is optional, so partial packs (fetch-only, objects-only) are
    valid wire messages; consumers must test for a key before reading it.

    ``summary`` holds advisory metadata for agent routing, usable without
    deserialising commits or objects.

    ``meta`` states the pack's scope (full vs incremental) and its base
    commits, so a receiver can verify the pack without out-of-band knowledge
    of how it was built.

    ``snapshots`` holds :class:`SnapshotDeltaDict` entries in commit-graph
    order, oldest first.  The first entry has ``parent_snapshot_id=None`` and
    a ``delta_upsert`` equal to the full manifest; each later entry carries
    only the paths that changed.  Receivers rebuild full manifests by applying
    the chain and confirm each one by hashing the result.
    """

    commits: list[CommitDict]
    snapshots: list[SnapshotDeltaDict]
    blobs: list[BlobPayload]
    #: Tags attached to any commit included in this mpack.
    tags: list[WireTag]
    #: Advisory summary — populated by :func:`build_mpack`.
    summary: MPackSummary
    #: Self-describing metadata — always written by :func:`build_mpack`.
    meta: MPackMeta
224
class RemoteInfo(TypedDict, total=False):
    """Repository metadata as returned by ``GET {url}/refs``.

    Declared with ``total=False``, although the per-field notes below mark
    every key as always present.
    """

    #: always present
    repo_id: str
    #: always present
    domain: str
    #: Maps branch name → commit ID for every branch on the remote (always present).
    branch_heads: BranchHeads
    #: always present
    default_branch: str
233
class PushResult(TypedDict):
    """What the server answers after a push attempt.  All keys are required."""

    # Whether the push was accepted.
    ok: bool
    # Message accompanying the result.
    message: str
    #: Updated branch heads on the remote after the push (if successful).
    branch_heads: BranchHeads
241
class FetchRequest(TypedDict, total=False):
    """Request body for ``POST {url}/fetch``, used to negotiate a transfer.

    Keys:
        want: Commit IDs the client wants to receive.
        have: Commit IDs the client already holds locally, so the server can
              send only what is missing (delta negotiation).
    """

    want: list[str]
    have: list[str]
252
class ApplyResult(TypedDict):
    """Tally returned by :func:`apply_mpack` of what it wrote.

    Keys:
        commits_written:   New commits written.
        snapshots_written: New snapshots written.
        blobs_written:     New blobs written.
        blobs_skipped:     Blobs already in the store, left untouched
                           (idempotent re-apply).
        tags_written:      Tag records written from the pack's ``tags``
                           section; 0 when the pack carries no tag data.
        failed_blobs:      Blob IDs that failed integrity or write checks.
        skipped_snapshots: Snapshot IDs skipped because a blob they reference
                           failed.

    Apart from ``blobs_skipped``, the counters reflect *new* writes only.
    """

    commits_written: int
    snapshots_written: int
    blobs_written: int
    blobs_skipped: int
    tags_written: int
    failed_blobs: list[str]
    skipped_snapshots: list[str]
271
272 # ---------------------------------------------------------------------------
273 # Pack building
274 # ---------------------------------------------------------------------------
275
class _WalkResult(TypedDict):
    """Cached outcome of one BFS walk over the commit graph.

    :func:`walk_commits` produces it once; :func:`collect_blob_ids_from_walk`
    (object ID collection) and :func:`build_mpack_from_walk` (loading blobs
    for transmission) both consume it.  Sharing the result saves a push from
    running the same traversal twice: once to gather object IDs for
    client-side deduplication and once to assemble the pack.

    ``missing_snapshots`` lists the snapshot_ids of reachable commits whose
    snapshot file is absent from the local store.  Callers should surface it
    to the user before pushing, because a pack holding a commit without its
    snapshot leaves a dangling reference on the remote.
    """

    # Commits selected by the walk.
    commits: list[CommitRecord]
    # snapshot_ids referenced by ``commits``.
    snapshot_ids: set[str]
    # Sorted, deduplicated — blobs_to_send = manifest_blobs - have_blobs.
    all_blob_ids: list[str]
    # Blob IDs reachable from any have-commit's snapshot.
    have_blobs: set[str]
    # Blob IDs referenced by new commits' manifests (old and new alike).
    manifest_blobs: set[str]
    # blob_id → repository path (from snapshot manifests).
    oid_to_path: dict[str, str]
    # snapshot_ids present in commits but absent on disk.
    missing_snapshots: set[str]
    # Pre-computed deltas, reused by build_mpack_from_walk.
    snapshot_deltas: list[SnapshotDeltaDict]
302
def walk_commits(
    repo_root: pathlib.Path,
    commit_ids: list[str],
    *,
    have: list[str] | None = None,
) -> _WalkResult:
    """Walk the commit graph from *commit_ids*, pruning at *have*.

    The returned :class:`_WalkResult` feeds both
    :func:`collect_blob_ids_from_walk` and :func:`build_mpack_from_walk`, so a
    caller that needs the object ID list *and* the pack walks the graph only
    once.  It is the single source of truth for what goes into a push mpack.

    Traversal is delegated to ``iter_ancestors`` with a ``prune`` predicate
    that is true for every commit ID in *have*.

    When ``_build_snapshot_deltas`` raises ``ValueError`` (a snapshot is
    missing locally), the result has an empty ``snapshot_deltas`` list, and
    ``missing_snapshots`` names every reachable snapshot that cannot be read.
    One warning is logged per missing snapshot.
    """
    have_set: set[str] = set(have) if have else set()

    def _known_to_receiver(cid: str) -> bool:
        return cid in have_set

    commits_to_send: list[CommitRecord] = list(
        iter_ancestors(repo_root, commit_ids, prune=_known_to_receiver)
    )

    # Blobs referenced by the have-commits' snapshots are already on the
    # remote; they are subtracted below so only new blobs are listed.
    have_blobs: set[str] = set()
    for have_cid in have_set:
        boundary_commit = read_commit(repo_root, have_cid)
        if boundary_commit is None:
            continue
        boundary_snap = read_snapshot(repo_root, boundary_commit.snapshot_id)
        if boundary_snap is None:
            continue
        have_blobs.update(boundary_snap.manifest.values())

    snapshot_ids: set[str] = {record.snapshot_id for record in commits_to_send}

    # Deltas are computed over the reversed walk order (one read_snapshot per
    # distinct snapshot); blob IDs are then derived from them in memory.
    missing_snapshots: set[str] = set()
    try:
        snapshot_deltas = _build_snapshot_deltas(repo_root, commits_to_send[::-1])
    except ValueError:
        # A snapshot could not be read: record every unreadable one.
        missing_snapshots = {
            candidate
            for candidate in snapshot_ids
            if read_snapshot(repo_root, candidate) is None
        }
        snapshot_deltas = []

    manifest_blobs: set[str] = set(collect_blob_ids_from_deltas(snapshot_deltas))

    # Invert path → oid into oid → path; a later delta overwrites an earlier one.
    oid_to_path: dict[str, str] = {
        oid: path
        for delta in snapshot_deltas
        for path, oid in (delta.get("delta_upsert") or {}).items()
    }

    for lost_sid in sorted(missing_snapshots):
        logger.warning(
            "⚠️ walk_commits: snapshot %s is missing from the local store — "
            "the commit(s) referencing it will be excluded from the pack. "
            "Run `muse verify` to audit store integrity.",
            lost_sid,
        )

    return _WalkResult(
        commits=commits_to_send,
        snapshot_ids=snapshot_ids,
        all_blob_ids=sorted(manifest_blobs - have_blobs),
        have_blobs=have_blobs,
        manifest_blobs=manifest_blobs,
        oid_to_path=oid_to_path,
        missing_snapshots=missing_snapshots,
        snapshot_deltas=snapshot_deltas,
    )
383
def stream_blob_chunks(
    repo_root: pathlib.Path,
    blob_ids: list[str],
    chunk_size: int,
) -> "collections.abc.Iterator[list[BlobPayload]]":
    """Lazily yield lists of at most *chunk_size* blob payloads read from disk.

    Hot path for ``muse push``.  Blobs are read one at a time, and a chunk is
    handed to the caller as soon as it fills, so peak memory is one chunk
    rather than the whole set and the first upload can start while later
    chunks are still being read.  The last chunk may be shorter.

    A blob that ``read_object`` cannot find is logged with a warning and
    skipped.
    """
    import collections.abc  # kept local, as in the original ("to avoid circular at module level")

    pending: list[BlobPayload] = []
    for blob_oid in blob_ids:
        payload_bytes = read_object(repo_root, blob_oid)
        if payload_bytes is not None:
            pending.append(BlobPayload(object_id=blob_oid, content=payload_bytes))
            if len(pending) >= chunk_size:
                yield pending
                pending = []
        else:
            logger.warning("⚠️ stream_blob_chunks: blob %s absent — skipping", blob_oid)
    if pending:
        yield pending
412
def collect_blob_ids_from_walk(walk: _WalkResult) -> list[str]:
    """Hand back the blob ID list stored in a :func:`walk_commits` result.

    Pure lookup with no disk I/O.  The very list held under
    ``walk["all_blob_ids"]`` is returned, not a copy.
    """
    blob_ids: list[str] = walk["all_blob_ids"]
    return blob_ids
419
420
def collect_blob_ids_from_deltas(deltas: list[SnapshotDeltaDict]) -> list[str]:
    """Return the sorted, de-duplicated object IDs found in a delta list.

    Zero additional disk I/O: only the values of each entry's ``delta_upsert``
    are read.  The first delta encodes the full base manifest and each later
    one encodes the changed paths, so the union of their upserts equals the
    union of all full manifests.  Every object ID a manifest references
    appears in at least one ``delta_upsert``; the same ID may appear in
    several.

    An entry whose ``delta_upsert`` is missing or ``None`` contributes
    nothing, the same defensive reading :func:`walk_commits` applies.

    Args:
        deltas: Delta entries, e.g. as produced by ``_build_snapshot_deltas``.

    Returns:
        Sorted list of unique object IDs.
    """
    unique_oids: set[str] = set()
    for delta in deltas:
        # ``or {}`` also covers a key that is present but explicitly None.
        unique_oids.update((delta.get("delta_upsert") or {}).values())
    return sorted(unique_oids)
438
439
def _build_snapshot_deltas(
    repo_root: pathlib.Path,
    commits_oldest_first: list[CommitRecord],
) -> list[SnapshotDeltaDict]:
    """Turn a commit chain (oldest first) into delta-encoded snapshot entries.

    Each entry describes its snapshot relative to the *previous entry emitted
    by this function*.  The first entry has ``parent_snapshot_id=None`` and a
    ``delta_upsert`` holding the full manifest.  A snapshot_id already emitted
    for an earlier commit is not emitted again.

    Invariant the receiver checks (content-addressing as proof)::

        manifest = apply_delta(prev_manifest, entry)
        assert hash_snapshot(manifest) == entry["snapshot_id"]

    Raises:
        ValueError: A commit's snapshot cannot be read from the local store.
    """
    entries: list[SnapshotDeltaDict] = []
    emitted: set[str] = set()
    previous_manifest: dict[str, str] = {}
    previous_sid: str | None = None

    for record in commits_oldest_first:
        snapshot_id = record.snapshot_id
        if snapshot_id in emitted:
            continue
        emitted.add(snapshot_id)

        snapshot = read_snapshot(repo_root, snapshot_id)
        if snapshot is None:
            raise ValueError(
                f"Push aborted: snapshot {snapshot_id} is missing from the local store "
                f"but is required by a commit being sent. "
                f"Run 'muse verify' to audit store integrity."
            )

        current_manifest = snapshot.manifest
        # Same upsert/remove computation as the public helper in this module.
        upserts, removals = compute_snapshot_delta(previous_manifest, current_manifest)

        entries.append(
            SnapshotDeltaDict(
                snapshot_id=snapshot_id,
                parent_snapshot_id=previous_sid,
                delta_upsert=upserts,
                delta_remove=removals,
                directories=snapshot.directories or [],
            )
        )
        previous_manifest = current_manifest
        previous_sid = snapshot_id

    return entries
492
493
def _apply_snapshot_deltas(
    raw_snapshots: list[SnapshotDeltaDict],
) -> _SnapshotResolvedMap:
    """Reconstruct and verify full manifests from a list of wire snapshots.

    Entries are processed in order.  Each reconstructed manifest is hashed
    with ``hash_snapshot`` and kept only when the hash equals the entry's
    ``snapshot_id`` — the hash check is the integrity proof.

    Two entry shapes are accepted:

    - Delta format (as built by ``_build_snapshot_deltas``):
      ``{snapshot_id, parent_snapshot_id, delta_upsert, delta_remove,
      directories}``.  It is rebuilt from the already-resolved parent plus
      the diff.
    - Full-manifest format: ``{snapshot_id, manifest, directories, ...}``.
      The manifest is used directly.

    ``directories`` is honoured for *both* shapes.  The builder sends it with
    every delta entry and it is passed to ``hash_snapshot``; reading it only
    for the full-manifest shape made delta entries hash without their
    directories and discarded them from the result.

    Corrupt or hash-mismatched entries are logged and skipped; they do not
    block independent valid entries.  An entry whose parent was skipped is
    rebuilt from ``{}`` and is therefore skipped when its hash fails to
    match.  Values of the wrong type (non-dict upsert, non-list remove,
    non-string keys/values/paths) are ignored rather than raising, so a
    malformed entry goes through the same hash-and-skip path.

    Returns:
        ``{snapshot_id: (full_manifest, directories)}`` for every valid entry.
    """
    resolved: _SnapshotResolvedMap = {}
    for snap in raw_snapshots:
        sid = snap.get("snapshot_id", "")
        if not isinstance(sid, str) or not sid:
            continue

        # Directories feed the hash for both formats, so read them up front.
        dirs_raw = snap.get("directories")
        directories: list[str] = (
            [d for d in dirs_raw if isinstance(d, str)]
            if isinstance(dirs_raw, list)
            else []
        )

        manifest_raw = snap.get("manifest")
        if isinstance(manifest_raw, dict):
            # Full-manifest format: take the manifest as given.
            base = {
                k: v
                for k, v in manifest_raw.items()
                if isinstance(k, str) and isinstance(v, str)
            }
        else:
            # Delta format: start from the resolved parent (or empty) and
            # apply upserts, then removals.
            parent_sid = snap.get("parent_snapshot_id")
            parent_entry = (
                resolved.get(parent_sid)
                if isinstance(parent_sid, str) and parent_sid
                else None
            )
            base = dict(parent_entry[0]) if parent_entry else {}

            upsert_raw = snap.get("delta_upsert")
            if isinstance(upsert_raw, dict):
                base.update({
                    k: v
                    for k, v in upsert_raw.items()
                    if isinstance(k, str) and isinstance(v, str)
                })

            remove_raw = snap.get("delta_remove")
            if isinstance(remove_raw, list):
                for path in remove_raw:
                    if isinstance(path, str):
                        base.pop(path, None)

        # Content-addressing IS the proof — hash the result.
        try:
            got = hash_snapshot(base, directories or None)
        except ValueError as _hash_exc:
            logger.warning(
                "⚠️ apply_mpack: snapshot %s has invalid object IDs in delta — skipped: %s",
                sid[:20], _hash_exc,
            )
            continue
        if got != sid:
            logger.warning(
                "⚠️ apply_mpack: snapshot %s hash mismatch "
                "(reconstructed=%s) — skipped",
                sid[:20], got[:20],
            )
            continue
        resolved[sid] = (base, directories)
    return resolved
561
562
def build_mpack_from_walk(
    repo_root: pathlib.Path,
    walk: _WalkResult,
    *,
    only_blobs: set[str] | None = None,
    repo_id: str = "",
    compress: bool = False,
) -> MPack:
    """Assemble an :class:`MPack` from a pre-computed :func:`walk_commits` result.

    Avoids the second graph traversal that :func:`build_mpack` would perform.
    Blob bytes are read only for IDs in ``walk["all_blob_ids"]`` that are also
    in *only_blobs* (or for all of them when *only_blobs* is ``None``).

    Args:
        repo_root:  Root of the Muse repository.
        walk:       Result of :func:`walk_commits`.
        only_blobs: Optional whitelist of blob IDs to include.
        repo_id:    Repository ID used to look up tags; when empty, no tags
                    are included.
        compress:   When ``True`` each blob is zstd-compressed (level 3) and
                    sent with ``encoding="zstd"``, unless the compressed form
                    is not smaller, in which case the raw bytes are sent.

    Returns:
        An :class:`MPack` ready for serialisation and transfer.
        ``summary["blobs_bytes"]`` is the total *uncompressed* size of the
        included blobs, as :class:`MPackSummary` documents, whether or not
        *compress* is set.

    Raises:
        ValueError: The walk reports missing snapshots.  Skipping them would
            push commits without their snapshots and leave dangling
            references on the remote.

    Note:
        Unlike :func:`build_mpack`, a blob absent from the local store is
        logged and skipped here rather than raising.
    """
    missing_snapshots: set[str] = walk.get("missing_snapshots") or set()
    if missing_snapshots:
        sample_str = ", ".join(sorted(missing_snapshots)[:3])
        raise ValueError(
            f"Push aborted: {len(missing_snapshots)} snapshot(s) are missing from "
            f"the local store but are required by commits being sent "
            f"({sample_str}{'…' if len(missing_snapshots) > 3 else ''}). "
            f"Run 'muse verify' to audit store integrity."
        )

    commits_to_send = list(walk["commits"])
    snapshot_deltas = walk["snapshot_deltas"]
    all_blob_ids: set[str] = set(walk["all_blob_ids"])
    candidate_blob_ids = (
        all_blob_ids & only_blobs if only_blobs is not None else all_blob_ids
    )

    # Import and build the compressor only when asked for, so the zstandard
    # dependency stays optional for uncompressed packs.
    cctx = None
    if compress:
        import zstandard as _zstd
        cctx = _zstd.ZstdCompressor(level=3)

    blob_payloads: list[BlobPayload] = []
    total_bytes = 0  # uncompressed bytes, independent of the wire encoding
    for oid in sorted(candidate_blob_ids):
        raw = read_object(repo_root, oid)
        if raw is None:
            logger.warning("⚠️ build_mpack_from_walk: blob %s absent — skipping", oid)
            continue
        total_bytes += len(raw)
        if cctx is not None:
            compressed = cctx.compress(raw)
            if len(compressed) < len(raw):
                blob_payloads.append(
                    BlobPayload(object_id=oid, content=compressed, encoding="zstd")
                )
                continue
        # Uncompressed mode, or compression did not shrink the blob.
        blob_payloads.append(BlobPayload(object_id=oid, content=raw))

    sent_commit_ids = [c.commit_id for c in commits_to_send]
    wire_tags = _tags_for_commits(repo_root, sent_commit_ids, repo_id) if repo_id else []

    # Serialise each commit once; the dicts feed both the summary and the pack.
    commit_dicts = [c.to_dict() for c in commits_to_send]
    agent_ids = sorted({
        d.get("agent_id", "") for d in commit_dicts if d.get("agent_id")
    })
    summary = MPackSummary(
        commits_count=len(commits_to_send),
        blobs_count=len(blob_payloads),
        blobs_bytes=total_bytes,
        branches={},
        agent_ids=agent_ids,
    )
    mpack: MPack = {
        "commits": commit_dicts,
        "snapshots": snapshot_deltas,
        "blobs": blob_payloads,
        "summary": summary,
        # NOTE(review): the walk result does not carry the have-commit IDs, so
        # the pack is always declared "full" here — confirm this is intended
        # for walks that were pruned with ``have``.
        "meta": MPackMeta(
            mode="full",
            base_commits=[],
            created_at=datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        ),
    }
    if wire_tags:
        mpack["tags"] = wire_tags

    logger.info(
        "✅ Built MPack (from walk): %d commits, %d snapshots, %d blobs, %d tags",
        len(commits_to_send),
        len(snapshot_deltas),
        len(blob_payloads),
        len(wire_tags),
    )
    return mpack
667
def _tags_for_commits(
    repo_root: pathlib.Path, commit_ids: list[str], repo_id: str
) -> list[WireTag]:
    """Collect the tags attached to *commit_ids* as :class:`WireTag` dicts.

    Tags are looked up per commit with ``get_tags_for_commit``.  A tag whose
    ``tag_id`` was already emitted is not emitted again.  ``created_at`` is
    serialised with ``isoformat()``.
    """
    emitted_tag_ids: set[str] = set()
    collected: list[WireTag] = []
    for commit_id in commit_ids:
        for record in get_tags_for_commit(repo_root, repo_id, commit_id):
            if record.tag_id in emitted_tag_ids:
                continue
            emitted_tag_ids.add(record.tag_id)
            collected.append(
                WireTag(
                    tag_id=record.tag_id,
                    repo_id=record.repo_id,
                    commit_id=record.commit_id,
                    tag=record.tag,
                    created_at=record.created_at.isoformat(),
                )
            )
    return collected
686
def build_mpack(
    repo_root: pathlib.Path,
    commit_ids: list[str],
    *,
    have: list[str] | None = None,
    only_blobs: set[str] | None = None,
    repo_id: str = "",
) -> MPack:
    """Assemble an :class:`MPack` from *commit_ids*, excluding commits in *have*.

    Walks the commit graph from every ID in *commit_ids* via
    :func:`walk_commits`, stopping at any commit in *have*, then loads the
    snapshot deltas and blob bytes for the selected commits.

    Args:
        repo_root:  Root of the Muse repository.
        commit_ids: Tip commit IDs to include (e.g. current branch HEAD).
        have:       Commit IDs already known to the receiver.  The walk stops
                    at these, reducing mpack size.  Pass ``None`` or ``[]``
                    to send the full history.
        only_blobs: When set, only include blobs whose IDs are in this set.
                    Pass the set of blobs missing from the remote so the
                    client only uploads what the remote actually needs.
        repo_id:    Repository content ID used to look up tags.  When omitted,
                    tags are not included in the mpack.

    Returns:
        An :class:`MPack` ready for serialisation and transfer.  ``meta.mode``
        is ``"incremental"`` when *have* is non-empty, otherwise ``"full"``.

    Raises:
        ValueError: A snapshot required by a commit being sent is missing
            locally, or a blob is missing locally and its state is not
            ``ObjectState.PROMISED``.  Blobs in the PROMISED state are skipped
            with a debug log instead.
    """
    walk = walk_commits(repo_root, commit_ids, have=have)
    missing_snapshots = walk["missing_snapshots"]
    if missing_snapshots:
        sample_str = ", ".join(sorted(missing_snapshots)[:3])
        raise ValueError(
            f"Push aborted: {len(missing_snapshots)} snapshot(s) are missing from "
            f"the local store but are required by commits being sent "
            f"({sample_str}{'…' if len(missing_snapshots) > 3 else ''}). "
            f"Run 'muse verify' to audit store integrity."
        )

    commits_to_send: list[CommitRecord] = list(walk["commits"])
    snapshot_deltas = walk["snapshot_deltas"]
    all_blob_ids: set[str] = set(walk["all_blob_ids"])

    # When only_blobs is provided skip any blob the remote already has —
    # only transmit the missing delta.
    candidate_blob_ids = (
        all_blob_ids & only_blobs if only_blobs is not None else all_blob_ids
    )

    promisor_remotes = load_promisor_remotes(repo_root)
    blob_payloads: list[BlobPayload] = []
    for oid in sorted(candidate_blob_ids):
        raw = read_object(repo_root, oid)
        if raw is None:
            # Absent locally: tolerated only when the object state is PROMISED.
            state = object_state(repo_root, oid, promisor_remotes)
            if state == ObjectState.PROMISED:
                logger.debug("build_mpack: blob %s is PROMISED — skipping", short_id(oid))
                continue
            raise ValueError(
                f"Pack aborted: blob {oid} is missing from the local store "
                f"and no promisor remote is configured. "
                f"Run 'muse verify' to audit store integrity."
            )
        blob_payloads.append(BlobPayload(object_id=oid, content=raw))

    sent_commit_ids = [c.commit_id for c in commits_to_send]
    wire_tags = _tags_for_commits(repo_root, sent_commit_ids, repo_id) if repo_id else []

    total_bytes = sum(len(b["content"]) for b in blob_payloads)
    # Serialise each commit once; the dicts feed both the summary and the pack.
    commit_dicts = [c.to_dict() for c in commits_to_send]
    agent_ids = sorted({
        d.get("agent_id", "") for d in commit_dicts if d.get("agent_id")
    })
    summary = MPackSummary(
        commits_count=len(commits_to_send),
        blobs_count=len(blob_payloads),
        blobs_bytes=total_bytes,
        branches={},
        agent_ids=agent_ids,
    )
    _have_list = list(have or [])
    mpack: MPack = {
        "commits": commit_dicts,
        "snapshots": snapshot_deltas,
        "blobs": blob_payloads,
        "summary": summary,
        "meta": MPackMeta(
            mode="incremental" if _have_list else "full",
            base_commits=_have_list,
            created_at=datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        ),
    }
    if wire_tags:
        mpack["tags"] = wire_tags

    logger.info(
        "✅ Built MPack: %d commits, %d snapshots, %d blobs, %d tags",
        len(commits_to_send),
        len(snapshot_deltas),
        len(blob_payloads),
        len(wire_tags),
    )
    return mpack
794
795 # ---------------------------------------------------------------------------
796 # Object ID collection — for pre-push deduplication negotiation
797 # ---------------------------------------------------------------------------
798
def collect_blob_ids(
    repo_root: pathlib.Path,
    commit_ids: list[str],
    *,
    have: list[str] | None = None,
) -> list[str]:
    """List the blob IDs reachable from *commit_ids* that *have* does not cover.

    Walks the graph with ``iter_ancestors``, pruning at every commit in
    *have*, and reads only commits and snapshot manifests — never blob
    content.  ``muse push`` compares the result against what the remote
    already holds, then calls :func:`build_mpack` with ``only_blobs`` set to
    the missing subset, so blob bytes are loaded only once they are known to
    be needed.

    Commits or snapshots that cannot be read are silently left out.

    Args:
        repo_root:  Root of the Muse repository.
        commit_ids: Tip commit IDs to examine.
        have:       Commit IDs already known to the receiver (the walk stops
                    here).

    Returns:
        Sorted list of object IDs referenced by the examined commits'
        snapshots, minus those referenced by the have-commits' snapshots.
    """
    have_set: set[str] = set(have) if have else set()

    def _known_to_receiver(cid: str) -> bool:
        return cid in have_set

    commits_to_examine: list[CommitRecord] = list(
        iter_ancestors(repo_root, commit_ids, prune=_known_to_receiver)
    )

    # Blobs referenced by the have-commits' snapshots are already remote-side.
    have_blobs: set[str] = set()
    for have_cid in have_set:
        boundary_commit = read_commit(repo_root, have_cid)
        if boundary_commit is None:
            continue
        boundary_snap = read_snapshot(repo_root, boundary_commit.snapshot_id)
        if boundary_snap is None:
            continue
        have_blobs.update(boundary_snap.manifest.values())

    reachable: set[str] = set()
    for sid in {record.snapshot_id for record in commits_to_examine}:
        snapshot = read_snapshot(repo_root, sid)
        if snapshot is None:
            continue
        reachable.update(snapshot.manifest.values())

    return sorted(reachable.difference(have_blobs))
847
848
def compute_snapshot_delta(
    base: _Manifest,
    new: _Manifest,
) -> tuple[_Manifest, list[str]]:
    """Diff two snapshot manifests.

    Returns:
        ``(added_or_modified, removed)`` where

        - ``added_or_modified`` maps each path in *new* whose object_id
          differs from ``base.get(path)`` (new paths included) to its
          object_id in *new*;
        - ``removed`` lists the paths found in *base* but not in *new*, in
          *base* iteration order.
    """
    changed: _Manifest = {}
    for path, object_id in new.items():
        if base.get(path) != object_id:
            changed[path] = object_id

    dropped: list[str] = []
    for path in base:
        if path not in new:
            dropped.append(path)

    return changed, dropped
862
863
def apply_snapshot_delta(
    base: _Manifest,
    added: _Manifest,
    removed: list[str],
) -> _Manifest:
    """Rebuild a full manifest from a base manifest plus a delta.

    This undoes :func:`compute_snapshot_delta`::

        apply_snapshot_delta(base, *compute_snapshot_delta(base, new)) == new

    Args:
        base: Manifest the delta was computed against. Not modified.
        added: Entries to add or overwrite.
        removed: Paths to drop; paths that are not present are ignored.

    Returns:
        A new manifest dict with the delta applied.
    """
    manifest: _Manifest = {}
    # Layer the base first, then the delta, so delta entries win.
    for layer in (base, added):
        manifest.update(layer)
    for stale_path in removed:
        if stale_path in manifest:
            del manifest[stale_path]
    return manifest
879
880
881 # ---------------------------------------------------------------------------
882 # Presign helpers
883 # ---------------------------------------------------------------------------
884
class _PresignPayload(TypedDict):
    """Request body for ``POST /push/mpack-presign``.

    Produced by :func:`build_presign_payload`.
    """

    # Content-derived key of the encoded mpack (``blob_id(mpack_bytes)``).
    mpack_key: str
    # Length of the encoded mpack in bytes.
    size_bytes: int
888
889
class _UnpackPayload(TypedDict, total=False):
    """Request body for ``POST /push/unpack-mpack``.

    Produced by :func:`build_unpack_payload`. Declared with ``total=False``,
    so every key is optional as far as type checkers are concerned.
    """

    # Key of the mpack the server should unpack.
    mpack_key: str
    # Branch name; build_unpack_payload defaults it to "main".
    branch: str
    # NOTE(review): presumably the head commit ID of the push — confirm.
    head: str
    # Advisory counts supplied by the caller (coerced with int() by the builder).
    commits_count: int
    blobs_count: int
    # Caller-supplied force flag; defaults to False in the builder.
    force: bool
897
898
def build_presign_payload(mpack_bytes: bytes) -> _PresignPayload:
    """Build the request body for ``POST /push/mpack-presign``.

    The server uses ``mpack_key`` to name the MinIO object and to verify
    integrity after the client PUTs the bytes directly to MinIO.

    Args:
        mpack_bytes: The encoded mpack that is about to be uploaded.

    Returns:
        A dict with ``mpack_key`` (``blob_id`` of the bytes) and
        ``size_bytes`` (their length).
    """
    payload: _PresignPayload = {
        "mpack_key": blob_id(mpack_bytes),
        "size_bytes": len(mpack_bytes),
    }
    return payload
906
907
def build_unpack_payload(
    mpack_key: str,
    *,
    branch: str = "main",
    head: str = "",
    commits_count: int = 0,
    blobs_count: int = 0,
    force: bool = False,
) -> _UnpackPayload:
    """Build the request body for ``POST /push/unpack-mpack`` (Step 3).

    Args:
        mpack_key: Key of the mpack the server should unpack.
        branch: Branch name to send.
        head: Head value to send.
        commits_count: Commit count to send; coerced with ``int()``.
        blobs_count: Blob count to send; coerced with ``int()``.
        force: Force flag to send.

    Returns:
        The payload dict, with keys in the order ``mpack_key``, ``branch``,
        ``head``, ``commits_count``, ``blobs_count``, ``force``.
    """
    payload: _UnpackPayload = {
        "mpack_key": mpack_key,
        "branch": branch,
        "head": head,
    }
    # Counts are normalised to plain ints before they are serialised.
    payload["commits_count"] = int(commits_count)
    payload["blobs_count"] = int(blobs_count)
    payload["force"] = force
    return payload
926
927
928 # ---------------------------------------------------------------------------
929 # Wire MPack encode / decode (Phase 3)
930 # ---------------------------------------------------------------------------
931 #
932 # Wire format:
933 # [4B] magic: b"MUSE"
934 # [1B] version: 1
935 # [1B] section_count: N
936 # [N*17B] section table: each entry is (1B type, 8B offset LE, 8B length LE)
937 # [...] section data (concatenated, no padding)
938 # [32B] SHA-256 of every byte above (footer, not included in its own hash)
939 #
940 # Section types:
941 # 1 = BLOBS — raw _build_pack() bytes (byte-identical to Phase 1 .mpack)
942 # 2 = COMMITS — [8B count] + N × [8B record_len + JSON bytes]
943 # 3 = SNAPSHOTS — same length-prefixed JSON layout as COMMITS
944 # 4 = TAGS — same layout
945 # 5 = META — [8B json_len + JSON bytes] key-value pairs (repo_id, branch, head_commit_id)
946
# Value written to, and required from, the 1-byte version field of a wire bundle.
_WIRE_VERSION = 1
# 1-byte section type codes used in the section table.
# Packed blob bytes.
_WIRE_SEC_BLOBS = 1
# Length-prefixed JSON commit records.
_WIRE_SEC_COMMITS = 2
# Length-prefixed JSON snapshot records.
_WIRE_SEC_SNAPSHOTS = 3
# Length-prefixed JSON tag records.
_WIRE_SEC_TAGS = 4
# Single length-prefixed JSON object of bundle metadata.
_WIRE_SEC_META = 5
953
954
def _encode_records(records: list[dict]) -> bytes:
    """Serialise *records* as ``[8B count] + N × [8B len + JSON bytes]``.

    All integers are unsigned 64-bit little-endian. Each record is dumped as
    compact JSON (no whitespace after ``,`` or ``:``).

    Args:
        records: JSON-serialisable dicts to encode.

    Returns:
        The encoded section bytes; an empty list yields just the 8-byte
        zero count.
    """
    import json as _json

    def _frame(record: dict) -> bytes:
        # One record: its byte length followed by the compact JSON itself.
        payload = _json.dumps(record, separators=(",", ":")).encode()
        return _struct.pack("<Q", len(payload)) + payload

    count_prefix = _struct.pack("<Q", len(records))
    return count_prefix + b"".join(_frame(record) for record in records)
964
965
def _decode_records(data: bytes) -> list[dict]:
    """Decode ``[8B count] + N × [8B len + JSON bytes]`` into a list of dicts.

    All integers are unsigned 64-bit little-endian.

    Decoding is lenient about the record count: a buffer shorter than 8 bytes
    yields ``[]``, and a count larger than the number of records actually
    present stops quietly once fewer than 8 bytes remain. A record that *is*
    framed, however, must be intact.

    Args:
        data: Raw section bytes.

    Returns:
        The decoded records, in wire order.

    Raises:
        ValueError: A record's declared length runs past the end of *data*,
            a record is not valid JSON, or a record is not a JSON object.
    """
    if len(data) < 8:
        return []
    import json as _json

    total = len(data)
    (count,) = _struct.unpack_from("<Q", data, 0)
    cursor = 8
    records: list[dict] = []
    for _ in range(count):
        if cursor + 8 > total:
            break
        (rec_len,) = _struct.unpack_from("<Q", data, cursor)
        cursor += 8
        end = cursor + rec_len
        # Slicing past the end would silently shorten the record; reject the
        # mis-framed input instead of parsing whatever bytes happen to remain.
        if end > total:
            raise ValueError(
                f"Record of {rec_len} bytes at offset {cursor} overruns "
                f"{total}-byte section"
            )
        rec = _json.loads(data[cursor:end])
        # Callers treat every record as a mapping, so refuse other JSON
        # values here rather than failing later on attribute access.
        if not isinstance(rec, dict):
            raise ValueError(
                f"Record at offset {cursor} is not a JSON object: "
                f"{type(rec).__name__}"
            )
        records.append(rec)
        cursor = end
    return records
983
984
985 def _encode_meta(meta: _MetaDict) -> bytes:
986 """Encode a META dict as [8B count] + N × [8B key_len + key + 8B val_len + val]."""
987 import json as _json
988 enc = _json.dumps(meta, separators=(",", ":")).encode()
989 return _struct.pack("<Q", len(enc)) + enc
990
991
def _decode_meta(data: bytes) -> _MetaDict:
    """Decode a META section (``[8B json_len] + JSON bytes``) back to a dict.

    The length prefix is an unsigned 64-bit little-endian integer. Bytes
    after the declared JSON document are ignored.

    Args:
        data: Raw META section bytes.

    Returns:
        The decoded mapping, or ``{}`` when *data* is shorter than the
        8-byte length prefix (for example an empty section).

    Raises:
        ValueError: The declared length runs past the end of *data*, the
            payload is not valid JSON, or it is not a JSON object.
    """
    if len(data) < 8:
        return {}
    import json as _json

    (val_len,) = _struct.unpack_from("<Q", data, 0)
    end = 8 + val_len
    # A slice past the end would be silently shortened; reject it explicitly.
    if end > len(data):
        raise ValueError(
            f"META payload of {val_len} bytes overruns {len(data)}-byte section"
        )
    meta = _json.loads(data[8:end])
    if not isinstance(meta, dict):
        raise ValueError(
            f"META payload is not a JSON object: {type(meta).__name__}"
        )
    return meta
999
1000
def build_wire_mpack(mpack: MPack, *, meta: _MetaDict | None = None) -> bytes:
    """Serialise an :class:`MPack` into the binary wire bundle format.

    The bundle is laid out as: ``b"MUSE"`` magic, a 1-byte version, a 1-byte
    section count, one 17-byte table entry per section (type, absolute
    offset, length), the section payloads back to back, and finally a
    SHA-256 digest of everything before it.

    Sections are always emitted in the order BLOBS, COMMITS, SNAPSHOTS,
    TAGS, META. The BLOBS section bytes are byte-identical to what
    :func:`write_pack` writes to disk in Phase 1, so the client can extract
    the section and write it directly without any per-object decode step.
    Blob entries marked ``encoding == "zstd"`` are decompressed before they
    are packed.

    Args:
        mpack: The pack to encode. Missing or empty lists are treated as
            empty sections.
        meta: Optional metadata; when falsy the META section is zero-length.

    Returns:
        Raw bytes of the wire bundle, starting with ``b"MUSE"`` and ending
        with a 32-byte SHA-256 footer.
    """
    from muse.core.pack_store import _build_pack as _ps_build_pack

    blobs = mpack.get("blobs") or []
    commits = mpack.get("commits") or []
    snapshots = mpack.get("snapshots") or []
    tags = mpack.get("tags") or []

    def _raw_bytes(entry: BlobPayload) -> bytes:
        payload = entry["content"]
        if entry.get("encoding") != "zstd":
            return payload
        # Imported lazily: only needed when a blob is actually compressed.
        import zstandard as _zstd
        return _zstd.ZstdDecompressor().decompress(payload)

    pairs = [(entry["object_id"], _raw_bytes(entry)) for entry in blobs]
    sections = [
        (_WIRE_SEC_BLOBS, _ps_build_pack(pairs) if pairs else b""),
        (_WIRE_SEC_COMMITS, _encode_records(commits)),
        (_WIRE_SEC_SNAPSHOTS, _encode_records(snapshots)),
        (_WIRE_SEC_TAGS, _encode_records(tags)),
        (_WIRE_SEC_META, _encode_meta(meta) if meta else b""),
    ]
    section_count = len(sections)

    # Header is 4B magic + 1B version + 1B section_count; each table entry
    # is 1B type + 8B offset + 8B length. Payloads start right after the table.
    next_offset = 6 + section_count * 17
    chunks: list[bytes] = [
        b"MUSE",
        _struct.pack("<BB", _WIRE_VERSION, section_count),
    ]
    for sec_type, payload in sections:
        chunks.append(_struct.pack("<BQQ", sec_type, next_offset, len(payload)))
        next_offset += len(payload)
    chunks.extend(payload for _, payload in sections)

    # The footer covers every byte emitted so far and is not hashed itself.
    digest = _hashlib.sha256()
    for chunk in chunks:
        digest.update(chunk)
    chunks.append(digest.digest())
    return b"".join(chunks)
1066
1067
def parse_wire_mpack(data: bytes) -> MPack:
    """Parse a wire MPack binary bundle back into an :class:`MPack` dict.

    Checks the magic and version, verifies the SHA-256 footer, then reads the
    section table and decodes each section. When the same section type is
    listed more than once, the last table entry wins. A ``meta`` key is only
    set on the result when the META section decodes to a non-empty mapping.

    Args:
        data: Complete bundle bytes, including the 32-byte footer.

    Returns:
        The decoded :class:`MPack`.

    Raises:
        ValueError: The bundle is too short, has bad magic bytes or an
            unknown version, its section table is truncated, or a section
            extends past the hashed body.
        OSError: Footer integrity check failed.
    """
    # 4B magic + 1B version + 1B section_count + 32B footer: the smallest
    # well-formed bundle has an empty section table.
    if len(data) < 38:
        raise ValueError("Wire MPack too short")
    if data[:4] != b"MUSE":
        raise ValueError(f"Wire MPack bad magic: {data[:4]!r}")
    version = data[4]
    if version != _WIRE_VERSION:
        raise ValueError(f"Wire MPack unknown version: {version}")

    body = data[:-32]
    stored = data[-32:]
    if _hashlib.sha256(body).digest() != stored:
        raise OSError("Wire MPack failed SHA-256 integrity check")

    # A valid footer only proves the bytes arrived intact, not that the
    # sender framed them correctly — validate every offset before slicing.
    section_count = body[5]
    table_end = 6 + section_count * 17
    if table_end > len(body):
        raise ValueError(
            f"Wire MPack section table truncated: {section_count} entries "
            f"need {table_end} bytes, body has {len(body)}"
        )
    sections: dict[int, bytes] = {}
    for index in range(section_count):
        sec_type, sec_offset, sec_length = _struct.unpack_from(
            "<BQQ", body, 6 + index * 17
        )
        sec_end = sec_offset + sec_length
        # Sections live inside the hashed body; never read into the footer.
        if sec_end > len(body):
            raise ValueError(
                f"Wire MPack section {sec_type} out of range: "
                f"offset {sec_offset} + length {sec_length} exceeds "
                f"{len(body)}-byte body"
            )
        sections[sec_type] = body[sec_offset:sec_end]

    # Imported at the point of use so that framing errors above are reported
    # without needing the pack store.
    from muse.core.pack_store import _parse_pack_bytes as _ps_parse

    # build_wire_mpack emits a zero-length BLOBS section when there are no
    # blobs, so skip the pack parser rather than hand it an empty buffer.
    blobs_bytes = sections.get(_WIRE_SEC_BLOBS, b"")
    obj_pairs = _ps_parse(blobs_bytes) if blobs_bytes else []
    blobs: list[BlobPayload] = [
        BlobPayload(object_id=oid, content=content)
        for oid, content in obj_pairs
    ]
    commits = _decode_records(sections.get(_WIRE_SEC_COMMITS, b""))
    snapshots = _decode_records(sections.get(_WIRE_SEC_SNAPSHOTS, b""))
    tags = _decode_records(sections.get(_WIRE_SEC_TAGS, b""))
    meta = _decode_meta(sections.get(_WIRE_SEC_META, b""))

    result = MPack(
        blobs=blobs,
        commits=commits,
        snapshots=snapshots,
        tags=tags,
    )
    if meta:
        result["meta"] = meta
    return result
1120
1121
1122 # ---------------------------------------------------------------------------
1123 # Pack applying
1124 # ---------------------------------------------------------------------------
1125
def apply_mpack(
    repo_root: pathlib.Path,
    mpack: MPack,
    *,
    shallow_commits: "collections.abc.Container[str] | None" = None,
) -> ApplyResult:
    """Write the contents of *mpack* into a local ``.muse/`` directory.

    Writes in dependency order: objects first (blobs), then snapshots (which
    reference object IDs), then commits (which reference snapshot IDs), and
    finally tags. All writes are idempotent — already-present items are
    silently skipped. Malformed individual entries are logged and skipped
    rather than aborting the whole apply; anything that depends on a skipped
    entry is skipped too, so no dangling references enter the store.

    Args:
        repo_root: Root of the Muse repository to write into.
        mpack: :class:`MPack` received from the remote.
        shallow_commits: Optional set of boundary commit IDs (shallow clone).
                         These commits are written even if their parents are
                         absent from the local store.

    Returns:
        :class:`ApplyResult` with counts of newly written and skipped items,
        plus the IDs of blobs that failed validation and of snapshots that
        were skipped because they referenced such blobs.

    Raises:
        ValueError: The pack carries more than ``MAX_PACK_OBJECTS`` items.
        OSError: Propagated from :func:`write_pack` on disk failure, before
            any snapshot or commit is written.
    """
    blobs_written = 0
    blobs_skipped = 0
    snapshots_written = 0
    commits_written = 0
    tags_written = 0

    raw_blobs = mpack.get("blobs") or []
    raw_snapshots = mpack.get("snapshots") or []
    raw_commits = mpack.get("commits") or []
    raw_tags = mpack.get("tags") or []

    # Pack-bomb guard: cap the total number of items accepted per call.
    # A legitimate push carries at most tens of thousands of blobs per chunk;
    # a pack claiming millions is an adversarial input. Tags are counted as
    # well so that no record type can bypass the cap.
    total_items = (
        len(raw_blobs) + len(raw_snapshots) + len(raw_commits) + len(raw_tags)
    )
    if total_items > MAX_PACK_OBJECTS:
        raise ValueError(
            f"Pack rejected: {total_items:,} total items exceeds the "
            f"{MAX_PACK_OBJECTS:,} item limit per apply_mpack call. "
            "Split the pack into smaller chunks."
        )

    # Deduplicate blob IDs before the write loop. A malicious or buggy
    # sender may repeat the same blob ID N times, forcing N sha256 hashes
    # before the "already exists" short-circuit returns False.
    seen_blob_ids: set[str] = set()
    # Track blob IDs that failed validation so dependent snapshots and commits
    # can be skipped — prevents dangling reference chains in the store.
    failed_blob_ids: set[str] = set()
    # Blobs that pass all checks and are new to this store — written as a pack.
    pack_blobs: list[tuple[str, bytes]] = []

    for obj in raw_blobs:
        oid = obj.get("object_id", "")
        raw = obj.get("content", b"")
        if not oid or not isinstance(raw, bytes):
            logger.warning("⚠️ apply_mpack: blob entry missing fields — skipped")
            continue
        if oid in seen_blob_ids:
            logger.debug("⚠️ apply_mpack: duplicate blob_id %s — skipped", short_id(oid))
            blobs_skipped += 1
            continue
        seen_blob_ids.add(oid)
        if len(raw) > MAX_OBJECT_WRITE_BYTES:
            logger.warning(
                "⚠️ apply_mpack: blob %s is %d bytes, exceeding %d MiB limit — skipped",
                oid, len(raw), MAX_OBJECT_WRITE_BYTES // (1024 * 1024),
            )
            failed_blob_ids.add(oid)
            continue
        try:
            validate_object_id(oid)
            # The ID must be the hash of the content; recompute and compare.
            actual = blob_id(raw)
            if actual != oid:
                raise ValueError(
                    f"Content integrity failure: expected {oid} got {actual}"
                )
        except ValueError as exc:
            logger.warning("⚠️ apply_mpack: malformed blob entry — skipped: %s", exc)
            failed_blob_ids.add(oid)
            continue
        if has_object(repo_root, oid):
            blobs_skipped += 1
            continue
        pack_blobs.append((oid, raw))
        blobs_written += 1

    # Write all new blobs as a single MPack file + index — O(1) file writes
    # regardless of blob count. Raises OSError on disk failure, which
    # propagates before any snapshot or commit is written.
    write_pack(repo_root, pack_blobs)

    # Reconstruct full manifests from the delta chain, then write snapshots.
    # The hash IS the proof: hash_snapshot(reconstructed) == snapshot_id.
    # _apply_snapshot_deltas logs and skips corrupt entries — never raises.
    resolved_manifests = _apply_snapshot_deltas(raw_snapshots)

    # Track snapshot IDs skipped due to referencing failed objects so dependent
    # commits can be skipped — prevents dangling commit → snapshot → (missing object).
    skipped_snapshot_ids: set[str] = set()
    # Track snapshot IDs that were successfully written in this mpack.
    # Used below to refuse commits whose snapshots were not sent (snaps=0 bug guard).
    written_snapshot_ids: set[str] = set()

    for snap_entry in raw_snapshots:
        sid = snap_entry.get("snapshot_id", "")
        if not sid or sid not in resolved_manifests:
            continue
        try:
            manifest, directories = resolved_manifests[sid]
            # Guard against zip-slip: manifest keys are stored as-is and later
            # used to construct checkout paths. A malicious mpack could inject
            # keys like "../../etc/cron.d/malicious". We validate all keys here —
            # before writing — so no traversal path ever enters the store.
            for key in manifest:
                validate_workspace_path(key)
            # Manifest values are object IDs — validate they are safe hex strings.
            for oid in manifest.values():
                validate_object_id(oid)
            # Refuse to write a snapshot whose objects were not fully written —
            # that would create a dangling snapshot → object reference.
            missing = failed_blob_ids.intersection(manifest.values())
            if missing:
                logger.warning(
                    "⚠️ apply_mpack: snapshot %s skipped — references %d object(s) "
                    "that failed to write",
                    short_id(sid), len(missing),
                )
                skipped_snapshot_ids.add(sid)
                continue
            snap = SnapshotRecord(snapshot_id=sid, manifest=manifest, directories=directories)
            is_new = read_snapshot(repo_root, snap.snapshot_id) is None
            write_snapshot(repo_root, snap, sync=False)
            written_snapshot_ids.add(sid)
            if is_new:
                snapshots_written += 1
        except (KeyError, ValueError) as exc:
            logger.warning("⚠️ apply_mpack: malformed snapshot — skipped: %s", exc)

    # Parse all commits first so per-commit validation errors are counted
    # before any writes begin.
    parsed_commits: list[CommitRecord] = []
    for commit_dict in raw_commits:
        try:
            commit = CommitRecord.from_dict(commit_dict)
            if not commit.commit_id:
                logger.warning("⚠️ apply_mpack: commit missing commit_id — skipped")
                continue
            if not commit.snapshot_id:
                logger.warning(
                    "⚠️ apply_mpack: commit %s missing snapshot_id — skipped",
                    commit.commit_id,
                )
                continue
            # Refuse to write a commit whose snapshot was skipped due to failed
            # objects — that would create a dangling commit → snapshot reference.
            if commit.snapshot_id in skipped_snapshot_ids:
                logger.warning(
                    "⚠️ apply_mpack: commit %s skipped — its snapshot %s was not written "
                    "(referenced object(s) failed)",
                    short_id(commit.commit_id), short_id(commit.snapshot_id),
                )
                continue
            # Refuse to write a commit whose snapshot was not included in this
            # mpack AND is not already in the local store. Writing such a commit
            # corrupts the store: the commit is then present in `have` on the
            # next pull, the server returns nothing new, and pull aborts forever
            # with "snapshot missing". The correct behaviour is to skip the commit
            # so the next pull re-requests it together with its snapshot.
            if (
                commit.snapshot_id not in written_snapshot_ids
                and read_snapshot(repo_root, commit.snapshot_id) is None
            ):
                logger.warning(
                    "⚠️ apply_mpack: commit %s skipped — snapshot %s was not in this "
                    "mpack and is not in the local store (server sent snaps=0). "
                    "The next pull will re-request this commit with its snapshot.",
                    short_id(commit.commit_id), short_id(commit.snapshot_id),
                )
                continue
            parsed_commits.append(commit)
        except (KeyError, ValueError, TypeError) as exc:
            logger.warning("⚠️ apply_mpack: malformed commit — skipped: %s", exc)

    # Write commits in dependency order: retry until stable.
    # Bundles may arrive with commits in BFS order (newest-first). Phase 2's
    # parent-existence guard rejects a commit whose parent hasn't been written
    # yet. Keep cycling through MissingParentError commits until every parent
    # in the mpack has been written, or until a full pass produces no progress
    # (the missing parent isn't in this mpack — log and skip).
    _shallow: "collections.abc.Container[str]" = shallow_commits or ()
    pending = parsed_commits
    while pending:
        deferred: list[CommitRecord] = []
        for commit in pending:
            # Shallow boundary commits have their parent check bypassed so they
            # can be written even when their parents are not in the local store.
            is_shallow = commit.commit_id in _shallow
            try:
                # NOTE(review): has_object() is the same probe the blob loop
                # uses for the object store — confirm it can see commit
                # records, otherwise re-applied commits are counted as new.
                is_new = not has_object(repo_root, commit.commit_id)
                write_commit(repo_root, commit, skip_parent_check=is_shallow, sync=False)
                if is_new:
                    commits_written += 1
            except MissingParentError:
                deferred.append(commit)
            except OSError as exc:
                logger.critical(
                    "❌ apply_mpack: store integrity violation for commit — skipped: %s", exc
                )
            except (ValueError, TypeError) as exc:
                logger.warning("⚠️ apply_mpack: malformed commit — skipped: %s", exc)
        if len(deferred) == len(pending):
            # No progress in this pass — the missing parents are not in the mpack.
            for commit in deferred:
                logger.warning(
                    "⚠️ apply_mpack: commit %s skipped — parent not in mpack or local store",
                    commit.commit_id,
                )
            break
        pending = deferred

    # Bulk-fsync: all snapshot and commit files were written with sync=False
    # (no per-file fsync) for throughput. A single directory fsync here makes
    # all preceding renames durable in one barrier — the same strategy git uses
    # for pack writes. The ref is not updated until after apply_mpack returns,
    # so a crash before this fsync leaves the store in a consistent (if
    # incomplete) state that is safe to re-apply.
    if snapshots_written > 0 or commits_written > 0:
        from muse.core.paths import muse_dir as _muse_dir
        _dot = _muse_dir(repo_root)
        for _dir in (_dot / "snapshots", _dot / "commits"):
            if _dir.exists():
                try:
                    _fd = os.open(str(_dir), os.O_RDONLY)
                    try:
                        os.fsync(_fd)
                    finally:
                        os.close(_fd)
                except OSError:
                    pass  # best-effort — some filesystems (tmpfs) reject dir fsync

    for wire_tag in raw_tags:
        try:
            tag_record = TagRecord.from_dict(TagDict(
                tag_id=wire_tag["tag_id"],
                repo_id=wire_tag["repo_id"],
                commit_id=wire_tag["commit_id"],
                tag=wire_tag["tag"],
                created_at=wire_tag["created_at"],
            ))
            write_tag(repo_root, tag_record)
            tags_written += 1
        # TypeError covers tag entries that are not mappings at all, matching
        # the commit parse loop above; one bad tag must not abort the apply
        # after commits have already been written.
        except (KeyError, ValueError, TypeError) as exc:
            logger.warning("⚠️ apply_mpack: malformed tag — skipped: %s", exc)

    logger.info(
        "✅ Applied pack: %d new blobs, %d new snapshots, %d new commits, %d tags (%d blobs skipped)",
        blobs_written,
        snapshots_written,
        commits_written,
        tags_written,
        blobs_skipped,
    )
    return ApplyResult(
        commits_written=commits_written,
        snapshots_written=snapshots_written,
        blobs_written=blobs_written,
        blobs_skipped=blobs_skipped,
        tags_written=tags_written,
        failed_blobs=list(failed_blob_ids),
        skipped_snapshots=list(skipped_snapshot_ids),
    )
File History 13 commits
sha256:4d09a52c06fbc389006963ad1e5ca6ee48c3cb72799f1a322561035b263db67d merge conflict resolve Human patch 3 days ago
sha256:b5ec4e4a3a73cae0cd08224f32090f2a4836afa0a804cb3231e70c42a3e89295 fix adapter for agent config Human patch 3 days ago
sha256:8e790785ceed39352c123388b8680542965afb7b2305a48827a9fbf4f3ae75fb fix: build_wire_mpack decompresses zstd-encoded blobs befor… Sonnet 4.6 patch 3 days ago
sha256:2c59968e5fd34f1740180d630338fddfb8c465b71e150a0965f11dbdcba5dec7 fix: apply_mpack refuses commits when their snapshot is abs… Sonnet 4.6 patch 3 days ago
sha256:31316ea2e76b244dea3dd67bd0e608d56279ab9fdd2d4ecacbf41060d91f323f merge dev → main: rc11, urllib migration, object store fixe… Sonnet 4.6 minor 5 days ago
sha256:6a70ad4088ca3c07a6b1d85b257a735aa0927db2fc017ed046b982fe95805450 perf: remove debug print statements from mpack hot paths; f… Sonnet 4.6 patch 5 days ago
sha256:35855b7bbce81b93612c655e23587bdebbb5fc09856ff5cbf5cd5b195d0547d4 merge: dev → main (rc11, urllib migration, object store inv… Sonnet 4.6 patch 8 days ago
sha256:633dfa2940e97bf1a3d04996c772027a57d70d103f1693c96da04969613dba6c fix: urllib migration regressions — force flag, job_id, Con… Sonnet 4.6 minor 8 days ago
sha256:8ca6bcf5ebccd1b36d8ec716358b2b634d55e75db19c1d5d339082664c773006 fix: include directories in snapshot deltas sent in push mpack Sonnet 4.6 9 days ago
sha256:85a465e59c29055c46f6a2a9efd5e2036f66857c638240b528e63b64b5d1d5af fix: topo-sort snapshot delta chain in apply_mpack; decompr… Sonnet 4.6 10 days ago
sha256:dbebcbb60d67ec6ee0f09918db3266576c7ca66e93381a69b67ba40fe74174f3 fix: decompress zstd blobs in build_wire_mpack — pack secti… Sonnet 4.6 patch 10 days ago
sha256:36c3cb3e76619d4c30a6d9bf81b5ec4ff148e30dcfed913e3114ca7b43b81c7e fix: rename objects→blobs in push client and all stale test… Sonnet 4.6 patch 11 days ago