"""MuseWire CLI benchmark — times actual muse CLI commands against localhost and staging. Every operation uses the muse CLI. No HTTP clients. No internal imports. Usage: python3 tests/bench_cli.py --size xs python3 tests/bench_cli.py --size xs s m --hubs localhost staging python3 tests/bench_cli.py --size all --runs 3 """ from __future__ import annotations import argparse import itertools import json import math import os import re import shutil import statistics import subprocess import sys import tempfile import time from pathlib import Path from mnemonic import Mnemonic import datetime from muse.core import transport, mpack from muse.core.object_store import write_object, _created_object_shards from muse.core.paths import muse_dir, server_objects_dir # noqa: F401 from muse.core.snapshot import compute_commit_id, compute_snapshot_id from muse.core.commits import CommitRecord, write_commit from muse.core.refs import write_branch_ref from muse.core.snapshots import SnapshotRecord, write_snapshot from muse.core.types import blob_id, hash_file import musehub.services.musehub_wire as musehub_wire REPO_ROOT = Path(__file__).parent.parent LOCALHOST = "https://localhost:1337" STAGING = "https://staging.musehub.ai" HUB_URLS = {"localhost": LOCALHOST, "staging": STAGING} BENCH_PREFIX = "bench-" SEED_PREFIX = "bench-seed-" # persistent — never auto-purged CACHE_DIR = Path.home() / ".cache" / "muse_bench" # Repos that must never be deleted under any circumstances. _PERMANENT_REPOS: frozenset[str] = frozenset({ "muse", "musehub", "agentception", "contracts", "stori", "Stori", "maestro", "muse-zsh", "identity", }) # Exact pattern a transient bench repo name must match before deletion is allowed. 
# Covers all historical naming conventions: # bench-push-xs-0-abc123 (current: integer run index) # bench-push-xs-p2-abc123 (old: letter-prefixed run index) # bench-fetch-xs-abc123 (old: no run index, hex only) # bench-clone-xs-debug-abc123 (ad-hoc debug runs) # bench-reftest-abc123 (ref-test repos) # Also covers ad-hoc debug/diag repos created during debugging sessions. _TRANSIENT_RE = re.compile( r"^bench-(push|clone|fetch|pull)-[a-z]+-([a-z]*\d+|debug)-[0-9a-f]{6,}$" r"|^bench-(push|clone|fetch|pull)-[a-z]+-[0-9a-f]{6,}$" r"|^bench-reftest-[0-9a-f]{6,}$" r"|^(dbg|diag)\d*-[0-9a-f]{6,}$" ) # commits, files_per_commit, file_size_bytes SIZE_MATRIX = { "xs": (1, 1, 4_096), "s": (10, 5, 4_096), "m": (100, 5, 4_096), "l": (1_000, 5, 4_096), "xl": (2_000, 5, 4_096), } # gates in ms — None means no gate (expected to be slow / CF ceiling) GATES: dict[tuple[str, str, str], float | None] = { ("push", "xs", "localhost"): 2_000, ("push", "s", "localhost"): 5_000, ("push", "m", "localhost"): 15_000, ("push", "l", "localhost"): 15_000, ("push", "xl", "localhost"): None, ("clone", "xs", "localhost"): 2_000, ("clone", "s", "localhost"): 5_000, ("clone", "m", "localhost"): 15_000, ("clone", "l", "localhost"): 15_000, ("clone", "xl", "localhost"): None, ("fetch", "xs", "localhost"): 2_000, ("fetch", "s", "localhost"): 5_000, ("fetch", "m", "localhost"): 15_000, ("fetch", "l", "localhost"): 15_000, ("fetch", "xl", "localhost"): None, ("pull", "xs", "localhost"): 2_000, ("pull", "s", "localhost"): 5_000, ("pull", "m", "localhost"): 15_000, ("pull", "l", "localhost"): 15_000, ("pull", "xl", "localhost"): None, ("push", "xs", "staging"): 5_000, ("push", "s", "staging"): 10_000, ("push", "m", "staging"): 20_000, ("push", "l", "staging"): 30_000, ("push", "xl", "staging"): None, ("clone", "xs", "staging"): 5_000, ("clone", "s", "staging"): 10_000, ("clone", "m", "staging"): 20_000, ("clone", "l", "staging"): 30_000, ("clone", "xl", "staging"): None, ("fetch", "xs", "staging"): 5_000, 
("fetch", "s", "staging"): 10_000, ("fetch", "m", "staging"): 20_000, ("fetch", "l", "staging"): 30_000, ("fetch", "xl", "staging"): None, ("pull", "xs", "staging"): 5_000, ("pull", "s", "staging"): 10_000, ("pull", "m", "staging"): 20_000, ("pull", "l", "staging"): 30_000, ("pull", "xl", "staging"): None, } # Source files whose content determines wire protocol correctness. # Resolved from the actual loaded modules — robust regardless of install path. _WIRE_SOURCES: list[Path] = [ Path(transport.__file__), Path(mpack.__file__), Path(musehub_wire.__file__), ] def wire_hash() -> str: """Return a hex digest of all wire protocol source files. Changes to pack.py, transport.py, mpack.py (client) or musehub_wire.py (server) produce a new hash, automatically invalidating stale seed caches. Uses muse.core.types.hash_file — the same content-addressing primitive used throughout the rest of the ecosystem. """ combined = b"".join( hash_file(p).encode() for p in _WIRE_SOURCES if p.exists() ) return blob_id(combined).split(":")[-1][:16] # ── muse wrappers ───────────────────────────────────────────────────────────── def muse(*args: str, cwd: Path, timeout: int = 300) -> subprocess.CompletedProcess: return subprocess.run( ["muse"] + list(args), cwd=str(cwd), capture_output=True, text=True, timeout=timeout, ) def muse_check(*args: str, cwd: Path, timeout: int = 300) -> str: r = muse(*args, cwd=cwd, timeout=timeout) if r.returncode != 0: raise RuntimeError(f"muse {' '.join(args)} failed:\n{r.stderr[:400]}") return r.stdout def timed_muse(*args: str, cwd: Path, timeout: int = 600) -> tuple[float, bool, str]: """Returns (elapsed_ms, success, error_snippet).""" t0 = time.perf_counter() r = muse(*args, cwd=cwd, timeout=timeout) ms = (time.perf_counter() - t0) * 1000 if r.stderr.strip(): for line in r.stderr.strip().splitlines(): print(f"\n [muse-log] {line}", flush=True) return ms, r.returncode == 0, r.stderr[:400] if r.returncode != 0 else "" def _wait_indexed(hub_url: str, slug: str, 
n_commits: int, *, timeout: int = 600) -> None: """Block until the hub has HEAD accessible via clone+read. Verifies with `muse read --json` (HEAD only) — avoids serializing thousands of commits with `muse log` which hangs on large repos. """ deadline = time.time() + timeout attempt = 0 t_start = time.time() tmp = Path(tempfile.mkdtemp(prefix="muse_probe_")) name = slug.split("/")[-1] clone_dir = tmp / name cloned = False try: while time.time() < deadline: attempt += 1 if not cloned: t_clone0 = time.time() r = muse("clone", f"{hub_url}/{slug}", cwd=tmp, timeout=300) clone_ms = (time.time() - t_clone0) * 1000 if r.stderr.strip(): for line in r.stderr.strip().splitlines(): print(f"\n [clone-log] {line}", flush=True) if r.returncode != 0: print(f"\n [_wait_indexed] attempt={attempt} clone FAILED in {clone_ms:.0f}ms: {r.stderr[-200:]}", flush=True) time.sleep(2) continue cloned = True else: t_fetch0 = time.time() fr = muse("fetch", "origin", cwd=clone_dir, timeout=300) clone_ms = (time.time() - t_fetch0) * 1000 if fr.stderr.strip(): for line in fr.stderr.strip().splitlines(): print(f"\n [fetch-log] {line}", flush=True) t_read0 = time.time() read_r = muse("read", "--json", cwd=clone_dir) read_ms = (time.time() - t_read0) * 1000 elapsed = time.time() - t_start if read_r.returncode != 0: print(f"\n [_wait_indexed] attempt={attempt} clone={clone_ms:.0f}ms read FAILED in {read_ms:.0f}ms: {read_r.stderr[:120]}", flush=True) else: try: commit_id = json.loads(read_r.stdout).get("commit_id", "") print(f"\n [_wait_indexed] attempt={attempt} elapsed={elapsed:.1f}s clone={clone_ms:.0f}ms read={read_ms:.0f}ms commit={commit_id[:16]}", flush=True) if commit_id: return except (ValueError, KeyError) as exc: print(f"\n [_wait_indexed] attempt={attempt} JSON parse error: {exc}", flush=True) time.sleep(2) finally: shutil.rmtree(tmp, ignore_errors=True) raise TimeoutError(f"{slug}: HEAD not indexed within {timeout}s") # ── repo lifecycle (muse CLI only) 
──────────────────────────────────────────── def create_repo(hub_url: str, name: str) -> str: """Create a bench repo on hub. Returns slug.""" assert name.startswith(BENCH_PREFIX) out = muse_check( "hub", "repo", "create", "--name", name, "--visibility", "public", "--no-init", "--hub", hub_url, "--json", cwd=REPO_ROOT, ) return json.loads(out)["slug"] def _safe_delete_repo(hub_url: str, slug: str) -> None: """Delete a hub repo — only if it passes both independent safety guards. Guard 1 — permanent blocklist: repo name must not be in _PERMANENT_REPOS. Guard 2 — exact pattern: name must match _TRANSIENT_RE. Both must pass. Any failure raises AssertionError before any network call. """ name = slug.split("/")[-1] assert name.lower() not in {r.lower() for r in _PERMANENT_REPOS}, ( f"SAFETY: refusing to delete permanent repo '{slug}'" ) assert _TRANSIENT_RE.match(name), ( f"SAFETY: refusing to delete repo whose name doesn't match transient pattern: '{slug}'" ) r = muse("hub", "repo", "delete", slug, "--yes", "--hub", hub_url, "--json", cwd=REPO_ROOT) if r.returncode != 0 and "404" not in r.stderr: # Warn but don't raise — stale repos are purged at the next run's start. # Deletion can fail transiently if a background job is still writing to the repo. print(f"\n WARN: repo delete {slug} failed (will be purged next run): {r.stderr[:200]}", flush=True) def purge_stale(hub_url: str) -> None: out = muse_check("hub", "repo", "list", "--limit", "200", "--hub", hub_url, "--json", cwd=REPO_ROOT) repos = json.loads(out).get("repos", []) stale = [r for r in repos if _TRANSIENT_RE.match(r["name"])] if stale: print(f" purging {len(stale)} stale bench repo(s) on {hub_url}…") for r in stale: _safe_delete_repo(hub_url, r["slug"]) # ── local repo population ───────────────────────────────────────────────────── # BIP39 English wordlist — the same list used to back up your muse identity # mnemonic. Each bench file is a unique deterministic slice, formatted as verse. 
BIP39_WORDS: tuple[str, ...] = tuple(Mnemonic('english').wordlist)

# Hub account that owns every bench repo; also recorded as the seed commits' author.
_OWNER = "gabriel"


def bench_text(size: int, commit: int, file: int) -> bytes:
    """Generate a deterministic BIP39 verse of exactly *size* bytes.

    Each file is a unique slice of the BIP39 wordlist — the same list used
    to back up your muse identity mnemonic.  Four words per line, six lines
    per stanza, blank line between stanzas.  The text starts with a header
    naming *commit* and *file*, and the word slice starts at an offset derived
    from both, so distinct (commit, file) pairs yield distinct content.
    """
    header = f"# muse bench commit={commit} file={file}\n\n"
    offset = (commit * 17 + file * 7) % len(BIP39_WORDS)
    # `size` words always suffice: every word plus separator is at least one byte.
    words = itertools.islice(itertools.cycle(BIP39_WORDS[offset:] + BIP39_WORDS[:offset]), size)
    buf = [header]
    total = len(header.encode())
    col = 0
    row = 0
    for word in words:
        chunk = word + (" " if col < 3 else "\n")
        buf.append(chunk)
        total += len(chunk.encode())
        col = (col + 1) % 4
        if col == 0:
            row += 1
            if row % 6 == 0:
                buf.append("\n")
                total += 1
        if total >= size:
            break
    return "".join(buf)[:size].encode()


def make_local_repo(n_commits: int, files_per_commit: int, file_size: int) -> Path:
    """Create a tmpdir with a muse repo populated with n_commits commits.

    Every commit is made through the muse CLI.  The caller owns the returned
    directory; if population fails part-way, the directory is removed and the
    error re-raised.
    """
    tmpdir = Path(tempfile.mkdtemp(prefix="muse_bench_"))
    try:
        muse_check("init", cwd=tmpdir)
        for ci in range(n_commits):
            for fi in range(files_per_commit):
                (tmpdir / f"f{ci}_{fi}.txt").write_bytes(bench_text(file_size, ci, fi))
            muse_check("code", "add", ".", cwd=tmpdir)
            muse_check("commit", "-m", f"bench commit {ci}",
                       "--agent-id", "bench", "--model-id", "bench", cwd=tmpdir)
    except BaseException:
        shutil.rmtree(tmpdir, ignore_errors=True)
        raise
    return tmpdir


# ── persistent seed helpers (Phase 2–3 implementation) ───────────────────────

def ensure_local_seed(size: str, *, reseed: bool = False) -> Path:
    """Return path to a cached local muse repo seeded for *size*.

    Cache lives at CACHE_DIR/{size}/.  Metadata (commit count, files per
    commit, file size, wire hash) is verified on every hit; stale, unreadable
    or missing metadata triggers a full rebuild.  reseed=True forces a rebuild
    even when metadata is valid.

    The repo is written in-process through muse.core primitives rather than
    one CLI subprocess per commit.

    Raises:
        KeyError: *size* is not a key of SIZE_MATRIX.
    """
    n_commits, files_per_commit, file_size = SIZE_MATRIX[size]
    seed_dir = CACHE_DIR / size
    meta_path = seed_dir / "cache_meta.json"

    def _valid_cache() -> bool:
        if not seed_dir.exists() or not meta_path.exists():
            return False
        try:
            meta = json.loads(meta_path.read_text())
            return (
                meta.get("n_commits") == n_commits
                and meta.get("files_per_commit") == files_per_commit
                and meta.get("file_size") == file_size
                and meta.get("wire_hash") == wire_hash()
            )
        except (OSError, ValueError, AttributeError):
            # Unreadable file, malformed JSON, or JSON that is not an object:
            # treat the cache as invalid and rebuild.
            return False

    if not reseed and _valid_cache():
        return seed_dir

    # Build (or rebuild) the seeded repo in-process — no subprocess per commit.
    if seed_dir.exists():
        # Purge any stale shard-cache entries for this seed_dir before deleting
        # it.  _created_object_shards is a module-level set in object_store that
        # skips mkdir on subsequent writes to the same shard path.  Without this
        # purge, a reseed would rmtree the directory but leave stale entries in
        # the set, causing write_object to skip mkdir and fail with ENOENT.
        stale_prefix = str(seed_dir) + os.sep
        stale = {s for s in _created_object_shards if s.startswith(stale_prefix)}
        _created_object_shards.difference_update(stale)
        shutil.rmtree(seed_dir)
    seed_dir.mkdir(parents=True)

    dot = muse_dir(seed_dir)
    dot.mkdir()
    repo_id = blob_id(f"bench-seed-{size}".encode())
    (dot / "repo.json").write_text(json.dumps({"repo_id": repo_id, "owner": _OWNER}))
    for d in ("commits", "snapshots", "objects"):
        (dot / d).mkdir()
    (dot / "refs" / "heads").mkdir(parents=True)
    (dot / "HEAD").write_text("ref: refs/heads/main\n")
    (dot / "config.toml").write_text("")

    # Fixed start timestamp, advanced one second per commit, keeps commit ids
    # reproducible across rebuilds.
    ts = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc)
    parent: str | None = None
    tip = ""
    for ci in range(n_commits):
        blobs: dict[str, str] = {}
        for fi in range(files_per_commit):
            data = bench_text(file_size, ci, fi)
            oid = blob_id(data)
            write_object(seed_dir, oid, data)
            blobs[f"f{ci}_{fi}.txt"] = oid
        sid = compute_snapshot_id(blobs)
        write_snapshot(seed_dir, SnapshotRecord(snapshot_id=sid, manifest=blobs))
        cid = compute_commit_id(
            parent_ids=[parent] if parent else [],
            snapshot_id=sid,
            message=f"bench commit {ci}",
            committed_at_iso=ts.isoformat(),
            author=_OWNER,
        )
        write_commit(seed_dir, CommitRecord(
            commit_id=cid,
            branch="main",
            snapshot_id=sid,
            message=f"bench commit {ci}",
            committed_at=ts,
            parent_commit_id=parent,
            parent2_commit_id=None,
            author=_OWNER,
            metadata={},
            structured_delta=None,
            sem_ver_bump="none",
            breaking_changes=[],
            agent_id="bench",
            model_id="bench",
            toolchain_id="",
            prompt_hash="",
            signature="",
            signer_key_id="",
        ))
        parent = cid
        tip = cid
        ts = ts + datetime.timedelta(seconds=1)
    write_branch_ref(seed_dir, "main", tip)

    # Metadata is written last, so an interrupted build is never taken for a valid cache.
    meta_path.write_text(json.dumps({
        "n_commits": n_commits,
        "files_per_commit": files_per_commit,
        "file_size": file_size,
        "wire_hash": wire_hash(),
    }))
    return seed_dir


# In-process cache: once a hub seed is confirmed valid, skip all checks for the rest of the run.
_hub_seed_cache: dict[tuple[str, str], str] = {}


def _stored_wire_hash(description: str) -> str:
    """Return the value of the last ``wire_hash=<value>`` token in *description*, or ``""``."""
    stored = ""
    for part in description.split():
        if part.startswith("wire_hash="):
            stored = part.split("=", 1)[1]
    return stored


def ensure_hub_seed(hub_url: str, hub_alias: str, size: str, *, reseed: bool = False) -> str:
    """Ensure bench-seed-{size} exists on hub; return slug gabriel/bench-seed-{size}.

    Checks the hub repo list first.  An existing seed is reused when the
    ``wire_hash=`` token in its description matches the current wire hash and
    the listing reports a head commit.  Otherwise the stale seed is deleted,
    recreated and re-pushed.  With reseed=True an existing repo is kept and the
    local seed is pushed on top of it; a missing repo is created first.

    *hub_alias* is accepted for signature parity with the bench functions and
    is not used.

    Raises:
        RuntimeError: a muse CLI command failed.
        TimeoutError: the pushed seed did not become readable in time.
    """
    name = f"{SEED_PREFIX}{size}"
    slug = f"{_OWNER}/{name}"
    cache_key = (hub_url, size)

    if not reseed and cache_key in _hub_seed_cache:
        return _hub_seed_cache[cache_key]

    current_hash = wire_hash()
    out = muse_check("hub", "repo", "list", "--limit", "200",
                     "--hub", hub_url, "--json", cwd=REPO_ROOT)
    repos = json.loads(out).get("repos", [])
    match = next((r for r in repos if r["name"] == name), None)

    if match is not None and not reseed:
        # Validate wire_hash stored in repo description (`or ""` tolerates a null description).
        stored_hash = _stored_wire_hash(match.get("description") or "")
        # Also verify the push completed — a listing without a head commit
        # means the repo was created but the seed never landed.
        head_commit = match.get("head_commit_id") or match.get("head_commit") or ""
        if stored_hash == current_hash and head_commit:
            _hub_seed_cache[cache_key] = slug
            return slug
        if stored_hash != current_hash:
            print(f" wire_hash changed — rebuilding hub seed {name}…")
        else:
            print(f" head_commit missing — rebuilding hub seed {name}…")
        # Seed names intentionally do not match _TRANSIENT_RE, so this delete
        # cannot go through _safe_delete_repo.
        muse_check("hub", "repo", "delete", slug, "--yes",
                   "--hub", hub_url, "--json", cwd=REPO_ROOT)

    # Create only when no repo exists right now: it was absent, or it was
    # deleted above as stale.  A reseed of an existing repo pushes on top of it.
    if match is None or not reseed:
        muse_check("hub", "repo", "create",
                   "--name", name,
                   "--description", f"wire_hash={current_hash}",
                   "--visibility", "public",
                   "--no-init",
                   "--hub", hub_url,
                   "--json",
                   cwd=REPO_ROOT)

    seed_dir = ensure_local_seed(size)
    # Always reset origin so stale tracking refs never cause a silent no-op push.
    # bench_push uses the same remove+add pattern on its per-run copies.
    muse("remote", "remove", "origin", cwd=seed_dir)  # ignore error if absent
    muse_check("remote", "add", "origin", f"{hub_url}/{slug}", cwd=seed_dir)
    muse_check("push", "origin", "main", cwd=seed_dir)
    n_commits, _, _ = SIZE_MATRIX[size]
    _wait_indexed(hub_url, slug, n_commits)
    _hub_seed_cache[cache_key] = slug
    return slug


# ── verb benchmarks ───────────────────────────────────────────────────────────

def _qualified_slug(slug: str) -> str:
    """Return *slug* as ``owner/name``, prefixing the bench owner when it is a bare name."""
    return slug if "/" in slug else f"{_OWNER}/{slug}"


def _record_run(times: list[float], ms: float, ok: bool, err: str) -> None:
    """Append a successful timing to *times* and echo it; print the error snippet otherwise."""
    if ok:
        times.append(ms)
        print(f" {ms:.0f}ms", end="", flush=True)
    else:
        print(f"\n ERROR: {err}", flush=True)


def bench_push(hub_url: str, hub_alias: str, size: str, runs: int, cleanup: bool) -> list[float]:
    """Measure push throughput using the cached local seed repo.

    Creates a fresh hub repo per run and pushes a copy of the cached local
    seed into it.  No commit creation overhead after the first run.

    Returns:
        Elapsed milliseconds of each successful push; failed runs are
        reported on stdout and omitted.
    """
    times: list[float] = []
    local = ensure_local_seed(size)
    for run_i in range(runs):
        name = f"{BENCH_PREFIX}push-{size}-{run_i}-{os.urandom(3).hex()}"
        # Normalise to owner/name so the remote URL and the cleanup call agree
        # with _bench_fetch_or_pull whichever form the CLI reports.
        slug = _qualified_slug(create_repo(hub_url, name))
        run_dir = Path(tempfile.mkdtemp(prefix="muse_bench_push_"))
        try:
            # Work from a copy so the cached seed's remote stays clean.
            run_repo = run_dir / "repo"
            shutil.copytree(str(local), str(run_repo), symlinks=False)
            # Wire a fresh remote for this run's hub repo.
            muse("remote", "remove", "origin", cwd=run_repo)  # ignore error if absent
            muse_check("remote", "add", "origin", f"{hub_url}/{slug}", cwd=run_repo)
            ms, ok, err = timed_muse("push", "origin", "main", cwd=run_repo)
            _record_run(times, ms, ok, err)
        finally:
            shutil.rmtree(run_dir, ignore_errors=True)
            if cleanup:
                _safe_delete_repo(hub_url, slug)
    return times


def bench_clone(hub_url: str, hub_alias: str, size: str, runs: int, cleanup: bool) -> list[float]:
    """Measure clone throughput against the persistent hub seed repo.

    *cleanup* is accepted for signature parity; clone runs create no hub repos.
    """
    times: list[float] = []
    slug = ensure_hub_seed(hub_url, hub_alias, size)
    for _ in range(runs):
        clone_parent = Path(tempfile.mkdtemp(prefix="muse_bench_clone_"))
        try:
            ms, ok, err = timed_muse("clone", f"{hub_url}/{slug}", cwd=clone_parent)
            _record_run(times, ms, ok, err)
        finally:
            shutil.rmtree(clone_parent, ignore_errors=True)
    return times


def _bench_fetch_or_pull(
    verb: str,
    hub_url: str,
    hub_alias: str,
    size: str,
    runs: int,
    cleanup: bool,
) -> list[float]:
    """Measure fetch/pull of a single delta commit.

    Per run:
      1. Create a per-run hub repo and push a copy of the local seed to it,
         so delta pushes never accumulate on the persistent hub seed.
      2. Clone the run repo — client is at the seeded state.
      3. Add exactly 1 delta commit to the run-local copy and push it.
      4. Measure fetch/pull from the clone (now 1 commit behind).

    The delta is always 1 commit — we measure wire-protocol latency, not
    local commit-creation overhead.
    """
    n_commits, _, file_size = SIZE_MATRIX[size]
    times: list[float] = []
    # Result unused; the call is kept so the persistent hub seed exists, as before.
    ensure_hub_seed(hub_url, hub_alias, size)
    local_seed = ensure_local_seed(size)

    def _step(label: str, t0: float) -> float:
        """Print the time elapsed since *t0* under *label*; return the new reference time."""
        t1 = time.perf_counter()
        print(f"\n [step] {label}: {(t1-t0)*1000:.0f}ms", flush=True)
        return t1

    for run_i in range(runs):
        # Per-run hub repo so delta pushes don't accumulate on the seed.
        run_name = f"{BENCH_PREFIX}{verb}-{size}-{run_i}-{os.urandom(3).hex()}"
        run_slug = _qualified_slug(create_repo(hub_url, run_name))
        repo_name = run_slug.split("/")[-1]
        run_url = f"{hub_url}/{run_slug}"

        run_dir = Path(tempfile.mkdtemp(prefix=f"muse_bench_{verb}_src_"))
        clone_parent = Path(tempfile.mkdtemp(prefix=f"muse_bench_{verb}_dst_"))
        try:
            t0 = time.perf_counter()

            # Copy local seed → run dir, push to run hub repo.
            run_repo = run_dir / "repo"
            shutil.copytree(str(local_seed), str(run_repo), symlinks=False)
            t0 = _step("copytree", t0)
            muse("remote", "remove", "origin", cwd=run_repo)  # ignore error if absent
            muse_check("remote", "add", "origin", run_url, cwd=run_repo)
            muse_check("push", "origin", "main", cwd=run_repo)
            t0 = _step("seed push", t0)

            # Large mpacks defer commit writes to a background job — wait until
            # the server's commit graph is fully indexed before cloning.
            _wait_indexed(hub_url, run_slug, n_commits)
            t0 = _step("wait_indexed seed", t0)

            # Clone run repo — client is now at seeded state.
            muse_check("clone", run_url, cwd=clone_parent)
            cloned = clone_parent / repo_name
            t0 = _step("clone", t0)

            # Add exactly 1 delta commit and push.
            # Materialise the working tree first so the delta commit doesn't
            # incorrectly delete the seed files that are absent from disk.
            # --force discards the "pending deletions" muse sees for unwritten seed files.
            muse_check("checkout", "--force", "main", cwd=run_repo)
            (run_repo / f"delta_{run_i}.txt").write_bytes(bench_text(file_size, run_i, 0))
            muse_check("code", "add", ".", cwd=run_repo)
            muse_check("commit", "-m", f"delta {run_i}",
                       "--agent-id", "bench", "--model-id", "bench", cwd=run_repo)
            muse_check("push", "origin", "main", cwd=run_repo)
            t0 = _step("delta push", t0)

            # Wait for the delta commit to be indexed (inline for small mpacks,
            # but generation computation depends on seed commits being in commit_graph).
            _wait_indexed(hub_url, run_slug, n_commits + 1)
            t0 = _step("wait_indexed delta", t0)

            # Measure fetch/pull (client is 1 commit behind).
            if verb == "fetch":
                ms, ok, err = timed_muse("fetch", "origin", cwd=cloned)
            else:
                ms, ok, err = timed_muse("pull", "origin", "main", cwd=cloned)
            _step(f"{verb} measurement", t0)
            _record_run(times, ms, ok, err)
        finally:
            shutil.rmtree(run_dir, ignore_errors=True)
            shutil.rmtree(clone_parent, ignore_errors=True)
            if cleanup:
                _safe_delete_repo(hub_url, run_slug)
    return times


def bench_fetch(hub_url: str, hub_alias: str, size: str, runs: int, cleanup: bool) -> list[float]:
    """Measure ``muse fetch origin`` of a one-commit delta; see _bench_fetch_or_pull."""
    return _bench_fetch_or_pull("fetch", hub_url, hub_alias, size, runs, cleanup)


def bench_pull(hub_url: str, hub_alias: str, size: str, runs: int, cleanup: bool) -> list[float]:
    """Measure ``muse pull origin main`` of a one-commit delta; see _bench_fetch_or_pull."""
    return _bench_fetch_or_pull("pull", hub_url, hub_alias, size, runs, cleanup)


VERB_FNS = {
    "push": bench_push,
    "clone": bench_clone,
    "fetch": bench_fetch,
    "pull": bench_pull,
}


# ── output ────────────────────────────────────────────────────────────────────

def _p50_p95(times: list[float]) -> tuple[float, float]:
    """Return (median, nearest-rank 95th percentile) of a non-empty list of timings."""
    ordered = sorted(times)
    rank = min(len(ordered) - 1, max(0, math.ceil(len(ordered) * 0.95) - 1))
    return statistics.median(ordered), ordered[rank]


def gate_str(verb: str, size: str, hub_alias: str, p50: float) -> str:
    """Render the pass/fail marker for *p50* against the configured gate.

    Returns "—" when GATES has no entry (or a None entry) for the combination.
    """
    g = GATES.get((verb, size, hub_alias))
    if g is None:
        return "—"
    return f"✓ <{int(g/1000)}s" if p50 <= g else f"✗ <{int(g/1000)}s"


def print_table(rows: list[tuple], size: str) -> None:
    """Print a fixed-width results table for one size.

    Each row is ``(verb, size, hub_alias, times)``; an empty *times* list is
    rendered as FAILED.
    """
    cols = ["verb", "size", "hub", "p50 (ms)", "p95 (ms)", "gate"]
    widths = [6, 5, 12, 10, 10, 12]
    sep = " "
    print()
    print(f"muse CLI bench — size={size.upper()}")
    print("=" * (sum(widths) + len(sep) * (len(widths) - 1)))
    print(sep.join(c.ljust(w) for c, w in zip(cols, widths)))
    print(sep.join("-" * w for w in widths))
    for verb, sz, hub_alias, times in rows:
        if not times:
            row = [verb, sz, hub_alias, "FAILED", "FAILED", "✗"]
        else:
            p50, p95 = _p50_p95(times)
            row = [verb, sz, hub_alias, f"{p50:.0f}", f"{p95:.0f}",
                   gate_str(verb, sz, hub_alias, p50)]
        print(sep.join(s.ljust(w) for s, w in zip(row, widths)))
    print()


def markdown_table(rows: list[tuple], size: str) -> str:
    """Return the results for one size as a Markdown section with a table.

    Rows have the same shape as for print_table.
    """
    n_commits, fpc, fsz = SIZE_MATRIX[size]
    mb = n_commits * fpc * fsz // 1024 // 1024
    lines = [
        f"### {size.upper()} ({n_commits} commits, {n_commits*fpc} files, ~{mb or '<1'} MB)",
        "",
        "| verb | hub | p50 (ms) | p95 (ms) | gate |",
        "|------|-----|----------|----------|------|",
    ]
    for verb, sz, hub_alias, times in rows:
        if not times:
            lines.append(f"| {verb} | {hub_alias} | FAILED | FAILED | ✗ |")
        else:
            p50, p95 = _p50_p95(times)
            lines.append(f"| {verb} | {hub_alias} | **{p50:.0f}** | {p95:.0f} |"
                         f" {gate_str(verb, sz, hub_alias, p50)} |")
    return "\n".join(lines)


# ── main ──────────────────────────────────────────────────────────────────────

def main() -> None:
    """Parse CLI arguments, run the requested benchmarks, and print the results.

    For each size: purge stale bench repos on every hub, honour --reseed /
    --reseed-hub, run each verb against each hub, then print a text table.
    A Markdown rendering of all sizes is printed at the end.  A failure inside
    one verb is reported and recorded as FAILED without aborting the run.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--size", nargs="+", default=["xs"],
                        choices=[*SIZE_MATRIX, "all"],
                        help="xs s m l xl all")
    parser.add_argument("--hubs", nargs="+", default=["localhost", "staging"],
                        choices=["localhost", "staging"])
    parser.add_argument("--verb", nargs="+", default=["push", "clone", "fetch", "pull"],
                        choices=["push", "clone", "fetch", "pull"])
    parser.add_argument("--runs", type=int, default=1)
    parser.add_argument("--no-cleanup", action="store_true")
    parser.add_argument("--reseed", action="store_true",
                        help="Rebuild local seed cache even if valid")
    parser.add_argument("--reseed-hub", action="store_true",
                        help="Re-push hub seed repos even if present")
    args = parser.parse_args()
    if args.runs < 1:
        parser.error("--runs must be >= 1")

    sizes = list(SIZE_MATRIX) if "all" in args.size else args.size
    cleanup = not args.no_cleanup

    print(f"muse CLI bench hubs={args.hubs} verbs={args.verb} "
          f"sizes={sizes} runs={args.runs}")
    print()

    all_markdown: list[str] = []
    total_start = time.perf_counter()

    for size in sizes:
        for hub_alias in args.hubs:
            purge_stale(HUB_URLS[hub_alias])

        # Honour the reseed flags up front so every verb below sees fresh seeds.
        if args.reseed:
            ensure_local_seed(size, reseed=True)
        if args.reseed_hub:
            for hub_alias in args.hubs:
                ensure_hub_seed(HUB_URLS[hub_alias], hub_alias, size, reseed=True)

        rows: list[tuple] = []
        size_start = time.perf_counter()
        for verb in args.verb:
            for hub_alias in args.hubs:
                hub_url = HUB_URLS[hub_alias]
                verb_start = time.perf_counter()
                print(f" {verb}/{size}/{hub_alias}…", end="", flush=True)
                try:
                    times = VERB_FNS[verb](hub_url, hub_alias, size, args.runs, cleanup)
                except Exception as exc:
                    # Top-level boundary: report and carry on with the next verb/hub.
                    print(f"\n ERROR: {exc}", flush=True)
                    times = []
                verb_elapsed = (time.perf_counter() - verb_start) * 1000
                # Inline verb summary: p50 + gate + total wall time for this verb
                if times:
                    p50 = statistics.median(times)
                    g = gate_str(verb, size, hub_alias, p50)
                    print(f" → p50={p50:.0f}ms {g} (verb wall={verb_elapsed:.0f}ms)")
                else:
                    print(f" → FAILED (verb wall={verb_elapsed:.0f}ms)")
                rows.append((verb, size, hub_alias, times))

        size_elapsed = (time.perf_counter() - size_start) * 1000
        print_table(rows, size)
        print(f" size={size.upper()} total: {size_elapsed:.0f}ms")
        all_markdown.append(markdown_table(rows, size))

    total_elapsed = (time.perf_counter() - total_start) * 1000
    print(f"\n ── overall: {total_elapsed:.0f}ms ──\n")
    if all_markdown:
        print("Markdown (copy to issue comment):")
        print("\n\n".join(all_markdown))


if __name__ == "__main__":
    main()