gabriel / muse public
snapshots.py python
353 lines 12.7 KB
Raw
sha256:18b983389ee1b55900fcd799bfbb496552d2e3ecded9d18cefbfef188947a12e chore: remove blob-debug test marker file Sonnet 4.6 1 day ago
1 """muse.core.snapshots — snapshot layer for the Muse VCS.
2
3 Everything that reads, writes, or queries snapshot records lives here.
4
5 Public API
6 ----------
7 SnapshotDict
8 JSON-serialisable TypedDict for SnapshotRecord wire format.
9
10 SnapshotRecord
11 Immutable snapshot dataclass with to_dict / from_dict.
12
13 SnapshotReadOk / SnapshotReadNotFound / SnapshotReadCorrupt
14 Typed result variants for read operations.
15
16 snapshot_path
17 On-disk path helper.
18
19 write_snapshot / read_snapshot
20 Core snapshot I/O.
21
22 read_snapshot_result
23 Result-typed read that distinguishes not-found from corrupt.
24
25 get_commit_snapshot_manifest / get_head_snapshot_manifest
26 Convenience helpers that follow commit → snapshot → manifest.
27 """
28 from __future__ import annotations
29
30 import datetime
31 import json as _json
32 import logging
33 import os
34 import pathlib
35 import tempfile
36 from dataclasses import dataclass, field
37 from typing import TypedDict, TypeGuard
38
39 from muse.core.io import write_text_atomic # noqa: F401 — re-exported for callers
40 from muse.core.object_store import object_path as _object_path
41 from muse.core.record_helpers import _str_dict, _str_val
42 from muse.core.snapshot import compute_snapshot_id
43 from muse.core.types import Manifest, MsgpackDict
44 from muse.core.validation import assert_not_symlink
45
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Schema version used as the default for ``SnapshotRecord.schema_version``
# when a record is constructed without an explicit version.
_SNAPSHOT_SCHEMA_VERSION: int = 1
49
50
51 # ---------------------------------------------------------------------------
52 # Wire-format TypedDict
53 # ---------------------------------------------------------------------------
54
class SnapshotDict(TypedDict):
    """JSON-serialisable representation of a SnapshotRecord.

    This is the shape returned by ``SnapshotRecord.to_dict``. Every field is
    a plain JSON-compatible value; in particular ``created_at`` is carried as
    text rather than as a ``datetime`` object.
    """

    schema_version: int
    snapshot_id: str
    manifest: Manifest
    directories: list[str]
    created_at: str  # output of ``datetime.isoformat()``
    note: str
64
65
66 # ---------------------------------------------------------------------------
67 # SnapshotRecord dataclass
68 # ---------------------------------------------------------------------------
69
@dataclass
class SnapshotRecord:
    """A snapshot record stored as a JSON object under .muse/objects/.

    Records are treated as immutable by convention; the dataclass is not
    frozen, so nothing enforces this at runtime.

    ``directories`` is the sorted list of workspace-relative POSIX directory
    paths that were explicitly tracked at snapshot time (the list is stored
    as supplied — this class does not sort it). It is included in the
    snapshot ID hash so that a directory rename produces a distinct snapshot
    even when file contents are unchanged.

    ``note`` is an optional human-readable label set at capture time.

    ``created_at`` defaults to the current time in UTC and
    ``schema_version`` defaults to ``_SNAPSHOT_SCHEMA_VERSION``.
    """

    snapshot_id: str
    manifest: Manifest
    directories: list[str] = field(default_factory=list)
    created_at: datetime.datetime = field(
        default_factory=lambda: datetime.datetime.now(datetime.timezone.utc)
    )
    note: str = ""
    schema_version: int = field(default=_SNAPSHOT_SCHEMA_VERSION)

    def to_dict(self) -> SnapshotDict:
        """Serialise this record to its JSON-compatible wire format.

        Both container fields are shallow-copied so that mutating the
        returned dict can never alter this record's ``manifest`` or
        ``directories``.
        """
        return SnapshotDict(
            schema_version=self.schema_version,
            snapshot_id=self.snapshot_id,
            # Copy, matching the treatment of ``directories`` below; handing
            # out the live dict would let callers mutate the record.
            manifest=dict(self.manifest),
            directories=list(self.directories),
            created_at=self.created_at.isoformat(),
            note=self.note,
        )

    @classmethod
    def from_dict(cls, d: "MsgpackDict | SnapshotDict") -> "SnapshotRecord":
        """Deserialise a :class:`SnapshotRecord` from a plain dict.

        ``directories`` is optional: when the key is absent or is not a list
        the record gets an empty list, and non-string entries are dropped.
        ``schema_version`` falls back to ``1`` when the key is absent.

        Raises:
            ValueError: If ``created_at`` cannot be parsed by
                ``datetime.fromisoformat``.
        """
        created_at_str = _str_val(d, "created_at")
        try:
            created_at = datetime.datetime.fromisoformat(created_at_str)
        except ValueError as exc:
            raise ValueError(
                f"Snapshot record has missing or unparseable created_at "
                f"({created_at_str!r}): {exc}"
            ) from exc
        raw_dirs = d.get("directories")
        directories = (
            [v for v in raw_dirs if isinstance(v, str)]
            if isinstance(raw_dirs, list)
            else []
        )
        return cls(
            snapshot_id=_str_val(d, "snapshot_id"),
            manifest=_str_dict(d, "manifest"),
            directories=directories,
            created_at=created_at,
            note=_str_val(d, "note"),
            schema_version=int(d.get("schema_version", 1)),
        )
126
127
128 # ---------------------------------------------------------------------------
129 # Path helper
130 # ---------------------------------------------------------------------------
131
def snapshot_path(repo_root: pathlib.Path, snapshot_id: str) -> pathlib.Path:
    """Locate the object-store file that holds snapshot *snapshot_id*.

    Snapshot records live in the unified object store next to blobs and
    commits, so the lookup is handed to the object-store path helper.

    Path shape: ``.muse/objects/<algo>/<shard-2>/<hex-62>``

    On-disk format: ``snapshot <size>\\0<json>``.
    """
    location = _object_path(repo_root, snapshot_id)
    return location
141
142
143 # ---------------------------------------------------------------------------
144 # Internal helpers
145 # ---------------------------------------------------------------------------
146
def _verify_snapshot_id(
    record: SnapshotRecord, expected_id: str, path: pathlib.Path
) -> None:
    """Check that *record* still hashes to *expected_id*.

    The ID is recomputed with ``compute_snapshot_id`` from the record's
    manifest and directory list. A record whose serialised form is still
    structurally valid but whose entries were altered yields a different ID
    and is rejected here.

    *path* is used only to name the offending file in the error message.

    Raises:
        OSError: If the recomputed ID differs from *expected_id*. A
            critical-level log entry is written before raising.
    """
    actual_id = compute_snapshot_id(record.manifest, record.directories)
    if actual_id == expected_id:
        return

    logger.critical(
        "❌ Snapshot %s failed content-hash verification — "
        "manifest entries are corrupt. Expected %s, recomputed %s. "
        "Run `muse verify-pack` to audit the full store.",
        expected_id,
        expected_id,
        actual_id,
    )
    failure = (
        f"Snapshot {expected_id} failed content-hash verification. "
        f"One or more manifest entries (file paths or object IDs) have "
        f"been silently corrupted in {path.name}. "
        "Run `muse verify-pack` to audit the full store."
    )
    raise OSError(failure)
177
178
179 # ---------------------------------------------------------------------------
180 # Snapshot I/O
181 # ---------------------------------------------------------------------------
182
def write_snapshot(repo_root: pathlib.Path, snapshot: SnapshotRecord, *, sync: bool = True) -> None:
    """Persist a snapshot record to the unified object store.

    The record is first re-hashed and refused if its ID does not match its
    contents. If a file already exists at the target path the call is a
    no-op (first writer wins). Otherwise the payload
    ``snapshot <size>\\0<json>`` is written to a temporary file in the
    target directory and renamed into place.

    Args:
        repo_root: Repository root passed to ``snapshot_path``.
        snapshot: Record to store.
        sync: When true, ``fsync`` the temporary file before the rename.
            A failing ``fsync`` is ignored.

    Raises:
        ValueError: If the incoming record fails hash verification.
        OSError: If creating, writing or renaming the temporary file fails.
            The temporary file is removed before the error propagates.
    """
    # Imported once here instead of separately on each logging path.
    from muse.core.types import short_id

    try:
        _verify_snapshot_id(snapshot, snapshot.snapshot_id, pathlib.Path("<incoming>"))
    except OSError as exc:
        raise ValueError(
            f"Refusing to write snapshot {snapshot.snapshot_id!r}: "
            f"incoming record failed hash verification — {exc}"
        ) from exc
    path = snapshot_path(repo_root, snapshot.snapshot_id)
    # Symlink guard runs before any I/O — path.exists() would resolve through
    # a symlinked shard dir and land outside the repo.
    if path.parent.exists():
        assert_not_symlink(path.parent, label=f"write target parent ({path.parent.name}/)")
    if path.exists():
        # Purely idempotent: first writer wins. Corruption is detected at
        # read time by read_snapshot — write_snapshot never repairs corrupt files.
        logger.debug("⚠️ Snapshot %s already exists — skipped", short_id(snapshot.snapshot_id))
        return
    path.parent.mkdir(parents=True, exist_ok=True)
    # Re-check after mkdir: the directory may have just been created or swapped.
    assert_not_symlink(path.parent, label=f"write target parent ({path.parent.name}/)")
    json_bytes = _json.dumps(snapshot.to_dict()).encode()
    content = f"snapshot {len(json_bytes)}\x00".encode() + json_bytes
    fd, tmp_str = tempfile.mkstemp(dir=path.parent, prefix=".muse-tmp-")
    tmp = pathlib.Path(tmp_str)
    try:
        with os.fdopen(fd, "wb") as fh:
            fh.write(content)
            fh.flush()
            if sync:
                try:
                    os.fsync(fh.fileno())
                except OSError:
                    # fsync is best-effort; the rename below keeps the update
                    # atomic even when the flush to disk could not be forced.
                    pass
        tmp.replace(path)
    except BaseException:
        # Clean up on *any* failure, not just OSError: an interrupt or an
        # unexpected error mid-write must not leave .muse-tmp-* files behind.
        try:
            tmp.unlink(missing_ok=True)
        except OSError:
            pass  # cleanup is best-effort; the original error is what matters
        raise
    logger.debug(
        "✅ Stored snapshot %s (%d files, %d dirs)",
        short_id(snapshot.snapshot_id),
        len(snapshot.manifest),
        len(snapshot.directories),
    )
229
230
def read_snapshot(repo_root: pathlib.Path, snapshot_id: str) -> SnapshotRecord | None:
    """Fetch the snapshot record for *snapshot_id*, or ``None`` on any failure.

    ``None`` covers both a missing file and a file that cannot be parsed or
    whose recomputed ID does not match *snapshot_id*; the latter case is
    logged at critical level. Use :func:`read_snapshot_result` when the
    caller must tell "not found" apart from "corrupt".

    Every successful read has had its snapshot ID re-derived from the stored
    manifest, so an altered file path or object ID is detected even when the
    JSON itself is still well-formed.

    IDs that originate from users or remotes should be checked with
    :func:`~muse.core.validation.validate_ref_id` before being passed in;
    this function accepts any string so that internally computed IDs work.
    """
    # The typed reader performs the same load, verification and logging;
    # this wrapper only collapses its result to "record or nothing".
    outcome = read_snapshot_result(repo_root, snapshot_id)
    if snapshot_read_is_ok(outcome):
        return outcome["snapshot"]
    return None
258
259
260 # ---------------------------------------------------------------------------
261 # Typed result variants
262 # ---------------------------------------------------------------------------
263
class SnapshotReadOk(TypedDict):
    """Result variant returned by ``read_snapshot_result`` on success."""

    status: str  # "ok" for this variant
    snapshot: SnapshotRecord
267
268
class SnapshotReadNotFound(TypedDict):
    """Result variant returned by ``read_snapshot_result`` when no file exists."""

    status: str  # "not_found" for this variant
271
272
class SnapshotReadCorrupt(TypedDict):
    """Result variant returned by ``read_snapshot_result`` when loading fails."""

    status: str  # "corrupt" for this variant
    path: str  # string form of the snapshot file path
    error: str  # string form of the exception that was raised
277
278
279 def snapshot_read_is_ok(
280 r: SnapshotReadOk | SnapshotReadNotFound | SnapshotReadCorrupt,
281 ) -> TypeGuard[SnapshotReadOk]:
282 """``True`` when *r* is a successful :func:`read_snapshot_result`."""
283 return r["status"] == "ok"
284
285
286 def snapshot_read_is_corrupt(
287 r: SnapshotReadOk | SnapshotReadNotFound | SnapshotReadCorrupt,
288 ) -> TypeGuard[SnapshotReadCorrupt]:
289 """``True`` when *r* represents a corrupt snapshot file."""
290 return r["status"] == "corrupt"
291
292
def read_snapshot_result(
    repo_root: pathlib.Path, snapshot_id: str
) -> SnapshotReadOk | SnapshotReadNotFound | SnapshotReadCorrupt:
    """Load a snapshot with a typed result that distinguishes all outcomes.

    Returns one of:

    * ``{"status": "ok", "snapshot": SnapshotRecord}``
    * ``{"status": "not_found"}``
    * ``{"status": "corrupt", "path": str, "error": str}``

    "not_found" is returned when the file does not exist, including when it
    disappears between the existence check and the read. Any other failure
    while reading, parsing or verifying the record is logged at critical
    level and returned as "corrupt"; this function does not raise for those.
    """
    path = snapshot_path(repo_root, snapshot_id)
    if not path.exists():
        return SnapshotReadNotFound(status="not_found")
    try:
        try:
            raw = path.read_bytes()
        except FileNotFoundError:
            # The file was removed after the exists() check above. That is a
            # missing snapshot, not a corrupt one, so report it as such.
            return SnapshotReadNotFound(status="not_found")
        # The JSON payload starts right after the first NUL byte.
        header_end = raw.index(b"\x00")
        record = SnapshotRecord.from_dict(_json.loads(raw[header_end + 1:]))
        _verify_snapshot_id(record, snapshot_id, path)
        return SnapshotReadOk(status="ok", snapshot=record)
    except Exception as exc:
        logger.critical("❌ Corrupt snapshot file %s: %s", path, exc)
        return SnapshotReadCorrupt(status="corrupt", path=str(path), error=str(exc))
316
317
318 # ---------------------------------------------------------------------------
319 # Manifest convenience helpers
320 # ---------------------------------------------------------------------------
321
def get_commit_snapshot_manifest(
    repo_root: pathlib.Path, commit_id: str
) -> Manifest | None:
    """Resolve *commit_id* to a copy of its snapshot's file manifest.

    Returns ``None``, after logging a warning, when the commit cannot be
    read or when ``read_snapshot`` yields nothing for the commit's snapshot.
    """
    from muse.core.commits import read_commit  # local to avoid circular import

    commit = read_commit(repo_root, commit_id)
    if commit is not None:
        record = read_snapshot(repo_root, commit.snapshot_id)
        if record is not None:
            # Hand back a copy so callers cannot mutate the loaded record.
            return dict(record.manifest)
        logger.warning(
            "⚠️ Snapshot %s referenced by commit %s not found",
            commit.snapshot_id,
            commit_id,
        )
        return None

    logger.warning("⚠️ Commit %s not found", commit_id)
    return None
340
341
def get_head_snapshot_manifest(
    repo_root: pathlib.Path, branch: str
) -> Manifest | None:
    """Look up the head snapshot of *branch* and return a copy of its manifest.

    Returns ``None`` when the branch has no head snapshot ID or when
    ``read_snapshot`` yields nothing for that ID.
    """
    from muse.core.commits import get_head_snapshot_id  # local to avoid circular import

    head_id = get_head_snapshot_id(repo_root, branch)
    record = None if head_id is None else read_snapshot(repo_root, head_id)
    return None if record is None else dict(record.manifest)
File History 5 commits
sha256:18b983389ee1b55900fcd799bfbb496552d2e3ecded9d18cefbfef188947a12e chore: remove blob-debug test marker file Sonnet 4.6 1 day ago
sha256:e452ad9a6ace6ccc6d875a35e06caf9da5576a970c1c36133b69a891ce5fefa8 chore: prebuild timing test Sonnet 4.6 8 days ago
sha256:0008ab6695e3e064b3e236b24fd19e538fef6a588eb0d211622f4466d919c0b1 merge: pull staging/dev — advance to 0.2.0rc12 Sonnet 4.6 patch 10 days ago
sha256:9c33d61749fff814c5226d5386aa2af7064c2c02788594a25fdd709358132eea fix: _PROPOSAL_PREFIX_RESOLVE_LIMIT 200 → 100 to match hub … Sonnet 4.6 21 days ago
sha256:36c3cb3e76619d4c30a6d9bf81b5ec4ff148e30dcfed913e3114ca7b43b81c7e fix: rename objects→blobs in push client and all stale test… Sonnet 4.6 patch 24 days ago