code_harmony.py
python
sha256:e6465e8a9b7fa8e6223ed4a3576e96c568c913ae2caeb9c31f15e7a81b250b40
docs: add | jq convention to --json section of agent-guide
Sonnet 4.6
1 day ago
| 1 | """Code-domain HarmonyPlugin — normalized token-bag similarity. |
| 2 | |
| 3 | ``CodePlugin`` |
| 4 | Implements the :class:`~muse.core.harmony.engine.HarmonyPlugin` protocol |
| 5 | for code-domain conflict patterns. Computes Tanimoto similarity on |
| 6 | normalized token multisets produced by :func:`code_fingerprint`. |
| 7 | |
| 8 | ``code_fingerprint(source)`` |
| 9 | Converts a code snippet into a sorted, space-separated string of |
| 10 | normalized tokens. Pass the result as ``--semantic-fingerprint`` when |
| 11 | recording a code-domain conflict pattern. |
| 12 | |
| 13 | Normalization rules |
| 14 | ------------------- |
| 15 | |
| 16 | =================== =========== |
| 17 | Raw token Normalized |
| 18 | =================== =========== |
| 19 | Python keyword preserved as-is (``def``, ``class``, ``return``, …) |
| 20 | Identifier ``ID`` |
| 21 | String literal ``STR`` |
| 22 | Number literal ``NUM`` |
| 23 | Operator / punctuation preserved (``(``, ``+``, ``:``, ``->``, …) |
| 24 | Comment stripped |
| 25 | Whitespace / indent stripped |
| 26 | =================== =========== |
| 27 | |
| 28 | Fingerprint format |
| 29 | ------------------ |
| 30 | |
| 31 | The fingerprint is a **sorted** space-separated token list with repetitions |
| 32 | preserved (multiset encoding):: |
| 33 | |
| 34 | code_fingerprint("def foo(x): return x + 1") |
| 35 | # → "( ) + : ID ID ID NUM def return" |
| 36 | |
| 37 | Sorting makes the fingerprint order-independent: renaming all identifiers in |
| 38 | a function body does not change its structural fingerprint. |
| 39 | |
| 40 | Similarity metric |
| 41 | ----------------- |
| 42 | |
| 43 | :class:`CodePlugin` uses the **Tanimoto coefficient** (multiset Jaccard):: |
| 44 | |
| 45 | sim(A, B) = |min(A, B)| / |max(A, B)| |
| 46 | |
| 47 | where ``|min|`` is the sum of element-wise minimums and ``|max|`` the sum of |
| 48 | element-wise maximums across all distinct token types. |
| 49 | |
| 50 | Tanimoto on multisets rewards shared token *frequencies*, not just shared |
| 51 | types — two functions with three ``ID`` tokens each score higher than a |
| 52 | function with three ``ID`` tokens vs one with ten. |
| 53 | |
| 54 | Typical scores |
| 55 | ~~~~~~~~~~~~~~ |
| 56 | |
| 57 | =========================================== ======= |
| 58 | Structural relationship Score |
| 59 | =========================================== ======= |
| 60 | Identical source (or same keywords + ops) 1.00 |
| 61 | Same function, different identifier names 1.00 |
| 62 | Same import pattern, different module name 1.00 |
| 63 | Same function, one extra parameter ~0.70 |
| 64 | Same function, added docstring ~0.65 |
| 65 | Function body vs class skeleton ~0.17 |
| 66 | Completely unrelated <0.20 |
| 67 | =========================================== ======= |
| 68 | |
| 69 | Fallback tokenizer |
| 70 | ------------------ |
| 71 | |
| 72 | If the Python :mod:`tokenize` module raises :exc:`tokenize.TokenError` |
| 73 | (malformed or non-Python source), :func:`code_fingerprint` falls back to a |
| 74 | simple regex-based tokenizer that applies the same normalization rules. This |
| 75 | handles SQL, YAML fragments, and other text-like conflict regions gracefully. |
| 76 | |
| 77 | Usage example |
| 78 | ------------- |
| 79 | |
| 80 | :: |
| 81 | |
| 82 | from muse.core.plugins.code_harmony import CodePlugin, code_fingerprint |
| 83 | |
| 84 | # When recording a conflict: |
| 85 | fp = code_fingerprint(conflict_region_source) |
| 86 | # → pass as --semantic-fingerprint to muse harmony record |
| 87 | |
| 88 | # When running the engine with code-aware similarity: |
| 89 | result = resolve(root, pattern, plugin=CodePlugin()) |
| 90 | |
| 91 | Thread safety |
| 92 | ------------- |
| 93 | |
| 94 | Both :func:`code_fingerprint` and :class:`CodePlugin` are stateless — safe |
| 95 | for concurrent use without locking. |
| 96 | """ |
| 97 | |
| 98 | import io |
| 99 | import keyword |
| 100 | import re |
| 101 | import tokenize |
| 102 | from collections import Counter |
| 103 | |
| 104 | # --------------------------------------------------------------------------- |
| 105 | # Size guard — prevent OOM on adversarial inputs |
| 106 | # --------------------------------------------------------------------------- |
| 107 | |
| 108 | #: Maximum source bytes processed by :func:`code_fingerprint`. |
| 109 | #: Input beyond this limit is silently truncated before tokenization. |
| 110 | _MAX_SOURCE_BYTES: int = 524_288 # 512 KiB |
| 111 | |
| 112 | # --------------------------------------------------------------------------- |
| 113 | # Regex-based fallback tokenizer |
| 114 | # --------------------------------------------------------------------------- |
| 115 | |
| 116 | #: Matches one token in a "code-like" string: comment, string, number, |
| 117 | #: identifier, or a single non-whitespace character (operator/punctuation). |
| 118 | _TOKEN_RE: re.Pattern[str] = re.compile( |
| 119 | r"#[^\n]*" # Python/Ruby/Shell line comment → skip |
| 120 | r'|//[^\n]*' # C/JS/Go line comment → skip |
| 121 | r'|"(?:[^"\\]|\\.)*"' # double-quoted string |
| 122 | r"|'(?:[^'\\]|\\.)*'" # single-quoted string |
| 123 | r"|[0-9]+(?:\.[0-9]*)?" # integer or decimal number |
| 124 | r"|[A-Za-z_\u0080-\uFFFF][A-Za-z0-9_\u0080-\uFFFF]*" # identifier (incl. unicode) |
| 125 | r"|[^\s]", # single non-whitespace (operator/punctuation) |
| 126 | re.UNICODE, |
| 127 | ) |
| 128 | |
| 129 | def _tokenize_fallback(source: str) -> list[str]: |
| 130 | """Regex-based tokenizer for non-Python or malformed source.""" |
| 131 | tokens: list[str] = [] |
| 132 | for m in _TOKEN_RE.finditer(source): |
| 133 | tok = m.group() |
| 134 | |
| 135 | # Skip comments |
| 136 | if tok.startswith("#") or tok.startswith("//"): |
| 137 | continue |
| 138 | |
| 139 | # String literals |
| 140 | if tok.startswith(('"', "'")): |
| 141 | tokens.append("STR") |
| 142 | continue |
| 143 | |
| 144 | # Numbers |
| 145 | if tok[0].isdigit(): |
| 146 | tokens.append("NUM") |
| 147 | continue |
| 148 | |
| 149 | # Identifiers and keywords |
| 150 | if tok[0].isalpha() or tok[0] == "_" or ord(tok[0]) > 127: |
| 151 | tokens.append(tok if keyword.iskeyword(tok) else "ID") |
| 152 | continue |
| 153 | |
| 154 | # Operators and punctuation (single chars and multi-char via OP) |
| 155 | tokens.append(tok) |
| 156 | |
| 157 | return tokens |
| 158 | |
| 159 | # --------------------------------------------------------------------------- |
| 160 | # Python tokenizer |
| 161 | # --------------------------------------------------------------------------- |
| 162 | |
| 163 | #: Token types to silently drop during Python tokenization. |
| 164 | _SKIP_TYPES: frozenset[int] = frozenset({ |
| 165 | tokenize.COMMENT, |
| 166 | tokenize.NEWLINE, |
| 167 | tokenize.NL, |
| 168 | tokenize.INDENT, |
| 169 | tokenize.DEDENT, |
| 170 | tokenize.ENCODING, |
| 171 | tokenize.ENDMARKER, |
| 172 | }) |
| 173 | |
| 174 | def _tokenize_python(source: str) -> list[str]: |
| 175 | """Tokenize Python source using :mod:`tokenize`, normalizing tokens. |
| 176 | |
| 177 | Falls back to :func:`_tokenize_fallback` on :exc:`tokenize.TokenError`. |
| 178 | """ |
| 179 | # Strip null bytes — they cause tokenize to raise immediately |
| 180 | source = source.replace("\x00", "") |
| 181 | |
| 182 | tokens: list[str] = [] |
| 183 | try: |
| 184 | stream = io.StringIO(source) |
| 185 | for tok_type, tok_string, _, _, _ in tokenize.generate_tokens(stream.readline): |
| 186 | if tok_type in _SKIP_TYPES: |
| 187 | continue |
| 188 | if tok_type == tokenize.NAME: |
| 189 | tokens.append(tok_string if keyword.iskeyword(tok_string) else "ID") |
| 190 | elif tok_type == tokenize.STRING: |
| 191 | tokens.append("STR") |
| 192 | elif tok_type == tokenize.NUMBER: |
| 193 | tokens.append("NUM") |
| 194 | elif tok_type == tokenize.OP: |
| 195 | tokens.append(tok_string) |
| 196 | # OP covers all operators and punctuation; other types dropped |
| 197 | except tokenize.TokenError: |
| 198 | tokens = _tokenize_fallback(source) |
| 199 | |
| 200 | return tokens |
| 201 | |
| 202 | # --------------------------------------------------------------------------- |
| 203 | # Public API |
| 204 | # --------------------------------------------------------------------------- |
| 205 | |
| 206 | def code_fingerprint(source: str) -> str: |
| 207 | """Return a normalized token-bag fingerprint for a code snippet. |
| 208 | |
| 209 | The fingerprint is a **sorted** space-separated string of normalized |
| 210 | tokens (see module docstring for normalization rules). Sorting makes it |
| 211 | order-independent: renaming identifiers does not change the fingerprint. |
| 212 | |
| 213 | Input longer than :data:`_MAX_SOURCE_BYTES` is silently truncated. |
| 214 | Malformed Python falls back to a regex-based tokenizer. |
| 215 | |
| 216 | Args: |
| 217 | source: Raw source code of the conflict region. |
| 218 | |
| 219 | Returns: |
| 220 | Sorted space-separated normalized token string, or ``""`` for empty / |
| 221 | whitespace-only / comment-only input. |
| 222 | |
| 223 | Example:: |
| 224 | |
| 225 | code_fingerprint("def foo(x): return x + 1") |
| 226 | # → "( ) + : ID ID ID NUM def return" |
| 227 | """ |
| 228 | if not source or not source.strip(): |
| 229 | return "" |
| 230 | |
| 231 | # Guard against oversized input |
| 232 | if len(source.encode()) > _MAX_SOURCE_BYTES: |
| 233 | source = source.encode()[:_MAX_SOURCE_BYTES].decode(errors="replace") |
| 234 | |
| 235 | tokens = _tokenize_python(source) |
| 236 | return " ".join(sorted(tokens)) |
| 237 | |
| 238 | class CodePlugin: |
| 239 | """HarmonyPlugin for code-domain conflict similarity. |
| 240 | |
| 241 | Implements the :class:`~muse.core.harmony.engine.HarmonyPlugin` structural |
| 242 | protocol — no explicit inheritance required. |
| 243 | |
| 244 | Computes Tanimoto similarity on the token multisets encoded in two |
| 245 | :func:`code_fingerprint` strings. Returns values in ``[0.0, 1.0]``. |
| 246 | |
| 247 | Typical usage:: |
| 248 | |
| 249 | from muse.core.plugins.code_harmony import CodePlugin |
| 250 | from muse.core.harmony.engine import resolve, EngineConfig |
| 251 | |
| 252 | result = resolve(root, pattern, plugin=CodePlugin()) |
| 253 | """ |
| 254 | |
| 255 | def similarity(self, fp_a: str, fp_b: str) -> float: |
| 256 | """Tanimoto coefficient on token multisets. |
| 257 | |
| 258 | Args: |
| 259 | fp_a: Fingerprint string from :func:`code_fingerprint`. |
| 260 | fp_b: Fingerprint string from :func:`code_fingerprint`. |
| 261 | |
| 262 | Returns: |
| 263 | Float in ``[0.0, 1.0]``. ``1.0`` for identical token bags; |
| 264 | ``0.0`` when one fingerprint is empty and the other is not. |
| 265 | """ |
| 266 | if not fp_a and not fp_b: |
| 267 | return 1.0 |
| 268 | if not fp_a or not fp_b: |
| 269 | return 0.0 |
| 270 | |
| 271 | ca = Counter(fp_a.split()) |
| 272 | cb = Counter(fp_b.split()) |
| 273 | |
| 274 | all_tokens = set(ca) | set(cb) |
| 275 | intersection = sum(min(ca[t], cb[t]) for t in all_tokens) |
| 276 | union = sum(max(ca[t], cb[t]) for t in all_tokens) |
| 277 | |
| 278 | return intersection / union if union else 1.0 |
File History
1 commit
sha256:e6465e8a9b7fa8e6223ed4a3576e96c568c913ae2caeb9c31f15e7a81b250b40
docs: add | jq convention to --json section of agent-guide
Sonnet 4.6
1 day ago