"""muse.core.releases — release layer for the Muse VCS. Everything that reads, writes, or queries release records lives here. Public API ---------- ReleaseDict JSON-serialisable TypedDict for ReleaseRecord wire format. ReleaseRecord Versioned release dataclass with to_dict / from_dict. compute_release_id Content-addressed ID derivation for releases. release_path On-disk path helper. write_release / read_release Core release I/O. get_release_for_tag / list_releases / delete_release Query and mutation helpers. build_changelog Walk the commit graph to produce a typed changelog for a release. """ from __future__ import annotations import datetime import json as _json import logging import pathlib from dataclasses import dataclass, field from typing import TypedDict from muse.core.io import _read_msgpack_dict, _write_json_atomic from muse.core.paths import releases_dir as _releases_dir from muse.core.record_helpers import _int_val, _str_list, _str_val from muse.core.semver import ( ChangelogEntry, ReleaseChannel, SemVerTag, _CHANNEL_MAP, ) from muse.core.types import MsgpackDict, SemVerBump, content_hash, short_id, split_id logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Private deserialisation helpers # --------------------------------------------------------------------------- def _sem_ver_bump_val(d: MsgpackDict) -> SemVerBump: """Extract and validate a ``sem_ver_bump`` field from a raw storage dict. Falls back to ``"none"`` if the value is absent or not a recognised Literal — guards against tampered or forward-versioned records. """ val = _str_val(d, "sem_ver_bump", "none") if val == "major": return "major" if val == "minor": return "minor" if val == "patch": return "patch" return "none" def _parse_semver_tag(d: MsgpackDict, key: str) -> SemVerTag: """Extract a nested :class:`SemVerTag` from a raw storage mapping. Returns a zeroed ``SemVerTag`` if the key is absent or the value is not a dict — callers can inspect ``major == 0`` to detect the fallback. """ val = d.get(key) if isinstance(val, dict): return SemVerTag( major=_int_val(val, "major"), minor=_int_val(val, "minor"), patch=_int_val(val, "patch"), pre=_str_val(val, "pre"), build=_str_val(val, "build"), ) return SemVerTag(major=0, minor=0, patch=0, pre="", build="") def _parse_changelog_entries(d: MsgpackDict, key: str) -> list[ChangelogEntry]: """Extract a ``list[ChangelogEntry]`` from a raw storage mapping. Silently skips items that are not dicts — defensive deserialization at the boundary between wire format and in-memory types. """ val = d.get(key) if not isinstance(val, list): return [] entries: list[ChangelogEntry] = [] for item in val: if not isinstance(item, dict): continue entries.append( ChangelogEntry( commit_id=_str_val(item, "commit_id"), message=_str_val(item, "message"), sem_ver_bump=_sem_ver_bump_val(item), breaking_changes=_str_list(item, "breaking_changes"), author=_str_val(item, "author"), committed_at=_str_val(item, "committed_at"), agent_id=_str_val(item, "agent_id"), model_id=_str_val(item, "model_id"), ) ) return entries # --------------------------------------------------------------------------- # Wire-format TypedDict # --------------------------------------------------------------------------- class ReleaseDict(TypedDict): """JSON-serialisable representation of a ReleaseRecord.""" release_id: str repo_id: str tag: str semver: SemVerTag channel: str commit_id: str snapshot_id: str title: str body: str changelog: list[ChangelogEntry] agent_id: str model_id: str is_draft: bool gpg_signature: str created_at: str # --------------------------------------------------------------------------- # ID derivation # --------------------------------------------------------------------------- def compute_release_id(repo_id: str, tag: str, commit_id: str) -> str: """Return the content-addressed ``sha256:`` ID for a release. The ID is derived from the three fields that uniquely identify a release: its repository, the semver tag string, and the commit it pins. """ return content_hash({"commit_id": commit_id, "repo_id": repo_id, "tag": tag}) # --------------------------------------------------------------------------- # ReleaseRecord dataclass # --------------------------------------------------------------------------- @dataclass class ReleaseRecord: """A versioned release attached to a commit. A release is richer than a Git tag: - ``semver`` is a parsed struct (major/minor/patch/pre/build) — queryable by version component without string parsing. - ``channel`` replaces the boolean ``is_prerelease`` flag with a named distribution channel: stable | beta | alpha | nightly. - ``changelog`` is auto-generated from the typed ``sem_ver_bump`` and ``breaking_changes`` fields on commits since the previous release, so no conventional-commit parsing is needed. - ``snapshot_id`` makes the release byte-for-byte reproducible from the content-addressed object store forever. - ``agent_id`` / ``model_id`` surface AI provenance from the tip commit. - ``gpg_signature`` signs the release for tamper-evident distribution. """ release_id: str repo_id: str tag: str semver: SemVerTag channel: ReleaseChannel commit_id: str snapshot_id: str title: str body: str changelog: list[ChangelogEntry] agent_id: str = "" model_id: str = "" is_draft: bool = False gpg_signature: str = "" created_at: datetime.datetime = field( default_factory=lambda: datetime.datetime.now(datetime.timezone.utc) ) def to_dict(self) -> ReleaseDict: return ReleaseDict( release_id=self.release_id, repo_id=self.repo_id, tag=self.tag, semver=self.semver, channel=self.channel, commit_id=self.commit_id, snapshot_id=self.snapshot_id, title=self.title, body=self.body, changelog=list(self.changelog), agent_id=self.agent_id, model_id=self.model_id, is_draft=self.is_draft, gpg_signature=self.gpg_signature, created_at=self.created_at.isoformat(), ) @classmethod def from_dict(cls, d: "MsgpackDict | ReleaseDict") -> "ReleaseRecord": """Deserialise a :class:`ReleaseRecord` from a plain dict.""" created_at_str = _str_val(d, "created_at") try: created_at = datetime.datetime.fromisoformat(created_at_str) except ValueError: created_at = datetime.datetime.now(datetime.timezone.utc) channel = _CHANNEL_MAP.get(_str_val(d, "channel", "stable"), "stable") is_draft_val = d.get("is_draft", False) return cls( release_id=_str_val(d, "release_id"), repo_id=_str_val(d, "repo_id"), tag=_str_val(d, "tag"), semver=_parse_semver_tag(d, "semver"), channel=channel, commit_id=_str_val(d, "commit_id"), snapshot_id=_str_val(d, "snapshot_id"), title=_str_val(d, "title"), body=_str_val(d, "body"), changelog=_parse_changelog_entries(d, "changelog"), agent_id=_str_val(d, "agent_id"), model_id=_str_val(d, "model_id"), is_draft=bool(is_draft_val), gpg_signature=_str_val(d, "gpg_signature"), created_at=created_at, ) # --------------------------------------------------------------------------- # Path helper # --------------------------------------------------------------------------- def release_path( repo_root: pathlib.Path, repo_id: str, release_id: str ) -> pathlib.Path: """Return the on-disk path for a release record. Path shape: ``.muse/releases////.json`` Follows the same two-level algorithm convention as ``tag_path``. Args: repo_root: Repository root directory. repo_id: A ``:`` repo ID, or bare 64-char hex. release_id: A ``:`` release ID, or bare 64-char hex. Returns: Absolute path to the JSON file for this release. """ r_algo, r_hex = split_id(repo_id) rl_algo, rl_hex = split_id(release_id) return _releases_dir(repo_root) / r_algo / r_hex / rl_algo / f"{rl_hex}.json" # --------------------------------------------------------------------------- # Release I/O # --------------------------------------------------------------------------- def _read_release_or_migrate(path: pathlib.Path) -> "ReleaseRecord | None": """Read a release from *path*, handling both .json and legacy .msgpack files. If *path* is a .msgpack file, the record is silently migrated to .json and the old file deleted. """ try: if path.suffix == ".json": d: MsgpackDict = _json.loads(path.read_bytes().decode("utf-8")) else: d = _read_msgpack_dict(path) json_path = path.with_suffix(".json") _write_json_atomic(json_path, d) path.unlink(missing_ok=True) return ReleaseRecord.from_dict(d) except Exception: return None def write_release(repo_root: pathlib.Path, release: ReleaseRecord) -> None: """Persist a release record to ``.muse/releases////.json``.""" path = release_path(repo_root, release.repo_id, release.release_id) path.parent.mkdir(parents=True, exist_ok=True) _write_json_atomic(path, release.to_dict()) logger.debug("✅ Stored release %r (%s)", release.tag, short_id(release.release_id)) def read_release( repo_root: pathlib.Path, repo_id: str, release_id: str ) -> ReleaseRecord | None: """Load a release by its ID, or ``None`` if it does not exist or is corrupt.""" path = release_path(repo_root, repo_id, release_id) if path.exists(): return _read_release_or_migrate(path) # Fallback: legacy .msgpack file (silent upgrade to .json) legacy = path.with_suffix(".msgpack") if legacy.exists(): return _read_release_or_migrate(legacy) return None def get_release_for_tag( repo_root: pathlib.Path, repo_id: str, tag: str ) -> ReleaseRecord | None: """Return the release whose version tag matches *tag*, or ``None``. Searches all releases including drafts so callers can inspect or delete a draft before it is published. """ for release in list_releases(repo_root, repo_id, include_drafts=True): if release.tag == tag: return release return None def list_releases( repo_root: pathlib.Path, repo_id: str, channel: ReleaseChannel | None = None, include_drafts: bool = False, ) -> list[ReleaseRecord]: """Return all releases, newest first. Args: repo_root: Repository root. repo_id: Content-addressed repo ID (``:`` or bare hex). channel: Filter by channel; ``None`` returns all channels. include_drafts: When ``False`` (default) draft releases are hidden. """ r_algo, r_hex = split_id(repo_id) repo_dir = _releases_dir(repo_root) / r_algo / r_hex if not repo_dir.exists(): return [] results: list[ReleaseRecord] = [] for path in repo_dir.glob("*/*"): if path.suffix not in (".json", ".msgpack"): continue r = _read_release_or_migrate(path) if r is None: logger.critical("❌ Corrupt release file %s — skipped in listing", path) continue if r.is_draft and not include_drafts: continue if channel is not None and r.channel != channel: continue results.append(r) results.sort(key=lambda r: r.created_at, reverse=True) return results def delete_release(repo_root: pathlib.Path, repo_id: str, release_id: str) -> bool: """Delete a release record. Returns ``True`` if it existed. Callers are responsible for enforcing that only draft releases may be deleted. This function performs no such guard — enforce at the CLI/API layer. """ path = release_path(repo_root, repo_id, release_id) if path.exists(): path.unlink() logger.debug("🗑️ Deleted release %s", short_id(release_id)) return True return False def build_changelog( repo_root: pathlib.Path, from_commit_id: str | None, to_commit_id: str, max_commits: int = 500, ) -> list[ChangelogEntry]: """Walk the commit graph from *to_commit_id* back to *from_commit_id*. Returns a list of :class:`ChangelogEntry` dicts, oldest first, excluding merge commits and ``sem_ver_bump="none"`` commits (they carry no user-visible change). *from_commit_id* is excluded; *to_commit_id* is included. Args: repo_root: Repository root. from_commit_id: The last release commit (exclusive start), or ``None`` for a first release (walks back to the initial commit). to_commit_id: The HEAD commit to release (inclusive end). max_commits: Safety cap — changelog never exceeds this many entries. """ from muse.core.commits import walk_commits_between # local to avoid circular import raw = walk_commits_between(repo_root, to_commit_id, from_commit_id, max_commits) entries: list[ChangelogEntry] = [] for commit in reversed(raw): # oldest first entries.append(ChangelogEntry( commit_id=commit.commit_id, message=commit.message, sem_ver_bump=commit.sem_ver_bump, breaking_changes=list(commit.breaking_changes), author=commit.author, committed_at=commit.committed_at.isoformat(), agent_id=commit.agent_id, model_id=commit.model_id, )) return entries