"""Tests for Phase 4 CLI additions to ``muse harmony``.

New subcommands:
    ``muse harmony escalate PATTERN_ID``               — record an escalation
    ``muse harmony escalations``                       — list escalations
    ``muse harmony resolve-escalation ESCALATION_ID``  — close an escalation
    ``muse harmony engine … --auto-escalate``          — engine auto-records on Tier 4

Coverage tiers
--------------
I    Unit       — TypedDicts for escalate / escalations / resolve-escalation
II   Success    — escalate recorded; escalations list; resolve-escalation closes
III  Errors     — invalid IDs; missing records; resolution-id required
IV   E2E        — engine --auto-escalate → escalations; full lifecycle
V    Integrity  — all JSON fields always present; timestamps present
VI   Security   — path-traversal IDs rejected
VII  Perf       — all subcommands <300 ms
"""

from __future__ import annotations

import json
import pathlib
import time
import typing
from collections.abc import Mapping

import pytest

from muse.core.paths import muse_dir
from muse.core.types import fake_id
from tests.cli_test_helper import CliRunner

runner = CliRunner()

# Wall-clock budget (milliseconds) that every Phase 4 subcommand must meet.
_BUDGET_MS = 300


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------


@pytest.fixture()
def repo(tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> pathlib.Path:
    """Create a minimal muse repository in ``tmp_path`` and chdir into it."""
    dot_muse = muse_dir(tmp_path)
    dot_muse.mkdir()
    (dot_muse / "config.toml").write_text('[repo]\nname = "test"\nid = "abc"\n')
    monkeypatch.chdir(tmp_path)
    return tmp_path


def _invoke_json(args: list[str]) -> dict[str, typing.Any]:
    """Run ``args`` with ``--json`` appended and return the parsed output.

    Asserts a zero exit code first so that a failing command is reported
    with its own output instead of as a JSON decode error further down.
    """
    result = runner.invoke(None, [*args, "--json"])
    assert result.exit_code == 0, result.output
    return json.loads(result.output)


def _record_pattern(
    path: str = "track.mid",
    domain: str = "midi",
    conflict_type: str = "content",
    ours: str = "ours",
    theirs: str = "theirs",
) -> str:
    """Record a conflict pattern via the CLI and return its ``pattern_id``."""
    payload = _invoke_json([
        "harmony", "record",
        "--path", path,
        "--domain", domain,
        "--conflict-type", conflict_type,
        "--ours-id", fake_id(ours),
        "--theirs-id", fake_id(theirs),
    ])
    return payload["pattern_id"]


def _save_resolution(pattern_id: str, confidence: str = "0.9") -> str:
    """Save a manual resolution for ``pattern_id`` and return its ``resolution_id``."""
    payload = _invoke_json([
        "harmony", "resolve",
        "--pattern-id", pattern_id,
        "--strategy", "manual",
        "--outcome-blob", fake_id("outcome"),
        "--confidence", confidence,
    ])
    return payload["resolution_id"]


def _escalate(pattern_id: str, reason: str = "No match found") -> str:
    """Escalate ``pattern_id`` with ``reason`` and return the ``escalation_id``."""
    payload = _invoke_json([
        "harmony", "escalate", pattern_id,
        "--reason", reason,
    ])
    return payload["escalation_id"]


def _escalate_json(pattern_id: str) -> dict[str, typing.Any]:
    """Escalate ``pattern_id`` without ``--reason`` and return the full JSON payload."""
    return _invoke_json(["harmony", "escalate", pattern_id])


def _list_escalations(status: str | None = None) -> dict[str, typing.Any]:
    """Return the JSON payload of ``harmony escalations``, optionally filtered by status."""
    args = ["harmony", "escalations"]
    if status is not None:
        args += ["--status", status]
    return _invoke_json(args)


def _resolve_escalation(escalation_id: str, resolution_id: str) -> dict[str, typing.Any]:
    """Run ``harmony resolve-escalation`` and return its JSON payload."""
    return _invoke_json([
        "harmony", "resolve-escalation", escalation_id,
        "--resolution-id", resolution_id,
    ])


def _run_engine(pattern_id: str, *flags: str) -> None:
    """Run ``harmony engine`` for ``pattern_id`` with the given extra flags.

    The exit code is deliberately not asserted: these tests only observe the
    engine's effect on the escalation store, and the engine's exit status on
    the escalation tier is not established by this test module.
    """
    runner.invoke(None, ["harmony", "engine", pattern_id, *flags])


def _elapsed_ms(args: list[str]) -> float:
    """Time a single CLI invocation and return the wall time in milliseconds.

    The command must succeed; otherwise a command that fails immediately
    would trivially satisfy the time budget.
    """
    start = time.perf_counter()
    result = runner.invoke(None, args)
    elapsed = (time.perf_counter() - start) * 1000
    assert result.exit_code == 0, result.output
    return elapsed


# ===========================================================================
# Tier I — Unit: TypedDict schemas
# ===========================================================================


class TestTypedDictSchemas:
    """I: new TypedDicts declare expected keys."""

    def _hints(self, name: str) -> Mapping[str, object]:
        """Return the resolved type hints of the harmony TypedDict called ``name``."""
        import muse.cli.commands.harmony as h

        td = getattr(h, name)
        return typing.get_type_hints(td)

    def test_escalate_json_has_escalation_id(self) -> None:
        assert "escalation_id" in self._hints("_HarmonyEscalateJson")

    def test_escalate_json_has_pattern_id(self) -> None:
        assert "pattern_id" in self._hints("_HarmonyEscalateJson")

    def test_escalate_json_has_already_existed(self) -> None:
        assert "already_existed" in self._hints("_HarmonyEscalateJson")

    def test_escalation_entry_has_status(self) -> None:
        assert "status" in self._hints("_HarmonyEscalationEntryJson")

    def test_escalation_entry_has_escalation_id(self) -> None:
        assert "escalation_id" in self._hints("_HarmonyEscalationEntryJson")

    def test_escalation_entry_has_pattern_id(self) -> None:
        assert "pattern_id" in self._hints("_HarmonyEscalationEntryJson")

    def test_escalations_json_has_total(self) -> None:
        assert "total" in self._hints("_HarmonyEscalationsJson")

    def test_escalations_json_has_escalations(self) -> None:
        assert "escalations" in self._hints("_HarmonyEscalationsJson")

    def test_resolve_escalation_json_has_escalation_id(self) -> None:
        assert "escalation_id" in self._hints("_HarmonyResolveEscalationJson")

    def test_resolve_escalation_json_has_resolved(self) -> None:
        assert "resolved" in self._hints("_HarmonyResolveEscalationJson")


class TestRegistration:
    """I: new subcommands are reachable."""

    def test_escalate_help(self, repo: pathlib.Path) -> None:
        r = runner.invoke(None, ["harmony", "escalate", "--help"])
        assert r.exit_code == 0

    def test_escalations_help(self, repo: pathlib.Path) -> None:
        r = runner.invoke(None, ["harmony", "escalations", "--help"])
        assert r.exit_code == 0

    def test_resolve_escalation_help(self, repo: pathlib.Path) -> None:
        r = runner.invoke(None, ["harmony", "resolve-escalation", "--help"])
        assert r.exit_code == 0


# ===========================================================================
# Tier II — Integration: success paths
# ===========================================================================


class TestEscalateSuccess:
    """II: muse harmony escalate — success paths."""

    def test_escalate_returns_escalation_id(self, repo: pathlib.Path) -> None:
        data = _escalate_json(_record_pattern())
        assert "escalation_id" in data
        assert data["escalation_id"].startswith("sha256:")

    def test_escalate_returns_pattern_id(self, repo: pathlib.Path) -> None:
        pid = _record_pattern()
        assert _escalate_json(pid)["pattern_id"] == pid

    def test_escalate_already_existed_false_on_first(self, repo: pathlib.Path) -> None:
        pid = _record_pattern()
        assert _escalate_json(pid)["already_existed"] is False

    def test_escalate_idempotent_already_existed_true(self, repo: pathlib.Path) -> None:
        pid = _record_pattern()
        _escalate_json(pid)
        assert _escalate_json(pid)["already_existed"] is True

    def test_escalate_same_id_both_calls(self, repo: pathlib.Path) -> None:
        pid = _record_pattern()
        eid1 = _escalate_json(pid)["escalation_id"]
        eid2 = _escalate_json(pid)["escalation_id"]
        assert eid1 == eid2

    def test_escalate_custom_reason(self, repo: pathlib.Path) -> None:
        pid = _record_pattern()
        data = _invoke_json([
            "harmony", "escalate", pid,
            "--reason", "Custom escalation reason",
        ])
        assert "escalation_id" in data

    def test_escalate_text_output(self, repo: pathlib.Path) -> None:
        pid = _record_pattern()
        r = runner.invoke(None, ["harmony", "escalate", pid])
        assert r.exit_code == 0
        assert pid[:12] in r.output

    def test_escalate_unknown_pattern_still_works(self, repo: pathlib.Path) -> None:
        """Escalation can reference a pattern not in the store."""
        unknown_pid = fake_id("unknown-pattern")
        assert _escalate_json(unknown_pid)["pattern_id"] == unknown_pid

    def test_escalate_with_agent_id(self, repo: pathlib.Path) -> None:
        pid = _record_pattern()
        r = runner.invoke(None, [
            "harmony", "escalate", pid,
            "--agent-id", "claude-code",
            "--json",
        ])
        assert r.exit_code == 0


class TestEscalationsSuccess:
    """II: muse harmony escalations — success paths."""

    def test_empty_store_returns_zero(self, repo: pathlib.Path) -> None:
        data = _list_escalations()
        assert data["total"] == 0
        assert data["escalations"] == []

    def test_lists_all_by_default(self, repo: pathlib.Path) -> None:
        pid1 = _record_pattern("a.mid")
        pid2 = _record_pattern("b.mid", ours="b_ours", theirs="b_theirs")
        _escalate(pid1)
        _escalate(pid2)
        assert _list_escalations()["total"] == 2

    def test_filter_open(self, repo: pathlib.Path) -> None:
        pid1 = _record_pattern()
        pid2 = _record_pattern("b.mid", ours="bo", theirs="bt")
        eid1 = _escalate(pid1)
        _escalate(pid2)
        # Resolve the first escalation so exactly one stays open.
        _resolve_escalation(eid1, _save_resolution(pid1))
        assert _list_escalations("open")["total"] == 1

    def test_filter_resolved(self, repo: pathlib.Path) -> None:
        pid = _record_pattern()
        eid = _escalate(pid)
        _resolve_escalation(eid, _save_resolution(pid))
        assert _list_escalations("resolved")["total"] == 1

    def test_text_output(self, repo: pathlib.Path) -> None:
        _escalate(_record_pattern())
        r = runner.invoke(None, ["harmony", "escalations"])
        assert r.exit_code == 0


class TestResolveEscalationSuccess:
    """II: muse harmony resolve-escalation — success paths."""

    def test_resolved_true_when_found(self, repo: pathlib.Path) -> None:
        pid = _record_pattern()
        eid = _escalate(pid)
        res_id = _save_resolution(pid)
        assert _resolve_escalation(eid, res_id)["resolved"] is True

    def test_resolve_escalation_id_in_response(self, repo: pathlib.Path) -> None:
        pid = _record_pattern()
        eid = _escalate(pid)
        res_id = _save_resolution(pid)
        assert _resolve_escalation(eid, res_id)["escalation_id"] == eid

    def test_resolved_false_when_not_found(self, repo: pathlib.Path) -> None:
        eid = fake_id("missing-esc")
        res_id = fake_id("res")
        assert _resolve_escalation(eid, res_id)["resolved"] is False

    def test_text_output(self, repo: pathlib.Path) -> None:
        pid = _record_pattern()
        eid = _escalate(pid)
        res_id = _save_resolution(pid)
        r = runner.invoke(None, [
            "harmony", "resolve-escalation", eid,
            "--resolution-id", res_id,
        ])
        assert r.exit_code == 0


class TestEngineAutoEscalate:
    """II: engine --auto-escalate creates escalation record on Tier 4."""

    def test_auto_escalate_creates_record(self, repo: pathlib.Path) -> None:
        pid = _record_pattern()
        _run_engine(pid, "--auto-escalate", "--json")
        assert _list_escalations()["total"] >= 1

    def test_auto_escalate_only_on_escalated(self, repo: pathlib.Path) -> None:
        """If the engine resolves (applied), no escalation record is created."""
        pid = _record_pattern()
        _save_resolution(pid, confidence="0.95")
        _run_engine(pid, "--auto-escalate", "--json")
        assert _list_escalations()["total"] == 0

    def test_auto_escalate_pattern_id_in_record(self, repo: pathlib.Path) -> None:
        pid = _record_pattern()
        _run_engine(pid, "--auto-escalate")
        recs = _list_escalations()["escalations"]
        assert any(e["pattern_id"] == pid for e in recs)

    def test_engine_without_flag_no_escalation_record(self, repo: pathlib.Path) -> None:
        """Without --auto-escalate, escalate tier writes audit but NOT a record."""
        pid = _record_pattern()
        _run_engine(pid, "--json")
        assert _list_escalations()["total"] == 0


# ===========================================================================
# Tier III — Error paths
# ===========================================================================


class TestEscalateErrors:
    """III: muse harmony escalate — error paths."""

    def test_invalid_id_exits_1(self, repo: pathlib.Path) -> None:
        r = runner.invoke(None, ["harmony", "escalate", "bad-id", "--json"])
        assert r.exit_code == 1

    def test_traversal_id_exits_1(self, repo: pathlib.Path) -> None:
        r = runner.invoke(None, ["harmony", "escalate", "../../malicious", "--json"])
        assert r.exit_code == 1


class TestResolveEscalationErrors:
    """III: muse harmony resolve-escalation — error paths."""

    def test_invalid_escalation_id_exits_1(self, repo: pathlib.Path) -> None:
        r = runner.invoke(None, [
            "harmony", "resolve-escalation", "bad-id",
            "--resolution-id", fake_id("r"),
            "--json",
        ])
        assert r.exit_code == 1

    def test_invalid_resolution_id_exits_1(self, repo: pathlib.Path) -> None:
        eid = fake_id("esc")
        r = runner.invoke(None, [
            "harmony", "resolve-escalation", eid,
            "--resolution-id", "bad-res",
            "--json",
        ])
        assert r.exit_code == 1

    def test_missing_resolution_id_flag_exits_non_zero(self, repo: pathlib.Path) -> None:
        eid = fake_id("esc")
        r = runner.invoke(None, ["harmony", "resolve-escalation", eid, "--json"])
        assert r.exit_code != 0


# ===========================================================================
# Tier IV — End-to-end
# ===========================================================================


class TestEndToEnd:
    """IV: Full lifecycle via CLI."""

    def test_escalate_resolve_escalation_audit(self, repo: pathlib.Path) -> None:
        pid = _record_pattern()
        eid = _escalate(pid)

        # Verify escalation is open
        assert _list_escalations("open")["total"] == 1

        # Save a resolution
        res_id = _save_resolution(pid)

        # Resolve the escalation
        assert _resolve_escalation(eid, res_id)["resolved"] is True

        # Now zero open escalations
        assert _list_escalations("open")["total"] == 0

        # Audit log has escalation_resolved event
        audit = _invoke_json(["harmony", "audit"])
        event_types = [e["event_type"] for e in audit["entries"]]
        assert "escalation_resolved" in event_types

    def test_engine_auto_escalate_then_resolve(self, repo: pathlib.Path) -> None:
        pid = _record_pattern()
        _run_engine(pid, "--auto-escalate")

        open_recs = _list_escalations("open")["escalations"]
        assert len(open_recs) >= 1

        eid = open_recs[0]["escalation_id"]
        res_id = _save_resolution(pid)
        assert _resolve_escalation(eid, res_id)["resolved"] is True

    def test_multiple_escalate_filter_independently(self, repo: pathlib.Path) -> None:
        pids = [
            _record_pattern(f"{i}.mid", ours=f"o{i}", theirs=f"t{i}")
            for i in range(4)
        ]
        eids = [_escalate(p) for p in pids]

        # Resolve first two
        for i, (pid, eid) in enumerate(zip(pids[:2], eids[:2])):
            res_id = _save_resolution(pid, confidence=f"0.{80 + i}")
            _resolve_escalation(eid, res_id)

        assert _list_escalations("open")["total"] == 2
        assert _list_escalations("resolved")["total"] == 2


# ===========================================================================
# Tier V — Data integrity
# ===========================================================================


class TestDataIntegrity:
    """V: All JSON fields always present; types correct."""

    def test_escalate_all_fields_present(self, repo: pathlib.Path) -> None:
        data = _escalate_json(_record_pattern())
        for field in ("escalation_id", "pattern_id", "already_existed"):
            assert field in data, f"missing: {field}"

    def test_escalations_entry_all_fields_present(self, repo: pathlib.Path) -> None:
        _escalate(_record_pattern())
        entry = _list_escalations()["escalations"][0]
        for field in (
            "escalation_id",
            "pattern_id",
            "reason",
            "status",
            "escalated_at",
            "escalated_by",
        ):
            assert field in entry, f"missing: {field}"

    def test_escalation_id_is_prefixed_sha256(self, repo: pathlib.Path) -> None:
        eid = _escalate_json(_record_pattern())["escalation_id"]
        assert eid.startswith("sha256:")
        assert len(eid) == 71  # "sha256:" + 64 hex chars
        assert all(c in "0123456789abcdef" for c in eid[7:])

    def test_resolve_escalation_all_fields_present(self, repo: pathlib.Path) -> None:
        pid = _record_pattern()
        eid = _escalate(pid)
        res_id = _save_resolution(pid)
        data = _resolve_escalation(eid, res_id)
        for field in ("escalation_id", "resolved"):
            assert field in data, f"missing: {field}"

    def test_escalations_empty_list_not_null(self, repo: pathlib.Path) -> None:
        data = _list_escalations()
        assert isinstance(data["escalations"], list)


# ===========================================================================
# Tier VI — Security
# ===========================================================================


class TestSecurity:
    """VI: Path-traversal IDs rejected at all Phase 4 entry points."""

    def test_escalate_traversal_rejected(self, repo: pathlib.Path) -> None:
        r = runner.invoke(None, ["harmony", "escalate", "../../malicious", "--json"])
        assert r.exit_code == 1

    def test_escalate_null_byte_rejected(self, repo: pathlib.Path) -> None:
        r = runner.invoke(None, ["harmony", "escalate", "a" * 63 + "\x00", "--json"])
        assert r.exit_code == 1

    def test_resolve_escalation_traversal_eid(self, repo: pathlib.Path) -> None:
        r = runner.invoke(None, [
            "harmony", "resolve-escalation", "../../malicious",
            "--resolution-id", fake_id("r"),
            "--json",
        ])
        assert r.exit_code == 1

    def test_resolve_escalation_traversal_res_id(self, repo: pathlib.Path) -> None:
        eid = fake_id("esc")
        r = runner.invoke(None, [
            "harmony", "resolve-escalation", eid,
            "--resolution-id", "../../malicious",
            "--json",
        ])
        assert r.exit_code == 1


# ===========================================================================
# Tier VII — Performance
# ===========================================================================


class TestPerformance:
    """VII: all Phase 4 subcommands <300 ms."""

    def test_escalate_under_300ms(self, repo: pathlib.Path) -> None:
        pid = _record_pattern()
        elapsed = _elapsed_ms(["harmony", "escalate", pid, "--json"])
        assert elapsed < _BUDGET_MS, f"escalate took {elapsed:.0f}ms"

    def test_escalations_under_300ms(self, repo: pathlib.Path) -> None:
        elapsed = _elapsed_ms(["harmony", "escalations", "--json"])
        assert elapsed < _BUDGET_MS, f"escalations took {elapsed:.0f}ms"

    def test_resolve_escalation_under_300ms(self, repo: pathlib.Path) -> None:
        pid = _record_pattern()
        eid = _escalate(pid)
        res_id = _save_resolution(pid)
        elapsed = _elapsed_ms([
            "harmony", "resolve-escalation", eid,
            "--resolution-id", res_id,
            "--json",
        ])
        assert elapsed < _BUDGET_MS, f"resolve-escalation took {elapsed:.0f}ms"