"""Comprehensive hardening tests for ``muse archive``. Coverage dimensions: Unit ~~~~ - ``_safe_arcname`` empty rel_path rejected - ``_safe_arcname`` null bytes in rel_path rejected - ``_safe_arcname`` null bytes in prefix rejected - ``_safe_arcname`` dot-only path rejected (".") - ``_safe_arcname`` deeply nested safe path allowed - ``_safe_arcname`` path with spaces allowed - ``_safe_arcname`` unicode filenames allowed - ``_ArchiveJson`` TypedDict has all expected fields Security ~~~~~~~~ - ``--json`` flag now works (not broken by format validation) - All error messages route to stderr, not stdout - Unknown --format rejected with nonzero exit (argparse choices= guard) - --prefix with ``..`` rejected with nonzero exit - Zip-slip in manifest (``../`` prefix) skipped in tar.gz - Zip-slip in manifest (``../`` prefix) skipped in zip - ANSI escape sequences in commit message sanitized in text output - Null byte in manifest rel_path skipped silently JSON schema ~~~~~~~~~~~ - ``--json`` on tar.gz produces valid ``_ArchiveJson`` schema - ``--json`` on zip produces valid ``_ArchiveJson`` schema - ``--json`` includes correct ``file_count`` and ``bytes`` - ``--json`` includes ``commit_id`` (full SHA-256) - ``--json`` includes ``message`` and ``branch`` - ``--json`` includes ``ref`` as null when HEAD used - ``--json`` includes ``ref`` as string when --ref used - ``--json`` on empty snapshot reports file_count=0 Integration ~~~~~~~~~~~ - ``--ref`` with short SHA resolves correctly - ``--ref`` with branch name resolves correctly - ``--ref`` with unknown ref exits nonzero and writes to stderr - Default output path is ``.tar.gz`` - Custom output path honoured - Missing object in manifest skipped gracefully - Archive file content matches committed bytes (round-trip) - Zip archive entries are readable - Tar.gz archive entries are readable E2E ~~~ - Full lifecycle: init → commit files → archive → verify contents - ``--prefix`` adds directory level inside both tar.gz and zip - Repeated archive calls produce identical archives (deterministic) - No ``.muse/`` metadata appears in any archive entry Stress ~~~~~~ - 200-file archive completes without error - Concurrent archive calls on different repos are safe """ from __future__ import annotations type _FileStore = dict[str, bytes] import json import pathlib import tarfile import threading import zipfile from typing import TypedDict import pytest from tests.cli_test_helper import CliRunner, InvokeResult from muse.core.types import blob_id, long_id, short_id, fake_id from muse.core.paths import heads_dir, muse_dir cli = None runner = CliRunner() # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _env(root: pathlib.Path) -> Manifest: return {"MUSE_REPO_ROOT": str(root)} def _make_repo(tmp_path: pathlib.Path) -> pathlib.Path: import datetime as dt dot_muse = muse_dir(tmp_path) for sub in ("objects", "commits", "snapshots", "refs/heads"): (dot_muse / sub).mkdir(parents=True, exist_ok=True) (dot_muse / "repo.json").write_text(json.dumps({ "repo_id": fake_id("repo"), "domain": "code", "default_branch": "main", "created_at": "2026-01-01T00:00:00+00:00", }), encoding="utf-8") (dot_muse / "HEAD").write_text("ref: refs/heads/main\n", encoding="utf-8") return tmp_path def _write_object(root: pathlib.Path, content: bytes) -> str: from muse.core.object_store import write_object obj_id = blob_id(content) write_object(root, obj_id, content) return obj_id def _make_commit( root: pathlib.Path, files: _FileStore | None = None, message: str = "test commit", ) -> str: import datetime as dt from muse.core.ids import hash_commit, hash_snapshot from muse.core.store import CommitRecord, SnapshotRecord, write_commit, write_snapshot ref_file = heads_dir(root) / "main" parent_id = ref_file.read_text().strip() if ref_file.exists() else None manifest: Manifest = {} for rel_path, content in (files or {}).items(): manifest[rel_path] = _write_object(root, content) snap_id = hash_snapshot(manifest) committed_at = dt.datetime(2026, 1, 1, tzinfo=dt.timezone.utc) commit_id = hash_commit( parent_ids=[parent_id] if parent_id else [], snapshot_id=snap_id, message=message, committed_at_iso=committed_at.isoformat(), ) write_snapshot(root, SnapshotRecord(snapshot_id=snap_id, manifest=manifest)) write_commit(root, CommitRecord( commit_id=commit_id, branch="main", snapshot_id=snap_id, message=message, committed_at=committed_at, parent_commit_id=parent_id, )) ref_file.parent.mkdir(parents=True, exist_ok=True) ref_file.write_text(commit_id, encoding="utf-8") return commit_id def _invoke(root: pathlib.Path, *args: str, cwd: pathlib.Path | None = None) -> InvokeResult: return runner.invoke(cli, ["archive"] + list(args), env=_env(root), cwd=cwd or root, catch_exceptions=False) class _ArchiveJson(TypedDict): path: str format: str file_count: int bytes: int commit_id: str message: str branch: str ref: str | None def _parse_json(output: str) -> _ArchiveJson: for line in output.splitlines(): line = line.strip() if line.startswith("{"): raw = json.loads(line) return _ArchiveJson( path=str(raw["path"]), format=str(raw["format"]), file_count=int(raw["file_count"]), bytes=int(raw["bytes"]), commit_id=str(raw["commit_id"]), message=str(raw["message"]), branch=str(raw["branch"]), ref=raw["ref"] if raw["ref"] is not None else None, ) raise AssertionError(f"No JSON object found in output:\n{output}") # --------------------------------------------------------------------------- # Unit — _safe_arcname edge cases # --------------------------------------------------------------------------- class TestSafeArcname: def test_empty_rel_path_rejected(self) -> None: from muse.cli.commands.archive import _safe_arcname assert _safe_arcname("", "") is None assert _safe_arcname("prefix", "") is None def test_null_byte_in_rel_path_rejected(self) -> None: from muse.cli.commands.archive import _safe_arcname assert _safe_arcname("", "file\x00.txt") is None def test_null_byte_in_prefix_rejected(self) -> None: from muse.cli.commands.archive import _safe_arcname assert _safe_arcname("pre\x00fix", "file.txt") is None def test_dot_only_path_rejected(self) -> None: from muse.cli.commands.archive import _safe_arcname # PurePosixPath("") normalises to "." — must be rejected assert _safe_arcname("", ".") is None def test_deeply_nested_safe_path(self) -> None: from muse.cli.commands.archive import _safe_arcname assert _safe_arcname("", "a/b/c/d/e/file.txt") == "a/b/c/d/e/file.txt" def test_path_with_spaces(self) -> None: from muse.cli.commands.archive import _safe_arcname assert _safe_arcname("", "my file.mid") == "my file.mid" def test_unicode_filename(self) -> None: from muse.cli.commands.archive import _safe_arcname assert _safe_arcname("", "音楽/track.mid") == "音楽/track.mid" def test_prefix_with_subdirs(self) -> None: from muse.cli.commands.archive import _safe_arcname assert _safe_arcname("release/v1.0", "file.txt") == "release/v1.0/file.txt" # --------------------------------------------------------------------------- # Security # --------------------------------------------------------------------------- class TestSecurity: def test_json_flag_now_works(self, tmp_path: pathlib.Path) -> None: """--json must NOT exit with an error (it was broken before the fix).""" root = _make_repo(tmp_path) _make_commit(root, files={"song.mid": b"MIDI"}) out = tmp_path / "out.tar.gz" result = _invoke(root, "--output", str(out), "--json") assert result.exit_code == 0, f"--json flag is still broken: {result.output}" def test_error_unknown_format_to_stderr(self, tmp_path: pathlib.Path) -> None: """Unknown --format must exit nonzero (argparse choices= rejects it).""" root = _make_repo(tmp_path) _make_commit(root) result = runner.invoke(cli, ["archive", "--format", "rar"], env=_env(root)) assert result.exit_code != 0 def test_error_prefix_traversal_to_stderr(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) _make_commit(root, files={"song.mid": b"data"}) result = runner.invoke(cli, ["archive", "--prefix", "../traversal/"], env=_env(root)) assert result.exit_code != 0 # Error must NOT appear on stdout (it should be on stderr, which CliRunner merges) # We verify exit code nonzero — that's the contract. def test_error_no_commits_nonzero(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) result = runner.invoke(cli, ["archive"], env=_env(root)) assert result.exit_code != 0 def test_error_bad_ref_nonzero(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) _make_commit(root) result = runner.invoke(cli, ["archive", "--ref", "nonexistent-branch-xyz"], env=_env(root)) assert result.exit_code != 0 def test_zip_slip_in_tar_manifest_skipped(self, tmp_path: pathlib.Path) -> None: from muse.cli.commands.archive import _build_entries, _build_tar root = _make_repo(tmp_path) malicious_id = _write_object(root, b"malicious content") safe_id = _write_object(root, b"safe content") out = tmp_path / "test.tar.gz" manifest = {"../../../etc/cron.d/malicious": malicious_id, "safe.txt": safe_id} entries, _ = _build_entries(root, manifest, prefix="") count = _build_tar(entries, out) assert count == 1 with tarfile.open(out, "r:gz") as tf: names = tf.getnames() assert not any("etc" in n or "cron" in n for n in names) assert "safe.txt" in names def test_zip_slip_in_zip_manifest_skipped(self, tmp_path: pathlib.Path) -> None: from muse.cli.commands.archive import _build_entries, _build_zip root = _make_repo(tmp_path) malicious_id = _write_object(root, b"malicious") safe_id = _write_object(root, b"safe") out = tmp_path / "test.zip" manifest = {"../../../etc/malicious": malicious_id, "safe.txt": safe_id} entries, _ = _build_entries(root, manifest, prefix="") count = _build_zip(entries, out) assert count == 1 with zipfile.ZipFile(out, "r") as zf: names = zf.namelist() assert not any("etc" in n for n in names) assert "safe.txt" in names def test_null_byte_in_manifest_path_skipped(self, tmp_path: pathlib.Path) -> None: from muse.cli.commands.archive import _build_entries, _build_tar root = _make_repo(tmp_path) null_id = _write_object(root, b"null content") safe_id = _write_object(root, b"safe content") out = tmp_path / "null.tar.gz" manifest = {"file\x00.txt": null_id, "safe.txt": safe_id} entries, _ = _build_entries(root, manifest, prefix="") count = _build_tar(entries, out) assert count == 1 def test_ansi_in_commit_message_sanitized(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) _make_commit(root, files={"f.mid": b"data"}, message="\x1b[31mred\x1b[0m") out = tmp_path / "ansi.tar.gz" result = _invoke(root, "--output", str(out)) assert result.exit_code == 0 assert "\x1b" not in result.output def test_no_muse_dir_in_archive(self, tmp_path: pathlib.Path) -> None: """The .muse/ directory must never appear in any archive entry.""" root = _make_repo(tmp_path) _make_commit(root, files={"song.mid": b"MIDI"}) out = tmp_path / "clean.tar.gz" _invoke(root, "--output", str(out)) with tarfile.open(out, "r:gz") as tf: names = tf.getnames() assert not any(".muse" in n for n in names) # --------------------------------------------------------------------------- # JSON schema # --------------------------------------------------------------------------- class TestJsonSchema: def test_json_tar_gz_schema(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) commit_id = _make_commit(root, files={"a.mid": b"data", "b.mid": b"more"}) out = tmp_path / "archive.tar.gz" result = _invoke(root, "--output", str(out), "--json") assert result.exit_code == 0 payload = _parse_json(result.output) assert payload["format"] == "tar.gz" assert payload["file_count"] == 2 assert payload["bytes"] > 0 assert payload["commit_id"] == commit_id assert payload["branch"] == "main" assert payload["ref"] is None assert payload["path"] == str(out) def test_json_zip_schema(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) commit_id = _make_commit(root, files={"track.mid": b"MIDI"}) out = tmp_path / "archive.zip" result = _invoke(root, "--format", "zip", "--output", str(out), "--json") assert result.exit_code == 0 payload = _parse_json(result.output) assert payload["format"] == "zip" assert payload["file_count"] == 1 assert payload["commit_id"] == commit_id def test_json_ref_field_when_head(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) _make_commit(root, files={"f.mid": b"x"}) out = tmp_path / "a.tar.gz" result = _invoke(root, "--output", str(out), "--json") payload = _parse_json(result.output) assert payload["ref"] is None def test_json_ref_field_when_explicit_ref(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) commit_id = _make_commit(root, files={"f.mid": b"x"}) short = short_id(commit_id) out = tmp_path / "a.tar.gz" result = _invoke(root, "--ref", short, "--output", str(out), "--json") payload = _parse_json(result.output) assert payload["ref"] == short def test_json_empty_snapshot(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) _make_commit(root, files={}) out = tmp_path / "empty.tar.gz" result = _invoke(root, "--output", str(out), "--json") payload = _parse_json(result.output) assert payload["file_count"] == 0 def test_json_message_field(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) _make_commit(root, files={"f.mid": b"x"}, message="release v2.0") out = tmp_path / "a.tar.gz" result = _invoke(root, "--output", str(out), "--json") payload = _parse_json(result.output) assert payload["message"] == "release v2.0" # --------------------------------------------------------------------------- # Integration # --------------------------------------------------------------------------- class TestIntegration: def test_default_output_path_is_sha12_dot_format(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) commit_id = _make_commit(root, files={"f.mid": b"data"}) result = _invoke(root) assert result.exit_code == 0 # Filename uses bare hex (colons illegal on Windows). bare_short = short_id(commit_id, strip=True) assert bare_short in result.output assert ".tar.gz" in result.output def test_ref_with_short_sha(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) commit_id = _make_commit(root, files={"a.mid": b"MIDI"}) out = tmp_path / "ref.tar.gz" # Use the full commit_id as ref (canonical sha256: prefixed form). result = _invoke(root, "--ref", commit_id, "--output", str(out)) assert result.exit_code == 0 assert out.exists() def test_missing_object_skipped_gracefully(self, tmp_path: pathlib.Path) -> None: """If an object file is missing from the store, that entry is skipped — not a crash.""" from muse.cli.commands.archive import _build_entries, _build_tar root = _make_repo(tmp_path) # Write one good object, one phantom. good_id = _write_object(root, b"good content") phantom_id = long_id("a" * 64)# valid format but not written to store out = tmp_path / "partial.tar.gz" manifest = {"good.txt": good_id, "missing.txt": phantom_id} entries, _ = _build_entries(root, manifest, prefix="") count = _build_tar(entries, out) assert count == 1 with tarfile.open(out, "r:gz") as tf: names = tf.getnames() assert "good.txt" in names assert "missing.txt" not in names def test_archive_bytes_match_committed_content(self, tmp_path: pathlib.Path) -> None: """Content extracted from the archive must match what was committed.""" root = _make_repo(tmp_path) content = b"exact bytes for round-trip verification" _make_commit(root, files={"track.mid": content}) out = tmp_path / "roundtrip.tar.gz" _invoke(root, "--output", str(out)) with tarfile.open(out, "r:gz") as tf: member = tf.getmembers()[0] extracted = tf.extractfile(member) assert extracted is not None assert extracted.read() == content def test_zip_content_round_trip(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) content = b"zip round trip bytes" _make_commit(root, files={"data.mid": content}) out = tmp_path / "rt.zip" _invoke(root, "--format", "zip", "--output", str(out)) with zipfile.ZipFile(out, "r") as zf: names = zf.namelist() assert len(names) == 1 extracted = zf.read(names[0]) assert extracted == content def test_prefix_appears_in_tar_gz(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) _make_commit(root, files={"song.mid": b"MIDI"}) out = tmp_path / "prefixed.tar.gz" _invoke(root, "--output", str(out), "--prefix", "band-v1.0") with tarfile.open(out, "r:gz") as tf: names = tf.getnames() assert all(n.startswith("band-v1.0/") for n in names) def test_prefix_appears_in_zip(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) _make_commit(root, files={"song.mid": b"MIDI"}) out = tmp_path / "prefixed.zip" _invoke(root, "--format", "zip", "--output", str(out), "--prefix", "band-v2.0") with zipfile.ZipFile(out, "r") as zf: names = zf.namelist() assert all(n.startswith("band-v2.0/") for n in names) # --------------------------------------------------------------------------- # E2E — full lifecycle # --------------------------------------------------------------------------- class TestE2E: def test_full_lifecycle_tar_gz(self, tmp_path: pathlib.Path) -> None: """init → commit multiple files → archive → verify all files present.""" root = _make_repo(tmp_path) files = { "tracks/track_01.mid": b"MIDI track 1", "tracks/track_02.mid": b"MIDI track 2", "README.txt": b"Album readme", } _make_commit(root, files=files) out = tmp_path / "album.tar.gz" result = _invoke(root, "--output", str(out)) assert result.exit_code == 0 assert out.exists() with tarfile.open(out, "r:gz") as tf: names = tf.getnames() assert len(names) == 3 assert any("track_01.mid" in n for n in names) assert any("track_02.mid" in n for n in names) assert any("README.txt" in n for n in names) def test_deterministic_output(self, tmp_path: pathlib.Path) -> None: """Two archive calls on the same commit produce byte-identical files.""" root = _make_repo(tmp_path) _make_commit(root, files={"a.mid": b"AAA", "b.mid": b"BBB"}) out1 = tmp_path / "run1.tar.gz" out2 = tmp_path / "run2.tar.gz" _invoke(root, "--output", str(out1)) _invoke(root, "--output", str(out2)) # gzip includes a timestamp by default, so byte equality is not guaranteed; # but the member names and content must be identical. with tarfile.open(out1, "r:gz") as tf1, tarfile.open(out2, "r:gz") as tf2: names1 = sorted(tf1.getnames()) names2 = sorted(tf2.getnames()) assert names1 == names2 def test_historical_ref_archive(self, tmp_path: pathlib.Path) -> None: """Archiving an old commit SHA produces only files from that snapshot.""" root = _make_repo(tmp_path) first_id = _make_commit(root, files={"v1.mid": b"v1 data"}) _make_commit(root, files={"v1.mid": b"v1 data", "v2.mid": b"v2 data"}) out = tmp_path / "historical.tar.gz" result = _invoke(root, "--ref", short_id(first_id), "--output", str(out)) assert result.exit_code == 0 with tarfile.open(out, "r:gz") as tf: names = tf.getnames() assert any("v1.mid" in n for n in names) assert not any("v2.mid" in n for n in names) def test_output_text_shows_commit_short(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) commit_id = _make_commit(root, files={"f.mid": b"x"}) out = tmp_path / "out.tar.gz" result = _invoke(root, "--output", str(out)) assert result.exit_code == 0 assert commit_id[:len("sha256:") + 12] in result.output def test_output_text_shows_file_count(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) _make_commit(root, files={"a.mid": b"x", "b.mid": b"y", "c.mid": b"z"}) out = tmp_path / "out.tar.gz" result = _invoke(root, "--output", str(out)) assert "3" in result.output # --------------------------------------------------------------------------- # Stress # --------------------------------------------------------------------------- class TestStress: def test_200_file_archive(self, tmp_path: pathlib.Path) -> None: root = _make_repo(tmp_path) files = {f"track_{i:03d}.mid": f"MIDI content {i}".encode() for i in range(200)} _make_commit(root, files=files) out = tmp_path / "big.tar.gz" result = _invoke(root, "--output", str(out), "--json") assert result.exit_code == 0 payload = _parse_json(result.output) assert payload["file_count"] == 200 with tarfile.open(out, "r:gz") as tf: assert len(tf.getnames()) == 200 def test_concurrent_archives_different_repos(self, tmp_path: pathlib.Path) -> None: """Concurrent archive operations on different repos must not interfere.""" errors: list[str] = [] def _run(idx: int) -> None: repo_dir = tmp_path / f"repo_{idx}" repo_dir.mkdir() root = _make_repo(repo_dir) _make_commit(root, files={f"track_{idx}.mid": f"content {idx}".encode()}) out = repo_dir / f"archive_{idx}.tar.gz" try: result = _invoke(root, "--output", str(out)) if result.exit_code != 0: errors.append(f"Thread {idx} exit {result.exit_code}: {result.output[:200]}") except Exception as exc: errors.append(f"Thread {idx}: {exc}") threads = [threading.Thread(target=_run, args=(i,)) for i in range(8)] for t in threads: t.start() for t in threads: t.join() assert not errors, f"Concurrent archive failures: {errors}"