gabriel / muse public
test_harmony_phase4.py python
693 lines 25.7 KB
Raw
sha256:81ae324db5ad375fbfe4834c6fcb378312cafad3cc92dec5d3e5c427306621a2 fix: remove commit_exists filter from have anchors — server… Sonnet 4.6 patch 20 days ago
1 """Tests for Phase 4 — Escalation Records.
2
3 New additions to ``muse/core/harmony.py``:
4
5 ``EscalationStatus`` — open string constants: OPEN, RESOLVED
6 ``AuditEventType.ESCALATION_RESOLVED``
7 ``EscalationRecord`` — frozen dataclass
8 ``compute_escalation_id`` — deterministic hex64 from (pattern_id, reason)
9 ``escalations_dir`` — ``.muse/harmony/escalations/``
10 ``record_escalation`` — atomic write; idempotent
11 ``load_escalation`` — read by ID
12 ``list_escalations`` — filtered list (status filter)
13 ``resolve_escalation`` — transition open → resolved, atomic
14
15 Coverage tiers
16 --------------
17 I Unit — EscalationStatus constants, EscalationRecord fields,
18 compute_escalation_id determinism + collision resistance
19 II Integration — record / load / list / resolve_escalation CRUD
20 III End-to-end — escalate → list → resolve → audit trail
21 IV Stress — 200-escalation list, concurrent record (same ID idempotency)
22 V Data integrity — JSON round-trip, all fields always present, frozen
23 VI Security — ID validation, path traversal rejected, size cap respected
24 VII Performance — all ops <50 ms
25 """
26 from __future__ import annotations
27
28 import concurrent.futures
29 import dataclasses
30 import datetime
31 import json
32 import pathlib
33 import time
34
35 import pytest
36
37 import muse.core.harmony as h
38 from muse.core.paths import harmony_dir, muse_dir
39 from muse.core.types import fake_id, long_id, split_id
40 from muse.core.harmony import (
41 AgentProvenance,
42 AuditEventType,
43 EscalationRecord,
44 EscalationStatus,
45 append_audit,
46 blob_fingerprint,
47 compute_escalation_id,
48 compute_pattern_id,
49 compute_resolution_id,
50 escalation_path,
51 escalations_dir,
52 list_audit,
53 list_escalations,
54 load_escalation,
55 record_escalation,
56 resolve_escalation,
57 save_resolution,
58 Resolution,
59 )
60
61
62 # ---------------------------------------------------------------------------
63 # Shared helpers
64 # ---------------------------------------------------------------------------
65
66
67 def _utc_now() -> datetime.datetime:
68 return datetime.datetime.now(datetime.timezone.utc)
69
70
71 @pytest.fixture()
72 def repo(tmp_path: pathlib.Path) -> pathlib.Path:
73 muse_dir(tmp_path).mkdir()
74 return tmp_path
75
76
77 def _make_record(
78 pattern_id: str | None = None,
79 reason: str = "No match found",
80 agent_id: str | None = None,
81 ) -> EscalationRecord:
82 pid = pattern_id or fake_id("pat1")
83 eid = compute_escalation_id(pid, reason)
84 return EscalationRecord(
85 escalation_id=eid,
86 pattern_id=pid,
87 reason=reason,
88 escalated_at=_utc_now(),
89 escalated_by=(
90 AgentProvenance.agent(agent_id) if agent_id else AgentProvenance.human()
91 ),
92 )
93
94
95 # ===========================================================================
96 # Tier I — Unit
97 # ===========================================================================
98
99
100 class TestEscalationStatusConstants:
101 """I: EscalationStatus is an open string-constant namespace."""
102
103 def test_open_value(self) -> None:
104 assert EscalationStatus.OPEN == "open"
105
106 def test_resolved_value(self) -> None:
107 assert EscalationStatus.RESOLVED == "resolved"
108
109 def test_constants_are_strings(self) -> None:
110 assert isinstance(EscalationStatus.OPEN, str)
111 assert isinstance(EscalationStatus.RESOLVED, str)
112
113
114 class TestAuditEventTypeEscalationResolved:
115 """I: AuditEventType.ESCALATION_RESOLVED constant exists."""
116
117 def test_escalation_resolved_constant(self) -> None:
118 assert AuditEventType.ESCALATION_RESOLVED == "escalation_resolved"
119
120 def test_escalation_recorded_still_present(self) -> None:
121 assert AuditEventType.ESCALATION_RECORDED == "escalation_recorded"
122
123
124 class TestEscalationRecordFields:
125 """I: EscalationRecord dataclass shape and defaults."""
126
127 def test_required_fields_present(self) -> None:
128 pid = fake_id("p")
129 eid = compute_escalation_id(pid, "reason")
130 r = EscalationRecord(
131 escalation_id=eid,
132 pattern_id=pid,
133 reason="test reason",
134 escalated_at=_utc_now(),
135 escalated_by=AgentProvenance.human(),
136 )
137 assert r.escalation_id == eid
138 assert r.pattern_id == pid
139 assert r.reason == "test reason"
140 assert r.status == EscalationStatus.OPEN
141
142 def test_status_defaults_to_open(self) -> None:
143 r = _make_record()
144 assert r.status == EscalationStatus.OPEN
145
146 def test_optional_fields_default_none(self) -> None:
147 r = _make_record()
148 assert r.resolved_at is None
149 assert r.resolved_by is None
150 assert r.resolution_id is None
151
152 def test_frozen(self) -> None:
153 r = _make_record()
154 with pytest.raises((dataclasses.FrozenInstanceError, AttributeError)):
155 r.status = EscalationStatus.RESOLVED # type: ignore[misc]
156
157 def test_is_dataclass(self) -> None:
158 assert dataclasses.is_dataclass(EscalationRecord)
159
160
161 class TestComputeEscalationId:
162 """I: compute_escalation_id is deterministic and collision-resistant."""
163
164 def test_deterministic_same_inputs(self) -> None:
165 pid = fake_id("p")
166 eid1 = compute_escalation_id(pid, "reason")
167 eid2 = compute_escalation_id(pid, "reason")
168 assert eid1 == eid2
169
170 def test_different_pattern_ids_differ(self) -> None:
171 pid1 = fake_id("p1")
172 pid2 = fake_id("p2")
173 assert compute_escalation_id(pid1, "reason") != compute_escalation_id(pid2, "reason")
174
175 def test_different_reasons_differ(self) -> None:
176 pid = fake_id("p")
177 assert compute_escalation_id(pid, "reason A") != compute_escalation_id(pid, "reason B")
178
179 def test_returns_sha256_id(self) -> None:
180 eid = compute_escalation_id(fake_id("p"), "reason")
181 assert eid.startswith("sha256:")
182 assert len(eid) == 71
183 assert all(c in "0123456789abcdef" for c in split_id(eid)[1])
184
185
186 # ===========================================================================
187 # Tier II — Integration: CRUD
188 # ===========================================================================
189
190
191 class TestEscalationsDir:
192 """II: escalations_dir returns the correct path."""
193
194 def test_path_structure(self, repo: pathlib.Path) -> None:
195 d = escalations_dir(repo)
196 assert d == harmony_dir(repo) / "escalations"
197
198 def test_does_not_create_dir(self, repo: pathlib.Path) -> None:
199 d = escalations_dir(repo)
200 assert not d.exists()
201
202
203 class TestRecordEscalation:
204 """II: record_escalation writes a JSON file; idempotent."""
205
206 def test_creates_escalation_file(self, repo: pathlib.Path) -> None:
207 rec = _make_record()
208 record_escalation(repo, rec)
209 dest = escalation_path(repo, rec.escalation_id)
210 assert dest.exists()
211
212 def test_idempotent_second_write_no_error(self, repo: pathlib.Path) -> None:
213 rec = _make_record()
214 record_escalation(repo, rec)
215 record_escalation(repo, rec) # should not raise
216 dest = escalation_path(repo, rec.escalation_id)
217 assert dest.exists()
218
219 def test_idempotent_returns_false_on_second_write(self, repo: pathlib.Path) -> None:
220 rec = _make_record()
221 first = record_escalation(repo, rec)
222 second = record_escalation(repo, rec)
223 assert first is True
224 assert second is False
225
226 def test_json_contains_expected_keys(self, repo: pathlib.Path) -> None:
227 rec = _make_record()
228 record_escalation(repo, rec)
229 dest = escalation_path(repo, rec.escalation_id)
230 data = json.loads(dest.read_text())
231 for key in ("escalation_id", "pattern_id", "reason", "escalated_at",
232 "escalated_by", "status"):
233 assert key in data
234
235 def test_status_stored_as_open(self, repo: pathlib.Path) -> None:
236 rec = _make_record()
237 record_escalation(repo, rec)
238 dest = escalation_path(repo, rec.escalation_id)
239 data = json.loads(dest.read_text())
240 assert data["status"] == EscalationStatus.OPEN
241
242 def test_atomic_write_no_temp_files(self, repo: pathlib.Path) -> None:
243 rec = _make_record()
244 record_escalation(repo, rec)
245 esc_dir = escalations_dir(repo)
246 tmp_files = list(esc_dir.glob("*.tmp"))
247 assert tmp_files == []
248
249
250 class TestLoadEscalation:
251 """II: load_escalation retrieves a stored record."""
252
253 def test_returns_none_for_missing(self, repo: pathlib.Path) -> None:
254 eid = fake_id("missing")
255 assert load_escalation(repo, eid) is None
256
257 def test_round_trips_required_fields(self, repo: pathlib.Path) -> None:
258 rec = _make_record()
259 record_escalation(repo, rec)
260 loaded = load_escalation(repo, rec.escalation_id)
261 assert loaded is not None
262 assert loaded.escalation_id == rec.escalation_id
263 assert loaded.pattern_id == rec.pattern_id
264 assert loaded.reason == rec.reason
265 assert loaded.status == EscalationStatus.OPEN
266
267 def test_escalated_by_human_round_trips(self, repo: pathlib.Path) -> None:
268 rec = _make_record()
269 record_escalation(repo, rec)
270 loaded = load_escalation(repo, rec.escalation_id)
271 assert loaded is not None
272 assert loaded.escalated_by.type == "human"
273
274 def test_escalated_by_agent_round_trips(self, repo: pathlib.Path) -> None:
275 rec = _make_record(agent_id="claude-code")
276 record_escalation(repo, rec)
277 loaded = load_escalation(repo, rec.escalation_id)
278 assert loaded is not None
279 assert loaded.escalated_by.agent_id == "claude-code"
280
281 def test_validates_id_rejects_traversal(self, repo: pathlib.Path) -> None:
282 with pytest.raises(ValueError):
283 load_escalation(repo, "../../malicious")
284
285
286 class TestListEscalations:
287 """II: list_escalations returns sorted list with optional status filter."""
288
289 def test_empty_store_returns_empty(self, repo: pathlib.Path) -> None:
290 assert list_escalations(repo) == []
291
292 def test_returns_all_by_default(self, repo: pathlib.Path) -> None:
293 pid1, pid2 = fake_id("p1"), fake_id("p2")
294 record_escalation(repo, _make_record(pid1, "r1"))
295 record_escalation(repo, _make_record(pid2, "r2"))
296 assert len(list_escalations(repo)) == 2
297
298 def test_filter_open_only(self, repo: pathlib.Path) -> None:
299 pid1, pid2 = fake_id("p1"), fake_id("p2")
300 rec1 = _make_record(pid1, "r1")
301 rec2 = _make_record(pid2, "r2")
302 record_escalation(repo, rec1)
303 record_escalation(repo, rec2)
304 resolve_escalation(
305 repo, rec2.escalation_id, fake_id("res"),
306 AgentProvenance.human(), _utc_now()
307 )
308 results = list_escalations(repo, status=EscalationStatus.OPEN)
309 assert len(results) == 1
310 assert results[0].escalation_id == rec1.escalation_id
311
312 def test_filter_resolved_only(self, repo: pathlib.Path) -> None:
313 pid1, pid2 = fake_id("p1"), fake_id("p2")
314 rec1 = _make_record(pid1, "r1")
315 rec2 = _make_record(pid2, "r2")
316 record_escalation(repo, rec1)
317 record_escalation(repo, rec2)
318 resolve_escalation(
319 repo, rec2.escalation_id, fake_id("res"),
320 AgentProvenance.human(), _utc_now()
321 )
322 results = list_escalations(repo, status=EscalationStatus.RESOLVED)
323 assert len(results) == 1
324 assert results[0].escalation_id == rec2.escalation_id
325
326 def test_sorted_newest_first(self, repo: pathlib.Path) -> None:
327 pids = [fake_id(f"p{i}") for i in range(3)]
328 for i, pid in enumerate(pids):
329 rec = _make_record(pid, f"reason {i}")
330 record_escalation(repo, rec)
331 results = list_escalations(repo)
332 timestamps = [r.escalated_at for r in results]
333 assert timestamps == sorted(timestamps, reverse=True)
334
335 def test_skips_symlinks(self, repo: pathlib.Path) -> None:
336 rec = _make_record()
337 record_escalation(repo, rec)
338 esc_dir = escalations_dir(repo)
339 link = esc_dir / ("a" * 64 + ".json")
340 link.symlink_to(esc_dir / f"{long_id(rec.escalation_id, strip=True)}.json")
341 results = list_escalations(repo)
342 assert len(results) == 1 # symlink skipped
343
344
345 class TestResolveEscalation:
346 """II: resolve_escalation transitions open → resolved atomically."""
347
348 def test_returns_true_when_found(self, repo: pathlib.Path) -> None:
349 rec = _make_record()
350 record_escalation(repo, rec)
351 result = resolve_escalation(
352 repo, rec.escalation_id, fake_id("res"),
353 AgentProvenance.human(), _utc_now()
354 )
355 assert result is True
356
357 def test_returns_false_when_not_found(self, repo: pathlib.Path) -> None:
358 eid = fake_id("missing")
359 result = resolve_escalation(
360 repo, eid, fake_id("res"),
361 AgentProvenance.human(), _utc_now()
362 )
363 assert result is False
364
365 def test_status_updated_to_resolved(self, repo: pathlib.Path) -> None:
366 rec = _make_record()
367 record_escalation(repo, rec)
368 resolve_escalation(
369 repo, rec.escalation_id, fake_id("res"),
370 AgentProvenance.human(), _utc_now()
371 )
372 loaded = load_escalation(repo, rec.escalation_id)
373 assert loaded is not None
374 assert loaded.status == EscalationStatus.RESOLVED
375
376 def test_resolution_id_stored(self, repo: pathlib.Path) -> None:
377 rec = _make_record()
378 record_escalation(repo, rec)
379 res_id = fake_id("resolution")
380 resolve_escalation(
381 repo, rec.escalation_id, res_id,
382 AgentProvenance.human(), _utc_now()
383 )
384 loaded = load_escalation(repo, rec.escalation_id)
385 assert loaded is not None
386 assert loaded.resolution_id == res_id
387
388 def test_resolved_by_stored(self, repo: pathlib.Path) -> None:
389 rec = _make_record()
390 record_escalation(repo, rec)
391 actor = AgentProvenance.agent("claude-code", "claude-sonnet-4-6")
392 resolve_escalation(repo, rec.escalation_id, fake_id("res"), actor, _utc_now())
393 loaded = load_escalation(repo, rec.escalation_id)
394 assert loaded is not None
395 assert loaded.resolved_by is not None
396 assert loaded.resolved_by.agent_id == "claude-code"
397
398 def test_resolved_at_stored(self, repo: pathlib.Path) -> None:
399 rec = _make_record()
400 record_escalation(repo, rec)
401 now = _utc_now()
402 resolve_escalation(repo, rec.escalation_id, fake_id("res"),
403 AgentProvenance.human(), now)
404 loaded = load_escalation(repo, rec.escalation_id)
405 assert loaded is not None
406 assert loaded.resolved_at is not None
407
408 def test_validates_escalation_id(self, repo: pathlib.Path) -> None:
409 with pytest.raises(ValueError):
410 resolve_escalation(
411 repo, "bad-id", fake_id("res"),
412 AgentProvenance.human(), _utc_now()
413 )
414
415 def test_no_temp_files_after_resolve(self, repo: pathlib.Path) -> None:
416 rec = _make_record()
417 record_escalation(repo, rec)
418 resolve_escalation(
419 repo, rec.escalation_id, fake_id("res"),
420 AgentProvenance.human(), _utc_now()
421 )
422 esc_dir = escalations_dir(repo)
423 assert list(esc_dir.glob("*.tmp")) == []
424
425
426 # ===========================================================================
427 # Tier III — End-to-end
428 # ===========================================================================
429
430
431 class TestEndToEnd:
432 """III: full lifecycle: escalate → list → resolve → audit."""
433
434 def test_escalate_list_resolve_audit(self, repo: pathlib.Path) -> None:
435 pid = fake_id("track.mid:content")
436 reason = "No policy or resolution found for pattern"
437 actor = AgentProvenance.agent("claude-code")
438
439 # 1. record escalation
440 rec = EscalationRecord(
441 escalation_id=compute_escalation_id(pid, reason),
442 pattern_id=pid,
443 reason=reason,
444 escalated_at=_utc_now(),
445 escalated_by=actor,
446 )
447 record_escalation(repo, rec)
448
449 # 2. audit escalation event
450 append_audit(repo, AuditEventType.ESCALATION_RECORDED, actor, pattern_id=pid)
451
452 # 3. list shows 1 open escalation
453 open_esc = list_escalations(repo, status=EscalationStatus.OPEN)
454 assert len(open_esc) == 1
455
456 # 4. resolve it
457 res_id = fake_id("resolution")
458 resolve_escalation(repo, rec.escalation_id, res_id, actor, _utc_now())
459 append_audit(
460 repo, AuditEventType.ESCALATION_RESOLVED, actor,
461 pattern_id=pid, metadata={"escalation_id": rec.escalation_id}
462 )
463
464 # 5. now 0 open, 1 resolved
465 assert list_escalations(repo, status=EscalationStatus.OPEN) == []
466 resolved = list_escalations(repo, status=EscalationStatus.RESOLVED)
467 assert len(resolved) == 1
468
469 # 6. audit log has both events
470 entries = list_audit(repo)
471 event_types = {e["event_type"] for e in entries}
472 assert AuditEventType.ESCALATION_RECORDED in event_types
473 assert AuditEventType.ESCALATION_RESOLVED in event_types
474
475 def test_multiple_patterns_independent_escalations(self, repo: pathlib.Path) -> None:
476 pids = [fake_id(f"track{i}.mid") for i in range(5)]
477 recs = []
478 for pid in pids:
479 rec = _make_record(pid, "no match")
480 record_escalation(repo, rec)
481 recs.append(rec)
482
483 assert len(list_escalations(repo)) == 5
484
485 # resolve two of them
486 for rec in recs[:2]:
487 resolve_escalation(repo, rec.escalation_id, fake_id("res"),
488 AgentProvenance.human(), _utc_now())
489
490 assert len(list_escalations(repo, status=EscalationStatus.OPEN)) == 3
491 assert len(list_escalations(repo, status=EscalationStatus.RESOLVED)) == 2
492
493
494 # ===========================================================================
495 # Tier IV — Stress
496 # ===========================================================================
497
498
499 class TestStress:
500 """IV: 200 escalations; concurrent record idempotency."""
501
502 def test_list_200_escalations(self, repo: pathlib.Path) -> None:
503 pids = [fake_id(f"stress-{i}") for i in range(200)]
504 for pid in pids:
505 record_escalation(repo, _make_record(pid, "stress"))
506 results = list_escalations(repo)
507 assert len(results) == 200
508
509 def test_concurrent_record_same_id_idempotent(self, repo: pathlib.Path) -> None:
510 """Concurrent writes of the same escalation_id must not corrupt data."""
511 rec = _make_record()
512 n = 20
513
514 def _write() -> bool:
515 return record_escalation(repo, rec)
516
517 with concurrent.futures.ThreadPoolExecutor(max_workers=n) as ex:
518 futures = [ex.submit(_write) for _ in range(n)]
519 results = [f.result() for f in futures]
520
521 # File must exist and be valid JSON
522 dest = escalation_path(repo, rec.escalation_id)
523 assert dest.exists()
524 data = json.loads(dest.read_text())
525 assert data["escalation_id"] == rec.escalation_id
526 # At least one write must have succeeded
527 assert any(results)
528
529
530 # ===========================================================================
531 # Tier V — Data integrity
532 # ===========================================================================
533
534
535 class TestDataIntegrity:
536 """V: JSON round-trip; all fields present; frozen records."""
537
538 def test_json_round_trip_open(self, repo: pathlib.Path) -> None:
539 rec = _make_record()
540 record_escalation(repo, rec)
541 loaded = load_escalation(repo, rec.escalation_id)
542 assert loaded is not None
543 assert loaded.escalation_id == rec.escalation_id
544 assert loaded.pattern_id == rec.pattern_id
545 assert loaded.reason == rec.reason
546 assert loaded.status == EscalationStatus.OPEN
547 assert loaded.resolved_at is None
548 assert loaded.resolved_by is None
549 assert loaded.resolution_id is None
550
551 def test_json_round_trip_resolved(self, repo: pathlib.Path) -> None:
552 rec = _make_record()
553 record_escalation(repo, rec)
554 res_id = fake_id("res")
555 actor = AgentProvenance.agent("claude-code", "claude-sonnet-4-6")
556 now = _utc_now()
557 resolve_escalation(repo, rec.escalation_id, res_id, actor, now)
558 loaded = load_escalation(repo, rec.escalation_id)
559 assert loaded is not None
560 assert loaded.status == EscalationStatus.RESOLVED
561 assert loaded.resolution_id == res_id
562 assert loaded.resolved_by is not None
563 assert loaded.resolved_by.agent_id == "claude-code"
564 assert loaded.resolved_by.model_id == "claude-sonnet-4-6"
565 assert loaded.resolved_at is not None
566
567 def test_escalation_record_is_frozen(self) -> None:
568 rec = _make_record()
569 assert dataclasses.is_dataclass(rec)
570 with pytest.raises((dataclasses.FrozenInstanceError, AttributeError)):
571 rec.reason = "mutated" # type: ignore[misc]
572
573 def test_no_temp_files_in_escalations_dir(self, repo: pathlib.Path) -> None:
574 for i in range(5):
575 record_escalation(repo, _make_record(fake_id(f"p{i}"), "r"))
576 esc_dir = escalations_dir(repo)
577 assert list(esc_dir.glob("*.tmp")) == []
578
579 def test_escalated_at_is_utc_aware(self, repo: pathlib.Path) -> None:
580 rec = _make_record()
581 record_escalation(repo, rec)
582 loaded = load_escalation(repo, rec.escalation_id)
583 assert loaded is not None
584 assert loaded.escalated_at.tzinfo is not None
585
586
587 # ===========================================================================
588 # Tier VI — Security
589 # ===========================================================================
590
591
592 class TestSecurity:
593 """VI: ID validation; path traversal rejected; size cap."""
594
595 def test_load_traversal_rejected(self, repo: pathlib.Path) -> None:
596 with pytest.raises(ValueError):
597 load_escalation(repo, "../../etc/passwd")
598
599 def test_resolve_traversal_rejected(self, repo: pathlib.Path) -> None:
600 with pytest.raises(ValueError):
601 resolve_escalation(
602 repo, "../traversal", fake_id("r"),
603 AgentProvenance.human(), _utc_now()
604 )
605
606 def test_load_null_byte_rejected(self, repo: pathlib.Path) -> None:
607 with pytest.raises(ValueError):
608 load_escalation(repo, "a" * 63 + "\x00")
609
610 def test_load_too_short_rejected(self, repo: pathlib.Path) -> None:
611 with pytest.raises(ValueError):
612 load_escalation(repo, "abc")
613
614 def test_load_uppercase_hex_rejected(self, repo: pathlib.Path) -> None:
615 with pytest.raises(ValueError):
616 load_escalation(repo, "A" * 64)
617
618 def test_list_skips_non_json_files(self, repo: pathlib.Path) -> None:
619 rec = _make_record()
620 record_escalation(repo, rec)
621 # Plant a non-JSON file
622 esc_dir = escalations_dir(repo)
623 (esc_dir / "not-a-real-file.txt").write_text("noise")
624 results = list_escalations(repo)
625 assert len(results) == 1
626
627 def test_oversized_escalation_file_skipped(self, repo: pathlib.Path) -> None:
628 """Files exceeding _MAX_ESCALATION_BYTES must be silently skipped."""
629 rec = _make_record()
630 record_escalation(repo, rec)
631 dest = escalation_path(repo, rec.escalation_id)
632 # Bloat the file beyond 16 KiB
633 bloated = dest.read_text() + " " * 20_000
634 dest.write_text(bloated)
635 results = list_escalations(repo)
636 assert results == []
637
638 def test_symlinks_in_escalations_dir_skipped(self, repo: pathlib.Path) -> None:
639 rec = _make_record()
640 record_escalation(repo, rec)
641 esc_dir = escalations_dir(repo)
642 link = esc_dir / f"{'b' * 64}.json"
643 link.symlink_to(esc_dir / f"{long_id(rec.escalation_id, strip=True)}.json")
644 results = list_escalations(repo)
645 assert len(results) == 1 # only real file
646
647
648 # ===========================================================================
649 # Tier VII — Performance
650 # ===========================================================================
651
652
653 class TestPerformance:
654 """VII: all ops complete within 50 ms."""
655
656 def test_record_escalation_under_50ms(self, repo: pathlib.Path) -> None:
657 rec = _make_record()
658 start = time.monotonic()
659 record_escalation(repo, rec)
660 elapsed = (time.monotonic() - start) * 1000
661 assert elapsed < 50, f"record_escalation took {elapsed:.1f}ms"
662
663 def test_load_escalation_under_50ms(self, repo: pathlib.Path) -> None:
664 rec = _make_record()
665 record_escalation(repo, rec)
666 start = time.monotonic()
667 load_escalation(repo, rec.escalation_id)
668 elapsed = (time.monotonic() - start) * 1000
669 assert elapsed < 50, f"load_escalation took {elapsed:.1f}ms"
670
671 def test_resolve_escalation_under_50ms(self, repo: pathlib.Path) -> None:
672 rec = _make_record()
673 record_escalation(repo, rec)
674 start = time.monotonic()
675 resolve_escalation(repo, rec.escalation_id, fake_id("r"),
676 AgentProvenance.human(), _utc_now())
677 elapsed = (time.monotonic() - start) * 1000
678 assert elapsed < 50, f"resolve_escalation took {elapsed:.1f}ms"
679
680 def test_list_100_escalations_under_50ms(self, repo: pathlib.Path) -> None:
681 for i in range(100):
682 record_escalation(repo, _make_record(fake_id(f"p{i}"), "r"))
683 start = time.monotonic()
684 list_escalations(repo)
685 elapsed = (time.monotonic() - start) * 1000
686 assert elapsed < 50, f"list_escalations(100) took {elapsed:.1f}ms"
687
688 def test_compute_escalation_id_under_1ms(self) -> None:
689 pid = fake_id("p")
690 start = time.monotonic()
691 compute_escalation_id(pid, "reason")
692 elapsed = (time.monotonic() - start) * 1000
693 assert elapsed < 1, f"compute_escalation_id took {elapsed:.2f}ms"
File History 4 commits
sha256:81ae324db5ad375fbfe4834c6fcb378312cafad3cc92dec5d3e5c427306621a2 fix: remove commit_exists filter from have anchors — server… Sonnet 4.6 patch 20 days ago
sha256:36c3cb3e76619d4c30a6d9bf81b5ec4ff148e30dcfed913e3114ca7b43b81c7e fix: rename objects→blobs in push client and all stale test… Sonnet 4.6 patch 22 days ago
sha256:c06a9b9b9fee26c68ea725b44d54b2c0a171301ce9de746d5b656617b4463a9a fix: repair four test failures from post-migration audit Sonnet 4.6 patch 28 days ago
sha256:1900655993c83c4107067375548a7be823e471d2515830842f1a12cba4bd3cdf fix: unified object store migration — idempotent writes, JS… Sonnet 4.6 minor 28 days ago