gabriel / musehub public
Closed #47 Performance
filed by gabriel human · 35 days ago

Fetch and clone: bundle-based download mirrors bundle-based push

Parent ticket

#46 — MuseWire Protocol — GitHub-parity performance across all wire commands

Prerequisite

#45 (push bundle path) must be merged first. Fetch downloads what push uploaded. If push stores a bundle, fetch reads from that same storage model.


The simple version

Push sends one box to the server. Fetch sends one box back to the client. Same box format (MPackBundle). Same one-request principle. Opposite direction.


What fetch does today (the slow path)

For a large fetch (≥ 500 objects OR ≥ 50 MB):

  1. Client POSTs want/have to /fetch/presign
  2. Server walks DAG, computes delta
  3. Server generates N individual presigned GET URLs (one per object)
  4. Client downloads N objects in parallel (up to 32 concurrent)
  5. Client assembles result locally

N individual presigned URL generations server-side. N individual GET requests client-side. Same envelope-per-object problem as push, just in reverse.


What fetch should do

Five steps

1. NEGOTIATE    client tells server what it wants and what it has
2. PACK         server assembles the delta into one MPackBundle
3. TRANSFER     client downloads the bundle — one request, direct from MinIO/R2
4. UNPACK       client extracts objects, commits, snapshots into local store
5. ADVANCE REF  client updates local branch pointer

Step 1 — Negotiate

  • Client sends POST /fetch/bundle with {want, have, ttl_seconds}
  • Server walks DAG from want, stops at have
  • Clone: have is empty — delta is the entire repo
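The want/have cut can be sketched in a few lines. This is a minimal in-memory version. It assumes the commit DAG is available as a `parents` dict that maps each commit ID to its parent IDs. The server actually walks the database through walk_dag_async, and `walk_commit_delta` here is only a hypothetical stand-in.

```python
from collections import deque


def walk_commit_delta(
    parents: dict[str, list[str]],
    want: list[str],
    have: list[str],
) -> list[str]:
    """BFS from `want` that never expands past a commit in `have`.

    `parents` maps commit ID -> parent commit IDs (hypothetical in-memory DAG).
    Returns the delta commits in BFS order, newest first.
    """
    stop = set(have)
    seen: set[str] = set()
    delta: list[str] = []
    queue = deque(c for c in want if c not in stop)
    while queue:
        cid = queue.popleft()
        if cid in seen:
            continue
        seen.add(cid)
        delta.append(cid)
        for parent in parents.get(cid, []):
            # Cut the walk at anything the client already has.
            if parent not in stop and parent not in seen:
                queue.append(parent)
    return delta
```

Passing `have=[]` gives the clone case, which is the whole history reachable from want. A plain stop-at-have BFS can re-send commits the client already has. This happens when those commits are reachable through a parent outside have, for example a merge whose other side joins history below the have commit. Because everything is content-addressed, the extra records are redundant rather than wrong. Git avoids this by excluding every ancestor of have, which costs a second walk.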

Step 2 — Pack (server-side)

  • A. Walk from want, stop at have
  • B. Collect all commits in delta
  • C. Collect all snapshot manifests for those commits
  • D. Collect all file objects referenced by those snapshots
  • E. Assemble as MPackBundle: {commits, snapshots, objects, branch_heads}
  • F. bundle_id = sha256(bundle_bytes) — content-addressed
  • G. Write bundle to MinIO: backend.put(bundle_id, bundle_bytes)
  • H. Generate one presigned GET URL: backend.presign_get(bundle_id, ttl)
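Steps B to D and F can be sketched as two plain functions. The commit and snapshot shapes are assumptions modeled on the server code in this ticket: each commit carries a `snapshot_id`, and each snapshot has a `manifest` that maps path to object ID. `collect_delta_object_ids` and `content_address` are hypothetical names.

```python
import hashlib
from typing import Any


def collect_delta_object_ids(
    delta_commits: list[dict[str, Any]],
    snapshots_by_id: dict[str, dict[str, Any]],
) -> set[str]:
    """Steps B-D: delta commits -> their snapshot manifests -> referenced object IDs."""
    snap_ids = {c["snapshot_id"] for c in delta_commits if c.get("snapshot_id")}
    return {
        oid
        for sid in snap_ids
        for oid in snapshots_by_id[sid]["manifest"].values()
    }


def content_address(data: bytes) -> str:
    """Step F: the bundle's ID is the sha256 of its serialized bytes."""
    return "sha256:" + hashlib.sha256(data).hexdigest()
```

The result is a set, so an object shared by many snapshots in the delta is packed only once.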

Step 3 — Transfer

  • Stream path (small): server returns bundle bytes inline in response body
  • Presign path (large): server returns {bundle_id, presigned_url, branch_heads}
    • Client does one GET presigned_url → bundle_bytes
    • Cloudflare never sees the object data
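Below is a sketch of how the client chooses between the two transfer paths. It assumes the response has already been decoded from msgpack into a dict with the fields listed under New endpoint. `http_get` is a hypothetical injected callable, such as a thin wrapper over an HTTP client, so the branch can be exercised without a network.

```python
from typing import Any, Callable


def resolve_bundle_bytes(
    response: dict[str, Any],
    http_get: Callable[[str], bytes],
) -> bytes:
    """Return the raw bundle bytes from whichever transfer path the server chose."""
    if response["presign"]:
        # Presign path: exactly one GET, straight to MinIO/R2.
        return http_get(response["presigned_url"])
    # Stream path: the bytes are already in the /fetch/bundle response body.
    return response["bundle"]
```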

Step 4 — Unpack (client-side)

  • A. Verify sha256(bundle_bytes) == bundle_id — one check covers all contents
  • B. msgpack.unpackb(bundle_bytes) → extract commits, snapshots, objects
  • C. Write each file object to local store (write_object)
  • D. Write each snapshot to local store (write_snapshot)
  • E. Write each commit to local store (write_commit)
  • No per-object hash loop. Step A is the integrity check for everything.
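Steps A to E can be sketched against an in-memory store. This version rests on a few assumptions. The decoder is passed in as a parameter; the real client uses msgpack.unpackb with raw=False, but any bytes-to-dict decoder works for experimenting. The store is a dict of three dicts. Snapshots and commits carry `snapshot_id` and `commit_id` keys. `unpack_bundle` and `BundleIntegrityError` are hypothetical names.

```python
import hashlib
from typing import Any, Callable


class BundleIntegrityError(Exception):
    """Raised when sha256(bundle_bytes) does not match the advertised bundle_id."""


def unpack_bundle(
    bundle_bytes: bytes,
    bundle_id: str,
    decode: Callable[[bytes], dict[str, Any]],
    store: dict[str, dict[str, Any]],
) -> dict[str, str]:
    # Step A: one hash over the whole payload, checked before decoding anything.
    actual = "sha256:" + hashlib.sha256(bundle_bytes).hexdigest()
    if actual != bundle_id:
        raise BundleIntegrityError(f"expected {bundle_id}, got {actual}")
    # Step B: decode the bundle.
    bundle = decode(bundle_bytes)
    # Steps C-E: objects, then snapshots, then commits, so every record
    # is written after the records it references.
    for obj in bundle.get("objects", []):
        store["objects"][obj["object_id"]] = obj["content"]
    for snap in bundle.get("snapshots", []):
        store["snapshots"][snap["snapshot_id"]] = snap
    for commit in bundle.get("commits", []):
        store["commits"][commit["commit_id"]] = commit
    # The branch heads feed Step 5 (advance ref).
    return bundle.get("branch_heads", {})
```

The check raises an exception instead of using assert, because Python strips assert statements when it runs with -O.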

Step 5 — Advance ref

  • Update local branch pointer to want tip
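One common way to move a branch pointer safely is to write the new value to a temp file and rename it over the old one. This sketch assumes a hypothetical file-per-branch layout in which `refs_dir/<branch>` holds the tip commit ID. The real muse ref store may differ, and `advance_ref` is an illustrative name.

```python
import os
import tempfile
from pathlib import Path


def advance_ref(refs_dir: Path, branch: str, commit_id: str) -> None:
    """Point `branch` at `commit_id` (hypothetical file-per-branch ref layout)."""
    ref_path = refs_dir / branch
    ref_path.parent.mkdir(parents=True, exist_ok=True)
    # Write the new value to a temp file beside the ref, then rename it over
    # the ref. os.replace is atomic on POSIX when both paths share a filesystem,
    # so a reader sees either the old tip or the new one, never a partial write.
    fd, tmp = tempfile.mkstemp(dir=ref_path.parent, prefix=f".{ref_path.name}.")
    try:
        with os.fdopen(fd, "w") as f:
            f.write(commit_id + "\n")
        os.replace(tmp, ref_path)
    except BaseException:
        os.unlink(tmp)
        raise
```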

New endpoint

POST /{owner}/{slug}/fetch/bundle

Request body (msgpack):

{
  "want":        ["sha256:…"],   // commits the client wants
  "have":        ["sha256:…"],   // commits the client already has
  "ttl_seconds": 3600
}

Response (presign path):

{
  "presign":       true,
  "bundle_id":     "sha256:…",
  "presigned_url": "https://…",  // one GET URL for the entire bundle
  "branch_heads":  {"main": "sha256:…", "dev": "sha256:…"},
  "commit_count":  1024,
  "object_count":  5139
}

Response (stream path):

{
  "presign":   false,
  "bundle_id": "sha256:…",      // client verifies sha256(bundle) == bundle_id
  "bundle":    <msgpack bytes>  // bundle inline
}

Server-side implementation — wire_fetch_bundle

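import hashlib

import msgpack
from sqlalchemy.ext.asyncio import AsyncSession  # assumed: SQLAlchemy async session

# Inline vs presign cut-over, same rule as the old presign path:
# large = >= 500 objects OR >= 50 MB (MB taken as MiB here; an assumption).
PRESIGN_MIN_OBJECTS = 500
PRESIGN_MIN_BYTES = 50 * 1024 * 1024


async def wire_fetch_bundle(
    session: AsyncSession,
    repo_id: str,
    want: list[str],
    have: list[str],
    ttl_seconds: int = 3600,
) -> dict:
    # _compute_fetch_delta, _load_snapshots, _load_objects_parallel,
    # _get_branch_heads and `backend` are helpers assumed to exist in musehub_wire.

    # 1. Walk DAG from want, stop at have (clone: have is empty)
    delta_commits = await _compute_fetch_delta(session, repo_id, want, have)

    # 2. Collect snapshot IDs and object IDs for delta
    snap_ids = {c.snapshot_id for c in delta_commits if c.snapshot_id}
    snapshots = await _load_snapshots(session, repo_id, snap_ids)
    all_oids = {oid for s in snapshots for oid in s["manifest"].values()}

    # 3. Load object bytes from MinIO in parallel -> {object_id: bytes}
    objects = await _load_objects_parallel(all_oids)

    # 4. Assemble bundle; its ID is the sha256 of the serialized bytes
    bundle = {
        "commits":      [c.to_dict() for c in delta_commits],
        "snapshots":    snapshots,
        "objects":      [{"object_id": oid, "content": data} for oid, data in objects.items()],
        "branch_heads": await _get_branch_heads(session, repo_id),
    }
    bundle_bytes = msgpack.packb(bundle, use_bin_type=True)
    bundle_id = "sha256:" + hashlib.sha256(bundle_bytes).hexdigest()

    # 5a. Stream path: small bundles go back inline, nothing is written to MinIO
    if len(objects) < PRESIGN_MIN_OBJECTS and len(bundle_bytes) < PRESIGN_MIN_BYTES:
        return {"presign": False, "bundle_id": bundle_id, "bundle": bundle_bytes}

    # 5b. Presign path: write bundle to MinIO, issue one presigned GET URL
    await backend.put(bundle_id, bundle_bytes)
    presigned_url = await backend.presign_get(bundle_id, ttl=ttl_seconds)

    return {
        "presign":       True,
        "bundle_id":     bundle_id,
        "presigned_url": presigned_url,
        "branch_heads":  bundle["branch_heads"],
        "commit_count":  len(delta_commits),
        "object_count":  len(objects),
    }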

Client-side implementation — _run_fetch_bundle_path

In muse/muse/cli/commands/fetch.py (or equivalent fetch transport):

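import hashlib
from pathlib import Path

import httpx
import msgpack

# write_object, write_snapshot, write_commit and FetchResult are the existing
# muse local-store helpers; their import lines are omitted here.


class BundleIntegrityError(Exception):
    """sha256(bundle_bytes) did not match the bundle_id the server advertised."""


async def _run_fetch_bundle_path(root: Path, response_data: dict) -> FetchResult:
    bundle_id = response_data["bundle_id"]

    if response_data["presign"]:
        # Presign path: one GET for the entire bundle, direct from MinIO/R2.
        # httpx's default 5 s timeout is too tight for a large bundle.
        async with httpx.AsyncClient(timeout=httpx.Timeout(60.0, connect=10.0)) as client:
            resp = await client.get(response_data["presigned_url"])
            resp.raise_for_status()
        bundle_bytes = resp.content
    else:
        # Stream path: bundle bytes arrived inline in the /fetch/bundle response
        bundle_bytes = response_data["bundle"]

    # Verify: one hash covers all contents. Raise rather than assert so the
    # check is not stripped under `python -O`.
    actual_id = "sha256:" + hashlib.sha256(bundle_bytes).hexdigest()
    if actual_id != bundle_id:
        raise BundleIntegrityError(f"bundle integrity failure: expected {bundle_id}, got {actual_id}")

    # Unpack: objects, then snapshots, then commits, so every record's
    # references already exist in the local store
    bundle = msgpack.unpackb(bundle_bytes, raw=False)
    for obj in bundle.get("objects", []):
        write_object(root, obj["object_id"], obj["content"])
    for snap in bundle.get("snapshots", []):
        write_snapshot(root, snap)
    for commit in bundle.get("commits", []):
        write_commit(root, commit)

    return FetchResult(
        commits=bundle.get("commits", []),
        branch_heads=bundle.get("branch_heads", {}),
    )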

Content-addressing checklist

Step                          Old (wrong)                      New (right)
Verify each fetched object    per-object sha256 loop           sha256(bundle) proves all contents
N presigned GET URLs          one URL per object               one URL for the bundle
Load objects from MinIO       N individual GETs server-side    parallel batch read, pack once

Performance target

Command                                      Today          Target
fetch (fresh, 1024 commits, 5139 objects)    not measured   < 15s
clone (same; have is empty)                  not measured   < 15s
pull (incremental, ~10 commits)              not measured   < 2s

Bundle cleanup

Bundles are transient transfer artifacts. After the client confirms receipt (or after TTL), delete the bundle blob from MinIO. Individual objects remain under their own sha256 keys.
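Here is a sketch of the selection step for cleanup. It assumes a hypothetical record of when each bundle key was written, for example object metadata or a small table. The function only ever returns bundle keys, so objects stored under their own sha256 keys are never candidates. `expired_bundle_keys` is a hypothetical name, and the actual delete call is left to the storage backend.

```python
def expired_bundle_keys(
    bundle_created_at: dict[str, float],
    ttl_seconds: int,
    now: float,
    confirmed: frozenset[str] = frozenset(),
) -> list[str]:
    """Bundle keys safe to delete: receipt confirmed by the client, or older than the TTL."""
    return sorted(
        key
        for key, created in bundle_created_at.items()
        if key in confirmed or now - created >= ttl_seconds
    )
```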

gabriel 35 days ago

Performance gates — one per step

Step            What it does                                                        Gate              How to measure
1. Negotiate    POST /fetch/bundle (want/have)                                      < 500ms           client timing output
2. Pack         DAG walk + collect objects + serialize bundle                       < 10s             server log [fetch/bundle] lines
3. Transfer     GET bundle from MinIO/R2                                            < 5s for 128 MB   client timing output
4. Unpack       verify sha256 + write objects + snapshots + commits to local store  < 5s              client timing output
5. Advance ref  update local branch pointer                                         < 100ms           client timing output
Total           end to end                                                          < 15s             time muse clone <url>
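The client-side gates can be checked mechanically. The sketch below times each step with time.perf_counter and compares the results against the table. The step names and the `StepTimer` / `gate_failures` helpers are hypothetical. The Pack gate would come from the server log rather than a client timer. The gates are treated as strict "<" limits, matching the table.

```python
import time
from contextlib import contextmanager
from typing import Iterator

# Gates from the table, in seconds (the transfer gate assumes a ~128 MB bundle).
GATES = {
    "negotiate": 0.5,
    "pack": 10.0,
    "transfer": 5.0,
    "unpack": 5.0,
    "advance_ref": 0.1,
    "total": 15.0,
}


class StepTimer:
    """Records wall-clock duration per named step."""

    def __init__(self) -> None:
        self.timings: dict[str, float] = {}

    @contextmanager
    def step(self, name: str) -> Iterator[None]:
        start = time.perf_counter()
        try:
            yield
        finally:
            self.timings[name] = time.perf_counter() - start


def gate_failures(timings: dict[str, float], gates: dict[str, float] = GATES) -> list[str]:
    """Steps whose measured time is at or above their gate; unmeasured steps are skipped."""
    return [name for name, limit in gates.items() if name in timings and timings[name] >= limit]
```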

What we have today

Not yet measured end to end. Fetch/clone is the second half of this work. Implement and measure only after push (#45) passes all its gates.

gabriel 34 days ago

Phase 1 complete — wire_fetch_bundle server function

Implemented and green on dev.

What was built

FetchBundleResult TypedDict + wire_fetch_bundle in musehub/services/musehub_wire.py:

  • BFS DAG walk from want, stops at have (same logic as wire_fetch_presign)
  • Loads object bytes from storage in parallel (semaphore=50)
  • Assembles one MPackBundle: {commits, snapshots, objects, branch_heads}
  • bundle_id = sha256(bundle_bytes) — the content-addressing proof
  • Inline bytes for small bundles (presign=False when the bundle is under 500 objects and under 50 MB)
  • Presigned GET URL for large bundles (presign=True)
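The bounded parallel load can be sketched with asyncio.Semaphore and asyncio.gather. The storage read is passed in as `fetch`, a hypothetical async callable standing in for the real backend read. The limit of 50 mirrors the semaphore size above. `load_objects_parallel` is an illustrative name.

```python
import asyncio
from typing import Awaitable, Callable


async def load_objects_parallel(
    object_ids: set[str],
    fetch: Callable[[str], Awaitable[bytes]],
    limit: int = 50,
) -> dict[str, bytes]:
    """Read every object, with at most `limit` storage reads in flight at once."""
    sem = asyncio.Semaphore(limit)

    async def load_one(oid: str) -> tuple[str, bytes]:
        async with sem:
            return oid, await fetch(oid)

    # gather preserves input order; sorting keeps the read order deterministic.
    pairs = await asyncio.gather(*(load_one(oid) for oid in sorted(object_ids)))
    return dict(pairs)
```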

6 TDD tests (FB0–FB5): single-commit bundle, sha256 proof, bundle contents, have-cut delta, empty want, per-object integrity.

Next

Phase 2: HTTP route POST /{owner}/{slug}/fetch/bundle in wire.py that calls wire_fetch_bundle and returns the msgpack response.

gabriel 34 days ago

Phase 2 complete — HTTP route POST /{owner}/{slug}/fetch/bundle

Also done. Route added to musehub/api/routes/wire.py, calls wire_fetch_bundle, returns msgpack FetchBundleResult.

5 TDD tests (RB0–RB4): 404 missing repo, inline bundle response, sha256 proof end-to-end, private repo auth gate, empty want.

Bonus refactor: extracted _walk_commit_delta helper that delegates to walk_dag_async (the existing DAG utility). Both wire_fetch_presign and wire_fetch_bundle now share one BFS implementation instead of two identical hand-rolled loops.

Next — Phase 3: client-side transport in muse repo

fetch_bundle function in muse/core/transport.py:

  • POST /fetch/bundle with {want, have, ttl_seconds}
  • If presign=False: bundle bytes are inline — verify sha256(bytes) == bundle_id
  • If presign=True: GET presigned_url → bytes → verify same
  • Call apply_mpack(root, bundle)
  • One hash covers all contents; no per-object loop

gabriel 34 days ago

Phase 3 complete — client-side fetch_bundle transport

Implemented in muse/core/transport.py on HttpTransport and LocalFileTransport.

What was built

fetch_bundle method on both transport classes:

  • POST /{owner}/{slug}/fetch/bundle with {want, have, ttl_seconds}
  • Stream path (presign=False): bundle bytes inline — verify sha256(bytes) == bundle_id, then call on_object for each object in the bundle
  • Presign path (presign=True): GET presigned_url → bytes → same verify + unpack
  • Single hash covers all contents — no per-object integrity loop
  • Returns the same dict shape as fetch_presign_or_stream so callers need no adaptation

Next

Phase 4: wire fetch_bundle into clone.py and pull.py, replacing fetch_presign_or_stream on the full (non-shallow) path.

gabriel 34 days ago

Phase 4 complete — fetch_bundle wired into clone and pull

What was built

clone.py: the full-clone path (depth=None) now calls transport.fetch_bundle instead of transport.fetch_presign_or_stream. Shallow clone (--depth N) stays on fetch_stream — bundle fetch has no depth-limit support.

pull.py: same replacement — fetch_bundle replaces fetch_presign_or_stream on the standard pull path.

New test file tests/test_fetch_bundle_command_wiring.py (3 tests, TDD red→green):

  • CW0: full clone calls fetch_bundle, never fetch_presign_or_stream
  • CW1: shallow clone still calls fetch_stream, never fetch_bundle
  • CW2: pull calls fetch_bundle, never fetch_presign_or_stream

Follow-on fixes in 7 existing test files that mocked fetch_presign_or_stream: all now also mock fetch_bundle. Fixed two pre-existing snapshot format bugs (tests were using the "manifest" key; apply_mpack requires "delta_add"/"delta_remove"). Fixed a thread-safety race in TestStressConcurrent where concurrent patch() calls on the same module attribute left stale mocks.

What remains

  • Bundle cleanup: delete bundle blobs from MinIO after TTL (ticket requirement, not yet implemented)
  • Performance measurement: end-to-end timing against the gates in the first comment

gabriel 34 days ago

Issue complete ✓

All five phases delivered:

Phase  What                                                Status
1      wire_fetch_bundle server function                   ✓
2      HTTP route POST /{owner}/{slug}/fetch/bundle        ✓
3      fetch_bundle on HttpTransport + LocalFileTransport  ✓
4      Wired into clone (full path) and pull               ✓
5      Bundle blob cleanup after TTL (BC0–BC3)             ✓

Push sends one box. Fetch sends one box back. Same format. Same one-request principle. Opposite direction.

Performance measurement and end-to-end benchmarking across all four MuseWire verbs (push / pull / fetch / clone) are tracked in a follow-on ticket.