"""TDD — mpack push path must not read snapshot manifests redundantly. Today the mpack path reads 718K manifest entries three times: 1. snapshots_list loading — full manifests, redundant on the mpack path 2. collect_blob_ids — full manifests, used only for progress count 3. build_mpack/_build_snapshot_deltas — full manifests (the needed pass) Passes 1 and 2 are dead work on the mpack path. The fix: a new function collect_blob_ids_from_deltas() extracts object IDs directly from the already-computed delta list — zero additional disk reads. The mpack push path calls _build_snapshot_deltas once, then derives both the wire delta list AND the object ID set from that single pass. Correctness invariant: collect_blob_ids_from_deltas(deltas) == collect_blob_ids(repo, [head]) Performance gate for 1031 commits × 700 files × 5 changed/commit: collect_blob_ids_from_deltas: < 10ms (pure in-memory, no disk I/O) _build_snapshot_deltas: < 500ms (one disk read per snapshot) Dimensions match the real musehub repo. """ from __future__ import annotations import datetime import pathlib import time import pytest from muse.core.object_store import write_object from muse.core.mpack import ( _build_snapshot_deltas, collect_blob_ids, collect_blob_ids_from_deltas, ) from muse.core.paths import muse_dir from muse.core.ids import hash_commit as compute_commit_id, hash_snapshot as compute_snapshot_id from muse.core.graph import iter_ancestors from muse.core.refs import write_branch_ref from muse.core.commits import ( CommitRecord, read_commit, write_commit, ) from muse.core.snapshots import ( SnapshotRecord, write_snapshot, ) from muse.core.types import blob_id # --------------------------------------------------------------------------- # Dimensions — match real musehub repo # --------------------------------------------------------------------------- _N_FILES = 700 _N_COMMITS = 1_031 _FILES_CHANGED = 5 _BLOB_SIZE = 512 # --------------------------------------------------------------------------- # Repo builder # --------------------------------------------------------------------------- def _make_repo(tmp: pathlib.Path) -> tuple[pathlib.Path, str]: """Return (repo_root, head_commit_id).""" tmp.mkdir(parents=True, exist_ok=True) dot = muse_dir(tmp) dot.mkdir() (dot / "repo.json").write_text('{"repo_id":"snap-load-test","owner":"gabriel"}') for d in ("commits", "snapshots", "objects"): (dot / d).mkdir() (dot / "refs" / "heads").mkdir(parents=True) (dot / "HEAD").write_text("ref: refs/heads/main\n") (dot / "config.toml").write_text("") blob_ids: list[str] = [] for i in range(_N_FILES): data = f"base-{i:06d}".encode() + b"x" * _BLOB_SIZE oid = blob_id(data) write_object(tmp, oid, data) blob_ids.append(oid) base_manifest: dict[str, str] = { f"src/file_{i:04d}.py": blob_ids[i] for i in range(_N_FILES) } parent: str | None = None tip = "" ts = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc) for i in range(_N_COMMITS): manifest = dict(base_manifest) for j in range(_FILES_CHANGED): idx = (i * _FILES_CHANGED + j) % _N_FILES variant = f"commit-{i:05d}-file-{j}".encode() + b"y" * _BLOB_SIZE variant_oid = blob_id(variant) write_object(tmp, variant_oid, variant) manifest[f"src/file_{idx:04d}.py"] = variant_oid sid = compute_snapshot_id(manifest) write_snapshot(tmp, SnapshotRecord(snapshot_id=sid, manifest=manifest)) msg = f"commit-{i:05d}" cid = compute_commit_id( parent_ids=[parent] if parent else [], snapshot_id=sid, message=msg, committed_at_iso=ts.isoformat(), author="gabriel", ) write_commit(tmp, CommitRecord( commit_id=cid, branch="main", snapshot_id=sid, message=msg, committed_at=ts, parent_commit_id=parent, parent2_commit_id=None, author="gabriel", metadata={}, structured_delta=None, sem_ver_bump="none", breaking_changes=[], agent_id="", model_id="", toolchain_id="", prompt_hash="", signature="", signer_key_id="", )) parent = cid tip = cid ts += datetime.timedelta(seconds=60) write_branch_ref(tmp, "main", tip) return tmp, tip # --------------------------------------------------------------------------- # Tests # --------------------------------------------------------------------------- def test_collect_blob_ids_from_deltas_correctness(tmp_path: pathlib.Path) -> None: """collect_blob_ids_from_deltas returns the same set as collect_blob_ids. The delta-based method must be a drop-in replacement for the manifest-scan method: same objects, same count. This is the correctness invariant. """ repo, head = _make_repo(tmp_path / "repo") commits_oldest_first = list(reversed(list(iter_ancestors(repo, [head])))) deltas = _build_snapshot_deltas(repo, commits_oldest_first) ids_from_deltas = set(collect_blob_ids_from_deltas(deltas)) ids_from_manifests = set(collect_blob_ids(repo, [head])) assert ids_from_deltas == ids_from_manifests, ( f"delta method returned {len(ids_from_deltas)} IDs, " f"manifest method returned {len(ids_from_manifests)} IDs\n" f" only in deltas: {len(ids_from_deltas - ids_from_manifests)}\n" f" only in manifests: {len(ids_from_manifests - ids_from_deltas)}" ) def test_collect_blob_ids_from_deltas_is_fast(tmp_path: pathlib.Path) -> None: """collect_blob_ids_from_deltas must run in < 10ms — it is pure in-memory. The deltas are already computed (one disk read per snapshot in _build_snapshot_deltas). Extracting oids from delta_upsert.values() is pure dict iteration — no disk I/O, no hashing, no sorting beyond the final dedup. This gate proves we are not sneaking any per-delta disk reads back in. """ repo, head = _make_repo(tmp_path / "repo") commits_oldest_first = list(reversed(list(iter_ancestors(repo, [head])))) deltas = _build_snapshot_deltas(repo, commits_oldest_first) t0 = time.perf_counter() ids = collect_blob_ids_from_deltas(deltas) elapsed_ms = (time.perf_counter() - t0) * 1000 assert elapsed_ms < 10, ( f"collect_blob_ids_from_deltas took {elapsed_ms:.1f}ms — " f"expected < 10ms (pure in-memory, no disk I/O)\n" f" {len(ids)} object IDs from {len(deltas)} deltas" ) print( f"\n {_N_COMMITS} commits × {_N_FILES} files × {_FILES_CHANGED} changed\n" f" delta count: {len(deltas)}\n" f" object count: {len(ids)}\n" f" elapsed: {elapsed_ms:.2f}ms" ) def test_build_snapshot_deltas_single_pass(tmp_path: pathlib.Path) -> None: """_build_snapshot_deltas reads each snapshot exactly once. Proves that the authoritative manifest pass is O(N) disk reads, not O(N×F). After this pass, all subsequent operations (collect_blob_ids_from_deltas, build_mpack_from_walk) must derive their data from the delta list — no additional snapshot reads. """ repo, head = _make_repo(tmp_path / "repo") read_count = 0 from muse.core.snapshots import read_snapshot as original_read_snapshot import muse.core.mpack as _pack calls: list[str] = [] def counting_read_snapshot(root: pathlib.Path, sid: str) -> "SnapshotRecord | None": calls.append(sid) return original_read_snapshot(root, sid) _pack.read_snapshot = counting_read_snapshot try: commits_oldest_first = list(reversed(list(iter_ancestors(repo, [head])))) deltas = _build_snapshot_deltas(repo, commits_oldest_first) unique_reads = len(set(calls)) finally: _pack.read_snapshot = original_read_snapshot assert unique_reads == _N_COMMITS, ( f"_build_snapshot_deltas read {unique_reads} unique snapshots, " f"expected exactly {_N_COMMITS}" ) assert len(calls) == _N_COMMITS, ( f"_build_snapshot_deltas called read_snapshot {len(calls)} times, " f"expected exactly {_N_COMMITS} (one per commit, no duplicates)" )