"""Comprehensive TDD suite for muse.core.harmony. Every public API surface is exercised: fingerprinting, pattern CRUD, resolution CRUD, best_resolution ranking, GC, audit, policy management, policy matching, escalation lifecycle, auto_apply exact-replay, auto_apply semantic matching via HarmonyPlugin, record_resolutions, MergeState original_conflict_paths, path-traversal guards, and full end-to-end integration flows. Organisation ------------ Each TestXxx class covers one coherent API surface. Within each class, tests are ordered: happy path → edge cases → error/adversarial cases. """ from __future__ import annotations import datetime import pathlib import pytest import muse.core.harmony as h from muse.core.harmony import ( AgentProvenance, AuditEventType, ConflictPattern, ConflictType, EscalationRecord, EscalationStatus, Policy, PolicyAction, PolicyCondition, PolicyScope, Resolution, ResolutionStrategy, append_audit, auto_apply, best_resolution, blob_fingerprint, clear_all, compute_escalation_id, compute_pattern_id, compute_resolution_id, compute_semantic_fingerprint, forget_pattern, gc_stale, increment_applied_count, list_audit, list_escalations, list_patterns, list_policies, list_resolutions, load_escalation, load_pattern, load_policy, load_resolution, match_policy, record_escalation, record_pattern, record_resolutions, remove_policy, resolve_escalation, save_policy, save_resolution, ) from muse.core.merge_engine import read_merge_state, write_merge_state from muse.core.object_store import write_object from muse.core.types import Manifest, MsgpackDict, NULL_LONG_ID, blob_id, long_id from muse.domain import HarmonyPlugin, LiveState, StateDelta, StateSnapshot from muse.core.paths import muse_dir # --------------------------------------------------------------------------- # Shared helpers # --------------------------------------------------------------------------- def _hex64(seed: str) -> str: """Return a valid sha256: content-addressed ID derived from seed. Used as a cheap deterministic fingerprint in tests — the seed uniquely determines the ID, so tests can express 'same fingerprint' vs 'different fingerprint' without computing actual content hashes. """ return blob_id(seed.encode()) def _write_obj(root: pathlib.Path, content: bytes) -> str: oid = blob_id(content) write_object(root, oid, content) return oid def _now() -> datetime.datetime: return datetime.datetime.now(datetime.timezone.utc) def _make_pattern( root: pathlib.Path, *, path: str = "config.py", ours_content: bytes = b"a", theirs_content: bytes = b"b", domain: str = "code", ) -> ConflictPattern: ours_id = _write_obj(root, ours_content) theirs_id = _write_obj(root, theirs_content) blob_fp = blob_fingerprint(ours_id, theirs_id) pattern_id = compute_pattern_id(path, blob_fp, blob_fp) return ConflictPattern( pattern_id=pattern_id, path=path, domain=domain, conflict_type=ConflictType.CONTENT, blob_fingerprint=blob_fp, semantic_fingerprint=blob_fp, ours_id=ours_id, theirs_id=theirs_id, description={}, recorded_at=_now(), recorded_by="test", ) def _make_resolution( pattern: ConflictPattern, outcome_content: bytes, root: pathlib.Path, *, confidence: float = 1.0, human_verified: bool = True, applied_count: int = 0, strategy: str = ResolutionStrategy.MANUAL, ) -> Resolution: outcome_blob = _write_obj(root, outcome_content) now = _now() rid = compute_resolution_id( pattern.pattern_id, outcome_blob, strategy, AgentProvenance.human(), now ) return Resolution( resolution_id=rid, pattern_id=pattern.pattern_id, strategy=strategy, policy_id=None, outcome_blob=outcome_blob, resolved_by=AgentProvenance.human(), human_verified=human_verified, confidence=confidence, rationale="test", resolved_at=now, applied_count=applied_count, ) def _make_policy( policy_id: str = "test-policy", *, action: str = PolicyAction.PREFER_OURS, scope: str = PolicyScope.REPO, conflict_type: str | None = None, domain: str | None = None, path_pattern: str | None = None, confidence: float = 0.9, ) -> Policy: return Policy( policy_id=policy_id, description="test", when=PolicyCondition( conflict_type=conflict_type, domain=domain, path_pattern=path_pattern, ), action=action, confidence=confidence, escalate_to=None, delegate_to=None, scope=scope, created_at=_now(), created_by="test", ) type _StubResult = MsgpackDict class _FakePlugin: """Minimal MuseDomainPlugin — no HarmonyPlugin sub-protocol.""" name = "test" def schema(self) -> _StubResult: return {} def _noop(*args: str, **kwargs: str) -> _StubResult: """Stub for unused MuseDomainPlugin methods in test doubles.""" return {} class _SemanticPlugin: """HarmonyPlugin that returns a fixed semantic fingerprint. Implements all MuseDomainPlugin required methods so that isinstance(plugin, HarmonyPlugin) returns True at runtime. """ name = "semantic" def __init__(self, fixed_fp: str) -> None: self._fp = fixed_fp def schema(self) -> _StubResult: return {} def snapshot(self, live_state: LiveState) -> StateSnapshot: return _noop() # type: ignore[return-value] def diff(self, base: StateSnapshot, target: StateSnapshot, *, repo_root: pathlib.Path | None = None) -> _StubResult: return _noop() def merge(self, base: StateSnapshot, left: StateSnapshot, right: StateSnapshot, *, repo_root: pathlib.Path | None = None) -> _StubResult: return _noop() def apply(self, delta: StateDelta, live_state: LiveState) -> LiveState: return _noop() # type: ignore[return-value] def drift(self, *args: str, **kwargs: str) -> _StubResult: return _noop() def conflict_fingerprint( self, path: str, ours_id: str, theirs_id: str, repo_root: pathlib.Path, ) -> str: return self._fp class _ThrowingPlugin: """HarmonyPlugin that always raises during conflict_fingerprint.""" name = "throwing" def schema(self) -> _StubResult: return {} def snapshot(self, live_state: LiveState) -> StateSnapshot: return _noop() # type: ignore[return-value] def diff(self, base: StateSnapshot, target: StateSnapshot, *, repo_root: pathlib.Path | None = None) -> _StubResult: return _noop() def merge(self, base: StateSnapshot, left: StateSnapshot, right: StateSnapshot, *, repo_root: pathlib.Path | None = None) -> _StubResult: return _noop() def apply(self, delta: StateDelta, live_state: LiveState) -> LiveState: return _noop() # type: ignore[return-value] def drift(self, *args: str, **kwargs: str) -> _StubResult: return _noop() def conflict_fingerprint(self, path: str, ours_id: str, theirs_id: str, repo_root: pathlib.Path) -> str: raise RuntimeError("deliberate error") class _BadLengthPlugin: """HarmonyPlugin that returns a fingerprint of the wrong length.""" name = "bad" def schema(self) -> _StubResult: return {} def snapshot(self, live_state: LiveState) -> StateSnapshot: return _noop() # type: ignore[return-value] def diff(self, base: StateSnapshot, target: StateSnapshot, *, repo_root: pathlib.Path | None = None) -> _StubResult: return _noop() def merge(self, base: StateSnapshot, left: StateSnapshot, right: StateSnapshot, *, repo_root: pathlib.Path | None = None) -> _StubResult: return _noop() def apply(self, delta: StateDelta, live_state: LiveState) -> LiveState: return _noop() # type: ignore[return-value] def drift(self, *args: str, **kwargs: str) -> _StubResult: return _noop() def conflict_fingerprint(self, path: str, ours_id: str, theirs_id: str, repo_root: pathlib.Path) -> str: return "tooshort" @pytest.fixture() def repo(tmp_path: pathlib.Path) -> pathlib.Path: muse_dir(tmp_path).mkdir() return tmp_path # =========================================================================== # 1. blob_fingerprint # =========================================================================== class TestBlobFingerprint: def test_returns_sha256_id(self) -> None: ours = blob_id(b"x") theirs = blob_id(b"y") fp = blob_fingerprint(ours, theirs) assert fp.startswith("sha256:") assert len(fp) == 71 def test_commutative(self) -> None: a = blob_id(b"alpha") b = blob_id(b"beta") assert blob_fingerprint(a, b) == blob_fingerprint(b, a) def test_stable(self) -> None: a = blob_id(b"stable") b = blob_id(b"input") assert blob_fingerprint(a, b) == blob_fingerprint(a, b) def test_different_pairs_produce_different_fingerprints(self) -> None: a, b, c = blob_id(b"a"), blob_id(b"b"), blob_id(b"c") assert blob_fingerprint(a, b) != blob_fingerprint(a, c) assert blob_fingerprint(a, b) != blob_fingerprint(b, c) def test_same_id_both_sides(self) -> None: # Degenerate case: ours == theirs (no actual conflict) a = blob_id(b"same") fp = blob_fingerprint(a, a) assert len(fp) == 71 and fp.startswith("sha256:") # still produces valid output # =========================================================================== # 2. compute_pattern_id # =========================================================================== class TestComputePatternId: def test_returns_sha256_id(self) -> None: fp = blob_fingerprint(blob_id(b"a"), blob_id(b"b")) pid = compute_pattern_id("f.py", fp, fp) assert pid.startswith("sha256:") assert len(pid) == 71 def test_stable(self) -> None: assert ( compute_pattern_id("f.py", _hex64("a"), _hex64("b")) == compute_pattern_id("f.py", _hex64("a"), _hex64("b")) ) def test_path_sensitive(self) -> None: blob = _hex64("fp") sem = _hex64("sfp") assert compute_pattern_id("a.py", blob, sem) != compute_pattern_id("b.py", blob, sem) def test_blob_fp_sensitive_when_no_semantic_plugin(self) -> None: """When blob_fp == semantic_fp (no plugin), blob_fp determines identity.""" # Same blob_fp used for both slots (exact-replay mode) fp1 = _hex64("fp1") fp2 = _hex64("fp2") # When blob_fp == semantic_fp, the formula is f"{path}:{blob_fp}:{semantic_fp}" assert compute_pattern_id("f.py", fp1, fp1) != compute_pattern_id("f.py", fp2, fp2) def test_blob_fp_irrelevant_when_semantic_fp_differs(self) -> None: """When semantic_fp ≠ blob_fp, pattern_id depends only on semantic_fp and path. This is the semantic plugin model: two conflicts with different blob IDs but the same semantic fingerprint map to the same pattern, enabling cross-content replay. """ sem = _hex64("sfp") # Two different blob_fps with the same semantic_fp → same pattern_id pid1 = compute_pattern_id("f.py", _hex64("fp1"), sem) pid2 = compute_pattern_id("f.py", _hex64("fp2"), sem) assert pid1 == pid2, ( "When semantic_fp != blob_fp, pattern_id must depend only on " "semantic_fp — different blob pairs with the same semantic shape " "must share a pattern to enable cross-content replay" ) def test_semantic_fp_sensitive(self) -> None: blob = _hex64("bfp") assert ( compute_pattern_id("f.py", blob, _hex64("s1")) != compute_pattern_id("f.py", blob, _hex64("s2")) ) def test_symbol_path_distinct_from_file_path(self) -> None: blob = _hex64("fp") sem = _hex64("sfp") assert ( compute_pattern_id("config.py", blob, sem) != compute_pattern_id("config.py::MAX_CONNECTIONS", blob, sem) ) # =========================================================================== # 3. compute_semantic_fingerprint # =========================================================================== class TestComputeSemanticFingerprint: def test_no_plugin_returns_blob_fingerprint(self, repo: pathlib.Path) -> None: ours = blob_id(b"x") theirs = blob_id(b"y") result = compute_semantic_fingerprint("f.py", ours, theirs, _FakePlugin(), repo) assert result == blob_fingerprint(ours, theirs) def test_commutative_no_plugin(self, repo: pathlib.Path) -> None: ours = blob_id(b"x") theirs = blob_id(b"y") r1 = compute_semantic_fingerprint("f.py", ours, theirs, _FakePlugin(), repo) r2 = compute_semantic_fingerprint("f.py", theirs, ours, _FakePlugin(), repo) assert r1 == r2 def test_harmony_plugin_used_when_available(self, repo: pathlib.Path) -> None: fixed = _hex64("fixed-semantic") plugin = _SemanticPlugin(fixed) ours = blob_id(b"x") theirs = blob_id(b"y") result = compute_semantic_fingerprint("f.py", ours, theirs, plugin, repo) assert result == fixed def test_throwing_plugin_falls_back_to_blob(self, repo: pathlib.Path) -> None: ours = blob_id(b"x") theirs = blob_id(b"y") result = compute_semantic_fingerprint( "f.py", ours, theirs, _ThrowingPlugin(), repo ) assert result == blob_fingerprint(ours, theirs) def test_bad_length_plugin_falls_back_to_blob(self, repo: pathlib.Path) -> None: ours = blob_id(b"x") theirs = blob_id(b"y") result = compute_semantic_fingerprint( "f.py", ours, theirs, _BadLengthPlugin(), repo ) assert result == blob_fingerprint(ours, theirs) def test_path_does_not_affect_blob_fallback(self, repo: pathlib.Path) -> None: ours = blob_id(b"x") theirs = blob_id(b"y") r1 = compute_semantic_fingerprint("a.py", ours, theirs, _FakePlugin(), repo) r2 = compute_semantic_fingerprint("b.py", ours, theirs, _FakePlugin(), repo) # blob_fingerprint is path-independent assert r1 == r2 # =========================================================================== # 4. record_pattern / load_pattern # =========================================================================== class TestRecordPatternAndLoad: def test_record_then_load(self, repo: pathlib.Path) -> None: p = _make_pattern(repo) record_pattern(repo, p) loaded = load_pattern(repo, p.pattern_id) assert loaded is not None assert loaded.pattern_id == p.pattern_id assert loaded.path == p.path assert loaded.domain == p.domain def test_record_idempotent(self, repo: pathlib.Path) -> None: p = _make_pattern(repo) record_pattern(repo, p) record_pattern(repo, p) # second call must be a no-op assert len(list_patterns(repo)) == 1 def test_load_missing_returns_none(self, repo: pathlib.Path) -> None: assert load_pattern(repo, _hex64("missing")) is None def test_load_invalid_id_returns_none(self, repo: pathlib.Path) -> None: assert load_pattern(repo, "not-hex-64") is None def test_record_invalid_id_raises(self, repo: pathlib.Path) -> None: p = _make_pattern(repo) # Replace the pattern_id with an invalid value via dataclass replace from dataclasses import replace as dc_replace bad_p = dc_replace(p, pattern_id="short") with pytest.raises(ValueError): record_pattern(repo, bad_p) def test_all_fields_round_trip(self, repo: pathlib.Path) -> None: p = _make_pattern(repo, path="src/api.py::handle_request", domain="code") record_pattern(repo, p) loaded = load_pattern(repo, p.pattern_id) assert loaded is not None assert loaded.path == "src/api.py::handle_request" assert loaded.conflict_type == ConflictType.CONTENT assert loaded.blob_fingerprint == p.blob_fingerprint # =========================================================================== # 5. list_patterns # =========================================================================== class TestListPatterns: def test_empty_store(self, repo: pathlib.Path) -> None: assert list_patterns(repo) == [] def test_one_pattern(self, repo: pathlib.Path) -> None: p = _make_pattern(repo) record_pattern(repo, p) patterns = list_patterns(repo) assert len(patterns) == 1 assert patterns[0].pattern_id == p.pattern_id def test_multiple_patterns(self, repo: pathlib.Path) -> None: for i in range(5): p = _make_pattern(repo, ours_content=bytes([i]), theirs_content=bytes([i + 10])) record_pattern(repo, p) assert len(list_patterns(repo)) == 5 def test_distinct_paths_produce_distinct_patterns(self, repo: pathlib.Path) -> None: ours = _write_obj(repo, b"ours") theirs = _write_obj(repo, b"theirs") paths = ["a.py::foo", "a.py::bar", "b.py::baz"] for path in paths: blob_fp = blob_fingerprint(ours, theirs) pid = compute_pattern_id(path, blob_fp, blob_fp) from dataclasses import replace as dc_replace p = dc_replace( _make_pattern(repo), pattern_id=pid, path=path, ours_id=ours, theirs_id=theirs, blob_fingerprint=blob_fp, semantic_fingerprint=blob_fp, ) record_pattern(repo, p) result = list_patterns(repo) assert len(result) == 3 stored_paths = {r.path for r in result} assert stored_paths == set(paths) # =========================================================================== # 6. forget_pattern # =========================================================================== class TestForgetPattern: def test_forget_existing(self, repo: pathlib.Path) -> None: p = _make_pattern(repo) record_pattern(repo, p) assert forget_pattern(repo, p.pattern_id) is True assert load_pattern(repo, p.pattern_id) is None def test_forget_missing_returns_false(self, repo: pathlib.Path) -> None: assert forget_pattern(repo, _hex64("gone")) is False def test_forget_also_removes_resolutions(self, repo: pathlib.Path) -> None: p = _make_pattern(repo) record_pattern(repo, p) r = _make_resolution(p, b"outcome", repo) save_resolution(repo, r) forget_pattern(repo, p.pattern_id) assert list_resolutions(repo, p.pattern_id) == [] def test_invalid_id_returns_false(self, repo: pathlib.Path) -> None: assert forget_pattern(repo, "bad-id") is False # =========================================================================== # 7. clear_all # =========================================================================== class TestClearAll: def test_empty_store(self, repo: pathlib.Path) -> None: assert clear_all(repo) == 0 def test_clears_all_patterns(self, repo: pathlib.Path) -> None: for i in range(3): p = _make_pattern(repo, ours_content=bytes([i]), theirs_content=bytes([i + 20])) record_pattern(repo, p) assert clear_all(repo) == 3 assert list_patterns(repo) == [] def test_clears_patterns_with_resolutions(self, repo: pathlib.Path) -> None: p = _make_pattern(repo) record_pattern(repo, p) r = _make_resolution(p, b"out", repo) save_resolution(repo, r) clear_all(repo) assert list_patterns(repo) == [] assert list_resolutions(repo, p.pattern_id) == [] # =========================================================================== # 8. save_resolution / load_resolution # =========================================================================== class TestSaveAndLoadResolution: def test_save_then_load(self, repo: pathlib.Path) -> None: p = _make_pattern(repo) record_pattern(repo, p) r = _make_resolution(p, b"resolved content", repo) save_resolution(repo, r) loaded = load_resolution(repo, p.pattern_id, r.resolution_id) assert loaded is not None assert loaded.resolution_id == r.resolution_id assert loaded.outcome_blob == r.outcome_blob assert loaded.human_verified is True assert loaded.confidence == 1.0 def test_load_missing_returns_none(self, repo: pathlib.Path) -> None: p = _make_pattern(repo) record_pattern(repo, p) assert load_resolution(repo, p.pattern_id, _hex64("gone")) is None def test_save_is_idempotent(self, repo: pathlib.Path) -> None: """save_resolution is explicitly idempotent — second call is a no-op.""" p = _make_pattern(repo) record_pattern(repo, p) r = _make_resolution(p, b"v1", repo) save_resolution(repo, r) # Second call with same resolution_id must not raise and must not overwrite from dataclasses import replace as dc_replace r2 = dc_replace(r, applied_count=99, confidence=0.01) save_resolution(repo, r2) # must be a no-op loaded = load_resolution(repo, p.pattern_id, r.resolution_id) assert loaded is not None assert loaded.applied_count == 0 # original value preserved assert loaded.confidence == 1.0 # original value preserved def test_save_requires_existing_pattern(self, repo: pathlib.Path) -> None: """save_resolution raises FileNotFoundError when pattern doesn't exist.""" fake_pattern_id = _hex64("nonexistent-pattern") outcome = _write_obj(repo, b"outcome") now = _now() prov = AgentProvenance.human() rid = compute_resolution_id(fake_pattern_id, outcome, ResolutionStrategy.MANUAL, prov, now) r = Resolution( resolution_id=rid, pattern_id=fake_pattern_id, strategy=ResolutionStrategy.MANUAL, policy_id=None, outcome_blob=outcome, resolved_by=prov, human_verified=False, confidence=0.5, rationale="orphan", resolved_at=now, applied_count=0, ) with pytest.raises(FileNotFoundError): save_resolution(repo, r) def test_agent_provenance_round_trip(self, repo: pathlib.Path) -> None: p = _make_pattern(repo) record_pattern(repo, p) outcome = _write_obj(repo, b"agent-resolved") now = _now() prov = AgentProvenance.agent("claude-code", "claude-sonnet-4-6") rid = compute_resolution_id( p.pattern_id, outcome, ResolutionStrategy.EXACT_REPLAY, prov, now ) r = Resolution( resolution_id=rid, pattern_id=p.pattern_id, strategy=ResolutionStrategy.EXACT_REPLAY, policy_id=None, outcome_blob=outcome, resolved_by=prov, human_verified=False, confidence=0.95, rationale="agent resolved", resolved_at=now, applied_count=0, ) save_resolution(repo, r) loaded = load_resolution(repo, p.pattern_id, rid) assert loaded is not None assert loaded.resolved_by.type == "agent" assert loaded.resolved_by.agent_id == "claude-code" assert loaded.resolved_by.model_id == "claude-sonnet-4-6" assert loaded.confidence == 0.95 assert loaded.human_verified is False # =========================================================================== # 9. list_resolutions + sorting # =========================================================================== class TestListResolutions: def test_empty(self, repo: pathlib.Path) -> None: p = _make_pattern(repo) record_pattern(repo, p) assert list_resolutions(repo, p.pattern_id) == [] def test_single(self, repo: pathlib.Path) -> None: p = _make_pattern(repo) record_pattern(repo, p) r = _make_resolution(p, b"out", repo) save_resolution(repo, r) result = list_resolutions(repo, p.pattern_id) assert len(result) == 1 assert result[0].resolution_id == r.resolution_id def test_sorted_human_verified_first(self, repo: pathlib.Path) -> None: """human_verified=True must sort before human_verified=False.""" p = _make_pattern(repo) record_pattern(repo, p) unverified = _make_resolution(p, b"unverified", repo, human_verified=False, confidence=1.0) verified = _make_resolution(p, b"verified", repo, human_verified=True, confidence=0.5) save_resolution(repo, unverified) save_resolution(repo, verified) result = list_resolutions(repo, p.pattern_id) assert result[0].human_verified is True def test_sorted_confidence_descending(self, repo: pathlib.Path) -> None: p = _make_pattern(repo) record_pattern(repo, p) low = _make_resolution(p, b"low", repo, confidence=0.3, human_verified=False) high = _make_resolution(p, b"high", repo, confidence=0.9, human_verified=False) save_resolution(repo, low) save_resolution(repo, high) result = list_resolutions(repo, p.pattern_id) assert result[0].confidence == 0.9 def test_sorted_applied_count_descending_as_tiebreaker( self, repo: pathlib.Path ) -> None: p = _make_pattern(repo) record_pattern(repo, p) rarely = _make_resolution(p, b"rarely", repo, confidence=0.8, applied_count=1) often = _make_resolution(p, b"often", repo, confidence=0.8, applied_count=10) save_resolution(repo, rarely) save_resolution(repo, often) result = list_resolutions(repo, p.pattern_id) assert result[0].applied_count == 10 def test_invalid_pattern_id_returns_empty(self, repo: pathlib.Path) -> None: assert list_resolutions(repo, "bad-id") == [] # =========================================================================== # 10. best_resolution # =========================================================================== class TestBestResolution: def test_returns_none_when_no_resolutions(self, repo: pathlib.Path) -> None: p = _make_pattern(repo) record_pattern(repo, p) assert best_resolution(repo, p.pattern_id) is None def test_returns_only_resolution(self, repo: pathlib.Path) -> None: p = _make_pattern(repo) record_pattern(repo, p) r = _make_resolution(p, b"out", repo) save_resolution(repo, r) best = best_resolution(repo, p.pattern_id) assert best is not None assert best.resolution_id == r.resolution_id def test_prefers_human_verified_over_high_confidence( self, repo: pathlib.Path ) -> None: p = _make_pattern(repo) record_pattern(repo, p) agent_high = _make_resolution( p, b"agent", repo, human_verified=False, confidence=0.99 ) human_low = _make_resolution( p, b"human", repo, human_verified=True, confidence=0.5 ) save_resolution(repo, agent_high) save_resolution(repo, human_low) best = best_resolution(repo, p.pattern_id) assert best is not None assert best.human_verified is True def test_prefers_higher_confidence(self, repo: pathlib.Path) -> None: p = _make_pattern(repo) record_pattern(repo, p) low = _make_resolution(p, b"low", repo, confidence=0.4, human_verified=False) high = _make_resolution(p, b"high", repo, confidence=0.8, human_verified=False) save_resolution(repo, low) save_resolution(repo, high) best = best_resolution(repo, p.pattern_id) assert best is not None assert best.confidence == 0.8 def test_prefers_higher_applied_count_as_tiebreaker( self, repo: pathlib.Path ) -> None: p = _make_pattern(repo) record_pattern(repo, p) rare = _make_resolution(p, b"rare", repo, applied_count=2, confidence=0.9) freq = _make_resolution(p, b"freq", repo, applied_count=20, confidence=0.9) save_resolution(repo, rare) save_resolution(repo, freq) best = best_resolution(repo, p.pattern_id) assert best is not None assert best.applied_count == 20 # =========================================================================== # 11. increment_applied_count # =========================================================================== class TestIncrementAppliedCount: def test_increments_from_zero(self, repo: pathlib.Path) -> None: p = _make_pattern(repo) record_pattern(repo, p) r = _make_resolution(p, b"out", repo, applied_count=0) save_resolution(repo, r) result = increment_applied_count(repo, p.pattern_id, r.resolution_id) assert result is True loaded = load_resolution(repo, p.pattern_id, r.resolution_id) assert loaded is not None assert loaded.applied_count == 1 def test_increments_multiple_times(self, repo: pathlib.Path) -> None: p = _make_pattern(repo) record_pattern(repo, p) r = _make_resolution(p, b"out", repo, applied_count=0) save_resolution(repo, r) for _ in range(5): increment_applied_count(repo, p.pattern_id, r.resolution_id) loaded = load_resolution(repo, p.pattern_id, r.resolution_id) assert loaded is not None assert loaded.applied_count == 5 def test_returns_false_for_missing_resolution(self, repo: pathlib.Path) -> None: p = _make_pattern(repo) record_pattern(repo, p) result = increment_applied_count(repo, p.pattern_id, _hex64("gone")) assert result is False # =========================================================================== # 12. gc_stale # =========================================================================== class TestGcStale: def test_empty_store(self, repo: pathlib.Path) -> None: assert gc_stale(repo, age_days=0) == 0 def test_removes_old_pattern_without_resolution(self, repo: pathlib.Path) -> None: p = _make_pattern(repo) from dataclasses import replace as dc_replace old_time = _now() - datetime.timedelta(days=200) old_p = dc_replace(p, recorded_at=old_time) record_pattern(repo, old_p) removed = gc_stale(repo, age_days=90) assert removed == 1 assert list_patterns(repo) == [] def test_keeps_pattern_with_resolution(self, repo: pathlib.Path) -> None: p = _make_pattern(repo) from dataclasses import replace as dc_replace old_time = _now() - datetime.timedelta(days=200) old_p = dc_replace(p, recorded_at=old_time) record_pattern(repo, old_p) r = _make_resolution(old_p, b"resolved", repo) save_resolution(repo, r) removed = gc_stale(repo, age_days=90) assert removed == 0 assert len(list_patterns(repo)) == 1 def test_keeps_recent_pattern_without_resolution(self, repo: pathlib.Path) -> None: p = _make_pattern(repo) record_pattern(repo, p) # recorded_at = now removed = gc_stale(repo, age_days=90) assert removed == 0 assert len(list_patterns(repo)) == 1 def test_removes_only_old_stale_patterns(self, repo: pathlib.Path) -> None: """Mix of old-stale, old-resolved, new-stale — only old-stale removed.""" from dataclasses import replace as dc_replace old_stale = _make_pattern(repo, ours_content=b"old_s_o", theirs_content=b"old_s_t") old_resolved = _make_pattern(repo, ours_content=b"old_r_o", theirs_content=b"old_r_t") new_stale = _make_pattern(repo, ours_content=b"new_s_o", theirs_content=b"new_s_t") old_time = _now() - datetime.timedelta(days=100) old_stale = dc_replace(old_stale, recorded_at=old_time) old_resolved = dc_replace(old_resolved, recorded_at=old_time) record_pattern(repo, old_stale) record_pattern(repo, old_resolved) record_pattern(repo, new_stale) r = _make_resolution(old_resolved, b"res", repo) save_resolution(repo, r) removed = gc_stale(repo, age_days=90) assert removed == 1 remaining = {p.pattern_id for p in list_patterns(repo)} assert old_resolved.pattern_id in remaining assert new_stale.pattern_id in remaining assert old_stale.pattern_id not in remaining # =========================================================================== # 13. Audit log # =========================================================================== class TestAuditLog: """append_audit(root, event_type, acted_by, *, pattern_id, resolution_id, policy_id, metadata)""" def test_empty_store(self, repo: pathlib.Path) -> None: assert list_audit(repo) == [] def test_append_and_list(self, repo: pathlib.Path) -> None: append_audit( repo, AuditEventType.PATTERN_RECORDED, AgentProvenance.human(), pattern_id=_hex64("p"), ) entries = list_audit(repo) assert len(entries) == 1 assert entries[0]["event_type"] == AuditEventType.PATTERN_RECORDED def test_multiple_entries_accumulate(self, repo: pathlib.Path) -> None: for i in range(5): append_audit( repo, AuditEventType.RESOLUTION_SAVED, AgentProvenance.human(), metadata={"i": i}, ) assert len(list_audit(repo)) == 5 def test_limit_parameter(self, repo: pathlib.Path) -> None: for _ in range(10): append_audit(repo, AuditEventType.PATTERN_RECORDED, AgentProvenance.human()) assert len(list_audit(repo, limit=3)) == 3 def test_audit_is_append_only(self, repo: pathlib.Path) -> None: append_audit(repo, AuditEventType.PATTERN_RECORDED, AgentProvenance.human()) append_audit(repo, AuditEventType.RESOLUTION_APPLIED, AgentProvenance.human()) entries = list_audit(repo) types = {e["event_type"] for e in entries} assert AuditEventType.PATTERN_RECORDED in types assert AuditEventType.RESOLUTION_APPLIED in types def test_agent_provenance_in_audit(self, repo: pathlib.Path) -> None: prov = AgentProvenance.agent("claude-code", "claude-sonnet-4-6") append_audit(repo, AuditEventType.RESOLUTION_APPLIED, prov) entries = list_audit(repo) assert len(entries) == 1 assert entries[0]["acted_by"]["type"] == "agent" assert entries[0]["acted_by"]["agent_id"] == "claude-code" def test_different_event_types(self, repo: pathlib.Path) -> None: for event in [ AuditEventType.PATTERN_RECORDED, AuditEventType.RESOLUTION_SAVED, AuditEventType.RESOLUTION_APPLIED, AuditEventType.GC_RUN, AuditEventType.CLEAR_RUN, ]: append_audit(repo, event, AgentProvenance.human()) entries = list_audit(repo, limit=100) event_types = {e["event_type"] for e in entries} for event in [ AuditEventType.PATTERN_RECORDED, AuditEventType.RESOLUTION_SAVED, AuditEventType.RESOLUTION_APPLIED, ]: assert event in event_types def test_metadata_stored(self, repo: pathlib.Path) -> None: append_audit( repo, AuditEventType.PATTERN_RECORDED, AgentProvenance.human(), pattern_id=_hex64("p"), metadata={"extra": "value"}, ) entries = list_audit(repo) assert entries[0]["metadata"]["extra"] == "value" def test_audit_id_is_content_addressed(self, repo: pathlib.Path) -> None: """audit_id must be sha256: of canonical entry content, not a UUID.""" import json as _json append_audit( repo, AuditEventType.PATTERN_RECORDED, AgentProvenance.human(), pattern_id=_hex64("p"), metadata={"k": "v"}, ) entry = list_audit(repo)[0] audit_id = entry["audit_id"] # Must be a long_id, not a UUID4 assert audit_id.startswith("sha256:"), f"Expected sha256: prefix, got {audit_id!r}" assert len(audit_id) == 71, f"Expected 71 chars (sha256: + 64 hex), got {len(audit_id)}" def test_audit_id_is_deterministic(self, repo: pathlib.Path) -> None: """Same content → same audit_id (content-addressed, not random).""" import json as _json prov = AgentProvenance.human() # Compute what the id should be from the entry fields append_audit( repo, AuditEventType.RESOLUTION_SAVED, prov, pattern_id=_hex64("p"), resolution_id=_hex64("r"), metadata={"x": 1}, ) entry = list_audit(repo)[0] # Re-derive: sha256 of entry without audit_id, sorted keys payload = {k: v for k, v in entry.items() if k != "audit_id"} expected = blob_id(_json.dumps(payload, sort_keys=True, separators=(",", ":")).encode()) assert entry["audit_id"] == expected def test_audit_id_is_sha256_not_uuid4(self, repo: pathlib.Path) -> None: """audit_id must be sha256-prefixed, not UUID4 (8-4-4-4-12 format).""" import re append_audit(repo, AuditEventType.GC_RUN, AgentProvenance.human()) entry = list_audit(repo)[0] uuid4_re = re.compile( r"^[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$" ) assert not uuid4_re.match(entry["audit_id"]), "audit_id must not be a UUID4" # =========================================================================== # 14. Policy CRUD # =========================================================================== class TestPolicyCRUD: def test_save_and_load(self, repo: pathlib.Path) -> None: policy = _make_policy("my-policy") save_policy(repo, policy) loaded = load_policy(repo, "my-policy") assert loaded is not None assert loaded.policy_id == "my-policy" assert loaded.action == PolicyAction.PREFER_OURS def test_load_missing_returns_none(self, repo: pathlib.Path) -> None: assert load_policy(repo, "nonexistent") is None def test_load_invalid_id_returns_none(self, repo: pathlib.Path) -> None: assert load_policy(repo, "bad id!") is None def test_save_overwrites_existing(self, repo: pathlib.Path) -> None: p1 = _make_policy("pol", action=PolicyAction.PREFER_OURS) save_policy(repo, p1) p2 = _make_policy("pol", action=PolicyAction.PREFER_THEIRS) save_policy(repo, p2) loaded = load_policy(repo, "pol") assert loaded is not None assert loaded.action == PolicyAction.PREFER_THEIRS def test_list_empty(self, repo: pathlib.Path) -> None: assert list_policies(repo) == [] def test_list_multiple(self, repo: pathlib.Path) -> None: for pid in ["a", "b", "c"]: save_policy(repo, _make_policy(pid)) assert len(list_policies(repo)) == 3 def test_remove_existing(self, repo: pathlib.Path) -> None: save_policy(repo, _make_policy("remove-me")) assert remove_policy(repo, "remove-me") is True assert load_policy(repo, "remove-me") is None def test_remove_missing_returns_false(self, repo: pathlib.Path) -> None: assert remove_policy(repo, "nope") is False def test_list_sorted_by_scope_order(self, repo: pathlib.Path) -> None: file_pol = _make_policy("file-pol", scope=PolicyScope.FILE) ws_pol = _make_policy("ws-pol", scope=PolicyScope.WORKSPACE) domain_pol = _make_policy("domain-pol", scope=PolicyScope.DOMAIN) repo_pol = _make_policy("repo-pol", scope=PolicyScope.REPO) for p in [file_pol, domain_pol, ws_pol, repo_pol]: save_policy(repo, p) ordered = list_policies(repo) scopes = [p.scope for p in ordered] expected_order = [PolicyScope.WORKSPACE, PolicyScope.REPO, PolicyScope.DOMAIN, PolicyScope.FILE] assert scopes == expected_order # =========================================================================== # 15. PolicyCondition matching (_condition_matches and match_policy) # =========================================================================== class TestPolicyConditionMatching: """_condition_matches and match_policy — all predicate combinations.""" def _pattern(self, path: str = "f.py", domain: str = "code", conflict_type: str = ConflictType.CONTENT) -> ConflictPattern: blob = _hex64("fp") pid = compute_pattern_id(path, blob, blob) return ConflictPattern( pattern_id=pid, path=path, domain=domain, conflict_type=conflict_type, blob_fingerprint=blob, semantic_fingerprint=blob, ours_id=blob_id(b"o"), theirs_id=blob_id(b"t"), description={}, recorded_at=_now(), recorded_by="test", ) def test_all_none_matches_everything(self) -> None: cond = PolicyCondition() assert h._condition_matches(cond, self._pattern()) assert h._condition_matches(cond, self._pattern(domain="midi")) def test_conflict_type_match(self) -> None: cond = PolicyCondition(conflict_type=ConflictType.CONTENT) assert h._condition_matches(cond, self._pattern()) def test_conflict_type_no_match(self) -> None: cond = PolicyCondition(conflict_type=ConflictType.STRUCTURAL) assert not h._condition_matches(cond, self._pattern()) def test_domain_match(self) -> None: cond = PolicyCondition(domain="code") assert h._condition_matches(cond, self._pattern(domain="code")) def test_domain_no_match(self) -> None: cond = PolicyCondition(domain="midi") assert not h._condition_matches(cond, self._pattern(domain="code")) def test_path_pattern_exact_glob(self) -> None: cond = PolicyCondition(path_pattern="*.py") assert h._condition_matches(cond, self._pattern(path="app.py")) def test_path_pattern_directory_glob(self) -> None: cond = PolicyCondition(path_pattern="src/*.py") assert h._condition_matches(cond, self._pattern(path="src/main.py")) assert not h._condition_matches(cond, self._pattern(path="tests/main.py")) def test_path_pattern_no_match(self) -> None: cond = PolicyCondition(path_pattern="*.mid") assert not h._condition_matches(cond, self._pattern(path="song.py")) def test_all_conditions_must_match(self) -> None: cond = PolicyCondition(conflict_type=ConflictType.CONTENT, domain="code", path_pattern="*.py") assert h._condition_matches(cond, self._pattern()) # domain wrong assert not h._condition_matches(cond, self._pattern(domain="midi")) # path wrong assert not h._condition_matches(cond, self._pattern(path="song.mid")) def test_min_confidence_not_checked_here(self) -> None: """min_confidence is a proposal-time filter — _condition_matches ignores it.""" cond = PolicyCondition(min_confidence=0.99) # Should match regardless — min_confidence is not a pattern field assert h._condition_matches(cond, self._pattern()) def test_match_policy_returns_none_when_no_policies(self) -> None: assert match_policy([], self._pattern()) is None def test_match_policy_returns_first_match(self) -> None: p1 = _make_policy("p1", action=PolicyAction.PREFER_OURS, conflict_type=ConflictType.CONTENT) p2 = _make_policy("p2", action=PolicyAction.PREFER_THEIRS, conflict_type=ConflictType.CONTENT) result = match_policy([p1, p2], self._pattern()) assert result is not None assert result.policy_id == "p1" def test_match_policy_returns_none_when_no_match(self) -> None: p1 = _make_policy("p1", domain="midi") # won't match domain="code" result = match_policy([p1], self._pattern(domain="code")) assert result is None def test_match_policy_skips_non_matching(self) -> None: wrong = _make_policy("wrong", domain="midi") right = _make_policy("right", domain="code") result = match_policy([wrong, right], self._pattern(domain="code")) assert result is not None assert result.policy_id == "right" def test_match_policy_symbol_path_glob(self) -> None: """path_pattern must match symbol-level paths like 'config.py::*'.""" cond_pol = _make_policy("sym", path_pattern="config.py::*") result = match_policy([cond_pol], self._pattern(path="config.py::MAX_CONNECTIONS")) assert result is not None # =========================================================================== # 16. Escalation lifecycle # =========================================================================== class TestEscalationLifecycle: def _make_escalation(self, pattern_id: str | None = None) -> EscalationRecord: pid = pattern_id or _hex64("pat") reason = "could not auto-resolve" eid = compute_escalation_id(pid, reason) return EscalationRecord( escalation_id=eid, pattern_id=pid, reason=reason, escalated_at=_now(), escalated_by=AgentProvenance.agent("claude-code"), status=EscalationStatus.OPEN, ) def test_compute_escalation_id_deterministic(self) -> None: pid = _hex64("p") reason = "reason" assert compute_escalation_id(pid, reason) == compute_escalation_id(pid, reason) def test_compute_escalation_id_differs_on_different_inputs(self) -> None: pid = _hex64("p") assert compute_escalation_id(pid, "r1") != compute_escalation_id(pid, "r2") assert compute_escalation_id(_hex64("p1"), "r") != compute_escalation_id(_hex64("p2"), "r") def test_record_and_load(self, repo: pathlib.Path) -> None: rec = self._make_escalation() result = record_escalation(repo, rec) assert result is True loaded = load_escalation(repo, rec.escalation_id) assert loaded is not None assert loaded.escalation_id == rec.escalation_id assert loaded.status == EscalationStatus.OPEN def test_record_idempotent(self, repo: pathlib.Path) -> None: rec = self._make_escalation() assert record_escalation(repo, rec) is True assert record_escalation(repo, rec) is False # already exists def test_load_missing_returns_none(self, repo: pathlib.Path) -> None: assert load_escalation(repo, _hex64("gone")) is None def test_list_escalations_empty(self, repo: pathlib.Path) -> None: assert list_escalations(repo) == [] def test_list_escalations_all(self, repo: pathlib.Path) -> None: for i in range(3): rec = self._make_escalation(_hex64(f"pat{i}")) record_escalation(repo, rec) assert len(list_escalations(repo)) == 3 def test_list_escalations_filter_by_status(self, repo: pathlib.Path) -> None: open_rec = self._make_escalation(_hex64("open")) record_escalation(repo, open_rec) resolved_rec = self._make_escalation(_hex64("resolved")) record_escalation(repo, resolved_rec) resolve_escalation( repo, resolved_rec.escalation_id, _hex64("res"), AgentProvenance.human(), _now(), ) open_list = list_escalations(repo, status=EscalationStatus.OPEN) resolved_list = list_escalations(repo, status=EscalationStatus.RESOLVED) assert len(open_list) == 1 assert open_list[0].status == EscalationStatus.OPEN assert len(resolved_list) == 1 assert resolved_list[0].status == EscalationStatus.RESOLVED def test_resolve_escalation(self, repo: pathlib.Path) -> None: rec = self._make_escalation() record_escalation(repo, rec) result = resolve_escalation( repo, rec.escalation_id, _hex64("resolution"), AgentProvenance.human(), _now(), ) assert result is True loaded = load_escalation(repo, rec.escalation_id) assert loaded is not None assert loaded.status == EscalationStatus.RESOLVED assert loaded.resolution_id == _hex64("resolution") def test_resolve_missing_escalation_returns_false(self, repo: pathlib.Path) -> None: result = resolve_escalation( repo, _hex64("gone"), _hex64("res"), AgentProvenance.human(), _now() ) assert result is False # =========================================================================== # 17. record_resolutions — file-level paths # =========================================================================== class TestRecordResolutionsFilePaths: def test_records_pattern_and_resolution(self, repo: pathlib.Path) -> None: ours_id = _write_obj(repo, b"version = 1") theirs_id = _write_obj(repo, b"version = 2") resolution_id = _write_obj(repo, b"version = 3") ours_m: Manifest = {"config.py": ours_id} theirs_m: Manifest = {"config.py": theirs_id} new_m: Manifest = {"config.py": resolution_id} saved = record_resolutions(repo, ["config.py"], ours_m, theirs_m, new_m, "code", _FakePlugin()) assert saved == ["config.py"] patterns = list_patterns(repo) assert len(patterns) == 1 assert patterns[0].path == "config.py" resolutions = list_resolutions(repo, patterns[0].pattern_id) assert len(resolutions) == 1 assert resolutions[0].outcome_blob == resolution_id assert resolutions[0].human_verified is True assert resolutions[0].confidence == 1.0 assert resolutions[0].strategy == ResolutionStrategy.MANUAL def test_skips_path_not_in_manifests(self, repo: pathlib.Path) -> None: saved = record_resolutions(repo, ["missing.py"], {}, {}, {}, "code", _FakePlugin()) assert saved == [] assert list_patterns(repo) == [] def test_skips_when_ours_missing_from_manifest(self, repo: pathlib.Path) -> None: theirs_id = _write_obj(repo, b"v2") res_id = _write_obj(repo, b"v2") saved = record_resolutions( repo, ["f.py"], {}, {"f.py": theirs_id}, {"f.py": res_id}, "code", _FakePlugin() ) assert saved == [] def test_skips_when_theirs_missing_from_manifest(self, repo: pathlib.Path) -> None: ours_id = _write_obj(repo, b"v1") res_id = _write_obj(repo, b"v1") saved = record_resolutions( repo, ["f.py"], {"f.py": ours_id}, {}, {"f.py": res_id}, "code", _FakePlugin() ) assert saved == [] def test_skips_when_outcome_missing_from_new_manifest(self, repo: pathlib.Path) -> None: ours_id = _write_obj(repo, b"v1") theirs_id = _write_obj(repo, b"v2") saved = record_resolutions( repo, ["f.py"], {"f.py": ours_id}, {"f.py": theirs_id}, {}, "code", _FakePlugin() ) assert saved == [] def test_idempotent_second_call(self, repo: pathlib.Path) -> None: ours_id = _write_obj(repo, b"a") theirs_id = _write_obj(repo, b"b") resolution_id = _write_obj(repo, b"c") ours_m: Manifest = {"f.py": ours_id} theirs_m: Manifest = {"f.py": theirs_id} new_m: Manifest = {"f.py": resolution_id} record_resolutions(repo, ["f.py"], ours_m, theirs_m, new_m, "code", _FakePlugin()) record_resolutions(repo, ["f.py"], ours_m, theirs_m, new_m, "code", _FakePlugin()) assert len(list_patterns(repo)) == 1 assert len(list_resolutions(repo, list_patterns(repo)[0].pattern_id)) == 1 def test_multiple_paths(self, repo: pathlib.Path) -> None: ours_a = _write_obj(repo, b"a_ours") theirs_a = _write_obj(repo, b"a_theirs") res_a = _write_obj(repo, b"a_res") ours_b = _write_obj(repo, b"b_ours") theirs_b = _write_obj(repo, b"b_theirs") res_b = _write_obj(repo, b"b_res") saved = record_resolutions( repo, ["a.py", "b.py"], {"a.py": ours_a, "b.py": ours_b}, {"a.py": theirs_a, "b.py": theirs_b}, {"a.py": res_a, "b.py": res_b}, "code", _FakePlugin(), ) assert set(saved) == {"a.py", "b.py"} assert len(list_patterns(repo)) == 2 def test_returns_only_saved_paths(self, repo: pathlib.Path) -> None: """Paths missing from manifests are silently skipped, not in return value.""" ours_id = _write_obj(repo, b"x") theirs_id = _write_obj(repo, b"y") res_id = _write_obj(repo, b"z") saved = record_resolutions( repo, ["present.py", "absent.py"], {"present.py": ours_id}, {"present.py": theirs_id}, {"present.py": res_id}, "code", _FakePlugin(), ) assert saved == ["present.py"] # =========================================================================== # 18. record_resolutions — symbol-level paths # =========================================================================== class TestRecordResolutionsSymbolPaths: def test_symbol_path_records_pattern(self, repo: pathlib.Path) -> None: ours_id = _write_obj(repo, b"MAX_CONNECTIONS = 10") theirs_id = _write_obj(repo, b"MAX_CONNECTIONS = 25") resolution_id = _write_obj(repo, b"MAX_CONNECTIONS = 50") saved = record_resolutions( repo, ["config.py::MAX_CONNECTIONS"], {"config.py": ours_id}, {"config.py": theirs_id}, {"config.py": resolution_id}, "code", _FakePlugin(), ) assert saved == ["config.py::MAX_CONNECTIONS"] patterns = list_patterns(repo) assert len(patterns) == 1 assert patterns[0].path == "config.py::MAX_CONNECTIONS" def test_multiple_symbols_same_file_produce_distinct_patterns( self, repo: pathlib.Path ) -> None: file_ours = _write_obj(repo, b"file-ours") file_theirs = _write_obj(repo, b"file-theirs") file_resolved = _write_obj(repo, b"file-resolved") saved = record_resolutions( repo, ["app.py::foo", "app.py::bar"], {"app.py": file_ours}, {"app.py": file_theirs}, {"app.py": file_resolved}, "code", _FakePlugin(), ) assert set(saved) == {"app.py::foo", "app.py::bar"} patterns = list_patterns(repo) assert len(patterns) == 2 paths = {p.path for p in patterns} assert paths == {"app.py::foo", "app.py::bar"} def test_symbol_path_missing_file_portion(self, repo: pathlib.Path) -> None: saved = record_resolutions( repo, ["missing.py::Symbol"], {}, {}, {}, "code", _FakePlugin() ) assert saved == [] def test_symbol_path_idempotent(self, repo: pathlib.Path) -> None: ours_id = _write_obj(repo, b"ours") theirs_id = _write_obj(repo, b"theirs") res_id = _write_obj(repo, b"resolved") ours_m: Manifest = {"f.py": ours_id} theirs_m: Manifest = {"f.py": theirs_id} new_m: Manifest = {"f.py": res_id} record_resolutions(repo, ["f.py::Symbol"], ours_m, theirs_m, new_m, "code", _FakePlugin()) record_resolutions(repo, ["f.py::Symbol"], ours_m, theirs_m, new_m, "code", _FakePlugin()) assert len(list_patterns(repo)) == 1 assert len(list_resolutions(repo, list_patterns(repo)[0].pattern_id)) == 1 def test_deeply_nested_symbol_path(self, repo: pathlib.Path) -> None: ours_id = _write_obj(repo, b"ours") theirs_id = _write_obj(repo, b"theirs") res_id = _write_obj(repo, b"resolved") saved = record_resolutions( repo, ["src/auth/tokens.py::TokenManager.rotate"], {"src/auth/tokens.py": ours_id}, {"src/auth/tokens.py": theirs_id}, {"src/auth/tokens.py": res_id}, "code", _FakePlugin(), ) assert saved == ["src/auth/tokens.py::TokenManager.rotate"] patterns = list_patterns(repo) assert patterns[0].path == "src/auth/tokens.py::TokenManager.rotate" # =========================================================================== # 19. auto_apply — exact replay (file paths) # =========================================================================== class TestAutoApplyExactReplay: def test_no_resolution_records_pattern_and_returns_remaining( self, repo: pathlib.Path ) -> None: ours_id = _write_obj(repo, b"v1") theirs_id = _write_obj(repo, b"v2") ours_m: Manifest = {"f.py": ours_id} theirs_m: Manifest = {"f.py": theirs_id} resolved, remaining = auto_apply( repo, ["f.py"], ours_m, theirs_m, "code", _FakePlugin() ) assert resolved == {} assert "f.py" in remaining # Pattern should have been recorded for future learning assert len(list_patterns(repo)) == 1 def test_second_identical_conflict_auto_resolves( self, repo: pathlib.Path ) -> None: ours_id = _write_obj(repo, b"ours-content") theirs_id = _write_obj(repo, b"theirs-content") resolution_content = b"resolved-content" resolution_id = _write_obj(repo, resolution_content) ours_m: Manifest = {"f.py": ours_id} theirs_m: Manifest = {"f.py": theirs_id} new_m: Manifest = {"f.py": resolution_id} # First conflict: record how it was resolved record_resolutions(repo, ["f.py"], ours_m, theirs_m, new_m, "code", _FakePlugin()) # Second identical conflict: auto_apply should replay dest = repo / "f.py" resolved, remaining = auto_apply(repo, ["f.py"], ours_m, theirs_m, "code", _FakePlugin()) assert "f.py" in resolved assert remaining == [] assert dest.read_bytes() == resolution_content def test_commutative_replay(self, repo: pathlib.Path) -> None: """Record A-vs-B; auto_apply B-vs-A should still match.""" ours_id = _write_obj(repo, b"A") theirs_id = _write_obj(repo, b"B") resolution_id = _write_obj(repo, b"A") # kept ours ours_m: Manifest = {"f.py": ours_id} theirs_m: Manifest = {"f.py": theirs_id} new_m: Manifest = {"f.py": resolution_id} record_resolutions(repo, ["f.py"], ours_m, theirs_m, new_m, "code", _FakePlugin()) # Swapped: ours=B, theirs=A — same fingerprint (commutative) swapped_ours: Manifest = {"f.py": theirs_id} swapped_theirs: Manifest = {"f.py": ours_id} resolved, remaining = auto_apply( repo, ["f.py"], swapped_ours, swapped_theirs, "code", _FakePlugin() ) assert "f.py" in resolved assert remaining == [] def test_different_content_does_not_auto_resolve(self, repo: pathlib.Path) -> None: """Different blob IDs → different fingerprint → no auto-apply.""" ours_id = _write_obj(repo, b"old-ours") theirs_id = _write_obj(repo, b"old-theirs") resolution_id = _write_obj(repo, b"old-resolved") record_resolutions( repo, ["f.py"], {"f.py": ours_id}, {"f.py": theirs_id}, {"f.py": resolution_id}, "code", _FakePlugin(), ) new_ours = _write_obj(repo, b"new-ours") new_theirs = _write_obj(repo, b"new-theirs") resolved, remaining = auto_apply( repo, ["f.py"], {"f.py": new_ours}, {"f.py": new_theirs}, "code", _FakePlugin(), ) assert resolved == {} assert "f.py" in remaining def test_applied_count_incremented_on_replay(self, repo: pathlib.Path) -> None: ours_id = _write_obj(repo, b"ours") theirs_id = _write_obj(repo, b"theirs") resolution_id = _write_obj(repo, b"resolved") ours_m: Manifest = {"f.py": ours_id} theirs_m: Manifest = {"f.py": theirs_id} new_m: Manifest = {"f.py": resolution_id} record_resolutions(repo, ["f.py"], ours_m, theirs_m, new_m, "code", _FakePlugin()) auto_apply(repo, ["f.py"], ours_m, theirs_m, "code", _FakePlugin()) p = list_patterns(repo)[0] resolutions = list_resolutions(repo, p.pattern_id) assert resolutions[0].applied_count == 1 def test_multiple_replays_increment_count(self, repo: pathlib.Path) -> None: ours_id = _write_obj(repo, b"ours") theirs_id = _write_obj(repo, b"theirs") resolution_id = _write_obj(repo, b"resolved") ours_m: Manifest = {"f.py": ours_id} theirs_m: Manifest = {"f.py": theirs_id} new_m: Manifest = {"f.py": resolution_id} record_resolutions(repo, ["f.py"], ours_m, theirs_m, new_m, "code", _FakePlugin()) for _ in range(3): auto_apply(repo, ["f.py"], ours_m, theirs_m, "code", _FakePlugin()) p = list_patterns(repo)[0] resolutions = list_resolutions(repo, p.pattern_id) assert resolutions[0].applied_count == 3 def test_resolves_file_to_disk(self, repo: pathlib.Path) -> None: """The resolved content must actually be written to the working tree.""" ours_id = _write_obj(repo, b"ours") theirs_id = _write_obj(repo, b"theirs") content = b"the chosen resolution\n" resolution_id = _write_obj(repo, content) ours_m: Manifest = {"result.py": ours_id} theirs_m: Manifest = {"result.py": theirs_id} new_m: Manifest = {"result.py": resolution_id} record_resolutions(repo, ["result.py"], ours_m, theirs_m, new_m, "code", _FakePlugin()) dest = repo / "result.py" auto_apply(repo, ["result.py"], ours_m, theirs_m, "code", _FakePlugin()) assert dest.read_bytes() == content # =========================================================================== # 20. auto_apply — symbol-level paths # =========================================================================== class TestAutoApplySymbolPaths: def test_first_symbol_conflict_records_pattern(self, repo: pathlib.Path) -> None: ours_id = _write_obj(repo, b"DEBUG = False") theirs_id = _write_obj(repo, b"DEBUG = True") _, remaining = auto_apply( repo, ["settings.py::DEBUG"], {"settings.py": ours_id}, {"settings.py": theirs_id}, "code", _FakePlugin(), ) assert "settings.py::DEBUG" in remaining patterns = list_patterns(repo) assert len(patterns) == 1 assert patterns[0].path == "settings.py::DEBUG" def test_symbol_conflict_replayed(self, repo: pathlib.Path) -> None: ours_id = _write_obj(repo, b"TIMEOUT = 30") theirs_id = _write_obj(repo, b"TIMEOUT = 60") resolution_content = b"TIMEOUT = 45" resolution_id = _write_obj(repo, resolution_content) ours_m: Manifest = {"config.py": ours_id} theirs_m: Manifest = {"config.py": theirs_id} new_m: Manifest = {"config.py": resolution_id} record_resolutions( repo, ["config.py::TIMEOUT"], ours_m, theirs_m, new_m, "code", _FakePlugin() ) dest = repo / "config.py" resolved, remaining = auto_apply( repo, ["config.py::TIMEOUT"], ours_m, theirs_m, "code", _FakePlugin() ) assert "config.py::TIMEOUT" in resolved assert remaining == [] assert dest.read_bytes() == resolution_content def test_symbol_and_file_path_produce_distinct_patterns( self, repo: pathlib.Path ) -> None: """config.py and config.py::MAX must produce separate patterns.""" ours_id = _write_obj(repo, b"ours") theirs_id = _write_obj(repo, b"theirs") res_id = _write_obj(repo, b"res") ours_m: Manifest = {"config.py": ours_id} theirs_m: Manifest = {"config.py": theirs_id} new_m: Manifest = {"config.py": res_id} record_resolutions(repo, ["config.py"], ours_m, theirs_m, new_m, "code", _FakePlugin()) auto_apply(repo, ["config.py::MAX"], ours_m, theirs_m, "code", _FakePlugin()) patterns = list_patterns(repo) assert len(patterns) == 2 paths = {p.path for p in patterns} assert "config.py" in paths assert "config.py::MAX" in paths def test_multiple_symbols_in_one_auto_apply(self, repo: pathlib.Path) -> None: ours_id = _write_obj(repo, b"ours") theirs_id = _write_obj(repo, b"theirs") res_id = _write_obj(repo, b"res") ours_m: Manifest = {"f.py": ours_id} theirs_m: Manifest = {"f.py": theirs_id} new_m: Manifest = {"f.py": res_id} record_resolutions( repo, ["f.py::alpha", "f.py::beta"], ours_m, theirs_m, new_m, "code", _FakePlugin() ) resolved, remaining = auto_apply( repo, ["f.py::alpha", "f.py::beta"], ours_m, theirs_m, "code", _FakePlugin() ) assert "f.py::alpha" in resolved assert "f.py::beta" in resolved assert remaining == [] def test_partial_resolution_some_symbols_remain( self, repo: pathlib.Path ) -> None: """Only symbols with saved resolutions are auto-applied.""" ours_id = _write_obj(repo, b"ours") theirs_id = _write_obj(repo, b"theirs") res_id = _write_obj(repo, b"res") ours_m: Manifest = {"f.py": ours_id} theirs_m: Manifest = {"f.py": theirs_id} new_m: Manifest = {"f.py": res_id} # Only record resolution for alpha, not beta record_resolutions( repo, ["f.py::alpha"], ours_m, theirs_m, new_m, "code", _FakePlugin() ) resolved, remaining = auto_apply( repo, ["f.py::alpha", "f.py::beta"], ours_m, theirs_m, "code", _FakePlugin() ) assert "f.py::alpha" in resolved assert "f.py::beta" in remaining # =========================================================================== # 21. auto_apply — path traversal guard # =========================================================================== class TestAutoApplyPathTraversal: def test_traversal_path_skipped(self, repo: pathlib.Path) -> None: ours_id = _write_obj(repo, b"x") theirs_id = _write_obj(repo, b"y") ours_m: Manifest = {"../traversal.py": ours_id} theirs_m: Manifest = {"../traversal.py": theirs_id} resolved, remaining = auto_apply( repo, ["../traversal.py"], ours_m, theirs_m, "code", _FakePlugin() ) assert resolved == {} assert "../traversal.py" in remaining # No pattern should have been recorded for a traversal attempt assert list_patterns(repo) == [] def test_symbol_traversal_skipped(self, repo: pathlib.Path) -> None: ours_id = _write_obj(repo, b"x") theirs_id = _write_obj(repo, b"y") ours_m: Manifest = {"../traversal.py": ours_id} theirs_m: Manifest = {"../traversal.py": theirs_id} resolved, remaining = auto_apply( repo, ["../traversal.py::Symbol"], ours_m, theirs_m, "code", _FakePlugin() ) assert resolved == {} assert "../traversal.py::Symbol" in remaining assert list_patterns(repo) == [] def test_absolute_path_skipped(self, repo: pathlib.Path) -> None: ours_id = _write_obj(repo, b"x") theirs_id = _write_obj(repo, b"y") abs_path = "/etc/passwd" ours_m: Manifest = {abs_path: ours_id} theirs_m: Manifest = {abs_path: theirs_id} resolved, remaining = auto_apply( repo, [abs_path], ours_m, theirs_m, "code", _FakePlugin() ) assert resolved == {} assert abs_path in remaining def test_legitimate_nested_path_not_skipped(self, repo: pathlib.Path) -> None: ours_id = _write_obj(repo, b"v1") theirs_id = _write_obj(repo, b"v2") res_id = _write_obj(repo, b"v3") path = "src/auth/tokens.py" ours_m: Manifest = {path: ours_id} theirs_m: Manifest = {path: theirs_id} new_m: Manifest = {path: res_id} record_resolutions(repo, [path], ours_m, theirs_m, new_m, "code", _FakePlugin()) resolved, remaining = auto_apply( repo, [path], ours_m, theirs_m, "code", _FakePlugin() ) assert path in resolved assert remaining == [] # =========================================================================== # 22. auto_apply — one-sided deletion (ours or theirs missing) # =========================================================================== class TestAutoApplyOneWayDeletion: def test_ours_missing_from_manifest(self, repo: pathlib.Path) -> None: theirs_id = _write_obj(repo, b"theirs") resolved, remaining = auto_apply( repo, ["f.py"], {}, {"f.py": theirs_id}, "code", _FakePlugin() ) assert resolved == {} assert "f.py" in remaining def test_theirs_missing_from_manifest(self, repo: pathlib.Path) -> None: ours_id = _write_obj(repo, b"ours") resolved, remaining = auto_apply( repo, ["f.py"], {"f.py": ours_id}, {}, "code", _FakePlugin() ) assert resolved == {} assert "f.py" in remaining def test_both_missing(self, repo: pathlib.Path) -> None: resolved, remaining = auto_apply( repo, ["f.py"], {}, {}, "code", _FakePlugin() ) assert resolved == {} assert "f.py" in remaining # =========================================================================== # 23. auto_apply — empty conflict list # =========================================================================== class TestAutoApplyEmptyList: def test_empty_conflict_list(self, repo: pathlib.Path) -> None: resolved, remaining = auto_apply(repo, [], {}, {}, "code", _FakePlugin()) assert resolved == {} assert remaining == [] def test_empty_list_no_patterns_recorded(self, repo: pathlib.Path) -> None: auto_apply(repo, [], {}, {}, "code", _FakePlugin()) assert list_patterns(repo) == [] # =========================================================================== # 24. auto_apply — semantic fingerprinting via HarmonyPlugin # =========================================================================== class TestAutoApplySemanticPlugin: """A HarmonyPlugin that collapses semantically equivalent conflicts. Scenario: Two conflicts that have different blob IDs but the plugin assigns them the same semantic fingerprint. Harmony should recognise the second conflict as a replay of the first. """ def test_semantic_plugin_enables_cross_content_replay( self, repo: pathlib.Path ) -> None: shared_semantic = _hex64("shared-semantic-fingerprint") plugin = _SemanticPlugin(shared_semantic) # First conflict: blob pair (A, B) ours_A = _write_obj(repo, b"variant-A-ours") theirs_A = _write_obj(repo, b"variant-A-theirs") resolution_A = _write_obj(repo, b"resolution-A") ours_m_A: Manifest = {"song.mid": ours_A} theirs_m_A: Manifest = {"song.mid": theirs_A} new_m_A: Manifest = {"song.mid": resolution_A} record_resolutions( repo, ["song.mid"], ours_m_A, theirs_m_A, new_m_A, "midi", plugin ) # Second conflict: blob pair (C, D) — different content but same semantic FP ours_C = _write_obj(repo, b"variant-C-ours") theirs_C = _write_obj(repo, b"variant-C-theirs") ours_m_C: Manifest = {"song.mid": ours_C} theirs_m_C: Manifest = {"song.mid": theirs_C} dest = repo / "song.mid" resolved, remaining = auto_apply( repo, ["song.mid"], ours_m_C, theirs_m_C, "midi", plugin ) assert "song.mid" in resolved, ( "Semantic plugin maps both conflicts to the same pattern_id; " "auto_apply must replay the saved resolution even though blob IDs differ" ) assert remaining == [] assert dest.read_bytes() == b"resolution-A" def test_different_semantic_fingerprints_no_cross_replay( self, repo: pathlib.Path ) -> None: """Two semantically distinct conflicts must NOT cross-replay.""" plugin_A = _SemanticPlugin(_hex64("semantic-A")) plugin_B = _SemanticPlugin(_hex64("semantic-B")) ours_id = _write_obj(repo, b"ours") theirs_id = _write_obj(repo, b"theirs") res_id = _write_obj(repo, b"res") ours_m: Manifest = {"f.py": ours_id} theirs_m: Manifest = {"f.py": theirs_id} new_m: Manifest = {"f.py": res_id} record_resolutions(repo, ["f.py"], ours_m, theirs_m, new_m, "code", plugin_A) # Same blobs, but plugin_B produces a different fingerprint resolved, remaining = auto_apply( repo, ["f.py"], ours_m, theirs_m, "code", plugin_B ) assert resolved == {} assert "f.py" in remaining # =========================================================================== # 25. MergeState — original_conflict_paths (Bug 3) # =========================================================================== class TestMergeStateOriginalConflictPaths: def test_write_sets_original_conflict_paths(self, repo: pathlib.Path) -> None: write_merge_state( repo, base_commit=NULL_LONG_ID, ours_commit=long_id("1" * 64), theirs_commit=long_id("2" * 64), conflict_paths=["config.py::MAX_CONNECTIONS", "utils.py::clamp"], ) state = read_merge_state(repo) assert state is not None assert state.original_conflict_paths == [ "config.py::MAX_CONNECTIONS", "utils.py::clamp", ] def test_original_paths_preserved_after_partial_resolution( self, repo: pathlib.Path ) -> None: write_merge_state( repo, base_commit=NULL_LONG_ID, ours_commit=long_id("1" * 64), theirs_commit=long_id("2" * 64), conflict_paths=["config.py::A", "config.py::B"], ) write_merge_state( repo, base_commit=NULL_LONG_ID, ours_commit=long_id("1" * 64), theirs_commit=long_id("2" * 64), conflict_paths=["config.py::B"], # A resolved ) state = read_merge_state(repo) assert state is not None assert state.conflict_paths == ["config.py::B"] assert state.original_conflict_paths == ["config.py::A", "config.py::B"] def test_original_paths_preserved_when_all_resolved( self, repo: pathlib.Path ) -> None: write_merge_state( repo, base_commit=NULL_LONG_ID, ours_commit=long_id("1" * 64), theirs_commit=long_id("2" * 64), conflict_paths=["config.py::MAX_CONNECTIONS"], ) write_merge_state( repo, base_commit=NULL_LONG_ID, ours_commit=long_id("1" * 64), theirs_commit=long_id("2" * 64), conflict_paths=[], ) state = read_merge_state(repo) assert state is not None assert state.conflict_paths == [] assert state.original_conflict_paths == ["config.py::MAX_CONNECTIONS"] def test_commit_pattern_uses_original_paths(self, repo: pathlib.Path) -> None: ours_id = _write_obj(repo, b"MAX_CONNECTIONS = 50") theirs_id = _write_obj(repo, b"MAX_CONNECTIONS = 25") resolution_id = _write_obj(repo, b"MAX_CONNECTIONS = 50") write_merge_state( repo, base_commit=NULL_LONG_ID, ours_commit=long_id("1" * 64), theirs_commit=long_id("2" * 64), conflict_paths=["config.py::MAX_CONNECTIONS"], ) write_merge_state( repo, base_commit=NULL_LONG_ID, ours_commit=long_id("1" * 64), theirs_commit=long_id("2" * 64), conflict_paths=[], ) state = read_merge_state(repo) assert state is not None paths_for_harmony = state.original_conflict_paths or state.conflict_paths saved = record_resolutions( repo, paths_for_harmony, {"config.py": ours_id}, {"config.py": theirs_id}, {"config.py": resolution_id}, "code", _FakePlugin(), ) assert saved == ["config.py::MAX_CONNECTIONS"] def test_three_path_stepwise_resolution(self, repo: pathlib.Path) -> None: """Simulate resolving three conflicts one-by-one via checkout --ours.""" original = ["a.py::X", "b.py::Y", "c.py::Z"] write_merge_state( repo, base_commit=NULL_LONG_ID, ours_commit=long_id("1" * 64), theirs_commit=long_id("2" * 64), conflict_paths=original, ) # Resolve X write_merge_state( repo, base_commit=NULL_LONG_ID, ours_commit=long_id("1" * 64), theirs_commit=long_id("2" * 64), conflict_paths=["b.py::Y", "c.py::Z"], ) # Resolve Y write_merge_state( repo, base_commit=NULL_LONG_ID, ours_commit=long_id("1" * 64), theirs_commit=long_id("2" * 64), conflict_paths=["c.py::Z"], ) # Resolve Z write_merge_state( repo, base_commit=NULL_LONG_ID, ours_commit=long_id("1" * 64), theirs_commit=long_id("2" * 64), conflict_paths=[], ) state = read_merge_state(repo) assert state is not None assert state.conflict_paths == [] assert state.original_conflict_paths == sorted(original) # =========================================================================== # 26. Full end-to-end integration # =========================================================================== class TestEndToEndIntegration: """High-level workflows that exercise the full harmony lifecycle.""" def test_full_exact_replay_cycle(self, repo: pathlib.Path) -> None: """Conflict → record → same conflict → auto-apply → applied_count == 1.""" ours_id = _write_obj(repo, b"MAX_CONNECTIONS = 10") theirs_id = _write_obj(repo, b"MAX_CONNECTIONS = 50") resolution_content = b"MAX_CONNECTIONS = 50" resolution_id = _write_obj(repo, resolution_content) # Step 1: First conflict. auto_apply has nothing → records pattern. ours_m: Manifest = {"config.py": ours_id} theirs_m: Manifest = {"config.py": theirs_id} resolved, remaining = auto_apply(repo, ["config.py"], ours_m, theirs_m, "code", _FakePlugin()) assert resolved == {} assert "config.py" in remaining assert len(list_patterns(repo)) == 1 # Step 2: User resolves and commits → record_resolutions learns the outcome. new_m: Manifest = {"config.py": resolution_id} saved = record_resolutions(repo, ["config.py"], ours_m, theirs_m, new_m, "code", _FakePlugin()) assert saved == ["config.py"] p = list_patterns(repo)[0] resolutions = list_resolutions(repo, p.pattern_id) assert len(resolutions) == 1 assert resolutions[0].human_verified is True # Step 3: Same conflict recurs → auto_apply replays. dest = repo / "config.py" resolved, remaining = auto_apply(repo, ["config.py"], ours_m, theirs_m, "code", _FakePlugin()) assert "config.py" in resolved assert remaining == [] assert dest.read_bytes() == resolution_content # Step 4: applied_count has been incremented. resolutions = list_resolutions(repo, p.pattern_id) assert resolutions[0].applied_count == 1 def test_symbol_path_full_cycle(self, repo: pathlib.Path) -> None: """Full cycle with symbol-level conflict path.""" ours_id = _write_obj(repo, b"TIMEOUT = 30") theirs_id = _write_obj(repo, b"TIMEOUT = 60") resolution_content = b"TIMEOUT = 45" resolution_id = _write_obj(repo, resolution_content) ours_m: Manifest = {"settings.py": ours_id} theirs_m: Manifest = {"settings.py": theirs_id} # First conflict — records pattern auto_apply(repo, ["settings.py::TIMEOUT"], ours_m, theirs_m, "code", _FakePlugin()) assert len(list_patterns(repo)) == 1 # Record resolution record_resolutions( repo, ["settings.py::TIMEOUT"], ours_m, theirs_m, {"settings.py": resolution_id}, "code", _FakePlugin() ) # Replay dest = repo / "settings.py" resolved, remaining = auto_apply( repo, ["settings.py::TIMEOUT"], ours_m, theirs_m, "code", _FakePlugin() ) assert "settings.py::TIMEOUT" in resolved assert remaining == [] assert dest.read_bytes() == resolution_content def test_gc_clears_stale_unlearned_patterns(self, repo: pathlib.Path) -> None: """Patterns auto_apply recorded (no resolution) are GC'd after threshold.""" ours_id = _write_obj(repo, b"v1") theirs_id = _write_obj(repo, b"v2") ours_m: Manifest = {"f.py": ours_id} theirs_m: Manifest = {"f.py": theirs_id} auto_apply(repo, ["f.py"], ours_m, theirs_m, "code", _FakePlugin()) assert len(list_patterns(repo)) == 1 # Backdate the recorded_at from dataclasses import replace as dc_replace p = list_patterns(repo)[0] old_p = dc_replace(p, recorded_at=_now() - datetime.timedelta(days=100)) forget_pattern(repo, p.pattern_id) record_pattern(repo, old_p) removed = gc_stale(repo, age_days=90) assert removed == 1 assert list_patterns(repo) == [] def test_policy_takes_precedence_in_match(self, repo: pathlib.Path) -> None: """Saved policy matches a pattern — match_policy returns the policy.""" policy = _make_policy( "prefer-ours-for-config", action=PolicyAction.PREFER_OURS, path_pattern="config.py", ) save_policy(repo, policy) ours_id = _write_obj(repo, b"ours") theirs_id = _write_obj(repo, b"theirs") blob_fp = blob_fingerprint(ours_id, theirs_id) pid = compute_pattern_id("config.py", blob_fp, blob_fp) pattern = ConflictPattern( pattern_id=pid, path="config.py", domain="code", conflict_type=ConflictType.CONTENT, blob_fingerprint=blob_fp, semantic_fingerprint=blob_fp, ours_id=ours_id, theirs_id=theirs_id, description={}, recorded_at=_now(), recorded_by="test", ) policies = list_policies(repo) matched = match_policy(policies, pattern) assert matched is not None assert matched.policy_id == "prefer-ours-for-config" assert matched.action == PolicyAction.PREFER_OURS def test_escalation_full_lifecycle(self, repo: pathlib.Path) -> None: """Open escalation → load → resolve → status becomes RESOLVED.""" p = _make_pattern(repo) record_pattern(repo, p) reason = "no policy and no prior resolution" esc_id = compute_escalation_id(p.pattern_id, reason) esc = EscalationRecord( escalation_id=esc_id, pattern_id=p.pattern_id, reason=reason, escalated_at=_now(), escalated_by=AgentProvenance.agent("claude-code"), status=EscalationStatus.OPEN, ) record_escalation(repo, esc) loaded = load_escalation(repo, esc_id) assert loaded is not None assert loaded.status == EscalationStatus.OPEN # Human resolves r = _make_resolution(p, b"manual-outcome", repo) save_resolution(repo, r) resolve_escalation(repo, esc_id, r.resolution_id, AgentProvenance.human(), _now()) loaded = load_escalation(repo, esc_id) assert loaded is not None assert loaded.status == EscalationStatus.RESOLVED assert loaded.resolution_id == r.resolution_id open_escs = list_escalations(repo, status=EscalationStatus.OPEN) assert len(open_escs) == 0 def test_clear_all_then_replay_learns_fresh(self, repo: pathlib.Path) -> None: """After clear_all, harmony starts fresh — no stale resolutions replayed.""" ours_id = _write_obj(repo, b"ours") theirs_id = _write_obj(repo, b"theirs") res_id = _write_obj(repo, b"res") ours_m: Manifest = {"f.py": ours_id} theirs_m: Manifest = {"f.py": theirs_id} new_m: Manifest = {"f.py": res_id} record_resolutions(repo, ["f.py"], ours_m, theirs_m, new_m, "code", _FakePlugin()) assert len(list_patterns(repo)) == 1 clear_all(repo) assert list_patterns(repo) == [] resolved, remaining = auto_apply(repo, ["f.py"], ours_m, theirs_m, "code", _FakePlugin()) assert resolved == {} assert "f.py" in remaining def test_multiple_resolutions_best_wins(self, repo: pathlib.Path) -> None: """When two resolutions exist, auto_apply picks the best.""" ours_id = _write_obj(repo, b"ours") theirs_id = _write_obj(repo, b"theirs") p = _make_pattern(repo, ours_content=b"ours", theirs_content=b"theirs") record_pattern(repo, p) # Unverified low-confidence resolution r_low = _make_resolution(p, b"low-quality-outcome", repo, human_verified=False, confidence=0.4) save_resolution(repo, r_low) # Human-verified high-confidence resolution high_content = b"high-quality-outcome" r_high = _make_resolution(p, high_content, repo, human_verified=True, confidence=1.0) save_resolution(repo, r_high) dest = repo / "config.py" resolved, remaining = auto_apply( repo, ["config.py"], {"config.py": ours_id}, {"config.py": theirs_id}, "code", _FakePlugin(), ) assert "config.py" in resolved assert dest.read_bytes() == high_content # =========================================================================== # 27. AgentProvenance serialization # =========================================================================== class TestAgentProvenance: def test_human_provenance(self) -> None: prov = AgentProvenance.human() assert prov.type == "human" assert prov.agent_id is None assert prov.model_id is None d = prov.to_dict() assert d["type"] == "human" def test_agent_provenance(self) -> None: prov = AgentProvenance.agent("claude-code", "claude-sonnet-4-6") assert prov.type == "agent" assert prov.agent_id == "claude-code" assert prov.model_id == "claude-sonnet-4-6" def test_round_trip(self) -> None: prov = AgentProvenance.agent("claude-code", "claude-sonnet-4-6") d = prov.to_dict() restored = AgentProvenance.from_dict(d) assert restored == prov def test_from_dict_missing_fields_defaults(self) -> None: restored = AgentProvenance.from_dict({}) assert restored.type == "human" assert restored.agent_id is None assert restored.model_id is None def test_agent_without_model(self) -> None: prov = AgentProvenance.agent("my-agent") assert prov.model_id is None d = prov.to_dict() restored = AgentProvenance.from_dict(d) assert restored.agent_id == "my-agent" assert restored.model_id is None # --------------------------------------------------------------------------- # Content-addressed ID format — all harmony IDs must carry sha256: prefix # --------------------------------------------------------------------------- class TestHarmonyIdFormat: """All compute_*_id functions must return sha256:-prefixed IDs (length 71).""" def _obj_id(self, seed: str) -> str: return blob_id(seed.encode()) # blob_fingerprint def test_blob_fingerprint_sha256_prefix(self) -> None: fp = blob_fingerprint(self._obj_id("a"), self._obj_id("b")) assert fp.startswith("sha256:"), f"expected sha256: prefix, got {fp!r}" assert len(fp) == 71 def test_blob_fingerprint_commutative(self) -> None: a, b = self._obj_id("x"), self._obj_id("y") assert blob_fingerprint(a, b) == blob_fingerprint(b, a) def test_blob_fingerprint_deterministic(self) -> None: a, b = self._obj_id("p"), self._obj_id("q") assert blob_fingerprint(a, b) == blob_fingerprint(a, b) def test_blob_fingerprint_differs_by_inputs(self) -> None: a, b, c = self._obj_id("a"), self._obj_id("b"), self._obj_id("c") assert blob_fingerprint(a, b) != blob_fingerprint(a, c) # compute_pattern_id def test_compute_pattern_id_sha256_prefix(self, tmp_path: pathlib.Path) -> None: fp = blob_fingerprint(self._obj_id("a"), self._obj_id("b")) pid = compute_pattern_id("file.py", fp, fp) assert pid.startswith("sha256:") assert len(pid) == 71 def test_compute_pattern_id_deterministic(self) -> None: fp = blob_fingerprint(self._obj_id("a"), self._obj_id("b")) assert compute_pattern_id("f.py", fp, fp) == compute_pattern_id("f.py", fp, fp) def test_compute_pattern_id_differs_by_path(self) -> None: fp = blob_fingerprint(self._obj_id("a"), self._obj_id("b")) assert compute_pattern_id("a.py", fp, fp) != compute_pattern_id("b.py", fp, fp) def test_compute_pattern_id_semantic_differs_from_blob(self) -> None: blob_fp = blob_fingerprint(self._obj_id("a"), self._obj_id("b")) sem_fp = blob_fingerprint(self._obj_id("c"), self._obj_id("d")) pid_blob = compute_pattern_id("f.py", blob_fp, blob_fp) pid_sem = compute_pattern_id("f.py", blob_fp, sem_fp) assert pid_blob != pid_sem # compute_resolution_id def test_compute_resolution_id_sha256_prefix(self) -> None: fp = blob_fingerprint(self._obj_id("a"), self._obj_id("b")) pid = compute_pattern_id("f.py", fp, fp) rid = compute_resolution_id( pid, self._obj_id("resolved"), "ours", AgentProvenance.human(), datetime.datetime(2025, 1, 1, tzinfo=datetime.timezone.utc), ) assert rid.startswith("sha256:") assert len(rid) == 71 def test_compute_resolution_id_deterministic(self) -> None: fp = blob_fingerprint(self._obj_id("a"), self._obj_id("b")) pid = compute_pattern_id("f.py", fp, fp) ts = datetime.datetime(2025, 1, 1, tzinfo=datetime.timezone.utc) r1 = compute_resolution_id(pid, self._obj_id("r"), "ours", AgentProvenance.human(), ts) r2 = compute_resolution_id(pid, self._obj_id("r"), "ours", AgentProvenance.human(), ts) assert r1 == r2 def test_compute_resolution_id_differs_by_outcome(self) -> None: fp = blob_fingerprint(self._obj_id("a"), self._obj_id("b")) pid = compute_pattern_id("f.py", fp, fp) ts = datetime.datetime(2025, 1, 1, tzinfo=datetime.timezone.utc) r1 = compute_resolution_id(pid, self._obj_id("r1"), "ours", AgentProvenance.human(), ts) r2 = compute_resolution_id(pid, self._obj_id("r2"), "ours", AgentProvenance.human(), ts) assert r1 != r2 # compute_escalation_id def test_compute_escalation_id_sha256_prefix(self) -> None: fp = blob_fingerprint(self._obj_id("a"), self._obj_id("b")) pid = compute_pattern_id("f.py", fp, fp) eid = compute_escalation_id(pid, "no matching policy") assert eid.startswith("sha256:") assert len(eid) == 71 def test_compute_escalation_id_deterministic(self) -> None: fp = blob_fingerprint(self._obj_id("a"), self._obj_id("b")) pid = compute_pattern_id("f.py", fp, fp) e1 = compute_escalation_id(pid, "reason") e2 = compute_escalation_id(pid, "reason") assert e1 == e2 def test_compute_escalation_id_differs_by_reason(self) -> None: fp = blob_fingerprint(self._obj_id("a"), self._obj_id("b")) pid = compute_pattern_id("f.py", fp, fp) e1 = compute_escalation_id(pid, "reason A") e2 = compute_escalation_id(pid, "reason B") assert e1 != e2 # audit_id def test_append_audit_id_sha256_prefix(self, tmp_path: pathlib.Path) -> None: fp = blob_fingerprint(self._obj_id("a"), self._obj_id("b")) pid = compute_pattern_id("f.py", fp, fp) append_audit( tmp_path, AuditEventType.PATTERN_RECORDED, AgentProvenance.human(), pattern_id=pid, ) entries = list_audit(tmp_path, limit=1) assert len(entries) == 1 assert entries[0]["audit_id"].startswith("sha256:") assert len(entries[0]["audit_id"]) == 71