or_set.py
python
sha256:2eaa5d95f9d9383498e76947410a26e5a3ba23d182f339910c424cf88fad412b
fix: try fetch/presign before fetch/mpack to avoid Cloudfla…
Sonnet 4.6
patch
8 days ago
| 1 | """Observed-Remove Set (OR-Set) — add-wins CRDT for unordered collections. |
| 2 | |
| 3 | An OR-Set is an unordered set where *adds always win over concurrent removes*. |
| 4 | Each time an element is added it is tagged with a fresh unique token. Removing |
| 5 | an element tombstones only the tokens the remover has observed; a concurrent |
| 6 | add with a new token survives the remove. |
| 7 | |
| 8 | This gives the "add-wins" property: if agent A adds element X and agent B |
| 9 | concurrently removes X, the merged result still contains X. The rationale |
| 10 | for Muse: an annotation or a note added by one agent should not be silently |
| 11 | deleted by a concurrent tombstone from another. |
| 12 | |
| 13 | **Algorithm (Shapiro et al., 2011 "A Comprehensive Study of CRDTs"):** |
| 14 | |
| 15 | - Each element ``e`` maps to a set of *unique tokens* ``{t₁, t₂, …}``. |
| 16 | - ``add(e)`` generates a fresh token and inserts ``(e, token)`` into the |
| 17 | payload. |
| 18 | - ``remove(e)`` removes every ``(e, token)`` pair currently observed. |
| 19 | A concurrent ``add(e)`` with a new token survives because its token was not |
| 20 | yet in the "observed" set at remove time. |
| 21 | - ``join(a, b)`` is set-union on the ``(element, token)`` pairs, then removes |
| 22 | any pair whose token appears in either replica's *tombstone* set (for |
| 23 | optimisation — see :attr:`_tombstones`). |
| 24 | |
| 25 | We simplify slightly: tombstones are not compacted in this implementation |
| 26 | (correct but not space-optimal). Compaction is a GC concern, not a |
| 27 | correctness concern. |
| 28 | |
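| 29 | **Lattice laws satisfied by** :meth:`join` (``==`` below means state equality as tested by :meth:`ORSet.equivalent`; ``ORSet`` defines no ``__eq__``): |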
| 30 | 1. Commutativity: ``join(a, b) == join(b, a)`` |
| 31 | 2. Associativity: ``join(join(a, b), c) == join(a, join(b, c))`` |
| 32 | 3. Idempotency: ``join(a, a) == a`` |
| 33 | |
| 34 | Public API |
| 35 | ---------- |
| 36 | - :class:`ORSetEntry` — ``TypedDict`` for a single (element, token) pair. |
| 37 | - :class:`ORSetDict` — ``TypedDict`` wire format for a complete OR-Set. |
| 38 | - :class:`ORSet` — the set itself. |
| 39 | """ |
| 40 | |
| 41 | import logging |
| 42 | import secrets |
| 43 | from typing import TypedDict |
| 44 | |
| 45 | logger = logging.getLogger(__name__) |
| 46 | |
class ORSetEntry(TypedDict):
    """One ``(element, token)`` pair as it appears in an OR-Set payload.

    Keys:
        element: The tracked string value, for example a content hash or a
            label.
        token: The unique identifier minted for one specific *addition* of
            ``element``. Two concurrent adds of the same element by
            different agents carry different tokens, which is what keeps
            them distinguishable.
    """

    # The value being tracked.
    element: str
    # Identifier of the particular add operation that produced this pair.
    token: str
| 58 | |
class ORSetDict(TypedDict):
    """Serialised (wire) form of a complete :class:`ORSet`.

    Keys:
        entries: Every live ``(element, token)`` pair, one
            :class:`ORSetEntry` per pair.
        tombstones: Every token string that has been explicitly removed.
            An entry whose token is listed here counts as deleted.
    """

    # Live (element, token) pairs.
    entries: list[ORSetEntry]
    # Tokens that have been removed.
    tombstones: list[str]
| 69 | |
| 70 | class ORSet: |
| 71 | """Observed-Remove Set — an unordered add-wins CRDT set. |
| 72 | |
| 73 | Elements are arbitrary strings (content hashes, labels, identifiers). |
| 74 | The set supports concurrent add and remove from multiple agents with the |
| 75 | guarantee that adds always win over concurrent removes. |
| 76 | |
| 77 | All mutating methods return new :class:`ORSet` instances; ``self`` is |
| 78 | never modified. |
| 79 | |
| 80 | Example:: |
| 81 | |
| 82 | s1 = ORSet() |
| 83 | s1, tok = s1.add("note-A") |
| 84 | |
| 85 | s2 = ORSet() |
| 86 | s2 = s2.remove("note-A", {tok}) # remove the observed token |
| 87 | |
| 88 | # Concurrent add by s1 with a NEW token survives the remove: |
| 89 | s1_v2, tok2 = s1.add("note-A") # new token |
| 90 | |
| 91 | merged = s1_v2.join(s2) |
| 92 | assert "note-A" in merged.elements() # add-wins |
| 93 | """ |
| 94 | |
| 95 | def __init__( |
| 96 | self, |
| 97 | entries: set[tuple[str, str]] | None = None, |
| 98 | tombstones: set[str] | None = None, |
| 99 | ) -> None: |
| 100 | """Initialise an OR-Set, optionally pre-populated. |
| 101 | |
| 102 | Args: |
| 103 | entries: Set of ``(element, token)`` pairs (alive entries). |
| 104 | tombstones: Set of removed tokens. |
| 105 | """ |
| 106 | self._entries: set[tuple[str, str]] = set(entries) if entries else set() |
| 107 | self._tombstones: set[str] = set(tombstones) if tombstones else set() |
| 108 | |
| 109 | # ------------------------------------------------------------------ |
| 110 | # Mutations (return new ORSet) |
| 111 | # ------------------------------------------------------------------ |
| 112 | |
| 113 | def add(self, element: str) -> tuple[ORSet, str]: |
| 114 | """Add *element* to the set with a fresh unique token. |
| 115 | |
| 116 | Args: |
| 117 | element: The string value to add. |
| 118 | |
| 119 | Returns: |
| 120 | A ``(new_set, token)`` pair where ``new_set`` contains the |
| 121 | added element and ``token`` is the unique identifier of this |
| 122 | particular addition (useful for targeted removal later). |
| 123 | """ |
| 124 | token = secrets.token_hex(16) |
| 125 | new_entries = self._entries | {(element, token)} |
| 126 | return ORSet(new_entries, self._tombstones), token |
| 127 | |
| 128 | def remove(self, element: str, observed_tokens: set[str]) -> ORSet: |
| 129 | """Remove *element* by tombstoning all currently observed tokens. |
| 130 | |
| 131 | Only the tokens listed in *observed_tokens* are tombstoned. Any token |
| 132 | added *after* this remove (i.e. from a concurrent ``add``) is not |
| 133 | affected and the element will survive in the merged result. |
| 134 | |
| 135 | Args: |
| 136 | element: The element to remove. |
| 137 | observed_tokens: The set of tokens for *element* that were observed |
| 138 | at remove time (typically ``self.tokens_for(element)``). |
| 139 | |
| 140 | Returns: |
| 141 | A new :class:`ORSet` with *element*'s observed tokens tombstoned. |
| 142 | """ |
| 143 | relevant = {(e, t) for e, t in self._entries if e == element and t in observed_tokens} |
| 144 | new_tombstones = self._tombstones | {t for _, t in relevant} |
| 145 | new_entries = self._entries - relevant |
| 146 | return ORSet(new_entries, new_tombstones) |
| 147 | |
| 148 | # ------------------------------------------------------------------ |
| 149 | # CRDT join |
| 150 | # ------------------------------------------------------------------ |
| 151 | |
| 152 | def join(self, other: ORSet) -> ORSet: |
| 153 | """Return the lattice join — union of entries minus all tombstones. |
| 154 | |
| 155 | Args: |
| 156 | other: The OR-Set to merge with. |
| 157 | |
| 158 | Returns: |
| 159 | A new :class:`ORSet` containing all entries from either replica |
| 160 | whose tokens have not been tombstoned by either replica. |
| 161 | """ |
| 162 | all_tombstones = self._tombstones | other._tombstones |
| 163 | all_entries = (self._entries | other._entries) - { |
| 164 | (e, t) for e, t in self._entries | other._entries if t in all_tombstones |
| 165 | } |
| 166 | return ORSet(all_entries, all_tombstones) |
| 167 | |
| 168 | # ------------------------------------------------------------------ |
| 169 | # Query |
| 170 | # ------------------------------------------------------------------ |
| 171 | |
| 172 | def elements(self) -> frozenset[str]: |
| 173 | """Return the set of visible elements (those not tombstoned). |
| 174 | |
| 175 | Returns: |
| 176 | Frozenset of string elements currently in the set. |
| 177 | """ |
| 178 | return frozenset(e for e, t in self._entries if t not in self._tombstones) |
| 179 | |
| 180 | def tokens_for(self, element: str) -> set[str]: |
| 181 | """Return all live tokens for *element*. |
| 182 | |
| 183 | Pass the result to :meth:`remove` to remove *element* without |
| 184 | accidentally tombstoning tokens added concurrently. |
| 185 | |
| 186 | Args: |
| 187 | element: The element to look up. |
| 188 | |
| 189 | Returns: |
| 190 | Set of token strings associated with live copies of *element*. |
| 191 | """ |
| 192 | return {t for e, t in self._entries if e == element and t not in self._tombstones} |
| 193 | |
| 194 | def __contains__(self, element: str) -> bool: |
| 195 | return element in self.elements() |
| 196 | |
| 197 | # ------------------------------------------------------------------ |
| 198 | # Serialisation |
| 199 | # ------------------------------------------------------------------ |
| 200 | |
| 201 | def to_dict(self) -> ORSetDict: |
| 202 | """Return a JSON-serialisable :class:`ORSetDict`. |
| 203 | |
| 204 | Returns: |
| 205 | Dict with ``"entries"`` and ``"tombstones"`` lists. |
| 206 | """ |
| 207 | entries: list[ORSetEntry] = [{"element": e, "token": t} for e, t in sorted(self._entries)] |
| 208 | return {"entries": entries, "tombstones": sorted(self._tombstones)} |
| 209 | |
| 210 | @classmethod |
| 211 | def from_dict(cls, data: ORSetDict) -> ORSet: |
| 212 | """Reconstruct an :class:`ORSet` from its wire representation. |
| 213 | |
| 214 | Args: |
| 215 | data: Dict as produced by :meth:`to_dict`. |
| 216 | |
| 217 | Returns: |
| 218 | A new :class:`ORSet`. |
| 219 | """ |
| 220 | entries = {(entry["element"], entry["token"]) for entry in data["entries"]} |
| 221 | tombstones = set(data["tombstones"]) |
| 222 | return cls(entries, tombstones) |
| 223 | |
| 224 | # ------------------------------------------------------------------ |
| 225 | # Python dunder helpers |
| 226 | # ------------------------------------------------------------------ |
| 227 | |
| 228 | def equivalent(self, other: ORSet) -> bool: |
| 229 | """Return ``True`` if both OR-Sets have the same visible elements and tombstones. |
| 230 | |
| 231 | Args: |
| 232 | other: The OR-Set to compare against. |
| 233 | |
| 234 | Returns: |
| 235 | ``True`` when visible elements and tombstone sets are identical. |
| 236 | """ |
| 237 | return self.elements() == other.elements() and self._tombstones == other._tombstones |
| 238 | |
| 239 | def __repr__(self) -> str: |
| 240 | return f"ORSet(elements={self.elements()!r})" |
File History
1 commit
sha256:2eaa5d95f9d9383498e76947410a26e5a3ba23d182f339910c424cf88fad412b
fix: try fetch/presign before fetch/mpack to avoid Cloudfla…
Sonnet 4.6
patch
8 days ago