"""Section 19 — Mist CLI layer tests (all eight tiers).

Covers the three new CLI subcommands added by issue #10:

  update   PATCH /api/mists/{id}         — partial update of metadata / content
  forks    GET   /api/mists/{id}/forks   — list direct forks
  raw      GET   /api/mists/{id}/raw     — raw artifact bytes download

Each test class is labelled with its tier so the suite mirrors the project's
standard eight-tier structure:

  Tier 1  Unit         — pure-function / argparse logic, no I/O
  Tier 2  Schema       — HTTP request/response body shape assertions
  Tier 3  DB state     — database contents after CLI-driven mutation
  Tier 4  Stress       — concurrent or volume requests
  Tier 5  Integration  — full HTTP round-trip via AsyncClient
  Tier 6  Performance  — latency assertions
  Tier 7  Security     — auth enforcement, access-control boundaries
  Tier 8  Docstrings   — every new public symbol has a docstring
"""
from __future__ import annotations

import asyncio
import inspect
import pathlib
import secrets
import time

import pytest
import pytest_asyncio
from httpx import AsyncClient
from sqlalchemy.ext.asyncio import AsyncSession

from musehub.mcp.write_tools.mists import execute_create_mist
from musehub.services.musehub_mcp_executor import (
    execute_list_mist_forks,
    execute_read_mist_raw,
)
from collections.abc import Callable
from musehub.types.json_types import JSONObject, JSONValue, StrDict

_OWNER = "testuser"   # matches conftest._TEST_HANDLE
_OTHER = "otheruser"


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------


def _unique_content() -> str:
    """Build a tiny Python snippet that differs on every call.

    A fresh 16-byte random hex token is embedded in the snippet so that two
    calls never produce the same text.
    """
    token = secrets.token_hex(16)
    return f"# cli test\nvalue = {token!r}\n"


async def _create_mist(
    owner: str = _OWNER,
    visibility: str = "public",
    content: str | None = None,
    filename: str | None = None,
) -> str:
    """Create a mist through ``execute_create_mist`` and return its ID.

    Args:
        owner: Handle passed to the executor as ``actor``.
        visibility: Visibility passed straight through to the executor.
        content: Mist body; a falsy value is replaced by ``_unique_content()``.
        filename: Mist filename; a falsy value is replaced by a random
            ``cli_<hex>.py`` name.

    Returns:
        The ``mist_id`` reported by the executor, coerced to ``str``.

    Raises:
        AssertionError: If the executor result is not ``ok``.
    """
    # Resolve the filename first, then the content — same order as the
    # keyword arguments are handed to the executor.
    resolved_filename = filename if filename else f"cli_{secrets.token_hex(4)}.py"
    resolved_content = content if content else _unique_content()

    result = await execute_create_mist(
        filename=resolved_filename,
        content=resolved_content,
        actor=owner,
        visibility=visibility,
    )
    assert result.ok, f"create_mist failed: {result.error_message}"
    created_id = result.data["mist_id"]
    return str(created_id)


async def _post_fork(client: AsyncClient, auth_headers: StrDict, mist_id: str) -> str:
    """POST to the fork endpoint for *mist_id* and return the fork's ID.

    Raises:
        AssertionError: If the endpoint does not answer 201; the response
            text is used as the failure message.
    """
    response = await client.post(f"/api/mists/{mist_id}/fork", headers=auth_headers)
    assert response.status_code == 201, response.text
    fork_payload = response.json()
    return str(fork_payload["mistId"])


# ═══════════════════════════════════════════════════════════════════════════════
# Tier 1 — Unit (pure logic, no I/O)
# ═══════════════════════════════════════════════════════════════════════════════


class TestUnitUpdate:
    """Tier 1: argument-level logic for the update subcommand."""

    def test_update_tags_splits_on_comma(self) -> None:
        """A comma-separated string becomes a list of whitespace-trimmed tags."""
        raw = "security, auth, v2"
        trimmed = (piece.strip() for piece in raw.split(","))
        tags = [piece for piece in trimmed if piece]
        assert tags == ["security", "auth", "v2"]

    def test_update_empty_tags_string_yields_empty_list(self) -> None:
        """A tags string holding only whitespace yields no tags at all."""
        raw = " "
        trimmed = (piece.strip() for piece in raw.split(","))
        tags = [piece for piece in trimmed if piece]
        assert tags == []

    def test_update_valid_visibilities(self) -> None:
        """'public' and 'secret' are accepted visibilities; 'private' is not."""
        from muse.plugins.mist.plugin import MIST_VISIBILITIES

        for accepted in ("public", "secret"):
            assert accepted in MIST_VISIBILITIES
        assert "private" not in MIST_VISIBILITIES

    def test_update_payload_excludes_none_fields(self) -> None:
        """Only fields holding a non-None value end up in the PATCH payload."""
        candidates = {
            "title": "hello",
            "description": None,
            "visibility": None,
        }
        payload: JSONObject = {
            field: value for field, value in candidates.items() if value is not None
        }

        assert "title" in payload
        for omitted in ("description", "visibility"):
            assert omitted not in payload
class TestUnitForks:
    """Tier 1: argument-level logic for the forks subcommand."""

    @staticmethod
    def _clamp_limit(user_limit: int) -> int:
        """Clamp *user_limit* into the inclusive range 1..100."""
        return max(1, min(user_limit, 100))

    def test_forks_limit_clamped_to_max_100(self) -> None:
        """Limits above 100 are clamped server-side; client clamps locally."""
        assert self._clamp_limit(999) == 100

    def test_forks_limit_clamped_to_min_1(self) -> None:
        """Limits below 1 are raised to 1."""
        assert self._clamp_limit(0) == 1


class TestUnitRaw:
    """Tier 1: argument-level logic for the raw subcommand."""

    @staticmethod
    def _id_part(mist_id: str) -> str:
        """Return the ID portion of either an ``owner/id`` or a bare ``id`` string."""
        if "/" in mist_id:
            # Split once only, so everything after the first slash is the ID.
            return mist_id.split("/", 1)[1].strip()
        return mist_id

    def test_raw_accepts_owner_slash_id_format(self) -> None:
        """'owner/id' format is split correctly."""
        assert self._id_part("gabriel/aB3xKq9dPwNm") == "aB3xKq9dPwNm"

    def test_raw_plain_id_format_unchanged(self) -> None:
        """A bare 12-char ID is passed through as-is."""
        assert self._id_part("aB3xKq9dPwNm") == "aB3xKq9dPwNm"


# ═══════════════════════════════════════════════════════════════════════════════
# Tier 2 — Schema (HTTP request/response shape)
# ═══════════════════════════════════════════════════════════════════════════════


class TestSchemaUpdate:
    """Tier 2: PATCH /api/mists/{id} request and response body shape."""

    @pytest.mark.anyio
    async def test_update_response_contains_mist_id(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        """PATCH response always carries mistId."""
        mid = await _create_mist()
        r = await client.patch(
            f"/api/mists/{mid}",
            json={"title": "Schema check"},
            headers=auth_headers,
        )
        assert r.status_code == 200, r.text
        body = r.json()
        assert "mistId" in body

    @pytest.mark.anyio
    async def test_update_partial_body_only_changes_named_fields(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        """Omitted fields are NOT reset to null or empty."""
        mid = await _create_mist()
        # Set initial title and tags.  The seed request must succeed, otherwise
        # the final tags assertion would fail for the wrong reason.
        seed = await client.patch(
            f"/api/mists/{mid}",
            json={"title": "Original", "tags": ["a", "b"]},
            headers=auth_headers,
        )
        assert seed.status_code == 200, seed.text
        # Update only title — tags must survive.
        r = await client.patch(
            f"/api/mists/{mid}",
            json={"title": "Revised"},
            headers=auth_headers,
        )
        assert r.status_code == 200, r.text
        r2 = await client.get(f"/api/mists/{mid}")
        assert r2.status_code == 200, r2.text
        assert r2.json()["tags"] == ["a", "b"]

    @pytest.mark.anyio
    async def test_update_content_increments_version(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        """Updating content bumps the version counter by 1."""
        mid = await _create_mist()
        r0 = await client.get(f"/api/mists/{mid}")
        # Check the baseline read before indexing into its body.
        assert r0.status_code == 200, r0.text
        initial_version = r0.json()["version"]
        r = await client.patch(
            f"/api/mists/{mid}",
            json={"content": f"# new version\nvalue = {secrets.token_hex(16)!r}\n"},
            headers=auth_headers,
        )
        assert r.status_code == 200, r.text
        assert r.json()["version"] == initial_version + 1


class TestSchemaForks:
    """Tier 2: GET /api/mists/{id}/forks response shape."""

    @pytest.mark.anyio
    async def test_forks_response_is_list(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """An unforked mist returns a list, not null."""
        mid = await _create_mist()
        r = await client.get(f"/api/mists/{mid}/forks")
        assert r.status_code == 200, r.text
        assert isinstance(r.json(), list)

    @pytest.mark.anyio
    async def test_forks_entry_shape(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        """Each fork entry carries the expected keys."""
        mid = await _create_mist()
        await _post_fork(client, auth_headers, mid)
        r = await client.get(f"/api/mists/{mid}/forks")
        assert r.status_code == 200, r.text
        forks = r.json()
        # Fail with a readable message rather than an IndexError when the
        # fork that was just created is missing from the listing.
        assert forks, "fork list is empty after creating a fork"
        fork = forks[0]
        for key in ("mistId", "owner", "filename", "forkDepth", "createdAt"):
            assert key in fork, f"Missing key: {key}"


class TestSchemaRaw:
    """Tier 2: GET /api/mists/{id}/raw response headers and body."""

    @pytest.mark.anyio
    async def test_raw_content_disposition_contains_filename(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """Content-Disposition header includes the original filename."""
        mid = await _create_mist(filename="mymodule.py")
        r = await client.get(f"/api/mists/{mid}/raw")
        assert r.status_code == 200, r.text
        cd = r.headers.get("content-disposition", "")
        assert "mymodule.py" in cd

    @pytest.mark.anyio
    async def test_raw_content_type_code_is_text_plain(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """Python code artifacts are served as text/plain."""
        mid = await _create_mist(filename="validate.py")
        r = await client.get(f"/api/mists/{mid}/raw")
        assert r.status_code == 200, r.text
        assert "text/plain" in r.headers.get("content-type", "")

    @pytest.mark.anyio
    async def test_raw_body_matches_stored_content(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """Raw body bytes equal the UTF-8 encoding of the stored content."""
        content = f"def hello(): return {secrets.token_hex(16)!r}\n"
        mid = await _create_mist(content=content, filename="hello.py")
        r = await client.get(f"/api/mists/{mid}/raw")
        assert r.status_code == 200, r.text
        assert r.content == content.encode("utf-8")


# ═══════════════════════════════════════════════════════════════════════════════
# Tier 3 — DB state (database contents after mutation)
# ═══════════════════════════════════════════════════════════════════════════════


class TestDbStateUpdate:
    """Tier 3: verify DB state after CLI-driven update operations."""

    @pytest.mark.anyio
    async def test_update_title_persisted(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        """Updated title is readable back via GET."""
        mid = await _create_mist()
        patched = await client.patch(
            f"/api/mists/{mid}", json={"title": "DB title check"}, headers=auth_headers
        )
        assert patched.status_code == 200, patched.text
        r = await client.get(f"/api/mists/{mid}")
        assert r.status_code == 200, r.text
        assert r.json()["title"] == "DB title check"

    @pytest.mark.anyio
    async def test_update_visibility_persisted(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        """Updated visibility is readable back via GET (owner only for secret)."""
        mid = await _create_mist()
        patched = await client.patch(
            f"/api/mists/{mid}", json={"visibility": "secret"}, headers=auth_headers
        )
        assert patched.status_code == 200, patched.text
        # The read-back carries auth headers because the mist is now secret.
        r = await client.get(f"/api/mists/{mid}", headers=auth_headers)
        assert r.status_code == 200, r.text
        assert r.json()["visibility"] == "secret"

    @pytest.mark.anyio
    async def test_update_tags_replaced_atomically(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        """Tags update replaces the full list, not appends."""
        mid = await _create_mist()
        first = await client.patch(
            f"/api/mists/{mid}", json={"tags": ["old"]}, headers=auth_headers
        )
        assert first.status_code == 200, first.text
        second = await client.patch(
            f"/api/mists/{mid}", json={"tags": ["new1", "new2"]}, headers=auth_headers
        )
        assert second.status_code == 200, second.text
        r = await client.get(f"/api/mists/{mid}")
        assert r.status_code == 200, r.text
        # Sorted so the check does not depend on the order tags come back in.
        assert sorted(r.json()["tags"]) == ["new1", "new2"]

    @pytest.mark.anyio
    async def test_update_content_updates_size_bytes(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        """size_bytes is recalculated after content update."""
        short_content = "x = 1\n"
        mid = await _create_mist(content=short_content)
        long_content = "x = 1\n" + "# " + "a" * 500 + "\n"
        patched = await client.patch(
            f"/api/mists/{mid}", json={"content": long_content}, headers=auth_headers
        )
        assert patched.status_code == 200, patched.text
        r = await client.get(f"/api/mists/{mid}")
        assert r.status_code == 200, r.text
        assert r.json()["sizeBytes"] == len(long_content.encode("utf-8"))


# ═══════════════════════════════════════════════════════════════════════════════
# Tier 4 — Stress
# ═══════════════════════════════════════════════════════════════════════════════


class TestStressUpdate:
    """Tier 4: concurrent updates on distinct mists."""

    @pytest.mark.anyio
    async def test_10_concurrent_updates_all_succeed(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        """10 concurrent title updates on different mists all return 200."""
        # Mists are created one after another; only the PATCH calls overlap.
        mids = [await _create_mist() for _ in range(10)]

        async def _update(mid: str) -> int:
            r = await client.patch(
                f"/api/mists/{mid}",
                json={"title": f"concurrent-{mid}"},
                headers=auth_headers,
            )
            return r.status_code

        statuses = await asyncio.gather(*[_update(m) for m in mids])
        assert all(s == 200 for s in statuses), statuses


class TestStressForks:
    """Tier 4: fork list on a parent with many children."""

    @pytest.mark.anyio
    async def test_list_forks_15_children(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        """A parent with 15 forks returns all 15 in a single page."""
        mid = await _create_mist()
        for _ in range(15):
            await _post_fork(client, auth_headers, mid)
        r = await client.get(f"/api/mists/{mid}/forks?limit=100")
        assert r.status_code == 200, r.text
        assert len(r.json()) == 15


class TestStressRaw:
    """Tier 4: concurrent raw downloads."""

    @pytest.mark.anyio
    async def test_10_concurrent_raw_downloads(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """10 concurrent raw downloads of the same mist all return 200."""
        mid = await _create_mist()

        async def _get() -> int:
            r = await client.get(f"/api/mists/{mid}/raw")
            return r.status_code

        statuses = await asyncio.gather(*[_get() for _ in range(10)])
        assert all(s == 200 for s in statuses), statuses


# ═══════════════════════════════════════════════════════════════════════════════
# Tier 5 — Integration (full HTTP round-trips)
# ═══════════════════════════════════════════════════════════════════════════════


class TestIntegrationUpdate:
    """Tier 5: full PATCH round-trips."""

    @pytest.mark.anyio
    async def test_update_title_roundtrip(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        """The PATCH response body echoes the new title."""
        mid = await _create_mist()
        r = await client.patch(
            f"/api/mists/{mid}", json={"title": "Round-trip title"}, headers=auth_headers
        )
        assert r.status_code == 200, r.text
        assert r.json()["title"] == "Round-trip title"

    @pytest.mark.anyio
    async def test_update_unknown_mist_returns_404(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        """PATCH on an ID that was never created returns 404."""
        r = await client.patch(
            "/api/mists/doesNotExist1", json={"title": "x"}, headers=auth_headers
        )
        assert r.status_code == 404

    @pytest.mark.anyio
    async def test_update_non_owner_returns_404(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        """Non-owner update returns 404 (not 403, to avoid leaking existence)."""
        mid = await _create_mist(owner=_OTHER)
        r = await client.patch(
            f"/api/mists/{mid}", json={"title": "stolen"}, headers=auth_headers
        )
        assert r.status_code == 404


class TestIntegrationForks:
    """Tier 5: full GET /api/mists/{id}/forks round-trips."""

    @pytest.mark.anyio
    async def test_forks_empty_on_root(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """A freshly created mist with no forks lists as an empty array."""
        mid = await _create_mist()
        r = await client.get(f"/api/mists/{mid}/forks")
        assert r.status_code == 200, r.text
        assert r.json() == []

    @pytest.mark.anyio
    async def test_forks_after_one_fork(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        """A fork created via POST appears in the parent's fork listing."""
        mid = await _create_mist()
        fork_id = await _post_fork(client, auth_headers, mid)
        r = await client.get(f"/api/mists/{mid}/forks")
        assert r.status_code == 200, r.text
        ids = [f["mistId"] for f in r.json()]
        assert fork_id in ids

    @pytest.mark.anyio
    async def test_forks_unknown_parent_returns_404(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """Listing forks of an ID that was never created returns 404."""
        r = await client.get("/api/mists/doesNotExist1/forks")
        assert r.status_code == 404


class TestIntegrationRaw:
    """Tier 5: full GET /api/mists/{id}/raw round-trips."""

    @pytest.mark.anyio
    async def test_raw_public_mist_returns_content(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """The raw endpoint returns the stored content as UTF-8 bytes."""
        content = f"def hello(): return {secrets.token_hex(16)!r}\n"
        mid = await _create_mist(content=content)
        r = await client.get(f"/api/mists/{mid}/raw")
        assert r.status_code == 200, r.text
        assert r.content == content.encode("utf-8")

    @pytest.mark.anyio
    async def test_raw_unknown_mist_returns_404(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """Raw download of an ID that was never created returns 404."""
        r = await client.get("/api/mists/doesNotExist1/raw")
        assert r.status_code == 404
# ═══════════════════════════════════════════════════════════════════════════════
# Tier 6 — Performance
# ═══════════════════════════════════════════════════════════════════════════════


class TestPerformanceRaw:
    """Tier 6: raw download latency for a small artifact."""

    @pytest.mark.anyio
    async def test_raw_1kb_under_200ms(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """A 1 KiB artifact should respond in under 200 ms."""
        content = "x = 1\n" * 170  # ~1 KiB
        mid = await _create_mist(content=content)
        # perf_counter is the highest-resolution clock for short intervals;
        # only the GET itself is timed, not the mist creation above.
        start = time.perf_counter()
        r = await client.get(f"/api/mists/{mid}/raw")
        elapsed = time.perf_counter() - start
        assert r.status_code == 200
        assert elapsed < 0.2, f"Raw took {elapsed:.3f}s — expected < 200ms"


class TestPerformanceForkList:
    """Tier 6: fork list latency."""

    @pytest.mark.anyio
    async def test_forks_10_under_500ms(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        """Listing 10 forks should respond in under 500 ms."""
        mid = await _create_mist()
        for _ in range(10):
            await _post_fork(client, auth_headers, mid)
        # Fork creation is setup; the timer covers the listing request only.
        start = time.perf_counter()
        r = await client.get(f"/api/mists/{mid}/forks?limit=100")
        elapsed = time.perf_counter() - start
        assert r.status_code == 200
        assert elapsed < 0.5, f"Fork list took {elapsed:.3f}s — expected < 500ms"


# ═══════════════════════════════════════════════════════════════════════════════
# Tier 7 — Security
# ═══════════════════════════════════════════════════════════════════════════════


class TestSecurityUpdate:
    """Tier 7: auth enforcement on PATCH /api/mists/{id}."""

    @pytest.mark.anyio
    async def test_update_without_auth_returns_401(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """PATCH without Authorization header returns 401."""
        mid = await _create_mist()
        r = await client.patch(f"/api/mists/{mid}", json={"title": "x"})
        assert r.status_code == 401

    @pytest.mark.anyio
    async def test_update_non_owner_returns_404(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        """testuser cannot update a mist owned by otheruser."""
        mid = await _create_mist(owner=_OTHER)
        r = await client.patch(
            f"/api/mists/{mid}", json={"title": "hijack"}, headers=auth_headers
        )
        assert r.status_code == 404


class TestSecurityRaw:
    """Tier 7: access control on GET /api/mists/{id}/raw."""

    @pytest.mark.anyio
    async def test_raw_public_mist_no_auth_returns_200(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """A public mist's raw bytes are downloadable without auth headers."""
        mid = await _create_mist(visibility="public")
        r = await client.get(f"/api/mists/{mid}/raw")
        assert r.status_code == 200

    @pytest.mark.anyio
    async def test_raw_secret_mist_non_owner_returns_403(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        """testuser cannot download raw bytes from otheruser's secret mist."""
        mid = await _create_mist(owner=_OTHER, visibility="secret")
        r = await client.get(f"/api/mists/{mid}/raw", headers=auth_headers)
        assert r.status_code == 403

    @pytest.mark.anyio
    async def test_raw_secret_mist_unauthenticated_returns_403(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """Anonymous callers cannot download a secret mist's raw bytes."""
        mid = await _create_mist(owner=_OTHER, visibility="secret")
        r = await client.get(f"/api/mists/{mid}/raw")
        assert r.status_code == 403


class TestSecurityForks:
    """Tier 7: access control on GET /api/mists/{id}/forks."""

    @pytest.mark.anyio
    async def test_forks_of_public_mist_visible_to_anonymous(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        """Anyone can see forks of a public mist — no auth required."""
        mid = await _create_mist(visibility="public")
        # Auth is needed to create the fork, but not for the listing below.
        await _post_fork(client, auth_headers, mid)
        r = await client.get(f"/api/mists/{mid}/forks")
        assert r.status_code == 200
        assert len(r.json()) >= 1

    @pytest.mark.anyio
    async def test_forks_of_secret_mist_non_owner_returns_403(
        self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession
    ) -> None:
        """testuser cannot list forks of otheruser's secret mist."""
        mid = await _create_mist(owner=_OTHER, visibility="secret")
        r = await client.get(f"/api/mists/{mid}/forks", headers=auth_headers)
        assert r.status_code == 403


# ═══════════════════════════════════════════════════════════════════════════════
# Tier 7 (MCP executor) — Security via executor functions
# ═══════════════════════════════════════════════════════════════════════════════


class TestSecurityMcpExecutors:
    """Tier 7: access control in the MCP executor layer."""

    @pytest.mark.anyio
    async def test_list_forks_missing_mist_id_returns_error(
        self, db_session: AsyncSession
    ) -> None:
        """An empty mist ID yields a ``missing_args`` error from the fork lister."""
        result = await execute_list_mist_forks("")
        assert result.ok is False
        assert result.error_code == "missing_args"

    @pytest.mark.anyio
    async def test_list_forks_unknown_mist_returns_not_found(
        self, db_session: AsyncSession
    ) -> None:
        """An ID that was never created yields ``not_found`` from the fork lister."""
        result = await execute_list_mist_forks("doesNotExist1")
        assert result.ok is False
        assert result.error_code == "not_found"

    @pytest.mark.anyio
    async def test_list_forks_secret_non_owner_returns_forbidden(
        self, db_session: AsyncSession
    ) -> None:
        """An actor other than the owner gets ``forbidden`` on a secret mist's forks."""
        mid = await _create_mist(owner=_OTHER, visibility="secret")
        result = await execute_list_mist_forks(mid, actor="bob")
        assert result.ok is False
        assert result.error_code == "forbidden"

    @pytest.mark.anyio
    async def test_list_forks_owner_can_list_secret_forks(
        self, db_session: AsyncSession
    ) -> None:
        """The owner of a secret mist can list its forks."""
        mid = await _create_mist(owner=_OTHER, visibility="secret")
        result = await execute_list_mist_forks(mid, actor=_OTHER)
        assert result.ok is True
        assert result.data["mist_id"] == mid

    @pytest.mark.anyio
    async def test_raw_missing_mist_id_returns_error(
        self, db_session: AsyncSession
    ) -> None:
        """An empty mist ID yields a ``missing_args`` error from the raw reader."""
        result = await execute_read_mist_raw("")
        assert result.ok is False
        assert result.error_code == "missing_args"

    @pytest.mark.anyio
    async def test_raw_unknown_mist_returns_not_found(
        self, db_session: AsyncSession
    ) -> None:
        """An ID that was never created yields ``not_found`` from the raw reader."""
        result = await execute_read_mist_raw("doesNotExist1")
        assert result.ok is False
        assert result.error_code == "not_found"

    @pytest.mark.anyio
    async def test_raw_secret_non_owner_returns_forbidden(
        self, db_session: AsyncSession
    ) -> None:
        """An actor other than the owner gets ``forbidden`` reading a secret mist."""
        mid = await _create_mist(owner=_OTHER, visibility="secret")
        result = await execute_read_mist_raw(mid, actor="bob")
        assert result.ok is False
        assert result.error_code == "forbidden"

    @pytest.mark.anyio
    async def test_raw_public_mist_anonymous_returns_content(
        self, db_session: AsyncSession
    ) -> None:
        """An empty actor string can read a public mist's content."""
        content = f"def public(): return {secrets.token_hex(16)!r}\n"
        mid = await _create_mist(content=content, visibility="public")
        result = await execute_read_mist_raw(mid, actor="")
        assert result.ok is True
        assert result.data["content"] == content


# ═══════════════════════════════════════════════════════════════════════════════
# Tier 8 — Docstrings
# ═══════════════════════════════════════════════════════════════════════════════


class TestDocstrings:
    """Tier 8: every new public symbol has a non-trivial docstring.

    Only presence and a minimum length (more than 20 characters) are checked;
    the docstring's style is not validated.
    """

    def _assert_doc(self, obj: Callable[..., object], name: str) -> None:
        """Assert that *obj* has a docstring longer than 20 characters.

        Args:
            obj: The callable whose docstring is inspected.
            name: Label used in the failure message.
        """
        doc = inspect.getdoc(obj)
        assert doc, f"{name} has no docstring"
        assert len(doc) > 20, f"{name} docstring is too short: {doc!r}"

    def _assert_tool_described(self, tool_name: str) -> None:
        """Assert that the named MCP read tool is registered and described.

        Args:
            tool_name: Value expected in ``MUSEHUB_TOOL_NAMES`` and as the
                ``"name"`` of one entry in ``MUSEHUB_READ_TOOLS``.
        """
        from musehub.mcp.tools.musehub import MUSEHUB_TOOL_NAMES, MUSEHUB_READ_TOOLS

        assert tool_name in MUSEHUB_TOOL_NAMES
        # A default of None turns a missing tool into a readable assertion
        # failure instead of a bare StopIteration.
        tool = next((t for t in MUSEHUB_READ_TOOLS if t["name"] == tool_name), None)
        assert tool is not None, f"{tool_name} not found in MUSEHUB_READ_TOOLS"
        assert tool.get("description"), f"{tool_name} tool has no description"

    def test_run_update_has_docstring(self) -> None:
        """The CLI ``run_update`` entry point is documented."""
        from muse.cli.commands.mist import run_update

        self._assert_doc(run_update, "run_update")

    def test_run_forks_has_docstring(self) -> None:
        """The CLI ``run_forks`` entry point is documented."""
        from muse.cli.commands.mist import run_forks

        self._assert_doc(run_forks, "run_forks")

    def test_run_raw_has_docstring(self) -> None:
        """The CLI ``run_raw`` entry point is documented."""
        from muse.cli.commands.mist import run_raw

        self._assert_doc(run_raw, "run_raw")

    def test_get_mist_raw_route_has_docstring(self) -> None:
        """The ``get_mist_raw`` route handler is documented."""
        from musehub.api.routes.musehub.mists import get_mist_raw

        self._assert_doc(get_mist_raw, "get_mist_raw")

    def test_content_type_helper_has_docstring(self) -> None:
        """The ``_content_type_for_mist`` helper is documented."""
        from musehub.api.routes.musehub.mists import _content_type_for_mist

        self._assert_doc(_content_type_for_mist, "_content_type_for_mist")

    def test_execute_list_mist_forks_has_docstring(self) -> None:
        """The ``execute_list_mist_forks`` executor is documented."""
        self._assert_doc(execute_list_mist_forks, "execute_list_mist_forks")

    def test_execute_read_mist_raw_has_docstring(self) -> None:
        """The ``execute_read_mist_raw`` executor is documented."""
        self._assert_doc(execute_read_mist_raw, "execute_read_mist_raw")

    def test_muse_mist_list_forks_tool_has_description(self) -> None:
        """The ``muse_mist_list_forks`` tool is registered with a description."""
        self._assert_tool_described("muse_mist_list_forks")

    def test_muse_mist_raw_tool_has_description(self) -> None:
        """The ``muse_mist_raw`` tool is registered with a description."""
        self._assert_tool_described("muse_mist_raw")