"""MinIO — BlobBackend as the only storage backend. Tier 1 — Unit (no network) - get_backend() raises RuntimeError when no bucket is configured - get_backend() returns BlobBackend when blob_storage_bucket is set - get_backend() returns BlobBackend when aws_s3_asset_bucket is set - LocalBackend is not importable (deleted) Tier 2 — Integration (MinIO required) - BlobBackend.put/get/exists/delete round-trip against a live MinIO container - put is idempotent (second write is safe) - get returns None for missing object - exists returns False for missing object Tier 3 — Mpack content-addressability (MinIO required) - put_mpack/get_mpack round-trip: sha256(get_mpack(key)) == key - presign_mpack_put PUT/get_mpack round-trip: sha256(get_mpack(key)) == key MinIO assumed at http://localhost:9000 with credentials minioadmin/minioadmin. Tests in TestBlobBackendMinIO are skipped automatically when MinIO is unreachable. """ from __future__ import annotations import hashlib import importlib import secrets import urllib.request from unittest.mock import patch import pytest from muse.core.types import long_id # ─── helpers ────────────────────────────────────────────────────────────────── MINIO_ENDPOINT = "http://localhost:9000" MINIO_BUCKET = "muse-objects" MINIO_ACCESS_KEY = "minioadmin" MINIO_SECRET_KEY = "minioadmin" def _oid() -> str: return long_id(secrets.token_hex(32)) def _minio_backend() -> None: from musehub.storage.backends import BlobBackend return BlobBackend( bucket=MINIO_BUCKET, endpoint_url=MINIO_ENDPOINT, access_key_id=MINIO_ACCESS_KEY, secret_access_key=MINIO_SECRET_KEY, region="us-east-1", ) def _minio_reachable() -> bool: try: import urllib.request urllib.request.urlopen(f"{MINIO_ENDPOINT}/minio/health/live", timeout=2) return True except Exception: return False requires_minio = pytest.mark.skipif( not _minio_reachable(), reason="MinIO not reachable at localhost:9000 — start docker compose first", ) # 
# ═══════════════════════════════════════════════════════════════════════════════
# Tier 1 — Unit: get_backend() contract after LocalBackend is deleted
# ═══════════════════════════════════════════════════════════════════════════════


class TestBlobBackendNaming:
    """The backend is exposed as BlobBackend; the S3Backend name is gone."""

    def test_blob_backend_importable(self) -> None:
        """BlobBackend can be imported from musehub.storage.backends."""
        from musehub.storage.backends import BlobBackend  # noqa: F401

    def test_blob_backend_exported_from_storage(self) -> None:
        """BlobBackend is an attribute of the musehub.storage package."""
        import musehub.storage as mod

        assert hasattr(mod, "BlobBackend")

    def test_s3_backend_name_deleted(self) -> None:
        """S3Backend must not exist — the name leaks the transport protocol."""
        import musehub.storage.backends as mod

        assert not hasattr(mod, "S3Backend"), (
            "S3Backend still exists — rename it to BlobBackend"
        )

    def test_get_backend_returns_blob_backend(self) -> None:
        """get_backend() yields a BlobBackend when blob storage is configured."""
        from musehub.storage.backends import BlobBackend, get_backend

        # ``patch`` comes from the module-level import, as in the sibling tests.
        with patch("musehub.storage.backends.settings") as mock_settings:
            mock_settings.blob_storage_bucket = "muse-objects"
            mock_settings.blob_storage_endpoint = MINIO_ENDPOINT
            mock_settings.blob_storage_access_key_id = MINIO_ACCESS_KEY
            mock_settings.blob_storage_secret_access_key = MINIO_SECRET_KEY
            mock_settings.blob_storage_region = "us-east-1"
            result = get_backend()
        assert isinstance(result, BlobBackend)


class TestGetBackendNoFallback:
    """Backend selection either returns a BlobBackend or fails loudly."""

    def test_raises_when_no_bucket_configured(self) -> None:
        """get_backend() must raise RuntimeError when neither blob_storage_bucket
        nor aws_s3_asset_bucket is set.

        No silent LocalBackend fallback.
        """
        # NOTE(review): this test calls _get_backend_impl directly while the
        # others call get_backend — presumably to bypass a wrapper; confirm.
        from musehub.storage.backends import _get_backend_impl

        with patch("musehub.storage.backends.settings") as mock_settings:
            mock_settings.blob_storage_bucket = None
            mock_settings.aws_s3_asset_bucket = None
            with pytest.raises(RuntimeError, match="No storage backend configured"):
                _get_backend_impl()

    def test_returns_blob_backend_when_blob_storage_bucket_set(self) -> None:
        """A configured blob_storage_bucket selects BlobBackend."""
        from musehub.storage.backends import BlobBackend, get_backend

        with patch("musehub.storage.backends.settings") as mock_settings:
            mock_settings.blob_storage_bucket = "muse-objects"
            mock_settings.blob_storage_endpoint = MINIO_ENDPOINT
            mock_settings.blob_storage_access_key_id = MINIO_ACCESS_KEY
            mock_settings.blob_storage_secret_access_key = MINIO_SECRET_KEY
            mock_settings.blob_storage_region = "auto"
            result = get_backend()
        assert isinstance(result, BlobBackend)

    def test_returns_blob_backend_when_aws_bucket_set(self) -> None:
        """With only aws_s3_asset_bucket set, BlobBackend is still selected."""
        from musehub.storage.backends import BlobBackend, get_backend

        with patch("musehub.storage.backends.settings") as mock_settings:
            mock_settings.blob_storage_bucket = None
            mock_settings.aws_s3_asset_bucket = "my-bucket"
            mock_settings.aws_region = "us-east-1"
            mock_settings.blob_storage_endpoint = None
            mock_settings.blob_storage_access_key_id = None
            mock_settings.blob_storage_secret_access_key = None
            result = get_backend()
        assert isinstance(result, BlobBackend)

    def test_local_backend_is_deleted(self) -> None:
        """LocalBackend must not exist in the module after deletion."""
        import musehub.storage.backends as mod

        assert not hasattr(mod, "LocalBackend"), (
            "LocalBackend still exists in musehub.storage.backends — delete it"
        )

    def test_local_backend_not_in_storage_init(self) -> None:
        """LocalBackend must not be exported from musehub.storage."""
        import musehub.storage as mod

        assert not hasattr(mod, "LocalBackend"), (
            "LocalBackend still exported from musehub.storage — remove it from __all__"
        )
# ═══════════════════════════════════════════════════════════════════════════════
# Tier 2 — Integration: BlobBackend against live MinIO
# ═══════════════════════════════════════════════════════════════════════════════


class TestS3BackendMinIO:
    """put/get/exists/delete round-trips against a live MinIO container.

    NOTE(review): the class name still carries the old "S3" prefix although
    the backend is called BlobBackend; it is left unchanged here so existing
    pytest node IDs keep working — consider renaming.
    """

    @requires_minio
    async def test_put_get_round_trip(self) -> None:
        """Bytes written with put come back unchanged from get."""
        backend = _minio_backend()
        oid = _oid()
        data = b"hello minio " + secrets.token_bytes(32)
        await backend.put(oid, data)
        result = await backend.get(oid)
        assert result == data

    @requires_minio
    async def test_exists_false_before_put(self) -> None:
        """exists is False for an id that was never written."""
        backend = _minio_backend()
        oid = _oid()
        assert await backend.exists(oid) is False

    @requires_minio
    async def test_exists_true_after_put(self) -> None:
        """exists is True once the object has been written."""
        backend = _minio_backend()
        oid = _oid()
        await backend.put(oid, b"exists-check")
        assert await backend.exists(oid) is True

    @requires_minio
    async def test_put_is_idempotent(self) -> None:
        """Writing the same id twice succeeds and the data stays readable."""
        backend = _minio_backend()
        oid = _oid()
        data = b"idempotent-data"
        await backend.put(oid, data)
        await backend.put(oid, data)
        result = await backend.get(oid)
        assert result == data

    @requires_minio
    async def test_get_returns_none_for_missing(self) -> None:
        """get returns None for an id that was never written."""
        backend = _minio_backend()
        oid = _oid()
        result = await backend.get(oid)
        assert result is None

    @requires_minio
    async def test_delete_removes_object(self) -> None:
        """After delete, exists reports False for the object."""
        backend = _minio_backend()
        oid = _oid()
        await backend.put(oid, b"to-be-deleted")
        assert await backend.exists(oid) is True
        await backend.delete(oid)
        assert await backend.exists(oid) is False

    @requires_minio
    async def test_uri_for_uses_s3_scheme(self) -> None:
        """uri_for returns an ``s3://<bucket>/...`` URI."""
        backend = _minio_backend()
        oid = _oid()
        uri = backend.uri_for(oid)
        assert uri.startswith(f"s3://{MINIO_BUCKET}/")

    @requires_minio
    async def test_get_backend_with_minio_env_vars(self) -> None:
        """get_backend() wired to MinIO settings returns a working BlobBackend."""
        from musehub.storage.backends import get_backend

        with patch("musehub.storage.backends.settings") as mock_settings:
            mock_settings.blob_storage_bucket = MINIO_BUCKET
            mock_settings.blob_storage_endpoint = MINIO_ENDPOINT
            mock_settings.blob_storage_access_key_id = MINIO_ACCESS_KEY
            mock_settings.blob_storage_secret_access_key = MINIO_SECRET_KEY
            mock_settings.blob_storage_region = "us-east-1"
            backend = get_backend()

        oid = _oid()
        await backend.put(oid, b"via-get-backend")
        assert await backend.get(oid) == b"via-get-backend"


# ═══════════════════════════════════════════════════════════════════════════════
# Tier 3 — Mpack content-addressability
# ═══════════════════════════════════════════════════════════════════════════════


def _mpack_key_for(data: bytes) -> str:
    """Return the content-address key for *data*: ``sha256:<hex digest>``."""
    return "sha256:" + hashlib.sha256(data).hexdigest()


def _minio_backend_with_public_endpoint():
    """Backend with public endpoint set so presigned URLs use localhost:9000."""
    from musehub.storage.backends import BlobBackend

    return BlobBackend(
        bucket=MINIO_BUCKET,
        endpoint_url=MINIO_ENDPOINT,
        public_endpoint_url=MINIO_ENDPOINT,
        access_key_id=MINIO_ACCESS_KEY,
        secret_access_key=MINIO_SECRET_KEY,
        region="us-east-1",
    )


class TestMpackContentAddressability:
    """Every mpack written must be readable back with sha256(bytes) == key."""

    @requires_minio
    async def test_put_mpack_get_mpack_round_trip(self) -> None:
        """put_mpack then get_mpack must return bytes whose sha256 matches the key."""
        backend = _minio_backend_with_public_endpoint()
        data = b"MUSE" + secrets.token_bytes(128)
        key = _mpack_key_for(data)

        await backend.put_mpack(key, data)
        result = await backend.get_mpack(key)

        assert result is not None, "get_mpack returned None — object not found"
        actual_key = _mpack_key_for(result)
        assert actual_key == key, (
            f"Content-addressability violated: wrote key={key[:30]}… "
            f"but read back sha256={actual_key[:30]}…"
        )
        assert result == data

    @requires_minio
    async def test_presign_put_get_mpack_round_trip(self) -> None:
        """Presigned PUT then get_mpack must return bytes whose sha256 matches the key.

        This is the push path: client PUTs via presigned URL, server reads via
        get_mpack. If the presigned URL key encoding differs from _mpack_key,
        this test catches it.
        """
        backend = _minio_backend_with_public_endpoint()
        data = b"MUSE" + secrets.token_bytes(128)
        key = _mpack_key_for(data)

        upload_url = await backend.presign_mpack_put(key, ttl_seconds=300)

        # PUT directly to the presigned URL (simulates the muse push client)
        req = urllib.request.Request(
            upload_url,
            data=data,
            method="PUT",
            headers={"Content-Type": "application/x-muse-pack"},
        )
        # Bounded timeout: a stalled MinIO must fail the test, not hang it.
        with urllib.request.urlopen(req, timeout=30) as resp:
            assert resp.status == 200, f"presigned PUT failed: HTTP {resp.status}"

        result = await backend.get_mpack(key)
        assert result is not None, (
            "get_mpack returned None after presigned PUT — key mismatch between "
            f"presigned URL and _mpack_key. upload_url={upload_url}"
        )
        actual_key = _mpack_key_for(result)
        assert actual_key == key, (
            "Content-addressability violated after presigned PUT: "
            f"wrote key={key[:30]}… but read back sha256={actual_key[:30]}…\n"
            f"upload_url={upload_url}"
        )
        # Same byte-for-byte check as the direct put_mpack round-trip.
        assert result == data