"""Tests for muse/core/harmony.py — Phase 1: Core data model. Coverage tiers -------------- I Unit — fingerprints, validation, namespaces, dataclasses, condition matching II Integration — all CRUD operations (patterns, resolutions, policies, audit, gc) III End-to-end — full conflict lifecycle from record → resolve → replay → gc IV Stress — 10 k pattern scan, concurrent writes under parallel threads V Data integrity— atomic writes (no temp files left), JSON round-trip, field types VI Security — path traversal, symlink guards, size caps, crafted IDs VII Performance — per-operation timing assertions """ from __future__ import annotations from collections.abc import Mapping import concurrent.futures import datetime from muse.core.paths import muse_dir from muse.core.types import fake_id import json import os import pathlib import tempfile import threading import time from dataclasses import FrozenInstanceError from typing import Any from unittest import mock import pytest import muse.core.harmony as h from muse.core.harmony import ( AgentProvenance, AuditEvent, AuditEventType, ConflictPattern, ConflictType, Policy, PolicyAction, PolicyCondition, PolicyScope, Resolution, ResolutionProposal, ResolutionStrategy, _MAX_AUDIT_BYTES, _MAX_PATTERN_BYTES, _MAX_POLICY_BYTES, _MAX_RESOLUTION_BYTES, _MAX_SCAN, _condition_matches, append_audit, best_resolution, blob_fingerprint, clear_all, compute_pattern_id, compute_resolution_id, forget_pattern, gc_stale, increment_applied_count, list_audit, list_patterns, list_policies, list_resolutions, load_pattern, load_policy, load_resolution, match_policy, record_pattern, remove_policy, save_policy, save_resolution, ) # --------------------------------------------------------------------------- # Shared fixtures # --------------------------------------------------------------------------- @pytest.fixture() def repo(tmp_path: pathlib.Path) -> pathlib.Path: """Return a temporary directory acting as a bare repo root.""" 
muse_dir(tmp_path).mkdir() return tmp_path def _utc_now() -> datetime.datetime: return datetime.datetime.now(datetime.timezone.utc) def _make_pattern( path: str = "track.mid", domain: str = "midi", conflict_type: str = ConflictType.CONTENT, ours: str = "ours", theirs: str = "theirs", description: Mapping[str, object] | None = None, recorded_by: str = "claude-code", ) -> ConflictPattern: """Build a ConflictPattern with sensible defaults.""" ours_id = fake_id(ours) theirs_id = fake_id(theirs) blob_fp = blob_fingerprint(ours_id, theirs_id) semantic_fp = blob_fp pattern_id = compute_pattern_id(path, blob_fp, semantic_fp) return ConflictPattern( pattern_id=pattern_id, path=path, domain=domain, conflict_type=conflict_type, blob_fingerprint=blob_fp, semantic_fingerprint=semantic_fp, ours_id=ours_id, theirs_id=theirs_id, description=description or {}, recorded_at=_utc_now(), recorded_by=recorded_by, ) def _make_resolution( pattern: ConflictPattern, strategy: str = ResolutionStrategy.MANUAL, confidence: float = 0.9, human_verified: bool = False, provenance: AgentProvenance | None = None, policy_id: str | None = None, ) -> Resolution: """Build a Resolution tied to *pattern* with sensible defaults.""" outcome_blob = fake_id(f"outcome-{pattern.pattern_id[:8]}") prov = provenance or AgentProvenance.agent("claude-code", "claude-sonnet-4-6") resolved_at = _utc_now() resolution_id = compute_resolution_id( pattern.pattern_id, outcome_blob, strategy, prov, resolved_at ) return Resolution( resolution_id=resolution_id, pattern_id=pattern.pattern_id, strategy=strategy, policy_id=policy_id, outcome_blob=outcome_blob, resolved_by=prov, human_verified=human_verified, confidence=confidence, rationale="Test rationale", resolved_at=resolved_at, ) def _make_policy( policy_id: str = "always-prefer-ours", scope: str = PolicyScope.REPO, action: str = PolicyAction.PREFER_OURS, confidence: float = 0.95, conflict_type: str | None = None, domain: str | None = None, path_pattern: str | None = None, ) 
-> Policy: """Build a Policy with sensible defaults.""" return Policy( policy_id=policy_id, description="Test policy", when=PolicyCondition( conflict_type=conflict_type, domain=domain, path_pattern=path_pattern, ), action=action, confidence=confidence, escalate_to=None, delegate_to=None, scope=scope, created_at=_utc_now(), created_by="claude-code", ) # =========================================================================== # Tier I — Unit tests # =========================================================================== class TestBlobFingerprint: """I: blob_fingerprint must be commutative and deterministic.""" def test_commutativity(self) -> None: a, b = fake_id("A"), fake_id("B") assert blob_fingerprint(a, b) == blob_fingerprint(b, a) def test_determinism(self) -> None: a, b = fake_id("X"), fake_id("Y") fp1 = blob_fingerprint(a, b) fp2 = blob_fingerprint(a, b) assert fp1 == fp2 def test_output_is_64_hex(self) -> None: a, b = fake_id("p"), fake_id("q") fp = blob_fingerprint(a, b) assert fp.startswith("sha256:") assert len(fp) == 71 def test_distinct_pairs_differ(self) -> None: ab = blob_fingerprint(fake_id("A"), fake_id("B")) cd = blob_fingerprint(fake_id("C"), fake_id("D")) assert ab != cd def test_same_id_both_sides(self) -> None: a = fake_id("same") # Should not crash; result is deterministic fp = blob_fingerprint(a, a) assert fp.startswith("sha256:") assert len(fp) == 71 class TestComputePatternId: """I: compute_pattern_id includes path, so same content → different IDs for different paths.""" def test_deterministic(self) -> None: blob_fp = fake_id("blob") sem_fp = fake_id("sem") p1 = compute_pattern_id("track.mid", blob_fp, sem_fp) p2 = compute_pattern_id("track.mid", blob_fp, sem_fp) assert p1 == p2 def test_path_changes_id(self) -> None: blob_fp = fake_id("blob") sem_fp = fake_id("sem") p1 = compute_pattern_id("track.mid", blob_fp, sem_fp) p2 = compute_pattern_id("drums.mid", blob_fp, sem_fp) assert p1 != p2 def test_blob_changes_id(self) -> None: # When 
blob_fp == semantic_fp (no plugin), blob content drives the pattern ID. fpA = fake_id("blobA") fpB = fake_id("blobB") p1 = compute_pattern_id("track.mid", fpA, fpA) p2 = compute_pattern_id("track.mid", fpB, fpB) assert p1 != p2 def test_64_hex_output(self) -> None: pid = compute_pattern_id("f.py", fake_id("b"), fake_id("s")) assert pid.startswith("sha256:") assert len(pid) == 71 class TestComputeResolutionId: """I: compute_resolution_id is deterministic and encodes actor.""" def test_deterministic(self) -> None: prov = AgentProvenance.agent("claude-code", "claude-sonnet-4-6") ts = datetime.datetime(2025, 1, 1, tzinfo=datetime.timezone.utc) pid = fake_id("pattern") ob = fake_id("outcome") r1 = compute_resolution_id(pid, ob, ResolutionStrategy.MANUAL, prov, ts) r2 = compute_resolution_id(pid, ob, ResolutionStrategy.MANUAL, prov, ts) assert r1 == r2 def test_different_agents_differ(self) -> None: ts = datetime.datetime(2025, 1, 1, tzinfo=datetime.timezone.utc) pid = fake_id("pattern") ob = fake_id("outcome") p1 = AgentProvenance.agent("claude-code") p2 = AgentProvenance.agent("codex") r1 = compute_resolution_id(pid, ob, ResolutionStrategy.MANUAL, p1, ts) r2 = compute_resolution_id(pid, ob, ResolutionStrategy.MANUAL, p2, ts) assert r1 != r2 def test_human_provenance_encodes_as_human(self) -> None: ts = datetime.datetime(2025, 1, 1, tzinfo=datetime.timezone.utc) pid = fake_id("pattern") ob = fake_id("outcome") rid = compute_resolution_id(pid, ob, ResolutionStrategy.MANUAL, AgentProvenance.human(), ts) assert rid.startswith("sha256:") assert len(rid) == 71 class TestValidation: """I: _validate_id and _validate_policy_id must reject bad inputs.""" def test_validate_id_accepts_64_hex(self) -> None: h._validate_id(fake_id("a")) # no exception def test_validate_id_rejects_63_chars(self) -> None: with pytest.raises(ValueError): h._validate_id("a" * 63) def test_validate_id_rejects_65_chars(self) -> None: with pytest.raises(ValueError): h._validate_id("a" * 65) def 
test_validate_id_rejects_uppercase(self) -> None: with pytest.raises(ValueError): h._validate_id("A" * 64) def test_validate_id_rejects_path_traversal(self) -> None: # Attempt to inject a path traversal via the ID with pytest.raises(ValueError): h._validate_id(f"../../../etc/passwd{'a' * 45}") def test_validate_id_rejects_empty(self) -> None: with pytest.raises(ValueError): h._validate_id("") def test_validate_policy_id_accepts_alphanumeric(self) -> None: h._validate_policy_id("my-policy_123") # no exception def test_validate_policy_id_rejects_slash(self) -> None: with pytest.raises(ValueError, match="alphanumeric"): h._validate_policy_id("bad/policy") def test_validate_policy_id_rejects_dot(self) -> None: with pytest.raises(ValueError): h._validate_policy_id("bad.policy") def test_validate_policy_id_rejects_empty(self) -> None: with pytest.raises(ValueError): h._validate_policy_id("") def test_validate_policy_id_rejects_129_chars(self) -> None: with pytest.raises(ValueError): h._validate_policy_id("a" * 129) def test_validate_policy_id_accepts_128_chars(self) -> None: h._validate_policy_id("a" * 128) # no exception class TestNamespaces: """I: Open string-constant namespaces are plain strings — plugin extensibility.""" def test_conflict_type_are_strings(self) -> None: assert isinstance(ConflictType.CONTENT, str) assert isinstance(ConflictType.STRUCTURAL, str) assert isinstance(ConflictType.METADATA, str) assert isinstance(ConflictType.RELATIONAL, str) assert isinstance(ConflictType.UNKNOWN, str) def test_resolution_strategy_are_strings(self) -> None: assert isinstance(ResolutionStrategy.POLICY, str) assert isinstance(ResolutionStrategy.EXACT_REPLAY, str) assert isinstance(ResolutionStrategy.SEMANTIC_PROPOSAL, str) assert isinstance(ResolutionStrategy.MANUAL, str) def test_policy_action_are_strings(self) -> None: for attr in ("PREFER_OURS", "PREFER_THEIRS", "ESCALATE", "REQUIRE_HUMAN", "DELEGATE"): assert isinstance(getattr(PolicyAction, attr), str) def 
test_policy_scope_are_strings(self) -> None: for attr in ("WORKSPACE", "REPO", "DOMAIN", "FILE"): assert isinstance(getattr(PolicyScope, attr), str) def test_audit_event_type_are_strings(self) -> None: for attr in ( "PATTERN_RECORDED", "RESOLUTION_SAVED", "RESOLUTION_APPLIED", "PATTERN_FORGOTTEN", "POLICY_SAVED", "POLICY_REMOVED", "GC_RUN", "CLEAR_RUN", ): assert isinstance(getattr(AuditEventType, attr), str) def test_plugin_can_use_custom_conflict_type(self) -> None: # Plugins may add strings at runtime — no import needed custom_type = "note_collision" pattern = _make_pattern(conflict_type=custom_type) assert pattern.conflict_type == custom_type class TestDataclasses: """I: Frozen dataclasses, AgentProvenance constructors, PolicyCondition wildcards.""" def test_agent_provenance_human(self) -> None: p = AgentProvenance.human() assert p.type == "human" assert p.agent_id is None assert p.model_id is None def test_agent_provenance_agent(self) -> None: p = AgentProvenance.agent("claude-code", "claude-sonnet-4-6") assert p.type == "agent" assert p.agent_id == "claude-code" assert p.model_id == "claude-sonnet-4-6" def test_agent_provenance_agent_no_model(self) -> None: p = AgentProvenance.agent("codex") assert p.model_id is None def test_agent_provenance_frozen(self) -> None: p = AgentProvenance.human() with pytest.raises(FrozenInstanceError): p.type = "agent" # type: ignore[misc] def test_policy_condition_all_none_wildcard(self) -> None: cond = PolicyCondition() assert cond.conflict_type is None assert cond.domain is None assert cond.path_pattern is None assert cond.min_confidence is None def test_policy_condition_frozen(self) -> None: cond = PolicyCondition(conflict_type=ConflictType.CONTENT) with pytest.raises(FrozenInstanceError): cond.conflict_type = ConflictType.STRUCTURAL # type: ignore[misc] def test_conflict_pattern_frozen(self) -> None: p = _make_pattern() with pytest.raises(FrozenInstanceError): p.path = "malicious.mid" # type: ignore[misc] def 
test_resolution_frozen(self) -> None: pattern = _make_pattern() res = _make_resolution(pattern) with pytest.raises(FrozenInstanceError): res.confidence = 0.0 # type: ignore[misc] def test_resolution_proposal_defaults(self) -> None: prop = ResolutionProposal( pattern_id=fake_id("p"), strategy=ResolutionStrategy.SEMANTIC_PROPOSAL, proposed_action=PolicyAction.PREFER_OURS, confidence=0.7, rationale="fuzzy match", ) assert prop.policy_id is None assert prop.similar_pattern_id is None assert prop.similarity is None assert prop.requires_confirmation is False class TestConditionMatching: """I: _condition_matches and match_policy first-match-wins semantics.""" def _pattern(self, **kwargs: str | int | float | bool | None) -> ConflictPattern: return _make_pattern(**kwargs) def test_all_none_matches_anything(self) -> None: cond = PolicyCondition() assert _condition_matches(cond, self._pattern()) is True def test_conflict_type_match(self) -> None: cond = PolicyCondition(conflict_type=ConflictType.CONTENT) assert _condition_matches(cond, self._pattern(conflict_type=ConflictType.CONTENT)) is True assert _condition_matches(cond, self._pattern(conflict_type=ConflictType.METADATA)) is False def test_domain_match(self) -> None: cond = PolicyCondition(domain="midi") assert _condition_matches(cond, self._pattern(domain="midi")) is True assert _condition_matches(cond, self._pattern(domain="code")) is False def test_path_pattern_glob(self) -> None: cond = PolicyCondition(path_pattern="*.mid") assert _condition_matches(cond, self._pattern(path="track.mid")) is True assert _condition_matches(cond, self._pattern(path="src/main.py")) is False def test_path_pattern_prefix_glob(self) -> None: cond = PolicyCondition(path_pattern="audio/*") assert _condition_matches(cond, self._pattern(path="audio/kick.mid")) is True assert _condition_matches(cond, self._pattern(path="video/clip.mp4")) is False def test_all_conditions_must_match(self) -> None: cond = 
PolicyCondition(conflict_type=ConflictType.CONTENT, domain="midi") matching = self._pattern(conflict_type=ConflictType.CONTENT, domain="midi") wrong_domain = self._pattern(conflict_type=ConflictType.CONTENT, domain="code") wrong_type = self._pattern(conflict_type=ConflictType.METADATA, domain="midi") assert _condition_matches(cond, matching) is True assert _condition_matches(cond, wrong_domain) is False assert _condition_matches(cond, wrong_type) is False def test_match_policy_first_match_wins(self) -> None: policy_a = _make_policy("policy-a", scope=PolicyScope.WORKSPACE, conflict_type=ConflictType.CONTENT) policy_b = _make_policy("policy-b", scope=PolicyScope.REPO, conflict_type=ConflictType.CONTENT) pattern = self._pattern(conflict_type=ConflictType.CONTENT) result = match_policy([policy_a, policy_b], pattern) assert result is not None assert result.policy_id == "policy-a" def test_match_policy_no_match_returns_none(self) -> None: policy = _make_policy("p", conflict_type=ConflictType.STRUCTURAL) pattern = self._pattern(conflict_type=ConflictType.CONTENT) assert match_policy([policy], pattern) is None def test_match_policy_empty_list(self) -> None: pattern = self._pattern() assert match_policy([], pattern) is None def test_min_confidence_not_evaluated_here(self) -> None: # min_confidence is an engine-level filter, not evaluated by _condition_matches cond = PolicyCondition(min_confidence=0.99) # Should still match since no other fields constrain the pattern assert _condition_matches(cond, self._pattern()) is True # =========================================================================== # Tier II — Integration tests # =========================================================================== class TestPatternCRUD: """II: record_pattern, load_pattern, list_patterns, forget_pattern, clear_all.""" def test_record_and_load(self, repo: pathlib.Path) -> None: pattern = _make_pattern() record_pattern(repo, pattern) loaded = load_pattern(repo, pattern.pattern_id) 
assert loaded is not None assert loaded.pattern_id == pattern.pattern_id assert loaded.path == pattern.path assert loaded.domain == pattern.domain assert loaded.conflict_type == pattern.conflict_type def test_record_is_idempotent(self, repo: pathlib.Path) -> None: pattern = _make_pattern() record_pattern(repo, pattern) # Second call should not raise and should return same ID pid = record_pattern(repo, pattern) assert pid == pattern.pattern_id # Only one pattern.json should exist entry_dir = h.pattern_dir(repo, pattern.pattern_id) assert list(entry_dir.glob("pattern.json")) == [entry_dir / "pattern.json"] def test_load_nonexistent_returns_none(self, repo: pathlib.Path) -> None: assert load_pattern(repo, "a" * 64) is None def test_load_invalid_id_returns_none(self, repo: pathlib.Path) -> None: assert load_pattern(repo, "not-a-hex-id") is None def test_list_patterns_empty(self, repo: pathlib.Path) -> None: assert list_patterns(repo) == [] def test_list_patterns_multiple(self, repo: pathlib.Path) -> None: p1 = _make_pattern(path="a.mid", ours="oa", theirs="ta") p2 = _make_pattern(path="b.mid", ours="ob", theirs="tb") record_pattern(repo, p1) record_pattern(repo, p2) results = list_patterns(repo) assert len(results) == 2 pids = {r.pattern_id for r in results} assert {p1.pattern_id, p2.pattern_id} == pids def test_list_patterns_sorted_newest_first(self, repo: pathlib.Path) -> None: older = _make_pattern(path="old.mid", ours="oa", theirs="ta") newer = _make_pattern(path="new.mid", ours="ob", theirs="tb") # Force newer to be newer by manipulating recorded_at import dataclasses ts_old = datetime.datetime(2024, 1, 1, tzinfo=datetime.timezone.utc) ts_new = datetime.datetime(2025, 1, 1, tzinfo=datetime.timezone.utc) older = dataclasses.replace(older, recorded_at=ts_old) newer = dataclasses.replace(newer, recorded_at=ts_new) record_pattern(repo, older) record_pattern(repo, newer) results = list_patterns(repo) assert results[0].pattern_id == newer.pattern_id def 
test_forget_pattern_removes_entry(self, repo: pathlib.Path) -> None: pattern = _make_pattern() record_pattern(repo, pattern) assert forget_pattern(repo, pattern.pattern_id) is True assert load_pattern(repo, pattern.pattern_id) is None def test_forget_nonexistent_returns_false(self, repo: pathlib.Path) -> None: assert forget_pattern(repo, "a" * 64) is False def test_forget_invalid_id_returns_false(self, repo: pathlib.Path) -> None: assert forget_pattern(repo, "../traversal") is False def test_forget_also_removes_resolutions(self, repo: pathlib.Path) -> None: pattern = _make_pattern() record_pattern(repo, pattern) res = _make_resolution(pattern) save_resolution(repo, res) forget_pattern(repo, pattern.pattern_id) # Resolution directory should be gone res_dir = h._resolutions_dir(repo, pattern.pattern_id) assert not res_dir.exists() def test_clear_all_removes_all(self, repo: pathlib.Path) -> None: for i in range(5): record_pattern(repo, _make_pattern(path=f"f{i}.mid", ours=f"o{i}", theirs=f"t{i}")) removed = clear_all(repo) assert removed == 5 assert list_patterns(repo) == [] def test_clear_all_empty_store(self, repo: pathlib.Path) -> None: assert clear_all(repo) == 0 def test_record_pattern_invalid_id_raises(self, repo: pathlib.Path) -> None: import dataclasses pattern = _make_pattern() bad = dataclasses.replace(pattern, pattern_id="bad-id") with pytest.raises(ValueError): record_pattern(repo, bad) class TestResolutionCRUD: """II: save_resolution, load_resolution, list_resolutions, increment_applied_count, best_resolution.""" def test_save_and_load(self, repo: pathlib.Path) -> None: pattern = _make_pattern() record_pattern(repo, pattern) res = _make_resolution(pattern) save_resolution(repo, res) loaded = load_resolution(repo, pattern.pattern_id, res.resolution_id) assert loaded is not None assert loaded.resolution_id == res.resolution_id assert loaded.pattern_id == pattern.pattern_id assert loaded.strategy == res.strategy assert loaded.confidence == res.confidence def 
test_save_is_idempotent(self, repo: pathlib.Path) -> None: pattern = _make_pattern() record_pattern(repo, pattern) res = _make_resolution(pattern) save_resolution(repo, res) save_resolution(repo, res) # second call is no-op assert len(list_resolutions(repo, pattern.pattern_id)) == 1 def test_save_requires_parent_pattern(self, repo: pathlib.Path) -> None: pattern = _make_pattern() res = _make_resolution(pattern) with pytest.raises(FileNotFoundError, match="No harmony pattern"): save_resolution(repo, res) def test_load_nonexistent_returns_none(self, repo: pathlib.Path) -> None: assert load_resolution(repo, "a" * 64, "b" * 64) is None def test_load_invalid_ids_return_none(self, repo: pathlib.Path) -> None: assert load_resolution(repo, "bad", "b" * 64) is None assert load_resolution(repo, "a" * 64, "bad") is None def test_list_resolutions_empty(self, repo: pathlib.Path) -> None: pattern = _make_pattern() record_pattern(repo, pattern) assert list_resolutions(repo, pattern.pattern_id) == [] def test_list_resolutions_sorted_by_quality(self, repo: pathlib.Path) -> None: """human_verified > confidence > applied_count (desc).""" pattern = _make_pattern() record_pattern(repo, pattern) low_conf = _make_resolution(pattern, confidence=0.3) high_conf = _make_resolution(pattern, confidence=0.9) verified = _make_resolution(pattern, confidence=0.5, human_verified=True) # Build distinct resolutions (different outcomes) import dataclasses low_conf = dataclasses.replace( low_conf, outcome_blob=fake_id("low_outcome"), resolution_id=fake_id("low_res"), ) high_conf = dataclasses.replace( high_conf, outcome_blob=fake_id("high_outcome"), resolution_id=fake_id("high_res"), ) verified = dataclasses.replace( verified, outcome_blob=fake_id("ver_outcome"), resolution_id=fake_id("ver_res"), ) for r in (low_conf, high_conf, verified): save_resolution(repo, r) results = list_resolutions(repo, pattern.pattern_id) assert results[0].resolution_id == verified.resolution_id # human_verified first assert 
results[-1].resolution_id == low_conf.resolution_id # lowest confidence last def test_increment_applied_count(self, repo: pathlib.Path) -> None: pattern = _make_pattern() record_pattern(repo, pattern) res = _make_resolution(pattern) save_resolution(repo, res) assert increment_applied_count(repo, pattern.pattern_id, res.resolution_id) is True loaded = load_resolution(repo, pattern.pattern_id, res.resolution_id) assert loaded is not None assert loaded.applied_count == 1 def test_increment_applied_count_multiple_times(self, repo: pathlib.Path) -> None: pattern = _make_pattern() record_pattern(repo, pattern) res = _make_resolution(pattern) save_resolution(repo, res) for _ in range(5): increment_applied_count(repo, pattern.pattern_id, res.resolution_id) loaded = load_resolution(repo, pattern.pattern_id, res.resolution_id) assert loaded is not None assert loaded.applied_count == 5 def test_increment_nonexistent_returns_false(self, repo: pathlib.Path) -> None: assert increment_applied_count(repo, "a" * 64, "b" * 64) is False def test_best_resolution_returns_highest_quality(self, repo: pathlib.Path) -> None: pattern = _make_pattern() record_pattern(repo, pattern) import dataclasses r1 = _make_resolution(pattern, confidence=0.5) r2 = _make_resolution(pattern, confidence=0.9) r1 = dataclasses.replace(r1, outcome_blob=fake_id("r1ob"), resolution_id=fake_id("r1id")) r2 = dataclasses.replace(r2, outcome_blob=fake_id("r2ob"), resolution_id=fake_id("r2id")) save_resolution(repo, r1) save_resolution(repo, r2) best = best_resolution(repo, pattern.pattern_id) assert best is not None assert best.resolution_id == r2.resolution_id def test_best_resolution_none_when_no_resolutions(self, repo: pathlib.Path) -> None: pattern = _make_pattern() record_pattern(repo, pattern) assert best_resolution(repo, pattern.pattern_id) is None class TestPolicyCRUD: """II: save_policy, load_policy, list_policies scope-sorted, remove_policy.""" def test_save_and_load(self, repo: pathlib.Path) -> None: 
policy = _make_policy() save_policy(repo, policy) loaded = load_policy(repo, policy.policy_id) assert loaded is not None assert loaded.policy_id == policy.policy_id assert loaded.action == policy.action assert loaded.scope == policy.scope def test_save_overwrites_existing(self, repo: pathlib.Path) -> None: policy = _make_policy(action=PolicyAction.PREFER_OURS) save_policy(repo, policy) import dataclasses updated = dataclasses.replace(policy, action=PolicyAction.PREFER_THEIRS) save_policy(repo, updated) loaded = load_policy(repo, policy.policy_id) assert loaded is not None assert loaded.action == PolicyAction.PREFER_THEIRS def test_load_nonexistent_returns_none(self, repo: pathlib.Path) -> None: assert load_policy(repo, "missing-policy") is None def test_load_invalid_id_returns_none(self, repo: pathlib.Path) -> None: assert load_policy(repo, "bad/policy/id") is None def test_list_policies_empty(self, repo: pathlib.Path) -> None: assert list_policies(repo) == [] def test_list_policies_scope_order(self, repo: pathlib.Path) -> None: """workspace → repo → domain → file regardless of insertion order.""" file_p = _make_policy("file-p", scope=PolicyScope.FILE) workspace_p = _make_policy("workspace-p", scope=PolicyScope.WORKSPACE) domain_p = _make_policy("domain-p", scope=PolicyScope.DOMAIN) repo_p = _make_policy("repo-p", scope=PolicyScope.REPO) for p in (file_p, workspace_p, domain_p, repo_p): save_policy(repo, p) results = list_policies(repo) scopes = [r.scope for r in results] assert scopes.index(PolicyScope.WORKSPACE) < scopes.index(PolicyScope.REPO) assert scopes.index(PolicyScope.REPO) < scopes.index(PolicyScope.DOMAIN) assert scopes.index(PolicyScope.DOMAIN) < scopes.index(PolicyScope.FILE) def test_remove_policy_returns_true(self, repo: pathlib.Path) -> None: policy = _make_policy() save_policy(repo, policy) assert remove_policy(repo, policy.policy_id) is True assert load_policy(repo, policy.policy_id) is None def test_remove_nonexistent_returns_false(self, repo: 
pathlib.Path) -> None: assert remove_policy(repo, "no-such-policy") is False def test_remove_invalid_id_returns_false(self, repo: pathlib.Path) -> None: assert remove_policy(repo, "bad/id") is False def test_save_invalid_id_raises(self, repo: pathlib.Path) -> None: import dataclasses policy = _make_policy() bad = dataclasses.replace(policy, policy_id="bad/id") with pytest.raises(ValueError): save_policy(repo, bad) def test_condition_round_trips(self, repo: pathlib.Path) -> None: policy = _make_policy(conflict_type=ConflictType.CONTENT, domain="midi", path_pattern="*.mid") save_policy(repo, policy) loaded = load_policy(repo, policy.policy_id) assert loaded is not None assert loaded.when.conflict_type == ConflictType.CONTENT assert loaded.when.domain == "midi" assert loaded.when.path_pattern == "*.mid" class TestAuditLog: """II: append_audit, list_audit sorted newest-first.""" def test_append_and_list(self, repo: pathlib.Path) -> None: actor = AgentProvenance.agent("claude-code") append_audit(repo, AuditEventType.PATTERN_RECORDED, actor, pattern_id="a" * 64) entries = list_audit(repo) assert len(entries) == 1 assert entries[0]["event_type"] == AuditEventType.PATTERN_RECORDED def test_entries_sorted_newest_first(self, repo: pathlib.Path) -> None: actor = AgentProvenance.human() for i in range(3): append_audit(repo, AuditEventType.GC_RUN, actor, metadata={"i": i}) time.sleep(0.01) # slight delay so filenames differ entries = list_audit(repo) assert len(entries) == 3 # Filenames encode date+content-id — sorted descending means newest at [0] names_in_dir = sorted( (f.name for f in h.audit_dir(repo).iterdir()), reverse=True, ) # audit_id is "sha256:"; filename embeds 12 hex chars starting at index 7 assert entries[0]["audit_id"][7:19] in names_in_dir[0] def test_list_audit_empty(self, repo: pathlib.Path) -> None: assert list_audit(repo) == [] def test_limit_respected(self, repo: pathlib.Path) -> None: actor = AgentProvenance.human() for _ in range(10): append_audit(repo, 
AuditEventType.GC_RUN, actor) entries = list_audit(repo, limit=3) assert len(entries) == 3 def test_audit_fields_present(self, repo: pathlib.Path) -> None: actor = AgentProvenance.agent("claude-code", "claude-sonnet-4-6") append_audit( repo, AuditEventType.RESOLUTION_SAVED, actor, pattern_id="a" * 64, resolution_id="b" * 64, metadata={"extra": "data"}, ) entry = list_audit(repo)[0] assert entry["event_type"] == AuditEventType.RESOLUTION_SAVED assert entry["pattern_id"] == "a" * 64 assert entry["resolution_id"] == "b" * 64 assert entry["acted_by"]["agent_id"] == "claude-code" assert entry["metadata"]["extra"] == "data" assert "audit_id" in entry assert "occurred_at" in entry class TestGcStale: """II: gc_stale keeps resolved patterns and removes old unresolved ones.""" def test_gc_removes_old_unresolved(self, repo: pathlib.Path) -> None: import dataclasses old_pattern = _make_pattern() old_ts = datetime.datetime(2020, 1, 1, tzinfo=datetime.timezone.utc) old_pattern = dataclasses.replace(old_pattern, recorded_at=old_ts) record_pattern(repo, old_pattern) removed = gc_stale(repo, age_days=1) assert removed == 1 assert load_pattern(repo, old_pattern.pattern_id) is None def test_gc_keeps_resolved_pattern(self, repo: pathlib.Path) -> None: import dataclasses pattern = _make_pattern() old_ts = datetime.datetime(2020, 1, 1, tzinfo=datetime.timezone.utc) pattern = dataclasses.replace(pattern, recorded_at=old_ts) record_pattern(repo, pattern) res = _make_resolution(pattern) save_resolution(repo, res) removed = gc_stale(repo, age_days=1) assert removed == 0 assert load_pattern(repo, pattern.pattern_id) is not None def test_gc_keeps_recent_unresolved(self, repo: pathlib.Path) -> None: pattern = _make_pattern() # recorded_at = now record_pattern(repo, pattern) removed = gc_stale(repo, age_days=90) assert removed == 0 def test_gc_empty_store(self, repo: pathlib.Path) -> None: assert gc_stale(repo, age_days=1) == 0 # 
# ===========================================================================
# Tier III — End-to-end lifecycle
# ===========================================================================


class TestFullLifecycle:
    """III: record → save_resolution → best_resolution → increment → gc won't touch it."""

    def test_complete_lifecycle(self, repo: pathlib.Path) -> None:
        """Walk one pattern through record, resolve, replay and a gc pass."""
        import dataclasses

        # 1. Record the conflict pattern
        pattern = _make_pattern(
            path="tracks/lead.mid",
            domain="midi",
            conflict_type=ConflictType.CONTENT,
        )
        pid = record_pattern(repo, pattern)
        assert pid == pattern.pattern_id
        assert load_pattern(repo, pid) is not None

        # 2. Save a resolution
        res = _make_resolution(
            pattern, strategy=ResolutionStrategy.MANUAL, confidence=0.85
        )
        save_resolution(repo, res)

        # 3. Retrieve best resolution
        best = best_resolution(repo, pid)
        assert best is not None
        assert best.resolution_id == res.resolution_id
        assert best.confidence == pytest.approx(0.85)

        # 4. Replay — increment applied count
        increment_applied_count(repo, pid, res.resolution_id)
        increment_applied_count(repo, pid, res.resolution_id)
        reloaded = load_resolution(repo, pid, res.resolution_id)
        assert reloaded is not None
        assert reloaded.applied_count == 2

        # 5. GC should NOT remove — has a resolution
        old_ts = datetime.datetime(2020, 1, 1, tzinfo=datetime.timezone.utc)
        pattern_old = dataclasses.replace(pattern, recorded_at=old_ts)
        # Re-record is idempotent so we can't update recorded_at via record_pattern.
        # Write updated pattern.json directly for this edge-case test.
        entry_p = h.pattern_dir(repo, pid) / "pattern.json"
        entry_p.write_text(
            json.dumps(h._pattern_to_dict(pattern_old), indent=2), encoding="utf-8"
        )
        gc_count = gc_stale(repo, age_days=1)
        assert gc_count == 0  # Protected because it has a resolution
        assert load_pattern(repo, pid) is not None

    def test_policy_fires_on_matching_pattern(self, repo: pathlib.Path) -> None:
        """A saved domain="midi" policy is returned by match_policy for a midi pattern."""
        policy = _make_policy(
            "midi-prefer-ours",
            scope=PolicyScope.DOMAIN,
            action=PolicyAction.PREFER_OURS,
            domain="midi",
        )
        save_policy(repo, policy)

        pattern = _make_pattern(domain="midi", conflict_type=ConflictType.CONTENT)
        record_pattern(repo, pattern)

        policies = list_policies(repo)
        matched = match_policy(policies, pattern)
        assert matched is not None
        assert matched.policy_id == "midi-prefer-ours"
        assert matched.action == PolicyAction.PREFER_OURS

    def test_policy_does_not_fire_wrong_domain(self, repo: pathlib.Path) -> None:
        """A domain="midi" policy yields no match for a domain="code" pattern."""
        policy = _make_policy("midi-only", domain="midi")
        save_policy(repo, policy)

        pattern = _make_pattern(domain="code")
        record_pattern(repo, pattern)

        matched = match_policy(list_policies(repo), pattern)
        assert matched is None

    def test_audit_trail_through_lifecycle(self, repo: pathlib.Path) -> None:
        """Both appended audit event types show up in list_audit."""
        actor = AgentProvenance.agent("claude-code", "claude-sonnet-4-6")
        pattern = _make_pattern()
        pid = record_pattern(repo, pattern)
        append_audit(repo, AuditEventType.PATTERN_RECORDED, actor, pattern_id=pid)

        res = _make_resolution(pattern)
        save_resolution(repo, res)
        append_audit(
            repo,
            AuditEventType.RESOLUTION_SAVED,
            actor,
            pattern_id=pid,
            resolution_id=res.resolution_id,
        )

        entries = list_audit(repo)
        event_types = [e["event_type"] for e in entries]
        assert AuditEventType.RESOLUTION_SAVED in event_types
        assert AuditEventType.PATTERN_RECORDED in event_types


# ===========================================================================
# Tier IV — Stress tests
# ===========================================================================


class TestStress:
    """IV: 100-pattern scan, scan-cap truncation, concurrent record_pattern,
    concurrent increment_applied_count."""

    def test_100_patterns_scan(self, repo: pathlib.Path) -> None:
        """Store 100 patterns and verify list_patterns returns all of them."""
        n = 100
        for i in range(n):
            record_pattern(
                repo, _make_pattern(path=f"f{i}.mid", ours=f"o{i}", theirs=f"t{i}")
            )
        results = list_patterns(repo)
        assert len(results) == n

    def test_concurrent_record_pattern_no_corruption(self, repo: pathlib.Path) -> None:
        """20 record_pattern calls on a 10-worker pool — all patterns must be loadable."""
        patterns = [
            _make_pattern(path=f"concurrent{i}.mid", ours=f"co{i}", theirs=f"ct{i}")
            for i in range(20)
        ]
        errors: list[Exception] = []

        def worker(p: ConflictPattern) -> None:
            # Collect instead of raising so every failure is reported at once.
            try:
                record_pattern(repo, p)
            except Exception as exc:
                errors.append(exc)

        with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
            futures = [executor.submit(worker, p) for p in patterns]
            concurrent.futures.wait(futures)

        assert errors == [], f"Thread errors: {errors}"
        for p in patterns:
            assert load_pattern(repo, p.pattern_id) is not None

    def test_concurrent_increment_applied_count(self, repo: pathlib.Path) -> None:
        """20 concurrent increments — none must crash, final count must be ≥ 1.

        ``increment_applied_count`` is a read-modify-write cycle; ``os.replace``
        makes each individual write atomic but does not serialise the full cycle.
        Under heavy concurrency, updates may be lost (last writer wins).  The
        guarantee is: no exception, file always valid, count always ≥ 1.
        """
        pattern = _make_pattern()
        record_pattern(repo, pattern)
        res = _make_resolution(pattern)
        save_resolution(repo, res)

        lock = threading.Lock()
        errors: list[Exception] = []

        def worker() -> None:
            try:
                increment_applied_count(repo, pattern.pattern_id, res.resolution_id)
            except Exception as exc:
                with lock:
                    errors.append(exc)

        threads = [threading.Thread(target=worker) for _ in range(20)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

        assert errors == [], f"Thread errors: {errors}"
        loaded = load_resolution(repo, pattern.pattern_id, res.resolution_id)
        assert loaded is not None
        # At least one increment must have landed; file must be valid JSON
        assert loaded.applied_count >= 1

    def test_list_patterns_scan_cap_does_not_crash(
        self, repo: pathlib.Path, monkeypatch: pytest.MonkeyPatch
    ) -> None:
        """If _MAX_SCAN is set to 5, list_patterns truncates rather than crashing."""
        import muse.core.harmony.patterns as _hpat

        # Patch the constant on both the package and the patterns submodule.
        monkeypatch.setattr(h, "_MAX_SCAN", 5)
        monkeypatch.setattr(_hpat, "_MAX_SCAN", 5)
        for i in range(10):
            record_pattern(
                repo, _make_pattern(path=f"s{i}.mid", ours=f"o{i}", theirs=f"t{i}")
            )
        results = list_patterns(repo)
        assert len(results) <= 5


# ===========================================================================
# Tier V — Data integrity
# ===========================================================================


class TestDataIntegrity:
    """V: atomic writes (no temp files left), JSON round-trip, field type preservation."""

    def test_no_temp_files_after_record_pattern(self, repo: pathlib.Path) -> None:
        """No ``.harmony-tmp-*`` file remains in the pattern dir after record_pattern."""
        pattern = _make_pattern()
        record_pattern(repo, pattern)
        entry_dir = h.pattern_dir(repo, pattern.pattern_id)
        tmp_files = list(entry_dir.glob(".harmony-tmp-*"))
        assert tmp_files == []

    def test_no_temp_files_after_save_resolution(self, repo: pathlib.Path) -> None:
        """No ``.harmony-tmp-*`` file remains in the resolutions dir after save_resolution."""
        pattern = _make_pattern()
        record_pattern(repo, pattern)
        res = _make_resolution(pattern)
        save_resolution(repo, res)
        res_dir = h._resolutions_dir(repo, pattern.pattern_id)
        tmp_files = list(res_dir.glob(".harmony-tmp-*"))
        assert tmp_files == []

    def test_no_temp_files_after_save_policy(self, repo: pathlib.Path) -> None:
        """No ``.harmony-tmp-*`` file remains in the policies dir after save_policy."""
        policy = _make_policy()
        save_policy(repo, policy)
        tmp_files = list(h.policies_dir(repo).glob(".harmony-tmp-*"))
        assert tmp_files == []

    def test_pattern_json_round_trip(self, repo: pathlib.Path) -> None:
        """Pattern fields written by record_pattern come back unchanged from load_pattern."""
        pattern = _make_pattern(
            path="round/trip.mid",
            domain="midi",
            conflict_type=ConflictType.STRUCTURAL,
            description={"beats": 4, "key": "Cmaj"},
        )
        record_pattern(repo, pattern)
        loaded = load_pattern(repo, pattern.pattern_id)
        assert loaded is not None
        assert loaded.path == "round/trip.mid"
        assert loaded.domain == "midi"
        assert loaded.conflict_type == ConflictType.STRUCTURAL
        assert loaded.description == {"beats": 4, "key": "Cmaj"}
        assert loaded.ours_id == pattern.ours_id
        assert loaded.theirs_id == pattern.theirs_id

    def test_resolution_json_round_trip(self, repo: pathlib.Path) -> None:
        """Confidence, verification flag and provenance survive save/load."""
        pattern = _make_pattern()
        record_pattern(repo, pattern)
        prov = AgentProvenance.agent("claude-code", "claude-sonnet-4-6")
        res = _make_resolution(
            pattern, provenance=prov, confidence=0.77, human_verified=True
        )
        save_resolution(repo, res)
        loaded = load_resolution(repo, pattern.pattern_id, res.resolution_id)
        assert loaded is not None
        assert loaded.confidence == pytest.approx(0.77)
        assert loaded.human_verified is True
        assert loaded.resolved_by.type == "agent"
        assert loaded.resolved_by.agent_id == "claude-code"
        assert loaded.resolved_by.model_id == "claude-sonnet-4-6"

    def test_policy_json_round_trip(self, repo: pathlib.Path) -> None:
        """Scope, action, confidence and ``when`` conditions survive save/load."""
        policy = _make_policy(
            "round-trip-policy",
            scope=PolicyScope.WORKSPACE,
            action=PolicyAction.ESCALATE,
            confidence=0.6,
            conflict_type=ConflictType.RELATIONAL,
            domain="code",
            path_pattern="src/**",
        )
        save_policy(repo, policy)
        loaded = load_policy(repo, policy.policy_id)
        assert loaded is not None
        assert loaded.scope == PolicyScope.WORKSPACE
        assert loaded.action == PolicyAction.ESCALATE
        assert loaded.confidence == pytest.approx(0.6)
        assert loaded.when.conflict_type == ConflictType.RELATIONAL
        assert loaded.when.domain == "code"
        assert loaded.when.path_pattern == "src/**"

    def test_agent_provenance_round_trip(self) -> None:
        """An agent provenance equals itself after to_dict → from_dict."""
        p = AgentProvenance.agent("claude-code", "claude-sonnet-4-6")
        d = p.to_dict()
        restored = AgentProvenance.from_dict(d)
        assert restored == p

    def test_agent_provenance_human_round_trip(self) -> None:
        """A human provenance equals itself after to_dict → from_dict."""
        p = AgentProvenance.human()
        restored = AgentProvenance.from_dict(p.to_dict())
        assert restored == p

    def test_recorded_at_is_utc_aware(self, repo: pathlib.Path) -> None:
        """A loaded pattern's recorded_at carries timezone info."""
        pattern = _make_pattern()
        record_pattern(repo, pattern)
        loaded = load_pattern(repo, pattern.pattern_id)
        assert loaded is not None
        assert loaded.recorded_at.tzinfo is not None

    def test_resolved_at_is_utc_aware(self, repo: pathlib.Path) -> None:
        """A loaded resolution's resolved_at carries timezone info."""
        pattern = _make_pattern()
        record_pattern(repo, pattern)
        res = _make_resolution(pattern)
        save_resolution(repo, res)
        loaded = load_resolution(repo, pattern.pattern_id, res.resolution_id)
        assert loaded is not None
        assert loaded.resolved_at.tzinfo is not None

    def test_applied_count_starts_at_zero(self, repo: pathlib.Path) -> None:
        """A freshly saved resolution loads with applied_count == 0."""
        pattern = _make_pattern()
        record_pattern(repo, pattern)
        res = _make_resolution(pattern)
        save_resolution(repo, res)
        loaded = load_resolution(repo, pattern.pattern_id, res.resolution_id)
        assert loaded is not None
        assert loaded.applied_count == 0


# ===========================================================================
# Tier VI — Security
# ===========================================================================


class TestSecurity:
    """VI: path traversal, symlink guards, size caps, crafted policy_id."""

    def test_path_traversal_in_pattern_id_rejected(self, repo: pathlib.Path) -> None:
        """_validate_id raises ValueError for a ``../`` style ID."""
        with pytest.raises(ValueError):
            h._validate_id("../../../etc/passwd")

    def test_path_traversal_in_load_pattern(self, repo: pathlib.Path) -> None:
        """load_pattern returns None for a traversal-style pattern ID."""
        result = load_pattern(repo, "../traversal")
        assert result is None

    def test_path_traversal_in_load_resolution(self, repo: pathlib.Path) -> None:
        """load_resolution returns None for a traversal-style resolution ID."""
        result = load_resolution(repo, "a" * 64, "../traversal")
        assert result is None

    def test_path_traversal_in_forget_pattern(self, repo: pathlib.Path) -> None:
        """forget_pattern returns False for a traversal-style pattern ID."""
        result = forget_pattern(repo, "../traversal/../../../../../etc")
        assert result is False

    def test_crafted_policy_id_with_slash(self, repo: pathlib.Path) -> None:
        """load_policy returns None for a policy ID containing path separators."""
        result = load_policy(repo, "../../etc/passwd")
        assert result is None

    def test_crafted_policy_id_with_null_byte(self, repo: pathlib.Path) -> None:
        """_validate_policy_id raises ValueError for an ID containing a NUL byte."""
        with pytest.raises(ValueError):
            h._validate_policy_id("policy\x00id")

    def test_symlinks_in_patterns_dir_skipped(self, repo: pathlib.Path) -> None:
        """list_patterns returns nothing when the only entry is a symlinked directory."""
        pdir = h.patterns_dir(repo)
        pdir.mkdir(parents=True, exist_ok=True)
        # Create a symlink in the patterns dir
        link = pdir / ("s" * 64)
        target = repo / "other"
        target.mkdir()
        link.symlink_to(target)
        results = list_patterns(repo)
        assert results == []  # symlink skipped

    def test_symlinks_in_policies_dir_skipped(self, repo: pathlib.Path) -> None:
        """list_policies returns nothing when the only entry is a symlinked file."""
        poldir = h.policies_dir(repo)
        poldir.mkdir(parents=True, exist_ok=True)
        link = poldir / "linked-policy.json"
        target = repo / "victim.json"
        target.write_text('{"malicious": true}', encoding="utf-8")
        link.symlink_to(target)
        results = list_policies(repo)
        assert results == []

    def test_oversized_pattern_file_rejected(self, repo: pathlib.Path) -> None:
        """load_pattern returns None when pattern.json exceeds _MAX_PATTERN_BYTES."""
        pattern = _make_pattern()
        record_pattern(repo, pattern)
        meta_p = h.pattern_dir(repo, pattern.pattern_id) / "pattern.json"
        # Overwrite with a file exceeding cap
        meta_p.write_bytes(b"x" * (_MAX_PATTERN_BYTES + 1))
        result = load_pattern(repo, pattern.pattern_id)
        assert result is None

    def test_oversized_resolution_file_rejected(self, repo: pathlib.Path) -> None:
        """load_resolution returns None when the file exceeds _MAX_RESOLUTION_BYTES."""
        pattern = _make_pattern()
        record_pattern(repo, pattern)
        res = _make_resolution(pattern)
        save_resolution(repo, res)
        dest = h._resolution_path(repo, pattern.pattern_id, res.resolution_id)
        dest.write_bytes(b"y" * (_MAX_RESOLUTION_BYTES + 1))
        result = load_resolution(repo, pattern.pattern_id, res.resolution_id)
        assert result is None

    def test_oversized_policy_file_rejected(self, repo: pathlib.Path) -> None:
        """load_policy returns None when the file exceeds _MAX_POLICY_BYTES."""
        policy = _make_policy()
        save_policy(repo, policy)
        dest = h.policies_dir(repo) / f"{policy.policy_id}.json"
        dest.write_bytes(b"z" * (_MAX_POLICY_BYTES + 1))
        result = load_policy(repo, policy.policy_id)
        assert result is None

    def test_malformed_json_pattern_returns_none(self, repo: pathlib.Path) -> None:
        """load_pattern returns None when pattern.json is not valid JSON."""
        pid = "a" * 64
        entry = h.pattern_dir(repo, pid)
        entry.mkdir(parents=True)
        (entry / "pattern.json").write_text("not json", encoding="utf-8")
        assert load_pattern(repo, pid) is None

    def test_malformed_json_resolution_returns_none(self, repo: pathlib.Path) -> None:
        """load_resolution returns None when the resolution file is not valid JSON."""
        pattern = _make_pattern()
        record_pattern(repo, pattern)
        rid = "b" * 64
        # NOTE(review): test_oversized_resolution_file_rejected uses
        # h._resolution_path — confirm both spellings are exported by harmony.
        dest = h.resolution_path(repo, pattern.pattern_id, rid)
        dest.parent.mkdir(parents=True, exist_ok=True)
        dest.write_text("{{bad", encoding="utf-8")
        assert load_resolution(repo, pattern.pattern_id, rid) is None

    def test_non_hex_dir_in_patterns_dir_skipped(self, repo: pathlib.Path) -> None:
        """list_patterns ignores a directory whose name is not a valid pattern ID."""
        pdir = h.patterns_dir(repo)
        pdir.mkdir(parents=True, exist_ok=True)
        (pdir / "not-a-valid-id").mkdir()
        assert list_patterns(repo) == []


# ===========================================================================
# Tier VII — Performance
# ===========================================================================


class TestPerformance:
    """VII: operation timing assertions — single operations must be fast.

    Durations are taken with ``time.perf_counter()``, the clock intended for
    measuring short intervals, so the millisecond budgets below are not
    distorted by a coarse timer.
    """

    def test_record_pattern_under_50ms(self, repo: pathlib.Path) -> None:
        """A single record_pattern call finishes in under 50 ms."""
        pattern = _make_pattern()
        start = time.perf_counter()
        record_pattern(repo, pattern)
        elapsed = (time.perf_counter() - start) * 1000
        assert elapsed < 50, f"record_pattern took {elapsed:.1f}ms — expected <50ms"

    def test_load_pattern_under_10ms(self, repo: pathlib.Path) -> None:
        """A single load_pattern call finishes in under 10 ms."""
        pattern = _make_pattern()
        record_pattern(repo, pattern)
        start = time.perf_counter()
        load_pattern(repo, pattern.pattern_id)
        elapsed = (time.perf_counter() - start) * 1000
        assert elapsed < 10, f"load_pattern took {elapsed:.1f}ms — expected <10ms"

    def test_save_resolution_under_50ms(self, repo: pathlib.Path) -> None:
        """A single save_resolution call finishes in under 50 ms."""
        pattern = _make_pattern()
        record_pattern(repo, pattern)
        res = _make_resolution(pattern)
        start = time.perf_counter()
        save_resolution(repo, res)
        elapsed = (time.perf_counter() - start) * 1000
        assert elapsed < 50, f"save_resolution took {elapsed:.1f}ms — expected <50ms"

    def test_list_100_patterns_under_500ms(self, repo: pathlib.Path) -> None:
        """Listing 100 stored patterns finishes in under 500 ms."""
        n = 100
        for i in range(n):
            record_pattern(
                repo, _make_pattern(path=f"perf{i}.mid", ours=f"o{i}", theirs=f"t{i}")
            )
        # Only the scan is timed; populating the store is setup.
        start = time.perf_counter()
        results = list_patterns(repo)
        elapsed = (time.perf_counter() - start) * 1000
        assert len(results) == n
        assert elapsed < 500, f"list_patterns(100) took {elapsed:.1f}ms — expected <500ms"

    def test_save_and_load_policy_under_20ms(self, repo: pathlib.Path) -> None:
        """save_policy followed by load_policy finishes in under 20 ms combined."""
        policy = _make_policy()
        start = time.perf_counter()
        save_policy(repo, policy)
        load_policy(repo, policy.policy_id)
        elapsed = (time.perf_counter() - start) * 1000
        assert elapsed < 20, f"save+load policy took {elapsed:.1f}ms — expected <20ms"

    def test_append_audit_under_20ms(self, repo: pathlib.Path) -> None:
        """A single append_audit call finishes in under 20 ms."""
        actor = AgentProvenance.human()
        start = time.perf_counter()
        append_audit(repo, AuditEventType.GC_RUN, actor)
        elapsed = (time.perf_counter() - start) * 1000
        assert elapsed < 20, f"append_audit took {elapsed:.1f}ms — expected <20ms"

    def test_increment_applied_count_under_20ms(self, repo: pathlib.Path) -> None:
        """A single increment_applied_count call finishes in under 20 ms."""
        pattern = _make_pattern()
        record_pattern(repo, pattern)
        res = _make_resolution(pattern)
        save_resolution(repo, res)
        start = time.perf_counter()
        increment_applied_count(repo, pattern.pattern_id, res.resolution_id)
        elapsed = (time.perf_counter() - start) * 1000
        assert elapsed < 20, f"increment_applied_count took {elapsed:.1f}ms — expected <20ms"