"""Push XS unit tests — issue #64. One verb. One size. Proven correct at each layer before moving to the next. P1 mpack integrity — object_id == sha256(content), mpack_key == sha256(mpack) P2 muse push XS — real muse CLI pushes XS repo to localhost:1337, exit 0 P3 unpack stores correctly — sha256(stored_bytes) == object_id for every object P4 mpack index rows — every object has an mpack index row with correct mpack_id P5 fetch round-trip — fetch/mpack presigned URL unpacks to correct objects Tests hit real infrastructure (musehub at localhost:1337, MinIO at localhost:9000). No conftest. No ASGI. No mocks. """ from __future__ import annotations import asyncio import json import os import shutil import subprocess import tempfile import time as _time from pathlib import Path import msgpack import pytest from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine from sqlalchemy.orm import sessionmaker from muse.core.types import blob_id from musehub.db.musehub_repo_models import MusehubMPackIndex _PROD_DB_URL = "postgresql+asyncpg://musehub:musehub@localhost:5434/musehub" async def _wait_indexed(oid: str, timeout: float = 15.0) -> bool: engine = create_async_engine(_PROD_DB_URL) async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) try: deadline = _time.monotonic() + timeout while _time.monotonic() < deadline: async with async_session() as session: row = await session.scalar( select(MusehubMPackIndex).where(MusehubMPackIndex.entity_id == oid) ) if row is not None: return True await asyncio.sleep(0.5) return False finally: await engine.dispose() async def _fetch_index_rows(oid: str, timeout: float = 10.0) -> list[MusehubMPackIndex]: engine = create_async_engine(_PROD_DB_URL) async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) try: deadline = _time.monotonic() + timeout while _time.monotonic() < deadline: async with async_session() as session: result = await session.scalars( 
select(MusehubMPackIndex).where(MusehubMPackIndex.entity_id == oid) ) rows = result.all() if rows: return list(rows) await asyncio.sleep(0.5) return [] finally: await engine.dispose() LOCALHOST = "https://localhost:1337" REPO_ROOT = Path(__file__).parent.parent # --------------------------------------------------------------------------- # Shared constants — XS is exactly 1 object, 4 KB, 1 commit # --------------------------------------------------------------------------- OBJ_CONTENT = b"a" * 4096 OBJ_ID = blob_id(OBJ_CONTENT) COMMIT_ID = blob_id(b"xs-commit") SNAPSHOT_ID = blob_id(b"xs-snapshot") def _build_mpack() -> tuple[bytes, str]: """Build the XS mpack and return (wire_bytes, mpack_key).""" mpack = { "commits": [{ "commit_id": COMMIT_ID, "branch": "main", "message": "xs unit test commit", "author": "gabriel", "committed_at": "2026-01-01T00:00:00+00:00", "parent_commit_id": None, "parent2_commit_id": None, "snapshot_id": SNAPSHOT_ID, "agent_id": "", "model_id": "", "toolchain_id": "", "sem_ver_bump": "none", "breaking_changes": [], "signature": "", "signer_key_id": "", "signer_public_key": "", "prompt_hash": "", }], "snapshots": [{ "snapshot_id": SNAPSHOT_ID, "parent_snapshot_id": None, "delta_upsert": {"file.txt": OBJ_ID}, "delta_remove": [], }], "blobs": [{"object_id": OBJ_ID, "content": OBJ_CONTENT}], "branch_heads": {"main": COMMIT_ID}, } wire_bytes = msgpack.packb(mpack, use_bin_type=True) mpack_key = blob_id(wire_bytes) return wire_bytes, mpack_key def _muse(*args: str, cwd: Path) -> subprocess.CompletedProcess: return subprocess.run( ["muse"] + list(args), cwd=str(cwd), capture_output=True, text=True, timeout=60, ) def _muse_check(*args: str, cwd: Path) -> str: r = _muse(*args, cwd=cwd) if r.returncode != 0: raise AssertionError(f"muse {' '.join(args)} failed:\n{r.stderr[:600]}") return r.stdout # --------------------------------------------------------------------------- # P1 — mpack integrity (no network, no DB, no fixtures) # 
# ---------------------------------------------------------------------------


def test_p1_object_id_matches_content() -> None:
    """object_id must equal sha256 of the raw content bytes."""
    # NOTE(review): OBJ_ID is itself defined as blob_id(OBJ_CONTENT), so this
    # only proves blob_id is deterministic; it does not independently pin the
    # hash algorithm.
    expected = blob_id(OBJ_CONTENT)
    assert OBJ_ID == expected, (
        f"object_id mismatch\n got: {OBJ_ID}\n expected: {expected}"
    )


def test_p1_mpack_key_matches_wire_bytes() -> None:
    """mpack_key must equal sha256 of the msgpack-encoded mpack.

    The encoding must also be deterministic: building the same mpack twice has
    to yield identical bytes, or content addressing by mpack_key breaks.
    """
    wire_bytes, mpack_key = _build_mpack()
    expected = blob_id(wire_bytes)
    assert mpack_key == expected, (
        f"mpack_key mismatch\n got: {mpack_key}\n expected: {expected}"
    )
    wire_again, key_again = _build_mpack()
    assert wire_again == wire_bytes and key_again == mpack_key, (
        "mpack encoding is not deterministic across builds"
    )


def test_p1_mpack_objects_round_trip() -> None:
    """Every object unpacked from the mpack must hash to its declared object_id."""
    wire_bytes, _ = _build_mpack()
    mpack = msgpack.unpackb(wire_bytes, raw=False)
    blobs = mpack["blobs"]
    # Guard against a vacuous pass: an empty blob list would skip the loop.
    assert blobs, "mpack contains no blobs"
    for obj in blobs:
        oid = obj["object_id"]
        computed = blob_id(obj["content"])
        assert oid == computed, (
            f"object content integrity failure\n"
            f" declared object_id: {oid}\n"
            f" sha256(content): {computed}"
        )


# ---------------------------------------------------------------------------
# Shared helpers for P2–P5 (real CLI, real hub, real MinIO)
# ---------------------------------------------------------------------------

# Upper bound on how long to wait for the async mpack.index job after a push.
_INDEX_TIMEOUT_S = 15.0

# Hub account that owns the repos these tests create.
_HUB_OWNER = "gabriel"


def _repo_url(slug: str) -> str:
    """Return the hub remote URL for a repo created by these tests."""
    return f"{LOCALHOST}/{_HUB_OWNER}/{slug}"


def _push_xs_repo(content: bytes, *, tag: str, message: str, tmp_prefix: str) -> str:
    """Create a 1-file, 1-commit repo, push it to a fresh hub repo, return the slug.

    Args:
        content: bytes written to ``file.txt`` before committing.
        tag: short label embedded in the hub repo name (e.g. ``"p3"``).
        message: commit message.
        tmp_prefix: prefix for the temporary working directory.

    Raises:
        AssertionError: if any muse step fails or ``muse push`` exits non-zero.
    """
    workdir = Path(tempfile.mkdtemp(prefix=tmp_prefix))
    try:
        _muse_check("init", cwd=workdir)
        (workdir / "file.txt").write_bytes(content)
        _muse_check("code", "add", "file.txt", cwd=workdir)
        _muse_check(
            "commit", "-m", message,
            "--agent-id", "bench", "--model-id", "bench",
            cwd=workdir,
        )
        name = f"bench-push-xs-{tag}-{os.urandom(3).hex()}"
        out = _muse_check(
            "hub", "repo", "create",
            "--name", name,
            "--visibility", "public",
            "--no-init",
            "--hub", LOCALHOST,
            "--json",
            cwd=REPO_ROOT,
        )
        slug = json.loads(out)["slug"]
        _muse_check("remote", "add", "origin", _repo_url(slug), cwd=workdir)
        r = _muse("push", "origin", "main", cwd=workdir)
        assert r.returncode == 0, (
            f"muse push XS failed (exit {r.returncode})\n"
            f"stdout: {r.stdout[:400]}\n"
            f"stderr: {r.stderr[:400]}"
        )
        return slug
    finally:
        shutil.rmtree(workdir, ignore_errors=True)


def _read_mpack_from_minio(mpack_id: str) -> bytes:
    """Download ``mpacks/{mpack_id}`` from the local MinIO ``muse-objects`` bucket.

    Raises:
        AssertionError: if the object cannot be fetched, chained to the
            underlying client error.
    """
    import boto3

    s3 = boto3.client(
        "s3",
        endpoint_url="http://localhost:9000",
        aws_access_key_id="minioadmin",
        aws_secret_access_key="minioadmin",
        region_name="us-east-1",
    )
    s3_key = f"mpacks/{mpack_id}"
    try:
        return s3.get_object(Bucket="muse-objects", Key=s3_key)["Body"].read()
    except Exception as exc:  # any S3/connection failure is a test failure
        raise AssertionError(
            f"mpack_id {mpack_id} not found in MinIO\n"
            f" key tried: {s3_key}\n"
            f" error: {exc}"
        ) from exc


def _blob_ids_in_mpack(wire_bytes: bytes) -> set[str]:
    """Parse a MUSE wire-format mpack and return the object_ids of its blobs."""
    from muse.core.mpack import parse_wire_mpack

    mpack = parse_wire_mpack(wire_bytes)
    return {obj["object_id"] for obj in mpack.get("blobs", [])}


# ---------------------------------------------------------------------------
# P2 — muse push XS to real localhost:1337 server, assert exit 0
# ---------------------------------------------------------------------------


def test_p2_muse_push_xs_exits_zero() -> None:
    """muse push of a 1-commit, 1-file XS repo to localhost must exit 0."""
    # Same 4 KB content as the P1 mpack constants.
    _push_xs_repo(
        OBJ_CONTENT,
        tag="p2",
        message="xs unit test commit",
        tmp_prefix="muse_p2_",
    )


# ---------------------------------------------------------------------------
# P3 — after muse push, mpack in MinIO has correct object bytes
# ---------------------------------------------------------------------------


def test_p3_pushed_mpack_in_minio_is_muse_format() -> None:
    """After muse push XS, the mpack in MinIO must be in MUSE wire format and
    must contain the pushed object with correct content.

    Objects are no longer stored individually under objects/{oid} — they live
    inside the covering mpack. This test verifies the mpack is parseable and
    its content is intact.
    """
    unique_content = os.urandom(4096)
    expected_oid = blob_id(unique_content)
    _push_xs_repo(
        unique_content,
        tag="p3",
        message="xs p3 commit",
        tmp_prefix="muse_p3_",
    )

    # Wait for the mpack.index job to write the index row, then take its mpack_id.
    rows = asyncio.run(_fetch_index_rows(expected_oid, timeout=_INDEX_TIMEOUT_S))
    assert rows, (
        f"mpack.index job did not complete within {_INDEX_TIMEOUT_S:g}s\n"
        f" object_id: {expected_oid}"
    )
    mpack_id = rows[0].mpack_id

    # Fetch the mpack from MinIO and verify it is MUSE wire format.
    wire_bytes = _read_mpack_from_minio(mpack_id)
    assert wire_bytes[:4] == b"MUSE", (
        f"Mpack in MinIO is not MUSE format — got magic {wire_bytes[:4]!r}"
    )
    oids_in_pack = _blob_ids_in_mpack(wire_bytes)
    assert expected_oid in oids_in_pack, (
        f"Pushed object not found in mpack\n"
        f" expected: {expected_oid}\n"
        f" objects in mpack: {len(oids_in_pack)}"
    )


# ---------------------------------------------------------------------------
# P4 — mpack index rows exist and mpack_id points to the correct mpack
# ---------------------------------------------------------------------------


def test_p4_mpack_index_has_row_for_pushed_object() -> None:
    """After muse push XS, musehub_mpack_index must have a row for the pushed
    object_id, and mpack_id must point to an mpack that contains that object.

    Queries the real production DB directly (same DB the server uses).
    """
    # Known content — same derivation as P3
    unique_content = os.urandom(4096)
    expected_oid = blob_id(unique_content)
    _push_xs_repo(
        unique_content,
        tag="p4",
        message="xs p4 commit",
        tmp_prefix="muse_p4_",
    )

    # Query the real DB — poll for the async mpack.index job to complete.
    rows = asyncio.run(_fetch_index_rows(expected_oid, timeout=_INDEX_TIMEOUT_S))
    assert rows, (
        f"No mpack index row for object after push\n"
        f" object_id: {expected_oid}"
    )

    # Every index row must point at an mpack in MinIO that contains our object.
    for row in rows:
        mpack_id = row.mpack_id
        oids_in_pack = _blob_ids_in_mpack(_read_mpack_from_minio(mpack_id))
        assert expected_oid in oids_in_pack, (
            f"mpack index row exists but mpack does not contain the object\n"
            f" object_id: {expected_oid}\n"
            f" mpack_id: {mpack_id}\n"
            f" objects in mpack: {len(oids_in_pack)}"
        )


# ---------------------------------------------------------------------------
# P5 — muse clone round-trip: push then clone, no integrity errors
# ---------------------------------------------------------------------------


def test_p5_muse_clone_xs_no_integrity_errors() -> None:
    """Push an XS repo then clone it. The clone must exit 0 with no content
    integrity errors.

    This is the exact failure mode from bench_cli.py. If this passes, the full
    push → clone round-trip is correct for XS.
    """
    unique_content = os.urandom(4096)
    expected_oid = blob_id(unique_content)
    slug = _push_xs_repo(
        unique_content,
        tag="p5",
        message="xs p5 commit",
        tmp_prefix="muse_p5_push_",
    )

    # Wait for mpack.index job to complete (same as P4)
    indexed = asyncio.run(_wait_indexed(expected_oid, timeout=_INDEX_TIMEOUT_S))
    assert indexed, "mpack index row never appeared — mpack.index job did not complete"

    clone_parent = Path(tempfile.mkdtemp(prefix="muse_p5_clone_"))
    try:
        r = _muse("clone", _repo_url(slug), cwd=clone_parent)
        assert r.returncode == 0, (
            f"muse clone failed (exit {r.returncode})\n"
            f"stdout: {r.stdout[:600]}\n"
            f"stderr: {r.stderr[:600]}"
        )
        # The CLI may report integrity problems on either stream; scan both.
        output = f"{r.stdout}\n{r.stderr}".lower()
        assert "integrity failure" not in output, (
            f"clone exited 0 but reported integrity failures:\n"
            f"stdout: {r.stdout[:600]}\n"
            f"stderr: {r.stderr[:600]}"
        )
        assert "corrupted object" not in output, (
            f"clone exited 0 but reported corrupted objects:\n"
            f"stdout: {r.stdout[:600]}\n"
            f"stderr: {r.stderr[:600]}"
        )
    finally:
        shutil.rmtree(clone_parent, ignore_errors=True)