"""MuseHub profile persistence adapter.

Single point of DB access for user-profile entities (``musehub_identities``).
Aggregates cross-repo data (public repos, contribution graph, session credits)
so that route handlers stay thin.
"""

import logging
from collections import defaultdict
from datetime import datetime, timedelta, timezone

from sqlalchemy import distinct, desc, func, select, text
from sqlalchemy.ext.asyncio import AsyncSession

from musehub.db.musehub_identity_models import MusehubIdentity
from musehub.db.musehub_repo_models import MusehubCommit, MusehubCommitRef, MusehubRepo
from musehub.types.json_types import IntDict
from musehub.models.musehub import (
    ActivityDomain,
    AttestationBadge,
    OrgManifest,
    ProfileManifest,
    ProfileResponse,
    ProfileRepoSummary,
    ProfileUpdateRequest,
    TrustChainEntry,
)

logger = logging.getLogger(__name__)

# Maximum number of pinned repo ids kept on a profile; extra ids are dropped.
_MAX_PINNED = 6
# Length of the trailing activity window, in weeks.
_CONTRIBUTION_WEEKS = 52


def _utc_today() -> datetime:
    """Return today's date as a timezone-aware UTC datetime at 00:00:00."""
    return datetime.now(tz=timezone.utc).replace(
        hour=0, minute=0, second=0, microsecond=0
    )


def _to_profile_response(
    identity: MusehubIdentity,
    repos: list[ProfileRepoSummary],
    session_credits: int,
) -> ProfileResponse:
    """Build a ``ProfileResponse`` from an identity row plus aggregated data.

    ``identity.identity_id`` and ``identity.handle`` are exposed as
    ``user_id`` and ``username`` respectively.
    """
    return ProfileResponse(
        user_id=identity.identity_id,
        username=identity.handle,
        display_name=identity.display_name,
        bio=identity.bio,
        avatar_url=identity.avatar_url,
        location=identity.location,
        website_url=identity.website_url,
        social_url=identity.social_url,
        is_verified=identity.is_verified,
        cc_license=identity.cc_license,
        pinned_repo_ids=list(identity.pinned_repo_ids or []),
        repos=repos,
        session_credits=session_credits,
        created_at=identity.created_at,
        updated_at=identity.updated_at,
    )


async def create_profile(
    session: AsyncSession,
    user_id: str,
    username: str,
    bio: str | None = None,
    avatar_url: str | None = None,
) -> MusehubIdentity:
    """Create a new MusehubIdentity for a human user.

    The row is added and flushed (so DB defaults/constraints apply) but the
    transaction is not committed here.
    """
    identity = MusehubIdentity(
        identity_id=user_id,
        handle=username,
        identity_type="human",
        bio=bio,
        avatar_url=avatar_url,
    )
    session.add(identity)
    await session.flush()
    return identity


async def get_profile_by_username(
    session: AsyncSession, username: str
) -> MusehubIdentity | None:
    """Return the non-deleted identity whose handle is ``username``, or None."""
    result = await session.execute(
        select(MusehubIdentity).where(
            MusehubIdentity.handle == username,
            MusehubIdentity.deleted_at.is_(None),
        )
    )
    return result.scalar_one_or_none()


async def get_profile_by_user_id(
    session: AsyncSession, user_id: str
) -> MusehubIdentity | None:
    """Return the identity with primary key ``user_id``, or None.

    Unlike ``get_profile_by_username`` this does NOT exclude soft-deleted
    rows. NOTE(review): presumably intentional (e.g. for owner/admin lookups);
    confirm with callers.
    """
    result = await session.execute(
        select(MusehubIdentity).where(MusehubIdentity.identity_id == user_id)
    )
    return result.scalar_one_or_none()


async def update_profile(
    session: AsyncSession,
    identity: MusehubIdentity,
    patch: ProfileUpdateRequest,
) -> MusehubIdentity:
    """Apply a partial update to ``identity`` and flush it.

    Fields left as ``None`` on ``patch`` are not touched, so this path cannot
    clear a field. ``pinned_repo_ids`` is truncated to ``_MAX_PINNED``
    entries. ``updated_at`` is always bumped. The change is flushed but not
    committed.
    """
    if patch.display_name is not None:
        identity.display_name = patch.display_name
    if patch.bio is not None:
        identity.bio = patch.bio
    if patch.avatar_url is not None:
        identity.avatar_url = patch.avatar_url
    if patch.location is not None:
        identity.location = patch.location
    if patch.website_url is not None:
        identity.website_url = patch.website_url
    if patch.social_url is not None:
        identity.social_url = patch.social_url
    if patch.pinned_repo_ids is not None:
        identity.pinned_repo_ids = patch.pinned_repo_ids[:_MAX_PINNED]
    identity.updated_at = datetime.now(tz=timezone.utc)
    session.add(identity)
    await session.flush()
    return identity


async def get_public_repos(
    session: AsyncSession, handle: str
) -> list[ProfileRepoSummary]:
    """Return summaries of the public repos owned by ``handle``, newest first.

    ``last_activity_at`` is the maximum ``MusehubCommit.timestamp`` among the
    commits referenced by the repo's ``MusehubCommitRef`` rows, or None when
    the repo has none. ``domain`` goes through ``_normalise_domain_slug``, so
    NULL/empty values and leaked raw identifiers containing ``":"`` are
    reported as ``"code"``, the same slug the activity canvas uses.
    """
    repo_rows_result = await session.execute(
        select(MusehubRepo)
        .where(
            MusehubRepo.owner == handle,
            MusehubRepo.visibility == "public",
        )
        .order_by(desc(MusehubRepo.created_at))
    )
    repo_rows = list(repo_rows_result.scalars())
    if not repo_rows:
        return []

    repo_ids = [r.repo_id for r in repo_rows]
    latest_result = await session.execute(
        select(
            MusehubCommitRef.repo_id,
            func.max(MusehubCommit.timestamp).label("last_activity"),
        )
        .join(MusehubCommit, MusehubCommitRef.commit_id == MusehubCommit.commit_id)
        .where(MusehubCommitRef.repo_id.in_(repo_ids))
        .group_by(MusehubCommitRef.repo_id)
    )
    last_activity = {row.repo_id: row.last_activity for row in latest_result}

    return [
        ProfileRepoSummary(
            repo_id=r.repo_id,
            name=r.name,
            owner=r.owner,
            slug=r.slug,
            visibility=r.visibility,
            # Shared normaliser (defined below at module level) keeps repo
            # summaries consistent with the activity canvas domain labels.
            domain=_normalise_domain_slug(r.domain_id),
            last_activity_at=last_activity.get(r.repo_id),
            created_at=r.created_at,
        )
        for r in repo_rows
    ]


async def get_session_credits(session: AsyncSession, handle: str) -> int:
    """Return the number of commit refs across every repo owned by ``handle``.

    Repos of any visibility are counted (no visibility filter). A single
    JOIN query is used instead of first fetching every repo id and sending
    them back in an ``IN (...)`` list. A handle with no repos yields 0.
    """
    result = await session.execute(
        select(func.count(MusehubCommitRef.commit_id))
        .select_from(MusehubCommitRef)
        .join(MusehubRepo, MusehubRepo.repo_id == MusehubCommitRef.repo_id)
        .where(MusehubRepo.owner == handle)
    )
    count = result.scalar()
    return int(count) if count is not None else 0


async def get_full_profile(
    session: AsyncSession, username: str
) -> ProfileResponse | None:
    """Return the full ``ProfileResponse`` for ``username``.

    Returns None when no non-deleted identity has that handle.
    """
    identity = await get_profile_by_username(session, username)
    if identity is None:
        return None
    # Sequential on purpose: a single AsyncSession must not run queries
    # concurrently.
    repos = await get_public_repos(session, identity.handle)
    session_credits = await get_session_credits(session, identity.handle)
    return _to_profile_response(identity, repos, session_credits)


# ---------------------------------------------------------------------------
# Multi-domain activity canvas (52 weeks × 7 days per domain)
# ---------------------------------------------------------------------------

_GRID_DAYS = 364  # 52 × 7

# Internal system domains that are not meaningful user-facing activity.
_CANVAS_EXCLUDED_DOMAINS: frozenset[str] = frozenset({"identity", "social"})

# Valid domain slugs are short alphanumeric slugs. Anything containing ":" is a
# raw cryptographic identifier (e.g. sha256:) that leaked into domain_id —
# treat those as "code" (the default code-domain) rather than a new domain label.
def _normalise_domain_slug(raw: str | None) -> str:
    """Map a raw ``musehub_repos.domain_id`` value to a canvas domain slug.

    ``None`` and ``""`` become ``"code"`` (the default domain), as does any
    value containing ``":"`` (a leaked raw identifier such as ``sha256:...``).
    Every other value is returned unchanged.
    """
    slug = raw or "code"
    if ":" in slug:
        return "code"
    return slug


def _empty_grid() -> list[int]:
    """Return a fresh all-zero grid with one cell per day of the window."""
    return [0] * _GRID_DAYS


def _date_to_grid_index(today: datetime, target_date: datetime) -> int | None:
    """Return the grid cell for ``target_date``, or None if outside the window.

    Only calendar dates are compared. The last cell (``_GRID_DAYS - 1``) is
    ``today``, and cell 0 is ``_GRID_DAYS - 1`` days before it. The grid is
    anchored on ``today``, so cell 0 falls on whatever weekday that is; it is
    not necessarily a Monday. Dates after ``today`` also return None.
    """
    delta = (today.date() - target_date.date()).days
    if delta < 0 or delta >= _GRID_DAYS:
        return None
    return _GRID_DAYS - 1 - delta


async def _build_domain_commit_grid(
    session: AsyncSession,
    handle: str,
    today: datetime,
    cutoff: datetime,
    domain_id: str,
) -> list[int]:
    """Count per-day commits to repos owned by ``handle`` in one domain.

    Args:
        session: Async DB session.
        handle: Owner whose repos are included (no visibility filter).
        today: Midnight of the current day; anchors the grid layout.
        cutoff: Inclusive lower bound on ``MusehubCommit.timestamp``.
        domain_id: A slug as produced by ``_normalise_domain_slug``.

    The ``"code"`` slug matches every raw ``domain_id`` that normalises to
    ``"code"``:
        - explicit ``"code"``;
        - NULL (legacy rows created before domain tracking was added);
        - ``""``;
        - any value containing ``":"``.

    Returns:
        A ``_GRID_DAYS``-long list of counts laid out as described in
        ``_date_to_grid_index``.
    """
    if domain_id == "code":
        # Must stay in sync with _normalise_domain_slug: every raw value it
        # maps to "code" has to be matched here. Otherwise "code" shows up in
        # the canvas's slug set but those repos' commits are never counted.
        # The "" arm covers empty strings, which `raw or "code"` normalises.
        domain_filter = (
            (MusehubRepo.domain_id == "code")
            | MusehubRepo.domain_id.is_(None)
            | (MusehubRepo.domain_id == "")
            | MusehubRepo.domain_id.like("%:%")
        )
    else:
        domain_filter = MusehubRepo.domain_id == domain_id

    repo_result = await session.execute(
        select(MusehubRepo.repo_id).where(
            MusehubRepo.owner == handle,
            domain_filter,
        )
    )
    repo_ids = [row[0] for row in repo_result]
    if not repo_ids:
        return _empty_grid()

    # NOTE(review): DATE() is evaluated by the database; if timestamps are not
    # stored/interpreted in UTC, day boundaries may not line up with the
    # UTC-based ``today``. Confirm the column's timezone handling.
    rows = await session.execute(
        select(
            func.date(MusehubCommit.timestamp).label("day"),
            func.count(MusehubCommit.commit_id).label("cnt"),
        )
        .join(MusehubCommitRef, MusehubCommitRef.commit_id == MusehubCommit.commit_id)
        .where(
            MusehubCommitRef.repo_id.in_(repo_ids),
            MusehubCommit.timestamp >= cutoff,
        )
        .group_by(func.date(MusehubCommit.timestamp))
    )

    grid = _empty_grid()
    for row in rows:
        # str() first so both a date object and an ISO date string (the form
        # presumably returned depends on the backend) parse the same way.
        idx = _date_to_grid_index(today, datetime.fromisoformat(str(row.day)))
        if idx is not None:
            grid[idx] = int(row.cnt)
    return grid


def _grid_to_domain(domain: str, grid: list[int]) -> ActivityDomain:
    """Wrap a day grid as an ``ActivityDomain`` with its peak and total."""
    peak = max(grid) if grid else 0
    total = sum(grid)
    return ActivityDomain(domain=domain, grid=grid, peak=peak, total=total)


async def build_activity_canvas(
    session: AsyncSession, handle: str
) -> list[ActivityDomain]:
    """Build the activity canvas for a handle.

    domain_id in musehub_repos is a plain string label ("code", "midi",
    "mist", …) or NULL for legacy repos. NULL, empty and ":"-containing values
    are treated as "code". Internal domains (identity, social) are excluded.
    Only domains with at least one commit in the trailing 52 weeks are
    included. Results are sorted by domain slug.
    """
    today = _utc_today()
    cutoff = today - timedelta(weeks=_CONTRIBUTION_WEEKS)

    # Collect every distinct raw domain_id the user has repos under.
    result = await session.execute(
        select(MusehubRepo.domain_id)
        .where(MusehubRepo.owner == handle)
        .distinct()
    )
    # Several raw values can collapse to one slug (e.g. NULL and "code"), so
    # dedupe after normalising each row once, then drop internal domains.
    raw_slugs: set[str] = {
        _normalise_domain_slug(row[0]) for row in result
    } - _CANVAS_EXCLUDED_DOMAINS

    domains: list[ActivityDomain] = []
    for slug in sorted(raw_slugs):
        grid = await _build_domain_commit_grid(session, handle, today, cutoff, slug)
        entry = _grid_to_domain(slug, grid)
        if entry.total > 0:
            domains.append(entry)
    return domains


# ---------------------------------------------------------------------------
# Trust chain — agent lineage back to spawning human
# ---------------------------------------------------------------------------


async def _build_trust_chain(
    session: AsyncSession, identity: MusehubIdentity
) -> list[TrustChainEntry]:
    """Return the ``spawned_by`` lineage of ``identity``, starting with itself.

    Each parent is looked up by handle among non-deleted identities. The walk
    stops in any of these cases:
        - ``spawned_by`` is empty;
        - the parent is missing or soft-deleted;
        - a handle repeats (cycle guard).
    The root's identity_type is not checked here.
    """
    chain: list[TrustChainEntry] = [
        TrustChainEntry(
            handle=identity.handle,
            identity_type=identity.identity_type,
            spawned_by=identity.spawned_by,
        )
    ]
    current_handle = identity.spawned_by
    seen: set[str] = {identity.handle}

    while current_handle and current_handle not in seen:
        seen.add(current_handle)
        result = await session.execute(
            select(MusehubIdentity).where(
                MusehubIdentity.handle == current_handle,
                MusehubIdentity.deleted_at.is_(None),
            )
        )
        parent = result.scalar_one_or_none()
        if parent is None:
            break
        chain.append(
            TrustChainEntry(
                handle=parent.handle,
                identity_type=parent.identity_type,
                spawned_by=parent.spawned_by,
            )
        )
        current_handle = parent.spawned_by

    return chain


# ---------------------------------------------------------------------------
# Unified profile manifest
# ---------------------------------------------------------------------------


async def build_profile_manifest(
    session: AsyncSession,
    handle: str,
    attestations: list[AttestationBadge] | None = None,
    mpay_sent_nano: int = 0,
    mpay_received_nano: int = 0,
) -> ProfileManifest | None:
    """Build the unified archetype-aware profile manifest.

    ``attestations`` and MPay totals are supplied by the caller so this
    function doesn't need to import the attestation/mpay services directly.

    Archetype-specific fields:
        - ``avax_address`` is set only for ``"human"`` identities;
        - ``agent_model``, ``agent_capabilities`` and ``trust_chain`` only for
          ``"agent"`` identities;
        - ``org`` only for ``"org"`` identities (a falsy quorum defaults to 1).

    Returns None when no non-deleted identity has this handle.
    """
    identity = await get_profile_by_username(session, handle)
    if identity is None:
        return None

    repos = await get_public_repos(session, handle)
    activity = await build_activity_canvas(session, handle)

    is_human = identity.identity_type == "human"
    is_agent = identity.identity_type == "agent"

    trust_chain: list[TrustChainEntry] = []
    org: OrgManifest | None = None
    if is_agent:
        trust_chain = await _build_trust_chain(session, identity)
    elif identity.identity_type == "org":
        org = OrgManifest(
            members=list(identity.org_members or []),
            quorum=identity.org_quorum or 1,
            treasury_address=identity.org_treasury_address,
        )

    return ProfileManifest(
        identity_id=identity.identity_id,
        handle=identity.handle,
        identity_type=identity.identity_type,
        display_name=identity.display_name,
        bio=identity.bio,
        avatar_url=identity.avatar_url,
        location=identity.location,
        website_url=identity.website_url,
        social_url=identity.social_url,
        is_verified=identity.is_verified,
        cc_license=identity.cc_license,
        pinned_repo_ids=list(identity.pinned_repo_ids or []),
        repos=repos,
        created_at=identity.created_at,
        updated_at=identity.updated_at,
        activity=activity,
        attestations=attestations or [],
        avax_address=identity.avax_address if is_human else None,
        agent_model=identity.agent_model if is_agent else None,
        agent_capabilities=list(identity.agent_capabilities or []) if is_agent else [],
        trust_chain=trust_chain,
        org=org,
        mpay_total_sent_nano=mpay_sent_nano,
        mpay_total_received_nano=mpay_received_nano,
    )