gabriel / muse public
check_attr.py python
502 lines 16.2 KB
Raw
sha256:06dba78c2a78e251b580422dd1fd547f3c8357ff18f7709a860873b2d24dbbbf chore: bump version to 0.2.0rc14 Sonnet 4.6 patch 1 day ago
1 """muse check-attr — query merge-strategy attributes for paths.
2
3 Reads ``.museattributes``, resolves the applicable rules for each supplied
4 path, and reports the strategy that would be applied per dimension. Useful
5 for verifying that attribute rules are wired up correctly before a merge, and
6 for scripting domain-aware merge drivers.
7
8 Output (JSON, default)::
9
10 {
11 "domain": "midi",
12 "rules_loaded": 3,
13 "dimension": "*",
14 "summary": {
15 "total": 2,
16 "matched": 1,
17 "unmatched": 1,
18 "by_strategy": {"ours": 1, "auto": 1}
19 },
20 "results": [
21 {
22 "path": "tracks/drums.mid",
23 "dimension": "*",
24 "strategy": "ours",
25 "rule": {
26 "path_pattern": "drums/*",
27 "dimension": "*",
28 "strategy": "ours",
29 "comment": "Drums always prefer ours.",
30 "priority": 10,
31 "source_index": 0
32 }
33 },
34 {
35 "path": "tracks/melody.mid",
36 "dimension": "*",
37 "strategy": "auto",
38 "rule": null
39 }
40 ],
41 "duration_ms": 0.000123,
42 "exit_code": 0
43 }
44
45 Text output (``--format text``)::
46
47 tracks/drums.mid dimension=* strategy=ours (rule 0: drums/*)
48 tracks/melody.mid dimension=* strategy=auto (no matching rule)
49
50 With ``--rules-only`` (emit the loaded rule list without testing paths)::
51
52 {
53 "domain": "midi",
54 "rules_loaded": 3,
55 "dimension": "*",
56 "rules": [
57 {"path_pattern": "drums/*", "dimension": "*", "strategy": "ours", ...}
58 ],
59 "duration_ms": 0.000042,
60 "exit_code": 0
61 }
62
63 Output contract
64 ---------------
65
66 - Exit 0: attributes resolved and emitted (even when no rules match).
67 - Exit 1: bad ``--format`` value; missing path arguments when not using
68 ``--rules-only`` or ``--stdin``; ``--unmatched-only`` combined
69 with ``--rules-only``.
70 - Exit 3: I/O or TOML parse error reading ``.museattributes``.
71
72 JSON fields present in every successful response
73 ------------------------------------------------
74
75 ``duration_ms``
76 Wall-clock time in seconds from argument parsing to output. Useful for
77 agents monitoring attribute resolution latency as the rule set grows.
78 ``exit_code``
79 Always ``0`` on success. Lets agents parse a single JSON payload instead
80 of inspecting the process exit code separately.
81 ``summary`` *(default mode only)*
82 ``total`` — number of results emitted (after ``--unmatched-only`` filter).
83 ``matched`` — paths where a rule fired (strategy ≠ ``"auto"``).
84 ``unmatched`` — paths that fell through to the default ``"auto"`` strategy.
85 ``by_strategy`` — ``{strategy: count}`` map for the emitted results.
86
87 Strategies
88 ----------
89
90 ``ours``
91 Conflict resolution keeps the current-branch version.
92 ``theirs``
93 Conflict resolution keeps the incoming-branch version.
94 ``union``
95 Conflict resolution takes the union of both sides (additive, e.g. note sets).
96 ``base``
97 Conflict resolution falls back to the common ancestor.
98 ``auto``
99 No rule matched; the merge engine selects the best strategy automatically.
100 ``manual``
101 Conflict must be resolved manually; merge engine halts and surfaces the
102 conflict for human or agent inspection.
103
104 Agent use
105 ---------
106
107 Inspect active rules before a merge::
108
109 muse check-attr --rules-only --json
110
111 Pipe paths from staging area::
112
113 muse check-attr --stdin < staged_paths.txt
114
115 Query a specific dimension across many files::
116
117 muse check-attr tracks/drums.mid tracks/melody.mid --dimension notes
118
119 Discover all rules that would fire for a path::
120
121 muse check-attr tracks/drums.mid --all-rules --json
122
123 Find paths with no rule coverage (attribute gap analysis)::
124
125 muse check-attr --unmatched-only --stdin < staged_paths.txt --json
126
127 Check coverage summary without inspecting individual results::
128
129 muse check-attr foo.py bar.py baz.py --json | python3 -c \\
130 "import sys,json; s=json.load(sys.stdin)['summary']; print(s)"
131 """
132
133 import argparse
134 import fnmatch
135 import json
136 import logging
137 import sys
138 from typing import TypedDict
139
140 from muse.core.attributes import AttributeRule, load_attributes
141 from muse.core.errors import ExitCode
142 from muse.core.repo import require_repo
143 from muse.core.validation import sanitize_display, validate_workspace_path
144 from muse.core.timing import start_timer
145 from muse.core.envelope import EnvelopeJson, make_envelope
146 from muse.plugins.registry import read_domain
147
148 type _PerPath = dict[str, list["_RuleDict"]]
149 logger = logging.getLogger(__name__)
150
151 class _RuleDict(TypedDict):
152 path_pattern: str
153 dimension: str
154 strategy: str
155 comment: str
156 priority: int
157 source_index: int
158
159 class _PathResult(TypedDict):
160 path: str
161 dimension: str
162 strategy: str
163 rule: _RuleDict | None
164
165 class _SummaryDict(TypedDict):
166 total: int
167 matched: int
168 unmatched: int
169 by_strategy: dict[str, int]
170
171 class _RulesListJson(EnvelopeJson):
172 """Wire shape for --rules-only --json."""
173
174 domain: str
175 rules_loaded: int
176 dimension: str
177 rules: list[_RuleDict]
178
179 class _AllRulesJson(EnvelopeJson):
180 """Wire shape for --all-rules --json."""
181
182 domain: str
183 rules_loaded: int
184 dimension: str
185 results: list[dict]
186
187 class _CheckAttrJson(EnvelopeJson):
188 """Wire shape for default first-match --json output."""
189
190 domain: str
191 rules_loaded: int
192 dimension: str
193 summary: _SummaryDict
194 results: list[dict]
195
196 def _dim_match(rule: AttributeRule, dimension: str) -> bool:
197 """Return True when *rule* applies to *dimension*."""
198 return rule.dimension == "*" or rule.dimension == dimension or dimension == "*"
199
200 def _resolve_with_rule(
201 rules: list[AttributeRule],
202 path: str,
203 dimension: str,
204 ) -> tuple[str, AttributeRule | None]:
205 """Single-pass resolution: return ``(strategy, first_matching_rule)``.
206
207 Replaces the previous pattern of calling ``resolve_strategy`` then
208 ``_find_matching_rule`` separately, which iterated the rule list twice.
209
210 Returns ``("auto", None)`` when no rule matches.
211 """
212 for rule in rules:
213 if fnmatch.fnmatch(path, rule.path_pattern) and _dim_match(rule, dimension):
214 return rule.strategy, rule
215 return "auto", None
216
217 def _all_matching_rules(
218 rules: list[AttributeRule],
219 path: str,
220 dimension: str,
221 ) -> list[AttributeRule]:
222 """Return every rule that matches *path* and *dimension* (for ``--all-rules``)."""
223 return [
224 rule for rule in rules
225 if fnmatch.fnmatch(path, rule.path_pattern) and _dim_match(rule, dimension)
226 ]
227
228 def _rule_to_dict(rule: AttributeRule) -> _RuleDict:
229 return {
230 "path_pattern": rule.path_pattern,
231 "dimension": rule.dimension,
232 "strategy": rule.strategy,
233 "comment": rule.comment,
234 "priority": rule.priority,
235 "source_index": rule.source_index,
236 }
237
238 def register(subparsers: "argparse._SubParsersAction[argparse.ArgumentParser]") -> None:
239 """Register the check-attr subcommand."""
240 parser = subparsers.add_parser(
241 "check-attr",
242 help="Query merge-strategy attributes for workspace paths.",
243 description=__doc__,
244 formatter_class=argparse.RawDescriptionHelpFormatter,
245 )
246 parser.add_argument(
247 "paths",
248 nargs="*",
249 help=(
250 "Workspace-relative paths to check. "
251 "Required unless --stdin or --rules-only is used."
252 ),
253 )
254 parser.add_argument(
255 "--stdin",
256 action="store_true",
257 dest="from_stdin",
258 help=(
259 "Read additional paths from stdin, one per line. "
260 "Blank lines and '#'-comments are skipped. "
261 "Combines with positional path arguments."
262 ),
263 )
264 parser.add_argument(
265 "--dimension", "-d",
266 default="*",
267 dest="dimension",
268 metavar="DIMENSION",
269 help=(
270 "Domain dimension to query (e.g. 'notes', 'pitch_bend'). "
271 "Use '*' to match any dimension. (default: *)"
272 ),
273 )
274 parser.add_argument(
275 "--json", "-j", action="store_true", dest="json_out",
276 help="Emit machine-readable JSON.",
277 )
278 parser.add_argument(
279 "--all-rules", "-A",
280 action="store_true",
281 dest="all_rules",
282 help="For each path, list all matching rules (not just the first).",
283 )
284 parser.add_argument(
285 "--rules-only",
286 action="store_true",
287 dest="rules_only",
288 help=(
289 "Emit the full loaded rule list without testing any paths. "
290 "No path arguments needed. "
291 "Useful for agents inspecting attribute configuration before staging."
292 ),
293 )
294 parser.add_argument(
295 "--unmatched-only",
296 action="store_true",
297 dest="unmatched_only",
298 help=(
299 "Show only paths that have no matching rule (strategy=auto). "
300 "Useful for identifying gaps in attribute coverage before a merge."
301 ),
302 )
303 parser.set_defaults(func=run)
304
305 def run(args: argparse.Namespace) -> None:
306 """Query merge-strategy attributes for one or more paths.
307
308 Reads ``.museattributes`` from the repository root and reports the effective
309 merge strategy for each path in the requested dimension. Default mode
310 returns the first matching rule per path in O(N); ``--all-rules`` returns
311 every matching rule; ``--rules-only`` lists all loaded rules without
312 requiring path arguments.
313
314 Agent quickstart
315 ----------------
316 ::
317
318 muse check-attr tracks/drums.mid --json
319 muse check-attr --rules-only --json
320 muse check-attr foo.py bar.py --json
321
322 JSON fields
323 -----------
324 domain Repository domain (e.g. ``"code"``).
325 rules_loaded Total number of rules loaded from ``.museattributes``.
326 dimension Attribute dimension queried (e.g. ``"merge"``).
327 results List of per-path result objects, each with: ``path``,
328 ``dimension``, ``strategy``, ``rule`` (matched rule or
329 ``null``), ``matching_rules`` (with ``--all-rules``).
330 summary Summary counts: ``total``, ``matched``, ``unmatched``,
331 ``by_strategy`` (only in default mode, not ``--rules-only``).
332 rules List of all loaded rules (only with ``--rules-only``).
333
334 Exit codes
335 ----------
336 0 Query completed successfully.
337 1 Conflicting flags or path validation error.
338 2 Not inside a Muse repository.
339 3 Internal error reading ``.museattributes``.
340 """
341 elapsed = start_timer()
342 json_out: bool = args.json_out
343 cli_paths: list[str] = args.paths or []
344 from_stdin: bool = args.from_stdin
345 dimension: str = args.dimension
346 all_rules_mode: bool = args.all_rules
347 rules_only: bool = args.rules_only
348 unmatched_only: bool = getattr(args, "unmatched_only", False)
349
350 if unmatched_only and rules_only:
351 print(
352 json.dumps({"error": "--unmatched-only and --rules-only are mutually exclusive."}),
353 file=sys.stderr,
354 )
355 raise SystemExit(ExitCode.USER_ERROR)
356
357 root = require_repo()
358 domain = read_domain(root)
359
360 try:
361 rules = load_attributes(root, domain=domain)
362 except ValueError as exc:
363 print(json.dumps({"error": str(exc)}), file=sys.stderr)
364 raise SystemExit(ExitCode.INTERNAL_ERROR)
365
366 # --rules-only: emit loaded rule list without requiring path args.
367 if rules_only:
368 rule_dicts = [_rule_to_dict(r) for r in rules]
369 if not json_out:
370 if not rules:
371 print("(no rules)")
372 for rd in rule_dicts:
373 print(
374 f"{sanitize_display(rd['path_pattern'])} "
375 f"dimension={sanitize_display(rd['dimension'])} "
376 f"strategy={sanitize_display(rd['strategy'])}"
377 )
378 else:
379 print(json.dumps(_RulesListJson(
380 **make_envelope(elapsed),
381 domain=domain,
382 rules_loaded=len(rules),
383 dimension=dimension,
384 rules=rule_dicts,
385 )))
386 return
387
388 # Collect paths: positional args + optional stdin.
389 # Strip \r\n (not just \n) so CRLF-terminated input on Windows or from
390 # carriage-return-injecting agents does not embed \r in path strings.
391 all_paths: list[str] = list(cli_paths)
392 if from_stdin:
393 for line in sys.stdin:
394 stripped = line.rstrip("\r\n")
395 if stripped and not stripped.startswith("#"):
396 all_paths.append(stripped)
397
398 if not all_paths:
399 print(
400 json.dumps({"error": "At least one path argument is required."}),
401 file=sys.stderr,
402 )
403 raise SystemExit(ExitCode.USER_ERROR)
404
405 # Validate paths: reject traversal sequences, null bytes, absolute paths,
406 # control characters, and excessively long values.
407 for p in all_paths:
408 try:
409 validate_workspace_path(p)
410 except ValueError as exc:
411 print(
412 json.dumps({"error": f"Invalid path {p!r}: {exc}"}),
413 file=sys.stderr,
414 )
415 raise SystemExit(ExitCode.USER_ERROR)
416
417 # --all-rules: every rule that fires for each path.
418 if all_rules_mode:
419 per_path: _PerPath = {
420 path: [_rule_to_dict(r) for r in _all_matching_rules(rules, path, dimension)]
421 for path in all_paths
422 }
423
424 if not json_out:
425 for path, matched_rules in per_path.items():
426 if not matched_rules:
427 print(f"{sanitize_display(path)} (no matching rules)")
428 else:
429 for rd in matched_rules:
430 print(
431 f"{sanitize_display(path)} "
432 f"dimension={sanitize_display(rd['dimension'])} "
433 f"strategy={sanitize_display(rd['strategy'])} "
434 f"(rule {rd['source_index']}: "
435 f"{sanitize_display(rd['path_pattern'])})"
436 )
437 return
438
439 print(json.dumps(_AllRulesJson(
440 **make_envelope(elapsed),
441 domain=domain,
442 rules_loaded=len(rules),
443 dimension=dimension,
444 results=[
445 {"path": path, "matching_rules": per_path[path]}
446 for path in all_paths
447 ],
448 )))
449 return
450
451 # Default: first-match winner per path — single O(N) pass per path.
452 results: list[_PathResult] = []
453 for path in all_paths:
454 strategy, matched_rule = _resolve_with_rule(rules, path, dimension)
455 results.append({
456 "path": path,
457 "dimension": dimension,
458 "strategy": strategy,
459 "rule": _rule_to_dict(matched_rule) if matched_rule else None,
460 })
461
462 # --unmatched-only: retain only paths that fell through to auto.
463 if unmatched_only:
464 results = [r for r in results if r["strategy"] == "auto"]
465
466 # Build summary counts over the (possibly filtered) result set.
467 by_strategy: dict[str, int] = {}
468 for r in results:
469 by_strategy[r["strategy"]] = by_strategy.get(r["strategy"], 0) + 1
470 summary: _SummaryDict = {
471 "total": len(results),
472 "matched": sum(1 for r in results if r["strategy"] != "auto"),
473 "unmatched": sum(1 for r in results if r["strategy"] == "auto"),
474 "by_strategy": by_strategy,
475 }
476
477 if not json_out:
478 for res in results:
479 rule_entry = res["rule"]
480 if rule_entry is not None:
481 rule_info = (
482 f"(rule {rule_entry['source_index']}: "
483 f"{sanitize_display(rule_entry['path_pattern'])})"
484 )
485 else:
486 rule_info = "(no matching rule)"
487 print(
488 f"{sanitize_display(res['path'])} "
489 f"dimension={sanitize_display(res['dimension'])} "
490 f"strategy={sanitize_display(res['strategy'])} "
491 f"{rule_info}"
492 )
493 return
494
495 print(json.dumps(_CheckAttrJson(
496 **make_envelope(elapsed),
497 domain=domain,
498 rules_loaded=len(rules),
499 dimension=dimension,
500 summary=summary,
501 results=[dict(r) for r in results],
502 )))
File History 1 commit
sha256:06dba78c2a78e251b580422dd1fd547f3c8357ff18f7709a860873b2d24dbbbf chore: bump version to 0.2.0rc14 Sonnet 4.6 patch 1 day ago