gabriel / musehub public
test_mpack_byte_range.py python
325 lines 12.4 KB
Raw
sha256:94ef169c149a452bff7c604ded8b280b19bd477c2dabcb56972780b0b784c7aa Merge 'fix/assignee-sigil-inline' into 'dev' — proposal: As… Human 1 day ago
1 """TDD — mpack byte-range index: byte_offset + byte_length stored on push,
2 used for O(object_size) range GET reads instead of full-mpack download.
3
4 Coverage matrix
5 ---------------
6 BR-1 byte_offset and byte_length columns exist on musehub_mpack_index.
7 BR-2 After push unpack, each mpack_index row has non-NULL byte_offset
8 and byte_length that point to valid bytes in the stored mpack.
9 BR-3 Slicing mpack_bytes[byte_offset : byte_offset + byte_length] recovers
10 the (possibly compressed) object content stored for that oid.
11 BR-4 build_wire_mpack decompresses zstd-encoded input before packing, so the slice is already the raw object bytes (no decompression needed).
12 BR-5 BlobBackend.get_range() issues a Range: bytes= GET and returns the slice.
13 BR-6 read_object_bytes() for a mpack:// URI uses get_range (not get_mpack)
14 when byte_offset is present.
15 BR-7 read_object_bytes() falls back to full mpack download when byte_offset
16 is NULL (rows inserted before the migration).
17 """
18 from __future__ import annotations
19
20 import hashlib
21 import struct
22 from typing import Any
23 from unittest.mock import AsyncMock, MagicMock, patch
24
25 import msgpack
26 import pytest
27 from sqlalchemy import inspect, select, text
28 from sqlalchemy.ext.asyncio import AsyncSession
29
30 from muse.core.mpack import build_wire_mpack
31 from muse.core.types import blob_id
32 from musehub.db.musehub_repo_models import MusehubMPackIndex, MusehubObject
33
34
35 # ---------------------------------------------------------------------------
36 # Helpers
37 # ---------------------------------------------------------------------------
38
# Byte-layout constants used by the reference parser helpers below.
_PACK_MAGIC = b"MUSE"
_OID_BYTES = len("sha256:") + 64  # 71: "sha256:" prefix + 64 hex chars
_PACK_HEADER = 4 + 1 + 8  # 13: magic(4) + version(1) + count(8)
_PER_OBJ_HEADER = _OID_BYTES + 8  # 79: OID(71) + length(8)
_WIRE_SEC_BLOBS = 1  # section type for OBJECTS in wire mpack
44
45
def _make_raw_object(content: bytes) -> tuple[str, bytes]:
    """Pair *content* with its content-addressed id.

    Returns ``(oid, content)`` where ``oid`` is ``"sha256:"`` followed by the
    hex SHA-256 digest of *content*; the bytes are handed back unchanged.
    """
    digest = hashlib.sha256(content).hexdigest()
    return f"sha256:{digest}", content
50
51
def _build_test_mpack(blobs: list[tuple[str, bytes]]) -> tuple[bytes, str]:
    """Pack *blobs* into a wire mpack that has no commits, snapshots or tags.

    *blobs* is a list of ``(object_id, content)`` pairs; they are handed to
    ``build_wire_mpack`` in the order given.  Returns ``(wire_bytes,
    mpack_key)`` where ``mpack_key`` is ``blob_id(wire_bytes)``.
    """
    blob_entries = []
    for oid, data in blobs:
        blob_entries.append({"object_id": oid, "content": data})
    payload = {
        "commits": [],
        "snapshots": [],
        "blobs": blob_entries,
        "tags": [],
    }
    wire_bytes = build_wire_mpack(payload)
    mpack_key = blob_id(wire_bytes)
    return wire_bytes, mpack_key
61
62
def _parse_objects_section_offset(wire_bytes: bytes) -> int:
    """Find the OBJECTS section by scanning the section table of *wire_bytes*.

    Byte 5 is read as the number of table entries.  Entries start at byte 6
    and are 17 bytes each: a one-byte section type followed by two
    little-endian uint64 values (offset, then length).  The offset of the
    first entry whose type equals ``_WIRE_SEC_BLOBS`` is returned.

    Raises ValueError when no entry has that type.
    """
    table_start = 6
    entry_size = 17  # type(1) + offset(8) + length(8)
    table_end = table_start + wire_bytes[5] * entry_size
    for entry_at in range(table_start, table_end, entry_size):
        kind = wire_bytes[entry_at]
        offset, _length = struct.unpack_from("<QQ", wire_bytes, entry_at + 1)
        if kind == _WIRE_SEC_BLOBS:
            return offset
    raise ValueError("OBJECTS section not found in wire mpack")
74
75
def _expected_content_offsets(
    wire_bytes: bytes,
    oids: list[str],
) -> "dict[str, tuple[int, int]]":
    """Walk the OBJECTS section and map each oid to where its content sits.

    This is the reference implementation used to verify the server's output.
    Starting just past the pack header, ``len(oids)`` records are read back to
    back; each is an ``_OID_BYTES``-byte oid, a little-endian uint64 content
    length, then the content itself.  *oids* supplies only the record count:
    the keys of the result are the oids decoded from *wire_bytes*.

    Returns ``{oid: (absolute_content_offset, content_length)}``.
    """
    offsets: dict[str, tuple[int, int]] = {}
    # Skip pack magic + version + count to reach the first record.
    pos = _parse_objects_section_offset(wire_bytes) + _PACK_HEADER
    for _ in oids:
        length_at = pos + _OID_BYTES
        oid = wire_bytes[pos:length_at].decode()
        (size,) = struct.unpack_from("<Q", wire_bytes, length_at)
        content_at = length_at + 8  # content begins right after the length field
        offsets[oid] = (content_at, size)
        pos = content_at + size
    return offsets
98
99
100 # ---------------------------------------------------------------------------
101 # BR-1 Schema: byte_offset and byte_length columns exist
102 # ---------------------------------------------------------------------------
103
104
def test_BR1_byte_offset_and_byte_length_columns_exist(
    db_session: AsyncSession,
) -> None:
    """BR-1: musehub_mpack_index has byte_offset and byte_length columns.

    The check is made against the SQLAlchemy mapper of ``MusehubMPackIndex``;
    no query is issued.  ``db_session`` is requested but not used in the body.
    """
    # Check via SQLAlchemy model inspection
    column_names = {col.key for col in MusehubMPackIndex.__mapper__.columns}
    assert "byte_offset" in column_names, (
        "musehub_mpack_index is missing 'byte_offset' column — "
        "run the migration to add it"
    )
    assert "byte_length" in column_names, (
        "musehub_mpack_index is missing 'byte_length' column — "
        "run the migration to add it"
    )
124
125
126 # ---------------------------------------------------------------------------
127 # BR-2 Push unpack stores non-NULL byte_offset / byte_length
128 # ---------------------------------------------------------------------------
129
130
@pytest.mark.tier2
async def test_BR2_push_unpack_stores_byte_offsets(
    db_session: AsyncSession,
) -> None:
    """BR-2: After push unpack, mpack_index rows have non-NULL byte_offset.

    Exercised through ``compute_object_byte_offsets`` on a two-blob wire
    mpack: both oids must be present, each length must equal the blob's
    content length, and the second blob must sit after the first.
    """
    # NOTE(review): record_mpack_bytes_uploaded is imported but never called
    # here — presumably an existence check; confirm or remove.
    from musehub.services.musehub_wire_push import (
        compute_object_byte_offsets,
        record_mpack_bytes_uploaded,
    )

    first = _make_raw_object(b"hello world from BR2 test")
    second = _make_raw_object(b"second object in BR2 pack")
    wire_bytes, _mpack_key = _build_test_mpack([first, second])
    oid1, data1 = first
    oid2, data2 = second

    # Simulate push unpack storing byte offsets
    offsets = compute_object_byte_offsets(wire_bytes)

    # Verify offsets map correctly
    assert oid1 in offsets, f"oid1 not in offsets: {list(offsets)[:3]}"
    assert oid2 in offsets, f"oid2 not in offsets: {list(offsets)[:3]}"

    (off1, len1), (off2, len2) = offsets[oid1], offsets[oid2]

    assert off1 > 0, "byte_offset must be positive"
    assert len1 == len(data1), f"byte_length {len1} != content length {len(data1)}"
    assert off2 > off1, "oid2 must follow oid1"
    assert len2 == len(data2)
157
158
159 # ---------------------------------------------------------------------------
160 # BR-3 Slicing mpack_bytes at offset recovers compressed content
161 # ---------------------------------------------------------------------------
162
163
def test_BR3_slice_recovers_object_content() -> None:
    """BR-3: wire_bytes[offset:offset+length] is the exact stored content bytes."""
    from musehub.services.musehub_wire_push import compute_object_byte_offsets

    blobs = [
        _make_raw_object(b"alpha payload for BR3"),
        _make_raw_object(b"beta payload for BR3 test case"),
    ]
    wire_bytes, _ = _build_test_mpack(blobs)
    offsets = compute_object_byte_offsets(wire_bytes)

    # Each computed (offset, length) must slice out exactly what was packed.
    for oid, expected_data in blobs:
        off, length = offsets[oid]
        assert wire_bytes[off : off + length] == expected_data, (
            f"Slice at offset={off} length={length} for {oid[:20]} "
            f"does not match expected content"
        )
181
182
183 # ---------------------------------------------------------------------------
184 # BR-4 zstd-compressed content: slice + decompress = raw bytes
185 # ---------------------------------------------------------------------------
186
187
def test_BR4_slice_is_raw_bytes_even_when_input_was_zstd() -> None:
    """BR-4: a blob supplied zstd-encoded is stored raw in the wire mpack.

    build_wire_mpack decompresses zstd before packing (see mpack.py fix), so
    the slice at the computed offset is always raw bytes — no decompression
    needed.
    """
    import zstandard as zstd
    from musehub.services.musehub_wire_push import compute_object_byte_offsets

    raw = b"uncompressed payload for BR4 " * 20
    # The oid is the hash of the raw bytes, not of the compressed form.
    oid = "sha256:" + hashlib.sha256(raw).hexdigest()
    zstd_blob = {
        "object_id": oid,
        "content": zstd.ZstdCompressor(level=3).compress(raw),
        "encoding": "zstd",
    }

    # Build mpack with zstd-encoded content — build_wire_mpack decompresses it
    wire_bytes = build_wire_mpack({
        "commits": [],
        "snapshots": [],
        "tags": [],
        "blobs": [zstd_blob],
    })

    off, length = compute_object_byte_offsets(wire_bytes)[oid]
    sliced = wire_bytes[off : off + length]

    # Slice is the raw bytes (build_wire_mpack already decompressed)
    assert sliced == raw, "Slice must be raw bytes since build_wire_mpack decompresses zstd"
    assert length == len(raw), f"byte_length {length} must match raw size {len(raw)}"
213
214
215 # ---------------------------------------------------------------------------
216 # BR-5 BlobBackend.get_range() issues Range header
217 # ---------------------------------------------------------------------------
218
219
async def test_BR5_get_range_issues_range_header() -> None:
    """BR-5: BlobBackend.get_range() calls S3 get_object with a Range header.

    The backend is built with ``__new__`` so ``__init__`` never runs; its
    private attributes are set by hand and the client is a ``MagicMock``, so
    no network call is made.
    """
    from musehub.storage.backends import BlobBackend

    mpack_key = "sha256:" + "a" * 64
    byte_offset = 1234
    byte_length = 56

    # Stub client whose get_object() yields a body of exactly byte_length bytes.
    mock_body = MagicMock()
    mock_body.read.return_value = b"x" * byte_length
    mock_client = MagicMock()
    mock_client.get_object.return_value = {"Body": mock_body}

    backend = BlobBackend.__new__(BlobBackend)
    backend._bucket = "test-bucket"
    backend._region = "us-east-1"
    backend._endpoint_url = None
    backend._public_endpoint_url = None
    backend._cdn_base_url = None
    backend._access_key_id = None
    backend._secret_access_key = None
    backend._client = mock_client  # assigned once, straight to the stub

    result = await backend.get_range(mpack_key, byte_offset, byte_length)

    expected_key = f"mpacks/{mpack_key}"
    # HTTP byte ranges are inclusive at both ends, hence the "- 1".
    expected_range = f"bytes={byte_offset}-{byte_offset + byte_length - 1}"
    mock_client.get_object.assert_called_once_with(
        Bucket="test-bucket",
        Key=expected_key,
        Range=expected_range,
    )
    assert result == b"x" * byte_length
254
255
256 # ---------------------------------------------------------------------------
257 # BR-6 read_object_bytes uses get_range when byte_offset is present
258 # ---------------------------------------------------------------------------
259
260
async def test_BR6_read_object_bytes_uses_range_get_when_offset_present() -> None:
    """BR-6: read_object_bytes issues a range GET when byte_offset is set.

    The object row and the storage backend are both mocks; ``get_backend`` is
    patched to return the mock backend for the duration of the call.
    """
    from musehub.storage.backends import read_object_bytes

    raw_content = b"range get content for BR6"
    mpack_key = "sha256:" + "b" * 64
    byte_offset = 512
    byte_length = len(raw_content)

    obj = MagicMock(
        object_id="sha256:" + hashlib.sha256(raw_content).hexdigest(),
        content_cache=None,
        storage_uri=f"mpack://{mpack_key}",
        byte_offset=byte_offset,
        byte_length=byte_length,
    )

    mock_backend = AsyncMock(
        get_range=AsyncMock(return_value=raw_content),
        get_mpack=AsyncMock(return_value=None),  # must NOT be called
    )

    with patch("musehub.storage.backends.get_backend", return_value=mock_backend):
        result = await read_object_bytes(obj)

    mock_backend.get_range.assert_called_once_with(mpack_key, byte_offset, byte_length)
    mock_backend.get_mpack.assert_not_called()
    assert result == raw_content
288
289
290 # ---------------------------------------------------------------------------
291 # BR-7 read_object_bytes falls back to full mpack when byte_offset is NULL
292 # ---------------------------------------------------------------------------
293
294
async def test_BR7_read_object_bytes_falls_back_when_offset_is_null() -> None:
    """BR-7: read_object_bytes falls back to full mpack when byte_offset=None.

    The mock backend serves a real single-blob wire mpack from ``get_mpack``;
    the test requires that the blob's raw content comes back and that
    ``get_range`` is never touched.
    """
    from musehub.storage.backends import read_object_bytes

    raw_content = b"fallback content for BR7"
    oid = "sha256:" + hashlib.sha256(raw_content).hexdigest()
    mpack_key = "sha256:" + "c" * 64

    # Built with the module-level build_wire_mpack import.
    full_mpack = build_wire_mpack({
        "commits": [], "snapshots": [], "tags": [],
        "blobs": [{"object_id": oid, "content": raw_content}],
    })

    obj = MagicMock()
    obj.object_id = oid
    obj.content_cache = None
    obj.storage_uri = f"mpack://{mpack_key}"
    obj.byte_offset = None  # ← NULL: must fall back to full download
    obj.byte_length = None

    mock_backend = AsyncMock()
    mock_backend.get_mpack = AsyncMock(return_value=full_mpack)
    mock_backend.get_range = AsyncMock(return_value=None)  # must NOT be called

    with patch("musehub.storage.backends.get_backend", return_value=mock_backend):
        result = await read_object_bytes(obj)

    mock_backend.get_mpack.assert_called_once_with(mpack_key)
    mock_backend.get_range.assert_not_called()
    assert result == raw_content
File History 2 commits
sha256:94ef169c149a452bff7c604ded8b280b19bd477c2dabcb56972780b0b784c7aa Merge 'fix/assignee-sigil-inline' into 'dev' — proposal: As… Human 1 day ago
sha256:6b1949fc2797ca4c1936a637a4cbfec828ef56cf52398a2e74ca3c4f494e728f fix: use wire_bytes not mpack_bytes_raw in compute_object_b… Sonnet 4.6 patch 10 days ago