"""Phase 4 — Simulation Engine tests (issue #37). Tier 1 — Unit (pure, no DB) simulate_conflict_scan: - no conflicts when branches don't share files - conflicts surfaced when both sides changed the same file (with ancestor) - conflicts_by_domain groups paths correctly - files_added / files_modified / files_removed counts forwarded from strategy - strategy_used reflects the chosen strategy simulate_risk_projection: - zero risk when nothing changes - change ratio drives risk_band upward - conflict files increase conflict_ratio component - existing dimensional_risk factored in at 20% weight - risk_delta positive when post-merge risk exceeds pre-merge - risk_delta negative when merge reduces per-domain risk (edge case) - domains_affected lists only changed domains simulate_dependency_order: - linear chain yields correct order - two independent nodes appear in phase 1 - node with one dep lands in phase 2 - cycle detected returns cycle_detected=True and cycle_ids - merged deps excluded from blocking (live-edge filtering) Tier 2 — genesis compute_simulation_id: - deterministic for same inputs - different simulation_type produces different ID - different from_branch_commit_id produces different ID Tier 5 — Integration (DB) run_simulation: - conflict_scan runs and returns SimulationResponse - risk_projection runs and returns SimulationResponse - dependency_order runs and returns SimulationResponse - re-running updates the cached row (upsert semantics) - unknown simulation_type raises ValueError get_simulation: - returns None when not yet run - returns cached result after run - is_stale=True when from_branch head advances list_simulations: - empty list before any simulations run - all three types listed after running each """ from __future__ import annotations import os from datetime import datetime, timezone from typing import Any import pytest from muse.core.types import blob_id, fake_id, short_id from sqlalchemy.ext.asyncio import AsyncSession from musehub.types.json_types import StrDict from musehub.services.proposal_simulation import ( simulate_conflict_scan, simulate_dependency_order, simulate_risk_projection, ) from musehub.services.proposal_dag import ProposalDag # ───────────────────────────────────────────────────────────────────────────── # Helpers # ───────────────────────────────────────────────────────────────────────────── def _now() -> datetime: return datetime.now(tz=timezone.utc) def _uid() -> str: return short_id(blob_id(os.urandom(16)), strip=True) def _oid(label: int | str) -> str: return fake_id(str(label)) def _dag( *, depends_on: dict[str, set[str]] | None = None, merged_ids: set[str] | None = None, ) -> ProposalDag: dep_input = depends_on or {} merged = merged_ids or set() nodes: set[str] = set() for pid, deps in dep_input.items(): nodes.add(pid) nodes.update(deps) # Every node must be a key in depends_on (dependency nodes have empty sets) dep: dict[str, set[str]] = {n: set() for n in nodes} dep.update(dep_input) required_by: dict[str, set[str]] = {} for pid, deps in dep.items(): for d in deps: required_by.setdefault(d, set()).add(pid) return ProposalDag( depends_on=dep, required_by=required_by, nodes=nodes, merged_ids=merged, number_by_id={n: i for i, n in enumerate(sorted(nodes))}, ) # ───────────────────────────────────────────────────────────────────────────── # Tier 1 — simulate_conflict_scan # ───────────────────────────────────────────────────────────────────────────── class TestConflictScan: def test_no_conflicts_when_disjoint_files(self) -> None: to = {"a.py": _oid(1)} frm = {"b.py": _oid(2)} r = simulate_conflict_scan(to, frm) assert r["conflict_count"] == 0 assert r["conflicting_files"] == [] def test_conflicts_surfaced_with_ancestor(self) -> None: anc = {"shared.py": _oid(0)} to = {"shared.py": _oid(1)} frm = {"shared.py": _oid(2)} r = simulate_conflict_scan(to, frm, ancestor_manifest=anc) assert r["conflict_count"] == 1 assert "shared.py" in r["conflicting_files"] def test_no_conflict_when_only_one_side_changed(self) -> None: anc = {"shared.py": _oid(0), "other.py": _oid(1)} to = {"shared.py": _oid(0), "other.py": _oid(2)} # shared unchanged frm = {"shared.py": _oid(3)} # from changed shared r = simulate_conflict_scan(to, frm, ancestor_manifest=anc) assert r["conflict_count"] == 0 def test_conflicts_by_domain_groups_paths(self) -> None: anc = {"src/a.py": _oid(0), "tracks/b.mid": _oid(1)} to = {"src/a.py": _oid(2), "tracks/b.mid": _oid(3)} frm = {"src/a.py": _oid(4), "tracks/b.mid": _oid(5)} r = simulate_conflict_scan(to, frm, ancestor_manifest=anc) assert r["conflict_count"] == 2 assert "code" in r["conflicts_by_domain"] assert "midi" in r["conflicts_by_domain"] assert "src/a.py" in r["conflicts_by_domain"]["code"] assert "tracks/b.mid" in r["conflicts_by_domain"]["midi"] def test_files_added_count(self) -> None: to = {"a.py": _oid(1)} frm = {"a.py": _oid(2), "new.py": _oid(3)} # new.py added r = simulate_conflict_scan(to, frm) assert r["files_added"] == 1 def test_strategy_used_reflected(self) -> None: r = simulate_conflict_scan({}, {"x.py": _oid(1)}, strategy="overlay") assert r["strategy_used"] == "overlay" def test_domains_affected_lists_changed_domains(self) -> None: frm = {"tracks/beat.mid": _oid(1), "src/main.py": _oid(2)} r = simulate_conflict_scan({}, frm) assert "midi" in r["domains_affected"] assert "code" in r["domains_affected"] def test_empty_manifests_zero_conflicts(self) -> None: r = simulate_conflict_scan({}, {}) assert r["conflict_count"] == 0 assert r["files_added"] == 0 assert r["files_modified"] == 0 assert r["files_removed"] == 0 # ───────────────────────────────────────────────────────────────────────────── # Tier 1 — simulate_risk_projection # ───────────────────────────────────────────────────────────────────────────── class TestRiskProjection: def test_zero_risk_when_nothing_changes(self) -> None: manifest = {"a.py": _oid(1)} r = simulate_risk_projection(manifest, manifest) assert r["overall_projected_risk"] == 0.0 assert r["risk_band"] == "low" assert r["files_changed_count"] == 0 def test_risk_band_low_for_small_change(self) -> None: to = {f"f{i}.py": _oid(i) for i in range(20)} frm = dict(to) frm["f0.py"] = _oid(99) # one file changed r = simulate_risk_projection(to, frm) assert r["risk_band"] == "low" def test_risk_band_increases_with_conflicts(self) -> None: anc = {"a.py": _oid(0)} to = {"a.py": _oid(1)} frm = {"a.py": _oid(2)} r = simulate_risk_projection(to, frm, ancestor_manifest=anc) # conflict_ratio = 1.0 → risk component is 0.4 * change_ratio + 0.4 * 1.0 # at minimum the conflict component pushes risk above 0 assert r["overall_projected_risk"] > 0 def test_existing_risk_factored_in(self) -> None: to = {"a.py": _oid(1)} frm = {"a.py": _oid(2)} r_no_prior = simulate_risk_projection(to, frm) r_with_prior = simulate_risk_projection( to, frm, current_dimensional_risk={"code": 0.9} ) # Adding existing high risk should increase or maintain overall projected risk assert r_with_prior["overall_projected_risk"] >= r_no_prior["overall_projected_risk"] def test_risk_delta_positive_when_risk_increases(self) -> None: to = {"a.py": _oid(1)} frm = {"a.py": _oid(2)} r = simulate_risk_projection(to, frm, current_dimensional_risk={"code": 0.0}) # Changing a file with zero prior risk → delta > 0 assert r["risk_delta"] >= 0.0 def test_domains_affected_populated(self) -> None: to = {"src/main.py": _oid(1)} frm = {"src/main.py": _oid(2), "tracks/beat.mid": _oid(3)} r = simulate_risk_projection(to, frm) assert "code" in r["domains_affected"] assert "midi" in r["domains_affected"] def test_files_changed_count(self) -> None: to = {"a.py": _oid(1), "b.py": _oid(2)} frm = {"a.py": _oid(3), "b.py": _oid(2)} # only a changed r = simulate_risk_projection(to, frm) assert r["files_changed_count"] == 1 def test_conflict_count_matches_merge_result(self) -> None: anc = {"a.py": _oid(0), "b.py": _oid(1)} to = {"a.py": _oid(2), "b.py": _oid(3)} frm = {"a.py": _oid(4), "b.py": _oid(5)} r = simulate_risk_projection(to, frm, ancestor_manifest=anc) assert r["conflict_count"] == 2 # ───────────────────────────────────────────────────────────────────────────── # Tier 1 — simulate_dependency_order # ───────────────────────────────────────────────────────────────────────────── class TestDependencyOrder: def test_single_node_no_deps(self) -> None: dag = _dag(depends_on={"p1": set()}) r = simulate_dependency_order(dag) assert r["cycle_detected"] is False assert r["node_count"] == 1 assert "p1" in r["order"] def test_linear_chain_correct_order(self) -> None: # p3 depends on p2, p2 depends on p1 dag = _dag(depends_on={"p2": {"p1"}, "p3": {"p2"}}) r = simulate_dependency_order(dag) assert r["cycle_detected"] is False order = r["order"] assert order.index("p1") < order.index("p2") < order.index("p3") def test_two_independent_nodes_in_phase_1(self) -> None: dag = _dag(depends_on={"p1": set(), "p2": set()}) r = simulate_dependency_order(dag) assert len(r["phases"]) == 1 assert set(r["phases"][0]) == {"p1", "p2"} def test_node_with_dep_in_phase_2(self) -> None: dag = _dag(depends_on={"p1": set(), "p2": {"p1"}}) r = simulate_dependency_order(dag) assert len(r["phases"]) == 2 assert "p1" in r["phases"][0] assert "p2" in r["phases"][1] def test_cycle_detected(self) -> None: # p1 → p2 → p1 dag = _dag(depends_on={"p1": {"p2"}, "p2": {"p1"}}) r = simulate_dependency_order(dag) assert r["cycle_detected"] is True assert len(r["cycle_ids"]) > 0 assert r["order"] == [] assert r["phases"] == [] def test_merged_deps_excluded_from_blocking(self) -> None: # p2 depends on p1; p1 is already merged → p2 has no live blocks dag = _dag(depends_on={"p2": {"p1"}}, merged_ids={"p1"}) r = simulate_dependency_order(dag) assert r["cycle_detected"] is False # p2 is the only unmerged node assert "p2" in r["order"] def test_phase_count_equals_len_phases(self) -> None: dag = _dag(depends_on={"p1": set(), "p2": {"p1"}, "p3": {"p2"}}) r = simulate_dependency_order(dag) assert r["phase_count"] == len(r["phases"]) def test_empty_dag_returns_empty(self) -> None: dag = ProposalDag( depends_on={}, required_by={}, nodes=set(), merged_ids=set(), number_by_id={}, ) r = simulate_dependency_order(dag) assert r["order"] == [] assert r["phases"] == [] assert r["cycle_detected"] is False # ───────────────────────────────────────────────────────────────────────────── # Tier 2 — compute_simulation_id # ───────────────────────────────────────────────────────────────────────────── class TestComputeSimulationId: def test_deterministic(self) -> None: from musehub.core.genesis import compute_simulation_id a = compute_simulation_id("prop-1", "conflict_scan", "sha256:abc") b = compute_simulation_id("prop-1", "conflict_scan", "sha256:abc") assert a == b def test_different_type_different_id(self) -> None: from musehub.core.genesis import compute_simulation_id a = compute_simulation_id("prop-1", "conflict_scan", "sha256:abc") b = compute_simulation_id("prop-1", "risk_projection", "sha256:abc") assert a != b def test_different_commit_different_id(self) -> None: from musehub.core.genesis import compute_simulation_id a = compute_simulation_id("prop-1", "conflict_scan", "sha256:aaa") b = compute_simulation_id("prop-1", "conflict_scan", "sha256:bbb") assert a != b def test_sha256_prefixed(self) -> None: from musehub.core.genesis import compute_simulation_id sid = compute_simulation_id("prop-1", "conflict_scan", "sha256:abc") assert sid.startswith("sha256:") # ───────────────────────────────────────────────────────────────────────────── # Tier 5 — Integration (DB) # ───────────────────────────────────────────────────────────────────────────── async def _make_repo(session: AsyncSession) -> str: from musehub.core.genesis import compute_identity_id, compute_repo_id from musehub.db.musehub_repo_models import MusehubBranch, MusehubCommit, MusehubRepo from musehub.db.musehub_social_models import MusehubProposalSimulation owner = "simtest" slug = f"sim-{_uid()}" owner_id = compute_identity_id(owner.encode()) created_at = _now() repo = MusehubRepo( repo_id=compute_repo_id(owner_id, slug, "code", created_at.isoformat()), name=slug, owner=owner, slug=slug, visibility="public", owner_user_id=owner_id, description="", tags=[], created_at=created_at, ) session.add(repo) await session.flush() return repo.repo_id async def _make_branch_with_commit( session: AsyncSession, repo_id: str, branch_name: str, manifest: StrDict, ) -> str: """Create branch + commit + snapshot. Returns commit_id.""" from musehub.core.genesis import compute_branch_id, compute_identity_id from musehub.db.musehub_repo_models import MusehubBranch, MusehubCommit, MusehubCommitRef, MusehubRepo from musehub.db.musehub_social_models import MusehubProposalSimulation from musehub.muse_cli.snapshot import compute_commit_id, compute_snapshot_id from musehub.services.musehub_snapshot import upsert_snapshot_entries created_at = _now() snapshot_id = compute_snapshot_id(manifest) await upsert_snapshot_entries(session, repo_id, snapshot_id, manifest) commit_id = compute_commit_id( [], snapshot_id, f"init {branch_name}", created_at.isoformat(), author="simtest", signer_public_key="", ) commit = MusehubCommit( commit_id=commit_id, branch=branch_name, parent_ids=[], message=f"init {branch_name}", author="simtest", timestamp=created_at, snapshot_id=snapshot_id, ) session.add(commit) session.add(MusehubCommitRef(repo_id=repo_id, commit_id=commit_id)) branch = MusehubBranch( branch_id=compute_branch_id(repo_id, branch_name), repo_id=repo_id, name=branch_name, head_commit_id=commit_id, ) session.add(branch) await session.flush() return commit_id async def _make_proposal( session: AsyncSession, repo_id: str, from_branch: str = "feat/x", to_branch: str = "dev", ) -> str: """Create a proposal; returns proposal_id.""" from musehub.services.musehub_proposals import create_proposal p = await create_proposal( session, repo_id=repo_id, title="sim test proposal", from_branch=from_branch, to_branch=to_branch, author="simtest", author_identity_id=fake_id("simtest-identity"), ) return p.proposal_id class TestRunSimulation: @pytest.mark.asyncio async def test_conflict_scan_runs(self, db_session: AsyncSession) -> None: from musehub.services.musehub_proposals import run_simulation repo_id = await _make_repo(db_session) await _make_branch_with_commit( db_session, repo_id, "dev", {"a.py": _oid(1)} ) await _make_branch_with_commit( db_session, repo_id, "feat/x", {"a.py": _oid(2), "b.py": _oid(3)} ) proposal_id = await _make_proposal(db_session, repo_id) result = await run_simulation(db_session, repo_id, proposal_id, "conflict_scan") assert result.simulation_type == "conflict_scan" assert result.proposal_id == proposal_id assert "conflict_count" in result.result assert result.is_stale is False assert result.simulation_id.startswith("sha256:") @pytest.mark.asyncio async def test_risk_projection_runs(self, db_session: AsyncSession) -> None: from musehub.services.musehub_proposals import run_simulation repo_id = await _make_repo(db_session) await _make_branch_with_commit(db_session, repo_id, "dev", {"a.py": _oid(1)}) await _make_branch_with_commit( db_session, repo_id, "feat/x", {"a.py": _oid(2)} ) proposal_id = await _make_proposal(db_session, repo_id) result = await run_simulation(db_session, repo_id, proposal_id, "risk_projection") assert result.simulation_type == "risk_projection" assert "overall_projected_risk" in result.result assert "risk_band" in result.result @pytest.mark.asyncio async def test_dependency_order_runs(self, db_session: AsyncSession) -> None: from musehub.services.musehub_proposals import run_simulation repo_id = await _make_repo(db_session) await _make_branch_with_commit(db_session, repo_id, "dev", {}) await _make_branch_with_commit(db_session, repo_id, "feat/x", {}) proposal_id = await _make_proposal(db_session, repo_id) result = await run_simulation(db_session, repo_id, proposal_id, "dependency_order") assert result.simulation_type == "dependency_order" assert "order" in result.result assert "phases" in result.result assert result.result["cycle_detected"] is False @pytest.mark.asyncio async def test_rerun_upserts_row(self, db_session: AsyncSession) -> None: from musehub.services.musehub_proposals import run_simulation from musehub.db.musehub_repo_models import MusehubBranch, MusehubCommit, MusehubRepo from musehub.db.musehub_social_models import MusehubProposalSimulation from sqlalchemy import select, func repo_id = await _make_repo(db_session) await _make_branch_with_commit(db_session, repo_id, "dev", {}) await _make_branch_with_commit(db_session, repo_id, "feat/x", {}) proposal_id = await _make_proposal(db_session, repo_id) await run_simulation(db_session, repo_id, proposal_id, "conflict_scan") await run_simulation(db_session, repo_id, proposal_id, "conflict_scan") count = ( await db_session.execute( select(func.count()).select_from(MusehubProposalSimulation).where( MusehubProposalSimulation.proposal_id == proposal_id, MusehubProposalSimulation.simulation_type == "conflict_scan", ) ) ).scalar_one() assert count == 1 # exactly one row — upsert not insert @pytest.mark.asyncio async def test_unknown_type_raises(self, db_session: AsyncSession) -> None: from musehub.services.musehub_proposals import run_simulation repo_id = await _make_repo(db_session) await _make_branch_with_commit(db_session, repo_id, "dev", {}) await _make_branch_with_commit(db_session, repo_id, "feat/x", {}) proposal_id = await _make_proposal(db_session, repo_id) with pytest.raises(ValueError, match="Unknown simulation_type"): await run_simulation(db_session, repo_id, proposal_id, "not_a_type") class TestGetSimulation: @pytest.mark.asyncio async def test_returns_none_before_run(self, db_session: AsyncSession) -> None: from musehub.services.musehub_proposals import get_simulation repo_id = await _make_repo(db_session) await _make_branch_with_commit(db_session, repo_id, "dev", {}) await _make_branch_with_commit(db_session, repo_id, "feat/x", {}) proposal_id = await _make_proposal(db_session, repo_id) result = await get_simulation(db_session, repo_id, proposal_id, "conflict_scan") assert result is None @pytest.mark.asyncio async def test_returns_cached_after_run(self, db_session: AsyncSession) -> None: from musehub.services.musehub_proposals import get_simulation, run_simulation repo_id = await _make_repo(db_session) await _make_branch_with_commit(db_session, repo_id, "dev", {}) await _make_branch_with_commit(db_session, repo_id, "feat/x", {}) proposal_id = await _make_proposal(db_session, repo_id) await run_simulation(db_session, repo_id, proposal_id, "conflict_scan") result = await get_simulation(db_session, repo_id, proposal_id, "conflict_scan") assert result is not None assert result.simulation_type == "conflict_scan" assert result.is_stale is False @pytest.mark.asyncio async def test_is_stale_when_branch_advances(self, db_session: AsyncSession) -> None: from musehub.services.musehub_proposals import get_simulation, run_simulation from musehub.core.genesis import compute_branch_id from musehub.db.musehub_repo_models import MusehubBranch, MusehubCommit, MusehubCommitRef, MusehubRepo from musehub.db.musehub_social_models import MusehubProposalSimulation from musehub.muse_cli.snapshot import compute_commit_id, compute_snapshot_id from musehub.services.musehub_snapshot import upsert_snapshot_entries repo_id = await _make_repo(db_session) await _make_branch_with_commit(db_session, repo_id, "dev", {}) await _make_branch_with_commit( db_session, repo_id, "feat/x", {"a.py": _oid(1)} ) proposal_id = await _make_proposal(db_session, repo_id) # Run simulation with initial commit await run_simulation(db_session, repo_id, proposal_id, "conflict_scan") # Advance from_branch to a new commit new_manifest = {"a.py": _oid(99), "extra.py": _oid(100)} new_snapshot = compute_snapshot_id(new_manifest) await upsert_snapshot_entries(db_session, repo_id, new_snapshot, new_manifest) new_commit_id = compute_commit_id( [], new_snapshot, "advance", _now().isoformat(), author="simtest", signer_public_key="", ) new_commit = MusehubCommit( commit_id=new_commit_id, branch="feat/x", parent_ids=[], message="advance", author="simtest", timestamp=_now(), snapshot_id=new_snapshot, ) db_session.add(new_commit) db_session.add(MusehubCommitRef(repo_id=repo_id, commit_id=new_commit_id)) # Update branch head branch_id = compute_branch_id(repo_id, "feat/x") branch_row = await db_session.get(MusehubBranch, branch_id) assert branch_row is not None branch_row.head_commit_id = new_commit_id await db_session.flush() result = await get_simulation(db_session, repo_id, proposal_id, "conflict_scan") assert result is not None assert result.is_stale is True class TestListSimulations: @pytest.mark.asyncio async def test_empty_before_any_run(self, db_session: AsyncSession) -> None: from musehub.services.musehub_proposals import list_simulations repo_id = await _make_repo(db_session) await _make_branch_with_commit(db_session, repo_id, "dev", {}) await _make_branch_with_commit(db_session, repo_id, "feat/x", {}) proposal_id = await _make_proposal(db_session, repo_id) resp = await list_simulations(db_session, repo_id, proposal_id) assert resp.total == 0 assert resp.simulations == [] @pytest.mark.asyncio async def test_all_three_types_listed(self, db_session: AsyncSession) -> None: from musehub.services.musehub_proposals import list_simulations, run_simulation repo_id = await _make_repo(db_session) await _make_branch_with_commit(db_session, repo_id, "dev", {}) await _make_branch_with_commit(db_session, repo_id, "feat/x", {}) proposal_id = await _make_proposal(db_session, repo_id) for sim_type in ("conflict_scan", "risk_projection", "dependency_order"): await run_simulation(db_session, repo_id, proposal_id, sim_type) resp = await list_simulations(db_session, repo_id, proposal_id) assert resp.total == 3 types_returned = {s.simulation_type for s in resp.simulations} assert types_returned == {"conflict_scan", "risk_projection", "dependency_order"}