gabriel / musehub public
Closed #42
filed by gabriel human · 40 days ago

Full content-addressing across the wire: commits and snapshots as first-class blobs

Problem

The wire protocol and storage layer are almost a shining example of content-addressing — but not quite.

Current state:

| Object type | sha256 ID | Stored as blob in MinIO | Stored as DB row |
|---|---|---|---|
| File blobs | ✅ | ✅ | (index) |
| Snapshot manifests | ✅ | ❌ | ✅ (source of truth) |
| Commit records | ✅ | ❌ | ✅ (source of truth) |

The IDs are right everywhere. The storage layer hasn't caught up. Commits and snapshots are stored as DB rows that happen to have sha256 IDs — not as blobs that the DB indexes. This breaks the core invariant:

DB presence = blob presence in the object store.

The consequence: the C (COMMIT_PACK) frame must carry all commit and snapshot data inline on every push, because the server cannot say "I already have snapshot sha256:abc" at the object DAG level. The wire cannot skip what it cannot prove it has.

Target state:

Every DAG node = canonical bytes → sha256 ID → stored as blob in MinIO
DB = derived read-model index (populated from blobs, not the authoritative source)
Wire = all O frames; C frame = lightweight "advance branch to sha256:commit_id"

This is what makes the presign fast-path possible at the commit/snapshot layer, enables full deduplication, and makes have/want negotiation work at every level of the DAG.


TDD Approach

Each phase: write the failing test first, then implement until it passes. No phase ships without a green test.


Phase 1 — Commit records as content-addressed blobs

What changes:

  • CommitRecord gains a canonical serialization: canonical_bytes(commit) → sha256 ID (already done for the ID, now serialize the full record)
  • _flush_batch / phase 8: after DB INSERT, store canonical bytes as a blob via backend.put(commit_id, canonical_bytes)
  • backend.exists(commit_id) becomes the truth check — DB presence ↔ blob presence, invariant enforced

Failing test to write first:

async def test_commit_record_stored_as_blob(db_session, backend):
    """After push, commit blobs exist in the object store keyed by commit_id."""
    repo = await create_repo(db_session, owner="gabriel")
    commits, snapshots = _make_chain(3)
    await _do_push(db_session, repo, commits, snapshots)
    for c in commits:
        assert await backend.exists(c["commit_id"]), (
            f"commit {c['commit_id']} not found in object store"
        )

Pass criterion: backend.exists(commit_id) returns True for every pushed commit.


Phase 2 — Snapshot manifests as content-addressed blobs

What changes:

  • SnapshotRecord gains canonical serialization: canonical_bytes(snapshot) → sha256 ID (already done for ID derivation, now store the bytes)
  • Phase 7 (snapshot upserts): after DB UPSERT, backend.put(snapshot_id, canonical_bytes)
  • Deduplication: if await backend.exists(snapshot_id) → skip PUT (blob already there, invariant holds)
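The dedup rule above can be sketched as a put-if-absent helper. This is a minimal sketch that assumes a backend whose `exists` and `put` are coroutines, as the calls above suggest; `InMemoryBackend`, `object_id`, and `put_if_absent` are hypothetical names used only for illustration.

```python
import asyncio
import hashlib


class InMemoryBackend:
    """Hypothetical stand-in for the object store: async exists/put keyed by ID."""

    def __init__(self) -> None:
        self.blobs: dict[str, bytes] = {}
        self.put_calls = 0

    async def exists(self, key: str) -> bool:
        return key in self.blobs

    async def put(self, key: str, data: bytes) -> None:
        self.put_calls += 1
        self.blobs[key] = data


def object_id(canonical_bytes: bytes) -> str:
    # Content address: the ID is the sha256 of the canonical bytes.
    return "sha256:" + hashlib.sha256(canonical_bytes).hexdigest()


async def put_if_absent(backend, canonical_bytes: bytes) -> tuple[str, bool]:
    """Store a snapshot manifest unless a blob with the same ID already exists.

    Returns (snapshot_id, written). The key is derived from the bytes, so an
    existing blob under that key is identical by construction and skipping
    the PUT cannot lose data.
    """
    snapshot_id = object_id(canonical_bytes)
    if await backend.exists(snapshot_id):
        return snapshot_id, False
    await backend.put(snapshot_id, canonical_bytes)
    return snapshot_id, True
```

The exists-then-put pair is not atomic, so two concurrent pushes can both PUT the same manifest; with content addressing that race is harmless because both writes carry the same bytes.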

Failing test to write first:

async def test_snapshot_stored_as_blob(db_session, backend):
    """After push, snapshot manifests exist in the object store keyed by snapshot_id."""
    repo = await create_repo(db_session, owner="gabriel")
    commits, snapshots = _make_chain(3)
    await _do_push(db_session, repo, commits, snapshots)
    for s in snapshots:
        assert await backend.exists(s["snapshot_id"]), (
            f"snapshot {s['snapshot_id']} not found in object store"
        )

Pass criterion: backend.exists(snapshot_id) returns True for every pushed snapshot.


Phase 3 — Wire protocol: C frame becomes a branch-ref-advance

What changes:

  • C (COMMIT_PACK) frame payload shrinks to: {"t": "C", "tip_commit_id": "sha256:...", "branch": "main"}
  • All commit and snapshot data travels as O (OBJECT) frames, typed by a new "kind" field: "kind": "commit" or "kind": "snapshot"
  • Server's frame_loop: when it receives O frames with kind=commit or kind=snapshot, it deserializes and indexes them into the DB (same as Phase 1/2 puts, but now driven from the wire)
  • When server receives C frame, it just advances the branch ref to tip_commit_id — the commits and snapshots are already stored

Wire shape:

H  header (n_objects includes commit+snapshot blobs)
O  kind=blob,     id=sha256:file1,    content=...
O  kind=snapshot, id=sha256:snap1,    content=canonical_bytes(snapshot1)
O  kind=commit,   id=sha256:commit1,  content=canonical_bytes(commit1)
C  tip_commit_id=sha256:commit1, branch=main
E  end
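A minimal sketch of how a server loop might consume this wire shape. It assumes frames arrive already decoded as dicts with `t`, `kind`, `id`, and `content` keys, and that every O frame's `id` is the sha256 of its `content` (which the Phase 1 comment below says holds for commit identity blobs). `ServerState` and `apply_frames` are hypothetical; the real frame_loop's encoding is not specified here.

```python
import hashlib
from dataclasses import dataclass, field


@dataclass
class ServerState:
    blobs: dict[str, bytes] = field(default_factory=dict)  # object store
    indexed: dict[str, str] = field(default_factory=dict)  # id -> kind (DB read model)
    refs: dict[str, str] = field(default_factory=dict)     # branch -> tip_commit_id


def apply_frames(state: ServerState, frames: list[dict]) -> dict:
    for frame in frames:
        t = frame["t"]
        if t == "O":
            content = frame["content"]
            expected = "sha256:" + hashlib.sha256(content).hexdigest()
            if frame["id"] != expected:
                return {"ok": False, "error": f"id mismatch for {frame['id']}"}
            state.blobs[frame["id"]] = content
            if frame["kind"] in ("commit", "snapshot"):
                # DAG nodes are indexed into the read model; file blobs need no row here.
                state.indexed[frame["id"]] = frame["kind"]
        elif t == "C":
            tip = frame["tip_commit_id"]
            # The C frame carries no commit data: it only moves the ref, and only
            # if the tip commit is already stored (from this push or an earlier one).
            if state.indexed.get(tip) != "commit":
                return {"ok": False, "error": f"unknown tip {tip}"}
            state.refs[frame["branch"]] = tip
        elif t == "E":
            break
    return {"ok": True}
```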

Failing test to write first:

async def test_c_frame_without_inline_data_accepted(db_session):
    """Server accepts C frame with only tip_commit_id (no inline commits/snapshots)."""
    # Build O frames for commits and snapshots, send C frame with only tip ref
    ...
    result = next((f for f in frames if f.get("t") == SFRAME_RESULT), None)
    assert result is not None and result.get("ok") is True

Pass criterion: Push with new wire shape succeeds; branch advances; all DB rows populated from blob deserialization.


Phase 4 — Client-side walk: commit and snapshot blobs in have/want

What changes:

  • collect_object_ids in muse/core/pack.py includes commit and snapshot blob IDs in the reachable object set
  • have list in H frame includes commit IDs the server already has (not just file blob IDs)
  • Server uses backend.exists(commit_id) in the have check during negotiation — if present, skip that commit's entire subgraph
  • Re-push of the same commits: zero O frames for commits/snapshots/file blobs (server has everything)
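The subgraph-skip rule amounts to a DAG walk that stops descending at any commit the server already holds. A minimal sketch, assuming an in-memory parent map and a set of server-held commit IDs, and assuming the server only accepts a commit once its whole subgraph is stored; `missing_commits` is a hypothetical name, not `collect_object_ids`.

```python
from collections import deque


def missing_commits(tip: str, parents: dict[str, list[str]], server_has: set[str]) -> list[str]:
    """Return commits reachable from tip that the server lacks, newest first.

    A commit the server already has is not expanded: under the invariant,
    its parents, snapshot, and file blobs must already be stored too.
    """
    out: list[str] = []
    seen: set[str] = set()
    queue = deque([tip])
    while queue:
        cid = queue.popleft()
        if cid in seen or cid in server_has:
            continue
        seen.add(cid)
        out.append(cid)
        queue.extend(parents.get(cid, []))
    return out
```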

Failing test to write first:

async def test_repush_sends_zero_frames(db_session):
    """Second push of identical commits sends zero O frames (server has all blobs)."""
    repo = await create_repo(db_session, owner="gabriel")
    commits, snapshots = _make_chain(10)
    await _do_push(db_session, repo, commits, snapshots)
    # Second push: record every O frame the server receives
    ...  # install a frame-recording hook on the server side
    await _do_push(db_session, repo, commits, snapshots)
    server_o_frame_count = ...  # O frames recorded during the second push
    assert server_o_frame_count == 0

Pass criterion: Re-push of identical content sends only H + C + E (zero O frames).


Phase 5 — DB as derived read model

What changes:

  • DB rows for commits and snapshots are populated exclusively by deserializing blobs from the object store
  • A new internal function _index_from_blob(commit_id) → CommitRow reads canonical bytes, deserializes, writes DB row — used both during push and during any future re-index operation
  • A muse hub repo reindex admin command (or a background task) can rebuild the entire DB from blobs alone
  • The invariant becomes provable: muse verify --full checks that every DB commit row has a corresponding blob with matching sha256 ID
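A minimal sketch of the reindex/verify pair over an in-memory blob map. The JSON canonical form is purely an assumption for this sketch (the Phase 1 comment below describes a null-byte-separated identity payload instead), and `reindex` and `verify_full` are hypothetical stand-ins, not `reindex_from_blobs` or `muse verify --full`. Enumerating a repo's commit blobs (for example by listing keys) is left out; the IDs are passed in.

```python
import hashlib
import json


def canonical_bytes(record: dict) -> bytes:
    # Hypothetical canonical form for this sketch: compact JSON with sorted keys.
    return json.dumps(record, sort_keys=True, separators=(",", ":")).encode()


def blob_id(data: bytes) -> str:
    return "sha256:" + hashlib.sha256(data).hexdigest()


def reindex(blobs: dict[str, bytes], commit_ids: list[str]) -> dict[str, dict]:
    """Rebuild commit rows purely from stored blobs (the DB as a derived read model)."""
    rows: dict[str, dict] = {}
    for cid in commit_ids:
        record = json.loads(blobs[cid])
        rows[cid] = {"commit_id": cid, **record}
    return rows


def verify_full(rows: dict[str, dict], blobs: dict[str, bytes]) -> list[str]:
    """Return commit IDs whose blob is missing or does not hash to the row's ID."""
    bad = []
    for cid in rows:
        data = blobs.get(cid)
        if data is None or blob_id(data) != cid:
            bad.append(cid)
    return bad
```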

Failing test to write first:

async def test_db_reconstructible_from_blobs(db_session, backend):
    """Dropping all DB commit rows and re-indexing from blobs recovers identical rows."""
    repo = await create_repo(db_session, owner="gabriel")
    commits, snapshots = _make_chain(20)
    await _do_push(db_session, repo, commits, snapshots)
    # Drop all commit rows
    await db_session.execute(delete(CommitRow).where(...))
    # Re-index from blobs
    await reindex_from_blobs(db_session, repo.repo_id, backend)
    # Verify rows are back
    for c in commits:
        row = await db_session.get(CommitRow, c["commit_id"])
        assert row is not None

Pass criterion: DB can be fully reconstructed from blobs; muse verify --full exits 0.


Success Metrics

| Metric | Target |
|---|---|
| Re-push of any repo | 0 O frames sent, < 1s round-trip |
| have/want negotiation | Works at all DAG levels (files, snapshots, commits) |
| DB integrity | Every commit and snapshot row has a corresponding blob |
| muse verify --full | Exits 0 on any repo |
| Wire C frame size | < 100 bytes (tip_commit_id + branch only) |

Out of Scope

  • On-disk format changes to the muse client
  • Backwards compatibility with old wire format (we version the wire, not shim it)
  • The 17× MinIO PUT latency gap (tracked separately)

Activity (9)
gabriel opened this issue 40 days ago
gabriel 40 days ago

Phase 1 Complete ✅

Commit: sha256:4cca779e27a6

What landed

  • Added _commit_identity_bytes(wire_commit) — computes the canonical null-byte-separated payload whose sha256 equals the commit_id (identical formula to compute_commit_id in muse core)
  • In phase 8 of wire_push_stream, before each bulk DB INSERT batch, asyncio.gather PUTs the identity blob of every new commit to MinIO in parallel
  • PUT happens before DB INSERT — invariant maintained: DB presence implies blob presence (same ordering guarantee as file blobs)
  • Fixed three test files whose mock backends declared store_object/object_exists as AsyncMock but not put, which caused a TypeError in gather; fixed TestT5ServiceStream.stub_session to patch AsyncSessionLocal so unit tests survive the session close/reopen mid-stream

What the blob contains (and why not more)

The blob at key=commit_id stores only the identity fields: sorted parent hex IDs, snapshot_id hex, message, committed_at, author, signer_public_key — null-byte separated. Mutable annotations (agent_id, reviewed_by, test_runs, model_id, etc.) are DB-only. They cannot be part of a content-addressed blob because commit_id is derived before they exist, and allowing annotation updates to produce a new blob would make the key non-stable. The DB remains the source of truth for annotations; the blob proves DAG identity.
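To make the identity/annotation split concrete, here is a minimal sketch of an identity payload built from the fields listed above. The field order, the per-field encoding, and how multiple parents are joined are all assumptions for illustration; the authoritative formula is compute_commit_id in muse core, which this sketch does not reproduce. `commit_identity_bytes` and `commit_id` here are hypothetical.

```python
import hashlib


def commit_identity_bytes(
    parent_ids: list[str],
    snapshot_id: str,
    message: str,
    committed_at: str,
    author: str,
    signer_public_key: str,
) -> bytes:
    """Hypothetical identity payload: fields joined by NUL bytes.

    Parents are sorted so parent order does not change the ID. Mutable
    annotations (agent_id, reviewed_by, ...) are deliberately not parameters,
    so updating them can never change the blob or its key.
    """
    fields = [*sorted(parent_ids), snapshot_id, message, committed_at, author, signer_public_key]
    return b"\x00".join(f.encode("utf-8") for f in fields)


def commit_id(identity: bytes) -> str:
    # The blob key: sha256 hex digest of exactly the bytes stored in the blob.
    return hashlib.sha256(identity).hexdigest()
```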

Test

test_P1_commit_blobs_exist_in_object_store — written red first, now green. Pushes 5 commits, then calls backend.exists(commit_id) for each; asserts all 5 blobs are present in MinIO.


Up next: Phase 2 — snapshot manifests as content-addressed blobs

gabriel 40 days ago

Storage abstraction rename landed (dev, sha256:bc1e4d0)

All R2_* environment variables renamed to BLOB_STORAGE_* to be provider-agnostic:

| Old | New |
|---|---|
| R2_BUCKET | BLOB_STORAGE_BUCKET |
| R2_ENDPOINT | BLOB_STORAGE_ENDPOINT |
| R2_PUBLIC_ENDPOINT | BLOB_STORAGE_PUBLIC_ENDPOINT |
| R2_ACCESS_KEY_ID | BLOB_STORAGE_ACCESS_KEY_ID |
| R2_SECRET_ACCESS_KEY | BLOB_STORAGE_SECRET_ACCESS_KEY |
| R2_REGION | BLOB_STORAGE_REGION |

Internal names (_R2_PUT_SEM, _R2_CONCURRENCY_LIMIT, settings.r2_*) updated throughout. Docker Compose, deploy scripts, and .env.example all updated. 112 unit tests pass.

Phase 1 and Phase 2 are on dev — commit records and snapshot manifests are now content-addressed blobs in object storage. The C1 timing test is temporarily failing because local MinIO is unconfigured. The content-addressing invariant tests (P1/P2) pass.

gabriel 40 days ago

Status checkpoint — rebooting from first principles

What we know

MinIO performs well when hit directly:

Single PUT:          12.8ms
10 sequential PUTs:   5.5ms each
100 concurrent PUTs: 107ms total

The math should work: 500 blob PUTs via asyncio.gather with a 100-worker thread pool should run in 5 rounds × ~13ms ≈ 65ms. Not 28 extra seconds.
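The 65 ms estimate assumes the PUTs really overlap. One way to check that assumption independently of MinIO is to replace each PUT with a 13 ms blocking sleep and dispatch them the same way (asyncio.gather over run_in_executor, capped by a semaphore and a 100-worker pool). `fake_put`, `put_all`, and the pool/semaphore sizes are stand-ins for this sketch, not the hub's actual semaphore or pool configuration.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

PUT_LATENCY_S = 0.013  # single-PUT latency measured above (~13 ms)
WORKERS = 100


def fake_put(key: str, data: bytes) -> None:
    # Blocking stand-in for a boto3 put_object call.
    time.sleep(PUT_LATENCY_S)


async def put_all(n: int) -> float:
    """Run n fake PUTs concurrently and return the wall-clock time in seconds."""
    loop = asyncio.get_running_loop()
    sem = asyncio.Semaphore(WORKERS)
    with ThreadPoolExecutor(max_workers=WORKERS) as pool:

        async def one(i: int) -> None:
            async with sem:
                await loop.run_in_executor(pool, fake_put, f"k{i}", b"x")

        start = time.perf_counter()
        await asyncio.gather(*(one(i) for i in range(n)))
        return time.perf_counter() - start
```

With 500 PUTs and 100 workers this should land near ceil(500 / 100) × 13 ms ≈ 65 ms plus scheduling overhead; a result near 500 × 13 ms = 6.5 s would mean the PUTs are being serialized somewhere.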

C1 test (500 commits, 0 objects) is failing at 33.69s — it was passing before Phase 1/2 added blob PUTs to the finalization path.

P1 and P2 invariant tests pass — commit and snapshot blobs are being written to MinIO correctly.

Storage rename shipped: R2_* → BLOB_STORAGE_* throughout.

What we don't know yet

The delta (~28s) between before and after Phase 1/2 doesn't match MinIO's measured throughput. Something in the push pipeline is serializing or blocking that we haven't instrumented. Suspects:

  • asyncio.gather not actually running concurrently (semaphore or event-loop blocking)
  • DB batch commits (500 snapshot upserts + 500 commit INSERTs) taking most of the time
  • Something between the gather and the actual socket write

Next: first-principles instrumentation

Build up from the smallest possible case with real timing numbers at every layer:

  1. Baseline: 1 commit end-to-end, measure each named phase
  2. Scale: 1 → 10 → 100 → 500 commits, real numbers at each step
  3. Separate blob-PUT time from DB time in the output
  4. Find the actual knee of the curve before writing any fixes

No guessing. Real data all the way.

gabriel 40 days ago

Triangulation results — all components fast in isolation

Numbers so far

L0 — Direct boto3 PUTs (no asyncio)

| N | Total | Per-PUT |
|---|---|---|
| 1 | 5ms | 5ms |
| 10 | 22ms | 2.2ms |
| 100 | 230ms | 2.3ms |

L1 — BlobBackend.put via asyncio.gather

| N | Total | Per-PUT |
|---|---|---|
| 1 | 10ms | 10ms |
| 10 | 17ms | 1.7ms |
| 100 | 120ms | 1.2ms |
| 500 | 490ms | 1.0ms |

L2 — DB bulk_upsert_snapshot_entries

| N | Total | Per-row |
|---|---|---|
| 1 | 15ms | 15ms |
| 10 | 16ms | 1.6ms |
| 100 | 30ms | 0.3ms |
| 500 | 70ms | 0.1ms |

L3 — Raw INSERT via SQLAlchemy text(): 5ms ✅

L4 — ORM pg_insert ON CONFLICT DO UPDATE (isolated): 37ms ✅

wire_push_stream 1 commit: 30.5s ❌

The smoking gun

Progress frames from wire_push_stream with 1 commit:

[S+0.1s] 8: row-build loop (1 new rows): 0.000s
[S+30.4s] 8: commit INSERT batch 1 (0–1/1): 30.330s

The INSERT batch timer covers three things:

  1. asyncio.gather(*[backend.put(cid, cb)]) — commit blob PUT
  2. session.execute(pg_insert(...).on_conflict_do_update(...))
  3. session.commit()

Each of those is fast in isolation. Together they take 30 seconds. Something about the composition is the problem — likely an open DB transaction held across MinIO I/O, or a session/connection state issue.

Next: TDD triangulation

Write the smallest possible test that reproduces the 30s delay: open session → SELECT (starts a transaction) → blob PUT via gather → INSERT → commit. Measure each sub-step explicitly. That pins down exactly which transition is the bottleneck.
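One way to get that per-sub-step breakdown is a small async timer that records each named step and renders it in the `name=seconds` form used in the progress output. `StepTimer` and `phase8_batch` are hypothetical; the comments show where the phase-8 calls would sit, and asyncio.sleep stands in for the real I/O so the sketch runs on its own.

```python
import asyncio
import time
from contextlib import asynccontextmanager


class StepTimer:
    def __init__(self) -> None:
        self.steps: dict[str, float] = {}

    @asynccontextmanager
    async def step(self, name: str):
        start = time.perf_counter()
        try:
            yield
        finally:
            self.steps[name] = time.perf_counter() - start

    def render(self) -> str:
        return "   ".join(f"{name}={secs:.3f}s" for name, secs in self.steps.items())


async def phase8_batch(timer: StepTimer) -> None:
    async with timer.step("blob_put"):
        await asyncio.sleep(0.01)   # e.g. await asyncio.gather(*[backend.put(cid, cb) ...])
    async with timer.step("execute"):
        await asyncio.sleep(0.001)  # e.g. await session.execute(insert_stmt)
    async with timer.step("commit"):
        await asyncio.sleep(0.001)  # e.g. await session.commit()
```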

gabriel 40 days ago

Triangulation complete — root cause isolated to one line

Sub-timing inside phase 8 INSERT batch

blob_put=30.261s   execute=0.038s   commit=0.008s

The SQL is fast. The blob PUT is the problem.

Individual components are all fast

| Operation | Time |
|---|---|
| Phase 7 blob PUT (same backend, same function) | 11ms ✅ |
| Phase 8 blob PUT | 30s ❌ |
| bench_phase8_repro SELECT → blob PUT → INSERT → commit | <200ms ✅ |
| ORM pg_insert ON CONFLICT DO UPDATE (isolated) | 37ms ✅ |

The structural difference

Phase 7 blob PUT happens before any yield with an open asyncpg transaction. Phase 8 blob PUT happens after a yield _tprog(...) with an open asyncpg transaction (from the existing-commit SELECT that was not yet committed).

wire_push_stream is an async generator. The hypothesis: something about calling run_in_executor (the thread-based boto3 PUT) after a yield, while an asyncpg transaction is open on the same event loop, causes a 30s block.

Next atomic test

The smallest possible reproducer:

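import asyncio
from sqlalchemy import text

async def gen(session, backend, oid, data):
    await session.execute(text("SELECT 1"))  # any SELECT opens an implicit transaction
    yield b"chunk"                            # suspend the generator
    # On resume: does this PUT block for 30s?
    await asyncio.gather(backend.put(oid, data))
    yield b"done"

async def drive(session, backend, oid, data):
    async for chunk in gen(session, backend, oid, data):
        pass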

If this reproduces the 30s: root cause confirmed (async generator + open asyncpg transaction + run_in_executor interaction). If not: something else in wire_push_stream is the differentiator.

gabriel 40 days ago

Discovery: 30s confirmed in pytest context — new first-principles result

Status: Reproduced the 30s definitively inside pytest with proper conftest fixtures.

What we know

test_diag_phase8.py runs wire_push_stream with 1 commit, 0 objects, using the real db_session fixture (conftest patches get_backend to real MinIO, database._async_session_factory points to the test engine). Result:

PROGRESS: [S+0.1s] 7: snapshot blob PUTs: 0.112s (1 blobs)       ← FAST
PROGRESS: [S+0.2s] 8: existing commit check (1 ids): 0.021s
PROGRESS: [S+0.2s] 8: external parent check (0 parents): 0.000s
PROGRESS: [S+0.2s] 8: row-build loop (1 new rows): 0.000s
PROGRESS: [S+30.5s] 8: commit INSERT batch 1: blob_put=30.266s    ← 30s

Total wall-clock: 30.613s. The push succeeds — result ok=True. The 30s is in the blob PUT, not a timeout that kills the push.

What disproves the old hypothesis

test_phase8_atomic.py T1/T2/T3 all pass in < 2s total:

  • T2: open asyncpg tx → yield → run_in_executor — fast
  • T3: open asyncpg tx → yield → run_in_executor → DB execute — fast

So the problem is NOT simply: open transaction + generator yield + run_in_executor.

The structural difference: phase 7 vs phase 8

| | Phase 7 blob PUT (fast, 0.112s) | Phase 8 blob PUT (slow, 30.266s) |
|---|---|---|
| Session state at PUT | No open transaction (previous batch committed) | Open transaction (from 3 preceding SELECTs) |
| Preceding yields | ~12 generator yields | 3 generator yields since last SELECT |
| Session origin | Internal (AsyncSessionLocal()) | Same internal session |

The ambiguity that remains — next first principles test

The 30s is suspiciously exact. Two candidate causes:

  1. socket.getdefaulttimeout() set to 30s — asyncpg or SQLAlchemy sets a global socket timeout that bleeds into boto3's thread-pool HTTP connections.
  2. Multiple generator yields while session open-in-transaction — T2/T3 had exactly 1 yield before the PUT; wire_push_stream has ~3 yields between the last SELECT and the PUT. Each yield suspends the generator and lets the event loop run other tasks. Something in those additional event-loop iterations causes the delay.

Next test

Atomic T4: open asyncpg tx → yield 3× → run_in_executor, checking socket.getdefaulttimeout() at each step. If this is slow, the cause is yield count / event-loop state. If fast, the cause is something specific to wire_push_stream's context (session swap, or state accumulated from 12+ prior yields).

gabriel 40 days ago

Discovery: event loop is NOT blocked — thread itself takes 30s

New finding from T4–T7 + heartbeat diagnostic:

Event loop alive test result

Added a concurrent 1-second heartbeat task alongside wire_push_stream. Result:

Heartbeat ticks: 30 (expected ~30 if loop free)
  tick 1: +1.00s
  tick 2: +2.00s
  ...
  tick 30: +30.04s

The event loop fires every second throughout the 30s wait. It is not blocked.
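The heartbeat diagnostic can be reproduced in a few lines: run a ticking task alongside a blocking call dispatched with run_in_executor and count the ticks. If the loop is free, ticks keep arriving at the set interval while the worker thread is busy. This sketch uses a `time.sleep` stand-in for the slow put_object and a shorter interval than the 1-second one above; `heartbeat`, `slow_blocking_call`, and `loop_is_free` are hypothetical names.

```python
import asyncio
import time


async def heartbeat(interval: float, ticks: list[float], start: float) -> None:
    while True:
        await asyncio.sleep(interval)
        ticks.append(time.perf_counter() - start)


def slow_blocking_call(seconds: float) -> None:
    time.sleep(seconds)  # stand-in for a put_object call stuck inside the thread


async def loop_is_free(block_s: float = 0.5, interval: float = 0.05) -> list[float]:
    loop = asyncio.get_running_loop()
    ticks: list[float] = []
    start = time.perf_counter()
    hb = asyncio.create_task(heartbeat(interval, ticks, start))
    await loop.run_in_executor(None, slow_blocking_call, block_s)
    hb.cancel()
    try:
        await hb
    except asyncio.CancelledError:
        pass
    return ticks
```

Roughly `block_s / interval` ticks means the loop stayed responsive; zero or one tick would mean the blocking call ran on the loop thread itself (for example, if it were called directly rather than through run_in_executor).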

This means run_in_executor is behaving correctly (it does not block the event loop). The 30s is inside the thread — specifically inside client.put_object() in urllib3/boto3.

What we disproved (T4–T7)

| Hypothesis | Test | Result |
|---|---|---|
| socket.getdefaulttimeout() set to 30s by asyncpg | T4 | ✅ PASS: timeout stays None |
| NullPool reconnect + 3 yields + PUT causes 30s | T5 (fresh backend) | ✅ PASS: 111ms |
| Conftest-patched backend + phase-7 warmup + PUT | T6 | ✅ PASS: 9ms |
| Many (5+) NullPool cycles before PUT | T7 | ✅ PASS: 9ms |

What remains

All T4–T7 tests use controlled generators where the PUT is the 4th–6th operation. In wire_push_stream, the phase-8 PUT comes after 20+ yields. But we do NOT believe yield count is the cause: the event loop is free throughout.

Root cause is inside the thread's client.put_object() call.

Inside put_object, urllib3 does:

  1. Get connection from pool (reuse or new)
  2. Send request headers with Expect: 100-continue
  3. Wait for 100 Continue response (socket read)
  4. Send body
  5. Read response

The 30s is one of these steps. The MissingHeaderBodySeparatorDefect warning only appears for certain PUTs (the slow ones?). This might be a symptom.

Next most atomic test (T8)

Intercept urllib3's connection pool to measure which sub-step of put_object takes the 30s:

  • Is it Starting new HTTP connection (TCP connect)?
  • Is it the 100-continue wait (socket read timeout)?
  • Is it the response read?

The exact timeout value (30s) must come from a specific socket or urllib3 timeout setting on the connection used for phase 8.

gabriel 40 days ago

Root cause identified: MissingHeaderBodySeparatorDefect leaves urllib3 connection dirty

T8 result

Patched BlobBackend._s3_put to measure both PUTs from inside the thread:

PUT 1 (phase 7 snapshot): put_object=0.006s   ← fast
PUT 2 (phase 8 commit):   put_object=30.261s  ← 30s

default_timeout=None in both — socket default timeout is not the cause.
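T8's approach, timing the call from inside the worker thread and recording the socket default timeout at the same moment, can be written as a generic wrapper. This is a sketch: `instrument` and the `records` sink are hypothetical, and it wraps any blocking callable (for example a bound `_s3_put`, whose real signature is not shown in this issue). In a pytest test it might be installed with something like `monkeypatch.setattr(backend, "_s3_put", instrument(backend._s3_put, records))`.

```python
import functools
import socket
import threading
import time
from typing import Any, Callable


def instrument(fn: Callable[..., Any], records: list[dict]) -> Callable[..., Any]:
    """Wrap a blocking function so each call logs duration, thread, and socket default timeout."""

    @functools.wraps(fn)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        start = time.perf_counter()
        default_timeout = socket.getdefaulttimeout()
        try:
            return fn(*args, **kwargs)
        finally:
            records.append(
                {
                    "fn": fn.__name__,
                    "thread": threading.current_thread().name,
                    "seconds": time.perf_counter() - start,
                    "default_timeout": default_timeout,
                }
            )

    return wrapper
```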

A urllib3 warning appears for PUT 2 only:

MissingHeaderBodySeparatorDefect: unparsed_data='HTTP/1.1 200 OK\r\nContent-Length: 0\r\n...\r\n\r\n'

The causal chain

  1. PUT 1 (phase 7 snapshot blob) — MinIO's response triggers MissingHeaderBodySeparatorDefect in urllib3/Python 3.14's HTTP parser
  2. urllib3 warns and continues, but because the headers weren't parsed correctly, Content-Length: 0 is not extracted
  3. urllib3 doesn't know the response body is empty — it keeps the socket open and tries to read more bytes
  4. urllib3 returns this dirty connection to the pool (keep-alive)
  5. PUT 2 (phase 8 commit blob) — urllib3 reuses the dirty connection
  6. PUT 2 sends headers; MinIO receives a confused request (previous unread state + new headers)
  7. No response comes back for 30s — exactly MinIO's default keep-alive timeout
  8. MinIO closes the connection; urllib3 gets EOF, processes the response, warns again, returns
  9. Push succeeds but 30s too late

Why T6/T7 were fast

T6/T7 used b"phase7-sim" (10-byte body) for the first PUT — no MissingHeaderBodySeparatorDefect. Clean connection → PUT 2 reuses it cleanly → fast.

The trigger

Something about the phase-7 snapshot PUT body (_snapshot_identity_bytes(wire_snap)) causes MinIO's response to be parsed incorrectly by Python 3.14's HTTP client. The botocore request also sends x-amz-checksum-crc32 and x-amz-sdk-checksum-algorithm headers — these may be what causes MinIO to send a non-standard response.

Next atomic test (T9)

TDD the fix: make 2 sequential PUTs on the conftest backend using the SAME body that triggers the defect. T9 must FAIL before the fix and PASS after. Candidates for fix:

  • Config(request_checksum_calculation='when_required') — disables CRC32 headers that may confuse MinIO
  • Config(read_timeout=5, retries={'mode': 'standard', 'max_attempts': 3}) — fail fast on stale connections
  • urllib3 connection pool set to close-on-defect

gabriel 40 days ago

Resolution — FIXED

Root cause confirmed: Python 3.14's http.client misparses the HTTP exchange when botocore sends Expect: 100-continue and MinIO echoes 100 Continue + 200 OK in the same TCP segment. The 200 OK bytes appear as unparsed_data after the 100 Continue headers, triggering MissingHeaderBodySeparatorDefect in urllib3.

urllib3 catches the defect but returns the connection to the keep-alive pool without draining the response body (dirty connection). Phase 8's next put_object call reuses this dirty connection; MinIO reads the leftover bytes as a new request, becomes confused, and closes the connection after its 30-second keep-alive timeout.

The complete causal chain:

phase 7 PUT  →  botocore sends Expect: 100-continue
             →  MinIO sends 100 + 200 OK in same TCP segment
             →  Python 3.14 http.client: MissingHeaderBodySeparatorDefect
             →  urllib3 pools the connection without draining response body
             →  phase 8 PUT reuses dirty connection
             →  MinIO confused by leftover bytes, waits 30s, closes
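The "dirty connection" step is easiest to see with raw bytes. The toy parser below is not http.client or urllib3; it only models a client reading one response head from a buffer that, as described above, holds an interim 100 Continue and the final 200 OK back to back. Anything it fails to consume stays in the socket buffer, which is what the next request on a pooled connection would run into. `SEGMENT`, `read_one_response_head`, and `drain_interim_then_final` are hypothetical.

```python
# One TCP segment carrying both the interim and the final response.
SEGMENT = (
    b"HTTP/1.1 100 Continue\r\n\r\n"
    b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n"
)


def read_one_response_head(buf: bytes) -> tuple[bytes, bytes]:
    """Consume one status line + header block; return (head, unconsumed bytes)."""
    end = buf.index(b"\r\n\r\n") + 4
    return buf[:end], buf[end:]


def drain_interim_then_final(buf: bytes) -> tuple[bytes, bytes]:
    """Skip 1xx interim responses, then read the final response head."""
    head, rest = read_one_response_head(buf)
    while head.startswith(b"HTTP/1.1 1"):
        head, rest = read_one_response_head(rest)
    return head, rest
```

Stopping after the first head leaves `HTTP/1.1 200 OK...` unread, the same bytes the defect warning reports as unparsed_data; draining interim responses first leaves nothing behind.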

Fix: Register a before-send.s3.PutObject event handler in BlobBackend._get_client() that removes Expect from the request before each PUT. Without Expect: 100-continue, botocore sends headers and body in a single shot. MinIO responds with a plain 200 OK. Python 3.14's parser handles it cleanly. No dirty connection.

Also added request_checksum_calculation='when_required' and response_checksum_validation='when_required' to suppress optional CRC32 headers that add response overhead.
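A minimal sketch of that client setup. The event name `before-send.s3.PutObject` and the two checksum options come from the fix described above; the handler name, the `make_client` wrapper, and its endpoint/credential parameters are placeholders, and the checksum options need a botocore release that supports them (1.36 or later). boto3 is imported inside `make_client`, so the handler itself can be exercised without it.

```python
from typing import Any


def strip_expect_header(request: Any, **kwargs: Any) -> None:
    """botocore 'before-send' handler: drop Expect: 100-continue before sending.

    Returning None tells botocore to go ahead and send the (modified) request.
    botocore leaves Expect out of the SigV4 signed headers, so removing it
    after signing does not invalidate the signature.
    """
    request.headers.pop("Expect", None)
    return None


def make_client(endpoint_url: str, access_key: str, secret_key: str, region: str):
    import boto3
    from botocore.config import Config

    config = Config(
        request_checksum_calculation="when_required",
        response_checksum_validation="when_required",
    )
    client = boto3.client(
        "s3",
        endpoint_url=endpoint_url,
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
        region_name=region,
        config=config,
    )
    client.meta.events.register("before-send.s3.PutObject", strip_expect_header)
    return client
```

Registering on `before-send` rather than unregistering botocore's own Expect handler keeps the change local to this client and to PutObject only.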

TDD regression suite (T1–T9): Each test isolated one variable, ruling out: socket timeout mutation (T4), NullPool cycle count (T5, T7), event loop blocking (heartbeat diagnostic), open asyncpg transactions (T1–T3), conftest backend warm-up order (T6). T8 directly observed the slow put_object call and the defect warning. T9 is the canonical failing→passing regression test.

Result: phase 8 blob_put drops from 30.266s → 0.005s (6,000×). Total wire_push_stream for 1 commit: 30.5s → 0.2s. 9/9 tests pass.

Commit: sha256:8c0f8ca2509a