test_blob_backend_unit.py
python
sha256:3ff9c9863a9891bdcde71b4a43228f66d0493e38b7cc1d09fe9eb7de774046b2
feat: add repair-commit wire endpoint (API parity with repa…
Opus 4.8
minor
⚠ breaking
1 day ago
| 1 | """Unit tests for BlobBackend (boto3 mocked — no real S3/MinIO calls). |
| 2 | |
| 3 | Covers: |
| 4 | BlobBackend._key / uri_for — pure logic |
| 5 | BlobBackend.put/get/exists/delete/get_batch — boto3 client interactions |
| 6 | BlobBackend.get_batch — parallel execution |
| 7 | BlobBackend.presign_put / presign_batch — public_endpoint_url rewriting |
| 8 | read_object_bytes — cache → s3:// → None resolution |
| 9 | decode_b64 — padding variants |
| 10 | """ |
| 11 | from __future__ import annotations |
| 12 | |
| 13 | import time |
| 14 | from types import SimpleNamespace |
| 15 | from typing import TypedDict |
| 16 | from unittest.mock import MagicMock, patch |
| 17 | |
| 18 | import pytest |
| 19 | |
| 20 | from musehub.storage.backends import BlobBackend, decode_b64, read_object_bytes |
| 21 | from muse.core.types import long_id |
| 22 | |
| 23 | |
# Shape of the boto3 ``get_object`` response as far as these tests need it:
# only the streaming ``Body`` (mocked) is ever read.
_S3GetResponse = TypedDict("_S3GetResponse", {"Body": MagicMock})
| 26 | |
| 27 | |
def _mock_backend(*, head_raises: bool = False) -> BlobBackend:
    """Return a ``test-bucket`` BlobBackend whose boto3 client is a MagicMock.

    Args:
        head_raises: when True, ``head_object`` on the fake client raises
            ``Exception("NoSuchKey")``, simulating a missing object.
    """
    backend = BlobBackend(bucket="test-bucket", region="us-east-1")
    fake_client = MagicMock()
    if head_raises:
        fake_client.head_object.side_effect = Exception("NoSuchKey")
    backend._client = fake_client
    return backend
| 35 | |
| 36 | |
| 37 | # ═══════════════════════════════════════════════════════════════════════════════ |
| 38 | # _key and uri_for — pure logic |
| 39 | # ═══════════════════════════════════════════════════════════════════════════════ |
| 40 | |
class TestBlobBackendKeys:
    """Key and URI derivation — pure string logic, no client calls."""

    @staticmethod
    def _backend(bucket: str) -> BlobBackend:
        # Construction only; these tests never touch the boto3 client.
        return BlobBackend(bucket=bucket, region="us-east-1")

    def test_key_contains_object_id(self) -> None:
        key = self._backend("test-bucket")._key("abc123")
        assert "abc123" in key

    def test_key_preserves_colon(self) -> None:
        oid = "sha256:deadbeef"
        assert oid in self._backend("b")._key(oid)

    def test_uri_for_s3_scheme(self) -> None:
        uri = self._backend("my-bucket").uri_for("obj1")
        assert uri.startswith("s3://my-bucket/")

    def test_uri_for_preserves_colon(self) -> None:
        oid = "sha256:abc"
        assert oid in self._backend("b").uri_for(oid)
| 57 | |
| 58 | |
| 59 | # ═══════════════════════════════════════════════════════════════════════════════ |
| 60 | # boto3 interactions (mocked client) |
| 61 | # ═══════════════════════════════════════════════════════════════════════════════ |
| 62 | |
class TestBlobBackendMocked:
    """BlobBackend put/get/exists/delete/get_batch against a MagicMock client."""

    async def test_put_calls_put_object_no_head_check(self) -> None:
        b = _mock_backend()
        await b.put("obj1", b"data")
        b._client.put_object.assert_called_once()
        # put writes blindly; it must not probe for existence first.
        b._client.head_object.assert_not_called()

    async def test_put_idempotent_second_write_safe(self) -> None:
        b = _mock_backend()
        await b.put("obj1", b"data")
        await b.put("obj1", b"data")
        assert b._client.put_object.call_count == 2

    async def test_get_returns_body_bytes(self) -> None:
        b = _mock_backend()
        mock_body = MagicMock()
        mock_body.read.return_value = b"s3 content"
        b._client.get_object.return_value = {"Body": mock_body}
        assert await b.get("obj1") == b"s3 content"

    async def test_get_returns_none_on_missing(self) -> None:
        b = _mock_backend()
        b._client.get_object.side_effect = Exception("NoSuchKey")
        assert await b.get("obj1") is None

    async def test_exists_true_when_head_succeeds(self) -> None:
        b = _mock_backend()
        b._client.head_object.return_value = {}
        assert await b.exists("obj1") is True

    async def test_exists_false_on_error(self) -> None:
        b = _mock_backend(head_raises=True)
        assert await b.exists("obj1") is False

    async def test_delete_calls_delete_object(self) -> None:
        b = _mock_backend()
        await b.delete("obj1")
        b._client.delete_object.assert_called_once()

    async def test_get_batch_returns_all_found(self) -> None:
        b = _mock_backend()
        oids = ["oid-a", "oid-b", "oid-c"]

        def _get_object(Bucket: str, Key: str) -> _S3GetResponse:
            # Echo the S3 key back as the body so each value can be traced to
            # the request that produced it.
            mock_body = MagicMock()
            mock_body.read.return_value = Key.encode()
            return {"Body": mock_body}

        b._client.get_object.side_effect = _get_object
        result = await b.get_batch(oids)
        assert set(result.keys()) == set(oids)
        # Keys alone don't prove the values were fetched correctly: each body
        # must come from that oid's own S3 key, and _key embeds the object id.
        for oid in oids:
            assert oid.encode() in result[oid], f"[{oid}] wrong body: {result[oid]!r}"

    async def test_get_batch_omits_missing(self) -> None:
        b = _mock_backend()
        b._client.get_object.side_effect = Exception("NoSuchKey")
        assert await b.get_batch(["oid-1", "oid-2"]) == {}

    async def test_get_batch_runs_in_parallel(self) -> None:
        import asyncio
        DELAY, N = 0.04, 6
        b = BlobBackend(bucket="test-bucket", region="us-east-1")
        oids = [f"oid-{i}" for i in range(N)]

        async def _slow_get(oid: str, **_kw: str) -> bytes | None:
            await asyncio.sleep(DELAY)
            return oid.encode()

        b.get = _slow_get  # type: ignore[method-assign]
        start = time.perf_counter()
        result = await b.get_batch(oids)
        elapsed = time.perf_counter() - start

        # Each oid must map to its own payload; a bare count would also accept
        # a batch that paired results with the wrong ids.
        assert result == {oid: oid.encode() for oid in oids}
        # Sequential fetching would take roughly N * DELAY; parallel roughly DELAY.
        assert elapsed < DELAY * 2.5, (
            f"get_batch took {elapsed * 1000:.0f}ms — must be parallel, not sequential"
        )
| 137 | |
| 138 | |
| 139 | # ═══════════════════════════════════════════════════════════════════════════════ |
| 140 | # read_object_bytes — resolution chain |
| 141 | # ═══════════════════════════════════════════════════════════════════════════════ |
| 142 | |
class TestReadObjectBytes:
    """read_object_bytes resolution order: content_cache, then s3:// URI, else None."""

    def _obj(self, **kwargs: str | bytes | None) -> SimpleNamespace:
        """Build a stand-in object row; keyword arguments override the defaults."""
        fields = {
            "object_id": long_id("aa" * 32),
            "content_cache": None,
            "storage_uri": "",
            **kwargs,
        }
        return SimpleNamespace(**fields)

    async def test_returns_content_cache(self) -> None:
        row = self._obj(content_cache=b"cached")
        assert await read_object_bytes(row) == b"cached"

    async def test_cache_beats_storage_uri(self) -> None:
        row = self._obj(content_cache=b"cached", storage_uri="s3://bucket/key")
        assert await read_object_bytes(row) == b"cached"

    async def test_reads_s3_uri(self) -> None:
        row = self._obj(storage_uri="s3://my-bucket/objects/sha256:abc123")

        async def _fake_get(oid: str, **_kw: str) -> bytes:
            return b"s3 bytes"

        fake_backend = MagicMock()
        fake_backend.get = _fake_get
        # get_backend is patched where read_object_bytes looks it up.
        with patch("musehub.storage.backends.get_backend", return_value=fake_backend):
            fetched = await read_object_bytes(row)
        assert fetched == b"s3 bytes"

    async def test_no_path_no_cache_returns_none(self) -> None:
        row = self._obj()
        assert await read_object_bytes(row) is None
| 170 | |
| 171 | |
# ═══════════════════════════════════════════════════════════════════════════════
# presign_put / presign_batch — public_endpoint_url rewriting
# ═══════════════════════════════════════════════════════════════════════════════
| 179 | |
class TestPresignPublicEndpoint:
    """When BlobBackend is configured with a public_endpoint_url, presigned URLs
    must use that host — not the internal endpoint host — so external clients
    (the muse CLI running on the host) can actually reach the URL.

    Real-world case: BLOB_STORAGE_ENDPOINT=http://minio:9000 (Docker-internal),
    BLOB_STORAGE_PUBLIC_ENDPOINT=http://localhost:9000 (reachable from the host machine).
    """

    # Default URL returned by the mocked boto3 client. It always carries the
    # internal host, so any public host in a result must come from rewriting.
    _INTERNAL_URL = "http://minio:9000/test-bucket/objects/sha256:abc?sig=x"

    @classmethod
    def _attach_mock_client(cls, b: BlobBackend) -> BlobBackend:
        """Replace *b*'s boto3 client with a MagicMock returning the internal URL."""
        mock_client = MagicMock()
        mock_client.generate_presigned_url.return_value = cls._INTERNAL_URL
        b._client = mock_client
        return b

    def _backend_with_internal_and_public(self) -> BlobBackend:
        """Backend with a Docker-internal endpoint plus a public override."""
        return self._attach_mock_client(
            BlobBackend(
                bucket="test-bucket",
                region="us-east-1",
                endpoint_url="http://minio:9000",
                public_endpoint_url="http://localhost:9000",
            )
        )

    def _backend_without_public(self) -> BlobBackend:
        """Backend with only the internal endpoint (no public override)."""
        return self._attach_mock_client(
            BlobBackend(
                bucket="test-bucket",
                region="us-east-1",
                endpoint_url="http://minio:9000",
            )
        )

    async def test_presign_put_rewrites_host_when_public_endpoint_set(self) -> None:
        b = self._backend_with_internal_and_public()
        url = await b.presign_put("sha256:abc", ttl_seconds=300)
        assert "localhost:9000" in url, f"expected localhost:9000 in presigned URL, got: {url}"
        assert "minio:9000" not in url, f"internal hostname leaked into presigned URL: {url}"

    async def test_presign_put_no_rewrite_when_public_endpoint_absent(self) -> None:
        b = self._backend_without_public()
        url = await b.presign_put("sha256:abc", ttl_seconds=300)
        assert "minio:9000" in url, f"expected minio:9000 in URL when no public endpoint: {url}"

    async def test_presign_batch_rewrites_all_urls(self) -> None:
        b = self._backend_with_internal_and_public()
        oids = ["sha256:aaa", "sha256:bbb", "sha256:ccc"]
        b._client.generate_presigned_url.side_effect = [
            f"http://minio:9000/bucket/objects/{oid}?sig=x" for oid in oids
        ]
        result = await b.presign_batch(oids, direction="put", ttl_seconds=300)
        # Without this, an empty result would satisfy the loop below vacuously.
        assert set(result) == set(oids), f"expected a URL for every oid, got: {sorted(result)}"
        for oid, url in result.items():
            assert "localhost:9000" in url, f"[{oid}] expected localhost in URL: {url}"
            assert "minio:9000" not in url, f"[{oid}] internal host leaked: {url}"

    async def test_presign_batch_no_rewrite_when_public_endpoint_absent(self) -> None:
        b = self._backend_without_public()
        oids = ["sha256:aaa", "sha256:bbb"]
        b._client.generate_presigned_url.side_effect = [
            f"http://minio:9000/bucket/objects/{oid}?sig=x" for oid in oids
        ]
        result = await b.presign_batch(oids, direction="put", ttl_seconds=300)
        # Guard against a vacuous pass on an empty result.
        assert set(result) == set(oids), f"expected a URL for every oid, got: {sorted(result)}"
        for url in result.values():
            assert "minio:9000" in url

    async def test_presign_put_preserves_path_and_query(self) -> None:
        """Rewriting must only change the scheme+host, not the path or query string."""
        b = self._backend_with_internal_and_public()
        b._client.generate_presigned_url.return_value = (
            "http://minio:9000/test-bucket/objects/sha256:abc?X-Amz-Signature=deadbeef&X-Amz-Expires=300"
        )
        url = await b.presign_put("sha256:abc", ttl_seconds=300)
        assert "X-Amz-Signature=deadbeef" in url
        assert "X-Amz-Expires=300" in url
        assert "/test-bucket/objects/sha256:abc" in url
| 262 | |
| 263 | |
| 264 | # ═══════════════════════════════════════════════════════════════════════════════ |
| 265 | # decode_b64 |
| 266 | # ═══════════════════════════════════════════════════════════════════════════════ |
| 267 | |
class TestDecodeb64:
    """decode_b64 accepts padded, unpadded, and empty base64 strings."""

    def test_standard(self) -> None:
        from base64 import b64encode

        payload = b"hello world"
        encoded = b64encode(payload).decode()
        assert decode_b64(encoded) == payload

    def test_missing_padding(self) -> None:
        from base64 import b64encode

        # 1, 2 and 3 raw bytes encode with 2, 1 and 0 '=' pad characters.
        for raw in (b"h", b"hi", b"hel"):
            unpadded = b64encode(raw).decode().rstrip("=")
            assert decode_b64(unpadded) == raw

    def test_empty(self) -> None:
        assert decode_b64("") == b""
File History
1 commit
sha256:3ff9c9863a9891bdcde71b4a43228f66d0493e38b7cc1d09fe9eb7de774046b2
feat: add repair-commit wire endpoint (API parity with repa…
Opus 4.8
minor
⚠
1 day ago