gabriel / muse public
test_cmd_dag.py python
1,323 lines 52.4 KB
Raw
sha256:81ae324db5ad375fbfe4834c6fcb378312cafad3cc92dec5d3e5c427306621a2 fix: remove commit_exists filter from have anchors — server… Sonnet 4.6 patch 21 days ago
1 """Tests for muse coord dag — dependency DAG on reservations.
2
3 Coverage goals
4 --------------
5 * Unit — every public function in ``muse.core.dag``
6 * Integration — add_dependencies → load → build_graph → detect/sort round-trip
7 * CLI — ``muse coord dag`` and ``muse coord reserve --depends-on`` via direct
8 function dispatch and stdout capture
9 * Security — ID validation, path traversal, self-dependency rejection
10 * Stress — large linear chains, wide diamonds, many-node graphs
11
12 Test conventions
13 ----------------
14 * Every test receives a fresh repo fixture via ``_make_repo(tmp_path)``.
15 * Active-reservation state is injected by creating real Reservation files via
16 :func:`muse.core.coordination.create_reservation` + the repo fixture.
17 * Time is frozen where predictable timestamps matter.
18 * CLI dispatch calls ``run`` directly with a hand-assembled ``argparse.Namespace``.
19 """
20
21 from __future__ import annotations
22
23 import argparse
24 import datetime
25 import json
26 import pathlib
27 import time
28 import itertools
29 from collections.abc import Generator
30 from contextlib import AbstractContextManager
31 from unittest.mock import MagicMock, patch
32
33 import pytest
34
35 from muse.core.coordination import Reservation
36
37 from muse.core.types import MsgpackValue, content_hash, fake_id
38 from muse.core.dag import (
39 AdjacencyMap,
40 DependencyRecord,
41 _MAX_DEPS,
42 _dependencies_dir,
43 _validate_id,
44 add_dependencies,
45 build_graph,
46 detect_cycle,
47 ensure_dag_dirs,
48 get_blocking,
49 is_blocked,
50 load_all_dependencies,
51 load_dag,
52 load_dependencies,
53 topological_sort,
54 )
55 from muse.cli.commands.dag import run as dag_run
56 from muse.cli.commands.reserve import run as reserve_run
57 from muse.core.paths import muse_dir
58
59 UTC = datetime.timezone.utc
60 _EPOCH = datetime.datetime(2026, 3, 30, 12, 0, 0, tzinfo=UTC)
61
62 _A = fake_id("dag-node-a")
63 _B = fake_id("dag-node-b")
64 _C = fake_id("dag-node-c")
65 _D = fake_id("dag-node-d")
66 _E = fake_id("dag-node-e")
67
68 _id_seq = itertools.count()
69
70
71 def _new_id() -> str:
72 return content_hash({"seq": next(_id_seq)})
73
74
75 def _make_repo(tmp_path: pathlib.Path) -> pathlib.Path:
76 """Return a minimal muse repo root."""
77 muse_dir(tmp_path).mkdir(parents=True)
78 return tmp_path
79
80
81 def _freeze(ts: datetime.datetime) -> AbstractContextManager[MagicMock]:
82 return patch("muse.core.dag._now_utc", return_value=ts)
83
84
85 def _dag_ns(fmt: str = "json", **kwargs: MsgpackValue) -> argparse.Namespace:
86 defaults = {
87 "reservation_id": None,
88 "topo": False,
89 "active_only": False,
90 "json_out": fmt == "json",
91 }
92 defaults.update(kwargs)
93 return argparse.Namespace(**defaults)
94
95
96 def _reserve_ns(**kwargs: MsgpackValue) -> argparse.Namespace:
97 defaults = {
98 "addresses": ["billing.py::compute"],
99 "run_id": "agent-1",
100 "ttl": 3600,
101 "operation": None,
102 "json_out": True,
103 "depends_on": [],
104 }
105 defaults.update(kwargs)
106 return argparse.Namespace(**defaults)
107
108
109 def _patch_repo(repo: pathlib.Path) -> AbstractContextManager[None]:
110 from contextlib import ExitStack, contextmanager
111
112 @contextmanager
113 def _ctx() -> Generator[None, None, None]:
114 with ExitStack() as stack:
115 stack.enter_context(patch("muse.cli.commands.reserve.require_repo", return_value=repo))
116 stack.enter_context(patch("muse.cli.commands.reserve.read_current_branch", return_value="dev"))
117 yield
118
119 return _ctx()
120
121
122 def _patch_dag_repo(repo: pathlib.Path) -> AbstractContextManager[MagicMock]:
123 return patch("muse.cli.commands.dag.require_repo", return_value=repo)
124
125
126 # ── _validate_id ─────────────────────────────────────────────────────────────
127
128
129 class TestValidateId:
130 """ID validation accepts sha256: content-addressed IDs and rejects everything else."""
131
132 def test_accepts_valid_content_id(self) -> None:
133 _validate_id(_A) # no exception
134
135 def test_accepts_another_content_id(self) -> None:
136 _validate_id(_new_id()) # no exception
137
138 def test_rejects_empty(self) -> None:
139 with pytest.raises(ValueError):
140 _validate_id("")
141
142 def test_rejects_plain_string(self) -> None:
143 with pytest.raises(ValueError):
144 _validate_id("not-a-content-id")
145
146 def test_rejects_uuid4(self) -> None:
147 with pytest.raises(ValueError):
148 _validate_id("aaaaaaaa-aaaa-4aaa-8aaa-aaaaaaaaaaaa")
149
150 def test_rejects_path_traversal(self) -> None:
151 with pytest.raises(ValueError):
152 _validate_id("../../etc/passwd")
153
154 def test_rejects_null_byte(self) -> None:
155 with pytest.raises(ValueError):
156 _validate_id("\x00" * 36)
157
158 def test_rejects_slash(self) -> None:
159 with pytest.raises(ValueError):
160 _validate_id(f"sha256:/{'a' * 63}")
161
162
163 # ── DependencyRecord ───────────────────────────────────────────────────────────
164
165
166 class TestDependencyRecord:
167 """DependencyRecord serialisation round-trip."""
168
169 def _make(self, **kw: MsgpackValue) -> DependencyRecord:
170 defaults = dict(
171 reservation_id=_A,
172 depends_on=[_B, _C],
173 created_at=_EPOCH,
174 )
175 defaults.update(kw)
176 return DependencyRecord(**defaults)
177
178 def test_to_dict_round_trip(self) -> None:
179 rec = self._make()
180 d = rec.to_dict()
181 rec2 = DependencyRecord.from_dict(d)
182 assert rec2.reservation_id == _A
183 assert rec2.depends_on == [_B, _C]
184
185 def test_from_dict_missing_created_at_defaults_to_now(self) -> None:
186 d = {"reservation_id": _A, "depends_on": [_B]}
187 rec = DependencyRecord.from_dict(d)
188 assert isinstance(rec.created_at, datetime.datetime)
189
190 def test_from_dict_empty_depends_on(self) -> None:
191 d = {"reservation_id": _A, "depends_on": [], "created_at": _EPOCH.isoformat()}
192 rec = DependencyRecord.from_dict(d)
193 assert rec.depends_on == []
194
195 def test_from_dict_non_list_depends_on_defaults_empty(self) -> None:
196 d = {"reservation_id": _A, "depends_on": None, "created_at": _EPOCH.isoformat()}
197 rec = DependencyRecord.from_dict(d)
198 assert rec.depends_on == []
199
200 def test_schema_version_in_dict(self) -> None:
201 rec = self._make()
202 d = rec.to_dict()
203 assert "schema_version" in d
204
205
206 # ── ensure_dag_dirs / load_all_dependencies ───────────────────────────────────
207
208
209 class TestEnsureDagDirs:
210 def test_creates_directory(self, tmp_path: pathlib.Path) -> None:
211 repo = _make_repo(tmp_path)
212 ensure_dag_dirs(repo)
213 assert _dependencies_dir(repo).is_dir()
214
215 def test_idempotent(self, tmp_path: pathlib.Path) -> None:
216 repo = _make_repo(tmp_path)
217 ensure_dag_dirs(repo)
218 ensure_dag_dirs(repo)
219
220 def test_empty_dir_returns_empty_list(self, tmp_path: pathlib.Path) -> None:
221 repo = _make_repo(tmp_path)
222 assert load_all_dependencies(repo) == []
223
224 def test_non_existent_dir_returns_empty_list(self, tmp_path: pathlib.Path) -> None:
225 repo = _make_repo(tmp_path)
226 assert load_all_dependencies(repo) == []
227
228 def test_skips_corrupt_file(self, tmp_path: pathlib.Path) -> None:
229 repo = _make_repo(tmp_path)
230 ensure_dag_dirs(repo)
231 (_dependencies_dir(repo) / f"{_A}.json").write_text("NOT JSON")
232 assert load_all_dependencies(repo) == []
233
234
235 # ── load_dependencies ─────────────────────────────────────────────────────────
236
237
238 class TestLoadDependencies:
239 def test_returns_none_for_missing(self, tmp_path: pathlib.Path) -> None:
240 repo = _make_repo(tmp_path)
241 ensure_dag_dirs(repo)
242 assert load_dependencies(repo, _A) is None
243
244 def test_invalid_id_raises(self, tmp_path: pathlib.Path) -> None:
245 repo = _make_repo(tmp_path)
246 with pytest.raises(ValueError):
247 load_dependencies(repo, "not-a-content-id")
248
249 def test_returns_record_after_add(self, tmp_path: pathlib.Path) -> None:
250 repo = _make_repo(tmp_path)
251 with _freeze(_EPOCH):
252 add_dependencies(repo, _A, [_B])
253 rec = load_dependencies(repo, _A)
254 assert rec is not None
255 assert rec.depends_on == [_B]
256
257
258 # ── build_graph ────────────────────────────────────────────────────────────────
259
260
261 class TestBuildGraph:
262 def _rec(self, rid: str, deps: list[str]) -> DependencyRecord:
263 return DependencyRecord(rid, deps, _EPOCH)
264
265 def test_empty_input(self) -> None:
266 g = build_graph([])
267 assert g == {}
268
269 def test_single_node_no_deps(self) -> None:
270 g = build_graph([self._rec(_A, [])])
271 assert g[_A] == set()
272
273 def test_adds_dependency_nodes(self) -> None:
274 g = build_graph([self._rec(_A, [_B])])
275 assert _A in g
276 assert _B in g
277 assert _B in g[_A]
278
279 def test_diamond_shape(self) -> None:
280 # A → B, A → C, B → D, C → D
281 g = build_graph([
282 self._rec(_A, [_B, _C]),
283 self._rec(_B, [_D]),
284 self._rec(_C, [_D]),
285 ])
286 assert g[_A] == {_B, _C}
287 assert g[_B] == {_D}
288 assert g[_C] == {_D}
289 assert g[_D] == set()
290
291 def test_leaf_nodes_included(self) -> None:
292 g = build_graph([self._rec(_A, [_B, _C])])
293 assert _B in g and g[_B] == set()
294 assert _C in g and g[_C] == set()
295
296
297 # ── detect_cycle ───────────────────────────────────────────────────────────────
298
299
300 class TestDetectCycle:
301 def test_empty_graph_no_cycle(self) -> None:
302 assert detect_cycle({}) is None
303
304 def test_single_node_no_cycle(self) -> None:
305 assert detect_cycle({_A: set()}) is None
306
307 def test_linear_chain_no_cycle(self) -> None:
308 g = {_A: {_B}, _B: {_C}, _C: set()}
309 assert detect_cycle(g) is None
310
311 def test_direct_self_loop(self) -> None:
312 g = {_A: {_A}}
313 cycle = detect_cycle(g)
314 assert cycle is not None
315 assert _A in cycle
316
317 def test_two_node_cycle(self) -> None:
318 g = {_A: {_B}, _B: {_A}}
319 cycle = detect_cycle(g)
320 assert cycle is not None
321 assert len(cycle) >= 2
322
323 def test_three_node_cycle(self) -> None:
324 g = {_A: {_B}, _B: {_C}, _C: {_A}}
325 cycle = detect_cycle(g)
326 assert cycle is not None
327 assert len(cycle) >= 3
328
329 def test_diamond_no_cycle(self) -> None:
330 # A → B, A → C, B → D, C → D (fan-in is fine)
331 g = {_A: {_B, _C}, _B: {_D}, _C: {_D}, _D: set()}
332 assert detect_cycle(g) is None
333
334 def test_cycle_path_starts_and_ends_same(self) -> None:
335 g = {_A: {_B}, _B: {_C}, _C: {_A}}
336 cycle = detect_cycle(g)
337 assert cycle is not None
338 assert cycle[0] == cycle[-1]
339
340 def test_unrelated_subgraph_no_cycle(self) -> None:
341 # A → B (no cycle), C → D (no cycle)
342 g = {_A: {_B}, _B: set(), _C: {_D}, _D: set()}
343 assert detect_cycle(g) is None
344
345
346 # ── topological_sort ───────────────────────────────────────────────────────────
347
348
349 class TestTopologicalSort:
350 def test_empty_graph(self) -> None:
351 result = topological_sort({})
352 assert result == []
353
354 def test_single_node(self) -> None:
355 result = topological_sort({_A: set()})
356 assert result == [_A]
357
358 def test_linear_chain_order(self) -> None:
359 # A → B → C: C should come first (fewest deps), then B, then A
360 g = {_A: {_B}, _B: {_C}, _C: set()}
361 result = topological_sort(g)
362 assert result.index(_C) < result.index(_B) < result.index(_A)
363
364 def test_diamond_order(self) -> None:
365 # D (leaf) → B, C (intermediate) → A (top)
366 g = {_A: {_B, _C}, _B: {_D}, _C: {_D}, _D: set()}
367 result = topological_sort(g)
368 assert result.index(_D) < result.index(_B)
369 assert result.index(_D) < result.index(_C)
370 assert result.index(_B) < result.index(_A)
371 assert result.index(_C) < result.index(_A)
372
373 def test_all_nodes_present(self) -> None:
374 g = {_A: {_B}, _B: {_C}, _C: set()}
375 result = topological_sort(g)
376 assert set(result) == {_A, _B, _C}
377
378 def test_subset_nodes(self) -> None:
379 g = {_A: {_B}, _B: {_C}, _C: set(), _D: set()}
380 # Only ask for A's subgraph
381 result = topological_sort(g, nodes=[_A])
382 assert _A in result and _B in result and _C in result
383 # D is unrelated — may or may not be included; just check order
384 assert result.index(_C) < result.index(_B) < result.index(_A)
385
386 def test_cycle_raises(self) -> None:
387 g = {_A: {_B}, _B: {_A}}
388 with pytest.raises(ValueError, match="cycle"):
389 topological_sort(g)
390
391 def test_independent_nodes_all_returned(self) -> None:
392 g = {_A: set(), _B: set(), _C: set()}
393 result = topological_sort(g)
394 assert set(result) == {_A, _B, _C}
395
396
397 # ── is_blocked / get_blocking ──────────────────────────────────────────────────
398
399
400 class TestIsBlockedAndGetBlocking:
401 def test_no_deps_not_blocked(self) -> None:
402 g = {_A: set()}
403 assert is_blocked(_A, g, frozenset({_B})) is False
404
405 def test_dep_inactive_not_blocked(self) -> None:
406 g = {_A: {_B}}
407 # B is not in active_ids → unblocked
408 assert is_blocked(_A, g, frozenset()) is False
409
410 def test_dep_active_is_blocked(self) -> None:
411 g = {_A: {_B}}
412 assert is_blocked(_A, g, frozenset({_B})) is True
413
414 def test_multiple_deps_one_active(self) -> None:
415 g = {_A: {_B, _C}}
416 assert is_blocked(_A, g, frozenset({_B})) is True
417
418 def test_unknown_node_not_blocked(self) -> None:
419 g = {}
420 assert is_blocked(_A, g, frozenset({_B})) is False
421
422 def test_get_blocking_empty(self) -> None:
423 g = {_A: {_B}}
424 assert get_blocking(_A, g, frozenset()) == []
425
426 def test_get_blocking_returns_active_deps(self) -> None:
427 g = {_A: {_B, _C}}
428 blocking = get_blocking(_A, g, frozenset({_B}))
429 assert blocking == [_B]
430
431 def test_get_blocking_sorted(self) -> None:
432 g = {_A: {_B, _C, _D}}
433 blocking = get_blocking(_A, g, frozenset({_C, _B}))
434 assert blocking == sorted([_B, _C])
435
436
437 # ── add_dependencies ───────────────────────────────────────────────────────────
438
439
440 class TestAddDependencies:
441 def test_creates_file_on_disk(self, tmp_path: pathlib.Path) -> None:
442 repo = _make_repo(tmp_path)
443 add_dependencies(repo, _A, [_B])
444 dep_file = _dependencies_dir(repo) / f"{_A}.json"
445 assert dep_file.is_file()
446
447 def test_file_is_valid_json(self, tmp_path: pathlib.Path) -> None:
448 repo = _make_repo(tmp_path)
449 add_dependencies(repo, _A, [_B])
450 raw = json.loads((_dependencies_dir(repo) / f"{_A}.json").read_text())
451 assert raw["reservation_id"] == _A
452 assert _B in raw["depends_on"]
453
454 def test_returns_correct_record(self, tmp_path: pathlib.Path) -> None:
455 repo = _make_repo(tmp_path)
456 rec = add_dependencies(repo, _A, [_B, _C])
457 assert rec.reservation_id == _A
458 assert set(rec.depends_on) == {_B, _C}
459
460 def test_deduplicates_deps(self, tmp_path: pathlib.Path) -> None:
461 repo = _make_repo(tmp_path)
462 rec = add_dependencies(repo, _A, [_B, _B, _B])
463 assert rec.depends_on == [_B]
464
465 def test_empty_deps_accepted(self, tmp_path: pathlib.Path) -> None:
466 repo = _make_repo(tmp_path)
467 rec = add_dependencies(repo, _A, [])
468 assert rec.depends_on == []
469
470 def test_write_once_raises_file_exists(self, tmp_path: pathlib.Path) -> None:
471 repo = _make_repo(tmp_path)
472 add_dependencies(repo, _A, [_B])
473 with pytest.raises(FileExistsError):
474 add_dependencies(repo, _A, [_C])
475
476 def test_self_dependency_raises(self, tmp_path: pathlib.Path) -> None:
477 repo = _make_repo(tmp_path)
478 with pytest.raises(ValueError, match="itself"):
479 add_dependencies(repo, _A, [_A])
480
481 def test_invalid_reservation_id_raises(self, tmp_path: pathlib.Path) -> None:
482 repo = _make_repo(tmp_path)
483 with pytest.raises(ValueError):
484 add_dependencies(repo, "not-a-content-id", [_B])
485
486 def test_invalid_dep_id_raises(self, tmp_path: pathlib.Path) -> None:
487 repo = _make_repo(tmp_path)
488 with pytest.raises(ValueError):
489 add_dependencies(repo, _A, ["../../etc/passwd"])
490
491 def test_too_many_deps_raises(self, tmp_path: pathlib.Path) -> None:
492 repo = _make_repo(tmp_path)
493 many = [_new_id() for _ in range(_MAX_DEPS + 1)]
494 with pytest.raises(ValueError, match="too many"):
495 add_dependencies(repo, _A, many)
496
497 def test_max_deps_accepted(self, tmp_path: pathlib.Path) -> None:
498 repo = _make_repo(tmp_path)
499 many = [_new_id() for _ in range(_MAX_DEPS)]
500 rec = add_dependencies(repo, _A, many)
501 assert len(rec.depends_on) == _MAX_DEPS
502
503 def test_rejects_cycle_two_node(self, tmp_path: pathlib.Path) -> None:
504 """A → B then B → A must be rejected."""
505 repo = _make_repo(tmp_path)
506 add_dependencies(repo, _A, [_B])
507 with pytest.raises(ValueError, match="cycle"):
508 add_dependencies(repo, _B, [_A])
509
510 def test_rejects_cycle_three_node(self, tmp_path: pathlib.Path) -> None:
511 """A → B → C → A must be rejected at the third edge."""
512 repo = _make_repo(tmp_path)
513 add_dependencies(repo, _A, [_B])
514 add_dependencies(repo, _B, [_C])
515 with pytest.raises(ValueError, match="cycle"):
516 add_dependencies(repo, _C, [_A])
517
518 def test_allows_diamond(self, tmp_path: pathlib.Path) -> None:
519 """A → B, A → C, B → D, C → D is a valid DAG (fan-in, not a cycle)."""
520 repo = _make_repo(tmp_path)
521 add_dependencies(repo, _A, [_B, _C])
522 add_dependencies(repo, _B, [_D])
523 add_dependencies(repo, _C, [_D]) # must not raise
524 g = load_dag(repo)
525 assert detect_cycle(g) is None
526
527
528 # ── load_dag ───────────────────────────────────────────────────────────────────
529
530
531 class TestLoadDag:
532 def test_empty_repo(self, tmp_path: pathlib.Path) -> None:
533 repo = _make_repo(tmp_path)
534 assert load_dag(repo) == {}
535
536 def test_single_record(self, tmp_path: pathlib.Path) -> None:
537 repo = _make_repo(tmp_path)
538 add_dependencies(repo, _A, [_B])
539 g = load_dag(repo)
540 assert g[_A] == {_B}
541 assert _B in g
542
543 def test_multiple_records(self, tmp_path: pathlib.Path) -> None:
544 repo = _make_repo(tmp_path)
545 add_dependencies(repo, _A, [_B])
546 add_dependencies(repo, _B, [_C])
547 g = load_dag(repo)
548 assert g[_A] == {_B}
549 assert g[_B] == {_C}
550
551
552 # ── Integration: full lifecycle ────────────────────────────────────────────────
553
554
555 class TestIntegration:
556 """End-to-end: create deps → load → compute order → check blocked."""
557
558 def test_pipeline_ordering(self, tmp_path: pathlib.Path) -> None:
559 """Five-stage pipeline: E→D, D→C, C→B, B→A. Topo order: A,B,C,D,E."""
560 repo = _make_repo(tmp_path)
561 add_dependencies(repo, _B, [_A])
562 add_dependencies(repo, _C, [_B])
563 add_dependencies(repo, _D, [_C])
564 add_dependencies(repo, _E, [_D])
565
566 g = load_dag(repo)
567 assert detect_cycle(g) is None
568
569 order = topological_sort(g)
570 assert order.index(_A) < order.index(_B)
571 assert order.index(_B) < order.index(_C)
572 assert order.index(_C) < order.index(_D)
573 assert order.index(_D) < order.index(_E)
574
575 def test_blocked_status_with_active_reservations(self, tmp_path: pathlib.Path) -> None:
576 """B depends on A; A is active → B is blocked; A is active → not blocked."""
577 repo = _make_repo(tmp_path)
578 add_dependencies(repo, _B, [_A])
579 g = load_dag(repo)
580
581 # A is still active
582 assert is_blocked(_B, g, frozenset({_A})) is True
583 assert get_blocking(_B, g, frozenset({_A})) == [_A]
584
585 # A has been released
586 assert is_blocked(_B, g, frozenset()) is False
587 assert get_blocking(_B, g, frozenset()) == []
588
589 def test_unblocked_when_no_deps(self, tmp_path: pathlib.Path) -> None:
590 repo = _make_repo(tmp_path)
591 add_dependencies(repo, _A, [])
592 g = load_dag(repo)
593 assert is_blocked(_A, g, frozenset({_B, _C})) is False
594
595 def test_diamond_blocked_status(self, tmp_path: pathlib.Path) -> None:
596 """A → B, A → C, B → D, C → D. When D is active, B and C are blocked."""
597 repo = _make_repo(tmp_path)
598 add_dependencies(repo, _A, [_B, _C])
599 add_dependencies(repo, _B, [_D])
600 add_dependencies(repo, _C, [_D])
601 g = load_dag(repo)
602
603 active = frozenset({_D})
604 # is_blocked checks DIRECT deps only.
605 # A's direct deps are B and C — neither is in active_ids{D}.
606 assert is_blocked(_A, g, active) is False
607 assert is_blocked(_B, g, active) is True # B depends on D (active)
608 assert is_blocked(_C, g, active) is True # C depends on D (active)
609 assert is_blocked(_D, g, active) is False # D has no deps
610
611
612 # ── Security tests ─────────────────────────────────────────────────────────────
613
614
615 class TestSecurity:
616 def test_path_traversal_in_add_reservation_id(self, tmp_path: pathlib.Path) -> None:
617 repo = _make_repo(tmp_path)
618 with pytest.raises(ValueError):
619 add_dependencies(repo, "../../etc/passwd", [_B])
620
621 def test_path_traversal_in_dep_id(self, tmp_path: pathlib.Path) -> None:
622 repo = _make_repo(tmp_path)
623 with pytest.raises(ValueError):
624 add_dependencies(repo, _A, ["../../harm"])
625
626 def test_null_byte_in_reservation_id(self, tmp_path: pathlib.Path) -> None:
627 repo = _make_repo(tmp_path)
628 with pytest.raises(ValueError):
629 add_dependencies(repo, "\x00" * 36, [_B])
630
631 def test_null_byte_in_dep_id(self, tmp_path: pathlib.Path) -> None:
632 repo = _make_repo(tmp_path)
633 with pytest.raises(ValueError):
634 add_dependencies(repo, _A, ["\x00" * 36])
635
636 def test_path_traversal_in_load_dependencies(self, tmp_path: pathlib.Path) -> None:
637 repo = _make_repo(tmp_path)
638 with pytest.raises(ValueError):
639 load_dependencies(repo, "../../etc/shadow")
640
641 def test_cycle_cannot_be_written(self, tmp_path: pathlib.Path) -> None:
642 """Even under rapid concurrent conditions, a cycle must never reach disk."""
643 repo = _make_repo(tmp_path)
644 add_dependencies(repo, _A, [_B])
645 add_dependencies(repo, _B, [_C])
646 # This would create A→B→C→A
647 with pytest.raises(ValueError, match="cycle"):
648 add_dependencies(repo, _C, [_A])
649 # Verify C has no record on disk
650 assert (_dependencies_dir(repo) / f"{_C}.json").exists() is False
651
652
653 # ── CLI: muse coord dag ────────────────────────────────────────────────────────
654
655
656 class TestCliDag:
657 def _make_deps(self, repo: pathlib.Path) -> None:
658 add_dependencies(repo, _B, [_A])
659 add_dependencies(repo, _C, [_B])
660
661 def test_full_dag_json(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
662 repo = _make_repo(tmp_path)
663 self._make_deps(repo)
664 args = _dag_ns(fmt="json")
665 with _patch_dag_repo(repo):
666 dag_run(args)
667 out = json.loads(capsys.readouterr().out)
668 assert out["total_nodes"] == 3
669 assert out["total_edges"] == 2
670 assert "nodes" in out
671
672 def test_full_dag_text(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
673 repo = _make_repo(tmp_path)
674 self._make_deps(repo)
675 args = _dag_ns(fmt="text")
676 with _patch_dag_repo(repo):
677 dag_run(args)
678 out = capsys.readouterr().out
679 assert "Dependency DAG" in out
680
681 def test_empty_dag_json(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
682 repo = _make_repo(tmp_path)
683 args = _dag_ns(fmt="json")
684 with _patch_dag_repo(repo):
685 dag_run(args)
686 out = json.loads(capsys.readouterr().out)
687 assert out["total_nodes"] == 0
688 assert out["nodes"] == []
689
690 def test_single_reservation_mode_json(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
691 repo = _make_repo(tmp_path)
692 add_dependencies(repo, _B, [_A])
693 args = _dag_ns(fmt="json", reservation_id=_B)
694 with _patch_dag_repo(repo):
695 dag_run(args)
696 out = json.loads(capsys.readouterr().out)
697 assert out["reservation_id"] == _B
698 assert _A in out["depends_on"]
699
700 def test_single_reservation_mode_text(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
701 repo = _make_repo(tmp_path)
702 add_dependencies(repo, _B, [_A])
703 args = _dag_ns(fmt="text", reservation_id=_B)
704 with _patch_dag_repo(repo):
705 dag_run(args)
706 out = capsys.readouterr().out
707 # Should show the reservation ID prefix
708 assert _B[:8] in out
709
710 def test_cycle_detected_exits_1(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
711 """A cycle on disk causes exit code 1."""
712 repo = _make_repo(tmp_path)
713 # Manually write a cycle to disk, bypassing add_dependencies validation
714 ensure_dag_dirs(repo)
715 import json as _json
716 from muse._version import __version__
717 rec_a = {"schema_version": __version__, "reservation_id": _A,
718 "depends_on": [_B], "created_at": _EPOCH.isoformat()}
719 rec_b = {"schema_version": __version__, "reservation_id": _B,
720 "depends_on": [_A], "created_at": _EPOCH.isoformat()}
721 (_dependencies_dir(repo) / f"{_A}.json").write_text(_json.dumps(rec_a))
722 (_dependencies_dir(repo) / f"{_B}.json").write_text(_json.dumps(rec_b))
723
724 args = _dag_ns(fmt="json")
725 with _patch_dag_repo(repo):
726 with pytest.raises(SystemExit) as exc:
727 dag_run(args)
728 assert exc.value.code == 1
729
730 def test_blocked_count_in_json(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
731 """A reservation with an active dependency must show blocked_count > 0."""
732 repo = _make_repo(tmp_path)
733 add_dependencies(repo, _B, [_A])
734
735 # Inject _A as an active reservation via patching active_reservations.
736 from muse.core.coordination import Reservation
737 fake_res = Reservation(
738 reservation_id=_A,
739 run_id="agent-a",
740 branch="dev",
741 addresses=["foo.py::bar"],
742 created_at=_EPOCH,
743 expires_at=_EPOCH + datetime.timedelta(hours=1),
744 operation=None,
745 )
746 with _patch_dag_repo(repo), \
747 patch("muse.cli.commands.dag.active_reservations", return_value=[fake_res]):
748 args = _dag_ns(fmt="json")
749 dag_run(args)
750
751 out = json.loads(capsys.readouterr().out)
752 assert out["blocked_count"] == 1
753
754 def test_topo_index_in_nodes(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
755 repo = _make_repo(tmp_path)
756 add_dependencies(repo, _B, [_A])
757 args = _dag_ns(fmt="json")
758 with _patch_dag_repo(repo):
759 dag_run(args)
760 out = json.loads(capsys.readouterr().out)
761 indices = [n["topo_index"] for n in out["nodes"]]
762 assert all(i is not None for i in indices)
763 # A must have lower index than B (A has no deps, B depends on A)
764 a_node = next(n for n in out["nodes"] if n["reservation_id"] == _A)
765 b_node = next(n for n in out["nodes"] if n["reservation_id"] == _B)
766 assert a_node["topo_index"] < b_node["topo_index"]
767
768 def test_ansi_not_in_text_output(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
769 """Node IDs stored on disk must not produce raw ANSI in text output."""
770 repo = _make_repo(tmp_path)
771 add_dependencies(repo, _B, [_A])
772 args = _dag_ns(fmt="text")
773 with _patch_dag_repo(repo):
774 dag_run(args)
775 out = capsys.readouterr().out
776 assert "\x1b" not in out
777
778
779 # ── CLI: muse coord reserve --depends-on ──────────────────────────────────────
780
781
782 class TestReserveWithDependsOn:
783 """reserve --depends-on writes both a reservation and a dependency record."""
784
785 def test_reserve_without_depends_on(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
786 repo = _make_repo(tmp_path)
787 args = _reserve_ns(depends_on=[], addresses=["foo.py::bar"])
788 with _patch_repo(repo):
789 reserve_run(args)
790 out = json.loads(capsys.readouterr().out)
791 assert out["depends_on"] == []
792 assert out["dependency_error"] is None
793
794 def test_reserve_with_single_dep(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
795 repo = _make_repo(tmp_path)
796 args = _reserve_ns(depends_on=[_A], addresses=["foo.py::bar"])
797 with _patch_repo(repo):
798 reserve_run(args)
799 out = json.loads(capsys.readouterr().out)
800 assert _A in out["depends_on"]
801 # Verify file exists on disk
802 res_id = out["reservation_id"]
803 dep_rec = load_dependencies(repo, res_id)
804 assert dep_rec is not None
805 assert _A in dep_rec.depends_on
806
807 def test_reserve_with_multiple_deps(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
808 repo = _make_repo(tmp_path)
809 args = _reserve_ns(depends_on=[_A, _B], addresses=["foo.py::bar"])
810 with _patch_repo(repo):
811 reserve_run(args)
812 out = json.loads(capsys.readouterr().out)
813 assert set(out["depends_on"]) == {_A, _B}
814
815 def test_reserve_invalid_dep_id_exits_1(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
816 """Invalid content ID in --depends-on → dependency_error set, exit 1."""
817 repo = _make_repo(tmp_path)
818 args = _reserve_ns(depends_on=["not-a-content-id"], addresses=["foo.py::bar"])
819 with _patch_repo(repo):
820 with pytest.raises(SystemExit) as exc:
821 reserve_run(args)
822 assert exc.value.code == 1
823 out = json.loads(capsys.readouterr().out)
824 assert out["dependency_error"] is not None
825
826 def test_reserve_dep_creates_dag_record(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
827 """The reservation ID from --depends-on is reflected in load_dag."""
828 repo = _make_repo(tmp_path)
829 # First reservation (will be the dependency)
830 args1 = _reserve_ns(depends_on=[], addresses=["a.py::fn"])
831 with _patch_repo(repo):
832 reserve_run(args1)
833 dep_id = json.loads(capsys.readouterr().out)["reservation_id"]
834
835 # Second reservation depends on the first
836 args2 = _reserve_ns(depends_on=[dep_id], addresses=["b.py::fn"])
837 with _patch_repo(repo):
838 reserve_run(args2)
839 new_id = json.loads(capsys.readouterr().out)["reservation_id"]
840
841 g = load_dag(repo)
842 assert dep_id in g[new_id]
843
844
845 # ── Stress tests ───────────────────────────────────────────────────────────────
846
847
848 class TestStress:
849 def test_linear_chain_100(self, tmp_path: pathlib.Path) -> None:
850 """100-node chain A0 → A1 → … → A99: no cycle, correct topo order."""
851 repo = _make_repo(tmp_path)
852 ids = [_new_id() for _ in range(100)]
853 for i in range(1, len(ids)):
854 add_dependencies(repo, ids[i], [ids[i - 1]])
855
856 g = load_dag(repo)
857 assert detect_cycle(g) is None
858
859 order = topological_sort(g)
860 # ids[0] must come before ids[1] must come before … ids[99]
861 for i in range(1, len(ids)):
862 assert order.index(ids[i - 1]) < order.index(ids[i])
863
864 def test_wide_diamond_50(self, tmp_path: pathlib.Path) -> None:
865 """1 root, 50 middle nodes, 1 leaf: fan-out then fan-in."""
866 repo = _make_repo(tmp_path)
867 root_id = _new_id()
868 leaf_id = _new_id()
869 middles = [_new_id() for _ in range(50)]
870
871 # root → each middle
872 for m in middles:
873 add_dependencies(repo, m, [root_id])
874 # leaf → each middle (leaf depends on all 50)
875 add_dependencies(repo, leaf_id, middles)
876
877 g = load_dag(repo)
878 assert detect_cycle(g) is None
879 order = topological_sort(g)
880 assert order.index(root_id) < order.index(middles[0])
881 assert order.index(middles[-1]) < order.index(leaf_id)
882
883 def test_add_500_independent_deps(self, tmp_path: pathlib.Path) -> None:
884 """500 reservations each with one independent dependency — quick scan."""
885 repo = _make_repo(tmp_path)
886 start = time.monotonic()
887 for _ in range(500):
888 add_dependencies(repo, _new_id(), [_new_id()])
889 elapsed = time.monotonic() - start
890 assert elapsed < 10.0, f"500 independent adds took {elapsed:.2f}s"
891
892 def test_load_dag_500_nodes(self, tmp_path: pathlib.Path) -> None:
893 """load_dag on 500 records must complete quickly."""
894 repo = _make_repo(tmp_path)
895 for _ in range(500):
896 add_dependencies(repo, _new_id(), [_new_id()])
897 start = time.monotonic()
898 g = load_dag(repo)
899 elapsed = time.monotonic() - start
900 assert len(g) >= 500
901 assert elapsed < 5.0, f"load_dag 500 nodes took {elapsed:.2f}s"
902
903 def test_topo_sort_1000_node_chain(self) -> None:
904 """In-memory topo sort of 1000-node linear chain is fast."""
905 ids = [_new_id() for _ in range(1000)]
906 graph: AdjacencyMap = {}
907 for i, rid in enumerate(ids):
908 graph[rid] = {ids[i - 1]} if i > 0 else set()
909
910 start = time.monotonic()
911 order = topological_sort(graph)
912 elapsed = time.monotonic() - start
913 assert len(order) == 1000
914 assert elapsed < 1.0, f"topo sort 1000 nodes took {elapsed:.2f}s"
915
916 def test_detect_cycle_1000_node_acyclic(self) -> None:
917 """Cycle detection on a 1000-node chain (no cycle) must be fast."""
918 ids = [_new_id() for _ in range(1000)]
919 graph: AdjacencyMap = {}
920 for i, rid in enumerate(ids):
921 graph[rid] = {ids[i - 1]} if i > 0 else set()
922
923 start = time.monotonic()
924 result = detect_cycle(graph)
925 elapsed = time.monotonic() - start
926 assert result is None
927 assert elapsed < 1.0, f"detect_cycle 1000 nodes took {elapsed:.2f}s"
928
929 def test_dag_json_200_active_nodes(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
930 """CLI dag with 200 nodes completes quickly."""
931 repo = _make_repo(tmp_path)
932 ids = [_new_id() for _ in range(200)]
933 for i in range(1, len(ids)):
934 add_dependencies(repo, ids[i], [ids[i - 1]])
935 args = _dag_ns(fmt="json")
936 start = time.monotonic()
937 with _patch_dag_repo(repo):
938 dag_run(args)
939 elapsed = time.monotonic() - start
940 out = json.loads(capsys.readouterr().out)
941 assert out["total_nodes"] == 200
942 assert elapsed < 5.0, f"200-node dag JSON took {elapsed:.2f}s"
943
944
945 # ── New: input validation ──────────────────────────────────────────────────────
946
947
948 class TestCliDagInputValidation:
949 """ID validation on --reservation-id fires before any file I/O."""
950
951 def test_valid_content_id_accepted_single_mode(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
952 repo = _make_repo(tmp_path)
953 add_dependencies(repo, _B, [_A])
954 args = _dag_ns(fmt="json", reservation_id=_B)
955 with _patch_dag_repo(repo):
956 dag_run(args)
957 out = json.loads(capsys.readouterr().out)
958 assert out["reservation_id"] == _B
959
960 def test_invalid_id_exits_1_text(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
961 repo = _make_repo(tmp_path)
962 args = _dag_ns(fmt="text", reservation_id="not-a-content-id")
963 with _patch_dag_repo(repo):
964 with pytest.raises(SystemExit) as exc:
965 dag_run(args)
966 assert exc.value.code == 1
967 err = capsys.readouterr().err
968 assert "❌" in err
969
970 def test_invalid_id_exits_1_json(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
971 repo = _make_repo(tmp_path)
972 args = _dag_ns(fmt="json", reservation_id="not-a-content-id")
973 with _patch_dag_repo(repo):
974 with pytest.raises(SystemExit) as exc:
975 dag_run(args)
976 assert exc.value.code == 1
977 out = json.loads(capsys.readouterr().out)
978 assert "error" in out
979 assert out["status"] == "bad_reservation_id"
980
981 def test_path_traversal_exits_1(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
982 repo = _make_repo(tmp_path)
983 args = _dag_ns(fmt="json", reservation_id="../../etc/passwd")
984 with _patch_dag_repo(repo):
985 with pytest.raises(SystemExit) as exc:
986 dag_run(args)
987 assert exc.value.code == 1
988
989 def test_null_byte_in_reservation_id_exits_1(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
990 repo = _make_repo(tmp_path)
991 args = _dag_ns(fmt="json", reservation_id="\x00" * 36)
992 with _patch_dag_repo(repo):
993 with pytest.raises(SystemExit) as exc:
994 dag_run(args)
995 assert exc.value.code == 1
996
997 def test_validation_fires_before_repo_lookup(self, tmp_path: pathlib.Path) -> None:
998 """require_repo must never be called when --reservation-id is invalid."""
999 repo = _make_repo(tmp_path)
1000 args = _dag_ns(fmt="json", reservation_id="bad-id")
1001 require_calls: list[bool] = []
1002
1003 def _fake_require() -> pathlib.Path:
1004 require_calls.append(True)
1005 return repo
1006
1007 with patch("muse.cli.commands.dag.require_repo", side_effect=_fake_require):
1008 with pytest.raises(SystemExit):
1009 dag_run(args)
1010 assert require_calls == [], "require_repo was called before validation"
1011
1012 def test_none_reservation_id_shows_full_dag(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
1013 repo = _make_repo(tmp_path)
1014 add_dependencies(repo, _B, [_A])
1015 args = _dag_ns(fmt="json", reservation_id=None)
1016 with _patch_dag_repo(repo):
1017 dag_run(args)
1018 out = json.loads(capsys.readouterr().out)
1019 assert "nodes" in out
1020 assert out["total_nodes"] > 0
1021
1022 def test_json_error_is_compact_single_line(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
1023 """JSON error output must be on a single line (compact, no indent)."""
1024 repo = _make_repo(tmp_path)
1025 args = _dag_ns(fmt="json", reservation_id="bad")
1026 with _patch_dag_repo(repo):
1027 with pytest.raises(SystemExit):
1028 dag_run(args)
1029 raw = capsys.readouterr().out.strip()
1030 assert "\n" not in raw
1031 data = json.loads(raw)
1032 assert "error" in data
1033
1034
1035 # ── New: compact JSON output ───────────────────────────────────────────────────
1036
1037
1038 class TestCliDagCompactJson:
1039 """All JSON output must be compact (no indent=2)."""
1040
1041 def test_full_dag_json_is_single_line(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
1042 repo = _make_repo(tmp_path)
1043 add_dependencies(repo, _B, [_A])
1044 args = _dag_ns(fmt="json")
1045 with _patch_dag_repo(repo):
1046 dag_run(args)
1047 raw = capsys.readouterr().out.strip()
1048 assert "\n" not in raw
1049 json.loads(raw) # must be valid JSON
1050
1051 def test_single_mode_json_is_single_line(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
1052 repo = _make_repo(tmp_path)
1053 add_dependencies(repo, _B, [_A])
1054 args = _dag_ns(fmt="json", reservation_id=_B)
1055 with _patch_dag_repo(repo):
1056 dag_run(args)
1057 raw = capsys.readouterr().out.strip()
1058 assert "\n" not in raw
1059 json.loads(raw)
1060
1061 def test_empty_dag_json_is_single_line(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
1062 repo = _make_repo(tmp_path)
1063 args = _dag_ns(fmt="json")
1064 with _patch_dag_repo(repo):
1065 dag_run(args)
1066 raw = capsys.readouterr().out.strip()
1067 assert "\n" not in raw
1068 data = json.loads(raw)
1069 assert data["total_nodes"] == 0
1070
1071 def test_full_dag_json_schema_keys(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
1072 repo = _make_repo(tmp_path)
1073 add_dependencies(repo, _B, [_A])
1074 args = _dag_ns(fmt="json")
1075 with _patch_dag_repo(repo):
1076 dag_run(args)
1077 out = json.loads(capsys.readouterr().out)
1078 for key in ("total_nodes", "total_edges",
1079 "blocked_count", "active_only", "cycle", "nodes"):
1080 assert key in out, f"missing key: {key}"
1081
1082 def test_single_mode_json_schema_keys(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
1083 repo = _make_repo(tmp_path)
1084 add_dependencies(repo, _B, [_A])
1085 args = _dag_ns(fmt="json", reservation_id=_B)
1086 with _patch_dag_repo(repo):
1087 dag_run(args)
1088 out = json.loads(capsys.readouterr().out)
1089 for key in ("reservation_id", "depends_on", "active", "blocked", "blocking", "cycle"):
1090 assert key in out, f"missing key: {key}"
1091
1092 def test_active_only_field_in_json(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
1093 repo = _make_repo(tmp_path)
1094 args = _dag_ns(fmt="json", active_only=False)
1095 with _patch_dag_repo(repo):
1096 dag_run(args)
1097 out = json.loads(capsys.readouterr().out)
1098 assert out["active_only"] is False
1099
1100 def test_active_only_true_reflected_in_json(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
1101 repo = _make_repo(tmp_path)
1102 args = _dag_ns(fmt="json", active_only=True)
1103 with _patch_dag_repo(repo):
1104 dag_run(args)
1105 out = json.loads(capsys.readouterr().out)
1106 assert out["active_only"] is True
1107
1108
1109 # ── New: --active-only flag ────────────────────────────────────────────────────
1110
1111
1112 class TestCliDagActiveOnly:
1113 """--active-only restricts the graph to currently active reservations."""
1114
1115 def _make_fake_active(self, reservation_id: str) -> Reservation:
1116 return Reservation(
1117 reservation_id=reservation_id,
1118 run_id="agent-test",
1119 branch="dev",
1120 addresses=["foo.py::bar"],
1121 created_at=_EPOCH,
1122 expires_at=_EPOCH + datetime.timedelta(hours=1),
1123 operation=None,
1124 )
1125
1126 def test_active_only_false_shows_all_nodes(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
1127 repo = _make_repo(tmp_path)
1128 add_dependencies(repo, _B, [_A])
1129 args = _dag_ns(fmt="json", active_only=False)
1130 with _patch_dag_repo(repo):
1131 dag_run(args)
1132 out = json.loads(capsys.readouterr().out)
1133 # Both _A and _B must be in graph (neither is active, no filter)
1134 node_ids = {n["reservation_id"] for n in out["nodes"]}
1135 assert _A in node_ids and _B in node_ids
1136
1137 def test_active_only_true_empty_active_set(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
1138 repo = _make_repo(tmp_path)
1139 add_dependencies(repo, _B, [_A])
1140 args = _dag_ns(fmt="json", active_only=True)
1141 with _patch_dag_repo(repo), \
1142 patch("muse.cli.commands.dag.active_reservations", return_value=[]):
1143 dag_run(args)
1144 out = json.loads(capsys.readouterr().out)
1145 # No active reservations → empty graph
1146 assert out["total_nodes"] == 0
1147 assert out["nodes"] == []
1148
1149 def test_active_only_filters_to_active_nodes(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
1150 repo = _make_repo(tmp_path)
1151 add_dependencies(repo, _B, [_A])
1152 add_dependencies(repo, _C, [_B])
1153 # Only _B is active
1154 fake_active = [self._make_fake_active(_B)]
1155 args = _dag_ns(fmt="json", active_only=True)
1156 with _patch_dag_repo(repo), \
1157 patch("muse.cli.commands.dag.active_reservations", return_value=fake_active):
1158 dag_run(args)
1159 out = json.loads(capsys.readouterr().out)
1160 node_ids = {n["reservation_id"] for n in out["nodes"]}
1161 assert _B in node_ids
1162 assert _A not in node_ids
1163 assert _C not in node_ids
1164
1165 def test_active_only_all_active_shows_all(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
1166 repo = _make_repo(tmp_path)
1167 add_dependencies(repo, _B, [_A])
1168 fake_active = [self._make_fake_active(_A), self._make_fake_active(_B)]
1169 args = _dag_ns(fmt="json", active_only=True)
1170 with _patch_dag_repo(repo), \
1171 patch("muse.cli.commands.dag.active_reservations", return_value=fake_active):
1172 dag_run(args)
1173 out = json.loads(capsys.readouterr().out)
1174 node_ids = {n["reservation_id"] for n in out["nodes"]}
1175 assert _A in node_ids and _B in node_ids
1176
1177 def test_active_only_reflected_true_in_json(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
1178 repo = _make_repo(tmp_path)
1179 args = _dag_ns(fmt="json", active_only=True)
1180 with _patch_dag_repo(repo), \
1181 patch("muse.cli.commands.dag.active_reservations", return_value=[]):
1182 dag_run(args)
1183 out = json.loads(capsys.readouterr().out)
1184 assert out["active_only"] is True
1185
1186 def test_active_only_text_output(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
1187 repo = _make_repo(tmp_path)
1188 add_dependencies(repo, _B, [_A])
1189 add_dependencies(repo, _C, [_B])
1190 # Only _B is active — _A and _C are filtered from graph nodes
1191 fake_active = [self._make_fake_active(_B)]
1192 args = _dag_ns(fmt="json", active_only=True)
1193 with _patch_dag_repo(repo), \
1194 patch("muse.cli.commands.dag.active_reservations", return_value=fake_active):
1195 dag_run(args)
1196 out = json.loads(capsys.readouterr().out)
1197 node_ids = {n["reservation_id"] for n in out["nodes"]}
1198 # Only _B passes the active-only filter
1199 assert _B in node_ids
1200 assert _C not in node_ids
1201 assert _A not in node_ids
1202 assert out["total_nodes"] == 1
1203
1204
1205 # ── New: --topo flag ───────────────────────────────────────────────────────────
1206
1207
1208 class TestCliDagTopoFlag:
1209 """--topo adds a TOPO column; without it a flat table is shown."""
1210
1211 def test_without_topo_no_topo_column(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
1212 repo = _make_repo(tmp_path)
1213 add_dependencies(repo, _B, [_A])
1214 args = _dag_ns(fmt="text", topo=False)
1215 with _patch_dag_repo(repo):
1216 dag_run(args)
1217 out = capsys.readouterr().out
1218 # flat table has STATUS column but no TOPO column header
1219 assert "STATUS" in out
1220 assert "TOPO" not in out
1221
1222 def test_with_topo_shows_topo_column(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
1223 repo = _make_repo(tmp_path)
1224 add_dependencies(repo, _B, [_A])
1225 args = _dag_ns(fmt="text", topo=True)
1226 with _patch_dag_repo(repo):
1227 dag_run(args)
1228 out = capsys.readouterr().out
1229 assert "TOPO" in out
1230 assert "STATUS" in out
1231
1232 def test_topo_flag_does_not_affect_json(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
1233 """JSON output is identical regardless of --topo (topo_index always present)."""
1234 repo = _make_repo(tmp_path)
1235 add_dependencies(repo, _B, [_A])
1236
1237 args_with = _dag_ns(fmt="json", topo=True)
1238 with _patch_dag_repo(repo):
1239 dag_run(args_with)
1240 out_with = json.loads(capsys.readouterr().out)
1241
1242 args_without = _dag_ns(fmt="json", topo=False)
1243 with _patch_dag_repo(repo):
1244 dag_run(args_without)
1245 out_without = json.loads(capsys.readouterr().out)
1246
1247 # Both must have topo_index on every node
1248 for node in out_with["nodes"]:
1249 assert node["topo_index"] is not None
1250 for node in out_without["nodes"]:
1251 assert node["topo_index"] is not None
1252
1253 def test_without_topo_nodes_shown_in_output(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
1254 repo = _make_repo(tmp_path)
1255 add_dependencies(repo, _B, [_A])
1256 args = _dag_ns(fmt="text", topo=False)
1257 with _patch_dag_repo(repo):
1258 dag_run(args)
1259 out = capsys.readouterr().out
1260 # Node IDs (truncated) must appear
1261 assert _B[:8] in out
1262
1263 def test_with_topo_node_id_in_output(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
1264 repo = _make_repo(tmp_path)
1265 add_dependencies(repo, _B, [_A])
1266 args = _dag_ns(fmt="text", topo=True)
1267 with _patch_dag_repo(repo):
1268 dag_run(args)
1269 out = capsys.readouterr().out
1270 assert _B[:8] in out
1271
1272 def test_topo_order_correct_in_text(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
1273 """With --topo, nodes with no deps must appear before dependents."""
1274 repo = _make_repo(tmp_path)
1275 add_dependencies(repo, _B, [_A])
1276 args = _dag_ns(fmt="text", topo=True)
1277 with _patch_dag_repo(repo):
1278 dag_run(args)
1279 out = capsys.readouterr().out
1280 # _A has lower topo index → must appear before _B in output
1281 pos_a = out.find(_A[:8])
1282 pos_b = out.find(_B[:8])
1283 assert pos_a < pos_b, "dependency must appear before dependent in topo mode"
1284
1285 def test_empty_dag_topo_no_crash(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
1286 repo = _make_repo(tmp_path)
1287 args = _dag_ns(fmt="text", topo=True)
1288 with _patch_dag_repo(repo):
1289 dag_run(args)
1290 out = capsys.readouterr().out
1291 assert "no dependency records" in out
1292
1293
1294 # ---------------------------------------------------------------------------
1295 # TestRegisterFlags — --json / -j normalized at argparse level
1296 # ---------------------------------------------------------------------------
1297
1298
1299 class TestRegisterFlags:
1300 """register() must expose --json with -j shorthand and dest=json_out."""
1301
1302 def _make_parser(self) -> "argparse.ArgumentParser":
1303 import argparse as ap
1304 from muse.cli.commands.dag import register
1305 root = ap.ArgumentParser()
1306 subs = root.add_subparsers()
1307 register(subs)
1308 return root
1309
1310 def test_json_out_default_false(self) -> None:
1311 p = self._make_parser()
1312 ns = p.parse_args(['dag'])
1313 assert ns.json_out is False
1314
1315 def test_json_out_true_with_json_flag(self) -> None:
1316 p = self._make_parser()
1317 ns = p.parse_args(['dag', '--json'])
1318 assert ns.json_out is True
1319
1320 def test_json_out_true_with_j_flag(self) -> None:
1321 p = self._make_parser()
1322 ns = p.parse_args(['dag', '-j'])
1323 assert ns.json_out is True
File History 4 commits
sha256:81ae324db5ad375fbfe4834c6fcb378312cafad3cc92dec5d3e5c427306621a2 fix: remove commit_exists filter from have anchors — server… Sonnet 4.6 patch 21 days ago
sha256:36c3cb3e76619d4c30a6d9bf81b5ec4ff148e30dcfed913e3114ca7b43b81c7e fix: rename objects→blobs in push client and all stale test… Sonnet 4.6 patch 22 days ago
sha256:c06a9b9b9fee26c68ea725b44d54b2c0a171301ce9de746d5b656617b4463a9a fix: repair four test failures from post-migration audit Sonnet 4.6 patch 28 days ago
sha256:1900655993c83c4107067375548a7be823e471d2515830842f1a12cba4bd3cdf fix: unified object store migration — idempotent writes, JS… Sonnet 4.6 minor 29 days ago