gabriel / muse public

test_perf_phase3.py file-level

at sha256:a · View file ↗ · Intel ↗

History
1 files
1 commits
0 hotspots
0 🧊 dead
0 💥 blast risk
sha256:4 Merge branch 'dev' into main · gabriel · Jun 17, 2026
"""Phase 3 — Performance regression tests.

Target metrics (measured on a 2024 MacBook Pro M4, macOS 15):

Phase 3.1 — Linux-kernel commit throughput
  write_commit: ≥ 1 000 commits/sec
  write_object: ≥ 2 000 objects/sec
  build_snapshot_manifest: ≥ 10 000 files/sec
  muse commit (e2e, 1 000-file workdir): < 5 000 ms

Phase 3.2 — Concurrent agent write storm
  200 threads × write_object: all objects readable, no corruption
  200 threads × write_commit: all commits readable, no corruption
  100 threads × write_branch_ref: last-write-wins, valid ID written

Phase 3.3 — Memory ceiling
  write_commit (10 000 commits): peak RSS < 512 MiB
  build_snapshot_manifest (5 000 files): peak RSS < 128 MiB

Phase 3.3 extended — Linux-scale memory ceiling (100k / 75k)
  get_all_commits (100 000 commits): peak RSS < 2 GiB [@slow]
  get_commits_for_branch walk-cap: max_walk_commits bounds RSS
  muse log --json pseudo-streaming: no double-buffer
  build_snapshot_manifest (75 000 files): peak RSS < 512 MiB [@slow]
  find_merge_base (deep chain): cap fires, no OOM
  walk_commits_between: silent truncation at cap, not OOM

All slow tests are decorated with ``@pytest.mark.slow`` and are skipped by
default. Run the full suite with ``pytest tests/test_perf_phase3.py -v``.
"""
31
32 from __future__ import annotations
33
34 type _FileStore = dict[str, bytes]
35
import datetime
import os
import pathlib
import resource
import sys
import threading
import time
import tracemalloc
from collections.abc import Iterator
from unittest.mock import patch

import pytest
47
48 from muse.core.object_store import (
49 _created_object_shards,
50 has_object,
51 object_path,
52 read_object,
53 write_object,
54 )
55 from muse.core.merge_engine import find_merge_base
56 from muse.core.snapshot import build_snapshot_manifest
57 from muse.core.ids import hash_commit as compute_commit_id
58 from muse.core.refs import (
59 write_branch_ref,
60 write_head_commit,
61 )
62 from muse.core.commits import (
63 CommitRecord,
64 get_all_commits,
65 get_commits_for_branch,
66 read_commit,
67 walk_commits_between_result,
68 write_commit,
69 )
70
71 # ---------------------------------------------------------------------------
72 # Helpers
73 # ---------------------------------------------------------------------------
74
75
76 from muse.core.types import blob_id, fake_id, split_id
77 from muse.core.paths import config_toml_path, heads_dir, muse_dir, repo_json_path
78
79
80
def _repo(tmp_path: pathlib.Path) -> pathlib.Path:
    """Lay out a minimal repository skeleton under *tmp_path* and return *tmp_path*.

    The directory resolved by ``muse_dir(tmp_path)`` is created and populated
    with a ``repo.json``, empty ``commits`` and ``snapshots`` directories, a
    ``refs/heads`` directory, and a ``HEAD`` file pointing at
    ``refs/heads/main``.
    """
    store = muse_dir(tmp_path)
    store.mkdir()
    store.joinpath("repo.json").write_text('{"repo_id": "bench", "owner": "bench"}')
    for subdir in ("commits", "snapshots"):
        store.joinpath(subdir).mkdir()
    store.joinpath("refs", "heads").mkdir(parents=True)
    store.joinpath("HEAD").write_text("ref: refs/heads/main\n")
    return tmp_path
90
91
def _write_chain(
    repo: pathlib.Path,
    branch: str,
    n: int,
    snap_id: str = fake_id("chain-snap"),
    start: int = 0,
) -> str:
    """Write a linear commit chain of length *n* and return the tip commit ID.

    Sets the branch ref to the tip so ``get_commits_for_branch`` can walk it.

    Args:
        repo: Repository root, as returned by ``_repo``.
        branch: Branch name recorded on every commit and used for the ref.
        n: Number of commits to write.
        snap_id: Snapshot ID shared by every commit in the chain.
        start: First index used for commit messages and timestamps; pass
            distinct ranges to build chains whose commit IDs never collide.

    Returns:
        The ID of the last commit written.  When *n* is 0 no commit is
        written, the empty string is returned, and ``write_branch_ref`` is
        still called with that empty string.
    """
    # One second per index keeps timestamps strictly increasing from the
    # first commit to the tip, however long the chain is.
    epoch = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc)
    parent: str | None = None
    tip = ""
    for i in range(start, start + n):
        msg = f"chain-{i:07d}"
        ts = epoch + datetime.timedelta(seconds=i)
        cid = compute_commit_id(
            parent_ids=[parent] if parent else [],
            snapshot_id=snap_id,
            message=msg,
            committed_at_iso=ts.isoformat(),
            author="chain-agent",
        )
        rec = CommitRecord(
            commit_id=cid,
            branch=branch,
            snapshot_id=snap_id,
            message=msg,
            committed_at=ts,
            parent_commit_id=parent,
            parent2_commit_id=None,
            author="chain-agent",
            metadata={},
            structured_delta=None,
            sem_ver_bump="none",
            breaking_changes=[],
            agent_id="",
            model_id="",
            toolchain_id="",
            prompt_hash="",
            signature="",
            signer_key_id="",
        )
        write_commit(repo, rec)
        parent = cid
        tip = cid
    write_branch_ref(repo, branch, tip)
    return tip
139
140
def _make_commit(index: int, snap_id: str, parent: str | None = None) -> CommitRecord:
    """Build a CommitRecord whose ``commit_id`` is derived from its own fields.

    The ID is computed with ``compute_commit_id`` from the same parent,
    snapshot, message, timestamp and author that are stored on the record.
    *index* only affects the message, so records built with different
    indexes (and otherwise equal arguments) get different IDs.
    """
    author = "perf-agent"
    message = f"commit-{index:07d}"
    committed_at = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc)
    parents = [parent] if parent else []
    commit_id = compute_commit_id(
        parent_ids=parents,
        snapshot_id=snap_id,
        message=message,
        committed_at_iso=committed_at.isoformat(),
        author=author,
    )
    return CommitRecord(
        commit_id=commit_id,
        branch="main",
        snapshot_id=snap_id,
        message=message,
        committed_at=committed_at,
        parent_commit_id=parent,
        parent2_commit_id=None,
        author=author,
        metadata={},
        structured_delta=None,
        sem_ver_bump="none",
        breaking_changes=[],
        agent_id="",
        model_id="",
        toolchain_id="",
        prompt_hash="",
        signature="",
        signer_key_id="",
    )
171
172
# ---------------------------------------------------------------------------
# Phase 3.1 — Linux-kernel commit throughput
# ---------------------------------------------------------------------------
176
177
class TestWriteCommitThroughput:
    """write_commit must sustain ≥ 1 000 commits/sec in isolation.

    The Linux-kernel migration target is 472 commits/sec over 850 000 commits
    (< 30 min).  A 1 000 commits/sec floor gives comfortable headroom for
    real workload overhead (snapshot building, object writes, disk pressure).

    fsync is mocked: these tests measure msgpack serialisation + filesystem
    metadata throughput, not OS I/O durability.  Durability ordering is
    verified by test_integrity_I2_fsync.py.
    """

    _MIN_COMMITS_PER_SEC = 1_000

    @pytest.fixture(autouse=True)
    def no_fsync(self) -> Iterator[None]:
        """Mock out all fsync calls so the test measures algorithmic throughput."""
        with patch("muse.core.commits.os.fsync", return_value=None):
            yield

    @pytest.mark.slow
    def test_write_commit_throughput_10k(self, tmp_path: pathlib.Path) -> None:
        """Write 10 000 commits and assert throughput ≥ 1 000 commits/sec."""
        repo = _repo(tmp_path)
        snap_id = fake_id("snap-a")
        N = 10_000
        # Records are built up front so only write_commit is timed.
        commits = [_make_commit(i, snap_id) for i in range(N)]

        t0 = time.perf_counter()
        for rec in commits:
            write_commit(repo, rec)
        elapsed = time.perf_counter() - t0
        rate = N / elapsed

        assert rate >= self._MIN_COMMITS_PER_SEC, (
            f"write_commit throughput {rate:.0f} commits/sec is below the "
            f"minimum {self._MIN_COMMITS_PER_SEC} commits/sec. "
            f"(10k commits took {elapsed:.2f}s. "
            f"Linux-kernel migration target: 472 commits/sec.)"
        )

    def test_write_commit_throughput_1k_fast(self, tmp_path: pathlib.Path) -> None:
        """Smoke-speed: 1 000 commits must complete within 15 seconds."""
        repo = _repo(tmp_path)
        snap_id = fake_id("snap-b")
        N = 1_000
        commits = [_make_commit(i, snap_id) for i in range(N)]

        t0 = time.perf_counter()
        for rec in commits:
            write_commit(repo, rec)
        elapsed = time.perf_counter() - t0

        assert elapsed <= 15.0, (
            f"1 000 write_commit calls took {elapsed:.2f}s — expected ≤ 15.0s."
        )
234
235
class TestWriteObjectThroughput:
    """write_object must sustain ≥ 1 500 objects/sec in isolation.

    fsync is mocked: the test measures mkstemp + hash-verify + fchmod +
    os.replace throughput without OS I/O latency.  Durability ordering is
    verified by test_integrity_I2_fsync.py.
    """

    _MIN_OBJECTS_PER_SEC = 1_500

    @pytest.fixture(autouse=True)
    def no_fsync(self) -> Iterator[None]:
        """Mock out all fsync calls so the test measures algorithmic throughput."""
        with patch("muse.core.object_store._fsync_fd", return_value=None):
            yield

    @pytest.mark.slow
    def test_write_object_throughput_10k(self, tmp_path: pathlib.Path) -> None:
        """Write 10 000 small (272-byte) objects; assert ≥ 1 500 objects/sec."""
        repo = _repo(tmp_path)
        N = 10_000
        # Payloads and IDs are built up front so only write_object is timed.
        payloads = [f"perf-obj-{i:08d}".encode() * 16 for i in range(N)]
        items = [(blob_id(payload), payload) for payload in payloads]

        t0 = time.perf_counter()
        for oid, content in items:
            write_object(repo, oid, content)
        elapsed = time.perf_counter() - t0
        rate = N / elapsed

        assert rate >= self._MIN_OBJECTS_PER_SEC, (
            f"write_object throughput {rate:.0f} objects/sec is below the "
            f"minimum {self._MIN_OBJECTS_PER_SEC} objects/sec. "
            f"(10k objects took {elapsed:.2f}s.)"
        )

    @pytest.mark.perf
    def test_write_object_throughput_1k_fast(self, tmp_path: pathlib.Path) -> None:
        """Smoke-speed: 1 000 objects must complete within 6 seconds."""
        repo = _repo(tmp_path)
        N = 1_000
        payloads = [f"fast-obj-{i:08d}".encode() * 8 for i in range(N)]
        items = [(blob_id(payload), payload) for payload in payloads]

        t0 = time.perf_counter()
        for oid, content in items:
            write_object(repo, oid, content)
        elapsed = time.perf_counter() - t0

        assert elapsed <= 6.0, (
            f"1 000 write_object calls took {elapsed:.2f}s — expected ≤ 6.0s."
        )
298
299
class TestSnapshotManifestThroughput:
    """build_snapshot_manifest must sustain ≥ 10 000 files/sec."""

    _MIN_FILES_PER_SEC = 10_000

    @pytest.mark.slow
    def test_snapshot_5k_files(self, tmp_path: pathlib.Path) -> None:
        """Build manifest of 5 000 files; assert ≥ 10 000 files/sec."""
        root = tmp_path / "workdir"
        root.mkdir()
        muse_dir(root).mkdir()
        repo_json_path(root).write_text('{"repo_id": "bench"}')

        # 50 dirs × 100 files = 5 000 files
        for d in range(50):
            dp = root / f"pkg_{d:03d}"
            dp.mkdir()
            for f in range(100):
                (dp / f"file_{f:03d}.py").write_bytes(
                    f"# content-{d}-{f}\n".encode() * 50
                )

        t0 = time.perf_counter()
        manifest = build_snapshot_manifest(root)
        elapsed = time.perf_counter() - t0

        # Check the file count first: a rate computed from a wrong count
        # would produce a misleading throughput failure.
        assert len(manifest) == 5_000
        rate = len(manifest) / elapsed
        assert rate >= self._MIN_FILES_PER_SEC, (
            f"build_snapshot_manifest throughput {rate:.0f} files/sec is below "
            f"the minimum {self._MIN_FILES_PER_SEC} files/sec. "
            f"(5k files took {elapsed:.3f}s.)"
        )

    def test_snapshot_500_files_fast(self, tmp_path: pathlib.Path) -> None:
        """Smoke-speed: 500-file manifest must complete within 500 ms."""
        root = tmp_path / "workdir"
        root.mkdir()
        muse_dir(root).mkdir()
        repo_json_path(root).write_text('{"repo_id": "bench"}')

        # 25 dirs × 20 files = 500 files
        for d in range(25):
            dp = root / f"pkg_{d:02d}"
            dp.mkdir()
            for f in range(20):
                (dp / f"file_{f:02d}.py").write_bytes(b"x" * 200)

        t0 = time.perf_counter()
        manifest = build_snapshot_manifest(root)
        elapsed = time.perf_counter() - t0

        assert len(manifest) == 500
        assert elapsed <= 0.5, (
            f"500-file manifest took {elapsed*1000:.1f} ms — expected ≤ 500 ms."
        )
355
356
# ---------------------------------------------------------------------------
# Phase 3.2 — Concurrent agent write storm
# ---------------------------------------------------------------------------
360
361
class TestConcurrentWriteObjectStorm:
    """200 threads writing distinct objects — no corruption, no data loss."""

    def test_200_threads_write_distinct_objects(self, tmp_path: pathlib.Path) -> None:
        """200 threads each write 50 distinct objects; all must be readable after join."""
        repo = _repo(tmp_path)
        N_THREADS = 200
        N_PER_THREAD = 50
        written: _FileStore = {}
        lock = threading.Lock()
        errors: list[str] = []

        # Pre-compute all objects to avoid per-thread hashing noise.  This
        # runs before any thread starts, so ``written`` needs no locking.
        all_items: list[list[tuple[str, bytes]]] = []
        for t in range(N_THREADS):
            thread_items: list[tuple[str, bytes]] = []
            for i in range(N_PER_THREAD):
                content = f"thread-{t:03d}-obj-{i:03d}".encode() * 4
                oid = blob_id(content)
                thread_items.append((oid, content))
                written[oid] = content
            all_items.append(thread_items)

        def writer(items: list[tuple[str, bytes]]) -> None:
            # Exceptions are collected rather than raised: an exception in a
            # worker thread would otherwise never reach the test.
            try:
                for oid, content in items:
                    write_object(repo, oid, content)
            except Exception as exc:
                with lock:
                    errors.append(f"{type(exc).__name__}: {exc}")

        threads = [
            threading.Thread(target=writer, args=(items,)) for items in all_items
        ]
        for th in threads:
            th.start()
        for th in threads:
            th.join(timeout=30.0)

        # join() returns silently on timeout; verify nothing is still writing
        # before inspecting the store.
        stuck = sum(th.is_alive() for th in threads)
        assert not stuck, f"{stuck} writer threads still running after join timeout"

        assert not errors, f"Write errors during concurrent storm: {errors[:3]}"

        # Verify every object is readable and byte-identical.
        missing: list[str] = []
        corrupt: list[str] = []
        for oid, expected in written.items():
            if not has_object(repo, oid):
                missing.append(oid[:8])
            elif read_object(repo, oid) != expected:
                corrupt.append(oid[:8])

        assert not missing, f"{len(missing)} objects missing after concurrent write storm"
        assert not corrupt, f"{len(corrupt)} objects corrupted after concurrent write storm"
418
419
class TestConcurrentWriteCommitStorm:
    """200 threads writing distinct commits — all must be readable after join."""

    def test_200_threads_write_distinct_commits(self, tmp_path: pathlib.Path) -> None:
        """200 threads each write 25 distinct commits; all must be readable."""
        repo = _repo(tmp_path)
        N_THREADS = 200
        N_PER_THREAD = 25
        snap_id = fake_id("snap-c")
        errors: list[str] = []
        lock = threading.Lock()

        # Each thread gets its own slice of globally unique commit indexes,
        # so no two threads ever write the same commit ID.
        all_records: list[list[CommitRecord]] = [
            [_make_commit(t * N_PER_THREAD + i, snap_id) for i in range(N_PER_THREAD)]
            for t in range(N_THREADS)
        ]

        def writer(recs: list[CommitRecord]) -> None:
            # Exceptions are collected rather than raised: an exception in a
            # worker thread would otherwise never reach the test.
            try:
                for rec in recs:
                    write_commit(repo, rec)
            except Exception as exc:
                with lock:
                    errors.append(f"{type(exc).__name__}: {exc}")

        threads = [
            threading.Thread(target=writer, args=(recs,)) for recs in all_records
        ]
        for th in threads:
            th.start()
        for th in threads:
            th.join(timeout=30.0)

        # join() returns silently on timeout; verify nothing is still writing
        # before inspecting the store.
        stuck = sum(th.is_alive() for th in threads)
        assert not stuck, f"{stuck} writer threads still running after join timeout"

        assert not errors, f"Write errors during concurrent commit storm: {errors[:3]}"

        # All commits must be readable.
        total = N_THREADS * N_PER_THREAD
        missing = [
            rec.commit_id[:8]
            for recs in all_records
            for rec in recs
            if read_commit(repo, rec.commit_id) is None
        ]

        assert not missing, (
            f"{len(missing)}/{total} commits missing after concurrent write storm"
        )
471
472
class TestConcurrentWriteBranchRef:
    """100 threads racing on write_branch_ref — last write wins, no corruption."""

    def test_100_threads_write_branch_ref_last_write_wins(
        self, tmp_path: pathlib.Path
    ) -> None:
        """100 threads race on write_branch_ref; the ref must end up holding one
        of the written ``sha256:<64 hex>`` IDs, intact."""
        repo = _repo(tmp_path)
        refs_dir = heads_dir(repo)  # already created by _repo()

        N = 100
        snap_id = fake_id("snap-d")
        errors: list[str] = []
        lock = threading.Lock()

        commit_ids: list[str] = [
            compute_commit_id(
                parent_ids=[],
                snapshot_id=snap_id,
                message=f"head-{i}",
                committed_at_iso="2026-01-01T00:00:00+00:00",
            )
            for i in range(N)
        ]

        def writer(cid: str) -> None:
            # Exceptions are collected rather than raised: an exception in a
            # worker thread would otherwise never reach the test.
            try:
                write_branch_ref(repo, "main", cid)
            except Exception as exc:
                with lock:
                    errors.append(f"{type(exc).__name__}: {exc}")

        threads = [threading.Thread(target=writer, args=(cid,)) for cid in commit_ids]
        for th in threads:
            th.start()
        for th in threads:
            th.join(timeout=10.0)

        # join() returns silently on timeout; verify no writer is still
        # running before reading the ref back.
        stuck = sum(th.is_alive() for th in threads)
        assert not stuck, f"{stuck} writer threads still running after join timeout"

        assert not errors, f"Errors during concurrent write_branch_ref: {errors[:3]}"

        # The branch ref must contain exactly one of the written IDs.
        ref_file = refs_dir / "main"
        assert ref_file.exists(), "Branch ref file was lost after concurrent writes"
        final_cid = ref_file.read_text(encoding="utf-8").strip()
        # Commit IDs are "sha256:<64hex>" = 71 chars
        assert final_cid.startswith("sha256:"), f"Branch ref has unexpected format: {final_cid!r}"
        _, hex_part = split_id(final_cid)
        assert len(hex_part) == 64, f"Branch ref hex part has unexpected length: {final_cid!r}"
        assert all(c in "0123456789abcdef" for c in hex_part), (
            f"Branch ref is not a valid hex ID: {final_cid!r}"
        )
        assert final_cid in commit_ids, (
            f"Branch ref {final_cid[:19]} is not one of the written IDs"
        )
522
523
# ---------------------------------------------------------------------------
# Phase 3.3 — Memory ceiling under load
# ---------------------------------------------------------------------------
527
528
class TestMemoryCeiling:
    """Peak memory must not grow proportionally to the number of objects or commits.

    Peaks are measured with ``tracemalloc`` (allocations made through
    Python's allocator while tracing is on), not process RSS, even though
    the test names mention RSS.
    """

    _MAX_WRITE_COMMIT_MIB = 512
    _MAX_SNAPSHOT_MIB = 128

    @pytest.mark.slow
    def test_write_10k_commits_peak_rss_under_512_mib(
        self, tmp_path: pathlib.Path
    ) -> None:
        """Writing 10 000 commits must not exceed a 512 MiB traced peak.

        write_commit buffers one commit at a time; it must not accumulate
        a list of all commits in memory.
        """
        repo = _repo(tmp_path)
        snap_id = fake_id("snap-e")
        N = 10_000

        tracemalloc.start()
        try:
            tracemalloc.clear_traces()
            for i in range(N):
                write_commit(repo, _make_commit(i, snap_id))
            _, peak_bytes = tracemalloc.get_traced_memory()
        finally:
            # Always stop tracing: a failure above must not leave tracemalloc
            # enabled for the rest of the test session.
            tracemalloc.stop()

        peak_mib = peak_bytes / (1024 * 1024)
        assert peak_mib <= self._MAX_WRITE_COMMIT_MIB, (
            f"write_commit peak allocation {peak_mib:.1f} MiB exceeds "
            f"{self._MAX_WRITE_COMMIT_MIB} MiB for 10k commits. "
            "write_commit must stream one record at a time."
        )

    def test_snapshot_5k_files_peak_rss_under_128_mib(
        self, tmp_path: pathlib.Path
    ) -> None:
        """build_snapshot_manifest on 5 000 small files stays under 128 MiB."""
        root = tmp_path / "workdir"
        root.mkdir()
        muse_dir(root).mkdir()
        repo_json_path(root).write_text('{"repo_id": "bench"}')

        # 50 dirs × 100 files = 5 000 files of 512 bytes each
        for d in range(50):
            dp = root / f"d_{d:02d}"
            dp.mkdir()
            for f in range(100):
                (dp / f"f_{f:02d}.txt").write_bytes(b"x" * 512)

        tracemalloc.start()
        try:
            tracemalloc.clear_traces()
            build_snapshot_manifest(root)
            _, peak_bytes = tracemalloc.get_traced_memory()
        finally:
            # Always stop tracing: a failure above must not leave tracemalloc
            # enabled for the rest of the test session.
            tracemalloc.stop()

        peak_mib = peak_bytes / (1024 * 1024)
        assert peak_mib <= self._MAX_SNAPSHOT_MIB, (
            f"build_snapshot_manifest peak allocation {peak_mib:.1f} MiB "
            f"exceeds {self._MAX_SNAPSHOT_MIB} MiB for 5k files. "
            "The manifest dict must not accumulate large intermediate buffers."
        )
592
593
# ---------------------------------------------------------------------------
# Phase 3.1 — Shard-cache amortisation (regression guard)
# ---------------------------------------------------------------------------
597
598
class TestShardCacheAmortisation:
    """Structural checks around the object-store shard cache.

    These are not timing tests.  They check that a write records the
    object's parent directory in ``_created_object_shards``, that a batch of
    writes all land, and that writing the same object twice reports the
    second write as a no-op.
    """

    def test_shard_cache_populated_after_first_write(
        self, tmp_path: pathlib.Path
    ) -> None:
        """A write adds the object's shard directory to _created_object_shards."""
        repo = _repo(tmp_path)
        payload = b"shard-cache-test"
        oid = blob_id(payload)
        shard_key = str(object_path(repo, oid).parent)

        # Drop any stale entry so the assertion below proves that this
        # write is what populated the cache.
        _created_object_shards.discard(shard_key)

        write_object(repo, oid, payload)

        assert shard_key in _created_object_shards, (
            f"Shard {oid[:2]} not recorded in _created_object_shards after first write. "
            "The amortisation optimisation is not active."
        )

    def test_repeated_writes_to_same_shard_succeed(
        self, tmp_path: pathlib.Path
    ) -> None:
        """50 distinct objects written back-to-back are all present afterwards.

        The objects are not forced into a single shard; each lands wherever
        its ID places it.
        """
        repo = _repo(tmp_path)
        N = 50
        payloads = [f"repeat-shard-{i:04d}".encode() for i in range(N)]
        items = [(blob_id(payload), payload) for payload in payloads]

        for oid, payload in items:
            write_object(repo, oid, payload)

        missing = []
        for oid, _ in items:
            if not has_object(repo, oid):
                missing.append(oid[:8])
        assert not missing, (
            f"{len(missing)} objects missing after repeated writes: {missing[:5]}"
        )

    def test_idempotent_write_bypasses_shard_write(
        self, tmp_path: pathlib.Path
    ) -> None:
        """write_object returns True for a new object and False on a repeat write."""
        repo = _repo(tmp_path)
        payload = b"idempotent-check"
        oid = blob_id(payload)

        outcomes = [write_object(repo, oid, payload) for _ in range(2)]

        assert outcomes[0] is True, "First write should return True"
        assert outcomes[1] is False, "Second write should return False (already exists)"
665
666
# ---------------------------------------------------------------------------
# Phase 3.3 extended — Linux-scale memory ceiling
# ---------------------------------------------------------------------------
670
671
class TestGetAllCommitsMemory:
    """get_all_commits must stay within a memory ceiling at large commit counts.

    Both tests write N minimal commits to disk, call ``get_all_commits``
    with ``tracemalloc`` active, and assert on the traced peak.  The fast
    variant uses 1 000 commits; the @slow variant uses 100 000.

    NOTE(review): the tests use minimal records (empty metadata, no
    structured_delta); commits with large payloads are not exercised here.
    """

    def test_get_all_commits_1k_peak_rss_under_128_mib(
        self, tmp_path: pathlib.Path
    ) -> None:
        """get_all_commits on 1 000 commits stays under 128 MiB (fast smoke)."""
        repo = _repo(tmp_path)
        N = 1_000
        snap_id = fake_id("snap-aa")
        for i in range(N):
            write_commit(repo, _make_commit(i, snap_id))

        tracemalloc.start()
        try:
            tracemalloc.clear_traces()
            results = get_all_commits(repo)
            _, peak_bytes = tracemalloc.get_traced_memory()
        finally:
            # Always stop tracing: a failure above must not leave tracemalloc
            # enabled for the rest of the test session.
            tracemalloc.stop()

        assert len(results) == N, f"Expected {N} commits, got {len(results)}"
        peak_mib = peak_bytes / (1024 * 1024)
        assert peak_mib <= 128, (
            f"get_all_commits({N}) peak {peak_mib:.1f} MiB — expected ≤ 128 MiB. "
            "CommitRecord size has grown; re-audit the dataclass fields."
        )

    @pytest.mark.slow
    def test_get_all_commits_100k_under_2_gib(
        self, tmp_path: pathlib.Path
    ) -> None:
        """get_all_commits on 100 000 commits stays under 2 GiB.

        The ceiling is deliberately generous: it is meant to catch runaway
        accumulation, not to pin the exact footprint of a minimal record.
        """
        repo = _repo(tmp_path)
        N = 100_000
        snap_id = fake_id("snap-bb")

        for i in range(N):
            write_commit(repo, _make_commit(i, snap_id))

        _MAX_MIB = 2_048  # 2 GiB

        tracemalloc.start()
        try:
            tracemalloc.clear_traces()
            results = get_all_commits(repo)
            _, peak_bytes = tracemalloc.get_traced_memory()
        finally:
            # Always stop tracing: a failure above must not leave tracemalloc
            # enabled for the rest of the test session.
            tracemalloc.stop()

        assert len(results) == N
        peak_mib = peak_bytes / (1024 * 1024)
        assert peak_mib <= _MAX_MIB, (
            f"get_all_commits(100k) peak {peak_mib:.1f} MiB exceeds {_MAX_MIB} MiB. "
            "The function loads all CommitRecords into a list simultaneously — "
            "consider streaming or paginating for very large repos."
        )
740
741
class TestGetCommitsForBranchWalkCap:
    """get_commits_for_branch must honour the cap passed as ``max_count``.

    A branch with more commits than the cap must return exactly ``max_count``
    records, and the traced memory peak must stay small.
    """

    def test_walk_cap_bounds_returned_records(
        self, tmp_path: pathlib.Path
    ) -> None:
        """Chain of 500 commits with cap=100 returns exactly 100 records."""
        repo = _repo(tmp_path)
        N = 500
        CAP = 100
        _write_chain(repo, "main", N)

        results = get_commits_for_branch(repo, "main", max_count=CAP)

        assert len(results) == CAP, (
            f"Expected cap={CAP} records, got {len(results)} — "
            "walk cap is not being respected."
        )

    def test_walk_cap_memory_bounded_deep_chain(
        self, tmp_path: pathlib.Path
    ) -> None:
        """2 000-commit chain read with max_count=500 stays under 64 MiB."""
        repo = _repo(tmp_path)
        N = 2_000
        CAP = 500
        _write_chain(repo, "main", N)

        tracemalloc.start()
        try:
            tracemalloc.clear_traces()
            results = get_commits_for_branch(repo, "main", max_count=CAP)
            _, peak_bytes = tracemalloc.get_traced_memory()
        finally:
            # Always stop tracing: a failure above must not leave tracemalloc
            # enabled for the rest of the test session.
            tracemalloc.stop()

        assert len(results) == CAP
        peak_mib = peak_bytes / (1024 * 1024)
        assert peak_mib <= 64, (
            f"get_commits_for_branch(cap={CAP}) peak {peak_mib:.1f} MiB — "
            f"expected ≤ 64 MiB for {CAP} records."
        )
787
788
class TestLogJsonStreaming:
    """Shape checks for the output of ``muse log --json``.

    NOTE(review): the class name refers to streaming, but the tests here
    only validate the emitted JSON document; they do not measure how the
    command buffers its output.
    """

    def test_log_json_output_is_valid_json(
        self, tmp_path: pathlib.Path
    ) -> None:
        """muse log --json on a 100-commit branch emits valid JSON with all commits."""
        import json

        from tests.cli_test_helper import CliRunner

        N = 100
        repo = _repo(tmp_path)
        _write_chain(repo, "main", N)
        config_toml_path(repo).write_text("")

        cli_args = ["log", "--json", "--max-count", str(N)]
        result = CliRunner().invoke(
            None,
            cli_args,
            env={"MUSE_REPO_ROOT": str(repo)},
        )
        assert result.exit_code == 0, f"muse log --json failed:\n{result.output}"

        payload = json.loads(result.output)
        assert "commits" in payload, f"Missing 'commits' key: {payload}"
        assert len(payload["commits"]) == N, (
            f"Expected {N} commits in JSON output, got {len(payload['commits'])}"
        )

    def test_log_json_empty_repo_returns_empty_array(
        self, tmp_path: pathlib.Path
    ) -> None:
        """muse log --json on a branch with no commits returns empty commits array."""
        import json

        from tests.cli_test_helper import CliRunner

        repo = _repo(tmp_path)
        config_toml_path(repo).write_text("")

        result = CliRunner().invoke(
            None,
            ["log", "--json"],
            env={"MUSE_REPO_ROOT": str(repo)},
        )

        # With no commits, two outcomes are accepted: exit 0 with an empty
        # commits array, or a graceful "(no commits)" message.  Only the
        # JSON outcome is inspected further.
        if result.exit_code != 0 or not result.output.startswith("{"):
            return
        payload = json.loads(result.output)
        assert payload.get("commits") == []
848
849
850 class TestFindMergeBaseMemory:
851 """find_merge_base must fire its cap cleanly, not OOM.
852
853 The default ``max_ancestors`` cap is 50 000. A chain deeper than the cap
854 must raise ``MuseCLIError`` with an actionable message rather than
855 exhausting all available memory. This test confirms the error path, not
856 the OOM path.
857 """
858
def test_merge_base_cap_raises_gracefully_on_deep_chain(
    self, tmp_path: pathlib.Path
) -> None:
    """Chain deeper than max_ancestors raises MuseCLIError, not MemoryError."""
    from muse.core.errors import MuseCLIError

    repo = _repo(tmp_path)
    limits_toml = "[limits]\nmax_ancestors = 50\n"
    config_toml_path(repo).write_text(limits_toml)

    # Two 60-commit chains built from disjoint index ranges and different
    # snapshot IDs, so they share no ancestor and each exceeds the cap of 50.
    left_tip = _write_chain(repo, "branchA", 60, snap_id="cc" * 32)
    right_tip = _write_chain(repo, "branchB", 60, snap_id="dd" * 32, start=1000)

    with pytest.raises(MuseCLIError, match="max_ancestors"):
        find_merge_base(repo, left_tip, right_tip)
876
def test_merge_base_found_within_cap(
    self, tmp_path: pathlib.Path
) -> None:
    """find_merge_base finds the base when both branches are within cap."""
    repo = _repo(tmp_path)
    config_toml_path(repo).write_text("[limits]\nmax_ancestors = 500\n")

    snap_id = "ee" * 32

    def _commit(
        branch: str,
        message: str,
        ts: datetime.datetime,
        parent: str | None,
    ) -> str:
        """Write one commit on *branch* with the given parent; return its ID."""
        cid = compute_commit_id(
            parent_ids=[parent] if parent else [],
            snapshot_id=snap_id,
            message=message,
            committed_at_iso=ts.isoformat(),
            author="test",
        )
        write_commit(
            repo,
            CommitRecord(
                commit_id=cid,
                branch=branch,
                snapshot_id=snap_id,
                message=message,
                committed_at=ts,
                parent_commit_id=parent,
                parent2_commit_id=None,
                author="test",
                metadata={},
                structured_delta=None,
                sem_ver_bump="none",
                breaking_changes=[],
                agent_id="",
                model_id="",
                toolchain_id="",
                prompt_hash="",
                signature="",
                signer_key_id="",
            ),
        )
        return cid

    def _day(day: int) -> datetime.datetime:
        """Midnight UTC on the given day of January 2026."""
        return datetime.datetime(2026, 1, day, tzinfo=datetime.timezone.utc)

    # Build a common root commit.
    root_cid = _commit("main", "root", _day(1), None)

    # Extend branchA (20 commits) and branchB (15 commits) from the same root.
    tip_a = root_cid
    for i in range(20):
        tip_a = _commit("branchA", f"a-{i:04d}", _day(2), tip_a)

    tip_b = root_cid
    for i in range(15):
        tip_b = _commit("branchB", f"b-{i:04d}", _day(3), tip_b)

    base = find_merge_base(repo, tip_a, tip_b)
    # IDs start with the 7-character "sha256:" prefix, so show 19 characters
    # to leave 12 hex digits visible in the failure message.
    assert base == root_cid, (
        f"Expected merge base {root_cid[:19]}, got {base[:19] if base else None}"
    )
991
def test_merge_base_memory_bounded_within_cap(
    self, tmp_path: pathlib.Path
) -> None:
    """find_merge_base stays under 64 MiB of traced heap on a shallow divergence.

    Writes ``max_ancestors = 200`` into the repo config, creates a root
    commit plus two 50-commit branches ("xa" and "xb") forking from it, and
    measures the tracemalloc peak of a single ``find_merge_base`` call.

    A ``MuseCLIError`` raised by the call is tolerated: only the memory
    ceiling is asserted here, not the returned merge base.
    """
    from muse.core.errors import MuseCLIError

    repo = _repo(tmp_path)
    CAP = 200
    config_toml_path(repo).write_text(
        f"[limits]\nmax_ancestors = {CAP}\n"
    )

    # Build a shallow divergence: 50 commits each, well inside the cap.
    snap_id = "ff" * 32
    root_cid = compute_commit_id(
        parent_ids=[],
        snapshot_id=snap_id,
        message="base",
        committed_at_iso="2026-01-01T00:00:00+00:00",
        author="t",
    )
    root_rec = CommitRecord(
        commit_id=root_cid,
        branch="main",
        snapshot_id=snap_id,
        message="base",
        committed_at=datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc),
        parent_commit_id=None,
        parent2_commit_id=None,
        author="t",
        metadata={},
        structured_delta=None,
        sem_ver_bump="none",
        breaking_changes=[],
        agent_id="",
        model_id="",
        toolchain_id="",
        prompt_hash="",
        signature="",
        signer_key_id="",
    )
    write_commit(repo, root_rec)

    def _extend(parent: str, prefix: str, n: int) -> str:
        """Append *n* linear commits on branch *prefix* after *parent*; return the tip ID."""
        # Every commit on the branch shares one timestamp, so compute it once.
        ts = datetime.datetime(2026, 2, 1, tzinfo=datetime.timezone.utc)
        ts_iso = ts.isoformat()
        tip = parent
        for i in range(n):
            msg = f"{prefix}-{i:04d}"
            cid = compute_commit_id(
                parent_ids=[tip],
                snapshot_id=snap_id,
                message=msg,
                committed_at_iso=ts_iso,
                author="t",
            )
            rec = CommitRecord(
                commit_id=cid,
                branch=prefix,
                snapshot_id=snap_id,
                message=msg,
                committed_at=ts,
                parent_commit_id=tip,
                parent2_commit_id=None,
                author="t",
                metadata={},
                structured_delta=None,
                sem_ver_bump="none",
                breaking_changes=[],
                agent_id="",
                model_id="",
                toolchain_id="",
                prompt_hash="",
                signature="",
                signer_key_id="",
            )
            write_commit(repo, rec)
            tip = cid
        return tip

    tip_a = _extend(root_cid, "xa", 50)
    tip_b = _extend(root_cid, "xb", 50)

    tracemalloc.start()
    try:
        # Drop anything traced so far so the peak reflects only the call below.
        tracemalloc.clear_traces()
        try:
            find_merge_base(repo, tip_a, tip_b)
        except MuseCLIError:
            # Cap triggered — that's also a valid outcome.
            pass
        _, peak_bytes = tracemalloc.get_traced_memory()
    finally:
        # Always stop tracing, even on an unexpected exception, so the
        # tracing overhead does not leak into the rest of the test session.
        tracemalloc.stop()

    peak_mib = peak_bytes / (1024 * 1024)
    assert peak_mib <= 64, (
        f"find_merge_base peak {peak_mib:.1f} MiB — expected ≤ 64 MiB "
        f"for two 50-commit branches (cap={CAP})."
    )
1086
1087
class TestSnapshotManifest75kFiles:
    """build_snapshot_manifest on 75 000 files must stay under 512 MiB.

    The manifest dict holds only ``{rel_path: sha256_hex}`` — strings only.
    75k × (60-char path + 64-char hash) ≈ 9.3 MiB for the dict alone.
    The peak should be dominated by the stat-cache msgpack load, not by
    the manifest itself.  512 MiB is a generous ceiling to catch any
    accidental full-file-content buffering.

    Note: the tests measure the ``tracemalloc`` peak (allocations traced by
    the Python allocator hooks), not the process RSS, despite their names.
    """

    @staticmethod
    def _make_workdir(
        tmp_path: pathlib.Path,
        *,
        n_dirs: int,
        files_per_dir: int,
        dir_width: int,
        payload: bytes,
    ) -> pathlib.Path:
        """Create ``tmp_path/workdir`` holding ``n_dirs * files_per_dir`` files.

        Directories are named ``pkg_<d>`` with *d* zero-padded to *dir_width*
        digits; each contains files ``f_000.py`` … whose content is *payload*.
        The muse directory and repo JSON file are created first.

        Returns the work-tree root.
        """
        root = tmp_path / "workdir"
        root.mkdir()
        muse_dir(root).mkdir()
        repo_json_path(root).write_text('{"repo_id": "bench"}')

        for d in range(n_dirs):
            dp = root / f"pkg_{d:0{dir_width}d}"
            dp.mkdir()
            for f in range(files_per_dir):
                (dp / f"f_{f:03d}.py").write_bytes(payload)
        return root

    @pytest.mark.slow
    def test_75k_files_peak_rss_under_512_mib(
        self, tmp_path: pathlib.Path
    ) -> None:
        """build_snapshot_manifest on 75 000 small files stays under 512 MiB."""
        # 750 dirs × 100 files = 75 000 files, each 128 bytes.
        root = self._make_workdir(
            tmp_path,
            n_dirs=750,
            files_per_dir=100,
            dir_width=4,
            payload=b"x" * 128,
        )

        tracemalloc.start()
        try:
            # Reset traces so the peak covers only the manifest build.
            tracemalloc.clear_traces()
            manifest = build_snapshot_manifest(root)
            _, peak_bytes = tracemalloc.get_traced_memory()
        finally:
            # Stop tracing even if the build raises, so later tests are unaffected.
            tracemalloc.stop()

        assert len(manifest) == 75_000, (
            f"Expected 75 000 files in manifest, got {len(manifest)}"
        )
        peak_mib = peak_bytes / (1024 * 1024)
        assert peak_mib <= 512, (
            f"build_snapshot_manifest(75k) peak {peak_mib:.1f} MiB exceeds 512 MiB. "
            "File content must never be loaded into memory — only stat + SHA-256."
        )

    def test_10k_files_peak_rss_under_64_mib(
        self, tmp_path: pathlib.Path
    ) -> None:
        """10k-file manifest stays under 64 MiB (fast smoke for the ceiling property)."""
        # 100 dirs × 100 files = 10 000 files, each 64 bytes.
        root = self._make_workdir(
            tmp_path,
            n_dirs=100,
            files_per_dir=100,
            dir_width=3,
            payload=b"y" * 64,
        )

        tracemalloc.start()
        try:
            # Reset traces so the peak covers only the manifest build.
            tracemalloc.clear_traces()
            manifest = build_snapshot_manifest(root)
            _, peak_bytes = tracemalloc.get_traced_memory()
        finally:
            # Stop tracing even if the build raises, so later tests are unaffected.
            tracemalloc.stop()

        assert len(manifest) == 10_000
        peak_mib = peak_bytes / (1024 * 1024)
        assert peak_mib <= 64, (
            f"build_snapshot_manifest(10k) peak {peak_mib:.1f} MiB — expected ≤ 64 MiB."
        )
1156
1157
class TestWalkCommitsBetweenCap:
    """walk_commits_between must truncate at its cap, never OOM.

    This function is used by ``muse status --json`` for ahead/behind counts.
    A branch 100k commits ahead of remote must return at most ``max_commits``
    records and set ``truncated=True`` — it must never allocate memory
    proportional to the full chain depth.
    """

    def test_truncates_at_cap_not_oom(
        self, tmp_path: pathlib.Path
    ) -> None:
        """Chain of 1 000 commits with cap=100 truncates, doesn't exhaust memory."""
        repo = _repo(tmp_path)
        N = 1_000
        CAP = 100
        tip = _write_chain(repo, "main", N)

        result = walk_commits_between_result(repo, tip, max_commits=CAP)

        assert result["truncated"] is True, (
            "Expected truncated=True for chain longer than cap"
        )
        assert len(result["commits"]) == CAP, (
            f"Expected exactly {CAP} commits, got {len(result['commits'])}"
        )

    def test_truncation_memory_bounded(
        self, tmp_path: pathlib.Path
    ) -> None:
        """walk_commits_between_result peak memory is bounded by cap, not chain depth.

        Walks a 2 000-commit chain with ``max_commits=200`` and checks that the
        tracemalloc peak of the call stays at or below 32 MiB.
        """
        repo = _repo(tmp_path)
        N = 2_000
        CAP = 200
        tip = _write_chain(repo, "main", N)

        tracemalloc.start()
        try:
            # Reset traces so the peak covers only the walk, not chain setup.
            tracemalloc.clear_traces()
            result = walk_commits_between_result(repo, tip, max_commits=CAP)
            _, peak_bytes = tracemalloc.get_traced_memory()
        finally:
            # Stop tracing even if the walk raises, so later tests are unaffected.
            tracemalloc.stop()

        assert result["count"] == CAP, (
            f"Expected count == {CAP}, got {result['count']}"
        )
        peak_mib = peak_bytes / (1024 * 1024)
        assert peak_mib <= 32, (
            f"walk_commits_between_result(cap={CAP}) peak {peak_mib:.1f} MiB — "
            "memory must be proportional to cap, not chain depth."
        )