"""TDD — mpack push is async: fast sync path + background indexing. The one principle: Content-addressing is a proof, not a label. The mpack is in MinIO. sha256(wire_bytes) == mpack_key is the proof. Everything that follows — commit rows, snapshot rows — is derived indexing. Derived indexing MUST NOT block the synchronous push response. Synchronous path (must complete < 1s for real-world repo): 1. GET mpack from MinIO 2. Verify sha256(wire_bytes) == mpack_key 3. Advance branch pointer 4. Enqueue mpack.index job 5. Return 200 Background path (mpack.index job): 1. GET mpack from MinIO 2. Reconstruct snapshot deltas → INSERT musehub_snapshots 3. INSERT commits → musehub_commits 4. Return counts Dimensions match the real musehub repo: ~1031 commits, ~700 files per snapshot, ~5 files changed per commit. """ from __future__ import annotations import datetime import hashlib import pathlib import time import httpx import msgpack import pytest import pytest_asyncio pytestmark = pytest.mark.skip(reason="muse wire protocol in flux") from httpx import AsyncClient, ASGITransport from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from musehub.auth.request_signing import MSignContext, require_signed_request, optional_signed_request from musehub.db.musehub_repo_models import MusehubCommit, MusehubCommitRef, MusehubRepo, MusehubSnapshot, MusehubSnapshotRef from musehub.db.database import get_db from musehub.main import app from muse.core.object_store import write_object from muse.core.mpack import build_mpack from muse.core.paths import muse_dir from muse.core.snapshot import compute_commit_id, compute_snapshot_id from muse.core.commits import CommitRecord, write_commit from muse.core.refs import write_branch_ref from muse.core.snapshots import SnapshotRecord, write_snapshot from muse.core.types import blob_id # --------------------------------------------------------------------------- # Dimensions — match real musehub repo # 
# ---------------------------------------------------------------------------

_N_FILES = 700
_N_COMMITS = 1_031
_FILES_CHANGED = 5
_BLOB_SIZE = 512

# unpack-mpack does full indexing synchronously; gate is wall-clock for 1031 commits
_SYNC_GATE_S = 10.0


# ---------------------------------------------------------------------------
# Auth + fixtures
# ---------------------------------------------------------------------------

# Admin identity injected in place of real request signing for every route.
_AUTH_CTX = MSignContext(
    handle="gabriel",
    identity_id="sha256:" + "0" * 64,
    is_agent=False,
    is_admin=True,
)


@pytest_asyncio.fixture()
async def client(db_session: AsyncSession) -> AsyncIterator[AsyncClient]:
    """Yield an ASGI client for ``app`` with DB and auth dependencies overridden.

    ``get_db`` yields the test's ``db_session``, so the routes and the test
    assertions share one session. Both signed-request dependencies resolve to
    ``_AUTH_CTX``.
    """

    async def _override_get_db() -> AsyncIterator[AsyncSession]:
        yield db_session

    app.dependency_overrides[get_db] = _override_get_db
    app.dependency_overrides[require_signed_request] = lambda: _AUTH_CTX
    app.dependency_overrides[optional_signed_request] = lambda: _AUTH_CTX
    try:
        async with AsyncClient(
            transport=ASGITransport(app=app),
            base_url="https://localhost:1337",
        ) as c:
            yield c
    finally:
        # ``app`` is module-global. Always drop the overrides, even if client
        # shutdown raises, so they cannot leak into other tests.
        app.dependency_overrides.clear()


@pytest_asyncio.fixture()
async def repo(client: AsyncClient) -> AsyncIterator[str]:
    """Create an empty public repo owned by ``gabriel``, yield its slug, then delete it."""
    resp = await client.post(
        "/api/repos",
        json={"owner": "gabriel", "name": "mpack-async-e2e", "visibility": "public", "initialize": False},
    )
    assert resp.status_code in (200, 201), resp.text
    data = resp.json()
    yield data["slug"]
    # Best-effort cleanup; the delete status is intentionally not asserted.
    await client.delete(f"/api/repos/{data['repoId']}")


# ---------------------------------------------------------------------------
# Local repo builder (shared with test_mpack_delta_e2e)
# ---------------------------------------------------------------------------


def _make_repo(tmp: pathlib.Path) -> pathlib.Path:
    """Create an empty on-disk muse repository layout at *tmp* with HEAD on ``main``."""
    tmp.mkdir(parents=True, exist_ok=True)
    dot = muse_dir(tmp)
    dot.mkdir()
    (dot / "repo.json").write_text('{"repo_id":"async-e2e","owner":"gabriel"}')
    for d in ("commits", "snapshots", "objects"):
        (dot / d).mkdir()
    (dot / "refs" / "heads").mkdir(parents=True)
    (dot / "HEAD").write_text("ref: refs/heads/main\n")
    (dot / "config.toml").write_text("")
    return tmp


def _populate(repo: pathlib.Path) -> str:
    """Write a linear history of ``_N_COMMITS`` commits on ``main``; return the tip id.

    Each commit's manifest is the ``_N_FILES``-entry base manifest with
    ``_FILES_CHANGED`` paths pointed at blobs whose content is unique to that
    commit. Commits are spaced 60 s apart, starting 2026-01-01 UTC.
    """
    blob_ids: list[str] = []
    for i in range(_N_FILES):
        data = f"base-{i:06d}".encode() + b"x" * _BLOB_SIZE
        oid = blob_id(data)
        write_object(repo, oid, data)
        blob_ids.append(oid)

    base_manifest: dict[str, str] = {
        f"src/file_{i:04d}.py": blob_ids[i] for i in range(_N_FILES)
    }

    parent: str | None = None
    tip = ""
    ts = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc)

    for i in range(_N_COMMITS):
        # Start from the base each time, so every commit differs from the base
        # by exactly _FILES_CHANGED paths.
        manifest = dict(base_manifest)
        for j in range(_FILES_CHANGED):
            idx = (i * _FILES_CHANGED + j) % _N_FILES
            variant = f"commit-{i:05d}-file-{j}".encode() + b"y" * _BLOB_SIZE
            variant_oid = blob_id(variant)
            write_object(repo, variant_oid, variant)
            manifest[f"src/file_{idx:04d}.py"] = variant_oid

        sid = compute_snapshot_id(manifest)
        write_snapshot(repo, SnapshotRecord(snapshot_id=sid, manifest=manifest))

        msg = f"commit-{i:05d}"
        cid = compute_commit_id(
            parent_ids=[parent] if parent else [],
            snapshot_id=sid,
            message=msg,
            committed_at_iso=ts.isoformat(),
            author="gabriel",
        )
        write_commit(repo, CommitRecord(
            commit_id=cid,
            branch="main",
            snapshot_id=sid,
            message=msg,
            committed_at=ts,
            parent_commit_id=parent,
            parent2_commit_id=None,
            author="gabriel",
            metadata={},
            structured_delta=None,
            sem_ver_bump="none",
            breaking_changes=[],
            agent_id="",
            model_id="",
            toolchain_id="",
            prompt_hash="",
            signature="",
            signer_key_id="",
        ))
        parent = cid
        tip = cid
        ts += datetime.timedelta(seconds=60)

    write_branch_ref(repo, "main", tip)
    return tip


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------


async def _upload_mpack(client: AsyncClient, repo_slug: str, wire_bytes: bytes) -> str:
    """Presign an upload for *wire_bytes*, PUT them to object storage, and return the mpack key.

    The key is the content address ``sha256:<hex>`` of the exact wire bytes.
    """
    mpack_key = "sha256:" + hashlib.sha256(wire_bytes).hexdigest()
    presign_resp = await client.post(
        f"/gabriel/{repo_slug}/push/mpack-presign",
        content=msgpack.packb(
            {"mpack_key": mpack_key, "size_bytes": len(wire_bytes)},
            use_bin_type=True,
        ),
        headers={"Content-Type": "application/x-msgpack"},
    )
    assert presign_resp.status_code == 200, presign_resp.text
    presign = presign_resp.json()
    # Accept either the snake_case or the camelCase spelling of the field.
    upload_url = presign.get("upload_url") or presign.get("uploadUrl")
    # Fail with the payload in view rather than letting httpx choke on None.
    assert upload_url, f"presign response has no upload URL: {presign!r}"
    async with httpx.AsyncClient() as raw:
        put_resp = await raw.put(upload_url, content=wire_bytes)
    assert put_resp.status_code in (200, 204), (put_resp.status_code, put_resp.text)
    return mpack_key


async def _push_synthetic_repo(
    client: AsyncClient, repo_slug: str, tmp_path: pathlib.Path
) -> tuple[str, httpx.Response, float]:
    """Build the synthetic repo, upload its mpack, and call unpack-mpack for ``main``.

    Returns ``(head, unpack_resp, elapsed)``. *elapsed* is the wall-clock
    seconds of the unpack-mpack request alone; building and uploading the
    mpack are excluded.
    """
    local_repo = _make_repo(tmp_path / "repo")
    head = _populate(local_repo)
    mpack = build_mpack(local_repo, [head], have=[])
    wire_bytes = msgpack.packb(mpack, use_bin_type=True)
    mpack_key = await _upload_mpack(client, repo_slug, wire_bytes)

    t0 = time.perf_counter()
    unpack_resp = await client.post(
        f"/gabriel/{repo_slug}/push/unpack-mpack",
        content=msgpack.packb(
            {"mpack_key": mpack_key, "branch": "main", "head": head},
            use_bin_type=True,
        ),
        headers={"Content-Type": "application/x-msgpack"},
    )
    elapsed = time.perf_counter() - t0
    return head, unpack_resp, elapsed


# ---------------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_unpack_mpack_returns_fast(
    client: AsyncClient, repo: str, tmp_path: pathlib.Path
) -> None:
    """unpack-mpack must complete within the gate for the full repo.

    The synchronous path does all indexing inline (commits, snapshots,
    objects, branch advance). This gate proves the full pipeline stays within
    an acceptable wall-clock budget for the real-world repo dimensions.
    """
    head, unpack_resp, elapsed = await _push_synthetic_repo(client, repo, tmp_path)

    assert unpack_resp.status_code == 200, unpack_resp.text
    result = unpack_resp.json()

    # Branch pointer must be set
    refs_resp = await client.get(f"/gabriel/{repo}/refs")
    assert refs_resp.status_code == 200, refs_resp.text
    assert refs_resp.json().get("branch_heads", {}).get("main") == head

    # Commits and snapshots must be written inline
    assert result.get("commits_written") == _N_COMMITS, result
    assert result.get("snapshots_written") == _N_COMMITS, result

    assert elapsed < _SYNC_GATE_S, (
        f"unpack-mpack took {elapsed:.2f}s — gate is {_SYNC_GATE_S}s\n"
        f" {_N_COMMITS} commits × {_N_FILES} files × {_FILES_CHANGED} changed/commit"
    )
    print(
        f"\n {_N_COMMITS} commits × {_N_FILES} files × {_FILES_CHANGED} changed/commit\n"
        f" sync elapsed: {elapsed*1000:.0f}ms (gate {_SYNC_GATE_S*1000:.0f}ms)\n"
        f" commits: {result.get('commits_written')}\n"
        f" snapshots: {result.get('snapshots_written')}"
    )


@pytest.mark.asyncio
async def test_mpack_index_job_populates_commits_and_snapshots(
    client: AsyncClient, repo: str, tmp_path: pathlib.Path, db_session: AsyncSession
) -> None:
    """All commits and snapshots must be in PG after unpack-mpack returns.

    Indexing is synchronous: the route writes commits, snapshots, and objects
    inline before returning 200. This test verifies the DB state is correct.
    """
    _, unpack_resp, _ = await _push_synthetic_repo(client, repo, tmp_path)
    assert unpack_resp.status_code == 200, unpack_resp.text
    result = unpack_resp.json()
    assert result.get("commits_written") == _N_COMMITS, result
    assert result.get("snapshots_written") == _N_COMMITS, result

    # Verify rows are actually in PG
    repo_row = (await db_session.execute(
        select(MusehubRepo).where(MusehubRepo.slug == repo)
    )).scalar_one()

    # Count distinct ids in SQL instead of materializing ~1000 ORM objects per
    # table only to take len() of the list.
    commit_count = (await db_session.execute(
        select(func.count(MusehubCommit.commit_id.distinct()))
        .select_from(MusehubCommit)
        .join(MusehubCommitRef, MusehubCommitRef.commit_id == MusehubCommit.commit_id)
        .where(MusehubCommitRef.repo_id == repo_row.repo_id)
    )).scalar_one()
    assert commit_count >= _N_COMMITS, commit_count

    snap_count = (await db_session.execute(
        select(func.count(MusehubSnapshot.snapshot_id.distinct()))
        .select_from(MusehubSnapshot)
        .join(MusehubSnapshotRef, MusehubSnapshotRef.snapshot_id == MusehubSnapshot.snapshot_id)
        .where(MusehubSnapshotRef.repo_id == repo_row.repo_id)
    )).scalar_one()
    assert snap_count >= _N_COMMITS, snap_count
@pytest.mark.asyncio
async def test_branch_head_correct_before_indexing(
    client: AsyncClient, repo: str, tmp_path: pathlib.Path
) -> None:
    """Branch pointer is correct as soon as unpack-mpack returns.

    This is the fundamental correctness guarantee: a push is 'done' once the
    branch pointer is updated. In the target design indexing becomes eventual
    (a background job) while the ref update stays immediate. Today the route
    indexes inline, so this test pins only the ref half of that contract. It
    must keep passing once indexing moves off the request path.
    """
    local_repo = _make_repo(tmp_path / "repo")
    head = _populate(local_repo)
    mpack = build_mpack(local_repo, [head], have=[])
    wire_bytes = msgpack.packb(mpack, use_bin_type=True)
    mpack_key = await _upload_mpack(client, repo, wire_bytes)

    unpack_resp = await client.post(
        f"/gabriel/{repo}/push/unpack-mpack",
        content=msgpack.packb(
            {"mpack_key": mpack_key, "branch": "main", "head": head},
            use_bin_type=True,
        ),
        headers={"Content-Type": "application/x-msgpack"},
    )
    assert unpack_resp.status_code == 200, unpack_resp.text

    # Read the ref immediately, with no waiting or polling for any indexing work.
    refs_resp = await client.get(f"/gabriel/{repo}/refs")
    assert refs_resp.status_code == 200, refs_resp.text
    assert refs_resp.json()["branch_heads"]["main"] == head