lcs.py
python
sha256:1900655993c83c4107067375548a7be823e471d2515830842f1a12cba4bd3cdf
fix: unified object store migration — idempotent writes, JS…
Sonnet 4.6
minor
⚠ breaking
29 days ago
| 1 | """LCS / Myers shortest-edit-script algorithm for ordered sequences. |
| 2 | |
| 3 | Operates on ``list[str]`` where each string is a content ID (SHA-256 or |
| 4 | deterministic hash). Two elements are considered identical iff their content |
| 5 | IDs are equal — the algorithm never inspects actual content. |
| 6 | |
| 7 | Public API |
| 8 | ---------- |
| 9 | - :func:`myers_ses` — compute shortest edit script (keep / insert / delete). |
| 10 | - :func:`detect_moves` — post-process insert+delete pairs into ``MoveOp``\\s. |
| 11 | - :func:`diff` — end-to-end: list[str] × list[str] → ``StructuredDelta``. |
| 12 | |
| 13 | Algorithm |
| 14 | --------- |
| 15 | ``myers_ses`` uses the classic O(nm) LCS dynamic-programming traceback. This |
| 16 | is the same algorithm as ``midi_diff.lcs_edit_script`` but operates on content |
| 17 | IDs (strings) rather than ``NoteKey`` dicts, making it fully generic. |
| 18 | |
| 19 | The patience-diff and O(nd) Myers variants (see ``SequenceSchema.diff_algorithm``) |
| 20 | are not yet implemented; both fall back to the O(nm) LCS. They can be added later |
| 21 | as an optimisation without changing the public API. |
| 22 | """ |
| 23 | |
| 24 | import logging |
| 25 | from dataclasses import dataclass |
| 26 | from typing import Literal |
| 27 | |
| 28 | from muse.core.schema import SequenceSchema |
| 29 | from muse.domain import DeleteOp, DomainOp, InsertOp, MoveOp, StructuredDelta |
| 30 | |
# Lookup table keyed by content ID, holding one ``DeleteOp`` per key.
type _DeleteMap = dict[str, "DeleteOp"]
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
| 33 | |
| 34 | EditKind = Literal["keep", "insert", "delete"] |
| 35 | |
| 36 | @dataclass(frozen=True) |
| 37 | class EditStep: |
| 38 | """One step in the shortest edit script produced by :func:`myers_ses`.""" |
| 39 | |
| 40 | kind: EditKind |
| 41 | base_index: int # index in the base content-ID list |
| 42 | target_index: int # index in the target content-ID list |
| 43 | item: str # content ID of the element |
| 44 | |
| 45 | # --------------------------------------------------------------------------- |
| 46 | # Core algorithm |
| 47 | # --------------------------------------------------------------------------- |
| 48 | |
def myers_ses(base: list[str], target: list[str]) -> list[EditStep]:
    """Build an edit script that rewrites *base* into *target*.

    A full O(nm) longest-common-subsequence table is filled from the back,
    then walked once from the front to emit the steps.  Elements are compared
    purely by content-ID equality.

    Args:
        base: Content IDs of the base sequence, in order.
        target: Content IDs of the target sequence, in order.

    Returns:
        The :class:`EditStep` entries (keep / insert / delete) in the order
        they are applied.  There is one "keep" per LCS element, so the count
        of insert + delete steps is minimal.  When inserting and deleting are
        equally good, the insert is emitted first.
    """
    base_len = len(base)
    target_len = len(target)

    # lcs[r][c] is the LCS length of the suffixes base[r:] and target[c:].
    # The extra last row/column stay 0: an empty suffix shares nothing.
    lcs: list[list[int]] = [[0] * (target_len + 1) for _ in range(base_len + 1)]
    for r in reversed(range(base_len)):
        row, below = lcs[r], lcs[r + 1]
        for c in reversed(range(target_len)):
            if base[r] == target[c]:
                row[c] = below[c + 1] + 1
            else:
                row[c] = max(below[c], row[c + 1])

    script: list[EditStep] = []
    r = c = 0
    while r < base_len or c < target_len:
        base_left = r < base_len
        target_left = c < target_len

        # Matching heads are always part of some LCS, so keep them greedily.
        if base_left and target_left and base[r] == target[c]:
            script.append(EditStep("keep", r, c, base[r]))
            r += 1
            c += 1
            continue

        # Insert when base is exhausted, or when skipping the target element
        # loses no LCS length compared with skipping the base element.
        if target_left and (not base_left or lcs[r][c + 1] >= lcs[r + 1][c]):
            script.append(EditStep("insert", r, c, target[c]))
            c += 1
            continue

        script.append(EditStep("delete", r, c, base[r]))
        r += 1

    return script
| 90 | |
| 91 | # --------------------------------------------------------------------------- |
| 92 | # Move detection post-pass |
| 93 | # --------------------------------------------------------------------------- |
| 94 | |
def detect_moves(
    inserts: list[InsertOp],
    deletes: list[DeleteOp],
) -> tuple[list[MoveOp], list[InsertOp], list[DeleteOp]]:
    """Collapse (delete, insert) pairs that share a content ID into ``MoveOp``\\s.

    A move is defined as a delete and an insert of the same content (same
    ``content_id``) at different positions. Where the positions are the same,
    the pair is left as separate insert/delete ops (idempotent round-trip).

    Only the *first* delete of each content ID is eligible for pairing, and it
    is paired with the first insert of that ID whose position differs, so at
    most one move is produced per content ID.  Every other delete of that ID
    is a genuine delete and is returned in ``remaining_deletes``.

    Args:
        inserts: ``InsertOp`` entries from the LCS edit script.
        deletes: ``DeleteOp`` entries from the LCS edit script.

    Returns:
        A tuple ``(moves, remaining_inserts, remaining_deletes)`` where
        ``moves`` contains the detected ``MoveOp``\\s and the remaining lists
        hold ops that could not be paired, in their original relative order.
        A ``None`` position on either op is treated as ``0``.
    """
    # Index (into ``deletes``) of the first delete seen for each content ID.
    first_delete_index: dict[str, int] = {}
    for idx, d in enumerate(deletes):
        first_delete_index.setdefault(d["content_id"], idx)

    moves: list[MoveOp] = []
    remaining_inserts: list[InsertOp] = []
    # Track consumed deletes by list index rather than by content ID, so that
    # turning one delete into a move does not also discard the other deletes
    # that happen to carry the same content ID.
    consumed: set[int] = set()

    for ins in inserts:
        cid = ins["content_id"]
        idx = first_delete_index.get(cid)
        if idx is not None and idx not in consumed:
            d = deletes[idx]
            from_pos = d["position"] if d["position"] is not None else 0
            to_pos = ins["position"] if ins["position"] is not None else 0
            if from_pos != to_pos:
                moves.append(
                    MoveOp(
                        op="move",
                        address=ins["address"],
                        from_position=from_pos,
                        to_position=to_pos,
                        content_id=cid,
                    )
                )
                consumed.add(idx)
                continue
        remaining_inserts.append(ins)

    remaining_deletes = [d for idx, d in enumerate(deletes) if idx not in consumed]
    return moves, remaining_inserts, remaining_deletes
| 146 | |
| 147 | # --------------------------------------------------------------------------- |
| 148 | # Top-level diff entry point |
| 149 | # --------------------------------------------------------------------------- |
| 150 | |
def diff(
    schema: SequenceSchema,
    base: list[str],
    target: list[str],
    *,
    domain: str,
    address: str = "",
) -> StructuredDelta:
    """Diff two ordered sequences of content IDs, returning a ``StructuredDelta``.

    Runs :func:`myers_ses`, then :func:`detect_moves` to collapse paired
    insert/delete entries into ``MoveOp``\\s. The resulting ``ops`` list
    contains ``DeleteOp``, ``InsertOp``, and ``MoveOp`` entries, in that
    order; "keep" steps produce no op.

    Args:
        schema: The ``SequenceSchema`` declaring element type and identity.
            Only ``schema["element_type"]`` is read here; it labels the
            per-op ``content_summary`` and the delta summary.
        base: Base (ancestor) sequence as a list of content IDs.
        target: Target (newer) sequence as a list of content IDs.
        domain: Domain tag for the returned ``StructuredDelta``.
        address: Address prefix for generated op entries (e.g. file path).

    Returns:
        A ``StructuredDelta`` with a human-readable ``summary`` and typed ops.
        The summary lists the non-zero counts, e.g.
        ``"2 notes added, 1 note removed, 1 moved"``, or
        ``"no <element_type> changes"`` when there are no ops.
    """
    steps = myers_ses(base, target)
    elem = schema["element_type"]

    # Inserts are positioned in target coordinates, deletes in base coordinates.
    raw_inserts: list[InsertOp] = [
        InsertOp(
            op="insert",
            address=address,
            position=step.target_index,
            content_id=step.item,
            content_summary=f"{elem}:{step.item}",
        )
        for step in steps
        if step.kind == "insert"
    ]
    raw_deletes: list[DeleteOp] = [
        DeleteOp(
            op="delete",
            address=address,
            position=step.base_index,
            content_id=step.item,
            content_summary=f"{elem}:{step.item}",
        )
        for step in steps
        if step.kind == "delete"
    ]

    moves, remaining_inserts, remaining_deletes = detect_moves(raw_inserts, raw_deletes)
    ops: list[DomainOp] = [*remaining_deletes, *remaining_inserts, *moves]

    n_ins = len(remaining_inserts)
    n_del = len(remaining_deletes)
    n_mov = len(moves)

    parts: list[str] = []
    if n_ins:
        parts.append(f"{n_ins} {elem}{'s' if n_ins != 1 else ''} added")
    if n_del:
        parts.append(f"{n_del} {elem}{'s' if n_del != 1 else ''} removed")
    if n_mov:
        # "moved" reads the same for singular and plural, so no inflection.
        parts.append(f"{n_mov} moved")
    summary = ", ".join(parts) if parts else f"no {elem} changes"

    logger.debug(
        "lcs.diff %r: +%d -%d ~%d ops on %d→%d elements",
        address,
        n_ins,
        n_del,
        n_mov,
        len(base),
        len(target),
    )

    return StructuredDelta(domain=domain, ops=ops, summary=summary)
File History
1 commit
sha256:1900655993c83c4107067375548a7be823e471d2515830842f1a12cba4bd3cdf
fix: unified object store migration — idempotent writes, JS…
Sonnet 4.6
minor
⚠
29 days ago