"""Harmony policy persistence and matching — save, load, list, remove, match. Single responsibility: CRUD for Policy objects in the harmony store plus the _condition_matches / match_policy evaluation helpers. """ from __future__ import annotations import fnmatch import json import logging import pathlib from collections.abc import Mapping from muse.core.types import JsonValue, load_json_file from .fingerprint import _parse_dt from .paths import policies_dir, _validate_policy_id from .patterns import _write_atomic from .types import ( ConflictPattern, Policy, PolicyAction, PolicyCondition, PolicyScope, _PolicyConditionDict, _PolicyDict, ) logger = logging.getLogger(__name__) #: Maximum bytes read from a policy JSON file. _MAX_POLICY_BYTES: int = 8_192 # 8 KiB #: Maximum policies loaded in a single :func:`list_policies` call. _MAX_POLICIES: int = 1_000 # --------------------------------------------------------------------------- # Serialisation helpers # --------------------------------------------------------------------------- def _policy_condition_to_dict(cond: PolicyCondition) -> _PolicyConditionDict: return _PolicyConditionDict( conflict_type=cond.conflict_type, domain=cond.domain, path_pattern=cond.path_pattern, min_confidence=cond.min_confidence, ) def _policy_to_dict(policy: Policy) -> _PolicyDict: return _PolicyDict( policy_id=policy.policy_id, description=policy.description, when=_policy_condition_to_dict(policy.when), action=policy.action, confidence=policy.confidence, escalate_to=policy.escalate_to, delegate_to=policy.delegate_to, scope=policy.scope, created_at=policy.created_at.isoformat(), created_by=policy.created_by, ) def _dict_to_policy(data: Mapping[str, JsonValue]) -> Policy | None: """Deserialise a JSON dict to :class:`Policy`, returning ``None`` on error.""" try: when_data = data.get("when") or {} condition = PolicyCondition( conflict_type=when_data.get("conflict_type") or None, domain=when_data.get("domain") or None, path_pattern=when_data.get("path_pattern") or None, min_confidence=when_data.get("min_confidence"), ) return Policy( policy_id=str(data["policy_id"]), description=str(data.get("description", "")), when=condition, action=str(data.get("action", PolicyAction.ESCALATE)), confidence=float(data.get("confidence", 1.0)), escalate_to=data.get("escalate_to") or None, delegate_to=data.get("delegate_to") or None, scope=str(data.get("scope", PolicyScope.REPO)), created_at=_parse_dt(data.get("created_at")), created_by=str(data.get("created_by", "unknown")), ) except (KeyError, TypeError, ValueError) as exc: logger.warning("⚠️ harmony: failed to deserialise policy: %s", exc) return None # --------------------------------------------------------------------------- # Policy CRUD # --------------------------------------------------------------------------- def save_policy(root: pathlib.Path, policy: Policy) -> None: """Persist a :class:`Policy` to the harmony policy store. Stored at ``.muse/harmony/policies/.json``. **Overwrites** any existing policy with the same ID — policies are versioned by replacement, not by append. Args: root: Repository root. policy: Policy to save. Raises: ValueError: If ``policy.policy_id`` is not URL-safe. """ _validate_policy_id(policy.policy_id) dest = policies_dir(root) / f"{policy.policy_id}.json" _write_atomic(dest, json.dumps(_policy_to_dict(policy), indent=2)) logger.debug( "harmony: saved policy %r (action=%s, scope=%s)", policy.policy_id, policy.action, policy.scope, ) def load_policy(root: pathlib.Path, policy_id: str) -> Policy | None: """Load a single :class:`Policy` by ID. Returns ``None`` when *policy_id* is invalid, the file does not exist, it exceeds :data:`_MAX_POLICY_BYTES`, or JSON parsing fails. Args: root: Repository root. policy_id: URL-safe policy identifier. """ try: _validate_policy_id(policy_id) except ValueError: return None dest = policies_dir(root) / f"{policy_id}.json" if not dest.exists(): return None try: size = dest.stat().st_size except OSError as exc: logger.warning("⚠️ harmony: failed to read policy %r: %s", policy_id, exc) return None if size > _MAX_POLICY_BYTES: logger.warning( "⚠️ harmony: policy %r is %d bytes — too large; skipping", policy_id, size, ) return None data = load_json_file(dest) if data is None: logger.warning( "⚠️ harmony: failed to read policy %r: unreadable or invalid JSON", policy_id ) return None return _dict_to_policy(data) def list_policies(root: pathlib.Path) -> list[Policy]: """Return all :class:`Policy` entries from the harmony policy store. Sorted by scope order (workspace → repo → domain → file) then by ``created_at`` ascending within each scope, so earlier policies take precedence over later ones at the same scope level. Args: root: Repository root. """ _SCOPE_ORDER = { PolicyScope.WORKSPACE: 0, PolicyScope.REPO: 1, PolicyScope.DOMAIN: 2, PolicyScope.FILE: 3, } pdir = policies_dir(root) if not pdir.exists(): return [] results: list[Policy] = [] count = 0 for f in pdir.iterdir(): if count >= _MAX_POLICIES: logger.warning( "⚠️ harmony: >%d policies — scan truncated", _MAX_POLICIES ) break count += 1 if f.is_symlink() or not f.is_file(): continue if not f.name.endswith(".json"): continue pid = f.name[:-5] p = load_policy(root, pid) if p is not None: results.append(p) results.sort(key=lambda p: (_SCOPE_ORDER.get(p.scope, 99), p.created_at)) return results def remove_policy(root: pathlib.Path, policy_id: str) -> bool: """Remove a policy from the harmony store. Args: root: Repository root. policy_id: URL-safe policy identifier. Returns: ``True`` if the policy existed and was removed, ``False`` otherwise. """ try: _validate_policy_id(policy_id) except ValueError: logger.warning( "⚠️ harmony: invalid policy_id in remove_policy: %r", policy_id ) return False dest = policies_dir(root) / f"{policy_id}.json" if not dest.exists(): return False dest.unlink(missing_ok=True) logger.debug("harmony: removed policy %r", policy_id) return True # --------------------------------------------------------------------------- # Policy matching # --------------------------------------------------------------------------- def _condition_matches(condition: PolicyCondition, pattern: ConflictPattern) -> bool: """Return ``True`` if *pattern* satisfies every non-``None`` field of *condition*. ``None`` fields are wildcards and match any value. ``path_pattern`` uses :func:`fnmatch.fnmatch` glob semantics (e.g. ``"*.mid"``, ``"src/**"``). ``min_confidence`` is a proposal-time filter evaluated by the harmony engine against the resolution confidence — not against any static field of the pattern — so it is intentionally not checked here. """ if condition.conflict_type is not None: if pattern.conflict_type != condition.conflict_type: return False if condition.domain is not None: if pattern.domain != condition.domain: return False if condition.path_pattern is not None: if not fnmatch.fnmatch(pattern.path, condition.path_pattern): return False return True def match_policy( policies: list[Policy], pattern: ConflictPattern, ) -> Policy | None: """Return the first :class:`Policy` whose condition matches *pattern*. Evaluates policies in the order given. Callers should pass the list returned by :func:`list_policies`, which is already sorted by scope (workspace → repo → domain → file). Args: policies: Ordered list of active policies to evaluate. pattern: Incoming conflict pattern to match against. Returns: The first matching :class:`Policy`, or ``None`` if no policy fires. """ for policy in policies: if _condition_matches(policy.when, pattern): return policy return None