gabriel / musehub public
test_mpack_index_phase1.py python
248 lines 9.1 KB
Raw
sha256:ef10830ce231e0a20efcb0e2586cb879471247e916616e6fdd0d51df459e2595 fix: typing audit — 0 violations, 0 untyped defs across all… Sonnet 4.6 minor ⚠ breaking 20 days ago
1 """TDD — Phase 1: mpack index written for every pushed object (issue #63).
2
3 PI-1 After process_mpack_index_job, every object in the mpack has a row
4 in musehub_mpack_index with the correct mpack_id (= mpack_key).
5 PI-2 The mpack_id stored matches the mpack_key exactly.
6 PI-3 A second push of different objects adds new rows; existing rows are
7 untouched (on_conflict_do_nothing).
8 PI-4 Objects from different repos are distinct in the global index
9 (content-addressed objects are globally unique by object_id).
10
11 NOTE: process_mpack_index_job was removed — indexing is now inline during
12 push (musehub_wire_push.py step 7d). These tests will be revisited once
13 the MVP wire protocol and pack index are fully wired in.
14 """
15 from __future__ import annotations
16
17 import datetime
18 import hashlib
19 from collections.abc import Mapping
20
21 import msgpack
22 import pytest
23
24 pytestmark = pytest.mark.skip(reason="process_mpack_index_job removed — revisit with MWP pack index wiring")
25
26 from sqlalchemy import select
27 from sqlalchemy.ext.asyncio import AsyncSession
28
29 from muse.core.types import blob_id
30 from musehub.db import musehub_repo_models as db
31 from musehub.core.genesis import compute_identity_id
32 from musehub.services.musehub_repository import create_repo
33
34
35 # ---------------------------------------------------------------------------
36 # Helpers
37 # ---------------------------------------------------------------------------
38
def _make_mpack(objects: Mapping[str, bytes]) -> tuple[bytes, str]:
    """Serialize *objects* into a minimal MPack payload.

    The payload has empty ``commits``, ``snapshots`` and ``branch_heads``
    sections and one ``{"object_id", "content"}`` entry per item of
    *objects*, in the mapping's iteration order.

    Returns:
        ``(wire_bytes, mpack_key)`` where ``mpack_key`` is ``"sha256:"``
        followed by the hex SHA-256 digest of ``wire_bytes``.
    """
    object_entries = []
    for object_id, content in objects.items():
        object_entries.append({"object_id": object_id, "content": content})

    payload = {
        "commits": [],
        "snapshots": [],
        "objects": object_entries,
        "branch_heads": {},
    }
    packed = msgpack.packb(payload, use_bin_type=True)
    digest = hashlib.sha256(packed).hexdigest()
    return packed, "sha256:" + digest
53
54
async def _store_mpack(wire_bytes: bytes, mpack_key: str) -> None:
    """Put *wire_bytes* into the storage backend under *mpack_key*.

    This makes the mpack available for process_mpack_index_job to fetch.
    The backend is resolved at call time via ``get_backend()``.
    """
    import musehub.storage.backends as storage_backends

    storage = storage_backends.get_backend()
    await storage.put_mpack(mpack_key, wire_bytes)
60
61
async def _enqueue_and_process(
    session: AsyncSession,
    repo_id: str,
    mpack_key: str,
    n_objects: int,
) -> str:
    """Insert a pending ``mpack.index`` job and run it right away.

    The job row is committed first, then ``process_mpack_index_job`` is
    awaited on the same session, and the session is committed again.

    Args:
        session: Async DB session used for both the insert and the job run.
        repo_id: Repository the job belongs to.
        mpack_key: Key of the stored mpack, placed in the job payload.
        n_objects: Value recorded as ``declared_objects_count``.

    Returns:
        The computed job id.
    """
    from musehub.core.genesis import compute_job_id
    from musehub.db.musehub_jobs_models import MusehubBackgroundJob
    from musehub.services.musehub_wire import process_mpack_index_job

    created_at = datetime.datetime.now(datetime.timezone.utc)
    # The job id is derived from the repo, the job type and the creation timestamp.
    job_id = compute_job_id(repo_id, "mpack.index", created_at.isoformat())

    job_payload = {
        "mpack_key": mpack_key,
        "branch": "main",
        "head": "",
        "pusher_id": "",
        "declared_objects_count": n_objects,
        "declared_commits_count": 0,
    }
    job_row = MusehubBackgroundJob(
        job_id=job_id,
        repo_id=repo_id,
        job_type="mpack.index",
        payload=job_payload,
        status="pending",
        created_at=created_at,
        attempt=0,
    )
    session.add(job_row)
    await session.commit()

    await process_mpack_index_job(session, job_id)
    await session.commit()
    return job_id
95
96
97 # ---------------------------------------------------------------------------
98 # PI-1
99 # ---------------------------------------------------------------------------
100
@pytest.mark.asyncio
async def test_pi1_mpack_index_written_for_every_object(db_session: AsyncSession) -> None:
    """PI-1: each object in the mpack has a row in musehub_mpack_index."""
    repo = await create_repo(
        db_session,
        name="pi-test-1",
        owner="gabriel",
        owner_user_id=compute_identity_id(b"gabriel"),
        visibility="public",
        initialize=False,
    )

    # Five distinct blobs, keyed by their content-derived blob id.
    objects = {}
    for i in range(5):
        content = f"pi1-obj-{i}".encode()
        objects[blob_id(content)] = content

    wire_bytes, mpack_key = _make_mpack(objects)
    await _store_mpack(wire_bytes, mpack_key)
    await _enqueue_and_process(db_session, repo.repo_id, mpack_key, len(objects))

    index_model = db.MusehubMPackIndex
    stmt = select(index_model).where(index_model.entity_id.in_(list(objects)))
    result = await db_session.execute(stmt)
    rows = result.scalars().all()
    indexed_oids = {row.entity_id for row in rows}

    assert indexed_oids == set(objects), (
        f"expected {len(objects)} mpack index rows, got {len(rows)}\n"
        f"missing: {set(objects.keys()) - indexed_oids}"
    )
130
131
132 # ---------------------------------------------------------------------------
133 # PI-2
134 # ---------------------------------------------------------------------------
135
@pytest.mark.asyncio
async def test_pi2_mpack_id_matches_mpack_key(db_session: AsyncSession) -> None:
    """PI-2: the indexed row's mpack_id is exactly the mpack_key."""
    repo = await create_repo(
        db_session,
        name="pi-test-2",
        owner="gabriel",
        owner_user_id=compute_identity_id(b"gabriel"),
        visibility="public",
        initialize=False,
    )

    content = b"pi2-only-obj"
    objects = {blob_id(content): content}
    wire_bytes, mpack_key = _make_mpack(objects)
    await _store_mpack(wire_bytes, mpack_key)
    await _enqueue_and_process(db_session, repo.repo_id, mpack_key, len(objects))

    # Look the row up by its (entity_id, mpack_id) identity tuple.
    only_oid = next(iter(objects))
    identity = (only_oid, mpack_key)
    row = await db_session.get(db.MusehubMPackIndex, identity)

    assert row is not None, "mpack index row not found"
    assert row.mpack_id == mpack_key, f"mpack_id {row.mpack_id!r} != mpack_key {mpack_key!r}"
156
157
158 # ---------------------------------------------------------------------------
159 # PI-3
160 # ---------------------------------------------------------------------------
161
@pytest.mark.asyncio
async def test_pi3_second_push_adds_rows_without_overwriting(db_session: AsyncSession) -> None:
    """PI-3: a second push of new objects adds rows and keeps the first push's rows."""
    repo = await create_repo(
        db_session,
        name="pi-test-3",
        owner="gabriel",
        owner_user_id=compute_identity_id(b"gabriel"),
        visibility="public",
        initialize=False,
    )

    # First push: three "a" blobs.
    objs_a = {}
    for i in range(3):
        content = f"pi3-a-{i}".encode()
        objs_a[blob_id(content)] = content
    wire_a, key_a = _make_mpack(objs_a)
    await _store_mpack(wire_a, key_a)
    await _enqueue_and_process(db_session, repo.repo_id, key_a, len(objs_a))

    # Second push to the same repo: three different "b" blobs.
    objs_b = {}
    for i in range(3):
        content = f"pi3-b-{i}".encode()
        objs_b[blob_id(content)] = content
    wire_b, key_b = _make_mpack(objs_b)
    await _store_mpack(wire_b, key_b)
    await _enqueue_and_process(db_session, repo.repo_id, key_b, len(objs_b))

    oids_a = set(objs_a)
    oids_b = set(objs_b)
    index_model = db.MusehubMPackIndex
    stmt = select(index_model).where(index_model.entity_id.in_(list(oids_a | oids_b)))
    rows = (await db_session.execute(stmt)).scalars().all()

    indexed_oids = set()
    indexed_mpacks = set()
    for row in rows:
        indexed_oids.add(row.entity_id)
        indexed_mpacks.add(row.mpack_id)

    assert oids_a.issubset(indexed_oids), "first push objects missing from index"
    assert oids_b.issubset(indexed_oids), "second push objects missing from index"
    assert key_a in indexed_mpacks, "first mpack_key missing from index"
    assert key_b in indexed_mpacks, "second mpack_key missing from index"
196
197
198 # ---------------------------------------------------------------------------
199 # PI-4
200 # ---------------------------------------------------------------------------
201
@pytest.mark.asyncio
async def test_pi4_objects_from_different_repos_are_distinct(db_session: AsyncSession) -> None:
    """PI-4: objects pushed to two repos are both indexed and do not collide."""
    owner_id = compute_identity_id(b"gabriel")
    repo_a = await create_repo(
        db_session,
        name="pi-test-4a",
        owner="gabriel",
        owner_user_id=owner_id,
        visibility="public",
        initialize=False,
    )
    repo_b = await create_repo(
        db_session,
        name="pi-test-4b",
        owner="gabriel",
        owner_user_id=owner_id,
        visibility="public",
        initialize=False,
    )

    # One blob per repo, each built from a distinct seed.
    content_a = b"pi4-repo-a-obj"
    objs_a = {blob_id(content_a): content_a}
    wire_a, key_a = _make_mpack(objs_a)
    await _store_mpack(wire_a, key_a)
    await _enqueue_and_process(db_session, repo_a.repo_id, key_a, 1)

    content_b = b"pi4-repo-b-obj"
    objs_b = {blob_id(content_b): content_b}
    wire_b, key_b = _make_mpack(objs_b)
    await _store_mpack(wire_b, key_b)
    await _enqueue_and_process(db_session, repo_b.repo_id, key_b, 1)

    index_model = db.MusehubMPackIndex

    stmt_a = select(index_model).where(index_model.entity_id.in_(list(objs_a)))
    rows_a = (await db_session.execute(stmt_a)).scalars().all()

    stmt_b = select(index_model).where(index_model.entity_id.in_(list(objs_b)))
    rows_b = (await db_session.execute(stmt_b)).scalars().all()

    indexed_a = {row.entity_id for row in rows_a}
    indexed_b = {row.entity_id for row in rows_b}

    assert indexed_a == set(objs_a), "repo_a objects not indexed"
    assert indexed_b == set(objs_b), "repo_b objects not indexed"
    assert indexed_a.isdisjoint(indexed_b), (
        "object collision — two repos pushed the same object_id (seeds must be unique)"
    )
File History 1 commit
sha256:ef10830ce231e0a20efcb0e2586cb879471247e916616e6fdd0d51df459e2595 fix: typing audit — 0 violations, 0 untyped defs across all… Sonnet 4.6 minor 20 days ago