"""Phase 1 of issue #12: naming fix — from_msgpack → from_dict. Coverage -------- - CommitRecord.from_dict accepts the same input as from_msgpack did - SnapshotRecord.from_dict accepts the same input as from_msgpack did - TagRecord.from_dict accepts the same input as from_msgpack did - ReleaseRecord.from_dict accepts the same input as from_msgpack did - SymbolHistoryEntry.from_dict accepts the same input as from_msgpack did - No from_msgpack method exists on any storage record class - import msgpack does not appear in storage modules (wire-only allowlist enforced) """ from __future__ import annotations import datetime import pathlib import pytest from muse.core.commits import CommitRecord from muse.core.snapshots import SnapshotRecord from muse.core.tags import TagRecord from muse.core.releases import ReleaseRecord from muse.core.indices import SymbolHistoryEntry from muse.core.types import MsgpackDict, long_id, fake_id _NOW = datetime.datetime(2025, 1, 1, 0, 0, 0, tzinfo=datetime.timezone.utc) _TS = _NOW.isoformat() _CID = long_id("a" * 64) _SID = long_id("b" * 64) _RID = "test-repo" _TID = fake_id("tag-id") # --------------------------------------------------------------------------- # CommitRecord.from_dict # --------------------------------------------------------------------------- class TestCommitRecordFromDict: def _minimal(self) -> MsgpackDict: return { "commit_id": _CID, "repo_id": _RID, "branch": "main", "snapshot_id": _SID, "message": "test commit", "committed_at": _TS, } def test_from_dict_exists(self) -> None: assert hasattr(CommitRecord, "from_dict") def test_from_msgpack_does_not_exist(self) -> None: assert not hasattr(CommitRecord, "from_msgpack"), ( "CommitRecord.from_msgpack must be removed — use from_dict" ) def test_roundtrip(self) -> None: rec = CommitRecord.from_dict(self._minimal()) assert rec.branch == "main" assert rec.message == "test commit" def test_optional_fields_absent(self) -> None: rec = CommitRecord.from_dict(self._minimal()) assert rec.parent_commit_id is None assert rec.agent_id == "" def test_raises_on_bad_committed_at(self) -> None: d = self._minimal() d["committed_at"] = "not-a-date" with pytest.raises((ValueError, Exception)): CommitRecord.from_dict(d) # --------------------------------------------------------------------------- # SnapshotRecord.from_dict # --------------------------------------------------------------------------- class TestSnapshotRecordFromDict: def _minimal(self) -> MsgpackDict: return { "snapshot_id": _SID, "repo_id": _RID, "manifest": {}, "created_at": _TS, } def test_from_dict_exists(self) -> None: assert hasattr(SnapshotRecord, "from_dict") def test_from_msgpack_does_not_exist(self) -> None: assert not hasattr(SnapshotRecord, "from_msgpack"), ( "SnapshotRecord.from_msgpack must be removed — use from_dict" ) def test_roundtrip(self) -> None: rec = SnapshotRecord.from_dict(self._minimal()) assert rec.snapshot_id == _SID assert rec.manifest == {} def test_raises_on_bad_created_at(self) -> None: d = self._minimal() d["created_at"] = "not-a-date" with pytest.raises((ValueError, Exception)): SnapshotRecord.from_dict(d) # --------------------------------------------------------------------------- # TagRecord.from_dict # --------------------------------------------------------------------------- class TestTagRecordFromDict: def _minimal(self) -> MsgpackDict: return { "repo_id": _RID, "tag_id": _TID, "commit_id": _CID, "tag": "v1.0.0", "created_at": _TS, "message": "", "signature": "", "signer_public_key": "", "signer_key_id": "", } def test_from_dict_exists(self) -> None: assert hasattr(TagRecord, "from_dict") def test_from_msgpack_does_not_exist(self) -> None: assert not hasattr(TagRecord, "from_msgpack"), ( "TagRecord.from_msgpack must be removed — use from_dict" ) def test_roundtrip(self) -> None: rec = TagRecord.from_dict(self._minimal()) assert rec.tag == "v1.0.0" assert rec.commit_id == _CID # --------------------------------------------------------------------------- # ReleaseRecord.from_dict # --------------------------------------------------------------------------- class TestReleaseRecordFromDict: def _minimal(self) -> MsgpackDict: return { "release_id": fake_id("rel-id"), "repo_id": _RID, "tag": "v1.0.0", "semver": "1.0.0", "channel": "stable", "commit_id": _CID, "title": "", "notes": "", "created_at": _TS, "is_draft": False, "gpg_signature": "", } def test_from_dict_exists(self) -> None: assert hasattr(ReleaseRecord, "from_dict") def test_from_msgpack_does_not_exist(self) -> None: assert not hasattr(ReleaseRecord, "from_msgpack"), ( "ReleaseRecord.from_msgpack must be removed — use from_dict" ) def test_roundtrip(self) -> None: rec = ReleaseRecord.from_dict(self._minimal()) assert rec.tag == "v1.0.0" assert rec.channel == "stable" # --------------------------------------------------------------------------- # SymbolHistoryEntry.from_dict # --------------------------------------------------------------------------- class TestSymbolHistoryEntryFromDict: def _minimal(self) -> MsgpackDict: return { "commit_id": _CID, "committed_at": _TS, "op": "modified", "content_id": "", "body_hash": "", "signature_id": "", } def test_from_dict_exists(self) -> None: assert hasattr(SymbolHistoryEntry, "from_dict") def test_from_msgpack_does_not_exist(self) -> None: assert not hasattr(SymbolHistoryEntry, "from_msgpack"), ( "SymbolHistoryEntry.from_msgpack must be removed — use from_dict" ) def test_roundtrip(self) -> None: entry = SymbolHistoryEntry.from_dict(self._minimal()) assert entry.commit_id == _CID assert entry.op == "modified" # --------------------------------------------------------------------------- # Lint: no from_msgpack attribute on any storage record class # --------------------------------------------------------------------------- _REPO_ROOT = pathlib.Path(__file__).parent.parent _RECORD_CLASSES = [CommitRecord, SnapshotRecord, TagRecord, ReleaseRecord, SymbolHistoryEntry] def _has_from_msgpack_call(path: pathlib.Path) -> list[int]: """Return line numbers where .from_msgpack( appears in *path*.""" lines = [] for i, line in enumerate(path.read_text(encoding="utf-8").splitlines(), 1): if ".from_msgpack(" in line and not line.strip().startswith("#"): lines.append(i) return lines class TestNoFromMsgpackRemains: def test_no_from_msgpack_on_record_classes(self) -> None: """No storage record class may have a from_msgpack method.""" for cls in _RECORD_CLASSES: assert not hasattr(cls, "from_msgpack"), ( f"{cls.__name__}.from_msgpack still exists — rename to from_dict" ) def test_no_from_msgpack_calls_in_source(self) -> None: """No source file under muse/ (non-test) may call .from_msgpack().""" source_root = _REPO_ROOT / "muse" violations: list[tuple[pathlib.Path, int]] = [] for path in source_root.rglob("*.py"): for lineno in _has_from_msgpack_call(path): violations.append((path.relative_to(_REPO_ROOT), lineno)) assert violations == [], ( f"Calls to .from_msgpack() found in source files: {violations}" ) def test_no_from_msgpack_calls_in_tests(self) -> None: """No test file may call .from_msgpack() on a record class.""" tests_root = _REPO_ROOT / "tests" this_file = pathlib.Path(__file__).resolve() violations: list[tuple[pathlib.Path, int]] = [] for path in tests_root.rglob("*.py"): if path.resolve() == this_file: continue # this file documents the naming contract for lineno in _has_from_msgpack_call(path): violations.append((path.relative_to(_REPO_ROOT), lineno)) assert violations == [], ( f"Calls to .from_msgpack() found in test files: {violations}" )