gabriel / musehub public
test_mist_models_service.py python
1,290 lines 44.7 KB
Raw
sha256:94ef169c149a452bff7c604ded8b280b19bd477c2dabcb56972780b0b784c7aa Merge 'fix/assignee-sigil-inline' into 'dev' — proposal: As… Human 1 day ago
1 """Section 15 — Mists: 8-layer test suite.
2
3 Tests span the ORM model (MusehubMist), the seven Pydantic wire models in
4 musehub.models.mists, and the nine service functions in
5 musehub.services.musehub_mists.
6
7 Layer 1 Unit
8 - TestUnitPydanticModels: MistResponse, MistListEntry, MistListResponse,
9 MistCreateRequest, MistUpdateRequest, MistForkResponse, MistEmbedResponse
10 construction and field defaults.
11 - TestUnitValidators: MistCreateRequest.validate_visibility,
12 validate_tags (count, length, null-byte, HTML-special),
13 validate_filename (delegates to validate_mist_filename).
14
15 Layer 2 Integration
16 - TestIntegrationCreate: create_mist persists all fields, returns MistResponse.
17 - TestIntegrationGet: get_mist returns populated response or None.
18 - TestIntegrationList: list_mists — owner filter, secret exclusion, artifact_type
19 filter, total counter, next_cursor pagination.
20 - TestIntegrationFork: fork_mist copies content, sets fork_parent_id,
21 increments source fork_count atomically.
22 - TestIntegrationUpdate: update_mist patches fields, owner guard.
23 - TestIntegrationDelete: delete_mist hard-delete, owner guard.
24 - TestIntegrationCounters: increment_mist_view / increment_mist_embed atomic.
25 - TestIntegrationForkList: get_mist_forks returns direct children.
26
27 Layer 3 Edge Cases
28 - TestEdgeCases: create duplicate mist_id → IntegrityError; fork with None
29 parent → None; fork depth limit; update content increments version;
30 list with bad cursor string is ignored.
31
32 Layer 4 Stress
33 - TestStress: 50 mists created, list returns expected totals; 5-level fork chain.
34
35 Layer 5 Data Integrity
36 - TestDataIntegrity: list total stays consistent after delete; counters are
37 independent per mist; fork inherits tags and symbol_anchors; fork keeps
38 parent visibility.
39
40 Layer 6 Performance
41 - TestPerformance: list 50 mists <500ms; count 100 forks <500ms.
42
43 Layer 7 Security
44 - TestSecurity: update/delete rejected for non-owner; secret mists hidden from
45 public list; fork depth limit blocks chain attacks.
46
47 Layer 8 Docstrings / API
48 - TestDocstrings: every public service function has a docstring; every Pydantic
49 model has a class docstring.
50 """
51
52 from __future__ import annotations
53
54 import secrets
55 import time
56
57 import pytest
58 from sqlalchemy.exc import IntegrityError
59 from sqlalchemy.ext.asyncio import AsyncSession
60
61 from datetime import datetime, timezone
62 from musehub.core.genesis import compute_identity_id, compute_repo_id
63 from musehub.db.musehub_repo_models import MusehubMist, MusehubRepo
64 from musehub.models.mists import (
65 MistCreateRequest,
66 MistEmbedResponse,
67 MistForkResponse,
68 MistListEntry,
69 MistListResponse,
70 MistResponse,
71 MistUpdateRequest,
72 )
73 from musehub.services.musehub_mists import (
74 create_mist,
75 delete_mist,
76 fork_mist,
77 get_mist,
78 get_mist_forks,
79 increment_mist_embed,
80 increment_mist_view,
81 list_mists,
82 update_mist,
83 )
84
85 # ===========================================================================
86 # Helpers
87 # ===========================================================================
88
89 _OWNER = "gabriel"
90 _OTHER = "alice"
91
92
93 def _uid() -> str:
94 return secrets.token_hex(16)
95
96
97 def _mist_id() -> str:
98 return secrets.token_hex(6)
99
100
101 async def _repo(
102 session: AsyncSession,
103 slug: str | None = None,
104 owner: str = _OWNER,
105 visibility: str = "public",
106 ) -> MusehubRepo:
107 slug = slug or _uid()
108 created_at = datetime.now(tz=timezone.utc)
109 owner_id = compute_identity_id(owner.encode())
110 repo = MusehubRepo(
111 repo_id=compute_repo_id(owner_id, slug, "code", created_at.isoformat()),
112 name=slug,
113 owner=owner,
114 slug=slug,
115 visibility=visibility,
116 owner_user_id=owner_id,
117 created_at=created_at,
118 updated_at=created_at,
119 )
120 session.add(repo)
121 await session.flush()
122 await session.refresh(repo)
123 return repo
124
125
126 async def _mist(
127 session: AsyncSession,
128 repo_id: str,
129 *,
130 mist_id: str | None = None,
131 owner: str = _OWNER,
132 filename: str = "hello.py",
133 content: str = "print('hello')",
134 artifact_type: str = "code",
135 language: str = "python",
136 visibility: str = "public",
137 tags: list[str] | None = None,
138 symbol_anchors: list[str] | None = None,
139 title: str = "",
140 description: str = "",
141 ) -> MistResponse:
142 return await create_mist(
143 session,
144 mist_id=mist_id or _mist_id(),
145 filename=filename,
146 content=content,
147 owner=owner,
148 repo_id=repo_id,
149 artifact_type=artifact_type,
150 language=language,
151 size_bytes=len(content.encode()),
152 visibility=visibility,
153 tags=tags or [],
154 symbol_anchors=symbol_anchors or [],
155 title=title,
156 description=description,
157 )
158
159
160 # ===========================================================================
161 # Layer 1 — Unit: Pydantic models
162 # ===========================================================================
163
164
165 class TestUnitPydanticModels:
166 """MistResponse and friends construct with expected defaults."""
167
168 def test_mist_response_required_fields(self) -> None:
169 from datetime import datetime, timezone
170
171 now = datetime.now(tz=timezone.utc)
172 resp = MistResponse(
173 mist_id="abc123def456",
174 owner="gabriel",
175 artifact_type="code",
176 filename="script.py",
177 created_at=now,
178 updated_at=now,
179 )
180 assert resp.mist_id == "abc123def456"
181 assert resp.owner == "gabriel"
182 assert resp.content == ""
183 assert resp.signed is False
184 assert resp.fork_parent_id is None
185 assert resp.fork_depth == 0
186 assert resp.visibility == "public"
187 assert resp.tags == []
188 assert resp.symbol_anchors == []
189
190 def test_mist_list_entry_primary_symbol_none(self) -> None:
191 from datetime import datetime, timezone
192
193 now = datetime.now(tz=timezone.utc)
194 entry = MistListEntry(
195 mist_id="abc123def456",
196 owner="gabriel",
197 artifact_type="code",
198 filename="essay.md",
199 created_at=now,
200 updated_at=now,
201 )
202 assert entry.primary_symbol is None
203 assert entry.language == ""
204 assert entry.title == ""
205
206 def test_mist_list_response_defaults(self) -> None:
207 resp = MistListResponse()
208 assert resp.total == 0
209 assert resp.next_cursor is None
210 assert resp.mists == []
211
212 def test_mist_create_request_minimal(self) -> None:
213 req = MistCreateRequest(filename="score.mid", content="binary")
214 assert req.visibility == "public"
215 assert req.tags == []
216 assert req.agent_id == ""
217 assert req.gpg_signature is None
218
219 def test_mist_update_request_all_none(self) -> None:
220 req = MistUpdateRequest()
221 assert req.title is None
222 assert req.content is None
223 assert req.visibility is None
224
225 def test_mist_fork_response_fields(self) -> None:
226 from datetime import datetime, timezone
227
228 now = datetime.now(tz=timezone.utc)
229 resp = MistForkResponse(
230 mist_id="fork11111111",
231 owner="alice",
232 fork_parent_id="orig11111111",
233 artifact_type="code",
234 filename="main.py",
235 created_at=now,
236 )
237 assert resp.url == ""
238 assert resp.language == ""
239
240 def test_mist_embed_response_fields(self) -> None:
241 resp = MistEmbedResponse(
242 mist_id="abc123def456",
243 owner="gabriel",
244 iframe="<iframe/>",
245 js="<script/>",
246 badge="[![Mist](url)](link)",
247 )
248 assert resp.mist_id == "abc123def456"
249
250 def test_mist_response_camel_alias(self) -> None:
251 from datetime import datetime, timezone
252
253 now = datetime.now(tz=timezone.utc)
254 resp = MistResponse(
255 mist_id="abc123def456",
256 owner="gabriel",
257 artifact_type="code",
258 filename="main.py",
259 created_at=now,
260 updated_at=now,
261 fork_parent_id="parent11111",
262 symbol_anchors=["main.py::foo"],
263 )
264 d = resp.model_dump(by_alias=True)
265 assert "mistId" in d
266 assert "forkParentId" in d
267 assert "symbolAnchors" in d
268
269
270 class TestUnitValidators:
271 """MistCreateRequest validators reject bad input."""
272
273 def test_visibility_invalid(self) -> None:
274 with pytest.raises(Exception):
275 MistCreateRequest(filename="f.py", content="x", visibility="private")
276
277 def test_visibility_valid_values(self) -> None:
278 MistCreateRequest(filename="f.py", content="x", visibility="public")
279 MistCreateRequest(filename="f.py", content="x", visibility="secret")
280
281 def test_too_many_tags(self) -> None:
282 with pytest.raises(Exception):
283 MistCreateRequest(
284 filename="f.py",
285 content="x",
286 tags=["t"] * 11,
287 )
288
289 def test_tag_too_long(self) -> None:
290 with pytest.raises(Exception):
291 MistCreateRequest(
292 filename="f.py",
293 content="x",
294 tags=["a" * 65],
295 )
296
297 def test_tag_null_byte(self) -> None:
298 with pytest.raises(Exception):
299 MistCreateRequest(
300 filename="f.py",
301 content="x",
302 tags=["bad\x00tag"],
303 )
304
305 def test_tag_html_special(self) -> None:
306 for ch in ("<", ">", '"', "'", "&"):
307 with pytest.raises(Exception):
308 MistCreateRequest(
309 filename="f.py",
310 content="x",
311 tags=[f"tag{ch}val"],
312 )
313
314 def test_tag_valid(self) -> None:
315 req = MistCreateRequest(
316 filename="f.py",
317 content="x",
318 tags=["python", "audio", "ai"],
319 )
320 assert len(req.tags) == 3
321
322 def test_filename_traversal_rejected(self) -> None:
323 with pytest.raises(Exception):
324 MistCreateRequest(filename="../etc/passwd", content="x")
325
326 def test_filename_path_separator_rejected(self) -> None:
327 with pytest.raises(Exception):
328 MistCreateRequest(filename="a/b.py", content="x")
329
330 def test_filename_null_byte_rejected(self) -> None:
331 with pytest.raises(Exception):
332 MistCreateRequest(filename="f\x00ile.py", content="x")
333
334 def test_update_visibility_invalid(self) -> None:
335 with pytest.raises(Exception):
336 MistUpdateRequest(visibility="admin")
337
338 def test_update_visibility_none_allowed(self) -> None:
339 req = MistUpdateRequest(visibility=None)
340 assert req.visibility is None
341
342
343 # ===========================================================================
344 # Layer 2 — Integration
345 # ===========================================================================
346
347
348 class TestIntegrationCreate:
349 """create_mist persists a row and returns MistResponse."""
350
351 @pytest.mark.asyncio
352 async def test_create_stores_all_fields(self, db_session: AsyncSession) -> None:
353 repo = await _repo(db_session)
354 mid = _mist_id()
355 resp = await create_mist(
356 db_session,
357 mist_id=mid,
358 filename="score.mid",
359 content="MThd...",
360 owner=_OWNER,
361 repo_id=str(repo.repo_id),
362 artifact_type="midi",
363 language="",
364 size_bytes=7,
365 title="My Score",
366 description="A midi score",
367 visibility="public",
368 tags=["music", "midi"],
369 symbol_anchors=[],
370 agent_id="agent-1",
371 model_id="claude-sonnet-4-6",
372 gpg_signature="SIG",
373 )
374 assert resp.mist_id == mid
375 assert resp.filename == "score.mid"
376 assert resp.artifact_type == "midi"
377 assert resp.title == "My Score"
378 assert resp.description == "A midi score"
379 assert resp.tags == ["music", "midi"]
380 assert resp.signed is True
381 assert resp.agent_id == "agent-1"
382 assert resp.model_id == "claude-sonnet-4-6"
383 assert resp.fork_depth == 0
384 assert resp.fork_count == 0
385 assert resp.view_count == 0
386 assert resp.version == 1
387
388 @pytest.mark.asyncio
389 async def test_create_defaults(self, db_session: AsyncSession) -> None:
390 repo = await _repo(db_session)
391 resp = await create_mist(
392 db_session,
393 mist_id=_mist_id(),
394 filename="minimal.txt",
395 content="hello",
396 owner=_OWNER,
397 repo_id=str(repo.repo_id),
398 )
399 assert resp.artifact_type == "unknown"
400 assert resp.language == ""
401 assert resp.title == ""
402 assert resp.description == ""
403 assert resp.visibility == "public"
404 assert resp.tags == []
405 assert resp.signed is False
406
407 @pytest.mark.asyncio
408 async def test_create_returns_mist_response(self, db_session: AsyncSession) -> None:
409 repo = await _repo(db_session)
410 resp = await _mist(db_session, str(repo.repo_id))
411 assert isinstance(resp, MistResponse)
412 assert resp.created_at is not None
413 assert resp.updated_at is not None
414
415 @pytest.mark.asyncio
416 async def test_create_with_base_url(self, db_session: AsyncSession) -> None:
417 repo = await _repo(db_session)
418 mid = _mist_id()
419 resp = await create_mist(
420 db_session,
421 mist_id=mid,
422 filename="f.py",
423 content="x",
424 owner=_OWNER,
425 repo_id=str(repo.repo_id),
426 base_url="https://musehub.ai",
427 )
428 assert resp.url == f"https://musehub.ai/{_OWNER}/mists/{mid}"
429
430 @pytest.mark.asyncio
431 async def test_create_symbol_anchors_stored(self, db_session: AsyncSession) -> None:
432 repo = await _repo(db_session)
433 anchors = ["main.py::foo", "main.py::bar"]
434 resp = await _mist(
435 db_session,
436 str(repo.repo_id),
437 symbol_anchors=anchors,
438 )
439 assert resp.symbol_anchors == anchors
440
441
442 class TestIntegrationGet:
443 """get_mist returns MistResponse or None."""
444
445 @pytest.mark.asyncio
446 async def test_get_existing(self, db_session: AsyncSession) -> None:
447 repo = await _repo(db_session)
448 created = await _mist(db_session, str(repo.repo_id))
449 fetched = await get_mist(db_session, created.mist_id)
450 assert fetched is not None
451 assert fetched.mist_id == created.mist_id
452 assert fetched.content == created.content
453
454 @pytest.mark.asyncio
455 async def test_get_not_found(self, db_session: AsyncSession) -> None:
456 result = await get_mist(db_session, "notexist000")
457 assert result is None
458
459 @pytest.mark.asyncio
460 async def test_get_with_base_url(self, db_session: AsyncSession) -> None:
461 repo = await _repo(db_session)
462 created = await _mist(db_session, str(repo.repo_id))
463 fetched = await get_mist(db_session, created.mist_id, base_url="https://musehub.ai")
464 assert fetched is not None
465 assert fetched.url.startswith("https://musehub.ai/")
466
467
468 class TestIntegrationList:
469 """list_mists — filters, totals, pagination."""
470
471 @pytest.mark.asyncio
472 async def test_list_by_owner(self, db_session: AsyncSession) -> None:
473 r1 = await _repo(db_session, owner=_OWNER)
474 r2 = await _repo(db_session, owner=_OTHER)
475 for _ in range(3):
476 await _mist(db_session, str(r1.repo_id), owner=_OWNER)
477 for _ in range(2):
478 await _mist(db_session, str(r2.repo_id), owner=_OTHER)
479
480 result = await list_mists(db_session, _OWNER)
481 assert result.total == 3
482 assert all(e.owner == _OWNER for e in result.mists)
483
484 @pytest.mark.asyncio
485 async def test_list_secret_hidden_by_default(self, db_session: AsyncSession) -> None:
486 repo = await _repo(db_session)
487 await _mist(db_session, str(repo.repo_id), visibility="public")
488 await _mist(db_session, str(repo.repo_id), visibility="secret")
489
490 result = await list_mists(db_session, _OWNER, include_secret=False)
491 assert result.total == 1
492 assert all(e.visibility == "public" for e in result.mists)
493
494 @pytest.mark.asyncio
495 async def test_list_secret_visible_when_owner(self, db_session: AsyncSession) -> None:
496 repo = await _repo(db_session)
497 await _mist(db_session, str(repo.repo_id), visibility="public")
498 await _mist(db_session, str(repo.repo_id), visibility="secret")
499
500 result = await list_mists(db_session, _OWNER, include_secret=True)
501 assert result.total == 2
502
503 @pytest.mark.asyncio
504 async def test_list_artifact_type_filter(self, db_session: AsyncSession) -> None:
505 repo = await _repo(db_session)
506 await _mist(db_session, str(repo.repo_id), artifact_type="code")
507 await _mist(db_session, str(repo.repo_id), artifact_type="midi")
508 await _mist(db_session, str(repo.repo_id), artifact_type="midi")
509
510 result = await list_mists(db_session, _OWNER, artifact_type="midi")
511 assert result.total == 2
512 assert all(e.artifact_type == "midi" for e in result.mists)
513
514 @pytest.mark.asyncio
515 async def test_list_pagination_next_cursor(self, db_session: AsyncSession) -> None:
516 repo = await _repo(db_session)
517 for _ in range(5):
518 await _mist(db_session, str(repo.repo_id))
519
520 page1 = await list_mists(db_session, _OWNER, limit=3)
521 assert len(page1.mists) == 3
522 assert page1.next_cursor is not None
523
524 page2 = await list_mists(db_session, _OWNER, limit=3, cursor=page1.next_cursor)
525 assert len(page2.mists) == 2
526 assert page2.next_cursor is None
527
528 @pytest.mark.asyncio
529 async def test_list_global_explore(self, db_session: AsyncSession) -> None:
530 r1 = await _repo(db_session, owner=_OWNER)
531 r2 = await _repo(db_session, owner=_OTHER)
532 await _mist(db_session, str(r1.repo_id), owner=_OWNER)
533 await _mist(db_session, str(r2.repo_id), owner=_OTHER, visibility="secret")
534
535 # explore — owner=None, include_secret=False → only 1 public
536 result = await list_mists(db_session, owner=None, include_secret=False)
537 assert result.total == 1
538
539 @pytest.mark.asyncio
540 async def test_list_newest_first(self, db_session: AsyncSession) -> None:
541 import asyncio
542
543 repo = await _repo(db_session)
544 for i in range(3):
545 await _mist(db_session, str(repo.repo_id), content=f"v{i}")
546 await asyncio.sleep(0.01)
547
548 result = await list_mists(db_session, _OWNER, limit=10)
549 dates = [e.created_at for e in result.mists]
550 assert dates == sorted(dates, reverse=True)
551
552
553 class TestIntegrationFork:
554 """fork_mist creates a copy with correct linkage."""
555
556 @pytest.mark.asyncio
557 async def test_fork_creates_linked_copy(self, db_session: AsyncSession) -> None:
558 repo = await _repo(db_session)
559 original = await _mist(db_session, str(repo.repo_id), tags=["a", "b"])
560 fork_repo = await _repo(db_session, owner=_OTHER)
561 fork_id = _mist_id()
562
563 resp = await fork_mist(
564 db_session,
565 original.mist_id,
566 new_mist_id=fork_id,
567 new_owner=_OTHER,
568 new_repo_id=str(fork_repo.repo_id),
569 )
570 assert resp is not None
571 assert resp.mist_id == fork_id
572 assert resp.fork_parent_id == original.mist_id
573 assert resp.owner == _OTHER
574
575 @pytest.mark.asyncio
576 async def test_fork_increments_source_fork_count(self, db_session: AsyncSession) -> None:
577 repo = await _repo(db_session)
578 original = await _mist(db_session, str(repo.repo_id))
579 fork_repo = await _repo(db_session, owner=_OTHER)
580
581 await fork_mist(
582 db_session,
583 original.mist_id,
584 new_mist_id=_mist_id(),
585 new_owner=_OTHER,
586 new_repo_id=str(fork_repo.repo_id),
587 )
588 await db_session.commit()
589
590 source = await get_mist(db_session, original.mist_id)
591 assert source is not None
592 assert source.fork_count == 1
593
594 @pytest.mark.asyncio
595 async def test_fork_inherits_content(self, db_session: AsyncSession) -> None:
596 repo = await _repo(db_session)
597 original = await _mist(
598 db_session,
599 str(repo.repo_id),
600 content="def foo(): pass",
601 symbol_anchors=["main.py::foo"],
602 tags=["ai"],
603 )
604 fork_repo = await _repo(db_session, owner=_OTHER)
605 fork_id = _mist_id()
606 await fork_mist(
607 db_session,
608 original.mist_id,
609 new_mist_id=fork_id,
610 new_owner=_OTHER,
611 new_repo_id=str(fork_repo.repo_id),
612 )
613 fork = await get_mist(db_session, fork_id)
614 assert fork is not None
615 assert fork.content == "def foo(): pass"
616 assert fork.symbol_anchors == ["main.py::foo"]
617 assert fork.tags == ["ai"]
618
619 @pytest.mark.asyncio
620 async def test_fork_increments_depth(self, db_session: AsyncSession) -> None:
621 repo = await _repo(db_session)
622 original = await _mist(db_session, str(repo.repo_id))
623 fork_repo = await _repo(db_session, owner=_OTHER)
624 fork_id = _mist_id()
625
626 resp = await fork_mist(
627 db_session,
628 original.mist_id,
629 new_mist_id=fork_id,
630 new_owner=_OTHER,
631 new_repo_id=str(fork_repo.repo_id),
632 )
633 assert resp is not None
634 fork = await get_mist(db_session, fork_id)
635 assert fork is not None
636 assert fork.fork_depth == 1
637
638 @pytest.mark.asyncio
639 async def test_fork_nonexistent_returns_none(self, db_session: AsyncSession) -> None:
640 repo = await _repo(db_session)
641 result = await fork_mist(
642 db_session,
643 "doesnotexist",
644 new_mist_id=_mist_id(),
645 new_owner=_OTHER,
646 new_repo_id=str(repo.repo_id),
647 )
648 assert result is None
649
650
651 class TestIntegrationUpdate:
652 """update_mist patches fields; owner guard blocks others."""
653
654 @pytest.mark.asyncio
655 async def test_update_title(self, db_session: AsyncSession) -> None:
656 repo = await _repo(db_session)
657 original = await _mist(db_session, str(repo.repo_id), title="old")
658
659 updated = await update_mist(
660 db_session, original.mist_id, _OWNER, title="new title"
661 )
662 assert updated is not None
663 assert updated.title == "new title"
664
665 @pytest.mark.asyncio
666 async def test_update_visibility(self, db_session: AsyncSession) -> None:
667 repo = await _repo(db_session)
668 original = await _mist(db_session, str(repo.repo_id), visibility="public")
669
670 updated = await update_mist(
671 db_session, original.mist_id, _OWNER, visibility="secret"
672 )
673 assert updated is not None
674 assert updated.visibility == "secret"
675
676 @pytest.mark.asyncio
677 async def test_update_content_increments_version(self, db_session: AsyncSession) -> None:
678 repo = await _repo(db_session)
679 original = await _mist(db_session, str(repo.repo_id), content="v1")
680
681 updated = await update_mist(
682 db_session, original.mist_id, _OWNER, content="v2"
683 )
684 assert updated is not None
685 assert updated.content == "v2"
686 assert updated.version == 2
687
688 @pytest.mark.asyncio
689 async def test_update_non_owner_returns_none(self, db_session: AsyncSession) -> None:
690 repo = await _repo(db_session)
691 original = await _mist(db_session, str(repo.repo_id))
692
693 result = await update_mist(
694 db_session, original.mist_id, _OTHER, title="hacked"
695 )
696 assert result is None
697
698 @pytest.mark.asyncio
699 async def test_update_not_found_returns_none(self, db_session: AsyncSession) -> None:
700 result = await update_mist(
701 db_session, "notexist000", _OWNER, title="x"
702 )
703 assert result is None
704
705 @pytest.mark.asyncio
706 async def test_update_filename(self, db_session: AsyncSession) -> None:
707 repo = await _repo(db_session)
708 original = await _mist(db_session, str(repo.repo_id), filename="foo.md")
709
710 updated = await update_mist(
711 db_session, original.mist_id, _OWNER, filename="object_store_details.md"
712 )
713 assert updated is not None
714 assert updated.filename == "object_store_details.md"
715
716 @pytest.mark.asyncio
717 async def test_update_filename_with_content(self, db_session: AsyncSession) -> None:
718 repo = await _repo(db_session)
719 original = await _mist(db_session, str(repo.repo_id), filename="foo.md", content="v1")
720
721 updated = await update_mist(
722 db_session, original.mist_id, _OWNER,
723 filename="renamed.md", content="v2"
724 )
725 assert updated is not None
726 assert updated.filename == "renamed.md"
727 assert updated.content == "v2"
728 assert updated.version == 2
729
730 @pytest.mark.asyncio
731 async def test_update_filename_none_leaves_unchanged(self, db_session: AsyncSession) -> None:
732 repo = await _repo(db_session)
733 original = await _mist(db_session, str(repo.repo_id), filename="original.py")
734
735 updated = await update_mist(
736 db_session, original.mist_id, _OWNER, title="new title"
737 )
738 assert updated is not None
739 assert updated.filename == "original.py"
740
741 @pytest.mark.asyncio
742 async def test_update_none_fields_unchanged(self, db_session: AsyncSession) -> None:
743 repo = await _repo(db_session)
744 original = await _mist(
745 db_session, str(repo.repo_id), title="keep", tags=["x"]
746 )
747
748 updated = await update_mist(
749 db_session, original.mist_id, _OWNER, description="new desc"
750 )
751 assert updated is not None
752 assert updated.title == "keep"
753 assert updated.tags == ["x"]
754 assert updated.description == "new desc"
755
756
757 class TestIntegrationDelete:
758 """delete_mist hard-deletes; owner guard blocks others."""
759
760 @pytest.mark.asyncio
761 async def test_delete_own_mist(self, db_session: AsyncSession) -> None:
762 repo = await _repo(db_session)
763 m = await _mist(db_session, str(repo.repo_id))
764
765 ok = await delete_mist(db_session, m.mist_id, _OWNER)
766 assert ok is True
767
768 gone = await get_mist(db_session, m.mist_id)
769 assert gone is None
770
771 @pytest.mark.asyncio
772 async def test_delete_non_owner_fails(self, db_session: AsyncSession) -> None:
773 repo = await _repo(db_session)
774 m = await _mist(db_session, str(repo.repo_id))
775
776 ok = await delete_mist(db_session, m.mist_id, _OTHER)
777 assert ok is False
778 still_there = await get_mist(db_session, m.mist_id)
779 assert still_there is not None
780
781 @pytest.mark.asyncio
782 async def test_delete_not_found_returns_false(self, db_session: AsyncSession) -> None:
783 ok = await delete_mist(db_session, "notexist000", _OWNER)
784 assert ok is False
785
786
787 class TestIntegrationCounters:
788 """Atomic view/embed counter increments."""
789
790 @pytest.mark.asyncio
791 async def test_increment_view_count(self, db_session: AsyncSession) -> None:
792 repo = await _repo(db_session)
793 m = await _mist(db_session, str(repo.repo_id))
794
795 await increment_mist_view(db_session, m.mist_id)
796 await increment_mist_view(db_session, m.mist_id)
797 await db_session.commit()
798
799 fetched = await get_mist(db_session, m.mist_id)
800 assert fetched is not None
801 assert fetched.view_count == 2
802
803 @pytest.mark.asyncio
804 async def test_increment_embed_count(self, db_session: AsyncSession) -> None:
805 repo = await _repo(db_session)
806 m = await _mist(db_session, str(repo.repo_id))
807
808 await increment_mist_embed(db_session, m.mist_id)
809 await db_session.commit()
810
811 fetched = await get_mist(db_session, m.mist_id)
812 assert fetched is not None
813 assert fetched.embed_count == 1
814
815 @pytest.mark.asyncio
816 async def test_increment_nonexistent_noop(self, db_session: AsyncSession) -> None:
817 """Incrementing a missing mist_id is a silent no-op."""
818 await increment_mist_view(db_session, "notexist000")
819 await increment_mist_embed(db_session, "notexist000")
820
821
822 class TestIntegrationForkList:
823 """get_mist_forks returns direct children only."""
824
825 @pytest.mark.asyncio
826 async def test_forks_of_original(self, db_session: AsyncSession) -> None:
827 repo = await _repo(db_session)
828 original = await _mist(db_session, str(repo.repo_id))
829
830 for _ in range(3):
831 fork_repo = await _repo(db_session, owner=_OTHER)
832 await fork_mist(
833 db_session,
834 original.mist_id,
835 new_mist_id=_mist_id(),
836 new_owner=_OTHER,
837 new_repo_id=str(fork_repo.repo_id),
838 )
839
840 forks = await get_mist_forks(db_session, original.mist_id)
841 assert len(forks) == 3
842 assert all(isinstance(f, MistListEntry) for f in forks)
843 assert all(f.fork_parent_id == original.mist_id for f in forks)
844
845 @pytest.mark.asyncio
846 async def test_no_forks_returns_empty(self, db_session: AsyncSession) -> None:
847 repo = await _repo(db_session)
848 m = await _mist(db_session, str(repo.repo_id))
849 forks = await get_mist_forks(db_session, m.mist_id)
850 assert forks == []
851
852
853 # ===========================================================================
854 # Layer 3 — Edge Cases
855 # ===========================================================================
856
857
858 class TestEdgeCases:
859 """Boundary and error conditions."""
860
861 @pytest.mark.asyncio
862 async def test_duplicate_mist_id_raises_integrity_error(
863 self, db_session: AsyncSession
864 ) -> None:
865 repo = await _repo(db_session)
866 mid = _mist_id()
867 await create_mist(
868 db_session,
869 mist_id=mid,
870 filename="a.py",
871 content="x",
872 owner=_OWNER,
873 repo_id=str(repo.repo_id),
874 )
875 with pytest.raises(IntegrityError):
876 await create_mist(
877 db_session,
878 mist_id=mid,
879 filename="b.py",
880 content="y",
881 owner=_OWNER,
882 repo_id=str(repo.repo_id),
883 )
884
885 @pytest.mark.asyncio
886 async def test_fork_depth_limit_enforced(self, db_session: AsyncSession) -> None:
887 """Fork chain at depth 5 cannot be forked further."""
888 from musehub.services.musehub_mists import _FORK_DEPTH_LIMIT
889
890 repo = await _repo(db_session)
891 current_id = (await _mist(db_session, str(repo.repo_id))).mist_id
892
893 for depth in range(_FORK_DEPTH_LIMIT):
894 fork_repo = await _repo(db_session, owner=_OTHER)
895 fork_id = _mist_id()
896 resp = await fork_mist(
897 db_session,
898 current_id,
899 new_mist_id=fork_id,
900 new_owner=_OTHER,
901 new_repo_id=str(fork_repo.repo_id),
902 )
903 assert resp is not None, f"Expected fork at depth {depth + 1} to succeed"
904 current_id = fork_id
905
906 # Now at depth 5 — next fork must be rejected
907 final_repo = await _repo(db_session, owner=_OTHER)
908 result = await fork_mist(
909 db_session,
910 current_id,
911 new_mist_id=_mist_id(),
912 new_owner=_OTHER,
913 new_repo_id=str(final_repo.repo_id),
914 )
915 assert result is None
916
917 @pytest.mark.asyncio
918 async def test_update_content_twice_increments_version_twice(
919 self, db_session: AsyncSession
920 ) -> None:
921 repo = await _repo(db_session)
922 m = await _mist(db_session, str(repo.repo_id))
923
924 await update_mist(db_session, m.mist_id, _OWNER, content="v2")
925 await update_mist(db_session, m.mist_id, _OWNER, content="v3")
926
927 final = await get_mist(db_session, m.mist_id)
928 assert final is not None
929 assert final.version == 3
930 assert final.content == "v3"
931
932 @pytest.mark.asyncio
933 async def test_list_bad_cursor_ignored(self, db_session: AsyncSession) -> None:
934 """A non-ISO-8601 cursor string is silently ignored (returns full list)."""
935 repo = await _repo(db_session)
936 await _mist(db_session, str(repo.repo_id))
937 await _mist(db_session, str(repo.repo_id))
938
939 result = await list_mists(db_session, _OWNER, cursor="not-a-date")
940 assert result.total == 2
941
942 @pytest.mark.asyncio
943 async def test_create_empty_tags_stored_as_list(self, db_session: AsyncSession) -> None:
944 repo = await _repo(db_session)
945 m = await _mist(db_session, str(repo.repo_id))
946 assert m.tags == []
947
948 @pytest.mark.asyncio
949 async def test_create_with_secret_visibility(self, db_session: AsyncSession) -> None:
950 repo = await _repo(db_session)
951 m = await _mist(db_session, str(repo.repo_id), visibility="secret")
952 assert m.visibility == "secret"
953 fetched = await get_mist(db_session, m.mist_id)
954 assert fetched is not None
955 assert fetched.visibility == "secret"
956
957 @pytest.mark.asyncio
958 async def test_mist_list_entry_primary_symbol_from_anchors(
959 self, db_session: AsyncSession
960 ) -> None:
961 repo = await _repo(db_session)
962 anchors = ["utils.py::compute", "utils.py::clean"]
963 await _mist(db_session, str(repo.repo_id), symbol_anchors=anchors)
964
965 result = await list_mists(db_session, _OWNER)
966 assert result.total == 1
967 assert result.mists[0].primary_symbol == "utils.py::compute"
968
969
970 # ===========================================================================
971 # Layer 4 — Stress
972 # ===========================================================================
973
974
975 class TestStress:
976 """Bulk create and list under load."""
977
978 @pytest.mark.asyncio
979 async def test_create_50_mists(self, db_session: AsyncSession) -> None:
980 repo = await _repo(db_session)
981 for i in range(50):
982 await _mist(db_session, str(repo.repo_id), content=f"print({i})")
983
984 result = await list_mists(db_session, _OWNER, limit=50)
985 assert result.total == 50
986 assert len(result.mists) == 50
987
988 @pytest.mark.asyncio
989 async def test_five_level_fork_chain(self, db_session: AsyncSession) -> None:
990 """Create a 5-level deep fork chain and verify depths."""
991 repo = await _repo(db_session)
992 root_id = (await _mist(db_session, str(repo.repo_id))).mist_id
993 current_id = root_id
994
995 for expected_depth in range(1, 6):
996 fork_repo = await _repo(db_session, owner=_OTHER)
997 fork_id = _mist_id()
998 resp = await fork_mist(
999 db_session,
1000 current_id,
1001 new_mist_id=fork_id,
1002 new_owner=_OTHER,
1003 new_repo_id=str(fork_repo.repo_id),
1004 )
1005 assert resp is not None
1006 fork_row = await get_mist(db_session, fork_id)
1007 assert fork_row is not None
1008 assert fork_row.fork_depth == expected_depth
1009 current_id = fork_id
1010
1011 @pytest.mark.asyncio
1012 async def test_list_pagination_covers_all_pages(self, db_session: AsyncSession) -> None:
1013 """Paginate through 25 mists with page size 10."""
1014 repo = await _repo(db_session)
1015 for i in range(25):
1016 await _mist(db_session, str(repo.repo_id), content=f"item {i}")
1017
1018 collected: list[MistListEntry] = []
1019 cursor = None
1020 while True:
1021 page = await list_mists(db_session, _OWNER, limit=10, cursor=cursor)
1022 collected.extend(page.mists)
1023 if page.next_cursor is None:
1024 break
1025 cursor = page.next_cursor
1026
1027 assert len(collected) == 25
1028
1029
1030 # ===========================================================================
1031 # Layer 5 — Data Integrity
1032 # ===========================================================================
1033
1034
1035 class TestDataIntegrity:
1036 """Counters are independent; list total is consistent after delete."""
1037
1038 @pytest.mark.asyncio
1039 async def test_total_decrements_after_delete(self, db_session: AsyncSession) -> None:
1040 repo = await _repo(db_session)
1041 m1 = await _mist(db_session, str(repo.repo_id))
1042 await _mist(db_session, str(repo.repo_id))
1043
1044 await delete_mist(db_session, m1.mist_id, _OWNER)
1045
1046 result = await list_mists(db_session, _OWNER)
1047 assert result.total == 1
1048
1049 @pytest.mark.asyncio
1050 async def test_counters_independent_per_mist(self, db_session: AsyncSession) -> None:
1051 repo = await _repo(db_session)
1052 m1 = await _mist(db_session, str(repo.repo_id))
1053 m2 = await _mist(db_session, str(repo.repo_id))
1054
1055 await increment_mist_view(db_session, m1.mist_id)
1056 await increment_mist_embed(db_session, m2.mist_id)
1057 await db_session.commit()
1058
1059 r1 = await get_mist(db_session, m1.mist_id)
1060 r2 = await get_mist(db_session, m2.mist_id)
1061 assert r1 is not None and r2 is not None
1062 assert r1.view_count == 1 and r1.embed_count == 0
1063 assert r2.view_count == 0 and r2.embed_count == 1
1064
1065 @pytest.mark.asyncio
1066 async def test_fork_inherits_parent_visibility(self, db_session: AsyncSession) -> None:
1067 repo = await _repo(db_session)
1068 original = await _mist(db_session, str(repo.repo_id), visibility="secret")
1069 fork_repo = await _repo(db_session, owner=_OTHER)
1070 fork_id = _mist_id()
1071
1072 await fork_mist(
1073 db_session,
1074 original.mist_id,
1075 new_mist_id=fork_id,
1076 new_owner=_OTHER,
1077 new_repo_id=str(fork_repo.repo_id),
1078 )
1079
1080 fork = await get_mist(db_session, fork_id)
1081 assert fork is not None
1082 assert fork.visibility == "secret"
1083
1084 @pytest.mark.asyncio
1085 async def test_fork_count_multi(self, db_session: AsyncSession) -> None:
1086 """fork_count on original equals number of forks created."""
1087 repo = await _repo(db_session)
1088 original = await _mist(db_session, str(repo.repo_id))
1089
1090 for _ in range(4):
1091 fr = await _repo(db_session, owner=_OTHER)
1092 await fork_mist(
1093 db_session,
1094 original.mist_id,
1095 new_mist_id=_mist_id(),
1096 new_owner=_OTHER,
1097 new_repo_id=str(fr.repo_id),
1098 )
1099 await db_session.commit()
1100
1101 source = await get_mist(db_session, original.mist_id)
1102 assert source is not None
1103 assert source.fork_count == 4
1104
1105 @pytest.mark.asyncio
1106 async def test_update_tags_replaces_list(self, db_session: AsyncSession) -> None:
1107 repo = await _repo(db_session)
1108 m = await _mist(db_session, str(repo.repo_id), tags=["a", "b"])
1109
1110 updated = await update_mist(db_session, m.mist_id, _OWNER, tags=["c"])
1111 assert updated is not None
1112 assert updated.tags == ["c"]
1113
1114
1115 # ===========================================================================
1116 # Layer 6 — Performance
1117 # ===========================================================================
1118
1119
1120 class TestPerformance:
1121 """Bulk operations must complete within generous thresholds."""
1122
1123 @pytest.mark.asyncio
1124 async def test_list_50_mists_under_500ms(self, db_session: AsyncSession) -> None:
1125 repo = await _repo(db_session)
1126 for i in range(50):
1127 await _mist(db_session, str(repo.repo_id), content=f"c{i}")
1128
1129 t0 = time.perf_counter()
1130 result = await list_mists(db_session, _OWNER, limit=50)
1131 elapsed = time.perf_counter() - t0
1132
1133 assert result.total == 50
1134 assert elapsed < 0.5, f"list_mists took {elapsed:.3f}s — too slow"
1135
1136 @pytest.mark.asyncio
1137 async def test_get_100_times_under_500ms(self, db_session: AsyncSession) -> None:
1138 repo = await _repo(db_session)
1139 m = await _mist(db_session, str(repo.repo_id))
1140
1141 t0 = time.perf_counter()
1142 for _ in range(100):
1143 await get_mist(db_session, m.mist_id)
1144 elapsed = time.perf_counter() - t0
1145
1146 assert elapsed < 0.5, f"100x get_mist took {elapsed:.3f}s"
1147
1148
1149 # ===========================================================================
1150 # Layer 7 — Security
1151 # ===========================================================================
1152
1153
1154 class TestSecurity:
1155 """Owner guard and visibility rules are enforced."""
1156
1157 @pytest.mark.asyncio
1158 async def test_update_by_non_owner_rejected(self, db_session: AsyncSession) -> None:
1159 repo = await _repo(db_session)
1160 m = await _mist(db_session, str(repo.repo_id))
1161
1162 result = await update_mist(db_session, m.mist_id, "attacker", content="evil")
1163 assert result is None
1164
1165 # Content must be unchanged
1166 original = await get_mist(db_session, m.mist_id)
1167 assert original is not None
1168 assert original.content == m.content
1169
1170 @pytest.mark.asyncio
1171 async def test_delete_by_non_owner_rejected(self, db_session: AsyncSession) -> None:
1172 repo = await _repo(db_session)
1173 m = await _mist(db_session, str(repo.repo_id))
1174
1175 ok = await delete_mist(db_session, m.mist_id, "attacker")
1176 assert ok is False
1177 assert await get_mist(db_session, m.mist_id) is not None
1178
1179 @pytest.mark.asyncio
1180 async def test_secret_mist_hidden_from_explore(self, db_session: AsyncSession) -> None:
1181 repo = await _repo(db_session)
1182 await _mist(db_session, str(repo.repo_id), visibility="secret")
1183
1184 result = await list_mists(db_session, owner=None, include_secret=False)
1185 assert result.total == 0
1186
1187 @pytest.mark.asyncio
1188 async def test_fork_depth_limit_prevents_deep_chain(
1189 self, db_session: AsyncSession
1190 ) -> None:
1191 from musehub.services.musehub_mists import _FORK_DEPTH_LIMIT
1192
1193 repo = await _repo(db_session)
1194 current_id = (await _mist(db_session, str(repo.repo_id))).mist_id
1195
1196 for _ in range(_FORK_DEPTH_LIMIT):
1197 fr = await _repo(db_session, owner=_OTHER)
1198 resp = await fork_mist(
1199 db_session,
1200 current_id,
1201 new_mist_id=_mist_id(),
1202 new_owner=_OTHER,
1203 new_repo_id=str(fr.repo_id),
1204 )
1205 assert resp is not None
1206 current_id = resp.mist_id
1207
1208 # Attempt to exceed the limit
1209 final_repo = await _repo(db_session, owner=_OTHER)
1210 over_limit = await fork_mist(
1211 db_session,
1212 current_id,
1213 new_mist_id=_mist_id(),
1214 new_owner=_OTHER,
1215 new_repo_id=str(final_repo.repo_id),
1216 )
1217 assert over_limit is None
1218
1219 @pytest.mark.asyncio
1220 async def test_secret_visible_only_with_include_secret(
1221 self, db_session: AsyncSession
1222 ) -> None:
1223 repo = await _repo(db_session)
1224 await _mist(db_session, str(repo.repo_id), visibility="secret")
1225
1226 hidden = await list_mists(db_session, _OWNER, include_secret=False)
1227 visible = await list_mists(db_session, _OWNER, include_secret=True)
1228
1229 assert hidden.total == 0
1230 assert visible.total == 1
1231
1232
1233 # ===========================================================================
1234 # Layer 8 — Docstrings / API surface
1235 # ===========================================================================
1236
1237
1238 class TestDocstrings:
1239 """Public API surface has docstrings."""
1240
1241 def test_service_functions_have_docstrings(self) -> None:
1242 import musehub.services.musehub_mists as svc
1243
1244 fns = [
1245 svc.create_mist,
1246 svc.get_mist,
1247 svc.list_mists,
1248 svc.fork_mist,
1249 svc.update_mist,
1250 svc.delete_mist,
1251 svc.increment_mist_view,
1252 svc.increment_mist_embed,
1253 svc.get_mist_forks,
1254 ]
1255 missing = [f.__name__ for f in fns if not (f.__doc__ or "").strip()]
1256 assert missing == [], f"Service functions missing docstrings: {missing}"
1257
1258 def test_pydantic_models_have_docstrings(self) -> None:
1259 import musehub.models.mists as m
1260
1261 models = [
1262 m.MistResponse,
1263 m.MistListEntry,
1264 m.MistListResponse,
1265 m.MistCreateRequest,
1266 m.MistUpdateRequest,
1267 m.MistForkResponse,
1268 m.MistEmbedResponse,
1269 ]
1270 missing = [cls.__name__ for cls in models if not (cls.__doc__ or "").strip()]
1271 assert missing == [], f"Pydantic models missing docstrings: {missing}"
1272
1273 def test_orm_model_has_tablename(self) -> None:
1274 assert MusehubMist.__tablename__ == "musehub_mists"
1275
1276 def test_orm_model_has_expected_columns(self) -> None:
1277 expected = {
1278 "mist_id", "repo_id", "owner", "artifact_type", "language",
1279 "filename", "title", "description", "content", "size_bytes",
1280 "commit_id", "snapshot_id", "version", "agent_id", "model_id",
1281 "gpg_signature", "fork_parent_id", "fork_depth", "fork_count",
1282 "view_count", "embed_count", "visibility", "tags",
1283 "symbol_anchors", "created_at", "updated_at",
1284 }
1285 actual = {c.key for c in MusehubMist.__table__.columns}
1286 assert expected == actual
1287
1288 def test_fork_depth_limit_constant(self) -> None:
1289 from musehub.services.musehub_mists import _FORK_DEPTH_LIMIT
1290 assert _FORK_DEPTH_LIMIT == 5
File History 3 commits
sha256:94ef169c149a452bff7c604ded8b280b19bd477c2dabcb56972780b0b784c7aa Merge 'fix/assignee-sigil-inline' into 'dev' — proposal: As… Human 1 day ago
sha256:6b1949fc2797ca4c1936a637a4cbfec828ef56cf52398a2e74ca3c4f494e728f fix: use wire_bytes not mpack_bytes_raw in compute_object_b… Sonnet 4.6 patch 10 days ago
sha256:4aed3d8601c8dd3ed37074de35f11f4a9699a0a4b99d43727048fd3f8e6fd13d chore: doc sweep, ignore wrangler build state, misc fixes Sonnet 4.6 minor 12 days ago