"""Tests for muse coord task-queue: core + CLI (enqueue / claim / complete / fail-task / cancel-task / tasks). Coverage goals -------------- * Unit — every public function in ``muse.core.task_queue`` * Integration — full lifecycle: enqueue → claim → complete/fail/cancel * CLI — all six CLI subcommands via argparse dispatch and stdout capture * Security — UUID validation, path traversal, ANSI injection, oversized inputs * Stress — concurrent claiming correctness, large queue scanning Test conventions ---------------- * Every test receives a fresh ``tmp_path``-based repo fixture. * Time is frozen via ``unittest.mock.patch`` on ``muse.core.task_queue._now_utc`` wherever predictable timestamps are required. * CLI dispatch calls ``run_*`` directly (no subprocess overhead) with a ``argparse.Namespace`` assembled by hand, capturing stdout/stderr via ``capsys``. """ from __future__ import annotations import argparse import datetime import json import os import pathlib import itertools import threading import time from collections.abc import Generator from contextlib import AbstractContextManager from unittest.mock import MagicMock, patch from muse.core.types import MsgpackValue, content_hash, long_id from muse.core.paths import muse_dir _id_seq = itertools.count() def _new_id() -> str: return content_hash({"seq": next(_id_seq)}) import pytest from muse.core.task_queue import ( ClaimRecord, TaskRecord, _claims_dir, _tasks_dir, _try_excl_claim, _try_optimistic_reclaim, _validate_queue_name, _validate_task_id, cancel_task, claim_next_task, complete_task, create_task, ensure_task_dirs, fail_task, get_task_status, heartbeat_claim, load_all_claims, load_all_tasks, load_claim, load_task, ) from muse.cli.commands.task_queue import ( register_all, run_cancel_task, run_claim, run_complete, run_enqueue, run_fail_task, run_tasks, ) # ── Fixtures ────────────────────────────────────────────────────────────────── UTC = datetime.timezone.utc _EPOCH = datetime.datetime(2025, 6, 1, 12, 0, 
0, tzinfo=UTC) VALID_ID = long_id("a" * 64) VALID_ID2 = long_id("b" * 64) def _make_repo(tmp_path: pathlib.Path) -> pathlib.Path: """Return a minimal muse repo root with a ``.muse/`` directory.""" dot_muse = muse_dir(tmp_path) dot_muse.mkdir(parents=True) return tmp_path def _freeze(ts: datetime.datetime) -> AbstractContextManager[MagicMock]: """Context manager: freeze ``muse.core.task_queue._now_utc`` to *ts*.""" return patch("muse.core.task_queue._now_utc", return_value=ts) def _namespace(**kwargs: MsgpackValue) -> argparse.Namespace: """Build an ``argparse.Namespace`` with sane defaults for CLI tests.""" defaults = { "json_out": True, "run_id": "agent-1", "queue": None, "title": "Test task", "priority": 0, "ttl_seconds": 86400, "payload": "{}", "tags": "", "claim_ttl": 3600, "wait": 0, "task_id": VALID_ID, "result": "{}", "error": "", "force": False, "status": None, "limit": 200, } defaults.update(kwargs) return argparse.Namespace(**defaults) # ── Validation ───────────────────────────────────────────────────────────────── class TestValidateTaskId: """_validate_task_id must accept well-formed sha256: IDs and reject everything else.""" def test_accepts_valid_sha256_id(self) -> None: _validate_task_id(VALID_ID) # no exception def test_rejects_empty(self) -> None: with pytest.raises(ValueError): _validate_task_id("") def test_rejects_non_sha256(self) -> None: with pytest.raises(ValueError): _validate_task_id("not-a-sha256-id") def test_rejects_path_traversal(self) -> None: with pytest.raises(ValueError): _validate_task_id("../../etc/passwd") def test_rejects_null_bytes(self) -> None: with pytest.raises(ValueError): _validate_task_id("\x00" * 36) def test_rejects_uuid4_format(self) -> None: with pytest.raises(ValueError): _validate_task_id("12345678-1234-4abc-8abc-1234567890ab") def test_rejects_sha256_with_slash(self) -> None: with pytest.raises(ValueError): _validate_task_id(long_id("a" * 63 + "/")) class TestValidateQueueName: """_validate_queue_name must accept 
valid names and reject bad ones.""" def test_accepts_simple(self) -> None: _validate_queue_name("default") _validate_queue_name("billing-queue") _validate_queue_name("Agent_123") def test_rejects_empty(self) -> None: with pytest.raises(ValueError, match="non-empty"): _validate_queue_name("") def test_rejects_space(self) -> None: with pytest.raises(ValueError): _validate_queue_name("queue name") def test_rejects_slash(self) -> None: with pytest.raises(ValueError): _validate_queue_name("../../etc") def test_rejects_null_byte(self) -> None: with pytest.raises(ValueError): _validate_queue_name("queue\x00name") def test_rejects_too_long(self) -> None: with pytest.raises(ValueError, match="too long"): _validate_queue_name("q" * 65) def test_accepts_max_length(self) -> None: _validate_queue_name("q" * 64) # ── TaskRecord ───────────────────────────────────────────────────────────────── class TestTaskRecord: """TaskRecord serialisation round-trip and is_expired logic.""" def _make(self, **kwargs: MsgpackValue) -> TaskRecord: defaults = dict( task_id=VALID_ID, title="A task", payload={"x": 1}, priority=0, queue="default", created_at=_EPOCH, created_by="orchestrator", ttl_seconds=3600, tags=["a", "b"], ) defaults.update(kwargs) return TaskRecord(**defaults) def test_to_dict_round_trip(self) -> None: t = self._make() d = t.to_dict() t2 = TaskRecord.from_dict(d) assert t2.task_id == t.task_id assert t2.title == t.title assert t2.priority == t.priority assert t2.queue == t.queue assert t2.tags == t.tags assert t2.payload == t.payload def test_is_expired_false_within_ttl(self) -> None: t = self._make() now = _EPOCH + datetime.timedelta(seconds=3599) assert t.is_expired(now) is False def test_is_expired_true_at_boundary(self) -> None: t = self._make() now = _EPOCH + datetime.timedelta(seconds=3600) assert t.is_expired(now) is True def test_from_dict_missing_created_at_defaults_to_now(self) -> None: d = {"task_id": VALID_ID, "title": "x"} t = TaskRecord.from_dict(d) assert 
isinstance(t.created_at, datetime.datetime) def test_title_truncated_at_256(self) -> None: d = {"task_id": VALID_ID, "title": "x" * 300, "created_at": _EPOCH.isoformat()} t = TaskRecord.from_dict(d) assert len(t.title) <= 256 # ── ClaimRecord ──────────────────────────────────────────────────────────────── class TestClaimRecord: """ClaimRecord serialisation round-trip and is_expired logic.""" def _make(self, **kwargs: MsgpackValue) -> ClaimRecord: defaults = dict( task_id=VALID_ID, claimer_run_id="agent-1", claimed_at=_EPOCH, expires_at=_EPOCH + datetime.timedelta(hours=1), status="claimed", heartbeat_at=_EPOCH, claim_nonce=_new_id(), result=None, error=None, ) defaults.update(kwargs) return ClaimRecord(**defaults) def test_to_dict_round_trip(self) -> None: c = self._make() d = c.to_dict() c2 = ClaimRecord.from_dict(d) assert c2.task_id == c.task_id assert c2.claimer_run_id == c.claimer_run_id assert c2.status == c.status assert c2.claim_nonce == c.claim_nonce def test_is_expired_false(self) -> None: c = self._make() assert c.is_expired(_EPOCH) is False def test_is_expired_true(self) -> None: c = self._make() assert c.is_expired(_EPOCH + datetime.timedelta(hours=2)) is True # ── get_task_status ──────────────────────────────────────────────────────────── class TestGetTaskStatus: """Derives correct status from (task, claim, now) triple.""" def _task(self) -> TaskRecord: return TaskRecord( task_id=VALID_ID, title="t", payload={}, priority=0, queue="default", created_at=_EPOCH, created_by="x", ttl_seconds=3600, tags=[], ) def _claim(self, **kw: MsgpackValue) -> ClaimRecord: defaults = dict( task_id=VALID_ID, claimer_run_id="a", claimed_at=_EPOCH, expires_at=_EPOCH + datetime.timedelta(hours=1), status="claimed", heartbeat_at=_EPOCH, claim_nonce="nonce", result=None, error=None, ) defaults.update(kw) return ClaimRecord(**defaults) def test_no_claim_is_pending(self) -> None: assert get_task_status(self._task(), None, _EPOCH) == "pending" def 
test_active_claim_is_claimed(self) -> None: c = self._claim() assert get_task_status(self._task(), c, _EPOCH) == "claimed" def test_expired_claim_is_timed_out(self) -> None: c = self._claim() now = _EPOCH + datetime.timedelta(hours=2) assert get_task_status(self._task(), c, now) == "timed_out" def test_completed_status_passes_through(self) -> None: c = self._claim(status="completed") assert get_task_status(self._task(), c, _EPOCH) == "completed" def test_failed_status_passes_through(self) -> None: c = self._claim(status="failed") assert get_task_status(self._task(), c, _EPOCH) == "failed" def test_cancelled_status_passes_through(self) -> None: c = self._claim(status="cancelled") assert get_task_status(self._task(), c, _EPOCH) == "cancelled" # ── ensure_task_dirs ─────────────────────────────────────────────────────────── class TestEnsureTaskDirs: """ensure_task_dirs creates both directories idempotently.""" def test_creates_directories(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) ensure_task_dirs(repo) assert _tasks_dir(repo).is_dir() assert _claims_dir(repo).is_dir() def test_idempotent(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) ensure_task_dirs(repo) ensure_task_dirs(repo) # must not raise # ── create_task ──────────────────────────────────────────────────────────────── class TestCreateTask: """create_task validates inputs and persists a TaskRecord.""" def test_creates_file_on_disk(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) with _freeze(_EPOCH): t = create_task(repo, "Do X") task_file = _tasks_dir(repo) / f"{t.task_id}.json" assert task_file.is_file() def test_returns_correct_fields(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) with _freeze(_EPOCH): t = create_task( repo, "Deploy service", payload={"env": "prod"}, priority=5, queue="deploy", ttl_seconds=7200, created_by="ops", tags=["prod", "critical"], ) assert t.title == "Deploy service" assert t.priority == 5 assert 
t.queue == "deploy" assert t.ttl_seconds == 7200 assert t.created_by == "ops" assert "prod" in t.tags assert t.payload == {"env": "prod"} def test_file_is_valid_json(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) t = create_task(repo, "Validate JSON") content = (_tasks_dir(repo) / f"{t.task_id}.json").read_text() d = json.loads(content) assert d["task_id"] == t.task_id def test_empty_title_raises(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) with pytest.raises(ValueError, match="non-empty"): create_task(repo, "") def test_invalid_queue_raises(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) with pytest.raises(ValueError): create_task(repo, "x", queue="bad queue!") def test_tags_capped_at_32(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) t = create_task(repo, "Lots of tags", tags=[f"tag{i}" for i in range(50)]) assert len(t.tags) == 32 def test_ttl_min_1(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) t = create_task(repo, "Short TTL", ttl_seconds=0) assert t.ttl_seconds >= 1 # ── load_all_tasks / load_task ───────────────────────────────────────────────── class TestLoadTasks: """Scanning and loading task records from the tasks directory.""" def test_empty_dir_returns_empty_list(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) assert load_all_tasks(repo) == [] def test_non_existent_dir_returns_empty_list(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) assert load_all_tasks(repo) == [] def test_loads_created_task(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) t = create_task(repo, "A task") tasks = load_all_tasks(repo) assert len(tasks) == 1 assert tasks[0].task_id == t.task_id def test_skips_corrupt_file(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) ensure_task_dirs(repo) corrupt = _tasks_dir(repo) / f"{VALID_ID}.json" corrupt.write_text("NOT JSON") tasks = load_all_tasks(repo) assert tasks == 
[] def test_load_task_by_id(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) t = create_task(repo, "Named task") loaded = load_task(repo, t.task_id) assert loaded is not None assert loaded.title == "Named task" def test_load_task_missing_returns_none(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) ensure_task_dirs(repo) assert load_task(repo, VALID_ID) is None def test_load_task_invalid_id_raises(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) with pytest.raises(ValueError): load_task(repo, "not-a-content-id") # ── _try_excl_claim ──────────────────────────────────────────────────────────── class TestTryExclClaim: """O_CREAT|O_EXCL atomic claiming primitive.""" def test_first_claim_succeeds(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) ensure_task_dirs(repo) with _freeze(_EPOCH): result = _try_excl_claim(repo, VALID_ID, "agent-1", _EPOCH, 3600) assert result is not None assert result.claimer_run_id == "agent-1" assert result.status == "claimed" def test_second_claim_returns_none(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) ensure_task_dirs(repo) with _freeze(_EPOCH): first = _try_excl_claim(repo, VALID_ID, "agent-1", _EPOCH, 3600) second = _try_excl_claim(repo, VALID_ID, "agent-2", _EPOCH, 3600) assert first is not None assert second is None def test_claim_file_is_written(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) ensure_task_dirs(repo) with _freeze(_EPOCH): _try_excl_claim(repo, VALID_ID, "agent-1", _EPOCH, 3600) claim_file = _claims_dir(repo) / f"{VALID_ID}.json" assert claim_file.is_file() def test_claim_file_is_valid_json(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) ensure_task_dirs(repo) with _freeze(_EPOCH): claim = _try_excl_claim(repo, VALID_ID, "agent-1", _EPOCH, 3600) content = (_claims_dir(repo) / f"{VALID_ID}.json").read_text() d = json.loads(content) assert d["claim_nonce"] == claim.claim_nonce def 
test_expires_at_correct(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) ensure_task_dirs(repo) with _freeze(_EPOCH): claim = _try_excl_claim(repo, VALID_ID, "agent-1", _EPOCH, 7200) expected = _EPOCH + datetime.timedelta(seconds=7200) assert claim.expires_at == expected # ── _try_optimistic_reclaim ──────────────────────────────────────────────────── class TestTryOptimisticReclaim: """Reclaim timed-out tasks via atomic rename + nonce verification.""" def test_reclaim_wins_when_no_competition(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) ensure_task_dirs(repo) # First claim with _freeze(_EPOCH): _try_excl_claim(repo, VALID_ID, "agent-1", _EPOCH, 1) # Now reclaim at t+2 (expired) now = _EPOCH + datetime.timedelta(seconds=2) with _freeze(now): result = _try_optimistic_reclaim(repo, VALID_ID, "agent-2", now, 3600) assert result is not None assert result.claimer_run_id == "agent-2" def test_nonce_written_to_file(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) ensure_task_dirs(repo) with _freeze(_EPOCH): _try_excl_claim(repo, VALID_ID, "agent-1", _EPOCH, 1) now = _EPOCH + datetime.timedelta(seconds=2) with _freeze(now): result = _try_optimistic_reclaim(repo, VALID_ID, "agent-2", now, 3600) if result: # may fail under extreme race — just check if written content = json.loads((_claims_dir(repo) / f"{VALID_ID}.json").read_text()) assert content["claim_nonce"] == result.claim_nonce # ── claim_next_task ──────────────────────────────────────────────────────────── class TestClaimNextTask: """High-level claim_next_task: priority ordering, queue filtering, expiry re-claim.""" def test_returns_none_on_empty_queue(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) with _freeze(_EPOCH): result = claim_next_task(repo, "agent-1") assert result is None def test_claims_only_task(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) with _freeze(_EPOCH): t = create_task(repo, "Only task") result = 
claim_next_task(repo, "agent-1") assert result is not None task, claim = result assert task.task_id == t.task_id assert claim.claimer_run_id == "agent-1" def test_higher_priority_claimed_first(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) with _freeze(_EPOCH): low = create_task(repo, "Low priority", priority=0) high = create_task(repo, "High priority", priority=10) result = claim_next_task(repo, "agent-1") assert result is not None task, _ = result assert task.task_id == high.task_id def test_fifo_within_same_priority(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) with _freeze(_EPOCH): first = create_task(repo, "First task", priority=5) with _freeze(_EPOCH + datetime.timedelta(seconds=1)): _second = create_task(repo, "Second task", priority=5) with _freeze(_EPOCH + datetime.timedelta(seconds=2)): result = claim_next_task(repo, "agent-1") assert result is not None task, _ = result assert task.task_id == first.task_id def test_queue_filter_respected(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) with _freeze(_EPOCH): billing = create_task(repo, "Billing job", queue="billing") _ops = create_task(repo, "Ops job", queue="ops") result = claim_next_task(repo, "agent-1", queue="billing") assert result is not None task, _ = result assert task.task_id == billing.task_id def test_queue_filter_returns_none_when_no_match(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) with _freeze(_EPOCH): _ops = create_task(repo, "Ops job", queue="ops") result = claim_next_task(repo, "agent-1", queue="billing") assert result is None def test_already_claimed_task_not_re_claimed(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) with _freeze(_EPOCH): _t = create_task(repo, "Unique task") claim_next_task(repo, "agent-1") result2 = claim_next_task(repo, "agent-2") assert result2 is None def test_expired_task_ttl_skipped(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) with 
_freeze(_EPOCH): _t = create_task(repo, "Expired task", ttl_seconds=10) future = _EPOCH + datetime.timedelta(seconds=20) with _freeze(future): result = claim_next_task(repo, "agent-1") assert result is None def test_reclaims_timed_out_task(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) with _freeze(_EPOCH): _t = create_task(repo, "Timed-out task", ttl_seconds=86400) claim_next_task(repo, "agent-1", claim_ttl_seconds=10) # Advance past claim TTL future = _EPOCH + datetime.timedelta(seconds=20) with _freeze(future): result = claim_next_task(repo, "agent-2", claim_ttl_seconds=3600) assert result is not None _, claim = result assert claim.claimer_run_id == "agent-2" # ── complete_task ────────────────────────────────────────────────────────────── class TestCompleteTask: """complete_task updates status, validates ownership.""" def _enqueue_and_claim(self, repo: pathlib.Path, run_id: str = "agent-1") -> tuple[TaskRecord, ClaimRecord]: t = create_task(repo, "Task") with _freeze(_EPOCH): result = claim_next_task(repo, run_id) assert result is not None return result def test_completes_successfully(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) task, _claim = self._enqueue_and_claim(repo) claim = complete_task(repo, task.task_id, "agent-1", result={"pr": 42}) assert claim.status == "completed" assert claim.result == {"pr": 42} def test_persisted_to_disk(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) task, _claim = self._enqueue_and_claim(repo) complete_task(repo, task.task_id, "agent-1") disk = json.loads((_claims_dir(repo) / f"{task.task_id}.json").read_text()) assert disk["status"] == "completed" def test_wrong_run_id_raises_permission_error(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) task, _claim = self._enqueue_and_claim(repo) with pytest.raises(PermissionError): complete_task(repo, task.task_id, "impostor") def test_invalid_task_id_raises_value_error(self, tmp_path: pathlib.Path) -> None: 
repo = _make_repo(tmp_path) with pytest.raises(ValueError): complete_task(repo, "not-a-content-id", "agent-1") def test_missing_task_raises_file_not_found(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) ensure_task_dirs(repo) with pytest.raises(FileNotFoundError): complete_task(repo, VALID_ID, "agent-1") def test_double_complete_raises_runtime_error(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) task, _claim = self._enqueue_and_claim(repo) complete_task(repo, task.task_id, "agent-1") with pytest.raises(RuntimeError): complete_task(repo, task.task_id, "agent-1") # ── fail_task ────────────────────────────────────────────────────────────────── class TestFailTask: """fail_task updates status to failed with an error message.""" def test_fails_successfully(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) t = create_task(repo, "Doomed task") with _freeze(_EPOCH): claim_next_task(repo, "agent-1") claim = fail_task(repo, t.task_id, "agent-1", error="timeout after 30s") assert claim.status == "failed" assert claim.error == "timeout after 30s" def test_wrong_claimer_raises(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) t = create_task(repo, "Doomed task") with _freeze(_EPOCH): claim_next_task(repo, "agent-1") with pytest.raises(PermissionError): fail_task(repo, t.task_id, "agent-2", error="oops") def test_already_failed_raises(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) t = create_task(repo, "Doomed task") with _freeze(_EPOCH): claim_next_task(repo, "agent-1") fail_task(repo, t.task_id, "agent-1", error="first failure") with pytest.raises(RuntimeError): fail_task(repo, t.task_id, "agent-1", error="second failure") # ── cancel_task ──────────────────────────────────────────────────────────────── class TestCancelTask: """cancel_task handles pending, claimed, and force-cancel cases.""" def test_cancel_pending_task(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) 
t = create_task(repo, "Unneeded task") claim = cancel_task(repo, t.task_id, "orchestrator") assert claim.status == "cancelled" def test_cancel_claimed_by_claimer(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) t = create_task(repo, "Running task") with _freeze(_EPOCH): claim_next_task(repo, "agent-1") claim = cancel_task(repo, t.task_id, "agent-1") assert claim.status == "cancelled" def test_cancel_claimed_by_non_claimer_raises(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) t = create_task(repo, "Running task") with _freeze(_EPOCH): claim_next_task(repo, "agent-1") with pytest.raises(PermissionError): cancel_task(repo, t.task_id, "agent-2") def test_force_cancel_overrides_ownership(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) t = create_task(repo, "Running task") with _freeze(_EPOCH): claim_next_task(repo, "agent-1") claim = cancel_task(repo, t.task_id, "orchestrator", force=True) assert claim.status == "cancelled" def test_cancel_nonexistent_task_raises(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) ensure_task_dirs(repo) with pytest.raises(FileNotFoundError): cancel_task(repo, VALID_ID, "orchestrator") def test_cancel_completed_raises(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) t = create_task(repo, "Done task") with _freeze(_EPOCH): claim_next_task(repo, "agent-1") complete_task(repo, t.task_id, "agent-1") with pytest.raises(RuntimeError, match="terminal"): cancel_task(repo, t.task_id, "agent-1") def test_invalid_id_raises(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) with pytest.raises(ValueError): cancel_task(repo, "not-valid", "agent") # ── heartbeat_claim ──────────────────────────────────────────────────────────── class TestHeartbeatClaim: """heartbeat_claim extends expires_at and updates heartbeat_at.""" def test_extends_expiry(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) t = create_task(repo, "Long running 
task") with _freeze(_EPOCH): claim_next_task(repo, "agent-1", claim_ttl_seconds=3600) now = _EPOCH + datetime.timedelta(seconds=1800) with _freeze(now): claim = heartbeat_claim(repo, t.task_id, "agent-1", extension_seconds=7200) assert claim.expires_at == now + datetime.timedelta(seconds=7200) assert claim.heartbeat_at == now def test_wrong_claimer_raises(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) t = create_task(repo, "Task") with _freeze(_EPOCH): claim_next_task(repo, "agent-1") with pytest.raises(PermissionError): heartbeat_claim(repo, t.task_id, "agent-2") def test_no_claim_raises(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) ensure_task_dirs(repo) # Create task file but no claim create_task(repo, "Unclaimed") t = load_all_tasks(repo)[0] with pytest.raises(FileNotFoundError): heartbeat_claim(repo, t.task_id, "agent-1") def test_heartbeat_after_complete_raises(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) t = create_task(repo, "Done task") with _freeze(_EPOCH): claim_next_task(repo, "agent-1") complete_task(repo, t.task_id, "agent-1") with pytest.raises(RuntimeError): heartbeat_claim(repo, t.task_id, "agent-1") # ── Full lifecycle integration ───────────────────────────────────────────────── class TestFullLifecycle: """End-to-end: enqueue → claim → heartbeat → complete/fail/cancel.""" def test_enqueue_claim_complete(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) with _freeze(_EPOCH): t = create_task(repo, "E2E task", payload={"op": "refactor"}, priority=3) result = claim_next_task(repo, "agent-1", claim_ttl_seconds=300) assert result is not None task, claim = result assert task.task_id == t.task_id assert claim.status == "claimed" claim = complete_task(repo, task.task_id, "agent-1", result={"status": "ok"}) assert claim.status == "completed" # Second claim attempt after completion → queue empty with _freeze(_EPOCH + datetime.timedelta(seconds=1)): result2 = 
claim_next_task(repo, "agent-2") assert result2 is None def test_enqueue_claim_fail_then_reclaim(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) with _freeze(_EPOCH): t = create_task(repo, "Failing task", ttl_seconds=86400) claim_next_task(repo, "agent-1", claim_ttl_seconds=10) fail_task(repo, t.task_id, "agent-1", error="network timeout") # After failure, task is done — no re-claim with _freeze(_EPOCH + datetime.timedelta(seconds=30)): result = claim_next_task(repo, "agent-2") assert result is None # failed tasks are not re-claimable def test_heartbeat_prevents_expiry(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) with _freeze(_EPOCH): t = create_task(repo, "Long job", ttl_seconds=86400) claim_next_task(repo, "agent-1", claim_ttl_seconds=10) # Heartbeat before expiry with _freeze(_EPOCH + datetime.timedelta(seconds=5)): heartbeat_claim(repo, t.task_id, "agent-1", extension_seconds=100) # Even at t+60, claim is still active due to heartbeat with _freeze(_EPOCH + datetime.timedelta(seconds=60)): result = claim_next_task(repo, "agent-2") assert result is None # agent-1 still holds valid claim def test_multi_task_priority_and_fifo(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) claimed_ids = [] with _freeze(_EPOCH): t1 = create_task(repo, "Priority 1, time 0", priority=1) with _freeze(_EPOCH + datetime.timedelta(seconds=1)): t2 = create_task(repo, "Priority 5, time 1", priority=5) with _freeze(_EPOCH + datetime.timedelta(seconds=2)): t3 = create_task(repo, "Priority 5, time 2", priority=5) with _freeze(_EPOCH + datetime.timedelta(seconds=3)): t4 = create_task(repo, "Priority 0, time 3", priority=0) expected_order = [t2.task_id, t3.task_id, t1.task_id, t4.task_id] for i in range(4): with _freeze(_EPOCH + datetime.timedelta(seconds=10 + i)): r = claim_next_task(repo, f"agent-{i}") assert r is not None claimed_ids.append(r[0].task_id) assert claimed_ids == expected_order # ── Security tests 
class TestSecurity:
    """Ensures malicious inputs cannot escape the coordination directory."""

    def test_path_traversal_in_task_id_load_task(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        ensure_task_dirs(repo)
        with pytest.raises(ValueError):
            load_task(repo, "../../etc/passwd")

    def test_path_traversal_in_task_id_load_claim(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        ensure_task_dirs(repo)
        with pytest.raises(ValueError):
            load_claim(repo, "../../shadow")

    def test_path_traversal_in_complete(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        with pytest.raises(ValueError):
            complete_task(repo, "../../etc/passwd", "agent")

    def test_path_traversal_in_fail(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        with pytest.raises(ValueError):
            fail_task(repo, "../../../../etc/shadow", "agent")

    def test_path_traversal_in_cancel(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        with pytest.raises(ValueError):
            cancel_task(repo, "../../../harm", "agent")

    def test_path_traversal_in_heartbeat(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        with pytest.raises(ValueError):
            heartbeat_claim(repo, "../../secret", "agent")

    def test_null_byte_in_task_id(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        # Legacy UUID-shaped input (rejected for its format alone) ...
        with pytest.raises(ValueError):
            load_task(repo, "12345678-1234-4abc-8abc-1234567890\x00")
        # ... and an otherwise well-formed content ID whose last character is
        # a NUL, so the rejection is attributable to the NUL byte itself.
        with pytest.raises(ValueError):
            load_task(repo, VALID_ID[:-1] + "\x00")

    def test_oversized_title_is_truncated(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        t = create_task(repo, "X" * 1000)
        assert len(t.title) <= 256

    def test_oversized_queue_raises(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        with pytest.raises(ValueError):
            create_task(repo, "Task", queue="q" * 65)

    def test_ansi_injection_in_title_stored_verbatim(self, tmp_path: pathlib.Path) -> None:
        """Title is stored as-is but sanitized at display time (not at persist time)."""
        repo = _make_repo(tmp_path)
        ansi_title = "\x1b[31mRED\x1b[0m"
        t = create_task(repo, ansi_title)
        loaded = load_task(repo, t.task_id)
        assert loaded is not None
        # The raw title is preserved for correctness; display layer sanitizes it.
        assert loaded.title == ansi_title


# ── Concurrent claiming stress tests ──────────────────────────────────────────


class TestConcurrentClaiming:
    """Multiple threads compete for the same task(s) — no task is ever claimed twice."""

    @staticmethod
    def _run_claimers(repo: pathlib.Path, n: int) -> list[tuple[str, str]]:
        """Run *n* threads that each call ``claim_next_task`` exactly once.

        All workers are released together by a barrier so they genuinely
        contend, instead of early threads finishing before later ones start.

        Returns:
            ``(task_id, agent_id)`` for every successful claim.

        Raises:
            The first exception raised inside any worker, so failures are not
            silently lost inside the threads.
        """
        wins: list[tuple[str, str]] = []
        errors: list[Exception] = []
        lock = threading.Lock()
        barrier = threading.Barrier(n)

        def worker(agent_id: str) -> None:
            try:
                # Timeout keeps the test from hanging if a sibling never arrives.
                barrier.wait(timeout=30)
                result = claim_next_task(repo, agent_id)
            except Exception as exc:  # re-raised in the main thread below
                with lock:
                    errors.append(exc)
                return
            if result is not None:
                task, _claim = result
                with lock:
                    wins.append((task.task_id, agent_id))

        threads = [threading.Thread(target=worker, args=(f"agent-{i}",)) for i in range(n)]
        for th in threads:
            th.start()
        for th in threads:
            th.join()
        if errors:
            raise errors[0]
        return wins

    def test_exactly_one_winner_from_n_threads(self, tmp_path: pathlib.Path) -> None:
        """N threads all call claim_next_task concurrently — exactly one wins."""
        repo = _make_repo(tmp_path)
        create_task(repo, "Race task")
        winners = [agent_id for _task_id, agent_id in self._run_claimers(repo, 20)]
        assert len(winners) == 1, f"Expected 1 winner, got {len(winners)}: {winners}"

    def test_n_tasks_claimed_by_n_agents_no_duplicates(self, tmp_path: pathlib.Path) -> None:
        """N tasks, N agents — no task is claimed by more than one agent."""
        repo = _make_repo(tmp_path)
        n = 10
        tasks = [create_task(repo, f"Task {i}", priority=i) for i in range(n)]
        all_task_ids = {t.task_id for t in tasks}

        # A list (not a dict keyed by task_id) so duplicate claims stay visible.
        claims = self._run_claimers(repo, n)
        claimed_ids = [task_id for task_id, _agent_id in claims]

        assert claimed_ids, "At least one agent must win a claim"
        assert len(claimed_ids) == len(set(claimed_ids)), f"A task was claimed more than once: {claims}"
        assert set(claimed_ids) <= all_task_ids
        # The persisted claim must name the agent that was told it won.
        for task_id, agent_id in claims:
            persisted = load_claim(repo, task_id)
            assert persisted is not None
            assert persisted.claimer_run_id == agent_id


# ── Stress tests ───────────────────────────────────────────────────────────────
class TestStress:
    """Performance smoke tests: every scenario must be correct *and* finish fast."""

    @staticmethod
    def _seconds_since(t0: float) -> float:
        """Return seconds elapsed since the ``time.monotonic()`` reading *t0*."""
        return time.monotonic() - t0

    def test_enqueue_500_tasks(self, tmp_path: pathlib.Path) -> None:
        """Creating 500 tasks must take under 10 s on reasonable hardware."""
        repo = _make_repo(tmp_path)
        t0 = time.monotonic()
        for n in range(500):
            create_task(repo, f"Stress task {n}", priority=n % 10, queue="stress")
        took = self._seconds_since(t0)
        assert took < 10.0, f"Enqueue 500 tasks took {took:.2f}s"
        assert len(load_all_tasks(repo)) == 500

    def test_claim_scan_500_tasks(self, tmp_path: pathlib.Path) -> None:
        """A single claim against a 500-task queue must resolve in under 5 s."""
        repo = _make_repo(tmp_path)
        for n in range(500):
            create_task(repo, f"Task {n}", priority=n % 10)
        t0 = time.monotonic()
        claimed = claim_next_task(repo, "agent-1")
        took = self._seconds_since(t0)
        assert claimed is not None
        assert took < 5.0, f"Claim scan took {took:.2f}s"

    def test_load_all_tasks_1000(self, tmp_path: pathlib.Path) -> None:
        """Scanning 1000 persisted tasks must take under 5 s."""
        repo = _make_repo(tmp_path)
        for n in range(1000):
            create_task(repo, f"Task {n}")
        t0 = time.monotonic()
        loaded = load_all_tasks(repo)
        took = self._seconds_since(t0)
        assert len(loaded) == 1000
        assert took < 5.0, f"load_all_tasks 1000 took {took:.2f}s"


# ── CLI unit tests ─────────────────────────────────────────────────────────────
# All CLI tests use ``require_repo`` patched to return the temp repo root.
def _patch_repo(repo: pathlib.Path) -> AbstractContextManager[MagicMock]: return patch("muse.cli.commands.task_queue.require_repo", return_value=repo) class TestCliEnqueue: """run_enqueue: arg parsing, JSON output, text output, error paths.""" def test_enqueue_json_output(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) args = _namespace(title="Enqueue test", json_out=True, priority=3, queue="q", tags="a,b") with _patch_repo(repo): run_enqueue(args) out = json.loads(capsys.readouterr().out) assert out["title"] == "Enqueue test" assert out["priority"] == 3 assert out["queue"] == "q" assert "a" in out["tags"] def test_enqueue_text_output(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) args = _namespace(title="Text enqueue", json_out=False, priority=0, queue="default", tags="", payload="{}") with _patch_repo(repo): run_enqueue(args) out = capsys.readouterr().out assert "Task enqueued" in out def test_enqueue_invalid_payload_exits_1(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) args = _namespace(payload="not-json") with _patch_repo(repo): with pytest.raises(SystemExit) as exc: run_enqueue(args) assert exc.value.code == 1 def test_enqueue_payload_not_object_exits_1(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) args = _namespace(payload="[1,2,3]") with _patch_repo(repo): with pytest.raises(SystemExit) as exc: run_enqueue(args) assert exc.value.code == 1 def test_enqueue_invalid_queue_exits_1(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) args = _namespace(queue="bad queue!") with _patch_repo(repo): with pytest.raises(SystemExit) as exc: run_enqueue(args) assert exc.value.code == 1 def test_enqueue_elapsed_included_in_json(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) args = _namespace(json_out=True, queue="default") with _patch_repo(repo): 
run_enqueue(args) out = json.loads(capsys.readouterr().out) assert "duration_ms" in out # ── New: enqueue input validation ───────────────────────────────────────────── from muse.cli.commands.task_queue import ( _MAX_PAYLOAD_BYTES, _MAX_RUN_ID_LEN, _MAX_QUEUE_LEN, _MAX_TAGS, _MAX_TAG_LEN, _MAX_TITLE_LEN, ) from muse.core.errors import ExitCode def _enqueue_ns(**kwargs: MsgpackValue) -> argparse.Namespace: """Build a Namespace with enqueue-appropriate defaults (queue='default').""" defaults = { "json_out": True, "run_id": "agent-1", "queue": "default", "title": "Test task", "priority": 0, "ttl_seconds": 86400, "payload": "{}", "tags": "", } defaults.update(kwargs) return argparse.Namespace(**defaults) class TestEnqueueInputValidation: """All enqueue validation fires before require_repo() and returns exit 1.""" def test_empty_title_exits_1(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) args = _enqueue_ns(title="", json_out=True) with _patch_repo(repo): with pytest.raises(SystemExit) as exc: run_enqueue(args) assert exc.value.code == ExitCode.USER_ERROR def test_whitespace_only_title_exits_1(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) args = _enqueue_ns(title=" ", json_out=True) with _patch_repo(repo): with pytest.raises(SystemExit) as exc: run_enqueue(args) assert exc.value.code == ExitCode.USER_ERROR def test_run_id_at_max_length_accepted(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) args = _enqueue_ns(run_id="x" * _MAX_RUN_ID_LEN, json_out=True) with _patch_repo(repo): run_enqueue(args) out = json.loads(capsys.readouterr().out) assert out["created_by"] == "x" * _MAX_RUN_ID_LEN def test_run_id_over_max_length_exits_1(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) args = _enqueue_ns(run_id="x" * (_MAX_RUN_ID_LEN + 1), json_out=True) with _patch_repo(repo): with pytest.raises(SystemExit) as exc: run_enqueue(args) assert exc.value.code == 
ExitCode.USER_ERROR def test_ttl_zero_exits_1(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) args = _enqueue_ns(ttl_seconds=0, json_out=True) with _patch_repo(repo): with pytest.raises(SystemExit) as exc: run_enqueue(args) assert exc.value.code == ExitCode.USER_ERROR def test_ttl_negative_exits_1(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) args = _enqueue_ns(ttl_seconds=-1, json_out=True) with _patch_repo(repo): with pytest.raises(SystemExit) as exc: run_enqueue(args) assert exc.value.code == ExitCode.USER_ERROR def test_ttl_one_accepted(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) args = _enqueue_ns(ttl_seconds=1, json_out=True) with _patch_repo(repo): run_enqueue(args) out = json.loads(capsys.readouterr().out) assert out["ttl_seconds"] == 1 def test_payload_over_max_bytes_exits_1(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) big = '{"k": "' + "a" * _MAX_PAYLOAD_BYTES + '"}' args = _enqueue_ns(payload=big, json_out=True) with _patch_repo(repo): with pytest.raises(SystemExit) as exc: run_enqueue(args) assert exc.value.code == ExitCode.USER_ERROR def test_payload_at_max_bytes_accepted(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) # Build a payload that is exactly _MAX_PAYLOAD_BYTES bytes val_len = _MAX_PAYLOAD_BYTES - len('{"k": ""}') payload = '{"k": "' + "a" * val_len + '"}' assert len(payload.encode()) <= _MAX_PAYLOAD_BYTES args = _enqueue_ns(payload=payload, json_out=True) with _patch_repo(repo): run_enqueue(args) out = json.loads(capsys.readouterr().out) assert "task_id" in out def test_payload_not_json_exits_1(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) args = _enqueue_ns(payload="not-json", json_out=True) with _patch_repo(repo): with pytest.raises(SystemExit) as exc: run_enqueue(args) assert exc.value.code == ExitCode.USER_ERROR def 
test_payload_array_exits_1(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) args = _enqueue_ns(payload="[1,2,3]", json_out=True) with _patch_repo(repo): with pytest.raises(SystemExit) as exc: run_enqueue(args) assert exc.value.code == ExitCode.USER_ERROR def test_too_many_tags_exits_1(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) tags = ",".join(f"tag{i}" for i in range(_MAX_TAGS + 1)) args = _enqueue_ns(tags=tags, json_out=True) with _patch_repo(repo): with pytest.raises(SystemExit) as exc: run_enqueue(args) assert exc.value.code == ExitCode.USER_ERROR def test_max_tags_accepted(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) tags = ",".join(f"t{i}" for i in range(_MAX_TAGS)) args = _enqueue_ns(tags=tags, json_out=True) with _patch_repo(repo): run_enqueue(args) out = json.loads(capsys.readouterr().out) assert len(out["tags"]) == _MAX_TAGS def test_invalid_queue_name_exits_1(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) args = _enqueue_ns(queue="bad queue!", json_out=True) with _patch_repo(repo): with pytest.raises(SystemExit) as exc: run_enqueue(args) assert exc.value.code == ExitCode.USER_ERROR def test_queue_with_spaces_exits_1(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) args = _enqueue_ns(queue="my queue", json_out=True) with _patch_repo(repo): with pytest.raises(SystemExit) as exc: run_enqueue(args) assert exc.value.code == ExitCode.USER_ERROR def test_queue_name_too_long_exits_1(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) args = _enqueue_ns(queue="a" * (_MAX_QUEUE_LEN + 1), json_out=True) with _patch_repo(repo): with pytest.raises(SystemExit) as exc: run_enqueue(args) assert exc.value.code == ExitCode.USER_ERROR def test_valid_queue_name_with_hyphens_accepted(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) args = _enqueue_ns(queue="my-queue-01", 
json_out=True) with _patch_repo(repo): run_enqueue(args) out = json.loads(capsys.readouterr().out) assert out["queue"] == "my-queue-01" def test_valid_queue_name_with_underscores_accepted(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) args = _enqueue_ns(queue="my_queue_01", json_out=True) with _patch_repo(repo): run_enqueue(args) out = json.loads(capsys.readouterr().out) assert out["queue"] == "my_queue_01" def test_validation_fires_before_repo_lookup(self, tmp_path: pathlib.Path) -> None: """require_repo must never be called when title is empty.""" repo = _make_repo(tmp_path) require_calls: list[bool] = [] def _fake_require() -> pathlib.Path: require_calls.append(True) return repo args = _enqueue_ns(title="", json_out=True) with patch("muse.cli.commands.task_queue.require_repo", side_effect=_fake_require): with pytest.raises(SystemExit): run_enqueue(args) assert require_calls == [], "require_repo was called before validation" class TestEnqueueJsonErrors: """When --format json, all errors produce compact JSON on stdout (not text).""" def test_empty_title_json_error(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) args = _enqueue_ns(title="", json_out=True) with _patch_repo(repo): with pytest.raises(SystemExit): run_enqueue(args) raw = capsys.readouterr().out.strip() data = json.loads(raw) assert "error" in data assert "status" in data def test_bad_payload_json_error(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) args = _enqueue_ns(payload="not-json", json_out=True) with _patch_repo(repo): with pytest.raises(SystemExit): run_enqueue(args) raw = capsys.readouterr().out.strip() data = json.loads(raw) assert "error" in data assert data["status"] == "bad_payload" def test_bad_queue_json_error(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) args = 
_enqueue_ns(queue="bad queue!", json_out=True) with _patch_repo(repo): with pytest.raises(SystemExit): run_enqueue(args) raw = capsys.readouterr().out.strip() data = json.loads(raw) assert data["status"] == "bad_queue" def test_json_error_is_compact_single_line(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) args = _enqueue_ns(title="", json_out=True) with _patch_repo(repo): with pytest.raises(SystemExit): run_enqueue(args) raw = capsys.readouterr().out.strip() assert "\n" not in raw def test_text_error_goes_to_stderr(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) args = _enqueue_ns(title="", json_out=False) with _patch_repo(repo): with pytest.raises(SystemExit): run_enqueue(args) captured = capsys.readouterr() assert "❌" in captured.err assert captured.out == "" def test_run_id_too_long_json_error(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) args = _enqueue_ns(run_id="x" * (_MAX_RUN_ID_LEN + 1), json_out=True) with _patch_repo(repo): with pytest.raises(SystemExit): run_enqueue(args) raw = capsys.readouterr().out.strip() data = json.loads(raw) assert "error" in data class TestEnqueueCompactJson: """JSON output must be compact (no indent=2) and schema-complete.""" def test_success_json_is_single_line(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) args = _enqueue_ns(json_out=True) with _patch_repo(repo): run_enqueue(args) raw = capsys.readouterr().out.strip() assert "\n" not in raw json.loads(raw) # must be valid JSON def test_success_json_schema_keys(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) args = _enqueue_ns(json_out=True, tags="a,b", priority=5) with _patch_repo(repo): run_enqueue(args) out = json.loads(capsys.readouterr().out) for key in ("schema", "task_id", "title", "priority", 
"queue", "ttl_seconds", "created_by", "created_at", "tags", "payload", "duration_ms"): assert key in out, f"missing key: {key}" def test_success_json_values_correct(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) args = _enqueue_ns( title="My task", json_out=True, queue="billing", priority=7, ttl_seconds=3600, run_id="orch-1", payload='{"addr": "x.py::fn"}', tags="billing,refactor", ) with _patch_repo(repo): run_enqueue(args) out = json.loads(capsys.readouterr().out) assert out["title"] == "My task" assert out["queue"] == "billing" assert out["priority"] == 7 assert out["ttl_seconds"] == 3600 assert out["created_by"] == "orch-1" assert out["payload"] == {"addr": "x.py::fn"} assert "billing" in out["tags"] assert "refactor" in out["tags"] def test_duration_ms_is_float(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) args = _enqueue_ns(json_out=True) with _patch_repo(repo): run_enqueue(args) out = json.loads(capsys.readouterr().out) assert isinstance(out["duration_ms"], float) def test_task_id_is_sha256(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) args = _enqueue_ns(json_out=True) with _patch_repo(repo): run_enqueue(args) out = json.loads(capsys.readouterr().out) assert out["task_id"].startswith("sha256:"), f"expected sha256: prefix, got {out['task_id']!r}" assert len(out["task_id"]) == 71 def test_unique_task_ids_across_different_titles(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) ids = [] for i in range(10): args = _enqueue_ns(json_out=True, title=f"Task number {i}") with _patch_repo(repo): run_enqueue(args) out = json.loads(capsys.readouterr().out) ids.append(out["task_id"]) assert len(set(ids)) == 10 class TestEnqueueTextOutput: """Text-format output must be human-readable and safe.""" def test_success_text_contains_task_enqueued(self, 
tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) args = _enqueue_ns(json_out=False, title="Refactor billing") with _patch_repo(repo): run_enqueue(args) out = capsys.readouterr().out assert "Task enqueued" in out def test_success_text_shows_title(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) args = _enqueue_ns(json_out=False, title="Rewrite auth") with _patch_repo(repo): run_enqueue(args) out = capsys.readouterr().out assert "Rewrite auth" in out def test_success_text_shows_queue(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) args = _enqueue_ns(json_out=False, queue="billing") with _patch_repo(repo): run_enqueue(args) out = capsys.readouterr().out assert "billing" in out def test_success_text_shows_priority(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) args = _enqueue_ns(json_out=False, priority=9) with _patch_repo(repo): run_enqueue(args) out = capsys.readouterr().out assert "9" in out def test_success_text_shows_tags(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) args = _enqueue_ns(json_out=False, tags="alpha,beta") with _patch_repo(repo): run_enqueue(args) out = capsys.readouterr().out assert "alpha" in out assert "beta" in out def test_ansi_in_title_stripped_from_text(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: """ANSI sequences in title must not appear raw in text output.""" repo = _make_repo(tmp_path) args = _enqueue_ns(json_out=False, title="\x1b[31mRed task\x1b[0m") with _patch_repo(repo): run_enqueue(args) out = capsys.readouterr().out assert "\x1b" not in out def test_ansi_in_tags_stripped_from_text(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) args = _enqueue_ns(json_out=False, 
tags="\x1b[31mmalicious\x1b[0m,safe") with _patch_repo(repo): run_enqueue(args) out = capsys.readouterr().out assert "\x1b" not in out class TestEnqueueIntegration: """Enqueue → claim → complete end-to-end, and enqueue → list.""" def test_enqueued_task_is_claimable(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) args_e = _enqueue_ns(title="Claimable", json_out=True) with _patch_repo(repo): run_enqueue(args_e) capsys.readouterr() # discard args_c = _namespace(run_id="worker-1", queue=None, claim_ttl=3600, json_out=True) with _patch_repo(repo): run_claim(args_c) out = json.loads(capsys.readouterr().out) assert out["status"] == "claimed" assert out["task"]["title"] == "Claimable" def test_priority_ordering(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: """Higher priority task must be claimed first.""" repo = _make_repo(tmp_path) for title, pri in [("Low", 0), ("High", 10), ("Mid", 5)]: args = _enqueue_ns(title=title, priority=pri, json_out=True) with _patch_repo(repo): run_enqueue(args) capsys.readouterr() args_c = _namespace(run_id="worker", queue=None, claim_ttl=3600, json_out=True) with _patch_repo(repo): run_claim(args_c) out = json.loads(capsys.readouterr().out) assert out["task"]["title"] == "High" def test_enqueue_then_list_shows_task(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) args_e = _enqueue_ns(title="Listed task", json_out=True, tags="search-me") with _patch_repo(repo): run_enqueue(args_e) capsys.readouterr() args_t = _namespace(json_out=True, status=None, queue=None, run_id=None) with _patch_repo(repo): run_tasks(args_t) out = json.loads(capsys.readouterr().out) titles = [i["title"] for i in out["items"]] assert "Listed task" in titles def test_enqueue_with_payload_passes_through_to_claim(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) args_e = _enqueue_ns( 
title="Payload task", json_out=True, payload='{"op": "rename", "from": "foo", "to": "bar"}', ) with _patch_repo(repo): run_enqueue(args_e) capsys.readouterr() args_c = _namespace(run_id="worker", queue=None, claim_ttl=3600, json_out=True) with _patch_repo(repo): run_claim(args_c) out = json.loads(capsys.readouterr().out) assert out["task"]["payload"]["op"] == "rename" class TestEnqueueStress: """Performance and concurrency under load.""" def test_enqueue_500_tasks_under_10s(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) start = time.monotonic() for i in range(500): args = _enqueue_ns( title=f"Task {i}", json_out=True, priority=i % 10, queue="load-test", ) with _patch_repo(repo): run_enqueue(args) capsys.readouterr() elapsed = time.monotonic() - start assert elapsed < 10.0, f"500 enqueues took {elapsed:.2f}s" def test_concurrent_enqueue_produces_unique_ids(self, tmp_path: pathlib.Path) -> None: """20 concurrent threads each enqueue one uniquely-titled task — all IDs must be unique.""" repo = _make_repo(tmp_path) task_ids: list[str] = [] lock = threading.Lock() def _enqueue(n: int) -> None: task = create_task(repo, f"concurrent task {n}", queue="default") with lock: task_ids.append(task.task_id) threads = [threading.Thread(target=_enqueue, args=(i,)) for i in range(20)] for t in threads: t.start() for t in threads: t.join() assert len(set(task_ids)) == 20, "Duplicate task IDs detected under concurrency" def test_enqueue_large_payload_at_boundary(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: """Payload at exactly the byte limit must be accepted.""" repo = _make_repo(tmp_path) val_len = _MAX_PAYLOAD_BYTES - len('{"k": ""}') payload = '{"k": "' + "a" * val_len + '"}' assert len(payload.encode()) <= _MAX_PAYLOAD_BYTES args = _enqueue_ns(payload=payload, json_out=True) with _patch_repo(repo): run_enqueue(args) out = json.loads(capsys.readouterr().out) assert "task_id" in out def 
test_enqueue_many_queues(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: """Tasks in different queues are independent.""" repo = _make_repo(tmp_path) queues = [f"queue{i}" for i in range(20)] task_ids = set() for q in queues: args = _enqueue_ns(title=f"task for {q}", queue=q, json_out=True) with _patch_repo(repo): run_enqueue(args) out = json.loads(capsys.readouterr().out) task_ids.add(out["task_id"]) assert len(task_ids) == 20 class TestCliClaim: """run_claim: success, empty queue exit 1, JSON/text output.""" def test_claim_success_json(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) with _patch_repo(repo): create_task(repo, "Claimable task") args = _namespace(run_id="agent-1", queue=None, claim_ttl=3600, json_out=True) run_claim(args) out = json.loads(capsys.readouterr().out) assert out["status"] == "claimed" assert out["claimer_run_id"] == "agent-1" assert "task" in out def test_claim_empty_queue_exits_1(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) args = _namespace(run_id="agent-1", queue=None, claim_ttl=3600, json_out=True) with _patch_repo(repo): with pytest.raises(SystemExit) as exc: run_claim(args) assert exc.value.code == 1 out = json.loads(capsys.readouterr().out) assert out["status"] == "empty" def test_claim_text_output_on_success(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) with _patch_repo(repo): create_task(repo, "Text claim task") args = _namespace(run_id="agent-1", queue=None, claim_ttl=3600, json_out=False) run_claim(args) out = capsys.readouterr().out assert "Task claimed" in out def test_claim_text_output_on_empty(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) args = _namespace(run_id="agent-1", queue=None, claim_ttl=3600, json_out=False) with _patch_repo(repo): with pytest.raises(SystemExit): 
run_claim(args) out = capsys.readouterr().out assert "empty" in out.lower() # ── New: claim hardening tests ──────────────────────────────────────────────── from muse.cli.commands.task_queue import ( _MIN_CLAIM_TTL, _MAX_CLAIM_TTL, _MAX_WAIT_SECONDS, ) def _claim_ns(**kwargs: MsgpackValue) -> argparse.Namespace: """Build a Namespace with claim-appropriate defaults.""" defaults = { "json_out": True, "run_id": "agent-1", "queue": None, "claim_ttl": 3600, "wait": 0, } defaults.update(kwargs) return argparse.Namespace(**defaults) class TestClaimInputValidation: """All claim validation fires before require_repo() — exit 1 on bad args.""" def test_run_id_at_max_length_accepted(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) create_task(repo, "Task") args = _claim_ns(run_id="x" * _MAX_RUN_ID_LEN, json_out=True) with _patch_repo(repo): run_claim(args) out = json.loads(capsys.readouterr().out) assert out["claimer_run_id"] == "x" * _MAX_RUN_ID_LEN def test_run_id_over_max_exits_1(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) args = _claim_ns(run_id="x" * (_MAX_RUN_ID_LEN + 1), json_out=True) with _patch_repo(repo): with pytest.raises(SystemExit) as exc: run_claim(args) assert exc.value.code == 1 def test_run_id_too_long_json_error(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) args = _claim_ns(run_id="x" * (_MAX_RUN_ID_LEN + 1), json_out=True) with _patch_repo(repo): with pytest.raises(SystemExit): run_claim(args) raw = capsys.readouterr().out.strip() data = json.loads(raw) assert "error" in data assert data["status"] == "bad_args" def test_run_id_too_long_text_error(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) args = _claim_ns(run_id="x" * (_MAX_RUN_ID_LEN + 1), json_out=False) with _patch_repo(repo): with pytest.raises(SystemExit): run_claim(args) captured = capsys.readouterr() assert 
"❌" in captured.err assert captured.out == "" def test_claim_ttl_min_accepted(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) create_task(repo, "Task") args = _claim_ns(claim_ttl=_MIN_CLAIM_TTL, json_out=True) with _patch_repo(repo): run_claim(args) out = json.loads(capsys.readouterr().out) assert out["status"] == "claimed" def test_claim_ttl_max_accepted(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) create_task(repo, "Task") args = _claim_ns(claim_ttl=_MAX_CLAIM_TTL, json_out=True) with _patch_repo(repo): run_claim(args) out = json.loads(capsys.readouterr().out) assert out["status"] == "claimed" def test_claim_ttl_zero_exits_1(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) args = _claim_ns(claim_ttl=0, json_out=True) with _patch_repo(repo): with pytest.raises(SystemExit) as exc: run_claim(args) assert exc.value.code == 1 def test_claim_ttl_negative_exits_1(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) args = _claim_ns(claim_ttl=-1, json_out=True) with _patch_repo(repo): with pytest.raises(SystemExit) as exc: run_claim(args) assert exc.value.code == 1 def test_claim_ttl_over_max_exits_1(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) args = _claim_ns(claim_ttl=_MAX_CLAIM_TTL + 1, json_out=True) with _patch_repo(repo): with pytest.raises(SystemExit) as exc: run_claim(args) assert exc.value.code == 1 def test_claim_ttl_invalid_json_error(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) args = _claim_ns(claim_ttl=0, json_out=True) with _patch_repo(repo): with pytest.raises(SystemExit): run_claim(args) raw = capsys.readouterr().out.strip() data = json.loads(raw) assert "error" in data def test_wait_over_max_exits_1(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) args = _claim_ns(wait=_MAX_WAIT_SECONDS + 1, json_out=True) with 
_patch_repo(repo): with pytest.raises(SystemExit) as exc: run_claim(args) assert exc.value.code == 1 def test_wait_negative_exits_1(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) args = _claim_ns(wait=-1, json_out=True) with _patch_repo(repo): with pytest.raises(SystemExit) as exc: run_claim(args) assert exc.value.code == 1 def test_invalid_queue_name_exits_1(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) args = _claim_ns(queue="bad queue!", json_out=True) with _patch_repo(repo): with pytest.raises(SystemExit) as exc: run_claim(args) assert exc.value.code == 1 def test_invalid_queue_json_error(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) args = _claim_ns(queue="bad queue!", json_out=True) with _patch_repo(repo): with pytest.raises(SystemExit): run_claim(args) raw = capsys.readouterr().out.strip() data = json.loads(raw) assert data["status"] == "bad_queue" def test_queue_with_slash_exits_1(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) args = _claim_ns(queue="../../etc/passwd", json_out=True) with _patch_repo(repo): with pytest.raises(SystemExit) as exc: run_claim(args) assert exc.value.code == 1 def test_validation_fires_before_repo_lookup(self, tmp_path: pathlib.Path) -> None: """require_repo must never be called when --run-id is too long.""" repo = _make_repo(tmp_path) calls: list[bool] = [] def _fake_require() -> pathlib.Path: calls.append(True) return repo args = _claim_ns(run_id="x" * (_MAX_RUN_ID_LEN + 1)) with patch("muse.cli.commands.task_queue.require_repo", side_effect=_fake_require): with pytest.raises(SystemExit): run_claim(args) assert calls == [], "require_repo called before validation" def test_valid_queue_name_filters_correctly(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) create_task(repo, "billing task", queue="billing") args = _claim_ns(queue="billing", json_out=True) with 
_patch_repo(repo): run_claim(args) out = json.loads(capsys.readouterr().out) assert out["status"] == "claimed" assert out["task"]["queue"] == "billing" class TestClaimJsonOutput: """Compact JSON schema, single-line output, empty-queue schema.""" def test_success_json_is_compact(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) create_task(repo, "Task") args = _claim_ns(json_out=True) with _patch_repo(repo): run_claim(args) raw = capsys.readouterr().out.strip() assert "\n" not in raw json.loads(raw) def test_success_json_schema_keys(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) create_task(repo, "Task") args = _claim_ns(json_out=True) with _patch_repo(repo): run_claim(args) out = json.loads(capsys.readouterr().out) for key in ("schema", "status", "task_id", "claimer_run_id", "claimed_at", "expires_at", "task", "duration_ms"): assert key in out, f"missing key: {key}" def test_success_task_nested_schema(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) create_task(repo, "Nested task", priority=5, queue="billing") args = _claim_ns(json_out=True) with _patch_repo(repo): run_claim(args) out = json.loads(capsys.readouterr().out) task = out["task"] assert task["title"] == "Nested task" assert task["priority"] == 5 assert task["queue"] == "billing" for key in ("task_id", "title", "priority", "queue", "payload", "created_at", "created_by", "ttl_seconds", "tags"): assert key in task, f"task missing key: {key}" def test_empty_queue_json_is_compact(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) args = _claim_ns(json_out=True) with _patch_repo(repo): with pytest.raises(SystemExit): run_claim(args) raw = capsys.readouterr().out.strip() assert "\n" not in raw json.loads(raw) def test_empty_queue_json_schema(self, tmp_path: pathlib.Path, capsys: 
pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) args = _claim_ns(json_out=True, queue="myqueue") with _patch_repo(repo): with pytest.raises(SystemExit): run_claim(args) out = json.loads(capsys.readouterr().out) assert out["status"] == "empty" assert out["queue"] == "myqueue" assert "duration_ms" in out assert "schema" in out def test_duration_ms_is_float(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) create_task(repo, "Task") args = _claim_ns(json_out=True) with _patch_repo(repo): run_claim(args) out = json.loads(capsys.readouterr().out) assert isinstance(out["duration_ms"], float) def test_claimer_run_id_matches_arg(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) create_task(repo, "Task") args = _claim_ns(run_id="my-special-agent", json_out=True) with _patch_repo(repo): run_claim(args) out = json.loads(capsys.readouterr().out) assert out["claimer_run_id"] == "my-special-agent" class TestClaimTextOutput: """Text-format output is human-readable and ANSI-safe.""" def test_success_shows_task_claimed(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) create_task(repo, "My important task") args = _claim_ns(json_out=False) with _patch_repo(repo): run_claim(args) out = capsys.readouterr().out assert "Task claimed" in out def test_success_shows_title(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) create_task(repo, "Rename billing function") args = _claim_ns(json_out=False) with _patch_repo(repo): run_claim(args) out = capsys.readouterr().out assert "Rename billing function" in out def test_success_shows_expires_in(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) create_task(repo, "Task") args = _claim_ns(json_out=False, claim_ttl=1800) with _patch_repo(repo): run_claim(args) out = 
capsys.readouterr().out assert "Expires in" in out or "expires in" in out.lower() def test_success_shows_payload(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) create_task(repo, "Task", payload={"op": "rename"}) args = _claim_ns(json_out=False) with _patch_repo(repo): run_claim(args) out = capsys.readouterr().out assert "rename" in out def test_success_shows_tags(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) create_task(repo, "Task", tags=["billing", "refactor"]) args = _claim_ns(json_out=False) with _patch_repo(repo): run_claim(args) out = capsys.readouterr().out assert "billing" in out def test_ansi_in_run_id_stripped(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) create_task(repo, "Task") args = _claim_ns(run_id="\x1b[31mmalicious\x1b[0m", json_out=False) with _patch_repo(repo): run_claim(args) out = capsys.readouterr().out assert "\x1b" not in out def test_ansi_in_title_stripped(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) create_task(repo, "\x1b[31mRed title\x1b[0m") args = _claim_ns(json_out=False) with _patch_repo(repo): run_claim(args) out = capsys.readouterr().out assert "\x1b" not in out def test_empty_text_shows_empty(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) args = _claim_ns(json_out=False, queue="billing") with _patch_repo(repo): with pytest.raises(SystemExit): run_claim(args) out = capsys.readouterr().out assert "empty" in out.lower() assert "billing" in out class TestClaimWait: """--wait polls until a task appears or the deadline expires.""" def test_wait_zero_returns_immediately_on_empty(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) args = _claim_ns(wait=0, json_out=True) start = time.monotonic() with 
_patch_repo(repo): with pytest.raises(SystemExit) as exc: run_claim(args) elapsed = time.monotonic() - start assert exc.value.code == 1 assert elapsed < 1.0 def test_wait_times_out_on_empty_queue(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) args = _claim_ns(wait=1, json_out=True) # 1 s wait start = time.monotonic() with _patch_repo(repo): with pytest.raises(SystemExit) as exc: run_claim(args) elapsed = time.monotonic() - start assert exc.value.code == 1 assert elapsed >= 0.9, f"wait ended too early: {elapsed:.2f}s" out = json.loads(capsys.readouterr().out) assert out["status"] == "empty" def test_wait_finds_task_created_during_wait(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: """Task created 0.3 s into a 3 s wait must be found.""" repo = _make_repo(tmp_path) def _create_after_delay() -> None: time.sleep(0.3) create_task(repo, "Delayed task") t = threading.Thread(target=_create_after_delay, daemon=True) t.start() args = _claim_ns(wait=3, json_out=True) with _patch_repo(repo): run_claim(args) t.join() out = json.loads(capsys.readouterr().out) assert out["status"] == "claimed" assert out["task"]["title"] == "Delayed task" def test_wait_empty_json_includes_elapsed(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) args = _claim_ns(wait=1, json_out=True) with _patch_repo(repo): with pytest.raises(SystemExit): run_claim(args) out = json.loads(capsys.readouterr().out) assert out["duration_ms"] >= 0.9 def test_wait_text_mentions_wait_on_empty(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) args = _claim_ns(wait=1, json_out=False) with _patch_repo(repo): with pytest.raises(SystemExit): run_claim(args) out = capsys.readouterr().out # Text output should mention the wait duration assert "after" in out.lower() or "1" in out class TestClaimQueueFilter: """--queue restricts claiming to 
one named queue.""" def test_queue_filter_claims_correct_queue(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) create_task(repo, "billing task", queue="billing") create_task(repo, "auth task", queue="auth") args = _claim_ns(queue="auth", json_out=True) with _patch_repo(repo): run_claim(args) out = json.loads(capsys.readouterr().out) assert out["task"]["queue"] == "auth" assert out["task"]["title"] == "auth task" def test_queue_filter_empty_when_wrong_queue(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) create_task(repo, "billing task", queue="billing") args = _claim_ns(queue="auth", json_out=True) with _patch_repo(repo): with pytest.raises(SystemExit) as exc: run_claim(args) assert exc.value.code == 1 out = json.loads(capsys.readouterr().out) assert out["status"] == "empty" def test_no_queue_filter_claims_highest_priority(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) create_task(repo, "low priority", queue="q1", priority=0) create_task(repo, "high priority", queue="q2", priority=10) args = _claim_ns(queue=None, json_out=True) with _patch_repo(repo): run_claim(args) out = json.loads(capsys.readouterr().out) assert out["task"]["title"] == "high priority" def test_queue_filter_with_valid_chars(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) create_task(repo, "task", queue="my_queue-01") args = _claim_ns(queue="my_queue-01", json_out=True) with _patch_repo(repo): run_claim(args) out = json.loads(capsys.readouterr().out) assert out["status"] == "claimed" class TestClaimIntegration: """Full lifecycle: enqueue → claim → complete/fail.""" def test_claim_then_complete(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) task = create_task(repo, "Complete me") args_c = _claim_ns(run_id="worker", 
json_out=True) with _patch_repo(repo): run_claim(args_c) out_c = json.loads(capsys.readouterr().out) assert out_c["status"] == "claimed" args_done = _namespace(task_id=task.task_id, run_id="worker", result='{"ok": true}', json_out=True) with _patch_repo(repo): run_complete(args_done) out_done = json.loads(capsys.readouterr().out) assert out_done["status"] == "completed" def test_claim_then_fail(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) task = create_task(repo, "Fail me") args_c = _claim_ns(run_id="worker", json_out=True) with _patch_repo(repo): run_claim(args_c) capsys.readouterr() args_f = _namespace(task_id=task.task_id, run_id="worker", error="network timeout", json_out=True) with _patch_repo(repo): run_fail_task(args_f) out = json.loads(capsys.readouterr().out) assert out["status"] == "failed" def test_second_claim_empty_after_first(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) create_task(repo, "Only task") args = _claim_ns(json_out=True) with _patch_repo(repo): run_claim(args) capsys.readouterr() with _patch_repo(repo): with pytest.raises(SystemExit) as exc: run_claim(args) assert exc.value.code == 1 def test_priority_order_across_claims(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) create_task(repo, "Low", priority=0) create_task(repo, "High", priority=9) create_task(repo, "Mid", priority=5) titles = [] for _ in range(3): args = _claim_ns(json_out=True) with _patch_repo(repo): run_claim(args) out = json.loads(capsys.readouterr().out) titles.append(out["task"]["title"]) assert titles == ["High", "Mid", "Low"] def test_enqueue_claim_complete_via_cli(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: """Full round-trip using CLI commands only.""" repo = _make_repo(tmp_path) # Enqueue args_e = _enqueue_ns(title="CLI round-trip", payload='{"x": 1}', json_out=True) 
with _patch_repo(repo): run_enqueue(args_e) enq = json.loads(capsys.readouterr().out) task_id = enq["task_id"] # Claim args_c = _claim_ns(run_id="cli-worker", json_out=True) with _patch_repo(repo): run_claim(args_c) clm = json.loads(capsys.readouterr().out) assert clm["status"] == "claimed" assert clm["task"]["payload"]["x"] == 1 # Complete args_done = _namespace(task_id=task_id, run_id="cli-worker", result='{"done": true}', json_out=True) with _patch_repo(repo): run_complete(args_done) done = json.loads(capsys.readouterr().out) assert done["status"] == "completed" class TestClaimStress: """Concurrent claiming and performance under load.""" def test_concurrent_claim_no_double_claim(self, tmp_path: pathlib.Path) -> None: """10 tasks + 20 competing agents — no task claimed twice.""" repo = _make_repo(tmp_path) for i in range(10): create_task(repo, f"Task {i}") claimed_ids: list[str] = [] lock = threading.Lock() def _agent(n: int) -> None: try: result = claim_next_task(repo, f"agent-{n}") if result is not None: task, _ = result with lock: claimed_ids.append(task.task_id) except Exception: pass threads = [threading.Thread(target=_agent, args=(i,)) for i in range(20)] for t in threads: t.start() for t in threads: t.join() assert len(claimed_ids) == len(set(claimed_ids)), "task claimed twice" assert len(claimed_ids) <= 10 def test_claim_100_tasks_sequential(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) for i in range(100): create_task(repo, f"Task {i}", queue="load") start = time.monotonic() claimed = 0 while claimed < 100: args = _claim_ns(queue="load", json_out=True) with _patch_repo(repo): try: run_claim(args) claimed += 1 capsys.readouterr() except SystemExit: break elapsed = time.monotonic() - start assert claimed == 100 assert elapsed < 15.0, f"100 sequential claims took {elapsed:.2f}s" def test_wait_zero_performance(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: """--wait 0 on an 
empty queue must return in < 0.5 s."""
        repo = _make_repo(tmp_path)
        # Each of the five attempts must individually stay within the budget.
        for _ in range(5):
            args = _claim_ns(wait=0, json_out=True)
            start = time.monotonic()
            with _patch_repo(repo):
                with pytest.raises(SystemExit):
                    run_claim(args)
            elapsed = time.monotonic() - start
            assert elapsed < 0.5, f"wait=0 took {elapsed:.3f}s"
            capsys.readouterr()  # drain output so iterations don't accumulate


class TestCliComplete:
    """run_complete: success, wrong claimer, missing task."""

    def _setup(self, repo: pathlib.Path) -> TaskRecord:
        """Create one task and claim it as ``agent-1``; return the task record."""
        t = create_task(repo, "Completable task")
        # Only one task exists in the fresh repo, so this claims ``t``.
        claim_next_task(repo, "agent-1")
        return t

    def test_complete_success_json(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        repo = _make_repo(tmp_path)
        t = self._setup(repo)
        args = _namespace(task_id=t.task_id, run_id="agent-1", result="{}", json_out=True)
        with _patch_repo(repo):
            run_complete(args)
        out = json.loads(capsys.readouterr().out)
        assert out["status"] == "completed"

    def test_complete_with_result(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        repo = _make_repo(tmp_path)
        t = self._setup(repo)
        args = _namespace(task_id=t.task_id, run_id="agent-1", result='{"pr": 99}', json_out=True)
        with _patch_repo(repo):
            run_complete(args)
        out = json.loads(capsys.readouterr().out)
        assert out["result"] == {"pr": 99}

    def test_complete_wrong_claimer_exits_1(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        repo = _make_repo(tmp_path)
        t = self._setup(repo)
        # The task is held by "agent-1"; a different run_id must be rejected.
        args = _namespace(task_id=t.task_id, run_id="impostor", result="{}", json_out=True)
        with _patch_repo(repo):
            with pytest.raises(SystemExit) as exc:
                run_complete(args)
        assert exc.value.code == 1

    def test_complete_invalid_result_exits_1(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        t = self._setup(repo)
        args = _namespace(task_id=t.task_id, run_id="agent-1", result="NOTJSON", json_out=True)
        with _patch_repo(repo):
            with pytest.raises(SystemExit) as exc:
                run_complete(args)
        assert exc.value.code == 1

    def test_complete_text_output(self,
tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) t = self._setup(repo) args = _namespace(task_id=t.task_id, run_id="agent-1", result="{}", json_out=False) with _patch_repo(repo): run_complete(args) out = capsys.readouterr().out assert "completed" in out.lower() # ── complete hardening ──────────────────────────────────────────────────────── from muse.cli.commands.task_queue import _MAX_RESULT_BYTES def _complete_ns(**kwargs: MsgpackValue) -> argparse.Namespace: """Build a Namespace with complete-appropriate defaults.""" defaults = { "json_out": True, "run_id": "agent-1", "task_id": VALID_ID, "result": "{}", } defaults.update(kwargs) return argparse.Namespace(**defaults) class TestCompleteInputValidation: """All complete validation fires before require_repo() and exits 1.""" def _setup(self, repo: pathlib.Path) -> TaskRecord: t = create_task(repo, "Complete-me") claim_next_task(repo, "agent-1") return t def test_run_id_too_long_exits_1(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) t = self._setup(repo) args = _complete_ns(task_id=t.task_id, run_id="x" * 257) with _patch_repo(repo): with pytest.raises(SystemExit) as exc: run_complete(args) assert exc.value.code == ExitCode.USER_ERROR def test_run_id_at_max_length_passes(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) t = self._setup(repo) args = _complete_ns(task_id=t.task_id, run_id="a" * 256) # claim was made by "agent-1" — different run_id triggers PermissionError # but validation itself must pass (no bad_args exit before I/O) with _patch_repo(repo): with pytest.raises(SystemExit) as exc: run_complete(args) # exits 1 due to wrong claimer, NOT bad_args — validation passed out = json.loads(capsys.readouterr().out) assert out.get("status") != "bad_args" def test_result_not_json_exits_1(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) t = self._setup(repo) args = 
_complete_ns(task_id=t.task_id, result="not-json") with _patch_repo(repo): with pytest.raises(SystemExit) as exc: run_complete(args) assert exc.value.code == ExitCode.USER_ERROR def test_result_array_exits_1(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) t = self._setup(repo) args = _complete_ns(task_id=t.task_id, result="[1, 2, 3]") with _patch_repo(repo): with pytest.raises(SystemExit) as exc: run_complete(args) assert exc.value.code == ExitCode.USER_ERROR def test_result_scalar_exits_1(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) t = self._setup(repo) args = _complete_ns(task_id=t.task_id, result="42") with _patch_repo(repo): with pytest.raises(SystemExit) as exc: run_complete(args) assert exc.value.code == ExitCode.USER_ERROR def test_result_null_exits_1(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) t = self._setup(repo) args = _complete_ns(task_id=t.task_id, result="null") with _patch_repo(repo): with pytest.raises(SystemExit) as exc: run_complete(args) assert exc.value.code == ExitCode.USER_ERROR def test_result_too_large_exits_1(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) t = self._setup(repo) big = json.dumps({"k": "v" * (_MAX_RESULT_BYTES + 1)}) args = _complete_ns(task_id=t.task_id, result=big) with _patch_repo(repo): with pytest.raises(SystemExit) as exc: run_complete(args) assert exc.value.code == ExitCode.USER_ERROR def test_result_at_max_size_passes_validation(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) t = self._setup(repo) # Build compact JSON exactly at the limit using explicit separators prefix = '{"k":"' suffix = '"}' inner = "x" * (_MAX_RESULT_BYTES - len(prefix) - len(suffix)) payload = prefix + inner + suffix assert len(payload.encode()) == _MAX_RESULT_BYTES args = _complete_ns(task_id=t.task_id, result=payload) with _patch_repo(repo): run_complete(args) out = json.loads(capsys.readouterr().out) 
assert out["status"] == "completed" def test_invalid_task_id_exits_1(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) args = _complete_ns(task_id="not-a-content-id") with _patch_repo(repo): with pytest.raises(SystemExit) as exc: run_complete(args) assert exc.value.code == ExitCode.USER_ERROR def test_path_traversal_task_id_exits_1(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) args = _complete_ns(task_id="../../etc/passwd") with _patch_repo(repo): with pytest.raises(SystemExit) as exc: run_complete(args) assert exc.value.code == ExitCode.USER_ERROR def test_null_byte_task_id_exits_1(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) args = _complete_ns(task_id="12345678-1234-4abc-8abc-123456789\x00ab") with _patch_repo(repo): with pytest.raises(SystemExit) as exc: run_complete(args) assert exc.value.code == ExitCode.USER_ERROR def test_validation_fires_before_repo_lookup(self, tmp_path: pathlib.Path) -> None: """require_repo must NOT be called when task_id is invalid.""" call_count = {"n": 0} original = __import__("muse.core.repo", fromlist=["require_repo"]).require_repo def counting_require_repo() -> pathlib.Path: call_count["n"] += 1 return original() args = _complete_ns(task_id="BADUUID") with patch("muse.cli.commands.task_queue.require_repo", counting_require_repo): with pytest.raises(SystemExit): run_complete(args) assert call_count["n"] == 0, "require_repo was called before task_id validation" def test_json_error_shape_bad_task_id(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) args = _complete_ns(task_id="bad-id", json_out=True) with _patch_repo(repo): with pytest.raises(SystemExit): run_complete(args) out = json.loads(capsys.readouterr().out) assert "error" in out assert out["status"] == "bad_task_id" def test_json_error_shape_bad_result(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) args = 
_complete_ns(task_id=VALID_ID, result="NOTJSON", json_out=True) with _patch_repo(repo): with pytest.raises(SystemExit): run_complete(args) out = json.loads(capsys.readouterr().out) assert "error" in out assert out["status"] == "bad_args" def test_text_error_goes_to_stderr(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) args = _complete_ns(task_id="BADUUID", json_out=False) with _patch_repo(repo): with pytest.raises(SystemExit): run_complete(args) captured = capsys.readouterr() assert captured.out == "" assert "❌" in captured.err def test_json_error_goes_to_stdout_not_stderr(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) args = _complete_ns(task_id="BADUUID", json_out=True) with _patch_repo(repo): with pytest.raises(SystemExit): run_complete(args) captured = capsys.readouterr() assert captured.err == "" out = json.loads(captured.out) assert "error" in out class TestCompleteJsonOutput: """run_complete JSON output shape and compactness.""" def _setup(self, repo: pathlib.Path) -> TaskRecord: t = create_task(repo, "JSON task", queue="billing") claim_next_task(repo, "completer-1") return t def test_json_is_compact(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) t = self._setup(repo) args = _complete_ns(task_id=t.task_id, run_id="completer-1") with _patch_repo(repo): run_complete(args) raw = capsys.readouterr().out.strip() assert "\n" not in raw, "JSON output must be single line (compact)" def test_json_has_required_keys(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) t = self._setup(repo) args = _complete_ns(task_id=t.task_id, run_id="completer-1") with _patch_repo(repo): run_complete(args) out = json.loads(capsys.readouterr().out) for key in ("schema", "task_id", "claimer_run_id", "status", "claimed_at", "expires_at", "result", "duration_ms"): assert 
key in out, f"missing key: {key}" def test_json_status_is_completed(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) t = self._setup(repo) args = _complete_ns(task_id=t.task_id, run_id="completer-1") with _patch_repo(repo): run_complete(args) out = json.loads(capsys.readouterr().out) assert out["status"] == "completed" def test_json_result_field_populated(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) t = self._setup(repo) args = _complete_ns(task_id=t.task_id, run_id="completer-1", result='{"pr_url": "http://x/1"}') with _patch_repo(repo): run_complete(args) out = json.loads(capsys.readouterr().out) assert out["result"] == {"pr_url": "http://x/1"} def test_json_empty_result_is_null(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: """Empty dict {} collapses to null in the claim (falsy check).""" repo = _make_repo(tmp_path) t = self._setup(repo) args = _complete_ns(task_id=t.task_id, run_id="completer-1", result="{}") with _patch_repo(repo): run_complete(args) out = json.loads(capsys.readouterr().out) assert out["result"] is None def test_json_elapsed_is_float(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) t = self._setup(repo) args = _complete_ns(task_id=t.task_id, run_id="completer-1") with _patch_repo(repo): run_complete(args) out = json.loads(capsys.readouterr().out) assert isinstance(out["duration_ms"], float) def test_json_claimer_run_id_matches(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) t = self._setup(repo) args = _complete_ns(task_id=t.task_id, run_id="completer-1") with _patch_repo(repo): run_complete(args) out = json.loads(capsys.readouterr().out) assert out["claimer_run_id"] == "completer-1" def test_json_wrong_claimer_error_shape(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: 
repo = _make_repo(tmp_path)
        t = self._setup(repo)
        # The task is held by "completer-1"; a different run_id must be rejected.
        args = _complete_ns(task_id=t.task_id, run_id="impostor", json_out=True)
        with _patch_repo(repo):
            with pytest.raises(SystemExit) as exc:
                run_complete(args)
        assert exc.value.code == 1
        out = json.loads(capsys.readouterr().out)
        assert "error" in out

    def test_json_missing_task_error_shape(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        repo = _make_repo(tmp_path)
        # repo exists but no task was created
        args = _complete_ns(task_id=VALID_ID, run_id="completer-1", json_out=True)
        with _patch_repo(repo):
            with pytest.raises(SystemExit) as exc:
                run_complete(args)
        assert exc.value.code == 1
        out = json.loads(capsys.readouterr().out)
        assert "error" in out


class TestCompleteTextOutput:
    """run_complete text output content."""

    def _setup(self, repo: pathlib.Path, title: str = "My Task", queue: str = "default") -> TaskRecord:
        """Create a task with *title* in *queue*, claim it as ``agent-1``, and return it."""
        t = create_task(repo, title, queue=queue)
        claim_next_task(repo, "agent-1")
        return t

    def test_text_shows_completed(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        repo = _make_repo(tmp_path)
        t = self._setup(repo)
        args = _complete_ns(task_id=t.task_id, run_id="agent-1", json_out=False)
        with _patch_repo(repo):
            run_complete(args)
        out = capsys.readouterr().out
        assert "completed" in out.lower()

    def test_text_shows_task_title(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        repo = _make_repo(tmp_path)
        t = self._setup(repo, title="Rename billing module")
        args = _complete_ns(task_id=t.task_id, run_id="agent-1", json_out=False)
        with _patch_repo(repo):
            run_complete(args)
        out = capsys.readouterr().out
        assert "Rename billing module" in out

    def test_text_shows_queue(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        repo = _make_repo(tmp_path)
        t = self._setup(repo, queue="billing")
        args = _complete_ns(task_id=t.task_id, run_id="agent-1", json_out=False)
        with _patch_repo(repo):
            run_complete(args)
        out = capsys.readouterr().out
        assert "billing" in out

    def test_text_shows_claimer(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        repo = _make_repo(tmp_path)
        t = self._setup(repo)
        args = _complete_ns(task_id=t.task_id, run_id="agent-1", json_out=False)
        with _patch_repo(repo):
            run_complete(args)
        out = capsys.readouterr().out
        assert "agent-1" in out

    def test_text_shows_result_when_provided(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        repo = _make_repo(tmp_path)
        t = self._setup(repo)
        args = _complete_ns(task_id=t.task_id, run_id="agent-1", result='{"pr": 42}', json_out=False)
        with _patch_repo(repo):
            run_complete(args)
        out = capsys.readouterr().out
        assert "42" in out

    def test_text_shows_elapsed(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        repo = _make_repo(tmp_path)
        t = self._setup(repo)
        args = _complete_ns(task_id=t.task_id, run_id="agent-1", json_out=False)
        with _patch_repo(repo):
            run_complete(args)
        out = capsys.readouterr().out
        assert "s)" in out  # e.g. "(0.003s)"

    def test_ansi_injection_in_run_id_stripped(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        malicious_id = "agent\x1b[31mRED\x1b[0m"
        repo = _make_repo(tmp_path)
        t = create_task(repo, "task")
        # Claim under the malicious id so the completion must echo it back.
        claim_next_task(repo, malicious_id)
        args = _complete_ns(task_id=t.task_id, run_id=malicious_id, json_out=False)
        with _patch_repo(repo):
            run_complete(args)
        out = capsys.readouterr().out
        assert "\x1b" not in out

    def test_ansi_injection_in_title_stripped(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        malicious_title = "task\x1b[1mBOLD\x1b[0m"
        repo = _make_repo(tmp_path)
        t = create_task(repo, malicious_title)
        claim_next_task(repo, "agent-1")
        args = _complete_ns(task_id=t.task_id, run_id="agent-1", json_out=False)
        with _patch_repo(repo):
            run_complete(args)
        out = capsys.readouterr().out
        assert "\x1b" not in out


class TestCompleteIntegration:
    """Full lifecycle integration tests for run_complete."""

    def test_completed_status_persisted_to_disk(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        t = create_task(repo, "persist-me")
        claim_next_task(repo, "worker")
        args = _complete_ns(task_id=t.task_id, run_id="worker")
        with _patch_repo(repo):
            run_complete(args)
        # Re-load the claim via load_claim to check what was persisted.
        claim = load_claim(repo, t.task_id)
        assert claim is not None
        assert claim.status == "completed"

    def test_result_persisted_to_disk(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        t = create_task(repo, "result-me")
        claim_next_task(repo, "worker")
        args = _complete_ns(task_id=t.task_id, run_id="worker", result='{"sha": "abc123"}')
        with _patch_repo(repo):
            run_complete(args)
        claim = load_claim(repo, t.task_id)
        assert claim is not None
        assert claim.result == {"sha": "abc123"}

    def test_double_complete_fails(self, tmp_path: pathlib.Path) -> None:
        repo = _make_repo(tmp_path)
        t = create_task(repo, "once only")
        claim_next_task(repo, "worker")
        args = _complete_ns(task_id=t.task_id, run_id="worker")
        with _patch_repo(repo):
            run_complete(args)
        # A second completion of the same task must be rejected.
        with _patch_repo(repo):
            with pytest.raises(SystemExit) as exc:
                run_complete(args)
        assert exc.value.code == 1

    def test_complete_missing_claim_fails(self, tmp_path: pathlib.Path) -> None:
        """Task exists but was never claimed."""
        repo = _make_repo(tmp_path)
        t = create_task(repo, "unclaimed")
        args = _complete_ns(task_id=t.task_id, run_id="worker")
        with _patch_repo(repo):
            with pytest.raises(SystemExit) as exc:
                run_complete(args)
        assert exc.value.code == 1

    def test_enqueue_claim_complete_full_cycle(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        """Full CLI round-trip: enqueue → claim → complete."""
        repo = _make_repo(tmp_path)
        # enqueue
        eq_args = _enqueue_ns(title="e2e task", queue="default")
        with _patch_repo(repo):
            run_enqueue(eq_args)
        task_id = json.loads(capsys.readouterr().out)["task_id"]
        # claim
        cl_args = _claim_ns(run_id="e2e-agent")
        with _patch_repo(repo):
            run_claim(cl_args)
        capsys.readouterr()
        # complete
        co_args = _complete_ns(task_id=task_id, run_id="e2e-agent", result='{"done": true}')
        with _patch_repo(repo):
            run_complete(co_args)
        out = json.loads(capsys.readouterr().out)
        assert out["status"] == "completed"
        assert out["result"] == {"done": True}

    def test_complete_with_unicode_result(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None:
        repo = _make_repo(tmp_path)
        t = create_task(repo, "unicode")
        claim_next_task(repo, "agent-1")
        args = _complete_ns(task_id=t.task_id, run_id="agent-1", result='{"msg": "héllo wörld 🎉"}')
        with _patch_repo(repo):
            run_complete(args)
        out = json.loads(capsys.readouterr().out)
        assert out["result"]["msg"] == "héllo wörld 🎉"


class TestCompleteStress:
    """Concurrency and throughput stress tests for run_complete."""

    def test_20_agents_each_claim_and_complete_unique_task(self, tmp_path: pathlib.Path) -> None:
        """20 concurrent agents each claim+complete a unique task with no conflicts."""
        import concurrent.futures
        repo = _make_repo(tmp_path)
        tasks = [create_task(repo, f"task-{i}") for i in range(20)]
completed: set[str] = set() lock = threading.Lock() errors: list[str] = [] def claim_and_complete(task: "TaskRecord") -> None: run_id = f"agent-{task.task_id[:8]}" result = claim_next_task(repo, run_id, queue=task.queue) if result is None: errors.append(f"no task for {run_id}") return claimed_task, _ = result try: complete_task(repo, claimed_task.task_id, run_id) except Exception as exc: # noqa: BLE001 errors.append(str(exc)) return with lock: completed.add(claimed_task.task_id) with concurrent.futures.ThreadPoolExecutor(max_workers=20) as pool: list(pool.map(claim_and_complete, tasks)) assert not errors, f"errors: {errors}" assert len(completed) == 20, f"only {len(completed)}/20 completed" def test_100_sequential_completes_under_10s(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) for i in range(100): t = create_task(repo, f"seq-{i}") claim_next_task(repo, "batch-worker") start = time.monotonic() complete_task(repo, t.task_id, "batch-worker") assert time.monotonic() - start < 0.15, f"task {i} took too long" def test_complete_via_run_complete_100_sequential(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) start = time.monotonic() for i in range(100): t = create_task(repo, f"cli-{i}") claim_next_task(repo, "cli-worker") args = _complete_ns(task_id=t.task_id, run_id="cli-worker") with _patch_repo(repo): run_complete(args) capsys.readouterr() elapsed = time.monotonic() - start assert elapsed < 15.0, f"100 CLI completes took {elapsed:.1f}s" class TestCliFailTask: """run_fail_task: success, wrong claimer, text/json output.""" def _setup(self, repo: pathlib.Path) -> TaskRecord: t = create_task(repo, "Failing task") claim_next_task(repo, "agent-1") return t def test_fail_success_json(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) t = self._setup(repo) args = _namespace(task_id=t.task_id, run_id="agent-1", error="network down", json_out=True) with 
_patch_repo(repo): run_fail_task(args) out = json.loads(capsys.readouterr().out) assert out["status"] == "failed" assert out["error"] == "network down" def test_fail_wrong_claimer_exits_1(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) t = self._setup(repo) args = _namespace(task_id=t.task_id, run_id="wrong", error="x", json_out=True) with _patch_repo(repo): with pytest.raises(SystemExit) as exc: run_fail_task(args) assert exc.value.code == 1 def test_fail_text_output(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) t = self._setup(repo) args = _namespace(task_id=t.task_id, run_id="agent-1", error="boom", json_out=False) with _patch_repo(repo): run_fail_task(args) out = capsys.readouterr().out assert "failed" in out.lower() # ── fail-task hardening ─────────────────────────────────────────────────────── from muse.cli.commands.task_queue import _MAX_ERROR_LEN def _fail_ns(**kwargs: MsgpackValue) -> argparse.Namespace: """Build a Namespace with fail-task-appropriate defaults.""" defaults = { "json_out": True, "run_id": "agent-1", "task_id": VALID_ID, "error": "something went wrong", } defaults.update(kwargs) return argparse.Namespace(**defaults) class TestFailTaskInputValidation: """All fail-task validation fires before require_repo() and exits 1.""" def _setup(self, repo: pathlib.Path) -> TaskRecord: t = create_task(repo, "Failable task") claim_next_task(repo, "agent-1") return t def test_run_id_too_long_exits_1(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) t = self._setup(repo) args = _fail_ns(task_id=t.task_id, run_id="x" * 257) with _patch_repo(repo): with pytest.raises(SystemExit) as exc: run_fail_task(args) assert exc.value.code == ExitCode.USER_ERROR def test_run_id_at_max_length_passes(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) t = self._setup(repo) # 256-char run_id is valid length; mismatch with claimer 
triggers PermissionError args = _fail_ns(task_id=t.task_id, run_id="b" * 256) with _patch_repo(repo): with pytest.raises(SystemExit) as exc: run_fail_task(args) out = json.loads(capsys.readouterr().out) # Exits 1 due to wrong claimer, NOT bad_args — length validation passed assert out.get("status") != "bad_args" def test_error_too_long_exits_1(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) t = self._setup(repo) args = _fail_ns(task_id=t.task_id, error="e" * (_MAX_ERROR_LEN + 1)) with _patch_repo(repo): with pytest.raises(SystemExit) as exc: run_fail_task(args) assert exc.value.code == ExitCode.USER_ERROR def test_error_at_max_length_passes(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) t = self._setup(repo) args = _fail_ns(task_id=t.task_id, run_id="agent-1", error="e" * _MAX_ERROR_LEN) with _patch_repo(repo): run_fail_task(args) out = json.loads(capsys.readouterr().out) assert out["status"] == "failed" def test_empty_error_allowed(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) t = self._setup(repo) args = _fail_ns(task_id=t.task_id, run_id="agent-1", error="") with _patch_repo(repo): run_fail_task(args) out = json.loads(capsys.readouterr().out) assert out["status"] == "failed" def test_invalid_task_id_exits_1(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) args = _fail_ns(task_id="not-a-content-id") with _patch_repo(repo): with pytest.raises(SystemExit) as exc: run_fail_task(args) assert exc.value.code == ExitCode.USER_ERROR def test_path_traversal_task_id_exits_1(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) args = _fail_ns(task_id="../../etc/passwd") with _patch_repo(repo): with pytest.raises(SystemExit) as exc: run_fail_task(args) assert exc.value.code == ExitCode.USER_ERROR def test_null_byte_task_id_exits_1(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) args = 
_fail_ns(task_id="12345678-1234-4abc-8abc-123456789\x00ab") with _patch_repo(repo): with pytest.raises(SystemExit) as exc: run_fail_task(args) assert exc.value.code == ExitCode.USER_ERROR def test_validation_fires_before_repo_lookup(self, tmp_path: pathlib.Path) -> None: """require_repo must NOT be called when task_id is invalid.""" call_count = {"n": 0} def counting_require_repo() -> pathlib.Path: call_count["n"] += 1 raise RuntimeError("should not reach here") args = _fail_ns(task_id="BADUUID") with patch("muse.cli.commands.task_queue.require_repo", counting_require_repo): with pytest.raises(SystemExit): run_fail_task(args) assert call_count["n"] == 0, "require_repo called before task_id validation" def test_run_id_validation_fires_before_repo_lookup(self, tmp_path: pathlib.Path) -> None: call_count = {"n": 0} def counting_require_repo() -> pathlib.Path: call_count["n"] += 1 raise RuntimeError("should not reach here") args = _fail_ns(task_id=VALID_ID, run_id="r" * 300) with patch("muse.cli.commands.task_queue.require_repo", counting_require_repo): with pytest.raises(SystemExit): run_fail_task(args) assert call_count["n"] == 0 def test_error_validation_fires_before_repo_lookup(self, tmp_path: pathlib.Path) -> None: call_count = {"n": 0} def counting_require_repo() -> pathlib.Path: call_count["n"] += 1 raise RuntimeError("should not reach here") args = _fail_ns(task_id=VALID_ID, error="e" * 5000) with patch("muse.cli.commands.task_queue.require_repo", counting_require_repo): with pytest.raises(SystemExit): run_fail_task(args) assert call_count["n"] == 0 def test_json_error_shape_bad_task_id(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) args = _fail_ns(task_id="bad-id", json_out=True) with _patch_repo(repo): with pytest.raises(SystemExit): run_fail_task(args) out = json.loads(capsys.readouterr().out) assert "error" in out assert out["status"] == "bad_task_id" def test_json_error_shape_bad_run_id(self, 
tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) args = _fail_ns(task_id=VALID_ID, run_id="x" * 300, json_out=True) with _patch_repo(repo): with pytest.raises(SystemExit): run_fail_task(args) out = json.loads(capsys.readouterr().out) assert "error" in out assert out["status"] == "bad_args" def test_json_error_shape_bad_error_msg(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) args = _fail_ns(task_id=VALID_ID, error="e" * 5000, json_out=True) with _patch_repo(repo): with pytest.raises(SystemExit): run_fail_task(args) out = json.loads(capsys.readouterr().out) assert "error" in out assert out["status"] == "bad_args" def test_text_error_goes_to_stderr(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) args = _fail_ns(task_id="BADUUID", json_out=False) with _patch_repo(repo): with pytest.raises(SystemExit): run_fail_task(args) captured = capsys.readouterr() assert captured.out == "" assert "❌" in captured.err def test_json_error_goes_to_stdout_not_stderr(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) args = _fail_ns(task_id="BADUUID", json_out=True) with _patch_repo(repo): with pytest.raises(SystemExit): run_fail_task(args) captured = capsys.readouterr() assert captured.err == "" out = json.loads(captured.out) assert "error" in out class TestFailTaskJsonOutput: """run_fail_task JSON output shape and compactness.""" def _setup(self, repo: pathlib.Path, queue: str = "default") -> TaskRecord: t = create_task(repo, "JSON fail task", queue=queue) claim_next_task(repo, "failer-1") return t def test_json_is_compact(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) t = self._setup(repo) args = _fail_ns(task_id=t.task_id, run_id="failer-1") with _patch_repo(repo): run_fail_task(args) raw = 
capsys.readouterr().out.strip() assert "\n" not in raw, "JSON output must be single line (compact)" def test_json_has_required_keys(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) t = self._setup(repo) args = _fail_ns(task_id=t.task_id, run_id="failer-1") with _patch_repo(repo): run_fail_task(args) out = json.loads(capsys.readouterr().out) for key in ("schema", "task_id", "claimer_run_id", "status", "claimed_at", "expires_at", "error", "duration_ms"): assert key in out, f"missing key: {key}" def test_json_status_is_failed(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) t = self._setup(repo) args = _fail_ns(task_id=t.task_id, run_id="failer-1") with _patch_repo(repo): run_fail_task(args) out = json.loads(capsys.readouterr().out) assert out["status"] == "failed" def test_json_error_field_populated(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) t = self._setup(repo) args = _fail_ns(task_id=t.task_id, run_id="failer-1", error="connection refused on port 5432") with _patch_repo(repo): run_fail_task(args) out = json.loads(capsys.readouterr().out) assert out["error"] == "connection refused on port 5432" def test_json_empty_error_is_empty_string(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) t = self._setup(repo) args = _fail_ns(task_id=t.task_id, run_id="failer-1", error="") with _patch_repo(repo): run_fail_task(args) out = json.loads(capsys.readouterr().out) assert out["error"] == "" def test_json_elapsed_is_float(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) t = self._setup(repo) args = _fail_ns(task_id=t.task_id, run_id="failer-1") with _patch_repo(repo): run_fail_task(args) out = json.loads(capsys.readouterr().out) assert isinstance(out["duration_ms"], float) def 
test_json_claimer_run_id_matches(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) t = self._setup(repo) args = _fail_ns(task_id=t.task_id, run_id="failer-1") with _patch_repo(repo): run_fail_task(args) out = json.loads(capsys.readouterr().out) assert out["claimer_run_id"] == "failer-1" def test_json_wrong_claimer_error_shape(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) t = self._setup(repo) args = _fail_ns(task_id=t.task_id, run_id="impostor", json_out=True) with _patch_repo(repo): with pytest.raises(SystemExit) as exc: run_fail_task(args) assert exc.value.code == 1 out = json.loads(capsys.readouterr().out) assert "error" in out def test_json_missing_task_error_shape(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) args = _fail_ns(task_id=VALID_ID, run_id="failer-1", json_out=True) with _patch_repo(repo): with pytest.raises(SystemExit) as exc: run_fail_task(args) assert exc.value.code == 1 out = json.loads(capsys.readouterr().out) assert "error" in out class TestFailTaskTextOutput: """run_fail_task text output content.""" def _setup(self, repo: pathlib.Path, title: str = "My Task", queue: str = "default") -> TaskRecord: t = create_task(repo, title, queue=queue) claim_next_task(repo, "agent-1") return t def test_text_shows_failed(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) t = self._setup(repo) args = _fail_ns(task_id=t.task_id, run_id="agent-1", json_out=False) with _patch_repo(repo): run_fail_task(args) out = capsys.readouterr().out assert "failed" in out.lower() def test_text_shows_task_title(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) t = self._setup(repo, title="Deploy billing service") args = _fail_ns(task_id=t.task_id, run_id="agent-1", json_out=False) with _patch_repo(repo): 
run_fail_task(args) out = capsys.readouterr().out assert "Deploy billing service" in out def test_text_shows_queue(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) t = self._setup(repo, queue="infra") args = _fail_ns(task_id=t.task_id, run_id="agent-1", json_out=False) with _patch_repo(repo): run_fail_task(args) out = capsys.readouterr().out assert "infra" in out def test_text_shows_claimer(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) t = self._setup(repo) args = _fail_ns(task_id=t.task_id, run_id="agent-1", json_out=False) with _patch_repo(repo): run_fail_task(args) out = capsys.readouterr().out assert "agent-1" in out def test_text_shows_error_message(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) t = self._setup(repo) args = _fail_ns(task_id=t.task_id, run_id="agent-1", error="disk full on /data", json_out=False) with _patch_repo(repo): run_fail_task(args) out = capsys.readouterr().out assert "disk full on /data" in out def test_text_no_error_line_when_empty(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) t = self._setup(repo) args = _fail_ns(task_id=t.task_id, run_id="agent-1", error="", json_out=False) with _patch_repo(repo): run_fail_task(args) out = capsys.readouterr().out assert "Error:" not in out def test_text_shows_elapsed(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) t = self._setup(repo) args = _fail_ns(task_id=t.task_id, run_id="agent-1", json_out=False) with _patch_repo(repo): run_fail_task(args) out = capsys.readouterr().out assert "s)" in out def test_ansi_injection_in_error_stripped(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) t = self._setup(repo) malicious_err = "error\x1b[31mRED\x1b[0m" args = 
_fail_ns(task_id=t.task_id, run_id="agent-1", error=malicious_err, json_out=False) with _patch_repo(repo): run_fail_task(args) out = capsys.readouterr().out assert "\x1b" not in out def test_ansi_injection_in_title_stripped(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: malicious_title = "task\x1b[1mBOLD\x1b[0m" repo = _make_repo(tmp_path) t = create_task(repo, malicious_title) claim_next_task(repo, "agent-1") args = _fail_ns(task_id=t.task_id, run_id="agent-1", json_out=False) with _patch_repo(repo): run_fail_task(args) out = capsys.readouterr().out assert "\x1b" not in out class TestFailTaskIntegration: """Full lifecycle integration tests for run_fail_task.""" def test_failed_status_persisted_to_disk(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) t = create_task(repo, "persist-fail") claim_next_task(repo, "worker") args = _fail_ns(task_id=t.task_id, run_id="worker", error="timeout") with _patch_repo(repo): run_fail_task(args) claim = load_claim(repo, t.task_id) assert claim is not None assert claim.status == "failed" assert claim.error == "timeout" def test_error_persisted_to_disk(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) t = create_task(repo, "error-persist") claim_next_task(repo, "worker") args = _fail_ns(task_id=t.task_id, run_id="worker", error="OOM at step 3: allocated 16 GiB") with _patch_repo(repo): run_fail_task(args) claim = load_claim(repo, t.task_id) assert claim is not None assert "OOM at step 3" in claim.error def test_double_fail_exits_1(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) t = create_task(repo, "double-fail") claim_next_task(repo, "worker") args = _fail_ns(task_id=t.task_id, run_id="worker") with _patch_repo(repo): run_fail_task(args) with _patch_repo(repo): with pytest.raises(SystemExit) as exc: run_fail_task(args) assert exc.value.code == 1 def test_fail_unclaimed_task_exits_1(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) t 
= create_task(repo, "unclaimed") args = _fail_ns(task_id=t.task_id, run_id="worker") with _patch_repo(repo): with pytest.raises(SystemExit) as exc: run_fail_task(args) assert exc.value.code == 1 def test_enqueue_claim_fail_full_cycle(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: """Full CLI round-trip: enqueue → claim → fail.""" repo = _make_repo(tmp_path) eq_args = _enqueue_ns(title="e2e fail task", queue="default") with _patch_repo(repo): run_enqueue(eq_args) task_id = json.loads(capsys.readouterr().out)["task_id"] cl_args = _claim_ns(run_id="e2e-worker") with _patch_repo(repo): run_claim(cl_args) capsys.readouterr() fa_args = _fail_ns(task_id=task_id, run_id="e2e-worker", error="dependency unavailable") with _patch_repo(repo): run_fail_task(fa_args) out = json.loads(capsys.readouterr().out) assert out["status"] == "failed" assert "dependency unavailable" in out["error"] def test_unicode_error_message(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) t = create_task(repo, "unicode-fail") claim_next_task(repo, "agent-1") args = _fail_ns(task_id=t.task_id, run_id="agent-1", error="échec: fichier introuvable — erreur 404 🚫") with _patch_repo(repo): run_fail_task(args) out = json.loads(capsys.readouterr().out) assert out["status"] == "failed" assert "échec" in out["error"] class TestFailTaskStress: """Concurrency and throughput stress tests for run_fail_task.""" def test_20_agents_each_claim_and_fail_unique_task(self, tmp_path: pathlib.Path) -> None: """20 concurrent agents each claim+fail a unique task with no conflicts.""" import concurrent.futures repo = _make_repo(tmp_path) tasks = [create_task(repo, f"stress-{i}") for i in range(20)] failed_ids: set[str] = set() lock = threading.Lock() errors: list[str] = [] def claim_and_fail(task: "TaskRecord") -> None: run_id = f"agent-{task.task_id[:8]}" result = claim_next_task(repo, run_id, queue=task.queue) if result is None: 
errors.append(f"no task for {run_id}") return claimed_task, _ = result try: fail_task(repo, claimed_task.task_id, run_id, error="stress test") except Exception as exc: # noqa: BLE001 errors.append(str(exc)) return with lock: failed_ids.add(claimed_task.task_id) with concurrent.futures.ThreadPoolExecutor(max_workers=20) as pool: list(pool.map(claim_and_fail, tasks)) assert not errors, f"errors: {errors}" assert len(failed_ids) == 20, f"only {len(failed_ids)}/20 failed" def test_100_sequential_fails_under_15s(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) start = time.monotonic() for i in range(100): t = create_task(repo, f"seq-fail-{i}") claim_next_task(repo, "batch-failer") fail_task(repo, t.task_id, "batch-failer", error=f"error {i}") elapsed = time.monotonic() - start assert elapsed < 15.0, f"100 sequential fails took {elapsed:.1f}s" def test_fail_via_run_fail_task_100_sequential(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) start = time.monotonic() for i in range(100): t = create_task(repo, f"cli-fail-{i}") claim_next_task(repo, "cli-failer") args = _fail_ns(task_id=t.task_id, run_id="cli-failer", error=f"error {i}") with _patch_repo(repo): run_fail_task(args) capsys.readouterr() elapsed = time.monotonic() - start assert elapsed < 15.0, f"100 CLI fails took {elapsed:.1f}s" class TestCliCancelTask: """run_cancel_task: pending cancel, force cancel, error paths.""" def test_cancel_pending_task_json(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) t = create_task(repo, "Unwanted task") args = _namespace(task_id=t.task_id, run_id="orchestrator", force=False, json_out=True) with _patch_repo(repo): run_cancel_task(args) out = json.loads(capsys.readouterr().out) assert out["status"] == "cancelled" def test_cancel_claimed_by_claimer(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) t = 
create_task(repo, "My task") claim_next_task(repo, "agent-1") args = _namespace(task_id=t.task_id, run_id="agent-1", force=False, json_out=True) with _patch_repo(repo): run_cancel_task(args) out = json.loads(capsys.readouterr().out) assert out["status"] == "cancelled" def test_cancel_force_different_agent(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) t = create_task(repo, "Stolen task") claim_next_task(repo, "agent-1") args = _namespace(task_id=t.task_id, run_id="orchestrator", force=True, json_out=True) with _patch_repo(repo): run_cancel_task(args) out = json.loads(capsys.readouterr().out) assert out["status"] == "cancelled" def test_cancel_wrong_claimer_no_force_exits_1(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) t = create_task(repo, "Someone else's task") claim_next_task(repo, "agent-1") args = _namespace(task_id=t.task_id, run_id="agent-2", force=False, json_out=True) with _patch_repo(repo): with pytest.raises(SystemExit) as exc: run_cancel_task(args) assert exc.value.code == 1 def test_cancel_text_output(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) t = create_task(repo, "To cancel") args = _namespace(task_id=t.task_id, run_id="orchestrator", force=False, json_out=False) with _patch_repo(repo): run_cancel_task(args) out = capsys.readouterr().out assert "cancelled" in out.lower() # ── cancel-task hardening ───────────────────────────────────────────────────── def _cancel_ns(**kwargs: MsgpackValue) -> argparse.Namespace: """Build a Namespace with cancel-task-appropriate defaults.""" defaults = { "json_out": True, "run_id": "orchestrator", "task_id": VALID_ID, "force": False, } defaults.update(kwargs) return argparse.Namespace(**defaults) class TestCancelTaskInputValidation: """All cancel-task validation fires before require_repo() and exits 1.""" def test_run_id_too_long_exits_1(self, tmp_path: pathlib.Path) -> None: repo = 
_make_repo(tmp_path) args = _cancel_ns(task_id=VALID_ID, run_id="x" * 257) with _patch_repo(repo): with pytest.raises(SystemExit) as exc: run_cancel_task(args) assert exc.value.code == ExitCode.USER_ERROR def test_run_id_at_max_length_passes_validation(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) t = create_task(repo, "task") # run_id exactly at limit — passes length check; pending task cancels fine args = _cancel_ns(task_id=t.task_id, run_id="c" * 256) with _patch_repo(repo): run_cancel_task(args) out = json.loads(capsys.readouterr().out) assert out["status"] == "cancelled" def test_invalid_task_id_exits_1(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) args = _cancel_ns(task_id="not-a-content-id") with _patch_repo(repo): with pytest.raises(SystemExit) as exc: run_cancel_task(args) assert exc.value.code == ExitCode.USER_ERROR def test_path_traversal_task_id_exits_1(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) args = _cancel_ns(task_id="../../etc/passwd") with _patch_repo(repo): with pytest.raises(SystemExit) as exc: run_cancel_task(args) assert exc.value.code == ExitCode.USER_ERROR def test_null_byte_task_id_exits_1(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) args = _cancel_ns(task_id="12345678-1234-4abc-8abc-123456789\x00ab") with _patch_repo(repo): with pytest.raises(SystemExit) as exc: run_cancel_task(args) assert exc.value.code == ExitCode.USER_ERROR def test_task_id_validation_fires_before_repo_lookup(self, tmp_path: pathlib.Path) -> None: call_count = {"n": 0} def counting_require_repo() -> pathlib.Path: call_count["n"] += 1 raise RuntimeError("should not be called") args = _cancel_ns(task_id="BADUUID") with patch("muse.cli.commands.task_queue.require_repo", counting_require_repo): with pytest.raises(SystemExit): run_cancel_task(args) assert call_count["n"] == 0, "require_repo called before task_id validation" def 
test_run_id_validation_fires_before_repo_lookup(self, tmp_path: pathlib.Path) -> None: call_count = {"n": 0} def counting_require_repo() -> pathlib.Path: call_count["n"] += 1 raise RuntimeError("should not be called") args = _cancel_ns(task_id=VALID_ID, run_id="r" * 300) with patch("muse.cli.commands.task_queue.require_repo", counting_require_repo): with pytest.raises(SystemExit): run_cancel_task(args) assert call_count["n"] == 0 def test_json_error_shape_bad_task_id(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) args = _cancel_ns(task_id="bad-id", json_out=True) with _patch_repo(repo): with pytest.raises(SystemExit): run_cancel_task(args) out = json.loads(capsys.readouterr().out) assert "error" in out assert out["status"] == "bad_task_id" def test_json_error_shape_bad_run_id(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) args = _cancel_ns(task_id=VALID_ID, run_id="x" * 300, json_out=True) with _patch_repo(repo): with pytest.raises(SystemExit): run_cancel_task(args) out = json.loads(capsys.readouterr().out) assert "error" in out assert out["status"] == "bad_args" def test_text_error_goes_to_stderr(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) args = _cancel_ns(task_id="BADUUID", json_out=False) with _patch_repo(repo): with pytest.raises(SystemExit): run_cancel_task(args) captured = capsys.readouterr() assert captured.out == "" assert "❌" in captured.err def test_json_error_goes_to_stdout_not_stderr(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) args = _cancel_ns(task_id="BADUUID", json_out=True) with _patch_repo(repo): with pytest.raises(SystemExit): run_cancel_task(args) captured = capsys.readouterr() assert captured.err == "" out = json.loads(captured.out) assert "error" in out def test_missing_task_error_shape(self, tmp_path: pathlib.Path, 
capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) # Valid UUID but task does not exist args = _cancel_ns(task_id=VALID_ID, json_out=True) with _patch_repo(repo): with pytest.raises(SystemExit) as exc: run_cancel_task(args) assert exc.value.code == 1 out = json.loads(capsys.readouterr().out) assert "error" in out def test_already_terminal_error_shape(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) t = create_task(repo, "terminal") claim_next_task(repo, "agent-1") complete_task(repo, t.task_id, "agent-1") args = _cancel_ns(task_id=t.task_id, run_id="agent-1", json_out=True) with _patch_repo(repo): with pytest.raises(SystemExit) as exc: run_cancel_task(args) assert exc.value.code == 1 out = json.loads(capsys.readouterr().out) assert "error" in out def test_wrong_claimer_no_force_error_shape(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) t = create_task(repo, "owned") claim_next_task(repo, "agent-1") args = _cancel_ns(task_id=t.task_id, run_id="agent-2", force=False, json_out=True) with _patch_repo(repo): with pytest.raises(SystemExit) as exc: run_cancel_task(args) assert exc.value.code == 1 out = json.loads(capsys.readouterr().out) assert "error" in out class TestCancelTaskJsonOutput: """run_cancel_task JSON output shape and compactness.""" def test_json_is_compact(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) t = create_task(repo, "compact task") args = _cancel_ns(task_id=t.task_id) with _patch_repo(repo): run_cancel_task(args) raw = capsys.readouterr().out.strip() assert "\n" not in raw, "JSON output must be single line (compact)" def test_json_has_required_keys(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) t = create_task(repo, "keys task") args = _cancel_ns(task_id=t.task_id) with _patch_repo(repo): run_cancel_task(args) 
out = json.loads(capsys.readouterr().out) for key in ("schema", "task_id", "claimer_run_id", "status", "claimed_at", "expires_at", "error", "duration_ms"): assert key in out, f"missing key: {key}" def test_json_status_is_cancelled(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) t = create_task(repo, "status task") args = _cancel_ns(task_id=t.task_id) with _patch_repo(repo): run_cancel_task(args) out = json.loads(capsys.readouterr().out) assert out["status"] == "cancelled" def test_json_elapsed_is_float(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) t = create_task(repo, "elapsed task") args = _cancel_ns(task_id=t.task_id) with _patch_repo(repo): run_cancel_task(args) out = json.loads(capsys.readouterr().out) assert isinstance(out["duration_ms"], float) def test_json_claimer_run_id_for_pending_is_caller(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: """For a pending task, claimer_run_id is the calling agent.""" repo = _make_repo(tmp_path) t = create_task(repo, "pending task") args = _cancel_ns(task_id=t.task_id, run_id="orchestrator-99") with _patch_repo(repo): run_cancel_task(args) out = json.loads(capsys.readouterr().out) assert out["claimer_run_id"] == "orchestrator-99" def test_json_claimer_run_id_for_claimed_is_original_claimer(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: """For a claimed task cancelled by its claimer, run_id is preserved.""" repo = _make_repo(tmp_path) t = create_task(repo, "claimed task") claim_next_task(repo, "agent-xyz") args = _cancel_ns(task_id=t.task_id, run_id="agent-xyz") with _patch_repo(repo): run_cancel_task(args) out = json.loads(capsys.readouterr().out) assert out["claimer_run_id"] == "agent-xyz" def test_json_force_cancel_different_agent(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) t = create_task(repo, 
"force task") claim_next_task(repo, "agent-1") args = _cancel_ns(task_id=t.task_id, run_id="orchestrator", force=True) with _patch_repo(repo): run_cancel_task(args) out = json.loads(capsys.readouterr().out) assert out["status"] == "cancelled" def test_json_task_id_matches(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) t = create_task(repo, "id task") args = _cancel_ns(task_id=t.task_id) with _patch_repo(repo): run_cancel_task(args) out = json.loads(capsys.readouterr().out) assert out["task_id"] == t.task_id class TestCancelTaskTextOutput: """run_cancel_task text output content.""" def _setup_pending(self, repo: pathlib.Path, title: str = "Pending Task", queue: str = "default") -> TaskRecord: return create_task(repo, title, queue=queue) def _setup_claimed(self, repo: pathlib.Path, title: str = "Claimed Task", queue: str = "default", claimer: str = "agent-1") -> TaskRecord: t = create_task(repo, title, queue=queue) claim_next_task(repo, claimer) return t def test_text_shows_cancelled(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) t = self._setup_pending(repo) args = _cancel_ns(task_id=t.task_id, json_out=False) with _patch_repo(repo): run_cancel_task(args) out = capsys.readouterr().out assert "cancelled" in out.lower() def test_text_shows_task_title(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) t = self._setup_pending(repo, title="Decommission old infra") args = _cancel_ns(task_id=t.task_id, json_out=False) with _patch_repo(repo): run_cancel_task(args) out = capsys.readouterr().out assert "Decommission old infra" in out def test_text_shows_queue(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) t = self._setup_pending(repo, queue="infra") args = _cancel_ns(task_id=t.task_id, json_out=False) with _patch_repo(repo): run_cancel_task(args) out = 
capsys.readouterr().out assert "infra" in out def test_text_shows_claimer(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) t = self._setup_pending(repo) args = _cancel_ns(task_id=t.task_id, run_id="orch-42", json_out=False) with _patch_repo(repo): run_cancel_task(args) out = capsys.readouterr().out assert "orch-42" in out def test_text_shows_forced_indicator(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) t = self._setup_claimed(repo, claimer="agent-1") args = _cancel_ns(task_id=t.task_id, run_id="orch", force=True, json_out=False) with _patch_repo(repo): run_cancel_task(args) out = capsys.readouterr().out assert "forced" in out.lower() def test_text_no_forced_indicator_without_flag(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) t = self._setup_claimed(repo, claimer="agent-1") args = _cancel_ns(task_id=t.task_id, run_id="agent-1", force=False, json_out=False) with _patch_repo(repo): run_cancel_task(args) out = capsys.readouterr().out assert "forced" not in out.lower() def test_text_shows_elapsed(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) t = self._setup_pending(repo) args = _cancel_ns(task_id=t.task_id, json_out=False) with _patch_repo(repo): run_cancel_task(args) out = capsys.readouterr().out assert "s)" in out def test_ansi_injection_in_title_stripped(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: malicious_title = "task\x1b[1mBOLD\x1b[0m" repo = _make_repo(tmp_path) t = create_task(repo, malicious_title) args = _cancel_ns(task_id=t.task_id, json_out=False) with _patch_repo(repo): run_cancel_task(args) out = capsys.readouterr().out assert "\x1b" not in out def test_ansi_injection_in_run_id_stripped(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: malicious_id = "orch\x1b[31mRED\x1b[0m" 
repo = _make_repo(tmp_path) t = create_task(repo, "task") args = _cancel_ns(task_id=t.task_id, run_id=malicious_id, json_out=False) with _patch_repo(repo): run_cancel_task(args) out = capsys.readouterr().out assert "\x1b" not in out class TestCancelTaskIntegration: """Full lifecycle integration tests for run_cancel_task.""" def test_pending_task_cancelled_status_on_disk(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) t = create_task(repo, "pending-cancel") args = _cancel_ns(task_id=t.task_id, run_id="orch") with _patch_repo(repo): run_cancel_task(args) claim = load_claim(repo, t.task_id) assert claim is not None assert claim.status == "cancelled" def test_claimed_task_cancelled_by_claimer(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) t = create_task(repo, "claimed-cancel") claim_next_task(repo, "worker") args = _cancel_ns(task_id=t.task_id, run_id="worker") with _patch_repo(repo): run_cancel_task(args) out = json.loads(capsys.readouterr().out) assert out["status"] == "cancelled" claim = load_claim(repo, t.task_id) assert claim.status == "cancelled" def test_force_cancel_different_claimer(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) t = create_task(repo, "force-cancel") claim_next_task(repo, "agent-1") args = _cancel_ns(task_id=t.task_id, run_id="orchestrator", force=True) with _patch_repo(repo): run_cancel_task(args) out = json.loads(capsys.readouterr().out) assert out["status"] == "cancelled" def test_double_cancel_exits_1(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) t = create_task(repo, "double-cancel") args = _cancel_ns(task_id=t.task_id) with _patch_repo(repo): run_cancel_task(args) with _patch_repo(repo): with pytest.raises(SystemExit) as exc: run_cancel_task(args) assert exc.value.code == 1 def test_cancel_completed_task_exits_1(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) t = 
create_task(repo, "completed-task") claim_next_task(repo, "agent-1") complete_task(repo, t.task_id, "agent-1") args = _cancel_ns(task_id=t.task_id, run_id="agent-1") with _patch_repo(repo): with pytest.raises(SystemExit) as exc: run_cancel_task(args) assert exc.value.code == 1 def test_cancel_failed_task_exits_1(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) t = create_task(repo, "failed-task") claim_next_task(repo, "agent-1") fail_task(repo, t.task_id, "agent-1", error="boom") args = _cancel_ns(task_id=t.task_id, run_id="agent-1") with _patch_repo(repo): with pytest.raises(SystemExit) as exc: run_cancel_task(args) assert exc.value.code == 1 def test_enqueue_then_cancel_full_cycle(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: """Full CLI round-trip: enqueue → cancel.""" repo = _make_repo(tmp_path) eq_args = _enqueue_ns(title="e2e cancel", queue="default") with _patch_repo(repo): run_enqueue(eq_args) task_id = json.loads(capsys.readouterr().out)["task_id"] ca_args = _cancel_ns(task_id=task_id, run_id="orch") with _patch_repo(repo): run_cancel_task(ca_args) out = json.loads(capsys.readouterr().out) assert out["status"] == "cancelled" assert out["task_id"] == task_id def test_enqueue_claim_cancel_full_cycle(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: """Full CLI round-trip: enqueue → claim → cancel by claimer.""" repo = _make_repo(tmp_path) eq_args = _enqueue_ns(title="e2e claim-cancel", queue="default") with _patch_repo(repo): run_enqueue(eq_args) task_id = json.loads(capsys.readouterr().out)["task_id"] cl_args = _claim_ns(run_id="e2e-worker") with _patch_repo(repo): run_claim(cl_args) capsys.readouterr() ca_args = _cancel_ns(task_id=task_id, run_id="e2e-worker") with _patch_repo(repo): run_cancel_task(ca_args) out = json.loads(capsys.readouterr().out) assert out["status"] == "cancelled" class TestCancelTaskStress: """Concurrency and throughput stress tests for run_cancel_task.""" def 
test_20_agents_each_cancel_unique_pending_task(self, tmp_path: pathlib.Path) -> None: """20 concurrent orchestrators each cancel a distinct pending task.""" import concurrent.futures repo = _make_repo(tmp_path) tasks = [create_task(repo, f"cancel-stress-{i}") for i in range(20)] cancelled_ids: set[str] = set() lock = threading.Lock() errors: list[str] = [] def do_cancel(t: "TaskRecord") -> None: try: claim = cancel_task(repo, t.task_id, f"orch-{t.task_id[:8]}") except Exception as exc: # noqa: BLE001 errors.append(str(exc)) return with lock: cancelled_ids.add(claim.task_id) with concurrent.futures.ThreadPoolExecutor(max_workers=20) as pool: list(pool.map(do_cancel, tasks)) assert not errors, f"errors: {errors}" assert len(cancelled_ids) == 20, f"only {len(cancelled_ids)}/20 cancelled" def test_100_sequential_cancels_under_15s(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) start = time.monotonic() for i in range(100): t = create_task(repo, f"seq-cancel-{i}") cancel_task(repo, t.task_id, "orch") elapsed = time.monotonic() - start assert elapsed < 15.0, f"100 sequential cancels took {elapsed:.1f}s" def test_cancel_via_run_cancel_task_100_sequential(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) start = time.monotonic() for i in range(100): t = create_task(repo, f"cli-cancel-{i}") args = _cancel_ns(task_id=t.task_id, run_id="orch") with _patch_repo(repo): run_cancel_task(args) capsys.readouterr() elapsed = time.monotonic() - start assert elapsed < 15.0, f"100 CLI cancels took {elapsed:.1f}s" class TestCliTasks: """run_tasks: listing, filtering, JSON/text output.""" def _setup_mixed(self, repo: pathlib.Path) -> tuple[TaskRecord, TaskRecord, TaskRecord]: """Create tasks in various states.""" t1 = create_task(repo, "Pending task", queue="q1", priority=1) t2 = create_task(repo, "Claimed task", queue="q2", priority=5) t3 = create_task(repo, "Done task", queue="q1", priority=3) claim_next_task(repo, 
"agent-2", queue="q2") claim_next_task(repo, "agent-3", queue="q1") complete_task(repo, t3.task_id, "agent-3") return t1, t2, t3 def test_lists_all_tasks_json(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) self._setup_mixed(repo) args = _namespace(json_out=True, status=None, queue=None, run_id=None) with _patch_repo(repo): run_tasks(args) out = json.loads(capsys.readouterr().out) assert out["total"] == 3 assert len(out["items"]) == 3 def test_filter_by_status_pending(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) self._setup_mixed(repo) args = _namespace(json_out=True, status="pending", queue=None, run_id=None) with _patch_repo(repo): run_tasks(args) out = json.loads(capsys.readouterr().out) for item in out["items"]: assert item["status"] == "pending" def test_filter_by_queue(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) self._setup_mixed(repo) args = _namespace(json_out=True, status=None, queue="q1", run_id=None) with _patch_repo(repo): run_tasks(args) out = json.loads(capsys.readouterr().out) for item in out["items"]: assert item["queue"] == "q1" def test_filter_by_run_id(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) self._setup_mixed(repo) args = _namespace(json_out=True, status=None, queue=None, run_id="agent-2") with _patch_repo(repo): run_tasks(args) out = json.loads(capsys.readouterr().out) for item in out["items"]: assert item["claimer_run_id"] == "agent-2" def test_items_sorted_by_priority_desc(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) with _freeze(_EPOCH): create_task(repo, "Low", priority=0) create_task(repo, "High", priority=10) create_task(repo, "Mid", priority=5) args = _namespace(json_out=True, status=None, queue=None, run_id=None) with _patch_repo(repo): 
run_tasks(args) out = json.loads(capsys.readouterr().out) priorities = [i["priority"] for i in out["items"]] assert priorities == sorted(priorities, reverse=True) def test_text_output_no_crash(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) create_task(repo, "Text test task") args = _namespace(json_out=False, status=None, queue=None, run_id=None) with _patch_repo(repo): run_tasks(args) out = capsys.readouterr().out assert "Task queue" in out def test_empty_queue_text_output(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) args = _namespace(json_out=False, status=None, queue=None, run_id=None) with _patch_repo(repo): run_tasks(args) out = capsys.readouterr().out assert "0 task" in out def test_status_counts_in_json(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) create_task(repo, "Pending") args = _namespace(json_out=True, status=None, queue=None, run_id=None) with _patch_repo(repo): run_tasks(args) out = json.loads(capsys.readouterr().out) assert "pending" in out assert out["pending"] == 1 def test_ansi_in_title_not_printed_raw(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: """ANSI escape sequences in task titles must be sanitized in text output.""" repo = _make_repo(tmp_path) create_task(repo, "\x1b[31mRED\x1b[0m") args = _namespace(json_out=False, status=None, queue=None, run_id=None) with _patch_repo(repo): run_tasks(args) out = capsys.readouterr().out # Raw ESC byte must not appear in text output assert "\x1b" not in out # ── tasks hardening ─────────────────────────────────────────────────────────── from muse.cli.commands.task_queue import _MAX_LIMIT from muse.core.types import Manifest def _tasks_ns(**kwargs: MsgpackValue) -> argparse.Namespace: """Build a Namespace with tasks-appropriate defaults.""" defaults = { "json_out": True, "status": None, "queue": None, 
"run_id": None, "limit": 200, } defaults.update(kwargs) return argparse.Namespace(**defaults) class TestTasksInputValidation: """All tasks validation fires before require_repo() and exits 1.""" def test_invalid_queue_name_exits_1(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) args = _tasks_ns(queue="bad queue!") with _patch_repo(repo): with pytest.raises(SystemExit) as exc: run_tasks(args) assert exc.value.code == ExitCode.USER_ERROR def test_queue_with_slash_exits_1(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) args = _tasks_ns(queue="../../etc") with _patch_repo(repo): with pytest.raises(SystemExit) as exc: run_tasks(args) assert exc.value.code == ExitCode.USER_ERROR def test_queue_with_ansi_exits_1(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) args = _tasks_ns(queue="q\x1b[31m") with _patch_repo(repo): with pytest.raises(SystemExit) as exc: run_tasks(args) assert exc.value.code == ExitCode.USER_ERROR def test_valid_queue_name_passes(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) create_task(repo, "task", queue="billing-v2") args = _tasks_ns(queue="billing-v2") with _patch_repo(repo): run_tasks(args) out = json.loads(capsys.readouterr().out) assert out["total"] >= 0 # no exception def test_run_id_too_long_exits_1(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) args = _tasks_ns(run_id="x" * 257) with _patch_repo(repo): with pytest.raises(SystemExit) as exc: run_tasks(args) assert exc.value.code == ExitCode.USER_ERROR def test_run_id_at_max_length_passes(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) args = _tasks_ns(run_id="r" * 256) with _patch_repo(repo): run_tasks(args) out = json.loads(capsys.readouterr().out) assert "items" in out def test_limit_zero_exits_1(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) args = _tasks_ns(limit=0) with 
_patch_repo(repo): with pytest.raises(SystemExit) as exc: run_tasks(args) assert exc.value.code == ExitCode.USER_ERROR def test_limit_negative_exits_1(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) args = _tasks_ns(limit=-1) with _patch_repo(repo): with pytest.raises(SystemExit) as exc: run_tasks(args) assert exc.value.code == ExitCode.USER_ERROR def test_limit_over_max_exits_1(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) args = _tasks_ns(limit=_MAX_LIMIT + 1) with _patch_repo(repo): with pytest.raises(SystemExit) as exc: run_tasks(args) assert exc.value.code == ExitCode.USER_ERROR def test_limit_at_max_passes(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) args = _tasks_ns(limit=_MAX_LIMIT) with _patch_repo(repo): run_tasks(args) out = json.loads(capsys.readouterr().out) assert "items" in out def test_queue_validation_fires_before_repo_lookup(self, tmp_path: pathlib.Path) -> None: call_count = {"n": 0} def counting_require_repo() -> pathlib.Path: call_count["n"] += 1 raise RuntimeError("should not be called") args = _tasks_ns(queue="bad queue!") with patch("muse.cli.commands.task_queue.require_repo", counting_require_repo): with pytest.raises(SystemExit): run_tasks(args) assert call_count["n"] == 0 def test_run_id_validation_fires_before_repo_lookup(self, tmp_path: pathlib.Path) -> None: call_count = {"n": 0} def counting_require_repo() -> pathlib.Path: call_count["n"] += 1 raise RuntimeError("should not be called") args = _tasks_ns(run_id="r" * 300) with patch("muse.cli.commands.task_queue.require_repo", counting_require_repo): with pytest.raises(SystemExit): run_tasks(args) assert call_count["n"] == 0 def test_json_error_shape_bad_queue(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) args = _tasks_ns(queue="bad queue!", json_out=True) with _patch_repo(repo): with pytest.raises(SystemExit): run_tasks(args) out = 
json.loads(capsys.readouterr().out) assert "error" in out assert out["status"] == "bad_queue" def test_json_error_shape_bad_limit(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) args = _tasks_ns(limit=0, json_out=True) with _patch_repo(repo): with pytest.raises(SystemExit): run_tasks(args) out = json.loads(capsys.readouterr().out) assert "error" in out assert out["status"] == "bad_args" def test_text_error_goes_to_stderr(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) args = _tasks_ns(queue="bad queue!", json_out=False) with _patch_repo(repo): with pytest.raises(SystemExit): run_tasks(args) captured = capsys.readouterr() assert captured.out == "" assert "❌" in captured.err def test_json_error_goes_to_stdout_not_stderr(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) args = _tasks_ns(queue="bad queue!", json_out=True) with _patch_repo(repo): with pytest.raises(SystemExit): run_tasks(args) captured = capsys.readouterr() assert captured.err == "" out = json.loads(captured.out) assert "error" in out class TestTasksJsonOutput: """run_tasks JSON output shape, compactness, and field completeness.""" def _setup(self, repo: pathlib.Path) -> tuple[TaskRecord, TaskRecord, TaskRecord]: t1 = create_task(repo, "Pending", queue="q1", priority=1) t2 = create_task(repo, "Claimed", queue="q2", priority=5) t3 = create_task(repo, "Done", queue="q1", priority=3) claim_next_task(repo, "worker-a", queue="q2") claim_next_task(repo, "worker-b", queue="q1") complete_task(repo, t3.task_id, "worker-b") return t1, t2, t3 def test_json_is_compact(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) self._setup(repo) args = _tasks_ns() with _patch_repo(repo): run_tasks(args) raw = capsys.readouterr().out.strip() assert "\n" not in raw, "JSON must be single line (compact)" def 
test_json_has_top_level_keys(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) self._setup(repo) args = _tasks_ns() with _patch_repo(repo): run_tasks(args) out = json.loads(capsys.readouterr().out) for key in ("schema", "total", "pending", "claimed", "timed_out", "completed", "failed", "cancelled", "limit", "truncated", "items", "duration_ms"): assert key in out, f"missing top-level key: {key}" def test_items_have_new_fields(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) create_task(repo, "field-test", queue="default") args = _tasks_ns() with _patch_repo(repo): run_tasks(args) out = json.loads(capsys.readouterr().out) assert len(out["items"]) == 1 item = out["items"][0] for field in ("created_by", "ttl_seconds", "expires_at"): assert field in item, f"missing item field: {field}" def test_expires_at_null_for_pending_task(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) create_task(repo, "pending-task") args = _tasks_ns() with _patch_repo(repo): run_tasks(args) out = json.loads(capsys.readouterr().out) item = out["items"][0] assert item["expires_at"] is None def test_expires_at_populated_for_claimed_task(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) create_task(repo, "claimed-task") claim_next_task(repo, "worker") args = _tasks_ns() with _patch_repo(repo): run_tasks(args) out = json.loads(capsys.readouterr().out) item = out["items"][0] assert item["expires_at"] is not None # Should be a parseable ISO 8601 datetime import datetime as _dt _dt.datetime.fromisoformat(item["expires_at"]) def test_status_counts_reflect_full_queue_when_filtered(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: """Counts always reflect ALL tasks, not just the filtered set.""" repo = _make_repo(tmp_path) self._setup(repo) # Filter to only q1 items, but 
total/counts should still be 3 args = _tasks_ns(queue="q1") with _patch_repo(repo): run_tasks(args) out = json.loads(capsys.readouterr().out) assert out["total"] == 3 assert len(out["items"]) == 2 # only q1 items def test_limit_truncates_items(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) for i in range(10): create_task(repo, f"task-{i}") args = _tasks_ns(limit=3) with _patch_repo(repo): run_tasks(args) out = json.loads(capsys.readouterr().out) assert len(out["items"]) == 3 assert out["truncated"] is True assert out["limit"] == 3 assert out["total"] == 10 # full count still correct def test_no_truncation_when_within_limit(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) for i in range(5): create_task(repo, f"task-{i}") args = _tasks_ns(limit=10) with _patch_repo(repo): run_tasks(args) out = json.loads(capsys.readouterr().out) assert len(out["items"]) == 5 assert out["truncated"] is False def test_elapsed_is_float(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) args = _tasks_ns() with _patch_repo(repo): run_tasks(args) out = json.loads(capsys.readouterr().out) assert isinstance(out["duration_ms"], float) def test_items_sorted_priority_desc(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) with _freeze(_EPOCH): create_task(repo, "Low", priority=0) create_task(repo, "High", priority=10) create_task(repo, "Mid", priority=5) args = _tasks_ns() with _patch_repo(repo): run_tasks(args) out = json.loads(capsys.readouterr().out) priorities = [i["priority"] for i in out["items"]] assert priorities == sorted(priorities, reverse=True) def test_limit_applies_after_sort(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: """--limit=1 should return the highest-priority task.""" repo = _make_repo(tmp_path) with _freeze(_EPOCH): 
create_task(repo, "Low", priority=0) create_task(repo, "High", priority=99) args = _tasks_ns(limit=1) with _patch_repo(repo): run_tasks(args) out = json.loads(capsys.readouterr().out) assert len(out["items"]) == 1 assert out["items"][0]["priority"] == 99 class TestTasksTextOutput: """run_tasks text output content and formatting.""" def test_text_shows_task_queue_header(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) args = _tasks_ns(json_out=False) with _patch_repo(repo): run_tasks(args) out = capsys.readouterr().out assert "Task queue" in out def test_text_shows_status_counts(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) create_task(repo, "pending") args = _tasks_ns(json_out=False) with _patch_repo(repo): run_tasks(args) out = capsys.readouterr().out assert "pending" in out assert "claimed" in out def test_text_shows_column_headers(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) create_task(repo, "task") args = _tasks_ns(json_out=False) with _patch_repo(repo): run_tasks(args) out = capsys.readouterr().out assert "ID" in out assert "QUEUE" in out assert "TITLE" in out def test_text_shows_filter_line(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) args = _tasks_ns(json_out=False, queue="billing") with _patch_repo(repo): run_tasks(args) out = capsys.readouterr().out assert "filter" in out assert "billing" in out def test_text_empty_queue_message(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) args = _tasks_ns(json_out=False) with _patch_repo(repo): run_tasks(args) out = capsys.readouterr().out assert "0 task" in out def test_text_shows_elapsed(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) args = _tasks_ns(json_out=False) with 
_patch_repo(repo): run_tasks(args) out = capsys.readouterr().out assert "s)" in out def test_ansi_in_queue_name_stripped(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: """Enqueued tasks with ANSI in queue are sanitized in text output.""" # We can't enqueue with a bad queue via CLI (validated), but the task # record could be crafted; sanitize_display handles it in display. repo = _make_repo(tmp_path) create_task(repo, "task") args = _tasks_ns(json_out=False, run_id="\x1b[31mred\x1b[0m") # run_id with ANSI is valid length-wise but we're checking display with _patch_repo(repo): run_tasks(args) out = capsys.readouterr().out assert "\x1b" not in out class TestTasksIntegration: """Full integration tests for run_tasks with realistic task states.""" def test_full_mixed_state_counts(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) # pending (queue="p" — no worker claims these) create_task(repo, "p1", queue="p") create_task(repo, "p2", queue="p") # claimed (queue="c") create_task(repo, "c1", queue="c") claim_next_task(repo, "w1", queue="c") # completed (queue="done") t_done = create_task(repo, "done", queue="done") claim_next_task(repo, "w2", queue="done") complete_task(repo, t_done.task_id, "w2") # failed (queue="fail") t_fail = create_task(repo, "fail", queue="fail") claim_next_task(repo, "w3", queue="fail") fail_task(repo, t_fail.task_id, "w3", error="boom") # cancelled t_can = create_task(repo, "cancelled", queue="can") cancel_task(repo, t_can.task_id, "orch") args = _tasks_ns() with _patch_repo(repo): run_tasks(args) out = json.loads(capsys.readouterr().out) assert out["total"] == 6 assert out["pending"] == 2 assert out["claimed"] == 1 assert out["completed"] == 1 assert out["failed"] == 1 assert out["cancelled"] == 1 def test_filter_by_status_only_returns_matching(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) create_task(repo, 
"pending-1", queue="p") t_done = create_task(repo, "done-1", queue="done") claim_next_task(repo, "w", queue="done") complete_task(repo, t_done.task_id, "w") args = _tasks_ns(status="completed") with _patch_repo(repo): run_tasks(args) out = json.loads(capsys.readouterr().out) assert all(i["status"] == "completed" for i in out["items"]) assert len(out["items"]) == 1 # But total counts still reflect full queue assert out["total"] == 2 def test_filter_by_queue_and_status(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) # Create and immediately complete a billing task (no competing tasks yet) t_b = create_task(repo, "billing-done", queue="billing") claim_next_task(repo, "w", queue="billing") complete_task(repo, t_b.task_id, "w") # Now add a pending billing and an infra task create_task(repo, "billing-pending", queue="billing") create_task(repo, "infra-pending", queue="infra") args = _tasks_ns(queue="billing", status="completed") with _patch_repo(repo): run_tasks(args) out = json.loads(capsys.readouterr().out) assert len(out["items"]) == 1 assert out["items"][0]["queue"] == "billing" assert out["items"][0]["status"] == "completed" # Global total is all 3 assert out["total"] == 3 def test_filter_by_run_id(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) t1 = create_task(repo, "t1") t2 = create_task(repo, "t2") claim_next_task(repo, "worker-alpha") claim_next_task(repo, "worker-beta") args = _tasks_ns(run_id="worker-alpha") with _patch_repo(repo): run_tasks(args) out = json.loads(capsys.readouterr().out) assert len(out["items"]) == 1 assert out["items"][0]["claimer_run_id"] == "worker-alpha" def test_enqueue_then_list_shows_created_by(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) eq_args = _enqueue_ns(title="listed-task", queue="default", run_id="enqueuer-1") with _patch_repo(repo): run_enqueue(eq_args) 
capsys.readouterr() args = _tasks_ns() with _patch_repo(repo): run_tasks(args) out = json.loads(capsys.readouterr().out) assert len(out["items"]) == 1 assert out["items"][0]["created_by"] == "enqueuer-1" def test_limit_with_filter_shows_highest_priority(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) with _freeze(_EPOCH): for i in range(10): create_task(repo, f"task-{i}", queue="q", priority=i) args = _tasks_ns(queue="q", limit=3) with _patch_repo(repo): run_tasks(args) out = json.loads(capsys.readouterr().out) assert len(out["items"]) == 3 # Top 3 priorities should be 9, 8, 7 assert [i["priority"] for i in out["items"]] == [9, 8, 7] class TestTasksStress: """Performance and concurrency tests for run_tasks.""" def test_500_tasks_listed_under_5s(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) for i in range(500): create_task(repo, f"task-{i}", queue="default") args = _tasks_ns(limit=500) start = time.monotonic() with _patch_repo(repo): run_tasks(args) elapsed = time.monotonic() - start out = json.loads(capsys.readouterr().out) assert out["total"] == 500 assert elapsed < 5.0, f"listing 500 tasks took {elapsed:.1f}s" def test_500_tasks_with_filter_under_5s(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: repo = _make_repo(tmp_path) for i in range(250): create_task(repo, f"billing-{i}", queue="billing") for i in range(250): create_task(repo, f"infra-{i}", queue="infra") args = _tasks_ns(queue="billing", limit=250) start = time.monotonic() with _patch_repo(repo): run_tasks(args) elapsed = time.monotonic() - start out = json.loads(capsys.readouterr().out) assert len(out["items"]) == 250 assert elapsed < 5.0, f"filtered listing took {elapsed:.1f}s" def test_concurrent_reads_are_safe(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: """Concurrent run_tasks calls against the same repo must not crash.""" import 
concurrent.futures repo = _make_repo(tmp_path) for i in range(50): create_task(repo, f"task-{i}") errors: list[str] = [] def read_tasks() -> None: args = _tasks_ns(limit=50) try: with _patch_repo(repo): run_tasks(args) except Exception as exc: # noqa: BLE001 errors.append(str(exc)) with concurrent.futures.ThreadPoolExecutor(max_workers=10) as pool: list(pool.map(lambda _: read_tasks(), range(20))) assert not errors, f"concurrent read errors: {errors}" def test_no_double_load_with_filter(self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str]) -> None: """Verify counts are still correct when filter is active (no re-load bug).""" repo = _make_repo(tmp_path) for i in range(20): create_task(repo, f"billing-{i}", queue="billing") for i in range(10): create_task(repo, f"infra-{i}", queue="infra") args = _tasks_ns(queue="billing") with _patch_repo(repo): run_tasks(args) out = json.loads(capsys.readouterr().out) # total must reflect ALL 30 tasks, not just the 20 in billing assert out["total"] == 30 assert len(out["items"]) == 20 # ── register_all integration ─────────────────────────────────────────────────── class TestRegisterAll: """register_all attaches all six subcommands to the given subparsers.""" def test_all_commands_registered(self) -> None: import argparse parser = argparse.ArgumentParser() subs = parser.add_subparsers(dest="cmd") register_all(subs) # Verify each expected command is parseable for cmd in ("enqueue", "claim", "complete", "fail-task", "cancel-task", "tasks"): # A subparser was registered for this command name # (ArgumentParser stores choices in _subparsers._group_actions) found = False for action in parser._subparsers._group_actions: if cmd in action.choices: found = True break assert found, f"Command '{cmd}' not registered" # ── Content-addressed task_id ───────────────────────────────────────────────── class TestTaskIdContentAddressed: """task_id must be sha256: of genesis content, not a random UUID.""" def 
test_task_id_is_sha256_prefixed(self, tmp_path: pathlib.Path) -> None: from muse.core.task_queue import compute_task_id tid = compute_task_id( title="render stems", queue="audio", payload={"track": 1}, priority=5, created_by="agent-x", ) assert tid.startswith("sha256:"), f"Expected sha256: prefix, got {tid!r}" assert len(tid) == 71 def test_task_id_is_sha256_not_uuid4(self, tmp_path: pathlib.Path) -> None: import re from muse.core.task_queue import compute_task_id tid = compute_task_id("render stems", "audio", {}, 0, "agent-x") uuid4_re = re.compile( r"^[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$" ) assert not uuid4_re.match(tid) def test_task_id_deterministic(self, tmp_path: pathlib.Path) -> None: from muse.core.task_queue import compute_task_id t1 = compute_task_id("render stems", "audio", {"track": 1}, 5, "agent-x") t2 = compute_task_id("render stems", "audio", {"track": 1}, 5, "agent-x") assert t1 == t2 def test_task_id_differs_by_title(self, tmp_path: pathlib.Path) -> None: from muse.core.task_queue import compute_task_id t1 = compute_task_id("render stems", "audio", {}, 0, "agent-x") t2 = compute_task_id("export wav", "audio", {}, 0, "agent-x") assert t1 != t2 def test_task_id_differs_by_queue(self, tmp_path: pathlib.Path) -> None: from muse.core.task_queue import compute_task_id t1 = compute_task_id("render stems", "audio", {}, 0, "agent-x") t2 = compute_task_id("render stems", "midi", {}, 0, "agent-x") assert t1 != t2 def test_create_task_produces_sha256_id(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) task = create_task(repo, "process audio", queue="audio", created_by="agent-x") assert task.task_id.startswith("sha256:") assert len(task.task_id) == 71 def test_create_task_id_matches_compute(self, tmp_path: pathlib.Path) -> None: from muse.core.task_queue import compute_task_id repo = _make_repo(tmp_path) task = create_task( repo, "process audio", queue="audio", payload={"track": 3}, priority=2, 
created_by="agent-x", ) expected = compute_task_id( title="process audio", queue="audio", payload={"track": 3}, priority=2, created_by="agent-x", ) assert task.task_id == expected # --------------------------------------------------------------------------- # Flag registration # --------------------------------------------------------------------------- class TestRegisterFlags: def _parse_enqueue(self, *args: str) -> "argparse.Namespace": import argparse from muse.cli.commands.task_queue import register_enqueue p = argparse.ArgumentParser() sub = p.add_subparsers() register_enqueue(sub) return p.parse_args(["enqueue", *args]) def test_default_json_out_is_false(self) -> None: ns = self._parse_enqueue("test-task", "--run-id", "orch") assert ns.json_out is False def test_json_flag_sets_json_out(self) -> None: ns = self._parse_enqueue("test-task", "--run-id", "orch", "--json") assert ns.json_out is True def test_j_shorthand_sets_json_out(self) -> None: ns = self._parse_enqueue("test-task", "--run-id", "orch", "-j") assert ns.json_out is True