gabriel / musehub public
Closed #50 Enhancement infrastructure
filed by gabriel human · 38 days ago

bundle.index background job: object writes, fetch readiness, and web-scale correctness

0 Anchors
Blast radius
Churn 30d
0 Proposals

The one principle

Content-addressing is a proof, not a label. sha256(bytes) == object_id proves the content. The bundle hash sha256(wire_bytes) == bundle_key proves every byte inside the bundle — including every object's content and its declared ID. Any step that re-verifies what the bundle hash already proved is checking the work of the hash. Delete it.


Current state

process_bundle_index_job exists and the tests pass. It does:

  • Fetches bundle from MinIO by bundle_key
  • Unpacks msgpack
  • Reconstructs snapshot manifests from delta chain → bulk INSERT musehub_snapshots
  • Topological sort + bulk INSERT musehub_commits

What it does not do:

  • Write object blobs to MinIO under their sha256 keys
  • INSERT rows into musehub_objects
  • Upsert object refs (path → object_id per snapshot)

The consequence: after a bundle push + index job, commits and snapshots are in PG but individual objects are not addressable by object_id. Fetch, clone, and pull cannot serve file content. The stream push ghost guard fails on any subsequent push because musehub_objects has no rows for this repo.

This is the root cause of the "1 snapshot object(s) not registered" error on stream push after bundle push.


Why per-object sha256 re-verification must NOT be added

The sync path already verified sha256(wire_bytes) == bundle_key. That proof authenticated every bit of wire_bytes, including every object's content bytes and their declared IDs. Re-hashing each object in the background job is checking the work of the bundle hash. Do not add it. Trust the math.


Phase 1 — Object writes: MinIO + PG (the missing step)

This is the load-bearing fix. Everything downstream (fetch, clone, pull) is blocked on it.

What to add to process_bundle_index_job

After unpacking, extract raw_objects = bundle.get("objects") or [].

Step A — parallel MinIO writes

Objects in the bundle may be zstd-compressed (encoding="zstd"). Decompress before writing to MinIO so the stored bytes are the raw content that sha256 was computed over.

import zstandard as _zstd
_dctx = _zstd.ZstdDecompressor()

async def _put_one(obj: dict) -> str:
    oid = obj["object_id"]
    raw = obj["content"]
    if obj.get("encoding") == "zstd":
        raw = _dctx.decompress(raw)
    await backend.put_object(oid, raw)
    return oid

Cap concurrency with _get_blob_put_sem() (100 slots, already exists). Chunk into batches of 200 and asyncio.gather within each chunk. No per-object sha256 check — the bundle hash already proved every byte.

Step B — bulk INSERT musehub_objects

_OBJ_BATCH = 500
obj_rows = [
    {
        "object_id":   obj["object_id"],
        "size_bytes":  len(raw_content),   # decompressed size
        "storage_uri": backend.uri_for(obj["object_id"]),
        "content_cache": None,
    }
    for obj in raw_objects
]
# chunked pg_insert ... on_conflict_do_nothing

Step C — upsert object refs

For each resolved snapshot manifest (already computed in the delta reconstruction loop), upsert one row per (repo_id, object_id) into musehub_object_refs so the ghost guard and fetch presign queries can find them.

Use the existing _upsert_object_refs helper if present, otherwise a chunked pg_insert ... on_conflict_do_nothing.

Web scale math

Objects Serial (5ms/PUT) Parallel (100 slots)
1k 5s ~50ms
5k 25s ~250ms
50k 250s ~2.5s
500k 2500s ~25s

Serial is a non-starter at web scale. Parallel writes are mandatory.

Acceptance criteria

  • After job runs: SELECT COUNT(*) FROM musehub_objects WHERE ... == objects in bundle
  • After job runs: a subsequent stream push does not fail the ghost guard
  • After job runs: individual objects are retrievable by object_id via the fetch path
  • No per-object sha256 verification anywhere in the job

Phase 2 — Fetch readiness: serve objects before the worker runs

The problem: there is a window between unpack-bundle returning 200 and the bundle.index job completing where a clone/fetch would fail — the branch points to a commit, but the objects that commit references are not yet in MinIO.

The fix: content_cache

Store object bytes inline in a content_cache column on musehub_objects at INSERT time, with storage_uri = 'pending'. The fetch path checks content_cache first; if non-null, serves from there. The background job (or a separate promotion job) then PUTs to MinIO and clears content_cache.

This means:

  • Sync unpack-bundle path: INSERT objects with content_cache = raw_bytes, storage_uri = 'pending'
  • Background job: for each object where storage_uri = 'pending': PUT to MinIO, set storage_uri = real URI, set content_cache = NULL
  • Fetch path: COALESCE(content_cache, MinIO GET)

The tradeoff: PG row size. A 25 KB blob inline in PG costs ~25 KB of table storage. For 5k objects that is 125 MB in PG. Acceptable for a push window; not acceptable permanently. Hence the two-step: inline for immediacy, promoted to MinIO for permanence.

Size guard: objects above a configurable threshold (default 1 MB) skip content_cache and go straight to MinIO. Only small objects benefit from inline serving.

Acceptance criteria

  • Immediately after unpack-bundle returns 200 (before worker runs): fetch of any file in the pushed commit returns the correct bytes
  • After worker runs: content_cache is NULL for all objects, storage_uri points to MinIO
  • Objects > 1 MB never enter content_cache

Phase 3 — Idempotency and retry safety

The job must be safe to run multiple times. Network failures, worker crashes, and OOM kills must not corrupt state.

Invariants:

  • All PG inserts use on_conflict_do_nothing — already true for snapshots and commits, must be true for objects and object refs
  • MinIO PUT is idempotent by nature (same key, same bytes — S3 accepts it)
  • Job status must not be set to done until all three write phases complete (objects, snapshots, commits). The worker already wraps in try/except and calls fail_job on error — verify this is wired correctly for partial failures mid-job.
  • A job that fails mid-object-write will be retried by stale claim recovery (_STALE_CLAIM_MINUTES). On retry, already-written objects are skipped via on_conflict_do_nothing, already-written snapshots and commits are skipped the same way.

Acceptance criteria

  • Running the job twice produces the same DB state as running it once
  • Killing the worker mid-job and restarting results in a completed correct state
  • No orphan content_cache rows after successful promotion

Phase 4 — Observability

The job is the critical path for fetch/clone correctness. It needs enough logging to diagnose failures in production.

Timing breakdown log (already partially present — extend):

[bundle.index] job=sha256:abc123 repo=sha256:def456
  fetch_bundle:    0.312s  (47,231 KB)
  unpack:          0.841s  (1,035 commits, 5,165 objects, 1,035 snapshots)
  object_puts:     0.287s  (5,165 objects, 100 parallel)
  object_insert:   0.043s  (5,165 rows)
  object_refs:     0.118s  (362,550 refs)
  snapshot_insert: 0.031s  (1,035 rows)
  commit_insert:   0.029s  (1,035 rows)
  total:           1.661s

Job result dict — extend return value:

{
    "commits_written":   1035,
    "snapshots_written": 1035,
    "objects_written":   5165,
    "object_refs_written": 362550,
    "elapsed_ms":        1661,
}

Implementation order

Phase 1 first — it unblocks fetch/clone/pull and fixes the ghost guard race. Everything else is hardening.

Phase 2 — implement only after Phase 1 is proven. It adds complexity (two-phase object storage) and should be gated on a measured fetch-before-worker test showing actual latency.

Phase 3 — implement alongside Phase 1 (idempotency is zero extra cost when you write Phase 1 correctly the first time).

Phase 4 — extend the existing log lines while writing Phase 1.


Files touched

Phase File
1, 3, 4 musehub/musehub/services/musehub_wire.pyprocess_bundle_index_job
2 musehub/musehub/services/musehub_wire.pywire_push_unpack_bundle (inline content_cache writes)
2 musehub/musehub/db/musehub_models.pycontent_cache column on MusehubObject
2 musehub/musehub/services/musehub_fetch.py (or equivalent) — serve from content_cache
1–4 musehub/tests/test_bundle_index_job.py — TDD test file (new)

TDD test plan

All tests in musehub/tests/test_bundle_index_job.py.

test_objects_written_to_minio_after_job
  After job runs: backend.get_object(oid) returns correct bytes for every object in bundle.

test_musehub_objects_rows_after_job
  After job runs: musehub_objects has one row per object with correct storage_uri.

test_object_refs_after_job
  After job runs: musehub_object_refs has rows for every (repo_id, object_id) in bundle.

test_no_per_object_sha256_verification
  Patch hashlib.sha256 to raise inside the job — job must complete without calling it.
  (bundle hash already proved content; re-hashing is checking the work of the hash)

test_ghost_guard_passes_after_job
  stream push immediately after bundle.index job completes does not fail ghost guard.

test_job_is_idempotent
  Run process_bundle_index_job twice with same job_id — DB state identical to one run.

test_job_timing_breakdown_in_return_value
  Return dict contains objects_written, object_refs_written, elapsed_ms.
Activity4
gabriel opened this issue 38 days ago
gabriel 38 days ago

Phase 1 complete — objects, refs, idempotency

What shipped

Object writes to MinIO (parallel) After the job runs, every object blob is at its sha256 address in storage. Writes use _get_blob_put_sem() (100 slots) so 5k objects takes ~250ms instead of ~25s serial.

musehub_objects rows Bulk INSERT with on_conflict_do_nothing — idempotent by construction.

musehub_object_refs _upsert_object_refs called with all object IDs — ghost guard and GC queries now return correct results for bundle-pushed repos.

No per-object sha256 re-verification sha256(wire_bytes) == bundle_key on the sync path already authenticated every byte. Re-hashing each object in the job is checking the work of the hash. Deleted.

Extended return dict objects_written, object_refs_written, elapsed_ms — observability required by Phase 4.

TDD: tests/test_bundle_index_job.py — 6 tests green. Existing test_bundle_push_async.py (3 tests) still green.

Root cause fixed: the ghost guard failure on stream push after bundle push was caused by missing musehub_object_refs rows. That race is gone.


Phase 2 next — fetch readiness before the worker runs

The remaining window: unpack-bundle returns 200, branch pointer is updated, but the bundle.index job hasn't run yet. A clone/fetch during that window hits the branch, finds the commit, follows the snapshot manifest to object IDs — and gets 404s because the objects aren't in MinIO yet.

The fix: at unpack-bundle time, INSERT objects with content_cache = raw_bytes and storage_uri = 'pending'. The fetch path checks content_cache first. The background job promotes to MinIO and clears content_cache.

Starting Phase 2 TDD now.

gabriel 38 days ago

Phase 2 complete — fetch readiness before worker runs

Commit: sha256:e587eb26e51c

What shipped

Sync-path content_cache write — small bundles (≤ bundle_content_cache_max_bytes, default 4 MB) are now parsed inline on the sync path. Each object is written to musehub_objects with content_cache=raw_bytes and storage_uri='pending'. Fetch requests are served immediately from content_cache via read_object_bytes — no background job required.

Threshold-gated — bundles above 4 MB skip the inline parse entirely. The sync path stays at the 3-step fast path (MinIO GET + sha256 verify → branch advance → enqueue job). Large bundles (production repos) rely on the worker.

Background job promotionprocess_bundle_index_job changed from ON CONFLICT DO NOTHING to ON CONFLICT DO UPDATE SET storage_uri=..., content_cache=NULL. This promotes pending rows written by the sync path: clears the inline bytes and sets the real s3:// URI once objects land in MinIO.

New setting: bundle_content_cache_max_bytes: int = 4 * 1024 * 1024 in config.py.

TDD: test_bundle_fetch_readiness.py (3 tests, all green)

  1. test_fetch_object_before_worker_returns_bytes — after unpack-bundle returns 200, wire_fetch_objects returns correct bytes without the job running
  2. test_content_cache_cleared_after_job — after job runs, content_cache is NULL and storage_uri is real s3:// URI
  3. test_large_bundle_skips_content_cache — threshold=0 simulation: objects not accessible before job runs

Updated test_unpack_bundle_skips_msgpack.py to target the large-bundle code path (threshold=0), which must remain parse-free.

19 bundle tests passing (Phase 1 + Phase 2 + size gates + async gate).

gabriel 38 days ago

Phase 3 complete — idempotency and retry safety

Commit: sha256:f3df6c68a916

Result

Phase 3 tests pass immediately. The idempotency and retry safety required by this phase were already built correctly during Phases 1 and 2:

  • ON CONFLICT DO UPDATE in process_bundle_index_job promotes pending rows atomically (content_cache cleared, real storage_uri set) — re-running the same job is always correct
  • MinIO PUT is idempotent — same key, same bytes, S3 accepts it silently
  • Session rollback safety — PG changes are uncommitted until the caller calls session.commit(); a worker crash rolls back PG writes while leaving MinIO writes in place; retry re-PUTs (no-op) and re-INSERTs (ON CONFLICT handles them)
  • Worker machinery (claim_next_job / fail_job / complete_job) already correctly resets status to 'pending' on failure if attempt < _MAX_ATTEMPTS (3)

TDD: test_bundle_index_job_phase3.py (3 tests, all green)

  1. test_partial_minio_failure_retry_produces_correct_state — crash after 2 PUTs, rollback, retry → all objects correct, no orphan content_cache
  2. test_worker_full_lifecycle_pending_to_done — claim_next_job (pending→running), process, complete_job (→done), done_at set
  3. test_failed_job_resets_to_pending_for_retry — fail_job resets to pending, retry increments attempt, second run completes to done

22 bundle tests passing total (Phase 1 + Phase 2 + Phase 3 + size gates + async gate + skip-parse gate).

gabriel 38 days ago

Phase 4 complete — observability timing breakdown

All 27 tests passing across all four phases.

What was added

7 per-phase timing keys in the process_bundle_index_job return dict:

Key What it measures
fetch_bundle_ms MinIO GET — one round-trip, critical path
unpack_ms msgpack.unpackb — O(bundle_size), CPU-bound
snapshot_insert_ms bulk INSERT musehub_snapshots
commit_insert_ms bulk INSERT musehub_commits
object_puts_ms parallel MinIO PUTs — network-bound, batched
object_insert_ms bulk INSERT / ON CONFLICT DO UPDATE musehub_objects
object_refs_ms upsert musehub_object_refs

Plus bundle_size_bytes (raw wire size) and the existing elapsed_ms.

Structured summary log — single line with all phase labels, parseable by ops without log line reconstruction:

✅ [bundle.index] done in 234.1ms  fetch_bundle=41.2ms  unpack=18.7ms  snapshot_insert=12.3ms  commit_insert=9.8ms  object_puts=142.6ms  object_insert=7.4ms  object_refs=2.1ms  snapshots=4 commits=4 objects=16 refs=16  bundle=28 KB  job=a1b2c3d4...

Tests (5 new in test_bundle_index_job_phase4.py)

  • test_return_dict_has_per_phase_timing_keys — all 7 keys present
  • test_all_phase_timings_are_non_negative — no clock anomalies
  • test_bundle_size_bytes_in_return_dict — matches wire size exactly
  • test_phase_timings_sum_within_elapsed — sum ≤ elapsed_ms + 50ms overhead budget
  • test_summary_log_contains_phase_breakdown — caplog verifies all labels in one line

All phases recap

Phase Tests Invariants
1 8 bundle.index job enqueued, objects written, snapshot/commit rows, object_refs
2 3 content_cache fetch-before-job, promotion after job, large-bundle skip
3 3 partial MinIO failure + retry, pending→running→done lifecycle, fail→pending reset
4 5 per-phase timing dict, non-negative values, bundle_size_bytes, sum ≤ elapsed, structured log

Closing.