gabriel / muse public
dag.py python
466 lines 16.5 KB
Raw
sha256:1ddad36d76d3a8d323f9b3664169cb184b7a38b39208214a2ae504154260826f fix: show full cryptographic IDs in all human-readable CLI output Sonnet 4.6 patch 5 days ago
1 """``muse coord dag`` — inspect the coordination dependency DAG.
2
3 Shows which reservations are waiting for others to complete, reports blocked
4 status, detects cycles, and computes the correct execution order via
5 topological sort.
6
7 Overview
8 --------
9 When multiple agents reserve addresses in parallel, some agents may declare
10 that their work depends on another agent's reservation being released first.
11 The dependency DAG captures these ordering constraints.
12
13 ``muse coord dag`` reads all dependency records from
14 ``.muse/coordination/dependencies/``, cross-references the currently active
15 reservations, and answers three questions:
16
17 1. **Who is blocked?** — which reservations cannot start because a dependency
18 is still active.
19 2. **What order should work proceed?** — topological sort respecting all
20 declared dependencies.
21 3. **Are there any cycles?** — flag immediately; cycles mean no agent can ever
22 become unblocked.
23
24 Output examples
25 ---------------
26 Text (full DAG, default)::
27
28 Dependency DAG — 4 node(s), 3 edge(s)
29 1 blocked 3 unblocked 0 cycles
30
31 TOPO STATUS ID DEPENDS-ON
32 ────────────────────────────────────────────────────────────────────
33 1 unblocked a1b2c3d4 (no deps)
34 2 unblocked e5f6a7b8 (no deps)
35 3 BLOCKED c9d0e1f2 a1b2c3d4
36 4 unblocked 01234567 e5f6a7b8
37
38 Text (flat, no topo column, default without --topo)::
39
40 STATUS ID DEPENDS-ON
41 ────────────────────────────────────────────────────────────────────
42 unblocked a1b2c3d4 (no deps)
43 BLOCKED c9d0e1f2 a1b2c3d4
44
45 JSON::
46
47 {
48 "schema_version": "...",
49 "total_nodes": 4,
50 "total_edges": 3,
51 "blocked_count": 1,
52 "active_only": false,
53 "cycle": null,
54 "nodes": [
55 {
56 "reservation_id": "c9d0e1f2-...",
57 "depends_on": ["a1b2c3d4-..."],
58 "active": true,
59 "blocked": true,
60 "blocking": ["a1b2c3d4-..."],
61 "topo_index": 3
62 },
63 ...
64 ]
65 }
66
67 Exit codes::
68
69 0 — success (even when blocked nodes exist)
70 1 — cycle detected, bad arguments, or unexpected error
71 """
72
73 import argparse
74 import json
75 import sys
76
77 from collections.abc import Callable
78 from typing import TypedDict
79
80 from muse.core.coordination import _validate_reservation_id, active_reservations
81 from muse.core.dag import (
82 DependencyRecord,
83 detect_cycle,
84 get_blocking,
85 is_blocked,
86 load_all_dependencies,
87 load_dag,
88 topological_sort,
89 )
90 from muse.core.envelope import EnvelopeJson, make_envelope
91 from muse.core.errors import ExitCode
92 from muse.core.repo import require_repo
93 from muse.core.timing import start_timer
94 from muse.core.validation import sanitize_display
95
96 type _Graph = dict[str, set[str]]
97
98 class _DagErrorJson(EnvelopeJson):
99 """JSON output for dag error paths."""
100
101 error: str
102 status: str
103
104 class _DagNodeJson(TypedDict):
105 """One node in the DAG output."""
106
107 reservation_id: str
108 depends_on: list[str]
109 active: bool
110 blocked: bool
111 blocking: list[str]
112 topo_index: int | None
113
114 class _DagSingleJson(EnvelopeJson):
115 """JSON output for ``muse coord dag --reservation-id <id> --json``."""
116
117 reservation_id: str
118 depends_on: list[str]
119 active: bool
120 blocked: bool
121 blocking: list[str]
122 cycle: list[str] | None
123
124 class _DagFullJson(EnvelopeJson):
125 """JSON output for ``muse coord dag --json`` (full DAG)."""
126
127 total_nodes: int
128 total_edges: int
129 blocked_count: int
130 active_only: bool
131 cycle: list[str] | None
132 nodes: list[_DagNodeJson]
133
134 def register(
135 subparsers: "argparse._SubParsersAction[argparse.ArgumentParser]",
136 ) -> None:
137 """Register ``dag`` on *subparsers* (under ``muse coord``).
138
139 Wires all flags with their defaults, choices, and help text so that
140 ``--help`` output is accurate. Sets ``func`` to :func:`run`.
141
142 Flags registered
143 ----------------
144 ``--reservation-id <sha256:...>``
145 Show dependency details for a single reservation. Must be a valid
146 sha256: content ID when provided.
147 ``--topo``
148 Print nodes in topological execution order with an explicit TOPO
149 column. Without this flag, a simpler flat table (no TOPO column)
150 is shown.
151 ``--active-only``
152 Restrict the graph to nodes that correspond to currently active
153 reservations. Nodes whose reservation has already expired or been
154 released are hidden.
155 ``--format`` / ``--json``
156 Emit compact JSON to stdout; default is human-readable text.
157 """
158 parser = subparsers.add_parser(
159 "dag",
160 help="Inspect the coordination dependency DAG.",
161 formatter_class=argparse.RawDescriptionHelpFormatter,
162 description=__doc__,
163 )
164 parser.add_argument(
165 "--reservation-id",
166 dest="reservation_id",
167 default=None,
168 metavar="ID",
169 help=(
170 "Show dependency details for a single reservation ID. "
171 "Must be a valid sha256: content ID. When omitted the full DAG is shown."
172 ),
173 )
174 parser.add_argument(
175 "--topo",
176 action="store_true",
177 default=False,
178 help=(
179 "Print nodes in topological execution order with a TOPO column. "
180 "Without this flag, a simpler flat table is shown."
181 ),
182 )
183 parser.add_argument(
184 "--active-only",
185 action="store_true",
186 default=False,
187 dest="active_only",
188 help=(
189 "Restrict output to nodes whose reservation is currently active. "
190 "Expired and released reservations are hidden."
191 ),
192 )
193 parser.add_argument(
194 "--json", "-j",
195 action="store_true",
196 dest="json_out",
197 help="Emit machine-readable JSON.",
198 )
199 parser.set_defaults(func=run)
200
201 def run(args: argparse.Namespace) -> None:
202 """Inspect the coordination dependency DAG.
203
204 Loads all dependency records and active reservations from
205 ``.muse/coordination/``, derives blocked status for each node, and
206 optionally computes topological order. Use ``--reservation-id`` to focus
207 on a single node; ``--active-only`` to prune expired reservations.
208
209 Agent quickstart
210 ----------------
211 ::
212
213 muse coord dag --format json
214 muse coord dag --active-only --format json
215 muse coord dag --reservation-id <sha256:...> --format json
216 muse coord dag --topo --format json
217
218 JSON fields (full graph)
219 ------------------------
220 total_nodes Number of nodes in the graph.
221 total_edges Total number of dependency edges.
222 blocked_count Number of nodes with unsatisfied dependencies.
223 active_only ``true`` if ``--active-only`` was passed.
224 cycle List of node IDs forming a cycle, or ``null``.
225 nodes List of node objects: ``reservation_id``, ``depends_on``,
226 ``active``, ``blocked``, ``blocking``.
227
228 JSON fields (single node, with ``--reservation-id``)
229 -----------------------------------------------------
230 reservation_id content-addressed ID of the requested node.
231 depends_on List of reservation IDs this node depends on.
232 active ``true`` if the reservation is currently active.
233 blocked ``true`` if any dependency is unresolved.
234 blocking List of reservation IDs blocked by this node.
235 cycle Cycle path if a cycle is detected, else ``null``.
236
237 Exit codes
238 ----------
239 0 Success (blocked nodes or empty graph are still success).
240 1 Cycle detected, bad arguments, or error loading state.
241 2 Not inside a Muse repository.
242 """
243 elapsed = start_timer()
244 json_out: bool = args.json_out
245 reservation_id: str | None = args.reservation_id
246 active_only: bool = args.active_only
247
248 # ── Input validation (before any file I/O) ────────────────────────────────
249
250 if reservation_id is not None:
251 try:
252 _validate_reservation_id(reservation_id)
253 except ValueError as exc:
254 msg = str(exc)
255 if json_out:
256 print(json.dumps(_DagErrorJson(**make_envelope(elapsed, exit_code=ExitCode.USER_ERROR), error=msg, status="bad_reservation_id")))
257 else:
258 print(f"❌ --reservation-id: {msg}", file=sys.stderr)
259 raise SystemExit(ExitCode.USER_ERROR)
260
261 root = require_repo()
262
263 all_deps = load_all_dependencies(root)
264 graph = load_dag(root)
265
266 # Derive the set of currently active reservation IDs.
267 try:
268 active = active_reservations(root)
269 except Exception as exc: # noqa: BLE001
270 if json_out:
271 print(json.dumps(_DagErrorJson(**make_envelope(elapsed, exit_code=ExitCode.USER_ERROR), error=str(exc), status="error")))
272 else:
273 print(f"❌ {exc}", file=sys.stderr)
274 raise SystemExit(ExitCode.USER_ERROR)
275
276 active_ids: frozenset[str] = frozenset(r.reservation_id for r in active)
277
278 # ── Active-only filter ────────────────────────────────────────────────────
279 if active_only:
280 graph = {k: v for k, v in graph.items() if k in active_ids}
281
282 # ── Cycle detection ───────────────────────────────────────────────────────
283 cycle = detect_cycle(graph)
284
285 # ── Single-reservation mode ───────────────────────────────────────────────
286 if reservation_id is not None:
287 _run_single(
288 reservation_id, graph, active_ids, cycle, json_out, elapsed
289 )
290 return
291
292 # ── Full DAG mode ─────────────────────────────────────────────────────────
293 _run_full(graph, active_ids, cycle, all_deps, json_out, args.topo, active_only, elapsed)
294
295 if cycle is not None:
296 raise SystemExit(ExitCode.USER_ERROR)
297
298 # ── Output helpers ─────────────────────────────────────────────────────────────
299
300 def _run_single(
301 reservation_id: str,
302 graph: _Graph,
303 active_ids: frozenset[str],
304 cycle: list[str] | None,
305 json_out: bool,
306 elapsed: Callable[[], float],
307 ) -> None:
308 """Emit dependency details for a single reservation.
309
310 Args:
311 reservation_id: The content-addressed ID to look up in the graph.
312 graph: Adjacency map — node → set of nodes it depends on.
313 active_ids: Reservation IDs whose reservation file is still active.
314 cycle: Cycle path returned by :func:`~muse.core.dag.detect_cycle`,
315 or ``None`` if the graph is acyclic.
316 json_out: Emit compact JSON when ``True``; human-readable text otherwise.
317 """
318 deps = sorted(graph.get(reservation_id, set()))
319 blocked = is_blocked(reservation_id, graph, active_ids)
320 blocking = get_blocking(reservation_id, graph, active_ids)
321
322 if json_out:
323 print(json.dumps(_DagSingleJson(
324 **make_envelope(elapsed),
325 reservation_id=reservation_id,
326 depends_on=deps,
327 active=reservation_id in active_ids,
328 blocked=blocked,
329 blocking=blocking,
330 cycle=cycle,
331 )))
332 return
333
334 rid_short = sanitize_display(reservation_id)
335 status = "BLOCKED" if blocked else "unblocked"
336 print(f"\nReservation {rid_short}… [{status}]")
337 if deps:
338 print(f" depends_on ({len(deps)}):")
339 for dep in deps:
340 active_marker = " ← ACTIVE" if dep in active_ids else ""
341 print(f" {sanitize_display(dep)}{active_marker}")
342 else:
343 print(" no declared dependencies")
344 if blocking:
345 print(f"\n Blocking ({len(blocking)}):")
346 for b in blocking:
347 print(f" {sanitize_display(b)} (still active)")
348 if cycle:
349 print(f"\n ⚠️ Cycle detected: {' → '.join(sanitize_display(c) for c in cycle)}")
350
351 def _run_full(
352 graph: _Graph,
353 active_ids: frozenset[str],
354 cycle: list[str] | None,
355 all_deps: list[DependencyRecord],
356 json_out: bool,
357 topo_mode: bool,
358 active_only: bool,
359 elapsed: Callable[[], float],
360 ) -> None:
361 """Emit the full DAG listing.
362
363 Args:
364 graph: Adjacency map — node → set of nodes it depends on. May be
365 pre-filtered to active-only nodes by the caller.
366 active_ids: Reservation IDs whose reservation file is still active.
367 cycle: Cycle path from :func:`~muse.core.dag.detect_cycle`, or
368 ``None`` if acyclic.
369 all_deps: Raw dependency records loaded from disk (used for metadata
370 in future extensions; not currently used directly in output).
371 json_out: Emit compact JSON when ``True``; human-readable text otherwise.
372 topo_mode: When ``True``, include a TOPO column in text output and
373 order rows by dependency-first execution order.
374 active_only: Reflected in JSON output as ``active_only`` field so
375 consumers know whether the graph was filtered.
376 """
377 total_nodes = len(graph)
378 total_edges = sum(len(v) for v in graph.values())
379
380 # Compute topological order (or handle cycle gracefully).
381 if cycle is None:
382 try:
383 topo_order = topological_sort(graph)
384 except ValueError:
385 topo_order = sorted(graph)
386 else:
387 topo_order = sorted(graph)
388
389 topo_index = {node: i + 1 for i, node in enumerate(topo_order)}
390
391 # Build per-node records.
392 nodes: list[_DagNodeJson] = []
393 for node in topo_order:
394 deps = sorted(graph.get(node, set()))
395 blocked = is_blocked(node, graph, active_ids)
396 blocking = get_blocking(node, graph, active_ids)
397 nodes.append(_DagNodeJson(
398 reservation_id=node,
399 depends_on=deps,
400 active=node in active_ids,
401 blocked=blocked,
402 blocking=blocking,
403 topo_index=topo_index.get(node),
404 ))
405
406 blocked_count = sum(1 for n in nodes if n["blocked"])
407
408 if json_out:
409 print(json.dumps(_DagFullJson(
410 **make_envelope(elapsed),
411 total_nodes=total_nodes,
412 total_edges=total_edges,
413 blocked_count=blocked_count,
414 active_only=active_only,
415 cycle=cycle,
416 nodes=nodes,
417 )))
418 return
419
420 # Text output.
421 print(f"\nDependency DAG — {total_nodes} node(s), {total_edges} edge(s)")
422 if cycle:
423 cycle_str = " → ".join(sanitize_display(c) for c in cycle)
424 print(f" ⚠️ CYCLE DETECTED: {cycle_str}")
425 print(" No topological order is possible until the cycle is resolved.")
426 else:
427 print(
428 f" {blocked_count} blocked "
429 f"{total_nodes - blocked_count} unblocked"
430 )
431
432 if not nodes:
433 print("\n (no dependency records)")
434 return
435
436 print()
437 if topo_mode:
438 print(f"{'TOPO':>4} {'STATUS':<12} ID DEPENDS-ON")
439 print("─" * 80)
440 for node_rec in nodes:
441 idx = node_rec["topo_index"] or 0
442 status = "BLOCKED" if node_rec["blocked"] else "unblocked"
443 rid = sanitize_display(node_rec["reservation_id"])
444 if node_rec["depends_on"]:
445 deps_str = ", ".join(
446 sanitize_display(d) for d in node_rec["depends_on"]
447 )
448 else:
449 deps_str = "(no deps)"
450 print(f"{idx:>4} {status:<12} {rid} {deps_str}")
451 else:
452 print(f"{'STATUS':<12} ID DEPENDS-ON")
453 print("─" * 64)
454 for node_rec in nodes:
455 status = "BLOCKED" if node_rec["blocked"] else "unblocked"
456 rid = sanitize_display(node_rec["reservation_id"])
457 if node_rec["depends_on"]:
458 deps_str = ", ".join(
459 sanitize_display(d) for d in node_rec["depends_on"]
460 )
461 else:
462 deps_str = "(no deps)"
463 print(f"{status:<12} {rid} {deps_str}")
464
465 if cycle:
466 print(f"\n ⚠️ Resolve the cycle before any blocked agent can proceed.")
File History 1 commit
sha256:1ddad36d76d3a8d323f9b3664169cb184b7a38b39208214a2ae504154260826f fix: show full cryptographic IDs in all human-readable CLI output Sonnet 4.6 patch 5 days ago