gabriel / musehub public
Closed #57
filed by gabriel human · 25 days ago

Wire push pseudocode — client and server (first principles)


Client

push local <branch>:

  0. discover remote state
       GET /refs  (no filter — returns all branch heads)
       → { branch_heads: { <name>: <commit_id>, ... } }
       remote_head = branch_heads[<branch>] or null   (null = branch doesn't exist on remote yet)
       have = list(branch_heads.values())             (ALL remote heads = object dedup anchors)

  1. walk local DAG → find commits not on remote (want - have)
       if remote_head is null:
           new_commits = all commits reachable from local tip (topo sorted, ancestors first)
       else if remote_head == local_tip:
           nothing to push → exit
       else:
           new_commits = commits reachable from local tip, not reachable from remote_head
           (topo sorted, ancestors first)
       NOTE: use remote_head (target branch only) as the commit walk boundary;
             use full "have" set (all remote branch heads) for object dedup

  2. for each new commit (ancestors first):
       collect snapshot delta (delta_add, delta_remove) vs parent snapshot
       compute structured_delta = { added, modified, removed } with object IDs
           ← client has both parent + child snapshot manifests; compute here, not server-side
       collect new object bytes:
           for each object_id in delta_add:
               if object_id not already collected for this pack AND object_id not reachable from have:
                   add (object_id, raw_bytes) to objects list

  3. pack into one mpack (commits topo sorted: parents before children):
       { commits: [...], snapshots: [...], objects: [...] }
       wire format: MUSE binary (b"MUSE" magic + section table + SHA-256 footer)
       NOT msgpack

  4. mpack_bytes = build_wire_mpack(mpack)
     mpack_key = "sha256:" + sha256hex(mpack_bytes)
     size_bytes = len(mpack_bytes)

  5. POST /push/mpack-presign { mpack_key, size_bytes }
       if 413 → abort: "push too large; reduce commits or file sizes" (MVP: 512 MiB hard limit)
       → { presigned_url }

  6. PUT mpack_bytes directly to MinIO/S3/R2 via presigned_url
       Content-Type: application/x-muse-pack
       no auth header — presigned URL is the credential

  7. POST /push/unpack-mpack { mpack_key, branch, head_commit_id, force }
       → { objects_written, objects_skipped, commits_written, branch_head }
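
Step 1 of the client flow (the commit walk) can be sketched as follows. This is a minimal illustration, not the actual client code: it assumes the local DAG is available as a plain dict mapping each commit ID to its list of parent IDs, and the names ancestors and new_commits are hypothetical.

```python
from typing import Dict, List, Optional, Set


def ancestors(parents: Dict[str, List[str]], tip: Optional[str]) -> Set[str]:
    """Return tip plus every commit reachable from it via parent links."""
    seen: Set[str] = set()
    stack = [tip] if tip is not None else []
    while stack:
        cid = stack.pop()
        if cid in seen:
            continue
        seen.add(cid)
        stack.extend(parents.get(cid, []))
    return seen


def new_commits(
    parents: Dict[str, List[str]],
    local_tip: str,
    remote_head: Optional[str],
) -> List[str]:
    """Commits reachable from local_tip but not from remote_head, ancestors first."""
    if remote_head == local_tip:
        return []  # nothing to push
    # Only the target branch's remote head bounds the walk (see the NOTE in step 1).
    boundary = ancestors(parents, remote_head)
    order: List[str] = []
    visited: Set[str] = set()
    # Iterative post-order DFS: a commit is emitted only after all of its parents.
    stack = [(local_tip, False)]
    while stack:
        cid, expanded = stack.pop()
        if expanded:
            order.append(cid)
            continue
        if cid in visited or cid in boundary:
            continue
        visited.add(cid)
        stack.append((cid, True))
        for parent in parents.get(cid, []):
            stack.append((parent, False))
    return order
```

When remote_head is null the boundary set is empty, so the same walk returns every commit reachable from the local tip, which matches the first branch of step 1.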

Server

POST /push/mpack-presign

receive { mpack_key, size_bytes }:
  if size_bytes > MAX_MPACK_SIZE (512 MiB for MVP):
      return 413 "mpack exceeds size limit"
  generate presigned PUT URL (key=mpack_key, expiry=15 min)
  return { presigned_url }
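
A minimal sketch of the presign handler, written as a plain function so it stays independent of any web framework. The names handle_mpack_presign and presign_put are hypothetical; presign_put stands in for whatever the storage client provides (with boto3 it could wrap generate_presigned_url, but that wiring is an assumption, not something stated in this issue).

```python
from typing import Callable, Dict, Tuple

MAX_MPACK_SIZE = 512 * 1024 * 1024   # 512 MiB hard limit for MVP
PRESIGN_EXPIRY_SECONDS = 15 * 60     # 15 min expiry from the pseudocode


def handle_mpack_presign(
    body: Dict[str, object],
    presign_put: Callable[[str, int], str],
) -> Tuple[int, Dict[str, str]]:
    """Return (http_status, response_body) for POST /push/mpack-presign."""
    mpack_key = str(body["mpack_key"])
    size_bytes = int(body["size_bytes"])  # client-declared size
    if size_bytes > MAX_MPACK_SIZE:
        return 413, {"error": "mpack exceeds size limit"}
    # presign_put(key, expiry_seconds) is a hypothetical hook into the object store.
    url = presign_put(mpack_key, PRESIGN_EXPIRY_SECONDS)
    return 200, {"presigned_url": url}
```

Note that size_bytes is only what the client declares at this point; the sketch does not enforce that the later PUT actually matches it.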

POST /push/unpack-mpack

receive { mpack_key, branch, head_commit_id, force }:

  1. fetch mpack bytes from object store (MinIO/S3/R2) by mpack_key
  2. verify "sha256:" + sha256hex(bytes) == mpack_key  → 400 if mismatch
  3. verify b"MUSE" magic               → 400 if not MUSE binary format
  4. parse mpack → { commits, snapshots, objects }
  5. for each object:
       decompress zstd payload
       enforce zip-bomb guard (max 10× ratio, max 256 MiB per object)
  6. check each object_id against blocked hashes → add to failed_objects, skip

  begin db transaction:

  7. write objects  [batch ≤ 500 rows per statement]
       a. SELECT existing object_ids WHERE object_id IN (batch) → known_ids
       b. for new objects only (not in known_ids):
            record byte_offset + byte_length within the mpack binary
            INSERT objects (object_id, size_bytes, storage_uri="mpack://<mpack_key>", ...)
            INSERT mpack_index (mpack_key, object_id, byte_offset, byte_length)
       c. UPSERT object_refs for ALL objects in this push (new + already-known)

  8. write snapshots  [batch ≤ 500]
       for each snapshot in topo order:
           if parent snapshot in DB → load parent manifest + apply delta_add/delta_remove
           else → delta_add IS the full manifest (first push to this remote)
           store delta_blob for intermediate snapshots
           store manifest_blob for head snapshot
       INSERT new snapshot rows
       UPSERT snapshot_refs for ALL snapshots in this push

  9. write commits  [topo sorted: parents before children; batch ≤ 500]
       for each commit:
           structured_delta already populated by client — store as-is
           generation = max(parent_generation) + 1  (parents are already in the DB or earlier in this tx)
       INSERT new commit rows (structured_delta, generation, all provenance fields)
       UPSERT commit_refs for ALL commits in this push
       UPSERT commit_graph (generation, snapshot_id) for ALL commits in this push

  10. advance branch pointer (atomic CAS):
       SELECT head_commit_id FROM branches WHERE name=<branch> FOR UPDATE
       if not force:
           verify current head is ancestor of (or equal to) client head_commit_id
           → 409 non-fast-forward if not
       UPDATE branches SET head_commit_id = <head_commit_id> WHERE name=<branch>
           (insert the row instead if the branch does not exist yet)

  commit db transaction

  11. enqueue async jobs: push.intel, push.file_last_commits

  return { objects_written, objects_skipped, commits_written, branch_head }
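
Three of the server-side checks above (integrity in steps 2 and 3, batching in steps 7 to 9, and the fast-forward test in step 10) can be sketched in isolation. This is an illustration under assumptions: the magic is taken to sit at byte 0 of the pack, the commit graph is a plain dict of parent lists, and PushError, verify_mpack, batches, is_ancestor and check_fast_forward are hypothetical names.

```python
import hashlib
from typing import Dict, Iterator, List, Optional, Sequence

MAGIC = b"MUSE"
BATCH_SIZE = 500


class PushError(Exception):
    """Carries the HTTP status the handler should return."""

    def __init__(self, status: int, message: str) -> None:
        super().__init__(message)
        self.status = status


def verify_mpack(mpack_key: str, data: bytes) -> None:
    """Steps 2 and 3: content hash must match the key, then check the magic."""
    if "sha256:" + hashlib.sha256(data).hexdigest() != mpack_key:
        raise PushError(400, "mpack_key does not match content hash")
    if not data.startswith(MAGIC):  # assumption: magic is the first 4 bytes
        raise PushError(400, "not MUSE binary format")


def batches(rows: Sequence, size: int = BATCH_SIZE) -> Iterator[Sequence]:
    """Yield slices of at most `size` rows, one per SQL statement."""
    for start in range(0, len(rows), size):
        yield rows[start:start + size]


def is_ancestor(parents: Dict[str, List[str]], ancestor: str, tip: str) -> bool:
    """True if ancestor equals tip or is reachable from tip via parent links."""
    seen, stack = set(), [tip]
    while stack:
        cid = stack.pop()
        if cid == ancestor:
            return True
        if cid in seen:
            continue
        seen.add(cid)
        stack.extend(parents.get(cid, []))
    return False


def check_fast_forward(
    parents: Dict[str, List[str]],
    current_head: Optional[str],
    new_head: str,
    force: bool,
) -> None:
    """Step 10: reject with 409 unless the current head is an ancestor of new_head."""
    if force or current_head is None:  # forced push, or branch is new
        return
    if not is_ancestor(parents, current_head, new_head):
        raise PushError(409, "non-fast-forward")
```

In the real handler the ancestry check would run against the commit graph inside the transaction, after the row lock from SELECT ... FOR UPDATE is held; the in-memory dict here only stands in for that lookup.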

Read path for mpack:// objects

Objects stored under an mpack:// URI are resolved via an S3 byte-range GET, with no full pack download.

read_object(object_id):
  1. SELECT mpack_key, byte_offset, byte_length
       FROM mpack_index WHERE object_id = <id>
  2. GET <s3_url_for_mpack_key>
       Range: bytes=<byte_offset>-<byte_offset+byte_length-1>
  3. decompress zstd → raw payload
  return payload
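
The Range header in step 2 uses an inclusive end offset, which is easy to get wrong by one. A small sketch of the arithmetic, with an in-memory stand-in for the object store so it can be checked without S3. The names range_header and fake_range_get are hypothetical, and the zstd decompression from step 3 is left out.

```python
import re


def range_header(byte_offset: int, byte_length: int) -> str:
    """Build an HTTP Range value for byte_length bytes starting at byte_offset."""
    if byte_offset < 0 or byte_length <= 0:
        raise ValueError("offset must be >= 0 and length must be > 0")
    # HTTP byte ranges are inclusive on both ends, hence the -1.
    return f"bytes={byte_offset}-{byte_offset + byte_length - 1}"


def fake_range_get(blob: bytes, header: str) -> bytes:
    """In-memory stand-in for a byte-range GET against the stored mpack."""
    match = re.fullmatch(r"bytes=(\d+)-(\d+)", header)
    if match is None:
        raise ValueError(f"unsupported Range header: {header}")
    first, last = int(match.group(1)), int(match.group(2))
    return blob[first:last + 1]


# Usage: fetch 5 bytes starting at offset 4 of a 16-byte blob.
blob = bytes(range(16))
chunk = fake_range_get(blob, range_header(4, 5))
assert chunk == bytes([4, 5, 6, 7, 8])
```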

mpack_index schema:

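CREATE TABLE mpack_index (
    mpack_key    TEXT    NOT NULL,
    object_id    TEXT    NOT NULL,
    byte_offset  BIGINT  NOT NULL,
    byte_length  INT     NOT NULL,
    PRIMARY KEY (mpack_key, object_id)
);
-- index name below is illustrative
CREATE INDEX mpack_index_object_id_idx ON mpack_index (object_id);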

Out of scope (post-MVP)

  • Chunked / multi-part push for packs > 512 MiB
  • Pull / fetch / clone (separate issue)
Activity
gabriel opened this issue 25 days ago
gabriel 16 days ago

Delta compression + pack index — implementation plan

Push/fetch/clone/pull are now confirmed working end-to-end. The mpack_index table exists and is populated, but the byte-range read path is not wired in. Here is the exact gap and the steps to close it.

Current state

  • musehub_mpack_index exists with columns: (entity_id, mpack_id, entity_type, created_at)
  • During push unpack, rows are inserted mapping each object_id → mpack_id ✅
  • read_object_bytes in backends.py handles mpack:// URIs by downloading the entire mpack and scanning it — O(mpack_size) per object lookup ❌
  • No byte_offset / byte_length columns exist on the table ❌
  • No S3 byte-range GET anywhere in the codebase ❌

Steps to wire it in

1. Migration — add byte_offset and byte_length to musehub_mpack_index

ALTER TABLE musehub_mpack_index ADD COLUMN byte_offset BIGINT;
ALTER TABLE musehub_mpack_index ADD COLUMN byte_length INT;

2. Push unpack — when inserting mpack_index rows, compute each object's byte position within the mpack binary using the OBJECTS section layout (deterministic: _build_pack output is header + fixed-width per-object records). Record (mpack_key, object_id, byte_offset, byte_length).

3. Storage backend — add get_range(mpack_key, byte_offset, byte_length) -> bytes that issues an S3 Range: bytes=N-M GET instead of fetching the whole mpack.

4. read_object_bytes — when storage_uri = "mpack://...", look up (byte_offset, byte_length) from mpack_index, issue the byte-range GET. Fall back to full-mpack download if columns are NULL (existing rows before migration).

5. TDD — prove via tests:

  • A pushed object's byte_offset/byte_length are recorded correctly
  • The byte-range GET recovers the exact bytes
  • read_object_bytes uses range GET, not full download
  • Fallback to full download when offset columns are NULL
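
Step 4 and the last two test bullets can be sketched together. This is a hedged illustration, not the code in backends.py: IndexRow, InMemoryBackend, get_full and the scan_full_pack callback are hypothetical stand-ins, and the function returns the stored bytes without decompressing them.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List, Optional, Tuple


@dataclass
class IndexRow:
    mpack_key: str
    byte_offset: Optional[int]  # NULL for rows written before the migration
    byte_length: Optional[int]


class InMemoryBackend:
    """Test double that records which kind of fetch was issued."""

    def __init__(self, packs: Dict[str, bytes]) -> None:
        self.packs = packs
        self.calls: List[Tuple[str, str]] = []

    def get_range(self, mpack_key: str, offset: int, length: int) -> bytes:
        self.calls.append(("range", mpack_key))
        return self.packs[mpack_key][offset:offset + length]

    def get_full(self, mpack_key: str) -> bytes:
        self.calls.append(("full", mpack_key))
        return self.packs[mpack_key]


def read_object_bytes(
    object_id: str,
    index: Dict[str, IndexRow],
    backend: InMemoryBackend,
    scan_full_pack: Callable[[bytes, str], bytes],
) -> bytes:
    """Prefer a byte-range fetch; fall back to a full download for NULL offsets."""
    row = index[object_id]
    if row.byte_offset is not None and row.byte_length is not None:
        return backend.get_range(row.mpack_key, row.byte_offset, row.byte_length)
    # Pre-migration row: download the whole pack and scan it, as before.
    return scan_full_pack(backend.get_full(row.mpack_key), object_id)
```

Recording the calls on the backend is one way to assert that the range path was taken and that no full download happened, which is what the third test bullet asks for.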
gabriel 16 days ago

Implementation complete — closing

All steps from the pseudocode are now wired in and verified on staging.

What was built:

  • Migration 0067: byte_offset BIGINT and byte_length INT added to musehub_mpack_index (nullable for backward compat with existing rows)
  • compute_object_byte_offsets(wire_bytes) — parses the OBJECTS section table and walks the _build_pack layout to return {oid: (abs_offset, length)} in O(objects) time, no I/O
  • Push unpack step 7d — calls compute_object_byte_offsets on the downloaded mpack and stores byte_offset + byte_length with each musehub_mpack_index row
  • BlobBackend.get_range(mpack_key, offset, length) — issues Range: bytes=N-M S3 GET, O(object_size) transfer
  • read_object_bytes — uses get_range when byte_offset is set; falls back to full mpack download for pre-migration rows (NULL offset)
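
To illustrate the idea behind compute_object_byte_offsets (one pass over the pack bytes, no I/O), here is a sketch against a toy layout. The layout is an assumption made up for this example and is NOT the real MUSE wire format, whose section table is not specified in this issue: 4-byte magic, 4-byte big-endian object count, then per object a 32-byte ID, a 4-byte big-endian payload length and the payload. The names build_toy_pack and compute_toy_offsets are hypothetical.

```python
import struct
from typing import Dict, List, Tuple


def build_toy_pack(objects: List[Tuple[bytes, bytes]]) -> bytes:
    """Serialize (object_id, payload) pairs into the toy layout described above."""
    out = bytearray(b"MUSE")
    out += struct.pack(">I", len(objects))
    for oid, payload in objects:
        if len(oid) != 32:
            raise ValueError("toy layout expects 32-byte object IDs")
        out += oid
        out += struct.pack(">I", len(payload))
        out += payload
    return bytes(out)


def compute_toy_offsets(wire: bytes) -> Dict[str, Tuple[int, int]]:
    """Return {object_id_hex: (absolute_payload_offset, payload_length)}."""
    if wire[:4] != b"MUSE":
        raise ValueError("bad magic")
    (count,) = struct.unpack_from(">I", wire, 4)
    pos = 8  # first record starts after magic + count
    offsets: Dict[str, Tuple[int, int]] = {}
    for _ in range(count):
        oid = wire[pos:pos + 32]
        pos += 32
        (length,) = struct.unpack_from(">I", wire, pos)
        pos += 4
        offsets[oid.hex()] = (pos, length)  # offset of the payload itself
        pos += length
    return offsets
```

The property worth testing is the round trip: for every object, wire[offset:offset + length] must equal the payload that was packed, since those are exactly the bytes a range GET would return.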

Verified on staging: 21 objects pushed to wire-hello, all have non-NULL byte_offset/byte_length in musehub_mpack_index. First object at offset=183, consistent with wire mpack header layout.

TDD: 7 tests in test_mpack_byte_range.py — BR-1 through BR-7 all green.