"""Tests for ``muse maintenance`` — scheduled store maintenance orchestration. Coverage tiers: - Unit: run records timestamp; run --task gc; run --task verify-objects; run --all; run --dry-run; run --json schema; status text + JSON; schedule --period-hours; schedule --enable/--disable; status reflects schedule config; no-config defaults - Integration: verify-objects detects corrupt objects; gc cleans unreachable blobs; json result keys correct; dry-run produces no mutations - Security: no ANSI injection in task output - Stress: verify-objects on 100 objects completes and counts correctly """ from __future__ import annotations from collections.abc import Mapping import datetime import json import pathlib import time import unittest.mock import pytest from tests.cli_test_helper import CliRunner from muse.core.object_store import object_path, write_object from muse.core.paths import maintenance_json_path, muse_dir from muse.core.snapshots import ( SnapshotRecord, write_snapshot, ) from muse.core.types import Manifest, blob_id runner = CliRunner() _REPO_ID = "maintenance-test" # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _init_repo(path: pathlib.Path) -> pathlib.Path: dot_muse = muse_dir(path) for d in ("commits", "snapshots", "objects", "refs/heads", "code"): (dot_muse / d).mkdir(parents=True, exist_ok=True) (dot_muse / "HEAD").write_text("ref: refs/heads/main", encoding="utf-8") (dot_muse / "repo.json").write_text( json.dumps({"repo_id": _REPO_ID, "domain": "code"}), encoding="utf-8" ) return path def _env(repo: pathlib.Path) -> Mapping[str, str]: return {"MUSE_REPO_ROOT": str(repo)} def _invoke(args: list[str], repo: pathlib.Path) -> tuple[int, str, str]: result = runner.invoke(None, args, env=_env(repo)) return result.exit_code, result.stdout, result.stderr def _maint_config(repo: pathlib.Path) -> pathlib.Path: return maintenance_json_path(repo) def _write_object(repo: pathlib.Path, content: bytes) -> str: obj_id = blob_id(content) write_object(repo, obj_id, content) return obj_id # --------------------------------------------------------------------------- # Unit — run (default) # --------------------------------------------------------------------------- class TestRun: def test_run_exits_zero(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path / "repo") rc, out, err = _invoke(["maintenance", "run"], repo) assert rc == 0 def test_run_records_timestamp(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path / "repo") _invoke(["maintenance", "run"], repo) cfg = json.loads(_maint_config(repo).read_text()) assert "last_run" in cfg assert "gc" in cfg["last_run"] def test_run_creates_config_if_missing(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path / "repo") assert not _maint_config(repo).exists() _invoke(["maintenance", "run"], repo) assert _maint_config(repo).exists() def test_run_task_gc(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path / "repo") rc, out, err = _invoke(["maintenance", "run", "--task", "gc"], repo) assert rc == 0 cfg = json.loads(_maint_config(repo).read_text()) assert "gc" in cfg["last_run"] def test_run_task_verify_objects(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path / "repo") _write_object(repo, b"hello") rc, out, err = _invoke( ["maintenance", "run", "--task", "verify-objects"], repo ) assert rc == 0 cfg = json.loads(_maint_config(repo).read_text()) assert "verify-objects" in cfg["last_run"] def test_run_all(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path / "repo") rc, out, err = _invoke(["maintenance", "run", "--all"], repo) assert rc == 0 cfg = json.loads(_maint_config(repo).read_text()) assert "gc" in cfg["last_run"] assert "verify-objects" in cfg["last_run"] def test_run_dry_run_no_config_written(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path / "repo") _invoke(["maintenance", "run", "--dry-run"], repo) # dry-run should NOT persist timestamps if _maint_config(repo).exists(): cfg = json.loads(_maint_config(repo).read_text()) assert "last_run" not in cfg or not cfg.get("last_run") def test_run_json_schema(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path / "repo") rc, out, err = _invoke(["maintenance", "run", "--json"], repo) assert rc == 0 data = json.loads(out) assert "tasks_run" in data assert "results" in data assert "dry_run" in data assert "duration_ms" in data def test_run_json_tasks_run_is_list(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path / "repo") rc, out, err = _invoke( ["maintenance", "run", "--task", "gc", "--json"], repo ) data = json.loads(out) assert isinstance(data["tasks_run"], list) assert "gc" in data["tasks_run"] def test_run_unknown_task_exits_nonzero(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path / "repo") rc, out, err = _invoke( ["maintenance", "run", "--task", "bogus-task"], repo ) assert rc != 0 def test_run_dry_run_flag_in_json(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path / "repo") rc, out, err = _invoke( ["maintenance", "run", "--dry-run", "--json"], repo ) data = json.loads(out) assert data["dry_run"] is True # --------------------------------------------------------------------------- # Unit — status # --------------------------------------------------------------------------- class TestStatus: def test_status_no_config(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path / "repo") rc, out, err = _invoke(["maintenance", "status"], repo) assert rc == 0 assert "never" in out.lower() or "no" in out.lower() or "disabled" in out.lower() def test_status_json_no_config(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path / "repo") rc, out, err = _invoke(["maintenance", "status", "--json"], repo) assert rc == 0 data = json.loads(out) assert "enabled" in data assert "period_hours" in data assert "last_run" in data def test_status_shows_last_run_after_run(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path / "repo") _invoke(["maintenance", "run", "--task", "gc"], repo) rc, out, err = _invoke(["maintenance", "status"], repo) assert rc == 0 assert "gc" in out.lower() def test_status_json_has_last_run_timestamp(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path / "repo") _invoke(["maintenance", "run", "--task", "gc"], repo) rc, out, err = _invoke(["maintenance", "status", "--json"], repo) data = json.loads(out) assert "gc" in data["last_run"] # last_run entries are now {timestamp, status, duration_ms} records record = data["last_run"]["gc"] assert isinstance(record, dict) assert "T" in record["timestamp"] assert record["status"] in ("ok", "error") # --------------------------------------------------------------------------- # Unit — schedule # --------------------------------------------------------------------------- class TestSchedule: def test_schedule_period_hours(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path / "repo") rc, out, err = _invoke( ["maintenance", "schedule", "--period-hours", "48"], repo ) assert rc == 0 cfg = json.loads(_maint_config(repo).read_text()) assert cfg["period_hours"] == 48 def test_schedule_disable(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path / "repo") _invoke(["maintenance", "schedule", "--period-hours", "24"], repo) rc, out, err = _invoke(["maintenance", "schedule", "--disable"], repo) assert rc == 0 cfg = json.loads(_maint_config(repo).read_text()) assert cfg["enabled"] is False def test_schedule_enable(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path / "repo") _invoke(["maintenance", "schedule", "--disable"], repo) rc, out, err = _invoke(["maintenance", "schedule", "--enable"], repo) assert rc == 0 cfg = json.loads(_maint_config(repo).read_text()) assert cfg["enabled"] is True def test_schedule_status_reflects_period(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path / "repo") _invoke(["maintenance", "schedule", "--period-hours", "72"], repo) rc, out, err = _invoke(["maintenance", "status", "--json"], repo) data = json.loads(out) assert data["period_hours"] == 72 def test_schedule_invalid_period_exits_nonzero(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path / "repo") rc, out, err = _invoke( ["maintenance", "schedule", "--period-hours", "-1"], repo ) assert rc != 0 def test_schedule_default_period_is_24(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path / "repo") _invoke(["maintenance", "schedule"], repo) cfg = json.loads(_maint_config(repo).read_text()) assert cfg.get("period_hours", 24) == 24 # --------------------------------------------------------------------------- # Integration — verify-objects detects corruption # --------------------------------------------------------------------------- class TestVerifyObjectsIntegration: def test_verify_objects_passes_on_clean_store(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path / "repo") for i in range(5): _write_object(repo, f"content-{i}".encode()) rc, out, err = _invoke( ["maintenance", "run", "--task", "verify-objects", "--json"], repo ) assert rc == 0 data = json.loads(out) assert data["results"]["verify-objects"]["failed"] == 0 def test_verify_objects_detects_corrupt_object(self, tmp_path: pathlib.Path) -> None: import os repo = _init_repo(tmp_path / "repo") obj_id = _write_object(repo, b"good content") # corrupt the file (objects are stored read-only, chmod first) obj_path = object_path(repo, obj_id) obj_path.chmod(0o644) obj_path.write_bytes(b"corrupted data") rc, out, err = _invoke( ["maintenance", "run", "--task", "verify-objects", "--json"], repo ) # Should still exit 0 but report failures data = json.loads(out) assert data["results"]["verify-objects"]["failed"] >= 1 def test_verify_objects_json_has_checked_count(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path / "repo") for i in range(3): _write_object(repo, f"item-{i}".encode()) rc, out, err = _invoke( ["maintenance", "run", "--task", "verify-objects", "--json"], repo ) data = json.loads(out) assert data["results"]["verify-objects"]["checked"] == 3 # --------------------------------------------------------------------------- # Stress — verify-objects on 100 objects # --------------------------------------------------------------------------- class TestStress: def test_100_objects_verify_all_pass(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path / "repo") for i in range(100): _write_object(repo, f"stress-object-{i:03d}".encode()) rc, out, err = _invoke( ["maintenance", "run", "--task", "verify-objects", "--json"], repo ) assert rc == 0 data = json.loads(out) v = data["results"]["verify-objects"] assert v["checked"] == 100 assert v["failed"] == 0 class TestRegisterFlags: def test_default_json_out_is_false(self) -> None: import argparse from muse.cli.commands.maintenance import register p = argparse.ArgumentParser() subs = p.add_subparsers() register(subs) args = p.parse_args(["maintenance", "run"]) assert args.json_out is False def test_json_flag_sets_json_out(self) -> None: import argparse from muse.cli.commands.maintenance import register p = argparse.ArgumentParser() subs = p.add_subparsers() register(subs) args = p.parse_args(["maintenance", "run", "--json"]) assert args.json_out is True def test_j_shorthand_sets_json_out(self) -> None: import argparse from muse.cli.commands.maintenance import register p = argparse.ArgumentParser() subs = p.add_subparsers() register(subs) args = p.parse_args(["maintenance", "run", "-j"]) assert args.json_out is True