"""muse.core.snapshots — snapshot layer for the Muse VCS. Everything that reads, writes, or queries snapshot records lives here. Public API ---------- SnapshotDict JSON-serialisable TypedDict for SnapshotRecord wire format. SnapshotRecord Immutable snapshot dataclass with to_dict / from_dict. SnapshotReadOk / SnapshotReadNotFound / SnapshotReadCorrupt Typed result variants for read operations. snapshot_path On-disk path helper. write_snapshot / read_snapshot Core snapshot I/O. read_snapshot_result Result-typed read that distinguishes not-found from corrupt. get_commit_snapshot_manifest / get_head_snapshot_manifest Convenience helpers that follow commit → snapshot → manifest. """ from __future__ import annotations import datetime import json as _json import logging import os import pathlib import tempfile from dataclasses import dataclass, field from typing import TypedDict, TypeGuard from muse.core.io import write_text_atomic # noqa: F401 — re-exported for callers from muse.core.object_store import object_path as _object_path from muse.core.record_helpers import _str_dict, _str_val from muse.core.snapshot import compute_snapshot_id from muse.core.types import Manifest, MsgpackDict from muse.core.validation import assert_not_symlink logger = logging.getLogger(__name__) _SNAPSHOT_SCHEMA_VERSION: int = 1 # --------------------------------------------------------------------------- # Wire-format TypedDict # --------------------------------------------------------------------------- class SnapshotDict(TypedDict): """JSON-serialisable representation of a SnapshotRecord.""" schema_version: int snapshot_id: str manifest: Manifest directories: list[str] created_at: str note: str # --------------------------------------------------------------------------- # SnapshotRecord dataclass # --------------------------------------------------------------------------- @dataclass class SnapshotRecord: """An immutable snapshot record stored as a JSON object under .muse/objects/. ``directories`` is the sorted list of workspace-relative POSIX directory paths that were explicitly tracked at snapshot time. It is included in the snapshot ID hash so that a directory rename produces a distinct snapshot even when file contents are unchanged. ``note`` is an optional human-readable label set at capture time. """ snapshot_id: str manifest: Manifest directories: list[str] = field(default_factory=list) created_at: datetime.datetime = field( default_factory=lambda: datetime.datetime.now(datetime.timezone.utc) ) note: str = "" schema_version: int = field(default=_SNAPSHOT_SCHEMA_VERSION) def to_dict(self) -> SnapshotDict: return SnapshotDict( schema_version=self.schema_version, snapshot_id=self.snapshot_id, manifest=self.manifest, directories=list(self.directories), created_at=self.created_at.isoformat(), note=self.note, ) @classmethod def from_dict(cls, d: "MsgpackDict | SnapshotDict") -> "SnapshotRecord": """Deserialise a :class:`SnapshotRecord` from a plain dict.""" created_at_str = _str_val(d, "created_at") try: created_at = datetime.datetime.fromisoformat(created_at_str) except ValueError as exc: raise ValueError( f"Snapshot record has missing or unparseable created_at " f"({created_at_str!r}): {exc}" ) from exc raw_dirs = d.get("directories") directories = ( [v for v in raw_dirs if isinstance(v, str)] if isinstance(raw_dirs, list) else [] ) return cls( snapshot_id=_str_val(d, "snapshot_id"), manifest=_str_dict(d, "manifest"), directories=directories, created_at=created_at, note=_str_val(d, "note"), schema_version=int(d.get("schema_version", 1)), ) # --------------------------------------------------------------------------- # Path helper # --------------------------------------------------------------------------- def snapshot_path(repo_root: pathlib.Path, snapshot_id: str) -> pathlib.Path: """Return the on-disk path for a snapshot record in the unified object store. Path shape: ``.muse/objects///`` Snapshots are stored in the unified object store alongside blobs and commits. The on-disk format is ``snapshot \\0``. """ return _object_path(repo_root, snapshot_id) # --------------------------------------------------------------------------- # Internal helpers # --------------------------------------------------------------------------- def _verify_snapshot_id( record: SnapshotRecord, expected_id: str, path: pathlib.Path ) -> None: """Re-derive the snapshot ID from the manifest and assert it matches *expected_id*. The snapshot ID is a hash of every ``path → object_id`` pair in the manifest, so any bit flip in any file path or object ID — however subtle — produces a different hash. This catches the class of corruptions that keep msgpack structure valid while silently altering manifest entries. Raises: OSError: If the recomputed ID does not match *expected_id*, indicating silent manifest corruption. """ recomputed = compute_snapshot_id(record.manifest, record.directories) if recomputed != expected_id: logger.critical( "❌ Snapshot %s failed content-hash verification — " "manifest entries are corrupt. Expected %s, recomputed %s. " "Run `muse verify-pack` to audit the full store.", expected_id, expected_id, recomputed, ) raise OSError( f"Snapshot {expected_id} failed content-hash verification. " f"One or more manifest entries (file paths or object IDs) have " f"been silently corrupted in {path.name}. " "Run `muse verify-pack` to audit the full store." ) # --------------------------------------------------------------------------- # Snapshot I/O # --------------------------------------------------------------------------- def write_snapshot(repo_root: pathlib.Path, snapshot: SnapshotRecord, *, sync: bool = True) -> None: """Persist a snapshot record to the unified object store.""" try: _verify_snapshot_id(snapshot, snapshot.snapshot_id, pathlib.Path("")) except OSError as exc: raise ValueError( f"Refusing to write snapshot {snapshot.snapshot_id!r}: " f"incoming record failed hash verification — {exc}" ) from exc path = snapshot_path(repo_root, snapshot.snapshot_id) # Symlink guard runs before any I/O — path.exists() would resolve through # a symlinked shard dir and land outside the repo. if path.parent.exists(): assert_not_symlink(path.parent, label=f"write target parent ({path.parent.name}/)") if path.exists(): # Purely idempotent: first writer wins. Corruption is detected at # read time by read_snapshot — write_snapshot never repairs corrupt files. from muse.core.types import short_id logger.debug("⚠️ Snapshot %s already exists — skipped", short_id(snapshot.snapshot_id)) return path.parent.mkdir(parents=True, exist_ok=True) assert_not_symlink(path.parent, label=f"write target parent ({path.parent.name}/)") json_bytes = _json.dumps(snapshot.to_dict()).encode() content = f"snapshot {len(json_bytes)}\x00".encode() + json_bytes fd, tmp_str = tempfile.mkstemp(dir=path.parent, prefix=".muse-tmp-") tmp = pathlib.Path(tmp_str) try: with os.fdopen(fd, "wb") as fh: fh.write(content) fh.flush() if sync: try: os.fsync(fh.fileno()) except OSError: pass # fsync is best-effort; atomic rename already ensures durability tmp.replace(path) except OSError: tmp.unlink(missing_ok=True) raise from muse.core.types import short_id logger.debug( "✅ Stored snapshot %s (%d files, %d dirs)", short_id(snapshot.snapshot_id), len(snapshot.manifest), len(snapshot.directories), ) def read_snapshot(repo_root: pathlib.Path, snapshot_id: str) -> SnapshotRecord | None: """Load a snapshot record by ID, or ``None`` if it does not exist or is corrupt. Every read re-verifies the snapshot ID by recomputing it from the stored manifest. Any bit flip that alters a file path or object ID in the manifest — even without breaking JSON structure — is caught here. Callers that need to distinguish "not found" from "corrupt" should use :func:`read_snapshot_result` instead. Callers that accept user-supplied or remote-supplied snapshot IDs should validate the ID with :func:`~muse.core.validation.validate_ref_id` before calling this function. This function itself accepts any string to support internal uses with computed IDs. """ path = snapshot_path(repo_root, snapshot_id) if not path.exists(): return None try: raw = path.read_bytes() nl = raw.index(b"\x00") record = SnapshotRecord.from_dict(_json.loads(raw[nl + 1:])) _verify_snapshot_id(record, snapshot_id, path) return record except Exception as exc: logger.critical("❌ Corrupt snapshot file %s: %s", path, exc) return None # --------------------------------------------------------------------------- # Typed result variants # --------------------------------------------------------------------------- class SnapshotReadOk(TypedDict): status: str snapshot: SnapshotRecord class SnapshotReadNotFound(TypedDict): status: str class SnapshotReadCorrupt(TypedDict): status: str path: str error: str def snapshot_read_is_ok( r: SnapshotReadOk | SnapshotReadNotFound | SnapshotReadCorrupt, ) -> TypeGuard[SnapshotReadOk]: """``True`` when *r* is a successful :func:`read_snapshot_result`.""" return r["status"] == "ok" def snapshot_read_is_corrupt( r: SnapshotReadOk | SnapshotReadNotFound | SnapshotReadCorrupt, ) -> TypeGuard[SnapshotReadCorrupt]: """``True`` when *r* represents a corrupt snapshot file.""" return r["status"] == "corrupt" def read_snapshot_result( repo_root: pathlib.Path, snapshot_id: str ) -> SnapshotReadOk | SnapshotReadNotFound | SnapshotReadCorrupt: """Load a snapshot with a typed result that distinguishes all outcomes. Returns one of: * ``{"status": "ok", "snapshot": SnapshotRecord}`` * ``{"status": "not_found"}`` * ``{"status": "corrupt", "path": str, "error": str}`` """ path = snapshot_path(repo_root, snapshot_id) if not path.exists(): return SnapshotReadNotFound(status="not_found") try: raw = path.read_bytes() nl = raw.index(b"\x00") record = SnapshotRecord.from_dict(_json.loads(raw[nl + 1:])) _verify_snapshot_id(record, snapshot_id, path) return SnapshotReadOk(status="ok", snapshot=record) except Exception as exc: logger.critical("❌ Corrupt snapshot file %s: %s", path, exc) return SnapshotReadCorrupt(status="corrupt", path=str(path), error=str(exc)) # --------------------------------------------------------------------------- # Manifest convenience helpers # --------------------------------------------------------------------------- def get_commit_snapshot_manifest( repo_root: pathlib.Path, commit_id: str ) -> Manifest | None: """Return the file manifest for the snapshot attached to *commit_id*, or ``None``.""" from muse.core.commits import read_commit # local to avoid circular import commit = read_commit(repo_root, commit_id) if commit is None: logger.warning("⚠️ Commit %s not found", commit_id) return None snapshot = read_snapshot(repo_root, commit.snapshot_id) if snapshot is None: logger.warning( "⚠️ Snapshot %s referenced by commit %s not found", commit.snapshot_id, commit_id, ) return None return dict(snapshot.manifest) def get_head_snapshot_manifest( repo_root: pathlib.Path, branch: str ) -> Manifest | None: """Return the manifest of the most recent commit on *branch*, or ``None``.""" from muse.core.commits import get_head_snapshot_id # local to avoid circular import snapshot_id = get_head_snapshot_id(repo_root, branch) if snapshot_id is None: return None snapshot = read_snapshot(repo_root, snapshot_id) if snapshot is None: return None return dict(snapshot.manifest)