Wire push: async unpack model, branch_have fix, and server optimizations
Context
Two weeks of wire push debugging has produced a complete timing profile. This issue is the implementation plan for fixing everything we found. It is organized into three phases, each independently mergeable, each with clear pseudocode so we can verify the architecture before touching code.
Measured timing (muse repo — 1077 commits, 9576 objects, 93 MB mpack)
First push (main, 1032 new commits)
| Step | Time | Description |
|---|---|---|
| step 3: parse mpack | 10,673 ms | Python msgpack/JSON decode of entire 93.8 MB wire blob |
| step 8e: UPSERT commit_refs | 8,409 ms | 1077 rows every push |
| step 8f: UPSERT commit_graph | 3,056 ms | 1077 rows every push |
| step 7e: UPSERT snapshot_refs | 1,606 ms | 1027 rows |
| step 7d: INSERT snapshots | 796 ms | 1027 rows |
| step 8d: INSERT commits | 592 ms | 1032 rows |
| everything else | ~1 s | |
| total | ~28 s | held open as one synchronous HTTP request |
Dev push (0 new objects, snapshots, or commits: pure waste)
| Step | Time | Description |
|---|---|---|
| step 3: parse mpack | 12,179 ms | 93.5 MB sent and decoded for nothing — client bug |
| step 8f: UPSERT commit_graph | 3,389 ms | 1075 rows even though new=0 |
| step 8e: UPSERT commit_refs | 2,169 ms | 1075 rows even though new=0 |
| total | ~25 s | entirely avoidable |
The dev push is the smoking gun. The server correctly detected existing=9581 new=0 objects, existing=1069 new=0 snapshots, and existing=1075 new=0 commits, and then still UPSERTed 1075 rows into both commit_refs and commit_graph. Two separate bugs compound each other: the client sends an mpack it does not need to send (Bug A), and the server rewrites rows it does not need to rewrite (Bug B).
Root cause analysis
Bug A — client: branch_have only anchors the target branch
In push.py, the BFS stop set is built as:
# CURRENT (broken)
branch_have = [remote_head_for_target_branch]  # one anchor
if local_tip_is_merge_commit:
    branch_have = list(all_remote_branch_heads.values())  # merge exception only
When pushing dev after main, the client anchors only on dev's remote head, which does not exist yet. It therefore walks the entire DAG back to the root and packs 1075 commits and 9581 objects, even though 1032 of those commits and all 9576 objects are already on the server under main.
The fix is unconditional:
# FIXED
branch_have = [h for h in all_remote_branch_heads.values() if _is_valid_commit_id(h)]
The reasoning behind the merge-commit special case was correct, but its scope was too narrow: it should apply to every push, not just to merge commits.
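A minimal sketch of why the stop set matters, using a BFS over a toy DAG. The walk_commits signature here (a parents dict instead of a repo root), the commit names, and the remote_heads dict are hypothetical stand-ins, not push.py's real API.

```python
from collections import deque


def walk_commits(parents: dict[str, list[str]], tips: list[str], have: list[str]) -> list[str]:
    """Return commits reachable from tips, stopping at (and excluding) anything in have."""
    stop = set(have)
    seen: set[str] = set()
    order: list[str] = []
    queue = deque(t for t in tips if t not in stop)
    while queue:
        cid = queue.popleft()
        if cid in seen:
            continue
        seen.add(cid)
        order.append(cid)
        for parent in parents.get(cid, []):
            if parent not in stop and parent not in seen:
                queue.append(parent)
    return order


# Toy history: main = c1 <- c2 <- c3; dev branches off c3 with d1 <- d2.
parents = {"c1": [], "c2": ["c1"], "c3": ["c2"], "d1": ["c3"], "d2": ["d1"]}
remote_heads = {"main": "c3"}  # main already pushed; dev has no remote head yet

# Broken: anchor only on the target branch's remote head (none for a new dev branch).
old_have = [h for h in [remote_heads.get("dev")] if h]
# Fixed: anchor on every known remote branch head.
new_have = list(remote_heads.values())

print(walk_commits(parents, ["d2"], old_have))  # ['d2', 'd1', 'c3', 'c2', 'c1']
print(walk_commits(parents, ["d2"], new_have))  # ['d2', 'd1']
```

With the broken stop set the walk reaches the root; with all remote heads it stops at main's tip and only dev-only commits are packed.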
Bug B — server: UPSERT commit_refs/commit_graph even when new=0
After step 8c determines existing=1075 new=0, the server still runs:
# step 8e — runs unconditionally
UPSERT commit_refs for ALL commits in push # 1075 rows, 2.2 s wasted
# step 8f — runs unconditionally
UPSERT commit_graph for ALL commits in push # 1075 rows, 3.4 s wasted
These should be guarded:
if _new_commit_rows or _is_new_branch:
    UPSERT commit_refs for ALL commits in push
    UPSERT commit_graph for ALL commits in push
Note: when the guard passes, commit_refs and commit_graph are still upserted for ALL commits in the push, not just new ones. This is correct, because a commit can already exist globally on the server yet still need refs/graph rows for this repo.
Bug C — architecture: synchronous unpack holds the HTTP connection for 28 s
POST /push/unpack-mpack does all of parse → dedup → INSERT → advance branch → pg commit inside a single uvicorn worker request. This was not the original design.
MusehubMPackIndex docstring says: "Written by process_mpack_index_job after every push". The FetchNotIndexedError / 503 Retry-After: 60 path in the fetch route is still in the codebase. The background job infrastructure (worker.py, musehub_jobs.py) is already wired up.
The original design was:
- HTTP handler: accept mpack_key, enqueue job, return fast
- Background worker: do the expensive work
- Fetch path: check mpack_index, return 503 if not yet indexed
This was collapsed into a synchronous call at some point, causing the 28 s timeout risk.
Phase 1 — Fix branch_have (client only, no server changes)
File: muse/muse/cli/commands/push.py
Pseudocode:
# Build stop-anchor set from ALL known remote branch heads, unconditionally.
# This replaces the current merge-commit-only special case.
remote_branch_heads: dict[str, str] = fetch_remote_refs(transport, url, token)
# remote_branch_heads = { "main": "sha256:abc...", "dev": "sha256:def...", ... }
branch_have: list[str] = [
h for h in remote_branch_heads.values()
if _is_valid_commit_id(h) and commit_exists(root, h)
]
# If the repo has no remote history yet, branch_have = []
# Walk DAG from local tip, stopping at any commit in branch_have.
# Only commits NOT reachable from any remote head are new.
commit_walk = walk_commits(root, [local_head], have=branch_have)
Expected result after fix:
- muse push local main: have=0 (or remote main head), walks 1032 new commits ✓
- muse push local dev after main: have=main's head (an ancestor of dev's tip), walks ~43 new commits, packs ~0 new objects ✓
Acceptance test:
muse -C ~/ecosystem/muse push local main
# step 1: have=N anchor(s) where N = number of remote branches
# step 7: commits_written=1032 objects_written=121
muse -C ~/ecosystem/muse push local dev
# step 1: have=N anchor(s) — should include main's head
# step 7: commits_written=~43 (dev-only commits)
# objects_written=~0 (all objects already on server from main push)
# total time: <2 s (no 93 MB mpack to send or parse)
Phase 2 — Fix server guard: skip UPSERTs when new=0
File: musehub/musehub/services/musehub_wire_push.py
Pseudocode:
# step 8c: dedup SELECT
_existing_cids: set[str] = { ... } # from DB
_new_commit_rows = [r for r in _commit_rows_inline if r["commit_id"] not in _existing_cids]
# step 8d: INSERT new commits (unchanged)
if _new_commit_rows:
    session.add_all([MusehubCommit(**r) for r in _new_commit_rows])
# step 8e: UPSERT commit_refs
# GUARD: only run if there are new commits OR this is a new branch on this repo
_is_new_branch = (current_remote_head is None)
if _new_commit_rows or _is_new_branch:
    UPSERT commit_refs for ALL commits in push (_commit_rows_inline)
    # "ALL commits" = all commits in this push, not just globally-new ones.
    # A commit may be globally-known but still need a ref row for this repo.
# step 8f: UPSERT commit_graph
if _new_commit_rows or _is_new_branch:
    UPSERT commit_graph for ALL commits in push (_graph_rows_inline)
Expected result after Phase 2:
- Dev push (after Phase 1 reduces it to ~43 commits): 43 new commit_ref rows, not 1075
- Dev push if already fully pushed: 0 rows, ~0 ms for steps 8e+8f
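The guard can be expressed as a small pure function, which makes the three cases easy to unit-test. plan_ref_upserts is a hypothetical helper name, and it works on bare commit ids rather than the row dicts the real service uses.

```python
from typing import Optional


def plan_ref_upserts(
    push_commit_ids: list[str],
    existing_cids: set[str],
    current_remote_head: Optional[str],
) -> tuple[list[str], list[str]]:
    """Return (commit ids to INSERT, commit ids whose commit_refs/commit_graph rows to UPSERT)."""
    new_commits = [cid for cid in push_commit_ids if cid not in existing_cids]
    is_new_branch = current_remote_head is None
    if new_commits or is_new_branch:
        # Guard passed: refs/graph rows cover ALL commits in the push, not just new ones.
        return new_commits, list(push_commit_ids)
    # Nothing new and the branch already exists: skip steps 8e and 8f entirely.
    return new_commits, []


# Fully pushed dev branch: nothing to insert, nothing to upsert.
print(plan_ref_upserts(["c1", "c2"], {"c1", "c2"}, "c2"))  # ([], [])
# One new commit: insert it, but upsert refs/graph for both.
print(plan_ref_upserts(["c1", "c2"], {"c1"}, "c1"))  # (['c2'], ['c1', 'c2'])
# New branch whose commits all exist already: refs/graph rows are still needed.
print(plan_ref_upserts(["c1"], {"c1"}, None))  # ([], ['c1'])
```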
Phase 3 — Restore async unpack model (background worker)
This is the structural fix. The HTTP handler becomes fast; the worker does the work.
New table: musehub_push_jobs
CREATE TABLE musehub_push_jobs (
job_id TEXT PRIMARY KEY,
repo_id TEXT NOT NULL,
mpack_key TEXT NOT NULL,
branch TEXT NOT NULL,
head_commit_id TEXT NOT NULL,
force BOOLEAN NOT NULL DEFAULT FALSE,
status TEXT NOT NULL DEFAULT 'pending', -- pending | running | done | failed
commits_written INT,
objects_written INT,
branch_head TEXT, -- set when done (returned by the status endpoint)
error_message TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE INDEX ON musehub_push_jobs (repo_id, status);
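The worker's claim step relies on a conditional UPDATE whose affected-row count tells a worker whether it won. A minimal sketch, using the stdlib sqlite3 module as a stand-in for Postgres (CURRENT_TIMESTAMP instead of now(), and only the columns the claim needs); enqueue and claim are hypothetical helper names.

```python
import sqlite3
import uuid

conn = sqlite3.connect(":memory:")
conn.execute(
    """CREATE TABLE musehub_push_jobs (
           job_id TEXT PRIMARY KEY,
           status TEXT NOT NULL DEFAULT 'pending',
           updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
       )"""
)


def enqueue(conn: sqlite3.Connection) -> str:
    """Insert a pending job and commit it so it is durable before returning."""
    job_id = str(uuid.uuid4())
    with conn:  # commits on success, rolls back on exception
        conn.execute("INSERT INTO musehub_push_jobs (job_id) VALUES (?)", (job_id,))
    return job_id


def claim(conn: sqlite3.Connection, job_id: str) -> bool:
    """Atomically move pending -> running; only one caller can see rowcount == 1."""
    with conn:
        cur = conn.execute(
            "UPDATE musehub_push_jobs SET status = 'running', updated_at = CURRENT_TIMESTAMP "
            "WHERE job_id = ? AND status = 'pending'",
            (job_id,),
        )
    return cur.rowcount == 1


job = enqueue(conn)
print(claim(conn, job))  # True
print(claim(conn, job))  # False: already running
```

In Postgres the same statement is safe under concurrency: a second worker's UPDATE blocks on the row lock, then re-checks status = 'pending' against the committed row and matches nothing.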
New HTTP handler pseudocode
POST /{owner}/{slug}/push/unpack-mpack
body: { mpack_key, branch, head_commit_id, force, commits_count, objects_count }
1. auth check (unchanged)
2. repo existence check (unchanged)
3. validate mpack_key format (sha256: prefix + 64 hex chars)
4. check mpack exists in MinIO (HEAD — fast, no download)
if missing → 422 "mpack not found: upload it first via /push/presign"
5. INSERT musehub_push_jobs {
job_id = new_uuid(),
repo_id, mpack_key, branch, head_commit_id, force,
status = "pending"
}
6. pg commit (job is now durable)
7. enqueue job to worker queue (fire-and-forget)
8. return 202 Accepted {
"job_id": job_id,
"status": "pending",
"poll_url": "/{owner}/{slug}/push/jobs/{job_id}"
}
Total HTTP time: <50 ms
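Steps 3 and 8 are cheap, pure checks that can be tested without a server. A minimal sketch, assuming the "64 hex chars" in step 3 are lowercase (as hashlib's hexdigest() produces); is_valid_mpack_key and accepted_body are hypothetical helper names, and the owner/slug values are illustrative.

```python
import hashlib
import re
import uuid

# Assumption: "sha256:" followed by exactly 64 lowercase hex digits.
_MPACK_KEY_RE = re.compile(r"sha256:[0-9a-f]{64}")


def is_valid_mpack_key(key: str) -> bool:
    """Step 3: format check before any MinIO HEAD or database work."""
    return _MPACK_KEY_RE.fullmatch(key) is not None


def accepted_body(owner: str, slug: str, job_id: str) -> dict[str, str]:
    """Step 8: the 202 Accepted payload; the client then polls poll_url."""
    return {
        "job_id": job_id,
        "status": "pending",
        "poll_url": f"/{owner}/{slug}/push/jobs/{job_id}",
    }


key = "sha256:" + hashlib.sha256(b"example mpack bytes").hexdigest()
print(is_valid_mpack_key(key))  # True
print(is_valid_mpack_key("sha256:abc"))  # False
print(accepted_body("alice", "muse", str(uuid.uuid4()))["status"])  # pending
```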
New status endpoint pseudocode
GET /{owner}/{slug}/push/jobs/{job_id}
1. auth check (repo readable)
2. SELECT * FROM musehub_push_jobs WHERE job_id = ?
3. if not found → 404
4. return {
"job_id": job_id,
"status": "pending" | "running" | "done" | "failed",
"commits_written": int | null,
"objects_written": int | null,
"branch_head": str | null, # set when done
"error": str | null # set when failed
}
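The status endpoint is mostly a row-to-JSON mapping, including the rename from the error_message column to the "error" field. A minimal sketch, assuming the row arrives as a dict and carries a branch_head value written by the worker when the job finishes; job_status_payload and the 404 body shape are hypothetical.

```python
from typing import Any, Optional


def job_status_payload(row: Optional[dict[str, Any]]) -> tuple[int, dict[str, Any]]:
    """Map a musehub_push_jobs row to (HTTP status, JSON body) for GET .../push/jobs/{job_id}."""
    if row is None:
        return 404, {"detail": "push job not found"}
    status = row["status"]
    return 200, {
        "job_id": row["job_id"],
        "status": status,
        "commits_written": row.get("commits_written"),
        "objects_written": row.get("objects_written"),
        # Only meaningful once the job has finished, one way or the other.
        "branch_head": row.get("branch_head") if status == "done" else None,
        "error": row.get("error_message") if status == "failed" else None,
    }


code, body = job_status_payload({"job_id": "j1", "status": "failed", "error_message": "non-fast-forward"})
print(code, body["error"])  # 200 non-fast-forward
```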
Background worker pseudocode
process_push_job(job_id):
    # 0. claim job (prevent double-execution)
    UPDATE musehub_push_jobs SET status="running", updated_at=now()
        WHERE job_id=? AND status="pending"
    if no rows updated: return  # already claimed by another worker
    pg commit  # make the claim visible to other workers before doing any work
    try:
        # 1. fetch mpack from MinIO (by mpack_key)
        wire_bytes = backend.get_mpack(mpack_key)
        # 2. verify sha256(wire_bytes) == mpack_key
        #    (explicit raise, not assert: asserts are stripped under python -O)
        if sha256(wire_bytes) != mpack_key_hex: raise MPackValidationError
        # 3. oid-only first pass: extract oid list WITHOUT copying content bytes
        oid_list = scan_mpack_oids(wire_bytes)
        raw_commit_records = parse_mpack_commits_section(wire_bytes)  # JSON, small
        raw_snap_records = parse_mpack_snapshots_section(wire_bytes)  # JSON, small
        # 4. zip-bomb guard (total decompressed size from metadata or content scan)
        if total_decompressed_size >= MAX_DECOMPRESSED: raise MPackValidationError
        # 5. check blocked hashes (oids only: no content needed)
        blocked = SELECT object_id FROM blocked_hashes WHERE object_id IN (oid_list)
        if blocked: quarantine mpack, raise MPackValidationError
        # 6. objects
        existing_oids = SELECT object_id FROM musehub_objects WHERE object_id IN (oid_list)
        new_oids = set(oid_list) - existing_oids
        if new_oids:
            # Second pass: load content ONLY for new_oids
            new_obj_bytes = extract_object_contents(wire_bytes, new_oids)
            INSERT musehub_objects for new_oids
            UPSERT object_refs for new_oids
            INSERT mpack_index for new_oids
        # 7. snapshots
        snaps = build_snapshot_manifests(raw_snap_records)
        existing_snap_ids = SELECT snapshot_id FROM musehub_snapshots WHERE ...
        new_snaps = [s for s in snaps if s.snapshot_id not in existing_snap_ids]
        if new_snaps:
            INSERT musehub_snapshots for new_snaps
            UPSERT snapshot_refs for new_snaps
        # 8. commits (topo sorted)
        commits = topo_sort(parse_commit_records(raw_commit_records))
        existing_cids = SELECT commit_id FROM musehub_commits WHERE ...
        new_commits = [c for c in commits if c.commit_id not in existing_cids]
        if new_commits:
            INSERT musehub_commits for new_commits  # structured_delta stored as-is from wire
            UPSERT commit_refs for ALL commits in push  # repo-scoped; not just new
            UPSERT commit_graph for ALL commits in push
        # 9. fast-forward check (now that commits are written, we can verify)
        current_branch_head = SELECT head_commit_id FROM musehub_branches WHERE ...
        if current_branch_head and not force:
            if not is_ancestor(current_branch_head, head_commit_id, commit_graph):
                raise non-fast-forward error
        # 10. advance branch pointer
        UPSERT musehub_branches SET head_commit_id=head_commit_id WHERE repo_id AND branch
        # 11. pg commit
        session.commit()
        # 12. mark job done (commit this too, or the job stays "running" forever)
        UPDATE musehub_push_jobs SET
            status="done", commits_written=len(new_commits),
            objects_written=len(new_oids), branch_head=head_commit_id, updated_at=now()
        pg commit
        # 13. enqueue intel jobs (unchanged from current step 11)
        enqueue_push_intel(repo_id, head_commit_id, ...)
    except Exception as e:
        session.rollback()  # discard partially written rows before recording the failure
        UPDATE musehub_push_jobs SET status="failed", error_message=str(e), updated_at=now()
        pg commit
        raise
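Steps 8 and 9 lean on two graph helpers the pseudocode leaves abstract. A minimal sketch, assuming the commit graph is available as a dict from commit id to parent ids (the real code would read commit_graph rows): topo_sort here is Kahn's algorithm, treating parents outside the push as already satisfied, and is_ancestor is a plain BFS over parent edges. Both are illustrative, not existing musehub functions.

```python
from collections import deque


def topo_sort(parents: dict[str, list[str]]) -> list[str]:
    """Kahn's algorithm: every commit appears after all of its in-push parents."""
    in_push = set(parents)
    pending = {c: sum(1 for p in ps if p in in_push) for c, ps in parents.items()}
    children: dict[str, list[str]] = {c: [] for c in parents}
    for c, ps in parents.items():
        for p in ps:
            if p in in_push:
                children[p].append(c)
    ready = deque(sorted(c for c, n in pending.items() if n == 0))
    order: list[str] = []
    while ready:
        c = ready.popleft()
        order.append(c)
        for child in children[c]:
            pending[child] -= 1
            if pending[child] == 0:
                ready.append(child)
    if len(order) != len(parents):
        raise ValueError("cycle in commit parents")
    return order


def is_ancestor(ancestor: str, descendant: str, graph: dict[str, list[str]]) -> bool:
    """True if ancestor is reachable from descendant by following parent edges."""
    seen: set[str] = set()
    queue = deque([descendant])
    while queue:
        c = queue.popleft()
        if c == ancestor:
            return True
        if c in seen:
            continue
        seen.add(c)
        queue.extend(graph.get(c, []))
    return False


graph = {"c1": [], "c2": ["c1"], "d1": ["c2"], "x1": ["c1"]}
print(topo_sort(graph))  # ['c1', 'c2', 'x1', 'd1']
print(is_ancestor("c2", "d1", graph))  # True: c2 -> d1 is a fast-forward
print(is_ancestor("x1", "d1", graph))  # False: would need force
```

A commit counts as its own ancestor, so re-pushing the current head passes the fast-forward check.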
Client polling pseudocode
# push.py: replace the blocking wait on the unpack-mpack response
import sys
import time
response = transport.post("/push/unpack-mpack", body)
# response is now 202 with job_id instead of 200 with counts
job_id = response["job_id"]
poll_url = response["poll_url"]
print(f" step 7: unpack queued job={job_id[:8]}", file=sys.stderr)
deadline = time.monotonic() + 300  # 5 min max
while time.monotonic() < deadline:
    r = transport.get(poll_url)
    if r["status"] == "done":
        print(f" → commits_written={r['commits_written']} objects_written={r['objects_written']}", file=sys.stderr)
        return r
    elif r["status"] == "failed":
        raise PushError(r["error"])
    time.sleep(0.5)
raise PushError("push job timed out after 300 s")
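A variant of the loop above that takes the HTTP GET, sleep, and clock as parameters, so it can be tested without a server, and backs off from 0.5 s to at most 5 s between polls instead of polling every 0.5 s. wait_for_push_job, the local PushError class, and the 5 s cap are assumptions, not existing push.py code.

```python
import time
from typing import Any, Callable


class PushError(Exception):
    pass


def wait_for_push_job(
    get: Callable[[str], dict[str, Any]],
    poll_url: str,
    timeout_s: float = 300.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> dict[str, Any]:
    """Poll until the job is done or failed, or until timeout_s elapses."""
    deadline = clock() + timeout_s
    delay = 0.5
    while clock() < deadline:
        r = get(poll_url)
        if r["status"] == "done":
            return r
        if r["status"] == "failed":
            raise PushError(r.get("error") or "push job failed")
        sleep(delay)
        delay = min(delay * 2, 5.0)  # back off: 0.5, 1, 2, 4, 5, 5, ... seconds
    raise PushError(f"push job timed out after {timeout_s:.0f} s")


responses = iter([
    {"status": "pending"},
    {"status": "running"},
    {"status": "done", "commits_written": 43, "objects_written": 0},
])
slept: list[float] = []
result = wait_for_push_job(lambda url: next(responses), "/alice/muse/push/jobs/1234", sleep=slept.append)
print(result["commits_written"], slept)  # 43 [0.5, 1.0]
```

Backing off keeps small pushes responsive (the first poll is still 0.5 s away) while cutting the number of status requests during a long first push.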
Phase 3b — oid-only scan (inside Phase 3)
Key optimization: avoid copying 59 MB of object content for objects already on the server.
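import struct
def scan_mpack_oids(wire_bytes: bytes) -> list[str]:
    """First pass: extract oid strings only. No content bytes copied."""
    # memoryview keeps slicing zero-copy (assumes extract_section itself does not copy)
    objects_section = memoryview(extract_section(wire_bytes, WIRE_SEC_OBJECTS))
    count = struct.unpack_from("<Q", objects_section, 0)[0]
    cursor = 8
    oids = []
    for _ in range(count):
        oid = bytes(objects_section[cursor: cursor + OID_BYTES]).decode()
        length = struct.unpack_from("<Q", objects_section, cursor + OID_BYTES)[0]
        cursor += OID_BYTES + 8 + length  # skip content entirely
        oids.append(oid)
    return oids
def extract_object_contents(wire_bytes: bytes, wanted: set[str]) -> list[tuple[str, bytes]]:
    """Second pass: load content only for wanted oids."""
    objects_section = memoryview(extract_section(wire_bytes, WIRE_SEC_OBJECTS))
    count = struct.unpack_from("<Q", objects_section, 0)[0]
    cursor = 8
    found = []
    for _ in range(count):
        oid = bytes(objects_section[cursor: cursor + OID_BYTES]).decode()
        length = struct.unpack_from("<Q", objects_section, cursor + OID_BYTES)[0]
        start = cursor + OID_BYTES + 8
        if oid in wanted:
            found.append((oid, bytes(objects_section[start: start + length])))  # copy wanted content only
        cursor = start + length
    return found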
RAM usage drops from ~59 MB (all content) to ~700 KB (oid strings only) during the dedup phase.
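A variant of the two-pass scan that records each object's offset and length during the first pass, so the second pass slices directly instead of re-walking the section. It also includes a small encoder for building test fixtures. The layout (u64 little-endian count, then oid, u64 length, content per entry) is inferred from scan_mpack_oids above, and OID_BYTES = 71 assumes ASCII "sha256:" plus 64 hex chars; neither is confirmed against the real wire format, and all three function names are hypothetical.

```python
import struct

OID_BYTES = 71  # assumption: ASCII "sha256:" + 64 hex chars


def encode_objects_section(entries: list[tuple[str, bytes]]) -> bytes:
    """Build a fixture in the assumed layout: u64 count, then (oid, u64 len, content) per entry."""
    parts = [struct.pack("<Q", len(entries))]
    for oid, content in entries:
        raw = oid.encode("ascii")
        if len(raw) != OID_BYTES:
            raise ValueError(f"oid must be {OID_BYTES} bytes: {oid!r}")
        parts += [raw, struct.pack("<Q", len(content)), content]
    return b"".join(parts)


def index_objects(section: bytes) -> dict[str, tuple[int, int]]:
    """First pass: map oid -> (content offset, content length) without copying content."""
    view = memoryview(section)
    (count,) = struct.unpack_from("<Q", view, 0)
    cursor = 8
    index: dict[str, tuple[int, int]] = {}
    for _ in range(count):
        oid = bytes(view[cursor:cursor + OID_BYTES]).decode("ascii")
        (length,) = struct.unpack_from("<Q", view, cursor + OID_BYTES)
        start = cursor + OID_BYTES + 8
        index[oid] = (start, length)
        cursor = start + length
    return index


def load_wanted(section: bytes, index: dict[str, tuple[int, int]], wanted: set[str]) -> dict[str, bytes]:
    """Second pass: slice out only the wanted objects, using offsets from the first pass."""
    return {oid: section[start:start + length] for oid, (start, length) in index.items() if oid in wanted}


oid_a = "sha256:" + "a" * 64
oid_b = "sha256:" + "b" * 64
section = encode_objects_section([(oid_a, b"hello"), (oid_b, b"world!")])
idx = index_objects(section)
print(idx[oid_b])  # (171, 6)
print(load_wanted(section, idx, {oid_b})[oid_b])  # b'world!'
```

The index adds two integers per oid on top of the oid strings, which stays small compared with the content bytes it avoids touching.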
Structured delta gap (resolved — documentation rot)
The README comment "structured_delta = ??? ← THIS IS THE GAP" is stale. The server already stores _cr.structured_delta as-is from the wire commit record. The client computes it at commit time via the domain plugin and transmits it in the mpack, so no server-side computation is needed. The README should be updated.
Implementation order
| Phase | Repo | Risk | Expected gain |
|---|---|---|---|
| 1: Fix branch_have | muse | Low: 2-line change | Eliminates redundant dev push (93 MB → ~1 MB, 25 s → <2 s) |
| 2: UPSERT guard | musehub | Low — guard around existing code | Cuts ~11 s from pushes where commits already exist |
| 3: Async job model | muse + musehub | Medium — new table, new endpoint, client poll loop | Eliminates HTTP timeout risk for first push; 28 s moves to background |
| 3b: oid-only scan | muse (core) | Low — pure optimization | Drops parse memory from 59 MB to <1 MB for incremental pushes |
Each phase has a clear acceptance test. No phase requires the others to be present first.
Closing — completed work documented, Phase 3 deferred
Phase 1 — branch_have fix: ✅ Shipped. branch_have now uses all remote branch heads unconditionally. Eliminates the ~93 MB redundant mpack on second-branch pushes. 5 tests in test_push_branch_have.py (BH-1 through BH-5).
Phase 2 — UPSERT guard: ✅ Shipped. commit_refs and commit_graph UPSERTs skip when _commit_rows_inline is empty.
Phase 3 (async unpack model) and Phase 3b (OID-only scan): Deferred to a dedicated multi-phase implementation ticket. Not a blocker: basic push/fetch/clone/pull is working end-to-end, and the synchronous unpack holds up under normal repo sizes. Phase 3 becomes important once a repo's mpack exceeds ~500 MB or first-push latency exceeds the uvicorn worker timeout.
Existing test coverage for current synchronous model: 19 tests across tiers 1–7 (test_wire_mpack_unpack_step3_e2e.py + test_wire_mpack_unpack_step3_tiers4567.py). These will need updating when Phase 3 ships.
Status update — 2026-05-29
Phase 1 (branch_have fix): Partially done. The have list used for object dedup was fixed (all remote branch heads, no commit_exists filter). But branch_have, the separate BFS stop set passed to walk_commits, still only contains the target branch remote head for non-merge commits. The merge-commit special case correctly uses all remote heads; the fix is to make that unconditional. Remaining work: ~2 lines in push.py + tests.
Phase 2 (UPSERT guard): ✅ Done. commit_refs and commit_graph UPSERTs are now guarded by `if _commit_rows_inline:`.
Phase 3 (async unpack model): ❌ Not started. POST /push/unpack-mpack is still fully synchronous. No musehub_push_jobs table, no 202 response, no polling endpoint, no background worker.
Phase 3b (OID-only scan): ❌ Not started. The dedup phase still materializes all object content in memory.
Structured delta gap: ✅ Done. The server stores structured_delta as-is from the wire.
Starting TDD on Phase 1 now, then Phase 3.