gabriel / muse public

test_cmd_coord_gc.py file-level

at sha256:8 · View file ↗ · Intel ↗

History
1 files
1 commits
0 hotspots
0 🧊 dead
0 πŸ’₯ blast risk
sha256:4 Merge branch 'dev' into main · gabriel · Jun 17, 2026
1 """Comprehensive tests for ``muse coord gc``.
2
3 Coverage matrix
4 ---------------
5 Unit
6 ~~~~
7 * run_coord_gc dry_run=True β€” does not delete files
8 * run_coord_gc dry_run=False β€” deletes expired records
9 * grace period β€” recently expired records skipped
10 * include_intents flag β€” intents purged when opt-in
11 * orphaned release (no reservation) β€” collected
12 * orphaned heartbeat (no reservation) β€” collected
13 * released reservation β€” collected after grace period
14 * heartbeat-extended reservation β€” not collected until effective expiry passes
15 * _fmt_bytes β€” all size ranges including TiB
16
17 Integration
18 ~~~~~~~~~~~
19 * Default (dry-run) β€” shows "DRY RUN", nothing deleted
20 * --execute β€” actually deletes expired reservations
21 * --grace-period large β€” recently expired records not deleted
22 * --include-intents β€” intents counted in output
23 * --verbose β€” removed IDs printed to output
24 * --format json β€” valid compact JSON with all required keys
25 * --json shorthand β€” same result as --format json
26 * Empty repo β€” exits 0 with "Nothing to collect"
27 * --grace-period -1 β€” exits USER_ERROR (1)
28 * --max-intent-age 0 β€” exits USER_ERROR (1)
29 * JSON compact β€” no newlines inside the object
30 * Dry-run JSON β€” dry_run=true, nothing deleted
31 * Orphaned heartbeat/release removed even with active reservations present
32
33 E2E
34 ~~~
35 * Full lifecycle: reserve β†’ heartbeat β†’ release β†’ gc β†’ all files gone
36 * Active reservation survives GC; expired neighbour is collected
37 * Concurrent dry-run GC on same repo does not crash
38
39 Stress
40 ~~~~~~
41 * 500 expired reservations GC'd < 3 s
42 * 1000-record mixed repo (active + expired) β€” only expired collected
43 """
44
45 from __future__ import annotations
46
47 import argparse
48
49 import datetime
50 import itertools
51 import json as _json
52 import pathlib
53 import time
54
55 import pytest
56
57 from tests.cli_test_helper import CliRunner
58 from muse.core.types import content_hash, now_utc_iso
59 from muse.core.paths import coordination_dir, muse_dir
60
61 _id_seq = itertools.count()
62
63 def _new_id() -> str:
64 return content_hash({"seq": next(_id_seq)})
65
66 runner = CliRunner()
67 cli = None
68
69
70 # ── Fixtures ──────────────────────────────────────────────────────────────────
71
72
73 @pytest.fixture()
74 def repo(tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> pathlib.Path:
75 dot_muse = muse_dir(tmp_path)
76 dot_muse.mkdir()
77 (dot_muse / "HEAD").write_text("ref: refs/heads/main\n")
78 monkeypatch.setenv("MUSE_REPO_ROOT", str(tmp_path))
79 return tmp_path
80
81
82 # ── Helpers ───────────────────────────────────────────────────────────────────
83
84
85 def _write_expired_reservation(repo: pathlib.Path, run_id: str = "agent-x") -> str:
86 coord_dir = coordination_dir(repo) / "reservations"
87 coord_dir.mkdir(parents=True, exist_ok=True)
88 rid = _new_id()
89 now = datetime.datetime.now(datetime.timezone.utc)
90 data = {
91 "reservation_id": rid,
92 "run_id": run_id,
93 "branch": "main",
94 "addresses": ["src/x.py::foo"],
95 "operation": None,
96 "created_at": (now - datetime.timedelta(hours=2)).isoformat(),
97 "expires_at": (now - datetime.timedelta(hours=1)).isoformat(),
98 }
99 (coord_dir / f"{rid}.json").write_text(_json.dumps(data))
100 return rid
101
102
103 def _write_active_reservation(repo: pathlib.Path, run_id: str = "agent-y") -> str:
104 coord_dir = coordination_dir(repo) / "reservations"
105 coord_dir.mkdir(parents=True, exist_ok=True)
106 rid = _new_id()
107 now = datetime.datetime.now(datetime.timezone.utc)
108 data = {
109 "reservation_id": rid,
110 "run_id": run_id,
111 "branch": "main",
112 "addresses": ["src/y.py::bar"],
113 "operation": None,
114 "created_at": now.isoformat(),
115 "expires_at": (now + datetime.timedelta(hours=1)).isoformat(),
116 }
117 (coord_dir / f"{rid}.json").write_text(_json.dumps(data))
118 return rid
119
120
121 def _write_expired_intent(repo: pathlib.Path, run_id: str = "agent-z") -> str:
122 intent_dir = coordination_dir(repo) / "intents"
123 intent_dir.mkdir(parents=True, exist_ok=True)
124 iid = _new_id()
125 now = datetime.datetime.now(datetime.timezone.utc)
126 data = {
127 "intent_id": iid,
128 "run_id": run_id,
129 "branch": "main",
130 "addresses": ["src/z.py::baz"],
131 "created_at": (now - datetime.timedelta(days=10)).isoformat(),
132 "expires_at": (now - datetime.timedelta(days=9)).isoformat(),
133 }
134 (intent_dir / f"{iid}.json").write_text(_json.dumps(data))
135 return iid
136
137
138 # ── Unit: run_coord_gc ────────────────────────────────────────────────────────
139
140
141 class TestRunCoordGcUnit:
142 def test_dry_run_does_not_delete_files(self, repo: pathlib.Path) -> None:
143 from muse.core.coordination import run_coord_gc
144 rid = _write_expired_reservation(repo)
145 result = run_coord_gc(repo, dry_run=True, grace_period_seconds=0)
146 assert result.dry_run is True
147 # File must still exist after dry run
148 res_file = coordination_dir(repo) / "reservations" / f"{rid}.json"
149 assert res_file.exists()
150
151 def test_execute_deletes_expired_reservation(self, repo: pathlib.Path) -> None:
152 from muse.core.coordination import run_coord_gc
153 rid = _write_expired_reservation(repo)
154 result = run_coord_gc(repo, dry_run=False, grace_period_seconds=0)
155 assert result.dry_run is False
156 assert result.reservations_removed >= 1
157 res_file = coordination_dir(repo) / "reservations" / f"{rid}.json"
158 assert not res_file.exists()
159
160 def test_active_reservation_not_deleted(self, repo: pathlib.Path) -> None:
161 from muse.core.coordination import run_coord_gc
162 rid = _write_active_reservation(repo)
163 run_coord_gc(repo, dry_run=False, grace_period_seconds=0)
164 res_file = coordination_dir(repo) / "reservations" / f"{rid}.json"
165 assert res_file.exists()
166
167 def test_grace_period_protects_recently_expired(self, repo: pathlib.Path) -> None:
168 from muse.core.coordination import run_coord_gc
169 rid = _write_expired_reservation(repo)
170 # Grace period of 7 hours (25200s) > 1 hour elapsed since expiry β†’ skipped
171 result = run_coord_gc(repo, dry_run=False, grace_period_seconds=25200)
172 assert result.reservations_removed == 0
173 res_file = coordination_dir(repo) / "reservations" / f"{rid}.json"
174 assert res_file.exists()
175
176 def test_include_intents_false_leaves_intents(self, repo: pathlib.Path) -> None:
177 from muse.core.coordination import run_coord_gc
178 iid = _write_expired_intent(repo)
179 result = run_coord_gc(repo, dry_run=False, grace_period_seconds=0, include_intents=False)
180 assert result.intents_removed == 0
181 intent_file = coordination_dir(repo) / "intents" / f"{iid}.json"
182 assert intent_file.exists()
183
184 def test_include_intents_true_removes_old_intents(self, repo: pathlib.Path) -> None:
185 from muse.core.coordination import run_coord_gc
186 _write_expired_intent(repo)
187 result = run_coord_gc(
188 repo,
189 dry_run=False,
190 grace_period_seconds=0,
191 include_intents=True,
192 max_intent_age_seconds=60, # 1 minute β€” our intent is 10 days old
193 )
194 assert result.intents_removed >= 1
195
196 def test_result_has_duration_ms(self, repo: pathlib.Path) -> None:
197 from muse.core.coordination import run_coord_gc
198 result = run_coord_gc(repo, dry_run=True, grace_period_seconds=0)
199 assert result.duration_ms >= 0.0
200
201 def test_empty_repo_total_removed_is_zero(self, repo: pathlib.Path) -> None:
202 from muse.core.coordination import run_coord_gc
203 result = run_coord_gc(repo, dry_run=False, grace_period_seconds=0)
204 assert result.total_removed == 0
205
206
207 # ── Integration ───────────────────────────────────────────────────────────────
208
209
210 class TestCoordGcIntegration:
211 def test_default_dry_run(self, repo: pathlib.Path) -> None:
212 _write_expired_reservation(repo)
213 result = runner.invoke(cli, ["coord", "gc"])
214 assert result.exit_code == 0
215 assert "DRY RUN" in result.output
216
217 def test_default_dry_run_does_not_delete(self, repo: pathlib.Path) -> None:
218 rid = _write_expired_reservation(repo)
219 runner.invoke(cli, ["coord", "gc"])
220 res_file = coordination_dir(repo) / "reservations" / f"{rid}.json"
221 assert res_file.exists()
222
223 def test_execute_deletes_expired(self, repo: pathlib.Path) -> None:
224 rid = _write_expired_reservation(repo)
225 result = runner.invoke(cli, ["coord", "gc", "--execute", "--grace-period", "0"])
226 assert result.exit_code == 0
227 res_file = coordination_dir(repo) / "reservations" / f"{rid}.json"
228 assert not res_file.exists()
229
230 def test_execute_text_output_shows_gc_complete(self, repo: pathlib.Path) -> None:
231 _write_expired_reservation(repo)
232 result = runner.invoke(cli, ["coord", "gc", "--execute", "--grace-period", "0"])
233 assert result.exit_code == 0
234 assert "GC complete" in result.output
235
236 def test_grace_period_large_nothing_deleted(self, repo: pathlib.Path) -> None:
237 _write_expired_reservation(repo) # expired 1 hour ago
238 # Grace period of 7 hours (25200s) > 1 hour elapsed since expiry β†’ skipped
239 result = runner.invoke(cli, ["coord", "gc", "--execute", "--grace-period", "25200"])
240 assert result.exit_code == 0
241 assert "Nothing to collect" in result.output
242
243 def test_include_intents_flag_reaches_intents(self, repo: pathlib.Path) -> None:
244 _write_expired_intent(repo)
245 result = runner.invoke(cli, [
246 "coord", "gc", "--execute", "--include-intents",
247 "--max-intent-age", "60", "--grace-period", "0",
248 ])
249 assert result.exit_code == 0
250
251 def test_verbose_prints_removed_ids(self, repo: pathlib.Path) -> None:
252 rid = _write_expired_reservation(repo)
253 result = runner.invoke(cli, [
254 "coord", "gc", "--execute", "--grace-period", "0", "--verbose",
255 ])
256 assert result.exit_code == 0
257 assert rid in result.output
258
259 def test_format_json_valid_structure(self, repo: pathlib.Path) -> None:
260 _write_expired_reservation(repo)
261 result = runner.invoke(cli, ["coord", "gc", "--execute", "--grace-period", "0", "--json"])
262 assert result.exit_code == 0
263 data = _json.loads(result.output.strip())
264 required_keys = {
265 "dry_run", "grace_period_seconds", "include_intents", "max_intent_age_seconds",
266 "reservations_removed", "reservations_removed_bytes", "releases_removed",
267 "releases_removed_bytes", "heartbeats_removed", "heartbeats_removed_bytes",
268 "intents_removed", "intents_removed_bytes", "total_removed", "total_removed_bytes",
269 "removed_ids", "duration_ms",
270 }
271 assert required_keys <= set(data)
272
273 def test_json_shorthand(self, repo: pathlib.Path) -> None:
274 result = runner.invoke(cli, ["coord", "gc", "--json"])
275 assert result.exit_code == 0
276 data = _json.loads(result.output.strip())
277 assert "dry_run" in data
278 assert data["dry_run"] is True
279
280 def test_empty_repo_exits_0_nothing_to_collect(self, repo: pathlib.Path) -> None:
281 result = runner.invoke(cli, ["coord", "gc"])
282 assert result.exit_code == 0
283 assert "Nothing to collect" in result.output
284
285 def test_grace_period_negative_exits_user_error(self, repo: pathlib.Path) -> None:
286 from muse.core.errors import ExitCode
287 result = runner.invoke(cli, ["coord", "gc", "--grace-period", "-1"])
288 assert result.exit_code == ExitCode.USER_ERROR
289
290 def test_max_intent_age_zero_exits_user_error(self, repo: pathlib.Path) -> None:
291 from muse.core.errors import ExitCode
292 result = runner.invoke(cli, ["coord", "gc", "--max-intent-age", "0"])
293 assert result.exit_code == ExitCode.USER_ERROR
294
295 def test_json_dry_run_field_true_by_default(self, repo: pathlib.Path) -> None:
296 result = runner.invoke(cli, ["coord", "gc", "--json"])
297 data = _json.loads(result.output.strip())
298 assert data["dry_run"] is True
299
300 def test_json_execute_dry_run_field_false(self, repo: pathlib.Path) -> None:
301 result = runner.invoke(cli, ["coord", "gc", "--execute", "--json"])
302 data = _json.loads(result.output.strip())
303 assert data["dry_run"] is False
304
305 def test_duration_ms_is_nonnegative_float(self, repo: pathlib.Path) -> None:
306 result = runner.invoke(cli, ["coord", "gc", "--json"])
307 data = _json.loads(result.output.strip())
308 assert isinstance(data["duration_ms"], float)
309 assert data["duration_ms"] >= 0.0
310
311
312 # ── Security ──────────────────────────────────────────────────────────────────
313
314
315 class TestCoordGcSecurity:
316 def test_gc_does_not_traverse_outside_coordination_dir(self, repo: pathlib.Path, tmp_path: pathlib.Path) -> None:
317 """Verify GC only touches .muse/coordination/ subdirectories."""
318 sentinel = tmp_path / "outside_sentinel.json"
319 sentinel.write_text('{"should": "not be deleted"}')
320
321 result = runner.invoke(cli, ["coord", "gc", "--execute", "--grace-period", "0"])
322 assert result.exit_code == 0
323 assert sentinel.exists(), "GC must not delete files outside .muse/coordination/"
324
325 def test_symlink_outside_repo_not_followed(self, repo: pathlib.Path, tmp_path: pathlib.Path) -> None:
326 """A symlink inside coordination/ pointing outside the repo is skipped."""
327 import os
328 outside_file = tmp_path / "secret.json"
329 outside_file.write_text('{"secret": "data"}')
330
331 reservations_dir = coordination_dir(repo) / "reservations"
332 reservations_dir.mkdir(parents=True, exist_ok=True)
333 link = reservations_dir / "malicious_link.json"
334 try:
335 os.symlink(outside_file, link)
336 except OSError:
337 pytest.skip("symlink creation not supported")
338
339 # GC should not crash and should not delete the outside file
340 result = runner.invoke(cli, ["coord", "gc", "--execute", "--grace-period", "0"])
341 assert result.exit_code == 0
342 assert outside_file.exists()
343
344
345 # ── Stress ────────────────────────────────────────────────────────────────────
346
347
348 class TestCoordGcStress:
349 def test_500_expired_reservations_under_3s(self, repo: pathlib.Path) -> None:
350 for _ in range(500):
351 _write_expired_reservation(repo)
352
353 t0 = time.monotonic()
354 result = runner.invoke(cli, ["coord", "gc", "--execute", "--grace-period", "0"])
355 elapsed = time.monotonic() - t0
356
357 assert result.exit_code == 0
358 assert elapsed < 3.0
359 data_lines = result.output
360 assert "GC complete" in data_lines or "removed" in data_lines.lower()
361
362 # Verify all files are gone
363 reservations_dir = coordination_dir(repo) / "reservations"
364 remaining = list(reservations_dir.glob("*.json"))
365 assert len(remaining) == 0
366
367 def test_1000_mixed_repo_only_expired_collected(self, repo: pathlib.Path) -> None:
368 """500 active + 500 expired: only the expired set is removed."""
369 for _ in range(500):
370 _write_expired_reservation(repo)
371 active_ids = {_write_active_reservation(repo) for _ in range(500)}
372
373 result = runner.invoke(cli, ["coord", "gc", "--execute", "--grace-period", "0"])
374 assert result.exit_code == 0
375
376 reservations_dir = coordination_dir(repo) / "reservations"
377 surviving = {p.stem for p in reservations_dir.glob("*.json")}
378 assert active_ids == surviving, "Active reservations must not be collected"
379
380
381 # ---------------------------------------------------------------------------
382 # Unit β€” _fmt_bytes
383 # ---------------------------------------------------------------------------
384
385
386 class TestFmtBytes:
387 def test_zero(self) -> None:
388 from muse.cli.commands.coord_gc import _fmt_bytes
389 assert _fmt_bytes(0) == "0 B"
390
391 def test_under_1024(self) -> None:
392 from muse.cli.commands.coord_gc import _fmt_bytes
393 assert _fmt_bytes(1023) == "1023 B"
394
395 def test_exactly_1_kib(self) -> None:
396 from muse.cli.commands.coord_gc import _fmt_bytes
397 assert _fmt_bytes(1024) == "1.0 KiB"
398
399 def test_exactly_1_mib(self) -> None:
400 from muse.cli.commands.coord_gc import _fmt_bytes
401 assert _fmt_bytes(1024 ** 2) == "1.0 MiB"
402
403 def test_exactly_1_gib(self) -> None:
404 from muse.cli.commands.coord_gc import _fmt_bytes
405 assert _fmt_bytes(1024 ** 3) == "1.0 GiB"
406
407 def test_exactly_1_tib(self) -> None:
408 from muse.cli.commands.coord_gc import _fmt_bytes
409 assert _fmt_bytes(1024 ** 4) == "1.0 TiB"
410
411 def test_2_tib(self) -> None:
412 from muse.cli.commands.coord_gc import _fmt_bytes
413 assert _fmt_bytes(2 * 1024 ** 4) == "2.0 TiB"
414
415 def test_gib_does_not_overflow_to_wrong_unit(self) -> None:
416 from muse.cli.commands.coord_gc import _fmt_bytes
417 # 512 GiB β€” must stay in GiB, not TiB
418 assert _fmt_bytes(512 * 1024 ** 3) == "512.0 GiB"
419
420 def test_1023_gib_stays_gib(self) -> None:
421 from muse.cli.commands.coord_gc import _fmt_bytes
422 result = _fmt_bytes(1023 * 1024 ** 3)
423 assert "GiB" in result
424
425
426 # ---------------------------------------------------------------------------
427 # Unit β€” orphan / lifecycle
428 # ---------------------------------------------------------------------------
429
430
431 class TestRunCoordGcOrphanAndLifecycle:
432 def test_orphaned_release_collected(self, repo: pathlib.Path) -> None:
433 """A release tombstone with no matching reservation is an orphan β€” collect it."""
434 from muse.core.coordination import run_coord_gc
435 releases_dir = coordination_dir(repo) / "releases"
436 releases_dir.mkdir(parents=True, exist_ok=True)
437 orphan_id = _new_id()
438 orphan_path = releases_dir / f"{orphan_id}.json"
439 orphan_path.write_text(_json.dumps({
440 "reservation_id": orphan_id,
441 "run_id": "ghost",
442 "released_at": now_utc_iso(),
443 "reason": "completed",
444 }))
445 result = run_coord_gc(repo, dry_run=False, grace_period_seconds=0)
446 assert result.releases_removed >= 1
447 assert not orphan_path.exists()
448
449 def test_orphaned_heartbeat_collected(self, repo: pathlib.Path) -> None:
450 """A heartbeat file with no matching reservation is an orphan β€” collect it."""
451 from muse.core.coordination import run_coord_gc
452 hb_dir = coordination_dir(repo) / "heartbeats"
453 hb_dir.mkdir(parents=True, exist_ok=True)
454 orphan_id = _new_id()
455 orphan_path = hb_dir / f"{orphan_id}.json"
456 now = datetime.datetime.now(datetime.timezone.utc)
457 orphan_path.write_text(_json.dumps({
458 "reservation_id": orphan_id,
459 "run_id": "ghost",
460 "last_beat_at": now.isoformat(),
461 "extended_expires_at": (now + datetime.timedelta(hours=1)).isoformat(),
462 }))
463 result = run_coord_gc(repo, dry_run=False, grace_period_seconds=0)
464 assert result.heartbeats_removed >= 1
465 assert not orphan_path.exists()
466
467 def test_released_reservation_collected_after_grace(self, repo: pathlib.Path) -> None:
468 """Reserve β†’ release β†’ gc(grace=0): reservation + tombstone both gone."""
469 from muse.core.coordination import (
470 run_coord_gc, create_reservation, create_release
471 )
472 res = create_reservation(
473 repo, run_id="agent-r", branch="main",
474 addresses=["a.py::f"], ttl_seconds=3600,
475 )
476 rid = res.reservation_id
477 create_release(repo, rid, run_id="agent-r")
478
479 result = run_coord_gc(repo, dry_run=False, grace_period_seconds=0)
480 assert result.reservations_removed >= 1
481 assert result.releases_removed >= 1
482 res_path = coordination_dir(repo) / "reservations" / f"{rid}.json"
483 rel_path = coordination_dir(repo) / "releases" / f"{rid}.json"
484 assert not res_path.exists()
485 assert not rel_path.exists()
486
487 def test_heartbeat_extended_reservation_not_expired(self, repo: pathlib.Path) -> None:
488 """A reservation past its original TTL but heartbeated is still active."""
489 from muse.core.coordination import (
490 run_coord_gc, create_reservation, create_heartbeat
491 )
492 # Reservation expired 30 minutes ago
493 coord_dir = coordination_dir(repo) / "reservations"
494 coord_dir.mkdir(parents=True, exist_ok=True)
495 rid = _new_id()
496 now = datetime.datetime.now(datetime.timezone.utc)
497 data = {
498 "reservation_id": rid, "run_id": "agent-hb", "branch": "main",
499 "addresses": ["a.py::f"], "operation": None,
500 "created_at": (now - datetime.timedelta(hours=2)).isoformat(),
501 "expires_at": (now - datetime.timedelta(minutes=30)).isoformat(),
502 }
503 (coord_dir / f"{rid}.json").write_text(_json.dumps(data))
504 # Heartbeat extends it 2 hours into the future
505 hb_dir = coordination_dir(repo) / "heartbeats"
506 hb_dir.mkdir(parents=True, exist_ok=True)
507 (hb_dir / f"{rid}.json").write_text(_json.dumps({
508 "reservation_id": rid, "run_id": "agent-hb",
509 "last_beat_at": now.isoformat(),
510 "extended_expires_at": (now + datetime.timedelta(hours=2)).isoformat(),
511 }))
512
513 result = run_coord_gc(repo, dry_run=False, grace_period_seconds=0)
514 assert result.reservations_removed == 0
515 assert (coord_dir / f"{rid}.json").exists(), "Heartbeated reservation must survive"
516
517 def test_dry_run_removed_ids_populated(self, repo: pathlib.Path) -> None:
518 """dry_run=True must populate removed_ids even though nothing is deleted."""
519 from muse.core.coordination import run_coord_gc
520 rid = _write_expired_reservation(repo)
521 result = run_coord_gc(repo, dry_run=True, grace_period_seconds=0)
522 assert rid in result.removed_ids
523 res_path = coordination_dir(repo) / "reservations" / f"{rid}.json"
524 assert res_path.exists(), "dry_run must not delete files"
525
526 def test_total_removed_is_sum_of_parts(self, repo: pathlib.Path) -> None:
527 """total_removed == sum of all category counters."""
528 from muse.core.coordination import run_coord_gc
529 _write_expired_reservation(repo)
530 result = run_coord_gc(repo, dry_run=False, grace_period_seconds=0)
531 expected = (
532 result.reservations_removed
533 + result.releases_removed
534 + result.heartbeats_removed
535 + result.intents_removed
536 )
537 assert result.total_removed == expected
538
539 def test_total_bytes_is_sum_of_parts(self, repo: pathlib.Path) -> None:
540 """total_removed_bytes == sum of all byte counters."""
541 from muse.core.coordination import run_coord_gc
542 _write_expired_reservation(repo)
543 result = run_coord_gc(repo, dry_run=False, grace_period_seconds=0)
544 expected = (
545 result.reservations_removed_bytes
546 + result.releases_removed_bytes
547 + result.heartbeats_removed_bytes
548 + result.intents_removed_bytes
549 )
550 assert result.total_removed_bytes == expected
551
552
553 # ---------------------------------------------------------------------------
554 # Integration β€” JSON compact format and new exit codes
555 # ---------------------------------------------------------------------------
556
557
558 class TestCoordGcJsonAndExitCodes:
559 def test_json_output_is_compact(self, repo: pathlib.Path) -> None:
560 """JSON must be single-line (no indent=2 pretty-printing)."""
561 result = runner.invoke(cli, ["coord", "gc", "--json"])
562 assert result.exit_code == 0
563 assert "\n" not in result.output.strip()
564
565 def test_json_execute_compact(self, repo: pathlib.Path) -> None:
566 _write_expired_reservation(repo)
567 result = runner.invoke(cli, ["coord", "gc", "--execute", "--grace-period", "0", "--json"])
568 assert result.exit_code == 0
569 assert "\n" not in result.output.strip()
570
571 def test_grace_period_negative_json_has_status(self, repo: pathlib.Path) -> None:
572 result = runner.invoke(cli, ["coord", "gc", "--grace-period", "-1", "--json"])
573 data = _json.loads(result.output)
574 assert data["status"] == "bad_args"
575
576 def test_max_intent_age_zero_json_has_status(self, repo: pathlib.Path) -> None:
577 result = runner.invoke(cli, ["coord", "gc", "--max-intent-age", "0", "--json"])
578 data = _json.loads(result.output)
579 assert data["status"] == "bad_args"
580
581 def test_grace_period_negative_exits_user_error(self, repo: pathlib.Path) -> None:
582 from muse.core.errors import ExitCode
583 result = runner.invoke(cli, ["coord", "gc", "--grace-period", "-1"])
584 assert result.exit_code == ExitCode.USER_ERROR
585
586 def test_max_intent_age_zero_exits_user_error(self, repo: pathlib.Path) -> None:
587 from muse.core.errors import ExitCode
588 result = runner.invoke(cli, ["coord", "gc", "--max-intent-age", "0"])
589 assert result.exit_code == ExitCode.USER_ERROR
590
591 def test_json_removed_ids_list_populated(self, repo: pathlib.Path) -> None:
592 rid = _write_expired_reservation(repo)
593 result = runner.invoke(cli, ["coord", "gc", "--execute", "--grace-period", "0", "--json"])
594 data = _json.loads(result.output)
595 assert rid in data["removed_ids"]
596
597 def test_json_dry_run_removed_ids_populated(self, repo: pathlib.Path) -> None:
598 """removed_ids is populated in dry-run JSON too."""
599 rid = _write_expired_reservation(repo)
600 result = runner.invoke(cli, ["coord", "gc", "--json"])
601 data = _json.loads(result.output)
602 assert rid in data["removed_ids"]
603 assert data["dry_run"] is True
604
605 def test_orphaned_release_in_json_output(self, repo: pathlib.Path) -> None:
606 """Orphaned release collected and reflected in JSON releases_removed."""
607 releases_dir = coordination_dir(repo) / "releases"
608 releases_dir.mkdir(parents=True, exist_ok=True)
609 orphan_id = _new_id()
610 (releases_dir / f"{orphan_id}.json").write_text(_json.dumps({
611 "reservation_id": orphan_id,
612 "run_id": "ghost",
613 "released_at": now_utc_iso(),
614 "reason": "completed",
615 }))
616 result = runner.invoke(cli, [
617 "coord", "gc", "--execute", "--grace-period", "0", "--json"
618 ])
619 data = _json.loads(result.output)
620 assert data["releases_removed"] >= 1
621
622 def test_error_message_uses_emoji_prefix(self, repo: pathlib.Path) -> None:
623 """Validation errors must start with ❌, not bare 'error:'."""
624 result = runner.invoke(cli, ["coord", "gc", "--grace-period", "-1"])
625 combined = result.output + (result.stderr or "")
626 assert "❌" in combined
627
628
629 # ---------------------------------------------------------------------------
630 # E2E β€” full lifecycle
631 # ---------------------------------------------------------------------------
632
633
634 class TestCoordGcE2E:
635 def test_full_lifecycle_reserve_release_gc(self, repo: pathlib.Path) -> None:
636 """Reserve β†’ release β†’ GC: reservation + tombstone both removed."""
637 from muse.core.coordination import (
638 create_reservation, create_release, run_coord_gc,
639 load_all_reservations, load_released_ids,
640 )
641 res = create_reservation(
642 repo, run_id="e2e-agent", branch="main",
643 addresses=["src/e2e.py::func"], ttl_seconds=3600,
644 )
645 rid = res.reservation_id
646 create_release(repo, rid, run_id="e2e-agent")
647
648 gc_result = run_coord_gc(repo, dry_run=False, grace_period_seconds=0)
649 assert rid in gc_result.removed_ids
650
651 res_path = coordination_dir(repo) / "reservations" / f"{rid}.json"
652 rel_path = coordination_dir(repo) / "releases" / f"{rid}.json"
653 assert not res_path.exists()
654 assert not rel_path.exists()
655
656 def test_active_survives_while_expired_neighbour_collected(self, repo: pathlib.Path) -> None:
657 """One active + one expired: only expired is collected."""
658 from muse.core.coordination import run_coord_gc
659 active_id = _write_active_reservation(repo)
660 expired_id = _write_expired_reservation(repo)
661
662 run_coord_gc(repo, dry_run=False, grace_period_seconds=0)
663
664 res_dir = coordination_dir(repo) / "reservations"
665 surviving = {p.stem for p in res_dir.glob("*.json")}
666 assert active_id in surviving
667 assert expired_id not in surviving
668
669 def test_gc_with_heartbeat_extended_reservation(self, repo: pathlib.Path) -> None:
670 """Expired-by-TTL but heartbeat-extended: survives GC."""
671 from muse.core.coordination import (
672 create_reservation, create_heartbeat, run_coord_gc
673 )
674 res = create_reservation(
675 repo, run_id="hb-agent", branch="main",
676 addresses=["src/hb.py::func"], ttl_seconds=1,
677 )
678 rid = res.reservation_id
679 # Heartbeat extends 2 hours into the future
680 create_heartbeat(repo, rid, run_id="hb-agent", extension_seconds=7200)
681
682 gc_result = run_coord_gc(repo, dry_run=False, grace_period_seconds=0)
683 assert rid not in gc_result.removed_ids
684 res_path = coordination_dir(repo) / "reservations" / f"{rid}.json"
685 assert res_path.exists()
686
687 def test_repeated_gc_is_idempotent(self, repo: pathlib.Path) -> None:
688 """Running GC twice on an already-clean repo returns zero totals."""
689 from muse.core.coordination import run_coord_gc
690 _write_expired_reservation(repo)
691 run_coord_gc(repo, dry_run=False, grace_period_seconds=0)
692 result2 = run_coord_gc(repo, dry_run=False, grace_period_seconds=0)
693 assert result2.total_removed == 0
694
695 def test_gc_via_cli_full_lifecycle(self, repo: pathlib.Path) -> None:
696 """End-to-end via CLI: reserve β†’ release β†’ gc --execute β†’ JSON confirms removal."""
697 from muse.core.coordination import create_reservation, create_release
698 res = create_reservation(
699 repo, run_id="cli-e2e", branch="main",
700 addresses=["src/cli.py::func"], ttl_seconds=3600,
701 )
702 rid = res.reservation_id
703 create_release(repo, rid, run_id="cli-e2e")
704
705 result = runner.invoke(cli, [
706 "coord", "gc", "--execute", "--grace-period", "0", "--json",
707 ])
708 assert result.exit_code == 0
709 data = _json.loads(result.output)
710 assert rid in data["removed_ids"]
711 assert data["reservations_removed"] >= 1
712 assert data["releases_removed"] >= 1
713
714
715 # ---------------------------------------------------------------------------
716 # Concurrent
717 # ---------------------------------------------------------------------------
718
719
720 class TestCoordGcConcurrent:
721 def test_concurrent_dry_run_does_not_crash(self, repo: pathlib.Path) -> None:
722 """20 concurrent dry-run GC passes on the same repo must all succeed."""
723 import threading
724 for _ in range(50):
725 _write_expired_reservation(repo)
726
727 errors: list[Exception] = []
728 lock = threading.Lock()
729
730 def _gc() -> None:
731 try:
732 result = runner.invoke(cli, ["coord", "gc", "--json"])
733 assert result.exit_code == 0
734 except Exception as exc: # noqa: BLE001
735 with lock:
736 errors.append(exc)
737
738 threads = [threading.Thread(target=_gc) for _ in range(20)]
739 for t in threads:
740 t.start()
741 for t in threads:
742 t.join()
743
744 assert not errors, f"Concurrent dry-run errors: {errors}"
745
746 def test_concurrent_execute_leaves_no_files(self, repo: pathlib.Path) -> None:
747 """Two concurrent execute passes must not leave any collectable files on disk.
748
749 Both passes may succeed or one may race ahead β€” either is fine.
750 The critical invariant is that all expired files are gone afterward.
751 """
752 import threading
753 from muse.core.coordination import run_coord_gc
754 for _ in range(100):
755 _write_expired_reservation(repo)
756
757 def _gc() -> None:
758 run_coord_gc(repo, dry_run=False, grace_period_seconds=0)
759
760 t1 = threading.Thread(target=_gc)
761 t2 = threading.Thread(target=_gc)
762 t1.start()
763 t2.start()
764 t1.join()
765 t2.join()
766
767 res_dir = coordination_dir(repo) / "reservations"
768 if res_dir.exists():
769 remaining = list(res_dir.glob("*.json"))
770 assert remaining == [], f"Files left after concurrent GC: {remaining}"
771
772
773 # ---------------------------------------------------------------------------
774 # TestRegisterFlags β€” --json / -j normalized at argparse level
775 # ---------------------------------------------------------------------------
776
777
778 class TestRegisterFlags:
779 """register() must expose --json with -j shorthand and dest=json_out."""
780
781 def _make_parser(self) -> "argparse.ArgumentParser":
782 import argparse as ap
783 from muse.cli.commands.coord_gc import register
784 root = ap.ArgumentParser()
785 subs = root.add_subparsers()
786 register(subs)
787 return root
788
789 def test_json_out_default_false(self) -> None:
790 p = self._make_parser()
791 ns = p.parse_args(['gc'])
792 assert ns.json_out is False
793
794 def test_json_out_true_with_json_flag(self) -> None:
795 p = self._make_parser()
796 ns = p.parse_args(['gc', '--json'])
797 assert ns.json_out is True
798
799 def test_json_out_true_with_j_flag(self) -> None:
800 p = self._make_parser()
801 ns = p.parse_args(['gc', '-j'])
802 assert ns.json_out is True