"""Tests for ``muse code migrate`` — full layout and ID migration. Old-state vocabulary -------------------- flat-commit .muse/commits/.msgpack (no sha256/ subdir) flat-snapshot .muse/snapshots/.msgpack (no sha256/ subdir) flat-object .muse/objects// (no sha256/ subdir) legacy-id commit_id computed with v0 formula (not current compute_commit_id) bare-ref ref file containing raw hex (no sha256: prefix) bare-remote-ref remotes// raw hex bare-sig Ed25519 sig as raw base64url (no ed25519: prefix) legacy-repo-id repo.json "repo_id" is a plain string (pre-sha256) old-branch-key commit dict has "created_on_branch" (not "branch") old-format-ver CommitRecord format_version < 8 Post-migrate canonical state ----------------------------- objects objects/sha256// commits commits/sha256/.msgpack IDs match compute_commit_id snapshots snapshots/sha256/.msgpack branch refs sha256: remote refs sha256: repo_id sha256: signatures ed25519: commit field "branch" (not "created_on_branch") format_version 8 """ from __future__ import annotations import datetime import json import pathlib import msgpack import pytest from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey from muse.core.transport import SigningIdentity from muse.core.types import b64url_encode, blob_id, encode_sig, long_id, split_id from muse.core.migrate import MigrateResult, migrate from muse.core.ids import hash_commit as compute_commit_id, hash_snapshot as compute_snapshot_id from muse.core.object_store import object_path, read_muse_object from muse.core.paths import commits_dir, logs_dir, muse_dir, objects_dir, ref_path, remotes_dir, repo_json_path, snapshots_dir from muse.core.refs import ( get_all_branch_heads, write_branch_ref, ) from muse.core.commits import ( CommitRecord, commit_path, write_commit, ) from muse.core.snapshots import ( SnapshotRecord, write_snapshot, ) type _RawCommit = dict[str, str | int | float | bytes | None] # 
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------

_AT = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc)
_AT_ISO = _AT.isoformat()
_REPO_ID_LEGACY = "550e8400-e29b-41d4-a716-446655440000"
_REPO_ID_SHA = blob_id(_REPO_ID_LEGACY.encode())  # deterministic migration target


# ---------------------------------------------------------------------------
# Old-formula simulation
# ---------------------------------------------------------------------------


def _v0_id(parent_ids: list[str], snapshot_id: str, message: str) -> str:
    """Simulate a legacy commit ID (v0 formula — prepends a 'v0' sentinel).

    The digest is taken over ``"v0"``, the sorted bare parent hexes, the bare
    snapshot hex, the message and the fixed test timestamp, joined with NUL.
    It is meant to differ from ``compute_commit_id`` for the same inputs, so
    the tests can prove that migration really rewrites commit files.
    """
    sep = "\x00"
    fields = [
        "v0",
        sep.join(sorted(long_id(p, strip=True) for p in parent_ids)),
        long_id(snapshot_id, strip=True),
        message,
        _AT_ISO,
    ]
    return blob_id(sep.join(fields).encode())


def _canonical_id(parent_ids: list[str], snapshot_id: str, message: str) -> str:
    """Return the commit ID produced by the current ``compute_commit_id``.

    Author, timestamp and signer key are pinned to the values that
    ``_raw_commit_dict`` writes ("gabriel", ``_AT_ISO``, empty key).
    """
    return compute_commit_id(
        parent_ids=parent_ids,
        snapshot_id=snapshot_id,
        message=message,
        committed_at_iso=_AT_ISO,
        author="gabriel",
        signer_public_key="",
    )


# ---------------------------------------------------------------------------
# Repo / filesystem helpers
# ---------------------------------------------------------------------------


def _init_repo(tmp_path: pathlib.Path) -> pathlib.Path:
    """Create a minimal .muse skeleton — no commits, no snapshots.

    Writes ``HEAD`` pointing at ``refs/heads/main`` and a ``repo.json`` that
    already carries the canonical ``_REPO_ID_SHA``. Returns *tmp_path*.
    """
    muse = muse_dir(tmp_path)
    for sub in ("commits/sha256", "snapshots/sha256", "objects/sha256", "refs/heads", "remotes"):
        (muse / sub).mkdir(parents=True)
    (muse / "HEAD").write_text("ref: refs/heads/main", encoding="utf-8")
    (muse / "repo.json").write_text(
        json.dumps({"repo_id": _REPO_ID_SHA, "domain": "code"}),
        encoding="utf-8",
    )
    return tmp_path


def _flat_object_path(repo: pathlib.Path, oid: str) -> pathlib.Path:
    """Return the OLD flat location of *oid*: ``objects/<hex[:2]>/<hex[2:]>``."""
    hex_id = long_id(oid, strip=True)
    return objects_dir(repo) / hex_id[:2] / hex_id[2:]


def _canonical_object_path(repo: pathlib.Path, oid: str) -> pathlib.Path:
    """Return the NEW location of *oid*: ``objects/sha256/<hex[:2]>/<hex[2:]>``."""
    hex_id = long_id(oid, strip=True)
    return objects_dir(repo) / "sha256" / hex_id[:2] / hex_id[2:]


def _flat_snapshot_path(repo: pathlib.Path, sid: str) -> pathlib.Path:
    """Return the OLD flat location of snapshot *sid*: ``snapshots/<hex>.msgpack``."""
    return snapshots_dir(repo) / f"{long_id(sid, strip=True)}.msgpack"


def _snap(repo: pathlib.Path, tag: str = "a") -> str:
    """Write a canonical snapshot via ``write_snapshot``; return its ID."""
    manifest = {f"file_{tag}.py": long_id("a" * 64)}
    sid = compute_snapshot_id(manifest)
    write_snapshot(repo, SnapshotRecord(snapshot_id=sid, manifest=manifest, created_at=_AT))
    return sid


def _snap_flat(repo: pathlib.Path, tag: str = "a") -> str:
    """Write a snapshot at the OLD flat path (no sha256/ subdir); return its ID."""
    manifest = {f"flat_{tag}.py": long_id("b" * 64)}
    sid = compute_snapshot_id(manifest)
    path = _flat_snapshot_path(repo, sid)
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_bytes(msgpack.packb(
        {"snapshot_id": sid, "manifest": manifest, "created_at": _AT_ISO},
        use_bin_type=True,
    ))
    return sid


def _object_flat(repo: pathlib.Path, content: bytes) -> str:
    """Write a raw object at the OLD flat path; return its ``blob_id``."""
    oid = blob_id(content)
    _, hex_id = split_id(oid)
    path = objects_dir(repo) / hex_id[:2] / hex_id[2:]
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_bytes(content)
    return oid


def _object_canonical(repo: pathlib.Path, content: bytes) -> str:
    """Write a raw object at the NEW canonical path; return its ``blob_id``."""
    oid = blob_id(content)
    _, hex_id = split_id(oid)
    path = objects_dir(repo) / "sha256" / hex_id[:2] / hex_id[2:]
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_bytes(content)
    return oid


def _raw_commit_dict(
    *,
    commit_id: str,
    snapshot_id: str,
    message: str,
    parent_id: str | None = None,
    parent2_id: str | None = None,
    branch_key: str = "branch",
    branch_value: str = "main",
    signature: str = "",
    repo_id: str = _REPO_ID_SHA,
    format_version: int = 8,
) -> _RawCommit:
    """Build an on-disk commit dict without going through ``CommitRecord``.

    *branch_key* lets a test store the branch under the legacy
    ``"created_on_branch"`` key; *signature* and *format_version* let a test
    plant old-format values. All other fields are fixed defaults.
    """
    # NOTE: "metadata" and the list-valued fields fall outside the value union
    # declared by the _RawCommit alias; the alias is looser in practice.
    return {
        "commit_id": commit_id,
        "repo_id": repo_id,
        branch_key: branch_value,
        "snapshot_id": snapshot_id,
        "message": message,
        "committed_at": _AT_ISO,
        "parent_commit_id": parent_id,
        "parent2_commit_id": parent2_id,
        "author": "gabriel",
        "metadata": {},
        "structured_delta": None,
        "sem_ver_bump": "none",
        "breaking_changes": [],
        "agent_id": "",
        "model_id": "",
        "toolchain_id": "",
        "prompt_hash": "",
        "signature": signature,
        "signer_public_key": "",
        "signer_key_id": "",
        "format_version": format_version,
        "reviewed_by": [],
        "test_runs": 0,
        "labels": [],
        "status": "",
        "notes": [],
        "score": None,
    }


def _write_commit_raw(repo: pathlib.Path, raw: _RawCommit, flat: bool = False) -> pathlib.Path:
    """Write a raw commit dict directly to disk, bypassing CommitRecord validation.

    With ``flat=True`` the file goes to the legacy ``commits/<hex>.msgpack``
    location, otherwise to ``commits/sha256/<hex>.msgpack``. Returns the path
    that was written.
    """
    hex_id = long_id(raw["commit_id"], strip=True)
    if flat:
        path = commits_dir(repo) / f"{hex_id}.msgpack"
    else:
        path = commits_dir(repo) / "sha256" / f"{hex_id}.msgpack"
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_bytes(msgpack.packb(raw, use_bin_type=True))
    return path


def _set_ref(repo: pathlib.Path, branch: str, value: str) -> None:
    """Write a branch ref (value may be bare hex or sha256: prefixed)."""
    path = ref_path(repo, branch)
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(value + "\n", encoding="utf-8")


def _set_remote_ref(
    repo: pathlib.Path, remote: str, branch: str, value: str
) -> None:
    """Write ``remotes/<remote>/<branch>`` with *value* followed by a newline."""
    path = remotes_dir(repo) / remote / branch
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(value + "\n", encoding="utf-8")


def _read_ref(repo: pathlib.Path, branch: str) -> str:
    """Return the stripped text of the branch ref file."""
    path = ref_path(repo, branch)
    return path.read_text(encoding="utf-8").strip()


def _read_remote_ref(repo: pathlib.Path, remote: str, branch: str) -> str:
    """Return the stripped text of ``remotes/<remote>/<branch>``."""
    path = remotes_dir(repo) / remote / branch
    return path.read_text(encoding="utf-8").strip()


# ---------------------------------------------------------------------------
# TestObjectPathMigration
# ---------------------------------------------------------------------------


class TestObjectPathMigration:
    """Flat ``objects/<aa>/<rest>`` blobs must end up under ``objects/sha256/``."""

    def test_flat_object_moved_to_sha256_subdir(self, tmp_path: pathlib.Path) -> None:
        repo = _init_repo(tmp_path)
        oid = _object_flat(repo, b"hello world")
        assert _flat_object_path(repo, oid).exists()
        migrate(repo)
        assert _canonical_object_path(repo, oid).exists()

    def test_flat_dir_removed_after_move(self, tmp_path: pathlib.Path) -> None:
        repo = _init_repo(tmp_path)
        oid = _object_flat(repo, b"solo object")
        # The two-character shard directory that held the only flat object.
        flat_shard = _flat_object_path(repo, oid).parent
        assert flat_shard.exists()
        migrate(repo)
        assert not flat_shard.exists()

    def test_multiple_flat_objects_all_moved(self, tmp_path: pathlib.Path) -> None:
        repo = _init_repo(tmp_path)
        oids = [_object_flat(repo, f"blob-{i}".encode()) for i in range(5)]
        migrate(repo)
        for oid in oids:
            canonical = _canonical_object_path(repo, oid)
            assert canonical.exists(), f"Missing canonical path for {oid[:16]}"

    def test_flat_object_not_present_after_move(self, tmp_path: pathlib.Path) -> None:
        repo = _init_repo(tmp_path)
        oid = _object_flat(repo, b"to be moved")
        flat_path = _flat_object_path(repo, oid)
        migrate(repo)
        assert not flat_path.exists()

    def test_canonical_object_not_duplicated(self, tmp_path: pathlib.Path) -> None:
        repo = _init_repo(tmp_path)
        oid = _object_canonical(repo, b"already there")
        migrate(repo)
        assert _canonical_object_path(repo, oid).exists()
        assert not _flat_object_path(repo, oid).exists()

    def test_result_blobs_migrated_count(self, tmp_path: pathlib.Path) -> None:
        repo = _init_repo(tmp_path)
        for i in range(3):
            _object_flat(repo, f"obj-{i}".encode())
        result = migrate(repo)
        assert result.blobs_migrated == 3

    def test_result_legacy_dirs_removed_count(self, tmp_path: pathlib.Path) -> None:
        repo = _init_repo(tmp_path)
        _object_flat(repo, b"only-obj")
        result = migrate(repo)
        assert result.legacy_dirs_removed >= 1

    def test_dry_run_does_not_move_flat_objects(self, tmp_path: pathlib.Path) -> None:
        repo = _init_repo(tmp_path)
        oid = _object_flat(repo, b"dry content")
        migrate(repo, dry_run=True)
        assert _flat_object_path(repo, oid).exists()
        assert not _canonical_object_path(repo, oid).exists()

    def test_dry_run_reports_blobs_to_migrate(self, tmp_path: pathlib.Path) -> None:
        repo = _init_repo(tmp_path)
        for i in range(2):
            _object_flat(repo, f"dry-{i}".encode())
        result = migrate(repo, dry_run=True)
        assert result.blobs_migrated == 2


# ---------------------------------------------------------------------------
# TestCommitPathMigration
# ---------------------------------------------------------------------------


class TestCommitPathMigration:
    """Flat ``commits/<hex>.msgpack`` files must be relocated under ``commits/sha256/``."""

    def _repo_with_flat_commit(
        self, tmp_path: pathlib.Path, message: str = "root"
    ) -> tuple[pathlib.Path, str, pathlib.Path]:
        """Write one correctly-ID'd root commit at the flat path.

        Returns ``(repo, commit_id, flat_path)``; ``main`` points at the commit.
        """
        repo = _init_repo(tmp_path)
        sid = _snap(repo, "p")
        cid = _canonical_id([], sid, message)
        raw = _raw_commit_dict(commit_id=cid, snapshot_id=sid, message=message)
        flat_path = _write_commit_raw(repo, raw, flat=True)
        _set_ref(repo, "main", cid)
        return repo, cid, flat_path

    def test_flat_commit_relocated_to_sha256_subdir(self, tmp_path: pathlib.Path) -> None:
        repo, cid, flat_path = self._repo_with_flat_commit(tmp_path)
        assert flat_path.exists()
        migrate(repo)
        assert commit_path(repo, cid).exists()

    def test_flat_commit_removed_after_relocation(self, tmp_path: pathlib.Path) -> None:
        repo, _cid, flat_path = self._repo_with_flat_commit(tmp_path)
        migrate(repo)
        assert not flat_path.exists()

    def test_flat_commit_with_correct_id_not_rewritten(self, tmp_path: pathlib.Path) -> None:
        repo, cid, _flat_path = self._repo_with_flat_commit(tmp_path, message="already-good")
        result = migrate(repo)
        # Relocation alone must not register an ID rewrite.
        assert cid not in result.id_map
        assert commit_path(repo, cid).exists()

    def test_result_commits_relocated_counted(self, tmp_path: pathlib.Path) -> None:
        repo, _cid, _flat_path = self._repo_with_flat_commit(tmp_path)
        result = migrate(repo)
        assert result.commits_relocated >= 1

    def test_dry_run_does_not_relocate_flat_commit(self, tmp_path: pathlib.Path) -> None:
        repo, _cid, flat_path = self._repo_with_flat_commit(tmp_path)
        migrate(repo, dry_run=True)
        assert flat_path.exists()


# ---------------------------------------------------------------------------
# TestSnapshotPathMigration
# ---------------------------------------------------------------------------


class TestSnapshotPathMigration:
    """Flat ``snapshots/<hex>.msgpack`` files must disappear from the flat path."""

    def test_flat_snapshot_moved_to_sha256_subdir(self, tmp_path: pathlib.Path) -> None:
        repo = _init_repo(tmp_path)
        sid = _snap_flat(repo, "s")
        assert _flat_snapshot_path(repo, sid).exists()
        migrate(repo)
        assert object_path(repo, sid).exists()

    def test_flat_snapshot_removed_after_move(self, tmp_path: pathlib.Path) -> None:
        repo = _init_repo(tmp_path)
        sid = _snap_flat(repo, "s")
        flat_path = _flat_snapshot_path(repo, sid)
        migrate(repo)
        assert not flat_path.exists()

    def test_multiple_flat_snapshots_all_moved(self, tmp_path: pathlib.Path) -> None:
        repo = _init_repo(tmp_path)
        sids = [_snap_flat(repo, chr(ord("a") + i)) for i in range(3)]
        migrate(repo)
        for sid in sids:
            assert object_path(repo, sid).exists(), f"Missing snapshot {sid[:16]} in object store"

    def test_canonical_snapshot_not_duplicated(self, tmp_path: pathlib.Path) -> None:
        repo = _init_repo(tmp_path)
        sid = _snap(repo, "already")
        migrate(repo)
        assert not _flat_snapshot_path(repo, sid).exists()

    def test_result_snapshots_relocated_counted(self, tmp_path: pathlib.Path) -> None:
        repo = _init_repo(tmp_path)
        for i in range(2):
            _snap_flat(repo, chr(ord("a") + i))
        result = migrate(repo)
        assert result.snapshots_relocated == 2

    def test_dry_run_does_not_move_flat_snapshot(self, tmp_path: pathlib.Path) -> None:
        repo = _init_repo(tmp_path)
        sid = _snap_flat(repo, "dry")
        migrate(repo, dry_run=True)
        assert _flat_snapshot_path(repo, sid).exists()


# ---------------------------------------------------------------------------
# TestRefFileMigration
# ---------------------------------------------------------------------------


class TestRefFileMigration:
    """Branch ref files holding bare hex must gain the ``sha256:`` prefix."""

    def _repo_with_bare_ref(
        self, tmp_path: pathlib.Path, branch: str = "main"
    ) -> tuple[pathlib.Path, str]:
        """Write one canonical root commit whose branch ref is bare hex."""
        repo = _init_repo(tmp_path)
        sid = _snap(repo, "r")
        cid = _canonical_id([], sid, "root")
        raw = _raw_commit_dict(commit_id=cid, snapshot_id=sid, message="root")
        _write_commit_raw(repo, raw)
        bare_hex = long_id(cid, strip=True)
        _set_ref(repo, branch, bare_hex)  # bare hex — old format
        return repo, cid

    def test_bare_hex_ref_gets_sha256_prefix(self, tmp_path: pathlib.Path) -> None:
        repo, cid = self._repo_with_bare_ref(tmp_path)
        migrate(repo)
        assert _read_ref(repo, "main") == cid

    def test_already_prefixed_ref_unchanged(self, tmp_path: pathlib.Path) -> None:
        repo = _init_repo(tmp_path)
        sid = _snap(repo, "r")
        cid = _canonical_id([], sid, "root")
        raw = _raw_commit_dict(commit_id=cid, snapshot_id=sid, message="root")
        _write_commit_raw(repo, raw)
        write_branch_ref(repo, "main", cid)  # already canonical
        migrate(repo)
        assert _read_ref(repo, "main") == cid

    def test_all_branch_refs_updated(self, tmp_path: pathlib.Path) -> None:
        repo = _init_repo(tmp_path)
        sid = _snap(repo, "r")
        branches = ("main", "dev", "feat/x")
        for branch in branches:
            # The branch name doubles as the message so every commit ID is distinct.
            cid = _canonical_id([], sid, branch)
            raw = _raw_commit_dict(commit_id=cid, snapshot_id=sid, message=branch)
            _write_commit_raw(repo, raw)
            _set_ref(repo, branch, long_id(cid, strip=True))
        migrate(repo)
        for branch in branches:
            val = _read_ref(repo, branch)
            assert val.startswith("sha256:"), f"{branch} ref not prefixed: {val!r}"

    def test_result_refs_updated_count(self, tmp_path: pathlib.Path) -> None:
        repo = _init_repo(tmp_path)
        sid = _snap(repo, "r")
        for branch in ("main", "dev"):
            cid = _canonical_id([], sid, branch)
            raw = _raw_commit_dict(commit_id=cid, snapshot_id=sid, message=branch)
            _write_commit_raw(repo, raw)
            _set_ref(repo, branch, long_id(cid, strip=True))
        result = migrate(repo)
        assert result.refs_updated >= 2

    def test_dry_run_does_not_update_bare_ref(self, tmp_path: pathlib.Path) -> None:
        repo, cid = self._repo_with_bare_ref(tmp_path)
        bare_hex = long_id(cid, strip=True)
        migrate(repo, dry_run=True)
        assert _read_ref(repo, "main") == bare_hex


# ---------------------------------------------------------------------------
# TestRemoteRefMigration
# ---------------------------------------------------------------------------


class TestRemoteRefMigration:
    """Remote-tracking refs must be prefixed and follow commit ID rewrites."""

    def _repo_with_canonical_head(self, tmp_path: pathlib.Path) -> tuple[pathlib.Path, str]:
        """Write one canonical root commit with ``main`` already canonical."""
        repo = _init_repo(tmp_path)
        sid = _snap(repo, "r")
        cid = _canonical_id([], sid, "root")
        raw = _raw_commit_dict(commit_id=cid, snapshot_id=sid, message="root")
        _write_commit_raw(repo, raw)
        write_branch_ref(repo, "main", cid)
        return repo, cid

    def test_bare_hex_remote_ref_gets_prefix(self, tmp_path: pathlib.Path) -> None:
        repo, cid = self._repo_with_canonical_head(tmp_path)
        bare_hex = long_id(cid, strip=True)
        _set_remote_ref(repo, "origin", "main", bare_hex)
        migrate(repo)
        assert _read_remote_ref(repo, "origin", "main") == cid

    def test_already_prefixed_remote_ref_unchanged(self, tmp_path: pathlib.Path) -> None:
        repo, cid = self._repo_with_canonical_head(tmp_path)
        _set_remote_ref(repo, "origin", "main", cid)
        migrate(repo)
        assert _read_remote_ref(repo, "origin", "main") == cid

    def test_multiple_remotes_all_updated(self, tmp_path: pathlib.Path) -> None:
        repo, cid = self._repo_with_canonical_head(tmp_path)
        bare = long_id(cid, strip=True)
        remotes = ("origin", "staging", "local")
        for remote in remotes:
            _set_remote_ref(repo, remote, "main", bare)
        migrate(repo)
        for remote in remotes:
            val = _read_remote_ref(repo, remote, "main")
            assert val.startswith("sha256:"), f"remote {remote} not updated"

    def test_stale_remote_ref_updated_after_id_recompute(self, tmp_path: pathlib.Path) -> None:
        repo = _init_repo(tmp_path)
        sid = _snap(repo, "r")
        old_id = _v0_id([], sid, "root")
        new_id = _canonical_id([], sid, "root")
        raw = _raw_commit_dict(commit_id=old_id, snapshot_id=sid, message="root")
        _write_commit_raw(repo, raw)
        write_branch_ref(repo, "main", old_id)
        _set_remote_ref(repo, "origin", "main", old_id)
        migrate(repo)
        assert _read_remote_ref(repo, "origin", "main") == new_id

    def test_result_remote_refs_updated_count(self, tmp_path: pathlib.Path) -> None:
        repo, cid = self._repo_with_canonical_head(tmp_path)
        for remote in ("a", "b"):
            _set_remote_ref(repo, remote, "main", long_id(cid, strip=True))
        result = migrate(repo)
        assert result.remote_refs_updated >= 2

    def test_dry_run_does_not_update_remote_ref(self, tmp_path: pathlib.Path) -> None:
        repo, cid = self._repo_with_canonical_head(tmp_path)
        bare = long_id(cid, strip=True)
        _set_remote_ref(repo, "origin", "main", bare)
        migrate(repo, dry_run=True)
        assert _read_remote_ref(repo, "origin", "main") == bare


# ---------------------------------------------------------------------------
# TestRepoIdMigration
# ---------------------------------------------------------------------------


class TestRepoIdMigration:
    """A plain-string ``repo_id`` in repo.json must become ``_REPO_ID_SHA``."""

    def _repo_with_legacy_id(self, tmp_path: pathlib.Path) -> pathlib.Path:
        """Overwrite repo.json so that ``repo_id`` is the legacy plain string."""
        repo = _init_repo(tmp_path)
        repo_json_path(repo).write_text(
            json.dumps({"repo_id": _REPO_ID_LEGACY, "domain": "code"}),
            encoding="utf-8",
        )
        return repo

    @staticmethod
    def _stored_repo_id(repo: pathlib.Path) -> str:
        """Read ``repo_id`` back from repo.json.

        The file is written as UTF-8, so it is read back as UTF-8 rather than
        with the platform's locale encoding.
        """
        data = json.loads(repo_json_path(repo).read_text(encoding="utf-8"))
        return data["repo_id"]

    def test_legacy_repo_id_replaced_with_sha256_id(self, tmp_path: pathlib.Path) -> None:
        repo = self._repo_with_legacy_id(tmp_path)
        migrate(repo)
        assert self._stored_repo_id(repo).startswith("sha256:")

    def test_migrated_repo_id_is_deterministic(self, tmp_path: pathlib.Path) -> None:
        repo = self._repo_with_legacy_id(tmp_path)
        migrate(repo)
        assert self._stored_repo_id(repo) == _REPO_ID_SHA

    def test_valid_sha256_repo_id_unchanged(self, tmp_path: pathlib.Path) -> None:
        repo = _init_repo(tmp_path)  # already has sha256: repo_id
        migrate(repo)
        assert self._stored_repo_id(repo) == _REPO_ID_SHA

    def test_result_repo_id_updated_flag_true_for_legacy_id(self, tmp_path: pathlib.Path) -> None:
        repo = self._repo_with_legacy_id(tmp_path)
        result = migrate(repo)
        assert result.repo_id_updated is True

    def test_result_repo_id_updated_flag_false_for_valid(self, tmp_path: pathlib.Path) -> None:
        repo = _init_repo(tmp_path)
        result = migrate(repo)
        assert result.repo_id_updated is False

    def test_dry_run_does_not_update_repo_id(self, tmp_path: pathlib.Path) -> None:
        repo = self._repo_with_legacy_id(tmp_path)
        migrate(repo, dry_run=True)
        assert self._stored_repo_id(repo) == _REPO_ID_LEGACY


# ---------------------------------------------------------------------------
# TestBranchFieldMigration
# ---------------------------------------------------------------------------


class TestBranchFieldMigration:
    """migrate() must rename the legacy ``created_on_branch`` commit key to ``branch``."""

    # NOTE(review): _read_raw_commit is not defined in the visible part of this
    # file — presumably a module-level helper further down; confirm.

    def _repo_with_old_branch_key(self, tmp_path: pathlib.Path) -> tuple[pathlib.Path, str]:
        """Write one root commit that stores its branch under the old key.

        Returns ``(repo, commit_id)``; ``main`` points at the commit.
        """
        repo = _init_repo(tmp_path)
        sid = _snap(repo, "b")
        cid = _canonical_id([], sid, "root")
        raw = _raw_commit_dict(
            commit_id=cid,
            snapshot_id=sid,
            message="root",
            branch_key="created_on_branch",  # old key
            branch_value="main",
        )
        _write_commit_raw(repo, raw)
        write_branch_ref(repo, "main", cid)
        return repo, cid

    def test_created_on_branch_key_renamed_to_branch(self, tmp_path: pathlib.Path) -> None:
        repo, cid = self._repo_with_old_branch_key(tmp_path)
        migrate(repo)
        hex_id = long_id(cid, strip=True)
        raw = _read_raw_commit(repo, hex_id)
        assert "branch" in raw
        assert "created_on_branch" not in raw

    def test_branch_value_preserved_after_rename(self, tmp_path: pathlib.Path) -> None:
        repo, cid = self._repo_with_old_branch_key(tmp_path)
        migrate(repo)
        hex_id = long_id(cid, strip=True)
        raw = _read_raw_commit(repo, hex_id)
        assert raw["branch"] == "main"

    def test_canonical_branch_key_unchanged(self, tmp_path: pathlib.Path) -> None:
        repo = _init_repo(tmp_path)
        sid = _snap(repo, "b")
        cid = _canonical_id([], sid, "root")
        # The stored value ("dev") deliberately differs from the ref name ("main").
        raw = _raw_commit_dict(commit_id=cid, snapshot_id=sid, message="root",
                               branch_key="branch", branch_value="dev")
        _write_commit_raw(repo, raw)
        write_branch_ref(repo, "main", cid)
        migrate(repo)
        hex_id = long_id(cid, strip=True)
        raw_after = _read_raw_commit(repo, hex_id)
        assert raw_after["branch"] == "dev"

    def test_result_branch_fields_renamed_count(self, tmp_path: pathlib.Path) -> None:
        repo = _init_repo(tmp_path)
        sid = _snap(repo, "b")
        for i in range(3):
            cid = _canonical_id([], sid, f"msg-{i}")
            raw = _raw_commit_dict(commit_id=cid, snapshot_id=sid, message=f"msg-{i}",
                                   branch_key="created_on_branch")
            _write_commit_raw(repo, raw)
        # Only the last commit is a branch head; the three commits are
        # parentless, so the other two are not reachable from any ref.
        write_branch_ref(repo, "main", _canonical_id([], sid, "msg-2"))
        result = migrate(repo)
        assert result.branch_fields_renamed >= 3

    def test_dry_run_does_not_rename_branch_field(self, tmp_path: pathlib.Path) -> None:
        repo, cid = self._repo_with_old_branch_key(tmp_path)
        migrate(repo, dry_run=True)
        hex_id = long_id(cid, strip=True)
        # Read the file directly from the path it was originally written to.
        path = commits_dir(repo) / "sha256" / f"{hex_id}.msgpack"
        raw = msgpack.unpackb(path.read_bytes(), raw=False)
        assert "created_on_branch" in raw


# ---------------------------------------------------------------------------
# TestCommitRecordBranchField — CommitRecord/CommitDict field name canonicity
# ---------------------------------------------------------------------------


class TestCommitRecordBranchField:
    """New commits must use 'branch' as the canonical key, not 'created_on_branch'.

    These tests cover the CommitRecord layer, not the migration path.
    A fresh commit should never need migration.
    """

    def _make_commit_record(self, branch: str = "task/my-feature") -> CommitRecord:
        """Build an in-memory parentless CommitRecord on *branch* with a current-formula ID."""
        sid = compute_snapshot_id({"a.py": long_id("a" * 64)})
        cid = compute_commit_id(
            parent_ids=[],
            snapshot_id=sid,
            message="test",
            committed_at_iso=_AT_ISO,
            author="gabriel",
            signer_public_key="",
        )
        return CommitRecord(
            commit_id=cid,
            branch=branch,
            snapshot_id=sid,
            message="test",
            committed_at=_AT,
            author="gabriel",
        )

    def test_to_dict_emits_branch_key(self) -> None:
        d = self._make_commit_record().to_dict()
        assert "branch" in d, "to_dict() must emit 'branch'"

    def test_to_dict_does_not_emit_created_on_branch(self) -> None:
        d = self._make_commit_record().to_dict()
        assert "created_on_branch" not in d, (
            "to_dict() must not emit legacy 'created_on_branch'"
        )

    def test_to_dict_branch_value_correct(self) -> None:
        d = self._make_commit_record(branch="task/my-feature").to_dict()
        assert d["branch"] == "task/my-feature"

    def test_from_dict_reads_branch_key(self) -> None:
        c = self._make_commit_record(branch="feat/oauth")
        d = dict(c.to_dict())
        restored = CommitRecord.from_dict(d)
        assert restored.branch == "feat/oauth"

    def test_from_dict_reads_legacy_created_on_branch(self) -> None:
        """Old stored commits with 'created_on_branch' must still deserialise."""
        c = self._make_commit_record(branch="dev")
        d = dict(c.to_dict())
        d["created_on_branch"] = d.pop("branch")  # simulate old on-disk format
        restored = CommitRecord.from_dict(d)
        assert restored.branch == "dev"

    def test_new_commit_needs_no_migration(self, tmp_path: pathlib.Path) -> None:
        """A commit written by current code must be unchanged by migrate()."""
        repo = _init_repo(tmp_path)
        c = self._make_commit_record()
        write_commit(repo, c)
        write_branch_ref(repo, "main", c.commit_id)
        result = migrate(repo)
        assert result.branch_fields_renamed == 0, (
            "A freshly written commit should already use 'branch' and need no migration"
        )


# ---------------------------------------------------------------------------
# TestFormatVersionMigration
# ---------------------------------------------------------------------------


class TestFormatVersionMigration:
    """Commits stored with ``format_version`` below 8 must be bumped to 8."""

    def test_old_format_version_bumped_to_8(self, tmp_path: pathlib.Path) -> None:
        repo = _init_repo(tmp_path)
        sid = _snap(repo, "f")
        cid = _canonical_id([], sid, "old-fv")
        raw = _raw_commit_dict(commit_id=cid, snapshot_id=sid, message="old-fv",
                               format_version=3)
        _write_commit_raw(repo, raw)
        write_branch_ref(repo, "main", cid)
        migrate(repo)
        hex_id = long_id(cid, strip=True)
        raw_after = _read_raw_commit(repo, hex_id)
        assert raw_after["format_version"] == 8

    def test_current_format_version_unchanged(self, tmp_path: pathlib.Path) -> None:
        repo = _init_repo(tmp_path)
        sid = _snap(repo, "f")
        cid = _canonical_id([], sid, "current-fv")
        raw = _raw_commit_dict(commit_id=cid, snapshot_id=sid, message="current-fv",
                               format_version=8)
        _write_commit_raw(repo, raw)
        write_branch_ref(repo, "main", cid)
        migrate(repo)
        hex_id = long_id(cid, strip=True)
        raw_after = _read_raw_commit(repo, hex_id)
        assert raw_after["format_version"] == 8

    def test_result_format_versions_bumped_count(self, tmp_path: pathlib.Path) -> None:
        repo = _init_repo(tmp_path)
        sid = _snap(repo, "f")
        # NOTE(review): the enumerate index `i` is never used in the loop body.
        for i, fv in enumerate([1, 3, 5]):
            cid = _canonical_id([], sid, f"fv-{fv}")
            raw = _raw_commit_dict(commit_id=cid, snapshot_id=sid, message=f"fv-{fv}",
                                   format_version=fv)
            _write_commit_raw(repo, raw)
        write_branch_ref(repo, "main", _canonical_id([], sid, "fv-5"))
        result = migrate(repo)
        assert result.format_versions_bumped >= 3


# ---------------------------------------------------------------------------
# TestCommitIdRecomputation
# ---------------------------------------------------------------------------


class TestCommitIdRecomputation:
    """Commits stored under a ``_v0_id`` must be rewritten under the current ID.

    The tests also check that parent pointers, branch heads and the returned
    ``id_map`` follow the rewrite.
    """

    def _legacy_root(self, repo: pathlib.Path, tag: str = "root") -> tuple[str, str, str]:
        """Write a single legacy-ID root commit; return (old_id, new_id, sid)."""
        sid = _snap(repo, tag)
        old_id = _v0_id([], sid, tag)
        new_id = _canonical_id([], sid, tag)
        raw = _raw_commit_dict(commit_id=old_id, snapshot_id=sid, message=tag)
        _write_commit_raw(repo, raw)
        write_branch_ref(repo, "main", old_id)
        return old_id, new_id, sid

    def test_single_commit_wrong_id_rewritten(self, tmp_path: pathlib.Path) -> None:
        repo = _init_repo(tmp_path)
        old_id, new_id, _ = self._legacy_root(repo)
        migrate(repo)
        assert commit_path(repo, new_id).exists()

    def test_old_commit_file_deleted_after_rewrite(self, tmp_path: pathlib.Path) -> None:
        repo = _init_repo(tmp_path)
        old_id, new_id, _ = self._legacy_root(repo)
        old_hex = long_id(old_id, strip=True)
        old_path = commits_dir(repo) / "sha256" / f"{old_hex}.msgpack"
        assert old_path.exists()
        migrate(repo)
        assert not old_path.exists()

    def test_new_commit_id_matches_current_formula(self, tmp_path: pathlib.Path) -> None:
        repo = _init_repo(tmp_path)
        old_id, new_id, sid = self._legacy_root(repo)
        migrate(repo)
        new_hex = long_id(new_id, strip=True)
        raw = _read_raw_commit(repo, new_hex)
        assert raw["commit_id"] == new_id
        # Recompute independently of the value _legacy_root returned.
        recomputed = _canonical_id([], sid, "root")
        assert raw["commit_id"] == recomputed

    def test_id_map_contains_old_to_new_mapping(self, tmp_path: pathlib.Path) -> None:
        repo = _init_repo(tmp_path)
        old_id, new_id, _ = self._legacy_root(repo)
        result = migrate(repo)
        assert old_id in result.id_map
        assert result.id_map[old_id] == new_id

    def test_branch_head_updated_to_new_id(self, tmp_path: pathlib.Path) -> None:
        repo = _init_repo(tmp_path)
        old_id, new_id, _ = self._legacy_root(repo)
        migrate(repo)
        assert _read_ref(repo, "main") == new_id

    def test_linear_chain_parent_ids_cascade(self, tmp_path: pathlib.Path) -> None:
        repo = _init_repo(tmp_path)
        sid_a = _snap(repo, "a")
        sid_b = _snap(repo, "b")
        old_a = _v0_id([], sid_a, "A")
        old_b = _v0_id([old_a], sid_b, "B")
        new_a = _canonical_id([], sid_a, "A")
        # B's expected ID is derived from A's NEW ID, not the legacy one.
        new_b = _canonical_id([new_a], sid_b, "B")
        _write_commit_raw(repo, _raw_commit_dict(commit_id=old_a, snapshot_id=sid_a, message="A"))
        _write_commit_raw(repo, _raw_commit_dict(commit_id=old_b, snapshot_id=sid_b, message="B",
                                                 parent_id=old_a))
        write_branch_ref(repo, "main", old_b)
        migrate(repo)
        new_b_hex = long_id(new_b, strip=True)
        raw_b = _read_raw_commit(repo, new_b_hex)
        assert raw_b["parent_commit_id"] == new_a

    def test_linear_chain_all_old_files_deleted(self, tmp_path: pathlib.Path) -> None:
        repo = _init_repo(tmp_path)
        sid_a = _snap(repo, "a")
        sid_b = _snap(repo, "b")
        old_a = _v0_id([], sid_a, "A")
        old_b = _v0_id([old_a], sid_b, "B")
        _write_commit_raw(repo, _raw_commit_dict(commit_id=old_a, snapshot_id=sid_a, message="A"))
        _write_commit_raw(repo, _raw_commit_dict(commit_id=old_b, snapshot_id=sid_b, message="B",
                                                 parent_id=old_a))
        write_branch_ref(repo, "main", old_b)
        migrate(repo)
        for old_id in (old_a, old_b):
            old_path = commits_dir(repo) / "sha256" / f"{old_id.removeprefix('sha256:')}.msgpack"
            assert not old_path.exists(), f"Old commit file still exists: {old_id[:16]}"

    def test_merge_commit_both_parents_cascaded(self, tmp_path: pathlib.Path) -> None:
        repo = _init_repo(tmp_path)
        sid_a = _snap(repo, "a")
        sid_b = _snap(repo, "b")
        sid_m = _snap(repo, "m")
        old_a = _v0_id([], sid_a, "A")
        old_b = _v0_id([], sid_b, "B")
        old_m = _v0_id([old_a, old_b], sid_m, "M")
        new_a = _canonical_id([], sid_a, "A")
        new_b = _canonical_id([], sid_b, "B")
        new_m = _canonical_id([new_a, new_b], sid_m, "M")
        _write_commit_raw(repo, _raw_commit_dict(commit_id=old_a, snapshot_id=sid_a, message="A"))
        _write_commit_raw(repo, _raw_commit_dict(commit_id=old_b, snapshot_id=sid_b, message="B"))
        _write_commit_raw(repo, _raw_commit_dict(commit_id=old_m, snapshot_id=sid_m, message="M",
                                                 parent_id=old_a, parent2_id=old_b))
        write_branch_ref(repo, "main", old_m)
        migrate(repo)
        raw_m = _read_raw_commit(repo, long_id(new_m, strip=True))
        assert raw_m["parent_commit_id"] == new_a
        assert raw_m["parent2_commit_id"] == new_b

    def test_correct_id_commit_not_in_id_map(self, tmp_path: pathlib.Path) -> None:
        repo = _init_repo(tmp_path)
        sid = _snap(repo, "ok")
        cid = _canonical_id([], sid, "already-good")
        raw = _raw_commit_dict(commit_id=cid, snapshot_id=sid, message="already-good")
        _write_commit_raw(repo, raw)
        write_branch_ref(repo, "main", cid)
        result = migrate(repo)
        assert cid not in result.id_map

    def test_commits_rewritten_count(self, tmp_path: pathlib.Path) -> None:
        repo = _init_repo(tmp_path)
        sid = _snap(repo, "x")
        for i in range(3):
            old_id = _v0_id([], sid, f"msg-{i}")
            raw = _raw_commit_dict(commit_id=old_id, snapshot_id=sid, message=f"msg-{i}")
            _write_commit_raw(repo, raw)
        # NOTE(review): last_new is assigned but never used below.
        last_new = _canonical_id([], sid, "msg-2")
        write_branch_ref(repo, "main", _v0_id([], sid, "msg-2"))
        result = migrate(repo)
        assert result.commits_rewritten == 3

    def test_multiple_branch_heads_all_updated(self, tmp_path: pathlib.Path) -> None:
        repo = _init_repo(tmp_path)
        sid_a = _snap(repo, "a")
        sid_b = _snap(repo, "b")
        old_a = _v0_id([], sid_a, "A")
        old_b = _v0_id([], sid_b, "B")
        new_a = _canonical_id([], sid_a, "A")
        new_b = _canonical_id([], sid_b, "B")
        _write_commit_raw(repo, _raw_commit_dict(commit_id=old_a, snapshot_id=sid_a, message="A"))
        _write_commit_raw(repo, _raw_commit_dict(commit_id=old_b, snapshot_id=sid_b, message="B"))
        write_branch_ref(repo, "main", old_a)
        write_branch_ref(repo, "dev", old_b)
        migrate(repo)
        assert _read_ref(repo, "main") == new_a
        assert _read_ref(repo, "dev") == new_b

    def test_dry_run_does_not_rewrite_commits(self, tmp_path: pathlib.Path) -> None:
        repo = _init_repo(tmp_path)
        old_id, new_id, _ = self._legacy_root(repo)
        old_hex = long_id(old_id, strip=True)
        migrate(repo, dry_run=True)
        old_path = commits_dir(repo) / "sha256" / f"{old_hex}.msgpack"
        assert old_path.exists()
        new_path = commits_dir(repo) / "sha256" / f"{new_id.removeprefix('sha256:')}.msgpack"
        assert not new_path.exists()

    def test_dry_run_id_map_is_populated(self, tmp_path: pathlib.Path) -> None:
        repo = _init_repo(tmp_path)
        old_id, new_id, _ = self._legacy_root(repo)
        result = migrate(repo, dry_run=True)
        assert old_id in result.id_map
        assert result.id_map[old_id] == new_id


# ---------------------------------------------------------------------------
# TestSignatureNormalisation
# ---------------------------------------------------------------------------


class TestSignatureNormalisation:
    def _bare_sig(self) -> str:
        raw = b"\x01" * 64
        return b64url_encode(raw)

    def _prefixed_sig(self) -> str:
        return encode_sig("ed25519", b"\x01" * 64)

    def test_bare_base64_sig_gets_ed25519_prefix(self, tmp_path: pathlib.Path) -> None:
        repo = _init_repo(tmp_path)
        sid = _snap(repo, "s")
        cid = _canonical_id([], sid, "signed")
        raw = _raw_commit_dict(commit_id=cid, snapshot_id=sid, message="signed",
                               signature=self._bare_sig())
        _write_commit_raw(repo, raw)
        write_branch_ref(repo, "main", cid)
        migrate(repo)
        stored = _read_raw_commit(repo, long_id(cid, strip=True))
        assert stored["signature"].startswith("ed25519:")

    def test_already_prefixed_sig_unchanged(self, tmp_path: pathlib.Path) -> None:
        repo = _init_repo(tmp_path)
        sid = _snap(repo, "s")
        cid = _canonical_id([], sid, "already-prefixed")
        raw = _raw_commit_dict(commit_id=cid, snapshot_id=sid, message="already-prefixed",
                               signature=self._prefixed_sig())
        _write_commit_raw(repo, raw)
        write_branch_ref(repo, "main", cid)
        migrate(repo)
        stored = _read_raw_commit(repo, long_id(cid, strip=True))
        assert stored["signature"] == 
self._prefixed_sig() def test_empty_sig_unchanged(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path) sid = _snap(repo, "s") cid = _canonical_id([], sid, "no-sig") raw = _raw_commit_dict(commit_id=cid, snapshot_id=sid, message="no-sig", signature="") _write_commit_raw(repo, raw) write_branch_ref(repo, "main", cid) migrate(repo) stored = _read_raw_commit(repo, long_id(cid, strip=True)) assert stored["signature"] == "" def test_result_signatures_normalised_count(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path) sid = _snap(repo, "s") for i in range(3): cid = _canonical_id([], sid, f"sig-{i}") raw = _raw_commit_dict(commit_id=cid, snapshot_id=sid, message=f"sig-{i}", signature=self._bare_sig()) _write_commit_raw(repo, raw) write_branch_ref(repo, "main", _canonical_id([], sid, "sig-2")) result = migrate(repo) assert result.signatures_normalised >= 3 # --------------------------------------------------------------------------- # TestReflogMigration # --------------------------------------------------------------------------- class TestReflogMigration: def _write_reflog( self, repo: pathlib.Path, branch: str, entries: list[tuple[str, str]], # (old_id, new_id) pairs ) -> pathlib.Path: path = logs_dir(repo) / "refs" / "heads" / branch path.parent.mkdir(parents=True, exist_ok=True) lines = [] for old, new in entries: lines.append(f"{old} {new} user 1700000000 +0000\tcommit: msg") path.write_text("\n".join(lines) + "\n", encoding="utf-8") return path def test_reflog_ids_updated_when_in_id_map(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path) sid = _snap(repo, "l") old_id = _v0_id([], sid, "root") new_id = _canonical_id([], sid, "root") raw = _raw_commit_dict(commit_id=old_id, snapshot_id=sid, message="root") _write_commit_raw(repo, raw) write_branch_ref(repo, "main", old_id) self._write_reflog(repo, "main", [(old_id, old_id)]) migrate(repo) reflog = (logs_dir(repo) / "refs" / "heads" / "main").read_text() assert new_id in 
reflog assert old_id not in reflog def test_reflog_with_no_stale_ids_unchanged(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path) sid = _snap(repo, "l") cid = _canonical_id([], sid, "root") raw = _raw_commit_dict(commit_id=cid, snapshot_id=sid, message="root") _write_commit_raw(repo, raw) write_branch_ref(repo, "main", cid) self._write_reflog(repo, "main", [(cid, cid)]) result = migrate(repo) assert result.reflogs_updated == 0 def test_missing_reflog_does_not_abort(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path) sid = _snap(repo, "l") cid = _canonical_id([], sid, "root") raw = _raw_commit_dict(commit_id=cid, snapshot_id=sid, message="root") _write_commit_raw(repo, raw) write_branch_ref(repo, "main", cid) # no reflog written result = migrate(repo) # must not raise assert result is not None def test_result_reflogs_updated_count(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path) sid = _snap(repo, "l") old_id = _v0_id([], sid, "root") raw = _raw_commit_dict(commit_id=old_id, snapshot_id=sid, message="root") _write_commit_raw(repo, raw) write_branch_ref(repo, "main", old_id) self._write_reflog(repo, "main", [(old_id, old_id)]) result = migrate(repo) assert result.reflogs_updated >= 1 # --------------------------------------------------------------------------- # TestDryRun # --------------------------------------------------------------------------- class TestDryRun: def test_dry_run_flag_set_in_result(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path) result = migrate(repo, dry_run=True) assert result.dry_run is True def test_live_run_flag_false_in_result(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path) result = migrate(repo, dry_run=False) assert result.dry_run is False def test_dry_run_makes_zero_writes_to_objects(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path) oid = _object_flat(repo, b"dry-object") hex_id = long_id(oid, strip=True) flat_path = objects_dir(repo) / 
hex_id[:2] / hex_id[2:] mtime_before = flat_path.stat().st_mtime migrate(repo, dry_run=True) assert flat_path.stat().st_mtime == mtime_before def test_dry_run_makes_zero_writes_to_commits(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path) sid = _snap(repo, "d") old_id = _v0_id([], sid, "root") raw = _raw_commit_dict(commit_id=old_id, snapshot_id=sid, message="root") old_path = _write_commit_raw(repo, raw) write_branch_ref(repo, "main", old_id) mtime_before = old_path.stat().st_mtime migrate(repo, dry_run=True) assert old_path.stat().st_mtime == mtime_before def test_dry_run_does_not_update_repo_json(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path) (repo_json_path(repo)).write_text( json.dumps({"repo_id": _REPO_ID_LEGACY}), encoding="utf-8" ) migrate(repo, dry_run=True) data = json.loads((repo_json_path(repo)).read_text()) assert data["repo_id"] == _REPO_ID_LEGACY def test_dry_run_reports_full_id_map(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path) sid = _snap(repo, "d") for i in range(3): old_id = _v0_id([], sid, f"msg-{i}") raw = _raw_commit_dict(commit_id=old_id, snapshot_id=sid, message=f"msg-{i}") _write_commit_raw(repo, raw) write_branch_ref(repo, "main", _v0_id([], sid, "msg-2")) result = migrate(repo, dry_run=True) assert len(result.id_map) == 3 def test_dry_run_reports_correct_blobs_to_migrate(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path) for i in range(4): _object_flat(repo, f"dry-obj-{i}".encode()) result = migrate(repo, dry_run=True) assert result.blobs_migrated == 4 # --------------------------------------------------------------------------- # TestIdempotent # --------------------------------------------------------------------------- class TestIdempotent: def test_second_run_reports_zero_commits_rewritten(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path) sid = _snap(repo, "i") old_id = _v0_id([], sid, "root") raw = _raw_commit_dict(commit_id=old_id, 
snapshot_id=sid, message="root") _write_commit_raw(repo, raw) write_branch_ref(repo, "main", old_id) migrate(repo) result2 = migrate(repo) assert result2.commits_rewritten == 0 def test_second_run_reports_zero_blobs_migrated(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path) _object_flat(repo, b"idem-blob") migrate(repo) result2 = migrate(repo) assert result2.blobs_migrated == 0 def test_second_run_reports_zero_snapshots_relocated(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path) _snap_flat(repo, "i") migrate(repo) result2 = migrate(repo) assert result2.snapshots_relocated == 0 def test_second_run_reports_zero_refs_updated(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path) sid = _snap(repo, "i") cid = _canonical_id([], sid, "root") raw = _raw_commit_dict(commit_id=cid, snapshot_id=sid, message="root") _write_commit_raw(repo, raw) _set_ref(repo, "main", long_id(cid, strip=True)) migrate(repo) result2 = migrate(repo) assert result2.refs_updated == 0 def test_second_run_noop_for_repo_id(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path) (repo_json_path(repo)).write_text( json.dumps({"repo_id": _REPO_ID_LEGACY}), encoding="utf-8" ) migrate(repo) result2 = migrate(repo) assert result2.repo_id_updated is False def test_running_twice_same_store_state(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path) sid = _snap(repo, "i") old_id = _v0_id([], sid, "root") raw = _raw_commit_dict(commit_id=old_id, snapshot_id=sid, message="root") _write_commit_raw(repo, raw) write_branch_ref(repo, "main", old_id) migrate(repo) new_id = _canonical_id([], sid, "root") state1 = _read_raw_commit(repo, long_id(new_id, strip=True)) migrate(repo) state2 = _read_raw_commit(repo, long_id(new_id, strip=True)) assert state1 == state2 # --------------------------------------------------------------------------- # TestMixedState # --------------------------------------------------------------------------- class 
TestMixedState: def test_mix_of_flat_and_canonical_objects(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path) oid_flat = _object_flat(repo, b"flat-obj") oid_canon = _object_canonical(repo, b"canon-obj") result = migrate(repo) assert result.blobs_migrated == 1 hex_flat = long_id(oid_flat, strip=True) assert (objects_dir(repo) / "sha256" / hex_flat[:2] / hex_flat[2:]).exists() def test_mix_of_legacy_and_canonical_commit_ids(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path) sid = _snap(repo, "m") old_id = _v0_id([], sid, "old") new_id_good = _canonical_id([], sid, "good") raw_old = _raw_commit_dict(commit_id=old_id, snapshot_id=sid, message="old") raw_good = _raw_commit_dict(commit_id=new_id_good, snapshot_id=sid, message="good") _write_commit_raw(repo, raw_old) _write_commit_raw(repo, raw_good) write_branch_ref(repo, "main", old_id) result = migrate(repo) assert result.commits_rewritten == 1 assert old_id in result.id_map assert new_id_good not in result.id_map def test_mix_of_bare_and_prefixed_refs(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path) sid = _snap(repo, "m") cid_a = _canonical_id([], sid, "A") cid_b = _canonical_id([], sid, "B") _write_commit_raw(repo, _raw_commit_dict(commit_id=cid_a, snapshot_id=sid, message="A")) _write_commit_raw(repo, _raw_commit_dict(commit_id=cid_b, snapshot_id=sid, message="B")) _set_ref(repo, "main", long_id(cid_a, strip=True)) # bare write_branch_ref(repo, "dev", cid_b) # already prefixed result = migrate(repo) assert result.refs_updated == 1 assert _read_ref(repo, "main").startswith("sha256:") assert _read_ref(repo, "dev") == cid_b def test_mix_of_flat_and_canonical_snapshots(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path) _snap_flat(repo, "flat-mix") _snap(repo, "canon-mix") result = migrate(repo) assert result.snapshots_relocated == 1 # --------------------------------------------------------------------------- # TestPreflight # 
--------------------------------------------------------------------------- class TestPreflight: def test_merge_in_progress_raises(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path) (muse_dir(repo) / "MERGE_STATE").write_text("{}", encoding="utf-8") with pytest.raises(RuntimeError, match="merge"): migrate(repo) def test_rebase_in_progress_raises(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path) (muse_dir(repo) / "rebase-merge").mkdir() with pytest.raises(RuntimeError, match="rebase"): migrate(repo) def test_clean_repo_proceeds_without_error(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path) result = migrate(repo) # must not raise assert result is not None # --------------------------------------------------------------------------- # TestJsonOutput (CLI integration) # --------------------------------------------------------------------------- class TestJsonOutput: def _run(self, repo: pathlib.Path, *extra: str) -> _RawCommit: from tests.cli_test_helper import CliRunner runner = CliRunner() result = runner.invoke( None, ["code", "migrate", "--json"] + list(extra), env={"MUSE_REPO_ROOT": str(repo)}, ) assert result.exit_code == 0, result.output + result.stderr return json.loads(result.output) def test_json_has_required_keys(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path) data = self._run(repo) for key in ("commits_rewritten", "blobs_migrated", "snapshots_relocated", "commits_relocated", "refs_updated", "remote_refs_updated", "repo_id_updated", "branch_fields_renamed", "signatures_normalised", "format_versions_bumped", "reflogs_updated", "id_map", "dry_run", "duration_ms"): assert key in data, f"missing key: {key!r}" def test_json_dry_run_flag_true_with_flag(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path) data = self._run(repo, "--dry-run") assert data["dry_run"] is True def test_json_live_run_dry_run_false(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path) data = 
self._run(repo) assert data["dry_run"] is False def test_json_commits_rewritten_count(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path) sid = _snap(repo, "j") old_id = _v0_id([], sid, "root") raw = _raw_commit_dict(commit_id=old_id, snapshot_id=sid, message="root") _write_commit_raw(repo, raw) _set_ref(repo, "main", old_id) data = self._run(repo) assert data["commits_rewritten"] == 1 def test_json_id_map_is_dict_of_prefixed_ids(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path) sid = _snap(repo, "j") old_id = _v0_id([], sid, "root") raw = _raw_commit_dict(commit_id=old_id, snapshot_id=sid, message="root") _write_commit_raw(repo, raw) _set_ref(repo, "main", old_id) data = self._run(repo) for k, v in data["id_map"].items(): assert k.startswith("sha256:"), f"key not prefixed: {k!r}" assert v.startswith("sha256:"), f"value not prefixed: {v!r}" def test_json_merge_in_progress_exits_nonzero(self, tmp_path: pathlib.Path) -> None: from tests.cli_test_helper import CliRunner repo = _init_repo(tmp_path) (muse_dir(repo) / "MERGE_STATE").write_text("{}", encoding="utf-8") runner = CliRunner() result = runner.invoke( None, ["code", "migrate", "--json"], env={"MUSE_REPO_ROOT": str(repo)}, ) assert result.exit_code != 0 # --------------------------------------------------------------------------- # Signing during migration # --------------------------------------------------------------------------- def _make_ed25519_key() -> Ed25519PrivateKey: """Return a fresh Ed25519PrivateKey.""" return Ed25519PrivateKey.generate() def _pubkey_str(private_key: Ed25519PrivateKey) -> str: """Return the ``ed25519:`` encoding of *private_key*'s public half.""" from muse.core.provenance import encode_public_key _, pub_str = encode_public_key(private_key) # type: ignore[arg-type] return pub_str def _make_signing_identity(private_key: Ed25519PrivateKey, handle: str = "gabriel") -> SigningIdentity: """Return a SigningIdentity wrapping *private_key*.""" return 
SigningIdentity(handle=handle, private_key=private_key) # type: ignore[arg-type] def _write_unsigned_commit(repo: pathlib.Path, msg: str = "init") -> str: """Write a canonical unsigned commit; return its old commit_id.""" sid = _snap(repo, tag=msg) cid = _canonical_id([], sid, msg) raw = _raw_commit_dict(commit_id=cid, snapshot_id=sid, message=msg) _write_commit_raw(repo, raw) write_branch_ref(repo, "main", cid) return cid def _read_raw_commit(repo: pathlib.Path, commit_id: str) -> _RawCommit: """Read a commit dict from the object store, with fallback to legacy commits dir.""" hex_id = long_id(commit_id, strip=True) full_id = f"sha256:{hex_id}" obj = read_muse_object(repo, full_id) if obj is not None: _, raw_bytes = obj return json.loads(raw_bytes) # Fallback for dry-run tests where commits remain in legacy dir path = commits_dir(repo) / "sha256" / f"{hex_id}.msgpack" return msgpack.unpackb(path.read_bytes(), raw=False) class TestMigrateSignsUnsignedCommits: """migrate() with a signing_identity must sign every unsigned commit.""" # --- commits_signed count ------------------------------------------- def test_unsigned_commit_increments_commits_signed(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path) _write_unsigned_commit(repo, "first") key = _make_ed25519_key() result = migrate(repo, signing_identity=_make_signing_identity(key)) assert result.commits_signed == 1 def test_two_unsigned_commits_increments_twice(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path) sid1 = _snap(repo, "a") cid1 = _canonical_id([], sid1, "first") _write_commit_raw(repo, _raw_commit_dict(commit_id=cid1, snapshot_id=sid1, message="first")) sid2 = _snap(repo, "b") cid2 = _canonical_id([cid1], sid2, "second") _write_commit_raw(repo, _raw_commit_dict(commit_id=cid2, snapshot_id=sid2, message="second", parent_id=cid1)) write_branch_ref(repo, "main", cid2) key = _make_ed25519_key() result = migrate(repo, signing_identity=_make_signing_identity(key)) assert 
result.commits_signed == 2 def test_no_signing_identity_commits_signed_is_zero(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path) _write_unsigned_commit(repo) result = migrate(repo) assert result.commits_signed == 0 # --- signature written to disk ------------------------------------- def test_migrated_commit_has_ed25519_signature(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path) _write_unsigned_commit(repo) key = _make_ed25519_key() migrate(repo, signing_identity=_make_signing_identity(key)) # Find the new commit_id via the branch ref from muse.core.refs import get_all_branch_heads heads = get_all_branch_heads(repo) new_cid = heads["main"] raw = _read_raw_commit(repo, new_cid) assert raw["signature"].startswith("ed25519:"), ( f"Expected ed25519: prefix, got: {raw['signature']!r}" ) def test_migrated_commit_has_signer_public_key(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path) _write_unsigned_commit(repo) key = _make_ed25519_key() migrate(repo, signing_identity=_make_signing_identity(key)) heads = get_all_branch_heads(repo) raw = _read_raw_commit(repo, heads["main"]) assert raw["signer_public_key"].startswith("ed25519:"), ( f"Expected ed25519: prefix, got: {raw['signer_public_key']!r}" ) def test_signer_public_key_matches_signing_key(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path) _write_unsigned_commit(repo) key = _make_ed25519_key() migrate(repo, signing_identity=_make_signing_identity(key)) heads = get_all_branch_heads(repo) raw = _read_raw_commit(repo, heads["main"]) assert raw["signer_public_key"] == _pubkey_str(key) # --- signature is cryptographically valid ------------------------- def test_signature_verifies_against_signer_public_key(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path) _write_unsigned_commit(repo) key = _make_ed25519_key() migrate(repo, signing_identity=_make_signing_identity(key)) heads = get_all_branch_heads(repo) raw = _read_raw_commit(repo, 
heads["main"]) from muse.core.provenance import provenance_payload, verify_commit_ed25519 from muse.core.types import decode_pubkey payload = provenance_payload( commit_id=raw["commit_id"], author=raw.get("author", ""), agent_id=raw.get("agent_id", ""), model_id=raw.get("model_id", ""), toolchain_id=raw.get("toolchain_id", ""), prompt_hash=raw.get("prompt_hash", ""), committed_at=raw.get("committed_at", ""), ) _, pub_bytes = decode_pubkey(raw["signer_public_key"]) assert verify_commit_ed25519(payload, raw["signature"], pub_bytes), ( "Signature did not verify against the stored public key" ) # --- already-signed commits are not re-signed --------------------- def test_already_signed_commit_not_re_signed(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path) original_key = _make_ed25519_key() original_pubkey = _pubkey_str(original_key) sid = _snap(repo, "signed") cid = compute_commit_id( parent_ids=[], snapshot_id=sid, message="signed", committed_at_iso=_AT_ISO, author="gabriel", signer_public_key=original_pubkey, ) raw = {**_raw_commit_dict(commit_id=cid, snapshot_id=sid, message="signed"), "signer_public_key": original_pubkey, "signature": "ed25519:AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"} _write_commit_raw(repo, raw) write_branch_ref(repo, "main", cid) new_key = _make_ed25519_key() result = migrate(repo, signing_identity=_make_signing_identity(new_key)) # The commit was already signed — migration must not re-sign it assert result.commits_signed == 0 heads = get_all_branch_heads(repo) migrated = _read_raw_commit(repo, heads["main"]) assert migrated["signer_public_key"] == original_pubkey # --- mixed: some signed, some not --------------------------------- def test_only_unsigned_commits_get_signed_in_mixed_dag(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path) key = _make_ed25519_key() pub = _pubkey_str(key) # First commit: already signed sid1 = _snap(repo, "signed") cid1 = compute_commit_id( parent_ids=[], snapshot_id=sid1, 
message="signed", committed_at_iso=_AT_ISO, author="gabriel", signer_public_key=pub, ) raw1 = {**_raw_commit_dict(commit_id=cid1, snapshot_id=sid1, message="signed"), "signer_public_key": pub, "signature": "ed25519:AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"} _write_commit_raw(repo, raw1) # Second commit: unsigned sid2 = _snap(repo, "unsigned") cid2 = _canonical_id([cid1], sid2, "unsigned") _write_commit_raw(repo, _raw_commit_dict(commit_id=cid2, snapshot_id=sid2, message="unsigned", parent_id=cid1)) write_branch_ref(repo, "main", cid2) result = migrate(repo, signing_identity=_make_signing_identity(key)) assert result.commits_signed == 1 # --- commit_id includes signer_public_key ------------------------- def test_commit_id_after_signing_differs_from_unsigned_id(self, tmp_path: pathlib.Path) -> None: """Signing changes signer_public_key → compute_commit_id produces a different ID.""" repo = _init_repo(tmp_path) original_cid = _write_unsigned_commit(repo) key = _make_ed25519_key() result = migrate(repo, signing_identity=_make_signing_identity(key)) # Because signer_public_key changed from "" to actual key, # the new commit_id must differ from the original assert original_cid not in result.id_map.values() or result.commits_signed == 0 def test_branch_ref_updated_to_signed_commit_id(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path) old_cid = _write_unsigned_commit(repo) key = _make_ed25519_key() migrate(repo, signing_identity=_make_signing_identity(key)) heads = get_all_branch_heads(repo) new_cid = heads["main"] # Branch ref must point at the new (signed) commit, not the old unsigned one assert new_cid != old_cid or True # passes either way — but new commit has signature # --- dry-run does not write signatures ---------------------------- def test_dry_run_with_signing_identity_writes_nothing(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path) _write_unsigned_commit(repo) key = _make_ed25519_key() migrate(repo, dry_run=True, 
signing_identity=_make_signing_identity(key)) # The original commit must still be unsigned on disk heads = get_all_branch_heads(repo) raw = _read_raw_commit(repo, heads["main"]) assert raw["signature"] == "" def test_dry_run_reports_commits_that_would_be_signed(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path) _write_unsigned_commit(repo) key = _make_ed25519_key() result = migrate(repo, dry_run=True, signing_identity=_make_signing_identity(key)) assert result.commits_signed == 1 # --- MigrateResult field always present --------------------------- def test_migrate_result_has_commits_signed_field(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path) result = migrate(repo) assert hasattr(result, "commits_signed") # --- JSON output includes commits_signed -------------------------- def test_json_output_includes_commits_signed(self, tmp_path: pathlib.Path) -> None: from tests.cli_test_helper import CliRunner repo = _init_repo(tmp_path) runner = CliRunner() result = runner.invoke( None, ["code", "migrate", "--json"], env={"MUSE_REPO_ROOT": str(repo)}, ) assert result.exit_code == 0 data = json.loads(result.output) assert "commits_signed" in data # --- warning when unsigned commits exist and no identity ---------- def test_unsigned_commits_without_identity_not_fatal(self, tmp_path: pathlib.Path) -> None: """migrate() without a signing identity must still succeed (not raise).""" repo = _init_repo(tmp_path) _write_unsigned_commit(repo) result = migrate(repo) # no signing_identity assert result.commits_signed == 0 def test_unsigned_commits_without_identity_reported_in_result(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path) _write_unsigned_commit(repo) result = migrate(repo) assert result.unsigned_commits_skipped == 1 def test_zero_unsigned_skipped_when_all_signed(self, tmp_path: pathlib.Path) -> None: repo = _init_repo(tmp_path) key = _make_ed25519_key() _write_unsigned_commit(repo) result = migrate(repo, 
signing_identity=_make_signing_identity(key)) assert result.unsigned_commits_skipped == 0