"""Tests for Phase 2 of issue #35 — list_proposals filter extensions, /heat, /readiness. Tier 2 — Integration Real test DB via ``db_session`` fixture. Exercises filter predicates, sort orders, and the aggregate query functions against live rows. Tier 6 — Performance Measured assertions: each query must complete under a documented threshold on the test-DB hardware. No mocks — all assertions hit the real DB. """ from __future__ import annotations import time import uuid from datetime import datetime, timezone import pytest import pytest_asyncio from httpx import AsyncClient from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from musehub.db.musehub_identity_models import MusehubIdentity from musehub.db.musehub_repo_models import MusehubRepo from musehub.db.musehub_social_models import MusehubProposalReview from musehub.models.musehub import ProposalListFilters from musehub.services import musehub_proposals from tests.factories import create_proposal # ───────────────────────────────────────────────────────────────────────────── # Helpers # ───────────────────────────────────────────────────────────────────────────── def _now() -> datetime: return datetime.now(tz=timezone.utc) async def _make_repo(session: AsyncSession, owner: str = "testuser", slug: str | None = None) -> MusehubRepo: from musehub.core.genesis import compute_identity_id, compute_repo_id slug = slug or f"repo-{uuid.uuid4().hex[:8]}" owner_user_id = compute_identity_id(owner.encode()) created_at = _now() repo_id = compute_repo_id(owner_user_id, slug, "code", created_at.isoformat()) repo = MusehubRepo( repo_id=repo_id, name=slug, owner=owner, slug=slug, visibility="public", owner_user_id=owner_user_id, description="", tags=[], created_at=created_at, ) session.add(repo) await session.commit() return repo async def _make_identity( session: AsyncSession, handle: str, identity_type: str = "human", ) -> MusehubIdentity: from musehub.core.genesis import compute_identity_id iid = compute_identity_id(handle.encode()) identity = MusehubIdentity( identity_id=iid, handle=handle, identity_type=identity_type, ) session.add(identity) await session.commit() return identity async def _make_review( session: AsyncSession, proposal_id: str, reviewer: str, state: str = "approved", ) -> MusehubProposalReview: from musehub.core.genesis import compute_identity_id, compute_review_id reviewer_identity_id = compute_identity_id(reviewer.encode()) review = MusehubProposalReview( review_id=compute_review_id(proposal_id, reviewer_identity_id, _now().isoformat()), proposal_id=proposal_id, reviewer_username=reviewer, state=state, body=None, submitted_at=_now(), created_at=_now(), ) session.add(review) await session.commit() return review # ───────────────────────────────────────────────────────────────────────────── # Tier 2 — Integration: filter correctness # ───────────────────────────────────────────────────────────────────────────── class TestIntegrationListProposalsState: """list_proposals state filter hits the DB and returns matching rows.""" @pytest.mark.asyncio async def test_state_open_returns_only_open(self, db_session: AsyncSession) -> None: repo = await _make_repo(db_session) await create_proposal(db_session, repo.repo_id, state="open") await create_proposal(db_session, repo.repo_id, state="merged") result = await musehub_proposals.list_proposals( db_session, repo.repo_id, filters=ProposalListFilters(state="open") ) assert all(p.state == "open" for p in result.proposals) assert result.total == 1 @pytest.mark.asyncio async def test_state_merged_returns_only_merged(self, db_session: AsyncSession) -> None: repo = await _make_repo(db_session) await create_proposal(db_session, repo.repo_id, state="open") await create_proposal(db_session, repo.repo_id, state="merged") result = await musehub_proposals.list_proposals( db_session, repo.repo_id, filters=ProposalListFilters(state="merged") ) assert all(p.state == "merged" for p in result.proposals) assert result.total == 1 @pytest.mark.asyncio async def test_state_all_returns_all(self, db_session: AsyncSession) -> None: repo = await _make_repo(db_session) await create_proposal(db_session, repo.repo_id, state="open") await create_proposal(db_session, repo.repo_id, state="merged") result = await musehub_proposals.list_proposals( db_session, repo.repo_id, filters=ProposalListFilters(state="all") ) assert result.total == 2 class TestIntegrationListProposalsRiskBand: """list_proposals risk_band filter maps score ranges to SQL predicates.""" @pytest.mark.asyncio async def test_critical_band_returns_high_score_proposals(self, db_session: AsyncSession) -> None: repo = await _make_repo(db_session) p_critical = await create_proposal(db_session, repo.repo_id, state="open") p_low = await create_proposal(db_session, repo.repo_id, state="open") # Set risk scores directly p_critical.risk_score = 0.9 p_low.risk_score = 0.1 await db_session.commit() result = await musehub_proposals.list_proposals( db_session, repo.repo_id, filters=ProposalListFilters(state="open", risk_band=["critical"]), ) ids = {p.proposal_id for p in result.proposals} assert p_critical.proposal_id in ids assert p_low.proposal_id not in ids @pytest.mark.asyncio async def test_high_band_excludes_critical(self, db_session: AsyncSession) -> None: repo = await _make_repo(db_session) p_critical = await create_proposal(db_session, repo.repo_id, state="open") p_high = await create_proposal(db_session, repo.repo_id, state="open") p_critical.risk_score = 0.8 p_high.risk_score = 0.6 await db_session.commit() result = await musehub_proposals.list_proposals( db_session, repo.repo_id, filters=ProposalListFilters(state="open", risk_band=["high"]), ) ids = {p.proposal_id for p in result.proposals} assert p_high.proposal_id in ids assert p_critical.proposal_id not in ids @pytest.mark.asyncio async def test_multiple_bands_or_semantics(self, db_session: AsyncSession) -> None: repo = await _make_repo(db_session) p_crit = await create_proposal(db_session, repo.repo_id, state="open") p_low = await create_proposal(db_session, repo.repo_id, state="open") p_med = await create_proposal(db_session, repo.repo_id, state="open") p_crit.risk_score = 0.9 p_low.risk_score = 0.05 p_med.risk_score = 0.3 await db_session.commit() result = await musehub_proposals.list_proposals( db_session, repo.repo_id, filters=ProposalListFilters(state="open", risk_band=["critical", "low"]), ) ids = {p.proposal_id for p in result.proposals} assert p_crit.proposal_id in ids assert p_low.proposal_id in ids assert p_med.proposal_id not in ids @pytest.mark.asyncio async def test_none_band_returns_zero_score_proposals(self, db_session: AsyncSession) -> None: repo = await _make_repo(db_session) p_zero = await create_proposal(db_session, repo.repo_id, state="open") p_nonzero = await create_proposal(db_session, repo.repo_id, state="open") p_zero.risk_score = 0.0 p_nonzero.risk_score = 0.5 await db_session.commit() result = await musehub_proposals.list_proposals( db_session, repo.repo_id, filters=ProposalListFilters(state="open", risk_band=["none"]), ) ids = {p.proposal_id for p in result.proposals} assert p_zero.proposal_id in ids assert p_nonzero.proposal_id not in ids class TestIntegrationListProposalsDomain: """list_proposals domain filter returns only proposals active in that domain.""" @pytest.mark.asyncio async def test_code_domain_returns_proposals_with_nonzero_risk(self, db_session: AsyncSession) -> None: repo = await _make_repo(db_session) p_code = await create_proposal(db_session, repo.repo_id, state="open") p_none = await create_proposal(db_session, repo.repo_id, state="open") p_code.risk_score = 0.4 p_none.risk_score = 0.0 await db_session.commit() result = await musehub_proposals.list_proposals( db_session, repo.repo_id, filters=ProposalListFilters(state="open", domain=["code"]), ) ids = {p.proposal_id for p in result.proposals} assert p_code.proposal_id in ids assert p_none.proposal_id not in ids @pytest.mark.asyncio async def test_no_domain_filter_returns_all(self, db_session: AsyncSession) -> None: repo = await _make_repo(db_session) p1 = await create_proposal(db_session, repo.repo_id, state="open") p2 = await create_proposal(db_session, repo.repo_id, state="open") p1.risk_score = 0.4 p2.risk_score = 0.0 await db_session.commit() result = await musehub_proposals.list_proposals( db_session, repo.repo_id, filters=ProposalListFilters(state="open"), ) assert result.total == 2 class TestIntegrationListProposalsAuthorType: """list_proposals author_type filter joins identities table.""" @pytest.mark.asyncio async def test_agent_filter_returns_agent_proposals(self, db_session: AsyncSession) -> None: repo = await _make_repo(db_session) await _make_identity(db_session, "bot-1", identity_type="agent") await _make_identity(db_session, "human-1", identity_type="human") p_agent = await create_proposal(db_session, repo.repo_id, state="open", author="bot-1") p_human = await create_proposal(db_session, repo.repo_id, state="open", author="human-1") result = await musehub_proposals.list_proposals( db_session, repo.repo_id, filters=ProposalListFilters(state="open", author_type="agent"), ) ids = {p.proposal_id for p in result.proposals} assert p_agent.proposal_id in ids assert p_human.proposal_id not in ids @pytest.mark.asyncio async def test_human_filter_returns_human_and_unknown_authors(self, db_session: AsyncSession) -> None: repo = await _make_repo(db_session) await _make_identity(db_session, "human-2", identity_type="human") p_human = await create_proposal(db_session, repo.repo_id, state="open", author="human-2") p_unknown = await create_proposal(db_session, repo.repo_id, state="open", author="no-identity") result = await musehub_proposals.list_proposals( db_session, repo.repo_id, filters=ProposalListFilters(state="open", author_type="human"), ) ids = {p.proposal_id for p in result.proposals} assert p_human.proposal_id in ids assert p_unknown.proposal_id in ids class TestIntegrationListProposalsAssignedReviewer: """list_proposals assigned_reviewer filter checks proposal reviews.""" @pytest.mark.asyncio async def test_assigned_reviewer_returns_proposals_with_pending_review( self, db_session: AsyncSession ) -> None: repo = await _make_repo(db_session) p_assigned = await create_proposal(db_session, repo.repo_id, state="open") p_other = await create_proposal(db_session, repo.repo_id, state="open") await _make_review(db_session, p_assigned.proposal_id, "alice", state="pending") result = await musehub_proposals.list_proposals( db_session, repo.repo_id, filters=ProposalListFilters(state="open", assigned_reviewer="alice"), ) ids = {p.proposal_id for p in result.proposals} assert p_assigned.proposal_id in ids assert p_other.proposal_id not in ids @pytest.mark.asyncio async def test_dismissed_review_not_returned(self, db_session: AsyncSession) -> None: repo = await _make_repo(db_session) p = await create_proposal(db_session, repo.repo_id, state="open") await _make_review(db_session, p.proposal_id, "alice", state="dismissed") result = await musehub_proposals.list_proposals( db_session, repo.repo_id, filters=ProposalListFilters(state="open", assigned_reviewer="alice"), ) assert result.total == 0 class TestIntegrationListProposalsSort: """list_proposals sort orders return proposals in correct sequence.""" @pytest.mark.asyncio async def test_newest_sort_returns_most_recent_first(self, db_session: AsyncSession) -> None: repo = await _make_repo(db_session) p1 = await create_proposal(db_session, repo.repo_id, state="open") p2 = await create_proposal(db_session, repo.repo_id, state="open") # Ensure p2 has later created_at p2.created_at = datetime(2030, 1, 2, tzinfo=timezone.utc) p1.created_at = datetime(2030, 1, 1, tzinfo=timezone.utc) await db_session.commit() result = await musehub_proposals.list_proposals( db_session, repo.repo_id, filters=ProposalListFilters(state="open", sort="newest"), ) assert result.proposals[0].proposal_id == p2.proposal_id @pytest.mark.asyncio async def test_oldest_sort_returns_earliest_first(self, db_session: AsyncSession) -> None: repo = await _make_repo(db_session) p1 = await create_proposal(db_session, repo.repo_id, state="open") p2 = await create_proposal(db_session, repo.repo_id, state="open") p2.created_at = datetime(2030, 1, 2, tzinfo=timezone.utc) p1.created_at = datetime(2030, 1, 1, tzinfo=timezone.utc) await db_session.commit() result = await musehub_proposals.list_proposals( db_session, repo.repo_id, filters=ProposalListFilters(state="open", sort="oldest"), ) assert result.proposals[0].proposal_id == p1.proposal_id @pytest.mark.asyncio async def test_risk_desc_returns_highest_risk_first(self, db_session: AsyncSession) -> None: repo = await _make_repo(db_session) p_low = await create_proposal(db_session, repo.repo_id, state="open") p_high = await create_proposal(db_session, repo.repo_id, state="open") p_low.risk_score = 0.2 p_high.risk_score = 0.8 await db_session.commit() result = await musehub_proposals.list_proposals( db_session, repo.repo_id, filters=ProposalListFilters(state="open", sort="risk_desc"), ) assert result.proposals[0].proposal_id == p_high.proposal_id @pytest.mark.asyncio async def test_merge_ready_first_surfaces_ready_proposals(self, db_session: AsyncSession) -> None: repo = await _make_repo(db_session) p_ready = await create_proposal(db_session, repo.repo_id, state="open") p_not_ready = await create_proposal(db_session, repo.repo_id, state="open") p_ready.breakage_count = 0 p_not_ready.breakage_count = 5 await db_session.commit() # Give p_ready two approvals await _make_review(db_session, p_ready.proposal_id, "reviewer-a") await _make_review(db_session, p_ready.proposal_id, "reviewer-b") result = await musehub_proposals.list_proposals( db_session, repo.repo_id, filters=ProposalListFilters(state="open", sort="merge_ready_first"), ) assert len(result.proposals) == 2 assert result.proposals[0].proposal_id == p_ready.proposal_id class TestIntegrationListProposalsPagination: """list_proposals pagination respects limit and cursor.""" @pytest.mark.asyncio async def test_limit_caps_result_count(self, db_session: AsyncSession) -> None: repo = await _make_repo(db_session) for _ in range(5): await create_proposal(db_session, repo.repo_id, state="open") result = await musehub_proposals.list_proposals( db_session, repo.repo_id, filters=ProposalListFilters(state="open", limit=3), ) assert len(result.proposals) == 3 assert result.total == 5 assert result.next_cursor is not None @pytest.mark.asyncio async def test_cursor_advances_to_next_page(self, db_session: AsyncSession) -> None: repo = await _make_repo(db_session) for _ in range(4): await create_proposal(db_session, repo.repo_id, state="open") page1 = await musehub_proposals.list_proposals( db_session, repo.repo_id, filters=ProposalListFilters(state="open", sort="oldest", limit=2), ) assert page1.next_cursor is not None page2 = await musehub_proposals.list_proposals( db_session, repo.repo_id, filters=ProposalListFilters(state="open", sort="oldest", limit=2, cursor=page1.next_cursor), ) ids1 = {p.proposal_id for p in page1.proposals} ids2 = {p.proposal_id for p in page2.proposals} assert ids1 & ids2 == set() # no overlap between pages # ───────────────────────────────────────────────────────────────────────────── # Tier 2 — Integration: get_domain_heat # ───────────────────────────────────────────────────────────────────────────── class TestIntegrationDomainHeat: """get_domain_heat aggregates domain activity from live rows.""" @pytest.mark.asyncio async def test_code_domain_count_matches_proposals_with_nonzero_risk( self, db_session: AsyncSession ) -> None: repo = await _make_repo(db_session) p1 = await create_proposal(db_session, repo.repo_id, state="open") p2 = await create_proposal(db_session, repo.repo_id, state="open") p3 = await create_proposal(db_session, repo.repo_id, state="open") p1.risk_score = 0.6 p2.risk_score = 0.3 p3.risk_score = 0.0 await db_session.commit() heat = await musehub_proposals.get_domain_heat(repo.repo_id, "open", db_session) assert heat.total_open == 3 assert "code" in heat.domains # code is the only domain; all proposals belong to it regardless of risk score assert heat.domains["code"].count == 3 @pytest.mark.asyncio async def test_empty_repo_returns_zero_counts(self, db_session: AsyncSession) -> None: repo = await _make_repo(db_session) heat = await musehub_proposals.get_domain_heat(repo.repo_id, "open", db_session) assert heat.total_open == 0 assert heat.domains == {} or all(v.count == 0 for v in heat.domains.values()) @pytest.mark.asyncio async def test_merged_proposals_excluded_from_open_heat( self, db_session: AsyncSession ) -> None: repo = await _make_repo(db_session) p_open = await create_proposal(db_session, repo.repo_id, state="open") p_merged = await create_proposal(db_session, repo.repo_id, state="merged") p_open.risk_score = 0.5 p_merged.risk_score = 0.9 await db_session.commit() heat = await musehub_proposals.get_domain_heat(repo.repo_id, "open", db_session) assert heat.total_open == 1 @pytest.mark.asyncio async def test_avg_risk_in_code_domain(self, db_session: AsyncSession) -> None: repo = await _make_repo(db_session) p1 = await create_proposal(db_session, repo.repo_id, state="open") p2 = await create_proposal(db_session, repo.repo_id, state="open") p1.risk_score = 0.4 p2.risk_score = 0.6 await db_session.commit() heat = await musehub_proposals.get_domain_heat(repo.repo_id, "open", db_session) assert "code" in heat.domains assert abs(heat.domains["code"].avg_risk - 0.5) < 0.01 # ───────────────────────────────────────────────────────────────────────────── # Tier 2 — Integration: get_merge_readiness # ───────────────────────────────────────────────────────────────────────────── class TestIntegrationMergeReadiness: """get_merge_readiness buckets proposals by merge readiness from live rows.""" @pytest.mark.asyncio async def test_ready_proposal_with_two_approvals_no_breakage( self, db_session: AsyncSession ) -> None: repo = await _make_repo(db_session) p = await create_proposal(db_session, repo.repo_id, state="open") p.breakage_count = 0 await db_session.commit() await _make_review(db_session, p.proposal_id, "r1") await _make_review(db_session, p.proposal_id, "r2") readiness = await musehub_proposals.get_merge_readiness(repo.repo_id, db_session) assert p.proposal_number in readiness.ready @pytest.mark.asyncio async def test_proposal_with_breakage_goes_to_needs_review( self, db_session: AsyncSession ) -> None: repo = await _make_repo(db_session) p = await create_proposal(db_session, repo.repo_id, state="open") p.breakage_count = 3 await db_session.commit() await _make_review(db_session, p.proposal_id, "r1") await _make_review(db_session, p.proposal_id, "r2") readiness = await musehub_proposals.get_merge_readiness(repo.repo_id, db_session) assert p.proposal_number in readiness.needs_review assert p.proposal_number not in readiness.ready @pytest.mark.asyncio async def test_settling_proposal_goes_to_settling_bucket( self, db_session: AsyncSession ) -> None: repo = await _make_repo(db_session) p = await create_proposal(db_session, repo.repo_id, state="settling") await db_session.commit() readiness = await musehub_proposals.get_merge_readiness(repo.repo_id, db_session) assert p.proposal_number in readiness.settling @pytest.mark.asyncio async def test_merged_proposals_excluded_from_readiness( self, db_session: AsyncSession ) -> None: repo = await _make_repo(db_session) p_merged = await create_proposal(db_session, repo.repo_id, state="merged") await db_session.commit() readiness = await musehub_proposals.get_merge_readiness(repo.repo_id, db_session) all_numbers = readiness.ready + readiness.blocked + readiness.settling + readiness.needs_review assert p_merged.proposal_number not in all_numbers @pytest.mark.asyncio async def test_proposal_with_one_approval_goes_to_needs_review( self, db_session: AsyncSession ) -> None: repo = await _make_repo(db_session) p = await create_proposal(db_session, repo.repo_id, state="open") p.breakage_count = 0 await db_session.commit() await _make_review(db_session, p.proposal_id, "r1") # only 1, need 2 readiness = await musehub_proposals.get_merge_readiness(repo.repo_id, db_session) assert p.proposal_number in readiness.needs_review assert p.proposal_number not in readiness.ready # ───────────────────────────────────────────────────────────────────────────── # Tier 6 — Performance: query timing assertions # ───────────────────────────────────────────────────────────────────────────── class TestPerformanceListProposals: """list_proposals completes within documented time budgets. Tier 6 (Performance): measured assertions on the test DB. Thresholds are deliberately generous to avoid flaky CI — these are floor benchmarks, not tight SLAs. """ @pytest.mark.asyncio async def test_list_20_proposals_under_100ms(self, db_session: AsyncSession) -> None: repo = await _make_repo(db_session) for _ in range(20): await create_proposal(db_session, repo.repo_id, state="open") t0 = time.perf_counter() result = await musehub_proposals.list_proposals( db_session, repo.repo_id, filters=ProposalListFilters(state="open", limit=20) ) elapsed_ms = (time.perf_counter() - t0) * 1000 assert len(result.proposals) == 20 assert elapsed_ms < 100, f"list_proposals took {elapsed_ms:.1f}ms (budget: 100ms)" @pytest.mark.asyncio async def test_get_domain_heat_under_50ms(self, db_session: AsyncSession) -> None: repo = await _make_repo(db_session) for i in range(10): p = await create_proposal(db_session, repo.repo_id, state="open") p.risk_score = float(i) / 10 await db_session.commit() t0 = time.perf_counter() heat = await musehub_proposals.get_domain_heat(repo.repo_id, "open", db_session) elapsed_ms = (time.perf_counter() - t0) * 1000 assert heat.total_open == 10 assert elapsed_ms < 50, f"get_domain_heat took {elapsed_ms:.1f}ms (budget: 50ms)" @pytest.mark.asyncio async def test_get_merge_readiness_under_50ms(self, db_session: AsyncSession) -> None: repo = await _make_repo(db_session) for _ in range(10): await create_proposal(db_session, repo.repo_id, state="open") t0 = time.perf_counter() readiness = await musehub_proposals.get_merge_readiness(repo.repo_id, db_session) elapsed_ms = (time.perf_counter() - t0) * 1000 assert len(readiness.needs_review) == 10 assert elapsed_ms < 50, f"get_merge_readiness took {elapsed_ms:.1f}ms (budget: 50ms)" @pytest.mark.asyncio async def test_risk_desc_sort_under_100ms(self, db_session: AsyncSession) -> None: repo = await _make_repo(db_session) for i in range(20): p = await create_proposal(db_session, repo.repo_id, state="open") p.risk_score = float(i) / 20 await db_session.commit() t0 = time.perf_counter() result = await musehub_proposals.list_proposals( db_session, repo.repo_id, filters=ProposalListFilters(state="open", sort="risk_desc", limit=20), ) elapsed_ms = (time.perf_counter() - t0) * 1000 assert len(result.proposals) == 20 assert elapsed_ms < 100, f"risk_desc sort took {elapsed_ms:.1f}ms (budget: 100ms)"