Closed
#57
Wire push pseudocode — client and server (first principles)
Client
push local <branch>:
  0. discover remote state
     GET /refs (no filter; returns all branch heads)
     → { branch_heads: { <name>: <commit_id>, ... } }
     remote_head = branch_heads[<branch>] or null   (null = branch doesn't exist on remote yet)
     have = list(branch_heads.values())             (ALL remote heads = object dedup anchors)
  1. walk local DAG → find commits not on remote (want - have)
     if remote_head is null:
       new_commits = all commits reachable from local tip (topo sorted, ancestors first)
     else if remote_head == local_tip:
       nothing to push → exit
     else:
       new_commits = commits reachable from local tip, not reachable from remote_head
                     (topo sorted, ancestors first)
     NOTE: use remote_head (target branch only) as the commit walk boundary;
           use full "have" set (all remote branch heads) for object dedup
  2. for each new commit (ancestors first):
       collect snapshot delta (delta_add, delta_remove) vs parent snapshot
       compute structured_delta = { added, modified, removed } with object IDs
         ← client has both parent + child snapshot manifests; compute here, not server-side
       collect new object bytes:
         for each object_id in delta_add:
           if object_id not in objects_set AND object_id not reachable from have:
             add (object_id, raw_bytes) to objects list
  3. pack into one mpack (commits topo sorted: parents before children):
       { commits: [...], snapshots: [...], objects: [...] }
     wire format: MUSE binary (b"MUSE" magic + section table + SHA-256 footer), NOT msgpack
  4. mpack_bytes = build_wire_mpack(mpack)
     mpack_key = "sha256:" + sha256hex(mpack_bytes)
     size_bytes = len(mpack_bytes)
  5. POST /push/mpack-presign { mpack_key, size_bytes }
     if 413 → abort: "push too large; reduce commits or file sizes" (MVP: 512 MiB hard limit)
     → { presigned_url }
  6. PUT mpack_bytes directly to MinIO/S3/R2 via presigned_url
     Content-Type: application/x-muse-pack
     no auth header: the presigned URL is the credential
  7. POST /push/unpack-mpack { mpack_key, branch, head_commit_id, force }
     → { objects_written, objects_skipped, commits_written, branch_head }
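The commit walk in step 1 can be sketched in Python as below. This is a minimal illustration, not the client's actual code: it assumes the local DAG is available as a plain dict mapping each commit ID to its list of parent IDs, and the helper names `ancestors` and `new_commits` are hypothetical.

```python
def ancestors(parents, tip):
    """Return every commit reachable from tip (tip included); empty if tip is None."""
    seen = set()
    stack = [tip] if tip is not None else []
    while stack:
        cid = stack.pop()
        if cid in seen:
            continue
        seen.add(cid)
        stack.extend(parents.get(cid, []))
    return seen


def new_commits(parents, local_tip, remote_head):
    """Commits reachable from local_tip but not from remote_head, ancestors first.

    `parents` is an assumed in-memory view of the local DAG:
    {commit_id: [parent_commit_id, ...]}.
    """
    if remote_head == local_tip:
        return []  # nothing to push
    # Only the target branch head bounds the commit walk (see the NOTE in step 1).
    wanted = ancestors(parents, local_tip) - ancestors(parents, remote_head)
    order, done = [], set()
    stack = [(local_tip, False)]
    while stack:
        cid, expanded = stack.pop()
        if cid not in wanted or cid in done:
            continue
        if expanded:
            # All wanted parents were emitted before this point.
            done.add(cid)
            order.append(cid)
        else:
            stack.append((cid, True))
            stack.extend((p, False) for p in parents.get(cid, []))
    return order
```

With a null remote head the boundary set is empty, so the whole history reachable from the local tip is returned, which matches the first branch of step 1.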
Server
POST /push/mpack-presign
  receive { mpack_key, size_bytes }:
    if size_bytes > MAX_MPACK_SIZE (512 MiB for MVP):
      return 413 "mpack exceeds size limit"
    generate presigned PUT URL (key=mpack_key, expiry=15 min)
    return { presigned_url }
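A rough Python sketch of the presign handler follows. The handler name, the `presign_put` callable, and the extra 400 validation of the key format are assumptions added for illustration; only the 512 MiB limit, the 413 response, and the 15 minute expiry come from the pseudocode above.

```python
import re

MAX_MPACK_SIZE = 512 * 1024 * 1024   # 512 MiB MVP limit from the pseudocode
PRESIGN_EXPIRY_SECONDS = 15 * 60     # 15 min expiry from the pseudocode
_KEY_RE = re.compile(r"sha256:[0-9a-f]{64}")  # assumed key shape: "sha256:" + hex digest


def handle_mpack_presign(body, presign_put):
    """Return (status, payload) for POST /push/mpack-presign.

    `presign_put` is a hypothetical callable that wraps the object store's
    presigning API and returns a URL string.
    """
    mpack_key = body.get("mpack_key", "")
    size_bytes = body.get("size_bytes")
    # Assumption: malformed input is rejected up front; the pseudocode does not say.
    if not _KEY_RE.fullmatch(mpack_key) or not isinstance(size_bytes, int) or size_bytes <= 0:
        return 400, {"error": "invalid mpack_key or size_bytes"}
    if size_bytes > MAX_MPACK_SIZE:
        return 413, {"error": "mpack exceeds size limit"}
    url = presign_put(key=mpack_key, expires_in=PRESIGN_EXPIRY_SECONDS)
    return 200, {"presigned_url": url}
```

Passing the presigner in as a callable keeps the size check testable without a running object store.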
POST /push/unpack-mpack
  receive { mpack_key, branch, head_commit_id, force }:
    1. fetch mpack bytes from object store (MinIO/S3/R2) by mpack_key
    2. verify "sha256:" + sha256hex(bytes) == mpack_key → 400 if mismatch
    3. verify b"MUSE" magic → 400 if not MUSE binary format
    4. parse mpack → { commits, snapshots, objects }
    5. for each object:
         decompress zstd payload
         enforce zip-bomb guard (max 10× ratio, max 256 MiB per object)
    6. check each object_id against blocked hashes → add to failed_objects, skip
    begin db transaction:
      7. write objects [batch ≤ 500 rows per statement]
         a. SELECT existing object_ids WHERE object_id IN (batch) → known_ids
         b. for new objects only (not in known_ids):
              record byte_offset + byte_length within the mpack binary
              INSERT objects (object_id, size_bytes, storage_uri="mpack://<mpack_key>", ...)
              INSERT mpack_index (mpack_key, object_id, byte_offset, byte_length)
         c. UPSERT object_refs for ALL objects in this push (new + already-known)
      8. write snapshots [batch ≤ 500]
         for each snapshot in topo order:
           if parent snapshot in DB → load parent manifest + apply delta_add/delta_remove
           else → delta_add IS the full manifest (first push to this remote)
           store delta_blob for intermediate snapshots
           store manifest_blob for head snapshot
         INSERT new snapshot rows
         UPSERT snapshot_refs for ALL snapshots in this push
      9. write commits [topo sorted: parents before children; batch ≤ 500]
         for each commit:
           structured_delta already populated by client; store as-is
           generation = max(parent_generation) + 1 (parents already in this tx)
         INSERT new commit rows (structured_delta, generation, all provenance fields)
         UPSERT commit_refs for ALL commits in this push
         UPSERT commit_graph (generation, snapshot_id) for ALL commits in this push
      10. advance branch pointer (atomic CAS):
          SELECT head_commit_id FROM branches WHERE name=<branch> FOR UPDATE
          if not force:
            verify current head is ancestor of (or equal to) client head_commit_id
            → 409 non-fast-forward if not
          UPDATE branches SET head_commit_id = <head_commit_id from request> WHERE name=<branch>
    commit db transaction
    11. enqueue async jobs: push.intel, push.file_last_commits
    return { objects_written, objects_skipped, commits_written, branch_head }
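The fast-forward check in step 10 could look roughly like this in Python. It is a sketch under assumptions: the commit graph is passed as an in-memory parent map rather than queried from the database, and `NonFastForward`, `is_ancestor`, and `next_branch_head` are hypothetical names. The row lock (`FOR UPDATE`) and the transaction are not modelled.

```python
class NonFastForward(Exception):
    """Corresponds to the 409 response in step 10."""


def is_ancestor(parents, ancestor, descendant):
    """True if `ancestor` equals `descendant` or is reachable from it via parent links."""
    stack, seen = [descendant], set()
    while stack:
        cid = stack.pop()
        if cid == ancestor:
            return True
        if cid in seen:
            continue
        seen.add(cid)
        stack.extend(parents.get(cid, []))
    return False


def next_branch_head(parents, current_head, head_commit_id, force=False):
    """Return the commit the branch should point at, or raise NonFastForward.

    `current_head` is None when the branch does not exist on the remote yet.
    """
    if current_head is not None and not force:
        if not is_ancestor(parents, current_head, head_commit_id):
            raise NonFastForward(f"{current_head} is not an ancestor of {head_commit_id}")
    return head_commit_id
```

A production version would more likely prune the walk using the stored generation numbers from step 9, since a commit can never be an ancestor of one with a lower generation; the plain walk above only shows the rule.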
Read path for mpack:// objects
Objects stored under a mpack:// URI are resolved via S3 byte-range GET — no full pack download.
read_object(object_id):
  1. SELECT mpack_key, byte_offset, byte_length
     FROM mpack_index WHERE object_id = <id>
  2. GET <s3_url_for_mpack_key>
     Range: bytes=<byte_offset>-<byte_offset+byte_length-1>
  3. decompress zstd → raw payload
  return payload
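The Range header arithmetic in step 2 is easy to get off by one, because HTTP byte ranges are inclusive at both ends. The Python sketch below builds the header and emulates the server side against an in-memory blob; both helper names are hypothetical, and zstd decompression (step 3) is left out because it needs a third-party library.

```python
def range_header(byte_offset, byte_length):
    """Build the value of the HTTP Range header for one object inside an mpack."""
    if byte_offset < 0 or byte_length <= 0:
        raise ValueError("byte_offset must be >= 0 and byte_length must be > 0")
    # The last byte position is inclusive, hence the -1.
    return f"bytes={byte_offset}-{byte_offset + byte_length - 1}"


def slice_for_range(blob, header):
    """Emulate how a server answers a single inclusive byte range (test helper only)."""
    start, end = (int(part) for part in header.removeprefix("bytes=").split("-"))
    return blob[start:end + 1]
```

For example, an object at offset 183 with length 10 yields `bytes=183-192`, and the emulated response contains exactly 10 bytes.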
mpack_index schema:
  mpack_key    TEXT    NOT NULL,
  object_id    TEXT    NOT NULL,
  byte_offset  BIGINT  NOT NULL,
  byte_length  INT     NOT NULL,
  PRIMARY KEY (mpack_key, object_id),
  INDEX (object_id)
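To try the lookup locally, the schema can be approximated with Python's built-in sqlite3 module. This is only a stand-in for the real database: SQLite uses INTEGER for both BIGINT and INT, the index name is made up, and `make_index_db` and `locate` are hypothetical helpers.

```python
import sqlite3


def make_index_db():
    """Create an in-memory approximation of the mpack_index table."""
    conn = sqlite3.connect(":memory:")
    conn.executescript(
        """
        CREATE TABLE mpack_index (
            mpack_key   TEXT    NOT NULL,
            object_id   TEXT    NOT NULL,
            byte_offset INTEGER NOT NULL,
            byte_length INTEGER NOT NULL,
            PRIMARY KEY (mpack_key, object_id)
        );
        -- Secondary index so read_object can look up by object_id alone.
        CREATE INDEX mpack_index_object_id ON mpack_index (object_id);
        """
    )
    return conn


def locate(conn, object_id):
    """Return (mpack_key, byte_offset, byte_length) for an object, or None."""
    return conn.execute(
        "SELECT mpack_key, byte_offset, byte_length "
        "FROM mpack_index WHERE object_id = ? LIMIT 1",
        (object_id,),
    ).fetchone()
```

The composite primary key allows the same object to appear in more than one pack, which is why the lookup takes any one matching row.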
Out of scope (post-MVP)
- Chunked / multi-part push for packs > 512 MiB
- Pull / fetch / clone (separate issue)
Activity (2)
gabriel
16 days ago
Implementation complete — closing
All steps from the pseudocode are now wired in and verified on staging.
What was built:
- Migration 0067: byte_offset BIGINT and byte_length INT added to musehub_mpack_index (nullable for backward compat with existing rows)
- compute_object_byte_offsets(wire_bytes): parses the OBJECTS section table and walks the _build_pack layout to return {oid: (abs_offset, length)} in O(objects) time, no I/O
- Push unpack step 7d: calls compute_object_byte_offsets on the downloaded mpack and stores byte_offset + byte_length with each musehub_mpack_index row
- BlobBackend.get_range(mpack_key, offset, length): issues a Range: bytes=N-M S3 GET, O(object_size) transfer
- read_object_bytes: uses get_range when byte_offset is set; falls back to full mpack download for pre-migration rows (NULL offset)
Verified on staging: 21 objects pushed to wire-hello, all have non-NULL byte_offset/byte_length in musehub_mpack_index. First object at offset=183, consistent with wire mpack header layout.
TDD: 7 tests in test_mpack_byte_range.py — BR-1 through BR-7 all green.
Delta compression + pack index — implementation plan
Push/fetch/clone/pull are now confirmed working end-to-end. The mpack_index table exists and is populated, but the byte-range read path is not wired in. Here is the exact gap and the steps to close it.
Current state
- musehub_mpack_index exists with columns: (entity_id, mpack_id, entity_type, created_at)
- read_object_bytes in backends.py handles mpack:// URIs by downloading the entire mpack and scanning it: O(mpack_size) per object lookup ❌
- byte_offset / byte_length columns do not exist on the table yet ❌
Steps to wire it in
1. Migration: add byte_offset and byte_length to musehub_mpack_index
2. Push unpack: when inserting mpack_index rows, compute each object's byte position within the mpack binary using the OBJECTS section layout (deterministic: _build_pack output is header + fixed-width per-object records). Record (mpack_key, object_id, byte_offset, byte_length).
3. Storage backend: add get_range(mpack_key, byte_offset, byte_length) -> bytes that issues an S3 Range: bytes=N-M GET instead of fetching the whole mpack.
4. read_object_bytes: when storage_uri = "mpack://...", look up (byte_offset, byte_length) from mpack_index and issue the byte-range GET. Fall back to full-mpack download if the columns are NULL (existing rows from before the migration).
5. TDD: prove via tests that read_object_bytes uses range GET, not full download
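One way to prove step 5 is a fake backend that records which kind of fetch was issued. The Python sketch below is illustrative only: `FakeBackend`, `get_full`, `read_object_bytes_sketch`, and the `scan_full_pack` callback are hypothetical stand-ins, not the real BlobBackend or read_object_bytes, and decompression is omitted.

```python
class FakeBackend:
    """In-memory stand-in for the storage backend that records every call."""

    def __init__(self, packs):
        self.packs = packs   # {mpack_key: bytes}
        self.calls = []

    def get_range(self, mpack_key, offset, length):
        self.calls.append(("range", mpack_key, offset, length))
        return self.packs[mpack_key][offset:offset + length]

    def get_full(self, mpack_key):
        self.calls.append(("full", mpack_key))
        return self.packs[mpack_key]


def read_object_bytes_sketch(backend, index_row, scan_full_pack):
    """Use a range GET when offsets are known, else fall back to a full download.

    `index_row` is (mpack_key, byte_offset, byte_length); the offsets are None
    for rows written before the migration. `scan_full_pack` is an assumed
    callback that extracts the object from a whole pack.
    """
    mpack_key, byte_offset, byte_length = index_row
    if byte_offset is None or byte_length is None:
        return scan_full_pack(backend.get_full(mpack_key))
    return backend.get_range(mpack_key, byte_offset, byte_length)
```

A test can then assert on `backend.calls` to confirm that a row with offsets triggers exactly one range call and no full download, while a NULL-offset row takes the fallback path.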