gabriel / musehub public
test_fetch_mpack_cleanup.py python
293 lines 10.5 KB
Raw
sha256:ef10830ce231e0a20efcb0e2586cb879471247e916616e6fdd0d51df459e2595 fix: typing audit — 0 violations, 0 untyped defs across all… Sonnet 4.6 minor ⚠ breaking 20 days ago
1 """TDD — fetch mpack cleanup (issue #47).
2
3 After wire_fetch_mpack stores a mpack blob for a presigned GET, the blob is
4 a transient artifact and must be deleted after ttl_seconds.
5
6 Tests:
7 BC0 Presign path: backend.delete(mpack_id) is called after ttl_seconds.
8 BC2 Cleanup failure (backend.delete raises) does not propagate to the caller.
9 BC3 Each wire_fetch_mpack call schedules exactly one cleanup — not zero, not two.
10 """
11 from __future__ import annotations
12
13 import asyncio
14 import hashlib
15 import msgpack
16 import pytest
17 from datetime import datetime, timezone
18 from sqlalchemy.dialects.postgresql import insert as pg_insert
19 from sqlalchemy.ext.asyncio import AsyncSession
20 from unittest.mock import AsyncMock, call
21
22 from muse.core.types import blob_id, fake_id
23 from musehub.db.musehub_repo_models import MusehubBranch, MusehubCommit, MusehubCommitGraph, MusehubCommitRef, MusehubMPackIndex, MusehubObject, MusehubObjectRef, MusehubSnapshot, MusehubSnapshotRef
24 from tests.factories import create_repo
25
26
27 # ── helpers ───────────────────────────────────────────────────────────────────
28
29
def _now() -> datetime:
    """Return the current time as a timezone-aware datetime in UTC."""
    utc = timezone.utc
    return datetime.now(utc)
32
33
def _stub_backend(
    monkeypatch: pytest.MonkeyPatch, *, supports_presign: bool
) -> tuple[dict[str, bytes], AsyncMock]:
    """Install an in-memory storage backend stub and return ``(store, backend)``.

    The stub is an ``AsyncMock`` whose storage methods are replaced by small
    coroutines backed by a single dict, so objects and mpacks share one
    keyspace.  ``get_backend`` is patched in both ``musehub.services.musehub_wire``
    and ``musehub.services.musehub_wire_fetch`` to return the stub.

    ``delete`` is not overridden here, so it stays an auto-created mock
    attribute that callers can make assertions on.

    Args:
        monkeypatch: pytest fixture used to patch the ``get_backend`` lookups.
        supports_presign: value exposed as ``backend.supports_presign``.

    Returns:
        The backing dict (id -> bytes) and the stub backend.
    """
    store: dict[str, bytes] = {}

    # Extra keyword arguments are accepted and ignored; ``object`` is used for
    # their annotation because ``typing`` is not imported by this module.
    async def _put(oid: str, data: bytes, **_: object) -> str:
        store[oid] = data
        return f"mem://{oid}"

    async def _get(oid: str) -> bytes | None:
        return store.get(oid)

    async def _exists(oid: str, **_: object) -> bool:
        return oid in store

    async def _presign_get(oid: str, ttl_seconds: int) -> str:
        return f"https://minio.example.com/objects/{oid}?ttl={ttl_seconds}"

    async def _get_mpack(mpack_id: str) -> bytes | None:
        return store.get(mpack_id)

    async def _put_mpack(mpack_id: str, data: bytes, **_: object) -> str:
        store[mpack_id] = data
        return f"mem://mpacks/{mpack_id}"

    async def _presign_mpack_get(mpack_id: str, ttl_seconds: int) -> str:
        return f"https://minio.example.com/mpacks/{mpack_id}?ttl={ttl_seconds}"

    backend = AsyncMock()
    backend.put = _put
    backend.get = _get
    backend.get_mpack = _get_mpack
    backend.put_mpack = _put_mpack
    backend.presign_mpack_get = _presign_mpack_get
    backend.exists = _exists
    backend.presign_get = _presign_get
    backend.supports_presign = supports_presign

    # Both modules look the backend up through their own ``get_backend`` name.
    monkeypatch.setattr("musehub.services.musehub_wire.get_backend", lambda: backend)
    monkeypatch.setattr("musehub.services.musehub_wire_fetch.get_backend", lambda: backend)
    return store, backend
72
73
async def _store_object(
    session: AsyncSession,
    repo_id: str,
    oid: str,
    content: bytes,
    store: dict[str, bytes],
) -> None:
    """Seed one object in the in-memory store and in the database.

    ``content`` is written to ``store`` under ``oid`` and additionally wrapped
    in a single-entry msgpack mpack blob stored under its own ``blob_id``.
    The object row, the repo-to-object ref and the mpack index row are then
    inserted (each ignoring conflicts) and the session is committed.
    """
    store[oid] = content

    # A one-object mpack is enough for get_mpack to return a valid msgpack dict.
    packed = msgpack.packb(
        {"objects": [{"object_id": oid, "content": content}]},
        use_bin_type=True,
    )
    packed_id = blob_id(packed)
    store[packed_id] = packed

    object_row = pg_insert(MusehubObject).values(
        object_id=oid,
        path="file.dat",
        size_bytes=len(content),
        storage_uri=f"mem://{oid}",
    )
    repo_ref = pg_insert(MusehubObjectRef).values(repo_id=repo_id, object_id=oid)
    index_row = pg_insert(MusehubMPackIndex).values(
        entity_id=oid, mpack_id=packed_id, entity_type="object"
    )

    # Executed in this order: object, then its repo ref, then the mpack index.
    statements = (
        object_row.on_conflict_do_nothing(index_elements=["object_id"]),
        repo_ref.on_conflict_do_nothing(),
        index_row.on_conflict_do_nothing(),
    )
    for statement in statements:
        await session.execute(statement)
    await session.commit()
110
111
async def _make_commit(
    session: AsyncSession,
    repo_id: str,
    *,
    manifest: dict[str, str],
    seed: str = "c1",
    parent_ids: list[str] | None = None,
) -> tuple[MusehubCommit, MusehubSnapshot]:
    """Create a snapshot and a commit on ``main`` for ``repo_id`` and persist them.

    Ids are derived from ``seed`` via ``fake_id``.  Besides the two ORM rows,
    the snapshot ref, commit ref and commit-graph rows are inserted (ignoring
    conflicts).  If a ``main`` branch row exists for the repo, its
    ``head_commit_id`` is pointed at the new commit.  The session is committed
    before returning.

    Returns:
        The ``(commit, snapshot)`` ORM objects that were added.
    """
    from sqlalchemy import select

    # Snapshot row plus its repo ref.
    snapshot_id = fake_id(f"snap-{seed}")
    snapshot = MusehubSnapshot(
        snapshot_id=snapshot_id,
        directories=[],
        manifest_blob=msgpack.packb(manifest, use_bin_type=True),
        entry_count=len(manifest),
        created_at=_now(),
    )
    session.add(snapshot)
    snapshot_ref = pg_insert(MusehubSnapshotRef).values(
        repo_id=repo_id, snapshot_id=snapshot_id
    )
    await session.execute(snapshot_ref.on_conflict_do_nothing())

    # Commit row plus its repo ref and graph entry.
    new_commit_id = fake_id(f"commit-{seed}")
    new_commit = MusehubCommit(
        commit_id=new_commit_id,
        branch="main",
        parent_ids=parent_ids or [],
        message=f"commit {seed}",
        author="gabriel",
        timestamp=_now(),
        snapshot_id=snapshot_id,
    )
    session.add(new_commit)
    commit_ref = pg_insert(MusehubCommitRef).values(
        repo_id=repo_id, commit_id=new_commit_id
    )
    await session.execute(commit_ref.on_conflict_do_nothing())
    graph_row = pg_insert(MusehubCommitGraph).values(
        commit_id=new_commit_id,
        parent_ids=parent_ids or [],
        generation=0,
        snapshot_id=snapshot_id,
    )
    await session.execute(graph_row.on_conflict_do_nothing())

    # Advance the head of ``main`` when that branch row is present.
    main_lookup = select(MusehubBranch).where(
        MusehubBranch.repo_id == repo_id,
        MusehubBranch.name == "main",
    )
    main_branch = (await session.execute(main_lookup)).scalar_one_or_none()
    if main_branch:
        main_branch.head_commit_id = new_commit_id

    await session.commit()
    return new_commit, snapshot
173
174
175 # ══════════════════════════════════════════════════════════════════════════════
176 # BC0 — presign path: backend.delete(mpack_id) called after ttl_seconds
177 # ══════════════════════════════════════════════════════════════════════════════
178
@pytest.mark.asyncio
async def test_bc0_mpack_deleted_after_ttl(
    db_session: AsyncSession, monkeypatch: pytest.MonkeyPatch
) -> None:
    """BC0: with presign enabled, backend.delete(mpack_id) runs once ttl_seconds has passed."""
    from musehub.services.musehub_wire import wire_fetch_mpack

    store, backend = _stub_backend(monkeypatch, supports_presign=True)
    repo = await create_repo(db_session, owner="gabriel", visibility="public")

    # Seed a single object and a commit whose manifest points at it.
    payload = b"content for cleanup test"
    object_id = blob_id(payload)
    await _store_object(db_session, repo.repo_id, object_id, payload, store)
    head, _snapshot = await _make_commit(
        db_session, repo.repo_id, manifest={"file.dat": object_id}, seed="bc0"
    )

    ttl = 0.05  # 50 ms keeps the test quick
    result = await wire_fetch_mpack(
        db_session,
        repo.repo_id,
        want=[head.commit_id],
        have=[],
        ttl_seconds=ttl,
    )
    assert "mpack_url" in result, "wire_fetch_mpack must return mpack_url"
    served_mpack_id = result["mpack_id"]

    # Before the TTL elapses nothing may have been deleted.
    backend.delete.assert_not_called()

    # Give the cleanup task time to fire, then check it removed that mpack.
    await asyncio.sleep(ttl * 3)
    backend.delete.assert_called_once_with(served_mpack_id)
215
216
217
218 # ══════════════════════════════════════════════════════════════════════════════
219 # BC2 — cleanup failure does not propagate to the caller
220 # ══════════════════════════════════════════════════════════════════════════════
221
@pytest.mark.asyncio
async def test_bc2_cleanup_failure_is_swallowed(
    db_session: AsyncSession, monkeypatch: pytest.MonkeyPatch
) -> None:
    """BC2: cleanup is best-effort, so a raising backend.delete must not reach the caller."""
    from musehub.services.musehub_wire import wire_fetch_mpack

    store, backend = _stub_backend(monkeypatch, supports_presign=True)
    # Make every delete attempt blow up.
    failing_delete = AsyncMock(side_effect=RuntimeError("MinIO unavailable"))
    backend.delete = failing_delete

    repo = await create_repo(db_session, owner="gabriel", visibility="public")

    payload = b"error path content"
    object_id = blob_id(payload)
    await _store_object(db_session, repo.repo_id, object_id, payload, store)
    head, _snapshot = await _make_commit(
        db_session, repo.repo_id, manifest={"file.dat": object_id}, seed="bc2"
    )

    ttl = 0.05
    # The fetch call itself has to complete without raising.
    result = await wire_fetch_mpack(
        db_session,
        repo.repo_id,
        want=[head.commit_id],
        have=[],
        ttl_seconds=ttl,
    )
    assert "mpack_url" in result

    # Let the cleanup task run and fail quietly.
    await asyncio.sleep(ttl * 3)

    # One delete attempt was made; its error did not surface here.
    failing_delete.assert_called_once()
257
258
259 # ══════════════════════════════════════════════════════════════════════════════
260 # BC3 — exactly one cleanup scheduled per call
261 # ══════════════════════════════════════════════════════════════════════════════
262
@pytest.mark.asyncio
async def test_bc3_exactly_one_cleanup_per_call(
    db_session: AsyncSession, monkeypatch: pytest.MonkeyPatch
) -> None:
    """BC3: a single wire_fetch_mpack call results in exactly one delete, never zero or two."""
    from musehub.services.musehub_wire import wire_fetch_mpack

    store, backend = _stub_backend(monkeypatch, supports_presign=True)
    repo = await create_repo(db_session, owner="gabriel", visibility="public")

    payload = b"count check content"
    object_id = blob_id(payload)
    await _store_object(db_session, repo.repo_id, object_id, payload, store)
    head, _snapshot = await _make_commit(
        db_session, repo.repo_id, manifest={"file.dat": object_id}, seed="bc3"
    )

    ttl = 0.05
    await wire_fetch_mpack(
        db_session,
        repo.repo_id,
        want=[head.commit_id],
        have=[],
        ttl_seconds=ttl,
    )

    # Wait well past the TTL so any duplicate cleanup would also have fired.
    await asyncio.sleep(ttl * 4)

    delete_calls = backend.delete.call_count
    assert delete_calls == 1, f"Expected exactly 1 delete call, got {delete_calls}"
File History 1 commit
sha256:ef10830ce231e0a20efcb0e2586cb879471247e916616e6fdd0d51df459e2595 fix: typing audit — 0 violations, 0 untyped defs across all… Sonnet 4.6 minor 20 days ago