"""TDD tests for governance quorum enforcement on proposal merges. Design: repos with a ``governance.json`` at HEAD require ≥ threshold approved reviews from declared quorum members before a proposal can merge. Repos without ``governance.json`` are unaffected. Surfaces: 1. ``load_governance`` — reads governance.json from repo HEAD object store 2. ``check_quorum`` — counts member approvals vs threshold 3. POST /repos/{repo_id}/proposals/{proposal_id}/merge — returns 403 when quorum is not met, proceeds normally when it is Tests are RED-first. Each assertion drives one concrete implementation decision. """ from __future__ import annotations import json import pytest from datetime import datetime, timezone from httpx import AsyncClient from sqlalchemy.ext.asyncio import AsyncSession from musehub.main import app from musehub.types.json_types import JSONObject from muse.core.types import long_id, blob_id from musehub.core.genesis import compute_identity_id, compute_repo_id, compute_proposal_id, compute_review_id # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- _NOW = datetime.now(timezone.utc) _MEMBER_FP = long_id("a" * 64) _NONMEMBER_FP = long_id("b" * 64) _GOVERNANCE_1OF1 = { "schema": 1, "quorum": { "threshold": 1, "policy": "1-of-1", "members": [_MEMBER_FP], }, } _GOVERNANCE_2OF3 = { "schema": 1, "quorum": { "threshold": 2, "policy": "2-of-3", "members": [ _MEMBER_FP, long_id("c" * 64), long_id("d" * 64), ], }, } def _make_identity(handle: str, identity_id: str | None = None) -> None: from musehub.db.musehub_identity_models import MusehubIdentity return MusehubIdentity( identity_id=identity_id or compute_identity_id(handle.encode()), handle=handle, identity_type="human", agent_capabilities=[], pinned_repo_ids=[], is_verified=False, created_at=_NOW, updated_at=_NOW, ) def _make_auth_key(identity_id: str, fingerprint: str) -> None: from musehub.db.musehub_auth_models import MusehubAuthKey return MusehubAuthKey( key_id=fingerprint, identity_id=identity_id, algorithm="ed25519", public_key_b64=f"ed25519:{'Z' * 43}", fingerprint=fingerprint, label="test key", created_at=_NOW, ) def _make_repo(owner: str, slug: str, identity_id: str) -> None: from musehub.db.musehub_repo_models import MusehubRepo return MusehubRepo( repo_id=compute_repo_id(identity_id, slug, "muse/generic", _NOW.isoformat()), name=slug, owner=owner, slug=slug, visibility="public", owner_user_id=identity_id, ) _PROPOSAL_COUNTER: list[int] = [0] def _make_proposal(repo_id: str, proposal_id: str) -> None: from musehub.db.musehub_social_models import MusehubProposal _PROPOSAL_COUNTER[0] += 1 return MusehubProposal( proposal_id=proposal_id, repo_id=repo_id, proposal_number=_PROPOSAL_COUNTER[0], title="Test proposal", body="", from_branch="feat/x", to_branch="main", state="open", author="testuser", created_at=_NOW, updated_at=_NOW, ) def _make_review(proposal_id: str, reviewer: str, state: str) -> None: from musehub.db.musehub_social_models import MusehubProposalReview review_id = compute_review_id(proposal_id, compute_identity_id(reviewer.encode()), _NOW.isoformat()) return MusehubProposalReview( review_id=review_id, proposal_id=proposal_id, reviewer_username=reviewer, state=state, submitted_at=_NOW, created_at=_NOW, ) # --------------------------------------------------------------------------- # 1. load_governance — unit tests # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_load_governance_returns_none_when_no_file(db_session: AsyncSession) -> None: """Repo with no governance.json returns None.""" from musehub.services.musehub_governance import load_governance identity_id = compute_identity_id(b"govtest1") human = _make_identity("govtest1", identity_id) db_session.add(human) repo = _make_repo("govtest1", "no-gov-repo", identity_id) db_session.add(repo) await db_session.commit() result = await load_governance(db_session, repo.repo_id) assert result is None @pytest.mark.asyncio async def test_load_governance_returns_parsed_json(db_session: AsyncSession) -> None: """Repo with governance.json stored in object store returns parsed dict.""" from musehub.services.musehub_governance import load_governance from musehub.db.musehub_repo_models import ( MusehubRepo, MusehubCommit, MusehubCommitRef, MusehubBranch, MusehubSnapshot, MusehubSnapshotRef, MusehubObject, MusehubObjectRef, ) from musehub.core.genesis import compute_branch_id import msgpack identity_id = compute_identity_id(b"govtest2") human = _make_identity("govtest2", identity_id) db_session.add(human) repo = _make_repo("govtest2", "gov-repo", identity_id) db_session.add(repo) # Build a minimal snapshot containing governance.json content = json.dumps(_GOVERNANCE_1OF1).encode() object_id = blob_id(content) snapshot_id = blob_id(f"snap:{repo.repo_id}:governance".encode()) commit_id = blob_id(f"commit:{repo.repo_id}:init".encode()) obj = MusehubObject( object_id=object_id, path="governance.json", size_bytes=len(content), storage_uri=f"s3://muse-objects/objects/{object_id}", content_cache=content, ) db_session.add(obj) obj_ref = MusehubObjectRef( object_id=object_id, repo_id=repo.repo_id, ) db_session.add(obj_ref) snap = MusehubSnapshot( snapshot_id=snapshot_id, directories=[], manifest_blob=msgpack.packb({"governance.json": object_id}, use_bin_type=True), entry_count=1, created_at=_NOW, ) db_session.add(snap) db_session.add(MusehubSnapshotRef(repo_id=repo.repo_id, snapshot_id=snapshot_id)) commit = MusehubCommit( commit_id=commit_id, branch="main", parent_ids=[], message="init", author=identity_id, timestamp=_NOW, snapshot_id=snapshot_id, ) db_session.add(commit) db_session.add(MusehubCommitRef(repo_id=repo.repo_id, commit_id=commit_id)) branch = MusehubBranch( branch_id=compute_branch_id(repo.repo_id, "main"), repo_id=repo.repo_id, name="main", head_commit_id=commit_id, ) db_session.add(branch) await db_session.commit() result = await load_governance(db_session, repo.repo_id) assert result is not None assert result["quorum"]["threshold"] == 1 assert _MEMBER_FP in result["quorum"]["members"] # --------------------------------------------------------------------------- # 2. check_quorum — unit tests # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_check_quorum_no_approvals_not_met(db_session: AsyncSession) -> None: from musehub.services.musehub_governance import check_quorum identity_id = compute_identity_id(b"qtest1") human = _make_identity("qtest1", identity_id) db_session.add(human) repo = _make_repo("qtest1", "qtest-repo", identity_id) db_session.add(repo) proposal_id = compute_proposal_id(repo.repo_id, identity_id, "feat/x", "main", _NOW.isoformat()) proposal = _make_proposal(repo.repo_id, proposal_id) db_session.add(proposal) await db_session.commit() met, found, threshold = await check_quorum( db_session, repo.repo_id, proposal_id, _GOVERNANCE_1OF1 ) assert not met assert found == 0 assert threshold == 1 @pytest.mark.asyncio async def test_check_quorum_nonmember_approval_not_counted(db_session: AsyncSession) -> None: from musehub.services.musehub_governance import check_quorum identity_id = compute_identity_id(b"qtest2") human = _make_identity("qtest2", identity_id) db_session.add(human) await db_session.flush() auth_key = _make_auth_key(identity_id, _NONMEMBER_FP) db_session.add(auth_key) repo = _make_repo("qtest2", "qtest-repo2", identity_id) db_session.add(repo) proposal_id = compute_proposal_id(repo.repo_id, identity_id, "feat/x", "main", _NOW.isoformat()) proposal = _make_proposal(repo.repo_id, proposal_id) db_session.add(proposal) review = _make_review(proposal_id, "qtest2", "approved") db_session.add(review) await db_session.commit() met, found, threshold = await check_quorum( db_session, repo.repo_id, proposal_id, _GOVERNANCE_1OF1 ) assert not met assert found == 0 @pytest.mark.asyncio async def test_check_quorum_member_approval_counts(db_session: AsyncSession) -> None: from musehub.services.musehub_governance import check_quorum identity_id = compute_identity_id(b"qtest3") human = _make_identity("qtest3", identity_id) db_session.add(human) await db_session.flush() auth_key = _make_auth_key(identity_id, _MEMBER_FP) db_session.add(auth_key) repo = _make_repo("qtest3", "qtest-repo3", identity_id) db_session.add(repo) proposal_id = compute_proposal_id(repo.repo_id, identity_id, "feat/x", "main", _NOW.isoformat()) proposal = _make_proposal(repo.repo_id, proposal_id) db_session.add(proposal) review = _make_review(proposal_id, "qtest3", "approved") db_session.add(review) await db_session.commit() met, found, threshold = await check_quorum( db_session, repo.repo_id, proposal_id, _GOVERNANCE_1OF1 ) assert met assert found == 1 assert threshold == 1 @pytest.mark.asyncio async def test_check_quorum_changes_requested_not_counted(db_session: AsyncSession) -> None: from musehub.services.musehub_governance import check_quorum identity_id = compute_identity_id(b"qtest4") human = _make_identity("qtest4", identity_id) db_session.add(human) await db_session.flush() auth_key = _make_auth_key(identity_id, _MEMBER_FP) db_session.add(auth_key) repo = _make_repo("qtest4", "qtest-repo4", identity_id) db_session.add(repo) proposal_id = compute_proposal_id(repo.repo_id, identity_id, "feat/x", "main", _NOW.isoformat()) proposal = _make_proposal(repo.repo_id, proposal_id) db_session.add(proposal) review = _make_review(proposal_id, "qtest4", "changes_requested") db_session.add(review) await db_session.commit() met, found, threshold = await check_quorum( db_session, repo.repo_id, proposal_id, _GOVERNANCE_1OF1 ) assert not met assert found == 0 @pytest.mark.asyncio async def test_check_quorum_2of3_partial_not_met(db_session: AsyncSession) -> None: from musehub.services.musehub_governance import check_quorum identity_id = compute_identity_id(b"qtest5") human = _make_identity("qtest5", identity_id) db_session.add(human) await db_session.flush() auth_key = _make_auth_key(identity_id, _MEMBER_FP) db_session.add(auth_key) repo = _make_repo("qtest5", "qtest-repo5", identity_id) db_session.add(repo) proposal_id = compute_proposal_id(repo.repo_id, identity_id, "feat/x", "main", _NOW.isoformat()) proposal = _make_proposal(repo.repo_id, proposal_id) db_session.add(proposal) review = _make_review(proposal_id, "qtest5", "approved") db_session.add(review) await db_session.commit() met, found, threshold = await check_quorum( db_session, repo.repo_id, proposal_id, _GOVERNANCE_2OF3 ) assert not met assert found == 1 assert threshold == 2 # --------------------------------------------------------------------------- # 3. API — merge blocked when quorum not met # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_merge_blocked_when_quorum_not_met( client: AsyncClient, db_session: AsyncSession, auth_headers: dict[str, str], monkeypatch: pytest.MonkeyPatch, ) -> None: """POST merge returns 403 when governance exists but quorum is not met.""" import musehub.services.musehub_governance as _gov identity_id = compute_identity_id(b"testuser") repo = _make_repo("testuser", "governed-repo", identity_id) db_session.add(repo) proposal_id = compute_proposal_id(repo.repo_id, identity_id, "feat/x", "main", _NOW.isoformat()) proposal = _make_proposal(repo.repo_id, proposal_id) db_session.add(proposal) await db_session.commit() async def _fake_load(session: AsyncSession, repo_id: str) -> JSONObject | None: return _GOVERNANCE_1OF1 monkeypatch.setattr(_gov, "load_governance", _fake_load) resp = await client.post( f"/api/repos/{repo.repo_id}/proposals/{proposal_id}/merge", json={"merge_strategy": "merge_commit"}, headers=auth_headers, ) assert resp.status_code == 403, resp.text body = resp.json() assert "quorum" in body["detail"].lower() assert "0" in body["detail"] or "0/" in body["detail"] @pytest.mark.asyncio async def test_merge_allowed_when_quorum_met( client: AsyncClient, db_session: AsyncSession, auth_headers: dict[str, str], monkeypatch: pytest.MonkeyPatch, ) -> None: """POST merge succeeds when governance quorum is satisfied.""" import musehub.services.musehub_governance as _gov from musehub.db.musehub_repo_models import MusehubBranch from musehub.core.genesis import compute_branch_id identity_id = compute_identity_id(b"testuser") repo = _make_repo("testuser", "governed-repo2", identity_id) db_session.add(repo) # Branches needed for the merge to find commits for bname in ("main", "feat/x"): db_session.add(MusehubBranch( branch_id=compute_branch_id(repo.repo_id, bname), repo_id=repo.repo_id, name=bname, head_commit_id=None, )) proposal_id = compute_proposal_id(repo.repo_id, identity_id, "feat/x", "main", _NOW.isoformat()) proposal = _make_proposal(repo.repo_id, proposal_id) db_session.add(proposal) # Member approves auth_key = _make_auth_key(identity_id, _MEMBER_FP) db_session.add(auth_key) review = _make_review(proposal_id, "testuser", "approved") db_session.add(review) await db_session.commit() async def _fake_load(session: AsyncSession, repo_id: str) -> JSONObject | None: return _GOVERNANCE_1OF1 monkeypatch.setattr(_gov, "load_governance", _fake_load) resp = await client.post( f"/api/repos/{repo.repo_id}/proposals/{proposal_id}/merge", json={"merge_strategy": "merge_commit"}, headers=auth_headers, ) # 200 or 409 (already merged/no commits) — NOT 403 assert resp.status_code != 403, resp.text @pytest.mark.asyncio async def test_merge_unaffected_without_governance( client: AsyncClient, db_session: AsyncSession, auth_headers: dict[str, str], monkeypatch: pytest.MonkeyPatch, ) -> None: """POST merge proceeds normally when repo has no governance.json.""" import musehub.services.musehub_governance as _gov identity_id = compute_identity_id(b"testuser") repo = _make_repo("testuser", "ungoverned-repo", identity_id) db_session.add(repo) proposal_id = compute_proposal_id(repo.repo_id, identity_id, "feat/x", "main", _NOW.isoformat()) proposal = _make_proposal(repo.repo_id, proposal_id) db_session.add(proposal) await db_session.commit() async def _fake_load(session: AsyncSession, repo_id: str) -> JSONObject | None: return None # no governance.json monkeypatch.setattr(_gov, "load_governance", _fake_load) resp = await client.post( f"/api/repos/{repo.repo_id}/proposals/{proposal_id}/merge", json={"merge_strategy": "merge_commit"}, headers=auth_headers, ) assert resp.status_code != 403, resp.text