gabriel / musehub public
Closed #56 Performance
filed by gabriel human · 25 days ago

Wire push: async unpack model, branch_have fix, and server optimizations

0 Anchors
Blast radius
Churn 30d
0 Proposals

Context

Two weeks of wire push debugging has produced a complete timing profile. This issue is the implementation plan for fixing everything we found. It is organized into three phases, each independently mergeable, each with clear pseudocode so we can verify the architecture before touching code.


Measured timing (muse repo — 1077 commits, 9576 objects, 93 MB mpack)

First push (main, 1032 new commits)

Step Time Description
step 3: parse mpack 10,673 ms Python msgpack/JSON decode of entire 93.8 MB wire blob
step 8e: UPSERT commit_refs 8,409 ms 1077 rows every push
step 8f: UPSERT commit_graph 3,056 ms 1077 rows every push
step 7e: UPSERT snapshot_refs 1,606 ms 1027 rows
step 7d: INSERT snapshots 796 ms 1027 rows
step 8d: INSERT commits 592 ms 1032 rows
everything else ~1 s
total ~28 s held open as one synchronous HTTP request

Dev push (0 new anything — pure waste)

Step Time Description
step 3: parse mpack 12,179 ms 93.5 MB sent and decoded for nothing — client bug
step 8f: UPSERT commit_graph 3,389 ms 1075 rows even though new=0
step 8e: UPSERT commit_refs 2,169 ms 1075 rows even though new=0
total ~25 s entirely avoidable

The dev push is the smoking gun. The server correctly detected existing=9581 new=0 objects, existing=1069 new=0 snapshots, existing=1075 new=0 commits — and then still UPSERTed 1075 rows into commit_refs and commit_graph anyway. Two separate bugs feeding each other.


Root cause analysis

Bug A — client: branch_have only anchors the target branch

In push.py, the BFS stop set is built as:

# CURRENT (broken)
branch_have = [remote_head_for_target_branch]          # one anchor
if local_tip_is_merge_commit:
    branch_have = list(all_remote_branch_heads.values()) # merge exception only

When pushing dev after main, the client only knows about dev's remote head (which doesn't exist yet). It walks the entire DAG back to root and packs 1075 commits + 9581 objects — even though 1032 commits and all 9576 objects are already on the server under main.

The fix is unconditional:

# FIXED
branch_have = [h for h in all_remote_branch_heads.values() if _is_valid_commit_id(h)]

The merge-commit special case comment was correct reasoning but the wrong scope. It should apply to all pushes.

Bug B — server: UPSERT commit_refs/commit_graph even when new=0

After step 8c determines existing=1075 new=0, the server still runs:

# step 8e — runs unconditionally
UPSERT commit_refs for ALL commits in push  # 1075 rows, 2.2 s wasted

# step 8f — runs unconditionally
UPSERT commit_graph for ALL commits in push # 1075 rows, 3.4 s wasted

These should be guarded:

if _new_commit_rows or _is_new_branch:
    UPSERT commit_refs for ALL commits in push
    UPSERT commit_graph for ALL commits in push

Note: commit_refs and commit_graph are still upserted for ALL commits in the push (not just new ones) when there are new commits — this is correct, because a commit can be globally-new on the server but still needs refs/graph rows for this repo.

Bug C — architecture: synchronous unpack holds the HTTP connection for 28 s

POST /push/unpack-mpack does all of parse → dedup → INSERT → advance branch → pg commit inside a single uvicorn worker request. This was not the original design.

MusehubMPackIndex docstring says: "Written by process_mpack_index_job after every push". The FetchNotIndexedError / 503 Retry-After: 60 path in the fetch route is still in the codebase. The background job infrastructure (worker.py, musehub_jobs.py) is already wired up.

The original design was:

  • HTTP handler: accept mpack_key, enqueue job, return fast
  • Background worker: do the expensive work
  • Fetch path: check mpack_index, return 503 if not yet indexed

This was collapsed into a synchronous call at some point, causing the 28 s timeout risk.


Phase 1 — Fix branch_have (client only, no server changes)

File: muse/muse/cli/commands/push.py

Pseudocode:

# Build stop-anchor set from ALL known remote branch heads, unconditionally.
# This replaces the current merge-commit-only special case.

remote_branch_heads: dict[str, str] = fetch_remote_refs(transport, url, token)
# remote_branch_heads = { "main": "sha256:abc...", "dev": "sha256:def...", ... }

branch_have: list[str] = [
    h for h in remote_branch_heads.values()
    if _is_valid_commit_id(h) and commit_exists(root, h)
]
# If the repo has no remote history yet, branch_have = []

# Walk DAG from local tip, stopping at any commit in branch_have.
# Only commits NOT reachable from any remote head are new.
commit_walk = walk_commits(root, [local_head], have=branch_have)

Expected result after fix:

  • muse push local main: have=0 (or remote main head), walks 1032 new commits ✓
  • muse push local dev after main: have=main's head (an ancestor of dev's tip), walks ~43 new commits, packs ~0 new objects ✓

Acceptance test:

muse -C ~/ecosystem/muse push local main
# step 1: have=N anchor(s) where N = number of remote branches
# step 7: commits_written=1032 objects_written=121

muse -C ~/ecosystem/muse push local dev
# step 1: have=N anchor(s) — should include main's head
# step 7: commits_written=~43 (dev-only commits)
#         objects_written=~0 (all objects already on server from main push)
# total time: <2 s  (no 93 MB mpack to send or parse)

Phase 2 — Fix server guard: skip UPSERTs when new=0

File: musehub/musehub/services/musehub_wire_push.py

Pseudocode:

# step 8c: dedup SELECT
_existing_cids: set[str] = { ... }  # from DB
_new_commit_rows = [r for r in _commit_rows_inline if r["commit_id"] not in _existing_cids]

# step 8d: INSERT new commits (unchanged)
if _new_commit_rows:
    session.add_all([MusehubCommit(**r) for r in _new_commit_rows])

# step 8e: UPSERT commit_refs
# GUARD: only run if there are new commits OR this is a new branch on this repo
_is_new_branch = (current_remote_head is None)
if _new_commit_rows or _is_new_branch:
    UPSERT commit_refs for ALL commits in push (_commit_rows_inline)
    # "ALL commits" = all commits in this push, not just globally-new ones.
    # A commit may be globally-known but still need a ref row for this repo.

# step 8f: UPSERT commit_graph
if _new_commit_rows or _is_new_branch:
    UPSERT commit_graph for ALL commits in push (_graph_rows_inline)

Expected result after Phase 2:

  • Dev push (after Phase 1 reduces it to ~43 commits): 43 new commit_ref rows, not 1075
  • Dev push if already fully pushed: 0 rows, ~0 ms for steps 8e+8f

Phase 3 — Restore async unpack model (background worker)

This is the structural fix. The HTTP handler becomes fast; the worker does the work.

New table: musehub_push_jobs

CREATE TABLE musehub_push_jobs (
    job_id         TEXT PRIMARY KEY,
    repo_id        TEXT NOT NULL,
    mpack_key      TEXT NOT NULL,
    branch         TEXT NOT NULL,
    head_commit_id TEXT NOT NULL,
    force          BOOLEAN NOT NULL DEFAULT FALSE,
    status         TEXT NOT NULL DEFAULT 'pending',  -- pending | running | done | failed
    commits_written INT,
    objects_written INT,
    error_message  TEXT,
    created_at     TIMESTAMPTZ NOT NULL DEFAULT now(),
    updated_at     TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE INDEX ON musehub_push_jobs (repo_id, status);

New HTTP handler pseudocode

POST /{owner}/{slug}/push/unpack-mpack
  body: { mpack_key, branch, head_commit_id, force, commits_count, objects_count }

  1. auth check (unchanged)
  2. repo existence check (unchanged)
  3. validate mpack_key format (sha256: prefix + 64 hex chars)
  4. check mpack exists in MinIO (HEAD — fast, no download)
     if missing → 422 "mpack not found: upload it first via /push/presign"
  5. INSERT musehub_push_jobs {
         job_id = new_uuid(),
         repo_id, mpack_key, branch, head_commit_id, force,
         status = "pending"
     }
  6. pg commit (job is now durable)
  7. enqueue job to worker queue (fire-and-forget)
  8. return 202 Accepted {
         "job_id": job_id,
         "status": "pending",
         "poll_url": "/{owner}/{slug}/push/jobs/{job_id}"
     }

Total HTTP time: <50 ms

New status endpoint pseudocode

GET /{owner}/{slug}/push/jobs/{job_id}

  1. auth check (repo readable)
  2. SELECT * FROM musehub_push_jobs WHERE job_id = ?
  3. if not found → 404
  4. return {
         "job_id": job_id,
         "status": "pending" | "running" | "done" | "failed",
         "commits_written": int | null,
         "objects_written": int | null,
         "branch_head": str | null,   # set when done
         "error": str | null          # set when failed
     }

Background worker pseudocode

process_push_job(job_id):

  # 0. claim job (prevent double-execution)
  UPDATE musehub_push_jobs SET status="running", updated_at=now()
  WHERE job_id=? AND status="pending"
  if no rows updated: return  # already claimed by another worker

  try:
    # 1. fetch mpack from MinIO (by mpack_key)
    wire_bytes = backend.get_mpack(mpack_key)

    # 2. verify sha256(wire_bytes) == mpack_key
    assert sha256(wire_bytes) == mpack_key_hex

    # 3. oid-only first pass: extract oid list WITHOUT copying content bytes
    oid_list = scan_mpack_oids(wire_bytes)
    raw_commit_records = parse_mpack_commits_section(wire_bytes)   # JSON, small
    raw_snap_records   = parse_mpack_snapshots_section(wire_bytes) # JSON, small

    # 4. zip-bomb guard (total decompressed size from metadata or content scan)
    assert total_decompressed_size < MAX_DECOMPRESSED

    # 5. check blocked hashes (oids only — no content needed)
    blocked = SELECT object_id FROM blocked_hashes WHERE object_id IN (oid_list)
    if blocked: quarantine mpack, raise MPackValidationError

    # 6. objects
    existing_oids = SELECT object_id FROM musehub_objects WHERE object_id IN (oid_list)
    new_oids = set(oid_list) - existing_oids
    if new_oids:
      # Second pass: load content ONLY for new_oids
      new_obj_bytes = extract_object_contents(wire_bytes, new_oids)
      INSERT musehub_objects for new_oids
      UPSERT object_refs for new_oids
      INSERT mpack_index for new_oids

    # 7. snapshots
    snaps = build_snapshot_manifests(raw_snap_records)
    existing_snap_ids = SELECT snapshot_id FROM musehub_snapshots WHERE ...
    new_snaps = [s for s in snaps if s.snapshot_id not in existing_snap_ids]
    if new_snaps:
      INSERT musehub_snapshots for new_snaps
      UPSERT snapshot_refs for new_snaps

    # 8. commits (topo sorted)
    commits = topo_sort(parse_commit_records(raw_commit_records))
    existing_cids = SELECT commit_id FROM musehub_commits WHERE ...
    new_commits = [c for c in commits if c.commit_id not in existing_cids]
    if new_commits:
      INSERT musehub_commits for new_commits   # structured_delta stored as-is from wire
      UPSERT commit_refs for ALL commits in push  # repo-scoped; not just new
      UPSERT commit_graph for ALL commits in push

    # 9. fast-forward check (now that commits are written, we can verify)
    current_branch_head = SELECT head_commit_id FROM musehub_branches WHERE ...
    if current_branch_head and not force:
      assert is_ancestor(current_branch_head, head_commit_id, commit_graph)

    # 10. advance branch pointer
    UPSERT musehub_branches SET head_commit_id=head_commit_id WHERE repo_id AND branch

    # 11. pg commit
    session.commit()

    # 12. mark job done
    UPDATE musehub_push_jobs SET
      status="done", commits_written=len(new_commits),
      objects_written=len(new_oids), branch_head=head_commit_id, updated_at=now()

    # 13. enqueue intel jobs (unchanged from current step 11)
    enqueue_push_intel(repo_id, head_commit_id, ...)

  except Exception as e:
    UPDATE musehub_push_jobs SET status="failed", error_message=str(e), updated_at=now()
    raise

Client polling pseudocode

# push.py — replace blocking wait on unpack-mpack response

response = transport.post("/push/unpack-mpack", body)
# response is now 202 with job_id instead of 200 with counts

job_id   = response["job_id"]
poll_url = response["poll_url"]
print(f"  step 7: unpack queued  job={job_id[:8]}", file=sys.stderr)

deadline = time.monotonic() + 300  # 5 min max

while time.monotonic() < deadline:
    r = transport.get(poll_url)
    if r["status"] == "done":
        print(f"         → commits_written={r['commits_written']} objects_written={r['objects_written']}", file=sys.stderr)
        return r
    elif r["status"] == "failed":
        raise PushError(r["error"])
    time.sleep(0.5)

raise PushError("push job timed out after 300 s")

Phase 3b — oid-only scan (inside Phase 3)

Key optimization: avoid copying 59 MB of object content for objects already on the server.

def scan_mpack_oids(wire_bytes: bytes) -> list[str]:
    """First pass: extract oid strings only. No content bytes copied."""
    objects_section = extract_section(wire_bytes, WIRE_SEC_OBJECTS)
    count = struct.unpack_from("<Q", objects_section, 0)[0]
    cursor = 8
    oids = []
    for _ in range(count):
        oid = objects_section[cursor: cursor + OID_BYTES].decode()
        length = struct.unpack_from("<Q", objects_section, cursor + OID_BYTES)[0]
        cursor += OID_BYTES + 8 + length   # skip content entirely
        oids.append(oid)
    return oids

def extract_object_contents(wire_bytes: bytes, wanted: set[str]) -> list[tuple[str, bytes]]:
    """Second pass: load content only for wanted oids."""
    objects_section = extract_section(wire_bytes, WIRE_SEC_OBJECTS)
    # same loop structure, collect only entries whose oid is in wanted

RAM usage drops from ~59 MB (all content) to ~700 KB (oid strings only) during the dedup phase.


Structured delta gap (resolved — documentation rot)

The README comment structured_delta = ??? ← THIS IS THE GAP is stale. The server already stores _cr.structured_delta as-is from the wire commit record. The client computes it at commit time via the domain plugin and transmits it in the mpack. No server-side computation needed. README should be updated.


Implementation order

Phase Repo Risk Expected gain
1: Fix branch_have muse Low — 2-line change Eliminates redundant dev push (93 MB → ~1 MB, 25 s → <2 s)
2: UPSERT guard musehub Low — guard around existing code Cuts ~11 s from pushes where commits already exist
3: Async job model muse + musehub Medium — new table, new endpoint, client poll loop Eliminates HTTP timeout risk for first push; 28 s moves to background
3b: oid-only scan muse (core) Low — pure optimization Drops parse memory from 59 MB to <1 MB for incremental pushes

Each phase has a clear acceptance test. No phase requires the others to be present first.

Activity2
gabriel opened this issue 25 days ago
gabriel 16 days ago

Status update — 2026-05-29

Phase 1 (branch_have fix): Partially done. The have list used for object dedup was fixed (all remote branch heads, no commit_exists filter). But branch_have — the separate BFS stop set passed to walk_commits — still only contains the target branch remote head for non-merge commits. The merge-commit special case correctly uses all remote heads; the fix is to make that unconditional. Remaining work: ~2 lines in push.py + tests.

Phase 2 (UPSERT guard): ✅ Done. commit_refs and commit_graph UPSERTs are now guarded by if _commit_rows_inline:.

Phase 3 (async unpack model): ❌ Not started. POST /push/unpack-mpack is still fully synchronous. No musehub_push_jobs table, no 202 response, no polling endpoint, no background worker.

Phase 3b (OID-only scan): ❌ Not started. Dedup phase still materializes all object content into memory.

Structured delta gap: ✅ Done. Server stores structured_delta as-is from wire.

Starting TDD on Phase 1 now, then Phase 3.

gabriel 16 days ago

Closing — completed work documented, Phase 3 deferred

Phase 1 — branch_have fix: ✅ Shipped. branch_have now uses all remote branch heads unconditionally. Eliminates the ~93 MB redundant mpack on second-branch pushes. 5 tests in test_push_branch_have.py (BH-1 through BH-5).

Phase 2 — UPSERT guard: ✅ Shipped. commit_refs and commit_graph UPSERTs skip when _commit_rows_inline is empty.

Phase 3 (async unpack model) and Phase 3b (OID-only scan): Deferred to a dedicated multi-phase implementation ticket. Not a blocker — basic push/fetch/clone/pull is working end-to-end. The synchronous unpack holds up under normal repo sizes. Phase 3 becomes important when repos exceed ~500 MB mpacks or first-push latency exceeds uvicorn worker timeout.

Existing test coverage for current synchronous model: 19 tests across tiers 1–7 (test_wire_mpack_unpack_step3_e2e.py + test_wire_mpack_unpack_step3_tiers4567.py). These will need updating when Phase 3 ships.