"""TDD — the serving path must RECOVER from a corrupt cached manifest_blob. Belt-and-suspenders for the staging gabriel/muse clone failure. The corruption is LEGACY data (snapshots written 2026-05-27/28 with entry_count=0 — an empty manifest_blob that does not reproduce its snapshot_id). Current push code no longer produces it, but the bad rows persist, and any cached corrupt manifest poisons the clone: `_snap_row_to_wire` serves the corrupt manifest, the client's hash check fails, the commit is dropped, and every descendant fails "parent not in mpack". Invariant under test: when a snapshot's cached `manifest_blob` does NOT reproduce its `snapshot_id`, the serving path must reconstruct the correct manifest from the intact delta chain (parent manifest + this snapshot's delta_blob) rather than serve the corrupt cache. This makes clone self-healing (repair-on-read) against any corrupt cache, legacy or future. RED before the hardening (serves the corrupt {} → hash mismatch); GREEN after. Integration test against localhost (musehub @ :1337, postgres @ :5434). It pushes a short chain, then *simulates the legacy corruption* by overwriting the head snapshot's manifest_blob with packb({}) in the DB (unique content per run, so the global content-addressed snapshot row is not shared with any other repo), and restores it afterward. """ from __future__ import annotations import asyncio import json import subprocess import time as _time from pathlib import Path import msgpack import pytest from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine from sqlalchemy.orm import sessionmaker from muse.core.ids import hash_snapshot from musehub.db.musehub_repo_models import MusehubCommit, MusehubSnapshot from musehub.services.musehub_wire_shared import _snap_row_to_wire_s3 HUB = "https://localhost:1337" DB_URL = "postgresql+asyncpg://musehub:musehub@localhost:5434/musehub" REPO_ROOT = Path(__file__).parent.parent def muse(*args: str, cwd: Path, timeout: int = 90) -> subprocess.CompletedProcess: return subprocess.run( ["muse", *args], cwd=str(cwd), capture_output=True, text=True, timeout=timeout ) def muse_check(*args: str, cwd: Path, timeout: int = 90) -> str: r = muse(*args, cwd=cwd, timeout=timeout) if r.returncode != 0: raise RuntimeError(f"muse {' '.join(args)} failed (rc={r.returncode}):\n{r.stderr[:600]}") return r.stdout def _commit_id_by_message(repo: Path, message: str) -> str: commits = json.loads(muse_check("log", "--json", cwd=repo))["commits"] for c in commits: if (c.get("message") or "").strip() == message: return c["commit_id"] raise AssertionError(f"no commit with message {message!r}") async def _snapshot_id_for_commit(commit_id: str) -> str: engine = create_async_engine(DB_URL) Session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) try: async with Session() as s: commit = await s.get(MusehubCommit, commit_id) assert commit is not None and commit.snapshot_id, f"no snapshot for {commit_id[:18]}" return commit.snapshot_id finally: await engine.dispose() async def _corrupt_then_serve(snapshot_id: str): """Overwrite the snapshot's manifest_blob with packb({}) (simulating the legacy corruption), serve it through the wire path, then restore the original blob. Returns (served_manifest, served_directories). """ engine = create_async_engine(DB_URL) Session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) # _snap_row_to_wire_s3 serves the manifest from the DB (delta-chain reconstruction); # it does not use the storage backend for that path, so a backend is not required. backend = None try: # snapshot original state async with Session() as s: row = await s.get(MusehubSnapshot, snapshot_id) assert row is not None, f"snapshot {snapshot_id[:18]} not on server" original_blob = row.manifest_blob original_count = row.entry_count try: # inject the legacy corruption: empty manifest async with Session() as s: row = await s.get(MusehubSnapshot, snapshot_id) row.manifest_blob = msgpack.packb({}, use_bin_type=True) row.entry_count = 0 await s.commit() # serve it through the real wire path (session-aware entry point) async with Session() as s: row = await s.get(MusehubSnapshot, snapshot_id) wire = await _snap_row_to_wire_s3(row, backend, session=s) return dict(wire.get("manifest") or {}), list(wire.get("directories") or []) finally: # restore — never leave the corrupt blob behind async with Session() as s: row = await s.get(MusehubSnapshot, snapshot_id) row.manifest_blob = original_blob row.entry_count = original_count await s.commit() finally: await engine.dispose() @pytest.fixture def hub_repo(tmp_path: Path): name = f"test-serve-recover-{tmp_path.name[-6:]}" out = muse_check( "hub", "repo", "create", "--name", name, "--visibility", "public", "--no-init", "--hub", HUB, "--json", cwd=REPO_ROOT, ) slug = f"gabriel/{json.loads(out)['slug']}" yield slug muse("hub", "repo", "delete", slug, "--yes", "--hub", HUB, "--json", cwd=REPO_ROOT) def test_serving_path_recovers_from_corrupt_manifest_blob(tmp_path: Path, hub_repo: str) -> None: repo = tmp_path / "seed" repo.mkdir() muse_check("init", cwd=repo) # Unique content per run so the content-addressed snapshot rows are not shared # with any other repo (safe to corrupt/restore the global row). tag = f"{tmp_path.name}-{int(_time.time())}" # A -> B -> C ; C is the head (full manifest), B is delta-only, A is root. for fname, msg in [("f1.txt", "A"), ("f2.txt", "B"), ("f3.txt", "C")]: (repo / fname).write_text(f"{msg}-{tag}\n") muse_check("code", "add", ".", cwd=repo) muse_check("commit", "-m", msg, "--agent-id", "test", "--model-id", "test", cwd=repo) muse_check("remote", "add", "origin", f"{HUB}/{hub_repo}", cwd=repo) muse_check("push", "origin", "main", cwd=repo) c_commit = _commit_id_by_message(repo, "C") snap_id = asyncio.run(_snapshot_id_for_commit(c_commit)) served_manifest, served_dirs = asyncio.run(_corrupt_then_serve(snap_id)) # The invariant: even though the cached manifest_blob was corrupt ({}), the # serving path must return a manifest that reproduces the snapshot_id by # reconstructing from the delta chain. assert served_manifest, ( "serving path returned an EMPTY manifest for a corrupt cache — it must " "reconstruct from the delta chain instead of serving the corrupt blob" ) assert hash_snapshot(served_manifest, served_dirs) == snap_id, ( f"served manifest does not reproduce snapshot_id {snap_id[:18]} — the " f"serving path served the corrupt cached manifest instead of reconstructing " f"from the delta chain.\n served paths={sorted(served_manifest)} dirs={served_dirs}" )