"""Unit tests for BlobBackend (boto3 mocked — no real S3/MinIO calls).

Covers:
    BlobBackend._key / uri_for                   — pure logic
    BlobBackend.put/get/exists/delete/get_batch  — boto3 client interactions
    BlobBackend.get_batch                        — parallel execution
    BlobBackend.presign_put / presign_batch      — public_endpoint_url rewriting
    read_object_bytes                            — cache → s3:// → None resolution
    decode_b64                                   — padding variants
"""

from __future__ import annotations

import asyncio
import time
from types import SimpleNamespace
from typing import TypedDict
from unittest.mock import MagicMock, patch

import pytest

from musehub.storage.backends import BlobBackend, decode_b64, read_object_bytes
from muse.core.types import long_id


class _S3GetResponse(TypedDict):
    """Minimal shape of a boto3 ``get_object`` response used by these tests."""

    Body: MagicMock


def _mock_backend(*, head_raises: bool = False) -> BlobBackend:
    """Build a BlobBackend whose boto3 client is replaced by a MagicMock.

    Args:
        head_raises: when True, ``head_object`` raises, simulating a missing key.
    """
    b = BlobBackend(bucket="test-bucket", region="us-east-1")
    mock_client = MagicMock()
    if head_raises:
        mock_client.head_object.side_effect = Exception("NoSuchKey")
    b._client = mock_client
    return b


# ═══════════════════════════════════════════════════════════════════════════════
# _key and uri_for — pure logic
# ═══════════════════════════════════════════════════════════════════════════════


class TestBlobBackendKeys:
    def test_key_contains_object_id(self) -> None:
        b = BlobBackend(bucket="test-bucket", region="us-east-1")
        assert "abc123" in b._key("abc123")

    def test_key_preserves_colon(self) -> None:
        b = BlobBackend(bucket="b", region="us-east-1")
        assert "sha256:deadbeef" in b._key("sha256:deadbeef")

    def test_uri_for_s3_scheme(self) -> None:
        b = BlobBackend(bucket="my-bucket", region="us-east-1")
        assert b.uri_for("obj1").startswith("s3://my-bucket/")

    def test_uri_for_preserves_colon(self) -> None:
        b = BlobBackend(bucket="b", region="us-east-1")
        assert "sha256:abc" in b.uri_for("sha256:abc")


# ═══════════════════════════════════════════════════════════════════════════════
# boto3 interactions (mocked client)
# ═══════════════════════════════════════════════════════════════════════════════


class TestBlobBackendMocked:
    async def test_put_calls_put_object_no_head_check(self) -> None:
        b = _mock_backend()
        await b.put("obj1", b"data")
        b._client.put_object.assert_called_once()
        b._client.head_object.assert_not_called()

    async def test_put_idempotent_second_write_safe(self) -> None:
        b = _mock_backend()
        await b.put("obj1", b"data")
        await b.put("obj1", b"data")
        assert b._client.put_object.call_count == 2

    async def test_get_returns_body_bytes(self) -> None:
        b = _mock_backend()
        mock_body = MagicMock()
        mock_body.read.return_value = b"s3 content"
        b._client.get_object.return_value = {"Body": mock_body}
        assert await b.get("obj1") == b"s3 content"

    async def test_get_returns_none_on_missing(self) -> None:
        b = _mock_backend()
        b._client.get_object.side_effect = Exception("NoSuchKey")
        assert await b.get("obj1") is None

    async def test_exists_true_when_head_succeeds(self) -> None:
        b = _mock_backend()
        b._client.head_object.return_value = {}
        assert await b.exists("obj1") is True

    async def test_exists_false_on_error(self) -> None:
        b = _mock_backend(head_raises=True)
        assert await b.exists("obj1") is False

    async def test_delete_calls_delete_object(self) -> None:
        b = _mock_backend()
        await b.delete("obj1")
        b._client.delete_object.assert_called_once()

    async def test_get_batch_returns_all_found(self) -> None:
        b = _mock_backend()

        def _get_object(Bucket: str, Key: str) -> _S3GetResponse:
            # Echo the requested S3 key back as the body, so each result can be
            # traced to the request that produced it.
            mock_body = MagicMock()
            mock_body.read.return_value = Key.encode()
            return {"Body": mock_body}

        b._client.get_object.side_effect = _get_object
        oids = ["oid-a", "oid-b", "oid-c"]
        result = await b.get_batch(oids)
        assert set(result.keys()) == set(oids)
        # Each oid must map to bytes fetched for its own key, not a neighbour's.
        for oid, data in result.items():
            assert oid.encode() in data, f"[{oid}] got bytes for another key: {data!r}"

    async def test_get_batch_omits_missing(self) -> None:
        b = _mock_backend()
        b._client.get_object.side_effect = Exception("NoSuchKey")
        assert await b.get_batch(["oid-1", "oid-2"]) == {}

    async def test_get_batch_runs_in_parallel(self) -> None:
        """get_batch must issue its per-object fetches concurrently.

        Concurrency is measured by counting how many fake ``get`` calls are
        in flight at the same time, not by a wall-clock bound. A timing
        threshold flakes on slow or loaded CI machines. Elapsed time is still
        reported in the failure message as a diagnostic.
        """
        DELAY, N = 0.04, 6
        b = BlobBackend(bucket="test-bucket", region="us-east-1")
        in_flight = 0
        peak = 0

        async def _slow_get(oid: str, **_kw: str) -> bytes | None:
            nonlocal in_flight, peak
            in_flight += 1
            peak = max(peak, in_flight)
            try:
                await asyncio.sleep(DELAY)
            finally:
                in_flight -= 1
            return oid.encode()

        b.get = _slow_get  # type: ignore[method-assign]

        oids = [f"oid-{i}" for i in range(N)]
        start = time.perf_counter()
        result = await b.get_batch(oids)
        elapsed = time.perf_counter() - start

        assert set(result) == set(oids)
        # A sequential implementation never has more than one fetch in flight.
        assert peak > 1, (
            f"get_batch took {elapsed * 1000:.0f}ms with at most {peak} fetch in flight"
            " — must be parallel, not sequential"
        )


# ═══════════════════════════════════════════════════════════════════════════════
# read_object_bytes — resolution chain
# ═══════════════════════════════════════════════════════════════════════════════


class TestReadObjectBytes:
    def _obj(self, **kwargs: str | bytes | None) -> SimpleNamespace:
        """Build a stand-in object row; keyword args override the defaults."""
        defaults = dict(object_id=long_id("aa" * 32), content_cache=None, storage_uri="")
        defaults.update(kwargs)
        return SimpleNamespace(**defaults)

    async def test_returns_content_cache(self) -> None:
        obj = self._obj(content_cache=b"cached")
        assert await read_object_bytes(obj) == b"cached"

    async def test_cache_beats_storage_uri(self) -> None:
        obj = self._obj(content_cache=b"cached", storage_uri="s3://bucket/key")
        assert await read_object_bytes(obj) == b"cached"

    async def test_reads_s3_uri(self) -> None:
        obj = self._obj(storage_uri="s3://my-bucket/objects/sha256:abc123")
        mock_backend = MagicMock()

        async def _fake_get(oid: str, **_kw: str) -> bytes:
            return b"s3 bytes"

        mock_backend.get = _fake_get
        with patch("musehub.storage.backends.get_backend", return_value=mock_backend):
            assert await read_object_bytes(obj) == b"s3 bytes"

    async def test_no_path_no_cache_returns_none(self) -> None:
        assert await read_object_bytes(self._obj()) is None
# ═══════════════════════════════════════════════════════════════════════════════
# presign_put / presign_batch — public_endpoint_url rewriting
# ═══════════════════════════════════════════════════════════════════════════════


class TestPresignPublicEndpoint:
    """When BlobBackend is configured with a public_endpoint_url, presigned URLs
    must use that host — not the internal endpoint host — so external clients
    (the muse CLI running on the host) can actually reach the URL.

    Real-world case: BLOB_STORAGE_ENDPOINT=http://minio:9000 (Docker-internal),
    BLOB_STORAGE_PUBLIC_ENDPOINT=http://localhost:9000 (reachable from the host machine).
    """

    def _make_backend(self, public_endpoint_url: str | None = None) -> BlobBackend:
        """Build a backend on the internal MinIO endpoint with a mocked client.

        The mocked ``generate_presigned_url`` returns an internal-host URL by
        default, so tests can check whether the backend rewrites it.

        Args:
            public_endpoint_url: when given, passed through to BlobBackend;
                when None, the argument is omitted entirely.
        """
        if public_endpoint_url is None:
            b = BlobBackend(
                bucket="test-bucket",
                region="us-east-1",
                endpoint_url="http://minio:9000",
            )
        else:
            b = BlobBackend(
                bucket="test-bucket",
                region="us-east-1",
                endpoint_url="http://minio:9000",
                public_endpoint_url=public_endpoint_url,
            )
        mock_client = MagicMock()
        mock_client.generate_presigned_url.return_value = (
            "http://minio:9000/test-bucket/objects/sha256:abc?sig=x"
        )
        b._client = mock_client
        return b

    def _backend_with_internal_and_public(self) -> BlobBackend:
        return self._make_backend(public_endpoint_url="http://localhost:9000")

    def _backend_without_public(self) -> BlobBackend:
        return self._make_backend()

    async def test_presign_put_rewrites_host_when_public_endpoint_set(self) -> None:
        b = self._backend_with_internal_and_public()
        url = await b.presign_put("sha256:abc", ttl_seconds=300)
        assert "localhost:9000" in url, f"expected localhost:9000 in presigned URL, got: {url}"
        assert "minio:9000" not in url, f"internal hostname leaked into presigned URL: {url}"

    async def test_presign_put_no_rewrite_when_public_endpoint_absent(self) -> None:
        b = self._backend_without_public()
        url = await b.presign_put("sha256:abc", ttl_seconds=300)
        assert "minio:9000" in url, f"expected minio:9000 in URL when no public endpoint: {url}"

    async def test_presign_batch_rewrites_all_urls(self) -> None:
        b = self._backend_with_internal_and_public()
        oids = ["sha256:aaa", "sha256:bbb", "sha256:ccc"]
        b._client.generate_presigned_url.side_effect = [
            f"http://minio:9000/test-bucket/objects/{oid}?sig=x" for oid in oids
        ]
        result = await b.presign_batch(oids, direction="put", ttl_seconds=300)
        # Without this, an empty result would make the per-URL loop pass vacuously.
        assert set(result) == set(oids), f"missing presigned URLs: {set(oids) - set(result)}"
        for oid, url in result.items():
            assert "localhost:9000" in url, f"[{oid}] expected localhost in URL: {url}"
            assert "minio:9000" not in url, f"[{oid}] internal host leaked: {url}"

    async def test_presign_batch_no_rewrite_when_public_endpoint_absent(self) -> None:
        b = self._backend_without_public()
        oids = ["sha256:aaa", "sha256:bbb"]
        b._client.generate_presigned_url.side_effect = [
            f"http://minio:9000/test-bucket/objects/{oid}?sig=x" for oid in oids
        ]
        result = await b.presign_batch(oids, direction="put", ttl_seconds=300)
        # Guard against a vacuous pass on an empty result.
        assert set(result) == set(oids), f"missing presigned URLs: {set(oids) - set(result)}"
        for url in result.values():
            assert "minio:9000" in url

    async def test_presign_put_preserves_path_and_query(self) -> None:
        """Rewriting must only change the scheme+host, not the path or query string."""
        b = self._backend_with_internal_and_public()
        b._client.generate_presigned_url.return_value = (
            "http://minio:9000/test-bucket/objects/sha256:abc?X-Amz-Signature=deadbeef&X-Amz-Expires=300"
        )
        url = await b.presign_put("sha256:abc", ttl_seconds=300)
        assert "X-Amz-Signature=deadbeef" in url
        assert "X-Amz-Expires=300" in url
        assert "/test-bucket/objects/sha256:abc" in url


# ═══════════════════════════════════════════════════════════════════════════════
# decode_b64
# ═══════════════════════════════════════════════════════════════════════════════


class TestDecodeb64:
    def test_standard(self) -> None:
        """Correctly padded standard base64 round-trips."""
        import base64

        data = b"hello world"
        assert decode_b64(base64.b64encode(data).decode()) == data

    def test_missing_padding(self) -> None:
        """Input with its trailing '=' padding stripped still decodes.

        Lengths 1, 2 and 3 cover the two-pad, one-pad and no-pad cases.
        """
        import base64

        for data in (b"h", b"hi", b"hel"):
            encoded = base64.b64encode(data).decode().rstrip("=")
            assert decode_b64(encoded) == data

    def test_empty(self) -> None:
        assert decode_b64("") == b""