io.py
python
sha256:2eaa5d95f9d9383498e76947410a26e5a3ba23d182f339910c424cf88fad412b
fix: try fetch/presign before fetch/mpack to avoid Cloudfla…
Sonnet 4.6
patch
7 days ago
| 1 | """muse.core.io — raw filesystem I/O primitives. |
| 2 | |
| 3 | Atomic writers, msgpack readers, and zstd helpers with no domain knowledge. |
| 4 | Every function here works purely in terms of ``pathlib.Path``, ``bytes``, and |
| 5 | primitive Python types — no commit records, snapshot manifests, or tag dicts. |
| 6 | |
| 7 | All atomic writes follow the same crash-safety protocol: |
| 8 | mkstemp → write → flush → fsync → rename |
| 9 | |
| 10 | The fsync step is non-fatal on filesystems that do not support it (tmpfs, |
| 11 | certain Docker volumes). Atomicity (torn-write protection) still holds via |
| 12 | the rename. |
| 13 | """ |
| 14 | |
| 15 | from __future__ import annotations |
| 16 | |
| 17 | import contextlib |
| 18 | import errno |
| 19 | import fcntl |
| 20 | import json as _json |
| 21 | import os |
| 22 | import pathlib |
| 23 | import sys |
| 24 | import tempfile |
| 25 | |
| 26 | import msgpack |
| 27 | |
| 28 | from muse.core.validation import assert_not_symlink |
| 29 | from muse.core.types import MsgpackValue |
| 30 | |
# ---------------------------------------------------------------------------
# Read safety constants
# ---------------------------------------------------------------------------

# Default cap, in bytes, for a single msgpack store record. read_msgpack_file
# uses it as its default max_bytes; _read_msgpack applies it both to the file
# on disk and to the (possibly zstd-decompressed) payload it unpacks.
MAX_MSGPACK_BYTES: int = 64 << 20  # 64 MiB

# Larger cap intended for pack/mpack payloads, which legitimately bundle many
# binary blobs of raw file content.
MAX_PACK_MSGPACK_BYTES: int = 512 << 20  # 512 MiB

# Per-value limits that safe_unpackb hands to msgpack.unpackb.
_MSGPACK_MAX_STR_LEN: int = 1 << 20  # 1 MiB per string value
_MSGPACK_MAX_BIN_LEN: int = 0  # store records may not carry binary values at all
_MSGPACK_MAX_BIN_LEN_PACK: int = 256 << 20  # 256 MiB per blob when allow_binary=True
_MSGPACK_MAX_ARRAY_LEN: int = 10**6  # roomy enough for large manifests
_MSGPACK_MAX_MAP_LEN: int = 10**6  # roomy enough for ~75k-file manifests
| 48 | |
# ---------------------------------------------------------------------------
# Zstd compression helpers
# ---------------------------------------------------------------------------

# Magic number that opens every zstd frame: 0xFD2FB528 stored little-endian,
# i.e. the bytes 28 B5 2F FD. A payload starting with it is treated as
# compressed and anything else as raw, so no external flag is needed.
_ZSTD_MAGIC: bytes = (0xFD2FB528).to_bytes(4, "little")

# Size threshold in bytes: only payloads larger than this are meant to be
# compressed, since tiny payloads are not worth the overhead. _zstd_compress
# does not consult it itself — presumably callers compare against it; confirm.
_ZSTD_COMPRESS_THRESHOLD: int = 8 * 1024
| 60 | |
| 61 | |
def _zstd_compress(data: bytes) -> bytes:
    """Return *data* packed into a single zstd frame at compression level 3.

    Level 3 is zstd's own default: a solid ratio with fast decompression.
    The emitted frame begins with ``_ZSTD_MAGIC``, which is what lets
    readers recognise compressed payloads without any side-channel flag.

    ``zstandard`` is imported at call time rather than at module import
    (presumably to keep importing this module cheap — confirm).
    """
    import zstandard

    compressor = zstandard.ZstdCompressor(level=3)
    return compressor.compress(data)
| 71 | |
| 72 | |
def zstd_decompress_if_needed(
    data: bytes,
    *,
    max_output_size: int | None = None,
) -> bytes:
    """Return *data* decompressed if it is a zstd frame; otherwise unchanged.

    Detection uses the 4-byte zstd magic number, making the format
    self-describing and backward-compatible with uncompressed legacy files.
    Non-zstd input is returned as-is without importing ``zstandard``.

    Args:
        data: Raw bytes, possibly a zstd frame.
        max_output_size: Optional cap on the decompressed size, in bytes.
            ``None`` (the default) keeps the historical, unbounded
            behaviour. When set, a frame whose header declares a larger
            content size is rejected *before* any output buffer is
            allocated. A frame with no recorded content size is
            decompressed into a buffer of at most this size, instead of
            being refused.

    Returns:
        The decompressed bytes, or *data* itself when it is not zstd.

    Raises:
        ValueError: If *max_output_size* is set and the frame header
            declares a larger decompressed size.
        zstandard.ZstdError: If the frame is malformed, or its content size
            is unknown and no cap was given, or the output does not fit the
            cap.
    """
    if data[:4] != _ZSTD_MAGIC:
        return data

    import zstandard

    dctx = zstandard.ZstdDecompressor()
    if max_output_size is None:
        return dctx.decompress(data)

    # One-shot decompress() sizes its output buffer from the frame header,
    # so a tiny crafted frame can demand an enormous allocation. Validate
    # the declared size first; -1 means the header does not record it.
    declared = zstandard.frame_content_size(data)
    if declared > max_output_size:
        raise ValueError(
            f"zstd frame declares {declared:,} decompressed bytes — exceeds "
            f"the {max_output_size:,} byte cap. Possible decompression bomb."
        )
    # For frames without a recorded size this bounds the buffer to the cap.
    return dctx.decompress(data, max_output_size=max_output_size)
| 83 | |
# ---------------------------------------------------------------------------
# Symlink-guard cache
# ---------------------------------------------------------------------------

# String forms of parent directories that have already passed
# assert_not_symlink() during this process. _write_json_atomic and
# _write_shelf_header_atomic consult it, so repeated writes into the same
# directory (e.g. object shards, tag buckets) skip the extra symlink check.
# Nothing in this module removes entries: a directory that is swapped for a
# symlink after its first validation is not re-checked by those writers.
# write_text_atomic does not use this cache and checks its parent every call.
_validated_store_parents: set[str] = set()
| 92 | |
| 93 | # --------------------------------------------------------------------------- |
| 94 | # Msgpack deserialization |
| 95 | # --------------------------------------------------------------------------- |
| 96 | |
def safe_unpackb(
    raw: bytes,
    *,
    context: str = "",
    max_bytes: int = MAX_MSGPACK_BYTES,
    strict_map_key: bool = True,
    allow_binary: bool = False,
) -> MsgpackValue:
    """Unpack *raw* msgpack bytes under hard, configurable safety limits.

    This is the one sanctioned path for turning in-memory msgpack into
    Python values. Any caller holding untrusted bytes should go through it.

    The total payload length is checked before msgpack parses anything.
    Per-value limits (string, binary, array and map lengths) come from the
    module-level ``_MSGPACK_MAX_*`` constants.

    Args:
        raw: The msgpack-encoded bytes.
        context: Optional label, typically a file name, included in the
            size-cap error message.
        max_bytes: Largest accepted ``len(raw)``.
        strict_map_key: When ``True``, only ``str`` map keys are accepted.
            Pass ``False`` only for legacy integer-keyed formats.
        allow_binary: When ``False``, any bin value is rejected (limit 0).
            When ``True``, bin values up to ``_MSGPACK_MAX_BIN_LEN_PACK``
            bytes are allowed, as pack/mpack payloads need.

    Returns:
        The decoded top-level value. Strings come back as ``str`` because
        ``raw=False``.

    Raises:
        ValueError: If ``len(raw)`` is larger than *max_bytes*.
        msgpack.UnpackException: If *raw* is not valid msgpack.
        msgpack.ExtraData: If bytes remain after the top-level value.

    NOTE(review): msgpack appears to report per-value limit violations and
    non-str map keys as ``ValueError`` too — verify against the installed
    msgpack version.
    """
    size = len(raw)
    if size > max_bytes:
        where = "" if not context else f" ({context})"
        cap_mib = max_bytes // (1024 * 1024)
        raise ValueError(
            f"Msgpack payload{where} is {size:,} bytes — exceeds the "
            f"{cap_mib} MiB safety cap. "
            "Possible size-bomb or corrupted input."
        )

    # Store records get no binary allowance; pack payloads get a generous one.
    bin_limit = _MSGPACK_MAX_BIN_LEN_PACK if allow_binary else _MSGPACK_MAX_BIN_LEN
    return msgpack.unpackb(
        raw,
        raw=False,
        strict_map_key=strict_map_key,
        max_str_len=_MSGPACK_MAX_STR_LEN,
        max_bin_len=bin_limit,
        max_array_len=_MSGPACK_MAX_ARRAY_LEN,
        max_map_len=_MSGPACK_MAX_MAP_LEN,
    )
| 142 | |
| 143 | |
def read_msgpack_file(
    path: pathlib.Path,
    *,
    max_bytes: int = MAX_MSGPACK_BYTES,
    strict_map_key: bool = True,
    allow_binary: bool = False,
) -> MsgpackValue:
    """Load the msgpack file at *path* under strict safety limits.

    The on-disk size is taken from ``stat`` first. The contents are read only
    once the size is known to be within *max_bytes*, so an oversized file is
    rejected without being loaded. No zstd decompression is attempted here.
    The bytes go straight to :func:`safe_unpackb` with the same *max_bytes*,
    *strict_map_key* and *allow_binary* settings.

    NOTE(review): the size check and the read are separate path operations.
    A file that grows or is swapped in between is read in full, but
    safe_unpackb still rejects it by length afterwards.

    Raises:
        OSError: If the file is larger than *max_bytes*, or cannot be stat'ed
            or read.
        ValueError: Propagated from :func:`safe_unpackb`.
        msgpack.UnpackException: If the contents are not valid msgpack.
    """
    file_size = path.stat().st_size
    if file_size <= max_bytes:
        return safe_unpackb(
            path.read_bytes(),
            context=path.name,
            max_bytes=max_bytes,
            strict_map_key=strict_map_key,
            allow_binary=allow_binary,
        )
    raise OSError(
        f"Msgpack file {path.name!r} is {file_size:,} bytes — exceeds the "
        f"{max_bytes // (1024 * 1024)} MiB safety cap. "
        "File may be corrupt or tampered."
    )
| 175 | |
| 176 | |
def _read_msgpack(path: pathlib.Path) -> MsgpackValue:
    """Read and unpack a msgpack file, enforcing size and per-value limits.

    Three caps apply, all equal to :data:`MAX_MSGPACK_BYTES`:

    1. the on-disk size, checked via ``stat`` before reading;
    2. for zstd-compressed files, the decompressed size declared in the
       frame header, checked *before* decompression so a small, highly
       compressible file cannot force a huge output allocation;
    3. the final payload length, re-checked by :func:`safe_unpackb`.

    Raises:
        OSError: If the file exceeds :data:`MAX_MSGPACK_BYTES` on disk.
        ValueError: If the declared or actual decompressed payload exceeds
            the cap. The latter is forwarded from :func:`safe_unpackb`.

    Callers that want ``None`` on failure should catch :exc:`Exception`.
    """
    size = path.stat().st_size
    limit = MAX_MSGPACK_BYTES
    if size > limit:
        raise OSError(
            f"Msgpack file {path.name!r} is {size:,} bytes — exceeds the "
            f"{limit:,} bytes read limit."
        )
    data = path.read_bytes()
    if data[:4] == _ZSTD_MAGIC:
        import zstandard

        # python-zstandard's one-shot decompress() sizes its output buffer
        # from the frame header, so reject an oversized declaration up front.
        # frame_content_size() returns -1 when the size is not recorded; per
        # the zstandard docs such frames are refused by decompress() when no
        # max_output_size is given, rather than decompressed unboundedly.
        declared = zstandard.frame_content_size(data)
        if declared > limit:
            raise ValueError(
                f"Msgpack file {path.name!r} declares {declared:,} bytes "
                f"after zstd decompression — exceeds the {limit:,} bytes "
                "read limit. Possible decompression bomb."
            )
    raw = zstd_decompress_if_needed(data)
    return safe_unpackb(raw, context=path.name, max_bytes=limit)
| 192 | |
| 193 | |
def _read_msgpack_dict(path: pathlib.Path) -> "dict[str, object]":
    """Load *path* via :func:`_read_msgpack` and require a top-level mapping.

    Anything raised by ``_read_msgpack`` propagates unchanged.

    Raises:
        TypeError: If the decoded value is anything other than a ``dict``.

    Callers that want ``None`` on failure should catch :exc:`Exception`.
    """
    value = _read_msgpack(path)
    if isinstance(value, dict):
        return value  # type: ignore[return-value]
    raise TypeError(
        f"Expected dict from {path}, got {type(value).__name__!r}"
    )
| 206 | |
| 207 | # --------------------------------------------------------------------------- |
| 208 | # Atomic writers |
| 209 | # --------------------------------------------------------------------------- |
| 210 | |
def write_text_atomic(
    path: pathlib.Path,
    text: str,
    encoding: str = "utf-8",
) -> None:
    """Write *text* to *path* atomically and durably.

    The function creates the parent directory if needed and refuses to write
    through a symlinked parent. It then follows mkstemp → write → flush →
    fsync → rename, so every VCS state file is protected against torn writes
    and page-cache loss. fsync is done via ``_fsync_robust``, the same helper
    the binary writers use. An unsupported fsync (``EINVAL``, e.g. on tmpfs
    or some Docker volumes) is tolerated, and atomicity still holds via the
    rename.

    The temporary file is removed on *any* failure between ``mkstemp`` and
    the rename, not only on ``OSError``. That covers a ``UnicodeEncodeError``
    when *text* is not representable in *encoding* (e.g. lone surrogates
    under UTF-8), a ``LookupError`` for an unknown encoding name, and
    ``KeyboardInterrupt``. Without this, such failures would leave
    ``.muse-tmp-*`` debris next to *path*.

    Args:
        path: Destination file, replaced in a single rename.
        text: Content to write. The file is opened in text mode, so newline
            translation applies.
        encoding: Text encoding used for *text*.

    Raises:
        ValueError: If *path*'s parent directory is a symlink.
        UnicodeEncodeError: If *text* cannot be encoded with *encoding*.
        OSError: On filesystem failures other than an unsupported fsync.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    assert_not_symlink(path.parent, label=f"write target parent ({path.parent.name}/)")
    fd, tmp_str = tempfile.mkstemp(dir=path.parent, prefix=".muse-tmp-")
    tmp = pathlib.Path(tmp_str)
    try:
        with os.fdopen(fd, "w", encoding=encoding) as fh:
            fh.write(text)
            fh.flush()
            _fsync_robust(fh.fileno())
        tmp.replace(path)
    except BaseException:
        # Clean up for every failure mode. Suppress unlink errors so the
        # original exception, not a secondary cleanup error, propagates.
        with contextlib.suppress(OSError):
            tmp.unlink()
        raise
| 243 | |
| 244 | |
| 245 | def _fsync_robust(fd: int) -> None: |
| 246 | """Call fsync/F_BARRIERFSYNC, ignoring EINVAL (unsupported filesystems).""" |
| 247 | try: |
| 248 | if sys.platform == "darwin": |
| 249 | try: |
| 250 | fcntl.fcntl(fd, 85) # F_BARRIERFSYNC |
| 251 | except OSError: |
| 252 | os.fsync(fd) |
| 253 | else: |
| 254 | os.fsync(fd) |
| 255 | except OSError as exc: |
| 256 | if exc.errno != errno.EINVAL: |
| 257 | raise |
| 258 | |
| 259 | |
def _write_json_atomic(path: pathlib.Path, data: "dict[str, object]") -> None:
    """Atomically replace *path* with *data* encoded as compact UTF-8 JSON.

    Output uses ``","``/``":"`` separators, keeps non-ASCII characters
    as-is, and has no trailing newline. Encoding happens before any temp file
    exists, so a serialization error leaves nothing behind. The write itself
    follows mkstemp → write → flush → fsync → rename.

    The parent directory is checked for being a symlink only the first time
    this process writes into it; the result is cached in
    ``_validated_store_parents``. This function does not create the parent
    directory.
    """
    parent = path.parent
    parent_key = str(parent)
    if parent_key not in _validated_store_parents:
        assert_not_symlink(parent, label=f"write target parent ({parent.name}/)")
        _validated_store_parents.add(parent_key)

    blob = _json.dumps(data, ensure_ascii=False, separators=(",", ":")).encode("utf-8")

    fd, tmp_name = tempfile.mkstemp(dir=parent, prefix=".muse-tmp-")
    tmp_path = pathlib.Path(tmp_name)
    try:
        with os.fdopen(fd, "wb") as out:
            out.write(blob)
            out.flush()
            _fsync_robust(out.fileno())
        tmp_path.replace(path)
    except OSError:
        # Best-effort cleanup; the original error is what callers see.
        with contextlib.suppress(OSError):
            tmp_path.unlink()
        raise
| 283 | |
| 284 | |
def _write_shelf_header_atomic(path: pathlib.Path, data: "dict[str, object]") -> None:
    """Atomically write *data* as a shelf entry framed ``shelf <size>\\0<json>``.

    ``<json>`` is compact UTF-8 JSON with non-ASCII characters kept as-is.
    ``<size>`` is that JSON's length in encoded bytes, written as decimal
    text and terminated by a NUL byte. This is the same git-object-style
    header used for commits and snapshots in the unified object store.

    Crash safety comes from mkstemp → write → flush → fsync → rename. The
    parent directory's symlink check is cached per process in
    ``_validated_store_parents``.
    """
    target_dir = path.parent
    if str(target_dir) not in _validated_store_parents:
        assert_not_symlink(target_dir, label=f"write target parent ({target_dir.name}/)")
        _validated_store_parents.add(str(target_dir))

    body = _json.dumps(data, ensure_ascii=False, separators=(",", ":")).encode("utf-8")
    # The header length counts encoded bytes, not characters.
    framed = b"".join((f"shelf {len(body)}\0".encode("utf-8"), body))

    fd, tmp_name = tempfile.mkstemp(dir=target_dir, prefix=".muse-tmp-")
    staging = pathlib.Path(tmp_name)
    try:
        with os.fdopen(fd, "wb") as sink:
            sink.write(framed)
            sink.flush()
            _fsync_robust(sink.fileno())
        staging.replace(path)
    except OSError:
        # Best-effort cleanup; re-raise the original failure.
        with contextlib.suppress(OSError):
            staging.unlink()
        raise
File History
1 commit
sha256:2eaa5d95f9d9383498e76947410a26e5a3ba23d182f339910c424cf88fad412b
fix: try fetch/presign before fetch/mpack to avoid Cloudfla…
Sonnet 4.6
patch
7 days ago