gabriel / muse public
io.py python
309 lines 11.2 KB
Raw
1 """muse.core.io — raw filesystem I/O primitives.
2
3 Atomic writers, msgpack readers, and zstd helpers with no domain knowledge.
4 Every function here works purely in terms of ``pathlib.Path``, ``bytes``, and
5 primitive Python types — no commit records, snapshot manifests, or tag dicts.
6
7 All atomic writes follow the same crash-safety protocol:
8 mkstemp → write → flush → fsync → rename
9
10 The fsync step is non-fatal on filesystems that do not support it (tmpfs,
11 certain Docker volumes). Atomicity (torn-write protection) still holds via
12 the rename.
13 """
14
from __future__ import annotations

import contextlib
import errno
import fcntl
import json as _json
import os
import pathlib
import stat
import sys
import tempfile

import msgpack

from muse.core.validation import assert_not_symlink
from muse.core.types import MsgpackValue
30
# ---------------------------------------------------------------------------
# Read safety constants
# ---------------------------------------------------------------------------

# Default ceiling (64 MiB) on msgpack payloads loaded into memory: used by
# _read_msgpack and as the default max_bytes of safe_unpackb /
# read_msgpack_file.
MAX_MSGPACK_BYTES: int = 64 << 20

# Pack/mpack payloads embed raw file contents as binary blobs, so their
# deserialization ceiling is much higher (512 MiB).
MAX_PACK_MSGPACK_BYTES: int = 512 << 20

# Per-value ceilings handed to msgpack.unpackb.
_MSGPACK_MAX_STR_LEN: int = 1 << 20  # 1 MiB for any single string value
_MSGPACK_MAX_BIN_LEN: int = 0  # store records never carry binary blobs
_MSGPACK_MAX_BIN_LEN_PACK: int = 256 << 20  # 256 MiB for each blob in a pack
_MSGPACK_MAX_ARRAY_LEN: int = 1_000_000  # headroom for very large manifests
_MSGPACK_MAX_MAP_LEN: int = 1_000_000  # headroom for ~75k-file manifests
48
# ---------------------------------------------------------------------------
# Zstd compression helpers
# ---------------------------------------------------------------------------

# Every zstd frame opens with the magic number 0xFD2FB528 stored little-endian.
# The format therefore announces itself: content whose first four bytes match
# is a zstd frame, anything else is raw bytes.
_ZSTD_MAGIC: bytes = (0xFD2FB528).to_bytes(4, "little")

# Compression is only worthwhile for payloads larger than this; smaller ones
# are left as-is because the framing overhead outweighs the savings.
_ZSTD_COMPRESS_THRESHOLD: int = 8 * 1024  # bytes
60
61
def _zstd_compress(data: bytes) -> bytes:
    """Return *data* packed into a single zstd frame at compression level 3.

    Level 3 is zstd's own default and balances compression ratio against fast
    decompression. Every frame produced here starts with ``_ZSTD_MAGIC``, so
    readers can recognise compressed content without any side-channel flag.
    """
    import zstandard

    compressor = zstandard.ZstdCompressor(level=3)
    compressed = compressor.compress(data)
    return compressed
71
72
def zstd_decompress_if_needed(
    data: bytes,
    *,
    max_output_size: int | None = None,
) -> bytes:
    """Return *data* decompressed if it is a zstd frame; otherwise unchanged.

    Detection uses the 4-byte zstd magic number, making the format
    self-describing and backward-compatible with uncompressed legacy files.

    Args:
        data: Raw file contents, compressed or not.
        max_output_size: Optional cap on the decompressed size of a zstd
            frame. ``None`` (the default) keeps the historical, uncapped
            behaviour. When set, a frame whose header declares a larger
            content size is rejected *before* any output buffer is
            allocated, and a frame with no declared size is decompressed
            into a buffer of at most this many bytes. Input that is not a
            zstd frame is returned unchanged regardless of this cap.

    Returns:
        The decompressed bytes, or *data* itself when it is not a zstd frame.

    Raises:
        ValueError: If the frame's declared content size exceeds
            *max_output_size*.
        zstandard.ZstdError: If the frame is malformed or cannot be
            decompressed. May also be raised for frames without a declared
            content size when no cap is given.
    """
    if data[:4] != _ZSTD_MAGIC:
        return data

    import zstandard

    decompressor = zstandard.ZstdDecompressor()
    if max_output_size is None:
        return decompressor.decompress(data)

    # A frame header can declare an enormous content size in a few bytes and
    # the one-shot decompressor allocates that size up front, so reject
    # oversized frames before decompressing (decompression-bomb guard).
    declared = zstandard.frame_content_size(data)  # -1 when not declared
    if declared > max_output_size:
        raise ValueError(
            f"zstd frame declares {declared:,} decompressed bytes — exceeds "
            f"the {max_output_size:,}-byte limit. "
            "Possible decompression bomb or corrupted input."
        )
    # For frames with no declared size, max_output_size bounds the buffer.
    return decompressor.decompress(data, max_output_size=max_output_size)
83
# ---------------------------------------------------------------------------
# Symlink-guard cache
# ---------------------------------------------------------------------------

# String paths of parent directories that already passed the non-symlink check
# during this process. Remembering them lets many writes into one directory
# (object shards, tag buckets, ...) share a single is_symlink() stat call
# instead of repeating it per write.
_validated_store_parents: set[str] = set()
92
# ---------------------------------------------------------------------------
# Msgpack deserialization
# ---------------------------------------------------------------------------

def safe_unpackb(
    raw: bytes,
    *,
    context: str = "",
    max_bytes: int = MAX_MSGPACK_BYTES,
    strict_map_key: bool = True,
    allow_binary: bool = False,
) -> MsgpackValue:
    """Deserialize msgpack bytes with strict, configurable safety limits.

    Single canonical entry-point for all in-memory msgpack deserialization.
    Every callsite that receives untrusted bytes must go through here.

    The total payload size is checked before any parsing. Per-value limits
    (string, binary, array and map lengths) are then enforced by
    ``msgpack.unpackb`` itself, and strings are decoded as UTF-8
    (``raw=False``).

    Args:
        raw: Msgpack bytes to unpack.
        context: Human-readable label for error messages.
        max_bytes: Hard cap on ``len(raw)`` checked before any parsing.
        strict_map_key: When ``True`` only ``str`` (or ``bytes``) keys are
            accepted in maps. Set ``False`` only for legacy formats with
            integer keys.
        allow_binary: When ``False`` binary blobs are rejected (their maximum
            length is 0). Set ``True`` for pack/mpack payloads with blob
            content (up to ``_MSGPACK_MAX_BIN_LEN_PACK`` bytes per blob).

    Returns:
        The decoded top-level value.

    Raises:
        ValueError: If ``len(raw)`` exceeds *max_bytes*. msgpack also raises
            ``ValueError`` (or a subclass) for malformed or truncated input,
            per-value limit violations, and invalid UTF-8 strings.
        msgpack.ExtraData: If there is trailing data after the top-level
            value (a ``ValueError`` subclass).
    """
    n = len(raw)
    if n > max_bytes:
        label = f" ({context})" if context else ""
        # Report whole MiB only when exact; otherwise a sub-MiB or fractional
        # cap would be truncated (e.g. a 512 KiB cap printed as "0 MiB").
        mib, rem = divmod(max_bytes, 1024 * 1024)
        cap = f"{mib} MiB" if mib and not rem else f"{max_bytes:,}-byte"
        raise ValueError(
            f"Msgpack payload{label} is {n:,} bytes — exceeds the "
            f"{cap} safety cap. "
            "Possible size-bomb or corrupted input."
        )
    result: MsgpackValue = msgpack.unpackb(
        raw,
        raw=False,
        strict_map_key=strict_map_key,
        max_str_len=_MSGPACK_MAX_STR_LEN,
        max_bin_len=_MSGPACK_MAX_BIN_LEN_PACK if allow_binary else _MSGPACK_MAX_BIN_LEN,
        max_array_len=_MSGPACK_MAX_ARRAY_LEN,
        max_map_len=_MSGPACK_MAX_MAP_LEN,
    )
    return result
142
143
def read_msgpack_file(
    path: pathlib.Path,
    *,
    max_bytes: int = MAX_MSGPACK_BYTES,
    strict_map_key: bool = True,
    allow_binary: bool = False,
) -> MsgpackValue:
    """Read a msgpack file and deserialize it with strict safety limits.

    The file is inspected via ``stat`` *before* ``read_bytes()`` is called,
    so an oversized file never causes an allocation:

    * Non-regular files (character devices, FIFOs, sockets, directories) are
      rejected. Their ``st_size`` does not bound what a read returns. For
      example, a symlink to ``/dev/zero`` reports 0 bytes yet never reaches
      EOF, which would slip past the size check and exhaust memory.
    * Regular files larger than *max_bytes* are rejected.

    The bytes are then passed unchanged to :func:`safe_unpackb`, which
    re-checks the size and applies the per-value limits. No zstd
    decompression happens here.

    Args:
        path: File to read. Symlinks are followed by ``stat``.
        max_bytes: Maximum accepted file size in bytes.
        strict_map_key: Forwarded to :func:`safe_unpackb`.
        allow_binary: Forwarded to :func:`safe_unpackb`.

    Returns:
        The decoded top-level msgpack value.

    Raises:
        OSError: If *path* is not a regular file, exceeds *max_bytes*, or
            cannot be stat'ed or read.
        ValueError: Forwarded from :func:`safe_unpackb`.
    """
    st = path.stat()
    if not stat.S_ISREG(st.st_mode):
        raise OSError(f"Msgpack file {path.name!r} is not a regular file.")
    size = st.st_size
    if size > max_bytes:
        # Whole MiB only when exact, so small caps are not shown as "0 MiB".
        mib, rem = divmod(max_bytes, 1024 * 1024)
        cap = f"{mib} MiB" if mib and not rem else f"{max_bytes:,}-byte"
        raise OSError(
            f"Msgpack file {path.name!r} is {size:,} bytes — exceeds the "
            f"{cap} safety cap. "
            "File may be corrupt or tampered."
        )
    # If the file is swapped between stat and read, safe_unpackb's length
    # check still rejects an oversized payload (after it has been read).
    return safe_unpackb(
        path.read_bytes(),
        context=path.name,
        max_bytes=max_bytes,
        strict_map_key=strict_map_key,
        allow_binary=allow_binary,
    )
175
176
def _read_msgpack(path: pathlib.Path) -> MsgpackValue:
    """Read, decompress if needed, and unpack a msgpack file with size limits.

    Both the on-disk size and the decompressed size are capped at
    :data:`MAX_MSGPACK_BYTES`:

    * Non-regular files are rejected, because ``st_size`` does not bound how
      much a read of a device or FIFO returns.
    * The on-disk (possibly compressed) size is checked via ``stat`` before
      reading.
    * For zstd frames, the content size declared in the frame header is
      checked *before* decompression. Otherwise a small compressed file can
      declare an arbitrarily large output, which the one-shot decompressor
      allocates before :func:`safe_unpackb` ever sees it.
    * :func:`safe_unpackb` re-checks the final payload size and enforces the
      per-value limits.

    Raises:
        OSError: If *path* is not a regular file, exceeds the limit on disk,
            declares a decompressed size above the limit, or cannot be read.
        ValueError: Forwarded from :func:`safe_unpackb`.

    Callers that want ``None`` on failure should catch :exc:`Exception`.
    """
    limit = MAX_MSGPACK_BYTES
    st = path.stat()
    if not stat.S_ISREG(st.st_mode):
        raise OSError(f"Msgpack file {path.name!r} is not a regular file.")
    size = st.st_size
    if size > limit:
        raise OSError(
            f"Msgpack file {path.name!r} is {size:,} bytes — exceeds the "
            f"{limit:,} bytes read limit."
        )
    data = path.read_bytes()
    if data[:4] == _ZSTD_MAGIC:
        import zstandard

        # -1 means "size not declared"; such frames are left to
        # zstd_decompress_if_needed, which does not take this size on trust.
        declared = zstandard.frame_content_size(data)
        if declared > limit:
            raise OSError(
                f"Msgpack file {path.name!r} declares {declared:,} decompressed "
                f"bytes — exceeds the {limit:,} bytes read limit."
            )
    raw = zstd_decompress_if_needed(data)
    return safe_unpackb(raw, context=path.name, max_bytes=limit)
192
193
def _read_msgpack_dict(path: pathlib.Path) -> "dict[str, object]":
    """Load the msgpack file at *path* and return its top-level mapping.

    Reading and all size limits are delegated to :func:`_read_msgpack`. If the
    decoded top-level value is anything other than a ``dict``, a
    :exc:`TypeError` naming the actual type is raised. Callers that prefer
    ``None`` on failure should catch :exc:`Exception`.
    """
    value = _read_msgpack(path)
    if isinstance(value, dict):
        return value  # type: ignore[return-value]
    raise TypeError(
        f"Expected dict from {path}, got {type(value).__name__!r}"
    )
206
# ---------------------------------------------------------------------------
# Atomic writers
# ---------------------------------------------------------------------------

def write_text_atomic(
    path: pathlib.Path,
    text: str,
    encoding: str = "utf-8",
) -> None:
    """Write *text* to *path* atomically and durably.

    Creates *path*'s parent directories if needed and refuses to write into a
    symlinked parent. It then writes a private temp file in the same
    directory (mkstemp → write → flush → fsync) and renames it over *path*,
    so every VCS state file is protected against torn writes and page-cache
    loss. fsync failure with ``EINVAL`` (or no errno) is non-fatal on virtual
    filesystems (tmpfs, Docker volumes); atomicity still holds via the rename.

    If anything fails after the temp file is created, the temp file is
    removed before the exception propagates. This covers not only
    ``OSError`` but also encoding errors, an unknown *encoding*, or
    ``KeyboardInterrupt``, so no ``.muse-tmp-*`` files are left behind.

    Raises:
        ValueError: If *path*'s parent directory is a symlink.
        UnicodeEncodeError: If *text* cannot be encoded with *encoding*.
        LookupError: If *encoding* is unknown.
        OSError: On any filesystem failure.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    assert_not_symlink(path.parent, label=f"write target parent ({path.parent.name}/)")
    fd, tmp_str = tempfile.mkstemp(dir=path.parent, prefix=".muse-tmp-")
    tmp = pathlib.Path(tmp_str)
    try:
        # os.fdopen takes ownership of fd; the with-block closes it.
        with os.fdopen(fd, "w", encoding=encoding) as fh:
            fh.write(text)
            fh.flush()
            try:
                os.fsync(fh.fileno())
            except OSError as exc:
                # Tolerate filesystems that cannot fsync; rename stays atomic.
                if exc.errno not in (errno.EINVAL, None):
                    raise
        tmp.replace(path)
    except BaseException:
        # Broader than OSError: an encoding error or Ctrl-C mid-write must not
        # leak the temp file. A failed unlink must not mask the real error.
        with contextlib.suppress(OSError):
            tmp.unlink()
        raise
243
244
def _fsync_robust(fd: int) -> None:
    """Flush *fd* to stable storage, tolerating filesystems without fsync.

    On macOS the ``F_BARRIERFSYNC`` fcntl (command 85) is attempted first,
    falling back to :func:`os.fsync` if that call raises ``OSError``. On every
    other platform :func:`os.fsync` is called directly. An ``OSError`` with
    errno ``EINVAL`` from the sync is swallowed; any other ``OSError``
    propagates.
    """
    def _sync() -> None:
        if sys.platform != "darwin":
            os.fsync(fd)
            return
        try:
            fcntl.fcntl(fd, 85)  # F_BARRIERFSYNC
        except OSError:
            os.fsync(fd)

    try:
        _sync()
    except OSError as err:
        if err.errno == errno.EINVAL:
            return
        raise
258
259
def _write_json_atomic(path: pathlib.Path, data: "dict[str, object]") -> None:
    """Serialize *data* to JSON and atomically replace *path*.

    Uses mkstemp → write → flush → fsync → rename crash-safety. The output is
    UTF-8 encoded compact JSON (``","``/``":"`` separators, non-ASCII kept
    verbatim) with no trailing newline. The parent directory is not created
    here; it must already exist. The parent is checked once per process
    against being a symlink, and the result is cached in
    ``_validated_store_parents``.

    If anything fails after the temp file is created, including
    ``KeyboardInterrupt``, the temp file is removed before the exception
    propagates.

    Raises:
        OSError: On any filesystem failure (including a missing parent).
    """
    # Key the cache by absolute path: a relative str(path.parent) names a
    # different directory after os.chdir, which would skip the symlink check
    # for a directory that was never validated.
    parent_key = str(path.parent.absolute())
    if parent_key not in _validated_store_parents:
        assert_not_symlink(path.parent, label=f"write target parent ({path.parent.name}/)")
        _validated_store_parents.add(parent_key)
    encoded = _json.dumps(data, ensure_ascii=False, separators=(",", ":")).encode("utf-8")
    fd, tmp_str = tempfile.mkstemp(dir=path.parent, prefix=".muse-tmp-")
    tmp = pathlib.Path(tmp_str)
    try:
        with os.fdopen(fd, "wb") as fh:
            fh.write(encoded)
            fh.flush()
            _fsync_robust(fh.fileno())
        tmp.replace(path)
    except BaseException:
        # Broader than OSError so an interrupt mid-write cannot leak the temp
        # file; a failed unlink must not mask the original exception.
        with contextlib.suppress(OSError):
            tmp.unlink()
        raise
283
284
def _write_shelf_header_atomic(path: pathlib.Path, data: "dict[str, object]") -> None:
    """Write a shelf entry using ``shelf <size>\\0<json>`` framing.

    *data* is serialized as compact UTF-8 JSON. The written payload is the
    header ``shelf <byte length of the JSON>`` followed by a NUL byte, then
    the JSON bytes. This is the same git-object-style header used by commits
    and snapshots in the unified object store. Uses mkstemp → write → flush
    → fsync → rename crash-safety, so *path* is replaced atomically. The
    parent directory must already exist. The parent is checked once per
    process against being a symlink, and the result is cached in
    ``_validated_store_parents``.

    If anything fails after the temp file is created, including
    ``KeyboardInterrupt``, the temp file is removed before the exception
    propagates.

    Raises:
        OSError: On any filesystem failure (including a missing parent).
    """
    # Key the cache by absolute path: a relative str(path.parent) names a
    # different directory after os.chdir, which would skip the symlink check
    # for a directory that was never validated.
    parent_key = str(path.parent.absolute())
    if parent_key not in _validated_store_parents:
        assert_not_symlink(path.parent, label=f"write target parent ({path.parent.name}/)")
        _validated_store_parents.add(parent_key)
    json_bytes = _json.dumps(data, ensure_ascii=False, separators=(",", ":")).encode("utf-8")
    header = f"shelf {len(json_bytes)}\0".encode("utf-8")
    payload = header + json_bytes
    fd, tmp_str = tempfile.mkstemp(dir=path.parent, prefix=".muse-tmp-")
    tmp = pathlib.Path(tmp_str)
    try:
        with os.fdopen(fd, "wb") as fh:
            fh.write(payload)
            fh.flush()
            _fsync_robust(fh.fileno())
        tmp.replace(path)
    except BaseException:
        # Broader than OSError so an interrupt mid-write cannot leak the temp
        # file; a failed unlink must not mask the original exception.
        with contextlib.suppress(OSError):
            tmp.unlink()
        raise
File History 1 commit