snapshots.py
python
sha256:18b983389ee1b55900fcd799bfbb496552d2e3ecded9d18cefbfef188947a12e
chore: remove blob-debug test marker file
Sonnet 4.6
1 day ago
| 1 | """muse.core.snapshots — snapshot layer for the Muse VCS. |
| 2 | |
| 3 | Everything that reads, writes, or queries snapshot records lives here. |
| 4 | |
| 5 | Public API |
| 6 | ---------- |
| 7 | SnapshotDict |
| 8 | JSON-serialisable TypedDict for SnapshotRecord wire format. |
| 9 | |
| 10 | SnapshotRecord |
| 11 | Immutable snapshot dataclass with to_dict / from_dict. |
| 12 | |
| 13 | SnapshotReadOk / SnapshotReadNotFound / SnapshotReadCorrupt |
| 14 | Typed result variants for read operations. |
| 15 | |
| 16 | snapshot_path |
| 17 | On-disk path helper. |
| 18 | |
| 19 | write_snapshot / read_snapshot |
| 20 | Core snapshot I/O. |
| 21 | |
| 22 | read_snapshot_result |
| 23 | Result-typed read that distinguishes not-found from corrupt. |
| 24 | |
| 25 | get_commit_snapshot_manifest / get_head_snapshot_manifest |
| 26 | Convenience helpers that follow commit → snapshot → manifest. |
| 27 | """ |
| 28 | from __future__ import annotations |
| 29 | |
| 30 | import datetime |
| 31 | import json as _json |
| 32 | import logging |
| 33 | import os |
| 34 | import pathlib |
| 35 | import tempfile |
| 36 | from dataclasses import dataclass, field |
| 37 | from typing import TypedDict, TypeGuard |
| 38 | |
| 39 | from muse.core.io import write_text_atomic # noqa: F401 — re-exported for callers |
| 40 | from muse.core.object_store import object_path as _object_path |
| 41 | from muse.core.record_helpers import _str_dict, _str_val |
| 42 | from muse.core.snapshot import compute_snapshot_id |
| 43 | from muse.core.types import Manifest, MsgpackDict |
| 44 | from muse.core.validation import assert_not_symlink |
| 45 | |
# Module-scoped logger; every log call in this file goes through it.
logger = logging.getLogger(__name__)

# Schema version stamped on snapshot records created without an explicit one
# (it is the default for ``SnapshotRecord.schema_version``).
_SNAPSHOT_SCHEMA_VERSION: int = 1
| 49 | |
| 50 | |
| 51 | # --------------------------------------------------------------------------- |
| 52 | # Wire-format TypedDict |
| 53 | # --------------------------------------------------------------------------- |
| 54 | |
class SnapshotDict(TypedDict):
    """Wire-format shape of a :class:`SnapshotRecord` once serialised to JSON.

    This is the mapping produced by ``SnapshotRecord.to_dict`` and accepted
    by ``SnapshotRecord.from_dict``.
    """

    schema_version: int
    snapshot_id: str
    manifest: Manifest
    directories: list[str]
    # Text form of the timestamp; ``to_dict`` writes ``created_at.isoformat()``.
    created_at: str
    note: str
| 64 | |
| 65 | |
| 66 | # --------------------------------------------------------------------------- |
| 67 | # SnapshotRecord dataclass |
| 68 | # --------------------------------------------------------------------------- |
| 69 | |
@dataclass
class SnapshotRecord:
    """A snapshot record stored as a JSON object under .muse/objects/.

    Records are meant to be treated as immutable once created, but the
    dataclass is not frozen, so that is a convention rather than something
    this class enforces.

    ``directories`` is the sorted list of workspace-relative POSIX directory
    paths that were explicitly tracked at snapshot time.  It is included in
    the snapshot ID hash so that a directory rename produces a distinct
    snapshot even when file contents are unchanged.

    ``note`` is an optional human-readable label set at capture time.
    """

    snapshot_id: str
    manifest: Manifest
    directories: list[str] = field(default_factory=list)
    # Defaults to the current time as a timezone-aware UTC datetime.
    created_at: datetime.datetime = field(
        default_factory=lambda: datetime.datetime.now(datetime.timezone.utc)
    )
    note: str = ""
    schema_version: int = field(default=_SNAPSHOT_SCHEMA_VERSION)

    def to_dict(self) -> SnapshotDict:
        """Serialise this record into its JSON-ready :class:`SnapshotDict` form.

        Both containers (``manifest`` and ``directories``) are shallow-copied,
        so mutating the returned dict never alters this record.
        ``created_at`` is rendered with ``datetime.isoformat()``.
        """
        return SnapshotDict(
            schema_version=self.schema_version,
            snapshot_id=self.snapshot_id,
            # Copy, like ``directories`` below, instead of leaking the live mapping.
            manifest=dict(self.manifest),
            directories=list(self.directories),
            created_at=self.created_at.isoformat(),
            note=self.note,
        )

    @classmethod
    def from_dict(cls, d: "MsgpackDict | SnapshotDict") -> "SnapshotRecord":
        """Deserialise a :class:`SnapshotRecord` from a plain dict.

        Args:
            d: Mapping carrying the fields written by :meth:`to_dict`.

        Returns:
            A new record built from *d*.

        Raises:
            ValueError: If ``created_at`` cannot be parsed by
                ``datetime.fromisoformat``.  ``int()`` on a malformed
                ``schema_version`` may also raise ``ValueError``/``TypeError``.
        """
        # NOTE(review): _str_val / _str_dict are project helpers; presumably
        # they coerce a missing or mistyped value to an empty str / dict —
        # confirm in muse.core.record_helpers.
        created_at_str = _str_val(d, "created_at")
        try:
            created_at = datetime.datetime.fromisoformat(created_at_str)
        except ValueError as exc:
            raise ValueError(
                f"Snapshot record has missing or unparseable created_at "
                f"({created_at_str!r}): {exc}"
            ) from exc
        # A non-list "directories" value becomes []; non-str entries are dropped.
        raw_dirs = d.get("directories")
        directories = (
            [v for v in raw_dirs if isinstance(v, str)]
            if isinstance(raw_dirs, list)
            else []
        )
        return cls(
            snapshot_id=_str_val(d, "snapshot_id"),
            manifest=_str_dict(d, "manifest"),
            directories=directories,
            created_at=created_at,
            note=_str_val(d, "note"),
            # Records without the field are read as schema version 1.
            schema_version=int(d.get("schema_version", 1)),
        )
| 126 | |
| 127 | |
| 128 | # --------------------------------------------------------------------------- |
| 129 | # Path helper |
| 130 | # --------------------------------------------------------------------------- |
| 131 | |
def snapshot_path(repo_root: pathlib.Path, snapshot_id: str) -> pathlib.Path:
    """Locate the object-store file that holds snapshot *snapshot_id*.

    The lookup is delegated unchanged to the object store's path helper, so
    snapshots share one layout with blobs and commits:
    ``.muse/objects/<algo>/<shard-2>/<hex-62>``.

    The file found there is laid out as ``snapshot <size>\\0<json>``.
    """
    location = _object_path(repo_root, snapshot_id)
    return location
| 141 | |
| 142 | |
| 143 | # --------------------------------------------------------------------------- |
| 144 | # Internal helpers |
| 145 | # --------------------------------------------------------------------------- |
| 146 | |
def _verify_snapshot_id(
    record: SnapshotRecord, expected_id: str, path: pathlib.Path
) -> None:
    """Check that *record* hashes to *expected_id*.

    The ID is recomputed with ``compute_snapshot_id`` from the record's
    manifest and directory list and compared with *expected_id*.  This is
    meant to catch corruption that leaves the stored record well-formed
    while altering a manifest path or object ID.

    Args:
        record: The snapshot record to check.
        expected_id: The ID the record is supposed to hash to.
        path: Source of the record; only ``path.name`` is used, in the
            error message.

    Raises:
        OSError: If the recomputed ID differs from *expected_id*.  A
            CRITICAL log entry is emitted first.
    """
    actual_id = compute_snapshot_id(record.manifest, record.directories)
    if actual_id == expected_id:
        return

    logger.critical(
        "❌ Snapshot %s failed content-hash verification — "
        "manifest entries are corrupt. Expected %s, recomputed %s. "
        "Run `muse verify-pack` to audit the full store.",
        expected_id,
        expected_id,
        actual_id,
    )
    raise OSError(
        f"Snapshot {expected_id} failed content-hash verification. "
        f"One or more manifest entries (file paths or object IDs) have "
        f"been silently corrupted in {path.name}. "
        "Run `muse verify-pack` to audit the full store."
    )
| 177 | |
| 178 | |
| 179 | # --------------------------------------------------------------------------- |
| 180 | # Snapshot I/O |
| 181 | # --------------------------------------------------------------------------- |
| 182 | |
def write_snapshot(repo_root: pathlib.Path, snapshot: SnapshotRecord, *, sync: bool = True) -> None:
    """Persist a snapshot record to the unified object store.

    The record's ID is re-derived and checked before anything touches disk.
    If a file already exists at the target path the call returns without
    writing (first writer wins).  Otherwise the payload
    ``snapshot <size>\\0<json>`` is written to a temporary file in the target
    directory and renamed over the final path.

    Args:
        repo_root: Repository root handed to :func:`snapshot_path`.
        snapshot: The record to store.
        sync: When true, ``os.fsync`` is attempted on the temporary file
            before the rename; an ``OSError`` from fsync is ignored.

    Raises:
        ValueError: If *snapshot* fails hash verification.
        OSError: If writing or renaming the temporary file fails.  The
            temporary file is removed before the error propagates.
    """
    from muse.core.types import short_id

    try:
        _verify_snapshot_id(snapshot, snapshot.snapshot_id, pathlib.Path("<incoming>"))
    except OSError as exc:
        raise ValueError(
            f"Refusing to write snapshot {snapshot.snapshot_id!r}: "
            f"incoming record failed hash verification — {exc}"
        ) from exc
    path = snapshot_path(repo_root, snapshot.snapshot_id)
    # Symlink guard runs before any I/O — path.exists() would resolve through
    # a symlinked shard dir and land outside the repo.
    if path.parent.exists():
        assert_not_symlink(path.parent, label=f"write target parent ({path.parent.name}/)")
    if path.exists():
        # Purely idempotent: first writer wins.  Corruption is detected at
        # read time by read_snapshot — write_snapshot never repairs corrupt files.
        logger.debug("⚠️ Snapshot %s already exists — skipped", short_id(snapshot.snapshot_id))
        return
    path.parent.mkdir(parents=True, exist_ok=True)
    # Re-check after mkdir: the shard directory may only exist from this point.
    assert_not_symlink(path.parent, label=f"write target parent ({path.parent.name}/)")
    json_bytes = _json.dumps(snapshot.to_dict()).encode()
    content = f"snapshot {len(json_bytes)}\x00".encode() + json_bytes
    # The temp file lives in the destination directory so the final rename
    # stays within one directory.
    fd, tmp_str = tempfile.mkstemp(dir=path.parent, prefix=".muse-tmp-")
    tmp = pathlib.Path(tmp_str)
    try:
        with os.fdopen(fd, "wb") as fh:
            fh.write(content)
            fh.flush()
            if sync:
                try:
                    os.fsync(fh.fileno())
                except OSError:
                    # Best-effort: a failed fsync must not fail the write.
                    # The rename below still publishes the file atomically.
                    pass
        tmp.replace(path)
    except BaseException:
        # Clean up on *any* failure, not only OSError, so an interrupted or
        # otherwise aborted write never leaves a .muse-tmp-* file behind.
        # After a successful replace the temp name is gone and this is a no-op.
        tmp.unlink(missing_ok=True)
        raise
    logger.debug(
        "✅ Stored snapshot %s (%d files, %d dirs)",
        short_id(snapshot.snapshot_id),
        len(snapshot.manifest),
        len(snapshot.directories),
    )
| 229 | |
| 230 | |
def read_snapshot(repo_root: pathlib.Path, snapshot_id: str) -> SnapshotRecord | None:
    """Load a snapshot record by ID, or ``None`` if it does not exist or is corrupt.

    This is a thin wrapper over :func:`read_snapshot_result`, which performs
    the actual read, parse and ID re-verification (and logs corrupt files at
    CRITICAL level).  Both the not-found and the corrupt outcome collapse to
    ``None`` here; callers that need to tell them apart should call
    :func:`read_snapshot_result` directly.

    Callers that accept user-supplied or remote-supplied snapshot IDs should
    validate the ID with :func:`~muse.core.validation.validate_ref_id` before
    calling this function.  This function itself accepts any string to support
    internal uses with computed IDs.

    Args:
        repo_root: Repository root used to locate the object store.
        snapshot_id: ID of the snapshot to load.

    Returns:
        The verified record, or ``None``.
    """
    # Single source of truth for the read path: delegating keeps this function
    # and read_snapshot_result from drifting apart.
    result = read_snapshot_result(repo_root, snapshot_id)
    if snapshot_read_is_ok(result):
        return result["snapshot"]
    return None
| 258 | |
| 259 | |
| 260 | # --------------------------------------------------------------------------- |
| 261 | # Typed result variants |
| 262 | # --------------------------------------------------------------------------- |
| 263 | |
class SnapshotReadOk(TypedDict):
    """Result variant for a snapshot that was read and verified.

    ``read_snapshot_result`` builds it with ``status="ok"``.
    """

    status: str
    snapshot: SnapshotRecord
| 267 | |
| 268 | |
class SnapshotReadNotFound(TypedDict):
    """Result variant for a snapshot with no file on disk.

    ``read_snapshot_result`` builds it with ``status="not_found"``.
    """

    status: str
| 271 | |
| 272 | |
class SnapshotReadCorrupt(TypedDict):
    """Result variant for a snapshot file that exists but could not be loaded.

    ``read_snapshot_result`` builds it with ``status="corrupt"``, the file
    path as text, and the text of the exception that was caught.
    """

    status: str
    path: str
    error: str
| 277 | |
| 278 | |
| 279 | def snapshot_read_is_ok( |
| 280 | r: SnapshotReadOk | SnapshotReadNotFound | SnapshotReadCorrupt, |
| 281 | ) -> TypeGuard[SnapshotReadOk]: |
| 282 | """``True`` when *r* is a successful :func:`read_snapshot_result`.""" |
| 283 | return r["status"] == "ok" |
| 284 | |
| 285 | |
| 286 | def snapshot_read_is_corrupt( |
| 287 | r: SnapshotReadOk | SnapshotReadNotFound | SnapshotReadCorrupt, |
| 288 | ) -> TypeGuard[SnapshotReadCorrupt]: |
| 289 | """``True`` when *r* represents a corrupt snapshot file.""" |
| 290 | return r["status"] == "corrupt" |
| 291 | |
| 292 | |
def read_snapshot_result(
    repo_root: pathlib.Path, snapshot_id: str
) -> SnapshotReadOk | SnapshotReadNotFound | SnapshotReadCorrupt:
    """Load a snapshot with a typed result that distinguishes all outcomes.

    The file is read, everything after the first NUL byte is parsed as JSON
    into a :class:`SnapshotRecord`, and the record's ID is re-derived and
    compared with *snapshot_id*.

    Args:
        repo_root: Repository root used to locate the object store.
        snapshot_id: ID of the snapshot to load.

    Returns one of:

    * ``{"status": "ok", "snapshot": SnapshotRecord}``
    * ``{"status": "not_found"}`` — no file at the snapshot path, including
      the case where it disappears between the existence check and the read
    * ``{"status": "corrupt", "path": str, "error": str}`` — any other
      failure while reading, parsing or verifying; also logged at CRITICAL
    """
    path = snapshot_path(repo_root, snapshot_id)
    if not path.exists():
        return SnapshotReadNotFound(status="not_found")
    try:
        try:
            raw = path.read_bytes()
        except FileNotFoundError:
            # The file was removed after the exists() check above.  That is a
            # missing snapshot, not a corrupt one, so do not raise the alarm.
            return SnapshotReadNotFound(status="not_found")
        # The header ("snapshot <size>") ends at the first NUL; the JSON
        # payload is everything after it.
        header_end = raw.index(b"\x00")
        record = SnapshotRecord.from_dict(_json.loads(raw[header_end + 1:]))
        _verify_snapshot_id(record, snapshot_id, path)
        return SnapshotReadOk(status="ok", snapshot=record)
    except Exception as exc:
        # Deliberately broad: this is the boundary that turns every failure
        # mode into a typed "corrupt" result instead of an exception.
        logger.critical("❌ Corrupt snapshot file %s: %s", path, exc)
        return SnapshotReadCorrupt(status="corrupt", path=str(path), error=str(exc))
| 316 | |
| 317 | |
| 318 | # --------------------------------------------------------------------------- |
| 319 | # Manifest convenience helpers |
| 320 | # --------------------------------------------------------------------------- |
| 321 | |
def get_commit_snapshot_manifest(
    repo_root: pathlib.Path, commit_id: str
) -> Manifest | None:
    """Follow *commit_id* to its snapshot and return a copy of that manifest.

    Returns ``None``, after logging a warning, when the commit cannot be read
    or when :func:`read_snapshot` yields nothing for the commit's snapshot ID.
    """
    from muse.core.commits import read_commit  # local to avoid circular import

    commit = read_commit(repo_root, commit_id)
    if commit is None:
        logger.warning("⚠️ Commit %s not found", commit_id)
        return None

    record = read_snapshot(repo_root, commit.snapshot_id)
    if record is not None:
        # Return a fresh dict so callers cannot mutate the record's manifest.
        return dict(record.manifest)

    logger.warning(
        "⚠️ Snapshot %s referenced by commit %s not found",
        commit.snapshot_id,
        commit_id,
    )
    return None
| 340 | |
| 341 | |
def get_head_snapshot_manifest(
    repo_root: pathlib.Path, branch: str
) -> Manifest | None:
    """Return a copy of the manifest for the head snapshot of *branch*.

    Yields ``None`` when ``get_head_snapshot_id`` reports no snapshot for the
    branch, or when :func:`read_snapshot` returns nothing for that ID.
    """
    from muse.core.commits import get_head_snapshot_id  # local to avoid circular import

    head_id = get_head_snapshot_id(repo_root, branch)
    record = None if head_id is None else read_snapshot(repo_root, head_id)
    # A fresh dict keeps callers from mutating the record's own manifest.
    return None if record is None else dict(record.manifest)
File History
5 commits
sha256:18b983389ee1b55900fcd799bfbb496552d2e3ecded9d18cefbfef188947a12e
chore: remove blob-debug test marker file
Sonnet 4.6
1 day ago
sha256:e452ad9a6ace6ccc6d875a35e06caf9da5576a970c1c36133b69a891ce5fefa8
chore: prebuild timing test
Sonnet 4.6
8 days ago
sha256:0008ab6695e3e064b3e236b24fd19e538fef6a588eb0d211622f4466d919c0b1
merge: pull staging/dev — advance to 0.2.0rc12
Sonnet 4.6
patch
10 days ago
sha256:9c33d61749fff814c5226d5386aa2af7064c2c02788594a25fdd709358132eea
fix: _PROPOSAL_PREFIX_RESOLVE_LIMIT 200 → 100 to match hub …
Sonnet 4.6
21 days ago
sha256:36c3cb3e76619d4c30a6d9bf81b5ec4ff148e30dcfed913e3114ca7b43b81c7e
fix: rename objects→blobs in push client and all stale test…
Sonnet 4.6
patch
24 days ago