gabriel / muse public
test_core_coord_bus.py python
433 lines 19.6 KB
Raw
sha256:81ae324db5ad375fbfe4834c6fcb378312cafad3cc92dec5d3e5c427306621a2 fix: remove commit_exists filter from have anchors — server… Sonnet 4.6 patch 21 days ago
1 """Tests for muse.core.coord_bus — coordination bus HTTP client.
2
3 Covers all acceptance criteria:
4
5 Unit:
6 - _build_url: owner/slug %-encoding, path traversal prevention
    - push_to_hub: request body format, MSign Authorization header, response parsing
8 - pull_from_hub: request body format, cursor, kind filter, limit
9
10 Integration (mock HTTP):
11 - Successful push returns inserted/skipped counts
12 - Successful pull returns records + cursor
13 - 401 produces CoordBusError with "Authentication failed"
14 - 404 produces CoordBusError with status_code=404
15 - Network error produces CoordBusError with status_code=0
16 - Oversized response produces CoordBusError
17
18 Security:
19 - Owner with path traversal (../etc) is %-encoded in URL
20 - Slug with special chars is %-encoded in URL
21 - Redirect refused (status_code propagated)
    - 401 response bodies (which may reflect credentials) never appear in error messages
23
24 Validation:
25 - push_to_hub with empty records raises ValueError
26 - push_to_hub with too many records raises ValueError
27 - pull_from_hub with out-of-range limit raises ValueError
28
29 Stress:
30 - 500-record push payload serializes correctly
31 - 1000-record pull response parsed correctly
    - 100 sequential push_to_hub calls with mock (under 2 seconds)
    - 100k _build_url calls (under 1 second)
33 """
34
35 from __future__ import annotations
36
37 import itertools
38 import json
39 import time
40 from contextlib import AbstractContextManager
41 from io import BytesIO
42 from unittest.mock import MagicMock, patch
43
44 import pytest
45
46 from muse.core.types import MsgpackDict, content_hash
47
# Module-wide counter consumed by _new_id(); each call advances it by one.
_id_seq = itertools.count()


def _new_id() -> str:
    """Return a record id that is unique within this test process.

    The next counter value is wrapped in a one-key dict and passed through
    ``content_hash``, so successive calls hash distinct inputs and yield ids
    in the same form ``content_hash`` produces elsewhere.
    """
    counter_value = next(_id_seq)
    return content_hash({"seq": counter_value})
53 from muse.core.coord_bus import (
54 CoordBusError,
55 MAX_PULL_LIMIT,
56 MAX_PUSH_BATCH,
57 _build_url,
58 pull_from_hub,
59 push_to_hub,
60 )
61
62
63 # ── Helpers ────────────────────────────────────────────────────────────────────
64
65
def _make_signing() -> "SigningIdentity":
    """Create a throwaway Ed25519 signing identity with handle ``testuser``.

    The imports are local to this function, so ``cryptography`` and
    ``muse.core.transport`` are only imported when an identity is requested.
    A new private key is generated on every call.
    """
    from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
    from muse.core.transport import SigningIdentity

    fresh_key = Ed25519PrivateKey.generate()
    return SigningIdentity(handle="testuser", private_key=fresh_key)
70
71
def _mock_response(body: MsgpackDict, status: int = 200) -> MagicMock:
    """Build a mock response object for a patched ``_STRICT_OPENER.open`` call.

    Args:
        body: JSON-serializable payload; ``read()`` returns it encoded as
            UTF-8 JSON bytes.
        status: HTTP status exposed as ``status``, ``code`` and via
            ``getcode()``, mirroring the attributes urllib response objects
            carry.

    Returns:
        A ``MagicMock`` usable as a context manager: ``__enter__`` returns the
        mock itself and ``__exit__`` returns ``False`` (exceptions propagate).
    """
    resp = MagicMock()
    resp.read.return_value = json.dumps(body).encode("utf-8")
    # Previously ``status`` was accepted but never applied to the mock.
    resp.status = status
    resp.code = status
    resp.getcode.return_value = status
    resp.__enter__.return_value = resp
    resp.__exit__.return_value = False
    return resp
80
81
def _patch_open(body: MsgpackDict, status: int = 200) -> AbstractContextManager[MagicMock]:
    """Return a patcher that stubs ``_STRICT_OPENER.open`` in ``muse.core.coord_bus``.

    While active, every opener call returns the same canned response built by
    ``_mock_response(body, status)``.
    """
    canned_response = _mock_response(body, status)
    target = "muse.core.coord_bus._STRICT_OPENER.open"
    return patch(target, return_value=canned_response)
88
89
def _make_record(kind: str = "reservation") -> MsgpackDict:
    """Build a minimal coordination record carrying a fresh ``record_id``.

    Only *kind* varies between calls' inputs; ``run_id`` and ``payload`` are
    fixed placeholder values.
    """
    return dict(
        kind=kind,
        record_id=_new_id(),
        run_id="agent-1",
        payload={"note": "test"},
    )
97
98
99 # ── Unit: _build_url ───────────────────────────────────────────────────────────
100
101
class TestBuildUrl:
    """Unit tests for ``_build_url`` URL assembly and %-encoding of components."""

    def test_basic_url(self) -> None:
        url = _build_url("https://localhost:1337", "gabriel", "myrepo", "coord/push")
        assert url == "https://localhost:1337/gabriel/myrepo/coord/push"

    def test_trailing_slash_stripped_from_hub(self) -> None:
        url = _build_url("https://localhost:1337/", "gabriel", "myrepo", "coord/push")
        assert url == "https://localhost:1337/gabriel/myrepo/coord/push"

    def test_owner_percent_encoded(self) -> None:
        url = _build_url("https://localhost:1337", "../traversal", "myrepo", "coord/push")
        assert "../traversal" not in url
        assert "%2F" in url or "%2E%2E" in url or "..%2Ftraversal" in url

    def test_slug_percent_encoded(self) -> None:
        url = _build_url("https://localhost:1337", "gabriel", "my repo", "coord/push")
        assert " " not in url
        assert "my%20repo" in url

    def test_path_traversal_in_owner_encoded(self) -> None:
        url = _build_url("http://hub", "../../etc", "passwd", "coord/push")
        # Slashes inside the owner component MUST be %-encoded so the owner
        # cannot span multiple path segments (RFC 3986: %2F is data, not a
        # path separator).
        assert "%2F" in url
        assert "..%2F" in url or "%2F.." in url
        # Drop scheme and host; the remaining path must split into exactly
        # owner / slug / "coord" / "push". An unencoded owner would have
        # produced extra ".." / "etc" segments here.
        after_scheme = url.split("://", 1)[1]
        path = after_scheme.split("/", 1)[1]
        _owner_segment, *rest = path.split("/")
        assert rest == ["passwd", "coord", "push"]

    def test_special_chars_in_slug(self) -> None:
        url = _build_url("http://hub", "gabriel", "my@repo", "coord/pull")
        assert "my%40repo" in url

    def test_push_and_pull_endpoints(self) -> None:
        push_url = _build_url("http://hub", "u", "r", "coord/push")
        pull_url = _build_url("http://hub", "u", "r", "coord/pull")
        assert push_url.endswith("coord/push")
        assert pull_url.endswith("coord/pull")
144
145
146 # ── Unit: push_to_hub ─────────────────────────────────────────────────────────
147
148
class TestPushToHub:
    """``push_to_hub``: input validation, request shape, and error mapping."""

    @staticmethod
    def _sent_request(records: list[MsgpackDict], signing: "SigningIdentity | None"):
        """Push *records* through a stubbed opener; return the request object it received."""
        with patch("muse.core.coord_bus._STRICT_OPENER.open") as opener:
            opener.return_value = _mock_response({"inserted": 1, "skipped": 0})
            push_to_hub("http://hub", "gabriel", "repo", records, signing=signing)
        return opener.call_args.args[0]

    @staticmethod
    def _failure_from(error: Exception) -> CoordBusError:
        """Push one record while the opener raises *error*; return the resulting CoordBusError."""
        batch = [_make_record()]
        with patch("muse.core.coord_bus._STRICT_OPENER.open", side_effect=error):
            with pytest.raises(CoordBusError) as caught:
                push_to_hub("http://hub", "gabriel", "repo", batch, signing=_make_signing())
        return caught.value

    def test_push_returns_inserted_skipped(self) -> None:
        batch = [_make_record()]
        with _patch_open({"inserted": 1, "skipped": 0}):
            outcome = push_to_hub("http://hub", "gabriel", "repo", batch, signing=_make_signing())
        assert outcome["inserted"] == 1
        assert outcome["skipped"] == 0

    def test_push_empty_records_raises(self) -> None:
        with pytest.raises(ValueError, match="non-empty"):
            push_to_hub("http://hub", "gabriel", "repo", [], signing=_make_signing())

    def test_push_too_many_records_raises(self) -> None:
        oversized_batch = [_make_record() for _ in range(MAX_PUSH_BATCH + 1)]
        with pytest.raises(ValueError, match="maximum batch size"):
            push_to_hub("http://hub", "gabriel", "repo", oversized_batch, signing=_make_signing())

    def test_push_sends_authorization_header(self) -> None:
        request = self._sent_request([_make_record()], _make_signing())
        assert request.get_header("Authorization").startswith("MSign ")

    def test_push_sends_correct_content_type(self) -> None:
        request = self._sent_request([_make_record()], _make_signing())
        assert "application/json" in request.get_header("Content-type")

    def test_push_request_body_structure(self) -> None:
        request = self._sent_request([_make_record()], _make_signing())
        body = json.loads(request.data)
        assert "records" in body
        assert len(body["records"]) == 1

    def test_push_401_raises_with_auth_message(self) -> None:
        import urllib.error

        failure = self._failure_from(
            urllib.error.HTTPError(
                "http://hub", 401, "Unauthorized", {}, BytesIO(b"token leaked here")
            )
        )
        message = str(failure)
        # The 401 body could reflect credentials, so it must be masked.
        assert "token leaked here" not in message
        assert "Authentication failed" in message
        assert failure.status_code == 401

    def test_push_404_raises_with_status_code(self) -> None:
        import urllib.error

        failure = self._failure_from(
            urllib.error.HTTPError("http://hub", 404, "Not Found", {}, BytesIO(b"not found"))
        )
        assert failure.status_code == 404

    def test_push_network_error_raises(self) -> None:
        import urllib.error

        failure = self._failure_from(urllib.error.URLError("Connection refused"))
        assert failure.status_code == 0

    def test_push_oversized_response_raises(self) -> None:
        from muse.core.coord_bus import MAX_COORD_RESPONSE_BYTES

        too_big = MagicMock()
        too_big.read.return_value = b"x" * (MAX_COORD_RESPONSE_BYTES + 2)
        too_big.__enter__ = lambda s: s
        too_big.__exit__ = MagicMock(return_value=False)

        batch = [_make_record()]
        with patch("muse.core.coord_bus._STRICT_OPENER.open", return_value=too_big):
            with pytest.raises(CoordBusError, match="exceeded"):
                push_to_hub("http://hub", "gabriel", "repo", batch, signing=_make_signing())

    def test_push_max_batch_exactly_accepted(self) -> None:
        full_batch = [_make_record() for _ in range(MAX_PUSH_BATCH)]
        with _patch_open({"inserted": MAX_PUSH_BATCH, "skipped": 0}):
            outcome = push_to_hub("http://hub", "gabriel", "repo", full_batch, signing=_make_signing())
        assert outcome["inserted"] == MAX_PUSH_BATCH

    def test_push_no_token_does_not_send_auth_header(self) -> None:
        request = self._sent_request([_make_record()], None)
        assert request.get_header("Authorization") is None
252
253
254 # ── Unit: pull_from_hub ───────────────────────────────────────────────────────
255
256
class TestPullFromHub:
    """``pull_from_hub``: request body, limit validation, and error mapping."""

    @staticmethod
    def _sent_body(**pull_kwargs) -> MsgpackDict:
        """Pull against a stubbed empty page; return the decoded JSON body that was sent."""
        with patch("muse.core.coord_bus._STRICT_OPENER.open") as opener:
            opener.return_value = _mock_response({"records": [], "cursor": 0})
            pull_from_hub("http://hub", "gabriel", "repo", **pull_kwargs)
        return json.loads(opener.call_args.args[0].data)

    @staticmethod
    def _failure_from(error: Exception) -> CoordBusError:
        """Pull while the opener raises *error*; return the resulting CoordBusError."""
        with patch("muse.core.coord_bus._STRICT_OPENER.open", side_effect=error):
            with pytest.raises(CoordBusError) as caught:
                pull_from_hub("http://hub", "gabriel", "repo", signing=_make_signing())
        return caught.value

    def test_pull_returns_records_and_cursor(self) -> None:
        page = [{"id": 1, "kind": "reservation", "record_id": _new_id(), "payload": {}}]
        with _patch_open({"records": page, "cursor": 1}):
            outcome = pull_from_hub("http://hub", "gabriel", "repo", signing=_make_signing())
        assert len(outcome["records"]) == 1
        assert outcome["cursor"] == 1

    def test_pull_out_of_range_limit_raises(self) -> None:
        for bad_limit in (0, MAX_PULL_LIMIT + 1):
            with pytest.raises(ValueError, match="limit must be"):
                pull_from_hub("http://hub", "gabriel", "repo", limit=bad_limit)

    def test_pull_sends_correct_body(self) -> None:
        body = self._sent_body(
            since_id=42,
            kinds=["reservation"],
            limit=100,
            signing=_make_signing(),
        )
        assert body["since_id"] == 42
        assert "reservation" in body["kinds"]
        assert body["limit"] == 100

    def test_pull_empty_kinds_sends_empty_list(self) -> None:
        body = self._sent_body(signing=_make_signing())
        assert body["kinds"] == []

    def test_pull_401_raises_with_masked_body(self) -> None:
        import urllib.error

        failure = self._failure_from(
            urllib.error.HTTPError(
                "http://hub", 401, "Unauthorized", {}, BytesIO(b"secret_token_here")
            )
        )
        assert "secret_token_here" not in str(failure)
        assert failure.status_code == 401

    def test_pull_network_error(self) -> None:
        import urllib.error

        failure = self._failure_from(urllib.error.URLError("Name resolution failure"))
        assert failure.status_code == 0

    def test_pull_default_since_id_is_zero(self) -> None:
        body = self._sent_body()
        assert body["since_id"] == 0

    def test_pull_exact_max_limit_accepted(self) -> None:
        with _patch_open({"records": [], "cursor": 0}):
            outcome = pull_from_hub("http://hub", "gabriel", "repo", limit=MAX_PULL_LIMIT)
        assert outcome["cursor"] == 0
332
333
334 # ── Security tests ─────────────────────────────────────────────────────────────
335
336
class TestCoordBusSecurity:
    """Security properties of the coordination-bus HTTP client."""

    def test_redirect_refused(self) -> None:
        """_NoRedirectHandler refuses every standard HTTP redirect status."""
        import urllib.error
        from muse.core.coord_bus import _NoRedirectHandler

        handler = _NoRedirectHandler()
        req_mock = MagicMock()
        req_mock.full_url = "http://hub/push"

        # These are the statuses urllib's HTTPRedirectHandler routes through
        # redirect_request; each one must be refused, not just 301.
        redirect_statuses = (
            (301, "Moved Permanently"),
            (302, "Found"),
            (303, "See Other"),
            (307, "Temporary Redirect"),
            (308, "Permanent Redirect"),
        )
        for code, reason in redirect_statuses:
            with pytest.raises(urllib.error.HTTPError) as exc_info:
                handler.redirect_request(
                    req_mock, MagicMock(), code, reason,
                    MagicMock(), "http://other-host/malicious",
                )
            assert "Redirect refused" in str(exc_info.value.msg), f"status {code}"

    def test_401_body_never_exposed(self) -> None:
        """401 response body must not appear in CoordBusError message."""
        import urllib.error

        sensitive = "SENSITIVE_CREDENTIAL_DATA"
        err = urllib.error.HTTPError("http://hub", 401, "Unauth", {}, BytesIO(sensitive.encode()))
        records = [_make_record()]
        with patch("muse.core.coord_bus._STRICT_OPENER.open", side_effect=err):
            with pytest.raises(CoordBusError) as exc_info:
                push_to_hub("http://hub", "gabriel", "repo", records, signing=_make_signing())
        # Check for the exact string placed in the response body; checking any
        # other literal would make this assertion impossible to fail.
        assert sensitive not in str(exc_info.value)
        assert exc_info.value.status_code == 401

    def test_signing_not_in_url(self) -> None:
        """Signing identity must appear only in Authorization header, never in the URL."""
        records = [_make_record()]
        with patch("muse.core.coord_bus._STRICT_OPENER.open") as mock_open:
            mock_open.return_value = _mock_response({"inserted": 1, "skipped": 0})
            si = _make_signing()
            push_to_hub("http://hub", "gabriel", "repo", records, signing=si)
            req = mock_open.call_args[0][0]
        assert "testuser" not in req.full_url

    def test_invalid_json_response_raises(self) -> None:
        """Non-JSON response body raises CoordBusError, not unhandled exception."""
        resp = MagicMock()
        resp.read.return_value = b"not json at all %%"
        resp.__enter__ = lambda s: s
        resp.__exit__ = MagicMock(return_value=False)
        records = [_make_record()]
        with patch("muse.core.coord_bus._STRICT_OPENER.open", return_value=resp):
            with pytest.raises(CoordBusError, match="Invalid JSON"):
                push_to_hub("http://hub", "gabriel", "repo", records, signing=_make_signing())
387
388
389 # ── Stress tests ───────────────────────────────────────────────────────────────
390
391
class TestCoordBusStress:
    """Coarse timing guards for the client with HTTP mocked out.

    Fixture construction (records, Ed25519 keys, response mocks, patching)
    happens before the clock starts, so each threshold measures only the
    client call(s) under test. ``time.perf_counter`` is the stdlib clock
    intended for timing short intervals.
    """

    def test_push_500_records_serializes_fast(self) -> None:
        """500-record batch push serializes and deserializes in < 1 second."""
        records = [_make_record() for _ in range(500)]
        signing = _make_signing()
        with _patch_open({"inserted": 500, "skipped": 0}):
            t0 = time.perf_counter()
            result = push_to_hub("http://hub", "gabriel", "repo", records, signing=signing)
            elapsed = time.perf_counter() - t0
        assert result["inserted"] == 500
        assert elapsed < 1.0, f"500-record push took {elapsed:.2f}s"

    def test_pull_1000_records_parsed_fast(self) -> None:
        """Parsing a 1000-record pull response completes in < 1 second."""
        records = [
            {"id": i, "kind": "reservation", "record_id": _new_id(), "payload": {}}
            for i in range(1, 1001)
        ]
        with _patch_open({"records": records, "cursor": 1000}):
            t0 = time.perf_counter()
            result = pull_from_hub("http://hub", "gabriel", "repo", limit=1000)
            elapsed = time.perf_counter() - t0
        assert len(result["records"]) == 1000
        assert elapsed < 1.0, f"1000-record pull took {elapsed:.2f}s"

    def test_100_sequential_push_calls_with_mock(self) -> None:
        """100 sequential push_to_hub calls complete in < 2 seconds with mocked HTTP."""
        records = [_make_record()]
        # One identity and one patch for the whole loop: key generation and
        # patch setup are fixture cost, not client cost.
        signing = _make_signing()
        with _patch_open({"inserted": 1, "skipped": 0}):
            t0 = time.perf_counter()
            for _ in range(100):
                push_to_hub("http://hub", "gabriel", "repo", records, signing=signing)
            elapsed = time.perf_counter() - t0
        assert elapsed < 2.0, f"100 sequential pushes took {elapsed:.2f}s"

    def test_build_url_100k_calls_fast(self) -> None:
        """_build_url is called frequently; 100k calls must complete in < 1s."""
        t0 = time.perf_counter()
        for _ in range(100_000):
            _build_url("https://localhost:1337", "gabriel", "myrepo", "coord/push")
        elapsed = time.perf_counter() - t0
        assert elapsed < 1.0, f"100k _build_url calls took {elapsed:.2f}s"
File History 4 commits
sha256:81ae324db5ad375fbfe4834c6fcb378312cafad3cc92dec5d3e5c427306621a2 fix: remove commit_exists filter from have anchors — server… Sonnet 4.6 patch 21 days ago
sha256:36c3cb3e76619d4c30a6d9bf81b5ec4ff148e30dcfed913e3114ca7b43b81c7e fix: rename objects→blobs in push client and all stale test… Sonnet 4.6 patch 22 days ago
sha256:c06a9b9b9fee26c68ea725b44d54b2c0a171301ce9de746d5b656617b4463a9a fix: repair four test failures from post-migration audit Sonnet 4.6 patch 28 days ago
sha256:1900655993c83c4107067375548a7be823e471d2515830842f1a12cba4bd3cdf fix: unified object store migration — idempotent writes, JS… Sonnet 4.6 minor 29 days ago