"""Phase 5 — API Surface tests (issue #37). Tier 1 — Unit (pure / no DB) ProposalResponse new fields: - blocked_by / is_blocked / blocks present and default to empty/False - latest_simulations defaults to empty dict ProposalListEntry new fields: - merge_strategy field present and defaults to overlay - simulation_conflict_count field present and defaults to None ProposalListFilters new fields: - is_draft filter field present - merge_strategy filter field present Tier 5 — Integration (DB) get_proposal enrichment: - blocked_by populated when unmerged dependency exists - is_blocked True when proposal has live dependency - is_blocked False when all dependencies are merged - latest_simulations populated after run_simulation called list_proposals filters: - proposal_type filter returns only matching type - is_draft=True returns only draft proposals - is_draft=False returns only non-draft proposals - merge_strategy filter returns only matching strategy _enrich_one multi-domain risk: - dimensional_risk dict drives domain_risk when populated - active_domains contains all non-zero dimensional_risk domains - aggregate_risk_score uses dimensional_risk values enrich_proposal_list_batch simulation column: - simulation_conflict_count populated from prefetched conflict_scan - simulation_conflict_count is None before any simulation run """ from __future__ import annotations import os from datetime import datetime, timezone import pytest from muse.core.types import blob_id, fake_id, short_id from sqlalchemy.ext.asyncio import AsyncSession from musehub.types.json_types import StrDict from musehub.models.musehub import ( ProposalListEntry, ProposalListFilters, ProposalResponse, ) # ───────────────────────────────────────────────────────────────────────────── # Helpers # ───────────────────────────────────────────────────────────────────────────── def _now() -> datetime: return datetime.now(tz=timezone.utc) def _uid() -> str: return short_id(blob_id(os.urandom(16)), strip=True) def _oid(label: int | str) -> str: return fake_id(str(label)) # ───────────────────────────────────────────────────────────────────────────── # Tier 1 — model shape # ───────────────────────────────────────────────────────────────────────────── class TestProposalResponseShape: def test_blocked_by_defaults_empty(self) -> None: r = ProposalResponse( proposal_id=fake_id("p1"), proposal_number=1, title="t", body="", state="open", from_branch="feat/x", to_branch="dev", created_at=_now(), ) assert r.blocked_by == [] assert r.blocks == [] assert r.is_blocked is False def test_latest_simulations_defaults_empty(self) -> None: r = ProposalResponse( proposal_id=fake_id("p1"), proposal_number=1, title="t", body="", state="open", from_branch="feat/x", to_branch="dev", created_at=_now(), ) assert r.latest_simulations == {} class TestProposalListEntryShape: def _entry(self, **kwargs: typing.Any) -> ProposalListEntry: defaults = dict( proposal_id=fake_id("le"), proposal_number=1, title="t", state="open", from_branch="feat/x", to_branch="dev", created_at=_now(), ) defaults.update(kwargs) return ProposalListEntry(**defaults) def test_merge_strategy_defaults_overlay(self) -> None: e = self._entry() assert e.merge_strategy == "overlay" def test_simulation_conflict_count_defaults_none(self) -> None: e = self._entry() assert e.simulation_conflict_count is None def test_merge_strategy_round_trips(self) -> None: e = self._entry(merge_strategy="weave") assert e.merge_strategy == "weave" class TestProposalListFiltersShape: def test_is_draft_defaults_none(self) -> None: f = ProposalListFilters() assert f.is_draft is None def test_merge_strategy_defaults_none(self) -> None: f = ProposalListFilters() assert f.merge_strategy is None def test_is_draft_true_accepted(self) -> None: f = ProposalListFilters(is_draft=True) assert f.is_draft is True def test_merge_strategy_list_accepted(self) -> None: f = ProposalListFilters(merge_strategy=["weave", "phased"]) assert f.merge_strategy == ["weave", "phased"] # ───────────────────────────────────────────────────────────────────────────── # Integration helpers # ───────────────────────────────────────────────────────────────────────────── async def _make_repo(session: AsyncSession) -> str: from musehub.core.genesis import compute_identity_id, compute_repo_id from musehub.db.musehub_repo_models import MusehubBranch, MusehubCommit, MusehubRepo from musehub.db.musehub_social_models import MusehubProposal owner = "p5test" slug = f"p5-{_uid()}" owner_id = compute_identity_id(owner.encode()) created_at = _now() repo = MusehubRepo( repo_id=compute_repo_id(owner_id, slug, "code", created_at.isoformat()), name=slug, owner=owner, slug=slug, visibility="public", owner_user_id=owner_id, description="", tags=[], created_at=created_at, ) session.add(repo) await session.flush() return repo.repo_id async def _make_branch_with_commit( session: AsyncSession, repo_id: str, branch_name: str, manifest: StrDict, ) -> str: from musehub.core.genesis import compute_branch_id from musehub.db.musehub_repo_models import MusehubBranch, MusehubCommit, MusehubCommitRef, MusehubRepo from musehub.db.musehub_social_models import MusehubProposal from musehub.muse_cli.snapshot import compute_commit_id, compute_snapshot_id from musehub.services.musehub_snapshot import upsert_snapshot_entries created_at = _now() snapshot_id = compute_snapshot_id(manifest) await upsert_snapshot_entries(session, repo_id, snapshot_id, manifest) commit_id = compute_commit_id( [], snapshot_id, f"init {branch_name}", created_at.isoformat(), author="p5test", signer_public_key="", ) commit = MusehubCommit( commit_id=commit_id, branch=branch_name, parent_ids=[], message=f"init {branch_name}", author="p5test", timestamp=created_at, snapshot_id=snapshot_id, ) session.add(commit) session.add(MusehubCommitRef(repo_id=repo_id, commit_id=commit_id)) branch = MusehubBranch( branch_id=compute_branch_id(repo_id, branch_name), repo_id=repo_id, name=branch_name, head_commit_id=commit_id, ) session.add(branch) await session.flush() return commit_id async def _make_proposal( session: AsyncSession, repo_id: str, *, from_branch: str = "feat/x", to_branch: str = "dev", proposal_type: str = "state_merge", is_draft: bool = False, merge_strategy: str = "overlay", ) -> str: from musehub.services.musehub_proposals import create_proposal p = await create_proposal( session, repo_id=repo_id, title="p5 proposal", from_branch=from_branch, to_branch=to_branch, author="p5test", author_identity_id=fake_id("p5-identity"), proposal_type=proposal_type, is_draft=is_draft, merge_strategy=merge_strategy, ) return p.proposal_id # ───────────────────────────────────────────────────────────────────────────── # Tier 5 — get_proposal enrichment # ───────────────────────────────────────────────────────────────────────────── class TestGetProposalEnrichment: @pytest.mark.asyncio async def test_blocked_by_populated_when_dep_exists( self, db_session: AsyncSession ) -> None: from musehub.services.musehub_proposals import get_proposal from musehub.services.proposal_dag import create_dependency_edges repo_id = await _make_repo(db_session) await _make_branch_with_commit(db_session, repo_id, "dev", {}) await _make_branch_with_commit(db_session, repo_id, "feat/dep", {}) await _make_branch_with_commit(db_session, repo_id, "feat/main", {}) dep_id = await _make_proposal( db_session, repo_id, from_branch="feat/dep", to_branch="dev" ) main_id = await _make_proposal( db_session, repo_id, from_branch="feat/main", to_branch="dev" ) await create_dependency_edges(db_session, main_id, [dep_id]) result = await get_proposal(db_session, repo_id, main_id) assert result is not None assert result.is_blocked is True assert len(result.blocked_by) == 1 @pytest.mark.asyncio async def test_is_blocked_false_when_dep_merged( self, db_session: AsyncSession ) -> None: from musehub.services.musehub_proposals import get_proposal from musehub.services.proposal_dag import create_dependency_edges from musehub.db.musehub_repo_models import MusehubBranch, MusehubCommit, MusehubRepo from musehub.db.musehub_social_models import MusehubProposal repo_id = await _make_repo(db_session) await _make_branch_with_commit(db_session, repo_id, "dev", {}) await _make_branch_with_commit(db_session, repo_id, "feat/dep", {}) await _make_branch_with_commit(db_session, repo_id, "feat/main", {}) dep_id = await _make_proposal( db_session, repo_id, from_branch="feat/dep", to_branch="dev" ) main_id = await _make_proposal( db_session, repo_id, from_branch="feat/main", to_branch="dev" ) await create_dependency_edges(db_session, main_id, [dep_id]) # Mark dep as merged dep_row = await db_session.get(MusehubProposal, dep_id) assert dep_row is not None dep_row.state = "merged" await db_session.flush() result = await get_proposal(db_session, repo_id, main_id) assert result is not None assert result.is_blocked is False assert result.blocked_by == [] @pytest.mark.asyncio async def test_latest_simulations_populated_after_run( self, db_session: AsyncSession ) -> None: from musehub.services.musehub_proposals import get_proposal, run_simulation repo_id = await _make_repo(db_session) await _make_branch_with_commit(db_session, repo_id, "dev", {"a.py": _oid(1)}) await _make_branch_with_commit( db_session, repo_id, "feat/x", {"a.py": _oid(2)} ) proposal_id = await _make_proposal(db_session, repo_id) await run_simulation(db_session, repo_id, proposal_id, "conflict_scan") result = await get_proposal(db_session, repo_id, proposal_id) assert result is not None assert "conflict_scan" in result.latest_simulations assert "conflict_count" in result.latest_simulations["conflict_scan"]["result"] @pytest.mark.asyncio async def test_latest_simulations_empty_before_run( self, db_session: AsyncSession ) -> None: from musehub.services.musehub_proposals import get_proposal repo_id = await _make_repo(db_session) await _make_branch_with_commit(db_session, repo_id, "dev", {}) await _make_branch_with_commit(db_session, repo_id, "feat/x", {}) proposal_id = await _make_proposal(db_session, repo_id) result = await get_proposal(db_session, repo_id, proposal_id) assert result is not None assert result.latest_simulations == {} # ───────────────────────────────────────────────────────────────────────────── # Tier 5 — list_proposals filters # ───────────────────────────────────────────────────────────────────────────── class TestListProposalsFilters: @pytest.mark.asyncio async def test_proposal_type_filter(self, db_session: AsyncSession) -> None: from musehub.services.musehub_proposals import list_proposals repo_id = await _make_repo(db_session) await _make_branch_with_commit(db_session, repo_id, "dev", {}) await _make_branch_with_commit(db_session, repo_id, "feat/a", {}) await _make_branch_with_commit(db_session, repo_id, "feat/b", {}) await _make_proposal( db_session, repo_id, from_branch="feat/a", proposal_type="midi_evolution" ) await _make_proposal( db_session, repo_id, from_branch="feat/b", proposal_type="state_merge" ) resp = await list_proposals( db_session, repo_id, filters=ProposalListFilters(proposal_type=["midi_evolution"], state="all"), ) assert resp.total == 1 assert resp.proposals[0].proposal_type.value == "midi_evolution" @pytest.mark.asyncio async def test_is_draft_true_filter(self, db_session: AsyncSession) -> None: from musehub.services.musehub_proposals import list_proposals repo_id = await _make_repo(db_session) await _make_branch_with_commit(db_session, repo_id, "dev", {}) await _make_branch_with_commit(db_session, repo_id, "feat/a", {}) await _make_branch_with_commit(db_session, repo_id, "feat/b", {}) await _make_proposal(db_session, repo_id, from_branch="feat/a", is_draft=True) await _make_proposal(db_session, repo_id, from_branch="feat/b", is_draft=False) resp = await list_proposals( db_session, repo_id, filters=ProposalListFilters(is_draft=True, state="all"), ) assert resp.total == 1 assert resp.proposals[0].is_draft is True @pytest.mark.asyncio async def test_is_draft_false_filter(self, db_session: AsyncSession) -> None: from musehub.services.musehub_proposals import list_proposals repo_id = await _make_repo(db_session) await _make_branch_with_commit(db_session, repo_id, "dev", {}) await _make_branch_with_commit(db_session, repo_id, "feat/a", {}) await _make_branch_with_commit(db_session, repo_id, "feat/b", {}) await _make_proposal(db_session, repo_id, from_branch="feat/a", is_draft=True) await _make_proposal(db_session, repo_id, from_branch="feat/b", is_draft=False) resp = await list_proposals( db_session, repo_id, filters=ProposalListFilters(is_draft=False, state="all"), ) assert resp.total == 1 assert resp.proposals[0].is_draft is False @pytest.mark.asyncio async def test_merge_strategy_filter(self, db_session: AsyncSession) -> None: from musehub.services.musehub_proposals import list_proposals repo_id = await _make_repo(db_session) await _make_branch_with_commit(db_session, repo_id, "dev", {}) await _make_branch_with_commit(db_session, repo_id, "feat/a", {}) await _make_branch_with_commit(db_session, repo_id, "feat/b", {}) await _make_proposal( db_session, repo_id, from_branch="feat/a", merge_strategy="weave" ) await _make_proposal( db_session, repo_id, from_branch="feat/b", merge_strategy="overlay" ) resp = await list_proposals( db_session, repo_id, filters=ProposalListFilters(merge_strategy=["weave"], state="all"), ) assert resp.total == 1 # ───────────────────────────────────────────────────────────────────────────── # Tier 5 — _enrich_one multi-domain risk # ───────────────────────────────────────────────────────────────────────────── class TestEnrichOneMultiDomainRisk: @pytest.mark.asyncio async def test_dimensional_risk_drives_domain_risk( self, db_session: AsyncSession ) -> None: from musehub.services.musehub_proposals import ( enrich_proposal_list_batch, list_proposals, ) from musehub.db.musehub_repo_models import MusehubBranch, MusehubCommit, MusehubRepo from musehub.db.musehub_social_models import MusehubProposal repo_id = await _make_repo(db_session) await _make_branch_with_commit(db_session, repo_id, "dev", {}) await _make_branch_with_commit(db_session, repo_id, "feat/x", {}) proposal_id = await _make_proposal(db_session, repo_id) # Manually set dimensional_risk on the ORM row row = await db_session.get(MusehubProposal, proposal_id) assert row is not None row.dimensional_risk = {"code": 0.6, "midi": 0.3} await db_session.flush() # Fetch the row and enrich it rows = list( ( await db_session.execute( __import__("sqlalchemy").select(MusehubProposal).where( MusehubProposal.proposal_id == proposal_id ) ) ).scalars() ) entries = await enrich_proposal_list_batch(rows, db_session) assert len(entries) == 1 entry = entries[0] assert "code" in entry.domain_risk assert "midi" in entry.domain_risk assert entry.domain_risk["code"] == pytest.approx(0.6) assert entry.domain_risk["midi"] == pytest.approx(0.3) assert "code" in entry.active_domains assert "midi" in entry.active_domains @pytest.mark.asyncio async def test_aggregate_risk_uses_weighted_mean( self, db_session: AsyncSession ) -> None: from musehub.services.musehub_proposals import enrich_proposal_list_batch from musehub.db.musehub_repo_models import MusehubBranch, MusehubCommit, MusehubRepo from musehub.db.musehub_social_models import MusehubProposal repo_id = await _make_repo(db_session) await _make_branch_with_commit(db_session, repo_id, "dev", {}) await _make_branch_with_commit(db_session, repo_id, "feat/x", {}) proposal_id = await _make_proposal(db_session, repo_id) row = await db_session.get(MusehubProposal, proposal_id) assert row is not None row.dimensional_risk = {"code": 0.8} await db_session.flush() rows = list( ( await db_session.execute( __import__("sqlalchemy").select(MusehubProposal).where( MusehubProposal.proposal_id == proposal_id ) ) ).scalars() ) entries = await enrich_proposal_list_batch(rows, db_session) assert entries[0].aggregate_risk_score > 0.0 assert entries[0].aggregate_risk_band in ("critical", "high", "medium", "low") # ───────────────────────────────────────────────────────────────────────────── # Tier 5 — simulation_conflict_count in list batch # ───────────────────────────────────────────────────────────────────────────── class TestSimulationConflictCountInBatch: @pytest.mark.asyncio async def test_conflict_count_none_before_run( self, db_session: AsyncSession ) -> None: from musehub.services.musehub_proposals import enrich_proposal_list_batch from musehub.db.musehub_repo_models import MusehubBranch, MusehubCommit, MusehubRepo from musehub.db.musehub_social_models import MusehubProposal repo_id = await _make_repo(db_session) await _make_branch_with_commit(db_session, repo_id, "dev", {}) await _make_branch_with_commit(db_session, repo_id, "feat/x", {}) proposal_id = await _make_proposal(db_session, repo_id) rows = list( ( await db_session.execute( __import__("sqlalchemy").select(MusehubProposal).where( MusehubProposal.proposal_id == proposal_id ) ) ).scalars() ) entries = await enrich_proposal_list_batch(rows, db_session) assert entries[0].simulation_conflict_count is None @pytest.mark.asyncio async def test_conflict_count_populated_after_run( self, db_session: AsyncSession ) -> None: from musehub.services.musehub_proposals import ( enrich_proposal_list_batch, run_simulation, ) from musehub.db.musehub_repo_models import MusehubBranch, MusehubCommit, MusehubRepo from musehub.db.musehub_social_models import MusehubProposal repo_id = await _make_repo(db_session) await _make_branch_with_commit(db_session, repo_id, "dev", {"a.py": _oid(1)}) await _make_branch_with_commit( db_session, repo_id, "feat/x", {"a.py": _oid(2)} ) proposal_id = await _make_proposal(db_session, repo_id) await run_simulation(db_session, repo_id, proposal_id, "conflict_scan") rows = list( ( await db_session.execute( __import__("sqlalchemy").select(MusehubProposal).where( MusehubProposal.proposal_id == proposal_id ) ) ).scalars() ) entries = await enrich_proposal_list_batch(rows, db_session) assert entries[0].simulation_conflict_count is not None assert isinstance(entries[0].simulation_conflict_count, int)