gabriel / muse public
test_snapshot_supercharge.py python
832 lines 33.2 KB
Raw
sha256:2eaa5d95f9d9383498e76947410a26e5a3ba23d182f339910c424cf88fad412b fix: try fetch/presign before fetch/mpack to avoid Cloudfla… Sonnet 4.6 patch 6 days ago
1 """Comprehensive tests for ``muse snapshot`` subcommands.
2
3 Covers gaps in the original test_cmd_snapshot.py:
4
5 * JSON envelope — duration_ms / exit_code on all four subcommands
6 * JSON schema completeness — all documented fields, correct types
7 * Bug regression — sha256: prefix round-trip through _list_all_snapshots /
8 _resolve_snapshot (bare-hex stem bug)
9 * Data integrity — create → export tar.gz/zip → extract → verify file content
10 * Security — ANSI escape injection in note, symlink skip in objects dir,
11 path traversal rejected by _validate_snapshot_id_prefix / _safe_arcname,
12 zip-slip guard for crafted manifest entries
13 * Text mode — ``snapshot read --text`` output format
14 * --prefix — files nested under prefix directory inside archive
15 * Limit validation — limit=0 rejected, limit=1 honoured, limit clamps output
16 * Idempotency — identical working-tree always produces the same snapshot_id
17 * Empty list envelope — snapshot list --json returns envelope even when empty
18 * Concurrent stress — N parallel snapshot creates, all independent and valid
19 * Large file export — single 5 MiB file round-trips correctly
20 """
21
22 from __future__ import annotations
23 from collections.abc import Mapping
24
25 import json
26 import os
27 import pathlib
28 import tarfile
29 import threading
30 import zipfile
31
32 import pytest
33
34 from muse.core.types import short_id, split_id
35 from muse.core.paths import muse_dir, snapshots_dir
36 from tests.cli_test_helper import CliRunner
37
# Placeholder passed as the first positional argument of ``runner.invoke``.
cli = None  # argparse migration — CliRunner ignores this arg

# Single runner instance shared by every test in this module.
runner = CliRunner()
41
42
43 # ---------------------------------------------------------------------------
44 # Shared helpers
45 # ---------------------------------------------------------------------------
46
47
def _init_repo(path: pathlib.Path) -> pathlib.Path:
    """Lay out a minimal repository skeleton for *path* and return *path*.

    Under the directory returned by ``muse_dir(path)`` this creates the
    ``commits``, ``snapshots``, ``objects`` and ``refs/heads`` directories,
    writes a ``HEAD`` file pointing at ``refs/heads/main`` and a ``repo.json``
    carrying the repo id and domain.
    """
    meta_root = muse_dir(path)
    for sub in ("commits", "snapshots", "objects", "refs/heads"):
        target = meta_root / sub
        target.mkdir(parents=True, exist_ok=True)

    head_file = meta_root / "HEAD"
    head_file.write_text("ref: refs/heads/main", encoding="utf-8")

    repo_meta = {"repo_id": "snap-supercharge", "domain": "code"}
    (meta_root / "repo.json").write_text(json.dumps(repo_meta), encoding="utf-8")
    return path
58
59
def _env(repo: pathlib.Path) -> Mapping[str, str]:
    """Build the environment overrides that point the CLI at *repo*."""
    repo_root = str(repo)
    return dict(MUSE_REPO_ROOT=repo_root)
62
63
def _create_files(root: pathlib.Path, count: int = 3) -> list[str]:
    """Write *count* small text files into *root* and return their names.

    File ``i`` is named ``file_<i>.txt`` and holds the text ``content-<i>``.
    """
    names = [f"file_{idx}.txt" for idx in range(count)]
    for idx, filename in enumerate(names):
        (root / filename).write_text(f"content-{idx}", encoding="utf-8")
    return names
71
72
def _create_snapshot(root: pathlib.Path, note: str = "") -> Mapping[str, object]:
    """Run ``snapshot create --json`` against *root* and return the decoded output.

    A non-empty *note* is forwarded through ``-m``. The command must exit with
    code 0; otherwise the assertion fails and reports the command output.
    """
    note_args = ["-m", note] if note else []
    argv = ["snapshot", "create", "--json", *note_args]
    outcome = runner.invoke(cli, argv, env=_env(root))
    assert outcome.exit_code == 0, outcome.output
    return json.loads(outcome.output)
81
82
83 # ---------------------------------------------------------------------------
84 # JSON envelope — duration_ms / exit_code
85 # ---------------------------------------------------------------------------
86
87
class TestJsonEnvelope:
    """Each ``--json`` subcommand reports ``duration_ms`` and ``exit_code``."""

    @staticmethod
    def _seed(repo: pathlib.Path) -> Mapping[str, object]:
        """Initialise *repo*, add one file, snapshot it, return the create payload."""
        _init_repo(repo)
        _create_files(repo, 1)
        return _create_snapshot(repo)

    @staticmethod
    def _run_json(repo: pathlib.Path, *argv):
        """Invoke ``snapshot <argv...> --json`` with the environment for *repo*."""
        return runner.invoke(cli, ["snapshot", *argv, "--json"], env=_env(repo))

    @staticmethod
    def _check_duration(payload: Mapping[str, object]) -> None:
        """Assert *payload* carries a numeric, non-negative ``duration_ms``."""
        assert "duration_ms" in payload
        elapsed = payload["duration_ms"]
        assert isinstance(elapsed, (int, float))
        assert elapsed >= 0

    # -- create -------------------------------------------------------------

    def test_create_has_duration_ms(self, tmp_path: pathlib.Path) -> None:
        self._check_duration(self._seed(tmp_path))

    def test_create_has_exit_code_zero(self, tmp_path: pathlib.Path) -> None:
        payload = self._seed(tmp_path)
        assert payload["exit_code"] == 0

    # -- list ---------------------------------------------------------------

    def test_list_has_duration_ms(self, tmp_path: pathlib.Path) -> None:
        self._seed(tmp_path)
        outcome = self._run_json(tmp_path, "list")
        assert outcome.exit_code == 0
        self._check_duration(json.loads(outcome.output))

    def test_list_has_exit_code_zero(self, tmp_path: pathlib.Path) -> None:
        self._seed(tmp_path)
        outcome = self._run_json(tmp_path, "list")
        payload = json.loads(outcome.output)
        assert payload["exit_code"] == 0

    def test_list_empty_has_envelope(self, tmp_path: pathlib.Path) -> None:
        # No snapshot is created here: the envelope must still be emitted.
        _init_repo(tmp_path)
        outcome = self._run_json(tmp_path, "list")
        assert outcome.exit_code == 0
        payload = json.loads(outcome.output)
        assert payload["snapshots"] == []
        assert "duration_ms" in payload
        assert payload["exit_code"] == 0

    # -- read ---------------------------------------------------------------

    def test_read_has_duration_ms(self, tmp_path: pathlib.Path) -> None:
        snap_id = self._seed(tmp_path)["snapshot_id"]
        outcome = self._run_json(tmp_path, "read", snap_id)
        assert outcome.exit_code == 0
        self._check_duration(json.loads(outcome.output))

    def test_read_has_exit_code_zero(self, tmp_path: pathlib.Path) -> None:
        snap_id = self._seed(tmp_path)["snapshot_id"]
        outcome = self._run_json(tmp_path, "read", snap_id)
        payload = json.loads(outcome.output)
        assert payload["exit_code"] == 0

    # -- export -------------------------------------------------------------

    def test_export_has_duration_ms(self, tmp_path: pathlib.Path) -> None:
        snap_id = self._seed(tmp_path)["snapshot_id"]
        archive = tmp_path / "out.tar.gz"
        outcome = self._run_json(tmp_path, "export", snap_id, "--output", str(archive))
        assert outcome.exit_code == 0
        self._check_duration(json.loads(outcome.output))

    def test_export_has_exit_code_zero(self, tmp_path: pathlib.Path) -> None:
        snap_id = self._seed(tmp_path)["snapshot_id"]
        archive = tmp_path / "out.tar.gz"
        outcome = self._run_json(tmp_path, "export", snap_id, "--output", str(archive))
        payload = json.loads(outcome.output)
        assert payload["exit_code"] == 0
184
185
186 # ---------------------------------------------------------------------------
187 # JSON schema completeness
188 # ---------------------------------------------------------------------------
189
190
class TestJsonSchemaCompleteness:
    """Check that each JSON payload carries its documented fields and types."""

    @staticmethod
    def _assert_types(payload, expected) -> None:
        """Assert ``payload[key]`` is an instance of ``expected[key]`` for every key."""
        for key, kind in expected.items():
            assert isinstance(payload[key], kind), key

    def test_create_schema(self, tmp_path: pathlib.Path) -> None:
        _init_repo(tmp_path)
        _create_files(tmp_path, 2)
        payload = _create_snapshot(tmp_path, note="schema-test")
        self._assert_types(
            payload,
            {
                "repo_id": str,
                "snapshot_id": str,
                "file_count": int,
                "note": str,
                "created_at": str,
                "duration_ms": (int, float),
                "exit_code": int,
            },
        )
        assert payload["snapshot_id"].startswith("sha256:")
        assert payload["file_count"] >= 1
        assert payload["note"] == "schema-test"
        # ISO-8601: basic sanity check only — a date or date-time separator.
        stamp = payload["created_at"]
        assert "T" in stamp or "-" in stamp

    def test_list_schema(self, tmp_path: pathlib.Path) -> None:
        _init_repo(tmp_path)
        _create_files(tmp_path, 2)
        _create_snapshot(tmp_path, note="list-schema")
        outcome = runner.invoke(cli, ["snapshot", "list", "--json"], env=_env(tmp_path))
        assert outcome.exit_code == 0
        payload = json.loads(outcome.output)
        for field in ("snapshots", "duration_ms", "exit_code"):
            assert field in payload
        assert isinstance(payload["snapshots"], list)
        first = payload["snapshots"][0]
        self._assert_types(
            first,
            {"snapshot_id": str, "file_count": int, "note": str, "created_at": str},
        )
        assert first["snapshot_id"].startswith("sha256:")

    def test_read_schema(self, tmp_path: pathlib.Path) -> None:
        _init_repo(tmp_path)
        _create_files(tmp_path, 2)
        snap_id = _create_snapshot(tmp_path, note="read-schema")["snapshot_id"]
        outcome = runner.invoke(
            cli, ["snapshot", "read", snap_id, "--json"], env=_env(tmp_path)
        )
        assert outcome.exit_code == 0
        payload = json.loads(outcome.output)
        self._assert_types(
            payload,
            {
                "snapshot_id": str,
                "created_at": str,
                "file_count": int,
                "note": str,
                "manifest": dict,
                "duration_ms": (int, float),
                "exit_code": int,
            },
        )
        assert payload["snapshot_id"].startswith("sha256:")
        # One manifest entry per counted file.
        assert len(payload["manifest"]) == payload["file_count"]

    def test_export_schema(self, tmp_path: pathlib.Path) -> None:
        _init_repo(tmp_path)
        _create_files(tmp_path, 2)
        snap_id = _create_snapshot(tmp_path)["snapshot_id"]
        archive = tmp_path / "schema.tar.gz"
        outcome = runner.invoke(
            cli,
            ["snapshot", "export", snap_id, "--output", str(archive), "--json"],
            env=_env(tmp_path),
        )
        assert outcome.exit_code == 0
        payload = json.loads(outcome.output)
        self._assert_types(
            payload,
            {
                "snapshot_id": str,
                "output": str,
                "file_count": int,
                "size_bytes": int,
                "duration_ms": (int, float),
                "exit_code": int,
            },
        )
        assert payload["format"] in ("tar.gz", "zip")
        assert payload["size_bytes"] > 0

    def test_manifest_keys_are_sorted(self, tmp_path: pathlib.Path) -> None:
        _init_repo(tmp_path)
        # Files are written out of alphabetical order so that a sorted manifest
        # cannot be an accident of creation order.
        for filename in ("zzz.txt", "aaa.txt", "mmm.txt"):
            (tmp_path / filename).write_text(filename, encoding="utf-8")
        snap_id = _create_snapshot(tmp_path)["snapshot_id"]
        outcome = runner.invoke(
            cli, ["snapshot", "read", snap_id, "--json"], env=_env(tmp_path)
        )
        manifest = json.loads(outcome.output)["manifest"]
        observed = list(manifest.keys())
        assert observed == sorted(observed)
280
281
282 # ---------------------------------------------------------------------------
283 # Bug regression — sha256: prefix round-trip
284 # ---------------------------------------------------------------------------
285
286
class TestSha256PrefixRoundTrip:
    """Regression tests for the bare-hex-stem bug.

    ``_list_all_snapshots`` and ``_resolve_snapshot`` used to hand ``path.stem``
    (bare hex) to ``read_snapshot``, which compared it with the ``sha256:``-
    prefixed output of ``compute_snapshot_id``. Every snapshot then failed
    content-hash verification and looked as if it were missing.
    """

    @staticmethod
    def _fresh_snapshot_id(repo: pathlib.Path, file_count: int):
        """Initialise *repo* with *file_count* files and return a new snapshot's ID."""
        _init_repo(repo)
        _create_files(repo, file_count)
        return _create_snapshot(repo)["snapshot_id"]

    def test_list_after_create_returns_snapshot(self, tmp_path: pathlib.Path) -> None:
        expected_id = self._fresh_snapshot_id(tmp_path, 2)
        outcome = runner.invoke(cli, ["snapshot", "list", "--json"], env=_env(tmp_path))
        assert outcome.exit_code == 0
        listed = json.loads(outcome.output)["snapshots"]
        assert expected_id in [entry["snapshot_id"] for entry in listed]

    def test_read_by_full_id_succeeds(self, tmp_path: pathlib.Path) -> None:
        full_id = self._fresh_snapshot_id(tmp_path, 1)
        outcome = runner.invoke(cli, ["snapshot", "read", full_id], env=_env(tmp_path))
        assert outcome.exit_code == 0

    def test_bare_hex_prefix_rejected(self, tmp_path: pathlib.Path) -> None:
        """A bare hex prefix (no ``sha256:`` type tag) is rejected at the CLI boundary."""
        full_id = self._fresh_snapshot_id(tmp_path, 1)
        bare = short_id(full_id, strip=True)
        outcome = runner.invoke(cli, ["snapshot", "read", bare], env=_env(tmp_path))
        assert outcome.exit_code != 0

    def test_read_by_sha256_prefix_succeeds(self, tmp_path: pathlib.Path) -> None:
        """A full ``sha256:...`` ID passed to ``snapshot read`` must resolve."""
        full_id = self._fresh_snapshot_id(tmp_path, 1)
        outcome = runner.invoke(
            cli, ["snapshot", "read", full_id, "--json"], env=_env(tmp_path)
        )
        assert outcome.exit_code == 0
        assert json.loads(outcome.output)["snapshot_id"] == full_id

    def test_snapshot_id_in_read_matches_create(self, tmp_path: pathlib.Path) -> None:
        created_id = self._fresh_snapshot_id(tmp_path, 2)
        outcome = runner.invoke(
            cli, ["snapshot", "read", created_id, "--json"], env=_env(tmp_path)
        )
        read_back = json.loads(outcome.output)
        assert read_back["snapshot_id"] == created_id
338
339
340 # ---------------------------------------------------------------------------
341 # Data integrity — create → export → verify content
342 # ---------------------------------------------------------------------------
343
344
class TestDataIntegrity:
    """File contents written to archives must match the original source files."""

    @staticmethod
    def _export(repo: pathlib.Path, *argv):
        """Run ``snapshot export <argv...>`` and require a zero exit code.

        Checking the exit code here means a failed export reports the CLI
        output instead of surfacing later as a missing archive or a JSON
        decode error.
        """
        result = runner.invoke(cli, ["snapshot", "export", *argv], env=_env(repo))
        assert result.exit_code == 0, result.output
        return result

    def test_tar_gz_content_matches_source(self, tmp_path: pathlib.Path) -> None:
        _init_repo(tmp_path)
        names = _create_files(tmp_path, 3)
        snap_id = _create_snapshot(tmp_path)["snapshot_id"]
        out = tmp_path / "integrity.tar.gz"
        self._export(tmp_path, snap_id, "--output", str(out))
        assert out.exists()
        with tarfile.open(out, "r:gz") as tar:
            members = {m.name: m for m in tar.getmembers()}
            for name in names:
                # Members may carry a directory prefix, so match on the suffix.
                match = [k for k in members if k.endswith(name)]
                assert match, f"{name} not found in archive"
                content = tar.extractfile(members[match[0]])
                assert content is not None
                extracted = content.read().decode("utf-8")
                expected = (tmp_path / name).read_text(encoding="utf-8")
                assert extracted == expected, f"content mismatch for {name}"

    def test_zip_content_matches_source(self, tmp_path: pathlib.Path) -> None:
        _init_repo(tmp_path)
        names = _create_files(tmp_path, 3)
        snap_id = _create_snapshot(tmp_path)["snapshot_id"]
        out = tmp_path / "integrity.zip"
        self._export(tmp_path, snap_id, "--format", "zip", "--output", str(out))
        assert out.exists()
        with zipfile.ZipFile(out, "r") as zf:
            namelist = zf.namelist()
            for name in names:
                match = [k for k in namelist if k.endswith(name)]
                assert match, f"{name} not found in zip"
                extracted = zf.read(match[0]).decode("utf-8")
                expected = (tmp_path / name).read_text(encoding="utf-8")
                assert extracted == expected, f"content mismatch for {name}"

    def test_export_file_count_matches_snapshot(self, tmp_path: pathlib.Path) -> None:
        _init_repo(tmp_path)
        _create_files(tmp_path, 4)
        created = _create_snapshot(tmp_path)
        out = tmp_path / "count.tar.gz"
        result = self._export(
            tmp_path, created["snapshot_id"], "--output", str(out), "--json"
        )
        data = json.loads(result.output)
        assert data["file_count"] == created["file_count"]

    def test_export_size_bytes_matches_disk(self, tmp_path: pathlib.Path) -> None:
        _init_repo(tmp_path)
        _create_files(tmp_path, 2)
        snap_id = _create_snapshot(tmp_path)["snapshot_id"]
        out = tmp_path / "size.tar.gz"
        result = self._export(tmp_path, snap_id, "--output", str(out), "--json")
        data = json.loads(result.output)
        # The reported size must equal what was actually written to disk.
        assert data["size_bytes"] == out.stat().st_size
420
421
422 # ---------------------------------------------------------------------------
423 # Security
424 # ---------------------------------------------------------------------------
425
426
class TestSecurity:
    """Security-oriented checks for the snapshot commands."""

    def test_ansi_escape_in_note_sanitized_in_text_output(self, tmp_path: pathlib.Path) -> None:
        """ANSI escape sequences in a note must not be echoed raw by ``create``."""
        _init_repo(tmp_path)
        _create_files(tmp_path, 1)
        hostile_note = "\x1b[31mred\x1b[0m"
        outcome = runner.invoke(
            cli, ["snapshot", "create", "-m", hostile_note], env=_env(tmp_path)
        )
        assert outcome.exit_code == 0
        # The ESC byte itself must be absent from the text output.
        assert "\x1b" not in outcome.output

    def test_note_appears_sanitized_in_list_text(self, tmp_path: pathlib.Path) -> None:
        _init_repo(tmp_path)
        _create_files(tmp_path, 1)
        hostile_note = "\x1b[1mBOLD\x1b[0m"
        _create_snapshot(tmp_path, note=hostile_note)
        outcome = runner.invoke(cli, ["snapshot", "list"], env=_env(tmp_path))
        assert outcome.exit_code == 0
        assert "\x1b" not in outcome.output

    def test_symlink_in_objects_dir_is_skipped(self, tmp_path: pathlib.Path) -> None:
        """A symlink inside ``.muse/objects/`` must not be listed as a snapshot."""
        from muse.core.paths import objects_dir

        _init_repo(tmp_path)
        _create_files(tmp_path, 1)
        genuine = _create_snapshot(tmp_path)
        store = objects_dir(tmp_path)

        # Decoy: a sharded object-store entry that is really a symlink to an
        # unrelated file.
        decoy_target = tmp_path / "some_file.txt"
        decoy_target.write_bytes(b"payload")
        shard = store / "sha256" / "de"
        shard.mkdir(parents=True, exist_ok=True)
        decoy = shard / ("ad" + "0" * 60)
        try:
            decoy.symlink_to(decoy_target)
        except (OSError, NotImplementedError):
            pytest.skip("symlinks not supported on this platform")

        outcome = runner.invoke(cli, ["snapshot", "list", "--json"], env=_env(tmp_path))
        assert outcome.exit_code == 0
        listed_ids = [
            entry["snapshot_id"] for entry in json.loads(outcome.output)["snapshots"]
        ]
        # Only the legitimately created snapshot should appear.
        assert len(listed_ids) == 1
        assert listed_ids[0] == genuine["snapshot_id"]

    def test_path_traversal_in_snapshot_id_prefix_is_safe(self, tmp_path: pathlib.Path) -> None:
        """A snapshot ID containing ``../`` must make ``read`` exit non-zero."""
        _init_repo(tmp_path)
        argv = ["snapshot", "read", "../../etc/passwd"]
        outcome = runner.invoke(cli, argv, env=_env(tmp_path))
        # Must fail gracefully — not crash, not read /etc/passwd.
        assert outcome.exit_code != 0

    def test_safe_arcname_rejects_dotdot_path(self, tmp_path: pathlib.Path) -> None:
        """``_safe_arcname`` returns ``None`` for paths with ``..`` segments."""
        from muse.cli.commands.snapshot_cmd import _safe_arcname

        for prefix, rel_path in (("", "../etc/passwd"), ("prefix", "../../secret")):
            assert _safe_arcname(prefix, rel_path) is None

    def test_safe_arcname_rejects_absolute_path(self, tmp_path: pathlib.Path) -> None:
        from muse.cli.commands.snapshot_cmd import _safe_arcname

        for prefix, rel_path in (("", "/etc/passwd"), ("prefix", "/root/.ssh/id_rsa")):
            assert _safe_arcname(prefix, rel_path) is None

    def test_safe_arcname_accepts_normal_path(self, tmp_path: pathlib.Path) -> None:
        from muse.cli.commands.snapshot_cmd import _safe_arcname

        cases = (
            ("", "src/main.py", "src/main.py"),
            ("myproject", "lib/util.py", "myproject/lib/util.py"),
        )
        for prefix, rel_path, expected in cases:
            assert _safe_arcname(prefix, rel_path) == expected

    def test_safe_arcname_rejects_dotdot_in_prefix(self) -> None:
        from muse.cli.commands.snapshot_cmd import _safe_arcname

        rejected = _safe_arcname("../escape", "file.txt")
        assert rejected is None
511
512
513 # ---------------------------------------------------------------------------
514 # Text mode — snapshot read --text
515 # ---------------------------------------------------------------------------
516
517
class TestTextMode:
    """Output of ``snapshot read`` when ``--json`` is not passed."""

    @staticmethod
    def _read_plain(repo: pathlib.Path, file_count: int, note: str = ""):
        """Create a snapshot in *repo* and read it back without ``--json``.

        Returns ``(snapshot_id, invoke_result)``.
        """
        _init_repo(repo)
        _create_files(repo, file_count)
        snap_id = _create_snapshot(repo, note=note)["snapshot_id"]
        outcome = runner.invoke(cli, ["snapshot", "read", snap_id], env=_env(repo))
        return snap_id, outcome

    def test_read_text_shows_snapshot_id(self, tmp_path: pathlib.Path) -> None:
        snap_id, outcome = self._read_plain(tmp_path, 2)
        assert outcome.exit_code == 0
        for needle in ("snapshot_id", snap_id):
            assert needle in outcome.output

    def test_read_text_shows_file_list(self, tmp_path: pathlib.Path) -> None:
        _, outcome = self._read_plain(tmp_path, 2)
        assert outcome.exit_code == 0
        # "files" contains "file", so a single substring check covers both.
        assert "file" in outcome.output.lower()

    def test_read_text_shows_note_when_set(self, tmp_path: pathlib.Path) -> None:
        _, outcome = self._read_plain(tmp_path, 1, note="my-label")
        assert outcome.exit_code == 0
        assert "my-label" in outcome.output

    def test_read_text_is_not_valid_json(self, tmp_path: pathlib.Path) -> None:
        """Output without ``--json`` must not parse as JSON."""
        _, outcome = self._read_plain(tmp_path, 1)
        assert outcome.exit_code == 0
        with pytest.raises((json.JSONDecodeError, ValueError)):
            json.loads(outcome.output)
565
566
567 # ---------------------------------------------------------------------------
568 # --prefix export
569 # ---------------------------------------------------------------------------
570
571
class TestPrefixExport:
    """``snapshot export --prefix`` controls where members sit inside the archive."""

    @staticmethod
    def _export_with_prefix(repo: pathlib.Path, out: pathlib.Path, prefix: str, *extra: str) -> None:
        """Snapshot two files in *repo* and export them to *out* under *prefix*.

        *extra* arguments (e.g. ``"--format", "zip"``) are inserted before
        ``--prefix``. The export must exit with code 0 and produce *out*;
        a failure reports the CLI output rather than a bare missing file.
        """
        _init_repo(repo)
        _create_files(repo, 2)
        snap_id = _create_snapshot(repo)["snapshot_id"]
        result = runner.invoke(
            cli,
            ["snapshot", "export", snap_id, *extra, "--prefix", prefix, "--output", str(out)],
            env=_env(repo),
        )
        assert result.exit_code == 0, result.output
        assert out.exists()

    def test_tar_gz_files_nested_under_prefix(self, tmp_path: pathlib.Path) -> None:
        out = tmp_path / "prefixed.tar.gz"
        self._export_with_prefix(tmp_path, out, "myproject")
        with tarfile.open(out, "r:gz") as tar:
            names = tar.getnames()
        # all() is True for an empty sequence, so rule out an empty archive first.
        assert names, "archive is empty"
        assert all(n.startswith("myproject/") for n in names), names

    def test_zip_files_nested_under_prefix(self, tmp_path: pathlib.Path) -> None:
        out = tmp_path / "prefixed.zip"
        self._export_with_prefix(tmp_path, out, "release", "--format", "zip")
        with zipfile.ZipFile(out, "r") as zf:
            names = zf.namelist()
        assert names, "archive is empty"
        assert all(n.startswith("release/") for n in names), names

    def test_empty_prefix_uses_flat_layout(self, tmp_path: pathlib.Path) -> None:
        out = tmp_path / "flat.tar.gz"
        self._export_with_prefix(tmp_path, out, "")
        with tarfile.open(out, "r:gz") as tar:
            names = tar.getnames()
        assert names, "archive is empty"
        # An empty prefix must not yield absolute member names.
        assert all(not n.startswith("/") for n in names), names
625
626
627 # ---------------------------------------------------------------------------
628 # Limit validation
629 # ---------------------------------------------------------------------------
630
631
class TestLimitValidation:
    """Validation and truncation behaviour of ``snapshot list --limit``."""

    @staticmethod
    def _create_distinct_snapshots(repo: pathlib.Path, count: int) -> list[str]:
        """Create *count* snapshots of differing trees and return their IDs.

        Snapshotting an unchanged tree yields the same snapshot ID, so one
        file is rewritten before every ``create``. Without that, only a single
        snapshot would exist and any ``--limit`` check would pass vacuously.
        """
        ids: list[str] = []
        for round_no in range(count):
            (repo / "file_0.txt").write_text(f"round-{round_no}", encoding="utf-8")
            ids.append(str(_create_snapshot(repo)["snapshot_id"]))
        # Guard the precondition the limit tests rely on.
        assert len(set(ids)) == count, ids
        return ids

    def test_limit_zero_rejected(self, tmp_path: pathlib.Path) -> None:
        _init_repo(tmp_path)
        result = runner.invoke(
            cli, ["snapshot", "list", "--limit", "0"], env=_env(tmp_path)
        )
        assert result.exit_code != 0

    def test_limit_one_returns_at_most_one(self, tmp_path: pathlib.Path) -> None:
        _init_repo(tmp_path)
        self._create_distinct_snapshots(tmp_path, 3)
        result = runner.invoke(
            cli, ["snapshot", "list", "--limit", "1", "--json"], env=_env(tmp_path)
        )
        assert result.exit_code == 0, result.output
        data = json.loads(result.output)
        # Three snapshots exist, so the limit must truncate to exactly one.
        assert len(data["snapshots"]) == 1

    def test_negative_limit_rejected(self, tmp_path: pathlib.Path) -> None:
        _init_repo(tmp_path)
        result = runner.invoke(
            cli, ["snapshot", "list", "--limit", "-1"], env=_env(tmp_path)
        )
        assert result.exit_code != 0

    def test_short_flag_n_respected(self, tmp_path: pathlib.Path) -> None:
        # NOTE(review): despite the test name, this passes ``--limit`` rather
        # than a short ``-n`` flag — confirm whether ``-n`` is still supported.
        _init_repo(tmp_path)
        self._create_distinct_snapshots(tmp_path, 4)
        result = runner.invoke(
            cli, ["snapshot", "list", "--limit", "2", "--json"], env=_env(tmp_path)
        )
        assert result.exit_code == 0, result.output
        data = json.loads(result.output)
        # Four snapshots exist, so the limit must truncate to exactly two.
        assert len(data["snapshots"]) == 2
670
671
672 # ---------------------------------------------------------------------------
673 # Idempotency — same tree → same snapshot_id
674 # ---------------------------------------------------------------------------
675
676
class TestIdempotency:
    """The snapshot ID is a function of the working-tree contents."""

    def test_same_files_same_snapshot_id(self, tmp_path: pathlib.Path) -> None:
        _init_repo(tmp_path)
        _create_files(tmp_path, 3)
        id_a = _create_snapshot(tmp_path)["snapshot_id"]
        id_b = _create_snapshot(tmp_path)["snapshot_id"]
        assert id_a == id_b

    def test_different_content_different_snapshot_id(self, tmp_path: pathlib.Path) -> None:
        _init_repo(tmp_path)
        _create_files(tmp_path, 2)
        before = _create_snapshot(tmp_path)
        # Change one tracked file between the two snapshots.
        edited = tmp_path / "file_0.txt"
        edited.write_text("changed-content", encoding="utf-8")
        after = _create_snapshot(tmp_path)
        assert before["snapshot_id"] != after["snapshot_id"]

    def test_list_shows_only_one_when_idempotent(self, tmp_path: pathlib.Path) -> None:
        """Snapshotting an unchanged tree twice yields one distinct ID in ``list``."""
        _init_repo(tmp_path)
        _create_files(tmp_path, 2)
        for _ in range(2):
            _create_snapshot(tmp_path)
        outcome = runner.invoke(cli, ["snapshot", "list", "--json"], env=_env(tmp_path))
        listed = json.loads(outcome.output)["snapshots"]
        # Collapse to distinct IDs before counting.
        distinct_ids = {entry["snapshot_id"] for entry in listed}
        assert len(distinct_ids) == 1
705
706
707 # ---------------------------------------------------------------------------
708 # List ordering — newest first
709 # ---------------------------------------------------------------------------
710
711
class TestListOrdering:
    """Ordering of the entries returned by ``snapshot list``."""

    def test_list_newest_first(self, tmp_path: pathlib.Path) -> None:
        """With several distinct snapshots, the most recent one is listed first."""
        _init_repo(tmp_path)
        created_ids: list[str] = []
        for round_no in range(3):
            # A new file each round makes every snapshot distinct.
            new_file = tmp_path / f"round_{round_no}.txt"
            new_file.write_text(f"v{round_no}", encoding="utf-8")
            created_ids.append(_create_snapshot(tmp_path)["snapshot_id"])
        outcome = runner.invoke(cli, ["snapshot", "list", "--json"], env=_env(tmp_path))
        listed = json.loads(outcome.output)["snapshots"]
        listed_ids = [entry["snapshot_id"] for entry in listed]
        # The last snapshot created must head the list.
        assert listed_ids[0] == created_ids[-1]
726
727
728 # ---------------------------------------------------------------------------
729 # Concurrent stress
730 # ---------------------------------------------------------------------------
731
732
class TestConcurrentStress:
    """Parallel ``snapshot create`` invocations against one repository."""

    def test_concurrent_creates_all_succeed(self, tmp_path: pathlib.Path) -> None:
        """N threads creating snapshots concurrently must all succeed."""
        _init_repo(tmp_path)
        _create_files(tmp_path, 5)
        n_threads = 8
        errors: list[str] = []
        results: list[dict] = []
        lock = threading.Lock()

        def _do_create() -> None:
            # An exception escaping a thread target never reaches the joining
            # thread, so capture it here and report it through ``errors``.
            try:
                result = runner.invoke(
                    cli, ["snapshot", "create", "--json"], env=_env(tmp_path)
                )
                payload = json.loads(result.output) if result.exit_code == 0 else None
            except Exception as exc:  # test boundary: surfaced via the assert below
                with lock:
                    errors.append(f"{type(exc).__name__}: {exc}")
                return
            with lock:
                if payload is None:
                    errors.append(result.output)
                else:
                    results.append(payload)

        threads = [threading.Thread(target=_do_create) for _ in range(n_threads)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

        assert not errors, f"Some creates failed: {errors}"
        assert len(results) == n_threads
        # All results have a valid snapshot_id.
        for r in results:
            assert r["snapshot_id"].startswith("sha256:")
            assert r["exit_code"] == 0
765
766
767 # ---------------------------------------------------------------------------
768 # Large file stress
769 # ---------------------------------------------------------------------------
770
771
class TestLargeFileExport:
    """Round-trip of a single large binary file through export."""

    def test_large_file_round_trips_correctly(self, tmp_path: pathlib.Path) -> None:
        """A 5 MiB file must survive create → export → extract unchanged."""
        _init_repo(tmp_path)
        blob = os.urandom(5 * 1024 * 1024)
        (tmp_path / "big.bin").write_bytes(blob)
        snap_id = _create_snapshot(tmp_path)["snapshot_id"]
        archive = tmp_path / "big.tar.gz"
        outcome = runner.invoke(
            cli,
            ["snapshot", "export", snap_id, "--output", str(archive), "--json"],
            env=_env(tmp_path),
        )
        assert outcome.exit_code == 0
        report = json.loads(outcome.output)
        assert report["file_count"] >= 1
        assert report["size_bytes"] > 0
        assert archive.exists()
        # The archive must be a readable tar file before we look inside it.
        assert tarfile.is_tarfile(str(archive))
        with tarfile.open(archive, "r:gz") as tar:
            hits = [
                member for member in tar.getmembers() if member.name.endswith("big.bin")
            ]
            assert hits, "big.bin not found in archive"
            stream = tar.extractfile(hits[0])
            assert stream is not None
            assert stream.read() == blob
799
800
801 # ---------------------------------------------------------------------------
802 # Export to default filename
803 # ---------------------------------------------------------------------------
804
805
class TestDefaultFilename:
    """Export behaviour when ``--output`` is omitted or the snapshot is unknown."""

    def test_export_default_filename_is_short_id_dot_format(self, tmp_path: pathlib.Path) -> None:
        """Without ``--output`` the export still produces a ``.tar.gz`` archive.

        NOTE(review): the test name promises ``<short_id>.<fmt>``, but only the
        ``.tar.gz`` suffix and the file's existence are verified here.
        """
        _init_repo(tmp_path)
        _create_files(tmp_path, 1)
        snap_id = _create_snapshot(tmp_path)["snapshot_id"]
        # Work inside tmp_path so that a relative default output lands there.
        previous_cwd = pathlib.Path.cwd()
        os.chdir(tmp_path)
        try:
            outcome = runner.invoke(
                cli, ["snapshot", "export", snap_id, "--json"], env=_env(tmp_path)
            )
        finally:
            os.chdir(previous_cwd)
        assert outcome.exit_code == 0
        reported = json.loads(outcome.output)["output"]
        assert reported.endswith(".tar.gz")
        # The reported path may be relative to tmp_path or usable as given.
        candidates = (tmp_path / reported, pathlib.Path(reported))
        assert any(path.exists() for path in candidates)

    def test_export_not_found_exits_nonzero(self, tmp_path: pathlib.Path) -> None:
        _init_repo(tmp_path)
        argv = ["snapshot", "export", "nonexistent"]
        outcome = runner.invoke(cli, argv, env=_env(tmp_path))
        assert outcome.exit_code != 0
832 assert result.exit_code != 0
File History 1 commit
sha256:2eaa5d95f9d9383498e76947410a26e5a3ba23d182f339910c424cf88fad412b fix: try fetch/presign before fetch/mpack to avoid Cloudfla… Sonnet 4.6 patch 6 days ago