"""Supercharge tests for ``muse restore`` — performance, data integrity, object-store corruption, concurrency, and source+staged combos. Coverage tiers added here: - Performance: duration_ms present, non-negative, and reasonable - Data integrity: complete JSON schema, correct types, exit_code field - Error mapping: object store corruption → exit code 3 (INTERNAL_ERROR) - Concurrent: two threads restore independent files without racing - Source+staged: --source --staged restores stage entry from source commit - Text summary: text output includes "Restored N" summary line - Docstring gap: _resolve_source_manifest returns {} for bad ref (not raises) """ from __future__ import annotations from collections.abc import Mapping import json import pathlib import threading import time import datetime import pytest from tests.cli_test_helper import CliRunner, InvokeResult from muse.core.object_store import write_object from muse.core.ids import hash_commit as compute_commit_id, hash_snapshot as compute_snapshot_id from muse.core.commits import ( CommitRecord, write_commit, ) from muse.core.snapshots import ( SnapshotRecord, write_snapshot, ) from muse.core.types import Manifest, blob_id from muse.plugins.code.stage import StagedFileMap, make_entry, read_stage, write_stage from muse.core.paths import heads_dir, muse_dir, ref_path runner = CliRunner() _REPO_ID = "restore-supercharge-test" _counter = 1000 # offset to avoid collisions with test_cmd_restore.py def _init_repo(path: pathlib.Path) -> pathlib.Path: muse = muse_dir(path) for d in ("commits", "snapshots", "objects", "refs/heads", "code"): (muse / d).mkdir(parents=True, exist_ok=True) (muse / "HEAD").write_text("ref: refs/heads/main", encoding="utf-8") (muse / "repo.json").write_text( json.dumps({"repo_id": _REPO_ID, "domain": "code"}), encoding="utf-8" ) return path def _env(repo: pathlib.Path) -> Mapping[str, str]: return {"MUSE_REPO_ROOT": str(repo)} def _commit_files(root: pathlib.Path, files: Mapping[str, bytes], branch: str = "main") -> str: global _counter _counter += 1 manifest: Manifest = {} for rel_path, content in files.items(): obj_id = blob_id(content) write_object(root, obj_id, content) manifest[rel_path] = obj_id abs_path = root / rel_path abs_path.parent.mkdir(parents=True, exist_ok=True) abs_path.write_bytes(content) snap_id = compute_snapshot_id(manifest) write_snapshot(root, SnapshotRecord(snapshot_id=snap_id, manifest=manifest)) committed_at = datetime.datetime.now(datetime.timezone.utc) commit_id = compute_commit_id( parent_ids=[], snapshot_id=snap_id, message=f"commit {_counter}", committed_at_iso=committed_at.isoformat(), ) write_commit( root, CommitRecord( commit_id=commit_id, branch=branch, snapshot_id=snap_id, message=f"commit {_counter}", committed_at=committed_at, ), ) (ref_path(root, branch)).write_text(commit_id, encoding="utf-8") return commit_id def _invoke(repo: pathlib.Path, *args: str) -> InvokeResult: from muse.cli.app import main as cli return runner.invoke(cli, ["restore", *args], env=_env(repo)) # --------------------------------------------------------------------------- # Performance tier # --------------------------------------------------------------------------- def test_restore_json_has_duration_ms(tmp_path: pathlib.Path) -> None: """JSON output must include 'duration_ms' as a non-negative float.""" root = _init_repo(tmp_path) _commit_files(root, {"a.py": b"# orig\n"}) (root / "a.py").write_bytes(b"# dirty\n") result = _invoke(root, "--json", "a.py") assert result.exit_code == 0 data = json.loads(result.stdout) assert "duration_ms" in data, "JSON must include 'duration_ms'" assert isinstance(data["duration_ms"], (int, float)), "duration_ms must be numeric" assert data["duration_ms"] >= 0, "duration_ms must be non-negative" def test_restore_duration_ms_is_reasonable(tmp_path: pathlib.Path) -> None: """duration_ms for a single-file restore should be well under 5 seconds.""" root = _init_repo(tmp_path) _commit_files(root, {"a.py": b"# orig\n"}) (root / "a.py").write_bytes(b"# dirty\n") result = _invoke(root, "--json", "a.py") assert result.exit_code == 0 data = json.loads(result.stdout) assert data["duration_ms"] < 5_000, f"duration_ms={data['duration_ms']} is suspiciously large" def test_restore_dry_run_json_has_duration_ms(tmp_path: pathlib.Path) -> None: """duration_ms must be present even in dry-run mode.""" root = _init_repo(tmp_path) _commit_files(root, {"a.py": b"# orig\n"}) (root / "a.py").write_bytes(b"# dirty\n") result = _invoke(root, "--dry-run", "--json", "a.py") assert result.exit_code == 0 data = json.loads(result.stdout) assert "duration_ms" in data # --------------------------------------------------------------------------- # Data integrity tier # --------------------------------------------------------------------------- def test_restore_json_schema_complete_on_success(tmp_path: pathlib.Path) -> None: """All required JSON fields are present with correct types on success.""" root = _init_repo(tmp_path) _commit_files(root, {"s.py": b"# orig\n"}) (root / "s.py").write_bytes(b"# dirty\n") result = _invoke(root, "--json", "s.py") assert result.exit_code == 0 data = json.loads(result.stdout) assert isinstance(data["restored"], list) assert isinstance(data["not_found"], list) assert isinstance(data["dry_run"], bool) assert isinstance(data["staged"], bool) assert isinstance(data["worktree"], bool) assert isinstance(data["duration_ms"], (int, float)) assert isinstance(data["exit_code"], int) def test_restore_json_exit_code_zero_on_success(tmp_path: pathlib.Path) -> None: """exit_code in JSON is 0 when all files are restored successfully.""" root = _init_repo(tmp_path) _commit_files(root, {"ok.py": b"# orig\n"}) (root / "ok.py").write_bytes(b"# dirty\n") result = _invoke(root, "--json", "ok.py") assert result.exit_code == 0 data = json.loads(result.stdout) assert data["exit_code"] == 0 def test_restore_json_exit_code_one_when_file_not_found(tmp_path: pathlib.Path) -> None: """exit_code in JSON is 1 (USER_ERROR) when a file is not in source.""" root = _init_repo(tmp_path) _commit_files(root, {"anchor.py": b"# anchor\n"}) result = _invoke(root, "--json", "ghost.py") assert result.exit_code != 0 data = json.loads(result.stdout) assert data["exit_code"] == 1 def test_restore_json_restored_list_correct(tmp_path: pathlib.Path) -> None: """restored list contains exactly the successfully restored paths.""" root = _init_repo(tmp_path) _commit_files(root, {"x.py": b"# x\n", "y.py": b"# y\n"}) (root / "x.py").write_bytes(b"# dirty x\n") (root / "y.py").write_bytes(b"# dirty y\n") result = _invoke(root, "--json", "x.py", "y.py") data = json.loads(result.stdout) assert sorted(data["restored"]) == ["x.py", "y.py"] assert data["not_found"] == [] def test_restore_json_not_found_list_correct(tmp_path: pathlib.Path) -> None: """not_found list contains paths that were absent from the source manifest.""" root = _init_repo(tmp_path) _commit_files(root, {"real.py": b"# real\n"}) (root / "real.py").write_bytes(b"# dirty\n") result = _invoke(root, "--json", "real.py", "ghost.py") data = json.loads(result.stdout) assert "real.py" in data["restored"] assert "ghost.py" in data["not_found"] def test_restore_json_staged_and_worktree_flags_reflect_args(tmp_path: pathlib.Path) -> None: """staged/worktree fields in JSON reflect the CLI flags used.""" root = _init_repo(tmp_path) _commit_files(root, {"f.py": b"# orig\n"}) obj_id = blob_id(b"# mod\n") write_object(root, obj_id, b"# mod\n") stage: StagedFileMap = {"f.py": make_entry(obj_id, "M")} write_stage(root, stage) result = _invoke(root, "--staged", "--worktree", "--json", "f.py") data = json.loads(result.stdout) assert data["staged"] is True assert data["worktree"] is True # --------------------------------------------------------------------------- # Error mapping — object store corruption → exit code 3 # --------------------------------------------------------------------------- def test_restore_missing_object_exits_3(tmp_path: pathlib.Path) -> None: """When an object_id is in the manifest but missing from the store, exit code must be 3.""" root = _init_repo(tmp_path) content = b"# original\n" obj_id = blob_id(content) # Build a manifest pointing at an object that is NOT in the store. # We write the commit but deliberately don't call write_object. manifest: Manifest = {"corrupt.py": obj_id} snap_id = compute_snapshot_id(manifest) write_snapshot(root, SnapshotRecord(snapshot_id=snap_id, manifest=manifest)) committed_at = datetime.datetime.now(datetime.timezone.utc) global _counter _counter += 1 commit_id = compute_commit_id( parent_ids=[], snapshot_id=snap_id, message=f"corrupt commit {_counter}", committed_at_iso=committed_at.isoformat(), ) write_commit( root, CommitRecord( commit_id=commit_id, branch="main", snapshot_id=snap_id, message=f"corrupt commit {_counter}", committed_at=committed_at, ), ) (heads_dir(root) / "main").write_text(commit_id, encoding="utf-8") # Create the file on disk so path resolution doesn't fail (root / "corrupt.py").write_bytes(b"# dirty\n") result = _invoke(root, "corrupt.py") assert result.exit_code == 3, ( f"Expected exit code 3 (INTERNAL_ERROR) for missing object, got {result.exit_code}" ) def test_restore_missing_object_json_exit_code_3(tmp_path: pathlib.Path) -> None: """JSON exit_code is 3 when the object is missing from the store.""" root = _init_repo(tmp_path) content = b"# original\n" obj_id = blob_id(content) manifest: Manifest = {"corrupt2.py": obj_id} snap_id = compute_snapshot_id(manifest) write_snapshot(root, SnapshotRecord(snapshot_id=snap_id, manifest=manifest)) committed_at = datetime.datetime.now(datetime.timezone.utc) global _counter _counter += 1 commit_id = compute_commit_id( parent_ids=[], snapshot_id=snap_id, message=f"corrupt2 {_counter}", committed_at_iso=committed_at.isoformat(), ) write_commit( root, CommitRecord( commit_id=commit_id, branch="main", snapshot_id=snap_id, message=f"corrupt2 {_counter}", committed_at=committed_at, ), ) (heads_dir(root) / "main").write_text(commit_id, encoding="utf-8") (root / "corrupt2.py").write_bytes(b"# dirty\n") result = _invoke(root, "--json", "corrupt2.py") assert result.exit_code == 3 data = json.loads(result.stdout) assert data["exit_code"] == 3 # --------------------------------------------------------------------------- # Concurrent restore # --------------------------------------------------------------------------- def test_restore_concurrent_independent_files(tmp_path: pathlib.Path) -> None: """Two threads restore independent files without racing or corrupting each other.""" root = _init_repo(tmp_path) original_a = b"# thread-a original\n" original_b = b"# thread-b original\n" _commit_files(root, {"ta.py": original_a, "tb.py": original_b}) (root / "ta.py").write_bytes(b"# dirty a\n") (root / "tb.py").write_bytes(b"# dirty b\n") errors: list[Exception] = [] def restore_a() -> None: try: result = _invoke(root, "ta.py") assert result.exit_code == 0, f"thread-a exit {result.exit_code}" except Exception as exc: errors.append(exc) def restore_b() -> None: try: result = _invoke(root, "tb.py") assert result.exit_code == 0, f"thread-b exit {result.exit_code}" except Exception as exc: errors.append(exc) t1 = threading.Thread(target=restore_a) t2 = threading.Thread(target=restore_b) t1.start() t2.start() t1.join(timeout=10) t2.join(timeout=10) assert not errors, f"Concurrent restore errors: {errors}" assert (root / "ta.py").read_bytes() == original_a assert (root / "tb.py").read_bytes() == original_b # --------------------------------------------------------------------------- # --source --staged combo # --------------------------------------------------------------------------- def test_restore_source_and_staged_clears_stage_from_source(tmp_path: pathlib.Path) -> None: """--source --staged clears the stage entry so it matches source.""" root = _init_repo(tmp_path) v1_content = b"# v1\n" v1_commit = _commit_files(root, {"versioned.py": v1_content}) # Update to v2 v2_content = b"# v2\n" _commit_files(root, {"versioned.py": v2_content}) # Stage a modification on top of v2 mod_content = b"# staged mod\n" obj_id = blob_id(mod_content) write_object(root, obj_id, mod_content) stage: StagedFileMap = {"versioned.py": make_entry(obj_id, "M")} write_stage(root, stage) # --source v1_commit --staged should clear the stage entry result = _invoke(root, "--source", v1_commit, "--staged", "versioned.py") assert result.exit_code == 0 stage_after = read_stage(root) assert "versioned.py" not in stage_after def test_restore_source_staged_worktree_restores_from_source(tmp_path: pathlib.Path) -> None: """--source --staged --worktree restores disk from source, clears stage.""" root = _init_repo(tmp_path) v1_content = b"# v1 original\n" v1_commit = _commit_files(root, {"combo.py": v1_content}) _commit_files(root, {"combo.py": b"# v2\n"}) mod_content = b"# staged mod\n" obj_id = blob_id(mod_content) write_object(root, obj_id, mod_content) stage: StagedFileMap = {"combo.py": make_entry(obj_id, "M")} write_stage(root, stage) (root / "combo.py").write_bytes(b"# dirty disk\n") result = _invoke(root, "--source", v1_commit, "--staged", "--worktree", "combo.py") assert result.exit_code == 0 assert (root / "combo.py").read_bytes() == v1_content stage_after = read_stage(root) assert "combo.py" not in stage_after # --------------------------------------------------------------------------- # Text summary output # --------------------------------------------------------------------------- def test_restore_text_output_summary_line(tmp_path: pathlib.Path) -> None: """Text output includes a summary line like 'Restored 2 file(s)'.""" root = _init_repo(tmp_path) _commit_files(root, {"p.py": b"# p\n", "q.py": b"# q\n"}) (root / "p.py").write_bytes(b"# dirty p\n") (root / "q.py").write_bytes(b"# dirty q\n") result = _invoke(root, "p.py", "q.py") assert result.exit_code == 0 output = result.stdout + (result.stderr or "") assert "2" in output, f"Expected count in output: {output!r}" def test_restore_text_output_errors_noted(tmp_path: pathlib.Path) -> None: """Text output notes how many errors occurred when some paths fail.""" root = _init_repo(tmp_path) _commit_files(root, {"real.py": b"# real\n"}) (root / "real.py").write_bytes(b"# dirty\n") result = _invoke(root, "real.py", "ghost.py") assert result.exit_code != 0 output = (result.stdout or "") + (result.stderr or "") # Should mention the failure somehow assert "ghost" in output or "error" in output.lower() or "not" in output.lower() # --------------------------------------------------------------------------- # _resolve_source_manifest — docstring gap: bad ref returns {}, never raises # --------------------------------------------------------------------------- def test_resolve_source_manifest_bad_ref_returns_empty(tmp_path: pathlib.Path) -> None: """_resolve_source_manifest returns {} for a non-existent ref — never raises.""" from muse.cli.commands.restore import _resolve_source_manifest root = _init_repo(tmp_path) _commit_files(root, {"a.py": b"# a\n"}) result = _resolve_source_manifest(root, source_ref="nonexistent-branch-xyz") assert result == {} def test_resolve_source_manifest_valid_ref(tmp_path: pathlib.Path) -> None: """_resolve_source_manifest resolves a valid branch name to its manifest.""" from muse.cli.commands.restore import _resolve_source_manifest root = _init_repo(tmp_path) content = b"# branch content\n" _commit_files(root, {"b.py": content}, branch="main") manifest = _resolve_source_manifest(root, source_ref="main") assert "b.py" in manifest assert manifest["b.py"] == blob_id(content) # --------------------------------------------------------------------------- # Edge: restore staged-only with --source doesn't require file on disk # --------------------------------------------------------------------------- def test_restore_staged_only_source_does_not_require_disk_file(tmp_path: pathlib.Path) -> None: """--staged with --source works even when the disk file doesn't exist.""" root = _init_repo(tmp_path) v1_commit = _commit_files(root, {"staged_only.py": b"# v1\n"}) # Stage a modification obj_id = blob_id(b"# mod\n") write_object(root, obj_id, b"# mod\n") stage: StagedFileMap = {"staged_only.py": make_entry(obj_id, "M")} write_stage(root, stage) # Delete disk file (root / "staged_only.py").unlink() result = _invoke(root, "--source", v1_commit, "--staged", "staged_only.py") assert result.exit_code == 0 stage_after = read_stage(root) assert "staged_only.py" not in stage_after # --------------------------------------------------------------------------- # Performance: duration_ms for 50-file restore is under 10 seconds # --------------------------------------------------------------------------- def test_restore_50_files_duration_ms_reasonable(tmp_path: pathlib.Path) -> None: """50-file restore reports duration_ms and completes under 10 seconds.""" root = _init_repo(tmp_path) files = {f"perf_{i}.py": f"# orig {i}\n".encode() for i in range(50)} _commit_files(root, files) for name in files: (root / name).write_bytes(b"# dirty\n") result = _invoke(root, "--json", *files.keys()) assert result.exit_code == 0 data = json.loads(result.stdout) assert "duration_ms" in data assert data["duration_ms"] < 10_000 assert len(data["restored"]) == 50 class TestRegisterFlags: def test_default_json_out_is_false(self) -> None: import argparse from muse.cli.commands.restore import register p = argparse.ArgumentParser() subs = p.add_subparsers() register(subs) args = p.parse_args(["restore", "src/billing.py"]) assert args.json_out is False def test_json_flag_sets_json_out(self) -> None: import argparse from muse.cli.commands.restore import register p = argparse.ArgumentParser() subs = p.add_subparsers() register(subs) args = p.parse_args(["restore", "src/billing.py", "--json"]) assert args.json_out is True def test_j_shorthand_sets_json_out(self) -> None: import argparse from muse.cli.commands.restore import register p = argparse.ArgumentParser() subs = p.add_subparsers() register(subs) args = p.parse_args(["restore", "src/billing.py", "-j"]) assert args.json_out is True