"""TDD — fetch mpack cleanup (issue #47). After wire_fetch_mpack stores a mpack blob for a presigned GET, the blob is a transient artifact and must be deleted after ttl_seconds. Tests: BC0 Presign path: backend.delete(mpack_id) is called after ttl_seconds. BC2 Cleanup failure (backend.delete raises) does not propagate to the caller. BC3 Each wire_fetch_mpack call schedules exactly one cleanup — not zero, not two. """ from __future__ import annotations import asyncio import hashlib import msgpack import pytest from datetime import datetime, timezone from sqlalchemy.dialects.postgresql import insert as pg_insert from sqlalchemy.ext.asyncio import AsyncSession from unittest.mock import AsyncMock, call from muse.core.types import blob_id, fake_id from musehub.db.musehub_repo_models import MusehubBranch, MusehubCommit, MusehubCommitGraph, MusehubCommitRef, MusehubMPackIndex, MusehubObject, MusehubObjectRef, MusehubSnapshot, MusehubSnapshotRef from tests.factories import create_repo # ── helpers ─────────────────────────────────────────────────────────────────── def _now() -> datetime: return datetime.now(tz=timezone.utc) def _stub_backend(monkeypatch: pytest.MonkeyPatch, *, supports_presign: bool) -> tuple[dict, AsyncMock]: store: dict[str, bytes] = {} async def _put(oid: str, data: bytes, **_: typing.Any) -> str: store[oid] = data return f"mem://{oid}" async def _get(oid: str) -> bytes | None: return store.get(oid) async def _exists(oid: str, **_: typing.Any) -> bool: return oid in store async def _presign_get(oid: str, ttl_seconds: int) -> str: return f"https://minio.example.com/objects/{oid}?ttl={ttl_seconds}" async def _get_mpack(mpack_id: str) -> bytes | None: return store.get(mpack_id) async def _put_mpack(mpack_id: str, data: bytes, **_: typing.Any) -> str: store[mpack_id] = data return f"mem://mpacks/{mpack_id}" async def _presign_mpack_get(mpack_id: str, ttl_seconds: int) -> str: return f"https://minio.example.com/mpacks/{mpack_id}?ttl={ttl_seconds}" backend = AsyncMock() backend.put = _put backend.get = _get backend.get_mpack = _get_mpack backend.put_mpack = _put_mpack backend.presign_mpack_get = _presign_mpack_get backend.exists = _exists backend.presign_get = _presign_get backend.supports_presign = supports_presign monkeypatch.setattr("musehub.services.musehub_wire.get_backend", lambda: backend) monkeypatch.setattr("musehub.services.musehub_wire_fetch.get_backend", lambda: backend) return store, backend async def _store_object( session: AsyncSession, repo_id: str, oid: str, content: bytes, store: dict[str, bytes], ) -> None: store[oid] = content # Build a minimal mpack blob so get_mpack returns a valid msgpack dict. mpack_blob = msgpack.packb( {"objects": [{"object_id": oid, "content": content}]}, use_bin_type=True, ) mpack_id = blob_id(mpack_blob) store[mpack_id] = mpack_blob await session.execute( pg_insert(MusehubObject) .values( object_id=oid, path="file.dat", size_bytes=len(content), storage_uri=f"mem://{oid}", ) .on_conflict_do_nothing(index_elements=["object_id"]) ) await session.execute( pg_insert(MusehubObjectRef) .values(repo_id=repo_id, object_id=oid) .on_conflict_do_nothing() ) await session.execute( pg_insert(MusehubMPackIndex) .values(entity_id=oid, mpack_id=mpack_id, entity_type="object") .on_conflict_do_nothing() ) await session.commit() async def _make_commit( session: AsyncSession, repo_id: str, *, manifest: dict[str, str], seed: str = "c1", parent_ids: list[str] | None = None, ) -> tuple[MusehubCommit, MusehubSnapshot]: from sqlalchemy import select snap_id = fake_id(f"snap-{seed}") snap = MusehubSnapshot( snapshot_id=snap_id, directories=[], manifest_blob=msgpack.packb(manifest, use_bin_type=True), entry_count=len(manifest), created_at=_now(), ) session.add(snap) await session.execute( pg_insert(MusehubSnapshotRef) .values(repo_id=repo_id, snapshot_id=snap_id) .on_conflict_do_nothing() ) commit_id = fake_id(f"commit-{seed}") commit = MusehubCommit( commit_id=commit_id, branch="main", parent_ids=parent_ids or [], message=f"commit {seed}", author="gabriel", timestamp=_now(), snapshot_id=snap_id, ) session.add(commit) await session.execute( pg_insert(MusehubCommitRef) .values(repo_id=repo_id, commit_id=commit_id) .on_conflict_do_nothing() ) await session.execute( pg_insert(MusehubCommitGraph) .values( commit_id=commit_id, parent_ids=parent_ids or [], generation=0, snapshot_id=snap_id, ) .on_conflict_do_nothing() ) branch_q = await session.execute( select(MusehubBranch).where( MusehubBranch.repo_id == repo_id, MusehubBranch.name == "main", ) ) branch = branch_q.scalar_one_or_none() if branch: branch.head_commit_id = commit_id await session.commit() return commit, snap # ══════════════════════════════════════════════════════════════════════════════ # BC0 — presign path: backend.delete(mpack_id) called after ttl_seconds # ══════════════════════════════════════════════════════════════════════════════ @pytest.mark.asyncio async def test_bc0_mpack_deleted_after_ttl( db_session: AsyncSession, monkeypatch: pytest.MonkeyPatch ) -> None: """On the presign path, backend.delete(mpack_id) must be called after ttl_seconds.""" from musehub.services.musehub_wire import wire_fetch_mpack store, backend = _stub_backend(monkeypatch, supports_presign=True) repo = await create_repo(db_session, owner="gabriel", visibility="public") raw = b"content for cleanup test" oid = blob_id(raw) await _store_object(db_session, repo.repo_id, oid, raw, store) commit, _ = await _make_commit( db_session, repo.repo_id, manifest={"file.dat": oid}, seed="bc0" ) ttl = 0.05 # 50 ms — fast enough to verify in a test result = await wire_fetch_mpack( db_session, repo.repo_id, want=[commit.commit_id], have=[], ttl_seconds=ttl, ) assert "mpack_url" in result, "wire_fetch_mpack must return mpack_url" mpack_id = result["mpack_id"] # Cleanup has not happened yet (TTL not elapsed) backend.delete.assert_not_called() # Wait for TTL to elapse and the cleanup task to fire await asyncio.sleep(ttl * 3) backend.delete.assert_called_once_with(mpack_id) # ══════════════════════════════════════════════════════════════════════════════ # BC2 — cleanup failure does not propagate to the caller # ══════════════════════════════════════════════════════════════════════════════ @pytest.mark.asyncio async def test_bc2_cleanup_failure_is_swallowed( db_session: AsyncSession, monkeypatch: pytest.MonkeyPatch ) -> None: """backend.delete raising must not crash the caller — cleanup is best-effort.""" from musehub.services.musehub_wire import wire_fetch_mpack store, backend = _stub_backend(monkeypatch, supports_presign=True) backend.delete = AsyncMock(side_effect=RuntimeError("MinIO unavailable")) repo = await create_repo(db_session, owner="gabriel", visibility="public") raw = b"error path content" oid = blob_id(raw) await _store_object(db_session, repo.repo_id, oid, raw, store) commit, _ = await _make_commit( db_session, repo.repo_id, manifest={"file.dat": oid}, seed="bc2" ) ttl = 0.05 # wire_fetch_mpack itself must not raise result = await wire_fetch_mpack( db_session, repo.repo_id, want=[commit.commit_id], have=[], ttl_seconds=ttl, ) assert "mpack_url" in result # Wait for the cleanup task to run (and fail silently) await asyncio.sleep(ttl * 3) # delete was attempted (once), but the error was swallowed backend.delete.assert_called_once() # ══════════════════════════════════════════════════════════════════════════════ # BC3 — exactly one cleanup scheduled per call # ══════════════════════════════════════════════════════════════════════════════ @pytest.mark.asyncio async def test_bc3_exactly_one_cleanup_per_call( db_session: AsyncSession, monkeypatch: pytest.MonkeyPatch ) -> None: """Each wire_fetch_mpack invocation schedules exactly one delete — not zero, not two.""" from musehub.services.musehub_wire import wire_fetch_mpack store, backend = _stub_backend(monkeypatch, supports_presign=True) repo = await create_repo(db_session, owner="gabriel", visibility="public") raw = b"count check content" oid = blob_id(raw) await _store_object(db_session, repo.repo_id, oid, raw, store) commit, _ = await _make_commit( db_session, repo.repo_id, manifest={"file.dat": oid}, seed="bc3" ) ttl = 0.05 await wire_fetch_mpack( db_session, repo.repo_id, want=[commit.commit_id], have=[], ttl_seconds=ttl, ) await asyncio.sleep(ttl * 4) assert backend.delete.call_count == 1, ( f"Expected exactly 1 delete call, got {backend.delete.call_count}" )