gabriel / muse public
test_mpack_perf.py python
831 lines 30.8 KB
Raw
sha256:b561b3dbe85d8f76917476655c0b39e77f247f795a9eb924ff614cdb550e505e refactor(store): Phase 1 — extract raw I/O primitives into … Sonnet 4.6 minor ⚠ breaking 29 days ago
1 """Phase 3.4 — MPack build and apply at scale.
2
3 Target metrics (measured on a 2024 MacBook Pro M4, macOS 15):
4
5 build_mpack (10 000 objects × 4 KiB): < 60 s [@slow]
6 apply_mpack (10 000 objects × 4 KiB): < 60 s [@slow]
7 verify-pack (10 000 objects × 4 KiB): < 120 s [@slow] (in practice much faster)
8 collect_object_ids (500 objects): < 1 s (no blob reads)
9
10 Edges verified beyond the plan:
11
12 a. ``build_mpack`` loads ALL blob bytes simultaneously — peak RSS ≈ 2× blob total.
13 b. ``have=`` filter correctly reduces both commit count and blob payload.
14 c. ``MAX_PACK_OBJECTS`` applies to total_items (commits + snapshots + objects),
15 not per-type — an mpack within per-type limits can still be rejected.
16 d. Oversized object (> MAX_OBJECT_WRITE_BYTES) is silently skipped, not raised —
17 caller sees objects_skipped++ but no error; documented behaviour.
18 e. ``verify-pack --stat`` is purely structural — no SHA-256, near-instant.
19 f. Duplicate OID dedup in ``apply_mpack`` — each OID written at most once.
20 g. Round-trip integrity: build → msgpack-serialize → apply → all objects present.
21 """
22
from __future__ import annotations

import datetime
import pathlib
import sys
import tempfile
import time
import tracemalloc
from collections.abc import Iterator
from unittest.mock import patch

import msgpack
import pytest

from muse.core.ids import hash_commit as compute_commit_id
from muse.core.ids import hash_snapshot as compute_snapshot_id
from muse.core.io import MAX_PACK_MSGPACK_BYTES
from muse.core.mpack import (
    MAX_OBJECT_WRITE_BYTES,
    MAX_PACK_OBJECTS,
    ObjectPayload,
    MPack,
    apply_mpack,
    build_mpack,
    collect_object_ids,
)
from muse.core.object_store import write_object
from muse.core.paths import config_toml_path, muse_dir
from muse.core.store import (
    CommitRecord,
    SnapshotRecord,
    write_branch_ref,
    write_commit,
    write_snapshot,
)
from muse.core.types import Manifest, blob_id
58
59 # ---------------------------------------------------------------------------
60 # Helpers
61 # ---------------------------------------------------------------------------
62
63
64
def _make_repo(tmp: pathlib.Path) -> pathlib.Path:
    """Create a minimal source-repository skeleton under *tmp* and return *tmp*.

    Inside ``muse_dir(tmp)`` this writes ``repo.json``, the ``commits``,
    ``snapshots`` and ``objects`` stores, ``refs/heads``, a ``HEAD`` that
    points at ``refs/heads/main`` and an empty ``config.toml``.
    """
    tmp.mkdir(parents=True, exist_ok=True)
    dot_muse = muse_dir(tmp)
    dot_muse.mkdir()
    (dot_muse / "repo.json").write_text('{"repo_id":"bench","owner":"bench"}')
    for store_name in ("commits", "snapshots", "objects"):
        (dot_muse / store_name).mkdir()
    heads_dir = dot_muse / "refs" / "heads"
    heads_dir.mkdir(parents=True)
    (dot_muse / "HEAD").write_text("ref: refs/heads/main\n")
    (dot_muse / "config.toml").write_text("")
    return tmp
76
77
def _fresh_repo(tmp: pathlib.Path) -> pathlib.Path:
    """Create a bare destination repository under *tmp* and return *tmp*.

    Only ``repo.json`` and the ``commits``/``snapshots``/``objects`` stores
    are created. No refs, ``HEAD`` or ``config.toml`` are written, so the
    result serves as an empty receiver for ``apply_mpack``.
    """
    tmp.mkdir(parents=True, exist_ok=True)
    target = muse_dir(tmp)
    target.mkdir()
    (target / "repo.json").write_text('{"repo_id":"dst"}')
    for sub in ("commits", "snapshots", "objects"):
        (target / sub).mkdir()
    return tmp
86
87
def _populate(
    repo: pathlib.Path,
    n_commits: int = 10,
    n_unique_objects: int = 10,
    blob_size: int = 4096,
    branch: str = "main",
    start: int = 0,
) -> tuple[str, dict[str, str]]:
    """Write *n_unique_objects* blobs, one snapshot and a linear *n_commits* chain.

    Blob ``i`` contains ``f"obj-{i + start:08d}-"`` followed by *blob_size*
    ``x`` bytes. Calls whose ``start`` ranges do not overlap therefore
    produce disjoint object IDs.

    Every commit references the same snapshot. The first commit has no
    parent, so each call starts a new root chain. Commit messages are
    ``f"c{start + i:07d}"``. Finally the ref for *branch* is set to the
    chain tip.

    Returns ``(tip_commit_id, {path: oid})``, where the dict is the snapshot
    manifest.

    Raises:
        ValueError: if *n_commits* < 1. There would be no tip commit, and the
            branch ref would be written as an empty string.
    """
    if n_commits < 1:
        raise ValueError(
            f"_populate needs at least one commit, got n_commits={n_commits}"
        )

    blobs: Manifest = {}
    for i in range(n_unique_objects):
        data = f"obj-{i + start:08d}-".encode() + b"x" * blob_size
        oid = blob_id(data)
        write_object(repo, oid, data)
        blobs[f"file_{i:04d}.py"] = oid

    sid = compute_snapshot_id(blobs)
    write_snapshot(repo, SnapshotRecord(snapshot_id=sid, manifest=blobs))

    # Every commit shares one fixed timestamp (loop-invariant, so it is built
    # once). Commit IDs still differ because message and parent differ.
    ts = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc)
    committed_at_iso = ts.isoformat()

    parent: str | None = None
    tip = ""
    for i in range(n_commits):
        msg = f"c{start + i:07d}"
        cid = compute_commit_id(
            parent_ids=[parent] if parent else [],
            snapshot_id=sid,
            message=msg,
            committed_at_iso=committed_at_iso,
            author="bench",
        )
        rec = CommitRecord(
            commit_id=cid,
            branch=branch,
            snapshot_id=sid,
            message=msg,
            committed_at=ts,
            parent_commit_id=parent,
            parent2_commit_id=None,
            author="bench",
            metadata={},
            structured_delta=None,
            sem_ver_bump="none",
            breaking_changes=[],
            agent_id="",
            model_id="",
            toolchain_id="",
            prompt_hash="",
            signature="",
            signer_key_id="",
        )
        write_commit(repo, rec)
        parent = cid
        tip = cid

    write_branch_ref(repo, branch, tip)
    return tip, blobs
148
149
150 # ---------------------------------------------------------------------------
151 # Phase 3.4.1 — build_mpack throughput
152 # ---------------------------------------------------------------------------
153
154
class TestBuildMPackThroughput:
    """build_mpack must sustain ≥ 2 000 objects/sec end to end.

    build_mpack's hot path is ``read_object`` for each unique blob. At the
    Phase 3.1 floor of 2 000 objects/sec, 100 000 objects take ~50 s, within
    the 60 s target. The fast test covers 1 000 objects and asserts the rate
    directly. The slow test covers 10 000 objects and checks on-disk timing.
    """

    _MIN_OBJECTS_PER_SEC = 2_000

    def test_build_mpack_1k_objects_rate(self, tmp_path: pathlib.Path) -> None:
        """build_mpack on 1 000 objects must achieve ≥ 2 000 objects/sec."""
        repo = _make_repo(tmp_path)
        N = 1_000
        tip, _ = _populate(repo, n_commits=50, n_unique_objects=N)

        t0 = time.perf_counter()
        mpack = build_mpack(repo, [tip])
        elapsed = time.perf_counter() - t0

        assert len(mpack["objects"]) == N, (
            f"Expected {N} objects in mpack, got {len(mpack['objects'])}"
        )
        rate = N / elapsed
        assert rate >= self._MIN_OBJECTS_PER_SEC, (
            f"build_mpack throughput {rate:.0f} objects/sec < {self._MIN_OBJECTS_PER_SEC} minimum. "
            f"({N} objects took {elapsed:.2f}s.)"
        )

    def test_build_mpack_have_filter_excludes_base(
        self, tmp_path: pathlib.Path
    ) -> None:
        """build_mpack with have=[base_tip] sends only delta commits and delta objects.

        History shape: two independent root chains are joined by a merge
        commit whose snapshot is the delta snapshot.
          - base: 50 commits, 100 objects
          - delta: 20 commits, 50 objects

        A receiver that already has ``base_tip`` must receive fewer commits,
        fewer objects, and none of the base-only objects.
        """
        repo = _make_repo(tmp_path)
        base_tip, base_blobs = _populate(repo, n_commits=50, n_unique_objects=100, start=0)

        # A second, independent root chain with fresh objects. start=1000
        # makes every blob's content (and so its OID) differ from the base.
        delta_tip, delta_blobs = _populate(
            repo, n_commits=20, n_unique_objects=50, start=1000
        )
        # Join both chains: parent1 = delta_tip, parent2 = base_tip.
        ts = datetime.datetime(2026, 1, 2, tzinfo=datetime.timezone.utc)
        sid = compute_snapshot_id(delta_blobs)
        chained_cid = compute_commit_id(
            parent_ids=[delta_tip, base_tip],
            snapshot_id=sid,
            message="merge",
            committed_at_iso=ts.isoformat(),
            author="bench",
        )
        chained = CommitRecord(
            commit_id=chained_cid,
            branch="main",
            snapshot_id=sid,
            message="merge",
            committed_at=ts,
            parent_commit_id=delta_tip,
            parent2_commit_id=base_tip,
            author="bench",
            metadata={},
            structured_delta=None,
            sem_ver_bump="none",
            breaking_changes=[],
            agent_id="",
            model_id="",
            toolchain_id="",
            prompt_hash="",
            signature="",
            signer_key_id="",
        )
        write_commit(repo, chained)
        write_branch_ref(repo, "main", chained_cid)

        # Full mpack (no have).
        full_mpack = build_mpack(repo, [chained_cid])
        # Delta mpack: receiver already has base history.
        delta_mpack = build_mpack(repo, [chained_cid], have=[base_tip])

        assert len(delta_mpack["commits"]) < len(full_mpack["commits"]), (
            "have= filter must reduce commit count"
        )
        full_oids = {o["object_id"] for o in full_mpack["objects"]}
        delta_oids = {o["object_id"] for o in delta_mpack["objects"]}
        assert delta_oids.issubset(full_oids), "delta mpack must be a subset of full mpack"
        assert len(delta_oids) < len(full_oids), "have= filter must reduce blob payload"
        # Base blobs are referenced only by the base snapshot, which is
        # reachable only through history the receiver already has.
        leaked = delta_oids & set(base_blobs.values())
        assert not leaked, (
            f"have= filter must exclude base-only objects; {len(leaked)} leaked"
        )

    @pytest.mark.slow
    def test_build_mpack_10k_objects_under_60s(
        self, tmp_path: pathlib.Path
    ) -> None:
        """build_mpack on 10 000 objects must complete in < 60 s.

        At the same rate this extrapolates to 100 000 objects in
        100k / 2000 ≈ 50 s, within the plan target of 60 s.
        """
        repo = _make_repo(tmp_path)
        N = 10_000
        tip, _ = _populate(repo, n_commits=100, n_unique_objects=N)

        t0 = time.perf_counter()
        mpack = build_mpack(repo, [tip])
        elapsed = time.perf_counter() - t0

        assert len(mpack["objects"]) == N
        assert elapsed < 60.0, (
            f"build_mpack({N} objects) took {elapsed:.1f}s — target < 60 s. "
            f"Rate: {N/elapsed:.0f} objects/sec."
        )
264
265
class TestCollectObjectIdsThroughput:
    """collect_object_ids must be significantly faster than build_mpack.

    It performs the same BFS + manifest traversal but skips reading blob bytes.
    The result feeds client-side deduplication so only missing objects are sent.
    """

    def test_collect_object_ids_no_blob_reads(
        self, tmp_path: pathlib.Path
    ) -> None:
        """collect_object_ids on 500 objects must be < 1 s (no blob reads)."""
        repo = _make_repo(tmp_path)
        N = 500
        tip, _ = _populate(repo, n_commits=50, n_unique_objects=N)

        t0 = time.perf_counter()
        oids = collect_object_ids(repo, [tip])
        elapsed = time.perf_counter() - t0

        assert len(oids) == N, f"Expected {N} OIDs, got {len(oids)}"
        assert elapsed < 1.0, (
            f"collect_object_ids({N}) took {elapsed:.3f}s — expected < 1 s. "
            "It must not read blob bytes; only BFS + manifest traversal."
        )

    def test_collect_object_ids_faster_than_build_mpack(
        self, tmp_path: pathlib.Path
    ) -> None:
        """collect_object_ids must be faster than build_mpack for the same input."""
        repo = _make_repo(tmp_path)
        N = 200
        tip, _ = _populate(repo, n_commits=20, n_unique_objects=N)

        t0 = time.perf_counter()
        oids = collect_object_ids(repo, [tip])
        t_collect = time.perf_counter() - t0

        t0 = time.perf_counter()
        mpack = build_mpack(repo, [tip])
        t_build = time.perf_counter() - t0

        assert len(oids) == len(mpack["objects"]) == N
        assert t_collect <= t_build, (
            f"collect_object_ids ({t_collect:.3f}s) must be ≤ build_mpack ({t_build:.3f}s) — "
            "collect skips blob reads, build reads every byte."
        )

    def test_collect_object_ids_have_excludes_ancestors(
        self, tmp_path: pathlib.Path
    ) -> None:
        """collect_object_ids with have= drops object IDs reachable from the have tips.

        ``_populate`` creates two independent root chains, so both tips are
        requested. Only ``have=[base_tip]`` can then remove the base objects.
        Requesting ``[delta_tip]`` alone would make the have-filter a no-op,
        and the test would pass without exercising it.
        """
        repo = _make_repo(tmp_path)
        base_tip, base_blobs = _populate(repo, n_commits=10, n_unique_objects=50, start=0)
        delta_tip, delta_blobs = _populate(repo, n_commits=5, n_unique_objects=30, start=100)
        base_set = set(base_blobs.values())
        delta_set = set(delta_blobs.values())

        all_oids = collect_object_ids(repo, [delta_tip, base_tip])
        delta_oids = collect_object_ids(repo, [delta_tip, base_tip], have=[base_tip])

        # Without have=, objects from both chains are reported.
        assert set(all_oids) == base_set | delta_set
        # With have=[base_tip], exactly the 30 delta-only objects remain.
        assert set(delta_oids) == delta_set, (
            f"have= must drop base objects; "
            f"{len(set(delta_oids) & base_set)} base OIDs leaked"
        )
        assert len(delta_oids) == len(delta_set), (
            f"Expected {len(delta_set)} delta OIDs, got {len(delta_oids)}"
        )
329
330
331 # ---------------------------------------------------------------------------
332 # Phase 3.4.2 — apply_mpack throughput
333 # ---------------------------------------------------------------------------
334
335
class TestApplyMPackThroughput:
    """apply_mpack must sustain ≥ 1 500 objects/sec in the object-write loop.

    fsync is mocked, so the test measures the mpack unpacking + hash-verify +
    mkstemp + fchmod + os.replace pipeline without OS I/O latency. Durability
    ordering is verified by test_integrity_I2_fsync.py.
    """

    _MIN_OBJECTS_PER_SEC: int = 1_500

    @pytest.fixture(autouse=True)
    def no_fsync(self) -> Iterator[None]:
        """Patch out fsync-related calls for every test in this class.

        Three calls are patched:
          - ``muse.core.object_store._fsync_fd``
          - ``muse.core.store.os.fsync``
          - ``muse.core.store.fcntl.fcntl``

        This is a yield fixture (a generator), so the return annotation is
        ``Iterator[None]`` rather than ``None``. The patches stay active until
        the test finishes.
        """
        with patch("muse.core.object_store._fsync_fd", return_value=None), \
                patch("muse.core.store.os.fsync", return_value=None), \
                patch("muse.core.store.fcntl.fcntl", return_value=0):
            yield

    @pytest.mark.perf
    def test_apply_mpack_1k_objects_rate(self, tmp_path: pathlib.Path) -> None:
        """apply_mpack of a 1 000-object mpack must achieve ≥ _MIN_OBJECTS_PER_SEC."""
        src = _make_repo(tmp_path / "src")
        N = 1_000
        tip, _ = _populate(src, n_commits=50, n_unique_objects=N)
        mpack = build_mpack(src, [tip])
        assert len(mpack["objects"]) == N

        dst = _fresh_repo(tmp_path / "dst")

        t0 = time.perf_counter()
        result = apply_mpack(dst, mpack)
        elapsed = time.perf_counter() - t0

        assert result["objects_written"] == N
        rate = N / elapsed
        assert rate >= self._MIN_OBJECTS_PER_SEC, (
            f"apply_mpack throughput {rate:.0f} objects/sec < {self._MIN_OBJECTS_PER_SEC} minimum. "
            f"({N} objects took {elapsed:.2f}s.)"
        )

    def test_apply_mpack_idempotent_second_apply_skips_all(
        self, tmp_path: pathlib.Path
    ) -> None:
        """Applying the same mpack twice: second apply must skip every object."""
        src = _make_repo(tmp_path / "src")
        tip, _ = _populate(src, n_commits=20, n_unique_objects=100)
        mpack = build_mpack(src, [tip])

        dst = _fresh_repo(tmp_path / "dst")
        r1 = apply_mpack(dst, mpack)
        r2 = apply_mpack(dst, mpack)

        assert r1["objects_written"] == 100
        assert r2["objects_written"] == 0
        assert r2["objects_skipped"] == 100, (
            f"Expected 100 skipped on second apply, got {r2['objects_skipped']}"
        )

    @pytest.mark.slow
    def test_apply_mpack_10k_objects_under_60s(
        self, tmp_path: pathlib.Path
    ) -> None:
        """apply_mpack of a 10 000-object mpack must complete in < 60 s."""
        src = _make_repo(tmp_path / "src")
        N = 10_000
        tip, _ = _populate(src, n_commits=100, n_unique_objects=N)
        mpack = build_mpack(src, [tip])

        dst = _fresh_repo(tmp_path / "dst")

        t0 = time.perf_counter()
        result = apply_mpack(dst, mpack)
        elapsed = time.perf_counter() - t0

        assert result["objects_written"] == N
        assert elapsed < 60.0, (
            f"apply_mpack({N} objects) took {elapsed:.1f}s — target < 60 s. "
            f"Rate: {N/elapsed:.0f} objects/sec."
        )
415
416
417 # ---------------------------------------------------------------------------
418 # Phase 3.4.3 — verify-pack
419 # ---------------------------------------------------------------------------
420
421
class TestVerifyPackIntegrity:
    """verify-pack: counts in --stat mode, hash-mismatch detection, 10k-object timing.

    Every test msgpack-encodes an mpack into a file and runs the
    ``verify-pack`` CLI command on it with ``--no-local --json``.
    ``MUSE_REPO_ROOT`` points at the repository that built the mpack.
    """

    @staticmethod
    def _dump_pack(repo: pathlib.Path, pack_path: pathlib.Path, pack: MPack) -> None:
        """Write *pack* msgpack-encoded to *pack_path*, then blank *repo*'s config.toml."""
        pack_path.write_bytes(msgpack.packb(pack, use_bin_type=True))
        config_toml_path(repo).write_text("")

    def test_verify_pack_stat_returns_counts(
        self, tmp_path: pathlib.Path
    ) -> None:
        """verify-pack --stat must count objects/snapshots/commits without hashing."""
        import json
        from tests.cli_test_helper import CliRunner

        repo = _make_repo(tmp_path)
        tip, _ = _populate(repo, n_commits=20, n_unique_objects=50)
        pack_path = tmp_path / "pack.muse"
        self._dump_pack(repo, pack_path, build_mpack(repo, [tip]))

        argv = ["verify-pack", "--stat", "--no-local", "--json", "--file", str(pack_path)]
        outcome = CliRunner().invoke(None, argv, env={"MUSE_REPO_ROOT": str(repo)})

        assert outcome.exit_code == 0, f"verify-pack --stat failed: {outcome.output}"
        report = json.loads(outcome.output)
        assert report["objects"] == 50
        assert report["commits"] == 20

    def test_verify_pack_detects_hash_mismatch(
        self, tmp_path: pathlib.Path
    ) -> None:
        """verify-pack must flag an object whose content hash does not match its ID."""
        import json
        from tests.cli_test_helper import CliRunner

        repo = _make_repo(tmp_path)
        tip, _ = _populate(repo, n_commits=5, n_unique_objects=10)
        genuine = build_mpack(repo, [tip])

        # Forge the first object: keep its declared ID, swap in unrelated bytes.
        forged: ObjectPayload = {
            "object_id": genuine["objects"][0]["object_id"],
            "content": b"TAMPERED_CONTENT",
        }
        tampered: MPack = {
            "commits": genuine["commits"],
            "snapshots": genuine["snapshots"],
            "objects": [forged, *genuine["objects"][1:]],
            "summary": genuine.get("summary", {}),
            "meta": genuine.get("meta", {}),
        }
        pack_path = tmp_path / "tampered.muse"
        self._dump_pack(repo, pack_path, tampered)

        argv = ["verify-pack", "--no-local", "--json", "--file", str(pack_path)]
        outcome = CliRunner().invoke(None, argv, env={"MUSE_REPO_ROOT": str(repo)})

        assert outcome.exit_code != 0, "verify-pack must exit non-zero when hash mismatches"
        report = json.loads(outcome.output)
        assert report["all_ok"] is False
        failures = report["failures"]
        assert any("hash mismatch" in entry["error"] for entry in failures), (
            f"Expected 'hash mismatch' in failures: {failures}"
        )

    @pytest.mark.slow
    def test_verify_pack_10k_objects_under_120s(
        self, tmp_path: pathlib.Path
    ) -> None:
        """verify-pack of a 10 000-object mpack must complete in < 120 s.

        10k × 4 KiB ≈ 40 MiB of payload. SHA-256 over that much data should
        take well under a second on modern hardware, so the 120 s ceiling only
        catches pathological I/O or per-object overhead.
        """
        import json
        from tests.cli_test_helper import CliRunner

        repo = _make_repo(tmp_path)
        N = 10_000
        tip, _ = _populate(repo, n_commits=100, n_unique_objects=N)
        pack_path = tmp_path / "pack10k.muse"
        self._dump_pack(repo, pack_path, build_mpack(repo, [tip]))
        argv = ["verify-pack", "--no-local", "--json", "--file", str(pack_path)]

        started = time.perf_counter()
        outcome = CliRunner().invoke(None, argv, env={"MUSE_REPO_ROOT": str(repo)})
        elapsed = time.perf_counter() - started

        assert outcome.exit_code == 0, f"verify-pack failed: {outcome.output[:200]}"
        report = json.loads(outcome.output)
        assert report["all_ok"] is True
        assert report["objects_checked"] == N
        assert elapsed < 120.0, (
            f"verify-pack({N} objects) took {elapsed:.1f}s — target < 120 s."
        )
546
547
548 # ---------------------------------------------------------------------------
549 # Phase 3.4.4 — cap and guard enforcement
550 # ---------------------------------------------------------------------------
551
552
class TestMPackCapEnforcement:
    """MPack-bomb and size-cap guards must fire correctly."""

    def test_apply_mpack_rejects_mpack_exceeding_max_pack_objects(
        self, tmp_path: pathlib.Path
    ) -> None:
        """apply_mpack raises ValueError when total_items > MAX_PACK_OBJECTS.

        MAX_PACK_OBJECTS counts commits + snapshots + objects combined, not
        per-type. An mpack with MAX_PACK_OBJECTS + 1 total items is rejected.
        """
        repo = _fresh_repo(tmp_path)
        oversized: MPack = {
            "commits": [{}] * (MAX_PACK_OBJECTS + 1),
            "snapshots": [],
            "objects": [],
        }
        with pytest.raises(ValueError, match="Pack rejected"):
            apply_mpack(repo, oversized)

    def test_apply_mpack_accepts_mpack_at_exact_cap(
        self, tmp_path: pathlib.Path
    ) -> None:
        """apply_mpack does NOT raise when total_items == MAX_PACK_OBJECTS.

        The items are malformed (empty dicts), so they are skipped as bad
        entries, but the cap check itself must pass.
        """
        repo = _fresh_repo(tmp_path)
        at_cap: MPack = {
            "commits": [{}] * MAX_PACK_OBJECTS,
            "snapshots": [],
            "objects": [],
        }
        # Must not raise ValueError for the cap; malformed entries are skipped.
        result = apply_mpack(repo, at_cap)
        # Each empty-dict commit lacks commit_id and snapshot_id, so every one
        # is skipped by apply_mpack's essential-field guard.
        assert result["commits_written"] == 0, (
            "All malformed empty-dict commits must be skipped, not written"
        )

    def test_apply_mpack_total_items_cap_is_cross_type(
        self, tmp_path: pathlib.Path
    ) -> None:
        """MAX_PACK_OBJECTS applies across commits+snapshots+objects, not per-type.

        MAX_PACK_OBJECTS + 1 items are split across the three lists so that
        each list alone stays within the cap. Only the combined total exceeds
        it. With a cap of 100 000 the split is 20 000 commits + 1 snapshot +
        80 000 objects = 100 001.

        The split is derived from the constant, so the test stays meaningful
        if the cap changes.
        """
        repo = _fresh_repo(tmp_path)
        n_commits = MAX_PACK_OBJECTS // 5
        n_snapshots = 1
        # n_commits + n_snapshots + n_objects == MAX_PACK_OBJECTS + 1,
        # while n_objects itself stays <= MAX_PACK_OBJECTS.
        n_objects = MAX_PACK_OBJECTS - n_commits
        cross_type: MPack = {
            "commits": [{}] * n_commits,
            "snapshots": [{}] * n_snapshots,
            "objects": [{}] * n_objects,
        }
        with pytest.raises(ValueError, match="Pack rejected"):
            apply_mpack(repo, cross_type)

    def test_apply_mpack_oversized_object_is_skipped_not_raised(
        self, tmp_path: pathlib.Path
    ) -> None:
        """An object exceeding MAX_OBJECT_WRITE_BYTES is skipped, not raised.

        The mpack carries two objects, each with its correct ID:
          - a small valid object
          - a genuinely oversized object of MAX_OBJECT_WRITE_BYTES + 1 bytes

        apply_mpack must not raise. It must store the small object and leave
        the oversized one out of the store, so the rest of the mpack is still
        applied.
        """
        from muse.core.object_store import has_object

        repo = _fresh_repo(tmp_path)
        good_data = b"x" * 64
        good_oid = blob_id(good_data)
        huge_data = b"z" * (MAX_OBJECT_WRITE_BYTES + 1)
        huge_oid = blob_id(huge_data)
        mpack: MPack = {
            "commits": [],
            "snapshots": [],
            "objects": [
                ObjectPayload(object_id=good_oid, content=good_data),
                ObjectPayload(object_id=huge_oid, content=huge_data),
            ],
        }
        result = apply_mpack(repo, mpack)
        # Good object written; oversized object skipped.
        assert result["objects_written"] == 1, (
            f"Expected 1 object written (the good one), got {result['objects_written']}"
        )
        assert has_object(repo, good_oid), "The in-limit object must be stored"
        assert not has_object(repo, huge_oid), (
            "Oversized object must be rejected and not written to store"
        )

    def test_apply_mpack_deduplicates_repeated_oid(
        self, tmp_path: pathlib.Path
    ) -> None:
        """apply_mpack writes a repeated OID only once (dedup via seen_object_ids)."""
        repo = _fresh_repo(tmp_path)
        data = b"deduplicate-me" * 100
        oid = blob_id(data)
        REPEAT = 50
        mpack: MPack = {
            "commits": [],
            "snapshots": [],
            "objects": [ObjectPayload(object_id=oid, content=data)] * REPEAT,
        }
        result = apply_mpack(repo, mpack)
        # First occurrence written; the remaining REPEAT - 1 are skipped.
        assert result["objects_written"] == 1, (
            f"Expected 1 write for {REPEAT} identical OIDs, got {result['objects_written']}"
        )
        assert result["objects_skipped"] == REPEAT - 1, (
            f"Expected {REPEAT - 1} skipped, got {result['objects_skipped']}"
        )

    def test_apply_mpack_empty_mpack_is_noop(
        self, tmp_path: pathlib.Path
    ) -> None:
        """apply_mpack on an mpack with no items returns all-zero counts."""
        repo = _fresh_repo(tmp_path)
        empty: MPack = {"commits": [], "snapshots": [], "objects": []}
        result = apply_mpack(repo, empty)
        assert result["commits_written"] == 0
        assert result["snapshots_written"] == 0
        assert result["objects_written"] == 0
        assert result["objects_skipped"] == 0

    def test_have_equals_want_produces_empty_mpack(
        self, tmp_path: pathlib.Path
    ) -> None:
        """build_mpack with have=[tip] where tip is also in want returns empty mpack."""
        repo = _make_repo(tmp_path)
        tip, _ = _populate(repo, n_commits=10, n_unique_objects=20)

        mpack = build_mpack(repo, [tip], have=[tip])

        assert mpack["commits"] == [], (
            "When have contains the want tip, BFS should yield 0 commits"
        )
        assert mpack["objects"] == [], (
            "Empty commit set must produce empty object list"
        )
699
700
701 # ---------------------------------------------------------------------------
702 # Phase 3.4.5 — memory ceiling
703 # ---------------------------------------------------------------------------
704
705
class TestMPackMemoryCeiling:
    """build_mpack / apply_mpack peak traced allocation must scale with blob payload.

    build_mpack holds ALL object bytes in memory simultaneously. This is a
    known architectural property, not a bug. Peaks are measured with
    ``tracemalloc``, which counts Python-level allocations, not process RSS.

    The tests confirm:
      1. build_mpack's peak is ≤ 3× the total blob bytes (not 10× or 100×).
      2. apply_mpack of 500 × 4 KiB objects peaks at ≤ 64 MiB.

    Tracing is stopped in a ``finally``, so a failing call cannot leave
    tracemalloc running for later tests. A tracer that was already running
    before the test is left running.
    """

    def test_build_mpack_peak_rss_proportional_to_blob_total(
        self, tmp_path: pathlib.Path
    ) -> None:
        """build_mpack peak traced allocation is ≤ 3× the total blob payload size."""
        repo = _make_repo(tmp_path)
        N = 500
        BLOB_SZ = 4096  # 4 KiB
        tip, _ = _populate(repo, n_commits=50, n_unique_objects=N, blob_size=BLOB_SZ)
        blob_total_mib = N * BLOB_SZ / (1024 * 1024)

        already_tracing = tracemalloc.is_tracing()
        if not already_tracing:
            tracemalloc.start()
        try:
            # Reset traces (and the recorded peak) so only build_mpack is measured.
            tracemalloc.clear_traces()
            mpack = build_mpack(repo, [tip])
            _, peak_bytes = tracemalloc.get_traced_memory()
        finally:
            if not already_tracing:
                tracemalloc.stop()

        peak_mib = peak_bytes / (1024 * 1024)
        ceiling_mib = blob_total_mib * 3  # generous: blobs + msgpack + overhead
        assert len(mpack["objects"]) == N
        assert peak_mib <= ceiling_mib, (
            f"build_mpack peak {peak_mib:.1f} MiB exceeds 3× blob total "
            f"({ceiling_mib:.1f} MiB for {N} × {BLOB_SZ//1024} KiB objects). "
            "build_mpack must not accumulate more than the object bytes themselves."
        )

    def test_apply_mpack_peak_rss_under_64_mib_for_small_objects(
        self, tmp_path: pathlib.Path
    ) -> None:
        """apply_mpack of 500 × 4 KiB objects peaks at ≤ 64 MiB of traced allocation."""
        src = _make_repo(tmp_path / "src")
        tip, _ = _populate(src, n_commits=50, n_unique_objects=500, blob_size=4096)
        mpack = build_mpack(src, [tip])

        dst = _fresh_repo(tmp_path / "dst")

        already_tracing = tracemalloc.is_tracing()
        if not already_tracing:
            tracemalloc.start()
        try:
            # Reset traces (and the recorded peak) so only apply_mpack is measured.
            tracemalloc.clear_traces()
            apply_mpack(dst, mpack)
            _, peak_bytes = tracemalloc.get_traced_memory()
        finally:
            if not already_tracing:
                tracemalloc.stop()

        peak_mib = peak_bytes / (1024 * 1024)
        assert peak_mib <= 64, (
            f"apply_mpack(500 × 4 KiB) peak {peak_mib:.1f} MiB — expected ≤ 64 MiB."
        )
760
761
762 # ---------------------------------------------------------------------------
763 # Phase 3.4.6 — round-trip integrity
764 # ---------------------------------------------------------------------------
765
766
class TestMPackRoundTrip:
    """End-to-end round-trip: build_mpack → msgpack serialize → apply_mpack → verify."""

    def test_roundtrip_all_objects_restored(
        self, tmp_path: pathlib.Path
    ) -> None:
        """build_mpack → msgpack → apply_mpack: every object present and byte-identical on dst."""
        from muse.core.io import safe_unpackb
        from muse.core.object_store import has_object, read_object

        src = _make_repo(tmp_path / "src")
        N_OBJECTS = 200
        N_COMMITS = 30
        tip, blobs = _populate(src, n_commits=N_COMMITS, n_unique_objects=N_OBJECTS)
        mpack = build_mpack(src, [tip])

        # Serialize (simulates wire transfer).
        raw = msgpack.packb(mpack, use_bin_type=True)

        # Re-hydrate using safe_unpackb (the same path as unpack-objects).
        restored_dict = safe_unpackb(
            raw, context="roundtrip", max_bytes=MAX_PACK_MSGPACK_BYTES, allow_binary=True
        )
        assert isinstance(restored_dict, dict)

        # Keep only well-formed {object_id: str, content: bytes} entries.
        objects: list[ObjectPayload] = []
        for item in restored_dict.get("objects") or []:
            if not isinstance(item, dict):
                continue
            oid = item.get("object_id", "")
            content = item.get("content", b"")
            if isinstance(oid, str) and isinstance(content, (bytes, bytearray)):
                objects.append(ObjectPayload(object_id=oid, content=bytes(content)))

        hydrated: MPack = {
            "commits": [c for c in (restored_dict.get("commits") or []) if isinstance(c, dict)],
            "snapshots": [s for s in (restored_dict.get("snapshots") or []) if isinstance(s, dict)],
            "objects": objects,
        }
        dst = _fresh_repo(tmp_path / "dst")
        result = apply_mpack(dst, hydrated)

        # Every source object must exist on the destination...
        missing = [oid for oid in blobs.values() if not has_object(dst, oid)]
        assert not missing, (
            f"{len(missing)}/{N_OBJECTS} objects missing after round-trip: "
            f"{missing[:3]}"
        )
        # ...and read back exactly as stored in the source repository.
        corrupted = [
            oid for oid in blobs.values() if read_object(dst, oid) != read_object(src, oid)
        ]
        assert not corrupted, (
            f"{len(corrupted)}/{N_OBJECTS} objects differ from the source after "
            f"round-trip: {corrupted[:3]}"
        )
        assert result["commits_written"] == N_COMMITS
        assert result["objects_written"] == N_OBJECTS

    def test_roundtrip_msgpack_size_within_max_pack_bytes(
        self, tmp_path: pathlib.Path
    ) -> None:
        """The serialised mpack for 1 000 × 4 KiB objects must be < MAX_PACK_MSGPACK_BYTES."""
        src = _make_repo(tmp_path)
        N = 1_000
        tip, _ = _populate(src, n_commits=50, n_unique_objects=N, blob_size=4096)
        mpack = build_mpack(src, [tip])
        raw = msgpack.packb(mpack, use_bin_type=True)

        limit = MAX_PACK_MSGPACK_BYTES
        assert len(raw) < limit, (
            f"MPack for {N} × 4 KiB objects is {len(raw):,} bytes — "
            f"exceeds MAX_PACK_MSGPACK_BYTES ({limit:,} bytes / "
            f"{limit // 1024 // 1024} MiB)."
        )
File History 1 commit
sha256:b561b3dbe85d8f76917476655c0b39e77f247f795a9eb924ff614cdb550e505e refactor(store): Phase 1 — extract raw I/O primitives into … Sonnet 4.6 minor 29 days ago