test_commit_graph_phase2.py
python
sha256:7d6dd8f4a89e2d1fef2d84f6e65feaff51385d382f466766b7f690a22ec18e32
fix: fall back to DB ancestry check when mpack-only fast-fo…
Sonnet 4.6
patch
8 days ago
| 1 | """TDD — Phase 2: commit graph for O(1) BFS-frontier DAG walks (issue #63). |
| 2 | |
| 3 | CG-1 After process_mpack_index_job, every commit in the mpack has a row |
| 4 | in musehub_commit_graph with correct parent_ids. |
| 5 | CG-2 Generation numbers are monotonically correct: |
| 6 | - root commit → generation 0 |
| 7 | - each commit → generation = max(parent generations) + 1 |
| 8 | CG-3 _walk_commit_delta for 1000 commits completes in under 50ms |
| 9 | (proves bulk-query path, not one-per-commit path). |
| 10 | CG-4 _walk_commit_delta correctness: returns exactly the right delta |
| 11 | when the receiver already has some commits (partial clone / incremental fetch). |
| 12 | """ |
| 13 | from __future__ import annotations |
| 14 | |
| 15 | import datetime |
| 16 | import hashlib |
| 17 | import time |
| 18 | |
| 19 | import msgpack |
| 20 | import pytest |
| 21 | from sqlalchemy import select |
| 22 | from sqlalchemy.ext.asyncio import AsyncSession |
| 23 | |
| 24 | from muse.core.types import blob_id |
| 25 | from musehub.db import musehub_repo_models as db |
| 26 | from musehub.core.genesis import compute_identity_id |
| 27 | from musehub.services.musehub_repository import create_repo |
| 28 | |
| 29 | try: |
| 30 | from musehub.services.musehub_wire import process_mpack_index_job |
| 31 | _PROCESS_JOB_MISSING = False |
| 32 | except ImportError: |
| 33 | process_mpack_index_job = None # type: ignore[assignment] |
| 34 | _PROCESS_JOB_MISSING = True |
| 35 | |
| 36 | |
| 37 | # --------------------------------------------------------------------------- |
| 38 | # Helpers |
| 39 | # --------------------------------------------------------------------------- |
| 40 | |
| 41 | def _make_linear_chain(n: int, seed: str = "cg") -> tuple[list[dict], str]: |
| 42 | """Build a linear commit chain of length n. |
| 43 | |
| 44 | Returns (commits_list, tip_commit_id). |
| 45 | Each commit dict has: commit_id, parent_commit_id, snapshot_id, branch, message. |
| 46 | """ |
| 47 | commits = [] |
| 48 | parent_id: str | None = None |
| 49 | for i in range(n): |
| 50 | snap_id = blob_id(f"{seed}-snap-{i}".encode()) |
| 51 | msg_bytes = f"{seed}-commit-{i}-parent={parent_id}".encode() |
| 52 | cid = blob_id(msg_bytes) |
| 53 | commits.append({ |
| 54 | "commit_id": cid, |
| 55 | "branch": "main", |
| 56 | "message": f"commit {i}", |
| 57 | "author": "gabriel", |
| 58 | "committed_at": datetime.datetime(2025, 1, 1, tzinfo=datetime.timezone.utc).isoformat(), |
| 59 | "parent_commit_id": parent_id, |
| 60 | "parent2_commit_id": None, |
| 61 | "snapshot_id": snap_id, |
| 62 | "agent_id": "", |
| 63 | "model_id": "", |
| 64 | "toolchain_id": "", |
| 65 | "sem_ver_bump": "none", |
| 66 | "breaking_changes": [], |
| 67 | "signature": "", |
| 68 | "signer_key_id": "", |
| 69 | "signer_public_key": "", |
| 70 | "prompt_hash": "", |
| 71 | }) |
| 72 | parent_id = cid |
| 73 | return commits, parent_id # type: ignore[return-value] |
| 74 | |
| 75 | |
| 76 | def _make_mpack_from_commits(commits: list[dict]) -> tuple[bytes, str]: |
| 77 | mpack = { |
| 78 | "commits": commits, |
| 79 | "snapshots": [], |
| 80 | "objects": [], |
| 81 | "branch_heads": {"main": commits[-1]["commit_id"]}, |
| 82 | } |
| 83 | wire_bytes = msgpack.packb(mpack, use_bin_type=True) |
| 84 | mpack_key = "sha256:" + hashlib.sha256(wire_bytes).hexdigest() |
| 85 | return wire_bytes, mpack_key |
| 86 | |
| 87 | |
async def _store_mpack(wire_bytes: bytes, mpack_key: str) -> None:
    """Write *wire_bytes* to the storage backend under *mpack_key*."""
    # Imported lazily and resolved per call, so this writes to whichever
    # backend ``get_backend()`` returns at the moment the helper runs.
    import musehub.storage.backends as storage_backends

    store = storage_backends.get_backend()
    await store.put_mpack(mpack_key, wire_bytes)
| 92 | |
| 93 | |
async def _enqueue_and_process(
    session: AsyncSession,
    repo_id: str,
    mpack_key: str,
    commits: list[dict],
) -> str:
    """Insert a pending ``mpack.index`` job for *mpack_key* and run it inline.

    The job payload describes a push of *commits* to ``main``, with the last
    commit as the new head. The session is committed once after the job row
    is added and again after ``process_mpack_index_job`` returns.

    Returns:
        The id of the job that was created and processed.
    """
    from musehub.core.genesis import compute_job_id
    from musehub.db.musehub_jobs_models import MusehubBackgroundJob

    created_at = datetime.datetime.now(datetime.timezone.utc)
    new_job_id = compute_job_id(repo_id, "mpack.index", created_at.isoformat())

    job_payload = {
        "mpack_key": mpack_key,
        "branch": "main",
        "head": commits[-1]["commit_id"],
        "pusher_id": "",
        "declared_objects_count": 0,
        "declared_commits_count": len(commits),
    }
    job_row = MusehubBackgroundJob(
        job_id=new_job_id,
        repo_id=repo_id,
        job_type="mpack.index",
        payload=job_payload,
        status="pending",
        created_at=created_at,
        attempt=0,
    )
    session.add(job_row)
    # The processor receives only the job id, so persist the row first
    # (presumably it looks the job up by id).
    await session.commit()

    await process_mpack_index_job(session, new_job_id)
    # Commit the indexer's writes before callers query them.
    await session.commit()
    return new_job_id
| 125 | |
| 126 | |
| 127 | # --------------------------------------------------------------------------- |
| 128 | # CG-1 |
| 129 | # --------------------------------------------------------------------------- |
| 130 | |
@pytest.mark.skipif(_PROCESS_JOB_MISSING, reason="process_mpack_index_job not yet available")
@pytest.mark.asyncio
async def test_cg1_commit_graph_written_for_every_commit(db_session: AsyncSession) -> None:
    """Indexing an mpack writes one musehub_commit_graph row per commit."""
    repo = await create_repo(
        db_session,
        name="cg-test-1",
        owner="gabriel",
        owner_user_id=compute_identity_id(b"gabriel"),
        visibility="public",
        initialize=False,
    )

    chain_len = 20
    commits, _tip = _make_linear_chain(chain_len, seed="cg1")
    wire_bytes, mpack_key = _make_mpack_from_commits(commits)
    await _store_mpack(wire_bytes, mpack_key)
    await _enqueue_and_process(db_session, repo.repo_id, mpack_key, commits)

    wanted = {commit["commit_id"] for commit in commits}
    result = await db_session.execute(
        select(db.MusehubCommitGraph).where(db.MusehubCommitGraph.commit_id.in_(wanted))
    )
    graph_rows = result.scalars().all()
    found = {row.commit_id for row in graph_rows}

    # NOTE(review): the module docstring says CG-1 also covers parent_ids, but
    # only row presence is asserted here. TODO: add a parent_ids check once
    # the column's stored representation is confirmed.
    assert found == wanted, (
        f"expected {chain_len} commit graph rows, got {len(graph_rows)}\n"
        f"missing: {wanted - found}"
    )
| 163 | |
| 164 | |
| 165 | # --------------------------------------------------------------------------- |
| 166 | # CG-2 |
| 167 | # --------------------------------------------------------------------------- |
| 168 | |
@pytest.mark.skipif(_PROCESS_JOB_MISSING, reason="process_mpack_index_job not yet available")
@pytest.mark.asyncio
async def test_cg2_generation_numbers_correct(db_session: AsyncSession) -> None:
    """In a linear chain, the generation number equals the commit's position (root = 0)."""
    repo = await create_repo(
        db_session,
        name="cg-test-2",
        owner="gabriel",
        owner_user_id=compute_identity_id(b"gabriel"),
        visibility="public",
        initialize=False,
    )

    chain_len = 10
    commits, _tip = _make_linear_chain(chain_len, seed="cg2")
    wire_bytes, mpack_key = _make_mpack_from_commits(commits)
    await _store_mpack(wire_bytes, mpack_key)
    await _enqueue_and_process(db_session, repo.repo_id, mpack_key, commits)

    ids = {commit["commit_id"] for commit in commits}
    result = await db_session.execute(
        select(db.MusehubCommitGraph).where(db.MusehubCommitGraph.commit_id.in_(ids))
    )
    by_id = {row.commit_id: row for row in result.scalars().all()}

    # Each commit has a single parent one position earlier, so
    # max(parent generations) + 1 reduces to the commit's index in the chain.
    for position, commit in enumerate(commits):
        graph_row = by_id.get(commit["commit_id"])
        assert graph_row is not None, f"commit {position} missing from graph"
        assert graph_row.generation == position, (
            f"commit {position}: expected generation={position}, got {graph_row.generation}"
        )
| 203 | |
| 204 | |
| 205 | # --------------------------------------------------------------------------- |
| 206 | # CG-3 |
| 207 | # --------------------------------------------------------------------------- |
| 208 | |
@pytest.mark.skipif(_PROCESS_JOB_MISSING, reason="process_mpack_index_job not yet available")
@pytest.mark.asyncio
async def test_cg3_walk_commit_delta_1000_commits_under_50ms(db_session: AsyncSession) -> None:
    """A full delta walk over 1000 indexed commits must finish within 50ms."""
    repo = await create_repo(
        db_session,
        name="cg-test-3",
        owner="gabriel",
        owner_user_id=compute_identity_id(b"gabriel"),
        visibility="public",
        initialize=False,
    )

    chain_len = 1000
    commits, tip = _make_linear_chain(chain_len, seed="cg3")
    wire_bytes, mpack_key = _make_mpack_from_commits(commits)
    await _store_mpack(wire_bytes, mpack_key)
    await _enqueue_and_process(db_session, repo.repo_id, mpack_key, commits)

    from musehub.services.musehub_wire import _walk_commit_delta

    # Time only the walk; indexing above is setup, not the code under test.
    # NOTE(review): a fixed wall-clock budget may be flaky on loaded CI hosts.
    started = time.perf_counter()
    delta = await _walk_commit_delta(db_session, [tip], have=[])
    elapsed_ms = (time.perf_counter() - started) * 1000

    assert len(delta) == chain_len, f"expected {chain_len} commits in delta, got {len(delta)}"
    assert elapsed_ms < 50, (
        f"_walk_commit_delta took {elapsed_ms:.1f}ms for {chain_len} commits — must be < 50ms"
    )
| 238 | |
| 239 | |
| 240 | # --------------------------------------------------------------------------- |
| 241 | # CG-4 |
| 242 | # --------------------------------------------------------------------------- |
| 243 | |
@pytest.mark.skipif(_PROCESS_JOB_MISSING, reason="process_mpack_index_job not yet available")
@pytest.mark.asyncio
async def test_cg4_walk_commit_delta_correctness_with_have(db_session: AsyncSession) -> None:
    """Delta walk with a non-empty 'have' set returns only the commits the receiver lacks.

    Simulates an incremental fetch on a 50-commit chain. The receiver already
    holds commits 0..20, so the delta must be exactly commits 21..49.
    """
    repo = await create_repo(
        db_session,
        name="cg-test-4",
        owner="gabriel",
        owner_user_id=compute_identity_id(b"gabriel"),
        visibility="public",
        initialize=False,
    )

    n = 50
    commits, tip = _make_linear_chain(n, seed="cg4")
    wire_bytes, mpack_key = _make_mpack_from_commits(commits)
    await _store_mpack(wire_bytes, mpack_key)
    await _enqueue_and_process(db_session, repo.repo_id, mpack_key, commits)

    # The receiver's tip is commit `have_idx`, so it holds commits
    # 0..have_idx inclusive: the first have_idx + 1 (= 21) commits.
    # Built in chain order, so the argument order is deterministic.
    have_idx = 20
    have = [c["commit_id"] for c in commits[: have_idx + 1]]
    # NOTE(review): real clients usually advertise only their tip(s). If
    # _walk_commit_delta is meant to exclude all ancestors of each `have`,
    # consider also covering have=[commits[have_idx]["commit_id"]].

    from musehub.services.musehub_wire import _walk_commit_delta

    delta = await _walk_commit_delta(db_session, [tip], have=have)

    expected = {c["commit_id"] for c in commits[have_idx + 1:]}
    got = set(delta.keys())
    assert got == expected, (
        f"delta wrong: expected {len(expected)} commits, got {len(delta)}\n"
        f"extra: {got - expected}\n"
        f"missing: {expected - got}"
    )
File History
1 commit
sha256:7d6dd8f4a89e2d1fef2d84f6e65feaff51385d382f466766b7f690a22ec18e32
fix: fall back to DB ancestry check when mpack-only fast-fo…
Sonnet 4.6
patch
8 days ago