"""Comprehensive tests for ``muse snapshot`` subcommands. Covers gaps in the original test_cmd_snapshot.py: * JSON envelope — duration_ms / exit_code on all four subcommands * JSON schema completeness — all documented fields, correct types * Bug regression — sha256: prefix round-trip through _list_all_snapshots / _resolve_snapshot (bare-hex stem bug) * Data integrity — create → export tar.gz/zip → extract → verify file content * Security — ANSI escape injection in note, symlink skip in snapshots dir, path traversal rejected by _validate_snapshot_id_prefix / _safe_arcname, zip-slip guard for crafted manifest entries * Text mode — ``snapshot read --text`` output format * --prefix — files nested under prefix directory inside archive * Limit validation — limit=0 rejected, limit=1 honoured, limit clamps output * Idempotency — identical working-tree always produces the same snapshot_id * Empty list envelope — snapshot list --json returns envelope even when empty * Concurrent stress — N parallel snapshot creates, all independent and valid * Large file export — single 5 MiB file round-trips correctly """ from __future__ import annotations from collections.abc import Mapping import json import os import pathlib import tarfile import threading import zipfile import pytest from muse.core.types import short_id, split_id from muse.core.paths import muse_dir, snapshots_dir from tests.cli_test_helper import CliRunner cli = None # argparse migration — CliRunner ignores this arg runner = CliRunner() # --------------------------------------------------------------------------- # Shared helpers # --------------------------------------------------------------------------- def _init_repo(path: pathlib.Path) -> pathlib.Path: dot_muse = muse_dir(path) for d in ("commits", "snapshots", "objects", "refs/heads"): (dot_muse / d).mkdir(parents=True, exist_ok=True) (dot_muse / "HEAD").write_text("ref: refs/heads/main", encoding="utf-8") (dot_muse / "repo.json").write_text( json.dumps({"repo_id": "snap-supercharge", "domain": "code"}), encoding="utf-8", ) return path def _env(repo: pathlib.Path) -> Mapping[str, str]: return {"MUSE_REPO_ROOT": str(repo)} def _create_files(root: pathlib.Path, count: int = 3) -> list[str]: names: list[str] = [] for i in range(count): name = f"file_{i}.txt" (root / name).write_text(f"content-{i}", encoding="utf-8") names.append(name) return names def _create_snapshot(root: pathlib.Path, note: str = "") -> Mapping[str, object]: """Create a snapshot and return the parsed JSON output.""" cmd = ["snapshot", "create", "--json"] if note: cmd += ["-m", note] result = runner.invoke(cli, cmd, env=_env(root)) assert result.exit_code == 0, result.output return json.loads(result.output) # --------------------------------------------------------------------------- # JSON envelope — duration_ms / exit_code # --------------------------------------------------------------------------- class TestJsonEnvelope: """Every --json subcommand must include duration_ms and exit_code.""" def test_create_has_duration_ms(self, tmp_path: pathlib.Path) -> None: _init_repo(tmp_path) _create_files(tmp_path, 1) data = _create_snapshot(tmp_path) assert "duration_ms" in data assert isinstance(data["duration_ms"], (int, float)) assert data["duration_ms"] >= 0 def test_create_has_exit_code_zero(self, tmp_path: pathlib.Path) -> None: _init_repo(tmp_path) _create_files(tmp_path, 1) data = _create_snapshot(tmp_path) assert data["exit_code"] == 0 def test_list_has_duration_ms(self, tmp_path: pathlib.Path) -> None: _init_repo(tmp_path) _create_files(tmp_path, 1) _create_snapshot(tmp_path) result = runner.invoke(cli, ["snapshot", "list", "--json"], env=_env(tmp_path)) assert result.exit_code == 0 data = json.loads(result.output) assert "duration_ms" in data assert isinstance(data["duration_ms"], (int, float)) assert data["duration_ms"] >= 0 def test_list_has_exit_code_zero(self, tmp_path: pathlib.Path) -> None: _init_repo(tmp_path) _create_files(tmp_path, 1) _create_snapshot(tmp_path) result = runner.invoke(cli, ["snapshot", "list", "--json"], env=_env(tmp_path)) data = json.loads(result.output) assert data["exit_code"] == 0 def test_list_empty_has_envelope(self, tmp_path: pathlib.Path) -> None: _init_repo(tmp_path) result = runner.invoke(cli, ["snapshot", "list", "--json"], env=_env(tmp_path)) assert result.exit_code == 0 data = json.loads(result.output) assert data["snapshots"] == [] assert "duration_ms" in data assert data["exit_code"] == 0 def test_read_has_duration_ms(self, tmp_path: pathlib.Path) -> None: _init_repo(tmp_path) _create_files(tmp_path, 1) created = _create_snapshot(tmp_path) snap_id = created["snapshot_id"] result = runner.invoke(cli, ["snapshot", "read", snap_id, "--json"], env=_env(tmp_path)) assert result.exit_code == 0 data = json.loads(result.output) assert "duration_ms" in data assert isinstance(data["duration_ms"], (int, float)) assert data["duration_ms"] >= 0 def test_read_has_exit_code_zero(self, tmp_path: pathlib.Path) -> None: _init_repo(tmp_path) _create_files(tmp_path, 1) created = _create_snapshot(tmp_path) snap_id = created["snapshot_id"] result = runner.invoke(cli, ["snapshot", "read", snap_id, "--json"], env=_env(tmp_path)) data = json.loads(result.output) assert data["exit_code"] == 0 def test_export_has_duration_ms(self, tmp_path: pathlib.Path) -> None: _init_repo(tmp_path) _create_files(tmp_path, 1) created = _create_snapshot(tmp_path) snap_id = created["snapshot_id"] out = tmp_path / "out.tar.gz" result = runner.invoke( cli, ["snapshot", "export", snap_id, "--output", str(out), "--json"], env=_env(tmp_path), ) assert result.exit_code == 0 data = json.loads(result.output) assert "duration_ms" in data assert isinstance(data["duration_ms"], (int, float)) assert data["duration_ms"] >= 0 def test_export_has_exit_code_zero(self, tmp_path: pathlib.Path) -> None: _init_repo(tmp_path) _create_files(tmp_path, 1) created = _create_snapshot(tmp_path) snap_id = created["snapshot_id"] out = tmp_path / "out.tar.gz" result = runner.invoke( cli, ["snapshot", "export", snap_id, "--output", str(out), "--json"], env=_env(tmp_path), ) data = json.loads(result.output) assert data["exit_code"] == 0 # --------------------------------------------------------------------------- # JSON schema completeness # --------------------------------------------------------------------------- class TestJsonSchemaCompleteness: """All documented fields must be present with correct types.""" def test_create_schema(self, tmp_path: pathlib.Path) -> None: _init_repo(tmp_path) _create_files(tmp_path, 2) data = _create_snapshot(tmp_path, note="schema-test") assert isinstance(data["repo_id"], str) assert isinstance(data["snapshot_id"], str) assert data["snapshot_id"].startswith("sha256:") assert isinstance(data["file_count"], int) assert data["file_count"] >= 1 assert isinstance(data["note"], str) assert data["note"] == "schema-test" assert isinstance(data["created_at"], str) # ISO-8601: basic sanity check assert "T" in data["created_at"] or "-" in data["created_at"] assert isinstance(data["duration_ms"], (int, float)) assert isinstance(data["exit_code"], int) def test_list_schema(self, tmp_path: pathlib.Path) -> None: _init_repo(tmp_path) _create_files(tmp_path, 2) _create_snapshot(tmp_path, note="list-schema") result = runner.invoke(cli, ["snapshot", "list", "--json"], env=_env(tmp_path)) assert result.exit_code == 0 data = json.loads(result.output) assert "snapshots" in data assert isinstance(data["snapshots"], list) assert "duration_ms" in data assert "exit_code" in data item = data["snapshots"][0] assert isinstance(item["snapshot_id"], str) assert item["snapshot_id"].startswith("sha256:") assert isinstance(item["file_count"], int) assert isinstance(item["note"], str) assert isinstance(item["created_at"], str) def test_read_schema(self, tmp_path: pathlib.Path) -> None: _init_repo(tmp_path) _create_files(tmp_path, 2) created = _create_snapshot(tmp_path, note="read-schema") snap_id = created["snapshot_id"] result = runner.invoke(cli, ["snapshot", "read", snap_id, "--json"], env=_env(tmp_path)) assert result.exit_code == 0 data = json.loads(result.output) assert isinstance(data["snapshot_id"], str) assert data["snapshot_id"].startswith("sha256:") assert isinstance(data["created_at"], str) assert isinstance(data["file_count"], int) assert isinstance(data["note"], str) assert isinstance(data["manifest"], dict) assert len(data["manifest"]) == data["file_count"] assert isinstance(data["duration_ms"], (int, float)) assert isinstance(data["exit_code"], int) def test_export_schema(self, tmp_path: pathlib.Path) -> None: _init_repo(tmp_path) _create_files(tmp_path, 2) created = _create_snapshot(tmp_path) snap_id = created["snapshot_id"] out = tmp_path / "schema.tar.gz" result = runner.invoke( cli, ["snapshot", "export", snap_id, "--output", str(out), "--json"], env=_env(tmp_path), ) assert result.exit_code == 0 data = json.loads(result.output) assert isinstance(data["snapshot_id"], str) assert isinstance(data["output"], str) assert data["format"] in ("tar.gz", "zip") assert isinstance(data["file_count"], int) assert isinstance(data["size_bytes"], int) assert data["size_bytes"] > 0 assert isinstance(data["duration_ms"], (int, float)) assert isinstance(data["exit_code"], int) def test_manifest_keys_are_sorted(self, tmp_path: pathlib.Path) -> None: _init_repo(tmp_path) # Create files in reverse alpha order to verify manifest sorts them. for name in ("zzz.txt", "aaa.txt", "mmm.txt"): (tmp_path / name).write_text(name, encoding="utf-8") created = _create_snapshot(tmp_path) snap_id = created["snapshot_id"] result = runner.invoke(cli, ["snapshot", "read", snap_id, "--json"], env=_env(tmp_path)) data = json.loads(result.output) keys = list(data["manifest"].keys()) assert keys == sorted(keys) # --------------------------------------------------------------------------- # Bug regression — sha256: prefix round-trip # --------------------------------------------------------------------------- class TestSha256PrefixRoundTrip: """Regression for the bare-hex-stem bug: _list_all_snapshots and _resolve_snapshot were passing path.stem (bare hex) to read_snapshot, which then compared it against compute_snapshot_id output (sha256: prefixed), causing every snapshot to fail content-hash verification and appear missing.""" def test_list_after_create_returns_snapshot(self, tmp_path: pathlib.Path) -> None: _init_repo(tmp_path) _create_files(tmp_path, 2) created = _create_snapshot(tmp_path) result = runner.invoke(cli, ["snapshot", "list", "--json"], env=_env(tmp_path)) assert result.exit_code == 0 data = json.loads(result.output) ids = [s["snapshot_id"] for s in data["snapshots"]] assert created["snapshot_id"] in ids def test_read_by_full_id_succeeds(self, tmp_path: pathlib.Path) -> None: _init_repo(tmp_path) _create_files(tmp_path, 1) created = _create_snapshot(tmp_path) snap_id = created["snapshot_id"] result = runner.invoke(cli, ["snapshot", "read", snap_id], env=_env(tmp_path)) assert result.exit_code == 0 def test_bare_hex_prefix_rejected(self, tmp_path: pathlib.Path) -> None: """Bare hex prefix (no sha256: type tag) must be rejected at the CLI boundary.""" _init_repo(tmp_path) _create_files(tmp_path, 1) created = _create_snapshot(tmp_path) snap_id = created["snapshot_id"] result = runner.invoke(cli, ["snapshot", "read", short_id(snap_id, strip=True)], env=_env(tmp_path)) assert result.exit_code != 0 def test_read_by_sha256_prefix_succeeds(self, tmp_path: pathlib.Path) -> None: """Full sha256:... ID passed to snapshot read must resolve.""" _init_repo(tmp_path) _create_files(tmp_path, 1) created = _create_snapshot(tmp_path) snap_id = created["snapshot_id"] result = runner.invoke(cli, ["snapshot", "read", snap_id, "--json"], env=_env(tmp_path)) assert result.exit_code == 0 data = json.loads(result.output) assert data["snapshot_id"] == snap_id def test_snapshot_id_in_read_matches_create(self, tmp_path: pathlib.Path) -> None: _init_repo(tmp_path) _create_files(tmp_path, 2) created = _create_snapshot(tmp_path) result = runner.invoke(cli, ["snapshot", "read", created["snapshot_id"], "--json"], env=_env(tmp_path)) data = json.loads(result.output) assert data["snapshot_id"] == created["snapshot_id"] # --------------------------------------------------------------------------- # Data integrity — create → export → verify content # --------------------------------------------------------------------------- class TestDataIntegrity: """File contents written to archives must match the original source files.""" def test_tar_gz_content_matches_source(self, tmp_path: pathlib.Path) -> None: _init_repo(tmp_path) names = _create_files(tmp_path, 3) created = _create_snapshot(tmp_path) snap_id = created["snapshot_id"] out = tmp_path / "integrity.tar.gz" runner.invoke( cli, ["snapshot", "export", snap_id, "--output", str(out)], env=_env(tmp_path), ) assert out.exists() with tarfile.open(out, "r:gz") as tar: members = {m.name: m for m in tar.getmembers()} for name in names: match = [k for k in members if k.endswith(name)] assert match, f"{name} not found in archive" content = tar.extractfile(members[match[0]]) assert content is not None extracted = content.read().decode("utf-8") expected = (tmp_path / name).read_text(encoding="utf-8") assert extracted == expected, f"content mismatch for {name}" def test_zip_content_matches_source(self, tmp_path: pathlib.Path) -> None: _init_repo(tmp_path) names = _create_files(tmp_path, 3) created = _create_snapshot(tmp_path) snap_id = created["snapshot_id"] out = tmp_path / "integrity.zip" runner.invoke( cli, ["snapshot", "export", snap_id, "--format", "zip", "--output", str(out)], env=_env(tmp_path), ) assert out.exists() with zipfile.ZipFile(out, "r") as zf: namelist = zf.namelist() for name in names: match = [k for k in namelist if k.endswith(name)] assert match, f"{name} not found in zip" extracted = zf.read(match[0]).decode("utf-8") expected = (tmp_path / name).read_text(encoding="utf-8") assert extracted == expected, f"content mismatch for {name}" def test_export_file_count_matches_snapshot(self, tmp_path: pathlib.Path) -> None: _init_repo(tmp_path) _create_files(tmp_path, 4) created = _create_snapshot(tmp_path) snap_id = created["snapshot_id"] out = tmp_path / "count.tar.gz" result = runner.invoke( cli, ["snapshot", "export", snap_id, "--output", str(out), "--json"], env=_env(tmp_path), ) assert result.exit_code == 0 data = json.loads(result.output) assert data["file_count"] == created["file_count"] def test_export_size_bytes_matches_disk(self, tmp_path: pathlib.Path) -> None: _init_repo(tmp_path) _create_files(tmp_path, 2) created = _create_snapshot(tmp_path) snap_id = created["snapshot_id"] out = tmp_path / "size.tar.gz" result = runner.invoke( cli, ["snapshot", "export", snap_id, "--output", str(out), "--json"], env=_env(tmp_path), ) data = json.loads(result.output) assert data["size_bytes"] == out.stat().st_size # --------------------------------------------------------------------------- # Security # --------------------------------------------------------------------------- class TestSecurity: """Security properties of snapshot commands.""" def test_ansi_escape_in_note_sanitized_in_text_output(self, tmp_path: pathlib.Path) -> None: """ANSI escape sequences in notes must not reach the terminal raw.""" _init_repo(tmp_path) _create_files(tmp_path, 1) malicious_note = "\x1b[31mred\x1b[0m" result = runner.invoke( cli, ["snapshot", "create", "-m", malicious_note], env=_env(tmp_path) ) assert result.exit_code == 0 # ANSI escape character should not appear verbatim in text output. assert "\x1b" not in result.output def test_note_appears_sanitized_in_list_text(self, tmp_path: pathlib.Path) -> None: _init_repo(tmp_path) _create_files(tmp_path, 1) malicious_note = "\x1b[1mBOLD\x1b[0m" _create_snapshot(tmp_path, note=malicious_note) result = runner.invoke(cli, ["snapshot", "list"], env=_env(tmp_path)) assert result.exit_code == 0 assert "\x1b" not in result.output def test_symlink_in_objects_dir_is_skipped(self, tmp_path: pathlib.Path) -> None: """A symlink inside .muse/objects/ must not be read as a snapshot.""" from muse.core.paths import objects_dir _init_repo(tmp_path) _create_files(tmp_path, 1) created = _create_snapshot(tmp_path) objs_dir = objects_dir(tmp_path) # Plant a symlink in the object store pointing to an unrelated file. target = tmp_path / "some_file.txt" target.write_bytes(b"payload") shard_dir = objs_dir / "sha256" / "de" shard_dir.mkdir(parents=True, exist_ok=True) fake_name = "ad" + "0" * 60 link = shard_dir / fake_name try: link.symlink_to(target) except (OSError, NotImplementedError): pytest.skip("symlinks not supported on this platform") result = runner.invoke(cli, ["snapshot", "list", "--json"], env=_env(tmp_path)) assert result.exit_code == 0 data = json.loads(result.output) # Only the legitimately created snapshot should appear. ids = [s["snapshot_id"] for s in data["snapshots"]] assert len(ids) == 1 assert ids[0] == created["snapshot_id"] def test_path_traversal_in_snapshot_id_prefix_is_safe(self, tmp_path: pathlib.Path) -> None: """A crafted snapshot_id with ../ must not escape the snapshots dir.""" _init_repo(tmp_path) result = runner.invoke( cli, ["snapshot", "read", "../../etc/passwd"], env=_env(tmp_path), ) # Must fail gracefully — not crash, not read /etc/passwd. assert result.exit_code != 0 def test_safe_arcname_rejects_dotdot_path(self, tmp_path: pathlib.Path) -> None: """_safe_arcname must return None for paths with .. segments.""" from muse.cli.commands.snapshot_cmd import _safe_arcname assert _safe_arcname("", "../etc/passwd") is None assert _safe_arcname("prefix", "../../secret") is None def test_safe_arcname_rejects_absolute_path(self, tmp_path: pathlib.Path) -> None: from muse.cli.commands.snapshot_cmd import _safe_arcname assert _safe_arcname("", "/etc/passwd") is None assert _safe_arcname("prefix", "/root/.ssh/id_rsa") is None def test_safe_arcname_accepts_normal_path(self, tmp_path: pathlib.Path) -> None: from muse.cli.commands.snapshot_cmd import _safe_arcname assert _safe_arcname("", "src/main.py") == "src/main.py" assert _safe_arcname("myproject", "lib/util.py") == "myproject/lib/util.py" def test_safe_arcname_rejects_dotdot_in_prefix(self) -> None: from muse.cli.commands.snapshot_cmd import _safe_arcname assert _safe_arcname("../escape", "file.txt") is None # --------------------------------------------------------------------------- # Text mode — snapshot read --text # --------------------------------------------------------------------------- class TestTextMode: def test_read_text_shows_snapshot_id(self, tmp_path: pathlib.Path) -> None: _init_repo(tmp_path) _create_files(tmp_path, 2) created = _create_snapshot(tmp_path) snap_id = created["snapshot_id"] result = runner.invoke( cli, ["snapshot", "read", snap_id], env=_env(tmp_path) ) assert result.exit_code == 0 assert "snapshot_id" in result.output assert snap_id in result.output def test_read_text_shows_file_list(self, tmp_path: pathlib.Path) -> None: _init_repo(tmp_path) _create_files(tmp_path, 2) created = _create_snapshot(tmp_path) snap_id = created["snapshot_id"] result = runner.invoke( cli, ["snapshot", "read", snap_id], env=_env(tmp_path) ) assert result.exit_code == 0 assert "file" in result.output.lower() or "files" in result.output.lower() def test_read_text_shows_note_when_set(self, tmp_path: pathlib.Path) -> None: _init_repo(tmp_path) _create_files(tmp_path, 1) created = _create_snapshot(tmp_path, note="my-label") snap_id = created["snapshot_id"] result = runner.invoke( cli, ["snapshot", "read", snap_id], env=_env(tmp_path) ) assert result.exit_code == 0 assert "my-label" in result.output def test_read_text_is_not_valid_json(self, tmp_path: pathlib.Path) -> None: """--text output must not be machine-parseable JSON.""" _init_repo(tmp_path) _create_files(tmp_path, 1) created = _create_snapshot(tmp_path) snap_id = created["snapshot_id"] result = runner.invoke( cli, ["snapshot", "read", snap_id], env=_env(tmp_path) ) assert result.exit_code == 0 with pytest.raises((json.JSONDecodeError, ValueError)): json.loads(result.output) # --------------------------------------------------------------------------- # --prefix export # --------------------------------------------------------------------------- class TestPrefixExport: def test_tar_gz_files_nested_under_prefix(self, tmp_path: pathlib.Path) -> None: _init_repo(tmp_path) _create_files(tmp_path, 2) created = _create_snapshot(tmp_path) snap_id = created["snapshot_id"] out = tmp_path / "prefixed.tar.gz" runner.invoke( cli, ["snapshot", "export", snap_id, "--prefix", "myproject", "--output", str(out)], env=_env(tmp_path), ) assert out.exists() with tarfile.open(out, "r:gz") as tar: names = tar.getnames() assert all(n.startswith("myproject/") for n in names), names def test_zip_files_nested_under_prefix(self, tmp_path: pathlib.Path) -> None: _init_repo(tmp_path) _create_files(tmp_path, 2) created = _create_snapshot(tmp_path) snap_id = created["snapshot_id"] out = tmp_path / "prefixed.zip" runner.invoke( cli, [ "snapshot", "export", snap_id, "--format", "zip", "--prefix", "release", "--output", str(out), ], env=_env(tmp_path), ) assert out.exists() with zipfile.ZipFile(out, "r") as zf: names = zf.namelist() assert all(n.startswith("release/") for n in names), names def test_empty_prefix_uses_flat_layout(self, tmp_path: pathlib.Path) -> None: _init_repo(tmp_path) _create_files(tmp_path, 2) created = _create_snapshot(tmp_path) snap_id = created["snapshot_id"] out = tmp_path / "flat.tar.gz" runner.invoke( cli, ["snapshot", "export", snap_id, "--prefix", "", "--output", str(out)], env=_env(tmp_path), ) assert out.exists() with tarfile.open(out, "r:gz") as tar: names = tar.getnames() assert all(not n.startswith("/") for n in names) # --------------------------------------------------------------------------- # Limit validation # --------------------------------------------------------------------------- class TestLimitValidation: def test_limit_zero_rejected(self, tmp_path: pathlib.Path) -> None: _init_repo(tmp_path) result = runner.invoke( cli, ["snapshot", "list", "--limit", "0"], env=_env(tmp_path) ) assert result.exit_code != 0 def test_limit_one_returns_at_most_one(self, tmp_path: pathlib.Path) -> None: _init_repo(tmp_path) _create_files(tmp_path, 1) for _ in range(3): _create_snapshot(tmp_path) result = runner.invoke( cli, ["snapshot", "list", "--limit", "1", "--json"], env=_env(tmp_path) ) assert result.exit_code == 0 data = json.loads(result.output) assert len(data["snapshots"]) <= 1 def test_negative_limit_rejected(self, tmp_path: pathlib.Path) -> None: _init_repo(tmp_path) result = runner.invoke( cli, ["snapshot", "list", "--limit", "-1"], env=_env(tmp_path) ) assert result.exit_code != 0 def test_short_flag_n_respected(self, tmp_path: pathlib.Path) -> None: _init_repo(tmp_path) _create_files(tmp_path, 1) for _ in range(4): _create_snapshot(tmp_path) result = runner.invoke( cli, ["snapshot", "list", "--limit", "2", "--json"], env=_env(tmp_path) ) assert result.exit_code == 0 data = json.loads(result.output) assert len(data["snapshots"]) <= 2 # --------------------------------------------------------------------------- # Idempotency — same tree → same snapshot_id # --------------------------------------------------------------------------- class TestIdempotency: def test_same_files_same_snapshot_id(self, tmp_path: pathlib.Path) -> None: _init_repo(tmp_path) _create_files(tmp_path, 3) first = _create_snapshot(tmp_path) second = _create_snapshot(tmp_path) assert first["snapshot_id"] == second["snapshot_id"] def test_different_content_different_snapshot_id(self, tmp_path: pathlib.Path) -> None: _init_repo(tmp_path) _create_files(tmp_path, 2) first = _create_snapshot(tmp_path) # Modify a file. (tmp_path / "file_0.txt").write_text("changed-content", encoding="utf-8") second = _create_snapshot(tmp_path) assert first["snapshot_id"] != second["snapshot_id"] def test_list_shows_only_one_when_idempotent(self, tmp_path: pathlib.Path) -> None: """write_snapshot is idempotent — same ID written twice → one file.""" _init_repo(tmp_path) _create_files(tmp_path, 2) _create_snapshot(tmp_path) _create_snapshot(tmp_path) result = runner.invoke(cli, ["snapshot", "list", "--json"], env=_env(tmp_path)) data = json.loads(result.output) # De-duplicate by snapshot_id. ids = {s["snapshot_id"] for s in data["snapshots"]} assert len(ids) == 1 # --------------------------------------------------------------------------- # List ordering — newest first # --------------------------------------------------------------------------- class TestListOrdering: def test_list_newest_first(self, tmp_path: pathlib.Path) -> None: """Multiple distinct snapshots must be returned newest-first.""" _init_repo(tmp_path) snap_ids: list[str] = [] for i in range(3): (tmp_path / f"round_{i}.txt").write_text(f"v{i}", encoding="utf-8") created = _create_snapshot(tmp_path) snap_ids.append(created["snapshot_id"]) result = runner.invoke(cli, ["snapshot", "list", "--json"], env=_env(tmp_path)) data = json.loads(result.output) returned = [s["snapshot_id"] for s in data["snapshots"]] # Newest (last created) must appear first. assert returned[0] == snap_ids[-1] # --------------------------------------------------------------------------- # Concurrent stress # --------------------------------------------------------------------------- class TestConcurrentStress: def test_concurrent_creates_all_succeed(self, tmp_path: pathlib.Path) -> None: """N threads creating snapshots concurrently must all succeed.""" _init_repo(tmp_path) _create_files(tmp_path, 5) n_threads = 8 errors: list[str] = [] results: list[dict] = [] lock = threading.Lock() def _do_create() -> None: result = runner.invoke( cli, ["snapshot", "create", "--json"], env=_env(tmp_path) ) with lock: if result.exit_code != 0: errors.append(result.output) else: results.append(json.loads(result.output)) threads = [threading.Thread(target=_do_create) for _ in range(n_threads)] for t in threads: t.start() for t in threads: t.join() assert not errors, f"Some creates failed: {errors}" assert len(results) == n_threads # All results have a valid snapshot_id. for r in results: assert r["snapshot_id"].startswith("sha256:") assert r["exit_code"] == 0 # --------------------------------------------------------------------------- # Large file stress # --------------------------------------------------------------------------- class TestLargeFileExport: def test_large_file_round_trips_correctly(self, tmp_path: pathlib.Path) -> None: """A 5 MiB file must survive create → export → extract unchanged.""" _init_repo(tmp_path) payload = os.urandom(5 * 1024 * 1024) (tmp_path / "big.bin").write_bytes(payload) created = _create_snapshot(tmp_path) snap_id = created["snapshot_id"] out = tmp_path / "big.tar.gz" result = runner.invoke( cli, ["snapshot", "export", snap_id, "--output", str(out), "--json"], env=_env(tmp_path), ) assert result.exit_code == 0 data = json.loads(result.output) assert data["file_count"] >= 1 assert data["size_bytes"] > 0 assert out.exists() # Verify archive actually opens. assert tarfile.is_tarfile(str(out)) with tarfile.open(out, "r:gz") as tar: members = [m for m in tar.getmembers() if m.name.endswith("big.bin")] assert members, "big.bin not found in archive" content = tar.extractfile(members[0]) assert content is not None assert content.read() == payload # --------------------------------------------------------------------------- # Export to default filename # --------------------------------------------------------------------------- class TestDefaultFilename: def test_export_default_filename_is_short_id_dot_format(self, tmp_path: pathlib.Path) -> None: """When --output is omitted, the archive uses ..""" _init_repo(tmp_path) _create_files(tmp_path, 1) created = _create_snapshot(tmp_path) snap_id = created["snapshot_id"] # Run from tmp_path so the default output lands there. orig_dir = pathlib.Path.cwd() os.chdir(tmp_path) try: result = runner.invoke( cli, ["snapshot", "export", snap_id, "--json"], env=_env(tmp_path) ) finally: os.chdir(orig_dir) assert result.exit_code == 0 data = json.loads(result.output) assert data["output"].endswith(".tar.gz") assert pathlib.Path(tmp_path / data["output"]).exists() or pathlib.Path(data["output"]).exists() def test_export_not_found_exits_nonzero(self, tmp_path: pathlib.Path) -> None: _init_repo(tmp_path) result = runner.invoke( cli, ["snapshot", "export", "nonexistent"], env=_env(tmp_path) ) assert result.exit_code != 0