gabriel / muse public
verify_object.py python
328 lines 10.0 KB
Raw
sha256:1c4b3e3a9a1f300774c3ee662b572a698d5fd405bf765a71e6011a2e9c3eaaaa feat: Muse — version control for the agent era Human 73 days ago
1 """muse plumbing verify-object — verify the integrity of stored objects.
2
3 Reads one or more objects from the content-addressed store and re-hashes each
4 one to confirm that its on-disk content still matches its claimed SHA-256
5 identity. Reports the result per object and exits non-zero if any object
6 fails verification.
7
8 This is the integrity primitive used by backup systems, replication agents,
9 and CI pipelines to detect silent data corruption without a full fsck.
10
11 Output (JSON, default)::
12
13 {
14 "results": [
        {"object_id": "<sha256>", "ok": true, "size_bytes": 4096, "error": null},
        {"object_id": "<sha256>", "ok": false, "size_bytes": 512,
         "error": "hash mismatch: stored <first 12 hex>… recomputed <first 12 hex>…"},
18 {"object_id": "<sha256>", "ok": false, "size_bytes": null,
19 "error": "object not found in store"}
20 ],
21 "all_ok": false,
22 "checked": 3,
23 "failed": 2
24 }
25
26 Text output (``--format text``)::
27
28 OK <sha256> (4096 bytes)
29 FAIL <sha256> hash mismatch: stored abc123… recomputed def456…
30 FAIL <sha256> object not found in store
31
32 With ``--quiet``: no output; exits 0 if all pass, exits 1 otherwise.
33
34 Plumbing contract
35 -----------------
36
37 - Exit 0: all objects verified successfully.
38 - Exit 1: one or more objects failed verification; object not found; bad args.
39 - Exit 3: unexpected I/O error (e.g. disk read failure).
40
41 Performance
42 -----------
43
44 Object size is measured by counting bytes as they are read during hashing —
45 no separate ``stat()`` call is made. For ``--all`` over a large store this
46 saves one syscall per object.
47
48 Agent use
49 ---------
50
51 Verify specific objects after a replication or backup::
52
53 muse plumbing verify-object <sha256> <sha256> --json
54
55 Full store integrity check (fsck equivalent)::
56
57 muse plumbing verify-object --all --json
58
59 Pipe object IDs from a manifest::
60
61 cat .muse/snapshot_manifest.txt | muse plumbing verify-object --stdin
62
63 Quiet mode for CI::
64
65 muse plumbing verify-object --all --quiet && echo "store intact"
66 """
67
68 from __future__ import annotations
69
70 import argparse
71 import hashlib
72 import json
73 import logging
74 import pathlib
75 import sys
76 from typing import TypedDict
77
78 from muse.core.errors import ExitCode
79 from muse.core.object_store import object_path, objects_dir
80 from muse.core.repo import require_repo
81 from muse.core.validation import sanitize_display, validate_object_id
82
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Values accepted for --format; ``run`` rejects anything else with a usage error.
_FORMAT_CHOICES = ("json", "text")
# Read size used by ``_verify_one`` when streaming an object through SHA-256.
_CHUNK = 65536  # 64 KiB read chunks — keeps the heap clean for large blobs
87
88
class _ObjectResult(TypedDict):
    """Per-object verification record, as built by ``_verify_one``.

    ``run`` serialises these records directly into the JSON ``results`` list
    and renders them one per line in text mode.
    """

    # The object ID exactly as it was requested (echoed back unchanged).
    object_id: str
    # True only when the recomputed SHA-256 equals ``object_id``.
    ok: bool
    # Number of bytes read while hashing; None when the object could not be
    # read to completion (invalid ID, missing object, or I/O error).
    size_bytes: int | None
    # Human-readable failure description; None when ``ok`` is True.
    error: str | None
94
95
def _iter_all_object_ids(root: pathlib.Path) -> list[str]:
    """Enumerate the IDs of every object file found in the on-disk store.

    The store is expected to be laid out as
    ``<root>/.muse/objects/<shard2>/<remaining62>``: a two-character shard
    directory holding files whose names supply the other 62 characters of
    the ID.  Symlinked shard directories and symlinked object files are
    ignored — they are not part of a valid store layout and could resolve
    to somewhere outside the repository.

    Args:
        root: Repository root passed through to ``objects_dir``.

    Returns:
        The 64-character IDs found, in sorted order.  An empty list when
        the objects directory does not exist.
    """
    store_root = objects_dir(root)
    if not store_root.exists():
        return []

    found: list[str] = []
    for shard in sorted(store_root.iterdir()):
        # Only real (non-symlink) directories qualify as shards.
        if shard.is_symlink() or not shard.is_dir():
            continue
        shard_name = shard.name
        if len(shard_name) != 2:
            continue
        # With a 2-character shard prefix, a 64-character ID means the file
        # name must contribute exactly 62 characters.
        found.extend(
            shard_name + entry.name
            for entry in sorted(shard.iterdir())
            if not entry.is_symlink()
            and entry.is_file()
            and len(entry.name) == 62
        )
    return found
124
125
def _verify_one(root: pathlib.Path, object_id: str) -> _ObjectResult:
    """Integrity-check a single object and return its result record.

    The object is opened directly and streamed through SHA-256 in
    ``_CHUNK``-sized reads, so large blobs are never held in memory.  The
    size is the number of bytes consumed while hashing.  There is no
    separate existence check or ``stat()`` call: a missing object is
    detected from the failure of ``open()`` itself, which also removes the
    window in which an object could disappear between the check and the
    read.

    Args:
        root: Repository root passed through to ``object_path``.
        object_id: The claimed SHA-256 identity of the object.

    Returns:
        An :class:`_ObjectResult`.  Expected failures are reported in the
        record rather than raised:

        - ``validate_object_id`` rejects the ID (``ValueError``): the
          exception text becomes ``error``.
        - The object file, or a directory component of its path, does not
          exist: ``"object not found in store"``.
        - Any other ``OSError`` while opening or reading:
          ``"I/O error: ..."``.
        - The content hashes to a different digest: ``"hash mismatch: ..."``
          with ``size_bytes`` set to the number of bytes read.
    """
    try:
        validate_object_id(object_id)
    except ValueError as exc:
        return {
            "object_id": object_id,
            "ok": False,
            "size_bytes": None,
            "error": str(exc),
        }

    dest = object_path(root, object_id)
    digest = hashlib.sha256()
    size = 0
    try:
        with dest.open("rb") as fh:
            while chunk := fh.read(_CHUNK):
                digest.update(chunk)
                size += len(chunk)
    except (FileNotFoundError, NotADirectoryError):
        # ENOENT / ENOTDIR: the object (or its shard directory) is absent.
        # Must precede the generic OSError handler — both are subclasses.
        return {
            "object_id": object_id,
            "ok": False,
            "size_bytes": None,
            "error": "object not found in store",
        }
    except OSError as exc:
        return {
            "object_id": object_id,
            "ok": False,
            "size_bytes": None,
            "error": f"I/O error: {exc}",
        }

    actual = digest.hexdigest()
    if actual != object_id:
        return {
            "object_id": object_id,
            "ok": False,
            "size_bytes": size,
            # Both digests are abbreviated to 12 characters for readability.
            "error": (
                f"hash mismatch: stored {object_id[:12]}… "
                f"recomputed {actual[:12]}…"
            ),
        }

    return {"object_id": object_id, "ok": True, "size_bytes": size, "error": None}
181
182
def register(subparsers: "argparse._SubParsersAction[argparse.ArgumentParser]") -> None:
    """Attach the ``verify-object`` subcommand to *subparsers*.

    The module docstring is used verbatim as the command description, and
    ``run`` is installed as the handler via ``set_defaults(func=...)``.

    ``--format`` is deliberately not constrained with argparse ``choices``;
    its value is validated inside ``run``.  ``--json`` writes to the same
    destination (``fmt``) as ``--format``.
    """
    ids_help = (
        "One or more SHA-256 object IDs to verify. "
        "Required unless --all or --stdin is used."
    )
    all_help = (
        "Verify every object in the store — the fsck equivalent. "
        "No object ID arguments needed. "
        "Cannot be combined with positional object IDs."
    )
    stdin_help = (
        "Read additional object IDs from stdin, one per line. "
        "Blank lines and '#'-comments are skipped. "
        "Combines with positional object ID arguments."
    )

    cmd = subparsers.add_parser(
        "verify-object",
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
        help="Re-hash stored objects to detect data corruption.",
    )

    # Object selection.
    cmd.add_argument("object_ids", nargs="*", help=ids_help)
    cmd.add_argument(
        "--all", "-a", dest="verify_all", action="store_true", help=all_help
    )
    cmd.add_argument("--stdin", dest="from_stdin", action="store_true", help=stdin_help)

    # Output control.
    cmd.add_argument(
        "--quiet", "-q",
        action="store_true",
        help="No output. Exit 0 if all objects are intact, exit 1 otherwise.",
    )
    cmd.add_argument(
        "--format", "-f",
        dest="fmt",
        metavar="FORMAT",
        default="json",
        help="Output format: json or text. (default: json)",
    )
    cmd.add_argument(
        "--json",
        dest="fmt",
        action="store_const",
        const="json",
        help="Shorthand for --format json.",
    )

    cmd.set_defaults(func=run)
236
237
def run(args: argparse.Namespace) -> None:
    """Entry point for ``verify-object``: re-hash objects and report results.

    Steps:

    1. Validate the arguments (``--format`` value; ``--all`` together with
       positional IDs).  Usage problems are printed to stderr as a JSON
       ``{"error": ...}`` object and exit with ``ExitCode.USER_ERROR``.
    2. Resolve the repository root via ``require_repo``.
    3. Gather object IDs: every ID in the store for ``--all``; otherwise the
       positional IDs plus, with ``--stdin``, one ID per non-blank,
       non-``#`` line of standard input.
    4. Verify each ID with ``_verify_one`` and emit the report — nothing
       with ``--quiet``, one line per object in text mode, or a single JSON
       document otherwise.

    Raises:
        SystemExit: with ``ExitCode.USER_ERROR`` on a usage error or when
            any object fails verification; with ``0`` in quiet mode when
            every object passes.
    """
    output_format: str = args.fmt
    positional_ids: list[str] = args.object_ids or []
    check_everything: bool = args.verify_all
    read_stdin: bool = args.from_stdin
    silent: bool = args.quiet

    # --- argument validation (first matching problem wins) ---------------
    usage_error: str | None = None
    if output_format not in _FORMAT_CHOICES:
        usage_error = (
            f"Unknown format {output_format!r}. Valid: {', '.join(_FORMAT_CHOICES)}"
        )
    elif check_everything and positional_ids:
        usage_error = "--all cannot be combined with explicit object ID arguments."
    if usage_error is not None:
        print(json.dumps({"error": usage_error}), file=sys.stderr)
        raise SystemExit(ExitCode.USER_ERROR)

    root = require_repo()

    # --- gather the IDs to check -----------------------------------------
    if check_everything:
        targets: list[str] = _iter_all_object_ids(root)
    else:
        targets = list(positional_ids)
        if read_stdin:
            # Only line terminators are removed (covers CRLF input); a stray
            # "\r" left in an ID would otherwise fail validation later with
            # a less obvious message.
            cleaned = (raw.rstrip("\r\n") for raw in sys.stdin)
            targets.extend(
                entry for entry in cleaned if entry and not entry.startswith("#")
            )

    # An empty store under --all is fine; an empty explicit request is not.
    if not (targets or check_everything):
        print(
            json.dumps(
                {"error": "At least one object ID is required (or use --all / --stdin)."}
            ),
            file=sys.stderr,
        )
        raise SystemExit(ExitCode.USER_ERROR)

    # --- verify -----------------------------------------------------------
    results: list[_ObjectResult] = []
    failures = 0
    for oid in targets:
        record = _verify_one(root, oid)
        results.append(record)
        if not record["ok"]:
            failures += 1
    intact = failures == 0

    # --- report -----------------------------------------------------------
    if silent:
        raise SystemExit(0 if intact else ExitCode.USER_ERROR)

    if output_format == "text":
        for record in results:
            label = "OK " if record["ok"] else "FAIL"
            shown_id = sanitize_display(record["object_id"])
            size_part = ""
            if record["size_bytes"] is not None:
                size_part = f" ({record['size_bytes']} bytes)"
            error_part = ""
            if not record["ok"] and record["error"]:
                error_part = f" {sanitize_display(record['error'])}"
            print(f"{label} {shown_id}{size_part}{error_part}")
    else:
        report = {
            "results": [dict(record) for record in results],
            "all_ok": intact,
            "checked": len(results),
            "failed": failures,
        }
        print(json.dumps(report))

    if not intact:
        raise SystemExit(ExitCode.USER_ERROR)
File History 1 commit
sha256:1c4b3e3a9a1f300774c3ee662b572a698d5fd405bf765a71e6011a2e9c3eaaaa feat: Muse — version control for the agent era Human 73 days ago