test_fetch_mpack_cleanup.py
python
sha256:ef10830ce231e0a20efcb0e2586cb879471247e916616e6fdd0d51df459e2595
fix: typing audit — 0 violations, 0 untyped defs across all…
Sonnet 4.6
minor
⚠ breaking
20 days ago
"""TDD — fetch mpack cleanup (issue #47).

After wire_fetch_mpack stores a mpack blob for a presigned GET, the blob is
a transient artifact and must be deleted after ttl_seconds.

Tests:
    BC0  Presign path: backend.delete(mpack_id) is called after ttl_seconds.
    BC2  Cleanup failure (backend.delete raises) does not propagate to the caller.
    BC3  Each wire_fetch_mpack call schedules exactly one cleanup — not zero, not two.
"""
| 11 | from __future__ import annotations |
| 12 | |
| 13 | import asyncio |
| 14 | import hashlib |
| 15 | import msgpack |
| 16 | import pytest |
| 17 | from datetime import datetime, timezone |
| 18 | from sqlalchemy.dialects.postgresql import insert as pg_insert |
| 19 | from sqlalchemy.ext.asyncio import AsyncSession |
| 20 | from unittest.mock import AsyncMock, call |
| 21 | |
| 22 | from muse.core.types import blob_id, fake_id |
| 23 | from musehub.db.musehub_repo_models import MusehubBranch, MusehubCommit, MusehubCommitGraph, MusehubCommitRef, MusehubMPackIndex, MusehubObject, MusehubObjectRef, MusehubSnapshot, MusehubSnapshotRef |
| 24 | from tests.factories import create_repo |
| 25 | |
| 26 | |
| 27 | # ── helpers ─────────────────────────────────────────────────────────────────── |
| 28 | |
| 29 | |
def _now() -> datetime:
    """Return the current moment as a timezone-aware ``datetime`` in UTC."""
    return datetime.now(tz=timezone.utc)
| 32 | |
| 33 | |
def _stub_backend(
    monkeypatch: pytest.MonkeyPatch, *, supports_presign: bool
) -> tuple[dict[str, bytes], AsyncMock]:
    """Install an in-memory fake storage backend and return ``(store, backend)``.

    ``store`` is the plain dict backing the fake: the object helpers and the
    mpack helpers all read and write it, keyed by id. ``backend`` is an
    ``AsyncMock`` whose storage methods are replaced with small coroutines
    over ``store``. ``backend.delete`` is not assigned here, so it remains an
    auto-created mock attribute that the tests can assert on.

    ``get_backend`` is patched in both ``musehub.services.musehub_wire`` and
    ``musehub.services.musehub_wire_fetch`` so each returns this fake.

    Args:
        monkeypatch: pytest fixture used to patch the two ``get_backend`` names.
        supports_presign: value exposed as ``backend.supports_presign``.

    Returns:
        The backing dict and the configured mock backend.
    """
    store: dict[str, bytes] = {}

    # Extra keyword arguments are accepted and ignored; ``object`` is used for
    # them so the annotations do not depend on a ``typing`` import.
    async def _put(oid: str, data: bytes, **_: object) -> str:
        store[oid] = data
        return f"mem://{oid}"

    async def _get(oid: str) -> bytes | None:
        return store.get(oid)

    async def _exists(oid: str, **_: object) -> bool:
        return oid in store

    async def _presign_get(oid: str, ttl_seconds: int) -> str:
        return f"https://minio.example.com/objects/{oid}?ttl={ttl_seconds}"

    async def _get_mpack(mpack_id: str) -> bytes | None:
        return store.get(mpack_id)

    async def _put_mpack(mpack_id: str, data: bytes, **_: object) -> str:
        store[mpack_id] = data
        return f"mem://mpacks/{mpack_id}"

    async def _presign_mpack_get(mpack_id: str, ttl_seconds: int) -> str:
        return f"https://minio.example.com/mpacks/{mpack_id}?ttl={ttl_seconds}"

    backend = AsyncMock()
    backend.put = _put
    backend.get = _get
    backend.get_mpack = _get_mpack
    backend.put_mpack = _put_mpack
    backend.presign_mpack_get = _presign_mpack_get
    backend.exists = _exists
    backend.presign_get = _presign_get
    backend.supports_presign = supports_presign
    monkeypatch.setattr("musehub.services.musehub_wire.get_backend", lambda: backend)
    monkeypatch.setattr("musehub.services.musehub_wire_fetch.get_backend", lambda: backend)
    return store, backend
| 72 | |
| 73 | |
async def _store_object(
    session: AsyncSession,
    repo_id: str,
    oid: str,
    content: bytes,
    store: dict[str, bytes],
) -> None:
    """Seed one object into the fake store and into the database.

    Writes ``content`` under ``oid`` in ``store``, wraps it in a single-object
    msgpack blob stored under ``blob_id(mpack_blob)``, then inserts the
    ``MusehubObject``, ``MusehubObjectRef`` and ``MusehubMPackIndex`` rows and
    commits. Every insert uses ON CONFLICT DO NOTHING, so rows that already
    exist are left as they are.

    Args:
        session: async database session used for the inserts and the commit.
        repo_id: repository the object reference is attached to.
        oid: object id used as the store key and the ``object_id`` column.
        content: raw object bytes.
        store: in-memory dict backing the stub backend; mutated in place.
    """
    store[oid] = content
    # Build a minimal mpack blob so get_mpack returns a valid msgpack dict.
    mpack_blob = msgpack.packb(
        {"objects": [{"object_id": oid, "content": content}]},
        use_bin_type=True,
    )
    mpack_id = blob_id(mpack_blob)
    store[mpack_id] = mpack_blob
    await session.execute(
        pg_insert(MusehubObject)
        .values(
            object_id=oid,
            path="file.dat",
            size_bytes=len(content),
            storage_uri=f"mem://{oid}",
        )
        .on_conflict_do_nothing(index_elements=["object_id"])
    )
    await session.execute(
        pg_insert(MusehubObjectRef)
        .values(repo_id=repo_id, object_id=oid)
        .on_conflict_do_nothing()
    )
    # Index row that maps the object to the mpack blob stored above.
    await session.execute(
        pg_insert(MusehubMPackIndex)
        .values(entity_id=oid, mpack_id=mpack_id, entity_type="object")
        .on_conflict_do_nothing()
    )
    await session.commit()
| 110 | |
| 111 | |
async def _make_commit(
    session: AsyncSession,
    repo_id: str,
    *,
    manifest: dict[str, str],
    seed: str = "c1",
    parent_ids: list[str] | None = None,
) -> tuple[MusehubCommit, MusehubSnapshot]:
    """Create a snapshot and a commit on branch ``main`` and persist both.

    The snapshot id and commit id are derived from ``seed`` via ``fake_id``.
    Besides the two ORM rows, the matching ``MusehubSnapshotRef``,
    ``MusehubCommitRef`` and ``MusehubCommitGraph`` rows are inserted with
    ON CONFLICT DO NOTHING. If a ``main`` branch row exists for ``repo_id``,
    its ``head_commit_id`` is pointed at the new commit before committing.

    Args:
        session: async database session.
        repo_id: repository the snapshot, commit and branch belong to.
        manifest: mapping packed with msgpack into the snapshot's
            ``manifest_blob``; its length becomes ``entry_count``.
        seed: suffix used to derive the snapshot/commit ids and the message.
        parent_ids: parent commit ids; ``None`` is treated as no parents.

    Returns:
        The ``(commit, snapshot)`` ORM objects that were added.
    """
    from sqlalchemy import select

    # Resolve the optional argument once; used for both the commit row and
    # the commit-graph row.
    parents: list[str] = parent_ids or []

    snap_id = fake_id(f"snap-{seed}")
    snap = MusehubSnapshot(
        snapshot_id=snap_id,
        directories=[],
        manifest_blob=msgpack.packb(manifest, use_bin_type=True),
        entry_count=len(manifest),
        created_at=_now(),
    )
    session.add(snap)
    await session.execute(
        pg_insert(MusehubSnapshotRef)
        .values(repo_id=repo_id, snapshot_id=snap_id)
        .on_conflict_do_nothing()
    )

    commit_id = fake_id(f"commit-{seed}")
    commit = MusehubCommit(
        commit_id=commit_id,
        branch="main",
        parent_ids=parents,
        message=f"commit {seed}",
        author="gabriel",
        timestamp=_now(),
        snapshot_id=snap_id,
    )
    session.add(commit)
    await session.execute(
        pg_insert(MusehubCommitRef)
        .values(repo_id=repo_id, commit_id=commit_id)
        .on_conflict_do_nothing()
    )
    await session.execute(
        pg_insert(MusehubCommitGraph)
        .values(
            commit_id=commit_id,
            parent_ids=parents,
            generation=0,
            snapshot_id=snap_id,
        )
        .on_conflict_do_nothing()
    )

    # Move the head of "main" only when that branch row already exists.
    branch_q = await session.execute(
        select(MusehubBranch).where(
            MusehubBranch.repo_id == repo_id,
            MusehubBranch.name == "main",
        )
    )
    branch = branch_q.scalar_one_or_none()
    if branch is not None:
        branch.head_commit_id = commit_id
    await session.commit()
    return commit, snap
| 173 | |
| 174 | |
| 175 | # ══════════════════════════════════════════════════════════════════════════════ |
| 176 | # BC0 — presign path: backend.delete(mpack_id) called after ttl_seconds |
| 177 | # ══════════════════════════════════════════════════════════════════════════════ |
| 178 | |
@pytest.mark.asyncio
async def test_bc0_mpack_deleted_after_ttl(
    db_session: AsyncSession, monkeypatch: pytest.MonkeyPatch
) -> None:
    """On the presign path, backend.delete(mpack_id) must be called after ttl_seconds."""
    from musehub.services.musehub_wire import wire_fetch_mpack

    store, backend = _stub_backend(monkeypatch, supports_presign=True)
    repo = await create_repo(db_session, owner="gabriel", visibility="public")

    # Seed one object and a commit whose manifest references it.
    raw = b"content for cleanup test"
    oid = blob_id(raw)
    await _store_object(db_session, repo.repo_id, oid, raw, store)
    commit, _ = await _make_commit(
        db_session, repo.repo_id, manifest={"file.dat": oid}, seed="bc0"
    )

    ttl = 0.05  # 50 ms — fast enough to verify in a test
    result = await wire_fetch_mpack(
        db_session,
        repo.repo_id,
        want=[commit.commit_id],
        have=[],
        ttl_seconds=ttl,
    )

    assert "mpack_url" in result, "wire_fetch_mpack must return mpack_url"
    # Fail with a readable message instead of a bare KeyError below.
    assert "mpack_id" in result, "wire_fetch_mpack must return mpack_id"
    mpack_id = result["mpack_id"]

    # Cleanup has not happened yet (TTL not elapsed)
    backend.delete.assert_not_called()

    # Wait for TTL to elapse and the cleanup task to fire
    await asyncio.sleep(ttl * 3)

    backend.delete.assert_called_once_with(mpack_id)
| 215 | |
| 216 | |
| 217 | |
| 218 | # ══════════════════════════════════════════════════════════════════════════════ |
| 219 | # BC2 — cleanup failure does not propagate to the caller |
| 220 | # ══════════════════════════════════════════════════════════════════════════════ |
| 221 | |
@pytest.mark.asyncio
async def test_bc2_cleanup_failure_is_swallowed(
    db_session: AsyncSession, monkeypatch: pytest.MonkeyPatch
) -> None:
    """backend.delete raising must not crash the caller — cleanup is best-effort."""
    from musehub.services.musehub_wire import wire_fetch_mpack

    store, backend = _stub_backend(monkeypatch, supports_presign=True)
    # Replace the auto-created mock so every delete attempt raises.
    backend.delete = AsyncMock(side_effect=RuntimeError("MinIO unavailable"))

    repo = await create_repo(db_session, owner="gabriel", visibility="public")

    raw = b"error path content"
    oid = blob_id(raw)
    await _store_object(db_session, repo.repo_id, oid, raw, store)
    commit, _ = await _make_commit(
        db_session, repo.repo_id, manifest={"file.dat": oid}, seed="bc2"
    )

    ttl = 0.05
    # wire_fetch_mpack itself must not raise
    result = await wire_fetch_mpack(
        db_session,
        repo.repo_id,
        want=[commit.commit_id],
        have=[],
        ttl_seconds=ttl,
    )
    assert "mpack_url" in result

    # Wait for the cleanup task to run (and fail silently)
    await asyncio.sleep(ttl * 3)

    # delete was attempted (once), but the error was swallowed
    # NOTE(review): this assertion only proves the attempt happened. If the
    # cleanup runs as a detached background task, its exception would not
    # reach this coroutine either way — confirm how wire_fetch_mpack
    # schedules cleanup before relying on this as proof of swallowing.
    backend.delete.assert_called_once()
| 257 | |
| 258 | |
| 259 | # ══════════════════════════════════════════════════════════════════════════════ |
| 260 | # BC3 — exactly one cleanup scheduled per call |
| 261 | # ══════════════════════════════════════════════════════════════════════════════ |
| 262 | |
@pytest.mark.asyncio
async def test_bc3_exactly_one_cleanup_per_call(
    db_session: AsyncSession, monkeypatch: pytest.MonkeyPatch
) -> None:
    """Each wire_fetch_mpack invocation schedules exactly one delete — not zero, not two."""
    from musehub.services.musehub_wire import wire_fetch_mpack

    store, backend = _stub_backend(monkeypatch, supports_presign=True)
    repo = await create_repo(db_session, owner="gabriel", visibility="public")

    raw = b"count check content"
    oid = blob_id(raw)
    await _store_object(db_session, repo.repo_id, oid, raw, store)
    commit, _ = await _make_commit(
        db_session, repo.repo_id, manifest={"file.dat": oid}, seed="bc3"
    )

    ttl = 0.05
    await wire_fetch_mpack(
        db_session,
        repo.repo_id,
        want=[commit.commit_id],
        have=[],
        ttl_seconds=ttl,
    )

    # Sleep for several TTLs so any additional (unexpected) cleanup would
    # also have had time to run before the calls are counted.
    await asyncio.sleep(ttl * 4)

    assert backend.delete.call_count == 1, (
        f"Expected exactly 1 delete call, got {backend.delete.call_count}"
    )
File History
1 commit
sha256:ef10830ce231e0a20efcb0e2586cb879471247e916616e6fdd0d51df459e2595
fix: typing audit — 0 violations, 0 untyped defs across all…
Sonnet 4.6
minor
⚠
20 days ago