"""Tests for the canonical ``muse agent`` JSON schema. Coverage -------- I keygen schema I1 All required keys present in keygen response I2 status is "ok" I3 hd_seed_b64 decodes to exactly 64 bytes I4 public_key_b64 decodes to exactly 32 bytes I5 fingerprint is sha256 hex of public_key_b64 bytes I6 name is null when --name not provided I7 name reflects --name flag when provided I8 msign_path contains the account index I9 hub is the full URL passed via --hub II list schema II1 Returns a JSON array (not object) II2 Empty array when no slots registered II3 Each entry has all required keys II4 Entries are sorted by account index (ascending) II5 hub in each entry is hostname (not full URL) III register schema III1 All required keys present in register response III2 status is "ok" III3 hub is hostname (not full URL) III4 msign_path contains the account index IV Error paths — JSON errors when --json is passed IV1 keygen with no identity → JSON error, exit 1 IV2 keygen with no mnemonic → JSON error, exit 1 IV3 keygen with negative account → JSON error, exit 1 IV4 keygen with no hub (no config) → JSON error, exit 1 IV5 Error responses include "error" key IV6 Error responses include "message" key """ from __future__ import annotations from collections.abc import Mapping import json import pathlib import pytest from tests.cli_test_helper import CliRunner, InvokeResult from muse.core.types import b64url_decode, public_key_fingerprint cli = None runner = CliRunner() _TEST_HUB = "https://localhost:1337" _TEST_HOSTNAME = "localhost:1337" _TEST_MNEMONIC = ( "abandon abandon abandon abandon abandon abandon abandon abandon " "abandon abandon abandon about" ) _KEYGEN_REQUIRED_KEYS = { "status", "hub", "account", "name", "msign_path", "public_key_b64", "fingerprint", "hd_seed_b64", } _LIST_ENTRY_REQUIRED_KEYS = {"name", "account", "hub", "msign_path"} _REGISTER_REQUIRED_KEYS = {"status", "name", "account", "hub", "msign_path"} # --------------------------------------------------------------------------- # Fixtures # --------------------------------------------------------------------------- @pytest.fixture() def isolated_identity( tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch ) -> pathlib.Path: fake_dir = tmp_path / "dot_muse" fake_dir.mkdir() fake_file = fake_dir / "identity.toml" monkeypatch.setattr("muse.core.identity._IDENTITY_DIR", fake_dir) monkeypatch.setattr("muse.core.identity._IDENTITY_FILE", fake_file) return fake_dir @pytest.fixture() def isolated_slots( tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch ) -> pathlib.Path: fake_dir = tmp_path / "dot_muse_slots" fake_dir.mkdir() fake_file = fake_dir / "agent-slots.toml" monkeypatch.setattr("muse.core.agent_slots._SLOTS_DIR", fake_dir) monkeypatch.setattr("muse.core.agent_slots._SLOTS_FILE", fake_file) return fake_dir @pytest.fixture() def identity_with_mnemonic(isolated_identity: pathlib.Path) -> None: import muse.core.keychain as _kc from muse.core.identity import IdentityEntry, save_identity _kc.store(_TEST_MNEMONIC) entry: IdentityEntry = { "type": "human", "handle": "gabriel", "hd_path": "m/1075233755'/0'/0'/0'/0'/0'", } save_identity(_TEST_HUB, entry) def _keygen( *extra_args: str, identity: None = None, slots: None = None, ) -> Mapping[str, object]: result = runner.invoke( cli, ["agent", "keygen", "--hub", _TEST_HUB, "--account", "1", "--json"] + list(extra_args), ) assert result.exit_code == 0, f"keygen failed:\n{result.output}" return json.loads(result.output.strip().splitlines()[0]) def _list_slots(slots: None = None) -> list[Mapping[str, str | int | None]]: result = runner.invoke( cli, ["agent", "list", "--hub", _TEST_HUB, "--json"] ) assert result.exit_code == 0, f"list failed:\n{result.output}" return json.loads(result.output.strip().splitlines()[0])["slots"] def _register(name: str, account: int, slots: None = None) -> Mapping[str, object]: result = runner.invoke( cli, ["agent", "register", "--hub", _TEST_HUB, "--account", str(account), "--name", name, "--json"], ) assert result.exit_code == 0, f"register failed:\n{result.output}" return json.loads(result.output.strip().splitlines()[0]) # --------------------------------------------------------------------------- # I keygen schema # --------------------------------------------------------------------------- class TestKeygenSchemaI: def test_I1_all_required_keys_present( self, identity_with_mnemonic: None, isolated_slots: pathlib.Path ) -> None: data = _keygen() missing = _KEYGEN_REQUIRED_KEYS - set(data.keys()) assert not missing, f"Missing keys in keygen response: {missing}" def test_I2_status_is_ok( self, identity_with_mnemonic: None, isolated_slots: pathlib.Path ) -> None: data = _keygen() assert data["status"] == "ok" def test_I3_hd_seed_b64_decodes_to_64_bytes( self, identity_with_mnemonic: None, isolated_slots: pathlib.Path ) -> None: data = _keygen() raw = b64url_decode(data["hd_seed_b64"]) assert len(raw) == 64, f"Expected 64 bytes, got {len(raw)}" def test_I4_public_key_b64_decodes_to_32_bytes( self, identity_with_mnemonic: None, isolated_slots: pathlib.Path ) -> None: data = _keygen() raw = b64url_decode(data["public_key_b64"]) assert len(raw) == 32, f"Expected 32 bytes, got {len(raw)}" def test_I5_fingerprint_is_sha256_of_public_key( self, identity_with_mnemonic: None, isolated_slots: pathlib.Path ) -> None: data = _keygen() pub_bytes = b64url_decode(data["public_key_b64"]) expected = public_key_fingerprint(pub_bytes) assert data["fingerprint"] == expected, ( f"Fingerprint mismatch: {data['fingerprint']!r} != {expected!r}" ) def test_I6_name_is_null_without_name_flag( self, identity_with_mnemonic: None, isolated_slots: pathlib.Path ) -> None: data = _keygen() assert data["name"] is None def test_I7_name_reflects_name_flag( self, identity_with_mnemonic: None, isolated_slots: pathlib.Path ) -> None: result = runner.invoke( cli, ["agent", "keygen", "--hub", _TEST_HUB, "--account", "1", "--name", "orchestra", "--json"], ) assert result.exit_code == 0 data = json.loads(result.output.strip().splitlines()[0]) assert data["name"] == "orchestra" def test_I8_msign_path_contains_account_index( self, identity_with_mnemonic: None, isolated_slots: pathlib.Path ) -> None: result = runner.invoke( cli, ["agent", "keygen", "--hub", _TEST_HUB, "--account", "7", "--json"], ) assert result.exit_code == 0 data = json.loads(result.output.strip().splitlines()[0]) assert "7'" in data["msign_path"] assert data["msign_path"].startswith("m/") def test_I9_hub_is_full_url( self, identity_with_mnemonic: None, isolated_slots: pathlib.Path ) -> None: data = _keygen() assert data["hub"] == _TEST_HUB # --------------------------------------------------------------------------- # II list schema # --------------------------------------------------------------------------- class TestListSchemaII: def _list_data(self, result: "InvokeResult") -> Mapping[str, object]: return json.loads(result.output.strip().splitlines()[0]) def test_II1_returns_json_object_with_slots( self, identity_with_mnemonic: None, isolated_slots: pathlib.Path ) -> None: result = runner.invoke( cli, ["agent", "list", "--hub", _TEST_HUB, "--json"] ) assert result.exit_code == 0 data = self._list_data(result) assert isinstance(data, dict) assert "slots" in data def test_II2_empty_slots_when_no_slots( self, identity_with_mnemonic: None, isolated_slots: pathlib.Path ) -> None: result = runner.invoke( cli, ["agent", "list", "--hub", _TEST_HUB, "--json"] ) assert result.exit_code == 0 data = self._list_data(result) assert data["slots"] == [] def test_II3_each_entry_has_required_keys( self, identity_with_mnemonic: None, isolated_slots: pathlib.Path ) -> None: from muse.core.agent_slots import register_slot register_slot(_TEST_HUB, "orchestra", 1) register_slot(_TEST_HUB, "mixer", 2) result = runner.invoke( cli, ["agent", "list", "--hub", _TEST_HUB, "--json"] ) assert result.exit_code == 0 entries = self._list_data(result)["slots"] assert entries for entry in entries: missing = _LIST_ENTRY_REQUIRED_KEYS - set(entry.keys()) assert not missing, f"Missing keys in list entry: {missing}" def test_II4_entries_sorted_by_account( self, identity_with_mnemonic: None, isolated_slots: pathlib.Path ) -> None: from muse.core.agent_slots import register_slot register_slot(_TEST_HUB, "z-agent", 5) register_slot(_TEST_HUB, "a-agent", 2) register_slot(_TEST_HUB, "m-agent", 9) result = runner.invoke( cli, ["agent", "list", "--hub", _TEST_HUB, "--json"] ) assert result.exit_code == 0 entries = self._list_data(result)["slots"] accounts = [e["account"] for e in entries] assert accounts == sorted(accounts) def test_II5_hub_is_hostname_not_url( self, identity_with_mnemonic: None, isolated_slots: pathlib.Path ) -> None: from muse.core.agent_slots import register_slot register_slot(_TEST_HUB, "test-slot", 3) result = runner.invoke( cli, ["agent", "list", "--hub", _TEST_HUB, "--json"] ) assert result.exit_code == 0 entries = self._list_data(result)["slots"] assert entries for entry in entries: assert entry["hub"] == _TEST_HOSTNAME, ( f"Expected hostname {_TEST_HOSTNAME!r}, got {entry['hub']!r}" ) # --------------------------------------------------------------------------- # III register schema # --------------------------------------------------------------------------- class TestRegisterSchemaIII: def test_III1_all_required_keys_present( self, identity_with_mnemonic: None, isolated_slots: pathlib.Path ) -> None: data = _register("orchestra", 1) missing = _REGISTER_REQUIRED_KEYS - set(data.keys()) assert not missing, f"Missing keys in register response: {missing}" def test_III2_status_is_ok( self, identity_with_mnemonic: None, isolated_slots: pathlib.Path ) -> None: data = _register("orchestra", 1) assert data["status"] == "ok" def test_III3_hub_is_hostname( self, identity_with_mnemonic: None, isolated_slots: pathlib.Path ) -> None: data = _register("test-agent", 4) assert data["hub"] == _TEST_HOSTNAME, ( f"Expected hostname {_TEST_HOSTNAME!r}, got {data['hub']!r}" ) def test_III4_msign_path_contains_account_index( self, identity_with_mnemonic: None, isolated_slots: pathlib.Path ) -> None: data = _register("my-agent", 11) assert "11'" in data["msign_path"] assert data["msign_path"].startswith("m/") # --------------------------------------------------------------------------- # IV Error paths — JSON errors when --json is passed # --------------------------------------------------------------------------- class TestErrorPathsIV: def test_IV1_keygen_no_identity_json_error( self, isolated_identity: pathlib.Path, isolated_slots: pathlib.Path, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch ) -> None: """No identity registered → exit 1 + JSON error on stdout.""" result = runner.invoke( cli, ["agent", "keygen", "--hub", _TEST_HUB, "--account", "1", "--json"], ) assert result.exit_code == 1 # The first JSON line on stdout must parse json_line = next( (ln for ln in result.output.splitlines() if ln.strip().startswith("{")), None, ) assert json_line is not None, f"No JSON in output:\n{result.output}" data = json.loads(json_line) assert "error" in data def test_IV2_keygen_no_mnemonic_json_error( self, isolated_identity: pathlib.Path, isolated_slots: pathlib.Path, monkeypatch: pytest.MonkeyPatch ) -> None: """Identity exists but has no mnemonic → exit 1 + JSON error.""" # Disable keychain so no leftover entry from a previous test run leaks in. monkeypatch.setenv("MUSE_KEYCHAIN_BACKEND", "disabled") from muse.core.identity import IdentityEntry, save_identity entry: IdentityEntry = {"type": "human", "handle": "gabriel"} save_identity(_TEST_HUB, entry) result = runner.invoke( cli, ["agent", "keygen", "--hub", _TEST_HUB, "--account", "1", "--json"], ) assert result.exit_code == 1 json_line = next( (ln for ln in result.output.splitlines() if ln.strip().startswith("{")), None, ) assert json_line is not None, f"No JSON in output:\n{result.output}" data = json.loads(json_line) assert "error" in data def test_IV3_keygen_negative_account_json_error( self, identity_with_mnemonic: None, isolated_slots: pathlib.Path ) -> None: """Negative account index with --json → exit 1 + JSON error.""" result = runner.invoke( cli, ["agent", "keygen", "--hub", _TEST_HUB, "--account", "-1", "--json"], ) assert result.exit_code == 1 json_line = next( (ln for ln in result.output.splitlines() if ln.strip().startswith("{")), None, ) assert json_line is not None, f"No JSON in output:\n{result.output}" data = json.loads(json_line) assert "error" in data def test_IV4_keygen_no_hub_json_error( self, identity_with_mnemonic: None, isolated_slots: pathlib.Path, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch ) -> None: """No hub configured, no --hub flag, --json → exit 1 + JSON error.""" monkeypatch.chdir(tmp_path) result = runner.invoke( cli, ["agent", "keygen", "--account", "1", "--json"], ) assert result.exit_code == 1 json_line = next( (ln for ln in result.output.splitlines() if ln.strip().startswith("{")), None, ) assert json_line is not None, f"No JSON in output:\n{result.output}" data = json.loads(json_line) assert "error" in data def test_IV5_error_has_error_key( self, isolated_identity: pathlib.Path, isolated_slots: pathlib.Path ) -> None: """JSON error responses always have an 'error' key.""" result = runner.invoke( cli, ["agent", "keygen", "--hub", _TEST_HUB, "--account", "1", "--json"], ) assert result.exit_code == 1 json_line = next( (ln for ln in result.output.splitlines() if ln.strip().startswith("{")), None, ) assert json_line is not None data = json.loads(json_line) assert "error" in data, f"No 'error' key in: {data}" assert isinstance(data["error"], str) assert data["error"] # non-empty def test_IV6_error_has_message_key( self, isolated_identity: pathlib.Path, isolated_slots: pathlib.Path ) -> None: """JSON error responses always have a 'message' key.""" result = runner.invoke( cli, ["agent", "keygen", "--hub", _TEST_HUB, "--account", "1", "--json"], ) assert result.exit_code == 1 json_line = next( (ln for ln in result.output.splitlines() if ln.strip().startswith("{")), None, ) assert json_line is not None data = json.loads(json_line) assert "message" in data, f"No 'message' key in: {data}" assert isinstance(data["message"], str) assert data["message"] # non-empty