musehub_proposals.py
python
sha256:400438cf8bc700a611f1ba798aa9def68290f487dc19f7dbf317985ad17050c9
chore: delete muse/prose domain — hallucinated, never existed
Sonnet 4.6
minor
⚠ breaking
3 days ago
| 1 | """MuseHub merge proposal persistence adapter — single point of DB access for proposals. |
| 2 | |
| 3 | This module is the ONLY place that touches the ``musehub_proposals`` table. |
| 4 | Route handlers delegate here; no business logic lives in routes. |
| 5 | |
| 6 | Boundary rules: |
| 7 | - Must NOT import state stores, SSE queues, or LLM clients. |
| 8 | - May import ORM models from musehub.db domain-specific modules. |
| 9 | - May import Pydantic response models from musehub.models.musehub. |
| 10 | - May import musehub.core.genesis for genesis ID computation. |
| 11 | |
| 12 | Merge strategy |
| 13 | -------------- |
| 14 | ``merge_commit`` is the only strategy at MVP. It creates a new commit on |
| 15 | ``to_branch`` whose parent_ids are [to_branch head, from_branch head], then |
| 16 | updates the ``to_branch`` head pointer and marks the proposal as merged. |
| 17 | |
| 18 | If either branch has no commits yet (no head commit), the merge is rejected with |
| 19 | a ``ValueError`` — there is nothing to merge. |
| 20 | """ |
| 21 | |
| 22 | import logging |
| 23 | from datetime import datetime, timezone |
| 24 | |
| 25 | import sqlalchemy as sa |
| 26 | from sqlalchemy import ColumnElement, func, select |
| 27 | from sqlalchemy.ext.asyncio import AsyncSession |
| 28 | |
| 29 | from musehub.core.genesis import compute_branch_id, compute_comment_id, compute_proposal_id, compute_review_id, compute_simulation_id |
| 30 | from musehub.db.musehub_identity_models import MusehubIdentity |
| 31 | from musehub.db.musehub_repo_models import MusehubBranch, MusehubCommit, MusehubCommitGraph, MusehubCommitRef |
| 32 | from musehub.db.musehub_social_models import ( |
| 33 | MusehubProposal, |
| 34 | MusehubProposalComment, |
| 35 | MusehubProposalReview, |
| 36 | MusehubProposalSimulation, |
| 37 | ) |
| 38 | from musehub.services.proposal_dag import ( |
| 39 | CycleError, |
| 40 | ProposalDag, |
| 41 | blocked_by_numbers, |
| 42 | blocks_numbers, |
| 43 | create_dependency_edges, |
| 44 | is_blocked, |
| 45 | load_dag_for_proposals, |
| 46 | ) |
| 47 | from musehub.muse_cli.snapshot import compute_commit_id, compute_snapshot_id |
| 48 | |
| 49 | |
class BranchNotFoundError(Exception):
    """Signal that a branch a proposal depends on is absent from the repo.

    ``create_proposal`` raises this when ``from_branch`` cannot be resolved.
    """
| 52 | from musehub.types.json_types import JSONObject, StrDict |
| 53 | from musehub.models.musehub import ( |
| 54 | DomainHeatEntry, |
| 55 | DomainHeatResponse, |
| 56 | MergeReadinessResponse, |
| 57 | MergeResultEmbed, |
| 58 | ProposalCommentListResponse, |
| 59 | ProposalCommentResponse, |
| 60 | ProposalListEntry, |
| 61 | ProposalListFilters, |
| 62 | ProposalListResponse, |
| 63 | ProposalResponse, |
| 64 | ProposalReviewListResponse, |
| 65 | ProposalReviewResponse, |
| 66 | SimulationListResponse, |
| 67 | SimulationResponse, |
| 68 | ) |
| 69 | |
# Alias for a string-keyed lookup of comment responses.  Its users are not
# visible in this part of the module — presumably keyed by comment ID (TODO confirm).
type _CommentMap = dict[str, ProposalCommentResponse]

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
| 73 | |
| 74 | |
| 75 | def _utc_now() -> datetime: |
| 76 | return datetime.now(tz=timezone.utc) |
| 77 | |
| 78 | |
| 79 | |
| 80 | def _symbols_from_delta(delta: JSONObject | None) -> list[str]: |
| 81 | """Extract unique symbol addresses from a structured_delta dict. |
| 82 | |
| 83 | Only child_op addresses that contain ``::`` are returned — file-level ops |
| 84 | without a symbol component are intentionally excluded. |
| 85 | """ |
| 86 | if not isinstance(delta, dict): |
| 87 | return [] |
| 88 | seen: set[str] = set() |
| 89 | for file_op in delta.get("ops") or []: |
| 90 | if not isinstance(file_op, dict): |
| 91 | continue |
| 92 | for child_op in file_op.get("child_ops") or []: |
| 93 | if not isinstance(child_op, dict): |
| 94 | continue |
| 95 | addr = child_op.get("address", "") |
| 96 | if "::" in addr and addr not in seen: |
| 97 | seen.add(addr) |
| 98 | return list(seen) |
| 99 | |
| 100 | |
async def _touched_symbols_for_branch(
    session: AsyncSession, repo_id: str, branch: str
) -> list[str]:
    """Return the union of symbol addresses touched by all commits on ``branch``.

    Selects the non-null ``structured_delta`` of every commit whose ``branch``
    column equals ``branch`` and that has a ``MusehubCommitRef`` row for
    ``repo_id``, then merges the symbols extracted from each delta.

    Args:
        session: Async database session.
        repo_id: Repository the commit refs must belong to.
        branch: Branch name to match against ``MusehubCommit.branch``.

    Returns:
        The distinct symbol addresses, sorted.  Sorting makes the result
        reproducible: the query has no ORDER BY and set iteration order
        varies between processes.
    """
    stmt = (
        select(MusehubCommit.structured_delta)
        .join(MusehubCommitRef, MusehubCommitRef.commit_id == MusehubCommit.commit_id)
        .where(
            MusehubCommitRef.repo_id == repo_id,
            MusehubCommit.branch == branch,
            MusehubCommit.structured_delta.isnot(None),
        )
    )
    deltas = (await session.execute(stmt)).scalars().all()

    symbols: set[str] = set()
    for delta in deltas:
        symbols.update(_symbols_from_delta(delta))
    return sorted(symbols)
| 118 | |
| 119 | |
def _to_proposal_response(
    row: MusehubProposal,
    *,
    dag: ProposalDag | None = None,
    simulations: "SimulationListResponse | None" = None,
    merge_result: MergeResultEmbed | None = None,
    url_prefix: str = "",
) -> ProposalResponse:
    """Convert a ``MusehubProposal`` row into its ``ProposalResponse`` wire model.

    Args:
        row: The ORM row to serialise.  Several columns are read through
            ``getattr`` with a fallback, so a row lacking them still converts.
        dag: Dependency graph; when given, ``blocked_by``, ``blocks`` and
            ``is_blocked`` are derived from it, otherwise they are empty/False.
        simulations: Simulation list; each entry is summarised under its
            ``simulation_type`` (a later entry of the same type replaces an
            earlier one).
        merge_result: Passed through unchanged.
        url_prefix: When non-empty, ``url`` becomes
            ``{url_prefix}/proposals/{proposal_id}``; otherwise ``url`` is "".
    """
    from musehub.models.musehub import MergeConditions, MergeStrategy, ProposalType

    raw_conditions = getattr(row, "merge_conditions", None)
    conditions = MergeConditions.model_validate(raw_conditions) if raw_conditions else None

    # DAG-derived fields default to "no dependencies" when no graph is supplied.
    if dag is None:
        blocked_by: list[int] = []
        blocks: list[int] = []
        blocked = False
    else:
        blocked_by = blocked_by_numbers(dag, row.proposal_id)
        blocks = blocks_numbers(dag, row.proposal_id)
        blocked = is_blocked(dag, row.proposal_id)

    sim_summaries: dict[str, dict] = {}
    if simulations is not None:
        sim_summaries = {
            sim.simulation_type: {
                "simulation_id": sim.simulation_id,
                "result": sim.result,
                "is_stale": sim.is_stale,
                "from_branch_commit_id": sim.from_branch_commit_id,
                "duration_ms": sim.duration_ms,
                "created_at": sim.created_at.isoformat(),
            }
            for sim in simulations.simulations
        }

    url = ""
    if url_prefix:
        url = f"{url_prefix}/proposals/{row.proposal_id}"

    return ProposalResponse(
        proposal_id=row.proposal_id,
        proposal_number=row.proposal_number,
        url=url,
        title=row.title,
        body=row.body,
        state=row.state,
        from_branch=row.from_branch,
        to_branch=row.to_branch,
        merge_commit_id=row.merge_commit_id,
        merged_at=row.merged_at,
        author=row.author,
        created_at=row.created_at,
        proposal_type=ProposalType(getattr(row, "proposal_type", "state_merge")),
        is_draft=getattr(row, "is_draft", False),
        merge_conditions=conditions,
        merge_strategy=MergeStrategy(getattr(row, "merge_strategy", "overlay")),
        selective_domains=getattr(row, "selective_domains", None),
        risk_score=getattr(row, "risk_score", None),
        dimensional_risk=dict(getattr(row, "dimensional_risk", None) or {}),
        blocked_by=blocked_by,
        blocks=blocks,
        is_blocked=blocked,
        latest_simulations=sim_summaries,
        proposer_signature=getattr(row, "proposer_signature", None),
        proposer_public_key=getattr(row, "proposer_public_key", None),
        from_snapshot_id=getattr(row, "from_snapshot_id", None),
        to_snapshot_id=getattr(row, "to_snapshot_id", None),
        merge_result=merge_result,
    )
| 182 | |
| 183 | |
async def _get_branch(
    session: AsyncSession, repo_id: str, branch_name: str
) -> MusehubBranch | None:
    """Look up one branch row by ``(repo_id, name)``; ``None`` when absent."""
    result = await session.execute(
        select(MusehubBranch).where(
            MusehubBranch.repo_id == repo_id,
            MusehubBranch.name == branch_name,
        )
    )
    return result.scalar_one_or_none()
| 193 | |
| 194 | |
async def create_proposal(
    session: AsyncSession,
    *,
    repo_id: str,
    title: str,
    from_branch: str,
    to_branch: str,
    body: str = "",
    author: str = "",
    author_identity_id: str = "",
    proposal_type: str = "state_merge",
    is_draft: bool = False,
    merge_strategy: str = "overlay",
    merge_conditions: JSONObject | None = None,
    selective_domains: list[str] | None = None,
    depends_on: list[str] | None = None,
    proposer_signature: str | None = None,
    proposer_public_key: str | None = None,
    url_prefix: str = "",
) -> ProposalResponse:
    """Insert a new merge proposal and return its wire representation.

    The proposal starts in state ``drafting`` when ``is_draft`` is true and
    ``open`` otherwise.  It receives the next per-repo ``proposal_number``
    (current maximum + 1, starting at 1) and a snapshot of the symbols touched
    by commits on ``from_branch``.

    ``author`` identifies the user opening the proposal — typically the MSign
    handle from the request context, or a display name from the seed script.

    The row is flushed and refreshed here; this function itself does not
    commit the session.

    Args:
        session: Async database session.
        repo_id: Repository the proposal belongs to.
        title / body: Human-readable description.
        from_branch: Source branch; must exist.
        to_branch: Target branch; may be absent, in which case
            ``to_snapshot_id`` is stored as ``None``.
        author_identity_id: Fed into ``compute_proposal_id`` together with the
            repo, branch names and creation timestamp.
        depends_on: Optional proposal references handed to
            ``create_dependency_edges`` after the flush.
        url_prefix: Forwarded to the response builder for the ``url`` field.

    Raises:
        BranchNotFoundError: ``from_branch`` does not exist in the repo; the
            caller should surface this as HTTP 404.
    """
    source_branch = await _get_branch(session, repo_id, from_branch)
    if source_branch is None:
        raise BranchNotFoundError(f"Branch '{from_branch}' not found in repo {repo_id}")
    target_branch = await _get_branch(session, repo_id, to_branch)

    # The *_snapshot_id columns are populated from each branch's head_commit_id.
    source_head = source_branch.head_commit_id
    target_head = None if not target_branch else target_branch.head_commit_id

    # Next sequential proposal_number for this repo (1-based, like GitHub).
    highest_number: int | None = (
        await session.execute(
            select(func.max(MusehubProposal.proposal_number)).where(
                MusehubProposal.repo_id == repo_id
            )
        )
    ).scalar_one_or_none()
    next_number = 1 if not highest_number else highest_number + 1

    touched_symbols = await _touched_symbols_for_branch(session, repo_id, from_branch)
    created_at = _utc_now()
    new_id = compute_proposal_id(
        repo_id, author_identity_id, from_branch, to_branch, created_at.isoformat()
    )

    proposal = MusehubProposal(
        proposal_id=new_id,
        repo_id=repo_id,
        proposal_number=next_number,
        title=title,
        body=body,
        state="drafting" if is_draft else "open",
        from_branch=from_branch,
        to_branch=to_branch,
        author=author,
        touched_symbols=touched_symbols,
        created_at=created_at,
        proposal_type=proposal_type,
        is_draft=is_draft,
        merge_strategy=merge_strategy,
        merge_conditions=merge_conditions,
        selective_domains=selective_domains,
        proposer_signature=proposer_signature,
        proposer_public_key=proposer_public_key,
        from_snapshot_id=source_head,
        to_snapshot_id=target_head,
    )
    session.add(proposal)
    await session.flush()

    # Dependency edges are written only after the proposal row exists.
    if depends_on:
        await create_dependency_edges(session, proposal.proposal_id, depends_on)

    await session.refresh(proposal)
    logger.info("✅ Created proposal '%s' (%s → %s) in repo %s", title, from_branch, to_branch, repo_id)
    return _to_proposal_response(proposal, url_prefix=url_prefix)
| 276 | |
| 277 | |
def _risk_band_conditions(bands: list[str]) -> list[ColumnElement[bool]]:
    """Return SQLAlchemy conditions that match ``risk_score`` to the given bands.

    Each band is a half-open interval ``[lo, hi)``; together they cover every
    score with no gaps:

        critical   score >= 0.75
        high       0.50 <= score < 0.75
        medium     0.25 <= score < 0.50
        low        0.01 <= score < 0.25
        none       score < 0.01, or NULL

    Args:
        bands: Band names to match.  Unknown names are ignored.

    Returns:
        One condition per recognised band, in input order.  The caller is
        expected to OR them together; an empty list means "no band filter".
    """
    # Single source of truth for the band boundaries.  ``None`` marks an open
    # end: no lower bound for "none", no upper bound for "critical".
    band_ranges: dict[str, tuple[float | None, float | None]] = {
        "critical": (0.75, None),
        "high": (0.50, 0.75),
        "medium": (0.25, 0.50),
        "low": (0.01, 0.25),
        "none": (None, 0.01),
    }
    score = MusehubProposal.risk_score
    clauses: list[ColumnElement[bool]] = []
    for band in bands:
        if band not in band_ranges:
            continue
        lo, hi = band_ranges[band]
        if lo is None:
            # "none": unscored proposals (NULL) plus everything below the
            # "low" threshold, so scores in (0, 0.01) are not orphaned.
            clauses.append(sa.or_(score.is_(None), score < hi))
        elif hi is None:
            clauses.append(score >= lo)
        else:
            clauses.append(sa.and_(score >= lo, score < hi))
    return clauses
| 317 | |
| 318 | |
async def list_proposals(
    session: AsyncSession,
    repo_id: str,
    *,
    state: str = "all",
    cursor: str | None = None,
    limit: int = 20,
    filters: ProposalListFilters | None = None,
    url_prefix: str = "",
) -> ProposalListResponse:
    """Return merge proposals for a repo with cursor-based keyset pagination.

    ``state`` may be ``"open"``, ``"merged"``, ``"closed"``, ``"all"``, or any
    value in the extended 7-state machine accepted by ``ProposalListFilters``.

    When ``filters`` is provided, the following additional predicates are applied:
    - ``filters.risk_band``    → ``risk_score`` range filter (OR across bands)
    - ``filters.domain``       → ``risk_score > 0`` when "code" is included;
                                  other domains add no predicate here (they
                                  require per-domain risk rows — Phase 2 DB
                                  prerequisite)
    - ``filters.author_type``  → LEFT JOIN to ``musehub_identities``; "human"
                                  also matches authors with no identity row
    - ``filters.proposal_type`` / ``filters.merge_strategy`` → ``IN`` filters
    - ``filters.is_draft``     → equality filter when not ``None``
    - ``filters.assigned_reviewer`` → ``IN`` sub-select on ``musehub_proposal_reviews``
    - ``filters.sort``         → ordering; ``merge_ready_first`` uses an approval
                                  count sub-select to surface ready proposals first

    ``cursor`` is the ISO 8601 ``created_at`` of the last seen proposal (opaque
    to callers — pass ``nextCursor`` from a previous response verbatim).

    Args:
        session: Async database session.
        repo_id: Target repository ID.
        state: Proposal state filter; defaults to "all".
        cursor: Pagination cursor (opaque ISO 8601 string).
        limit: Page size.
        filters: Optional ``ProposalListFilters``; overrides ``state``, ``limit``,
                 ``cursor``, and ``sort`` when provided.  ``state`` in ``filters``
                 takes precedence over the top-level ``state`` kwarg.  A falsy
                 value in ``filters`` falls back to the keyword argument.
        url_prefix: Forwarded to the response builder for each proposal's ``url``.

    Returns:
        ``ProposalListResponse`` with paginated proposals, the total number of
        rows matching the filters (ignoring the cursor), and a ``nextCursor``.

    Raises:
        ValueError: If ``cursor`` is not a valid ISO 8601 datetime string.
    """
    f = filters
    # ``or`` fallback: a missing filters object *or* a falsy filter value
    # (None, "", 0) defers to the keyword argument / default.
    effective_state = (f.state if f else None) or state
    effective_limit = (f.limit if f else None) or limit
    effective_cursor = (f.cursor if f else None) or cursor
    effective_sort = (f.sort if f else None) or "newest"

    conditions: list[ColumnElement[bool]] = [MusehubProposal.repo_id == repo_id]
    if effective_state != "all":
        conditions.append(MusehubProposal.state == effective_state)

    stmt = select(MusehubProposal)

    # ── Author-type filter (requires join to identities) ───────────────────────
    if f and f.author_type != "all":
        stmt = stmt.join(
            MusehubIdentity,
            MusehubIdentity.handle == MusehubProposal.author,
            isouter=True,
        )
        if f.author_type == "human":
            # Outer join: authors without an identity row (NULL type) count as human.
            conditions.append(
                sa.or_(
                    MusehubIdentity.identity_type == "human",
                    MusehubIdentity.identity_type.is_(None),
                )
            )
        else:
            conditions.append(MusehubIdentity.identity_type == f.author_type)

    # ── Risk-band filter ───────────────────────────────────────────────────────
    if f and f.risk_band:
        band_clauses = _risk_band_conditions(f.risk_band)
        if band_clauses:
            conditions.append(sa.or_(*band_clauses))

    # ── Domain filter — code only at Phase 2 (other domains require risk rows) ─
    if f and f.domain:
        domain_clauses = []
        if "code" in f.domain:
            domain_clauses.append(
                sa.and_(
                    MusehubProposal.risk_score.is_not(None),
                    MusehubProposal.risk_score > 0.0,
                )
            )
        # A domain list without "code" leaves domain_clauses empty → no filtering.
        if domain_clauses:
            conditions.append(sa.or_(*domain_clauses))

    # ── Proposal-type filter ──────────────────────────────────────────────────
    if f and f.proposal_type:
        conditions.append(MusehubProposal.proposal_type.in_(f.proposal_type))

    # ── Is-draft filter ───────────────────────────────────────────────────────
    if f and f.is_draft is not None:
        conditions.append(MusehubProposal.is_draft == f.is_draft)

    # ── Merge-strategy filter ─────────────────────────────────────────────────
    if f and f.merge_strategy:
        conditions.append(MusehubProposal.merge_strategy.in_(f.merge_strategy))

    # ── Assigned-reviewer filter ───────────────────────────────────────────────
    if f and f.assigned_reviewer:
        reviewer_subq = (
            select(MusehubProposalReview.proposal_id)
            .where(
                MusehubProposalReview.reviewer_username == f.assigned_reviewer,
                MusehubProposalReview.state.in_(["pending", "approved", "changes_requested"]),
            )
            .correlate(MusehubProposal)
        )
        conditions.append(MusehubProposal.proposal_id.in_(reviewer_subq))

    # ── Count total matching rows (re-use same joins as data query) ───────────
    # Uses ``conditions`` only, so the total is independent of the cursor.
    count_stmt = stmt.with_only_columns(
        func.count(MusehubProposal.proposal_id)
    ).where(*conditions).order_by(None)
    total: int = (await session.execute(count_stmt)).scalar_one()

    # ── Sort order ─────────────────────────────────────────────────────────────
    order_clauses: list[ColumnElement[bool]]
    cursor_ascending: bool  # True → > cursor, False → < cursor
    if effective_sort == "oldest":
        order_clauses = [MusehubProposal.created_at.asc()]
        cursor_ascending = True
    elif effective_sort == "risk_desc":
        order_clauses = [MusehubProposal.risk_score.desc().nulls_last()]
        cursor_ascending = False
    elif effective_sort == "risk_asc":
        order_clauses = [MusehubProposal.risk_score.asc().nulls_last()]
        cursor_ascending = False
    elif effective_sort == "merge_ready_first":
        # Sub-select: count approved reviews per proposal; ready = count>=2 AND breakage=0
        approval_subq = (
            select(func.count(MusehubProposalReview.review_id))
            .where(
                MusehubProposalReview.proposal_id == MusehubProposal.proposal_id,
                MusehubProposalReview.state == "approved",
            )
            .correlate(MusehubProposal)
            .scalar_subquery()
        )
        # _DEFAULT_REQUIRED_APPROVALS is not defined in the visible part of this
        # module — presumably a module-level constant further down (TODO confirm).
        # CASE yields 0 for ready proposals and 1 otherwise, so ready rows sort first.
        is_ready = sa.case(
            (
                sa.and_(
                    approval_subq >= _DEFAULT_REQUIRED_APPROVALS,
                    MusehubProposal.breakage_count == 0,
                ),
                0,
            ),
            else_=1,
        )
        order_clauses = [is_ready, MusehubProposal.created_at.desc()]
        cursor_ascending = False
    else:
        # newest (default)
        order_clauses = [MusehubProposal.created_at.desc()]
        cursor_ascending = False

    # ── Cursor predicate ───────────────────────────────────────────────────────
    # NOTE(review): the cursor is always compared against created_at, even when
    # the primary ordering is risk_score or the merge-ready flag.  Under those
    # sorts a later page looks like it can skip or repeat rows — confirm.
    # NOTE(review): there is no tie-breaker column, so rows sharing an identical
    # created_at with the cursor row would be excluded by the strict comparison.
    data_conditions = list(conditions)
    if effective_cursor is not None:
        cursor_dt = datetime.fromisoformat(effective_cursor)
        if cursor_ascending:
            data_conditions.append(MusehubProposal.created_at > cursor_dt)
        else:
            data_conditions.append(MusehubProposal.created_at < cursor_dt)

    # Fetch one row beyond the page size purely to detect whether more rows exist.
    rows = list(
        (
            await session.execute(
                stmt.where(*data_conditions).order_by(*order_clauses).limit(effective_limit + 1)
            )
        ).scalars()
    )

    next_cursor: str | None = None
    if len(rows) == effective_limit + 1:
        # Cursor = created_at of the last row actually returned; the extra row is dropped.
        next_cursor = rows[effective_limit - 1].created_at.isoformat()
        rows = rows[:effective_limit]

    return ProposalListResponse(
        proposals=[_to_proposal_response(r, url_prefix=url_prefix) for r in rows],
        total=total,
        next_cursor=next_cursor,
    )
| 508 | |
| 509 | |
async def get_proposal(
    session: AsyncSession,
    repo_id: str,
    proposal_id: str,
    *,
    url_prefix: str = "",
) -> ProposalResponse | None:
    """Fetch one proposal together with its DAG position and simulation summaries.

    Returns ``None`` when no proposal with ``proposal_id`` exists in ``repo_id``.
    """
    result = await session.execute(
        select(MusehubProposal).where(
            MusehubProposal.repo_id == repo_id,
            MusehubProposal.proposal_id == proposal_id,
        )
    )
    proposal = result.scalar_one_or_none()
    if proposal is None:
        return None

    # Keyword arguments evaluate left to right: DAG first, then simulations.
    return _to_proposal_response(
        proposal,
        dag=await load_dag_for_proposals(session, [proposal_id]),
        simulations=await list_simulations(session, repo_id, proposal_id),
        url_prefix=url_prefix,
    )
| 528 | |
| 529 | |
async def update_proposal(
    session: AsyncSession,
    repo_id: str,
    proposal_id: str,
    *,
    title: str | None = None,
    body: str | None = None,
    proposal_type: str | None = None,
    merge_strategy: str | None = None,
    url_prefix: str = "",
) -> ProposalResponse | None:
    """Apply a partial update to a proposal and return the refreshed response.

    Only arguments that are not ``None`` are written; passing ``None`` leaves
    the corresponding column untouched.  The change is committed before the
    response is built.

    Args:
        session: Async database session.
        repo_id: Repository the proposal belongs to.
        proposal_id: Proposal to update.
        title / body / proposal_type / merge_strategy: New values, or ``None``
            to keep the current one.
        url_prefix: Forwarded to the response builder so the returned ``url``
            is populated the same way as in ``get_proposal``.  Defaults to ""
            (empty ``url``), which matches the previous behaviour.

    Returns:
        The updated proposal enriched with DAG position and simulation
        summaries, or ``None`` if the proposal does not exist in the repo.
    """
    stmt = select(MusehubProposal).where(
        MusehubProposal.repo_id == repo_id,
        MusehubProposal.proposal_id == proposal_id,
    )
    row = (await session.execute(stmt)).scalar_one_or_none()
    if row is None:
        return None

    if title is not None:
        row.title = title
    if body is not None:
        row.body = body
    if proposal_type is not None:
        row.proposal_type = proposal_type
    if merge_strategy is not None:
        row.merge_strategy = merge_strategy

    await session.commit()
    await session.refresh(row)

    dag = await load_dag_for_proposals(session, [proposal_id])
    sims = await list_simulations(session, repo_id, proposal_id)
    return _to_proposal_response(row, dag=dag, simulations=sims, url_prefix=url_prefix)
| 561 | |
| 562 | |
async def _resolve_ancestor_manifest(
    session: AsyncSession,
    repo_id: str,
    from_branch: str,
    to_branch: str,
) -> StrDict | None:
    """Find the common ancestor snapshot manifest for a branch pair.

    Walks the from_branch commit history looking for the first commit whose
    parent_ids overlap with commits reachable from to_branch.  This is a
    lightweight merge-base approximation suitable for server-side strategy
    computation (not a full LCA traversal).

    Args:
        session: Async database session.
        repo_id: Repository both branches belong to.
        from_branch: Name of the source branch.
        to_branch: Name of the target branch.

    Returns:
        The manifest of the candidate ancestor's snapshot, or None if either
        branch is missing, no overlapping parent is found within the walk
        limits, or the candidate commit has no snapshot (new repo, orphan
        branches).
    """
    from musehub.graph.walk import walk_dag_async
    from musehub.services.musehub_snapshot import get_snapshot_manifest

    to_b = await _get_branch(session, repo_id, to_branch)
    from_b = await _get_branch(session, repo_id, from_branch)
    if to_b is None or from_b is None:
        return None

    # Walk 1: collect to_branch ancestry (bounded BFS)
    # max_nodes presumably caps how many commits the walker visits — TODO confirm
    # against walk_dag_async; histories deeper than that would be truncated.
    to_commit_ids: set[str] = set()

    async def _to_adj(cid: str) -> list[str]:
        # Adjacency callback: a commit's neighbours are all of its parents.
        commit = await session.get(MusehubCommit, cid)
        return commit.parent_ids if commit and commit.parent_ids else []

    async for cid in walk_dag_async(
        [to_b.head_commit_id] if to_b.head_commit_id else [],
        _to_adj,
        max_nodes=200,
    ):
        to_commit_ids.add(cid)

    # Walk 2: first-parent walk on from_branch, looking for merge base
    candidate_id: str | None = None

    async def _from_adj(cid: str) -> list[str]:
        # Adjacency callback with a side effect: records the first parent that
        # is also reachable from to_branch, then returns no neighbours.
        nonlocal candidate_id
        commit = await session.get(MusehubCommit, cid)
        if commit is None:
            return []
        # NOTE(review): only *parents* are tested against to_commit_ids; the
        # from_branch head itself is never checked, so if the head is already
        # reachable from to_branch the candidate is its parent — confirm intended.
        for parent_id in (commit.parent_ids or []):
            if parent_id in to_commit_ids:
                candidate_id = parent_id
                return []  # stop walking once merge base is found
        # No overlap yet: continue along the first parent only.
        return commit.parent_ids[:1] if commit.parent_ids else []

    # The yielded commit IDs are not needed; the loop only drives the walker
    # until _from_adj has recorded a candidate.
    async for _ in walk_dag_async(
        [from_b.head_commit_id] if from_b.head_commit_id else [],
        _from_adj,
        max_nodes=200,
    ):
        if candidate_id:
            break

    if not candidate_id:
        return None

    ancestor_commit = await session.get(MusehubCommit, candidate_id)
    if ancestor_commit is None or not ancestor_commit.snapshot_id:
        return None

    return await get_snapshot_manifest(session, ancestor_commit.snapshot_id)
| 630 | |
| 631 | |
async def _rebase_commits(
    session: AsyncSession,
    repo_id: str,
    proposal: "MusehubProposal",
    merger_handle: str,
    to_b: "MusehubBranch | None",
    from_b: "MusehubBranch | None",
    to_manifest: StrDict,
    compute_commit_id_fn: object,
    upsert_snapshot_entries_fn: object,
) -> str:
    """Replay each from_branch commit individually onto to_branch (linear history).

    Algorithm:
      1. Collect from_branch commits not reachable from to_branch, oldest-first.
      2. For each commit in order:
         a. Compute its file delta vs its parent snapshot.
         b. Apply that delta to the running rebased state (starts = to_manifest).
         c. Upsert the new snapshot.
         d. Create a new MusehubCommit with parent = previous replayed commit.
      3. Return the commit_id of the tip (last replayed commit).

    Args:
        session: Async database session; new commit, ref and graph rows are
            added to it but not flushed or committed here.
        repo_id: Repository that receives the replayed commits.
        proposal: Proposal being merged; supplies the branch names.
        merger_handle: Author fallback for commits that carry no author.
        to_b: Target branch row, or ``None``.
        from_b: Source branch row, or ``None``.
        to_manifest: Path → object-id manifest of the target head; copied,
            never mutated.
        compute_commit_id_fn: Callable invoked as
            ``fn(parent_ids, snapshot_id, message, iso_timestamp, author=...,
            signer_public_key="")`` returning the new commit ID.
        upsert_snapshot_entries_fn: Awaitable callable invoked as
            ``fn(session, repo_id, snapshot_id, manifest)``.

    Returns:
        The commit ID of the last replayed commit.  When from_branch has no
        commits beyond to_branch, nothing is replayed and the current
        to_branch head is returned unchanged.

    Raises:
        ValueError: from_branch has no head commit, or nothing was replayed
            and to_branch has no head either.
    """
    from collections import deque

    from musehub.services.musehub_snapshot import get_snapshot_manifest

    to_head_cid = to_b.head_commit_id if to_b else None

    from_head_cid = from_b.head_commit_id if from_b else None
    if not from_head_cid:
        raise ValueError(f"from_branch '{proposal.from_branch}' has no commits")

    # Everything reachable from the to_branch head (depth-first, visit once).
    to_ancestors: set[str] = set()
    if to_head_cid:
        _stack: list[str] = [to_head_cid]
        while _stack:
            _cid = _stack.pop()
            if _cid in to_ancestors:
                continue
            to_ancestors.add(_cid)
            _row = await session.get(MusehubCommit, _cid)
            if _row:
                _stack.extend(_row.parent_ids or [])

    # BFS from from_head, collecting commits not in to_ancestors.
    from_commits_unordered: list[MusehubCommit] = []
    _seen: set[str] = set()
    _frontier: deque[str] = deque([from_head_cid])
    while _frontier:
        _cid = _frontier.popleft()
        if _cid in _seen or _cid in to_ancestors:
            continue
        _seen.add(_cid)
        _c = await session.get(MusehubCommit, _cid)
        if _c is None:
            continue
        from_commits_unordered.append(_c)
        _frontier.extend(p for p in (_c.parent_ids or []) if p not in _seen)

    # Topological sort: oldest-first (Kahn's algorithm on parent_ids subset).
    cid_set = {c.commit_id for c in from_commits_unordered}
    in_degree: dict[str, int] = {c.commit_id: 0 for c in from_commits_unordered}
    children: dict[str, list[MusehubCommit]] = {c.commit_id: [] for c in from_commits_unordered}
    for c in from_commits_unordered:
        for p in (c.parent_ids or []):
            if p in cid_set:
                children[p].append(c)
                in_degree[c.commit_id] += 1
    queue: deque[MusehubCommit] = deque(
        c for c in from_commits_unordered if in_degree[c.commit_id] == 0
    )
    from_commits: list[MusehubCommit] = []
    while queue:
        c = queue.popleft()
        from_commits.append(c)
        for child in children[c.commit_id]:
            in_degree[child.commit_id] -= 1
            if in_degree[child.commit_id] == 0:
                queue.append(child)

    # Look up generation of to_head for commit graph.
    _to_gen = 0
    if to_head_cid:
        _tg_row = await session.get(MusehubCommitGraph, to_head_cid)
        if _tg_row:
            _to_gen = _tg_row.generation

    # Replay each commit.
    rebased_state: StrDict = dict(to_manifest)
    current_parent_id: str = to_head_cid or ""
    current_gen = _to_gen
    tip_cid = current_parent_id

    for orig in from_commits:
        # Compute file delta: orig_snapshot vs orig's parent snapshot.
        orig_manifest: StrDict = {}
        if orig.snapshot_id:
            orig_manifest = (await get_snapshot_manifest(session, orig.snapshot_id)) or {}
        # The first parent that has a snapshot serves as the delta base.
        parent_manifest: StrDict = {}
        for p in (orig.parent_ids or []):
            _p_row = await session.get(MusehubCommit, p)
            if _p_row and _p_row.snapshot_id:
                parent_manifest = (await get_snapshot_manifest(session, _p_row.snapshot_id)) or {}
                break

        # Apply only what this commit actually changed.  Overlaying the whole
        # original manifest would revert to_branch edits to files the commit
        # never touched.
        for path, oid in orig_manifest.items():
            if parent_manifest.get(path) != oid:
                rebased_state[path] = oid
        # Paths present in the parent but gone in the commit were deleted by it.
        for path in parent_manifest:
            if path not in orig_manifest:
                rebased_state.pop(path, None)

        # Upsert new snapshot for this rebased state.
        rebased_snap_id = compute_snapshot_id(rebased_state)
        await upsert_snapshot_entries_fn(session, repo_id, rebased_snap_id, rebased_state)

        # Create the replayed commit.
        committed_at = _utc_now()
        new_parent_ids = [current_parent_id] if current_parent_id else []
        new_cid = compute_commit_id_fn(
            new_parent_ids, rebased_snap_id, orig.message,
            committed_at.isoformat(), author=orig.author or merger_handle,
            signer_public_key="",
        )
        session.add(MusehubCommit(
            commit_id=new_cid, branch=proposal.to_branch,
            parent_ids=new_parent_ids, message=orig.message,
            author=orig.author or merger_handle, timestamp=committed_at,
            snapshot_id=rebased_snap_id,
            structured_delta=orig.structured_delta,
            agent_id=orig.agent_id or "", model_id=orig.model_id or "",
        ))
        session.add(MusehubCommitRef(repo_id=repo_id, commit_id=new_cid))
        current_gen += 1
        session.add(MusehubCommitGraph(
            commit_id=new_cid, parent_ids=new_parent_ids,
            generation=current_gen, snapshot_id=rebased_snap_id,
        ))

        current_parent_id = new_cid
        tip_cid = new_cid

    # Only reachable with an empty tip when nothing was replayed AND to_branch
    # has no head; otherwise the existing to_branch head is returned.
    if not tip_cid:
        raise ValueError("rebase produced no commits — from_branch has no unique commits")

    return tip_cid
| 773 | |
| 774 | |
async def merge_proposal(
    session: AsyncSession,
    repo_id: str,
    proposal_id: str,
    *,
    merge_strategy: str = "merge_commit",
    merger_handle: str = "",
    commit_history: str = "merge",
) -> ProposalResponse:
    """Merge an open (or approved) proposal into its target branch.

    Combines the head snapshots of ``from_branch`` and ``to_branch`` with the
    selected content strategy, records the resulting commit(s) on
    ``to_branch``, advances the ``to_branch`` head, deletes ``from_branch``
    and marks the proposal as ``merged``. Changes are flushed, not committed.

    Args:
        session: Async session used for every read and write.
        repo_id: Repository that owns the proposal.
        proposal_id: Proposal to merge.
        merge_strategy: Content/snapshot merge strategy — how file manifests
            combine (``overlay``, ``weave``, ``replay``, ``selective``).
            The default ``"merge_commit"`` (or an empty string) means "no
            override": the strategy stored on the proposal is used, falling
            back to ``overlay``.
        merger_handle: Handle recorded as the author of the merge commit.
        commit_history: VCS commit graph style.
            ``merge`` (default) — new commit with two parents [to_head, from_head].
            ``squash`` — one new commit, parent = [to_head] only.
            ``rebase`` — replay each from_branch commit linearly via
            ``_rebase_commits``.
            Any other value behaves like ``merge``.

    Returns:
        The refreshed proposal with an embedded ``MergeResultEmbed``.

    Raises:
        ValueError: Proposal not found, or neither branch has any commits.
        RuntimeError: Proposal is already merged or closed, or is blocked by
            unmerged dependencies (surfaces as 409).
    """
    stmt = select(MusehubProposal).where(
        MusehubProposal.repo_id == repo_id,
        MusehubProposal.proposal_id == proposal_id,
    )
    proposal = (await session.execute(stmt)).scalar_one_or_none()
    if proposal is None:
        raise ValueError(f"Proposal {proposal_id} not found in repo {repo_id}")

    if proposal.state not in ("open", "approved"):
        raise RuntimeError(f"Proposal {proposal_id} is already {proposal.state}")

    # Gate on hard dependencies — check require_dependency_merged from merge_conditions.
    # The gate is on by default when merge_conditions is null or omits the key.
    mc_raw = proposal.merge_conditions or {}
    if mc_raw.get("require_dependency_merged", True):
        dag = await load_dag_for_proposals(session, [proposal_id])
        if is_blocked(dag, proposal_id):
            unmerged_nums = blocked_by_numbers(dag, proposal_id)
            raise RuntimeError(
                f"Proposal {proposal_id} cannot be merged: "
                f"unmerged dependencies: proposal numbers {unmerged_nums}"
            )

    from_b = await _get_branch(session, repo_id, proposal.from_branch)
    to_b = await _get_branch(session, repo_id, proposal.to_branch)

    # Collect parent commit IDs for the merge commit.
    parent_ids: list[str] = []
    if to_b is not None and to_b.head_commit_id is not None:
        parent_ids.append(to_b.head_commit_id)
    if from_b is not None and from_b.head_commit_id is not None:
        parent_ids.append(from_b.head_commit_id)

    if not parent_ids:
        raise ValueError(
            f"Cannot merge: neither '{proposal.from_branch}' nor '{proposal.to_branch}' has any commits"
        )

    # Squash: drop from_head from parent_ids so the result is linear.
    if commit_history == "squash" and to_b is not None and to_b.head_commit_id is not None:
        parent_ids = [to_b.head_commit_id]

    from musehub.services.musehub_snapshot import get_snapshot_manifest, upsert_snapshot_entries
    from musehub.services.proposal_merge_strategies import execute_merge_strategy

    to_manifest: StrDict = {}
    if to_b is not None and to_b.head_commit_id is not None:
        to_head = await session.get(MusehubCommit, to_b.head_commit_id)
        if to_head is not None and to_head.snapshot_id:
            to_manifest = await get_snapshot_manifest(session, to_head.snapshot_id)

    from_manifest: StrDict = {}
    if from_b is not None and from_b.head_commit_id is not None:
        from_head = await session.get(MusehubCommit, from_b.head_commit_id)
        if from_head is not None and from_head.snapshot_id:
            from_manifest = await get_snapshot_manifest(session, from_head.snapshot_id)

    # Resolve the content strategy: the proposal's stored strategy (default
    # "overlay") unless the caller passes an explicit override. The legacy
    # default "merge_commit" is a sentinel for "no override".
    strategy_name = getattr(proposal, "merge_strategy", "overlay") or "overlay"
    if merge_strategy and merge_strategy != "merge_commit":
        strategy_name = merge_strategy

    # Only these strategies get an ancestor (merge-base) manifest; for every
    # other strategy the ancestor stays None.
    ancestor_manifest: StrDict | None = None
    if strategy_name in ("weave", "replay", "selective", "phased"):
        ancestor_manifest = await _resolve_ancestor_manifest(
            session, repo_id, proposal.from_branch, proposal.to_branch
        )

    selective_domains: list[str] | None = getattr(proposal, "selective_domains", None)

    merge_result = execute_merge_strategy(
        strategy_name,
        to_manifest,
        from_manifest,
        ancestor_manifest=ancestor_manifest,
        selective_domains=selective_domains,
    )
    merged_manifest: StrDict = merge_result.manifest

    logger.info(
        "🔀 Merge strategy=%s added=%d modified=%d removed=%d conflicts=%d domains=%s",
        merge_result.strategy,
        merge_result.files_added,
        merge_result.files_modified,
        merge_result.files_removed,
        len(merge_result.conflicts),
        merge_result.domains_merged,
    )

    # ── Commit creation — varies by commit_history style ────────────────
    merge_commit_id: str

    if commit_history == "rebase":
        # Replay each from_branch commit individually onto to_branch; the
        # returned ID is the tip of the replayed chain.
        merge_commit_id = await _rebase_commits(
            session, repo_id, proposal, merger_handle,
            to_b, from_b, to_manifest,
            compute_commit_id, upsert_snapshot_entries,
        )
    else:
        # merge and squash: create a single new commit.
        merged_snapshot_id = compute_snapshot_id(merged_manifest)
        await upsert_snapshot_entries(session, repo_id, merged_snapshot_id, merged_manifest)

        merge_message = f"Merge '{proposal.from_branch}' into '{proposal.to_branch}' — proposal: {proposal.title}"
        committed_at = _utc_now()
        merge_commit_id = compute_commit_id(
            parent_ids, merged_snapshot_id, merge_message,
            committed_at.isoformat(), author=merger_handle, signer_public_key="",
        )
        session.add(MusehubCommit(
            commit_id=merge_commit_id, branch=proposal.to_branch,
            parent_ids=parent_ids, message=merge_message,
            author=merger_handle, timestamp=committed_at, snapshot_id=merged_snapshot_id,
        ))
        session.add(MusehubCommitRef(repo_id=repo_id, commit_id=merge_commit_id))

        # Generation = 1 + the highest generation among the parents that have
        # a commit-graph row; 0 when none of them do.
        _parent_gen_q = await session.execute(
            select(MusehubCommitGraph.commit_id, MusehubCommitGraph.generation)
            .where(MusehubCommitGraph.commit_id.in_(parent_ids))
        )
        _parent_gens = [row[1] for row in _parent_gen_q.all()]
        _merge_generation = (max(_parent_gens) + 1) if _parent_gens else 0
        session.add(MusehubCommitGraph(
            commit_id=merge_commit_id, parent_ids=parent_ids,
            generation=_merge_generation, snapshot_id=merged_snapshot_id,
        ))

    # Advance (or create) the to_branch head pointer.
    if to_b is None:
        to_b = MusehubBranch(
            branch_id=compute_branch_id(repo_id, proposal.to_branch),
            repo_id=repo_id,
            name=proposal.to_branch,
            head_commit_id=merge_commit_id,
        )
        session.add(to_b)
    else:
        to_b.head_commit_id = merge_commit_id

    # Refresh touched_symbols from the from_branch commits before marking merged.
    # This must run BEFORE the source branch is deleted: the helper issues
    # queries, and an autoflush would emit the pending DELETE first.
    # NOTE(review): _touched_symbols_for_branch is defined elsewhere —
    # presumably it resolves the branch by name; confirm.
    proposal.touched_symbols = await _touched_symbols_for_branch(
        session, repo_id, proposal.from_branch
    )

    # Delete the source branch so it disappears from refs. This lets
    # `muse fetch --prune` clean up the local tracking ref automatically,
    # matching the behaviour users expect after a proposal merge.
    if from_b is not None:
        await session.delete(from_b)

    # Mark proposal as merged and record the exact merge timestamp.
    proposal.state = "merged"
    proposal.merge_commit_id = merge_commit_id
    proposal.merged_at = _utc_now()

    await session.flush()
    await session.refresh(proposal)
    logger.info(
        "✅ Merged proposal %s ('%s' → '%s') in repo %s, merge commit %s",
        proposal_id,
        proposal.from_branch,
        proposal.to_branch,
        repo_id,
        merge_commit_id,
    )

    embed = MergeResultEmbed(
        status="merged",
        commit_id=merge_commit_id,
        strategy=merge_result.strategy,
        on_conflict=None,
        history=commit_history,
        conflicts=[c.path for c in merge_result.conflicts],
        files_changed={
            "added": merge_result.files_added,
            "modified": merge_result.files_modified,
            "deleted": merge_result.files_removed,
        },
        semver_impact="",
    )
    return _to_proposal_response(proposal, merge_result=embed)
| 990 | |
| 991 | |
# ---------------------------------------------------------------------------
# Close / reopen proposal
# ---------------------------------------------------------------------------
| 995 | |
| 996 | |
async def close_proposal(
    session: AsyncSession,
    repo_id: str,
    proposal_id: str,
) -> ProposalResponse:
    """Set an active (``open`` or ``approved``) proposal to ``closed`` state.

    ``approved`` is accepted alongside ``open`` because ``merge_proposal``
    treats both as active states. Rejecting it here left an approved proposal
    with no way to be closed and produced the misleading error
    "is already approved".

    Args:
        session: Async session; the change is flushed, not committed.
        repo_id: Repository that owns the proposal.
        proposal_id: Proposal to close.

    Returns:
        The refreshed proposal in wire format.

    Raises:
        KeyError: proposal not found in this repo.
        RuntimeError: proposal is already closed or merged.
    """
    proposal = (await session.execute(
        select(MusehubProposal).where(
            MusehubProposal.proposal_id == proposal_id,
            MusehubProposal.repo_id == repo_id,
        )
    )).scalar_one_or_none()
    if proposal is None:
        raise KeyError(f"Proposal {proposal_id} not found in repo {repo_id}")
    # Same active-state set that merge_proposal accepts.
    if proposal.state not in ("open", "approved"):
        raise RuntimeError(f"Proposal {proposal_id} is already {proposal.state}")
    proposal.state = "closed"
    await session.flush()
    await session.refresh(proposal)
    return _to_proposal_response(proposal)
| 1022 | |
| 1023 | |
async def reopen_proposal(
    session: AsyncSession,
    repo_id: str,
    proposal_id: str,
) -> ProposalResponse:
    """Put a proposal that is not ``open`` back into the ``open`` state.

    Besides flipping the state, ``merge_commit_id`` and ``merged_at`` are
    cleared so the proposal can be merged again, e.g. once a corrupt merge
    commit has been cleaned up (bug #36).

    Raises:
        KeyError: proposal not found in this repo.
        RuntimeError: proposal is already open.
    """
    lookup = select(MusehubProposal).where(
        MusehubProposal.proposal_id == proposal_id,
        MusehubProposal.repo_id == repo_id,
    )
    result = await session.execute(lookup)
    target = result.scalar_one_or_none()

    if target is None:
        raise KeyError(f"Proposal {proposal_id} not found in repo {repo_id}")
    if target.state == "open":
        raise RuntimeError(f"Proposal {proposal_id} is already open")

    # Drop every trace of the previous merge together with the state change.
    target.state = "open"
    target.merge_commit_id = None
    target.merged_at = None

    await session.flush()
    await session.refresh(target)
    return _to_proposal_response(target)
| 1054 | |
| 1055 | |
| 1056 | # --------------------------------------------------------------------------- |
| 1057 | # Proposal review comments |
| 1058 | # --------------------------------------------------------------------------- |
| 1059 | |
| 1060 | |
def _to_comment_response(row: MusehubProposalComment) -> ProposalCommentResponse:
    """Convert a comment ORM row into its wire representation.

    The ``target_*`` fields are unpacked from the row's ``dimension_ref``
    JSON object. Numeric targets are only emitted when the stored value has
    the expected numeric type; otherwise they are ``None``.
    """
    ref: JSONObject = row.dimension_ref or {}

    beat_start = ref.get("beat_start")
    beat_end = ref.get("beat_end")
    pitch = ref.get("pitch")

    track = str(ref["track"]) if "track" in ref else None
    start = float(ref["beat_start"]) if isinstance(beat_start, (int, float)) else None
    end = float(ref["beat_end"]) if isinstance(beat_end, (int, float)) else None
    note_pitch = int(ref["pitch"]) if isinstance(pitch, int) else None

    return ProposalCommentResponse(
        comment_id=row.comment_id,
        proposal_id=row.proposal_id,
        author=row.author,
        # getattr with a default: these columns are read defensively.
        author_user_id=getattr(row, "author_user_id", None),
        # Empty strings are normalised to None for the agent/model fields.
        agent_id=getattr(row, "agent_id", None) or None,
        model_id=getattr(row, "model_id", None) or None,
        body=row.body,
        target_type=str(ref.get("type", "general")),
        target_track=track,
        target_beat_start=start,
        target_beat_end=end,
        target_note_pitch=note_pitch,
        parent_comment_id=row.parent_comment_id,
        symbol_address=row.symbol_address,
        created_at=row.created_at,
        updated_at=getattr(row, "updated_at", None),
        is_deleted=bool(getattr(row, "is_deleted", False)),
    )
| 1082 | |
| 1083 | |
async def create_proposal_comment(
    session: AsyncSession,
    *,
    proposal_id: str,
    repo_id: str,
    author: str,
    author_identity_id: str = "",
    body: str,
    target_type: str = "general",
    target_track: str | None = None,
    target_beat_start: float | None = None,
    target_beat_end: float | None = None,
    target_note_pitch: int | None = None,
    parent_comment_id: str | None = None,
    symbol_address: str | None = None,
) -> ProposalCommentResponse:
    """Store a new review comment on a proposal and return it in wire format.

    ``author`` is the MSign handle of the reviewer. ``author_identity_id`` is
    only used to derive the comment ID.
    For a threaded reply, ``parent_comment_id`` must point at an existing
    top-level comment on the same proposal. That constraint is validated by
    the caller, not here.

    Raises ``ValueError`` if the proposal does not exist in the given repo.
    """
    proposal_lookup = select(MusehubProposal).where(
        MusehubProposal.proposal_id == proposal_id,
        MusehubProposal.repo_id == repo_id,
    )
    if (await session.execute(proposal_lookup)).scalar_one_or_none() is None:
        raise ValueError(f"Proposal {proposal_id} not found in repo {repo_id}")

    # Only targets that were actually supplied end up in the stored JSON.
    optional_targets = {
        "track": target_track,
        "beat_start": target_beat_start,
        "beat_end": target_beat_end,
        "pitch": target_note_pitch,
    }
    dimension_ref: JSONObject = {"type": target_type}
    dimension_ref.update(
        {key: value for key, value in optional_targets.items() if value is not None}
    )

    # One timestamp feeds both the derived comment ID and created_at.
    stamp = _utc_now()
    new_comment = MusehubProposalComment(
        comment_id=compute_comment_id(proposal_id, author_identity_id, stamp.isoformat()),
        proposal_id=proposal_id,
        repo_id=repo_id,
        author=author,
        body=body,
        dimension_ref=dimension_ref,
        parent_comment_id=parent_comment_id,
        symbol_address=symbol_address or None,
        created_at=stamp,
    )
    session.add(new_comment)
    await session.flush()
    await session.refresh(new_comment)

    logger.info("✅ Created proposal comment %s on proposal %s by %s", new_comment.comment_id, proposal_id, author)
    return _to_comment_response(new_comment)
| 1144 | |
| 1145 | |
async def backfill_comment_author_user_ids(
    session: AsyncSession,
    repo_id: str | None = None,
    batch: int = 500,
) -> int:
    """Populate author_user_id on proposal comments that have none.

    Joins musehub_proposal_comments → musehub_identities on handle (author)
    and writes the identity_id. Idempotent — rows already populated are skipped.

    The join happens in the query itself, so only comments whose author
    resolves to an identity count towards ``batch``. Selecting the batch
    first and resolving afterwards let unresolvable authors fill every batch,
    so the function kept returning 0 while resolvable rows were never reached.

    Args:
        session: Async session; updates are flushed, not committed.
        repo_id: Restrict the backfill to one repository when given.
        batch: Maximum number of comment/identity pairs fetched per call.

    Returns:
        Count of rows updated. ``0`` means no comment without
        ``author_user_id`` currently matches an identity handle.
    """
    from musehub.db.musehub_identity_models import MusehubIdentity

    where = [MusehubProposalComment.author_user_id.is_(None)]
    if repo_id:
        where.append(MusehubProposalComment.repo_id == repo_id)

    stmt = (
        select(MusehubProposalComment, MusehubIdentity.identity_id)
        .join_from(
            MusehubProposalComment,
            MusehubIdentity,
            MusehubIdentity.handle == MusehubProposalComment.author,
        )
        .where(*where)
        .limit(batch)
    )
    pairs = (await session.execute(stmt)).all()

    updated = 0
    for comment, identity_id in pairs:
        # Skip falsy identity IDs, and skip a comment that was already filled
        # in this pass (possible if several identities share one handle).
        if not identity_id or comment.author_user_id is not None:
            continue
        comment.author_user_id = identity_id
        updated += 1

    if updated:
        await session.flush()

    return updated
| 1188 | |
| 1189 | |
async def list_proposal_comments(
    session: AsyncSession,
    proposal_id: str,
    repo_id: str,
    cursor: str | None = None,
    limit: int = 20,
) -> ProposalCommentListResponse:
    """List the non-deleted review comments of a proposal, one page at a time.

    The page is arranged as a two-level thread tree. Comments without a
    ``parent_comment_id`` become the root list. Every other comment is
    appended to the ``replies`` of its parent if that parent is part of the
    same page, in ``created_at`` ascending order. Replies whose parent is not
    on the page are not included in the result. Grandchildren are not
    supported — callers should reply to the original top-level comment.

    ``cursor`` is the ISO 8601 ``created_at`` of the last seen comment
    (opaque to callers — pass ``nextCursor`` from a previous response
    verbatim); only comments created strictly after it are returned. Omit it
    to start from the beginning. ``total`` counts all non-deleted comments on
    the proposal regardless of the current page.
    """
    base_filters = (
        MusehubProposalComment.proposal_id == proposal_id,
        MusehubProposalComment.repo_id == repo_id,
        MusehubProposalComment.is_deleted.is_(False),
    )

    total_result = await session.execute(
        select(func.count(MusehubProposalComment.comment_id)).where(*base_filters)
    )
    total: int = total_result.scalar_one()

    page_query = select(MusehubProposalComment).where(*base_filters)
    if cursor is not None:
        page_query = page_query.where(
            MusehubProposalComment.created_at > datetime.fromisoformat(cursor)
        )
    # Fetch one extra row to learn whether another page exists.
    page_query = page_query.order_by(MusehubProposalComment.created_at).limit(limit + 1)
    page = list((await session.execute(page_query)).scalars())

    next_cursor: str | None = None
    if len(page) == limit + 1:
        # The cursor points at the last row that is actually returned.
        next_cursor = page[limit - 1].created_at.isoformat()
        page = page[:limit]

    # First pass: one response per row, indexed by comment ID.
    responses: _CommentMap = {}
    roots: list[ProposalCommentResponse] = []
    for comment in page:
        converted = _to_comment_response(comment)
        responses[comment.comment_id] = converted
        if comment.parent_comment_id is None:
            roots.append(converted)

    # Second pass: hang each reply below its parent when the parent is on this page.
    for comment in page:
        parent_id = comment.parent_comment_id
        if parent_id is None:
            continue
        parent_response = responses.get(parent_id)
        if parent_response is not None:
            parent_response.replies.append(responses[comment.comment_id])

    return ProposalCommentListResponse(comments=roots, total=total, next_cursor=next_cursor)
| 1257 | |
| 1258 | |
| 1259 | # --------------------------------------------------------------------------- |
| 1260 | # Proposal reviews (reviewer assignment + approval workflow) |
| 1261 | # --------------------------------------------------------------------------- |
| 1262 | |
| 1263 | |
def _to_review_response(row: MusehubProposalReview) -> ProposalReviewResponse:
    """Convert a review ORM row into its wire representation.

    Every field is copied as-is; the row's ``review_id`` is exposed as ``id``.
    """
    review = row
    return ProposalReviewResponse(
        id=review.review_id,
        proposal_id=review.proposal_id,
        reviewer_username=review.reviewer_username,
        state=review.state,
        body=review.body,
        submitted_at=review.submitted_at,
        created_at=review.created_at,
    )
| 1274 | |
| 1275 | |
async def _assert_proposal_exists(session: AsyncSession, repo_id: str, proposal_id: str) -> None:
    """Guard helper: fail with ``ValueError`` unless the proposal is in the repo.

    Returns nothing on success; callers use it purely for the check.
    """
    result = await session.execute(
        select(MusehubProposal).where(
            MusehubProposal.proposal_id == proposal_id,
            MusehubProposal.repo_id == repo_id,
        )
    )
    if result.scalar_one_or_none() is None:
        raise ValueError(f"Proposal {proposal_id} not found in repo {repo_id}")
| 1285 | |
| 1286 | |
async def request_reviewers(
    session: AsyncSession,
    *,
    repo_id: str,
    proposal_id: str,
    reviewers: list[str],
) -> ProposalReviewListResponse:
    """Add reviewer assignments to a proposal, creating a ``pending`` row for each.

    Idempotent: if a reviewer already has a row (in any state), the existing row
    is left unchanged so a submitted approval is never reset by a re-request.
    Usernames repeated within ``reviewers`` are only processed once.

    Existing reviews and reviewer identities are each looked up with a single
    query for the whole list instead of two queries per reviewer.

    Args:
        session: Async session; new rows are flushed, not committed.
        repo_id: Repository that owns the proposal.
        proposal_id: Proposal the reviewers are requested on.
        reviewers: Handles of the requested reviewers.

    Returns:
        The result of ``list_reviews`` for the proposal with its default
        paging, i.e. the first page of the updated review list.

    Raises:
        ValueError: the proposal does not exist in the repo.
    """
    await _assert_proposal_exists(session, repo_id, proposal_id)

    # De-duplicate while keeping the caller's order.
    requested = list(dict.fromkeys(reviewers))

    missing: list[str] = []
    if requested:
        already_assigned = set(
            (
                await session.execute(
                    select(MusehubProposalReview.reviewer_username).where(
                        MusehubProposalReview.proposal_id == proposal_id,
                        MusehubProposalReview.reviewer_username.in_(requested),
                    )
                )
            ).scalars()
        )
        missing = [name for name in requested if name not in already_assigned]

    identity_by_handle: dict[str, str] = {}
    if missing:
        identity_rows = (
            await session.execute(
                select(MusehubIdentity.handle, MusehubIdentity.identity_id)
                .where(MusehubIdentity.handle.in_(missing))
            )
        ).all()
        identity_by_handle = {handle: identity_id for handle, identity_id in identity_rows}

    for username in missing:
        now = _utc_now()
        # Fall back to the username when the handle has no identity row.
        reviewer_identity_id = identity_by_handle.get(username) or username
        review = MusehubProposalReview(
            review_id=compute_review_id(proposal_id, reviewer_identity_id, now.isoformat()),
            proposal_id=proposal_id,
            reviewer_username=username,
            state="pending",
            created_at=now,
        )
        session.add(review)
        logger.info("✅ Requested review from '%s' on proposal %s", username, proposal_id)

    await session.flush()
    return await list_reviews(session, repo_id=repo_id, proposal_id=proposal_id)
| 1329 | |
| 1330 | |
async def remove_reviewer(
    session: AsyncSession,
    *,
    repo_id: str,
    proposal_id: str,
    username: str,
) -> ProposalReviewListResponse:
    """Withdraw the pending review request of ``username`` on a proposal.

    Only rows still in the ``pending`` state can be deleted. A submitted
    review is kept untouched so the audit trail stays intact.

    Raises ``ValueError`` when the proposal does not exist, when the reviewer
    was never requested, or when the reviewer's row is no longer ``pending``.

    Returns the outcome of ``list_reviews`` for the proposal after the removal.
    """
    await _assert_proposal_exists(session, repo_id, proposal_id)

    lookup = select(MusehubProposalReview).where(
        MusehubProposalReview.proposal_id == proposal_id,
        MusehubProposalReview.reviewer_username == username,
    )
    review_row = (await session.execute(lookup)).scalar_one_or_none()

    if review_row is None:
        raise ValueError(f"Reviewer '{username}' was not requested on proposal {proposal_id}")

    already_submitted = review_row.state != "pending"
    if already_submitted:
        raise ValueError(
            f"Cannot remove reviewer '{username}': review already submitted (state={review_row.state})"
        )

    await session.delete(review_row)
    await session.flush()
    logger.info("✅ Removed review request for '%s' from proposal %s", username, proposal_id)

    return await list_reviews(session, repo_id=repo_id, proposal_id=proposal_id)
| 1366 | |
| 1367 | |
async def list_reviews(
    session: AsyncSession,
    *,
    repo_id: str,
    proposal_id: str,
    state: str | None = None,
    cursor: str | None = None,
    limit: int = 20,
) -> ProposalReviewListResponse:
    """List the reviews of a proposal, one page at a time.

    ``state`` optionally narrows the listing to one review state
    (``pending``, ``approved``, ``changes_requested`` or ``dismissed``).
    With ``None`` every review is listed. Rows come back ordered by
    ``created_at`` ascending, and ``total`` counts all rows matching the
    proposal/state filter regardless of the page.

    ``cursor`` is the ISO 8601 ``created_at`` of the last seen review
    (opaque to callers — pass ``nextCursor`` from a previous response
    verbatim); only reviews created strictly after it are returned. Omit it
    to start from the beginning.

    Raises ``ValueError`` if the proposal does not exist in the repo.
    """
    await _assert_proposal_exists(session, repo_id, proposal_id)

    filters = [MusehubProposalReview.proposal_id == proposal_id]
    if state is not None:
        filters.append(MusehubProposalReview.state == state)

    total_result = await session.execute(
        select(func.count(MusehubProposalReview.review_id)).where(*filters)
    )
    total: int = total_result.scalar_one()

    page_query = select(MusehubProposalReview).where(*filters)
    if cursor is not None:
        page_query = page_query.where(
            MusehubProposalReview.created_at > datetime.fromisoformat(cursor)
        )
    # Fetch one extra row to learn whether another page exists.
    page_query = page_query.order_by(MusehubProposalReview.created_at).limit(limit + 1)
    page = list((await session.execute(page_query)).scalars())

    next_cursor: str | None = None
    if len(page) == limit + 1:
        # The cursor points at the last row that is actually returned.
        next_cursor = page[limit - 1].created_at.isoformat()
        page = page[:limit]

    return ProposalReviewListResponse(
        reviews=[_to_review_response(review) for review in page],
        total=total,
        next_cursor=next_cursor,
    )
| 1425 | |
| 1426 | |
async def submit_review(
    session: AsyncSession,
    *,
    repo_id: str,
    proposal_id: str,
    reviewer_username: str,
    reviewer_identity_id: str = "",
    verdict: str,
    body: str = "",
) -> ProposalReviewResponse:
    """Submit or update a formal review verdict for ``reviewer_username`` on a proposal.

    ``verdict`` maps to a new state:
    - ``approve`` → ``approved``
    - ``request_changes`` → ``changes_requested``

    If an existing row for this reviewer already exists, it is updated in-place,
    so changing from approve → request_changes (or vice versa) works by resubmitting.
    Ad-hoc reviews (no prior reviewer request) are also allowed.

    Args:
        session: Async session; the change is flushed, not committed.
        repo_id: Repository that owns the proposal.
        proposal_id: Proposal being reviewed.
        reviewer_username: Handle of the reviewer.
        reviewer_identity_id: Identity used to derive the ID of a newly
            created review row. When empty, ``reviewer_username`` is used
            instead, matching the fallback in ``request_reviewers``, so the
            ID is never derived from an empty identity.
        verdict: ``approve`` or ``request_changes``.
        body: Optional review text; an empty string is stored as ``None``.

    Returns:
        The created or updated review in wire format.

    Raises:
        ValueError: the proposal does not exist in the repo, or ``verdict``
            is not one of the supported values.
    """
    await _assert_proposal_exists(session, repo_id, proposal_id)

    _VERDICT_TO_STATE: StrDict = {
        "approve": "approved",
        "request_changes": "changes_requested",
    }
    if verdict not in _VERDICT_TO_STATE:
        raise ValueError(f"Invalid verdict '{verdict}'. Must be approve or request_changes.")
    new_state = _VERDICT_TO_STATE[verdict]

    stmt = select(MusehubProposalReview).where(
        MusehubProposalReview.proposal_id == proposal_id,
        MusehubProposalReview.reviewer_username == reviewer_username,
    )
    row = (await session.execute(stmt)).scalar_one_or_none()

    now = _utc_now()
    if row is None:
        # Ad-hoc review: no prior request row exists for this reviewer.
        id_seed = reviewer_identity_id or reviewer_username
        row = MusehubProposalReview(
            review_id=compute_review_id(proposal_id, id_seed, now.isoformat()),
            proposal_id=proposal_id,
            reviewer_username=reviewer_username,
            state=new_state,
            body=body or None,
            submitted_at=now,
            created_at=now,
        )
        session.add(row)
    else:
        # Resubmission: overwrite the verdict; created_at is left untouched.
        row.state = new_state
        row.body = body or None
        row.submitted_at = now

    await session.flush()
    await session.refresh(row)
    logger.info(
        "✅ Review submitted by '%s' on proposal %s: verdict=%s state=%s",
        reviewer_username,
        proposal_id,
        verdict,
        new_state,
    )
    return _to_review_response(row)
| 1492 | |
| 1493 | # ── Proposal list enrichment ────────────────────────────────────────────────── |
| 1494 | |
| 1495 | _RISK_BAND_THRESHOLDS: list[tuple[float, str]] = [ |
| 1496 | (0.75, "critical"), |
| 1497 | (0.50, "high"), |
| 1498 | (0.25, "medium"), |
| 1499 | (0.01, "low"), |
| 1500 | ] |
| 1501 | |
| 1502 | |
| 1503 | def _score_to_band(score: float) -> str: |
| 1504 | """Map a [0.0, 1.0] risk score to a human-readable band label. |
| 1505 | |
| 1506 | Thresholds: |
| 1507 | ≥ 0.75 → "critical" |
| 1508 | ≥ 0.50 → "high" |
| 1509 | ≥ 0.25 → "medium" |
| 1510 | > 0.0 → "low" |
| 1511 | 0.0 → "none" |
| 1512 | """ |
| 1513 | for threshold, band in _RISK_BAND_THRESHOLDS: |
| 1514 | if score >= threshold: |
| 1515 | return band |
| 1516 | return "none" |
| 1517 | |
| 1518 | |
# Default required-approvals when merge_conditions is null.
_DEFAULT_REQUIRED_APPROVALS = 2

# Domain weight map used for aggregate risk score. Unknown domains default to 1.0.
# Keys are domain names; a value above 1.0 gives that domain more weight than
# the 1.0 default.
_DOMAIN_WEIGHTS: dict[str, float] = {
    "code": 1.2,
    "midi": 1.0,
    "stems": 1.0,
    "pay": 1.5,
}
| 1529 | |
| 1530 | |
class _ProposalPrefetch:
    """In-memory bundle of batch-loaded lookups for one page of proposals.

    Instances are built once per page and then consulted for every row, so
    per-row enrichment can read from these maps instead of querying again.

    Attributes are plain dictionaries plus the dependency graph:
        reviews_by_proposal: proposal_id → review ORM rows.
        author_types: author handle → identity type string.
        dag: dependency graph covering the page.
        conflict_counts: proposal_id → conflict count, ``None`` if unknown.
    """

    def __init__(
        self,
        *,
        reviews_by_proposal: dict[str, list[MusehubProposalReview]],
        author_types: dict[str, str],
        dag: ProposalDag | None = None,
        conflict_counts: dict[str, int | None] | None = None,
    ) -> None:
        # A missing (or falsy) graph is replaced by a fresh empty one so
        # consumers never have to None-check ``self.dag``.
        resolved_dag: ProposalDag = dag if dag else ProposalDag()
        # proposal_id → conflict_scan.result["conflict_count"]; None if not run
        resolved_counts: dict[str, int | None] = conflict_counts if conflict_counts else {}

        self.reviews_by_proposal = reviews_by_proposal
        self.author_types = author_types
        self.dag = resolved_dag
        self.conflict_counts = resolved_counts
| 1552 | |
| 1553 | |
async def _prefetch_for_batch(
    proposals: list[MusehubProposal],
    session: AsyncSession,
) -> _ProposalPrefetch:
    """Run the batch pre-fetch reads for a page of proposals.

    The number of reads is fixed and does not grow with page size:
      1. All reviews for every proposal in the page.
      2. Identity types for every distinct author in the page.
      3. The dependency DAG, via one ``load_dag_for_proposals`` call (how
         many statements that helper issues is not visible from here).
      4. ``conflict_scan`` simulation results for every proposal in the page.

    Reads 1 and 4 are skipped when the page is empty; read 2 is skipped when
    no proposal has an author.

    Args:
        proposals: ORM rows for the current page.
        session: Shared async session.

    Returns:
        ``_ProposalPrefetch`` with maps keyed by proposal_id / author handle.
    """
    proposal_ids = [p.proposal_id for p in proposals]
    # De-duplicate handles so the IN clause carries each author once.
    author_handles = list({p.author for p in proposals if p.author})

    # Read 1 — reviews. Every proposal gets an entry, even with no reviews.
    reviews_by_proposal: dict[str, list[MusehubProposalReview]] = {
        pid: [] for pid in proposal_ids
    }
    if proposal_ids:
        review_result = await session.execute(
            select(MusehubProposalReview).where(
                MusehubProposalReview.proposal_id.in_(proposal_ids)
            )
        )
        for review in review_result.scalars():
            reviews_by_proposal[review.proposal_id].append(review)

    # Read 2 — identity types
    author_types: dict[str, str] = {}
    if author_handles:
        identity_result = await session.execute(
            select(MusehubIdentity.handle, MusehubIdentity.identity_type).where(
                MusehubIdentity.handle.in_(author_handles)
            )
        )
        for handle, itype in identity_result.all():
            author_types[handle] = itype

    # Read 3 — dependency DAG (partial, scoped to this page + neighbours)
    dag = await load_dag_for_proposals(session, proposal_ids)

    # Read 4 — conflict_scan simulation per proposal (for list summary).
    # Rows are ordered oldest-first so that, should more than one row exist
    # for a proposal, the most recently created one deterministically wins.
    conflict_counts: dict[str, int | None] = {pid: None for pid in proposal_ids}
    if proposal_ids:
        sim_result = await session.execute(
            select(
                MusehubProposalSimulation.proposal_id,
                MusehubProposalSimulation.result,
            )
            .where(
                MusehubProposalSimulation.proposal_id.in_(proposal_ids),
                MusehubProposalSimulation.simulation_type == "conflict_scan",
            )
            .order_by(MusehubProposalSimulation.created_at)
        )
        for pid, result_json in sim_result.all():
            # Non-dict payloads carry no usable count; leave the entry as is.
            if isinstance(result_json, dict):
                conflict_counts[pid] = result_json.get("conflict_count")

    return _ProposalPrefetch(
        reviews_by_proposal=reviews_by_proposal,
        author_types=author_types,
        dag=dag,
        conflict_counts=conflict_counts,
    )
| 1633 | |
| 1634 | |
def _enrich_one(
    proposal: MusehubProposal,
    prefetch: _ProposalPrefetch,
) -> ProposalListEntry:
    """Build the list-view entry for one proposal from pre-fetched data.

    The function is synchronous and issues no queries itself: everything
    beyond the proposal row's own attributes is read from ``prefetch``.

    Derived fields:
      - domain_risk / active_domains / domain_risk_band: taken from
        ``proposal.dimensional_risk`` when that mapping is non-empty
        (entries with a score of 0 or less are dropped); otherwise the
        scalar ``risk_score`` is used as the ``"code"`` domain score.
      - aggregate_risk_score / aggregate_risk_band: weighted mean of the
        domain scores using ``_DOMAIN_WEIGHTS`` (weight 1.0 if unlisted).
      - blocked_by / blocks / is_blocked: dependency position in the DAG.
      - approval_count / domains_approved / domains_pending_review: derived
        from the pre-fetched reviews whose state is ``"approved"``.
      - all_merge_conditions_met: enough approvals, ``breakage_count == 0``
        and dependencies satisfied (a blocked proposal only fails this when
        ``merge_conditions["require_dependency_merged"]`` is truthy, which
        is the default).
      - author_type: looked up by author handle, ``"human"`` if unknown.

    Args:
        proposal: ORM row for this proposal.
        prefetch: Pre-fetched batch data from ``_prefetch_for_batch``.

    Returns:
        ``ProposalListEntry`` with all fields populated.

    Raises:
        ValueError: If the scalar ``proposal.risk_score`` fallback is used
            and the value lies outside ``[0.0, 1.0]``.
    """
    pid = proposal.proposal_id

    # ── Risk ─────────────────────────────────────────────────────────────────
    dimensional = dict(getattr(proposal, "dimensional_risk", None) or {})
    domain_risk: dict[str, float] = {}
    if dimensional:
        # Keep only domains that actually carry risk.
        for domain, raw_score in dimensional.items():
            score = float(raw_score)
            if score > 0.0:
                domain_risk[domain] = score
    else:
        # Backwards-compatible path: the scalar score stands for the code domain.
        code_risk = float(proposal.risk_score or 0.0)
        if not (0.0 <= code_risk <= 1.0):
            raise ValueError(f"proposal {pid}: risk_score {code_risk!r} out of [0, 1]")
        if code_risk > 0.0:
            domain_risk["code"] = code_risk

    active_domains = list(domain_risk)
    domain_risk_band = {
        domain: _score_to_band(score) for domain, score in domain_risk.items()
    }

    # Weighted mean across the active domains; 0.0 when there are none.
    aggregate_risk_score = 0.0
    if domain_risk:
        weights = {domain: _DOMAIN_WEIGHTS.get(domain, 1.0) for domain in domain_risk}
        weighted_total = sum(
            score * weights[domain] for domain, score in domain_risk.items()
        )
        aggregate_risk_score = weighted_total / sum(weights.values())
    aggregate_risk_band = _score_to_band(aggregate_risk_score)

    # ── Dependency position ──────────────────────────────────────────────────
    graph = prefetch.dag
    dep_blocked_by = blocked_by_numbers(graph, pid)
    dep_blocks = blocks_numbers(graph, pid)
    dep_is_blocked = is_blocked(graph, pid)
    merge_conditions = proposal.merge_conditions or {}
    require_dep_merged: bool = merge_conditions.get("require_dependency_merged", True)
    # Only an enforced requirement combined with an actual block is a failure.
    deps_satisfied = not (require_dep_merged and dep_is_blocked)

    # ── Reviews ──────────────────────────────────────────────────────────────
    reviews = prefetch.reviews_by_proposal.get(pid, [])
    approval_count = sum(1 for review in reviews if review.state == "approved")
    required_approvals = _DEFAULT_REQUIRED_APPROVALS
    if approval_count > 0 and "code" in active_domains:
        domains_approved = ["code"]
    else:
        domains_approved = []
    domains_pending_review = [
        domain for domain in active_domains if domain not in domains_approved
    ]
    all_merge_conditions_met = (
        approval_count >= required_approvals
        and proposal.breakage_count == 0
        and deps_satisfied
    )

    # ── Author / agent metadata ──────────────────────────────────────────────
    author_type = prefetch.author_types.get(proposal.author, "human")
    agent_model: str | None = getattr(proposal, "agent_model", None)
    agent_spawned_by: str | None = getattr(proposal, "agent_spawned_by", None)

    # ── Display helpers ──────────────────────────────────────────────────────
    # Titles longer than 80 characters are cut and marked with an ellipsis.
    if len(proposal.title) > 80:
        display_title = proposal.title[:80] + "…"
    else:
        display_title = proposal.title
    # Only the first three touched symbols are shown in the list row.
    touched_symbols_preview = list(proposal.touched_symbols or [])[:3]
    is_settling_payment = proposal.state == "settling" and "pay" in active_domains

    return ProposalListEntry(
        proposal_id=proposal.proposal_id,
        proposal_number=proposal.proposal_number,
        title=display_title,
        state=proposal.state,
        proposal_type=getattr(proposal, "proposal_type", "state_merge"),
        from_branch=proposal.from_branch,
        to_branch=proposal.to_branch,
        author=proposal.author,
        author_type=author_type,
        created_at=proposal.created_at,
        merged_at=proposal.merged_at,
        is_draft=getattr(proposal, "is_draft", proposal.state == "drafting"),
        active_domains=active_domains,
        domain_risk=domain_risk,
        domain_risk_band=domain_risk_band,
        aggregate_risk_score=round(aggregate_risk_score, 4),
        aggregate_risk_band=aggregate_risk_band,
        approval_count=approval_count,
        required_approvals=required_approvals,
        domains_approved=domains_approved,
        domains_pending_review=domains_pending_review,
        all_merge_conditions_met=all_merge_conditions_met,
        blocked_by=dep_blocked_by,
        blocks=dep_blocks,
        is_blocked=dep_is_blocked,
        symbols_changed=proposal.symbols_changed,
        breakage_count=proposal.breakage_count,
        test_gap_count=proposal.test_gap_count,
        touched_symbols_preview=touched_symbols_preview,
        midi_tracks_changed=getattr(proposal, "midi_tracks_changed", 0),
        midi_notes_delta=getattr(proposal, "midi_notes_delta", 0),
        harmonic_tension_delta=getattr(proposal, "harmonic_tension_delta", None),
        payment_claim_count=getattr(proposal, "payment_claim_count", 0),
        payment_ledger_delta_nano=getattr(proposal, "payment_ledger_delta_nano", 0),
        payment_avax_address=getattr(proposal, "payment_avax_address", None),
        payment_settling=is_settling_payment,
        agent_model=agent_model,
        agent_spawned_by=agent_spawned_by,
        merge_strategy=getattr(proposal, "merge_strategy", "overlay") or "overlay",
        simulation_conflict_count=prefetch.conflict_counts.get(pid),
    )
| 1771 | |
| 1772 | |
async def enrich_proposal_list_entry(
    proposal: MusehubProposal,
    session: AsyncSession,
) -> ProposalListEntry:
    """Enrich a single proposal for the list view.

    Single-row convenience wrapper: it runs the batch pre-fetch for a
    one-element page and hands the result to ``_enrich_one``. The full
    pre-fetch cost is paid for this one row, so callers with several
    proposals should use ``enrich_proposal_list_batch`` instead.

    Args:
        proposal: ORM row for this proposal.
        session: Async database session.

    Returns:
        ``ProposalListEntry`` with all fields populated.

    Raises:
        ValueError: Propagated from ``_enrich_one`` when the proposal's
            ``risk_score`` is outside ``[0.0, 1.0]``.
    """
    single_row_page = [proposal]
    return _enrich_one(proposal, await _prefetch_for_batch(single_row_page, session))
| 1798 | |
| 1799 | |
async def enrich_proposal_list_batch(
    proposals: list[MusehubProposal],
    session: AsyncSession,
) -> list[ProposalListEntry]:
    """Enrich a whole page of proposal rows with one shared pre-fetch.

    ``_prefetch_for_batch`` is awaited once for the page; each row is then
    passed through the synchronous ``_enrich_one`` using that shared data.
    An empty page returns immediately without touching the session.

    Args:
        proposals: ORM rows for the current page.
        session: Async session shared across the batch.

    Returns:
        List of ``ProposalListEntry`` in the same order as ``proposals``.

    Raises:
        ValueError: Propagated from ``_enrich_one`` for a row whose
            ``risk_score`` is outside ``[0.0, 1.0]``.
    """
    if proposals:
        shared = await _prefetch_for_batch(proposals, session)
        return [_enrich_one(row, shared) for row in proposals]
    return []
| 1829 | |
| 1830 | |
async def get_domain_heat(
    repo_id: str,
    state: str,
    session: AsyncSession,
) -> DomainHeatResponse:
    """Return per-domain proposal counts and average risk for the heat bar.

    Two reads are issued against the proposals table, both filtered by
    ``repo_id`` and (unless ``state == "all"``) by ``state``: a row count,
    and the list of non-null, positive ``risk_score`` values.

    Only the ``"code"`` domain carries real numbers: its count is the total
    number of matching proposals and its ``avg_risk`` is the arithmetic mean
    of the positive scores, rounded to 4 places (0.0 when there are none).
    ``"midi"``, ``"stems"`` and ``"pay"`` are always present with zero count
    and zero risk.

    Args:
        repo_id: Repository to query.
        state: Proposal state filter (e.g. ``"open"``). ``"all"`` disables
            the state filter.
        session: Async session.

    Returns:
        ``DomainHeatResponse`` whose ``total_open`` is the matching row count
        (for whatever ``state`` was requested).
    """
    filters = [MusehubProposal.repo_id == repo_id]
    if state != "all":
        filters.append(MusehubProposal.state == state)

    count_stmt = select(func.count(MusehubProposal.proposal_id)).where(*filters)
    total: int = (await session.execute(count_stmt)).scalar_one()

    # Zero and NULL scores are excluded so they do not drag the mean down.
    score_stmt = select(MusehubProposal.risk_score).where(
        *filters,
        MusehubProposal.risk_score.isnot(None),
        MusehubProposal.risk_score > 0.0,
    )
    scores = list((await session.execute(score_stmt)).scalars())
    if scores:
        avg_risk = round(sum(scores) / len(scores), 4)
    else:
        avg_risk = 0.0

    # Code takes every matching proposal; the remaining known domains are
    # reported as empty placeholders.
    domains: dict[str, DomainHeatEntry] = {
        "code": DomainHeatEntry(count=total, avg_risk=avg_risk),
    }
    for placeholder in ("midi", "stems", "pay"):
        domains[placeholder] = DomainHeatEntry(count=0, avg_risk=0.0)

    return DomainHeatResponse(domains=domains, total_open=total)
| 1894 | |
| 1895 | |
async def get_merge_readiness(
    repo_id: str,
    session: AsyncSession,
) -> MergeReadinessResponse:
    """Bucket all non-merged, non-abandoned proposals by merge readiness.

    Each proposal lands in exactly one category, checked in this order:
        settling:     state == 'settling'
        blocked:      the dependency DAG reports the proposal as blocked and
                      its ``merge_conditions["require_dependency_merged"]``
                      is truthy (the default when unset)
        ready:        approved-review count >= ``_DEFAULT_REQUIRED_APPROVALS``
                      AND breakage_count == 0
        needs_review: everything else

    The dependency rule mirrors the one used for ``all_merge_conditions_met``
    in the list view, so a proposal is never reported ``ready`` here while the
    list view says its merge conditions are unmet because of a dependency.

    Reads: one query for the proposals, one for approval counts, plus one
    ``load_dag_for_proposals`` call. The latter two are skipped when the repo
    has no matching proposals.

    Args:
        repo_id: Repository to query.
        session: Async session.

    Returns:
        ``MergeReadinessResponse`` with ``ready``, ``blocked``, ``settling``,
        and ``needs_review`` lists of proposal numbers.
    """
    rows = list(
        (
            await session.execute(
                select(
                    MusehubProposal.proposal_id,
                    MusehubProposal.proposal_number,
                    MusehubProposal.state,
                    MusehubProposal.breakage_count,
                    MusehubProposal.merge_conditions,
                ).where(
                    MusehubProposal.repo_id == repo_id,
                    MusehubProposal.state.notin_(["merged", "abandoned"]),
                )
            )
        ).all()
    )

    ready: list[int] = []
    blocked: list[int] = []
    settling: list[int] = []
    needs_review: list[int] = []

    if not rows:
        return MergeReadinessResponse(
            ready=ready,
            blocked=blocked,
            settling=settling,
            needs_review=needs_review,
        )

    # Approved-review counts for every candidate in one grouped query. The
    # outer join keeps proposals without any approval (count 0).
    proposal_numbers_to_check = [r.proposal_number for r in rows]
    approval_counts_rows = list(
        (
            await session.execute(
                select(
                    MusehubProposal.proposal_number,
                    func.count(MusehubProposalReview.review_id).label("approved_count"),
                )
                .join(
                    MusehubProposalReview,
                    (MusehubProposalReview.proposal_id == MusehubProposal.proposal_id)
                    & (MusehubProposalReview.state == "approved"),
                    isouter=True,
                )
                .where(
                    MusehubProposal.repo_id == repo_id,
                    MusehubProposal.proposal_number.in_(proposal_numbers_to_check),
                )
                .group_by(MusehubProposal.proposal_number)
            )
        ).all()
    )
    approval_by_number: dict[int, int] = {
        r.proposal_number: r.approved_count for r in approval_counts_rows
    }

    # Dependency graph for the candidates, used to fill the ``blocked`` bucket.
    dag = await load_dag_for_proposals(session, [r.proposal_id for r in rows])

    for row in rows:
        if row.state == "settling":
            settling.append(row.proposal_number)
            continue

        require_dep_merged = (row.merge_conditions or {}).get(
            "require_dependency_merged", True
        )
        if require_dep_merged and is_blocked(dag, row.proposal_id):
            blocked.append(row.proposal_number)
        elif (
            approval_by_number.get(row.proposal_number, 0) >= _DEFAULT_REQUIRED_APPROVALS
            and row.breakage_count == 0
        ):
            ready.append(row.proposal_number)
        else:
            needs_review.append(row.proposal_number)

    return MergeReadinessResponse(
        ready=ready,
        blocked=blocked,
        settling=settling,
        needs_review=needs_review,
    )
| 1988 | |
| 1989 | |
| 1990 | # ───────────────────────────────────────────────────────────────────────────── |
| 1991 | # Phase 4 — Simulation Engine |
| 1992 | # ───────────────────────────────────────────────────────────────────────────── |
| 1993 | |
| 1994 | _VALID_SIMULATION_TYPES = {"conflict_scan", "risk_projection", "dependency_order"} |
| 1995 | |
| 1996 | |
def _to_simulation_response(row: MusehubProposalSimulation, *, is_stale: bool) -> SimulationResponse:
    """Convert a simulation ORM row into its response model.

    Every response field except ``is_stale`` is copied verbatim from the
    same-named attribute on ``row``; staleness is computed by the caller.

    Args:
        row: Stored simulation row.
        is_stale: Staleness flag to expose on the response.

    Returns:
        ``SimulationResponse`` mirroring ``row`` plus ``is_stale``.
    """
    copied_columns = (
        "simulation_id",
        "proposal_id",
        "simulation_type",
        "result",
        "from_branch_commit_id",
        "duration_ms",
        "created_at",
        "expires_at",
    )
    payload = {column: getattr(row, column) for column in copied_columns}
    return SimulationResponse(is_stale=is_stale, **payload)
| 2009 | |
| 2010 | |
async def _current_from_commit(
    session: AsyncSession,
    repo_id: str,
    from_branch: str,
) -> str:
    """Look up the head commit ID of ``from_branch``.

    Returns:
        The branch's ``head_commit_id``, or ``''`` when the branch cannot be
        found or has no head commit.
    """
    branch = await _get_branch(session, repo_id, from_branch)
    if not branch:
        return ""
    return branch.head_commit_id or ""
| 2019 | |
| 2020 | |
async def run_simulation(
    session: AsyncSession,
    repo_id: str,
    proposal_id: str,
    simulation_type: str,
) -> SimulationResponse:
    """Run a simulation for the proposal and upsert the cached result.

    Always recomputes — use get_simulation to read the cache without re-running.

    The cached row is keyed by (proposal_id, simulation_type): an existing row
    is overwritten in place, otherwise a new one is added. The session is
    flushed but not committed here.

    Args:
        session: Async database session.
        repo_id: Repository the proposal must belong to.
        proposal_id: Proposal to simulate.
        simulation_type: One of ``_VALID_SIMULATION_TYPES``.

    Returns:
        ``SimulationResponse`` for the fresh result, with ``is_stale=False``.

    Raises:
        ValueError: Unknown simulation_type or proposal not found.
    """
    import time

    from musehub.services.proposal_simulation import (
        simulate_conflict_scan,
        simulate_dependency_order,
        simulate_risk_projection,
    )
    from musehub.services.musehub_snapshot import get_snapshot_manifest

    if simulation_type not in _VALID_SIMULATION_TYPES:
        raise ValueError(
            f"Unknown simulation_type '{simulation_type}'. "
            f"Valid types: {sorted(_VALID_SIMULATION_TYPES)}"
        )

    stmt = select(MusehubProposal).where(
        MusehubProposal.repo_id == repo_id,
        MusehubProposal.proposal_id == proposal_id,
    )
    proposal = (await session.execute(stmt)).scalar_one_or_none()
    if proposal is None:
        raise ValueError(f"Proposal {proposal_id} not found in repo {repo_id}")

    to_b = await _get_branch(session, repo_id, proposal.to_branch)
    from_b = await _get_branch(session, repo_id, proposal.from_branch)
    # Take the recorded commit ID from the same branch row that supplies the
    # manifest below, instead of looking the branch up a second time.
    from_commit_id = (from_b.head_commit_id or "") if from_b else ""

    # A missing branch, head commit or snapshot leaves the manifest empty.
    to_manifest: StrDict = {}
    if to_b and to_b.head_commit_id:
        to_head = await session.get(MusehubCommit, to_b.head_commit_id)
        if to_head and to_head.snapshot_id:
            to_manifest = await get_snapshot_manifest(session, to_head.snapshot_id)

    from_manifest: StrDict = {}
    if from_b and from_b.head_commit_id:
        from_head = await session.get(MusehubCommit, from_b.head_commit_id)
        if from_head and from_head.snapshot_id:
            from_manifest = await get_snapshot_manifest(session, from_head.snapshot_id)

    strategy_name = getattr(proposal, "merge_strategy", "overlay") or "overlay"
    selective_domains: list[str] | None = getattr(proposal, "selective_domains", None)
    dimensional_risk: dict[str, float] | None = getattr(proposal, "dimensional_risk", None)

    # Only these strategies get a common-ancestor manifest resolved.
    ancestor_manifest: StrDict | None = None
    if strategy_name in ("weave", "replay", "selective", "phased"):
        ancestor_manifest = await _resolve_ancestor_manifest(
            session, repo_id, proposal.from_branch, proposal.to_branch
        )

    # The timer covers the simulation itself, not the loading done above.
    t0 = time.monotonic()
    if simulation_type == "conflict_scan":
        result_payload = simulate_conflict_scan(
            to_manifest,
            from_manifest,
            ancestor_manifest=ancestor_manifest,
            strategy=strategy_name,
            selective_domains=selective_domains,
        )
    elif simulation_type == "risk_projection":
        result_payload = simulate_risk_projection(
            to_manifest,
            from_manifest,
            ancestor_manifest=ancestor_manifest,
            current_dimensional_risk=dimensional_risk,
            strategy=strategy_name,
            selective_domains=selective_domains,
        )
    else:  # dependency_order
        dag = await load_dag_for_proposals(session, [proposal_id])
        result_payload = simulate_dependency_order(dag)

    duration_ms = int((time.monotonic() - t0) * 1000)

    sim_id = compute_simulation_id(proposal_id, simulation_type, from_commit_id)
    now = _utc_now()

    # Upsert: update on conflict (same proposal_id + simulation_type)
    existing_stmt = select(MusehubProposalSimulation).where(
        MusehubProposalSimulation.proposal_id == proposal_id,
        MusehubProposalSimulation.simulation_type == simulation_type,
    )
    existing = (await session.execute(existing_stmt)).scalar_one_or_none()

    if existing is not None:
        existing.simulation_id = sim_id
        existing.from_branch_commit_id = from_commit_id
        existing.result = result_payload
        existing.duration_ms = duration_ms
        existing.created_at = now
        row = existing
    else:
        row = MusehubProposalSimulation(
            simulation_id=sim_id,
            proposal_id=proposal_id,
            simulation_type=simulation_type,
            from_branch_commit_id=from_commit_id,
            result=result_payload,
            duration_ms=duration_ms,
            created_at=now,
        )
        session.add(row)

    await session.flush()

    logger.info(
        "🔬 Simulation %s for proposal %s (%dms)",
        simulation_type, proposal_id, duration_ms
    )
    return _to_simulation_response(row, is_stale=False)
| 2143 | |
| 2144 | |
async def get_simulation(
    session: AsyncSession,
    repo_id: str,
    proposal_id: str,
    simulation_type: str,
) -> SimulationResponse | None:
    """Return the cached simulation result, or None if there is none to show.

    ``None`` is returned when the simulation was never run, and also when
    ``proposal_id`` does not belong to ``repo_id`` — cached results are only
    served through the repository that owns the proposal.

    Sets ``is_stale=True`` when the from_branch has advanced since the
    simulation was last run. Does NOT re-run; call run_simulation for that.

    Args:
        session: Async database session.
        repo_id: Repository the proposal must belong to.
        proposal_id: Proposal whose cached simulation is requested.
        simulation_type: One of ``_VALID_SIMULATION_TYPES``.

    Returns:
        The cached ``SimulationResponse``, or ``None``.

    Raises:
        ValueError: Unknown simulation_type.
    """
    if simulation_type not in _VALID_SIMULATION_TYPES:
        raise ValueError(
            f"Unknown simulation_type '{simulation_type}'. "
            f"Valid types: {sorted(_VALID_SIMULATION_TYPES)}"
        )

    # Resolve the proposal within the repo first: this scopes the lookup to
    # the repository and yields the branch needed for the staleness check.
    proposal_stmt = select(
        MusehubProposal.proposal_id,
        MusehubProposal.from_branch,
    ).where(
        MusehubProposal.proposal_id == proposal_id,
        MusehubProposal.repo_id == repo_id,
    )
    proposal_row = (await session.execute(proposal_stmt)).one_or_none()
    if proposal_row is None:
        return None

    stmt = select(MusehubProposalSimulation).where(
        MusehubProposalSimulation.proposal_id == proposal_id,
        MusehubProposalSimulation.simulation_type == simulation_type,
    )
    row = (await session.execute(stmt)).scalar_one_or_none()
    if row is None:
        return None

    # Staleness check: compare stored commit ID with current branch tip.
    # An unresolvable tip (empty string) is never treated as stale.
    is_stale = False
    if proposal_row.from_branch:
        current_commit = await _current_from_commit(
            session, repo_id, proposal_row.from_branch
        )
        is_stale = bool(current_commit) and current_commit != row.from_branch_commit_id

    return _to_simulation_response(row, is_stale=is_stale)
| 2182 | |
| 2183 | |
async def list_simulations(
    session: AsyncSession,
    repo_id: str,
    proposal_id: str,
) -> SimulationListResponse:
    """Return all cached simulations for a proposal.

    Includes staleness flags. Never re-runs.

    When ``proposal_id`` does not belong to ``repo_id`` an empty list is
    returned — cached results are only served through the repository that
    owns the proposal.

    Args:
        session: Async database session.
        repo_id: Repository the proposal must belong to.
        proposal_id: Proposal whose cached simulations are requested.

    Returns:
        ``SimulationListResponse`` with one entry per cached simulation and
        ``total`` equal to the number of entries.
    """
    # Resolve the proposal within the repo once: this scopes the listing to
    # the repository and yields the branch needed for staleness checks.
    proposal_stmt = select(
        MusehubProposal.proposal_id,
        MusehubProposal.from_branch,
    ).where(
        MusehubProposal.proposal_id == proposal_id,
        MusehubProposal.repo_id == repo_id,
    )
    proposal_row = (await session.execute(proposal_stmt)).one_or_none()
    if proposal_row is None:
        return SimulationListResponse(simulations=[], total=0)

    current_commit = ""
    if proposal_row.from_branch:
        current_commit = await _current_from_commit(
            session, repo_id, proposal_row.from_branch
        )

    rows_stmt = select(MusehubProposalSimulation).where(
        MusehubProposalSimulation.proposal_id == proposal_id,
    )
    rows = list((await session.execute(rows_stmt)).scalars().all())

    # An unresolvable branch tip (empty string) never marks a result stale.
    responses = [
        _to_simulation_response(
            r,
            is_stale=bool(current_commit) and current_commit != r.from_branch_commit_id,
        )
        for r in rows
    ]
    return SimulationListResponse(simulations=responses, total=len(responses))
File History
11 commits
sha256:400438cf8bc700a611f1ba798aa9def68290f487dc19f7dbf317985ad17050c9
chore: delete muse/prose domain — hallucinated, never existed
Sonnet 4.6
minor
⚠
3 days ago
sha256:4d42a346263e7cbbd152c147f3e6f24576f4b4440df9249ffb9fbcf9db699fcb
feat: populate url in create_issue and create_proposal responses
Sonnet 4.6
minor
⚠
3 days ago
sha256:3707eba7ad42cadedf18c8b9c534d839b88cfd1c30924c3c5a3edc74e1d809de
feat: add url field to mist, issue, and proposal list/read …
Sonnet 4.6
minor
⚠
3 days ago
sha256:d110dd71fb7c1f5e064162de1262b2976841a00d7549bc4f441045f5c13ef33f
feat: add MergeResultEmbed to ProposalResponse (deliverable 5)
Sonnet 4.6
minor
⚠
4 days ago
sha256:50b52eda7afb2f122863aef47d684d1a9e4684b48f5f95367fc956e28ceb7d42
refactor: rename merge strategy aliases to canonical names
Sonnet 4.6
minor
⚠
9 days ago
sha256:af9422a68cbd2db7c88f664388e11134b0ae0057ee5ad14465d82208548a9d7d
changing --event to --verdict. displaying changes requested…
Human
minor
⚠
11 days ago
sha256:a909058d727faac4d77f6e659cc0b1f9315efcb6aabfd870d08763525a67093d
dialing in --strategy and --history on merge proposal
Human
minor
⚠
11 days ago
sha256:1f95928652d220172ddd56fb71bdd7bee3158e1b86f1b8ba8b2470edde498c6a
update proposal detail w/ --history and --strategy flags
Human
minor
⚠
12 days ago
sha256:1b1a47509ee8154b9c61b68e8e871f6f6dd4d48125c5bcc1df61733bd3657c42
insert `MusehubCommitGraph` row when `merge_proposal` creat…
Human
patch
12 days ago
sha256:f3995ec2c05c9c34b0e4d6e96349a811d0117a1c51d78096d757998ccb3c0520
fix: blobs only in S3/mpack — remove commit/snapshot indivi…
Sonnet 4.6
patch
14 days ago
sha256:e597c0b97ade9c3c52ac4735ceb437ee69d1b6f0db61b8d7caa6467c5866566d
feat(phase2): write commit objects to S3 at all 5 write sit…
Sonnet 4.6
patch
17 days ago