gabriel / muse public
op_log.py python
408 lines 14.6 KB
Raw
sha256:ff478cfdcdd4b7fd6de89cb68896601a981f945634463275ec333bd20ca36402 Merge branch 'dev' into main Human 21 days ago
1 """Append-only operation log for Muse live collaboration.
2
3 The op log is the bridge between real-time collaborative editing and the
4 immutable commit DAG. During a live session, operations are appended to
5 the log as they occur. At commit time the log is collapsed into a
6 :class:`~muse.domain.StructuredDelta` and stored with the commit record.
7
8 Design principles
9 -----------------
10 - **Append-only** — entries are never modified or deleted; the file grows
11 monotonically. Compaction happens through checkpoints (see below).
12 - **Lamport-clocked** — every entry carries a logical Lamport timestamp
13 that imposes a total order across concurrent actors without wall-clock
14 coordination.
15 - **Causally linked** — ``parent_op_ids`` lets any entry declare the ops it
16 depends on, enabling causal replay and CRDT join operations downstream.
17 - **Domain-neutral** — the log stores :class:`~muse.domain.DomainOp` values
18 unchanged; the core engine has no opinion about what those ops mean.
19 - **Checkpoint / compaction** — when a live session crystallises into a Muse
20 commit, a checkpoint record is written that marks the current snapshot.
21 Subsequent :meth:`OpLog.replay_since_checkpoint` calls return only ops with a later Lamport timestamp.
22
23 Layout::
24
25 .muse/op_log/<session_id>/
26 ops.jsonl — one JSON line per OpEntry (append-only)
27 checkpoint.json — most recent checkpoint (snapshot_id + lamport_ts)
28
29 Relationship to the commit DAG
30 -------------------------------
31 The op log does **not** replace the commit DAG. It is a staging area:
32
33 live edits → OpLog.append() → ops.jsonl
34 session end → OpLog.checkpoint(snapshot_id) → commit record
35 commit record → normal Muse commit DAG
36
37 Replaying the log from a checkpoint reproduces the snapshot deterministically,
38 giving the same guarantee as re-running ``git apply`` from a patch file.
39
40 Usage::
41
42 from muse.core.op_log import OpLog, make_op_entry
43
44 log = OpLog(repo_root, session_id="session-abc")
45 entry = make_op_entry(
46 actor_id="counterpoint-bot",
47 domain="midi",
48 domain_op=my_insert_op,
49 lamport_ts=log.next_lamport_ts(),
50 )
51 log.append(entry)
52
53 delta = log.to_structured_delta("midi") # collapse for commit
54 ckpt = log.checkpoint(snapshot_id) # crystallise
55 """
56
57 import json
58 import logging
59 import pathlib
60 from typing import TypedDict
61
62 from muse.core.types import content_hash, load_json_file, now_utc_iso
63 from muse.core.io import write_text_atomic
64 from muse.domain import DomainOp, StructuredDelta
65 from muse.core.paths import op_log_dir
66
# Alias for a string-keyed integer map; used for the per-op-kind tallies
# built in OpLog.to_structured_delta().
67 type _IntMap = dict[str, int]
# Module-level logger, named after this module.
68 logger = logging.getLogger(__name__)
69
70 # ---------------------------------------------------------------------------
71 # Wire-format TypedDicts
72 # ---------------------------------------------------------------------------
73
class OpEntry(TypedDict):
    """Wire format of one record in the append-only op log.

    Each line of ``ops.jsonl`` is one JSON-serialised ``OpEntry``.  Use
    :func:`make_op_entry` to build instances so that ``op_id`` and
    ``created_at`` are populated consistently.  The meaning of every field
    is documented next to its declaration below.
    """

    # Stable content-addressed ID for this entry.  Consumers use it to
    # deduplicate on replay; CRDT join uses it to establish causal identity.
    op_id: str

    # The agent or human identity that produced this op.
    actor_id: str

    # Logical Lamport timestamp, monotonically increasing within a session.
    # Establishes total ordering when wall-clock times are unavailable or
    # unreliable.
    lamport_ts: int

    # Causal parents: the op IDs this entry depends on.  An empty list means
    # no explicit causal dependency (a root entry).  Used by CRDT merge and
    # causal replay.
    parent_op_ids: list[str]

    # Domain tag matching the :class:`~muse.domain.MuseDomainPlugin` that
    # produced this op (e.g. ``"midi"``, ``"code"``).
    domain: str

    # The actual typed domain operation, stored verbatim.
    domain_op: DomainOp

    # ISO 8601 UTC wall-clock timestamp of when the entry was created.
    # Informational only; ordering must rely on ``lamport_ts``.
    created_at: str

    # Link to a coordination intent (see :mod:`muse.core.coordination`).
    # Empty string when not applicable.
    intent_id: str

    # Link to a coordination reservation.  Empty string when not applicable.
    reservation_id: str
115
class OpLogCheckpoint(TypedDict):
    """Marker recording the state of the op log at commit time.

    :meth:`OpLog.checkpoint` writes one of these when a live session
    crystallises into a Muse commit.  Afterwards,
    :meth:`OpLog.replay_since_checkpoint` returns only the ops whose Lamport
    timestamp is greater than the one stored here.  The meaning of every
    field is documented next to its declaration below.
    """

    # The session this checkpoint belongs to.
    session_id: str

    # The commit snapshot ID that this checkpoint materialises.  All ops up
    # to and including ``lamport_ts`` are captured by this snapshot.
    snapshot_id: str

    # Lamport timestamp of the last op included in this checkpoint.
    lamport_ts: int

    # Number of op entries in the log at checkpoint time.
    op_count: int

    # ISO 8601 UTC timestamp of when the checkpoint was written.
    created_at: str
141
142 # ---------------------------------------------------------------------------
143 # Factory
144 # ---------------------------------------------------------------------------
145
def make_op_entry(
    actor_id: str,
    domain: str,
    domain_op: DomainOp,
    lamport_ts: int,
    *,
    parent_op_ids: list[str] | None = None,
    intent_id: str = "",
    reservation_id: str = "",
) -> OpEntry:
    """Build an :class:`OpEntry` whose ``op_id`` is derived from its content.

    The ``op_id`` is ``content_hash`` of the actor, domain, domain op,
    intent, Lamport timestamp, *sorted* parent IDs and reservation.  The
    wall-clock ``created_at`` is deliberately left out of the hashed payload,
    so the ID depends only on those fields.

    Args:
        actor_id: Agent or human identity string.
        domain: Domain tag (e.g. ``"midi"``).
        domain_op: The typed domain operation to log.
        lamport_ts: Logical Lamport timestamp for this entry.
        parent_op_ids: Causal dependencies.  ``None`` or an empty list both
            yield an empty parent list.  The caller's list is copied, not
            stored by reference.
        intent_id: Optional coordination intent linkage.
        reservation_id: Optional coordination reservation linkage.

    Returns:
        A fully populated :class:`OpEntry`; ``parent_op_ids`` keeps the
        caller's ordering (only the hashed copy is sorted).
    """
    causal_parents: list[str] = list(parent_op_ids) if parent_op_ids else []

    # Payload that defines the entry's identity.  Parents are sorted so the
    # order in which a caller lists them does not change the resulting ID.
    identity = {
        "actor_id": actor_id,
        "domain": domain,
        "domain_op": domain_op,
        "intent_id": intent_id,
        "lamport_ts": lamport_ts,
        "parent_op_ids": sorted(causal_parents),
        "reservation_id": reservation_id,
    }

    return OpEntry(
        op_id=content_hash(identity),
        actor_id=actor_id,
        lamport_ts=lamport_ts,
        parent_op_ids=causal_parents,
        domain=domain,
        domain_op=domain_op,
        created_at=now_utc_iso(),
        intent_id=intent_id,
        reservation_id=reservation_id,
    )
191
192 # ---------------------------------------------------------------------------
193 # OpLog
194 # ---------------------------------------------------------------------------
195
class OpLog:
    """Append-only operation log for a single live collaboration session.

    Each session gets its own directory under ``.muse/op_log/<session_id>/``
    (resolved through ``op_log_dir``).  ``ops.jsonl`` holds one JSON-encoded
    :class:`OpEntry` per line; ``checkpoint.json`` holds the most recent
    :class:`OpLogCheckpoint` and is written through ``write_text_atomic``.

    Args:
        repo_root: Repository root (the directory containing ``.muse/``).
        session_id: Stable identifier for this collaboration session.  It is
            used verbatim as the session directory name.  The directory is
            created on the first :meth:`append` or :meth:`checkpoint`.
    """

    def __init__(self, repo_root: pathlib.Path, session_id: str) -> None:
        self._repo_root = repo_root
        self._session_id = session_id
        self._session_dir = op_log_dir(repo_root) / session_id
        self._ops_path = self._session_dir / "ops.jsonl"
        self._checkpoint_path = self._session_dir / "checkpoint.json"
        # 0 doubles as "counter not yet initialised from disk"; the first
        # call to next_lamport_ts() replaces it with the highest logged value.
        self._lamport: int = 0

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _ensure_dir(self) -> None:
        """Create the session directory (and parents) if it does not exist."""
        self._session_dir.mkdir(parents=True, exist_ok=True)

    def _load_lamport(self) -> int:
        """Return the highest integer ``lamport_ts`` found in the log file.

        Lines that are blank, are not valid JSON, are not JSON objects, or
        whose ``lamport_ts`` is not an integer are skipped, so a damaged
        line cannot prevent a session from being reopened.

        Returns:
            The highest timestamp seen, or ``0`` when the file is missing or
            contains no usable entry.
        """
        if not self._ops_path.exists():
            return 0
        highest = 0
        with self._ops_path.open(encoding="utf-8") as fh:
            for line in fh:
                line = line.strip()
                if not line:
                    continue
                try:
                    record = json.loads(line)
                except json.JSONDecodeError:
                    continue
                # Valid JSON that is not an object (e.g. ``[1]`` or ``42``)
                # has no ``.get``; treat it like any other corrupt line.
                if not isinstance(record, dict):
                    continue
                ts = record.get("lamport_ts", 0)
                # bool is a subclass of int; exclude it explicitly.
                if isinstance(ts, int) and not isinstance(ts, bool) and ts > highest:
                    highest = ts
        return highest

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def next_lamport_ts(self) -> int:
        """Return the next Lamport timestamp to use, advancing the counter.

        The counter is initialised lazily from the highest value found in the
        log on first call (so that a reopened session continues from where it
        left off).

        Returns:
            Monotonically increasing integer, always at least ``1``.
        """
        if self._lamport == 0:
            self._lamport = self._load_lamport()
        self._lamport += 1
        return self._lamport

    def append(self, entry: OpEntry) -> None:
        """Append *entry* to the op log.

        The entry is serialised as a single compact JSON line.  The file is
        closed (and therefore flushed to the operating system) before this
        method returns; no explicit ``fsync`` is performed.  This is the only
        write operation on the log file.

        After the write, the in-memory Lamport counter is raised to
        ``entry["lamport_ts"]`` when that value is higher, so that a later
        :meth:`next_lamport_ts` never hands out a timestamp that is already
        present in the log (for example after appending an op produced by
        another actor).

        Args:
            entry: A fully populated :class:`OpEntry`.
        """
        self._ensure_dir()
        line = f"{json.dumps(entry, separators=(',', ':'))}\n"
        with self._ops_path.open("a", encoding="utf-8") as fh:
            fh.write(line)
        # Only advance a counter that has already been initialised.  While it
        # is still 0, next_lamport_ts() will load the maximum from disk, and
        # that scan already includes the line written above.
        entry_ts = entry["lamport_ts"]
        if self._lamport and entry_ts > self._lamport:
            self._lamport = entry_ts
        logger.debug(
            "✅ OpLog append: actor=%r domain=%r ts=%d",
            entry["actor_id"],
            entry["domain"],
            entry["lamport_ts"],
        )

    def read_all(self) -> list[OpEntry]:
        """Return all entries in the log, in append order.

        Blank lines are ignored.  Lines that are not valid JSON, or that
        decode to something other than a JSON object, are logged as a
        warning and skipped.

        Returns:
            List of :class:`OpEntry` dicts, oldest first; empty when the log
            file does not exist.
        """
        if not self._ops_path.exists():
            return []
        entries: list[OpEntry] = []
        with self._ops_path.open(encoding="utf-8") as fh:
            for line in fh:
                line = line.strip()
                if not line:
                    continue
                try:
                    record = json.loads(line)
                except json.JSONDecodeError as exc:
                    logger.warning("⚠️ Corrupt op log line in %s: %s", self._ops_path, exc)
                    continue
                # A non-object value would break every consumer that indexes
                # entries by key, so it is rejected here.
                if not isinstance(record, dict):
                    logger.warning(
                        "⚠️ Corrupt op log line in %s: %s",
                        self._ops_path,
                        "line is not a JSON object",
                    )
                    continue
                entries.append(record)
        return entries

    def replay_since_checkpoint(self) -> list[OpEntry]:
        """Return entries logged after the last checkpoint.

        An entry counts as "after" the checkpoint when its ``lamport_ts`` is
        strictly greater than the checkpoint's ``lamport_ts``.  If no
        checkpoint exists, all entries are returned (equivalent to
        :meth:`read_all`).

        Returns:
            List of :class:`OpEntry` dicts since last checkpoint, in append
            order.
        """
        checkpoint = self.read_checkpoint()
        all_entries = self.read_all()
        if checkpoint is None:
            return all_entries
        cutoff = checkpoint["lamport_ts"]
        return [e for e in all_entries if e["lamport_ts"] > cutoff]

    def to_structured_delta(self, domain: str) -> StructuredDelta:
        """Collapse all entries since the last checkpoint into a StructuredDelta.

        Ops are ordered by Lamport timestamp (entries with equal timestamps
        keep their append order, because the sort is stable).  Ops from
        domains other than *domain* are filtered out.

        Args:
            domain: Domain tag to filter by (e.g. ``"midi"``).

        Returns:
            A :class:`~muse.domain.StructuredDelta` with the ordered op list
            and a summary of the form ``"<count> <kind>, ..."`` sorted by
            kind, or ``"no ops"`` when nothing matched.
        """
        entries = self.replay_since_checkpoint()
        entries.sort(key=lambda e: e["lamport_ts"])
        ops = [e["domain_op"] for e in entries if e["domain"] == domain]

        # Tally ops by their "op" field; ops without one count as "unknown".
        counts: _IntMap = {}
        for op in ops:
            kind = op.get("op", "unknown")
            counts[kind] = counts.get(kind, 0) + 1
        parts = [f"{v} {k}" for k, v in sorted(counts.items())]
        summary = ", ".join(parts) if parts else "no ops"

        return StructuredDelta(domain=domain, ops=ops, summary=summary)

    def checkpoint(self, snapshot_id: str) -> OpLogCheckpoint:
        """Write a checkpoint recording that all current ops are in *snapshot_id*.

        The checkpoint stores the highest ``lamport_ts`` currently in the log
        (``0`` for an empty log) and the number of entries.  Afterwards,
        :meth:`replay_since_checkpoint` only returns ops with a higher
        timestamp.  The op log file itself is not truncated; the checkpoint
        is a logical marker.

        Args:
            snapshot_id: The Muse snapshot ID that captured all ops to date.

        Returns:
            The written :class:`OpLogCheckpoint`.
        """
        all_entries = self.read_all()
        highest_ts = max((e["lamport_ts"] for e in all_entries), default=0)
        ckpt = OpLogCheckpoint(
            session_id=self._session_id,
            snapshot_id=snapshot_id,
            lamport_ts=highest_ts,
            op_count=len(all_entries),
            created_at=now_utc_iso(),
        )
        self._ensure_dir()
        write_text_atomic(self._checkpoint_path, f"{json.dumps(ckpt, indent=2)}\n")
        logger.info(
            "✅ OpLog checkpoint: session=%r snapshot=%s ts=%d ops=%d",
            self._session_id,
            snapshot_id,
            highest_ts,
            len(all_entries),
        )
        return ckpt

    def read_checkpoint(self) -> OpLogCheckpoint | None:
        """Load the most recent checkpoint, or ``None`` if none exists.

        ``None`` is also returned, with a warning logged, when the checkpoint
        file exists but ``load_json_file`` yields ``None`` for it.
        """
        if not self._checkpoint_path.exists():
            return None
        raw = load_json_file(self._checkpoint_path)
        if raw is None:
            logger.warning("⚠️ Corrupt checkpoint file %s", self._checkpoint_path)
            return None
        return raw

    def session_id(self) -> str:
        """Return the session ID for this log."""
        return self._session_id
391
392 # ---------------------------------------------------------------------------
393 # Session listing
394 # ---------------------------------------------------------------------------
395
def list_sessions(repo_root: pathlib.Path) -> list[str]:
    """List the session IDs that have an op log directory under *repo_root*.

    A session ID is the name of any sub-directory of ``op_log_dir(repo_root)``;
    plain files in that directory are ignored.

    Args:
        repo_root: Repository root.

    Returns:
        Session ID strings in sorted order; an empty list when the op log
        directory does not exist.
    """
    base = op_log_dir(repo_root)
    if not base.exists():
        return []
    session_names = [child.name for child in base.iterdir() if child.is_dir()]
    session_names.sort()
    return session_names
File History 2 commits
sha256:ff478cfdcdd4b7fd6de89cb68896601a981f945634463275ec333bd20ca36402 Merge branch 'dev' into main Human 21 days ago
sha256:1c4b3e3a9a1f300774c3ee662b572a698d5fd405bf765a71e6011a2e9c3eaaaa feat: Muse — version control for the agent era Human 73 days ago