"""TDD — IdentityPushValidator. Hub-side enforcement layer for identity-domain pushes. Called by wire_push before any objects are persisted. Enforces all three invariants against the full committed state. Invariants ---------- I1 Acyclicity — hard error → push rejected I2 Root distance — warning → push accepted, node annotated as orphaned I3 Authorization — hard error → push rejected Authorization rules ------------------- spawns(from, to) → from_handle must appear in authorized_by member_of(member, org) → quorum-many CURRENT members of org must appear in authorized_by "current member" = another identity with a member_of edge to the same org The org's quorum threshold comes from its IdentityRecord.quorum field. """ from __future__ import annotations from decimal import Decimal import pytest from musehub.types.json_types import JSONObject from musehub.graph.push_validator import ( IdentityPushValidator, ValidationResult, ) # ── record factories ────────────────────────────────────────────────────────── def human(handle: str) -> JSONObject: return dict(handle=handle, type="human", pubkey="ed25519:AAAA", quorum=None, registered_at="2026-04-21T00:00:00Z", metadata={}) def agent(handle: str) -> JSONObject: return dict(handle=handle, type="agent", pubkey="ed25519:BBBB", quorum=None, registered_at="2026-04-21T00:00:00Z", metadata={}) def org(handle: str, quorum: int = 1) -> JSONObject: return dict(handle=handle, type="org", pubkey=None, quorum=quorum, registered_at="2026-04-21T00:00:00Z", metadata={}) def spawns(frm: str, to: str, *signers: str) -> JSONObject: return dict( from_handle=frm, to_handle=to, edge_type="spawns", weight=None, authorized_by=[dict(signer=s, signature="ed25519:SIG", signed_at="2026-04-21T00:00:00Z") for s in signers], ) def member_of(member: str, org_handle: str, weight: str = "1", *signers: str) -> JSONObject: return dict( from_handle=member, to_handle=org_handle, edge_type="member_of", weight=weight, authorized_by=[dict(signer=s, signature="ed25519:SIG", signed_at="2026-04-21T00:00:00Z") for s in signers], ) @pytest.fixture def v() -> IdentityPushValidator: return IdentityPushValidator() # ── empty / trivial ─────────────────────────────────────────────────────────── class TestTrivial: def test_empty_graph_is_valid(self, v: IdentityPushValidator) -> None: result = v.validate([], []) assert result.valid is True assert result.errors == [] def test_single_human_is_valid(self, v: IdentityPushValidator) -> None: result = v.validate([human("gabriel")], []) assert result.valid is True def test_single_org_no_members_warning(self, v: IdentityPushValidator) -> None: result = v.validate([org("acme")], []) # orphaned org — no path to a human root assert result.valid is True # warning, not error assert any("acme" in w for w in result.warnings) def test_orphaned_agent_warning(self, v: IdentityPushValidator) -> None: result = v.validate([agent("bot")], []) assert result.valid is True assert any("bot" in w for w in result.warnings) # ── I1 Acyclicity ───────────────────────────────────────────────────────────── class TestI1Acyclicity: def test_linear_chain_valid(self, v: IdentityPushValidator) -> None: identities = [human("h"), agent("a1"), agent("a2")] rels = [spawns("h", "a1", "h"), spawns("a1", "a2", "a1")] assert v.validate(identities, rels).valid is True def test_self_loop_rejected(self, v: IdentityPushValidator) -> None: identities = [agent("bot")] rels = [spawns("bot", "bot", "bot")] result = v.validate(identities, rels) assert result.valid is False assert any("cycle" in e.lower() or "I1" in e for e in result.errors) def test_direct_cycle_rejected(self, v: IdentityPushValidator) -> None: identities = [human("alice"), agent("bot")] rels = [spawns("alice", "bot", "alice"), spawns("bot", "alice", "alice")] result = v.validate(identities, rels) assert result.valid is False assert result.errors def test_indirect_cycle_three_nodes_rejected(self, v: IdentityPushValidator) -> None: identities = [agent("a"), agent("b"), agent("c")] rels = [spawns("a", "b", "a"), spawns("b", "c", "b"), spawns("c", "a", "c")] result = v.validate(identities, rels) assert result.valid is False def test_diamond_dag_valid(self, v: IdentityPushValidator) -> None: # alice→a1, alice→a2, a1→target, a2→target — valid DAG, no cycle identities = [human("alice"), agent("a1"), agent("a2"), agent("target")] rels = [ spawns("alice", "a1", "alice"), spawns("alice", "a2", "alice"), spawns("a1", "target", "a1"), spawns("a2", "target", "a2"), ] assert v.validate(identities, rels).valid is True def test_member_of_cycle_rejected(self, v: IdentityPushValidator) -> None: identities = [org("org-a"), org("org-b")] rels = [member_of("org-a", "org-b", "1"), member_of("org-b", "org-a", "1")] result = v.validate(identities, rels) assert result.valid is False def test_cross_edge_type_cycle_rejected(self, v: IdentityPushValidator) -> None: # alice --spawns--> bot --member_of--> alice (cross-type cycle) identities = [human("alice"), agent("bot")] rels = [spawns("alice", "bot", "alice"), member_of("bot", "alice", "1")] result = v.validate(identities, rels) assert result.valid is False # ── I2 Root distance ────────────────────────────────────────────────────────── class TestI2RootDistance: def test_human_no_warning(self, v: IdentityPushValidator) -> None: result = v.validate([human("gabriel")], []) assert result.warnings == [] def test_agent_spawned_by_human_no_warning(self, v: IdentityPushValidator) -> None: identities = [human("gabriel"), agent("bot")] rels = [spawns("gabriel", "bot", "gabriel")] result = v.validate(identities, rels) assert not any("bot" in w for w in result.warnings) def test_agent_chain_no_warning(self, v: IdentityPushValidator) -> None: identities = [human("h"), agent("a1"), agent("a2"), agent("a3")] rels = [ spawns("h", "a1", "h"), spawns("a1", "a2", "h"), spawns("a2", "a3", "h"), ] assert v.validate(identities, rels).warnings == [] def test_orphaned_agent_warned(self, v: IdentityPushValidator) -> None: result = v.validate([agent("bot")], []) assert any("bot" in w for w in result.warnings) def test_org_with_human_member_no_warning(self, v: IdentityPushValidator) -> None: identities = [human("alice"), org("acme")] rels = [member_of("alice", "acme", "1", "alice")] assert not any("acme" in w for w in v.validate(identities, rels).warnings) def test_org_with_no_human_path_warned(self, v: IdentityPushValidator) -> None: # org exists but no members at all result = v.validate([org("acme")], []) assert any("acme" in w for w in result.warnings) def test_nested_org_with_human_root_no_warning(self, v: IdentityPushValidator) -> None: identities = [human("alice"), org("sub"), org("parent")] rels = [ member_of("alice", "sub", "1", "alice"), member_of("sub", "parent", "1", "alice"), ] result = v.validate(identities, rels) assert not any("parent" in w for w in result.warnings) # ── I3 Authorization — spawns ───────────────────────────────────────────────── class TestI3AuthSpawns: def test_spawner_signature_present_valid(self, v: IdentityPushValidator) -> None: identities = [human("gabriel"), agent("bot")] rels = [spawns("gabriel", "bot", "gabriel")] # gabriel signed assert v.validate(identities, rels).valid is True def test_spawner_signature_absent_rejected(self, v: IdentityPushValidator) -> None: identities = [human("gabriel"), agent("bot")] rels = [spawns("gabriel", "bot")] # no signatures result = v.validate(identities, rels) assert result.valid is False assert any("authorized" in e.lower() or "signature" in e.lower() or "I3" in e for e in result.errors) def test_wrong_signer_rejected(self, v: IdentityPushValidator) -> None: # alice tries to authorize gabriel's spawn — she has no right identities = [human("gabriel"), human("alice"), agent("bot")] rels = [spawns("gabriel", "bot", "alice")] result = v.validate(identities, rels) assert result.valid is False def test_extra_signers_ok_as_long_as_spawner_present(self, v: IdentityPushValidator) -> None: identities = [human("gabriel"), human("alice"), agent("bot")] rels = [spawns("gabriel", "bot", "gabriel", "alice")] # gabriel + alice assert v.validate(identities, rels).valid is True def test_agent_spawning_agent_authorized_by_original_spawner(self, v: IdentityPushValidator) -> None: # gabriel → bot-1 → bot-2; bot-1 must sign the bot-2 spawn identities = [human("gabriel"), agent("bot-1"), agent("bot-2")] rels = [ spawns("gabriel", "bot-1", "gabriel"), spawns("bot-1", "bot-2", "bot-1"), ] assert v.validate(identities, rels).valid is True def test_agent_spawn_wrong_signer_rejected(self, v: IdentityPushValidator) -> None: identities = [human("gabriel"), agent("bot-1"), agent("bot-2")] rels = [ spawns("gabriel", "bot-1", "gabriel"), spawns("bot-1", "bot-2", "gabriel"), # gabriel signs bot-1's spawn — wrong ] result = v.validate(identities, rels) assert result.valid is False # ── I3 Authorization — member_of ───────────────────────────────────────────── class TestI3AuthMemberOf: def test_first_member_self_authorized_valid(self, v: IdentityPushValidator) -> None: # First member of an org — no prior members — authorized by themselves identities = [human("alice"), org("acme", quorum=1)] rels = [member_of("alice", "acme", "1", "alice")] assert v.validate(identities, rels).valid is True def test_membership_requires_quorum_of_current_members(self, v: IdentityPushValidator) -> None: # acme has quorum=2; alice and bob are members # carol wants to join — needs 2 current member signatures identities = [human("alice"), human("bob"), human("carol"), org("acme", quorum=2)] rels = [ member_of("alice", "acme", "1", "alice"), member_of("bob", "acme", "1", "alice"), # alice authorized bob member_of("carol", "acme", "1", "alice", "bob"), # alice + bob → quorum=2 ✓ ] assert v.validate(identities, rels).valid is True def test_membership_insufficient_signatures_rejected(self, v: IdentityPushValidator) -> None: identities = [human("alice"), human("bob"), human("carol"), org("acme", quorum=2)] rels = [ member_of("alice", "acme", "1", "alice"), member_of("bob", "acme", "1", "alice"), member_of("carol", "acme", "1", "alice"), # only alice signed — quorum=2 not met ] result = v.validate(identities, rels) assert result.valid is False def test_non_member_signature_does_not_count(self, v: IdentityPushValidator) -> None: # dave is not a member of acme — his signature shouldn't satisfy quorum identities = [human("alice"), human("dave"), org("acme", quorum=1)] rels = [ member_of("alice", "acme", "1", "alice"), member_of("dave", "acme", "1", "dave"), # dave signs his own join — he's not yet a member ] # dave's self-authorization shouldn't count unless acme quorum=1 allows first-member join # Since alice is already a member, dave needs alice's signature result = v.validate(identities, rels) assert result.valid is False def test_no_signatures_rejected(self, v: IdentityPushValidator) -> None: identities = [human("alice"), org("acme", quorum=1)] rels = [member_of("alice", "acme", "1")] # no signatures result = v.validate(identities, rels) assert result.valid is False def test_quorum_1_founder_joins_empty_org_valid(self, v: IdentityPushValidator) -> None: # The very first member of a quorum=1 org — self-authorization is sufficient identities = [human("gabriel"), org("musehub-org", quorum=1)] rels = [member_of("gabriel", "musehub-org", "1", "gabriel")] assert v.validate(identities, rels).valid is True # ── Combined scenarios ──────────────────────────────────────────────────────── class TestCombinedScenarios: def test_full_graph_gabriel_claudecode_musehub(self, v: IdentityPushValidator) -> None: """The demo scenario from the experiment: all valid. Order matters for I3 ordered processing: 1. gabriel joins graph-lab first (founder, self-signs) 2. claude-code joins (1 prior = gabriel, min(2,1)=1 → gabriel signs) 3. musehub-org joins (2 prior = {gabriel, claude-code}, min(2,2)=2 → both sign) """ identities = [ human("gabriel"), agent("claude-code"), org("musehub-org", quorum=1), org("graph-lab", quorum=2), ] rels = [ spawns("gabriel", "claude-code", "gabriel"), member_of("gabriel", "musehub-org", "1", "gabriel"), member_of("gabriel", "graph-lab", "1", "gabriel"), member_of("claude-code", "graph-lab", "1", "gabriel"), member_of("musehub-org", "graph-lab", "1", "gabriel", "claude-code"), ] result = v.validate(identities, rels) assert result.valid is True assert result.errors == [] def test_i1_error_does_not_suppress_i3_errors(self, v: IdentityPushValidator) -> None: # Both a cycle AND a missing signature — both errors reported identities = [human("alice"), agent("bot")] rels = [ spawns("alice", "bot"), # I3: no signature spawns("bot", "alice", "bot"), # I1: cycle ] result = v.validate(identities, rels) assert result.valid is False assert len(result.errors) >= 2 def test_warnings_do_not_make_result_invalid(self, v: IdentityPushValidator) -> None: # Valid graph + orphaned agent → valid but with warning identities = [human("gabriel"), agent("orphan")] rels = [spawns("gabriel", "gabriel")] # no spawn of orphan # orphan has no spawner → warning result = v.validate([human("gabriel"), agent("orphan")], []) assert result.valid is True assert result.warnings # at least one warning about orphan