gabriel / musehub public
compression.py python
102 lines 3.5 KB
Raw
sha256:3ff9c9863a9891bdcde71b4a43228f66d0493e38b7cc1d09fe9eb7de774046b2 feat: add repair-commit wire endpoint (API parity with repa… Opus 4.8 minor ⚠ breaking 1 day ago
1 """Object decompression and delta reconstruction for wire transfer.
2
3 Server-side counterpart to ``muse.core.compression``. The server only needs
4 to *decompress* and *apply* — it never compresses or computes deltas.
5
6 Encoding values understood here:
7 ``"zstd"`` — zstd-compressed; call :func:`decompress_zstd`.
8 ``"zlib"`` — zlib-compressed; call :func:`decompress_zlib`.
9 ``"delta+zlib"`` — zlib-compressed delta stream; call :func:`apply_delta`.
10 """
11
12 import struct
13 import zlib
14
15 # Two-byte headers that identify any of the four standard zlib compression levels.
16 # Objects pushed via the old wire path were stored zlib-compressed in R2; new
17 # objects are stored as plain bytes. decompress_if_needed() handles both so
18 # callers never need to know which storage generation produced a given object.
19 _ZLIB_MAGIC: tuple[bytes, ...] = (b"\x78\x01", b"\x78\x9c", b"\x78\xda", b"\x78\x5e")
20 _ZSTD_MAGIC: bytes = b"\x28\xb5\x2f\xfd"
21
22 def decompress_zstd(data: bytes) -> bytes:
23 """Decompress zstd-compressed *data*.
24
25 Raises on corrupt input so the caller can return 422.
26 """
27 import zstandard
28 return zstandard.ZstdDecompressor().decompress(data)
29
30 def decompress_if_needed(data: bytes) -> bytes:
31 """Return *data* decompressed if it carries a zlib or zstd magic header.
32
33 Handles delta bases fetched from R2 that may have been stored compressed
34 by either the old zlib wire path or the current zstd wire path.
35 On decompression failure the original bytes are returned unchanged.
36 """
37 if len(data) >= 4 and data[:4] == _ZSTD_MAGIC:
38 try:
39 return decompress_zstd(data)
40 except Exception:
41 pass
42 if len(data) >= 2 and data[:2] in _ZLIB_MAGIC:
43 try:
44 return zlib.decompress(data)
45 except zlib.error:
46 pass
47 return data
48
49 def decompress_zlib(data: bytes) -> bytes:
50 """Decompress zlib-compressed *data*.
51
52 Raises :class:`zlib.error` on corrupt input so the caller can return 400.
53 """
54 return zlib.decompress(data)
55
56 def apply_delta(base: bytes, delta_compressed: bytes) -> bytes:
57 """Reconstruct a target object by applying *delta_compressed* to *base*.
58
59 The delta format is the binary instruction stream produced by
60 ``muse.core.compression.compute_delta``:
61
62 * ``COPY``: ``b'\\x00' + struct.pack('>II', offset, length)``
63 * ``DATA``: ``b'\\x01' + struct.pack('>I', length) + data_bytes``
64
65 Args:
66 base: Base object bytes (fetched from storage or content_cache).
67 delta_compressed: Zlib-compressed binary instruction stream.
68
69 Returns:
70 Reconstructed target bytes.
71
72 Raises:
73 zlib.error: *delta_compressed* is corrupt.
74 ValueError: Unknown instruction type in stream.
75 struct.error: Truncated instruction stream.
76 """
77 stream = zlib.decompress(delta_compressed)
78 result = bytearray()
79 pos = 0
80 n = len(stream)
81
82 while pos < n:
83 instr_type = stream[pos]
84 pos += 1
85
86 if instr_type == 0: # COPY
87 offset, length = struct.unpack_from(">II", stream, pos)
88 pos += 8
89 result.extend(base[offset : offset + length])
90
91 elif instr_type == 1: # DATA
92 (length,) = struct.unpack_from(">I", stream, pos)
93 pos += 4
94 result.extend(stream[pos : pos + length])
95 pos += length
96
97 else:
98 raise ValueError(
99 f"unknown delta instruction type: {instr_type:#04x} at stream offset {pos - 1}"
100 )
101
102 return bytes(result)
File History 1 commit
sha256:3ff9c9863a9891bdcde71b4a43228f66d0493e38b7cc1d09fe9eb7de774046b2 feat: add repair-commit wire endpoint (API parity with repa… Opus 4.8 minor 1 day ago