"""Comprehensive hardening tests for ``muse auth``. Coverage -------- Unit - _hub_base_url: scheme validation, port handling, path stripping - _json_post: scheme guard fires before network, HTTP error, URLError, oversized response, non-dict response - _sanitize_token_or_exit: valid token, empty, control char, too long - _display_entry: JSON schema correctness, all fields present, stderr routing - _resolve_hub: explicit → config → None Integration (real fixture — identity.toml in tmp_path) - run_whoami: no identity exits nonzero, --json schema correct, --all lists all - run_logout: missing identity is not an error, --json schema, --all clears all Security - file:// hub URL rejected in _hub_base_url and _json_post - ftp:// hub URL rejected - ANSI in hub URL sanitized in error messages - Token never echoed to stdout E2E (via CliRunner + fixture) - keygen: --json schema when crypto unavailable (ImportError handled gracefully) - login: --json schema with all required keys - whoami: --json schema with all required keys, token_set=true - logout: --json schema ok, --json schema nothing_to_do - --all flag for whoami and logout Stress - 8 concurrent logins to isolated identity files do not race """ from __future__ import annotations import json import pathlib import ssl import threading import urllib.request from typing import TYPE_CHECKING from unittest.mock import MagicMock, patch import pytest from tests.cli_test_helper import CliRunner, InvokeResult from muse.core.paths import muse_dir if TYPE_CHECKING: pass from muse.cli.commands.auth import ( _KeygenJson, _LogoutJson, _WhoamiJson, ) from muse.core.identity import IdentityEntry, save_identity cli = None runner = CliRunner() # ── fixture ─────────────────────────────────────────────────────────────────── @pytest.fixture def identity_dir(tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> pathlib.Path: """Isolated ~/.muse dir with a fresh identity.toml; no repo needed for auth.""" muse_home = tmp_path / ".muse" 
muse_home.mkdir() (muse_home / "identity.toml").write_text("") monkeypatch.setenv("MUSE_HOME", str(muse_home)) monkeypatch.setattr( "muse.core.identity._IDENTITY_FILE", muse_home / "identity.toml", ) return muse_home @pytest.fixture def repo(tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> pathlib.Path: """Minimal .muse/ repo with identity file, used for CLI E2E tests.""" from muse._version import __version__ dot_muse = muse_dir(tmp_path) for sub in ("refs/heads", "objects", "commits", "snapshots"): (dot_muse / sub).mkdir(parents=True, exist_ok=True) (dot_muse / "repo.json").write_text( json.dumps({"repo_id": "test-repo", "schema_version": __version__, "domain": "code"}) ) (dot_muse / "HEAD").write_text("ref: refs/heads/main\n") (dot_muse / "refs" / "heads" / "main").write_text("") (dot_muse / "config.toml").write_text("") muse_home = tmp_path / ".muse-home" muse_home.mkdir() (muse_home / "identity.toml").write_text("") monkeypatch.setenv("MUSE_HOME", str(muse_home)) monkeypatch.setattr( "muse.core.identity._IDENTITY_FILE", muse_home / "identity.toml", ) monkeypatch.setenv("MUSE_REPO_ROOT", str(tmp_path)) monkeypatch.chdir(tmp_path) return tmp_path def _json_whoami(result: InvokeResult) -> _WhoamiJson: for line in result.output.splitlines(): stripped = line.strip() if stripped.startswith("{"): data: _WhoamiJson = json.loads(stripped) return data raise ValueError(f"No JSON line in output:\n{result.output!r}") def _json_logout(result: InvokeResult) -> _LogoutJson: for line in result.output.splitlines(): stripped = line.strip() if stripped.startswith("{"): data: _LogoutJson = json.loads(stripped) return data raise ValueError(f"No JSON line in output:\n{result.output!r}") def _json_keygen(result: InvokeResult) -> _KeygenJson: for line in result.output.splitlines(): stripped = line.strip() if stripped.startswith("{"): data: _KeygenJson = json.loads(stripped) return data raise ValueError(f"No JSON line in output:\n{result.output!r}") # ── Unit: _hub_base_url 
# ───────────────────────────────────────────────────────


class TestHubBaseUrl:
    """``_hub_base_url`` keeps scheme/host/port and rejects non-web schemes."""

    def test_https_path_stripped(self) -> None:
        from muse.cli.commands.auth import _hub_base_url

        assert _hub_base_url("https://musehub.ai/gabriel/muse") == "https://musehub.ai"

    def test_http_localhost_port_preserved(self) -> None:
        from muse.cli.commands.auth import _hub_base_url

        assert _hub_base_url("https://localhost:1337") == "https://localhost:1337"

    def test_explicit_port_preserved(self) -> None:
        from muse.cli.commands.auth import _hub_base_url

        assert (
            _hub_base_url("https://hub.example.com:8443/owner/repo")
            == "https://hub.example.com:8443"
        )

    def test_file_scheme_exits_nonzero(self) -> None:
        from muse.cli.commands.auth import _hub_base_url
        from muse.core.errors import ExitCode

        with pytest.raises(SystemExit) as exc_info:
            _hub_base_url("file:///etc/passwd")
        assert exc_info.value.code == ExitCode.USER_ERROR

    def test_ftp_scheme_exits_nonzero(self) -> None:
        from muse.cli.commands.auth import _hub_base_url

        with pytest.raises(SystemExit):
            _hub_base_url("ftp://ftp.example.com/repo")

    def test_data_scheme_exits_nonzero(self) -> None:
        from muse.cli.commands.auth import _hub_base_url

        with pytest.raises(SystemExit):
            _hub_base_url("data:text/plain,malicious")


# ── Unit: _json_post ──────────────────────────────────────────────────────────


class TestJsonPost:
    """``_json_post_raw`` error paths, exercised with ``urlopen`` patched out."""

    def test_file_scheme_rejected_before_network(self) -> None:
        """_json_post must not make a network request for file:// URLs."""
        from muse.cli.commands.auth import _json_post_raw as _json_post

        with patch("urllib.request.urlopen") as mock_open:
            with pytest.raises(SystemExit):
                _json_post("file:///etc/passwd", "/api/test", {})
        mock_open.assert_not_called()

    def test_ftp_scheme_rejected_before_network(self) -> None:
        from muse.cli.commands.auth import _json_post_raw as _json_post

        with patch("urllib.request.urlopen") as mock_open:
            with pytest.raises(SystemExit):
                _json_post("ftp://example.com", "/api/test", {})
        mock_open.assert_not_called()

    def test_http_error_exits_user_error(self) -> None:
        import urllib.error

        from muse.cli.commands.auth import _json_post_raw as _json_post

        # fp must be None to avoid type conflicts; HTTPError accepts None here
        exc = urllib.error.HTTPError(
            url="", code=401, msg="Unauthorized", hdrs=MagicMock(), fp=None
        )
        with patch("urllib.request.urlopen", side_effect=exc):
            with pytest.raises(SystemExit):
                _json_post("http://hub.local", "/api/test", {})

    def test_url_error_exits_user_error(self) -> None:
        import urllib.error

        from muse.cli.commands.auth import _json_post_raw as _json_post

        exc = urllib.error.URLError(reason="connection refused")
        with patch("urllib.request.urlopen", side_effect=exc):
            with pytest.raises(SystemExit):
                _json_post("http://hub.local", "/api/test", {})

    def test_oversized_response_exits(self) -> None:
        from muse.cli.commands.auth import _MAX_RESPONSE_BYTES, _json_post_raw as _json_post

        mock_resp = MagicMock()
        mock_resp.__enter__ = lambda s: s
        mock_resp.__exit__ = MagicMock(return_value=False)
        mock_resp.read.return_value = b"x" * (_MAX_RESPONSE_BYTES + 2)
        with patch("urllib.request.urlopen", return_value=mock_resp):
            with pytest.raises(SystemExit):
                _json_post("http://hub.local", "/api/test", {})

    def test_non_dict_response_exits(self) -> None:
        from muse.cli.commands.auth import _json_post_raw as _json_post

        mock_resp = MagicMock()
        mock_resp.__enter__ = lambda s: s
        mock_resp.__exit__ = MagicMock(return_value=False)
        mock_resp.read.return_value = b'["not", "a", "dict"]'
        with patch("urllib.request.urlopen", return_value=mock_resp):
            with pytest.raises(SystemExit):
                _json_post("http://hub.local", "/api/test", {})

    def test_none_values_stripped_from_payload(self) -> None:
        """None-valued keys must not appear in the serialised request body."""
        from muse.cli.commands.auth import _json_post_raw as _json_post

        captured: list[bytes] = []

        def _capture(
            req: urllib.request.Request,
            timeout: float,
            context: ssl.SSLContext | None = None,
        ) -> MagicMock:
            # Record the outgoing body, then hand back a minimal successful
            # response so the call under test completes normally.
            data: bytes = req.data if isinstance(req.data, bytes) else b""
            captured.append(data)
            mock_resp = MagicMock()
            mock_resp.__enter__ = lambda s: s
            mock_resp.__exit__ = MagicMock(return_value=False)
            mock_resp.read.return_value = b'{"ok": true}'
            return mock_resp

        with patch("urllib.request.urlopen", side_effect=_capture):
            _json_post("http://hub.local", "/test", {"a": "1", "b": None, "c": "3"})

        sent = json.loads(captured[0])
        assert "b" not in sent
        assert sent["a"] == "1"
        assert sent["c"] == "3"


# ── Unit: _display_entry ──────────────────────────────────────────────────────


class TestDisplayEntry:
    """``_display_entry`` output routing and JSON shape."""

    def _make_entry(self) -> IdentityEntry:
        """Return a fully-populated human identity entry."""
        return {
            "type": "human",
            "handle": "alice",
            "algorithm": "ed25519",
            "fingerprint": "abc123fingerprint456",
            "hd_path": "m/1075233755'/0'/0'/0'/0'/0'",
        }

    def test_json_output_all_keys_present(self, capsys: pytest.CaptureFixture[str]) -> None:
        from muse.cli.commands.auth import _display_entry

        _display_entry("hub.example.com", self._make_entry(), json_output=True)
        data = json.loads(capsys.readouterr().out)
        for key in ("hub", "type", "handle", "fingerprint", "key_set", "capabilities"):
            assert key in data, f"Missing key: {key}"

    def test_json_key_set_true(self, capsys: pytest.CaptureFixture[str]) -> None:
        from muse.cli.commands.auth import _display_entry

        _display_entry("hub.example.com", self._make_entry(), json_output=True)
        data = json.loads(capsys.readouterr().out)
        assert data["key_set"] is True
        assert isinstance(data["key_set"], bool)

    def test_json_no_key_key_set_false(self, capsys: pytest.CaptureFixture[str]) -> None:
        from muse.cli.commands.auth import _display_entry

        entry: IdentityEntry = {"type": "agent"}
        _display_entry("hub.example.com", entry, json_output=True)
        data = json.loads(capsys.readouterr().out)
        assert data["key_set"] is False
        assert isinstance(data["key_set"], bool)

    def test_json_goes_to_stdout_not_stderr(self, capsys: pytest.CaptureFixture[str]) -> None:
        from muse.cli.commands.auth import _display_entry

        _display_entry("hub.example.com", self._make_entry(), json_output=True)
        captured = capsys.readouterr()
        assert captured.out.strip().startswith("{")

    def test_text_goes_to_stderr(self, capsys: pytest.CaptureFixture[str]) -> None:
        from muse.cli.commands.auth import _display_entry

        _display_entry("hub.example.com", self._make_entry(), json_output=False)
        captured = capsys.readouterr()
        assert captured.out.strip() == ""
        assert "hub.example.com" in captured.err

    def test_ansi_in_hostname_stripped_in_text_mode(
        self, capsys: pytest.CaptureFixture[str]
    ) -> None:
        from muse.cli.commands.auth import _display_entry

        _display_entry("\x1b[31mmalicious\x1b[0m", self._make_entry(), json_output=False)
        assert "\x1b[" not in capsys.readouterr().err

    def test_capabilities_list_in_json(self, capsys: pytest.CaptureFixture[str]) -> None:
        from muse.cli.commands.auth import _display_entry

        entry: IdentityEntry = {"type": "agent", "capabilities": ["push", "pull"]}
        _display_entry("hub.example.com", entry, json_output=True)
        data = json.loads(capsys.readouterr().out)
        assert data["capabilities"] == ["push", "pull"]


# ── Integration: run_whoami ───────────────────────────────────────────────────


class TestWhoamiHardening:
    """``muse auth whoami`` against the isolated identity store from ``repo``."""

    _HUB = "http://localhost:19999"

    def _store(self, repo: pathlib.Path, *, handle: str = "alice") -> None:
        """Save a complete human identity for ``_HUB`` (``repo`` is unused)."""
        save_identity(self._HUB, {
            "type": "human",
            "handle": handle,
            "algorithm": "ed25519",
            "fingerprint": "fp123",
            "hd_path": "m/1075233755'/0'/0'/0'/0'/0'",
        })

    def _assert_all_json_lists_two_identities(self) -> None:
        """Store two identities and check ``whoami --all --json`` reports both.

        The whole of stdout must parse as one JSON document whose
        ``identities`` value is a two-element list.
        """
        hub2 = "http://localhost:20000"
        save_identity(self._HUB, {"type": "human", "handle": "alice"})
        save_identity(hub2, {"type": "agent", "handle": "bot"})
        result = runner.invoke(cli, ["auth", "whoami", "--all", "--json"])
        assert result.exit_code == 0
        parsed = json.loads(result.output)
        assert isinstance(parsed["identities"], list)
        assert len(parsed["identities"]) == 2

    def test_no_identity_exits_nonzero(self, repo: pathlib.Path) -> None:
        result = runner.invoke(cli, ["auth", "whoami", "--hub", self._HUB])
        assert result.exit_code != 0

    def test_whoami_json_schema(self, repo: pathlib.Path) -> None:
        self._store(repo)
        result = runner.invoke(cli, ["auth", "whoami", "--hub", self._HUB, "--json"])
        assert result.exit_code == 0
        data = _json_whoami(result)
        for key in ("hub", "type", "handle", "fingerprint", "key_set", "capabilities"):
            assert key in data, f"Missing key: {key}"
        assert data["key_set"] is True
        assert isinstance(data["key_set"], bool)
        assert data["handle"] == "alice"

    def test_whoami_json_stdout_clean(self, repo: pathlib.Path) -> None:
        self._store(repo)
        result = runner.invoke(cli, ["auth", "whoami", "--hub", self._HUB, "--json"])
        assert result.exit_code == 0
        json_lines = [
            line for line in result.output.splitlines() if line.strip().startswith("{")
        ]
        assert len(json_lines) >= 1

    def test_whoami_all_lists_multiple(self, repo: pathlib.Path) -> None:
        self._assert_all_json_lists_two_identities()

    def test_whoami_all_empty_exits_nonzero(self, repo: pathlib.Path) -> None:
        result = runner.invoke(cli, ["auth", "whoami", "--all"])
        assert result.exit_code != 0

    def test_whoami_no_hub_exits_nonzero(self, repo: pathlib.Path) -> None:
        result = runner.invoke(cli, ["auth", "whoami"])
        assert result.exit_code != 0

    def test_whoami_all_json_is_single_array(self, repo: pathlib.Path) -> None:
        self._assert_all_json_lists_two_identities()

    def test_whoami_key_set_is_bool(self, repo: pathlib.Path) -> None:
        self._store(repo)
        result = runner.invoke(cli, ["auth", "whoami", "--hub", self._HUB, "--json"])
        assert result.exit_code == 0
        raw = result.output
        # Inspect the raw text so a stringified "true" cannot pass as a bool.
        assert '"key_set": true' in raw or '"key_set":true' in raw
        assert '"key_set": "true"' not in raw

    def test_whoami_short_j_flag(self, repo: pathlib.Path) -> None:
        self._store(repo)
        result = runner.invoke(cli, ["auth", "whoami", "--hub", self._HUB, "-j"])
        assert result.exit_code == 0
        json.loads(result.output)

    def test_whoami_short_a_flag(self, repo: pathlib.Path) -> None:
        self._store(repo)
        result = runner.invoke(cli, ["auth", "whoami", "-a"])
        assert result.exit_code == 0
        assert "localhost" in result.stderr


# ── Integration: run_logout ───────────────────────────────────────────────────


class TestLogoutHardening:
    """``muse auth logout`` against the isolated identity store from ``repo``."""

    _HUB = "http://localhost:19999"

    def _store(self, repo: pathlib.Path) -> None:
        """Save a minimal human identity for ``_HUB`` (``repo`` is unused)."""
        save_identity(self._HUB, {"type": "human", "handle": "alice"})

    def test_logout_success_json_schema(self, repo: pathlib.Path) -> None:
        self._store(repo)
        result = runner.invoke(
            cli, ["auth", "logout", "--hub", self._HUB, "--json"]
        )
        assert result.exit_code == 0
        data = _json_logout(result)
        assert data["status"] == "ok"
        assert data["count"] == 1
        assert isinstance(data["hubs"], list)
        assert len(data["hubs"]) == 1

    def test_logout_nothing_to_do_json(self, repo: pathlib.Path) -> None:
        result = runner.invoke(
            cli, ["auth", "logout", "--hub", self._HUB, "--json"]
        )
        assert result.exit_code == 0
        data = _json_logout(result)
        assert data["status"] == "nothing_to_do"
        assert data["count"] == 0
        assert data["hubs"] == []

    def test_logout_all_clears_all(self, repo: pathlib.Path) -> None:
        hub2 = "http://localhost:20000"
        save_identity(self._HUB, {"type": "human", "handle": "a"})
        save_identity(hub2, {"type": "agent", "handle": "b"})
        result = runner.invoke(cli, ["auth", "logout", "--all", "--json"])
        assert result.exit_code == 0
        data = _json_logout(result)
        assert data["status"] == "ok"
        assert data["count"] == 2

    def test_logout_all_empty_json(self, repo: pathlib.Path) -> None:
        result = runner.invoke(cli, ["auth", "logout", "--all", "--json"])
        assert result.exit_code == 0
        data = _json_logout(result)
        assert data["status"] == "nothing_to_do"
        assert data["count"] == 0

    def test_logout_removes_identity(self, repo: pathlib.Path) -> None:
        self._store(repo)
        runner.invoke(cli, ["auth", "logout", "--hub", self._HUB])
        from muse.core.identity import load_identity

        assert load_identity(self._HUB) is None

    def test_logout_json_stdout_clean(self, repo: pathlib.Path) -> None:
        self._store(repo)
        result = runner.invoke(cli, ["auth", "logout", "--hub", self._HUB, "--json"])
        assert result.exit_code == 0
        for line in result.output.splitlines():
            stripped = line.strip()
            if stripped:
                # Every non-blank stdout line must look like part of a JSON
                # object (pretty-printed or single-line).
                assert stripped.startswith(("{", '"', "}")), (
                    f"Non-JSON on stdout: {stripped!r}"
                )

    def test_logout_json_hubs_list_contains_hostname(self, repo: pathlib.Path) -> None:
        """JSON output 'hubs' list contains the normalised hostname."""
        self._store(repo)
        result = runner.invoke(
            cli, ["auth", "logout", "--hub", self._HUB, "--json"]
        )
        assert result.exit_code == 0
        data = _json_logout(result)
        assert "hubs" in data
        assert len(data["hubs"]) == 1
        assert "localhost" in data["hubs"][0]

    def test_logout_all_json_hubs_sorted(self, repo: pathlib.Path) -> None:
        """--all --json hubs list is alphabetically sorted."""
        hubs = [
            "http://z-hub.example.com",
            "http://a-hub.example.com",
            "http://m-hub.example.com",
        ]
        for h in hubs:
            save_identity(h, {"type": "human", "handle": "u"})
        result = runner.invoke(cli, ["auth", "logout", "--all", "--json"])
        assert result.exit_code == 0
        data = _json_logout(result)
        assert data["hubs"] == sorted(data["hubs"]), "hubs not sorted"
        assert data["count"] == 3

    def test_logout_idempotent_second_call(self, repo: pathlib.Path) -> None:
        """Logging out twice succeeds both times (second call is nothing_to_do)."""
        self._store(repo)
        r1 = runner.invoke(cli, ["auth", "logout", "--hub", self._HUB, "--json"])
        r2 = runner.invoke(cli, ["auth", "logout", "--hub", self._HUB, "--json"])
        assert r1.exit_code == 0
        assert r2.exit_code == 0
        d1 = _json_logout(r1)
        d2 = _json_logout(r2)
        assert d1["status"] == "ok"
        assert d2["status"] == "nothing_to_do"

    def test_logout_preserves_sibling_hub(self, repo: pathlib.Path) -> None:
        """Logging out from one hub must not remove a different hub's identity."""
        hub2 = "http://sibling.example.com"
        self._store(repo)
        save_identity(hub2, {"type": "agent", "handle": "bot"})
        runner.invoke(cli, ["auth", "logout", "--hub", self._HUB])
        from muse.core.identity import load_identity

        assert load_identity(self._HUB) is None
        assert load_identity(hub2) is not None

    def test_logout_short_flags(self, repo: pathlib.Path) -> None:
        """-j and -a short flags work identically to --json and --all."""
        hub2 = "http://shortflag.example.com"
        self._store(repo)
        save_identity(hub2, {"type": "agent", "handle": "bot"})
        result = runner.invoke(cli, ["auth", "logout", "-a", "-j"])
        assert result.exit_code == 0
        data = _json_logout(result)
        assert data["status"] == "ok"
        assert data["count"] == 2

    def test_logout_all_single_write(self, repo: pathlib.Path) -> None:
        """clear_all_identities is called exactly once for --all (not N times)."""
        import muse.core.identity as _id_mod

        hubs = [f"http://hub{i}.example.com" for i in range(5)]
        for h in hubs:
            save_identity(h, {"type": "human", "handle": "u"})
        # ``wraps`` is evaluated before the patch is installed, so the spy
        # delegates to the real ``_save_all`` while counting calls.
        with patch(
            "muse.core.identity._save_all", wraps=_id_mod._save_all
        ) as mock_save:
            result = runner.invoke(cli, ["auth", "logout", "--all", "--json"])
        assert result.exit_code == 0
        # _save_all must be called exactly once for all 5 hubs
        assert mock_save.call_count == 1

    def test_logout_ansi_in_hub_url_sanitized(self, repo: pathlib.Path) -> None:
        """ANSI codes injected into a hub URL display name are stripped in text output."""
        ansi_hostname = "\x1b[31mattacker.example.com\x1b[0m"
        malicious_entry: IdentityEntry = {"type": "human", "handle": "eve"}
        with patch(
            "muse.core.identity._load_all",
            return_value={ansi_hostname: malicious_entry},
        ):
            # Provide a matching --hub so logout finds it
            result = runner.invoke(
                cli, ["auth", "logout", "--hub", ansi_hostname],
            )
        # Output (stderr) must not contain raw ESC bytes
        # NOTE(review): only ``result.output`` is checked here; confirm that
        # InvokeResult.output includes stderr, otherwise this cannot see a leak
        # on the stream the comment above names.
        assert "\x1b" not in result.output, "ANSI escape leaked to output"


# ── Security
# ──────────────────────────────────────────────────────────────────


class TestAuthSecurity:
    """Non-web URL schemes must be rejected without touching network or disk."""

    def test_json_post_file_scheme_blocked(self) -> None:
        from muse.cli.commands.auth import _json_post_raw as _json_post

        with patch("urllib.request.urlopen") as mock_net:
            with pytest.raises(SystemExit):
                _json_post("file:///etc/passwd", "/api/test", {})
        mock_net.assert_not_called()

    def test_hub_base_url_file_scheme_does_not_open_file(self) -> None:
        from muse.cli.commands.auth import _hub_base_url

        with patch("builtins.open") as mock_open:
            with pytest.raises(SystemExit):
                _hub_base_url("file:///etc/shadow")
        mock_open.assert_not_called()


# ── E2E: keygen with unavailable crypto ──────────────────────────────────────


class TestKeygenCLI:
    """``muse auth keygen`` argument validation via the CLI runner."""

    def test_keygen_no_hub_exits_nonzero(self, repo: pathlib.Path) -> None:
        result = runner.invoke(cli, ["auth", "keygen"])
        assert result.exit_code != 0

    def test_keygen_invalid_scheme_exits(self, repo: pathlib.Path) -> None:
        result = runner.invoke(
            cli, ["auth", "keygen", "--hub", "ftp://attacker.example.com"]
        )
        # May fail at scheme validation or at key generation
        assert result.exit_code != 0


# ── E2E: whoami capabilities field ───────────────────────────────────────────


class TestWhoamiCapabilities:
    """``whoami --json`` always reports ``capabilities`` as a list."""

    _HUB = "http://localhost:19999"

    def test_capabilities_empty_list_in_json(self, repo: pathlib.Path) -> None:
        save_identity(self._HUB, {"type": "human", "handle": "alice"})
        result = runner.invoke(cli, ["auth", "whoami", "--hub", self._HUB, "--json"])
        assert result.exit_code == 0
        data = _json_whoami(result)
        assert isinstance(data["capabilities"], list)


# ── Stress: concurrent logins ─────────────────────────────────────────────────


class TestStressConcurrent:
    """Identity-store round-trips under sequential and concurrent writers.

    These tests rebind ``muse.core.identity`` module attributes directly and
    restore them in ``finally`` blocks so later tests see the original paths.
    """

    def test_8_sequential_saves_to_isolated_files_correct(
        self, tmp_path: pathlib.Path
    ) -> None:
        """Sequential save→load round-trips to isolated identity files produce correct data.

        This validates the save/load logic is correct before layering concurrency.
        Module-level patching is not thread-safe, so isolation is tested sequentially.
        """
        import muse.core.identity as _id_mod

        orig_file = _id_mod._IDENTITY_FILE
        for idx in range(8):
            identity_file = tmp_path / f"identity_{idx}.toml"
            identity_file.write_text("")
            hub = f"http://localhost:{19000 + idx}"
            _id_mod._IDENTITY_FILE = identity_file
            try:
                from muse.core.identity import save_identity, load_identity, IdentityEntry

                entry: IdentityEntry = {"type": "agent", "handle": f"agent-{idx}"}
                save_identity(hub, entry)
                stored = load_identity(hub)
                assert stored is not None, f"Identity not stored for {hub}"
                assert stored["handle"] == f"agent-{idx}"
            finally:
                _id_mod._IDENTITY_FILE = orig_file

    def test_8_concurrent_logins_shared_identity_file(
        self, tmp_path: pathlib.Path
    ) -> None:
        """8 threads writing different hubs to the same identity file.

        The advisory lock in ``_identity_write_lock()`` must prevent
        read-modify-write races.  All 8 entries must survive after all
        threads complete.

        We patch _IDENTITY_FILE and _IDENTITY_DIR *once* before threads start
        to avoid the non-thread-safe module-attribute race that would arise if
        each thread patched them independently.
        """
        import muse.core.identity as _id_mod

        identity_dir = tmp_path / "muse-home"
        identity_dir.mkdir()
        identity_file = identity_dir / "identity.toml"
        identity_file.write_text("")

        errors: list[str] = []

        def _do(idx: int) -> None:
            # Exceptions are collected rather than raised: an exception in a
            # worker thread would not fail the test on its own.
            try:
                hub = f"http://localhost:{19000 + idx}"
                from muse.core.identity import save_identity, IdentityEntry

                entry: IdentityEntry = {"type": "human", "handle": f"user-{idx:02d}"}
                save_identity(hub, entry)
            except Exception as exc:
                errors.append(f"Thread {idx}: {exc}")

        orig_file = _id_mod._IDENTITY_FILE
        orig_dir = _id_mod._IDENTITY_DIR
        _id_mod._IDENTITY_FILE = identity_file
        _id_mod._IDENTITY_DIR = identity_dir
        # Everything that runs while the module is patched sits inside the
        # try, so a failure while starting/joining threads cannot leak the
        # temporary paths into later tests.
        try:
            threads = [threading.Thread(target=_do, args=(i,)) for i in range(8)]
            for t in threads:
                t.start()
            for t in threads:
                t.join()

            from muse.core.identity import list_all_identities

            all_ids = list_all_identities()
        finally:
            _id_mod._IDENTITY_FILE = orig_file
            _id_mod._IDENTITY_DIR = orig_dir

        # Join outside the f-string: a backslash inside a replacement field is
        # a SyntaxError on Python versions before 3.12.
        failure_report = "\n".join(errors)
        assert errors == [], f"Concurrent shared identity file failures:\n{failure_report}"
        assert len(all_ids) == 8, f"Expected 8 identities, got {len(all_ids)}: {list(all_ids)}"

    def test_16_sequential_logins_all_survive(
        self, tmp_path: pathlib.Path
    ) -> None:
        """16 sequential logins to distinct hubs all persist correctly.

        Validates that repeated save→load round-trips don't corrupt the TOML file
        and that all entries coexist after the final write.
        """
        import muse.core.identity as _id_mod

        identity_dir = tmp_path / "muse-stress"
        identity_dir.mkdir()
        identity_file = identity_dir / "identity.toml"
        identity_file.write_text("")

        orig_file = _id_mod._IDENTITY_FILE
        orig_dir = _id_mod._IDENTITY_DIR
        _id_mod._IDENTITY_FILE = identity_file
        _id_mod._IDENTITY_DIR = identity_dir
        try:
            from muse.core.identity import save_identity, load_identity, IdentityEntry

            for idx in range(16):
                hub = f"http://localhost:{21000 + idx}"
                entry: IdentityEntry = {
                    "type": "agent",
                    "handle": f"agent-{idx:02d}",
                }
                save_identity(hub, entry)

            from muse.core.identity import list_all_identities

            all_ids = list_all_identities()
            assert len(all_ids) == 16, f"Expected 16, got {len(all_ids)}"

            # Spot-check the first, a middle and the last entry.
            for idx in (0, 7, 15):
                hub = f"http://localhost:{21000 + idx}"
                stored = load_identity(hub)
                assert stored is not None
                assert stored["handle"] == f"agent-{idx:02d}"
        finally:
            _id_mod._IDENTITY_FILE = orig_file
            _id_mod._IDENTITY_DIR = orig_dir


# ---------------------------------------------------------------------------
# Flag registration tests
# ---------------------------------------------------------------------------

import argparse as _argparse
from muse.cli.commands.auth import register as _register_auth


def _parse_auth(*args: str) -> _argparse.Namespace:
    """Build an argument parser via register() and parse args."""
    root_p = _argparse.ArgumentParser()
    subs = root_p.add_subparsers(dest="cmd")
    _register_auth(subs)
    return root_p.parse_args(["auth", *args])


class TestRegisterFlags:
    """Each ``auth`` subcommand exposes ``--json``/``-j`` as ``json_out``."""

    # ── keygen ──────────────────────────────────────────────────────────────

    def test_keygen_default_json_out_is_false(self) -> None:
        ns = _parse_auth("keygen", "--hub", "https://localhost:1337")
        assert ns.json_out is False

    def test_keygen_json_flag_sets_json_out(self) -> None:
        ns = _parse_auth("keygen", "--hub", "https://localhost:1337", "--json")
        assert ns.json_out is True

    def test_keygen_j_shorthand_sets_json_out(self) -> None:
        ns = _parse_auth("keygen", "--hub", "https://localhost:1337", "-j")
        assert ns.json_out is True

    def test_keygen_force_flag(self) -> None:
        ns = _parse_auth("keygen", "--hub", "https://localhost:1337", "--force")
        assert ns.force is True

    # ── recover ─────────────────────────────────────────────────────────────

    def test_recover_default_json_out_is_false(self) -> None:
        ns = _parse_auth("recover", "--hub", "https://localhost:1337")
        assert ns.json_out is False

    def test_recover_json_flag_sets_json_out(self) -> None:
        ns = _parse_auth("recover", "--hub", "https://localhost:1337", "--json")
        assert ns.json_out is True

    def test_recover_j_shorthand_sets_json_out(self) -> None:
        ns = _parse_auth("recover", "--hub", "https://localhost:1337", "-j")
        assert ns.json_out is True

    # ── rotate ──────────────────────────────────────────────────────────────

    def test_rotate_default_json_out_is_false(self) -> None:
        ns = _parse_auth("rotate")
        assert ns.json_out is False

    def test_rotate_json_flag_sets_json_out(self) -> None:
        ns = _parse_auth("rotate", "--json")
        assert ns.json_out is True

    def test_rotate_j_shorthand_sets_json_out(self) -> None:
        ns = _parse_auth("rotate", "-j")
        assert ns.json_out is True

    # ── register ────────────────────────────────────────────────────────────

    def test_register_default_json_out_is_false(self) -> None:
        ns = _parse_auth("register", "--hub", "https://localhost:1337", "--handle", "gabriel")
        assert ns.json_out is False

    def test_register_json_flag_sets_json_out(self) -> None:
        ns = _parse_auth(
            "register", "--hub", "https://localhost:1337", "--handle", "gabriel", "--json"
        )
        assert ns.json_out is True

    def test_register_j_shorthand_sets_json_out(self) -> None:
        ns = _parse_auth(
            "register", "--hub", "https://localhost:1337", "--handle", "gabriel", "-j"
        )
        assert ns.json_out is True

    # ── whoami ──────────────────────────────────────────────────────────────

    def test_whoami_default_json_out_is_false(self) -> None:
        ns = _parse_auth("whoami")
        assert ns.json_out is False

    def test_whoami_json_flag_sets_json_out(self) -> None:
        ns = _parse_auth("whoami", "--json")
        assert ns.json_out is True

    def test_whoami_j_shorthand_sets_json_out(self) -> None:
        ns = _parse_auth("whoami", "-j")
        assert ns.json_out is True

    def test_whoami_all_flag(self) -> None:
        ns = _parse_auth("whoami", "--all")
        assert ns.all_hubs is True

    # ── logout ──────────────────────────────────────────────────────────────

    def test_logout_default_json_out_is_false(self) -> None:
        ns = _parse_auth("logout")
        assert ns.json_out is False

    def test_logout_json_flag_sets_json_out(self) -> None:
        ns = _parse_auth("logout", "--json")
        assert ns.json_out is True

    def test_logout_j_shorthand_sets_json_out(self) -> None:
        ns = _parse_auth("logout", "-j")
        assert ns.json_out is True

    # ── show ────────────────────────────────────────────────────────────────

    def test_show_default_json_out_is_false(self) -> None:
        ns = _parse_auth("show")
        assert ns.json_out is False

    def test_show_json_flag_sets_json_out(self) -> None:
        ns = _parse_auth("show", "--json")
        assert ns.json_out is True

    def test_show_j_shorthand_sets_json_out(self) -> None:
        ns = _parse_auth("show", "-j")
        assert ns.json_out is True