"""Tests for zlib-compressed object decompression in raw file serving and README fetching. Root cause: objects pushed via the old wire path were stored zlib-compressed in R2. The raw file endpoint and _fetch_readme read bytes directly from storage without decompressing, so staging serves garbled bytes while localhost (filesystem backend with repair logic) serves clean text. Fix: detect zlib magic bytes and decompress before serving — in both _fetch_readme and raw_file_semantic. """ from __future__ import annotations import zlib from unittest.mock import AsyncMock, MagicMock, patch import pytest import pytest_asyncio from httpx import AsyncClient, ASGITransport from sqlalchemy.ext.asyncio import AsyncSession from musehub.db.musehub_repo_models import MusehubObject from musehub.main import app # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _zlib_compress(text: str) -> bytes: return zlib.compress(text.encode()) def _raw_bytes(text: str) -> bytes: return text.encode() README_TEXT = "# muse-zsh\n\nOh My ZSH plugin for Muse.\n" ZLIB_README = _zlib_compress(README_TEXT) RAW_README = _raw_bytes(README_TEXT) # --------------------------------------------------------------------------- # Unit tests — decompress_if_needed utility # --------------------------------------------------------------------------- def test_decompress_if_needed_passes_plain_text_through() -> None: from musehub.types.compression import decompress_if_needed data = b"# plain text README\n" assert decompress_if_needed(data) == data def test_decompress_if_needed_decompresses_zlib_level_default() -> None: from musehub.types.compression import decompress_if_needed compressed = zlib.compress(b"hello world\n") assert decompress_if_needed(compressed) == b"hello world\n" def test_decompress_if_needed_decompresses_zlib_level_1() -> None: from musehub.types.compression import decompress_if_needed compressed = zlib.compress(b"hello\n", level=1) assert decompress_if_needed(compressed) == b"hello\n" def test_decompress_if_needed_decompresses_zlib_level_9() -> None: from musehub.types.compression import decompress_if_needed compressed = zlib.compress(b"hello\n", level=9) assert decompress_if_needed(compressed) == b"hello\n" def test_decompress_if_needed_passes_empty_bytes_through() -> None: from musehub.types.compression import decompress_if_needed assert decompress_if_needed(b"") == b"" def test_decompress_if_needed_passes_binary_non_zlib_through() -> None: from musehub.types.compression import decompress_if_needed # PNG magic bytes — not zlib data = b"\x89PNG\r\n\x1a\n" + b"\x00" * 100 assert decompress_if_needed(data) == data def test_decompress_if_needed_handles_truncated_zlib_gracefully() -> None: from musehub.types.compression import decompress_if_needed # zlib magic bytes but truncated — should return original bytes, not raise data = b"\x78\x9c\x00" # valid zlib header, invalid body result = decompress_if_needed(data) # Must not raise — returns original bytes on decompression failure assert isinstance(result, bytes) def test_decompress_if_needed_decompresses_full_readme() -> None: from musehub.types.compression import decompress_if_needed assert decompress_if_needed(ZLIB_README).decode() == README_TEXT # --------------------------------------------------------------------------- # Unit tests — _fetch_readme decompresses stored objects # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_fetch_readme_decompresses_zlib_object( monkeypatch: pytest.MonkeyPatch, ) -> None: """_fetch_readme must decode zlib-compressed objects from storage.""" from musehub.api.routes.musehub._ui_helpers import _fetch_readme fake_obj = MagicMock() fake_obj.content_cache = ZLIB_README fake_obj.storage_uri = "" mock_db = MagicMock() mock_db.get = AsyncMock(return_value=fake_obj) class _Entry: name = "README.md" object_id = "abc123" result = await _fetch_readme( db=mock_db, repo_id="repo-1", ref="main", entries=[_Entry()], # type: ignore[list-item] ) assert result == README_TEXT @pytest.mark.asyncio async def test_fetch_readme_plain_text_object_unchanged( monkeypatch: pytest.MonkeyPatch, ) -> None: """_fetch_readme must pass through already-plain-text objects unchanged.""" from musehub.api.routes.musehub._ui_helpers import _fetch_readme fake_obj = MagicMock() fake_obj.content_cache = RAW_README fake_obj.storage_uri = "" mock_db = MagicMock() mock_db.get = AsyncMock(return_value=fake_obj) class _Entry: name = "README.md" object_id = "abc123" result = await _fetch_readme( db=mock_db, repo_id="repo-1", ref="main", entries=[_Entry()], # type: ignore[list-item] ) assert result == README_TEXT # --------------------------------------------------------------------------- # Integration tests — raw_file_semantic endpoint decompresses # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_raw_endpoint_decompresses_zlib_object( client: AsyncClient, db_session: AsyncSession, monkeypatch: pytest.MonkeyPatch, ) -> None: """GET /owner/repo/raw/ref/README.md must return plain text even when the stored object is zlib-compressed.""" from musehub.api.routes.musehub import ui_tree from musehub.api.routes.musehub import repos as repos_mod db_session.add(MusehubObject( object_id="deadbeef", path="README.md", size_bytes=len(ZLIB_README), storage_uri="s3://muse-objects/objects/deadbeef", )) await db_session.commit() # Stub repo resolution monkeypatch.setattr( ui_tree, "_resolve_repo", AsyncMock(return_value=("repo-id-1", MagicMock(), MagicMock())), ) # Stub file metadata lookup monkeypatch.setattr( ui_tree.musehub_repository, "get_file_at_ref", AsyncMock(return_value={"object_id": "deadbeef"}), ) # Stub storage: exists=True, stream yields zlib-compressed bytes mock_storage = MagicMock() mock_storage.exists = AsyncMock(return_value=True) async def _compressed_stream(object_id: str, chunk_size: int = 65536) -> None: yield ZLIB_README mock_storage.stream = _compressed_stream monkeypatch.setattr(ui_tree, "_get_storage_backend", lambda *_: mock_storage) resp = await client.get("/gabriel/muse-zsh/raw/main/README.md") assert resp.status_code == 200 assert resp.text == README_TEXT @pytest.mark.asyncio async def test_raw_endpoint_plain_text_object_unchanged( client: AsyncClient, db_session: AsyncSession, monkeypatch: pytest.MonkeyPatch, ) -> None: """GET /owner/repo/raw/ref/README.md must pass through plain text unchanged.""" from musehub.api.routes.musehub import ui_tree db_session.add(MusehubObject( object_id="deadbeef", path="README.md", size_bytes=len(RAW_README), storage_uri="s3://muse-objects/objects/deadbeef", )) await db_session.commit() monkeypatch.setattr( ui_tree, "_resolve_repo", AsyncMock(return_value=("repo-id-1", MagicMock(), MagicMock())), ) monkeypatch.setattr( ui_tree.musehub_repository, "get_file_at_ref", AsyncMock(return_value={"object_id": "deadbeef"}), ) mock_storage = MagicMock() mock_storage.exists = AsyncMock(return_value=True) async def _plain_stream(object_id: str, chunk_size: int = 65536) -> None: yield RAW_README mock_storage.stream = _plain_stream monkeypatch.setattr(ui_tree, "_get_storage_backend", lambda *_: mock_storage) resp = await client.get("/gabriel/muse-zsh/raw/main/README.md") assert resp.status_code == 200 assert resp.text == README_TEXT @pytest.mark.asyncio async def test_raw_endpoint_zlib_toml_file_decompressed( client: AsyncClient, db_session: AsyncSession, monkeypatch: pytest.MonkeyPatch, ) -> None: """Non-README files (e.g. .toml) are also decompressed when zlib-stored.""" from musehub.api.routes.musehub import ui_tree toml_content = '[workspace]\nversion = 1\n' compressed_toml = zlib.compress(toml_content.encode()) db_session.add(MusehubObject( object_id="deadbeef", path=".museattributes", size_bytes=len(compressed_toml), storage_uri="s3://muse-objects/objects/deadbeef", )) await db_session.commit() monkeypatch.setattr( ui_tree, "_resolve_repo", AsyncMock(return_value=("repo-id-1", MagicMock(), MagicMock())), ) monkeypatch.setattr( ui_tree.musehub_repository, "get_file_at_ref", AsyncMock(return_value={"object_id": "deadbeef"}), ) mock_storage = MagicMock() mock_storage.exists = AsyncMock(return_value=True) async def _compressed_stream(object_id: str, chunk_size: int = 65536) -> None: yield compressed_toml mock_storage.stream = _compressed_stream monkeypatch.setattr(ui_tree, "_get_storage_backend", lambda *_: mock_storage) resp = await client.get("/gabriel/muse-zsh/raw/main/.museattributes") assert resp.status_code == 200 assert resp.text == toml_content @pytest.mark.asyncio async def test_raw_endpoint_binary_file_not_decompressed( client: AsyncClient, db_session: AsyncSession, monkeypatch: pytest.MonkeyPatch, ) -> None: """Binary files (PNG etc.) must not be decompressed — their bytes are served raw.""" from musehub.api.routes.musehub import ui_tree # PNG magic — starts with bytes that are NOT a zlib header png_bytes = b"\x89PNG\r\n\x1a\n" + b"\x00" * 50 db_session.add(MusehubObject( object_id="deadbeef", path="logo.png", size_bytes=len(png_bytes), storage_uri="s3://muse-objects/objects/deadbeef", )) await db_session.commit() monkeypatch.setattr( ui_tree, "_resolve_repo", AsyncMock(return_value=("repo-id-1", MagicMock(), MagicMock())), ) monkeypatch.setattr( ui_tree.musehub_repository, "get_file_at_ref", AsyncMock(return_value={"object_id": "deadbeef"}), ) mock_storage = MagicMock() mock_storage.exists = AsyncMock(return_value=True) async def _png_stream(object_id: str, chunk_size: int = 65536) -> None: yield png_bytes mock_storage.stream = _png_stream monkeypatch.setattr(ui_tree, "_get_storage_backend", lambda *_: mock_storage) resp = await client.get("/gabriel/muse-zsh/raw/main/logo.png") assert resp.status_code == 200 assert resp.content == png_bytes