"""Section 15 — Mists: 8-layer test suite.
Tests span the ORM model (MusehubMist), the seven Pydantic wire models in
musehub.models.mists, and the nine service functions in
musehub.services.musehub_mists.
Layer 1 Unit
- TestUnitPydanticModels: MistResponse, MistListEntry, MistListResponse,
MistCreateRequest, MistUpdateRequest, MistForkResponse, MistEmbedResponse
construction and field defaults.
- TestUnitValidators: MistCreateRequest.validate_visibility,
validate_tags (count, length, null-byte, HTML-special),
validate_filename (delegates to validate_mist_filename).
Layer 2 Integration
- TestIntegrationCreate: create_mist persists all fields, returns MistResponse.
- TestIntegrationGet: get_mist returns populated response or None.
- TestIntegrationList: list_mists — owner filter, secret exclusion, artifact_type
filter, total counter, next_cursor pagination.
- TestIntegrationFork: fork_mist copies content, sets fork_parent_id,
increments source fork_count atomically.
- TestIntegrationUpdate: update_mist patches fields, owner guard.
- TestIntegrationDelete: delete_mist hard-delete, owner guard.
- TestIntegrationCounters: increment_mist_view / increment_mist_embed atomic.
- TestIntegrationForkList: get_mist_forks returns direct children.
Layer 3 Edge Cases
- TestEdgeCases: create duplicate mist_id → IntegrityError; fork with None
parent → None; fork depth limit; update content increments version;
list with bad cursor string is ignored.
Layer 4 Stress
- TestStress: 50 mists created, list returns expected totals; 5-level fork chain.
Layer 5 Data Integrity
- TestDataIntegrity: list total stays consistent after delete; counters are
independent per mist; fork inherits tags and symbol_anchors; fork keeps
parent visibility.
Layer 6 Performance
- TestPerformance: list 50 mists <500ms; 100 get_mist calls <500ms.
Layer 7 Security
- TestSecurity: update/delete rejected for non-owner; secret mists hidden from
public list; fork depth limit blocks chain attacks.
Layer 8 Docstrings / API
- TestDocstrings: every public service function has a docstring; every Pydantic
model has a class docstring.
"""
from __future__ import annotations
import secrets
import time
import pytest
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession
from datetime import datetime, timezone
from musehub.core.genesis import compute_identity_id, compute_repo_id
from musehub.db.musehub_repo_models import MusehubMist, MusehubRepo
from musehub.models.mists import (
MistCreateRequest,
MistEmbedResponse,
MistForkResponse,
MistListEntry,
MistListResponse,
MistResponse,
MistUpdateRequest,
)
from musehub.services.musehub_mists import (
create_mist,
delete_mist,
fork_mist,
get_mist,
get_mist_forks,
increment_mist_embed,
increment_mist_view,
list_mists,
update_mist,
)
# ===========================================================================
# Helpers
# ===========================================================================
# Default owner handle used by the _repo / _mist helpers below.
_OWNER = "gabriel"
# A second handle, used where a test needs a different user (fork owner, non-owner).
_OTHER = "alice"
def _uid() -> str:
return secrets.token_hex(16)
def _mist_id() -> str:
return secrets.token_hex(6)
async def _repo(
    session: AsyncSession,
    slug: str | None = None,
    owner: str = _OWNER,
    visibility: str = "public",
) -> MusehubRepo:
    """Add a MusehubRepo row to *session*, flush and refresh it, and return it.

    When *slug* is falsy a random one is generated with ``_uid()``.  The same
    timestamp is used for ``created_at``, ``updated_at`` and as an input to the
    repo ID computation.
    """
    repo_slug = slug if slug else _uid()
    now = datetime.now(tz=timezone.utc)
    identity = compute_identity_id(owner.encode())
    new_repo_id = compute_repo_id(identity, repo_slug, "code", now.isoformat())
    row = MusehubRepo(
        repo_id=new_repo_id,
        name=repo_slug,
        owner=owner,
        slug=repo_slug,
        visibility=visibility,
        owner_user_id=identity,
        created_at=now,
        updated_at=now,
    )
    session.add(row)
    await session.flush()
    await session.refresh(row)
    return row
async def _mist(
    session: AsyncSession,
    repo_id: str,
    *,
    mist_id: str | None = None,
    owner: str = _OWNER,
    filename: str = "hello.py",
    content: str = "print('hello')",
    artifact_type: str = "code",
    language: str = "python",
    visibility: str = "public",
    tags: list[str] | None = None,
    symbol_anchors: list[str] | None = None,
    title: str = "",
    description: str = "",
) -> MistResponse:
    """Create a mist through ``create_mist`` with test-friendly defaults.

    A random mist ID is generated when none is given, ``size_bytes`` is derived
    from the encoded *content*, and falsy *tags* / *symbol_anchors* are passed
    on as empty lists.
    """
    effective_id = mist_id if mist_id else _mist_id()
    byte_length = len(content.encode())
    tag_list = tags if tags else []
    anchor_list = symbol_anchors if symbol_anchors else []
    response = await create_mist(
        session,
        mist_id=effective_id,
        filename=filename,
        content=content,
        owner=owner,
        repo_id=repo_id,
        artifact_type=artifact_type,
        language=language,
        size_bytes=byte_length,
        visibility=visibility,
        tags=tag_list,
        symbol_anchors=anchor_list,
        title=title,
        description=description,
    )
    return response
# ===========================================================================
# Layer 1 — Unit: Pydantic models
# ===========================================================================
class TestUnitPydanticModels:
    """MistResponse and friends construct with expected defaults.

    ``datetime`` and ``timezone`` come from the module-level import, so the
    individual tests do not re-import them.
    """

    def test_mist_response_required_fields(self) -> None:
        """MistResponse built from required fields only exposes its defaults."""
        now = datetime.now(tz=timezone.utc)
        resp = MistResponse(
            mist_id="abc123def456",
            owner="gabriel",
            artifact_type="code",
            filename="script.py",
            created_at=now,
            updated_at=now,
        )
        assert resp.mist_id == "abc123def456"
        assert resp.owner == "gabriel"
        assert resp.content == ""
        assert resp.signed is False
        assert resp.fork_parent_id is None
        assert resp.fork_depth == 0
        assert resp.visibility == "public"
        assert resp.tags == []
        assert resp.symbol_anchors == []

    def test_mist_list_entry_primary_symbol_none(self) -> None:
        """MistListEntry defaults: no primary symbol, empty language and title."""
        now = datetime.now(tz=timezone.utc)
        entry = MistListEntry(
            mist_id="abc123def456",
            owner="gabriel",
            artifact_type="code",
            filename="essay.md",
            created_at=now,
            updated_at=now,
        )
        assert entry.primary_symbol is None
        assert entry.language == ""
        assert entry.title == ""

    def test_mist_list_response_defaults(self) -> None:
        """An argument-less MistListResponse is an empty page."""
        resp = MistListResponse()
        assert resp.total == 0
        assert resp.next_cursor is None
        assert resp.mists == []

    def test_mist_create_request_minimal(self) -> None:
        """MistCreateRequest needs only filename and content."""
        req = MistCreateRequest(filename="score.mid", content="binary")
        assert req.visibility == "public"
        assert req.tags == []
        assert req.agent_id == ""
        assert req.gpg_signature is None

    def test_mist_update_request_all_none(self) -> None:
        """An argument-less MistUpdateRequest leaves the checked fields as None."""
        req = MistUpdateRequest()
        assert req.title is None
        assert req.content is None
        assert req.visibility is None

    def test_mist_fork_response_fields(self) -> None:
        """MistForkResponse defaults url and language to empty strings."""
        now = datetime.now(tz=timezone.utc)
        resp = MistForkResponse(
            mist_id="fork11111111",
            owner="alice",
            fork_parent_id="orig11111111",
            artifact_type="code",
            filename="main.py",
            created_at=now,
        )
        assert resp.url == ""
        assert resp.language == ""

    def test_mist_embed_response_fields(self) -> None:
        """MistEmbedResponse keeps the identifying fields it was built with."""
        resp = MistEmbedResponse(
            mist_id="abc123def456",
            owner="gabriel",
            iframe="",
            js="",
            badge="[](link)",
        )
        assert resp.mist_id == "abc123def456"
        assert resp.owner == "gabriel"

    def test_mist_response_camel_alias(self) -> None:
        """Dumping by alias produces camelCase keys."""
        now = datetime.now(tz=timezone.utc)
        resp = MistResponse(
            mist_id="abc123def456",
            owner="gabriel",
            artifact_type="code",
            filename="main.py",
            created_at=now,
            updated_at=now,
            fork_parent_id="parent11111",
            symbol_anchors=["main.py::foo"],
        )
        d = resp.model_dump(by_alias=True)
        assert "mistId" in d
        assert "forkParentId" in d
        assert "symbolAnchors" in d
class TestUnitValidators:
    """MistCreateRequest / MistUpdateRequest validators reject bad input.

    NOTE(review): ``pytest.raises(Exception)`` is deliberately left broad here
    because the concrete exception type raised by the validators is not visible
    from this file — consider narrowing it to the validation error type.
    """

    def test_visibility_invalid(self) -> None:
        """An unknown visibility value is rejected."""
        with pytest.raises(Exception):
            MistCreateRequest(filename="f.py", content="x", visibility="private")

    def test_visibility_valid_values(self) -> None:
        """Both "public" and "secret" are accepted."""
        MistCreateRequest(filename="f.py", content="x", visibility="public")
        MistCreateRequest(filename="f.py", content="x", visibility="secret")

    def test_too_many_tags(self) -> None:
        """Eleven tags are rejected."""
        with pytest.raises(Exception):
            MistCreateRequest(
                filename="f.py",
                content="x",
                tags=["t"] * 11,
            )

    def test_tag_too_long(self) -> None:
        """A 65-character tag is rejected."""
        with pytest.raises(Exception):
            MistCreateRequest(
                filename="f.py",
                content="x",
                tags=["a" * 65],
            )

    def test_tag_null_byte(self) -> None:
        """A tag containing a null byte is rejected."""
        with pytest.raises(Exception):
            MistCreateRequest(
                filename="f.py",
                content="x",
                tags=["bad\x00tag"],
            )

    # One test case per character, so a failure names the offending character
    # and does not hide the characters that follow it.
    @pytest.mark.parametrize("ch", ["<", ">", '"', "'", "&"])
    def test_tag_html_special(self, ch: str) -> None:
        """A tag containing an HTML-special character is rejected."""
        with pytest.raises(Exception):
            MistCreateRequest(
                filename="f.py",
                content="x",
                tags=[f"tag{ch}val"],
            )

    def test_tag_valid(self) -> None:
        """Plain tags pass validation and are all kept."""
        req = MistCreateRequest(
            filename="f.py",
            content="x",
            tags=["python", "audio", "ai"],
        )
        assert len(req.tags) == 3

    def test_filename_traversal_rejected(self) -> None:
        """A filename with a parent-directory traversal is rejected."""
        with pytest.raises(Exception):
            MistCreateRequest(filename="../etc/passwd", content="x")

    def test_filename_path_separator_rejected(self) -> None:
        """A filename containing a path separator is rejected."""
        with pytest.raises(Exception):
            MistCreateRequest(filename="a/b.py", content="x")

    def test_filename_null_byte_rejected(self) -> None:
        """A filename containing a null byte is rejected."""
        with pytest.raises(Exception):
            MistCreateRequest(filename="f\x00ile.py", content="x")

    def test_update_visibility_invalid(self) -> None:
        """MistUpdateRequest rejects an unknown visibility value."""
        with pytest.raises(Exception):
            MistUpdateRequest(visibility="admin")

    def test_update_visibility_none_allowed(self) -> None:
        """MistUpdateRequest accepts visibility=None."""
        req = MistUpdateRequest(visibility=None)
        assert req.visibility is None
# ===========================================================================
# Layer 2 — Integration
# ===========================================================================
class TestIntegrationCreate:
    """create_mist stores a mist and hands back a MistResponse."""

    @pytest.mark.asyncio
    async def test_create_stores_all_fields(self, db_session: AsyncSession) -> None:
        """Explicitly supplied fields are echoed back; counters start at zero."""
        owning_repo = await _repo(db_session)
        new_id = _mist_id()
        created = await create_mist(
            db_session,
            mist_id=new_id,
            filename="score.mid",
            content="MThd...",
            owner=_OWNER,
            repo_id=str(owning_repo.repo_id),
            artifact_type="midi",
            language="",
            size_bytes=7,
            title="My Score",
            description="A midi score",
            visibility="public",
            tags=["music", "midi"],
            symbol_anchors=[],
            agent_id="agent-1",
            model_id="claude-sonnet-4-6",
            gpg_signature="SIG",
        )
        # Identity and descriptive fields.
        assert created.mist_id == new_id
        assert created.filename == "score.mid"
        assert created.artifact_type == "midi"
        assert created.title == "My Score"
        assert created.description == "A midi score"
        assert created.tags == ["music", "midi"]
        # Provenance: a GPG signature was passed, so the response is flagged signed.
        assert created.signed is True
        assert created.agent_id == "agent-1"
        assert created.model_id == "claude-sonnet-4-6"
        # Fresh-row counters.
        assert created.fork_depth == 0
        assert created.fork_count == 0
        assert created.view_count == 0
        assert created.version == 1

    @pytest.mark.asyncio
    async def test_create_defaults(self, db_session: AsyncSession) -> None:
        """Omitted optional arguments fall back to the service defaults."""
        owning_repo = await _repo(db_session)
        created = await create_mist(
            db_session,
            mist_id=_mist_id(),
            filename="minimal.txt",
            content="hello",
            owner=_OWNER,
            repo_id=str(owning_repo.repo_id),
        )
        assert created.artifact_type == "unknown"
        assert created.language == ""
        assert created.title == ""
        assert created.description == ""
        assert created.visibility == "public"
        assert created.tags == []
        assert created.signed is False

    @pytest.mark.asyncio
    async def test_create_returns_mist_response(self, db_session: AsyncSession) -> None:
        """The return value is a MistResponse carrying both timestamps."""
        owning_repo = await _repo(db_session)
        created = await _mist(db_session, str(owning_repo.repo_id))
        assert isinstance(created, MistResponse)
        assert created.created_at is not None
        assert created.updated_at is not None

    @pytest.mark.asyncio
    async def test_create_with_base_url(self, db_session: AsyncSession) -> None:
        """A base_url yields a URL of the form <base>/<owner>/mists/<id>."""
        owning_repo = await _repo(db_session)
        new_id = _mist_id()
        created = await create_mist(
            db_session,
            mist_id=new_id,
            filename="f.py",
            content="x",
            owner=_OWNER,
            repo_id=str(owning_repo.repo_id),
            base_url="https://musehub.ai",
        )
        expected_url = f"https://musehub.ai/{_OWNER}/mists/{new_id}"
        assert created.url == expected_url

    @pytest.mark.asyncio
    async def test_create_symbol_anchors_stored(self, db_session: AsyncSession) -> None:
        """Symbol anchors passed at creation are returned unchanged."""
        owning_repo = await _repo(db_session)
        expected_anchors = ["main.py::foo", "main.py::bar"]
        created = await _mist(
            db_session,
            str(owning_repo.repo_id),
            symbol_anchors=expected_anchors,
        )
        assert created.symbol_anchors == expected_anchors
class TestIntegrationGet:
    """get_mist yields a MistResponse for a known ID and None otherwise."""

    @pytest.mark.asyncio
    async def test_get_existing(self, db_session: AsyncSession) -> None:
        """A mist that was just created can be fetched with the same ID and content."""
        owning_repo = await _repo(db_session)
        stored = await _mist(db_session, str(owning_repo.repo_id))
        loaded = await get_mist(db_session, stored.mist_id)
        assert loaded is not None
        assert loaded.mist_id == stored.mist_id
        assert loaded.content == stored.content

    @pytest.mark.asyncio
    async def test_get_not_found(self, db_session: AsyncSession) -> None:
        """An unknown mist ID resolves to None."""
        missing = await get_mist(db_session, "notexist000")
        assert missing is None

    @pytest.mark.asyncio
    async def test_get_with_base_url(self, db_session: AsyncSession) -> None:
        """With a base_url the returned URL is prefixed by it."""
        owning_repo = await _repo(db_session)
        stored = await _mist(db_session, str(owning_repo.repo_id))
        loaded = await get_mist(
            db_session, stored.mist_id, base_url="https://musehub.ai"
        )
        assert loaded is not None
        assert loaded.url.startswith("https://musehub.ai/")
class TestIntegrationList:
    """list_mists: owner / visibility / type filters, totals and pagination."""

    @pytest.mark.asyncio
    async def test_list_by_owner(self, db_session: AsyncSession) -> None:
        """Listing for one owner counts and returns only that owner's mists."""
        owner_repo = await _repo(db_session, owner=_OWNER)
        other_repo = await _repo(db_session, owner=_OTHER)
        for _ in range(3):
            await _mist(db_session, str(owner_repo.repo_id), owner=_OWNER)
        for _ in range(2):
            await _mist(db_session, str(other_repo.repo_id), owner=_OTHER)
        listing = await list_mists(db_session, _OWNER)
        assert listing.total == 3
        assert all(entry.owner == _OWNER for entry in listing.mists)

    @pytest.mark.asyncio
    async def test_list_secret_hidden_by_default(self, db_session: AsyncSession) -> None:
        """With include_secret=False only the public mist is listed."""
        owning_repo = await _repo(db_session)
        await _mist(db_session, str(owning_repo.repo_id), visibility="public")
        await _mist(db_session, str(owning_repo.repo_id), visibility="secret")
        listing = await list_mists(db_session, _OWNER, include_secret=False)
        assert listing.total == 1
        assert all(entry.visibility == "public" for entry in listing.mists)

    @pytest.mark.asyncio
    async def test_list_secret_visible_when_owner(self, db_session: AsyncSession) -> None:
        """With include_secret=True both the public and the secret mist count."""
        owning_repo = await _repo(db_session)
        await _mist(db_session, str(owning_repo.repo_id), visibility="public")
        await _mist(db_session, str(owning_repo.repo_id), visibility="secret")
        listing = await list_mists(db_session, _OWNER, include_secret=True)
        assert listing.total == 2

    @pytest.mark.asyncio
    async def test_list_artifact_type_filter(self, db_session: AsyncSession) -> None:
        """The artifact_type filter keeps only matching mists."""
        owning_repo = await _repo(db_session)
        await _mist(db_session, str(owning_repo.repo_id), artifact_type="code")
        await _mist(db_session, str(owning_repo.repo_id), artifact_type="midi")
        await _mist(db_session, str(owning_repo.repo_id), artifact_type="midi")
        listing = await list_mists(db_session, _OWNER, artifact_type="midi")
        assert listing.total == 2
        assert all(entry.artifact_type == "midi" for entry in listing.mists)

    @pytest.mark.asyncio
    async def test_list_pagination_next_cursor(self, db_session: AsyncSession) -> None:
        """Five mists at page size three split into pages of three and two."""
        owning_repo = await _repo(db_session)
        for _ in range(5):
            await _mist(db_session, str(owning_repo.repo_id))
        first_page = await list_mists(db_session, _OWNER, limit=3)
        assert len(first_page.mists) == 3
        assert first_page.next_cursor is not None
        second_page = await list_mists(
            db_session, _OWNER, limit=3, cursor=first_page.next_cursor
        )
        assert len(second_page.mists) == 2
        assert second_page.next_cursor is None

    @pytest.mark.asyncio
    async def test_list_global_explore(self, db_session: AsyncSession) -> None:
        """With owner=None and include_secret=False only the public mist is counted."""
        owner_repo = await _repo(db_session, owner=_OWNER)
        other_repo = await _repo(db_session, owner=_OTHER)
        await _mist(db_session, str(owner_repo.repo_id), owner=_OWNER)
        await _mist(
            db_session, str(other_repo.repo_id), owner=_OTHER, visibility="secret"
        )
        # Explore view: one public mist and one secret mist exist, so total is 1.
        listing = await list_mists(db_session, owner=None, include_secret=False)
        assert listing.total == 1

    @pytest.mark.asyncio
    async def test_list_newest_first(self, db_session: AsyncSession) -> None:
        """Entries come back ordered by created_at, newest first."""
        import asyncio

        owning_repo = await _repo(db_session)
        for index in range(3):
            await _mist(db_session, str(owning_repo.repo_id), content=f"v{index}")
            # Short pause between creations to space out the timestamps.
            await asyncio.sleep(0.01)
        listing = await list_mists(db_session, _OWNER, limit=10)
        timestamps = [entry.created_at for entry in listing.mists]
        assert timestamps == sorted(timestamps, reverse=True)
class TestIntegrationFork:
    """fork_mist produces a copy linked to its source."""

    @pytest.mark.asyncio
    async def test_fork_creates_linked_copy(self, db_session: AsyncSession) -> None:
        """The fork carries the new ID and owner and points at its parent."""
        source_repo = await _repo(db_session)
        source = await _mist(db_session, str(source_repo.repo_id), tags=["a", "b"])
        target_repo = await _repo(db_session, owner=_OTHER)
        new_id = _mist_id()
        forked = await fork_mist(
            db_session,
            source.mist_id,
            new_mist_id=new_id,
            new_owner=_OTHER,
            new_repo_id=str(target_repo.repo_id),
        )
        assert forked is not None
        assert forked.mist_id == new_id
        assert forked.fork_parent_id == source.mist_id
        assert forked.owner == _OTHER

    @pytest.mark.asyncio
    async def test_fork_increments_source_fork_count(self, db_session: AsyncSession) -> None:
        """After one fork and a commit the source reports fork_count == 1."""
        source_repo = await _repo(db_session)
        source = await _mist(db_session, str(source_repo.repo_id))
        target_repo = await _repo(db_session, owner=_OTHER)
        await fork_mist(
            db_session,
            source.mist_id,
            new_mist_id=_mist_id(),
            new_owner=_OTHER,
            new_repo_id=str(target_repo.repo_id),
        )
        await db_session.commit()
        reloaded = await get_mist(db_session, source.mist_id)
        assert reloaded is not None
        assert reloaded.fork_count == 1

    @pytest.mark.asyncio
    async def test_fork_inherits_content(self, db_session: AsyncSession) -> None:
        """Content, symbol anchors and tags are copied onto the fork."""
        source_repo = await _repo(db_session)
        source = await _mist(
            db_session,
            str(source_repo.repo_id),
            content="def foo(): pass",
            symbol_anchors=["main.py::foo"],
            tags=["ai"],
        )
        target_repo = await _repo(db_session, owner=_OTHER)
        new_id = _mist_id()
        await fork_mist(
            db_session,
            source.mist_id,
            new_mist_id=new_id,
            new_owner=_OTHER,
            new_repo_id=str(target_repo.repo_id),
        )
        copy = await get_mist(db_session, new_id)
        assert copy is not None
        assert copy.content == "def foo(): pass"
        assert copy.symbol_anchors == ["main.py::foo"]
        assert copy.tags == ["ai"]

    @pytest.mark.asyncio
    async def test_fork_increments_depth(self, db_session: AsyncSession) -> None:
        """A direct fork of a freshly created mist has fork_depth 1."""
        source_repo = await _repo(db_session)
        source = await _mist(db_session, str(source_repo.repo_id))
        target_repo = await _repo(db_session, owner=_OTHER)
        new_id = _mist_id()
        forked = await fork_mist(
            db_session,
            source.mist_id,
            new_mist_id=new_id,
            new_owner=_OTHER,
            new_repo_id=str(target_repo.repo_id),
        )
        assert forked is not None
        copy = await get_mist(db_session, new_id)
        assert copy is not None
        assert copy.fork_depth == 1

    @pytest.mark.asyncio
    async def test_fork_nonexistent_returns_none(self, db_session: AsyncSession) -> None:
        """Forking an unknown source ID returns None."""
        target_repo = await _repo(db_session)
        outcome = await fork_mist(
            db_session,
            "doesnotexist",
            new_mist_id=_mist_id(),
            new_owner=_OTHER,
            new_repo_id=str(target_repo.repo_id),
        )
        assert outcome is None
class TestIntegrationUpdate:
    """update_mist applies partial changes for the owner and refuses others."""

    @pytest.mark.asyncio
    async def test_update_title(self, db_session: AsyncSession) -> None:
        """The title can be replaced."""
        owning_repo = await _repo(db_session)
        before = await _mist(db_session, str(owning_repo.repo_id), title="old")
        after = await update_mist(
            db_session, before.mist_id, _OWNER, title="new title"
        )
        assert after is not None
        assert after.title == "new title"

    @pytest.mark.asyncio
    async def test_update_visibility(self, db_session: AsyncSession) -> None:
        """Visibility can be switched from public to secret."""
        owning_repo = await _repo(db_session)
        before = await _mist(
            db_session, str(owning_repo.repo_id), visibility="public"
        )
        after = await update_mist(
            db_session, before.mist_id, _OWNER, visibility="secret"
        )
        assert after is not None
        assert after.visibility == "secret"

    @pytest.mark.asyncio
    async def test_update_content_increments_version(self, db_session: AsyncSession) -> None:
        """A content change bumps the version from 1 to 2."""
        owning_repo = await _repo(db_session)
        before = await _mist(db_session, str(owning_repo.repo_id), content="v1")
        after = await update_mist(
            db_session, before.mist_id, _OWNER, content="v2"
        )
        assert after is not None
        assert after.content == "v2"
        assert after.version == 2

    @pytest.mark.asyncio
    async def test_update_non_owner_returns_none(self, db_session: AsyncSession) -> None:
        """An update attempted by a different user returns None."""
        owning_repo = await _repo(db_session)
        before = await _mist(db_session, str(owning_repo.repo_id))
        outcome = await update_mist(
            db_session, before.mist_id, _OTHER, title="hacked"
        )
        assert outcome is None

    @pytest.mark.asyncio
    async def test_update_not_found_returns_none(self, db_session: AsyncSession) -> None:
        """Updating an unknown mist ID returns None."""
        outcome = await update_mist(
            db_session, "notexist000", _OWNER, title="x"
        )
        assert outcome is None

    @pytest.mark.asyncio
    async def test_update_filename(self, db_session: AsyncSession) -> None:
        """The filename can be replaced on its own."""
        owning_repo = await _repo(db_session)
        before = await _mist(db_session, str(owning_repo.repo_id), filename="foo.md")
        after = await update_mist(
            db_session, before.mist_id, _OWNER, filename="object_store_details.md"
        )
        assert after is not None
        assert after.filename == "object_store_details.md"

    @pytest.mark.asyncio
    async def test_update_filename_with_content(self, db_session: AsyncSession) -> None:
        """Filename and content can change together; the version becomes 2."""
        owning_repo = await _repo(db_session)
        before = await _mist(
            db_session, str(owning_repo.repo_id), filename="foo.md", content="v1"
        )
        after = await update_mist(
            db_session,
            before.mist_id,
            _OWNER,
            filename="renamed.md",
            content="v2",
        )
        assert after is not None
        assert after.filename == "renamed.md"
        assert after.content == "v2"
        assert after.version == 2

    @pytest.mark.asyncio
    async def test_update_filename_none_leaves_unchanged(self, db_session: AsyncSession) -> None:
        """Updating only the title keeps the original filename."""
        owning_repo = await _repo(db_session)
        before = await _mist(
            db_session, str(owning_repo.repo_id), filename="original.py"
        )
        after = await update_mist(
            db_session, before.mist_id, _OWNER, title="new title"
        )
        assert after is not None
        assert after.filename == "original.py"

    @pytest.mark.asyncio
    async def test_update_none_fields_unchanged(self, db_session: AsyncSession) -> None:
        """Fields not passed to update_mist keep their stored values."""
        owning_repo = await _repo(db_session)
        before = await _mist(
            db_session, str(owning_repo.repo_id), title="keep", tags=["x"]
        )
        after = await update_mist(
            db_session, before.mist_id, _OWNER, description="new desc"
        )
        assert after is not None
        assert after.title == "keep"
        assert after.tags == ["x"]
        assert after.description == "new desc"
class TestIntegrationDelete:
    """delete_mist removes the owner's mist and refuses everyone else."""

    @pytest.mark.asyncio
    async def test_delete_own_mist(self, db_session: AsyncSession) -> None:
        """The owner's delete returns True and the mist can no longer be fetched."""
        owning_repo = await _repo(db_session)
        target = await _mist(db_session, str(owning_repo.repo_id))
        deleted = await delete_mist(db_session, target.mist_id, _OWNER)
        assert deleted is True
        lookup = await get_mist(db_session, target.mist_id)
        assert lookup is None

    @pytest.mark.asyncio
    async def test_delete_non_owner_fails(self, db_session: AsyncSession) -> None:
        """A different user's delete returns False and the mist remains."""
        owning_repo = await _repo(db_session)
        target = await _mist(db_session, str(owning_repo.repo_id))
        deleted = await delete_mist(db_session, target.mist_id, _OTHER)
        assert deleted is False
        lookup = await get_mist(db_session, target.mist_id)
        assert lookup is not None

    @pytest.mark.asyncio
    async def test_delete_not_found_returns_false(self, db_session: AsyncSession) -> None:
        """Deleting an unknown mist ID returns False."""
        deleted = await delete_mist(db_session, "notexist000", _OWNER)
        assert deleted is False
class TestIntegrationCounters:
    """View and embed counter increments."""

    @pytest.mark.asyncio
    async def test_increment_view_count(self, db_session: AsyncSession) -> None:
        """Two view increments followed by a commit give view_count == 2."""
        owning_repo = await _repo(db_session)
        target = await _mist(db_session, str(owning_repo.repo_id))
        for _ in range(2):
            await increment_mist_view(db_session, target.mist_id)
        await db_session.commit()
        reloaded = await get_mist(db_session, target.mist_id)
        assert reloaded is not None
        assert reloaded.view_count == 2

    @pytest.mark.asyncio
    async def test_increment_embed_count(self, db_session: AsyncSession) -> None:
        """One embed increment followed by a commit gives embed_count == 1."""
        owning_repo = await _repo(db_session)
        target = await _mist(db_session, str(owning_repo.repo_id))
        await increment_mist_embed(db_session, target.mist_id)
        await db_session.commit()
        reloaded = await get_mist(db_session, target.mist_id)
        assert reloaded is not None
        assert reloaded.embed_count == 1

    @pytest.mark.asyncio
    async def test_increment_nonexistent_noop(self, db_session: AsyncSession) -> None:
        """Incrementing a missing mist_id is a silent no-op."""
        # The test passes as long as neither call raises.
        missing_id = "notexist000"
        await increment_mist_view(db_session, missing_id)
        await increment_mist_embed(db_session, missing_id)
class TestIntegrationForkList:
    """get_mist_forks lists the forks made from a given mist."""

    @pytest.mark.asyncio
    async def test_forks_of_original(self, db_session: AsyncSession) -> None:
        """Three forks of one mist are returned as MistListEntry rows pointing at it."""
        source_repo = await _repo(db_session)
        source = await _mist(db_session, str(source_repo.repo_id))
        for _ in range(3):
            target_repo = await _repo(db_session, owner=_OTHER)
            await fork_mist(
                db_session,
                source.mist_id,
                new_mist_id=_mist_id(),
                new_owner=_OTHER,
                new_repo_id=str(target_repo.repo_id),
            )
        children = await get_mist_forks(db_session, source.mist_id)
        assert len(children) == 3
        assert all(isinstance(child, MistListEntry) for child in children)
        assert all(child.fork_parent_id == source.mist_id for child in children)

    @pytest.mark.asyncio
    async def test_no_forks_returns_empty(self, db_session: AsyncSession) -> None:
        """A mist that was never forked yields an empty list."""
        owning_repo = await _repo(db_session)
        lonely = await _mist(db_session, str(owning_repo.repo_id))
        children = await get_mist_forks(db_session, lonely.mist_id)
        assert children == []
# ===========================================================================
# Layer 3 — Edge Cases
# ===========================================================================
class TestEdgeCases:
    """Boundary and error conditions."""

    @pytest.mark.asyncio
    async def test_duplicate_mist_id_raises_integrity_error(
        self, db_session: AsyncSession
    ) -> None:
        """Creating a second mist with an existing mist_id raises IntegrityError."""
        repo = await _repo(db_session)
        mid = _mist_id()
        await create_mist(
            db_session,
            mist_id=mid,
            filename="a.py",
            content="x",
            owner=_OWNER,
            repo_id=str(repo.repo_id),
        )
        with pytest.raises(IntegrityError):
            await create_mist(
                db_session,
                mist_id=mid,
                filename="b.py",
                content="y",
                owner=_OWNER,
                repo_id=str(repo.repo_id),
            )

    @pytest.mark.asyncio
    async def test_fork_depth_limit_enforced(self, db_session: AsyncSession) -> None:
        """A chain that is _FORK_DEPTH_LIMIT forks deep cannot be forked further."""
        from musehub.services.musehub_mists import _FORK_DEPTH_LIMIT

        repo = await _repo(db_session)
        current_id = (await _mist(db_session, str(repo.repo_id))).mist_id
        # Build the chain one fork at a time; every one of these must succeed.
        for depth in range(_FORK_DEPTH_LIMIT):
            fork_repo = await _repo(db_session, owner=_OTHER)
            fork_id = _mist_id()
            resp = await fork_mist(
                db_session,
                current_id,
                new_mist_id=fork_id,
                new_owner=_OTHER,
                new_repo_id=str(fork_repo.repo_id),
            )
            assert resp is not None, f"Expected fork at depth {depth + 1} to succeed"
            current_id = fork_id
        # The chain is now _FORK_DEPTH_LIMIT forks deep — the next fork must be rejected.
        final_repo = await _repo(db_session, owner=_OTHER)
        result = await fork_mist(
            db_session,
            current_id,
            new_mist_id=_mist_id(),
            new_owner=_OTHER,
            new_repo_id=str(final_repo.repo_id),
        )
        assert result is None

    @pytest.mark.asyncio
    async def test_update_content_twice_increments_version_twice(
        self, db_session: AsyncSession
    ) -> None:
        """Two content updates take the version from 1 to 3."""
        repo = await _repo(db_session)
        m = await _mist(db_session, str(repo.repo_id))
        await update_mist(db_session, m.mist_id, _OWNER, content="v2")
        await update_mist(db_session, m.mist_id, _OWNER, content="v3")
        final = await get_mist(db_session, m.mist_id)
        assert final is not None
        assert final.version == 3
        assert final.content == "v3"

    @pytest.mark.asyncio
    async def test_list_bad_cursor_ignored(self, db_session: AsyncSession) -> None:
        """A non-ISO-8601 cursor string is silently ignored (returns full list)."""
        repo = await _repo(db_session)
        await _mist(db_session, str(repo.repo_id))
        await _mist(db_session, str(repo.repo_id))
        result = await list_mists(db_session, _OWNER, cursor="not-a-date")
        assert result.total == 2
        # Also check the returned rows, so the "returns full list" claim is
        # verified and not only the total counter.
        assert len(result.mists) == 2

    @pytest.mark.asyncio
    async def test_create_empty_tags_stored_as_list(self, db_session: AsyncSession) -> None:
        """A mist created without tags reports an empty list."""
        repo = await _repo(db_session)
        m = await _mist(db_session, str(repo.repo_id))
        assert m.tags == []

    @pytest.mark.asyncio
    async def test_create_with_secret_visibility(self, db_session: AsyncSession) -> None:
        """Secret visibility is reported at creation and on a later fetch."""
        repo = await _repo(db_session)
        m = await _mist(db_session, str(repo.repo_id), visibility="secret")
        assert m.visibility == "secret"
        fetched = await get_mist(db_session, m.mist_id)
        assert fetched is not None
        assert fetched.visibility == "secret"

    @pytest.mark.asyncio
    async def test_mist_list_entry_primary_symbol_from_anchors(
        self, db_session: AsyncSession
    ) -> None:
        """The list entry's primary_symbol is the first symbol anchor."""
        repo = await _repo(db_session)
        anchors = ["utils.py::compute", "utils.py::clean"]
        await _mist(db_session, str(repo.repo_id), symbol_anchors=anchors)
        result = await list_mists(db_session, _OWNER)
        assert result.total == 1
        assert result.mists[0].primary_symbol == "utils.py::compute"
# ===========================================================================
# Layer 4 — Stress
# ===========================================================================
class TestStress:
    """Bulk create and list under load."""

    @pytest.mark.asyncio
    async def test_create_50_mists(self, db_session: AsyncSession) -> None:
        """Fifty created mists are all counted and returned at limit=50."""
        repo = await _repo(db_session)
        for i in range(50):
            await _mist(db_session, str(repo.repo_id), content=f"print({i})")
        result = await list_mists(db_session, _OWNER, limit=50)
        assert result.total == 50
        assert len(result.mists) == 50

    @pytest.mark.asyncio
    async def test_five_level_fork_chain(self, db_session: AsyncSession) -> None:
        """Create a 5-level deep fork chain and verify depths."""
        repo = await _repo(db_session)
        root_id = (await _mist(db_session, str(repo.repo_id))).mist_id
        current_id = root_id
        for expected_depth in range(1, 6):
            fork_repo = await _repo(db_session, owner=_OTHER)
            fork_id = _mist_id()
            resp = await fork_mist(
                db_session,
                current_id,
                new_mist_id=fork_id,
                new_owner=_OTHER,
                new_repo_id=str(fork_repo.repo_id),
            )
            assert resp is not None
            fork_row = await get_mist(db_session, fork_id)
            assert fork_row is not None
            assert fork_row.fork_depth == expected_depth
            # Each new fork becomes the source of the next one.
            current_id = fork_id

    @pytest.mark.asyncio
    async def test_list_pagination_covers_all_pages(self, db_session: AsyncSession) -> None:
        """Paginate through 25 mists with page size 10."""
        item_count = 25
        repo = await _repo(db_session)
        for i in range(item_count):
            await _mist(db_session, str(repo.repo_id), content=f"item {i}")
        collected: list[MistListEntry] = []
        cursor = None
        # Bound the loop so a cursor that never runs out fails the test
        # instead of hanging the suite. Needing more pages than there are
        # items is treated as the cursor not advancing.
        max_pages = item_count + 1
        for _ in range(max_pages):
            page = await list_mists(db_session, _OWNER, limit=10, cursor=cursor)
            collected.extend(page.mists)
            if page.next_cursor is None:
                break
            cursor = page.next_cursor
        else:
            pytest.fail(
                f"pagination did not terminate within {max_pages} pages"
            )
        assert len(collected) == item_count
# ===========================================================================
# Layer 5 — Data Integrity
# ===========================================================================
class TestDataIntegrity:
    """Totals, counters and fork bookkeeping stay consistent."""

    @pytest.mark.asyncio
    async def test_total_decrements_after_delete(self, db_session: AsyncSession) -> None:
        """Deleting one of two mists leaves a list total of one."""
        owning_repo = await _repo(db_session)
        doomed = await _mist(db_session, str(owning_repo.repo_id))
        await _mist(db_session, str(owning_repo.repo_id))
        await delete_mist(db_session, doomed.mist_id, _OWNER)
        listing = await list_mists(db_session, _OWNER)
        assert listing.total == 1

    @pytest.mark.asyncio
    async def test_counters_independent_per_mist(self, db_session: AsyncSession) -> None:
        """A view on one mist and an embed on another do not affect each other."""
        owning_repo = await _repo(db_session)
        viewed = await _mist(db_session, str(owning_repo.repo_id))
        embedded = await _mist(db_session, str(owning_repo.repo_id))
        await increment_mist_view(db_session, viewed.mist_id)
        await increment_mist_embed(db_session, embedded.mist_id)
        await db_session.commit()
        viewed_row = await get_mist(db_session, viewed.mist_id)
        embedded_row = await get_mist(db_session, embedded.mist_id)
        assert viewed_row is not None
        assert embedded_row is not None
        assert (viewed_row.view_count, viewed_row.embed_count) == (1, 0)
        assert (embedded_row.view_count, embedded_row.embed_count) == (0, 1)

    @pytest.mark.asyncio
    async def test_fork_inherits_parent_visibility(self, db_session: AsyncSession) -> None:
        """A fork of a secret mist is itself secret."""
        source_repo = await _repo(db_session)
        source = await _mist(
            db_session, str(source_repo.repo_id), visibility="secret"
        )
        target_repo = await _repo(db_session, owner=_OTHER)
        new_id = _mist_id()
        await fork_mist(
            db_session,
            source.mist_id,
            new_mist_id=new_id,
            new_owner=_OTHER,
            new_repo_id=str(target_repo.repo_id),
        )
        copy = await get_mist(db_session, new_id)
        assert copy is not None
        assert copy.visibility == "secret"

    @pytest.mark.asyncio
    async def test_fork_count_multi(self, db_session: AsyncSession) -> None:
        """fork_count on original equals number of forks created."""
        source_repo = await _repo(db_session)
        source = await _mist(db_session, str(source_repo.repo_id))
        for _ in range(4):
            target_repo = await _repo(db_session, owner=_OTHER)
            await fork_mist(
                db_session,
                source.mist_id,
                new_mist_id=_mist_id(),
                new_owner=_OTHER,
                new_repo_id=str(target_repo.repo_id),
            )
        await db_session.commit()
        reloaded = await get_mist(db_session, source.mist_id)
        assert reloaded is not None
        assert reloaded.fork_count == 4

    @pytest.mark.asyncio
    async def test_update_tags_replaces_list(self, db_session: AsyncSession) -> None:
        """Updating tags replaces the stored list; it is not merged."""
        owning_repo = await _repo(db_session)
        tagged = await _mist(db_session, str(owning_repo.repo_id), tags=["a", "b"])
        after = await update_mist(db_session, tagged.mist_id, _OWNER, tags=["c"])
        assert after is not None
        assert after.tags == ["c"]
# ===========================================================================
# Layer 6 — Performance
# ===========================================================================
class TestPerformance:
    """Wall-clock budgets for listing many mists and for repeated single reads."""

    @pytest.mark.asyncio
    async def test_list_50_mists_under_500ms(self, db_session: AsyncSession) -> None:
        """Listing 50 mists reports total == 50 and finishes within 0.5 s."""
        repo_key = str((await _repo(db_session)).repo_id)
        for i in range(50):
            await _mist(db_session, repo_key, content=f"c{i}")

        # Only the list_mists call itself is timed; setup above is excluded.
        started = time.perf_counter()
        page = await list_mists(db_session, _OWNER, limit=50)
        elapsed = time.perf_counter() - started

        assert page.total == 50
        assert elapsed < 0.5, f"list_mists took {elapsed:.3f}s — too slow"

    @pytest.mark.asyncio
    async def test_get_100_times_under_500ms(self, db_session: AsyncSession) -> None:
        """One hundred sequential get_mist calls finish within 0.5 s in total."""
        repo_key = str((await _repo(db_session)).repo_id)
        target_id = (await _mist(db_session, repo_key)).mist_id

        started = time.perf_counter()
        remaining = 100
        while remaining:
            await get_mist(db_session, target_id)
            remaining -= 1
        elapsed = time.perf_counter() - started

        assert elapsed < 0.5, f"100x get_mist took {elapsed:.3f}s"
# ===========================================================================
# Layer 7 — Security
# ===========================================================================
class TestSecurity:
    """Non-owners cannot mutate mists, secret mists stay hidden, fork chains are capped."""

    @pytest.mark.asyncio
    async def test_update_by_non_owner_rejected(self, db_session: AsyncSession) -> None:
        """update_mist from a foreign caller returns None and leaves content intact."""
        repo_key = str((await _repo(db_session)).repo_id)
        victim = await _mist(db_session, repo_key)

        outcome = await update_mist(db_session, victim.mist_id, "attacker", content="evil")
        assert outcome is None

        # The stored content must still match what was created.
        stored = await get_mist(db_session, victim.mist_id)
        assert stored is not None
        assert stored.content == victim.content

    @pytest.mark.asyncio
    async def test_delete_by_non_owner_rejected(self, db_session: AsyncSession) -> None:
        """delete_mist from a foreign caller returns False and the mist survives."""
        repo_key = str((await _repo(db_session)).repo_id)
        victim = await _mist(db_session, repo_key)

        deleted = await delete_mist(db_session, victim.mist_id, "attacker")
        assert deleted is False

        survivor = await get_mist(db_session, victim.mist_id)
        assert survivor is not None

    @pytest.mark.asyncio
    async def test_secret_mist_hidden_from_explore(self, db_session: AsyncSession) -> None:
        """An owner-less listing without include_secret does not count a secret mist."""
        repo_key = str((await _repo(db_session)).repo_id)
        await _mist(db_session, repo_key, visibility="secret")

        listing = await list_mists(db_session, owner=None, include_secret=False)
        assert listing.total == 0

    @pytest.mark.asyncio
    async def test_fork_depth_limit_prevents_deep_chain(
        self, db_session: AsyncSession
    ) -> None:
        """A chain of _FORK_DEPTH_LIMIT forks succeeds; the next fork returns None."""
        from musehub.services.musehub_mists import _FORK_DEPTH_LIMIT

        root_repo = await _repo(db_session)
        root = await _mist(db_session, str(root_repo.repo_id))
        tip_id = root.mist_id

        # Build the chain: every new fork is taken from the previous fork.
        for _ in range(_FORK_DEPTH_LIMIT):
            link_repo = await _repo(db_session, owner=_OTHER)
            link = await fork_mist(
                db_session,
                tip_id,
                new_mist_id=_mist_id(),
                new_owner=_OTHER,
                new_repo_id=str(link_repo.repo_id),
            )
            assert link is not None
            tip_id = link.mist_id

        # One more fork from the tip of the chain must be refused.
        extra_repo = await _repo(db_session, owner=_OTHER)
        over_limit = await fork_mist(
            db_session,
            tip_id,
            new_mist_id=_mist_id(),
            new_owner=_OTHER,
            new_repo_id=str(extra_repo.repo_id),
        )
        assert over_limit is None

    @pytest.mark.asyncio
    async def test_secret_visible_only_with_include_secret(
        self, db_session: AsyncSession
    ) -> None:
        """The _OWNER listing counts a secret mist only when include_secret=True."""
        repo_key = str((await _repo(db_session)).repo_id)
        await _mist(db_session, repo_key, visibility="secret")

        without_secret = await list_mists(db_session, _OWNER, include_secret=False)
        with_secret = await list_mists(db_session, _OWNER, include_secret=True)

        assert without_secret.total == 0
        assert with_secret.total == 1
# ===========================================================================
# Layer 8 — Docstrings / API surface
# ===========================================================================
class TestDocstrings:
    """Checks docstring presence on the public API and the ORM table shape."""

    def test_service_functions_have_docstrings(self) -> None:
        """Each listed service function carries a non-blank docstring."""
        import musehub.services.musehub_mists as svc

        fns = [
            svc.create_mist,
            svc.get_mist,
            svc.list_mists,
            svc.fork_mist,
            svc.update_mist,
            svc.delete_mist,
            svc.increment_mist_view,
            svc.increment_mist_embed,
            svc.get_mist_forks,
        ]
        missing = []
        for fn in fns:
            # A None or whitespace-only docstring counts as missing.
            if not (fn.__doc__ or "").strip():
                missing.append(fn.__name__)
        assert missing == [], f"Service functions missing docstrings: {missing}"

    def test_pydantic_models_have_docstrings(self) -> None:
        """Each listed wire model carries a non-blank docstring."""
        import musehub.models.mists as m

        models = [
            m.MistResponse,
            m.MistListEntry,
            m.MistListResponse,
            m.MistCreateRequest,
            m.MistUpdateRequest,
            m.MistForkResponse,
            m.MistEmbedResponse,
        ]
        missing = []
        for model in models:
            # A None or whitespace-only docstring counts as missing.
            if not (model.__doc__ or "").strip():
                missing.append(model.__name__)
        assert missing == [], f"Pydantic models missing docstrings: {missing}"

    def test_orm_model_has_tablename(self) -> None:
        """The ORM model's __tablename__ is "musehub_mists"."""
        table_name = MusehubMist.__tablename__
        assert table_name == "musehub_mists"

    def test_orm_model_has_expected_columns(self) -> None:
        """The table's column keys match the expected set exactly."""
        expected = {
            "mist_id",
            "repo_id",
            "owner",
            "artifact_type",
            "language",
            "filename",
            "title",
            "description",
            "content",
            "size_bytes",
            "commit_id",
            "snapshot_id",
            "version",
            "agent_id",
            "model_id",
            "gpg_signature",
            "fork_parent_id",
            "fork_depth",
            "fork_count",
            "view_count",
            "embed_count",
            "visibility",
            "tags",
            "symbol_anchors",
            "created_at",
            "updated_at",
        }
        actual = {column.key for column in MusehubMist.__table__.columns}
        assert actual == expected

    def test_fork_depth_limit_constant(self) -> None:
        """The service's _FORK_DEPTH_LIMIT constant equals 5."""
        from musehub.services.musehub_mists import _FORK_DEPTH_LIMIT

        limit = _FORK_DEPTH_LIMIT
        assert limit == 5