gabriel / musehub public

test_commit_graph_phase2.py file-level

at sha256:7 · View file ↗ · Intel ↗

History
1 files
1 commits
0 hotspots
0 🧊 dead
0 💥 blast risk
sha256:0 fix: fall back to any indexed mpack in read_object_bytes when push mpac… · gabriel · Jun 17, 2026
1 """TDD — Phase 2: commit graph for O(1) BFS-frontier DAG walks (issue #63).
2
3 CG-1 After process_mpack_index_job, every commit in the mpack has a row
4 in musehub_commit_graph with correct parent_ids.
5 CG-2 Generation numbers are monotonically correct:
6 - root commit → generation 0
7 - each commit → generation = max(parent generations) + 1
8 CG-3 _walk_commit_delta for 1000 commits completes in under 50ms
9 (proves bulk-query path, not one-per-commit path).
10 CG-4 _walk_commit_delta correctness: returns exactly the right delta
11 when the receiver already has some commits (partial clone / incremental fetch).
12 """
13 from __future__ import annotations
14
15 import datetime
16 import hashlib
17 import time
18
19 import msgpack
20 import pytest
21 from sqlalchemy import select
22 from sqlalchemy.ext.asyncio import AsyncSession
23
24 from muse.core.types import blob_id
25 from musehub.db import musehub_repo_models as db
26 from musehub.core.genesis import compute_identity_id
27 from musehub.services.musehub_repository import create_repo
28
29 try:
30 from musehub.services.musehub_wire import process_mpack_index_job
31 _PROCESS_JOB_MISSING = False
32 except ImportError:
33 process_mpack_index_job = None # type: ignore[assignment]
34 _PROCESS_JOB_MISSING = True
35
36
37 # ---------------------------------------------------------------------------
38 # Helpers
39 # ---------------------------------------------------------------------------
40
def _make_linear_chain(n: int, seed: str = "cg") -> tuple[list[dict], str]:
    """Produce ``n`` commit dicts that form a single-parent chain on ``main``.

    Commit ``i`` records commit ``i - 1`` as its ``parent_commit_id``; the
    first commit has ``None``.  Commit ids and snapshot ids are obtained by
    passing seed-derived bytes to ``blob_id``.

    Returns ``(commits, tip_commit_id)`` with the commits ordered root first.
    For ``n == 0`` the list is empty and the tip is ``None`` (hence the
    ``type: ignore`` on the return).
    """
    # Every commit in the chain carries the same fixed timestamp.
    stamp = datetime.datetime(2025, 1, 1, tzinfo=datetime.timezone.utc).isoformat()

    chain: list[dict] = []
    previous: str | None = None
    for index in range(n):
        snapshot = blob_id(f"{seed}-snap-{index}".encode())
        # The parent id is part of the hashed bytes, so each id depends on
        # the whole chain before it.
        current = blob_id(f"{seed}-commit-{index}-parent={previous}".encode())
        # Key order is kept stable: the dict is later serialised and hashed.
        chain.append(
            {
                "commit_id": current,
                "branch": "main",
                "message": f"commit {index}",
                "author": "gabriel",
                "committed_at": stamp,
                "parent_commit_id": previous,
                "parent2_commit_id": None,
                "snapshot_id": snapshot,
                "agent_id": "",
                "model_id": "",
                "toolchain_id": "",
                "sem_ver_bump": "none",
                "breaking_changes": [],
                "signature": "",
                "signer_key_id": "",
                "signer_public_key": "",
                "prompt_hash": "",
            }
        )
        previous = current
    return chain, previous  # type: ignore[return-value]
74
75
def _make_mpack_from_commits(commits: list[dict]) -> tuple[bytes, str]:
    """Serialise ``commits`` into an mpack payload and derive its key.

    The payload carries no snapshots or objects and points branch ``main`` at
    the last entry of ``commits``, so the list must be non-empty.

    Returns ``(wire_bytes, mpack_key)`` where the key is ``"sha256:"``
    followed by the hex SHA-256 digest of the serialised bytes.
    """
    head_id = commits[-1]["commit_id"]
    payload = {
        "commits": commits,
        "snapshots": [],
        "objects": [],
        "branch_heads": {"main": head_id},
    }
    packed = msgpack.packb(payload, use_bin_type=True)
    digest = hashlib.sha256(packed).hexdigest()
    return packed, "sha256:" + digest
86
87
async def _store_mpack(wire_bytes: bytes, mpack_key: str) -> None:
    """Write ``wire_bytes`` to the storage backend under ``mpack_key``."""
    # Resolved at call time; presumably so that a backend configured by the
    # test fixtures is the one picked up — confirm against conftest.
    import musehub.storage.backends as storage_backends

    await storage_backends.get_backend().put_mpack(mpack_key, wire_bytes)
92
93
async def _enqueue_and_process(
    session: AsyncSession,
    repo_id: str,
    mpack_key: str,
    commits: list[dict],
) -> str:
    """Insert a pending ``mpack.index`` job for ``mpack_key`` and run it inline.

    The job row is committed first, then ``process_mpack_index_job`` is
    awaited on the same session and the session is committed again.

    Returns the id of the job that was created.
    """
    from musehub.core.genesis import compute_job_id
    from musehub.db.musehub_jobs_models import MusehubBackgroundJob

    created = datetime.datetime.now(datetime.timezone.utc)
    job_id = compute_job_id(repo_id, "mpack.index", created.isoformat())

    # The job targets branch "main" with the last commit of the list as head.
    payload = {
        "mpack_key": mpack_key,
        "branch": "main",
        "head": commits[-1]["commit_id"],
        "pusher_id": "",
        "declared_objects_count": 0,
        "declared_commits_count": len(commits),
    }
    job = MusehubBackgroundJob(
        job_id=job_id,
        repo_id=repo_id,
        job_type="mpack.index",
        payload=payload,
        status="pending",
        created_at=created,
        attempt=0,
    )
    session.add(job)
    await session.commit()

    await process_mpack_index_job(session, job_id)
    await session.commit()
    return job_id
125
126
127 # ---------------------------------------------------------------------------
128 # CG-1
129 # ---------------------------------------------------------------------------
130
@pytest.mark.skipif(_PROCESS_JOB_MISSING, reason="process_mpack_index_job not yet available")
@pytest.mark.asyncio
async def test_cg1_commit_graph_written_for_every_commit(db_session: AsyncSession) -> None:
    """CG-1: after indexing, each commit of the mpack has a commit-graph row."""
    repo = await create_repo(
        db_session,
        name="cg-test-1",
        owner="gabriel",
        owner_user_id=compute_identity_id(b"gabriel"),
        visibility="public",
        initialize=False,
    )

    n = 20
    commits, _tip = _make_linear_chain(n, seed="cg1")
    packed, key = _make_mpack_from_commits(commits)
    await _store_mpack(packed, key)
    await _enqueue_and_process(db_session, repo.repo_id, key, commits)

    # Look up graph rows for exactly the commit ids that were pushed.
    expected_cids = {commit["commit_id"] for commit in commits}
    graph = db.MusehubCommitGraph
    stmt = select(graph).where(graph.commit_id.in_(expected_cids))
    result = await db_session.execute(stmt)
    rows = result.scalars().all()
    indexed_cids = {row.commit_id for row in rows}

    assert indexed_cids == expected_cids, (
        f"expected {n} commit graph rows, got {len(rows)}\n"
        f"missing: {expected_cids - indexed_cids}"
    )
163
164
165 # ---------------------------------------------------------------------------
166 # CG-2
167 # ---------------------------------------------------------------------------
168
@pytest.mark.skipif(_PROCESS_JOB_MISSING, reason="process_mpack_index_job not yet available")
@pytest.mark.asyncio
async def test_cg2_generation_numbers_correct(db_session: AsyncSession) -> None:
    """CG-2: in a linear chain the commit at position ``i`` has generation ``i``.

    The root therefore sits at generation 0 and every later commit is one
    above its single parent.
    """
    repo = await create_repo(
        db_session,
        name="cg-test-2",
        owner="gabriel",
        owner_user_id=compute_identity_id(b"gabriel"),
        visibility="public",
        initialize=False,
    )

    n = 10
    commits, _tip = _make_linear_chain(n, seed="cg2")
    packed, key = _make_mpack_from_commits(commits)
    await _store_mpack(packed, key)
    await _enqueue_and_process(db_session, repo.repo_id, key, commits)

    wanted_ids = {commit["commit_id"] for commit in commits}
    graph = db.MusehubCommitGraph
    stmt = select(graph).where(graph.commit_id.in_(wanted_ids))
    result = await db_session.execute(stmt)
    rows_by_id = {r.commit_id: r for r in result.scalars().all()}

    # ``commits`` is ordered root first, so the list index is the expected
    # generation number.
    for i, commit in enumerate(commits):
        row = rows_by_id.get(commit["commit_id"])
        assert row is not None, f"commit {i} missing from graph"
        assert row.generation == i, (
            f"commit {i}: expected generation={i}, got {row.generation}"
        )
203
204
205 # ---------------------------------------------------------------------------
206 # CG-3
207 # ---------------------------------------------------------------------------
208
@pytest.mark.skipif(_PROCESS_JOB_MISSING, reason="process_mpack_index_job not yet available")
@pytest.mark.asyncio
async def test_cg3_walk_commit_delta_1000_commits_under_50ms(db_session: AsyncSession) -> None:
    """CG-3: walking a 1000-commit chain from its tip finishes within 50 ms.

    Only the single ``_walk_commit_delta`` call is timed; repo creation,
    indexing and the import of the function happen outside the timed window.
    """
    repo = await create_repo(
        db_session,
        name="cg-test-3",
        owner="gabriel",
        owner_user_id=compute_identity_id(b"gabriel"),
        visibility="public",
        initialize=False,
    )

    n = 1000
    commits, tip = _make_linear_chain(n, seed="cg3")
    packed, key = _make_mpack_from_commits(commits)
    await _store_mpack(packed, key)
    await _enqueue_and_process(db_session, repo.repo_id, key, commits)

    from musehub.services.musehub_wire import _walk_commit_delta

    # An empty ``have`` means the receiver has nothing, so the whole chain
    # is expected back.
    started = time.perf_counter()
    delta = await _walk_commit_delta(db_session, [tip], have=[])
    finished = time.perf_counter()
    elapsed_ms = (finished - started) * 1000

    assert len(delta) == n, f"expected {n} commits in delta, got {len(delta)}"
    assert elapsed_ms < 50, (
        f"_walk_commit_delta took {elapsed_ms:.1f}ms for {n} commits — must be < 50ms"
    )
238
239
240 # ---------------------------------------------------------------------------
241 # CG-4
242 # ---------------------------------------------------------------------------
243
@pytest.mark.skipif(_PROCESS_JOB_MISSING, reason="process_mpack_index_job not yet available")
@pytest.mark.asyncio
async def test_cg4_walk_commit_delta_correctness_with_have(db_session: AsyncSession) -> None:
    """CG-4: with a non-empty ``have`` the walk returns only the missing commits.

    A 50-commit linear chain is indexed, the receiver is declared to hold
    ``commits[0 .. have_idx]`` inclusive, and the delta from the tip must be
    exactly ``commits[have_idx + 1 :]``.
    """
    repo = await create_repo(
        db_session,
        name="cg-test-4",
        owner="gabriel",
        owner_user_id=compute_identity_id(b"gabriel"),
        visibility="public",
        initialize=False,
    )

    n = 50
    commits, tip = _make_linear_chain(n, seed="cg4")
    wire_bytes, mpack_key = _make_mpack_from_commits(commits)
    await _store_mpack(wire_bytes, mpack_key)
    await _enqueue_and_process(db_session, repo.repo_id, mpack_key, commits)

    # Receiver already has commits 0..have_idx inclusive, i.e. the first
    # have_idx + 1 (= 21) commits of the chain.
    have_idx = 20
    # Built as a list in chain order so the argument is identical on every
    # run; a set of str ids iterates in a hash-seed-dependent order.
    have = [c["commit_id"] for c in commits[: have_idx + 1]]

    from musehub.services.musehub_wire import _walk_commit_delta

    delta = await _walk_commit_delta(db_session, [tip], have=have)

    expected = {c["commit_id"] for c in commits[have_idx + 1:]}
    got = set(delta.keys())
    assert got == expected, (
        f"delta wrong: expected {len(expected)} commits, got {len(delta)}\n"
        f"extra: {got - expected}\n"
        f"missing: {expected - got}"
    )