test_serve_corrupt_manifest_recovery.py
file-level
1
files
1
commits
0
hotspots
0
🧊 dead
0
💥 blast risk
| 1 | """TDD — the serving path must RECOVER from a corrupt cached manifest_blob. |
| 2 | |
| 3 | Belt-and-suspenders for the staging gabriel/muse clone failure. The corruption is |
| 4 | LEGACY data (snapshots written 2026-05-27/28 with entry_count=0 — an empty |
| 5 | manifest_blob that does not reproduce its snapshot_id). Current push code no longer |
| 6 | produces it, but the bad rows persist, and any cached corrupt manifest poisons the |
| 7 | clone: `_snap_row_to_wire` serves the corrupt manifest, the client's hash check |
| 8 | fails, the commit is dropped, and every descendant fails "parent not in mpack". |
| 9 | |
| 10 | Invariant under test: when a snapshot's cached `manifest_blob` does NOT reproduce |
| 11 | its `snapshot_id`, the serving path must reconstruct the correct manifest from the |
| 12 | intact delta chain (parent manifest + this snapshot's delta_blob) rather than serve |
| 13 | the corrupt cache. This makes clone self-healing (repair-on-read) against any |
| 14 | corrupt cache, legacy or future. |
| 15 | |
| 16 | RED before the hardening (serves the corrupt {} → hash mismatch); GREEN after. |
| 17 | |
| 18 | Integration test against localhost (musehub @ :1337, postgres @ :5434). It pushes a |
| 19 | short chain, then *simulates the legacy corruption* by overwriting the head |
| 20 | snapshot's manifest_blob with packb({}) in the DB (unique content per run, so the |
| 21 | global content-addressed snapshot row is not shared with any other repo), and |
| 22 | restores it afterward. |
| 23 | """ |
| 24 | from __future__ import annotations |
| 25 | |
| 26 | import asyncio |
| 27 | import json |
| 28 | import subprocess |
| 29 | import time as _time |
| 30 | from collections.abc import Iterator |
| 31 | from pathlib import Path |
| 32 | |
| 33 | import msgpack |
| 34 | import pytest |
| 35 | from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine |
| 36 | from sqlalchemy.orm import sessionmaker |
| 37 | |
| 38 | from muse.core.ids import hash_snapshot |
| 39 | from musehub.db.musehub_repo_models import MusehubCommit, MusehubSnapshot |
| 40 | from musehub.services.musehub_wire_shared import _snap_row_to_wire_s3 |
| 41 | from musehub.types.json_types import StrDict |
| 42 | |
# Base URL of the local musehub instance this integration test talks to.
HUB = "https://localhost:1337"
# Async (asyncpg) SQLAlchemy URL of the local postgres the helpers read and
# write snapshot/commit rows in.
DB_URL = "postgresql+asyncpg://musehub:musehub@localhost:5434/musehub"
# Parent of the directory containing this file; used as the working directory
# for the `muse hub repo create/delete` calls in the fixture.
REPO_ROOT = Path(__file__).parent.parent
| 46 | |
| 47 | |
def muse(*args: str, cwd: Path, timeout: int = 90) -> subprocess.CompletedProcess:
    """Run the ``muse`` CLI with *args* inside *cwd* and return the finished process.

    stdout/stderr are captured as text. A non-zero exit status is not raised
    here; callers inspect ``returncode`` themselves. ``subprocess.run`` aborts
    the command with ``TimeoutExpired`` once *timeout* seconds have elapsed.
    """
    command = ["muse", *args]
    return subprocess.run(
        command,
        cwd=str(cwd),
        capture_output=True,
        text=True,
        timeout=timeout,
    )
| 52 | |
| 53 | |
def muse_check(*args: str, cwd: Path, timeout: int = 90) -> str:
    """Run ``muse`` and return its stdout.

    Raises:
        RuntimeError: the command exited non-zero; the message carries the
            return code and the first 600 characters of stderr.
    """
    result = muse(*args, cwd=cwd, timeout=timeout)
    if result.returncode == 0:
        return result.stdout
    raise RuntimeError(
        f"muse {' '.join(args)} failed (rc={result.returncode}):\n{result.stderr[:600]}"
    )
| 59 | |
| 60 | |
def _commit_id_by_message(repo: Path, message: str) -> str:
    """Return the ``commit_id`` of the first ``muse log`` entry whose message is *message*.

    Messages are compared after stripping surrounding whitespace; a missing or
    null message is treated as the empty string.

    Raises:
        AssertionError: no entry in the log carries that message.
    """
    log = json.loads(muse_check("log", "--json", cwd=repo))
    for entry in log["commits"]:
        text = entry.get("message") or ""
        if text.strip() != message:
            continue
        return entry["commit_id"]
    raise AssertionError(f"no commit with message {message!r}")
| 67 | |
| 68 | |
async def _snapshot_id_for_commit(commit_id: str) -> str:
    """Look up the ``snapshot_id`` stored on the ``MusehubCommit`` row for *commit_id*.

    A dedicated engine is created against ``DB_URL`` for this single lookup and
    disposed again before returning, whether or not the lookup succeeded.

    Raises:
        AssertionError: the commit row is missing or has no snapshot id.
    """
    db_engine = create_async_engine(DB_URL)
    make_session = sessionmaker(db_engine, class_=AsyncSession, expire_on_commit=False)
    try:
        async with make_session() as db:
            commit_row = await db.get(MusehubCommit, commit_id)
            has_snapshot = commit_row is not None and commit_row.snapshot_id
            assert has_snapshot, f"no snapshot for {commit_id[:18]}"
            return commit_row.snapshot_id
    finally:
        await db_engine.dispose()
| 79 | |
| 80 | |
async def _corrupt_then_serve(snapshot_id: str) -> tuple[StrDict, list[str]]:
    """Serve a snapshot through the wire path while its cached manifest is corrupt.

    Each step runs in its own session, in this order:

    1. remember the row's current ``manifest_blob`` and ``entry_count``;
    2. overwrite them with ``packb({})`` and ``0`` (the simulated legacy corruption);
    3. reload the row and pass it to ``_snap_row_to_wire_s3``;
    4. write the remembered values back, whether or not step 2 or 3 raised.

    Returns:
        ``(served_manifest, served_directories)`` copied out of the wire
        payload; a missing or falsy entry becomes an empty dict / list.

    Raises:
        AssertionError: the snapshot row does not exist on the server.
    """
    db_engine = create_async_engine(DB_URL)
    make_session = sessionmaker(db_engine, class_=AsyncSession, expire_on_commit=False)
    # _snap_row_to_wire_s3 serves the manifest from the DB (delta-chain
    # reconstruction); it does not use the storage backend for that path, so
    # no backend is supplied.
    storage_backend = None
    try:
        # Step 1: capture the pristine values so they can be put back later.
        async with make_session() as db:
            pristine = await db.get(MusehubSnapshot, snapshot_id)
            assert pristine is not None, f"snapshot {snapshot_id[:18]} not on server"
            saved_blob = pristine.manifest_blob
            saved_count = pristine.entry_count
        try:
            # Step 2: plant the legacy corruption, i.e. an empty manifest.
            async with make_session() as db:
                victim = await db.get(MusehubSnapshot, snapshot_id)
                victim.manifest_blob = msgpack.packb({}, use_bin_type=True)
                victim.entry_count = 0
                await db.commit()
            # Step 3: go through the real, session-aware wire entry point.
            async with make_session() as db:
                corrupted = await db.get(MusehubSnapshot, snapshot_id)
                payload = await _snap_row_to_wire_s3(corrupted, storage_backend, session=db)
                manifest = dict(payload.get("manifest") or {})
                directories = list(payload.get("directories") or [])
                return manifest, directories
        finally:
            # Step 4: the corrupt blob must never outlive this helper.
            async with make_session() as db:
                healed = await db.get(MusehubSnapshot, snapshot_id)
                healed.manifest_blob = saved_blob
                healed.entry_count = saved_count
                await db.commit()
    finally:
        await db_engine.dispose()
| 120 | |
| 121 | |
@pytest.fixture
def hub_repo(tmp_path: Path) -> Iterator[str]:
    """Create a public hub repo for one test and delete it afterwards.

    The repo name is derived from the last six characters of the test's
    temporary directory name. Yields ``gabriel/<slug>``, where the slug is
    read from the JSON printed by the create command.
    """
    repo_name = f"test-serve-recover-{tmp_path.name[-6:]}"
    create_args = (
        "hub", "repo", "create", "--name", repo_name,
        "--visibility", "public", "--no-init", "--hub", HUB, "--json",
    )
    created = json.loads(muse_check(*create_args, cwd=REPO_ROOT))
    slug = f"gabriel/{created['slug']}"
    yield slug
    # Cleanup goes through muse(), not muse_check(): a failed delete is not raised.
    delete_args = ("hub", "repo", "delete", slug, "--yes", "--hub", HUB, "--json")
    muse(*delete_args, cwd=REPO_ROOT)
| 133 | |
| 134 | |
def test_serving_path_recovers_from_corrupt_manifest_blob(tmp_path: Path, hub_repo: str) -> None:
    """Push a three-commit chain, corrupt the head's cached manifest, and require
    the served manifest to be non-empty and to hash back to the snapshot id.
    """
    seed_repo = tmp_path / "seed"
    seed_repo.mkdir()
    muse_check("init", cwd=seed_repo)

    # File content is unique per run so the content-addressed snapshot rows are
    # not shared with any other repo (safe to corrupt/restore the global row).
    unique_tag = f"{tmp_path.name}-{int(_time.time())}"
    # A -> B -> C ; C is the head (full manifest), B is delta-only, A is root.
    chain = (("f1.txt", "A"), ("f2.txt", "B"), ("f3.txt", "C"))
    for file_name, commit_msg in chain:
        seed_repo.joinpath(file_name).write_text(f"{commit_msg}-{unique_tag}\n")
        muse_check("code", "add", ".", cwd=seed_repo)
        muse_check(
            "commit", "-m", commit_msg, "--agent-id", "test", "--model-id", "test",
            cwd=seed_repo,
        )
    remote_url = f"{HUB}/{hub_repo}"
    muse_check("remote", "add", "origin", remote_url, cwd=seed_repo)
    muse_check("push", "origin", "main", cwd=seed_repo)

    head_commit = _commit_id_by_message(seed_repo, "C")
    head_snapshot_id = asyncio.run(_snapshot_id_for_commit(head_commit))

    manifest, directories = asyncio.run(_corrupt_then_serve(head_snapshot_id))

    # The invariant: even though the cached manifest_blob was corrupt ({}), the
    # serving path must hand back a manifest that reproduces the snapshot_id by
    # rebuilding it from the delta chain.
    assert manifest, (
        "serving path returned an EMPTY manifest for a corrupt cache — it must "
        "reconstruct from the delta chain instead of serving the corrupt blob"
    )
    recomputed_id = hash_snapshot(manifest, directories)
    assert recomputed_id == head_snapshot_id, (
        f"served manifest does not reproduce snapshot_id {head_snapshot_id[:18]} — the "
        f"serving path served the corrupt cached manifest instead of reconstructing "
        f"from the delta chain.\n served paths={sorted(manifest)} dirs={directories}"
    )