gabriel / muse public
plugin.py python
270 lines 9.8 KB
Raw
sha256:ff478cfdcdd4b7fd6de89cb68896601a981f945634463275ec333bd20ca36402 Merge branch 'dev' into main Human 21 days ago
1 """Identity domain plugin — version-controlled human/agent/org identity graph.
2
3 Implements:
4 - MuseDomainPlugin (6 required methods)
5 - AddressedMergePlugin (merge_ops for per-entity address-keyed map merge)
6 - HarmonyPlugin (conflict_fingerprint for semantic replay)
7
8 Working tree layout
9 -------------------
10 identities/<handle>.json — one IdentityRecord per node
11 relationships/<from>--<edge>--<to>.json — one RelationshipRecord per edge
12
13 Invariants enforced
14 -------------------
15 I1 Acyclicity — merge() rejects any new relationship that would introduce
16 a cycle in the combined edge set. The graph is never left
17 in a cyclic state.
18 I2 Root distance — derived property; not enforced at merge time.
19 I3 Quorum soundness — hub-level concern; not enforced at merge time.
20 """
21
22 import json
23 import os
24 import pathlib
25 import stat as _stat
26
27 from muse._version import __version__
28 from muse.core.types import blob_id
29 from muse.core.diff_algorithms import snapshot_diff
30 from muse.core.schema import DimensionSpec, DomainSchema, SetSchema
31 from muse.domain import (
32 DomainOp,
33 DriftReport,
34 LiveState,
35 MergeResult,
36 SnapshotManifest,
37 StateDelta,
38 StateSnapshot,
39 StructuredDelta,
40 )
41
42 from .dag import _DAG, build_dag_from_paths
43 from .records import parse_relationship_path
44
# Domain name stamped on every manifest and schema this plugin builds.
_DOMAIN = "identity"

# Top-level working-tree directories that the snapshot scan tracks; paths
# outside them are ignored.  Frozen so the shared module-level constant
# cannot be mutated by accident.
_VALID_DIRS = frozenset({"identities", "relationships"})
47
class IdentityPlugin:
    """Identity domain plugin — satisfies MuseDomainPlugin, AddressedMergePlugin, HarmonyPlugin.

    Snapshots are manifests mapping a ``/``-separated path (under
    ``identities/`` or ``relationships/``) to the content id of that file.
    """

    # ── MuseDomainPlugin — required ────────────────────────────────────────────

    def snapshot(self, live_state: LiveState) -> StateSnapshot:
        """Return a snapshot manifest for *live_state*.

        A ``pathlib.Path`` is treated as a working-tree root and scanned from
        disk; any other value is returned unchanged, on the assumption that it
        already is a manifest.
        """
        if isinstance(live_state, pathlib.Path):
            return self._snapshot_dir(live_state)
        return live_state  # already a SnapshotManifest

    def diff(
        self,
        base: StateSnapshot,
        target: StateSnapshot,
        *,
        repo_root: pathlib.Path | None = None,
    ) -> StateDelta:
        """Return the delta from *base* to *target* via ``snapshot_diff``.

        ``repo_root`` is accepted for interface compatibility and is not used.
        """
        return snapshot_diff(self.schema(), base, target)

    def merge(
        self,
        base: StateSnapshot,
        left: StateSnapshot,
        right: StateSnapshot,
        *,
        repo_root: pathlib.Path | None = None,
    ) -> MergeResult:
        """Three-way merge of file manifests, then enforce acyclicity (I1).

        Per path, comparing content ids (``None`` means "absent"):

        * both sides agree            -> take that value;
        * only one side changed       -> take the changed side;
        * both changed, differently   -> conflict; the merged manifest keeps
          the left id, falling back to right, then base.

        Afterwards every newly added relationship path that ``_check_i1``
        flags is removed from the merged manifest and reported as a conflict.

        ``repo_root`` is accepted for interface compatibility and is not used.
        """
        base_files = base["files"]
        left_files = left["files"]
        right_files = right["files"]

        merged: dict[str, str] = dict(base_files)
        conflicts: list[str] = []

        # Sorted so the order of ``conflicts`` is deterministic.
        for path in sorted(set(base_files) | set(left_files) | set(right_files)):
            base_id = base_files.get(path)
            left_id = left_files.get(path)
            right_id = right_files.get(path)

            if left_id == right_id:
                winner = left_id
            elif base_id == left_id:
                winner = right_id
            elif base_id == right_id:
                winner = left_id
            else:
                conflicts.append(path)
                merged[path] = left_id or right_id or base_id or ""
                continue

            if winner is None:
                merged.pop(path, None)
            else:
                merged[path] = winner

        # I1 — reject new relationship edges that would create cycles
        already_conflicted = set(conflicts)
        for bad_path in self._check_i1(base_files, merged):
            if bad_path not in already_conflicted:
                conflicts.append(bad_path)
                already_conflicted.add(bad_path)
            merged.pop(bad_path, None)

        return MergeResult(
            merged=SnapshotManifest(files=merged, domain=_DOMAIN, directories=[]),
            conflicts=conflicts,
        )

    def drift(self, committed: StateSnapshot, live: LiveState) -> DriftReport:
        """Report whether *live* differs from the *committed* snapshot.

        Drift is present when the diff between the two contains any ops.
        """
        current = self.snapshot(live)
        delta = self.diff(committed, current)
        return DriftReport(
            has_drift=bool(delta["ops"]),
            summary=delta["summary"],
            delta=delta,
        )

    def apply(self, delta: StateDelta, live_state: LiveState) -> LiveState:
        """Return *live_state* unchanged; *delta* is not applied here."""
        return live_state

    def schema(self) -> DomainSchema:
        """Return the domain schema: two set-typed dimensions, three-way merge."""
        return DomainSchema(
            domain=_DOMAIN,
            description=(
                "Identity graph domain — version-controlled humans, agents, and orgs "
                "with acyclicity enforcement (I1), root-distance indexing (I2), "
                "and recursive quorum soundness (I3)."
            ),
            top_level=SetSchema(kind="set", element_type="record", identity="by_content"),
            dimensions=[
                DimensionSpec(
                    name="identities",
                    description="Identity nodes — human, agent, or org records keyed by handle.",
                    schema=SetSchema(kind="set", element_type="identity", identity="by_id"),
                    independent_merge=True,
                ),
                DimensionSpec(
                    name="relationships",
                    description=(
                        "Directed edges (SPAWNS, MEMBER_OF) keyed by (from, edge_type, to). "
                        "Additions are independently mergeable unless they introduce a cycle (I1)."
                    ),
                    schema=SetSchema(kind="set", element_type="relationship", identity="by_id"),
                    independent_merge=True,
                ),
            ],
            merge_mode="three_way",
            schema_version=__version__,
        )

    # ── AddressedMergePlugin — address-keyed map merge ────────────────────────

    def merge_ops(
        self,
        base: StateSnapshot,
        ours_snap: StateSnapshot,
        theirs_snap: StateSnapshot,
        ours_ops: list[DomainOp],
        theirs_ops: list[DomainOp],
        *,
        repo_root: pathlib.Path | None = None,
    ) -> MergeResult:
        """Merge by delegating to ``merge`` on the three snapshots.

        ``ours_ops`` and ``theirs_ops`` are accepted for interface
        compatibility and are not consulted.
        """
        return self.merge(base, ours_snap, theirs_snap, repo_root=repo_root)

    # ── HarmonyPlugin — semantic conflict fingerprinting ──────────────────────

    def conflict_fingerprint(
        self,
        path: str,
        ours_id: str,
        theirs_id: str,
        repo_root: pathlib.Path,
    ) -> str:
        """Return a fingerprint for the conflict at *path*, as produced by ``blob_id``.

        The hashed payload is a structural key followed by the two ids:

        * if ``parse_relationship_path`` recognises *path*, the key is the
          JSON object ``{"edge_type", "from", "to"}`` taken from the path;
        * otherwise the key is the JSON object ``{"path": path}``.

        ``ours_id`` and ``theirs_id`` are sorted before being appended, so the
        fingerprint is the same whichever side is called "ours".

        ``repo_root`` is accepted for interface compatibility and is not used.

        NOTE(review): both ids are part of the payload, so the fingerprint
        changes whenever either id changes.  If these are content ids
        (presumably — confirm against the caller), re-signing a record gives a
        different fingerprint even though the structural key is the same.
        """
        parsed_rel = parse_relationship_path(path)
        if parsed_rel is not None:
            frm, edge, to = parsed_rel
            canonical = json.dumps({"from": frm, "edge_type": edge, "to": to}, sort_keys=True)
        else:
            # Not a relationship path — the path itself is the structural key
            canonical = json.dumps({"path": path}, sort_keys=True)

        # Fingerprint is commutative over ours/theirs ordering
        ids_block = "\n".join(sorted([ours_id, theirs_id]))
        payload = f"{canonical}\n{ids_block}"
        return blob_id(payload.encode())

    # ── internals ─────────────────────────────────────────────────────────────

    def _snapshot_dir(self, workdir: pathlib.Path) -> SnapshotManifest:
        """Scan *workdir* and return a manifest of the tracked ``.json`` files.

        Only regular files ending in ``.json`` below a top-level directory
        named in ``_VALID_DIRS`` are recorded.  Symlinks are neither followed
        nor recorded, and dot-directories below the tracked directories are
        skipped.  Keys are ``/``-separated paths relative to *workdir*; values
        are ``blob_id`` of the file's bytes.
        """
        files: dict[str, str] = {}
        root_str = str(workdir)

        for dirpath, dirnames, filenames in os.walk(root_str, followlinks=False):
            rel_dir = os.path.relpath(dirpath, root_str)

            if rel_dir == os.curdir:
                # Working-tree root: files here are never tracked, and only
                # the tracked top-level directories are worth descending into.
                dirnames[:] = sorted(d for d in dirnames if d in _VALID_DIRS)
                continue

            # Sorted in place so the walk order (and manifest order) is stable.
            dirnames[:] = sorted(d for d in dirnames if not d.startswith("."))

            # Manifest keys always use "/" regardless of the native separator.
            rel_prefix = rel_dir.replace(os.sep, "/")

            for fname in sorted(filenames):
                if not fname.endswith(".json"):
                    continue
                abs_str = os.path.join(dirpath, fname)
                try:
                    st = os.lstat(abs_str)
                except OSError:
                    # Entry vanished or is unreadable — leave it out.
                    continue
                if not _stat.S_ISREG(st.st_mode):
                    continue
                raw = pathlib.Path(abs_str).read_bytes()
                files[f"{rel_prefix}/{fname}"] = blob_id(raw)

        return SnapshotManifest(files=files, domain=_DOMAIN, directories=[])

    def _check_i1(
        self,
        base_files: dict[str, str],
        merged_files: dict[str, str],
    ) -> list[str]:
        """Return relationship paths in merged_files that introduce a cycle over base.

        Only paths absent from *base_files* that ``parse_relationship_path``
        recognises are examined.  They are tried in sorted order against a DAG
        built from the base paths: an edge that would close a cycle is
        reported, and any other edge is added to the DAG so that later
        candidates are checked against it as well.
        """
        # New relationships = in merged but not in base
        new_edges = {}
        for path in merged_files:
            if path in base_files:
                continue
            parsed = parse_relationship_path(path)
            if parsed is not None:
                new_edges[path] = parsed
        if not new_edges:
            return []

        # NOTE(review): the DAG is seeded from every base path, including any
        # the merge removed — presumably intentional ("over base"); confirm.
        dag = build_dag_from_paths(set(base_files))

        bad: list[str] = []
        for path in sorted(new_edges):
            frm, _edge, to = new_edges[path]
            if dag.would_cycle(frm, to):
                bad.append(path)
            else:
                dag.add_edge(frm, to)

        return bad
File History 1 commit
sha256:ff478cfdcdd4b7fd6de89cb68896601a981f945634463275ec333bd20ca36402 Merge branch 'dev' into main Human 21 days ago