"""TDD — migrate --force-resign must re-sign commits that have stale signatures. After ``muse code migrate`` rewrites commit IDs, any existing signature becomes invalid (the signature covers the old commit_id, not the new one). Running ``migrate(force_resign=True)`` must detect this and re-sign every commit, returning ``commits_signed == N`` and leaving the repo in a verifiable state. Tests ----- FR1 (RED) force_resign=True returns commits_signed == N for a repo whose commits all carry stale signatures (migrated but not re-signed). FR2 (RED) After force-resign the written commits pass Ed25519 verification. FR3 (GREEN) force_resign=False (default) does NOT re-sign already-signed commits. FR4 (RED) force_resign=True is idempotent — running it twice still returns commits_signed == N on the second run (existing valid sigs replaced). FR5 (RED) A repo with a mix of unsigned commits and stale-signed commits: force_resign re-signs only the signed ones (unsigned ones are also signed, just via the ``is_unsigned`` path — commits_signed == total). """ from __future__ import annotations import datetime import json import pathlib import msgpack import pytest from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey from muse.core.migrate import migrate from muse.core.paths import commits_dir, muse_dir, ref_path, snapshots_dir from muse.core.provenance import ( encode_public_key, provenance_payload, sign_commit_ed25519, verify_commit_ed25519, ) from muse.core.types import decode_pubkey from muse.core.ids import hash_commit as compute_commit_id, hash_snapshot as compute_snapshot_id from muse.core.transport import SigningIdentity from muse.core.types import b64url_encode, blob_id, long_id, split_id _CommitDict = dict[str, str | int | None | list[str]] # raw commit msgpack dict # --------------------------------------------------------------------------- # Constants # --------------------------------------------------------------------------- _AT = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc) _AT_ISO = _AT.isoformat() # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _make_key() -> Ed25519PrivateKey: return Ed25519PrivateKey.generate() def _make_identity(handle: str = "gabriel") -> SigningIdentity: return SigningIdentity(handle=handle, private_key=_make_key()) def _init_repo(tmp_path: pathlib.Path) -> pathlib.Path: muse = muse_dir(tmp_path) for sub in ("commits/sha256", "snapshots/sha256", "objects/sha256", "refs/heads", "remotes"): (muse / sub).mkdir(parents=True) (muse / "HEAD").write_text("ref: refs/heads/main", encoding="utf-8") (muse / "repo.json").write_text( json.dumps({"repo_id": blob_id(b"test-repo"), "domain": "code"}), encoding="utf-8", ) return tmp_path def _write_snap(repo: pathlib.Path, tag: str = "a") -> str: manifest = {f"file_{tag}.py": long_id("a" * 64)} sid = compute_snapshot_id(manifest) hex_id = long_id(sid, strip=True) path = snapshots_dir(repo) / "sha256" / f"{hex_id}.msgpack" path.parent.mkdir(parents=True, exist_ok=True) path.write_bytes(msgpack.packb( {"snapshot_id": sid, "manifest": manifest, "created_at": _AT_ISO}, use_bin_type=True, )) return sid def _sign_over_id(commit_id: str, private_key: Ed25519PrivateKey, author: str = "gabriel") -> str: """Produce a valid Ed25519 signature whose payload references ``commit_id``.""" payload = provenance_payload( commit_id=commit_id, author=author, committed_at=_AT_ISO, ) return sign_commit_ed25519(payload, private_key) def _pub_str(private_key: Ed25519PrivateKey) -> str: _, pub = encode_public_key(private_key) return pub def _write_commit_raw( repo: pathlib.Path, *, commit_id: str, snapshot_id: str, message: str, parent_id: str | None = None, signature: str = "", signer_public_key: str = "", ) -> None: """Write a raw commit dict to the canonical commits/sha256/ location.""" raw = { "commit_id": commit_id, "branch": "main", "snapshot_id": snapshot_id, "message": message, "committed_at": _AT_ISO, "parent_commit_id": parent_id, "parent2_commit_id": None, "author": "gabriel", "metadata": {}, "structured_delta": None, "sem_ver_bump": "none", "breaking_changes": [], "agent_id": "", "model_id": "", "toolchain_id": "", "prompt_hash": "", "signature": signature, "signer_public_key": signer_public_key, "signer_key_id": "", "format_version": 8, "reviewed_by": [], "test_runs": 0, "labels": [], "status": "", "notes": [], "score": None, } hex_id = long_id(commit_id, strip=True) path = commits_dir(repo) / "sha256" / f"{hex_id}.msgpack" path.parent.mkdir(parents=True, exist_ok=True) path.write_bytes(msgpack.packb(raw, use_bin_type=True)) def _set_ref(repo: pathlib.Path, branch: str, commit_id: str) -> None: path = ref_path(repo, branch) path.parent.mkdir(parents=True, exist_ok=True) path.write_text(commit_id + "\n", encoding="utf-8") def _read_raw_commit(repo: pathlib.Path, commit_id: str) -> _CommitDict: hex_id = long_id(commit_id, strip=True) path = commits_dir(repo) / "sha256" / f"{hex_id}.msgpack" return msgpack.unpackb(path.read_bytes(), raw=False) def _build_stale_signed_chain( repo: pathlib.Path, signing_key: Ed25519PrivateKey, n: int = 3 ) -> list[str]: """Build a chain of N commits each signed over their own commit_id. The key twist: we sign each commit over a STALE commit_id — a fake ID that differs from the canonical ``compute_commit_id`` result. This simulates the post-migration state where the ID was rewritten but the signature was not updated. Returns list of (canonical) commit_ids written to disk. """ pub = _pub_str(signing_key) commit_ids: list[str] = [] parent_id: str | None = None for i in range(n): sid = _write_snap(repo, tag=str(i)) # Canonical ID (what the record will be stored as) canonical_id = compute_commit_id( parent_ids=[parent_id] if parent_id else [], snapshot_id=sid, message=f"commit {i}", committed_at_iso=_AT_ISO, author="gabriel", signer_public_key=pub, ) # Stale ID — blob of something different; the signature covers THIS, not canonical_id stale_id = blob_id(f"stale-{i}".encode()) stale_sig = _sign_over_id(stale_id, signing_key) _write_commit_raw( repo, commit_id=canonical_id, snapshot_id=sid, message=f"commit {i}", parent_id=parent_id, signature=stale_sig, signer_public_key=pub, ) commit_ids.append(canonical_id) parent_id = canonical_id _set_ref(repo, "main", commit_ids[-1]) return commit_ids # ══════════════════════════════════════════════════════════════════════════════ # FR1 — force_resign=True returns commits_signed == N # ══════════════════════════════════════════════════════════════════════════════ def test_fr1_force_resign_returns_commits_signed(tmp_path: pathlib.Path) -> None: """migrate(force_resign=True) must return commits_signed == N when commits carry stale signatures (signed over a different commit_id). This test is RED until the --force-resign re-signing path is fixed. """ repo = _init_repo(tmp_path) identity = _make_identity() n = 3 _build_stale_signed_chain(repo, identity.private_key, n) result = migrate(repo, signing_identity=identity, force_resign=True) assert result.commits_signed == n, ( f"Expected commits_signed={n}, got {result.commits_signed}. " "force_resign=True must re-sign every commit that carries a stale signature." ) # ══════════════════════════════════════════════════════════════════════════════ # FR2 — commits pass Ed25519 verification after force-resign # ══════════════════════════════════════════════════════════════════════════════ def test_fr2_commits_verify_after_force_resign(tmp_path: pathlib.Path) -> None: """After migrate(force_resign=True), every written commit must have a signature that verifies correctly against its stored commit_id. """ repo = _init_repo(tmp_path) identity = _make_identity() commit_ids = _build_stale_signed_chain(repo, identity.private_key, 3) migrate(repo, signing_identity=identity, force_resign=True) # After migrate, commit IDs may have changed — discover actual stored IDs. # For simplicity, verify all commits in commits/sha256/ failed: list[str] = [] for commit_path in sorted((commits_dir(repo) / "sha256").glob("**/*.msgpack")): raw = msgpack.unpackb(commit_path.read_bytes(), raw=False) sig = raw.get("signature", "") pub_key_str = raw.get("signer_public_key", "") cid = raw.get("commit_id", "") if not sig or not pub_key_str or not cid: continue payload = provenance_payload( commit_id=cid, author=raw.get("author", ""), agent_id=raw.get("agent_id", ""), model_id=raw.get("model_id", ""), toolchain_id=raw.get("toolchain_id", ""), prompt_hash=raw.get("prompt_hash", ""), committed_at=raw.get("committed_at", ""), ) try: _, pub_raw = decode_pubkey(pub_key_str) except Exception: failed.append(cid) continue if not verify_commit_ed25519(payload, sig, pub_raw): failed.append(cid) assert not failed, ( f"{len(failed)} commit(s) still have invalid signatures after force-resign: " f"{failed[:3]}" ) # ══════════════════════════════════════════════════════════════════════════════ # FR3 — force_resign=False does NOT re-sign commits that are already signed # ══════════════════════════════════════════════════════════════════════════════ def test_fr3_no_resign_without_flag(tmp_path: pathlib.Path) -> None: """Without force_resign, migrate must not re-sign commits that already have a signature (even if that signature is stale). commits_signed must be 0. """ repo = _init_repo(tmp_path) identity = _make_identity() _build_stale_signed_chain(repo, identity.private_key, 3) result = migrate(repo, signing_identity=identity, force_resign=False) assert result.commits_signed == 0, ( f"Expected commits_signed=0 without --force-resign, got {result.commits_signed}." ) # ══════════════════════════════════════════════════════════════════════════════ # FR4 — force_resign=True is idempotent (re-sign already-valid signatures) # ══════════════════════════════════════════════════════════════════════════════ def test_fr4_force_resign_idempotent(tmp_path: pathlib.Path) -> None: """Running migrate(force_resign=True) a second time on an already-resigned repo must still report commits_signed == N — the flag is unconditional. """ repo = _init_repo(tmp_path) identity = _make_identity() n = 2 _build_stale_signed_chain(repo, identity.private_key, n) # First run — fixes stale sigs r1 = migrate(repo, signing_identity=identity, force_resign=True) assert r1.commits_signed == n, f"First run: expected {n}, got {r1.commits_signed}" # Second run — sigs are now valid, but force_resign must re-sign anyway r2 = migrate(repo, signing_identity=identity, force_resign=True) assert r2.commits_signed == n, ( f"Second run: expected {n} (idempotent), got {r2.commits_signed}. " "force_resign=True must always re-sign, regardless of current sig validity." ) # ══════════════════════════════════════════════════════════════════════════════ # FR5 — force_resign=True re-signs the stale-signed commits # ══════════════════════════════════════════════════════════════════════════════ # ══════════════════════════════════════════════════════════════════════════════ # FR6 — force_resign=True with signing_identity=None raises ValueError # ══════════════════════════════════════════════════════════════════════════════ def test_fr6_force_resign_without_identity_raises(tmp_path: pathlib.Path) -> None: """migrate(force_resign=True, signing_identity=None) must raise ValueError. The root cause of the production bug: when the repo has no hub configured, get_signing_identity() returns None. migrate() then silently returns commits_signed=0 — a silent no-op that leaves all signatures invalid. The fix: migrate() must raise ValueError when force_resign=True but no signing identity is provided, so the caller gets a clear error instead of a mysterious zero count. """ repo = _init_repo(tmp_path) identity = _make_identity() _build_stale_signed_chain(repo, identity.private_key, 2) with pytest.raises(ValueError, match="force_resign"): migrate(repo, signing_identity=None, force_resign=True) def test_fr5_force_resign_covers_stale_signed_commits(tmp_path: pathlib.Path) -> None: """force_resign=True must re-sign commits that have stale signatures (not just unsigned commits). commits_signed must equal the total count. """ repo = _init_repo(tmp_path) identity = _make_identity() n = 4 _build_stale_signed_chain(repo, identity.private_key, n) result = migrate(repo, signing_identity=identity, force_resign=True) # All commits (stale-signed) must be re-signed assert result.commits_signed == n, ( f"Expected all {n} stale-signed commits to be re-signed, " f"got commits_signed={result.commits_signed}." )