"""TDD — Phase 7: migrate old object store to Git-idiomatic muse format. All three ID formulas changed in Phase 2: old hash_blob(data) = sha256(data) new hash_blob(data) = sha256("blob \\0") old hash_snapshot(manifest) = sha256(canonical) new hash_snapshot(manifest) = sha256("snapshot \\0") old hash_commit(...) = sha256(canonical) new hash_commit(...) = sha256("commit \\0") The migration is a full DAG rewrite in three passes: Pass 1 — blobs: old raw bytes → muse-format at new path; build old→new map Pass 2 — snapshots: update manifest blob IDs, recompute snapshot ID; build map Pass 3 — commits: update snapshot_id, recompute commit ID; update refs Every pass is non-destructive: old files are never deleted. """ from __future__ import annotations import hashlib import json import pathlib import msgpack import pytest import msgpack from muse.core.ids import hash_blob, hash_snapshot, hash_commit from muse.core.object_store import object_path, objects_dir, read_muse_object from muse.core.paths import commits_dir, snapshots_dir from muse.core.types import long_id _Manifest = dict[str, str] # path → object_id # --------------------------------------------------------------------------- # Helpers — produce old-formula IDs the same way pre-Phase-2 code did # --------------------------------------------------------------------------- def _old_blob_id(data: bytes) -> str: """sha256(data) — the pre-Phase-2 blob ID formula.""" return long_id(hashlib.sha256(data).hexdigest()) def _write_old_blob(repo: pathlib.Path, data: bytes) -> str: """Write a raw blob using the old formula; return its old object_id.""" old_id = _old_blob_id(data) path = object_path(repo, old_id) path.parent.mkdir(parents=True, exist_ok=True) path.write_bytes(data) return old_id # --------------------------------------------------------------------------- # Fixtures # --------------------------------------------------------------------------- BLOB_A = b"# Hello\n" BLOB_B = b"print('world')\n" @pytest.fixture def repo(tmp_path: pathlib.Path) -> pathlib.Path: objects_dir(tmp_path).mkdir(parents=True, exist_ok=True) return tmp_path # --------------------------------------------------------------------------- # Pass 1 — blob migration # --------------------------------------------------------------------------- def test_migrate_blob_ids_writes_muse_format_copy(repo: pathlib.Path) -> None: """A raw old-format blob gets a muse-format copy at the new hash_blob path.""" from muse.core.migrate import migrate_blob_ids old_id = _write_old_blob(repo, BLOB_A) result = migrate_blob_ids(repo, dry_run=False) new_id = hash_blob(BLOB_A) new_path = object_path(repo, new_id) assert new_path.exists(), "muse-format blob was not written" assert new_path.read_bytes() == b"blob 8\0" + BLOB_A assert result.id_map[old_id] == new_id def test_migrate_blob_ids_dry_run_writes_nothing(repo: pathlib.Path) -> None: """dry_run=True populates id_map but writes no files.""" from muse.core.migrate import migrate_blob_ids old_id = _write_old_blob(repo, BLOB_A) result = migrate_blob_ids(repo, dry_run=True) new_id = hash_blob(BLOB_A) new_path = object_path(repo, new_id) assert not new_path.exists(), "dry_run must not write any files" assert result.id_map[old_id] == new_id assert result.blobs_written == 1 def test_migrate_blob_ids_skips_already_muse_format(repo: pathlib.Path) -> None: """A blob already in muse format maps to itself and is not rewritten.""" from muse.core.migrate import migrate_blob_ids from muse.core.object_store import write_muse_object object_id = write_muse_object(repo, "blob", BLOB_A) result = migrate_blob_ids(repo, dry_run=False) assert result.id_map[object_id] == object_id assert result.blobs_written == 0 assert result.blobs_skipped == 1 def test_migrate_blob_ids_multiple_blobs(repo: pathlib.Path) -> None: """All old-format blobs are migrated; id_map covers every one.""" from muse.core.migrate import migrate_blob_ids old_a = _write_old_blob(repo, BLOB_A) old_b = _write_old_blob(repo, BLOB_B) result = migrate_blob_ids(repo, dry_run=False) assert result.id_map[old_a] == hash_blob(BLOB_A) assert result.id_map[old_b] == hash_blob(BLOB_B) assert result.blobs_written == 2 # --------------------------------------------------------------------------- # Pass 2 — snapshot migration helpers # --------------------------------------------------------------------------- def _old_snapshot_id(manifest: _Manifest) -> str: """sha256(canonical) without type prefix — the pre-Phase-2 formula.""" from muse.core.types import split_id _SEP = "\x00" parts = sorted(f"{path}{_SEP}{split_id(oid)[1]}" for path, oid in manifest.items()) canonical = _SEP.join(parts).encode() return long_id(hashlib.sha256(canonical).hexdigest()) def _write_old_snapshot( repo: pathlib.Path, manifest: _Manifest, created_at: str = "2026-05-20T16:00:00+00:00", ) -> str: """Write an old-format snapshot msgpack; return its old snapshot_id.""" old_id = _old_snapshot_id(manifest) _, hex_id = old_id.split(":", 1) snap_dir = snapshots_dir(repo) / "sha256" snap_dir.mkdir(parents=True, exist_ok=True) path = snap_dir / f"{hex_id}.msgpack" record = { "schema_version": 1, "snapshot_id": old_id, "manifest": manifest, "directories": [], "created_at": created_at, "note": "", } path.write_bytes(msgpack.packb(record, use_bin_type=True)) return old_id # --------------------------------------------------------------------------- # Pass 2 — snapshot migration tests # --------------------------------------------------------------------------- def test_migrate_snapshot_ids_writes_to_object_store(repo: pathlib.Path) -> None: """Migrated snapshot appears in the unified object store in muse format.""" from muse.core.migrate import migrate_blob_ids, migrate_snapshot_ids old_blob_id = _write_old_blob(repo, BLOB_A) blob_result = migrate_blob_ids(repo, dry_run=False) old_snap_id = _write_old_snapshot(repo, {"hello.md": old_blob_id}) snap_result = migrate_snapshot_ids(repo, blob_result.id_map, dry_run=False) new_blob_id = hash_blob(BLOB_A) new_snap_id = hash_snapshot({"hello.md": new_blob_id}) assert snap_result.id_map[old_snap_id] == new_snap_id result = read_muse_object(repo, new_snap_id) assert result is not None type_str, raw = result assert type_str == "snapshot" data = json.loads(raw) assert data["snapshot_id"] == new_snap_id assert data["manifest"]["hello.md"] == new_blob_id def test_migrate_snapshot_ids_dry_run_writes_nothing(repo: pathlib.Path) -> None: """dry_run=True builds id_map but writes no files.""" from muse.core.migrate import migrate_blob_ids, migrate_snapshot_ids old_blob_id = _write_old_blob(repo, BLOB_A) blob_result = migrate_blob_ids(repo, dry_run=True) old_snap_id = _write_old_snapshot(repo, {"hello.md": old_blob_id}) snap_result = migrate_snapshot_ids(repo, blob_result.id_map, dry_run=True) new_snap_id = hash_snapshot({"hello.md": hash_blob(BLOB_A)}) assert snap_result.id_map[old_snap_id] == new_snap_id assert not object_path(repo, new_snap_id).exists() def test_migrate_snapshot_ids_non_destructive(repo: pathlib.Path) -> None: """Old snapshot msgpack is preserved after migration.""" from muse.core.migrate import migrate_blob_ids, migrate_snapshot_ids old_blob_id = _write_old_blob(repo, BLOB_A) blob_result = migrate_blob_ids(repo, dry_run=False) old_snap_id = _write_old_snapshot(repo, {"hello.md": old_blob_id}) _, old_hex = old_snap_id.split(":", 1) old_path = snapshots_dir(repo) / "sha256" / f"{old_hex}.msgpack" migrate_snapshot_ids(repo, blob_result.id_map, dry_run=False) assert old_path.exists(), "old snapshot msgpack must not be deleted" # --------------------------------------------------------------------------- # Pass 3 — commit migration helpers # --------------------------------------------------------------------------- def _old_commit_id( parent_ids: list[str], snapshot_id: str, message: str, committed_at_iso: str, author: str = "", signer_public_key: str = "", ) -> str: """sha256(canonical) without type prefix — the pre-Phase-2 commit formula.""" from muse.core.types import split_id _SEP = "\x00" parts = [ _SEP.join(sorted(split_id(p)[1] for p in parent_ids)), split_id(snapshot_id)[1] if snapshot_id else "", message, committed_at_iso, author, signer_public_key, ] canonical = _SEP.join(parts).encode() return long_id(hashlib.sha256(canonical).hexdigest()) def _write_old_commit( repo: pathlib.Path, snapshot_id: str, message: str = "initial commit", committed_at: str = "2026-05-20T16:00:00+00:00", author: str = "gabriel", parent_ids: list[str] | None = None, ) -> str: """Write an old-format commit msgpack; return its old commit_id.""" parents = parent_ids or [] old_id = _old_commit_id( parent_ids=parents, snapshot_id=snapshot_id, message=message, committed_at_iso=committed_at, author=author, ) _, hex_id = old_id.split(":", 1) cmt_dir = commits_dir(repo) / "sha256" cmt_dir.mkdir(parents=True, exist_ok=True) record = { "commit_id": old_id, "branch": "main", "snapshot_id": snapshot_id, "message": message, "committed_at": committed_at, "parent_commit_id": parents[0] if parents else None, "parent2_commit_id": parents[1] if len(parents) > 1 else None, "author": author, "signature": "", "signer_public_key": "", "format_version": 8, "metadata": {}, "structured_delta": None, "sem_ver_bump": "none", "breaking_changes": [], "agent_id": "claude-code", "model_id": "claude-sonnet-4-6", "toolchain_id": "", "prompt_hash": "", "reviewed_by": [], "test_runs": 0, "labels": [], "status": "", "notes": [], "score": None, } path = cmt_dir / f"{hex_id}.msgpack" path.write_bytes(msgpack.packb(record, use_bin_type=True)) return old_id # --------------------------------------------------------------------------- # Pass 3 — commit migration tests # --------------------------------------------------------------------------- def test_migrate_commit_ids_writes_to_object_store(repo: pathlib.Path) -> None: """Migrated commit appears in the unified object store with updated snapshot_id.""" from muse.core.migrate import migrate_blob_ids, migrate_snapshot_ids, migrate_commit_ids old_blob_id = _write_old_blob(repo, BLOB_A) blob_result = migrate_blob_ids(repo, dry_run=False) old_snap_id = _write_old_snapshot(repo, {"hello.md": old_blob_id}) snap_result = migrate_snapshot_ids(repo, blob_result.id_map, dry_run=False) old_cmt_id = _write_old_commit(repo, snapshot_id=old_snap_id) cmt_result = migrate_commit_ids(repo, snap_result.id_map, dry_run=False) new_blob_id = hash_blob(BLOB_A) new_snap_id = hash_snapshot({"hello.md": new_blob_id}) new_cmt_id = hash_commit( parent_ids=[], snapshot_id=new_snap_id, message="initial commit", committed_at_iso="2026-05-20T16:00:00+00:00", author="gabriel", ) assert cmt_result.id_map[old_cmt_id] == new_cmt_id result = read_muse_object(repo, new_cmt_id) assert result is not None type_str, raw = result assert type_str == "commit" data = json.loads(raw) assert data["commit_id"] == new_cmt_id assert data["snapshot_id"] == new_snap_id def test_migrate_commit_ids_non_destructive(repo: pathlib.Path) -> None: """Old commit msgpack is preserved after migration.""" from muse.core.migrate import migrate_blob_ids, migrate_snapshot_ids, migrate_commit_ids old_blob_id = _write_old_blob(repo, BLOB_A) blob_result = migrate_blob_ids(repo, dry_run=False) old_snap_id = _write_old_snapshot(repo, {"hello.md": old_blob_id}) snap_result = migrate_snapshot_ids(repo, blob_result.id_map, dry_run=False) old_cmt_id = _write_old_commit(repo, snapshot_id=old_snap_id) _, old_hex = old_cmt_id.split(":", 1) old_path = commits_dir(repo) / "sha256" / f"{old_hex}.msgpack" migrate_commit_ids(repo, snap_result.id_map, dry_run=False) assert old_path.exists(), "old commit msgpack must not be deleted" def test_migrate_commit_ids_dry_run_writes_nothing(repo: pathlib.Path) -> None: """dry_run=True builds id_map but writes no files.""" from muse.core.migrate import migrate_blob_ids, migrate_snapshot_ids, migrate_commit_ids old_blob_id = _write_old_blob(repo, BLOB_A) blob_result = migrate_blob_ids(repo, dry_run=True) old_snap_id = _write_old_snapshot(repo, {"hello.md": old_blob_id}) snap_result = migrate_snapshot_ids(repo, blob_result.id_map, dry_run=True) old_cmt_id = _write_old_commit(repo, snapshot_id=old_snap_id) cmt_result = migrate_commit_ids(repo, snap_result.id_map, dry_run=True) new_snap_id = hash_snapshot({"hello.md": hash_blob(BLOB_A)}) new_cmt_id = hash_commit( parent_ids=[], snapshot_id=new_snap_id, message="initial commit", committed_at_iso="2026-05-20T16:00:00+00:00", author="gabriel", ) assert cmt_result.id_map[old_cmt_id] == new_cmt_id assert not object_path(repo, new_cmt_id).exists()