gabriel / musehub public
test_mpack_index_phase2.py python
329 lines 12.0 KB
Raw
sha256:0997d6250ae6476362f6fe2025af7789f46d03df3e9f34356d5e8ee79b201923 fix(issues): use issue number as pagination cursor, not cre… Sonnet 4.6 patch 8 days ago
1 """TDD — Phase 2: MPack index covers all three entity types.
2
3 The MusehubMPackIndex table must map every commit, snapshot, and object
4 in a pushed mpack to the mpack_id that contains it. This enables the
5 fetch path to locate covering mpacks for any entity without O(N) individual
6 GET calls.
7
8 PI-5 After process_mpack_index_job, every commit_id in the mpack has a row
9 in MusehubMPackIndex with entity_type="commit" and the correct mpack_id.
10 PI-6 After process_mpack_index_job, every snapshot_id in the mpack has a row
11 in MusehubMPackIndex with entity_type="snapshot" and the correct mpack_id.
12 PI-7 All three entity types (object, commit, snapshot) are written atomically
13 in one process_mpack_index_job call — a single push indexes everything.
14 PI-8 A second push with different commits and snapshots adds new rows without
15 disturbing the first push's rows (on_conflict_do_nothing idempotency).
16 """
17 from __future__ import annotations
18
19 import datetime
20 import hashlib
21 from collections.abc import Mapping
22
23 import msgpack
24 import pytest
25 from sqlalchemy import select
26
27 pytestmark = pytest.mark.skip(reason="muse wire protocol in flux")
28 from sqlalchemy.ext.asyncio import AsyncSession
29
30 from muse.core.snapshot import compute_commit_id, compute_snapshot_id
31 from muse.core.types import blob_id
32 from musehub.core.genesis import compute_identity_id
33 from musehub.db import musehub_repo_models as db
34 from musehub.services.musehub_repository import create_repo
35
36
37 # ---------------------------------------------------------------------------
38 # Helpers
39 # ---------------------------------------------------------------------------
40
# Fixed, timezone-aware (UTC) timestamp used for every commit built by the
# helpers in this module, so the timestamp input never varies between runs.
_DT = datetime.datetime(
    year=2026,
    month=1,
    day=1,
    tzinfo=datetime.timezone.utc,
)
42
43
def _make_commit_dict(
    repo_id: str,
    message: str,
    snapshot_id: str,
    parent_ids: list[str] | None = None,
) -> tuple[dict, str]:
    """Build a minimal commit dict and return ``(dict, commit_id)``.

    The commit ID comes from ``compute_commit_id`` over the parent IDs, the
    snapshot ID, the message, the fixed module timestamp ``_DT`` and the
    author ``"gabriel"``.

    Args:
        repo_id: Repository ID stored in the dict's ``repo_id`` field.
        message: Commit message; also an input to the commit ID.
        snapshot_id: ID of the snapshot the commit points at.
        parent_ids: Optional parent commit IDs. Defaults to no parents.

    Returns:
        A ``(commit_dict, commit_id)`` tuple. All provenance and signature
        fields of the dict are left empty.
    """
    parents = list(parent_ids or [])
    committed_at = _DT.isoformat()
    author = "gabriel"

    cid = compute_commit_id(
        parent_ids=parents,
        snapshot_id=snapshot_id,
        message=message,
        committed_at_iso=committed_at,
        author=author,
    )

    # Mirror the hashed parents into the dict so its parent fields agree with
    # the commit ID. The dict only has two parent slots, so any further
    # parents are not represented here.
    first_parent = parents[0] if parents else None
    second_parent = parents[1] if len(parents) > 1 else None

    commit = {
        "commit_id": cid,
        "repo_id": repo_id,
        "branch": "main",
        "snapshot_id": snapshot_id,
        "parent_commit_id": first_parent,
        "parent2_commit_id": second_parent,
        "message": message,
        "committed_at": committed_at,
        "author": author,
        "metadata": {},
        "structured_delta": None,
        "sem_ver_bump": "none",
        "breaking_changes": [],
        "agent_id": "",
        "model_id": "",
        "toolchain_id": "",
        "prompt_hash": "",
        "signature": "",
        "signer_key_id": "",
    }
    return commit, cid
79
80
def _make_snapshot_dict(manifest: Mapping[str, str]) -> tuple[dict, str]:
    """Wrap *manifest* in a minimal, parentless snapshot dict.

    The whole manifest is carried as the ``delta_upsert`` payload and nothing
    is listed for removal.

    Returns:
        ``(snapshot_dict, snapshot_id)``, where the ID is the result of
        ``compute_snapshot_id(manifest)``.
    """
    snapshot_id = compute_snapshot_id(manifest)
    snapshot = dict(
        snapshot_id=snapshot_id,
        parent_snapshot_id=None,
        delta_upsert=manifest,
        delta_remove=[],
    )
    return snapshot, snapshot_id
90
91
def _make_full_mpack(
    repo_id: str,
    objects: dict[str, bytes],
    extra_tag: str = "",
) -> tuple[bytes, str, list[str], list[str]]:
    """Serialize *objects* plus one snapshot and one commit into an mpack.

    Each object ID becomes a manifest entry whose path embeds *extra_tag* and
    the last eight characters of the ID. The snapshot wraps that manifest and
    the commit (message ``"commit <extra_tag>"``) points at the snapshot.

    Returns:
        ``(wire_bytes, mpack_key, [commit_id], [snapshot_id])`` where
        ``mpack_key`` is ``"sha256:"`` followed by the hex SHA-256 digest of
        the msgpack-encoded bytes.
    """
    manifest = {
        f"src/file_{extra_tag}_{object_id[-8:]}.py": object_id
        for object_id in objects
    }
    snapshot, snapshot_id = _make_snapshot_dict(manifest)
    commit, commit_id = _make_commit_dict(
        repo_id, f"commit {extra_tag}", snapshot_id
    )

    object_entries = [
        {"object_id": object_id, "content": blob}
        for object_id, blob in objects.items()
    ]
    payload = {
        "commits": [commit],
        "snapshots": [snapshot],
        "objects": object_entries,
    }

    wire_bytes = msgpack.packb(payload, use_bin_type=True)
    digest = hashlib.sha256(wire_bytes).hexdigest()
    mpack_key = "sha256:" + digest
    return wire_bytes, mpack_key, [commit_id], [snapshot_id]
116
117
async def _store_mpack(wire_bytes: bytes, mpack_key: str) -> None:
    """Write *wire_bytes* to the storage backend under *mpack_key*.

    The backends module is imported at call time and the backend is obtained
    through ``get_backend()`` on every call.
    """
    import musehub.storage.backends as backends_module

    await backends_module.get_backend().put_mpack(mpack_key, wire_bytes)
122
123
async def _enqueue_and_process(
    session: AsyncSession,
    repo_id: str,
    mpack_key: str,
    n_objects: int,
    n_commits: int = 1,
) -> None:
    """Insert a pending ``mpack.index`` job and run it straight away.

    Args:
        session: Async DB session used to add the job and to run it.
        repo_id: Repository the job belongs to.
        mpack_key: Key of the stored mpack the job should index.
        n_objects: Value recorded as ``declared_objects_count``.
        n_commits: Value recorded as ``declared_commits_count``.
    """
    from musehub.core.genesis import compute_job_id
    from musehub.db.musehub_jobs_models import MusehubBackgroundJob
    from musehub.services.musehub_wire import process_mpack_index_job

    created_at = datetime.datetime.now(datetime.timezone.utc)
    job_id = compute_job_id(repo_id, "mpack.index", created_at.isoformat())

    payload = {
        "mpack_key": mpack_key,
        "branch": "main",
        "head": "",
        "pusher_id": "",
        "declared_objects_count": n_objects,
        "declared_commits_count": n_commits,
    }
    job = MusehubBackgroundJob(
        job_id=job_id,
        repo_id=repo_id,
        job_type="mpack.index",
        payload=payload,
        status="pending",
        created_at=created_at,
        attempt=0,
    )
    session.add(job)

    # Commit the job row first; the processor is only handed its ID.
    await session.commit()
    await process_mpack_index_job(session, job_id)
    await session.commit()
156
157
158 # ---------------------------------------------------------------------------
159 # PI-5 commit_ids are indexed
160 # ---------------------------------------------------------------------------
161
@pytest.mark.asyncio
async def test_pi5_commit_ids_indexed_after_push(db_session: AsyncSession) -> None:
    """PI-5: each commit in a pushed mpack has a 'commit' index row for that mpack."""
    repo = await create_repo(
        db_session,
        name="pi5-test",
        owner="gabriel",
        owner_user_id=compute_identity_id(b"gabriel"),
        visibility="public",
        initialize=False,
    )
    content = b"pi5-obj"
    objects = {blob_id(content): content}
    wire_bytes, mpack_key, commit_ids, _ = _make_full_mpack(
        repo.repo_id, objects, extra_tag="pi5"
    )
    await _store_mpack(wire_bytes, mpack_key)
    await _enqueue_and_process(db_session, repo.repo_id, mpack_key, len(objects))

    # Fetch only the index rows typed as commits for the IDs we pushed.
    index = db.MusehubMPackIndex
    query = select(index).where(
        index.entity_id.in_(commit_ids),
        index.entity_type == "commit",
    )
    result = await db_session.execute(query)
    rows = result.scalars().all()

    expected = set(commit_ids)
    found = {row.entity_id for row in rows}
    assert found == expected, (
        f"commit_ids not indexed — missing: {expected - found}"
    )
    assert all(row.mpack_id == mpack_key for row in rows), (
        "commit index rows have wrong mpack_id"
    )
193
194
195 # ---------------------------------------------------------------------------
196 # PI-6 snapshot_ids are indexed
197 # ---------------------------------------------------------------------------
198
@pytest.mark.asyncio
async def test_pi6_snapshot_ids_indexed_after_push(db_session: AsyncSession) -> None:
    """PI-6: each snapshot in a pushed mpack has a 'snapshot' index row for that mpack."""
    repo = await create_repo(
        db_session,
        name="pi6-test",
        owner="gabriel",
        owner_user_id=compute_identity_id(b"gabriel"),
        visibility="public",
        initialize=False,
    )
    content = b"pi6-obj"
    objects = {blob_id(content): content}
    wire_bytes, mpack_key, _, snapshot_ids = _make_full_mpack(
        repo.repo_id, objects, extra_tag="pi6"
    )
    await _store_mpack(wire_bytes, mpack_key)
    await _enqueue_and_process(db_session, repo.repo_id, mpack_key, len(objects))

    # Fetch only the index rows typed as snapshots for the IDs we pushed.
    index = db.MusehubMPackIndex
    query = select(index).where(
        index.entity_id.in_(snapshot_ids),
        index.entity_type == "snapshot",
    )
    result = await db_session.execute(query)
    rows = result.scalars().all()

    expected = set(snapshot_ids)
    found = {row.entity_id for row in rows}
    assert found == expected, (
        f"snapshot_ids not indexed — missing: {expected - found}"
    )
    assert all(row.mpack_id == mpack_key for row in rows), (
        "snapshot index rows have wrong mpack_id"
    )
230
231
232 # ---------------------------------------------------------------------------
233 # PI-7 All three types indexed atomically in one job
234 # ---------------------------------------------------------------------------
235
@pytest.mark.asyncio
async def test_pi7_all_three_types_indexed_atomically(db_session: AsyncSession) -> None:
    """PI-7: a single index job writes rows for objects, commits and snapshots."""
    repo = await create_repo(
        db_session,
        name="pi7-test",
        owner="gabriel",
        owner_user_id=compute_identity_id(b"gabriel"),
        visibility="public",
        initialize=False,
    )
    contents = [f"pi7-obj-{i}".encode() for i in range(3)]
    objects = {blob_id(content): content for content in contents}
    wire_bytes, mpack_key, commit_ids, snapshot_ids = _make_full_mpack(
        repo.repo_id, objects, extra_tag="pi7"
    )
    await _store_mpack(wire_bytes, mpack_key)
    await _enqueue_and_process(db_session, repo.repo_id, mpack_key, len(objects))

    object_id_set = set(objects.keys())
    commit_id_set = set(commit_ids)
    snapshot_id_set = set(snapshot_ids)
    all_entity_ids = object_id_set | commit_id_set | snapshot_id_set

    index = db.MusehubMPackIndex
    query = select(index).where(index.entity_id.in_(list(all_entity_ids)))
    result = await db_session.execute(query)
    rows = result.scalars().all()

    indexed = {row.entity_id for row in rows}

    def ids_of(entity_type: str) -> set:
        """Return the entity IDs of the fetched rows with *entity_type*."""
        return {row.entity_id for row in rows if row.entity_type == entity_type}

    # Presence: every pushed ID shows up in the index at all.
    assert object_id_set <= indexed, (
        f"object_ids not fully indexed: {object_id_set - indexed}"
    )
    assert commit_id_set <= indexed, (
        f"commit_ids not indexed: {commit_id_set - indexed}"
    )
    assert snapshot_id_set <= indexed, (
        f"snapshot_ids not indexed: {snapshot_id_set - indexed}"
    )
    # Typing: each ID is filed under exactly the expected entity_type.
    assert ids_of("object") == object_id_set, "wrong entity_type for objects"
    assert ids_of("commit") == commit_id_set, "wrong entity_type for commits"
    assert ids_of("snapshot") == snapshot_id_set, "wrong entity_type for snapshots"
281
282
283 # ---------------------------------------------------------------------------
284 # PI-8 Second push adds new rows without overwriting first push
285 # ---------------------------------------------------------------------------
286
@pytest.mark.asyncio
async def test_pi8_second_push_indexes_without_overwriting(db_session: AsyncSession) -> None:
    """PI-8: a second push adds its own rows and leaves the first push's rows intact.

    Two mpacks with different objects, snapshots and commits are indexed one
    after the other. Besides checking that every entity is present, the test
    verifies that each entity still maps to the mpack it was pushed in, so a
    second push that re-points first-push rows is caught.
    """
    repo = await create_repo(
        db_session,
        name="pi8-test",
        owner="gabriel",
        owner_user_id=compute_identity_id(b"gabriel"),
        visibility="public",
        initialize=False,
    )

    objs_a = {blob_id(f"pi8-a-{i}".encode()): f"pi8-a-{i}".encode() for i in range(2)}
    wire_a, key_a, cids_a, sids_a = _make_full_mpack(repo.repo_id, objs_a, extra_tag="pi8a")
    await _store_mpack(wire_a, key_a)
    await _enqueue_and_process(db_session, repo.repo_id, key_a, len(objs_a))

    objs_b = {blob_id(f"pi8-b-{i}".encode()): f"pi8-b-{i}".encode() for i in range(2)}
    wire_b, key_b, cids_b, sids_b = _make_full_mpack(repo.repo_id, objs_b, extra_tag="pi8b")
    await _store_mpack(wire_b, key_b)
    await _enqueue_and_process(db_session, repo.repo_id, key_b, len(objs_b))

    ids_a = set(objs_a) | set(cids_a) | set(sids_a)
    ids_b = set(objs_b) | set(cids_b) | set(sids_b)
    all_ids = ids_a | ids_b

    rows = (await db_session.execute(
        select(db.MusehubMPackIndex).where(
            db.MusehubMPackIndex.entity_id.in_(list(all_ids))
        )
    )).scalars().all()
    indexed = {r.entity_id for r in rows}
    mpack_ids_found = {r.mpack_id for r in rows}

    assert set(objs_a) <= indexed, "first push objects missing after second push"
    assert set(objs_b) <= indexed, "second push objects missing"
    assert set(cids_a) <= indexed, "first push commits missing after second push"
    assert set(cids_b) <= indexed, "second push commits missing"
    assert set(sids_a) <= indexed, "first push snapshots missing after second push"
    assert set(sids_b) <= indexed, "second push snapshots missing"
    assert key_a in mpack_ids_found, "first mpack_key lost from index"
    assert key_b in mpack_ids_found, "second mpack_key missing from index"

    # Presence alone would not notice rows being re-pointed: every entity must
    # still map to the mpack it was pushed in.
    mpacks_for_a = {r.mpack_id for r in rows if r.entity_id in ids_a}
    mpacks_for_b = {r.mpack_id for r in rows if r.entity_id in ids_b}
    assert mpacks_for_a == {key_a}, (
        f"first push rows map to unexpected mpack ids: {mpacks_for_a - {key_a}}"
    )
    assert mpacks_for_b == {key_b}, (
        f"second push rows map to unexpected mpack ids: {mpacks_for_b - {key_b}}"
    )
File History 1 commit
sha256:0997d6250ae6476362f6fe2025af7789f46d03df3e9f34356d5e8ee79b201923 fix(issues): use issue number as pagination cursor, not cre… Sonnet 4.6 patch 8 days ago