""" EXTREME performance test suite for ``muse coord`` — Linus Torvalds porting Linux from Git to Muse. Scenario: 7 000+ agents, 150 000+ files, kernel subsystems as task clusters. We want to break Muse Coord. We want to find where the edge is and go beyond. Bottlenecks targeted: 1. _gather_local_records — O(N) file reads, no caching, no batching 2. _write_remote_records — mkdir() called PER record (N syscalls vs 7) 3. run_coord_gc — releases directory scanned TWICE (load_released_ids + load_all_releases); active_reservations does 3 full directory scans 4. JSON payload scaling — 50B / 1 KB / 10 KB / 50 KB per record 5. Batch loop overhead — pure CPU cost of splitting 7 000 records into batches 6. Memory footprint — 10 000 records held in memory simultaneously 7. End-to-end throughput — records/second metric Existing tests in test_core_coord_bus.py (do NOT duplicate): - 500-record push serialization < 1 s - 1000-record pull parse < 1 s - 100 sequential push_to_hub < 2 s - 100k _build_url calls < 1 s """ from __future__ import annotations import itertools import json import os import pathlib import sys import time import tracemalloc from collections.abc import Callable from typing import TYPE_CHECKING from unittest.mock import MagicMock, patch import pytest from muse.core.types import MsgpackDict, content_hash from muse.core.coord_bus import JsonDict from muse.core.paths import coordination_dir, muse_dir if TYPE_CHECKING: from muse.core.coordination import CoordGcResult, Reservation from muse.core.transport import SigningIdentity # ── helpers ────────────────────────────────────────────────────────────────── _id_seq = itertools.count() def _new_id() -> str: return content_hash({"seq": next(_id_seq)}) _ALL_KINDS = ("reservation", "intent", "release", "heartbeat", "dependency", "task", "claim") _KIND_SUBDIR = { "reservation": "reservations", "intent": "intents", "release": "releases", "heartbeat": "heartbeats", "dependency": "dependencies", "task": "tasks", "claim": "claims", } _FUTURE_TS = "2099-12-31T23:59:59+00:00" _PAST_TS = "2000-01-01T00:00:00+00:00" def _make_repo(tmp_path: pathlib.Path) -> pathlib.Path: muse_dir(tmp_path).mkdir(parents=True, exist_ok=True) return tmp_path def _coord_dir(root: pathlib.Path) -> pathlib.Path: d = coordination_dir(root) d.mkdir(parents=True, exist_ok=True) return d def _write_records(root: pathlib.Path, kind: str, n: int, payload_bytes: int = 64) -> None: """Write *n* minimal records of *kind* to the local coordination store.""" subdir = _KIND_SUBDIR[kind] d = _coord_dir(root) / subdir d.mkdir(parents=True, exist_ok=True) padding = "x" * max(0, payload_bytes - 80) for i in range(n): rid = f"{kind}-{i:06d}" if kind == "reservation": rec = { "reservation_id": rid, "run_id": f"run-{i}", "expires_at": _FUTURE_TS, "symbols": [f"kernel/subsys_{i % 50}/module.c::func_{i}"], "_pad": padding, } elif kind == "heartbeat": rec = { "run_id": rid, "extended_expires_at": _FUTURE_TS, "expires_at": _FUTURE_TS, "_pad": padding, } elif kind == "intent": rec = { "intent_id": rid, "run_id": f"run-{i}", "reservation_id": f"reservation-{i:06d}", "operation": "modify", "target": f"kernel/subsys_{i % 50}/module.c::func_{i}", "_pad": padding, } elif kind == "release": rec = { "release_id": rid, "reservation_id": f"reservation-{i:06d}", "run_id": f"run-{i}", "released_at": _FUTURE_TS, "_pad": padding, } elif kind == "dependency": rec = { "reservation_id": rid, "depends_on": [f"reservation-{j:06d}" for j in range(min(3, i))], "_pad": padding, } elif kind == "task": rec = { "task_id": rid, "run_id": f"run-{i}", "description": f"patch kernel subsystem {i % 50}", "_pad": padding, } elif kind == "claim": rec = { "task_id": rid, "claimer_run_id": f"run-{i}", "expires_at": _FUTURE_TS, "_pad": padding, } else: rec = {"id": rid, "_pad": padding} (d / f"{rid}.json").write_text(json.dumps(rec), encoding="utf-8") def _write_expired_reservations(root: pathlib.Path, n: int) -> None: """Write *n* expired reservations (expires_at in the past, no heartbeat, no release).""" d = _coord_dir(root) / "reservations" d.mkdir(parents=True, exist_ok=True) for i in range(n): rid = f"exp-res-{i:06d}" rec = { "reservation_id": rid, "run_id": f"run-{i}", "expires_at": _PAST_TS, "symbols": [f"drivers/block/hd_{i}.c::init"], } (d / f"{rid}.json").write_text(json.dumps(rec), encoding="utf-8") def _write_released_reservations(root: pathlib.Path, n: int) -> None: """Write *n* pairs: reservation + release tombstone (released in the far past).""" res_dir = _coord_dir(root) / "reservations" rel_dir = _coord_dir(root) / "releases" res_dir.mkdir(parents=True, exist_ok=True) rel_dir.mkdir(parents=True, exist_ok=True) for i in range(n): rid = f"rel-res-{i:06d}" rec = { "reservation_id": rid, "run_id": f"run-{i}", "expires_at": _PAST_TS, "symbols": [f"net/ipv4/tcp_{i}.c::send"], } (res_dir / f"{rid}.json").write_text(json.dumps(rec), encoding="utf-8") tombstone = { "release_id": f"release-{i:06d}", "reservation_id": rid, "run_id": f"run-{i}", "released_at": _PAST_TS, } (rel_dir / f"{rid}.json").write_text(json.dumps(tombstone), encoding="utf-8") # ── import targets ──────────────────────────────────────────────────────────── def _import_gather() -> Callable[[pathlib.Path, list[str]], list[JsonDict]]: from muse.cli.commands.coord_sync import _gather_local_records return _gather_local_records def _import_write_remote() -> Callable[[pathlib.Path, list[JsonDict]], None]: from muse.cli.commands.coord_sync import _write_remote_records return _write_remote_records def _import_run_coord_gc() -> Callable[..., CoordGcResult]: from muse.core.coordination import run_coord_gc return run_coord_gc def _import_active_reservations() -> Callable[[pathlib.Path], list[Reservation]]: from muse.core.coordination import active_reservations return active_reservations # ============================================================================= # 1. _gather_local_records — O(N) file I/O # ============================================================================= class TestCoordPerfGather: """ _gather_local_records reads every file on every call. These tests measure raw throughput at increasing scale and expose the O(N) cost. """ def test_gather_100_reservations_under_200ms(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) _write_records(root, "reservation", 100) gather = _import_gather() t0 = time.monotonic() records = gather(root, ["reservation"]) elapsed = time.monotonic() - t0 assert len(records) == 100 assert elapsed < 0.200, f"gather 100 reservations took {elapsed:.3f}s (> 200ms)" def test_gather_500_reservations_under_500ms(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) _write_records(root, "reservation", 500) gather = _import_gather() t0 = time.monotonic() records = gather(root, ["reservation"]) elapsed = time.monotonic() - t0 assert len(records) == 500 assert elapsed < 0.500, f"gather 500 reservations took {elapsed:.3f}s (> 500ms)" def test_gather_1000_reservations_under_1s(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) _write_records(root, "reservation", 1000) gather = _import_gather() t0 = time.monotonic() records = gather(root, ["reservation"]) elapsed = time.monotonic() - t0 assert len(records) == 1000 assert elapsed < 1.0, f"gather 1000 reservations took {elapsed:.3f}s (> 1s)" @pytest.mark.slow def test_gather_5000_reservations_under_5s(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) _write_records(root, "reservation", 5000) gather = _import_gather() t0 = time.monotonic() records = gather(root, ["reservation"]) elapsed = time.monotonic() - t0 assert len(records) == 5000 assert elapsed < 5.0, f"gather 5000 reservations took {elapsed:.3f}s (> 5s)" @pytest.mark.slow def test_gather_all_7_kinds_1000_each_under_7s(self, tmp_path: pathlib.Path) -> None: """7000 files across 7 kinds — worst-case full Linux kernel scenario.""" root = _make_repo(tmp_path) for kind in _ALL_KINDS: _write_records(root, kind, 1000) gather = _import_gather() t0 = time.monotonic() records = gather(root, list(_ALL_KINDS)) elapsed = time.monotonic() - t0 assert len(records) == 7000 assert elapsed < 7.0, f"gather 7000 records (all kinds) took {elapsed:.3f}s (> 7s)" def test_cold_vs_warm_gather_same_directory(self, tmp_path: pathlib.Path) -> None: """ Second call must not be dramatically slower than the first. There is no caching; both calls read from disk. The second call benefits only from OS page-cache effects — that is acceptable. """ root = _make_repo(tmp_path) _write_records(root, "reservation", 200) gather = _import_gather() t0 = time.monotonic() gather(root, ["reservation"]) cold = time.monotonic() - t0 t0 = time.monotonic() gather(root, ["reservation"]) warm = time.monotonic() - t0 # Warm should not be more than 4× cold (OS cache should help). # We do NOT assert warm < cold because there is no in-process cache. assert warm < max(cold * 4, 0.500), ( f"warm gather ({warm:.3f}s) is unexpectedly slow vs cold ({cold:.3f}s)" ) def test_gather_empty_directory_is_fast(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) _coord_dir(root) # creates .muse/coordination/ but no kind subdirs gather = _import_gather() t0 = time.monotonic() for _ in range(100): gather(root, list(_ALL_KINDS)) elapsed = time.monotonic() - t0 assert elapsed < 0.200, f"100 × empty gather took {elapsed:.3f}s (> 200ms)" def test_gather_single_kind_ignores_other_dirs(self, tmp_path: pathlib.Path) -> None: """Filtering to one kind must not scan 6 other directories.""" root = _make_repo(tmp_path) for kind in _ALL_KINDS: _write_records(root, kind, 200) # 1400 files total gather = _import_gather() t0 = time.monotonic() records = gather(root, ["reservation"]) elapsed = time.monotonic() - t0 assert len(records) == 200 assert elapsed < 0.500, ( f"single-kind gather across 1400-file repo took {elapsed:.3f}s (> 500ms)" ) def test_gather_throughput_records_per_second(self, tmp_path: pathlib.Path) -> None: """ Measures records/second. Documents current throughput so regressions become visible. On modern SSD hardware ≥ 1 000 records/s expected. """ root = _make_repo(tmp_path) _write_records(root, "reservation", 500) gather = _import_gather() # Warm up OS page cache gather(root, ["reservation"]) t0 = time.monotonic() records = gather(root, ["reservation"]) elapsed = time.monotonic() - t0 rps = len(records) / elapsed if elapsed > 0 else float("inf") # This is a documentation assertion — fail loudly if throughput crashes. assert rps >= 500, f"gather throughput {rps:.0f} records/s is below minimum 500/s" # ============================================================================= # 2. _write_remote_records — mkdir() per record # ============================================================================= class TestCoordPerfWriteRemote: """ _write_remote_records calls kind_dir.mkdir(parents=True, exist_ok=True) for EVERY record. With 7 distinct kinds and 7 000 records that is 7 000 mkdir syscalls. These tests measure that overhead and expose the regression path. """ def test_write_100_records_7_kinds_under_500ms(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) write_remote = _import_write_remote() records = [] for i in range(100): kind = _ALL_KINDS[i % len(_ALL_KINDS)] records.append({ "kind": kind, "record_id": _new_id(), "run_id": f"run-{i}", "payload": {"data": "x" * 64}, "expires_at": _FUTURE_TS, }) t0 = time.monotonic() write_remote(root, records) elapsed = time.monotonic() - t0 assert elapsed < 0.500, f"write 100 remote records took {elapsed:.3f}s (> 500ms)" # Verify files actually written remote_dir = coordination_dir(root) / "remote" written = sum(1 for _ in remote_dir.rglob("*.json")) assert written == 100 def test_write_1000_records_7_kinds_under_3s(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) write_remote = _import_write_remote() records = [ { "kind": _ALL_KINDS[i % len(_ALL_KINDS)], "record_id": _new_id(), "run_id": f"run-{i}", "payload": {"idx": i}, "expires_at": _FUTURE_TS, } for i in range(1000) ] t0 = time.monotonic() write_remote(root, records) elapsed = time.monotonic() - t0 assert elapsed < 3.0, f"write 1000 remote records took {elapsed:.3f}s (> 3s)" @pytest.mark.slow def test_write_7000_records_mkdir_overhead_documented(self, tmp_path: pathlib.Path) -> None: """ Documents the mkdir-per-record overhead at full Linux kernel scale. 7 000 records × 7 kinds = 7 000 mkdir() calls instead of the optimal 7. This test is expected to PASS with the current implementation. Its purpose: if a future optimization pre-creates directories (7 calls), the elapsed time should drop by ~30-50%. A regression would show here. """ root = _make_repo(tmp_path) write_remote = _import_write_remote() records = [ { "kind": _ALL_KINDS[i % len(_ALL_KINDS)], "record_id": _new_id(), "run_id": f"run-{i}", "payload": {"idx": i}, "expires_at": None, } for i in range(7000) ] t0 = time.monotonic() write_remote(root, records) elapsed = time.monotonic() - t0 remote_dir = coordination_dir(root) / "remote" written = sum(1 for _ in remote_dir.rglob("*.json")) assert written == 7000, f"expected 7000 files, got {written}" # Generous bound — this is the CURRENT (suboptimal) ceiling assert elapsed < 20.0, f"write 7000 remote records took {elapsed:.3f}s (> 20s)" def test_write_throughput_records_per_second(self, tmp_path: pathlib.Path) -> None: """Baseline throughput for write_remote_records at 500 records.""" root = _make_repo(tmp_path) write_remote = _import_write_remote() records = [ { "kind": _ALL_KINDS[i % len(_ALL_KINDS)], "record_id": _new_id(), "run_id": f"run-{i}", "payload": {"idx": i}, "expires_at": None, } for i in range(500) ] t0 = time.monotonic() write_remote(root, records) elapsed = time.monotonic() - t0 rps = 500 / elapsed if elapsed > 0 else float("inf") assert rps >= 100, f"write_remote throughput {rps:.0f} records/s is below minimum 100/s" def test_write_remote_overwrites_are_not_slower_than_first_write(self, tmp_path: pathlib.Path) -> None: """Overwriting 500 records (second call) must not be dramatically slower.""" root = _make_repo(tmp_path) write_remote = _import_write_remote() records = [ { "kind": _ALL_KINDS[i % len(_ALL_KINDS)], "record_id": _new_id(), "run_id": f"run-{i}", "payload": {"idx": i}, "expires_at": None, } for i in range(500) ] # First write — directories created t0 = time.monotonic() write_remote(root, records) first = time.monotonic() - t0 # Second write — exist_ok=True, but still 500 mkdir() syscalls t0 = time.monotonic() write_remote(root, records) second = time.monotonic() - t0 # Second should not be more than 5× the first (exist_ok adds a stat call) assert second < max(first * 5, 2.0), ( f"overwrite pass ({second:.3f}s) is unexpectedly slow vs first write ({first:.3f}s)" ) # ============================================================================= # 3. run_coord_gc — double-scan of releases, triple active_reservations scan # ============================================================================= class TestCoordPerfGC: """ run_coord_gc loads releases TWICE: load_released_ids() + load_all_releases(). These tests expose that cost at scale. """ def test_gc_1000_expired_reservations_under_3s(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) _write_expired_reservations(root, 1000) run_coord_gc = _import_run_coord_gc() t0 = time.monotonic() result = run_coord_gc(root, dry_run=False, grace_period_seconds=0) elapsed = time.monotonic() - t0 assert result.reservations_removed == 1000 assert elapsed < 3.0, f"GC 1000 expired reservations took {elapsed:.3f}s (> 3s)" def test_gc_1000_released_reservations_under_4s(self, tmp_path: pathlib.Path) -> None: """ Released reservations force the double-scan: load_released_ids() then load_all_releases(). 1 000 release tombstones = 2 × 1 000 file reads. """ root = _make_repo(tmp_path) _write_released_reservations(root, 1000) run_coord_gc = _import_run_coord_gc() t0 = time.monotonic() result = run_coord_gc(root, dry_run=False, grace_period_seconds=0) elapsed = time.monotonic() - t0 assert result.reservations_removed == 1000 assert elapsed < 4.0, ( f"GC 1000 released reservations took {elapsed:.3f}s (> 4s — double-scan cost)" ) @pytest.mark.slow def test_gc_5000_expired_reservations_under_15s(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) _write_expired_reservations(root, 5000) run_coord_gc = _import_run_coord_gc() t0 = time.monotonic() result = run_coord_gc(root, dry_run=False, grace_period_seconds=0) elapsed = time.monotonic() - t0 assert result.reservations_removed == 5000 assert elapsed < 15.0, f"GC 5000 expired reservations took {elapsed:.3f}s (> 15s)" def test_gc_dry_run_is_not_slower_than_live_run(self, tmp_path: pathlib.Path) -> None: """ Dry-run must not be significantly slower than a live run at the same scale. Both run the same 4 directory scans; the only difference is no unlink(). """ root_live = _make_repo(tmp_path / "live") root_dry = _make_repo(tmp_path / "dry") _write_expired_reservations(root_live, 300) _write_expired_reservations(root_dry, 300) run_coord_gc = _import_run_coord_gc() t0 = time.monotonic() run_coord_gc(root_live, dry_run=False, grace_period_seconds=0) live_elapsed = time.monotonic() - t0 t0 = time.monotonic() run_coord_gc(root_dry, dry_run=True, grace_period_seconds=0) dry_elapsed = time.monotonic() - t0 # Dry-run has no unlink() calls — it should not be slower than live run assert dry_elapsed < max(live_elapsed * 3, 1.0), ( f"dry_run ({dry_elapsed:.3f}s) is unexpectedly slower than live ({live_elapsed:.3f}s)" ) def test_gc_empty_repo_is_fast(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) _coord_dir(root) run_coord_gc = _import_run_coord_gc() t0 = time.monotonic() for _ in range(50): run_coord_gc(root, dry_run=True, grace_period_seconds=0) elapsed = time.monotonic() - t0 assert elapsed < 0.500, f"50 × GC on empty repo took {elapsed:.3f}s (> 500ms)" def test_gc_throughput_reservations_per_second(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) _write_expired_reservations(root, 500) run_coord_gc = _import_run_coord_gc() t0 = time.monotonic() result = run_coord_gc(root, dry_run=False, grace_period_seconds=0) elapsed = time.monotonic() - t0 rps = result.reservations_removed / elapsed if elapsed > 0 else float("inf") assert rps >= 100, f"GC throughput {rps:.0f} reservations/s is below minimum 100/s" # ============================================================================= # 4. active_reservations — triple directory scan # ============================================================================= class TestCoordPerfActiveReservations: """ active_reservations calls: load_released_ids() — scan 1: releases/ load_heartbeat_map() — scan 2: heartbeats/ load_all_reservations() — scan 3: reservations/ At 1 000 live reservations this is 3 × 1 000 file reads. """ def test_active_reservations_500_under_1_5s(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) _write_records(root, "reservation", 500) active_reservations = _import_active_reservations() t0 = time.monotonic() result = active_reservations(root) elapsed = time.monotonic() - t0 assert len(result) == 500 assert elapsed < 1.5, f"active_reservations(500) took {elapsed:.3f}s (> 1.5s)" def test_active_reservations_1000_under_3s(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) _write_records(root, "reservation", 1000) active_reservations = _import_active_reservations() t0 = time.monotonic() result = active_reservations(root) elapsed = time.monotonic() - t0 assert len(result) == 1000 assert elapsed < 3.0, f"active_reservations(1000) took {elapsed:.3f}s (> 3s)" @pytest.mark.slow def test_active_reservations_3000_under_9s(self, tmp_path: pathlib.Path) -> None: """3 000 reservations × 3 directory scans = ~9 000 file reads.""" root = _make_repo(tmp_path) _write_records(root, "reservation", 3000) active_reservations = _import_active_reservations() t0 = time.monotonic() result = active_reservations(root) elapsed = time.monotonic() - t0 assert len(result) == 3000 assert elapsed < 9.0, f"active_reservations(3000) took {elapsed:.3f}s (> 9s — triple-scan)" def test_active_reservations_with_heartbeats_not_slower_than_without(self, tmp_path: pathlib.Path) -> None: """ Adding heartbeat files adds a second directory scan. Measure the overhead explicitly to document the triple-scan cost. """ root_no_hb = _make_repo(tmp_path / "no_hb") root_hb = _make_repo(tmp_path / "hb") _write_records(root_no_hb, "reservation", 200) _write_records(root_hb, "reservation", 200) _write_records(root_hb, "heartbeat", 200) active_reservations = _import_active_reservations() t0 = time.monotonic() active_reservations(root_no_hb) no_hb_elapsed = time.monotonic() - t0 t0 = time.monotonic() active_reservations(root_hb) hb_elapsed = time.monotonic() - t0 # With heartbeats should not be more than 5× slower (both are O(N)) assert hb_elapsed < max(no_hb_elapsed * 5, 1.0), ( f"active_reservations with heartbeats ({hb_elapsed:.3f}s) " f"is unexpectedly slow vs without ({no_hb_elapsed:.3f}s)" ) def test_active_reservations_throughput_per_second(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) _write_records(root, "reservation", 300) active_reservations = _import_active_reservations() # Warm up active_reservations(root) t0 = time.monotonic() result = active_reservations(root) elapsed = time.monotonic() - t0 rps = len(result) / elapsed if elapsed > 0 else float("inf") assert rps >= 100, ( f"active_reservations throughput {rps:.0f} records/s is below minimum 100/s" ) # ============================================================================= # 5. JSON payload scaling # ============================================================================= class TestCoordPerfPayloadScaling: """ Measures how _gather_local_records and _write_remote_records degrade as payload sizes grow from 50 B to 50 KB per record. """ @pytest.mark.parametrize("payload_bytes,n,max_seconds", [ (50, 500, 1.0), (1024, 500, 2.0), (10240, 200, 3.0), (51200, 50, 3.0), ]) def test_gather_payload_scaling(self, tmp_path: pathlib.Path, payload_bytes: int, n: int, max_seconds: float) -> None: root = _make_repo(tmp_path) _write_records(root, "reservation", n, payload_bytes=payload_bytes) gather = _import_gather() t0 = time.monotonic() records = gather(root, ["reservation"]) elapsed = time.monotonic() - t0 assert len(records) == n assert elapsed < max_seconds, ( f"gather {n} × {payload_bytes}B records took {elapsed:.3f}s (> {max_seconds}s)" ) @pytest.mark.parametrize("payload_bytes,n,max_seconds", [ (50, 500, 2.0), (1024, 500, 3.0), (10240, 200, 3.0), (51200, 50, 2.0), ]) def test_write_remote_payload_scaling(self, tmp_path: pathlib.Path, payload_bytes: int, n: int, max_seconds: float) -> None: root = _make_repo(tmp_path) write_remote = _import_write_remote() records = [ { "kind": "reservation", "record_id": _new_id(), "run_id": f"run-{i}", "payload": {"data": "x" * payload_bytes}, "expires_at": _FUTURE_TS, } for i in range(n) ] t0 = time.monotonic() write_remote(root, records) elapsed = time.monotonic() - t0 assert elapsed < max_seconds, ( f"write_remote {n} × {payload_bytes}B records took {elapsed:.3f}s (> {max_seconds}s)" ) def test_50kb_payload_does_not_cause_memory_explosion(self, tmp_path: pathlib.Path) -> None: """ 50 records × 50 KB payload = 2.5 MB. After the gather, the live heap growth should stay well under 100 MB (not 2.5 GB from pathological duplication). """ root = _make_repo(tmp_path) _write_records(root, "reservation", 50, payload_bytes=51200) gather = _import_gather() tracemalloc.start() before = tracemalloc.take_snapshot() gather(root, ["reservation"]) after = tracemalloc.take_snapshot() tracemalloc.stop() stats = after.compare_to(before, "lineno") total_delta = sum(s.size_diff for s in stats if s.size_diff > 0) assert total_delta < 100 * 1024 * 1024, ( f"50-record 50KB gather allocated {total_delta / 1024 / 1024:.1f}MB " "(expected < 100MB)" ) # ============================================================================= # 6. Batch loop pure overhead # ============================================================================= class TestCoordPerfBatchLoop: """ The push batch loop in run_push splits records into chunks of MAX_PUSH_BATCH (500) and calls push_to_hub once per chunk. At 7 000 records that is 14 HTTP calls. These tests isolate the loop overhead from network cost. """ def test_batch_loop_7000_records_14_calls_under_500ms(self, tmp_path: pathlib.Path) -> None: """ With push_to_hub mocked, 14 batch calls on 7 000 records should complete in < 500ms. Any overhead above that is pure Python CPU cost. """ from muse.core.coord_bus import MAX_PUSH_BATCH root = _make_repo(tmp_path) # Write 1000 each of 7 kinds = 7000 total for kind in _ALL_KINDS: _write_records(root, kind, 1000) call_count: list[int] = [] def fake_push(hub_url: str, owner: str, slug: str, records: list[JsonDict], signing: SigningIdentity | None = None) -> MsgpackDict: call_count.append(len(records)) return {"inserted": len(records), "skipped": 0} with patch("muse.cli.commands.coord_sync._gather_local_records") as mock_gather, \ patch("muse.cli.commands.coord_sync.push_to_hub", side_effect=fake_push), \ patch("muse.cli.commands.coord_sync.require_repo", return_value=root), \ patch("muse.cli.commands.coord_sync._resolve_hub_and_signing", return_value=("https://localhost:1337", "tok")): # Build 7000 records in memory for mock_gather records = [] for i in range(7000): kind = _ALL_KINDS[i % len(_ALL_KINDS)] records.append({ "kind": kind, "record_id": _new_id(), "run_id": f"run-{i}", "payload": {"idx": i}, "expires_at": _FUTURE_TS, }) mock_gather.return_value = records import argparse args = argparse.Namespace( owner="torvalds", slug="linux", json_out=False, token=None, hub_url=None, kinds=list(_ALL_KINDS), ) t0 = time.monotonic() try: from muse.cli.commands.coord_sync import run_push run_push(args) except SystemExit as exc: assert exc.code == 0, f"run_push exited with code {exc.code}" elapsed = time.monotonic() - t0 expected_batches = (7000 + MAX_PUSH_BATCH - 1) // MAX_PUSH_BATCH assert len(call_count) == expected_batches, ( f"expected {expected_batches} batches, got {len(call_count)}" ) assert elapsed < 0.500, ( f"batch loop (push_to_hub mocked) for 7000 records took {elapsed:.3f}s (> 500ms)" ) def test_batch_sizes_are_never_over_max_push_batch(self, tmp_path: pathlib.Path) -> None: """Every batch must be ≤ MAX_PUSH_BATCH records.""" from muse.core.coord_bus import MAX_PUSH_BATCH root = _make_repo(tmp_path) call_sizes: list[int] = [] def fake_push(hub_url: str, owner: str, slug: str, records: list[JsonDict], signing: SigningIdentity | None = None) -> MsgpackDict: call_sizes.append(len(records)) return {"inserted": len(records), "skipped": 0} with patch("muse.cli.commands.coord_sync._gather_local_records") as mock_gather, \ patch("muse.cli.commands.coord_sync.push_to_hub", side_effect=fake_push), \ patch("muse.cli.commands.coord_sync.require_repo", return_value=root), \ patch("muse.cli.commands.coord_sync._resolve_hub_and_signing", return_value=("https://localhost:1337", "tok")): records = [ { "kind": "reservation", "record_id": _new_id(), "run_id": "run-0", "payload": {}, "expires_at": None, } for i in range(3333) # not a clean multiple of 500 ] mock_gather.return_value = records import argparse args = argparse.Namespace( owner="torvalds", slug="linux", json_out=False, token=None, hub_url=None, kinds=["reservation"], ) try: from muse.cli.commands.coord_sync import run_push run_push(args) except SystemExit: pass assert all(s <= MAX_PUSH_BATCH for s in call_sizes), ( f"batch sizes {call_sizes} contain a batch > MAX_PUSH_BATCH ({MAX_PUSH_BATCH})" ) def test_last_batch_is_correct_remainder(self, tmp_path: pathlib.Path) -> None: """With 7001 records (14 full + 1 leftover), last batch must be size 1.""" from muse.core.coord_bus import MAX_PUSH_BATCH root = _make_repo(tmp_path) call_sizes: list[int] = [] def fake_push(hub_url: str, owner: str, slug: str, records: list[JsonDict], signing: SigningIdentity | None = None) -> MsgpackDict: call_sizes.append(len(records)) return {"inserted": len(records), "skipped": 0} with patch("muse.cli.commands.coord_sync._gather_local_records") as mock_gather, \ patch("muse.cli.commands.coord_sync.push_to_hub", side_effect=fake_push), \ patch("muse.cli.commands.coord_sync.require_repo", return_value=root), \ patch("muse.cli.commands.coord_sync._resolve_hub_and_signing", return_value=("https://localhost:1337", "tok")): n = MAX_PUSH_BATCH * 14 + 1 # 7001 records = [ { "kind": "reservation", "record_id": _new_id(), "run_id": "run-0", "payload": {}, "expires_at": None, } for i in range(n) ] mock_gather.return_value = records import argparse args = argparse.Namespace( owner="torvalds", slug="linux", json_out=False, token=None, hub_url=None, kinds=["reservation"], ) try: from muse.cli.commands.coord_sync import run_push run_push(args) except SystemExit: pass assert len(call_sizes) == 15, f"expected 15 batches for {n} records, got {len(call_sizes)}" assert call_sizes[-1] == 1, f"last batch should be 1 record, got {call_sizes[-1]}" def test_zero_records_no_http_calls(self, tmp_path: pathlib.Path) -> None: """Empty gather must result in zero push_to_hub calls.""" root = _make_repo(tmp_path) call_count: list[int] = [] def fake_push(hub_url: str, owner: str, slug: str, records: list[JsonDict], signing: SigningIdentity | None = None) -> MsgpackDict: call_count.append(len(records)) return {"inserted": 0, "skipped": 0} with patch("muse.cli.commands.coord_sync._gather_local_records", return_value=[]), \ patch("muse.cli.commands.coord_sync.push_to_hub", side_effect=fake_push), \ patch("muse.cli.commands.coord_sync.require_repo", return_value=root), \ patch("muse.cli.commands.coord_sync._resolve_hub_and_signing", return_value=("https://localhost:1337", "tok")): import argparse args = argparse.Namespace( owner="torvalds", slug="linux", json_out=False, token=None, hub_url=None, kinds=list(_ALL_KINDS), ) try: from muse.cli.commands.coord_sync import run_push run_push(args) except SystemExit: pass assert call_count == [], f"expected 0 HTTP calls for empty push, got {len(call_count)}" # ============================================================================= # 7. Memory footprint # ============================================================================= class TestCoordPerfMemory: """ _gather_local_records holds all N records in a flat list before returning. At 10 000 records with 10KB payloads, that is ~100 MB of in-process memory. These tests document and bound the memory footprint. """ def test_gather_1000_records_memory_under_50mb(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) _write_records(root, "reservation", 1000, payload_bytes=1024) gather = _import_gather() tracemalloc.start() snap_before = tracemalloc.take_snapshot() records = gather(root, ["reservation"]) snap_after = tracemalloc.take_snapshot() tracemalloc.stop() stats = snap_after.compare_to(snap_before, "lineno") delta_bytes = sum(s.size_diff for s in stats if s.size_diff > 0) assert len(records) == 1000 assert delta_bytes < 50 * 1024 * 1024, ( f"gather(1000 × 1KB) allocated {delta_bytes / 1024 / 1024:.1f}MB (> 50MB)" ) def test_gather_all_7_kinds_200_each_memory_under_100mb(self, tmp_path: pathlib.Path) -> None: """1400 records × 1KB payload = 1.4 MB on disk; heap delta should be < 100 MB.""" root = _make_repo(tmp_path) for kind in _ALL_KINDS: _write_records(root, kind, 200, payload_bytes=1024) gather = _import_gather() tracemalloc.start() snap_before = tracemalloc.take_snapshot() records = gather(root, list(_ALL_KINDS)) snap_after = tracemalloc.take_snapshot() tracemalloc.stop() stats = snap_after.compare_to(snap_before, "lineno") delta_bytes = sum(s.size_diff for s in stats if s.size_diff > 0) assert len(records) == 1400 assert delta_bytes < 100 * 1024 * 1024, ( f"gather(1400 × 1KB, all kinds) allocated {delta_bytes / 1024 / 1024:.1f}MB (> 100MB)" ) def test_write_remote_1000_records_memory_under_100mb(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) write_remote = _import_write_remote() records = [ { "kind": _ALL_KINDS[i % len(_ALL_KINDS)], "record_id": _new_id(), "run_id": f"run-{i}", "payload": {"data": "x" * 1024}, "expires_at": _FUTURE_TS, } for i in range(1000) ] tracemalloc.start() snap_before = tracemalloc.take_snapshot() write_remote(root, records) snap_after = tracemalloc.take_snapshot() tracemalloc.stop() stats = snap_after.compare_to(snap_before, "lineno") delta_bytes = sum(s.size_diff for s in stats if s.size_diff > 0) assert delta_bytes < 100 * 1024 * 1024, ( f"write_remote(1000 × 1KB) allocated {delta_bytes / 1024 / 1024:.1f}MB (> 100MB)" ) def test_gc_1000_reservations_memory_under_50mb(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) _write_expired_reservations(root, 1000) run_coord_gc = _import_run_coord_gc() tracemalloc.start() snap_before = tracemalloc.take_snapshot() run_coord_gc(root, dry_run=False, grace_period_seconds=0) snap_after = tracemalloc.take_snapshot() tracemalloc.stop() stats = snap_after.compare_to(snap_before, "lineno") delta_bytes = sum(s.size_diff for s in stats if s.size_diff > 0) assert delta_bytes < 50 * 1024 * 1024, ( f"GC(1000 expired) allocated {delta_bytes / 1024 / 1024:.1f}MB (> 50MB)" ) # ============================================================================= # 8. End-to-end throughput # ============================================================================= class TestCoordPerfThroughput: """ End-to-end gather → batch → push cycle with a mock HTTP layer. Measures total records/second and total wall-clock time. """ def test_end_to_end_500_records_under_1s(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) for kind in _ALL_KINDS: _write_records(root, kind, 500 // len(_ALL_KINDS) + 1) call_count = [] def fake_push(hub_url: str, owner: str, slug: str, records: list[JsonDict], signing: SigningIdentity | None = None) -> MsgpackDict: call_count.append(len(records)) return {"inserted": len(records), "skipped": 0} with patch("muse.cli.commands.coord_sync.push_to_hub", side_effect=fake_push), \ patch("muse.cli.commands.coord_sync.require_repo", return_value=root), \ patch("muse.cli.commands.coord_sync._resolve_hub_and_signing", return_value=("https://localhost:1337", "tok")): import argparse args = argparse.Namespace( owner="torvalds", slug="linux", json_out=False, token=None, hub_url=None, kinds=list(_ALL_KINDS), ) t0 = time.monotonic() try: from muse.cli.commands.coord_sync import run_push run_push(args) except SystemExit as exc: assert exc.code == 0 elapsed = time.monotonic() - t0 total_pushed = sum(call_count) assert total_pushed > 0 assert elapsed < 1.0, f"end-to-end ~500 records took {elapsed:.3f}s (> 1s)" @pytest.mark.slow def test_end_to_end_7000_records_under_10s(self, tmp_path: pathlib.Path) -> None: """ Full Linux kernel agent swarm: 7 000 records across all 7 kinds. push_to_hub is mocked — this measures gather + batch splitting overhead only. """ root = _make_repo(tmp_path) for kind in _ALL_KINDS: _write_records(root, kind, 1000) call_count = [] def fake_push(hub_url: str, owner: str, slug: str, records: list[JsonDict], signing: SigningIdentity | None = None) -> MsgpackDict: call_count.append(len(records)) return {"inserted": len(records), "skipped": 0} with patch("muse.cli.commands.coord_sync.push_to_hub", side_effect=fake_push), \ patch("muse.cli.commands.coord_sync.require_repo", return_value=root), \ patch("muse.cli.commands.coord_sync._resolve_hub_and_signing", return_value=("https://localhost:1337", "tok")): import argparse args = argparse.Namespace( owner="torvalds", slug="linux", json_out=False, token=None, hub_url=None, kinds=list(_ALL_KINDS), ) t0 = time.monotonic() try: from muse.cli.commands.coord_sync import run_push run_push(args) except SystemExit as exc: assert exc.code == 0 elapsed = time.monotonic() - t0 total_pushed = sum(call_count) assert total_pushed == 7000, f"expected 7000 records pushed, got {total_pushed}" assert elapsed < 10.0, ( f"end-to-end 7000 records (gather + batch, mocked HTTP) took {elapsed:.3f}s (> 10s)" ) rps = total_pushed / elapsed assert rps >= 700, f"end-to-end throughput {rps:.0f} records/s is below minimum 700/s" def test_repeated_pulls_do_not_accumulate_state(self, tmp_path: pathlib.Path) -> None: """ run_pull is stateless per call — repeated calls on the same repo should have stable (not growing) elapsed time. """ root = _make_repo(tmp_path) (coordination_dir(root) / _REMOTE_DIR_NAME).mkdir(parents=True, exist_ok=True) fake_records = [ { "kind": "reservation", "record_id": _new_id(), "run_id": "run-0", "payload": {"idx": i}, "expires_at": _FUTURE_TS, } for i in range(200) ] def fake_pull( hub_url: str, owner: str, slug: str, since_id: int = 0, kinds: list[str] | None = None, limit: int = 500, signing: SigningIdentity | None = None, ) -> MsgpackDict: return {"records": fake_records, "cursor": since_id + len(fake_records)} with patch("muse.cli.commands.coord_sync.pull_from_hub", side_effect=fake_pull), \ patch("muse.cli.commands.coord_sync.require_repo", return_value=root), \ patch("muse.cli.commands.coord_sync._resolve_hub_and_signing", return_value=("https://localhost:1337", "tok")): import argparse elapsed_list = [] for _ in range(5): args = argparse.Namespace( owner="torvalds", slug="linux", json_out=False, token=None, hub_url=None, since_id=0, limit=1000, kinds=list(_ALL_KINDS), ) t0 = time.monotonic() try: from muse.cli.commands.coord_sync import run_pull run_pull(args) except SystemExit: pass elapsed_list.append(time.monotonic() - t0) # The 5th call must not take more than 5× the 1st call — no accumulation assert elapsed_list[4] < max(elapsed_list[0] * 5, 0.500), ( f"repeated pull times grew suspiciously: {[f'{e:.3f}' for e in elapsed_list]}" ) _REMOTE_DIR_NAME = "remote"