"""TDD — Phase 2: commit graph for O(1) BFS-frontier DAG walks (issue #63). CG-1 After process_mpack_index_job, every commit in the mpack has a row in musehub_commit_graph with correct parent_ids. CG-2 Generation numbers are monotonically correct: - root commit → generation 0 - each commit → generation = max(parent generations) + 1 CG-3 _walk_commit_delta for 1000 commits completes in under 50ms (proves bulk-query path, not one-per-commit path). CG-4 _walk_commit_delta correctness: returns exactly the right delta when the receiver already has some commits (partial clone / incremental fetch). """ from __future__ import annotations import datetime import hashlib import time import msgpack import pytest from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from muse.core.types import blob_id from musehub.db import musehub_repo_models as db from musehub.core.genesis import compute_identity_id from musehub.services.musehub_repository import create_repo try: from musehub.services.musehub_wire import process_mpack_index_job _PROCESS_JOB_MISSING = False except ImportError: process_mpack_index_job = None # type: ignore[assignment] _PROCESS_JOB_MISSING = True # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _make_linear_chain(n: int, seed: str = "cg") -> tuple[list[dict], str]: """Build a linear commit chain of length n. Returns (commits_list, tip_commit_id). Each commit dict has: commit_id, parent_commit_id, snapshot_id, branch, message. """ commits = [] parent_id: str | None = None for i in range(n): snap_id = blob_id(f"{seed}-snap-{i}".encode()) msg_bytes = f"{seed}-commit-{i}-parent={parent_id}".encode() cid = blob_id(msg_bytes) commits.append({ "commit_id": cid, "branch": "main", "message": f"commit {i}", "author": "gabriel", "committed_at": datetime.datetime(2025, 1, 1, tzinfo=datetime.timezone.utc).isoformat(), "parent_commit_id": parent_id, "parent2_commit_id": None, "snapshot_id": snap_id, "agent_id": "", "model_id": "", "toolchain_id": "", "sem_ver_bump": "none", "breaking_changes": [], "signature": "", "signer_key_id": "", "signer_public_key": "", "prompt_hash": "", }) parent_id = cid return commits, parent_id # type: ignore[return-value] def _make_mpack_from_commits(commits: list[dict]) -> tuple[bytes, str]: mpack = { "commits": commits, "snapshots": [], "objects": [], "branch_heads": {"main": commits[-1]["commit_id"]}, } wire_bytes = msgpack.packb(mpack, use_bin_type=True) mpack_key = "sha256:" + hashlib.sha256(wire_bytes).hexdigest() return wire_bytes, mpack_key async def _store_mpack(wire_bytes: bytes, mpack_key: str) -> None: import musehub.storage.backends as _backends_mod backend = _backends_mod.get_backend() await backend.put_mpack(mpack_key, wire_bytes) async def _enqueue_and_process( session: AsyncSession, repo_id: str, mpack_key: str, commits: list[dict], ) -> str: from musehub.core.genesis import compute_job_id from musehub.db.musehub_jobs_models import MusehubBackgroundJob now = datetime.datetime.now(datetime.timezone.utc) job_id = compute_job_id(repo_id, "mpack.index", now.isoformat()) session.add(MusehubBackgroundJob( job_id=job_id, repo_id=repo_id, job_type="mpack.index", payload={ "mpack_key": mpack_key, "branch": "main", "head": commits[-1]["commit_id"], "pusher_id": "", "declared_objects_count": 0, "declared_commits_count": len(commits), }, status="pending", created_at=now, attempt=0, )) await session.commit() await process_mpack_index_job(session, job_id) await session.commit() return job_id # --------------------------------------------------------------------------- # CG-1 # --------------------------------------------------------------------------- @pytest.mark.skipif(_PROCESS_JOB_MISSING, reason="process_mpack_index_job not yet available") @pytest.mark.asyncio async def test_cg1_commit_graph_written_for_every_commit(db_session: AsyncSession) -> None: """Every commit in the mpack must have a row in musehub_commit_graph.""" repo = await create_repo( db_session, name="cg-test-1", owner="gabriel", owner_user_id=compute_identity_id(b"gabriel"), visibility="public", initialize=False, ) n = 20 commits, tip = _make_linear_chain(n, seed="cg1") wire_bytes, mpack_key = _make_mpack_from_commits(commits) await _store_mpack(wire_bytes, mpack_key) await _enqueue_and_process(db_session, repo.repo_id, mpack_key, commits) expected_cids = {c["commit_id"] for c in commits} rows_q = await db_session.execute( select(db.MusehubCommitGraph).where( db.MusehubCommitGraph.commit_id.in_(expected_cids) ) ) rows = rows_q.scalars().all() indexed_cids = {r.commit_id for r in rows} assert indexed_cids == expected_cids, ( f"expected {n} commit graph rows, got {len(rows)}\n" f"missing: {expected_cids - indexed_cids}" ) # --------------------------------------------------------------------------- # CG-2 # --------------------------------------------------------------------------- @pytest.mark.skipif(_PROCESS_JOB_MISSING, reason="process_mpack_index_job not yet available") @pytest.mark.asyncio async def test_cg2_generation_numbers_correct(db_session: AsyncSession) -> None: """Root commit → generation 0; each subsequent commit → parent_generation + 1.""" repo = await create_repo( db_session, name="cg-test-2", owner="gabriel", owner_user_id=compute_identity_id(b"gabriel"), visibility="public", initialize=False, ) n = 10 commits, tip = _make_linear_chain(n, seed="cg2") wire_bytes, mpack_key = _make_mpack_from_commits(commits) await _store_mpack(wire_bytes, mpack_key) await _enqueue_and_process(db_session, repo.repo_id, mpack_key, commits) expected_cids_cg2 = {c["commit_id"] for c in commits} rows_q = await db_session.execute( select(db.MusehubCommitGraph).where( db.MusehubCommitGraph.commit_id.in_(expected_cids_cg2) ) ) rows = {r.commit_id: r for r in rows_q.scalars().all()} for i, commit in enumerate(commits): cid = commit["commit_id"] row = rows.get(cid) assert row is not None, f"commit {i} missing from graph" assert row.generation == i, ( f"commit {i}: expected generation={i}, got {row.generation}" ) # --------------------------------------------------------------------------- # CG-3 # --------------------------------------------------------------------------- @pytest.mark.skipif(_PROCESS_JOB_MISSING, reason="process_mpack_index_job not yet available") @pytest.mark.asyncio async def test_cg3_walk_commit_delta_1000_commits_under_50ms(db_session: AsyncSession) -> None: """_walk_commit_delta for 1000 commits must complete in under 50ms.""" repo = await create_repo( db_session, name="cg-test-3", owner="gabriel", owner_user_id=compute_identity_id(b"gabriel"), visibility="public", initialize=False, ) n = 1000 commits, tip = _make_linear_chain(n, seed="cg3") wire_bytes, mpack_key = _make_mpack_from_commits(commits) await _store_mpack(wire_bytes, mpack_key) await _enqueue_and_process(db_session, repo.repo_id, mpack_key, commits) from musehub.services.musehub_wire import _walk_commit_delta t0 = time.perf_counter() delta = await _walk_commit_delta(db_session, [tip], have=[]) elapsed_ms = (time.perf_counter() - t0) * 1000 assert len(delta) == n, f"expected {n} commits in delta, got {len(delta)}" assert elapsed_ms < 50, ( f"_walk_commit_delta took {elapsed_ms:.1f}ms for {n} commits — must be < 50ms" ) # --------------------------------------------------------------------------- # CG-4 # --------------------------------------------------------------------------- @pytest.mark.skipif(_PROCESS_JOB_MISSING, reason="process_mpack_index_job not yet available") @pytest.mark.asyncio async def test_cg4_walk_commit_delta_correctness_with_have(db_session: AsyncSession) -> None: """Delta walk with a non-empty 'have' set returns only the commits the receiver lacks.""" repo = await create_repo( db_session, name="cg-test-4", owner="gabriel", owner_user_id=compute_identity_id(b"gabriel"), visibility="public", initialize=False, ) n = 50 commits, tip = _make_linear_chain(n, seed="cg4") wire_bytes, mpack_key = _make_mpack_from_commits(commits) await _store_mpack(wire_bytes, mpack_key) await _enqueue_and_process(db_session, repo.repo_id, mpack_key, commits) # Receiver already has the first 20 commits. have_idx = 20 have_commit_id = commits[have_idx]["commit_id"] have = {c["commit_id"] for c in commits[: have_idx + 1]} from musehub.services.musehub_wire import _walk_commit_delta delta = await _walk_commit_delta(db_session, [tip], have=list(have), ) expected = {c["commit_id"] for c in commits[have_idx + 1:]} assert set(delta.keys()) == expected, ( f"delta wrong: expected {len(expected)} commits, got {len(delta)}\n" f"extra: {set(delta.keys()) - expected}\n" f"missing: {expected - set(delta.keys())}" )