test_mpack_index_phase1.py
python
sha256:ef10830ce231e0a20efcb0e2586cb879471247e916616e6fdd0d51df459e2595
fix: typing audit — 0 violations, 0 untyped defs across all…
Sonnet 4.6
minor
⚠ breaking
20 days ago
| 1 | """TDD — Phase 1: mpack index written for every pushed object (issue #63). |
| 2 | |
| 3 | PI-1 After process_mpack_index_job, every object in the mpack has a row |
| 4 | in musehub_mpack_index with the correct mpack_id (= mpack_key). |
| 5 | PI-2 The mpack_id stored matches the mpack_key exactly. |
| 6 | PI-3 A second push of different objects adds new rows; existing rows are |
| 7 | untouched (on_conflict_do_nothing). |
| 8 | PI-4 Objects from different repos are distinct in the global index |
| 9 | (content-addressed objects are globally unique by object_id). |
| 10 | |
| 11 | NOTE: process_mpack_index_job was removed — indexing is now inline during |
| 12 | push (musehub_wire_push.py step 7d). These tests will be revisited once |
| 13 | the MVP wire protocol and pack index are fully wired in. |
| 14 | """ |
from __future__ import annotations

import datetime
import hashlib
from collections.abc import Mapping

import msgpack
import pytest
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from muse.core.types import blob_id
from musehub.db import musehub_repo_models as db
from musehub.core.genesis import compute_identity_id
from musehub.services.musehub_repository import create_repo

# Skip every test in this module: the job entry point they drive
# (process_mpack_index_job) has been removed. Note that the imports above still
# run when pytest imports the module for collection, so they must remain valid
# even while the module is skipped.
pytestmark = pytest.mark.skip(
    reason="process_mpack_index_job removed — revisit with MWP pack index wiring"
)
| 33 | |
| 34 | |
| 35 | # --------------------------------------------------------------------------- |
| 36 | # Helpers |
| 37 | # --------------------------------------------------------------------------- |
| 38 | |
def _make_mpack(objects: Mapping[str, bytes]) -> tuple[bytes, str]:
    """Serialise *objects* into a minimal MPack and derive its storage key.

    The payload carries only objects: commits, snapshots and branch heads are
    left empty.

    Args:
        objects: Mapping of object_id to raw object content.

    Returns:
        ``(wire_bytes, mpack_key)``, where ``mpack_key`` is ``"sha256:"``
        followed by the hex SHA-256 digest of ``wire_bytes``.
    """
    object_entries: list[dict[str, str | bytes]] = []
    for object_id, content in objects.items():
        object_entries.append({"object_id": object_id, "content": content})

    # Key order matters: it determines the packed bytes and therefore the key.
    payload = {
        "commits": [],
        "snapshots": [],
        "objects": object_entries,
        "branch_heads": {},
    }
    packed = msgpack.packb(payload, use_bin_type=True)
    digest = hashlib.sha256(packed).hexdigest()
    return packed, f"sha256:{digest}"
| 53 | |
| 54 | |
async def _store_mpack(wire_bytes: bytes, mpack_key: str) -> None:
    """Persist *wire_bytes* under *mpack_key* in the configured storage backend.

    This lets process_mpack_index_job fetch the pack by its key.
    """
    # Resolved at call time, so a patched backend factory is picked up.
    from musehub.storage.backends import get_backend

    storage = get_backend()
    await storage.put_mpack(mpack_key, wire_bytes)
| 60 | |
| 61 | |
async def _enqueue_and_process(
    session: AsyncSession,
    repo_id: str,
    mpack_key: str,
    n_objects: int,
) -> str:
    """Insert a pending ``mpack.index`` job for *mpack_key* and run it inline.

    The job row is committed before the processor runs, and the processor's
    writes are committed afterwards.

    Args:
        session: Database session used for both the insert and the processing.
        repo_id: Repository the job is recorded against.
        mpack_key: Storage key of the mpack to index.
        n_objects: Value recorded as ``declared_objects_count`` in the payload.

    Returns:
        The id of the job that was created and processed.
    """
    from musehub.core.genesis import compute_job_id
    from musehub.db.musehub_jobs_models import MusehubBackgroundJob
    # Imported lazily: the module docstring notes this function was removed,
    # so a module-level import would presumably fail at collection time.
    from musehub.services.musehub_wire import process_mpack_index_job

    created_at = datetime.datetime.now(datetime.timezone.utc)
    job_id = compute_job_id(repo_id, "mpack.index", created_at.isoformat())

    # Phase 1: enqueue the job row.
    session.add(
        MusehubBackgroundJob(
            job_id=job_id,
            repo_id=repo_id,
            job_type="mpack.index",
            payload={
                "mpack_key": mpack_key,
                "branch": "main",
                "head": "",
                "pusher_id": "",
                "declared_objects_count": n_objects,
                "declared_commits_count": 0,
            },
            status="pending",
            created_at=created_at,
            attempt=0,
        )
    )
    await session.commit()

    # Phase 2: process it synchronously and persist the results.
    await process_mpack_index_job(session, job_id)
    await session.commit()
    return job_id
| 95 | |
| 96 | |
| 97 | # --------------------------------------------------------------------------- |
| 98 | # PI-1 |
| 99 | # --------------------------------------------------------------------------- |
| 100 | |
@pytest.mark.asyncio
async def test_pi1_mpack_index_written_for_every_object(db_session: AsyncSession) -> None:
    """PI-1: every object carried by a pushed mpack gets a musehub_mpack_index row."""
    repo = await create_repo(
        db_session,
        name="pi-test-1",
        owner="gabriel",
        owner_user_id=compute_identity_id(b"gabriel"),
        visibility="public",
        initialize=False,
    )

    payloads = [f"pi1-obj-{i}".encode() for i in range(5)]
    objects = {blob_id(payload): payload for payload in payloads}
    wire_bytes, mpack_key = _make_mpack(objects)
    await _store_mpack(wire_bytes, mpack_key)
    await _enqueue_and_process(db_session, repo.repo_id, mpack_key, len(objects))

    expected_oids = set(objects)
    query = select(db.MusehubMPackIndex).where(
        db.MusehubMPackIndex.entity_id.in_(list(objects))
    )
    rows = (await db_session.execute(query)).scalars().all()
    indexed_oids = {row.entity_id for row in rows}
    missing = expected_oids - indexed_oids

    assert indexed_oids == expected_oids, (
        f"expected {len(objects)} mpack index rows, got {len(rows)}\n"
        f"missing: {missing}"
    )
| 130 | |
| 131 | |
| 132 | # --------------------------------------------------------------------------- |
| 133 | # PI-2 |
| 134 | # --------------------------------------------------------------------------- |
| 135 | |
@pytest.mark.asyncio
async def test_pi2_mpack_id_matches_mpack_key(db_session: AsyncSession) -> None:
    """PI-2: the mpack_id stored in musehub_mpack_index must equal the mpack_key.

    The row is looked up by entity_id alone. Fetching it by an identity tuple
    that already includes mpack_key would make the equality check tautological:
    any row found that way necessarily carries that mpack_id.
    """
    repo = await create_repo(
        db_session,
        name="pi-test-2",
        owner="gabriel",
        owner_user_id=compute_identity_id(b"gabriel"),
        visibility="public",
        initialize=False,
    )

    objects = {blob_id(b"pi2-only-obj"): b"pi2-only-obj"}
    object_id = next(iter(objects))
    wire_bytes, mpack_key = _make_mpack(objects)
    await _store_mpack(wire_bytes, mpack_key)
    await _enqueue_and_process(db_session, repo.repo_id, mpack_key, len(objects))

    rows = (await db_session.execute(
        select(db.MusehubMPackIndex).where(db.MusehubMPackIndex.entity_id == object_id)
    )).scalars().all()
    assert rows, "mpack index row not found"

    # The object is unique to this test, so it must appear under exactly one
    # mpack: the one we just pushed.
    stored_ids = {row.mpack_id for row in rows}
    assert stored_ids == {mpack_key}, f"mpack_id {stored_ids!r} != mpack_key {mpack_key!r}"
| 156 | |
| 157 | |
| 158 | # --------------------------------------------------------------------------- |
| 159 | # PI-3 |
| 160 | # --------------------------------------------------------------------------- |
| 161 | |
@pytest.mark.asyncio
async def test_pi3_second_push_adds_rows_without_overwriting(db_session: AsyncSession) -> None:
    """PI-3: a second push of different objects adds new rows; first-push rows survive.

    Besides checking that every object and both mpack keys are present, each
    object must stay attributed to the mpack that carried it.
    """
    repo = await create_repo(
        db_session,
        name="pi-test-3",
        owner="gabriel",
        owner_user_id=compute_identity_id(b"gabriel"),
        visibility="public",
        initialize=False,
    )

    objs_a = {blob_id(f"pi3-a-{i}".encode()): f"pi3-a-{i}".encode() for i in range(3)}
    wire_a, key_a = _make_mpack(objs_a)
    await _store_mpack(wire_a, key_a)
    await _enqueue_and_process(db_session, repo.repo_id, key_a, len(objs_a))

    objs_b = {blob_id(f"pi3-b-{i}".encode()): f"pi3-b-{i}".encode() for i in range(3)}
    wire_b, key_b = _make_mpack(objs_b)
    await _store_mpack(wire_b, key_b)
    await _enqueue_and_process(db_session, repo.repo_id, key_b, len(objs_b))

    all_oids = set(objs_a) | set(objs_b)
    rows = (await db_session.execute(
        select(db.MusehubMPackIndex).where(db.MusehubMPackIndex.entity_id.in_(list(all_oids)))
    )).scalars().all()
    indexed_pairs = {(row.entity_id, row.mpack_id) for row in rows}
    indexed_oids = {oid for oid, _ in indexed_pairs}
    indexed_mpacks = {mpack_id for _, mpack_id in indexed_pairs}

    assert set(objs_a) <= indexed_oids, "first push objects missing from index"
    assert set(objs_b) <= indexed_oids, "second push objects missing from index"
    assert key_a in indexed_mpacks, "first mpack_key missing from index"
    assert key_b in indexed_mpacks, "second mpack_key missing from index"

    # The set-level checks above would still pass if rows were attributed to
    # the wrong mpack, so pin each (object, mpack) pair explicitly.
    assert {(oid, key_a) for oid in objs_a} <= indexed_pairs, (
        "first push rows not attributed to the first mpack_key"
    )
    assert {(oid, key_b) for oid in objs_b} <= indexed_pairs, (
        "second push rows not attributed to the second mpack_key"
    )
| 196 | |
| 197 | |
| 198 | # --------------------------------------------------------------------------- |
| 199 | # PI-4 |
| 200 | # --------------------------------------------------------------------------- |
| 201 | |
@pytest.mark.asyncio
async def test_pi4_objects_from_different_repos_are_distinct(db_session: AsyncSession) -> None:
    """PI-4: objects pushed to different repos are distinct in the global index.

    Each repo receives one distinct object in its own mpack. Both objects must
    be indexed, and each must be attributed to the mpack that carried it.
    """
    repo_a = await create_repo(
        db_session,
        name="pi-test-4a",
        owner="gabriel",
        owner_user_id=compute_identity_id(b"gabriel"),
        visibility="public",
        initialize=False,
    )
    repo_b = await create_repo(
        db_session,
        name="pi-test-4b",
        owner="gabriel",
        owner_user_id=compute_identity_id(b"gabriel"),
        visibility="public",
        initialize=False,
    )

    objs_a = {blob_id(b"pi4-repo-a-obj"): b"pi4-repo-a-obj"}
    objs_b = {blob_id(b"pi4-repo-b-obj"): b"pi4-repo-b-obj"}
    # Fixture precondition, checked up front. Once rows are filtered by these
    # ids, disjointness of the results follows from disjointness of the seeds,
    # so checking it after the push would not exercise the code under test.
    assert set(objs_a).isdisjoint(objs_b), (
        "object collision — two repos pushed the same object_id (seeds must be unique)"
    )

    wire_a, key_a = _make_mpack(objs_a)
    await _store_mpack(wire_a, key_a)
    await _enqueue_and_process(db_session, repo_a.repo_id, key_a, len(objs_a))

    wire_b, key_b = _make_mpack(objs_b)
    await _store_mpack(wire_b, key_b)
    await _enqueue_and_process(db_session, repo_b.repo_id, key_b, len(objs_b))

    rows_a = (await db_session.execute(
        select(db.MusehubMPackIndex).where(
            db.MusehubMPackIndex.entity_id.in_(list(objs_a))
        )
    )).scalars().all()

    rows_b = (await db_session.execute(
        select(db.MusehubMPackIndex).where(
            db.MusehubMPackIndex.entity_id.in_(list(objs_b))
        )
    )).scalars().all()

    assert {r.entity_id for r in rows_a} == set(objs_a), "repo_a objects not indexed"
    assert {r.entity_id for r in rows_b} == set(objs_b), "repo_b objects not indexed"
    assert {r.mpack_id for r in rows_a} == {key_a}, "repo_a objects attributed to the wrong mpack"
    assert {r.mpack_id for r in rows_b} == {key_b}, "repo_b objects attributed to the wrong mpack"
File History
1 commit
sha256:ef10830ce231e0a20efcb0e2586cb879471247e916616e6fdd0d51df459e2595
fix: typing audit — 0 violations, 0 untyped defs across all…
Sonnet 4.6
minor
⚠
20 days ago