gabriel / musehub public

test_serve_corrupt_manifest_recovery.py file-level

at sha256:4 · View file ↗ · Intel ↗

History
1 files
1 commits
0 hotspots
0 🧊 dead
0 💥 blast risk
sha256:9 docs: document all 6 HD path segments; fix deep link anchors Three new… · gabriel · Jun 17, 2026
1 """TDD — the serving path must RECOVER from a corrupt cached manifest_blob.
2
3 Belt-and-suspenders for the staging gabriel/muse clone failure. The corruption is
4 LEGACY data (snapshots written 2026-05-27/28 with entry_count=0 — an empty
5 manifest_blob that does not reproduce its snapshot_id). Current push code no longer
6 produces it, but the bad rows persist, and any cached corrupt manifest poisons the
7 clone: `_snap_row_to_wire` serves the corrupt manifest, the client's hash check
8 fails, the commit is dropped, and every descendant fails "parent not in mpack".
9
10 Invariant under test: when a snapshot's cached `manifest_blob` does NOT reproduce
11 its `snapshot_id`, the serving path must reconstruct the correct manifest from the
12 intact delta chain (parent manifest + this snapshot's delta_blob) rather than serve
13 the corrupt cache. This makes clone self-healing (repair-on-read) against any
14 corrupt cache, legacy or future.
15
16 RED before the hardening (serves the corrupt {} → hash mismatch); GREEN after.
17
18 Integration test against localhost (musehub @ :1337, postgres @ :5434). It pushes a
19 short chain, then *simulates the legacy corruption* by overwriting the head
20 snapshot's manifest_blob with packb({}) in the DB (unique content per run, so the
21 global content-addressed snapshot row is not shared with any other repo), and
22 restores it afterward.
23 """
24 from __future__ import annotations
25
26 import asyncio
27 import json
28 import subprocess
29 import time as _time
30 from collections.abc import Iterator
31 from pathlib import Path
32
33 import msgpack
34 import pytest
35 from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
36 from sqlalchemy.orm import sessionmaker
37
38 from muse.core.ids import hash_snapshot
39 from musehub.db.musehub_repo_models import MusehubCommit, MusehubSnapshot
40 from musehub.services.musehub_wire_shared import _snap_row_to_wire_s3
41 from musehub.types.json_types import StrDict
42
43 HUB = "https://localhost:1337"
44 DB_URL = "postgresql+asyncpg://musehub:musehub@localhost:5434/musehub"
45 REPO_ROOT = Path(__file__).parent.parent
46
47
def muse(*args: str, cwd: Path, timeout: int = 90) -> subprocess.CompletedProcess:
    """Run the ``muse`` CLI with *args* inside *cwd* and return the finished process.

    stdout and stderr are captured as text.  The exit status is NOT checked here;
    callers inspect ``returncode`` themselves.  ``subprocess.run`` raises
    ``subprocess.TimeoutExpired`` if the command runs longer than *timeout* seconds.
    """
    command = ["muse", *args]
    return subprocess.run(
        command,
        cwd=str(cwd),
        capture_output=True,
        text=True,
        timeout=timeout,
    )
52
53
def muse_check(*args: str, cwd: Path, timeout: int = 90) -> str:
    """Run ``muse`` with *args* and return its stdout, failing loudly on error.

    Raises:
        RuntimeError: when the command exits non-zero.  The message carries the
            joined arguments, the return code and the first 600 characters of
            stderr.
    """
    result = muse(*args, cwd=cwd, timeout=timeout)
    if result.returncode == 0:
        return result.stdout
    raise RuntimeError(f"muse {' '.join(args)} failed (rc={result.returncode}):\n{result.stderr[:600]}")
59
60
def _commit_id_by_message(repo: Path, message: str) -> str:
    """Return the ``commit_id`` of the first ``muse log`` entry whose message is *message*.

    Each entry's message is compared after stripping surrounding whitespace; a
    missing or ``None`` message is treated as the empty string.

    Raises:
        AssertionError: when no entry in the log matches.
    """
    log = json.loads(muse_check("log", "--json", cwd=repo))
    # next() stops at the first hit, so later entries are never inspected.
    match = next(
        (entry for entry in log["commits"] if (entry.get("message") or "").strip() == message),
        None,
    )
    if match is None:
        raise AssertionError(f"no commit with message {message!r}")
    return match["commit_id"]
67
68
async def _snapshot_id_for_commit(commit_id: str) -> str:
    """Look up the ``snapshot_id`` stored on the ``MusehubCommit`` row for *commit_id*.

    A dedicated engine is created against ``DB_URL`` for this single lookup and
    disposed afterwards, whether or not the lookup succeeds.

    Raises:
        AssertionError: when the commit row is missing or has no snapshot id.
    """
    db_engine = create_async_engine(DB_URL)
    session_factory = sessionmaker(db_engine, class_=AsyncSession, expire_on_commit=False)
    try:
        async with session_factory() as session:
            commit_row = await session.get(MusehubCommit, commit_id)
            assert commit_row is not None and commit_row.snapshot_id, f"no snapshot for {commit_id[:18]}"
            return commit_row.snapshot_id
    finally:
        await db_engine.dispose()
79
80
async def _corrupt_then_serve(snapshot_id: str) -> tuple[StrDict, list[str]]:
    """Serve a snapshot through the wire path while its cached manifest is corrupt.

    Each step runs in its own session:

    1. remember the row's current ``manifest_blob`` and ``entry_count``;
    2. overwrite them with ``packb({})`` / ``0`` (simulating the legacy
       corruption) and commit;
    3. re-read the row and pass it to ``_snap_row_to_wire_s3``;
    4. write the remembered values back and commit -- this runs even when step 2
       or 3 raises, so the corrupt blob is never left behind.

    Returns:
        ``(served_manifest, served_directories)`` copied out of the wire payload;
        a missing or falsy entry becomes ``{}`` / ``[]`` respectively.
    """
    db_engine = create_async_engine(DB_URL)
    session_factory = sessionmaker(db_engine, class_=AsyncSession, expire_on_commit=False)
    # _snap_row_to_wire_s3 serves the manifest from the DB (delta-chain reconstruction);
    # it does not use the storage backend for that path, so no backend is supplied.
    storage_backend = None
    try:
        # Step 1: capture the pristine state so it can be put back later.
        async with session_factory() as session:
            snap = await session.get(MusehubSnapshot, snapshot_id)
            assert snap is not None, f"snapshot {snapshot_id[:18]} not on server"
            saved_blob = snap.manifest_blob
            saved_count = snap.entry_count
        try:
            # Step 2: inject the legacy corruption -- an empty manifest.
            async with session_factory() as session:
                snap = await session.get(MusehubSnapshot, snapshot_id)
                snap.manifest_blob = msgpack.packb({}, use_bin_type=True)
                snap.entry_count = 0
                await session.commit()
            # Step 3: serve via the real wire path (session-aware entry point).
            async with session_factory() as session:
                snap = await session.get(MusehubSnapshot, snapshot_id)
                payload = await _snap_row_to_wire_s3(snap, storage_backend, session=session)
                manifest = dict(payload.get("manifest") or {})
                directories = list(payload.get("directories") or [])
                return manifest, directories
        finally:
            # Step 4: restore -- never leave the corrupt blob behind.
            async with session_factory() as session:
                snap = await session.get(MusehubSnapshot, snapshot_id)
                snap.manifest_blob = saved_blob
                snap.entry_count = saved_count
                await session.commit()
    finally:
        await db_engine.dispose()
120
121
@pytest.fixture
def hub_repo(tmp_path: Path) -> Iterator[str]:
    """Create a throwaway public repo on the hub, yield its slug, then delete it.

    The repo name keeps the original ``test-serve-recover-<tmp suffix>`` prefix and
    adds a random suffix.  ``tmp_path.name`` alone is not unique across pytest
    sessions (its leaf name is derived from the test name plus a small counter),
    so without the random part a repo left behind by an interrupted run or a
    failed delete would make the next ``repo create`` collide.

    Yields:
        The ``gabriel/<slug>`` identifier of the freshly created repo, where
        ``<slug>`` is read from the CLI's JSON output.
    """
    import uuid  # function-scope import: only this fixture needs it

    name = f"test-serve-recover-{tmp_path.name[-6:]}-{uuid.uuid4().hex[:8]}"
    out = muse_check(
        "hub", "repo", "create", "--name", name,
        "--visibility", "public", "--no-init", "--hub", HUB, "--json",
        cwd=REPO_ROOT,
    )
    # The owner is hard-coded to the account this suite runs as.
    slug = f"gabriel/{json.loads(out)['slug']}"
    yield slug
    # Best-effort teardown: uses `muse` (not `muse_check`) on purpose so a failed
    # delete does not mask the test's own outcome.
    muse("hub", "repo", "delete", slug, "--yes", "--hub", HUB, "--json", cwd=REPO_ROOT)
133
134
def test_serving_path_recovers_from_corrupt_manifest_blob(tmp_path: Path, hub_repo: str) -> None:
    """Push a three-commit chain, corrupt the head snapshot's cached manifest, and
    check that the serving path still returns a manifest hashing to the snapshot id.
    """
    seed_repo = tmp_path / "seed"
    seed_repo.mkdir()
    muse_check("init", cwd=seed_repo)

    # Unique content per run so the content-addressed snapshot rows are not shared
    # with any other repo (safe to corrupt/restore the global row).
    unique = f"{tmp_path.name}-{int(_time.time())}"
    # A -> B -> C ; C is the head (full manifest), B is delta-only, A is root.
    chain = (("f1.txt", "A"), ("f2.txt", "B"), ("f3.txt", "C"))
    for filename, message in chain:
        seed_repo.joinpath(filename).write_text(f"{message}-{unique}\n")
        muse_check("code", "add", ".", cwd=seed_repo)
        muse_check("commit", "-m", message, "--agent-id", "test", "--model-id", "test", cwd=seed_repo)
    muse_check("remote", "add", "origin", f"{HUB}/{hub_repo}", cwd=seed_repo)
    muse_check("push", "origin", "main", cwd=seed_repo)

    head_commit_id = _commit_id_by_message(seed_repo, "C")
    head_snapshot_id = asyncio.run(_snapshot_id_for_commit(head_commit_id))

    served_manifest, served_dirs = asyncio.run(_corrupt_then_serve(head_snapshot_id))

    # The invariant: even though the cached manifest_blob was corrupt ({}), the
    # serving path must return a manifest that reproduces the snapshot_id by
    # reconstructing from the delta chain.
    empty_manifest_msg = (
        "serving path returned an EMPTY manifest for a corrupt cache — it must "
        "reconstruct from the delta chain instead of serving the corrupt blob"
    )
    assert served_manifest, empty_manifest_msg

    reproduced_id = hash_snapshot(served_manifest, served_dirs)
    assert reproduced_id == head_snapshot_id, (
        f"served manifest does not reproduce snapshot_id {head_snapshot_id[:18]} — the "
        f"serving path served the corrupt cached manifest instead of reconstructing "
        f"from the delta chain.\n served paths={sorted(served_manifest)} dirs={served_dirs}"
    )