"""Tests for ``muse archive`` — snapshot export command. Tiers ----- 1. Unit — ``_safe_arcname`` and ``_build_entries`` in isolation. 2. Integration — store round-trip: write commit/snapshot, build archive, verify contents. 3. End-to-End — full CLI invocations via CliRunner. 4. Security — zip-slip, tar-slip, null bytes, ``..`` traversal, unsafe prefixes. 5. Stress — large manifests, many files, names at path limits. 6. Performance — timing assertions on archive creation and list mode. 7. Data Integrity — archive contents match snapshot manifest exactly; JSON schema complete. """ from __future__ import annotations import datetime import json import pathlib import tarfile import time import zipfile import pytest from tests.cli_test_helper import CliRunner cli = None # argparse migration — CliRunner ignores this arg from muse.cli.commands.archive import ( _FORMAT_CHOICES, _build_entries, _build_tar, _build_zip, _safe_arcname, ) from muse.core.object_store import write_object from muse.core.ids import hash_commit, hash_snapshot from muse.core.commits import ( CommitRecord, write_commit, ) from muse.core.snapshots import ( SnapshotRecord, write_snapshot, ) from muse.core.types import blob_id, long_id, short_id, split_id, fake_id from muse.core.paths import heads_dir, muse_dir runner = CliRunner() # --------------------------------------------------------------------------- # Fixtures # --------------------------------------------------------------------------- @pytest.fixture def repo(tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> pathlib.Path: """Minimal Muse repo chdir'd into tmp_path.""" monkeypatch.chdir(tmp_path) dot_muse = muse_dir(tmp_path) dot_muse.mkdir() (dot_muse / "repo.json").write_text('{"repo_id":"test-repo"}') (dot_muse / "HEAD").write_text("ref: refs/heads/main") (dot_muse / "commits").mkdir() (dot_muse / "snapshots").mkdir() (dot_muse / "refs" / "heads").mkdir(parents=True) (dot_muse / "objects").mkdir() return tmp_path def _make_commit( 
root: pathlib.Path, files: dict[str, bytes], message: str = "test commit", ) -> CommitRecord: """Write objects, a snapshot, and a commit; update the branch ref. Args: root: Repository root. files: Mapping of relative path → raw file bytes. message: Commit message. Returns: The written ``CommitRecord``. """ manifest: dict[str, str] = {} for rel_path, content in files.items(): oid = blob_id(content) write_object(root, oid, content) manifest[rel_path] = oid snap_id = hash_snapshot(manifest) snap = SnapshotRecord( snapshot_id=snap_id, manifest=manifest, directories=[], created_at=datetime.datetime(2026, 3, 1, tzinfo=datetime.timezone.utc), note="", ) write_snapshot(root, snap) committed_at = datetime.datetime(2026, 3, 1, tzinfo=datetime.timezone.utc) cid = hash_commit( parent_ids=[], snapshot_id=snap_id, message=message, committed_at_iso=committed_at.isoformat(), author="test-author", ) record = CommitRecord( commit_id=cid, branch="main", snapshot_id=snap_id, message=message, committed_at=committed_at, author="test-author", agent_id="test-agent", model_id="test-model", ) write_commit(root, record) (heads_dir(root) / "main").write_text(cid) return record # =========================================================================== # 1. Unit tests — _safe_arcname and _build_entries # =========================================================================== class TestSafeArcname: def test_simple_path_no_prefix(self) -> None: assert _safe_arcname("", "src/main.py") == "src/main.py" def test_simple_path_with_prefix(self) -> None: assert _safe_arcname("myproject", "src/main.py") == "myproject/src/main.py" def test_prefix_trailing_slash_stripped(self) -> None: assert _safe_arcname("myproject/", "a.py") == "myproject/a.py" def test_empty_rel_path_returns_none(self) -> None: assert _safe_arcname("", "") is None def test_dot_rel_path_returns_none(self) -> None: # PurePosixPath("") → "." 
— should be rejected assert _safe_arcname("", ".") is None def test_absolute_rel_path_returns_none(self) -> None: assert _safe_arcname("", "/etc/passwd") is None def test_dotdot_in_rel_path_returns_none(self) -> None: assert _safe_arcname("", "../../etc/passwd") is None def test_dotdot_component_in_rel_path_returns_none(self) -> None: assert _safe_arcname("", "src/../../../etc/passwd") is None def test_dotdot_in_prefix_returns_none(self) -> None: assert _safe_arcname("../traversal", "a.py") is None def test_null_byte_in_rel_path_returns_none(self) -> None: assert _safe_arcname("", "a\x00b.py") is None def test_null_byte_in_prefix_returns_none(self) -> None: assert _safe_arcname("pre\x00fix", "a.py") is None def test_nested_path(self) -> None: assert _safe_arcname("", "a/b/c/d.txt") == "a/b/c/d.txt" def test_single_filename(self) -> None: assert _safe_arcname("", "README.md") == "README.md" def test_prefix_with_subdirs(self) -> None: assert _safe_arcname("proj/v2", "src/app.py") == "proj/v2/src/app.py" class TestBuildEntries: def test_returns_entries_for_valid_manifest(self, repo: pathlib.Path) -> None: c = _make_commit(repo, {"a.py": b"hello"}) from muse.core.commits import read_commit from muse.core.snapshots import read_snapshot commit = read_commit(repo, c.commit_id) assert commit is not None snap = read_snapshot(repo, commit.snapshot_id) assert snap is not None entries, skipped = _build_entries(repo, snap.manifest, "") assert len(entries) == 1 assert skipped == [] arcname, oid, path = entries[0] assert arcname == "a.py" assert path.exists() def test_skips_missing_objects(self, repo: pathlib.Path) -> None: # Fake a manifest entry pointing at a nonexistent object. 
fake_manifest = {"ghost.py": fake_id("ghost-obj")} entries, skipped = _build_entries(repo, fake_manifest, "") assert entries == [] assert len(skipped) == 1 assert "missing" in skipped[0] def test_entries_sorted_by_arcname(self, repo: pathlib.Path) -> None: c = _make_commit(repo, {"z.py": b"z", "a.py": b"a", "m.py": b"m"}) from muse.core.commits import read_commit from muse.core.snapshots import read_snapshot commit = read_commit(repo, c.commit_id) snap = read_snapshot(repo, commit.snapshot_id) entries, _ = _build_entries(repo, snap.manifest, "") names = [e[0] for e in entries] assert names == sorted(names) def test_prefix_applied_to_arcnames(self, repo: pathlib.Path) -> None: c = _make_commit(repo, {"src/app.py": b"app"}) from muse.core.commits import read_commit from muse.core.snapshots import read_snapshot commit = read_commit(repo, c.commit_id) snap = read_snapshot(repo, commit.snapshot_id) entries, _ = _build_entries(repo, snap.manifest, "myproject") assert entries[0][0] == "myproject/src/app.py" # =========================================================================== # 2. 
# Integration tests — store round-trip + archive contents
# ===========================================================================


class TestTarContents:
    """Archive written as ``tar.gz`` holds the committed files and nothing else."""

    def test_tar_contains_all_files(self, repo: pathlib.Path, tmp_path: pathlib.Path) -> None:
        _make_commit(repo, {"a.py": b"aaa", "b.py": b"bbb"})
        out = tmp_path / "out.tar.gz"
        runner.invoke(cli, ["archive", "--output", str(out)], catch_exceptions=False)
        with tarfile.open(out, "r:gz") as tar:
            names = tar.getnames()
        assert "a.py" in names
        assert "b.py" in names

    def test_tar_file_contents_match_source(self, repo: pathlib.Path, tmp_path: pathlib.Path) -> None:
        _make_commit(repo, {"hello.py": b"print('hello')"})
        out = tmp_path / "out.tar.gz"
        runner.invoke(cli, ["archive", "--output", str(out)], catch_exceptions=False)
        with tarfile.open(out, "r:gz") as tar:
            member = tar.getmember("hello.py")
            f = tar.extractfile(member)
            assert f is not None
            assert f.read() == b"print('hello')"

    def test_tar_prefix_wraps_files(self, repo: pathlib.Path, tmp_path: pathlib.Path) -> None:
        _make_commit(repo, {"a.py": b"a"})
        out = tmp_path / "out.tar.gz"
        runner.invoke(
            cli,
            ["archive", "--prefix", "proj", "--output", str(out)],
            catch_exceptions=False,
        )
        with tarfile.open(out, "r:gz") as tar:
            names = tar.getnames()
        assert "proj/a.py" in names
        assert "a.py" not in names

    def test_no_muse_metadata_in_tar(self, repo: pathlib.Path, tmp_path: pathlib.Path) -> None:
        _make_commit(repo, {"src/app.py": b"app"})
        out = tmp_path / "out.tar.gz"
        runner.invoke(cli, ["archive", "--output", str(out)], catch_exceptions=False)
        with tarfile.open(out, "r:gz") as tar:
            names = tar.getnames()
        assert not any(".muse" in n for n in names)


class TestZipContents:
    """Archive written as ``zip`` holds the committed files with exact bytes."""

    def test_zip_contains_all_files(self, repo: pathlib.Path, tmp_path: pathlib.Path) -> None:
        _make_commit(repo, {"x.py": b"x", "y.py": b"y"})
        out = tmp_path / "out.zip"
        runner.invoke(cli, ["archive", "--format", "zip", "--output", str(out)], catch_exceptions=False)
        with zipfile.ZipFile(out) as zf:
            names = zf.namelist()
        assert "x.py" in names
        assert "y.py" in names

    def test_zip_file_contents_match_source(self, repo: pathlib.Path, tmp_path: pathlib.Path) -> None:
        _make_commit(repo, {"data.txt": b"hello world"})
        out = tmp_path / "out.zip"
        runner.invoke(cli, ["archive", "--format", "zip", "--output", str(out)], catch_exceptions=False)
        with zipfile.ZipFile(out) as zf:
            assert zf.read("data.txt") == b"hello world"

    def test_zip_prefix_wraps_files(self, repo: pathlib.Path, tmp_path: pathlib.Path) -> None:
        _make_commit(repo, {"b.py": b"b"})
        out = tmp_path / "out.zip"
        runner.invoke(
            cli,
            ["archive", "--format", "zip", "--prefix", "release", "--output", str(out)],
            catch_exceptions=False,
        )
        with zipfile.ZipFile(out) as zf:
            names = zf.namelist()
        assert "release/b.py" in names
        assert "b.py" not in names


# ===========================================================================
# 3. End-to-End tests — full CLI
# ===========================================================================


class TestDefaultBehavior:
    """``muse archive`` with no flags: exit code, default filename, summary text."""

    def test_exits_0_with_commit(self, repo: pathlib.Path) -> None:
        _make_commit(repo, {"a.py": b"a"})
        result = runner.invoke(cli, ["archive"], catch_exceptions=False)
        assert result.exit_code == 0

    def test_default_filename_no_sha256_prefix(self, repo: pathlib.Path) -> None:
        c = _make_commit(repo, {"a.py": b"a"})
        runner.invoke(cli, ["archive"], catch_exceptions=False)
        _, hex_full = split_id(c.commit_id)
        assert pathlib.Path(f"{hex_full}.tar.gz").exists()

    def test_default_filename_has_no_colon(self, repo: pathlib.Path) -> None:
        _make_commit(repo, {"a.py": b"a"})
        runner.invoke(cli, ["archive"], catch_exceptions=False)
        created = list(pathlib.Path(".").glob("*.tar.gz"))
        assert created, "no tar.gz file created"
        assert ":" not in created[0].name

    def test_no_commits_exits_1(self, repo: pathlib.Path) -> None:
        result = runner.invoke(cli, ["archive"])
        assert result.exit_code != 0

    def test_output_includes_file_count(self, repo: pathlib.Path) -> None:
        _make_commit(repo, {"a.py": b"a", "b.py": b"b"})
        result = runner.invoke(cli, ["archive"], catch_exceptions=False)
        assert "2 file(s)" in result.output

    def test_output_includes_commit_short(self, repo: pathlib.Path) -> None:
        c = _make_commit(repo, {"a.py": b"a"})
        result = runner.invoke(cli, ["archive"], catch_exceptions=False)
        short = short_id(c.commit_id, strip=True)
        assert short in result.output


class TestFormatFlag:
    """``--format`` / ``-f`` selects the archive container."""

    def test_zip_format_flag(self, repo: pathlib.Path, tmp_path: pathlib.Path) -> None:
        _make_commit(repo, {"a.py": b"a"})
        out = tmp_path / "out.zip"
        result = runner.invoke(
            cli,
            ["archive", "--format", "zip", "--output", str(out)],
            catch_exceptions=False,
        )
        assert result.exit_code == 0
        assert zipfile.is_zipfile(out)

    def test_tgz_short_flag(self, repo: pathlib.Path, tmp_path: pathlib.Path) -> None:
        _make_commit(repo, {"a.py": b"a"})
        out = tmp_path / "out.tar.gz"
        result = runner.invoke(
            cli,
            ["archive", "-f", "tar.gz", "--output", str(out)],
            catch_exceptions=False,
        )
        assert result.exit_code == 0
        assert tarfile.is_tarfile(out)

    def test_invalid_format_exits_nonzero(self, repo: pathlib.Path) -> None:
        _make_commit(repo, {"a.py": b"a"})
        result = runner.invoke(cli, ["archive", "--format", "rar"])
        assert result.exit_code != 0


class TestRefFlag:
    """``--ref`` accepts a branch name or an abbreviated commit id."""

    def test_ref_to_branch(self, repo: pathlib.Path, tmp_path: pathlib.Path) -> None:
        _make_commit(repo, {"a.py": b"a"}, message="on main")
        out = tmp_path / "out.tar.gz"
        result = runner.invoke(
            cli,
            ["archive", "--ref", "main", "--output", str(out)],
            catch_exceptions=False,
        )
        assert result.exit_code == 0

    def test_ref_to_commit_id(self, repo: pathlib.Path, tmp_path: pathlib.Path) -> None:
        c = _make_commit(repo, {"a.py": b"a"})
        out = tmp_path / "out.tar.gz"
        # Use split_id rather than slicing off a hard-coded "sha256:" prefix,
        # matching how the default-filename test derives the bare hex digest.
        _, hex_full = split_id(c.commit_id)
        short = hex_full[:8]
        result = runner.invoke(
            cli,
            ["archive", "--ref", short, "--output", str(out)],
            catch_exceptions=False,
        )
        assert result.exit_code == 0

    def test_unknown_ref_exits_1(self, repo: pathlib.Path) -> None:
        _make_commit(repo, {"a.py": b"a"})
        result = runner.invoke(cli, ["archive", "--ref", "no-such-branch"])
        assert result.exit_code != 0


class TestOutputFlag:
    """``--output`` / ``-o`` controls where the archive is written."""

    def test_custom_output_path(self, repo: pathlib.Path, tmp_path: pathlib.Path) -> None:
        _make_commit(repo, {"a.py": b"a"})
        out = tmp_path / "release.tar.gz"
        runner.invoke(cli, ["archive", "--output", str(out)], catch_exceptions=False)
        assert out.exists()

    def test_output_short_flag(self, repo: pathlib.Path, tmp_path: pathlib.Path) -> None:
        _make_commit(repo, {"a.py": b"a"})
        out = tmp_path / "r.tar.gz"
        result = runner.invoke(
            cli, ["archive", "-o", str(out)], catch_exceptions=False
        )
        assert result.exit_code == 0
        assert out.exists()

    def test_missing_output_dir_exits_1(self, repo: pathlib.Path) -> None:
        _make_commit(repo, {"a.py": b"a"})
        # Target a directory that is guaranteed absent inside the sandboxed
        # temp repo instead of relying on an absolute path on the host.
        missing = repo / "nonexistent" / "dir" / "out.tar.gz"
        result = runner.invoke(cli, ["archive", "--output", str(missing)])
        assert result.exit_code != 0


class TestListMode:
    """``--list`` prints archive entries without writing an archive."""

    def test_list_exits_0(self, repo: pathlib.Path) -> None:
        _make_commit(repo, {"a.py": b"a"})
        result = runner.invoke(cli, ["archive", "--list"], catch_exceptions=False)
        assert result.exit_code == 0

    def test_list_does_not_create_file(self, repo: pathlib.Path) -> None:
        _make_commit(repo, {"a.py": b"a"})
        before = set(pathlib.Path(".").glob("*.tar.gz"))
        runner.invoke(cli, ["archive", "--list"], catch_exceptions=False)
        after = set(pathlib.Path(".").glob("*.tar.gz"))
        assert before == after

    def test_list_shows_file_paths(self, repo: pathlib.Path) -> None:
        _make_commit(repo, {"src/app.py": b"app", "README.md": b"readme"})
        result = runner.invoke(cli, ["archive", "--list"], catch_exceptions=False)
        assert "src/app.py" in result.output
        assert "README.md" in result.output

    def test_list_shows_file_count(self, repo: pathlib.Path) -> None:
        _make_commit(repo, {"a.py": b"a", "b.py": b"b", "c.py": b"c"})
        result = runner.invoke(cli, ["archive", "--list"], catch_exceptions=False)
        assert "3 file(s)" in result.output

    def test_list_with_prefix_shows_prefixed_paths(self, repo: pathlib.Path) -> None:
        _make_commit(repo, {"a.py": b"a"})
        result = runner.invoke(
            cli,
            ["archive", "--list", "--prefix", "proj"],
            catch_exceptions=False,
        )
        assert "proj/a.py" in result.output

    def test_list_json_schema(self, repo: pathlib.Path) -> None:
        _make_commit(repo, {"a.py": b"a"})
        result = runner.invoke(
            cli, ["archive", "--list", "--json"], catch_exceptions=False
        )
        data = json.loads(result.output)
        required = {
            "commit_id",
            "snapshot_id",
            "message",
            "branch",
            "author",
            "committed_at",
            "ref",
            "prefix",
            "file_count",
            "entries",
        }
        assert required <= data.keys()
        assert isinstance(data["entries"], list)
        assert data["entries"][0].keys() >= {"path", "object_id"}

    def test_list_json_entry_count_matches(self, repo: pathlib.Path) -> None:
        _make_commit(repo, {"a.py": b"a", "b.py": b"b"})
        result = runner.invoke(
            cli, ["archive", "--list", "--json"], catch_exceptions=False
        )
        data = json.loads(result.output)
        assert data["file_count"] == 2
        assert len(data["entries"]) == 2


class TestJsonOutput:
    """``--json`` emits a machine-readable summary of the written archive."""

    def test_json_exits_0(self, repo: pathlib.Path, tmp_path: pathlib.Path) -> None:
        _make_commit(repo, {"a.py": b"a"})
        out = tmp_path / "out.tar.gz"
        result = runner.invoke(
            cli,
            ["archive", "--json", "--output", str(out)],
            catch_exceptions=False,
        )
        assert result.exit_code == 0

    def test_json_is_valid(self, repo: pathlib.Path, tmp_path: pathlib.Path) -> None:
        _make_commit(repo, {"a.py": b"a"})
        out = tmp_path / "out.tar.gz"
        result = runner.invoke(
            cli,
            ["archive", "--json", "--output", str(out)],
            catch_exceptions=False,
        )
        data = json.loads(result.output)
        assert isinstance(data, dict)

    def test_json_has_all_keys(self, repo: pathlib.Path, tmp_path: pathlib.Path) -> None:
        _make_commit(repo, {"a.py": b"a"})
        out = tmp_path / "out.tar.gz"
        result = runner.invoke(
            cli,
            ["archive", "--json", "--output", str(out)],
            catch_exceptions=False,
        )
        data = json.loads(result.output)
        required = {
            "path",
            "format",
            "file_count",
            "bytes",
            "commit_id",
            "snapshot_id",
            "message",
            "branch",
            "author",
            "agent_id",
            "model_id",
            "committed_at",
            "ref",
            "prefix",
        }
        assert required <= data.keys()

    def test_json_file_count_correct(self, repo: pathlib.Path, tmp_path: pathlib.Path) -> None:
        _make_commit(repo, {"a.py": b"a", "b.py": b"b"})
        out = tmp_path / "out.tar.gz"
        result = runner.invoke(
            cli,
            ["archive", "--json", "--output", str(out)],
            catch_exceptions=False,
        )
        data = json.loads(result.output)
        assert data["file_count"] == 2

    def test_json_bytes_positive(self, repo: pathlib.Path, tmp_path: pathlib.Path) -> None:
        _make_commit(repo, {"a.py": b"some content here"})
        out = tmp_path / "out.tar.gz"
        result = runner.invoke(
            cli,
            ["archive", "--json", "--output", str(out)],
            catch_exceptions=False,
        )
        data = json.loads(result.output)
        assert data["bytes"] > 0

    def test_json_commit_id_matches(self, repo: pathlib.Path, tmp_path: pathlib.Path) -> None:
        c = _make_commit(repo, {"a.py": b"a"})
        out = tmp_path / "out.tar.gz"
        result = runner.invoke(
            cli,
            ["archive", "--json", "--output", str(out)],
            catch_exceptions=False,
        )
        data = json.loads(result.output)
        assert data["commit_id"] == c.commit_id

    def test_json_snapshot_id_present(self, repo: pathlib.Path, tmp_path: pathlib.Path) -> None:
        c = _make_commit(repo, {"a.py": b"a"})
        out = tmp_path / "out.tar.gz"
        result = runner.invoke(
            cli,
            ["archive", "--json", "--output", str(out)],
            catch_exceptions=False,
        )
        data = json.loads(result.output)
        assert data["snapshot_id"] == c.snapshot_id

    def test_json_agent_id_and_model_id(self, repo: pathlib.Path, tmp_path: pathlib.Path) -> None:
        _make_commit(repo, {"a.py": b"a"})
        out = tmp_path / "out.tar.gz"
        result = runner.invoke(
            cli,
            ["archive", "--json", "--output", str(out)],
            catch_exceptions=False,
        )
        data = json.loads(result.output)
        assert data["agent_id"] == "test-agent"
        assert data["model_id"] == "test-model"

    def test_json_ref_null_for_head(self, repo: pathlib.Path, tmp_path: pathlib.Path) -> None:
        _make_commit(repo, {"a.py": b"a"})
        out = tmp_path / "out.tar.gz"
        result = runner.invoke(
            cli,
            ["archive", "--json", "--output", str(out)],
            catch_exceptions=False,
        )
        data = json.loads(result.output)
        assert data["ref"] is None

    def test_json_ref_set_when_given(self, repo: pathlib.Path, tmp_path: pathlib.Path) -> None:
        _make_commit(repo, {"a.py": b"a"})
        out = tmp_path / "out.tar.gz"
        result = runner.invoke(
            cli,
            ["archive", "--json", "--ref", "main", "--output", str(out)],
            catch_exceptions=False,
        )
        data = json.loads(result.output)
        assert data["ref"] == "main"

    def test_json_prefix_field(self, repo: pathlib.Path, tmp_path: pathlib.Path) -> None:
        _make_commit(repo, {"a.py": b"a"})
        out = tmp_path / "out.tar.gz"
        result = runner.invoke(
            cli,
            ["archive", "--json", "--prefix", "myproj", "--output", str(out)],
            catch_exceptions=False,
        )
        data = json.loads(result.output)
        assert data["prefix"] == "myproj"

    def test_json_format_field(self, repo: pathlib.Path, tmp_path: pathlib.Path) -> None:
        _make_commit(repo, {"a.py": b"a"})
        out = tmp_path / "out.zip"
        result = runner.invoke(
            cli,
            ["archive", "--json", "--format", "zip", "--output", str(out)],
            catch_exceptions=False,
        )
        data = json.loads(result.output)
        assert data["format"] == "zip"


# ===========================================================================
# 4.
# Security tests
# ===========================================================================


def _assert_no_traversal(names: list[str]) -> None:
    """Assert that no archive member name is absolute or contains ``..``.

    Args:
        names: Member names as reported by the archive reader.
    """
    for name in names:
        assert not name.startswith("/")
        assert ".." not in name.split("/")


class TestSecurity:
    """Path-traversal, null-byte and unsafe-prefix handling."""

    def test_safe_arcname_blocks_traversal(self) -> None:
        assert _safe_arcname("", "../../etc/passwd") is None

    def test_safe_arcname_blocks_absolute(self) -> None:
        assert _safe_arcname("", "/etc/passwd") is None

    def test_safe_arcname_blocks_null_byte_path(self) -> None:
        assert _safe_arcname("", "a\x00b") is None

    def test_safe_arcname_blocks_null_byte_prefix(self) -> None:
        assert _safe_arcname("pre\x00fix", "a.py") is None

    def test_safe_arcname_blocks_dotdot_prefix(self) -> None:
        assert _safe_arcname("../../malicious", "a.py") is None

    def test_prefix_dotdot_rejected_by_cli(self, repo: pathlib.Path) -> None:
        _make_commit(repo, {"a.py": b"a"})
        result = runner.invoke(cli, ["archive", "--prefix", "../../etc"])
        assert result.exit_code != 0

    def test_prefix_dotdot_error_on_stderr(self, repo: pathlib.Path) -> None:
        _make_commit(repo, {"a.py": b"a"})
        result = runner.invoke(cli, ["archive", "--prefix", "../../etc"])
        assert "❌" in result.stderr

    def test_unknown_ref_does_not_glob(self, repo: pathlib.Path) -> None:
        """A glob metacharacter in --ref must not trigger directory scanning."""
        _make_commit(repo, {"a.py": b"a"})
        result = runner.invoke(cli, ["archive", "--ref", "../../*"])
        assert result.exit_code != 0

    def test_tar_archive_has_no_traversal_paths(self, repo: pathlib.Path, tmp_path: pathlib.Path) -> None:
        _make_commit(repo, {"safe/file.py": b"ok"})
        out = tmp_path / "out.tar.gz"
        runner.invoke(cli, ["archive", "--output", str(out)], catch_exceptions=False)
        with tarfile.open(out, "r:gz") as tar:
            _assert_no_traversal(tar.getnames())

    def test_zip_archive_has_no_traversal_paths(self, repo: pathlib.Path, tmp_path: pathlib.Path) -> None:
        _make_commit(repo, {"safe/file.py": b"ok"})
        out = tmp_path / "out.zip"
        runner.invoke(
            cli,
            ["archive", "--format", "zip", "--output", str(out)],
            catch_exceptions=False,
        )
        with zipfile.ZipFile(out) as zf:
            _assert_no_traversal(zf.namelist())


# ===========================================================================
# 5. Stress tests
# ===========================================================================


class TestStress:
    """Larger manifests, deep nesting and a large single file."""

    def test_100_file_manifest_tar(self, repo: pathlib.Path, tmp_path: pathlib.Path) -> None:
        files = {f"src/module_{i:03d}.py": f"# module {i}".encode() for i in range(100)}
        _make_commit(repo, files)
        out = tmp_path / "out.tar.gz"
        result = runner.invoke(cli, ["archive", "--output", str(out)], catch_exceptions=False)
        assert result.exit_code == 0
        with tarfile.open(out, "r:gz") as tar:
            assert len(tar.getnames()) == 100

    def test_100_file_manifest_zip(self, repo: pathlib.Path, tmp_path: pathlib.Path) -> None:
        files = {f"src/module_{i:03d}.py": f"# module {i}".encode() for i in range(100)}
        _make_commit(repo, files)
        out = tmp_path / "out.zip"
        result = runner.invoke(
            cli,
            ["archive", "--format", "zip", "--output", str(out)],
            catch_exceptions=False,
        )
        assert result.exit_code == 0
        with zipfile.ZipFile(out) as zf:
            assert len(zf.namelist()) == 100

    def test_list_mode_100_files(self, repo: pathlib.Path) -> None:
        files = {f"f_{i:03d}.txt": b"x" for i in range(100)}
        _make_commit(repo, files)
        result = runner.invoke(cli, ["archive", "--list", "--json"], catch_exceptions=False)
        data = json.loads(result.output)
        assert data["file_count"] == 100
        assert len(data["entries"]) == 100

    def test_deeply_nested_paths(self, repo: pathlib.Path, tmp_path: pathlib.Path) -> None:
        files = {"a/b/c/d/e/f/deep.py": b"deep"}
        _make_commit(repo, files)
        out = tmp_path / "out.tar.gz"
        runner.invoke(cli, ["archive", "--output", str(out)], catch_exceptions=False)
        with tarfile.open(out, "r:gz") as tar:
            assert "a/b/c/d/e/f/deep.py" in tar.getnames()

    def test_large_file_content(self, repo: pathlib.Path, tmp_path: pathlib.Path) -> None:
        big = b"x" * (1024 * 512)  # 512 KiB
        _make_commit(repo, {"big.bin": big})
        out = tmp_path / "out.tar.gz"
        result = runner.invoke(cli, ["archive", "--output", str(out)], catch_exceptions=False)
        assert result.exit_code == 0
        with tarfile.open(out, "r:gz") as tar:
            f = tar.extractfile(tar.getmember("big.bin"))
            assert f is not None
            assert f.read() == big


# ===========================================================================
# 6. Performance tests
# ===========================================================================


class TestPerformance:
    """Wall-clock budgets for archive creation and list mode.

    Elapsed time is measured with ``time.perf_counter``, the highest
    resolution clock available for short durations.
    """

    def test_single_file_archive_under_500ms(self, repo: pathlib.Path, tmp_path: pathlib.Path) -> None:
        _make_commit(repo, {"a.py": b"a"})
        out = tmp_path / "out.tar.gz"
        start = time.perf_counter()
        runner.invoke(cli, ["archive", "--output", str(out)], catch_exceptions=False)
        elapsed = time.perf_counter() - start
        assert elapsed < 0.5, f"single-file archive took {elapsed:.3f}s"

    def test_list_mode_under_300ms(self, repo: pathlib.Path) -> None:
        files = {f"f_{i}.py": b"x" for i in range(20)}
        _make_commit(repo, files)
        start = time.perf_counter()
        runner.invoke(cli, ["archive", "--list", "--json"], catch_exceptions=False)
        elapsed = time.perf_counter() - start
        assert elapsed < 0.3, f"list mode took {elapsed:.3f}s"

    def test_json_output_under_500ms(self, repo: pathlib.Path, tmp_path: pathlib.Path) -> None:
        files = {f"f_{i}.py": b"x" for i in range(10)}
        _make_commit(repo, files)
        out = tmp_path / "out.tar.gz"
        start = time.perf_counter()
        runner.invoke(cli, ["archive", "--json", "--output", str(out)], catch_exceptions=False)
        elapsed = time.perf_counter() - start
        assert elapsed < 0.5, f"json archive took {elapsed:.3f}s"


# ===========================================================================
# 7.
# Data Integrity tests
# ===========================================================================


class TestDataIntegrity:
    """Archive contents and JSON metadata agree with what was committed."""

    def test_archive_contains_exactly_manifest_files(
        self, repo: pathlib.Path, tmp_path: pathlib.Path
    ) -> None:
        """Every file in the snapshot manifest appears in the archive, no more."""
        tree = {"a.py": b"a", "b/c.py": b"bc", "d.txt": b"d"}
        _make_commit(repo, tree)
        archive_path = tmp_path / "out.tar.gz"
        argv = ["archive", "--output", str(archive_path)]
        runner.invoke(cli, argv, catch_exceptions=False)
        with tarfile.open(archive_path, "r:gz") as bundle:
            member_names = set(bundle.getnames())
        assert member_names == set(tree)

    def test_file_bytes_match_original(self, repo: pathlib.Path, tmp_path: pathlib.Path) -> None:
        payload = b"\x00\x01\x02binary\xff\xfe"
        _make_commit(repo, {"binary.bin": payload})
        archive_path = tmp_path / "out.tar.gz"
        argv = ["archive", "--output", str(archive_path)]
        runner.invoke(cli, argv, catch_exceptions=False)
        with tarfile.open(archive_path, "r:gz") as bundle:
            member = bundle.getmember("binary.bin")
            stream = bundle.extractfile(member)
            assert stream is not None
            assert stream.read() == payload

    def test_zip_bytes_match_original(self, repo: pathlib.Path, tmp_path: pathlib.Path) -> None:
        payload = b"exact content"
        _make_commit(repo, {"f.txt": payload})
        archive_path = tmp_path / "out.zip"
        argv = ["archive", "--format", "zip", "--output", str(archive_path)]
        runner.invoke(cli, argv, catch_exceptions=False)
        with zipfile.ZipFile(archive_path) as bundle:
            assert bundle.read("f.txt") == payload

    def test_list_entries_match_archive_entries(
        self, repo: pathlib.Path, tmp_path: pathlib.Path
    ) -> None:
        """Files listed by --list match files written to the archive."""
        _make_commit(repo, {"x.py": b"x", "y/z.py": b"yz"})

        # First pass: ask the CLI what it would write.
        listing = runner.invoke(
            cli, ["archive", "--list", "--json"], catch_exceptions=False
        )
        listed_paths = {
            entry["path"] for entry in json.loads(listing.output)["entries"]
        }

        # Second pass: write the archive and read back its member names.
        archive_path = tmp_path / "out.tar.gz"
        runner.invoke(
            cli, ["archive", "--output", str(archive_path)], catch_exceptions=False
        )
        with tarfile.open(archive_path, "r:gz") as bundle:
            archive_paths = set(bundle.getnames())

        assert listed_paths == archive_paths

    def test_list_entries_sorted(self, repo: pathlib.Path) -> None:
        _make_commit(repo, {"z.py": b"z", "a.py": b"a", "m.py": b"m"})
        outcome = runner.invoke(
            cli, ["archive", "--list", "--json"], catch_exceptions=False
        )
        report = json.loads(outcome.output)
        ordered = [entry["path"] for entry in report["entries"]]
        assert ordered == sorted(ordered)

    def test_committed_at_iso8601(self, repo: pathlib.Path, tmp_path: pathlib.Path) -> None:
        _make_commit(repo, {"a.py": b"a"})
        archive_path = tmp_path / "out.tar.gz"
        outcome = runner.invoke(
            cli,
            ["archive", "--json", "--output", str(archive_path)],
            catch_exceptions=False,
        )
        report = json.loads(outcome.output)
        # Must parse without error
        stamp = datetime.datetime.fromisoformat(report["committed_at"])
        assert stamp.tzinfo is not None

    def test_json_path_field_matches_written_file(
        self, repo: pathlib.Path, tmp_path: pathlib.Path
    ) -> None:
        _make_commit(repo, {"a.py": b"a"})
        archive_path = tmp_path / "exact-name.tar.gz"
        outcome = runner.invoke(
            cli,
            ["archive", "--json", "--output", str(archive_path)],
            catch_exceptions=False,
        )
        report = json.loads(outcome.output)
        assert pathlib.Path(report["path"]) == archive_path

    def test_json_bytes_matches_file_size(
        self, repo: pathlib.Path, tmp_path: pathlib.Path
    ) -> None:
        _make_commit(repo, {"a.py": b"content here"})
        archive_path = tmp_path / "out.tar.gz"
        outcome = runner.invoke(
            cli,
            ["archive", "--json", "--output", str(archive_path)],
            catch_exceptions=False,
        )
        report = json.loads(outcome.output)
        assert report["bytes"] == archive_path.stat().st_size

    def test_format_choices_complete(self) -> None:
        assert _FORMAT_CHOICES == {"tar.gz", "zip"}