test_cmd_maintenance.py
file-level
1
files
1
commits
0
hotspots
0
🧊 dead
0
💥 blast risk
| 1 | """Tests for ``muse maintenance`` — scheduled store maintenance orchestration. |
| 2 | |
| 3 | Coverage tiers: |
| 4 | - Unit: run records timestamp; run --task gc; run --task verify-objects; |
| 5 | run --all; run --dry-run; run --json schema; status text + JSON; |
| 6 | schedule --period-hours; schedule --enable/--disable; |
| 7 | status reflects schedule config; no-config defaults |
| 8 | - Integration: verify-objects detects corrupt objects; gc cleans unreachable |
| 9 | blobs; json result keys correct; dry-run produces no mutations |
| 10 | - Security: no ANSI injection in task output |
| 11 | - Stress: verify-objects on 100 objects completes and counts correctly |
| 12 | """ |
| 13 | |
| 14 | from __future__ import annotations |
| 15 | from collections.abc import Mapping |
| 16 | |
| 17 | import datetime |
| 18 | import json |
| 19 | import pathlib |
| 20 | import time |
| 21 | import unittest.mock |
| 22 | |
| 23 | import pytest |
| 24 | |
| 25 | from tests.cli_test_helper import CliRunner |
| 26 | from muse.core.object_store import object_path, write_object |
| 27 | from muse.core.paths import maintenance_json_path, muse_dir |
| 28 | |
| 29 | from muse.core.snapshots import ( |
| 30 | SnapshotRecord, |
| 31 | write_snapshot, |
| 32 | ) |
| 33 | from muse.core.types import Manifest, blob_id |
| 34 | |
| 35 | runner = CliRunner() |
| 36 | |
| 37 | _REPO_ID = "maintenance-test" |
| 38 | |
| 39 | |
| 40 | # --------------------------------------------------------------------------- |
| 41 | # Helpers |
| 42 | # --------------------------------------------------------------------------- |
| 43 | |
| 44 | |
| 45 | |
| 46 | |
def _init_repo(path: pathlib.Path) -> pathlib.Path:
    """Lay out a minimal repository skeleton for *path* and return *path*.

    Creates the store sub-directories inside the directory returned by
    ``muse_dir(path)``, then writes a ``HEAD`` file pointing at
    ``refs/heads/main`` and a ``repo.json`` carrying the test repo id.
    """
    store_root = muse_dir(path)
    subdirs = ("commits", "snapshots", "objects", "refs/heads", "code")
    for name in subdirs:
        (store_root / name).mkdir(parents=True, exist_ok=True)

    head_file = store_root / "HEAD"
    head_file.write_text("ref: refs/heads/main", encoding="utf-8")

    repo_meta = {"repo_id": _REPO_ID, "domain": "code"}
    meta_file = store_root / "repo.json"
    meta_file.write_text(json.dumps(repo_meta), encoding="utf-8")
    return path
| 56 | |
| 57 | |
| 58 | def _env(repo: pathlib.Path) -> Mapping[str, str]: |
| 59 | return {"MUSE_REPO_ROOT": str(repo)} |
| 60 | |
| 61 | |
def _invoke(args: list[str], repo: pathlib.Path) -> tuple[int, str, str]:
    """Run the CLI with *args* against *repo*.

    Returns the ``(exit_code, stdout, stderr)`` triple taken from the
    runner's result object.
    """
    outcome = runner.invoke(None, args, env=_env(repo))
    exit_code = outcome.exit_code
    captured_out = outcome.stdout
    captured_err = outcome.stderr
    return exit_code, captured_out, captured_err
| 65 | |
| 66 | |
def _maint_config(repo: pathlib.Path) -> pathlib.Path:
    """Return the maintenance config path for *repo* (via ``maintenance_json_path``)."""
    config_path = maintenance_json_path(repo)
    return config_path
| 69 | |
| 70 | |
def _write_object(repo: pathlib.Path, content: bytes) -> str:
    """Write *content* into *repo* under the id computed by ``blob_id``.

    Returns that id so callers can locate the stored object afterwards.
    """
    digest = blob_id(content)
    write_object(repo, digest, content)
    return digest
| 75 | |
| 76 | |
| 77 | # --------------------------------------------------------------------------- |
| 78 | # Unit — run (default) |
| 79 | # --------------------------------------------------------------------------- |
| 80 | |
| 81 | |
class TestRun:
    """``muse maintenance run``: exit codes, persisted ``last_run`` data, JSON output.

    The config file is always read as UTF-8 so the tests do not depend on the
    locale's default encoding.
    """

    def test_run_exits_zero(self, tmp_path: pathlib.Path) -> None:
        """A bare ``run`` on a freshly initialised repo exits 0."""
        repo = _init_repo(tmp_path / "repo")
        rc, _, _ = _invoke(["maintenance", "run"], repo)
        assert rc == 0

    def test_run_records_timestamp(self, tmp_path: pathlib.Path) -> None:
        """``run`` writes a ``last_run`` section that includes ``gc``."""
        repo = _init_repo(tmp_path / "repo")
        _invoke(["maintenance", "run"], repo)
        cfg = json.loads(_maint_config(repo).read_text(encoding="utf-8"))
        assert "last_run" in cfg
        assert "gc" in cfg["last_run"]

    def test_run_creates_config_if_missing(self, tmp_path: pathlib.Path) -> None:
        """The config file does not exist before ``run`` and exists afterwards."""
        repo = _init_repo(tmp_path / "repo")
        assert not _maint_config(repo).exists()
        _invoke(["maintenance", "run"], repo)
        assert _maint_config(repo).exists()

    def test_run_task_gc(self, tmp_path: pathlib.Path) -> None:
        """``--task gc`` exits 0 and records ``gc`` under ``last_run``."""
        repo = _init_repo(tmp_path / "repo")
        rc, _, _ = _invoke(["maintenance", "run", "--task", "gc"], repo)
        assert rc == 0
        cfg = json.loads(_maint_config(repo).read_text(encoding="utf-8"))
        assert "gc" in cfg["last_run"]

    def test_run_task_verify_objects(self, tmp_path: pathlib.Path) -> None:
        """``--task verify-objects`` exits 0 and records itself under ``last_run``."""
        repo = _init_repo(tmp_path / "repo")
        _write_object(repo, b"hello")
        rc, _, _ = _invoke(
            ["maintenance", "run", "--task", "verify-objects"], repo
        )
        assert rc == 0
        cfg = json.loads(_maint_config(repo).read_text(encoding="utf-8"))
        assert "verify-objects" in cfg["last_run"]

    def test_run_all(self, tmp_path: pathlib.Path) -> None:
        """``--all`` exits 0 and records both ``gc`` and ``verify-objects``."""
        repo = _init_repo(tmp_path / "repo")
        rc, _, _ = _invoke(["maintenance", "run", "--all"], repo)
        assert rc == 0
        cfg = json.loads(_maint_config(repo).read_text(encoding="utf-8"))
        assert "gc" in cfg["last_run"]
        assert "verify-objects" in cfg["last_run"]

    def test_run_dry_run_no_config_written(self, tmp_path: pathlib.Path) -> None:
        """``--dry-run`` must not persist any ``last_run`` entries."""
        repo = _init_repo(tmp_path / "repo")
        _invoke(["maintenance", "run", "--dry-run"], repo)
        # dry-run should NOT persist timestamps. A config file is tolerated,
        # but only if its ``last_run`` is absent or empty.
        config_file = _maint_config(repo)
        if config_file.exists():
            cfg = json.loads(config_file.read_text(encoding="utf-8"))
            assert "last_run" not in cfg or not cfg.get("last_run")

    def test_run_json_schema(self, tmp_path: pathlib.Path) -> None:
        """``--json`` output carries the four top-level keys checked below."""
        repo = _init_repo(tmp_path / "repo")
        rc, out, _ = _invoke(["maintenance", "run", "--json"], repo)
        assert rc == 0
        data = json.loads(out)
        assert "tasks_run" in data
        assert "results" in data
        assert "dry_run" in data
        assert "duration_ms" in data

    def test_run_json_tasks_run_is_list(self, tmp_path: pathlib.Path) -> None:
        """``tasks_run`` is a list containing the requested task name."""
        repo = _init_repo(tmp_path / "repo")
        _, out, _ = _invoke(
            ["maintenance", "run", "--task", "gc", "--json"], repo
        )
        data = json.loads(out)
        assert isinstance(data["tasks_run"], list)
        assert "gc" in data["tasks_run"]

    def test_run_unknown_task_exits_nonzero(self, tmp_path: pathlib.Path) -> None:
        """An unrecognised ``--task`` value produces a non-zero exit code."""
        repo = _init_repo(tmp_path / "repo")
        rc, _, _ = _invoke(
            ["maintenance", "run", "--task", "bogus-task"], repo
        )
        assert rc != 0

    def test_run_dry_run_flag_in_json(self, tmp_path: pathlib.Path) -> None:
        """``--dry-run --json`` reports ``dry_run`` as the boolean ``True``."""
        repo = _init_repo(tmp_path / "repo")
        _, out, _ = _invoke(
            ["maintenance", "run", "--dry-run", "--json"], repo
        )
        data = json.loads(out)
        assert data["dry_run"] is True
| 167 | |
| 168 | |
| 169 | # --------------------------------------------------------------------------- |
| 170 | # Unit — status |
| 171 | # --------------------------------------------------------------------------- |
| 172 | |
| 173 | |
class TestStatus:
    """``muse maintenance status`` in text and JSON form."""

    def test_status_no_config(self, tmp_path: pathlib.Path) -> None:
        """With no config, status exits 0 and prints one of the expected words."""
        repo = _init_repo(tmp_path / "repo")
        exit_code, stdout, _ = _invoke(["maintenance", "status"], repo)
        assert exit_code == 0
        lowered = stdout.lower()
        assert any(marker in lowered for marker in ("never", "no", "disabled"))

    def test_status_json_no_config(self, tmp_path: pathlib.Path) -> None:
        """With no config, the JSON status still exposes the three core keys."""
        repo = _init_repo(tmp_path / "repo")
        exit_code, stdout, _ = _invoke(["maintenance", "status", "--json"], repo)
        assert exit_code == 0
        payload = json.loads(stdout)
        for key in ("enabled", "period_hours", "last_run"):
            assert key in payload

    def test_status_shows_last_run_after_run(self, tmp_path: pathlib.Path) -> None:
        """After running ``gc``, the text status mentions ``gc``."""
        repo = _init_repo(tmp_path / "repo")
        _invoke(["maintenance", "run", "--task", "gc"], repo)
        exit_code, stdout, _ = _invoke(["maintenance", "status"], repo)
        assert exit_code == 0
        assert "gc" in stdout.lower()

    def test_status_json_has_last_run_timestamp(self, tmp_path: pathlib.Path) -> None:
        """After running ``gc``, the JSON status holds a record for it."""
        repo = _init_repo(tmp_path / "repo")
        _invoke(["maintenance", "run", "--task", "gc"], repo)
        _, stdout, _ = _invoke(["maintenance", "status", "--json"], repo)
        last_run = json.loads(stdout)["last_run"]
        assert "gc" in last_run
        # last_run entries are now {timestamp, status, duration_ms} records
        gc_record = last_run["gc"]
        assert isinstance(gc_record, dict)
        assert "T" in gc_record["timestamp"]
        assert gc_record["status"] in ("ok", "error")
| 208 | |
| 209 | |
| 210 | # --------------------------------------------------------------------------- |
| 211 | # Unit — schedule |
| 212 | # --------------------------------------------------------------------------- |
| 213 | |
| 214 | |
class TestSchedule:
    """``muse maintenance schedule``: period and enable/disable persistence.

    The config file is always read as UTF-8 so the tests do not depend on the
    locale's default encoding.
    """

    def test_schedule_period_hours(self, tmp_path: pathlib.Path) -> None:
        """``--period-hours 48`` exits 0 and stores the integer 48."""
        repo = _init_repo(tmp_path / "repo")
        rc, _, _ = _invoke(
            ["maintenance", "schedule", "--period-hours", "48"], repo
        )
        assert rc == 0
        cfg = json.loads(_maint_config(repo).read_text(encoding="utf-8"))
        assert cfg["period_hours"] == 48

    def test_schedule_disable(self, tmp_path: pathlib.Path) -> None:
        """``--disable`` after scheduling stores ``enabled`` as ``False``."""
        repo = _init_repo(tmp_path / "repo")
        _invoke(["maintenance", "schedule", "--period-hours", "24"], repo)
        rc, _, _ = _invoke(["maintenance", "schedule", "--disable"], repo)
        assert rc == 0
        cfg = json.loads(_maint_config(repo).read_text(encoding="utf-8"))
        assert cfg["enabled"] is False

    def test_schedule_enable(self, tmp_path: pathlib.Path) -> None:
        """``--enable`` after ``--disable`` stores ``enabled`` as ``True``."""
        repo = _init_repo(tmp_path / "repo")
        _invoke(["maintenance", "schedule", "--disable"], repo)
        rc, _, _ = _invoke(["maintenance", "schedule", "--enable"], repo)
        assert rc == 0
        cfg = json.loads(_maint_config(repo).read_text(encoding="utf-8"))
        assert cfg["enabled"] is True

    def test_schedule_status_reflects_period(self, tmp_path: pathlib.Path) -> None:
        """A scheduled period is visible through ``status --json``."""
        repo = _init_repo(tmp_path / "repo")
        _invoke(["maintenance", "schedule", "--period-hours", "72"], repo)
        _, out, _ = _invoke(["maintenance", "status", "--json"], repo)
        data = json.loads(out)
        assert data["period_hours"] == 72

    def test_schedule_invalid_period_exits_nonzero(self, tmp_path: pathlib.Path) -> None:
        """A negative ``--period-hours`` value produces a non-zero exit code."""
        repo = _init_repo(tmp_path / "repo")
        rc, _, _ = _invoke(
            ["maintenance", "schedule", "--period-hours", "-1"], repo
        )
        assert rc != 0

    def test_schedule_default_period_is_24(self, tmp_path: pathlib.Path) -> None:
        """``schedule`` with no flags leaves ``period_hours`` at 24 or unset."""
        repo = _init_repo(tmp_path / "repo")
        _invoke(["maintenance", "schedule"], repo)
        cfg = json.loads(_maint_config(repo).read_text(encoding="utf-8"))
        # A missing key is accepted as the default; only an explicit non-24
        # value fails this check.
        assert cfg.get("period_hours", 24) == 24
| 260 | |
| 261 | |
| 262 | # --------------------------------------------------------------------------- |
| 263 | # Integration — verify-objects detects corruption |
| 264 | # --------------------------------------------------------------------------- |
| 265 | |
| 266 | |
class TestVerifyObjectsIntegration:
    """End-to-end checks of the ``verify-objects`` task's JSON report."""

    def test_verify_objects_passes_on_clean_store(self, tmp_path: pathlib.Path) -> None:
        """Five intact objects yield exit 0 and a ``failed`` count of 0."""
        repo = _init_repo(tmp_path / "repo")
        for i in range(5):
            _write_object(repo, f"content-{i}".encode())
        rc, out, _ = _invoke(
            ["maintenance", "run", "--task", "verify-objects", "--json"], repo
        )
        assert rc == 0
        data = json.loads(out)
        assert data["results"]["verify-objects"]["failed"] == 0

    def test_verify_objects_detects_corrupt_object(self, tmp_path: pathlib.Path) -> None:
        """Overwriting a stored object's bytes is reported as a failure."""
        repo = _init_repo(tmp_path / "repo")
        obj_id = _write_object(repo, b"good content")
        # corrupt the file (objects are stored read-only, chmod first)
        obj_path = object_path(repo, obj_id)
        obj_path.chmod(0o644)
        obj_path.write_bytes(b"corrupted data")

        _, out, _ = _invoke(
            ["maintenance", "run", "--task", "verify-objects", "--json"], repo
        )
        # The command is expected to still exit 0; this test only checks that
        # the JSON report counts the failure.
        data = json.loads(out)
        assert data["results"]["verify-objects"]["failed"] >= 1

    def test_verify_objects_json_has_checked_count(self, tmp_path: pathlib.Path) -> None:
        """``checked`` equals the number of objects written (three)."""
        repo = _init_repo(tmp_path / "repo")
        for i in range(3):
            _write_object(repo, f"item-{i}".encode())
        _, out, _ = _invoke(
            ["maintenance", "run", "--task", "verify-objects", "--json"], repo
        )
        data = json.loads(out)
        assert data["results"]["verify-objects"]["checked"] == 3
| 304 | |
| 305 | |
| 306 | # --------------------------------------------------------------------------- |
| 307 | # Stress — verify-objects on 100 objects |
| 308 | # --------------------------------------------------------------------------- |
| 309 | |
| 310 | |
class TestStress:
    """Larger-store run of the ``verify-objects`` task."""

    def test_100_objects_verify_all_pass(self, tmp_path: pathlib.Path) -> None:
        """100 distinct objects are all checked and none fail."""
        repo = _init_repo(tmp_path / "repo")
        payloads = [f"stress-object-{i:03d}".encode() for i in range(100)]
        for payload in payloads:
            _write_object(repo, payload)
        exit_code, stdout, _ = _invoke(
            ["maintenance", "run", "--task", "verify-objects", "--json"], repo
        )
        assert exit_code == 0
        report = json.loads(stdout)["results"]["verify-objects"]
        assert report["checked"] == 100
        assert report["failed"] == 0
| 324 | |
| 325 | |
class TestRegisterFlags:
    """Argparse-level checks of the JSON flag on ``maintenance run``."""

    @staticmethod
    def _parse_run(*extra: str):
        """Parse ``maintenance run`` plus *extra* args with a fresh parser.

        A new ``ArgumentParser`` is built and passed through ``register`` on
        every call, so each test starts from a clean parser.
        """
        import argparse
        from muse.cli.commands.maintenance import register

        parser = argparse.ArgumentParser()
        subparsers = parser.add_subparsers()
        register(subparsers)
        return parser.parse_args(["maintenance", "run", *extra])

    def test_default_json_out_is_false(self) -> None:
        """Without any flag, ``json_out`` is the boolean ``False``."""
        parsed = self._parse_run()
        assert parsed.json_out is False

    def test_json_flag_sets_json_out(self) -> None:
        """``--json`` sets ``json_out`` to ``True``."""
        parsed = self._parse_run("--json")
        assert parsed.json_out is True

    def test_j_shorthand_sets_json_out(self) -> None:
        """The ``-j`` short form sets ``json_out`` to ``True``."""
        parsed = self._parse_run("-j")
        assert parsed.json_out is True