gabriel / muse public
test_cmd_task_queue.py python
4,751 lines 195.4 KB
Raw
sha256:81ae324db5ad375fbfe4834c6fcb378312cafad3cc92dec5d3e5c427306621a2 fix: remove commit_exists filter from have anchors — server… Sonnet 4.6 patch 21 days ago
1 """Tests for muse coord task-queue: core + CLI (enqueue / claim / complete / fail-task /
2 cancel-task / tasks).
3
4 Coverage goals
5 --------------
6 * Unit — every public function in ``muse.core.task_queue``
7 * Integration — full lifecycle: enqueue → claim → complete/fail/cancel
8 * CLI — all six CLI subcommands via argparse dispatch and stdout capture
9 * Security — sha256 task-ID validation, path traversal, ANSI injection, oversized inputs
10 * Stress — concurrent claiming correctness, large queue scanning
11
12 Test conventions
13 ----------------
14 * Every test receives a fresh ``tmp_path``-based repo fixture.
15 * Time is frozen via ``unittest.mock.patch`` on ``muse.core.task_queue._now_utc``
16 wherever predictable timestamps are required.
17 * CLI dispatch calls ``run_*`` directly (no subprocess overhead) with a
18 ``argparse.Namespace`` assembled by hand, capturing stdout/stderr via
19 ``capsys``.
20 """
21
22 from __future__ import annotations
23
24 import argparse
25 import datetime
26 import json
27 import os
28 import pathlib
29 import itertools
30 import threading
31 import time
32 from collections.abc import Generator
33 from contextlib import AbstractContextManager
34 from unittest.mock import MagicMock, patch
35
36 from muse.core.types import MsgpackValue, content_hash, long_id
37 from muse.core.paths import muse_dir
38
# Module-wide counter feeding ``_new_id``; each call consumes one value.
_id_seq = itertools.count()


def _new_id() -> str:
    """Return ``content_hash`` of a dict holding the next counter value."""
    sequence_number = next(_id_seq)
    seed = {"seq": sequence_number}
    return content_hash(seed)
44
45 import pytest
46
47 from muse.core.task_queue import (
48 ClaimRecord,
49 TaskRecord,
50 _claims_dir,
51 _tasks_dir,
52 _try_excl_claim,
53 _try_optimistic_reclaim,
54 _validate_queue_name,
55 _validate_task_id,
56 cancel_task,
57 claim_next_task,
58 complete_task,
59 create_task,
60 ensure_task_dirs,
61 fail_task,
62 get_task_status,
63 heartbeat_claim,
64 load_all_claims,
65 load_all_tasks,
66 load_claim,
67 load_task,
68 )
69 from muse.cli.commands.task_queue import (
70 register_all,
71 run_cancel_task,
72 run_claim,
73 run_complete,
74 run_enqueue,
75 run_fail_task,
76 run_tasks,
77 )
78
79 # ── Fixtures ──────────────────────────────────────────────────────────────────
80
# Shorthand for the UTC tzinfo attached to every timestamp built in this module.
UTC = datetime.timezone.utc
# Fixed reference instant (2025-06-01 12:00:00 UTC) that tests freeze the clock to.
_EPOCH = datetime.datetime(
    year=2025, month=6, day=1, hour=12, minute=0, second=0, tzinfo=UTC
)

# Task IDs produced by ``long_id`` from 64-character runs of "a" and "b".
VALID_ID = long_id(64 * "a")
VALID_ID2 = long_id(64 * "b")
86
87
def _make_repo(tmp_path: pathlib.Path) -> pathlib.Path:
    """Create the directory returned by ``muse_dir(tmp_path)`` and return *tmp_path* as the repo root."""
    muse_dir(tmp_path).mkdir(parents=True)
    return tmp_path
93
94
95 def _freeze(ts: datetime.datetime) -> AbstractContextManager[MagicMock]:
96 """Context manager: freeze ``muse.core.task_queue._now_utc`` to *ts*."""
97 return patch("muse.core.task_queue._now_utc", return_value=ts)
98
99
def _namespace(**kwargs: MsgpackValue) -> argparse.Namespace:
    """Assemble an ``argparse.Namespace`` for CLI tests.

    Starts from a fixed set of default attribute values and lets any
    keyword argument override (or extend) them.
    """
    base = {
        "json_out": True,
        "run_id": "agent-1",
        "queue": None,
        "title": "Test task",
        "priority": 0,
        "ttl_seconds": 86400,
        "payload": "{}",
        "tags": "",
        "claim_ttl": 3600,
        "wait": 0,
        "task_id": VALID_ID,
        "result": "{}",
        "error": "",
        "force": False,
        "status": None,
        "limit": 200,
    }
    # Caller-supplied values win over the defaults.
    merged = {**base, **kwargs}
    return argparse.Namespace(**merged)
122
123
124 # ── Validation ─────────────────────────────────────────────────────────────────
125
126
class TestValidateTaskId:
    """``_validate_task_id`` accepts well-formed sha256 IDs and raises ``ValueError`` for anything else."""

    def test_accepts_valid_sha256_id(self) -> None:
        _validate_task_id(VALID_ID)  # must not raise

    def test_rejects_empty(self) -> None:
        with pytest.raises(ValueError):
            _validate_task_id("")

    def test_rejects_non_sha256(self) -> None:
        with pytest.raises(ValueError):
            _validate_task_id("not-a-sha256-id")

    def test_rejects_path_traversal(self) -> None:
        with pytest.raises(ValueError):
            _validate_task_id("../../etc/passwd")

    def test_rejects_null_bytes(self) -> None:
        # A bare run of NULs has no sha256 shape at all.
        with pytest.raises(ValueError):
            _validate_task_id("\x00" * 36)
        # A NUL inside an ID built the same way as the slash case below, so the
        # rejection cannot be credited to the overall shape being wrong.
        with pytest.raises(ValueError):
            _validate_task_id(long_id("a" * 63 + "\x00"))

    def test_rejects_uuid4_format(self) -> None:
        with pytest.raises(ValueError):
            _validate_task_id("12345678-1234-4abc-8abc-1234567890ab")

    def test_rejects_sha256_with_slash(self) -> None:
        with pytest.raises(ValueError):
            _validate_task_id(long_id("a" * 63 + "/"))
156
157
class TestValidateQueueName:
    """``_validate_queue_name`` lets valid names through and raises ``ValueError`` on bad ones."""

    def test_accepts_simple(self) -> None:
        for good_name in ("default", "billing-queue", "Agent_123"):
            _validate_queue_name(good_name)

    def test_rejects_empty(self) -> None:
        with pytest.raises(ValueError, match="non-empty"):
            _validate_queue_name("")

    def test_rejects_space(self) -> None:
        with pytest.raises(ValueError):
            _validate_queue_name("queue name")

    def test_rejects_slash(self) -> None:
        with pytest.raises(ValueError):
            _validate_queue_name("../../etc")

    def test_rejects_null_byte(self) -> None:
        with pytest.raises(ValueError):
            _validate_queue_name("queue\x00name")

    def test_rejects_too_long(self) -> None:
        # 65 characters is one past the longest name accepted below.
        over_limit = "q" * 65
        with pytest.raises(ValueError, match="too long"):
            _validate_queue_name(over_limit)

    def test_accepts_max_length(self) -> None:
        at_limit = "q" * 64
        _validate_queue_name(at_limit)
188
189
190 # ── TaskRecord ─────────────────────────────────────────────────────────────────
191
192
class TestTaskRecord:
    """Dict round-trip, expiry boundary and ``from_dict`` defaults of ``TaskRecord``."""

    def _make(self, **kwargs: MsgpackValue) -> TaskRecord:
        """Build a ``TaskRecord`` from baseline field values, overridden by *kwargs*."""
        fields = {
            "task_id": VALID_ID,
            "title": "A task",
            "payload": {"x": 1},
            "priority": 0,
            "queue": "default",
            "created_at": _EPOCH,
            "created_by": "orchestrator",
            "ttl_seconds": 3600,
            "tags": ["a", "b"],
        }
        fields.update(kwargs)
        return TaskRecord(**fields)

    def test_to_dict_round_trip(self) -> None:
        original = self._make()
        restored = TaskRecord.from_dict(original.to_dict())
        assert restored.task_id == original.task_id
        assert restored.title == original.title
        assert restored.priority == original.priority
        assert restored.queue == original.queue
        assert restored.tags == original.tags
        assert restored.payload == original.payload

    def test_is_expired_false_within_ttl(self) -> None:
        record = self._make()
        # One second short of the 3600 s TTL.
        just_before = _EPOCH + datetime.timedelta(seconds=3599)
        assert record.is_expired(just_before) is False

    def test_is_expired_true_at_boundary(self) -> None:
        record = self._make()
        # Exactly at created_at + ttl_seconds.
        at_ttl = _EPOCH + datetime.timedelta(seconds=3600)
        assert record.is_expired(at_ttl) is True

    def test_from_dict_missing_created_at_defaults_to_now(self) -> None:
        record = TaskRecord.from_dict({"task_id": VALID_ID, "title": "x"})
        assert isinstance(record.created_at, datetime.datetime)

    def test_title_truncated_at_256(self) -> None:
        raw = {
            "task_id": VALID_ID,
            "title": "x" * 300,
            "created_at": _EPOCH.isoformat(),
        }
        record = TaskRecord.from_dict(raw)
        assert len(record.title) <= 256
241
242
243 # ── ClaimRecord ────────────────────────────────────────────────────────────────
244
245
class TestClaimRecord:
    """Dict round-trip and expiry checks for ``ClaimRecord``."""

    def _make(self, **kwargs: MsgpackValue) -> ClaimRecord:
        """Build a ``ClaimRecord`` claimed at ``_EPOCH`` with a one-hour expiry, overridden by *kwargs*."""
        fields = {
            "task_id": VALID_ID,
            "claimer_run_id": "agent-1",
            "claimed_at": _EPOCH,
            "expires_at": _EPOCH + datetime.timedelta(hours=1),
            "status": "claimed",
            "heartbeat_at": _EPOCH,
            "claim_nonce": _new_id(),
            "result": None,
            "error": None,
        }
        fields.update(kwargs)
        return ClaimRecord(**fields)

    def test_to_dict_round_trip(self) -> None:
        original = self._make()
        restored = ClaimRecord.from_dict(original.to_dict())
        assert restored.task_id == original.task_id
        assert restored.claimer_run_id == original.claimer_run_id
        assert restored.status == original.status
        assert restored.claim_nonce == original.claim_nonce

    def test_is_expired_false(self) -> None:
        record = self._make()
        assert record.is_expired(_EPOCH) is False

    def test_is_expired_true(self) -> None:
        record = self._make()
        # Two hours after the claim, i.e. one hour past expires_at.
        later = _EPOCH + datetime.timedelta(hours=2)
        assert record.is_expired(later) is True
280
281
282 # ── get_task_status ────────────────────────────────────────────────────────────
283
284
class TestGetTaskStatus:
    """``get_task_status`` maps a (task, claim, now) triple to a status string."""

    def _task(self) -> TaskRecord:
        """Return a minimal task created at ``_EPOCH`` in the ``default`` queue."""
        return TaskRecord(
            task_id=VALID_ID,
            title="t",
            payload={},
            priority=0,
            queue="default",
            created_at=_EPOCH,
            created_by="x",
            ttl_seconds=3600,
            tags=[],
        )

    def _claim(self, **kw: MsgpackValue) -> ClaimRecord:
        """Return a claim taken at ``_EPOCH`` that expires one hour later, overridden by *kw*."""
        fields = {
            "task_id": VALID_ID,
            "claimer_run_id": "a",
            "claimed_at": _EPOCH,
            "expires_at": _EPOCH + datetime.timedelta(hours=1),
            "status": "claimed",
            "heartbeat_at": _EPOCH,
            "claim_nonce": "nonce",
            "result": None,
            "error": None,
        }
        fields.update(kw)
        return ClaimRecord(**fields)

    def test_no_claim_is_pending(self) -> None:
        status = get_task_status(self._task(), None, _EPOCH)
        assert status == "pending"

    def test_active_claim_is_claimed(self) -> None:
        live_claim = self._claim()
        status = get_task_status(self._task(), live_claim, _EPOCH)
        assert status == "claimed"

    def test_expired_claim_is_timed_out(self) -> None:
        stale_claim = self._claim()
        after_expiry = _EPOCH + datetime.timedelta(hours=2)
        status = get_task_status(self._task(), stale_claim, after_expiry)
        assert status == "timed_out"

    def test_completed_status_passes_through(self) -> None:
        done = self._claim(status="completed")
        assert get_task_status(self._task(), done, _EPOCH) == "completed"

    def test_failed_status_passes_through(self) -> None:
        broken = self._claim(status="failed")
        assert get_task_status(self._task(), broken, _EPOCH) == "failed"

    def test_cancelled_status_passes_through(self) -> None:
        dropped = self._claim(status="cancelled")
        assert get_task_status(self._task(), dropped, _EPOCH) == "cancelled"
326
327
328 # ── ensure_task_dirs ───────────────────────────────────────────────────────────
329
330
class TestEnsureTaskDirs:
    """``ensure_task_dirs`` creates the tasks and claims directories and can be called repeatedly."""

    def test_creates_directories(self, tmp_path: pathlib.Path) -> None:
        root = _make_repo(tmp_path)
        ensure_task_dirs(root)
        tasks_path = _tasks_dir(root)
        claims_path = _claims_dir(root)
        assert tasks_path.is_dir()
        assert claims_path.is_dir()

    def test_idempotent(self, tmp_path: pathlib.Path) -> None:
        root = _make_repo(tmp_path)
        # The second call runs against already-existing directories and must not raise.
        ensure_task_dirs(root)
        ensure_task_dirs(root)
344
345
346 # ── create_task ────────────────────────────────────────────────────────────────
347
348
class TestCreateTask:
    """``create_task`` checks its inputs and writes a ``TaskRecord`` to the tasks directory."""

    def test_creates_file_on_disk(self, tmp_path: pathlib.Path) -> None:
        root = _make_repo(tmp_path)
        with _freeze(_EPOCH):
            created = create_task(root, "Do X")
        expected_path = _tasks_dir(root) / f"{created.task_id}.json"
        assert expected_path.is_file()

    def test_returns_correct_fields(self, tmp_path: pathlib.Path) -> None:
        root = _make_repo(tmp_path)
        with _freeze(_EPOCH):
            created = create_task(
                root,
                "Deploy service",
                payload={"env": "prod"},
                priority=5,
                queue="deploy",
                ttl_seconds=7200,
                created_by="ops",
                tags=["prod", "critical"],
            )
        assert created.title == "Deploy service"
        assert created.priority == 5
        assert created.queue == "deploy"
        assert created.ttl_seconds == 7200
        assert created.created_by == "ops"
        assert "prod" in created.tags
        assert created.payload == {"env": "prod"}

    def test_file_is_valid_json(self, tmp_path: pathlib.Path) -> None:
        root = _make_repo(tmp_path)
        created = create_task(root, "Validate JSON")
        task_path = _tasks_dir(root) / f"{created.task_id}.json"
        on_disk = json.loads(task_path.read_text())
        assert on_disk["task_id"] == created.task_id

    def test_empty_title_raises(self, tmp_path: pathlib.Path) -> None:
        root = _make_repo(tmp_path)
        with pytest.raises(ValueError, match="non-empty"):
            create_task(root, "")

    def test_invalid_queue_raises(self, tmp_path: pathlib.Path) -> None:
        root = _make_repo(tmp_path)
        with pytest.raises(ValueError):
            create_task(root, "x", queue="bad queue!")

    def test_tags_capped_at_32(self, tmp_path: pathlib.Path) -> None:
        root = _make_repo(tmp_path)
        fifty_tags = [f"tag{i}" for i in range(50)]
        created = create_task(root, "Lots of tags", tags=fifty_tags)
        assert len(created.tags) == 32

    def test_ttl_min_1(self, tmp_path: pathlib.Path) -> None:
        root = _make_repo(tmp_path)
        created = create_task(root, "Short TTL", ttl_seconds=0)
        assert created.ttl_seconds >= 1
405
406
407 # ── load_all_tasks / load_task ─────────────────────────────────────────────────
408
409
class TestLoadTasks:
    """Scanning and loading task records from the tasks directory."""

    def test_empty_dir_returns_empty_list(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        # Create the (empty) tasks directory first so this case differs from the
        # "directory never created" case covered by the next test.
        ensure_task_dirs(repo)
        assert load_all_tasks(repo) == []

    def test_non_existent_dir_returns_empty_list(self, tmp_path: pathlib.Path) -> None:
        # ``ensure_task_dirs`` is deliberately not called here.
        repo = _make_repo(tmp_path)
        assert load_all_tasks(repo) == []

    def test_loads_created_task(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        t = create_task(repo, "A task")
        tasks = load_all_tasks(repo)
        assert len(tasks) == 1
        assert tasks[0].task_id == t.task_id

    def test_skips_corrupt_file(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        ensure_task_dirs(repo)
        # A validly named task file whose content is not JSON.
        corrupt = _tasks_dir(repo) / f"{VALID_ID}.json"
        corrupt.write_text("NOT JSON")
        tasks = load_all_tasks(repo)
        assert tasks == []

    def test_load_task_by_id(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        t = create_task(repo, "Named task")
        loaded = load_task(repo, t.task_id)
        assert loaded is not None
        assert loaded.title == "Named task"

    def test_load_task_missing_returns_none(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        ensure_task_dirs(repo)
        assert load_task(repo, VALID_ID) is None

    def test_load_task_invalid_id_raises(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        with pytest.raises(ValueError):
            load_task(repo, "not-a-content-id")
452
453
454 # ── _try_excl_claim ────────────────────────────────────────────────────────────
455
456
class TestTryExclClaim:
    """``_try_excl_claim``: the first claimer of a task ID wins, later attempts get ``None``."""

    def test_first_claim_succeeds(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        ensure_task_dirs(repo)
        with _freeze(_EPOCH):
            result = _try_excl_claim(repo, VALID_ID, "agent-1", _EPOCH, 3600)
        assert result is not None
        assert result.claimer_run_id == "agent-1"
        assert result.status == "claimed"

    def test_second_claim_returns_none(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        ensure_task_dirs(repo)
        with _freeze(_EPOCH):
            first = _try_excl_claim(repo, VALID_ID, "agent-1", _EPOCH, 3600)
            second = _try_excl_claim(repo, VALID_ID, "agent-2", _EPOCH, 3600)
        assert first is not None
        assert second is None

    def test_claim_file_is_written(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        ensure_task_dirs(repo)
        with _freeze(_EPOCH):
            _try_excl_claim(repo, VALID_ID, "agent-1", _EPOCH, 3600)
        claim_file = _claims_dir(repo) / f"{VALID_ID}.json"
        assert claim_file.is_file()

    def test_claim_file_is_valid_json(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        ensure_task_dirs(repo)
        with _freeze(_EPOCH):
            claim = _try_excl_claim(repo, VALID_ID, "agent-1", _EPOCH, 3600)
        # Fail with a clear assertion (not AttributeError) if the claim was lost.
        assert claim is not None
        content = (_claims_dir(repo) / f"{VALID_ID}.json").read_text()
        d = json.loads(content)
        assert d["claim_nonce"] == claim.claim_nonce

    def test_expires_at_correct(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        ensure_task_dirs(repo)
        with _freeze(_EPOCH):
            claim = _try_excl_claim(repo, VALID_ID, "agent-1", _EPOCH, 7200)
        # Fail with a clear assertion (not AttributeError) if the claim was lost.
        assert claim is not None
        expected = _EPOCH + datetime.timedelta(seconds=7200)
        assert claim.expires_at == expected
502
503
504 # ── _try_optimistic_reclaim ────────────────────────────────────────────────────
505
506
class TestTryOptimisticReclaim:
    """``_try_optimistic_reclaim`` takes over a claim whose TTL has elapsed."""

    def test_reclaim_wins_when_no_competition(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        ensure_task_dirs(repo)
        # First claim, with a 1-second TTL.
        with _freeze(_EPOCH):
            _try_excl_claim(repo, VALID_ID, "agent-1", _EPOCH, 1)
        # Reclaim at t+2, after the first claim has expired.
        now = _EPOCH + datetime.timedelta(seconds=2)
        with _freeze(now):
            result = _try_optimistic_reclaim(repo, VALID_ID, "agent-2", now, 3600)
        assert result is not None
        assert result.claimer_run_id == "agent-2"

    def test_nonce_written_to_file(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        ensure_task_dirs(repo)
        with _freeze(_EPOCH):
            _try_excl_claim(repo, VALID_ID, "agent-1", _EPOCH, 1)
        now = _EPOCH + datetime.timedelta(seconds=2)
        with _freeze(now):
            result = _try_optimistic_reclaim(repo, VALID_ID, "agent-2", now, 3600)
        # Same uncontended setup as the test above, so the reclaim must succeed;
        # asserting it keeps this test from passing without checking anything.
        assert result is not None
        content = json.loads((_claims_dir(repo) / f"{VALID_ID}.json").read_text())
        assert content["claim_nonce"] == result.claim_nonce
534
535
536 # ── claim_next_task ────────────────────────────────────────────────────────────
537
538
class TestClaimNextTask:
    """``claim_next_task``: priority order, FIFO tie-break, queue filter, TTL skip and re-claim."""

    def test_returns_none_on_empty_queue(self, tmp_path: pathlib.Path) -> None:
        root = _make_repo(tmp_path)
        with _freeze(_EPOCH):
            outcome = claim_next_task(root, "agent-1")
        assert outcome is None

    def test_claims_only_task(self, tmp_path: pathlib.Path) -> None:
        root = _make_repo(tmp_path)
        with _freeze(_EPOCH):
            only = create_task(root, "Only task")
            outcome = claim_next_task(root, "agent-1")
        assert outcome is not None
        claimed_task, claim = outcome
        assert claimed_task.task_id == only.task_id
        assert claim.claimer_run_id == "agent-1"

    def test_higher_priority_claimed_first(self, tmp_path: pathlib.Path) -> None:
        root = _make_repo(tmp_path)
        with _freeze(_EPOCH):
            create_task(root, "Low priority", priority=0)
            urgent = create_task(root, "High priority", priority=10)
            outcome = claim_next_task(root, "agent-1")
        assert outcome is not None
        claimed_task = outcome[0]
        assert claimed_task.task_id == urgent.task_id

    def test_fifo_within_same_priority(self, tmp_path: pathlib.Path) -> None:
        root = _make_repo(tmp_path)
        one_second = datetime.timedelta(seconds=1)
        # Two equal-priority tasks created one second apart; claim a second later still.
        with _freeze(_EPOCH):
            oldest = create_task(root, "First task", priority=5)
        with _freeze(_EPOCH + one_second):
            create_task(root, "Second task", priority=5)
        with _freeze(_EPOCH + datetime.timedelta(seconds=2)):
            outcome = claim_next_task(root, "agent-1")
        assert outcome is not None
        claimed_task = outcome[0]
        assert claimed_task.task_id == oldest.task_id

    def test_queue_filter_respected(self, tmp_path: pathlib.Path) -> None:
        root = _make_repo(tmp_path)
        with _freeze(_EPOCH):
            billing_job = create_task(root, "Billing job", queue="billing")
            create_task(root, "Ops job", queue="ops")
            outcome = claim_next_task(root, "agent-1", queue="billing")
        assert outcome is not None
        claimed_task = outcome[0]
        assert claimed_task.task_id == billing_job.task_id

    def test_queue_filter_returns_none_when_no_match(self, tmp_path: pathlib.Path) -> None:
        root = _make_repo(tmp_path)
        with _freeze(_EPOCH):
            create_task(root, "Ops job", queue="ops")
            outcome = claim_next_task(root, "agent-1", queue="billing")
        assert outcome is None

    def test_already_claimed_task_not_re_claimed(self, tmp_path: pathlib.Path) -> None:
        root = _make_repo(tmp_path)
        with _freeze(_EPOCH):
            create_task(root, "Unique task")
            claim_next_task(root, "agent-1")
            second_attempt = claim_next_task(root, "agent-2")
        assert second_attempt is None

    def test_expired_task_ttl_skipped(self, tmp_path: pathlib.Path) -> None:
        root = _make_repo(tmp_path)
        with _freeze(_EPOCH):
            create_task(root, "Expired task", ttl_seconds=10)
        # 20 s later the 10 s task TTL has elapsed.
        with _freeze(_EPOCH + datetime.timedelta(seconds=20)):
            outcome = claim_next_task(root, "agent-1")
        assert outcome is None

    def test_reclaims_timed_out_task(self, tmp_path: pathlib.Path) -> None:
        root = _make_repo(tmp_path)
        with _freeze(_EPOCH):
            create_task(root, "Timed-out task", ttl_seconds=86400)
            claim_next_task(root, "agent-1", claim_ttl_seconds=10)
        # Move past the 10 s claim TTL (the task TTL is still far off).
        later = _EPOCH + datetime.timedelta(seconds=20)
        with _freeze(later):
            outcome = claim_next_task(root, "agent-2", claim_ttl_seconds=3600)
        assert outcome is not None
        new_claim = outcome[1]
        assert new_claim.claimer_run_id == "agent-2"
626
627
628 # ── complete_task ──────────────────────────────────────────────────────────────
629
630
class TestCompleteTask:
    """``complete_task`` marks a claim completed and enforces claimer ownership."""

    def _enqueue_and_claim(self, repo: pathlib.Path, run_id: str = "agent-1") -> tuple[TaskRecord, ClaimRecord]:
        """Create one task, claim it as *run_id* at ``_EPOCH`` and return ``(task, claim)``."""
        # The created record is not kept: the claim result already carries the task.
        create_task(repo, "Task")
        with _freeze(_EPOCH):
            result = claim_next_task(repo, run_id)
        assert result is not None
        return result

    def test_completes_successfully(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        task, _claim = self._enqueue_and_claim(repo)
        claim = complete_task(repo, task.task_id, "agent-1", result={"pr": 42})
        assert claim.status == "completed"
        assert claim.result == {"pr": 42}

    def test_persisted_to_disk(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        task, _claim = self._enqueue_and_claim(repo)
        complete_task(repo, task.task_id, "agent-1")
        disk = json.loads((_claims_dir(repo) / f"{task.task_id}.json").read_text())
        assert disk["status"] == "completed"

    def test_wrong_run_id_raises_permission_error(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        task, _claim = self._enqueue_and_claim(repo)
        with pytest.raises(PermissionError):
            complete_task(repo, task.task_id, "impostor")

    def test_invalid_task_id_raises_value_error(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        with pytest.raises(ValueError):
            complete_task(repo, "not-a-content-id", "agent-1")

    def test_missing_task_raises_file_not_found(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        ensure_task_dirs(repo)
        with pytest.raises(FileNotFoundError):
            complete_task(repo, VALID_ID, "agent-1")

    def test_double_complete_raises_runtime_error(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        task, _claim = self._enqueue_and_claim(repo)
        complete_task(repo, task.task_id, "agent-1")
        # A second completion of the same claim is rejected.
        with pytest.raises(RuntimeError):
            complete_task(repo, task.task_id, "agent-1")
678
679
680 # ── fail_task ──────────────────────────────────────────────────────────────────
681
682
class TestFailTask:
    """``fail_task`` records a failure with its error text and enforces claimer ownership."""

    def test_fails_successfully(self, tmp_path: pathlib.Path) -> None:
        root = _make_repo(tmp_path)
        doomed = create_task(root, "Doomed task")
        with _freeze(_EPOCH):
            claim_next_task(root, "agent-1")
        failed_claim = fail_task(root, doomed.task_id, "agent-1", error="timeout after 30s")
        assert failed_claim.status == "failed"
        assert failed_claim.error == "timeout after 30s"

    def test_wrong_claimer_raises(self, tmp_path: pathlib.Path) -> None:
        root = _make_repo(tmp_path)
        doomed = create_task(root, "Doomed task")
        with _freeze(_EPOCH):
            claim_next_task(root, "agent-1")
        # agent-2 never claimed the task.
        with pytest.raises(PermissionError):
            fail_task(root, doomed.task_id, "agent-2", error="oops")

    def test_already_failed_raises(self, tmp_path: pathlib.Path) -> None:
        root = _make_repo(tmp_path)
        doomed = create_task(root, "Doomed task")
        with _freeze(_EPOCH):
            claim_next_task(root, "agent-1")
        fail_task(root, doomed.task_id, "agent-1", error="first failure")
        # Failing the same claim a second time is rejected.
        with pytest.raises(RuntimeError):
            fail_task(root, doomed.task_id, "agent-1", error="second failure")
711
712
713 # ── cancel_task ────────────────────────────────────────────────────────────────
714
715
class TestCancelTask:
    """``cancel_task`` on pending, claimed, force-cancelled, missing and already-finished tasks."""

    def test_cancel_pending_task(self, tmp_path: pathlib.Path) -> None:
        root = _make_repo(tmp_path)
        pending = create_task(root, "Unneeded task")
        outcome = cancel_task(root, pending.task_id, "orchestrator")
        assert outcome.status == "cancelled"

    def test_cancel_claimed_by_claimer(self, tmp_path: pathlib.Path) -> None:
        root = _make_repo(tmp_path)
        running = create_task(root, "Running task")
        with _freeze(_EPOCH):
            claim_next_task(root, "agent-1")
        outcome = cancel_task(root, running.task_id, "agent-1")
        assert outcome.status == "cancelled"

    def test_cancel_claimed_by_non_claimer_raises(self, tmp_path: pathlib.Path) -> None:
        root = _make_repo(tmp_path)
        running = create_task(root, "Running task")
        with _freeze(_EPOCH):
            claim_next_task(root, "agent-1")
        # agent-2 does not hold the claim and does not pass force=True.
        with pytest.raises(PermissionError):
            cancel_task(root, running.task_id, "agent-2")

    def test_force_cancel_overrides_ownership(self, tmp_path: pathlib.Path) -> None:
        root = _make_repo(tmp_path)
        running = create_task(root, "Running task")
        with _freeze(_EPOCH):
            claim_next_task(root, "agent-1")
        outcome = cancel_task(root, running.task_id, "orchestrator", force=True)
        assert outcome.status == "cancelled"

    def test_cancel_nonexistent_task_raises(self, tmp_path: pathlib.Path) -> None:
        root = _make_repo(tmp_path)
        ensure_task_dirs(root)
        with pytest.raises(FileNotFoundError):
            cancel_task(root, VALID_ID, "orchestrator")

    def test_cancel_completed_raises(self, tmp_path: pathlib.Path) -> None:
        root = _make_repo(tmp_path)
        finished = create_task(root, "Done task")
        with _freeze(_EPOCH):
            claim_next_task(root, "agent-1")
        complete_task(root, finished.task_id, "agent-1")
        # The claim is already completed, so cancelling it is rejected.
        with pytest.raises(RuntimeError, match="terminal"):
            cancel_task(root, finished.task_id, "agent-1")

    def test_invalid_id_raises(self, tmp_path: pathlib.Path) -> None:
        root = _make_repo(tmp_path)
        with pytest.raises(ValueError):
            cancel_task(root, "not-valid", "agent")
768
769
770 # ── heartbeat_claim ────────────────────────────────────────────────────────────
771
772
class TestHeartbeatClaim:
    """``heartbeat_claim`` pushes ``expires_at`` forward and stamps ``heartbeat_at``."""

    def test_extends_expiry(self, tmp_path: pathlib.Path) -> None:
        root = _make_repo(tmp_path)
        job = create_task(root, "Long running task")
        with _freeze(_EPOCH):
            claim_next_task(root, "agent-1", claim_ttl_seconds=3600)
        # Heartbeat halfway through the original one-hour claim.
        beat_time = _EPOCH + datetime.timedelta(seconds=1800)
        with _freeze(beat_time):
            refreshed = heartbeat_claim(root, job.task_id, "agent-1", extension_seconds=7200)
        assert refreshed.expires_at == beat_time + datetime.timedelta(seconds=7200)
        assert refreshed.heartbeat_at == beat_time

    def test_wrong_claimer_raises(self, tmp_path: pathlib.Path) -> None:
        root = _make_repo(tmp_path)
        job = create_task(root, "Task")
        with _freeze(_EPOCH):
            claim_next_task(root, "agent-1")
        with pytest.raises(PermissionError):
            heartbeat_claim(root, job.task_id, "agent-2")

    def test_no_claim_raises(self, tmp_path: pathlib.Path) -> None:
        root = _make_repo(tmp_path)
        ensure_task_dirs(root)
        # A task file exists, but nobody has claimed it.
        create_task(root, "Unclaimed")
        unclaimed = load_all_tasks(root)[0]
        with pytest.raises(FileNotFoundError):
            heartbeat_claim(root, unclaimed.task_id, "agent-1")

    def test_heartbeat_after_complete_raises(self, tmp_path: pathlib.Path) -> None:
        root = _make_repo(tmp_path)
        job = create_task(root, "Done task")
        with _freeze(_EPOCH):
            claim_next_task(root, "agent-1")
        complete_task(root, job.task_id, "agent-1")
        with pytest.raises(RuntimeError):
            heartbeat_claim(root, job.task_id, "agent-1")
812
813
814 # ── Full lifecycle integration ─────────────────────────────────────────────────
815
816
class TestFullLifecycle:
    """Integration flows through the core API: enqueue, claim, heartbeat, complete or fail."""

    def test_enqueue_claim_complete(self, tmp_path: pathlib.Path) -> None:
        root = _make_repo(tmp_path)
        with _freeze(_EPOCH):
            enqueued = create_task(root, "E2E task", payload={"op": "refactor"}, priority=3)
            outcome = claim_next_task(root, "agent-1", claim_ttl_seconds=300)

        assert outcome is not None
        claimed_task, claim = outcome
        assert claimed_task.task_id == enqueued.task_id
        assert claim.status == "claimed"

        claim = complete_task(root, claimed_task.task_id, "agent-1", result={"status": "ok"})
        assert claim.status == "completed"

        # With the only task completed, another agent finds nothing to claim.
        with _freeze(_EPOCH + datetime.timedelta(seconds=1)):
            second_outcome = claim_next_task(root, "agent-2")
        assert second_outcome is None

    def test_enqueue_claim_fail_then_reclaim(self, tmp_path: pathlib.Path) -> None:
        root = _make_repo(tmp_path)
        with _freeze(_EPOCH):
            enqueued = create_task(root, "Failing task", ttl_seconds=86400)
            claim_next_task(root, "agent-1", claim_ttl_seconds=10)
            fail_task(root, enqueued.task_id, "agent-1", error="network timeout")

        # Past the original claim TTL, the failed task is still not handed out again.
        with _freeze(_EPOCH + datetime.timedelta(seconds=30)):
            outcome = claim_next_task(root, "agent-2")
        assert outcome is None  # failed tasks are not re-claimable

    def test_heartbeat_prevents_expiry(self, tmp_path: pathlib.Path) -> None:
        root = _make_repo(tmp_path)
        with _freeze(_EPOCH):
            enqueued = create_task(root, "Long job", ttl_seconds=86400)
            claim_next_task(root, "agent-1", claim_ttl_seconds=10)

        # Heartbeat at t+5, before the 10 s claim TTL runs out.
        with _freeze(_EPOCH + datetime.timedelta(seconds=5)):
            heartbeat_claim(root, enqueued.task_id, "agent-1", extension_seconds=100)

        # At t+60 the original TTL is long gone, but the extension still covers it.
        with _freeze(_EPOCH + datetime.timedelta(seconds=60)):
            outcome = claim_next_task(root, "agent-2")
        assert outcome is None  # agent-1 still holds valid claim

    def test_multi_task_priority_and_fifo(self, tmp_path: pathlib.Path) -> None:
        root = _make_repo(tmp_path)
        # (title, priority) pairs, created one second apart in this order.
        specs = [
            ("Priority 1, time 0", 1),
            ("Priority 5, time 1", 5),
            ("Priority 5, time 2", 5),
            ("Priority 0, time 3", 0),
        ]
        created = []
        for offset, (title, priority) in enumerate(specs):
            with _freeze(_EPOCH + datetime.timedelta(seconds=offset)):
                created.append(create_task(root, title, priority=priority))
        t1, t2, t3, t4 = created

        # Highest priority first; the two priority-5 tasks in creation order.
        expected_order = [t2.task_id, t3.task_id, t1.task_id, t4.task_id]

        claimed_ids = []
        for i in range(4):
            with _freeze(_EPOCH + datetime.timedelta(seconds=10 + i)):
                outcome = claim_next_task(root, f"agent-{i}")
            assert outcome is not None
            claimed_ids.append(outcome[0].task_id)

        assert claimed_ids == expected_order
887
888
889 # ── Security tests ─────────────────────────────────────────────────────────────
890
891
class TestSecurity:
    """Malicious or oversized inputs are rejected, truncated, or stored without escaping the repo."""

    def test_path_traversal_in_task_id_load_task(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        ensure_task_dirs(repo)
        with pytest.raises(ValueError):
            load_task(repo, "../../etc/passwd")

    def test_path_traversal_in_task_id_load_claim(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        ensure_task_dirs(repo)
        with pytest.raises(ValueError):
            load_claim(repo, "../../shadow")

    def test_path_traversal_in_complete(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        with pytest.raises(ValueError):
            complete_task(repo, "../../etc/passwd", "agent")

    def test_path_traversal_in_fail(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        with pytest.raises(ValueError):
            fail_task(repo, "../../../../etc/shadow", "agent")

    def test_path_traversal_in_cancel(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        with pytest.raises(ValueError):
            cancel_task(repo, "../../../harm", "agent")

    def test_path_traversal_in_heartbeat(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        with pytest.raises(ValueError):
            heartbeat_claim(repo, "../../secret", "agent")

    def test_null_byte_in_task_id(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        # UUID-shaped string ending in NUL: this is not a sha256 ID to begin with.
        with pytest.raises(ValueError):
            load_task(repo, "12345678-1234-4abc-8abc-1234567890\x00")
        # A NUL inside an ID built through ``long_id``, so the check does not
        # rely on the rest of the string already being malformed.
        with pytest.raises(ValueError):
            load_task(repo, long_id("a" * 63 + "\x00"))

    def test_oversized_title_is_truncated(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        t = create_task(repo, "X" * 1000)
        assert len(t.title) <= 256

    def test_oversized_queue_raises(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        with pytest.raises(ValueError):
            create_task(repo, "Task", queue="q" * 65)

    def test_ansi_injection_in_title_stored_verbatim(self, tmp_path: pathlib.Path) -> None:
        """Title is stored as-is but sanitized at display time (not at persist time)."""
        repo = _make_repo(tmp_path)
        ansi_title = "\x1b[31mRED\x1b[0m"
        t = create_task(repo, ansi_title)
        loaded = load_task(repo, t.task_id)
        assert loaded is not None
        # The raw title is preserved for correctness; display layer sanitizes it.
        assert loaded.title == ansi_title
951
952
953 # ── Concurrent claiming stress tests ──────────────────────────────────────────
954
955
class TestConcurrentClaiming:
    """Multiple threads compete for tasks — each task has at most one winner."""

    def test_exactly_one_winner_from_n_threads(self, tmp_path: pathlib.Path) -> None:
        """N threads all call claim_next_task concurrently — exactly one wins."""
        repo = _make_repo(tmp_path)
        create_task(repo, "Race task")

        winners: list[str] = []
        lock = threading.Lock()

        def try_claim(agent_id: str) -> None:
            result = claim_next_task(repo, agent_id)
            if result is not None:
                with lock:
                    winners.append(agent_id)

        n = 20
        threads = [threading.Thread(target=try_claim, args=(f"agent-{i}",)) for i in range(n)]
        for th in threads:
            th.start()
        for th in threads:
            th.join()

        assert len(winners) == 1, f"Expected 1 winner, got {len(winners)}: {winners}"

    def test_n_tasks_claimed_by_n_agents_no_duplicates(self, tmp_path: pathlib.Path) -> None:
        """N tasks, N agents — each task claimed by at most one agent.

        Every successful claim is appended to a list (not stored in a dict
        keyed by task id), so a task handed to two agents shows up as a
        duplicate entry instead of being silently overwritten.
        """
        repo = _make_repo(tmp_path)
        n = 10
        tasks = [create_task(repo, f"Task {i}", priority=i) for i in range(n)]

        claims: list[tuple[str, str]] = []  # (task_id, agent_id) per successful claim
        lock = threading.Lock()

        def try_claim(agent_id: str) -> None:
            result = claim_next_task(repo, agent_id)
            if result is not None:
                task, _claim = result
                with lock:
                    claims.append((task.task_id, agent_id))

        threads = [threading.Thread(target=try_claim, args=(f"agent-{i}",)) for i in range(n)]
        for th in threads:
            th.start()
        for th in threads:
            th.join()

        # Guard against a vacuous pass: with N pending tasks someone must win.
        assert claims, "no agent managed to claim any task"

        # No task may be handed out twice.
        claimed_task_ids = [task_id for task_id, _agent in claims]
        assert len(claimed_task_ids) == len(set(claimed_task_ids)), (
            f"Task claimed more than once: {sorted(claims)}"
        )

        # Every claimed id must be one of the tasks created above.
        all_task_ids = {t.task_id for t in tasks}
        for tid in claimed_task_ids:
            assert tid in all_task_ids
1009
1010
1011 # ── Stress tests ───────────────────────────────────────────────────────────────
1012
1013
class TestStress:
    """Wall-clock smoke tests: bulk operations must finish within a generous budget."""

    def test_enqueue_500_tasks(self, tmp_path: pathlib.Path) -> None:
        """Enqueuing 500 tasks should complete in < 10 s on any reasonable hardware."""
        repo = _make_repo(tmp_path)
        began = time.monotonic()
        for index in range(500):
            create_task(repo, f"Stress task {index}", priority=index % 10, queue="stress")
        elapsed = time.monotonic() - began
        assert elapsed < 10.0, f"Enqueue 500 tasks took {elapsed:.2f}s"
        assert len(load_all_tasks(repo)) == 500

    def test_claim_scan_500_tasks(self, tmp_path: pathlib.Path) -> None:
        """claim_next_task on a 500-task queue must resolve quickly."""
        repo = _make_repo(tmp_path)
        for index in range(500):
            create_task(repo, f"Task {index}", priority=index % 10)

        # Only the claim itself is timed; queue setup is excluded.
        began = time.monotonic()
        outcome = claim_next_task(repo, "agent-1")
        elapsed = time.monotonic() - began
        assert outcome is not None
        assert elapsed < 5.0, f"Claim scan took {elapsed:.2f}s"

    def test_load_all_tasks_1000(self, tmp_path: pathlib.Path) -> None:
        """load_all_tasks on 1000 tasks should complete in < 5 s."""
        repo = _make_repo(tmp_path)
        for index in range(1000):
            create_task(repo, f"Task {index}")

        # Only the bulk load is timed; task creation is excluded.
        began = time.monotonic()
        loaded = load_all_tasks(repo)
        elapsed = time.monotonic() - began
        assert len(loaded) == 1000
        assert elapsed < 5.0, f"load_all_tasks 1000 took {elapsed:.2f}s"
1049
1050
1051 # ── CLI unit tests ─────────────────────────────────────────────────────────────
1052
1053 # All CLI tests use ``require_repo`` patched to return the temp repo root.
1054
1055
1056 def _patch_repo(repo: pathlib.Path) -> AbstractContextManager[MagicMock]:
1057 return patch("muse.cli.commands.task_queue.require_repo", return_value=repo)
1058
1059
class TestCliEnqueue:
    """``run_enqueue`` smoke tests: JSON and text success output plus exit-1 error paths."""

    @staticmethod
    def _assert_exits_1(repo: pathlib.Path, args: argparse.Namespace) -> None:
        """Run ``run_enqueue`` with *repo* patched in and require ``SystemExit`` code 1."""
        with _patch_repo(repo), pytest.raises(SystemExit) as exc:
            run_enqueue(args)
        assert exc.value.code == 1

    def test_enqueue_json_output(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """JSON output echoes the title, priority, queue and tags that were passed in."""
        repo = _make_repo(tmp_path)
        args = _namespace(title="Enqueue test", json_out=True, priority=3, queue="q", tags="a,b")
        with _patch_repo(repo):
            run_enqueue(args)
        record = json.loads(capsys.readouterr().out)
        assert record["title"] == "Enqueue test"
        assert record["priority"] == 3
        assert record["queue"] == "q"
        assert "a" in record["tags"]

    def test_enqueue_text_output(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """Text output announces that the task was enqueued."""
        repo = _make_repo(tmp_path)
        args = _namespace(
            title="Text enqueue",
            json_out=False,
            priority=0,
            queue="default",
            tags="",
            payload="{}",
        )
        with _patch_repo(repo):
            run_enqueue(args)
        printed = capsys.readouterr().out
        assert "Task enqueued" in printed

    def test_enqueue_invalid_payload_exits_1(self, tmp_path: pathlib.Path) -> None:
        """A payload that is not JSON aborts with exit code 1."""
        self._assert_exits_1(_make_repo(tmp_path), _namespace(payload="not-json"))

    def test_enqueue_payload_not_object_exits_1(self, tmp_path: pathlib.Path) -> None:
        """A JSON payload that is an array rather than an object aborts with exit code 1."""
        self._assert_exits_1(_make_repo(tmp_path), _namespace(payload="[1,2,3]"))

    def test_enqueue_invalid_queue_exits_1(self, tmp_path: pathlib.Path) -> None:
        """A queue name containing a space and ``!`` aborts with exit code 1."""
        self._assert_exits_1(_make_repo(tmp_path), _namespace(queue="bad queue!"))

    def test_enqueue_elapsed_included_in_json(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """JSON output carries a ``duration_ms`` field."""
        repo = _make_repo(tmp_path)
        args = _namespace(json_out=True, queue="default")
        with _patch_repo(repo):
            run_enqueue(args)
        record = json.loads(capsys.readouterr().out)
        assert "duration_ms" in record
1114
1115
1116 # ── New: enqueue input validation ─────────────────────────────────────────────
1117
1118
1119 from muse.cli.commands.task_queue import (
1120 _MAX_PAYLOAD_BYTES,
1121 _MAX_RUN_ID_LEN,
1122 _MAX_QUEUE_LEN,
1123 _MAX_TAGS,
1124 _MAX_TAG_LEN,
1125 _MAX_TITLE_LEN,
1126 )
1127 from muse.core.errors import ExitCode
1128
1129
1130 def _enqueue_ns(**kwargs: MsgpackValue) -> argparse.Namespace:
1131 """Build a Namespace with enqueue-appropriate defaults (queue='default')."""
1132 defaults = {
1133 "json_out": True,
1134 "run_id": "agent-1",
1135 "queue": "default",
1136 "title": "Test task",
1137 "priority": 0,
1138 "ttl_seconds": 86400,
1139 "payload": "{}",
1140 "tags": "",
1141 }
1142 defaults.update(kwargs)
1143 return argparse.Namespace(**defaults)
1144
1145
class TestEnqueueInputValidation:
    """Enqueue argument validation: bad input exits with USER_ERROR, boundary values pass.

    ``test_validation_fires_before_repo_lookup`` additionally checks that an
    empty title is rejected before ``require_repo`` is ever called.
    """

    @staticmethod
    def _assert_user_error(repo: pathlib.Path, args: argparse.Namespace) -> None:
        """Run ``run_enqueue`` and require ``SystemExit`` with ``ExitCode.USER_ERROR``."""
        with _patch_repo(repo), pytest.raises(SystemExit) as exc:
            run_enqueue(args)
        assert exc.value.code == ExitCode.USER_ERROR

    @staticmethod
    def _stdout_of(
        repo: pathlib.Path,
        args: argparse.Namespace,
        capsys: pytest.CaptureFixture[str],
    ) -> str:
        """Run ``run_enqueue`` (expected to succeed) and return what it printed to stdout."""
        with _patch_repo(repo):
            run_enqueue(args)
        return capsys.readouterr().out

    def test_empty_title_exits_1(self, tmp_path: pathlib.Path) -> None:
        self._assert_user_error(_make_repo(tmp_path), _enqueue_ns(title="", json_out=True))

    def test_whitespace_only_title_exits_1(self, tmp_path: pathlib.Path) -> None:
        self._assert_user_error(_make_repo(tmp_path), _enqueue_ns(title=" ", json_out=True))

    def test_run_id_at_max_length_accepted(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        repo = _make_repo(tmp_path)
        longest_run_id = "x" * _MAX_RUN_ID_LEN
        args = _enqueue_ns(run_id=longest_run_id, json_out=True)
        out = json.loads(self._stdout_of(repo, args, capsys))
        assert out["created_by"] == longest_run_id

    def test_run_id_over_max_length_exits_1(self, tmp_path: pathlib.Path) -> None:
        args = _enqueue_ns(run_id="x" * (_MAX_RUN_ID_LEN + 1), json_out=True)
        self._assert_user_error(_make_repo(tmp_path), args)

    def test_ttl_zero_exits_1(self, tmp_path: pathlib.Path) -> None:
        self._assert_user_error(_make_repo(tmp_path), _enqueue_ns(ttl_seconds=0, json_out=True))

    def test_ttl_negative_exits_1(self, tmp_path: pathlib.Path) -> None:
        self._assert_user_error(_make_repo(tmp_path), _enqueue_ns(ttl_seconds=-1, json_out=True))

    def test_ttl_one_accepted(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        repo = _make_repo(tmp_path)
        args = _enqueue_ns(ttl_seconds=1, json_out=True)
        out = json.loads(self._stdout_of(repo, args, capsys))
        assert out["ttl_seconds"] == 1

    def test_payload_over_max_bytes_exits_1(self, tmp_path: pathlib.Path) -> None:
        # The value alone is already _MAX_PAYLOAD_BYTES long, so the document exceeds it.
        big = '{"k": "' + "a" * _MAX_PAYLOAD_BYTES + '"}'
        self._assert_user_error(_make_repo(tmp_path), _enqueue_ns(payload=big, json_out=True))

    def test_payload_at_max_bytes_accepted(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        repo = _make_repo(tmp_path)
        # Build a payload that is exactly _MAX_PAYLOAD_BYTES bytes
        val_len = _MAX_PAYLOAD_BYTES - len('{"k": ""}')
        payload = '{"k": "' + "a" * val_len + '"}'
        # Strict equality: a shorter payload would not exercise the boundary at all.
        assert len(payload.encode()) == _MAX_PAYLOAD_BYTES
        args = _enqueue_ns(payload=payload, json_out=True)
        out = json.loads(self._stdout_of(repo, args, capsys))
        assert "task_id" in out

    def test_payload_not_json_exits_1(self, tmp_path: pathlib.Path) -> None:
        self._assert_user_error(_make_repo(tmp_path), _enqueue_ns(payload="not-json", json_out=True))

    def test_payload_array_exits_1(self, tmp_path: pathlib.Path) -> None:
        self._assert_user_error(_make_repo(tmp_path), _enqueue_ns(payload="[1,2,3]", json_out=True))

    def test_too_many_tags_exits_1(self, tmp_path: pathlib.Path) -> None:
        tags = ",".join(f"tag{i}" for i in range(_MAX_TAGS + 1))
        self._assert_user_error(_make_repo(tmp_path), _enqueue_ns(tags=tags, json_out=True))

    def test_max_tags_accepted(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        repo = _make_repo(tmp_path)
        tags = ",".join(f"t{i}" for i in range(_MAX_TAGS))
        args = _enqueue_ns(tags=tags, json_out=True)
        out = json.loads(self._stdout_of(repo, args, capsys))
        assert len(out["tags"]) == _MAX_TAGS

    def test_invalid_queue_name_exits_1(self, tmp_path: pathlib.Path) -> None:
        self._assert_user_error(_make_repo(tmp_path), _enqueue_ns(queue="bad queue!", json_out=True))

    def test_queue_with_spaces_exits_1(self, tmp_path: pathlib.Path) -> None:
        self._assert_user_error(_make_repo(tmp_path), _enqueue_ns(queue="my queue", json_out=True))

    def test_queue_name_too_long_exits_1(self, tmp_path: pathlib.Path) -> None:
        args = _enqueue_ns(queue="a" * (_MAX_QUEUE_LEN + 1), json_out=True)
        self._assert_user_error(_make_repo(tmp_path), args)

    def test_valid_queue_name_with_hyphens_accepted(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        repo = _make_repo(tmp_path)
        args = _enqueue_ns(queue="my-queue-01", json_out=True)
        out = json.loads(self._stdout_of(repo, args, capsys))
        assert out["queue"] == "my-queue-01"

    def test_valid_queue_name_with_underscores_accepted(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        repo = _make_repo(tmp_path)
        args = _enqueue_ns(queue="my_queue_01", json_out=True)
        out = json.loads(self._stdout_of(repo, args, capsys))
        assert out["queue"] == "my_queue_01"

    def test_validation_fires_before_repo_lookup(self, tmp_path: pathlib.Path) -> None:
        """require_repo must never be called when title is empty."""
        repo = _make_repo(tmp_path)
        require_calls: list[bool] = []

        def _fake_require() -> pathlib.Path:
            # Record the call so the test can prove it never happened.
            require_calls.append(True)
            return repo

        args = _enqueue_ns(title="", json_out=True)
        with patch("muse.cli.commands.task_queue.require_repo", side_effect=_fake_require):
            with pytest.raises(SystemExit):
                run_enqueue(args)
        assert require_calls == [], "require_repo was called before validation"
1314
1315
class TestEnqueueJsonErrors:
    """When --format json, all errors produce compact JSON on stdout (not text)."""

    @staticmethod
    def _failed_stdout(
        tmp_path: pathlib.Path,
        capsys: pytest.CaptureFixture[str],
        args: argparse.Namespace,
    ) -> str:
        """Run ``run_enqueue`` expecting ``SystemExit``; return the stripped stdout."""
        repo = _make_repo(tmp_path)
        with _patch_repo(repo), pytest.raises(SystemExit):
            run_enqueue(args)
        return capsys.readouterr().out.strip()

    def test_empty_title_json_error(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """An empty title yields a JSON object with ``error`` and ``status`` keys."""
        raw = self._failed_stdout(tmp_path, capsys, _enqueue_ns(title="", json_out=True))
        data = json.loads(raw)
        assert "error" in data
        assert "status" in data

    def test_bad_payload_json_error(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """A non-JSON payload is reported with status ``bad_payload``."""
        raw = self._failed_stdout(tmp_path, capsys, _enqueue_ns(payload="not-json", json_out=True))
        data = json.loads(raw)
        assert "error" in data
        assert data["status"] == "bad_payload"

    def test_bad_queue_json_error(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """An invalid queue name is reported with status ``bad_queue``."""
        raw = self._failed_stdout(tmp_path, capsys, _enqueue_ns(queue="bad queue!", json_out=True))
        data = json.loads(raw)
        assert data["status"] == "bad_queue"

    def test_json_error_is_compact_single_line(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """The JSON error document contains no embedded newline."""
        raw = self._failed_stdout(tmp_path, capsys, _enqueue_ns(title="", json_out=True))
        assert "\n" not in raw

    def test_text_error_goes_to_stderr(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """In text mode the error marker is on stderr and stdout stays empty."""
        repo = _make_repo(tmp_path)
        args = _enqueue_ns(title="", json_out=False)
        with _patch_repo(repo), pytest.raises(SystemExit):
            run_enqueue(args)
        # Unstripped capture: stdout must be exactly empty, not merely blank.
        streams = capsys.readouterr()
        assert "❌" in streams.err
        assert streams.out == ""

    def test_run_id_too_long_json_error(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """An over-long run id yields a JSON object with an ``error`` key."""
        args = _enqueue_ns(run_id="x" * (_MAX_RUN_ID_LEN + 1), json_out=True)
        raw = self._failed_stdout(tmp_path, capsys, args)
        data = json.loads(raw)
        assert "error" in data
1379
1380
class TestEnqueueCompactJson:
    """JSON output must be compact (no indent=2) and schema-complete."""

    @staticmethod
    def _stdout_of(
        repo: pathlib.Path,
        capsys: pytest.CaptureFixture[str],
        args: argparse.Namespace,
    ) -> str:
        """Run ``run_enqueue`` with *repo* patched in and return the raw stdout."""
        with _patch_repo(repo):
            run_enqueue(args)
        return capsys.readouterr().out

    def test_success_json_is_single_line(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        repo = _make_repo(tmp_path)
        raw = self._stdout_of(repo, capsys, _enqueue_ns(json_out=True)).strip()
        assert "\n" not in raw
        json.loads(raw)  # must be valid JSON

    def test_success_json_schema_keys(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        repo = _make_repo(tmp_path)
        args = _enqueue_ns(json_out=True, tags="a,b", priority=5)
        out = json.loads(self._stdout_of(repo, capsys, args))
        required_keys = (
            "schema", "task_id", "title", "priority", "queue",
            "ttl_seconds", "created_by", "created_at", "tags", "payload",
            "duration_ms",
        )
        for key in required_keys:
            assert key in out, f"missing key: {key}"

    def test_success_json_values_correct(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        repo = _make_repo(tmp_path)
        args = _enqueue_ns(
            title="My task",
            json_out=True,
            queue="billing",
            priority=7,
            ttl_seconds=3600,
            run_id="orch-1",
            payload='{"addr": "x.py::fn"}',
            tags="billing,refactor",
        )
        out = json.loads(self._stdout_of(repo, capsys, args))
        assert out["title"] == "My task"
        assert out["queue"] == "billing"
        assert out["priority"] == 7
        assert out["ttl_seconds"] == 3600
        assert out["created_by"] == "orch-1"
        assert out["payload"] == {"addr": "x.py::fn"}
        assert "billing" in out["tags"]
        assert "refactor" in out["tags"]

    def test_duration_ms_is_float(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        repo = _make_repo(tmp_path)
        out = json.loads(self._stdout_of(repo, capsys, _enqueue_ns(json_out=True)))
        assert isinstance(out["duration_ms"], float)

    def test_task_id_is_sha256(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        repo = _make_repo(tmp_path)
        out = json.loads(self._stdout_of(repo, capsys, _enqueue_ns(json_out=True)))
        assert out["task_id"].startswith("sha256:"), f"expected sha256: prefix, got {out['task_id']!r}"
        # "sha256:" (7 characters) followed by 64 more.
        assert len(out["task_id"]) == 71

    def test_unique_task_ids_across_different_titles(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        repo = _make_repo(tmp_path)
        ids = []
        for i in range(10):
            args = _enqueue_ns(json_out=True, title=f"Task number {i}")
            out = json.loads(self._stdout_of(repo, capsys, args))
            ids.append(out["task_id"])
        assert len(set(ids)) == 10
1450
1451
class TestEnqueueTextOutput:
    """Text-format output must be human-readable and safe."""

    @staticmethod
    def _text_stdout(
        tmp_path: pathlib.Path,
        capsys: pytest.CaptureFixture[str],
        **overrides: MsgpackValue,
    ) -> str:
        """Enqueue one task in text mode in a fresh repo and return the stdout text."""
        repo = _make_repo(tmp_path)
        args = _enqueue_ns(json_out=False, **overrides)
        with _patch_repo(repo):
            run_enqueue(args)
        return capsys.readouterr().out

    def test_success_text_contains_task_enqueued(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        out = self._text_stdout(tmp_path, capsys, title="Refactor billing")
        assert "Task enqueued" in out

    def test_success_text_shows_title(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        out = self._text_stdout(tmp_path, capsys, title="Rewrite auth")
        assert "Rewrite auth" in out

    def test_success_text_shows_queue(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        out = self._text_stdout(tmp_path, capsys, queue="billing")
        assert "billing" in out

    def test_success_text_shows_priority(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        out = self._text_stdout(tmp_path, capsys, priority=9)
        assert "9" in out

    def test_success_text_shows_tags(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        out = self._text_stdout(tmp_path, capsys, tags="alpha,beta")
        assert "alpha" in out
        assert "beta" in out

    def test_ansi_in_title_stripped_from_text(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """ANSI sequences in title must not appear raw in text output."""
        out = self._text_stdout(tmp_path, capsys, title="\x1b[31mRed task\x1b[0m")
        assert "\x1b" not in out

    def test_ansi_in_tags_stripped_from_text(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """ESC bytes inside a tag must not appear raw in text output."""
        out = self._text_stdout(tmp_path, capsys, tags="\x1b[31mmalicious\x1b[0m,safe")
        assert "\x1b" not in out
1512
1513
class TestEnqueueIntegration:
    """CLI round trips: enqueue → claim, and enqueue → list."""

    @staticmethod
    def _enqueue_quietly(
        repo: pathlib.Path,
        capsys: pytest.CaptureFixture[str],
        args: argparse.Namespace,
    ) -> None:
        """Run ``run_enqueue`` and throw away whatever it printed."""
        with _patch_repo(repo):
            run_enqueue(args)
        capsys.readouterr()  # discard

    @staticmethod
    def _claim_stdout(
        repo: pathlib.Path,
        capsys: pytest.CaptureFixture[str],
        run_id: str,
    ) -> str:
        """Claim from any queue as *run_id* in JSON mode and return the stdout text."""
        args_c = _namespace(run_id=run_id, queue=None, claim_ttl=3600, json_out=True)
        with _patch_repo(repo):
            run_claim(args_c)
        return capsys.readouterr().out

    def test_enqueued_task_is_claimable(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        repo = _make_repo(tmp_path)
        self._enqueue_quietly(repo, capsys, _enqueue_ns(title="Claimable", json_out=True))

        out = json.loads(self._claim_stdout(repo, capsys, "worker-1"))
        assert out["status"] == "claimed"
        assert out["task"]["title"] == "Claimable"

    def test_priority_ordering(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """Higher priority task must be claimed first."""
        repo = _make_repo(tmp_path)
        # Enqueued deliberately out of priority order.
        for title, pri in (("Low", 0), ("High", 10), ("Mid", 5)):
            self._enqueue_quietly(repo, capsys, _enqueue_ns(title=title, priority=pri, json_out=True))

        out = json.loads(self._claim_stdout(repo, capsys, "worker"))
        assert out["task"]["title"] == "High"

    def test_enqueue_then_list_shows_task(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        repo = _make_repo(tmp_path)
        self._enqueue_quietly(
            repo, capsys, _enqueue_ns(title="Listed task", json_out=True, tags="search-me")
        )

        args_t = _namespace(json_out=True, status=None, queue=None, run_id=None)
        with _patch_repo(repo):
            run_tasks(args_t)
        listing = json.loads(capsys.readouterr().out)
        titles = [item["title"] for item in listing["items"]]
        assert "Listed task" in titles

    def test_enqueue_with_payload_passes_through_to_claim(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        repo = _make_repo(tmp_path)
        args_e = _enqueue_ns(
            title="Payload task",
            json_out=True,
            payload='{"op": "rename", "from": "foo", "to": "bar"}',
        )
        self._enqueue_quietly(repo, capsys, args_e)

        out = json.loads(self._claim_stdout(repo, capsys, "worker"))
        assert out["task"]["payload"]["op"] == "rename"
1575
1576
class TestEnqueueStress:
    """Performance and concurrency under load."""

    def test_enqueue_500_tasks_under_10s(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """500 sequential CLI enqueues must finish in under 10 s of wall-clock time."""
        repo = _make_repo(tmp_path)
        start = time.monotonic()
        for i in range(500):
            args = _enqueue_ns(
                title=f"Task {i}",
                json_out=True,
                priority=i % 10,
                queue="load-test",
            )
            with _patch_repo(repo):
                run_enqueue(args)
            capsys.readouterr()  # drop captured output between iterations
        elapsed = time.monotonic() - start
        assert elapsed < 10.0, f"500 enqueues took {elapsed:.2f}s"

    def test_concurrent_enqueue_produces_unique_ids(self, tmp_path: pathlib.Path) -> None:
        """20 concurrent threads each enqueue one uniquely-titled task — all IDs must be unique."""
        repo = _make_repo(tmp_path)
        task_ids: list[str] = []
        lock = threading.Lock()

        def _enqueue(n: int) -> None:
            task = create_task(repo, f"concurrent task {n}", queue="default")
            with lock:
                task_ids.append(task.task_id)

        threads = [threading.Thread(target=_enqueue, args=(i,)) for i in range(20)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()
        assert len(set(task_ids)) == 20, "Duplicate task IDs detected under concurrency"

    def test_enqueue_large_payload_at_boundary(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """Payload at exactly the byte limit must be accepted."""
        repo = _make_repo(tmp_path)
        val_len = _MAX_PAYLOAD_BYTES - len('{"k": ""}')
        payload = '{"k": "' + "a" * val_len + '"}'
        # Strict equality: anything shorter would silently miss the boundary.
        assert len(payload.encode()) == _MAX_PAYLOAD_BYTES
        args = _enqueue_ns(payload=payload, json_out=True)
        with _patch_repo(repo):
            run_enqueue(args)
        out = json.loads(capsys.readouterr().out)
        assert "task_id" in out

    def test_enqueue_many_queues(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """One task per queue across 20 queues yields 20 distinct task ids."""
        repo = _make_repo(tmp_path)
        queues = [f"queue{i}" for i in range(20)]
        task_ids = set()
        for q in queues:
            args = _enqueue_ns(title=f"task for {q}", queue=q, json_out=True)
            with _patch_repo(repo):
                run_enqueue(args)
            out = json.loads(capsys.readouterr().out)
            task_ids.add(out["task_id"])
        assert len(task_ids) == 20
1636
1637
class TestCliClaim:
    """``run_claim`` basics: a successful claim, and exit on an empty queue, in JSON and text."""

    def test_claim_success_json(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """A pending task is claimed and reported as JSON with the claimer's run id."""
        repo = _make_repo(tmp_path)
        with _patch_repo(repo):
            create_task(repo, "Claimable task")
            run_claim(_namespace(run_id="agent-1", queue=None, claim_ttl=3600, json_out=True))
        reply = json.loads(capsys.readouterr().out)
        assert reply["status"] == "claimed"
        assert reply["claimer_run_id"] == "agent-1"
        assert "task" in reply

    def test_claim_empty_queue_exits_1(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """With nothing queued the command exits 1 and prints status ``empty`` as JSON."""
        repo = _make_repo(tmp_path)
        args = _namespace(run_id="agent-1", queue=None, claim_ttl=3600, json_out=True)
        with _patch_repo(repo), pytest.raises(SystemExit) as exc:
            run_claim(args)
        assert exc.value.code == 1
        reply = json.loads(capsys.readouterr().out)
        assert reply["status"] == "empty"

    def test_claim_text_output_on_success(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """Text mode announces a successful claim on stdout."""
        repo = _make_repo(tmp_path)
        with _patch_repo(repo):
            create_task(repo, "Text claim task")
            run_claim(_namespace(run_id="agent-1", queue=None, claim_ttl=3600, json_out=False))
        printed = capsys.readouterr().out
        assert "Task claimed" in printed

    def test_claim_text_output_on_empty(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """Text mode mentions "empty" (any letter case) on stdout when nothing is queued."""
        repo = _make_repo(tmp_path)
        args = _namespace(run_id="agent-1", queue=None, claim_ttl=3600, json_out=False)
        with _patch_repo(repo), pytest.raises(SystemExit):
            run_claim(args)
        printed = capsys.readouterr().out
        assert "empty" in printed.lower()
1679
1680
1681 # ── New: claim hardening tests ────────────────────────────────────────────────
1682
1683 from muse.cli.commands.task_queue import (
1684 _MIN_CLAIM_TTL,
1685 _MAX_CLAIM_TTL,
1686 _MAX_WAIT_SECONDS,
1687 )
1688
1689
1690 def _claim_ns(**kwargs: MsgpackValue) -> argparse.Namespace:
1691 """Build a Namespace with claim-appropriate defaults."""
1692 defaults = {
1693 "json_out": True,
1694 "run_id": "agent-1",
1695 "queue": None,
1696 "claim_ttl": 3600,
1697 "wait": 0,
1698 }
1699 defaults.update(kwargs)
1700 return argparse.Namespace(**defaults)
1701
1702
1703 class TestClaimInputValidation:
1704 """All claim validation fires before require_repo() — exit 1 on bad args."""
1705
def test_run_id_at_max_length_accepted(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
    """A run id of exactly ``_MAX_RUN_ID_LEN`` characters can claim a task."""
    repo = _make_repo(tmp_path)
    create_task(repo, "Task")
    longest_run_id = "x" * _MAX_RUN_ID_LEN
    with _patch_repo(repo):
        run_claim(_claim_ns(run_id=longest_run_id, json_out=True))
    reply = json.loads(capsys.readouterr().out)
    assert reply["claimer_run_id"] == longest_run_id
1714
def test_run_id_over_max_exits_1(self, tmp_path: pathlib.Path) -> None:
    """A run id one character over the limit makes the command exit with code 1."""
    repo = _make_repo(tmp_path)
    too_long = "x" * (_MAX_RUN_ID_LEN + 1)
    with _patch_repo(repo), pytest.raises(SystemExit) as exc:
        run_claim(_claim_ns(run_id=too_long, json_out=True))
    assert exc.value.code == 1
1722
1723 def test_run_id_too_long_json_error(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
1724 repo = _make_repo(tmp_path)
1725 args = _claim_ns(run_id="x" * (_MAX_RUN_ID_LEN + 1), json_out=True)
1726 with _patch_repo(repo):
1727 with pytest.raises(SystemExit):
1728 run_claim(args)
1729 raw = capsys.readouterr().out.strip()
1730 data = json.loads(raw)
1731 assert "error" in data
1732 assert data["status"] == "bad_args"
1733
1734 def test_run_id_too_long_text_error(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
1735 repo = _make_repo(tmp_path)
1736 args = _claim_ns(run_id="x" * (_MAX_RUN_ID_LEN + 1), json_out=False)
1737 with _patch_repo(repo):
1738 with pytest.raises(SystemExit):
1739 run_claim(args)
1740 captured = capsys.readouterr()
1741 assert "❌" in captured.err
1742 assert captured.out == ""
1743
1744 def test_claim_ttl_min_accepted(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
1745 repo = _make_repo(tmp_path)
1746 create_task(repo, "Task")
1747 args = _claim_ns(claim_ttl=_MIN_CLAIM_TTL, json_out=True)
1748 with _patch_repo(repo):
1749 run_claim(args)
1750 out = json.loads(capsys.readouterr().out)
1751 assert out["status"] == "claimed"
1752
1753 def test_claim_ttl_max_accepted(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
1754 repo = _make_repo(tmp_path)
1755 create_task(repo, "Task")
1756 args = _claim_ns(claim_ttl=_MAX_CLAIM_TTL, json_out=True)
1757 with _patch_repo(repo):
1758 run_claim(args)
1759 out = json.loads(capsys.readouterr().out)
1760 assert out["status"] == "claimed"
1761
1762 def test_claim_ttl_zero_exits_1(self, tmp_path: pathlib.Path) -> None:
1763 repo = _make_repo(tmp_path)
1764 args = _claim_ns(claim_ttl=0, json_out=True)
1765 with _patch_repo(repo):
1766 with pytest.raises(SystemExit) as exc:
1767 run_claim(args)
1768 assert exc.value.code == 1
1769
1770 def test_claim_ttl_negative_exits_1(self, tmp_path: pathlib.Path) -> None:
1771 repo = _make_repo(tmp_path)
1772 args = _claim_ns(claim_ttl=-1, json_out=True)
1773 with _patch_repo(repo):
1774 with pytest.raises(SystemExit) as exc:
1775 run_claim(args)
1776 assert exc.value.code == 1
1777
1778 def test_claim_ttl_over_max_exits_1(self, tmp_path: pathlib.Path) -> None:
1779 repo = _make_repo(tmp_path)
1780 args = _claim_ns(claim_ttl=_MAX_CLAIM_TTL + 1, json_out=True)
1781 with _patch_repo(repo):
1782 with pytest.raises(SystemExit) as exc:
1783 run_claim(args)
1784 assert exc.value.code == 1
1785
1786 def test_claim_ttl_invalid_json_error(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
1787 repo = _make_repo(tmp_path)
1788 args = _claim_ns(claim_ttl=0, json_out=True)
1789 with _patch_repo(repo):
1790 with pytest.raises(SystemExit):
1791 run_claim(args)
1792 raw = capsys.readouterr().out.strip()
1793 data = json.loads(raw)
1794 assert "error" in data
1795
1796 def test_wait_over_max_exits_1(self, tmp_path: pathlib.Path) -> None:
1797 repo = _make_repo(tmp_path)
1798 args = _claim_ns(wait=_MAX_WAIT_SECONDS + 1, json_out=True)
1799 with _patch_repo(repo):
1800 with pytest.raises(SystemExit) as exc:
1801 run_claim(args)
1802 assert exc.value.code == 1
1803
1804 def test_wait_negative_exits_1(self, tmp_path: pathlib.Path) -> None:
1805 repo = _make_repo(tmp_path)
1806 args = _claim_ns(wait=-1, json_out=True)
1807 with _patch_repo(repo):
1808 with pytest.raises(SystemExit) as exc:
1809 run_claim(args)
1810 assert exc.value.code == 1
1811
1812 def test_invalid_queue_name_exits_1(self, tmp_path: pathlib.Path) -> None:
1813 repo = _make_repo(tmp_path)
1814 args = _claim_ns(queue="bad queue!", json_out=True)
1815 with _patch_repo(repo):
1816 with pytest.raises(SystemExit) as exc:
1817 run_claim(args)
1818 assert exc.value.code == 1
1819
1820 def test_invalid_queue_json_error(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
1821 repo = _make_repo(tmp_path)
1822 args = _claim_ns(queue="bad queue!", json_out=True)
1823 with _patch_repo(repo):
1824 with pytest.raises(SystemExit):
1825 run_claim(args)
1826 raw = capsys.readouterr().out.strip()
1827 data = json.loads(raw)
1828 assert data["status"] == "bad_queue"
1829
1830 def test_queue_with_slash_exits_1(self, tmp_path: pathlib.Path) -> None:
1831 repo = _make_repo(tmp_path)
1832 args = _claim_ns(queue="../../etc/passwd", json_out=True)
1833 with _patch_repo(repo):
1834 with pytest.raises(SystemExit) as exc:
1835 run_claim(args)
1836 assert exc.value.code == 1
1837
1838 def test_validation_fires_before_repo_lookup(self, tmp_path: pathlib.Path) -> None:
1839 """require_repo must never be called when --run-id is too long."""
1840 repo = _make_repo(tmp_path)
1841 calls: list[bool] = []
1842
1843 def _fake_require() -> pathlib.Path:
1844 calls.append(True)
1845 return repo
1846
1847 args = _claim_ns(run_id="x" * (_MAX_RUN_ID_LEN + 1))
1848 with patch("muse.cli.commands.task_queue.require_repo", side_effect=_fake_require):
1849 with pytest.raises(SystemExit):
1850 run_claim(args)
1851 assert calls == [], "require_repo called before validation"
1852
1853 def test_valid_queue_name_filters_correctly(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
1854 repo = _make_repo(tmp_path)
1855 create_task(repo, "billing task", queue="billing")
1856 args = _claim_ns(queue="billing", json_out=True)
1857 with _patch_repo(repo):
1858 run_claim(args)
1859 out = json.loads(capsys.readouterr().out)
1860 assert out["status"] == "claimed"
1861 assert out["task"]["queue"] == "billing"
1862
1863
class TestClaimJsonOutput:
    """Shape of ``run_claim`` JSON output: one line, expected keys, empty-queue form."""

    def test_success_json_is_compact(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """A successful claim prints a single line of valid JSON."""
        repo_root = _make_repo(tmp_path)
        create_task(repo_root, "Task")
        ns = _claim_ns(json_out=True)
        with _patch_repo(repo_root):
            run_claim(ns)
        line = capsys.readouterr().out.strip()
        assert "\n" not in line
        json.loads(line)

    def test_success_json_schema_keys(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """Every top-level key of the claim response is present."""
        expected_keys = (
            "schema", "status", "task_id", "claimer_run_id",
            "claimed_at", "expires_at", "task", "duration_ms",
        )
        repo_root = _make_repo(tmp_path)
        create_task(repo_root, "Task")
        ns = _claim_ns(json_out=True)
        with _patch_repo(repo_root):
            run_claim(ns)
        payload = json.loads(capsys.readouterr().out)
        for key in expected_keys:
            assert key in payload, f"missing key: {key}"

    def test_success_task_nested_schema(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """The nested ``task`` object carries the task's fields and values."""
        expected_keys = (
            "task_id", "title", "priority", "queue", "payload",
            "created_at", "created_by", "ttl_seconds", "tags",
        )
        repo_root = _make_repo(tmp_path)
        create_task(repo_root, "Nested task", priority=5, queue="billing")
        ns = _claim_ns(json_out=True)
        with _patch_repo(repo_root):
            run_claim(ns)
        nested = json.loads(capsys.readouterr().out)["task"]
        assert nested["title"] == "Nested task"
        assert nested["priority"] == 5
        assert nested["queue"] == "billing"
        for key in expected_keys:
            assert key in nested, f"task missing key: {key}"

    def test_empty_queue_json_is_compact(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """The empty-queue response is also a single line of valid JSON."""
        repo_root = _make_repo(tmp_path)
        ns = _claim_ns(json_out=True)
        with _patch_repo(repo_root), pytest.raises(SystemExit):
            run_claim(ns)
        line = capsys.readouterr().out.strip()
        assert "\n" not in line
        json.loads(line)

    def test_empty_queue_json_schema(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """The empty-queue response names the queue and includes timing and schema."""
        repo_root = _make_repo(tmp_path)
        ns = _claim_ns(json_out=True, queue="myqueue")
        with _patch_repo(repo_root), pytest.raises(SystemExit):
            run_claim(ns)
        payload = json.loads(capsys.readouterr().out)
        assert payload["status"] == "empty"
        assert payload["queue"] == "myqueue"
        assert "duration_ms" in payload
        assert "schema" in payload

    def test_duration_ms_is_float(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """``duration_ms`` decodes as a JSON float, not an integer."""
        repo_root = _make_repo(tmp_path)
        create_task(repo_root, "Task")
        ns = _claim_ns(json_out=True)
        with _patch_repo(repo_root):
            run_claim(ns)
        payload = json.loads(capsys.readouterr().out)
        assert isinstance(payload["duration_ms"], float)

    def test_claimer_run_id_matches_arg(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """``claimer_run_id`` echoes the run id supplied on the command line."""
        repo_root = _make_repo(tmp_path)
        create_task(repo_root, "Task")
        ns = _claim_ns(run_id="my-special-agent", json_out=True)
        with _patch_repo(repo_root):
            run_claim(ns)
        payload = json.loads(capsys.readouterr().out)
        assert payload["claimer_run_id"] == "my-special-agent"
1942
1943
class TestClaimTextOutput:
    """Human-readable ``run_claim`` output: expected content, no escape bytes."""

    def test_success_shows_task_claimed(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """A successful claim prints "Task claimed"."""
        repo_root = _make_repo(tmp_path)
        create_task(repo_root, "My important task")
        ns = _claim_ns(json_out=False)
        with _patch_repo(repo_root):
            run_claim(ns)
        printed = capsys.readouterr().out
        assert "Task claimed" in printed

    def test_success_shows_title(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """The claimed task's title appears in the output."""
        repo_root = _make_repo(tmp_path)
        create_task(repo_root, "Rename billing function")
        ns = _claim_ns(json_out=False)
        with _patch_repo(repo_root):
            run_claim(ns)
        printed = capsys.readouterr().out
        assert "Rename billing function" in printed

    def test_success_shows_expires_in(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """The output mentions when the claim expires (matched case-insensitively)."""
        repo_root = _make_repo(tmp_path)
        create_task(repo_root, "Task")
        ns = _claim_ns(json_out=False, claim_ttl=1800)
        with _patch_repo(repo_root):
            run_claim(ns)
        printed = capsys.readouterr().out
        # A case-insensitive match also covers the exact-case "Expires in".
        assert "expires in" in printed.lower()

    def test_success_shows_payload(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """A value from the task payload is rendered."""
        repo_root = _make_repo(tmp_path)
        create_task(repo_root, "Task", payload={"op": "rename"})
        ns = _claim_ns(json_out=False)
        with _patch_repo(repo_root):
            run_claim(ns)
        printed = capsys.readouterr().out
        assert "rename" in printed

    def test_success_shows_tags(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """A tag of the claimed task is rendered."""
        repo_root = _make_repo(tmp_path)
        create_task(repo_root, "Task", tags=["billing", "refactor"])
        ns = _claim_ns(json_out=False)
        with _patch_repo(repo_root):
            run_claim(ns)
        printed = capsys.readouterr().out
        assert "billing" in printed

    def test_ansi_in_run_id_stripped(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """Escape bytes in the run id never reach stdout."""
        repo_root = _make_repo(tmp_path)
        create_task(repo_root, "Task")
        ns = _claim_ns(run_id="\x1b[31mmalicious\x1b[0m", json_out=False)
        with _patch_repo(repo_root):
            run_claim(ns)
        printed = capsys.readouterr().out
        assert "\x1b" not in printed

    def test_ansi_in_title_stripped(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """Escape bytes in the task title never reach stdout."""
        repo_root = _make_repo(tmp_path)
        create_task(repo_root, "\x1b[31mRed title\x1b[0m")
        ns = _claim_ns(json_out=False)
        with _patch_repo(repo_root):
            run_claim(ns)
        printed = capsys.readouterr().out
        assert "\x1b" not in printed

    def test_empty_text_shows_empty(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """An empty named queue exits and prints "empty" plus the queue name."""
        repo_root = _make_repo(tmp_path)
        ns = _claim_ns(json_out=False, queue="billing")
        with _patch_repo(repo_root), pytest.raises(SystemExit):
            run_claim(ns)
        printed = capsys.readouterr().out
        assert "empty" in printed.lower()
        assert "billing" in printed
2019
2020
class TestClaimWait:
    """--wait polls until a task appears or the deadline expires."""

    def test_wait_zero_returns_immediately_on_empty(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """With ``wait=0`` an empty queue exits 1 in well under a second."""
        repo = _make_repo(tmp_path)
        args = _claim_ns(wait=0, json_out=True)
        start = time.monotonic()
        with _patch_repo(repo), pytest.raises(SystemExit) as exc:
            run_claim(args)
        elapsed = time.monotonic() - start
        assert exc.value.code == 1
        assert elapsed < 1.0

    def test_wait_times_out_on_empty_queue(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """With ``wait=1`` an empty queue is reported only after about one second."""
        repo = _make_repo(tmp_path)
        args = _claim_ns(wait=1, json_out=True)  # 1 s wait
        start = time.monotonic()
        with _patch_repo(repo), pytest.raises(SystemExit) as exc:
            run_claim(args)
        elapsed = time.monotonic() - start
        assert exc.value.code == 1
        assert elapsed >= 0.9, f"wait ended too early: {elapsed:.2f}s"
        out = json.loads(capsys.readouterr().out)
        assert out["status"] == "empty"

    def test_wait_finds_task_created_during_wait(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """Task created 0.3 s into a 3 s wait must be found."""
        repo = _make_repo(tmp_path)

        def _create_after_delay() -> None:
            time.sleep(0.3)
            create_task(repo, "Delayed task")

        t = threading.Thread(target=_create_after_delay, daemon=True)
        t.start()

        args = _claim_ns(wait=3, json_out=True)
        try:
            with _patch_repo(repo):
                run_claim(args)
        finally:
            # Join on the failure path too, so the producer never outlives the
            # test and writes into a repo another test may be inspecting.
            t.join()

        out = json.loads(capsys.readouterr().out)
        assert out["status"] == "claimed"
        assert out["task"]["title"] == "Delayed task"

    def test_wait_empty_json_includes_elapsed(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """After a timed-out 1 s wait, ``duration_ms`` reflects the time spent waiting."""
        repo = _make_repo(tmp_path)
        args = _claim_ns(wait=1, json_out=True)
        with _patch_repo(repo), pytest.raises(SystemExit):
            run_claim(args)
        out = json.loads(capsys.readouterr().out)
        # duration_ms is in milliseconds: a 1 s wait must report roughly 1000,
        # so the lower bound is 900 ms (the same 10 % slack as the wall-clock
        # check in test_wait_times_out_on_empty_queue), not 0.9.
        assert out["duration_ms"] >= 900.0, f"duration_ms too small: {out['duration_ms']}"

    def test_wait_text_mentions_wait_on_empty(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """Text output for a timed-out wait refers to the wait."""
        repo = _make_repo(tmp_path)
        args = _claim_ns(wait=1, json_out=False)
        with _patch_repo(repo), pytest.raises(SystemExit):
            run_claim(args)
        out = capsys.readouterr().out
        # Text output should mention the wait duration
        assert "after" in out.lower() or "1" in out
2086
2087
class TestClaimQueueFilter:
    """``--queue`` limits claiming to the named queue; omitting it considers all queues."""

    def test_queue_filter_claims_correct_queue(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """With two queues populated, only the requested queue's task is claimed."""
        repo_root = _make_repo(tmp_path)
        for title, queue_name in (("billing task", "billing"), ("auth task", "auth")):
            create_task(repo_root, title, queue=queue_name)
        ns = _claim_ns(queue="auth", json_out=True)
        with _patch_repo(repo_root):
            run_claim(ns)
        claimed = json.loads(capsys.readouterr().out)["task"]
        assert claimed["queue"] == "auth"
        assert claimed["title"] == "auth task"

    def test_queue_filter_empty_when_wrong_queue(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """Tasks in other queues do not satisfy a filtered claim."""
        repo_root = _make_repo(tmp_path)
        create_task(repo_root, "billing task", queue="billing")
        ns = _claim_ns(queue="auth", json_out=True)
        with _patch_repo(repo_root), pytest.raises(SystemExit) as caught:
            run_claim(ns)
        assert caught.value.code == 1
        payload = json.loads(capsys.readouterr().out)
        assert payload["status"] == "empty"

    def test_no_queue_filter_claims_highest_priority(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """Without a filter, the higher-priority task wins across queues."""
        repo_root = _make_repo(tmp_path)
        create_task(repo_root, "low priority", queue="q1", priority=0)
        create_task(repo_root, "high priority", queue="q2", priority=10)
        ns = _claim_ns(queue=None, json_out=True)
        with _patch_repo(repo_root):
            run_claim(ns)
        payload = json.loads(capsys.readouterr().out)
        assert payload["task"]["title"] == "high priority"

    def test_queue_filter_with_valid_chars(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """Underscores, hyphens and digits are accepted in a queue name."""
        queue_name = "my_queue-01"
        repo_root = _make_repo(tmp_path)
        create_task(repo_root, "task", queue=queue_name)
        ns = _claim_ns(queue=queue_name, json_out=True)
        with _patch_repo(repo_root):
            run_claim(ns)
        payload = json.loads(capsys.readouterr().out)
        assert payload["status"] == "claimed"
2131
2132
class TestClaimIntegration:
    """End-to-end flows through claim followed by complete or fail."""

    def test_claim_then_complete(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """A task claimed by a worker can be completed by the same worker."""
        repo_root = _make_repo(tmp_path)
        created = create_task(repo_root, "Complete me")

        claim_ns = _claim_ns(run_id="worker", json_out=True)
        with _patch_repo(repo_root):
            run_claim(claim_ns)
        claim_reply = json.loads(capsys.readouterr().out)
        assert claim_reply["status"] == "claimed"

        complete_ns = _namespace(
            task_id=created.task_id,
            run_id="worker",
            result='{"ok": true}',
            json_out=True,
        )
        with _patch_repo(repo_root):
            run_complete(complete_ns)
        complete_reply = json.loads(capsys.readouterr().out)
        assert complete_reply["status"] == "completed"

    def test_claim_then_fail(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """A task claimed by a worker can be failed by the same worker."""
        repo_root = _make_repo(tmp_path)
        created = create_task(repo_root, "Fail me")

        claim_ns = _claim_ns(run_id="worker", json_out=True)
        with _patch_repo(repo_root):
            run_claim(claim_ns)
        capsys.readouterr()  # discard the claim output

        fail_ns = _namespace(
            task_id=created.task_id,
            run_id="worker",
            error="network timeout",
            json_out=True,
        )
        with _patch_repo(repo_root):
            run_fail_task(fail_ns)
        fail_reply = json.loads(capsys.readouterr().out)
        assert fail_reply["status"] == "failed"

    def test_second_claim_empty_after_first(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """Once the only task is claimed, a second claim exits 1."""
        repo_root = _make_repo(tmp_path)
        create_task(repo_root, "Only task")
        ns = _claim_ns(json_out=True)
        with _patch_repo(repo_root):
            run_claim(ns)
        capsys.readouterr()  # discard the first claim's output
        with _patch_repo(repo_root), pytest.raises(SystemExit) as caught:
            run_claim(ns)
        assert caught.value.code == 1

    def test_priority_order_across_claims(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """Successive claims hand out tasks from highest to lowest priority."""
        repo_root = _make_repo(tmp_path)
        for title, prio in (("Low", 0), ("High", 9), ("Mid", 5)):
            create_task(repo_root, title, priority=prio)

        claimed_titles = []
        for _round in range(3):
            ns = _claim_ns(json_out=True)
            with _patch_repo(repo_root):
                run_claim(ns)
            reply = json.loads(capsys.readouterr().out)
            claimed_titles.append(reply["task"]["title"])

        assert claimed_titles == ["High", "Mid", "Low"]

    def test_enqueue_claim_complete_via_cli(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """Enqueue, claim and complete one task using only the CLI entry points."""
        repo_root = _make_repo(tmp_path)

        # Step 1: enqueue and remember the id that was handed out.
        enqueue_ns = _enqueue_ns(title="CLI round-trip", payload='{"x": 1}', json_out=True)
        with _patch_repo(repo_root):
            run_enqueue(enqueue_ns)
        new_task_id = json.loads(capsys.readouterr().out)["task_id"]

        # Step 2: claim it and check the payload survived.
        claim_ns = _claim_ns(run_id="cli-worker", json_out=True)
        with _patch_repo(repo_root):
            run_claim(claim_ns)
        claim_reply = json.loads(capsys.readouterr().out)
        assert claim_reply["status"] == "claimed"
        assert claim_reply["task"]["payload"]["x"] == 1

        # Step 3: complete it as the claiming worker.
        complete_ns = _namespace(
            task_id=new_task_id,
            run_id="cli-worker",
            result='{"done": true}',
            json_out=True,
        )
        with _patch_repo(repo_root):
            run_complete(complete_ns)
        complete_reply = json.loads(capsys.readouterr().out)
        assert complete_reply["status"] == "completed"
2221
2222
class TestClaimStress:
    """Claiming under thread contention and simple timing budgets."""

    def test_concurrent_claim_no_double_claim(self, tmp_path: pathlib.Path) -> None:
        """10 tasks + 20 competing agents — no task claimed twice."""
        repo_root = _make_repo(tmp_path)
        for idx in range(10):
            create_task(repo_root, f"Task {idx}")

        winners: list[str] = []
        winners_guard = threading.Lock()

        def _compete(agent_no: int) -> None:
            # Any exception from the claim attempt is swallowed: a failed
            # attempt simply records nothing.
            try:
                outcome = claim_next_task(repo_root, f"agent-{agent_no}")
                if outcome is None:
                    return
                claimed_task, _claim = outcome
                with winners_guard:
                    winners.append(claimed_task.task_id)
            except Exception:
                return

        workers = []
        for agent_no in range(20):
            workers.append(threading.Thread(target=_compete, args=(agent_no,)))
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join()

        assert len(set(winners)) == len(winners), "task claimed twice"
        assert len(winners) <= 10

    def test_claim_100_tasks_sequential(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """100 queued tasks are all claimed one by one within 15 seconds."""
        repo_root = _make_repo(tmp_path)
        for idx in range(100):
            create_task(repo_root, f"Task {idx}", queue="load")

        began = time.monotonic()
        claimed = 0
        for _attempt in range(100):
            ns = _claim_ns(queue="load", json_out=True)
            with _patch_repo(repo_root):
                try:
                    run_claim(ns)
                    claimed += 1
                    capsys.readouterr()
                except SystemExit:
                    # Stop at the first claim that exits.
                    break
        elapsed = time.monotonic() - began
        assert claimed == 100
        assert elapsed < 15.0, f"100 sequential claims took {elapsed:.2f}s"

    def test_wait_zero_performance(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """--wait 0 on an empty queue must return in < 0.5 s."""
        repo_root = _make_repo(tmp_path)
        for _attempt in range(5):
            ns = _claim_ns(wait=0, json_out=True)
            began = time.monotonic()
            with _patch_repo(repo_root), pytest.raises(SystemExit):
                run_claim(ns)
            elapsed = time.monotonic() - began
            assert elapsed < 0.5, f"wait=0 took {elapsed:.3f}s"
            capsys.readouterr()
2286
2287
class TestCliComplete:
    """``run_complete`` basics: success, stored result, wrong claimer, bad result."""

    def _setup(self, repo: pathlib.Path) -> TaskRecord:
        """Create one task, claim it as ``agent-1``, and return the created record."""
        created = create_task(repo, "Completable task")
        claim_next_task(repo, "agent-1")
        return created

    def test_complete_success_json(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """The claimer completing its task yields status ``completed``."""
        repo_root = _make_repo(tmp_path)
        claimed = self._setup(repo_root)
        ns = _namespace(task_id=claimed.task_id, run_id="agent-1", result="{}", json_out=True)
        with _patch_repo(repo_root):
            run_complete(ns)
        reply = json.loads(capsys.readouterr().out)
        assert reply["status"] == "completed"

    def test_complete_with_result(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """The supplied result object is returned in the ``result`` field."""
        repo_root = _make_repo(tmp_path)
        claimed = self._setup(repo_root)
        ns = _namespace(
            task_id=claimed.task_id,
            run_id="agent-1",
            result='{"pr": 99}',
            json_out=True,
        )
        with _patch_repo(repo_root):
            run_complete(ns)
        reply = json.loads(capsys.readouterr().out)
        assert reply["result"] == {"pr": 99}

    def test_complete_wrong_claimer_exits_1(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """A run id other than the claimer's is refused with exit code 1."""
        repo_root = _make_repo(tmp_path)
        claimed = self._setup(repo_root)
        ns = _namespace(task_id=claimed.task_id, run_id="impostor", result="{}", json_out=True)
        with _patch_repo(repo_root), pytest.raises(SystemExit) as caught:
            run_complete(ns)
        assert caught.value.code == 1

    def test_complete_invalid_result_exits_1(self, tmp_path: pathlib.Path) -> None:
        """A result string that is not JSON is refused with exit code 1."""
        repo_root = _make_repo(tmp_path)
        claimed = self._setup(repo_root)
        ns = _namespace(task_id=claimed.task_id, run_id="agent-1", result="NOTJSON", json_out=True)
        with _patch_repo(repo_root), pytest.raises(SystemExit) as caught:
            run_complete(ns)
        assert caught.value.code == 1

    def test_complete_text_output(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """Text mode mentions "completed" (case-insensitively)."""
        repo_root = _make_repo(tmp_path)
        claimed = self._setup(repo_root)
        ns = _namespace(task_id=claimed.task_id, run_id="agent-1", result="{}", json_out=False)
        with _patch_repo(repo_root):
            run_complete(ns)
        printed = capsys.readouterr().out.lower()
        assert "completed" in printed
2341
2342
2343 # ── complete hardening ────────────────────────────────────────────────────────
2344
2345 from muse.cli.commands.task_queue import _MAX_RESULT_BYTES
2346
2347
def _complete_ns(**kwargs: MsgpackValue) -> argparse.Namespace:
    """Return a complete ``Namespace``: stock defaults overlaid with *kwargs*.

    Any keyword given replaces the default of the same name; keywords that
    have no default are added to the namespace as-is.
    """
    merged = dict(
        json_out=True,
        run_id="agent-1",
        task_id=VALID_ID,
        result="{}",
    )
    for field, value in kwargs.items():
        merged[field] = value
    return argparse.Namespace(**merged)
2358
2359
class TestCompleteInputValidation:
    """``run_complete`` argument checks: bad input exits with ``ExitCode.USER_ERROR``."""

    def _setup(self, repo: pathlib.Path) -> TaskRecord:
        """Create one task, claim it as ``agent-1``, and return the created record."""
        t = create_task(repo, "Complete-me")
        claim_next_task(repo, "agent-1")
        return t

    @staticmethod
    def _exit_code(repo: pathlib.Path, args: argparse.Namespace) -> object:
        """Run ``run_complete`` against *repo*, require ``SystemExit``, return its code."""
        with _patch_repo(repo), pytest.raises(SystemExit) as exc:
            run_complete(args)
        return exc.value.code

    # ── run_id length ────────────────────────────────────────────────────────

    def test_run_id_too_long_exits_1(self, tmp_path: pathlib.Path) -> None:
        """A 257-character run id is rejected."""
        repo = _make_repo(tmp_path)
        t = self._setup(repo)
        args = _complete_ns(task_id=t.task_id, run_id="x" * 257)
        assert self._exit_code(repo, args) == ExitCode.USER_ERROR

    def test_run_id_at_max_length_passes(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """A 256-character run id passes validation and fails later as a wrong claimer."""
        repo = _make_repo(tmp_path)
        t = self._setup(repo)
        args = _complete_ns(task_id=t.task_id, run_id="a" * 256)
        # claim was made by "agent-1" — different run_id triggers PermissionError
        # but validation itself must pass (no bad_args exit before I/O)
        code = self._exit_code(repo, args)
        # exits 1 due to wrong claimer, NOT bad_args — validation passed
        assert code == 1
        out = json.loads(capsys.readouterr().out)
        assert out.get("status") != "bad_args"

    # ── result payload ───────────────────────────────────────────────────────

    def test_result_not_json_exits_1(self, tmp_path: pathlib.Path) -> None:
        """A result that does not parse as JSON is rejected."""
        repo = _make_repo(tmp_path)
        t = self._setup(repo)
        args = _complete_ns(task_id=t.task_id, result="not-json")
        assert self._exit_code(repo, args) == ExitCode.USER_ERROR

    def test_result_array_exits_1(self, tmp_path: pathlib.Path) -> None:
        """A JSON array result is rejected."""
        repo = _make_repo(tmp_path)
        t = self._setup(repo)
        args = _complete_ns(task_id=t.task_id, result="[1, 2, 3]")
        assert self._exit_code(repo, args) == ExitCode.USER_ERROR

    def test_result_scalar_exits_1(self, tmp_path: pathlib.Path) -> None:
        """A JSON number result is rejected."""
        repo = _make_repo(tmp_path)
        t = self._setup(repo)
        args = _complete_ns(task_id=t.task_id, result="42")
        assert self._exit_code(repo, args) == ExitCode.USER_ERROR

    def test_result_null_exits_1(self, tmp_path: pathlib.Path) -> None:
        """A JSON ``null`` result is rejected."""
        repo = _make_repo(tmp_path)
        t = self._setup(repo)
        args = _complete_ns(task_id=t.task_id, result="null")
        assert self._exit_code(repo, args) == ExitCode.USER_ERROR

    def test_result_too_large_exits_1(self, tmp_path: pathlib.Path) -> None:
        """A result whose value alone exceeds ``_MAX_RESULT_BYTES`` is rejected."""
        repo = _make_repo(tmp_path)
        t = self._setup(repo)
        big = json.dumps({"k": "v" * (_MAX_RESULT_BYTES + 1)})
        args = _complete_ns(task_id=t.task_id, result=big)
        assert self._exit_code(repo, args) == ExitCode.USER_ERROR

    def test_result_at_max_size_passes_validation(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """A result of exactly ``_MAX_RESULT_BYTES`` bytes is accepted."""
        repo = _make_repo(tmp_path)
        t = self._setup(repo)
        # Build compact JSON exactly at the limit using explicit separators
        prefix = '{"k":"'
        suffix = '"}'
        inner = "x" * (_MAX_RESULT_BYTES - len(prefix) - len(suffix))
        payload = prefix + inner + suffix
        assert len(payload.encode()) == _MAX_RESULT_BYTES
        args = _complete_ns(task_id=t.task_id, result=payload)
        with _patch_repo(repo):
            run_complete(args)
        out = json.loads(capsys.readouterr().out)
        assert out["status"] == "completed"

    # ── task_id format ───────────────────────────────────────────────────────

    def test_invalid_task_id_exits_1(self, tmp_path: pathlib.Path) -> None:
        """A task id that is not a content id is rejected."""
        repo = _make_repo(tmp_path)
        args = _complete_ns(task_id="not-a-content-id")
        assert self._exit_code(repo, args) == ExitCode.USER_ERROR

    def test_path_traversal_task_id_exits_1(self, tmp_path: pathlib.Path) -> None:
        """A path-traversal style task id is rejected."""
        repo = _make_repo(tmp_path)
        args = _complete_ns(task_id="../../etc/passwd")
        assert self._exit_code(repo, args) == ExitCode.USER_ERROR

    def test_null_byte_task_id_exits_1(self, tmp_path: pathlib.Path) -> None:
        """A task id containing a NUL byte is rejected."""
        repo = _make_repo(tmp_path)
        args = _complete_ns(task_id="12345678-1234-4abc-8abc-123456789\x00ab")
        assert self._exit_code(repo, args) == ExitCode.USER_ERROR

    def test_validation_fires_before_repo_lookup(self, tmp_path: pathlib.Path) -> None:
        """require_repo must NOT be called when task_id is invalid."""
        call_count = {"n": 0}
        original = __import__("muse.core.repo", fromlist=["require_repo"]).require_repo

        def counting_require_repo() -> pathlib.Path:
            # Count the call, then defer to the real lookup.
            call_count["n"] += 1
            return original()

        args = _complete_ns(task_id="BADUUID")
        with patch("muse.cli.commands.task_queue.require_repo", counting_require_repo):
            with pytest.raises(SystemExit):
                run_complete(args)
        assert call_count["n"] == 0, "require_repo was called before task_id validation"

    # ── error reporting ──────────────────────────────────────────────────────

    def test_json_error_shape_bad_task_id(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """A bad task id is reported in JSON with status ``bad_task_id``."""
        repo = _make_repo(tmp_path)
        args = _complete_ns(task_id="bad-id", json_out=True)
        self._exit_code(repo, args)
        out = json.loads(capsys.readouterr().out)
        assert "error" in out
        assert out["status"] == "bad_task_id"

    def test_json_error_shape_bad_result(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """A non-JSON result is reported in JSON with status ``bad_args``."""
        repo = _make_repo(tmp_path)
        args = _complete_ns(task_id=VALID_ID, result="NOTJSON", json_out=True)
        self._exit_code(repo, args)
        out = json.loads(capsys.readouterr().out)
        assert "error" in out
        assert out["status"] == "bad_args"

    def test_text_error_goes_to_stderr(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """In text mode the error marker goes to stderr and stdout stays empty."""
        repo = _make_repo(tmp_path)
        args = _complete_ns(task_id="BADUUID", json_out=False)
        self._exit_code(repo, args)
        captured = capsys.readouterr()
        assert captured.out == ""
        assert "❌" in captured.err

    def test_json_error_goes_to_stdout_not_stderr(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """In JSON mode the error object goes to stdout and stderr stays empty."""
        repo = _make_repo(tmp_path)
        args = _complete_ns(task_id="BADUUID", json_out=True)
        self._exit_code(repo, args)
        captured = capsys.readouterr()
        assert captured.err == ""
        out = json.loads(captured.out)
        assert "error" in out
2530
2531
class TestCompleteJsonOutput:
    """run_complete JSON output shape and compactness."""

    def _setup(self, repo: pathlib.Path) -> TaskRecord:
        """Create one task on the ``billing`` queue, then claim the next task as ``completer-1``."""
        task = create_task(repo, "JSON task", queue="billing")
        claim_next_task(repo, "completer-1")
        return task

    def _complete_stdout(
        self,
        tmp_path: pathlib.Path,
        capsys: pytest.CaptureFixture[str],
        **overrides: MsgpackValue,
    ) -> str:
        """Complete a freshly claimed task as ``completer-1`` and return the captured stdout.

        Extra keyword arguments are forwarded unchanged to ``_complete_ns``.
        """
        repo = _make_repo(tmp_path)
        task = self._setup(repo)
        ns = _complete_ns(task_id=task.task_id, run_id="completer-1", **overrides)
        with _patch_repo(repo):
            run_complete(ns)
        return capsys.readouterr().out

    def test_json_is_compact(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """The stripped stdout contains no newline, i.e. the JSON is a single line."""
        stdout = self._complete_stdout(tmp_path, capsys).strip()
        assert "\n" not in stdout, "JSON output must be single line (compact)"

    def test_json_has_required_keys(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """Every expected top-level key is present in the success payload."""
        payload = json.loads(self._complete_stdout(tmp_path, capsys))
        required = (
            "schema", "task_id", "claimer_run_id", "status",
            "claimed_at", "expires_at", "result", "duration_ms",
        )
        for key in required:
            assert key in payload, f"missing key: {key}"

    def test_json_status_is_completed(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """The success payload reports status ``completed``."""
        payload = json.loads(self._complete_stdout(tmp_path, capsys))
        assert payload["status"] == "completed"

    def test_json_result_field_populated(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """A JSON object passed as ``result`` is echoed back, parsed, in the payload."""
        stdout = self._complete_stdout(
            tmp_path, capsys, result='{"pr_url": "http://x/1"}'
        )
        payload = json.loads(stdout)
        assert payload["result"] == {"pr_url": "http://x/1"}

    def test_json_empty_result_is_null(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """Empty dict {} collapses to null in the claim (falsy check)."""
        payload = json.loads(self._complete_stdout(tmp_path, capsys, result="{}"))
        assert payload["result"] is None

    def test_json_elapsed_is_float(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """``duration_ms`` decodes to a Python ``float``."""
        payload = json.loads(self._complete_stdout(tmp_path, capsys))
        assert isinstance(payload["duration_ms"], float)

    def test_json_claimer_run_id_matches(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """``claimer_run_id`` in the payload is the run id that completed the task."""
        payload = json.loads(self._complete_stdout(tmp_path, capsys))
        assert payload["claimer_run_id"] == "completer-1"

    def test_json_wrong_claimer_error_shape(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """Completing with a run id that does not hold the claim exits 1 with an ``error`` payload."""
        repo = _make_repo(tmp_path)
        task = self._setup(repo)
        ns = _complete_ns(task_id=task.task_id, run_id="impostor", json_out=True)
        with _patch_repo(repo), pytest.raises(SystemExit) as caught:
            run_complete(ns)
        assert caught.value.code == 1
        payload = json.loads(capsys.readouterr().out)
        assert "error" in payload

    def test_json_missing_task_error_shape(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """Completing a well-formed id with no task behind it exits 1 with an ``error`` payload."""
        repo = _make_repo(tmp_path)
        # The repo exists, but nothing has been enqueued in it.
        ns = _complete_ns(task_id=VALID_ID, run_id="completer-1", json_out=True)
        with _patch_repo(repo), pytest.raises(SystemExit) as caught:
            run_complete(ns)
        assert caught.value.code == 1
        payload = json.loads(capsys.readouterr().out)
        assert "error" in payload
2628
2629
class TestCompleteTextOutput:
    """run_complete text output content."""

    def _setup(self, repo: pathlib.Path, title: str = "My Task", queue: str = "default") -> TaskRecord:
        """Create a task with the given title/queue, then claim the next task as ``agent-1``."""
        task = create_task(repo, title, queue=queue)
        claim_next_task(repo, "agent-1")
        return task

    def _complete_text(
        self,
        repo: pathlib.Path,
        capsys: pytest.CaptureFixture[str],
        task_id: str,
        run_id: str = "agent-1",
        **overrides: MsgpackValue,
    ) -> str:
        """Run ``run_complete`` in text mode against *repo* and return the captured stdout."""
        ns = _complete_ns(task_id=task_id, run_id=run_id, json_out=False, **overrides)
        with _patch_repo(repo):
            run_complete(ns)
        return capsys.readouterr().out

    def test_text_shows_completed(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """The word "completed" appears (case-insensitively) in the text output."""
        repo = _make_repo(tmp_path)
        task = self._setup(repo)
        text = self._complete_text(repo, capsys, task.task_id)
        assert "completed" in text.lower()

    def test_text_shows_task_title(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """The task title is printed."""
        repo = _make_repo(tmp_path)
        task = self._setup(repo, title="Rename billing module")
        text = self._complete_text(repo, capsys, task.task_id)
        assert "Rename billing module" in text

    def test_text_shows_queue(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """The queue name is printed."""
        repo = _make_repo(tmp_path)
        task = self._setup(repo, queue="billing")
        text = self._complete_text(repo, capsys, task.task_id)
        assert "billing" in text

    def test_text_shows_claimer(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """The claimer's run id is printed."""
        repo = _make_repo(tmp_path)
        task = self._setup(repo)
        text = self._complete_text(repo, capsys, task.task_id)
        assert "agent-1" in text

    def test_text_shows_result_when_provided(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """A supplied result value shows up in the text output."""
        repo = _make_repo(tmp_path)
        task = self._setup(repo)
        text = self._complete_text(repo, capsys, task.task_id, result='{"pr": 42}')
        assert "42" in text

    def test_text_shows_elapsed(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """An elapsed-time suffix ending in ``s)`` is printed."""
        repo = _make_repo(tmp_path)
        task = self._setup(repo)
        text = self._complete_text(repo, capsys, task.task_id)
        assert "s)" in text  # e.g. "(0.003s)"

    def test_ansi_injection_in_run_id_stripped(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """Escape bytes embedded in the run id never reach stdout."""
        hostile_run_id = "agent\x1b[31mRED\x1b[0m"
        repo = _make_repo(tmp_path)
        task = create_task(repo, "task")
        claim_next_task(repo, hostile_run_id)
        text = self._complete_text(repo, capsys, task.task_id, run_id=hostile_run_id)
        assert "\x1b" not in text

    def test_ansi_injection_in_title_stripped(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """Escape bytes embedded in the task title never reach stdout."""
        hostile_title = "task\x1b[1mBOLD\x1b[0m"
        repo = _make_repo(tmp_path)
        task = create_task(repo, hostile_title)
        claim_next_task(repo, "agent-1")
        text = self._complete_text(repo, capsys, task.task_id)
        assert "\x1b" not in text
2714
2715
class TestCompleteIntegration:
    """Full lifecycle integration tests for run_complete."""

    def test_completed_status_persisted_to_disk(self, tmp_path: pathlib.Path) -> None:
        """After ``run_complete`` the reloaded claim has status ``completed``."""
        repo = _make_repo(tmp_path)
        task = create_task(repo, "persist-me")
        claim_next_task(repo, "worker")
        ns = _complete_ns(task_id=task.task_id, run_id="worker")
        with _patch_repo(repo):
            run_complete(ns)
        stored = load_claim(repo, task.task_id)
        assert stored is not None
        assert stored.status == "completed"

    def test_result_persisted_to_disk(self, tmp_path: pathlib.Path) -> None:
        """The parsed ``result`` object is readable from the reloaded claim."""
        repo = _make_repo(tmp_path)
        task = create_task(repo, "result-me")
        claim_next_task(repo, "worker")
        ns = _complete_ns(
            task_id=task.task_id, run_id="worker", result='{"sha": "abc123"}'
        )
        with _patch_repo(repo):
            run_complete(ns)
        stored = load_claim(repo, task.task_id)
        assert stored is not None
        assert stored.result == {"sha": "abc123"}

    def test_double_complete_fails(self, tmp_path: pathlib.Path) -> None:
        """A second ``run_complete`` with the same arguments exits 1."""
        repo = _make_repo(tmp_path)
        task = create_task(repo, "once only")
        claim_next_task(repo, "worker")
        ns = _complete_ns(task_id=task.task_id, run_id="worker")
        with _patch_repo(repo):
            run_complete(ns)
        with _patch_repo(repo), pytest.raises(SystemExit) as caught:
            run_complete(ns)
        assert caught.value.code == 1

    def test_complete_missing_claim_fails(self, tmp_path: pathlib.Path) -> None:
        """Task exists but was never claimed."""
        repo = _make_repo(tmp_path)
        task = create_task(repo, "unclaimed")
        ns = _complete_ns(task_id=task.task_id, run_id="worker")
        with _patch_repo(repo), pytest.raises(SystemExit) as caught:
            run_complete(ns)
        assert caught.value.code == 1

    def test_enqueue_claim_complete_full_cycle(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """Full CLI round-trip: enqueue → claim → complete."""
        repo = _make_repo(tmp_path)

        # Step 1: enqueue, and read the new task id from the printed JSON.
        enqueue_ns = _enqueue_ns(title="e2e task", queue="default")
        with _patch_repo(repo):
            run_enqueue(enqueue_ns)
        new_task_id = json.loads(capsys.readouterr().out)["task_id"]

        # Step 2: claim; its output is drained so it cannot leak into step 3.
        claim_ns = _claim_ns(run_id="e2e-agent")
        with _patch_repo(repo):
            run_claim(claim_ns)
        capsys.readouterr()

        # Step 3: complete and inspect the final payload.
        complete_ns = _complete_ns(
            task_id=new_task_id, run_id="e2e-agent", result='{"done": true}'
        )
        with _patch_repo(repo):
            run_complete(complete_ns)
        payload = json.loads(capsys.readouterr().out)
        assert payload["status"] == "completed"
        assert payload["result"] == {"done": True}

    def test_complete_with_unicode_result(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """Non-ASCII text in the result survives the round-trip through the JSON output."""
        repo = _make_repo(tmp_path)
        task = create_task(repo, "unicode")
        claim_next_task(repo, "agent-1")
        ns = _complete_ns(
            task_id=task.task_id, run_id="agent-1", result='{"msg": "héllo wörld 🎉"}'
        )
        with _patch_repo(repo):
            run_complete(ns)
        payload = json.loads(capsys.readouterr().out)
        assert payload["result"]["msg"] == "héllo wörld 🎉"
2796
2797
class TestCompleteStress:
    """Concurrency and throughput stress tests for run_complete."""

    def test_20_agents_each_claim_and_complete_unique_task(self, tmp_path: pathlib.Path) -> None:
        """20 concurrent agents each claim+complete a unique task with no conflicts."""
        import concurrent.futures

        repo = _make_repo(tmp_path)
        tasks = [create_task(repo, f"task-{i}") for i in range(20)]

        completed: set[str] = set()
        lock = threading.Lock()
        errors: list[str] = []

        def claim_and_complete(task: "TaskRecord") -> None:
            # The run id is derived from the seed task, but the agent completes
            # whichever task claim_next_task actually hands it.
            run_id = f"agent-{task.task_id[:8]}"
            result = claim_next_task(repo, run_id, queue=task.queue)
            if result is None:
                errors.append(f"no task for {run_id}")
                return
            claimed_task, _ = result
            try:
                complete_task(repo, claimed_task.task_id, run_id)
            except Exception as exc:  # noqa: BLE001
                errors.append(str(exc))
                return
            with lock:
                completed.add(claimed_task.task_id)

        with concurrent.futures.ThreadPoolExecutor(max_workers=20) as pool:
            # list() drains the iterator so an exception raised in a worker surfaces here.
            list(pool.map(claim_and_complete, tasks))

        assert not errors, f"errors: {errors}"
        assert len(completed) == 20, f"only {len(completed)}/20 completed"

    def test_100_sequential_completes_under_10s(self, tmp_path: pathlib.Path) -> None:
        """100 back-to-back ``complete_task`` calls spend under 10 s in total.

        Only the time inside ``complete_task`` is counted; task creation and
        claiming are excluded.  The budget is applied to the sum rather than to
        each call so that a single scheduler or disk stall on a busy machine
        does not fail the test.
        """
        repo = _make_repo(tmp_path)
        total = 0.0
        slowest = 0.0
        for i in range(100):
            t = create_task(repo, f"seq-{i}")
            claim_next_task(repo, "batch-worker")
            start = time.monotonic()
            complete_task(repo, t.task_id, "batch-worker")
            took = time.monotonic() - start
            total += took
            slowest = max(slowest, took)
        assert total < 10.0, (
            f"100 completes took {total:.2f}s (slowest single call {slowest:.3f}s)"
        )

    def test_complete_via_run_complete_100_sequential(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """100 create → claim → ``run_complete`` rounds finish in under 15 s overall."""
        repo = _make_repo(tmp_path)
        start = time.monotonic()
        for i in range(100):
            t = create_task(repo, f"cli-{i}")
            claim_next_task(repo, "cli-worker")
            args = _complete_ns(task_id=t.task_id, run_id="cli-worker")
            with _patch_repo(repo):
                run_complete(args)
            # Drain captured output every round so the buffer does not grow.
            capsys.readouterr()
        elapsed = time.monotonic() - start
        assert elapsed < 15.0, f"100 CLI completes took {elapsed:.1f}s"
2853
2854
class TestCliFailTask:
    """run_fail_task: success, wrong claimer, text/json output."""

    def _setup(self, repo: pathlib.Path) -> TaskRecord:
        """Create one task, then claim the next task as ``agent-1``."""
        task = create_task(repo, "Failing task")
        claim_next_task(repo, "agent-1")
        return task

    def test_fail_success_json(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """The claimer failing its own task prints status ``failed`` and echoes the error."""
        repo = _make_repo(tmp_path)
        task = self._setup(repo)
        ns = _namespace(
            task_id=task.task_id, run_id="agent-1", error="network down", json_out=True
        )
        with _patch_repo(repo):
            run_fail_task(ns)
        payload = json.loads(capsys.readouterr().out)
        assert payload["status"] == "failed"
        assert payload["error"] == "network down"

    def test_fail_wrong_claimer_exits_1(self, tmp_path: pathlib.Path) -> None:
        """A run id that does not hold the claim exits with code 1."""
        repo = _make_repo(tmp_path)
        task = self._setup(repo)
        ns = _namespace(task_id=task.task_id, run_id="wrong", error="x", json_out=True)
        with _patch_repo(repo), pytest.raises(SystemExit) as caught:
            run_fail_task(ns)
        assert caught.value.code == 1

    def test_fail_text_output(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """Text mode mentions "failed" (case-insensitively) on stdout."""
        repo = _make_repo(tmp_path)
        task = self._setup(repo)
        ns = _namespace(
            task_id=task.task_id, run_id="agent-1", error="boom", json_out=False
        )
        with _patch_repo(repo):
            run_fail_task(ns)
        text = capsys.readouterr().out
        assert "failed" in text.lower()
2890
2891
2892 # ── fail-task hardening ───────────────────────────────────────────────────────
2893
2894 from muse.cli.commands.task_queue import _MAX_ERROR_LEN
2895
2896
def _fail_ns(**kwargs: MsgpackValue) -> argparse.Namespace:
    """Return a Namespace carrying fail-task defaults, with *kwargs* taking precedence."""
    merged = {
        "json_out": True,
        "run_id": "agent-1",
        "task_id": VALID_ID,
        "error": "something went wrong",
        # Unpacked last so caller-supplied values override the defaults above.
        **kwargs,
    }
    return argparse.Namespace(**merged)
2907
2908
class TestFailTaskInputValidation:
    """All fail-task validation fires before require_repo() and exits 1."""

    def _setup(self, repo: pathlib.Path) -> TaskRecord:
        """Create one task, then claim the next task as ``agent-1``."""
        t = create_task(repo, "Failable task")
        claim_next_task(repo, "agent-1")
        return t

    def _exit_code(self, repo: pathlib.Path, args: argparse.Namespace) -> object:
        """Run ``run_fail_task`` expecting ``SystemExit`` and return its exit code."""
        with _patch_repo(repo), pytest.raises(SystemExit) as exc:
            run_fail_task(args)
        return exc.value.code

    def _json_error(
        self,
        tmp_path: pathlib.Path,
        capsys: pytest.CaptureFixture[str],
        args: argparse.Namespace,
    ) -> dict:
        """Run ``run_fail_task`` expecting ``SystemExit`` and return the JSON printed on stdout."""
        repo = _make_repo(tmp_path)
        with _patch_repo(repo), pytest.raises(SystemExit):
            run_fail_task(args)
        return json.loads(capsys.readouterr().out)

    def _repo_lookups_during(self, args: argparse.Namespace) -> int:
        """Return how many times ``require_repo`` is called while *args* is being rejected.

        ``require_repo`` is replaced by a stub that counts calls and then raises
        ``RuntimeError``; the command itself must still terminate with
        ``SystemExit``.
        """
        call_count = {"n": 0}

        def counting_require_repo() -> pathlib.Path:
            call_count["n"] += 1
            raise RuntimeError("should not reach here")

        with patch("muse.cli.commands.task_queue.require_repo", counting_require_repo):
            with pytest.raises(SystemExit):
                run_fail_task(args)
        return call_count["n"]

    def test_run_id_too_long_exits_1(self, tmp_path: pathlib.Path) -> None:
        """A 257-character run id is rejected with ``ExitCode.USER_ERROR``."""
        repo = _make_repo(tmp_path)
        t = self._setup(repo)
        args = _fail_ns(task_id=t.task_id, run_id="x" * 257)
        assert self._exit_code(repo, args) == ExitCode.USER_ERROR

    def test_run_id_at_max_length_passes(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """A 256-character run id gets past length validation and fails later as a wrong claimer."""
        repo = _make_repo(tmp_path)
        t = self._setup(repo)
        # 256-char run_id is valid length; mismatch with claimer triggers PermissionError
        args = _fail_ns(task_id=t.task_id, run_id="b" * 256)
        with _patch_repo(repo), pytest.raises(SystemExit) as exc:
            run_fail_task(args)
        # Same exit code the dedicated wrong-claimer tests expect.
        assert exc.value.code == 1
        out = json.loads(capsys.readouterr().out)
        # Exits 1 due to wrong claimer, NOT bad_args — length validation passed
        assert out.get("status") != "bad_args"

    def test_error_too_long_exits_1(self, tmp_path: pathlib.Path) -> None:
        """An error message one character over ``_MAX_ERROR_LEN`` is rejected."""
        repo = _make_repo(tmp_path)
        t = self._setup(repo)
        args = _fail_ns(task_id=t.task_id, error="e" * (_MAX_ERROR_LEN + 1))
        assert self._exit_code(repo, args) == ExitCode.USER_ERROR

    def test_error_at_max_length_passes(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """An error message of exactly ``_MAX_ERROR_LEN`` characters is accepted."""
        repo = _make_repo(tmp_path)
        t = self._setup(repo)
        args = _fail_ns(task_id=t.task_id, run_id="agent-1",
                        error="e" * _MAX_ERROR_LEN)
        with _patch_repo(repo):
            run_fail_task(args)
        out = json.loads(capsys.readouterr().out)
        assert out["status"] == "failed"

    def test_empty_error_allowed(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """An empty error message is accepted and the task is marked failed."""
        repo = _make_repo(tmp_path)
        t = self._setup(repo)
        args = _fail_ns(task_id=t.task_id, run_id="agent-1", error="")
        with _patch_repo(repo):
            run_fail_task(args)
        out = json.loads(capsys.readouterr().out)
        assert out["status"] == "failed"

    def test_invalid_task_id_exits_1(self, tmp_path: pathlib.Path) -> None:
        """A task id that is not a content id is rejected."""
        repo = _make_repo(tmp_path)
        args = _fail_ns(task_id="not-a-content-id")
        assert self._exit_code(repo, args) == ExitCode.USER_ERROR

    def test_path_traversal_task_id_exits_1(self, tmp_path: pathlib.Path) -> None:
        """A task id shaped like a relative path is rejected."""
        repo = _make_repo(tmp_path)
        args = _fail_ns(task_id="../../etc/passwd")
        assert self._exit_code(repo, args) == ExitCode.USER_ERROR

    def test_null_byte_task_id_exits_1(self, tmp_path: pathlib.Path) -> None:
        """A task id containing a NUL byte is rejected."""
        repo = _make_repo(tmp_path)
        args = _fail_ns(task_id="12345678-1234-4abc-8abc-123456789\x00ab")
        assert self._exit_code(repo, args) == ExitCode.USER_ERROR

    def test_validation_fires_before_repo_lookup(self, tmp_path: pathlib.Path) -> None:
        """require_repo must NOT be called when task_id is invalid."""
        args = _fail_ns(task_id="BADUUID")
        assert self._repo_lookups_during(args) == 0, (
            "require_repo called before task_id validation"
        )

    def test_run_id_validation_fires_before_repo_lookup(self, tmp_path: pathlib.Path) -> None:
        """require_repo must NOT be called when run_id is too long."""
        args = _fail_ns(task_id=VALID_ID, run_id="r" * 300)
        assert self._repo_lookups_during(args) == 0

    def test_error_validation_fires_before_repo_lookup(self, tmp_path: pathlib.Path) -> None:
        """require_repo must NOT be called when the error message is too long."""
        # Derived from the real limit so the input stays oversized if the limit changes.
        args = _fail_ns(task_id=VALID_ID, error="e" * (_MAX_ERROR_LEN + 1))
        assert self._repo_lookups_during(args) == 0

    def test_json_error_shape_bad_task_id(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """A malformed task id prints an ``error`` payload with status ``bad_task_id``."""
        out = self._json_error(tmp_path, capsys, _fail_ns(task_id="bad-id", json_out=True))
        assert "error" in out
        assert out["status"] == "bad_task_id"

    def test_json_error_shape_bad_run_id(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """An over-long run id prints an ``error`` payload with status ``bad_args``."""
        args = _fail_ns(task_id=VALID_ID, run_id="x" * 300, json_out=True)
        out = self._json_error(tmp_path, capsys, args)
        assert "error" in out
        assert out["status"] == "bad_args"

    def test_json_error_shape_bad_error_msg(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """An over-long error message prints an ``error`` payload with status ``bad_args``."""
        # Derived from the real limit so the input stays oversized if the limit changes.
        args = _fail_ns(task_id=VALID_ID, error="e" * (_MAX_ERROR_LEN + 1), json_out=True)
        out = self._json_error(tmp_path, capsys, args)
        assert "error" in out
        assert out["status"] == "bad_args"

    def test_text_error_goes_to_stderr(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """In text mode a validation error leaves stdout empty and writes a ❌ line to stderr."""
        repo = _make_repo(tmp_path)
        args = _fail_ns(task_id="BADUUID", json_out=False)
        with _patch_repo(repo), pytest.raises(SystemExit):
            run_fail_task(args)
        captured = capsys.readouterr()
        assert captured.out == ""
        assert "❌" in captured.err

    def test_json_error_goes_to_stdout_not_stderr(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """In JSON mode a validation error is printed on stdout and stderr stays empty."""
        repo = _make_repo(tmp_path)
        args = _fail_ns(task_id="BADUUID", json_out=True)
        with _patch_repo(repo), pytest.raises(SystemExit):
            run_fail_task(args)
        captured = capsys.readouterr()
        assert captured.err == ""
        out = json.loads(captured.out)
        assert "error" in out
3080
3081
class TestFailTaskJsonOutput:
    """run_fail_task JSON output shape and compactness."""

    def _setup(self, repo: pathlib.Path, queue: str = "default") -> TaskRecord:
        """Create one task on *queue*, then claim the next task as ``failer-1``."""
        task = create_task(repo, "JSON fail task", queue=queue)
        claim_next_task(repo, "failer-1")
        return task

    def _fail_stdout(
        self,
        tmp_path: pathlib.Path,
        capsys: pytest.CaptureFixture[str],
        **overrides: MsgpackValue,
    ) -> str:
        """Fail a freshly claimed task as ``failer-1`` and return the captured stdout.

        Extra keyword arguments are forwarded unchanged to ``_fail_ns``.
        """
        repo = _make_repo(tmp_path)
        task = self._setup(repo)
        ns = _fail_ns(task_id=task.task_id, run_id="failer-1", **overrides)
        with _patch_repo(repo):
            run_fail_task(ns)
        return capsys.readouterr().out

    def test_json_is_compact(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """The stripped stdout contains no newline, i.e. the JSON is a single line."""
        stdout = self._fail_stdout(tmp_path, capsys).strip()
        assert "\n" not in stdout, "JSON output must be single line (compact)"

    def test_json_has_required_keys(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """Every expected top-level key is present in the failure payload."""
        payload = json.loads(self._fail_stdout(tmp_path, capsys))
        required = (
            "schema", "task_id", "claimer_run_id", "status",
            "claimed_at", "expires_at", "error", "duration_ms",
        )
        for key in required:
            assert key in payload, f"missing key: {key}"

    def test_json_status_is_failed(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """The payload reports status ``failed``."""
        payload = json.loads(self._fail_stdout(tmp_path, capsys))
        assert payload["status"] == "failed"

    def test_json_error_field_populated(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """The supplied error message is echoed verbatim in ``error``."""
        stdout = self._fail_stdout(
            tmp_path, capsys, error="connection refused on port 5432"
        )
        payload = json.loads(stdout)
        assert payload["error"] == "connection refused on port 5432"

    def test_json_empty_error_is_empty_string(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """An empty error message is reported as ``""``."""
        payload = json.loads(self._fail_stdout(tmp_path, capsys, error=""))
        assert payload["error"] == ""

    def test_json_elapsed_is_float(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """``duration_ms`` decodes to a Python ``float``."""
        payload = json.loads(self._fail_stdout(tmp_path, capsys))
        assert isinstance(payload["duration_ms"], float)

    def test_json_claimer_run_id_matches(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """``claimer_run_id`` in the payload is the run id that failed the task."""
        payload = json.loads(self._fail_stdout(tmp_path, capsys))
        assert payload["claimer_run_id"] == "failer-1"

    def test_json_wrong_claimer_error_shape(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """Failing with a run id that does not hold the claim exits 1 with an ``error`` payload."""
        repo = _make_repo(tmp_path)
        task = self._setup(repo)
        ns = _fail_ns(task_id=task.task_id, run_id="impostor", json_out=True)
        with _patch_repo(repo), pytest.raises(SystemExit) as caught:
            run_fail_task(ns)
        assert caught.value.code == 1
        payload = json.loads(capsys.readouterr().out)
        assert "error" in payload

    def test_json_missing_task_error_shape(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """Failing a well-formed id with no task behind it exits 1 with an ``error`` payload."""
        repo = _make_repo(tmp_path)
        ns = _fail_ns(task_id=VALID_ID, run_id="failer-1", json_out=True)
        with _patch_repo(repo), pytest.raises(SystemExit) as caught:
            run_fail_task(ns)
        assert caught.value.code == 1
        payload = json.loads(capsys.readouterr().out)
        assert "error" in payload
3176
3177
class TestFailTaskTextOutput:
    """run_fail_task text output content."""

    def _setup(self, repo: pathlib.Path, title: str = "My Task", queue: str = "default") -> TaskRecord:
        """Create a task with the given title/queue, then claim the next task as ``agent-1``."""
        task = create_task(repo, title, queue=queue)
        claim_next_task(repo, "agent-1")
        return task

    def _fail_text(
        self,
        repo: pathlib.Path,
        capsys: pytest.CaptureFixture[str],
        task_id: str,
        **overrides: MsgpackValue,
    ) -> str:
        """Run ``run_fail_task`` in text mode as ``agent-1`` and return the captured stdout."""
        ns = _fail_ns(task_id=task_id, run_id="agent-1", json_out=False, **overrides)
        with _patch_repo(repo):
            run_fail_task(ns)
        return capsys.readouterr().out

    def test_text_shows_failed(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """The word "failed" appears (case-insensitively) in the text output."""
        repo = _make_repo(tmp_path)
        task = self._setup(repo)
        text = self._fail_text(repo, capsys, task.task_id)
        assert "failed" in text.lower()

    def test_text_shows_task_title(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """The task title is printed."""
        repo = _make_repo(tmp_path)
        task = self._setup(repo, title="Deploy billing service")
        text = self._fail_text(repo, capsys, task.task_id)
        assert "Deploy billing service" in text

    def test_text_shows_queue(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """The queue name is printed."""
        repo = _make_repo(tmp_path)
        task = self._setup(repo, queue="infra")
        text = self._fail_text(repo, capsys, task.task_id)
        assert "infra" in text

    def test_text_shows_claimer(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """The claimer's run id is printed."""
        repo = _make_repo(tmp_path)
        task = self._setup(repo)
        text = self._fail_text(repo, capsys, task.task_id)
        assert "agent-1" in text

    def test_text_shows_error_message(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """The supplied error message is printed."""
        repo = _make_repo(tmp_path)
        task = self._setup(repo)
        text = self._fail_text(repo, capsys, task.task_id, error="disk full on /data")
        assert "disk full on /data" in text

    def test_text_no_error_line_when_empty(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """With an empty error message no ``Error:`` label is printed."""
        repo = _make_repo(tmp_path)
        task = self._setup(repo)
        text = self._fail_text(repo, capsys, task.task_id, error="")
        assert "Error:" not in text

    def test_text_shows_elapsed(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """An elapsed-time suffix ending in ``s)`` is printed."""
        repo = _make_repo(tmp_path)
        task = self._setup(repo)
        text = self._fail_text(repo, capsys, task.task_id)
        assert "s)" in text

    def test_ansi_injection_in_error_stripped(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """Escape bytes embedded in the error message never reach stdout."""
        repo = _make_repo(tmp_path)
        task = self._setup(repo)
        hostile_error = "error\x1b[31mRED\x1b[0m"
        text = self._fail_text(repo, capsys, task.task_id, error=hostile_error)
        assert "\x1b" not in text

    def test_ansi_injection_in_title_stripped(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """Escape bytes embedded in the task title never reach stdout."""
        hostile_title = "task\x1b[1mBOLD\x1b[0m"
        repo = _make_repo(tmp_path)
        task = create_task(repo, hostile_title)
        claim_next_task(repo, "agent-1")
        text = self._fail_text(repo, capsys, task.task_id)
        assert "\x1b" not in text
3271
3272
class TestFailTaskIntegration:
    """Full lifecycle integration tests for run_fail_task."""

    def test_failed_status_persisted_to_disk(self, tmp_path: pathlib.Path) -> None:
        """After ``run_fail_task`` the reloaded claim has status ``failed`` and the given error."""
        repo = _make_repo(tmp_path)
        task = create_task(repo, "persist-fail")
        claim_next_task(repo, "worker")
        ns = _fail_ns(task_id=task.task_id, run_id="worker", error="timeout")
        with _patch_repo(repo):
            run_fail_task(ns)
        stored = load_claim(repo, task.task_id)
        assert stored is not None
        assert stored.status == "failed"
        assert stored.error == "timeout"

    def test_error_persisted_to_disk(self, tmp_path: pathlib.Path) -> None:
        """A longer error message is readable from the reloaded claim."""
        repo = _make_repo(tmp_path)
        task = create_task(repo, "error-persist")
        claim_next_task(repo, "worker")
        ns = _fail_ns(
            task_id=task.task_id,
            run_id="worker",
            error="OOM at step 3: allocated 16 GiB",
        )
        with _patch_repo(repo):
            run_fail_task(ns)
        stored = load_claim(repo, task.task_id)
        assert stored is not None
        assert "OOM at step 3" in stored.error

    def test_double_fail_exits_1(self, tmp_path: pathlib.Path) -> None:
        """A second ``run_fail_task`` with the same arguments exits 1."""
        repo = _make_repo(tmp_path)
        task = create_task(repo, "double-fail")
        claim_next_task(repo, "worker")
        ns = _fail_ns(task_id=task.task_id, run_id="worker")
        with _patch_repo(repo):
            run_fail_task(ns)
        with _patch_repo(repo), pytest.raises(SystemExit) as caught:
            run_fail_task(ns)
        assert caught.value.code == 1

    def test_fail_unclaimed_task_exits_1(self, tmp_path: pathlib.Path) -> None:
        """Failing a task that was never claimed exits 1."""
        repo = _make_repo(tmp_path)
        task = create_task(repo, "unclaimed")
        ns = _fail_ns(task_id=task.task_id, run_id="worker")
        with _patch_repo(repo), pytest.raises(SystemExit) as caught:
            run_fail_task(ns)
        assert caught.value.code == 1

    def test_enqueue_claim_fail_full_cycle(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """Full CLI round-trip: enqueue → claim → fail."""
        repo = _make_repo(tmp_path)

        # Step 1: enqueue, and read the new task id from the printed JSON.
        enqueue_ns = _enqueue_ns(title="e2e fail task", queue="default")
        with _patch_repo(repo):
            run_enqueue(enqueue_ns)
        new_task_id = json.loads(capsys.readouterr().out)["task_id"]

        # Step 2: claim; its output is drained so it cannot leak into step 3.
        claim_ns = _claim_ns(run_id="e2e-worker")
        with _patch_repo(repo):
            run_claim(claim_ns)
        capsys.readouterr()

        # Step 3: fail the task and inspect the final payload.
        fail_ns = _fail_ns(
            task_id=new_task_id, run_id="e2e-worker", error="dependency unavailable"
        )
        with _patch_repo(repo):
            run_fail_task(fail_ns)
        payload = json.loads(capsys.readouterr().out)
        assert payload["status"] == "failed"
        assert "dependency unavailable" in payload["error"]

    def test_unicode_error_message(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """Non-ASCII text in the error message survives into the JSON output."""
        repo = _make_repo(tmp_path)
        task = create_task(repo, "unicode-fail")
        claim_next_task(repo, "agent-1")
        ns = _fail_ns(
            task_id=task.task_id,
            run_id="agent-1",
            error="échec: fichier introuvable — erreur 404 🚫",
        )
        with _patch_repo(repo):
            run_fail_task(ns)
        payload = json.loads(capsys.readouterr().out)
        assert payload["status"] == "failed"
        assert "échec" in payload["error"]
3353
3354
class TestFailTaskStress:
    """Concurrency and throughput stress tests for run_fail_task."""

    def test_20_agents_each_claim_and_fail_unique_task(self, tmp_path: pathlib.Path) -> None:
        """20 concurrent agents each claim+fail a unique task with no conflicts."""
        import concurrent.futures

        repo = _make_repo(tmp_path)
        tasks = [create_task(repo, f"stress-{i}") for i in range(20)]

        errors: list[str] = []
        failed_ids: set[str] = set()
        lock = threading.Lock()

        def claim_and_fail(task: "TaskRecord") -> None:
            # The agent name is derived from the task it was handed, but the
            # task it actually claims is whatever the queue gives it.
            run_id = f"agent-{task.task_id[:8]}"
            claimed = claim_next_task(repo, run_id, queue=task.queue)
            if claimed is None:
                errors.append(f"no task for {run_id}")
                return
            claimed_task, _ = claimed
            try:
                fail_task(repo, claimed_task.task_id, run_id, error="stress test")
            except Exception as exc:  # noqa: BLE001
                errors.append(str(exc))
            else:
                with lock:
                    failed_ids.add(claimed_task.task_id)

        with concurrent.futures.ThreadPoolExecutor(max_workers=20) as pool:
            # Drain the iterator so every worker has finished before asserting.
            list(pool.map(claim_and_fail, tasks))

        assert not errors, f"errors: {errors}"
        assert len(failed_ids) == 20, f"only {len(failed_ids)}/20 failed"

    def test_100_sequential_fails_under_15s(self, tmp_path: pathlib.Path) -> None:
        """100 create → claim → fail rounds through the core API finish in < 15 s."""
        repo = _make_repo(tmp_path)
        started = time.monotonic()
        for i in range(100):
            task = create_task(repo, f"seq-fail-{i}")
            claim_next_task(repo, "batch-failer")
            fail_task(repo, task.task_id, "batch-failer", error=f"error {i}")
        elapsed = time.monotonic() - started
        assert elapsed < 15.0, f"100 sequential fails took {elapsed:.1f}s"

    def test_fail_via_run_fail_task_100_sequential(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """100 create → claim → ``run_fail_task`` rounds finish in < 15 s."""
        repo = _make_repo(tmp_path)
        started = time.monotonic()
        for i in range(100):
            task = create_task(repo, f"cli-fail-{i}")
            claim_next_task(repo, "cli-failer")
            ns = _fail_ns(task_id=task.task_id, run_id="cli-failer", error=f"error {i}")
            with _patch_repo(repo):
                run_fail_task(ns)
            capsys.readouterr()  # discard per-iteration output
        elapsed = time.monotonic() - started
        assert elapsed < 15.0, f"100 CLI fails took {elapsed:.1f}s"
3412
3413
class TestCliCancelTask:
    """run_cancel_task: pending cancel, force cancel, error paths."""

    def test_cancel_pending_task_json(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """Cancelling a never-claimed task reports status ``cancelled``."""
        repo = _make_repo(tmp_path)
        task = create_task(repo, "Unwanted task")
        ns = _namespace(task_id=task.task_id, run_id="orchestrator", force=False, json_out=True)
        with _patch_repo(repo):
            run_cancel_task(ns)
        payload = json.loads(capsys.readouterr().out)
        assert payload["status"] == "cancelled"

    def test_cancel_claimed_by_claimer(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """The agent holding the claim may cancel without ``force``."""
        repo = _make_repo(tmp_path)
        task = create_task(repo, "My task")
        claim_next_task(repo, "agent-1")
        ns = _namespace(task_id=task.task_id, run_id="agent-1", force=False, json_out=True)
        with _patch_repo(repo):
            run_cancel_task(ns)
        payload = json.loads(capsys.readouterr().out)
        assert payload["status"] == "cancelled"

    def test_cancel_force_different_agent(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """With ``force=True`` a different run_id can cancel a claimed task."""
        repo = _make_repo(tmp_path)
        task = create_task(repo, "Stolen task")
        claim_next_task(repo, "agent-1")
        ns = _namespace(task_id=task.task_id, run_id="orchestrator", force=True, json_out=True)
        with _patch_repo(repo):
            run_cancel_task(ns)
        payload = json.loads(capsys.readouterr().out)
        assert payload["status"] == "cancelled"

    def test_cancel_wrong_claimer_no_force_exits_1(self, tmp_path: pathlib.Path) -> None:
        """A different run_id without ``force`` exits with code 1."""
        repo = _make_repo(tmp_path)
        task = create_task(repo, "Someone else's task")
        claim_next_task(repo, "agent-1")
        ns = _namespace(task_id=task.task_id, run_id="agent-2", force=False, json_out=True)
        with _patch_repo(repo), pytest.raises(SystemExit) as exc:
            run_cancel_task(ns)
        assert exc.value.code == 1

    def test_cancel_text_output(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """Text mode mentions "cancelled" (case-insensitive) on stdout."""
        repo = _make_repo(tmp_path)
        task = create_task(repo, "To cancel")
        ns = _namespace(task_id=task.task_id, run_id="orchestrator", force=False, json_out=False)
        with _patch_repo(repo):
            run_cancel_task(ns)
        text = capsys.readouterr().out
        assert "cancelled" in text.lower()
3464
3465
3466 # ── cancel-task hardening ─────────────────────────────────────────────────────
3467
3468
def _cancel_ns(**kwargs: MsgpackValue) -> argparse.Namespace:
    """Return a cancel-task ``Namespace``; keyword overrides win over defaults.

    Defaults: ``json_out=True``, ``run_id="orchestrator"``,
    ``task_id=VALID_ID``, ``force=False``.
    """
    merged = {
        "json_out": True,
        "run_id": "orchestrator",
        "task_id": VALID_ID,
        "force": False,
        **kwargs,
    }
    return argparse.Namespace(**merged)
3479
3480
class TestCancelTaskInputValidation:
    """Input validation and error reporting for ``run_cancel_task``.

    Covers rejected ``task_id`` / ``run_id`` values, the check that validation
    happens before ``require_repo`` is consulted, and the shape and stream
    (stdout vs. stderr) of the resulting error output.
    """

    def test_run_id_too_long_exits_1(self, tmp_path: pathlib.Path) -> None:
        """A 257-character run_id is rejected with ``ExitCode.USER_ERROR``."""
        repo = _make_repo(tmp_path)
        ns = _cancel_ns(task_id=VALID_ID, run_id="x" * 257)
        with _patch_repo(repo), pytest.raises(SystemExit) as exc:
            run_cancel_task(ns)
        assert exc.value.code == ExitCode.USER_ERROR

    def test_run_id_at_max_length_passes_validation(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """A 256-character run_id is accepted and the pending task is cancelled."""
        repo = _make_repo(tmp_path)
        task = create_task(repo, "task")
        # Exactly at the limit: the length check passes and the cancel proceeds.
        ns = _cancel_ns(task_id=task.task_id, run_id="c" * 256)
        with _patch_repo(repo):
            run_cancel_task(ns)
        payload = json.loads(capsys.readouterr().out)
        assert payload["status"] == "cancelled"

    def test_invalid_task_id_exits_1(self, tmp_path: pathlib.Path) -> None:
        """A malformed task_id is rejected with ``ExitCode.USER_ERROR``."""
        repo = _make_repo(tmp_path)
        ns = _cancel_ns(task_id="not-a-content-id")
        with _patch_repo(repo), pytest.raises(SystemExit) as exc:
            run_cancel_task(ns)
        assert exc.value.code == ExitCode.USER_ERROR

    def test_path_traversal_task_id_exits_1(self, tmp_path: pathlib.Path) -> None:
        """A path-traversal string as task_id is rejected."""
        repo = _make_repo(tmp_path)
        ns = _cancel_ns(task_id="../../etc/passwd")
        with _patch_repo(repo), pytest.raises(SystemExit) as exc:
            run_cancel_task(ns)
        assert exc.value.code == ExitCode.USER_ERROR

    def test_null_byte_task_id_exits_1(self, tmp_path: pathlib.Path) -> None:
        """A task_id containing a NUL byte is rejected."""
        repo = _make_repo(tmp_path)
        ns = _cancel_ns(task_id="12345678-1234-4abc-8abc-123456789\x00ab")
        with _patch_repo(repo), pytest.raises(SystemExit) as exc:
            run_cancel_task(ns)
        assert exc.value.code == ExitCode.USER_ERROR

    def test_task_id_validation_fires_before_repo_lookup(self, tmp_path: pathlib.Path) -> None:
        """A bad task_id exits without ``require_repo`` ever being called."""
        calls: list[int] = []

        def counting_require_repo() -> pathlib.Path:
            calls.append(1)
            raise RuntimeError("should not be called")

        ns = _cancel_ns(task_id="BADUUID")
        with patch(
            "muse.cli.commands.task_queue.require_repo", counting_require_repo
        ), pytest.raises(SystemExit):
            run_cancel_task(ns)
        assert len(calls) == 0, "require_repo called before task_id validation"

    def test_run_id_validation_fires_before_repo_lookup(self, tmp_path: pathlib.Path) -> None:
        """An over-long run_id exits without ``require_repo`` ever being called."""
        calls: list[int] = []

        def counting_require_repo() -> pathlib.Path:
            calls.append(1)
            raise RuntimeError("should not be called")

        ns = _cancel_ns(task_id=VALID_ID, run_id="r" * 300)
        with patch(
            "muse.cli.commands.task_queue.require_repo", counting_require_repo
        ), pytest.raises(SystemExit):
            run_cancel_task(ns)
        assert len(calls) == 0

    def test_json_error_shape_bad_task_id(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """JSON error for a bad task_id has ``error`` and status ``bad_task_id``."""
        repo = _make_repo(tmp_path)
        ns = _cancel_ns(task_id="bad-id", json_out=True)
        with _patch_repo(repo), pytest.raises(SystemExit):
            run_cancel_task(ns)
        payload = json.loads(capsys.readouterr().out)
        assert "error" in payload
        assert payload["status"] == "bad_task_id"

    def test_json_error_shape_bad_run_id(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """JSON error for a bad run_id has ``error`` and status ``bad_args``."""
        repo = _make_repo(tmp_path)
        ns = _cancel_ns(task_id=VALID_ID, run_id="x" * 300, json_out=True)
        with _patch_repo(repo), pytest.raises(SystemExit):
            run_cancel_task(ns)
        payload = json.loads(capsys.readouterr().out)
        assert "error" in payload
        assert payload["status"] == "bad_args"

    def test_text_error_goes_to_stderr(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """In text mode the error is written to stderr and stdout stays empty."""
        repo = _make_repo(tmp_path)
        ns = _cancel_ns(task_id="BADUUID", json_out=False)
        with _patch_repo(repo), pytest.raises(SystemExit):
            run_cancel_task(ns)
        streams = capsys.readouterr()
        assert streams.out == ""
        assert "❌" in streams.err

    def test_json_error_goes_to_stdout_not_stderr(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """In JSON mode the error document is on stdout and stderr stays empty."""
        repo = _make_repo(tmp_path)
        ns = _cancel_ns(task_id="BADUUID", json_out=True)
        with _patch_repo(repo), pytest.raises(SystemExit):
            run_cancel_task(ns)
        streams = capsys.readouterr()
        assert streams.err == ""
        payload = json.loads(streams.out)
        assert "error" in payload

    def test_missing_task_error_shape(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """A well-formed ID with no matching task exits 1 with a JSON error."""
        repo = _make_repo(tmp_path)
        # VALID_ID passes validation, but no such task was created in this repo.
        ns = _cancel_ns(task_id=VALID_ID, json_out=True)
        with _patch_repo(repo), pytest.raises(SystemExit) as exc:
            run_cancel_task(ns)
        assert exc.value.code == 1
        payload = json.loads(capsys.readouterr().out)
        assert "error" in payload

    def test_already_terminal_error_shape(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """Cancelling an already-completed task exits 1 with a JSON error."""
        repo = _make_repo(tmp_path)
        task = create_task(repo, "terminal")
        claim_next_task(repo, "agent-1")
        complete_task(repo, task.task_id, "agent-1")
        ns = _cancel_ns(task_id=task.task_id, run_id="agent-1", json_out=True)
        with _patch_repo(repo), pytest.raises(SystemExit) as exc:
            run_cancel_task(ns)
        assert exc.value.code == 1
        payload = json.loads(capsys.readouterr().out)
        assert "error" in payload

    def test_wrong_claimer_no_force_error_shape(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """A non-claimer cancelling without ``force`` exits 1 with a JSON error."""
        repo = _make_repo(tmp_path)
        task = create_task(repo, "owned")
        claim_next_task(repo, "agent-1")
        ns = _cancel_ns(task_id=task.task_id, run_id="agent-2", force=False, json_out=True)
        with _patch_repo(repo), pytest.raises(SystemExit) as exc:
            run_cancel_task(ns)
        assert exc.value.code == 1
        payload = json.loads(capsys.readouterr().out)
        assert "error" in payload
3629
3630
class TestCancelTaskJsonOutput:
    """run_cancel_task JSON output shape and compactness."""

    def _cancel_stdout(
        self,
        repo: pathlib.Path,
        capsys: pytest.CaptureFixture[str],
        task_id: str,
        **overrides: MsgpackValue,
    ) -> str:
        """Run ``run_cancel_task`` for *task_id* and return the captured stdout."""
        ns = _cancel_ns(task_id=task_id, **overrides)
        with _patch_repo(repo):
            run_cancel_task(ns)
        return capsys.readouterr().out

    def test_json_is_compact(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """The JSON document occupies a single line."""
        repo = _make_repo(tmp_path)
        task = create_task(repo, "compact task")
        raw = self._cancel_stdout(repo, capsys, task.task_id).strip()
        assert "\n" not in raw, "JSON output must be single line (compact)"

    def test_json_has_required_keys(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """Every expected top-level key is present in the output."""
        repo = _make_repo(tmp_path)
        task = create_task(repo, "keys task")
        payload = json.loads(self._cancel_stdout(repo, capsys, task.task_id))
        required = (
            "schema", "task_id", "claimer_run_id", "status",
            "claimed_at", "expires_at", "error", "duration_ms",
        )
        for key in required:
            assert key in payload, f"missing key: {key}"

    def test_json_status_is_cancelled(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """``status`` is ``cancelled`` after a successful cancel."""
        repo = _make_repo(tmp_path)
        task = create_task(repo, "status task")
        payload = json.loads(self._cancel_stdout(repo, capsys, task.task_id))
        assert payload["status"] == "cancelled"

    def test_json_elapsed_is_float(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """``duration_ms`` parses as a float."""
        repo = _make_repo(tmp_path)
        task = create_task(repo, "elapsed task")
        payload = json.loads(self._cancel_stdout(repo, capsys, task.task_id))
        assert isinstance(payload["duration_ms"], float)

    def test_json_claimer_run_id_for_pending_is_caller(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """For a pending task, claimer_run_id is the calling agent."""
        repo = _make_repo(tmp_path)
        task = create_task(repo, "pending task")
        payload = json.loads(
            self._cancel_stdout(repo, capsys, task.task_id, run_id="orchestrator-99")
        )
        assert payload["claimer_run_id"] == "orchestrator-99"

    def test_json_claimer_run_id_for_claimed_is_original_claimer(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """For a claimed task cancelled by its claimer, run_id is preserved."""
        repo = _make_repo(tmp_path)
        task = create_task(repo, "claimed task")
        claim_next_task(repo, "agent-xyz")
        payload = json.loads(
            self._cancel_stdout(repo, capsys, task.task_id, run_id="agent-xyz")
        )
        assert payload["claimer_run_id"] == "agent-xyz"

    def test_json_force_cancel_different_agent(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """A forced cancel by another run_id still reports ``cancelled``."""
        repo = _make_repo(tmp_path)
        task = create_task(repo, "force task")
        claim_next_task(repo, "agent-1")
        payload = json.loads(
            self._cancel_stdout(repo, capsys, task.task_id, run_id="orchestrator", force=True)
        )
        assert payload["status"] == "cancelled"

    def test_json_task_id_matches(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """The ``task_id`` in the output equals the one that was cancelled."""
        repo = _make_repo(tmp_path)
        task = create_task(repo, "id task")
        payload = json.loads(self._cancel_stdout(repo, capsys, task.task_id))
        assert payload["task_id"] == task.task_id
3711
3712
class TestCancelTaskTextOutput:
    """run_cancel_task text output content.

    All tests run with ``json_out=False`` and inspect the captured stdout.
    """

    def _setup_pending(self, repo: pathlib.Path, title: str = "Pending Task",
                       queue: str = "default") -> TaskRecord:
        """Create an unclaimed task in *queue* and return it."""
        return create_task(repo, title, queue=queue)

    def _setup_claimed(self, repo: pathlib.Path, title: str = "Claimed Task",
                       queue: str = "default", claimer: str = "agent-1") -> TaskRecord:
        """Create a task in *queue*, have *claimer* claim it, and return it.

        The claim is issued against the same *queue* the task was created in;
        previously the ``queue`` argument was ignored when claiming, so the
        helper only claimed its own task when the queue was the default one.
        """
        t = create_task(repo, title, queue=queue)
        claim_next_task(repo, claimer, queue=queue)
        return t

    def test_text_shows_cancelled(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """Output mentions "cancelled" (case-insensitive)."""
        repo = _make_repo(tmp_path)
        t = self._setup_pending(repo)
        args = _cancel_ns(task_id=t.task_id, json_out=False)
        with _patch_repo(repo):
            run_cancel_task(args)
        out = capsys.readouterr().out
        assert "cancelled" in out.lower()

    def test_text_shows_task_title(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """Output contains the task title."""
        repo = _make_repo(tmp_path)
        t = self._setup_pending(repo, title="Decommission old infra")
        args = _cancel_ns(task_id=t.task_id, json_out=False)
        with _patch_repo(repo):
            run_cancel_task(args)
        out = capsys.readouterr().out
        assert "Decommission old infra" in out

    def test_text_shows_queue(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """Output contains the queue name."""
        repo = _make_repo(tmp_path)
        t = self._setup_pending(repo, queue="infra")
        args = _cancel_ns(task_id=t.task_id, json_out=False)
        with _patch_repo(repo):
            run_cancel_task(args)
        out = capsys.readouterr().out
        assert "infra" in out

    def test_text_shows_claimer(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """Output contains the run_id that issued the cancel."""
        repo = _make_repo(tmp_path)
        t = self._setup_pending(repo)
        args = _cancel_ns(task_id=t.task_id, run_id="orch-42", json_out=False)
        with _patch_repo(repo):
            run_cancel_task(args)
        out = capsys.readouterr().out
        assert "orch-42" in out

    def test_text_shows_forced_indicator(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """A forced cancel is labelled "forced" in the output."""
        repo = _make_repo(tmp_path)
        t = self._setup_claimed(repo, claimer="agent-1")
        args = _cancel_ns(task_id=t.task_id, run_id="orch", force=True, json_out=False)
        with _patch_repo(repo):
            run_cancel_task(args)
        out = capsys.readouterr().out
        assert "forced" in out.lower()

    def test_text_no_forced_indicator_without_flag(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """Without ``force`` the word "forced" does not appear."""
        repo = _make_repo(tmp_path)
        t = self._setup_claimed(repo, claimer="agent-1")
        args = _cancel_ns(task_id=t.task_id, run_id="agent-1",
                          force=False, json_out=False)
        with _patch_repo(repo):
            run_cancel_task(args)
        out = capsys.readouterr().out
        assert "forced" not in out.lower()

    def test_text_shows_elapsed(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """Output contains the ``s)`` suffix of the elapsed-time annotation."""
        repo = _make_repo(tmp_path)
        t = self._setup_pending(repo)
        args = _cancel_ns(task_id=t.task_id, json_out=False)
        with _patch_repo(repo):
            run_cancel_task(args)
        out = capsys.readouterr().out
        assert "s)" in out

    def test_ansi_injection_in_title_stripped(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """No raw ESC byte from the task title reaches stdout."""
        malicious_title = "task\x1b[1mBOLD\x1b[0m"
        repo = _make_repo(tmp_path)
        t = create_task(repo, malicious_title)
        args = _cancel_ns(task_id=t.task_id, json_out=False)
        with _patch_repo(repo):
            run_cancel_task(args)
        out = capsys.readouterr().out
        assert "\x1b" not in out

    def test_ansi_injection_in_run_id_stripped(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """No raw ESC byte from the run_id reaches stdout."""
        malicious_id = "orch\x1b[31mRED\x1b[0m"
        repo = _make_repo(tmp_path)
        t = create_task(repo, "task")
        args = _cancel_ns(task_id=t.task_id, run_id=malicious_id, json_out=False)
        with _patch_repo(repo):
            run_cancel_task(args)
        out = capsys.readouterr().out
        assert "\x1b" not in out
3809
3810
class TestCancelTaskIntegration:
    """Full lifecycle integration tests for run_cancel_task.

    Each test drives ``run_cancel_task`` against a fresh repo and checks the
    JSON on stdout, the claim read back with ``load_claim``, or the exit code.
    """

    def test_pending_task_cancelled_status_on_disk(self, tmp_path: pathlib.Path) -> None:
        """Cancelling a pending task leaves a claim with status ``cancelled``."""
        repo = _make_repo(tmp_path)
        t = create_task(repo, "pending-cancel")
        args = _cancel_ns(task_id=t.task_id, run_id="orch")
        with _patch_repo(repo):
            run_cancel_task(args)
        claim = load_claim(repo, t.task_id)
        assert claim is not None
        assert claim.status == "cancelled"

    def test_claimed_task_cancelled_by_claimer(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """The claimer's cancel shows up in both the JSON output and the claim."""
        repo = _make_repo(tmp_path)
        t = create_task(repo, "claimed-cancel")
        claim_next_task(repo, "worker")
        args = _cancel_ns(task_id=t.task_id, run_id="worker")
        with _patch_repo(repo):
            run_cancel_task(args)
        out = json.loads(capsys.readouterr().out)
        assert out["status"] == "cancelled"
        claim = load_claim(repo, t.task_id)
        # Guard first so a missing claim fails as an assertion, matching the
        # other on-disk checks, instead of an AttributeError on ``None``.
        assert claim is not None
        assert claim.status == "cancelled"

    def test_force_cancel_different_claimer(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """``force=True`` lets a different run_id cancel a claimed task."""
        repo = _make_repo(tmp_path)
        t = create_task(repo, "force-cancel")
        claim_next_task(repo, "agent-1")
        args = _cancel_ns(task_id=t.task_id, run_id="orchestrator", force=True)
        with _patch_repo(repo):
            run_cancel_task(args)
        out = json.loads(capsys.readouterr().out)
        assert out["status"] == "cancelled"

    def test_double_cancel_exits_1(self, tmp_path: pathlib.Path) -> None:
        """Cancelling the same task twice exits with code 1 the second time."""
        repo = _make_repo(tmp_path)
        t = create_task(repo, "double-cancel")
        args = _cancel_ns(task_id=t.task_id)
        with _patch_repo(repo):
            run_cancel_task(args)
        with _patch_repo(repo):
            with pytest.raises(SystemExit) as exc:
                run_cancel_task(args)
        assert exc.value.code == 1

    def test_cancel_completed_task_exits_1(self, tmp_path: pathlib.Path) -> None:
        """A completed task cannot be cancelled; exit code is 1."""
        repo = _make_repo(tmp_path)
        t = create_task(repo, "completed-task")
        claim_next_task(repo, "agent-1")
        complete_task(repo, t.task_id, "agent-1")
        args = _cancel_ns(task_id=t.task_id, run_id="agent-1")
        with _patch_repo(repo):
            with pytest.raises(SystemExit) as exc:
                run_cancel_task(args)
        assert exc.value.code == 1

    def test_cancel_failed_task_exits_1(self, tmp_path: pathlib.Path) -> None:
        """A failed task cannot be cancelled; exit code is 1."""
        repo = _make_repo(tmp_path)
        t = create_task(repo, "failed-task")
        claim_next_task(repo, "agent-1")
        fail_task(repo, t.task_id, "agent-1", error="boom")
        args = _cancel_ns(task_id=t.task_id, run_id="agent-1")
        with _patch_repo(repo):
            with pytest.raises(SystemExit) as exc:
                run_cancel_task(args)
        assert exc.value.code == 1

    def test_enqueue_then_cancel_full_cycle(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """Full CLI round-trip: enqueue → cancel."""
        repo = _make_repo(tmp_path)
        eq_args = _enqueue_ns(title="e2e cancel", queue="default")
        with _patch_repo(repo):
            run_enqueue(eq_args)
        task_id = json.loads(capsys.readouterr().out)["task_id"]

        ca_args = _cancel_ns(task_id=task_id, run_id="orch")
        with _patch_repo(repo):
            run_cancel_task(ca_args)
        out = json.loads(capsys.readouterr().out)
        assert out["status"] == "cancelled"
        assert out["task_id"] == task_id

    def test_enqueue_claim_cancel_full_cycle(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """Full CLI round-trip: enqueue → claim → cancel by claimer."""
        repo = _make_repo(tmp_path)
        eq_args = _enqueue_ns(title="e2e claim-cancel", queue="default")
        with _patch_repo(repo):
            run_enqueue(eq_args)
        task_id = json.loads(capsys.readouterr().out)["task_id"]

        cl_args = _claim_ns(run_id="e2e-worker")
        with _patch_repo(repo):
            run_claim(cl_args)
        capsys.readouterr()  # drop the claim output before reading the cancel JSON

        ca_args = _cancel_ns(task_id=task_id, run_id="e2e-worker")
        with _patch_repo(repo):
            run_cancel_task(ca_args)
        out = json.loads(capsys.readouterr().out)
        assert out["status"] == "cancelled"
3912
3913
class TestCancelTaskStress:
    """Concurrency and throughput stress tests for run_cancel_task."""

    def test_20_agents_each_cancel_unique_pending_task(self, tmp_path: pathlib.Path) -> None:
        """20 concurrent orchestrators each cancel a distinct pending task."""
        import concurrent.futures

        repo = _make_repo(tmp_path)
        tasks = [create_task(repo, f"cancel-stress-{i}") for i in range(20)]

        errors: list[str] = []
        cancelled_ids: set[str] = set()
        lock = threading.Lock()

        def do_cancel(t: "TaskRecord") -> None:
            try:
                record = cancel_task(repo, t.task_id, f"orch-{t.task_id[:8]}")
            except Exception as exc:  # noqa: BLE001
                errors.append(str(exc))
            else:
                with lock:
                    cancelled_ids.add(record.task_id)

        with concurrent.futures.ThreadPoolExecutor(max_workers=20) as pool:
            # Drain the iterator so every worker has finished before asserting.
            list(pool.map(do_cancel, tasks))

        assert not errors, f"errors: {errors}"
        assert len(cancelled_ids) == 20, f"only {len(cancelled_ids)}/20 cancelled"

    def test_100_sequential_cancels_under_15s(self, tmp_path: pathlib.Path) -> None:
        """100 create → cancel rounds through the core API finish in < 15 s."""
        repo = _make_repo(tmp_path)
        started = time.monotonic()
        for i in range(100):
            task = create_task(repo, f"seq-cancel-{i}")
            cancel_task(repo, task.task_id, "orch")
        elapsed = time.monotonic() - started
        assert elapsed < 15.0, f"100 sequential cancels took {elapsed:.1f}s"

    def test_cancel_via_run_cancel_task_100_sequential(
        self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]
    ) -> None:
        """100 create → ``run_cancel_task`` rounds finish in < 15 s."""
        repo = _make_repo(tmp_path)
        started = time.monotonic()
        for i in range(100):
            task = create_task(repo, f"cli-cancel-{i}")
            ns = _cancel_ns(task_id=task.task_id, run_id="orch")
            with _patch_repo(repo):
                run_cancel_task(ns)
            capsys.readouterr()  # discard per-iteration output
        elapsed = time.monotonic() - started
        assert elapsed < 15.0, f"100 CLI cancels took {elapsed:.1f}s"
3962
3963
class TestCliTasks:
    """run_tasks: listing, filtering, JSON/text output."""

    def _setup_mixed(self, repo: pathlib.Path) -> tuple[TaskRecord, TaskRecord, TaskRecord]:
        """Create three tasks and move two of them out of the initial state.

        Returns ``(t1, t2, t3)``: ``t1`` (queue ``q1``) is left untouched,
        ``t2`` (queue ``q2``) is claimed by ``agent-2``, and ``t3`` (queue
        ``q1``) is completed by ``agent-3`` after a claim on ``q1``.
        """
        t1 = create_task(repo, "Pending task", queue="q1", priority=1)
        t2 = create_task(repo, "Claimed task", queue="q2", priority=5)
        t3 = create_task(repo, "Done task", queue="q1", priority=3)
        claim_next_task(repo, "agent-2", queue="q2")
        claim_next_task(repo, "agent-3", queue="q1")
        complete_task(repo, t3.task_id, "agent-3")
        return t1, t2, t3

    def test_lists_all_tasks_json(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """With no filters, all three tasks are listed and counted."""
        repo = _make_repo(tmp_path)
        self._setup_mixed(repo)
        args = _namespace(json_out=True, status=None, queue=None, run_id=None)
        with _patch_repo(repo):
            run_tasks(args)
        out = json.loads(capsys.readouterr().out)
        assert out["total"] == 3
        assert len(out["items"]) == 3

    def test_filter_by_status_pending(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """``status="pending"`` returns only pending items, and at least one."""
        repo = _make_repo(tmp_path)
        self._setup_mixed(repo)
        args = _namespace(json_out=True, status="pending", queue=None, run_id=None)
        with _patch_repo(repo):
            run_tasks(args)
        out = json.loads(capsys.readouterr().out)
        # Without this, a filter that drops everything would pass vacuously.
        assert out["items"], "expected at least one pending task"
        for item in out["items"]:
            assert item["status"] == "pending"

    def test_filter_by_queue(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """``queue="q1"`` returns only ``q1`` items, and at least one."""
        repo = _make_repo(tmp_path)
        self._setup_mixed(repo)
        args = _namespace(json_out=True, status=None, queue="q1", run_id=None)
        with _patch_repo(repo):
            run_tasks(args)
        out = json.loads(capsys.readouterr().out)
        # Without this, a filter that drops everything would pass vacuously.
        assert out["items"], "expected at least one task in queue q1"
        for item in out["items"]:
            assert item["queue"] == "q1"

    def test_filter_by_run_id(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """``run_id="agent-2"`` returns only that agent's items, and at least one."""
        repo = _make_repo(tmp_path)
        self._setup_mixed(repo)
        args = _namespace(json_out=True, status=None, queue=None, run_id="agent-2")
        with _patch_repo(repo):
            run_tasks(args)
        out = json.loads(capsys.readouterr().out)
        # Without this, a filter that drops everything would pass vacuously.
        assert out["items"], "expected at least one task claimed by agent-2"
        for item in out["items"]:
            assert item["claimer_run_id"] == "agent-2"

    def test_items_sorted_by_priority_desc(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """Items come back ordered from highest to lowest priority."""
        repo = _make_repo(tmp_path)
        with _freeze(_EPOCH):
            create_task(repo, "Low", priority=0)
            create_task(repo, "High", priority=10)
            create_task(repo, "Mid", priority=5)
        args = _namespace(json_out=True, status=None, queue=None, run_id=None)
        with _patch_repo(repo):
            run_tasks(args)
        out = json.loads(capsys.readouterr().out)
        priorities = [i["priority"] for i in out["items"]]
        # An empty or truncated listing is trivially "sorted"; require all three.
        assert len(priorities) == 3
        assert priorities == sorted(priorities, reverse=True)

    def test_text_output_no_crash(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """Text mode prints a "Task queue" heading."""
        repo = _make_repo(tmp_path)
        create_task(repo, "Text test task")
        args = _namespace(json_out=False, status=None, queue=None, run_id=None)
        with _patch_repo(repo):
            run_tasks(args)
        out = capsys.readouterr().out
        assert "Task queue" in out

    def test_empty_queue_text_output(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """With no tasks, text mode reports "0 task"."""
        repo = _make_repo(tmp_path)
        args = _namespace(json_out=False, status=None, queue=None, run_id=None)
        with _patch_repo(repo):
            run_tasks(args)
        out = capsys.readouterr().out
        assert "0 task" in out

    def test_status_counts_in_json(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """The JSON output carries a per-status count (``pending`` == 1 here)."""
        repo = _make_repo(tmp_path)
        create_task(repo, "Pending")
        args = _namespace(json_out=True, status=None, queue=None, run_id=None)
        with _patch_repo(repo):
            run_tasks(args)
        out = json.loads(capsys.readouterr().out)
        assert "pending" in out
        assert out["pending"] == 1

    def test_ansi_in_title_not_printed_raw(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """ANSI escape sequences in task titles must be sanitized in text output."""
        repo = _make_repo(tmp_path)
        create_task(repo, "\x1b[31mRED\x1b[0m")
        args = _namespace(json_out=False, status=None, queue=None, run_id=None)
        with _patch_repo(repo):
            run_tasks(args)
        out = capsys.readouterr().out
        # Raw ESC byte must not appear in text output
        assert "\x1b" not in out
4067
4068
4069 # ── tasks hardening ───────────────────────────────────────────────────────────
4070
4071 from muse.cli.commands.task_queue import _MAX_LIMIT
4072 from muse.core.types import Manifest
4073
4074
4075 def _tasks_ns(**kwargs: MsgpackValue) -> argparse.Namespace:
4076 """Build a Namespace with tasks-appropriate defaults."""
4077 defaults = {
4078 "json_out": True,
4079 "status": None,
4080 "queue": None,
4081 "run_id": None,
4082 "limit": 200,
4083 }
4084 defaults.update(kwargs)
4085 return argparse.Namespace(**defaults)
4086
4087
class TestTasksInputValidation:
    """Argument validation performed by ``run_tasks``.

    Rejected input must exit with ``ExitCode.USER_ERROR``; the queue and
    run-id checks must fire before ``require_repo()`` is ever consulted.
    """

    @staticmethod
    def _assert_rejected(repo: pathlib.Path, **overrides: MsgpackValue) -> None:
        """Run ``run_tasks`` with *overrides* and require a USER_ERROR exit."""
        args = _tasks_ns(**overrides)
        with _patch_repo(repo):
            with pytest.raises(SystemExit) as exc:
                run_tasks(args)
        assert exc.value.code == ExitCode.USER_ERROR

    @staticmethod
    def _assert_no_repo_lookup(**overrides: MsgpackValue) -> None:
        """Require that ``run_tasks`` exits without calling ``require_repo``."""
        # If validation were skipped, the RuntimeError would escape the
        # SystemExit expectation and fail the test immediately.
        require_repo = MagicMock(side_effect=RuntimeError("should not be called"))
        args = _tasks_ns(**overrides)
        with patch("muse.cli.commands.task_queue.require_repo", require_repo):
            with pytest.raises(SystemExit):
                run_tasks(args)
        require_repo.assert_not_called()

    @staticmethod
    def _run_expecting_exit(
        repo: pathlib.Path,
        capsys: pytest.CaptureFixture[str],
        **overrides: MsgpackValue,
    ) -> tuple[str, str]:
        """Run ``run_tasks`` expecting SystemExit; return captured (stdout, stderr)."""
        args = _tasks_ns(**overrides)
        with _patch_repo(repo):
            with pytest.raises(SystemExit):
                run_tasks(args)
        captured = capsys.readouterr()
        return captured.out, captured.err

    # ── queue name ──────────────────────────────────────────────────────────

    def test_invalid_queue_name_exits_1(self, tmp_path: pathlib.Path) -> None:
        self._assert_rejected(_make_repo(tmp_path), queue="bad queue!")

    def test_queue_with_slash_exits_1(self, tmp_path: pathlib.Path) -> None:
        self._assert_rejected(_make_repo(tmp_path), queue="../../etc")

    def test_queue_with_ansi_exits_1(self, tmp_path: pathlib.Path) -> None:
        self._assert_rejected(_make_repo(tmp_path), queue="q\x1b[31m")

    def test_valid_queue_name_passes(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """A well-formed queue name is accepted and its task is listed."""
        repo = _make_repo(tmp_path)
        create_task(repo, "task", queue="billing-v2")
        args = _tasks_ns(queue="billing-v2")
        with _patch_repo(repo):
            run_tasks(args)
        out = json.loads(capsys.readouterr().out)
        # Exactly one task exists and it lives in the requested queue; the
        # previous ``total >= 0`` check could never fail.
        assert out["total"] == 1
        assert [item["queue"] for item in out["items"]] == ["billing-v2"]

    # ── run id ──────────────────────────────────────────────────────────────

    def test_run_id_too_long_exits_1(self, tmp_path: pathlib.Path) -> None:
        self._assert_rejected(_make_repo(tmp_path), run_id="x" * 257)

    def test_run_id_at_max_length_passes(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        repo = _make_repo(tmp_path)
        args = _tasks_ns(run_id="r" * 256)
        with _patch_repo(repo):
            run_tasks(args)
        out = json.loads(capsys.readouterr().out)
        assert "items" in out

    # ── limit ───────────────────────────────────────────────────────────────

    def test_limit_zero_exits_1(self, tmp_path: pathlib.Path) -> None:
        self._assert_rejected(_make_repo(tmp_path), limit=0)

    def test_limit_negative_exits_1(self, tmp_path: pathlib.Path) -> None:
        self._assert_rejected(_make_repo(tmp_path), limit=-1)

    def test_limit_over_max_exits_1(self, tmp_path: pathlib.Path) -> None:
        self._assert_rejected(_make_repo(tmp_path), limit=_MAX_LIMIT + 1)

    def test_limit_at_max_passes(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        repo = _make_repo(tmp_path)
        args = _tasks_ns(limit=_MAX_LIMIT)
        with _patch_repo(repo):
            run_tasks(args)
        out = json.loads(capsys.readouterr().out)
        assert "items" in out

    # ── ordering: validation before repo lookup ─────────────────────────────

    def test_queue_validation_fires_before_repo_lookup(self, tmp_path: pathlib.Path) -> None:
        self._assert_no_repo_lookup(queue="bad queue!")

    def test_run_id_validation_fires_before_repo_lookup(self, tmp_path: pathlib.Path) -> None:
        self._assert_no_repo_lookup(run_id="r" * 300)

    # ── error reporting channel and shape ───────────────────────────────────

    def test_json_error_shape_bad_queue(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        stdout, _ = self._run_expecting_exit(
            _make_repo(tmp_path), capsys, queue="bad queue!", json_out=True
        )
        out = json.loads(stdout)
        assert "error" in out
        assert out["status"] == "bad_queue"

    def test_json_error_shape_bad_limit(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        stdout, _ = self._run_expecting_exit(
            _make_repo(tmp_path), capsys, limit=0, json_out=True
        )
        out = json.loads(stdout)
        assert "error" in out
        assert out["status"] == "bad_args"

    def test_text_error_goes_to_stderr(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        stdout, stderr = self._run_expecting_exit(
            _make_repo(tmp_path), capsys, queue="bad queue!", json_out=False
        )
        assert stdout == ""
        assert "❌" in stderr

    def test_json_error_goes_to_stdout_not_stderr(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        stdout, stderr = self._run_expecting_exit(
            _make_repo(tmp_path), capsys, queue="bad queue!", json_out=True
        )
        assert stderr == ""
        out = json.loads(stdout)
        assert "error" in out
4238
4239
class TestTasksJsonOutput:
    """Shape, compactness and field coverage of ``run_tasks`` JSON output."""

    @staticmethod
    def _stdout(repo: pathlib.Path, capsys: pytest.CaptureFixture[str], **overrides: MsgpackValue) -> str:
        """Run ``run_tasks`` against *repo* and return what it wrote to stdout."""
        ns = _tasks_ns(**overrides)
        with _patch_repo(repo):
            run_tasks(ns)
        return capsys.readouterr().out

    def _setup(self, repo: pathlib.Path) -> tuple[TaskRecord, TaskRecord, TaskRecord]:
        """Create three tasks across q1/q2, claim from both queues, complete one.

        Returns the created records in creation order ("Pending", "Claimed",
        "Done").
        """
        pending = create_task(repo, "Pending", queue="q1", priority=1)
        claimed = create_task(repo, "Claimed", queue="q2", priority=5)
        done = create_task(repo, "Done", queue="q1", priority=3)
        claim_next_task(repo, "worker-a", queue="q2")
        claim_next_task(repo, "worker-b", queue="q1")
        complete_task(repo, done.task_id, "worker-b")
        return pending, claimed, done

    def test_json_is_compact(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        repo = _make_repo(tmp_path)
        self._setup(repo)
        raw = self._stdout(repo, capsys).strip()
        assert "\n" not in raw, "JSON must be single line (compact)"

    def test_json_has_top_level_keys(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        repo = _make_repo(tmp_path)
        self._setup(repo)
        out = json.loads(self._stdout(repo, capsys))
        expected_keys = (
            "schema", "total", "pending", "claimed", "timed_out",
            "completed", "failed", "cancelled", "limit", "truncated",
            "items", "duration_ms",
        )
        for key in expected_keys:
            assert key in out, f"missing top-level key: {key}"

    def test_items_have_new_fields(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        repo = _make_repo(tmp_path)
        create_task(repo, "field-test", queue="default")
        out = json.loads(self._stdout(repo, capsys))
        assert len(out["items"]) == 1
        only_item = out["items"][0]
        for field in ("created_by", "ttl_seconds", "expires_at"):
            assert field in only_item, f"missing item field: {field}"

    def test_expires_at_null_for_pending_task(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        repo = _make_repo(tmp_path)
        create_task(repo, "pending-task")
        out = json.loads(self._stdout(repo, capsys))
        assert out["items"][0]["expires_at"] is None

    def test_expires_at_populated_for_claimed_task(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        repo = _make_repo(tmp_path)
        create_task(repo, "claimed-task")
        claim_next_task(repo, "worker")
        out = json.loads(self._stdout(repo, capsys))
        expires_at = out["items"][0]["expires_at"]
        assert expires_at is not None
        # Must parse as ISO 8601; fromisoformat raises ValueError otherwise.
        datetime.datetime.fromisoformat(expires_at)

    def test_status_counts_reflect_full_queue_when_filtered(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """Counts cover every task even when ``items`` is filtered by queue."""
        repo = _make_repo(tmp_path)
        self._setup(repo)
        # Only q1 tasks are listed, yet the total still counts all three.
        out = json.loads(self._stdout(repo, capsys, queue="q1"))
        assert out["total"] == 3
        assert len(out["items"]) == 2  # only q1 items

    def test_limit_truncates_items(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        repo = _make_repo(tmp_path)
        for n in range(10):
            create_task(repo, f"task-{n}")
        out = json.loads(self._stdout(repo, capsys, limit=3))
        assert len(out["items"]) == 3
        assert out["truncated"] is True
        assert out["limit"] == 3
        assert out["total"] == 10  # full count still correct

    def test_no_truncation_when_within_limit(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        repo = _make_repo(tmp_path)
        for n in range(5):
            create_task(repo, f"task-{n}")
        out = json.loads(self._stdout(repo, capsys, limit=10))
        assert len(out["items"]) == 5
        assert out["truncated"] is False

    def test_elapsed_is_float(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        repo = _make_repo(tmp_path)
        out = json.loads(self._stdout(repo, capsys))
        assert isinstance(out["duration_ms"], float)

    def test_items_sorted_priority_desc(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        repo = _make_repo(tmp_path)
        with _freeze(_EPOCH):
            create_task(repo, "Low", priority=0)
            create_task(repo, "High", priority=10)
            create_task(repo, "Mid", priority=5)
        out = json.loads(self._stdout(repo, capsys))
        listed = [item["priority"] for item in out["items"]]
        assert listed == sorted(listed, reverse=True)

    def test_limit_applies_after_sort(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """With ``limit=1`` the single listed item is the highest-priority task."""
        repo = _make_repo(tmp_path)
        with _freeze(_EPOCH):
            create_task(repo, "Low", priority=0)
            create_task(repo, "High", priority=99)
        out = json.loads(self._stdout(repo, capsys, limit=1))
        assert len(out["items"]) == 1
        assert out["items"][0]["priority"] == 99
4378
4379
class TestTasksTextOutput:
    """Content and formatting of ``run_tasks`` human-readable output."""

    @staticmethod
    def _text(repo: pathlib.Path, capsys: pytest.CaptureFixture[str], **overrides: MsgpackValue) -> str:
        """Run ``run_tasks`` in text mode against *repo* and return its stdout."""
        ns = _tasks_ns(json_out=False, **overrides)
        with _patch_repo(repo):
            run_tasks(ns)
        return capsys.readouterr().out

    def test_text_shows_task_queue_header(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        rendered = self._text(_make_repo(tmp_path), capsys)
        assert "Task queue" in rendered

    def test_text_shows_status_counts(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        repo = _make_repo(tmp_path)
        create_task(repo, "pending")
        rendered = self._text(repo, capsys)
        assert "pending" in rendered
        assert "claimed" in rendered

    def test_text_shows_column_headers(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        repo = _make_repo(tmp_path)
        create_task(repo, "task")
        rendered = self._text(repo, capsys)
        assert "ID" in rendered
        assert "QUEUE" in rendered
        assert "TITLE" in rendered

    def test_text_shows_filter_line(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        rendered = self._text(_make_repo(tmp_path), capsys, queue="billing")
        assert "filter" in rendered
        assert "billing" in rendered

    def test_text_empty_queue_message(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        rendered = self._text(_make_repo(tmp_path), capsys)
        assert "0 task" in rendered

    def test_text_shows_elapsed(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        rendered = self._text(_make_repo(tmp_path), capsys)
        assert "s)" in rendered

    def test_ansi_in_queue_name_stripped(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """An ANSI-laden ``run_id`` filter must not leak raw ESC bytes into text output."""
        # Despite the test name, the escape sequence is injected through the
        # run_id filter: a queue name containing ANSI is rejected by
        # validation, whereas this run_id is accepted and reaches the display
        # path.
        repo = _make_repo(tmp_path)
        create_task(repo, "task")
        rendered = self._text(repo, capsys, run_id="\x1b[31mred\x1b[0m")
        assert "\x1b" not in rendered
4449
4450
class TestTasksIntegration:
    """End-to-end ``run_tasks`` checks against queues in realistic mixed states."""

    @staticmethod
    def _listing(repo: pathlib.Path, capsys: pytest.CaptureFixture[str], **overrides: MsgpackValue) -> str:
        """Run ``run_tasks`` against *repo* and return its raw stdout."""
        ns = _tasks_ns(**overrides)
        with _patch_repo(repo):
            run_tasks(ns)
        return capsys.readouterr().out

    def test_full_mixed_state_counts(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        repo = _make_repo(tmp_path)
        # Two tasks stay pending: nothing ever claims from queue "p".
        create_task(repo, "p1", queue="p")
        create_task(repo, "p2", queue="p")
        # One task is claimed and left in flight.
        create_task(repo, "c1", queue="c")
        claim_next_task(repo, "w1", queue="c")
        # One task is claimed and completed.
        finished = create_task(repo, "done", queue="done")
        claim_next_task(repo, "w2", queue="done")
        complete_task(repo, finished.task_id, "w2")
        # One task is claimed and failed.
        broken = create_task(repo, "fail", queue="fail")
        claim_next_task(repo, "w3", queue="fail")
        fail_task(repo, broken.task_id, "w3", error="boom")
        # One task is cancelled without ever being claimed.
        dropped = create_task(repo, "cancelled", queue="can")
        cancel_task(repo, dropped.task_id, "orch")

        out = json.loads(self._listing(repo, capsys))
        counted = ("total", "pending", "claimed", "completed", "failed", "cancelled")
        assert {key: out[key] for key in counted} == {
            "total": 6,
            "pending": 2,
            "claimed": 1,
            "completed": 1,
            "failed": 1,
            "cancelled": 1,
        }

    def test_filter_by_status_only_returns_matching(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        repo = _make_repo(tmp_path)
        create_task(repo, "pending-1", queue="p")
        finished = create_task(repo, "done-1", queue="done")
        claim_next_task(repo, "w", queue="done")
        complete_task(repo, finished.task_id, "w")

        out = json.loads(self._listing(repo, capsys, status="completed"))
        statuses = [item["status"] for item in out["items"]]
        assert statuses == ["completed"]
        # The status filter narrows items only; total still spans the queue.
        assert out["total"] == 2

    def test_filter_by_queue_and_status(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        repo = _make_repo(tmp_path)
        # Complete the billing task first, while it is the only task around.
        billed = create_task(repo, "billing-done", queue="billing")
        claim_next_task(repo, "w", queue="billing")
        complete_task(repo, billed.task_id, "w")
        # Then add one pending billing task and one pending infra task.
        create_task(repo, "billing-pending", queue="billing")
        create_task(repo, "infra-pending", queue="infra")

        out = json.loads(self._listing(repo, capsys, queue="billing", status="completed"))
        assert len(out["items"]) == 1
        match = out["items"][0]
        assert match["queue"] == "billing"
        assert match["status"] == "completed"
        # Global total counts all three tasks.
        assert out["total"] == 3

    def test_filter_by_run_id(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        repo = _make_repo(tmp_path)
        create_task(repo, "t1")
        create_task(repo, "t2")
        claim_next_task(repo, "worker-alpha")
        claim_next_task(repo, "worker-beta")

        out = json.loads(self._listing(repo, capsys, run_id="worker-alpha"))
        assert len(out["items"]) == 1
        assert out["items"][0]["claimer_run_id"] == "worker-alpha"

    def test_enqueue_then_list_shows_created_by(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        repo = _make_repo(tmp_path)
        enqueue_args = _enqueue_ns(title="listed-task", queue="default", run_id="enqueuer-1")
        with _patch_repo(repo):
            run_enqueue(enqueue_args)
        capsys.readouterr()  # discard the enqueue output so only the listing is parsed

        out = json.loads(self._listing(repo, capsys))
        assert len(out["items"]) == 1
        assert out["items"][0]["created_by"] == "enqueuer-1"

    def test_limit_with_filter_shows_highest_priority(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        repo = _make_repo(tmp_path)
        with _freeze(_EPOCH):
            for level in range(10):
                create_task(repo, f"task-{level}", queue="q", priority=level)
        out = json.loads(self._listing(repo, capsys, queue="q", limit=3))
        assert len(out["items"]) == 3
        # Priorities 0..9 were created, so the top three are 9, 8, 7.
        assert [item["priority"] for item in out["items"]] == [9, 8, 7]
4561
4562
class TestTasksStress:
    """Performance and concurrency tests for ``run_tasks``."""

    def test_500_tasks_listed_under_5s(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """Listing 500 tasks reports the full total within five seconds."""
        repo = _make_repo(tmp_path)
        for i in range(500):
            create_task(repo, f"task-{i}", queue="default")
        args = _tasks_ns(limit=500)
        # Only the run_tasks call is timed; task creation is excluded.
        start = time.monotonic()
        with _patch_repo(repo):
            run_tasks(args)
        elapsed = time.monotonic() - start
        out = json.loads(capsys.readouterr().out)
        assert out["total"] == 500
        assert elapsed < 5.0, f"listing 500 tasks took {elapsed:.1f}s"

    def test_500_tasks_with_filter_under_5s(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """A queue-filtered listing over 500 tasks finishes within five seconds."""
        repo = _make_repo(tmp_path)
        for i in range(250):
            create_task(repo, f"billing-{i}", queue="billing")
        for i in range(250):
            create_task(repo, f"infra-{i}", queue="infra")
        args = _tasks_ns(queue="billing", limit=250)
        start = time.monotonic()
        with _patch_repo(repo):
            run_tasks(args)
        elapsed = time.monotonic() - start
        out = json.loads(capsys.readouterr().out)
        assert len(out["items"]) == 250
        assert elapsed < 5.0, f"filtered listing took {elapsed:.1f}s"

    def test_concurrent_reads_are_safe(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """Concurrent run_tasks calls against the same repo must not crash."""
        import concurrent.futures
        repo = _make_repo(tmp_path)
        for i in range(50):
            create_task(repo, f"task-{i}")
        errors: list[str] = []

        def read_tasks(_: int) -> None:
            try:
                run_tasks(_tasks_ns(limit=50))
            except Exception as exc:  # noqa: BLE001
                # Keep the exception type: str(exc) alone can be empty.
                errors.append(f"{type(exc).__name__}: {exc}")

        # Patch once around the whole pool instead of inside every worker.
        # NOTE(review): assumes _patch_repo is a unittest.mock.patch-style
        # manager that swaps a module attribute. Entering and exiting such a
        # patch from several threads at once can restore the wrong object and
        # leave the attribute patched after the test.
        with _patch_repo(repo):
            with concurrent.futures.ThreadPoolExecutor(max_workers=10) as pool:
                list(pool.map(read_tasks, range(20)))

        assert not errors, f"concurrent read errors: {errors}"

    def test_no_double_load_with_filter(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """Verify counts are still correct when filter is active (no re-load bug)."""
        repo = _make_repo(tmp_path)
        for i in range(20):
            create_task(repo, f"billing-{i}", queue="billing")
        for i in range(10):
            create_task(repo, f"infra-{i}", queue="infra")

        args = _tasks_ns(queue="billing")
        with _patch_repo(repo):
            run_tasks(args)
        out = json.loads(capsys.readouterr().out)
        # total must reflect ALL 30 tasks, not just the 20 in billing
        assert out["total"] == 30
        assert len(out["items"]) == 20
4630
4631
4632 # ── register_all integration ───────────────────────────────────────────────────
4633
4634
class TestRegisterAll:
    """register_all attaches all six subcommands to the given subparsers."""

    def test_all_commands_registered(self) -> None:
        """Every expected command name is a registered subparser choice."""
        parser = argparse.ArgumentParser()
        subs = parser.add_subparsers(dest="cmd")
        register_all(subs)
        # The object returned by add_subparsers() exposes the registered
        # command names through ``choices``; no need to reach into the
        # parser's private ``_subparsers._group_actions``.
        for cmd in ("enqueue", "claim", "complete", "fail-task", "cancel-task", "tasks"):
            assert cmd in subs.choices, f"Command '{cmd}' not registered"
4653
4654
4655 # ── Content-addressed task_id ─────────────────────────────────────────────────
4656
4657
class TestTaskIdContentAddressed:
    """task_id must be sha256: of genesis content, not a random UUID."""

    def test_task_id_is_sha256_prefixed(self, tmp_path: pathlib.Path) -> None:
        from muse.core.task_queue import compute_task_id

        task_id = compute_task_id(
            title="render stems",
            queue="audio",
            payload={"track": 1},
            priority=5,
            created_by="agent-x",
        )
        assert task_id.startswith("sha256:"), f"Expected sha256: prefix, got {task_id!r}"
        # 7 characters of "sha256:" plus 64 hex digits.
        assert len(task_id) == 71

    def test_task_id_is_sha256_not_uuid4(self, tmp_path: pathlib.Path) -> None:
        import re
        from muse.core.task_queue import compute_task_id

        uuid4_pattern = re.compile(
            r"^[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$"
        )
        task_id = compute_task_id("render stems", "audio", {}, 0, "agent-x")
        assert uuid4_pattern.match(task_id) is None

    def test_task_id_deterministic(self, tmp_path: pathlib.Path) -> None:
        from muse.core.task_queue import compute_task_id

        first = compute_task_id("render stems", "audio", {"track": 1}, 5, "agent-x")
        second = compute_task_id("render stems", "audio", {"track": 1}, 5, "agent-x")
        assert first == second

    def test_task_id_differs_by_title(self, tmp_path: pathlib.Path) -> None:
        from muse.core.task_queue import compute_task_id

        stems_id = compute_task_id("render stems", "audio", {}, 0, "agent-x")
        export_id = compute_task_id("export wav", "audio", {}, 0, "agent-x")
        assert stems_id != export_id

    def test_task_id_differs_by_queue(self, tmp_path: pathlib.Path) -> None:
        from muse.core.task_queue import compute_task_id

        audio_id = compute_task_id("render stems", "audio", {}, 0, "agent-x")
        midi_id = compute_task_id("render stems", "midi", {}, 0, "agent-x")
        assert audio_id != midi_id

    def test_create_task_produces_sha256_id(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        created = create_task(repo, "process audio", queue="audio", created_by="agent-x")
        assert created.task_id.startswith("sha256:")
        assert len(created.task_id) == 71

    def test_create_task_id_matches_compute(self, tmp_path: pathlib.Path) -> None:
        """``create_task`` assigns the id ``compute_task_id`` yields for the same inputs."""
        from muse.core.task_queue import compute_task_id

        repo = _make_repo(tmp_path)
        created = create_task(
            repo,
            "process audio",
            queue="audio",
            payload={"track": 3},
            priority=2,
            created_by="agent-x",
        )
        recomputed = compute_task_id(
            title="process audio",
            queue="audio",
            payload={"track": 3},
            priority=2,
            created_by="agent-x",
        )
        assert created.task_id == recomputed
4725
4726
4727 # ---------------------------------------------------------------------------
4728 # Flag registration
4729 # ---------------------------------------------------------------------------
4730
4731
class TestRegisterFlags:
    """Parsing of the ``enqueue`` JSON flags (``--json`` / ``-j``) into ``json_out``."""

    def _parse_enqueue(self, *args: str) -> "argparse.Namespace":
        """Parse ``enqueue <args...>`` with a parser holding only that subcommand."""
        from muse.cli.commands.task_queue import register_enqueue

        parser = argparse.ArgumentParser()
        subparsers = parser.add_subparsers()
        register_enqueue(subparsers)
        argv = ["enqueue", *args]
        return parser.parse_args(argv)

    def test_default_json_out_is_false(self) -> None:
        parsed = self._parse_enqueue("test-task", "--run-id", "orch")
        assert parsed.json_out is False

    def test_json_flag_sets_json_out(self) -> None:
        parsed = self._parse_enqueue("test-task", "--run-id", "orch", "--json")
        assert parsed.json_out is True

    def test_j_shorthand_sets_json_out(self) -> None:
        parsed = self._parse_enqueue("test-task", "--run-id", "orch", "-j")
        assert parsed.json_out is True
File History 4 commits
sha256:81ae324db5ad375fbfe4834c6fcb378312cafad3cc92dec5d3e5c427306621a2 fix: remove commit_exists filter from have anchors — server… Sonnet 4.6 patch 21 days ago
sha256:36c3cb3e76619d4c30a6d9bf81b5ec4ff148e30dcfed913e3114ca7b43b81c7e fix: rename objects→blobs in push client and all stale test… Sonnet 4.6 patch 22 days ago
sha256:c06a9b9b9fee26c68ea725b44d54b2c0a171301ce9de746d5b656617b4463a9a fix: repair four test failures from post-migration audit Sonnet 4.6 patch 28 days ago
sha256:1900655993c83c4107067375548a7be823e471d2515830842f1a12cba4bd3cdf fix: unified object store migration — idempotent writes, JS… Sonnet 4.6 minor 29 days ago