"""Phase 1 — Proposal reimagination model and ORM tests (issue #37). Tier 1 — Unit (no DB) - ProposalType, ProposalState, MergeStrategy enum values and str behaviour - MergeConditions defaults and validation bounds - ProposalCommentTarget target_type defaults and domain-specific fields - DimensionalRiskVector type alias usage - ProposalCreate with all new fields; defaults; round-trip JSON - ProposalResponse carries new fields; defaults survive serialization Tier 5 — Integration (DB required) - MusehubProposal stores and round-trips all 15 new columns - MusehubProposalReview stores and round-trips 3 new columns - MusehubProposalDependency creates valid DAG edge; unique constraint fires on duplicate - MusehubProposalSimulation caches result; unique constraint fires on (proposal, type) - Dependency cascade: deleting a proposal cascades to its dependency edges """ from __future__ import annotations import uuid from datetime import datetime, timezone import pytest from sqlalchemy.ext.asyncio import AsyncSession from musehub.types.json_types import JSONObject from musehub.models.musehub import ( DimensionalRiskVector, MergeConditions, MergeStrategy, ProposalCommentTarget, ProposalCreate, ProposalResponse, ProposalState, ProposalType, ) # ───────────────────────────────────────────────────────────────────────────── # Helpers # ───────────────────────────────────────────────────────────────────────────── def _now() -> datetime: return datetime.now(tz=timezone.utc) def _uid() -> str: return uuid.uuid4().hex[:12] async def _make_repo(session: AsyncSession) -> str: from musehub.core.genesis import compute_identity_id, compute_repo_id from musehub.db.musehub_repo_models import MusehubRepo from musehub.db.musehub_social_models import MusehubProposal, MusehubProposalDependency, MusehubProposalReview, MusehubProposalSimulation owner = "p1tester" slug = f"repo-{_uid()}" owner_id = compute_identity_id(owner.encode()) created_at = _now() repo = MusehubRepo( repo_id=compute_repo_id(owner_id, slug, "code", created_at.isoformat()), name=slug, owner=owner, slug=slug, visibility="public", owner_user_id=owner_id, description="", tags=[], created_at=created_at, ) session.add(repo) await session.flush() return repo.repo_id async def _make_proposal(session: AsyncSession, repo_id: str, *, proposal_number: int = 1) -> "MusehubProposal": from musehub.core.genesis import compute_identity_id, compute_proposal_id from musehub.db.musehub_repo_models import MusehubRepo from musehub.db.musehub_social_models import MusehubProposal, MusehubProposalDependency, MusehubProposalReview, MusehubProposalSimulation author_id = compute_identity_id(b"p1tester") created_at = _now() proposal = MusehubProposal( proposal_id=compute_proposal_id(repo_id, author_id, f"feat/{_uid()}", "dev", created_at.isoformat()), repo_id=repo_id, proposal_number=proposal_number, title="feat: phase 1 test proposal", from_branch=f"feat/{_uid()}", to_branch="dev", created_at=created_at, ) session.add(proposal) await session.flush() return proposal # ───────────────────────────────────────────────────────────────────────────── # Tier 1 — Unit: enums # ───────────────────────────────────────────────────────────────────────────── class TestProposalTypeEnum: def test_all_seven_values_exist(self) -> None: expected = { "state_merge", "stem_integration", "midi_evolution", "payment_settlement", "agent_delegation", "identity_transition", "canonical_release", } assert {m.value for m in ProposalType} == expected def test_is_str_subclass(self) -> None: assert isinstance(ProposalType.STATE_MERGE, str) assert ProposalType.STATE_MERGE == "state_merge" def test_json_serialises_as_plain_string(self) -> None: import json payload = json.dumps({"t": ProposalType.MIDI_EVOLUTION}) assert json.loads(payload)["t"] == "midi_evolution" class TestProposalStateEnum: def test_all_seven_values_exist(self) -> None: expected = {"drafting", "open", "in_review", "approved", "settling", "merged", "abandoned"} assert {m.value for m in ProposalState} == expected def test_is_str_subclass(self) -> None: assert ProposalState.OPEN == "open" assert ProposalState.MERGED != ProposalState.ABANDONED class TestMergeStrategyEnum: def test_all_values_exist(self) -> None: expected = {"overlay", "weave", "replay", "selective", "phased", "cherry_pick"} assert {m.value for m in MergeStrategy} == expected def test_is_str_subclass(self) -> None: assert isinstance(MergeStrategy.PHASED, str) assert MergeStrategy.SELECTIVE == "selective" # ───────────────────────────────────────────────────────────────────────────── # Tier 1 — Unit: MergeConditions # ───────────────────────────────────────────────────────────────────────────── class TestMergeConditions: def test_defaults(self) -> None: mc = MergeConditions() assert mc.require_approvals == 2 assert mc.require_domains_approved == [] assert mc.max_risk_score == 1.0 assert mc.require_signed_commits is False assert mc.require_no_breakage is False assert mc.require_test_coverage is False assert mc.require_payment_settled is False assert mc.require_dependency_merged is True assert mc.max_agent_commit_ratio == 1.0 def test_require_approvals_non_negative(self) -> None: mc = MergeConditions(require_approvals=0) assert mc.require_approvals == 0 with pytest.raises(Exception): MergeConditions(require_approvals=-1) def test_max_risk_score_clamped(self) -> None: MergeConditions(max_risk_score=0.0) MergeConditions(max_risk_score=1.0) with pytest.raises(Exception): MergeConditions(max_risk_score=1.1) with pytest.raises(Exception): MergeConditions(max_risk_score=-0.1) def test_max_agent_commit_ratio_clamped(self) -> None: MergeConditions(max_agent_commit_ratio=0.0) MergeConditions(max_agent_commit_ratio=1.0) with pytest.raises(Exception): MergeConditions(max_agent_commit_ratio=1.5) def test_require_domains_approved_list(self) -> None: mc = MergeConditions(require_domains_approved=["code", "midi"]) assert mc.require_domains_approved == ["code", "midi"] def test_round_trip_json(self) -> None: mc = MergeConditions(require_approvals=3, require_no_breakage=True) restored = MergeConditions.model_validate(mc.model_dump()) assert restored == mc # ───────────────────────────────────────────────────────────────────────────── # Tier 1 — Unit: ProposalCommentTarget # ───────────────────────────────────────────────────────────────────────────── class TestProposalCommentTarget: def test_default_target_type_is_general(self) -> None: t = ProposalCommentTarget() assert t.target_type == "general" assert t.symbol_address is None def test_code_domain_fields(self) -> None: t = ProposalCommentTarget( target_type="code", symbol_address="auth.py::AuthService.login", line_start=42, line_end=58, ) assert t.symbol_address == "auth.py::AuthService.login" assert t.line_start == 42 assert t.line_end == 58 def test_midi_domain_fields(self) -> None: t = ProposalCommentTarget( target_type="midi", track_name="piano", beat_start=1.0, beat_end=5.0, note_pitch=60, ) assert t.track_name == "piano" assert t.note_pitch == 60 def test_note_pitch_bounds(self) -> None: ProposalCommentTarget(note_pitch=0) ProposalCommentTarget(note_pitch=127) with pytest.raises(Exception): ProposalCommentTarget(note_pitch=128) with pytest.raises(Exception): ProposalCommentTarget(note_pitch=-1) def test_payment_domain_field(self) -> None: t = ProposalCommentTarget(target_type="payment", nonce_hex="deadbeef") assert t.nonce_hex == "deadbeef" def test_identity_domain_field(self) -> None: t = ProposalCommentTarget(target_type="identity", identity_handle="gabriel") assert t.identity_handle == "gabriel" def test_stem_domain_fields(self) -> None: t = ProposalCommentTarget( target_type="stem", stem_id="stem_abc123", timestamp_start=0.5, timestamp_end=4.0, ) assert t.stem_id == "stem_abc123" # ───────────────────────────────────────────────────────────────────────────── # Tier 1 — Unit: DimensionalRiskVector # ───────────────────────────────────────────────────────────────────────────── class TestDimensionalRiskVector: def test_is_dict_alias(self) -> None: v: DimensionalRiskVector = {"code": 0.8, "midi": 0.3} assert v["code"] == 0.8 assert isinstance(v, dict) def test_empty_vector_is_valid(self) -> None: v: DimensionalRiskVector = {} assert len(v) == 0 # ───────────────────────────────────────────────────────────────────────────── # Tier 1 — Unit: ProposalCreate with new fields # ───────────────────────────────────────────────────────────────────────────── class TestProposalCreateNewFields: def _minimal(self) -> JSONObject: return {"title": "feat: x", "from_branch": "feat/x", "to_branch": "dev"} def test_defaults_for_new_fields(self) -> None: pc = ProposalCreate(**self._minimal()) assert pc.proposal_type == ProposalType.STATE_MERGE assert pc.is_draft is False assert pc.merge_conditions is None assert pc.merge_strategy == MergeStrategy.OVERLAY assert pc.selective_domains is None assert pc.depends_on == [] def test_proposal_type_set(self) -> None: pc = ProposalCreate(**self._minimal(), proposal_type=ProposalType.MIDI_EVOLUTION) assert pc.proposal_type == ProposalType.MIDI_EVOLUTION def test_is_draft_set(self) -> None: pc = ProposalCreate(**self._minimal(), is_draft=True) assert pc.is_draft is True def test_merge_conditions_set(self) -> None: mc = MergeConditions(require_approvals=1, require_no_breakage=True) pc = ProposalCreate(**self._minimal(), merge_conditions=mc) assert pc.merge_conditions is not None assert pc.merge_conditions.require_approvals == 1 def test_selective_strategy(self) -> None: pc = ProposalCreate( **self._minimal(), merge_strategy=MergeStrategy.SELECTIVE, selective_domains=["code"], ) assert pc.merge_strategy == MergeStrategy.SELECTIVE assert pc.selective_domains == ["code"] def test_depends_on_list(self) -> None: dep_id = "sha256:" + "a" * 64 pc = ProposalCreate(**self._minimal(), depends_on=[dep_id]) assert dep_id in pc.depends_on def test_camel_json_round_trip(self) -> None: mc = MergeConditions(require_approvals=3) pc = ProposalCreate( **self._minimal(), proposal_type=ProposalType.PAYMENT_SETTLEMENT, is_draft=True, merge_conditions=mc, merge_strategy=MergeStrategy.PHASED, ) wire = pc.model_dump(by_alias=True) assert wire["proposalType"] == "payment_settlement" assert wire["isDraft"] is True assert wire["mergeConditions"]["requireApprovals"] == 3 def test_existing_fields_unchanged(self) -> None: pc = ProposalCreate(title="feat: x", from_branch="feat/x", to_branch="main", body="desc") assert pc.title == "feat: x" assert pc.body == "desc" # ───────────────────────────────────────────────────────────────────────────── # Tier 1 — Unit: ProposalResponse carries new fields # ───────────────────────────────────────────────────────────────────────────── class TestProposalResponseNewFields: def _base(self) -> JSONObject: return { "proposal_id": "sha256:" + "b" * 64, "title": "feat: x", "body": "", "state": "open", "from_branch": "feat/x", "to_branch": "dev", "created_at": datetime(2026, 1, 1, tzinfo=timezone.utc), } def test_defaults_present(self) -> None: pr = ProposalResponse(**self._base()) assert pr.proposal_type == ProposalType.STATE_MERGE assert pr.is_draft is False assert pr.merge_conditions is None assert pr.merge_strategy == MergeStrategy.OVERLAY assert pr.selective_domains is None assert pr.depends_on == [] assert pr.risk_score is None assert pr.dimensional_risk == {} def test_dimensional_risk_round_trip(self) -> None: pr = ProposalResponse(**self._base(), dimensional_risk={"code": 0.7, "midi": 0.2}) assert pr.dimensional_risk["code"] == 0.7 def test_risk_score_bounds(self) -> None: ProposalResponse(**self._base(), risk_score=0.0) ProposalResponse(**self._base(), risk_score=1.0) with pytest.raises(Exception): ProposalResponse(**self._base(), risk_score=1.1) # ───────────────────────────────────────────────────────────────────────────── # Tier 5 — Integration: ORM round-trips (require db_session fixture) # ───────────────────────────────────────────────────────────────────────────── class TestMusehubProposalORM: @pytest.mark.asyncio async def test_new_columns_default_values(self, db_session: AsyncSession) -> None: from musehub.db.musehub_repo_models import MusehubRepo from musehub.db.musehub_social_models import MusehubProposal, MusehubProposalDependency, MusehubProposalReview, MusehubProposalSimulation repo_id = await _make_repo(db_session) proposal = await _make_proposal(db_session, repo_id) await db_session.refresh(proposal) assert proposal.proposal_type == "state_merge" assert proposal.is_draft is False assert proposal.merge_conditions is None assert proposal.merge_strategy == "overlay" assert proposal.selective_domains is None assert proposal.dimensional_risk == {} assert proposal.midi_tracks_changed == 0 assert proposal.midi_notes_delta == 0 assert proposal.harmonic_tension_delta is None assert proposal.payment_claim_count == 0 assert proposal.payment_ledger_delta_nano == 0 assert proposal.payment_avax_address is None assert proposal.agent_model is None assert proposal.agent_spawned_by is None @pytest.mark.asyncio async def test_new_columns_explicit_values(self, db_session: AsyncSession) -> None: from musehub.core.genesis import compute_identity_id, compute_proposal_id from musehub.db.musehub_repo_models import MusehubRepo from musehub.db.musehub_social_models import MusehubProposal, MusehubProposalDependency, MusehubProposalReview, MusehubProposalSimulation repo_id = await _make_repo(db_session) author_id = compute_identity_id(b"p1tester") created_at = _now() proposal = MusehubProposal( proposal_id=compute_proposal_id(repo_id, author_id, "feat/rich", "dev", created_at.isoformat()), repo_id=repo_id, proposal_number=2, title="feat: rich proposal", from_branch="feat/rich", to_branch="dev", created_at=created_at, proposal_type="midi_evolution", is_draft=True, merge_conditions={"require_approvals": 1, "require_no_breakage": True}, merge_strategy="selective", selective_domains=["code", "midi"], dimensional_risk={"code": 0.8, "midi": 0.3}, midi_tracks_changed=4, midi_notes_delta=-12, harmonic_tension_delta=0.15, payment_claim_count=3, payment_ledger_delta_nano=500_000_000, payment_avax_address="0xdeadbeef", agent_model="claude-sonnet-4-6", agent_spawned_by="gabriel", ) db_session.add(proposal) await db_session.flush() await db_session.refresh(proposal) assert proposal.proposal_type == "midi_evolution" assert proposal.is_draft is True assert proposal.merge_conditions["require_approvals"] == 1 assert proposal.merge_strategy == "selective" assert proposal.selective_domains == ["code", "midi"] assert proposal.dimensional_risk == {"code": 0.8, "midi": 0.3} assert proposal.midi_tracks_changed == 4 assert proposal.midi_notes_delta == -12 assert abs(proposal.harmonic_tension_delta - 0.15) < 1e-6 assert proposal.payment_claim_count == 3 assert proposal.payment_ledger_delta_nano == 500_000_000 assert proposal.payment_avax_address == "0xdeadbeef" assert proposal.agent_model == "claude-sonnet-4-6" assert proposal.agent_spawned_by == "gabriel" class TestMusehubProposalReviewORM: @pytest.mark.asyncio async def test_new_review_columns_defaults(self, db_session: AsyncSession) -> None: from musehub.core.genesis import compute_identity_id, compute_review_id from musehub.db.musehub_repo_models import MusehubRepo from musehub.db.musehub_social_models import MusehubProposal, MusehubProposalDependency, MusehubProposalReview, MusehubProposalSimulation repo_id = await _make_repo(db_session) proposal = await _make_proposal(db_session, repo_id) reviewer_id = compute_identity_id(b"reviewer1") created_at = _now() review = MusehubProposalReview( review_id=compute_review_id(proposal.proposal_id, reviewer_id, created_at.isoformat()), proposal_id=proposal.proposal_id, reviewer_username="reviewer1", ) db_session.add(review) await db_session.flush() await db_session.refresh(review) assert review.reviewed_domains == [] assert review.domain_risk_acknowledged == {} assert review.suggested_merge_strategy is None @pytest.mark.asyncio async def test_new_review_columns_explicit(self, db_session: AsyncSession) -> None: from musehub.core.genesis import compute_identity_id, compute_review_id from musehub.db.musehub_repo_models import MusehubRepo from musehub.db.musehub_social_models import MusehubProposal, MusehubProposalDependency, MusehubProposalReview, MusehubProposalSimulation repo_id = await _make_repo(db_session) proposal = await _make_proposal(db_session, repo_id, proposal_number=2) reviewer_id = compute_identity_id(b"reviewer2") created_at = _now() review = MusehubProposalReview( review_id=compute_review_id(proposal.proposal_id, reviewer_id, created_at.isoformat()), proposal_id=proposal.proposal_id, reviewer_username="reviewer2", reviewed_domains=["code", "midi"], domain_risk_acknowledged={"code": True, "midi": False}, suggested_merge_strategy="weave", ) db_session.add(review) await db_session.flush() await db_session.refresh(review) assert review.reviewed_domains == ["code", "midi"] assert review.domain_risk_acknowledged["code"] is True assert review.suggested_merge_strategy == "weave" class TestMusehubProposalDependencyORM: @pytest.mark.asyncio async def test_dependency_edge_created(self, db_session: AsyncSession) -> None: from musehub.db.musehub_repo_models import MusehubRepo from musehub.db.musehub_social_models import MusehubProposal, MusehubProposalDependency, MusehubProposalReview, MusehubProposalSimulation repo_id = await _make_repo(db_session) p1 = await _make_proposal(db_session, repo_id, proposal_number=1) p2 = await _make_proposal(db_session, repo_id, proposal_number=2) dep = MusehubProposalDependency( dep_id=f"dep-{_uid()}", dependent_proposal_id=p2.proposal_id, dependency_proposal_id=p1.proposal_id, ) db_session.add(dep) await db_session.flush() await db_session.refresh(dep) assert dep.dependent_proposal_id == p2.proposal_id assert dep.dependency_proposal_id == p1.proposal_id assert dep.created_at is not None @pytest.mark.asyncio async def test_duplicate_edge_raises(self, db_session: AsyncSession) -> None: from sqlalchemy.exc import IntegrityError from musehub.db.musehub_repo_models import MusehubRepo from musehub.db.musehub_social_models import MusehubProposal, MusehubProposalDependency, MusehubProposalReview, MusehubProposalSimulation repo_id = await _make_repo(db_session) p1 = await _make_proposal(db_session, repo_id, proposal_number=1) p2 = await _make_proposal(db_session, repo_id, proposal_number=2) dep_a = MusehubProposalDependency( dep_id=f"dep-{_uid()}", dependent_proposal_id=p2.proposal_id, dependency_proposal_id=p1.proposal_id, ) dep_b = MusehubProposalDependency( dep_id=f"dep-{_uid()}", dependent_proposal_id=p2.proposal_id, dependency_proposal_id=p1.proposal_id, ) db_session.add(dep_a) db_session.add(dep_b) with pytest.raises(IntegrityError): await db_session.flush() @pytest.mark.asyncio async def test_cascade_delete_on_proposal_delete(self, db_session: AsyncSession) -> None: from sqlalchemy import select from musehub.db.musehub_repo_models import MusehubRepo from musehub.db.musehub_social_models import MusehubProposal, MusehubProposalDependency, MusehubProposalReview, MusehubProposalSimulation repo_id = await _make_repo(db_session) p1 = await _make_proposal(db_session, repo_id, proposal_number=1) p2 = await _make_proposal(db_session, repo_id, proposal_number=2) dep_id = f"dep-{_uid()}" dep = MusehubProposalDependency( dep_id=dep_id, dependent_proposal_id=p2.proposal_id, dependency_proposal_id=p1.proposal_id, ) db_session.add(dep) await db_session.flush() await db_session.delete(p1) await db_session.flush() result = await db_session.execute( select(MusehubProposalDependency).where( MusehubProposalDependency.dep_id == dep_id ) ) assert result.scalar_one_or_none() is None class TestMusehubProposalSimulationORM: @pytest.mark.asyncio async def test_simulation_stored_and_retrieved(self, db_session: AsyncSession) -> None: from musehub.db.musehub_repo_models import MusehubRepo from musehub.db.musehub_social_models import MusehubProposal, MusehubProposalDependency, MusehubProposalReview, MusehubProposalSimulation repo_id = await _make_repo(db_session) proposal = await _make_proposal(db_session, repo_id) sim = MusehubProposalSimulation( simulation_id=f"sim-{_uid()}", proposal_id=proposal.proposal_id, simulation_type="conflict_scan", from_branch_commit_id="sha256:" + "c" * 64, result={"conflicts": [], "safe_merge": True}, duration_ms=142, ) db_session.add(sim) await db_session.flush() await db_session.refresh(sim) assert sim.simulation_type == "conflict_scan" assert sim.result["safe_merge"] is True assert sim.duration_ms == 142 assert sim.expires_at is None @pytest.mark.asyncio async def test_duplicate_simulation_type_raises(self, db_session: AsyncSession) -> None: from sqlalchemy.exc import IntegrityError from musehub.db.musehub_repo_models import MusehubRepo from musehub.db.musehub_social_models import MusehubProposal, MusehubProposalDependency, MusehubProposalReview, MusehubProposalSimulation repo_id = await _make_repo(db_session) proposal = await _make_proposal(db_session, repo_id, proposal_number=2) commit_id = "sha256:" + "d" * 64 sim_a = MusehubProposalSimulation( simulation_id=f"sim-{_uid()}", proposal_id=proposal.proposal_id, simulation_type="risk_projection", from_branch_commit_id=commit_id, result={}, ) sim_b = MusehubProposalSimulation( simulation_id=f"sim-{_uid()}", proposal_id=proposal.proposal_id, simulation_type="risk_projection", from_branch_commit_id=commit_id, result={}, ) db_session.add(sim_a) db_session.add(sim_b) with pytest.raises(IntegrityError): await db_session.flush() @pytest.mark.asyncio async def test_three_simulation_types_coexist(self, db_session: AsyncSession) -> None: from musehub.db.musehub_repo_models import MusehubRepo from musehub.db.musehub_social_models import MusehubProposal, MusehubProposalDependency, MusehubProposalReview, MusehubProposalSimulation repo_id = await _make_repo(db_session) proposal = await _make_proposal(db_session, repo_id, proposal_number=3) commit_id = "sha256:" + "e" * 64 for sim_type in ("conflict_scan", "risk_projection", "dependency_order"): db_session.add(MusehubProposalSimulation( simulation_id=f"sim-{_uid()}", proposal_id=proposal.proposal_id, simulation_type=sim_type, from_branch_commit_id=commit_id, result={"type": sim_type}, )) await db_session.flush() from sqlalchemy import select, func result = await db_session.execute( select(func.count()).select_from(MusehubProposalSimulation).where( MusehubProposalSimulation.proposal_id == proposal.proposal_id ) ) assert result.scalar_one() == 3