gabriel / muse public
test_coord_performance.py python
1,234 lines 46.5 KB
Raw
1 """
2 EXTREME performance test suite for ``muse coord`` — Linus Torvalds porting
3 Linux from Git to Muse.
4
5 Scenario: 7 000+ agents, 150 000+ files, kernel subsystems as task clusters.
6 We want to break Muse Coord. We want to find where the edge is and go beyond.
7
8 Bottlenecks targeted:
9 1. _gather_local_records — O(N) file reads, no caching, no batching
10 2. _write_remote_records — mkdir() called PER record (N syscalls vs 7)
11 3. run_coord_gc — releases directory scanned TWICE (load_released_ids
12 + load_all_releases); active_reservations does 3
13 full directory scans
14 4. JSON payload scaling — 50B / 1 KB / 10 KB / 50 KB per record
15 5. Batch loop overhead — pure CPU cost of splitting 7 000 records into batches
16 6. Memory footprint — 10 000 records held in memory simultaneously
17 7. End-to-end throughput — records/second metric
18
19 Existing tests in test_core_coord_bus.py (do NOT duplicate):
20 - 500-record push serialization < 1 s
21 - 1000-record pull parse < 1 s
22 - 100 sequential push_to_hub < 2 s
23 - 100k _build_url calls < 1 s
24 """
25 from __future__ import annotations
26
27 import itertools
28 import json
29 import os
30 import pathlib
31 import sys
32 import time
33 import tracemalloc
34 from collections.abc import Callable
35 from typing import TYPE_CHECKING
36 from unittest.mock import MagicMock, patch
37
38 import pytest
39
40 from muse.core.types import MsgpackDict, content_hash
41 from muse.core.coord_bus import JsonDict
42 from muse.core.paths import coordination_dir, muse_dir
43
44 if TYPE_CHECKING:
45 from muse.core.coordination import CoordGcResult, Reservation
46 from muse.core.transport import SigningIdentity
47
48 # ── helpers ──────────────────────────────────────────────────────────────────
49
50 _id_seq = itertools.count()
51
52
53 def _new_id() -> str:
54 return content_hash({"seq": next(_id_seq)})
55
56
57 _ALL_KINDS = ("reservation", "intent", "release", "heartbeat", "dependency", "task", "claim")
58
59 _KIND_SUBDIR = {
60 "reservation": "reservations",
61 "intent": "intents",
62 "release": "releases",
63 "heartbeat": "heartbeats",
64 "dependency": "dependencies",
65 "task": "tasks",
66 "claim": "claims",
67 }
68
69 _FUTURE_TS = "2099-12-31T23:59:59+00:00"
70 _PAST_TS = "2000-01-01T00:00:00+00:00"
71
72
73 def _make_repo(tmp_path: pathlib.Path) -> pathlib.Path:
74 muse_dir(tmp_path).mkdir(parents=True, exist_ok=True)
75 return tmp_path
76
77
78 def _coord_dir(root: pathlib.Path) -> pathlib.Path:
79 d = coordination_dir(root)
80 d.mkdir(parents=True, exist_ok=True)
81 return d
82
83
84 def _write_records(root: pathlib.Path, kind: str, n: int, payload_bytes: int = 64) -> None:
85 """Write *n* minimal records of *kind* to the local coordination store."""
86 subdir = _KIND_SUBDIR[kind]
87 d = _coord_dir(root) / subdir
88 d.mkdir(parents=True, exist_ok=True)
89
90 padding = "x" * max(0, payload_bytes - 80)
91
92 for i in range(n):
93 rid = f"{kind}-{i:06d}"
94 if kind == "reservation":
95 rec = {
96 "reservation_id": rid,
97 "run_id": f"run-{i}",
98 "expires_at": _FUTURE_TS,
99 "symbols": [f"kernel/subsys_{i % 50}/module.c::func_{i}"],
100 "_pad": padding,
101 }
102 elif kind == "heartbeat":
103 rec = {
104 "run_id": rid,
105 "extended_expires_at": _FUTURE_TS,
106 "expires_at": _FUTURE_TS,
107 "_pad": padding,
108 }
109 elif kind == "intent":
110 rec = {
111 "intent_id": rid,
112 "run_id": f"run-{i}",
113 "reservation_id": f"reservation-{i:06d}",
114 "operation": "modify",
115 "target": f"kernel/subsys_{i % 50}/module.c::func_{i}",
116 "_pad": padding,
117 }
118 elif kind == "release":
119 rec = {
120 "release_id": rid,
121 "reservation_id": f"reservation-{i:06d}",
122 "run_id": f"run-{i}",
123 "released_at": _FUTURE_TS,
124 "_pad": padding,
125 }
126 elif kind == "dependency":
127 rec = {
128 "reservation_id": rid,
129 "depends_on": [f"reservation-{j:06d}" for j in range(min(3, i))],
130 "_pad": padding,
131 }
132 elif kind == "task":
133 rec = {
134 "task_id": rid,
135 "run_id": f"run-{i}",
136 "description": f"patch kernel subsystem {i % 50}",
137 "_pad": padding,
138 }
139 elif kind == "claim":
140 rec = {
141 "task_id": rid,
142 "claimer_run_id": f"run-{i}",
143 "expires_at": _FUTURE_TS,
144 "_pad": padding,
145 }
146 else:
147 rec = {"id": rid, "_pad": padding}
148
149 (d / f"{rid}.json").write_text(json.dumps(rec), encoding="utf-8")
150
151
152 def _write_expired_reservations(root: pathlib.Path, n: int) -> None:
153 """Write *n* expired reservations (expires_at in the past, no heartbeat, no release)."""
154 d = _coord_dir(root) / "reservations"
155 d.mkdir(parents=True, exist_ok=True)
156 for i in range(n):
157 rid = f"exp-res-{i:06d}"
158 rec = {
159 "reservation_id": rid,
160 "run_id": f"run-{i}",
161 "expires_at": _PAST_TS,
162 "symbols": [f"drivers/block/hd_{i}.c::init"],
163 }
164 (d / f"{rid}.json").write_text(json.dumps(rec), encoding="utf-8")
165
166
167 def _write_released_reservations(root: pathlib.Path, n: int) -> None:
168 """Write *n* pairs: reservation + release tombstone (released in the far past)."""
169 res_dir = _coord_dir(root) / "reservations"
170 rel_dir = _coord_dir(root) / "releases"
171 res_dir.mkdir(parents=True, exist_ok=True)
172 rel_dir.mkdir(parents=True, exist_ok=True)
173 for i in range(n):
174 rid = f"rel-res-{i:06d}"
175 rec = {
176 "reservation_id": rid,
177 "run_id": f"run-{i}",
178 "expires_at": _PAST_TS,
179 "symbols": [f"net/ipv4/tcp_{i}.c::send"],
180 }
181 (res_dir / f"{rid}.json").write_text(json.dumps(rec), encoding="utf-8")
182 tombstone = {
183 "release_id": f"release-{i:06d}",
184 "reservation_id": rid,
185 "run_id": f"run-{i}",
186 "released_at": _PAST_TS,
187 }
188 (rel_dir / f"{rid}.json").write_text(json.dumps(tombstone), encoding="utf-8")
189
190
191 # ── import targets ────────────────────────────────────────────────────────────
192
193 def _import_gather() -> Callable[[pathlib.Path, list[str]], list[JsonDict]]:
194 from muse.cli.commands.coord_sync import _gather_local_records
195 return _gather_local_records
196
197
198 def _import_write_remote() -> Callable[[pathlib.Path, list[JsonDict]], None]:
199 from muse.cli.commands.coord_sync import _write_remote_records
200 return _write_remote_records
201
202
203 def _import_run_coord_gc() -> Callable[..., CoordGcResult]:
204 from muse.core.coordination import run_coord_gc
205 return run_coord_gc
206
207
208 def _import_active_reservations() -> Callable[[pathlib.Path], list[Reservation]]:
209 from muse.core.coordination import active_reservations
210 return active_reservations
211
212
213 # =============================================================================
214 # 1. _gather_local_records — O(N) file I/O
215 # =============================================================================
216
217 class TestCoordPerfGather:
218 """
219 _gather_local_records reads every file on every call. These tests measure
220 raw throughput at increasing scale and expose the O(N) cost.
221 """
222
223 def test_gather_100_reservations_under_200ms(self, tmp_path: pathlib.Path) -> None:
224 root = _make_repo(tmp_path)
225 _write_records(root, "reservation", 100)
226 gather = _import_gather()
227
228 t0 = time.monotonic()
229 records = gather(root, ["reservation"])
230 elapsed = time.monotonic() - t0
231
232 assert len(records) == 100
233 assert elapsed < 0.200, f"gather 100 reservations took {elapsed:.3f}s (> 200ms)"
234
235 def test_gather_500_reservations_under_500ms(self, tmp_path: pathlib.Path) -> None:
236 root = _make_repo(tmp_path)
237 _write_records(root, "reservation", 500)
238 gather = _import_gather()
239
240 t0 = time.monotonic()
241 records = gather(root, ["reservation"])
242 elapsed = time.monotonic() - t0
243
244 assert len(records) == 500
245 assert elapsed < 0.500, f"gather 500 reservations took {elapsed:.3f}s (> 500ms)"
246
247 def test_gather_1000_reservations_under_1s(self, tmp_path: pathlib.Path) -> None:
248 root = _make_repo(tmp_path)
249 _write_records(root, "reservation", 1000)
250 gather = _import_gather()
251
252 t0 = time.monotonic()
253 records = gather(root, ["reservation"])
254 elapsed = time.monotonic() - t0
255
256 assert len(records) == 1000
257 assert elapsed < 1.0, f"gather 1000 reservations took {elapsed:.3f}s (> 1s)"
258
259 @pytest.mark.slow
260 def test_gather_5000_reservations_under_5s(self, tmp_path: pathlib.Path) -> None:
261 root = _make_repo(tmp_path)
262 _write_records(root, "reservation", 5000)
263 gather = _import_gather()
264
265 t0 = time.monotonic()
266 records = gather(root, ["reservation"])
267 elapsed = time.monotonic() - t0
268
269 assert len(records) == 5000
270 assert elapsed < 5.0, f"gather 5000 reservations took {elapsed:.3f}s (> 5s)"
271
272 @pytest.mark.slow
273 def test_gather_all_7_kinds_1000_each_under_7s(self, tmp_path: pathlib.Path) -> None:
274 """7000 files across 7 kinds — worst-case full Linux kernel scenario."""
275 root = _make_repo(tmp_path)
276 for kind in _ALL_KINDS:
277 _write_records(root, kind, 1000)
278 gather = _import_gather()
279
280 t0 = time.monotonic()
281 records = gather(root, list(_ALL_KINDS))
282 elapsed = time.monotonic() - t0
283
284 assert len(records) == 7000
285 assert elapsed < 7.0, f"gather 7000 records (all kinds) took {elapsed:.3f}s (> 7s)"
286
287 def test_cold_vs_warm_gather_same_directory(self, tmp_path: pathlib.Path) -> None:
288 """
289 Second call must not be dramatically slower than the first.
290 There is no caching; both calls read from disk. The second call
291 benefits only from OS page-cache effects — that is acceptable.
292 """
293 root = _make_repo(tmp_path)
294 _write_records(root, "reservation", 200)
295 gather = _import_gather()
296
297 t0 = time.monotonic()
298 gather(root, ["reservation"])
299 cold = time.monotonic() - t0
300
301 t0 = time.monotonic()
302 gather(root, ["reservation"])
303 warm = time.monotonic() - t0
304
305 # Warm should not be more than 4× cold (OS cache should help).
306 # We do NOT assert warm < cold because there is no in-process cache.
307 assert warm < max(cold * 4, 0.500), (
308 f"warm gather ({warm:.3f}s) is unexpectedly slow vs cold ({cold:.3f}s)"
309 )
310
311 def test_gather_empty_directory_is_fast(self, tmp_path: pathlib.Path) -> None:
312 root = _make_repo(tmp_path)
313 _coord_dir(root) # creates .muse/coordination/ but no kind subdirs
314 gather = _import_gather()
315
316 t0 = time.monotonic()
317 for _ in range(100):
318 gather(root, list(_ALL_KINDS))
319 elapsed = time.monotonic() - t0
320
321 assert elapsed < 0.200, f"100 × empty gather took {elapsed:.3f}s (> 200ms)"
322
323 def test_gather_single_kind_ignores_other_dirs(self, tmp_path: pathlib.Path) -> None:
324 """Filtering to one kind must not scan 6 other directories."""
325 root = _make_repo(tmp_path)
326 for kind in _ALL_KINDS:
327 _write_records(root, kind, 200) # 1400 files total
328 gather = _import_gather()
329
330 t0 = time.monotonic()
331 records = gather(root, ["reservation"])
332 elapsed = time.monotonic() - t0
333
334 assert len(records) == 200
335 assert elapsed < 0.500, (
336 f"single-kind gather across 1400-file repo took {elapsed:.3f}s (> 500ms)"
337 )
338
339 def test_gather_throughput_records_per_second(self, tmp_path: pathlib.Path) -> None:
340 """
341 Measures records/second. Documents current throughput so regressions
342 become visible. On modern SSD hardware ≥ 1 000 records/s expected.
343 """
344 root = _make_repo(tmp_path)
345 _write_records(root, "reservation", 500)
346 gather = _import_gather()
347
348 # Warm up OS page cache
349 gather(root, ["reservation"])
350
351 t0 = time.monotonic()
352 records = gather(root, ["reservation"])
353 elapsed = time.monotonic() - t0
354
355 rps = len(records) / elapsed if elapsed > 0 else float("inf")
356 # This is a documentation assertion — fail loudly if throughput crashes.
357 assert rps >= 500, f"gather throughput {rps:.0f} records/s is below minimum 500/s"
358
359
360 # =============================================================================
361 # 2. _write_remote_records — mkdir() per record
362 # =============================================================================
363
364 class TestCoordPerfWriteRemote:
365 """
366 _write_remote_records calls kind_dir.mkdir(parents=True, exist_ok=True) for
367 EVERY record. With 7 distinct kinds and 7 000 records that is 7 000 mkdir
368 syscalls. These tests measure that overhead and expose the regression path.
369 """
370
371 def test_write_100_records_7_kinds_under_500ms(self, tmp_path: pathlib.Path) -> None:
372 root = _make_repo(tmp_path)
373 write_remote = _import_write_remote()
374
375 records = []
376 for i in range(100):
377 kind = _ALL_KINDS[i % len(_ALL_KINDS)]
378 records.append({
379 "kind": kind,
380 "record_id": _new_id(),
381 "run_id": f"run-{i}",
382 "payload": {"data": "x" * 64},
383 "expires_at": _FUTURE_TS,
384 })
385
386 t0 = time.monotonic()
387 write_remote(root, records)
388 elapsed = time.monotonic() - t0
389
390 assert elapsed < 0.500, f"write 100 remote records took {elapsed:.3f}s (> 500ms)"
391 # Verify files actually written
392 remote_dir = coordination_dir(root) / "remote"
393 written = sum(1 for _ in remote_dir.rglob("*.json"))
394 assert written == 100
395
396 def test_write_1000_records_7_kinds_under_3s(self, tmp_path: pathlib.Path) -> None:
397 root = _make_repo(tmp_path)
398 write_remote = _import_write_remote()
399
400 records = [
401 {
402 "kind": _ALL_KINDS[i % len(_ALL_KINDS)],
403 "record_id": _new_id(),
404 "run_id": f"run-{i}",
405 "payload": {"idx": i},
406 "expires_at": _FUTURE_TS,
407 }
408 for i in range(1000)
409 ]
410
411 t0 = time.monotonic()
412 write_remote(root, records)
413 elapsed = time.monotonic() - t0
414
415 assert elapsed < 3.0, f"write 1000 remote records took {elapsed:.3f}s (> 3s)"
416
417 @pytest.mark.slow
418 def test_write_7000_records_mkdir_overhead_documented(self, tmp_path: pathlib.Path) -> None:
419 """
420 Documents the mkdir-per-record overhead at full Linux kernel scale.
421 7 000 records × 7 kinds = 7 000 mkdir() calls instead of the optimal 7.
422
423 This test is expected to PASS with the current implementation.
424 Its purpose: if a future optimization pre-creates directories (7 calls),
425 the elapsed time should drop by ~30-50%. A regression would show here.
426 """
427 root = _make_repo(tmp_path)
428 write_remote = _import_write_remote()
429
430 records = [
431 {
432 "kind": _ALL_KINDS[i % len(_ALL_KINDS)],
433 "record_id": _new_id(),
434 "run_id": f"run-{i}",
435 "payload": {"idx": i},
436 "expires_at": None,
437 }
438 for i in range(7000)
439 ]
440
441 t0 = time.monotonic()
442 write_remote(root, records)
443 elapsed = time.monotonic() - t0
444
445 remote_dir = coordination_dir(root) / "remote"
446 written = sum(1 for _ in remote_dir.rglob("*.json"))
447 assert written == 7000, f"expected 7000 files, got {written}"
448 # Generous bound — this is the CURRENT (suboptimal) ceiling
449 assert elapsed < 20.0, f"write 7000 remote records took {elapsed:.3f}s (> 20s)"
450
451 def test_write_throughput_records_per_second(self, tmp_path: pathlib.Path) -> None:
452 """Baseline throughput for write_remote_records at 500 records."""
453 root = _make_repo(tmp_path)
454 write_remote = _import_write_remote()
455
456 records = [
457 {
458 "kind": _ALL_KINDS[i % len(_ALL_KINDS)],
459 "record_id": _new_id(),
460 "run_id": f"run-{i}",
461 "payload": {"idx": i},
462 "expires_at": None,
463 }
464 for i in range(500)
465 ]
466
467 t0 = time.monotonic()
468 write_remote(root, records)
469 elapsed = time.monotonic() - t0
470
471 rps = 500 / elapsed if elapsed > 0 else float("inf")
472 assert rps >= 100, f"write_remote throughput {rps:.0f} records/s is below minimum 100/s"
473
474 def test_write_remote_overwrites_are_not_slower_than_first_write(self, tmp_path: pathlib.Path) -> None:
475 """Overwriting 500 records (second call) must not be dramatically slower."""
476 root = _make_repo(tmp_path)
477 write_remote = _import_write_remote()
478
479 records = [
480 {
481 "kind": _ALL_KINDS[i % len(_ALL_KINDS)],
482 "record_id": _new_id(),
483 "run_id": f"run-{i}",
484 "payload": {"idx": i},
485 "expires_at": None,
486 }
487 for i in range(500)
488 ]
489
490 # First write — directories created
491 t0 = time.monotonic()
492 write_remote(root, records)
493 first = time.monotonic() - t0
494
495 # Second write — exist_ok=True, but still 500 mkdir() syscalls
496 t0 = time.monotonic()
497 write_remote(root, records)
498 second = time.monotonic() - t0
499
500 # Second should not be more than 5× the first (exist_ok adds a stat call)
501 assert second < max(first * 5, 2.0), (
502 f"overwrite pass ({second:.3f}s) is unexpectedly slow vs first write ({first:.3f}s)"
503 )
504
505
506 # =============================================================================
507 # 3. run_coord_gc — double-scan of releases, triple active_reservations scan
508 # =============================================================================
509
510 class TestCoordPerfGC:
511 """
512 run_coord_gc loads releases TWICE: load_released_ids() + load_all_releases().
513 These tests expose that cost at scale.
514 """
515
516 def test_gc_1000_expired_reservations_under_3s(self, tmp_path: pathlib.Path) -> None:
517 root = _make_repo(tmp_path)
518 _write_expired_reservations(root, 1000)
519 run_coord_gc = _import_run_coord_gc()
520
521 t0 = time.monotonic()
522 result = run_coord_gc(root, dry_run=False, grace_period_seconds=0)
523 elapsed = time.monotonic() - t0
524
525 assert result.reservations_removed == 1000
526 assert elapsed < 3.0, f"GC 1000 expired reservations took {elapsed:.3f}s (> 3s)"
527
528 def test_gc_1000_released_reservations_under_4s(self, tmp_path: pathlib.Path) -> None:
529 """
530 Released reservations force the double-scan: load_released_ids() then
531 load_all_releases(). 1 000 release tombstones = 2 × 1 000 file reads.
532 """
533 root = _make_repo(tmp_path)
534 _write_released_reservations(root, 1000)
535 run_coord_gc = _import_run_coord_gc()
536
537 t0 = time.monotonic()
538 result = run_coord_gc(root, dry_run=False, grace_period_seconds=0)
539 elapsed = time.monotonic() - t0
540
541 assert result.reservations_removed == 1000
542 assert elapsed < 4.0, (
543 f"GC 1000 released reservations took {elapsed:.3f}s (> 4s — double-scan cost)"
544 )
545
546 @pytest.mark.slow
547 def test_gc_5000_expired_reservations_under_15s(self, tmp_path: pathlib.Path) -> None:
548 root = _make_repo(tmp_path)
549 _write_expired_reservations(root, 5000)
550 run_coord_gc = _import_run_coord_gc()
551
552 t0 = time.monotonic()
553 result = run_coord_gc(root, dry_run=False, grace_period_seconds=0)
554 elapsed = time.monotonic() - t0
555
556 assert result.reservations_removed == 5000
557 assert elapsed < 15.0, f"GC 5000 expired reservations took {elapsed:.3f}s (> 15s)"
558
559 def test_gc_dry_run_is_not_slower_than_live_run(self, tmp_path: pathlib.Path) -> None:
560 """
561 Dry-run must not be significantly slower than a live run at the same scale.
562 Both run the same 4 directory scans; the only difference is no unlink().
563 """
564 root_live = _make_repo(tmp_path / "live")
565 root_dry = _make_repo(tmp_path / "dry")
566 _write_expired_reservations(root_live, 300)
567 _write_expired_reservations(root_dry, 300)
568 run_coord_gc = _import_run_coord_gc()
569
570 t0 = time.monotonic()
571 run_coord_gc(root_live, dry_run=False, grace_period_seconds=0)
572 live_elapsed = time.monotonic() - t0
573
574 t0 = time.monotonic()
575 run_coord_gc(root_dry, dry_run=True, grace_period_seconds=0)
576 dry_elapsed = time.monotonic() - t0
577
578 # Dry-run has no unlink() calls — it should not be slower than live run
579 assert dry_elapsed < max(live_elapsed * 3, 1.0), (
580 f"dry_run ({dry_elapsed:.3f}s) is unexpectedly slower than live ({live_elapsed:.3f}s)"
581 )
582
583 def test_gc_empty_repo_is_fast(self, tmp_path: pathlib.Path) -> None:
584 root = _make_repo(tmp_path)
585 _coord_dir(root)
586 run_coord_gc = _import_run_coord_gc()
587
588 t0 = time.monotonic()
589 for _ in range(50):
590 run_coord_gc(root, dry_run=True, grace_period_seconds=0)
591 elapsed = time.monotonic() - t0
592
593 assert elapsed < 0.500, f"50 × GC on empty repo took {elapsed:.3f}s (> 500ms)"
594
595 def test_gc_throughput_reservations_per_second(self, tmp_path: pathlib.Path) -> None:
596 root = _make_repo(tmp_path)
597 _write_expired_reservations(root, 500)
598 run_coord_gc = _import_run_coord_gc()
599
600 t0 = time.monotonic()
601 result = run_coord_gc(root, dry_run=False, grace_period_seconds=0)
602 elapsed = time.monotonic() - t0
603
604 rps = result.reservations_removed / elapsed if elapsed > 0 else float("inf")
605 assert rps >= 100, f"GC throughput {rps:.0f} reservations/s is below minimum 100/s"
606
607
608 # =============================================================================
609 # 4. active_reservations — triple directory scan
610 # =============================================================================
611
612 class TestCoordPerfActiveReservations:
613 """
614 active_reservations calls:
615 load_released_ids() — scan 1: releases/
616 load_heartbeat_map() — scan 2: heartbeats/
617 load_all_reservations() — scan 3: reservations/
618 At 1 000 live reservations this is 3 × 1 000 file reads.
619 """
620
621 def test_active_reservations_500_under_1_5s(self, tmp_path: pathlib.Path) -> None:
622 root = _make_repo(tmp_path)
623 _write_records(root, "reservation", 500)
624 active_reservations = _import_active_reservations()
625
626 t0 = time.monotonic()
627 result = active_reservations(root)
628 elapsed = time.monotonic() - t0
629
630 assert len(result) == 500
631 assert elapsed < 1.5, f"active_reservations(500) took {elapsed:.3f}s (> 1.5s)"
632
633 def test_active_reservations_1000_under_3s(self, tmp_path: pathlib.Path) -> None:
634 root = _make_repo(tmp_path)
635 _write_records(root, "reservation", 1000)
636 active_reservations = _import_active_reservations()
637
638 t0 = time.monotonic()
639 result = active_reservations(root)
640 elapsed = time.monotonic() - t0
641
642 assert len(result) == 1000
643 assert elapsed < 3.0, f"active_reservations(1000) took {elapsed:.3f}s (> 3s)"
644
645 @pytest.mark.slow
646 def test_active_reservations_3000_under_9s(self, tmp_path: pathlib.Path) -> None:
647 """3 000 reservations × 3 directory scans = ~9 000 file reads."""
648 root = _make_repo(tmp_path)
649 _write_records(root, "reservation", 3000)
650 active_reservations = _import_active_reservations()
651
652 t0 = time.monotonic()
653 result = active_reservations(root)
654 elapsed = time.monotonic() - t0
655
656 assert len(result) == 3000
657 assert elapsed < 9.0, f"active_reservations(3000) took {elapsed:.3f}s (> 9s — triple-scan)"
658
659 def test_active_reservations_with_heartbeats_not_slower_than_without(self, tmp_path: pathlib.Path) -> None:
660 """
661 Adding heartbeat files adds a second directory scan. Measure the
662 overhead explicitly to document the triple-scan cost.
663 """
664 root_no_hb = _make_repo(tmp_path / "no_hb")
665 root_hb = _make_repo(tmp_path / "hb")
666 _write_records(root_no_hb, "reservation", 200)
667 _write_records(root_hb, "reservation", 200)
668 _write_records(root_hb, "heartbeat", 200)
669 active_reservations = _import_active_reservations()
670
671 t0 = time.monotonic()
672 active_reservations(root_no_hb)
673 no_hb_elapsed = time.monotonic() - t0
674
675 t0 = time.monotonic()
676 active_reservations(root_hb)
677 hb_elapsed = time.monotonic() - t0
678
679 # With heartbeats should not be more than 5× slower (both are O(N))
680 assert hb_elapsed < max(no_hb_elapsed * 5, 1.0), (
681 f"active_reservations with heartbeats ({hb_elapsed:.3f}s) "
682 f"is unexpectedly slow vs without ({no_hb_elapsed:.3f}s)"
683 )
684
685 def test_active_reservations_throughput_per_second(self, tmp_path: pathlib.Path) -> None:
686 root = _make_repo(tmp_path)
687 _write_records(root, "reservation", 300)
688 active_reservations = _import_active_reservations()
689
690 # Warm up
691 active_reservations(root)
692
693 t0 = time.monotonic()
694 result = active_reservations(root)
695 elapsed = time.monotonic() - t0
696
697 rps = len(result) / elapsed if elapsed > 0 else float("inf")
698 assert rps >= 100, (
699 f"active_reservations throughput {rps:.0f} records/s is below minimum 100/s"
700 )
701
702
703 # =============================================================================
704 # 5. JSON payload scaling
705 # =============================================================================
706
707 class TestCoordPerfPayloadScaling:
708 """
709 Measures how _gather_local_records and _write_remote_records degrade as
710 payload sizes grow from 50 B to 50 KB per record.
711 """
712
713 @pytest.mark.parametrize("payload_bytes,n,max_seconds", [
714 (50, 500, 1.0),
715 (1024, 500, 2.0),
716 (10240, 200, 3.0),
717 (51200, 50, 3.0),
718 ])
719 def test_gather_payload_scaling(self, tmp_path: pathlib.Path, payload_bytes: int, n: int, max_seconds: float) -> None:
720 root = _make_repo(tmp_path)
721 _write_records(root, "reservation", n, payload_bytes=payload_bytes)
722 gather = _import_gather()
723
724 t0 = time.monotonic()
725 records = gather(root, ["reservation"])
726 elapsed = time.monotonic() - t0
727
728 assert len(records) == n
729 assert elapsed < max_seconds, (
730 f"gather {n} × {payload_bytes}B records took {elapsed:.3f}s (> {max_seconds}s)"
731 )
732
733 @pytest.mark.parametrize("payload_bytes,n,max_seconds", [
734 (50, 500, 2.0),
735 (1024, 500, 3.0),
736 (10240, 200, 3.0),
737 (51200, 50, 2.0),
738 ])
739 def test_write_remote_payload_scaling(self, tmp_path: pathlib.Path, payload_bytes: int, n: int, max_seconds: float) -> None:
740 root = _make_repo(tmp_path)
741 write_remote = _import_write_remote()
742
743 records = [
744 {
745 "kind": "reservation",
746 "record_id": _new_id(),
747 "run_id": f"run-{i}",
748 "payload": {"data": "x" * payload_bytes},
749 "expires_at": _FUTURE_TS,
750 }
751 for i in range(n)
752 ]
753
754 t0 = time.monotonic()
755 write_remote(root, records)
756 elapsed = time.monotonic() - t0
757
758 assert elapsed < max_seconds, (
759 f"write_remote {n} × {payload_bytes}B records took {elapsed:.3f}s (> {max_seconds}s)"
760 )
761
762 def test_50kb_payload_does_not_cause_memory_explosion(self, tmp_path: pathlib.Path) -> None:
763 """
764 50 records × 50 KB payload = 2.5 MB. After the gather, the live heap
765 growth should stay well under 100 MB (not 2.5 GB from pathological
766 duplication).
767 """
768 root = _make_repo(tmp_path)
769 _write_records(root, "reservation", 50, payload_bytes=51200)
770 gather = _import_gather()
771
772 tracemalloc.start()
773 before = tracemalloc.take_snapshot()
774 gather(root, ["reservation"])
775 after = tracemalloc.take_snapshot()
776 tracemalloc.stop()
777
778 stats = after.compare_to(before, "lineno")
779 total_delta = sum(s.size_diff for s in stats if s.size_diff > 0)
780 assert total_delta < 100 * 1024 * 1024, (
781 f"50-record 50KB gather allocated {total_delta / 1024 / 1024:.1f}MB "
782 "(expected < 100MB)"
783 )
784
785
786 # =============================================================================
787 # 6. Batch loop pure overhead
788 # =============================================================================
789
790 class TestCoordPerfBatchLoop:
791 """
792 The push batch loop in run_push splits records into chunks of MAX_PUSH_BATCH
793 (500) and calls push_to_hub once per chunk. At 7 000 records that is 14
794 HTTP calls. These tests isolate the loop overhead from network cost.
795 """
796
797 def test_batch_loop_7000_records_14_calls_under_500ms(self, tmp_path: pathlib.Path) -> None:
798 """
799 With push_to_hub mocked, 14 batch calls on 7 000 records should
800 complete in < 500ms. Any overhead above that is pure Python CPU cost.
801 """
802 from muse.core.coord_bus import MAX_PUSH_BATCH
803 root = _make_repo(tmp_path)
804
805 # Write 1000 each of 7 kinds = 7000 total
806 for kind in _ALL_KINDS:
807 _write_records(root, kind, 1000)
808
809 call_count: list[int] = []
810
811 def fake_push(hub_url: str, owner: str, slug: str, records: list[JsonDict], signing: SigningIdentity | None = None) -> MsgpackDict:
812 call_count.append(len(records))
813 return {"inserted": len(records), "skipped": 0}
814
815 with patch("muse.cli.commands.coord_sync._gather_local_records") as mock_gather, \
816 patch("muse.cli.commands.coord_sync.push_to_hub", side_effect=fake_push), \
817 patch("muse.cli.commands.coord_sync.require_repo", return_value=root), \
818 patch("muse.cli.commands.coord_sync._resolve_hub_and_signing",
819 return_value=("https://localhost:1337", "tok")):
820
821 # Build 7000 records in memory for mock_gather
822 records = []
823 for i in range(7000):
824 kind = _ALL_KINDS[i % len(_ALL_KINDS)]
825 records.append({
826 "kind": kind,
827 "record_id": _new_id(),
828 "run_id": f"run-{i}",
829 "payload": {"idx": i},
830 "expires_at": _FUTURE_TS,
831 })
832 mock_gather.return_value = records
833
834 import argparse
835 args = argparse.Namespace(
836 owner="torvalds", slug="linux",
837 json_out=False, token=None,
838 hub_url=None,
839 kinds=list(_ALL_KINDS),
840 )
841
842 t0 = time.monotonic()
843 try:
844 from muse.cli.commands.coord_sync import run_push
845 run_push(args)
846 except SystemExit as exc:
847 assert exc.code == 0, f"run_push exited with code {exc.code}"
848 elapsed = time.monotonic() - t0
849
850 expected_batches = (7000 + MAX_PUSH_BATCH - 1) // MAX_PUSH_BATCH
851 assert len(call_count) == expected_batches, (
852 f"expected {expected_batches} batches, got {len(call_count)}"
853 )
854 assert elapsed < 0.500, (
855 f"batch loop (push_to_hub mocked) for 7000 records took {elapsed:.3f}s (> 500ms)"
856 )
857
858 def test_batch_sizes_are_never_over_max_push_batch(self, tmp_path: pathlib.Path) -> None:
859 """Every batch must be ≤ MAX_PUSH_BATCH records."""
860 from muse.core.coord_bus import MAX_PUSH_BATCH
861
862 root = _make_repo(tmp_path)
863 call_sizes: list[int] = []
864
865 def fake_push(hub_url: str, owner: str, slug: str, records: list[JsonDict], signing: SigningIdentity | None = None) -> MsgpackDict:
866 call_sizes.append(len(records))
867 return {"inserted": len(records), "skipped": 0}
868
869 with patch("muse.cli.commands.coord_sync._gather_local_records") as mock_gather, \
870 patch("muse.cli.commands.coord_sync.push_to_hub", side_effect=fake_push), \
871 patch("muse.cli.commands.coord_sync.require_repo", return_value=root), \
872 patch("muse.cli.commands.coord_sync._resolve_hub_and_signing",
873 return_value=("https://localhost:1337", "tok")):
874
875 records = [
876 {
877 "kind": "reservation",
878 "record_id": _new_id(),
879 "run_id": "run-0",
880 "payload": {},
881 "expires_at": None,
882 }
883 for i in range(3333) # not a clean multiple of 500
884 ]
885 mock_gather.return_value = records
886
887 import argparse
888 args = argparse.Namespace(
889 owner="torvalds", slug="linux",
890 json_out=False, token=None,
891 hub_url=None,
892 kinds=["reservation"],
893 )
894 try:
895 from muse.cli.commands.coord_sync import run_push
896 run_push(args)
897 except SystemExit:
898 pass
899
900 assert all(s <= MAX_PUSH_BATCH for s in call_sizes), (
901 f"batch sizes {call_sizes} contain a batch > MAX_PUSH_BATCH ({MAX_PUSH_BATCH})"
902 )
903
def test_last_batch_is_correct_remainder(self, tmp_path: pathlib.Path) -> None:
    """``MAX_PUSH_BATCH * 14 + 1`` records split into 14 full batches plus one of size 1.

    That is 7 001 records when ``MAX_PUSH_BATCH`` is 500.  Both
    ``_gather_local_records`` and ``push_to_hub`` are patched.  The test
    therefore observes only how ``run_push`` slices the gathered records into
    ``push_to_hub`` calls; every batch size handed to the hub is recorded.
    """
    import argparse

    from muse.core.coord_bus import MAX_PUSH_BATCH

    full_batches = 14
    n = MAX_PUSH_BATCH * full_batches + 1  # 7001 when MAX_PUSH_BATCH == 500

    root = _make_repo(tmp_path)
    call_sizes: list[int] = []

    def fake_push(hub_url: str, owner: str, slug: str, records: list[JsonDict], signing: SigningIdentity | None = None) -> MsgpackDict:
        call_sizes.append(len(records))
        return {"inserted": len(records), "skipped": 0}

    records = [
        {
            "kind": "reservation",
            "record_id": _new_id(),
            "run_id": "run-0",
            "payload": {},
            "expires_at": None,
        }
        for _ in range(n)
    ]
    args = argparse.Namespace(
        owner="torvalds", slug="linux",
        json_out=False, token=None,
        hub_url=None,
        kinds=["reservation"],
    )

    with patch("muse.cli.commands.coord_sync._gather_local_records", return_value=records), \
            patch("muse.cli.commands.coord_sync.push_to_hub", side_effect=fake_push), \
            patch("muse.cli.commands.coord_sync.require_repo", return_value=root), \
            patch("muse.cli.commands.coord_sync._resolve_hub_and_signing",
                  return_value=("https://localhost:1337", "tok")):
        from muse.cli.commands.coord_sync import run_push

        try:
            run_push(args)
        except SystemExit as exc:
            # The CLI may finish via SystemExit.  Only a success code (0 / None)
            # is acceptable, so a failed push is reported here rather than
            # surfacing later as a confusing batch-count mismatch.
            assert exc.code in (0, None), f"run_push exited with code {exc.code!r}"

    # Check the count first so the indexing below cannot raise IndexError.
    assert len(call_sizes) == full_batches + 1, (
        f"expected {full_batches + 1} batches for {n} records, got {len(call_sizes)}"
    )
    assert call_sizes[:-1] == [MAX_PUSH_BATCH] * full_batches, (
        f"every batch but the last should be full ({MAX_PUSH_BATCH}), got {call_sizes[:-1]}"
    )
    assert call_sizes[-1] == 1, f"last batch should be 1 record, got {call_sizes[-1]}"
    assert sum(call_sizes) == n, f"pushed {sum(call_sizes)} records, expected {n}"
949
def test_zero_records_no_http_calls(self, tmp_path: pathlib.Path) -> None:
    """A push whose local gather comes back empty must never call ``push_to_hub``."""
    import argparse

    repo_root = _make_repo(tmp_path)
    seen_batches: list[int] = []

    def recording_push(hub_url: str, owner: str, slug: str, records: list[JsonDict], signing: SigningIdentity | None = None) -> MsgpackDict:
        # Any invocation is a failure; keep the batch size for the error message.
        seen_batches.append(len(records))
        return {"inserted": 0, "skipped": 0}

    cli_args = argparse.Namespace(
        owner="torvalds",
        slug="linux",
        json_out=False,
        token=None,
        hub_url=None,
        kinds=list(_ALL_KINDS),
    )

    gather_patch = patch("muse.cli.commands.coord_sync._gather_local_records", return_value=[])
    push_patch = patch("muse.cli.commands.coord_sync.push_to_hub", side_effect=recording_push)
    repo_patch = patch("muse.cli.commands.coord_sync.require_repo", return_value=repo_root)
    hub_patch = patch(
        "muse.cli.commands.coord_sync._resolve_hub_and_signing",
        return_value=("https://localhost:1337", "tok"),
    )

    with gather_patch, push_patch, repo_patch, hub_patch:
        try:
            from muse.cli.commands.coord_sync import run_push
            run_push(cli_args)
        except SystemExit:
            pass  # exit status is not under test; only the absence of hub calls is

    assert seen_batches == [], f"expected 0 HTTP calls for empty push, got {len(seen_batches)}"
980
981
982 # =============================================================================
983 # 7. Memory footprint
984 # =============================================================================
985
class TestCoordPerfMemory:
    """
    _gather_local_records holds all N records in a flat list before returning.
    At 10 000 records with 10KB payloads, that is ~100 MB of in-process memory.
    These tests document and bound the memory footprint.

    Each bound is checked against the *peak* heap traced by ``tracemalloc``
    while the call runs.  A before/after snapshot diff would only count objects
    still alive once the call returns.  For ``write_remote`` and
    ``run_coord_gc``, whose return values are discarded, it would therefore
    miss the transient allocations that make up their real footprint.
    """

    _MIB = 1024 * 1024

    @staticmethod
    def _peak_alloc_bytes(fn: Callable[[], object]) -> int:
        """Run *fn* and return the peak number of bytes tracemalloc traced during the call.

        Tracing started here is always stopped again, even when *fn* raises.
        A failing test therefore cannot leave the rest of the session running
        under tracemalloc's overhead.

        If tracing was already active (e.g. ``PYTHONTRACEMALLOC``), that session
        is reused and left running.  Only its peak is reset, so allocations made
        before this call are not counted.
        """
        was_tracing = tracemalloc.is_tracing()
        if was_tracing:
            tracemalloc.reset_peak()  # Python 3.9+
        else:
            tracemalloc.start()
        try:
            baseline, _ = tracemalloc.get_traced_memory()
            fn()
            _, peak = tracemalloc.get_traced_memory()
        finally:
            if not was_tracing:
                tracemalloc.stop()
        return peak - baseline

    def test_gather_1000_records_memory_under_50mb(self, tmp_path: pathlib.Path) -> None:
        """gather() over 1000 reservations with 1KB payloads must peak below 50 MB."""
        root = _make_repo(tmp_path)
        _write_records(root, "reservation", 1000, payload_bytes=1024)
        gather = _import_gather()

        records: list[object] = []
        peak = self._peak_alloc_bytes(lambda: records.extend(gather(root, ["reservation"])))

        assert len(records) == 1000
        assert peak < 50 * self._MIB, (
            f"gather(1000 × 1KB) peaked at {peak / self._MIB:.1f}MB (> 50MB)"
        )

    def test_gather_all_7_kinds_200_each_memory_under_100mb(self, tmp_path: pathlib.Path) -> None:
        """1400 records × 1KB payload = 1.4 MB on disk; peak heap should be < 100 MB."""
        root = _make_repo(tmp_path)
        for kind in _ALL_KINDS:
            _write_records(root, kind, 200, payload_bytes=1024)
        gather = _import_gather()

        records: list[object] = []
        peak = self._peak_alloc_bytes(lambda: records.extend(gather(root, list(_ALL_KINDS))))

        assert len(records) == 1400
        assert peak < 100 * self._MIB, (
            f"gather(1400 × 1KB, all kinds) peaked at {peak / self._MIB:.1f}MB (> 100MB)"
        )

    def test_write_remote_1000_records_memory_under_100mb(self, tmp_path: pathlib.Path) -> None:
        """Writing 1000 remote records with 1KB payloads must peak below 100 MB.

        The input list is built before measuring, so only write_remote's own
        allocations count.
        """
        root = _make_repo(tmp_path)
        write_remote = _import_write_remote()

        records = [
            {
                "kind": _ALL_KINDS[i % len(_ALL_KINDS)],
                "record_id": _new_id(),
                "run_id": f"run-{i}",
                "payload": {"data": "x" * 1024},
                "expires_at": _FUTURE_TS,
            }
            for i in range(1000)
        ]

        peak = self._peak_alloc_bytes(lambda: write_remote(root, records))

        assert peak < 100 * self._MIB, (
            f"write_remote(1000 × 1KB) peaked at {peak / self._MIB:.1f}MB (> 100MB)"
        )

    def test_gc_1000_reservations_memory_under_50mb(self, tmp_path: pathlib.Path) -> None:
        """A non-dry-run GC over 1000 expired reservations must peak below 50 MB."""
        root = _make_repo(tmp_path)
        _write_expired_reservations(root, 1000)
        run_coord_gc = _import_run_coord_gc()

        peak = self._peak_alloc_bytes(
            lambda: run_coord_gc(root, dry_run=False, grace_period_seconds=0)
        )

        assert peak < 50 * self._MIB, (
            f"GC(1000 expired) peaked at {peak / self._MIB:.1f}MB (> 50MB)"
        )
1078
1079
1080 # =============================================================================
1081 # 8. End-to-end throughput
1082 # =============================================================================
1083
class TestCoordPerfThroughput:
    """
    End-to-end gather → batch → push cycle with a mock HTTP layer.
    Measures total records/second and total wall-clock time.

    ``push_to_hub`` / ``pull_from_hub`` are replaced with in-process fakes.
    The timings therefore cover local work only (disk I/O, parsing and
    batching), never the network.
    """

    @staticmethod
    def _invoke(command: Callable[..., object], args: object) -> None:
        """Run a coord_sync CLI entry point, failing the test on any unsuccessful exit.

        The commands may finish by raising ``SystemExit``; codes ``0`` and
        ``None`` mean success.  Any other code fails the test here rather than
        being swallowed, which would let a timing assertion pass on an aborted run.
        """
        try:
            command(args)
        except SystemExit as exc:
            name = getattr(command, "__name__", repr(command))
            assert exc.code in (0, None), f"{name} exited with code {exc.code!r}"

    def test_end_to_end_500_records_under_1s(self, tmp_path: pathlib.Path) -> None:
        """~500 records spread evenly over all kinds must gather + batch + push in < 1 s."""
        root = _make_repo(tmp_path)
        per_kind = 500 // len(_ALL_KINDS) + 1
        for kind in _ALL_KINDS:
            _write_records(root, kind, per_kind)
        expected = per_kind * len(_ALL_KINDS)

        batch_sizes: list[int] = []

        def fake_push(hub_url: str, owner: str, slug: str, records: list[JsonDict], signing: SigningIdentity | None = None) -> MsgpackDict:
            batch_sizes.append(len(records))
            return {"inserted": len(records), "skipped": 0}

        with patch("muse.cli.commands.coord_sync.push_to_hub", side_effect=fake_push), \
                patch("muse.cli.commands.coord_sync.require_repo", return_value=root), \
                patch("muse.cli.commands.coord_sync._resolve_hub_and_signing",
                      return_value=("https://localhost:1337", "tok")):
            import argparse

            from muse.cli.commands.coord_sync import run_push

            args = argparse.Namespace(
                owner="torvalds", slug="linux",
                json_out=False, token=None,
                hub_url=None,
                kinds=list(_ALL_KINDS),
            )

            t0 = time.monotonic()
            self._invoke(run_push, args)
            elapsed = time.monotonic() - t0

        total_pushed = sum(batch_sizes)
        # Every record written above must reach the hub.  A bare "> 0" check
        # would let a gather that silently dropped whole kinds pass.
        assert total_pushed == expected, f"expected {expected} records pushed, got {total_pushed}"
        assert elapsed < 1.0, f"end-to-end ~500 records took {elapsed:.3f}s (> 1s)"

    @pytest.mark.slow
    def test_end_to_end_7000_records_under_10s(self, tmp_path: pathlib.Path) -> None:
        """
        Full Linux kernel agent swarm: 7 000 records across all 7 kinds.
        push_to_hub is mocked — this measures gather + batch splitting overhead only.
        """
        root = _make_repo(tmp_path)
        per_kind = 1000
        for kind in _ALL_KINDS:
            _write_records(root, kind, per_kind)
        expected = per_kind * len(_ALL_KINDS)

        batch_sizes: list[int] = []

        def fake_push(hub_url: str, owner: str, slug: str, records: list[JsonDict], signing: SigningIdentity | None = None) -> MsgpackDict:
            batch_sizes.append(len(records))
            return {"inserted": len(records), "skipped": 0}

        with patch("muse.cli.commands.coord_sync.push_to_hub", side_effect=fake_push), \
                patch("muse.cli.commands.coord_sync.require_repo", return_value=root), \
                patch("muse.cli.commands.coord_sync._resolve_hub_and_signing",
                      return_value=("https://localhost:1337", "tok")):
            import argparse

            from muse.cli.commands.coord_sync import run_push

            args = argparse.Namespace(
                owner="torvalds", slug="linux",
                json_out=False, token=None,
                hub_url=None,
                kinds=list(_ALL_KINDS),
            )

            t0 = time.monotonic()
            self._invoke(run_push, args)
            elapsed = time.monotonic() - t0

        total_pushed = sum(batch_sizes)
        assert total_pushed == expected, f"expected {expected} records pushed, got {total_pushed}"
        assert elapsed < 10.0, (
            f"end-to-end 7000 records (gather + batch, mocked HTTP) took {elapsed:.3f}s (> 10s)"
        )

        # Implied by the wall-clock bound above (7000 / 10 s = 700/s).  It is kept
        # so the records/second metric is stated explicitly.
        rps = total_pushed / elapsed
        assert rps >= 700, f"end-to-end throughput {rps:.0f} records/s is below minimum 700/s"

    def test_repeated_pulls_do_not_accumulate_state(self, tmp_path: pathlib.Path) -> None:
        """
        run_pull is stateless per call — repeated calls on the same repo should
        have stable (not growing) elapsed time.
        """
        root = _make_repo(tmp_path)
        (coordination_dir(root) / _REMOTE_DIR_NAME).mkdir(parents=True, exist_ok=True)

        fake_records = [
            {
                "kind": "reservation",
                "record_id": _new_id(),
                "run_id": "run-0",
                "payload": {"idx": i},
                "expires_at": _FUTURE_TS,
            }
            for i in range(200)
        ]
        pull_calls: list[int] = []

        def fake_pull(
            hub_url: str,
            owner: str,
            slug: str,
            since_id: int = 0,
            kinds: list[str] | None = None,
            limit: int = 500,
            signing: SigningIdentity | None = None,
        ) -> MsgpackDict:
            pull_calls.append(since_id)
            return {"records": fake_records, "cursor": since_id + len(fake_records)}

        runs = 5
        elapsed_list: list[float] = []

        with patch("muse.cli.commands.coord_sync.pull_from_hub", side_effect=fake_pull), \
                patch("muse.cli.commands.coord_sync.require_repo", return_value=root), \
                patch("muse.cli.commands.coord_sync._resolve_hub_and_signing",
                      return_value=("https://localhost:1337", "tok")):
            import argparse

            # Imported once, outside the timed region.
            from muse.cli.commands.coord_sync import run_pull

            for _ in range(runs):
                args = argparse.Namespace(
                    owner="torvalds", slug="linux",
                    json_out=False, token=None,
                    hub_url=None,
                    since_id=0, limit=1000,
                    kinds=list(_ALL_KINDS),
                )
                t0 = time.monotonic()
                self._invoke(run_pull, args)
                elapsed_list.append(time.monotonic() - t0)

        # Every run must actually reach the (fake) hub.  Otherwise an early exit
        # would make all timings trivially small and the check below vacuous.
        assert len(pull_calls) >= runs, (
            f"pull_from_hub called {len(pull_calls)} times over {runs} runs"
        )
        # The last call may take at most 5× the first call.  The 0.5 s floor
        # keeps scheduler jitter on very fast runs from failing the test.
        assert elapsed_list[-1] < max(elapsed_list[0] * 5, 0.500), (
            f"repeated pull times grew suspiciously: {[f'{e:.3f}' for e in elapsed_list]}"
        )
1232
1233
# Sub-directory of ``coordination_dir(root)`` that the pull tests create before
# calling run_pull.  It is presumably where pulled hub records land; confirm
# against coord_sync.  Test bodies resolve it only at call time, after the whole
# module has loaded, so defining it at the bottom of the file is safe.
_REMOTE_DIR_NAME: str = "remote"
File History 1 commit