"""TDD — release_analysis must not access commit_meta (dropped in migration 0020)."""
from __future__ import annotations

from datetime import datetime, timezone
from typing import Any

import msgpack
import pytest
from sqlalchemy.ext.asyncio import AsyncSession

from musehub.db.musehub_repo_models import (
    MusehubBranch,
    MusehubCommit,
    MusehubCommitRef,
    MusehubSnapshot,
    MusehubSnapshotRef,
)
from musehub.types.json_types import JSONObject
from tests.factories import create_repo
from muse.core.types import blob_id


def _utc() -> datetime:
    """Return the current time as a timezone-aware UTC ``datetime``."""
    return datetime.now(tz=timezone.utc)


async def _add_snapshot(
    session: AsyncSession,
    repo_id: str,
    seed: str,
    manifest: JSONObject,
) -> str:
    """Persist a snapshot plus its repo reference and return the snapshot id.

    The snapshot id is ``blob_id`` of the bytes ``"snap-<seed>"``, so the same
    seed always yields the same id. The manifest is stored msgpack-encoded and
    ``entry_count`` is the number of manifest entries. The session is
    committed before returning.
    """
    snap_id = blob_id(f"snap-{seed}".encode())
    session.add(MusehubSnapshot(
        snapshot_id=snap_id,
        directories=[],
        manifest_blob=msgpack.packb(manifest, use_bin_type=True),
        entry_count=len(manifest),
        created_at=_utc(),
    ))
    session.add(MusehubSnapshotRef(repo_id=repo_id, snapshot_id=snap_id))
    await session.commit()
    return snap_id


async def _add_commit(
    session: AsyncSession,
    repo_id: str,
    seed: str,
    snap_id: str | None = None,
    **kwargs: Any,
) -> MusehubCommit:
    """Persist a parentless commit on branch ``dev`` and return the commit row.

    Besides the commit itself, this adds a ``MusehubCommitRef`` linking it to
    *repo_id* and a ``MusehubBranch`` row named ``dev`` whose head is the new
    commit, then commits the session.

    Extra keyword arguments are forwarded verbatim to the ``MusehubCommit``
    constructor (the test below uses this for ``agent_id``, ``model_id`` and
    ``sem_ver_bump``).

    NOTE(review): every call inserts a new ``dev`` branch row for *repo_id*;
    calling this twice for the same repo may conflict if branch names are
    unique per repo — confirm against the model before reusing.
    """
    row = MusehubCommit(
        commit_id=blob_id(seed.encode()),
        branch="dev",
        parent_ids=[],
        message=f"feat: {seed}",
        author="gabriel",
        timestamp=_utc(),
        snapshot_id=snap_id,
        **kwargs,
    )
    session.add(row)
    session.add(MusehubCommitRef(repo_id=repo_id, commit_id=row.commit_id))
    session.add(MusehubBranch(
        branch_id=blob_id(f"br-{seed}".encode()),
        repo_id=repo_id,
        name="dev",
        head_commit_id=row.commit_id,
    ))
    await session.commit()
    return row


# ---------------------------------------------------------------------------
# RL1 — _walk_commits (commit-chain loading) does not crash with
#        AttributeError on commit_meta
# ---------------------------------------------------------------------------

@pytest.mark.asyncio
async def test_rl1_load_commit_chain_no_commit_meta(
    db_session: AsyncSession,
) -> None:
    """_walk_commits must not raise AttributeError on commit_meta.

    Seeds one repo with a single snapshot-backed commit carrying agent/model
    attribution, walks the chain from that commit, and checks that exactly one
    entry comes back with the attribution exposed under the ``agent_id`` and
    ``model_id`` keys.
    """
    # Imported inside the test so an import-time failure in the service module
    # is reported against this test rather than breaking collection.
    from musehub.services.release_analysis import _walk_commits

    repo = await create_repo(db_session, owner="gabriel", visibility="public")
    snap_id = await _add_snapshot(
        db_session,
        repo.repo_id,
        "rl1",
        {"src/app.py": blob_id(b"content")},
    )
    commit = await _add_commit(
        db_session,
        repo.repo_id,
        "rl1-c1",
        snap_id,
        agent_id="claude-code",
        model_id="claude-sonnet-4-6",
        sem_ver_bump="minor",
    )

    result = await _walk_commits(
        db_session, repo.repo_id, commit.commit_id, max_commits=10
    )

    assert isinstance(result, list)
    assert len(result) == 1
    assert result[0]["agent_id"] == "claude-code"
    assert result[0]["model_id"] == "claude-sonnet-4-6"