gabriel / musehub public
test_mpack_gc_phase4.py python
438 lines 16.4 KB
Raw
sha256:0997d6250ae6476362f6fe2025af7789f46d03df3e9f34356d5e8ee79b201923 fix(issues): use issue number as pagination cursor, not cre… Sonnet 4.6 patch 8 days ago
1 """TDD — Phase 4: mpack GC / consolidation (issue #63).
2
3 PG-1 process_mpack_gc_job merges all mpack index rows for a repo into a single
4 consolidated mpack and updates the mpack index so every object_id maps
5 to the new consolidated mpack_key.
6
7 PG-2 After GC, the number of distinct mpack_ids for any repo is ≤ 1
8 (a freshly-consolidated repo has exactly one mpack).
9
10 PG-3 The consolidated mpack is a valid msgpack mpack containing all objects
11 from the source mpacks.
12
13 PG-4 process_mpack_gc_job is idempotent: running it twice produces the same
14 consolidated mpack_key (same content → same sha256 key) and does not
15 create duplicate mpack index rows.
16
17 PG-5 A repo with ≤ 1 distinct mpack is skipped (no-op) — GC only acts when
18 consolidation would reduce mpack count.
19
20 PG-6 After GC, wire_fetch_mpack returns exactly 1 mpack_url (the consolidated
21 mpack) for a repo that was previously spread across N mpacks.
22 """
23 from __future__ import annotations
24
25 import datetime
26 import hashlib
27
28 import msgpack
29 import pytest
30 from sqlalchemy import select, func
31
32 pytestmark = pytest.mark.skip(reason="muse wire protocol in flux")
33 from sqlalchemy.ext.asyncio import AsyncSession
34
35 from muse.core.types import blob_id
36 from musehub.db import musehub_repo_models as db
37 from musehub.core.genesis import compute_identity_id
38 from musehub.services.musehub_repository import create_repo
39
40
41 # ---------------------------------------------------------------------------
42 # Helpers
43 # ---------------------------------------------------------------------------
44
def _make_mpack(
    objects: dict[str, bytes],
    commits: list[dict] | None = None,
    snapshots: list[dict] | None = None,
    branch_heads: dict[str, str] | None = None,
) -> tuple[bytes, str]:
    """Serialize a test mpack and derive its content-addressed key.

    ``objects`` maps object_id -> raw content; ``commits``, ``snapshots`` and
    ``branch_heads`` default to empty containers when omitted.

    Returns ``(wire_bytes, mpack_key)``: the msgpack encoding of the payload
    and ``"sha256:"`` followed by the hex SHA-256 digest of those bytes.
    """
    object_entries = []
    for object_id, content in objects.items():
        object_entries.append({"object_id": object_id, "content": content})

    # Key order is kept stable: the key is a hash of the serialized bytes.
    payload = {
        "commits": commits if commits else [],
        "snapshots": snapshots if snapshots else [],
        "objects": object_entries,
        "branch_heads": branch_heads if branch_heads else {},
    }

    encoded = msgpack.packb(payload, use_bin_type=True)
    digest = hashlib.sha256(encoded).hexdigest()
    return encoded, "sha256:" + digest
63
64
async def _store_mpack(wire_bytes: bytes, mpack_key: str) -> None:
    """Write *wire_bytes* to the configured storage backend under *mpack_key*."""
    # The backend is looked up through the module at call time, not cached.
    from musehub.storage import backends as storage_backends

    await storage_backends.get_backend().put_mpack(mpack_key, wire_bytes)
69
70
async def _push_and_index(
    session: AsyncSession,
    repo_id: str,
    objects: dict[str, bytes],
    commits: list[dict] | None = None,
    snapshots: list[dict] | None = None,
    branch_heads: dict[str, str] | None = None,
) -> str:
    """Store one mpack, enqueue an ``mpack.index`` job for it and run the job.

    The mpack is built from the given objects/commits/snapshots/branch heads,
    written to storage, and a pending background-job row is committed before
    ``process_mpack_index_job`` is invoked on it.  Returns the mpack_key.
    """
    from musehub.core.genesis import compute_job_id
    from musehub.db.musehub_jobs_models import MusehubBackgroundJob
    from musehub.services.musehub_wire import process_mpack_index_job

    wire_bytes, mpack_key = _make_mpack(objects, commits, snapshots, branch_heads)
    await _store_mpack(wire_bytes, mpack_key)

    created = datetime.datetime.now(datetime.timezone.utc)
    job_id = compute_job_id(repo_id, "mpack.index", created.isoformat())

    # The declared head is the last commit's id, or "" when nothing was pushed.
    commit_list = commits or []
    head_commit_id = commit_list[-1].get("commit_id", "") if commit_list else ""

    job_payload = {
        "mpack_key": mpack_key,
        "branch": "main",
        "head": head_commit_id,
        "pusher_id": "",
        "declared_objects_count": len(objects),
        "declared_commits_count": len(commit_list),
    }
    job_row = MusehubBackgroundJob(
        job_id=job_id,
        repo_id=repo_id,
        job_type="mpack.index",
        payload=job_payload,
        status="pending",
        created_at=created,
        attempt=0,
    )
    session.add(job_row)
    await session.commit()

    await process_mpack_index_job(session, job_id)
    await session.commit()
    return mpack_key
109
110
def _make_commit_chain(
    n: int,
    seed: str,
    parent_tip: str | None = None,
) -> tuple[list[dict], list[dict], str, dict[str, str]]:
    """Build a linear chain of *n* synthetic commits on branch ``main``.

    Each commit gets one object and one snapshot; all ids are derived with
    ``blob_id`` from strings built out of *seed* and the commit index.  The
    first commit is parented on *parent_tip*, each later one on its
    predecessor.

    Returns ``(commits, snapshots, tip_commit_id, objects_dict)``.  When
    ``n`` is 0 the returned tip is *parent_tip* unchanged.
    """
    # Every commit carries the same fixed, timezone-aware timestamp.
    committed_at = datetime.datetime(
        2025, 1, 1, tzinfo=datetime.timezone.utc
    ).isoformat()

    blobs: dict[str, bytes] = {}
    commit_records: list[dict] = []
    snapshot_records: list[dict] = []
    tip = parent_tip

    for index in range(n):
        content = f"{seed}-obj-{index}".encode()
        object_id = blob_id(content)
        blobs[object_id] = content

        snapshot_id = blob_id(f"{seed}-snap-{index}".encode())
        snapshot_records.append({
            "snapshot_id": snapshot_id,
            "parent_snapshot_id": None,
            "delta_upsert": {f"file_{index}.txt": object_id},
            "delta_remove": [],
        })

        # The parent id is part of the hashed text, so chains with different
        # parents yield different commit ids.
        commit_id = blob_id(f"{seed}-commit-{index}-p={tip}".encode())
        commit_records.append({
            "commit_id": commit_id,
            "branch": "main",
            "message": f"commit {index}",
            "author": "gabriel",
            "committed_at": committed_at,
            "parent_commit_id": tip,
            "parent2_commit_id": None,
            "snapshot_id": snapshot_id,
            "agent_id": "",
            "model_id": "",
            "toolchain_id": "",
            "sem_ver_bump": "none",
            "breaking_changes": [],
            "signature": "",
            "signer_key_id": "",
            "signer_public_key": "",
            "prompt_hash": "",
        })
        tip = commit_id

    return commit_records, snapshot_records, tip, blobs  # type: ignore[return-value]
151
152
async def _repo_pack_oids(session: AsyncSession, repo_id: str) -> list[str]:
    """List the object_ids of all MusehubObjectRef rows for *repo_id*."""
    stmt = select(db.MusehubObjectRef.object_id).where(
        db.MusehubObjectRef.repo_id == repo_id
    )
    rows = await session.execute(stmt)
    # Each row is a single-column tuple; unpack it to the bare id.
    return [object_id for (object_id,) in rows]
159
160
async def _distinct_mpack_count(session: AsyncSession, repo_id: str) -> int:
    """Return how many distinct mpack_ids index this repo's objects.

    Returns 0 when the repo has no object refs at all.
    """
    repo_oids = await _repo_pack_oids(session, repo_id)
    if repo_oids:
        stmt = select(db.MusehubMPackIndex.mpack_id)
        stmt = stmt.where(db.MusehubMPackIndex.entity_id.in_(repo_oids))
        stmt = stmt.where(db.MusehubMPackIndex.entity_type == "object")
        stmt = stmt.distinct()
        rows = (await session.execute(stmt)).all()
        return len(rows)
    return 0
173
174
175 # ---------------------------------------------------------------------------
176 # PG-1
177 # ---------------------------------------------------------------------------
178
@pytest.mark.asyncio
async def test_pg1_gc_consolidates_multiple_mpacks(db_session: AsyncSession) -> None:
    """PG-1: GC over a repo pushed as three mpacks reports 3 -> 1 and a key."""
    from musehub.services.musehub_wire import process_mpack_gc_job

    repo = await create_repo(
        db_session,
        name="pg-test-1",
        owner="gabriel",
        owner_user_id=compute_identity_id(b"gabriel"),
        visibility="public",
        initialize=False,
    )

    # Three pushes, each chained onto the previous tip, give three mpacks.
    tip = None
    for seed in ("pg1a", "pg1b", "pg1c"):
        commits, snaps, tip, objs = _make_commit_chain(2, seed, parent_tip=tip)
        await _push_and_index(
            db_session, repo.repo_id, objs, commits, snaps,
            branch_heads={"main": tip},
        )

    packs_before = await _distinct_mpack_count(db_session, repo.repo_id)
    assert packs_before == 3, f"expected 3 mpacks before GC, got {packs_before}"

    gc_result = await process_mpack_gc_job(db_session, repo.repo_id)
    await db_session.commit()

    assert gc_result["packs_before"] == 3
    assert gc_result["packs_after"] == 1
    assert gc_result["consolidated_key"], "no consolidated mpack_key returned"
215
216
217 # ---------------------------------------------------------------------------
218 # PG-2
219 # ---------------------------------------------------------------------------
220
@pytest.mark.asyncio
async def test_pg2_gc_leaves_exactly_one_mpack(db_session: AsyncSession) -> None:
    """PG-2: once GC has run, the repo's objects are indexed by a single mpack."""
    from musehub.services.musehub_wire import process_mpack_gc_job

    repo_spec = {
        "name": "pg-test-2",
        "owner": "gabriel",
        "owner_user_id": compute_identity_id(b"gabriel"),
        "visibility": "public",
        "initialize": False,
    }
    repo = await create_repo(db_session, **repo_spec)

    # Four independent pushes (each chain starts without a parent).
    seeds = ("pg2a", "pg2b", "pg2c", "pg2d")
    for seed in seeds:
        chain_commits, chain_snaps, chain_tip, chain_objs = _make_commit_chain(2, seed)
        await _push_and_index(
            db_session,
            repo.repo_id,
            chain_objs,
            chain_commits,
            chain_snaps,
            branch_heads={"main": chain_tip},
        )

    await process_mpack_gc_job(db_session, repo.repo_id)
    await db_session.commit()

    remaining = await _distinct_mpack_count(db_session, repo.repo_id)
    assert remaining == 1, f"expected 1 mpack after GC, got {remaining}"
245
246
247 # ---------------------------------------------------------------------------
248 # PG-3
249 # ---------------------------------------------------------------------------
250
@pytest.mark.asyncio
async def test_pg3_consolidated_mpack_contains_all_objects(db_session: AsyncSession) -> None:
    """PG-3: the consolidated mpack in storage holds exactly the pushed objects.

    Two chained pushes are indexed, GC is run, and the mpack stored under the
    returned ``consolidated_key`` is read back and decoded.  The set of
    object_ids it contains must equal the set of object_ids that were pushed;
    a mismatch in either direction fails the test.
    """
    import musehub.storage.backends as _backends_mod
    from musehub.services.musehub_wire import process_mpack_gc_job

    repo = await create_repo(
        db_session,
        name="pg-test-3",
        owner="gabriel",
        owner_user_id=compute_identity_id(b"gabriel"),
        visibility="public",
        initialize=False,
    )

    all_oids: set[str] = set()
    tip = None
    for seed in ("pg3a", "pg3b"):
        # Each push is chained onto the previous tip (None for the first).
        commits, snaps, tip, objs = _make_commit_chain(3, seed, parent_tip=tip)
        await _push_and_index(db_session, repo.repo_id, objs, commits, snaps,
                              branch_heads={"main": tip})
        all_oids.update(objs)

    result = await process_mpack_gc_job(db_session, repo.repo_id)
    await db_session.commit()

    consolidated_key = result["consolidated_key"]
    backend = _backends_mod.get_backend()
    mpack_bytes = await backend.get_mpack(consolidated_key)
    assert mpack_bytes, "consolidated mpack not found in storage"

    # Bytes starting with the b"MUSE" magic go through the muse wire parser;
    # anything else is decoded as plain msgpack.
    if mpack_bytes[:4] == b"MUSE":
        from muse.core.mpack import parse_wire_mpack as _parse_wm
        mpack = _parse_wm(mpack_bytes)
    else:
        mpack = msgpack.unpackb(mpack_bytes, raw=False)
    stored_oids = {obj["object_id"] for obj in mpack.get("objects", [])}

    # Report both directions of the difference: the equality check also fails
    # on extra objects, which a "missing"-only message would show as set().
    missing = all_oids - stored_oids
    unexpected = stored_oids - all_oids
    assert not missing and not unexpected, (
        "consolidated mpack object set mismatch: "
        f"missing={sorted(missing)}, unexpected={sorted(unexpected)}"
    )
294
295
296 # ---------------------------------------------------------------------------
297 # PG-4
298 # ---------------------------------------------------------------------------
299
@pytest.mark.asyncio
async def test_pg4_gc_is_idempotent(db_session: AsyncSession) -> None:
    """PG-4: a second GC run yields the same key and leaves one index row per object."""
    from musehub.services.musehub_wire import process_mpack_gc_job

    repo = await create_repo(
        db_session,
        name="pg-test-4",
        owner="gabriel",
        owner_user_id=compute_identity_id(b"gabriel"),
        visibility="public",
        initialize=False,
    )

    # Two chained pushes; keep a running count of the objects pushed.
    expected_rows = 0
    tip = None
    for seed in ("pg4a", "pg4b"):
        commits, snaps, tip, objs = _make_commit_chain(2, seed, parent_tip=tip)
        await _push_and_index(
            db_session, repo.repo_id, objs, commits, snaps,
            branch_heads={"main": tip},
        )
        expected_rows += len(objs)

    first_run = await process_mpack_gc_job(db_session, repo.repo_id)
    await db_session.commit()

    second_run = await process_mpack_gc_job(db_session, repo.repo_id)
    await db_session.commit()

    assert first_run["consolidated_key"] == second_run["consolidated_key"], (
        "second GC produced a different key — consolidation is not content-addressed"
    )

    # The mpack index is not keyed by repo, so restrict the row count to the
    # object ids referenced by this repo.
    repo_oids = await _repo_pack_oids(db_session, repo.repo_id)
    count_stmt = (
        select(func.count())
        .select_from(db.MusehubMPackIndex)
        .where(db.MusehubMPackIndex.entity_id.in_(repo_oids))
        .where(db.MusehubMPackIndex.entity_type == "object")
    )
    row_count = (await db_session.execute(count_stmt)).scalar_one()
    assert row_count == expected_rows, (
        f"expected {expected_rows} rows (no dups), got {row_count}"
    )
342
343
344 # ---------------------------------------------------------------------------
345 # PG-5
346 # ---------------------------------------------------------------------------
347
@pytest.mark.asyncio
async def test_pg5_gc_skips_single_mpack_repo(db_session: AsyncSession) -> None:
    """PG-5: with only one mpack in the repo, GC reports ``skipped`` and changes nothing."""
    from musehub.services.musehub_wire import process_mpack_gc_job

    repo_spec = {
        "name": "pg-test-5",
        "owner": "gabriel",
        "owner_user_id": compute_identity_id(b"gabriel"),
        "visibility": "public",
        "initialize": False,
    }
    repo = await create_repo(db_session, **repo_spec)

    # A single push gives a single mpack.
    chain_commits, chain_snaps, chain_tip, chain_objs = _make_commit_chain(3, "pg5")
    await _push_and_index(
        db_session,
        repo.repo_id,
        chain_objs,
        chain_commits,
        chain_snaps,
        branch_heads={"main": chain_tip},
    )

    outcome = await process_mpack_gc_job(db_session, repo.repo_id)
    await db_session.commit()

    assert outcome["skipped"] is True, "expected GC to be skipped for single-mpack repo"
    # "packs_before" may be absent from a skipped result; treat that as 1.
    assert outcome.get("packs_before", 1) <= 1

    remaining = await _distinct_mpack_count(db_session, repo.repo_id)
    assert remaining == 1
373
374
375 # ---------------------------------------------------------------------------
376 # PG-6
377 # ---------------------------------------------------------------------------
378
@pytest.mark.asyncio
async def test_pg6_fetch_works_after_gc_and_index_converges(db_session: AsyncSession) -> None:
    """PG-6: fetch keeps working across GC while the index collapses to one mpack.

    Before GC the two pushes are covered by two distinct mpacks in MPackIndex;
    after GC a single mpack covers every object.  ``wire_fetch_mpack`` must
    return an ``mpack_url`` in both states, with the same ``object_count``.
    """
    from musehub.services.musehub_wire import process_mpack_gc_job, wire_fetch_mpack

    repo = await create_repo(
        db_session,
        name="pg-test-6",
        owner="gabriel",
        owner_user_id=compute_identity_id(b"gabriel"),
        visibility="public",
        initialize=False,
    )

    # Two chained pushes; `tip` ends up as the head of the second one.
    tip = None
    for seed in ("pg6a", "pg6b"):
        commits, snaps, tip, objs = _make_commit_chain(3, seed, parent_tip=tip)
        await _push_and_index(
            db_session, repo.repo_id, objs, commits, snaps,
            branch_heads={"main": tip},
        )

    async def _fetch_from_tip():
        # Full fetch of the final tip with nothing already held by the client.
        return await wire_fetch_mpack(
            db_session, repo.repo_id,
            want=[tip], have=[],
            ttl_seconds=3600,
        )

    # --- before GC -------------------------------------------------------
    covering_before = await _distinct_mpack_count(db_session, repo.repo_id)
    assert covering_before == 2, (
        "expected 2 distinct mpacks in MPackIndex before GC"
    )

    fetched_before = await _fetch_from_tip()
    assert fetched_before.get("mpack_url"), "fetch must return mpack_url before GC"
    assert fetched_before["object_count"] >= 1, "fetch must return at least one object before GC"
    objects_before = fetched_before["object_count"]

    await process_mpack_gc_job(db_session, repo.repo_id)
    await db_session.commit()

    # --- after GC --------------------------------------------------------
    covering_after = await _distinct_mpack_count(db_session, repo.repo_id)
    assert covering_after == 1, (
        "expected 1 distinct mpack in MPackIndex after GC"
    )

    fetched_after = await _fetch_from_tip()
    assert fetched_after.get("mpack_url"), "fetch must return mpack_url after GC"
    assert fetched_after["object_count"] == objects_before, (
        f"object count changed after GC: {objects_before} → {fetched_after['object_count']}"
    )
File History 1 commit
sha256:0997d6250ae6476362f6fe2025af7789f46d03df3e9f34356d5e8ee79b201923 fix(issues): use issue number as pagination cursor, not cre… Sonnet 4.6 patch 8 days ago