gabriel / muse public
test_identity_domain_schema.py python
148 lines 6.4 KB
Raw
1 """TDD — IdentityPlugin.schema() and protocol conformance.
2
3 Covers:
4 - schema() returns a valid DomainSchema
5 - domain field is "identity"
6 - Two dimensions: "identities" and "relationships"
7 - Both dimensions are independently mergeable
8 - "identities" dimension uses SetSchema keyed by handle
9 - "relationships" dimension uses SetSchema keyed by (from, edge_type, to)
10 - merge_mode is "three_way"
11 - Plugin satisfies MuseDomainPlugin protocol (runtime isinstance check)
12 - Plugin satisfies AddressedMergePlugin protocol (has merge_ops)
13 - Plugin satisfies HarmonyPlugin protocol (has conflict_fingerprint)
14 - drift() delegates to snapshot + diff correctly
15 - apply() returns live_state unchanged (post-processing no-op for now)
16 """
17 from __future__ import annotations
18
19 import pathlib
20
21 import pytest
22
23 from muse.domain import (
24 DriftReport,
25 LiveState,
26 MergeResult,
27 MuseDomainPlugin,
28 AddressedMergePlugin,
29 SnapshotManifest,
30 StateDelta,
31 StateSnapshot,
32 )
33 from muse.plugins.identity.plugin import IdentityPlugin
34 from muse.core.types import long_id
35
36
@pytest.fixture
def plugin() -> IdentityPlugin:
    """Construct the IdentityPlugin instance handed to the requesting test."""
    instance = IdentityPlugin()
    return instance
40
41
42 # ── DomainSchema ──────────────────────────────────────────────────────────────
43
class TestSchema:
    """Shape checks on the mapping returned by ``IdentityPlugin.schema()``."""

    def test_domain_is_identity(self, plugin: IdentityPlugin) -> None:
        """The ``domain`` entry equals ``"identity"``."""
        schema = plugin.schema()
        assert schema["domain"] == "identity"

    def test_description_non_empty(self, plugin: IdentityPlugin) -> None:
        """The ``description`` entry is truthy."""
        description = plugin.schema()["description"]
        assert description

    def test_has_two_dimensions(self, plugin: IdentityPlugin) -> None:
        """Exactly two dimensions are declared."""
        dimension_count = len(plugin.schema()["dimensions"])
        assert dimension_count == 2

    def test_identities_dimension_exists(self, plugin: IdentityPlugin) -> None:
        """One declared dimension is named ``"identities"``."""
        dimensions = plugin.schema()["dimensions"]
        declared = {entry["name"] for entry in dimensions}
        assert "identities" in declared

    def test_relationships_dimension_exists(self, plugin: IdentityPlugin) -> None:
        """One declared dimension is named ``"relationships"``."""
        dimensions = plugin.schema()["dimensions"]
        declared = {entry["name"] for entry in dimensions}
        assert "relationships" in declared

    def test_both_dimensions_independently_mergeable(self, plugin: IdentityPlugin) -> None:
        """Every declared dimension has ``independent_merge`` set to ``True``."""
        dimensions = plugin.schema()["dimensions"]
        for entry in dimensions:
            # Identity comparison on purpose: a merely truthy value must not pass.
            assert entry["independent_merge"] is True

    def test_identities_dimension_is_set_schema(self, plugin: IdentityPlugin) -> None:
        """The ``"identities"`` dimension's schema reports kind ``"set"``."""
        dimensions = plugin.schema()["dimensions"]
        # First match wins; next() raises StopIteration if no dimension matches.
        identities = next(entry for entry in dimensions if entry["name"] == "identities")
        kind = identities["schema"]["kind"]
        assert kind == "set"

    def test_relationships_dimension_is_set_schema(self, plugin: IdentityPlugin) -> None:
        """The ``"relationships"`` dimension's schema reports kind ``"set"``."""
        dimensions = plugin.schema()["dimensions"]
        # First match wins; next() raises StopIteration if no dimension matches.
        relationships = next(entry for entry in dimensions if entry["name"] == "relationships")
        kind = relationships["schema"]["kind"]
        assert kind == "set"

    def test_merge_mode_is_three_way(self, plugin: IdentityPlugin) -> None:
        """The ``merge_mode`` entry equals ``"three_way"``."""
        schema = plugin.schema()
        assert schema["merge_mode"] == "three_way"

    def test_schema_version_is_string(self, plugin: IdentityPlugin) -> None:
        """The ``schema_version`` entry is a ``str``."""
        version = plugin.schema()["schema_version"]
        assert isinstance(version, str)
80
81
82 # ── Protocol conformance ──────────────────────────────────────────────────────
83
class TestProtocolConformance:
    """Runtime checks of the attributes and protocols IdentityPlugin exposes."""

    def test_satisfies_muse_domain_plugin(self, plugin: IdentityPlugin) -> None:
        """``isinstance`` against ``MuseDomainPlugin`` succeeds at runtime."""
        conforms = isinstance(plugin, MuseDomainPlugin)
        assert conforms

    def test_satisfies_addressed_merge_plugin(self, plugin: IdentityPlugin) -> None:
        """``isinstance`` against ``AddressedMergePlugin`` succeeds at runtime."""
        conforms = isinstance(plugin, AddressedMergePlugin)
        assert conforms

    def test_has_conflict_fingerprint_harmony(self, plugin: IdentityPlugin) -> None:
        """The plugin has a callable ``conflict_fingerprint`` attribute."""
        # A missing attribute resolves to None, which is not callable.
        fingerprint = getattr(plugin, "conflict_fingerprint", None)
        assert callable(fingerprint)

    def test_has_all_six_required_methods(self, plugin: IdentityPlugin) -> None:
        """Each of the six named methods is present and callable."""
        required = ("snapshot", "diff", "merge", "drift", "apply", "schema")
        for method in required:
            attribute = getattr(plugin, method, None)
            assert callable(attribute), f"missing: {method}"

    def test_has_merge_ops(self, plugin: IdentityPlugin) -> None:
        """The plugin has a callable ``merge_ops`` attribute."""
        merge_ops = getattr(plugin, "merge_ops", None)
        assert callable(merge_ops)
100
101
102 # ── drift() ───────────────────────────────────────────────────────────────────
103
class TestDrift:
    """``drift()`` compared against a snapshot taken from the same directory."""

    @staticmethod
    def _write_identity(root: pathlib.Path, filename: str, payload: bytes) -> None:
        """Create ``root/identities`` and write *payload* to *filename* inside it."""
        identities_dir = root / "identities"
        # No exist_ok: the directory is expected not to exist yet.
        identities_dir.mkdir()
        (identities_dir / filename).write_bytes(payload)

    def test_no_drift_when_tree_matches_commit(self, plugin: IdentityPlugin, tmp_path: pathlib.Path) -> None:
        """No drift is reported when nothing changes after the snapshot."""
        committed = plugin.snapshot(tmp_path)
        outcome = plugin.drift(committed, tmp_path)
        assert outcome.has_drift is False

    def test_drift_detected_on_new_file(self, plugin: IdentityPlugin, tmp_path: pathlib.Path) -> None:
        """Drift is reported once a file is added after the snapshot."""
        committed = plugin.snapshot(tmp_path)
        self._write_identity(tmp_path, "gabriel.json", b'{"handle":"gabriel"}')
        outcome = plugin.drift(committed, tmp_path)
        assert outcome.has_drift is True

    def test_drift_report_has_summary(self, plugin: IdentityPlugin, tmp_path: pathlib.Path) -> None:
        """The drift report carries a ``summary`` attribute."""
        committed = plugin.snapshot(tmp_path)
        self._write_identity(tmp_path, "alice.json", b'{"handle":"alice"}')
        outcome = plugin.drift(committed, tmp_path)
        # NOTE(review): only presence is checked; the summary's content is not.
        assert hasattr(outcome, "summary")

    def test_no_drift_summary_indicates_clean(self, plugin: IdentityPlugin, tmp_path: pathlib.Path) -> None:
        """An unchanged tree yields a report whose ``has_drift`` is falsy."""
        committed = plugin.snapshot(tmp_path)
        outcome = plugin.drift(committed, tmp_path)
        # NOTE(review): the intent is that the summary conveys "nothing changed",
        # but only has_drift is asserted here; the summary text is not inspected.
        assert not outcome.has_drift
129
130
131 # ── apply() ───────────────────────────────────────────────────────────────────
132
class TestApply:
    """``apply()`` given a delta with no ops returns the live state it was passed."""

    def test_apply_returns_live_state(self, plugin: IdentityPlugin, tmp_path: pathlib.Path) -> None:
        """With a directory path as live state, the same path comes back."""
        # No snapshot is taken first: apply() receives only the delta and the
        # live state, so a snapshot result would be an unused local.
        delta: StateDelta = {"ops": [], "summary": "", "domain": "identity"}
        result = plugin.apply(delta, tmp_path)
        assert result == tmp_path

    def test_apply_with_manifest_returns_manifest(self, plugin: IdentityPlugin) -> None:
        """With a SnapshotManifest as live state, an equal manifest comes back."""
        manifest = SnapshotManifest(
            files={"identities/alice.json": long_id("a" * 64)},
            domain="identity",
            directories=[],
        )
        delta: StateDelta = {"ops": [], "summary": "", "domain": "identity"}
        result = plugin.apply(delta, manifest)
        assert result == manifest
File History 1 commit