gabriel / muse public
test_auth_supercharge.py python
1,038 lines 41.5 KB
Raw
1 """Supercharge tests for ``muse auth``.
2
3 New features under test
4 -----------------------
5 1. ``--type human|agent`` filter on ``whoami --all``
6 2. ``provisioned_by`` field in ``whoami --json`` output (trust chain visibility)
7 3. ``hd_path`` field in ``whoami --json`` output (HD provenance visibility)
8 4. ``_KeygenJson`` TypedDict completeness: ``hd_path``, ``mnemonic_word_count``, ``label``
9 5. ``_ShowJson`` TypedDict completeness: ``algorithm``,
10 ``provisioned_by``, ``provisioned_by_fingerprint``
11 6. Security: invalid ``--type`` value exits non-zero
12 7. ``show --json`` includes ``algorithm`` and ``hd_path``
13
14 Test categories
15 ---------------
16 - unit : TypedDict schema completeness
17 - integration : CLI round-trips via CliRunner with isolated identity files
18 - security : bad --type value rejected, ANSI-safe output
19 - data integrity: provisioned_by, hd_path survive save→load round-trip
20 - performance : _load_all with 50 entries under 200 ms
21 """
22
23 from __future__ import annotations
24 from collections.abc import Mapping
25
26 import json
27 import pathlib
28 import time
29
30 import pytest
31
32 from tests.cli_test_helper import CliRunner, InvokeResult
33 from muse.core.identity import IdentityEntry, save_identity
34 from muse.core.paths import muse_dir
35
36 cli = None
37 runner = CliRunner()
38
39 HUB = "http://localhost:19111"
40 HOSTNAME = "localhost:19111"
41 HUB2 = "http://localhost:19222"
42 HOSTNAME2 = "localhost:19222"
43
44
45 # ── fixtures ──────────────────────────────────────────────────────────────────
46
47
48 @pytest.fixture
49 def repo(tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> pathlib.Path:
50 """Minimal .muse/ repo with isolated identity home."""
51 from muse._version import __version__
52
53 dot_muse = muse_dir(tmp_path)
54 for sub in ("refs/heads", "objects", "commits", "snapshots"):
55 (dot_muse / sub).mkdir(parents=True, exist_ok=True)
56 (dot_muse / "repo.json").write_text(
57 json.dumps({"repo_id": "test-repo", "schema_version": __version__, "domain": "code"})
58 )
59 (dot_muse / "HEAD").write_text("ref: refs/heads/main\n")
60 (dot_muse / "refs" / "heads" / "main").write_text("")
61 (dot_muse / "config.toml").write_text("")
62
63 muse_home = tmp_path / ".muse-home"
64 muse_home.mkdir()
65 (muse_home / "identity.toml").write_text("")
66
67 import muse.core.identity as _id_mod
68 monkeypatch.setattr(_id_mod, "_IDENTITY_FILE", muse_home / "identity.toml")
69 monkeypatch.setattr(_id_mod, "_IDENTITY_DIR", muse_home)
70 monkeypatch.setenv("MUSE_REPO_ROOT", str(tmp_path))
71 monkeypatch.chdir(tmp_path)
72 return tmp_path
73
74
75 def _human(handle: str = "alice") -> IdentityEntry:
76 return {
77 "type": "human",
78 "handle": handle,
79 "algorithm": "ed25519",
80 "fingerprint": "a" * 64,
81 }
82
83
84 def _agent(handle: str = "bot", provisioned_by: str = "alice") -> IdentityEntry:
85 return {
86 "type": "agent",
87 "handle": handle,
88 "algorithm": "ed25519",
89 "fingerprint": "b" * 64,
90 "provisioned_by": provisioned_by,
91 }
92
93
94 def _parse_json(result: InvokeResult) -> Mapping[str, object]:
95 """Parse first JSON structure from result.output (object or array)."""
96 for line in result.output.splitlines():
97 stripped = line.strip()
98 if stripped.startswith("{") or stripped.startswith("["):
99 return json.loads(stripped)
100 raise ValueError(f"No JSON in output:\n{result.output!r}")
101
102
103 # ── Unit: _KeygenJson TypedDict completeness ──────────────────────────────────
104
105
106 class TestKeygenJsonTypedDict:
107 """_KeygenJson TypedDict must declare hd_path and mnemonic_word_count."""
108
109 def test_hd_path_in_keygen_typeddict(self) -> None:
110 from muse.cli.commands.auth import _KeygenJson
111 hints = _KeygenJson.__annotations__
112 assert "hd_path" in hints, (
113 "_KeygenJson missing hd_path — run_keygen already emits it"
114 )
115
116 def test_mnemonic_word_count_in_keygen_typeddict(self) -> None:
117 from muse.cli.commands.auth import _KeygenJson
118 hints = _KeygenJson.__annotations__
119 assert "mnemonic_word_count" in hints, (
120 "_KeygenJson missing mnemonic_word_count — run_keygen already emits it"
121 )
122
123 def test_all_required_keys_present(self) -> None:
124 from muse.cli.commands.auth import _KeygenJson
125 hints = _KeygenJson.__annotations__
126 for key in ("status", "hub", "hostname", "public_key_b64",
127 "fingerprint", "hd_path", "mnemonic_word_count"):
128 assert key in hints, f"_KeygenJson missing key: {key!r}"
129
130
131 # ── Unit: _ShowJson TypedDict completeness ────────────────────────────────────
132
133
134 class TestShowJsonTypedDict:
135 """_ShowJson TypedDict must expose algorithm, provisioned_by,
136 provisioned_by_fingerprint — fields that identity.toml stores but the
137 show command didn't surface."""
138
139 def test_algorithm_in_show_typeddict(self) -> None:
140 from muse.cli.commands.auth import _ShowJson
141 assert "algorithm" in _ShowJson.__annotations__, "_ShowJson missing algorithm"
142
143 def test_provisioned_by_in_show_typeddict(self) -> None:
144 from muse.cli.commands.auth import _ShowJson
145 assert "provisioned_by" in _ShowJson.__annotations__, \
146 "_ShowJson missing provisioned_by (agent trust chain)"
147
148 def test_provisioned_by_fingerprint_in_show_typeddict(self) -> None:
149 from muse.cli.commands.auth import _ShowJson
150 assert "provisioned_by_fingerprint" in _ShowJson.__annotations__, \
151 "_ShowJson missing provisioned_by_fingerprint"
152
153
154 # ── Unit: _WhoamiJson has provisioned_by and hd_path ─────────────────────────
155
156
157 class TestWhoamiJsonTypedDict:
158 def test_provisioned_by_in_whoami_typeddict(self) -> None:
159 from muse.cli.commands.auth import _WhoamiJson
160 assert "provisioned_by" in _WhoamiJson.__annotations__, \
161 "_WhoamiJson missing provisioned_by"
162
163 def test_hd_path_in_whoami_typeddict(self) -> None:
164 from muse.cli.commands.auth import _WhoamiJson
165 assert "hd_path" in _WhoamiJson.__annotations__, \
166 "_WhoamiJson missing hd_path"
167
168
169 # ── Integration: _display_entry emits provisioned_by for agents ───────────────
170
171
172 class TestDisplayEntryProvisionedBy:
173 def test_agent_provisioned_by_in_json(self, capsys: pytest.CaptureFixture[str]) -> None:
174 from muse.cli.commands.auth import _display_entry
175 entry: IdentityEntry = {
176 "type": "agent",
177 "handle": "bot",
178 "algorithm": "ed25519",
179 "fingerprint": "b" * 64,
180 "provisioned_by": "alice",
181 }
182 _display_entry(HOSTNAME, entry, json_output=True)
183 data = json.loads(capsys.readouterr().out)
184 assert data.get("provisioned_by") == "alice"
185
186 def test_human_no_provisioned_by(self, capsys: pytest.CaptureFixture[str]) -> None:
187 from muse.cli.commands.auth import _display_entry
188 entry: IdentityEntry = _human()
189 _display_entry(HOSTNAME, entry, json_output=True)
190 data = json.loads(capsys.readouterr().out)
191 assert "provisioned_by" not in data or data.get("provisioned_by") == ""
192
193 def test_agent_hd_path_in_json(self, capsys: pytest.CaptureFixture[str]) -> None:
194 from muse.cli.commands.auth import _display_entry
195 hd_path = "m/1075233755'/0'/0'/0'/0'/0'"
196 entry: IdentityEntry = {
197 "type": "human",
198 "handle": "gabriel",
199 "algorithm": "ed25519",
200 "fingerprint": "a" * 64,
201 "hd_path": hd_path,
202 }
203 _display_entry(HOSTNAME, entry, json_output=True)
204 data = json.loads(capsys.readouterr().out)
205 assert data.get("hd_path") == hd_path
206
207 def test_no_hd_path_absent_from_json(self, capsys: pytest.CaptureFixture[str]) -> None:
208 from muse.cli.commands.auth import _display_entry
209 entry: IdentityEntry = _human()
210 _display_entry(HOSTNAME, entry, json_output=True)
211 data = json.loads(capsys.readouterr().out)
212 assert "hd_path" not in data or data.get("hd_path") == ""
213
214
215 # ── Integration: whoami --all --type filter ───────────────────────────────────
216
217
218 class TestWhoamiTypeFilter:
219 """``muse auth whoami --all --type TYPE`` filters by identity type."""
220
221 def test_type_human_returns_only_humans(self, repo: pathlib.Path) -> None:
222 save_identity(HUB, _human("alice"))
223 save_identity(HUB2, _agent("bot", "alice"))
224 result = runner.invoke(cli, ["auth", "whoami", "--all", "--type", "human", "--json"])
225 assert result.exit_code == 0, result.output
226 data = json.loads(result.output)["identities"]
227 assert isinstance(data, list)
228 assert all(e["type"] == "human" for e in data), f"non-human in results: {data}"
229 handles = {e["handle"] for e in data}
230 assert "alice" in handles
231 assert "bot" not in handles
232
233 def test_type_agent_returns_only_agents(self, repo: pathlib.Path) -> None:
234 save_identity(HUB, _human("alice"))
235 save_identity(HUB2, _agent("bot", "alice"))
236 result = runner.invoke(cli, ["auth", "whoami", "--all", "--type", "agent", "--json"])
237 assert result.exit_code == 0, result.output
238 data = json.loads(result.output)["identities"]
239 assert isinstance(data, list)
240 assert all(e["type"] == "agent" for e in data)
241 handles = {e["handle"] for e in data}
242 assert "bot" in handles
243 assert "alice" not in handles
244
245 def test_type_filter_no_match_exits_nonzero(self, repo: pathlib.Path) -> None:
246 """--type agent when only humans are stored → exit nonzero."""
247 save_identity(HUB, _human("alice"))
248 result = runner.invoke(cli, ["auth", "whoami", "--all", "--type", "agent"])
249 assert result.exit_code != 0
250
251 def test_type_filter_no_match_json_exits_nonzero(self, repo: pathlib.Path) -> None:
252 save_identity(HUB, _human("alice"))
253 result = runner.invoke(cli, ["auth", "whoami", "--all", "--type", "agent", "--json"])
254 assert result.exit_code != 0
255
256 def test_type_invalid_value_exits_nonzero(self, repo: pathlib.Path) -> None:
257 """--type must accept only 'human' or 'agent'."""
258 save_identity(HUB, _human())
259 result = runner.invoke(cli, ["auth", "whoami", "--all", "--type", "superuser"])
260 assert result.exit_code != 0
261
262 def test_type_requires_all_flag(self, repo: pathlib.Path) -> None:
263 """--type without --all should fail or be ignored gracefully."""
264 save_identity(HUB, _human())
265 result = runner.invoke(cli, ["auth", "whoami", "--hub", HUB, "--type", "human", "--json"])
266 # Either succeeds (type flag ignored for single-hub) or fails cleanly
267 # Most important: no crash / traceback
268 assert result.exit_code in (0, 1), f"Unexpected exit code: {result.exit_code}"
269
270 def test_type_filter_counts_correctly(self, repo: pathlib.Path) -> None:
271 """3 humans + 2 agents; --type human → 3 results."""
272 hubs = [f"http://localhost:{19111 + i}" for i in range(5)]
273 for i, hub in enumerate(hubs):
274 if i < 3:
275 save_identity(hub, _human(f"human-{i}"))
276 else:
277 save_identity(hub, _agent(f"bot-{i}", "operator"))
278 result = runner.invoke(cli, ["auth", "whoami", "--all", "--type", "human", "--json"])
279 assert result.exit_code == 0
280 data = json.loads(result.output)["identities"]
281 assert len(data) == 3
282
283 def test_type_agent_includes_provisioned_by(self, repo: pathlib.Path) -> None:
284 """Agent entries in --type agent output expose provisioned_by."""
285 save_identity(HUB, _agent("bot", "alice"))
286 result = runner.invoke(cli, ["auth", "whoami", "--all", "--type", "agent", "--json"])
287 assert result.exit_code == 0
288 data = json.loads(result.output)["identities"]
289 assert len(data) == 1
290 assert data[0].get("provisioned_by") == "alice"
291
292
293 # ── Integration: whoami --json includes provisioned_by for agents ─────────────
294
295
296 class TestWhoamiProvisionedBy:
297 def test_whoami_json_agent_has_provisioned_by(self, repo: pathlib.Path) -> None:
298 save_identity(HUB, _agent("bot", "alice"))
299 result = runner.invoke(cli, ["auth", "whoami", "--hub", HUB, "--json"])
300 assert result.exit_code == 0
301 data = json.loads(result.output)
302 assert data.get("provisioned_by") == "alice"
303
304 def test_whoami_json_human_no_provisioned_by(self, repo: pathlib.Path) -> None:
305 save_identity(HUB, _human())
306 result = runner.invoke(cli, ["auth", "whoami", "--hub", HUB, "--json"])
307 assert result.exit_code == 0
308 data = json.loads(result.output)
309 assert "provisioned_by" not in data or not data["provisioned_by"]
310
311 def test_whoami_all_json_agent_has_provisioned_by(self, repo: pathlib.Path) -> None:
312 save_identity(HUB, _agent("bot", "alice"))
313 result = runner.invoke(cli, ["auth", "whoami", "--all", "--json"])
314 assert result.exit_code == 0
315 entries = json.loads(result.output)["identities"]
316 bot = next(e for e in entries if e["handle"] == "bot")
317 assert bot.get("provisioned_by") == "alice"
318
319
320 # ── Integration: whoami --json includes hd_path when present ──────────────────
321
322
323 class TestWhoamiHdPath:
324 def test_whoami_json_hd_entry_has_hd_path(self, repo: pathlib.Path) -> None:
325 hd_path = "m/1075233755'/0'/0'/0'/0'/0'"
326 entry: IdentityEntry = {
327 "type": "human",
328 "handle": "gabriel",
329 "algorithm": "ed25519",
330 "fingerprint": "a" * 64,
331 "hd_path": hd_path,
332 }
333 save_identity(HUB, entry)
334 result = runner.invoke(cli, ["auth", "whoami", "--hub", HUB, "--json"])
335 assert result.exit_code == 0
336 data = json.loads(result.output)
337 assert data.get("hd_path") == hd_path
338
339 def test_whoami_json_no_hd_path_when_absent(self, repo: pathlib.Path) -> None:
340 save_identity(HUB, _human())
341 result = runner.invoke(cli, ["auth", "whoami", "--hub", HUB, "--json"])
342 assert result.exit_code == 0
343 data = json.loads(result.output)
344 assert "hd_path" not in data or not data["hd_path"]
345
346
347 # ── Integration: show --json includes algorithm ───────────────────────────────
348
349
350 class TestShowJsonKeyPathAlgorithm:
351 def test_show_json_has_algorithm(self, repo: pathlib.Path) -> None:
352 save_identity(HUB, _human())
353 result = runner.invoke(cli, ["auth", "show", "--hub", HUB, "--json"])
354 assert result.exit_code == 0
355 data = json.loads(result.output)
356 assert "algorithm" in data, f"show --json missing algorithm; got: {list(data)}"
357
358 def test_show_json_algorithm_value_correct(self, repo: pathlib.Path) -> None:
359 save_identity(HUB, _human())
360 result = runner.invoke(cli, ["auth", "show", "--hub", HUB, "--json"])
361 assert result.exit_code == 0
362 data = json.loads(result.output)
363 assert data["algorithm"] == "ed25519"
364
365 def test_show_json_agent_has_provisioned_by(self, repo: pathlib.Path) -> None:
366 save_identity(HUB, _agent("bot", "alice"))
367 result = runner.invoke(cli, ["auth", "show", "--hub", HUB, "--json"])
368 assert result.exit_code == 0
369 data = json.loads(result.output)
370 assert data.get("provisioned_by") == "alice"
371
372
373 # ── Security: type filter input validation ────────────────────────────────────
374
375
376 class TestTypeFilterSecurity:
377 def test_type_with_ansi_injection_rejected(self, repo: pathlib.Path) -> None:
378 save_identity(HUB, _human())
379 result = runner.invoke(
380 cli, ["auth", "whoami", "--all", "--type", "\x1b[31mhuman\x1b[0m"]
381 )
382 assert result.exit_code != 0
383
384 def test_type_with_newline_injection_rejected(self, repo: pathlib.Path) -> None:
385 save_identity(HUB, _human())
386 result = runner.invoke(
387 cli, ["auth", "whoami", "--all", "--type", "human\nmalicious"]
388 )
389 assert result.exit_code != 0
390
391 def test_type_with_semicolon_rejected(self, repo: pathlib.Path) -> None:
392 save_identity(HUB, _human())
393 result = runner.invoke(
394 cli, ["auth", "whoami", "--all", "--type", "human;rm -rf /"]
395 )
396 assert result.exit_code != 0
397
398
399 # ── Performance: _load_all with 50 entries under 200 ms ──────────────────────
400
401
402 class TestLoadAllPerformance:
403 def test_50_entries_under_200ms(self, tmp_path: pathlib.Path) -> None:
404 from muse.core.identity import _load_all, _dump_identity
405
406 entries: dict[str, IdentityEntry] = {}
407 for i in range(50):
408 hostname = f"localhost:{19500 + i}"
409 entries[hostname] = {
410 "type": "human",
411 "handle": f"user-{i:02d}",
412 "algorithm": "ed25519",
413 "fingerprint": "a" * 64,
414 "hd_path": "m/1075233755'/0'/0'/0'/0'/0'",
415 }
416
417 p = tmp_path / "identity.toml"
418 p.write_text(_dump_identity(entries))
419
420 start = time.monotonic()
421 loaded = _load_all(p)
422 elapsed = time.monotonic() - start
423
424 assert len(loaded) == 50
425 assert elapsed < 0.2, f"_load_all with 50 entries took {elapsed:.3f}s"
426
427
428 # ── Stress: show with 50+ identities ─────────────────────────────────────────
429
430
431 class TestShowStress:
432 """show must handle large identity files without corruption."""
433
434 def test_show_with_50_identities_returns_correct_entry(
435 self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch
436 ) -> None:
437 """show --hub TARGET picks the right entry from a file with 50 entries."""
438 import muse.core.identity as _id_mod
439 from muse.core.identity import _dump_identity
440
441 identity_file = tmp_path / "identity.toml"
442 monkeypatch.setattr(_id_mod, "_IDENTITY_FILE", identity_file)
443 monkeypatch.setattr(_id_mod, "_IDENTITY_DIR", tmp_path)
444 monkeypatch.setenv("MUSE_REPO_ROOT", str(tmp_path))
445 monkeypatch.chdir(tmp_path)
446
447 # Create minimal .muse structure
448 from muse._version import __version__
449 dot_muse = muse_dir(tmp_path)
450 for sub in ("refs/heads", "objects", "commits", "snapshots"):
451 (dot_muse / sub).mkdir(parents=True, exist_ok=True)
452 (dot_muse / "repo.json").write_text(
453 json.dumps({"repo_id": "test-repo", "schema_version": __version__, "domain": "code"})
454 )
455 (dot_muse / "HEAD").write_text("ref: refs/heads/main\n")
456 (dot_muse / "refs" / "heads" / "main").write_text("")
457 (dot_muse / "config.toml").write_text("")
458
459 entries: dict[str, IdentityEntry] = {}
460 target_hub = "http://localhost:20050"
461 for i in range(50):
462 hub = f"http://localhost:{20000 + i}"
463 entries[f"localhost:{20000 + i}"] = {
464 "type": "human",
465 "handle": f"user-{i:02d}",
466 "algorithm": "ed25519",
467 "fingerprint": "a" * 64,
468 }
469 # Override one specific entry as the target
470 entries["localhost:20050"] = {
471 "type": "human",
472 "handle": "target-user",
473 "algorithm": "ed25519",
474 "fingerprint": "f" * 64,
475 }
476 identity_file.write_text(_dump_identity(entries))
477
478 result = runner.invoke(cli, ["auth", "show", "--hub", target_hub, "--json"])
479 assert result.exit_code == 0, result.output
480 data = json.loads(result.output)
481 assert data["handle"] == "target-user"
482 assert data["fingerprint"] == "f" * 64
483
484 def test_show_50_repeated_calls_consistent(
485 self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch
486 ) -> None:
487 """50 consecutive show calls return identical results."""
488 import muse.core.identity as _id_mod
489 from muse.core.identity import _dump_identity
490
491 identity_file = tmp_path / "identity.toml"
492 monkeypatch.setattr(_id_mod, "_IDENTITY_FILE", identity_file)
493 monkeypatch.setattr(_id_mod, "_IDENTITY_DIR", tmp_path)
494 monkeypatch.setenv("MUSE_REPO_ROOT", str(tmp_path))
495 monkeypatch.chdir(tmp_path)
496
497 from muse._version import __version__
498 dot_muse = muse_dir(tmp_path)
499 for sub in ("refs/heads", "objects", "commits", "snapshots"):
500 (dot_muse / sub).mkdir(parents=True, exist_ok=True)
501 (dot_muse / "repo.json").write_text(
502 json.dumps({"repo_id": "test-repo", "schema_version": __version__, "domain": "code"})
503 )
504 (dot_muse / "HEAD").write_text("ref: refs/heads/main\n")
505 (dot_muse / "refs" / "heads" / "main").write_text("")
506 (dot_muse / "config.toml").write_text("")
507
508 entry: IdentityEntry = {
509 "type": "human",
510 "handle": "stable-user",
511 "algorithm": "ed25519",
512 "fingerprint": "b" * 64,
513 }
514 identity_file.write_text(_dump_identity({"localhost:20099": entry}))
515
516 results = set()
517 for _ in range(50):
518 r = runner.invoke(cli, ["auth", "show", "--hub", "http://localhost:20099", "--json"])
519 assert r.exit_code == 0
520 d = json.loads(r.output)
521 d.pop("duration_ms", None)
522 d.pop("timestamp", None)
523 results.add(json.dumps(d, sort_keys=True))
524
525 assert len(results) == 1, "show returned different output across 50 calls"
526
527
528 # ── Stress: logout clear_all_identities with many hubs ───────────────────────
529
530
531 class TestLogoutStress:
532 """logout --all must atomically clear arbitrarily many entries."""
533
534 def test_logout_all_50_hubs(
535 self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch
536 ) -> None:
537 """logout --all removes all 50 entries in one shot."""
538 import muse.core.identity as _id_mod
539 from muse.core.identity import _dump_identity
540
541 identity_file = tmp_path / "identity.toml"
542 monkeypatch.setattr(_id_mod, "_IDENTITY_FILE", identity_file)
543 monkeypatch.setattr(_id_mod, "_IDENTITY_DIR", tmp_path)
544 monkeypatch.setenv("MUSE_REPO_ROOT", str(tmp_path))
545 monkeypatch.chdir(tmp_path)
546
547 from muse._version import __version__
548 dot_muse = muse_dir(tmp_path)
549 for sub in ("refs/heads", "objects", "commits", "snapshots"):
550 (dot_muse / sub).mkdir(parents=True, exist_ok=True)
551 (dot_muse / "repo.json").write_text(
552 json.dumps({"repo_id": "test-repo", "schema_version": __version__, "domain": "code"})
553 )
554 (dot_muse / "HEAD").write_text("ref: refs/heads/main\n")
555 (dot_muse / "refs" / "heads" / "main").write_text("")
556 (dot_muse / "config.toml").write_text("")
557
558 entries: dict[str, IdentityEntry] = {}
559 for i in range(50):
560 entries[f"localhost:{21000 + i}"] = {
561 "type": "human",
562 "handle": f"user-{i:02d}",
563 "algorithm": "ed25519",
564 "fingerprint": "a" * 64,
565 }
566 identity_file.write_text(_dump_identity(entries))
567
568 result = runner.invoke(cli, ["auth", "logout", "--all", "--json"])
569 assert result.exit_code == 0, result.output
570 data = json.loads(result.output)
571 assert data["status"] == "ok"
572 assert data["count"] == 50
573 assert len(data["hubs"]) == 50
574
575 # File should be empty now
576 remaining = identity_file.read_text().strip()
577 assert remaining == "", f"identity.toml not cleared: {remaining!r}"
578
579
580 # ── Performance: logout --all with 50 hubs under 100 ms ──────────────────────
581
582
583 class TestLogoutPerformance:
584 def test_logout_all_50_hubs_under_100ms(
585 self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch
586 ) -> None:
587 import muse.core.identity as _id_mod
588 from muse.core.identity import _dump_identity, clear_all_identities
589
590 identity_file = tmp_path / "identity.toml"
591 monkeypatch.setattr(_id_mod, "_IDENTITY_FILE", identity_file)
592 monkeypatch.setattr(_id_mod, "_IDENTITY_DIR", tmp_path)
593
594 entries: dict[str, IdentityEntry] = {}
595 for i in range(50):
596 entries[f"localhost:{22000 + i}"] = {
597 "type": "human",
598 "handle": f"user-{i:02d}",
599 "algorithm": "ed25519",
600 "fingerprint": "a" * 64,
601 }
602 identity_file.write_text(_dump_identity(entries))
603
604 start = time.monotonic()
605 removed = clear_all_identities()
606 elapsed = time.monotonic() - start
607
608 assert len(removed) == 50
609 assert elapsed < 0.1, f"clear_all_identities(50) took {elapsed:.3f}s"
610
611
612 # ── Data integrity: recover produces identical fingerprint from same mnemonic ─
613
614
615 class TestRecoverDataIntegrity:
616 """Recovering from the same mnemonic must reproduce the same fingerprint."""
617
618 _MNEMONIC = (
619 "abandon abandon abandon abandon abandon abandon abandon abandon "
620 "abandon abandon abandon about"
621 )
622
623 def _patch(
624 self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path
625 ) -> None:
626 import muse.core.keypair as kp_mod
627 import muse.core.identity as id_mod
628
629 fake_home = tmp_path / "home"
630 fake_home.mkdir(parents=True, exist_ok=True)
631 import pathlib as _pl
632 monkeypatch.setattr(_pl.Path, "home", staticmethod(lambda: fake_home))
633 monkeypatch.setattr(kp_mod, "_KEYS_DIR", fake_home / ".muse" / "keys")
634 monkeypatch.setattr(id_mod, "_IDENTITY_DIR", fake_home / ".muse")
635 monkeypatch.setattr(id_mod, "_IDENTITY_FILE", fake_home / ".muse" / "identity.toml")
636 monkeypatch.setenv("MUSE_KEYCHAIN_BACKEND", "disabled")
637
638 def test_same_mnemonic_same_fingerprint(
639 self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch
640 ) -> None:
641 """Recovering twice from the same mnemonic gives the same fingerprint."""
642 self._patch(monkeypatch, tmp_path)
643
644 r1 = runner.invoke(
645 cli,
646 ["auth", "recover", "--hub", "http://localhost:19911", "--json"],
647 input=f"{self._MNEMONIC}\n",
648 )
649 assert r1.exit_code == 0, r1.output
650 d1 = json.loads(r1.output)
651
652 # Force-overwrite on second recover
653 r2 = runner.invoke(
654 cli,
655 ["auth", "recover", "--hub", "http://localhost:19911", "--force", "--json"],
656 input=f"{self._MNEMONIC}\n",
657 )
658 assert r2.exit_code == 0, r2.output
659 d2 = json.loads(r2.output)
660
661 assert d1["fingerprint"] == d2["fingerprint"], (
662 f"Fingerprint changed between recoveries: {d1['fingerprint']} vs {d2['fingerprint']}"
663 )
664 assert d1["public_key_b64"] == d2["public_key_b64"]
665
666 def test_different_mnemonic_different_fingerprint(
667 self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch
668 ) -> None:
669 """A different mnemonic produces a different fingerprint."""
670 self._patch(monkeypatch, tmp_path)
671
672 mnemonic_b = (
673 "abandon abandon abandon abandon abandon abandon abandon abandon "
674 "abandon abandon abandon zoo" # intentionally invalid — just needs to pass BIP39
675 )
676 # Use the canonical 12-word test vector for second recover
677 mnemonic_b = (
678 "zoo zoo zoo zoo zoo zoo zoo zoo zoo zoo zoo wrong"
679 )
680
681 r1 = runner.invoke(
682 cli,
683 ["auth", "recover", "--hub", "http://localhost:19912", "--json"],
684 input=f"{self._MNEMONIC}\n",
685 )
686 assert r1.exit_code == 0, r1.output
687
688 r2 = runner.invoke(
689 cli,
690 ["auth", "recover", "--hub", "http://localhost:19913", "--json"],
691 input=f"{mnemonic_b}\n",
692 )
693 assert r2.exit_code == 0, r2.output
694
695 d1 = json.loads(r1.output)
696 d2 = json.loads(r2.output)
697 assert d1["fingerprint"] != d2["fingerprint"], (
698 "Different mnemonics must not produce the same fingerprint"
699 )
700
701 def test_recover_hd_path_persisted(
702 self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch
703 ) -> None:
704 """After recover, identity.toml must contain hd_path."""
705 import muse.core.identity as id_mod
706 self._patch(monkeypatch, tmp_path)
707
708 r = runner.invoke(
709 cli,
710 ["auth", "recover", "--hub", "http://localhost:19914", "--json"],
711 input=f"{self._MNEMONIC}\n",
712 )
713 assert r.exit_code == 0, r.output
714 data = json.loads(r.output)
715 assert "hd_path" in data
716 assert data["hd_path"].startswith("m/")
717
718 # Also verify TOML on disk
719 loaded = id_mod.load_identity("http://localhost:19914")
720 assert loaded is not None
721 assert loaded.get("hd_path", "").startswith("m/")
722
723 def test_recover_mnemonic_not_in_toml(
724 self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch
725 ) -> None:
726 """Mnemonic must not be written to identity.toml during recover."""
727 import muse.core.identity as id_mod
728 self._patch(monkeypatch, tmp_path)
729
730 r = runner.invoke(
731 cli,
732 ["auth", "recover", "--hub", "http://localhost:19915"],
733 input=f"{self._MNEMONIC}\n",
734 )
735 assert r.exit_code == 0, r.output
736
737 import re
738 raw = id_mod._IDENTITY_FILE.read_text()
739 assert re.search(r'^\s*mnemonic\s*=', raw, re.MULTILINE) is None, (
740 f"mnemonic written to TOML:\n{raw}"
741 )
742 assert self._MNEMONIC not in raw
743
744
745 # ── Stress: recover same hub 10 times (--force) ───────────────────────────────
746
747
748 class TestRecoverStress:
749 _MNEMONIC = (
750 "abandon abandon abandon abandon abandon abandon abandon abandon "
751 "abandon abandon abandon about"
752 )
753
754 def _patch(
755 self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path
756 ) -> None:
757 import muse.core.keypair as kp_mod
758 import muse.core.identity as id_mod
759
760 fake_home = tmp_path / "home"
761 fake_home.mkdir(parents=True, exist_ok=True)
762 import pathlib as _pl
763 monkeypatch.setattr(_pl.Path, "home", staticmethod(lambda: fake_home))
764 monkeypatch.setattr(kp_mod, "_KEYS_DIR", fake_home / ".muse" / "keys")
765 monkeypatch.setattr(id_mod, "_IDENTITY_DIR", fake_home / ".muse")
766 monkeypatch.setattr(id_mod, "_IDENTITY_FILE", fake_home / ".muse" / "identity.toml")
767 monkeypatch.setenv("MUSE_KEYCHAIN_BACKEND", "disabled")
768
769 def test_10_recoveries_same_fingerprint(
770 self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch
771 ) -> None:
772 """10 forced recoveries from the same mnemonic must all yield the same fingerprint."""
773 self._patch(monkeypatch, tmp_path)
774
775 fingerprints: list[str] = []
776 for i in range(10):
777 flags = ["auth", "recover", "--hub", "http://localhost:19920", "--json"]
778 if i > 0:
779 flags.append("--force")
780 r = runner.invoke(cli, flags, input=f"{self._MNEMONIC}\n")
781 assert r.exit_code == 0, f"recover #{i} failed:\n{r.output}"
782 fingerprints.append(json.loads(r.output)["fingerprint"])
783
784 assert len(set(fingerprints)) == 1, (
785 f"Fingerprint varied across 10 recoveries: {set(fingerprints)}"
786 )
787
788 def test_10_recoveries_different_hubs(
789 self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch
790 ) -> None:
791 """Recover for 10 different hubs from the same mnemonic — all succeed."""
792 self._patch(monkeypatch, tmp_path)
793
794 fingerprints: list[str] = []
795 for i in range(10):
796 hub = f"http://localhost:{19930 + i}"
797 r = runner.invoke(
798 cli,
799 ["auth", "recover", "--hub", hub, "--json"],
800 input=f"{self._MNEMONIC}\n",
801 )
802 assert r.exit_code == 0, f"recover for {hub} failed:\n{r.output}"
803 fingerprints.append(json.loads(r.output)["fingerprint"])
804
805 # All 10 should produce the same fingerprint (same mnemonic, human key)
806 assert len(set(fingerprints)) == 1, (
807 "Same mnemonic should give same fingerprint regardless of hub hostname"
808 )
809
810
811 # ── Performance: recover completes within SLA ─────────────────────────────────
812
813
814 class TestRecoverPerformance:
815 _MNEMONIC = (
816 "abandon abandon abandon abandon abandon abandon abandon abandon "
817 "abandon abandon abandon about"
818 )
819
820 def test_recover_under_3_seconds(
821 self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch
822 ) -> None:
823 """Full recover (PBKDF2 + SLIP-0010 + PEM write) must complete within 3 s."""
824 import muse.core.keypair as kp_mod
825 import muse.core.identity as id_mod
826 import pathlib as _pl
827
828 fake_home = tmp_path / "home"
829 fake_home.mkdir(parents=True, exist_ok=True)
830 monkeypatch.setattr(_pl.Path, "home", staticmethod(lambda: fake_home))
831 monkeypatch.setattr(kp_mod, "_KEYS_DIR", fake_home / ".muse" / "keys")
832 monkeypatch.setattr(id_mod, "_IDENTITY_DIR", fake_home / ".muse")
833 monkeypatch.setattr(id_mod, "_IDENTITY_FILE", fake_home / ".muse" / "identity.toml")
834 monkeypatch.setenv("MUSE_KEYCHAIN_BACKEND", "disabled")
835
836 start = time.monotonic()
837 result = runner.invoke(
838 cli,
839 ["auth", "recover", "--hub", "http://localhost:19940", "--json"],
840 input=f"{self._MNEMONIC}\n",
841 )
842 elapsed = time.monotonic() - start
843
844 assert result.exit_code == 0, result.output
845 assert elapsed < 3.0, f"recover took {elapsed:.2f}s — exceeds 3 s SLA"
846
847
848 # ── Performance: register latency (stubbed network) ───────────────────────────
849
850
851 class TestRegisterPerformance:
852 """register with a mocked hub must complete within 500 ms."""
853
854 def test_register_under_500ms(
855 self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch
856 ) -> None:
857 import muse.core.keypair as kp_mod
858 import muse.core.identity as id_mod
859 import pathlib as _pl
860 import unittest.mock
861 import urllib.request
862
863 fake_home = tmp_path / "home"
864 fake_home.mkdir(parents=True, exist_ok=True)
865 monkeypatch.setattr(_pl.Path, "home", staticmethod(lambda: fake_home))
866 monkeypatch.setattr(kp_mod, "_KEYS_DIR", fake_home / ".muse" / "keys")
867 monkeypatch.setattr(id_mod, "_IDENTITY_DIR", fake_home / ".muse")
868 monkeypatch.setattr(id_mod, "_IDENTITY_FILE", fake_home / ".muse" / "identity.toml")
869
870 # Pre-set identity entry and patch keychain so register can sign
871 import muse.core.keychain as _kc
872 _MNEMONIC = "abandon " * 11 + "about"
873 monkeypatch.setattr(_kc, "is_available", lambda: True)
874 monkeypatch.setattr(_kc, "load", lambda: _MNEMONIC)
875 id_mod.save_identity("http://localhost:19950", {
876 "type": "human",
877 "handle": "perf-user",
878 "algorithm": "ed25519",
879 "hd_path": "m/1075233755'/0'/0'/0'/0'/0'",
880 "fingerprint": "a" * 64,
881 })
882
883 # Mock challenge-response round-trip
884 def _fake_urlopen(req: "urllib.request.Request | str", *args: str, **kwargs: str) -> "_Resp1":
885 url = req.full_url if hasattr(req, "full_url") else str(req)
886 if "challenge" in url:
887 body = json.dumps({
888 "challenge_token": "deadbeef" * 8, # 64-char hex nonce
889 "is_new_key": True,
890 "algorithm": "ed25519",
891 }).encode()
892 else:
893 body = json.dumps({
894 "token": "test-auth-token",
895 "handle": "perf-user",
896 "identity_id": "id-123",
897 "is_new_identity": True,
898 "auth_method": "ed25519",
899 }).encode()
900
901 class _Resp1:
902 def __init__(self) -> None:
903 self.status = 200
904 def read(self, n: int = -1) -> bytes:
905 return body
906 def __enter__(self) -> "_Resp1": return self
907 def __exit__(self, *a: object) -> None: pass
908
909 return _Resp1()
910
911 monkeypatch.setattr(urllib.request, "urlopen", _fake_urlopen)
912
913 start = time.monotonic()
914 result = runner.invoke(
915 cli,
916 ["auth", "register", "--hub", "http://localhost:19950",
917 "--handle", "perf-user", "--json"],
918 )
919 elapsed = time.monotonic() - start
920
921 assert result.exit_code == 0, f"register failed:\n{result.output}"
922 assert elapsed < 0.5, f"register took {elapsed:.3f}s — exceeds 500 ms SLA"
923
924
925 # ── Stress: register repeated calls to same hub ───────────────────────────────
926
927
928 class TestRegisterStress:
929 """register is idempotent — repeated calls with --force succeed."""
930
931 def test_5_register_calls_same_hub(
932 self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch
933 ) -> None:
934 import muse.core.keypair as kp_mod
935 import muse.core.identity as id_mod
936 import pathlib as _pl
937 import urllib.request
938
939 fake_home = tmp_path / "home"
940 fake_home.mkdir(parents=True, exist_ok=True)
941 monkeypatch.setattr(_pl.Path, "home", staticmethod(lambda: fake_home))
942 monkeypatch.setattr(kp_mod, "_KEYS_DIR", fake_home / ".muse" / "keys")
943 monkeypatch.setattr(id_mod, "_IDENTITY_DIR", fake_home / ".muse")
944 monkeypatch.setattr(id_mod, "_IDENTITY_FILE", fake_home / ".muse" / "identity.toml")
945
946 # Pre-set identity entry and patch keychain so register can sign
947 import muse.core.keychain as _kc
948 _MNEMONIC = "abandon " * 11 + "about"
949 monkeypatch.setattr(_kc, "is_available", lambda: True)
950 monkeypatch.setattr(_kc, "load", lambda: _MNEMONIC)
951 id_mod.save_identity("http://localhost:19960", {
952 "type": "human",
953 "handle": "stress-user",
954 "algorithm": "ed25519",
955 "hd_path": "m/1075233755'/0'/0'/0'/0'/0'",
956 "fingerprint": "a" * 64,
957 })
958
959 call_count = [0]
960
961 def _fake_urlopen(req: "urllib.request.Request | str", *args: str, **kwargs: str) -> "_Resp2":
962 call_count[0] += 1
963 url = req.full_url if hasattr(req, "full_url") else str(req)
964 if "challenge" in url:
965 body = json.dumps({
966 "challenge_token": "cafebabe" * 8, # 64-char hex nonce
967 "is_new_key": False,
968 "algorithm": "ed25519",
969 }).encode()
970 else:
971 body = json.dumps({
972 "token": "auth-token",
973 "handle": "stress-user",
974 "identity_id": "id-stress",
975 "is_new_identity": False,
976 "auth_method": "ed25519",
977 }).encode()
978
979 class _Resp2:
980 status = 200
981 def read(self, n: int = -1) -> bytes:
982 return body
983 def __enter__(self) -> "_Resp2": return self
984 def __exit__(self, *a: object) -> None: pass
985
986 return _Resp2()
987
988 monkeypatch.setattr(urllib.request, "urlopen", _fake_urlopen)
989
990 handles_seen: list[str] = []
991 for i in range(5):
992 r = runner.invoke(
993 cli,
994 ["auth", "register", "--hub", "http://localhost:19960",
995 "--handle", "stress-user", "--json"],
996 )
997 assert r.exit_code == 0, f"register #{i} failed:\n{r.output}"
998 data = _parse_json(r)
999 handles_seen.append(data.get("handle", ""))
1000
1001 assert all(h == "stress-user" for h in handles_seen), (
1002 f"handle inconsistent across 5 registrations: {handles_seen}"
1003 )
1004
1005
1006 # ── TDD: key_path removed from IdentityEntry and _dump_identity (P3) ─────────
1007
1008
1009 class TestKeyPathPurged:
1010 """P3: key_path must be gone from IdentityEntry TypedDict and _dump_identity.
1011
1012 Before fix: key_path: str is in IdentityEntry; _dump_identity serialises it.
1013 After fix: key_path is absent from the TypedDict; _dump_identity never emits it.
1014 """
1015
1016 def test_P3_1_key_path_not_in_identity_entry_typeddict(self) -> None:
1017 """key_path must not appear in IdentityEntry's annotations."""
1018 from muse.core.identity import IdentityEntry
1019 annotations = IdentityEntry.__annotations__
1020 assert "key_path" not in annotations, (
1021 "key_path still in IdentityEntry TypedDict — Phase 3 not complete"
1022 )
1023
1024 def test_P3_2_dump_identity_never_emits_key_path(self) -> None:
1025 """_dump_identity must not write key_path to TOML even when entry has it."""
1026 from muse.core.identity import _dump_identity
1027 entry = {
1028 "type": "human",
1029 "handle": "alice",
1030 "key_path": "/home/alice/.muse/keys/musehub_ai.pem", # must be stripped
1031 "algorithm": "ed25519",
1032 "fingerprint": "abc123",
1033 "hd_path": "m/1075233755'/0'/0'/0'/0'/0'",
1034 }
1035 toml_text = _dump_identity({"musehub.ai": entry})
1036 assert "key_path" not in toml_text, (
1037 f"_dump_identity still emits key_path:\n{toml_text}"
1038 )
File History 1 commit