Full content-addressing across the wire: commits and snapshots as first-class blobs
Problem
The wire protocol and storage layer are almost a shining example of content-addressing — but not quite.
Current state:
| Object type | sha256 ID | Stored as blob in MinIO | Stored as DB row |
|---|---|---|---|
| File blobs | ✅ | ✅ | ✅ (index) |
| Snapshot manifests | ✅ | ❌ | ✅ (source of truth) |
| Commit records | ✅ | ❌ | ✅ (source of truth) |
The IDs are right everywhere. The storage layer hasn't caught up. Commits and snapshots are stored as DB rows that happen to have sha256 IDs — not as blobs that the DB indexes. This breaks the core invariant:
DB presence = blob presence in the object store.
The consequence: the C (COMMIT_PACK) frame must carry all commit and snapshot data inline on every push, because the server cannot say "I already have snapshot sha256:abc" at the object DAG level. The wire cannot skip what it cannot prove it has.
Target state:
Every DAG node = canonical bytes → sha256 ID → stored as blob in MinIO
DB = derived read-model index (populated from blobs, not the authoritative source)
Wire = all O frames; C frame = lightweight "advance branch to sha256:commit_id"
This is what makes the presign fast-path possible at the commit/snapshot layer, enables full deduplication, and makes have/want negotiation work at every level of the DAG.
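A minimal sketch of the target-state invariant, assuming an in-memory dict stands in for MinIO and IDs use the "sha256:<hex>" form seen elsewhere in this issue. The names content_id, BlobStore, put_node and verify are hypothetical, and canonical bytes are taken as given.

```python
import hashlib


def content_id(canonical: bytes) -> str:
    """Derive a DAG node's ID from its canonical bytes."""
    return "sha256:" + hashlib.sha256(canonical).hexdigest()


class BlobStore:
    """Hypothetical in-memory stand-in for the object store."""

    def __init__(self) -> None:
        self._blobs: dict[str, bytes] = {}

    def put_node(self, canonical: bytes) -> str:
        """Store a node under its own hash; re-putting identical bytes is a no-op."""
        node_id = content_id(canonical)
        self._blobs.setdefault(node_id, canonical)
        return node_id

    def exists(self, node_id: str) -> bool:
        return node_id in self._blobs

    def verify(self) -> bool:
        """Every key must equal the hash of the bytes stored under it."""
        return all(content_id(data) == key for key, data in self._blobs.items())
```

Because the key is derived from the bytes, "do you already have X?" is answered by exists(X) alone, at any level of the DAG.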
TDD Approach
Each phase: write the failing test first, then implement until it passes. No phase ships without a green test.
Phase 1 — Commit records as content-addressed blobs
What changes:
- CommitRecord gains a canonical serialization: canonical_bytes(commit) → sha256 ID (already done for the ID; now serialize the full record)
- _flush_batch / phase 8: after DB INSERT, store canonical bytes as a blob via backend.put(commit_id, canonical_bytes)
- backend.exists(commit_id) becomes the truth check: DB presence ↔ blob presence, invariant enforced
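A sketch of the phase 8 flush under this change, assuming an async backend exposing put(key, data) and exists(key) like the one used in the tests. InMemoryBackend, flush_batch, invariant_holds and the db_rows dict are hypothetical stand-ins for the real MinIO client and DB INSERT.

```python
import asyncio


class InMemoryBackend:
    """Hypothetical async stand-in for the MinIO-backed blob backend."""

    def __init__(self) -> None:
        self.blobs: dict[str, bytes] = {}

    async def put(self, key: str, data: bytes) -> None:
        self.blobs[key] = data

    async def exists(self, key: str) -> bool:
        return key in self.blobs


async def flush_batch(db_rows: dict, backend, batch: list) -> None:
    """Index a batch of (commit_id, canonical_bytes), then store each as a blob."""
    for commit_id, _canonical in batch:
        db_rows[commit_id] = {"commit_id": commit_id}  # stands in for the DB INSERT
    # One PUT per new commit, issued concurrently.
    await asyncio.gather(*(backend.put(cid, cb) for cid, cb in batch))


async def invariant_holds(db_rows: dict, backend) -> bool:
    """DB presence must imply blob presence."""
    checks = await asyncio.gather(*(backend.exists(cid) for cid in db_rows))
    return all(checks)
```

With INSERT before PUT, a failed PUT leaves a DB row without a blob until a retry. Writing the blob first (as the Phase 1 implementation eventually does, before each INSERT batch) means the invariant can never be observed broken.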
Failing test to write first:
```python
async def test_commit_record_stored_as_blob(db_session, backend):
    """After push, commit blobs exist in the object store keyed by commit_id."""
    repo = await create_repo(db_session, owner="gabriel")
    commits, snapshots = _make_chain(3)
    await _do_push(db_session, repo, commits, snapshots)
    for c in commits:
        assert await backend.exists(c["commit_id"]), (
            f"commit {c['commit_id']} not found in object store"
        )
```
Pass criterion: backend.exists(commit_id) returns True for every pushed commit.
Phase 2 — Snapshot manifests as content-addressed blobs
What changes:
- SnapshotRecord gains a canonical serialization: canonical_bytes(snapshot) → sha256 ID (already done for ID derivation; now store the bytes)
- Phase 7 (snapshot upserts): after DB UPSERT, backend.put(snapshot_id, canonical_bytes)
- Deduplication: if await backend.exists(snapshot_id) → skip PUT (blob already there, invariant holds)
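The deduplication rule can be sketched as an exists-then-put guard run concurrently across a batch. This assumes the same async put/exists backend shape as the tests; CountingBackend, put_if_absent and store_snapshots are illustrative only.

```python
import asyncio


class CountingBackend:
    """Hypothetical async backend that counts real PUTs."""

    def __init__(self) -> None:
        self.blobs: dict[str, bytes] = {}
        self.put_calls = 0

    async def put(self, key: str, data: bytes) -> None:
        self.put_calls += 1
        self.blobs[key] = data

    async def exists(self, key: str) -> bool:
        return key in self.blobs


async def put_if_absent(backend, key: str, data: bytes) -> bool:
    """PUT only when the key is missing; return True if a PUT was issued."""
    if await backend.exists(key):
        return False  # same key means same bytes, so the invariant already holds
    await backend.put(key, data)
    return True


async def store_snapshots(backend, snapshots: dict) -> int:
    """Store {snapshot_id: canonical_bytes}; return the number of PUTs issued."""
    issued = await asyncio.gather(
        *(put_if_absent(backend, sid, cb) for sid, cb in snapshots.items())
    )
    return sum(issued)
```

The check and the PUT are not atomic, so two concurrent pushes may both upload the same snapshot. Because the key is the content hash, the second write stores identical bytes and is harmless.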
Failing test to write first:
```python
async def test_snapshot_stored_as_blob(db_session, backend):
    """After push, snapshot manifests exist in the object store keyed by snapshot_id."""
    repo = await create_repo(db_session, owner="gabriel")
    commits, snapshots = _make_chain(3)
    await _do_push(db_session, repo, commits, snapshots)
    for s in snapshots:
        assert await backend.exists(s["snapshot_id"]), (
            f"snapshot {s['snapshot_id']} not found in object store"
        )
```
Pass criterion: backend.exists(snapshot_id) returns True for every pushed snapshot.
Phase 3 — Wire protocol: C frame becomes a branch-ref-advance
What changes:
- C (COMMIT_PACK) frame payload shrinks to: {"t": "C", "tip_commit_id": "sha256:...", "branch": "main"}
- All commit and snapshot data travels as O (OBJECT) frames, typed by a new "kind" field: "kind": "commit" or "kind": "snapshot"
- Server's frame_loop: when it receives O frames with kind=commit or kind=snapshot, it deserializes and indexes them into the DB (same as the Phase 1/2 puts, but now driven from the wire)
- When the server receives the C frame, it just advances the branch ref to tip_commit_id; the commits and snapshots are already stored
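A quick size check on the new C frame payload, assuming frames are JSON-encoded and IDs are written as "sha256:" plus 64 hex characters (the frame encoding is not specified here). Compact JSON for this payload comes to 115 bytes, so the "< 100 bytes" target in Success Metrics would need a tighter encoding. The binary layout in pack_c_frame (1-byte tag, 32-byte raw digest, 1-byte branch length, branch name) is a purely hypothetical illustration.

```python
import json
import struct

tip = "sha256:" + "ab" * 32  # placeholder commit ID with 64 hex characters

# JSON encoding of the C frame with no extra whitespace.
json_frame = json.dumps(
    {"t": "C", "tip_commit_id": tip, "branch": "main"}, separators=(",", ":")
).encode()


def pack_c_frame(tip_commit_id: str, branch: str) -> bytes:
    """Hypothetical binary C frame: tag, raw digest, branch length (<= 255), branch."""
    digest = bytes.fromhex(tip_commit_id.removeprefix("sha256:"))
    name = branch.encode()
    return struct.pack(f"!c32sB{len(name)}s", b"C", digest, len(name), name)


binary_frame = pack_c_frame(tip, "main")
print(len(json_frame), len(binary_frame))  # 115 38
```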
Wire shape:
H header (n_objects includes commit+snapshot blobs)
O kind=blob, id=sha256:file1, content=...
O kind=snapshot, id=sha256:snap1, content=canonical_bytes(snapshot1)
O kind=commit, id=sha256:commit1, content=canonical_bytes(commit1)
C tip_commit_id=sha256:commit1, branch=main
E end
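A minimal sketch of a server-side loop for this wire shape, assuming frames arrive already decoded as dicts and that the result frame is tagged "R" (the real SFRAME_RESULT value is not shown here). It stores every O frame as a blob after checking that its ID is the sha256 of its content, records commit and snapshot IDs in a read-model index, and treats C as a pure ref advance that is rejected if the tip commit is unknown. apply_push and its dict-based state are hypothetical.

```python
import hashlib


def apply_push(frames: list, blobs: dict, index: dict, refs: dict) -> dict:
    """Apply one push's H/O/C/E frames; return a result frame (hypothetical shape)."""
    pending = None
    for frame in frames:
        tag = frame["t"]
        if tag == "O":
            data = frame["content"]
            expected = "sha256:" + hashlib.sha256(data).hexdigest()
            if expected != frame["id"]:
                return {"t": "R", "ok": False, "error": f"hash mismatch: {frame['id']}"}
            blobs[frame["id"]] = data
            if frame["kind"] in ("commit", "snapshot"):
                # Derived read model: real code would deserialize and write DB rows.
                index.setdefault(frame["kind"], set()).add(frame["id"])
        elif tag == "C":
            pending = (frame["branch"], frame["tip_commit_id"])
        elif tag == "E":
            break
        # "H" frames carry only counts/negotiation data in this sketch.
    if pending is None:
        return {"t": "R", "ok": False, "error": "missing C frame"}
    branch, tip = pending
    if tip not in index.get("commit", set()):
        return {"t": "R", "ok": False, "error": f"unknown tip commit: {tip}"}
    refs[branch] = tip  # the C frame only moves the ref
    return {"t": "R", "ok": True}
```

Because the index persists across pushes, a later push that consists of only a C frame for an already-stored commit succeeds without resending any objects.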
Failing test to write first:
```python
async def test_c_frame_without_inline_data_accepted(db_session):
    """Server accepts C frame with only tip_commit_id (no inline commits/snapshots)."""
    # Build O frames for commits and snapshots, send C frame with only tip ref
    ...
    result = next((f for f in frames if f.get("t") == SFRAME_RESULT), None)
    assert result is not None and result.get("ok") is True
```
Pass criterion: Push with new wire shape succeeds; branch advances; all DB rows populated from blob deserialization.
Phase 4 — Client-side walk: commit and snapshot blobs in have/want
What changes:
- collect_object_ids in muse/core/pack.py includes commit and snapshot blob IDs in the reachable object set
- The have list in the H frame includes commit IDs the server already has (not just file blob IDs)
- The server uses backend.exists(commit_id) in the have check during negotiation; if present, it skips that commit's entire subgraph
- Re-push of the same commits: zero O frames for commits/snapshots/file blobs (server has everything)
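The negotiation rule above amounts to a DAG walk that stops at any commit the server already has. A sketch, assuming commits are given as {commit_id: (parent_ids, snapshot_id)} and snapshots as {snapshot_id: [file_blob_id, ...]}. objects_to_send and server_has are hypothetical names, not the collect_object_ids API. The walk also assumes closure: if the server holds a commit (or snapshot) blob, it holds everything reachable from it.

```python
def objects_to_send(tip: str, commits: dict, snapshots: dict, server_has) -> list:
    """Return the IDs the server is missing, walking back from tip.

    commits:    {commit_id: (parent_ids, snapshot_id)}
    snapshots:  {snapshot_id: [file_blob_id, ...]}
    server_has: callable(id) -> bool, e.g. backed by backend.exists
    """
    out: list = []
    seen: set = set()
    stack = [tip]
    while stack:
        cid = stack.pop()
        if cid in seen:
            continue
        seen.add(cid)
        if server_has(cid):
            continue  # server has this commit, so skip its entire subgraph
        parents, snap_id = commits[cid]
        out.append(cid)
        if snap_id not in seen:
            seen.add(snap_id)
            if not server_has(snap_id):
                out.append(snap_id)
                for blob_id in snapshots[snap_id]:
                    if blob_id not in seen:
                        seen.add(blob_id)
                        if not server_has(blob_id):
                            out.append(blob_id)
        stack.extend(parents)
    return out
```

A re-push of identical content returns an empty list, which corresponds to the "zero O frames" criterion below.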
Failing test to write first:
```python
async def test_repush_sends_zero_frames(db_session):
    """Second push of identical commits sends zero O frames (server has all blobs)."""
    repo = await create_repo(db_session, owner="gabriel")
    commits, snapshots = _make_chain(10)
    await _do_push(db_session, repo, commits, snapshots)
    # Second push of the same content: count the O frames the server receives
    await _do_push(db_session, repo, commits, snapshots)
    server_o_frame_count = ...  # O frames observed during the second push
    assert server_o_frame_count == 0
```
Pass criterion: Re-push of identical content sends only H + C + E (zero O frames).
Phase 5 — DB as derived read model
What changes:
- DB rows for commits and snapshots are populated exclusively by deserializing blobs from the object store
- A new internal function _index_from_blob(commit_id) → CommitRow reads canonical bytes, deserializes them, and writes the DB row; it is used both during push and during any future re-index operation
- A muse hub repo reindex admin command (or a background task) can rebuild the entire DB from blobs alone
- The invariant becomes provable: muse verify --full checks that every DB commit row has a corresponding blob with a matching sha256 ID
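A sketch of the two Phase 5 operations against an in-memory blob map, assuming a decode callable turns canonical bytes into a row dict. reindex and verify_full are hypothetical stand-ins for reindex_from_blobs and muse verify --full, not their actual implementations.

```python
import hashlib
from typing import Callable


def blob_id(data: bytes) -> str:
    return "sha256:" + hashlib.sha256(data).hexdigest()


def reindex(blobs: dict, decode: Callable[[bytes], dict]) -> dict:
    """Rebuild {commit_id: row} from blobs alone."""
    return {key: {"commit_id": key, **decode(data)} for key, data in blobs.items()}


def verify_full(db_ids, blobs: dict) -> list:
    """Return a list of problems; an empty list corresponds to exit code 0."""
    problems = []
    for cid in db_ids:
        data = blobs.get(cid)
        if data is None:
            problems.append(f"{cid}: no blob")
        elif blob_id(data) != cid:
            problems.append(f"{cid}: hash mismatch")
    return problems
```

Per the Phase 1 completion notes, mutable annotations (agent_id, reviewed_by, and so on) live only in the DB, so a reindex of this kind can recover identity fields but not annotations.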
Failing test to write first:
```python
from sqlalchemy import delete


async def test_db_reconstructible_from_blobs(db_session, backend):
    """Dropping all DB commit rows and re-indexing from blobs recovers identical rows."""
    repo = await create_repo(db_session, owner="gabriel")
    commits, snapshots = _make_chain(20)
    await _do_push(db_session, repo, commits, snapshots)
    # Drop all commit rows
    await db_session.execute(delete(CommitRow).where(...))
    # Re-index from blobs
    await reindex_from_blobs(db_session, repo.repo_id, backend)
    # Verify rows are back
    for c in commits:
        row = await db_session.get(CommitRow, c["commit_id"])
        assert row is not None
```
Pass criterion: DB can be fully reconstructed from blobs; muse verify --full exits 0.
Success Metrics
| Metric | Target |
|---|---|
| Re-push of any repo | 0 O frames sent, < 1s round-trip |
| have/want negotiation | Works at all DAG levels (files, snapshots, commits) |
| DB integrity | Every commit and snapshot row has a corresponding blob |
| muse verify --full | Exits 0 on any repo |
| Wire C frame size | < 100 bytes (tip_commit_id + branch only) |
Out of Scope
- On-disk format changes to the muse client
- Backwards compatibility with old wire format (we version the wire, not shim it)
- The 17× MinIO PUT latency gap (tracked separately)
Storage abstraction rename landed (dev, sha256:bc1e4d0)
All R2_* environment variables renamed to BLOB_STORAGE_* to be provider-agnostic:
| Old | New |
|---|---|
| R2_BUCKET | BLOB_STORAGE_BUCKET |
| R2_ENDPOINT | BLOB_STORAGE_ENDPOINT |
| R2_PUBLIC_ENDPOINT | BLOB_STORAGE_PUBLIC_ENDPOINT |
| R2_ACCESS_KEY_ID | BLOB_STORAGE_ACCESS_KEY_ID |
| R2_SECRET_ACCESS_KEY | BLOB_STORAGE_SECRET_ACCESS_KEY |
| R2_REGION | BLOB_STORAGE_REGION |
Internal names (_R2_PUT_SEM, _R2_CONCURRENCY_LIMIT, settings.r2_*) updated throughout. Docker Compose, deploy scripts, and .env.example all updated. 112 unit tests pass.
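For deployments still carrying old .env files, a small guard can fail fast on leftover R2_* names instead of silently running with unset values. This is a hypothetical sketch (load_blob_settings is not a function in the codebase) that assumes only the rename mapping in the table above.

```python
RENAMES = {
    "R2_BUCKET": "BLOB_STORAGE_BUCKET",
    "R2_ENDPOINT": "BLOB_STORAGE_ENDPOINT",
    "R2_PUBLIC_ENDPOINT": "BLOB_STORAGE_PUBLIC_ENDPOINT",
    "R2_ACCESS_KEY_ID": "BLOB_STORAGE_ACCESS_KEY_ID",
    "R2_SECRET_ACCESS_KEY": "BLOB_STORAGE_SECRET_ACCESS_KEY",
    "R2_REGION": "BLOB_STORAGE_REGION",
}


def load_blob_settings(env: dict) -> dict:
    """Read BLOB_STORAGE_* values; raise if any legacy R2_* name is still set."""
    stale = sorted(old for old in RENAMES if old in env)
    if stale:
        hints = ", ".join(f"{old} -> {RENAMES[old]}" for old in stale)
        raise RuntimeError(f"legacy storage variables found: {hints}")
    return {new: env.get(new) for new in RENAMES.values()}
```

At startup this would be called as load_blob_settings(dict(os.environ)).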
Phase 1 and Phase 2 are on dev — commit records and snapshot manifests are now content-addressed blobs in object storage. The C1 timing test is temporarily failing because local MinIO is unconfigured. The content-addressing invariant tests (P1/P2) pass.
Status checkpoint — rebooting from first principles
What we know
MinIO performs well when hit directly:
Single PUT: 12.8ms
10 sequential PUTs: 5.5ms each
100 concurrent PUTs: 107ms total
The math should work: 500 blob PUTs via asyncio.gather with a 100-worker thread pool should run in 5 rounds × ~13ms ≈ 65ms. Not 28 extra seconds.
C1 test (500 commits, 0 objects) is failing at 33.69s — it was passing before Phase 1/2 added blob PUTs to the finalization path.
P1 and P2 invariant tests pass — commit and snapshot blobs are being written to MinIO correctly.
Storage rename shipped — R2_* → BLOB_STORAGE_* throughout.
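The expected-latency arithmetic above can be checked with a simulation that replaces each PUT with a 13ms sleep on a 100-worker thread pool, driven by asyncio.gather over run_in_executor. This models only the concurrency, not MinIO; fake_put, simulate_puts and the 13ms figure are assumptions taken from the measurements above.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def fake_put(_key: int) -> None:
    time.sleep(0.013)  # stand-in for one ~13ms blocking boto3 PUT


async def simulate_puts(n: int = 500, workers: int = 100) -> float:
    """Return wall-clock seconds for n blocking PUTs on a `workers`-thread pool."""
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=workers) as pool:
        start = time.perf_counter()
        await asyncio.gather(*(loop.run_in_executor(pool, fake_put, i) for i in range(n)))
        return time.perf_counter() - start


elapsed = asyncio.run(simulate_puts())
print(f"{elapsed * 1000:.0f} ms")  # lower bound is 5 rounds x 13ms = 65ms
```

A result in the tens of milliseconds, against the ~28s actually observed, is why raw PUT throughput is not a plausible explanation for the regression.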
What we don't know yet
The delta (~28s) between before and after Phase 1/2 doesn't match MinIO's measured throughput. Something in the push pipeline is serializing or blocking that we haven't instrumented. Suspects:
- asyncio gather not actually running concurrently (semaphore or event-loop blocking)
- DB batch commits (500 snapshot upserts + 500 commit INSERTs) taking most of the time
- Something between the gather and the actual socket write
Next: first-principles instrumentation
Build up from the smallest possible case with real timing numbers at every layer:
- Baseline: 1 commit end-to-end, measure each named phase
- Scale: 1 → 10 → 100 → 500 commits, real numbers at each step
- Separate blob-PUT time from DB time in the output
- Find the actual knee of the curve before writing any fixes
No guessing. Real data all the way.
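One way to get per-phase numbers is a small timing context manager that accumulates each named phase, so blob-PUT time and DB time show up as separate lines. PhaseTimer is a hypothetical helper; the phase names mirror the progress labels used later in this issue.

```python
import time
from contextlib import contextmanager


class PhaseTimer:
    """Accumulate wall-clock seconds per named phase."""

    def __init__(self) -> None:
        self.totals: dict[str, float] = {}

    @contextmanager
    def phase(self, name: str):
        start = time.perf_counter()
        try:
            yield
        finally:
            elapsed = time.perf_counter() - start
            self.totals[name] = self.totals.get(name, 0.0) + elapsed

    def report(self) -> str:
        return "\n".join(f"{name}: {secs:.3f}s" for name, secs in self.totals.items())


timer = PhaseTimer()
with timer.phase("8: blob_put"):
    time.sleep(0.01)
with timer.phase("8: execute"):
    pass
print(timer.report())
```

The same context manager works inside async code (with timer.phase("8: blob_put"): await ...), since it measures wall-clock time across any awaits in the block.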
Triangulation results — all components fast in isolation
Numbers so far
L0 — Direct boto3 PUTs (no asyncio)
| N | Total | Per-PUT |
|---|---|---|
| 1 | 5ms | 5ms |
| 10 | 22ms | 2.2ms |
| 100 | 230ms | 2.3ms |
L1 — BlobBackend.put via asyncio.gather
| N | Total | Per-PUT |
|---|---|---|
| 1 | 10ms | 10ms |
| 10 | 17ms | 1.7ms |
| 100 | 120ms | 1.2ms |
| 500 | 490ms | 1.0ms |
L2 — DB bulk_upsert_snapshot_entries
| N | Total | Per-row |
|---|---|---|
| 1 | 15ms | 15ms |
| 10 | 16ms | 1.6ms |
| 100 | 30ms | 0.3ms |
| 500 | 70ms | 0.1ms |
L3 (raw INSERT via SQLAlchemy text()): 5ms ✅
L4 (ORM pg_insert ON CONFLICT DO UPDATE, isolated): 37ms ✅
wire_push_stream 1 commit: 30.5s ❌
The smoking gun
Progress frames from wire_push_stream with 1 commit:
[S+0.1s] 8: row-build loop (1 new rows): 0.000s
[S+30.4s] 8: commit INSERT batch 1 (0–1/1): 30.330s
The INSERT batch timer covers three things:
- asyncio.gather(*[backend.put(cid, cb)]): commit blob PUT
- session.execute(pg_insert(...).on_conflict_do_update(...))
- session.commit()
Each of those is fast in isolation. Together they take 30 seconds. Something about the composition is the problem — likely an open DB transaction held across MinIO I/O, or a session/connection state issue.
Next: TDD triangulation
Write the smallest possible test that reproduces the 30s delay: open session → SELECT (start transaction) → blob PUT via gather → INSERT → commit. Measure each sub-step explicitly; that pins down exactly which transition is the bottleneck.
Triangulation complete — root cause isolated to one line
Sub-timing inside phase 8 INSERT batch
blob_put=30.261s execute=0.038s commit=0.008s
The SQL is fast. The blob PUT is the problem.
Individual components are all fast
| Operation | Time |
|---|---|
| Phase 7 blob PUT (same backend, same function) | 11ms ✅ |
| Phase 8 blob PUT | 30s ❌ |
| bench_phase8_repro SELECT → blob PUT → INSERT → commit | <200ms ✅ |
| ORM pg_insert ON CONFLICT DO UPDATE (isolated) | 37ms ✅ |
The structural difference
Phase 7 blob PUT happens with no asyncpg transaction held open across a preceding yield.
Phase 8 blob PUT happens after a yield _tprog(...) with an open asyncpg transaction (from the existing-commit SELECT that was not yet committed).
wire_push_stream is an async generator. The hypothesis: something about calling run_in_executor (the thread-based boto3 PUT) after a yield, while an asyncpg transaction is open on the same event loop, causes a 30s block.
Next atomic test
The smallest possible reproducer:
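```python
import asyncio

from sqlalchemy import text


async def gen(session, backend, oid, data):
    await session.execute(text("SELECT 1"))  # autobegins a transaction
    yield b"chunk"                            # suspend the generator
    # resume: does this PUT block for 30s?
    await asyncio.gather(backend.put(oid, data))
    yield b"done"


async def main(session, backend, oid, data):
    async for chunk in gen(session, backend, oid, data):
        pass
```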
If this reproduces the 30s: root cause confirmed (async generator + open asyncpg transaction + run_in_executor interaction). If not: something else in wire_push_stream is the differentiator.
Discovery: 30s confirmed in pytest context — new first-principles result
Status: Reproduced the 30s definitively inside pytest with proper conftest fixtures.
What we know
test_diag_phase8.py runs wire_push_stream with 1 commit, 0 objects, using the real db_session fixture (conftest patches get_backend to real MinIO, database._async_session_factory points to the test engine). Result:
PROGRESS: [S+0.1s] 7: snapshot blob PUTs: 0.112s (1 blobs) ← FAST
PROGRESS: [S+0.2s] 8: existing commit check (1 ids): 0.021s
PROGRESS: [S+0.2s] 8: external parent check (0 parents): 0.000s
PROGRESS: [S+0.2s] 8: row-build loop (1 new rows): 0.000s
PROGRESS: [S+30.5s] 8: commit INSERT batch 1: blob_put=30.266s ← 30s
Total wall-clock: 30.613s. The push succeeds — result ok=True. The 30s is in the blob PUT, not a timeout that kills the push.
What disproves the old hypothesis
test_phase8_atomic.py T1/T2/T3 all pass in < 2s total:
- T2: open asyncpg tx → yield → run_in_executor: fast
- T3: open asyncpg tx → yield → run_in_executor → DB execute: fast
So the problem is NOT simply: open transaction + generator yield + run_in_executor.
The structural difference: phase 7 vs phase 8
| | Phase 7 blob PUT (fast, 0.112s) | Phase 8 blob PUT (slow, 30.266s) |
|---|---|---|
| Session state at PUT | No open transaction (previous batch committed) | Open transaction (from 3 preceding SELECTs) |
| Preceding yields | ~12 generator yields | 3 generator yields since last SELECT |
| Session origin | Internal (AsyncSessionLocal()) | Same internal session |
The ambiguity that remains — next first principles test
The 30s is suspiciously exact. Two candidate causes:
- socket.getdefaulttimeout() set to 30s: asyncpg or SQLAlchemy sets a global socket timeout that bleeds into boto3's thread-pool HTTP connections.
- Multiple generator yields while the session is open in a transaction: T2/T3 had exactly 1 yield before the PUT; wire_push_stream has ~3 yields between the last SELECT and the PUT. Each yield suspends the generator and lets the event loop run other tasks. Something in those additional event-loop iterations causes the delay.
Next test
Atomic T4: open asyncpg tx → yield 3× → run_in_executor, checking socket.getdefaulttimeout() at each step. If this is slow, the cause is yield count / event-loop state. If fast, the cause is something specific to wire_push_stream's context (session swap, or state accumulated from 12+ prior yields).
Discovery: event loop is NOT blocked — thread itself takes 30s
New finding from T4–T7 + heartbeat diagnostic:
Event loop alive test result
Added a concurrent 1-second heartbeat task alongside wire_push_stream. Result:
Heartbeat ticks: 30 (expected ~30 if loop free)
tick 1: +1.00s
tick 2: +2.00s
...
tick 30: +30.04s
The event loop fires every second throughout the 30s wait. It is not blocked.
This means run_in_executor is behaving correctly (it does not block the event loop). The 30s is inside the thread — specifically inside client.put_object() in urllib3/boto3.
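The heartbeat diagnostic can be reproduced in miniature: a blocking call runs in the default executor while a ticker task counts how often the loop wakes. If ticks keep arriving at the expected rate during the call, the event loop is free and the time is being spent inside the worker thread. heartbeat_during and the 1s/0.1s values are illustrative choices, not the original diagnostic's code.

```python
import asyncio
import time


async def heartbeat_during(blocking_seconds: float, interval: float) -> int:
    """Count loop ticks while a blocking call runs in a worker thread."""
    ticks = 0

    async def ticker() -> None:
        nonlocal ticks
        while True:
            await asyncio.sleep(interval)
            ticks += 1

    loop = asyncio.get_running_loop()
    task = asyncio.create_task(ticker())
    # time.sleep stands in for the slow client.put_object() call.
    await loop.run_in_executor(None, time.sleep, blocking_seconds)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return ticks


ticks = asyncio.run(heartbeat_during(1.0, 0.1))
print(ticks)  # roughly blocking_seconds / interval when the loop is free
```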
What we disproved (T4–T7)
| Hypothesis | Test | Result |
|---|---|---|
| socket.getdefaulttimeout() set to 30s by asyncpg | T4 | ✅ PASS (timeout stays None) |
| NullPool reconnect + 3 yields + PUT causes 30s | T5 (fresh backend) | ✅ PASS (111ms) |
| Conftest-patched backend + phase-7 warmup + PUT | T6 | ✅ PASS (9ms) |
| Many (5+) NullPool cycles before PUT | T7 | ✅ PASS (9ms) |
What remains
All T4–T7 tests use controlled generators where the PUT is the 4th–6th operation. In wire_push_stream, the phase-8 PUT is the 20th+ yield. But we do NOT believe yield count is the cause — the event loop is free throughout.
Root cause is inside the thread's client.put_object() call.
Inside put_object, urllib3 does:
- Get connection from pool (reuse or new)
- Send request headers with Expect: 100-continue
- Wait for 100 Continue response (socket read)
- Send body
- Read response
The 30s is one of these steps. The MissingHeaderBodySeparatorDefect warning only appears for certain PUTs (the slow ones?). This might be a symptom.
Next most atomic test (T8)
Intercept urllib3's connection pool to measure which sub-step of put_object takes the 30s:
- Is it Starting new HTTP connection (TCP connect)?
- Is it the 100-continue wait (socket read timeout)?
- Is it the response read?
The exact timeout value (30s) must come from a specific socket or urllib3 timeout setting on the connection used for phase 8.
Root cause identified: MissingHeaderBodySeparatorDefect leaves urllib3 connection dirty
T8 result
Patched BlobBackend._s3_put to measure both PUTs from inside the thread:
PUT 1 (phase 7 snapshot): put_object=0.006s ← fast
PUT 2 (phase 8 commit): put_object=30.261s ← 30s
default_timeout=None in both — socket default timeout is not the cause.
A urllib3 warning appears for PUT 2 only:
MissingHeaderBodySeparatorDefect: unparsed_data='HTTP/1.1 200 OK\r\nContent-Length: 0\r\n...\r\n\r\n'
The causal chain
- PUT 1 (phase 7 snapshot blob): MinIO's response triggers MissingHeaderBodySeparatorDefect in urllib3/Python 3.14's HTTP parser
- urllib3 warns and continues, but because the headers weren't parsed correctly, Content-Length: 0 is not extracted
- urllib3 doesn't know the response body is empty, so it keeps the socket open and tries to read more bytes
- urllib3 returns this dirty connection to the pool (keep-alive)
- PUT 2 (phase 8 commit blob): urllib3 reuses the dirty connection
- PUT 2 sends headers; MinIO receives a confused request (previous unread state + new headers)
- No response comes back for 30s, exactly MinIO's default keep-alive timeout
- MinIO closes the connection; urllib3 gets EOF, processes the response, warns again, and returns
- Push succeeds, but 30s too late
Why T6/T7 were fast
T6/T7 used b"phase7-sim" (10-byte body) for the first PUT — no MissingHeaderBodySeparatorDefect. Clean connection → PUT 2 reuses it cleanly → fast.
The trigger
Something about the phase-7 snapshot PUT body (_snapshot_identity_bytes(wire_snap)) causes MinIO's response to be parsed incorrectly by Python 3.14's HTTP client. The botocore request also sends x-amz-checksum-crc32 and x-amz-sdk-checksum-algorithm headers — these may be what causes MinIO to send a non-standard response.
Next atomic test (T9)
TDD the fix: make 2 sequential PUTs on the conftest backend using the SAME body that triggers the defect. T9 must FAIL before the fix and PASS after. Candidates for fix:
- Config(request_checksum_calculation='when_required'): disables CRC32 headers that may confuse MinIO
- Config(read_timeout=5, retries={'mode': 'standard', 'max_attempts': 3}): fail fast on stale connections
- urllib3 connection pool set to close-on-defect
Resolution — FIXED
Root cause confirmed: Python 3.14's http.client misparses the HTTP exchange when botocore sends Expect: 100-continue and MinIO echoes 100 Continue + 200 OK in the same TCP segment. The 200 OK bytes appear as unparsed_data after the 100 Continue headers, triggering MissingHeaderBodySeparatorDefect in urllib3.
urllib3 catches the defect but returns the connection to the keep-alive pool without draining the response body (dirty connection). Phase 8's next put_object call reuses this dirty connection; MinIO reads the leftover bytes as a new request, becomes confused, and closes the connection after its 30-second keep-alive timeout.
The complete causal chain:
phase 7 PUT → botocore sends Expect: 100-continue
→ MinIO sends 100 + 200 OK in same TCP segment
→ Python 3.14 http.client: MissingHeaderBodySeparatorDefect
→ urllib3 pools the connection without draining response body
→ phase 8 PUT reuses dirty connection
→ MinIO confused by leftover bytes, waits 30s, closes
Fix: Register a before-send.s3.PutObject event handler in BlobBackend._get_client() that removes Expect from the request before each PUT. Without Expect: 100-continue, botocore sends headers and body in a single shot. MinIO responds with a plain 200 OK. Python 3.14's parser handles it cleanly. No dirty connection.
Also added request_checksum_calculation='when_required' and response_checksum_validation='when_required' to suppress optional CRC32 headers that add response overhead.
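A sketch of the fix as described, assuming a botocore release that accepts request_checksum_calculation and response_checksum_validation in Config (added in botocore 1.36). make_client and strip_expect_header are illustrative names, not the actual BlobBackend._get_client() code. The handler is registered on botocore's before-send.s3.PutObject event and deletes the header from the prepared request just before it is sent. botocore leaves Expect out of the SigV4 signed headers, which is why stripping it at this point does not invalidate the signature.

```python
def strip_expect_header(request, **kwargs):
    """before-send handler: drop 'Expect: 100-continue' so headers and body go in one shot."""
    if "Expect" in request.headers:
        del request.headers["Expect"]
    return None  # returning None lets botocore send the modified request normally


def make_client(endpoint_url: str, access_key: str, secret_key: str, region: str):
    """Build an S3 client for MinIO with the workaround applied (sketch)."""
    import boto3  # imported lazily so the handler above is usable without boto3
    from botocore.config import Config

    client = boto3.client(
        "s3",
        endpoint_url=endpoint_url,
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
        region_name=region,
        config=Config(
            request_checksum_calculation="when_required",
            response_checksum_validation="when_required",
        ),
    )
    client.meta.events.register("before-send.s3.PutObject", strip_expect_header)
    return client
```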
TDD regression suite (T1–T9): Each test isolated one variable, ruling out: socket timeout mutation (T4), NullPool cycle count (T5, T7), event loop blocking (heartbeat diagnostic), open asyncpg transactions (T1–T3), conftest backend warm-up order (T6). T8 directly observed the slow put_object call and the defect warning. T9 is the canonical failing→passing regression test.
Result: phase 8 blob_put drops from 30.266s → 0.005s (6,000×). Total wire_push_stream for 1 commit: 30.5s → 0.2s. 9/9 tests pass.
Commit: sha256:8c0f8ca2509a
Phase 1 Complete ✅
Commit: sha256:4cca779e27a6
What landed
- _commit_identity_bytes(wire_commit): computes the canonical null-byte-separated payload whose sha256 equals the commit_id (identical formula to compute_commit_id in muse core)
- wire_push_stream, before each bulk DB INSERT batch: a parallel asyncio.gather PUTs the identity blob to MinIO for every new commit
- store_object / object_exists but not put as AsyncMock, causing TypeError on gather; fixed TestT5ServiceStream.stub_session to patch AsyncSessionLocal so unit tests survive the session-close/reopen mid-stream
What the blob contains (and why not more)
The blob at key=commit_id stores only the identity fields: sorted parent hex IDs, snapshot_id hex, message, committed_at, author, signer_public_key, null-byte separated. Mutable annotations (agent_id, reviewed_by, test_runs, model_id, etc.) are DB-only. They cannot be part of a content-addressed blob because commit_id is derived before they exist, and allowing annotation updates to produce a new blob would make the key non-stable. The DB remains the source of truth for annotations; the blob proves DAG identity.
Test
test_P1_commit_blobs_exist_in_object_store was written red first and is now green. It pushes 5 commits, then calls backend.exists(commit_id) for each and asserts that all 5 blobs are present in MinIO.
Up next: Phase 2, snapshot manifests as content-addressed blobs
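A sketch of the Phase 1 identity payload, assuming UTF-8 text fields, a single NUL between every field (including between parent IDs), the field order as listed, and bare hex IDs without the "sha256:" prefix. All of these are assumptions, so this will not reproduce real commit IDs. identity_bytes and commit_id are hypothetical stand-ins for _commit_identity_bytes and compute_commit_id.

```python
import hashlib


def identity_bytes(
    parent_ids: list,
    snapshot_id: str,
    message: str,
    committed_at: str,
    author: str,
    signer_public_key: str,
) -> bytes:
    """Null-byte-separated identity fields; parents are sorted so their order never matters."""
    fields = [
        *sorted(parent_ids),
        snapshot_id,
        message,
        committed_at,
        author,
        signer_public_key,
    ]
    return b"\x00".join(field.encode("utf-8") for field in fields)


def commit_id(payload: bytes) -> str:
    """The blob key is the sha256 of exactly the bytes stored in the blob."""
    return hashlib.sha256(payload).hexdigest()
```

Annotations are not inputs to identity_bytes, so updating agent_id or reviewed_by cannot change the key. This is the stability property the note on blob contents relies on.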