test_cmd_auth_hardening.py
python
sha256:81ae324db5ad375fbfe4834c6fcb378312cafad3cc92dec5d3e5c427306621a2
fix: remove commit_exists filter from have anchors — server…
Sonnet 4.6
patch
20 days ago
| 1 | """Comprehensive hardening tests for ``muse auth``. |
| 2 | |
| 3 | Coverage |
| 4 | -------- |
| 5 | Unit |
| 6 | - _hub_base_url: scheme validation, port handling, path stripping |
| 7 | - _json_post: scheme guard fires before network, HTTP error, URLError, |
| 8 | oversized response, non-dict response |
| 9 | - _sanitize_token_or_exit: valid token, empty, control char, too long |
| 10 | - _display_entry: JSON schema correctness, all fields present, stderr routing |
| 11 | - _resolve_hub: explicit → config → None |
| 12 | |
| 13 | Integration (real fixture — identity.toml in tmp_path) |
| 14 | - run_whoami: no identity exits nonzero, --json schema correct, --all lists all |
| 15 | - run_logout: missing identity is not an error, --json schema, --all clears all |
| 16 | |
| 17 | Security |
| 18 | - file:// hub URL rejected in _hub_base_url and _json_post |
| 19 | - ftp:// hub URL rejected |
| 20 | - ANSI in hub URL sanitized in error messages |
| 21 | - Token never echoed to stdout |
| 22 | |
| 23 | E2E (via CliRunner + fixture) |
| 24 | - keygen: --json schema when crypto unavailable (ImportError handled gracefully) |
| 25 | - login: --json schema with all required keys |
| 26 | - whoami: --json schema with all required keys, key_set=true |
| 27 | - logout: --json schema ok, --json schema nothing_to_do |
| 28 | - --all flag for whoami and logout |
| 29 | |
| 30 | Stress |
| 31 | - 8 concurrent logins to one shared identity file do not race (isolated files are exercised sequentially) |
| 32 | """ |
| 33 | |
| 34 | from __future__ import annotations |
| 35 | |
| 36 | import json |
| 37 | import pathlib |
| 38 | import ssl |
| 39 | import threading |
| 40 | import urllib.request |
| 41 | from typing import TYPE_CHECKING |
| 42 | from unittest.mock import MagicMock, patch |
| 43 | |
| 44 | import pytest |
| 45 | |
| 46 | from tests.cli_test_helper import CliRunner, InvokeResult |
| 47 | from muse.core.paths import muse_dir |
| 48 | |
| 49 | if TYPE_CHECKING: |
| 50 | pass |
| 51 | |
| 52 | from muse.cli.commands.auth import ( |
| 53 | _KeygenJson, |
| 54 | _LogoutJson, |
| 55 | _WhoamiJson, |
| 56 | ) |
| 57 | from muse.core.identity import IdentityEntry, save_identity |
| 58 | |
# Single CLI runner shared by every command-level test in this module.
runner = CliRunner()

# First positional argument handed to ``runner.invoke`` throughout this module.
# NOTE(review): presumably the test helper resolves the real entry point itself
# and ignores this value — confirm against tests.cli_test_helper.
cli = None
| 61 | |
| 62 | # ── fixture ─────────────────────────────────────────────────────────────────── |
| 63 | |
| 64 | |
@pytest.fixture
def identity_dir(tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> pathlib.Path:
    """Create an isolated muse home holding an empty ``identity.toml``.

    The directory is created under ``tmp_path``; ``MUSE_HOME`` and
    ``muse.core.identity._IDENTITY_FILE`` are redirected to it for the
    duration of the test.  No repository is created.

    Returns:
        The path of the temporary muse home directory.
    """
    home = tmp_path / ".muse"
    home.mkdir()

    identity_path = home / "identity.toml"
    identity_path.write_text("")

    monkeypatch.setenv("MUSE_HOME", str(home))
    monkeypatch.setattr("muse.core.identity._IDENTITY_FILE", identity_path)
    return home
| 77 | |
| 78 | |
@pytest.fixture
def repo(tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> pathlib.Path:
    """Build a minimal muse repository plus an isolated identity file.

    The repository skeleton (directories, ``repo.json``, ``HEAD``, an empty
    ``main`` ref and an empty ``config.toml``) is written under ``tmp_path``.
    A separate muse home with an empty ``identity.toml`` is created next to
    it, and ``MUSE_HOME``, ``MUSE_REPO_ROOT``, the identity-file module
    attribute and the working directory are all pointed at the temp tree.

    Returns:
        ``tmp_path``, i.e. the repository root.
    """
    from muse._version import __version__

    dot_muse = muse_dir(tmp_path)
    for subdir in ("refs/heads", "objects", "commits", "snapshots"):
        (dot_muse / subdir).mkdir(parents=True, exist_ok=True)

    repo_meta = {"repo_id": "test-repo", "schema_version": __version__, "domain": "code"}
    seed_files = {
        "repo.json": json.dumps(repo_meta),
        "HEAD": "ref: refs/heads/main\n",
        "refs/heads/main": "",
        "config.toml": "",
    }
    for relative, content in seed_files.items():
        (dot_muse / relative).write_text(content)

    # Identity storage lives outside the repository so auth state is isolated.
    home = tmp_path / ".muse-home"
    home.mkdir()
    identity_path = home / "identity.toml"
    identity_path.write_text("")

    monkeypatch.setenv("MUSE_HOME", str(home))
    monkeypatch.setattr("muse.core.identity._IDENTITY_FILE", identity_path)
    monkeypatch.setenv("MUSE_REPO_ROOT", str(tmp_path))
    monkeypatch.chdir(tmp_path)
    return tmp_path
| 104 | |
| 105 | |
def _first_json_line(result: InvokeResult) -> str:
    """Return the first stripped output line that starts with ``{``.

    Shared by the three typed ``_json_*`` helpers below, which previously
    each carried an identical copy of this scan.

    Raises:
        ValueError: if no line of ``result.output`` starts with ``{``.
    """
    for line in result.output.splitlines():
        stripped = line.strip()
        if stripped.startswith("{"):
            return stripped
    raise ValueError(f"No JSON line in output:\n{result.output!r}")


def _json_whoami(result: InvokeResult) -> _WhoamiJson:
    """Decode the first JSON line of a ``whoami`` invocation's output.

    Raises:
        ValueError: if the output contains no line starting with ``{``.
    """
    data: _WhoamiJson = json.loads(_first_json_line(result))
    return data


def _json_logout(result: InvokeResult) -> _LogoutJson:
    """Decode the first JSON line of a ``logout`` invocation's output.

    Raises:
        ValueError: if the output contains no line starting with ``{``.
    """
    data: _LogoutJson = json.loads(_first_json_line(result))
    return data


def _json_keygen(result: InvokeResult) -> _KeygenJson:
    """Decode the first JSON line of a ``keygen`` invocation's output.

    Raises:
        ValueError: if the output contains no line starting with ``{``.
    """
    data: _KeygenJson = json.loads(_first_json_line(result))
    return data
| 131 | |
| 132 | |
| 133 | # ── Unit: _hub_base_url ─────────────────────────────────────────────────────── |
| 134 | |
| 135 | |
class TestHubBaseUrl:
    """Unit tests for ``_hub_base_url``: path stripping, ports, scheme rejection."""

    def test_https_path_stripped(self) -> None:
        """A trailing owner/repo path is removed from the hub URL."""
        from muse.cli.commands.auth import _hub_base_url

        base = _hub_base_url("https://musehub.ai/gabriel/muse")
        assert base == "https://musehub.ai"

    def test_http_localhost_port_preserved(self) -> None:
        """A localhost URL keeps its explicit port."""
        from muse.cli.commands.auth import _hub_base_url

        base = _hub_base_url("https://localhost:1337")
        assert base == "https://localhost:1337"

    def test_explicit_port_preserved(self) -> None:
        """The port survives while the path is stripped."""
        from muse.cli.commands.auth import _hub_base_url

        base = _hub_base_url("https://hub.example.com:8443/owner/repo")
        assert base == "https://hub.example.com:8443"

    def test_file_scheme_exits_nonzero(self) -> None:
        """A ``file://`` URL raises SystemExit carrying ``ExitCode.USER_ERROR``."""
        from muse.cli.commands.auth import _hub_base_url
        from muse.core.errors import ExitCode

        with pytest.raises(SystemExit) as exc_info:
            _hub_base_url("file:///etc/passwd")

        exit_code = exc_info.value.code
        assert exit_code == ExitCode.USER_ERROR

    def test_ftp_scheme_exits_nonzero(self) -> None:
        """An ``ftp://`` URL raises SystemExit."""
        from muse.cli.commands.auth import _hub_base_url

        with pytest.raises(SystemExit):
            _hub_base_url("ftp://ftp.example.com/repo")

    def test_data_scheme_exits_nonzero(self) -> None:
        """A ``data:`` URL raises SystemExit."""
        from muse.cli.commands.auth import _hub_base_url

        with pytest.raises(SystemExit):
            _hub_base_url("data:text/plain,malicious")
| 165 | |
| 166 | |
| 167 | # ── Unit: _json_post ────────────────────────────────────────────────────────── |
| 168 | |
| 169 | |
class TestJsonPost:
    """Unit tests for ``_json_post_raw`` (imported locally as ``_json_post``)."""

    @staticmethod
    def _canned_response(body: bytes) -> MagicMock:
        """Return a context-manager mock whose ``read()`` yields *body*."""
        response = MagicMock()
        response.__enter__ = lambda s: s
        response.__exit__ = MagicMock(return_value=False)
        response.read.return_value = body
        return response

    def test_file_scheme_rejected_before_network(self) -> None:
        """_json_post must not make a network request for file:// URLs."""
        from muse.cli.commands.auth import _json_post_raw as _json_post

        with patch("urllib.request.urlopen") as urlopen_mock, pytest.raises(SystemExit):
            _json_post("file:///etc/passwd", "/api/test", {})
        urlopen_mock.assert_not_called()

    def test_ftp_scheme_rejected_before_network(self) -> None:
        """An ftp:// hub raises SystemExit without ``urlopen`` being called."""
        from muse.cli.commands.auth import _json_post_raw as _json_post

        with patch("urllib.request.urlopen") as urlopen_mock, pytest.raises(SystemExit):
            _json_post("ftp://example.com", "/api/test", {})
        urlopen_mock.assert_not_called()

    def test_http_error_exits_user_error(self) -> None:
        """An HTTP 401 raised by ``urlopen`` surfaces as SystemExit."""
        import urllib.error
        from muse.cli.commands.auth import _json_post_raw as _json_post

        # fp must be None to avoid type conflicts; HTTPError accepts None here
        unauthorized = urllib.error.HTTPError(
            url="", code=401, msg="Unauthorized", hdrs=MagicMock(), fp=None
        )
        with patch("urllib.request.urlopen", side_effect=unauthorized), pytest.raises(SystemExit):
            _json_post("http://hub.local", "/api/test", {})

    def test_url_error_exits_user_error(self) -> None:
        """A ``URLError`` raised by ``urlopen`` surfaces as SystemExit."""
        import urllib.error
        from muse.cli.commands.auth import _json_post_raw as _json_post

        refused = urllib.error.URLError(reason="connection refused")
        with patch("urllib.request.urlopen", side_effect=refused), pytest.raises(SystemExit):
            _json_post("http://hub.local", "/api/test", {})

    def test_oversized_response_exits(self) -> None:
        """A body longer than ``_MAX_RESPONSE_BYTES`` raises SystemExit."""
        from muse.cli.commands.auth import _MAX_RESPONSE_BYTES, _json_post_raw as _json_post

        too_big = self._canned_response(b"x" * (_MAX_RESPONSE_BYTES + 2))
        with patch("urllib.request.urlopen", return_value=too_big), pytest.raises(SystemExit):
            _json_post("http://hub.local", "/api/test", {})

    def test_non_dict_response_exits(self) -> None:
        """A JSON array response body raises SystemExit."""
        from muse.cli.commands.auth import _json_post_raw as _json_post

        array_body = self._canned_response(b'["not", "a", "dict"]')
        with patch("urllib.request.urlopen", return_value=array_body), pytest.raises(SystemExit):
            _json_post("http://hub.local", "/api/test", {})

    def test_none_values_stripped_from_payload(self) -> None:
        """None-valued keys must not appear in the serialised request body."""
        from muse.cli.commands.auth import _json_post_raw as _json_post

        bodies: list[bytes] = []

        def _capture(req: urllib.request.Request, timeout: float, context: ssl.SSLContext | None = None) -> MagicMock:
            # Record the outgoing request body, then answer with a tiny JSON object.
            payload: bytes = req.data if isinstance(req.data, bytes) else b""
            bodies.append(payload)
            return self._canned_response(b'{"ok": true}')

        with patch("urllib.request.urlopen", side_effect=_capture):
            _json_post("http://hub.local", "/test", {"a": "1", "b": None, "c": "3"})

        sent = json.loads(bodies[0])
        assert "b" not in sent
        assert sent["a"] == "1"
        assert sent["c"] == "3"
| 246 | |
| 247 | |
| 248 | # ── Unit: _display_entry ────────────────────────────────────────────────────── |
| 249 | |
| 250 | |
class TestDisplayEntry:
    """Unit tests for ``_display_entry``: JSON schema and stdout/stderr routing."""

    _HOST = "hub.example.com"

    def _make_entry(self) -> IdentityEntry:
        """Return a fully populated human identity entry."""
        entry: IdentityEntry = {
            "type": "human",
            "handle": "alice",
            "algorithm": "ed25519",
            "fingerprint": "abc123fingerprint456",
            "hd_path": "m/1075233755'/0'/0'/0'/0'/0'",
        }
        return entry

    def test_json_output_all_keys_present(self, capsys: pytest.CaptureFixture[str]) -> None:
        """JSON output carries every expected top-level key."""
        from muse.cli.commands.auth import _display_entry

        _display_entry(self._HOST, self._make_entry(), json_output=True)
        data = json.loads(capsys.readouterr().out)
        expected_keys = ("hub", "type", "handle", "fingerprint", "key_set", "capabilities")
        for key in expected_keys:
            assert key in data, f"Missing key: {key}"

    def test_json_key_set_true(self, capsys: pytest.CaptureFixture[str]) -> None:
        """A fully populated entry reports ``key_set`` as the boolean ``True``."""
        from muse.cli.commands.auth import _display_entry

        _display_entry(self._HOST, self._make_entry(), json_output=True)
        data = json.loads(capsys.readouterr().out)
        key_set = data["key_set"]
        assert key_set is True
        assert isinstance(key_set, bool)

    def test_json_no_key_key_set_false(self, capsys: pytest.CaptureFixture[str]) -> None:
        """An entry with only a ``type`` reports ``key_set`` as the boolean ``False``."""
        from muse.cli.commands.auth import _display_entry

        entry: IdentityEntry = {"type": "agent"}
        _display_entry(self._HOST, entry, json_output=True)
        data = json.loads(capsys.readouterr().out)
        key_set = data["key_set"]
        assert key_set is False
        assert isinstance(key_set, bool)

    def test_json_goes_to_stdout_not_stderr(self, capsys: pytest.CaptureFixture[str]) -> None:
        """In JSON mode, stdout begins with an opening brace."""
        from muse.cli.commands.auth import _display_entry

        _display_entry(self._HOST, self._make_entry(), json_output=True)
        streams = capsys.readouterr()
        assert streams.out.strip().startswith("{")

    def test_text_goes_to_stderr(self, capsys: pytest.CaptureFixture[str]) -> None:
        """In text mode, stdout stays empty and the hub name appears on stderr."""
        from muse.cli.commands.auth import _display_entry

        _display_entry(self._HOST, self._make_entry(), json_output=False)
        streams = capsys.readouterr()
        assert streams.out.strip() == ""
        assert "hub.example.com" in streams.err

    def test_ansi_in_hostname_stripped_in_text_mode(self, capsys: pytest.CaptureFixture[str]) -> None:
        """ANSI escape sequences in the hostname do not reach stderr."""
        from muse.cli.commands.auth import _display_entry

        _display_entry("\x1b[31mmalicious\x1b[0m", self._make_entry(), json_output=False)
        stderr_text = capsys.readouterr().err
        assert "\x1b[" not in stderr_text

    def test_capabilities_list_in_json(self, capsys: pytest.CaptureFixture[str]) -> None:
        """Capabilities stored on the entry are emitted verbatim in JSON mode."""
        from muse.cli.commands.auth import _display_entry

        entry: IdentityEntry = {"type": "agent", "capabilities": ["push", "pull"]}
        _display_entry(self._HOST, entry, json_output=True)
        data = json.loads(capsys.readouterr().out)
        assert data["capabilities"] == ["push", "pull"]
| 307 | |
| 308 | |
| 309 | # ── Integration: run_whoami ─────────────────────────────────────────────────── |
| 310 | |
| 311 | |
class TestWhoamiHardening:
    """CLI-level tests for ``muse auth whoami`` against the ``repo`` fixture."""

    _HUB = "http://localhost:19999"

    @staticmethod
    def _whoami(*extra: str) -> InvokeResult:
        """Invoke ``auth whoami`` with the given extra arguments."""
        return runner.invoke(cli, ["auth", "whoami", *extra])

    def _store(self, repo: pathlib.Path, *, handle: str = "alice") -> None:
        """Persist a fully populated human identity for ``_HUB``."""
        entry = {
            "type": "human",
            "handle": handle,
            "algorithm": "ed25519",
            "fingerprint": "fp123",
            "hd_path": "m/1075233755'/0'/0'/0'/0'/0'",
        }
        save_identity(self._HUB, entry)

    def test_no_identity_exits_nonzero(self, repo: pathlib.Path) -> None:
        """With nothing stored, ``whoami --hub`` exits nonzero."""
        result = self._whoami("--hub", self._HUB)
        assert result.exit_code != 0

    def test_whoami_json_schema(self, repo: pathlib.Path) -> None:
        """``--json`` output has all expected keys and the stored handle."""
        self._store(repo)
        result = self._whoami("--hub", self._HUB, "--json")
        assert result.exit_code == 0

        data = _json_whoami(result)
        expected_keys = ("hub", "type", "handle", "fingerprint", "key_set", "capabilities")
        for key in expected_keys:
            assert key in data, f"Missing key: {key}"
        assert data["key_set"] is True
        assert isinstance(data["key_set"], bool)
        assert data["handle"] == "alice"

    def test_whoami_json_stdout_clean(self, repo: pathlib.Path) -> None:
        """``--json`` output contains at least one line starting with ``{``."""
        self._store(repo)
        result = self._whoami("--hub", self._HUB, "--json")
        assert result.exit_code == 0

        json_lines = [ln for ln in result.output.splitlines() if ln.strip().startswith("{")]
        assert len(json_lines) >= 1

    def test_whoami_all_lists_multiple(self, repo: pathlib.Path) -> None:
        """``--all --json`` lists both stored identities."""
        second_hub = "http://localhost:20000"
        save_identity(self._HUB, {"type": "human", "handle": "alice"})
        save_identity(second_hub, {"type": "agent", "handle": "bot"})

        result = self._whoami("--all", "--json")
        assert result.exit_code == 0

        identities = json.loads(result.output)["identities"]
        assert isinstance(identities, list)
        assert len(identities) == 2

    def test_whoami_all_empty_exits_nonzero(self, repo: pathlib.Path) -> None:
        """``--all`` with no stored identities exits nonzero."""
        result = self._whoami("--all")
        assert result.exit_code != 0

    def test_whoami_no_hub_exits_nonzero(self, repo: pathlib.Path) -> None:
        """``whoami`` with no hub argument exits nonzero in the empty fixture repo."""
        result = self._whoami()
        assert result.exit_code != 0

    def test_whoami_all_json_is_single_array(self, repo: pathlib.Path) -> None:
        """``--all --json`` output parses as one document with a two-item list."""
        second_hub = "http://localhost:20000"
        save_identity(self._HUB, {"type": "human", "handle": "alice"})
        save_identity(second_hub, {"type": "agent", "handle": "bot"})

        result = self._whoami("--all", "--json")
        assert result.exit_code == 0

        identities = json.loads(result.output)["identities"]
        assert isinstance(identities, list)
        assert len(identities) == 2

    def test_whoami_key_set_is_bool(self, repo: pathlib.Path) -> None:
        """``key_set`` is serialised as a JSON boolean, not a quoted string."""
        self._store(repo)
        result = self._whoami("--hub", self._HUB, "--json")
        assert result.exit_code == 0

        raw = result.output
        boolean_forms = ('"key_set": true', '"key_set":true')
        assert any(form in raw for form in boolean_forms)
        assert '"key_set": "true"' not in raw

    def test_whoami_short_j_flag(self, repo: pathlib.Path) -> None:
        """``-j`` produces output that parses as JSON."""
        self._store(repo)
        result = self._whoami("--hub", self._HUB, "-j")
        assert result.exit_code == 0
        json.loads(result.output)

    def test_whoami_short_a_flag(self, repo: pathlib.Path) -> None:
        """``-a`` succeeds and mentions the stored hub on stderr."""
        self._store(repo)
        result = self._whoami("-a")
        assert result.exit_code == 0
        assert "localhost" in result.stderr
| 393 | |
| 394 | |
| 395 | # ── Integration: run_logout ─────────────────────────────────────────────────── |
| 396 | |
| 397 | |
class TestLogoutHardening:
    """CLI-level tests for ``muse auth logout`` against the ``repo`` fixture."""

    _HUB = "http://localhost:19999"

    @staticmethod
    def _logout(*extra: str) -> InvokeResult:
        """Invoke ``auth logout`` with the given extra arguments."""
        return runner.invoke(cli, ["auth", "logout", *extra])

    def _store(self, repo: pathlib.Path) -> None:
        """Persist a minimal human identity for ``_HUB``."""
        save_identity(self._HUB, {"type": "human", "handle": "alice"})

    def test_logout_success_json_schema(self, repo: pathlib.Path) -> None:
        """Logging out a stored hub reports ``ok`` with exactly one hub."""
        self._store(repo)
        result = self._logout("--hub", self._HUB, "--json")
        assert result.exit_code == 0

        data = _json_logout(result)
        assert data["status"] == "ok"
        assert data["count"] == 1
        assert isinstance(data["hubs"], list)
        assert len(data["hubs"]) == 1

    def test_logout_nothing_to_do_json(self, repo: pathlib.Path) -> None:
        """Logging out an unknown hub succeeds with ``nothing_to_do``."""
        result = self._logout("--hub", self._HUB, "--json")
        assert result.exit_code == 0

        data = _json_logout(result)
        assert data["status"] == "nothing_to_do"
        assert data["count"] == 0
        assert data["hubs"] == []

    def test_logout_all_clears_all(self, repo: pathlib.Path) -> None:
        """``--all`` reports both stored hubs as removed."""
        second_hub = "http://localhost:20000"
        save_identity(self._HUB, {"type": "human", "handle": "a"})
        save_identity(second_hub, {"type": "agent", "handle": "b"})

        result = self._logout("--all", "--json")
        assert result.exit_code == 0

        data = _json_logout(result)
        assert data["status"] == "ok"
        assert data["count"] == 2

    def test_logout_all_empty_json(self, repo: pathlib.Path) -> None:
        """``--all`` with nothing stored succeeds with ``nothing_to_do``."""
        result = self._logout("--all", "--json")
        assert result.exit_code == 0

        data = _json_logout(result)
        assert data["status"] == "nothing_to_do"
        assert data["count"] == 0

    def test_logout_removes_identity(self, repo: pathlib.Path) -> None:
        """After logout the identity can no longer be loaded."""
        from muse.core.identity import load_identity

        self._store(repo)
        self._logout("--hub", self._HUB)
        assert load_identity(self._HUB) is None

    def test_logout_json_stdout_clean(self, repo: pathlib.Path) -> None:
        """Every non-blank output line in ``--json`` mode looks like JSON."""
        self._store(repo)
        result = self._logout("--hub", self._HUB, "--json")
        assert result.exit_code == 0

        for text in (ln.strip() for ln in result.output.splitlines()):
            if not text:
                continue
            assert text.startswith(("{", '"', "}")), f"Non-JSON on stdout: {text!r}"

    def test_logout_json_hubs_list_contains_hostname(self, repo: pathlib.Path) -> None:
        """JSON output 'hubs' list contains the normalised hostname."""
        self._store(repo)
        result = self._logout("--hub", self._HUB, "--json")
        assert result.exit_code == 0

        data = _json_logout(result)
        assert "hubs" in data
        assert len(data["hubs"]) == 1
        assert "localhost" in data["hubs"][0]

    def test_logout_all_json_hubs_sorted(self, repo: pathlib.Path) -> None:
        """--all --json hubs list is alphabetically sorted."""
        unsorted_hubs = (
            "http://z-hub.example.com",
            "http://a-hub.example.com",
            "http://m-hub.example.com",
        )
        for hub in unsorted_hubs:
            save_identity(hub, {"type": "human", "handle": "u"})

        result = self._logout("--all", "--json")
        assert result.exit_code == 0

        data = _json_logout(result)
        assert data["hubs"] == sorted(data["hubs"]), "hubs not sorted"
        assert data["count"] == 3

    def test_logout_idempotent_second_call(self, repo: pathlib.Path) -> None:
        """Logging out twice succeeds both times (second call is nothing_to_do)."""
        self._store(repo)
        first = self._logout("--hub", self._HUB, "--json")
        second = self._logout("--hub", self._HUB, "--json")

        assert first.exit_code == 0
        assert second.exit_code == 0
        assert _json_logout(first)["status"] == "ok"
        assert _json_logout(second)["status"] == "nothing_to_do"

    def test_logout_preserves_sibling_hub(self, repo: pathlib.Path) -> None:
        """Logging out from one hub must not remove a different hub's identity."""
        from muse.core.identity import load_identity

        sibling = "http://sibling.example.com"
        self._store(repo)
        save_identity(sibling, {"type": "agent", "handle": "bot"})

        self._logout("--hub", self._HUB)

        assert load_identity(self._HUB) is None
        assert load_identity(sibling) is not None

    def test_logout_short_flags(self, repo: pathlib.Path) -> None:
        """-j and -a short flags work identically to --json and --all."""
        second_hub = "http://shortflag.example.com"
        self._store(repo)
        save_identity(second_hub, {"type": "agent", "handle": "bot"})

        result = self._logout("-a", "-j")
        assert result.exit_code == 0

        data = _json_logout(result)
        assert data["status"] == "ok"
        assert data["count"] == 2

    def test_logout_all_single_write(self, repo: pathlib.Path) -> None:
        """``_save_all`` is invoked exactly once for --all (not once per hub)."""
        import muse.core.identity as identity_mod

        for i in range(5):
            save_identity(f"http://hub{i}.example.com", {"type": "human", "handle": "u"})

        # Wrap the real function so the write still happens while calls are counted.
        with patch("muse.core.identity._save_all", wraps=identity_mod._save_all) as save_spy:
            result = self._logout("--all", "--json")

        assert result.exit_code == 0
        # _save_all must be called exactly once for all 5 hubs
        assert save_spy.call_count == 1

    def test_logout_ansi_in_hub_url_sanitized(self, repo: pathlib.Path) -> None:
        """ANSI codes injected into a hub URL display name are stripped in text output."""
        hostile_host = "\x1b[31mattacker.example.com\x1b[0m"
        hostile_entry: IdentityEntry = {"type": "human", "handle": "eve"}
        stored = {hostile_host: hostile_entry}

        with patch("muse.core.identity._load_all", return_value=stored):
            # Provide a matching --hub so logout finds it
            result = self._logout("--hub", hostile_host)

        # NOTE(review): other tests in this file read text output from
        # ``result.stderr``; confirm ``result.output`` also covers stderr here.
        assert "\x1b" not in result.output, "ANSI escape leaked to output"
| 549 | |
| 550 | |
| 551 | # ── Security ────────────────────────────────────────────────────────────────── |
| 552 | |
| 553 | |
class TestAuthSecurity:
    """Security checks: ``file://`` hub URLs must never trigger I/O."""

    def test_json_post_file_scheme_blocked(self) -> None:
        """``_json_post_raw`` raises SystemExit for file:// without calling ``urlopen``."""
        from muse.cli.commands.auth import _json_post_raw as _json_post

        with patch("urllib.request.urlopen") as urlopen_mock, pytest.raises(SystemExit):
            _json_post("file:///etc/passwd", "/api/test", {})
        urlopen_mock.assert_not_called()

    def test_hub_base_url_file_scheme_does_not_open_file(self) -> None:
        """``_hub_base_url`` raises SystemExit for file:// without calling ``open``."""
        from muse.cli.commands.auth import _hub_base_url

        with patch("builtins.open") as open_mock, pytest.raises(SystemExit):
            _hub_base_url("file:///etc/shadow")
        open_mock.assert_not_called()
| 568 | |
| 569 | |
| 570 | # ── E2E: keygen with unavailable crypto ────────────────────────────────────── |
| 571 | |
| 572 | |
class TestKeygenCLI:
    """CLI-level failure-path tests for ``muse auth keygen``."""

    @staticmethod
    def _keygen(*extra: str) -> InvokeResult:
        """Invoke ``auth keygen`` with the given extra arguments."""
        return runner.invoke(cli, ["auth", "keygen", *extra])

    def test_keygen_no_hub_exits_nonzero(self, repo: pathlib.Path) -> None:
        """``keygen`` without a hub exits nonzero in the empty fixture repo."""
        outcome = self._keygen()
        assert outcome.exit_code != 0

    def test_keygen_invalid_scheme_exits(self, repo: pathlib.Path) -> None:
        """``keygen`` with an ftp:// hub exits nonzero."""
        outcome = self._keygen("--hub", "ftp://attacker.example.com")
        # May fail at scheme validation or at key generation
        assert outcome.exit_code != 0
| 584 | |
| 585 | |
| 586 | # ── E2E: whoami capabilities field ─────────────────────────────────────────── |
| 587 | |
| 588 | |
class TestWhoamiCapabilities:
    """Checks the ``capabilities`` field of ``whoami --json`` output."""

    _HUB = "http://localhost:19999"

    def test_capabilities_empty_list_in_json(self, repo: pathlib.Path) -> None:
        """An identity stored without capabilities still yields a list in JSON."""
        save_identity(self._HUB, {"type": "human", "handle": "alice"})

        argv = ["auth", "whoami", "--hub", self._HUB, "--json"]
        result = runner.invoke(cli, argv)
        assert result.exit_code == 0

        capabilities = _json_whoami(result)["capabilities"]
        assert isinstance(capabilities, list)
| 598 | |
| 599 | |
| 600 | # ── Stress: concurrent logins ───────────────────────────────────────────────── |
| 601 | |
| 602 | |
class TestStressConcurrent:
    """Stress tests for identity persistence: sequential and multi-threaded saves.

    Each test redirects ``muse.core.identity`` module attributes to files
    under ``tmp_path`` and restores them in a ``finally`` block.
    """

    def test_8_sequential_saves_to_isolated_files_correct(
        self, tmp_path: pathlib.Path
    ) -> None:
        """Sequential save→load round-trips to isolated identity files produce correct data.

        This validates the save/load logic is correct before layering concurrency.
        Module-level patching is not thread-safe, so isolation is tested sequentially.
        """
        import muse.core.identity as _id_mod
        from muse.core.identity import load_identity

        orig_file = _id_mod._IDENTITY_FILE

        for idx in range(8):
            identity_file = tmp_path / f"identity_{idx}.toml"
            identity_file.write_text("")
            hub = f"http://localhost:{19000 + idx}"

            _id_mod._IDENTITY_FILE = identity_file
            try:
                entry: IdentityEntry = {"type": "agent", "handle": f"agent-{idx}"}
                save_identity(hub, entry)
                stored = load_identity(hub)
                assert stored is not None, f"Identity not stored for {hub}"
                assert stored["handle"] == f"agent-{idx}"
            finally:
                # Restore after every iteration so a failure never leaks the patch.
                _id_mod._IDENTITY_FILE = orig_file

    def test_8_concurrent_logins_shared_identity_file(
        self, tmp_path: pathlib.Path
    ) -> None:
        """8 threads writing different hubs to the same identity file.

        The advisory lock in ``_identity_write_lock()`` must prevent
        read-modify-write races. All 8 entries must survive after all threads
        complete.

        We patch _IDENTITY_FILE and _IDENTITY_DIR *once* before threads start
        to avoid the non-thread-safe module-attribute race that would arise if
        each thread patched them independently.
        """
        import muse.core.identity as _id_mod
        from muse.core.identity import list_all_identities

        identity_dir = tmp_path / "muse-home"
        identity_dir.mkdir()
        identity_file = identity_dir / "identity.toml"
        identity_file.write_text("")

        errors: list[str] = []

        def _do(idx: int) -> None:
            # Collect failures instead of raising: an exception in a worker
            # thread would otherwise not fail the test.
            try:
                hub = f"http://localhost:{19000 + idx}"
                entry: IdentityEntry = {"type": "human", "handle": f"user-{idx:02d}"}
                save_identity(hub, entry)
            except Exception as exc:
                errors.append(f"Thread {idx}: {exc}")

        orig_file = _id_mod._IDENTITY_FILE
        orig_dir = _id_mod._IDENTITY_DIR
        _id_mod._IDENTITY_FILE = identity_file
        _id_mod._IDENTITY_DIR = identity_dir
        # The try/finally spans the whole patched region (thread start, join and
        # the final read) so the module attributes are restored on any failure.
        try:
            threads = [threading.Thread(target=_do, args=(i,)) for i in range(8)]
            for t in threads:
                t.start()
            for t in threads:
                t.join()

            all_ids = list_all_identities()
        finally:
            _id_mod._IDENTITY_FILE = orig_file
            _id_mod._IDENTITY_DIR = orig_dir

        # Join outside the f-string: a backslash inside a replacement field is
        # a SyntaxError on Python versions before 3.12.
        failure_report = "\n".join(errors)
        assert errors == [], f"Concurrent shared identity file failures:\n{failure_report}"
        assert len(all_ids) == 8, f"Expected 8 identities, got {len(all_ids)}: {list(all_ids)}"

    def test_16_sequential_logins_all_survive(
        self, tmp_path: pathlib.Path
    ) -> None:
        """16 sequential logins to distinct hubs all persist correctly.

        Validates that repeated save→load round-trips don't corrupt the TOML
        file and that all entries coexist after the final write.
        """
        import muse.core.identity as _id_mod
        from muse.core.identity import list_all_identities, load_identity

        identity_dir = tmp_path / "muse-stress"
        identity_dir.mkdir()
        identity_file = identity_dir / "identity.toml"
        identity_file.write_text("")

        orig_file = _id_mod._IDENTITY_FILE
        orig_dir = _id_mod._IDENTITY_DIR
        _id_mod._IDENTITY_FILE = identity_file
        _id_mod._IDENTITY_DIR = identity_dir

        try:
            for idx in range(16):
                hub = f"http://localhost:{21000 + idx}"
                entry: IdentityEntry = {
                    "type": "agent",
                    "handle": f"agent-{idx:02d}",
                }
                save_identity(hub, entry)

            all_ids = list_all_identities()
            assert len(all_ids) == 16, f"Expected 16, got {len(all_ids)}"

            # Spot-check the first, a middle and the last entry.
            for idx in (0, 7, 15):
                hub = f"http://localhost:{21000 + idx}"
                stored = load_identity(hub)
                assert stored is not None
                assert stored["handle"] == f"agent-{idx:02d}"
        finally:
            _id_mod._IDENTITY_FILE = orig_file
            _id_mod._IDENTITY_DIR = orig_dir
| 725 | |
| 726 | |
| 727 | # --------------------------------------------------------------------------- |
| 728 | # Flag registration tests |
| 729 | # --------------------------------------------------------------------------- |
| 730 | |
| 731 | import argparse as _argparse |
| 732 | from muse.cli.commands.auth import register as _register_auth |
| 733 | |
| 734 | |
def _parse_auth(*args: str) -> _argparse.Namespace:
    """Parse ``auth <args...>`` using a throwaway parser populated by register().

    A new root parser is created on every call, its subparsers action
    (stored under ``cmd``) is handed to ``_register_auth``, and the given
    arguments are parsed with ``"auth"`` prepended.
    """
    parser = _argparse.ArgumentParser()
    _register_auth(parser.add_subparsers(dest="cmd"))
    argv = ["auth"]
    argv.extend(args)
    return parser.parse_args(argv)
| 741 | |
| 742 | |
class TestRegisterFlags:
    """Check the flags that register() attaches to the ``auth`` subcommands.

    Every test parses one command line through ``_parse_auth`` and inspects
    a single attribute of the resulting namespace: ``json_out`` for the
    ``--json`` / ``-j`` switches (and its default), ``force`` for
    ``keygen --force`` and ``all_hubs`` for ``whoami --all``.
    """

    # Argument fragments shared by the tests that pass a hub URL.
    _HUB = ("--hub", "https://localhost:1337")
    _REGISTER = ("register", *_HUB, "--handle", "gabriel")

    # ── keygen ──────────────────────────────────────────────────────────────
    def test_keygen_default_json_out_is_false(self) -> None:
        parsed = _parse_auth("keygen", *self._HUB)
        assert parsed.json_out is False

    def test_keygen_json_flag_sets_json_out(self) -> None:
        parsed = _parse_auth("keygen", *self._HUB, "--json")
        assert parsed.json_out is True

    def test_keygen_j_shorthand_sets_json_out(self) -> None:
        parsed = _parse_auth("keygen", *self._HUB, "-j")
        assert parsed.json_out is True

    def test_keygen_force_flag(self) -> None:
        parsed = _parse_auth("keygen", *self._HUB, "--force")
        assert parsed.force is True

    # ── recover ─────────────────────────────────────────────────────────────
    def test_recover_default_json_out_is_false(self) -> None:
        parsed = _parse_auth("recover", *self._HUB)
        assert parsed.json_out is False

    def test_recover_json_flag_sets_json_out(self) -> None:
        parsed = _parse_auth("recover", *self._HUB, "--json")
        assert parsed.json_out is True

    def test_recover_j_shorthand_sets_json_out(self) -> None:
        parsed = _parse_auth("recover", *self._HUB, "-j")
        assert parsed.json_out is True

    # ── rotate ──────────────────────────────────────────────────────────────
    def test_rotate_default_json_out_is_false(self) -> None:
        parsed = _parse_auth("rotate")
        assert parsed.json_out is False

    def test_rotate_json_flag_sets_json_out(self) -> None:
        parsed = _parse_auth("rotate", "--json")
        assert parsed.json_out is True

    def test_rotate_j_shorthand_sets_json_out(self) -> None:
        parsed = _parse_auth("rotate", "-j")
        assert parsed.json_out is True

    # ── register ────────────────────────────────────────────────────────────
    def test_register_default_json_out_is_false(self) -> None:
        parsed = _parse_auth(*self._REGISTER)
        assert parsed.json_out is False

    def test_register_json_flag_sets_json_out(self) -> None:
        parsed = _parse_auth(*self._REGISTER, "--json")
        assert parsed.json_out is True

    def test_register_j_shorthand_sets_json_out(self) -> None:
        parsed = _parse_auth(*self._REGISTER, "-j")
        assert parsed.json_out is True

    # ── whoami ──────────────────────────────────────────────────────────────
    def test_whoami_default_json_out_is_false(self) -> None:
        parsed = _parse_auth("whoami")
        assert parsed.json_out is False

    def test_whoami_json_flag_sets_json_out(self) -> None:
        parsed = _parse_auth("whoami", "--json")
        assert parsed.json_out is True

    def test_whoami_j_shorthand_sets_json_out(self) -> None:
        parsed = _parse_auth("whoami", "-j")
        assert parsed.json_out is True

    def test_whoami_all_flag(self) -> None:
        parsed = _parse_auth("whoami", "--all")
        assert parsed.all_hubs is True

    # ── logout ──────────────────────────────────────────────────────────────
    def test_logout_default_json_out_is_false(self) -> None:
        parsed = _parse_auth("logout")
        assert parsed.json_out is False

    def test_logout_json_flag_sets_json_out(self) -> None:
        parsed = _parse_auth("logout", "--json")
        assert parsed.json_out is True

    def test_logout_j_shorthand_sets_json_out(self) -> None:
        parsed = _parse_auth("logout", "-j")
        assert parsed.json_out is True

    # ── show ────────────────────────────────────────────────────────────────
    def test_show_default_json_out_is_false(self) -> None:
        parsed = _parse_auth("show")
        assert parsed.json_out is False

    def test_show_json_flag_sets_json_out(self) -> None:
        parsed = _parse_auth("show", "--json")
        assert parsed.json_out is True

    def test_show_j_shorthand_sets_json_out(self) -> None:
        parsed = _parse_auth("show", "-j")
        assert parsed.json_out is True
File History
4 commits
sha256:81ae324db5ad375fbfe4834c6fcb378312cafad3cc92dec5d3e5c427306621a2
fix: remove commit_exists filter from have anchors — server…
Sonnet 4.6
patch
20 days ago
sha256:36c3cb3e76619d4c30a6d9bf81b5ec4ff148e30dcfed913e3114ca7b43b81c7e
fix: rename objects→blobs in push client and all stale test…
Sonnet 4.6
patch
22 days ago
sha256:c06a9b9b9fee26c68ea725b44d54b2c0a171301ce9de746d5b656617b4463a9a
fix: repair four test failures from post-migration audit
Sonnet 4.6
patch
28 days ago
sha256:1900655993c83c4107067375548a7be823e471d2515830842f1a12cba4bd3cdf
fix: unified object store migration — idempotent writes, JS…
Sonnet 4.6
minor
⚠
28 days ago