gabriel / muse public
test_cmd_auth_hardening.py python
841 lines 35.7 KB
Raw
sha256:81ae324db5ad375fbfe4834c6fcb378312cafad3cc92dec5d3e5c427306621a2 fix: remove commit_exists filter from have anchors — server… Sonnet 4.6 patch 20 days ago
1 """Comprehensive hardening tests for ``muse auth``.
2
3 Coverage
4 --------
5 Unit
6 - _hub_base_url: scheme validation, port handling, path stripping
7 - _json_post: scheme guard fires before network, HTTP error, URLError,
8 oversized response, non-dict response
9 - _sanitize_token_or_exit: valid token, empty, control char, too long
10 - _display_entry: JSON schema correctness, all fields present, stderr routing
11 - _resolve_hub: explicit → config → None
12
13 Integration (real fixture — identity.toml in tmp_path)
14 - run_whoami: no identity exits nonzero, --json schema correct, --all lists all
15 - run_logout: missing identity is not an error, --json schema, --all clears all
16
17 Security
18 - file:// hub URL rejected in _hub_base_url and _json_post
19 - ftp:// hub URL rejected
20 - ANSI in hub URL sanitized in error messages
21 - Token never echoed to stdout
22
23 E2E (via CliRunner + fixture)
24 - keygen: --json schema when crypto unavailable (ImportError handled gracefully)
25 - login: --json schema with all required keys
26 - whoami: --json schema with all required keys, key_set=true
27 - logout: --json schema ok, --json schema nothing_to_do
28 - --all flag for whoami and logout
29
30 Stress
31 - 8 concurrent logins to a shared identity file do not race
32 """
33
34 from __future__ import annotations
35
36 import json
37 import pathlib
38 import ssl
39 import threading
40 import urllib.request
41 from typing import TYPE_CHECKING
42 from unittest.mock import MagicMock, patch
43
44 import pytest
45
46 from tests.cli_test_helper import CliRunner, InvokeResult
47 from muse.core.paths import muse_dir
48
49 if TYPE_CHECKING:
50 pass
51
52 from muse.cli.commands.auth import (
53 _KeygenJson,
54 _LogoutJson,
55 _WhoamiJson,
56 )
57 from muse.core.identity import IdentityEntry, save_identity
58
# Passed as the first argument to every ``runner.invoke(cli, [...])`` call below.
# NOTE(review): presumably the project CliRunner resolves the CLI entry point
# itself and ignores this value — confirm in tests/cli_test_helper.
59 cli = None
# Single runner instance shared by all CLI-level tests in this module.
60 runner = CliRunner()
61
62 # ── fixture ───────────────────────────────────────────────────────────────────
63
64
@pytest.fixture
def identity_dir(tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> pathlib.Path:
    """Provide an isolated muse home holding an empty ``identity.toml``.

    Both the ``MUSE_HOME`` environment variable and the module attribute
    ``muse.core.identity._IDENTITY_FILE`` are redirected to the temporary
    location. No repository is created.

    Returns:
        The path of the temporary ``.muse`` directory.
    """
    home = tmp_path / ".muse"
    home.mkdir()

    identity_file = home / "identity.toml"
    identity_file.write_text("")

    monkeypatch.setenv("MUSE_HOME", str(home))
    monkeypatch.setattr("muse.core.identity._IDENTITY_FILE", identity_file)
    return home
77
78
@pytest.fixture
def repo(tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> pathlib.Path:
    """Build a minimal repository plus a separate identity home for CLI tests.

    The repository directory returned by ``muse_dir(tmp_path)`` is seeded with
    the sub-directories and files written below; a sibling ``.muse-home``
    directory receives an empty ``identity.toml``. ``MUSE_HOME``,
    ``MUSE_REPO_ROOT``, the identity module's ``_IDENTITY_FILE`` and the
    working directory are all redirected to the temporary tree.

    Returns:
        ``tmp_path``, which serves as the repository root.
    """
    from muse._version import __version__

    store = muse_dir(tmp_path)
    for relative in ("refs/heads", "objects", "commits", "snapshots"):
        (store / relative).mkdir(parents=True, exist_ok=True)

    metadata = {"repo_id": "test-repo", "schema_version": __version__, "domain": "code"}
    # Written in the same order as the individual statements they replace.
    seed_files = {
        store / "repo.json": json.dumps(metadata),
        store / "HEAD": "ref: refs/heads/main\n",
        store / "refs" / "heads" / "main": "",
        store / "config.toml": "",
    }
    for target, text in seed_files.items():
        target.write_text(text)

    home = tmp_path / ".muse-home"
    home.mkdir()
    identity_file = home / "identity.toml"
    identity_file.write_text("")

    monkeypatch.setenv("MUSE_HOME", str(home))
    monkeypatch.setattr("muse.core.identity._IDENTITY_FILE", identity_file)
    monkeypatch.setenv("MUSE_REPO_ROOT", str(tmp_path))
    monkeypatch.chdir(tmp_path)
    return tmp_path
104
105
def _json_whoami(result: InvokeResult) -> _WhoamiJson:
    """Return the first JSON object found in ``result.output``.

    The output is scanned line by line for the first line that starts with
    ``{`` (after stripping whitespace). That line is parsed on its own; if it
    is not a complete document — as happens with pretty-printed, multi-line
    JSON whose first line is a bare ``{`` — the object is decoded from that
    line onward instead, ignoring anything that follows the object.

    Args:
        result: Invocation result; only its ``output`` text is read.

    Returns:
        The decoded JSON object.

    Raises:
        ValueError: If no line starts with ``{``. Malformed JSON surfaces as
            ``json.JSONDecodeError``, which is itself a ``ValueError``.
    """
    lines = result.output.splitlines()
    for index, line in enumerate(lines):
        candidate = line.strip()
        if not candidate.startswith("{"):
            continue
        try:
            data: _WhoamiJson = json.loads(candidate)
        except json.JSONDecodeError:
            # Multi-line object: decode from this line to the end of the
            # output; raw_decode stops at the end of the first JSON value.
            remainder = "\n".join(lines[index:]).lstrip()
            data, _end = json.JSONDecoder().raw_decode(remainder)
        return data
    raise ValueError(f"No JSON line in output:\n{result.output!r}")
113
114
def _json_logout(result: InvokeResult) -> _LogoutJson:
    """Return the first JSON object found in ``result.output``.

    The output is scanned line by line for the first line that starts with
    ``{`` (after stripping whitespace). That line is parsed on its own; if it
    is not a complete document — as happens with pretty-printed, multi-line
    JSON whose first line is a bare ``{`` — the object is decoded from that
    line onward instead, ignoring anything that follows the object.

    Args:
        result: Invocation result; only its ``output`` text is read.

    Returns:
        The decoded JSON object.

    Raises:
        ValueError: If no line starts with ``{``. Malformed JSON surfaces as
            ``json.JSONDecodeError``, which is itself a ``ValueError``.
    """
    lines = result.output.splitlines()
    for index, line in enumerate(lines):
        candidate = line.strip()
        if not candidate.startswith("{"):
            continue
        try:
            data: _LogoutJson = json.loads(candidate)
        except json.JSONDecodeError:
            # Multi-line object: decode from this line to the end of the
            # output; raw_decode stops at the end of the first JSON value.
            remainder = "\n".join(lines[index:]).lstrip()
            data, _end = json.JSONDecoder().raw_decode(remainder)
        return data
    raise ValueError(f"No JSON line in output:\n{result.output!r}")
122
123
def _json_keygen(result: InvokeResult) -> _KeygenJson:
    """Return the first JSON object found in ``result.output``.

    The output is scanned line by line for the first line that starts with
    ``{`` (after stripping whitespace). That line is parsed on its own; if it
    is not a complete document — as happens with pretty-printed, multi-line
    JSON whose first line is a bare ``{`` — the object is decoded from that
    line onward instead, ignoring anything that follows the object.

    Args:
        result: Invocation result; only its ``output`` text is read.

    Returns:
        The decoded JSON object.

    Raises:
        ValueError: If no line starts with ``{``. Malformed JSON surfaces as
            ``json.JSONDecodeError``, which is itself a ``ValueError``.
    """
    lines = result.output.splitlines()
    for index, line in enumerate(lines):
        candidate = line.strip()
        if not candidate.startswith("{"):
            continue
        try:
            data: _KeygenJson = json.loads(candidate)
        except json.JSONDecodeError:
            # Multi-line object: decode from this line to the end of the
            # output; raw_decode stops at the end of the first JSON value.
            remainder = "\n".join(lines[index:]).lstrip()
            data, _end = json.JSONDecoder().raw_decode(remainder)
        return data
    raise ValueError(f"No JSON line in output:\n{result.output!r}")
131
132
133 # ── Unit: _hub_base_url ───────────────────────────────────────────────────────
134
135
class TestHubBaseUrl:
    """Unit tests for ``_hub_base_url``: accepted URLs and rejected schemes."""

    def test_https_path_stripped(self) -> None:
        """The owner/repo path is dropped from an https hub URL."""
        from muse.cli.commands.auth import _hub_base_url

        base = _hub_base_url("https://musehub.ai/gabriel/muse")
        assert base == "https://musehub.ai"

    def test_http_localhost_port_preserved(self) -> None:
        """A localhost URL keeps its port (the URL exercised here is https)."""
        from muse.cli.commands.auth import _hub_base_url

        base = _hub_base_url("https://localhost:1337")
        assert base == "https://localhost:1337"

    def test_explicit_port_preserved(self) -> None:
        """An explicit port survives while the path is removed."""
        from muse.cli.commands.auth import _hub_base_url

        base = _hub_base_url("https://hub.example.com:8443/owner/repo")
        assert base == "https://hub.example.com:8443"

    def test_file_scheme_exits_nonzero(self) -> None:
        """A ``file://`` URL exits with ``ExitCode.USER_ERROR``."""
        from muse.cli.commands.auth import _hub_base_url
        from muse.core.errors import ExitCode

        with pytest.raises(SystemExit) as exc_info:
            _hub_base_url("file:///etc/passwd")
        exit_code = exc_info.value.code
        assert exit_code == ExitCode.USER_ERROR

    def test_ftp_scheme_exits_nonzero(self) -> None:
        """An ``ftp://`` URL causes a ``SystemExit``."""
        from muse.cli.commands.auth import _hub_base_url

        url = "ftp://ftp.example.com/repo"
        with pytest.raises(SystemExit):
            _hub_base_url(url)

    def test_data_scheme_exits_nonzero(self) -> None:
        """A ``data:`` URL causes a ``SystemExit``."""
        from muse.cli.commands.auth import _hub_base_url

        url = "data:text/plain,malicious"
        with pytest.raises(SystemExit):
            _hub_base_url(url)
165
166
167 # ── Unit: _json_post ──────────────────────────────────────────────────────────
168
169
class TestJsonPost:
    """Unit tests for ``_json_post_raw`` with ``urllib.request.urlopen`` patched."""

    @staticmethod
    def _canned_response(body: bytes) -> MagicMock:
        """Build a context-manager mock whose ``read()`` returns *body*."""
        response = MagicMock()
        response.__enter__.return_value = response
        response.__exit__.return_value = False
        response.read.return_value = body
        return response

    def test_file_scheme_rejected_before_network(self) -> None:
        """_json_post must not make a network request for file:// URLs."""
        from muse.cli.commands.auth import _json_post_raw as _json_post

        with patch("urllib.request.urlopen") as urlopen_mock, pytest.raises(SystemExit):
            _json_post("file:///etc/passwd", "/api/test", {})
        urlopen_mock.assert_not_called()

    def test_ftp_scheme_rejected_before_network(self) -> None:
        """An ftp:// URL exits without ``urlopen`` ever being called."""
        from muse.cli.commands.auth import _json_post_raw as _json_post

        with patch("urllib.request.urlopen") as urlopen_mock, pytest.raises(SystemExit):
            _json_post("ftp://example.com", "/api/test", {})
        urlopen_mock.assert_not_called()

    def test_http_error_exits_user_error(self) -> None:
        """An ``HTTPError`` raised by ``urlopen`` results in ``SystemExit``."""
        import urllib.error
        from muse.cli.commands.auth import _json_post_raw as _json_post

        # fp is None to avoid type conflicts; HTTPError accepts None here.
        http_401 = urllib.error.HTTPError(
            url="", code=401, msg="Unauthorized", hdrs=MagicMock(), fp=None
        )
        with patch("urllib.request.urlopen", side_effect=http_401), pytest.raises(SystemExit):
            _json_post("http://hub.local", "/api/test", {})

    def test_url_error_exits_user_error(self) -> None:
        """A ``URLError`` raised by ``urlopen`` results in ``SystemExit``."""
        import urllib.error
        from muse.cli.commands.auth import _json_post_raw as _json_post

        refused = urllib.error.URLError(reason="connection refused")
        with patch("urllib.request.urlopen", side_effect=refused), pytest.raises(SystemExit):
            _json_post("http://hub.local", "/api/test", {})

    def test_oversized_response_exits(self) -> None:
        """A body longer than ``_MAX_RESPONSE_BYTES`` results in ``SystemExit``."""
        from muse.cli.commands.auth import _MAX_RESPONSE_BYTES, _json_post_raw as _json_post

        too_big = self._canned_response(b"x" * (_MAX_RESPONSE_BYTES + 2))
        with patch("urllib.request.urlopen", return_value=too_big), pytest.raises(SystemExit):
            _json_post("http://hub.local", "/api/test", {})

    def test_non_dict_response_exits(self) -> None:
        """A JSON array body results in ``SystemExit``."""
        from muse.cli.commands.auth import _json_post_raw as _json_post

        array_body = self._canned_response(b'["not", "a", "dict"]')
        with patch("urllib.request.urlopen", return_value=array_body), pytest.raises(SystemExit):
            _json_post("http://hub.local", "/api/test", {})

    def test_none_values_stripped_from_payload(self) -> None:
        """None-valued keys must not appear in the serialised request body."""
        from muse.cli.commands.auth import _json_post_raw as _json_post

        bodies: list[bytes] = []

        def _capture(req: urllib.request.Request, timeout: float, context: ssl.SSLContext | None = None) -> MagicMock:
            # Record the request body (empty bytes when it is not bytes),
            # then answer with a minimal JSON object.
            bodies.append(req.data if isinstance(req.data, bytes) else b"")
            return self._canned_response(b'{"ok": true}')

        payload = {"a": "1", "b": None, "c": "3"}
        with patch("urllib.request.urlopen", side_effect=_capture):
            _json_post("http://hub.local", "/test", payload)

        sent = json.loads(bodies[0])
        assert "b" not in sent
        assert sent["a"] == "1"
        assert sent["c"] == "3"
246
247
248 # ── Unit: _display_entry ──────────────────────────────────────────────────────
249
250
class TestDisplayEntry:
    """Unit tests for ``_display_entry`` output in JSON and text modes."""

    _HOST = "hub.example.com"

    def _make_entry(self) -> IdentityEntry:
        """Return a fully populated human identity entry."""
        entry: IdentityEntry = {
            "type": "human",
            "handle": "alice",
            "algorithm": "ed25519",
            "fingerprint": "abc123fingerprint456",
            "hd_path": "m/1075233755'/0'/0'/0'/0'/0'",
        }
        return entry

    def test_json_output_all_keys_present(self, capsys: pytest.CaptureFixture[str]) -> None:
        """Every expected key appears in the JSON written to stdout."""
        from muse.cli.commands.auth import _display_entry

        _display_entry(self._HOST, self._make_entry(), json_output=True)
        payload = json.loads(capsys.readouterr().out)
        expected = ("hub", "type", "handle", "fingerprint", "key_set", "capabilities")
        for key in expected:
            assert key in payload, f"Missing key: {key}"

    def test_json_key_set_true(self, capsys: pytest.CaptureFixture[str]) -> None:
        """A fully populated entry reports ``key_set`` as the boolean True."""
        from muse.cli.commands.auth import _display_entry

        _display_entry(self._HOST, self._make_entry(), json_output=True)
        key_set = json.loads(capsys.readouterr().out)["key_set"]
        assert key_set is True
        assert isinstance(key_set, bool)

    def test_json_no_key_key_set_false(self, capsys: pytest.CaptureFixture[str]) -> None:
        """An entry holding only ``type`` reports ``key_set`` as the boolean False."""
        from muse.cli.commands.auth import _display_entry

        bare: IdentityEntry = {"type": "agent"}
        _display_entry(self._HOST, bare, json_output=True)
        key_set = json.loads(capsys.readouterr().out)["key_set"]
        assert key_set is False
        assert isinstance(key_set, bool)

    def test_json_goes_to_stdout_not_stderr(self, capsys: pytest.CaptureFixture[str]) -> None:
        """In JSON mode stdout begins with an opening brace."""
        from muse.cli.commands.auth import _display_entry

        _display_entry(self._HOST, self._make_entry(), json_output=True)
        streams = capsys.readouterr()
        assert streams.out.strip().startswith("{")

    def test_text_goes_to_stderr(self, capsys: pytest.CaptureFixture[str]) -> None:
        """In text mode stdout stays empty and the hostname appears on stderr."""
        from muse.cli.commands.auth import _display_entry

        _display_entry(self._HOST, self._make_entry(), json_output=False)
        streams = capsys.readouterr()
        assert streams.out.strip() == ""
        assert "hub.example.com" in streams.err

    def test_ansi_in_hostname_stripped_in_text_mode(self, capsys: pytest.CaptureFixture[str]) -> None:
        """Escape sequences in the hostname do not reach stderr in text mode."""
        from muse.cli.commands.auth import _display_entry

        hostile_host = "\x1b[31mmalicious\x1b[0m"
        _display_entry(hostile_host, self._make_entry(), json_output=False)
        err_text = capsys.readouterr().err
        assert "\x1b[" not in err_text

    def test_capabilities_list_in_json(self, capsys: pytest.CaptureFixture[str]) -> None:
        """Capabilities stored on the entry are emitted unchanged in JSON."""
        from muse.cli.commands.auth import _display_entry

        capable: IdentityEntry = {"type": "agent", "capabilities": ["push", "pull"]}
        _display_entry(self._HOST, capable, json_output=True)
        payload = json.loads(capsys.readouterr().out)
        assert payload["capabilities"] == ["push", "pull"]
307
308
309 # ── Integration: run_whoami ───────────────────────────────────────────────────
310
311
class TestWhoamiHardening:
    """CLI-level tests for ``muse auth whoami`` against the ``repo`` fixture."""

    _HUB = "http://localhost:19999"

    def _store(self, repo: pathlib.Path, *, handle: str = "alice") -> None:
        """Save a human identity with key material for ``_HUB``."""
        entry = {
            "type": "human",
            "handle": handle,
            "algorithm": "ed25519",
            "fingerprint": "fp123",
            "hd_path": "m/1075233755'/0'/0'/0'/0'/0'",
        }
        save_identity(self._HUB, entry)

    def _whoami(self, *extra: str) -> InvokeResult:
        """Invoke ``auth whoami`` with *extra* arguments appended."""
        return runner.invoke(cli, ["auth", "whoami", *extra])

    def test_no_identity_exits_nonzero(self, repo: pathlib.Path) -> None:
        """whoami for a hub without a stored identity fails."""
        outcome = self._whoami("--hub", self._HUB)
        assert outcome.exit_code != 0

    def test_whoami_json_schema(self, repo: pathlib.Path) -> None:
        """--json output carries every expected key with the stored values."""
        self._store(repo)
        outcome = self._whoami("--hub", self._HUB, "--json")
        assert outcome.exit_code == 0
        payload = _json_whoami(outcome)
        for key in ("hub", "type", "handle", "fingerprint", "key_set", "capabilities"):
            assert key in payload, f"Missing key: {key}"
        assert payload["key_set"] is True
        assert isinstance(payload["key_set"], bool)
        assert payload["handle"] == "alice"

    def test_whoami_json_stdout_clean(self, repo: pathlib.Path) -> None:
        """--json output contains at least one line opening a JSON object."""
        self._store(repo)
        outcome = self._whoami("--hub", self._HUB, "--json")
        assert outcome.exit_code == 0
        object_lines = [
            text for text in outcome.output.splitlines() if text.strip().startswith("{")
        ]
        assert len(object_lines) >= 1

    def test_whoami_all_lists_multiple(self, repo: pathlib.Path) -> None:
        """--all --json lists both stored identities."""
        second_hub = "http://localhost:20000"
        save_identity(self._HUB, {"type": "human", "handle": "alice"})
        save_identity(second_hub, {"type": "agent", "handle": "bot"})
        outcome = self._whoami("--all", "--json")
        assert outcome.exit_code == 0
        identities = json.loads(outcome.output)["identities"]
        assert isinstance(identities, list)
        assert len(identities) == 2

    def test_whoami_all_empty_exits_nonzero(self, repo: pathlib.Path) -> None:
        """--all with nothing stored fails."""
        outcome = self._whoami("--all")
        assert outcome.exit_code != 0

    def test_whoami_no_hub_exits_nonzero(self, repo: pathlib.Path) -> None:
        """whoami without --hub fails in a repo whose config is empty."""
        outcome = self._whoami()
        assert outcome.exit_code != 0

    def test_whoami_all_json_is_single_array(self, repo: pathlib.Path) -> None:
        """--all --json stdout is one JSON document with an ``identities`` list."""
        second_hub = "http://localhost:20000"
        save_identity(self._HUB, {"type": "human", "handle": "alice"})
        save_identity(second_hub, {"type": "agent", "handle": "bot"})
        outcome = self._whoami("--all", "--json")
        assert outcome.exit_code == 0
        identities = json.loads(outcome.output)["identities"]
        assert isinstance(identities, list)
        assert len(identities) == 2

    def test_whoami_key_set_is_bool(self, repo: pathlib.Path) -> None:
        """``key_set`` is serialised as a JSON boolean, not a string."""
        self._store(repo)
        outcome = self._whoami("--hub", self._HUB, "--json")
        assert outcome.exit_code == 0
        text = outcome.output
        assert '"key_set": true' in text or '"key_set":true' in text
        assert '"key_set": "true"' not in text

    def test_whoami_short_j_flag(self, repo: pathlib.Path) -> None:
        """-j yields output that parses as JSON."""
        self._store(repo)
        outcome = self._whoami("--hub", self._HUB, "-j")
        assert outcome.exit_code == 0
        json.loads(outcome.output)

    def test_whoami_short_a_flag(self, repo: pathlib.Path) -> None:
        """-a succeeds and the stored hub's host appears on stderr."""
        self._store(repo)
        outcome = self._whoami("-a")
        assert outcome.exit_code == 0
        assert "localhost" in outcome.stderr
393
394
395 # ── Integration: run_logout ───────────────────────────────────────────────────
396
397
class TestLogoutHardening:
    """CLI-level tests for ``muse auth logout`` against the ``repo`` fixture."""

    _HUB = "http://localhost:19999"

    def _store(self, repo: pathlib.Path) -> None:
        """Save a minimal human identity for ``_HUB``."""
        save_identity(self._HUB, {"type": "human", "handle": "alice"})

    def test_logout_success_json_schema(self, repo: pathlib.Path) -> None:
        """A successful logout reports status ok, count 1 and one hub."""
        self._store(repo)
        result = runner.invoke(
            cli, ["auth", "logout", "--hub", self._HUB, "--json"]
        )
        assert result.exit_code == 0
        data = _json_logout(result)
        assert data["status"] == "ok"
        assert data["count"] == 1
        assert isinstance(data["hubs"], list)
        assert len(data["hubs"]) == 1

    def test_logout_nothing_to_do_json(self, repo: pathlib.Path) -> None:
        """Logging out with nothing stored succeeds with nothing_to_do."""
        result = runner.invoke(
            cli, ["auth", "logout", "--hub", self._HUB, "--json"]
        )
        assert result.exit_code == 0
        data = _json_logout(result)
        assert data["status"] == "nothing_to_do"
        assert data["count"] == 0
        assert data["hubs"] == []

    def test_logout_all_clears_all(self, repo: pathlib.Path) -> None:
        """--all reports both stored identities as removed."""
        hub2 = "http://localhost:20000"
        save_identity(self._HUB, {"type": "human", "handle": "a"})
        save_identity(hub2, {"type": "agent", "handle": "b"})
        result = runner.invoke(cli, ["auth", "logout", "--all", "--json"])
        assert result.exit_code == 0
        data = _json_logout(result)
        assert data["status"] == "ok"
        assert data["count"] == 2

    def test_logout_all_empty_json(self, repo: pathlib.Path) -> None:
        """--all with nothing stored succeeds with nothing_to_do."""
        result = runner.invoke(cli, ["auth", "logout", "--all", "--json"])
        assert result.exit_code == 0
        data = _json_logout(result)
        assert data["status"] == "nothing_to_do"
        assert data["count"] == 0

    def test_logout_removes_identity(self, repo: pathlib.Path) -> None:
        """After logout the identity can no longer be loaded."""
        from muse.core.identity import load_identity

        self._store(repo)
        runner.invoke(cli, ["auth", "logout", "--hub", self._HUB])
        assert load_identity(self._HUB) is None

    def test_logout_json_stdout_clean(self, repo: pathlib.Path) -> None:
        """Every non-blank stdout line looks like part of a JSON object."""
        self._store(repo)
        result = runner.invoke(cli, ["auth", "logout", "--hub", self._HUB, "--json"])
        assert result.exit_code == 0
        for line in result.output.splitlines():
            stripped = line.strip()
            if stripped:
                assert stripped.startswith(("{", '"', "}")), (
                    f"Non-JSON on stdout: {stripped!r}"
                )

    def test_logout_json_hubs_list_contains_hostname(self, repo: pathlib.Path) -> None:
        """JSON output 'hubs' list contains the normalised hostname."""
        self._store(repo)
        result = runner.invoke(
            cli, ["auth", "logout", "--hub", self._HUB, "--json"]
        )
        assert result.exit_code == 0
        data = _json_logout(result)
        assert "hubs" in data
        assert len(data["hubs"]) == 1
        assert "localhost" in data["hubs"][0]

    def test_logout_all_json_hubs_sorted(self, repo: pathlib.Path) -> None:
        """--all --json hubs list is alphabetically sorted."""
        hubs = [
            "http://z-hub.example.com",
            "http://a-hub.example.com",
            "http://m-hub.example.com",
        ]
        for h in hubs:
            save_identity(h, {"type": "human", "handle": "u"})
        result = runner.invoke(cli, ["auth", "logout", "--all", "--json"])
        assert result.exit_code == 0
        data = _json_logout(result)
        assert data["hubs"] == sorted(data["hubs"]), "hubs not sorted"
        assert data["count"] == 3

    def test_logout_idempotent_second_call(self, repo: pathlib.Path) -> None:
        """Logging out twice succeeds both times (second call is nothing_to_do)."""
        self._store(repo)
        r1 = runner.invoke(cli, ["auth", "logout", "--hub", self._HUB, "--json"])
        r2 = runner.invoke(cli, ["auth", "logout", "--hub", self._HUB, "--json"])
        assert r1.exit_code == 0
        assert r2.exit_code == 0
        d1 = _json_logout(r1)
        d2 = _json_logout(r2)
        assert d1["status"] == "ok"
        assert d2["status"] == "nothing_to_do"

    def test_logout_preserves_sibling_hub(self, repo: pathlib.Path) -> None:
        """Logging out from one hub must not remove a different hub's identity."""
        from muse.core.identity import load_identity

        hub2 = "http://sibling.example.com"
        self._store(repo)
        save_identity(hub2, {"type": "agent", "handle": "bot"})
        runner.invoke(cli, ["auth", "logout", "--hub", self._HUB])
        assert load_identity(self._HUB) is None
        assert load_identity(hub2) is not None

    def test_logout_short_flags(self, repo: pathlib.Path) -> None:
        """-j and -a short flags work identically to --json and --all."""
        hub2 = "http://shortflag.example.com"
        self._store(repo)
        save_identity(hub2, {"type": "agent", "handle": "bot"})
        result = runner.invoke(cli, ["auth", "logout", "-a", "-j"])
        assert result.exit_code == 0
        data = _json_logout(result)
        assert data["status"] == "ok"
        assert data["count"] == 2

    def test_logout_all_single_write(self, repo: pathlib.Path) -> None:
        """``_save_all`` is called exactly once for --all (not once per hub)."""
        import muse.core.identity as identity_mod

        hubs = [f"http://hub{i}.example.com" for i in range(5)]
        for h in hubs:
            save_identity(h, {"type": "human", "handle": "u"})
        # wraps= keeps the real write behaviour while counting the calls.
        with patch(
            "muse.core.identity._save_all", wraps=identity_mod._save_all
        ) as mock_save:
            result = runner.invoke(cli, ["auth", "logout", "--all", "--json"])
        assert result.exit_code == 0
        # _save_all must be called exactly once for all 5 hubs
        assert mock_save.call_count == 1

    def test_logout_ansi_in_hub_url_sanitized(self, repo: pathlib.Path) -> None:
        """ANSI codes injected into a hub URL display name are stripped in text output."""
        ansi_hostname = "\x1b[31mattacker.example.com\x1b[0m"
        malicious_entry: IdentityEntry = {"type": "human", "handle": "eve"}
        with patch(
            "muse.core.identity._load_all",
            return_value={ansi_hostname: malicious_entry},
        ):
            # Provide a matching --hub so logout finds it
            result = runner.invoke(
                cli,
                ["auth", "logout", "--hub", ansi_hostname],
            )
        # Neither stream may contain raw ESC bytes. stderr is checked
        # explicitly because it is read separately from output elsewhere in
        # this module, so inspecting output alone may miss the text display.
        assert "\x1b" not in result.output, "ANSI escape leaked to output"
        assert "\x1b" not in result.stderr, "ANSI escape leaked to stderr"
549
550
551 # ── Security ──────────────────────────────────────────────────────────────────
552
553
class TestAuthSecurity:
    """Security checks: ``file://`` hub URLs trigger neither network nor file access."""

    def test_json_post_file_scheme_blocked(self) -> None:
        """``_json_post_raw`` exits on file:// without calling ``urlopen``."""
        from muse.cli.commands.auth import _json_post_raw as _json_post

        with patch("urllib.request.urlopen") as urlopen_mock, pytest.raises(SystemExit):
            _json_post("file:///etc/passwd", "/api/test", {})
        urlopen_mock.assert_not_called()

    def test_hub_base_url_file_scheme_does_not_open_file(self) -> None:
        """``_hub_base_url`` exits on file:// without calling ``open``."""
        from muse.cli.commands.auth import _hub_base_url

        with patch("builtins.open") as open_mock, pytest.raises(SystemExit):
            _hub_base_url("file:///etc/shadow")
        open_mock.assert_not_called()
568
569
570 # ── E2E: keygen with unavailable crypto ──────────────────────────────────────
571
572
class TestKeygenCLI:
    """CLI-level failure cases for ``muse auth keygen``."""

    def test_keygen_no_hub_exits_nonzero(self, repo: pathlib.Path) -> None:
        """keygen without --hub fails."""
        argv = ["auth", "keygen"]
        outcome = runner.invoke(cli, argv)
        assert outcome.exit_code != 0

    def test_keygen_invalid_scheme_exits(self, repo: pathlib.Path) -> None:
        """keygen with an ftp:// hub fails."""
        argv = ["auth", "keygen", "--hub", "ftp://attacker.example.com"]
        outcome = runner.invoke(cli, argv)
        # The failure may come from scheme validation or from key generation.
        assert outcome.exit_code != 0
584
585
586 # ── E2E: whoami capabilities field ───────────────────────────────────────────
587
588
class TestWhoamiCapabilities:
    """CLI-level check of the ``capabilities`` field in whoami JSON output."""

    _HUB = "http://localhost:19999"

    def test_capabilities_empty_list_in_json(self, repo: pathlib.Path) -> None:
        """An identity stored without capabilities still yields a list."""
        entry = {"type": "human", "handle": "alice"}
        save_identity(self._HUB, entry)

        argv = ["auth", "whoami", "--hub", self._HUB, "--json"]
        outcome = runner.invoke(cli, argv)
        assert outcome.exit_code == 0

        capabilities = _json_whoami(outcome)["capabilities"]
        assert isinstance(capabilities, list)
598
599
600 # ── Stress: concurrent logins ─────────────────────────────────────────────────
601
602
class TestStressConcurrent:
    """Stress tests for identity persistence, sequential and multi-threaded.

    Each test redirects module-level attributes of ``muse.core.identity`` to
    files under ``tmp_path`` and restores the originals in a ``finally``
    block, so a failing assertion or unexpected error cannot leak the
    redirected paths into later tests.
    """

    def test_8_sequential_saves_to_isolated_files_correct(
        self, tmp_path: pathlib.Path
    ) -> None:
        """Sequential save→load round-trips to isolated identity files produce correct data.

        This validates the save/load logic is correct before layering concurrency.
        Module-level patching is not thread-safe, so isolation is tested sequentially.
        """
        import muse.core.identity as _id_mod
        from muse.core.identity import IdentityEntry, load_identity, save_identity

        orig_file = _id_mod._IDENTITY_FILE
        try:
            for idx in range(8):
                identity_file = tmp_path / f"identity_{idx}.toml"
                identity_file.write_text("")
                hub = f"http://localhost:{19000 + idx}"

                # Each round-trip targets its own, freshly created file.
                _id_mod._IDENTITY_FILE = identity_file
                entry: IdentityEntry = {"type": "agent", "handle": f"agent-{idx}"}
                save_identity(hub, entry)
                stored = load_identity(hub)
                assert stored is not None, f"Identity not stored for {hub}"
                assert stored["handle"] == f"agent-{idx}"
        finally:
            _id_mod._IDENTITY_FILE = orig_file

    def test_8_concurrent_logins_shared_identity_file(
        self, tmp_path: pathlib.Path
    ) -> None:
        """8 threads writing different hubs to the same identity file.

        The advisory lock in ``_identity_write_lock()`` must prevent
        read-modify-write races. All 8 entries must survive after all threads
        complete.

        We patch _IDENTITY_FILE and _IDENTITY_DIR *once* before threads start
        to avoid the non-thread-safe module-attribute race that would arise if
        each thread patched them independently.
        """
        import muse.core.identity as _id_mod
        from muse.core.identity import IdentityEntry, list_all_identities, save_identity

        identity_dir = tmp_path / "muse-home"
        identity_dir.mkdir()
        identity_file = identity_dir / "identity.toml"
        identity_file.write_text("")

        errors: list[str] = []

        def _do(idx: int) -> None:
            # Failures are collected rather than raised: an exception inside
            # a worker thread would otherwise not fail the test.
            try:
                hub = f"http://localhost:{19000 + idx}"
                entry: IdentityEntry = {"type": "human", "handle": f"user-{idx:02d}"}
                save_identity(hub, entry)
            except Exception as exc:
                errors.append(f"Thread {idx}: {exc}")

        orig_file = _id_mod._IDENTITY_FILE
        orig_dir = _id_mod._IDENTITY_DIR
        _id_mod._IDENTITY_FILE = identity_file
        _id_mod._IDENTITY_DIR = identity_dir
        try:
            threads = [threading.Thread(target=_do, args=(i,)) for i in range(8)]
            for t in threads:
                t.start()
            for t in threads:
                t.join()
            all_ids = list_all_identities()
        finally:
            _id_mod._IDENTITY_FILE = orig_file
            _id_mod._IDENTITY_DIR = orig_dir

        # Joined outside the f-string: a backslash inside a replacement field
        # is a SyntaxError before Python 3.12.
        failure_report = "\n".join(errors)
        assert errors == [], f"Concurrent shared identity file failures:\n{failure_report}"
        assert len(all_ids) == 8, f"Expected 8 identities, got {len(all_ids)}: {list(all_ids)}"

    def test_16_sequential_logins_all_survive(
        self, tmp_path: pathlib.Path
    ) -> None:
        """16 sequential logins to distinct hubs all persist correctly.

        Validates that repeated save→load round-trips don't corrupt the TOML
        file and that all entries coexist after the final write.
        """
        import muse.core.identity as _id_mod
        from muse.core.identity import (
            IdentityEntry,
            list_all_identities,
            load_identity,
            save_identity,
        )

        identity_dir = tmp_path / "muse-stress"
        identity_dir.mkdir()
        identity_file = identity_dir / "identity.toml"
        identity_file.write_text("")

        orig_file = _id_mod._IDENTITY_FILE
        orig_dir = _id_mod._IDENTITY_DIR
        _id_mod._IDENTITY_FILE = identity_file
        _id_mod._IDENTITY_DIR = identity_dir

        try:
            for idx in range(16):
                hub = f"http://localhost:{21000 + idx}"
                entry: IdentityEntry = {
                    "type": "agent",
                    "handle": f"agent-{idx:02d}",
                }
                save_identity(hub, entry)

            all_ids = list_all_identities()
            assert len(all_ids) == 16, f"Expected 16, got {len(all_ids)}"

            # Spot-check the first, a middle and the last entry.
            for idx in (0, 7, 15):
                hub = f"http://localhost:{21000 + idx}"
                stored = load_identity(hub)
                assert stored is not None
                assert stored["handle"] == f"agent-{idx:02d}"
        finally:
            _id_mod._IDENTITY_FILE = orig_file
            _id_mod._IDENTITY_DIR = orig_dir
725
726
727 # ---------------------------------------------------------------------------
728 # Flag registration tests
729 # ---------------------------------------------------------------------------
730
731 import argparse as _argparse
732 from muse.cli.commands.auth import register as _register_auth
733
734
def _parse_auth(*args: str) -> _argparse.Namespace:
    """Parse ``auth <args...>`` with a throwaway parser filled in by ``register()``.

    Args:
        *args: Command-line tokens that follow the ``auth`` sub-command.

    Returns:
        The namespace produced by ``argparse`` for the given tokens.
    """
    parser = _argparse.ArgumentParser()
    _register_auth(parser.add_subparsers(dest="cmd"))
    argv = ["auth"]
    argv.extend(args)
    return parser.parse_args(argv)
741
742
class TestRegisterFlags:
    """Argument-parser tests: each ``auth`` sub-command exposes the expected flags."""

    # Argument fragments shared by the sub-commands that are given a hub here.
    _HUB = ("--hub", "https://localhost:1337")
    _REGISTER = (*_HUB, "--handle", "gabriel")

    # ── keygen ──────────────────────────────────────────────────────────────
    def test_keygen_default_json_out_is_false(self) -> None:
        """keygen without --json leaves ``json_out`` False."""
        assert _parse_auth("keygen", *self._HUB).json_out is False

    def test_keygen_json_flag_sets_json_out(self) -> None:
        """keygen --json sets ``json_out``."""
        assert _parse_auth("keygen", *self._HUB, "--json").json_out is True

    def test_keygen_j_shorthand_sets_json_out(self) -> None:
        """keygen -j sets ``json_out``."""
        assert _parse_auth("keygen", *self._HUB, "-j").json_out is True

    def test_keygen_force_flag(self) -> None:
        """keygen --force sets ``force``."""
        assert _parse_auth("keygen", *self._HUB, "--force").force is True

    # ── recover ─────────────────────────────────────────────────────────────
    def test_recover_default_json_out_is_false(self) -> None:
        """recover without --json leaves ``json_out`` False."""
        assert _parse_auth("recover", *self._HUB).json_out is False

    def test_recover_json_flag_sets_json_out(self) -> None:
        """recover --json sets ``json_out``."""
        assert _parse_auth("recover", *self._HUB, "--json").json_out is True

    def test_recover_j_shorthand_sets_json_out(self) -> None:
        """recover -j sets ``json_out``."""
        assert _parse_auth("recover", *self._HUB, "-j").json_out is True

    # ── rotate ──────────────────────────────────────────────────────────────
    def test_rotate_default_json_out_is_false(self) -> None:
        """rotate without --json leaves ``json_out`` False."""
        assert _parse_auth("rotate").json_out is False

    def test_rotate_json_flag_sets_json_out(self) -> None:
        """rotate --json sets ``json_out``."""
        assert _parse_auth("rotate", "--json").json_out is True

    def test_rotate_j_shorthand_sets_json_out(self) -> None:
        """rotate -j sets ``json_out``."""
        assert _parse_auth("rotate", "-j").json_out is True

    # ── register ────────────────────────────────────────────────────────────
    def test_register_default_json_out_is_false(self) -> None:
        """register without --json leaves ``json_out`` False."""
        assert _parse_auth("register", *self._REGISTER).json_out is False

    def test_register_json_flag_sets_json_out(self) -> None:
        """register --json sets ``json_out``."""
        assert _parse_auth("register", *self._REGISTER, "--json").json_out is True

    def test_register_j_shorthand_sets_json_out(self) -> None:
        """register -j sets ``json_out``."""
        assert _parse_auth("register", *self._REGISTER, "-j").json_out is True

    # ── whoami ──────────────────────────────────────────────────────────────
    def test_whoami_default_json_out_is_false(self) -> None:
        """whoami without --json leaves ``json_out`` False."""
        assert _parse_auth("whoami").json_out is False

    def test_whoami_json_flag_sets_json_out(self) -> None:
        """whoami --json sets ``json_out``."""
        assert _parse_auth("whoami", "--json").json_out is True

    def test_whoami_j_shorthand_sets_json_out(self) -> None:
        """whoami -j sets ``json_out``."""
        assert _parse_auth("whoami", "-j").json_out is True

    def test_whoami_all_flag(self) -> None:
        """whoami --all sets ``all_hubs``."""
        assert _parse_auth("whoami", "--all").all_hubs is True

    # ── logout ──────────────────────────────────────────────────────────────
    def test_logout_default_json_out_is_false(self) -> None:
        """logout without --json leaves ``json_out`` False."""
        assert _parse_auth("logout").json_out is False

    def test_logout_json_flag_sets_json_out(self) -> None:
        """logout --json sets ``json_out``."""
        assert _parse_auth("logout", "--json").json_out is True

    def test_logout_j_shorthand_sets_json_out(self) -> None:
        """logout -j sets ``json_out``."""
        assert _parse_auth("logout", "-j").json_out is True

    # ── show ────────────────────────────────────────────────────────────────
    def test_show_default_json_out_is_false(self) -> None:
        """show without --json leaves ``json_out`` False."""
        assert _parse_auth("show").json_out is False

    def test_show_json_flag_sets_json_out(self) -> None:
        """show --json sets ``json_out``."""
        assert _parse_auth("show", "--json").json_out is True

    def test_show_j_shorthand_sets_json_out(self) -> None:
        """show -j sets ``json_out``."""
        assert _parse_auth("show", "-j").json_out is True
File History 4 commits
sha256:81ae324db5ad375fbfe4834c6fcb378312cafad3cc92dec5d3e5c427306621a2 fix: remove commit_exists filter from have anchors — server… Sonnet 4.6 patch 20 days ago
sha256:36c3cb3e76619d4c30a6d9bf81b5ec4ff148e30dcfed913e3114ca7b43b81c7e fix: rename objects→blobs in push client and all stale test… Sonnet 4.6 patch 22 days ago
sha256:c06a9b9b9fee26c68ea725b44d54b2c0a171301ce9de746d5b656617b4463a9a fix: repair four test failures from post-migration audit Sonnet 4.6 patch 28 days ago
sha256:1900655993c83c4107067375548a7be823e471d2515830842f1a12cba4bd3cdf fix: unified object store migration — idempotent writes, JS… Sonnet 4.6 minor 28 days ago