test_auth_supercharge.py
python
sha256:d11a87833d5fad6059b7662844bf5448a8911a17cce7a51811f71ad394f248eb
bump to v0.2.0rc13
Human
patch
6 days ago
| 1 | """Supercharge tests for ``muse auth``. |
| 2 | |
| 3 | New features under test |
| 4 | ----------------------- |
| 5 | 1. ``--type human|agent`` filter on ``whoami --all`` |
| 6 | 2. ``provisioned_by`` field in ``whoami --json`` output (trust chain visibility) |
| 7 | 3. ``hd_path`` field in ``whoami --json`` output (HD provenance visibility) |
| 8 | 4. ``_KeygenJson`` TypedDict completeness: ``hd_path``, ``mnemonic_word_count``, ``label`` |
| 9 | 5. ``_ShowJson`` TypedDict completeness: ``algorithm``, |
| 10 | ``provisioned_by``, ``provisioned_by_fingerprint`` |
| 11 | 6. Security: invalid ``--type`` value exits non-zero |
| 12 | 7. ``show --json`` includes ``algorithm`` and ``hd_path`` |
| 13 | |
| 14 | Test categories |
| 15 | --------------- |
| 16 | - unit : TypedDict schema completeness |
| 17 | - integration : CLI round-trips via CliRunner with isolated identity files |
| 18 | - security : bad --type value rejected, ANSI-safe output |
| 19 | - data integrity: provisioned_by, hd_path survive save→load round-trip |
| 20 | - performance : _load_all with 50 entries under 200 ms |
| 21 | """ |
| 22 | |
| 23 | from __future__ import annotations |
| 24 | from collections.abc import Mapping |
| 25 | |
| 26 | import json |
| 27 | import pathlib |
| 28 | import time |
| 29 | |
| 30 | import pytest |
| 31 | |
| 32 | from tests.cli_test_helper import CliRunner, InvokeResult |
| 33 | from muse.core.identity import IdentityEntry, save_identity |
| 34 | from muse.core.paths import muse_dir |
| 35 | |
# Placeholder handed as the first argument to every ``runner.invoke`` call below.
# NOTE(review): presumably the project's CliRunner locates the CLI entry point
# itself and ignores this value — confirm in tests/cli_test_helper.
cli = None
# One shared runner instance used by all CLI round-trip tests in this module.
runner = CliRunner()

# Two hub URLs, each paired with its ``host:port`` string, so tests can store
# identities for more than one hub at a time.
HUB = "http://localhost:19111"
HOSTNAME = "localhost:19111"
HUB2 = "http://localhost:19222"
HOSTNAME2 = "localhost:19222"
| 43 | |
| 44 | |
| 45 | # ── fixtures ────────────────────────────────────────────────────────────────── |
| 46 | |
| 47 | |
@pytest.fixture
def repo(tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> pathlib.Path:
    """Create a bare ``.muse/`` repository in *tmp_path* with its own identity store.

    The fixture lays out the repository skeleton, creates an empty
    ``identity.toml`` under ``<tmp_path>/.muse-home``, patches
    ``muse.core.identity`` to use that file and directory, sets
    ``MUSE_REPO_ROOT`` and changes the working directory to *tmp_path*.

    Returns:
        The repository root (*tmp_path* itself).
    """
    import muse.core.identity as _id_mod
    from muse._version import __version__

    dot_muse = muse_dir(tmp_path)
    for sub in ("refs/heads", "objects", "commits", "snapshots"):
        dot_muse.joinpath(sub).mkdir(parents=True, exist_ok=True)

    # Relative path -> initial content of each file the skeleton needs.
    seed_files = {
        "repo.json": json.dumps(
            {"repo_id": "test-repo", "schema_version": __version__, "domain": "code"}
        ),
        "HEAD": "ref: refs/heads/main\n",
        "refs/heads/main": "",
        "config.toml": "",
    }
    for relative, content in seed_files.items():
        (dot_muse / relative).write_text(content)

    # Identity data lives outside the repo so tests never touch the real home.
    muse_home = tmp_path / ".muse-home"
    muse_home.mkdir()
    identity_file = muse_home / "identity.toml"
    identity_file.write_text("")

    monkeypatch.setattr(_id_mod, "_IDENTITY_FILE", identity_file)
    monkeypatch.setattr(_id_mod, "_IDENTITY_DIR", muse_home)
    monkeypatch.setenv("MUSE_REPO_ROOT", str(tmp_path))
    monkeypatch.chdir(tmp_path)
    return tmp_path
| 73 | |
| 74 | |
def _human(handle: str = "alice") -> IdentityEntry:
    """Return a minimal human identity entry whose handle is *handle*."""
    fingerprint = "a" * 64
    entry: IdentityEntry = {
        "type": "human",
        "handle": handle,
        "algorithm": "ed25519",
        "fingerprint": fingerprint,
    }
    return entry
| 82 | |
| 83 | |
def _agent(handle: str = "bot", provisioned_by: str = "alice") -> IdentityEntry:
    """Return a minimal agent identity entry for *handle*, recording who provisioned it."""
    fingerprint = "b" * 64
    entry: IdentityEntry = {
        "type": "agent",
        "handle": handle,
        "algorithm": "ed25519",
        "fingerprint": fingerprint,
        "provisioned_by": provisioned_by,
    }
    return entry
| 92 | |
| 93 | |
def _parse_json(result: InvokeResult) -> Mapping[str, object] | list[object]:
    """Parse the first JSON structure (object or array) found in ``result.output``.

    The search starts at the first line whose first non-blank character is
    ``{`` or ``[``.  Decoding continues across line breaks, so pretty-printed
    (multi-line) JSON is supported, and any text after the JSON value is
    ignored.  A candidate line that turns out not to be JSON (for example a
    ``[INFO] ...`` log line) is skipped and the search continues.

    Args:
        result: Any object exposing the captured CLI text as ``.output``.

    Returns:
        The decoded JSON object or array.

    Raises:
        ValueError: If no line of the output starts a decodable JSON value.
    """
    text = result.output
    decoder = json.JSONDecoder()
    offset = 0  # index in ``text`` of the start of the current line
    for line in text.splitlines(keepends=True):
        body = line.lstrip()
        if body.startswith(("{", "[")):
            start = offset + (len(line) - len(body))
            try:
                value, _end = decoder.raw_decode(text, start)
            except json.JSONDecodeError:
                # Looked like JSON but was not; keep scanning later lines.
                pass
            else:
                return value
        offset += len(line)
    raise ValueError(f"No JSON in output:\n{result.output!r}")
| 101 | |
| 102 | |
| 103 | # ── Unit: _KeygenJson TypedDict completeness ────────────────────────────────── |
| 104 | |
| 105 | |
class TestKeygenJsonTypedDict:
    """Schema checks: ``_KeygenJson`` must declare ``hd_path`` and ``mnemonic_word_count``."""

    def test_hd_path_in_keygen_typeddict(self) -> None:
        """``hd_path`` is one of the annotated keys."""
        from muse.cli.commands.auth import _KeygenJson

        declared = _KeygenJson.__annotations__
        assert "hd_path" in declared, (
            "_KeygenJson missing hd_path — run_keygen already emits it"
        )

    def test_mnemonic_word_count_in_keygen_typeddict(self) -> None:
        """``mnemonic_word_count`` is one of the annotated keys."""
        from muse.cli.commands.auth import _KeygenJson

        declared = _KeygenJson.__annotations__
        assert "mnemonic_word_count" in declared, (
            "_KeygenJson missing mnemonic_word_count — run_keygen already emits it"
        )

    def test_all_required_keys_present(self) -> None:
        """Every key in the required set is annotated on ``_KeygenJson``."""
        from muse.cli.commands.auth import _KeygenJson

        declared = _KeygenJson.__annotations__
        required = (
            "status",
            "hub",
            "hostname",
            "public_key_b64",
            "fingerprint",
            "hd_path",
            "mnemonic_word_count",
        )
        for key in required:
            assert key in declared, f"_KeygenJson missing key: {key!r}"
| 129 | |
| 130 | |
| 131 | # ── Unit: _ShowJson TypedDict completeness ──────────────────────────────────── |
| 132 | |
| 133 | |
class TestShowJsonTypedDict:
    """Schema checks: ``_ShowJson`` must declare ``algorithm``, ``provisioned_by``
    and ``provisioned_by_fingerprint``."""

    def test_algorithm_in_show_typeddict(self) -> None:
        """``algorithm`` is one of the annotated keys."""
        from muse.cli.commands.auth import _ShowJson

        declared = _ShowJson.__annotations__
        assert "algorithm" in declared, "_ShowJson missing algorithm"

    def test_provisioned_by_in_show_typeddict(self) -> None:
        """``provisioned_by`` is one of the annotated keys."""
        from muse.cli.commands.auth import _ShowJson

        declared = _ShowJson.__annotations__
        assert "provisioned_by" in declared, (
            "_ShowJson missing provisioned_by (agent trust chain)"
        )

    def test_provisioned_by_fingerprint_in_show_typeddict(self) -> None:
        """``provisioned_by_fingerprint`` is one of the annotated keys."""
        from muse.cli.commands.auth import _ShowJson

        declared = _ShowJson.__annotations__
        assert "provisioned_by_fingerprint" in declared, (
            "_ShowJson missing provisioned_by_fingerprint"
        )
| 152 | |
| 153 | |
| 154 | # ── Unit: _WhoamiJson has provisioned_by and hd_path ───────────────────────── |
| 155 | |
| 156 | |
class TestWhoamiJsonTypedDict:
    """Schema checks: ``_WhoamiJson`` must declare ``provisioned_by`` and ``hd_path``."""

    def test_provisioned_by_in_whoami_typeddict(self) -> None:
        """``provisioned_by`` is one of the annotated keys."""
        from muse.cli.commands.auth import _WhoamiJson

        declared = _WhoamiJson.__annotations__
        assert "provisioned_by" in declared, "_WhoamiJson missing provisioned_by"

    def test_hd_path_in_whoami_typeddict(self) -> None:
        """``hd_path`` is one of the annotated keys."""
        from muse.cli.commands.auth import _WhoamiJson

        declared = _WhoamiJson.__annotations__
        assert "hd_path" in declared, "_WhoamiJson missing hd_path"
| 167 | |
| 168 | |
| 169 | # ── Integration: _display_entry emits provisioned_by for agents ─────────────── |
| 170 | |
| 171 | |
class TestDisplayEntryProvisionedBy:
    """JSON printed by ``_display_entry`` carries ``provisioned_by`` / ``hd_path`` only when set."""

    @staticmethod
    def _emit(entry: IdentityEntry, capsys: pytest.CaptureFixture[str]) -> dict[str, object]:
        """Print *entry* as JSON via ``_display_entry`` and return the decoded stdout."""
        from muse.cli.commands.auth import _display_entry

        _display_entry(HOSTNAME, entry, json_output=True)
        captured = capsys.readouterr()
        return json.loads(captured.out)

    def test_agent_provisioned_by_in_json(self, capsys: pytest.CaptureFixture[str]) -> None:
        """An agent entry's ``provisioned_by`` value appears in the JSON."""
        agent_entry: IdentityEntry = {
            "type": "agent",
            "handle": "bot",
            "algorithm": "ed25519",
            "fingerprint": "b" * 64,
            "provisioned_by": "alice",
        }
        payload = self._emit(agent_entry, capsys)
        assert payload.get("provisioned_by") == "alice"

    def test_human_no_provisioned_by(self, capsys: pytest.CaptureFixture[str]) -> None:
        """A human entry yields no ``provisioned_by`` key, or an empty one."""
        payload = self._emit(_human(), capsys)
        assert payload.get("provisioned_by", "") == ""

    def test_agent_hd_path_in_json(self, capsys: pytest.CaptureFixture[str]) -> None:
        """An entry's ``hd_path`` value appears in the JSON.

        Despite the test name, the entry used here has type ``human``.
        """
        derivation = "m/1075233755'/0'/0'/0'/0'/0'"
        hd_entry: IdentityEntry = {
            "type": "human",
            "handle": "gabriel",
            "algorithm": "ed25519",
            "fingerprint": "a" * 64,
            "hd_path": derivation,
        }
        payload = self._emit(hd_entry, capsys)
        assert payload.get("hd_path") == derivation

    def test_no_hd_path_absent_from_json(self, capsys: pytest.CaptureFixture[str]) -> None:
        """An entry without ``hd_path`` yields no such key, or an empty one."""
        payload = self._emit(_human(), capsys)
        assert payload.get("hd_path", "") == ""
| 213 | |
| 214 | |
| 215 | # ── Integration: whoami --all --type filter ─────────────────────────────────── |
| 216 | |
| 217 | |
class TestWhoamiTypeFilter:
    """``muse auth whoami --all --type TYPE`` filters by identity type."""

    @staticmethod
    def _whoami_all(*extra: str) -> InvokeResult:
        """Invoke ``auth whoami --all`` followed by *extra* arguments."""
        return runner.invoke(cli, ["auth", "whoami", "--all", *extra])

    def test_type_human_returns_only_humans(self, repo: pathlib.Path) -> None:
        """--type human lists the human and leaves the agent out."""
        save_identity(HUB, _human("alice"))
        save_identity(HUB2, _agent("bot", "alice"))
        result = self._whoami_all("--type", "human", "--json")
        assert result.exit_code == 0, result.output
        data = json.loads(result.output)["identities"]
        assert isinstance(data, list)
        assert all(e["type"] == "human" for e in data), f"non-human in results: {data}"
        handles = {item["handle"] for item in data}
        assert "alice" in handles
        assert "bot" not in handles

    def test_type_agent_returns_only_agents(self, repo: pathlib.Path) -> None:
        """--type agent lists the agent and leaves the human out."""
        save_identity(HUB, _human("alice"))
        save_identity(HUB2, _agent("bot", "alice"))
        result = self._whoami_all("--type", "agent", "--json")
        assert result.exit_code == 0, result.output
        data = json.loads(result.output)["identities"]
        assert isinstance(data, list)
        assert all(item["type"] == "agent" for item in data)
        handles = {item["handle"] for item in data}
        assert "bot" in handles
        assert "alice" not in handles

    def test_type_filter_no_match_exits_nonzero(self, repo: pathlib.Path) -> None:
        """--type agent when only humans are stored → exit nonzero."""
        save_identity(HUB, _human("alice"))
        result = self._whoami_all("--type", "agent")
        assert result.exit_code != 0

    def test_type_filter_no_match_json_exits_nonzero(self, repo: pathlib.Path) -> None:
        """Same as the plain no-match case, but with --json requested."""
        save_identity(HUB, _human("alice"))
        result = self._whoami_all("--type", "agent", "--json")
        assert result.exit_code != 0

    def test_type_invalid_value_exits_nonzero(self, repo: pathlib.Path) -> None:
        """--type must accept only 'human' or 'agent'."""
        save_identity(HUB, _human())
        result = self._whoami_all("--type", "superuser")
        assert result.exit_code != 0

    def test_type_requires_all_flag(self, repo: pathlib.Path) -> None:
        """--type without --all should fail or be ignored gracefully."""
        save_identity(HUB, _human())
        argv = ["auth", "whoami", "--hub", HUB, "--type", "human", "--json"]
        result = runner.invoke(cli, argv)
        # Both outcomes are acceptable: the flag is ignored for a single hub
        # (exit 0) or rejected cleanly (exit 1). Anything else is unexpected.
        assert result.exit_code in (0, 1), f"Unexpected exit code: {result.exit_code}"

    def test_type_filter_counts_correctly(self, repo: pathlib.Path) -> None:
        """3 humans + 2 agents; --type human → 3 results."""
        for idx in range(5):
            hub = f"http://localhost:{19111 + idx}"
            if idx < 3:
                entry = _human(f"human-{idx}")
            else:
                entry = _agent(f"bot-{idx}", "operator")
            save_identity(hub, entry)
        result = self._whoami_all("--type", "human", "--json")
        assert result.exit_code == 0
        data = json.loads(result.output)["identities"]
        assert len(data) == 3

    def test_type_agent_includes_provisioned_by(self, repo: pathlib.Path) -> None:
        """Agent entries in --type agent output expose provisioned_by."""
        save_identity(HUB, _agent("bot", "alice"))
        result = self._whoami_all("--type", "agent", "--json")
        assert result.exit_code == 0
        data = json.loads(result.output)["identities"]
        assert len(data) == 1
        assert data[0].get("provisioned_by") == "alice"
| 291 | |
| 292 | |
| 293 | # ── Integration: whoami --json includes provisioned_by for agents ───────────── |
| 294 | |
| 295 | |
class TestWhoamiProvisionedBy:
    """``whoami --json`` reports ``provisioned_by`` for agents and not for humans."""

    def test_whoami_json_agent_has_provisioned_by(self, repo: pathlib.Path) -> None:
        """Single-hub output for an agent includes its provisioner."""
        save_identity(HUB, _agent("bot", "alice"))
        result = runner.invoke(cli, ["auth", "whoami", "--hub", HUB, "--json"])
        assert result.exit_code == 0
        payload = json.loads(result.output)
        assert payload.get("provisioned_by") == "alice"

    def test_whoami_json_human_no_provisioned_by(self, repo: pathlib.Path) -> None:
        """Single-hub output for a human has no (or an empty) provisioner."""
        save_identity(HUB, _human())
        result = runner.invoke(cli, ["auth", "whoami", "--hub", HUB, "--json"])
        assert result.exit_code == 0
        payload = json.loads(result.output)
        assert not payload.get("provisioned_by")

    def test_whoami_all_json_agent_has_provisioned_by(self, repo: pathlib.Path) -> None:
        """The agent's record in ``--all`` output includes its provisioner."""
        save_identity(HUB, _agent("bot", "alice"))
        result = runner.invoke(cli, ["auth", "whoami", "--all", "--json"])
        assert result.exit_code == 0
        identities = json.loads(result.output)["identities"]
        bot = next(item for item in identities if item["handle"] == "bot")
        assert bot.get("provisioned_by") == "alice"
| 318 | |
| 319 | |
| 320 | # ── Integration: whoami --json includes hd_path when present ────────────────── |
| 321 | |
| 322 | |
class TestWhoamiHdPath:
    """``whoami --json`` reports ``hd_path`` only for entries that store one."""

    def test_whoami_json_hd_entry_has_hd_path(self, repo: pathlib.Path) -> None:
        """A stored ``hd_path`` is echoed back unchanged."""
        derivation = "m/1075233755'/0'/0'/0'/0'/0'"
        hd_entry: IdentityEntry = {
            "type": "human",
            "handle": "gabriel",
            "algorithm": "ed25519",
            "fingerprint": "a" * 64,
            "hd_path": derivation,
        }
        save_identity(HUB, hd_entry)
        result = runner.invoke(cli, ["auth", "whoami", "--hub", HUB, "--json"])
        assert result.exit_code == 0
        payload = json.loads(result.output)
        assert payload.get("hd_path") == derivation

    def test_whoami_json_no_hd_path_when_absent(self, repo: pathlib.Path) -> None:
        """Without a stored ``hd_path`` the key is missing or empty."""
        save_identity(HUB, _human())
        result = runner.invoke(cli, ["auth", "whoami", "--hub", HUB, "--json"])
        assert result.exit_code == 0
        payload = json.loads(result.output)
        assert not payload.get("hd_path")
| 345 | |
| 346 | |
| 347 | # ── Integration: show --json includes algorithm ─────────────────────────────── |
| 348 | |
| 349 | |
class TestShowJsonKeyPathAlgorithm:
    """``show --json`` exposes ``algorithm`` and, for agents, ``provisioned_by``."""

    @staticmethod
    def _show_json() -> InvokeResult:
        """Invoke ``auth show --hub HUB --json``."""
        return runner.invoke(cli, ["auth", "show", "--hub", HUB, "--json"])

    def test_show_json_has_algorithm(self, repo: pathlib.Path) -> None:
        """The ``algorithm`` key is present in the output."""
        save_identity(HUB, _human())
        result = self._show_json()
        assert result.exit_code == 0
        data = json.loads(result.output)
        assert "algorithm" in data, f"show --json missing algorithm; got: {list(data)}"

    def test_show_json_algorithm_value_correct(self, repo: pathlib.Path) -> None:
        """The ``algorithm`` value matches what was stored."""
        save_identity(HUB, _human())
        result = self._show_json()
        assert result.exit_code == 0
        payload = json.loads(result.output)
        assert payload["algorithm"] == "ed25519"

    def test_show_json_agent_has_provisioned_by(self, repo: pathlib.Path) -> None:
        """An agent's provisioner is included in the output."""
        save_identity(HUB, _agent("bot", "alice"))
        result = self._show_json()
        assert result.exit_code == 0
        payload = json.loads(result.output)
        assert payload.get("provisioned_by") == "alice"
| 371 | |
| 372 | |
| 373 | # ── Security: type filter input validation ──────────────────────────────────── |
| 374 | |
| 375 | |
class TestTypeFilterSecurity:
    """Hostile ``--type`` values must make ``whoami --all`` exit non-zero."""

    @staticmethod
    def _exit_code_for_type(value: str) -> int:
        """Store one human identity, run ``whoami --all --type VALUE``, return the exit code."""
        save_identity(HUB, _human())
        outcome = runner.invoke(cli, ["auth", "whoami", "--all", "--type", value])
        return outcome.exit_code

    def test_type_with_ansi_injection_rejected(self, repo: pathlib.Path) -> None:
        """A value wrapped in ANSI escape sequences is rejected."""
        assert self._exit_code_for_type("\x1b[31mhuman\x1b[0m") != 0

    def test_type_with_newline_injection_rejected(self, repo: pathlib.Path) -> None:
        """A value containing a newline is rejected."""
        assert self._exit_code_for_type("human\nmalicious") != 0

    def test_type_with_semicolon_rejected(self, repo: pathlib.Path) -> None:
        """A value containing a shell-style command separator is rejected."""
        assert self._exit_code_for_type("human;rm -rf /") != 0
| 397 | |
| 398 | |
| 399 | # ── Performance: _load_all with 50 entries under 200 ms ────────────────────── |
| 400 | |
| 401 | |
class TestLoadAllPerformance:
    """Timing guard for ``_load_all`` on a 50-entry identity file."""

    def test_50_entries_under_200ms(self, tmp_path: pathlib.Path) -> None:
        """Loading 50 entries returns all of them in under 200 ms."""
        from muse.core.identity import _load_all, _dump_identity

        entries: dict[str, IdentityEntry] = {
            f"localhost:{19500 + i}": {
                "type": "human",
                "handle": f"user-{i:02d}",
                "algorithm": "ed25519",
                "fingerprint": "a" * 64,
                "hd_path": "m/1075233755'/0'/0'/0'/0'/0'",
            }
            for i in range(50)
        }

        p = tmp_path / "identity.toml"
        p.write_text(_dump_identity(entries))

        # perf_counter is the highest-resolution clock for short intervals;
        # only the load itself is timed, not the file generation above.
        start = time.perf_counter()
        loaded = _load_all(p)
        elapsed = time.perf_counter() - start

        assert len(loaded) == 50
        assert elapsed < 0.2, f"_load_all with 50 entries took {elapsed:.3f}s"
| 426 | |
| 427 | |
| 428 | # ── Stress: show with 50+ identities ───────────────────────────────────────── |
| 429 | |
| 430 | |
class TestShowStress:
    """show must handle large identity files without corruption."""

    @staticmethod
    def _isolated_repo(
        tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch
    ) -> pathlib.Path:
        """Create a minimal ``.muse`` repo in *tmp_path* and isolate the identity store.

        Patches ``muse.core.identity`` to use ``<tmp_path>/identity.toml``, sets
        ``MUSE_REPO_ROOT`` and changes the working directory to *tmp_path*.

        Returns:
            The path of the (not yet written) identity file.
        """
        import muse.core.identity as _id_mod
        from muse._version import __version__

        identity_file = tmp_path / "identity.toml"
        monkeypatch.setattr(_id_mod, "_IDENTITY_FILE", identity_file)
        monkeypatch.setattr(_id_mod, "_IDENTITY_DIR", tmp_path)
        monkeypatch.setenv("MUSE_REPO_ROOT", str(tmp_path))
        monkeypatch.chdir(tmp_path)

        # Create minimal .muse structure
        dot_muse = muse_dir(tmp_path)
        for sub in ("refs/heads", "objects", "commits", "snapshots"):
            (dot_muse / sub).mkdir(parents=True, exist_ok=True)
        (dot_muse / "repo.json").write_text(
            json.dumps({"repo_id": "test-repo", "schema_version": __version__, "domain": "code"})
        )
        (dot_muse / "HEAD").write_text("ref: refs/heads/main\n")
        (dot_muse / "refs" / "heads" / "main").write_text("")
        (dot_muse / "config.toml").write_text("")
        return identity_file

    def test_show_with_50_identities_returns_correct_entry(
        self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch
    ) -> None:
        """show --hub TARGET picks the right entry among 50 filler entries plus the target."""
        from muse.core.identity import _dump_identity

        identity_file = self._isolated_repo(tmp_path, monkeypatch)

        target_hub = "http://localhost:20050"
        entries: dict[str, IdentityEntry] = {
            f"localhost:{20000 + i}": {
                "type": "human",
                "handle": f"user-{i:02d}",
                "algorithm": "ed25519",
                "fingerprint": "a" * 64,
            }
            for i in range(50)
        }
        # The filler entries cover ports 20000-20049, so the target on port
        # 20050 is an additional, distinct entry with its own fingerprint.
        entries["localhost:20050"] = {
            "type": "human",
            "handle": "target-user",
            "algorithm": "ed25519",
            "fingerprint": "f" * 64,
        }
        identity_file.write_text(_dump_identity(entries))

        result = runner.invoke(cli, ["auth", "show", "--hub", target_hub, "--json"])
        assert result.exit_code == 0, result.output
        data = json.loads(result.output)
        assert data["handle"] == "target-user"
        assert data["fingerprint"] == "f" * 64

    def test_show_50_repeated_calls_consistent(
        self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch
    ) -> None:
        """50 consecutive show calls return identical results."""
        from muse.core.identity import _dump_identity

        identity_file = self._isolated_repo(tmp_path, monkeypatch)

        entry: IdentityEntry = {
            "type": "human",
            "handle": "stable-user",
            "algorithm": "ed25519",
            "fingerprint": "b" * 64,
        }
        identity_file.write_text(_dump_identity({"localhost:20099": entry}))

        results = set()
        for _ in range(50):
            r = runner.invoke(cli, ["auth", "show", "--hub", "http://localhost:20099", "--json"])
            assert r.exit_code == 0
            d = json.loads(r.output)
            # Drop the two keys that may legitimately differ between calls
            # before comparing the canonical serialisation.
            d.pop("duration_ms", None)
            d.pop("timestamp", None)
            results.add(json.dumps(d, sort_keys=True))

        assert len(results) == 1, "show returned different output across 50 calls"
| 526 | |
| 527 | |
| 528 | # ── Stress: logout clear_all_identities with many hubs ─────────────────────── |
| 529 | |
| 530 | |
class TestLogoutStress:
    """logout --all must atomically clear arbitrarily many entries."""

    def test_logout_all_50_hubs(
        self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch
    ) -> None:
        """logout --all removes all 50 entries in one shot."""
        import muse.core.identity as _id_mod
        from muse._version import __version__
        from muse.core.identity import _dump_identity

        # Redirect the identity store and the repo root into tmp_path.
        identity_file = tmp_path / "identity.toml"
        monkeypatch.setattr(_id_mod, "_IDENTITY_FILE", identity_file)
        monkeypatch.setattr(_id_mod, "_IDENTITY_DIR", tmp_path)
        monkeypatch.setenv("MUSE_REPO_ROOT", str(tmp_path))
        monkeypatch.chdir(tmp_path)

        # Minimal repository skeleton: directories first, then seed files.
        dot_muse = muse_dir(tmp_path)
        for sub in ("refs/heads", "objects", "commits", "snapshots"):
            dot_muse.joinpath(sub).mkdir(parents=True, exist_ok=True)
        repo_meta = {"repo_id": "test-repo", "schema_version": __version__, "domain": "code"}
        seed_files = {
            "repo.json": json.dumps(repo_meta),
            "HEAD": "ref: refs/heads/main\n",
            "refs/heads/main": "",
            "config.toml": "",
        }
        for relative, content in seed_files.items():
            (dot_muse / relative).write_text(content)

        stored: dict[str, IdentityEntry] = {
            f"localhost:{21000 + i}": {
                "type": "human",
                "handle": f"user-{i:02d}",
                "algorithm": "ed25519",
                "fingerprint": "a" * 64,
            }
            for i in range(50)
        }
        identity_file.write_text(_dump_identity(stored))

        result = runner.invoke(cli, ["auth", "logout", "--all", "--json"])
        assert result.exit_code == 0, result.output
        report = json.loads(result.output)
        assert report["status"] == "ok"
        assert report["count"] == 50
        assert len(report["hubs"]) == 50

        # Nothing but whitespace may remain in the identity file.
        remaining = identity_file.read_text().strip()
        assert remaining == "", f"identity.toml not cleared: {remaining!r}"
| 578 | |
| 579 | |
| 580 | # ── Performance: logout --all with 50 hubs under 100 ms ────────────────────── |
| 581 | |
| 582 | |
class TestLogoutPerformance:
    """Timing guard for ``clear_all_identities`` on a 50-entry identity file."""

    def test_logout_all_50_hubs_under_100ms(
        self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch
    ) -> None:
        """Clearing 50 stored identities reports 50 removals in under 100 ms."""
        import muse.core.identity as _id_mod
        from muse.core.identity import _dump_identity, clear_all_identities

        identity_file = tmp_path / "identity.toml"
        monkeypatch.setattr(_id_mod, "_IDENTITY_FILE", identity_file)
        monkeypatch.setattr(_id_mod, "_IDENTITY_DIR", tmp_path)

        entries: dict[str, IdentityEntry] = {
            f"localhost:{22000 + i}": {
                "type": "human",
                "handle": f"user-{i:02d}",
                "algorithm": "ed25519",
                "fingerprint": "a" * 64,
            }
            for i in range(50)
        }
        identity_file.write_text(_dump_identity(entries))

        # perf_counter is the highest-resolution clock for short intervals;
        # only the clear call is timed, not the file generation above.
        start = time.perf_counter()
        removed = clear_all_identities()
        elapsed = time.perf_counter() - start

        assert len(removed) == 50
        assert elapsed < 0.1, f"clear_all_identities(50) took {elapsed:.3f}s"
| 610 | |
| 611 | |
| 612 | # ── Data integrity: recover produces identical fingerprint from same mnemonic ─ |
| 613 | |
| 614 | |
class TestRecoverDataIntegrity:
    """Recovering from the same mnemonic must reproduce the same fingerprint."""

    # Fixed 12-word phrase fed to ``auth recover`` on stdin by every test here.
    _MNEMONIC = (
        "abandon abandon abandon abandon abandon abandon abandon abandon "
        "abandon abandon abandon about"
    )

    def _patch(
        self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path
    ) -> None:
        """Redirect home, key and identity locations into ``<tmp_path>/home``.

        Also sets ``MUSE_KEYCHAIN_BACKEND=disabled`` for the duration of the test.
        """
        import muse.core.keypair as kp_mod
        import muse.core.identity as id_mod

        fake_home = tmp_path / "home"
        fake_home.mkdir(parents=True, exist_ok=True)
        muse_home = fake_home / ".muse"
        monkeypatch.setattr(pathlib.Path, "home", staticmethod(lambda: fake_home))
        monkeypatch.setattr(kp_mod, "_KEYS_DIR", muse_home / "keys")
        monkeypatch.setattr(id_mod, "_IDENTITY_DIR", muse_home)
        monkeypatch.setattr(id_mod, "_IDENTITY_FILE", muse_home / "identity.toml")
        monkeypatch.setenv("MUSE_KEYCHAIN_BACKEND", "disabled")

    def test_same_mnemonic_same_fingerprint(
        self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch
    ) -> None:
        """Recovering twice from the same mnemonic gives the same fingerprint."""
        self._patch(monkeypatch, tmp_path)

        r1 = runner.invoke(
            cli,
            ["auth", "recover", "--hub", "http://localhost:19911", "--json"],
            input=f"{self._MNEMONIC}\n",
        )
        assert r1.exit_code == 0, r1.output
        d1 = json.loads(r1.output)

        # Force-overwrite on second recover
        r2 = runner.invoke(
            cli,
            ["auth", "recover", "--hub", "http://localhost:19911", "--force", "--json"],
            input=f"{self._MNEMONIC}\n",
        )
        assert r2.exit_code == 0, r2.output
        d2 = json.loads(r2.output)

        assert d1["fingerprint"] == d2["fingerprint"], (
            f"Fingerprint changed between recoveries: {d1['fingerprint']} vs {d2['fingerprint']}"
        )
        assert d1["public_key_b64"] == d2["public_key_b64"]

    def test_different_mnemonic_different_fingerprint(
        self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch
    ) -> None:
        """A different mnemonic produces a different fingerprint."""
        self._patch(monkeypatch, tmp_path)

        # Second, distinct 12-word phrase; each phrase is recovered to its own hub.
        mnemonic_b = "zoo zoo zoo zoo zoo zoo zoo zoo zoo zoo zoo wrong"

        r1 = runner.invoke(
            cli,
            ["auth", "recover", "--hub", "http://localhost:19912", "--json"],
            input=f"{self._MNEMONIC}\n",
        )
        assert r1.exit_code == 0, r1.output

        r2 = runner.invoke(
            cli,
            ["auth", "recover", "--hub", "http://localhost:19913", "--json"],
            input=f"{mnemonic_b}\n",
        )
        assert r2.exit_code == 0, r2.output

        d1 = json.loads(r1.output)
        d2 = json.loads(r2.output)
        assert d1["fingerprint"] != d2["fingerprint"], (
            "Different mnemonics must not produce the same fingerprint"
        )

    def test_recover_hd_path_persisted(
        self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch
    ) -> None:
        """After recover, identity.toml must contain hd_path."""
        import muse.core.identity as id_mod
        self._patch(monkeypatch, tmp_path)

        r = runner.invoke(
            cli,
            ["auth", "recover", "--hub", "http://localhost:19914", "--json"],
            input=f"{self._MNEMONIC}\n",
        )
        assert r.exit_code == 0, r.output
        data = json.loads(r.output)
        assert "hd_path" in data
        assert data["hd_path"].startswith("m/")

        # Also verify the entry as loaded back from the identity store
        loaded = id_mod.load_identity("http://localhost:19914")
        assert loaded is not None
        assert loaded.get("hd_path", "").startswith("m/")

    def test_recover_mnemonic_not_in_toml(
        self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch
    ) -> None:
        """Mnemonic must not be written to identity.toml during recover."""
        import re

        import muse.core.identity as id_mod
        self._patch(monkeypatch, tmp_path)

        r = runner.invoke(
            cli,
            ["auth", "recover", "--hub", "http://localhost:19915"],
            input=f"{self._MNEMONIC}\n",
        )
        assert r.exit_code == 0, r.output

        raw = id_mod._IDENTITY_FILE.read_text()
        # Neither a ``mnemonic = ...`` key nor the phrase itself may appear.
        assert re.search(r'^\s*mnemonic\s*=', raw, re.MULTILINE) is None, (
            f"mnemonic written to TOML:\n{raw}"
        )
        assert self._MNEMONIC not in raw
| 743 | |
| 744 | |
| 745 | # ── Stress: recover same hub 10 times (--force) ─────────────────────────────── |
| 746 | |
| 747 | |
class TestRecoverStress:
    """Repeated ``auth recover`` runs driven by one fixed mnemonic."""

    _MNEMONIC = (
        "abandon abandon abandon abandon abandon abandon abandon abandon "
        "abandon abandon abandon about"
    )

    def _patch(
        self, monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path
    ) -> None:
        """Redirect home, key and identity locations under ``tmp_path``.

        Also sets ``MUSE_KEYCHAIN_BACKEND`` to ``disabled`` for the test.
        """
        import muse.core.keypair as kp_mod
        import muse.core.identity as id_mod

        home_dir = tmp_path / "home"
        home_dir.mkdir(parents=True, exist_ok=True)
        muse_root = home_dir / ".muse"

        monkeypatch.setattr(pathlib.Path, "home", staticmethod(lambda: home_dir))
        monkeypatch.setattr(kp_mod, "_KEYS_DIR", muse_root / "keys")
        monkeypatch.setattr(id_mod, "_IDENTITY_DIR", muse_root)
        monkeypatch.setattr(id_mod, "_IDENTITY_FILE", muse_root / "identity.toml")
        monkeypatch.setenv("MUSE_KEYCHAIN_BACKEND", "disabled")

    def test_10_recoveries_same_fingerprint(
        self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch
    ) -> None:
        """Ten recoveries for one hub must always report the same fingerprint.

        The first run is a plain recover; every later run adds ``--force``.
        """
        self._patch(monkeypatch, tmp_path)

        base_args = ["auth", "recover", "--hub", "http://localhost:19920", "--json"]
        seen: list[str] = []
        for attempt in range(10):
            # Only the very first recovery runs without --force.
            argv = [*base_args, "--force"] if attempt else list(base_args)
            outcome = runner.invoke(cli, argv, input=self._MNEMONIC + "\n")
            assert outcome.exit_code == 0, (
                f"recover #{attempt} failed:\n{outcome.output}"
            )
            seen.append(json.loads(outcome.output)["fingerprint"])

        distinct = set(seen)
        assert len(distinct) == 1, (
            f"Fingerprint varied across 10 recoveries: {distinct}"
        )

    def test_10_recoveries_different_hubs(
        self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch
    ) -> None:
        """Recover against ten distinct hubs; each succeeds with one fingerprint."""
        self._patch(monkeypatch, tmp_path)

        seen: list[str] = []
        for port in range(19930, 19940):
            hub = f"http://localhost:{port}"
            outcome = runner.invoke(
                cli,
                ["auth", "recover", "--hub", hub, "--json"],
                input=self._MNEMONIC + "\n",
            )
            assert outcome.exit_code == 0, (
                f"recover for {hub} failed:\n{outcome.output}"
            )
            seen.append(json.loads(outcome.output)["fingerprint"])

        # The mnemonic is identical for every hub, so the fingerprint must be too.
        assert len(set(seen)) == 1, (
            "Same mnemonic should give same fingerprint regardless of hub hostname"
        )
| 809 | |
| 810 | |
| 811 | # ── Performance: recover completes within SLA ───────────────────────────────── |
| 812 | |
| 813 | |
class TestRecoverPerformance:
    """Wall-clock budget for a single ``auth recover`` invocation."""

    _MNEMONIC = " ".join(["abandon"] * 11 + ["about"])

    def test_recover_under_3_seconds(
        self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch
    ) -> None:
        """One ``auth recover --json`` run must succeed in under 3 seconds.

        Home, key and identity locations are redirected under ``tmp_path``
        and ``MUSE_KEYCHAIN_BACKEND`` is set to ``disabled`` before timing.
        """
        import muse.core.keypair as kp_mod
        import muse.core.identity as id_mod

        home_dir = tmp_path / "home"
        home_dir.mkdir(parents=True, exist_ok=True)
        muse_root = home_dir / ".muse"

        monkeypatch.setattr(pathlib.Path, "home", staticmethod(lambda: home_dir))
        monkeypatch.setattr(kp_mod, "_KEYS_DIR", muse_root / "keys")
        monkeypatch.setattr(id_mod, "_IDENTITY_DIR", muse_root)
        monkeypatch.setattr(id_mod, "_IDENTITY_FILE", muse_root / "identity.toml")
        monkeypatch.setenv("MUSE_KEYCHAIN_BACKEND", "disabled")

        # Only the CLI invocation itself sits inside the timed window.
        began = time.monotonic()
        outcome = runner.invoke(
            cli,
            ["auth", "recover", "--hub", "http://localhost:19940", "--json"],
            input=self._MNEMONIC + "\n",
        )
        took = time.monotonic() - began

        assert outcome.exit_code == 0, outcome.output
        assert took < 3.0, f"recover took {took:.2f}s — exceeds 3 s SLA"
| 846 | |
| 847 | |
| 848 | # ── Performance: register latency (stubbed network) ─────────────────────────── |
| 849 | |
| 850 | |
class TestRegisterPerformance:
    """register with a mocked hub must complete within 500 ms."""

    def test_register_under_500ms(
        self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch
    ) -> None:
        """Time one ``auth register --json`` call against a stubbed hub.

        Home, key and identity locations are redirected under ``tmp_path``,
        the keychain module is patched to hand back a fixed mnemonic, and
        ``urllib.request.urlopen`` is replaced by an in-process stub. The
        stub payloads are built before the timer starts so JSON encoding is
        not part of the measured window.
        """
        import muse.core.keypair as kp_mod
        import muse.core.identity as id_mod
        import urllib.request

        fake_home = tmp_path / "home"
        fake_home.mkdir(parents=True, exist_ok=True)
        muse_root = fake_home / ".muse"
        monkeypatch.setattr(pathlib.Path, "home", staticmethod(lambda: fake_home))
        monkeypatch.setattr(kp_mod, "_KEYS_DIR", muse_root / "keys")
        monkeypatch.setattr(id_mod, "_IDENTITY_DIR", muse_root)
        monkeypatch.setattr(id_mod, "_IDENTITY_FILE", muse_root / "identity.toml")

        # Pre-set identity entry and patch keychain so register can sign.
        # The keychain import deliberately stays after the path patches above.
        import muse.core.keychain as _kc
        _MNEMONIC = "abandon " * 11 + "about"
        monkeypatch.setattr(_kc, "is_available", lambda: True)
        monkeypatch.setattr(_kc, "load", lambda: _MNEMONIC)
        id_mod.save_identity("http://localhost:19950", {
            "type": "human",
            "handle": "perf-user",
            "algorithm": "ed25519",
            "hd_path": "m/1075233755'/0'/0'/0'/0'/0'",
            "fingerprint": "a" * 64,
        })

        # Canned bodies for the two requests of the challenge-response round-trip.
        challenge_body = json.dumps({
            "challenge_token": "deadbeef" * 8,  # 64-char hex nonce
            "is_new_key": True,
            "algorithm": "ed25519",
        }).encode()
        verify_body = json.dumps({
            "token": "test-auth-token",
            "handle": "perf-user",
            "identity_id": "id-123",
            "is_new_identity": True,
            "auth_method": "ed25519",
        }).encode()

        class _Resp1:
            """Minimal response stand-in: ``status``, ``read`` and ``with`` support."""

            def __init__(self, body: bytes) -> None:
                self.status = 200
                self._body = body

            def read(self, n: int = -1) -> bytes:
                return self._body

            def __enter__(self) -> "_Resp1":
                return self

            def __exit__(self, *a: object) -> None:
                pass

        def _fake_urlopen(
            req: "urllib.request.Request | str", *args: object, **kwargs: object
        ) -> "_Resp1":
            # Accept both Request objects and plain URL strings.
            url = req.full_url if hasattr(req, "full_url") else str(req)
            return _Resp1(challenge_body if "challenge" in url else verify_body)

        monkeypatch.setattr(urllib.request, "urlopen", _fake_urlopen)

        start = time.monotonic()
        result = runner.invoke(
            cli,
            ["auth", "register", "--hub", "http://localhost:19950",
             "--handle", "perf-user", "--json"],
        )
        elapsed = time.monotonic() - start

        assert result.exit_code == 0, f"register failed:\n{result.output}"
        assert elapsed < 0.5, f"register took {elapsed:.3f}s — exceeds 500 ms SLA"
| 923 | |
| 924 | |
| 925 | # ── Stress: register repeated calls to same hub ─────────────────────────────── |
| 926 | |
| 927 | |
class TestRegisterStress:
    """Repeated ``auth register`` calls against one hub keep succeeding."""

    def test_5_register_calls_same_hub(
        self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch
    ) -> None:
        """Register five times against a stubbed hub; the handle never changes.

        Home, key and identity locations are redirected under ``tmp_path``,
        the keychain module is patched to return a fixed mnemonic, and
        ``urllib.request.urlopen`` is replaced by a counting stub.
        """
        import muse.core.keypair as kp_mod
        import muse.core.identity as id_mod
        import urllib.request

        home_dir = tmp_path / "home"
        home_dir.mkdir(parents=True, exist_ok=True)
        muse_root = home_dir / ".muse"
        monkeypatch.setattr(pathlib.Path, "home", staticmethod(lambda: home_dir))
        monkeypatch.setattr(kp_mod, "_KEYS_DIR", muse_root / "keys")
        monkeypatch.setattr(id_mod, "_IDENTITY_DIR", muse_root)
        monkeypatch.setattr(id_mod, "_IDENTITY_FILE", muse_root / "identity.toml")

        # Seed an identity entry and patch the keychain so register can sign.
        # The keychain import stays after the path patches, as in the setup order.
        import muse.core.keychain as _kc
        phrase = "abandon " * 11 + "about"
        monkeypatch.setattr(_kc, "is_available", lambda: True)
        monkeypatch.setattr(_kc, "load", lambda: phrase)

        hub_url = "http://localhost:19960"
        expected_handle = "stress-user"
        id_mod.save_identity(hub_url, {
            "type": "human",
            "handle": expected_handle,
            "algorithm": "ed25519",
            "hd_path": "m/1075233755'/0'/0'/0'/0'/0'",
            "fingerprint": "a" * 64,
        })

        class _Resp2:
            """Minimal response stand-in: ``status``, ``read`` and ``with`` support."""

            status = 200

            def __init__(self, body: bytes) -> None:
                self._body = body

            def read(self, n: int = -1) -> bytes:
                return self._body

            def __enter__(self) -> "_Resp2":
                return self

            def __exit__(self, *a: object) -> None:
                pass

        stub_calls = 0  # tallied by the stub; not asserted on

        def _fake_urlopen(
            req: "urllib.request.Request | str", *args: object, **kwargs: object
        ) -> "_Resp2":
            nonlocal stub_calls
            stub_calls += 1
            if hasattr(req, "full_url"):
                url = req.full_url
            else:
                url = str(req)
            if "challenge" in url:
                payload = {
                    "challenge_token": "cafebabe" * 8,  # 64-char hex nonce
                    "is_new_key": False,
                    "algorithm": "ed25519",
                }
            else:
                payload = {
                    "token": "auth-token",
                    "handle": "stress-user",
                    "identity_id": "id-stress",
                    "is_new_identity": False,
                    "auth_method": "ed25519",
                }
            return _Resp2(json.dumps(payload).encode())

        monkeypatch.setattr(urllib.request, "urlopen", _fake_urlopen)

        argv = ["auth", "register", "--hub", hub_url,
                "--handle", expected_handle, "--json"]
        handles: list[str] = []
        for attempt in range(5):
            outcome = runner.invoke(cli, list(argv))
            assert outcome.exit_code == 0, (
                f"register #{attempt} failed:\n{outcome.output}"
            )
            handles.append(_parse_json(outcome).get("handle", ""))

        assert handles.count(expected_handle) == len(handles), (
            f"handle inconsistent across 5 registrations: {handles}"
        )
| 1004 | |
| 1005 | |
| 1006 | # ── TDD: key_path removed from IdentityEntry and _dump_identity (P3) ───────── |
| 1007 | |
| 1008 | |
class TestKeyPathPurged:
    """P3: ``key_path`` must no longer exist in the identity schema or its dump.

    Before fix: ``key_path: str`` is declared on IdentityEntry and
    ``_dump_identity`` serialises it.
    After fix: the TypedDict has no such key and ``_dump_identity`` never
    writes it.
    """

    def test_P3_1_key_path_not_in_identity_entry_typeddict(self) -> None:
        """IdentityEntry's annotations must not declare ``key_path``."""
        from muse.core.identity import IdentityEntry

        assert "key_path" not in IdentityEntry.__annotations__, (
            "key_path still in IdentityEntry TypedDict — Phase 3 not complete"
        )

    def test_P3_2_dump_identity_never_emits_key_path(self) -> None:
        """A ``key_path`` present on an entry must be absent from the TOML text."""
        from muse.core.identity import _dump_identity

        legacy_entry = {
            "type": "human",
            "handle": "alice",
            "key_path": "/home/alice/.muse/keys/musehub_ai.pem",  # must be stripped
            "algorithm": "ed25519",
            "fingerprint": "abc123",
            "hd_path": "m/1075233755'/0'/0'/0'/0'/0'",
        }
        rendered = _dump_identity({"musehub.ai": legacy_entry})
        assert "key_path" not in rendered, (
            f"_dump_identity still emits key_path:\n{rendered}"
        )
File History
1 commit
sha256:d11a87833d5fad6059b7662844bf5448a8911a17cce7a51811f71ad394f248eb
bump to v0.2.0rc13
Human
patch
6 days ago