gabriel / muse public
rga.py python
321 lines 12.4 KB
Raw
sha256:18b983389ee1b55900fcd799bfbb496552d2e3ecded9d18cefbfef188947a12e chore: remove blob-debug test marker file Sonnet 4.6 1 day ago
1 """Replicated Growable Array (RGA) — CRDT for ordered sequences.
2
3 The RGA (Roh et al., 2011 "Replicated abstract data types") provides
4 Google-Docs-style collaborative editing semantics for any ordered sequence
5 domain. Every element carries a globally unique, immutable identifier
6 ``f"{timestamp}@{author}"``; this identifier determines insertion order when
7 two agents concurrently insert at the same position.
8
9 **Core invariant**: the visible sequence is the list of elements whose
10 ``deleted`` flag is ``False``, in the order determined by their identifiers.
11 Deletions are *tombstoned* (``deleted=True``) rather than physically removed
12 so that identifiers remain stable across replicas.
13
**Insertion semantics**: ``insert(after_id, value, element_id=...)`` inserts a
new element immediately after the element with ``id == after_id`` (``None``
means prepend). Concurrent inserts at the same position are resolved by
comparing the element IDs lexicographically — the "bigger" ID wins and is
placed first, giving a deterministic outcome independent of delivery
order.
20
21 **Lattice laws satisfied by** :meth:`join`:
22 1. Commutativity: ``join(a, b) == join(b, a)``
23 2. Associativity: ``join(join(a, b), c) == join(a, join(b, c))``
24 3. Idempotency: ``join(a, a) == a``
25
26 Public API
27 ----------
28 - :class:`RGAElement` — ``TypedDict`` for one array element.
29 - :class:`RGA` — the array itself.
30 """
31
32 import logging
33 from typing import TypedDict
34
# Lookup table from element ``id`` to its element; used when merging replicas.
type _ElemMap = dict[str, "RGAElement"]
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
37
38 class RGAElement(TypedDict):
39 """A single element in an :class:`RGA`.
40
41 ``id`` is the stable unique identifier ``"{timestamp}@{author}"`` assigned
42 at insertion time. ``value`` is the content hash of the element (it
43 references the object store — all binary content lives there).
44 ``deleted`` is ``True`` for tombstoned elements that no longer appear in
45 the visible sequence.
46
47 ``parent_id`` is the ``id`` of the element this one was inserted after
48 (``None`` means it was prepended — inserted at the head). This is
49 required for the commutative join algorithm to correctly place concurrent
50 inserts regardless of which replica initiates the join.
51 """
52
53 id: str
54 value: str
55 deleted: bool
56 parent_id: str | None
57
58 class RGA:
59 """Replicated Growable Array — CRDT for ordered sequences.
60
61 Provides ``insert``, ``delete``, ``join``, and ``to_sequence`` operations.
62 All mutating methods return new :class:`RGA` instances; ``self`` is
63 never modified.
64
65 The internal representation is a list of :class:`RGAElement` dicts in
66 insertion order (not visible order — tombstones are kept inline).
67
68 Example::
69
70 rga = RGA()
71 rga, id_a = rga.insert(None, "note-hash-A") # prepend
72 rga, id_b = rga.insert(id_a, "note-hash-B") # insert after A
73 rga = rga.delete(id_a) # tombstone A
74 assert rga.to_sequence() == ["note-hash-B"]
75 """
76
77 def __init__(self, elements: list[RGAElement] | None = None) -> None:
78 """Construct an RGA, optionally pre-populated.
79
80 Args:
81 elements: Ordered list of :class:`RGAElement` dicts (may contain
82 tombstones). Copied defensively.
83 """
84 self._elements: list[RGAElement] = list(elements) if elements else []
85
86 # ------------------------------------------------------------------
87 # Mutations (return new RGA)
88 # ------------------------------------------------------------------
89
90 def insert(self, after_id: str | None, value: str, *, element_id: str) -> RGA:
91 """Return a new RGA with *value* inserted after *after_id*.
92
93 Concurrent inserts at the same position are resolved by placing the
94 element with the lexicographically *larger* ``element_id`` first.
95
96 Args:
97 after_id: The ``id`` of the element to insert after, or ``None``
98 to prepend (insert before all existing elements).
99 value: The content hash of the new element.
100 element_id: The stable unique ID for the new element; callers
101 should use ``f"{timestamp}@{author}"`` to ensure global
102 uniqueness across agents.
103
104 Returns:
105 A new :class:`RGA` with the element inserted at the correct position.
106 """
107 new_elem: RGAElement = {
108 "id": element_id,
109 "value": value,
110 "deleted": False,
111 "parent_id": after_id,
112 }
113 elems = list(self._elements)
114
115 if after_id is None:
116 # Prepend: among concurrent prepends (same parent_id=None), larger ID goes first.
117 insert_pos = 0
118 while (
119 insert_pos < len(elems)
120 and elems[insert_pos]["parent_id"] is None
121 and elems[insert_pos]["id"] > element_id
122 ):
123 insert_pos += 1
124 elems.insert(insert_pos, new_elem)
125 else:
126 # Find the anchor element.
127 anchor_idx = next(
128 (i for i, e in enumerate(elems) if e["id"] == after_id), None
129 )
130 if anchor_idx is None:
131 # Unknown anchor — append at end (safe degradation).
132 logger.warning("RGA.insert: unknown after_id=%r, appending at end", after_id)
133 elems.append(new_elem)
134 else:
135 # Insert after anchor. Skip any existing elements that also
136 # have the same parent_id AND a larger element ID (concurrent
137 # inserts at the same position; larger ID wins leftmost slot).
138 insert_pos = anchor_idx + 1
139 while (
140 insert_pos < len(elems)
141 and elems[insert_pos]["parent_id"] == after_id
142 and elems[insert_pos]["id"] > element_id
143 ):
144 insert_pos += 1
145 elems.insert(insert_pos, new_elem)
146
147 return RGA(elems)
148
149 def delete(self, element_id: str) -> RGA:
150 """Return a new RGA with *element_id* tombstoned.
151
152 Tombstoning is idempotent — deleting an already-deleted or unknown
153 element is a no-op.
154
155 Args:
156 element_id: The ``id`` of the element to tombstone.
157
158 Returns:
159 A new :class:`RGA` with the element marked ``deleted=True``.
160 """
161 new_elems: list[RGAElement] = []
162 for elem in self._elements:
163 if elem["id"] == element_id:
164 new_elems.append({
165 "id": elem["id"],
166 "value": elem["value"],
167 "deleted": True,
168 "parent_id": elem["parent_id"],
169 })
170 else:
171 new_elems.append({
172 "id": elem["id"],
173 "value": elem["value"],
174 "deleted": elem["deleted"],
175 "parent_id": elem["parent_id"],
176 })
177 return RGA(new_elems)
178
179 # ------------------------------------------------------------------
180 # CRDT join
181 # ------------------------------------------------------------------
182
183 def join(self, other: RGA) -> RGA:
184 """Return the lattice join — the union of both arrays.
185
186 Elements are keyed by ``id``. The join:
187 1. Takes the union of all element IDs from both replicas.
188 2. For each ID, marks the element ``deleted`` if *either* replica has
189 it tombstoned (once deleted, always deleted — monotone).
190 3. Preserves the insertion-order sequence from ``self``; appends any
191 elements from ``other`` not yet seen in ``self``.
192
193 Args:
194 other: The RGA to merge with.
195
196 Returns:
197 A new :class:`RGA` that is the join of ``self`` and *other*.
198 """
199 # Build ID → element maps from both replicas.
200 self_map: _ElemMap = {e["id"]: e for e in self._elements}
201 other_map: _ElemMap = {e["id"]: e for e in other._elements}
202
203 # Merge deletions monotonically: once deleted in either, always deleted.
204 merged_map: _ElemMap = {}
205 all_ids = set(self_map) | set(other_map)
206 for eid in all_ids:
207 if eid in self_map and eid in other_map:
208 s = self_map[eid]
209 o = other_map[eid]
210 # In practice the same element_id always carries the same value
211 # (because element_id = "{timestamp}@{author}" uniquely identifies
212 # a write). If values differ (only possible in crafted test scenarios),
213 # pick the lexicographically larger value for commutativity.
214 winning_value = s["value"] if s["value"] >= o["value"] else o["value"]
215 merged_map[eid] = {
216 "id": eid,
217 "value": winning_value,
218 "deleted": s["deleted"] or o["deleted"],
219 "parent_id": s["parent_id"],
220 }
221 elif eid in self_map:
222 src = self_map[eid]
223 merged_map[eid] = {
224 "id": src["id"],
225 "value": src["value"],
226 "deleted": src["deleted"],
227 "parent_id": src["parent_id"],
228 }
229 else:
230 src = other_map[eid]
231 merged_map[eid] = {
232 "id": src["id"],
233 "value": src["value"],
234 "deleted": src["deleted"],
235 "parent_id": src["parent_id"],
236 }
237
238 # Rebuild a canonical ordered sequence using parent_id links.
239 # Group elements by parent_id. Within each group, sort by ID
240 # descending (larger ID → leftmost, per concurrent-insert tiebreak rule).
241 # Traverse recursively: start with children of None (prepended), then
242 # recurse on each child's children.
243 from collections import defaultdict
244 children: dict[str | None, list[str]] = defaultdict(list)
245 for eid, elem in merged_map.items():
246 children[elem["parent_id"]].append(eid)
247 for group in children.values():
248 group.sort(reverse=True) # larger ID first
249
250 ordered: list[RGAElement] = []
251
252 def _traverse(parent: str | None) -> None:
253 for eid in children.get(parent, []):
254 ordered.append(merged_map[eid])
255 _traverse(eid)
256
257 _traverse(None)
258 return RGA(ordered)
259
260 # ------------------------------------------------------------------
261 # Query
262 # ------------------------------------------------------------------
263
264 def to_sequence(self) -> list[str]:
265 """Return the visible element values (excluding tombstones).
266
267 Returns:
268 List of ``value`` strings in document order, tombstones excluded.
269 """
270 return [e["value"] for e in self._elements if not e["deleted"]]
271
272 def __len__(self) -> int:
273 return len([e for e in self._elements if not e["deleted"]])
274
275 # ------------------------------------------------------------------
276 # Serialisation
277 # ------------------------------------------------------------------
278
279 def to_dict(self) -> list[RGAElement]:
280 """Return a JSON-serialisable list of :class:`RGAElement` dicts.
281
282 Returns:
283 Ordered list of all elements (including tombstones).
284 """
285 return [
286 {"id": e["id"], "value": e["value"], "deleted": e["deleted"], "parent_id": e["parent_id"]}
287 for e in self._elements
288 ]
289
290 @classmethod
291 def from_dict(cls, data: list[RGAElement]) -> RGA:
292 """Reconstruct an :class:`RGA` from its wire representation.
293
294 Args:
295 data: List of :class:`RGAElement` dicts as produced by
296 :meth:`to_dict`.
297
298 Returns:
299 A new :class:`RGA`.
300 """
301 return cls(list(data))
302
303 # ------------------------------------------------------------------
304 # Python dunder helpers
305 # ------------------------------------------------------------------
306
307 def equivalent(self, other: RGA) -> bool:
308 """Return ``True`` if both RGAs have identical element lists (including tombstones).
309
310 Note: use :meth:`to_sequence` comparison when only visible content matters.
311
312 Args:
313 other: The RGA to compare against.
314
315 Returns:
316 ``True`` when the full internal element lists are equal.
317 """
318 return self._elements == other._elements
319
320 def __repr__(self) -> str:
321 return f"RGA(len={len(self)}, elements={self._elements!r})"
File History 7 commits
sha256:18b983389ee1b55900fcd799bfbb496552d2e3ecded9d18cefbfef188947a12e chore: remove blob-debug test marker file Sonnet 4.6 1 day ago
sha256:e452ad9a6ace6ccc6d875a35e06caf9da5576a970c1c36133b69a891ce5fefa8 chore: prebuild timing test Sonnet 4.6 9 days ago
sha256:0008ab6695e3e064b3e236b24fd19e538fef6a588eb0d211622f4466d919c0b1 merge: pull staging/dev — advance to 0.2.0rc12 Sonnet 4.6 patch 11 days ago
sha256:9c33d61749fff814c5226d5386aa2af7064c2c02788594a25fdd709358132eea fix: _PROPOSAL_PREFIX_RESOLVE_LIMIT 200 → 100 to match hub … Sonnet 4.6 22 days ago
sha256:36c3cb3e76619d4c30a6d9bf81b5ec4ff148e30dcfed913e3114ca7b43b81c7e fix: rename objects→blobs in push client and all stale test… Sonnet 4.6 patch 25 days ago
sha256:c06a9b9b9fee26c68ea725b44d54b2c0a171301ce9de746d5b656617b4463a9a fix: repair four test failures from post-migration audit Sonnet 4.6 patch 31 days ago
sha256:1900655993c83c4107067375548a7be823e471d2515830842f1a12cba4bd3cdf fix: unified object store migration — idempotent writes, JS… Sonnet 4.6 minor 31 days ago