"""Phase 2 — Dependency DAG Engine tests (issue #37).

Tier 1 — Unit (pure, no DB)
  - build_dag: adjacency, merged_ids, number_by_id
  - topological_sort: linear chain, branching diamond, multi-root, isolated nodes
  - topological_sort: raises CycleError on cycles; cycle_ids populated correctly
  - detect_cycle: True/False without side effects
  - blocked_by_numbers: live deps only (merged dependencies excluded)
  - blocks_numbers: reverse direction
  - is_blocked: True/False contract

Tier 5 — Integration (DB)
  - create_proposal wires depends_on → MusehubProposalDependency rows
  - create_proposal rejects unknown dependency IDs
  - create_proposal detects and rejects cycles via CycleError
  - enrich_proposal_list_batch populates blocked_by / blocks / is_blocked
  - all_merge_conditions_met is False while hard dep is unmerged
  - all_merge_conditions_met is True once dep is merged
  - merge_proposal gates on unmerged dependencies (raises RuntimeError)
  - merge_proposal succeeds once all deps are merged
  - Merging dep A unblocks dep B in subsequent enrichment
"""

from __future__ import annotations

import uuid
from datetime import datetime, timezone

import pytest
from sqlalchemy.ext.asyncio import AsyncSession

from musehub.services.proposal_dag import (
    CycleError,
    ProposalDag,
    blocked_by_numbers,
    blocks_numbers,
    build_dag,
    detect_cycle,
    is_blocked,
    topological_sort,
)


# ─────────────────────────────────────────────────────────────────────────────
# Helpers
# ─────────────────────────────────────────────────────────────────────────────


def _now() -> datetime:
    """Return the current time as a timezone-aware UTC datetime."""
    return datetime.now(tz=timezone.utc)


def _uid() -> str:
    """Return a random 12-character hex string used to make slugs unique."""
    return uuid.uuid4().hex[:12]


def _ids(n: int) -> list[str]:
    """Return ``n`` deterministic IDs of the form ``sha256:<64 hex digits>``.

    The hex part is the zero-padded value of 1..n, so IDs are stable across
    runs and sort in creation order.
    """
    return [f"sha256:{i:064x}" for i in range(1, n + 1)]


async def _make_repo(session: AsyncSession) -> str:
    """Insert a public repo owned by ``dagtest`` and return its ``repo_id``."""
    from musehub.core.genesis import compute_identity_id, compute_repo_id
    from musehub.db.musehub_repo_models import MusehubRepo

    owner = "dagtest"
    slug = f"repo-{_uid()}"
    owner_id = compute_identity_id(owner.encode())
    created_at = _now()
    repo = MusehubRepo(
        repo_id=compute_repo_id(owner_id, slug, "code", created_at.isoformat()),
        name=slug,
        owner=owner,
        slug=slug,
        visibility="public",
        owner_user_id=owner_id,
        description="",
        tags=[],
        created_at=created_at,
    )
    session.add(repo)
    await session.flush()
    return repo.repo_id


async def _make_branch(session: AsyncSession, repo_id: str, name: str) -> None:
    """Insert a branch called ``name`` into the repo identified by ``repo_id``."""
    from musehub.core.genesis import compute_branch_id
    from musehub.db.musehub_repo_models import MusehubBranch

    branch = MusehubBranch(
        branch_id=compute_branch_id(repo_id, name),
        repo_id=repo_id,
        name=name,
    )
    session.add(branch)
    await session.flush()


async def _make_repo_with_branches(session: AsyncSession, *branch_names: str) -> str:
    """Create a repo plus the given branches (in order) and return the repo ID."""
    repo_id = await _make_repo(session)
    for branch_name in branch_names:
        await _make_branch(session, repo_id, branch_name)
    return repo_id


async def _create_proposal(
    session: AsyncSession,
    repo_id: str,
    *,
    from_branch: str,
    number: int = 1,
    depends_on: list[str] | None = None,
) -> "MusehubProposal":
    """Create a proposal from ``from_branch`` into ``dev`` via the service layer.

    ``number`` only feeds the proposal title; ``depends_on`` defaults to no
    dependencies.
    """
    # NOTE(review): the return annotation is kept from the original; some tests
    # name the result ``*_resp`` and re-fetch the ORM row, so the service may
    # return a response object rather than the ORM model — confirm.
    from musehub.services.musehub_proposals import create_proposal

    return await create_proposal(
        session,
        repo_id=repo_id,
        title=f"proposal {number}",
        from_branch=from_branch,
        to_branch="dev",
        author="dagtest",
        depends_on=depends_on or [],
    )


async def _load_proposal(session: AsyncSession, proposal_id: str) -> "MusehubProposal":
    """Fetch the ``MusehubProposal`` ORM row with the given ``proposal_id``."""
    from sqlalchemy import select

    from musehub.db.musehub_social_models import MusehubProposal

    result = await session.execute(
        select(MusehubProposal).where(MusehubProposal.proposal_id == proposal_id)
    )
    return result.scalar_one()


# ─────────────────────────────────────────────────────────────────────────────
# Tier 1 — Unit: build_dag
# ─────────────────────────────────────────────────────────────────────────────


class TestBuildDag:
    """build_dag: edges are ``(dependent, dependency)`` pairs."""

    def test_empty_edges(self) -> None:
        """No edges yields no nodes and no dependency entries."""
        dag = build_dag([])
        assert dag.nodes == set()
        assert len(dag.depends_on) == 0

    def test_single_edge(self) -> None:
        """One edge registers both nodes and both adjacency directions."""
        a, b = _ids(2)
        dag = build_dag([(b, a)])  # B depends on A
        assert a in dag.nodes
        assert b in dag.nodes
        assert a in dag.depends_on[b]
        assert b in dag.required_by[a]

    def test_merged_ids_populated(self) -> None:
        """Only the IDs passed as ``merged_ids`` are recorded as merged."""
        a, b = _ids(2)
        dag = build_dag([(b, a)], merged_ids=[a])
        assert a in dag.merged_ids
        assert b not in dag.merged_ids

    def test_number_by_id_populated(self) -> None:
        """The ``number_by_id`` mapping is exposed on the DAG unchanged."""
        a, b = _ids(2)
        dag = build_dag([(b, a)], number_by_id={a: 1, b: 2})
        assert dag.number_by_id[a] == 1
        assert dag.number_by_id[b] == 2


# ─────────────────────────────────────────────────────────────────────────────
# Tier 1 — Unit: topological_sort
# ─────────────────────────────────────────────────────────────────────────────


class TestTopologicalSort:
    """topological_sort: dependencies come before their dependents."""

    def test_empty_dag(self) -> None:
        """An empty DAG sorts to an empty list."""
        dag = build_dag([])
        assert topological_sort(dag) == []

    def test_linear_chain_abc(self) -> None:
        """A ← B ← C sorts as A, B, C."""
        a, b, c = _ids(3)
        dag = build_dag([(b, a), (c, b)], number_by_id={a: 1, b: 2, c: 3})
        order = topological_sort(dag)
        assert order.index(a) < order.index(b) < order.index(c)

    def test_diamond_shape(self) -> None:
        """Both middle nodes sit between the root and the sink."""
        # A → B, A → C, B → D, C → D
        # (D depends on B and C, which both depend on A)
        a, b, c, d = _ids(4)
        dag = build_dag([(b, a), (c, a), (d, b), (d, c)])
        order = topological_sort(dag)
        assert order.index(a) < order.index(b)
        assert order.index(a) < order.index(c)
        assert order.index(b) < order.index(d)
        assert order.index(c) < order.index(d)

    def test_multi_root(self) -> None:
        """Two independent roots both precede their shared dependent."""
        a, b, c = _ids(3)
        # A and B are independent roots; C depends on both
        dag = build_dag([(c, a), (c, b)])
        order = topological_sort(dag)
        assert order.index(a) < order.index(c)
        assert order.index(b) < order.index(c)

    def test_merged_nodes_excluded_from_output(self) -> None:
        """Merged proposals do not appear in the sorted output."""
        a, b = _ids(2)
        dag = build_dag([(b, a)], merged_ids=[a])
        order = topological_sort(dag)
        assert a not in order
        assert b in order

    def test_cycle_two_nodes_raises(self) -> None:
        """A two-node cycle raises CycleError naming at least one member."""
        a, b = _ids(2)
        dag = build_dag([(a, b), (b, a)])
        with pytest.raises(CycleError) as exc_info:
            topological_sort(dag)
        # Checked outside the ``with`` body so the assertion actually runs.
        cycle_ids = exc_info.value.cycle_ids
        assert a in cycle_ids or b in cycle_ids

    def test_cycle_three_nodes_raises(self) -> None:
        """A three-node cycle raises CycleError with at least two cycle IDs."""
        a, b, c = _ids(3)
        dag = build_dag([(b, a), (c, b), (a, c)])  # A←B←C←A
        with pytest.raises(CycleError) as exc_info:
            topological_sort(dag)
        assert len(exc_info.value.cycle_ids) >= 2

    def test_cycle_error_message_contains_ids(self) -> None:
        """The CycleError message mentions the word "cycle"."""
        # NOTE(review): despite the test name, only the word "cycle" is
        # checked; whether the message embeds the IDs is not asserted here.
        a, b = _ids(2)
        dag = build_dag([(a, b), (b, a)])
        with pytest.raises(CycleError) as exc_info:
            topological_sort(dag)
        msg = str(exc_info.value)
        assert "cycle" in msg.lower()

    def test_merged_breaks_cycle(self) -> None:
        """A merged node on a cycle lets the remaining node sort cleanly."""
        a, b = _ids(2)
        # A depends on B, B depends on A — but A is merged, so B is free
        dag = build_dag([(a, b), (b, a)], merged_ids=[a])
        order = topological_sort(dag)
        assert b in order
        assert a not in order


# ─────────────────────────────────────────────────────────────────────────────
# Tier 1 — Unit: detect_cycle
# ─────────────────────────────────────────────────────────────────────────────


class TestDetectCycle:
    """detect_cycle: boolean cycle check that must not mutate the DAG."""

    def test_no_cycle_returns_false(self) -> None:
        """A single acyclic edge reports no cycle."""
        a, b = _ids(2)
        dag = build_dag([(b, a)])
        assert detect_cycle(dag) is False

    def test_cycle_returns_true(self) -> None:
        """Mutually dependent nodes report a cycle."""
        a, b = _ids(2)
        dag = build_dag([(a, b), (b, a)])
        assert detect_cycle(dag) is True

    def test_no_side_effects_on_dag(self) -> None:
        """Running detect_cycle leaves the existing dependency edges intact."""
        a, b, c = _ids(3)
        dag = build_dag([(b, a), (c, b)])
        # Snapshot only the dependents' edge sets so the comparison catches
        # removed/altered edges without caring about incidental empty entries.
        dependents = (b, c)
        edges_before = {node: set(dag.depends_on[node]) for node in dependents}

        detect_cycle(dag)

        assert c in dag.depends_on
        edges_after = {node: set(dag.depends_on[node]) for node in dependents}
        assert edges_after == edges_before


# ─────────────────────────────────────────────────────────────────────────────
# Tier 1 — Unit: blocked_by_numbers / blocks_numbers / is_blocked
# ─────────────────────────────────────────────────────────────────────────────


class TestBlockedByAndBlocks:
    """blocked_by_numbers / blocks_numbers / is_blocked contracts."""

    def test_no_dependencies_not_blocked(self) -> None:
        """A node with no edges has no blockers."""
        a = _ids(1)[0]
        dag = build_dag([], number_by_id={a: 1})
        assert blocked_by_numbers(dag, a) == []
        assert is_blocked(dag, a) is False

    def test_live_dependency_blocks(self) -> None:
        """An unmerged dependency blocks its dependent."""
        a, b = _ids(2)
        dag = build_dag([(b, a)], number_by_id={a: 1, b: 2})
        assert blocked_by_numbers(dag, b) == [1]
        assert is_blocked(dag, b) is True

    def test_merged_dependency_not_blocking(self) -> None:
        """A merged dependency no longer blocks its dependent."""
        a, b = _ids(2)
        dag = build_dag([(b, a)], merged_ids=[a], number_by_id={a: 1, b: 2})
        assert blocked_by_numbers(dag, b) == []
        assert is_blocked(dag, b) is False

    def test_partial_merge_still_blocked(self) -> None:
        """With one of two dependencies merged, the other still blocks."""
        a, b, c = _ids(3)
        # C depends on both A and B; only A merged
        dag = build_dag(
            [(c, a), (c, b)],
            merged_ids=[a],
            number_by_id={a: 1, b: 2, c: 3},
        )
        assert blocked_by_numbers(dag, c) == [2]  # B still unmerged
        assert is_blocked(dag, c) is True

    def test_blocks_numbers(self) -> None:
        """blocks_numbers lists the numbers of proposals waiting on a node."""
        a, b, c = _ids(3)
        # B and C both depend on A
        dag = build_dag([(b, a), (c, a)], number_by_id={a: 1, b: 2, c: 3})
        assert blocks_numbers(dag, a) == [2, 3]

    def test_blocks_excludes_merged_waiters(self) -> None:
        """A dependent that is already merged is not reported as blocked."""
        a, b = _ids(2)
        dag = build_dag([(b, a)], merged_ids=[b], number_by_id={a: 1, b: 2})
        assert blocks_numbers(dag, a) == []

    def test_blocked_by_sorted_ascending(self) -> None:
        """Blockers are reported in ascending proposal-number order."""
        a, b, c, d = _ids(4)
        # Edges are supplied in descending order on purpose.
        dag = build_dag(
            [(d, c), (d, b), (d, a)],
            number_by_id={a: 1, b: 2, c: 3, d: 4},
        )
        result = blocked_by_numbers(dag, d)
        # Pin the exact list: ``result == sorted(result)`` alone would also
        # pass for an empty result.
        assert result == [1, 2, 3]


# ─────────────────────────────────────────────────────────────────────────────
# Tier 5 — Integration: DB-backed DAG operations
# ─────────────────────────────────────────────────────────────────────────────


class TestDagIntegration:
    """DB-backed tests; each uses the ``db_session`` fixture."""

    @pytest.mark.asyncio
    async def test_create_proposal_persists_dependency_edges(
        self, db_session: AsyncSession
    ) -> None:
        """``depends_on`` results in one MusehubProposalDependency row."""
        from sqlalchemy import select

        from musehub.db.musehub_social_models import MusehubProposalDependency

        repo_id = await _make_repo_with_branches(
            db_session, "feat/p1", "feat/p2", "dev"
        )

        p1 = await _create_proposal(
            db_session, repo_id, from_branch="feat/p1", number=1
        )
        p2 = await _create_proposal(
            db_session,
            repo_id,
            from_branch="feat/p2",
            number=2,
            depends_on=[p1.proposal_id],
        )

        result = await db_session.execute(
            select(MusehubProposalDependency).where(
                MusehubProposalDependency.dependent_proposal_id == p2.proposal_id
            )
        )
        edge_rows = list(result.scalars())
        assert len(edge_rows) == 1
        assert edge_rows[0].dependency_proposal_id == p1.proposal_id

    @pytest.mark.asyncio
    async def test_create_proposal_rejects_unknown_dependency(
        self, db_session: AsyncSession
    ) -> None:
        """A dependency ID that matches no proposal raises ValueError."""
        repo_id = await _make_repo_with_branches(db_session, "feat/p1", "dev")

        fake_id = "sha256:" + "f" * 64
        with pytest.raises(ValueError, match="unknown proposal IDs"):
            await _create_proposal(
                db_session, repo_id, from_branch="feat/p1", depends_on=[fake_id]
            )

    @pytest.mark.asyncio
    async def test_create_proposal_rejects_cycle(
        self, db_session: AsyncSession
    ) -> None:
        """Adding a reverse edge to an existing dependency raises CycleError."""
        from musehub.services.proposal_dag import create_dependency_edges

        repo_id = await _make_repo_with_branches(
            db_session, "feat/p1", "feat/p2", "dev"
        )

        p1 = await _create_proposal(
            db_session, repo_id, from_branch="feat/p1", number=1
        )
        p2 = await _create_proposal(
            db_session,
            repo_id,
            from_branch="feat/p2",
            number=2,
            depends_on=[p1.proposal_id],
        )

        # p2 depends on p1. Adding p1 depends-on p2 would create a cycle.
        with pytest.raises(CycleError):
            await create_dependency_edges(
                db_session, p1.proposal_id, [p2.proposal_id]
            )

    @pytest.mark.asyncio
    async def test_enrich_batch_populates_blocked_by(
        self, db_session: AsyncSession
    ) -> None:
        """Batch enrichment fills blocked_by / blocks / is_blocked both ways."""
        from musehub.services.musehub_proposals import enrich_proposal_list_batch

        repo_id = await _make_repo_with_branches(
            db_session, "feat/p1", "feat/p2", "dev"
        )

        p1_resp = await _create_proposal(
            db_session, repo_id, from_branch="feat/p1", number=1
        )
        p2_resp = await _create_proposal(
            db_session,
            repo_id,
            from_branch="feat/p2",
            number=2,
            depends_on=[p1_resp.proposal_id],
        )

        # Enrichment is fed ORM rows re-read from the session.
        p1_orm = await _load_proposal(db_session, p1_resp.proposal_id)
        p2_orm = await _load_proposal(db_session, p2_resp.proposal_id)

        entries = await enrich_proposal_list_batch([p1_orm, p2_orm], db_session)
        by_id = {entry.proposal_id: entry for entry in entries}

        # p2 is blocked by p1
        p2_entry = by_id[p2_orm.proposal_id]
        assert p2_entry.is_blocked is True
        assert p1_orm.proposal_number in p2_entry.blocked_by

        # p1 blocks p2
        p1_entry = by_id[p1_orm.proposal_id]
        assert p2_orm.proposal_number in p1_entry.blocks
        assert p1_entry.is_blocked is False

    @pytest.mark.asyncio
    async def test_all_merge_conditions_false_while_dep_unmerged(
        self, db_session: AsyncSession
    ) -> None:
        """all_merge_conditions_met is False while the dependency is unmerged."""
        from musehub.services.musehub_proposals import enrich_proposal_list_batch

        repo_id = await _make_repo_with_branches(
            db_session, "feat/p1", "feat/p2", "dev"
        )

        p1 = await _create_proposal(
            db_session, repo_id, from_branch="feat/p1", number=1
        )
        p2_orm = await _create_proposal(
            db_session,
            repo_id,
            from_branch="feat/p2",
            number=2,
            depends_on=[p1.proposal_id],
        )

        p2_fresh = await _load_proposal(db_session, p2_orm.proposal_id)
        entries = await enrich_proposal_list_batch([p2_fresh], db_session)
        assert entries[0].all_merge_conditions_met is False

    @pytest.mark.asyncio
    async def test_merge_proposal_gated_by_unmerged_dep(
        self, db_session: AsyncSession
    ) -> None:
        """merge_proposal raises RuntimeError while a dependency is unmerged."""
        from musehub.services.musehub_proposals import merge_proposal

        repo_id = await _make_repo_with_branches(
            db_session, "feat/p1", "feat/p2", "dev"
        )

        p1 = await _create_proposal(
            db_session, repo_id, from_branch="feat/p1", number=1
        )
        p2_orm = await _create_proposal(
            db_session,
            repo_id,
            from_branch="feat/p2",
            number=2,
            depends_on=[p1.proposal_id],
        )

        with pytest.raises(RuntimeError, match="unmerged dependencies"):
            await merge_proposal(db_session, repo_id, p2_orm.proposal_id)