"""Tests for ``muse coord forecast``. Coverage matrix --------------- Unit ~~~~ * :class:`_ConflictPrediction` — construction, ``to_dict`` * Pass 1: direct address overlap detection * Pass 2: blast-radius overlap — call graph available path * Pass 2: call graph unavailable — ``(OSError, KeyError, ValueError, AttributeError)`` produces a warning, not a silent skip and not a crash * Pass 2: no commits yet → warning emitted * Pass 3: operation conflict detection Integration ~~~~~~~~~~~ * ``muse coord forecast`` — empty swarm * ``muse coord forecast`` — address_overlap detected * ``muse coord forecast`` — operation_conflict detected * ``muse coord forecast --branch`` filtering * ``muse coord forecast --format json`` — schema complete, all required keys * ``muse coord forecast --json`` — shorthand works * ``muse coord forecast`` text output — warnings visible when call graph absent * JSON: ``call_graph_available`` is ``False`` when index missing * JSON: ``partial_forecast`` true when any pass was skipped * JSON: ``warnings`` non-empty when call graph absent * JSON: ``duration_ms`` present and non-negative * JSON: ``current_branch`` and ``branch_filter`` present * JSON: compact output (no indent=2 newlines) * Text output: released reservations excluded (active_reservations) Input validation ~~~~~~~~~~~~~~~~ * ``--run-id`` at max length (256): accepted * ``--run-id`` over max length: exits USER_ERROR (1), compact JSON error * ``--min-confidence`` at boundary 0.0: accepted (shows all) * ``--min-confidence`` at boundary 1.0: accepted (shows only certainties) * ``--min-confidence`` = 1.5: exits USER_ERROR (1), compact JSON error * ``--min-confidence`` = -0.1: exits USER_ERROR (1), compact JSON error * validation fires before any file I/O --run-id filter ~~~~~~~~~~~~~~~ * filters to conflicts involving the named agent * unknown agent returns empty conflicts list * does not affect reservations/intents counts in output --min-confidence filter ~~~~~~~~~~~~~~~~~~~~~~~ * hides conflicts below threshold * shows conflicts at exactly the threshold * 0.9 threshold: only high-risk conflicts * risk counts (high/medium/low) reflect post-filter list Security ~~~~~~~~ * ANSI escape sequences in run_id / branch / address sanitized before output * Control characters stripped from agent labels in text output * address_glob is never used for filesystem access Stress ~~~~~~ * 100 reservations × 100 addresses → Pass 1 in < 2 s * 50 reservations → Pass 2 with mock call graph in < 1 s * 200 intents → Pass 3 in < 1 s * 500 reservations / 500 intents — combined forecast in < 5 s """ from __future__ import annotations import argparse import datetime import json import os import pathlib import time from unittest.mock import MagicMock, patch import pytest from muse.cli.commands.forecast import _MAX_RUN_ID_LEN from muse.core.types import MsgpackDict, fake_id from muse.core.paths import muse_dir from muse.core.coordination import Reservation from muse.core.errors import ExitCode # ── Helpers ─────────────────────────────────────────────────────────────────── def _make_repo(tmp_path: pathlib.Path) -> pathlib.Path: import json as _json dot_muse = muse_dir(tmp_path) dot_muse.mkdir() (dot_muse / "HEAD").write_text("ref: refs/heads/main\n") (dot_muse / "repo.json").write_text( _json.dumps({"repo_id": fake_id("repo"), "name": "test-repo"}) ) return tmp_path def _now_utc() -> datetime.datetime: return datetime.datetime.now(datetime.timezone.utc) def _run_forecast( repo: pathlib.Path, *, branch_filter: str | None = None, run_id_filter: str | None = None, min_confidence: float = 0.0, fmt: str = "json", ) -> tuple[int, MsgpackDict]: """Run the forecast command and return (exit_code, parsed_output).""" from muse.cli.commands.forecast import run as forecast_run ns = argparse.Namespace( branch_filter=branch_filter, run_id_filter=run_id_filter, min_confidence=min_confidence, json_out=fmt == "json", ) old = os.getcwd() os.chdir(repo) try: forecast_run(ns) return 0, {} except SystemExit as exc: return exc.code, {} finally: os.chdir(old) def _run_forecast_json( repo: pathlib.Path, *, branch_filter: str | None = None, run_id_filter: str | None = None, min_confidence: float = 0.0, capsys: pytest.CaptureFixture[str], ) -> MsgpackDict: """Run forecast in JSON mode and return the parsed output dict.""" _run_forecast( repo, branch_filter=branch_filter, run_id_filter=run_id_filter, min_confidence=min_confidence, fmt="json", ) return json.loads(capsys.readouterr().out) def _run_forecast_text( repo: pathlib.Path, *, branch_filter: str | None = None, run_id_filter: str | None = None, min_confidence: float = 0.0, capsys: pytest.CaptureFixture[str], ) -> str: """Run forecast in text mode and return the captured output.""" _run_forecast( repo, branch_filter=branch_filter, run_id_filter=run_id_filter, min_confidence=min_confidence, fmt="text", ) return capsys.readouterr().out # ── Fixtures ────────────────────────────────────────────────────────────────── @pytest.fixture() def repo(tmp_path: pathlib.Path) -> pathlib.Path: return _make_repo(tmp_path) @pytest.fixture() def repo_with_conflict(repo: pathlib.Path) -> tuple[pathlib.Path, Reservation, Reservation]: """Two active reservations sharing an address on different branches.""" from muse.core.coordination import create_reservation res_a = create_reservation( repo, run_id="agent-41", branch="main", addresses=["src/billing.py::compute_total"], ttl_seconds=3600, ) res_b = create_reservation( repo, run_id="agent-42", branch="feature/billing", addresses=["src/billing.py::compute_total"], ttl_seconds=3600, ) return repo, res_a, res_b @pytest.fixture() def repo_with_op_conflict(repo: pathlib.Path) -> pathlib.Path: """One agent intends delete, another intends modify on same address.""" from muse.core.coordination import create_reservation, create_intent res_a = create_reservation( repo, run_id="agent-del", branch="main", addresses=["src/api.py::old_endpoint"], ttl_seconds=3600, ) res_b = create_reservation( repo, run_id="agent-mod", branch="feat/new", addresses=["src/api.py::old_endpoint"], ttl_seconds=3600, ) create_intent( repo, res_a.reservation_id, "agent-del", "main", ["src/api.py::old_endpoint"], "delete", "removing deprecated endpoint", ) create_intent( repo, res_b.reservation_id, "agent-mod", "feat/new", ["src/api.py::old_endpoint"], "modify", "extend response shape", ) return repo # ───────────────────────────────────────────────────────────────────────────── # Unit tests — _ConflictPrediction # ───────────────────────────────────────────────────────────────────────────── class TestConflictPrediction: def test_to_dict_keys(self) -> None: from muse.cli.commands.forecast import _ConflictPrediction c = _ConflictPrediction( conflict_type="address_overlap", addresses=["src/billing.py::compute_total"], agents=["agent-41@main", "agent-42@feat"], confidence=1.0, description="direct overlap", ) d = c.to_dict() assert set(d.keys()) == { "conflict_type", "addresses", "agents", "confidence", "description" } def test_confidence_rounded(self) -> None: from muse.cli.commands.forecast import _ConflictPrediction c = _ConflictPrediction("x", [], [], 0.749999, "") assert c.to_dict()["confidence"] == 0.75 def test_all_conflict_types(self) -> None: from muse.cli.commands.forecast import _ConflictPrediction for ctype in ("address_overlap", "blast_radius_overlap", "operation_conflict"): c = _ConflictPrediction(ctype, [], [], 0.9, "desc") assert c.to_dict()["conflict_type"] == ctype # ───────────────────────────────────────────────────────────────────────────── # Unit tests — Pass 1: address overlap # ───────────────────────────────────────────────────────────────────────────── class TestPass1AddressOverlap: def test_overlap_detected(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: from muse.core.coordination import create_reservation create_reservation( repo, run_id="ag-1", branch="b1", addresses=["src/x.py::fn"], ttl_seconds=3600, ) create_reservation( repo, run_id="ag-2", branch="b2", addresses=["src/x.py::fn"], ttl_seconds=3600, ) data = _run_forecast_json(repo, capsys=capsys) conflict_types = [c["conflict_type"] for c in data["conflicts"]] assert "address_overlap" in conflict_types def test_no_overlap_when_different_addresses(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: from muse.core.coordination import create_reservation create_reservation( repo, run_id="ag-1", branch="b", addresses=["x.py::a"], ttl_seconds=3600, ) create_reservation( repo, run_id="ag-2", branch="b", addresses=["x.py::b"], ttl_seconds=3600, ) data = _run_forecast_json(repo, capsys=capsys) conflict_types = [c["conflict_type"] for c in data["conflicts"]] assert "address_overlap" not in conflict_types def test_same_agent_same_address_no_conflict(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: from muse.core.coordination import create_reservation create_reservation( repo, run_id="same-agent", branch="b", addresses=["x.py::fn"], ttl_seconds=3600, ) data = _run_forecast_json(repo, capsys=capsys) assert data["conflicts"] == [] def test_overlap_confidence_is_1(self, repo_with_conflict: tuple[pathlib.Path, Reservation, Reservation], capsys: pytest.CaptureFixture[str]) -> None: repo, *_ = repo_with_conflict data = _run_forecast_json(repo, capsys=capsys) overlaps = [c for c in data["conflicts"] if c["conflict_type"] == "address_overlap"] assert all(c["confidence"] == 1.0 for c in overlaps) def test_multiple_overlaps_all_reported(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: from muse.core.coordination import create_reservation for i in range(3): create_reservation( repo, run_id=f"ag-{i}", branch="b", addresses=[f"x.py::fn{i}", "shared.py::util"], ttl_seconds=3600, ) data = _run_forecast_json(repo, capsys=capsys) overlaps = [c for c in data["conflicts"] if c["conflict_type"] == "address_overlap"] # shared.py::util is claimed by 3 agents → 1 overlap entry. assert len(overlaps) >= 1 # ───────────────────────────────────────────────────────────────────────────── # Unit tests — Pass 2: blast-radius (call graph) # ───────────────────────────────────────────────────────────────────────────── class TestPass2BlastRadius: def test_call_graph_unavailable_oserror_warns(self, repo_with_conflict: tuple[pathlib.Path, Reservation, Reservation], capsys: pytest.CaptureFixture[str]) -> None: """OSError from build_reverse_graph → warning, not silent pass or crash.""" repo, *_ = repo_with_conflict with ( patch( "muse.cli.commands.forecast.resolve_commit_ref", return_value=MagicMock(commit_id="abc123"), ), patch( "muse.cli.commands.forecast.get_commit_snapshot_manifest", return_value={"src/billing.py": "sha"}, ), patch( "muse.cli.commands.forecast.build_reverse_graph", side_effect=OSError("index file missing"), ), ): data = _run_forecast_json(repo, capsys=capsys) assert data["call_graph_available"] is False assert any("call graph unavailable" in w for w in data["warnings"]) # Direct overlap still detected (Pass 1 always runs). assert any(c["conflict_type"] == "address_overlap" for c in data["conflicts"]) def test_call_graph_unavailable_keyerror_warns(self, repo_with_conflict: tuple[pathlib.Path, Reservation, Reservation], capsys: pytest.CaptureFixture[str]) -> None: repo, *_ = repo_with_conflict with ( patch("muse.cli.commands.forecast.resolve_commit_ref", return_value=MagicMock(commit_id="abc123")), patch("muse.cli.commands.forecast.get_commit_snapshot_manifest", return_value={}), patch("muse.cli.commands.forecast.build_reverse_graph", side_effect=KeyError("missing key")), ): data = _run_forecast_json(repo, capsys=capsys) assert data["call_graph_available"] is False assert data["warnings"] def test_call_graph_unavailable_valueerror_warns(self, repo_with_conflict: tuple[pathlib.Path, Reservation, Reservation], capsys: pytest.CaptureFixture[str]) -> None: repo, *_ = repo_with_conflict with ( patch("muse.cli.commands.forecast.resolve_commit_ref", return_value=MagicMock(commit_id="abc123")), patch("muse.cli.commands.forecast.get_commit_snapshot_manifest", return_value={}), patch("muse.cli.commands.forecast.build_reverse_graph", side_effect=ValueError("bad data")), ): data = _run_forecast_json(repo, capsys=capsys) assert data["call_graph_available"] is False assert data["warnings"] def test_call_graph_unavailable_attributeerror_warns(self, repo_with_conflict: tuple[pathlib.Path, Reservation, Reservation], capsys: pytest.CaptureFixture[str]) -> None: repo, *_ = repo_with_conflict with ( patch("muse.cli.commands.forecast.resolve_commit_ref", return_value=MagicMock(commit_id="abc123")), patch("muse.cli.commands.forecast.get_commit_snapshot_manifest", return_value={}), patch("muse.cli.commands.forecast.build_reverse_graph", side_effect=AttributeError("NoneType has no attribute")), ): data = _run_forecast_json(repo, capsys=capsys) assert data["call_graph_available"] is False assert data["warnings"] def test_unexpected_exception_propagates(self, repo_with_conflict: tuple[pathlib.Path, Reservation, Reservation], capsys: pytest.CaptureFixture[str]) -> None: """RuntimeError (unexpected) must NOT be silently swallowed.""" repo, *_ = repo_with_conflict with ( patch("muse.cli.commands.forecast.resolve_commit_ref", return_value=MagicMock(commit_id="abc123")), patch("muse.cli.commands.forecast.get_commit_snapshot_manifest", return_value={}), patch("muse.cli.commands.forecast.build_reverse_graph", side_effect=RuntimeError("index corrupted")), ): with pytest.raises(RuntimeError, match="index corrupted"): _run_forecast(repo, fmt="json") def test_no_commits_warns(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: from muse.core.coordination import create_reservation create_reservation( repo, run_id="ag", branch="main", addresses=["x.py::fn"], ttl_seconds=3600, ) with patch( "muse.cli.commands.forecast.resolve_commit_ref", return_value=None, ): data = _run_forecast_json(repo, capsys=capsys) assert data["call_graph_available"] is False assert any("no commits" in w for w in data["warnings"]) def test_blast_radius_overlap_detected_with_mock_graph(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: """When call graph is available, blast_radius_overlap is detected.""" from muse.core.coordination import create_reservation create_reservation( repo, run_id="ag-1", branch="main", addresses=["billing.py::compute_total"], ttl_seconds=3600, ) create_reservation( repo, run_id="ag-2", branch="feat", addresses=["api.py::process_payment"], ttl_seconds=3600, ) # Mock: each address sees the other in its transitive callers, # so the test is correct regardless of dict iteration order. _caller_map = { "billing.py::compute_total": {1: ["api.py::process_payment"]}, "api.py::process_payment": {1: ["billing.py::compute_total"]}, } with ( patch("muse.cli.commands.forecast.resolve_commit_ref", return_value=MagicMock(commit_id="abc")), patch("muse.cli.commands.forecast.get_commit_snapshot_manifest", return_value={"billing.py": "sha", "api.py": "sha2"}), patch("muse.cli.commands.forecast.build_reverse_graph", return_value={}), patch("muse.cli.commands.forecast.transitive_callers", side_effect=lambda name, rev, max_depth=0: _caller_map.get(name, {})), ): data = _run_forecast_json(repo, capsys=capsys) assert data["call_graph_available"] is True assert any( c["conflict_type"] == "blast_radius_overlap" for c in data["conflicts"] ) blast = next( c for c in data["conflicts"] if c["conflict_type"] == "blast_radius_overlap" ) assert blast["confidence"] == 0.75 # ───────────────────────────────────────────────────────────────────────────── # Unit tests — Pass 3: operation conflicts # ───────────────────────────────────────────────────────────────────────────── class TestPass3OperationConflict: def test_delete_vs_modify_conflict(self, repo_with_op_conflict: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = repo_with_op_conflict data = _run_forecast_json(repo, capsys=capsys) op_conflicts = [ c for c in data["conflicts"] if c["conflict_type"] == "operation_conflict" ] assert len(op_conflicts) == 1 assert op_conflicts[0]["confidence"] == 0.9 assert "src/api.py::old_endpoint" in op_conflicts[0]["addresses"] def test_delete_vs_rename_conflict(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: from muse.core.coordination import create_intent, create_reservation res_a = create_reservation( repo, run_id="ag-del", branch="b", addresses=["x.py::fn"], ttl_seconds=3600, ) res_b = create_reservation( repo, run_id="ag-ren", branch="b2", addresses=["x.py::fn"], ttl_seconds=3600, ) create_intent(repo, res_a.reservation_id, "ag-del", "b", ["x.py::fn"], "delete", "") create_intent(repo, res_b.reservation_id, "ag-ren", "b2", ["x.py::fn"], "rename", "") data = _run_forecast_json(repo, capsys=capsys) op_conflicts = [c for c in data["conflicts"] if c["conflict_type"] == "operation_conflict"] assert op_conflicts def test_delete_vs_extract_conflict(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: from muse.core.coordination import create_intent, create_reservation res_a = create_reservation(repo, run_id="ag-del", branch="b", addresses=["x.py::fn"], ttl_seconds=3600) res_b = create_reservation(repo, run_id="ag-ext", branch="b2", addresses=["x.py::fn"], ttl_seconds=3600) create_intent(repo, res_a.reservation_id, "ag-del", "b", ["x.py::fn"], "delete", "") create_intent(repo, res_b.reservation_id, "ag-ext", "b2", ["x.py::fn"], "extract", "") data = _run_forecast_json(repo, capsys=capsys) assert any(c["conflict_type"] == "operation_conflict" for c in data["conflicts"]) def test_no_conflict_same_op_same_agent(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: from muse.core.coordination import create_intent, create_reservation res = create_reservation(repo, run_id="ag", branch="b", addresses=["x.py::fn"], ttl_seconds=3600) create_intent(repo, res.reservation_id, "ag", "b", ["x.py::fn"], "modify", "") create_intent(repo, res.reservation_id, "ag", "b", ["x.py::fn"], "modify", "again") data = _run_forecast_json(repo, capsys=capsys) assert not any(c["conflict_type"] == "operation_conflict" for c in data["conflicts"]) def test_two_modifies_no_conflict(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: """Two agents both modifying → no operation_conflict (both allowed).""" from muse.core.coordination import create_intent, create_reservation res_a = create_reservation(repo, run_id="ag-1", branch="b1", addresses=["x.py::fn"], ttl_seconds=3600) res_b = create_reservation(repo, run_id="ag-2", branch="b2", addresses=["x.py::fn"], ttl_seconds=3600) create_intent(repo, res_a.reservation_id, "ag-1", "b1", ["x.py::fn"], "modify", "") create_intent(repo, res_b.reservation_id, "ag-2", "b2", ["x.py::fn"], "modify", "") data = _run_forecast_json(repo, capsys=capsys) assert not any(c["conflict_type"] == "operation_conflict" for c in data["conflicts"]) # ───────────────────────────────────────────────────────────────────────────── # Integration tests — full CLI # ───────────────────────────────────────────────────────────────────────────── class TestForecastIntegration: def test_empty_swarm_json(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: data = _run_forecast_json(repo, capsys=capsys) assert data["conflicts"] == [] assert data["active_reservations"] == 0 assert data["intents_count"] == 0 def test_json_schema_complete(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: data = _run_forecast_json(repo, capsys=capsys) required_keys = { "schema", "current_branch", "branch_filter", "active_reservations", "intents_count", "call_graph_available", "warnings", "conflicts", "high_risk", "medium_risk", "low_risk", "duration_ms", } assert required_keys.issubset(data.keys()) def test_duration_ms_is_float(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: data = _run_forecast_json(repo, capsys=capsys) assert isinstance(data["duration_ms"], float) assert data["duration_ms"] >= 0 def test_current_branch_in_json(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: data = _run_forecast_json(repo, capsys=capsys) assert isinstance(data["current_branch"], str) def test_branch_filter_in_json(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: data = _run_forecast_json(repo, branch_filter="feat/x", capsys=capsys) assert data["branch_filter"] == "feat/x" def test_branch_filter_null_when_not_set(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: data = _run_forecast_json(repo, capsys=capsys) assert data["branch_filter"] is None def test_high_medium_low_risk_counts(self, repo_with_conflict: tuple[pathlib.Path, Reservation, Reservation], capsys: pytest.CaptureFixture[str]) -> None: repo, *_ = repo_with_conflict data = _run_forecast_json(repo, capsys=capsys) assert data["high_risk"] + data["medium_risk"] + data["low_risk"] == len(data["conflicts"]) def test_address_overlap_in_high_or_medium(self, repo_with_conflict: tuple[pathlib.Path, Reservation, Reservation], capsys: pytest.CaptureFixture[str]) -> None: repo, *_ = repo_with_conflict data = _run_forecast_json(repo, capsys=capsys) # address_overlap has confidence 1.0 → high risk assert data["high_risk"] >= 1 def test_branch_filter_excludes_other_branches(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: from muse.core.coordination import create_reservation create_reservation(repo, run_id="ag-1", branch="main", addresses=["x.py::fn"], ttl_seconds=3600) create_reservation(repo, run_id="ag-2", branch="feat", addresses=["x.py::fn"], ttl_seconds=3600) # Filter to "main" only — no overlap visible (only 1 agent on main). data = _run_forecast_json(repo, branch_filter="main", capsys=capsys) assert data["active_reservations"] == 1 assert data["conflicts"] == [] def test_format_json_flag(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: data = _run_forecast_json(repo, capsys=capsys) assert "conflicts" in data def test_text_format_default(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: text = _run_forecast_text(repo, capsys=capsys) assert "Conflict forecast" in text def test_text_shows_no_conflicts_message(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: text = _run_forecast_text(repo, capsys=capsys) assert "No conflicts predicted" in text def test_text_shows_conflict(self, repo_with_conflict: tuple[pathlib.Path, Reservation, Reservation], capsys: pytest.CaptureFixture[str]) -> None: repo, *_ = repo_with_conflict text = _run_forecast_text(repo, capsys=capsys) assert "address_overlap" in text def test_text_shows_elapsed(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: text = _run_forecast_text(repo, capsys=capsys) assert "s)" in text def test_text_shows_warning_when_call_graph_absent(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: from muse.core.coordination import create_reservation create_reservation(repo, run_id="ag", branch="b", addresses=["x.py::fn"], ttl_seconds=3600) with patch("muse.cli.commands.forecast.resolve_commit_ref", return_value=None): text = _run_forecast_text(repo, capsys=capsys) assert "Note:" in text def test_warnings_list_in_json_when_no_commits(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: with patch("muse.cli.commands.forecast.resolve_commit_ref", return_value=None): data = _run_forecast_json(repo, capsys=capsys) assert isinstance(data["warnings"], list) assert data["warnings"] def test_released_reservation_excluded_from_active(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: from muse.core.coordination import create_release, create_reservation res = create_reservation(repo, run_id="ag", branch="b", addresses=["x.py::fn"], ttl_seconds=3600) create_release(repo, res.reservation_id, "ag", "completed") data = _run_forecast_json(repo, capsys=capsys) # active_reservations uses active_reservations() which excludes released. assert data["active_reservations"] == 0 def test_expired_reservation_excluded(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: from muse.core.coordination import _reservations_dir, create_reservation import json as _json res = create_reservation(repo, run_id="ag", branch="b", addresses=["x.py::fn"], ttl_seconds=1) # Back-date. path = _reservations_dir(repo) / f"{res.reservation_id}.json" data = _json.loads(path.read_text()) data["expires_at"] = ( datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(seconds=10) ).isoformat() path.write_text(_json.dumps(data)) out = _run_forecast_json(repo, capsys=capsys) assert out["active_reservations"] == 0 # ───────────────────────────────────────────────────────────────────────────── # Security tests # ───────────────────────────────────────────────────────────────────────────── class TestForecastSecurity: def test_ansi_in_run_id_stripped_from_text(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: from muse.core.coordination import create_reservation ansi_run_id = "\x1b[31mmalicious\x1b[0m" create_reservation( repo, run_id=ansi_run_id, branch="b", addresses=["x.py::fn"], ttl_seconds=3600, ) create_reservation( repo, run_id="normal-agent", branch="b2", addresses=["x.py::fn"], ttl_seconds=3600, ) text = _run_forecast_text(repo, capsys=capsys) # ANSI escape bytes must not appear in text output. assert "\x1b[" not in text assert "malicious" in text # sanitized content still shown def test_ansi_in_branch_stripped_from_text(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: from muse.core.coordination import create_reservation malicious_branch = "\x1b[32mfeat/malicious\x1b[0m" create_reservation( repo, run_id="ag-1", branch=malicious_branch, addresses=["x.py::fn"], ttl_seconds=3600, ) create_reservation( repo, run_id="ag-2", branch="b2", addresses=["x.py::fn"], ttl_seconds=3600, ) text = _run_forecast_text(repo, capsys=capsys) assert "\x1b[" not in text def test_control_chars_in_address_stripped_from_text(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: from muse.core.coordination import create_reservation malicious_addr = "src/x.py::fn\x00\r\n" create_reservation( repo, run_id="ag-1", branch="b", addresses=[malicious_addr], ttl_seconds=3600, ) create_reservation( repo, run_id="ag-2", branch="b2", addresses=[malicious_addr], ttl_seconds=3600, ) text = _run_forecast_text(repo, capsys=capsys) assert "\x00" not in text assert "\r" not in text def test_address_filtering_uses_no_filesystem(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: """address_glob filtering in Pass 1/3 must not access the filesystem.""" from muse.core.coordination import create_reservation # Address looks like a path traversal — must be treated as a pure string. create_reservation( repo, run_id="ag-1", branch="b", addresses=["../../etc/passwd::root"], ttl_seconds=3600, ) create_reservation( repo, run_id="ag-2", branch="b2", addresses=["../../etc/passwd::root"], ttl_seconds=3600, ) # Should not raise FileNotFoundError or similar. data = _run_forecast_json(repo, capsys=capsys) assert any(c["conflict_type"] == "address_overlap" for c in data["conflicts"]) # ───────────────────────────────────────────────────────────────────────────── # Stress tests # ───────────────────────────────────────────────────────────────────────────── class TestForecastStress: def test_100_reservations_100_addresses_pass1_under_2s(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: """Pass 1 with 100 reservations sharing many addresses completes in < 2 s.""" from muse.core.coordination import create_reservation N = 100 shared_addresses = [f"src/mod{i % 10}.py::fn{i % 10}" for i in range(10)] for i in range(N): create_reservation( repo, run_id=f"ag-{i}", branch=f"feat/branch-{i}", addresses=[shared_addresses[i % 10], f"src/unique{i}.py::fn"], ttl_seconds=3600, ) t0 = time.monotonic() data = _run_forecast_json(repo, capsys=capsys) elapsed = time.monotonic() - t0 assert elapsed < 2.0, f"Pass 1 took {elapsed:.2f}s — too slow" # Each shared address has many agents → many overlaps. assert data["conflicts"] def test_50_reservations_mock_callgraph_under_1s(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: """Pass 2 with 50 reservations and a mock call graph completes in < 1 s.""" from muse.core.coordination import create_reservation N = 50 for i in range(N): create_reservation( repo, run_id=f"ag-{i}", branch=f"b{i}", addresses=[f"src/f{i}.py::fn{i}"], ttl_seconds=3600, ) # Mock call graph: no transitive relationships → no blast-radius conflicts. with ( patch("muse.cli.commands.forecast.resolve_commit_ref", return_value=MagicMock(commit_id="abc")), patch("muse.cli.commands.forecast.get_commit_snapshot_manifest", return_value={f"src/f{i}.py": f"sha{i}" for i in range(N)}), patch("muse.cli.commands.forecast.build_reverse_graph", return_value={}), ): t0 = time.monotonic() data = _run_forecast_json(repo, capsys=capsys) elapsed = time.monotonic() - t0 assert elapsed < 1.0, f"Pass 2 took {elapsed:.2f}s — too slow" assert data["call_graph_available"] is True def test_200_intents_pass3_under_1s(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: """Pass 3 with 200 intents completes in < 1 s.""" from muse.core.coordination import create_intent, create_reservation N = 200 for i in range(N): res = create_reservation( repo, run_id=f"ag-{i}", branch=f"b{i}", addresses=[f"f{i}.py::fn"], ttl_seconds=3600, ) op = "delete" if i % 2 == 0 else "modify" create_intent( repo, res.reservation_id, f"ag-{i}", f"b{i}", [f"f{i}.py::fn"], op, "", ) t0 = time.monotonic() data = _run_forecast_json(repo, capsys=capsys) elapsed = time.monotonic() - t0 assert elapsed < 1.0, f"Pass 3 took {elapsed:.2f}s — too slow" # Pairs of delete+modify on same address → operation_conflict entries. # Only agents sharing the same address create a conflict, so here # each fn is unique per agent → no operation conflicts expected. assert isinstance(data["conflicts"], list) def test_all_three_passes_combined_under_3s(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: """Combined forecast across all three passes with realistic workload.""" from muse.core.coordination import create_intent, create_reservation # 20 agents, 5 shared addresses (overlap), 5 unique. shared = [f"shared.py::fn{i}" for i in range(5)] for i in range(20): res = create_reservation( repo, run_id=f"ag-{i}", branch=f"b{i % 4}", addresses=[shared[i % 5], f"unique{i}.py::fn"], ttl_seconds=3600, ) if i % 3 == 0: create_intent( repo, res.reservation_id, f"ag-{i}", f"b{i % 4}", [shared[i % 5]], "delete" if i % 6 == 0 else "modify", "", ) with ( patch("muse.cli.commands.forecast.resolve_commit_ref", return_value=None), ): t0 = time.monotonic() data = _run_forecast_json(repo, capsys=capsys) elapsed = time.monotonic() - t0 assert elapsed < 3.0, f"Combined forecast took {elapsed:.2f}s — too slow" assert data["active_reservations"] == 20 def test_500_reservations_500_intents_under_5s(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: """Large swarm: 500 reservations + 500 intents across all three passes.""" from muse.core.coordination import create_intent, create_reservation N = 500 shared_addresses = [f"src/hot{i % 20}.py::fn" for i in range(20)] for i in range(N): res = create_reservation( repo, run_id=f"ag-{i}", branch=f"b{i % 10}", addresses=[shared_addresses[i % 20], f"unique{i}.py::fn"], ttl_seconds=3600, ) op = "delete" if i % 50 == 0 else "modify" create_intent( repo, res.reservation_id, f"ag-{i}", f"b{i % 10}", [shared_addresses[i % 20]], op, "", ) with patch("muse.cli.commands.forecast.resolve_commit_ref", return_value=None): t0 = time.monotonic() data = _run_forecast_json(repo, capsys=capsys) elapsed = time.monotonic() - t0 assert elapsed < 5.0, f"500-agent forecast took {elapsed:.2f}s — too slow" assert data["active_reservations"] == N assert data["partial_forecast"] is True # ───────────────────────────────────────────────────────────────────────────── # Input validation tests # ───────────────────────────────────────────────────────────────────────────── class TestForecastInputValidation: def test_run_id_at_max_length_accepted(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: run_id = "a" * _MAX_RUN_ID_LEN data = _run_forecast_json(repo, run_id_filter=run_id, capsys=capsys) assert data["run_id_filter"] == run_id def test_run_id_over_max_length_exits_user_error(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: run_id = "a" * (_MAX_RUN_ID_LEN + 1) code, _ = _run_forecast(repo, run_id_filter=run_id, fmt="text") assert code == ExitCode.USER_ERROR def test_run_id_over_max_json_returns_error_field(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: run_id = "a" * (_MAX_RUN_ID_LEN + 1) _run_forecast(repo, run_id_filter=run_id, fmt="json") out = capsys.readouterr().out.strip() data = json.loads(out) assert "error" in data assert data.get("status") == "bad_args" def test_run_id_error_output_is_compact(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: run_id = "a" * (_MAX_RUN_ID_LEN + 1) _run_forecast(repo, run_id_filter=run_id, fmt="json") out = capsys.readouterr().out.strip() assert "\n" not in out def test_min_confidence_zero_accepted(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: data = _run_forecast_json(repo, min_confidence=0.0, capsys=capsys) assert data["min_confidence"] == 0.0 def test_min_confidence_one_accepted(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: data = _run_forecast_json(repo, min_confidence=1.0, capsys=capsys) assert data["min_confidence"] == 1.0 def test_min_confidence_above_1_exits_user_error(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: code, _ = _run_forecast(repo, min_confidence=1.5, fmt="text") assert code == ExitCode.USER_ERROR def test_min_confidence_negative_exits_user_error(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: code, _ = _run_forecast(repo, min_confidence=-0.1, fmt="text") assert code == ExitCode.USER_ERROR def test_min_confidence_above_1_json_returns_error_field(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: _run_forecast(repo, min_confidence=1.5, fmt="json") out = capsys.readouterr().out.strip() data = json.loads(out) assert "error" in data assert data.get("status") == "bad_args" def test_min_confidence_error_output_is_compact(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: _run_forecast(repo, min_confidence=99.0, fmt="json") out = capsys.readouterr().out.strip() assert "\n" not in out # ───────────────────────────────────────────────────────────────────────────── # --run-id filter tests # ───────────────────────────────────────────────────────────────────────────── class TestForecastRunIdFilter: def test_run_id_filter_keeps_only_matching_conflicts(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: from muse.core.coordination import create_reservation create_reservation(repo, run_id="ag-target", branch="b1", addresses=["x.py::fn"], ttl_seconds=3600) create_reservation(repo, run_id="ag-other", branch="b2", addresses=["x.py::fn"], ttl_seconds=3600) create_reservation(repo, run_id="ag-third", branch="b3", addresses=["y.py::fn"], ttl_seconds=3600) create_reservation(repo, run_id="ag-fourth", branch="b4", addresses=["y.py::fn"], ttl_seconds=3600) # Without filter: 2 conflicts (x.py and y.py). all_data = _run_forecast_json(repo, capsys=capsys) assert len(all_data["conflicts"]) == 2 # Filter to ag-target: only x.py conflict (ag-target is not on y.py). filtered = _run_forecast_json(repo, run_id_filter="ag-target", capsys=capsys) assert all( any(a.startswith("ag-target@") for a in c["agents"]) for c in filtered["conflicts"] ) def test_run_id_filter_unknown_agent_returns_empty(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: from muse.core.coordination import create_reservation create_reservation(repo, run_id="ag-1", branch="b1", addresses=["x.py::fn"], ttl_seconds=3600) create_reservation(repo, run_id="ag-2", branch="b2", addresses=["x.py::fn"], ttl_seconds=3600) data = _run_forecast_json(repo, run_id_filter="no-such-agent", capsys=capsys) assert data["conflicts"] == [] def test_run_id_filter_does_not_change_reservation_count(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: from muse.core.coordination import create_reservation create_reservation(repo, run_id="ag-1", branch="b1", addresses=["x.py::fn"], ttl_seconds=3600) create_reservation(repo, run_id="ag-2", branch="b2", addresses=["x.py::fn"], ttl_seconds=3600) data = _run_forecast_json(repo, run_id_filter="ag-1", capsys=capsys) # Reservation count reflects the full filtered-by-branch set, not by run_id. assert data["active_reservations"] == 2 def test_run_id_filter_in_json_field(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: data = _run_forecast_json(repo, run_id_filter="my-agent", capsys=capsys) assert data["run_id_filter"] == "my-agent" def test_run_id_filter_null_when_not_set(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: data = _run_forecast_json(repo, capsys=capsys) assert data["run_id_filter"] is None def test_run_id_filter_operation_conflict(self, repo_with_op_conflict: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = repo_with_op_conflict # agent-del is involved in the op conflict; filter to it. data = _run_forecast_json(repo, run_id_filter="agent-del", capsys=capsys) assert any(c["conflict_type"] == "operation_conflict" for c in data["conflicts"]) def test_run_id_filter_excludes_operation_conflict_for_uninvolved_agent( self, repo_with_op_conflict: pathlib.Path, capsys: pytest.CaptureFixture[str] ) -> None: repo = repo_with_op_conflict data = _run_forecast_json(repo, run_id_filter="uninvolved-bot", capsys=capsys) assert data["conflicts"] == [] # ───────────────────────────────────────────────────────────────────────────── # --min-confidence filter tests # ───────────────────────────────────────────────────────────────────────────── class TestForecastMinConfidence: def _setup_all_conflict_types(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: """Return (address_overlap, operation_conflict) — both detected.""" from muse.core.coordination import create_intent, create_reservation res_a = create_reservation(repo, run_id="ag-1", branch="b1", addresses=["x.py::fn"], ttl_seconds=3600) res_b = create_reservation(repo, run_id="ag-2", branch="b2", addresses=["x.py::fn"], ttl_seconds=3600) create_intent(repo, res_a.reservation_id, "ag-1", "b1", ["x.py::fn"], "delete", "") create_intent(repo, res_b.reservation_id, "ag-2", "b2", ["x.py::fn"], "modify", "") def test_min_confidence_0_shows_all(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: self._setup_all_conflict_types(repo, capsys) data = _run_forecast_json(repo, min_confidence=0.0, capsys=capsys) types = {c["conflict_type"] for c in data["conflicts"]} assert "address_overlap" in types assert "operation_conflict" in types def test_min_confidence_0_9_hides_medium(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: self._setup_all_conflict_types(repo, capsys) # Inject a mock blast_radius_overlap at 0.75. with ( patch("muse.cli.commands.forecast.resolve_commit_ref", return_value=MagicMock(commit_id="abc")), patch("muse.cli.commands.forecast.get_commit_snapshot_manifest", return_value={}), patch("muse.cli.commands.forecast.build_reverse_graph", return_value={}), patch("muse.cli.commands.forecast.transitive_callers", return_value={}), ): data = _run_forecast_json(repo, min_confidence=0.9, capsys=capsys) for c in data["conflicts"]: assert c["confidence"] >= 0.9 def test_min_confidence_at_exact_threshold_included(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: self._setup_all_conflict_types(repo, capsys) # address_overlap has confidence 1.0; 1.0 >= 1.0 → included. data = _run_forecast_json(repo, min_confidence=1.0, capsys=capsys) types = {c["conflict_type"] for c in data["conflicts"]} assert "address_overlap" in types def test_min_confidence_1_hides_operation_conflict(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: self._setup_all_conflict_types(repo, capsys) # operation_conflict has confidence 0.9; 0.9 < 1.0 → hidden. data = _run_forecast_json(repo, min_confidence=1.0, capsys=capsys) assert not any(c["conflict_type"] == "operation_conflict" for c in data["conflicts"]) def test_risk_counts_reflect_filtered_list(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: self._setup_all_conflict_types(repo, capsys) data = _run_forecast_json(repo, min_confidence=0.95, capsys=capsys) # After filtering to >= 0.95, only address_overlap (1.0) remains. assert data["high_risk"] + data["medium_risk"] + data["low_risk"] == len(data["conflicts"]) def test_min_confidence_in_json_field(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: data = _run_forecast_json(repo, min_confidence=0.75, capsys=capsys) assert data["min_confidence"] == 0.75 # ───────────────────────────────────────────────────────────────────────────── # JSON format — compact output + new schema fields # ───────────────────────────────────────────────────────────────────────────── class TestForecastJsonFormat: def test_json_output_is_compact(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: """No indent=2 — output must be a single line.""" _run_forecast(repo, fmt="json") out = capsys.readouterr().out.strip() assert "\n" not in out, "JSON output must be compact (no newlines)" def test_partial_forecast_true_when_no_commits(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: with patch("muse.cli.commands.forecast.resolve_commit_ref", return_value=None): data = _run_forecast_json(repo, capsys=capsys) assert data["partial_forecast"] is True def test_partial_forecast_false_when_call_graph_available(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: from muse.core.coordination import create_reservation create_reservation(repo, run_id="ag", branch="main", addresses=["x.py::fn"], ttl_seconds=3600) with ( patch("muse.cli.commands.forecast.resolve_commit_ref", return_value=MagicMock(commit_id="abc")), patch("muse.cli.commands.forecast.get_commit_snapshot_manifest", return_value={}), patch("muse.cli.commands.forecast.build_reverse_graph", return_value={}), ): data = _run_forecast_json(repo, capsys=capsys) assert data["partial_forecast"] is False def test_run_id_filter_field_in_schema(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: data = _run_forecast_json(repo, capsys=capsys) assert "run_id_filter" in data def test_min_confidence_field_in_schema(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: data = _run_forecast_json(repo, capsys=capsys) assert "min_confidence" in data def test_partial_forecast_field_in_schema(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: data = _run_forecast_json(repo, capsys=capsys) assert "partial_forecast" in data def test_all_required_keys_present(self, repo: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: data = _run_forecast_json(repo, capsys=capsys) required = { "schema", "current_branch", "branch_filter", "run_id_filter", "min_confidence", "active_reservations", "intents_count", "call_graph_available", "partial_forecast", "warnings", "conflicts", "high_risk", "medium_risk", "low_risk", "duration_ms", } assert required.issubset(data.keys())