gabriel / muse public
test_harmony_phase1.py python
1,341 lines 53.3 KB
Raw
1 """Tests for muse/core/harmony.py — Phase 1: Core data model.
2
3 Coverage tiers
4 --------------
5 I Unit — fingerprints, validation, namespaces, dataclasses, condition matching
6 II Integration — all CRUD operations (patterns, resolutions, policies, audit, gc)
7 III End-to-end — full conflict lifecycle from record → resolve → replay → gc
8 IV Stress — 10 k pattern scan, concurrent writes under parallel threads
9 V Data integrity— atomic writes (no temp files left), JSON round-trip, field types
10 VI Security — path traversal, symlink guards, size caps, crafted IDs
11 VII Performance — per-operation timing assertions
12 """
13 from __future__ import annotations
14 from collections.abc import Mapping
15
16 import concurrent.futures
17 import datetime
18 from muse.core.paths import muse_dir
19 from muse.core.types import fake_id
20 import json
21 import os
22 import pathlib
23 import tempfile
24 import threading
25 import time
26 from dataclasses import FrozenInstanceError
27 from typing import Any
28 from unittest import mock
29
30 import pytest
31
32 import muse.core.harmony as h
33 from muse.core.harmony import (
34 AgentProvenance,
35 AuditEvent,
36 AuditEventType,
37 ConflictPattern,
38 ConflictType,
39 Policy,
40 PolicyAction,
41 PolicyCondition,
42 PolicyScope,
43 Resolution,
44 ResolutionProposal,
45 ResolutionStrategy,
46 _MAX_AUDIT_BYTES,
47 _MAX_PATTERN_BYTES,
48 _MAX_POLICY_BYTES,
49 _MAX_RESOLUTION_BYTES,
50 _MAX_SCAN,
51 _condition_matches,
52 append_audit,
53 best_resolution,
54 blob_fingerprint,
55 clear_all,
56 compute_pattern_id,
57 compute_resolution_id,
58 forget_pattern,
59 gc_stale,
60 increment_applied_count,
61 list_audit,
62 list_patterns,
63 list_policies,
64 list_resolutions,
65 load_pattern,
66 load_policy,
67 load_resolution,
68 match_policy,
69 record_pattern,
70 remove_policy,
71 save_policy,
72 save_resolution,
73 )
74
75
76 # ---------------------------------------------------------------------------
77 # Shared fixtures
78 # ---------------------------------------------------------------------------
79
80
@pytest.fixture()
def repo(tmp_path: pathlib.Path) -> pathlib.Path:
    """Provide *tmp_path* as a repo root after creating its muse directory."""
    store_root = muse_dir(tmp_path)
    store_root.mkdir()
    return tmp_path
86
87
88
def _utc_now() -> datetime.datetime:
    """Return the current moment as a timezone-aware UTC datetime."""
    utc = datetime.timezone.utc
    return datetime.datetime.now(tz=utc)
91
92
def _make_pattern(
    path: str = "track.mid",
    domain: str = "midi",
    conflict_type: str = ConflictType.CONTENT,
    ours: str = "ours",
    theirs: str = "theirs",
    description: Mapping[str, object] | None = None,
    recorded_by: str = "claude-code",
) -> ConflictPattern:
    """Construct a ConflictPattern for tests, filling in defaults.

    The *ours* / *theirs* seeds are turned into IDs with ``fake_id``; the
    blob fingerprint of those two IDs is reused as the semantic fingerprint,
    and the pattern ID is computed from the path and both fingerprints.
    """
    left_id, right_id = fake_id(ours), fake_id(theirs)
    fingerprint = blob_fingerprint(left_id, right_id)
    # A falsy description (None or an empty mapping) becomes a fresh dict.
    details = description if description else {}
    return ConflictPattern(
        pattern_id=compute_pattern_id(path, fingerprint, fingerprint),
        path=path,
        domain=domain,
        conflict_type=conflict_type,
        blob_fingerprint=fingerprint,
        semantic_fingerprint=fingerprint,
        ours_id=left_id,
        theirs_id=right_id,
        description=details,
        recorded_at=_utc_now(),
        recorded_by=recorded_by,
    )
121
122
def _make_resolution(
    pattern: ConflictPattern,
    strategy: str = ResolutionStrategy.MANUAL,
    confidence: float = 0.9,
    human_verified: bool = False,
    provenance: AgentProvenance | None = None,
    policy_id: str | None = None,
) -> Resolution:
    """Construct a Resolution for *pattern*, filling in defaults.

    The outcome blob ID is derived from the first eight characters of the
    pattern ID, and the resolution ID is computed from the pattern ID,
    outcome blob, strategy, provenance and timestamp.
    """
    actor = provenance
    if not actor:
        # No provenance supplied: attribute the resolution to a default agent.
        actor = AgentProvenance.agent("claude-code", "claude-sonnet-4-6")
    outcome = fake_id(f"outcome-{pattern.pattern_id[:8]}")
    stamp = _utc_now()
    return Resolution(
        resolution_id=compute_resolution_id(
            pattern.pattern_id, outcome, strategy, actor, stamp
        ),
        pattern_id=pattern.pattern_id,
        strategy=strategy,
        policy_id=policy_id,
        outcome_blob=outcome,
        resolved_by=actor,
        human_verified=human_verified,
        confidence=confidence,
        rationale="Test rationale",
        resolved_at=stamp,
    )
150
151
def _make_policy(
    policy_id: str = "always-prefer-ours",
    scope: str = PolicyScope.REPO,
    action: str = PolicyAction.PREFER_OURS,
    confidence: float = 0.95,
    conflict_type: str | None = None,
    domain: str | None = None,
    path_pattern: str | None = None,
) -> Policy:
    """Construct a Policy for tests, filling in defaults.

    *conflict_type*, *domain* and *path_pattern* populate the policy's
    ``when`` condition; ``escalate_to`` and ``delegate_to`` are always None.
    """
    condition = PolicyCondition(
        conflict_type=conflict_type,
        domain=domain,
        path_pattern=path_pattern,
    )
    created = _utc_now()
    return Policy(
        policy_id=policy_id,
        description="Test policy",
        when=condition,
        action=action,
        confidence=confidence,
        escalate_to=None,
        delegate_to=None,
        scope=scope,
        created_at=created,
        created_by="claude-code",
    )
178
179
180 # ===========================================================================
181 # Tier I — Unit tests
182 # ===========================================================================
183
184
class TestBlobFingerprint:
    """I: blob_fingerprint must be commutative and deterministic."""

    def test_commutativity(self) -> None:
        """Swapping the two sides must not change the fingerprint."""
        a, b = fake_id("A"), fake_id("B")
        assert blob_fingerprint(a, b) == blob_fingerprint(b, a)

    def test_determinism(self) -> None:
        """Two calls with the same arguments return the same value."""
        a, b = fake_id("X"), fake_id("Y")
        fp1 = blob_fingerprint(a, b)
        fp2 = blob_fingerprint(a, b)
        assert fp1 == fp2

    def test_output_is_64_hex(self) -> None:
        """The result is ``sha256:`` followed by exactly 64 lowercase hex digits."""
        a, b = fake_id("p"), fake_id("q")
        fp = blob_fingerprint(a, b)
        assert fp.startswith("sha256:")
        assert len(fp) == 71
        # Prefix and length alone would accept non-hex characters, so verify
        # the digest alphabet explicitly, as the test name promises.
        digest = fp[len("sha256:"):]
        assert len(digest) == 64
        assert set(digest) <= set("0123456789abcdef")

    def test_distinct_pairs_differ(self) -> None:
        """Unrelated ID pairs produce different fingerprints."""
        ab = blob_fingerprint(fake_id("A"), fake_id("B"))
        cd = blob_fingerprint(fake_id("C"), fake_id("D"))
        assert ab != cd

    def test_same_id_both_sides(self) -> None:
        """Passing the same ID for both sides must not crash."""
        a = fake_id("same")
        fp = blob_fingerprint(a, a)
        assert fp.startswith("sha256:")
        assert len(fp) == 71
215
216
class TestComputePatternId:
    """I: compute_pattern_id includes path, so same content → different IDs for different paths."""

    def test_deterministic(self) -> None:
        """Identical inputs yield identical pattern IDs."""
        blob_fp = fake_id("blob")
        sem_fp = fake_id("sem")
        p1 = compute_pattern_id("track.mid", blob_fp, sem_fp)
        p2 = compute_pattern_id("track.mid", blob_fp, sem_fp)
        assert p1 == p2

    def test_path_changes_id(self) -> None:
        """Changing only the path changes the pattern ID."""
        blob_fp = fake_id("blob")
        sem_fp = fake_id("sem")
        p1 = compute_pattern_id("track.mid", blob_fp, sem_fp)
        p2 = compute_pattern_id("drums.mid", blob_fp, sem_fp)
        assert p1 != p2

    def test_blob_changes_id(self) -> None:
        """Changing the fingerprints (same path) changes the pattern ID."""
        # When blob_fp == semantic_fp (no plugin), blob content drives the pattern ID.
        fp_a = fake_id("blobA")
        fp_b = fake_id("blobB")
        p1 = compute_pattern_id("track.mid", fp_a, fp_a)
        p2 = compute_pattern_id("track.mid", fp_b, fp_b)
        assert p1 != p2

    def test_64_hex_output(self) -> None:
        """The ID is ``sha256:`` followed by exactly 64 lowercase hex digits."""
        pid = compute_pattern_id("f.py", fake_id("b"), fake_id("s"))
        assert pid.startswith("sha256:")
        assert len(pid) == 71
        # Prefix and length alone would accept non-hex characters, so verify
        # the digest alphabet explicitly, as the test name promises.
        digest = pid[len("sha256:"):]
        assert len(digest) == 64
        assert set(digest) <= set("0123456789abcdef")
246
247
class TestComputeResolutionId:
    """I: compute_resolution_id is deterministic and encodes actor."""

    def test_deterministic(self) -> None:
        """The same five inputs always produce the same resolution ID."""
        actor = AgentProvenance.agent("claude-code", "claude-sonnet-4-6")
        when = datetime.datetime(2025, 1, 1, tzinfo=datetime.timezone.utc)
        args = (fake_id("pattern"), fake_id("outcome"), ResolutionStrategy.MANUAL, actor, when)
        first = compute_resolution_id(*args)
        second = compute_resolution_id(*args)
        assert first == second

    def test_different_agents_differ(self) -> None:
        """Two different agent IDs give two different resolution IDs."""
        when = datetime.datetime(2025, 1, 1, tzinfo=datetime.timezone.utc)
        pattern_id = fake_id("pattern")
        outcome = fake_id("outcome")
        by_claude = compute_resolution_id(
            pattern_id, outcome, ResolutionStrategy.MANUAL,
            AgentProvenance.agent("claude-code"), when,
        )
        by_codex = compute_resolution_id(
            pattern_id, outcome, ResolutionStrategy.MANUAL,
            AgentProvenance.agent("codex"), when,
        )
        assert by_claude != by_codex

    def test_human_provenance_encodes_as_human(self) -> None:
        """Human provenance is accepted and yields a well-formed ID."""
        when = datetime.datetime(2025, 1, 1, tzinfo=datetime.timezone.utc)
        human = AgentProvenance.human()
        rid = compute_resolution_id(
            fake_id("pattern"), fake_id("outcome"), ResolutionStrategy.MANUAL, human, when
        )
        assert rid.startswith("sha256:")
        assert len(rid) == 71
277
278
class TestValidation:
    """I: _validate_id and _validate_policy_id must reject bad inputs."""

    def test_validate_id_accepts_64_hex(self) -> None:
        """An ID produced by fake_id passes validation."""
        good_id = fake_id("a")
        h._validate_id(good_id)  # no exception

    def test_validate_id_rejects_63_chars(self) -> None:
        """A 63-character string is rejected."""
        candidate = "a" * 63
        with pytest.raises(ValueError):
            h._validate_id(candidate)

    def test_validate_id_rejects_65_chars(self) -> None:
        """A 65-character string is rejected."""
        candidate = "a" * 65
        with pytest.raises(ValueError):
            h._validate_id(candidate)

    def test_validate_id_rejects_uppercase(self) -> None:
        """Uppercase characters are rejected."""
        candidate = "A" * 64
        with pytest.raises(ValueError):
            h._validate_id(candidate)

    def test_validate_id_rejects_path_traversal(self) -> None:
        """A traversal string padded out with 'a' characters is rejected."""
        # Attempt to inject a path traversal via the ID
        crafted = f"../../../etc/passwd{'a' * 45}"
        with pytest.raises(ValueError):
            h._validate_id(crafted)

    def test_validate_id_rejects_empty(self) -> None:
        """The empty string is rejected."""
        with pytest.raises(ValueError):
            h._validate_id("")

    def test_validate_policy_id_accepts_alphanumeric(self) -> None:
        """Letters, digits, hyphen and underscore are accepted."""
        slug = "my-policy_123"
        h._validate_policy_id(slug)  # no exception

    def test_validate_policy_id_rejects_slash(self) -> None:
        """A slash is rejected with a message mentioning 'alphanumeric'."""
        with pytest.raises(ValueError, match="alphanumeric"):
            h._validate_policy_id("bad/policy")

    def test_validate_policy_id_rejects_dot(self) -> None:
        """A dot is rejected."""
        with pytest.raises(ValueError):
            h._validate_policy_id("bad.policy")

    def test_validate_policy_id_rejects_empty(self) -> None:
        """The empty string is rejected."""
        with pytest.raises(ValueError):
            h._validate_policy_id("")

    def test_validate_policy_id_rejects_129_chars(self) -> None:
        """A 129-character policy ID is rejected."""
        over_limit = "a" * 129
        with pytest.raises(ValueError):
            h._validate_policy_id(over_limit)

    def test_validate_policy_id_accepts_128_chars(self) -> None:
        """A 128-character policy ID is accepted."""
        at_limit = "a" * 128
        h._validate_policy_id(at_limit)  # no exception
327
328
class TestNamespaces:
    """I: Open string-constant namespaces are plain strings — plugin extensibility."""

    def test_conflict_type_are_strings(self) -> None:
        """Each checked ConflictType constant is a str."""
        for name in ("CONTENT", "STRUCTURAL", "METADATA", "RELATIONAL", "UNKNOWN"):
            assert isinstance(getattr(ConflictType, name), str)

    def test_resolution_strategy_are_strings(self) -> None:
        """Each checked ResolutionStrategy constant is a str."""
        for name in ("POLICY", "EXACT_REPLAY", "SEMANTIC_PROPOSAL", "MANUAL"):
            assert isinstance(getattr(ResolutionStrategy, name), str)

    def test_policy_action_are_strings(self) -> None:
        """Each checked PolicyAction constant is a str."""
        assert isinstance(PolicyAction.PREFER_OURS, str)
        assert isinstance(PolicyAction.PREFER_THEIRS, str)
        assert isinstance(PolicyAction.ESCALATE, str)
        assert isinstance(PolicyAction.REQUIRE_HUMAN, str)
        assert isinstance(PolicyAction.DELEGATE, str)

    def test_policy_scope_are_strings(self) -> None:
        """Each checked PolicyScope constant is a str."""
        assert isinstance(PolicyScope.WORKSPACE, str)
        assert isinstance(PolicyScope.REPO, str)
        assert isinstance(PolicyScope.DOMAIN, str)
        assert isinstance(PolicyScope.FILE, str)

    def test_audit_event_type_are_strings(self) -> None:
        """Each checked AuditEventType constant is a str."""
        event_names = (
            "PATTERN_RECORDED",
            "RESOLUTION_SAVED",
            "RESOLUTION_APPLIED",
            "PATTERN_FORGOTTEN",
            "POLICY_SAVED",
            "POLICY_REMOVED",
            "GC_RUN",
            "CLEAR_RUN",
        )
        for name in event_names:
            assert isinstance(getattr(AuditEventType, name), str)

    def test_plugin_can_use_custom_conflict_type(self) -> None:
        """A conflict type outside the built-in constants is stored unchanged."""
        # Plugins may add strings at runtime — no import needed
        plugin_type = "note_collision"
        built = _make_pattern(conflict_type=plugin_type)
        assert built.conflict_type == plugin_type
365
366
class TestDataclasses:
    """I: Frozen dataclasses, AgentProvenance constructors, PolicyCondition wildcards."""

    def test_agent_provenance_human(self) -> None:
        """AgentProvenance.human() has type 'human' and no agent or model ID."""
        prov = AgentProvenance.human()
        assert prov.type == "human"
        assert prov.agent_id is None
        assert prov.model_id is None

    def test_agent_provenance_agent(self) -> None:
        """AgentProvenance.agent() records the agent and model IDs it is given."""
        prov = AgentProvenance.agent("claude-code", "claude-sonnet-4-6")
        assert prov.type == "agent"
        assert prov.agent_id == "claude-code"
        assert prov.model_id == "claude-sonnet-4-6"

    def test_agent_provenance_agent_no_model(self) -> None:
        """Omitting the model ID leaves model_id as None."""
        prov = AgentProvenance.agent("codex")
        assert prov.model_id is None

    def test_agent_provenance_frozen(self) -> None:
        """Assigning to an AgentProvenance field raises FrozenInstanceError."""
        prov = AgentProvenance.human()
        with pytest.raises(FrozenInstanceError):
            prov.type = "agent"  # type: ignore[misc]

    def test_policy_condition_all_none_wildcard(self) -> None:
        """A default PolicyCondition has every field set to None."""
        wildcard = PolicyCondition()
        assert wildcard.conflict_type is None
        assert wildcard.domain is None
        assert wildcard.path_pattern is None
        assert wildcard.min_confidence is None

    def test_policy_condition_frozen(self) -> None:
        """Assigning to a PolicyCondition field raises FrozenInstanceError."""
        condition = PolicyCondition(conflict_type=ConflictType.CONTENT)
        with pytest.raises(FrozenInstanceError):
            condition.conflict_type = ConflictType.STRUCTURAL  # type: ignore[misc]

    def test_conflict_pattern_frozen(self) -> None:
        """Assigning to a ConflictPattern field raises FrozenInstanceError."""
        pattern = _make_pattern()
        with pytest.raises(FrozenInstanceError):
            pattern.path = "malicious.mid"  # type: ignore[misc]

    def test_resolution_frozen(self) -> None:
        """Assigning to a Resolution field raises FrozenInstanceError."""
        resolution = _make_resolution(_make_pattern())
        with pytest.raises(FrozenInstanceError):
            resolution.confidence = 0.0  # type: ignore[misc]

    def test_resolution_proposal_defaults(self) -> None:
        """Optional ResolutionProposal fields default to None / False."""
        proposal = ResolutionProposal(
            pattern_id=fake_id("p"),
            strategy=ResolutionStrategy.SEMANTIC_PROPOSAL,
            proposed_action=PolicyAction.PREFER_OURS,
            confidence=0.7,
            rationale="fuzzy match",
        )
        assert proposal.policy_id is None
        assert proposal.similar_pattern_id is None
        assert proposal.similarity is None
        assert proposal.requires_confirmation is False
426
427
class TestConditionMatching:
    """I: _condition_matches and match_policy first-match-wins semantics."""

    def _pattern(self, **kwargs: str | int | float | bool | None) -> ConflictPattern:
        """Forward keyword overrides to the module-level pattern builder."""
        return _make_pattern(**kwargs)

    def test_all_none_matches_anything(self) -> None:
        """A condition with no fields set matches a default pattern."""
        wildcard = PolicyCondition()
        assert _condition_matches(wildcard, self._pattern()) is True

    def test_conflict_type_match(self) -> None:
        """conflict_type must equal the pattern's conflict type."""
        condition = PolicyCondition(conflict_type=ConflictType.CONTENT)
        same_type = self._pattern(conflict_type=ConflictType.CONTENT)
        other_type = self._pattern(conflict_type=ConflictType.METADATA)
        assert _condition_matches(condition, same_type) is True
        assert _condition_matches(condition, other_type) is False

    def test_domain_match(self) -> None:
        """domain must equal the pattern's domain."""
        condition = PolicyCondition(domain="midi")
        midi_pattern = self._pattern(domain="midi")
        code_pattern = self._pattern(domain="code")
        assert _condition_matches(condition, midi_pattern) is True
        assert _condition_matches(condition, code_pattern) is False

    def test_path_pattern_glob(self) -> None:
        """A suffix glob matches by file extension."""
        condition = PolicyCondition(path_pattern="*.mid")
        hit = self._pattern(path="track.mid")
        miss = self._pattern(path="src/main.py")
        assert _condition_matches(condition, hit) is True
        assert _condition_matches(condition, miss) is False

    def test_path_pattern_prefix_glob(self) -> None:
        """A prefix glob matches by leading directory."""
        condition = PolicyCondition(path_pattern="audio/*")
        hit = self._pattern(path="audio/kick.mid")
        miss = self._pattern(path="video/clip.mp4")
        assert _condition_matches(condition, hit) is True
        assert _condition_matches(condition, miss) is False

    def test_all_conditions_must_match(self) -> None:
        """Every set field has to match; one mismatch fails the condition."""
        condition = PolicyCondition(conflict_type=ConflictType.CONTENT, domain="midi")
        expectations = (
            (self._pattern(conflict_type=ConflictType.CONTENT, domain="midi"), True),
            (self._pattern(conflict_type=ConflictType.CONTENT, domain="code"), False),
            (self._pattern(conflict_type=ConflictType.METADATA, domain="midi"), False),
        )
        for candidate, expected in expectations:
            assert _condition_matches(condition, candidate) is expected

    def test_match_policy_first_match_wins(self) -> None:
        """With two matching policies, the first in the list is returned."""
        first = _make_policy("policy-a", scope=PolicyScope.WORKSPACE, conflict_type=ConflictType.CONTENT)
        second = _make_policy("policy-b", scope=PolicyScope.REPO, conflict_type=ConflictType.CONTENT)
        target = self._pattern(conflict_type=ConflictType.CONTENT)
        winner = match_policy([first, second], target)
        assert winner is not None
        assert winner.policy_id == "policy-a"

    def test_match_policy_no_match_returns_none(self) -> None:
        """match_policy returns None when no policy matches."""
        structural_only = _make_policy("p", conflict_type=ConflictType.STRUCTURAL)
        target = self._pattern(conflict_type=ConflictType.CONTENT)
        assert match_policy([structural_only], target) is None

    def test_match_policy_empty_list(self) -> None:
        """match_policy returns None for an empty policy list."""
        assert match_policy([], self._pattern()) is None

    def test_min_confidence_not_evaluated_here(self) -> None:
        """Setting only min_confidence does not stop a condition from matching."""
        # min_confidence is an engine-level filter, not evaluated by _condition_matches
        condition = PolicyCondition(min_confidence=0.99)
        # Should still match since no other fields constrain the pattern
        assert _condition_matches(condition, self._pattern()) is True
489
490
491 # ===========================================================================
492 # Tier II — Integration tests
493 # ===========================================================================
494
495
class TestPatternCRUD:
    """II: record_pattern, load_pattern, list_patterns, forget_pattern, clear_all."""

    def test_record_and_load(self, repo: pathlib.Path) -> None:
        """A recorded pattern loads back with the same identifying fields."""
        original = _make_pattern()
        record_pattern(repo, original)
        restored = load_pattern(repo, original.pattern_id)
        assert restored is not None
        assert restored.pattern_id == original.pattern_id
        assert restored.path == original.path
        assert restored.domain == original.domain
        assert restored.conflict_type == original.conflict_type

    def test_record_is_idempotent(self, repo: pathlib.Path) -> None:
        """Recording twice returns the same ID and leaves one pattern.json."""
        pattern = _make_pattern()
        record_pattern(repo, pattern)
        # Second call should not raise and should return same ID
        returned_id = record_pattern(repo, pattern)
        assert returned_id == pattern.pattern_id
        # Only one pattern.json should exist
        entry_dir = h.pattern_dir(repo, pattern.pattern_id)
        found = list(entry_dir.glob("pattern.json"))
        assert found == [entry_dir / "pattern.json"]

    def test_load_nonexistent_returns_none(self, repo: pathlib.Path) -> None:
        """Loading an ID that was never recorded returns None."""
        assert load_pattern(repo, "a" * 64) is None

    def test_load_invalid_id_returns_none(self, repo: pathlib.Path) -> None:
        """Loading a malformed ID returns None."""
        assert load_pattern(repo, "not-a-hex-id") is None

    def test_list_patterns_empty(self, repo: pathlib.Path) -> None:
        """An empty store lists no patterns."""
        assert list_patterns(repo) == []

    def test_list_patterns_multiple(self, repo: pathlib.Path) -> None:
        """Both recorded patterns are listed."""
        first = _make_pattern(path="a.mid", ours="oa", theirs="ta")
        second = _make_pattern(path="b.mid", ours="ob", theirs="tb")
        for pattern in (first, second):
            record_pattern(repo, pattern)
        listed = list_patterns(repo)
        assert len(listed) == 2
        listed_ids = {entry.pattern_id for entry in listed}
        assert {first.pattern_id, second.pattern_id} == listed_ids

    def test_list_patterns_sorted_newest_first(self, repo: pathlib.Path) -> None:
        """The pattern with the later recorded_at is listed first."""
        import dataclasses

        utc = datetime.timezone.utc
        # Force newer to be newer by manipulating recorded_at
        older = dataclasses.replace(
            _make_pattern(path="old.mid", ours="oa", theirs="ta"),
            recorded_at=datetime.datetime(2024, 1, 1, tzinfo=utc),
        )
        newer = dataclasses.replace(
            _make_pattern(path="new.mid", ours="ob", theirs="tb"),
            recorded_at=datetime.datetime(2025, 1, 1, tzinfo=utc),
        )
        record_pattern(repo, older)
        record_pattern(repo, newer)
        listed = list_patterns(repo)
        assert listed[0].pattern_id == newer.pattern_id

    def test_forget_pattern_removes_entry(self, repo: pathlib.Path) -> None:
        """Forgetting a recorded pattern returns True and makes it unloadable."""
        pattern = _make_pattern()
        record_pattern(repo, pattern)
        assert forget_pattern(repo, pattern.pattern_id) is True
        assert load_pattern(repo, pattern.pattern_id) is None

    def test_forget_nonexistent_returns_false(self, repo: pathlib.Path) -> None:
        """Forgetting an ID that was never recorded returns False."""
        assert forget_pattern(repo, "a" * 64) is False

    def test_forget_invalid_id_returns_false(self, repo: pathlib.Path) -> None:
        """Forgetting a malformed ID returns False."""
        assert forget_pattern(repo, "../traversal") is False

    def test_forget_also_removes_resolutions(self, repo: pathlib.Path) -> None:
        """Forgetting a pattern also removes its resolutions directory."""
        pattern = _make_pattern()
        record_pattern(repo, pattern)
        save_resolution(repo, _make_resolution(pattern))
        forget_pattern(repo, pattern.pattern_id)
        # Resolution directory should be gone
        resolutions_path = h._resolutions_dir(repo, pattern.pattern_id)
        assert not resolutions_path.exists()

    def test_clear_all_removes_all(self, repo: pathlib.Path) -> None:
        """clear_all reports the number removed and empties the store."""
        seeded = [
            _make_pattern(path=f"f{i}.mid", ours=f"o{i}", theirs=f"t{i}")
            for i in range(5)
        ]
        for pattern in seeded:
            record_pattern(repo, pattern)
        assert clear_all(repo) == 5
        assert list_patterns(repo) == []

    def test_clear_all_empty_store(self, repo: pathlib.Path) -> None:
        """clear_all on an empty store returns 0."""
        assert clear_all(repo) == 0

    def test_record_pattern_invalid_id_raises(self, repo: pathlib.Path) -> None:
        """Recording a pattern with a malformed ID raises ValueError."""
        import dataclasses

        malformed = dataclasses.replace(_make_pattern(), pattern_id="bad-id")
        with pytest.raises(ValueError):
            record_pattern(repo, malformed)
590
591
class TestResolutionCRUD:
    """II: save_resolution, load_resolution, list_resolutions, increment_applied_count, best_resolution."""

    def test_save_and_load(self, repo: pathlib.Path) -> None:
        """A saved resolution loads back with the same identifying fields."""
        pattern = _make_pattern()
        record_pattern(repo, pattern)
        res = _make_resolution(pattern)
        save_resolution(repo, res)
        loaded = load_resolution(repo, pattern.pattern_id, res.resolution_id)
        assert loaded is not None
        assert loaded.resolution_id == res.resolution_id
        assert loaded.pattern_id == pattern.pattern_id
        assert loaded.strategy == res.strategy
        assert loaded.confidence == res.confidence

    def test_save_is_idempotent(self, repo: pathlib.Path) -> None:
        """Saving the same resolution twice leaves exactly one listed."""
        pattern = _make_pattern()
        record_pattern(repo, pattern)
        res = _make_resolution(pattern)
        save_resolution(repo, res)
        save_resolution(repo, res)  # second call is no-op
        assert len(list_resolutions(repo, pattern.pattern_id)) == 1

    def test_save_requires_parent_pattern(self, repo: pathlib.Path) -> None:
        """Saving a resolution whose pattern was never recorded raises."""
        pattern = _make_pattern()
        res = _make_resolution(pattern)
        with pytest.raises(FileNotFoundError, match="No harmony pattern"):
            save_resolution(repo, res)

    def test_load_nonexistent_returns_none(self, repo: pathlib.Path) -> None:
        """Loading an unsaved resolution returns None."""
        assert load_resolution(repo, "a" * 64, "b" * 64) is None

    def test_load_invalid_ids_return_none(self, repo: pathlib.Path) -> None:
        """A malformed pattern ID or resolution ID yields None."""
        assert load_resolution(repo, "bad", "b" * 64) is None
        assert load_resolution(repo, "a" * 64, "bad") is None

    def test_list_resolutions_empty(self, repo: pathlib.Path) -> None:
        """A pattern with no resolutions lists an empty result."""
        pattern = _make_pattern()
        record_pattern(repo, pattern)
        assert list_resolutions(repo, pattern.pattern_id) == []

    def test_list_resolutions_sorted_by_quality(self, repo: pathlib.Path) -> None:
        """human_verified > confidence > applied_count (desc)."""
        import dataclasses

        pattern = _make_pattern()
        record_pattern(repo, pattern)

        # Build distinct resolutions (different outcomes and IDs) so all three
        # are stored side by side.
        low_conf = dataclasses.replace(
            _make_resolution(pattern, confidence=0.3),
            outcome_blob=fake_id("low_outcome"),
            resolution_id=fake_id("low_res"),
        )
        high_conf = dataclasses.replace(
            _make_resolution(pattern, confidence=0.9),
            outcome_blob=fake_id("high_outcome"),
            resolution_id=fake_id("high_res"),
        )
        verified = dataclasses.replace(
            _make_resolution(pattern, confidence=0.5, human_verified=True),
            outcome_blob=fake_id("ver_outcome"),
            resolution_id=fake_id("ver_res"),
        )

        for r in (low_conf, high_conf, verified):
            save_resolution(repo, r)

        results = list_resolutions(repo, pattern.pattern_id)
        # Pin the complete ordering: checking only the first and last element
        # would pass even if an entry were missing or duplicated.
        assert [r.resolution_id for r in results] == [
            verified.resolution_id,   # human_verified first
            high_conf.resolution_id,  # then higher confidence
            low_conf.resolution_id,   # lowest confidence last
        ]

    def test_increment_applied_count(self, repo: pathlib.Path) -> None:
        """One increment returns True and stores applied_count == 1."""
        pattern = _make_pattern()
        record_pattern(repo, pattern)
        res = _make_resolution(pattern)
        save_resolution(repo, res)
        assert increment_applied_count(repo, pattern.pattern_id, res.resolution_id) is True
        loaded = load_resolution(repo, pattern.pattern_id, res.resolution_id)
        assert loaded is not None
        assert loaded.applied_count == 1

    def test_increment_applied_count_multiple_times(self, repo: pathlib.Path) -> None:
        """Five increments each succeed and accumulate to applied_count == 5."""
        pattern = _make_pattern()
        record_pattern(repo, pattern)
        res = _make_resolution(pattern)
        save_resolution(repo, res)
        for _ in range(5):
            # Check every call so a failed increment is reported where it
            # happens rather than as a wrong final count.
            assert increment_applied_count(repo, pattern.pattern_id, res.resolution_id) is True
        loaded = load_resolution(repo, pattern.pattern_id, res.resolution_id)
        assert loaded is not None
        assert loaded.applied_count == 5

    def test_increment_nonexistent_returns_false(self, repo: pathlib.Path) -> None:
        """Incrementing an unsaved resolution returns False."""
        assert increment_applied_count(repo, "a" * 64, "b" * 64) is False

    def test_best_resolution_returns_highest_quality(self, repo: pathlib.Path) -> None:
        """Of two unverified resolutions, the higher-confidence one is best."""
        import dataclasses

        pattern = _make_pattern()
        record_pattern(repo, pattern)

        r1 = dataclasses.replace(
            _make_resolution(pattern, confidence=0.5),
            outcome_blob=fake_id("r1ob"),
            resolution_id=fake_id("r1id"),
        )
        r2 = dataclasses.replace(
            _make_resolution(pattern, confidence=0.9),
            outcome_blob=fake_id("r2ob"),
            resolution_id=fake_id("r2id"),
        )
        save_resolution(repo, r1)
        save_resolution(repo, r2)

        best = best_resolution(repo, pattern.pattern_id)
        assert best is not None
        assert best.resolution_id == r2.resolution_id

    def test_best_resolution_none_when_no_resolutions(self, repo: pathlib.Path) -> None:
        """best_resolution returns None when the pattern has no resolutions."""
        pattern = _make_pattern()
        record_pattern(repo, pattern)
        assert best_resolution(repo, pattern.pattern_id) is None
711
712
class TestPolicyCRUD:
    """II: save_policy, load_policy, list_policies scope-sorted, remove_policy."""

    def test_save_and_load(self, repo: pathlib.Path) -> None:
        """A saved policy loads back with the same ID, action and scope."""
        original = _make_policy()
        save_policy(repo, original)
        restored = load_policy(repo, original.policy_id)
        assert restored is not None
        assert restored.policy_id == original.policy_id
        assert restored.action == original.action
        assert restored.scope == original.scope

    def test_save_overwrites_existing(self, repo: pathlib.Path) -> None:
        """Saving again under the same policy ID replaces the stored action."""
        import dataclasses

        initial = _make_policy(action=PolicyAction.PREFER_OURS)
        save_policy(repo, initial)
        revised = dataclasses.replace(initial, action=PolicyAction.PREFER_THEIRS)
        save_policy(repo, revised)
        restored = load_policy(repo, initial.policy_id)
        assert restored is not None
        assert restored.action == PolicyAction.PREFER_THEIRS

    def test_load_nonexistent_returns_none(self, repo: pathlib.Path) -> None:
        """Loading an unsaved policy returns None."""
        assert load_policy(repo, "missing-policy") is None

    def test_load_invalid_id_returns_none(self, repo: pathlib.Path) -> None:
        """Loading a malformed policy ID returns None."""
        assert load_policy(repo, "bad/policy/id") is None

    def test_list_policies_empty(self, repo: pathlib.Path) -> None:
        """An empty store lists no policies."""
        assert list_policies(repo) == []

    def test_list_policies_scope_order(self, repo: pathlib.Path) -> None:
        """workspace → repo → domain → file regardless of insertion order."""
        # Saved deliberately out of scope order: file, workspace, domain, repo.
        unordered = (
            _make_policy("file-p", scope=PolicyScope.FILE),
            _make_policy("workspace-p", scope=PolicyScope.WORKSPACE),
            _make_policy("domain-p", scope=PolicyScope.DOMAIN),
            _make_policy("repo-p", scope=PolicyScope.REPO),
        )
        for policy in unordered:
            save_policy(repo, policy)
        listed_scopes = [entry.scope for entry in list_policies(repo)]
        workspace_at = listed_scopes.index(PolicyScope.WORKSPACE)
        repo_at = listed_scopes.index(PolicyScope.REPO)
        domain_at = listed_scopes.index(PolicyScope.DOMAIN)
        file_at = listed_scopes.index(PolicyScope.FILE)
        assert workspace_at < repo_at
        assert repo_at < domain_at
        assert domain_at < file_at

    def test_remove_policy_returns_true(self, repo: pathlib.Path) -> None:
        """Removing a saved policy returns True and makes it unloadable."""
        policy = _make_policy()
        save_policy(repo, policy)
        assert remove_policy(repo, policy.policy_id) is True
        assert load_policy(repo, policy.policy_id) is None

    def test_remove_nonexistent_returns_false(self, repo: pathlib.Path) -> None:
        """Removing an unsaved policy returns False."""
        assert remove_policy(repo, "no-such-policy") is False

    def test_remove_invalid_id_returns_false(self, repo: pathlib.Path) -> None:
        """Removing a malformed policy ID returns False."""
        assert remove_policy(repo, "bad/id") is False

    def test_save_invalid_id_raises(self, repo: pathlib.Path) -> None:
        """Saving a policy with a malformed ID raises ValueError."""
        import dataclasses

        malformed = dataclasses.replace(_make_policy(), policy_id="bad/id")
        with pytest.raises(ValueError):
            save_policy(repo, malformed)

    def test_condition_round_trips(self, repo: pathlib.Path) -> None:
        """The policy's ``when`` condition fields survive save and load."""
        original = _make_policy(conflict_type=ConflictType.CONTENT, domain="midi", path_pattern="*.mid")
        save_policy(repo, original)
        restored = load_policy(repo, original.policy_id)
        assert restored is not None
        condition = restored.when
        assert condition.conflict_type == ConflictType.CONTENT
        assert condition.domain == "midi"
        assert condition.path_pattern == "*.mid"
785
786
class TestAuditLog:
    """II: append_audit, list_audit sorted newest-first."""

    def test_append_and_list(self, repo: pathlib.Path) -> None:
        """One appended event is listed with its event type."""
        who = AgentProvenance.agent("claude-code")
        append_audit(repo, AuditEventType.PATTERN_RECORDED, who, pattern_id="a" * 64)
        log = list_audit(repo)
        assert len(log) == 1
        assert log[0]["event_type"] == AuditEventType.PATTERN_RECORDED

    def test_entries_sorted_newest_first(self, repo: pathlib.Path) -> None:
        """The first listed entry corresponds to the last filename in sort order."""
        who = AgentProvenance.human()
        for index in range(3):
            append_audit(repo, AuditEventType.GC_RUN, who, metadata={"i": index})
            time.sleep(0.01)  # slight delay so filenames differ
        log = list_audit(repo)
        assert len(log) == 3
        # Filenames encode date+content-id — sorted descending means newest at [0]
        descending_names = sorted(
            (entry.name for entry in h.audit_dir(repo).iterdir()),
            reverse=True,
        )
        top_name = descending_names[0]
        # audit_id is "sha256:<hex>"; filename embeds 12 hex chars starting at index 7
        id_fragment = log[0]["audit_id"][7:19]
        assert id_fragment in top_name

    def test_list_audit_empty(self, repo: pathlib.Path) -> None:
        """An empty audit log lists no entries."""
        assert list_audit(repo) == []

    def test_limit_respected(self, repo: pathlib.Path) -> None:
        """list_audit returns no more entries than *limit*."""
        who = AgentProvenance.human()
        for _ in range(10):
            append_audit(repo, AuditEventType.GC_RUN, who)
        limited = list_audit(repo, limit=3)
        assert len(limited) == 3

    def test_audit_fields_present(self, repo: pathlib.Path) -> None:
        """A stored entry carries the event, IDs, actor, metadata and bookkeeping keys."""
        who = AgentProvenance.agent("claude-code", "claude-sonnet-4-6")
        append_audit(
            repo,
            AuditEventType.RESOLUTION_SAVED,
            who,
            pattern_id="a" * 64,
            resolution_id="b" * 64,
            metadata={"extra": "data"},
        )
        record = list_audit(repo)[0]
        assert record["event_type"] == AuditEventType.RESOLUTION_SAVED
        assert record["pattern_id"] == "a" * 64
        assert record["resolution_id"] == "b" * 64
        assert record["acted_by"]["agent_id"] == "claude-code"
        assert record["metadata"]["extra"] == "data"
        assert "audit_id" in record
        assert "occurred_at" in record
837
838
class TestGcStale:
    """II: gc_stale keeps resolved patterns and removes old unresolved ones."""

    def test_gc_removes_old_unresolved(self, repo: pathlib.Path) -> None:
        """A pattern from 2020 with no resolution is collected."""
        import dataclasses

        long_ago = datetime.datetime(2020, 1, 1, tzinfo=datetime.timezone.utc)
        stale = dataclasses.replace(_make_pattern(), recorded_at=long_ago)
        record_pattern(repo, stale)
        collected = gc_stale(repo, age_days=1)
        assert collected == 1
        assert load_pattern(repo, stale.pattern_id) is None

    def test_gc_keeps_resolved_pattern(self, repo: pathlib.Path) -> None:
        """A pattern from 2020 that has a resolution is not collected."""
        import dataclasses

        long_ago = datetime.datetime(2020, 1, 1, tzinfo=datetime.timezone.utc)
        resolved = dataclasses.replace(_make_pattern(), recorded_at=long_ago)
        record_pattern(repo, resolved)
        save_resolution(repo, _make_resolution(resolved))
        collected = gc_stale(repo, age_days=1)
        assert collected == 0
        assert load_pattern(repo, resolved.pattern_id) is not None

    def test_gc_keeps_recent_unresolved(self, repo: pathlib.Path) -> None:
        """A pattern recorded just now is not collected at age_days=90."""
        fresh = _make_pattern()  # recorded_at = now
        record_pattern(repo, fresh)
        collected = gc_stale(repo, age_days=90)
        assert collected == 0

    def test_gc_empty_store(self, repo: pathlib.Path) -> None:
        """gc_stale on an empty store returns 0."""
        assert gc_stale(repo, age_days=1) == 0
872
873
874 # ===========================================================================
875 # Tier III — End-to-end lifecycle
876 # ===========================================================================
877
878
class TestFullLifecycle:
    """III: record → save_resolution → best_resolution → increment → gc leaves it alone."""

    def test_complete_lifecycle(self, repo: pathlib.Path) -> None:
        """Walk one conflict through every stage and check the state after each."""
        import dataclasses

        # Stage 1: record the conflict pattern.
        pattern = _make_pattern(
            path="tracks/lead.mid",
            domain="midi",
            conflict_type=ConflictType.CONTENT,
        )
        pid = record_pattern(repo, pattern)
        assert pid == pattern.pattern_id
        assert load_pattern(repo, pid) is not None

        # Stage 2: attach a resolution.
        resolution = _make_resolution(
            pattern, strategy=ResolutionStrategy.MANUAL, confidence=0.85
        )
        save_resolution(repo, resolution)

        # Stage 3: the saved resolution is the best one available.
        winner = best_resolution(repo, pid)
        assert winner is not None
        assert winner.resolution_id == resolution.resolution_id
        assert winner.confidence == pytest.approx(0.85)

        # Stage 4: replay twice; applied_count reflects both increments.
        for _replay in range(2):
            increment_applied_count(repo, pid, resolution.resolution_id)
        after_replay = load_resolution(repo, pid, resolution.resolution_id)
        assert after_replay is not None
        assert after_replay.applied_count == 2

        # Stage 5: gc must not remove a pattern that has a resolution.
        # record_pattern is idempotent, so recorded_at cannot be changed through
        # it; pattern.json is overwritten directly to backdate the pattern.
        backdated = dataclasses.replace(
            pattern,
            recorded_at=datetime.datetime(2020, 1, 1, tzinfo=datetime.timezone.utc),
        )
        payload = json.dumps(h._pattern_to_dict(backdated), indent=2)
        (h.pattern_dir(repo, pid) / "pattern.json").write_text(payload, encoding="utf-8")
        assert gc_stale(repo, age_days=1) == 0  # protected by its resolution
        assert load_pattern(repo, pid) is not None

    def test_policy_fires_on_matching_pattern(self, repo: pathlib.Path) -> None:
        """A domain-scoped policy is selected for a pattern in the same domain."""
        save_policy(
            repo,
            _make_policy(
                "midi-prefer-ours",
                scope=PolicyScope.DOMAIN,
                action=PolicyAction.PREFER_OURS,
                domain="midi",
            ),
        )

        pattern = _make_pattern(domain="midi", conflict_type=ConflictType.CONTENT)
        record_pattern(repo, pattern)

        hit = match_policy(list_policies(repo), pattern)
        assert hit is not None
        assert hit.policy_id == "midi-prefer-ours"
        assert hit.action == PolicyAction.PREFER_OURS

    def test_policy_does_not_fire_wrong_domain(self, repo: pathlib.Path) -> None:
        """A midi-only policy yields no match for a pattern in the code domain."""
        save_policy(repo, _make_policy("midi-only", domain="midi"))
        code_pattern = _make_pattern(domain="code")
        record_pattern(repo, code_pattern)
        assert match_policy(list_policies(repo), code_pattern) is None

    def test_audit_trail_through_lifecycle(self, repo: pathlib.Path) -> None:
        """Both audit events appended during the lifecycle appear in list_audit."""
        who = AgentProvenance.agent("claude-code", "claude-sonnet-4-6")
        pattern = _make_pattern()
        pid = record_pattern(repo, pattern)
        append_audit(repo, AuditEventType.PATTERN_RECORDED, who, pattern_id=pid)

        resolution = _make_resolution(pattern)
        save_resolution(repo, resolution)
        append_audit(
            repo,
            AuditEventType.RESOLUTION_SAVED,
            who,
            pattern_id=pid,
            resolution_id=resolution.resolution_id,
        )

        seen_types = [item["event_type"] for item in list_audit(repo)]
        for expected in (
            AuditEventType.RESOLUTION_SAVED,
            AuditEventType.PATTERN_RECORDED,
        ):
            assert expected in seen_types
967
968
969 # ===========================================================================
970 # Tier IV — Stress tests
971 # ===========================================================================
972
973
class TestStress:
    """IV: 100-pattern scan, concurrent record_pattern / increment_applied_count, scan cap."""

    def test_100_patterns_scan(self, repo: pathlib.Path) -> None:
        """Store 100 distinct patterns and check list_patterns returns all of them."""
        total = 100
        for idx in range(total):
            record_pattern(
                repo,
                _make_pattern(path=f"f{idx}.mid", ours=f"o{idx}", theirs=f"t{idx}"),
            )
        assert len(list_patterns(repo)) == total

    def test_concurrent_record_pattern_no_corruption(self, repo: pathlib.Path) -> None:
        """Record 20 patterns through a 10-worker pool; every one must load afterwards."""
        batch = [
            _make_pattern(path=f"concurrent{idx}.mid", ours=f"co{idx}", theirs=f"ct{idx}")
            for idx in range(20)
        ]
        errors: list[Exception] = []

        def record_one(item: ConflictPattern) -> None:
            # Collect failures instead of raising so every worker runs to completion.
            try:
                record_pattern(repo, item)
            except Exception as exc:
                errors.append(exc)

        with concurrent.futures.ThreadPoolExecutor(max_workers=10) as pool:
            pending = [pool.submit(record_one, item) for item in batch]
            concurrent.futures.wait(pending)

        assert errors == [], f"Thread errors: {errors}"
        for item in batch:
            assert load_pattern(repo, item.pattern_id) is not None

    def test_concurrent_increment_applied_count(self, repo: pathlib.Path) -> None:
        """Run 20 concurrent increments: no crash, and the final count is at least 1.

        ``increment_applied_count`` is a read-modify-write cycle. ``os.replace``
        makes each individual write atomic but does not serialise the whole
        cycle, so under heavy concurrency some updates may be lost (last writer
        wins). What is guaranteed: no exception, an always-valid file, and a
        count of at least 1.
        """
        pattern = _make_pattern()
        record_pattern(repo, pattern)
        resolution = _make_resolution(pattern)
        save_resolution(repo, resolution)

        guard = threading.Lock()
        errors: list[Exception] = []

        def bump() -> None:
            try:
                increment_applied_count(repo, pattern.pattern_id, resolution.resolution_id)
            except Exception as exc:
                with guard:
                    errors.append(exc)

        workers = [threading.Thread(target=bump) for _ in range(20)]
        for thread in workers:
            thread.start()
        for thread in workers:
            thread.join()

        assert errors == [], f"Thread errors: {errors}"
        final = load_resolution(repo, pattern.pattern_id, resolution.resolution_id)
        assert final is not None
        # At least one increment landed, and the file still parses.
        assert final.applied_count >= 1

    def test_list_patterns_scan_cap_does_not_crash(
        self, repo: pathlib.Path, monkeypatch: pytest.MonkeyPatch
    ) -> None:
        """With _MAX_SCAN patched to 5, list_patterns truncates instead of crashing."""
        import muse.core.harmony.patterns as _hpat

        # Patch the name on the package and on the submodule alike.
        for module in (h, _hpat):
            monkeypatch.setattr(module, "_MAX_SCAN", 5)
        for idx in range(10):
            record_pattern(
                repo,
                _make_pattern(path=f"s{idx}.mid", ours=f"o{idx}", theirs=f"t{idx}"),
            )
        assert len(list_patterns(repo)) <= 5
1053
1054
1055 # ===========================================================================
1056 # Tier V — Data integrity
1057 # ===========================================================================
1058
1059
class TestDataIntegrity:
    """V: atomic writes leave no temp files; JSON round-trips keep values and types."""

    def test_no_temp_files_after_record_pattern(self, repo: pathlib.Path) -> None:
        """record_pattern leaves no ``.harmony-tmp-*`` file in the pattern directory."""
        pattern = _make_pattern()
        record_pattern(repo, pattern)
        entry_dir = h.pattern_dir(repo, pattern.pattern_id)
        assert list(entry_dir.glob(".harmony-tmp-*")) == []

    def test_no_temp_files_after_save_resolution(self, repo: pathlib.Path) -> None:
        """save_resolution leaves no ``.harmony-tmp-*`` file in the resolutions directory."""
        pattern = _make_pattern()
        record_pattern(repo, pattern)
        save_resolution(repo, _make_resolution(pattern))
        res_dir = h._resolutions_dir(repo, pattern.pattern_id)
        assert list(res_dir.glob(".harmony-tmp-*")) == []

    def test_no_temp_files_after_save_policy(self, repo: pathlib.Path) -> None:
        """save_policy leaves no ``.harmony-tmp-*`` file in the policies directory."""
        save_policy(repo, _make_policy())
        assert list(h.policies_dir(repo).glob(".harmony-tmp-*")) == []

    def test_pattern_json_round_trip(self, repo: pathlib.Path) -> None:
        """Every field set on a pattern is read back unchanged by load_pattern."""
        pattern = _make_pattern(
            path="round/trip.mid",
            domain="midi",
            conflict_type=ConflictType.STRUCTURAL,
            description={"beats": 4, "key": "Cmaj"},
        )
        record_pattern(repo, pattern)
        loaded = load_pattern(repo, pattern.pattern_id)
        assert loaded is not None
        assert loaded.path == "round/trip.mid"
        assert loaded.domain == "midi"
        assert loaded.conflict_type == ConflictType.STRUCTURAL
        assert loaded.description == {"beats": 4, "key": "Cmaj"}
        assert loaded.ours_id == pattern.ours_id
        assert loaded.theirs_id == pattern.theirs_id

    def test_resolution_json_round_trip(self, repo: pathlib.Path) -> None:
        """Confidence, verification flag and provenance survive save + load."""
        pattern = _make_pattern()
        record_pattern(repo, pattern)
        prov = AgentProvenance.agent("claude-code", "claude-sonnet-4-6")
        res = _make_resolution(
            pattern, provenance=prov, confidence=0.77, human_verified=True
        )
        save_resolution(repo, res)
        loaded = load_resolution(repo, pattern.pattern_id, res.resolution_id)
        assert loaded is not None
        assert loaded.confidence == pytest.approx(0.77)
        assert loaded.human_verified is True
        assert loaded.resolved_by.type == "agent"
        assert loaded.resolved_by.agent_id == "claude-code"
        assert loaded.resolved_by.model_id == "claude-sonnet-4-6"

    def test_policy_json_round_trip(self, repo: pathlib.Path) -> None:
        """Scope, action, confidence and the ``when`` condition survive save + load."""
        policy = _make_policy(
            "round-trip-policy",
            scope=PolicyScope.WORKSPACE,
            action=PolicyAction.ESCALATE,
            confidence=0.6,
            conflict_type=ConflictType.RELATIONAL,
            domain="code",
            path_pattern="src/**",
        )
        save_policy(repo, policy)
        loaded = load_policy(repo, policy.policy_id)
        assert loaded is not None
        assert loaded.scope == PolicyScope.WORKSPACE
        assert loaded.action == PolicyAction.ESCALATE
        assert loaded.confidence == pytest.approx(0.6)
        assert loaded.when.conflict_type == ConflictType.RELATIONAL
        assert loaded.when.domain == "code"
        assert loaded.when.path_pattern == "src/**"

    def test_agent_provenance_round_trip(self) -> None:
        """to_dict followed by from_dict reproduces an equal agent provenance."""
        p = AgentProvenance.agent("claude-code", "claude-sonnet-4-6")
        assert AgentProvenance.from_dict(p.to_dict()) == p

    def test_agent_provenance_human_round_trip(self) -> None:
        """to_dict followed by from_dict reproduces an equal human provenance."""
        p = AgentProvenance.human()
        assert AgentProvenance.from_dict(p.to_dict()) == p

    def test_recorded_at_is_utc_aware(self, repo: pathlib.Path) -> None:
        """A loaded pattern's recorded_at is timezone-aware with a zero UTC offset."""
        pattern = _make_pattern()
        record_pattern(repo, pattern)
        loaded = load_pattern(repo, pattern.pattern_id)
        assert loaded is not None
        assert loaded.recorded_at.tzinfo is not None
        # Awareness alone would accept any fixed offset; the test name promises UTC.
        assert loaded.recorded_at.utcoffset() == datetime.timedelta(0)

    def test_resolved_at_is_utc_aware(self, repo: pathlib.Path) -> None:
        """A loaded resolution's resolved_at is timezone-aware with a zero UTC offset."""
        pattern = _make_pattern()
        record_pattern(repo, pattern)
        res = _make_resolution(pattern)
        save_resolution(repo, res)
        loaded = load_resolution(repo, pattern.pattern_id, res.resolution_id)
        assert loaded is not None
        assert loaded.resolved_at.tzinfo is not None
        # Awareness alone would accept any fixed offset; the test name promises UTC.
        assert loaded.resolved_at.utcoffset() == datetime.timedelta(0)

    def test_applied_count_starts_at_zero(self, repo: pathlib.Path) -> None:
        """A freshly saved resolution loads with applied_count == 0."""
        pattern = _make_pattern()
        record_pattern(repo, pattern)
        res = _make_resolution(pattern)
        save_resolution(repo, res)
        loaded = load_resolution(repo, pattern.pattern_id, res.resolution_id)
        assert loaded is not None
        assert loaded.applied_count == 0
1171
1172
1173 # ===========================================================================
1174 # Tier VI — Security
1175 # ===========================================================================
1176
1177
class TestSecurity:
    """VI: path traversal, symlink guards, size caps, crafted policy_id."""

    def test_path_traversal_in_pattern_id_rejected(self, repo: pathlib.Path) -> None:
        """_validate_id raises ValueError for an ID containing ``../`` segments."""
        with pytest.raises(ValueError):
            h._validate_id("../../../etc/passwd")

    def test_path_traversal_in_load_pattern(self, repo: pathlib.Path) -> None:
        """load_pattern returns None for a traversal-style pattern ID."""
        assert load_pattern(repo, "../traversal") is None

    def test_path_traversal_in_load_resolution(self, repo: pathlib.Path) -> None:
        """load_resolution returns None for a traversal-style resolution ID."""
        assert load_resolution(repo, "a" * 64, "../traversal") is None

    def test_path_traversal_in_forget_pattern(self, repo: pathlib.Path) -> None:
        """forget_pattern returns False for a traversal-style pattern ID."""
        assert forget_pattern(repo, "../traversal/../../../../../etc") is False

    def test_crafted_policy_id_with_slash(self, repo: pathlib.Path) -> None:
        """load_policy returns None for a policy ID containing path separators."""
        assert load_policy(repo, "../../etc/passwd") is None

    def test_crafted_policy_id_with_null_byte(self, repo: pathlib.Path) -> None:
        """_validate_policy_id raises ValueError for an ID containing a NUL byte."""
        with pytest.raises(ValueError):
            h._validate_policy_id("policy\x00id")

    def test_symlinks_in_patterns_dir_skipped(self, repo: pathlib.Path) -> None:
        """A symlinked pattern directory is skipped even when its target is valid.

        The entry is a real recorded pattern that is moved out of the store
        and linked back under its own ID. The link therefore has a valid name
        and valid content, so only a symlink guard can keep it out of the
        listing. A link with a malformed name or an empty target would be
        dropped by other checks and prove nothing about the guard.
        """
        pattern = _make_pattern()
        record_pattern(repo, pattern)
        entry = h.pattern_dir(repo, pattern.pattern_id)
        outside = repo / "other"
        # Relocate the real directory, then leave a symlink in its place.
        entry.rename(outside)
        entry.symlink_to(outside, target_is_directory=True)
        assert list_patterns(repo) == []  # symlink skipped

    def test_symlinks_in_policies_dir_skipped(self, repo: pathlib.Path) -> None:
        """A symlinked policy file is skipped even when its target is a valid policy.

        The target is a genuinely saved policy moved outside the policies
        directory, so the listing can only be empty if symlinks are refused,
        not because the target fails to parse as a policy.
        """
        policy = _make_policy("linked-policy")
        save_policy(repo, policy)
        stored = h.policies_dir(repo) / f"{policy.policy_id}.json"
        victim = repo / "victim.json"
        # Relocate the real file, then leave a symlink in its place.
        stored.rename(victim)
        stored.symlink_to(victim)
        assert list_policies(repo) == []

    def test_oversized_pattern_file_rejected(self, repo: pathlib.Path) -> None:
        """load_pattern returns None when pattern.json exceeds _MAX_PATTERN_BYTES."""
        pattern = _make_pattern()
        record_pattern(repo, pattern)
        meta_p = h.pattern_dir(repo, pattern.pattern_id) / "pattern.json"
        # Overwrite with a payload one byte over the cap.
        meta_p.write_bytes(b"x" * (_MAX_PATTERN_BYTES + 1))
        assert load_pattern(repo, pattern.pattern_id) is None

    def test_oversized_resolution_file_rejected(self, repo: pathlib.Path) -> None:
        """load_resolution returns None when the file exceeds _MAX_RESOLUTION_BYTES."""
        pattern = _make_pattern()
        record_pattern(repo, pattern)
        res = _make_resolution(pattern)
        save_resolution(repo, res)
        dest = h._resolution_path(repo, pattern.pattern_id, res.resolution_id)
        dest.write_bytes(b"y" * (_MAX_RESOLUTION_BYTES + 1))
        assert load_resolution(repo, pattern.pattern_id, res.resolution_id) is None

    def test_oversized_policy_file_rejected(self, repo: pathlib.Path) -> None:
        """load_policy returns None when the file exceeds _MAX_POLICY_BYTES."""
        policy = _make_policy()
        save_policy(repo, policy)
        dest = h.policies_dir(repo) / f"{policy.policy_id}.json"
        dest.write_bytes(b"z" * (_MAX_POLICY_BYTES + 1))
        assert load_policy(repo, policy.policy_id) is None

    def test_malformed_json_pattern_returns_none(self, repo: pathlib.Path) -> None:
        """load_pattern returns None when pattern.json is not valid JSON."""
        pid = "a" * 64
        entry = h.pattern_dir(repo, pid)
        entry.mkdir(parents=True)
        (entry / "pattern.json").write_text("not json", encoding="utf-8")
        assert load_pattern(repo, pid) is None

    def test_malformed_json_resolution_returns_none(self, repo: pathlib.Path) -> None:
        """load_resolution returns None when the resolution file is not valid JSON."""
        pattern = _make_pattern()
        record_pattern(repo, pattern)
        rid = "b" * 64
        # NOTE(review): this uses h.resolution_path while the oversized-file
        # test uses h._resolution_path; confirm both names are exported.
        dest = h.resolution_path(repo, pattern.pattern_id, rid)
        dest.parent.mkdir(parents=True, exist_ok=True)
        dest.write_text("{{bad", encoding="utf-8")
        assert load_resolution(repo, pattern.pattern_id, rid) is None

    def test_non_hex_dir_in_patterns_dir_skipped(self, repo: pathlib.Path) -> None:
        """A directory whose name is not a valid ID is ignored by list_patterns."""
        pdir = h.patterns_dir(repo)
        pdir.mkdir(parents=True, exist_ok=True)
        (pdir / "not-a-valid-id").mkdir()
        assert list_patterns(repo) == []
1274
1275
1276 # ===========================================================================
1277 # Tier VII — Performance
1278 # ===========================================================================
1279
1280
class TestPerformance:
    """VII: operation timing assertions — single operations must be fast.

    Durations are measured with ``time.perf_counter``, the clock documented
    for timing short intervals. ``time.monotonic`` has platform-dependent
    resolution, which matters for thresholds as low as 10 ms.
    """

    @staticmethod
    def _elapsed_ms(start: float) -> float:
        """Return the milliseconds elapsed since *start* (a perf_counter reading)."""
        return (time.perf_counter() - start) * 1000

    def test_record_pattern_under_50ms(self, repo: pathlib.Path) -> None:
        """A single record_pattern call completes in under 50 ms."""
        pattern = _make_pattern()
        start = time.perf_counter()
        record_pattern(repo, pattern)
        elapsed = self._elapsed_ms(start)
        assert elapsed < 50, f"record_pattern took {elapsed:.1f}ms — expected <50ms"

    def test_load_pattern_under_10ms(self, repo: pathlib.Path) -> None:
        """Loading a recorded pattern completes in under 10 ms."""
        pattern = _make_pattern()
        record_pattern(repo, pattern)
        start = time.perf_counter()
        loaded = load_pattern(repo, pattern.pattern_id)
        elapsed = self._elapsed_ms(start)
        # Checked outside the timed window: a failed load must not count as fast.
        assert loaded is not None
        assert elapsed < 10, f"load_pattern took {elapsed:.1f}ms — expected <10ms"

    def test_save_resolution_under_50ms(self, repo: pathlib.Path) -> None:
        """A single save_resolution call completes in under 50 ms."""
        pattern = _make_pattern()
        record_pattern(repo, pattern)
        res = _make_resolution(pattern)
        start = time.perf_counter()
        save_resolution(repo, res)
        elapsed = self._elapsed_ms(start)
        assert elapsed < 50, f"save_resolution took {elapsed:.1f}ms — expected <50ms"

    def test_list_100_patterns_under_500ms(self, repo: pathlib.Path) -> None:
        """Listing 100 stored patterns returns all of them in under 500 ms."""
        n = 100
        for i in range(n):
            record_pattern(
                repo, _make_pattern(path=f"perf{i}.mid", ours=f"o{i}", theirs=f"t{i}")
            )
        start = time.perf_counter()
        results = list_patterns(repo)
        elapsed = self._elapsed_ms(start)
        assert len(results) == n
        assert elapsed < 500, f"list_patterns(100) took {elapsed:.1f}ms — expected <500ms"

    def test_save_and_load_policy_under_20ms(self, repo: pathlib.Path) -> None:
        """Saving and then loading one policy completes in under 20 ms combined."""
        policy = _make_policy()
        start = time.perf_counter()
        save_policy(repo, policy)
        loaded = load_policy(repo, policy.policy_id)
        elapsed = self._elapsed_ms(start)
        # Checked outside the timed window: a failed load must not count as fast.
        assert loaded is not None
        assert elapsed < 20, f"save+load policy took {elapsed:.1f}ms — expected <20ms"

    def test_append_audit_under_20ms(self, repo: pathlib.Path) -> None:
        """A single append_audit call completes in under 20 ms."""
        actor = AgentProvenance.human()
        start = time.perf_counter()
        append_audit(repo, AuditEventType.GC_RUN, actor)
        elapsed = self._elapsed_ms(start)
        assert elapsed < 20, f"append_audit took {elapsed:.1f}ms — expected <20ms"

    def test_increment_applied_count_under_20ms(self, repo: pathlib.Path) -> None:
        """A single increment_applied_count call completes in under 20 ms."""
        pattern = _make_pattern()
        record_pattern(repo, pattern)
        res = _make_resolution(pattern)
        save_resolution(repo, res)
        start = time.perf_counter()
        increment_applied_count(repo, pattern.pattern_id, res.resolution_id)
        elapsed = self._elapsed_ms(start)
        assert elapsed < 20, f"increment_applied_count took {elapsed:.1f}ms — expected <20ms"
File History 1 commit