"""``muse annotate`` — attach CRDT-backed metadata to an existing commit. Annotations use real CRDT semantics so that multiple agents can annotate the same commit concurrently without conflicts: - ``--reviewed-by NAME`` merges into ``reviewed_by`` using **ORSet** semantics (set union — once added, a reviewer is never lost by concurrent additions). - ``--remove-reviewer NAME`` removes a reviewer (not CRDT-safe across concurrent removals, but useful for correcting mistakes). - ``--test-run`` increments ``test_runs`` using **GCounter** semantics (monotonically increasing counter). - ``--label LABEL`` merges into ``labels`` using **ORSet** semantics. - ``--remove-label LABEL`` removes a label. - ``--status STATUS`` sets ``status`` using **LWW-Register** semantics (last write wins). Must be one of: ``pending``, ``approved``, ``rejected``, ``needs-review``, ``wip``. Pass an empty string ``""`` to clear. - ``--note TEXT`` appends to ``notes`` (append-only, no dedup). - ``--score FLOAT`` sets ``score`` using **LWW-Register** semantics (last write wins). Must be in [0.0, 1.0]. These annotations are persisted directly in the commit record on disk. Commit reference ---------------- The *commit* argument accepts any reference that ``resolve_commit_ref`` understands: - ``HEAD`` or omitted — the most recent commit on the current branch. - ``HEAD~N`` — *N* first-parent steps back from HEAD. - A full 64-character hex commit ID. - A short hex prefix (e.g. ``abc1234``) — resolved by prefix scan. Security model -------------- - All user-controlled string values are validated before storage: control characters, ANSI escapes, and oversized values are rejected. - All values are sanitized via ``sanitize_display()`` before appearing in human-readable terminal output. - All error messages go to **stderr**; **stdout** carries only data. - Commit references are resolved through the safe-prefix scan in ``resolve_commit_ref`` — glob metacharacters cannot escape the scan. Agent UX -------- Pass ``--json`` to receive a machine-readable object on stdout. Usage:: muse annotate # show HEAD annotations muse annotate abc1234 # show commit annotations muse annotate abc1234 --reviewed-by agent-x muse annotate abc1234 --reviewed-by alice --reviewed-by bob muse annotate abc1234 --reviewed-by 'alice,claude-v4' muse annotate abc1234 --remove-reviewer agent-old muse annotate abc1234 --test-run muse annotate abc1234 --label hotfix --label perf muse annotate abc1234 --remove-label hotfix muse annotate abc1234 --status approved muse annotate abc1234 --note "Checked edge cases in production." muse annotate abc1234 --score 0.95 muse annotate abc1234 --dry-run --reviewed-by agent-x muse annotate abc1234 --json JSON schema (``--json``):: { "commit_id": "", "parent_commit_id": "", "snapshot_id": "", "message": "", "branch": "", "author": "", "agent_id": "", "model_id": "", "committed_at": "", "reviewed_by": ["alice", "bob"], "test_runs": 3, "labels": ["hotfix", "perf"], "status": "approved", "notes": ["Checked edge cases in production."], "score": 0.95, "changed": true, "dry_run": false } Exit codes ---------- - 0 — success (show or mutation applied) - 1 — user error (bad input, commit not found, invalid args) - 2 — not inside a Muse repository """ import argparse import json import logging import sys from typing import TypedDict import pathlib from muse.core.envelope import EnvelopeJson, make_envelope from muse.core.errors import ExitCode from muse.core.repo import require_repo from muse.core.timing import start_timer from muse.core.refs import read_current_branch from muse.core.commits import ( CommitRecord, overwrite_commit, resolve_commit_ref, ) from muse.core.validation import sanitize_display, sanitize_provenance logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Validation constants # --------------------------------------------------------------------------- _MAX_REVIEWER_LEN: int = 200 _MAX_LABEL_LEN: int = 100 _MAX_NOTE_LEN: int = 4000 _STATUS_VALUES: frozenset[str] = frozenset( {"", "pending", "approved", "rejected", "needs-review", "wip"} ) # --------------------------------------------------------------------------- # JSON TypedDict — stable machine-readable output schema # --------------------------------------------------------------------------- class _AnnotatePayload(TypedDict): """Domain-only fields built by ``_record_to_json``.""" commit_id: str parent_commit_id: str | None snapshot_id: str message: str branch: str author: str agent_id: str model_id: str committed_at: str reviewed_by: list[str] test_runs: int labels: list[str] status: str notes: list[str] score: float | None changed: bool dry_run: bool class _AnnotateJson(_AnnotatePayload, EnvelopeJson): """Full wire shape for ``muse annotate --json``.""" # --------------------------------------------------------------------------- # Sentinel for "not provided" (distinct from None) # --------------------------------------------------------------------------- class _Unset: """Singleton sentinel — distinguishes 'not provided' from ``None``.""" _instance: "_Unset | None" = None def __new__(cls) -> "_Unset": if cls._instance is None: cls._instance = super().__new__(cls) return cls._instance _UNSET = _Unset() # --------------------------------------------------------------------------- # Validators # --------------------------------------------------------------------------- def _validate_name(name: str, *, field: str, max_len: int) -> str: """Return *name* unchanged if valid; raise SystemExit on bad input.""" if not name: print(f"❌ {field} must not be empty.", file=sys.stderr) raise SystemExit(ExitCode.USER_ERROR.value) if len(name) > max_len: print( f"❌ {field} too long ({len(name)} chars, max {max_len}).", file=sys.stderr, ) raise SystemExit(ExitCode.USER_ERROR.value) sanitised = sanitize_provenance(name) if sanitised != name: print( f"❌ {field} contains control characters: {name!r}", file=sys.stderr, ) raise SystemExit(ExitCode.USER_ERROR.value) return name def _validate_reviewer(name: str) -> str: return _validate_name(name, field="reviewer name", max_len=_MAX_REVIEWER_LEN) def _validate_label(label: str) -> str: return _validate_name(label, field="label", max_len=_MAX_LABEL_LEN) def _validate_status(value: str) -> str: """Return *value* if it is a recognised status string.""" if value not in _STATUS_VALUES: valid = ", ".join(sorted(s for s in _STATUS_VALUES if s)) print( f"❌ unknown status {value!r}. Valid values: {valid}", file=sys.stderr, ) raise SystemExit(ExitCode.USER_ERROR.value) return value def _validate_score(raw: str) -> float: """Parse *raw* as a float in [0.0, 1.0].""" try: value = float(raw) except ValueError: print(f"❌ score must be a number, got {raw!r}.", file=sys.stderr) raise SystemExit(ExitCode.USER_ERROR.value) if not (0.0 <= value <= 1.0): print( f"❌ score must be in [0.0, 1.0], got {value}.", file=sys.stderr, ) raise SystemExit(ExitCode.USER_ERROR.value) return value def _validate_note(text: str) -> str: """Validate a free-text note.""" if not text or not text.strip(): print("❌ note must not be empty.", file=sys.stderr) raise SystemExit(ExitCode.USER_ERROR.value) if len(text) > _MAX_NOTE_LEN: print( f"❌ note too long ({len(text)} chars, max {_MAX_NOTE_LEN}).", file=sys.stderr, ) raise SystemExit(ExitCode.USER_ERROR.value) return text # --------------------------------------------------------------------------- # Comma-separated list parsers # --------------------------------------------------------------------------- def _parse_name_list( raw_values: list[str], *, validator: "callable[[str], str]", ) -> list[str]: """Expand comma-separated lists and validate each name. Accepts both ``--flag alice --flag bob`` and ``--flag 'alice,bob'``. Whitespace is stripped; duplicates are removed (insertion order preserved). """ seen: set[str] = set() result: list[str] = [] for raw in raw_values: for part in raw.split(","): name = part.strip() if not name: continue validator(name) if name not in seen: seen.add(name) result.append(name) return result # --------------------------------------------------------------------------- # Reviewer list helper # --------------------------------------------------------------------------- def _parse_reviewer_list(raw_values: list[str]) -> list[str]: """Expand and validate a list of reviewer name tokens. Convenience wrapper over ``_parse_name_list`` with ``_validate_reviewer``. """ return _parse_name_list(raw_values, validator=_validate_reviewer) # --------------------------------------------------------------------------- # JSON builder # --------------------------------------------------------------------------- def _commit_to_json( record: CommitRecord, *, changed: bool, dry_run: bool, ) -> "_AnnotatePayload": """Build the annotation JSON payload from a ``CommitRecord``. Convenience wrapper over ``_record_to_json`` with no overrides. """ return _record_to_json(record, changed=changed, dry_run=dry_run) def _record_to_json( record: CommitRecord, *, override_reviewed_by: list[str] | None = None, override_test_runs: int | None = None, override_labels: list[str] | None = None, override_status: str | None = None, override_notes: list[str] | None = None, override_score: "float | None | _Unset" = None, use_score_override: bool = False, changed: bool, dry_run: bool, ) -> _AnnotatePayload: """Build the domain JSON payload from a ``CommitRecord``, with optional overrides.""" final_score = ( override_score # type: ignore[assignment] if use_score_override else record.score ) return _AnnotatePayload( commit_id=record.commit_id, parent_commit_id=record.parent_commit_id, snapshot_id=record.snapshot_id, message=record.message, branch=record.branch, author=record.author, agent_id=record.agent_id, model_id=record.model_id, committed_at=record.committed_at.isoformat(), reviewed_by=( override_reviewed_by if override_reviewed_by is not None else list(record.reviewed_by) ), test_runs=( override_test_runs if override_test_runs is not None else record.test_runs ), labels=( override_labels if override_labels is not None else list(record.labels) ), status=( override_status if override_status is not None else record.status ), notes=( override_notes if override_notes is not None else list(record.notes) ), score=final_score, changed=changed, dry_run=dry_run, ) # --------------------------------------------------------------------------- # Human-readable display # --------------------------------------------------------------------------- def _show(record: CommitRecord, *, output_json: bool, envelope: EnvelopeJson | None = None) -> None: """Display the current annotations for *record*.""" if output_json: payload = _record_to_json(record, changed=False, dry_run=False) print(json.dumps(_AnnotateJson(**(envelope or {}), **payload))) return cid = sanitize_display(record.commit_id) print(f"ℹ️ commit {cid}") if record.reviewed_by: reviewers = ", ".join(sanitize_display(r) for r in sorted(record.reviewed_by)) print(f" reviewed-by: {reviewers}") else: print(" reviewed-by: (none)") print(f" test-runs: {record.test_runs}") if record.labels: labels = ", ".join(sanitize_display(lbl) for lbl in sorted(record.labels)) print(f" labels: {labels}") else: print(" labels: (none)") print(f" status: {sanitize_display(record.status) or '(unset)'}") if record.notes: print(f" notes: {len(record.notes)} note(s)") for i, note in enumerate(record.notes, 1): print(f" [{i}] {sanitize_display(note)}") else: print(" notes: (none)") score_str = f"{record.score:.4f}" if record.score is not None else "(unset)" print(f" score: {score_str}") # --------------------------------------------------------------------------- # Command registration # --------------------------------------------------------------------------- def register( subparsers: "argparse._SubParsersAction[argparse.ArgumentParser]", ) -> None: """Register the ``annotate`` subcommand.""" parser = subparsers.add_parser( "annotate", help="Attach CRDT-backed annotations to an existing commit.", description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter, ) parser.add_argument( "commit_arg", nargs="?", default=None, metavar="COMMIT", help="Commit to annotate: full SHA, short prefix, HEAD~N, or omit for HEAD.", ) # --- reviewer flags --- parser.add_argument( "--reviewed-by", action="append", dest="reviewed_by", default=None, metavar="NAME[,NAME…]", help=( "Add a reviewer (ORSet semantics). " "Accepts comma-separated names or multiple flags." ), ) parser.add_argument( "--remove-reviewer", action="append", dest="remove_reviewer", default=None, metavar="NAME[,NAME…]", help="Remove a reviewer. Comma-separated or multiple flags.", ) # --- test-run counter --- parser.add_argument( "--test-run", action="store_true", dest="test_run", help="Increment the GCounter test-run count for this commit.", ) # --- label flags --- parser.add_argument( "--label", action="append", dest="label", default=None, metavar="LABEL[,LABEL…]", help=( "Add a label (ORSet semantics). " "Accepts comma-separated labels or multiple flags." ), ) parser.add_argument( "--remove-label", action="append", dest="remove_label", default=None, metavar="LABEL[,LABEL…]", help="Remove a label. Comma-separated or multiple flags.", ) # --- status (LWW) --- parser.add_argument( "--status", dest="status", default=None, metavar="STATUS", help=( "Set commit status (LWW). " f"Valid values: {', '.join(sorted(s for s in _STATUS_VALUES if s))}. " "Pass empty string to clear." ), ) # --- note (append-only) --- parser.add_argument( "--note", action="append", dest="notes", default=None, metavar="TEXT", help="Append a free-text note (append-only, no dedup).", ) # --- score (LWW float) --- parser.add_argument( "--score", dest="score_raw", default=None, metavar="FLOAT", help="Set a quality score in [0.0, 1.0] (LWW semantics).", ) # --- shared flags --- parser.add_argument( "--dry-run", "-n", action="store_true", dest="dry_run", help="Show what would change without writing to disk.", ) parser.add_argument( "--json", "-j", action="store_true", dest="json_out", help="Emit a JSON object to stdout.", ) parser.set_defaults(func=run) # --------------------------------------------------------------------------- # Command handler # --------------------------------------------------------------------------- def run(args: argparse.Namespace) -> None: """Attach CRDT-backed annotations to an existing commit. Adds or removes reviewers, labels, status, notes, and quality scores on any commit without rewriting history. Annotations are stored as CRDT deltas and merged automatically across branches. Agent quickstart ---------------- :: muse annotate --reviewed-by alice --label approved --json muse annotate --status reviewed --score 0.95 --json muse annotate --remove-label wip --json JSON fields ----------- commit_id Commit that was annotated. reviewers Full list of reviewers after the update. labels Full list of labels after the update. status Current status string, or ``null``. score Quality score float, or ``null``. notes List of note strings. dry_run ``true`` when ``--dry-run`` was passed (no writes occurred). Exit codes ---------- 0 Annotated successfully. 1 Validation error or commit not found. 2 Not inside a Muse repository. """ elapsed = start_timer() commit_arg: str | None = args.commit_arg reviewed_by_raw: list[str] | None = args.reviewed_by remove_reviewer_raw: list[str] | None = args.remove_reviewer test_run: bool = args.test_run label_raw: list[str] | None = args.label remove_label_raw: list[str] | None = args.remove_label status_raw: str | None = args.status notes_raw: list[str] | None = args.notes score_raw: str | None = args.score_raw dry_run: bool = args.dry_run json_out: bool = args.json_out root = require_repo() branch = read_current_branch(root) record = resolve_commit_ref(root, branch, commit_arg) if record is None: ref_display = sanitize_display(commit_arg or "HEAD") print(f"❌ commit {ref_display!r} not found.", file=sys.stderr) raise SystemExit(ExitCode.NOT_FOUND.value) # Validate all inputs before touching any state. add_reviewers = _parse_name_list(reviewed_by_raw or [], validator=_validate_reviewer) remove_reviewers = _parse_name_list(remove_reviewer_raw or [], validator=_validate_reviewer) add_labels = _parse_name_list(label_raw or [], validator=_validate_label) remove_labels = _parse_name_list(remove_label_raw or [], validator=_validate_label) new_status: str | None = _validate_status(status_raw) if status_raw is not None else None new_notes: list[str] = [_validate_note(n) for n in (notes_raw or [])] score_provided = score_raw is not None new_score: float | None = _validate_score(score_raw) if score_provided else None is_show_mode = ( not add_reviewers and not remove_reviewers and not test_run and not add_labels and not remove_labels and new_status is None and not new_notes and not score_provided ) if is_show_mode: _show(record, output_json=json_out, envelope=make_envelope(elapsed)) return # --- compute new state --- # reviewed_by: ORSet reviewer_set: set[str] = set(record.reviewed_by) added_reviewers: list[str] = [] removed_reviewers: list[str] = [] for name in add_reviewers: if name not in reviewer_set: reviewer_set.add(name) added_reviewers.append(name) for name in remove_reviewers: if name in reviewer_set: reviewer_set.discard(name) removed_reviewers.append(name) final_reviewed_by = sorted(reviewer_set) # test_runs: GCounter final_test_runs = record.test_runs + (1 if test_run else 0) # labels: ORSet label_set: set[str] = set(record.labels) added_labels: list[str] = [] removed_labels: list[str] = [] for lbl in add_labels: if lbl not in label_set: label_set.add(lbl) added_labels.append(lbl) for lbl in remove_labels: if lbl in label_set: label_set.discard(lbl) removed_labels.append(lbl) final_labels = sorted(label_set) # status: LWW final_status = new_status if new_status is not None else record.status # notes: append-only final_notes = list(record.notes) + new_notes # score: LWW final_score: float | None = new_score if score_provided else record.score changed = ( final_reviewed_by != sorted(record.reviewed_by) or final_test_runs != record.test_runs or final_labels != sorted(record.labels) or final_status != record.status or final_notes != record.notes or final_score != record.score ) if not dry_run and changed: record.reviewed_by = final_reviewed_by record.test_runs = final_test_runs record.labels = final_labels record.status = final_status record.notes = final_notes record.score = final_score overwrite_commit(root, record) if json_out: payload = _record_to_json( record, override_reviewed_by=final_reviewed_by, override_test_runs=final_test_runs, override_labels=final_labels, override_status=final_status, override_notes=final_notes, override_score=final_score, use_score_override=True, changed=changed, dry_run=dry_run, ) print(json.dumps(_AnnotateJson(**make_envelope(elapsed), **payload))) return # Human-readable output. cid = sanitize_display(record.commit_id) prefix = "[dry-run] " if dry_run else "" for name in added_reviewers: print(f"✅ {prefix}Added reviewer: {sanitize_display(name)}") for name in removed_reviewers: print(f"✅ {prefix}Removed reviewer: {sanitize_display(name)}") removed_reviewer_set = set(removed_reviewers) for name in remove_reviewers: if name not in removed_reviewer_set: print( f"⚠️ reviewer {sanitize_display(name)!r} was not present.", file=sys.stderr, ) if test_run: print(f"✅ {prefix}Test run recorded (total: {final_test_runs})") for lbl in added_labels: print(f"✅ {prefix}Added label: {sanitize_display(lbl)}") for lbl in removed_labels: print(f"✅ {prefix}Removed label: {sanitize_display(lbl)}") removed_label_set = set(removed_labels) for lbl in remove_labels: if lbl not in removed_label_set: print( f"⚠️ label {sanitize_display(lbl)!r} was not present.", file=sys.stderr, ) if new_status is not None: status_display = sanitize_display(new_status) or "(cleared)" print(f"✅ {prefix}Status set to: {status_display}") for note in new_notes: preview = sanitize_display(note[:60]) print(f"✅ {prefix}Note appended: {preview!r}") if score_provided: print(f"✅ {prefix}Score set to: {final_score:.4f}") if changed: print(f"[{cid}] annotation {'would be ' if dry_run else ''}updated") else: print(f"[{cid}] no changes (annotations already up to date)")