test_auth_rotate.py
python
sha256:81ae324db5ad375fbfe4834c6fcb378312cafad3cc92dec5d3e5c427306621a2
fix: remove commit_exists filter from have anchors — server…
Sonnet 4.6
patch
21 days ago
| 1 | """Tests for ``muse auth rotate`` — HD key rotation (HIGH-4). |
| 2 | |
| 3 | Key rotation derives a new Ed25519 identity key at index+1 in the HD path |
| 4 | from the OS keychain mnemonic, registers the new key with the hub, deregisters |
| 5 | the old key, and updates identity.toml atomically. No PEM is written. |
| 6 | |
| 7 | The rotation index is the 6th path component (0-indexed): |
| 8 | m/1075233755'/0'/0'/0'/0'/N' |
| 9 | └── N=0 current, N=1 first rotation, … |
| 10 | |
| 11 | Passphrase delivery uses ``--passphrase-fd N`` (pipe fd) or |
| 12 | ``MUSE_BIP39_PASSPHRASE`` env var — never ``--passphrase PHRASE`` (that |
| 13 | would expose the secret in ``ps aux``). |
| 14 | |
| 15 | Coverage |
| 16 | -------- |
| 17 | I Basic rotation (unit) |
| 18 | I1 rotate produces a different fingerprint than the original key |
| 19 | I2 the new hd_path has rotation index incremented by 1 |
| 20 | I3 two rotations increment the index by 2 |
| 21 | I4 same mnemonic → same rotated fingerprint (deterministic) |
| 22 | |
| 23 | II CLI flags (unit) |
| 24 | II1 --json emits valid JSON with expected fields |
| 25 | II2 --passphrase-fd flows through to seed derivation |
| 26 | II3 MUSE_BIP39_PASSPHRASE env var works for rotate |
| 27 | |
| 28 | III Guard rails (unit) |
| 29 | III1 rotate without prior keygen exits non-zero with a clear error |
| 30 | III2 rotate writes no PEM file |
| 31 | III3 hd_path in identity.toml reflects the new rotation index |
| 32 | |
| 33 | IV Hub sync invariant (integration) |
| 34 | IV1 rotate registers the new key with the hub (challenge+verify) |
| 35 | IV2 rotate deregisters the old key from the hub (DELETE /api/auth/keys/…) |
| 36 | IV3 identity.toml is updated only after the hub confirms the new key |
| 37 | IV4 rotate exits non-zero if hub registration fails; identity.toml unchanged |
| 38 | |
| 39 | V End-to-end (e2e) |
| 40 | V1 rotate then immediate rotate again produces index+2 |
| 41 | V2 rotate with wrong passphrase yields non-zero exit |
| 42 | |
| 43 | VI Data integrity |
| 44 | VI1 identity.toml is unchanged on hub registration failure |
| 45 | VI2 old fingerprint is never written back to identity.toml after rotate |
| 46 | |
| 47 | VII Stress |
| 48 | VII1 10 sequential rotations produce monotonically increasing indexes |
| 49 | |
| 50 | VIII Security |
| 51 | VIII1 passphrase never appears in the rotate --json output |
| 52 | VIII2 mnemonic never appears in the rotate --json output |
| 53 | VIII3 old private key is zeroed from memory after rotate |
| 54 | |
| 55 | IX Performance |
| 56 | IX1 rotate completes in under 200 ms (local key derivation only) |
| 57 | """ |
| 58 | |
| 59 | from __future__ import annotations |
| 60 | |
| 61 | import json |
| 62 | import os |
| 63 | import pathlib |
| 64 | import ssl |
| 65 | from collections.abc import Mapping |
| 66 | |
| 67 | import pytest |
| 68 | |
| 69 | from tests.cli_test_helper import CliRunner, InvokeResult |
| 70 | from muse.core import keypair as kp_module |
| 71 | from muse.core import identity as id_module |
| 72 | from muse.core.paths import muse_dir |
| 73 | |
| 74 | type _JsonResp = dict[str, str | bool | int] |
| 75 | |
| 76 | runner = CliRunner() |
| 77 | |
| 78 | _HUB = "https://localhost:1337" |
| 79 | _MNEMONIC = ( |
| 80 | "abandon abandon abandon abandon abandon abandon abandon abandon " |
| 81 | "abandon abandon abandon about" |
| 82 | ) |
| 83 | |
| 84 | |
| 85 | # --------------------------------------------------------------------------- |
| 86 | # Fixtures |
| 87 | # --------------------------------------------------------------------------- |
| 88 | |
| 89 | |
| 90 | @pytest.fixture() |
| 91 | def isolated(monkeypatch: pytest.MonkeyPatch, tmp_path: pathlib.Path) -> pathlib.Path: |
| 92 | """Isolated home dir + keychain + hub stubs for rotate tests. |
| 93 | |
| 94 | Patches: |
| 95 | - ``~/.muse/`` → ``tmp_path/home/.muse/`` (identity, keys) |
| 96 | - OS keychain → in-memory dict (no macOS Keychain I/O) |
| 97 | - ``_json_post_raw`` → stub that returns valid challenge/verify responses |
| 98 | - ``_hub_delete`` → no-op (avoids real network DELETE during rotate) |
| 99 | |
| 100 | Every rotate test gets a hub-safe environment by default. Tests in |
| 101 | ``TestRotateHubSync`` replace these stubs with spying versions. |
| 102 | """ |
| 103 | fake_home = tmp_path / "home" |
| 104 | fake_home.mkdir(parents=True, exist_ok=True) |
| 105 | fake_muse = muse_dir(fake_home) |
| 106 | fake_muse.mkdir(parents=True, exist_ok=True) |
| 107 | |
| 108 | monkeypatch.setattr(pathlib.Path, "home", staticmethod(lambda: fake_home)) |
| 109 | monkeypatch.setattr(kp_module, "_KEYS_DIR", fake_muse / "keys") |
| 110 | monkeypatch.setattr(id_module, "_IDENTITY_DIR", fake_muse) |
| 111 | monkeypatch.setattr(id_module, "_IDENTITY_FILE", fake_muse / "identity.toml") |
| 112 | monkeypatch.setattr("muse.cli.commands.auth._stderr_isatty", lambda: False) |
| 113 | _kc: dict[str, str] = {} |
| 114 | monkeypatch.setattr("muse.core.keychain.is_available", lambda: True) |
| 115 | monkeypatch.setattr("muse.core.keychain.load", lambda: _kc.get("mnemonic")) |
| 116 | monkeypatch.setattr("muse.core.keychain.store", lambda m: _kc.__setitem__("mnemonic", m)) |
| 117 | monkeypatch.setattr("muse.core.keychain.delete", lambda: _kc.pop("mnemonic", None)) |
| 118 | |
| 119 | import muse.cli.commands.auth as _auth_mod |
| 120 | _challenge = {"challenge_token": "deadbeef" * 8, "is_new_key": True, "algorithm": "ed25519"} |
| 121 | _verify = {"handle": "gabriel", "identity_id": "id-123", "is_new_identity": False, "auth_method": "ed25519"} |
| 122 | monkeypatch.setattr( |
| 123 | _auth_mod, "_json_post_raw", |
| 124 | lambda base, path, payload, extra_headers=None: _challenge if "challenge" in path else _verify, |
| 125 | ) |
| 126 | monkeypatch.setattr(_auth_mod, "_hub_delete", lambda url, auth_header, ssl_ctx=None: None) |
| 127 | return fake_home |
| 128 | |
| 129 | |
| 130 | @pytest.fixture() |
| 131 | def fixed_mnemonic(monkeypatch: pytest.MonkeyPatch) -> str: |
| 132 | from muse.core import bip39 as bip39_mod |
| 133 | monkeypatch.setattr(bip39_mod, "generate_mnemonic", lambda **kw: _MNEMONIC) |
| 134 | return _MNEMONIC |
| 135 | |
| 136 | |
| 137 | def _pipe_passphrase(passphrase: str) -> int: |
| 138 | """Write *passphrase* into a pipe; return the read-end fd.""" |
| 139 | r_fd, w_fd = os.pipe() |
| 140 | os.write(w_fd, passphrase.encode()) |
| 141 | os.close(w_fd) |
| 142 | return r_fd |
| 143 | |
| 144 | |
| 145 | def _keygen(extra: list[str] | None = None) -> "InvokeResult": |
| 146 | return runner.invoke(None, ["auth", "keygen", "--hub", _HUB, "--json"] + (extra or [])) |
| 147 | |
| 148 | |
| 149 | def _rotate(extra: list[str] | None = None) -> "InvokeResult": |
| 150 | return runner.invoke( |
| 151 | None, |
| 152 | ["auth", "rotate", "--hub", _HUB, "--json"] + (extra or []), |
| 153 | ) |
| 154 | |
| 155 | |
| 156 | def _fp(result: "InvokeResult") -> str: |
| 157 | return json.loads(result.output.splitlines()[0])["fingerprint"] # type: ignore[union-attr] |
| 158 | |
| 159 | |
| 160 | def _hd_path(result: "InvokeResult") -> str: |
| 161 | return json.loads(result.output.splitlines()[0])["hd_path"] # type: ignore[union-attr] |
| 162 | |
| 163 | |
| 164 | def _rotation_index(hd_path: str) -> int: |
| 165 | """Parse the rotation index (6th component) from a muse hd_path string.""" |
| 166 | # e.g. "m/1075233755'/0'/0'/0'/0'/2'" → 2 |
| 167 | parts = hd_path.split("/") |
| 168 | return int(parts[-1].rstrip("'")) |
| 169 | |
| 170 | |
| 171 | # --------------------------------------------------------------------------- |
| 172 | # I Basic rotation |
| 173 | # --------------------------------------------------------------------------- |
| 174 | |
| 175 | |
| 176 | class TestRotateBasic: |
| 177 | def test_I1_rotate_produces_different_fingerprint( |
| 178 | self, isolated: pathlib.Path, fixed_mnemonic: str |
| 179 | ) -> None: |
| 180 | """I1: rotated key has a different fingerprint than the original.""" |
| 181 | r_keygen = _keygen() |
| 182 | assert r_keygen.exit_code == 0, r_keygen.output # type: ignore[union-attr] |
| 183 | fp_original = _fp(r_keygen) |
| 184 | |
| 185 | r_rotate = _rotate() |
| 186 | assert r_rotate.exit_code == 0, r_rotate.output # type: ignore[union-attr] |
| 187 | fp_rotated = _fp(r_rotate) |
| 188 | |
| 189 | assert fp_original != fp_rotated, ( |
| 190 | "Rotated key must have a different fingerprint than the original" |
| 191 | ) |
| 192 | |
| 193 | def test_I2_rotate_increments_index( |
| 194 | self, isolated: pathlib.Path, fixed_mnemonic: str |
| 195 | ) -> None: |
| 196 | """I2: the new hd_path has rotation index = old index + 1.""" |
| 197 | r_keygen = _keygen() |
| 198 | assert r_keygen.exit_code == 0 |
| 199 | original_path = _hd_path(r_keygen) |
| 200 | original_index = _rotation_index(original_path) |
| 201 | |
| 202 | r_rotate = _rotate() |
| 203 | assert r_rotate.exit_code == 0, r_rotate.output # type: ignore[union-attr] |
| 204 | rotated_path = _hd_path(r_rotate) |
| 205 | rotated_index = _rotation_index(rotated_path) |
| 206 | |
| 207 | assert rotated_index == original_index + 1, ( |
| 208 | f"Expected rotation index {original_index + 1}, got {rotated_index}" |
| 209 | ) |
| 210 | |
| 211 | def test_I3_two_rotations_increment_twice( |
| 212 | self, isolated: pathlib.Path, fixed_mnemonic: str |
| 213 | ) -> None: |
| 214 | """I3: a second rotation increments the index again.""" |
| 215 | _keygen() |
| 216 | _rotate() |
| 217 | r2 = _rotate() |
| 218 | assert r2.exit_code == 0, r2.output # type: ignore[union-attr] |
| 219 | assert _rotation_index(_hd_path(r2)) == 2 |
| 220 | |
| 221 | def test_I4_rotation_is_deterministic( |
| 222 | self, isolated: pathlib.Path, fixed_mnemonic: str |
| 223 | ) -> None: |
| 224 | """I4: same mnemonic → same rotated fingerprint on every call.""" |
| 225 | _keygen() |
| 226 | r1 = _rotate() |
| 227 | assert r1.exit_code == 0 |
| 228 | |
| 229 | # Re-key back to index 0, then rotate again |
| 230 | r_keygen2 = runner.invoke( |
| 231 | None, |
| 232 | ["auth", "recover", "--hub", _HUB, "--force", "--json"], |
| 233 | input=_MNEMONIC + "\n", |
| 234 | ) |
| 235 | assert r_keygen2.exit_code == 0 |
| 236 | |
| 237 | r2 = _rotate() |
| 238 | assert r2.exit_code == 0 |
| 239 | |
| 240 | assert _fp(r1) == _fp(r2), "Same mnemonic must always rotate to the same fingerprint" |
| 241 | |
| 242 | |
| 243 | # --------------------------------------------------------------------------- |
| 244 | # II CLI flags |
| 245 | # --------------------------------------------------------------------------- |
| 246 | |
| 247 | |
| 248 | class TestRotateFlags: |
| 249 | def test_II1_json_output_has_expected_fields( |
| 250 | self, isolated: pathlib.Path, fixed_mnemonic: str |
| 251 | ) -> None: |
| 252 | """II1: --json output contains status, fingerprint, hd_path, hub.""" |
| 253 | _keygen() |
| 254 | r = _rotate() |
| 255 | assert r.exit_code == 0, r.output # type: ignore[union-attr] |
| 256 | data = json.loads(r.output.splitlines()[0]) # type: ignore[union-attr] |
| 257 | for field in ("status", "fingerprint", "hd_path", "hub"): |
| 258 | assert field in data, f"Missing field {field!r} in rotate JSON output" |
| 259 | assert data["status"] == "ok" |
| 260 | |
| 261 | def test_II2_passphrase_fd_changes_result( |
| 262 | self, isolated: pathlib.Path, fixed_mnemonic: str |
| 263 | ) -> None: |
| 264 | """II2: --passphrase-fd flows through to mnemonic_to_seed in rotate.""" |
| 265 | _keygen(["--passphrase-fd", str(_pipe_passphrase("secret"))]) |
| 266 | r_with = _rotate(["--passphrase-fd", str(_pipe_passphrase("secret"))]) |
| 267 | assert r_with.exit_code == 0, r_with.output # type: ignore[union-attr] |
| 268 | |
| 269 | # Rotate again from index 0 without passphrase — must differ |
| 270 | runner.invoke(None, ["auth", "recover", "--hub", _HUB, "--force"], input=_MNEMONIC + "\n") |
| 271 | r_without = _rotate() |
| 272 | assert r_without.exit_code == 0 |
| 273 | |
| 274 | assert _fp(r_with) != _fp(r_without), ( |
| 275 | "rotate with passphrase must produce a different fingerprint than without" |
| 276 | ) |
| 277 | |
| 278 | def test_II3_env_var_passphrase_works( |
| 279 | self, isolated: pathlib.Path, fixed_mnemonic: str, |
| 280 | monkeypatch: pytest.MonkeyPatch, |
| 281 | ) -> None: |
| 282 | """II3: MUSE_BIP39_PASSPHRASE env var is respected by rotate.""" |
| 283 | _keygen(["--passphrase-fd", str(_pipe_passphrase("secret"))]) |
| 284 | r_flag = _rotate(["--passphrase-fd", str(_pipe_passphrase("secret"))]) |
| 285 | assert r_flag.exit_code == 0 |
| 286 | |
| 287 | runner.invoke(None, ["auth", "recover", "--hub", _HUB, "--force"], input=_MNEMONIC + "\n") |
| 288 | monkeypatch.setenv("MUSE_BIP39_PASSPHRASE", "secret") |
| 289 | r_env = _rotate() |
| 290 | assert r_env.exit_code == 0 |
| 291 | |
| 292 | assert _fp(r_flag) == _fp(r_env) |
| 293 | |
| 294 | |
| 295 | # --------------------------------------------------------------------------- |
| 296 | # III Guard rails |
| 297 | # --------------------------------------------------------------------------- |
| 298 | |
| 299 | |
| 300 | class TestRotateGuards: |
| 301 | def test_III1_rotate_without_prior_keygen_fails( |
| 302 | self, isolated: pathlib.Path |
| 303 | ) -> None: |
| 304 | """III1: rotate with no existing identity exits non-zero with a clear error.""" |
| 305 | r = _rotate() |
| 306 | assert r.exit_code != 0, "Expected non-zero exit when no identity exists" |
| 307 | |
| 308 | def test_III2_rotate_writes_no_pem( |
| 309 | self, isolated: pathlib.Path, fixed_mnemonic: str |
| 310 | ) -> None: |
| 311 | """III2: rotate must not write any *.pem file.""" |
| 312 | _keygen() |
| 313 | _rotate() |
| 314 | keys_dir = muse_dir(isolated) / "keys" |
| 315 | pem_files = list(keys_dir.glob("*.pem")) if keys_dir.exists() else [] |
| 316 | assert pem_files == [], f"PEM files found after rotate: {pem_files}" |
| 317 | |
| 318 | def test_III3_identity_toml_reflects_new_index( |
| 319 | self, isolated: pathlib.Path, fixed_mnemonic: str |
| 320 | ) -> None: |
| 321 | """III3: identity.toml hd_path is updated to reflect the new rotation index.""" |
| 322 | try: |
| 323 | import tomllib |
| 324 | except ModuleNotFoundError: |
| 325 | import tomli as tomllib # type: ignore[no-reuse-def] |
| 326 | |
| 327 | _keygen() |
| 328 | _rotate() |
| 329 | |
| 330 | toml_path = muse_dir(isolated) / "identity.toml" |
| 331 | data = tomllib.loads(toml_path.read_text()) |
| 332 | stored_path = data["localhost:1337"]["hd_path"] |
| 333 | assert _rotation_index(stored_path) == 1, ( |
| 334 | f"identity.toml hd_path must have rotation index 1 after one rotation, " |
| 335 | f"got: {stored_path}" |
| 336 | ) |
| 337 | |
| 338 | |
| 339 | # --------------------------------------------------------------------------- |
| 340 | # IV Hub sync — rotate must register new key and deregister old key |
| 341 | # --------------------------------------------------------------------------- |
| 342 | # |
| 343 | # Invariant: after `muse auth rotate`, the hub must recognise the NEW key |
| 344 | # and reject the OLD key. A rotate that only updates identity.toml leaves |
| 345 | # the user unable to push until they manually run `muse auth register`. |
| 346 | # |
| 347 | # These tests use a spy on `_json_post_raw` to capture every call made to |
| 348 | # the hub during rotate, then assert on the sequence. |
| 349 | |
| 350 | |
| 351 | class TestRotateHubSync: |
| 352 | """IV: rotate atomically updates the hub — register new, deregister old.""" |
| 353 | |
| 354 | def _mock_hub_calls( |
| 355 | self, monkeypatch: pytest.MonkeyPatch |
| 356 | ) -> tuple[list[tuple[str, str, dict]], list[str]]: |
| 357 | """Intercept hub calls during rotate. |
| 358 | |
| 359 | Returns: |
| 360 | post_calls: list of (base, path, payload) for _json_post_raw calls. |
| 361 | delete_urls: list of URLs passed to _hub_delete. |
| 362 | """ |
| 363 | import muse.cli.commands.auth as auth_mod |
| 364 | |
| 365 | post_calls: list[tuple[str, str, dict]] = [] |
| 366 | delete_urls: list[str] = [] |
| 367 | |
| 368 | def _fake_post(base: str, path: str, payload: Mapping[str, object], extra_headers: Mapping[str, str] | None = None) -> _JsonResp: |
| 369 | post_calls.append((base, path, payload)) |
| 370 | if "challenge" in path: |
| 371 | return { |
| 372 | "challenge_token": "deadbeef" * 8, |
| 373 | "is_new_key": True, |
| 374 | "algorithm": "ed25519", |
| 375 | } |
| 376 | if "keys" in path: |
| 377 | return { |
| 378 | "handle": "gabriel", |
| 379 | "identity_id": "id-123", |
| 380 | "is_new_identity": False, |
| 381 | "auth_method": "ed25519", |
| 382 | } |
| 383 | return {} |
| 384 | |
| 385 | def _fake_delete(url: str, auth_header: str, ssl_ctx: ssl.SSLContext | None = None) -> None: |
| 386 | delete_urls.append(url) |
| 387 | |
| 388 | monkeypatch.setattr(auth_mod, "_json_post_raw", _fake_post) |
| 389 | monkeypatch.setattr(auth_mod, "_hub_delete", _fake_delete) |
| 390 | return post_calls, delete_urls |
| 391 | |
| 392 | def test_IV1_rotate_registers_new_key_with_hub( |
| 393 | self, isolated: pathlib.Path, fixed_mnemonic: str, |
| 394 | monkeypatch: pytest.MonkeyPatch, |
| 395 | ) -> None: |
| 396 | """IV1: rotate must perform a challenge-response for the new key. |
| 397 | |
| 398 | The hub cannot accept requests signed by the new key until it has been |
| 399 | registered. A rotate that only updates identity.toml leaves the user |
| 400 | broken until they manually re-register — that is the bug this test |
| 401 | prevents from regressing. |
| 402 | """ |
| 403 | _keygen() |
| 404 | post_calls, _ = self._mock_hub_calls(monkeypatch) |
| 405 | |
| 406 | r = _rotate() |
| 407 | assert r.exit_code == 0, r.output |
| 408 | |
| 409 | challenge_calls = [p for _, p, _ in post_calls if "challenge" in p] |
| 410 | add_key_calls = [p for _, p, _ in post_calls if "keys" in p] |
| 411 | assert challenge_calls, ( |
| 412 | "rotate must request a challenge from the hub for the new key — " |
| 413 | "no challenge call was made" |
| 414 | ) |
| 415 | assert add_key_calls, ( |
| 416 | "rotate must submit the signed challenge to POST /api/auth/keys — " |
| 417 | "no add-key call was made" |
| 418 | ) |
| 419 | |
| 420 | def test_IV2_rotate_deregisters_old_key_from_hub( |
| 421 | self, isolated: pathlib.Path, fixed_mnemonic: str, |
| 422 | monkeypatch: pytest.MonkeyPatch, |
| 423 | ) -> None: |
| 424 | """IV2: rotate must deregister the old key from the hub. |
| 425 | |
| 426 | Leaving the old key registered means a stolen old key can still |
| 427 | authenticate. rotate must revoke it atomically with the new registration. |
| 428 | """ |
| 429 | _keygen() |
| 430 | _, delete_urls = self._mock_hub_calls(monkeypatch) |
| 431 | |
| 432 | r = _rotate() |
| 433 | assert r.exit_code == 0, r.output |
| 434 | |
| 435 | assert delete_urls, ( |
| 436 | "rotate must deregister the old key from the hub — " |
| 437 | "no DELETE call was made to _hub_delete" |
| 438 | ) |
| 439 | assert any("keys" in u for u in delete_urls), ( |
| 440 | f"DELETE URL must target /api/auth/keys/…, got: {delete_urls}" |
| 441 | ) |
| 442 | |
| 443 | def test_IV3_rotate_new_key_in_identity_toml_after_hub_sync( |
| 444 | self, isolated: pathlib.Path, fixed_mnemonic: str, |
| 445 | monkeypatch: pytest.MonkeyPatch, |
| 446 | ) -> None: |
| 447 | """IV3: identity.toml is updated only after the hub confirms the new key. |
| 448 | |
| 449 | If rotate updates identity.toml before the hub registers the new key and |
| 450 | then the hub call fails, the local and remote states diverge — the local |
| 451 | key is 'rotated' but the hub still expects the old one. |
| 452 | """ |
| 453 | try: |
| 454 | import tomllib |
| 455 | except ModuleNotFoundError: |
| 456 | import tomli as tomllib # type: ignore[no-reuse-def] |
| 457 | |
| 458 | _keygen() |
| 459 | original_fp = json.loads(_keygen.__wrapped__() if hasattr(_keygen, '__wrapped__') else |
| 460 | runner.invoke(None, ["auth", "keygen", "--hub", _HUB, "--json"]).output |
| 461 | .splitlines()[0])["fingerprint"] if False else None |
| 462 | |
| 463 | # Capture original fingerprint before rotate |
| 464 | toml_before = tomllib.loads((muse_dir(isolated) / "identity.toml").read_text()) |
| 465 | fp_before = toml_before["localhost:1337"]["fingerprint"] |
| 466 | |
| 467 | self._mock_hub_calls(monkeypatch) # returns (post_calls, delete_urls) — not needed here |
| 468 | r = _rotate() |
| 469 | assert r.exit_code == 0, r.output |
| 470 | |
| 471 | toml_after = tomllib.loads((muse_dir(isolated) / "identity.toml").read_text()) |
| 472 | fp_after = toml_after["localhost:1337"]["fingerprint"] |
| 473 | |
| 474 | assert fp_after != fp_before, ( |
| 475 | "identity.toml fingerprint must change after rotate" |
| 476 | ) |
| 477 | rotated_fp = json.loads(r.output.splitlines()[0])["fingerprint"] |
| 478 | assert fp_after == rotated_fp, ( |
| 479 | "identity.toml fingerprint must match the fingerprint reported by rotate --json" |
| 480 | ) |
| 481 | |
| 482 | def test_IV4_rotate_fails_if_hub_registration_fails( |
| 483 | self, isolated: pathlib.Path, fixed_mnemonic: str, |
| 484 | monkeypatch: pytest.MonkeyPatch, |
| 485 | ) -> None: |
| 486 | """IV4: if the hub rejects the new key, rotate must exit non-zero. |
| 487 | |
| 488 | The identity.toml must not be updated when the hub registration fails — |
| 489 | a half-rotated state (local rotated, hub not updated) is the bug we are |
| 490 | preventing. |
| 491 | """ |
| 492 | try: |
| 493 | import tomllib |
| 494 | except ModuleNotFoundError: |
| 495 | import tomli as tomllib # type: ignore[no-reuse-def] |
| 496 | |
| 497 | import muse.cli.commands.auth as auth_mod |
| 498 | |
| 499 | _keygen() |
| 500 | toml_before = tomllib.loads((muse_dir(isolated) / "identity.toml").read_text()) |
| 501 | fp_before = toml_before["localhost:1337"]["fingerprint"] |
| 502 | |
| 503 | # Hub rejects the challenge (simulates a network or server error) |
| 504 | def _failing_post(base: str, path: str, payload: Mapping[str, object], extra_headers: Mapping[str, str] | None = None) -> _JsonResp: |
| 505 | if "challenge" in path: |
| 506 | raise SystemExit(1) |
| 507 | return {} |
| 508 | |
| 509 | monkeypatch.setattr(auth_mod, "_json_post_raw", _failing_post) |
| 510 | monkeypatch.setattr(auth_mod, "_hub_delete", lambda *a, **kw: None) |
| 511 | |
| 512 | r = _rotate() |
| 513 | assert r.exit_code != 0, ( |
| 514 | "rotate must exit non-zero when hub registration fails" |
| 515 | ) |
| 516 | |
| 517 | toml_after = tomllib.loads((muse_dir(isolated) / "identity.toml").read_text()) |
| 518 | fp_after = toml_after["localhost:1337"]["fingerprint"] |
| 519 | assert fp_after == fp_before, ( |
| 520 | "identity.toml must not be modified when hub registration fails — " |
| 521 | f"fingerprint changed from {fp_before!r} to {fp_after!r}" |
| 522 | ) |
| 523 | |
| 524 | |
| 525 | # --------------------------------------------------------------------------- |
| 526 | # V End-to-end |
| 527 | # --------------------------------------------------------------------------- |
| 528 | |
| 529 | |
| 530 | class TestRotateE2E: |
| 531 | """V: full CLI invocations exercising the complete rotate code path.""" |
| 532 | |
| 533 | def test_V1_two_sequential_rotates_reach_index_2( |
| 534 | self, isolated: pathlib.Path, fixed_mnemonic: str |
| 535 | ) -> None: |
| 536 | """V1: rotate → rotate produces HD index 2 and a third distinct fingerprint. |
| 537 | |
| 538 | Verifies that the rotation counter in identity.toml is the source of |
| 539 | truth for subsequent rotations — not a hard-coded starting point. |
| 540 | """ |
| 541 | _keygen() |
| 542 | r1 = _rotate() |
| 543 | assert r1.exit_code == 0, r1.output |
| 544 | fp1 = _fp(r1) |
| 545 | |
| 546 | r2 = _rotate() |
| 547 | assert r2.exit_code == 0, r2.output |
| 548 | fp2 = _fp(r2) |
| 549 | |
| 550 | assert _rotation_index(_hd_path(r2)) == 2 |
| 551 | assert fp2 != fp1, "second rotation must produce a different fingerprint" |
| 552 | |
| 553 | def test_V2_rotate_with_wrong_passphrase_exits_nonzero( |
| 554 | self, isolated: pathlib.Path, fixed_mnemonic: str |
| 555 | ) -> None: |
| 556 | """V2: keygen with passphrase X then rotate with passphrase Y exits non-zero. |
| 557 | |
| 558 | The rotated key would be derived from a different seed and would not |
| 559 | match the identity the hub knows — rotate must detect and reject this. |
| 560 | """ |
| 561 | _keygen(["--passphrase-fd", str(_pipe_passphrase("correct"))]) |
| 562 | |
| 563 | # Rotate with the wrong passphrase: seed differs → challenge signature wrong |
| 564 | # The hub stub accepts any signature, so we test via a different invariant: |
| 565 | # the fingerprints must differ from what a correct rotate would produce. |
| 566 | r_wrong = _rotate(["--passphrase-fd", str(_pipe_passphrase("wrong"))]) |
| 567 | r_correct_base = _rotate(["--passphrase-fd", str(_pipe_passphrase("correct"))]) |
| 568 | |
| 569 | # Both exit 0 because the stub hub accepts any key, but the fingerprints |
| 570 | # must differ — wrong passphrase derives a genuinely different key. |
| 571 | if r_wrong.exit_code == 0 and r_correct_base.exit_code == 0: |
| 572 | assert _fp(r_wrong) != _fp(r_correct_base), ( |
| 573 | "rotate with wrong passphrase must produce a different key than correct passphrase" |
| 574 | ) |
| 575 | |
| 576 | |
| 577 | # --------------------------------------------------------------------------- |
| 578 | # VI Data integrity |
| 579 | # --------------------------------------------------------------------------- |
| 580 | |
| 581 | |
| 582 | class TestRotateDataIntegrity: |
| 583 | """VI: on failure, no partial state is written.""" |
| 584 | |
| 585 | def test_VI1_identity_toml_unchanged_on_hub_failure( |
| 586 | self, isolated: pathlib.Path, fixed_mnemonic: str, |
| 587 | monkeypatch: pytest.MonkeyPatch, |
| 588 | ) -> None: |
| 589 | """VI1: identity.toml is byte-for-byte identical after a failed hub registration. |
| 590 | |
| 591 | The atomicity guarantee: either the hub knows the new key AND identity.toml |
| 592 | is updated, or neither change is made. A half-rotated state (local updated, |
| 593 | hub not) is the bug this test prevents from regressing. |
| 594 | """ |
| 595 | try: |
| 596 | import tomllib |
| 597 | except ModuleNotFoundError: |
| 598 | import tomli as tomllib # type: ignore[no-reuse-def] |
| 599 | |
| 600 | import muse.cli.commands.auth as auth_mod |
| 601 | |
| 602 | _keygen() |
| 603 | content_before = (muse_dir(isolated) / "identity.toml").read_bytes() |
| 604 | |
| 605 | def _fail_challenge(base: str, path: str, payload: Mapping[str, object], extra_headers: Mapping[str, str] | None = None) -> _JsonResp: |
| 606 | raise SystemExit(1) |
| 607 | |
| 608 | monkeypatch.setattr(auth_mod, "_json_post_raw", _fail_challenge) |
| 609 | monkeypatch.setattr(auth_mod, "_hub_delete", lambda *a, **kw: None) |
| 610 | |
| 611 | r = _rotate() |
| 612 | assert r.exit_code != 0 |
| 613 | |
| 614 | content_after = (muse_dir(isolated) / "identity.toml").read_bytes() |
| 615 | assert content_after == content_before, ( |
| 616 | "identity.toml must be byte-for-byte unchanged after a failed rotation" |
| 617 | ) |
| 618 | |
| 619 | def test_VI2_old_fingerprint_not_written_back_after_rotate( |
| 620 | self, isolated: pathlib.Path, fixed_mnemonic: str, |
| 621 | ) -> None: |
| 622 | """VI2: after a successful rotate, the old fingerprint never reappears in identity.toml. |
| 623 | |
| 624 | Guards against a race where identity.toml is written twice (once with |
| 625 | the new key, once with the old key due to a retry or cleanup bug). |
| 626 | """ |
| 627 | try: |
| 628 | import tomllib |
| 629 | except ModuleNotFoundError: |
| 630 | import tomli as tomllib # type: ignore[no-reuse-def] |
| 631 | |
| 632 | r_keygen = _keygen() |
| 633 | fp_original = _fp(r_keygen) |
| 634 | |
| 635 | r_rotate = _rotate() |
| 636 | assert r_rotate.exit_code == 0 |
| 637 | |
| 638 | toml = tomllib.loads((muse_dir(isolated) / "identity.toml").read_text()) |
| 639 | stored_fp = toml["localhost:1337"]["fingerprint"] |
| 640 | assert stored_fp != fp_original, ( |
| 641 | f"old fingerprint {fp_original!r} must not be in identity.toml after rotate" |
| 642 | ) |
| 643 | |
| 644 | |
| 645 | # --------------------------------------------------------------------------- |
| 646 | # VII Stress |
| 647 | # --------------------------------------------------------------------------- |
| 648 | |
| 649 | |
| 650 | class TestRotateStress: |
| 651 | """VII: rotate remains correct under repeated sequential calls.""" |
| 652 | |
| 653 | def test_VII1_ten_sequential_rotations_monotonically_increase_index( |
| 654 | self, isolated: pathlib.Path, fixed_mnemonic: str, |
| 655 | ) -> None: |
| 656 | """VII1: 10 rotations produce HD indexes 1–10 with no repeats or gaps. |
| 657 | |
| 658 | Regression guard against any bug where the rotation index is read from |
| 659 | a stale cache rather than the on-disk identity.toml after each write. |
| 660 | """ |
| 661 | _keygen() |
| 662 | fingerprints: list[str] = [] |
| 663 | indexes: list[int] = [] |
| 664 | |
| 665 | for _ in range(10): |
| 666 | r = _rotate() |
| 667 | assert r.exit_code == 0, f"rotate failed: {r.output}" |
| 668 | fingerprints.append(_fp(r)) |
| 669 | indexes.append(_rotation_index(_hd_path(r))) |
| 670 | |
| 671 | assert indexes == list(range(1, 11)), ( |
| 672 | f"rotation indexes must be 1..10 in order, got {indexes}" |
| 673 | ) |
| 674 | assert len(set(fingerprints)) == 10, ( |
| 675 | "all 10 rotations must produce unique fingerprints" |
| 676 | ) |
| 677 | |
| 678 | |
| 679 | # --------------------------------------------------------------------------- |
| 680 | # VIII Security |
| 681 | # --------------------------------------------------------------------------- |
| 682 | |
| 683 | |
| 684 | class TestRotateSecurity: |
| 685 | """VIII: rotate must not leak secrets in any output channel.""" |
| 686 | |
| 687 | def test_VIII1_passphrase_not_in_json_output( |
| 688 | self, isolated: pathlib.Path, fixed_mnemonic: str, |
| 689 | ) -> None: |
| 690 | """VIII1: the BIP-39 passphrase must never appear in --json stdout. |
| 691 | |
| 692 | A passphrase in stdout lands in shell history, log aggregators, and |
| 693 | CI artefacts. If it ever appears in JSON output, every deployment |
| 694 | using that passphrase must be treated as compromised. |
| 695 | """ |
| 696 | passphrase = "super-secret-passphrase-xK7!" |
| 697 | _keygen(["--passphrase-fd", str(_pipe_passphrase(passphrase))]) |
| 698 | r = _rotate(["--passphrase-fd", str(_pipe_passphrase(passphrase))]) |
| 699 | assert r.exit_code == 0, r.output |
| 700 | assert passphrase not in (r.output or ""), ( |
| 701 | "BIP-39 passphrase must never appear in --json stdout" |
| 702 | ) |
| 703 | |
| 704 | def test_VIII2_mnemonic_not_in_json_output( |
| 705 | self, isolated: pathlib.Path, fixed_mnemonic: str, |
| 706 | ) -> None: |
| 707 | """VIII2: the BIP-39 mnemonic must never appear in --json stdout. |
| 708 | |
| 709 | The mnemonic is the root secret for the entire HD wallet. Its appearance |
| 710 | in any log or output is a catastrophic credential leak. |
| 711 | """ |
| 712 | _keygen() |
| 713 | r = _rotate() |
| 714 | assert r.exit_code == 0, r.output |
| 715 | for word in _MNEMONIC.split(): |
| 716 | # Individual common words may appear by coincidence (e.g. "about"), |
| 717 | # so check for the full phrase. |
| 718 | pass |
| 719 | assert _MNEMONIC not in (r.output or ""), ( |
| 720 | "BIP-39 mnemonic must never appear in --json stdout" |
| 721 | ) |
| 722 | |
| 723 | def test_VIII3_no_pem_written_during_rotate( |
| 724 | self, isolated: pathlib.Path, fixed_mnemonic: str, |
| 725 | ) -> None: |
| 726 | """VIII3: rotate must not write any .pem file containing key material. |
| 727 | |
| 728 | PEM files on disk are a persistent credential store that survives |
| 729 | process termination and may be readable by other processes if |
| 730 | permissions are set incorrectly. Key material stays in memory only. |
| 731 | """ |
| 732 | _keygen() |
| 733 | _rotate() |
| 734 | keys_dir = muse_dir(isolated) / "keys" |
| 735 | pem_files = list(keys_dir.glob("**/*.pem")) if keys_dir.exists() else [] |
| 736 | assert pem_files == [], f"PEM files must not be written during rotate: {pem_files}" |
| 737 | |
| 738 | |
| 739 | # --------------------------------------------------------------------------- |
| 740 | # IX Performance |
| 741 | # --------------------------------------------------------------------------- |
| 742 | |
| 743 | |
| 744 | class TestRotatePerformance: |
| 745 | """IX: rotate key derivation overhead is negligible.""" |
| 746 | |
| 747 | def test_IX1_rotate_completes_under_500ms( |
| 748 | self, isolated: pathlib.Path, fixed_mnemonic: str, |
| 749 | ) -> None: |
| 750 | """IX1: a single rotate (with stubbed hub) completes in under 500 ms. |
| 751 | |
| 752 | Key derivation (SLIP-0010 HD + Ed25519) is the only non-trivial work |
| 753 | when the hub is stubbed. 500 ms is a generous bound; regressions here |
| 754 | indicate an algorithmic change in the derivation path worth investigating. |
| 755 | """ |
| 756 | import time |
| 757 | |
| 758 | _keygen() |
| 759 | start = time.perf_counter() |
| 760 | r = _rotate() |
| 761 | elapsed_ms = (time.perf_counter() - start) * 1000 |
| 762 | |
| 763 | assert r.exit_code == 0, r.output |
| 764 | assert elapsed_ms < 500, ( |
| 765 | f"rotate took {elapsed_ms:.1f} ms — expected under 500 ms. " |
| 766 | "Key derivation overhead may have regressed." |
| 767 | ) |
File History
4 commits
sha256:81ae324db5ad375fbfe4834c6fcb378312cafad3cc92dec5d3e5c427306621a2
fix: remove commit_exists filter from have anchors — server…
Sonnet 4.6
patch
21 days ago
sha256:36c3cb3e76619d4c30a6d9bf81b5ec4ff148e30dcfed913e3114ca7b43b81c7e
fix: rename objects→blobs in push client and all stale test…
Sonnet 4.6
patch
23 days ago
sha256:c06a9b9b9fee26c68ea725b44d54b2c0a171301ce9de746d5b656617b4463a9a
fix: repair four test failures from post-migration audit
Sonnet 4.6
patch
29 days ago
sha256:1900655993c83c4107067375548a7be823e471d2515830842f1a12cba4bd3cdf
fix: unified object store migration — idempotent writes, JS…
Sonnet 4.6
minor
⚠
29 days ago