gabriel / muse public
test_harmony_comprehensive.py python
2,424 lines 92.4 KB
Raw
sha256:2eaa5d95f9d9383498e76947410a26e5a3ba23d182f339910c424cf88fad412b fix: try fetch/presign before fetch/mpack to avoid Cloudfla… Sonnet 4.6 patch 7 days ago
1 """Comprehensive TDD suite for muse.core.harmony.
2
3 Every public API surface is exercised: fingerprinting, pattern CRUD,
4 resolution CRUD, best_resolution ranking, GC, audit, policy management,
5 policy matching, escalation lifecycle, auto_apply exact-replay,
6 auto_apply semantic matching via HarmonyPlugin, record_resolutions,
7 MergeState original_conflict_paths, path-traversal guards, and full
8 end-to-end integration flows.
9
10 Organisation
11 ------------
12 Each TestXxx class covers one coherent API surface. Within each class,
13 tests are ordered: happy path → edge cases → error/adversarial cases.
14 """
15 from __future__ import annotations
16
17 import datetime
18 import pathlib
19
20 import pytest
21
22 import muse.core.harmony as h
23 from muse.core.harmony import (
24 AgentProvenance,
25 AuditEventType,
26 ConflictPattern,
27 ConflictType,
28 EscalationRecord,
29 EscalationStatus,
30 Policy,
31 PolicyAction,
32 PolicyCondition,
33 PolicyScope,
34 Resolution,
35 ResolutionStrategy,
36 append_audit,
37 auto_apply,
38 best_resolution,
39 blob_fingerprint,
40 clear_all,
41 compute_escalation_id,
42 compute_pattern_id,
43 compute_resolution_id,
44 compute_semantic_fingerprint,
45 forget_pattern,
46 gc_stale,
47 increment_applied_count,
48 list_audit,
49 list_escalations,
50 list_patterns,
51 list_policies,
52 list_resolutions,
53 load_escalation,
54 load_pattern,
55 load_policy,
56 load_resolution,
57 match_policy,
58 record_escalation,
59 record_pattern,
60 record_resolutions,
61 remove_policy,
62 resolve_escalation,
63 save_policy,
64 save_resolution,
65 )
66 from muse.core.merge_engine import read_merge_state, write_merge_state
67 from muse.core.object_store import write_object
68 from muse.core.types import Manifest, MsgpackDict, NULL_LONG_ID, blob_id, long_id
69 from muse.domain import HarmonyPlugin, LiveState, StateDelta, StateSnapshot
70 from muse.core.paths import muse_dir
71
72
73 # ---------------------------------------------------------------------------
74 # Shared helpers
75 # ---------------------------------------------------------------------------
76
77
78 def _hex64(seed: str) -> str:
79 """Return a valid sha256: content-addressed ID derived from seed.
80
81 Used as a cheap deterministic fingerprint in tests — the seed uniquely
82 determines the ID, so tests can express 'same fingerprint' vs 'different
83 fingerprint' without computing actual content hashes.
84 """
85 return blob_id(seed.encode())
86
87
88 def _write_obj(root: pathlib.Path, content: bytes) -> str:
89 oid = blob_id(content)
90 write_object(root, oid, content)
91 return oid
92
93
94 def _now() -> datetime.datetime:
95 return datetime.datetime.now(datetime.timezone.utc)
96
97
98 def _make_pattern(
99 root: pathlib.Path,
100 *,
101 path: str = "config.py",
102 ours_content: bytes = b"a",
103 theirs_content: bytes = b"b",
104 domain: str = "code",
105 ) -> ConflictPattern:
106 ours_id = _write_obj(root, ours_content)
107 theirs_id = _write_obj(root, theirs_content)
108 blob_fp = blob_fingerprint(ours_id, theirs_id)
109 pattern_id = compute_pattern_id(path, blob_fp, blob_fp)
110 return ConflictPattern(
111 pattern_id=pattern_id,
112 path=path,
113 domain=domain,
114 conflict_type=ConflictType.CONTENT,
115 blob_fingerprint=blob_fp,
116 semantic_fingerprint=blob_fp,
117 ours_id=ours_id,
118 theirs_id=theirs_id,
119 description={},
120 recorded_at=_now(),
121 recorded_by="test",
122 )
123
124
125 def _make_resolution(
126 pattern: ConflictPattern,
127 outcome_content: bytes,
128 root: pathlib.Path,
129 *,
130 confidence: float = 1.0,
131 human_verified: bool = True,
132 applied_count: int = 0,
133 strategy: str = ResolutionStrategy.MANUAL,
134 ) -> Resolution:
135 outcome_blob = _write_obj(root, outcome_content)
136 now = _now()
137 rid = compute_resolution_id(
138 pattern.pattern_id, outcome_blob, strategy, AgentProvenance.human(), now
139 )
140 return Resolution(
141 resolution_id=rid,
142 pattern_id=pattern.pattern_id,
143 strategy=strategy,
144 policy_id=None,
145 outcome_blob=outcome_blob,
146 resolved_by=AgentProvenance.human(),
147 human_verified=human_verified,
148 confidence=confidence,
149 rationale="test",
150 resolved_at=now,
151 applied_count=applied_count,
152 )
153
154
155 def _make_policy(
156 policy_id: str = "test-policy",
157 *,
158 action: str = PolicyAction.PREFER_OURS,
159 scope: str = PolicyScope.REPO,
160 conflict_type: str | None = None,
161 domain: str | None = None,
162 path_pattern: str | None = None,
163 confidence: float = 0.9,
164 ) -> Policy:
165 return Policy(
166 policy_id=policy_id,
167 description="test",
168 when=PolicyCondition(
169 conflict_type=conflict_type,
170 domain=domain,
171 path_pattern=path_pattern,
172 ),
173 action=action,
174 confidence=confidence,
175 escalate_to=None,
176 delegate_to=None,
177 scope=scope,
178 created_at=_now(),
179 created_by="test",
180 )
181
182
183 type _StubResult = MsgpackDict
184
185
186 class _FakePlugin:
187 """Minimal MuseDomainPlugin — no HarmonyPlugin sub-protocol."""
188 name = "test"
189 def schema(self) -> _StubResult: return {}
190
191
192 def _noop(*args: str, **kwargs: str) -> _StubResult:
193 """Stub for unused MuseDomainPlugin methods in test doubles."""
194 return {}
195
196
197 class _SemanticPlugin:
198 """HarmonyPlugin that returns a fixed semantic fingerprint.
199
200 Implements all MuseDomainPlugin required methods so that
201 isinstance(plugin, HarmonyPlugin) returns True at runtime.
202 """
203 name = "semantic"
204
205 def __init__(self, fixed_fp: str) -> None:
206 self._fp = fixed_fp
207
208 def schema(self) -> _StubResult: return {}
209 def snapshot(self, live_state: LiveState) -> StateSnapshot: return _noop() # type: ignore[return-value]
210 def diff(self, base: StateSnapshot, target: StateSnapshot, *, repo_root: pathlib.Path | None = None) -> _StubResult: return _noop()
211 def merge(self, base: StateSnapshot, left: StateSnapshot, right: StateSnapshot, *, repo_root: pathlib.Path | None = None) -> _StubResult: return _noop()
212 def apply(self, delta: StateDelta, live_state: LiveState) -> LiveState: return _noop() # type: ignore[return-value]
213 def drift(self, *args: str, **kwargs: str) -> _StubResult: return _noop()
214
215 def conflict_fingerprint(
216 self,
217 path: str,
218 ours_id: str,
219 theirs_id: str,
220 repo_root: pathlib.Path,
221 ) -> str:
222 return self._fp
223
224
225 class _ThrowingPlugin:
226 """HarmonyPlugin that always raises during conflict_fingerprint."""
227 name = "throwing"
228
229 def schema(self) -> _StubResult: return {}
230 def snapshot(self, live_state: LiveState) -> StateSnapshot: return _noop() # type: ignore[return-value]
231 def diff(self, base: StateSnapshot, target: StateSnapshot, *, repo_root: pathlib.Path | None = None) -> _StubResult: return _noop()
232 def merge(self, base: StateSnapshot, left: StateSnapshot, right: StateSnapshot, *, repo_root: pathlib.Path | None = None) -> _StubResult: return _noop()
233 def apply(self, delta: StateDelta, live_state: LiveState) -> LiveState: return _noop() # type: ignore[return-value]
234 def drift(self, *args: str, **kwargs: str) -> _StubResult: return _noop()
235
236 def conflict_fingerprint(self, path: str, ours_id: str, theirs_id: str, repo_root: pathlib.Path) -> str:
237 raise RuntimeError("deliberate error")
238
239
240 class _BadLengthPlugin:
241 """HarmonyPlugin that returns a fingerprint of the wrong length."""
242 name = "bad"
243
244 def schema(self) -> _StubResult: return {}
245 def snapshot(self, live_state: LiveState) -> StateSnapshot: return _noop() # type: ignore[return-value]
246 def diff(self, base: StateSnapshot, target: StateSnapshot, *, repo_root: pathlib.Path | None = None) -> _StubResult: return _noop()
247 def merge(self, base: StateSnapshot, left: StateSnapshot, right: StateSnapshot, *, repo_root: pathlib.Path | None = None) -> _StubResult: return _noop()
248 def apply(self, delta: StateDelta, live_state: LiveState) -> LiveState: return _noop() # type: ignore[return-value]
249 def drift(self, *args: str, **kwargs: str) -> _StubResult: return _noop()
250
251 def conflict_fingerprint(self, path: str, ours_id: str, theirs_id: str, repo_root: pathlib.Path) -> str:
252 return "tooshort"
253
254
255 @pytest.fixture()
256 def repo(tmp_path: pathlib.Path) -> pathlib.Path:
257 muse_dir(tmp_path).mkdir()
258 return tmp_path
259
260
261 # ===========================================================================
262 # 1. blob_fingerprint
263 # ===========================================================================
264
265
266 class TestBlobFingerprint:
267 def test_returns_sha256_id(self) -> None:
268 ours = blob_id(b"x")
269 theirs = blob_id(b"y")
270 fp = blob_fingerprint(ours, theirs)
271 assert fp.startswith("sha256:")
272 assert len(fp) == 71
273
274 def test_commutative(self) -> None:
275 a = blob_id(b"alpha")
276 b = blob_id(b"beta")
277 assert blob_fingerprint(a, b) == blob_fingerprint(b, a)
278
279 def test_stable(self) -> None:
280 a = blob_id(b"stable")
281 b = blob_id(b"input")
282 assert blob_fingerprint(a, b) == blob_fingerprint(a, b)
283
284 def test_different_pairs_produce_different_fingerprints(self) -> None:
285 a, b, c = blob_id(b"a"), blob_id(b"b"), blob_id(b"c")
286 assert blob_fingerprint(a, b) != blob_fingerprint(a, c)
287 assert blob_fingerprint(a, b) != blob_fingerprint(b, c)
288
289 def test_same_id_both_sides(self) -> None:
290 # Degenerate case: ours == theirs (no actual conflict)
291 a = blob_id(b"same")
292 fp = blob_fingerprint(a, a)
293 assert len(fp) == 71 and fp.startswith("sha256:") # still produces valid output
294
295
296 # ===========================================================================
297 # 2. compute_pattern_id
298 # ===========================================================================
299
300
301 class TestComputePatternId:
302 def test_returns_sha256_id(self) -> None:
303 fp = blob_fingerprint(blob_id(b"a"), blob_id(b"b"))
304 pid = compute_pattern_id("f.py", fp, fp)
305 assert pid.startswith("sha256:")
306 assert len(pid) == 71
307
308 def test_stable(self) -> None:
309 assert (
310 compute_pattern_id("f.py", _hex64("a"), _hex64("b"))
311 == compute_pattern_id("f.py", _hex64("a"), _hex64("b"))
312 )
313
314 def test_path_sensitive(self) -> None:
315 blob = _hex64("fp")
316 sem = _hex64("sfp")
317 assert compute_pattern_id("a.py", blob, sem) != compute_pattern_id("b.py", blob, sem)
318
319 def test_blob_fp_sensitive_when_no_semantic_plugin(self) -> None:
320 """When blob_fp == semantic_fp (no plugin), blob_fp determines identity."""
321 # Same blob_fp used for both slots (exact-replay mode)
322 fp1 = _hex64("fp1")
323 fp2 = _hex64("fp2")
324 # When blob_fp == semantic_fp, the formula is f"{path}:{blob_fp}:{semantic_fp}"
325 assert compute_pattern_id("f.py", fp1, fp1) != compute_pattern_id("f.py", fp2, fp2)
326
327 def test_blob_fp_irrelevant_when_semantic_fp_differs(self) -> None:
328 """When semantic_fp ≠ blob_fp, pattern_id depends only on semantic_fp and path.
329
330 This is the semantic plugin model: two conflicts with different blob IDs
331 but the same semantic fingerprint map to the same pattern, enabling
332 cross-content replay.
333 """
334 sem = _hex64("sfp")
335 # Two different blob_fps with the same semantic_fp → same pattern_id
336 pid1 = compute_pattern_id("f.py", _hex64("fp1"), sem)
337 pid2 = compute_pattern_id("f.py", _hex64("fp2"), sem)
338 assert pid1 == pid2, (
339 "When semantic_fp != blob_fp, pattern_id must depend only on "
340 "semantic_fp — different blob pairs with the same semantic shape "
341 "must share a pattern to enable cross-content replay"
342 )
343
344 def test_semantic_fp_sensitive(self) -> None:
345 blob = _hex64("bfp")
346 assert (
347 compute_pattern_id("f.py", blob, _hex64("s1"))
348 != compute_pattern_id("f.py", blob, _hex64("s2"))
349 )
350
351 def test_symbol_path_distinct_from_file_path(self) -> None:
352 blob = _hex64("fp")
353 sem = _hex64("sfp")
354 assert (
355 compute_pattern_id("config.py", blob, sem)
356 != compute_pattern_id("config.py::MAX_CONNECTIONS", blob, sem)
357 )
358
359
360 # ===========================================================================
361 # 3. compute_semantic_fingerprint
362 # ===========================================================================
363
364
365 class TestComputeSemanticFingerprint:
366 def test_no_plugin_returns_blob_fingerprint(self, repo: pathlib.Path) -> None:
367 ours = blob_id(b"x")
368 theirs = blob_id(b"y")
369 result = compute_semantic_fingerprint("f.py", ours, theirs, _FakePlugin(), repo)
370 assert result == blob_fingerprint(ours, theirs)
371
372 def test_commutative_no_plugin(self, repo: pathlib.Path) -> None:
373 ours = blob_id(b"x")
374 theirs = blob_id(b"y")
375 r1 = compute_semantic_fingerprint("f.py", ours, theirs, _FakePlugin(), repo)
376 r2 = compute_semantic_fingerprint("f.py", theirs, ours, _FakePlugin(), repo)
377 assert r1 == r2
378
379 def test_harmony_plugin_used_when_available(self, repo: pathlib.Path) -> None:
380 fixed = _hex64("fixed-semantic")
381 plugin = _SemanticPlugin(fixed)
382 ours = blob_id(b"x")
383 theirs = blob_id(b"y")
384 result = compute_semantic_fingerprint("f.py", ours, theirs, plugin, repo)
385 assert result == fixed
386
387 def test_throwing_plugin_falls_back_to_blob(self, repo: pathlib.Path) -> None:
388 ours = blob_id(b"x")
389 theirs = blob_id(b"y")
390 result = compute_semantic_fingerprint(
391 "f.py", ours, theirs, _ThrowingPlugin(), repo
392 )
393 assert result == blob_fingerprint(ours, theirs)
394
395 def test_bad_length_plugin_falls_back_to_blob(self, repo: pathlib.Path) -> None:
396 ours = blob_id(b"x")
397 theirs = blob_id(b"y")
398 result = compute_semantic_fingerprint(
399 "f.py", ours, theirs, _BadLengthPlugin(), repo
400 )
401 assert result == blob_fingerprint(ours, theirs)
402
403 def test_path_does_not_affect_blob_fallback(self, repo: pathlib.Path) -> None:
404 ours = blob_id(b"x")
405 theirs = blob_id(b"y")
406 r1 = compute_semantic_fingerprint("a.py", ours, theirs, _FakePlugin(), repo)
407 r2 = compute_semantic_fingerprint("b.py", ours, theirs, _FakePlugin(), repo)
408 # blob_fingerprint is path-independent
409 assert r1 == r2
410
411
412 # ===========================================================================
413 # 4. record_pattern / load_pattern
414 # ===========================================================================
415
416
417 class TestRecordPatternAndLoad:
418 def test_record_then_load(self, repo: pathlib.Path) -> None:
419 p = _make_pattern(repo)
420 record_pattern(repo, p)
421 loaded = load_pattern(repo, p.pattern_id)
422 assert loaded is not None
423 assert loaded.pattern_id == p.pattern_id
424 assert loaded.path == p.path
425 assert loaded.domain == p.domain
426
427 def test_record_idempotent(self, repo: pathlib.Path) -> None:
428 p = _make_pattern(repo)
429 record_pattern(repo, p)
430 record_pattern(repo, p) # second call must be a no-op
431 assert len(list_patterns(repo)) == 1
432
433 def test_load_missing_returns_none(self, repo: pathlib.Path) -> None:
434 assert load_pattern(repo, _hex64("missing")) is None
435
436 def test_load_invalid_id_returns_none(self, repo: pathlib.Path) -> None:
437 assert load_pattern(repo, "not-hex-64") is None
438
439 def test_record_invalid_id_raises(self, repo: pathlib.Path) -> None:
440 p = _make_pattern(repo)
441 # Replace the pattern_id with an invalid value via dataclass replace
442 from dataclasses import replace as dc_replace
443 bad_p = dc_replace(p, pattern_id="short")
444 with pytest.raises(ValueError):
445 record_pattern(repo, bad_p)
446
447 def test_all_fields_round_trip(self, repo: pathlib.Path) -> None:
448 p = _make_pattern(repo, path="src/api.py::handle_request", domain="code")
449 record_pattern(repo, p)
450 loaded = load_pattern(repo, p.pattern_id)
451 assert loaded is not None
452 assert loaded.path == "src/api.py::handle_request"
453 assert loaded.conflict_type == ConflictType.CONTENT
454 assert loaded.blob_fingerprint == p.blob_fingerprint
455
456
457 # ===========================================================================
458 # 5. list_patterns
459 # ===========================================================================
460
461
462 class TestListPatterns:
463 def test_empty_store(self, repo: pathlib.Path) -> None:
464 assert list_patterns(repo) == []
465
466 def test_one_pattern(self, repo: pathlib.Path) -> None:
467 p = _make_pattern(repo)
468 record_pattern(repo, p)
469 patterns = list_patterns(repo)
470 assert len(patterns) == 1
471 assert patterns[0].pattern_id == p.pattern_id
472
473 def test_multiple_patterns(self, repo: pathlib.Path) -> None:
474 for i in range(5):
475 p = _make_pattern(repo, ours_content=bytes([i]), theirs_content=bytes([i + 10]))
476 record_pattern(repo, p)
477 assert len(list_patterns(repo)) == 5
478
479 def test_distinct_paths_produce_distinct_patterns(self, repo: pathlib.Path) -> None:
480 ours = _write_obj(repo, b"ours")
481 theirs = _write_obj(repo, b"theirs")
482 paths = ["a.py::foo", "a.py::bar", "b.py::baz"]
483 for path in paths:
484 blob_fp = blob_fingerprint(ours, theirs)
485 pid = compute_pattern_id(path, blob_fp, blob_fp)
486 from dataclasses import replace as dc_replace
487 p = dc_replace(
488 _make_pattern(repo),
489 pattern_id=pid,
490 path=path,
491 ours_id=ours,
492 theirs_id=theirs,
493 blob_fingerprint=blob_fp,
494 semantic_fingerprint=blob_fp,
495 )
496 record_pattern(repo, p)
497 result = list_patterns(repo)
498 assert len(result) == 3
499 stored_paths = {r.path for r in result}
500 assert stored_paths == set(paths)
501
502
503 # ===========================================================================
504 # 6. forget_pattern
505 # ===========================================================================
506
507
508 class TestForgetPattern:
509 def test_forget_existing(self, repo: pathlib.Path) -> None:
510 p = _make_pattern(repo)
511 record_pattern(repo, p)
512 assert forget_pattern(repo, p.pattern_id) is True
513 assert load_pattern(repo, p.pattern_id) is None
514
515 def test_forget_missing_returns_false(self, repo: pathlib.Path) -> None:
516 assert forget_pattern(repo, _hex64("gone")) is False
517
518 def test_forget_also_removes_resolutions(self, repo: pathlib.Path) -> None:
519 p = _make_pattern(repo)
520 record_pattern(repo, p)
521 r = _make_resolution(p, b"outcome", repo)
522 save_resolution(repo, r)
523 forget_pattern(repo, p.pattern_id)
524 assert list_resolutions(repo, p.pattern_id) == []
525
526 def test_invalid_id_returns_false(self, repo: pathlib.Path) -> None:
527 assert forget_pattern(repo, "bad-id") is False
528
529
530 # ===========================================================================
531 # 7. clear_all
532 # ===========================================================================
533
534
535 class TestClearAll:
536 def test_empty_store(self, repo: pathlib.Path) -> None:
537 assert clear_all(repo) == 0
538
539 def test_clears_all_patterns(self, repo: pathlib.Path) -> None:
540 for i in range(3):
541 p = _make_pattern(repo, ours_content=bytes([i]), theirs_content=bytes([i + 20]))
542 record_pattern(repo, p)
543 assert clear_all(repo) == 3
544 assert list_patterns(repo) == []
545
546 def test_clears_patterns_with_resolutions(self, repo: pathlib.Path) -> None:
547 p = _make_pattern(repo)
548 record_pattern(repo, p)
549 r = _make_resolution(p, b"out", repo)
550 save_resolution(repo, r)
551 clear_all(repo)
552 assert list_patterns(repo) == []
553 assert list_resolutions(repo, p.pattern_id) == []
554
555
556 # ===========================================================================
557 # 8. save_resolution / load_resolution
558 # ===========================================================================
559
560
561 class TestSaveAndLoadResolution:
562 def test_save_then_load(self, repo: pathlib.Path) -> None:
563 p = _make_pattern(repo)
564 record_pattern(repo, p)
565 r = _make_resolution(p, b"resolved content", repo)
566 save_resolution(repo, r)
567 loaded = load_resolution(repo, p.pattern_id, r.resolution_id)
568 assert loaded is not None
569 assert loaded.resolution_id == r.resolution_id
570 assert loaded.outcome_blob == r.outcome_blob
571 assert loaded.human_verified is True
572 assert loaded.confidence == 1.0
573
574 def test_load_missing_returns_none(self, repo: pathlib.Path) -> None:
575 p = _make_pattern(repo)
576 record_pattern(repo, p)
577 assert load_resolution(repo, p.pattern_id, _hex64("gone")) is None
578
579 def test_save_is_idempotent(self, repo: pathlib.Path) -> None:
580 """save_resolution is explicitly idempotent — second call is a no-op."""
581 p = _make_pattern(repo)
582 record_pattern(repo, p)
583 r = _make_resolution(p, b"v1", repo)
584 save_resolution(repo, r)
585 # Second call with same resolution_id must not raise and must not overwrite
586 from dataclasses import replace as dc_replace
587 r2 = dc_replace(r, applied_count=99, confidence=0.01)
588 save_resolution(repo, r2) # must be a no-op
589 loaded = load_resolution(repo, p.pattern_id, r.resolution_id)
590 assert loaded is not None
591 assert loaded.applied_count == 0 # original value preserved
592 assert loaded.confidence == 1.0 # original value preserved
593
594 def test_save_requires_existing_pattern(self, repo: pathlib.Path) -> None:
595 """save_resolution raises FileNotFoundError when pattern doesn't exist."""
596 fake_pattern_id = _hex64("nonexistent-pattern")
597 outcome = _write_obj(repo, b"outcome")
598 now = _now()
599 prov = AgentProvenance.human()
600 rid = compute_resolution_id(fake_pattern_id, outcome, ResolutionStrategy.MANUAL, prov, now)
601 r = Resolution(
602 resolution_id=rid,
603 pattern_id=fake_pattern_id,
604 strategy=ResolutionStrategy.MANUAL,
605 policy_id=None,
606 outcome_blob=outcome,
607 resolved_by=prov,
608 human_verified=False,
609 confidence=0.5,
610 rationale="orphan",
611 resolved_at=now,
612 applied_count=0,
613 )
614 with pytest.raises(FileNotFoundError):
615 save_resolution(repo, r)
616
617 def test_agent_provenance_round_trip(self, repo: pathlib.Path) -> None:
618 p = _make_pattern(repo)
619 record_pattern(repo, p)
620 outcome = _write_obj(repo, b"agent-resolved")
621 now = _now()
622 prov = AgentProvenance.agent("claude-code", "claude-sonnet-4-6")
623 rid = compute_resolution_id(
624 p.pattern_id, outcome, ResolutionStrategy.EXACT_REPLAY, prov, now
625 )
626 r = Resolution(
627 resolution_id=rid,
628 pattern_id=p.pattern_id,
629 strategy=ResolutionStrategy.EXACT_REPLAY,
630 policy_id=None,
631 outcome_blob=outcome,
632 resolved_by=prov,
633 human_verified=False,
634 confidence=0.95,
635 rationale="agent resolved",
636 resolved_at=now,
637 applied_count=0,
638 )
639 save_resolution(repo, r)
640 loaded = load_resolution(repo, p.pattern_id, rid)
641 assert loaded is not None
642 assert loaded.resolved_by.type == "agent"
643 assert loaded.resolved_by.agent_id == "claude-code"
644 assert loaded.resolved_by.model_id == "claude-sonnet-4-6"
645 assert loaded.confidence == 0.95
646 assert loaded.human_verified is False
647
648
649 # ===========================================================================
650 # 9. list_resolutions + sorting
651 # ===========================================================================
652
653
654 class TestListResolutions:
655 def test_empty(self, repo: pathlib.Path) -> None:
656 p = _make_pattern(repo)
657 record_pattern(repo, p)
658 assert list_resolutions(repo, p.pattern_id) == []
659
660 def test_single(self, repo: pathlib.Path) -> None:
661 p = _make_pattern(repo)
662 record_pattern(repo, p)
663 r = _make_resolution(p, b"out", repo)
664 save_resolution(repo, r)
665 result = list_resolutions(repo, p.pattern_id)
666 assert len(result) == 1
667 assert result[0].resolution_id == r.resolution_id
668
669 def test_sorted_human_verified_first(self, repo: pathlib.Path) -> None:
670 """human_verified=True must sort before human_verified=False."""
671 p = _make_pattern(repo)
672 record_pattern(repo, p)
673
674 unverified = _make_resolution(p, b"unverified", repo, human_verified=False, confidence=1.0)
675 verified = _make_resolution(p, b"verified", repo, human_verified=True, confidence=0.5)
676 save_resolution(repo, unverified)
677 save_resolution(repo, verified)
678
679 result = list_resolutions(repo, p.pattern_id)
680 assert result[0].human_verified is True
681
682 def test_sorted_confidence_descending(self, repo: pathlib.Path) -> None:
683 p = _make_pattern(repo)
684 record_pattern(repo, p)
685
686 low = _make_resolution(p, b"low", repo, confidence=0.3, human_verified=False)
687 high = _make_resolution(p, b"high", repo, confidence=0.9, human_verified=False)
688 save_resolution(repo, low)
689 save_resolution(repo, high)
690
691 result = list_resolutions(repo, p.pattern_id)
692 assert result[0].confidence == 0.9
693
694 def test_sorted_applied_count_descending_as_tiebreaker(
695 self, repo: pathlib.Path
696 ) -> None:
697 p = _make_pattern(repo)
698 record_pattern(repo, p)
699
700 rarely = _make_resolution(p, b"rarely", repo, confidence=0.8, applied_count=1)
701 often = _make_resolution(p, b"often", repo, confidence=0.8, applied_count=10)
702 save_resolution(repo, rarely)
703 save_resolution(repo, often)
704
705 result = list_resolutions(repo, p.pattern_id)
706 assert result[0].applied_count == 10
707
708 def test_invalid_pattern_id_returns_empty(self, repo: pathlib.Path) -> None:
709 assert list_resolutions(repo, "bad-id") == []
710
711
712 # ===========================================================================
713 # 10. best_resolution
714 # ===========================================================================
715
716
717 class TestBestResolution:
718 def test_returns_none_when_no_resolutions(self, repo: pathlib.Path) -> None:
719 p = _make_pattern(repo)
720 record_pattern(repo, p)
721 assert best_resolution(repo, p.pattern_id) is None
722
723 def test_returns_only_resolution(self, repo: pathlib.Path) -> None:
724 p = _make_pattern(repo)
725 record_pattern(repo, p)
726 r = _make_resolution(p, b"out", repo)
727 save_resolution(repo, r)
728 best = best_resolution(repo, p.pattern_id)
729 assert best is not None
730 assert best.resolution_id == r.resolution_id
731
732 def test_prefers_human_verified_over_high_confidence(
733 self, repo: pathlib.Path
734 ) -> None:
735 p = _make_pattern(repo)
736 record_pattern(repo, p)
737
738 agent_high = _make_resolution(
739 p, b"agent", repo, human_verified=False, confidence=0.99
740 )
741 human_low = _make_resolution(
742 p, b"human", repo, human_verified=True, confidence=0.5
743 )
744 save_resolution(repo, agent_high)
745 save_resolution(repo, human_low)
746
747 best = best_resolution(repo, p.pattern_id)
748 assert best is not None
749 assert best.human_verified is True
750
751 def test_prefers_higher_confidence(self, repo: pathlib.Path) -> None:
752 p = _make_pattern(repo)
753 record_pattern(repo, p)
754
755 low = _make_resolution(p, b"low", repo, confidence=0.4, human_verified=False)
756 high = _make_resolution(p, b"high", repo, confidence=0.8, human_verified=False)
757 save_resolution(repo, low)
758 save_resolution(repo, high)
759
760 best = best_resolution(repo, p.pattern_id)
761 assert best is not None
762 assert best.confidence == 0.8
763
764 def test_prefers_higher_applied_count_as_tiebreaker(
765 self, repo: pathlib.Path
766 ) -> None:
767 p = _make_pattern(repo)
768 record_pattern(repo, p)
769
770 rare = _make_resolution(p, b"rare", repo, applied_count=2, confidence=0.9)
771 freq = _make_resolution(p, b"freq", repo, applied_count=20, confidence=0.9)
772 save_resolution(repo, rare)
773 save_resolution(repo, freq)
774
775 best = best_resolution(repo, p.pattern_id)
776 assert best is not None
777 assert best.applied_count == 20
778
779
780 # ===========================================================================
781 # 11. increment_applied_count
782 # ===========================================================================
783
784
785 class TestIncrementAppliedCount:
786 def test_increments_from_zero(self, repo: pathlib.Path) -> None:
787 p = _make_pattern(repo)
788 record_pattern(repo, p)
789 r = _make_resolution(p, b"out", repo, applied_count=0)
790 save_resolution(repo, r)
791
792 result = increment_applied_count(repo, p.pattern_id, r.resolution_id)
793 assert result is True
794
795 loaded = load_resolution(repo, p.pattern_id, r.resolution_id)
796 assert loaded is not None
797 assert loaded.applied_count == 1
798
799 def test_increments_multiple_times(self, repo: pathlib.Path) -> None:
800 p = _make_pattern(repo)
801 record_pattern(repo, p)
802 r = _make_resolution(p, b"out", repo, applied_count=0)
803 save_resolution(repo, r)
804
805 for _ in range(5):
806 increment_applied_count(repo, p.pattern_id, r.resolution_id)
807
808 loaded = load_resolution(repo, p.pattern_id, r.resolution_id)
809 assert loaded is not None
810 assert loaded.applied_count == 5
811
812 def test_returns_false_for_missing_resolution(self, repo: pathlib.Path) -> None:
813 p = _make_pattern(repo)
814 record_pattern(repo, p)
815 result = increment_applied_count(repo, p.pattern_id, _hex64("gone"))
816 assert result is False
817
818
819 # ===========================================================================
820 # 12. gc_stale
821 # ===========================================================================
822
823
824 class TestGcStale:
825 def test_empty_store(self, repo: pathlib.Path) -> None:
826 assert gc_stale(repo, age_days=0) == 0
827
828 def test_removes_old_pattern_without_resolution(self, repo: pathlib.Path) -> None:
829 p = _make_pattern(repo)
830 from dataclasses import replace as dc_replace
831 old_time = _now() - datetime.timedelta(days=200)
832 old_p = dc_replace(p, recorded_at=old_time)
833 record_pattern(repo, old_p)
834
835 removed = gc_stale(repo, age_days=90)
836 assert removed == 1
837 assert list_patterns(repo) == []
838
839 def test_keeps_pattern_with_resolution(self, repo: pathlib.Path) -> None:
840 p = _make_pattern(repo)
841 from dataclasses import replace as dc_replace
842 old_time = _now() - datetime.timedelta(days=200)
843 old_p = dc_replace(p, recorded_at=old_time)
844 record_pattern(repo, old_p)
845 r = _make_resolution(old_p, b"resolved", repo)
846 save_resolution(repo, r)
847
848 removed = gc_stale(repo, age_days=90)
849 assert removed == 0
850 assert len(list_patterns(repo)) == 1
851
852 def test_keeps_recent_pattern_without_resolution(self, repo: pathlib.Path) -> None:
853 p = _make_pattern(repo)
854 record_pattern(repo, p) # recorded_at = now
855
856 removed = gc_stale(repo, age_days=90)
857 assert removed == 0
858 assert len(list_patterns(repo)) == 1
859
860 def test_removes_only_old_stale_patterns(self, repo: pathlib.Path) -> None:
861 """Mix of old-stale, old-resolved, new-stale — only old-stale removed."""
862 from dataclasses import replace as dc_replace
863
864 old_stale = _make_pattern(repo, ours_content=b"old_s_o", theirs_content=b"old_s_t")
865 old_resolved = _make_pattern(repo, ours_content=b"old_r_o", theirs_content=b"old_r_t")
866 new_stale = _make_pattern(repo, ours_content=b"new_s_o", theirs_content=b"new_s_t")
867
868 old_time = _now() - datetime.timedelta(days=100)
869 old_stale = dc_replace(old_stale, recorded_at=old_time)
870 old_resolved = dc_replace(old_resolved, recorded_at=old_time)
871
872 record_pattern(repo, old_stale)
873 record_pattern(repo, old_resolved)
874 record_pattern(repo, new_stale)
875
876 r = _make_resolution(old_resolved, b"res", repo)
877 save_resolution(repo, r)
878
879 removed = gc_stale(repo, age_days=90)
880 assert removed == 1
881 remaining = {p.pattern_id for p in list_patterns(repo)}
882 assert old_resolved.pattern_id in remaining
883 assert new_stale.pattern_id in remaining
884 assert old_stale.pattern_id not in remaining
885
886
887 # ===========================================================================
888 # 13. Audit log
889 # ===========================================================================
890
891
892 class TestAuditLog:
893 """append_audit(root, event_type, acted_by, *, pattern_id, resolution_id, policy_id, metadata)"""
894
895 def test_empty_store(self, repo: pathlib.Path) -> None:
896 assert list_audit(repo) == []
897
898 def test_append_and_list(self, repo: pathlib.Path) -> None:
899 append_audit(
900 repo,
901 AuditEventType.PATTERN_RECORDED,
902 AgentProvenance.human(),
903 pattern_id=_hex64("p"),
904 )
905 entries = list_audit(repo)
906 assert len(entries) == 1
907 assert entries[0]["event_type"] == AuditEventType.PATTERN_RECORDED
908
909 def test_multiple_entries_accumulate(self, repo: pathlib.Path) -> None:
910 for i in range(5):
911 append_audit(
912 repo,
913 AuditEventType.RESOLUTION_SAVED,
914 AgentProvenance.human(),
915 metadata={"i": i},
916 )
917 assert len(list_audit(repo)) == 5
918
919 def test_limit_parameter(self, repo: pathlib.Path) -> None:
920 for _ in range(10):
921 append_audit(repo, AuditEventType.PATTERN_RECORDED, AgentProvenance.human())
922 assert len(list_audit(repo, limit=3)) == 3
923
924 def test_audit_is_append_only(self, repo: pathlib.Path) -> None:
925 append_audit(repo, AuditEventType.PATTERN_RECORDED, AgentProvenance.human())
926 append_audit(repo, AuditEventType.RESOLUTION_APPLIED, AgentProvenance.human())
927 entries = list_audit(repo)
928 types = {e["event_type"] for e in entries}
929 assert AuditEventType.PATTERN_RECORDED in types
930 assert AuditEventType.RESOLUTION_APPLIED in types
931
932 def test_agent_provenance_in_audit(self, repo: pathlib.Path) -> None:
933 prov = AgentProvenance.agent("claude-code", "claude-sonnet-4-6")
934 append_audit(repo, AuditEventType.RESOLUTION_APPLIED, prov)
935 entries = list_audit(repo)
936 assert len(entries) == 1
937 assert entries[0]["acted_by"]["type"] == "agent"
938 assert entries[0]["acted_by"]["agent_id"] == "claude-code"
939
940 def test_different_event_types(self, repo: pathlib.Path) -> None:
941 for event in [
942 AuditEventType.PATTERN_RECORDED,
943 AuditEventType.RESOLUTION_SAVED,
944 AuditEventType.RESOLUTION_APPLIED,
945 AuditEventType.GC_RUN,
946 AuditEventType.CLEAR_RUN,
947 ]:
948 append_audit(repo, event, AgentProvenance.human())
949 entries = list_audit(repo, limit=100)
950 event_types = {e["event_type"] for e in entries}
951 for event in [
952 AuditEventType.PATTERN_RECORDED,
953 AuditEventType.RESOLUTION_SAVED,
954 AuditEventType.RESOLUTION_APPLIED,
955 ]:
956 assert event in event_types
957
958 def test_metadata_stored(self, repo: pathlib.Path) -> None:
959 append_audit(
960 repo,
961 AuditEventType.PATTERN_RECORDED,
962 AgentProvenance.human(),
963 pattern_id=_hex64("p"),
964 metadata={"extra": "value"},
965 )
966 entries = list_audit(repo)
967 assert entries[0]["metadata"]["extra"] == "value"
968
969 def test_audit_id_is_content_addressed(self, repo: pathlib.Path) -> None:
970 """audit_id must be sha256: of canonical entry content, not a UUID."""
971 import json as _json
972 append_audit(
973 repo,
974 AuditEventType.PATTERN_RECORDED,
975 AgentProvenance.human(),
976 pattern_id=_hex64("p"),
977 metadata={"k": "v"},
978 )
979 entry = list_audit(repo)[0]
980 audit_id = entry["audit_id"]
981 # Must be a long_id, not a UUID4
982 assert audit_id.startswith("sha256:"), f"Expected sha256: prefix, got {audit_id!r}"
983 assert len(audit_id) == 71, f"Expected 71 chars (sha256: + 64 hex), got {len(audit_id)}"
984
985 def test_audit_id_is_deterministic(self, repo: pathlib.Path) -> None:
986 """Same content → same audit_id (content-addressed, not random)."""
987 import json as _json
988 prov = AgentProvenance.human()
989 # Compute what the id should be from the entry fields
990 append_audit(
991 repo,
992 AuditEventType.RESOLUTION_SAVED,
993 prov,
994 pattern_id=_hex64("p"),
995 resolution_id=_hex64("r"),
996 metadata={"x": 1},
997 )
998 entry = list_audit(repo)[0]
999 # Re-derive: sha256 of entry without audit_id, sorted keys
1000 payload = {k: v for k, v in entry.items() if k != "audit_id"}
1001 expected = blob_id(_json.dumps(payload, sort_keys=True, separators=(",", ":")).encode())
1002 assert entry["audit_id"] == expected
1003
1004 def test_audit_id_is_sha256_not_uuid4(self, repo: pathlib.Path) -> None:
1005 """audit_id must be sha256-prefixed, not UUID4 (8-4-4-4-12 format)."""
1006 import re
1007 append_audit(repo, AuditEventType.GC_RUN, AgentProvenance.human())
1008 entry = list_audit(repo)[0]
1009 uuid4_re = re.compile(
1010 r"^[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$"
1011 )
1012 assert not uuid4_re.match(entry["audit_id"]), "audit_id must not be a UUID4"
1013
1014
1015 # ===========================================================================
1016 # 14. Policy CRUD
1017 # ===========================================================================
1018
1019
1020 class TestPolicyCRUD:
1021 def test_save_and_load(self, repo: pathlib.Path) -> None:
1022 policy = _make_policy("my-policy")
1023 save_policy(repo, policy)
1024 loaded = load_policy(repo, "my-policy")
1025 assert loaded is not None
1026 assert loaded.policy_id == "my-policy"
1027 assert loaded.action == PolicyAction.PREFER_OURS
1028
1029 def test_load_missing_returns_none(self, repo: pathlib.Path) -> None:
1030 assert load_policy(repo, "nonexistent") is None
1031
1032 def test_load_invalid_id_returns_none(self, repo: pathlib.Path) -> None:
1033 assert load_policy(repo, "bad id!") is None
1034
1035 def test_save_overwrites_existing(self, repo: pathlib.Path) -> None:
1036 p1 = _make_policy("pol", action=PolicyAction.PREFER_OURS)
1037 save_policy(repo, p1)
1038 p2 = _make_policy("pol", action=PolicyAction.PREFER_THEIRS)
1039 save_policy(repo, p2)
1040 loaded = load_policy(repo, "pol")
1041 assert loaded is not None
1042 assert loaded.action == PolicyAction.PREFER_THEIRS
1043
1044 def test_list_empty(self, repo: pathlib.Path) -> None:
1045 assert list_policies(repo) == []
1046
1047 def test_list_multiple(self, repo: pathlib.Path) -> None:
1048 for pid in ["a", "b", "c"]:
1049 save_policy(repo, _make_policy(pid))
1050 assert len(list_policies(repo)) == 3
1051
1052 def test_remove_existing(self, repo: pathlib.Path) -> None:
1053 save_policy(repo, _make_policy("remove-me"))
1054 assert remove_policy(repo, "remove-me") is True
1055 assert load_policy(repo, "remove-me") is None
1056
1057 def test_remove_missing_returns_false(self, repo: pathlib.Path) -> None:
1058 assert remove_policy(repo, "nope") is False
1059
1060 def test_list_sorted_by_scope_order(self, repo: pathlib.Path) -> None:
1061 file_pol = _make_policy("file-pol", scope=PolicyScope.FILE)
1062 ws_pol = _make_policy("ws-pol", scope=PolicyScope.WORKSPACE)
1063 domain_pol = _make_policy("domain-pol", scope=PolicyScope.DOMAIN)
1064 repo_pol = _make_policy("repo-pol", scope=PolicyScope.REPO)
1065
1066 for p in [file_pol, domain_pol, ws_pol, repo_pol]:
1067 save_policy(repo, p)
1068
1069 ordered = list_policies(repo)
1070 scopes = [p.scope for p in ordered]
1071 expected_order = [PolicyScope.WORKSPACE, PolicyScope.REPO,
1072 PolicyScope.DOMAIN, PolicyScope.FILE]
1073 assert scopes == expected_order
1074
1075
1076 # ===========================================================================
1077 # 15. PolicyCondition matching (_condition_matches and match_policy)
1078 # ===========================================================================
1079
1080
1081 class TestPolicyConditionMatching:
1082 """_condition_matches and match_policy — all predicate combinations."""
1083
1084 def _pattern(self, path: str = "f.py", domain: str = "code",
1085 conflict_type: str = ConflictType.CONTENT) -> ConflictPattern:
1086 blob = _hex64("fp")
1087 pid = compute_pattern_id(path, blob, blob)
1088 return ConflictPattern(
1089 pattern_id=pid,
1090 path=path,
1091 domain=domain,
1092 conflict_type=conflict_type,
1093 blob_fingerprint=blob,
1094 semantic_fingerprint=blob,
1095 ours_id=blob_id(b"o"),
1096 theirs_id=blob_id(b"t"),
1097 description={},
1098 recorded_at=_now(),
1099 recorded_by="test",
1100 )
1101
1102 def test_all_none_matches_everything(self) -> None:
1103 cond = PolicyCondition()
1104 assert h._condition_matches(cond, self._pattern())
1105 assert h._condition_matches(cond, self._pattern(domain="midi"))
1106
1107 def test_conflict_type_match(self) -> None:
1108 cond = PolicyCondition(conflict_type=ConflictType.CONTENT)
1109 assert h._condition_matches(cond, self._pattern())
1110
1111 def test_conflict_type_no_match(self) -> None:
1112 cond = PolicyCondition(conflict_type=ConflictType.STRUCTURAL)
1113 assert not h._condition_matches(cond, self._pattern())
1114
1115 def test_domain_match(self) -> None:
1116 cond = PolicyCondition(domain="code")
1117 assert h._condition_matches(cond, self._pattern(domain="code"))
1118
1119 def test_domain_no_match(self) -> None:
1120 cond = PolicyCondition(domain="midi")
1121 assert not h._condition_matches(cond, self._pattern(domain="code"))
1122
1123 def test_path_pattern_exact_glob(self) -> None:
1124 cond = PolicyCondition(path_pattern="*.py")
1125 assert h._condition_matches(cond, self._pattern(path="app.py"))
1126
1127 def test_path_pattern_directory_glob(self) -> None:
1128 cond = PolicyCondition(path_pattern="src/*.py")
1129 assert h._condition_matches(cond, self._pattern(path="src/main.py"))
1130 assert not h._condition_matches(cond, self._pattern(path="tests/main.py"))
1131
1132 def test_path_pattern_no_match(self) -> None:
1133 cond = PolicyCondition(path_pattern="*.mid")
1134 assert not h._condition_matches(cond, self._pattern(path="song.py"))
1135
1136 def test_all_conditions_must_match(self) -> None:
1137 cond = PolicyCondition(conflict_type=ConflictType.CONTENT, domain="code",
1138 path_pattern="*.py")
1139 assert h._condition_matches(cond, self._pattern())
1140 # domain wrong
1141 assert not h._condition_matches(cond, self._pattern(domain="midi"))
1142 # path wrong
1143 assert not h._condition_matches(cond, self._pattern(path="song.mid"))
1144
1145 def test_min_confidence_not_checked_here(self) -> None:
1146 """min_confidence is a proposal-time filter — _condition_matches ignores it."""
1147 cond = PolicyCondition(min_confidence=0.99)
1148 # Should match regardless — min_confidence is not a pattern field
1149 assert h._condition_matches(cond, self._pattern())
1150
1151 def test_match_policy_returns_none_when_no_policies(self) -> None:
1152 assert match_policy([], self._pattern()) is None
1153
1154 def test_match_policy_returns_first_match(self) -> None:
1155 p1 = _make_policy("p1", action=PolicyAction.PREFER_OURS,
1156 conflict_type=ConflictType.CONTENT)
1157 p2 = _make_policy("p2", action=PolicyAction.PREFER_THEIRS,
1158 conflict_type=ConflictType.CONTENT)
1159 result = match_policy([p1, p2], self._pattern())
1160 assert result is not None
1161 assert result.policy_id == "p1"
1162
1163 def test_match_policy_returns_none_when_no_match(self) -> None:
1164 p1 = _make_policy("p1", domain="midi") # won't match domain="code"
1165 result = match_policy([p1], self._pattern(domain="code"))
1166 assert result is None
1167
1168 def test_match_policy_skips_non_matching(self) -> None:
1169 wrong = _make_policy("wrong", domain="midi")
1170 right = _make_policy("right", domain="code")
1171 result = match_policy([wrong, right], self._pattern(domain="code"))
1172 assert result is not None
1173 assert result.policy_id == "right"
1174
1175 def test_match_policy_symbol_path_glob(self) -> None:
1176 """path_pattern must match symbol-level paths like 'config.py::*'."""
1177 cond_pol = _make_policy("sym", path_pattern="config.py::*")
1178 result = match_policy([cond_pol], self._pattern(path="config.py::MAX_CONNECTIONS"))
1179 assert result is not None
1180
1181
1182 # ===========================================================================
1183 # 16. Escalation lifecycle
1184 # ===========================================================================
1185
1186
1187 class TestEscalationLifecycle:
1188 def _make_escalation(self, pattern_id: str | None = None) -> EscalationRecord:
1189 pid = pattern_id or _hex64("pat")
1190 reason = "could not auto-resolve"
1191 eid = compute_escalation_id(pid, reason)
1192 return EscalationRecord(
1193 escalation_id=eid,
1194 pattern_id=pid,
1195 reason=reason,
1196 escalated_at=_now(),
1197 escalated_by=AgentProvenance.agent("claude-code"),
1198 status=EscalationStatus.OPEN,
1199 )
1200
1201 def test_compute_escalation_id_deterministic(self) -> None:
1202 pid = _hex64("p")
1203 reason = "reason"
1204 assert compute_escalation_id(pid, reason) == compute_escalation_id(pid, reason)
1205
1206 def test_compute_escalation_id_differs_on_different_inputs(self) -> None:
1207 pid = _hex64("p")
1208 assert compute_escalation_id(pid, "r1") != compute_escalation_id(pid, "r2")
1209 assert compute_escalation_id(_hex64("p1"), "r") != compute_escalation_id(_hex64("p2"), "r")
1210
1211 def test_record_and_load(self, repo: pathlib.Path) -> None:
1212 rec = self._make_escalation()
1213 result = record_escalation(repo, rec)
1214 assert result is True
1215 loaded = load_escalation(repo, rec.escalation_id)
1216 assert loaded is not None
1217 assert loaded.escalation_id == rec.escalation_id
1218 assert loaded.status == EscalationStatus.OPEN
1219
1220 def test_record_idempotent(self, repo: pathlib.Path) -> None:
1221 rec = self._make_escalation()
1222 assert record_escalation(repo, rec) is True
1223 assert record_escalation(repo, rec) is False # already exists
1224
1225 def test_load_missing_returns_none(self, repo: pathlib.Path) -> None:
1226 assert load_escalation(repo, _hex64("gone")) is None
1227
1228 def test_list_escalations_empty(self, repo: pathlib.Path) -> None:
1229 assert list_escalations(repo) == []
1230
1231 def test_list_escalations_all(self, repo: pathlib.Path) -> None:
1232 for i in range(3):
1233 rec = self._make_escalation(_hex64(f"pat{i}"))
1234 record_escalation(repo, rec)
1235 assert len(list_escalations(repo)) == 3
1236
1237 def test_list_escalations_filter_by_status(self, repo: pathlib.Path) -> None:
1238 open_rec = self._make_escalation(_hex64("open"))
1239 record_escalation(repo, open_rec)
1240
1241 resolved_rec = self._make_escalation(_hex64("resolved"))
1242 record_escalation(repo, resolved_rec)
1243 resolve_escalation(
1244 repo,
1245 resolved_rec.escalation_id,
1246 _hex64("res"),
1247 AgentProvenance.human(),
1248 _now(),
1249 )
1250
1251 open_list = list_escalations(repo, status=EscalationStatus.OPEN)
1252 resolved_list = list_escalations(repo, status=EscalationStatus.RESOLVED)
1253 assert len(open_list) == 1
1254 assert open_list[0].status == EscalationStatus.OPEN
1255 assert len(resolved_list) == 1
1256 assert resolved_list[0].status == EscalationStatus.RESOLVED
1257
1258 def test_resolve_escalation(self, repo: pathlib.Path) -> None:
1259 rec = self._make_escalation()
1260 record_escalation(repo, rec)
1261
1262 result = resolve_escalation(
1263 repo,
1264 rec.escalation_id,
1265 _hex64("resolution"),
1266 AgentProvenance.human(),
1267 _now(),
1268 )
1269 assert result is True
1270
1271 loaded = load_escalation(repo, rec.escalation_id)
1272 assert loaded is not None
1273 assert loaded.status == EscalationStatus.RESOLVED
1274 assert loaded.resolution_id == _hex64("resolution")
1275
1276 def test_resolve_missing_escalation_returns_false(self, repo: pathlib.Path) -> None:
1277 result = resolve_escalation(
1278 repo, _hex64("gone"), _hex64("res"), AgentProvenance.human(), _now()
1279 )
1280 assert result is False
1281
1282
1283 # ===========================================================================
1284 # 17. record_resolutions — file-level paths
1285 # ===========================================================================
1286
1287
1288 class TestRecordResolutionsFilePaths:
1289 def test_records_pattern_and_resolution(self, repo: pathlib.Path) -> None:
1290 ours_id = _write_obj(repo, b"version = 1")
1291 theirs_id = _write_obj(repo, b"version = 2")
1292 resolution_id = _write_obj(repo, b"version = 3")
1293
1294 ours_m: Manifest = {"config.py": ours_id}
1295 theirs_m: Manifest = {"config.py": theirs_id}
1296 new_m: Manifest = {"config.py": resolution_id}
1297
1298 saved = record_resolutions(repo, ["config.py"], ours_m, theirs_m, new_m, "code", _FakePlugin())
1299 assert saved == ["config.py"]
1300
1301 patterns = list_patterns(repo)
1302 assert len(patterns) == 1
1303 assert patterns[0].path == "config.py"
1304
1305 resolutions = list_resolutions(repo, patterns[0].pattern_id)
1306 assert len(resolutions) == 1
1307 assert resolutions[0].outcome_blob == resolution_id
1308 assert resolutions[0].human_verified is True
1309 assert resolutions[0].confidence == 1.0
1310 assert resolutions[0].strategy == ResolutionStrategy.MANUAL
1311
1312 def test_skips_path_not_in_manifests(self, repo: pathlib.Path) -> None:
1313 saved = record_resolutions(repo, ["missing.py"], {}, {}, {}, "code", _FakePlugin())
1314 assert saved == []
1315 assert list_patterns(repo) == []
1316
1317 def test_skips_when_ours_missing_from_manifest(self, repo: pathlib.Path) -> None:
1318 theirs_id = _write_obj(repo, b"v2")
1319 res_id = _write_obj(repo, b"v2")
1320 saved = record_resolutions(
1321 repo, ["f.py"], {}, {"f.py": theirs_id}, {"f.py": res_id}, "code", _FakePlugin()
1322 )
1323 assert saved == []
1324
1325 def test_skips_when_theirs_missing_from_manifest(self, repo: pathlib.Path) -> None:
1326 ours_id = _write_obj(repo, b"v1")
1327 res_id = _write_obj(repo, b"v1")
1328 saved = record_resolutions(
1329 repo, ["f.py"], {"f.py": ours_id}, {}, {"f.py": res_id}, "code", _FakePlugin()
1330 )
1331 assert saved == []
1332
1333 def test_skips_when_outcome_missing_from_new_manifest(self, repo: pathlib.Path) -> None:
1334 ours_id = _write_obj(repo, b"v1")
1335 theirs_id = _write_obj(repo, b"v2")
1336 saved = record_resolutions(
1337 repo, ["f.py"], {"f.py": ours_id}, {"f.py": theirs_id}, {}, "code", _FakePlugin()
1338 )
1339 assert saved == []
1340
1341 def test_idempotent_second_call(self, repo: pathlib.Path) -> None:
1342 ours_id = _write_obj(repo, b"a")
1343 theirs_id = _write_obj(repo, b"b")
1344 resolution_id = _write_obj(repo, b"c")
1345 ours_m: Manifest = {"f.py": ours_id}
1346 theirs_m: Manifest = {"f.py": theirs_id}
1347 new_m: Manifest = {"f.py": resolution_id}
1348
1349 record_resolutions(repo, ["f.py"], ours_m, theirs_m, new_m, "code", _FakePlugin())
1350 record_resolutions(repo, ["f.py"], ours_m, theirs_m, new_m, "code", _FakePlugin())
1351
1352 assert len(list_patterns(repo)) == 1
1353 assert len(list_resolutions(repo, list_patterns(repo)[0].pattern_id)) == 1
1354
1355 def test_multiple_paths(self, repo: pathlib.Path) -> None:
1356 ours_a = _write_obj(repo, b"a_ours")
1357 theirs_a = _write_obj(repo, b"a_theirs")
1358 res_a = _write_obj(repo, b"a_res")
1359
1360 ours_b = _write_obj(repo, b"b_ours")
1361 theirs_b = _write_obj(repo, b"b_theirs")
1362 res_b = _write_obj(repo, b"b_res")
1363
1364 saved = record_resolutions(
1365 repo,
1366 ["a.py", "b.py"],
1367 {"a.py": ours_a, "b.py": ours_b},
1368 {"a.py": theirs_a, "b.py": theirs_b},
1369 {"a.py": res_a, "b.py": res_b},
1370 "code",
1371 _FakePlugin(),
1372 )
1373 assert set(saved) == {"a.py", "b.py"}
1374 assert len(list_patterns(repo)) == 2
1375
1376 def test_returns_only_saved_paths(self, repo: pathlib.Path) -> None:
1377 """Paths missing from manifests are silently skipped, not in return value."""
1378 ours_id = _write_obj(repo, b"x")
1379 theirs_id = _write_obj(repo, b"y")
1380 res_id = _write_obj(repo, b"z")
1381
1382 saved = record_resolutions(
1383 repo,
1384 ["present.py", "absent.py"],
1385 {"present.py": ours_id},
1386 {"present.py": theirs_id},
1387 {"present.py": res_id},
1388 "code",
1389 _FakePlugin(),
1390 )
1391 assert saved == ["present.py"]
1392
1393
1394 # ===========================================================================
1395 # 18. record_resolutions — symbol-level paths
1396 # ===========================================================================
1397
1398
1399 class TestRecordResolutionsSymbolPaths:
1400 def test_symbol_path_records_pattern(self, repo: pathlib.Path) -> None:
1401 ours_id = _write_obj(repo, b"MAX_CONNECTIONS = 10")
1402 theirs_id = _write_obj(repo, b"MAX_CONNECTIONS = 25")
1403 resolution_id = _write_obj(repo, b"MAX_CONNECTIONS = 50")
1404
1405 saved = record_resolutions(
1406 repo,
1407 ["config.py::MAX_CONNECTIONS"],
1408 {"config.py": ours_id},
1409 {"config.py": theirs_id},
1410 {"config.py": resolution_id},
1411 "code",
1412 _FakePlugin(),
1413 )
1414
1415 assert saved == ["config.py::MAX_CONNECTIONS"]
1416 patterns = list_patterns(repo)
1417 assert len(patterns) == 1
1418 assert patterns[0].path == "config.py::MAX_CONNECTIONS"
1419
1420 def test_multiple_symbols_same_file_produce_distinct_patterns(
1421 self, repo: pathlib.Path
1422 ) -> None:
1423 file_ours = _write_obj(repo, b"file-ours")
1424 file_theirs = _write_obj(repo, b"file-theirs")
1425 file_resolved = _write_obj(repo, b"file-resolved")
1426
1427 saved = record_resolutions(
1428 repo,
1429 ["app.py::foo", "app.py::bar"],
1430 {"app.py": file_ours},
1431 {"app.py": file_theirs},
1432 {"app.py": file_resolved},
1433 "code",
1434 _FakePlugin(),
1435 )
1436 assert set(saved) == {"app.py::foo", "app.py::bar"}
1437 patterns = list_patterns(repo)
1438 assert len(patterns) == 2
1439 paths = {p.path for p in patterns}
1440 assert paths == {"app.py::foo", "app.py::bar"}
1441
1442 def test_symbol_path_missing_file_portion(self, repo: pathlib.Path) -> None:
1443 saved = record_resolutions(
1444 repo, ["missing.py::Symbol"], {}, {}, {}, "code", _FakePlugin()
1445 )
1446 assert saved == []
1447
1448 def test_symbol_path_idempotent(self, repo: pathlib.Path) -> None:
1449 ours_id = _write_obj(repo, b"ours")
1450 theirs_id = _write_obj(repo, b"theirs")
1451 res_id = _write_obj(repo, b"resolved")
1452 ours_m: Manifest = {"f.py": ours_id}
1453 theirs_m: Manifest = {"f.py": theirs_id}
1454 new_m: Manifest = {"f.py": res_id}
1455
1456 record_resolutions(repo, ["f.py::Symbol"], ours_m, theirs_m, new_m, "code", _FakePlugin())
1457 record_resolutions(repo, ["f.py::Symbol"], ours_m, theirs_m, new_m, "code", _FakePlugin())
1458
1459 assert len(list_patterns(repo)) == 1
1460 assert len(list_resolutions(repo, list_patterns(repo)[0].pattern_id)) == 1
1461
1462 def test_deeply_nested_symbol_path(self, repo: pathlib.Path) -> None:
1463 ours_id = _write_obj(repo, b"ours")
1464 theirs_id = _write_obj(repo, b"theirs")
1465 res_id = _write_obj(repo, b"resolved")
1466
1467 saved = record_resolutions(
1468 repo,
1469 ["src/auth/tokens.py::TokenManager.rotate"],
1470 {"src/auth/tokens.py": ours_id},
1471 {"src/auth/tokens.py": theirs_id},
1472 {"src/auth/tokens.py": res_id},
1473 "code",
1474 _FakePlugin(),
1475 )
1476 assert saved == ["src/auth/tokens.py::TokenManager.rotate"]
1477 patterns = list_patterns(repo)
1478 assert patterns[0].path == "src/auth/tokens.py::TokenManager.rotate"
1479
1480
1481 # ===========================================================================
1482 # 19. auto_apply — exact replay (file paths)
1483 # ===========================================================================
1484
1485
1486 class TestAutoApplyExactReplay:
1487 def test_no_resolution_records_pattern_and_returns_remaining(
1488 self, repo: pathlib.Path
1489 ) -> None:
1490 ours_id = _write_obj(repo, b"v1")
1491 theirs_id = _write_obj(repo, b"v2")
1492 ours_m: Manifest = {"f.py": ours_id}
1493 theirs_m: Manifest = {"f.py": theirs_id}
1494
1495 resolved, remaining = auto_apply(
1496 repo, ["f.py"], ours_m, theirs_m, "code", _FakePlugin()
1497 )
1498 assert resolved == {}
1499 assert "f.py" in remaining
1500 # Pattern should have been recorded for future learning
1501 assert len(list_patterns(repo)) == 1
1502
1503 def test_second_identical_conflict_auto_resolves(
1504 self, repo: pathlib.Path
1505 ) -> None:
1506 ours_id = _write_obj(repo, b"ours-content")
1507 theirs_id = _write_obj(repo, b"theirs-content")
1508 resolution_content = b"resolved-content"
1509 resolution_id = _write_obj(repo, resolution_content)
1510
1511 ours_m: Manifest = {"f.py": ours_id}
1512 theirs_m: Manifest = {"f.py": theirs_id}
1513 new_m: Manifest = {"f.py": resolution_id}
1514
1515 # First conflict: record how it was resolved
1516 record_resolutions(repo, ["f.py"], ours_m, theirs_m, new_m, "code", _FakePlugin())
1517
1518 # Second identical conflict: auto_apply should replay
1519 dest = repo / "f.py"
1520 resolved, remaining = auto_apply(repo, ["f.py"], ours_m, theirs_m, "code", _FakePlugin())
1521
1522 assert "f.py" in resolved
1523 assert remaining == []
1524 assert dest.read_bytes() == resolution_content
1525
1526 def test_commutative_replay(self, repo: pathlib.Path) -> None:
1527 """Record A-vs-B; auto_apply B-vs-A should still match."""
1528 ours_id = _write_obj(repo, b"A")
1529 theirs_id = _write_obj(repo, b"B")
1530 resolution_id = _write_obj(repo, b"A") # kept ours
1531
1532 ours_m: Manifest = {"f.py": ours_id}
1533 theirs_m: Manifest = {"f.py": theirs_id}
1534 new_m: Manifest = {"f.py": resolution_id}
1535
1536 record_resolutions(repo, ["f.py"], ours_m, theirs_m, new_m, "code", _FakePlugin())
1537
1538 # Swapped: ours=B, theirs=A — same fingerprint (commutative)
1539 swapped_ours: Manifest = {"f.py": theirs_id}
1540 swapped_theirs: Manifest = {"f.py": ours_id}
1541 resolved, remaining = auto_apply(
1542 repo, ["f.py"], swapped_ours, swapped_theirs, "code", _FakePlugin()
1543 )
1544 assert "f.py" in resolved
1545 assert remaining == []
1546
1547 def test_different_content_does_not_auto_resolve(self, repo: pathlib.Path) -> None:
1548 """Different blob IDs → different fingerprint → no auto-apply."""
1549 ours_id = _write_obj(repo, b"old-ours")
1550 theirs_id = _write_obj(repo, b"old-theirs")
1551 resolution_id = _write_obj(repo, b"old-resolved")
1552
1553 record_resolutions(
1554 repo, ["f.py"],
1555 {"f.py": ours_id}, {"f.py": theirs_id}, {"f.py": resolution_id},
1556 "code", _FakePlugin(),
1557 )
1558
1559 new_ours = _write_obj(repo, b"new-ours")
1560 new_theirs = _write_obj(repo, b"new-theirs")
1561
1562 resolved, remaining = auto_apply(
1563 repo, ["f.py"],
1564 {"f.py": new_ours}, {"f.py": new_theirs},
1565 "code", _FakePlugin(),
1566 )
1567 assert resolved == {}
1568 assert "f.py" in remaining
1569
1570 def test_applied_count_incremented_on_replay(self, repo: pathlib.Path) -> None:
1571 ours_id = _write_obj(repo, b"ours")
1572 theirs_id = _write_obj(repo, b"theirs")
1573 resolution_id = _write_obj(repo, b"resolved")
1574
1575 ours_m: Manifest = {"f.py": ours_id}
1576 theirs_m: Manifest = {"f.py": theirs_id}
1577 new_m: Manifest = {"f.py": resolution_id}
1578 record_resolutions(repo, ["f.py"], ours_m, theirs_m, new_m, "code", _FakePlugin())
1579
1580 auto_apply(repo, ["f.py"], ours_m, theirs_m, "code", _FakePlugin())
1581
1582 p = list_patterns(repo)[0]
1583 resolutions = list_resolutions(repo, p.pattern_id)
1584 assert resolutions[0].applied_count == 1
1585
1586 def test_multiple_replays_increment_count(self, repo: pathlib.Path) -> None:
1587 ours_id = _write_obj(repo, b"ours")
1588 theirs_id = _write_obj(repo, b"theirs")
1589 resolution_id = _write_obj(repo, b"resolved")
1590
1591 ours_m: Manifest = {"f.py": ours_id}
1592 theirs_m: Manifest = {"f.py": theirs_id}
1593 new_m: Manifest = {"f.py": resolution_id}
1594 record_resolutions(repo, ["f.py"], ours_m, theirs_m, new_m, "code", _FakePlugin())
1595
1596 for _ in range(3):
1597 auto_apply(repo, ["f.py"], ours_m, theirs_m, "code", _FakePlugin())
1598
1599 p = list_patterns(repo)[0]
1600 resolutions = list_resolutions(repo, p.pattern_id)
1601 assert resolutions[0].applied_count == 3
1602
1603 def test_resolves_file_to_disk(self, repo: pathlib.Path) -> None:
1604 """The resolved content must actually be written to the working tree."""
1605 ours_id = _write_obj(repo, b"ours")
1606 theirs_id = _write_obj(repo, b"theirs")
1607 content = b"the chosen resolution\n"
1608 resolution_id = _write_obj(repo, content)
1609
1610 ours_m: Manifest = {"result.py": ours_id}
1611 theirs_m: Manifest = {"result.py": theirs_id}
1612 new_m: Manifest = {"result.py": resolution_id}
1613 record_resolutions(repo, ["result.py"], ours_m, theirs_m, new_m, "code", _FakePlugin())
1614
1615 dest = repo / "result.py"
1616 auto_apply(repo, ["result.py"], ours_m, theirs_m, "code", _FakePlugin())
1617 assert dest.read_bytes() == content
1618
1619
1620 # ===========================================================================
1621 # 20. auto_apply — symbol-level paths
1622 # ===========================================================================
1623
1624
1625 class TestAutoApplySymbolPaths:
1626 def test_first_symbol_conflict_records_pattern(self, repo: pathlib.Path) -> None:
1627 ours_id = _write_obj(repo, b"DEBUG = False")
1628 theirs_id = _write_obj(repo, b"DEBUG = True")
1629
1630 _, remaining = auto_apply(
1631 repo,
1632 ["settings.py::DEBUG"],
1633 {"settings.py": ours_id},
1634 {"settings.py": theirs_id},
1635 "code",
1636 _FakePlugin(),
1637 )
1638
1639 assert "settings.py::DEBUG" in remaining
1640 patterns = list_patterns(repo)
1641 assert len(patterns) == 1
1642 assert patterns[0].path == "settings.py::DEBUG"
1643
1644 def test_symbol_conflict_replayed(self, repo: pathlib.Path) -> None:
1645 ours_id = _write_obj(repo, b"TIMEOUT = 30")
1646 theirs_id = _write_obj(repo, b"TIMEOUT = 60")
1647 resolution_content = b"TIMEOUT = 45"
1648 resolution_id = _write_obj(repo, resolution_content)
1649
1650 ours_m: Manifest = {"config.py": ours_id}
1651 theirs_m: Manifest = {"config.py": theirs_id}
1652 new_m: Manifest = {"config.py": resolution_id}
1653
1654 record_resolutions(
1655 repo, ["config.py::TIMEOUT"], ours_m, theirs_m, new_m, "code", _FakePlugin()
1656 )
1657
1658 dest = repo / "config.py"
1659 resolved, remaining = auto_apply(
1660 repo, ["config.py::TIMEOUT"], ours_m, theirs_m, "code", _FakePlugin()
1661 )
1662
1663 assert "config.py::TIMEOUT" in resolved
1664 assert remaining == []
1665 assert dest.read_bytes() == resolution_content
1666
1667 def test_symbol_and_file_path_produce_distinct_patterns(
1668 self, repo: pathlib.Path
1669 ) -> None:
1670 """config.py and config.py::MAX must produce separate patterns."""
1671 ours_id = _write_obj(repo, b"ours")
1672 theirs_id = _write_obj(repo, b"theirs")
1673 res_id = _write_obj(repo, b"res")
1674
1675 ours_m: Manifest = {"config.py": ours_id}
1676 theirs_m: Manifest = {"config.py": theirs_id}
1677 new_m: Manifest = {"config.py": res_id}
1678
1679 record_resolutions(repo, ["config.py"], ours_m, theirs_m, new_m, "code", _FakePlugin())
1680 auto_apply(repo, ["config.py::MAX"], ours_m, theirs_m, "code", _FakePlugin())
1681
1682 patterns = list_patterns(repo)
1683 assert len(patterns) == 2
1684 paths = {p.path for p in patterns}
1685 assert "config.py" in paths
1686 assert "config.py::MAX" in paths
1687
1688 def test_multiple_symbols_in_one_auto_apply(self, repo: pathlib.Path) -> None:
1689 ours_id = _write_obj(repo, b"ours")
1690 theirs_id = _write_obj(repo, b"theirs")
1691 res_id = _write_obj(repo, b"res")
1692
1693 ours_m: Manifest = {"f.py": ours_id}
1694 theirs_m: Manifest = {"f.py": theirs_id}
1695 new_m: Manifest = {"f.py": res_id}
1696
1697 record_resolutions(
1698 repo, ["f.py::alpha", "f.py::beta"], ours_m, theirs_m, new_m, "code", _FakePlugin()
1699 )
1700 resolved, remaining = auto_apply(
1701 repo, ["f.py::alpha", "f.py::beta"], ours_m, theirs_m, "code", _FakePlugin()
1702 )
1703 assert "f.py::alpha" in resolved
1704 assert "f.py::beta" in resolved
1705 assert remaining == []
1706
1707 def test_partial_resolution_some_symbols_remain(
1708 self, repo: pathlib.Path
1709 ) -> None:
1710 """Only symbols with saved resolutions are auto-applied."""
1711 ours_id = _write_obj(repo, b"ours")
1712 theirs_id = _write_obj(repo, b"theirs")
1713 res_id = _write_obj(repo, b"res")
1714
1715 ours_m: Manifest = {"f.py": ours_id}
1716 theirs_m: Manifest = {"f.py": theirs_id}
1717 new_m: Manifest = {"f.py": res_id}
1718
1719 # Only record resolution for alpha, not beta
1720 record_resolutions(
1721 repo, ["f.py::alpha"], ours_m, theirs_m, new_m, "code", _FakePlugin()
1722 )
1723 resolved, remaining = auto_apply(
1724 repo, ["f.py::alpha", "f.py::beta"], ours_m, theirs_m, "code", _FakePlugin()
1725 )
1726 assert "f.py::alpha" in resolved
1727 assert "f.py::beta" in remaining
1728
1729
1730 # ===========================================================================
1731 # 21. auto_apply — path traversal guard
1732 # ===========================================================================
1733
1734
1735 class TestAutoApplyPathTraversal:
1736 def test_traversal_path_skipped(self, repo: pathlib.Path) -> None:
1737 ours_id = _write_obj(repo, b"x")
1738 theirs_id = _write_obj(repo, b"y")
1739 ours_m: Manifest = {"../traversal.py": ours_id}
1740 theirs_m: Manifest = {"../traversal.py": theirs_id}
1741
1742 resolved, remaining = auto_apply(
1743 repo, ["../traversal.py"], ours_m, theirs_m, "code", _FakePlugin()
1744 )
1745 assert resolved == {}
1746 assert "../traversal.py" in remaining
1747 # No pattern should have been recorded for a traversal attempt
1748 assert list_patterns(repo) == []
1749
1750 def test_symbol_traversal_skipped(self, repo: pathlib.Path) -> None:
1751 ours_id = _write_obj(repo, b"x")
1752 theirs_id = _write_obj(repo, b"y")
1753 ours_m: Manifest = {"../traversal.py": ours_id}
1754 theirs_m: Manifest = {"../traversal.py": theirs_id}
1755
1756 resolved, remaining = auto_apply(
1757 repo, ["../traversal.py::Symbol"], ours_m, theirs_m, "code", _FakePlugin()
1758 )
1759 assert resolved == {}
1760 assert "../traversal.py::Symbol" in remaining
1761 assert list_patterns(repo) == []
1762
1763 def test_absolute_path_skipped(self, repo: pathlib.Path) -> None:
1764 ours_id = _write_obj(repo, b"x")
1765 theirs_id = _write_obj(repo, b"y")
1766 abs_path = "/etc/passwd"
1767 ours_m: Manifest = {abs_path: ours_id}
1768 theirs_m: Manifest = {abs_path: theirs_id}
1769
1770 resolved, remaining = auto_apply(
1771 repo, [abs_path], ours_m, theirs_m, "code", _FakePlugin()
1772 )
1773 assert resolved == {}
1774 assert abs_path in remaining
1775
1776 def test_legitimate_nested_path_not_skipped(self, repo: pathlib.Path) -> None:
1777 ours_id = _write_obj(repo, b"v1")
1778 theirs_id = _write_obj(repo, b"v2")
1779 res_id = _write_obj(repo, b"v3")
1780 path = "src/auth/tokens.py"
1781 ours_m: Manifest = {path: ours_id}
1782 theirs_m: Manifest = {path: theirs_id}
1783 new_m: Manifest = {path: res_id}
1784
1785 record_resolutions(repo, [path], ours_m, theirs_m, new_m, "code", _FakePlugin())
1786 resolved, remaining = auto_apply(
1787 repo, [path], ours_m, theirs_m, "code", _FakePlugin()
1788 )
1789 assert path in resolved
1790 assert remaining == []
1791
1792
1793 # ===========================================================================
1794 # 22. auto_apply — one-sided deletion (ours or theirs missing)
1795 # ===========================================================================
1796
1797
1798 class TestAutoApplyOneWayDeletion:
1799 def test_ours_missing_from_manifest(self, repo: pathlib.Path) -> None:
1800 theirs_id = _write_obj(repo, b"theirs")
1801 resolved, remaining = auto_apply(
1802 repo, ["f.py"], {}, {"f.py": theirs_id}, "code", _FakePlugin()
1803 )
1804 assert resolved == {}
1805 assert "f.py" in remaining
1806
1807 def test_theirs_missing_from_manifest(self, repo: pathlib.Path) -> None:
1808 ours_id = _write_obj(repo, b"ours")
1809 resolved, remaining = auto_apply(
1810 repo, ["f.py"], {"f.py": ours_id}, {}, "code", _FakePlugin()
1811 )
1812 assert resolved == {}
1813 assert "f.py" in remaining
1814
1815 def test_both_missing(self, repo: pathlib.Path) -> None:
1816 resolved, remaining = auto_apply(
1817 repo, ["f.py"], {}, {}, "code", _FakePlugin()
1818 )
1819 assert resolved == {}
1820 assert "f.py" in remaining
1821
1822
1823 # ===========================================================================
1824 # 23. auto_apply — empty conflict list
1825 # ===========================================================================
1826
1827
1828 class TestAutoApplyEmptyList:
1829 def test_empty_conflict_list(self, repo: pathlib.Path) -> None:
1830 resolved, remaining = auto_apply(repo, [], {}, {}, "code", _FakePlugin())
1831 assert resolved == {}
1832 assert remaining == []
1833
1834 def test_empty_list_no_patterns_recorded(self, repo: pathlib.Path) -> None:
1835 auto_apply(repo, [], {}, {}, "code", _FakePlugin())
1836 assert list_patterns(repo) == []
1837
1838
1839 # ===========================================================================
1840 # 24. auto_apply — semantic fingerprinting via HarmonyPlugin
1841 # ===========================================================================
1842
1843
1844 class TestAutoApplySemanticPlugin:
1845 """A HarmonyPlugin that collapses semantically equivalent conflicts.
1846
1847 Scenario: Two conflicts that have different blob IDs but the plugin
1848 assigns them the same semantic fingerprint. Harmony should recognise
1849 the second conflict as a replay of the first.
1850 """
1851
1852 def test_semantic_plugin_enables_cross_content_replay(
1853 self, repo: pathlib.Path
1854 ) -> None:
1855 shared_semantic = _hex64("shared-semantic-fingerprint")
1856 plugin = _SemanticPlugin(shared_semantic)
1857
1858 # First conflict: blob pair (A, B)
1859 ours_A = _write_obj(repo, b"variant-A-ours")
1860 theirs_A = _write_obj(repo, b"variant-A-theirs")
1861 resolution_A = _write_obj(repo, b"resolution-A")
1862
1863 ours_m_A: Manifest = {"song.mid": ours_A}
1864 theirs_m_A: Manifest = {"song.mid": theirs_A}
1865 new_m_A: Manifest = {"song.mid": resolution_A}
1866
1867 record_resolutions(
1868 repo, ["song.mid"], ours_m_A, theirs_m_A, new_m_A, "midi", plugin
1869 )
1870
1871 # Second conflict: blob pair (C, D) — different content but same semantic FP
1872 ours_C = _write_obj(repo, b"variant-C-ours")
1873 theirs_C = _write_obj(repo, b"variant-C-theirs")
1874
1875 ours_m_C: Manifest = {"song.mid": ours_C}
1876 theirs_m_C: Manifest = {"song.mid": theirs_C}
1877
1878 dest = repo / "song.mid"
1879 resolved, remaining = auto_apply(
1880 repo, ["song.mid"], ours_m_C, theirs_m_C, "midi", plugin
1881 )
1882
1883 assert "song.mid" in resolved, (
1884 "Semantic plugin maps both conflicts to the same pattern_id; "
1885 "auto_apply must replay the saved resolution even though blob IDs differ"
1886 )
1887 assert remaining == []
1888 assert dest.read_bytes() == b"resolution-A"
1889
1890 def test_different_semantic_fingerprints_no_cross_replay(
1891 self, repo: pathlib.Path
1892 ) -> None:
1893 """Two semantically distinct conflicts must NOT cross-replay."""
1894 plugin_A = _SemanticPlugin(_hex64("semantic-A"))
1895 plugin_B = _SemanticPlugin(_hex64("semantic-B"))
1896
1897 ours_id = _write_obj(repo, b"ours")
1898 theirs_id = _write_obj(repo, b"theirs")
1899 res_id = _write_obj(repo, b"res")
1900
1901 ours_m: Manifest = {"f.py": ours_id}
1902 theirs_m: Manifest = {"f.py": theirs_id}
1903 new_m: Manifest = {"f.py": res_id}
1904
1905 record_resolutions(repo, ["f.py"], ours_m, theirs_m, new_m, "code", plugin_A)
1906
1907 # Same blobs, but plugin_B produces a different fingerprint
1908 resolved, remaining = auto_apply(
1909 repo, ["f.py"], ours_m, theirs_m, "code", plugin_B
1910 )
1911 assert resolved == {}
1912 assert "f.py" in remaining
1913
1914
1915 # ===========================================================================
1916 # 25. MergeState — original_conflict_paths (Bug 3)
1917 # ===========================================================================
1918
1919
1920 class TestMergeStateOriginalConflictPaths:
1921 def test_write_sets_original_conflict_paths(self, repo: pathlib.Path) -> None:
1922 write_merge_state(
1923 repo,
1924 base_commit=NULL_LONG_ID,
1925 ours_commit=long_id("1" * 64),
1926 theirs_commit=long_id("2" * 64),
1927 conflict_paths=["config.py::MAX_CONNECTIONS", "utils.py::clamp"],
1928 )
1929 state = read_merge_state(repo)
1930 assert state is not None
1931 assert state.original_conflict_paths == [
1932 "config.py::MAX_CONNECTIONS",
1933 "utils.py::clamp",
1934 ]
1935
1936 def test_original_paths_preserved_after_partial_resolution(
1937 self, repo: pathlib.Path
1938 ) -> None:
1939 write_merge_state(
1940 repo,
1941 base_commit=NULL_LONG_ID,
1942 ours_commit=long_id("1" * 64),
1943 theirs_commit=long_id("2" * 64),
1944 conflict_paths=["config.py::A", "config.py::B"],
1945 )
1946 write_merge_state(
1947 repo,
1948 base_commit=NULL_LONG_ID,
1949 ours_commit=long_id("1" * 64),
1950 theirs_commit=long_id("2" * 64),
1951 conflict_paths=["config.py::B"], # A resolved
1952 )
1953 state = read_merge_state(repo)
1954 assert state is not None
1955 assert state.conflict_paths == ["config.py::B"]
1956 assert state.original_conflict_paths == ["config.py::A", "config.py::B"]
1957
1958 def test_original_paths_preserved_when_all_resolved(
1959 self, repo: pathlib.Path
1960 ) -> None:
1961 write_merge_state(
1962 repo,
1963 base_commit=NULL_LONG_ID,
1964 ours_commit=long_id("1" * 64),
1965 theirs_commit=long_id("2" * 64),
1966 conflict_paths=["config.py::MAX_CONNECTIONS"],
1967 )
1968 write_merge_state(
1969 repo,
1970 base_commit=NULL_LONG_ID,
1971 ours_commit=long_id("1" * 64),
1972 theirs_commit=long_id("2" * 64),
1973 conflict_paths=[],
1974 )
1975 state = read_merge_state(repo)
1976 assert state is not None
1977 assert state.conflict_paths == []
1978 assert state.original_conflict_paths == ["config.py::MAX_CONNECTIONS"]
1979
1980 def test_commit_pattern_uses_original_paths(self, repo: pathlib.Path) -> None:
1981 ours_id = _write_obj(repo, b"MAX_CONNECTIONS = 50")
1982 theirs_id = _write_obj(repo, b"MAX_CONNECTIONS = 25")
1983 resolution_id = _write_obj(repo, b"MAX_CONNECTIONS = 50")
1984
1985 write_merge_state(
1986 repo,
1987 base_commit=NULL_LONG_ID,
1988 ours_commit=long_id("1" * 64),
1989 theirs_commit=long_id("2" * 64),
1990 conflict_paths=["config.py::MAX_CONNECTIONS"],
1991 )
1992 write_merge_state(
1993 repo,
1994 base_commit=NULL_LONG_ID,
1995 ours_commit=long_id("1" * 64),
1996 theirs_commit=long_id("2" * 64),
1997 conflict_paths=[],
1998 )
1999 state = read_merge_state(repo)
2000 assert state is not None
2001
2002 paths_for_harmony = state.original_conflict_paths or state.conflict_paths
2003 saved = record_resolutions(
2004 repo, paths_for_harmony,
2005 {"config.py": ours_id},
2006 {"config.py": theirs_id},
2007 {"config.py": resolution_id},
2008 "code", _FakePlugin(),
2009 )
2010 assert saved == ["config.py::MAX_CONNECTIONS"]
2011
2012 def test_three_path_stepwise_resolution(self, repo: pathlib.Path) -> None:
2013 """Simulate resolving three conflicts one-by-one via checkout --ours."""
2014 original = ["a.py::X", "b.py::Y", "c.py::Z"]
2015 write_merge_state(
2016 repo,
2017 base_commit=NULL_LONG_ID,
2018 ours_commit=long_id("1" * 64),
2019 theirs_commit=long_id("2" * 64),
2020 conflict_paths=original,
2021 )
2022 # Resolve X
2023 write_merge_state(
2024 repo,
2025 base_commit=NULL_LONG_ID,
2026 ours_commit=long_id("1" * 64),
2027 theirs_commit=long_id("2" * 64),
2028 conflict_paths=["b.py::Y", "c.py::Z"],
2029 )
2030 # Resolve Y
2031 write_merge_state(
2032 repo,
2033 base_commit=NULL_LONG_ID,
2034 ours_commit=long_id("1" * 64),
2035 theirs_commit=long_id("2" * 64),
2036 conflict_paths=["c.py::Z"],
2037 )
2038 # Resolve Z
2039 write_merge_state(
2040 repo,
2041 base_commit=NULL_LONG_ID,
2042 ours_commit=long_id("1" * 64),
2043 theirs_commit=long_id("2" * 64),
2044 conflict_paths=[],
2045 )
2046 state = read_merge_state(repo)
2047 assert state is not None
2048 assert state.conflict_paths == []
2049 assert state.original_conflict_paths == sorted(original)
2050
2051
2052 # ===========================================================================
2053 # 26. Full end-to-end integration
2054 # ===========================================================================
2055
2056
2057 class TestEndToEndIntegration:
2058 """High-level workflows that exercise the full harmony lifecycle."""
2059
2060 def test_full_exact_replay_cycle(self, repo: pathlib.Path) -> None:
2061 """Conflict → record → same conflict → auto-apply → applied_count == 1."""
2062 ours_id = _write_obj(repo, b"MAX_CONNECTIONS = 10")
2063 theirs_id = _write_obj(repo, b"MAX_CONNECTIONS = 50")
2064 resolution_content = b"MAX_CONNECTIONS = 50"
2065 resolution_id = _write_obj(repo, resolution_content)
2066
2067 # Step 1: First conflict. auto_apply has nothing → records pattern.
2068 ours_m: Manifest = {"config.py": ours_id}
2069 theirs_m: Manifest = {"config.py": theirs_id}
2070 resolved, remaining = auto_apply(repo, ["config.py"], ours_m, theirs_m, "code", _FakePlugin())
2071 assert resolved == {}
2072 assert "config.py" in remaining
2073 assert len(list_patterns(repo)) == 1
2074
2075 # Step 2: User resolves and commits → record_resolutions learns the outcome.
2076 new_m: Manifest = {"config.py": resolution_id}
2077 saved = record_resolutions(repo, ["config.py"], ours_m, theirs_m, new_m, "code", _FakePlugin())
2078 assert saved == ["config.py"]
2079
2080 p = list_patterns(repo)[0]
2081 resolutions = list_resolutions(repo, p.pattern_id)
2082 assert len(resolutions) == 1
2083 assert resolutions[0].human_verified is True
2084
2085 # Step 3: Same conflict recurs → auto_apply replays.
2086 dest = repo / "config.py"
2087 resolved, remaining = auto_apply(repo, ["config.py"], ours_m, theirs_m, "code", _FakePlugin())
2088 assert "config.py" in resolved
2089 assert remaining == []
2090 assert dest.read_bytes() == resolution_content
2091
2092 # Step 4: applied_count has been incremented.
2093 resolutions = list_resolutions(repo, p.pattern_id)
2094 assert resolutions[0].applied_count == 1
2095
2096 def test_symbol_path_full_cycle(self, repo: pathlib.Path) -> None:
2097 """Full cycle with symbol-level conflict path."""
2098 ours_id = _write_obj(repo, b"TIMEOUT = 30")
2099 theirs_id = _write_obj(repo, b"TIMEOUT = 60")
2100 resolution_content = b"TIMEOUT = 45"
2101 resolution_id = _write_obj(repo, resolution_content)
2102
2103 ours_m: Manifest = {"settings.py": ours_id}
2104 theirs_m: Manifest = {"settings.py": theirs_id}
2105
2106 # First conflict — records pattern
2107 auto_apply(repo, ["settings.py::TIMEOUT"], ours_m, theirs_m, "code", _FakePlugin())
2108 assert len(list_patterns(repo)) == 1
2109
2110 # Record resolution
2111 record_resolutions(
2112 repo, ["settings.py::TIMEOUT"], ours_m, theirs_m,
2113 {"settings.py": resolution_id}, "code", _FakePlugin()
2114 )
2115
2116 # Replay
2117 dest = repo / "settings.py"
2118 resolved, remaining = auto_apply(
2119 repo, ["settings.py::TIMEOUT"], ours_m, theirs_m, "code", _FakePlugin()
2120 )
2121 assert "settings.py::TIMEOUT" in resolved
2122 assert remaining == []
2123 assert dest.read_bytes() == resolution_content
2124
2125 def test_gc_clears_stale_unlearned_patterns(self, repo: pathlib.Path) -> None:
2126 """Patterns auto_apply recorded (no resolution) are GC'd after threshold."""
2127 ours_id = _write_obj(repo, b"v1")
2128 theirs_id = _write_obj(repo, b"v2")
2129 ours_m: Manifest = {"f.py": ours_id}
2130 theirs_m: Manifest = {"f.py": theirs_id}
2131
2132 auto_apply(repo, ["f.py"], ours_m, theirs_m, "code", _FakePlugin())
2133 assert len(list_patterns(repo)) == 1
2134
2135 # Backdate the recorded_at
2136 from dataclasses import replace as dc_replace
2137 p = list_patterns(repo)[0]
2138 old_p = dc_replace(p, recorded_at=_now() - datetime.timedelta(days=100))
2139 forget_pattern(repo, p.pattern_id)
2140 record_pattern(repo, old_p)
2141
2142 removed = gc_stale(repo, age_days=90)
2143 assert removed == 1
2144 assert list_patterns(repo) == []
2145
2146 def test_policy_takes_precedence_in_match(self, repo: pathlib.Path) -> None:
2147 """Saved policy matches a pattern — match_policy returns the policy."""
2148 policy = _make_policy(
2149 "prefer-ours-for-config",
2150 action=PolicyAction.PREFER_OURS,
2151 path_pattern="config.py",
2152 )
2153 save_policy(repo, policy)
2154
2155 ours_id = _write_obj(repo, b"ours")
2156 theirs_id = _write_obj(repo, b"theirs")
2157 blob_fp = blob_fingerprint(ours_id, theirs_id)
2158 pid = compute_pattern_id("config.py", blob_fp, blob_fp)
2159 pattern = ConflictPattern(
2160 pattern_id=pid,
2161 path="config.py",
2162 domain="code",
2163 conflict_type=ConflictType.CONTENT,
2164 blob_fingerprint=blob_fp,
2165 semantic_fingerprint=blob_fp,
2166 ours_id=ours_id,
2167 theirs_id=theirs_id,
2168 description={},
2169 recorded_at=_now(),
2170 recorded_by="test",
2171 )
2172 policies = list_policies(repo)
2173 matched = match_policy(policies, pattern)
2174 assert matched is not None
2175 assert matched.policy_id == "prefer-ours-for-config"
2176 assert matched.action == PolicyAction.PREFER_OURS
2177
2178 def test_escalation_full_lifecycle(self, repo: pathlib.Path) -> None:
2179 """Open escalation → load → resolve → status becomes RESOLVED."""
2180 p = _make_pattern(repo)
2181 record_pattern(repo, p)
2182
2183 reason = "no policy and no prior resolution"
2184 esc_id = compute_escalation_id(p.pattern_id, reason)
2185 esc = EscalationRecord(
2186 escalation_id=esc_id,
2187 pattern_id=p.pattern_id,
2188 reason=reason,
2189 escalated_at=_now(),
2190 escalated_by=AgentProvenance.agent("claude-code"),
2191 status=EscalationStatus.OPEN,
2192 )
2193 record_escalation(repo, esc)
2194
2195 loaded = load_escalation(repo, esc_id)
2196 assert loaded is not None
2197 assert loaded.status == EscalationStatus.OPEN
2198
2199 # Human resolves
2200 r = _make_resolution(p, b"manual-outcome", repo)
2201 save_resolution(repo, r)
2202
2203 resolve_escalation(repo, esc_id, r.resolution_id, AgentProvenance.human(), _now())
2204
2205 loaded = load_escalation(repo, esc_id)
2206 assert loaded is not None
2207 assert loaded.status == EscalationStatus.RESOLVED
2208 assert loaded.resolution_id == r.resolution_id
2209
2210 open_escs = list_escalations(repo, status=EscalationStatus.OPEN)
2211 assert len(open_escs) == 0
2212
2213 def test_clear_all_then_replay_learns_fresh(self, repo: pathlib.Path) -> None:
2214 """After clear_all, harmony starts fresh — no stale resolutions replayed."""
2215 ours_id = _write_obj(repo, b"ours")
2216 theirs_id = _write_obj(repo, b"theirs")
2217 res_id = _write_obj(repo, b"res")
2218 ours_m: Manifest = {"f.py": ours_id}
2219 theirs_m: Manifest = {"f.py": theirs_id}
2220 new_m: Manifest = {"f.py": res_id}
2221
2222 record_resolutions(repo, ["f.py"], ours_m, theirs_m, new_m, "code", _FakePlugin())
2223 assert len(list_patterns(repo)) == 1
2224
2225 clear_all(repo)
2226 assert list_patterns(repo) == []
2227
2228 resolved, remaining = auto_apply(repo, ["f.py"], ours_m, theirs_m, "code", _FakePlugin())
2229 assert resolved == {}
2230 assert "f.py" in remaining
2231
2232 def test_multiple_resolutions_best_wins(self, repo: pathlib.Path) -> None:
2233 """When two resolutions exist, auto_apply picks the best."""
2234 ours_id = _write_obj(repo, b"ours")
2235 theirs_id = _write_obj(repo, b"theirs")
2236
2237 p = _make_pattern(repo, ours_content=b"ours", theirs_content=b"theirs")
2238 record_pattern(repo, p)
2239
2240 # Unverified low-confidence resolution
2241 r_low = _make_resolution(p, b"low-quality-outcome", repo,
2242 human_verified=False, confidence=0.4)
2243 save_resolution(repo, r_low)
2244
2245 # Human-verified high-confidence resolution
2246 high_content = b"high-quality-outcome"
2247 r_high = _make_resolution(p, high_content, repo,
2248 human_verified=True, confidence=1.0)
2249 save_resolution(repo, r_high)
2250
2251 dest = repo / "config.py"
2252 resolved, remaining = auto_apply(
2253 repo, ["config.py"],
2254 {"config.py": ours_id},
2255 {"config.py": theirs_id},
2256 "code", _FakePlugin(),
2257 )
2258 assert "config.py" in resolved
2259 assert dest.read_bytes() == high_content
2260
2261
2262 # ===========================================================================
2263 # 27. AgentProvenance serialization
2264 # ===========================================================================
2265
2266
2267 class TestAgentProvenance:
2268 def test_human_provenance(self) -> None:
2269 prov = AgentProvenance.human()
2270 assert prov.type == "human"
2271 assert prov.agent_id is None
2272 assert prov.model_id is None
2273 d = prov.to_dict()
2274 assert d["type"] == "human"
2275
2276 def test_agent_provenance(self) -> None:
2277 prov = AgentProvenance.agent("claude-code", "claude-sonnet-4-6")
2278 assert prov.type == "agent"
2279 assert prov.agent_id == "claude-code"
2280 assert prov.model_id == "claude-sonnet-4-6"
2281
2282 def test_round_trip(self) -> None:
2283 prov = AgentProvenance.agent("claude-code", "claude-sonnet-4-6")
2284 d = prov.to_dict()
2285 restored = AgentProvenance.from_dict(d)
2286 assert restored == prov
2287
2288 def test_from_dict_missing_fields_defaults(self) -> None:
2289 restored = AgentProvenance.from_dict({})
2290 assert restored.type == "human"
2291 assert restored.agent_id is None
2292 assert restored.model_id is None
2293
2294 def test_agent_without_model(self) -> None:
2295 prov = AgentProvenance.agent("my-agent")
2296 assert prov.model_id is None
2297 d = prov.to_dict()
2298 restored = AgentProvenance.from_dict(d)
2299 assert restored.agent_id == "my-agent"
2300 assert restored.model_id is None
2301
2302
2303 # ---------------------------------------------------------------------------
2304 # Content-addressed ID format — all harmony IDs must carry sha256: prefix
2305 # ---------------------------------------------------------------------------
2306
2307
2308 class TestHarmonyIdFormat:
2309 """All compute_*_id functions must return sha256:-prefixed IDs (length 71)."""
2310
2311 def _obj_id(self, seed: str) -> str:
2312 return blob_id(seed.encode())
2313
2314 # blob_fingerprint
2315
2316 def test_blob_fingerprint_sha256_prefix(self) -> None:
2317 fp = blob_fingerprint(self._obj_id("a"), self._obj_id("b"))
2318 assert fp.startswith("sha256:"), f"expected sha256: prefix, got {fp!r}"
2319 assert len(fp) == 71
2320
2321 def test_blob_fingerprint_commutative(self) -> None:
2322 a, b = self._obj_id("x"), self._obj_id("y")
2323 assert blob_fingerprint(a, b) == blob_fingerprint(b, a)
2324
2325 def test_blob_fingerprint_deterministic(self) -> None:
2326 a, b = self._obj_id("p"), self._obj_id("q")
2327 assert blob_fingerprint(a, b) == blob_fingerprint(a, b)
2328
2329 def test_blob_fingerprint_differs_by_inputs(self) -> None:
2330 a, b, c = self._obj_id("a"), self._obj_id("b"), self._obj_id("c")
2331 assert blob_fingerprint(a, b) != blob_fingerprint(a, c)
2332
2333 # compute_pattern_id
2334
2335 def test_compute_pattern_id_sha256_prefix(self, tmp_path: pathlib.Path) -> None:
2336 fp = blob_fingerprint(self._obj_id("a"), self._obj_id("b"))
2337 pid = compute_pattern_id("file.py", fp, fp)
2338 assert pid.startswith("sha256:")
2339 assert len(pid) == 71
2340
2341 def test_compute_pattern_id_deterministic(self) -> None:
2342 fp = blob_fingerprint(self._obj_id("a"), self._obj_id("b"))
2343 assert compute_pattern_id("f.py", fp, fp) == compute_pattern_id("f.py", fp, fp)
2344
2345 def test_compute_pattern_id_differs_by_path(self) -> None:
2346 fp = blob_fingerprint(self._obj_id("a"), self._obj_id("b"))
2347 assert compute_pattern_id("a.py", fp, fp) != compute_pattern_id("b.py", fp, fp)
2348
2349 def test_compute_pattern_id_semantic_differs_from_blob(self) -> None:
2350 blob_fp = blob_fingerprint(self._obj_id("a"), self._obj_id("b"))
2351 sem_fp = blob_fingerprint(self._obj_id("c"), self._obj_id("d"))
2352 pid_blob = compute_pattern_id("f.py", blob_fp, blob_fp)
2353 pid_sem = compute_pattern_id("f.py", blob_fp, sem_fp)
2354 assert pid_blob != pid_sem
2355
2356 # compute_resolution_id
2357
2358 def test_compute_resolution_id_sha256_prefix(self) -> None:
2359 fp = blob_fingerprint(self._obj_id("a"), self._obj_id("b"))
2360 pid = compute_pattern_id("f.py", fp, fp)
2361 rid = compute_resolution_id(
2362 pid,
2363 self._obj_id("resolved"),
2364 "ours",
2365 AgentProvenance.human(),
2366 datetime.datetime(2025, 1, 1, tzinfo=datetime.timezone.utc),
2367 )
2368 assert rid.startswith("sha256:")
2369 assert len(rid) == 71
2370
2371 def test_compute_resolution_id_deterministic(self) -> None:
2372 fp = blob_fingerprint(self._obj_id("a"), self._obj_id("b"))
2373 pid = compute_pattern_id("f.py", fp, fp)
2374 ts = datetime.datetime(2025, 1, 1, tzinfo=datetime.timezone.utc)
2375 r1 = compute_resolution_id(pid, self._obj_id("r"), "ours", AgentProvenance.human(), ts)
2376 r2 = compute_resolution_id(pid, self._obj_id("r"), "ours", AgentProvenance.human(), ts)
2377 assert r1 == r2
2378
2379 def test_compute_resolution_id_differs_by_outcome(self) -> None:
2380 fp = blob_fingerprint(self._obj_id("a"), self._obj_id("b"))
2381 pid = compute_pattern_id("f.py", fp, fp)
2382 ts = datetime.datetime(2025, 1, 1, tzinfo=datetime.timezone.utc)
2383 r1 = compute_resolution_id(pid, self._obj_id("r1"), "ours", AgentProvenance.human(), ts)
2384 r2 = compute_resolution_id(pid, self._obj_id("r2"), "ours", AgentProvenance.human(), ts)
2385 assert r1 != r2
2386
2387 # compute_escalation_id
2388
2389 def test_compute_escalation_id_sha256_prefix(self) -> None:
2390 fp = blob_fingerprint(self._obj_id("a"), self._obj_id("b"))
2391 pid = compute_pattern_id("f.py", fp, fp)
2392 eid = compute_escalation_id(pid, "no matching policy")
2393 assert eid.startswith("sha256:")
2394 assert len(eid) == 71
2395
2396 def test_compute_escalation_id_deterministic(self) -> None:
2397 fp = blob_fingerprint(self._obj_id("a"), self._obj_id("b"))
2398 pid = compute_pattern_id("f.py", fp, fp)
2399 e1 = compute_escalation_id(pid, "reason")
2400 e2 = compute_escalation_id(pid, "reason")
2401 assert e1 == e2
2402
2403 def test_compute_escalation_id_differs_by_reason(self) -> None:
2404 fp = blob_fingerprint(self._obj_id("a"), self._obj_id("b"))
2405 pid = compute_pattern_id("f.py", fp, fp)
2406 e1 = compute_escalation_id(pid, "reason A")
2407 e2 = compute_escalation_id(pid, "reason B")
2408 assert e1 != e2
2409
2410 # audit_id
2411
2412 def test_append_audit_id_sha256_prefix(self, tmp_path: pathlib.Path) -> None:
2413 fp = blob_fingerprint(self._obj_id("a"), self._obj_id("b"))
2414 pid = compute_pattern_id("f.py", fp, fp)
2415 append_audit(
2416 tmp_path,
2417 AuditEventType.PATTERN_RECORDED,
2418 AgentProvenance.human(),
2419 pattern_id=pid,
2420 )
2421 entries = list_audit(tmp_path, limit=1)
2422 assert len(entries) == 1
2423 assert entries[0]["audit_id"].startswith("sha256:")
2424 assert len(entries[0]["audit_id"]) == 71
File History 1 commit
sha256:2eaa5d95f9d9383498e76947410a26e5a3ba23d182f339910c424cf88fad412b fix: try fetch/presign before fetch/mpack to avoid Cloudfla… Sonnet 4.6 patch 7 days ago