compression.py
python
sha256:3ff9c9863a9891bdcde71b4a43228f66d0493e38b7cc1d09fe9eb7de774046b2
feat: add repair-commit wire endpoint (API parity with repa…
Opus 4.8
minor
⚠ breaking
1 day ago
| 1 | """Object decompression and delta reconstruction for wire transfer. |
| 2 | |
| 3 | Server-side counterpart to ``muse.core.compression``. The server only needs |
| 4 | to *decompress* and *apply* — it never compresses or computes deltas. |
| 5 | |
| 6 | Encoding values understood here: |
| 7 | ``"zstd"`` — zstd-compressed; call :func:`decompress_zstd`. |
| 8 | ``"zlib"`` — zlib-compressed; call :func:`decompress_zlib`. |
| 9 | ``"delta+zlib"`` — zlib-compressed delta stream; call :func:`apply_delta`. |
| 10 | """ |
| 11 | |
| 12 | import struct |
| 13 | import zlib |
| 14 | |
# Two-byte zlib stream headers: CMF byte 0x78 (deflate, 32 KiB window) followed
# by the FLG byte zlib emits for each of its four FLEVEL settings. Delta bases in
# R2 may have been stored zlib-compressed (old wire path), zstd-compressed, or as
# plain bytes; decompress_if_needed() sniffs these magics so callers never need
# to know which storage generation produced a given object. zlib streams written
# with a non-default window size (CMF != 0x78) are not recognised by this tuple.
_ZLIB_MAGIC: tuple[bytes, ...] = tuple(
    bytes.fromhex(header) for header in ("7801", "789c", "78da", "785e")
)
# zstd frame magic number 0xFD2FB528, which appears little-endian at the start
# of every zstd frame.
_ZSTD_MAGIC: bytes = bytes.fromhex("28b52ffd")
| 21 | |
def decompress_zstd(data: bytes) -> bytes:
    """Return the decompressed contents of the zstd frame in *data*.

    ``zstandard`` is imported inside the function rather than at module level,
    so importing this module does not itself require it (an ``ImportError``
    surfaces only when this function is called). Errors from the decompressor,
    e.g. on corrupt input, propagate unchanged so the caller can map them to a
    422 response.

    NOTE(review): ``ZstdDecompressor.decompress()`` needs the frame header to
    record the decompressed size (or a ``max_output_size``); frames produced by
    a streaming compressor may lack it and be rejected — confirm the client
    always writes the content size.
    """
    from zstandard import ZstdDecompressor

    return ZstdDecompressor().decompress(data)
| 29 | |
def decompress_if_needed(data: bytes) -> bytes:
    """Sniff *data* for a zstd or zlib header and decompress it if one is found.

    Delta bases read back from R2 may have been written compressed — zlib by
    the older wire path, zstd by the current one — or as plain bytes; this lets
    callers treat all of them uniformly.

    Decompression is best-effort: when a header matches but decompression
    fails, *data* is returned exactly as given. The zstd probe absorbs any
    ``Exception`` (which also covers ``zstandard`` being unavailable, since
    :func:`decompress_zstd` imports it lazily); the zlib probe absorbs only
    :class:`zlib.error`.

    NOTE(review): a plain object whose bytes happen to form a valid zlib or
    zstd stream cannot be told apart from a compressed one and will be
    decompressed — confirm stored plain objects can never look like that.
    """
    # Slicing a too-short input yields fewer bytes than the magic, so these
    # comparisons are simply False for short data — no explicit length check.
    looks_zstd = data[:4] == _ZSTD_MAGIC
    if looks_zstd:
        try:
            return decompress_zstd(data)
        except Exception:
            pass  # best-effort: fall through (zlib magic cannot also match)

    looks_zlib = data[:2] in _ZLIB_MAGIC
    if looks_zlib:
        try:
            return zlib.decompress(data)
        except zlib.error:
            pass  # not really zlib (e.g. plain bytes starting with b"x^")

    return data
| 48 | |
def decompress_zlib(data: bytes) -> bytes:
    """Return the payload of the zlib-wrapped stream in *data*.

    The stream is decoded with zlib's default window setting
    (``zlib.MAX_WBITS``), i.e. a standard zlib header and trailer are expected.
    Corrupt or truncated input raises :class:`zlib.error`, which the caller is
    expected to turn into a 400 response.
    """
    inflated = zlib.decompress(data, wbits=zlib.MAX_WBITS)
    return inflated
| 55 | |
def apply_delta(base: bytes, delta_compressed: bytes) -> bytes:
    """Reconstruct a target object by applying *delta_compressed* to *base*.

    The delta format is the binary instruction stream produced by
    ``muse.core.compression.compute_delta``:

    * ``COPY``: ``b'\\x00' + struct.pack('>II', offset, length)``
    * ``DATA``: ``b'\\x01' + struct.pack('>I', length) + data_bytes``

    Every instruction is bounds-checked: a ``COPY`` must lie entirely within
    *base*, and a ``DATA`` payload must lie entirely within the stream. A
    malformed delta, or one applied to the wrong base, therefore raises instead
    of yielding a silently truncated object.

    Args:
        base: Base object bytes (fetched from storage or content_cache).
        delta_compressed: Zlib-compressed binary instruction stream.

    Returns:
        Reconstructed target bytes.

    Raises:
        zlib.error: *delta_compressed* is corrupt.
        ValueError: Unknown instruction type in stream, or a ``COPY`` whose
            range extends past the end of *base*.
        struct.error: Truncated instruction stream (an instruction header or
            a ``DATA`` payload is cut short).
    """
    stream = zlib.decompress(delta_compressed)
    result = bytearray()
    pos = 0
    n = len(stream)
    base_len = len(base)

    while pos < n:
        instr_type = stream[pos]
        pos += 1

        if instr_type == 0:  # COPY
            offset, length = struct.unpack_from(">II", stream, pos)
            pos += 8
            # Slicing would silently clamp an out-of-range copy and produce a
            # short (corrupt) object, so reject it explicitly.
            if offset + length > base_len:
                raise ValueError(
                    f"delta COPY out of range: offset={offset} length={length} "
                    f"exceeds base size {base_len} at stream offset {pos - 9}"
                )
            result.extend(base[offset : offset + length])

        elif instr_type == 1:  # DATA
            (length,) = struct.unpack_from(">I", stream, pos)
            pos += 4
            end = pos + length
            # Same reasoning as COPY: a short payload must not be accepted as
            # if it were complete. struct.error matches the documented
            # "truncated stream" contract that callers already handle.
            if end > n:
                raise struct.error(
                    f"truncated delta DATA payload: declared {length} bytes at "
                    f"stream offset {pos}, only {n - pos} available"
                )
            result.extend(stream[pos:end])
            pos = end

        else:
            raise ValueError(
                f"unknown delta instruction type: {instr_type:#04x} at stream offset {pos - 1}"
            )

    return bytes(result)
File History
1 commit
sha256:3ff9c9863a9891bdcde71b4a43228f66d0493e38b7cc1d09fe9eb7de774046b2
feat: add repair-commit wire endpoint (API parity with repa…
Opus 4.8
minor
⚠
1 day ago