gabriel / musehub public
test_serve_corrupt_manifest_recovery.py python
165 lines 7.1 KB
Raw
sha256:4669620efda9ff41c55bdefd1f7bfe1c239d468428744c84ead9957e5a003a53 merge: rescue snapshot-recovery hardening (c00aa21d) into d… Opus 4.8 minor ⚠ breaking 18 hours ago
1 """TDD — the serving path must RECOVER from a corrupt cached manifest_blob.
2
3 Belt-and-suspenders for the staging gabriel/muse clone failure. The corruption is
4 LEGACY data (snapshots written 2026-05-27/28 with entry_count=0 — an empty
5 manifest_blob that does not reproduce its snapshot_id). Current push code no longer
6 produces it, but the bad rows persist, and any cached corrupt manifest poisons the
7 clone: `_snap_row_to_wire` serves the corrupt manifest, the client's hash check
8 fails, the commit is dropped, and every descendant fails "parent not in mpack".
9
10 Invariant under test: when a snapshot's cached `manifest_blob` does NOT reproduce
11 its `snapshot_id`, the serving path must reconstruct the correct manifest from the
12 intact delta chain (parent manifest + this snapshot's delta_blob) rather than serve
13 the corrupt cache. This makes clone self-healing (repair-on-read) against any
14 corrupt cache, legacy or future.
15
16 RED before the hardening (serves the corrupt {} → hash mismatch); GREEN after.
17
18 Integration test against localhost (musehub @ :1337, postgres @ :5434). It pushes a
19 short chain, then *simulates the legacy corruption* by overwriting the head
20 snapshot's manifest_blob with packb({}) in the DB (unique content per run, so the
21 global content-addressed snapshot row is not shared with any other repo), and
22 restores it afterward.
23 """
24 from __future__ import annotations
25
26 import asyncio
27 import json
28 import subprocess
29 import time as _time
30 from pathlib import Path
31
32 import msgpack
33 import pytest
34 from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
35 from sqlalchemy.orm import sessionmaker
36
37 from muse.core.ids import hash_snapshot
38 from musehub.db.musehub_repo_models import MusehubCommit, MusehubSnapshot
39 from musehub.services.musehub_wire_shared import _snap_row_to_wire_s3
40
# Localhost integration targets (see module docstring): the musehub server under
# test and the postgres database behind it.
HUB: str = "https://localhost:1337"
DB_URL: str = "postgresql+asyncpg://musehub:musehub@localhost:5434/musehub"
# Directory two levels above this file; used as the cwd for `muse hub repo ...` calls.
REPO_ROOT: Path = Path(__file__).parent.parent
44
45
def muse(*args: str, cwd: Path, timeout: int = 90) -> subprocess.CompletedProcess:
    """Invoke the ``muse`` CLI with *args* inside *cwd*, never raising on a bad exit.

    stdout and stderr are captured as text so callers can inspect ``returncode``,
    ``stdout`` and ``stderr`` themselves. ``subprocess.TimeoutExpired`` propagates
    if the command runs longer than *timeout* seconds.
    """
    argv = ["muse"]
    argv.extend(args)
    return subprocess.run(
        argv,
        cwd=str(cwd),
        capture_output=True,
        text=True,
        timeout=timeout,
    )
50
51
def muse_check(*args: str, cwd: Path, timeout: int = 90) -> str:
    """Run ``muse *args`` in *cwd* and return its stdout; raise if it exits non-zero.

    Raises:
        RuntimeError: on a non-zero exit code. The message includes the first 600
            characters of stderr, or of stdout when stderr is blank, so a failure
            that is only reported on stdout is not lost from the diagnostic.
    """
    r = muse(*args, cwd=cwd, timeout=timeout)
    if r.returncode != 0:
        # Prefer stderr; fall back to stdout so the message is never silently empty.
        detail = r.stderr if r.stderr.strip() else r.stdout
        raise RuntimeError(f"muse {' '.join(args)} failed (rc={r.returncode}):\n{detail[:600]}")
    return r.stdout
57
58
def _commit_id_by_message(repo: Path, message: str) -> str:
    """Return the ``commit_id`` of the first ``muse log --json`` entry whose message is *message*.

    Entry messages are compared after ``str.strip()``; a missing or null message
    is treated as the empty string.

    Raises:
        AssertionError: if no entry in the log carries *message*.
    """
    not_found = object()
    history = json.loads(muse_check("log", "--json", cwd=repo))["commits"]
    hit = next(
        (
            entry["commit_id"]
            for entry in history
            if (entry.get("message") or "").strip() == message
        ),
        not_found,
    )
    if hit is not_found:
        raise AssertionError(f"no commit with message {message!r}")
    return hit
65
66
async def _snapshot_id_for_commit(commit_id: str) -> str:
    """Read the ``snapshot_id`` stored on the server's commit row for *commit_id*.

    A dedicated async engine is created for this single lookup and is always
    disposed afterwards.

    Raises:
        AssertionError: if the commit row is absent or its ``snapshot_id`` is falsy.
    """
    db = create_async_engine(DB_URL)
    new_session = sessionmaker(db, class_=AsyncSession, expire_on_commit=False)
    try:
        async with new_session() as session:
            row = await session.get(MusehubCommit, commit_id)
            found = row is not None and row.snapshot_id
            assert found, f"no snapshot for {commit_id[:18]}"
            return row.snapshot_id
    finally:
        await db.dispose()
77
78
async def _corrupt_then_serve(snapshot_id: str):
    """Serve *snapshot_id* through the wire path while its cached manifest is corrupt.

    Each step uses its own session against ``DB_URL``:

    1. capture the row's current ``manifest_blob`` and ``entry_count``;
    2. overwrite them with ``msgpack.packb({})`` and ``0`` (the legacy corruption);
    3. reload the row and pass it to ``_snap_row_to_wire_s3``;
    4. always write the captured values back, even if step 2 or 3 raised.

    Returns a ``(served_manifest, served_directories)`` tuple: a fresh dict and a
    fresh list built from the wire payload (empty when those keys are missing or
    falsy).
    """
    engine = create_async_engine(DB_URL)
    open_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
    # The original author notes that _snap_row_to_wire_s3 rebuilds the manifest from
    # the DB (delta chain) on this path without touching the storage backend, so
    # no backend is supplied.
    no_backend = None

    async def overwrite(blob, count) -> None:
        # Persist a manifest_blob/entry_count pair for the snapshot row and commit.
        async with open_session() as session:
            snap = await session.get(MusehubSnapshot, snapshot_id)
            snap.manifest_blob = blob
            snap.entry_count = count
            await session.commit()

    try:
        async with open_session() as session:
            snap = await session.get(MusehubSnapshot, snapshot_id)
            assert snap is not None, f"snapshot {snapshot_id[:18]} not on server"
            saved_blob, saved_count = snap.manifest_blob, snap.entry_count
        try:
            # Simulate the legacy corruption: an empty msgpack map with zero entries.
            await overwrite(msgpack.packb({}, use_bin_type=True), 0)
            # Serve through the real, session-aware wire entry point.
            async with open_session() as session:
                snap = await session.get(MusehubSnapshot, snapshot_id)
                payload = await _snap_row_to_wire_s3(snap, no_backend, session=session)
                served_manifest = dict(payload.get("manifest") or {})
                served_dirs = list(payload.get("directories") or [])
                return served_manifest, served_dirs
        finally:
            # Never leave the corrupt blob behind.
            await overwrite(saved_blob, saved_count)
    finally:
        await engine.dispose()
118
119
@pytest.fixture
def hub_repo(tmp_path: Path):
    """Create a throwaway public hub repo for one test and delete it on teardown.

    Yields the ``gabriel/<slug>`` path that the test uses as its remote. Teardown
    is best-effort: a failed delete does not fail the test, but it now emits a
    warning so a leaked repo is visible instead of silently accumulating.
    """
    import uuid
    import warnings

    # tmp_path.name is the truncated test name plus a per-session counter, so it is
    # the same on every run; its tail alone would reuse one repo name forever, and a
    # repo leaked by an interrupted run would then make `repo create` collide.
    # A random suffix keeps every run's repo name distinct.
    name = f"test-serve-recover-{tmp_path.name[-6:]}-{uuid.uuid4().hex[:8]}"
    out = muse_check(
        "hub", "repo", "create", "--name", name,
        "--visibility", "public", "--no-init", "--hub", HUB, "--json",
        cwd=REPO_ROOT,
    )
    # NOTE(review): owner is hardcoded to "gabriel" — presumably the account the
    # local muse CLI is authenticated as; confirm before running elsewhere.
    slug = f"gabriel/{json.loads(out)['slug']}"
    yield slug
    result = muse("hub", "repo", "delete", slug, "--yes", "--hub", HUB, "--json", cwd=REPO_ROOT)
    if result.returncode != 0:
        detail = result.stderr if result.stderr.strip() else result.stdout
        warnings.warn(
            f"hub_repo teardown could not delete {slug} (rc={result.returncode}): {detail[:300]}",
            stacklevel=1,
        )
131
132
def test_serving_path_recovers_from_corrupt_manifest_blob(tmp_path: Path, hub_repo: str) -> None:
    """A corrupt cached manifest_blob on the head snapshot must be repaired on read.

    Pushes a three-commit chain A -> B -> C to ``hub_repo`` and then:

    1. replaces C's cached manifest_blob with an empty msgpack map, via
       ``_corrupt_then_serve``, which restores the original blob afterwards;
    2. serves C through ``_snap_row_to_wire_s3``;
    3. asserts that the served manifest and directories hash back to C's
       snapshot_id.
    """
    import uuid  # local: only this test needs a collision-proof run tag

    repo = tmp_path / "seed"
    repo.mkdir()
    muse_check("init", cwd=repo)

    # Unique content per run so the content-addressed snapshot rows are not shared
    # with any other repo (safe to corrupt/restore the global row). tmp_path.name is
    # identical on every run of this test, and a whole-second timestamp can repeat
    # across concurrent runs, so a random uuid4 suffix guarantees uniqueness.
    tag = f"{tmp_path.name}-{int(_time.time())}-{uuid.uuid4().hex}"
    # A -> B -> C ; C is the head (full manifest), B is delta-only, A is root.
    for fname, msg in [("f1.txt", "A"), ("f2.txt", "B"), ("f3.txt", "C")]:
        (repo / fname).write_text(f"{msg}-{tag}\n")
        muse_check("code", "add", ".", cwd=repo)
        muse_check("commit", "-m", msg, "--agent-id", "test", "--model-id", "test", cwd=repo)
    muse_check("remote", "add", "origin", f"{HUB}/{hub_repo}", cwd=repo)
    muse_check("push", "origin", "main", cwd=repo)

    c_commit = _commit_id_by_message(repo, "C")
    snap_id = asyncio.run(_snapshot_id_for_commit(c_commit))

    served_manifest, served_dirs = asyncio.run(_corrupt_then_serve(snap_id))

    # The invariant: even though the cached manifest_blob was corrupt ({}), the
    # serving path must return a manifest that reproduces the snapshot_id by
    # reconstructing from the delta chain.
    assert served_manifest, (
        "serving path returned an EMPTY manifest for a corrupt cache — it must "
        "reconstruct from the delta chain instead of serving the corrupt blob"
    )
    assert hash_snapshot(served_manifest, served_dirs) == snap_id, (
        f"served manifest does not reproduce snapshot_id {snap_id[:18]} — the "
        f"serving path served the corrupt cached manifest instead of reconstructing "
        f"from the delta chain.\n served paths={sorted(served_manifest)} dirs={served_dirs}"
    )
File History 1 commit
sha256:4669620efda9ff41c55bdefd1f7bfe1c239d468428744c84ead9957e5a003a53 merge: rescue snapshot-recovery hardening (c00aa21d) into d… Opus 4.8 minor 18 hours ago