"""TDD — mpack byte-range index: byte_offset + byte_length stored on push, used for O(object_size) range GET reads instead of full-mpack download. Coverage matrix --------------- BR-1 byte_offset and byte_length columns exist on musehub_mpack_index. BR-2 After push unpack, each mpack_index row has non-NULL byte_offset and byte_length that point to valid bytes in the stored mpack. BR-3 Slicing mpack_bytes[byte_offset : byte_offset + byte_length] recovers the (possibly compressed) object content stored for that oid. BR-4 decompressing the slice (if encoding=zstd) returns the raw object bytes. BR-5 BlobBackend.get_range() issues a Range: bytes= GET and returns the slice. BR-6 read_object_bytes() for a mpack:// URI uses get_range (not get_mpack) when byte_offset is present. BR-7 read_object_bytes() falls back to full mpack download when byte_offset is NULL (rows inserted before the migration). """ from __future__ import annotations import hashlib import struct from typing import Any from unittest.mock import AsyncMock, MagicMock, patch import msgpack import pytest from sqlalchemy import inspect, select, text from sqlalchemy.ext.asyncio import AsyncSession from muse.core.mpack import build_wire_mpack from muse.core.types import blob_id from musehub.db.musehub_repo_models import MusehubMPackIndex, MusehubObject # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- _PACK_MAGIC = b"MUSE" _OID_BYTES = 71 # len("sha256:") + 64 hex chars _PACK_HEADER = 13 # magic(4) + version(1) + count(8) _PER_OBJ_HEADER = 79 # OID(71) + length(8) _WIRE_SEC_BLOBS = 1 # section type for OBJECTS in wire mpack def _make_raw_object(content: bytes) -> tuple[str, bytes]: """Return (sha256:hex, content) for a blob.""" oid = "sha256:" + hashlib.sha256(content).hexdigest() return oid, content def _build_test_mpack(blobs: list[tuple[str, bytes]]) -> tuple[bytes, str]: """Build a wire mpack containing *blobs*, return (wire_bytes, mpack_key).""" wire_bytes = build_wire_mpack({ "commits": [], "snapshots": [], "blobs": [{"object_id": oid, "content": data} for oid, data in blobs], "tags": [], }) return wire_bytes, blob_id(wire_bytes) def _parse_objects_section_offset(wire_bytes: bytes) -> int: """Return the byte offset of the OBJECTS section within *wire_bytes*.""" section_count = wire_bytes[5] cursor = 6 for _ in range(section_count): sec_type = wire_bytes[cursor] sec_offset, _sec_length = struct.unpack_from(" "dict[str, tuple[int, int]]": """Return {oid: (absolute_byte_offset, compressed_byte_length)} by walking the OBJECTS section of *wire_bytes*. This is the reference implementation used to verify the server's output. """ objects_start = _parse_objects_section_offset(wire_bytes) cursor = objects_start + _PACK_HEADER # skip pack magic+version+count result: dict[str, tuple[int, int]] = {} for _ in range(len(oids)): oid_bytes = wire_bytes[cursor : cursor + _OID_BYTES] oid = oid_bytes.decode() cursor += _OID_BYTES (length,) = struct.unpack_from(" None: """BR-1: musehub_mpack_index has byte_offset and byte_length columns.""" from sqlalchemy import inspect as sa_inspect from sqlalchemy.engine import create_engine import os # Check via SQLAlchemy model inspection mapper = MusehubMPackIndex.__mapper__ column_names = {col.key for col in mapper.columns} assert "byte_offset" in column_names, ( "musehub_mpack_index is missing 'byte_offset' column — " "run the migration to add it" ) assert "byte_length" in column_names, ( "musehub_mpack_index is missing 'byte_length' column — " "run the migration to add it" ) # --------------------------------------------------------------------------- # BR-2 Push unpack stores non-NULL byte_offset / byte_length # --------------------------------------------------------------------------- @pytest.mark.tier2 async def test_BR2_push_unpack_stores_byte_offsets( db_session: AsyncSession, ) -> None: """BR-2: After push unpack, mpack_index rows have non-NULL byte_offset.""" from musehub.services.musehub_wire_push import record_mpack_bytes_uploaded oid1, data1 = _make_raw_object(b"hello world from BR2 test") oid2, data2 = _make_raw_object(b"second object in BR2 pack") wire_bytes, mpack_key = _build_test_mpack([(oid1, data1), (oid2, data2)]) # Simulate push unpack storing byte offsets from musehub.services.musehub_wire_push import compute_object_byte_offsets offsets = compute_object_byte_offsets(wire_bytes) # Verify offsets map correctly assert oid1 in offsets, f"oid1 not in offsets: {list(offsets)[:3]}" assert oid2 in offsets, f"oid2 not in offsets: {list(offsets)[:3]}" off1, len1 = offsets[oid1] off2, len2 = offsets[oid2] assert off1 > 0, "byte_offset must be positive" assert len1 == len(data1), f"byte_length {len1} != content length {len(data1)}" assert off2 > off1, "oid2 must follow oid1" assert len2 == len(data2) # --------------------------------------------------------------------------- # BR-3 Slicing mpack_bytes at offset recovers compressed content # --------------------------------------------------------------------------- def test_BR3_slice_recovers_object_content() -> None: """BR-3: wire_bytes[offset:offset+length] is the exact stored content bytes.""" from musehub.services.musehub_wire_push import compute_object_byte_offsets oid1, data1 = _make_raw_object(b"alpha payload for BR3") oid2, data2 = _make_raw_object(b"beta payload for BR3 test case") wire_bytes, _ = _build_test_mpack([(oid1, data1), (oid2, data2)]) offsets = compute_object_byte_offsets(wire_bytes) for oid, expected_data in [(oid1, data1), (oid2, data2)]: off, length = offsets[oid] sliced = wire_bytes[off : off + length] assert sliced == expected_data, ( f"Slice at offset={off} length={length} for {oid[:20]} " f"does not match expected content" ) # --------------------------------------------------------------------------- # BR-4 zstd-compressed content: slice + decompress = raw bytes # --------------------------------------------------------------------------- def test_BR4_slice_is_raw_bytes_even_when_input_was_zstd() -> None: """BR-4: build_wire_mpack decompresses zstd before packing (see mpack.py fix), so the slice at the computed offset is always raw bytes — no decompression needed. """ import zstandard as zstd from musehub.services.musehub_wire_push import compute_object_byte_offsets raw = b"uncompressed payload for BR4 " * 20 cctx = zstd.ZstdCompressor(level=3) compressed = cctx.compress(raw) oid = "sha256:" + hashlib.sha256(raw).hexdigest() # Build mpack with zstd-encoded content — build_wire_mpack decompresses it wire_bytes = build_wire_mpack({ "commits": [], "snapshots": [], "tags": [], "blobs": [{"object_id": oid, "content": compressed, "encoding": "zstd"}], }) offsets = compute_object_byte_offsets(wire_bytes) off, length = offsets[oid] sliced = wire_bytes[off : off + length] # Slice is the raw bytes (build_wire_mpack already decompressed) assert sliced == raw, "Slice must be raw bytes since build_wire_mpack decompresses zstd" assert length == len(raw), f"byte_length {length} must match raw size {len(raw)}" # --------------------------------------------------------------------------- # BR-5 BlobBackend.get_range() issues Range header # --------------------------------------------------------------------------- async def test_BR5_get_range_issues_range_header() -> None: """BR-5: BlobBackend.get_range() calls S3 get_object with a Range header.""" from musehub.storage.backends import BlobBackend backend = BlobBackend.__new__(BlobBackend) backend._bucket = "test-bucket" backend._region = "us-east-1" backend._endpoint_url = None backend._public_endpoint_url = None backend._cdn_base_url = None backend._access_key_id = None backend._secret_access_key = None backend._client = None mpack_key = "sha256:" + "a" * 64 byte_offset = 1234 byte_length = 56 mock_client = MagicMock() mock_body = MagicMock() mock_body.read.return_value = b"x" * byte_length mock_client.get_object.return_value = {"Body": mock_body} backend._client = mock_client result = await backend.get_range(mpack_key, byte_offset, byte_length) expected_key = f"mpacks/{mpack_key}" expected_range = f"bytes={byte_offset}-{byte_offset + byte_length - 1}" mock_client.get_object.assert_called_once_with( Bucket="test-bucket", Key=expected_key, Range=expected_range, ) assert result == b"x" * byte_length # --------------------------------------------------------------------------- # BR-6 read_object_bytes uses get_range when byte_offset is present # --------------------------------------------------------------------------- async def test_BR6_read_object_bytes_uses_range_get_when_offset_present() -> None: """BR-6: read_object_bytes issues a range GET when byte_offset is set.""" from musehub.storage.backends import read_object_bytes raw_content = b"range get content for BR6" oid = "sha256:" + hashlib.sha256(raw_content).hexdigest() mpack_key = "sha256:" + "b" * 64 byte_offset = 512 byte_length = len(raw_content) obj = MagicMock() obj.object_id = oid obj.content_cache = None obj.storage_uri = f"mpack://{mpack_key}" obj.byte_offset = byte_offset obj.byte_length = byte_length mock_backend = AsyncMock() mock_backend.get_range = AsyncMock(return_value=raw_content) mock_backend.get_mpack = AsyncMock(return_value=None) # must NOT be called with patch("musehub.storage.backends.get_backend", return_value=mock_backend): result = await read_object_bytes(obj) mock_backend.get_range.assert_called_once_with(mpack_key, byte_offset, byte_length) mock_backend.get_mpack.assert_not_called() assert result == raw_content # --------------------------------------------------------------------------- # BR-7 read_object_bytes falls back to full mpack when byte_offset is NULL # --------------------------------------------------------------------------- async def test_BR7_read_object_bytes_falls_back_when_offset_is_null() -> None: """BR-7: read_object_bytes falls back to full mpack when byte_offset=None.""" from musehub.storage.backends import read_object_bytes from muse.core.mpack import build_wire_mpack as _bwm raw_content = b"fallback content for BR7" oid = "sha256:" + hashlib.sha256(raw_content).hexdigest() mpack_key = "sha256:" + "c" * 64 full_mpack = _bwm({ "commits": [], "snapshots": [], "tags": [], "blobs": [{"object_id": oid, "content": raw_content}], }) obj = MagicMock() obj.object_id = oid obj.content_cache = None obj.storage_uri = f"mpack://{mpack_key}" obj.byte_offset = None # ← NULL: must fall back to full download obj.byte_length = None mock_backend = AsyncMock() mock_backend.get_mpack = AsyncMock(return_value=full_mpack) mock_backend.get_range = AsyncMock(return_value=None) # must NOT be called with patch("musehub.storage.backends.get_backend", return_value=mock_backend): result = await read_object_bytes(obj) mock_backend.get_mpack.assert_called_once_with(mpack_key) mock_backend.get_range.assert_not_called() assert result == raw_content