"""TDD — server-side fast-forward check on push. Without this check the branch pointer is advanced unconditionally, meaning a non-force push silently overwrites concurrent work. The force flag sent by the client (muse push --force) is received but ignored. Test plan --------- FF-1 Normal FF push: remote at c1, client pushes c2 (parent=c1) → 200, branch=c2 FF-2 Non-FF, no force: remote at c1, client pushes c2' (diverged) → 409, branch unchanged FF-3 Non-FF, force=True: same diverged push with force=True → 200, branch=c2' FF-4 New branch (no prior head): any push is always FF → 200 FF-5 Push where incoming_head == current_head (no-op): → 200, branch unchanged """ from __future__ import annotations import datetime import hashlib import pathlib import httpx import msgpack import pytest import pytest_asyncio from httpx import AsyncClient, ASGITransport from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from musehub.auth.request_signing import MSignContext, require_signed_request, optional_signed_request from musehub.db.musehub_repo_models import MusehubBranch, MusehubRepo from musehub.main import app from muse.core.mpack import build_mpack from musehub.types.json_types import JSONObject from muse.core.object_store import write_object from muse.core.paths import muse_dir from muse.core.snapshot import compute_commit_id, compute_snapshot_id from muse.core.commits import CommitRecord, write_commit from muse.core.refs import write_branch_ref from muse.core.snapshots import SnapshotRecord, write_snapshot from muse.core.types import Manifest, blob_id pytestmark = pytest.mark.skip(reason="muse wire protocol in flux") _AUTH_CTX = MSignContext( handle="gabriel", identity_id="sha256:" + "0" * 64, is_agent=False, is_admin=True, ) # --------------------------------------------------------------------------- # Fixtures # --------------------------------------------------------------------------- @pytest_asyncio.fixture() async def client(db_session: AsyncSession) -> None: # Only override auth — conftest.db_session already wires get_db to a # per-request session that commits after each handler, so our test # db_session sees committed data when it queries the DB directly. app.dependency_overrides[require_signed_request] = lambda: _AUTH_CTX app.dependency_overrides[optional_signed_request] = lambda: _AUTH_CTX async with AsyncClient( transport=ASGITransport(app=app), base_url="https://localhost:1337", ) as c: yield c app.dependency_overrides.pop(require_signed_request, None) app.dependency_overrides.pop(optional_signed_request, None) @pytest_asyncio.fixture() async def repo(client: AsyncClient) -> None: resp = await client.post( "/api/repos", json={"owner": "gabriel", "name": "ff-check-test", "visibility": "public", "initialize": False}, ) assert resp.status_code in (200, 201), resp.text data = resp.json() yield data["slug"] await client.delete(f"/api/repos/{data['repoId']}") # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _make_local_repo(tmp: pathlib.Path, repo_id: str = "ff-test") -> pathlib.Path: tmp.mkdir(parents=True, exist_ok=True) dot = muse_dir(tmp) dot.mkdir() (dot / "repo.json").write_text(f'{{"repo_id":"{repo_id}","owner":"gabriel"}}') for d in ("commits", "snapshots", "objects"): (dot / d).mkdir() (dot / "refs" / "heads").mkdir(parents=True) (dot / "HEAD").write_text("ref: refs/heads/main\n") (dot / "config.toml").write_text("") return tmp def _add_commit( root: pathlib.Path, label: str, parent_id: str | None = None, repo_id: str = "ff-test", ) -> CommitRecord: raw = f"content-{label}".encode() oid = blob_id(raw) write_object(root, oid, raw) manifest: Manifest = {"file.txt": oid} snap_id = compute_snapshot_id(manifest) write_snapshot(root, SnapshotRecord(snapshot_id=snap_id, manifest=manifest)) ts = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc) parent_ids = [parent_id] if parent_id else [] cid = compute_commit_id( parent_ids=parent_ids, snapshot_id=snap_id, message=f"commit {label}", committed_at_iso=ts.isoformat(), author="gabriel", ) commit = CommitRecord( repo_id=repo_id, commit_id=cid, branch="main", snapshot_id=snap_id, message=f"commit {label}", committed_at=ts, parent_commit_id=parent_id, parent2_commit_id=None, author="gabriel", metadata={}, structured_delta=None, sem_ver_bump="none", breaking_changes=[], agent_id="", model_id="", toolchain_id="", prompt_hash="", signature="", signer_key_id="", ) write_commit(root, commit) return commit async def _upload_and_unpack( client: AsyncClient, repo_slug: str, root: pathlib.Path, tip: str, have: list[str], *, force: bool = False, branch: str = "main", expect_status: int = 200, ) -> JSONObject: """Build mpack, presign-upload to MinIO, call unpack-mpack. Returns response JSON.""" mpack_dict = build_mpack(root, [tip], have=have) wire_bytes = msgpack.packb(mpack_dict, use_bin_type=True) mpack_key = "sha256:" + hashlib.sha256(wire_bytes).hexdigest() presign_resp = await client.post( f"/gabriel/{repo_slug}/push/mpack-presign", content=msgpack.packb( {"mpack_key": mpack_key, "size_bytes": len(wire_bytes)}, use_bin_type=True, ), headers={"Content-Type": "application/x-msgpack"}, ) assert presign_resp.status_code == 200, presign_resp.text upload_url = ( presign_resp.json().get("upload_url") or presign_resp.json().get("uploadUrl") ) async with httpx.AsyncClient() as raw: put = await raw.put(upload_url, content=wire_bytes) assert put.status_code in (200, 204) n_commits = len(mpack_dict.get("commits") or []) n_objects = len(mpack_dict.get("objects") or []) unpack_resp = await client.post( f"/gabriel/{repo_slug}/push/unpack-mpack", content=msgpack.packb( { "mpack_key": mpack_key, "branch": branch, "head": tip, "commits_count": n_commits, "objects_count": n_objects, "force": force, }, use_bin_type=True, ), headers={"Content-Type": "application/x-msgpack"}, ) assert unpack_resp.status_code == expect_status, ( f"Expected HTTP {expect_status}, got {unpack_resp.status_code}: {unpack_resp.text}" ) return unpack_resp.json() async def _get_branch_head(db_session: AsyncSession, repo_slug: str, branch: str = "main") -> str | None: """Return the current head_commit_id for the branch, or None.""" repo_row = (await db_session.execute( select(MusehubRepo).where(MusehubRepo.slug == repo_slug) )).scalar_one_or_none() if not repo_row: return None branch_row = (await db_session.execute( select(MusehubBranch).where( MusehubBranch.repo_id == repo_row.repo_id, MusehubBranch.name == branch, ) )).scalar_one_or_none() return branch_row.head_commit_id if branch_row else None # --------------------------------------------------------------------------- # FF-1 — normal fast-forward push succeeds and advances branch # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_ff1_fast_forward_push_advances_branch( client: AsyncClient, repo: str, tmp_path: pathlib.Path, db_session: AsyncSession, ) -> None: """Remote at c1 → push c2 (parent=c1): 200, branch advances to c2.""" root = _make_local_repo(tmp_path / "repo") c1 = _add_commit(root, "c1") c2 = _add_commit(root, "c2", parent_id=c1.commit_id) write_branch_ref(root, "main", c2.commit_id) # First push: establish c1 on remote await _upload_and_unpack(client, repo, root, c1.commit_id, have=[]) head_after_first = await _get_branch_head(db_session, repo) assert head_after_first == c1.commit_id, "setup: branch should be at c1 after first push" # Second push: FF from c1 → c2 await _upload_and_unpack(client, repo, root, c2.commit_id, have=[c1.commit_id]) head_after_second = await _get_branch_head(db_session, repo) assert head_after_second == c2.commit_id, ( f"FF push must advance branch to c2, got {head_after_second}" ) # --------------------------------------------------------------------------- # FF-2 — non-FF push without force is rejected with 409 # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_ff2_non_ff_push_rejected_without_force( client: AsyncClient, repo: str, tmp_path: pathlib.Path, db_session: AsyncSession, ) -> None: """Remote at c1 → push c2' (diverged, no force): 409, branch unchanged at c1.""" root = _make_local_repo(tmp_path / "repo") c1 = _add_commit(root, "c1") # c2' is a genesis commit (different content, diverges from c1) c2_diverged = _add_commit(root, "c2-diverged", parent_id=None) # Establish c1 on remote await _upload_and_unpack(client, repo, root, c1.commit_id, have=[]) head_after_first = await _get_branch_head(db_session, repo) assert head_after_first == c1.commit_id # Try to push c2' (diverged) without force → must be rejected await _upload_and_unpack( client, repo, root, c2_diverged.commit_id, have=[], force=False, expect_status=409, ) head_after_rejected = await _get_branch_head(db_session, repo) assert head_after_rejected == c1.commit_id, ( f"Rejected non-FF push must not change branch, " f"expected c1={c1.commit_id[:16]} got {str(head_after_rejected)[:16]}" ) # --------------------------------------------------------------------------- # FF-3 — non-FF push with force=True is allowed # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_ff3_non_ff_push_allowed_with_force( client: AsyncClient, repo: str, tmp_path: pathlib.Path, db_session: AsyncSession, ) -> None: """Remote at c1 → push c2' (diverged) with force=True: 200, branch at c2'.""" root = _make_local_repo(tmp_path / "repo") c1 = _add_commit(root, "c1") c2_diverged = _add_commit(root, "c2-force", parent_id=None) # Establish c1 on remote await _upload_and_unpack(client, repo, root, c1.commit_id, have=[]) # Force-push c2' (diverged) await _upload_and_unpack( client, repo, root, c2_diverged.commit_id, have=[], force=True, expect_status=200, ) head = await _get_branch_head(db_session, repo) assert head == c2_diverged.commit_id, ( f"Force push must advance branch to c2_diverged, got {str(head)[:16]}" ) # --------------------------------------------------------------------------- # FF-4 — new branch (no prior head) always succeeds # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_ff4_new_branch_always_succeeds( client: AsyncClient, repo: str, tmp_path: pathlib.Path, db_session: AsyncSession, ) -> None: """First push to a brand-new branch is always allowed (no prior head to protect).""" root = _make_local_repo(tmp_path / "repo") c1 = _add_commit(root, "genesis") write_branch_ref(root, "main", c1.commit_id) await _upload_and_unpack(client, repo, root, c1.commit_id, have=[], expect_status=200) head = await _get_branch_head(db_session, repo) assert head == c1.commit_id # --------------------------------------------------------------------------- # FF-5 — push where incoming_head == current_head is a no-op, always 200 # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_ff5_same_head_is_noop( client: AsyncClient, repo: str, tmp_path: pathlib.Path, db_session: AsyncSession, ) -> None: """Pushing when local tip already equals remote head is a no-op: 200, branch unchanged.""" root = _make_local_repo(tmp_path / "repo") c1 = _add_commit(root, "c1-noop") write_branch_ref(root, "main", c1.commit_id) await _upload_and_unpack(client, repo, root, c1.commit_id, have=[]) # Push again with the same head — empty mpack, same tip await _upload_and_unpack( client, repo, root, c1.commit_id, have=[c1.commit_id], expect_status=200, ) head = await _get_branch_head(db_session, repo) assert head == c1.commit_id