bundle.index background job: object writes, fetch readiness, and web-scale correctness
The one principle
Content-addressing is a proof, not a label. sha256(bytes) == object_id proves the content. The bundle hash sha256(wire_bytes) == bundle_key proves every byte inside the bundle — including every object's content and its declared ID. Any step that re-verifies what the bundle hash already proved is checking the work of the hash. Delete it.
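A minimal sketch of the single check the principle relies on. It assumes bundle_key is the hex SHA-256 digest of the wire bytes, optionally carrying a "sha256:" prefix like the IDs in the job logs; the function name verify_bundle is hypothetical and not taken from the codebase.

```python
import hashlib


def verify_bundle(wire_bytes: bytes, bundle_key: str) -> bool:
    """Return True when sha256(wire_bytes) matches bundle_key.

    Assumption: bundle_key is a hex digest, optionally prefixed "sha256:".
    """
    expected = bundle_key.removeprefix("sha256:").lower()
    actual = hashlib.sha256(wire_bytes).hexdigest()
    # One hash over the whole bundle covers every object's bytes and declared ID.
    return actual == expected
```

Once this returns True on the sync path, nothing downstream needs to hash individual objects again.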
Current state
process_bundle_index_job exists and the tests pass. It does:
- Fetches bundle from MinIO by bundle_key
- Unpacks msgpack
- Reconstructs snapshot manifests from delta chain → bulk INSERT musehub_snapshots
- Topological sort + bulk INSERT musehub_commits
What it does not do:
- Write object blobs to MinIO under their sha256 keys
- INSERT rows into musehub_objects
- Upsert object refs (path → object_id per snapshot)
The consequence: after a bundle push + index job, commits and snapshots are in PG but individual objects are not addressable by object_id. Fetch, clone, and pull cannot serve file content. The stream push ghost guard fails on any subsequent push because musehub_objects has no rows for this repo.
This is the root cause of the "1 snapshot object(s) not registered" error on stream push after bundle push.
Why per-object sha256 re-verification must NOT be added
The sync path already verified sha256(wire_bytes) == bundle_key. That proof authenticated every bit of wire_bytes, including every object's content bytes and their declared IDs. Re-hashing each object in the background job is checking the work of the bundle hash. Do not add it. Trust the math.
Phase 1 — Object writes: MinIO + PG (the missing step)
This is the load-bearing fix. Everything downstream (fetch, clone, pull) is blocked on it.
What to add to process_bundle_index_job
After unpacking, extract raw_objects = bundle.get("objects") or [].
Step A — parallel MinIO writes
Objects in the bundle may be zstd-compressed (encoding="zstd"). Decompress before writing to MinIO so the stored bytes are the raw content that sha256 was computed over.
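    import zstandard as _zstd

    _dctx = _zstd.ZstdDecompressor()

    async def _put_one(obj: dict) -> str:
        """Store one object under its object_id; `backend` is the job's storage backend."""
        oid = obj["object_id"]
        raw = obj["content"]
        if obj.get("encoding") == "zstd":
            # Store the decompressed bytes: sha256 was computed over the raw content.
            raw = _dctx.decompress(raw)
        await backend.put_object(oid, raw)
        return oid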
Cap concurrency with _get_blob_put_sem() (100 slots, already exists). Chunk into batches of 200 and asyncio.gather within each chunk. No per-object sha256 check — the bundle hash already proved every byte.
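The batching described above could look roughly like the sketch below. It assumes _get_blob_put_sem() returns an asyncio.Semaphore and that a coroutine like _put_one is passed in; put_all and its parameters are hypothetical names used only for illustration.

```python
import asyncio
from typing import Awaitable, Callable


async def put_all(
    objects: list[dict],
    put_one: Callable[[dict], Awaitable[str]],
    sem: asyncio.Semaphore,
    chunk_size: int = 200,
) -> list[str]:
    """Run put_one over every object, with at most `sem` slots in flight."""

    async def _guarded(obj: dict) -> str:
        async with sem:
            return await put_one(obj)

    written: list[str] = []
    for start in range(0, len(objects), chunk_size):
        chunk = objects[start:start + chunk_size]
        # gather keeps input order and propagates the first exception it sees.
        written.extend(await asyncio.gather(*(_guarded(o) for o in chunk)))
    return written
```

Chunking bounds the number of pending tasks held in memory, while the semaphore bounds the number of PUTs actually in flight.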
Step B — bulk INSERT musehub_objects
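    _OBJ_BATCH = 500

    # Assumes Step A recorded each object's decompressed length in
    # `sizes` (object_id -> int); that mapping is not shown here.
    obj_rows = [
        {
            "object_id": obj["object_id"],
            "size_bytes": sizes[obj["object_id"]],  # decompressed size
            "storage_uri": backend.uri_for(obj["object_id"]),
            "content_cache": None,
        }
        for obj in raw_objects
    ]
    # chunked pg_insert ... on_conflict_do_nothing, _OBJ_BATCH rows per statement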
Step C — upsert object refs
For each resolved snapshot manifest (already computed in the delta reconstruction loop), upsert one row per (repo_id, object_id) into musehub_object_refs so the ghost guard and fetch presign queries can find them.
Use the existing _upsert_object_refs helper if present, otherwise a chunked pg_insert ... on_conflict_do_nothing.
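As an illustration of the chunked, conflict-tolerant insert pattern, here is a sketch that uses the standard-library sqlite3 module as a stand-in for PostgreSQL. The table name object_refs, its columns, and the helper upsert_refs_sketch are assumptions for the example; the real job would use SQLAlchemy's pg_insert against musehub_object_refs.

```python
import sqlite3
from typing import Iterable, Iterator


def chunked(rows: list[tuple], size: int) -> Iterator[list[tuple]]:
    """Yield successive slices of at most `size` rows."""
    for start in range(0, len(rows), size):
        yield rows[start:start + size]


def upsert_refs_sketch(
    conn: sqlite3.Connection,
    repo_id: str,
    object_ids: Iterable[str],
    batch: int = 500,
) -> None:
    """Insert one (repo_id, object_id) row per object; existing rows are left alone."""
    rows = [(repo_id, oid) for oid in object_ids]
    for part in chunked(rows, batch):
        conn.executemany(
            "INSERT INTO object_refs (repo_id, object_id) VALUES (?, ?) "
            "ON CONFLICT (repo_id, object_id) DO NOTHING",
            part,
        )
    conn.commit()
```

Running the helper twice with the same IDs leaves the table unchanged, which is the property the ghost guard and the retry path both depend on.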
Web scale math
| Objects | Serial (5ms/PUT) | Parallel (100 slots) |
|---|---|---|
| 1k | 5s | ~50ms |
| 5k | 25s | ~250ms |
| 50k | 250s | ~2.5s |
| 500k | 2500s | ~25s |
Serial is a non-starter at web scale. Parallel writes are mandatory.
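The table follows from simple arithmetic, sketched below under the table's own assumptions: a flat 5 ms per PUT and 100 slots that stay fully busy. Real latencies vary, so treat the parallel column as an idealized lower bound.

```python
PUT_LATENCY_S = 0.005  # 5 ms per PUT, as assumed in the table
SLOTS = 100            # parallel PUT slots


def serial_seconds(n_objects: int) -> float:
    """Total time when PUTs run one after another."""
    return n_objects * PUT_LATENCY_S


def parallel_seconds(n_objects: int, slots: int = SLOTS) -> float:
    """Idealized time when PUTs run in waves of `slots`, one latency per wave."""
    waves = -(-n_objects // slots)  # ceiling division
    return waves * PUT_LATENCY_S


for n in (1_000, 5_000, 50_000, 500_000):
    print(f"{n:>7}  serial={serial_seconds(n):g}s  parallel={parallel_seconds(n):g}s")
```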
Acceptance criteria
- After job runs: SELECT COUNT(*) FROM musehub_objects WHERE ... == objects in bundle
- After job runs: a subsequent stream push does not fail the ghost guard
- After job runs: individual objects are retrievable by object_id via the fetch path
- No per-object sha256 verification anywhere in the job
Phase 2 — Fetch readiness: serve objects before the worker runs
The problem: there is a window between unpack-bundle returning 200 and the bundle.index job completing where a clone/fetch would fail — the branch points to a commit, but the objects that commit references are not yet in MinIO.
The fix: content_cache
Store object bytes inline in a content_cache column on musehub_objects at INSERT time, with storage_uri = 'pending'. The fetch path checks content_cache first; if non-null, serves from there. The background job (or a separate promotion job) then PUTs to MinIO and clears content_cache.
This means:
- Sync unpack-bundle path: INSERT objects with content_cache = raw_bytes, storage_uri = 'pending'
- Background job: for each object where storage_uri = 'pending': PUT to MinIO, set storage_uri = real URI, set content_cache = NULL
- Fetch path: COALESCE(content_cache, MinIO GET)
The tradeoff: PG row size. A 25 KB blob inline in PG costs ~25 KB of table storage. For 5k objects that is 125 MB in PG. Acceptable for a push window; not acceptable permanently. Hence the two-step: inline for immediacy, promoted to MinIO for permanence.
Size guard: objects above a configurable threshold (default 1 MB) skip content_cache and go straight to MinIO. Only small objects benefit from inline serving.
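The two-step storage model and the size guard can be sketched with an in-memory model. Everything here is illustrative: ObjectRow, insert_object, read_object, promote, and the s3://example-bucket URI scheme are assumptions, with a plain dict standing in for MinIO.

```python
from dataclasses import dataclass
from typing import Optional

CACHE_MAX_BYTES = 1024 * 1024  # 1 MB default threshold from the text


@dataclass
class ObjectRow:
    object_id: str
    storage_uri: str
    content_cache: Optional[bytes]


def real_uri(object_id: str) -> str:
    # Hypothetical URI scheme, for illustration only.
    return f"s3://example-bucket/{object_id}"


def insert_object(object_id: str, raw: bytes, blob_store: dict) -> ObjectRow:
    """Sync path: inline small objects, send large ones straight to the blob store."""
    if len(raw) > CACHE_MAX_BYTES:
        blob_store[object_id] = raw
        return ObjectRow(object_id, real_uri(object_id), None)
    return ObjectRow(object_id, "pending", raw)


def read_object(row: ObjectRow, blob_store: dict) -> bytes:
    """Fetch path: the COALESCE(content_cache, blob GET) rule."""
    if row.content_cache is not None:
        return row.content_cache
    return blob_store[row.object_id]


def promote(row: ObjectRow, blob_store: dict) -> None:
    """Background job: move inline bytes to the blob store, then clear the cache."""
    if row.content_cache is None:
        return  # already promoted, or never cached
    blob_store[row.object_id] = row.content_cache
    row.storage_uri = real_uri(row.object_id)
    row.content_cache = None
```

A fetch returns the same bytes before and after promote runs, which is the behavior the acceptance criteria below ask for.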
Acceptance criteria
- Immediately after unpack-bundle returns 200 (before worker runs): fetch of any file in the pushed commit returns the correct bytes
- After worker runs: content_cache is NULL for all objects, storage_uri points to MinIO
- Objects > 1 MB never enter content_cache
Phase 3 — Idempotency and retry safety
The job must be safe to run multiple times. Network failures, worker crashes, and OOM kills must not corrupt state.
Invariants:
- All PG inserts use on_conflict_do_nothing: already true for snapshots and commits, must be true for objects and object refs
- MinIO PUT is idempotent by nature (same key, same bytes; S3 accepts it)
- Job status must not be set to done until all three write phases complete (objects, snapshots, commits). The worker already wraps in try/except and calls fail_job on error; verify this is wired correctly for partial failures mid-job.
- A job that fails mid-object-write will be retried by stale claim recovery (_STALE_CLAIM_MINUTES). On retry, already-written objects are skipped via on_conflict_do_nothing, and already-written snapshots and commits are skipped the same way.
Acceptance criteria
- Running the job twice produces the same DB state as running it once
- Killing the worker mid-job and restarting results in a completed correct state
- No orphan content_cache rows after successful promotion
Phase 4 — Observability
The job is the critical path for fetch/clone correctness. It needs enough logging to diagnose failures in production.
Timing breakdown log (already partially present — extend):
[bundle.index] job=sha256:abc123 repo=sha256:def456
fetch_bundle: 0.312s (47,231 KB)
unpack: 0.841s (1,035 commits, 5,165 objects, 1,035 snapshots)
object_puts: 0.287s (5,165 objects, 100 parallel)
object_insert: 0.043s (5,165 rows)
object_refs: 0.118s (362,550 refs)
snapshot_insert: 0.031s (1,035 rows)
commit_insert: 0.029s (1,035 rows)
total: 1.661s
Job result dict — extend return value:
{
"commits_written": 1035,
"snapshots_written": 1035,
"objects_written": 5165,
"object_refs_written": 362550,
"elapsed_ms": 1661,
}
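One way to collect these numbers is a small timer object wrapped around each phase. The sketch below is an assumption about how it might be structured; PhaseTimer and the "<phase>_ms" key naming are hypothetical and not taken from the existing job.

```python
import time
from contextlib import contextmanager
from typing import Iterator


class PhaseTimer:
    """Records wall-clock milliseconds per named phase plus total elapsed time."""

    def __init__(self) -> None:
        self.timings_ms: dict[str, float] = {}
        self._start = time.perf_counter()

    @contextmanager
    def phase(self, name: str) -> Iterator[None]:
        t0 = time.perf_counter()
        try:
            yield
        finally:
            # Recorded even when the phase raises, so failures still show timing.
            self.timings_ms[f"{name}_ms"] = (time.perf_counter() - t0) * 1000.0

    def result(self, **counts: int) -> dict:
        """Merge row counts, per-phase timings, and total elapsed_ms into one dict."""
        elapsed_ms = (time.perf_counter() - self._start) * 1000.0
        return {**counts, **self.timings_ms, "elapsed_ms": elapsed_ms}
```

Usage would be along the lines of `with timer.phase("object_puts"): ...` around each step, then `timer.result(objects_written=n)` as the job's return value.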
Implementation order
Phase 1 first — it unblocks fetch/clone/pull and fixes the ghost guard race. Everything else is hardening.
Phase 2 — implement only after Phase 1 is proven. It adds complexity (two-phase object storage) and should be gated on a measured fetch-before-worker test showing actual latency.
Phase 3 — implement alongside Phase 1 (idempotency is zero extra cost when you write Phase 1 correctly the first time).
Phase 4 — extend the existing log lines while writing Phase 1.
Files touched
| Phase | File |
|---|---|
| 1, 3, 4 | musehub/musehub/services/musehub_wire.py — process_bundle_index_job |
| 2 | musehub/musehub/services/musehub_wire.py — wire_push_unpack_bundle (inline content_cache writes) |
| 2 | musehub/musehub/db/musehub_models.py — content_cache column on MusehubObject |
| 2 | musehub/musehub/services/musehub_fetch.py (or equivalent) — serve from content_cache |
| 1–4 | musehub/tests/test_bundle_index_job.py — TDD test file (new) |
TDD test plan
All tests in musehub/tests/test_bundle_index_job.py.
test_objects_written_to_minio_after_job
After job runs: backend.get_object(oid) returns correct bytes for every object in bundle.
test_musehub_objects_rows_after_job
After job runs: musehub_objects has one row per object with correct storage_uri.
test_object_refs_after_job
After job runs: musehub_object_refs has rows for every (repo_id, object_id) in bundle.
test_no_per_object_sha256_verification
Patch hashlib.sha256 to raise inside the job — job must complete without calling it.
(bundle hash already proved content; re-hashing is checking the work of the hash)
test_ghost_guard_passes_after_job
stream push immediately after bundle.index job completes does not fail ghost guard.
test_job_is_idempotent
Run process_bundle_index_job twice with same job_id — DB state identical to one run.
test_job_timing_breakdown_in_return_value
Return dict contains objects_written, object_refs_written, elapsed_ms.
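For test_no_per_object_sha256_verification, the patching technique could look like the sketch below. job_under_test is a hypothetical stand-in for the real job body; only the use of unittest.mock to make hashlib.sha256 raise is the point.

```python
import hashlib
from unittest import mock


def job_under_test(objects: list[dict]) -> int:
    """Hypothetical stand-in for the job body: stores objects without hashing."""
    store = {obj["object_id"]: obj["content"] for obj in objects}
    return len(store)


def run_without_sha256(objects: list[dict]) -> int:
    """Run the job while any call to hashlib.sha256 raises."""
    boom = AssertionError("sha256 must not be called inside the job")
    with mock.patch.object(hashlib, "sha256", side_effect=boom):
        return job_under_test(objects)
```

This only catches calls made through the hashlib.sha256 attribute. Code that did `from hashlib import sha256` holds its own reference and would need to be patched at its import site.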
Phase 2 complete — fetch readiness before worker runs
Commit: sha256:e587eb26e51c
What shipped
Sync-path content_cache write — small bundles (≤ bundle_content_cache_max_bytes, default 4 MB) are now parsed inline on the sync path. Each object is written to musehub_objects with content_cache=raw_bytes and storage_uri='pending'. Fetch requests are served immediately from content_cache via read_object_bytes — no background job required.
Threshold-gated — bundles above 4 MB skip the inline parse entirely. The sync path stays at the 3-step fast path (MinIO GET + sha256 verify → branch advance → enqueue job). Large bundles (production repos) rely on the worker.
Background job promotion — process_bundle_index_job changed from ON CONFLICT DO NOTHING to ON CONFLICT DO UPDATE SET storage_uri=..., content_cache=NULL. This promotes pending rows written by the sync path: clears the inline bytes and sets the real s3:// URI once objects land in MinIO.
New setting: bundle_content_cache_max_bytes: int = 4 * 1024 * 1024 in config.py.
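The promotion upsert described above can be illustrated with sqlite3 standing in for PostgreSQL, since both accept ON CONFLICT ... DO UPDATE with the excluded pseudo-table. The table name objects and the helper promote_objects are assumptions for the example, not the shipped code.

```python
import sqlite3

PROMOTE_SQL = """
INSERT INTO objects (object_id, size_bytes, storage_uri, content_cache)
VALUES (?, ?, ?, NULL)
ON CONFLICT (object_id) DO UPDATE
SET storage_uri = excluded.storage_uri, content_cache = NULL
"""


def promote_objects(conn: sqlite3.Connection, rows: list[tuple[str, int, str]]) -> None:
    """Insert new rows, or promote existing 'pending' rows to their real URI."""
    conn.executemany(PROMOTE_SQL, rows)
    conn.commit()
```

A row written by the sync path with storage_uri='pending' ends up with the real URI and a NULL content_cache, and re-running the statement changes nothing further.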
TDD: test_bundle_fetch_readiness.py (3 tests, all green)
- test_fetch_object_before_worker_returns_bytes: after unpack-bundle returns 200, wire_fetch_objects returns correct bytes without the job running
- test_content_cache_cleared_after_job: after job runs, content_cache is NULL and storage_uri is a real s3:// URI
- test_large_bundle_skips_content_cache: threshold=0 simulation, objects not accessible before job runs
Updated test_unpack_bundle_skips_msgpack.py to target the large-bundle code path (threshold=0), which must remain parse-free.
19 bundle tests passing (Phase 1 + Phase 2 + size gates + async gate).
Phase 3 complete — idempotency and retry safety
Commit: sha256:f3df6c68a916
Result
Phase 3 tests pass immediately. The idempotency and retry safety required by this phase were already built correctly during Phases 1 and 2:
- ON CONFLICT DO UPDATE in process_bundle_index_job promotes pending rows atomically (content_cache cleared, real storage_uri set), so re-running the same job is always correct
- MinIO PUT is idempotent: same key, same bytes, S3 accepts it silently
- Session rollback safety: PG changes are uncommitted until the caller calls session.commit(); a worker crash rolls back PG writes while leaving MinIO writes in place; retry re-PUTs (no-op) and re-INSERTs (ON CONFLICT handles them)
- Worker machinery (claim_next_job / fail_job / complete_job) already correctly resets status to 'pending' on failure if attempt < _MAX_ATTEMPTS (3)
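The crash-and-retry behavior in the list above can be simulated in a few lines. This is a toy model under stated assumptions: a dict stands in for MinIO, sqlite3 stands in for the PG session, and run_job and SimulatedCrash are hypothetical names.

```python
import sqlite3
from typing import Optional


class SimulatedCrash(Exception):
    """Raised to imitate a worker dying partway through the object writes."""


def run_job(
    conn: sqlite3.Connection,
    blob_store: dict,
    objects: list[tuple[str, bytes]],
    crash_after: Optional[int] = None,
) -> None:
    """PUT each blob and stage its row; commit only after every object is done."""
    try:
        for i, (oid, raw) in enumerate(objects):
            if crash_after is not None and i == crash_after:
                raise SimulatedCrash(oid)
            blob_store[oid] = raw  # idempotent: same key, same bytes
            conn.execute(
                "INSERT INTO objects (object_id, size_bytes) VALUES (?, ?) "
                "ON CONFLICT (object_id) DO NOTHING",
                (oid, len(raw)),
            )
        conn.commit()
    except Exception:
        conn.rollback()  # staged rows vanish; blobs already written stay put
        raise
```

After a crash the blob store holds a prefix of the objects and the table holds none; a clean retry converges on the full, correct state.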
TDD: test_bundle_index_job_phase3.py (3 tests, all green)
- test_partial_minio_failure_retry_produces_correct_state: crash after 2 PUTs, rollback, retry → all objects correct, no orphan content_cache
- test_worker_full_lifecycle_pending_to_done: claim_next_job (pending→running), process, complete_job (→done), done_at set
- test_failed_job_resets_to_pending_for_retry: fail_job resets to pending, retry increments attempt, second run completes to done
22 bundle tests passing total (Phase 1 + Phase 2 + Phase 3 + size gates + async gate + skip-parse gate).
Phase 4 complete — observability timing breakdown
All 27 tests passing across all four phases.
What was added
7 per-phase timing keys in the process_bundle_index_job return dict:
| Key | What it measures |
|---|---|
| fetch_bundle_ms | MinIO GET: one round-trip, critical path |
| unpack_ms | msgpack.unpackb: O(bundle_size), CPU-bound |
| snapshot_insert_ms | bulk INSERT musehub_snapshots |
| commit_insert_ms | bulk INSERT musehub_commits |
| object_puts_ms | parallel MinIO PUTs: network-bound, batched |
| object_insert_ms | bulk INSERT / ON CONFLICT DO UPDATE musehub_objects |
| object_refs_ms | upsert musehub_object_refs |
Plus bundle_size_bytes (raw wire size) and the existing elapsed_ms.
Structured summary log — single line with all phase labels, parseable by ops without log line reconstruction:
✅ [bundle.index] done in 234.1ms fetch_bundle=41.2ms unpack=18.7ms snapshot_insert=12.3ms commit_insert=9.8ms object_puts=142.6ms object_insert=7.4ms object_refs=2.1ms snapshots=4 commits=4 objects=16 refs=16 bundle=28 KB job=a1b2c3d4...
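To show why a single key=value line is convenient for ops tooling, here is a small parsing sketch. The regex and the function name parse_phase_timings are assumptions; it only picks up tokens of the form name=123.4ms, so counts such as objects=16 and the bundle size are ignored.

```python
import re

# Matches tokens like "object_puts=142.6ms" and captures the name and the number.
_PHASE_RE = re.compile(r"(\w+)=(\d+(?:\.\d+)?)ms")


def parse_phase_timings(line: str) -> dict[str, float]:
    """Extract per-phase millisecond timings from a bundle.index summary line."""
    return {name: float(ms) for name, ms in _PHASE_RE.findall(line)}
```

Applied to the sample line above, this yields the seven phase timings, which add up to the reported 234.1 ms total.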
Tests (5 new in test_bundle_index_job_phase4.py)
- test_return_dict_has_per_phase_timing_keys: all 7 keys present
- test_all_phase_timings_are_non_negative: no clock anomalies
- test_bundle_size_bytes_in_return_dict: matches wire size exactly
- test_phase_timings_sum_within_elapsed: sum ≤ elapsed_ms + 50ms overhead budget
- test_summary_log_contains_phase_breakdown: caplog verifies all labels in one line
All phases recap
| Phase | Tests | Invariants |
|---|---|---|
| 1 | 8 | bundle.index job enqueued, objects written, snapshot/commit rows, object_refs |
| 2 | 3 | content_cache fetch-before-job, promotion after job, large-bundle skip |
| 3 | 3 | partial MinIO failure + retry, pending→running→done lifecycle, fail→pending reset |
| 4 | 5 | per-phase timing dict, non-negative values, bundle_size_bytes, sum ≤ elapsed, structured log |
Closing.
Phase 1 complete — objects, refs, idempotency
What shipped
Object writes to MinIO (parallel): After the job runs, every object blob is at its sha256 address in storage. Writes use _get_blob_put_sem() (100 slots), so 5k objects take ~250ms instead of ~25s serial.
musehub_objects rows: Bulk INSERT with on_conflict_do_nothing, idempotent by construction.
musehub_object_refs: _upsert_object_refs called with all object IDs, so ghost guard and GC queries now return correct results for bundle-pushed repos.
No per-object sha256 re-verification: sha256(wire_bytes) == bundle_key on the sync path already authenticated every byte. Re-hashing each object in the job is checking the work of the hash. Deleted.
Extended return dict: objects_written, object_refs_written, elapsed_ms, the observability required by Phase 4.
TDD: tests/test_bundle_index_job.py, 6 tests green. Existing test_bundle_push_async.py (3 tests) still green.
Root cause fixed: the ghost guard failure on stream push after bundle push was caused by missing musehub_object_refs rows. That race is gone.
Phase 2 next: fetch readiness before the worker runs
The remaining window: unpack-bundle returns 200, branch pointer is updated, but the bundle.index job hasn't run yet. A clone/fetch during that window hits the branch, finds the commit, follows the snapshot manifest to object IDs — and gets 404s because the objects aren't in MinIO yet.
The fix: at unpack-bundle time, INSERT objects with content_cache = raw_bytes and storage_uri = 'pending'. The fetch path checks content_cache first. The background job promotes to MinIO and clears content_cache.
Starting Phase 2 TDD now.