"""Wire protocol step 2A — Pack performance gate (client-side).

Ticket #45, Step 2A: the BFS walk plus mpack serialization must complete in
under 5s.

The client-side pack phase consists of two operations, and this test times
them together:

    build_mpack(root, [head], have=[])   — BFS walk + collect all objects
    msgpack.packb(mpack, use_bin_type)   — serialize mpack to wire bytes
    Total                                < 5s

Repo size: 100 commits, 600 unique objects, 4 KiB blobs (~2.4 MB raw).
600 objects sits just above _PRESIGN_OBJECT_THRESHOLD (500) — the size class
that triggers the new mpack upload path instead of N individual PUTs.

If the assertion fails, step 2A is not done. Do not move to step 2B.
"""

from __future__ import annotations

import datetime
import pathlib
import time

import msgpack
import pytest

from muse.core.object_store import write_object
from muse.core.mpack import build_mpack
from muse.core.paths import muse_dir
from muse.core.ids import (
    hash_commit as compute_commit_id,
    hash_snapshot as compute_snapshot_id,
)
from muse.core.refs import write_branch_ref
from muse.core.commits import (
    CommitRecord,
    write_commit,
)
from muse.core.snapshots import (
    SnapshotRecord,
    write_snapshot,
)
from muse.core.types import blob_id


# ---------------------------------------------------------------------------
# Gate constants
# ---------------------------------------------------------------------------

_N_COMMITS = 100
_N_OBJECTS = 600  # just above _PRESIGN_OBJECT_THRESHOLD = 500
_BLOB_SIZE = 4096  # 4 KiB per object → ~2.4 MB raw total
_TOTAL_GATE_S = 5.0


# ---------------------------------------------------------------------------
# Repo fixture
# ---------------------------------------------------------------------------


def _make_repo(tmp: pathlib.Path) -> pathlib.Path:
    """Create an empty repository skeleton under *tmp* and return *tmp*.

    The directory returned by ``muse_dir(tmp)`` is created and seeded with
    ``repo.json``, the ``commits``/``snapshots``/``objects`` directories,
    ``refs/heads``, a ``HEAD`` file pointing at ``refs/heads/main`` and an
    empty ``config.toml``.
    """
    tmp.mkdir(parents=True, exist_ok=True)

    store = muse_dir(tmp)
    store.mkdir()

    repo_json = store / "repo.json"
    repo_json.write_text('{"repo_id":"step2a","owner":"gabriel"}')

    for subdir in ("commits", "snapshots", "objects"):
        (store / subdir).mkdir()
    (store / "refs" / "heads").mkdir(parents=True)

    head_file = store / "HEAD"
    head_file.write_text("ref: refs/heads/main\n")
    config_file = store / "config.toml"
    config_file.write_text("")

    return tmp


def _populate(repo: pathlib.Path) -> str:
    """Fill *repo* with the gate workload and return the tip commit ID.

    Writes ``_N_OBJECTS`` blobs, one snapshot whose manifest lists all of
    them, and a linear chain of ``_N_COMMITS`` commits that all reference
    that snapshot. The ``main`` branch ref is pointed at the last commit.
    """
    # Every blob is a unique prefix followed by the same filler bytes.
    filler = b"x" * _BLOB_SIZE
    manifest: dict[str, str] = {}
    for index in range(_N_OBJECTS):
        payload = f"step2a-{index:08d}-".encode() + filler
        object_id = blob_id(payload)
        write_object(repo, object_id, payload)
        manifest[f"file_{index:04d}.py"] = object_id

    snapshot_id = compute_snapshot_id(manifest)
    write_snapshot(
        repo,
        SnapshotRecord(snapshot_id=snapshot_id, manifest=manifest),
    )

    # All commits share one fixed timestamp; message and parent make each
    # set of hash inputs distinct.
    committed_at = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc)
    committed_at_iso = committed_at.isoformat()

    previous: str | None = None
    head = ""
    for index in range(_N_COMMITS):
        message = f"commit-{index:05d}"
        commit_id = compute_commit_id(
            parent_ids=[previous] if previous else [],
            snapshot_id=snapshot_id,
            message=message,
            committed_at_iso=committed_at_iso,
            author="gabriel",
        )
        write_commit(
            repo,
            CommitRecord(
                commit_id=commit_id,
                branch="main",
                snapshot_id=snapshot_id,
                message=message,
                committed_at=committed_at,
                parent_commit_id=previous,
                parent2_commit_id=None,
                author="gabriel",
                metadata={},
                structured_delta=None,
                sem_ver_bump="none",
                breaking_changes=[],
                agent_id="",
                model_id="",
                toolchain_id="",
                prompt_hash="",
                signature="",
                signer_key_id="",
            ),
        )
        previous = commit_id
        head = commit_id

    write_branch_ref(repo, "main", head)
    return head


# ---------------------------------------------------------------------------
# THE test
# ---------------------------------------------------------------------------


def test_pack_step2a_performance_gate(tmp_path: pathlib.Path) -> None:
    """Step 2A gate: build_mpack + msgpack.packb must finish in under 5s.

    This is a pass/fail performance gate, not a benchmark. Repo setup is
    excluded from the measurement; only the pack and serialize calls are
    timed. If the assertion fails, step 2A is not done and step 2B must not
    be attempted.
    """
    repo = _make_repo(tmp_path / "repo")
    head = _populate(repo)

    # Timed region: nothing but the two pack-phase calls.
    started = time.perf_counter()
    mpack = build_mpack(repo, [head], have=[])
    wire_bytes = msgpack.packb(mpack, use_bin_type=True)
    total_s = time.perf_counter() - started

    assert isinstance(wire_bytes, bytes), "msgpack.packb returned non-bytes"
    wire_len = len(wire_bytes)
    assert wire_len > 0, "mpack serialized to empty bytes"

    commit_count = len(mpack.get("commits", []))
    object_count = len(mpack.get("objects", []))
    wire_kib = wire_len / 1024

    assert commit_count == _N_COMMITS, (
        f"expected {_N_COMMITS} commits in mpack, got {commit_count}"
    )
    assert object_count == _N_OBJECTS, (
        f"expected {_N_OBJECTS} objects in mpack, got {object_count}"
    )
    assert total_s < _TOTAL_GATE_S, (
        f"Step 2A FAIL: build_mpack + packb took {total_s:.2f}s — gate is {_TOTAL_GATE_S}s "
        f"({commit_count} commits, {object_count} objects, {wire_kib:.1f} KiB wire)"
    )

    print(
        f"\n Step 2A — Pack (client-side)\n"
        f" Commits: {commit_count}\n"
        f" Objects: {object_count}\n"
        f" Wire size: {wire_kib:.1f} KiB\n"
        f" Total time: {total_s:.3f}s (gate {_TOTAL_GATE_S}s) ✅"
    )