"""Tests for muse coord dag — dependency DAG on reservations. Coverage goals -------------- * Unit — every public function in ``muse.core.dag`` * Integration — add_dependencies → load → build_graph → detect/sort round-trip * CLI — ``muse coord dag`` and ``muse coord reserve --depends-on`` via direct function dispatch and stdout capture * Security — ID validation, path traversal, self-dependency rejection * Stress — large linear chains, wide diamonds, many-node graphs Test conventions ---------------- * Every test receives a fresh repo fixture via ``_make_repo(tmp_path)``. * Active-reservation state is injected by creating real Reservation files via :func:`muse.core.coordination.create_reservation` + the repo fixture. * Time is frozen where predictable timestamps matter. * CLI dispatch calls ``run`` directly with a hand-assembled ``argparse.Namespace``. """ from __future__ import annotations import argparse import datetime import json import pathlib import time import itertools from collections.abc import Generator from contextlib import AbstractContextManager from unittest.mock import MagicMock, patch import pytest from muse.core.coordination import Reservation from muse.core.types import MsgpackValue, content_hash, fake_id from muse.core.dag import ( AdjacencyMap, DependencyRecord, _MAX_DEPS, _dependencies_dir, _validate_id, add_dependencies, build_graph, detect_cycle, ensure_dag_dirs, get_blocking, is_blocked, load_all_dependencies, load_dag, load_dependencies, topological_sort, ) from muse.cli.commands.dag import run as dag_run from muse.cli.commands.reserve import run as reserve_run from muse.core.paths import muse_dir UTC = datetime.timezone.utc _EPOCH = datetime.datetime(2026, 3, 30, 12, 0, 0, tzinfo=UTC) _A = fake_id("dag-node-a") _B = fake_id("dag-node-b") _C = fake_id("dag-node-c") _D = fake_id("dag-node-d") _E = fake_id("dag-node-e") _id_seq = itertools.count() def _new_id() -> str: return content_hash({"seq": next(_id_seq)}) def _make_repo(tmp_path: 
pathlib.Path) -> pathlib.Path: """Return a minimal muse repo root.""" muse_dir(tmp_path).mkdir(parents=True) return tmp_path def _freeze(ts: datetime.datetime) -> AbstractContextManager[MagicMock]: return patch("muse.core.dag._now_utc", return_value=ts) def _dag_ns(fmt: str = "json", **kwargs: MsgpackValue) -> argparse.Namespace: defaults = { "reservation_id": None, "topo": False, "active_only": False, "json_out": fmt == "json", } defaults.update(kwargs) return argparse.Namespace(**defaults) def _reserve_ns(**kwargs: MsgpackValue) -> argparse.Namespace: defaults = { "addresses": ["billing.py::compute"], "run_id": "agent-1", "ttl": 3600, "operation": None, "json_out": True, "depends_on": [], } defaults.update(kwargs) return argparse.Namespace(**defaults) def _patch_repo(repo: pathlib.Path) -> AbstractContextManager[None]: from contextlib import ExitStack, contextmanager @contextmanager def _ctx() -> Generator[None, None, None]: with ExitStack() as stack: stack.enter_context(patch("muse.cli.commands.reserve.require_repo", return_value=repo)) stack.enter_context(patch("muse.cli.commands.reserve.read_current_branch", return_value="dev")) yield return _ctx() def _patch_dag_repo(repo: pathlib.Path) -> AbstractContextManager[MagicMock]: return patch("muse.cli.commands.dag.require_repo", return_value=repo) # ── _validate_id ───────────────────────────────────────────────────────────── class TestValidateId: """ID validation accepts sha256: content-addressed IDs and rejects everything else.""" def test_accepts_valid_content_id(self) -> None: _validate_id(_A) # no exception def test_accepts_another_content_id(self) -> None: _validate_id(_new_id()) # no exception def test_rejects_empty(self) -> None: with pytest.raises(ValueError): _validate_id("") def test_rejects_plain_string(self) -> None: with pytest.raises(ValueError): _validate_id("not-a-content-id") def test_rejects_uuid4(self) -> None: with pytest.raises(ValueError): _validate_id("aaaaaaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaa") 
def test_rejects_path_traversal(self) -> None: with pytest.raises(ValueError): _validate_id("../../etc/passwd") def test_rejects_null_byte(self) -> None: with pytest.raises(ValueError): _validate_id("\x00" * 36) def test_rejects_slash(self) -> None: with pytest.raises(ValueError): _validate_id(f"sha256:/{'a' * 63}") # ── DependencyRecord ─────────────────────────────────────────────────────────── class TestDependencyRecord: """DependencyRecord serialisation round-trip.""" def _make(self, **kw: MsgpackValue) -> DependencyRecord: defaults = dict( reservation_id=_A, depends_on=[_B, _C], created_at=_EPOCH, ) defaults.update(kw) return DependencyRecord(**defaults) def test_to_dict_round_trip(self) -> None: rec = self._make() d = rec.to_dict() rec2 = DependencyRecord.from_dict(d) assert rec2.reservation_id == _A assert rec2.depends_on == [_B, _C] def test_from_dict_missing_created_at_defaults_to_now(self) -> None: d = {"reservation_id": _A, "depends_on": [_B]} rec = DependencyRecord.from_dict(d) assert isinstance(rec.created_at, datetime.datetime) def test_from_dict_empty_depends_on(self) -> None: d = {"reservation_id": _A, "depends_on": [], "created_at": _EPOCH.isoformat()} rec = DependencyRecord.from_dict(d) assert rec.depends_on == [] def test_from_dict_non_list_depends_on_defaults_empty(self) -> None: d = {"reservation_id": _A, "depends_on": None, "created_at": _EPOCH.isoformat()} rec = DependencyRecord.from_dict(d) assert rec.depends_on == [] def test_schema_version_in_dict(self) -> None: rec = self._make() d = rec.to_dict() assert "schema_version" in d # ── ensure_dag_dirs / load_all_dependencies ─────────────────────────────────── class TestEnsureDagDirs: def test_creates_directory(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) ensure_dag_dirs(repo) assert _dependencies_dir(repo).is_dir() def test_idempotent(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) ensure_dag_dirs(repo) ensure_dag_dirs(repo) def 
test_empty_dir_returns_empty_list(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) assert load_all_dependencies(repo) == [] def test_non_existent_dir_returns_empty_list(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) assert load_all_dependencies(repo) == [] def test_skips_corrupt_file(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) ensure_dag_dirs(repo) (_dependencies_dir(repo) / f"{_A}.json").write_text("NOT JSON") assert load_all_dependencies(repo) == [] # ── load_dependencies ───────────────────────────────────────────────────────── class TestLoadDependencies: def test_returns_none_for_missing(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) ensure_dag_dirs(repo) assert load_dependencies(repo, _A) is None def test_invalid_id_raises(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) with pytest.raises(ValueError): load_dependencies(repo, "not-a-content-id") def test_returns_record_after_add(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) with _freeze(_EPOCH): add_dependencies(repo, _A, [_B]) rec = load_dependencies(repo, _A) assert rec is not None assert rec.depends_on == [_B] # ── build_graph ──────────────────────────────────────────────────────────────── class TestBuildGraph: def _rec(self, rid: str, deps: list[str]) -> DependencyRecord: return DependencyRecord(rid, deps, _EPOCH) def test_empty_input(self) -> None: g = build_graph([]) assert g == {} def test_single_node_no_deps(self) -> None: g = build_graph([self._rec(_A, [])]) assert g[_A] == set() def test_adds_dependency_nodes(self) -> None: g = build_graph([self._rec(_A, [_B])]) assert _A in g assert _B in g assert _B in g[_A] def test_diamond_shape(self) -> None: # A → B, A → C, B → D, C → D g = build_graph([ self._rec(_A, [_B, _C]), self._rec(_B, [_D]), self._rec(_C, [_D]), ]) assert g[_A] == {_B, _C} assert g[_B] == {_D} assert g[_C] == {_D} assert g[_D] == set() def 
test_leaf_nodes_included(self) -> None: g = build_graph([self._rec(_A, [_B, _C])]) assert _B in g and g[_B] == set() assert _C in g and g[_C] == set() # ── detect_cycle ─────────────────────────────────────────────────────────────── class TestDetectCycle: def test_empty_graph_no_cycle(self) -> None: assert detect_cycle({}) is None def test_single_node_no_cycle(self) -> None: assert detect_cycle({_A: set()}) is None def test_linear_chain_no_cycle(self) -> None: g = {_A: {_B}, _B: {_C}, _C: set()} assert detect_cycle(g) is None def test_direct_self_loop(self) -> None: g = {_A: {_A}} cycle = detect_cycle(g) assert cycle is not None assert _A in cycle def test_two_node_cycle(self) -> None: g = {_A: {_B}, _B: {_A}} cycle = detect_cycle(g) assert cycle is not None assert len(cycle) >= 2 def test_three_node_cycle(self) -> None: g = {_A: {_B}, _B: {_C}, _C: {_A}} cycle = detect_cycle(g) assert cycle is not None assert len(cycle) >= 3 def test_diamond_no_cycle(self) -> None: # A → B, A → C, B → D, C → D (fan-in is fine) g = {_A: {_B, _C}, _B: {_D}, _C: {_D}, _D: set()} assert detect_cycle(g) is None def test_cycle_path_starts_and_ends_same(self) -> None: g = {_A: {_B}, _B: {_C}, _C: {_A}} cycle = detect_cycle(g) assert cycle is not None assert cycle[0] == cycle[-1] def test_unrelated_subgraph_no_cycle(self) -> None: # A → B (no cycle), C → D (no cycle) g = {_A: {_B}, _B: set(), _C: {_D}, _D: set()} assert detect_cycle(g) is None # ── topological_sort ─────────────────────────────────────────────────────────── class TestTopologicalSort: def test_empty_graph(self) -> None: result = topological_sort({}) assert result == [] def test_single_node(self) -> None: result = topological_sort({_A: set()}) assert result == [_A] def test_linear_chain_order(self) -> None: # A → B → C: C should come first (fewest deps), then B, then A g = {_A: {_B}, _B: {_C}, _C: set()} result = topological_sort(g) assert result.index(_C) < result.index(_B) < result.index(_A) def test_diamond_order(self) 
-> None: # D (leaf) → B, C (intermediate) → A (top) g = {_A: {_B, _C}, _B: {_D}, _C: {_D}, _D: set()} result = topological_sort(g) assert result.index(_D) < result.index(_B) assert result.index(_D) < result.index(_C) assert result.index(_B) < result.index(_A) assert result.index(_C) < result.index(_A) def test_all_nodes_present(self) -> None: g = {_A: {_B}, _B: {_C}, _C: set()} result = topological_sort(g) assert set(result) == {_A, _B, _C} def test_subset_nodes(self) -> None: g = {_A: {_B}, _B: {_C}, _C: set(), _D: set()} # Only ask for A's subgraph result = topological_sort(g, nodes=[_A]) assert _A in result and _B in result and _C in result # D is unrelated — may or may not be included; just check order assert result.index(_C) < result.index(_B) < result.index(_A) def test_cycle_raises(self) -> None: g = {_A: {_B}, _B: {_A}} with pytest.raises(ValueError, match="cycle"): topological_sort(g) def test_independent_nodes_all_returned(self) -> None: g = {_A: set(), _B: set(), _C: set()} result = topological_sort(g) assert set(result) == {_A, _B, _C} # ── is_blocked / get_blocking ────────────────────────────────────────────────── class TestIsBlockedAndGetBlocking: def test_no_deps_not_blocked(self) -> None: g = {_A: set()} assert is_blocked(_A, g, frozenset({_B})) is False def test_dep_inactive_not_blocked(self) -> None: g = {_A: {_B}} # B is not in active_ids → unblocked assert is_blocked(_A, g, frozenset()) is False def test_dep_active_is_blocked(self) -> None: g = {_A: {_B}} assert is_blocked(_A, g, frozenset({_B})) is True def test_multiple_deps_one_active(self) -> None: g = {_A: {_B, _C}} assert is_blocked(_A, g, frozenset({_B})) is True def test_unknown_node_not_blocked(self) -> None: g = {} assert is_blocked(_A, g, frozenset({_B})) is False def test_get_blocking_empty(self) -> None: g = {_A: {_B}} assert get_blocking(_A, g, frozenset()) == [] def test_get_blocking_returns_active_deps(self) -> None: g = {_A: {_B, _C}} blocking = get_blocking(_A, g, 
frozenset({_B})) assert blocking == [_B] def test_get_blocking_sorted(self) -> None: g = {_A: {_B, _C, _D}} blocking = get_blocking(_A, g, frozenset({_C, _B})) assert blocking == sorted([_B, _C]) # ── add_dependencies ─────────────────────────────────────────────────────────── class TestAddDependencies: def test_creates_file_on_disk(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) add_dependencies(repo, _A, [_B]) dep_file = _dependencies_dir(repo) / f"{_A}.json" assert dep_file.is_file() def test_file_is_valid_json(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) add_dependencies(repo, _A, [_B]) raw = json.loads((_dependencies_dir(repo) / f"{_A}.json").read_text()) assert raw["reservation_id"] == _A assert _B in raw["depends_on"] def test_returns_correct_record(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) rec = add_dependencies(repo, _A, [_B, _C]) assert rec.reservation_id == _A assert set(rec.depends_on) == {_B, _C} def test_deduplicates_deps(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) rec = add_dependencies(repo, _A, [_B, _B, _B]) assert rec.depends_on == [_B] def test_empty_deps_accepted(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) rec = add_dependencies(repo, _A, []) assert rec.depends_on == [] def test_write_once_raises_file_exists(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) add_dependencies(repo, _A, [_B]) with pytest.raises(FileExistsError): add_dependencies(repo, _A, [_C]) def test_self_dependency_raises(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) with pytest.raises(ValueError, match="itself"): add_dependencies(repo, _A, [_A]) def test_invalid_reservation_id_raises(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) with pytest.raises(ValueError): add_dependencies(repo, "not-a-content-id", [_B]) def test_invalid_dep_id_raises(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) with 
pytest.raises(ValueError): add_dependencies(repo, _A, ["../../etc/passwd"]) def test_too_many_deps_raises(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) many = [_new_id() for _ in range(_MAX_DEPS + 1)] with pytest.raises(ValueError, match="too many"): add_dependencies(repo, _A, many) def test_max_deps_accepted(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) many = [_new_id() for _ in range(_MAX_DEPS)] rec = add_dependencies(repo, _A, many) assert len(rec.depends_on) == _MAX_DEPS def test_rejects_cycle_two_node(self, tmp_path: pathlib.Path) -> None: """A → B then B → A must be rejected.""" repo = _make_repo(tmp_path) add_dependencies(repo, _A, [_B]) with pytest.raises(ValueError, match="cycle"): add_dependencies(repo, _B, [_A]) def test_rejects_cycle_three_node(self, tmp_path: pathlib.Path) -> None: """A → B → C → A must be rejected at the third edge.""" repo = _make_repo(tmp_path) add_dependencies(repo, _A, [_B]) add_dependencies(repo, _B, [_C]) with pytest.raises(ValueError, match="cycle"): add_dependencies(repo, _C, [_A]) def test_allows_diamond(self, tmp_path: pathlib.Path) -> None: """A → B, A → C, B → D, C → D is a valid DAG (fan-in, not a cycle).""" repo = _make_repo(tmp_path) add_dependencies(repo, _A, [_B, _C]) add_dependencies(repo, _B, [_D]) add_dependencies(repo, _C, [_D]) # must not raise g = load_dag(repo) assert detect_cycle(g) is None # ── load_dag ─────────────────────────────────────────────────────────────────── class TestLoadDag: def test_empty_repo(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) assert load_dag(repo) == {} def test_single_record(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) add_dependencies(repo, _A, [_B]) g = load_dag(repo) assert g[_A] == {_B} assert _B in g def test_multiple_records(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) add_dependencies(repo, _A, [_B]) add_dependencies(repo, _B, [_C]) g = load_dag(repo) assert g[_A] == 
{_B} assert g[_B] == {_C} # ── Integration: full lifecycle ──────────────────────────────────────────────── class TestIntegration: """End-to-end: create deps → load → compute order → check blocked.""" def test_pipeline_ordering(self, tmp_path: pathlib.Path) -> None: """Five-stage pipeline: E→D, D→C, C→B, B→A. Topo order: A,B,C,D,E.""" repo = _make_repo(tmp_path) add_dependencies(repo, _B, [_A]) add_dependencies(repo, _C, [_B]) add_dependencies(repo, _D, [_C]) add_dependencies(repo, _E, [_D]) g = load_dag(repo) assert detect_cycle(g) is None order = topological_sort(g) assert order.index(_A) < order.index(_B) assert order.index(_B) < order.index(_C) assert order.index(_C) < order.index(_D) assert order.index(_D) < order.index(_E) def test_blocked_status_with_active_reservations(self, tmp_path: pathlib.Path) -> None: """B depends on A; A is active → B is blocked; A is active → not blocked.""" repo = _make_repo(tmp_path) add_dependencies(repo, _B, [_A]) g = load_dag(repo) # A is still active assert is_blocked(_B, g, frozenset({_A})) is True assert get_blocking(_B, g, frozenset({_A})) == [_A] # A has been released assert is_blocked(_B, g, frozenset()) is False assert get_blocking(_B, g, frozenset()) == [] def test_unblocked_when_no_deps(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) add_dependencies(repo, _A, []) g = load_dag(repo) assert is_blocked(_A, g, frozenset({_B, _C})) is False def test_diamond_blocked_status(self, tmp_path: pathlib.Path) -> None: """A → B, A → C, B → D, C → D. When D is active, B and C are blocked.""" repo = _make_repo(tmp_path) add_dependencies(repo, _A, [_B, _C]) add_dependencies(repo, _B, [_D]) add_dependencies(repo, _C, [_D]) g = load_dag(repo) active = frozenset({_D}) # is_blocked checks DIRECT deps only. # A's direct deps are B and C — neither is in active_ids{D}. 
assert is_blocked(_A, g, active) is False assert is_blocked(_B, g, active) is True # B depends on D (active) assert is_blocked(_C, g, active) is True # C depends on D (active) assert is_blocked(_D, g, active) is False # D has no deps # ── Security tests ───────────────────────────────────────────────────────────── class TestSecurity: def test_path_traversal_in_add_reservation_id(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) with pytest.raises(ValueError): add_dependencies(repo, "../../etc/passwd", [_B]) def test_path_traversal_in_dep_id(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) with pytest.raises(ValueError): add_dependencies(repo, _A, ["../../harm"]) def test_null_byte_in_reservation_id(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) with pytest.raises(ValueError): add_dependencies(repo, "\x00" * 36, [_B]) def test_null_byte_in_dep_id(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) with pytest.raises(ValueError): add_dependencies(repo, _A, ["\x00" * 36]) def test_path_traversal_in_load_dependencies(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) with pytest.raises(ValueError): load_dependencies(repo, "../../etc/shadow") def test_cycle_cannot_be_written(self, tmp_path: pathlib.Path) -> None: """Even under rapid concurrent conditions, a cycle must never reach disk.""" repo = _make_repo(tmp_path) add_dependencies(repo, _A, [_B]) add_dependencies(repo, _B, [_C]) # This would create A→B→C→A with pytest.raises(ValueError, match="cycle"): add_dependencies(repo, _C, [_A]) # Verify C has no record on disk assert (_dependencies_dir(repo) / f"{_C}.json").exists() is False # ── CLI: muse coord dag ──────────────────────────────────────────────────────── class TestCliDag: def _make_deps(self, repo: pathlib.Path) -> None: add_dependencies(repo, _B, [_A]) add_dependencies(repo, _C, [_B]) def test_full_dag_json(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> 
None: repo = _make_repo(tmp_path) self._make_deps(repo) args = _dag_ns(fmt="json") with _patch_dag_repo(repo): dag_run(args) out = json.loads(capsys.readouterr().out) assert out["total_nodes"] == 3 assert out["total_edges"] == 2 assert "nodes" in out def test_full_dag_text(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) self._make_deps(repo) args = _dag_ns(fmt="text") with _patch_dag_repo(repo): dag_run(args) out = capsys.readouterr().out assert "Dependency DAG" in out def test_empty_dag_json(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) args = _dag_ns(fmt="json") with _patch_dag_repo(repo): dag_run(args) out = json.loads(capsys.readouterr().out) assert out["total_nodes"] == 0 assert out["nodes"] == [] def test_single_reservation_mode_json(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) add_dependencies(repo, _B, [_A]) args = _dag_ns(fmt="json", reservation_id=_B) with _patch_dag_repo(repo): dag_run(args) out = json.loads(capsys.readouterr().out) assert out["reservation_id"] == _B assert _A in out["depends_on"] def test_single_reservation_mode_text(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) add_dependencies(repo, _B, [_A]) args = _dag_ns(fmt="text", reservation_id=_B) with _patch_dag_repo(repo): dag_run(args) out = capsys.readouterr().out # Should show the reservation ID prefix assert _B[:8] in out def test_cycle_detected_exits_1(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: """A cycle on disk causes exit code 1.""" repo = _make_repo(tmp_path) # Manually write a cycle to disk, bypassing add_dependencies validation ensure_dag_dirs(repo) import json as _json from muse._version import __version__ rec_a = {"schema_version": __version__, "reservation_id": _A, "depends_on": [_B], "created_at": _EPOCH.isoformat()} rec_b 
= {"schema_version": __version__, "reservation_id": _B, "depends_on": [_A], "created_at": _EPOCH.isoformat()} (_dependencies_dir(repo) / f"{_A}.json").write_text(_json.dumps(rec_a)) (_dependencies_dir(repo) / f"{_B}.json").write_text(_json.dumps(rec_b)) args = _dag_ns(fmt="json") with _patch_dag_repo(repo): with pytest.raises(SystemExit) as exc: dag_run(args) assert exc.value.code == 1 def test_blocked_count_in_json(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: """A reservation with an active dependency must show blocked_count > 0.""" repo = _make_repo(tmp_path) add_dependencies(repo, _B, [_A]) # Inject _A as an active reservation via patching active_reservations. from muse.core.coordination import Reservation fake_res = Reservation( reservation_id=_A, run_id="agent-a", branch="dev", addresses=["foo.py::bar"], created_at=_EPOCH, expires_at=_EPOCH + datetime.timedelta(hours=1), operation=None, ) with _patch_dag_repo(repo), \ patch("muse.cli.commands.dag.active_reservations", return_value=[fake_res]): args = _dag_ns(fmt="json") dag_run(args) out = json.loads(capsys.readouterr().out) assert out["blocked_count"] == 1 def test_topo_index_in_nodes(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) add_dependencies(repo, _B, [_A]) args = _dag_ns(fmt="json") with _patch_dag_repo(repo): dag_run(args) out = json.loads(capsys.readouterr().out) indices = [n["topo_index"] for n in out["nodes"]] assert all(i is not None for i in indices) # A must have lower index than B (A has no deps, B depends on A) a_node = next(n for n in out["nodes"] if n["reservation_id"] == _A) b_node = next(n for n in out["nodes"] if n["reservation_id"] == _B) assert a_node["topo_index"] < b_node["topo_index"] def test_ansi_not_in_text_output(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: """Node IDs stored on disk must not produce raw ANSI in text output.""" repo = _make_repo(tmp_path) 
add_dependencies(repo, _B, [_A]) args = _dag_ns(fmt="text") with _patch_dag_repo(repo): dag_run(args) out = capsys.readouterr().out assert "\x1b" not in out # ── CLI: muse coord reserve --depends-on ────────────────────────────────────── class TestReserveWithDependsOn: """reserve --depends-on writes both a reservation and a dependency record.""" def test_reserve_without_depends_on(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) args = _reserve_ns(depends_on=[], addresses=["foo.py::bar"]) with _patch_repo(repo): reserve_run(args) out = json.loads(capsys.readouterr().out) assert out["depends_on"] == [] assert out["dependency_error"] is None def test_reserve_with_single_dep(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) args = _reserve_ns(depends_on=[_A], addresses=["foo.py::bar"]) with _patch_repo(repo): reserve_run(args) out = json.loads(capsys.readouterr().out) assert _A in out["depends_on"] # Verify file exists on disk res_id = out["reservation_id"] dep_rec = load_dependencies(repo, res_id) assert dep_rec is not None assert _A in dep_rec.depends_on def test_reserve_with_multiple_deps(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) args = _reserve_ns(depends_on=[_A, _B], addresses=["foo.py::bar"]) with _patch_repo(repo): reserve_run(args) out = json.loads(capsys.readouterr().out) assert set(out["depends_on"]) == {_A, _B} def test_reserve_invalid_dep_id_exits_1(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: """Invalid content ID in --depends-on → dependency_error set, exit 1.""" repo = _make_repo(tmp_path) args = _reserve_ns(depends_on=["not-a-content-id"], addresses=["foo.py::bar"]) with _patch_repo(repo): with pytest.raises(SystemExit) as exc: reserve_run(args) assert exc.value.code == 1 out = json.loads(capsys.readouterr().out) assert out["dependency_error"] is not None 
def test_reserve_dep_creates_dag_record(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: """The reservation ID from --depends-on is reflected in load_dag.""" repo = _make_repo(tmp_path) # First reservation (will be the dependency) args1 = _reserve_ns(depends_on=[], addresses=["a.py::fn"]) with _patch_repo(repo): reserve_run(args1) dep_id = json.loads(capsys.readouterr().out)["reservation_id"] # Second reservation depends on the first args2 = _reserve_ns(depends_on=[dep_id], addresses=["b.py::fn"]) with _patch_repo(repo): reserve_run(args2) new_id = json.loads(capsys.readouterr().out)["reservation_id"] g = load_dag(repo) assert dep_id in g[new_id] # ── Stress tests ─────────────────────────────────────────────────────────────── class TestStress: def test_linear_chain_100(self, tmp_path: pathlib.Path) -> None: """100-node chain A0 → A1 → … → A99: no cycle, correct topo order.""" repo = _make_repo(tmp_path) ids = [_new_id() for _ in range(100)] for i in range(1, len(ids)): add_dependencies(repo, ids[i], [ids[i - 1]]) g = load_dag(repo) assert detect_cycle(g) is None order = topological_sort(g) # ids[0] must come before ids[1] must come before … ids[99] for i in range(1, len(ids)): assert order.index(ids[i - 1]) < order.index(ids[i]) def test_wide_diamond_50(self, tmp_path: pathlib.Path) -> None: """1 root, 50 middle nodes, 1 leaf: fan-out then fan-in.""" repo = _make_repo(tmp_path) root_id = _new_id() leaf_id = _new_id() middles = [_new_id() for _ in range(50)] # root → each middle for m in middles: add_dependencies(repo, m, [root_id]) # leaf → each middle (leaf depends on all 50) add_dependencies(repo, leaf_id, middles) g = load_dag(repo) assert detect_cycle(g) is None order = topological_sort(g) assert order.index(root_id) < order.index(middles[0]) assert order.index(middles[-1]) < order.index(leaf_id) def test_add_500_independent_deps(self, tmp_path: pathlib.Path) -> None: """500 reservations each with one independent dependency — quick 
scan.""" repo = _make_repo(tmp_path) start = time.monotonic() for _ in range(500): add_dependencies(repo, _new_id(), [_new_id()]) elapsed = time.monotonic() - start assert elapsed < 10.0, f"500 independent adds took {elapsed:.2f}s" def test_load_dag_500_nodes(self, tmp_path: pathlib.Path) -> None: """load_dag on 500 records must complete quickly.""" repo = _make_repo(tmp_path) for _ in range(500): add_dependencies(repo, _new_id(), [_new_id()]) start = time.monotonic() g = load_dag(repo) elapsed = time.monotonic() - start assert len(g) >= 500 assert elapsed < 5.0, f"load_dag 500 nodes took {elapsed:.2f}s" def test_topo_sort_1000_node_chain(self) -> None: """In-memory topo sort of 1000-node linear chain is fast.""" ids = [_new_id() for _ in range(1000)] graph: AdjacencyMap = {} for i, rid in enumerate(ids): graph[rid] = {ids[i - 1]} if i > 0 else set() start = time.monotonic() order = topological_sort(graph) elapsed = time.monotonic() - start assert len(order) == 1000 assert elapsed < 1.0, f"topo sort 1000 nodes took {elapsed:.2f}s" def test_detect_cycle_1000_node_acyclic(self) -> None: """Cycle detection on a 1000-node chain (no cycle) must be fast.""" ids = [_new_id() for _ in range(1000)] graph: AdjacencyMap = {} for i, rid in enumerate(ids): graph[rid] = {ids[i - 1]} if i > 0 else set() start = time.monotonic() result = detect_cycle(graph) elapsed = time.monotonic() - start assert result is None assert elapsed < 1.0, f"detect_cycle 1000 nodes took {elapsed:.2f}s" def test_dag_json_200_active_nodes(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: """CLI dag with 200 nodes completes quickly.""" repo = _make_repo(tmp_path) ids = [_new_id() for _ in range(200)] for i in range(1, len(ids)): add_dependencies(repo, ids[i], [ids[i - 1]]) args = _dag_ns(fmt="json") start = time.monotonic() with _patch_dag_repo(repo): dag_run(args) elapsed = time.monotonic() - start out = json.loads(capsys.readouterr().out) assert out["total_nodes"] == 200 assert 
elapsed < 5.0, f"200-node dag JSON took {elapsed:.2f}s"


# ── New: input validation ─────────────────────────────────────────────────────


class TestCliDagInputValidation:
    """ID validation on --reservation-id fires before any file I/O."""

    def test_valid_content_id_accepted_single_mode(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """A well-formed content ID is accepted and echoed back in single mode."""
        repo = _make_repo(tmp_path)
        add_dependencies(repo, _B, [_A])
        args = _dag_ns(fmt="json", reservation_id=_B)
        with _patch_dag_repo(repo):
            dag_run(args)
        out = json.loads(capsys.readouterr().out)
        assert out["reservation_id"] == _B

    def test_invalid_id_exits_1_text(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """In text mode a malformed ID exits 1 and reports on stderr."""
        repo = _make_repo(tmp_path)
        args = _dag_ns(fmt="text", reservation_id="not-a-content-id")
        with _patch_dag_repo(repo):
            with pytest.raises(SystemExit) as exc:
                dag_run(args)
        assert exc.value.code == 1
        err = capsys.readouterr().err
        assert "❌" in err

    def test_invalid_id_exits_1_json(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """In JSON mode a malformed ID exits 1 with a structured error on stdout."""
        repo = _make_repo(tmp_path)
        args = _dag_ns(fmt="json", reservation_id="not-a-content-id")
        with _patch_dag_repo(repo):
            with pytest.raises(SystemExit) as exc:
                dag_run(args)
        assert exc.value.code == 1
        out = json.loads(capsys.readouterr().out)
        assert "error" in out
        assert out["status"] == "bad_reservation_id"

    def test_path_traversal_exits_1(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """A path-traversal string is rejected like any other malformed ID."""
        repo = _make_repo(tmp_path)
        args = _dag_ns(fmt="json", reservation_id="../../etc/passwd")
        with _patch_dag_repo(repo):
            with pytest.raises(SystemExit) as exc:
                dag_run(args)
        assert exc.value.code == 1

    def test_null_byte_in_reservation_id_exits_1(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """An ID made of NUL bytes is rejected with exit code 1."""
        repo = _make_repo(tmp_path)
        args = _dag_ns(fmt="json", reservation_id="\x00" * 36)
        with _patch_dag_repo(repo):
            with pytest.raises(SystemExit) as exc:
                dag_run(args)
        assert exc.value.code == 1

    def test_validation_fires_before_repo_lookup(self, tmp_path: pathlib.Path) -> None:
        """require_repo must never be called when --reservation-id is invalid."""
        repo = _make_repo(tmp_path)
        args = _dag_ns(fmt="json", reservation_id="bad-id")
        # The patch object records calls itself, so no manual call log is needed.
        with patch("muse.cli.commands.dag.require_repo", return_value=repo) as require_mock:
            with pytest.raises(SystemExit):
                dag_run(args)
        assert not require_mock.called, "require_repo was called before validation"

    def test_none_reservation_id_shows_full_dag(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """Without --reservation-id the command emits the full-graph payload."""
        repo = _make_repo(tmp_path)
        add_dependencies(repo, _B, [_A])
        args = _dag_ns(fmt="json", reservation_id=None)
        with _patch_dag_repo(repo):
            dag_run(args)
        out = json.loads(capsys.readouterr().out)
        assert "nodes" in out
        assert out["total_nodes"] > 0

    def test_json_error_is_compact_single_line(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """JSON error output must be on a single line (compact, no indent)."""
        repo = _make_repo(tmp_path)
        args = _dag_ns(fmt="json", reservation_id="bad")
        with _patch_dag_repo(repo):
            with pytest.raises(SystemExit):
                dag_run(args)
        raw = capsys.readouterr().out.strip()
        assert "\n" not in raw
        data = json.loads(raw)
        assert "error" in data


# ── New: compact JSON output ──────────────────────────────────────────────────


class TestCliDagCompactJson:
    """All JSON output must be compact (no indent=2)."""

    def test_full_dag_json_is_single_line(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """Full-graph JSON is emitted on one line and parses cleanly."""
        repo = _make_repo(tmp_path)
        add_dependencies(repo, _B, [_A])
        args = _dag_ns(fmt="json")
        with _patch_dag_repo(repo):
            dag_run(args)
        raw = capsys.readouterr().out.strip()
        assert "\n" not in raw
        json.loads(raw)  # must be valid JSON

    def test_single_mode_json_is_single_line(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """Single-reservation JSON is emitted on one line and parses cleanly."""
        repo = _make_repo(tmp_path)
        add_dependencies(repo, _B, [_A])
        args = _dag_ns(fmt="json", reservation_id=_B)
        with _patch_dag_repo(repo):
            dag_run(args)
        raw = capsys.readouterr().out.strip()
        assert "\n" not in raw
        json.loads(raw)

    def test_empty_dag_json_is_single_line(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """An empty repo still yields one-line JSON reporting zero nodes."""
        repo = _make_repo(tmp_path)
        args = _dag_ns(fmt="json")
        with _patch_dag_repo(repo):
            dag_run(args)
        raw = capsys.readouterr().out.strip()
        assert "\n" not in raw
        data = json.loads(raw)
        assert data["total_nodes"] == 0

    def test_full_dag_json_schema_keys(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """The full-graph payload carries every expected top-level key."""
        repo = _make_repo(tmp_path)
        add_dependencies(repo, _B, [_A])
        args = _dag_ns(fmt="json")
        with _patch_dag_repo(repo):
            dag_run(args)
        out = json.loads(capsys.readouterr().out)
        for key in ("total_nodes", "total_edges", "blocked_count", "active_only", "cycle", "nodes"):
            assert key in out, f"missing key: {key}"

    def test_single_mode_json_schema_keys(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """The single-reservation payload carries every expected top-level key."""
        repo = _make_repo(tmp_path)
        add_dependencies(repo, _B, [_A])
        args = _dag_ns(fmt="json", reservation_id=_B)
        with _patch_dag_repo(repo):
            dag_run(args)
        out = json.loads(capsys.readouterr().out)
        for key in ("reservation_id", "depends_on", "active", "blocked", "blocking", "cycle"):
            assert key in out, f"missing key: {key}"

    def test_active_only_field_in_json(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """active_only=False on the namespace is echoed as False in the payload."""
        repo = _make_repo(tmp_path)
        args = _dag_ns(fmt="json", active_only=False)
        with _patch_dag_repo(repo):
            dag_run(args)
        out = json.loads(capsys.readouterr().out)
        assert out["active_only"] is False

    def test_active_only_true_reflected_in_json(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """active_only=True on the namespace is echoed as True in the payload."""
        repo = _make_repo(tmp_path)
        args = _dag_ns(fmt="json", active_only=True)
        with _patch_dag_repo(repo):
            dag_run(args)
        out = json.loads(capsys.readouterr().out)
        assert out["active_only"] is True


# ── New: --active-only flag ───────────────────────────────────────────────────


class TestCliDagActiveOnly:
    """--active-only restricts the graph to currently active reservations."""

    def _make_fake_active(self, reservation_id: str) -> Reservation:
        """Build an in-memory Reservation for *reservation_id* (nothing is written to disk)."""
        return Reservation(
            reservation_id=reservation_id,
            run_id="agent-test",
            branch="dev",
            addresses=["foo.py::bar"],
            created_at=_EPOCH,
            expires_at=_EPOCH + datetime.timedelta(hours=1),
            operation=None,
        )

    def test_active_only_false_shows_all_nodes(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """Without the filter, every node in the dependency records is listed."""
        repo = _make_repo(tmp_path)
        add_dependencies(repo, _B, [_A])
        args = _dag_ns(fmt="json", active_only=False)
        with _patch_dag_repo(repo):
            dag_run(args)
        out = json.loads(capsys.readouterr().out)
        # Both _A and _B must be in graph (neither is active, no filter)
        node_ids = {n["reservation_id"] for n in out["nodes"]}
        assert _A in node_ids and _B in node_ids

    def test_active_only_true_empty_active_set(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """With the filter on and nothing active, the graph is empty."""
        repo = _make_repo(tmp_path)
        add_dependencies(repo, _B, [_A])
        args = _dag_ns(fmt="json", active_only=True)
        with _patch_dag_repo(repo), \
                patch("muse.cli.commands.dag.active_reservations", return_value=[]):
            dag_run(args)
        out = json.loads(capsys.readouterr().out)
        # No active reservations → empty graph
        assert out["total_nodes"] == 0
        assert out["nodes"] == []

    def test_active_only_filters_to_active_nodes(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """Only the active reservation survives the filter in the JSON node list."""
        repo = _make_repo(tmp_path)
        add_dependencies(repo, _B, [_A])
        add_dependencies(repo, _C, [_B])
        # Only _B is active
        fake_active = [self._make_fake_active(_B)]
        args = _dag_ns(fmt="json", active_only=True)
        with _patch_dag_repo(repo), \
                patch("muse.cli.commands.dag.active_reservations", return_value=fake_active):
            dag_run(args)
        out = json.loads(capsys.readouterr().out)
        node_ids = {n["reservation_id"] for n in out["nodes"]}
        assert _B in node_ids
        assert _A not in node_ids
        assert _C not in node_ids
        assert out["total_nodes"] == 1

    def test_active_only_all_active_shows_all(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """When every node is active the filter removes nothing."""
        repo = _make_repo(tmp_path)
        add_dependencies(repo, _B, [_A])
        fake_active = [self._make_fake_active(_A), self._make_fake_active(_B)]
        args = _dag_ns(fmt="json", active_only=True)
        with _patch_dag_repo(repo), \
                patch("muse.cli.commands.dag.active_reservations", return_value=fake_active):
            dag_run(args)
        out = json.loads(capsys.readouterr().out)
        node_ids = {n["reservation_id"] for n in out["nodes"]}
        assert _A in node_ids and _B in node_ids

    def test_active_only_reflected_true_in_json(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """The payload echoes active_only=True even when the active set is empty."""
        repo = _make_repo(tmp_path)
        args = _dag_ns(fmt="json", active_only=True)
        with _patch_dag_repo(repo), \
                patch("muse.cli.commands.dag.active_reservations", return_value=[]):
            dag_run(args)
        out = json.loads(capsys.readouterr().out)
        assert out["active_only"] is True

    def test_active_only_text_output(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """Text rendering with --active-only still lists the active reservation.

        This test previously requested ``fmt="json"`` and so duplicated
        ``test_active_only_filters_to_active_nodes``; it now exercises the
        text renderer its name refers to.
        """
        repo = _make_repo(tmp_path)
        add_dependencies(repo, _B, [_A])
        add_dependencies(repo, _C, [_B])
        # Only _B is active — _A and _C are filtered from graph nodes
        fake_active = [self._make_fake_active(_B)]
        args = _dag_ns(fmt="text", active_only=True)
        with _patch_dag_repo(repo), \
                patch("muse.cli.commands.dag.active_reservations", return_value=fake_active):
            dag_run(args)
        out = capsys.readouterr().out
        # The active node's truncated ID must be rendered.
        # NOTE(review): no negative assertions on _A / _C here — the text table
        # may still mention filtered-out IDs in a dependency column; the JSON
        # test above pins the exact node set.
        assert _B[:8] in out


# ── New: --topo flag ──────────────────────────────────────────────────────────


class TestCliDagTopoFlag:
    """--topo adds a TOPO column; without it a flat table is shown."""

    def test_without_topo_no_topo_column(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """The flat table has a STATUS header and no TOPO header."""
        repo = _make_repo(tmp_path)
        add_dependencies(repo, _B, [_A])
        args = _dag_ns(fmt="text", topo=False)
        with _patch_dag_repo(repo):
            dag_run(args)
        out = capsys.readouterr().out
        # flat table has STATUS column but no TOPO column header
        assert "STATUS" in out
        assert "TOPO" not in out

    def test_with_topo_shows_topo_column(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """With --topo both the TOPO and STATUS headers are printed."""
        repo = _make_repo(tmp_path)
        add_dependencies(repo, _B, [_A])
        args = _dag_ns(fmt="text", topo=True)
        with _patch_dag_repo(repo):
            dag_run(args)
        out = capsys.readouterr().out
        assert "TOPO" in out
        assert "STATUS" in out

    def test_topo_flag_does_not_affect_json(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """JSON output is identical regardless of --topo (topo_index always present)."""
        repo = _make_repo(tmp_path)
        add_dependencies(repo, _B, [_A])

        args_with = _dag_ns(fmt="json", topo=True)
        with _patch_dag_repo(repo):
            dag_run(args_with)
        out_with = json.loads(capsys.readouterr().out)

        args_without = _dag_ns(fmt="json", topo=False)
        with _patch_dag_repo(repo):
            dag_run(args_without)
        out_without = json.loads(capsys.readouterr().out)

        # Guard against a vacuous pass: the loops below check nothing if the
        # node list is empty.
        assert out_with["nodes"], "expected nodes in --topo JSON output"
        assert out_without["nodes"], "expected nodes in non-topo JSON output"

        # Both must have topo_index on every node
        for node in out_with["nodes"]:
            assert node["topo_index"] is not None
        for node in out_without["nodes"]:
            assert node["topo_index"] is not None

        # The flag must not change the per-node indices either.
        index_with = {n["reservation_id"]: n["topo_index"] for n in out_with["nodes"]}
        index_without = {n["reservation_id"]: n["topo_index"] for n in out_without["nodes"]}
        assert index_with == index_without

    def test_without_topo_nodes_shown_in_output(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """The flat table prints the (truncated) node ID."""
        repo = _make_repo(tmp_path)
        add_dependencies(repo, _B, [_A])
        args = _dag_ns(fmt="text", topo=False)
        with _patch_dag_repo(repo):
            dag_run(args)
        out = capsys.readouterr().out
        # Node IDs (truncated) must appear
        assert _B[:8] in out

    def test_with_topo_node_id_in_output(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """The topo table prints the (truncated) node ID."""
        repo = _make_repo(tmp_path)
        add_dependencies(repo, _B, [_A])
        args = _dag_ns(fmt="text", topo=True)
        with _patch_dag_repo(repo):
            dag_run(args)
        out = capsys.readouterr().out
        assert _B[:8] in out

    def test_topo_order_correct_in_text(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """With --topo, nodes with no deps must appear before dependents."""
        repo = _make_repo(tmp_path)
        add_dependencies(repo, _B, [_A])
        args = _dag_ns(fmt="text", topo=True)
        with _patch_dag_repo(repo):
            dag_run(args)
        out = capsys.readouterr().out
        # _A has lower topo index → must appear before _B in output
        pos_a = out.find(_A[:8])
        pos_b = out.find(_B[:8])
        # str.find returns -1 on a miss, which would satisfy the ordering
        # check below vacuously — require both IDs to be present first.
        assert pos_a != -1, "dependency ID missing from topo output"
        assert pos_b != -1, "dependent ID missing from topo output"
        assert pos_a < pos_b, "dependency must appear before dependent in topo mode"

    def test_empty_dag_topo_no_crash(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """An empty repo in topo mode prints the 'no dependency records' notice."""
        repo = _make_repo(tmp_path)
        args = _dag_ns(fmt="text", topo=True)
        with _patch_dag_repo(repo):
            dag_run(args)
        out = capsys.readouterr().out
        assert "no dependency records" in out


# ---------------------------------------------------------------------------
# TestRegisterFlags — --json / -j normalized at argparse level
# ---------------------------------------------------------------------------


class TestRegisterFlags:
    """register() must expose --json with -j shorthand and dest=json_out."""

    def _make_parser(self) -> argparse.ArgumentParser:
        """Return a root parser with the ``dag`` subcommand registered on it."""
        # Imported here so only these tests depend on the register entry point.
        from muse.cli.commands.dag import register

        root = argparse.ArgumentParser()
        subs = root.add_subparsers()
        register(subs)
        return root

    def test_json_out_default_false(self) -> None:
        """Without any JSON flag, json_out defaults to False."""
        p = self._make_parser()
        ns = p.parse_args(['dag'])
        assert ns.json_out is False

    def test_json_out_true_with_json_flag(self) -> None:
        """--json sets json_out to True."""
        p = self._make_parser()
        ns = p.parse_args(['dag', '--json'])
        assert ns.json_out is True

    def test_json_out_true_with_j_flag(self) -> None:
        """-j is a shorthand that also sets json_out to True."""
        p = self._make_parser()
        ns = p.parse_args(['dag', '-j'])
        assert ns.json_out is True