Stream path security parity: bring push/stream defenses to the level of bundle/presign
Problem
The presign/bundle path and the stream path are two entry points for the same operation (push), but only the bundle path received the full security treatment from issue #49 (now closed). Pushes below the 500-object threshold go through the stream path. A pusher who stays under that threshold, or deliberately crafts pushes to stay there, therefore bypasses every control added in phases 1–4 of #49.
This is not hypothetical. The stream path is the future micropayment-capable path and will see deliberate use. It must be hardened to parity before it sees production traffic.
Gaps (confirmed by code audit)
| Gap | Bundle path | Stream path |
|---|---|---|
| Blocked-hash check (DMCA / NCMEC) | ✅ checked before any write | ❌ absent |
| Daily byte limit (rate limiting) | ✅ enforced at presign | ❌ not tracked |
| Cumulative decompressed size / zip-bomb gate | ✅ 4 GB total, quarantine on breach | ❌ only per-object wire cap (50 MB) |
| content.scan job enqueue | ✅ one job per object after indexing | ❌ absent |
| Quarantine DB record on abuse | ✅ job row → status=quarantined | ❌ ERROR frame only, no DB record |
Implementation plan
Phase 1 — Blocked-hash check (critical, implement first)
Invariant: no object whose object_id appears in musehub_blocked_hashes may be written to storage or the DB via the stream path.
Where: inside _flush_batch in wire_push_stream, after the bulk DB-presence check and before any MinIO PUT.
Behaviour:
- Bulk-query musehub_blocked_hashes for all incoming_ids in the batch (one extra query per flush, same pattern as the bundle path).
- If any match: yield an ERROR frame (403 body), record a musehub_push_anomaly row flagging the attempt, and return without writing.
- If no match: continue as today.
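A minimal sketch of the Phase 1 check follows. It assumes a DB-API connection (sqlite3 here, standing in for the real async session) and a plain dict standing in for MinIO. `BlockedHashError`, `find_blocked` and `flush_batch` are hypothetical names. What matters is the ordering: one bulk `IN (...)` query covers every incoming id in the batch and runs before the first write, so a single blocked id leaves storage untouched.

```python
import sqlite3
from typing import Iterable


class BlockedHashError(Exception):
    """Hypothetical stand-in for the stream path's blocked-hash exception."""

    def __init__(self, blocked_ids: list[str]) -> None:
        super().__init__("blocked object(s): " + ", ".join(blocked_ids))
        self.blocked_ids = blocked_ids


def find_blocked(conn: sqlite3.Connection, incoming_ids: Iterable[str]) -> list[str]:
    """Return the incoming ids that appear in musehub_blocked_hashes (one query)."""
    ids = sorted(set(incoming_ids))
    if not ids:
        return []
    placeholders = ", ".join("?" for _ in ids)
    rows = conn.execute(
        "SELECT object_id FROM musehub_blocked_hashes "
        f"WHERE object_id IN ({placeholders})",
        ids,
    ).fetchall()
    return sorted(row[0] for row in rows)


def flush_batch(conn: sqlite3.Connection, storage: dict[str, bytes],
                batch: dict[str, bytes]) -> list[str]:
    """Check the whole batch before writing anything; return newly stored ids."""
    new_ids = [oid for oid in batch if oid not in storage]  # bulk presence-check stand-in
    blocked = find_blocked(conn, batch.keys())  # all incoming ids, as in the plan
    if blocked:
        # The plan also records a musehub_push_anomaly row here (omitted in this sketch).
        raise BlockedHashError(blocked)
    for oid in new_ids:
        storage[oid] = batch[oid]  # stand-in for the MinIO PUT and DB insert
    return new_ids
```

A stream carries fewer than 500 objects, so the `IN` list stays under SQLite's default bound-parameter limit. Another backend might need the query chunked.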
Tests (tests/test_stream_security_phase1.py):
- Blocked object in a single-batch stream → ERROR frame, nothing written to DB or MinIO.
- Blocked object in a multi-batch stream (arrives in batch 2) → ERROR frame; batch 1's writes are NOT rolled back (content-addressing makes partial writes safe, and the branch pointer is never advanced to the new commit).
- Clean stream bypasses check without regression.
- Idempotent: same blocked object pushed twice yields ERROR both times.
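The multi-batch case is the subtle one. The sketch below uses hypothetical names, a hypothetical ERROR frame shape, and in-memory dicts for storage and refs. It models why leaving batch 1 in place is safe: objects are content-addressed, so orphaned objects are harmless, and the branch pointer moves only after the last batch flushes cleanly.

```python
from typing import Iterable, Iterator


class BlockedHashError(Exception):
    """Hypothetical stand-in for the stream path's _BlockedHashError."""


def err_frame(message: str, status: int) -> dict:
    # Hypothetical ERROR frame shape; the real wire encoding is not described here.
    return {"type": "ERROR", "status": status, "message": message}


def flush(batch: dict[str, bytes], blocked: set[str], storage: dict[str, bytes]) -> None:
    hits = sorted(oid for oid in batch if oid in blocked)
    if hits:
        raise BlockedHashError("blocked object(s): " + ", ".join(hits))
    for oid, data in batch.items():
        storage.setdefault(oid, data)  # content-addressed: an id always maps to the same bytes


def push_stream(batches: Iterable[dict[str, bytes]], blocked: set[str],
                storage: dict[str, bytes], refs: dict[str, str],
                branch: str, new_commit: str) -> Iterator[dict]:
    for batch in batches:
        try:
            flush(batch, blocked, storage)
        except BlockedHashError as be:
            yield err_frame(str(be), 403)
            return  # earlier batches stay stored; refs[branch] is never touched
    refs[branch] = new_commit  # only reached when every batch flushed cleanly
    yield {"type": "OK", "commit": new_commit}
```

Pushing the same blocked object again hits the same check and yields the same 403, which covers the idempotency test.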
Phase 2 — Daily byte limit (rate limiting parity)
Invariant: the per-user daily byte counter (musehub_daily_push_bytes) is incremented for pushes via the stream path, and the pre-check fires before the stream is accepted.
Where:
- Pre-check: at the start of wire_push_stream, after auth, before entering the frame loop. Read today's total from musehub_daily_push_bytes and yield ERROR (429-equivalent) if it is already at or over bundle_daily_upload_limit_bytes.
- Accounting: at the END of a successful push (after the branch pointer is advanced), call record_bundle_bytes_uploaded with the total raw bytes received.
Note: checking at stream-open time rather than at a presign step makes the limit advisory. A stream that opens just under the threshold can push a lot of data and finish over it, and only the next stream open is refused. This is acceptable because streams are small by definition (< 500 objects / < 50 MB). An exact byte-level gate would require buffering the entire stream first, which defeats the point of streaming.
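The sketch below shows the two halves of Phase 2 and the advisory overshoot the note accepts. It uses an in-memory dict keyed by `(identity_id, day)` in place of musehub_daily_push_bytes. `push_with_daily_limit` and the frame dicts are hypothetical. Whether "raw bytes received" means wire bytes or decompressed bytes is not stated, so the sketch assumes frame bytes.

```python
from datetime import date
from typing import Iterable, Iterator

# Hypothetical stand-in for musehub_daily_push_bytes: (identity_id, day) -> bytes pushed.
DailyCounter = dict[tuple[str, date], int]


def push_with_daily_limit(identity_id: str, frames: Iterable[bytes],
                          counter: DailyCounter, limit_bytes: int,
                          today: date) -> Iterator[dict]:
    key = (identity_id, today)
    # Pre-check at stream open, before a single frame is read.
    if counter.get(key, 0) >= limit_bytes:
        yield {"type": "ERROR", "status": 429, "message": "daily push byte limit reached"}
        return
    received = 0
    for frame in frames:
        received += len(frame)  # assumption: counts frame bytes as "received"
        # ... decode objects, flush batches, advance the branch pointer ...
    # Accounting runs only after the push completed successfully.
    counter[key] = counter.get(key, 0) + received
    yield {"type": "OK", "bytes": received}
```

A user at 9 bytes against a 10-byte limit can still open a stream and finish at 14. That is the overshoot the note accepts. The next stream open is then refused.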
Tests (tests/test_stream_security_phase2.py):
- Stream push increments musehub_daily_push_bytes by the correct byte count.
- User at the daily limit is rejected with an ERROR frame at stream open (before any frame is read).
- User under the limit completes successfully.
- Per-user isolation: one user exhausted does not block another.
Phase 3 — Cumulative decompressed size gate (zip-bomb defense)
Invariant: the sum of len(raw_content) across all objects in a single stream may not exceed bundle_max_decompressed_bytes (currently 4 GB). Breaching it terminates the stream with an ERROR frame.
Where: running accumulator inside the frame loop, incremented after each successful decompression (same position as the bundle path's _total_decompressed counter).
Behaviour:
- On breach: yield ERROR frame (413), return. No quarantine record needed (stream abuse is stateless — the connection drops).
- The per-object wire cap (50 MB compressed) stays; this adds a cumulative cap on top.
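The sketch below shows a running accumulator for Phase 3. It uses zlib, but the wire codec is not named in this issue, so zlib is an assumption. `decode_objects`, `WIRE_CAP_BYTES`, the frame dicts, and the 400 status for a truncated object are all hypothetical. It also adds one refinement beyond the plan: each object is inflated with `max_length = remaining + 1`. That is enough output to detect a breach, without expanding the rest of a hostile object into memory first.

```python
import zlib
from typing import Iterable, Iterator

WIRE_CAP_BYTES = 50 * 1024 * 1024  # per-object compressed cap, kept as-is


def decode_objects(compressed_objects: Iterable[bytes],
                   max_decompressed: int) -> Iterator[dict]:
    total = 0  # cumulative decompressed bytes for this stream
    for index, blob in enumerate(compressed_objects):
        if len(blob) > WIRE_CAP_BYTES:
            yield {"type": "ERROR", "status": 413, "message": f"object {index} exceeds wire cap"}
            return
        remaining = max_decompressed - total
        d = zlib.decompressobj()
        # Produce at most remaining + 1 bytes: one byte over the budget proves a breach.
        raw = d.decompress(blob, remaining + 1)
        total += len(raw)
        if total > max_decompressed:
            yield {"type": "ERROR", "status": 413,
                   "message": f"cumulative decompressed size exceeded at object {index}"}
            return
        if not d.eof:  # output was under the limit, so an unfinished stream is truncated
            yield {"type": "ERROR", "status": 400, "message": f"object {index} is truncated"}
            return
        yield {"type": "OBJECT", "index": index, "size": len(raw)}
```

With a 100-byte cap, two objects of 60 decompressed bytes each fail on the second frame, the one that crosses the threshold. A single object of 200 bytes fails on its own.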
Tests (tests/test_stream_security_phase3.py):
- Stream with a single object whose decompressed size alone exceeds the cumulative cap → ERROR.
- Stream where no single object exceeds the cap but the total does → ERROR on the frame that crosses the threshold.
- Stream well under the cap → no regression.
- Cap is read from settings.bundle_max_decompressed_bytes (configurable, not hardcoded).
Phase 4 — content.scan job enqueue (content moderation parity)
Invariant: every object successfully written to storage via the stream path gets a content.scan background job, exactly as the bundle path does.
Where: after _flush_batch returns the stored objects, session.add one MusehubBackgroundJob(job_type="content.scan", ...) per newly stored object. Skipped (already-present) objects do not get a duplicate scan job.
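The sketch below shows the stored-versus-skipped split that drives Phase 4. A list stands in for `session.add`, and a set stands in for the bulk presence check. `BackgroundJob` is a hypothetical stand-in for MusehubBackgroundJob, reduced to the fields used here. `flush_and_enqueue` is also hypothetical.

```python
from dataclasses import dataclass
from typing import Iterable


@dataclass
class BackgroundJob:
    # Hypothetical stand-in for MusehubBackgroundJob; only the fields used here.
    job_type: str
    object_id: str
    status: str = "pending"


def flush_and_enqueue(batch_ids: Iterable[str], present: set[str],
                      session: list) -> tuple[list[str], list[str]]:
    """Store new ids, skip known ones, and queue one content.scan job per stored id."""
    stored: list[str] = []
    skipped: list[str] = []
    for oid in dict.fromkeys(batch_ids):  # de-duplicate while keeping order
        if oid in present:
            skipped.append(oid)  # already stored earlier: no second scan job
            continue
        present.add(oid)  # stand-in for the bulk INSERT
        stored.append(oid)
        session.append(BackgroundJob(job_type="content.scan", object_id=oid))
    return stored, skipped
```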
Tests (tests/test_stream_security_phase4.py):
- N new objects pushed via stream → N content.scan jobs enqueued.
- M objects skipped (already present) → 0 additional scan jobs for the skipped objects.
- Zero objects pushed (all skipped) → 0 scan jobs.
Phase 5 — Quarantine DB record on stream abuse (observability parity)
Invariant: when the stream path rejects a push for a security reason (blocked hash, byte limit, zip bomb), a structured record is written to the DB so operators can investigate. An ERROR frame is not enough — it disappears with the connection.
New table (or reuse musehub_push_anomaly): musehub_stream_rejection with columns:
- rejection_id (sha256 genesis)
- identity_id
- repo_id
- reason (enum: blocked_hash | daily_limit | zip_bomb | other)
- detail (text, e.g. the blocked object_id)
- rejected_at
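The sketch below shows the Phase 5 table and a recording helper, using sqlite3 as a stand-in for the real database. The DDL and `record_rejection` are hypothetical. Two details are assumptions. First, "sha256 genesis" is read as a `sha256:`-prefixed digest of the row's fields plus a random nonce. Second, the enum is enforced with a CHECK constraint. The helper commits immediately so the row survives the dropped connection. Note that `with conn:` commits everything pending on that connection, so a real helper sharing a session with uncommitted batch writes needs to decide whether that is intended.

```python
import hashlib
import secrets
import sqlite3
from datetime import datetime, timezone

REASONS = ("blocked_hash", "daily_limit", "zip_bomb", "other")

DDL = """
CREATE TABLE IF NOT EXISTS musehub_stream_rejection (
    rejection_id TEXT PRIMARY KEY,
    identity_id  TEXT NOT NULL,
    repo_id      TEXT NOT NULL,
    reason       TEXT NOT NULL
                 CHECK (reason IN ('blocked_hash', 'daily_limit', 'zip_bomb', 'other')),
    detail       TEXT,
    rejected_at  TEXT NOT NULL
)
"""


def record_rejection(conn: sqlite3.Connection, identity_id: str, repo_id: str,
                     reason: str, detail: str = "") -> str:
    """Write and commit one rejection row; call it before yielding the ERROR frame."""
    if reason not in REASONS:
        raise ValueError(f"unknown rejection reason: {reason}")
    rejected_at = datetime.now(timezone.utc).isoformat()
    # Assumed reading of "sha256 genesis": digest of the fields plus a random nonce.
    material = "\x00".join([identity_id, repo_id, reason, detail, rejected_at,
                            secrets.token_hex(16)])
    rejection_id = "sha256:" + hashlib.sha256(material.encode()).hexdigest()
    with conn:  # commits on success, so the row outlives the dropped connection
        conn.execute(
            "INSERT INTO musehub_stream_rejection VALUES (?, ?, ?, ?, ?, ?)",
            (rejection_id, identity_id, repo_id, reason, detail, rejected_at),
        )
    return rejection_id
```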
Tests (tests/test_stream_security_phase5.py):
- Blocked-hash rejection → musehub_stream_rejection row with reason=blocked_hash.
- Daily-limit rejection → row with reason=daily_limit.
- Zip-bomb rejection → row with reason=zip_bomb.
- Rejection row is written even if the session has a partial transaction (no silent swallow).
Acceptance criteria (all phases)
- All existing stream-path tests continue to pass.
- Each phase ships with its own TDD test file; tests are written first, confirmed failing, then implemented to green.
- No phase may be merged if it breaks the push/stream happy path.
- After phase 1, a DMCA-blocked object cannot enter storage via any push path.
- After phase 2, the daily byte counter reflects both presign and stream pushes.
- After phase 3, no single stream can decompress more than bundle_max_decompressed_bytes in total.
Order
Phases 1 → 2 → 3 → 4 → 5. Phase 1 is the critical one — ship it first.
Phase 2 complete. Daily byte limit + accounting in stream path. wire_push_stream now accepts identity_id; pre-check at stream open yields ERROR (429) if today's total >= bundle_daily_upload_limit_bytes; _stream_bytes_received accumulator records bytes on successful completion via record_bundle_bytes_uploaded. 4/4 tests green (test_stream_security_phase2.py), 70/70 regression. Commit sha256:76875e73746b. Phase 3 (zip-bomb gate) is next.
Phase 3 complete. Cumulative decompressed size gate (zip-bomb defense). _cumulative_decompressed accumulator added to wire_push_stream; checked after each O/OC object decode against settings.bundle_max_decompressed_bytes; yields ERROR (413) and returns on breach. Both O-frame paths (inline and OC-reassembled) are guarded. Cap is read from settings, not hardcoded. 4/4 tests green (test_stream_security_phase3.py), 74/74 regression. Commit sha256:d2f7b0fb06ed. Phase 4 (content.scan job enqueue) is next.
Phase 4 complete. content.scan job enqueue in stream path. Inside _flush_batch, after bulk INSERT and upsert_refs, one MusehubBackgroundJob(job_type='content.scan', status='pending') is created per newly-stored object. Skipped objects are excluded. Mirrors bundle.index exactly. 4/4 tests green (test_stream_security_phase4.py), 78/78 regression. Commit sha256:7f6873325a17. Phase 5 (quarantine DB record on stream abuse) is next.
Phase 5 complete. All 5 phases done — issue #51 fully resolved.
Phase 5 adds musehub_stream_rejections table (MusehubStreamRejection model: rejection_id, repo_id, identity_id, reason, detail, rejected_at) and a _record_rejection helper inside wire_push_stream. Called at every security rejection site before yielding the ERROR frame: blocked_hash (3 sites), daily_limit (1 site), zip_bomb (2 sites). Partial-commit safety verified: rejection row is written even when batch 1 already committed before the blocked object arrives in batch 2.
4/4 tests green (test_stream_security_phase5.py), 82/82 regression. Commit sha256:d1db89210c1d.
Stream path now has full security parity with the bundle/presign path.
Phase 1 complete ✓
Blocked-hash check in stream path — implemented and green.
What was added
- _flush_batch call sites wrapped with try/except _BlockedHashError → yields _err(str(be), 403) and returns without advancing the branch pointer

Tests (test_stream_security_phase1.py): 5/5 green

Regression

65/65 test_wire_push_stream.py tests still pass.

Commit: sha256:2fe28bd663b287cadba8cd2cb34c6aae89eaee726e4849295cf0f7986be9d57c
Phase 2 (daily byte limit on stream path) is next.