"""Seven-tier tests for ``muse/core/schema.py``. All types are TypedDicts — no runtime logic, so the tiers focus on: Unit — field presence/types on every TypedDict, Literal constraints. Integration — ElementSchema union membership, MapSchema recursive nesting, DomainSchema round-trips through json.dumps / json.loads. End-to-end — schema instances accepted by functions that consume DomainSchema. Stress — 10 000 construction cycles; deeply-nested MapSchema. Data integrity — field values survive JSON round-trip unchanged. Security — hostile strings in str fields do not cause crashes. Performance — 10 000 constructions under 1 s. """ from __future__ import annotations import json import time import typing from collections.abc import Mapping import pytest type _KwVal = str | int | float | bool | None # ────────────────────────────────────────────────────────────────────────────── # Helpers # ────────────────────────────────────────────────────────────────────────────── def _seq(**kw: _KwVal) -> Mapping[str, object]: base = dict( kind="sequence", element_type="note", identity="by_position", diff_algorithm="lcs", alphabet=None, ) base.update(kw) return base def _tree(**kw: _KwVal) -> Mapping[str, object]: base = dict(kind="tree", node_type="ast_node", diff_algorithm="zhang_shasha") base.update(kw) return base def _tensor(**kw: _KwVal) -> Mapping[str, object]: base = dict( kind="tensor", dtype="float32", rank=2, epsilon=1e-6, diff_mode="sparse" ) base.update(kw) return base def _set(**kw: _KwVal) -> Mapping[str, object]: base = dict(kind="set", element_type="file_id", identity="by_id") base.update(kw) return base def _map(value_schema: Mapping[str, object] | None = None, **kw: _KwVal) -> Mapping[str, object]: base = dict( kind="map", key_type="str", value_schema=value_schema or _seq(), identity="by_key", ) base.update(kw) return base def _dim(**kw: _KwVal) -> Mapping[str, object]: base = dict( name="notes", description="MIDI note events", schema=_seq(), 
independent_merge=True, ) base.update(kw) return base def _crdt_dim(**kw: _KwVal) -> Mapping[str, object]: base = dict( name="tempo", description="BPM value", crdt_type="lww_register", independent_merge=True, ) base.update(kw) return base def _domain(**kw: _KwVal) -> Mapping[str, object]: base = dict( domain="midi", description="MIDI music domain", dimensions=[_dim()], top_level=_set(), merge_mode="three_way", schema_version="0.1.0", ) base.update(kw) return base # ────────────────────────────────────────────────────────────────────────────── # Unit — SequenceSchema # ────────────────────────────────────────────────────────────────────────────── class TestSequenceSchema: def test_required_keys(self) -> None: from muse.core.schema import SequenceSchema hints = typing.get_type_hints(SequenceSchema) assert {"kind", "element_type", "identity", "diff_algorithm", "alphabet"} <= set(hints) def test_kind_literal_is_sequence(self) -> None: from muse.core.schema import SequenceSchema hints = typing.get_type_hints(SequenceSchema) args = typing.get_args(hints["kind"]) assert "sequence" in args def test_valid_diff_algorithms(self) -> None: from muse.core.schema import SequenceSchema hints = typing.get_type_hints(SequenceSchema) algos = set(typing.get_args(hints["diff_algorithm"])) assert algos == {"lcs", "myers", "patience"} def test_valid_identity_values(self) -> None: from muse.core.schema import SequenceSchema hints = typing.get_type_hints(SequenceSchema) ids = set(typing.get_args(hints["identity"])) assert ids == {"by_id", "by_position", "by_content"} def test_alphabet_is_optional_list(self) -> None: from muse.core.schema import SequenceSchema hints = typing.get_type_hints(SequenceSchema) # Should be list[str] | None args = typing.get_args(hints["alphabet"]) assert type(None) in args # ────────────────────────────────────────────────────────────────────────────── # Unit — TreeSchema # ────────────────────────────────────────────────────────────────────────────── class 
TestTreeSchema: def test_required_keys(self) -> None: from muse.core.schema import TreeSchema hints = typing.get_type_hints(TreeSchema) assert {"kind", "node_type", "diff_algorithm"} <= set(hints) def test_valid_diff_algorithms(self) -> None: from muse.core.schema import TreeSchema hints = typing.get_type_hints(TreeSchema) algos = set(typing.get_args(hints["diff_algorithm"])) assert algos == {"zhang_shasha", "gumtree"} # ────────────────────────────────────────────────────────────────────────────── # Unit — TensorSchema # ────────────────────────────────────────────────────────────────────────────── class TestTensorSchema: def test_required_keys(self) -> None: from muse.core.schema import TensorSchema hints = typing.get_type_hints(TensorSchema) assert {"kind", "dtype", "rank", "epsilon", "diff_mode"} <= set(hints) def test_valid_dtypes(self) -> None: from muse.core.schema import TensorSchema hints = typing.get_type_hints(TensorSchema) dtypes = set(typing.get_args(hints["dtype"])) assert dtypes == {"float32", "float64", "int8", "int16", "int32", "int64"} def test_valid_diff_modes(self) -> None: from muse.core.schema import TensorSchema hints = typing.get_type_hints(TensorSchema) modes = set(typing.get_args(hints["diff_mode"])) assert modes == {"sparse", "block", "full"} # ────────────────────────────────────────────────────────────────────────────── # Unit — SetSchema # ────────────────────────────────────────────────────────────────────────────── class TestSetSchema: def test_required_keys(self) -> None: from muse.core.schema import SetSchema hints = typing.get_type_hints(SetSchema) assert {"kind", "element_type", "identity"} <= set(hints) def test_valid_identity_values(self) -> None: from muse.core.schema import SetSchema hints = typing.get_type_hints(SetSchema) ids = set(typing.get_args(hints["identity"])) assert ids == {"by_content", "by_id"} # ────────────────────────────────────────────────────────────────────────────── # Unit — MapSchema # 
# ──────────────────────────────────────────────────────────────────────────────


class TestMapSchema:
    """Type-hint checks for ``MapSchema``."""

    def test_required_keys(self) -> None:
        from muse.core.schema import MapSchema

        expected = {"kind", "key_type", "value_schema", "identity"}
        assert expected.issubset(typing.get_type_hints(MapSchema))

    def test_identity_is_by_key(self) -> None:
        from muse.core.schema import MapSchema

        identity_hint = typing.get_type_hints(MapSchema)["identity"]
        assert "by_key" in typing.get_args(identity_hint)


# ──────────────────────────────────────────────────────────────────────────────
# Unit — DimensionSpec
# ──────────────────────────────────────────────────────────────────────────────


class TestDimensionSpec:
    """Type-hint checks for ``DimensionSpec``."""

    def test_required_keys(self) -> None:
        from muse.core.schema import DimensionSpec

        expected = {"name", "description", "schema", "independent_merge"}
        assert expected.issubset(typing.get_type_hints(DimensionSpec))

    def test_independent_merge_is_bool(self) -> None:
        from muse.core.schema import DimensionSpec

        resolved = typing.get_type_hints(DimensionSpec)
        assert resolved["independent_merge"] is bool


# ──────────────────────────────────────────────────────────────────────────────
# Unit — CRDTDimensionSpec
# ──────────────────────────────────────────────────────────────────────────────


class TestCRDTDimensionSpec:
    """Type-hint checks for ``CRDTDimensionSpec`` and ``CRDTPrimitive``."""

    def test_required_keys(self) -> None:
        from muse.core.schema import CRDTDimensionSpec

        expected = {"name", "description", "crdt_type", "independent_merge"}
        assert expected.issubset(typing.get_type_hints(CRDTDimensionSpec))

    def test_valid_crdt_types(self) -> None:
        from muse.core.schema import CRDTPrimitive

        primitives = set(typing.get_args(CRDTPrimitive))
        assert primitives == {
            "lww_register",
            "or_set",
            "rga",
            "aw_map",
            "g_counter",
        }


# ──────────────────────────────────────────────────────────────────────────────
# Unit — DomainSchema
# ──────────────────────────────────────────────────────────────────────────────


class TestDomainSchema:
    """Type-hint checks for ``DomainSchema``."""

    def test_required_keys(self) -> None:
        from muse.core.schema import DomainSchema

        expected = {
            "domain",
            "description",
            "dimensions",
            "top_level",
            "merge_mode",
            "schema_version",
        }
        assert expected.issubset(typing.get_type_hints(DomainSchema))

    def test_valid_merge_modes(self) -> None:
        from muse.core.schema import DomainSchema

        merge_hint = typing.get_type_hints(DomainSchema)["merge_mode"]
        assert set(typing.get_args(merge_hint)) == {"three_way", "crdt"}


# ──────────────────────────────────────────────────────────────────────────────
# Integration — ElementSchema union, recursive nesting, JSON round-trip
# ──────────────────────────────────────────────────────────────────────────────


class TestIntegration:
    """Union membership, nesting and JSON round-trips of helper-built dicts."""

    def test_element_schema_includes_all_five_types(self) -> None:
        from muse.core.schema import (
            ElementSchema,
            MapSchema,
            SequenceSchema,
            SetSchema,
            TensorSchema,
            TreeSchema,
        )

        union_members = typing.get_args(ElementSchema)
        # Checked in the same order as before so the first failure is the same.
        for schema_type in (
            SequenceSchema,
            TreeSchema,
            TensorSchema,
            SetSchema,
            MapSchema,
        ):
            assert schema_type in union_members

    def test_map_schema_recursive_nesting(self) -> None:
        """MapSchema.value_schema can itself be a MapSchema — recursive."""
        nested = _map(value_schema=_map(value_schema=_seq()))
        # Serialising must succeed; the result itself is not inspected.
        json.dumps(nested)

    def test_domain_schema_json_round_trip(self) -> None:
        original = _domain()
        encoded = json.dumps(original)
        assert json.loads(encoded) == original

    def test_dimension_spec_json_round_trip(self) -> None:
        original = _dim()
        restored = json.loads(json.dumps(original))
        assert restored == original

    def test_crdt_dimension_spec_json_round_trip(self) -> None:
        original = _crdt_dim()
        restored = json.loads(json.dumps(original))
        assert restored == original

    def test_all_element_schema_types_json_serialisable(self) -> None:
        samples = (_seq(), _tree(), _tensor(), _set(), _map())
        for sample in samples:
            json.dumps(sample)  # must not raise

    def test_domain_with_crdt_merge_mode(self) -> None:
        restored = json.loads(json.dumps(_domain(merge_mode="crdt")))
        assert restored["merge_mode"] == "crdt"

    def test_multiple_dimensions_in_domain(self) -> None:
        two_dims = [_dim(name="notes"), _dim(name="tempo")]
        restored = json.loads(json.dumps(_domain(dimensions=two_dims)))
        assert len(restored["dimensions"]) == 2


# ──────────────────────────────────────────────────────────────────────────────
# End-to-end — schema used as plugin contract
# ──────────────────────────────────────────────────────────────────────────────


class TestEndToEnd:
    """Import smoke tests plus JSON output checks on helper-built dicts."""

    def test_schema_importable_from_public_path(self) -> None:
        from muse.core.schema import DomainSchema  # noqa: F401

    def test_element_schema_importable(self) -> None:
        from muse.core.schema import ElementSchema  # noqa: F401

    def test_crdt_primitive_importable(self) -> None:
        from muse.core.schema import CRDTPrimitive  # noqa: F401

    def test_domain_schema_dict_passable_to_json_dumps(self) -> None:
        rendered = json.dumps(_domain(), sort_keys=True)
        assert '"domain": "midi"' in rendered

    def test_sequence_schema_with_alphabet(self) -> None:
        restored = json.loads(
            json.dumps(_seq(alphabet=["C", "D", "E", "F", "G", "A", "B"]))
        )
        assert restored["alphabet"] == ["C", "D", "E", "F", "G", "A", "B"]


# ──────────────────────────────────────────────────────────────────────────────
# Stress
# ──────────────────────────────────────────────────────────────────────────────


class TestStress:
    """High-volume and deep-nesting construction of helper-built dicts."""

    def test_10000_domain_schema_constructions(self) -> None:
        for index in range(10_000):
            built = _domain(
                domain=f"domain_{index}", schema_version=f"0.{index}.0"
            )
            assert built["domain"] == f"domain_{index}"

    def test_deeply_nested_map_schema(self) -> None:
        """MapSchema.value_schema is recursive — 50 levels deep must not crash."""
        nested = _seq()
        for _level in range(50):
            nested = _map(value_schema=nested)
        # Serialisation has to succeed at this depth; output is not inspected.
        json.dumps(nested)

    def test_domain_with_100_dimensions(self) -> None:
        wide = _domain(dimensions=[_dim(name=f"dim_{i}") for i in range(100)])
        restored = json.loads(json.dumps(wide))
        assert len(restored["dimensions"]) == 100


# ──────────────────────────────────────────────────────────────────────────────
# Data integrity
# ──────────────────────────────────────────────────────────────────────────────


class TestDataIntegrity:
    """Individual field values are unchanged by a JSON round-trip."""

    def test_tensor_epsilon_survives_json_round_trip(self) -> None:
        restored = json.loads(json.dumps(_tensor(epsilon=1e-9)))
        assert abs(restored["epsilon"] - 1e-9) < 1e-20

    def test_tensor_rank_survives_json_round_trip(self) -> None:
        restored = json.loads(json.dumps(_tensor(rank=4)))
        assert restored["rank"] == 4

    def test_independent_merge_bool_survives_round_trip(self) -> None:
        restored = json.loads(json.dumps(_dim(independent_merge=False)))
        assert restored["independent_merge"] is False

    def test_domain_schema_version_string_preserved(self) -> None:
        restored = json.loads(json.dumps(_domain(schema_version="1.2.3")))
        assert restored["schema_version"] == "1.2.3"

    def test_set_element_type_preserved(self) -> None:
        restored = json.loads(json.dumps(_set(element_type="track_id")))
        assert restored["element_type"] == "track_id"


# ──────────────────────────────────────────────────────────────────────────────
# Security
# ──────────────────────────────────────────────────────────────────────────────


class TestSecurity:
    """Hostile or unusual strings come back unchanged from a JSON round-trip."""

    def test_hostile_string_in_domain_name_survives_json(self) -> None:
        payload = '"; DROP TABLE domains; --'
        restored = json.loads(json.dumps(_domain(domain=payload)))
        assert restored["domain"] == payload

    def test_ansi_in_description_survives_json(self) -> None:
        payload = "\x1b[31mmalicious\x1b[0m"
        restored = json.loads(json.dumps(_domain(description=payload)))
        assert restored["description"] == payload

    def test_null_byte_in_element_type_survives_json(self) -> None:
        restored = json.loads(json.dumps(_seq(element_type="note\x00malicious")))
        assert restored["element_type"] == "note\x00malicious"

    def test_unicode_in_dimension_name_survives_json(self) -> None:
        restored = json.loads(json.dumps(_dim(name="音符")))
        assert restored["name"] == "音符"


# ──────────────────────────────────────────────────────────────────────────────
# Performance
# ──────────────────────────────────────────────────────────────────────────────


class TestPerformance:
    """Wall-clock ceilings for bulk construction and JSON round-trips."""

    def test_10000_constructions_under_1s(self) -> None:
        began = time.perf_counter()
        for index in range(10_000):
            _domain(schema_version=f"0.{index}.0")
        took = time.perf_counter() - began
        assert took < 1.0

    def test_json_round_trip_10000_times_under_2s(self) -> None:
        sample = _domain()
        began = time.perf_counter()
        for _ in range(10_000):
            json.loads(json.dumps(sample))
        took = time.perf_counter() - began
        assert took < 2.0