"""Tests for Phase 4 — Escalation Records. New additions to ``muse/core/harmony.py``: ``EscalationStatus`` — open string constants: OPEN, RESOLVED ``AuditEventType.ESCALATION_RESOLVED`` ``EscalationRecord`` — frozen dataclass ``compute_escalation_id`` — deterministic hex64 from (pattern_id, reason) ``escalations_dir`` — ``.muse/harmony/escalations/`` ``record_escalation`` — atomic write; idempotent ``load_escalation`` — read by ID ``list_escalations`` — filtered list (status filter) ``resolve_escalation`` — transition open → resolved, atomic Coverage tiers -------------- I Unit — EscalationStatus constants, EscalationRecord fields, compute_escalation_id determinism + collision resistance II Integration — record / load / list / resolve_escalation CRUD III End-to-end — escalate → list → resolve → audit trail IV Stress — 200-escalation list, concurrent record (same ID idempotency) V Data integrity — JSON round-trip, all fields always present, frozen VI Security — ID validation, path traversal rejected, size cap respected VII Performance — all ops <50 ms """ from __future__ import annotations import concurrent.futures import dataclasses import datetime import json import pathlib import time import pytest import muse.core.harmony as h from muse.core.paths import harmony_dir, muse_dir from muse.core.types import fake_id, long_id, split_id from muse.core.harmony import ( AgentProvenance, AuditEventType, EscalationRecord, EscalationStatus, append_audit, blob_fingerprint, compute_escalation_id, compute_pattern_id, compute_resolution_id, escalation_path, escalations_dir, list_audit, list_escalations, load_escalation, record_escalation, resolve_escalation, save_resolution, Resolution, ) # --------------------------------------------------------------------------- # Shared helpers # --------------------------------------------------------------------------- def _utc_now() -> datetime.datetime: return datetime.datetime.now(datetime.timezone.utc) @pytest.fixture() def repo(tmp_path: pathlib.Path) -> pathlib.Path: muse_dir(tmp_path).mkdir() return tmp_path def _make_record( pattern_id: str | None = None, reason: str = "No match found", agent_id: str | None = None, ) -> EscalationRecord: pid = pattern_id or fake_id("pat1") eid = compute_escalation_id(pid, reason) return EscalationRecord( escalation_id=eid, pattern_id=pid, reason=reason, escalated_at=_utc_now(), escalated_by=( AgentProvenance.agent(agent_id) if agent_id else AgentProvenance.human() ), ) # =========================================================================== # Tier I — Unit # =========================================================================== class TestEscalationStatusConstants: """I: EscalationStatus is an open string-constant namespace.""" def test_open_value(self) -> None: assert EscalationStatus.OPEN == "open" def test_resolved_value(self) -> None: assert EscalationStatus.RESOLVED == "resolved" def test_constants_are_strings(self) -> None: assert isinstance(EscalationStatus.OPEN, str) assert isinstance(EscalationStatus.RESOLVED, str) class TestAuditEventTypeEscalationResolved: """I: AuditEventType.ESCALATION_RESOLVED constant exists.""" def test_escalation_resolved_constant(self) -> None: assert AuditEventType.ESCALATION_RESOLVED == "escalation_resolved" def test_escalation_recorded_still_present(self) -> None: assert AuditEventType.ESCALATION_RECORDED == "escalation_recorded" class TestEscalationRecordFields: """I: EscalationRecord dataclass shape and defaults.""" def test_required_fields_present(self) -> None: pid = fake_id("p") eid = compute_escalation_id(pid, "reason") r = EscalationRecord( escalation_id=eid, pattern_id=pid, reason="test reason", escalated_at=_utc_now(), escalated_by=AgentProvenance.human(), ) assert r.escalation_id == eid assert r.pattern_id == pid assert r.reason == "test reason" assert r.status == EscalationStatus.OPEN def test_status_defaults_to_open(self) -> None: r = _make_record() assert r.status == EscalationStatus.OPEN def test_optional_fields_default_none(self) -> None: r = _make_record() assert r.resolved_at is None assert r.resolved_by is None assert r.resolution_id is None def test_frozen(self) -> None: r = _make_record() with pytest.raises((dataclasses.FrozenInstanceError, AttributeError)): r.status = EscalationStatus.RESOLVED # type: ignore[misc] def test_is_dataclass(self) -> None: assert dataclasses.is_dataclass(EscalationRecord) class TestComputeEscalationId: """I: compute_escalation_id is deterministic and collision-resistant.""" def test_deterministic_same_inputs(self) -> None: pid = fake_id("p") eid1 = compute_escalation_id(pid, "reason") eid2 = compute_escalation_id(pid, "reason") assert eid1 == eid2 def test_different_pattern_ids_differ(self) -> None: pid1 = fake_id("p1") pid2 = fake_id("p2") assert compute_escalation_id(pid1, "reason") != compute_escalation_id(pid2, "reason") def test_different_reasons_differ(self) -> None: pid = fake_id("p") assert compute_escalation_id(pid, "reason A") != compute_escalation_id(pid, "reason B") def test_returns_sha256_id(self) -> None: eid = compute_escalation_id(fake_id("p"), "reason") assert eid.startswith("sha256:") assert len(eid) == 71 assert all(c in "0123456789abcdef" for c in split_id(eid)[1]) # =========================================================================== # Tier II — Integration: CRUD # =========================================================================== class TestEscalationsDir: """II: escalations_dir returns the correct path.""" def test_path_structure(self, repo: pathlib.Path) -> None: d = escalations_dir(repo) assert d == harmony_dir(repo) / "escalations" def test_does_not_create_dir(self, repo: pathlib.Path) -> None: d = escalations_dir(repo) assert not d.exists() class TestRecordEscalation: """II: record_escalation writes a JSON file; idempotent.""" def test_creates_escalation_file(self, repo: pathlib.Path) -> None: rec = _make_record() record_escalation(repo, rec) dest = escalation_path(repo, rec.escalation_id) assert dest.exists() def test_idempotent_second_write_no_error(self, repo: pathlib.Path) -> None: rec = _make_record() record_escalation(repo, rec) record_escalation(repo, rec) # should not raise dest = escalation_path(repo, rec.escalation_id) assert dest.exists() def test_idempotent_returns_false_on_second_write(self, repo: pathlib.Path) -> None: rec = _make_record() first = record_escalation(repo, rec) second = record_escalation(repo, rec) assert first is True assert second is False def test_json_contains_expected_keys(self, repo: pathlib.Path) -> None: rec = _make_record() record_escalation(repo, rec) dest = escalation_path(repo, rec.escalation_id) data = json.loads(dest.read_text()) for key in ("escalation_id", "pattern_id", "reason", "escalated_at", "escalated_by", "status"): assert key in data def test_status_stored_as_open(self, repo: pathlib.Path) -> None: rec = _make_record() record_escalation(repo, rec) dest = escalation_path(repo, rec.escalation_id) data = json.loads(dest.read_text()) assert data["status"] == EscalationStatus.OPEN def test_atomic_write_no_temp_files(self, repo: pathlib.Path) -> None: rec = _make_record() record_escalation(repo, rec) esc_dir = escalations_dir(repo) tmp_files = list(esc_dir.glob("*.tmp")) assert tmp_files == [] class TestLoadEscalation: """II: load_escalation retrieves a stored record.""" def test_returns_none_for_missing(self, repo: pathlib.Path) -> None: eid = fake_id("missing") assert load_escalation(repo, eid) is None def test_round_trips_required_fields(self, repo: pathlib.Path) -> None: rec = _make_record() record_escalation(repo, rec) loaded = load_escalation(repo, rec.escalation_id) assert loaded is not None assert loaded.escalation_id == rec.escalation_id assert loaded.pattern_id == rec.pattern_id assert loaded.reason == rec.reason assert loaded.status == EscalationStatus.OPEN def test_escalated_by_human_round_trips(self, repo: pathlib.Path) -> None: rec = _make_record() record_escalation(repo, rec) loaded = load_escalation(repo, rec.escalation_id) assert loaded is not None assert loaded.escalated_by.type == "human" def test_escalated_by_agent_round_trips(self, repo: pathlib.Path) -> None: rec = _make_record(agent_id="claude-code") record_escalation(repo, rec) loaded = load_escalation(repo, rec.escalation_id) assert loaded is not None assert loaded.escalated_by.agent_id == "claude-code" def test_validates_id_rejects_traversal(self, repo: pathlib.Path) -> None: with pytest.raises(ValueError): load_escalation(repo, "../../malicious") class TestListEscalations: """II: list_escalations returns sorted list with optional status filter.""" def test_empty_store_returns_empty(self, repo: pathlib.Path) -> None: assert list_escalations(repo) == [] def test_returns_all_by_default(self, repo: pathlib.Path) -> None: pid1, pid2 = fake_id("p1"), fake_id("p2") record_escalation(repo, _make_record(pid1, "r1")) record_escalation(repo, _make_record(pid2, "r2")) assert len(list_escalations(repo)) == 2 def test_filter_open_only(self, repo: pathlib.Path) -> None: pid1, pid2 = fake_id("p1"), fake_id("p2") rec1 = _make_record(pid1, "r1") rec2 = _make_record(pid2, "r2") record_escalation(repo, rec1) record_escalation(repo, rec2) resolve_escalation( repo, rec2.escalation_id, fake_id("res"), AgentProvenance.human(), _utc_now() ) results = list_escalations(repo, status=EscalationStatus.OPEN) assert len(results) == 1 assert results[0].escalation_id == rec1.escalation_id def test_filter_resolved_only(self, repo: pathlib.Path) -> None: pid1, pid2 = fake_id("p1"), fake_id("p2") rec1 = _make_record(pid1, "r1") rec2 = _make_record(pid2, "r2") record_escalation(repo, rec1) record_escalation(repo, rec2) resolve_escalation( repo, rec2.escalation_id, fake_id("res"), AgentProvenance.human(), _utc_now() ) results = list_escalations(repo, status=EscalationStatus.RESOLVED) assert len(results) == 1 assert results[0].escalation_id == rec2.escalation_id def test_sorted_newest_first(self, repo: pathlib.Path) -> None: pids = [fake_id(f"p{i}") for i in range(3)] for i, pid in enumerate(pids): rec = _make_record(pid, f"reason {i}") record_escalation(repo, rec) results = list_escalations(repo) timestamps = [r.escalated_at for r in results] assert timestamps == sorted(timestamps, reverse=True) def test_skips_symlinks(self, repo: pathlib.Path) -> None: rec = _make_record() record_escalation(repo, rec) esc_dir = escalations_dir(repo) link = esc_dir / ("a" * 64 + ".json") link.symlink_to(esc_dir / f"{long_id(rec.escalation_id, strip=True)}.json") results = list_escalations(repo) assert len(results) == 1 # symlink skipped class TestResolveEscalation: """II: resolve_escalation transitions open → resolved atomically.""" def test_returns_true_when_found(self, repo: pathlib.Path) -> None: rec = _make_record() record_escalation(repo, rec) result = resolve_escalation( repo, rec.escalation_id, fake_id("res"), AgentProvenance.human(), _utc_now() ) assert result is True def test_returns_false_when_not_found(self, repo: pathlib.Path) -> None: eid = fake_id("missing") result = resolve_escalation( repo, eid, fake_id("res"), AgentProvenance.human(), _utc_now() ) assert result is False def test_status_updated_to_resolved(self, repo: pathlib.Path) -> None: rec = _make_record() record_escalation(repo, rec) resolve_escalation( repo, rec.escalation_id, fake_id("res"), AgentProvenance.human(), _utc_now() ) loaded = load_escalation(repo, rec.escalation_id) assert loaded is not None assert loaded.status == EscalationStatus.RESOLVED def test_resolution_id_stored(self, repo: pathlib.Path) -> None: rec = _make_record() record_escalation(repo, rec) res_id = fake_id("resolution") resolve_escalation( repo, rec.escalation_id, res_id, AgentProvenance.human(), _utc_now() ) loaded = load_escalation(repo, rec.escalation_id) assert loaded is not None assert loaded.resolution_id == res_id def test_resolved_by_stored(self, repo: pathlib.Path) -> None: rec = _make_record() record_escalation(repo, rec) actor = AgentProvenance.agent("claude-code", "claude-sonnet-4-6") resolve_escalation(repo, rec.escalation_id, fake_id("res"), actor, _utc_now()) loaded = load_escalation(repo, rec.escalation_id) assert loaded is not None assert loaded.resolved_by is not None assert loaded.resolved_by.agent_id == "claude-code" def test_resolved_at_stored(self, repo: pathlib.Path) -> None: rec = _make_record() record_escalation(repo, rec) now = _utc_now() resolve_escalation(repo, rec.escalation_id, fake_id("res"), AgentProvenance.human(), now) loaded = load_escalation(repo, rec.escalation_id) assert loaded is not None assert loaded.resolved_at is not None def test_validates_escalation_id(self, repo: pathlib.Path) -> None: with pytest.raises(ValueError): resolve_escalation( repo, "bad-id", fake_id("res"), AgentProvenance.human(), _utc_now() ) def test_no_temp_files_after_resolve(self, repo: pathlib.Path) -> None: rec = _make_record() record_escalation(repo, rec) resolve_escalation( repo, rec.escalation_id, fake_id("res"), AgentProvenance.human(), _utc_now() ) esc_dir = escalations_dir(repo) assert list(esc_dir.glob("*.tmp")) == [] # =========================================================================== # Tier III — End-to-end # =========================================================================== class TestEndToEnd: """III: full lifecycle: escalate → list → resolve → audit.""" def test_escalate_list_resolve_audit(self, repo: pathlib.Path) -> None: pid = fake_id("track.mid:content") reason = "No policy or resolution found for pattern" actor = AgentProvenance.agent("claude-code") # 1. record escalation rec = EscalationRecord( escalation_id=compute_escalation_id(pid, reason), pattern_id=pid, reason=reason, escalated_at=_utc_now(), escalated_by=actor, ) record_escalation(repo, rec) # 2. audit escalation event append_audit(repo, AuditEventType.ESCALATION_RECORDED, actor, pattern_id=pid) # 3. list shows 1 open escalation open_esc = list_escalations(repo, status=EscalationStatus.OPEN) assert len(open_esc) == 1 # 4. resolve it res_id = fake_id("resolution") resolve_escalation(repo, rec.escalation_id, res_id, actor, _utc_now()) append_audit( repo, AuditEventType.ESCALATION_RESOLVED, actor, pattern_id=pid, metadata={"escalation_id": rec.escalation_id} ) # 5. now 0 open, 1 resolved assert list_escalations(repo, status=EscalationStatus.OPEN) == [] resolved = list_escalations(repo, status=EscalationStatus.RESOLVED) assert len(resolved) == 1 # 6. audit log has both events entries = list_audit(repo) event_types = {e["event_type"] for e in entries} assert AuditEventType.ESCALATION_RECORDED in event_types assert AuditEventType.ESCALATION_RESOLVED in event_types def test_multiple_patterns_independent_escalations(self, repo: pathlib.Path) -> None: pids = [fake_id(f"track{i}.mid") for i in range(5)] recs = [] for pid in pids: rec = _make_record(pid, "no match") record_escalation(repo, rec) recs.append(rec) assert len(list_escalations(repo)) == 5 # resolve two of them for rec in recs[:2]: resolve_escalation(repo, rec.escalation_id, fake_id("res"), AgentProvenance.human(), _utc_now()) assert len(list_escalations(repo, status=EscalationStatus.OPEN)) == 3 assert len(list_escalations(repo, status=EscalationStatus.RESOLVED)) == 2 # =========================================================================== # Tier IV — Stress # =========================================================================== class TestStress: """IV: 200 escalations; concurrent record idempotency.""" def test_list_200_escalations(self, repo: pathlib.Path) -> None: pids = [fake_id(f"stress-{i}") for i in range(200)] for pid in pids: record_escalation(repo, _make_record(pid, "stress")) results = list_escalations(repo) assert len(results) == 200 def test_concurrent_record_same_id_idempotent(self, repo: pathlib.Path) -> None: """Concurrent writes of the same escalation_id must not corrupt data.""" rec = _make_record() n = 20 def _write() -> bool: return record_escalation(repo, rec) with concurrent.futures.ThreadPoolExecutor(max_workers=n) as ex: futures = [ex.submit(_write) for _ in range(n)] results = [f.result() for f in futures] # File must exist and be valid JSON dest = escalation_path(repo, rec.escalation_id) assert dest.exists() data = json.loads(dest.read_text()) assert data["escalation_id"] == rec.escalation_id # At least one write must have succeeded assert any(results) # =========================================================================== # Tier V — Data integrity # =========================================================================== class TestDataIntegrity: """V: JSON round-trip; all fields present; frozen records.""" def test_json_round_trip_open(self, repo: pathlib.Path) -> None: rec = _make_record() record_escalation(repo, rec) loaded = load_escalation(repo, rec.escalation_id) assert loaded is not None assert loaded.escalation_id == rec.escalation_id assert loaded.pattern_id == rec.pattern_id assert loaded.reason == rec.reason assert loaded.status == EscalationStatus.OPEN assert loaded.resolved_at is None assert loaded.resolved_by is None assert loaded.resolution_id is None def test_json_round_trip_resolved(self, repo: pathlib.Path) -> None: rec = _make_record() record_escalation(repo, rec) res_id = fake_id("res") actor = AgentProvenance.agent("claude-code", "claude-sonnet-4-6") now = _utc_now() resolve_escalation(repo, rec.escalation_id, res_id, actor, now) loaded = load_escalation(repo, rec.escalation_id) assert loaded is not None assert loaded.status == EscalationStatus.RESOLVED assert loaded.resolution_id == res_id assert loaded.resolved_by is not None assert loaded.resolved_by.agent_id == "claude-code" assert loaded.resolved_by.model_id == "claude-sonnet-4-6" assert loaded.resolved_at is not None def test_escalation_record_is_frozen(self) -> None: rec = _make_record() assert dataclasses.is_dataclass(rec) with pytest.raises((dataclasses.FrozenInstanceError, AttributeError)): rec.reason = "mutated" # type: ignore[misc] def test_no_temp_files_in_escalations_dir(self, repo: pathlib.Path) -> None: for i in range(5): record_escalation(repo, _make_record(fake_id(f"p{i}"), "r")) esc_dir = escalations_dir(repo) assert list(esc_dir.glob("*.tmp")) == [] def test_escalated_at_is_utc_aware(self, repo: pathlib.Path) -> None: rec = _make_record() record_escalation(repo, rec) loaded = load_escalation(repo, rec.escalation_id) assert loaded is not None assert loaded.escalated_at.tzinfo is not None # =========================================================================== # Tier VI — Security # =========================================================================== class TestSecurity: """VI: ID validation; path traversal rejected; size cap.""" def test_load_traversal_rejected(self, repo: pathlib.Path) -> None: with pytest.raises(ValueError): load_escalation(repo, "../../etc/passwd") def test_resolve_traversal_rejected(self, repo: pathlib.Path) -> None: with pytest.raises(ValueError): resolve_escalation( repo, "../traversal", fake_id("r"), AgentProvenance.human(), _utc_now() ) def test_load_null_byte_rejected(self, repo: pathlib.Path) -> None: with pytest.raises(ValueError): load_escalation(repo, "a" * 63 + "\x00") def test_load_too_short_rejected(self, repo: pathlib.Path) -> None: with pytest.raises(ValueError): load_escalation(repo, "abc") def test_load_uppercase_hex_rejected(self, repo: pathlib.Path) -> None: with pytest.raises(ValueError): load_escalation(repo, "A" * 64) def test_list_skips_non_json_files(self, repo: pathlib.Path) -> None: rec = _make_record() record_escalation(repo, rec) # Plant a non-JSON file esc_dir = escalations_dir(repo) (esc_dir / "not-a-real-file.txt").write_text("noise") results = list_escalations(repo) assert len(results) == 1 def test_oversized_escalation_file_skipped(self, repo: pathlib.Path) -> None: """Files exceeding _MAX_ESCALATION_BYTES must be silently skipped.""" rec = _make_record() record_escalation(repo, rec) dest = escalation_path(repo, rec.escalation_id) # Bloat the file beyond 16 KiB bloated = dest.read_text() + " " * 20_000 dest.write_text(bloated) results = list_escalations(repo) assert results == [] def test_symlinks_in_escalations_dir_skipped(self, repo: pathlib.Path) -> None: rec = _make_record() record_escalation(repo, rec) esc_dir = escalations_dir(repo) link = esc_dir / f"{'b' * 64}.json" link.symlink_to(esc_dir / f"{long_id(rec.escalation_id, strip=True)}.json") results = list_escalations(repo) assert len(results) == 1 # only real file # =========================================================================== # Tier VII — Performance # =========================================================================== class TestPerformance: """VII: all ops complete within 50 ms.""" def test_record_escalation_under_50ms(self, repo: pathlib.Path) -> None: rec = _make_record() start = time.monotonic() record_escalation(repo, rec) elapsed = (time.monotonic() - start) * 1000 assert elapsed < 50, f"record_escalation took {elapsed:.1f}ms" def test_load_escalation_under_50ms(self, repo: pathlib.Path) -> None: rec = _make_record() record_escalation(repo, rec) start = time.monotonic() load_escalation(repo, rec.escalation_id) elapsed = (time.monotonic() - start) * 1000 assert elapsed < 50, f"load_escalation took {elapsed:.1f}ms" def test_resolve_escalation_under_50ms(self, repo: pathlib.Path) -> None: rec = _make_record() record_escalation(repo, rec) start = time.monotonic() resolve_escalation(repo, rec.escalation_id, fake_id("r"), AgentProvenance.human(), _utc_now()) elapsed = (time.monotonic() - start) * 1000 assert elapsed < 50, f"resolve_escalation took {elapsed:.1f}ms" def test_list_100_escalations_under_50ms(self, repo: pathlib.Path) -> None: for i in range(100): record_escalation(repo, _make_record(fake_id(f"p{i}"), "r")) start = time.monotonic() list_escalations(repo) elapsed = (time.monotonic() - start) * 1000 assert elapsed < 50, f"list_escalations(100) took {elapsed:.1f}ms" def test_compute_escalation_id_under_1ms(self) -> None: pid = fake_id("p") start = time.monotonic() compute_escalation_id(pid, "reason") elapsed = (time.monotonic() - start) * 1000 assert elapsed < 1, f"compute_escalation_id took {elapsed:.2f}ms"