"""Phase 2.4 — Malicious msgpack payload hardening tests. Attack surface covered ---------------------- * ``safe_unpackb`` — the new canonical deserialization primitive: - Size-bomb (payload larger than max_bytes cap) - Billion-laughs via enormous maps / arrays - String length bomb - Binary blob injection (allow_binary=False default) - ``strict_map_key=False`` passthrough for legacy staging index - Clean inputs still deserialise correctly - 10 000-entry random fuzz with no unhandled exceptions - Concurrent deserialization stress (50 threads) * ``read_msgpack_file`` — file-based wrapper: - Stat check fires before read_bytes (no OOM on 4 GiB file placeholder) - Per-value limits enforced after stat * ``MAX_PACK_MSGPACK_BYTES`` — new pack/mpack constant: - Exported and larger than ``MAX_MSGPACK_BYTES`` - Verified against the 512 MiB specification * Callsite hardening (end-to-end via CliRunner / direct call): - ``mpack._load_bundle`` rejects oversized mpack files - ``unpack_objects`` stdin rejects size-bomb payloads - ``verify_pack`` stdin rejects size-bomb payloads - ``symbol_cache.SymbolCache.load`` rejects oversized cache files - ``test_history.load_history`` rejects oversized history files - ``transport._decode`` enforces per-value limits on server responses - ``_invariants._FileCache.load`` rejects oversized cache files - ``stage.read_stage`` rejects oversized staging index files * TypeGuard narrowing (``_is_commit_dict``, ``_is_snapshot_dict``) — non-dict entries in a wire mpack are silently dropped, not propagated. """ from __future__ import annotations import os import pathlib import random import struct import tempfile import threading import time from unittest.mock import MagicMock, patch import msgpack import pytest from muse.core.io import ( MAX_MSGPACK_BYTES, MAX_PACK_MSGPACK_BYTES, read_msgpack_file, safe_unpackb, ) from muse.core.types import MsgpackValue from muse.core.paths import muse_dir # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _pack(obj: MsgpackValue) -> bytes: raw = msgpack.packb(obj, use_bin_type=True) assert isinstance(raw, bytes) return raw def _nested_map(depth: int) -> MsgpackDict: """Build a dict nested *depth* levels deep.""" result: MsgpackDict = {"x": None} for _ in range(depth - 1): result = {"x": result} return result def _nested_list(depth: int) -> list[MsgpackValue]: """Build a list nested *depth* levels deep.""" result: list[MsgpackValue] = [None] for _ in range(depth - 1): result = [result] return result # --------------------------------------------------------------------------- # 1. safe_unpackb — unit tests # --------------------------------------------------------------------------- class TestSafeUnpackbSizeBomb: """safe_unpackb raises ValueError before parsing when len(raw) > max_bytes.""" def test_exact_limit_is_accepted(self) -> None: payload = _pack("x") assert safe_unpackb(payload, max_bytes=len(payload)) == "x" def test_one_byte_over_raises(self) -> None: payload = _pack("x") with pytest.raises(ValueError, match="safety cap"): safe_unpackb(payload, max_bytes=len(payload) - 1) def test_default_limit_is_max_msgpack_bytes(self) -> None: """Payloads at exactly MAX_MSGPACK_BYTES are accepted.""" tiny = _pack({"k": "v"}) result = safe_unpackb(tiny) assert result == {"k": "v"} def test_size_error_includes_context_label(self) -> None: payload = _pack("big") with pytest.raises(ValueError, match="stdin"): safe_unpackb(payload, context="stdin", max_bytes=0) def test_empty_bytes_accepted(self) -> None: """Empty msgpack (nil) is valid and accepted.""" nil_bytes = _pack(None) result = safe_unpackb(nil_bytes) assert result is None class TestSafeUnpackbPerValueLimits: """Per-value limits block billion-laughs payloads.""" def test_string_over_1_mib_raises(self) -> None: big_str = "A" * (1_048_577) # 1 MiB + 1 byte payload = _pack(big_str) with pytest.raises(Exception): safe_unpackb(payload, max_bytes=len(payload) + 100) def test_string_exactly_at_1_mib_accepted(self) -> None: ok_str = "A" * 1_048_576 payload = _pack(ok_str) result = safe_unpackb(payload, max_bytes=len(payload) + 100) assert result == ok_str def test_huge_map_rejected(self) -> None: """A map with 1_000_001 keys exceeds _MSGPACK_MAX_MAP_LEN.""" # Build a msgpack map header with count > limit directly to avoid # allocating 1M Python strings in the test process. count = 1_000_001 # msgpack map32: 0xdf + uint32 BE count raw = struct.pack(">BI", 0xDF, count) + b"\xa1x\xa1y" * count with pytest.raises(Exception): safe_unpackb(raw, max_bytes=len(raw) + 1000) def test_huge_array_rejected(self) -> None: """An array with 1_000_001 entries exceeds _MSGPACK_MAX_ARRAY_LEN.""" count = 1_000_001 # msgpack array32: 0xdd + uint32 BE count raw = struct.pack(">BI", 0xDD, count) + b"\xc0" * count with pytest.raises(Exception): safe_unpackb(raw, max_bytes=len(raw) + 1000) def test_binary_blob_rejected_by_default(self) -> None: """allow_binary=False (default) rejects msgpack binary blobs.""" payload = _pack(b"\x00\xff" * 10) with pytest.raises(Exception): safe_unpackb(payload, max_bytes=len(payload) + 100) def test_binary_blob_accepted_with_allow_binary(self) -> None: """allow_binary=True permits binary blobs (pack/mpack payloads).""" blob = b"\xde\xad\xbe\xef" * 4 payload = _pack(blob) result = safe_unpackb(payload, max_bytes=len(payload) + 100, allow_binary=True) assert result == blob def test_strict_map_key_true_rejects_integer_keys(self) -> None: """By default, integer map keys are rejected.""" # Hand-craft msgpack with integer key payload = struct.pack(">BB", 0x81, 0x01) + b"\xa1v" # {1: "v"} with pytest.raises(Exception): safe_unpackb(payload, max_bytes=100) def test_strict_map_key_false_allows_integer_keys(self) -> None: """strict_map_key=False permits legacy integer keys (e.g. {1: 'v'}).""" payload = struct.pack(">BB", 0x81, 0x01) + b"\xa1v" # {1: "v"} result = safe_unpackb(payload, max_bytes=100, strict_map_key=False) # Returned as a dict with one entry whose value is "v" assert isinstance(result, dict) and list(result.values()) == ["v"] def test_invalid_msgpack_raises(self) -> None: with pytest.raises(Exception): safe_unpackb(b"\xff\xfe\xfd\xfc", max_bytes=100) def test_clean_dict_roundtrip(self) -> None: original: MsgpackDict = {"key": "value", "n": 42, "flag": True} result = safe_unpackb(_pack(original)) assert result == original def test_clean_list_roundtrip(self) -> None: original: list[MsgpackValue] = ["a", 1, None, True] result = safe_unpackb(_pack(original)) assert result == original class TestSafeUnpackbNestingBomb: """Deeply nested structures — should raise or return, never hang.""" def test_500_nested_dicts_terminates_quickly(self) -> None: """500 nested dicts must terminate (raise or succeed) within 1 second.""" import sys old_limit = sys.getrecursionlimit() sys.setrecursionlimit(max(old_limit, 5000)) try: payload = _pack(_nested_map(500)) start = time.monotonic() try: safe_unpackb(payload, max_bytes=len(payload) + 10_000) except Exception: pass elapsed = time.monotonic() - start assert elapsed < 1.0, f"Nested dict deserialization hung ({elapsed:.2f}s)" finally: sys.setrecursionlimit(old_limit) def test_500_nested_lists_terminates_quickly(self) -> None: import sys old_limit = sys.getrecursionlimit() sys.setrecursionlimit(max(old_limit, 5000)) try: payload = _pack(_nested_list(500)) start = time.monotonic() try: safe_unpackb(payload, max_bytes=len(payload) + 10_000) except Exception: pass elapsed = time.monotonic() - start assert elapsed < 1.0, f"Nested list deserialization hung ({elapsed:.2f}s)" finally: sys.setrecursionlimit(old_limit) # --------------------------------------------------------------------------- # 2. read_msgpack_file — unit tests # --------------------------------------------------------------------------- class TestReadMsgpackFile: """read_msgpack_file enforces the size cap via stat before read_bytes.""" def test_normal_file_roundtrips(self, tmp_path: pathlib.Path) -> None: f = tmp_path / "ok.msgpack" f.write_bytes(_pack({"a": 1})) result = read_msgpack_file(f) assert result == {"a": 1} def test_oversized_file_raises_os_error_before_read(self, tmp_path: pathlib.Path) -> None: """The stat check must fire *before* read_bytes so no OOM occurs.""" f = tmp_path / "big.msgpack" # Write minimal valid msgpack but lie to stat via mock f.write_bytes(_pack("tiny")) with patch.object(pathlib.Path, "stat") as mock_stat: mock_stat.return_value = MagicMock(st_size=MAX_MSGPACK_BYTES + 1) with pytest.raises(OSError, match="safety cap"): read_msgpack_file(f) def test_error_message_includes_filename(self, tmp_path: pathlib.Path) -> None: f = tmp_path / "corrupt.msgpack" f.write_bytes(_pack("x")) with patch.object(pathlib.Path, "stat") as mock_stat: mock_stat.return_value = MagicMock(st_size=MAX_MSGPACK_BYTES + 1) with pytest.raises(OSError, match="corrupt.msgpack"): read_msgpack_file(f) def test_custom_max_bytes_respected(self, tmp_path: pathlib.Path) -> None: f = tmp_path / "small.msgpack" f.write_bytes(_pack("x")) size = f.stat().st_size # One byte under custom limit — OK result = read_msgpack_file(f, max_bytes=size + 1) assert result == "x" # One byte over custom limit — raises with pytest.raises(OSError): read_msgpack_file(f, max_bytes=size - 1) def test_strict_map_key_false_passed_through(self, tmp_path: pathlib.Path) -> None: """strict_map_key=False is forwarded to safe_unpackb.""" f = tmp_path / "int_keys.msgpack" # hand-craft msgpack with int key 1 -> "v" f.write_bytes(struct.pack(">BB", 0x81, 0x01) + b"\xa1v") result = read_msgpack_file(f, strict_map_key=False) assert isinstance(result, dict) and list(result.values()) == ["v"] def test_per_value_limits_apply_after_stat(self, tmp_path: pathlib.Path) -> None: """Even within the size cap, a huge string is rejected.""" big_str = "Z" * (1_048_577) f = tmp_path / "big_str.msgpack" f.write_bytes(_pack(big_str)) size = f.stat().st_size with pytest.raises(Exception): read_msgpack_file(f, max_bytes=size + 1000) # --------------------------------------------------------------------------- # 3. MAX_PACK_MSGPACK_BYTES — constant tests # --------------------------------------------------------------------------- class TestMaxPackMsgpackBytes: def test_exported(self) -> None: assert MAX_PACK_MSGPACK_BYTES is not None def test_is_int(self) -> None: assert isinstance(MAX_PACK_MSGPACK_BYTES, int) def test_larger_than_max_msgpack_bytes(self) -> None: assert MAX_PACK_MSGPACK_BYTES > MAX_MSGPACK_BYTES def test_is_512_mib(self) -> None: assert MAX_PACK_MSGPACK_BYTES == 512 * 1024 * 1024 # --------------------------------------------------------------------------- # 4. safe_unpackb fuzzing — 10 000 random inputs, no unhandled exceptions # --------------------------------------------------------------------------- class TestSafeUnpackbFuzz10k: """Feed 10 000 random byte strings to safe_unpackb. All calls must raise a known exception or return a valid MsgpackValue. No unhandled exceptions (AttributeError, KeyError, etc.) are permitted. """ def test_fuzz_10k_random_bytes(self) -> None: rng = random.Random(0xDEADBEEF) allowed_exc = ( ValueError, # size cap msgpack.UnpackException, msgpack.ExtraData, msgpack.FormatError, RecursionError, UnicodeDecodeError, MemoryError, ) for i in range(10_000): size = rng.randint(0, 64) payload = bytes(rng.randint(0, 255) for _ in range(size)) try: safe_unpackb(payload, max_bytes=256) except allowed_exc: pass except Exception as exc: pytest.fail( f"Unexpected exception on iteration {i} " f"(payload={payload.hex()!r}): {type(exc).__name__}: {exc}" ) def test_fuzz_10k_valid_msgpack(self) -> None: """All valid msgpack inputs (within limits) must deserialise cleanly.""" rng = random.Random(0xCAFEBABE) allowed_exc = ( ValueError, # size cap or value limit msgpack.UnpackException, ) for i in range(10_000): # Generate a small valid msgpack object kind = rng.randint(0, 4) if kind == 0: obj: MsgpackValue = rng.randint(-(2**31), 2**31 - 1) elif kind == 1: obj = rng.random() elif kind == 2: obj = "".join(chr(rng.randint(32, 126)) for _ in range(rng.randint(0, 32))) elif kind == 3: obj = None else: obj = bool(rng.randint(0, 1)) payload = _pack(obj) try: safe_unpackb(payload, max_bytes=len(payload) + 10) except allowed_exc: pass except Exception as exc: pytest.fail( f"Unexpected exception on iteration {i}: " f"{type(exc).__name__}: {exc}" ) # --------------------------------------------------------------------------- # 5. Concurrent deserialization stress # --------------------------------------------------------------------------- class TestSafeUnpackbConcurrent: """50 threads calling safe_unpackb simultaneously — no data races.""" def test_50_threads_concurrent_safe_unpackb(self) -> None: payload = _pack({"key": "value", "n": 42}) errors: list[Exception] = [] def _worker() -> None: try: result = safe_unpackb(payload) assert result == {"key": "value", "n": 42} except Exception as exc: errors.append(exc) threads = [threading.Thread(target=_worker) for _ in range(50)] for t in threads: t.start() for t in threads: t.join(timeout=5) assert not errors, f"Concurrent errors: {errors}" def test_50_threads_size_bomb_rejected_concurrently(self) -> None: payload = _pack("tiny") errors: list[str] = [] def _worker() -> None: try: safe_unpackb(payload, max_bytes=0) errors.append("Expected ValueError, got success") except ValueError: pass # correct except Exception as exc: errors.append(f"Wrong exception: {type(exc).__name__}: {exc}") threads = [threading.Thread(target=_worker) for _ in range(50)] for t in threads: t.start() for t in threads: t.join(timeout=5) assert not errors, f"Errors: {errors}" # --------------------------------------------------------------------------- # 6. Callsite hardening — mpack._load_bundle # --------------------------------------------------------------------------- class TestBundleLoadHardening: """_load_bundle rejects oversized files before reading into memory.""" def test_oversized_bundle_file_exits_cleanly(self, tmp_path: pathlib.Path) -> None: from muse.cli.commands.bundle import _load_bundle bundle_file = tmp_path / "huge.mpack" bundle_file.write_bytes(_pack({"commits": [], "snapshots": [], "blobs": []})) with patch.object(pathlib.Path, "stat") as mock_stat: mock_stat.return_value = MagicMock(st_size=MAX_PACK_MSGPACK_BYTES + 1) with pytest.raises(SystemExit): _load_bundle(bundle_file) def test_bundle_size_check_fires_before_read(self, tmp_path: pathlib.Path) -> None: """Stat check must happen *before* read_bytes — never allocate the big buffer.""" from muse.cli.commands.bundle import _load_bundle bundle_file = tmp_path / "fake_huge.mpack" bundle_file.write_bytes(_pack({})) read_bytes_calls: list[int] = [] original_read_bytes = pathlib.Path.read_bytes def tracked_read_bytes(self: pathlib.Path) -> bytes: read_bytes_calls.append(1) return original_read_bytes(self) with patch.object(pathlib.Path, "stat") as mock_stat, \ patch.object(pathlib.Path, "read_bytes", tracked_read_bytes): mock_stat.return_value = MagicMock(st_size=MAX_PACK_MSGPACK_BYTES + 1) with pytest.raises(SystemExit): _load_bundle(bundle_file) # read_bytes must NOT have been called — stat check fired first assert not read_bytes_calls, "read_bytes was called despite oversized stat" def test_valid_bundle_loads_correctly(self, tmp_path: pathlib.Path) -> None: from muse.cli.commands.bundle import _load_bundle bundle_file = tmp_path / "valid.mpack" bundle_file.write_bytes(_pack({"commits": [], "snapshots": [], "blobs": []})) result = _load_bundle(bundle_file) assert isinstance(result, dict) def test_non_dict_bundle_payload_rejected(self, tmp_path: pathlib.Path) -> None: from muse.cli.commands.bundle import _load_bundle bundle_file = tmp_path / "list.mpack" bundle_file.write_bytes(_pack([1, 2, 3])) with pytest.raises(SystemExit): _load_bundle(bundle_file) def test_non_dict_commits_entries_silently_dropped(self, tmp_path: pathlib.Path) -> None: """Non-dict entries in commits list are filtered by _is_commit_dict.""" from muse.cli.commands.bundle import _load_bundle bundle_file = tmp_path / "mixed.mpack" bundle_file.write_bytes(_pack({ "commits": ["string_entry", 42, None, {"real_key": "real_value"}], "snapshots": [], "blobs": [], })) result = _load_bundle(bundle_file) commits = result.get("commits", []) # Only the dict entry should survive assert len(commits) == 1 assert commits[0] == {"real_key": "real_value"} # --------------------------------------------------------------------------- # 7. Callsite hardening — unpack_objects (stdin) # --------------------------------------------------------------------------- class TestUnpackObjectsStdinHardening: """unpack_objects rejects size-bomb payloads from stdin.""" def test_size_bomb_stdin_rejected(self, tmp_path: pathlib.Path) -> None: """A stdin payload exceeding MAX_PACK_MSGPACK_BYTES is rejected.""" import sys from io import BytesIO import muse.cli.commands.unpack_objects as _mod tiny_payload = _pack({"commits": [], "snapshots": [], "blobs": []}) # Mock stdin.buffer.read to return a payload that exceeds the limit with patch.object(_mod, "sys") as mock_sys, \ patch("muse.cli.commands.unpack_objects.require_repo", return_value=tmp_path): # Build a .muse dir so require_repo doesn't fail muse_dir(tmp_path).mkdir(exist_ok=True) mock_sys.stdin = MagicMock() mock_sys.stdin.buffer = MagicMock() # Simulate payload larger than MAX_PACK_MSGPACK_BYTES oversized = b"X" * (MAX_PACK_MSGPACK_BYTES + 1) mock_sys.stdin.buffer.read.return_value = oversized mock_sys.stderr = sys.stderr mock_sys.stdout = sys.stdout with pytest.raises(SystemExit): args = MagicMock() args.fmt = "json" _mod.run(args) def test_invalid_msgpack_stdin_rejected(self, tmp_path: pathlib.Path) -> None: """Garbage stdin bytes produce a clean error exit, not a traceback.""" import sys import muse.cli.commands.unpack_objects as _mod with patch.object(_mod, "sys") as mock_sys, \ patch("muse.cli.commands.unpack_objects.require_repo", return_value=tmp_path): muse_dir(tmp_path).mkdir(exist_ok=True) mock_sys.stdin = MagicMock() mock_sys.stdin.buffer = MagicMock() mock_sys.stdin.buffer.read.return_value = b"\xff\xfe\xfd garbage" mock_sys.stderr = sys.stderr mock_sys.stdout = sys.stdout with pytest.raises(SystemExit) as exc_info: args = MagicMock() args.fmt = "json" _mod.run(args) assert exc_info.value.code != 0 # --------------------------------------------------------------------------- # 8. Callsite hardening — verify_pack (stdin / file) # --------------------------------------------------------------------------- class TestVerifyPackHardening: """verify_pack rejects oversized and malformed msgpack.""" def test_size_bomb_raises_system_exit(self) -> None: """A stdin payload exceeding MAX_PACK_MSGPACK_BYTES is rejected.""" import sys import muse.cli.commands.verify_pack as _mod oversized = b"X" * (MAX_PACK_MSGPACK_BYTES + 1) with patch.object(_mod, "sys") as mock_sys: mock_sys.stdin = MagicMock() mock_sys.stdin.buffer = MagicMock() mock_sys.stdin.buffer.read.return_value = oversized mock_sys.stderr = sys.stderr mock_sys.stdout = sys.stdout with pytest.raises(SystemExit): args = MagicMock() args.format = "json" args.file = None _mod.run(args) # --------------------------------------------------------------------------- # 9. Callsite hardening — symbol_cache.SymbolCache.load # --------------------------------------------------------------------------- class TestSymbolCacheHardening: """SymbolCache.load returns empty cache for oversized or corrupt files.""" def test_oversized_cache_returns_empty(self, tmp_path: pathlib.Path) -> None: from muse.core.symbol_cache import SymbolCache from muse.core.paths import symbol_cache_path dot_muse = muse_dir(tmp_path) (dot_muse / "cache").mkdir(parents=True) symbol_cache_path(tmp_path).write_bytes(_pack({"version": 1, "entries": {}})) with patch.object(pathlib.Path, "stat") as mock_stat: mock_stat.return_value = MagicMock(st_size=MAX_MSGPACK_BYTES + 1) result = SymbolCache.load(dot_muse) assert result.size == 0 def test_corrupt_cache_returns_empty(self, tmp_path: pathlib.Path) -> None: from muse.core.symbol_cache import SymbolCache from muse.core.paths import symbol_cache_path dot_muse = muse_dir(tmp_path) (dot_muse / "cache").mkdir(parents=True) symbol_cache_path(tmp_path).write_bytes(b"\xff\xfe garbage") result = SymbolCache.load(dot_muse) assert result.size == 0 def test_valid_cache_still_loads(self, tmp_path: pathlib.Path) -> None: from muse.core.symbol_cache import SymbolCache from muse.core.paths import symbol_cache_path dot_muse = muse_dir(tmp_path) (dot_muse / "cache").mkdir(parents=True) symbol_cache_path(tmp_path).write_bytes(_pack({"version": 1, "entries": {}})) result = SymbolCache.load(dot_muse) assert result.size == 0 # --------------------------------------------------------------------------- # 10. Callsite hardening — test_history.load_history # --------------------------------------------------------------------------- class TestTestHistoryHardening: """load_history returns [] for oversized or corrupt history files.""" def test_oversized_history_returns_empty_list(self, tmp_path: pathlib.Path) -> None: from muse.core.test_history import load_history from muse.core.paths import test_history_path history_file = test_history_path(tmp_path) history_file.parent.mkdir(parents=True, exist_ok=True) history_file.write_bytes(_pack({"version": 1, "runs": []})) with patch.object(pathlib.Path, "stat") as mock_stat: mock_stat.return_value = MagicMock(st_size=MAX_MSGPACK_BYTES + 1) result = load_history(tmp_path) assert result == [] def test_corrupt_history_returns_empty_list(self, tmp_path: pathlib.Path) -> None: from muse.core.test_history import load_history from muse.core.paths import test_history_path history_file = test_history_path(tmp_path) history_file.parent.mkdir(parents=True, exist_ok=True) history_file.write_bytes(b"\xde\xad\xbe\xef garbage") result = load_history(tmp_path) assert result == [] # --------------------------------------------------------------------------- # 11. Callsite hardening — transport._decode # --------------------------------------------------------------------------- class TestTransportDecodeHardening: """_decode (static method on HttpTransport) enforces per-value limits.""" def test_empty_response_returns_empty_dict(self) -> None: from muse.core.transport import HttpTransport result = HttpTransport._decode(b"") assert result == {} def test_valid_msgpack_decodes(self) -> None: from muse.core.transport import HttpTransport payload = _pack({"status": "ok", "count": 42}) result = HttpTransport._decode(payload) assert result == {"status": "ok", "count": 42} def test_string_over_limit_raises_transport_error(self) -> None: from muse.core.transport import HttpTransport, TransportError big_str = "B" * (1_048_577) payload = _pack({"msg": big_str}) with pytest.raises(TransportError, match="invalid msgpack"): HttpTransport._decode(payload) def test_non_dict_top_level_returns_empty_dict(self) -> None: from muse.core.transport import HttpTransport payload = _pack([1, 2, 3]) result = HttpTransport._decode(payload) assert result == {} def test_invalid_msgpack_raises_transport_error(self) -> None: from muse.core.transport import HttpTransport, TransportError with pytest.raises(TransportError, match="invalid msgpack"): HttpTransport._decode(b"\xff\xfe garbage") # --------------------------------------------------------------------------- # 12. Callsite hardening — stage.read_stage # --------------------------------------------------------------------------- class TestReadStageHardening: """read_stage returns {} for oversized or corrupt staging index files.""" def test_oversized_stage_returns_empty(self, tmp_path: pathlib.Path) -> None: from muse.plugins.code.stage import read_stage, stage_path root = tmp_path sp = stage_path(root) sp.parent.mkdir(parents=True, exist_ok=True) sp.write_bytes(_pack({"version": 2, "entries": {}})) with patch.object(pathlib.Path, "stat") as mock_stat: mock_stat.return_value = MagicMock(st_size=MAX_MSGPACK_BYTES + 1) result = read_stage(root) assert result == {} def test_corrupt_stage_returns_empty(self, tmp_path: pathlib.Path) -> None: from muse.plugins.code.stage import read_stage, stage_path root = tmp_path sp = stage_path(root) sp.parent.mkdir(parents=True, exist_ok=True) sp.write_bytes(b"\xfe\xed garbage") result = read_stage(root) assert result == {} def test_non_dict_stage_returns_empty(self, tmp_path: pathlib.Path) -> None: from muse.plugins.code.stage import read_stage, stage_path root = tmp_path sp = stage_path(root) sp.parent.mkdir(parents=True, exist_ok=True) sp.write_bytes(_pack([1, 2, 3])) # list, not dict result = read_stage(root) assert result == {} # --------------------------------------------------------------------------- # 13. TypeGuard narrowing — mpack and unpack_objects # --------------------------------------------------------------------------- class TestTypeGuardNarrowing: """Non-dict entries in wire bundles are filtered, not propagated.""" def test_is_commit_dict_rejects_non_dicts(self) -> None: from muse.cli.commands.bundle import _is_commit_dict assert _is_commit_dict({}) is True assert _is_commit_dict({"commit_id": "abc"}) is True assert _is_commit_dict("string") is False assert _is_commit_dict(42) is False assert _is_commit_dict(None) is False assert _is_commit_dict([]) is False def test_is_snapshot_dict_rejects_non_dicts(self) -> None: from muse.cli.commands.bundle import _is_snapshot_dict assert _is_snapshot_dict({}) is True assert _is_snapshot_dict({"snapshot_id": "abc"}) is True assert _is_snapshot_dict("string") is False assert _is_snapshot_dict(42) is False assert _is_snapshot_dict(None) is False def test_unpack_objects_is_commit_dict(self) -> None: from muse.cli.commands.unpack_objects import _is_commit_dict assert _is_commit_dict({"commit_id": "abc"}) is True assert _is_commit_dict("not a dict") is False def test_unpack_objects_is_snapshot_dict(self) -> None: from muse.cli.commands.unpack_objects import _is_snapshot_dict assert _is_snapshot_dict({"snapshot_id": "abc"}) is True assert _is_snapshot_dict(99) is False def test_as_branch_heads_filters_non_str_values(self) -> None: from muse.cli.commands.unpack_objects import _as_branch_heads result = _as_branch_heads({"main": "abc123", "bad": 42, "ok": "def456"}) assert result == {"main": "abc123", "ok": "def456"} def test_as_branch_heads_non_dict_input(self) -> None: from muse.cli.commands.unpack_objects import _as_branch_heads assert _as_branch_heads(None) == {} assert _as_branch_heads("string") == {} assert _as_branch_heads([]) == {}