gabriel / musehub public
test_identity_push_validator.py python
344 lines 15.8 KB
Raw
sha256:7d6dd8f4a89e2d1fef2d84f6e65feaff51385d382f466766b7f690a22ec18e32 fix: fall back to DB ancestry check when mpack-only fast-fo… Sonnet 4.6 patch 7 days ago
1 """TDD — IdentityPushValidator.
2
3 Hub-side enforcement layer for identity-domain pushes.
4 Called by wire_push before any objects are persisted.
5 Enforces all three invariants against the full committed state.
6
7 Invariants
8 ----------
9 I1 Acyclicity — hard error → push rejected
10 I2 Root distance — warning → push accepted, node annotated as orphaned
11 I3 Authorization — hard error → push rejected
12
13 Authorization rules
14 -------------------
15 spawns(from, to) → from_handle must appear in authorized_by
16 member_of(member, org) → quorum-many CURRENT members of org must appear in authorized_by
17 "current member" = another identity with a member_of edge to the same org
18 The org's quorum threshold comes from its IdentityRecord.quorum field.
19 """
20 from __future__ import annotations
21
22 from decimal import Decimal
23
24 import pytest
25 from musehub.types.json_types import JSONObject
26
27 from musehub.graph.push_validator import (
28 IdentityPushValidator,
29 ValidationResult,
30 )
31
32
33 # ── record factories ──────────────────────────────────────────────────────────
34
def human(handle: str) -> JSONObject:
    """Build a minimal identity record of type ``"human"`` for *handle*.

    The record carries a fixed placeholder pubkey, no quorum, a fixed
    registration timestamp and an empty metadata mapping.
    """
    return {
        "handle": handle,
        "type": "human",
        "pubkey": "ed25519:AAAA",
        "quorum": None,
        "registered_at": "2026-04-21T00:00:00Z",
        "metadata": {},
    }
38
39
def agent(handle: str) -> JSONObject:
    """Build a minimal identity record of type ``"agent"`` for *handle*.

    The record carries a fixed placeholder pubkey, no quorum, a fixed
    registration timestamp and an empty metadata mapping.
    """
    return {
        "handle": handle,
        "type": "agent",
        "pubkey": "ed25519:BBBB",
        "quorum": None,
        "registered_at": "2026-04-21T00:00:00Z",
        "metadata": {},
    }
43
44
def org(handle: str, quorum: int = 1) -> JSONObject:
    """Build a minimal identity record of type ``"org"`` for *handle*.

    Orgs are created without a pubkey; *quorum* is stored verbatim in the
    record's ``quorum`` field and defaults to 1.
    """
    return {
        "handle": handle,
        "type": "org",
        "pubkey": None,
        "quorum": quorum,
        "registered_at": "2026-04-21T00:00:00Z",
        "metadata": {},
    }
48
49
def spawns(frm: str, to: str, *signers: str) -> JSONObject:
    """Build a ``spawns`` edge from *frm* to *to*.

    Every handle in *signers* becomes one ``authorized_by`` entry with a
    placeholder signature and a fixed timestamp; passing no signers yields
    an empty ``authorized_by`` list. Spawn edges carry no weight.
    """
    authorizations = [
        {
            "signer": handle,
            "signature": "ed25519:SIG",
            "signed_at": "2026-04-21T00:00:00Z",
        }
        for handle in signers
    ]
    return {
        "from_handle": frm,
        "to_handle": to,
        "edge_type": "spawns",
        "weight": None,
        "authorized_by": authorizations,
    }
57
58
def member_of(member: str, org_handle: str, weight: str = "1", *signers: str) -> JSONObject:
    """Build a ``member_of`` edge from *member* to *org_handle*.

    *weight* is stored verbatim (it is passed as a string and defaults to
    ``"1"``). Because *signers* follows *weight* positionally, callers that
    want to list signers must pass the weight explicitly first. Every
    handle in *signers* becomes one ``authorized_by`` entry with a
    placeholder signature and a fixed timestamp.
    """
    authorizations = [
        {
            "signer": handle,
            "signature": "ed25519:SIG",
            "signed_at": "2026-04-21T00:00:00Z",
        }
        for handle in signers
    ]
    return {
        "from_handle": member,
        "to_handle": org_handle,
        "edge_type": "member_of",
        "weight": weight,
        "authorized_by": authorizations,
    }
66
67
@pytest.fixture
def v() -> IdentityPushValidator:
    """Provide a newly constructed ``IdentityPushValidator`` to tests that request ``v``."""
    validator = IdentityPushValidator()
    return validator
71
72
73 # ── empty / trivial ───────────────────────────────────────────────────────────
74
class TestTrivial:
    """Degenerate graphs: no nodes at all, or a single node with no edges."""

    def test_empty_graph_is_valid(self, v: IdentityPushValidator) -> None:
        """Validating nothing is expected to succeed with an empty error list."""
        outcome = v.validate([], [])
        assert outcome.valid is True
        assert outcome.errors == []

    def test_single_human_is_valid(self, v: IdentityPushValidator) -> None:
        """A lone human with no relationships is expected to be accepted."""
        outcome = v.validate([human("gabriel")], [])
        assert outcome.valid is True

    def test_single_org_no_members_warning(self, v: IdentityPushValidator) -> None:
        """A member-less org is accepted, but a warning must mention its handle."""
        outcome = v.validate([org("acme")], [])
        # The org has no path to a human root; that is a warning, not an error.
        assert outcome.valid is True
        assert any("acme" in warning for warning in outcome.warnings)

    def test_orphaned_agent_warning(self, v: IdentityPushValidator) -> None:
        """An agent nobody spawned is accepted, but a warning must mention its handle."""
        outcome = v.validate([agent("bot")], [])
        assert outcome.valid is True
        assert any("bot" in warning for warning in outcome.warnings)
95
96
97 # ── I1 Acyclicity ─────────────────────────────────────────────────────────────
98
class TestI1Acyclicity:
    """I1: a cycle anywhere in the pushed edge set must reject the push.

    Every rejection test also checks that an I1/cycle error is present.
    Several of the cyclic fixtures below carry missing or mismatched
    signatures, so asserting only ``valid is False`` would also pass when
    the push is rejected purely on authorization (I3) grounds.
    """

    @staticmethod
    def _reports_cycle(result: ValidationResult) -> bool:
        """Return True when some error mentions a cycle or the I1 invariant."""
        return any("cycle" in e.lower() or "I1" in e for e in result.errors)

    def test_linear_chain_valid(self, v: IdentityPushValidator) -> None:
        """h -> a1 -> a2, each edge signed by its spawner, is acyclic."""
        identities = [human("h"), agent("a1"), agent("a2")]
        rels = [spawns("h", "a1", "h"), spawns("a1", "a2", "a1")]
        assert v.validate(identities, rels).valid is True

    def test_self_loop_rejected(self, v: IdentityPushValidator) -> None:
        """An identity spawning itself is the smallest possible cycle."""
        identities = [agent("bot")]
        rels = [spawns("bot", "bot", "bot")]
        result = v.validate(identities, rels)
        assert result.valid is False
        assert self._reports_cycle(result)

    def test_direct_cycle_rejected(self, v: IdentityPushValidator) -> None:
        """alice -> bot -> alice is a two-node cycle."""
        identities = [human("alice"), agent("bot")]
        # The return edge is signed by alice rather than bot, so the cycle
        # assertion is what ties this rejection to I1.
        rels = [spawns("alice", "bot", "alice"), spawns("bot", "alice", "alice")]
        result = v.validate(identities, rels)
        assert result.valid is False
        assert self._reports_cycle(result)

    def test_indirect_cycle_three_nodes_rejected(self, v: IdentityPushValidator) -> None:
        """a -> b -> c -> a, every edge signed by its spawner, is still a cycle."""
        identities = [agent("a"), agent("b"), agent("c")]
        rels = [spawns("a", "b", "a"), spawns("b", "c", "b"), spawns("c", "a", "c")]
        result = v.validate(identities, rels)
        assert result.valid is False
        assert self._reports_cycle(result)

    def test_diamond_dag_valid(self, v: IdentityPushValidator) -> None:
        """Two paths converging on one node form a DAG, not a cycle."""
        # alice->a1, alice->a2, a1->target, a2->target
        identities = [human("alice"), agent("a1"), agent("a2"), agent("target")]
        rels = [
            spawns("alice", "a1", "alice"),
            spawns("alice", "a2", "alice"),
            spawns("a1", "target", "a1"),
            spawns("a2", "target", "a2"),
        ]
        assert v.validate(identities, rels).valid is True

    def test_member_of_cycle_rejected(self, v: IdentityPushValidator) -> None:
        """Two orgs that are members of each other form a cycle."""
        identities = [org("org-a"), org("org-b")]
        # Both edges are unsigned; the cycle assertion pins the I1 failure.
        rels = [member_of("org-a", "org-b", "1"), member_of("org-b", "org-a", "1")]
        result = v.validate(identities, rels)
        assert result.valid is False
        assert self._reports_cycle(result)

    def test_cross_edge_type_cycle_rejected(self, v: IdentityPushValidator) -> None:
        """A cycle that mixes edge types must still be detected."""
        # alice --spawns--> bot --member_of--> alice (cross-type cycle)
        identities = [human("alice"), agent("bot")]
        rels = [spawns("alice", "bot", "alice"), member_of("bot", "alice", "1")]
        result = v.validate(identities, rels)
        assert result.valid is False
        assert self._reports_cycle(result)
148
149
150 # ── I2 Root distance ──────────────────────────────────────────────────────────
151
class TestI2RootDistance:
    """I2: identities with no path to a human root produce a warning, not an error."""

    def test_human_no_warning(self, v: IdentityPushValidator) -> None:
        """A human is its own root, so no warning is expected."""
        result = v.validate([human("gabriel")], [])
        assert result.warnings == []

    def test_agent_spawned_by_human_no_warning(self, v: IdentityPushValidator) -> None:
        """An agent spawned directly by a human must not be warned about."""
        identities = [human("gabriel"), agent("bot")]
        rels = [spawns("gabriel", "bot", "gabriel")]
        result = v.validate(identities, rels)
        assert not any("bot" in w for w in result.warnings)

    def test_agent_chain_no_warning(self, v: IdentityPushValidator) -> None:
        """Every agent in h -> a1 -> a2 -> a3 reaches the human root."""
        identities = [human("h"), agent("a1"), agent("a2"), agent("a3")]
        # Each edge is signed by its own spawner so the graph also satisfies
        # I3; this keeps the test about root distance only.
        rels = [
            spawns("h", "a1", "h"),
            spawns("a1", "a2", "a1"),
            spawns("a2", "a3", "a2"),
        ]
        assert v.validate(identities, rels).warnings == []

    def test_orphaned_agent_warned(self, v: IdentityPushValidator) -> None:
        """An agent with no spawner must be named in a warning."""
        result = v.validate([agent("bot")], [])
        assert any("bot" in w for w in result.warnings)

    def test_org_with_human_member_no_warning(self, v: IdentityPushValidator) -> None:
        """An org with a human member must not be warned about."""
        identities = [human("alice"), org("acme")]
        rels = [member_of("alice", "acme", "1", "alice")]
        result = v.validate(identities, rels)
        assert not any("acme" in w for w in result.warnings)

    def test_org_with_no_human_path_warned(self, v: IdentityPushValidator) -> None:
        """An org with no members at all must be named in a warning."""
        result = v.validate([org("acme")], [])
        assert any("acme" in w for w in result.warnings)

    def test_nested_org_with_human_root_no_warning(self, v: IdentityPushValidator) -> None:
        """alice -> sub -> parent: the outer org reaches a human through the inner one."""
        identities = [human("alice"), org("sub"), org("parent")]
        # NOTE(review): the sub -> parent edge is signed by alice, who is not
        # a member of "parent". Only warnings are asserted here, so the I3
        # outcome of this graph is not checked - confirm that is intended.
        rels = [
            member_of("alice", "sub", "1", "alice"),
            member_of("sub", "parent", "1", "alice"),
        ]
        result = v.validate(identities, rels)
        assert not any("parent" in w for w in result.warnings)
194
195
196 # ── I3 Authorization — spawns ─────────────────────────────────────────────────
197
class TestI3AuthSpawns:
    """I3 for ``spawns`` edges: the spawning identity must be among the signers."""

    def test_spawner_signature_present_valid(self, v: IdentityPushValidator) -> None:
        """gabriel spawns bot and signs the edge himself."""
        nodes = [human("gabriel"), agent("bot")]
        edges = [spawns("gabriel", "bot", "gabriel")]
        outcome = v.validate(nodes, edges)
        assert outcome.valid is True

    def test_spawner_signature_absent_rejected(self, v: IdentityPushValidator) -> None:
        """A spawn edge with no signatures must be rejected with an auth-related error."""
        nodes = [human("gabriel"), agent("bot")]
        edges = [spawns("gabriel", "bot")]  # empty authorized_by
        outcome = v.validate(nodes, edges)
        assert outcome.valid is False
        assert any(
            "authorized" in err.lower() or "signature" in err.lower() or "I3" in err
            for err in outcome.errors
        )

    def test_wrong_signer_rejected(self, v: IdentityPushValidator) -> None:
        """alice cannot authorize a spawn whose spawner is gabriel."""
        nodes = [human("gabriel"), human("alice"), agent("bot")]
        edges = [spawns("gabriel", "bot", "alice")]
        outcome = v.validate(nodes, edges)
        assert outcome.valid is False

    def test_extra_signers_ok_as_long_as_spawner_present(self, v: IdentityPushValidator) -> None:
        """Additional signers are harmless when the spawner has also signed."""
        nodes = [human("gabriel"), human("alice"), agent("bot")]
        edges = [spawns("gabriel", "bot", "gabriel", "alice")]
        outcome = v.validate(nodes, edges)
        assert outcome.valid is True

    def test_agent_spawning_agent_authorized_by_original_spawner(self, v: IdentityPushValidator) -> None:
        """In gabriel -> bot-1 -> bot-2, each edge is signed by its own spawner.

        Despite the test name, the bot-2 spawn is signed by bot-1 (the
        identity doing the spawning), not by gabriel.
        """
        nodes = [human("gabriel"), agent("bot-1"), agent("bot-2")]
        edges = [
            spawns("gabriel", "bot-1", "gabriel"),
            spawns("bot-1", "bot-2", "bot-1"),
        ]
        outcome = v.validate(nodes, edges)
        assert outcome.valid is True

    def test_agent_spawn_wrong_signer_rejected(self, v: IdentityPushValidator) -> None:
        """gabriel's signature cannot stand in for bot-1 on bot-1's own spawn."""
        nodes = [human("gabriel"), agent("bot-1"), agent("bot-2")]
        edges = [
            spawns("gabriel", "bot-1", "gabriel"),
            # Signed by gabriel, but the spawner of this edge is bot-1.
            spawns("bot-1", "bot-2", "gabriel"),
        ]
        outcome = v.validate(nodes, edges)
        assert outcome.valid is False
241
242
243 # ── I3 Authorization — member_of ─────────────────────────────────────────────
244
class TestI3AuthMemberOf:
    """I3 for ``member_of`` edges: joins need signatures from current org members."""

    def test_first_member_self_authorized_valid(self, v: IdentityPushValidator) -> None:
        """With no prior members, the first joiner's own signature is expected to suffice."""
        nodes = [human("alice"), org("acme", quorum=1)]
        edges = [member_of("alice", "acme", "1", "alice")]
        outcome = v.validate(nodes, edges)
        assert outcome.valid is True

    def test_membership_requires_quorum_of_current_members(self, v: IdentityPushValidator) -> None:
        """carol joins a quorum=2 org with signatures from both existing members."""
        nodes = [human("alice"), human("bob"), human("carol"), org("acme", quorum=2)]
        edges = [
            member_of("alice", "acme", "1", "alice"),
            member_of("bob", "acme", "1", "alice"),  # alice authorized bob
            member_of("carol", "acme", "1", "alice", "bob"),  # alice + bob meet quorum=2
        ]
        outcome = v.validate(nodes, edges)
        assert outcome.valid is True

    def test_membership_insufficient_signatures_rejected(self, v: IdentityPushValidator) -> None:
        """carol's join carries one member signature where quorum=2 needs two."""
        nodes = [human("alice"), human("bob"), human("carol"), org("acme", quorum=2)]
        edges = [
            member_of("alice", "acme", "1", "alice"),
            member_of("bob", "acme", "1", "alice"),
            member_of("carol", "acme", "1", "alice"),  # only alice signed
        ]
        outcome = v.validate(nodes, edges)
        assert outcome.valid is False

    def test_non_member_signature_does_not_count(self, v: IdentityPushValidator) -> None:
        """A joiner's own signature must not count once the org already has a member."""
        nodes = [human("alice"), human("dave"), org("acme", quorum=1)]
        edges = [
            member_of("alice", "acme", "1", "alice"),
            # dave signs his own join, but he is not yet a member; alice is
            # already in the org, so her signature is the one required.
            member_of("dave", "acme", "1", "dave"),
        ]
        outcome = v.validate(nodes, edges)
        assert outcome.valid is False

    def test_no_signatures_rejected(self, v: IdentityPushValidator) -> None:
        """An unsigned membership edge must be rejected, even for the first member."""
        nodes = [human("alice"), org("acme", quorum=1)]
        edges = [member_of("alice", "acme", "1")]  # empty authorized_by
        outcome = v.validate(nodes, edges)
        assert outcome.valid is False

    def test_quorum_1_founder_joins_empty_org_valid(self, v: IdentityPushValidator) -> None:
        """The founder of an empty quorum=1 org may self-authorize."""
        nodes = [human("gabriel"), org("musehub-org", quorum=1)]
        edges = [member_of("gabriel", "musehub-org", "1", "gabriel")]
        outcome = v.validate(nodes, edges)
        assert outcome.valid is True
296
297
298 # ── Combined scenarios ────────────────────────────────────────────────────────
299
class TestCombinedScenarios:
    """Scenarios that exercise several invariants in one push."""

    def test_full_graph_gabriel_claudecode_musehub(self, v: IdentityPushValidator) -> None:
        """The demo scenario from the experiment: all valid.

        Order matters for I3 ordered processing:
        1. gabriel joins graph-lab first (founder, self-signs)
        2. claude-code joins (1 prior = gabriel, min(2,1)=1 -> gabriel signs)
        3. musehub-org joins (2 prior = {gabriel, claude-code}, min(2,2)=2 -> both sign)
        """
        identities = [
            human("gabriel"),
            agent("claude-code"),
            org("musehub-org", quorum=1),
            org("graph-lab", quorum=2),
        ]
        rels = [
            spawns("gabriel", "claude-code", "gabriel"),
            member_of("gabriel", "musehub-org", "1", "gabriel"),
            member_of("gabriel", "graph-lab", "1", "gabriel"),
            member_of("claude-code", "graph-lab", "1", "gabriel"),
            member_of("musehub-org", "graph-lab", "1", "gabriel", "claude-code"),
        ]
        result = v.validate(identities, rels)
        assert result.valid is True
        assert result.errors == []

    def test_i1_error_does_not_suppress_i3_errors(self, v: IdentityPushValidator) -> None:
        """A cycle and a missing signature in one push must both be reported."""
        identities = [human("alice"), agent("bot")]
        rels = [
            spawns("alice", "bot"),  # I3: no signature
            spawns("bot", "alice", "bot"),  # I1: closes the cycle
        ]
        result = v.validate(identities, rels)
        assert result.valid is False
        assert len(result.errors) >= 2

    def test_warnings_do_not_make_result_invalid(self, v: IdentityPushValidator) -> None:
        """An orphaned agent beside a valid human yields a warning, not a rejection."""
        # No edges at all: "orphan" has no spawner and therefore no human root.
        identities = [human("gabriel"), agent("orphan")]
        result = v.validate(identities, [])
        assert result.valid is True
        # The warning must name the orphaned identity.
        assert any("orphan" in w for w in result.warnings)
File History 1 commit
sha256:7d6dd8f4a89e2d1fef2d84f6e65feaff51385d382f466766b7f690a22ec18e32 fix: fall back to DB ancestry check when mpack-only fast-fo… Sonnet 4.6 patch 7 days ago