"""Supercharge tests for ``muse auth``. New features under test ----------------------- 1. ``--type human|agent`` filter on ``whoami --all`` 2. ``provisioned_by`` field in ``whoami --json`` output (trust chain visibility) 3. ``hd_path`` field in ``whoami --json`` output (HD provenance visibility) 4. ``_KeygenJson`` TypedDict completeness: ``hd_path``, ``mnemonic_word_count``, ``label`` 5. ``_ShowJson`` TypedDict completeness: ``algorithm``, ``provisioned_by``, ``provisioned_by_fingerprint`` 6. Security: invalid ``--type`` value exits non-zero 7. ``show --json`` includes ``algorithm`` and ``hd_path`` Test categories --------------- - unit : TypedDict schema completeness - integration : CLI round-trips via CliRunner with isolated identity files - security : bad --type value rejected, ANSI-safe output - data integrity: provisioned_by, hd_path survive save→load round-trip - performance : _load_all with 50 entries under 200 ms """ from __future__ import annotations from collections.abc import Mapping import json import pathlib import time import pytest from tests.cli_test_helper import CliRunner, InvokeResult from muse.core.identity import IdentityEntry, save_identity from muse.core.paths import muse_dir cli = None runner = CliRunner() HUB = "http://localhost:19111" HOSTNAME = "localhost:19111" HUB2 = "http://localhost:19222" HOSTNAME2 = "localhost:19222" # ── fixtures ────────────────────────────────────────────────────────────────── @pytest.fixture def repo(tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> pathlib.Path: """Minimal .muse/ repo with isolated identity home.""" from muse._version import __version__ dot_muse = muse_dir(tmp_path) for sub in ("refs/heads", "objects", "commits", "snapshots"): (dot_muse / sub).mkdir(parents=True, exist_ok=True) (dot_muse / "repo.json").write_text( json.dumps({"repo_id": "test-repo", "schema_version": __version__, "domain": "code"}) ) (dot_muse / "HEAD").write_text("ref: refs/heads/main\n") (dot_muse / "refs" / "heads" / "main").write_text("") (dot_muse / "config.toml").write_text("") muse_home = tmp_path / ".muse-home" muse_home.mkdir() (muse_home / "identity.toml").write_text("") import muse.core.identity as _id_mod monkeypatch.setattr(_id_mod, "_IDENTITY_FILE", muse_home / "identity.toml") monkeypatch.setattr(_id_mod, "_IDENTITY_DIR", muse_home) monkeypatch.setenv("MUSE_REPO_ROOT", str(tmp_path)) monkeypatch.chdir(tmp_path) return tmp_path def _human(handle: str = "alice") -> IdentityEntry: return { "type": "human", "handle": handle, "algorithm": "ed25519", "fingerprint": "a" * 64, } def _agent(handle: str = "bot", provisioned_by: str = "alice") -> IdentityEntry: return { "type": "agent", "handle": handle, "algorithm": "ed25519", "fingerprint": "b" * 64, "provisioned_by": provisioned_by, } def _parse_json(result: InvokeResult) -> Mapping[str, object]: """Parse first JSON structure from result.output (object or array).""" for line in result.output.splitlines(): stripped = line.strip() if stripped.startswith("{") or stripped.startswith("["): return json.loads(stripped) raise ValueError(f"No JSON in output:\n{result.output!r}") # ── Unit: _KeygenJson TypedDict completeness ────────────────────────────────── class TestKeygenJsonTypedDict: """_KeygenJson TypedDict must declare hd_path and mnemonic_word_count.""" def test_hd_path_in_keygen_typeddict(self) -> None: from muse.cli.commands.auth import _KeygenJson hints = _KeygenJson.__annotations__ assert "hd_path" in hints, ( "_KeygenJson missing hd_path — run_keygen already emits it" ) def test_mnemonic_word_count_in_keygen_typeddict(self) -> None: from muse.cli.commands.auth import _KeygenJson hints = _KeygenJson.__annotations__ assert "mnemonic_word_count" in hints, ( "_KeygenJson missing mnemonic_word_count — run_keygen already emits it" ) def test_all_required_keys_present(self) -> None: from muse.cli.commands.auth import _KeygenJson hints = _KeygenJson.__annotations__ for key in ("status", "hub", "hostname", "public_key_b64", "fingerprint", "hd_path", "mnemonic_word_count"): assert key in hints, f"_KeygenJson missing key: {key!r}" # ── Unit: _ShowJson TypedDict completeness ──────────────────────────────────── class TestShowJsonTypedDict: """_ShowJson TypedDict must expose algorithm, provisioned_by, provisioned_by_fingerprint — fields that identity.toml stores but the show command didn't surface.""" def test_algorithm_in_show_typeddict(self) -> None: from muse.cli.commands.auth import _ShowJson assert "algorithm" in _ShowJson.__annotations__, "_ShowJson missing algorithm" def test_provisioned_by_in_show_typeddict(self) -> None: from muse.cli.commands.auth import _ShowJson assert "provisioned_by" in _ShowJson.__annotations__, \ "_ShowJson missing provisioned_by (agent trust chain)" def test_provisioned_by_fingerprint_in_show_typeddict(self) -> None: from muse.cli.commands.auth import _ShowJson assert "provisioned_by_fingerprint" in _ShowJson.__annotations__, \ "_ShowJson missing provisioned_by_fingerprint" # ── Unit: _WhoamiJson has provisioned_by and hd_path ───────────────────────── class TestWhoamiJsonTypedDict: def test_provisioned_by_in_whoami_typeddict(self) -> None: from muse.cli.commands.auth import _WhoamiJson assert "provisioned_by" in _WhoamiJson.__annotations__, \ "_WhoamiJson missing provisioned_by" def test_hd_path_in_whoami_typeddict(self) -> None: from muse.cli.commands.auth import _WhoamiJson assert "hd_path" in _WhoamiJson.__annotations__, \ "_WhoamiJson missing hd_path" # ── Integration: _display_entry emits provisioned_by for agents ─────────────── class TestDisplayEntryProvisionedBy: def test_agent_provisioned_by_in_json(self, capsys: pytest.CaptureFixture[str]) -> None: from muse.cli.commands.auth import _display_entry entry: IdentityEntry = { "type": "agent", "handle": "bot", "algorithm": "ed25519", "fingerprint": "b" * 64, "provisioned_by": "alice", } _display_entry(HOSTNAME, entry, json_output=True) data = json.loads(capsys.readouterr().out) assert data.get("provisioned_by") == "alice" def test_human_no_provisioned_by(self, capsys: pytest.CaptureFixture[str]) -> None: from muse.cli.commands.auth import _display_entry entry: IdentityEntry = _human() _display_entry(HOSTNAME, entry, json_output=True) data = json.loads(capsys.readouterr().out) assert "provisioned_by" not in data or data.get("provisioned_by") == "" def test_agent_hd_path_in_json(self, capsys: pytest.CaptureFixture[str]) -> None: from muse.cli.commands.auth import _display_entry hd_path = "m/1075233755'/0'/0'/0'/0'/0'" entry: IdentityEntry = { "type": "human", "handle": "gabriel", "algorithm": "ed25519", "fingerprint": "a" * 64, "hd_path": hd_path, } _display_entry(HOSTNAME, entry, json_output=True) data = json.loads(capsys.readouterr().out) assert data.get("hd_path") == hd_path def test_no_hd_path_absent_from_json(self, capsys: pytest.CaptureFixture[str]) -> None: from muse.cli.commands.auth import _display_entry entry: IdentityEntry = _human() _display_entry(HOSTNAME, entry, json_output=True) data = json.loads(capsys.readouterr().out) assert "hd_path" not in data or data.get("hd_path") == "" # ── Integration: whoami --all --type filter ─────────────────────────────────── class TestWhoamiTypeFilter: """``muse auth whoami --all --type TYPE`` filters by identity type.""" def test_type_human_returns_only_humans(self, repo: pathlib.Path) -> None: save_identity(HUB, _human("alice")) save_identity(HUB2, _agent("bot", "alice")) result = runner.invoke(cli, ["auth", "whoami", "--all", "--type", "human", "--json"]) assert result.exit_code == 0, result.output data = json.loads(result.output)["identities"] assert isinstance(data, list) assert all(e["type"] == "human" for e in data), f"non-human in results: {data}" handles = {e["handle"] for e in data} assert "alice" in handles assert "bot" not in handles def test_type_agent_returns_only_agents(self, repo: pathlib.Path) -> None: save_identity(HUB, _human("alice")) save_identity(HUB2, _agent("bot", "alice")) result = runner.invoke(cli, ["auth", "whoami", "--all", "--type", "agent", "--json"]) assert result.exit_code == 0, result.output data = json.loads(result.output)["identities"] assert isinstance(data, list) assert all(e["type"] == "agent" for e in data) handles = {e["handle"] for e in data} assert "bot" in handles assert "alice" not in handles def test_type_filter_no_match_exits_nonzero(self, repo: pathlib.Path) -> None: """--type agent when only humans are stored → exit nonzero.""" save_identity(HUB, _human("alice")) result = runner.invoke(cli, ["auth", "whoami", "--all", "--type", "agent"]) assert result.exit_code != 0 def test_type_filter_no_match_json_exits_nonzero(self, repo: pathlib.Path) -> None: save_identity(HUB, _human("alice")) result = runner.invoke(cli, ["auth", "whoami", "--all", "--type", "agent", "--json"]) assert result.exit_code != 0 def test_type_invalid_value_exits_nonzero(self, repo: pathlib.Path) -> None: """--type must accept only 'human' or 'agent'.""" save_identity(HUB, _human()) result = runner.invoke(cli, ["auth", "whoami", "--all", "--type", "superuser"]) assert result.exit_code != 0 def test_type_requires_all_flag(self, repo: pathlib.Path) -> None: """--type without --all should fail or be ignored gracefully.""" save_identity(HUB, _human()) result = runner.invoke(cli, ["auth", "whoami", "--hub", HUB, "--type", "human", "--json"]) # Either succeeds (type flag ignored for single-hub) or fails cleanly # Most important: no crash / traceback assert result.exit_code in (0, 1), f"Unexpected exit code: {result.exit_code}" def test_type_filter_counts_correctly(self, repo: pathlib.Path) -> None: """3 humans + 2 agents; --type human → 3 results.""" hubs = [f"http://localhost:{19111 + i}" for i in range(5)] for i, hub in enumerate(hubs): if i < 3: save_identity(hub, _human(f"human-{i}")) else: save_identity(hub, _agent(f"bot-{i}", "operator")) result = runner.invoke(cli, ["auth", "whoami", "--all", "--type", "human", "--json"]) assert result.exit_code == 0 data = json.loads(result.output)["identities"] assert len(data) == 3 def test_type_agent_includes_provisioned_by(self, repo: pathlib.Path) -> None: """Agent entries in --type agent output expose provisioned_by.""" save_identity(HUB, _agent("bot", "alice")) result = runner.invoke(cli, ["auth", "whoami", "--all", "--type", "agent", "--json"]) assert result.exit_code == 0 data = json.loads(result.output)["identities"] assert len(data) == 1 assert data[0].get("provisioned_by") == "alice" # ── Integration: whoami --json includes provisioned_by for agents ───────────── class TestWhoamiProvisionedBy: def test_whoami_json_agent_has_provisioned_by(self, repo: pathlib.Path) -> None: save_identity(HUB, _agent("bot", "alice")) result = runner.invoke(cli, ["auth", "whoami", "--hub", HUB, "--json"]) assert result.exit_code == 0 data = json.loads(result.output) assert data.get("provisioned_by") == "alice" def test_whoami_json_human_no_provisioned_by(self, repo: pathlib.Path) -> None: save_identity(HUB, _human()) result = runner.invoke(cli, ["auth", "whoami", "--hub", HUB, "--json"]) assert result.exit_code == 0 data = json.loads(result.output) assert "provisioned_by" not in data or not data["provisioned_by"] def test_whoami_all_json_agent_has_provisioned_by(self, repo: pathlib.Path) -> None: save_identity(HUB, _agent("bot", "alice")) result = runner.invoke(cli, ["auth", "whoami", "--all", "--json"]) assert result.exit_code == 0 entries = json.loads(result.output)["identities"] bot = next(e for e in entries if e["handle"] == "bot") assert bot.get("provisioned_by") == "alice" # ── Integration: whoami --json includes hd_path when present ────────────────── class TestWhoamiHdPath: def test_whoami_json_hd_entry_has_hd_path(self, repo: pathlib.Path) -> None: hd_path = "m/1075233755'/0'/0'/0'/0'/0'" entry: IdentityEntry = { "type": "human", "handle": "gabriel", "algorithm": "ed25519", "fingerprint": "a" * 64, "hd_path": hd_path, } save_identity(HUB, entry) result = runner.invoke(cli, ["auth", "whoami", "--hub", HUB, "--json"]) assert result.exit_code == 0 data = json.loads(result.output) assert data.get("hd_path") == hd_path def test_whoami_json_no_hd_path_when_absent(self, repo: pathlib.Path) -> None: save_identity(HUB, _human()) result = runner.invoke(cli, ["auth", "whoami", "--hub", HUB, "--json"]) assert result.exit_code == 0 data = json.loads(result.output) assert "hd_path" not in data or not data["hd_path"] # ── Integration: show --json includes algorithm ─────────────────────────────── class TestShowJsonKeyPathAlgorithm: def test_show_json_has_algorithm(self, repo: pathlib.Path) -> None: save_identity(HUB, _human()) result = runner.invoke(cli, ["auth", "show", "--hub", HUB, "--json"]) assert result.exit_code == 0 data = json.loads(result.output) assert "algorithm" in data, f"show --json missing algorithm; got: {list(data)}" def test_show_json_algorithm_value_correct(self, repo: pathlib.Path) -> None: save_identity(HUB, _human()) result = runner.invoke(cli, ["auth", "show", "--hub", HUB, "--json"]) assert result.exit_code == 0 data = json.loads(result.output) assert data["algorithm"] == "ed25519" def test_show_json_agent_has_provisioned_by(self, repo: pathlib.Path) -> None: save_identity(HUB, _agent("bot", "alice")) result = runner.invoke(cli, ["auth", "show", "--hub", HUB, "--json"]) assert result.exit_code == 0 data = json.loads(result.output) assert data.get("provisioned_by") == "alice" # ── Security: type filter input validation ──────────────────────────────────── class TestTypeFilterSecurity: def test_type_with_ansi_injection_rejected(self, repo: pathlib.Path) -> None: save_identity(HUB, _human()) result = runner.invoke( cli, ["auth", "whoami", "--all", "--type", "\x1b[31mhuman\x1b[0m"] ) assert result.exit_code != 0 def test_type_with_newline_injection_rejected(self, repo: pathlib.Path) -> None: save_identity(HUB, _human()) result = runner.invoke( cli, ["auth", "whoami", "--all", "--type", "human\nmalicious"] ) assert result.exit_code != 0 def test_type_with_semicolon_rejected(self, repo: pathlib.Path) -> None: save_identity(HUB, _human()) result = runner.invoke( cli, ["auth", "whoami", "--all", "--type", "human;rm -rf /"] ) assert result.exit_code != 0 # ── Performance: _load_all with 50 entries under 200 ms ────────────────────── class TestLoadAllPerformance: def test_50_entries_under_200ms(self, tmp_path: pathlib.Path) -> None: from muse.core.identity import _load_all, _dump_identity entries: dict[str, IdentityEntry] = {} for i in range(50): hostname = f"localhost:{19500 + i}" entries[hostname] = { "type": "human", "handle": f"user-{i:02d}", "algorithm": "ed25519", "fingerprint": "a" * 64, "hd_path": "m/1075233755'/0'/0'/0'/0'/0'", } p = tmp_path / "identity.toml" p.write_text(_dump_identity(entries)) start = time.monotonic() loaded = _load_all(p) elapsed = time.monotonic() - start assert len(loaded) == 50 assert elapsed < 0.2, f"_load_all with 50 entries took {elapsed:.3f}s" # ── Stress: show with 50+ identities ───────────────────────────────────────── class TestShowStress: """show must handle large identity files without corruption.""" def test_show_with_50_identities_returns_correct_entry( self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch ) -> None: """show --hub TARGET picks the right entry from a file with 50 entries.""" import muse.core.identity as _id_mod from muse.core.identity import _dump_identity identity_file = tmp_path / "identity.toml" monkeypatch.setattr(_id_mod, "_IDENTITY_FILE", identity_file) monkeypatch.setattr(_id_mod, "_IDENTITY_DIR", tmp_path) monkeypatch.setenv("MUSE_REPO_ROOT", str(tmp_path)) monkeypatch.chdir(tmp_path) # Create minimal .muse structure from muse._version import __version__ dot_muse = muse_dir(tmp_path) for sub in ("refs/heads", "objects", "commits", "snapshots"): (dot_muse / sub).mkdir(parents=True, exist_ok=True) (dot_muse / "repo.json").write_text( json.dumps({"repo_id": "test-repo", "schema_version": __version__, "domain": "code"}) ) (dot_muse / "HEAD").write_text("ref: refs/heads/main\n") (dot_muse / "refs" / "heads" / "main").write_text("") (dot_muse / "config.toml").write_text("") entries: dict[str, IdentityEntry] = {} target_hub = "http://localhost:20050" for i in range(50): hub = f"http://localhost:{20000 + i}" entries[f"localhost:{20000 + i}"] = { "type": "human", "handle": f"user-{i:02d}", "algorithm": "ed25519", "fingerprint": "a" * 64, } # Override one specific entry as the target entries["localhost:20050"] = { "type": "human", "handle": "target-user", "algorithm": "ed25519", "fingerprint": "f" * 64, } identity_file.write_text(_dump_identity(entries)) result = runner.invoke(cli, ["auth", "show", "--hub", target_hub, "--json"]) assert result.exit_code == 0, result.output data = json.loads(result.output) assert data["handle"] == "target-user" assert data["fingerprint"] == "f" * 64 def test_show_50_repeated_calls_consistent( self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch ) -> None: """50 consecutive show calls return identical results.""" import muse.core.identity as _id_mod from muse.core.identity import _dump_identity identity_file = tmp_path / "identity.toml" monkeypatch.setattr(_id_mod, "_IDENTITY_FILE", identity_file) monkeypatch.setattr(_id_mod, "_IDENTITY_DIR", tmp_path) monkeypatch.setenv("MUSE_REPO_ROOT", str(tmp_path)) monkeypatch.chdir(tmp_path) from muse._version import __version__ dot_muse = muse_dir(tmp_path) for sub in ("refs/heads", "objects", "commits", "snapshots"): (dot_muse / sub).mkdir(parents=True, exist_ok=True) (dot_muse / "repo.json").write_text( json.dumps({"repo_id": "test-repo", "schema_version": __version__, "domain": "code"}) ) (dot_muse / "HEAD").write_text("ref: refs/heads/main\n") (dot_muse / "refs" / "heads" / "main").write_text("") (dot_muse / "config.toml").write_text("") entry: IdentityEntry = { "type": "human", "handle": "stable-user", "algorithm": "ed25519", "fingerprint": "b" * 64, } identity_file.write_text(_dump_identity({"localhost:20099": entry})) results = set() for _ in range(50): r = runner.invoke(cli, ["auth", "show", "--hub", "http://localhost:20099", "--json"]) assert r.exit_code == 0 d = json.loads(r.output) d.pop("duration_ms", None) d.pop("timestamp", None) results.add(json.dumps(d, sort_keys=True)) assert len(results) == 1, "show returned different output across 50 calls" # ── Stress: logout clear_all_identities with many hubs ─────────────────────── class TestLogoutStress: """logout --all must atomically clear arbitrarily many entries.""" def test_logout_all_50_hubs( self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch ) -> None: """logout --all removes all 50 entries in one shot.""" import muse.core.identity as _id_mod from muse.core.identity import _dump_identity identity_file = tmp_path / "identity.toml" monkeypatch.setattr(_id_mod, "_IDENTITY_FILE", identity_file) monkeypatch.setattr(_id_mod, "_IDENTITY_DIR", tmp_path) monkeypatch.setenv("MUSE_REPO_ROOT", str(tmp_path)) monkeypatch.chdir(tmp_path) from muse._version import __version__ dot_muse = muse_dir(tmp_path) for sub in ("refs/heads", "objects", "commits", "snapshots"): (dot_muse / sub).mkdir(parents=True, exist_ok=True) (dot_muse / "repo.json").write_text( json.dumps({"repo_id": "test-repo", "schema_version": __version__, "domain": "code"}) ) (dot_muse / "HEAD").write_text("ref: refs/heads/main\n") (dot_muse / "refs" / "heads" / "main").write_text("") (dot_muse / "config.toml").write_text("") entries: dict[str, IdentityEntry] = {} for i in range(50): entries[f"localhost:{21000 + i}"] = { "type": "human", "handle": f"user-{i:02d}", "algorithm": "ed25519", "fingerprint": "a" * 64, } identity_file.write_text(_dump_identity(entries)) result = runner.invoke(cli, ["auth", "logout", "--all", "--json"]) assert result.exit_code == 0, result.output data = json.loads(result.output) assert data["status"] == "ok" assert data["count"] == 50 assert len(data["hubs"]) == 50 # File should be empty now remaining = identity_file.read_text().strip() assert remaining == "", f"identity.toml not cleared: {remaining!r}" # ── Performance: logout --all with 50 hubs under 100 ms ────────────────────── class TestLogoutPerformance: def test_logout_all_50_hubs_under_100ms( self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch ) -> None: import muse.core.identity as _id_mod from muse.core.identity import _dump_identity, clear_all_identities identity_file = tmp_path / "identity.toml" monkeypatch.setattr(_id_mod, "_IDENTITY_FILE", identity_file) monkeypatch.setattr(_id_mod, "_IDENTITY_DIR", tmp_path) entries: dict[str, IdentityEntry] = {} for i in range(50): entries[f"localhost:{22000 + i}"] = { "type": "human", "handle": f"user-{i:02d}", "algorithm": "ed25519", "fingerprint": "a" * 64, } identity_file.write_text(_dump_identity(entries)) start = time.monotonic() removed = clear_all_identities() elapsed = time.monotonic() - start assert len(removed) == 50 assert elapsed < 0.1, f"clear_all_identities(50) took {elapsed:.3f}s" # ── Data integrity: recover produces identical fingerprint from same mnemonic ─ class TestRecoverDataIntegrity: """Recovering from the same mnemonic must reproduce the same fingerprint.""" _MNEMONIC = ( "abandon abandon abandon abandon abandon abandon abandon abandon " "abandon abandon abandon about" ) def _patch( self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path ) -> None: import muse.core.keypair as kp_mod import muse.core.identity as id_mod fake_home = tmp_path / "home" fake_home.mkdir(parents=True, exist_ok=True) import pathlib as _pl monkeypatch.setattr(_pl.Path, "home", staticmethod(lambda: fake_home)) monkeypatch.setattr(kp_mod, "_KEYS_DIR", fake_home / ".muse" / "keys") monkeypatch.setattr(id_mod, "_IDENTITY_DIR", fake_home / ".muse") monkeypatch.setattr(id_mod, "_IDENTITY_FILE", fake_home / ".muse" / "identity.toml") monkeypatch.setenv("MUSE_KEYCHAIN_BACKEND", "disabled") def test_same_mnemonic_same_fingerprint( self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch ) -> None: """Recovering twice from the same mnemonic gives the same fingerprint.""" self._patch(monkeypatch, tmp_path) r1 = runner.invoke( cli, ["auth", "recover", "--hub", "http://localhost:19911", "--json"], input=f"{self._MNEMONIC}\n", ) assert r1.exit_code == 0, r1.output d1 = json.loads(r1.output) # Force-overwrite on second recover r2 = runner.invoke( cli, ["auth", "recover", "--hub", "http://localhost:19911", "--force", "--json"], input=f"{self._MNEMONIC}\n", ) assert r2.exit_code == 0, r2.output d2 = json.loads(r2.output) assert d1["fingerprint"] == d2["fingerprint"], ( f"Fingerprint changed between recoveries: {d1['fingerprint']} vs {d2['fingerprint']}" ) assert d1["public_key_b64"] == d2["public_key_b64"] def test_different_mnemonic_different_fingerprint( self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch ) -> None: """A different mnemonic produces a different fingerprint.""" self._patch(monkeypatch, tmp_path) mnemonic_b = ( "abandon abandon abandon abandon abandon abandon abandon abandon " "abandon abandon abandon zoo" # intentionally invalid — just needs to pass BIP39 ) # Use the canonical 12-word test vector for second recover mnemonic_b = ( "zoo zoo zoo zoo zoo zoo zoo zoo zoo zoo zoo wrong" ) r1 = runner.invoke( cli, ["auth", "recover", "--hub", "http://localhost:19912", "--json"], input=f"{self._MNEMONIC}\n", ) assert r1.exit_code == 0, r1.output r2 = runner.invoke( cli, ["auth", "recover", "--hub", "http://localhost:19913", "--json"], input=f"{mnemonic_b}\n", ) assert r2.exit_code == 0, r2.output d1 = json.loads(r1.output) d2 = json.loads(r2.output) assert d1["fingerprint"] != d2["fingerprint"], ( "Different mnemonics must not produce the same fingerprint" ) def test_recover_hd_path_persisted( self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch ) -> None: """After recover, identity.toml must contain hd_path.""" import muse.core.identity as id_mod self._patch(monkeypatch, tmp_path) r = runner.invoke( cli, ["auth", "recover", "--hub", "http://localhost:19914", "--json"], input=f"{self._MNEMONIC}\n", ) assert r.exit_code == 0, r.output data = json.loads(r.output) assert "hd_path" in data assert data["hd_path"].startswith("m/") # Also verify TOML on disk loaded = id_mod.load_identity("http://localhost:19914") assert loaded is not None assert loaded.get("hd_path", "").startswith("m/") def test_recover_mnemonic_not_in_toml( self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch ) -> None: """Mnemonic must not be written to identity.toml during recover.""" import muse.core.identity as id_mod self._patch(monkeypatch, tmp_path) r = runner.invoke( cli, ["auth", "recover", "--hub", "http://localhost:19915"], input=f"{self._MNEMONIC}\n", ) assert r.exit_code == 0, r.output import re raw = id_mod._IDENTITY_FILE.read_text() assert re.search(r'^\s*mnemonic\s*=', raw, re.MULTILINE) is None, ( f"mnemonic written to TOML:\n{raw}" ) assert self._MNEMONIC not in raw # ── Stress: recover same hub 10 times (--force) ─────────────────────────────── class TestRecoverStress: _MNEMONIC = ( "abandon abandon abandon abandon abandon abandon abandon abandon " "abandon abandon abandon about" ) def _patch( self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path ) -> None: import muse.core.keypair as kp_mod import muse.core.identity as id_mod fake_home = tmp_path / "home" fake_home.mkdir(parents=True, exist_ok=True) import pathlib as _pl monkeypatch.setattr(_pl.Path, "home", staticmethod(lambda: fake_home)) monkeypatch.setattr(kp_mod, "_KEYS_DIR", fake_home / ".muse" / "keys") monkeypatch.setattr(id_mod, "_IDENTITY_DIR", fake_home / ".muse") monkeypatch.setattr(id_mod, "_IDENTITY_FILE", fake_home / ".muse" / "identity.toml") monkeypatch.setenv("MUSE_KEYCHAIN_BACKEND", "disabled") def test_10_recoveries_same_fingerprint( self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch ) -> None: """10 forced recoveries from the same mnemonic must all yield the same fingerprint.""" self._patch(monkeypatch, tmp_path) fingerprints: list[str] = [] for i in range(10): flags = ["auth", "recover", "--hub", "http://localhost:19920", "--json"] if i > 0: flags.append("--force") r = runner.invoke(cli, flags, input=f"{self._MNEMONIC}\n") assert r.exit_code == 0, f"recover #{i} failed:\n{r.output}" fingerprints.append(json.loads(r.output)["fingerprint"]) assert len(set(fingerprints)) == 1, ( f"Fingerprint varied across 10 recoveries: {set(fingerprints)}" ) def test_10_recoveries_different_hubs( self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch ) -> None: """Recover for 10 different hubs from the same mnemonic — all succeed.""" self._patch(monkeypatch, tmp_path) fingerprints: list[str] = [] for i in range(10): hub = f"http://localhost:{19930 + i}" r = runner.invoke( cli, ["auth", "recover", "--hub", hub, "--json"], input=f"{self._MNEMONIC}\n", ) assert r.exit_code == 0, f"recover for {hub} failed:\n{r.output}" fingerprints.append(json.loads(r.output)["fingerprint"]) # All 10 should produce the same fingerprint (same mnemonic, human key) assert len(set(fingerprints)) == 1, ( "Same mnemonic should give same fingerprint regardless of hub hostname" ) # ── Performance: recover completes within SLA ───────────────────────────────── class TestRecoverPerformance: _MNEMONIC = ( "abandon abandon abandon abandon abandon abandon abandon abandon " "abandon abandon abandon about" ) def test_recover_under_3_seconds( self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch ) -> None: """Full recover (PBKDF2 + SLIP-0010 + PEM write) must complete within 3 s.""" import muse.core.keypair as kp_mod import muse.core.identity as id_mod import pathlib as _pl fake_home = tmp_path / "home" fake_home.mkdir(parents=True, exist_ok=True) monkeypatch.setattr(_pl.Path, "home", staticmethod(lambda: fake_home)) monkeypatch.setattr(kp_mod, "_KEYS_DIR", fake_home / ".muse" / "keys") monkeypatch.setattr(id_mod, "_IDENTITY_DIR", fake_home / ".muse") monkeypatch.setattr(id_mod, "_IDENTITY_FILE", fake_home / ".muse" / "identity.toml") monkeypatch.setenv("MUSE_KEYCHAIN_BACKEND", "disabled") start = time.monotonic() result = runner.invoke( cli, ["auth", "recover", "--hub", "http://localhost:19940", "--json"], input=f"{self._MNEMONIC}\n", ) elapsed = time.monotonic() - start assert result.exit_code == 0, result.output assert elapsed < 3.0, f"recover took {elapsed:.2f}s — exceeds 3 s SLA" # ── Performance: register latency (stubbed network) ─────────────────────────── class TestRegisterPerformance: """register with a mocked hub must complete within 500 ms.""" def test_register_under_500ms( self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch ) -> None: import muse.core.keypair as kp_mod import muse.core.identity as id_mod import pathlib as _pl import unittest.mock import urllib.request fake_home = tmp_path / "home" fake_home.mkdir(parents=True, exist_ok=True) monkeypatch.setattr(_pl.Path, "home", staticmethod(lambda: fake_home)) monkeypatch.setattr(kp_mod, "_KEYS_DIR", fake_home / ".muse" / "keys") monkeypatch.setattr(id_mod, "_IDENTITY_DIR", fake_home / ".muse") monkeypatch.setattr(id_mod, "_IDENTITY_FILE", fake_home / ".muse" / "identity.toml") # Pre-set identity entry and patch keychain so register can sign import muse.core.keychain as _kc _MNEMONIC = "abandon " * 11 + "about" monkeypatch.setattr(_kc, "is_available", lambda: True) monkeypatch.setattr(_kc, "load", lambda: _MNEMONIC) id_mod.save_identity("http://localhost:19950", { "type": "human", "handle": "perf-user", "algorithm": "ed25519", "hd_path": "m/1075233755'/0'/0'/0'/0'/0'", "fingerprint": "a" * 64, }) # Mock challenge-response round-trip def _fake_urlopen(req: "urllib.request.Request | str", *args: str, **kwargs: str) -> "_Resp1": url = req.full_url if hasattr(req, "full_url") else str(req) if "challenge" in url: body = json.dumps({ "challenge_token": "deadbeef" * 8, # 64-char hex nonce "is_new_key": True, "algorithm": "ed25519", }).encode() else: body = json.dumps({ "token": "test-auth-token", "handle": "perf-user", "identity_id": "id-123", "is_new_identity": True, "auth_method": "ed25519", }).encode() class _Resp1: def __init__(self) -> None: self.status = 200 def read(self, n: int = -1) -> bytes: return body def __enter__(self) -> "_Resp1": return self def __exit__(self, *a: object) -> None: pass return _Resp1() monkeypatch.setattr(urllib.request, "urlopen", _fake_urlopen) start = time.monotonic() result = runner.invoke( cli, ["auth", "register", "--hub", "http://localhost:19950", "--handle", "perf-user", "--json"], ) elapsed = time.monotonic() - start assert result.exit_code == 0, f"register failed:\n{result.output}" assert elapsed < 0.5, f"register took {elapsed:.3f}s — exceeds 500 ms SLA" # ── Stress: register repeated calls to same hub ─────────────────────────────── class TestRegisterStress: """register is idempotent — repeated calls with --force succeed.""" def test_5_register_calls_same_hub( self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch ) -> None: import muse.core.keypair as kp_mod import muse.core.identity as id_mod import pathlib as _pl import urllib.request fake_home = tmp_path / "home" fake_home.mkdir(parents=True, exist_ok=True) monkeypatch.setattr(_pl.Path, "home", staticmethod(lambda: fake_home)) monkeypatch.setattr(kp_mod, "_KEYS_DIR", fake_home / ".muse" / "keys") monkeypatch.setattr(id_mod, "_IDENTITY_DIR", fake_home / ".muse") monkeypatch.setattr(id_mod, "_IDENTITY_FILE", fake_home / ".muse" / "identity.toml") # Pre-set identity entry and patch keychain so register can sign import muse.core.keychain as _kc _MNEMONIC = "abandon " * 11 + "about" monkeypatch.setattr(_kc, "is_available", lambda: True) monkeypatch.setattr(_kc, "load", lambda: _MNEMONIC) id_mod.save_identity("http://localhost:19960", { "type": "human", "handle": "stress-user", "algorithm": "ed25519", "hd_path": "m/1075233755'/0'/0'/0'/0'/0'", "fingerprint": "a" * 64, }) call_count = [0] def _fake_urlopen(req: "urllib.request.Request | str", *args: str, **kwargs: str) -> "_Resp2": call_count[0] += 1 url = req.full_url if hasattr(req, "full_url") else str(req) if "challenge" in url: body = json.dumps({ "challenge_token": "cafebabe" * 8, # 64-char hex nonce "is_new_key": False, "algorithm": "ed25519", }).encode() else: body = json.dumps({ "token": "auth-token", "handle": "stress-user", "identity_id": "id-stress", "is_new_identity": False, "auth_method": "ed25519", }).encode() class _Resp2: status = 200 def read(self, n: int = -1) -> bytes: return body def __enter__(self) -> "_Resp2": return self def __exit__(self, *a: object) -> None: pass return _Resp2() monkeypatch.setattr(urllib.request, "urlopen", _fake_urlopen) handles_seen: list[str] = [] for i in range(5): r = runner.invoke( cli, ["auth", "register", "--hub", "http://localhost:19960", "--handle", "stress-user", "--json"], ) assert r.exit_code == 0, f"register #{i} failed:\n{r.output}" data = _parse_json(r) handles_seen.append(data.get("handle", "")) assert all(h == "stress-user" for h in handles_seen), ( f"handle inconsistent across 5 registrations: {handles_seen}" ) # ── TDD: key_path removed from IdentityEntry and _dump_identity (P3) ───────── class TestKeyPathPurged: """P3: key_path must be gone from IdentityEntry TypedDict and _dump_identity. Before fix: key_path: str is in IdentityEntry; _dump_identity serialises it. After fix: key_path is absent from the TypedDict; _dump_identity never emits it. """ def test_P3_1_key_path_not_in_identity_entry_typeddict(self) -> None: """key_path must not appear in IdentityEntry's annotations.""" from muse.core.identity import IdentityEntry annotations = IdentityEntry.__annotations__ assert "key_path" not in annotations, ( "key_path still in IdentityEntry TypedDict — Phase 3 not complete" ) def test_P3_2_dump_identity_never_emits_key_path(self) -> None: """_dump_identity must not write key_path to TOML even when entry has it.""" from muse.core.identity import _dump_identity entry = { "type": "human", "handle": "alice", "key_path": "/home/alice/.muse/keys/musehub_ai.pem", # must be stripped "algorithm": "ed25519", "fingerprint": "abc123", "hd_path": "m/1075233755'/0'/0'/0'/0'/0'", } toml_text = _dump_identity({"musehub.ai": entry}) assert "key_path" not in toml_text, ( f"_dump_identity still emits key_path:\n{toml_text}" )