"""Fetch XS unit tests — issue #66. One verb. One size. Proven correct at each layer before moving to the next. F1 muse fetch exits 0 — push XS, clone it, push delta, fetch exits 0 F2 fetched object content correct — delta file object is locally readable with correct bytes F3 no integrity errors — fetch stdout/stderr clean even when exit 0 F4 fetch is idempotent — two fetches in a row both exit 0, same result F5 fetch does not clobber local — original file still intact after fetching delta Tests hit real infrastructure (musehub at localhost:1337, MinIO at localhost:9000). No conftest. No ASGI. No mocks. """ from __future__ import annotations import asyncio import json import os import shutil import socket import subprocess import tempfile import time as _time from pathlib import Path import pytest def _port_open(host: str, port: int) -> bool: try: with socket.create_connection((host, port), timeout=1): return True except OSError: return False pytestmark = pytest.mark.skipif( not (_port_open("localhost", 1337) and _port_open("localhost", 9000)), reason="live infrastructure not available — start with docker compose up minio createbuckets musehub", ) from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine from sqlalchemy.orm import sessionmaker from muse.core.types import blob_id from musehub.db.musehub_repo_models import MusehubMPackIndex _PROD_DB_URL = "postgresql+asyncpg://musehub:musehub@localhost:5434/musehub" LOCALHOST = "https://localhost:1337" REPO_ROOT = Path(__file__).parent.parent def _muse(*args: str, cwd: Path) -> subprocess.CompletedProcess: return subprocess.run( ["muse"] + list(args), cwd=str(cwd), capture_output=True, text=True, timeout=60, ) def _muse_check(*args: str, cwd: Path) -> str: r = _muse(*args, cwd=cwd) if r.returncode != 0: raise AssertionError(f"muse {' '.join(args)} failed:\n{r.stderr[:600]}") return r.stdout async def _wait_indexed(oid: str, timeout: float = 15.0) -> bool: engine = create_async_engine(_PROD_DB_URL) async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) try: deadline = _time.monotonic() + timeout while _time.monotonic() < deadline: async with async_session() as session: row = await session.scalar( select(MusehubMPackIndex).where(MusehubMPackIndex.entity_id == oid) ) if row is not None: return True await asyncio.sleep(0.5) return False finally: await engine.dispose() def _setup_fetch_scenario() -> tuple[str, Path, bytes, bytes]: """Push XS repo, clone it, push a delta commit. Returns (full_slug, clone_dir, original_content, delta_content). Caller is responsible for cleaning up clone_dir. """ original_content = os.urandom(4096) delta_content = os.urandom(4096) # --- push initial XS repo --- push_dir = Path(tempfile.mkdtemp(prefix="muse_fxs_push_")) try: _muse_check("init", cwd=push_dir) (push_dir / "original.txt").write_bytes(original_content) _muse_check("code", "add", "original.txt", cwd=push_dir) _muse_check( "commit", "-m", "xs fetch test: initial commit", "--agent-id", "bench", "--model-id", "bench", cwd=push_dir, ) name = f"bench-fetch-xs-{os.urandom(3).hex()}" out = _muse_check( "hub", "repo", "create", "--name", name, "--visibility", "public", "--no-init", "--hub", LOCALHOST, "--json", cwd=REPO_ROOT, ) slug = json.loads(out)["slug"] # bare name full_slug = f"gabriel/{slug}" _muse_check("remote", "add", "origin", f"{LOCALHOST}/{full_slug}", cwd=push_dir) r = _muse("push", "origin", "main", cwd=push_dir) assert r.returncode == 0, f"initial push failed:\n{r.stderr[:400]}" finally: shutil.rmtree(push_dir, ignore_errors=True) # wait for mpack index so clone can resolve the object orig_oid = blob_id(original_content) indexed = asyncio.run(_wait_indexed(orig_oid)) assert indexed, f"mpack index row never appeared for initial object {orig_oid}" # --- clone --- clone_parent = Path(tempfile.mkdtemp(prefix="muse_fxs_clone_")) _muse_check("clone", f"{LOCALHOST}/{full_slug}", cwd=clone_parent) repo_name = slug.split("/")[-1] if "/" in slug else slug clone_dir = clone_parent / repo_name # --- push delta commit from a fresh copy --- delta_dir = Path(tempfile.mkdtemp(prefix="muse_fxs_delta_")) try: shutil.copytree(str(clone_dir), str(delta_dir / "repo"), symlinks=False) delta_repo = delta_dir / "repo" (delta_repo / "delta.txt").write_bytes(delta_content) _muse_check("code", "add", "delta.txt", cwd=delta_repo) _muse_check( "commit", "-m", "xs fetch test: delta commit", "--agent-id", "bench", "--model-id", "bench", cwd=delta_repo, ) r = _muse("push", "origin", "main", cwd=delta_repo) assert r.returncode == 0, f"delta push failed:\n{r.stderr[:400]}" finally: shutil.rmtree(delta_dir, ignore_errors=True) # wait for delta object to be indexed before fetching delta_oid = blob_id(delta_content) indexed = asyncio.run(_wait_indexed(delta_oid)) assert indexed, f"mpack index row never appeared for delta object {delta_oid}" return full_slug, clone_dir, original_content, delta_content # --------------------------------------------------------------------------- # F1 — muse fetch exits 0 # --------------------------------------------------------------------------- def test_f1_muse_fetch_xs_exits_zero() -> None: """muse fetch of a 1-commit delta from a cloned XS repo must exit 0.""" _, clone_dir, _, _ = _setup_fetch_scenario() try: r = _muse("fetch", "origin", cwd=clone_dir) assert r.returncode == 0, ( f"muse fetch failed (exit {r.returncode})\n" f"stdout: {r.stdout[:400]}\n" f"stderr: {r.stderr[:400]}" ) finally: shutil.rmtree(clone_dir.parent, ignore_errors=True) # --------------------------------------------------------------------------- # F2 — fetched object content is correct # --------------------------------------------------------------------------- def test_f2_fetched_object_content_is_correct() -> None: """After fetch, the delta object must be readable locally with correct bytes. Verified by merging the fetched remote branch and reading the file from disk. """ _, clone_dir, original_content, delta_content = _setup_fetch_scenario() try: r = _muse("fetch", "origin", cwd=clone_dir) assert r.returncode == 0, f"fetch failed:\n{r.stderr[:400]}" # Merge the fetched remote tip into local main so the file lands on disk. _muse_check("merge", "origin/main", cwd=clone_dir) delta_file = clone_dir / "delta.txt" assert delta_file.exists(), "delta.txt not present after fetch + merge" actual_bytes = delta_file.read_bytes() expected_oid = blob_id(delta_content) actual_oid = blob_id(actual_bytes) assert actual_oid == expected_oid, ( f"delta.txt content integrity failure\n" f" expected oid: {expected_oid}\n" f" actual oid: {actual_oid}\n" f" file size: {len(actual_bytes)} bytes" ) finally: shutil.rmtree(clone_dir.parent, ignore_errors=True) # --------------------------------------------------------------------------- # F3 — no integrity errors in fetch output # --------------------------------------------------------------------------- def test_f3_fetch_output_has_no_integrity_errors() -> None: """Fetch stdout/stderr must contain no integrity-failure strings.""" _, clone_dir, _, _ = _setup_fetch_scenario() try: r = _muse("fetch", "origin", cwd=clone_dir) assert r.returncode == 0, f"fetch failed:\n{r.stderr[:400]}" combined = (r.stdout + r.stderr).lower() bad_phrases = ["integrity failure", "corrupted object", "skipping corrupted", "content integrity"] for phrase in bad_phrases: assert phrase not in combined, ( f"fetch exited 0 but output contains '{phrase}':\n" f"stdout: {r.stdout[:400]}\n" f"stderr: {r.stderr[:400]}" ) finally: shutil.rmtree(clone_dir.parent, ignore_errors=True) # --------------------------------------------------------------------------- # F4 — fetch is idempotent # --------------------------------------------------------------------------- def test_f4_fetch_is_idempotent() -> None: """Running muse fetch origin twice must both exit 0 with identical state.""" _, clone_dir, _, delta_content = _setup_fetch_scenario() try: r1 = _muse("fetch", "origin", cwd=clone_dir) assert r1.returncode == 0, f"first fetch failed:\n{r1.stderr[:400]}" r2 = _muse("fetch", "origin", cwd=clone_dir) assert r2.returncode == 0, f"second fetch failed:\n{r2.stderr[:400]}" # After merging, the delta file must be present and correct. _muse_check("merge", "origin/main", cwd=clone_dir) delta_file = clone_dir / "delta.txt" assert delta_file.exists(), "delta.txt missing after two fetches + merge" actual_oid = blob_id(delta_file.read_bytes()) expected_oid = blob_id(delta_content) assert actual_oid == expected_oid, ( f"delta.txt integrity failure after idempotent fetch\n" f" expected: {expected_oid}\n" f" actual: {actual_oid}" ) finally: shutil.rmtree(clone_dir.parent, ignore_errors=True) # --------------------------------------------------------------------------- # F5 — fetch does not clobber existing local content # --------------------------------------------------------------------------- def test_f5_fetch_does_not_clobber_local_content() -> None: """original.txt must be byte-for-byte intact after fetching the delta commit.""" _, clone_dir, original_content, _ = _setup_fetch_scenario() try: r = _muse("fetch", "origin", cwd=clone_dir) assert r.returncode == 0, f"fetch failed:\n{r.stderr[:400]}" original_file = clone_dir / "original.txt" assert original_file.exists(), "original.txt missing after fetch" actual_bytes = original_file.read_bytes() expected_oid = blob_id(original_content) actual_oid = blob_id(actual_bytes) assert actual_oid == expected_oid, ( f"original.txt was clobbered by fetch\n" f" expected oid: {expected_oid}\n" f" actual oid: {actual_oid}\n" f" file size: {len(actual_bytes)} bytes" ) finally: shutil.rmtree(clone_dir.parent, ignore_errors=True)