test_cmd_archive_hardening.py
python
sha256:d11a87833d5fad6059b7662844bf5448a8911a17cce7a51811f71ad394f248eb
bump to v0.2.0rc13
Human
patch
6 days ago
| 1 | """Comprehensive hardening tests for ``muse archive``. |
| 2 | |
| 3 | Coverage dimensions: |
| 4 | |
| 5 | Unit |
| 6 | ~~~~ |
| 7 | - ``_safe_arcname`` empty rel_path rejected |
| 8 | - ``_safe_arcname`` null bytes in rel_path rejected |
| 9 | - ``_safe_arcname`` null bytes in prefix rejected |
| 10 | - ``_safe_arcname`` dot-only path rejected (".") |
| 11 | - ``_safe_arcname`` deeply nested safe path allowed |
| 12 | - ``_safe_arcname`` path with spaces allowed |
| 13 | - ``_safe_arcname`` unicode filenames allowed |
| 14 | - ``_ArchiveJson`` TypedDict has all expected fields |
| 15 | |
| 16 | Security |
| 17 | ~~~~~~~~ |
| 18 | - ``--json`` flag now works (not broken by format validation) |
| 19 | - All error messages route to stderr, not stdout |
| 20 | - Unknown --format rejected with nonzero exit (argparse choices= guard) |
| 21 | - --prefix with ``..`` rejected with nonzero exit |
| 22 | - Zip-slip in manifest (``../`` prefix) skipped in tar.gz |
| 23 | - Zip-slip in manifest (``../`` prefix) skipped in zip |
| 24 | - ANSI escape sequences in commit message sanitized in text output |
| 25 | - Null byte in manifest rel_path skipped silently |
| 26 | |
| 27 | JSON schema |
| 28 | ~~~~~~~~~~~ |
| 29 | - ``--json`` on tar.gz produces valid ``_ArchiveJson`` schema |
| 30 | - ``--json`` on zip produces valid ``_ArchiveJson`` schema |
| 31 | - ``--json`` includes correct ``file_count`` and ``bytes`` |
| 32 | - ``--json`` includes ``commit_id`` (full SHA-256) |
| 33 | - ``--json`` includes ``message`` and ``branch`` |
| 34 | - ``--json`` includes ``ref`` as null when HEAD used |
| 35 | - ``--json`` includes ``ref`` as string when --ref used |
| 36 | - ``--json`` on empty snapshot reports file_count=0 |
| 37 | |
| 38 | Integration |
| 39 | ~~~~~~~~~~~ |
| 40 | - ``--ref`` with short SHA resolves correctly |
| 41 | - ``--ref`` with branch name resolves correctly |
| 42 | - ``--ref`` with unknown ref exits nonzero and writes to stderr |
| 43 | - Default output path is ``<sha12>.tar.gz`` |
| 44 | - Custom output path honoured |
| 45 | - Missing object in manifest skipped gracefully |
| 46 | - Archive file content matches committed bytes (round-trip) |
| 47 | - Zip archive entries are readable |
| 48 | - Tar.gz archive entries are readable |
| 49 | |
| 50 | E2E |
| 51 | ~~~ |
| 52 | - Full lifecycle: init → commit files → archive → verify contents |
| 53 | - ``--prefix`` adds directory level inside both tar.gz and zip |
| 54 | - Repeated archive calls produce archives with identical member listings (gzip bytes may differ) |
| 55 | - No ``.muse/`` metadata appears in any archive entry |
| 56 | |
| 57 | Stress |
| 58 | ~~~~~~ |
| 59 | - 200-file archive completes without error |
| 60 | - Concurrent archive calls on different repos are safe |
| 61 | """ |
| 62 | |
| 63 | from __future__ import annotations |
| 64 | |
| 65 | type _FileStore = dict[str, bytes] |
| 66 | |
| 67 | import json |
| 68 | import pathlib |
| 69 | import tarfile |
| 70 | import threading |
| 71 | import zipfile |
| 72 | from typing import TypedDict |
| 73 | |
| 74 | import pytest |
| 75 | from tests.cli_test_helper import CliRunner, InvokeResult |
| 76 | from muse.core.types import blob_id, long_id, short_id, fake_id |
| 77 | from muse.core.paths import heads_dir, muse_dir |
| 78 | |
# Placeholder passed as the first argument to ``runner.invoke``; presumably the
# project's CliRunner resolves the real ``muse`` entry point itself — TODO confirm.
cli = None
# Single runner instance shared by every test in this module.
runner = CliRunner()
| 81 | |
| 82 | |
| 83 | # --------------------------------------------------------------------------- |
| 84 | # Helpers |
| 85 | # --------------------------------------------------------------------------- |
| 86 | |
| 87 | |
def _env(root: pathlib.Path) -> dict[str, str]:
    """Return the environment overrides that point the CLI at *root*.

    Args:
        root: Repository root directory for the invocation.

    Returns:
        A one-entry mapping that sets ``MUSE_REPO_ROOT`` to ``str(root)``.
    """
    return {"MUSE_REPO_ROOT": str(root)}
| 90 | |
| 91 | |
def _make_repo(tmp_path: pathlib.Path) -> pathlib.Path:
    """Create a minimal repository skeleton under *tmp_path*.

    Creates the ``objects``, ``commits``, ``snapshots`` and ``refs/heads``
    directories inside the muse metadata directory, writes ``repo.json`` and
    points ``HEAD`` at ``refs/heads/main``. No commit is created.

    Args:
        tmp_path: Directory that becomes the repository root.

    Returns:
        *tmp_path* itself, for call chaining.
    """
    dot_muse = muse_dir(tmp_path)
    for sub in ("objects", "commits", "snapshots", "refs/heads"):
        (dot_muse / sub).mkdir(parents=True, exist_ok=True)

    repo_meta = {
        "repo_id": fake_id("repo"),
        "domain": "code",
        "default_branch": "main",
        "created_at": "2026-01-01T00:00:00+00:00",
    }
    (dot_muse / "repo.json").write_text(json.dumps(repo_meta), encoding="utf-8")
    (dot_muse / "HEAD").write_text("ref: refs/heads/main\n", encoding="utf-8")
    return tmp_path
| 105 | |
| 106 | |
def _write_object(root: pathlib.Path, content: bytes) -> str:
    """Store *content* in the object store of *root* and return its object ID.

    The ID is computed from the content with ``blob_id`` before the write.
    """
    from muse.core.object_store import write_object

    object_id = blob_id(content)
    write_object(root, object_id, content)
    return object_id
| 112 | |
| 113 | |
def _make_commit(
    root: pathlib.Path,
    files: _FileStore | None = None,
    message: str = "test commit",
) -> str:
    """Write a commit on ``main`` containing *files* and return its commit ID.

    Each file's content is written to the object store, a snapshot record and
    a commit record are written, and the ``main`` head ref is updated to the
    new commit. If the ``main`` ref already exists, its current value becomes
    the parent of the new commit.

    Args:
        root: Repository root (as returned by ``_make_repo``).
        files: Mapping of repository-relative path to content. ``None`` and
            an empty mapping both produce an empty manifest.
        message: Commit message.

    Returns:
        The ID of the newly written commit.
    """
    import datetime as dt
    from muse.core.ids import hash_commit, hash_snapshot
    from muse.core.commits import (
        CommitRecord,
        write_commit,
    )
    from muse.core.snapshots import (
        SnapshotRecord,
        write_snapshot,
    )

    ref_file = heads_dir(root) / "main"
    # Read with the same encoding the ref is written with below.
    parent_id = (
        ref_file.read_text(encoding="utf-8").strip() if ref_file.exists() else None
    )

    # rel_path -> object ID; every blob is written to the store as a side effect.
    manifest: dict[str, str] = {
        rel_path: _write_object(root, content)
        for rel_path, content in (files or {}).items()
    }

    snap_id = hash_snapshot(manifest)
    # Fixed timestamp: the commit time fed into the hash does not depend on
    # the wall clock.
    committed_at = dt.datetime(2026, 1, 1, tzinfo=dt.timezone.utc)
    commit_id = hash_commit(
        parent_ids=[parent_id] if parent_id else [],
        snapshot_id=snap_id,
        message=message,
        committed_at_iso=committed_at.isoformat(),
    )
    write_snapshot(root, SnapshotRecord(snapshot_id=snap_id, manifest=manifest))
    write_commit(root, CommitRecord(
        commit_id=commit_id,
        branch="main",
        snapshot_id=snap_id,
        message=message,
        committed_at=committed_at,
        parent_commit_id=parent_id,
    ))
    ref_file.parent.mkdir(parents=True, exist_ok=True)
    ref_file.write_text(commit_id, encoding="utf-8")
    return commit_id
| 156 | |
| 157 | |
def _invoke(root: pathlib.Path, *args: str, cwd: pathlib.Path | None = None) -> InvokeResult:
    """Run the ``archive`` subcommand with *args* against the repo at *root*.

    The working directory is *cwd* when given, otherwise *root*. Exceptions
    are not caught by the runner (``catch_exceptions=False``).
    """
    argv = ["archive", *args]
    workdir = cwd or root
    return runner.invoke(cli, argv, env=_env(root), cwd=workdir, catch_exceptions=False)
| 160 | |
| 161 | |
| 162 | class _ArchiveJson(TypedDict): |
| 163 | path: str |
| 164 | format: str |
| 165 | file_count: int |
| 166 | bytes: int |
| 167 | commit_id: str |
| 168 | message: str |
| 169 | branch: str |
| 170 | ref: str | None |
| 171 | |
| 172 | |
def _parse_json(output: str) -> _ArchiveJson:
    """Extract the first JSON object line from *output* as an ``_ArchiveJson``.

    Lines are scanned in order; the first one that starts with ``{`` (after
    stripping whitespace) is parsed and its fields are coerced to the declared
    types.

    Args:
        output: Captured command output, possibly with non-JSON lines.

    Returns:
        The parsed payload.

    Raises:
        AssertionError: If no line starts with ``{``.
        json.JSONDecodeError: If the candidate line is not valid JSON.
        KeyError: If an expected field is missing from the object.
    """
    for line in output.splitlines():
        candidate = line.strip()
        if not candidate.startswith("{"):
            continue
        raw = json.loads(candidate)
        ref_value = raw["ref"]
        return _ArchiveJson(
            path=str(raw["path"]),
            format=str(raw["format"]),
            file_count=int(raw["file_count"]),
            bytes=int(raw["bytes"]),
            commit_id=str(raw["commit_id"]),
            message=str(raw["message"]),
            branch=str(raw["branch"]),
            # Keep null as None; coerce anything else like the other fields.
            ref=None if ref_value is None else str(ref_value),
        )
    raise AssertionError(f"No JSON object found in output:\n{output}")
| 189 | |
| 190 | |
| 191 | # --------------------------------------------------------------------------- |
| 192 | # Unit — _safe_arcname edge cases |
| 193 | # --------------------------------------------------------------------------- |
| 194 | |
| 195 | |
class TestSafeArcname:
    """Edge cases for ``_safe_arcname(prefix, rel_path)``."""

    def test_empty_rel_path_rejected(self) -> None:
        """An empty relative path is rejected with or without a prefix."""
        from muse.cli.commands.archive import _safe_arcname

        for prefix in ("", "prefix"):
            assert _safe_arcname(prefix, "") is None

    def test_null_byte_in_rel_path_rejected(self) -> None:
        """A NUL byte inside the relative path is rejected."""
        from muse.cli.commands.archive import _safe_arcname

        arcname = _safe_arcname("", "file\x00.txt")
        assert arcname is None

    def test_null_byte_in_prefix_rejected(self) -> None:
        """A NUL byte inside the prefix is rejected."""
        from muse.cli.commands.archive import _safe_arcname

        arcname = _safe_arcname("pre\x00fix", "file.txt")
        assert arcname is None

    def test_dot_only_path_rejected(self) -> None:
        """A relative path of just ``.`` is rejected."""
        from muse.cli.commands.archive import _safe_arcname

        # PurePosixPath("") normalises to "." — must be rejected
        arcname = _safe_arcname("", ".")
        assert arcname is None

    def test_deeply_nested_safe_path(self) -> None:
        """A deep but harmless path is returned unchanged."""
        from muse.cli.commands.archive import _safe_arcname

        nested = "a/b/c/d/e/file.txt"
        assert _safe_arcname("", nested) == "a/b/c/d/e/file.txt"

    def test_path_with_spaces(self) -> None:
        """Spaces in a file name are allowed."""
        from muse.cli.commands.archive import _safe_arcname

        spaced = "my file.mid"
        assert _safe_arcname("", spaced) == "my file.mid"

    def test_unicode_filename(self) -> None:
        """Non-ASCII path components are allowed."""
        from muse.cli.commands.archive import _safe_arcname

        unicode_path = "音楽/track.mid"
        assert _safe_arcname("", unicode_path) == "音楽/track.mid"

    def test_prefix_with_subdirs(self) -> None:
        """A multi-component prefix is joined in front of the path."""
        from muse.cli.commands.archive import _safe_arcname

        arcname = _safe_arcname("release/v1.0", "file.txt")
        assert arcname == "release/v1.0/file.txt"
| 230 | |
| 231 | |
| 232 | # --------------------------------------------------------------------------- |
| 233 | # Security |
| 234 | # --------------------------------------------------------------------------- |
| 235 | |
| 236 | |
class TestSecurity:
    """Security checks: failure exit codes, path traversal, output sanitising."""

    def test_json_flag_now_works(self, tmp_path: pathlib.Path) -> None:
        """--json must NOT exit with an error (it was broken before the fix)."""
        root = _make_repo(tmp_path)
        _make_commit(root, files={"song.mid": b"MIDI"})
        archive_path = tmp_path / "out.tar.gz"
        result = _invoke(root, "--output", str(archive_path), "--json")
        assert result.exit_code == 0, f"--json flag is still broken: {result.output}"

    def test_error_unknown_format_to_stderr(self, tmp_path: pathlib.Path) -> None:
        """Unknown --format must exit nonzero (argparse choices= rejects it)."""
        root = _make_repo(tmp_path)
        _make_commit(root)
        argv = ["archive", "--format", "rar"]
        result = runner.invoke(cli, argv, env=_env(root))
        assert result.exit_code != 0

    def test_error_prefix_traversal_to_stderr(self, tmp_path: pathlib.Path) -> None:
        """A ``--prefix`` containing ``..`` must exit nonzero."""
        root = _make_repo(tmp_path)
        _make_commit(root, files={"song.mid": b"data"})
        argv = ["archive", "--prefix", "../traversal/"]
        result = runner.invoke(cli, argv, env=_env(root))
        # Only the nonzero exit code is checked here; which stream the error
        # message lands on is not inspected by this test.
        assert result.exit_code != 0

    def test_error_no_commits_nonzero(self, tmp_path: pathlib.Path) -> None:
        """Archiving a repository without commits must exit nonzero."""
        root = _make_repo(tmp_path)
        result = runner.invoke(cli, ["archive"], env=_env(root))
        assert result.exit_code != 0

    def test_error_bad_ref_nonzero(self, tmp_path: pathlib.Path) -> None:
        """An unresolvable ``--ref`` must exit nonzero."""
        root = _make_repo(tmp_path)
        _make_commit(root)
        argv = ["archive", "--ref", "nonexistent-branch-xyz"]
        result = runner.invoke(cli, argv, env=_env(root))
        assert result.exit_code != 0

    def test_zip_slip_in_tar_manifest_skipped(self, tmp_path: pathlib.Path) -> None:
        """A ``../`` manifest path is dropped from a tar.gz; the safe entry stays."""
        from muse.cli.commands.archive import _build_entries, _build_tar

        root = _make_repo(tmp_path)
        evil_id = _write_object(root, b"malicious content")
        good_id = _write_object(root, b"safe content")
        archive_path = tmp_path / "test.tar.gz"
        manifest = {"../../../etc/cron.d/malicious": evil_id, "safe.txt": good_id}
        entries, _ = _build_entries(root, manifest, prefix="")
        written = _build_tar(entries, archive_path)
        assert written == 1
        with tarfile.open(archive_path, "r:gz") as tf:
            members = tf.getnames()
        assert all("etc" not in name and "cron" not in name for name in members)
        assert "safe.txt" in members

    def test_zip_slip_in_zip_manifest_skipped(self, tmp_path: pathlib.Path) -> None:
        """A ``../`` manifest path is dropped from a zip; the safe entry stays."""
        from muse.cli.commands.archive import _build_entries, _build_zip

        root = _make_repo(tmp_path)
        evil_id = _write_object(root, b"malicious")
        good_id = _write_object(root, b"safe")
        archive_path = tmp_path / "test.zip"
        manifest = {"../../../etc/malicious": evil_id, "safe.txt": good_id}
        entries, _ = _build_entries(root, manifest, prefix="")
        written = _build_zip(entries, archive_path)
        assert written == 1
        with zipfile.ZipFile(archive_path, "r") as zf:
            members = zf.namelist()
        assert all("etc" not in name for name in members)
        assert "safe.txt" in members

    def test_null_byte_in_manifest_path_skipped(self, tmp_path: pathlib.Path) -> None:
        """A manifest path containing NUL is skipped; only the safe entry is written."""
        from muse.cli.commands.archive import _build_entries, _build_tar

        root = _make_repo(tmp_path)
        nul_id = _write_object(root, b"null content")
        good_id = _write_object(root, b"safe content")
        archive_path = tmp_path / "null.tar.gz"
        manifest = {"file\x00.txt": nul_id, "safe.txt": good_id}
        entries, _ = _build_entries(root, manifest, prefix="")
        written = _build_tar(entries, archive_path)
        assert written == 1

    def test_ansi_in_commit_message_sanitized(self, tmp_path: pathlib.Path) -> None:
        """ESC bytes from the commit message must not reach the text output."""
        root = _make_repo(tmp_path)
        _make_commit(root, files={"f.mid": b"data"}, message="\x1b[31mred\x1b[0m")
        archive_path = tmp_path / "ansi.tar.gz"
        result = _invoke(root, "--output", str(archive_path))
        assert result.exit_code == 0
        assert "\x1b" not in result.output

    def test_no_muse_dir_in_archive(self, tmp_path: pathlib.Path) -> None:
        """The .muse/ directory must never appear in any archive entry."""
        root = _make_repo(tmp_path)
        _make_commit(root, files={"song.mid": b"MIDI"})
        archive_path = tmp_path / "clean.tar.gz"
        _invoke(root, "--output", str(archive_path))
        with tarfile.open(archive_path, "r:gz") as tf:
            members = tf.getnames()
        assert all(".muse" not in name for name in members)
| 330 | |
| 331 | |
| 332 | # --------------------------------------------------------------------------- |
| 333 | # JSON schema |
| 334 | # --------------------------------------------------------------------------- |
| 335 | |
| 336 | |
class TestJsonSchema:
    """Shape and values of the payload printed by ``--json``."""

    def test_json_tar_gz_schema(self, tmp_path: pathlib.Path) -> None:
        """A tar.gz run reports each field with the expected value."""
        root = _make_repo(tmp_path)
        commit_id = _make_commit(root, files={"a.mid": b"data", "b.mid": b"more"})
        archive_path = tmp_path / "archive.tar.gz"
        result = _invoke(root, "--output", str(archive_path), "--json")
        assert result.exit_code == 0
        payload = _parse_json(result.output)
        assert payload["format"] == "tar.gz"
        assert payload["file_count"] == 2
        assert payload["bytes"] > 0
        assert payload["commit_id"] == commit_id
        assert payload["branch"] == "main"
        assert payload["ref"] is None
        assert payload["path"] == str(archive_path)

    def test_json_zip_schema(self, tmp_path: pathlib.Path) -> None:
        """A zip run reports ``format``, ``file_count`` and ``commit_id``."""
        root = _make_repo(tmp_path)
        commit_id = _make_commit(root, files={"track.mid": b"MIDI"})
        archive_path = tmp_path / "archive.zip"
        result = _invoke(
            root, "--format", "zip", "--output", str(archive_path), "--json"
        )
        assert result.exit_code == 0
        payload = _parse_json(result.output)
        assert payload["format"] == "zip"
        assert payload["file_count"] == 1
        assert payload["commit_id"] == commit_id

    def test_json_ref_field_when_head(self, tmp_path: pathlib.Path) -> None:
        """``ref`` is null when no ``--ref`` is passed."""
        root = _make_repo(tmp_path)
        _make_commit(root, files={"f.mid": b"x"})
        archive_path = tmp_path / "a.tar.gz"
        result = _invoke(root, "--output", str(archive_path), "--json")
        payload = _parse_json(result.output)
        assert payload["ref"] is None

    def test_json_ref_field_when_explicit_ref(self, tmp_path: pathlib.Path) -> None:
        """``ref`` echoes the value given to ``--ref``."""
        root = _make_repo(tmp_path)
        commit_id = _make_commit(root, files={"f.mid": b"x"})
        abbreviated = short_id(commit_id)
        archive_path = tmp_path / "a.tar.gz"
        result = _invoke(
            root, "--ref", abbreviated, "--output", str(archive_path), "--json"
        )
        payload = _parse_json(result.output)
        assert payload["ref"] == abbreviated

    def test_json_empty_snapshot(self, tmp_path: pathlib.Path) -> None:
        """A commit with no files reports ``file_count`` of zero."""
        root = _make_repo(tmp_path)
        _make_commit(root, files={})
        archive_path = tmp_path / "empty.tar.gz"
        result = _invoke(root, "--output", str(archive_path), "--json")
        payload = _parse_json(result.output)
        assert payload["file_count"] == 0

    def test_json_message_field(self, tmp_path: pathlib.Path) -> None:
        """``message`` carries the commit message."""
        root = _make_repo(tmp_path)
        _make_commit(root, files={"f.mid": b"x"}, message="release v2.0")
        archive_path = tmp_path / "a.tar.gz"
        result = _invoke(root, "--output", str(archive_path), "--json")
        payload = _parse_json(result.output)
        assert payload["message"] == "release v2.0"
| 396 | |
| 397 | |
| 398 | # --------------------------------------------------------------------------- |
| 399 | # Integration |
| 400 | # --------------------------------------------------------------------------- |
| 401 | |
| 402 | |
class TestIntegration:
    """CLI-level behaviour: ref resolution, output paths, content round-trips."""

    def test_default_output_path_is_sha12_dot_format(self, tmp_path: pathlib.Path) -> None:
        """Without ``--output`` the reported path contains the short ID and ``.tar.gz``."""
        root = _make_repo(tmp_path)
        commit_id = _make_commit(root, files={"f.mid": b"data"})
        result = _invoke(root)
        assert result.exit_code == 0
        # Filename uses bare hex (colons illegal on Windows).
        bare_short = short_id(commit_id, strip=True)
        for fragment in (bare_short, ".tar.gz"):
            assert fragment in result.output

    def test_ref_with_short_sha(self, tmp_path: pathlib.Path) -> None:
        """``--ref`` with a commit ID resolves and writes the archive.

        NOTE(review): despite the test name, the ref passed here is the full
        commit ID, not an abbreviated one.
        """
        root = _make_repo(tmp_path)
        commit_id = _make_commit(root, files={"a.mid": b"MIDI"})
        archive_path = tmp_path / "ref.tar.gz"
        # Use the full commit_id as ref (canonical sha256: prefixed form).
        result = _invoke(root, "--ref", commit_id, "--output", str(archive_path))
        assert result.exit_code == 0
        assert archive_path.exists()

    def test_missing_object_skipped_gracefully(self, tmp_path: pathlib.Path) -> None:
        """If an object file is missing from the store, that entry is skipped — not a crash."""
        from muse.cli.commands.archive import _build_entries, _build_tar

        root = _make_repo(tmp_path)
        # Write one good object, one phantom.
        present_id = _write_object(root, b"good content")
        absent_id = long_id("a" * 64)  # valid format but not written to store
        archive_path = tmp_path / "partial.tar.gz"
        manifest = {"good.txt": present_id, "missing.txt": absent_id}
        entries, _ = _build_entries(root, manifest, prefix="")
        written = _build_tar(entries, archive_path)
        assert written == 1
        with tarfile.open(archive_path, "r:gz") as tf:
            members = tf.getnames()
        assert "good.txt" in members
        assert "missing.txt" not in members

    def test_archive_bytes_match_committed_content(self, tmp_path: pathlib.Path) -> None:
        """Content extracted from the archive must match what was committed."""
        root = _make_repo(tmp_path)
        payload = b"exact bytes for round-trip verification"
        _make_commit(root, files={"track.mid": payload})
        archive_path = tmp_path / "roundtrip.tar.gz"
        _invoke(root, "--output", str(archive_path))
        with tarfile.open(archive_path, "r:gz") as tf:
            first_member = tf.getmembers()[0]
            stream = tf.extractfile(first_member)
            assert stream is not None
            assert stream.read() == payload

    def test_zip_content_round_trip(self, tmp_path: pathlib.Path) -> None:
        """A zip archive holds exactly one entry with the committed bytes."""
        root = _make_repo(tmp_path)
        payload = b"zip round trip bytes"
        _make_commit(root, files={"data.mid": payload})
        archive_path = tmp_path / "rt.zip"
        _invoke(root, "--format", "zip", "--output", str(archive_path))
        with zipfile.ZipFile(archive_path, "r") as zf:
            members = zf.namelist()
            assert len(members) == 1
            restored = zf.read(members[0])
        assert restored == payload

    def test_prefix_appears_in_tar_gz(self, tmp_path: pathlib.Path) -> None:
        """Every tar.gz member name starts with the ``--prefix`` directory."""
        root = _make_repo(tmp_path)
        _make_commit(root, files={"song.mid": b"MIDI"})
        archive_path = tmp_path / "prefixed.tar.gz"
        _invoke(root, "--output", str(archive_path), "--prefix", "band-v1.0")
        with tarfile.open(archive_path, "r:gz") as tf:
            members = tf.getnames()
        assert all(name.startswith("band-v1.0/") for name in members)

    def test_prefix_appears_in_zip(self, tmp_path: pathlib.Path) -> None:
        """Every zip member name starts with the ``--prefix`` directory."""
        root = _make_repo(tmp_path)
        _make_commit(root, files={"song.mid": b"MIDI"})
        archive_path = tmp_path / "prefixed.zip"
        _invoke(
            root, "--format", "zip", "--output", str(archive_path),
            "--prefix", "band-v2.0",
        )
        with zipfile.ZipFile(archive_path, "r") as zf:
            members = zf.namelist()
        assert all(name.startswith("band-v2.0/") for name in members)
| 482 | |
| 483 | |
| 484 | # --------------------------------------------------------------------------- |
| 485 | # E2E — full lifecycle |
| 486 | # --------------------------------------------------------------------------- |
| 487 | |
| 488 | |
class TestE2E:
    """End-to-end flows: build a repo, commit, archive, inspect the result."""

    @staticmethod
    def _file_payloads(archive: pathlib.Path) -> dict[str, bytes]:
        """Return ``{member name: bytes}`` for every regular file in a tar.gz."""
        payloads: dict[str, bytes] = {}
        with tarfile.open(archive, "r:gz") as tf:
            for member in tf.getmembers():
                # Directories and other non-file members carry no payload.
                if not member.isfile():
                    continue
                stream = tf.extractfile(member)
                assert stream is not None
                payloads[member.name] = stream.read()
        return payloads

    def test_full_lifecycle_tar_gz(self, tmp_path: pathlib.Path) -> None:
        """init → commit multiple files → archive → verify all files present."""
        root = _make_repo(tmp_path)
        files = {
            "tracks/track_01.mid": b"MIDI track 1",
            "tracks/track_02.mid": b"MIDI track 2",
            "README.txt": b"Album readme",
        }
        _make_commit(root, files=files)
        out = tmp_path / "album.tar.gz"
        result = _invoke(root, "--output", str(out))
        assert result.exit_code == 0
        assert out.exists()
        with tarfile.open(out, "r:gz") as tf:
            names = tf.getnames()
        assert len(names) == 3
        for expected in ("track_01.mid", "track_02.mid", "README.txt"):
            assert any(expected in n for n in names)

    def test_deterministic_output(self, tmp_path: pathlib.Path) -> None:
        """Two archive calls on the same commit yield identical names and file bytes."""
        root = _make_repo(tmp_path)
        _make_commit(root, files={"a.mid": b"AAA", "b.mid": b"BBB"})
        out1 = tmp_path / "run1.tar.gz"
        out2 = tmp_path / "run2.tar.gz"
        _invoke(root, "--output", str(out1))
        _invoke(root, "--output", str(out2))
        # gzip includes a timestamp by default, so byte equality of the
        # compressed files is not guaranteed; but the member names and the
        # content of each member must be identical.
        with tarfile.open(out1, "r:gz") as tf1, tarfile.open(out2, "r:gz") as tf2:
            names1 = sorted(tf1.getnames())
            names2 = sorted(tf2.getnames())
        assert names1 == names2
        assert self._file_payloads(out1) == self._file_payloads(out2)

    def test_historical_ref_archive(self, tmp_path: pathlib.Path) -> None:
        """Archiving an old commit SHA produces only files from that snapshot."""
        root = _make_repo(tmp_path)
        first_id = _make_commit(root, files={"v1.mid": b"v1 data"})
        _make_commit(root, files={"v1.mid": b"v1 data", "v2.mid": b"v2 data"})
        out = tmp_path / "historical.tar.gz"
        result = _invoke(root, "--ref", short_id(first_id), "--output", str(out))
        assert result.exit_code == 0
        with tarfile.open(out, "r:gz") as tf:
            names = tf.getnames()
        assert any("v1.mid" in n for n in names)
        assert not any("v2.mid" in n for n in names)

    def test_output_text_shows_commit_short(self, tmp_path: pathlib.Path) -> None:
        """Text output contains the ``sha256:`` prefix plus 12 characters of the ID."""
        root = _make_repo(tmp_path)
        commit_id = _make_commit(root, files={"f.mid": b"x"})
        out = tmp_path / "out.tar.gz"
        result = _invoke(root, "--output", str(out))
        assert result.exit_code == 0
        assert commit_id[:len("sha256:") + 12] in result.output

    def test_output_text_shows_file_count(self, tmp_path: pathlib.Path) -> None:
        """Text output contains the number of archived files."""
        root = _make_repo(tmp_path)
        _make_commit(root, files={"a.mid": b"x", "b.mid": b"y", "c.mid": b"z"})
        out = tmp_path / "out.tar.gz"
        result = _invoke(root, "--output", str(out))
        assert "3" in result.output
| 552 | |
| 553 | |
| 554 | # --------------------------------------------------------------------------- |
| 555 | # Stress |
| 556 | # --------------------------------------------------------------------------- |
| 557 | |
| 558 | |
class TestStress:
    """Larger inputs and parallel invocations."""

    def test_200_file_archive(self, tmp_path: pathlib.Path) -> None:
        """A 200-file commit archives cleanly and every file is present."""
        root = _make_repo(tmp_path)
        files = {f"track_{i:03d}.mid": f"MIDI content {i}".encode() for i in range(200)}
        _make_commit(root, files=files)
        out = tmp_path / "big.tar.gz"
        result = _invoke(root, "--output", str(out), "--json")
        assert result.exit_code == 0
        payload = _parse_json(result.output)
        assert payload["file_count"] == 200
        with tarfile.open(out, "r:gz") as tf:
            assert len(tf.getnames()) == 200

    def test_concurrent_archives_different_repos(self, tmp_path: pathlib.Path) -> None:
        """Concurrent archive operations on different repos must not interfere."""
        errors: list[str] = []

        def _run(idx: int) -> None:
            # Everything runs inside the try: an exception raised in a worker
            # thread does not propagate to the joining thread, so a setup
            # failure outside it would leave the test passing.
            try:
                repo_dir = tmp_path / f"repo_{idx}"
                repo_dir.mkdir()
                root = _make_repo(repo_dir)
                _make_commit(root, files={f"track_{idx}.mid": f"content {idx}".encode()})
                out = repo_dir / f"archive_{idx}.tar.gz"
                result = _invoke(root, "--output", str(out))
                if result.exit_code != 0:
                    errors.append(f"Thread {idx} exit {result.exit_code}: {result.output[:200]}")
            except Exception as exc:  # deliberate: report any worker failure
                errors.append(f"Thread {idx}: {exc}")

        threads = [threading.Thread(target=_run, args=(i,)) for i in range(8)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

        assert not errors, f"Concurrent archive failures: {errors}"
File History
1 commit
sha256:d11a87833d5fad6059b7662844bf5448a8911a17cce7a51811f71ad394f248eb
bump to v0.2.0rc13
Human
patch
6 days ago