"""Comprehensive tests for ``muse pack-objects`` and ``unpack-objects``. Coverage tiers -------------- - Integration: pack HEAD, explicit commit, --have pruning, --dry-run, round-trip pack→unpack, text+json format for unpack - Security: invalid want/have IDs rejected, empty stdin, corrupted msgpack - Stress: 5-commit chain pack, 200 unpack rounds (idempotency) """ from __future__ import annotations import datetime import json import pathlib import msgpack from muse.core.errors import ExitCode from muse.core.ids import hash_commit as compute_commit_id, hash_snapshot as compute_snapshot_id from muse.core.commits import ( CommitRecord, write_commit, ) from muse.core.snapshots import ( SnapshotRecord, write_snapshot, ) from muse.core.object_store import has_object, object_path, write_object from muse.core.types import Manifest, blob_id from muse.core.paths import heads_dir, muse_dir from tests.cli_test_helper import CliRunner, InvokeResult runner = CliRunner() # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _make_repo(tmp_path: pathlib.Path) -> pathlib.Path: repo = tmp_path / "repo" dot_muse = muse_dir(repo) for sub in ("objects", "commits", "snapshots", "refs/heads"): (dot_muse / sub).mkdir(parents=True) (dot_muse / "HEAD").write_text("ref: refs/heads/main") (dot_muse / "repo.json").write_text(json.dumps({"repo_id": "test-repo", "domain": "code"})) return repo def _snap(repo: pathlib.Path, manifest: Manifest | None = None) -> str: m = manifest or {} snap_id = compute_snapshot_id(m) write_snapshot(repo, SnapshotRecord( snapshot_id=snap_id, manifest=m, created_at=datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc), )) return snap_id def _commit( repo: pathlib.Path, snap_id: str, *, parent: str | None = None, message: str = "test", ) -> str: committed_at = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc) parent_ids: list[str] = 
[parent] if parent else [] commit_id = compute_commit_id( parent_ids=parent_ids, snapshot_id=snap_id, message=message, committed_at_iso=committed_at.isoformat(), ) write_commit(repo, CommitRecord( commit_id=commit_id, branch="main", snapshot_id=snap_id, message=message, committed_at=committed_at, parent_commit_id=parent, )) return commit_id def _set_head(repo: pathlib.Path, commit_id: str) -> None: ref = heads_dir(repo) / "main" ref.write_text(commit_id) def _po(repo: pathlib.Path, *args: str) -> InvokeResult: from muse.cli.app import main as cli return runner.invoke( cli, ["pack-objects", *args], env={"MUSE_REPO_ROOT": str(repo)}, ) def _uo(repo: pathlib.Path, input_bytes: bytes, *args: str) -> InvokeResult: from muse.cli.app import main as cli return runner.invoke( cli, ["unpack-objects", *args], input=input_bytes, env={"MUSE_REPO_ROOT": str(repo)}, ) # --------------------------------------------------------------------------- # pack-objects # --------------------------------------------------------------------------- class TestPackObjects: def test_pack_head(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) sid = _snap(repo) cid = _commit(repo, sid) _set_head(repo, cid) result = _po(repo, "HEAD") assert result.exit_code == 0 mpack = msgpack.unpackb(result.stdout_bytes, raw=False) assert "commits" in mpack assert len(mpack["commits"]) >= 1 def test_pack_explicit_commit(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) sid = _snap(repo) cid = _commit(repo, sid) result = _po(repo, cid) assert result.exit_code == 0 mpack = msgpack.unpackb(result.stdout_bytes, raw=False) ids = [c["commit_id"] for c in mpack["commits"]] assert cid in ids def test_dry_run_returns_json(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) sid = _snap(repo) cid = _commit(repo, sid) result = _po(repo, cid, "--dry-run", "--json") assert result.exit_code == 0 data = json.loads(result.output) assert data["commits"] >= 1 assert "snapshots" in 
data assert "blobs" in data assert cid in data["want"] def test_dry_run_head(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) sid = _snap(repo) cid = _commit(repo, sid) _set_head(repo, cid) result = _po(repo, "HEAD", "--dry-run", "--json") assert result.exit_code == 0 data = json.loads(result.output) assert cid in data["want"] def test_have_prunes_old_commits(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) sid = _snap(repo) c1 = _commit(repo, sid, message="c1") c2 = _commit(repo, sid, parent=c1) result = _po(repo, c2, "--have", c1, "--dry-run", "--json") assert result.exit_code == 0 data = json.loads(result.output) # c1 is already in "have" — should not be in the pack assert data["commits"] == 1 def test_invalid_want_id_rejected(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) result = _po(repo, "not-a-hex-id") assert result.exit_code == ExitCode.USER_ERROR def test_invalid_have_id_rejected(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) sid = _snap(repo) cid = _commit(repo, sid) result = _po(repo, cid, "--have", "bad-hex") assert result.exit_code == ExitCode.USER_ERROR def test_head_no_commits_errors(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) result = _po(repo, "HEAD") assert result.exit_code == ExitCode.USER_ERROR def test_no_traceback_on_bad_want(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) result = _po(repo, "bad") assert "Traceback" not in result.output # --------------------------------------------------------------------------- # unpack-objects # --------------------------------------------------------------------------- class TestUnpackObjects: def test_round_trip(self, tmp_path: pathlib.Path) -> None: src = _make_repo(tmp_path / "src") dst = _make_repo(tmp_path / "dst") sid = _snap(src) cid = _commit(src, sid) pack_result = _po(src, cid) assert pack_result.exit_code == 0 unpack_result = _uo(dst, pack_result.stdout_bytes, "--json") assert 
unpack_result.exit_code == 0 data = json.loads(unpack_result.output) assert data["commits_written"] == 1 def test_idempotent_double_unpack(self, tmp_path: pathlib.Path) -> None: src = _make_repo(tmp_path / "src") dst = _make_repo(tmp_path / "dst") sid = _snap(src) cid = _commit(src, sid) pack_bytes = _po(src, cid).stdout_bytes _uo(dst, pack_bytes, "--json") result2 = _uo(dst, pack_bytes, "--json") assert result2.exit_code == 0 data = json.loads(result2.output) assert data["commits_written"] == 0 # already present def test_json_shorthand(self, tmp_path: pathlib.Path) -> None: src = _make_repo(tmp_path / "src") dst = _make_repo(tmp_path / "dst") sid = _snap(src) cid = _commit(src, sid) pack_bytes = _po(src, cid).stdout_bytes result = _uo(dst, pack_bytes, "--json") assert result.exit_code == 0 assert "commits_written" in json.loads(result.output) def test_text_format(self, tmp_path: pathlib.Path) -> None: src = _make_repo(tmp_path / "src") dst = _make_repo(tmp_path / "dst") sid = _snap(src) cid = _commit(src, sid) pack_bytes = _po(src, cid).stdout_bytes result = _uo(dst, pack_bytes) assert result.exit_code == 0 assert "commits" in result.output def test_corrupted_msgpack_errors(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) result = _uo(repo, b"\xff\xfe corrupted bytes") assert result.exit_code == ExitCode.USER_ERROR def test_empty_stdin_treated_as_empty_pack(self, tmp_path: pathlib.Path) -> None: """An empty msgpack map {} is a valid empty pack; raw empty bytes are not.""" repo = _make_repo(tmp_path) result = _uo(repo, b"") assert result.exit_code == ExitCode.USER_ERROR def test_no_traceback_on_corrupt_input(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) result = _uo(repo, b"this is not msgpack at all!") assert "Traceback" not in result.output # --------------------------------------------------------------------------- # Stress # --------------------------------------------------------------------------- class TestStress: def 
test_5_commit_chain_pack(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) sid = _snap(repo) prev: str | None = None for i in range(5): prev = _commit(repo, sid, parent=prev, message=f"commit-{i}") assert prev is not None result = _po(repo, prev, "--dry-run", "--json") assert result.exit_code == 0 data = json.loads(result.output) assert data["commits"] == 5 def test_200_unpack_idempotency_rounds(self, tmp_path: pathlib.Path) -> None: src = _make_repo(tmp_path / "src") dst = _make_repo(tmp_path / "dst") sid = _snap(src) cid = _commit(src, sid) pack_bytes = _po(src, cid).stdout_bytes for i in range(200): result = _uo(dst, pack_bytes) assert result.exit_code == 0, f"failed at iteration {i}" # --------------------------------------------------------------------------- # Additional security, format, and unit gap-fill tests # --------------------------------------------------------------------------- class TestPackObjectsSecurity: def test_dry_run_json_has_expected_keys(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) sid = _snap(repo) cid = _commit(repo, sid) _set_head(repo, cid) r = _po(repo, "HEAD", "--dry-run", "--json") assert r.exit_code == 0 d = json.loads(r.output) assert "want" in d assert "have" in d assert "commits" in d assert "snapshots" in d assert "blobs" in d def test_ansi_in_want_rejected(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) r = _po(repo, f"\x1b[31m{'a' * 58}\x1b[0m") assert r.exit_code != 0 assert "Traceback" not in r.output def test_empty_want_list_errors(self, tmp_path: pathlib.Path) -> None: """pack-objects with no want IDs and no HEAD should error gracefully.""" repo = _make_repo(tmp_path) r = _po(repo) assert r.exit_code != 0 def test_200_sequential_dry_run(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) sid = _snap(repo) cid = _commit(repo, sid) _set_head(repo, cid) for i in range(200): r = _po(repo, "HEAD", "--dry-run") assert r.exit_code == 0, f"failed at {i}" 
class TestUnpackObjectsSecurity:
    """Extra error-reporting and round-trip checks for ``unpack-objects``."""

    def test_format_error_to_stderr(self, tmp_path: pathlib.Path) -> None:
        """An unknown ``--format`` fails, keeps stdout empty and writes to stderr."""
        repo = _make_repo(tmp_path)

        outcome = _uo(repo, b"", "--format", "xml")

        assert outcome.exit_code != 0
        assert outcome.stdout_bytes == b""
        # Any non-blank error text on stderr is acceptable.
        stderr_text = outcome.stderr.strip()
        assert stderr_text

    def test_no_traceback_on_bad_format(self, tmp_path: pathlib.Path) -> None:
        """An unknown ``--format`` must not leak a Python traceback."""
        repo = _make_repo(tmp_path)
        outcome = _uo(repo, b"", "--format", "bad")
        assert "Traceback" not in outcome.output

    def test_full_round_trip_with_objects(self, tmp_path: pathlib.Path) -> None:
        """Pack objects included in a snapshot manifest survive the round trip."""
        source = _make_repo(tmp_path / "src")
        target = _make_repo(tmp_path / "dst")

        # Store one blob in the source repo and reference it from a snapshot.
        payload = b"hello round trip"
        object_id = blob_id(payload)
        write_object(source, object_id, payload)
        snapshot = _snap(source, {"hello.txt": object_id})
        wanted = _commit(source, snapshot)

        packed = _po(source, wanted).stdout_bytes
        assert len(packed) > 0

        outcome = _uo(target, packed)
        assert outcome.exit_code == 0
        # Object should now exist in dst
        assert has_object(target, object_id)

    def test_unpack_text_output_format(self, tmp_path: pathlib.Path) -> None:
        """Text output mentions ``commit``, ``object`` or ``ok`` (any case)."""
        source = _make_repo(tmp_path / "src")
        target = _make_repo(tmp_path / "dst")
        wanted = _commit(source, _snap(source))
        packed = _po(source, wanted).stdout_bytes

        outcome = _uo(target, packed)

        assert outcome.exit_code == 0
        lowered = outcome.output.lower()
        assert any(token in lowered for token in ("commit", "object", "ok"))

    def test_no_traceback_on_invalid_msgpack(self, tmp_path: pathlib.Path) -> None:
        """Invalid msgpack bytes must not leak a Python traceback."""
        repo = _make_repo(tmp_path)
        outcome = _uo(repo, b"\xff\xfe invalid msgpack")
        assert "Traceback" not in outcome.output