test_mpack_gc_phase4.py
python
sha256:0997d6250ae6476362f6fe2025af7789f46d03df3e9f34356d5e8ee79b201923
fix(issues): use issue number as pagination cursor, not cre…
Sonnet 4.6
patch
8 days ago
| 1 | """TDD — Phase 4: mpack GC / consolidation (issue #63). |
| 2 | |
| 3 | PG-1 process_mpack_gc_job merges all mpack index rows for a repo into a single |
| 4 | consolidated mpack and updates the mpack index so every object_id maps |
| 5 | to the new consolidated mpack_key. |
| 6 | |
| 7 | PG-2 After GC, the number of distinct mpack_ids for any repo is ≤ 1 |
| 8 | (a freshly-consolidated repo has exactly one mpack). |
| 9 | |
| 10 | PG-3 The consolidated mpack is a valid msgpack mpack containing all objects |
| 11 | from the source mpacks. |
| 12 | |
| 13 | PG-4 process_mpack_gc_job is idempotent: running it twice produces the same |
| 14 | consolidated mpack_key (same content → same sha256 key) and does not |
| 15 | create duplicate mpack index rows. |
| 16 | |
| 17 | PG-5 A repo with ≤ 1 distinct mpack is skipped (no-op) — GC only acts when |
| 18 | consolidation would reduce mpack count. |
| 19 | |
| 20 | PG-6 After GC, wire_fetch_mpack returns exactly 1 mpack_url (the consolidated |
| 21 | mpack) for a repo that was previously spread across N mpacks. |
| 22 | """ |
| 23 | from __future__ import annotations |
| 24 | |
| 25 | import datetime |
| 26 | import hashlib |
| 27 | |
| 28 | import msgpack |
| 29 | import pytest |
| 30 | from sqlalchemy import select, func |
| 31 | |
| 32 | pytestmark = pytest.mark.skip(reason="muse wire protocol in flux") |
| 33 | from sqlalchemy.ext.asyncio import AsyncSession |
| 34 | |
| 35 | from muse.core.types import blob_id |
| 36 | from musehub.db import musehub_repo_models as db |
| 37 | from musehub.core.genesis import compute_identity_id |
| 38 | from musehub.services.musehub_repository import create_repo |
| 39 | |
| 40 | |
| 41 | # --------------------------------------------------------------------------- |
| 42 | # Helpers |
| 43 | # --------------------------------------------------------------------------- |
| 44 | |
def _make_mpack(
    objects: dict[str, bytes],
    commits: list[dict] | None = None,
    snapshots: list[dict] | None = None,
    branch_heads: dict[str, str] | None = None,
) -> tuple[bytes, str]:
    """Serialise an mpack payload and derive its content-addressed key.

    The payload is a dict with ``commits``, ``snapshots``, ``objects`` and
    ``branch_heads`` entries, encoded with msgpack. Returns
    ``(wire_bytes, mpack_key)`` where ``mpack_key`` is ``"sha256:"`` followed
    by the hex SHA-256 digest of ``wire_bytes``.
    """
    object_entries = []
    for object_id, content in objects.items():
        object_entries.append({"object_id": object_id, "content": content})

    # Key order is kept stable because it determines the serialised bytes,
    # and therefore the derived key.
    payload = {
        "commits": commits if commits else [],
        "snapshots": snapshots if snapshots else [],
        "objects": object_entries,
        "branch_heads": branch_heads if branch_heads else {},
    }

    encoded = msgpack.packb(payload, use_bin_type=True)
    digest = hashlib.sha256(encoded).hexdigest()
    return encoded, "sha256:" + digest
| 63 | |
| 64 | |
async def _store_mpack(wire_bytes: bytes, mpack_key: str) -> None:
    """Write ``wire_bytes`` to the active storage backend under ``mpack_key``."""
    # Imported lazily and resolved through the module on every call, so the
    # backend is looked up at call time rather than at import time.
    from musehub.storage import backends as storage_backends

    await storage_backends.get_backend().put_mpack(mpack_key, wire_bytes)
| 69 | |
| 70 | |
async def _push_and_index(
    session: AsyncSession,
    repo_id: str,
    objects: dict[str, bytes],
    commits: list[dict] | None = None,
    snapshots: list[dict] | None = None,
    branch_heads: dict[str, str] | None = None,
) -> str:
    """Store an mpack, enqueue an ``mpack.index`` job for it and run the job.

    Builds the mpack from the given pieces, writes it to storage, inserts a
    pending ``MusehubBackgroundJob`` row describing it, commits, then calls
    ``process_mpack_index_job`` and commits again.

    Returns:
        The ``mpack_key`` of the stored mpack.
    """
    from musehub.core.genesis import compute_job_id
    from musehub.db.musehub_jobs_models import MusehubBackgroundJob
    from musehub.services.musehub_wire import process_mpack_index_job

    wire_bytes, mpack_key = _make_mpack(objects, commits, snapshots, branch_heads)
    await _store_mpack(wire_bytes, mpack_key)

    created = datetime.datetime.now(datetime.timezone.utc)
    job_id = compute_job_id(repo_id, "mpack.index", created.isoformat())

    # The head is the last commit's id; it falls back to "" when there are no
    # commits or the last commit carries no "commit_id".
    commit_list = commits or []
    head = commit_list[-1].get("commit_id", "") if commit_list else ""

    payload = {
        "mpack_key": mpack_key,
        "branch": "main",
        "head": head,
        "pusher_id": "",
        "declared_objects_count": len(objects),
        "declared_commits_count": len(commit_list),
    }
    job = MusehubBackgroundJob(
        job_id=job_id,
        repo_id=repo_id,
        job_type="mpack.index",
        payload=payload,
        status="pending",
        created_at=created,
        attempt=0,
    )
    session.add(job)
    await session.commit()

    await process_mpack_index_job(session, job_id)
    await session.commit()
    return mpack_key
| 109 | |
| 110 | |
| 111 | def _make_commit_chain(n: int, seed: str, parent_tip: str | None = None) -> tuple[list[dict], list[dict], str, dict[str, str]]: |
| 112 | """Return (commits, snapshots, tip_commit_id, objects_dict).""" |
| 113 | objects: dict[str, bytes] = {} |
| 114 | commits = [] |
| 115 | snapshots = [] |
| 116 | parent_id = parent_tip |
| 117 | |
| 118 | for i in range(n): |
| 119 | oid = blob_id(f"{seed}-obj-{i}".encode()) |
| 120 | objects[oid] = f"{seed}-obj-{i}".encode() |
| 121 | snap_id = blob_id(f"{seed}-snap-{i}".encode()) |
| 122 | snapshots.append({ |
| 123 | "snapshot_id": snap_id, |
| 124 | "parent_snapshot_id": None, |
| 125 | "delta_upsert": {f"file_{i}.txt": oid}, |
| 126 | "delta_remove": [], |
| 127 | }) |
| 128 | cid = blob_id(f"{seed}-commit-{i}-p={parent_id}".encode()) |
| 129 | commits.append({ |
| 130 | "commit_id": cid, |
| 131 | "branch": "main", |
| 132 | "message": f"commit {i}", |
| 133 | "author": "gabriel", |
| 134 | "committed_at": datetime.datetime(2025, 1, 1, tzinfo=datetime.timezone.utc).isoformat(), |
| 135 | "parent_commit_id": parent_id, |
| 136 | "parent2_commit_id": None, |
| 137 | "snapshot_id": snap_id, |
| 138 | "agent_id": "", |
| 139 | "model_id": "", |
| 140 | "toolchain_id": "", |
| 141 | "sem_ver_bump": "none", |
| 142 | "breaking_changes": [], |
| 143 | "signature": "", |
| 144 | "signer_key_id": "", |
| 145 | "signer_public_key": "", |
| 146 | "prompt_hash": "", |
| 147 | }) |
| 148 | parent_id = cid |
| 149 | |
| 150 | return commits, snapshots, parent_id, objects # type: ignore[return-value] |
| 151 | |
| 152 | |
async def _repo_pack_oids(session: AsyncSession, repo_id: str) -> list[str]:
    """List the ``object_id`` of every ``MusehubObjectRef`` row for ``repo_id``."""
    ref = db.MusehubObjectRef
    stmt = select(ref.object_id).where(ref.repo_id == repo_id)
    result = await session.execute(stmt)
    # Single-column select: scalars() yields the column value of each row.
    return list(result.scalars())
| 159 | |
| 160 | |
async def _distinct_mpack_count(session: AsyncSession, repo_id: str) -> int:
    """Return how many distinct ``mpack_id`` values index this repo's objects.

    Looks up the repo's object ids via ``_repo_pack_oids`` and counts the
    distinct ``mpack_id`` values among ``MusehubMPackIndex`` rows whose
    ``entity_type`` is ``"object"`` and whose ``entity_id`` is one of them.
    Returns 0 when the repo has no object refs.
    """
    object_ids = await _repo_pack_oids(session, repo_id)
    if len(object_ids) == 0:
        return 0

    index = db.MusehubMPackIndex
    stmt = (
        select(index.mpack_id)
        .where(
            index.entity_id.in_(object_ids),
            index.entity_type == "object",
        )
        .distinct()
    )
    rows = (await session.execute(stmt)).all()
    return len(rows)
| 173 | |
| 174 | |
| 175 | # --------------------------------------------------------------------------- |
| 176 | # PG-1 |
| 177 | # --------------------------------------------------------------------------- |
| 178 | |
@pytest.mark.asyncio
async def test_pg1_gc_consolidates_multiple_mpacks(db_session: AsyncSession) -> None:
    """PG-1: GC over a repo with three indexed mpacks reports 3 packs before, 1 after."""
    from musehub.services.musehub_wire import process_mpack_gc_job

    repo = await create_repo(
        db_session,
        name="pg-test-1",
        owner="gabriel",
        owner_user_id=compute_identity_id(b"gabriel"),
        visibility="public",
        initialize=False,
    )
    repo_id = repo.repo_id

    # Three separate pushes, each chained onto the previous tip, give three mpacks.
    tip = None
    for seed in ("pg1a", "pg1b", "pg1c"):
        commits, snaps, tip, objs = _make_commit_chain(2, seed, parent_tip=tip)
        await _push_and_index(
            db_session, repo_id, objs, commits, snaps,
            branch_heads={"main": tip},
        )

    packs_before_gc = await _distinct_mpack_count(db_session, repo_id)
    assert packs_before_gc == 3, f"expected 3 mpacks before GC, got {packs_before_gc}"

    gc_result = await process_mpack_gc_job(db_session, repo_id)
    await db_session.commit()

    assert gc_result["packs_before"] == 3
    assert gc_result["packs_after"] == 1
    assert gc_result["consolidated_key"], "no consolidated mpack_key returned"
| 215 | |
| 216 | |
| 217 | # --------------------------------------------------------------------------- |
| 218 | # PG-2 |
| 219 | # --------------------------------------------------------------------------- |
| 220 | |
@pytest.mark.asyncio
async def test_pg2_gc_leaves_exactly_one_mpack(db_session: AsyncSession) -> None:
    """PG-2: four independent pushes collapse to one distinct mpack_id after GC."""
    from musehub.services.musehub_wire import process_mpack_gc_job

    repo = await create_repo(
        db_session,
        name="pg-test-2",
        owner="gabriel",
        owner_user_id=compute_identity_id(b"gabriel"),
        visibility="public",
        initialize=False,
    )
    repo_id = repo.repo_id

    # Each seed produces its own parentless two-commit chain and its own mpack.
    seeds = ("pg2a", "pg2b", "pg2c", "pg2d")
    for seed in seeds:
        chain_commits, chain_snaps, chain_tip, chain_objs = _make_commit_chain(2, seed)
        await _push_and_index(
            db_session,
            repo_id,
            chain_objs,
            chain_commits,
            chain_snaps,
            branch_heads={"main": chain_tip},
        )

    await process_mpack_gc_job(db_session, repo_id)
    await db_session.commit()

    remaining = await _distinct_mpack_count(db_session, repo_id)
    assert remaining == 1, f"expected 1 mpack after GC, got {remaining}"
| 245 | |
| 246 | |
| 247 | # --------------------------------------------------------------------------- |
| 248 | # PG-3 |
| 249 | # --------------------------------------------------------------------------- |
| 250 | |
@pytest.mark.asyncio
async def test_pg3_consolidated_mpack_contains_all_objects(db_session: AsyncSession) -> None:
    """PG-3: the consolidated mpack read back from storage holds every pushed object id."""
    import musehub.storage.backends as _backends_mod
    from musehub.services.musehub_wire import process_mpack_gc_job

    repo = await create_repo(
        db_session,
        name="pg-test-3",
        owner="gabriel",
        owner_user_id=compute_identity_id(b"gabriel"),
        visibility="public",
        initialize=False,
    )
    repo_id = repo.repo_id

    # Two chained pushes; remember every object id that went in.
    expected_oids: set[str] = set()
    tip = None
    for seed in ("pg3a", "pg3b"):
        commits, snaps, tip, objs = _make_commit_chain(3, seed, parent_tip=tip)
        await _push_and_index(
            db_session, repo_id, objs, commits, snaps,
            branch_heads={"main": tip},
        )
        expected_oids.update(objs)

    gc_result = await process_mpack_gc_job(db_session, repo_id)
    await db_session.commit()

    storage = _backends_mod.get_backend()
    raw = await storage.get_mpack(gc_result["consolidated_key"])
    assert raw, "consolidated mpack not found in storage"

    # Bytes starting with b"MUSE" go through muse's wire parser; anything else
    # is decoded as plain msgpack.
    if raw[:4] != b"MUSE":
        decoded = msgpack.unpackb(raw, raw=False)
    else:
        from muse.core.mpack import parse_wire_mpack as _parse_wm
        decoded = _parse_wm(raw)

    stored_oids = {entry["object_id"] for entry in decoded.get("objects", [])}
    assert expected_oids == stored_oids, (
        f"missing objects in consolidated mpack: {expected_oids - stored_oids}"
    )
| 294 | |
| 295 | |
| 296 | # --------------------------------------------------------------------------- |
| 297 | # PG-4 |
| 298 | # --------------------------------------------------------------------------- |
| 299 | |
@pytest.mark.asyncio
async def test_pg4_gc_is_idempotent(db_session: AsyncSession) -> None:
    """PG-4: a second GC run yields the same consolidated_key and no extra index rows."""
    from musehub.services.musehub_wire import process_mpack_gc_job

    repo = await create_repo(
        db_session,
        name="pg-test-4",
        owner="gabriel",
        owner_user_id=compute_identity_id(b"gabriel"),
        visibility="public",
        initialize=False,
    )
    repo_id = repo.repo_id

    # Two chained pushes; track how many objects were pushed in total.
    expected_rows = 0
    tip = None
    for seed in ("pg4a", "pg4b"):
        commits, snaps, tip, objs = _make_commit_chain(2, seed, parent_tip=tip)
        await _push_and_index(
            db_session, repo_id, objs, commits, snaps,
            branch_heads={"main": tip},
        )
        expected_rows += len(objs)

    first_run = await process_mpack_gc_job(db_session, repo_id)
    await db_session.commit()

    second_run = await process_mpack_gc_job(db_session, repo_id)
    await db_session.commit()

    assert first_run["consolidated_key"] == second_run["consolidated_key"], (
        "second GC produced a different key — consolidation is not content-addressed"
    )

    # One index row per object means no duplicates. The index is filtered by
    # this repo's object refs so only its rows are counted.
    object_ids = await _repo_pack_oids(db_session, repo_id)
    index = db.MusehubMPackIndex
    count_stmt = (
        select(func.count())
        .select_from(index)
        .where(index.entity_id.in_(object_ids))
        .where(index.entity_type == "object")
    )
    row_count = (await db_session.execute(count_stmt)).scalar_one()
    assert row_count == expected_rows, f"expected {expected_rows} rows (no dups), got {row_count}"
| 342 | |
| 343 | |
| 344 | # --------------------------------------------------------------------------- |
| 345 | # PG-5 |
| 346 | # --------------------------------------------------------------------------- |
| 347 | |
@pytest.mark.asyncio
async def test_pg5_gc_skips_single_mpack_repo(db_session: AsyncSession) -> None:
    """PG-5: GC reports ``skipped`` and leaves the single mpack alone."""
    from musehub.services.musehub_wire import process_mpack_gc_job

    repo = await create_repo(
        db_session,
        name="pg-test-5",
        owner="gabriel",
        owner_user_id=compute_identity_id(b"gabriel"),
        visibility="public",
        initialize=False,
    )
    repo_id = repo.repo_id

    # A single push, so there is exactly one mpack and nothing to consolidate.
    only_commits, only_snaps, only_tip, only_objs = _make_commit_chain(3, "pg5")
    await _push_and_index(
        db_session, repo_id, only_objs, only_commits, only_snaps,
        branch_heads={"main": only_tip},
    )

    gc_result = await process_mpack_gc_job(db_session, repo_id)
    await db_session.commit()

    assert gc_result["skipped"] is True, "expected GC to be skipped for single-mpack repo"
    # "packs_before" is tolerated as absent from a skipped result.
    assert gc_result.get("packs_before", 1) <= 1
    remaining = await _distinct_mpack_count(db_session, repo_id)
    assert remaining == 1
| 373 | |
| 374 | |
| 375 | # --------------------------------------------------------------------------- |
| 376 | # PG-6 |
| 377 | # --------------------------------------------------------------------------- |
| 378 | |
@pytest.mark.asyncio
async def test_pg6_fetch_works_after_gc_and_index_converges(db_session: AsyncSession) -> None:
    """PG-6: fetch succeeds before and after GC while the index shrinks from 2 mpacks to 1.

    Two chained pushes are indexed, giving two distinct mpack ids. A fetch of
    the latest tip with an empty ``have`` list must return an ``mpack_url``
    and at least one object. After GC the index must reference exactly one
    mpack id, and the same fetch must return an ``mpack_url`` with an
    unchanged ``object_count``.
    """
    from musehub.services.musehub_wire import process_mpack_gc_job, wire_fetch_mpack

    repo = await create_repo(
        db_session,
        name="pg-test-6",
        owner="gabriel",
        owner_user_id=compute_identity_id(b"gabriel"),
        visibility="public",
        initialize=False,
    )
    repo_id = repo.repo_id

    tip = None
    for seed in ("pg6a", "pg6b"):
        commits, snaps, tip, objs = _make_commit_chain(3, seed, parent_tip=tip)
        await _push_and_index(
            db_session, repo_id, objs, commits, snaps,
            branch_heads={"main": tip},
        )
    latest_tip = tip

    async def _fetch_latest():
        # Identical request before and after GC: want the newest tip, have nothing.
        return await wire_fetch_mpack(
            db_session, repo_id,
            want=[latest_tip], have=[],
            ttl_seconds=3600,
        )

    # Before GC: two distinct covering mpacks in the index for this repo's objects.
    packs_before_gc = await _distinct_mpack_count(db_session, repo_id)
    assert packs_before_gc == 2, (
        "expected 2 distinct mpacks in MPackIndex before GC"
    )

    # Before GC: fetch must already work.
    fetched_before = await _fetch_latest()
    assert fetched_before.get("mpack_url"), "fetch must return mpack_url before GC"
    assert fetched_before["object_count"] >= 1, "fetch must return at least one object before GC"
    objects_before = fetched_before["object_count"]

    await process_mpack_gc_job(db_session, repo_id)
    await db_session.commit()

    # After GC: exactly one distinct covering mpack in the index.
    packs_after_gc = await _distinct_mpack_count(db_session, repo_id)
    assert packs_after_gc == 1, (
        "expected 1 distinct mpack in MPackIndex after GC"
    )

    # After GC: fetch must still work and report the same number of objects.
    fetched_after = await _fetch_latest()
    assert fetched_after.get("mpack_url"), "fetch must return mpack_url after GC"
    assert fetched_after["object_count"] == objects_before, (
        f"object count changed after GC: {objects_before} → {fetched_after['object_count']}"
    )
File History
1 commit
sha256:0997d6250ae6476362f6fe2025af7789f46d03df3e9f34356d5e8ee79b201923
fix(issues): use issue number as pagination cursor, not cre…
Sonnet 4.6
patch
8 days ago