gabriel / muse public
or_set.py python
240 lines 8.9 KB
Raw
sha256:2eaa5d95f9d9383498e76947410a26e5a3ba23d182f339910c424cf88fad412b fix: try fetch/presign before fetch/mpack to avoid Cloudfla… Sonnet 4.6 patch 8 days ago
"""Observed-Remove Set (OR-Set) — add-wins CRDT for unordered collections.

An OR-Set is an unordered set where *adds win over concurrent removes*.
Every individual addition of an element is tagged with a fresh, unique
token. Removing an element tombstones only the tokens the remover has
observed; a concurrent add carries a new token the remover never saw, so it
survives the remove.

This gives the "add-wins" property: if agent A adds element X and agent B
concurrently removes X, the merged result still contains X. The rationale
for Muse: an annotation or a note added by one agent should not be silently
deleted by a concurrent tombstone from another.

**Algorithm (Shapiro et al., 2011, "A comprehensive study of Convergent and
Commutative Replicated Data Types"):**

- Each element ``e`` maps to a set of *unique tokens* ``{t₁, t₂, …}``.
- ``add(e)`` generates a fresh token and inserts ``(e, token)`` into the
  payload.
- ``remove(e)`` drops every observed ``(e, token)`` pair from the payload
  and records its token as a *tombstone*. A concurrent ``add(e)`` survives
  because its new token was not in the observed set at remove time.
- ``join(a, b)`` unions the ``(element, token)`` pairs and the tombstones of
  both replicas, then drops every pair whose token is tombstoned. The
  tombstones are what make a remove stick: without them, a replica that
  never saw the remove would reintroduce the removed pair on merge.

Tombstones are never compacted in this implementation (correct but not
space-optimal). Compaction is a GC concern, not a correctness concern.

**Lattice laws satisfied by** :meth:`ORSet.join`, where ``≡`` means
:meth:`ORSet.equivalent` (``ORSet`` does not define ``__eq__``, so ``==``
compares object identity):

1. Commutativity: ``join(a, b) ≡ join(b, a)``
2. Associativity: ``join(join(a, b), c) ≡ join(a, join(b, c))``
3. Idempotency: ``join(a, a) ≡ a``

Public API
----------
- :class:`ORSetEntry` — ``TypedDict`` for a single (element, token) pair.
- :class:`ORSetDict` — ``TypedDict`` wire format for a complete OR-Set.
- :class:`ORSet` — the set itself.
"""
40
from __future__ import annotations

import logging
import secrets
from typing import TypedDict
44
# Module-level logger named after this module (currently unused within it).
logger: logging.Logger = logging.getLogger(name=__name__)
46
class ORSetEntry(TypedDict, total=True):
    """Wire form of one (element, token) pair held by an :class:`ORSet`.

    The same ``element`` may appear in several entries, one per addition;
    ``token`` tells those additions apart, e.g. concurrent adds of the same
    element by different agents.
    """

    # The tracked string value (e.g. a content hash or a label).
    element: str
    # Unique identifier minted by the addition that produced this entry.
    token: str
58
class ORSetDict(TypedDict, total=True):
    """Serialised form of a whole :class:`ORSet` (see ``ORSet.to_dict``).

    Both keys are required. An entry whose ``token`` also appears in
    ``tombstones`` is treated as deleted.
    """

    # Every (element, token) pair held by the set.
    entries: list[ORSetEntry]
    # Token strings that have been explicitly removed.
    tombstones: list[str]
69
class ORSet:
    """Observed-Remove Set — an unordered add-wins CRDT set.

    Elements are arbitrary strings (content hashes, labels, identifiers).
    Replicas may add and remove concurrently. When they are merged with
    :meth:`join`, an add that the remover had not observed survives the
    remove ("add wins").

    Instances are treated as immutable values: every mutating method returns
    a new :class:`ORSet` and leaves ``self`` untouched.

    Example::

        s1, tok = ORSet().add("note-A")

        # Replica s2 has received s1's state, observes ``tok`` and removes it.
        s2 = ORSet.from_dict(s1.to_dict())
        s2 = s2.remove("note-A", s2.tokens_for("note-A"))
        assert "note-A" not in s2

        # Concurrently, s1 adds "note-A" again under a NEW token.
        s1_v2, tok2 = s1.add("note-A")

        merged = s1_v2.join(s2)
        assert "note-A" in merged.elements()          # add-wins
        assert merged.tokens_for("note-A") == {tok2}  # ``tok`` stays removed
    """

    def __init__(
        self,
        entries: set[tuple[str, str]] | None = None,
        tombstones: set[str] | None = None,
    ) -> None:
        """Initialise an OR-Set, optionally pre-populated.

        Both arguments are copied, so later changes to the caller's sets do
        not affect this instance.

        Args:
            entries: ``(element, token)`` pairs to hold. A pair whose token is
                also in *tombstones* is stored but hidden by :meth:`elements`
                and :meth:`tokens_for`.
            tombstones: Tokens that have been removed.
        """
        self._entries: set[tuple[str, str]] = set(entries) if entries else set()
        self._tombstones: set[str] = set(tombstones) if tombstones else set()

    # ------------------------------------------------------------------
    # Mutations (return new ORSet)
    # ------------------------------------------------------------------

    def add(self, element: str) -> tuple[ORSet, str]:
        """Add *element* under a freshly minted unique token.

        The token is 16 random bytes from :mod:`secrets` rendered as 32 hex
        characters, so tokens minted by different adds (on any replica) do
        not collide in practice.

        Args:
            element: The string value to add.

        Returns:
            ``(new_set, token)``: ``new_set`` contains *element*, and ``token``
            identifies this particular addition so it can be removed later.
        """
        token = secrets.token_hex(16)
        return ORSet(self._entries | {(element, token)}, self._tombstones), token

    def remove(
        self,
        element: str,
        observed_tokens: set[str] | frozenset[str] | None = None,
    ) -> ORSet:
        """Remove *element* by tombstoning the tokens observed for it.

        A token is tombstoned only if it is listed in *observed_tokens*
        **and** this replica currently holds it as an entry for *element*.
        Tokens belonging to other elements, and tokens this replica has not
        received yet, are ignored. A token minted by a concurrent ``add`` is
        therefore unaffected, and the element survives in the merged result.

        Args:
            element: The element to remove.
            observed_tokens: Tokens of *element* observed at remove time. When
                omitted (``None``), every live token this replica holds for
                *element* is used, which is the same as passing
                ``self.tokens_for(element)``.

        Returns:
            A new :class:`ORSet` without the matching pairs and with their
            tokens tombstoned. If nothing matches, the result has the same
            contents as ``self``.
        """
        if observed_tokens is None:
            observed_tokens = self.tokens_for(element)
        removed = {
            (e, t) for e, t in self._entries if e == element and t in observed_tokens
        }
        new_tombstones = self._tombstones | {t for _, t in removed}
        return ORSet(self._entries - removed, new_tombstones)

    # ------------------------------------------------------------------
    # CRDT join
    # ------------------------------------------------------------------

    def join(self, other: ORSet) -> ORSet:
        """Return the lattice join (merge) of ``self`` and *other*.

        Entries and tombstones of both replicas are unioned. Any entry whose
        token is tombstoned by either replica is then dropped, so a remove
        seen by one side wins over the matching pair held by the other.

        Args:
            other: The OR-Set to merge with.

        Returns:
            A new :class:`ORSet` holding every entry from either replica whose
            token has not been tombstoned by either replica.
        """
        tombstones = self._tombstones | other._tombstones
        survivors = {
            (e, t) for e, t in self._entries | other._entries if t not in tombstones
        }
        return ORSet(survivors, tombstones)

    # ------------------------------------------------------------------
    # Query
    # ------------------------------------------------------------------

    def elements(self) -> frozenset[str]:
        """Return the set of visible elements (those with a non-tombstoned token).

        Returns:
            Frozenset of string elements currently in the set.
        """
        return frozenset(e for e, t in self._entries if t not in self._tombstones)

    def tokens_for(self, element: str) -> set[str]:
        """Return all live tokens for *element*.

        Pass the result to :meth:`remove` to remove *element* without
        tombstoning tokens this replica has not observed.

        Args:
            element: The element to look up.

        Returns:
            Set of token strings associated with live copies of *element*
            (empty if *element* is absent).
        """
        return {t for e, t in self._entries if e == element and t not in self._tombstones}

    def __contains__(self, element: str) -> bool:
        """Return ``True`` if *element* has at least one non-tombstoned token.

        Stops at the first match instead of building :meth:`elements`.
        """
        tombstones = self._tombstones
        return any(e == element and t not in tombstones for e, t in self._entries)

    # ------------------------------------------------------------------
    # Serialisation
    # ------------------------------------------------------------------

    def to_dict(self) -> ORSetDict:
        """Return a JSON-serialisable :class:`ORSetDict`.

        Both lists are sorted, so equal states serialise identically.

        Returns:
            Dict with ``"entries"`` and ``"tombstones"`` lists.
        """
        entries: list[ORSetEntry] = [{"element": e, "token": t} for e, t in sorted(self._entries)]
        return {"entries": entries, "tombstones": sorted(self._tombstones)}

    @classmethod
    def from_dict(cls, data: ORSetDict) -> ORSet:
        """Reconstruct an :class:`ORSet` from its wire representation.

        Args:
            data: Dict as produced by :meth:`to_dict`.

        Returns:
            A new instance of ``cls``.

        Raises:
            KeyError: If *data* or one of its entries lacks a required key.
        """
        entries = {(entry["element"], entry["token"]) for entry in data["entries"]}
        tombstones = set(data["tombstones"])
        return cls(entries, tombstones)

    # ------------------------------------------------------------------
    # Comparison and debugging helpers
    # ------------------------------------------------------------------

    def equivalent(self, other: ORSet) -> bool:
        """Return ``True`` if both OR-Sets have the same visible elements and tombstones.

        The live token sets are not compared: two replicas that hold the same
        element under different tokens are still considered equivalent.

        Args:
            other: The OR-Set to compare against.

        Returns:
            ``True`` when visible elements and tombstone sets are identical.
        """
        return self.elements() == other.elements() and self._tombstones == other._tombstones

    def __repr__(self) -> str:
        return f"ORSet(elements={self.elements()!r})"
File History 1 commit
sha256:2eaa5d95f9d9383498e76947410a26e5a3ba23d182f339910c424cf88fad412b fix: try fetch/presign before fetch/mpack to avoid Cloudfla… Sonnet 4.6 patch 8 days ago