gabriel / musehub public

test_blob_backend_unit.py file-level

at sha256:3 · View file ↗ · Intel ↗

History
1 files
1 commits
0 hotspots
0 🧊 dead
0 💥 blast risk
sha256:0 fix: fall back to any indexed mpack in read_object_bytes when push mpac… · gabriel · Jun 17, 2026
1 """Unit tests for BlobBackend (boto3 mocked — no real S3/MinIO calls).
2
3 Covers:
4 BlobBackend._key / uri_for — pure logic
5 BlobBackend.put/get/exists/delete/get_batch — boto3 client interactions
6 BlobBackend.get_batch — parallel execution
7 BlobBackend.presign_put / presign_batch — public_endpoint_url rewriting
8 read_object_bytes — cache → s3:// → None resolution
9 decode_b64 — padding variants
10 """
11 from __future__ import annotations
12
13 import time
14 from types import SimpleNamespace
15 from typing import TypedDict
16 from unittest.mock import MagicMock, patch
17
18 import pytest
19
20 from musehub.storage.backends import BlobBackend, decode_b64, read_object_bytes
21 from muse.core.types import long_id
22
23
class _S3GetResponse(TypedDict):
    """Shape of the mapping returned by the mocked ``get_object`` call."""

    # Mock standing in for the response body; tests configure its ``read()``.
    Body: MagicMock
26
27
def _mock_backend(*, head_raises: bool = False) -> BlobBackend:
    """Return a ``BlobBackend`` whose ``_client`` is a ``MagicMock``.

    Args:
        head_raises: When true, the mocked ``head_object`` raises
            ``Exception("NoSuchKey")`` each time it is called.

    Returns:
        A backend for bucket ``test-bucket`` in ``us-east-1`` with the mock
        client installed on ``_client``.
    """
    client = MagicMock()
    if head_raises:
        client.head_object.side_effect = Exception("NoSuchKey")

    backend = BlobBackend(bucket="test-bucket", region="us-east-1")
    backend._client = client
    return backend
35
36
37 # ═══════════════════════════════════════════════════════════════════════════════
38 # _key and uri_for β€” pure logic
39 # ═══════════════════════════════════════════════════════════════════════════════
40
class TestBlobBackendKeys:
    """Pure-logic checks for ``BlobBackend._key`` and ``BlobBackend.uri_for``."""

    def test_key_contains_object_id(self) -> None:
        """The generated key embeds the object id it was built from."""
        backend = BlobBackend(bucket="test-bucket", region="us-east-1")
        key = backend._key("abc123")
        assert "abc123" in key

    def test_key_preserves_colon(self) -> None:
        """An id containing ``:`` appears unchanged inside the key."""
        backend = BlobBackend(bucket="b", region="us-east-1")
        key = backend._key("sha256:deadbeef")
        assert "sha256:deadbeef" in key

    def test_uri_for_s3_scheme(self) -> None:
        """``uri_for`` yields an ``s3://`` URI rooted at the bucket name."""
        backend = BlobBackend(bucket="my-bucket", region="us-east-1")
        uri = backend.uri_for("obj1")
        assert uri.startswith("s3://my-bucket/")

    def test_uri_for_preserves_colon(self) -> None:
        """An id containing ``:`` appears unchanged inside the URI."""
        backend = BlobBackend(bucket="b", region="us-east-1")
        uri = backend.uri_for("sha256:abc")
        assert "sha256:abc" in uri
57
58
59 # ═══════════════════════════════════════════════════════════════════════════════
60 # boto3 interactions (mocked client)
61 # ═══════════════════════════════════════════════════════════════════════════════
62
class TestBlobBackendMocked:
    """Exercise ``BlobBackend`` methods against a ``MagicMock`` boto3 client."""

    async def test_put_calls_put_object_no_head_check(self) -> None:
        """``put`` calls ``put_object`` once and never calls ``head_object``."""
        b = _mock_backend()
        await b.put("obj1", b"data")
        b._client.put_object.assert_called_once()
        b._client.head_object.assert_not_called()

    async def test_put_idempotent_second_write_safe(self) -> None:
        """Writing the same object twice issues two ``put_object`` calls."""
        b = _mock_backend()
        await b.put("obj1", b"data")
        await b.put("obj1", b"data")
        assert b._client.put_object.call_count == 2

    async def test_get_returns_body_bytes(self) -> None:
        """``get`` returns the bytes read from the response ``Body``."""
        b = _mock_backend()
        mock_body = MagicMock()
        mock_body.read.return_value = b"s3 content"
        b._client.get_object.return_value = {"Body": mock_body}
        assert await b.get("obj1") == b"s3 content"

    async def test_get_returns_none_on_missing(self) -> None:
        """``get`` returns ``None`` when ``get_object`` raises."""
        b = _mock_backend()
        b._client.get_object.side_effect = Exception("NoSuchKey")
        assert await b.get("obj1") is None

    async def test_exists_true_when_head_succeeds(self) -> None:
        """``exists`` is ``True`` when ``head_object`` returns normally."""
        b = _mock_backend()
        b._client.head_object.return_value = {}
        assert await b.exists("obj1") is True

    async def test_exists_false_on_error(self) -> None:
        """``exists`` is ``False`` when ``head_object`` raises."""
        b = _mock_backend(head_raises=True)
        assert await b.exists("obj1") is False

    async def test_delete_calls_delete_object(self) -> None:
        """``delete`` calls ``delete_object`` exactly once."""
        b = _mock_backend()
        await b.delete("obj1")
        b._client.delete_object.assert_called_once()

    async def test_get_batch_returns_all_found(self) -> None:
        """``get_batch`` returns one entry per requested id when all reads succeed."""
        b = _mock_backend()

        def _get_object(Bucket: str, Key: str) -> _S3GetResponse:
            # Echo the requested key back as the body so each call is distinct.
            mock_body = MagicMock()
            mock_body.read.return_value = Key.encode()
            return {"Body": mock_body}

        b._client.get_object.side_effect = _get_object
        result = await b.get_batch(["oid-a", "oid-b", "oid-c"])
        assert set(result.keys()) == {"oid-a", "oid-b", "oid-c"}

    async def test_get_batch_omits_missing(self) -> None:
        """Ids whose ``get_object`` call raises are left out of the result."""
        b = _mock_backend()
        b._client.get_object.side_effect = Exception("NoSuchKey")
        assert await b.get_batch(["oid-1", "oid-2"]) == {}

    async def test_get_batch_runs_in_parallel(self) -> None:
        """``get_batch`` overlaps its ``get`` calls instead of awaiting them in turn."""
        import asyncio

        DELAY, N = 0.04, 6
        b = BlobBackend(bucket="test-bucket", region="us-east-1")

        async def _slow_get(oid: str, **_kw: str) -> bytes | None:
            await asyncio.sleep(DELAY)
            return oid.encode()

        # Replace ``get`` on the instance so no client is needed at all.
        b.get = _slow_get  # type: ignore[method-assign]
        start = time.perf_counter()
        result = await b.get_batch([f"oid-{i}" for i in range(N)])
        elapsed = time.perf_counter() - start

        assert len(result) == N
        # Six sequential 40 ms sleeps would take about 240 ms; the 100 ms bound
        # (2.5 x DELAY) only holds if the sleeps overlap.
        assert elapsed < DELAY * 2.5, (
            f"get_batch took {elapsed * 1000:.0f}ms — must be parallel, not sequential"
        )
137
138
139 # ═══════════════════════════════════════════════════════════════════════════════
140 # read_object_bytes β€” resolution chain
141 # ═══════════════════════════════════════════════════════════════════════════════
142
class TestReadObjectBytes:
    """Checks the order in which ``read_object_bytes`` resolves an object's bytes."""

    def _obj(self, **kwargs: str | bytes | None) -> SimpleNamespace:
        """Build a stand-in object record; keyword arguments override the defaults."""
        fields = {
            "object_id": long_id("aa" * 32),
            "content_cache": None,
            "storage_uri": "",
            **kwargs,
        }
        return SimpleNamespace(**fields)

    async def test_returns_content_cache(self) -> None:
        """Cached content is returned as-is."""
        record = self._obj(content_cache=b"cached")
        payload = await read_object_bytes(record)
        assert payload == b"cached"

    async def test_cache_beats_storage_uri(self) -> None:
        """The cache wins even when a storage URI is also present."""
        record = self._obj(content_cache=b"cached", storage_uri="s3://bucket/key")
        payload = await read_object_bytes(record)
        assert payload == b"cached"

    async def test_reads_s3_uri(self) -> None:
        """With no cache, an ``s3://`` URI is read through the patched backend."""

        async def _fake_get(oid: str, **_kw: str) -> bytes:
            return b"s3 bytes"

        fake_backend = MagicMock()
        fake_backend.get = _fake_get
        record = self._obj(storage_uri="s3://my-bucket/objects/sha256:abc123")

        with patch("musehub.storage.backends.get_backend", return_value=fake_backend):
            payload = await read_object_bytes(record)
            assert payload == b"s3 bytes"

    async def test_no_path_no_cache_returns_none(self) -> None:
        """Neither cache nor storage URI resolves to ``None``."""
        record = self._obj()
        assert await read_object_bytes(record) is None
170
171
172 # ═══════════════════════════════════════════════════════════════════════════════
173 # decode_b64
174 # ═══════════════════════════════════════════════════════════════════════════════
175
176 # ═══════════════════════════════════════════════════════════════════════════════
177 # presign_put / presign_batch β€” public_endpoint_url rewriting
178 # ═══════════════════════════════════════════════════════════════════════════════
179
class TestPresignPublicEndpoint:
    """When BlobBackend is configured with a public_endpoint_url, presigned URLs
    must use that host — not the internal endpoint host — so external clients
    (the muse CLI running on the host) can actually reach the URL.

    Real-world case: BLOB_STORAGE_ENDPOINT=http://minio:9000 (Docker-internal),
    BLOB_STORAGE_PUBLIC_ENDPOINT=http://localhost:9000 (reachable from the host machine).
    """

    def _backend_with_internal_and_public(self) -> BlobBackend:
        """Backend with internal and public endpoints; the mock client presigns with the internal host."""
        b = BlobBackend(
            bucket="test-bucket",
            region="us-east-1",
            endpoint_url="http://minio:9000",
            public_endpoint_url="http://localhost:9000",
        )
        mock_client = MagicMock()
        mock_client.generate_presigned_url.return_value = (
            "http://minio:9000/test-bucket/objects/sha256:abc?sig=x"
        )
        b._client = mock_client
        return b

    def _backend_without_public(self) -> BlobBackend:
        """Backend with only the internal endpoint configured."""
        b = BlobBackend(
            bucket="test-bucket",
            region="us-east-1",
            endpoint_url="http://minio:9000",
        )
        mock_client = MagicMock()
        mock_client.generate_presigned_url.return_value = (
            "http://minio:9000/test-bucket/objects/sha256:abc?sig=x"
        )
        b._client = mock_client
        return b

    async def test_presign_put_rewrites_host_when_public_endpoint_set(self) -> None:
        """The public host replaces the internal one in a single presigned URL."""
        b = self._backend_with_internal_and_public()
        url = await b.presign_put("sha256:abc", ttl_seconds=300)
        assert "localhost:9000" in url, f"expected localhost:9000 in presigned URL, got: {url}"
        assert "minio:9000" not in url, f"internal hostname leaked into presigned URL: {url}"

    async def test_presign_put_no_rewrite_when_public_endpoint_absent(self) -> None:
        """Without a public endpoint the internal host is left in place."""
        b = self._backend_without_public()
        url = await b.presign_put("sha256:abc", ttl_seconds=300)
        assert "minio:9000" in url, f"expected minio:9000 in URL when no public endpoint: {url}"

    async def test_presign_batch_rewrites_all_urls(self) -> None:
        """Every URL returned by ``presign_batch`` uses the public host."""
        b = self._backend_with_internal_and_public()
        b._client.generate_presigned_url.side_effect = [
            f"http://minio:9000/bucket/objects/sha256:{i}?sig=x"
            for i in ("aaa", "bbb", "ccc")
        ]
        oids = ["sha256:aaa", "sha256:bbb", "sha256:ccc"]
        result = await b.presign_batch(oids, direction="put", ttl_seconds=300)
        # Guard against a vacuous pass: the loop below asserts nothing if the
        # batch comes back empty.
        assert len(result) == len(oids), f"expected {len(oids)} presigned URLs, got: {result}"
        for oid, url in result.items():
            assert "localhost:9000" in url, f"[{oid}] expected localhost in URL: {url}"
            assert "minio:9000" not in url, f"[{oid}] internal host leaked: {url}"

    async def test_presign_batch_no_rewrite_when_public_endpoint_absent(self) -> None:
        """Without a public endpoint, batch URLs keep the internal host."""
        b = self._backend_without_public()
        b._client.generate_presigned_url.side_effect = [
            f"http://minio:9000/bucket/objects/sha256:{i}?sig=x"
            for i in ("aaa", "bbb")
        ]
        oids = ["sha256:aaa", "sha256:bbb"]
        result = await b.presign_batch(oids, direction="put", ttl_seconds=300)
        # Same guard as above: an empty mapping must not pass silently.
        assert len(result) == len(oids), f"expected {len(oids)} presigned URLs, got: {result}"
        for url in result.values():
            assert "minio:9000" in url

    async def test_presign_put_preserves_path_and_query(self) -> None:
        """Rewriting must only change the scheme+host, not the path or query string."""
        b = self._backend_with_internal_and_public()
        b._client.generate_presigned_url.return_value = (
            "http://minio:9000/test-bucket/objects/sha256:abc?X-Amz-Signature=deadbeef&X-Amz-Expires=300"
        )
        url = await b.presign_put("sha256:abc", ttl_seconds=300)
        assert "X-Amz-Signature=deadbeef" in url
        assert "X-Amz-Expires=300" in url
        assert "/test-bucket/objects/sha256:abc" in url
262
263
264 # ═══════════════════════════════════════════════════════════════════════════════
265 # decode_b64
266 # ═══════════════════════════════════════════════════════════════════════════════
267
class TestDecodeb64:
    """``decode_b64`` round-trips for padded, unpadded, and empty input."""

    def test_standard(self) -> None:
        """Correctly padded base64 decodes back to the original bytes."""
        import base64

        payload = b"hello world"
        encoded = base64.b64encode(payload).decode()
        assert decode_b64(encoded) == payload

    def test_missing_padding(self) -> None:
        """Input with its trailing ``=`` padding stripped still decodes."""
        import base64

        # 1-, 2- and 3-byte payloads normally encode with "==", "=", and no
        # padding, so stripping "=" covers each padding length.
        for payload in (b"h", b"hi", b"hel"):
            stripped = base64.b64encode(payload).decode().rstrip("=")
            assert decode_b64(stripped) == payload

    def test_empty(self) -> None:
        """The empty string decodes to empty bytes."""
        assert decode_b64("") == b""