"""Tests for ``muse verify-commit`` — verify Ed25519 signatures on commits.

Coverage tiers
--------------
Unit:
    _resolve_ref — HEAD→tip, HEAD→missing branch, sha256:-prefixed passthrough,
        branch name→ref file, missing branch ref→None
    _verify_one — valid sig, unsigned, missing commit, missing pubkey,
        bit-flip in signature, unknown sig algo, unknown pubkey algo,
        committed_at tamper, agent_id tamper, model_id tamper, wrong keypair,
        decode_pubkey ValueError, signed_at non-empty, error None on success,
        key_status cache hit
    _fetch_key_status — active, revoked, unknown status value, network error,
        timeout, invalid JSON

Integration:
    Text output: OK/BAD/ERR lines, (unsigned) signer, key= part present/absent
    JSON output: all schema fields, error field stripped, signed_at non-empty,
        signer matches agent_id
    --strict: unsigned exits nonzero; without --strict exits 0
    --check-key-status: unknown without hub; caches per key_id
    HEAD shorthand, branch name ref
    Batch: all valid exits 0; one invalid exits nonzero; result order preserved
    Nonexistent sha256:- ref → USER_ERROR
    --json flag accepted (shorthand alias)

Security:
    ANSI escape in ref → rejected, error to stderr, stdout empty
    Null byte in ref → rejected
    Path traversal ref → rejected
    Bare hex ref → rejected with clear message, error to stderr, stdout empty
    No traceback on bad ref or bad format

Data integrity:
    committed_at tamper → invalid through full CLI flow
    model_id tamper → invalid through full CLI flow
    Wrong keypair → invalid (public key doesn't match signing key)

Stress:
    100 signed commits all verify correctly (unit)
    Batch of 50 commits via CLI → all results emitted
    key_status_cache: N commits with same key → exactly 1 network call
"""

from __future__ import annotations

from collections.abc import Mapping
import datetime
import json
import pathlib
from typing import TYPE_CHECKING
from unittest.mock import MagicMock, patch

import pytest

from muse.core.types import blob_id, decode_sig, encode_sig, split_id

if TYPE_CHECKING:
    from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey

from muse.core.object_store import write_object
from muse.core.provenance import (
    encode_public_key,
    provenance_payload,
    sign_commit_record,
    verify_commit_ed25519,
)
from muse.core.ids import hash_commit, hash_snapshot
from muse.core.commits import (
    CommitRecord,
    commit_path,
    write_commit,
)
from muse.core.snapshots import (
    SnapshotRecord,
    write_snapshot,
)
from muse.core.types import Manifest
from muse.core.paths import heads_dir, muse_dir, ref_path
from tests.cli_test_helper import CliRunner, InvokeResult

runner = CliRunner()

_REPO_ID = "verify-commit-test"
_counter = 0


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------


def _init_repo(path: pathlib.Path) -> pathlib.Path:
    """Lay out a minimal repository skeleton under *path* and return *path*.

    Creates the store directories, a ``HEAD`` that points at
    ``refs/heads/main`` and a ``repo.json`` carrying ``_REPO_ID``.
    """
    muse = muse_dir(path)
    for d in ("commits", "snapshots", "objects", "refs/heads", "code"):
        (muse / d).mkdir(parents=True, exist_ok=True)
    (muse / "HEAD").write_text("ref: refs/heads/main", encoding="utf-8")
    (muse / "repo.json").write_text(
        json.dumps({"repo_id": _REPO_ID, "domain": "code"}), encoding="utf-8"
    )
    return path


def _env(repo: pathlib.Path) -> Mapping[str, str]:
    """Return the environment mapping that points the CLI at *repo*."""
    return {"MUSE_REPO_ROOT": str(repo)}


def _make_key() -> "Ed25519PrivateKey":
    """Generate a fresh Ed25519 private key."""
    # Imported lazily: at module level the name only exists under TYPE_CHECKING.
    from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey

    return Ed25519PrivateKey.generate()


def _commit_files(
    root: pathlib.Path,
    files: Mapping[str, bytes],
    branch: str = "main",
    message: str | None = None,
    sign: bool = False,
    private_key: "Ed25519PrivateKey | None" = None,
    agent_id: str = "test-agent",
    model_id: str = "",
) -> tuple[str, CommitRecord]:
    """Create a commit on *branch*; optionally sign it.

    Each entry of *files* is written to the object store and to the working
    tree, a snapshot is recorded, and the branch ref is advanced to the new
    commit. Signing only happens when ``sign`` is true *and* ``private_key``
    is given; otherwise the record carries empty signature fields.

    Returns ``(commit_id, CommitRecord)``.
    """
    global _counter
    _counter += 1

    manifest: Manifest = {}
    for rel_path, content in files.items():
        obj_id = blob_id(content)
        write_object(root, obj_id, content)
        manifest[rel_path] = obj_id
        abs_path = root / rel_path
        abs_path.parent.mkdir(parents=True, exist_ok=True)
        abs_path.write_bytes(content)

    snap_id = hash_snapshot(manifest)
    write_snapshot(root, SnapshotRecord(snapshot_id=snap_id, manifest=manifest))

    committed_at = datetime.datetime.now(datetime.timezone.utc)
    branch_ref = ref_path(root, branch)
    parent_id = branch_ref.read_text(encoding="utf-8").strip() if branch_ref.exists() else None
    parents = [parent_id] if parent_id else []
    msg = message or f"commit {_counter}"

    # Resolve the public key before hash_commit so signer_public_key is
    # bound into the v2 hash (matching what _verify_commit_id expects).
    sig = ""
    pub_b64 = ""
    key_id = ""
    if sign and private_key is not None:
        _, pub_b64 = encode_public_key(private_key)

    commit_id = hash_commit(
        parent_ids=parents,
        snapshot_id=snap_id,
        message=msg,
        committed_at_iso=committed_at.isoformat(),
        signer_public_key=pub_b64,
    )

    if sign and private_key is not None:
        result = sign_commit_record(
            commit_id,
            agent_id,
            private_key,
            model_id=model_id,
            committed_at=committed_at.isoformat(),
        )
        if result:
            sig, pub_b64, key_id = result

    record = CommitRecord(
        commit_id=commit_id,
        branch=branch,
        snapshot_id=snap_id,
        message=msg,
        committed_at=committed_at,
        parent_commit_id=parent_id,
        # Provenance fields are only recorded for commits requested as signed.
        agent_id=agent_id if sign else "",
        model_id=model_id if sign else "",
        signature=sig,
        signer_public_key=pub_b64,
        signer_key_id=key_id,
    )
    write_commit(root, record)
    branch_ref.write_text(commit_id, encoding="utf-8")
    return commit_id, record


def _invoke(repo: pathlib.Path, *args: str) -> InvokeResult:
    """Run ``muse verify-commit *args`` against *repo* and return the result."""
    from muse.cli.app import main as cli

    return runner.invoke(cli, ["verify-commit", *args], env=_env(repo))


def _force_write_commit(root: pathlib.Path, record: CommitRecord) -> None:
    """Overwrite a commit object unconditionally (bypasses write_commit idempotency).

    Writes to the unified object store path so read_commit can find it.
    The file content is a ``commit <payload length>`` header terminated by a
    NUL byte, followed by the record as compact JSON.
    """
    from muse.core.object_store import object_path as _object_path

    obj_file = _object_path(root, record.commit_id)
    obj_file.parent.mkdir(parents=True, exist_ok=True)
    payload = json.dumps(record.to_dict(), separators=(",", ":")).encode()
    obj_file.write_bytes(f"commit {len(payload)}\0".encode() + payload)


def _tampered_copy(record: CommitRecord, **overrides: object) -> CommitRecord:
    """Return a new CommitRecord copied from *record* with *overrides* applied.

    Only the fields listed below are carried over; any other field (for
    example ``model_id``) takes the CommitRecord default unless it is passed
    explicitly in *overrides*.
    """
    fields: dict[str, object] = {
        "commit_id": record.commit_id,
        "branch": record.branch,
        "snapshot_id": record.snapshot_id,
        "message": record.message,
        "committed_at": record.committed_at,
        "parent_commit_id": record.parent_commit_id,
        "agent_id": record.agent_id,
        "signature": record.signature,
        "signer_public_key": record.signer_public_key,
        "signer_key_id": record.signer_key_id,
    }
    fields.update(overrides)
    return CommitRecord(**fields)


# ---------------------------------------------------------------------------
# Unit — _resolve_ref
# ---------------------------------------------------------------------------


class TestResolveRef:
    """Unit tests for ``_resolve_ref`` (ref string → commit ID or None)."""

    def test_head_resolves_to_branch_tip(self, tmp_path: pathlib.Path) -> None:
        """HEAD resolves to the commit the current branch ref points at."""
        from muse.cli.commands.verify_commit import _resolve_ref

        root = _init_repo(tmp_path)
        key = _make_key()
        commit_id, _ = _commit_files(root, {"a.py": b"x = 1\n"}, sign=True, private_key=key)
        resolved = _resolve_ref(root, "HEAD")
        assert resolved == commit_id

    def test_head_returns_none_when_branch_has_no_commits(self, tmp_path: pathlib.Path) -> None:
        """HEAD resolves to None while the branch it names has no ref file."""
        from muse.cli.commands.verify_commit import _resolve_ref

        root = _init_repo(tmp_path)
        # HEAD points to main but main ref file doesn't exist yet
        resolved = _resolve_ref(root, "HEAD")
        assert resolved is None

    def test_sha256_prefixed_id_passthrough(self, tmp_path: pathlib.Path) -> None:
        """sha256:-prefixed IDs are returned as-is (no ref-file lookup)."""
        from muse.cli.commands.verify_commit import _resolve_ref

        root = _init_repo(tmp_path)
        key = _make_key()
        commit_id, _ = _commit_files(root, {"a.py": b"x = 1\n"}, sign=True, private_key=key)
        resolved = _resolve_ref(root, commit_id)
        assert resolved == commit_id

    def test_bare_hex_normalised_to_sha256_prefix(self, tmp_path: pathlib.Path) -> None:
        """A 64-char bare hex is normalised to sha256: prefix."""
        from muse.cli.commands.verify_commit import _resolve_ref

        root = _init_repo(tmp_path)
        key = _make_key()
        commit_id, _ = _commit_files(root, {"a.py": b"x = 1\n"}, sign=True, private_key=key)
        bare = split_id(commit_id)[1]
        resolved = _resolve_ref(root, bare)
        assert resolved == commit_id

    def test_branch_name_resolves_via_ref_file(self, tmp_path: pathlib.Path) -> None:
        """A branch name resolves to the commit ID stored in its ref file."""
        from muse.cli.commands.verify_commit import _resolve_ref

        root = _init_repo(tmp_path)
        key = _make_key()
        commit_id, _ = _commit_files(
            root, {"a.py": b"x = 1\n"}, sign=True, private_key=key, branch="dev"
        )
        resolved = _resolve_ref(root, "dev")
        assert resolved == commit_id

    def test_missing_branch_ref_returns_none(self, tmp_path: pathlib.Path) -> None:
        """A branch name with no ref file resolves to None."""
        from muse.cli.commands.verify_commit import _resolve_ref

        root = _init_repo(tmp_path)
        resolved = _resolve_ref(root, "nonexistent-branch")
        assert resolved is None


# ---------------------------------------------------------------------------
# Unit — _verify_one
# ---------------------------------------------------------------------------


class TestVerifyOne:
    """Unit tests for ``_verify_one`` (verification result for one commit ID)."""

    def test_valid_signature(self, tmp_path: pathlib.Path) -> None:
        """An untouched signed commit verifies and reports a key_id."""
        from muse.cli.commands.verify_commit import _verify_one

        root = _init_repo(tmp_path)
        key = _make_key()
        commit_id, _ = _commit_files(root, {"a.py": b"x = 1\n"}, sign=True, private_key=key)
        result = _verify_one(root, commit_id)
        assert result["valid"] is True
        assert result["commit_id"] == commit_id
        assert len(result["key_id"]) > 0

    def test_unsigned_commit_valid_false_no_error(self, tmp_path: pathlib.Path) -> None:
        """An unsigned commit is not valid, but that is not reported as an error."""
        from muse.cli.commands.verify_commit import _verify_one

        root = _init_repo(tmp_path)
        commit_id, _ = _commit_files(root, {"a.py": b"x = 1\n"}, sign=False)
        result = _verify_one(root, commit_id)
        assert result["valid"] is False
        assert result["error"] is None
        assert result["signer"] == ""

    def test_missing_commit_returns_error(self, tmp_path: pathlib.Path) -> None:
        """An ID with no stored commit yields valid=False and a 'not found' error."""
        from muse.cli.commands.verify_commit import _verify_one

        root = _init_repo(tmp_path)
        result = _verify_one(root, blob_id(b"nonexistent commit"))
        assert result["valid"] is False
        assert "not found" in (result["error"] or "")

    def test_missing_public_key_valid_false(self, tmp_path: pathlib.Path) -> None:
        """A signed record whose public key was stripped must not verify."""
        from muse.cli.commands.verify_commit import _verify_one

        root = _init_repo(tmp_path)
        key = _make_key()
        commit_id, record = _commit_files(root, {"a.py": b"x = 1\n"}, sign=True, private_key=key)
        tampered = _tampered_copy(record, signer_public_key="")  # stripped
        _force_write_commit(root, tampered)
        result = _verify_one(root, commit_id)
        assert result["valid"] is False

    def test_bit_flip_in_signature(self, tmp_path: pathlib.Path) -> None:
        """Inverting the bits of the first signature byte must invalidate it."""
        from muse.cli.commands.verify_commit import _verify_one

        root = _init_repo(tmp_path)
        key = _make_key()
        commit_id, record = _commit_files(root, {"a.py": b"x = 1\n"}, sign=True, private_key=key)
        algo, sig_bytes = decode_sig(record.signature)
        flipped = bytes([sig_bytes[0] ^ 0xFF]) + sig_bytes[1:]
        bad_sig = encode_sig(algo, flipped)
        tampered = _tampered_copy(record, signature=bad_sig)
        _force_write_commit(root, tampered)
        result = _verify_one(root, commit_id)
        assert result["valid"] is False

    def test_unknown_signature_algorithm(self, tmp_path: pathlib.Path) -> None:
        """A commit whose signature carries an unknown algorithm prefix returns valid=False."""
        from muse.cli.commands.verify_commit import _verify_one

        root = _init_repo(tmp_path)
        key = _make_key()
        commit_id, record = _commit_files(root, {"a.py": b"x = 1\n"}, sign=True, private_key=key)
        # Keep the real signature bytes; only the algorithm label changes.
        _, raw_sig_bytes = decode_sig(record.signature)
        unknown_sig = encode_sig("mldsa65", raw_sig_bytes)
        tampered = _tampered_copy(record, signature=unknown_sig)
        _force_write_commit(root, tampered)
        result = _verify_one(root, commit_id)
        assert result["valid"] is False
        assert "mldsa65" in (result.get("error") or "")

    def test_unknown_public_key_algorithm(self, tmp_path: pathlib.Path) -> None:
        """A commit with an unknown pubkey algorithm prefix returns valid=False with error."""
        from muse.cli.commands.verify_commit import _verify_one
        from muse.core.types import encode_pubkey

        root = _init_repo(tmp_path)
        # Build commit with mldsa65 key from scratch so commit_id is consistent.
        fake_mldsa_key = encode_pubkey("mldsa65", b"\xab" * 32)
        fake_sig = encode_sig("ed25519", b"\x00" * 64)
        manifest = {"a.py": blob_id(b"x = 1\n")}
        snap_id = hash_snapshot(manifest)
        write_snapshot(root, SnapshotRecord(snapshot_id=snap_id, manifest=manifest))
        committed_at = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc)
        commit_id = hash_commit(
            parent_ids=[],
            snapshot_id=snap_id,
            message="mldsa-test",
            committed_at_iso=committed_at.isoformat(),
            signer_public_key=fake_mldsa_key,
        )
        record = CommitRecord(
            commit_id=commit_id,
            branch="main",
            snapshot_id=snap_id,
            message="mldsa-test",
            committed_at=committed_at,
            agent_id="test-agent",
            signature=fake_sig,
            signer_public_key=fake_mldsa_key,
        )
        commit_path(root, commit_id).parent.mkdir(parents=True, exist_ok=True)
        _force_write_commit(root, record)
        (heads_dir(root) / "main").write_text(commit_id, encoding="utf-8")
        result = _verify_one(root, commit_id)
        assert result["valid"] is False
        assert "mldsa65" in (result.get("error") or "")

    def test_committed_at_tamper_invalidates_signature(self, tmp_path: pathlib.Path) -> None:
        """Mutating committed_at in the stored record must invalidate the signature."""
        from muse.cli.commands.verify_commit import _verify_one

        root = _init_repo(tmp_path)
        key = _make_key()
        commit_id, record = _commit_files(root, {"a.py": b"x = 1\n"}, sign=True, private_key=key)
        # Shift committed_at by one day.
        tampered_ts = record.committed_at + datetime.timedelta(days=1)
        tampered = _tampered_copy(record, committed_at=tampered_ts)  # mutated
        _force_write_commit(root, tampered)
        result = _verify_one(root, commit_id)
        assert result["valid"] is False

    def test_agent_id_tamper_invalidates_signature(self, tmp_path: pathlib.Path) -> None:
        """Changing agent_id in the stored record must invalidate the signature."""
        from muse.cli.commands.verify_commit import _verify_one

        root = _init_repo(tmp_path)
        key = _make_key()
        commit_id, record = _commit_files(
            root, {"a.py": b"x = 1\n"}, sign=True, private_key=key, agent_id="agent-A"
        )
        tampered = _tampered_copy(record, agent_id="agent-B")  # tampered
        _force_write_commit(root, tampered)
        result = _verify_one(root, commit_id)
        assert result["valid"] is False

    def test_model_id_tamper_invalidates_signature(self, tmp_path: pathlib.Path) -> None:
        """Changing model_id in the stored record must invalidate the signature."""
        from muse.cli.commands.verify_commit import _verify_one

        root = _init_repo(tmp_path)
        key = _make_key()
        commit_id, record = _commit_files(
            root, {"a.py": b"x = 1\n"}, sign=True, private_key=key, model_id="claude-sonnet-4-6"
        )
        tampered = _tampered_copy(record, model_id="claude-opus-4-6")  # tampered
        _force_write_commit(root, tampered)
        result = _verify_one(root, commit_id)
        assert result["valid"] is False

    def test_wrong_keypair_invalid(self, tmp_path: pathlib.Path) -> None:
        """Public key from a different keypair must not verify the signature."""
        from muse.cli.commands.verify_commit import _verify_one

        root = _init_repo(tmp_path)
        signing_key = _make_key()
        wrong_key = _make_key()
        commit_id, record = _commit_files(
            root, {"a.py": b"x = 1\n"}, sign=True, private_key=signing_key
        )
        # Swap in the public key from wrong_key.
        _, wrong_pub_b64 = encode_public_key(wrong_key)
        tampered = _tampered_copy(record, signer_public_key=wrong_pub_b64)
        _force_write_commit(root, tampered)
        result = _verify_one(root, commit_id)
        assert result["valid"] is False

    def test_signed_at_populated_for_signed_commit(self, tmp_path: pathlib.Path) -> None:
        """signed_at is a non-empty ISO string for a signed commit."""
        from muse.cli.commands.verify_commit import _verify_one

        root = _init_repo(tmp_path)
        key = _make_key()
        commit_id, _ = _commit_files(root, {"a.py": b"x = 1\n"}, sign=True, private_key=key)
        result = _verify_one(root, commit_id)
        assert result["signed_at"]
        assert "T" in result["signed_at"]  # ISO 8601 format

    def test_error_none_for_valid_commit(self, tmp_path: pathlib.Path) -> None:
        """error field is None when the signature is valid."""
        from muse.cli.commands.verify_commit import _verify_one

        root = _init_repo(tmp_path)
        key = _make_key()
        commit_id, _ = _commit_files(root, {"a.py": b"x = 1\n"}, sign=True, private_key=key)
        result = _verify_one(root, commit_id)
        assert result["error"] is None

    def test_key_status_unknown_without_hub(self, tmp_path: pathlib.Path) -> None:
        """With check_key_status on but no hub URL, key_status is 'unknown'."""
        from muse.cli.commands.verify_commit import _verify_one

        root = _init_repo(tmp_path)
        key = _make_key()
        commit_id, _ = _commit_files(root, {"a.py": b"x = 1\n"}, sign=True, private_key=key)
        result = _verify_one(root, commit_id, check_key_status=True, hub_url=None)
        assert result["key_status"] == "unknown"

    def test_key_status_cache_hit_skips_network(self, tmp_path: pathlib.Path) -> None:
        """Cache hit must prevent a second _fetch_key_status call."""
        from muse.cli.commands.verify_commit import _verify_one

        root = _init_repo(tmp_path)
        key = _make_key()
        commit_id, _ = _commit_files(root, {"a.py": b"x = 1\n"}, sign=True, private_key=key)
        cache: dict[str, str] = {}
        call_count = 0

        def mock_fetch(hub_url: str, key_id: str) -> str:
            nonlocal call_count
            call_count += 1
            return "active"

        with patch("muse.cli.commands.verify_commit._fetch_key_status", side_effect=mock_fetch):
            # Same commit, same cache dict: only the first call may fetch.
            _verify_one(
                root, commit_id, check_key_status=True, hub_url="http://fake", key_status_cache=cache
            )
            _verify_one(
                root, commit_id, check_key_status=True, hub_url="http://fake", key_status_cache=cache
            )
        assert call_count == 1

    def test_json_schema_all_keys_present(self, tmp_path: pathlib.Path) -> None:
        """The result mapping has exactly the documented set of keys."""
        from muse.cli.commands.verify_commit import _verify_one

        root = _init_repo(tmp_path)
        key = _make_key()
        commit_id, _ = _commit_files(root, {"a.py": b"x = 1\n"}, sign=True, private_key=key)
        result = _verify_one(root, commit_id)
        assert {
            "commit_id",
            "valid",
            "signer",
            "key_id",
            "signed_at",
            "key_status",
            "error",
        } == set(result)


# ---------------------------------------------------------------------------
# Unit — _fetch_key_status
# ---------------------------------------------------------------------------


class TestFetchKeyStatus:
    """Unit tests for ``_fetch_key_status`` with ``urllib.request.urlopen`` patched."""

    @staticmethod
    def _response(body: bytes) -> MagicMock:
        """Build a context-manager mock whose ``read()`` returns *body*."""
        mock_resp = MagicMock()
        mock_resp.read.return_value = body
        # Entering the context yields the mock itself, like a real response.
        mock_resp.__enter__ = lambda s: s
        mock_resp.__exit__ = MagicMock(return_value=False)
        return mock_resp

    def test_returns_active(self) -> None:
        """A hub response with status 'active' is returned as 'active'."""
        from muse.cli.commands.verify_commit import _fetch_key_status

        mock_resp = self._response(json.dumps({"status": "active"}).encode())
        with patch("urllib.request.urlopen", return_value=mock_resp):
            assert _fetch_key_status("http://hub", "key123") == "active"

    def test_returns_revoked(self) -> None:
        """A hub response with status 'revoked' is returned as 'revoked'."""
        from muse.cli.commands.verify_commit import _fetch_key_status

        mock_resp = self._response(json.dumps({"status": "revoked"}).encode())
        with patch("urllib.request.urlopen", return_value=mock_resp):
            assert _fetch_key_status("http://hub", "key123") == "revoked"

    def test_returns_unknown_for_unrecognised_status(self) -> None:
        """A status value other than the recognised ones maps to 'unknown'."""
        from muse.cli.commands.verify_commit import _fetch_key_status

        mock_resp = self._response(json.dumps({"status": "pending"}).encode())
        with patch("urllib.request.urlopen", return_value=mock_resp):
            assert _fetch_key_status("http://hub", "key123") == "unknown"

    def test_returns_unknown_on_network_error(self) -> None:
        """An OSError from urlopen maps to 'unknown' instead of propagating."""
        from muse.cli.commands.verify_commit import _fetch_key_status

        with patch("urllib.request.urlopen", side_effect=OSError("connection refused")):
            assert _fetch_key_status("http://hub", "key123") == "unknown"

    def test_returns_unknown_on_timeout(self) -> None:
        """A socket timeout from urlopen maps to 'unknown'."""
        from muse.cli.commands.verify_commit import _fetch_key_status
        import socket

        with patch("urllib.request.urlopen", side_effect=socket.timeout("timed out")):
            assert _fetch_key_status("http://hub", "key123") == "unknown"

    def test_returns_unknown_on_invalid_json(self) -> None:
        """A response body that is not JSON maps to 'unknown'."""
        from muse.cli.commands.verify_commit import _fetch_key_status

        mock_resp = self._response(b"not json")
        with patch("urllib.request.urlopen", return_value=mock_resp):
            assert _fetch_key_status("http://hub", "key123") == "unknown"
# ---------------------------------------------------------------------------
# Integration — text output
# ---------------------------------------------------------------------------


class TestTextOutput:
    """CLI integration tests for the default (text) output of verify-commit."""

    def test_ok_line_format(self, tmp_path: pathlib.Path) -> None:
        """A valid signed commit prints an OK line with the signer and a key= part."""
        root = _init_repo(tmp_path)
        key = _make_key()
        commit_id, _ = _commit_files(
            root, {"a.py": b"x = 1\n"}, sign=True, private_key=key, agent_id="claude-code"
        )
        result = _invoke(root, commit_id)
        assert result.exit_code == 0
        assert "OK" in result.output
        assert "claude-code" in result.output
        assert "key=" in result.output

    def test_bad_line_for_invalid_signature(self, tmp_path: pathlib.Path) -> None:
        """A corrupted signature prints a BAD line and exits nonzero."""
        root = _init_repo(tmp_path)
        key = _make_key()
        commit_id, record = _commit_files(root, {"a.py": b"x = 1\n"}, sign=True, private_key=key)
        algo, sig_bytes = decode_sig(record.signature)
        # Invert every bit of the first signature byte.
        flipped = bytes([sig_bytes[0] ^ 0xFF]) + sig_bytes[1:]
        tampered = CommitRecord(
            commit_id=record.commit_id,
            branch=record.branch,
            snapshot_id=record.snapshot_id,
            message=record.message,
            committed_at=record.committed_at,
            parent_commit_id=record.parent_commit_id,
            agent_id=record.agent_id,
            signature=encode_sig(algo, flipped),
            signer_public_key=record.signer_public_key,
            signer_key_id=record.signer_key_id,
        )
        _force_write_commit(root, tampered)
        result = _invoke(root, commit_id)
        assert result.exit_code != 0
        assert "BAD" in result.output

    def test_err_line_for_missing_commit(self, tmp_path: pathlib.Path) -> None:
        """A well-formed ID with no stored commit prints an ERR line and exits nonzero."""
        root = _init_repo(tmp_path)
        key = _make_key()
        commit_id, _ = _commit_files(root, {"a.py": b"x = 1\n"}, sign=True, private_key=key)
        # Ask for an ID that was never written to the store (no ref is created
        # for it either); the commit made above is not the one being verified.
        ghost_id = blob_id(b"ghost commit that does not exist")
        result = _invoke(root, ghost_id)
        assert result.exit_code != 0
        assert "ERR" in result.output

    def test_unsigned_shows_unsigned_signer(self, tmp_path: pathlib.Path) -> None:
        """Unsigned commits show '(unsigned)' as the signer in text output."""
        root = _init_repo(tmp_path)
        commit_id, _ = _commit_files(root, {"a.py": b"x = 1\n"}, sign=False)
        result = _invoke(root, commit_id)
        assert result.exit_code == 0
        assert "(unsigned)" in result.output

    def test_key_absent_from_text_output_for_unsigned(self, tmp_path: pathlib.Path) -> None:
        """key= part must not appear in text output for unsigned commits."""
        root = _init_repo(tmp_path)
        commit_id, _ = _commit_files(root, {"a.py": b"x = 1\n"}, sign=False)
        result = _invoke(root, commit_id)
        assert "key=" not in result.output

    def test_short_commit_id_in_text_output(self, tmp_path: pathlib.Path) -> None:
        """Text output uses short_id, not the full 71-char commit ID."""
        root = _init_repo(tmp_path)
        key = _make_key()
        commit_id, _ = _commit_files(root, {"a.py": b"x = 1\n"}, sign=True, private_key=key)
        result = _invoke(root, commit_id)
        assert commit_id not in result.output  # full ID not present
        # First 12 hex characters after the "sha256:" prefix.
        assert commit_id[len("sha256:"):len("sha256:") + 12] in result.output  # short hex present


# ---------------------------------------------------------------------------
# Integration — JSON output
# ---------------------------------------------------------------------------


class TestJsonOutput:
    """CLI integration tests for ``verify-commit --json``."""

    def test_valid_commit_all_fields(self, tmp_path: pathlib.Path) -> None:
        """A valid signed commit reports its ID, signer, key_id and key_status."""
        root = _init_repo(tmp_path)
        key = _make_key()
        commit_id, _ = _commit_files(
            root, {"a.py": b"x = 1\n"}, sign=True, private_key=key, agent_id="claude-code"
        )
        result = _invoke(root, commit_id, "--json")
        assert result.exit_code == 0
        data = json.loads(result.stdout)
        assert data["commit_id"] == commit_id
        assert data["valid"] is True
        assert data["signer"] == "claude-code"
        assert data["key_id"]
        # --check-key-status was not passed, so no status was looked up.
        assert data["key_status"] == "unknown"

    def test_signed_at_non_empty_for_signed_commit(self, tmp_path: pathlib.Path) -> None:
        """signed_at in the JSON output is a non-empty string containing 'T'."""
        root = _init_repo(tmp_path)
        key = _make_key()
        commit_id, _ = _commit_files(root, {"a.py": b"x = 1\n"}, sign=True, private_key=key)
        result = _invoke(root, commit_id, "--json")
        data = json.loads(result.stdout)
        assert data["signed_at"]
        assert "T" in data["signed_at"]

    def test_error_field_stripped_from_json_output(self, tmp_path: pathlib.Path) -> None:
        """The internal 'error' field must not appear in emitted JSON."""
        root = _init_repo(tmp_path)
        key = _make_key()
        commit_id, _ = _commit_files(root, {"a.py": b"x = 1\n"}, sign=True, private_key=key)
        result = _invoke(root, commit_id, "--json")
        data = json.loads(result.stdout)
        assert "error" not in data

    def test_duration_ms_and_exit_code_in_json(self, tmp_path: pathlib.Path) -> None:
        """duration_ms and exit_code are present in every JSON result line."""
        root = _init_repo(tmp_path)
        key = _make_key()
        commit_id, _ = _commit_files(root, {"a.py": b"x = 1\n"}, sign=True, private_key=key)
        result = _invoke(root, commit_id, "--json")
        data = json.loads(result.stdout)
        assert "duration_ms" in data
        assert isinstance(data["duration_ms"], float)
        assert data["duration_ms"] >= 0
        assert "exit_code" in data
        assert data["exit_code"] == 0

    def test_exit_code_nonzero_in_json_on_failure(self, tmp_path: pathlib.Path) -> None:
        """exit_code reflects the actual exit status — non-zero when verification fails."""
        root = _init_repo(tmp_path)
        result = _invoke(root, blob_id(b"nonexistent commit"), "--json")
        data = json.loads(result.stdout)
        assert data["exit_code"] != 0
        assert data["duration_ms"] >= 0

    def test_unsigned_commit_json(self, tmp_path: pathlib.Path) -> None:
        """An unsigned commit exits 0 with valid=False, an empty signer and no error key."""
        root = _init_repo(tmp_path)
        commit_id, _ = _commit_files(root, {"a.py": b"x = 1\n"}, sign=False)
        result = _invoke(root, commit_id, "--json")
        assert result.exit_code == 0
        data = json.loads(result.stdout)
        assert data["valid"] is False
        assert data["signer"] == ""
        assert "error" not in data

    def test_batch_each_line_is_valid_json(self, tmp_path: pathlib.Path) -> None:
        """Batch output: one valid JSON object per line."""
        root = _init_repo(tmp_path)
        key = _make_key()
        ids = [
            _commit_files(root, {f"f{i}.py": f"x={i}".encode()}, sign=True, private_key=key)[0]
            for i in range(3)
        ]
        result = _invoke(root, *ids, "--json")
        assert result.exit_code == 0
        lines = [l for l in result.stdout.strip().splitlines() if l]
        assert len(lines) == 3
        for line in lines:
            obj = json.loads(line)
            assert obj["valid"] is True

    def test_batch_results_in_submission_order(self, tmp_path: pathlib.Path) -> None:
        """Batch results must arrive in the same order as the input commit IDs."""
        root = _init_repo(tmp_path)
        key = _make_key()
        ids = [
            _commit_files(root, {f"f{i}.py": f"x={i}".encode()}, sign=True, private_key=key)[0]
            for i in range(5)
        ]
        result = _invoke(root, *ids, "--json")
        assert result.exit_code == 0
        lines = [l for l in result.stdout.strip().splitlines() if l]
        returned_ids = [json.loads(l)["commit_id"] for l in lines]
        assert returned_ids == ids

    def test_json_flag_alias(self, tmp_path: pathlib.Path) -> None:
        """--json is accepted and produces JSON output."""
        root = _init_repo(tmp_path)
        key = _make_key()
        commit_id, _ = _commit_files(root, {"a.py": b"x = 1\n"}, sign=True, private_key=key)
        # Here the flag precedes the ref, unlike the other tests in this class.
        result = _invoke(root, "--json", commit_id)
        assert result.exit_code == 0
        assert "valid" in json.loads(result.stdout)


# ---------------------------------------------------------------------------
# Integration — HEAD and branch name refs
# ---------------------------------------------------------------------------


class TestRefResolution:
    """CLI integration tests for HEAD, branch-name and unknown refs."""

    def test_head_shorthand(self, tmp_path: pathlib.Path) -> None:
        """``HEAD`` verifies the commit at the tip of the current branch."""
        root = _init_repo(tmp_path)
        key = _make_key()
        _commit_files(root, {"a.py": b"x = 1\n"}, sign=True, private_key=key)
        result = _invoke(root, "HEAD", "--json")
        assert result.exit_code == 0
        assert json.loads(result.stdout)["valid"] is True

    def test_branch_name_ref(self, tmp_path: pathlib.Path) -> None:
        """A branch name verifies the commit that branch points at."""
        root = _init_repo(tmp_path)
        key = _make_key()
        commit_id, _ = _commit_files(
            root, {"a.py": b"x = 1\n"}, sign=True, private_key=key, branch="dev"
        )
        result = _invoke(root, "dev", "--json")
        assert result.exit_code == 0
        data = json.loads(result.stdout)
        assert data["commit_id"] == commit_id
        assert data["valid"] is True

    def test_nonexistent_sha256_ref_exits_user_error(self, tmp_path: pathlib.Path) -> None:
        """A sha256:-prefixed ID that doesn't exist in the store exits USER_ERROR."""
        root = _init_repo(tmp_path)
        key = _make_key()
        _commit_files(root, {"a.py": b"x = 1\n"}, sign=True, private_key=key)
        ghost = blob_id(b"ghost commit")
        result = _invoke(root, ghost, "--json")
        # Only "nonzero" is asserted here, not the specific USER_ERROR code.
        assert result.exit_code != 0
        data = json.loads(result.stdout)
        assert data["valid"] is False

    def test_nonexistent_branch_exits_user_error(self, tmp_path: pathlib.Path) -> None:
        """An unknown branch name exits nonzero and writes nothing to stdout."""
        root = _init_repo(tmp_path)
        result = _invoke(root, "no-such-branch")
        assert result.exit_code != 0
        assert result.stdout_bytes == b""  # error went to stderr


# ---------------------------------------------------------------------------
# Integration — --strict
# ---------------------------------------------------------------------------


class TestStrictMode:
    """CLI integration tests for the ``--strict`` flag."""

    def test_unsigned_no_strict_exits_0(self, tmp_path: pathlib.Path) -> None:
        """Without --strict an unsigned commit exits 0 while reporting valid=False."""
        root = _init_repo(tmp_path)
        commit_id, _ = _commit_files(root, {"a.py": b"x = 1\n"}, sign=False)
        result = _invoke(root, commit_id, "--json")
        assert result.exit_code == 0
        assert json.loads(result.stdout)["valid"] is False

    def test_unsigned_strict_exits_nonzero(self, tmp_path: pathlib.Path) -> None:
        """With --strict an unsigned commit exits nonzero."""
        root = _init_repo(tmp_path)
        commit_id, _ = _commit_files(root, {"a.py": b"x = 1\n"}, sign=False)
        result = _invoke(root, commit_id, "--strict")
        assert result.exit_code != 0

    def test_signed_strict_exits_0(self, tmp_path: pathlib.Path) -> None:
        """With --strict a validly signed commit still exits 0."""
        root = _init_repo(tmp_path)
        key = _make_key()
        commit_id, _ = _commit_files(root, {"a.py": b"x = 1\n"}, sign=True, private_key=key)
        result = _invoke(root, commit_id, "--strict")
        assert result.exit_code == 0

    def test_batch_one_unsigned_strict_exits_nonzero(self, tmp_path: pathlib.Path) -> None:
        """With --strict a batch containing one unsigned commit exits nonzero."""
        root = _init_repo(tmp_path)
        key = _make_key()
        cid1, _ = _commit_files(root, {"a.py": b"x = 1\n"}, sign=True, private_key=key)
        cid2, _ = _commit_files(root, {"a.py": b"x = 2\n"}, sign=False)
        result = _invoke(root, cid1, cid2, "--strict")
        assert result.exit_code != 0


# ---------------------------------------------------------------------------
# Integration — --check-key-status
# ---------------------------------------------------------------------------


class TestCheckKeyStatus:
    """CLI integration tests for the ``--check-key-status`` flag."""

    def test_unknown_without_hub(self, tmp_path: pathlib.Path) -> None:
        """In a repo with no hub set up by the test, key_status is 'unknown'."""
        root = _init_repo(tmp_path)
        key = _make_key()
        commit_id, _ = _commit_files(root, {"a.py": b"x = 1\n"}, sign=True, private_key=key)
        result = _invoke(root, commit_id, "--check-key-status", "--json")
        assert result.exit_code == 0
        assert json.loads(result.stdout)["key_status"] == "unknown"


# ---------------------------------------------------------------------------
# Security
# ---------------------------------------------------------------------------


class TestSecurity:
    """Hostile or malformed ref arguments must be rejected cleanly."""

    def test_ansi_in_ref_rejected(self, tmp_path: pathlib.Path) -> None:
        """A ref containing ANSI escape sequences exits nonzero with empty stdout."""
        root = _init_repo(tmp_path)
        result = _invoke(root, "\x1b[31mbad\x1b[0m")
        assert result.exit_code != 0
        assert result.stdout_bytes == b""  # error went to stderr

    def test_null_byte_in_ref_rejected(self, tmp_path: pathlib.Path) -> None:
        """A ref containing a NUL byte exits nonzero."""
        root = _init_repo(tmp_path)
        result = _invoke(root, "sha256:abc\x00def")
        assert result.exit_code != 0

    def test_path_traversal_in_ref_rejected(self, tmp_path: pathlib.Path) -> None:
        """A ``../``-style ref exits nonzero with empty stdout."""
        root = _init_repo(tmp_path)
        result = _invoke(root, "../../etc/passwd")
        assert result.exit_code != 0
        assert result.stdout_bytes == b""

    def test_bare_hex_ref_rejected_with_message(self, tmp_path: pathlib.Path) -> None:
        """Bare hex without sha256: prefix → rejected; message mentions sha256:."""
        root = _init_repo(tmp_path)
        bare = "a" * 64
        result = _invoke(root, bare)
        assert result.exit_code != 0
        assert result.stdout_bytes == b""
        assert "sha256:" in result.stderr

    def test_no_traceback_on_bad_ref(self, tmp_path: pathlib.Path) -> None:
        """An unresolvable ref never leaks a Python traceback to the user."""
        root = _init_repo(tmp_path)
        result = _invoke(root, "not-a-real-ref")
        assert "Traceback" not in result.output
        assert "Traceback" not in result.stderr


# ---------------------------------------------------------------------------
# Data integrity — full CLI flow
# ---------------------------------------------------------------------------


class TestDataIntegrity:
    """Tampering with a stored record must be caught through the CLI."""

    def test_committed_at_tamper_fails_cli(self, tmp_path: pathlib.Path) -> None:
        """committed_at mutation detected through the full CLI verify-commit flow."""
        root = _init_repo(tmp_path)
        key = _make_key()
        commit_id, record = _commit_files(root, {"a.py": b"x = 1\n"}, sign=True, private_key=key)
        # A one-second shift is enough to change the stored timestamp.
        tampered_ts = record.committed_at + datetime.timedelta(seconds=1)
        tampered = CommitRecord(
            commit_id=record.commit_id,
            branch=record.branch,
            snapshot_id=record.snapshot_id,
            message=record.message,
            committed_at=tampered_ts,
            parent_commit_id=record.parent_commit_id,
            agent_id=record.agent_id,
            signature=record.signature,
            signer_public_key=record.signer_public_key,
            signer_key_id=record.signer_key_id,
        )
        _force_write_commit(root, tampered)
        result = _invoke(root, commit_id, "--json")
        assert result.exit_code != 0
        assert json.loads(result.stdout)["valid"] is False

    def test_model_id_tamper_fails_cli(self, tmp_path: pathlib.Path) -> None:
        """model_id mutation detected through the full CLI verify-commit flow."""
        root = _init_repo(tmp_path)
        key = _make_key()
        commit_id, record = _commit_files(
            root, {"a.py": b"x = 1\n"}, sign=True, private_key=key, model_id="claude-sonnet-4-6"
        )
        tampered = CommitRecord(
            commit_id=record.commit_id,
            branch=record.branch,
            snapshot_id=record.snapshot_id,
            message=record.message,
            committed_at=record.committed_at,
            parent_commit_id=record.parent_commit_id,
            agent_id=record.agent_id,
            model_id="gpt-5",  # tampered
            signature=record.signature,
            signer_public_key=record.signer_public_key,
            signer_key_id=record.signer_key_id,
        )
_force_write_commit(root, tampered) result = _invoke(root, commit_id, "--json") assert result.exit_code != 0 assert json.loads(result.stdout)["valid"] is False def test_wrong_keypair_fails_cli(self, tmp_path: pathlib.Path) -> None: """Public key from a different keypair → invalid through full CLI flow.""" root = _init_repo(tmp_path) signing_key = _make_key() wrong_key = _make_key() commit_id, record = _commit_files(root, {"a.py": b"x = 1\n"}, sign=True, private_key=signing_key) _, wrong_pub_b64 = encode_public_key(wrong_key) tampered = CommitRecord( commit_id=record.commit_id, branch=record.branch, snapshot_id=record.snapshot_id, message=record.message, committed_at=record.committed_at, parent_commit_id=record.parent_commit_id, agent_id=record.agent_id, signature=record.signature, signer_public_key=wrong_pub_b64, signer_key_id=record.signer_key_id, ) _force_write_commit(root, tampered) result = _invoke(root, commit_id, "--json") assert result.exit_code != 0 assert json.loads(result.stdout)["valid"] is False def test_ed25519_prefix_survives_store_roundtrip(self, tmp_path: pathlib.Path) -> None: """Signature and public key must keep 'ed25519:' prefix through write→read.""" from muse.core.commits import read_commit root = _init_repo(tmp_path) key = _make_key() commit_id, _ = _commit_files(root, {"a.py": b"x = 1\n"}, sign=True, private_key=key) reloaded = read_commit(root, commit_id) assert reloaded is not None assert reloaded.signature.startswith("ed25519:") assert reloaded.signer_public_key.startswith("ed25519:") def test_force_writecommit_path_matches_read_commit(self, tmp_path: pathlib.Path) -> None: """_force_write_commit must write to the path that read_commit reads from.""" from muse.core.commits import read_commit root = _init_repo(tmp_path) key = _make_key() commit_id, record = _commit_files(root, {"a.py": b"x = 1\n"}, sign=True, private_key=key) sentinel = CommitRecord( commit_id=record.commit_id, branch=record.branch, snapshot_id=record.snapshot_id, 
message=record.message, committed_at=record.committed_at, parent_commit_id=record.parent_commit_id, agent_id="sentinel-agent", signature=record.signature, signer_public_key=record.signer_public_key, signer_key_id=record.signer_key_id, ) _force_write_commit(root, sentinel) reloaded = read_commit(root, commit_id) assert reloaded is not None assert reloaded.agent_id == "sentinel-agent" # --------------------------------------------------------------------------- # Stress # --------------------------------------------------------------------------- class TestStress: def test_100_signed_commits_all_valid(self, tmp_path: pathlib.Path) -> None: """100 signed commits all verify correctly (unit path).""" from muse.cli.commands.verify_commit import _verify_one root = _init_repo(tmp_path) key = _make_key() for i in range(100): commit_id, _ = _commit_files( root, {f"f{i}.py": f"v = {i}\n".encode()}, sign=True, private_key=key ) result = _verify_one(root, commit_id) assert result["valid"] is True, f"commit {i} failed" def test_batch_50_commits_all_results_emitted(self, tmp_path: pathlib.Path) -> None: """Batch of 50 commits → CLI emits exactly 50 JSON lines.""" root = _init_repo(tmp_path) key = _make_key() ids = [ _commit_files(root, {f"f{i}.py": f"x={i}".encode()}, sign=True, private_key=key)[0] for i in range(50) ] result = _invoke(root, *ids, "--json") assert result.exit_code == 0 lines = [l for l in result.stdout.strip().splitlines() if l] assert len(lines) == 50 assert all(json.loads(l)["valid"] for l in lines) def test_10_different_keys_all_valid(self, tmp_path: pathlib.Path) -> None: """Each commit signed with a different key verifies independently.""" from muse.cli.commands.verify_commit import _verify_one root = _init_repo(tmp_path) for i in range(10): key = _make_key() commit_id, _ = _commit_files( root, {f"k{i}.py": f"v={i}".encode()}, sign=True, private_key=key ) result = _verify_one(root, commit_id) assert result["valid"] is True, f"key {i} failed" def 
test_key_status_cache_n_commits_same_key_one_call(self, tmp_path: pathlib.Path) -> None: """N commits sharing a key_id → exactly 1 network call via cache.""" from muse.cli.commands.verify_commit import _verify_one root = _init_repo(tmp_path) key = _make_key() call_count = 0 def mock_fetch(hub_url: str, key_id: str) -> str: nonlocal call_count call_count += 1 return "active" cache: dict[str, str] = {} with patch("muse.cli.commands.verify_commit._fetch_key_status", side_effect=mock_fetch): for i in range(10): commit_id, _ = _commit_files( root, {f"f{i}.py": f"x={i}".encode()}, sign=True, private_key=key ) _verify_one(root, commit_id, check_key_status=True, hub_url="http://fake", key_status_cache=cache) assert call_count == 1, f"expected 1 network call, got {call_count}" # --------------------------------------------------------------------------- # Flag registration # --------------------------------------------------------------------------- class TestRegisterFlags: def _parse(self, *args: str) -> "argparse.Namespace": import argparse from muse.cli.commands.verify_commit import register p = argparse.ArgumentParser() sub = p.add_subparsers() register(sub) return p.parse_args(["verify-commit", *args]) def test_default_json_out_is_false(self) -> None: ns = self._parse("HEAD") assert ns.json_out is False def test_json_flag_sets_json_out(self) -> None: ns = self._parse("HEAD", "--json") assert ns.json_out is True def test_j_shorthand_sets_json_out(self) -> None: ns = self._parse("HEAD", "-j") assert ns.json_out is True