gabriel / musehub public
musehub_proposals.py python
2,215 lines 83.4 KB Hotspot
Raw
sha256:400438cf8bc700a611f1ba798aa9def68290f487dc19f7dbf317985ad17050c9 chore: delete muse/prose domain — hallucinated, never existed Sonnet 4.6 minor ⚠ breaking 3 days ago
1 """MuseHub merge proposal persistence adapter — single point of DB access for proposals.
2
3 This module is the ONLY place that touches the ``musehub_proposals`` table.
4 Route handlers delegate here; no business logic lives in routes.
5
6 Boundary rules:
7 - Must NOT import state stores, SSE queues, or LLM clients.
8 - May import ORM models from musehub.db domain-specific modules.
9 - May import Pydantic response models from musehub.models.musehub.
10 - May import musehub.core.genesis for genesis ID computation.
11
12 Merge strategy
13 --------------
14 ``merge_commit`` is the only strategy at MVP. It creates a new commit on
15 ``to_branch`` whose parent_ids are [to_branch head, from_branch head], then
16 updates the ``to_branch`` head pointer and marks the proposal as merged.
17
If neither branch has any commits yet (no head commit on either side), the merge
is rejected with a ``ValueError`` — there is nothing to merge.
20 """
21
22 import logging
23 from datetime import datetime, timezone
24
25 import sqlalchemy as sa
26 from sqlalchemy import ColumnElement, func, select
27 from sqlalchemy.ext.asyncio import AsyncSession
28
29 from musehub.core.genesis import compute_branch_id, compute_comment_id, compute_proposal_id, compute_review_id, compute_simulation_id
30 from musehub.db.musehub_identity_models import MusehubIdentity
31 from musehub.db.musehub_repo_models import MusehubBranch, MusehubCommit, MusehubCommitGraph, MusehubCommitRef
32 from musehub.db.musehub_social_models import (
33 MusehubProposal,
34 MusehubProposalComment,
35 MusehubProposalReview,
36 MusehubProposalSimulation,
37 )
38 from musehub.services.proposal_dag import (
39 CycleError,
40 ProposalDag,
41 blocked_by_numbers,
42 blocks_numbers,
43 create_dependency_edges,
44 is_blocked,
45 load_dag_for_proposals,
46 )
47 from musehub.muse_cli.snapshot import compute_commit_id, compute_snapshot_id
48
49
class BranchNotFoundError(Exception):
    """Signal that a branch an operation depends on is absent from the repo."""
52 from musehub.types.json_types import JSONObject, StrDict
53 from musehub.models.musehub import (
54 DomainHeatEntry,
55 DomainHeatResponse,
56 MergeReadinessResponse,
57 MergeResultEmbed,
58 ProposalCommentListResponse,
59 ProposalCommentResponse,
60 ProposalListEntry,
61 ProposalListFilters,
62 ProposalListResponse,
63 ProposalResponse,
64 ProposalReviewListResponse,
65 ProposalReviewResponse,
66 SimulationListResponse,
67 SimulationResponse,
68 )
69
# Alias for a dict of comment responses keyed by string (presumably the
# comment ID — confirm at the use sites, which are outside this excerpt).
type _CommentMap = dict[str, ProposalCommentResponse]

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
73
74
def _utc_now() -> datetime:
    """Return the current instant as a timezone-aware ``datetime`` in UTC."""
    return datetime.now(timezone.utc)
77
78
79
def _symbols_from_delta(delta: JSONObject | None) -> list[str]:
    """Extract unique symbol addresses from a structured_delta dict.

    Only child_op addresses that contain ``::`` are returned — file-level ops
    without a symbol component are intentionally excluded.

    Malformed input is tolerated rather than raised on: a non-dict ``delta``,
    a non-list ``ops`` / ``child_ops`` value, a non-dict op entry, or a
    non-string ``address`` is skipped.

    Returns:
        The distinct addresses in first-seen order (deterministic for a given
        delta).
    """
    if not isinstance(delta, dict):
        return []
    file_ops = delta.get("ops")
    if not isinstance(file_ops, (list, tuple)):
        return []

    # A dict keeps insertion order, giving de-duplication with a stable order.
    ordered: dict[str, None] = {}
    for file_op in file_ops:
        if not isinstance(file_op, dict):
            continue
        child_ops = file_op.get("child_ops")
        if not isinstance(child_ops, (list, tuple)):
            continue
        for child_op in child_ops:
            if not isinstance(child_op, dict):
                continue
            addr = child_op.get("address")
            # Guard the substring test: ``"::" in None`` would raise TypeError.
            if isinstance(addr, str) and "::" in addr:
                ordered.setdefault(addr, None)
    return list(ordered)
99
100
async def _touched_symbols_for_branch(
    session: AsyncSession, repo_id: str, branch: str
) -> list[str]:
    """Collect every symbol address found in the deltas of ``branch``'s commits.

    Only commits referenced by ``repo_id`` that carry a non-NULL
    ``structured_delta`` are considered. The result is de-duplicated; its
    order is unspecified.
    """
    query = (
        select(MusehubCommit.structured_delta)
        .join(MusehubCommitRef, MusehubCommitRef.commit_id == MusehubCommit.commit_id)
        .where(
            MusehubCommitRef.repo_id == repo_id,
            MusehubCommit.branch == branch,
            MusehubCommit.structured_delta.isnot(None),
        )
    )
    result = await session.execute(query)
    deltas = result.scalars().all()

    symbols: set[str] = set()
    for structured_delta in deltas:
        symbols.update(_symbols_from_delta(structured_delta))
    return list(symbols)
118
119
def _to_proposal_response(
    row: MusehubProposal,
    *,
    dag: ProposalDag | None = None,
    simulations: "SimulationListResponse | None" = None,
    merge_result: MergeResultEmbed | None = None,
    url_prefix: str = "",
) -> ProposalResponse:
    """Build the ``ProposalResponse`` wire model for a proposal row.

    ``dag`` feeds the dependency fields (``blocked_by``, ``blocks``,
    ``is_blocked``); when it is omitted they are empty / ``False``.
    ``simulations`` is folded into ``latest_simulations`` keyed by simulation
    type, so a later entry of the same type replaces an earlier one.
    ``url`` is populated only when ``url_prefix`` is non-empty.
    """
    from musehub.models.musehub import MergeConditions, MergeStrategy, ProposalType

    raw_conditions = getattr(row, "merge_conditions", None)
    parsed_conditions = None
    if raw_conditions:
        parsed_conditions = MergeConditions.model_validate(raw_conditions)

    # Dependency position — only computable when a DAG was supplied.
    if dag is None:
        upstream_numbers: list[int] = []
        downstream_numbers: list[int] = []
        currently_blocked = False
    else:
        upstream_numbers = blocked_by_numbers(dag, row.proposal_id)
        downstream_numbers = blocks_numbers(dag, row.proposal_id)
        currently_blocked = is_blocked(dag, row.proposal_id)

    # One summary dict per simulation type.
    simulation_summaries: dict[str, dict] = {}
    for entry in (simulations.simulations if simulations is not None else ()):
        simulation_summaries[entry.simulation_type] = dict(
            simulation_id=entry.simulation_id,
            result=entry.result,
            is_stale=entry.is_stale,
            from_branch_commit_id=entry.from_branch_commit_id,
            duration_ms=entry.duration_ms,
            created_at=entry.created_at.isoformat(),
        )

    return ProposalResponse(
        proposal_id=row.proposal_id,
        proposal_number=row.proposal_number,
        url=f"{url_prefix}/proposals/{row.proposal_id}" if url_prefix else "",
        title=row.title,
        body=row.body,
        state=row.state,
        from_branch=row.from_branch,
        to_branch=row.to_branch,
        merge_commit_id=row.merge_commit_id,
        merged_at=row.merged_at,
        author=row.author,
        created_at=row.created_at,
        proposal_type=ProposalType(getattr(row, "proposal_type", "state_merge")),
        is_draft=getattr(row, "is_draft", False),
        merge_conditions=parsed_conditions,
        merge_strategy=MergeStrategy(getattr(row, "merge_strategy", "overlay")),
        selective_domains=getattr(row, "selective_domains", None),
        risk_score=getattr(row, "risk_score", None),
        dimensional_risk=dict(getattr(row, "dimensional_risk", None) or {}),
        blocked_by=upstream_numbers,
        blocks=downstream_numbers,
        is_blocked=currently_blocked,
        latest_simulations=simulation_summaries,
        proposer_signature=getattr(row, "proposer_signature", None),
        proposer_public_key=getattr(row, "proposer_public_key", None),
        from_snapshot_id=getattr(row, "from_snapshot_id", None),
        to_snapshot_id=getattr(row, "to_snapshot_id", None),
        merge_result=merge_result,
    )
182
183
async def _get_branch(
    session: AsyncSession, repo_id: str, branch_name: str
) -> MusehubBranch | None:
    """Look up one branch row by repository ID and branch name; ``None`` if absent."""
    result = await session.execute(
        select(MusehubBranch).where(
            MusehubBranch.repo_id == repo_id,
            MusehubBranch.name == branch_name,
        )
    )
    return result.scalar_one_or_none()
193
194
async def create_proposal(
    session: AsyncSession,
    *,
    repo_id: str,
    title: str,
    from_branch: str,
    to_branch: str,
    body: str = "",
    author: str = "",
    author_identity_id: str = "",
    proposal_type: str = "state_merge",
    is_draft: bool = False,
    merge_strategy: str = "overlay",
    merge_conditions: JSONObject | None = None,
    selective_domains: list[str] | None = None,
    depends_on: list[str] | None = None,
    proposer_signature: str | None = None,
    proposer_public_key: str | None = None,
    url_prefix: str = "",
) -> ProposalResponse:
    """Persist a new merge proposal and return its wire representation.

    The proposal starts in ``drafting`` state when ``is_draft`` is true and in
    ``open`` state otherwise.

    ``author`` identifies the user opening the proposal — typically the MSign handle
    from the request context, or a display name from the seed script.

    The row is flushed and refreshed but this function never calls
    ``session.commit()``; presumably the caller owns the transaction.

    The returned response is built without a DAG, so its ``blocked_by`` /
    ``blocks`` / ``is_blocked`` fields are empty even when ``depends_on`` was
    given.

    Raises ``BranchNotFoundError`` if ``from_branch`` does not exist in the repo;
    the caller should surface this as HTTP 404. A missing ``to_branch`` is not
    an error here — ``to_snapshot_id`` is simply stored as ``None``.
    """
    branch = await _get_branch(session, repo_id, from_branch)
    if branch is None:
        raise BranchNotFoundError(f"Branch '{from_branch}' not found in repo {repo_id}")
    # NOTE(review): despite the "snapshot" naming, both *_snapshot_id values
    # are taken from the branches' head_commit_id — confirm this is intended.
    from_snapshot_id = branch.head_commit_id
    to_branch_row = await _get_branch(session, repo_id, to_branch)
    to_snapshot_id = to_branch_row.head_commit_id if to_branch_row else None

    # Assign the next sequential proposal_number for this repo (1-based, like GitHub)
    # NOTE(review): this is a read-then-write (max + 1); two concurrent creations
    # could presumably compute the same number unless a DB constraint or lock
    # outside this function prevents it — confirm.
    max_num_result = await session.execute(
        select(func.max(MusehubProposal.proposal_number)).where(
            MusehubProposal.repo_id == repo_id
        )
    )
    max_num: int | None = max_num_result.scalar_one_or_none()
    next_num = (max_num or 0) + 1

    touched = await _touched_symbols_for_branch(session, repo_id, from_branch)
    # A single timestamp is used for both the ID derivation and the row so the
    # two always agree.
    _created_at = _utc_now()
    initial_state = "drafting" if is_draft else "open"
    proposal = MusehubProposal(
        proposal_id=compute_proposal_id(
            repo_id, author_identity_id, from_branch, to_branch, _created_at.isoformat()
        ),
        repo_id=repo_id,
        proposal_number=next_num,
        title=title,
        body=body,
        state=initial_state,
        from_branch=from_branch,
        to_branch=to_branch,
        author=author,
        touched_symbols=touched,
        created_at=_created_at,
        proposal_type=proposal_type,
        is_draft=is_draft,
        merge_strategy=merge_strategy,
        merge_conditions=merge_conditions,
        selective_domains=selective_domains,
        proposer_signature=proposer_signature,
        proposer_public_key=proposer_public_key,
        from_snapshot_id=from_snapshot_id,
        to_snapshot_id=to_snapshot_id,
    )
    session.add(proposal)
    # Flush so the proposal row exists before dependency edges reference it.
    await session.flush()

    # Persist dependency edges — validates existence, detects cycles before commit
    if depends_on:
        await create_dependency_edges(session, proposal.proposal_id, depends_on)

    await session.refresh(proposal)
    logger.info("✅ Created proposal '%s' (%s → %s) in repo %s", title, from_branch, to_branch, repo_id)
    return _to_proposal_response(proposal, url_prefix=url_prefix)
276
277
def _risk_band_conditions(bands: list[str]) -> list[ColumnElement[bool]]:
    """Return SQLAlchemy conditions that match risk_score to the given band names.

    Each band maps to a half-open interval ``[lo, hi)`` on the risk score:
        critical  ≥ 0.75
        high      0.50 – 0.74…
        medium    0.25 – 0.49…
        low       0.01 – 0.24…
        none      < 0.01 (or NULL)

    The ``none`` band is driven by the table's upper bound rather than an
    equality test against ``0.0``, so that the bands cover every score: with
    ``== 0.0`` a score strictly between 0 and 0.01 matched no band at all and
    could never be selected through a band filter.

    One clause is returned per recognised band, in input order; the caller
    OR-s them together. Unknown band names contribute no clause.
    """
    band_ranges: dict[str, tuple[float | None, float | None]] = {
        "critical": (0.75, None),
        "high": (0.50, 0.75),
        "medium": (0.25, 0.50),
        "low": (0.01, 0.25),
        "none": (None, 0.01),
    }
    clauses: list[ColumnElement[bool]] = []
    for band in bands:
        if band not in band_ranges:
            continue
        lo, hi = band_ranges[band]
        if lo is None:
            # Open at the bottom: unscored rows (NULL) belong here too.
            clauses.append(
                sa.or_(
                    MusehubProposal.risk_score.is_(None),
                    MusehubProposal.risk_score < hi,
                )
            )
        elif hi is None:
            # Open at the top.
            clauses.append(MusehubProposal.risk_score >= lo)
        else:
            clauses.append(
                sa.and_(
                    MusehubProposal.risk_score >= lo,
                    MusehubProposal.risk_score < hi,
                )
            )
    return clauses
317
318
async def list_proposals(
    session: AsyncSession,
    repo_id: str,
    *,
    state: str = "all",
    cursor: str | None = None,
    limit: int = 20,
    filters: ProposalListFilters | None = None,
    url_prefix: str = "",
) -> ProposalListResponse:
    """Return merge proposals for a repo with cursor-based keyset pagination.

    ``state`` may be ``"open"``, ``"merged"``, ``"closed"``, ``"all"``, or any
    value in the extended 7-state machine accepted by ``ProposalListFilters``.

    When ``filters`` is provided, the following additional predicates are applied:
    - ``filters.risk_band``         → ``risk_score`` range filter (OR across bands)
    - ``filters.domain``            → ``risk_score > 0`` when "code" is included;
                                       other domains require per-domain risk rows
                                       (Phase 2 DB prerequisite) and currently add
                                       no predicate at all
    - ``filters.author_type``       → LEFT JOIN to ``musehub_identities``
    - ``filters.proposal_type``     → ``IN`` filter on ``proposal_type``
    - ``filters.is_draft``          → equality filter on ``is_draft``
    - ``filters.merge_strategy``    → ``IN`` filter on ``merge_strategy``
    - ``filters.assigned_reviewer`` → ``IN`` sub-select on ``musehub_proposal_reviews``
    - ``filters.sort``              → ordering; ``merge_ready_first`` uses an approval
                                       count sub-select to surface ready proposals first

    ``cursor`` is the ISO 8601 ``created_at`` of the last seen proposal (opaque
    to callers — pass ``nextCursor`` from a previous response verbatim).

    Args:
        session: Async database session.
        repo_id: Target repository ID.
        state: Proposal state filter; defaults to "all".
        cursor: Pagination cursor (opaque ISO 8601 string).
        limit: Page size.
        filters: Optional ``ProposalListFilters``; overrides ``state``, ``limit``,
                 ``cursor``, and ``sort`` when provided. ``state`` in ``filters``
                 takes precedence over the top-level ``state`` kwarg. A falsy
                 value in ``filters`` (``None``, ``""``, ``0``) falls back to
                 the corresponding kwarg.
        url_prefix: Passed through to each response's ``url``; empty leaves
                    ``url`` blank.

    Returns:
        ``ProposalListResponse`` with paginated proposals and a ``nextCursor``.
        ``total`` counts all rows matching the filters, ignoring the cursor.
        Responses are built without DAG or simulation data.

    Raises:
        ValueError: If ``cursor`` is not a valid ISO 8601 datetime string.
    """
    f = filters
    # ``or`` fallback: any falsy filter value defers to the keyword argument.
    effective_state = (f.state if f else None) or state
    effective_limit = (f.limit if f else None) or limit
    effective_cursor = (f.cursor if f else None) or cursor
    effective_sort = (f.sort if f else None) or "newest"

    conditions: list[ColumnElement[bool]] = [MusehubProposal.repo_id == repo_id]
    if effective_state != "all":
        conditions.append(MusehubProposal.state == effective_state)

    stmt = select(MusehubProposal)

    # ── Author-type filter (requires join to identities) ───────────────────────
    if f and f.author_type != "all":
        stmt = stmt.join(
            MusehubIdentity,
            MusehubIdentity.handle == MusehubProposal.author,
            isouter=True,
        )
        if f.author_type == "human":
            # Authors with no matching identity row (NULL after the outer
            # join) are counted as human.
            conditions.append(
                sa.or_(
                    MusehubIdentity.identity_type == "human",
                    MusehubIdentity.identity_type.is_(None),
                )
            )
        else:
            conditions.append(MusehubIdentity.identity_type == f.author_type)

    # ── Risk-band filter ───────────────────────────────────────────────────────
    if f and f.risk_band:
        band_clauses = _risk_band_conditions(f.risk_band)
        if band_clauses:
            conditions.append(sa.or_(*band_clauses))

    # ── Domain filter — code only at Phase 2 (other domains require risk rows) ─
    # NOTE(review): a filter listing only non-"code" domains adds no predicate,
    # so it returns the unfiltered set rather than an empty one — confirm intended.
    if f and f.domain:
        domain_clauses = []
        if "code" in f.domain:
            domain_clauses.append(
                sa.and_(
                    MusehubProposal.risk_score.is_not(None),
                    MusehubProposal.risk_score > 0.0,
                )
            )
        if domain_clauses:
            conditions.append(sa.or_(*domain_clauses))

    # ── Proposal-type filter ──────────────────────────────────────────────────
    if f and f.proposal_type:
        conditions.append(MusehubProposal.proposal_type.in_(f.proposal_type))

    # ── Is-draft filter ───────────────────────────────────────────────────────
    if f and f.is_draft is not None:
        conditions.append(MusehubProposal.is_draft == f.is_draft)

    # ── Merge-strategy filter ─────────────────────────────────────────────────
    if f and f.merge_strategy:
        conditions.append(MusehubProposal.merge_strategy.in_(f.merge_strategy))

    # ── Assigned-reviewer filter ───────────────────────────────────────────────
    if f and f.assigned_reviewer:
        # Proposals on which this reviewer has a review in one of the listed states.
        reviewer_subq = (
            select(MusehubProposalReview.proposal_id)
            .where(
                MusehubProposalReview.reviewer_username == f.assigned_reviewer,
                MusehubProposalReview.state.in_(["pending", "approved", "changes_requested"]),
            )
            .correlate(MusehubProposal)
        )
        conditions.append(MusehubProposal.proposal_id.in_(reviewer_subq))

    # ── Count total matching rows (re-use same joins as data query) ───────────
    # Uses ``conditions`` without the cursor predicate, so the total is the
    # same on every page.
    count_stmt = stmt.with_only_columns(
        func.count(MusehubProposal.proposal_id)
    ).where(*conditions).order_by(None)
    total: int = (await session.execute(count_stmt)).scalar_one()

    # ── Sort order ─────────────────────────────────────────────────────────────
    # NOTE(review): the annotation says ColumnElement[bool], but the list holds
    # ordering expressions (asc()/desc()/case()), not boolean predicates.
    order_clauses: list[ColumnElement[bool]]
    cursor_ascending: bool  # True → > cursor, False → < cursor
    if effective_sort == "oldest":
        order_clauses = [MusehubProposal.created_at.asc()]
        cursor_ascending = True
    elif effective_sort == "risk_desc":
        order_clauses = [MusehubProposal.risk_score.desc().nulls_last()]
        cursor_ascending = False
    elif effective_sort == "risk_asc":
        order_clauses = [MusehubProposal.risk_score.asc().nulls_last()]
        cursor_ascending = False
    elif effective_sort == "merge_ready_first":
        # Sub-select: count approved reviews per proposal; ready = count>=2 AND breakage=0
        # (the threshold is _DEFAULT_REQUIRED_APPROVALS, which is not defined in
        # the part of the module shown here — presumably a module-level constant).
        approval_subq = (
            select(func.count(MusehubProposalReview.review_id))
            .where(
                MusehubProposalReview.proposal_id == MusehubProposal.proposal_id,
                MusehubProposalReview.state == "approved",
            )
            .correlate(MusehubProposal)
            .scalar_subquery()
        )
        # 0 sorts before 1, so "ready" proposals come first.
        is_ready = sa.case(
            (
                sa.and_(
                    approval_subq >= _DEFAULT_REQUIRED_APPROVALS,
                    MusehubProposal.breakage_count == 0,
                ),
                0,
            ),
            else_=1,
        )
        order_clauses = [is_ready, MusehubProposal.created_at.desc()]
        cursor_ascending = False
    else:
        # newest (default)
        order_clauses = [MusehubProposal.created_at.desc()]
        cursor_ascending = False

    # ── Cursor predicate ───────────────────────────────────────────────────────
    # NOTE(review): the cursor is always compared against created_at, even when
    # the primary sort key is risk_score or the merge-readiness flag. For those
    # sorts the predicate does not follow the ordering, so later pages may skip
    # or repeat rows; rows sharing an identical created_at may also be skipped
    # because the comparison is strict. Confirm whether a compound cursor is
    # needed.
    data_conditions = list(conditions)
    if effective_cursor is not None:
        cursor_dt = datetime.fromisoformat(effective_cursor)
        if cursor_ascending:
            data_conditions.append(MusehubProposal.created_at > cursor_dt)
        else:
            data_conditions.append(MusehubProposal.created_at < cursor_dt)

    # Fetch one row beyond the page size to learn whether another page exists.
    rows = list(
        (
            await session.execute(
                stmt.where(*data_conditions).order_by(*order_clauses).limit(effective_limit + 1)
            )
        ).scalars()
    )

    next_cursor: str | None = None
    if len(rows) == effective_limit + 1:
        # The cursor is the created_at of the last row actually returned.
        next_cursor = rows[effective_limit - 1].created_at.isoformat()
        rows = rows[:effective_limit]

    return ProposalListResponse(
        proposals=[_to_proposal_response(r, url_prefix=url_prefix) for r in rows],
        total=total,
        next_cursor=next_cursor,
    )
508
509
async def get_proposal(
    session: AsyncSession,
    repo_id: str,
    proposal_id: str,
    *,
    url_prefix: str = "",
) -> ProposalResponse | None:
    """Fetch one proposal with its DAG position and simulation summaries.

    Returns ``None`` when no proposal with ``proposal_id`` exists in ``repo_id``.
    """
    lookup = select(MusehubProposal).where(
        MusehubProposal.repo_id == repo_id,
        MusehubProposal.proposal_id == proposal_id,
    )
    result = await session.execute(lookup)
    proposal_row = result.scalar_one_or_none()
    if proposal_row is None:
        return None

    dependency_dag = await load_dag_for_proposals(session, [proposal_id])
    simulation_list = await list_simulations(session, repo_id, proposal_id)
    return _to_proposal_response(
        proposal_row,
        dag=dependency_dag,
        simulations=simulation_list,
        url_prefix=url_prefix,
    )
528
529
async def update_proposal(
    session: AsyncSession,
    repo_id: str,
    proposal_id: str,
    *,
    title: str | None = None,
    body: str | None = None,
    proposal_type: str | None = None,
    merge_strategy: str | None = None,
    url_prefix: str = "",
) -> ProposalResponse | None:
    """Apply a partial update to a proposal and return the refreshed response.

    Only the arguments that are not ``None`` are written; passing ``None``
    leaves the corresponding column untouched. Unlike ``create_proposal``,
    this function commits the session itself.

    Args:
        session: Async database session.
        repo_id: Repository the proposal belongs to.
        proposal_id: ID of the proposal to update.
        title: New title, if any.
        body: New body, if any.
        proposal_type: New proposal type, if any.
        merge_strategy: New merge strategy, if any.
        url_prefix: Prefix used to build the response ``url``, mirroring
            ``get_proposal``. Defaults to ``""``, which leaves ``url`` empty
            (the behaviour before this parameter existed).

    Returns:
        The updated proposal enriched with DAG position and simulation
        summaries, or ``None`` if the proposal was not found.
    """
    stmt = select(MusehubProposal).where(
        MusehubProposal.repo_id == repo_id,
        MusehubProposal.proposal_id == proposal_id,
    )
    row = (await session.execute(stmt)).scalar_one_or_none()
    if row is None:
        return None

    if title is not None:
        row.title = title
    if body is not None:
        row.body = body
    if proposal_type is not None:
        row.proposal_type = proposal_type
    if merge_strategy is not None:
        row.merge_strategy = merge_strategy

    await session.commit()
    # Reload the row's attributes after the commit before reading them.
    await session.refresh(row)

    dag = await load_dag_for_proposals(session, [proposal_id])
    sims = await list_simulations(session, repo_id, proposal_id)
    return _to_proposal_response(row, dag=dag, simulations=sims, url_prefix=url_prefix)
561
562
async def _resolve_ancestor_manifest(
    session: AsyncSession,
    repo_id: str,
    from_branch: str,
    to_branch: str,
) -> StrDict | None:
    """Find the common ancestor snapshot manifest for a branch pair.

    Walks the from_branch commit history looking for the first commit whose
    parent_ids overlap with commits reachable from to_branch. This is a
    lightweight merge-base approximation suitable for server-side strategy
    computation (not a full LCA traversal).

    Both walks are started with ``max_nodes=200`` (presumably a cap on the
    number of commits visited — confirm against ``walk_dag_async``), so a merge
    base deeper than that may not be found.

    Returns None if no ancestor can be found (new repo, orphan branches), if
    either branch does not exist, or if the ancestor commit is missing or has
    no snapshot.
    """
    from musehub.graph.walk import walk_dag_async
    from musehub.services.musehub_snapshot import get_snapshot_manifest

    to_b = await _get_branch(session, repo_id, to_branch)
    from_b = await _get_branch(session, repo_id, from_branch)
    if to_b is None or from_b is None:
        return None

    # Walk 1: collect to_branch ancestry (bounded BFS)
    to_commit_ids: set[str] = set()

    async def _to_adj(cid: str) -> list[str]:
        # Adjacency for walk 1: all parents of the commit (none if it is missing).
        commit = await session.get(MusehubCommit, cid)
        return commit.parent_ids if commit and commit.parent_ids else []

    async for cid in walk_dag_async(
        [to_b.head_commit_id] if to_b.head_commit_id else [],
        _to_adj,
        max_nodes=200,
    ):
        to_commit_ids.add(cid)

    # Walk 2: first-parent walk on from_branch, looking for merge base
    candidate_id: str | None = None

    async def _from_adj(cid: str) -> list[str]:
        # Adjacency for walk 2. Records the first parent that is part of the
        # to_branch ancestry as the merge-base candidate (a side effect via
        # ``nonlocal``) and otherwise follows only the first parent.
        # NOTE(review): only *parents* are tested for membership — the
        # from_branch head itself is never checked against to_commit_ids.
        nonlocal candidate_id
        commit = await session.get(MusehubCommit, cid)
        if commit is None:
            return []
        for parent_id in (commit.parent_ids or []):
            if parent_id in to_commit_ids:
                candidate_id = parent_id
                return []  # stop walking once merge base is found
        return commit.parent_ids[:1] if commit.parent_ids else []

    async for _ in walk_dag_async(
        [from_b.head_commit_id] if from_b.head_commit_id else [],
        _from_adj,
        max_nodes=200,
    ):
        # The loop body only exists to drive the walk; the result is captured
        # by _from_adj in candidate_id.
        if candidate_id:
            break

    if not candidate_id:
        return None

    ancestor_commit = await session.get(MusehubCommit, candidate_id)
    if ancestor_commit is None or not ancestor_commit.snapshot_id:
        return None

    return await get_snapshot_manifest(session, ancestor_commit.snapshot_id)
630
631
async def _rebase_commits(
    session: AsyncSession,
    repo_id: str,
    proposal: "MusehubProposal",
    merger_handle: str,
    to_b: "MusehubBranch | None",
    from_b: "MusehubBranch | None",
    to_manifest: StrDict,
    compute_commit_id_fn: object,
    upsert_snapshot_entries_fn: object,
) -> str:
    """Replay each from_branch commit individually onto to_branch (linear history).

    Algorithm:
    1. Collect from_branch commits not reachable from to_branch, oldest-first.
    2. For each commit in order:
       a. Compute its file delta vs its parent snapshot.
       b. Apply that delta to the running rebased state (starts = to_manifest).
       c. Upsert the new snapshot.
       d. Create a new MusehubCommit with parent = previous replayed commit.
    3. Return the commit_id of the tip (last replayed commit).

    Args:
        session: Async database session; new rows are added but nothing is
            flushed or committed here.
        repo_id: Repository the replayed commits are referenced from.
        proposal: Proposal being merged; supplies the branch names.
        merger_handle: Author fallback for commits that have no author.
        to_b: Target branch row, or ``None``.
        from_b: Source branch row, or ``None``.
        to_manifest: Manifest of the target head; the replay starts from a copy.
        compute_commit_id_fn: Callable used to derive each replayed commit ID
            (annotated ``object`` but invoked like ``compute_commit_id``).
        upsert_snapshot_entries_fn: Awaitable callable that persists a
            snapshot's entries.

    Returns:
        The commit ID of the last replayed commit. When nothing was replayed
        but to_branch has a head, that head's ID is returned unchanged.

    Raises:
        ValueError: If from_branch has no head commit, or if nothing was
            replayed and to_branch has no head either.
    """
    from collections import deque

    from musehub.services.musehub_snapshot import get_snapshot_manifest

    to_head_cid = to_b.head_commit_id if to_b else None

    # Collect from_branch commits not on to_branch (walk parent_ids BFS).
    from_head_cid = from_b.head_commit_id if from_b else None
    if not from_head_cid:
        raise ValueError(f"from_branch '{proposal.from_branch}' has no commits")

    # Everything reachable from the to_branch head (stack-based walk; only
    # membership matters, so visit order is irrelevant).
    to_ancestors: set[str] = set()
    if to_head_cid:
        _q: list[str] = [to_head_cid]
        while _q:
            _cid = _q.pop()
            if _cid in to_ancestors:
                continue
            to_ancestors.add(_cid)
            _row = await session.get(MusehubCommit, _cid)
            if _row:
                _q.extend(_row.parent_ids or [])

    # BFS from from_head, collecting commits not in to_ancestors.
    # A deque gives O(1) pops from the front (list.pop(0) is O(n)).
    from_commits_unordered: list[MusehubCommit] = []
    _seen: set[str] = set()
    _frontier: deque[str] = deque([from_head_cid])
    while _frontier:
        _cid = _frontier.popleft()
        if _cid in _seen or _cid in to_ancestors:
            continue
        _seen.add(_cid)
        _c = await session.get(MusehubCommit, _cid)
        if _c is None:
            continue
        from_commits_unordered.append(_c)
        _frontier.extend(p for p in (_c.parent_ids or []) if p not in _seen)

    # Topological sort: oldest-first (Kahn's algorithm on parent_ids subset).
    # Only edges between commits inside the collected set count towards
    # in-degree, so the roots are the commits sitting directly on to_branch
    # history (or with no parents at all).
    cid_set = {c.commit_id for c in from_commits_unordered}
    in_degree: dict[str, int] = {c.commit_id: 0 for c in from_commits_unordered}
    children: dict[str, list[MusehubCommit]] = {c.commit_id: [] for c in from_commits_unordered}
    for c in from_commits_unordered:
        for p in (c.parent_ids or []):
            if p in cid_set:
                children[p].append(c)
                in_degree[c.commit_id] += 1
    queue: deque[MusehubCommit] = deque(
        c for c in from_commits_unordered if in_degree[c.commit_id] == 0
    )
    from_commits: list[MusehubCommit] = []
    while queue:
        c = queue.popleft()
        from_commits.append(c)
        for child in children[c.commit_id]:
            in_degree[child.commit_id] -= 1
            if in_degree[child.commit_id] == 0:
                queue.append(child)

    # Look up generation of to_head for commit graph.
    _to_gen = 0
    if to_head_cid:
        _tg_row = await session.get(MusehubCommitGraph, to_head_cid)
        if _tg_row:
            _to_gen = _tg_row.generation

    # Replay each commit.
    rebased_state: StrDict = dict(to_manifest)
    current_parent_id: str = to_head_cid or ""
    current_gen = _to_gen
    tip_cid = current_parent_id

    for orig in from_commits:
        # Compute file delta: orig_snapshot vs orig's parent snapshot.
        orig_manifest = await get_snapshot_manifest(session, orig.snapshot_id) if orig.snapshot_id else {}
        # The baseline is the first parent that has a snapshot; with no such
        # parent the baseline is empty and nothing is treated as deleted.
        parent_manifest: StrDict = {}
        for p in (orig.parent_ids or []):
            _p_row = await session.get(MusehubCommit, p)
            if _p_row and _p_row.snapshot_id:
                parent_manifest = await get_snapshot_manifest(session, _p_row.snapshot_id)
                break

        # Apply delta to rebased_state.
        # Every path in the original snapshot overwrites the running state;
        # paths the parent had but the commit dropped are removed.
        for path, oid in orig_manifest.items():
            rebased_state[path] = oid
        for path in list(parent_manifest.keys()):
            if path not in orig_manifest:
                rebased_state.pop(path, None)

        # Upsert new snapshot for this rebased state.
        rebased_snap_id = compute_snapshot_id(rebased_state)
        await upsert_snapshot_entries_fn(session, repo_id, rebased_snap_id, rebased_state)

        # Create the replayed commit.
        committed_at = _utc_now()
        new_parent_ids = [current_parent_id] if current_parent_id else []
        new_cid = compute_commit_id_fn(
            new_parent_ids, rebased_snap_id, orig.message,
            committed_at.isoformat(), author=orig.author or merger_handle,
            signer_public_key="",
        )
        session.add(MusehubCommit(
            commit_id=new_cid, branch=proposal.to_branch,
            parent_ids=new_parent_ids, message=orig.message,
            author=orig.author or merger_handle, timestamp=committed_at,
            snapshot_id=rebased_snap_id,
            structured_delta=orig.structured_delta,
            agent_id=orig.agent_id or "", model_id=orig.model_id or "",
        ))
        session.add(MusehubCommitRef(repo_id=repo_id, commit_id=new_cid))
        # Each replayed commit sits one generation above its single parent.
        current_gen += 1
        session.add(MusehubCommitGraph(
            commit_id=new_cid, parent_ids=new_parent_ids,
            generation=current_gen, snapshot_id=rebased_snap_id,
        ))

        current_parent_id = new_cid
        tip_cid = new_cid

    # NOTE(review): tip_cid starts as the to_branch head, so this guard only
    # fires when nothing was replayed AND to_branch has no head. When to_branch
    # has a head and from_branch contributes no unique commits, the existing
    # head is returned silently — confirm whether that case should raise too.
    if not tip_cid:
        raise ValueError("rebase produced no commits — from_branch has no unique commits")

    return tip_cid
773
774
775 async def merge_proposal(
776 session: AsyncSession,
777 repo_id: str,
778 proposal_id: str,
779 *,
780 merge_strategy: str = "merge_commit",
781 merger_handle: str = "",
782 commit_history: str = "merge",
783 ) -> ProposalResponse:
784 """Merge an open proposal.
785
786 Args:
787 merge_strategy: Content/snapshot merge strategy — how file manifests combine.
788 ``overlay`` (default), ``weave``, ``replay``, ``selective``.
789 ``overlay`` (default), ``weave``, ``replay``, ``selective``.
790 commit_history: VCS commit graph style.
791 ``merge`` (default) — new commit with two parents [to_head, from_head].
792 ``squash`` — one new commit, parent = [to_head] only.
793 ``rebase`` — replay each from_branch commit linearly (TODO).
794
795 Raises:
796 ValueError: Proposal not found or branch has no commits.
797 RuntimeError: Proposal is already merged or closed (surfaces as 409).
798 """
799 stmt = select(MusehubProposal).where(
800 MusehubProposal.repo_id == repo_id,
801 MusehubProposal.proposal_id == proposal_id,
802 )
803 proposal = (await session.execute(stmt)).scalar_one_or_none()
804 if proposal is None:
805 raise ValueError(f"Proposal {proposal_id} not found in repo {repo_id}")
806
807 if proposal.state not in ("open", "approved"):
808 raise RuntimeError(f"Proposal {proposal_id} is already {proposal.state}")
809
810 # Gate on hard dependencies — check require_dependency_merged from merge_conditions
811 mc_raw = proposal.merge_conditions or {}
812 if mc_raw.get("require_dependency_merged", True):
813 dag = await load_dag_for_proposals(session, [proposal_id])
814 if is_blocked(dag, proposal_id):
815 unmerged_nums = blocked_by_numbers(dag, proposal_id)
816 raise RuntimeError(
817 f"Proposal {proposal_id} cannot be merged: "
818 f"unmerged dependencies: proposal numbers {unmerged_nums}"
819 )
820
821 from_b = await _get_branch(session, repo_id, proposal.from_branch)
822 to_b = await _get_branch(session, repo_id, proposal.to_branch)
823
824 # Collect parent commit IDs for the merge commit.
825 parent_ids: list[str] = []
826 if to_b is not None and to_b.head_commit_id is not None:
827 parent_ids.append(to_b.head_commit_id)
828 if from_b is not None and from_b.head_commit_id is not None:
829 parent_ids.append(from_b.head_commit_id)
830
831 if not parent_ids:
832 raise ValueError(
833 f"Cannot merge: neither '{proposal.from_branch}' nor '{proposal.to_branch}' has any commits"
834 )
835
836 # Squash: drop from_head from parent_ids so the result is linear.
837 if commit_history == "squash" and to_b is not None and to_b.head_commit_id is not None:
838 parent_ids = [to_b.head_commit_id]
839
840 from musehub.services.musehub_snapshot import get_snapshot_manifest, upsert_snapshot_entries
841 from musehub.services.proposal_merge_strategies import execute_merge_strategy
842
843 to_manifest: StrDict = {}
844 if to_b is not None and to_b.head_commit_id is not None:
845 to_head = await session.get(MusehubCommit, to_b.head_commit_id)
846 if to_head is not None and to_head.snapshot_id:
847 to_manifest = await get_snapshot_manifest(session, to_head.snapshot_id)
848
849 from_manifest: StrDict = {}
850 from_head_snapshot_id: str | None = None
851 if from_b is not None and from_b.head_commit_id is not None:
852 from_head = await session.get(MusehubCommit, from_b.head_commit_id)
853 if from_head is not None and from_head.snapshot_id:
854 from_head_snapshot_id = from_head.snapshot_id
855 from_manifest = await get_snapshot_manifest(session, from_head.snapshot_id)
856
857 # Resolve ancestor manifest for three-way strategies.
858 # The ancestor is the snapshot at the point from_branch was cut from to_branch.
859 # We approximate this as the earliest commit on from_branch that has a parent
860 # on to_branch — or the repo's first commit if the branch predates any tracking.
861 # For OVERLAY (default), the ancestor is used only for conflict audit.
862 ancestor_manifest: StrDict | None = None
863 strategy_name = getattr(proposal, "merge_strategy", "overlay") or "overlay"
864 # Caller can override via merge_strategy parameter (passed as the kwarg).
865 if merge_strategy and merge_strategy != "merge_commit":
866 strategy_name = merge_strategy
867 if strategy_name in ("weave", "replay", "selective", "phased"):
868 # Walk from_branch commit history to find the merge-base with to_branch.
869 # Simplified: look for the first from_branch commit whose parent is in to_branch.
870 ancestor_manifest = await _resolve_ancestor_manifest(
871 session, repo_id, proposal.from_branch, proposal.to_branch
872 )
873
874 selective_domains: list[str] | None = getattr(proposal, "selective_domains", None)
875
876 merge_result = execute_merge_strategy(
877 strategy_name,
878 to_manifest,
879 from_manifest,
880 ancestor_manifest=ancestor_manifest,
881 selective_domains=selective_domains,
882 )
883 merged_manifest: StrDict = merge_result.manifest
884
885 logger.info(
886 "🔀 Merge strategy=%s added=%d modified=%d removed=%d conflicts=%d domains=%s",
887 merge_result.strategy,
888 merge_result.files_added,
889 merge_result.files_modified,
890 merge_result.files_removed,
891 len(merge_result.conflicts),
892 merge_result.domains_merged,
893 )
894
895 # ── Commit creation — varies by commit_history style ────────────────
896 merge_commit_id: str
897
898 if commit_history == "rebase":
899 # Replay each from_branch commit individually onto to_branch.
900 # Walk from_branch ancestors oldest-first, stopping at to_branch head.
901 merge_commit_id = await _rebase_commits(
902 session, repo_id, proposal, merger_handle,
903 to_b, from_b, to_manifest,
904 compute_commit_id, upsert_snapshot_entries,
905 )
906 else:
907 # merge and squash: create a single new commit.
908 merged_snapshot_id = compute_snapshot_id(merged_manifest)
909 await upsert_snapshot_entries(session, repo_id, merged_snapshot_id, merged_manifest)
910
911 merge_message = f"Merge '{proposal.from_branch}' into '{proposal.to_branch}' — proposal: {proposal.title}"
912 committed_at = _utc_now()
913 merge_commit_id = compute_commit_id(
914 parent_ids, merged_snapshot_id, merge_message,
915 committed_at.isoformat(), author=merger_handle, signer_public_key="",
916 )
917 session.add(MusehubCommit(
918 commit_id=merge_commit_id, branch=proposal.to_branch,
919 parent_ids=parent_ids, message=merge_message,
920 author=merger_handle, timestamp=committed_at, snapshot_id=merged_snapshot_id,
921 ))
922 session.add(MusehubCommitRef(repo_id=repo_id, commit_id=merge_commit_id))
923
924 _parent_gen_q = await session.execute(
925 select(MusehubCommitGraph.commit_id, MusehubCommitGraph.generation)
926 .where(MusehubCommitGraph.commit_id.in_(parent_ids))
927 )
928 _parent_gens = [row[1] for row in _parent_gen_q.all()]
929 _merge_generation = (max(_parent_gens) + 1) if _parent_gens else 0
930 session.add(MusehubCommitGraph(
931 commit_id=merge_commit_id, parent_ids=parent_ids,
932 generation=_merge_generation, snapshot_id=merged_snapshot_id,
933 ))
934
935 # Advance (or create) the to_branch head pointer.
936 if to_b is None:
937 to_b = MusehubBranch(
938 branch_id=compute_branch_id(repo_id, proposal.to_branch),
939 repo_id=repo_id,
940 name=proposal.to_branch,
941 head_commit_id=merge_commit_id,
942 )
943 session.add(to_b)
944 else:
945 to_b.head_commit_id = merge_commit_id
946
947 # Delete the source branch so it disappears from refs. This lets
948 # `muse fetch --prune` clean up the local tracking ref automatically,
949 # matching the behaviour users expect after a proposal merge.
950 if from_b is not None:
951 await session.delete(from_b)
952
953 # Refresh touched_symbols from the from_branch commits before marking merged.
954 # This gives the most accurate symbol set at the moment of merge, capturing
955 # any commits pushed to from_branch after the proposal was created.
956 touched = await _touched_symbols_for_branch(session, repo_id, proposal.from_branch)
957 proposal.touched_symbols = touched
958
959 # Mark proposal as merged and record the exact merge timestamp.
960 proposal.state = "merged"
961 proposal.merge_commit_id = merge_commit_id
962 proposal.merged_at = _utc_now()
963
964 await session.flush()
965 await session.refresh(proposal)
966 logger.info(
967 "✅ Merged proposal %s ('%s' → '%s') in repo %s, merge commit %s",
968 proposal_id,
969 proposal.from_branch,
970 proposal.to_branch,
971 repo_id,
972 merge_commit_id,
973 )
974
975 embed = MergeResultEmbed(
976 status="merged",
977 commit_id=merge_commit_id,
978 strategy=merge_result.strategy,
979 on_conflict=None,
980 history=commit_history,
981 conflicts=[c.path for c in merge_result.conflicts],
982 files_changed={
983 "added": merge_result.files_added,
984 "modified": merge_result.files_modified,
985 "deleted": merge_result.files_removed,
986 },
987 semver_impact="",
988 )
989 return _to_proposal_response(proposal, merge_result=embed)
990
991
# ---------------------------------------------------------------------------
# Close / reopen proposal
# ---------------------------------------------------------------------------
995
996
async def close_proposal(
    session: AsyncSession,
    repo_id: str,
    proposal_id: str,
) -> ProposalResponse:
    """Transition a proposal from ``open`` to ``closed``.

    The proposal is looked up by ``proposal_id`` scoped to ``repo_id``. The
    state change is flushed and the row refreshed; committing is left to the
    caller.

    Raises:
        KeyError: no proposal with this ID exists in the repo.
        RuntimeError: the proposal is in any state other than ``open``.
    """
    lookup = select(MusehubProposal).where(
        MusehubProposal.proposal_id == proposal_id,
        MusehubProposal.repo_id == repo_id,
    )
    result = await session.execute(lookup)
    target = result.scalar_one_or_none()

    if target is None:
        raise KeyError(f"Proposal {proposal_id} not found in repo {repo_id}")

    # Only an open proposal can be closed; report the state it is actually in.
    current_state = target.state
    if current_state != "open":
        raise RuntimeError(f"Proposal {proposal_id} is already {current_state}")

    target.state = "closed"
    await session.flush()
    await session.refresh(target)
    return _to_proposal_response(target)
1022
1023
async def reopen_proposal(
    session: AsyncSession,
    repo_id: str,
    proposal_id: str,
) -> ProposalResponse:
    """Return a non-open proposal (e.g. merged or closed) to the ``open`` state.

    Besides flipping ``state``, this wipes ``merge_commit_id`` and
    ``merged_at`` so the proposal can be merged again — the recovery path
    after a corrupt merge commit has been cleaned up (e.g. bug #36).

    Raises:
        KeyError: no proposal with this ID exists in the repo.
        RuntimeError: the proposal is already ``open``.
    """
    query = select(MusehubProposal).where(
        MusehubProposal.proposal_id == proposal_id,
        MusehubProposal.repo_id == repo_id,
    )
    record = (await session.execute(query)).scalar_one_or_none()

    if record is None:
        raise KeyError(f"Proposal {proposal_id} not found in repo {repo_id}")
    if record.state == "open":
        raise RuntimeError(f"Proposal {proposal_id} is already open")

    # Drop the merge bookkeeping together with the state so no stale merge
    # metadata survives the reopen.
    record.merged_at = None
    record.merge_commit_id = None
    record.state = "open"

    await session.flush()
    await session.refresh(record)
    return _to_proposal_response(record)
1054
1055
1056 # ---------------------------------------------------------------------------
1057 # Proposal review comments
1058 # ---------------------------------------------------------------------------
1059
1060
def _to_comment_response(row: MusehubProposalComment) -> ProposalCommentResponse:
    """Convert a proposal-comment ORM row into its wire model.

    The ``dimension_ref`` JSON blob (treated as empty when falsy) is unpacked
    into the flat ``target_*`` fields:

    - ``type`` → ``target_type`` (defaults to ``"general"``)
    - ``track`` → ``target_track`` (stringified when the key is present)
    - ``beat_start`` / ``beat_end`` → floats, only when stored as int or float
    - ``pitch`` → ``target_note_pitch``, only when stored as an int

    Columns that may be absent on the row (``author_user_id``, ``agent_id``,
    ``model_id``, ``updated_at``, ``is_deleted``) are read with ``getattr``
    fallbacks.
    """
    ref: JSONObject = row.dimension_ref or {}

    def _as_float(key: str) -> float | None:
        # Anything that is not numeric is reported as "no value".
        value = ref.get(key)
        return float(value) if isinstance(value, (int, float)) else None

    pitch_raw = ref.get("pitch")
    pitch = int(pitch_raw) if isinstance(pitch_raw, int) else None
    track = str(ref["track"]) if "track" in ref else None

    return ProposalCommentResponse(
        comment_id=row.comment_id,
        proposal_id=row.proposal_id,
        parent_comment_id=row.parent_comment_id,
        symbol_address=row.symbol_address,
        author=row.author,
        author_user_id=getattr(row, "author_user_id", None),
        # Empty strings collapse to None for the agent metadata fields.
        agent_id=getattr(row, "agent_id", None) or None,
        model_id=getattr(row, "model_id", None) or None,
        body=row.body,
        target_type=str(ref.get("type", "general")),
        target_track=track,
        target_beat_start=_as_float("beat_start"),
        target_beat_end=_as_float("beat_end"),
        target_note_pitch=pitch,
        created_at=row.created_at,
        updated_at=getattr(row, "updated_at", None),
        is_deleted=bool(getattr(row, "is_deleted", False)),
    )
1082
1083
async def create_proposal_comment(
    session: AsyncSession,
    *,
    proposal_id: str,
    repo_id: str,
    author: str,
    author_identity_id: str = "",
    body: str,
    target_type: str = "general",
    target_track: str | None = None,
    target_beat_start: float | None = None,
    target_beat_end: float | None = None,
    target_note_pitch: int | None = None,
    parent_comment_id: str | None = None,
    symbol_address: str | None = None,
) -> ProposalCommentResponse:
    """Store a new review comment on a proposal and return its wire model.

    Args:
        session: Async database session; the row is flushed, not committed.
        proposal_id: Proposal the comment belongs to.
        repo_id: Repo that must own the proposal.
        author: MSign handle of the reviewer.
        author_identity_id: Fed into ``compute_comment_id`` together with the
            proposal ID and creation timestamp. It is not stored on the row
            by this function.
        body: Comment text.
        target_type: Stored as ``dimension_ref["type"]``.
        target_track: Optional ``dimension_ref["track"]``.
        target_beat_start: Optional ``dimension_ref["beat_start"]``.
        target_beat_end: Optional ``dimension_ref["beat_end"]``.
        target_note_pitch: Optional ``dimension_ref["pitch"]``.
        parent_comment_id: ID of the comment being replied to. It must be an
            existing top-level comment on the same proposal; the caller is
            responsible for validating that before calling here.
        symbol_address: Optional symbol anchor; an empty string is stored as
            ``None``.

    Raises:
        ValueError: the proposal does not exist in the given repo.
    """
    owner_lookup = select(MusehubProposal).where(
        MusehubProposal.proposal_id == proposal_id,
        MusehubProposal.repo_id == repo_id,
    )
    if (await session.execute(owner_lookup)).scalar_one_or_none() is None:
        raise ValueError(f"Proposal {proposal_id} not found in repo {repo_id}")

    # Only targets that were actually supplied end up in the stored JSON.
    dimension_ref: JSONObject = {"type": target_type}
    optional_targets = (
        ("track", target_track),
        ("beat_start", target_beat_start),
        ("beat_end", target_beat_end),
        ("pitch", target_note_pitch),
    )
    for key, value in optional_targets:
        if value is not None:
            dimension_ref[key] = value

    # One timestamp feeds both the derived ID and the created_at column.
    stamp = _utc_now()
    new_comment = MusehubProposalComment(
        comment_id=compute_comment_id(proposal_id, author_identity_id, stamp.isoformat()),
        proposal_id=proposal_id,
        repo_id=repo_id,
        author=author,
        body=body,
        dimension_ref=dimension_ref,
        parent_comment_id=parent_comment_id,
        symbol_address=symbol_address or None,
        created_at=stamp,
    )
    session.add(new_comment)
    await session.flush()
    await session.refresh(new_comment)
    logger.info("✅ Created proposal comment %s on proposal %s by %s", new_comment.comment_id, proposal_id, author)
    return _to_comment_response(new_comment)
1144
1145
async def backfill_comment_author_user_ids(
    session: AsyncSession,
    repo_id: str | None = None,
    batch: int = 500,
) -> int:
    """Populate ``author_user_id`` on proposal comments that have none.

    Matches ``musehub_proposal_comments.author`` against
    ``musehub_identities.handle`` and writes the identity's ``identity_id``.
    Idempotent — rows that are already populated are never selected.

    Only comments whose author handle has a matching identity are picked for
    the batch. Comments by unknown handles can never be filled; if they were
    selected they would be re-selected on every call and could occupy the
    whole ``batch`` window, making this function return 0 while resolvable
    rows still exist.

    Args:
        session: Async database session; changes are flushed, not committed.
        repo_id: Restrict the backfill to one repo when truthy.
        batch: Maximum number of comment rows processed per call.

    Returns:
        Number of rows updated in this call.
    """
    filters = [
        MusehubProposalComment.author_user_id.is_(None),
        # Skip authors with no identity row so unresolvable comments cannot
        # starve the LIMIT window.
        MusehubProposalComment.author.in_(select(MusehubIdentity.handle)),
    ]
    if repo_id:
        filters.append(MusehubProposalComment.repo_id == repo_id)

    rows = (await session.execute(
        select(MusehubProposalComment).where(*filters).limit(batch)
    )).scalars().all()

    if not rows:
        return 0

    # Resolve every distinct handle in the batch with a single query.
    handles = {r.author for r in rows}
    identity_rows = (await session.execute(
        select(MusehubIdentity.handle, MusehubIdentity.identity_id)
        .where(MusehubIdentity.handle.in_(handles))
    )).all()
    handle_to_id = {handle: identity_id for handle, identity_id in identity_rows}

    updated = 0
    for row in rows:
        uid = handle_to_id.get(row.author)
        # Still guarded: the identity may have disappeared between the two
        # queries, or carry a falsy ID.
        if uid:
            row.author_user_id = uid
            updated += 1

    if updated:
        await session.flush()

    return updated
1188
1189
async def list_proposal_comments(
    session: AsyncSession,
    proposal_id: str,
    repo_id: str,
    cursor: str | None = None,
    limit: int = 20,
) -> ProposalCommentListResponse:
    """List a proposal's non-deleted review comments, one keyset page at a time.

    The page is returned as a two-level thread tree. Comments without a
    ``parent_comment_id`` are the roots; each root's ``replies`` holds its
    direct children that fall on the same page, in ``created_at`` order.
    Deeper nesting is not supported — callers should reply to the original
    top-level comment.

    ``cursor`` is the ISO 8601 ``created_at`` of the last comment already
    seen (opaque to callers — pass ``nextCursor`` from the previous response
    verbatim). Omit it to start from the beginning. ``total`` counts every
    non-deleted comment on the proposal, not just this page.

    NOTE(review): the cursor compares ``created_at`` with a strict ``>`` and
    there is no tie-breaker column, so comments sharing the boundary
    timestamp would be skipped on the next page — confirm whether timestamps
    can collide in practice.
    NOTE(review): a reply whose parent is not on the current page is neither
    a root nor attached to anything, so it is absent from the response.
    """
    base_filters = [
        MusehubProposalComment.proposal_id == proposal_id,
        MusehubProposalComment.repo_id == repo_id,
        MusehubProposalComment.is_deleted.is_(False),
    ]

    total_stmt = select(func.count(MusehubProposalComment.comment_id)).where(*base_filters)
    total: int = (await session.execute(total_stmt)).scalar_one()

    page_filters = [*base_filters]
    if cursor is not None:
        after = datetime.fromisoformat(cursor)
        page_filters.append(MusehubProposalComment.created_at > after)

    # Fetch one row beyond the page size purely to learn whether more exist.
    page_stmt = (
        select(MusehubProposalComment)
        .where(*page_filters)
        .order_by(MusehubProposalComment.created_at)
        .limit(limit + 1)
    )
    fetched = list((await session.execute(page_stmt)).scalars())

    next_cursor: str | None = None
    page = fetched
    if len(fetched) == limit + 1:
        next_cursor = fetched[limit - 1].created_at.isoformat()
        page = fetched[:limit]

    # Convert every row once, then wire replies onto parents by ID.
    responses: _CommentMap = {item.comment_id: _to_comment_response(item) for item in page}
    roots: list[ProposalCommentResponse] = [
        responses[item.comment_id] for item in page if item.parent_comment_id is None
    ]
    for item in page:
        parent_id = item.parent_comment_id
        if parent_id is None:
            continue
        parent = responses.get(parent_id)
        if parent is not None:
            parent.replies.append(responses[item.comment_id])

    return ProposalCommentListResponse(comments=roots, total=total, next_cursor=next_cursor)
1257
1258
1259 # ---------------------------------------------------------------------------
1260 # Proposal reviews (reviewer assignment + approval workflow)
1261 # ---------------------------------------------------------------------------
1262
1263
def _to_review_response(row: MusehubProposalReview) -> ProposalReviewResponse:
    """Convert a review ORM row into its wire model.

    Field names carry over unchanged, except that the row's ``review_id`` is
    exposed as ``id``.
    """
    fields = {
        "id": row.review_id,
        "proposal_id": row.proposal_id,
        "reviewer_username": row.reviewer_username,
        "state": row.state,
        "body": row.body,
        "submitted_at": row.submitted_at,
        "created_at": row.created_at,
    }
    return ProposalReviewResponse(**fields)
1274
1275
async def _assert_proposal_exists(session: AsyncSession, repo_id: str, proposal_id: str) -> None:
    """Guard helper: raise ``ValueError`` unless the proposal exists in the repo."""
    lookup = select(MusehubProposal).where(
        MusehubProposal.proposal_id == proposal_id,
        MusehubProposal.repo_id == repo_id,
    )
    found = (await session.execute(lookup)).scalar_one_or_none()
    if found is not None:
        return
    raise ValueError(f"Proposal {proposal_id} not found in repo {repo_id}")
1285
1286
async def request_reviewers(
    session: AsyncSession,
    *,
    repo_id: str,
    proposal_id: str,
    reviewers: list[str],
) -> ProposalReviewListResponse:
    """Assign reviewers to a proposal, creating one ``pending`` row per new reviewer.

    Idempotent: a reviewer who already has a row, whatever its state, is
    skipped, so a submitted verdict is never reset by a repeated request.

    The review ID is derived from the reviewer's identity ID when an identity
    with that handle exists, and from the username itself otherwise.

    Raises:
        ValueError: the proposal does not exist in the repo.

    Returns:
        The proposal's review list after the additions, as produced by
        ``list_reviews`` with its default arguments.
    """
    await _assert_proposal_exists(session, repo_id, proposal_id)

    for username in reviewers:
        prior_stmt = select(MusehubProposalReview).where(
            MusehubProposalReview.proposal_id == proposal_id,
            MusehubProposalReview.reviewer_username == username,
        )
        if (await session.execute(prior_stmt)).scalar_one_or_none() is not None:
            # Already requested (or already reviewed) — leave the row alone.
            continue

        requested_at = _utc_now()
        handle_stmt = select(MusehubIdentity.identity_id).where(
            MusehubIdentity.handle == username
        )
        resolved_id = (await session.execute(handle_stmt)).scalar_one_or_none()
        id_seed = resolved_id or username

        session.add(
            MusehubProposalReview(
                review_id=compute_review_id(proposal_id, id_seed, requested_at.isoformat()),
                proposal_id=proposal_id,
                reviewer_username=username,
                state="pending",
                created_at=requested_at,
            )
        )
        logger.info("✅ Requested review from '%s' on proposal %s", username, proposal_id)

    await session.flush()
    return await list_reviews(session, repo_id=repo_id, proposal_id=proposal_id)
1329
1330
async def remove_reviewer(
    session: AsyncSession,
    *,
    repo_id: str,
    proposal_id: str,
    username: str,
) -> ProposalReviewListResponse:
    """Withdraw the pending review request for ``username`` on a proposal.

    Only rows still in the ``pending`` state can be removed; submitted
    reviews are kept to preserve the audit trail.

    Raises:
        ValueError: the proposal does not exist, the reviewer was never
            requested, or the reviewer's row is no longer ``pending``.

    Returns:
        The proposal's review list after the removal.
    """
    await _assert_proposal_exists(session, repo_id, proposal_id)

    request_stmt = select(MusehubProposalReview).where(
        MusehubProposalReview.proposal_id == proposal_id,
        MusehubProposalReview.reviewer_username == username,
    )
    request_row = (await session.execute(request_stmt)).scalar_one_or_none()

    if request_row is None:
        raise ValueError(f"Reviewer '{username}' was not requested on proposal {proposal_id}")

    review_state = request_row.state
    if review_state != "pending":
        raise ValueError(
            f"Cannot remove reviewer '{username}': review already submitted (state={review_state})"
        )

    await session.delete(request_row)
    await session.flush()
    logger.info("✅ Removed review request for '%s' from proposal %s", username, proposal_id)
    return await list_reviews(session, repo_id=repo_id, proposal_id=proposal_id)
1366
1367
async def list_reviews(
    session: AsyncSession,
    *,
    repo_id: str,
    proposal_id: str,
    state: str | None = None,
    cursor: str | None = None,
    limit: int = 20,
) -> ProposalReviewListResponse:
    """List a proposal's reviews, oldest first, one keyset page at a time.

    ``state`` optionally narrows the result to a single review state
    (``pending``, ``approved``, ``changes_requested`` or ``dismissed``);
    ``None`` returns reviews in every state. ``total`` counts all reviews
    matching the state filter, independent of the page.

    ``cursor`` is the ISO 8601 ``created_at`` of the last review already seen
    (opaque to callers — pass ``nextCursor`` from the previous response
    verbatim). Omit it to start from the beginning.

    NOTE(review): paging compares ``created_at`` with a strict ``>`` and no
    tie-breaker, so reviews sharing the boundary timestamp would be skipped —
    confirm whether timestamps can collide in practice.

    Raises:
        ValueError: the proposal does not exist in the repo.
    """
    await _assert_proposal_exists(session, repo_id, proposal_id)

    base_filters = [MusehubProposalReview.proposal_id == proposal_id]
    if state is not None:
        base_filters.append(MusehubProposalReview.state == state)

    total_stmt = select(func.count(MusehubProposalReview.review_id)).where(*base_filters)
    total: int = (await session.execute(total_stmt)).scalar_one()

    page_filters = [*base_filters]
    if cursor is not None:
        after = datetime.fromisoformat(cursor)
        page_filters.append(MusehubProposalReview.created_at > after)

    # One extra row is requested only to detect whether another page follows.
    page_stmt = (
        select(MusehubProposalReview)
        .where(*page_filters)
        .order_by(MusehubProposalReview.created_at)
        .limit(limit + 1)
    )
    fetched = list((await session.execute(page_stmt)).scalars())

    next_cursor: str | None = None
    page = fetched
    if len(fetched) == limit + 1:
        next_cursor = fetched[limit - 1].created_at.isoformat()
        page = fetched[:limit]

    return ProposalReviewListResponse(
        reviews=[_to_review_response(item) for item in page],
        total=total,
        next_cursor=next_cursor,
    )
1425
1426
async def submit_review(
    session: AsyncSession,
    *,
    repo_id: str,
    proposal_id: str,
    reviewer_username: str,
    reviewer_identity_id: str = "",
    verdict: str,
    body: str = "",
) -> ProposalReviewResponse:
    """Record a formal verdict from ``reviewer_username`` on a proposal.

    ``verdict`` selects the resulting review state:
    - ``approve`` → ``approved``
    - ``request_changes`` → ``changes_requested``

    A reviewer has at most one row per proposal. If one exists it is updated
    in place, so resubmitting flips approve ↔ request_changes. If none
    exists, an ad-hoc review is created — no prior reviewer request is
    needed. An empty ``body`` is stored as ``None``.

    Raises:
        ValueError: the proposal does not exist in the repo, or ``verdict``
            is not one of the two accepted values. Proposal existence is
            checked first.
    """
    await _assert_proposal_exists(session, repo_id, proposal_id)

    state_by_verdict: StrDict = {
        "approve": "approved",
        "request_changes": "changes_requested",
    }
    new_state = state_by_verdict.get(verdict)
    if new_state is None:
        raise ValueError(f"Invalid verdict '{verdict}'. Must be approve or request_changes.")

    existing_stmt = select(MusehubProposalReview).where(
        MusehubProposalReview.proposal_id == proposal_id,
        MusehubProposalReview.reviewer_username == reviewer_username,
    )
    existing = (await session.execute(existing_stmt)).scalar_one_or_none()

    submitted = _utc_now()
    note = body or None
    if existing is not None:
        # Resubmission: overwrite the verdict but keep the original created_at.
        existing.state = new_state
        existing.body = note
        existing.submitted_at = submitted
        review = existing
    else:
        review = MusehubProposalReview(
            review_id=compute_review_id(proposal_id, reviewer_identity_id, submitted.isoformat()),
            proposal_id=proposal_id,
            reviewer_username=reviewer_username,
            state=new_state,
            body=note,
            submitted_at=submitted,
            created_at=submitted,
        )
        session.add(review)

    await session.flush()
    await session.refresh(review)
    logger.info(
        "✅ Review submitted by '%s' on proposal %s: verdict=%s state=%s",
        reviewer_username,
        proposal_id,
        verdict,
        new_state,
    )
    return _to_review_response(review)
1492
1493 # ── Proposal list enrichment ──────────────────────────────────────────────────
1494
1495 _RISK_BAND_THRESHOLDS: list[tuple[float, str]] = [
1496 (0.75, "critical"),
1497 (0.50, "high"),
1498 (0.25, "medium"),
1499 (0.01, "low"),
1500 ]
1501
1502
1503 def _score_to_band(score: float) -> str:
1504 """Map a [0.0, 1.0] risk score to a human-readable band label.
1505
1506 Thresholds:
1507 ≥ 0.75 → "critical"
1508 ≥ 0.50 → "high"
1509 ≥ 0.25 → "medium"
1510 > 0.0 → "low"
1511 0.0 → "none"
1512 """
1513 for threshold, band in _RISK_BAND_THRESHOLDS:
1514 if score >= threshold:
1515 return band
1516 return "none"
1517
1518
# Number of approvals a proposal needs before its merge conditions count as met.
# NOTE(review): ``_enrich_one`` applies this value unconditionally as
# ``required_approvals``; it does not read an override from
# ``merge_conditions`` — confirm that is intended.
_DEFAULT_REQUIRED_APPROVALS = 2

# Per-domain multipliers for the weighted-mean aggregate risk score computed
# in ``_enrich_one``. Domains missing from this map are weighted 1.0.
_DOMAIN_WEIGHTS: dict[str, float] = {
    "code": 1.2,
    "midi": 1.0,
    "stems": 1.0,
    "pay": 1.5,
}
1529
1530
class _ProposalPrefetch:
    """In-memory bundle of batch-loaded data for one page of proposals.

    The maps are filled once per page by ``_prefetch_for_batch``. Row
    enrichment (``_enrich_one``) then reads only from this object and does no
    further DB I/O per row.

    Attributes are documented inline in ``__init__``.
    """

    def __init__(
        self,
        *,
        reviews_by_proposal: dict[str, list[MusehubProposalReview]],
        author_types: dict[str, str],
        dag: ProposalDag | None = None,
        conflict_counts: dict[str, int | None] | None = None,
    ) -> None:
        # Falsy optional arguments are swapped for fresh empty defaults.
        if not dag:
            dag = ProposalDag()
        if not conflict_counts:
            conflict_counts = {}

        # proposal_id → review rows for that proposal
        self.reviews_by_proposal = reviews_by_proposal
        # author handle → identity type
        self.author_types = author_types
        # dependency graph consulted for blocked_by / blocks / is_blocked
        self.dag: ProposalDag = dag
        # proposal_id → conflict_scan.result["conflict_count"]; None if not run
        self.conflict_counts: dict[str, int | None] = conflict_counts
1552
1553
async def _prefetch_for_batch(
    proposals: list[MusehubProposal],
    session: AsyncSession,
) -> _ProposalPrefetch:
    """Load everything ``_enrich_one`` needs for a page of proposals in one pass.

    The number of DB round-trips does not grow with the page size:
        1. Reviews for every proposal on the page.
        2. ``identity_type`` for every distinct, non-empty author handle.
        3. The dependency DAG, via ``load_dag_for_proposals`` (its own query
           count is not visible from here).
        4. ``conflict_scan`` simulation results for the page.

    Steps 1, 2 and 4 are skipped when their key list is empty; the DAG loader
    is always called.

    Args:
        proposals: ORM rows for the current page.
        session: Shared async session.

    Returns:
        ``_ProposalPrefetch`` with maps keyed by proposal_id / author handle.
        Every proposal on the page has an entry in ``reviews_by_proposal``
        (possibly empty) and in ``conflict_counts`` (``None`` when no usable
        scan result was found).
    """
    page_ids = [p.proposal_id for p in proposals]
    handles = list({p.author for p in proposals if p.author})

    # 1 — reviews, grouped per proposal
    reviews_by_proposal: dict[str, list[MusehubProposalReview]] = {pid: [] for pid in page_ids}
    if page_ids:
        review_result = await session.execute(
            select(MusehubProposalReview).where(
                MusehubProposalReview.proposal_id.in_(page_ids)
            )
        )
        for review in review_result.scalars():
            reviews_by_proposal[review.proposal_id].append(review)

    # 2 — identity type per author handle
    author_types: dict[str, str] = {}
    if handles:
        identity_result = await session.execute(
            select(MusehubIdentity.handle, MusehubIdentity.identity_type).where(
                MusehubIdentity.handle.in_(handles)
            )
        )
        author_types.update((handle, itype) for handle, itype in identity_result.all())

    # 3 — dependency DAG (partial, scoped to this page + neighbours)
    dag = await load_dag_for_proposals(session, page_ids)

    # 4 — conflict_scan simulation result per proposal (for the list summary)
    # NOTE(review): the query has no ORDER BY, so if a proposal has several
    # conflict_scan rows, whichever is returned last wins — confirm that at
    # most one such row exists per proposal.
    conflict_counts: dict[str, int | None] = dict.fromkeys(page_ids)
    if page_ids:
        sim_result = await session.execute(
            select(
                MusehubProposalSimulation.proposal_id,
                MusehubProposalSimulation.result,
            ).where(
                MusehubProposalSimulation.proposal_id.in_(page_ids),
                MusehubProposalSimulation.simulation_type == "conflict_scan",
            )
        )
        for pid, payload in sim_result.all():
            # Results that are not JSON objects are ignored.
            if isinstance(payload, dict):
                conflict_counts[pid] = payload.get("conflict_count")

    return _ProposalPrefetch(
        reviews_by_proposal=reviews_by_proposal,
        author_types=author_types,
        dag=dag,
        conflict_counts=conflict_counts,
    )
1633
1634
def _enrich_one(
    proposal: MusehubProposal,
    prefetch: _ProposalPrefetch,
) -> ProposalListEntry:
    """Build the list-view entry for one proposal from pre-fetched data.

    This is the single source of truth for what the proposals list renders
    per row. It performs no DB queries itself: everything beyond the
    proposal's own attributes comes from ``prefetch``.

    Derived fields:
        - domain_risk / active_domains: the positive entries of
          ``proposal.dimensional_risk`` when that dict is non-empty;
          otherwise the scalar ``risk_score`` reported as ``"code"`` risk.
        - domain_risk_band: band label per active domain.
        - aggregate_risk_score / aggregate_risk_band: mean of the domain
          risks weighted by ``_DOMAIN_WEIGHTS`` (0.0 with no active domain).
          The band is computed from the unrounded score.
        - approval_count: number of pre-fetched reviews in state ``approved``.
        - domains_approved: ``["code"]`` when there is at least one approval
          and ``code`` is active; domains_pending_review is the remainder.
        - all_merge_conditions_met: enough approvals, zero breakages, and
          dependencies satisfied (not blocked, or
          ``merge_conditions["require_dependency_merged"]`` is falsy).
        - author_type: looked up by author handle, defaulting to ``"human"``.
        - title: truncated to 80 characters plus an ellipsis when longer.

    Args:
        proposal: ORM row for this proposal.
        prefetch: Pre-fetched batch data from ``_prefetch_for_batch``.

    Returns:
        ``ProposalListEntry`` with all fields populated.

    Raises:
        ValueError: If ``proposal.risk_score`` is outside ``[0.0, 1.0]``.
            Checked only on the scalar fallback path; ``dimensional_risk``
            values are not range-checked.
    """
    pid = proposal.proposal_id

    def _attr(name: str, default):
        # Several columns may be missing on the row; read them defensively.
        return getattr(proposal, name, default)

    # ── Risk ─────────────────────────────────────────────────────────────────
    per_domain = dict(_attr("dimensional_risk", None) or {})
    domain_risk: dict[str, float] = {}
    if per_domain:
        for domain, raw in per_domain.items():
            value = float(raw)
            if value > 0.0:
                domain_risk[domain] = value
    else:
        # Backwards-compatible path: the scalar score counts as code risk.
        scalar_risk = float(proposal.risk_score or 0.0)
        if not (0.0 <= scalar_risk <= 1.0):
            raise ValueError(f"proposal {pid}: risk_score {scalar_risk!r} out of [0, 1]")
        if scalar_risk > 0.0:
            domain_risk["code"] = scalar_risk

    active_domains = list(domain_risk)
    domain_risk_band = {domain: _score_to_band(value) for domain, value in domain_risk.items()}

    aggregate_risk_score = 0.0
    if domain_risk:
        weights = {domain: _DOMAIN_WEIGHTS.get(domain, 1.0) for domain in domain_risk}
        weighted_total = sum(value * weights[domain] for domain, value in domain_risk.items())
        aggregate_risk_score = weighted_total / sum(weights.values())
    aggregate_risk_band = _score_to_band(aggregate_risk_score)

    # ── Dependency position ───────────────────────────────────────────────────
    graph = prefetch.dag
    blocked_by = blocked_by_numbers(graph, pid)
    blocks = blocks_numbers(graph, pid)
    blocked = is_blocked(graph, pid)
    conditions = proposal.merge_conditions or {}
    must_wait_for_deps: bool = conditions.get("require_dependency_merged", True)
    deps_satisfied = not (must_wait_for_deps and blocked)

    # ── Reviews ───────────────────────────────────────────────────────────────
    page_reviews = prefetch.reviews_by_proposal.get(pid, [])
    approval_count = sum(1 for review in page_reviews if review.state == "approved")
    required_approvals = _DEFAULT_REQUIRED_APPROVALS
    if approval_count > 0 and "code" in active_domains:
        domains_approved = ["code"]
    else:
        domains_approved = []
    domains_pending_review = [d for d in active_domains if d not in domains_approved]
    all_merge_conditions_met = (
        approval_count >= required_approvals
        and proposal.breakage_count == 0
        and deps_satisfied
    )

    # ── Author / agent metadata ───────────────────────────────────────────────
    author_type = prefetch.author_types.get(proposal.author, "human")
    agent_model: str | None = _attr("agent_model", None)
    agent_spawned_by: str | None = _attr("agent_spawned_by", None)

    # ── Presentation helpers ──────────────────────────────────────────────────
    full_title = proposal.title
    display_title = (full_title[:80] + "…") if len(full_title) > 80 else full_title
    touched_symbols_preview = list(proposal.touched_symbols or [])[:3]

    return ProposalListEntry(
        # identity
        proposal_id=proposal.proposal_id,
        proposal_number=proposal.proposal_number,
        title=display_title,
        state=proposal.state,
        proposal_type=_attr("proposal_type", "state_merge"),
        from_branch=proposal.from_branch,
        to_branch=proposal.to_branch,
        author=proposal.author,
        author_type=author_type,
        created_at=proposal.created_at,
        merged_at=proposal.merged_at,
        is_draft=_attr("is_draft", proposal.state == "drafting"),
        # risk
        active_domains=active_domains,
        domain_risk=domain_risk,
        domain_risk_band=domain_risk_band,
        aggregate_risk_score=round(aggregate_risk_score, 4),
        aggregate_risk_band=aggregate_risk_band,
        # approvals
        approval_count=approval_count,
        required_approvals=required_approvals,
        domains_approved=domains_approved,
        domains_pending_review=domains_pending_review,
        all_merge_conditions_met=all_merge_conditions_met,
        # dependencies
        blocked_by=blocked_by,
        blocks=blocks,
        is_blocked=blocked,
        # code metrics
        symbols_changed=proposal.symbols_changed,
        breakage_count=proposal.breakage_count,
        test_gap_count=proposal.test_gap_count,
        touched_symbols_preview=touched_symbols_preview,
        # midi metrics
        midi_tracks_changed=_attr("midi_tracks_changed", 0),
        midi_notes_delta=_attr("midi_notes_delta", 0),
        harmonic_tension_delta=_attr("harmonic_tension_delta", None),
        # payment metrics
        payment_claim_count=_attr("payment_claim_count", 0),
        payment_ledger_delta_nano=_attr("payment_ledger_delta_nano", 0),
        payment_avax_address=_attr("payment_avax_address", None),
        payment_settling=proposal.state == "settling" and "pay" in active_domains,
        # agent / merge metadata
        agent_model=agent_model,
        agent_spawned_by=agent_spawned_by,
        merge_strategy=_attr("merge_strategy", "overlay") or "overlay",
        simulation_conflict_count=prefetch.conflict_counts.get(pid),
    )
1771
1772
async def enrich_proposal_list_entry(
    proposal: MusehubProposal,
    session: AsyncSession,
) -> ProposalListEntry:
    """Build the list-row view model for exactly one proposal.

    Treats ``proposal`` as a one-element batch: runs ``_prefetch_for_batch``
    for it and passes the resulting prefetch context to ``_enrich_one``.
    The number of DB round-trips is whatever ``_prefetch_for_batch`` issues
    (historically documented as 2 — verify against that helper).

    When enriching two or more proposals, call ``enrich_proposal_list_batch``
    instead so the prefetch is shared across all rows rather than repeated.

    Args:
        proposal: ORM row to enrich. Documented requirement: ``repo_id`` set.
        session:  Async database session used for the prefetch.

    Returns:
        The ``ProposalListEntry`` produced by ``_enrich_one``.

    Raises:
        ValueError: Propagated from ``_enrich_one``; documented as raised when
            ``proposal.risk_score`` falls outside ``[0.0, 1.0]``.
    """
    single_row_prefetch = await _prefetch_for_batch([proposal], session)
    entry = _enrich_one(proposal, single_row_prefetch)
    return entry
1798
1799
async def enrich_proposal_list_batch(
    proposals: list[MusehubProposal],
    session: AsyncSession,
) -> list[ProposalListEntry]:
    """Enrich a whole page of proposal rows with one shared prefetch.

    ``_prefetch_for_batch`` is awaited once for the entire page; every row is
    then passed, in order, through the synchronous ``_enrich_one`` together
    with that shared prefetch context. No further awaits happen per row.

    An empty ``proposals`` list short-circuits: nothing is prefetched and an
    empty list is returned.

    Args:
        proposals: ORM rows for the current page (typically ≤ 20).
        session:   Async session used for the single prefetch pass.

    Returns:
        One ``ProposalListEntry`` per input row, in the same order as
        ``proposals``.

    Performance target (as stated by the module authors): < 50ms for 20
    proposals, dominated by the prefetch queries.
    """
    entries: list[ProposalListEntry] = []
    if proposals:
        shared_prefetch = await _prefetch_for_batch(proposals, session)
        for row in proposals:
            entries.append(_enrich_one(row, shared_prefetch))
    return entries
1829
1830
async def get_domain_heat(
    repo_id: str,
    state: str,
    session: AsyncSession,
) -> DomainHeatResponse:
    """Return per-domain proposal counts and average risk for the heat bar.

    Runs ONE aggregation query against ``musehub_proposals`` filtered by
    ``repo_id`` (and ``state`` unless it is ``"all"``). The query returns the
    matching proposal count and the mean of the strictly positive
    ``risk_score`` values; no per-proposal rows are transferred to Python.

    Only the ``code`` domain is populated: its ``count`` is the total number
    of matching proposals and its ``avg_risk`` is the mean described above.
    ``midi``, ``stems`` and ``pay`` are always present in the response with
    ``count=0`` and ``avg_risk=0.0``.

    ``avg_risk`` ignores proposals whose ``risk_score`` is NULL or not greater
    than zero, is rounded to 4 decimal places, and is ``0.0`` when no proposal
    has a positive score.

    Args:
        repo_id: Repository to query.
        state:   Proposal state filter (e.g. ``"open"``). Pass ``"all"`` to
                 skip the state filter entirely.
        session: Async session.

    Returns:
        ``DomainHeatResponse`` with the ``domains`` dict and ``total_open``.
        Note that ``total_open`` is the count for whatever ``state`` was
        requested, so it is not restricted to open proposals when another
        state (or ``"all"``) is passed.

    Performance target: < 20ms (single aggregation query, no per-row work).
    """
    conditions = [MusehubProposal.repo_id == repo_id]
    if state != "all":
        conditions.append(MusehubProposal.state == state)

    # CASE without ELSE yields NULL for NULL / non-positive scores, and AVG
    # skips NULLs — so this averages exactly the positive risk scores while
    # COUNT still sees every matching proposal.
    positive_risk = sa.case(
        (MusehubProposal.risk_score > 0.0, MusehubProposal.risk_score),
    )
    total, avg_raw = (
        await session.execute(
            select(
                func.count(MusehubProposal.proposal_id),
                func.avg(positive_risk),
            ).where(*conditions)
        )
    ).one()

    # AVG over zero qualifying rows is NULL; some drivers return Decimal.
    avg_risk = round(float(avg_raw), 4) if avg_raw is not None else 0.0

    # All known domains — code count = total (single-domain repos); others = 0.
    # Multi-domain heat will populate midi/stems/pay when those proposals land.
    domains: dict[str, DomainHeatEntry] = {
        "code": DomainHeatEntry(count=total, avg_risk=avg_risk),
        "midi": DomainHeatEntry(count=0, avg_risk=0.0),
        "stems": DomainHeatEntry(count=0, avg_risk=0.0),
        "pay": DomainHeatEntry(count=0, avg_risk=0.0),
    }

    return DomainHeatResponse(domains=domains, total_open=total)
1894
1895
async def get_merge_readiness(
    repo_id: str,
    session: AsyncSession,
) -> MergeReadinessResponse:
    """Sort every non-merged, non-abandoned proposal into a readiness bucket.

    Buckets (checked in this order):
        settling:     ``state == 'settling'``
        ready:        approved-review count >= ``_DEFAULT_REQUIRED_APPROVALS``
                      AND ``breakage_count == 0``
        needs_review: everything else

    Issues two DB queries: one for the candidate proposals and — only when
    there are candidates — one grouped count of ``approved`` reviews per
    proposal number. No per-row enrichment is performed.

    NOTE(review): ``blocked`` is never appended to in this function, so it is
    always returned empty; dependency state is not consulted here. Confirm
    whether that is still intended.

    Args:
        repo_id: Repository to query.
        session: Async session.

    Returns:
        ``MergeReadinessResponse`` with ``ready``, ``blocked``, ``settling``,
        and ``needs_review`` lists of proposal numbers.

    Performance target: < 20ms.
    """
    candidates_stmt = select(
        MusehubProposal.proposal_number,
        MusehubProposal.state,
        MusehubProposal.breakage_count,
    ).where(
        MusehubProposal.repo_id == repo_id,
        MusehubProposal.state.notin_(["merged", "abandoned"]),
    )
    candidates = list((await session.execute(candidates_stmt)).all())

    # Approved-review counts keyed by proposal number; stays empty when there
    # are no candidates so the second query is skipped entirely.
    approvals: dict[int, int] = {}
    candidate_numbers = [c.proposal_number for c in candidates]
    if candidate_numbers:
        approvals_stmt = (
            select(
                MusehubProposal.proposal_number,
                func.count(MusehubProposalReview.review_id).label("approved_count"),
            )
            .join(
                MusehubProposalReview,
                (MusehubProposalReview.proposal_id == MusehubProposal.proposal_id)
                & (MusehubProposalReview.state == "approved"),
                isouter=True,
            )
            .where(
                MusehubProposal.repo_id == repo_id,
                MusehubProposal.proposal_number.in_(candidate_numbers),
            )
            .group_by(MusehubProposal.proposal_number)
        )
        for rec in (await session.execute(approvals_stmt)).all():
            approvals[rec.proposal_number] = rec.approved_count

    buckets: dict[str, list[int]] = {
        "ready": [],
        "blocked": [],
        "settling": [],
        "needs_review": [],
    }
    for cand in candidates:
        number = cand.proposal_number
        if cand.state == "settling":
            target = "settling"
        elif (
            approvals.get(number, 0) >= _DEFAULT_REQUIRED_APPROVALS
            and cand.breakage_count == 0
        ):
            target = "ready"
        else:
            target = "needs_review"
        buckets[target].append(number)

    return MergeReadinessResponse(
        ready=buckets["ready"],
        blocked=buckets["blocked"],
        settling=buckets["settling"],
        needs_review=buckets["needs_review"],
    )
1988
1989
1990 # ─────────────────────────────────────────────────────────────────────────────
1991 # Phase 4 — Simulation Engine
1992 # ─────────────────────────────────────────────────────────────────────────────
1993
1994 _VALID_SIMULATION_TYPES = {"conflict_scan", "risk_projection", "dependency_order"}
1995
1996
def _to_simulation_response(row: MusehubProposalSimulation, *, is_stale: bool) -> SimulationResponse:
    """Convert a stored simulation row into its ``SimulationResponse`` model.

    Every field is copied verbatim from ``row`` except ``is_stale``, which is
    not stored on the row and must be supplied by the caller.
    """
    payload = {
        "simulation_id": row.simulation_id,
        "proposal_id": row.proposal_id,
        "simulation_type": row.simulation_type,
        "result": row.result,
        "is_stale": is_stale,
        "from_branch_commit_id": row.from_branch_commit_id,
        "duration_ms": row.duration_ms,
        "created_at": row.created_at,
        "expires_at": row.expires_at,
    }
    return SimulationResponse(**payload)
2009
2010
async def _current_from_commit(
    session: AsyncSession,
    repo_id: str,
    from_branch: str,
) -> str:
    """Look up ``from_branch`` in ``repo_id`` and return its head commit ID.

    Returns the empty string when the branch cannot be found or when it has
    no head commit recorded.
    """
    source_branch = await _get_branch(session, repo_id, from_branch)
    if not source_branch:
        return ""
    return source_branch.head_commit_id or ""
2019
2020
async def _load_head_manifest(
    session: AsyncSession,
    branch: MusehubBranch | None,
) -> StrDict:
    """Return the snapshot manifest at ``branch``'s head commit, or ``{}``.

    An empty manifest is returned when the branch is missing, has no head
    commit, the head commit row cannot be loaded, or it has no snapshot.
    """
    # Imported lazily, mirroring the function-scope import used by callers.
    from musehub.services.musehub_snapshot import get_snapshot_manifest

    if not branch or not branch.head_commit_id:
        return {}
    head = await session.get(MusehubCommit, branch.head_commit_id)
    if not head or not head.snapshot_id:
        return {}
    return await get_snapshot_manifest(session, head.snapshot_id)


async def run_simulation(
    session: AsyncSession,
    repo_id: str,
    proposal_id: str,
    simulation_type: str,
) -> SimulationResponse:
    """Run a simulation for the proposal and upsert the cached result.

    Always recomputes — use ``get_simulation`` to read the cache without
    re-running.

    Flow:
        1. Validate ``simulation_type`` and load the proposal within
           ``repo_id``.
        2. Resolve the source branch once; its head commit ID (or ``""``)
           becomes part of the simulation ID and is stored for staleness
           checks.
        3. ``dependency_order`` loads the proposal DAG and simulates on it;
           branch manifests are not needed and are not loaded.
           ``conflict_scan`` / ``risk_projection`` load the head manifests of
           both branches, plus the ancestor manifest for the ``weave``,
           ``replay``, ``selective`` and ``phased`` strategies.
        4. Update the existing row for (``proposal_id``, ``simulation_type``)
           or insert a new one, then flush (no commit is issued here).

    ``duration_ms`` covers the simulation call itself (and the DAG load for
    ``dependency_order``), not the manifest loading.

    Args:
        session:         Async session; flushed but not committed.
        repo_id:         Repository that must own the proposal.
        proposal_id:     Proposal to simulate.
        simulation_type: One of ``_VALID_SIMULATION_TYPES``.

    Returns:
        ``SimulationResponse`` for the freshly stored row, ``is_stale=False``.

    Raises:
        ValueError: Unknown simulation_type or proposal not found.
    """
    import time

    from musehub.services.proposal_simulation import (
        simulate_conflict_scan,
        simulate_dependency_order,
        simulate_risk_projection,
    )

    if simulation_type not in _VALID_SIMULATION_TYPES:
        raise ValueError(
            f"Unknown simulation_type '{simulation_type}'. "
            f"Valid types: {sorted(_VALID_SIMULATION_TYPES)}"
        )

    stmt = select(MusehubProposal).where(
        MusehubProposal.repo_id == repo_id,
        MusehubProposal.proposal_id == proposal_id,
    )
    proposal = (await session.execute(stmt)).scalar_one_or_none()
    if proposal is None:
        raise ValueError(f"Proposal {proposal_id} not found in repo {repo_id}")

    # One lookup of the source branch serves both the cache key and (for
    # manifest-based simulations) the manifest load.
    from_b = await _get_branch(session, repo_id, proposal.from_branch)
    from_commit_id = (from_b.head_commit_id or "") if from_b else ""

    if simulation_type == "dependency_order":
        # DAG-only simulation: no branch manifests are consulted.
        t0 = time.monotonic()
        dag = await load_dag_for_proposals(session, [proposal_id])
        result_payload = simulate_dependency_order(dag)
    else:
        to_b = await _get_branch(session, repo_id, proposal.to_branch)
        to_manifest: StrDict = await _load_head_manifest(session, to_b)
        from_manifest: StrDict = await _load_head_manifest(session, from_b)

        strategy_name = getattr(proposal, "merge_strategy", "overlay") or "overlay"
        selective_domains: list[str] | None = getattr(proposal, "selective_domains", None)
        dimensional_risk: dict[str, float] | None = getattr(proposal, "dimensional_risk", None)

        # Only these strategies are given a common-ancestor manifest.
        ancestor_manifest: StrDict | None = None
        if strategy_name in ("weave", "replay", "selective", "phased"):
            ancestor_manifest = await _resolve_ancestor_manifest(
                session, repo_id, proposal.from_branch, proposal.to_branch
            )

        t0 = time.monotonic()
        if simulation_type == "conflict_scan":
            result_payload = simulate_conflict_scan(
                to_manifest,
                from_manifest,
                ancestor_manifest=ancestor_manifest,
                strategy=strategy_name,
                selective_domains=selective_domains,
            )
        else:  # risk_projection
            result_payload = simulate_risk_projection(
                to_manifest,
                from_manifest,
                ancestor_manifest=ancestor_manifest,
                current_dimensional_risk=dimensional_risk,
                strategy=strategy_name,
                selective_domains=selective_domains,
            )

    duration_ms = int((time.monotonic() - t0) * 1000)

    sim_id = compute_simulation_id(proposal_id, simulation_type, from_commit_id)
    now = _utc_now()

    # Upsert: update on conflict (same proposal_id + simulation_type)
    existing_stmt = select(MusehubProposalSimulation).where(
        MusehubProposalSimulation.proposal_id == proposal_id,
        MusehubProposalSimulation.simulation_type == simulation_type,
    )
    existing = (await session.execute(existing_stmt)).scalar_one_or_none()

    if existing is not None:
        # NOTE(review): expires_at is left untouched on update — confirm
        # whether a re-run should refresh it.
        existing.simulation_id = sim_id
        existing.from_branch_commit_id = from_commit_id
        existing.result = result_payload
        existing.duration_ms = duration_ms
        existing.created_at = now
        row = existing
    else:
        row = MusehubProposalSimulation(
            simulation_id=sim_id,
            proposal_id=proposal_id,
            simulation_type=simulation_type,
            from_branch_commit_id=from_commit_id,
            result=result_payload,
            duration_ms=duration_ms,
            created_at=now,
        )
        session.add(row)

    await session.flush()

    logger.info(
        "🔬 Simulation %s for proposal %s (%dms)",
        simulation_type, proposal_id, duration_ms
    )
    return _to_simulation_response(row, is_stale=False)
2143
2144
async def get_simulation(
    session: AsyncSession,
    repo_id: str,
    proposal_id: str,
    simulation_type: str,
) -> SimulationResponse | None:
    """Return the cached simulation result, or None if unavailable.

    ``None`` is returned when the proposal does not exist in ``repo_id`` or
    when no simulation of this type has been stored for it. The proposal is
    resolved inside ``repo_id`` before the cache is read, so a simulation
    belonging to a proposal in a different repository is never returned.

    Sets ``is_stale=True`` when the from_branch head commit differs from the
    commit recorded on the cached row. If the branch (or its head commit) can
    no longer be resolved, the result is reported as not stale. Does NOT
    re-run; call ``run_simulation`` for that.

    Args:
        session:         Async session.
        repo_id:         Repository that must own the proposal.
        proposal_id:     Proposal whose cached simulation is wanted.
        simulation_type: One of ``_VALID_SIMULATION_TYPES``.

    Returns:
        The cached ``SimulationResponse`` with its staleness flag, or ``None``.

    Raises:
        ValueError: Unknown simulation_type.
    """
    if simulation_type not in _VALID_SIMULATION_TYPES:
        raise ValueError(
            f"Unknown simulation_type '{simulation_type}'. "
            f"Valid types: {sorted(_VALID_SIMULATION_TYPES)}"
        )

    # Enforce repo scoping first: selecting proposal_id alongside from_branch
    # lets "no such proposal in this repo" be told apart from a NULL branch.
    proposal_stmt = select(
        MusehubProposal.proposal_id,
        MusehubProposal.from_branch,
    ).where(
        MusehubProposal.proposal_id == proposal_id,
        MusehubProposal.repo_id == repo_id,
    )
    proposal_row = (await session.execute(proposal_stmt)).one_or_none()
    if proposal_row is None:
        return None

    stmt = select(MusehubProposalSimulation).where(
        MusehubProposalSimulation.proposal_id == proposal_id,
        MusehubProposalSimulation.simulation_type == simulation_type,
    )
    row = (await session.execute(stmt)).scalar_one_or_none()
    if row is None:
        return None

    # Staleness check: compare stored commit ID with current branch tip
    is_stale = False
    if proposal_row.from_branch:
        current_commit = await _current_from_commit(session, repo_id, proposal_row.from_branch)
        is_stale = bool(current_commit) and current_commit != row.from_branch_commit_id

    return _to_simulation_response(row, is_stale=is_stale)
2182
2183
async def list_simulations(
    session: AsyncSession,
    repo_id: str,
    proposal_id: str,
) -> SimulationListResponse:
    """Return all cached simulations for a proposal.

    Includes staleness flags. Never re-runs.

    The proposal is resolved inside ``repo_id`` first; when it does not exist
    there, an empty list is returned instead of simulations that belong to a
    proposal in another repository.

    A simulation is flagged stale when the from_branch head commit can be
    resolved and differs from the commit recorded on the cached row.

    Args:
        session:     Async session.
        repo_id:     Repository that must own the proposal.
        proposal_id: Proposal whose cached simulations are listed.

    Returns:
        ``SimulationListResponse`` with one entry per cached simulation and
        ``total`` equal to the number of entries. Row order is whatever the
        database returns (no ORDER BY is applied).
    """
    # Resolve the proposal (and its from_branch) once, scoped to repo_id.
    proposal_stmt = select(
        MusehubProposal.proposal_id,
        MusehubProposal.from_branch,
    ).where(
        MusehubProposal.proposal_id == proposal_id,
        MusehubProposal.repo_id == repo_id,
    )
    proposal_row = (await session.execute(proposal_stmt)).one_or_none()
    if proposal_row is None:
        return SimulationListResponse(simulations=[], total=0)

    current_commit = ""
    if proposal_row.from_branch:
        current_commit = await _current_from_commit(session, repo_id, proposal_row.from_branch)

    rows_stmt = select(MusehubProposalSimulation).where(
        MusehubProposalSimulation.proposal_id == proposal_id,
    )
    rows = list((await session.execute(rows_stmt)).scalars().all())

    responses = [
        _to_simulation_response(
            r,
            is_stale=bool(current_commit) and current_commit != r.from_branch_commit_id,
        )
        for r in rows
    ]
    return SimulationListResponse(simulations=responses, total=len(responses))
File History 11 commits
sha256:400438cf8bc700a611f1ba798aa9def68290f487dc19f7dbf317985ad17050c9 chore: delete muse/prose domain — hallucinated, never existed Sonnet 4.6 minor 3 days ago
sha256:4d42a346263e7cbbd152c147f3e6f24576f4b4440df9249ffb9fbcf9db699fcb feat: populate url in create_issue and create_proposal responses Sonnet 4.6 minor 3 days ago
sha256:3707eba7ad42cadedf18c8b9c534d839b88cfd1c30924c3c5a3edc74e1d809de feat: add url field to mist, issue, and proposal list/read … Sonnet 4.6 minor 3 days ago
sha256:d110dd71fb7c1f5e064162de1262b2976841a00d7549bc4f441045f5c13ef33f feat: add MergeResultEmbed to ProposalResponse (deliverable 5) Sonnet 4.6 minor 4 days ago
sha256:50b52eda7afb2f122863aef47d684d1a9e4684b48f5f95367fc956e28ceb7d42 refactor: rename merge strategy aliases to canonical names Sonnet 4.6 minor 9 days ago
sha256:af9422a68cbd2db7c88f664388e11134b0ae0057ee5ad14465d82208548a9d7d changing --event to --verdict. displaying changes requested… Human minor 11 days ago
sha256:a909058d727faac4d77f6e659cc0b1f9315efcb6aabfd870d08763525a67093d dialing in --strategy and --history on merge proposal Human minor 11 days ago
sha256:1f95928652d220172ddd56fb71bdd7bee3158e1b86f1b8ba8b2470edde498c6a update proposal detail w/ --history and --strategy flags Human minor 12 days ago
sha256:1b1a47509ee8154b9c61b68e8e871f6f6dd4d48125c5bcc1df61733bd3657c42 insert `MusehubCommitGraph` row when `merge_proposal` creat… Human patch 12 days ago
sha256:f3995ec2c05c9c34b0e4d6e96349a811d0117a1c51d78096d757998ccb3c0520 fix: blobs only in S3/mpack — remove commit/snapshot indivi… Sonnet 4.6 patch 14 days ago
sha256:e597c0b97ade9c3c52ac4735ceb437ee69d1b6f0db61b8d7caa6467c5866566d feat(phase2): write commit objects to S3 at all 5 write sit… Sonnet 4.6 patch 17 days ago