gabriel / muse public
test_mpack_perf.py python
834 lines 30.8 KB
Raw
sha256:81ae324db5ad375fbfe4834c6fcb378312cafad3cc92dec5d3e5c427306621a2 fix: remove commit_exists filter from have anchors — server… Sonnet 4.6 patch 21 days ago
1 """Phase 3.4 — MPack build and apply at scale.
2
3 Target metrics (measured on a 2024 MacBook Pro M4, macOS 15):
4
5 build_mpack (10 000 objects × 4 KiB): < 60 s [@slow]
6 apply_mpack (10 000 objects × 4 KiB): < 60 s [@slow]
7 verify-pack (10 000 objects × 4 KiB): < 120 s [@slow] (in practice much faster)
8 collect_blob_ids (1 000 objects): < 1 s (no blob reads)
9
10 Edges verified beyond the plan:
11
12 a. ``build_mpack`` loads ALL blob bytes simultaneously — peak RSS ≈ 2× blob total.
13 b. ``have=`` filter correctly reduces both commit count and blob payload.
14 c. ``MAX_PACK_OBJECTS`` applies to total_items (commits + snapshots + objects),
15 not per-type — an mpack within per-type limits can still be rejected.
16 d. Oversized object (> MAX_OBJECT_WRITE_BYTES) is silently skipped, not raised —
17 caller sees blobs_skipped++ but no error; documented behaviour.
18 e. ``verify-pack --stat`` is purely structural — no SHA-256, near-instant.
19 f. Duplicate OID dedup in ``apply_mpack`` — each OID written at most once.
20 g. Round-trip integrity: build → msgpack-serialize → apply → all objects present.
21 """
22
from __future__ import annotations

import datetime
import pathlib
import sys
import tempfile
import time
import tracemalloc
from collections.abc import Iterator
from unittest.mock import patch

import msgpack
import pytest

from muse.core.object_store import write_object
from muse.core.mpack import (
    MAX_OBJECT_WRITE_BYTES,
    MAX_PACK_OBJECTS,
    BlobPayload,
    MPack,
    apply_mpack,
    build_mpack,
    collect_blob_ids,
)
from muse.core.ids import hash_commit as compute_commit_id, hash_snapshot as compute_snapshot_id

from muse.core.types import Manifest, blob_id
from muse.core.io import MAX_PACK_MSGPACK_BYTES
from muse.core.refs import write_branch_ref
from muse.core.commits import (
    CommitRecord,
    write_commit,
)
from muse.core.snapshots import (
    SnapshotRecord,
    write_snapshot,
)
from muse.core.paths import config_toml_path, muse_dir
60
61 # ---------------------------------------------------------------------------
62 # Helpers
63 # ---------------------------------------------------------------------------
64
65
66
def _make_repo(tmp: pathlib.Path) -> pathlib.Path:
    """Lay out a minimal source repository under *tmp* and return *tmp*.

    Inside the directory returned by ``muse_dir(tmp)`` this creates
    ``repo.json``, the ``commits``/``snapshots``/``objects`` directories,
    ``refs/heads``, a ``HEAD`` pointing at ``refs/heads/main`` and an empty
    ``config.toml``.
    """
    tmp.mkdir(parents=True, exist_ok=True)

    meta_root = muse_dir(tmp)
    # No exist_ok here: a pre-existing metadata directory is an error.
    meta_root.mkdir()
    meta_root.joinpath("repo.json").write_text('{"repo_id":"bench","owner":"bench"}')

    for store_name in ("commits", "snapshots", "objects"):
        meta_root.joinpath(store_name).mkdir()

    meta_root.joinpath("refs", "heads").mkdir(parents=True)
    meta_root.joinpath("HEAD").write_text("ref: refs/heads/main\n")
    meta_root.joinpath("config.toml").write_text("")
    return tmp
78
79
def _fresh_repo(tmp: pathlib.Path) -> pathlib.Path:
    """Lay out an empty destination repository under *tmp* and return *tmp*.

    Only ``repo.json`` and the ``commits``/``snapshots``/``objects``
    directories are created inside ``muse_dir(tmp)``; no refs, ``HEAD`` or
    config file are written.
    """
    tmp.mkdir(parents=True, exist_ok=True)

    meta_root = muse_dir(tmp)
    # No exist_ok here: a pre-existing metadata directory is an error.
    meta_root.mkdir()
    meta_root.joinpath("repo.json").write_text('{"repo_id":"dst"}')

    for store_name in ("commits", "snapshots", "objects"):
        meta_root.joinpath(store_name).mkdir()
    return tmp
88
89
def _populate(
    repo: pathlib.Path,
    n_commits: int = 10,
    n_unique_objects: int = 10,
    blob_size: int = 4096,
    branch: str = "main",
    start: int = 0,
) -> tuple[str, dict[str, str]]:
    """Write *n_unique_objects* blobs, one snapshot and a linear *n_commits* chain.

    Blob contents and commit messages are derived from ``start + i``, so calls
    with non-overlapping ``start`` ranges produce distinct object and commit
    IDs.  Every commit in the chain references the same snapshot, and the
    first commit has no parent (each call starts an independent chain).
    After the chain is written, *branch* is pointed at its tip.

    Args:
        repo: Repository root to write into.
        n_commits: Length of the commit chain; must be at least 1.
        n_unique_objects: Number of blobs (and manifest entries) to write.
        blob_size: Number of filler bytes appended to each blob's unique prefix.
        branch: Branch name recorded on each commit and updated to the tip.
        start: Offset mixed into blob contents and commit messages.

    Returns:
        ``(tip_commit_id, {path: oid})`` — the chain tip and the manifest.

    Raises:
        ValueError: If *n_commits* is less than 1 (there would be no tip to
            return or to point the branch at).
    """
    if n_commits < 1:
        raise ValueError(f"n_commits must be >= 1, got {n_commits}")

    blobs: Manifest = {}
    for i in range(n_unique_objects):
        data = f"obj-{i + start:08d}-".encode() + b"x" * blob_size
        oid = blob_id(data)
        write_object(repo, oid, data)
        blobs[f"file_{i:04d}.py"] = oid

    sid = compute_snapshot_id(blobs)
    write_snapshot(repo, SnapshotRecord(snapshot_id=sid, manifest=blobs))

    # The timestamp is identical for every commit; only the message differs,
    # so build it (and its ISO form) once instead of once per iteration.
    ts = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc)
    ts_iso = ts.isoformat()

    parent: str | None = None
    tip = ""
    for i in range(n_commits):
        msg = f"c{start + i:07d}"
        cid = compute_commit_id(
            parent_ids=[parent] if parent else [],
            snapshot_id=sid,
            message=msg,
            committed_at_iso=ts_iso,
            author="bench",
        )
        rec = CommitRecord(
            commit_id=cid,
            branch=branch,
            snapshot_id=sid,
            message=msg,
            committed_at=ts,
            parent_commit_id=parent,
            parent2_commit_id=None,
            author="bench",
            metadata={},
            structured_delta=None,
            sem_ver_bump="none",
            breaking_changes=[],
            agent_id="",
            model_id="",
            toolchain_id="",
            prompt_hash="",
            signature="",
            signer_key_id="",
        )
        write_commit(repo, rec)
        parent = cid
        tip = cid

    write_branch_ref(repo, branch, tip)
    return tip, blobs
150
151
152 # ---------------------------------------------------------------------------
153 # Phase 3.4.1 — build_mpack throughput
154 # ---------------------------------------------------------------------------
155
156
class TestBuildMPackThroughput:
    """build_mpack must sustain ≥ 2 000 objects/sec in the object-read loop.

    build_mpack's hot path is ``read_object`` for each unique blob. At the
    Phase 3.1 floor of 2 000 objects/sec, 100 000 objects take ~50 s, within
    the 60 s target. The fast test covers 1 000 objects and asserts the rate
    directly; the slow test covers 10 000 objects and proves on-disk timing.
    """

    # Minimum acceptable build rate, in blobs per wall-clock second.
    _MIN_OBJECTS_PER_SEC: int = 2_000

    def test_build_mpack_1k_objects_rate(self, tmp_path: pathlib.Path) -> None:
        """build_mpack on 1 000 objects must achieve ≥ 2 000 objects/sec."""
        repo = _make_repo(tmp_path)
        N = 1_000
        tip, _ = _populate(repo, n_commits=50, n_unique_objects=N)

        t0 = time.perf_counter()
        mpack = build_mpack(repo, [tip])
        elapsed = time.perf_counter() - t0

        assert len(mpack["blobs"]) == N, (
            f"Expected {N} blobs in mpack, got {len(mpack['blobs'])}"
        )
        rate = N / elapsed
        assert rate >= self._MIN_OBJECTS_PER_SEC, (
            f"build_mpack throughput {rate:.0f} objects/sec < {self._MIN_OBJECTS_PER_SEC} minimum. "
            f"({N} objects took {elapsed:.2f}s.)"
        )

    def test_build_mpack_have_filter_excludes_base(
        self, tmp_path: pathlib.Path
    ) -> None:
        """build_mpack with have=[base_tip] sends only delta commits, not the full history."""
        repo = _make_repo(tmp_path)
        base_tip, base_blobs = _populate(repo, n_commits=50, n_unique_objects=100, start=0)

        # New commits on top of the base, with fresh objects.
        delta_tip, delta_blobs = _populate(
            repo, n_commits=20, n_unique_objects=50, start=1000
        )
        # _populate starts each chain with no parent, so join the two chains
        # with a merge commit: first parent = delta tip, second = base tip.
        ts = datetime.datetime(2026, 1, 2, tzinfo=datetime.timezone.utc)
        sid = compute_snapshot_id(delta_blobs)
        chained_cid = compute_commit_id(
            parent_ids=[delta_tip, base_tip],
            snapshot_id=sid,
            message="merge",
            committed_at_iso=ts.isoformat(),
            author="bench",
        )
        chained = CommitRecord(
            commit_id=chained_cid,
            branch="main",
            snapshot_id=sid,
            message="merge",
            committed_at=ts,
            parent_commit_id=delta_tip,
            parent2_commit_id=base_tip,
            author="bench",
            metadata={},
            structured_delta=None,
            sem_ver_bump="none",
            breaking_changes=[],
            agent_id="",
            model_id="",
            toolchain_id="",
            prompt_hash="",
            signature="",
            signer_key_id="",
        )
        write_commit(repo, chained)
        write_branch_ref(repo, "main", chained_cid)

        # Full mpack (no have).
        full_mpack = build_mpack(repo, [chained_cid])
        # Delta mpack: receiver already has base history.
        delta_mpack = build_mpack(repo, [chained_cid], have=[base_tip])

        assert len(delta_mpack["commits"]) < len(full_mpack["commits"]), (
            "have= filter must reduce commit count"
        )
        full_oids = {o["object_id"] for o in full_mpack["blobs"]}
        delta_oids = {o["object_id"] for o in delta_mpack["blobs"]}
        assert delta_oids.issubset(full_oids), "delta mpack must be a subset of full mpack"
        # The subset check alone is satisfied even when nothing is filtered.
        # Base blobs are referenced only by the base chain's snapshot, so once
        # the base commits are excluded none of them may be shipped.
        base_oids = set(base_blobs.values())
        assert delta_oids.isdisjoint(base_oids), (
            "have= filter must exclude blobs that are only reachable from the base history"
        )

    @pytest.mark.slow
    def test_build_mpack_10k_objects_under_60s(
        self, tmp_path: pathlib.Path
    ) -> None:
        """build_mpack on 10 000 objects must complete in < 60 s.

        This extrapolates to 100 000 objects at the same rate: 100k / 2000 ≈ 50 s,
        within the plan target of 60 s.
        """
        repo = _make_repo(tmp_path)
        N = 10_000
        tip, _ = _populate(repo, n_commits=100, n_unique_objects=N)

        t0 = time.perf_counter()
        mpack = build_mpack(repo, [tip])
        elapsed = time.perf_counter() - t0

        assert len(mpack["blobs"]) == N
        assert elapsed < 60.0, (
            f"build_mpack({N} objects) took {elapsed:.1f}s — target < 60 s. "
            f"Rate: {N/elapsed:.0f} objects/sec."
        )
266
267
class TestCollectObjectIdsThroughput:
    """collect_blob_ids must be significantly faster than build_mpack.

    It performs the same BFS + manifest traversal but skips reading blob bytes.
    The result feeds client-side deduplication so only missing objects are sent.
    """

    def test_collect_blob_ids_no_blob_reads(
        self, tmp_path: pathlib.Path
    ) -> None:
        """collect_blob_ids on 500 objects must be < 1 s (no blob reads)."""
        repo = _make_repo(tmp_path)
        N = 500
        tip, _ = _populate(repo, n_commits=50, n_unique_objects=N)

        t0 = time.perf_counter()
        oids = collect_blob_ids(repo, [tip])
        elapsed = time.perf_counter() - t0

        assert len(oids) == N, f"Expected {N} OIDs, got {len(oids)}"
        assert elapsed < 1.0, (
            f"collect_blob_ids({N}) took {elapsed:.3f}s — expected < 1 s. "
            "It must not read blob bytes; only BFS + manifest traversal."
        )

    def test_collect_blob_ids_faster_than_build_mpack(
        self, tmp_path: pathlib.Path
    ) -> None:
        """collect_blob_ids must be faster than build_mpack for the same input."""
        repo = _make_repo(tmp_path)
        N = 200
        tip, _ = _populate(repo, n_commits=20, n_unique_objects=N)

        # Separate start marks and durations: one name never holds both.
        collect_start = time.perf_counter()
        oids = collect_blob_ids(repo, [tip])
        t_collect = time.perf_counter() - collect_start

        build_start = time.perf_counter()
        mpack = build_mpack(repo, [tip])
        t_build = time.perf_counter() - build_start

        assert len(oids) == len(mpack["blobs"]) == N
        assert t_collect <= t_build, (
            f"collect_blob_ids ({t_collect:.3f}s) must be ≤ build_mpack ({t_build:.3f}s) — "
            "collect skips blob reads, build reads every byte."
        )

    def test_collect_blob_ids_have_excludes_ancestors(
        self, tmp_path: pathlib.Path
    ) -> None:
        """collect_blob_ids with have= returns only new object IDs."""
        repo = _make_repo(tmp_path)
        # Two independent chains (each _populate call starts with no parent)
        # whose blobs do not overlap because their ``start`` ranges differ.
        base_tip, base_blobs = _populate(repo, n_commits=10, n_unique_objects=50, start=0)
        delta_tip, delta_blobs = _populate(repo, n_commits=5, n_unique_objects=30, start=100)

        all_oids = collect_blob_ids(repo, [delta_tip])
        delta_oids = collect_blob_ids(repo, [delta_tip], have=[base_tip])

        expected = set(delta_blobs.values())
        assert expected.issubset(set(all_oids))
        assert len(delta_oids) == len(expected), (
            f"Expected {len(expected)} delta OIDs, got {len(delta_oids)}"
        )
        # Check the filtered result itself, not just its size: it must be
        # exactly the delta blobs and contain nothing from the base chain.
        assert set(delta_oids) == expected, (
            "collect_blob_ids(have=...) must return exactly the delta object IDs"
        )
        assert set(delta_oids).isdisjoint(base_blobs.values()), (
            "collect_blob_ids(have=...) must not return base object IDs"
        )
331
332
333 # ---------------------------------------------------------------------------
334 # Phase 3.4.2 — apply_mpack throughput
335 # ---------------------------------------------------------------------------
336
337
class TestApplyMPackThroughput:
    """apply_mpack must sustain ≥ 1 500 objects/sec in the object-write loop.

    fsync is mocked: the test measures the mpack unpacking + hash-verify +
    mkstemp + fchmod + os.replace pipeline without OS I/O latency. Durability
    ordering is verified by test_integrity_I2_fsync.py.
    """

    # Minimum acceptable apply rate, in blobs per wall-clock second.
    _MIN_OBJECTS_PER_SEC: int = 1_500

    @pytest.fixture(autouse=True)
    def no_fsync(self) -> Iterator[None]:
        """Mock out all fsync calls so the test measures algorithmic throughput.

        This is a yield fixture, hence the ``Iterator[None]`` return type:
        the patches are active while the test body runs and are undone when
        the ``with`` block exits after the test.
        """
        with patch("muse.core.object_store._fsync_fd", return_value=None), \
             patch("muse.core.commits.os.fsync", return_value=None), \
             patch("muse.core.io.os.fsync", return_value=None), \
             patch("muse.core.io.fcntl.fcntl", return_value=0):
            yield

    @pytest.mark.perf
    def test_apply_mpack_1k_objects_rate(self, tmp_path: pathlib.Path) -> None:
        """apply_mpack of a 1 000-object mpack must achieve ≥ _MIN_OBJECTS_PER_SEC."""
        src = _make_repo(tmp_path / "src")
        N = 1_000
        tip, _ = _populate(src, n_commits=50, n_unique_objects=N)
        mpack = build_mpack(src, [tip])
        assert len(mpack["blobs"]) == N

        dst = _fresh_repo(tmp_path / "dst")

        # Only the apply step is timed; building the pack is setup.
        t0 = time.perf_counter()
        result = apply_mpack(dst, mpack)
        elapsed = time.perf_counter() - t0

        assert result["blobs_written"] == N
        rate = N / elapsed
        assert rate >= self._MIN_OBJECTS_PER_SEC, (
            f"apply_mpack throughput {rate:.0f} objects/sec < {self._MIN_OBJECTS_PER_SEC} minimum. "
            f"({N} objects took {elapsed:.2f}s.)"
        )

    def test_apply_mpack_idempotent_second_apply_skips_all(
        self, tmp_path: pathlib.Path
    ) -> None:
        """Applying the same mpack twice: second apply must skip every object."""
        src = _make_repo(tmp_path / "src")
        tip, _ = _populate(src, n_commits=20, n_unique_objects=100)
        mpack = build_mpack(src, [tip])

        dst = _fresh_repo(tmp_path / "dst")
        r1 = apply_mpack(dst, mpack)
        r2 = apply_mpack(dst, mpack)

        assert r1["blobs_written"] == 100
        assert r2["blobs_written"] == 0
        assert r2["blobs_skipped"] == 100, (
            f"Expected 100 skipped on second apply, got {r2['blobs_skipped']}"
        )

    @pytest.mark.slow
    def test_apply_mpack_10k_objects_under_60s(
        self, tmp_path: pathlib.Path
    ) -> None:
        """apply_mpack of a 10 000-object mpack must complete in < 60 s."""
        src = _make_repo(tmp_path / "src")
        N = 10_000
        tip, _ = _populate(src, n_commits=100, n_unique_objects=N)
        mpack = build_mpack(src, [tip])

        dst = _fresh_repo(tmp_path / "dst")

        t0 = time.perf_counter()
        result = apply_mpack(dst, mpack)
        elapsed = time.perf_counter() - t0

        assert result["blobs_written"] == N
        assert elapsed < 60.0, (
            f"apply_mpack({N} objects) took {elapsed:.1f}s — target < 60 s. "
            f"Rate: {N/elapsed:.0f} objects/sec."
        )
418
419
420 # ---------------------------------------------------------------------------
421 # Phase 3.4.3 — verify-pack
422 # ---------------------------------------------------------------------------
423
424
class TestVerifyPackIntegrity:
    """CLI checks for ``verify-pack``: ``--stat`` counts, tamper detection and timing."""

    def test_verify_pack_stat_returns_counts(
        self, tmp_path: pathlib.Path
    ) -> None:
        """``verify-pack --stat`` must report the pack's blob and commit counts."""
        import json
        from tests.cli_test_helper import CliRunner

        repo = _make_repo(tmp_path)
        head, _ = _populate(repo, n_commits=20, n_unique_objects=50)
        packed = msgpack.packb(build_mpack(repo, [head]), use_bin_type=True)

        pack_path = tmp_path / "pack.muse"
        pack_path.write_bytes(packed)
        config_toml_path(repo).write_text("")

        argv = ["verify-pack", "--stat", "--no-local", "--json", "--file", str(pack_path)]
        outcome = CliRunner().invoke(None, argv, env={"MUSE_REPO_ROOT": str(repo)})

        assert outcome.exit_code == 0, f"verify-pack --stat failed: {outcome.output}"
        report = json.loads(outcome.output)
        assert report["blobs"] == 50
        assert report["commits"] == 20

    def test_verify_pack_detects_hash_mismatch(
        self, tmp_path: pathlib.Path
    ) -> None:
        """``verify-pack`` must fail when a blob's content does not hash to its declared ID."""
        import json
        from tests.cli_test_helper import CliRunner

        repo = _make_repo(tmp_path)
        head, _ = _populate(repo, n_commits=5, n_unique_objects=10)
        genuine = build_mpack(repo, [head])

        # Keep the first blob's declared ID but swap in different bytes.
        forged: BlobPayload = {
            "object_id": genuine["blobs"][0]["object_id"],
            "content": b"TAMPERED_CONTENT",
        }
        tampered: MPack = {
            "commits": genuine["commits"],
            "snapshots": genuine["snapshots"],
            "blobs": [forged, *genuine["blobs"][1:]],
            "summary": genuine.get("summary", {}),
            "meta": genuine.get("meta", {}),
        }

        pack_path = tmp_path / "tampered.muse"
        pack_path.write_bytes(msgpack.packb(tampered, use_bin_type=True))
        config_toml_path(repo).write_text("")

        argv = ["verify-pack", "--no-local", "--json", "--file", str(pack_path)]
        outcome = CliRunner().invoke(None, argv, env={"MUSE_REPO_ROOT": str(repo)})

        assert outcome.exit_code != 0, "verify-pack must exit non-zero when hash mismatches"
        report = json.loads(outcome.output)
        assert report["all_ok"] is False
        mismatch_reported = any(
            "hash mismatch" in failure["error"] for failure in report["failures"]
        )
        assert mismatch_reported, (
            f"Expected 'hash mismatch' in failures: {report['failures']}"
        )

    @pytest.mark.slow
    def test_verify_pack_10k_objects_under_120s(
        self, tmp_path: pathlib.Path
    ) -> None:
        """Full verification of a 10 000-object mpack must finish in < 120 s.

        SHA-256 on M4 Silicon processes ~3 GiB/s; 10k × 4 KiB = 40 MiB → < 1 s.
        The 120 s ceiling catches pathological I/O or per-object overhead.
        """
        import json
        from tests.cli_test_helper import CliRunner

        repo = _make_repo(tmp_path)
        N = 10_000
        head, _ = _populate(repo, n_commits=100, n_unique_objects=N)
        packed = msgpack.packb(build_mpack(repo, [head]), use_bin_type=True)

        pack_path = tmp_path / "pack10k.muse"
        pack_path.write_bytes(packed)
        config_toml_path(repo).write_text("")

        argv = ["verify-pack", "--no-local", "--json", "--file", str(pack_path)]
        cli_env = {"MUSE_REPO_ROOT": str(repo)}

        # The timed region covers runner construction and the CLI invocation.
        started = time.perf_counter()
        runner = CliRunner()
        outcome = runner.invoke(None, argv, env=cli_env)
        elapsed = time.perf_counter() - started

        assert outcome.exit_code == 0, f"verify-pack failed: {outcome.output[:200]}"
        report = json.loads(outcome.output)
        assert report["all_ok"] is True
        assert report["blobs_checked"] == N
        assert elapsed < 120.0, (
            f"verify-pack({N} objects) took {elapsed:.1f}s — target < 120 s."
        )
549
550
551 # ---------------------------------------------------------------------------
552 # Phase 3.4.4 — cap and guard enforcement
553 # ---------------------------------------------------------------------------
554
555
class TestMPackCapEnforcement:
    """MPack-bomb and size-cap guards must fire correctly."""

    def test_apply_mpack_rejects_mpack_exceeding_max_pack_objects(
        self, tmp_path: pathlib.Path
    ) -> None:
        """apply_mpack raises ValueError when total_items > MAX_PACK_OBJECTS.

        MAX_PACK_OBJECTS counts commits + snapshots + objects combined — not
        per-type. An mpack with MAX_PACK_OBJECTS + 1 total items is rejected.
        """
        repo = _fresh_repo(tmp_path)
        oversized: MPack = {
            "commits": [{}] * (MAX_PACK_OBJECTS + 1),
            "snapshots": [],
            "blobs": [],
        }
        with pytest.raises(ValueError, match="Pack rejected"):
            apply_mpack(repo, oversized)

    def test_apply_mpack_accepts_mpack_at_exact_cap(
        self, tmp_path: pathlib.Path
    ) -> None:
        """apply_mpack does NOT raise when total_items == MAX_PACK_OBJECTS.

        Items are malformed (empty dicts) so they are skipped as bad entries,
        but the cap check must pass.
        """
        repo = _fresh_repo(tmp_path)
        at_cap: MPack = {
            "commits": [{}] * MAX_PACK_OBJECTS,
            "snapshots": [],
            "blobs": [],
        }
        # Must not raise ValueError for the cap — skips malformed entries instead.
        result = apply_mpack(repo, at_cap)
        # Each empty-dict commit is missing commit_id and snapshot_id, so every
        # one is skipped by the essential-field guard added to apply_mpack.
        assert result["commits_written"] == 0, (
            "All malformed empty-dict commits must be skipped, not written"
        )

    def test_apply_mpack_total_items_cap_is_cross_type(
        self, tmp_path: pathlib.Path
    ) -> None:
        """MAX_PACK_OBJECTS applies across commits+snapshots+objects, not per-type.

        The pack is split roughly 80 % blobs / 20 % commits so that blobs and
        commits together equal MAX_PACK_OBJECTS, plus one snapshot: every
        per-type count is within the cap but the total is MAX_PACK_OBJECTS + 1.
        """
        repo = _fresh_repo(tmp_path)
        # Derive the split from the constant so the test keeps exercising the
        # "one over the cap" boundary if MAX_PACK_OBJECTS is ever retuned.
        n_blobs = MAX_PACK_OBJECTS * 4 // 5
        n_commits = MAX_PACK_OBJECTS - n_blobs
        cross_type: MPack = {
            "commits": [{}] * n_commits,
            "snapshots": [{}] * 1,
            "blobs": [{}] * n_blobs,
        }
        with pytest.raises(ValueError, match="Pack rejected"):
            apply_mpack(repo, cross_type)

    def test_apply_mpack_oversized_object_is_skipped_not_raised(
        self, tmp_path: pathlib.Path
    ) -> None:
        """An object exceeding MAX_OBJECT_WRITE_BYTES is silently skipped.

        This is documented behaviour: the per-object cap logs a warning and
        increments the loop counter rather than raising an exception, so the
        rest of the mpack is still applied.
        """
        from muse.core.object_store import has_object

        repo = _fresh_repo(tmp_path)
        good_data = b"x" * 64
        good_oid = blob_id(good_data)
        # A genuinely oversized payload (one byte over the cap) whose ID is
        # its real hash, so the size guard is the only reason to reject it.
        huge_data = b"z" * (MAX_OBJECT_WRITE_BYTES + 1)
        huge_oid = blob_id(huge_data)
        mpack: MPack = {
            "commits": [],
            "snapshots": [],
            "blobs": [
                BlobPayload(object_id=good_oid, content=good_data),
                BlobPayload(object_id=huge_oid, content=huge_data),
            ],
        }
        result = apply_mpack(repo, mpack)
        # Good blob written; oversized blob skipped.
        assert result["blobs_written"] == 1, (
            f"Expected 1 blob written (the good one), got {result['blobs_written']}"
        )
        # Oversized object must NOT be in the store.
        assert not has_object(repo, huge_oid), (
            "Oversized object must be rejected and not written to store"
        )

    def test_apply_mpack_deduplicates_repeated_oid(
        self, tmp_path: pathlib.Path
    ) -> None:
        """apply_mpack writes a repeated OID only once (dedup via seen_object_ids)."""
        repo = _fresh_repo(tmp_path)
        data = b"deduplicate-me" * 100
        oid = blob_id(data)
        REPEAT = 50
        mpack: MPack = {
            "commits": [],
            "snapshots": [],
            "blobs": [BlobPayload(object_id=oid, content=data)] * REPEAT,
        }
        result = apply_mpack(repo, mpack)
        # First occurrence written; remaining 49 skipped.
        assert result["blobs_written"] == 1, (
            f"Expected 1 write for {REPEAT} identical OIDs, got {result['blobs_written']}"
        )
        assert result["blobs_skipped"] == REPEAT - 1, (
            f"Expected {REPEAT - 1} skipped, got {result['blobs_skipped']}"
        )

    def test_apply_mpack_empty_mpack_is_noop(
        self, tmp_path: pathlib.Path
    ) -> None:
        """apply_mpack on an mpack with no items returns all-zero counts."""
        repo = _fresh_repo(tmp_path)
        empty: MPack = {"commits": [], "snapshots": [], "blobs": []}
        result = apply_mpack(repo, empty)
        assert result["commits_written"] == 0
        assert result["snapshots_written"] == 0
        assert result["blobs_written"] == 0
        assert result["blobs_skipped"] == 0

    def test_have_equals_want_produces_empty_mpack(
        self, tmp_path: pathlib.Path
    ) -> None:
        """build_mpack with have=[tip] where tip is also in want returns empty mpack."""
        repo = _make_repo(tmp_path)
        tip, _ = _populate(repo, n_commits=10, n_unique_objects=20)

        mpack = build_mpack(repo, [tip], have=[tip])

        assert mpack["commits"] == [], (
            "When have contains the want tip, BFS should yield 0 commits"
        )
        assert mpack["blobs"] == [], (
            "Empty commit set must produce empty object list"
        )
702
703
704 # ---------------------------------------------------------------------------
705 # Phase 3.4.5 — memory ceiling
706 # ---------------------------------------------------------------------------
707
708
class TestMPackMemoryCeiling:
    """build_mpack and apply_mpack peak memory must be proportional to blob payload.

    build_mpack holds ALL object bytes in-memory simultaneously — this is a
    known architectural property, not a bug. The test confirms:
      1. Peak allocation ≈ total blob bytes (not 10× or 100×).
      2. build_mpack does not accumulate unbounded intermediate structures.

    Peaks are measured with ``tracemalloc``, i.e. they are the peak size of
    Python-traced allocations during the call, used here as a stand-in for
    RSS (the test names keep the historical ``rss`` wording).
    """

    def test_build_mpack_peak_rss_proportional_to_blob_total(
        self, tmp_path: pathlib.Path
    ) -> None:
        """build_mpack peak allocation is ≤ 3× the total blob payload size."""
        repo = _make_repo(tmp_path)
        N = 500
        BLOB_SZ = 4096  # 4 KiB
        tip, _ = _populate(repo, n_commits=50, n_unique_objects=N, blob_size=BLOB_SZ)
        blob_total_mib = N * BLOB_SZ / (1024 * 1024)

        tracemalloc.start()
        try:
            tracemalloc.clear_traces()
            mpack = build_mpack(repo, [tip])
            _, peak_bytes = tracemalloc.get_traced_memory()
        finally:
            # Always switch tracing off again: if the measured call raises,
            # leaving tracemalloc enabled would slow down and skew every
            # test that runs afterwards in the same process.
            tracemalloc.stop()

        peak_mib = peak_bytes / (1024 * 1024)
        ceiling_mib = blob_total_mib * 3  # generous: blobs + msgpack + overhead
        assert len(mpack["blobs"]) == N
        assert peak_mib <= ceiling_mib, (
            f"build_mpack peak {peak_mib:.1f} MiB exceeds 3× blob total "
            f"({ceiling_mib:.1f} MiB for {N} × {BLOB_SZ//1024} KiB objects). "
            "build_mpack must not accumulate more than the object bytes themselves."
        )

    def test_apply_mpack_peak_rss_under_64_mib_for_small_objects(
        self, tmp_path: pathlib.Path
    ) -> None:
        """apply_mpack of 500 × 4 KiB objects stays under 64 MiB."""
        src = _make_repo(tmp_path / "src")
        tip, _ = _populate(src, n_commits=50, n_unique_objects=500, blob_size=4096)
        # Built before tracing starts, so the pack itself is not counted.
        mpack = build_mpack(src, [tip])

        dst = _fresh_repo(tmp_path / "dst")

        tracemalloc.start()
        try:
            tracemalloc.clear_traces()
            apply_mpack(dst, mpack)
            _, peak_bytes = tracemalloc.get_traced_memory()
        finally:
            # See the note in the build test: never leave tracing enabled.
            tracemalloc.stop()

        peak_mib = peak_bytes / (1024 * 1024)
        assert peak_mib <= 64, (
            f"apply_mpack(500 × 4 KiB) peak {peak_mib:.1f} MiB — expected ≤ 64 MiB."
        )
763
764
765 # ---------------------------------------------------------------------------
766 # Phase 3.4.6 — round-trip integrity
767 # ---------------------------------------------------------------------------
768
769
class TestMPackRoundTrip:
    """End-to-end round-trip: build_mpack → msgpack serialize → apply_mpack → verify."""

    def test_roundtrip_all_objects_restored(
        self, tmp_path: pathlib.Path
    ) -> None:
        """build_mpack → msgpack → apply_mpack round-trip: all objects present on dst."""
        # Only names not already imported at module level are imported here.
        from muse.core.io import safe_unpackb
        from muse.core.object_store import has_object

        src = _make_repo(tmp_path / "src")
        N_OBJECTS = 200
        N_COMMITS = 30
        tip, blobs = _populate(src, n_commits=N_COMMITS, n_unique_objects=N_OBJECTS)
        mpack = build_mpack(src, [tip])

        # Serialize (simulates wire transfer).
        raw = msgpack.packb(mpack, use_bin_type=True)

        # Re-hydrate using safe_unpackb (the same path as unpack-objects).
        restored_dict = safe_unpackb(
            raw,
            context="roundtrip",
            max_bytes=MAX_PACK_MSGPACK_BYTES,
            allow_binary=True,
        )
        assert isinstance(restored_dict, dict)

        # Rebuild typed blob payloads, dropping any entry whose shape is off
        # (non-dict item, non-str ID, or non-bytes content).
        blobs_list: list[BlobPayload] = []
        for item in restored_dict.get("blobs") or []:
            if not isinstance(item, dict):
                continue
            oid = item.get("object_id", "")
            content = item.get("content", b"")
            if isinstance(oid, str) and isinstance(content, (bytes, bytearray)):
                blobs_list.append(BlobPayload(object_id=oid, content=bytes(content)))

        hydrated: MPack = {
            "commits": [c for c in (restored_dict.get("commits") or []) if isinstance(c, dict)],
            "snapshots": [s for s in (restored_dict.get("snapshots") or []) if isinstance(s, dict)],
            "blobs": blobs_list,
        }
        dst = _fresh_repo(tmp_path / "dst")
        result = apply_mpack(dst, hydrated)

        # Every source object must be present on the destination.
        missing = [oid for oid in blobs.values() if not has_object(dst, oid)]
        assert not missing, (
            f"{len(missing)}/{N_OBJECTS} objects missing after round-trip: "
            f"{missing[:3]}"
        )
        assert result["commits_written"] == N_COMMITS
        assert result["blobs_written"] == N_OBJECTS

    def test_roundtrip_msgpack_size_within_max_pack_bytes(
        self, tmp_path: pathlib.Path
    ) -> None:
        """The serialised mpack for 1 000 × 4 KiB objects must be < MAX_PACK_MSGPACK_BYTES."""
        src = _make_repo(tmp_path)
        N = 1_000
        tip, _ = _populate(src, n_commits=50, n_unique_objects=N, blob_size=4096)
        mpack = build_mpack(src, [tip])
        raw = msgpack.packb(mpack, use_bin_type=True)

        limit = MAX_PACK_MSGPACK_BYTES
        assert len(raw) < limit, (
            f"MPack for {N} × 4 KiB objects is {len(raw):,} bytes — "
            f"exceeds MAX_PACK_MSGPACK_BYTES ({limit:,} bytes / "
            f"{limit // 1024 // 1024} MiB)."
        )
File History 5 commits
sha256:81ae324db5ad375fbfe4834c6fcb378312cafad3cc92dec5d3e5c427306621a2 fix: remove commit_exists filter from have anchors — server… Sonnet 4.6 patch 21 days ago
sha256:36c3cb3e76619d4c30a6d9bf81b5ec4ff148e30dcfed913e3114ca7b43b81c7e fix: rename objects→blobs in push client and all stale test… Sonnet 4.6 patch 22 days ago
sha256:0313c134f0ef4518a9c3a0ec359ffdc42546dc720010730374edfe0857caf7ef rename: delta_add → delta_upsert across wire format, source… Sonnet 4.6 minor 22 days ago
sha256:c06a9b9b9fee26c68ea725b44d54b2c0a171301ce9de746d5b656617b4463a9a fix: repair four test failures from post-migration audit Sonnet 4.6 patch 28 days ago
sha256:1900655993c83c4107067375548a7be823e471d2515830842f1a12cba4bd3cdf fix: unified object store migration — idempotent writes, JS… Sonnet 4.6 minor 29 days ago