gabriel / musehub public
test_mpack_index_job_phase4.py python
357 lines 13.2 KB
Raw
sha256:ef10830ce231e0a20efcb0e2586cb879471247e916616e6fdd0d51df459e2595 fix: typing audit — 0 violations, 0 untyped defs across all… Sonnet 4.6 minor ⚠ breaking 21 days ago
1 """TDD — Phase 4: observability timing breakdown.
2
3 process_mpack_index_job must return a result dict that a monitoring dashboard
4 or alerting rule can consume without re-parsing log lines.
5
6 Phase 4 invariants:
7 1. Return dict contains per-phase timing keys (ms float, non-negative).
8 2. Return dict contains mpack_size_bytes (int, positive).
9 3. Sum of individual phase timings ≤ elapsed_ms + measurement overhead.
10 4. The structured summary log line is emitted with all phase timings.
11
12 Phase timing keys:
13 fetch_mpack_ms — MinIO GET (one round-trip, critical path)
14 unpack_ms — msgpack.unpackb (O(mpack_size), CPU-bound)
15 snapshot_insert_ms — bulk INSERT musehub_snapshots
16 commit_insert_ms — bulk INSERT musehub_commits
17 object_puts_ms — parallel MinIO PUTs (network-bound, batched)
18 object_insert_ms — bulk INSERT / ON CONFLICT DO UPDATE musehub_objects
19 object_refs_ms — upsert musehub_object_refs
20 """
from __future__ import annotations

import datetime
import hashlib
import logging
import pathlib
from collections.abc import AsyncIterator

import msgpack
import pytest
import pytest_asyncio
from httpx import AsyncClient, ASGITransport
from sqlalchemy.ext.asyncio import AsyncSession

from musehub.auth.request_signing import MSignContext, require_signed_request, optional_signed_request
from musehub.db.database import get_db
from musehub.main import app

from muse.core.object_store import write_object
from muse.core.mpack import build_mpack
from muse.core.paths import muse_dir
from muse.core.snapshot import compute_commit_id, compute_snapshot_id
from muse.core.commits import CommitRecord, write_commit
from muse.core.refs import write_branch_ref
from muse.core.snapshots import SnapshotRecord, write_snapshot
from muse.core.types import blob_id

pytestmark = pytest.mark.skip(reason="muse wire protocol in flux")
48
49
# Signed-request context that the ``client`` fixture injects in place of both
# ``require_signed_request`` and ``optional_signed_request``: a human admin
# named "gabriel" with an all-zero dummy identity id.
_AUTH_CTX = MSignContext(
    handle="gabriel",
    is_admin=True,
    is_agent=False,
    identity_id="sha256:" + "0" * 64,
)
56
# Shape of the synthetic repository built by ``_make_repo``.
_N_FILES = 8         # files in the base manifest
_N_COMMITS = 4       # linear commits on ``main``
_FILES_CHANGED = 2   # files rewritten with a fresh blob by each commit
_BLOB_SIZE = 128     # padding bytes appended to every blob payload

# Canonical set of per-phase timing keys that Phase 4 requires in the result
# dict of ``process_mpack_index_job`` (all values in milliseconds).
_PHASE_TIMING_KEYS: frozenset[str] = frozenset(
    (
        "fetch_mpack_ms",
        "unpack_ms",
        "snapshot_insert_ms",
        "commit_insert_ms",
        "object_puts_ms",
        "object_insert_ms",
        "object_refs_ms",
    )
)
72
73
74 # ── fixtures ────────────────────────────────────────────────────────────────
75
@pytest_asyncio.fixture()
async def client(db_session: AsyncSession) -> AsyncIterator[AsyncClient]:
    """Yield an in-process HTTP client for ``app`` with DB and auth overridden.

    ``get_db`` is overridden to hand out the test's own ``db_session`` and
    both signed-request dependencies resolve to ``_AUTH_CTX``, so requests
    need no real signature. The overrides are cleared on teardown even if
    closing the client raises.
    """

    async def _override_get_db() -> AsyncIterator[AsyncSession]:
        # Yield the shared test session instead of opening a new one, so
        # routes and the test observe the same (uncommitted) state.
        yield db_session

    app.dependency_overrides[get_db] = _override_get_db
    app.dependency_overrides[require_signed_request] = lambda: _AUTH_CTX
    app.dependency_overrides[optional_signed_request] = lambda: _AUTH_CTX

    try:
        async with AsyncClient(
            transport=ASGITransport(app=app),
            base_url="https://localhost:1337",
        ) as c:
            yield c
    finally:
        # ``clear()`` drops every override on the shared ``app`` so the
        # next test starts from the real dependencies.
        app.dependency_overrides.clear()
92
93
@pytest_asyncio.fixture()
async def repo(client: AsyncClient) -> AsyncIterator[str]:
    """Create the public, uninitialised repo ``gabriel/phase4-obs-test``; yield its slug.

    The repo is deleted through the API on teardown. The delete response's
    status is not checked, so cleanup is best-effort.
    """
    resp = await client.post(
        "/api/repos",
        json={"owner": "gabriel", "name": "phase4-obs-test", "visibility": "public", "initialize": False},
    )
    assert resp.status_code in (200, 201), resp.text
    data = resp.json()
    yield data["slug"]
    await client.delete(f"/api/repos/{data['repoId']}")
104
105
def _make_repo(tmp: pathlib.Path) -> tuple[pathlib.Path, str, dict]:
    """Build a throwaway on-disk muse repo under *tmp* and pack its history.

    Layout: ``_N_FILES`` base blobs plus a linear chain of ``_N_COMMITS``
    commits on ``main``; each commit swaps ``_FILES_CHANGED`` files for new
    blobs. Timestamps are fixed (2026-01-01T00:00Z, +60 s per commit), so
    every run feeds identical inputs to the id functions.

    Returns ``(tmp, tip_commit_id, mpack)`` where *mpack* is the result of
    ``build_mpack`` for the tip with an empty ``have`` set.
    """
    tmp.mkdir(parents=True, exist_ok=True)
    meta = muse_dir(tmp)
    meta.mkdir()
    (meta / "repo.json").write_text('{"repo_id":"phase4-test","owner":"gabriel"}')
    for sub in ("commits", "snapshots", "objects"):
        (meta / sub).mkdir()
    (meta / "refs" / "heads").mkdir(parents=True)
    (meta / "HEAD").write_text("ref: refs/heads/main\n")
    (meta / "config.toml").write_text("")

    def _store(payload: bytes) -> str:
        # Derive the blob id from the payload, persist it, return the id.
        object_id = blob_id(payload)
        write_object(tmp, object_id, payload)
        return object_id

    padding_x = b"x" * _BLOB_SIZE
    padding_y = b"y" * _BLOB_SIZE
    base_manifest = {
        f"src/file_{n:04d}.py": _store(f"base-{n:04d}".encode() + padding_x)
        for n in range(_N_FILES)
    }

    parent: str | None = None
    tip = ""
    committed_at = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc)
    one_minute = datetime.timedelta(seconds=60)

    for n in range(_N_COMMITS):
        manifest = {**base_manifest}
        for k in range(_FILES_CHANGED):
            # Rotate through the files so successive commits touch different paths.
            slot = (n * _FILES_CHANGED + k) % _N_FILES
            manifest[f"src/file_{slot:04d}.py"] = _store(f"c{n:04d}-f{k}".encode() + padding_y)

        snapshot_id = compute_snapshot_id(manifest)
        write_snapshot(tmp, SnapshotRecord(snapshot_id=snapshot_id, manifest=manifest))

        message = f"commit-{n:05d}"
        commit_id = compute_commit_id(
            parent_ids=[parent] if parent else [],
            snapshot_id=snapshot_id,
            message=message,
            committed_at_iso=committed_at.isoformat(),
            author="gabriel",
        )
        write_commit(
            tmp,
            CommitRecord(
                commit_id=commit_id,
                branch="main",
                snapshot_id=snapshot_id,
                message=message,
                committed_at=committed_at,
                parent_commit_id=parent,
                parent2_commit_id=None,
                author="gabriel",
                metadata={},
                structured_delta=None,
                sem_ver_bump="none",
                breaking_changes=[],
                agent_id="",
                model_id="",
                toolchain_id="",
                prompt_hash="",
                signature="",
                signer_key_id="",
            ),
        )
        parent = tip = commit_id
        committed_at += one_minute

    write_branch_ref(tmp, "main", tip)
    return tmp, tip, build_mpack(tmp, [tip], have=[])
164
165
async def _push_mpack(client: AsyncClient, repo_slug: str, mpack: dict, head: str, db_session: AsyncSession) -> str:
    """Upload mpack to MinIO and create a mpack.index job row. Returns job_id.

    Steps:
      1. Look up the repo row by slug.
      2. msgpack-encode *mpack* and key it by the sha256 of the wire bytes.
      3. Ask the API for a presigned upload URL and PUT the bytes to it.
      4. Add a pending ``MusehubBackgroundJob`` and flush (not commit) it,
         so a caller using the same *db_session* can load the job.

    Args:
        client: in-process API client (dependencies already overridden).
        repo_slug: slug of a repo owned by ``gabriel``.
        mpack: the mapping produced by ``build_mpack``. It is msgpack-encoded
            here, and its ``"objects"`` entry sizes ``declared_objects_count``.
        head: commit id recorded as the job's ``head`` for branch ``main``.
        db_session: session the job row is added to.

    Returns:
        The id of the newly added job row.
    """
    from sqlalchemy import select as _select
    from musehub.db.musehub_repo_models import MusehubRepo as _Repo
    from musehub.core.genesis import compute_job_id as _compute_job_id
    from musehub.db.musehub_jobs_models import MusehubBackgroundJob

    repo_row = (await db_session.execute(
        _select(_Repo).where(_Repo.slug == repo_slug)
    )).scalar_one()
    repo_id = repo_row.repo_id

    wire_bytes = msgpack.packb(mpack, use_bin_type=True)
    mpack_key = "sha256:" + hashlib.sha256(wire_bytes).hexdigest()
    n_objects = len(mpack.get("objects") or [])

    pr = await client.post(
        f"/gabriel/{repo_slug}/push/mpack-presign",
        content=msgpack.packb({"mpack_key": mpack_key, "size_bytes": len(wire_bytes)}, use_bin_type=True),
        headers={"Content-Type": "application/x-msgpack"},
    )
    assert pr.status_code == 200, pr.text
    presign = pr.json()
    # Accept either snake_case or camelCase for the URL field.
    upload_url = presign.get("upload_url") or presign.get("uploadUrl")
    assert upload_url, f"presign response has no upload URL: {presign!r}"

    # Separate, transport-less client: the presigned URL points at object
    # storage, not at the ASGI app behind ``client``.
    async with AsyncClient() as raw:
        put = await raw.put(upload_url, content=wire_bytes)
    assert put.status_code in (200, 204), f"mpack upload failed: {put.status_code} {put.text}"

    now = datetime.datetime.now(tz=datetime.timezone.utc)
    job_id = _compute_job_id(repo_id, "mpack.index", now.isoformat())
    db_session.add(MusehubBackgroundJob(
        job_id=job_id,
        repo_id=repo_id,
        job_type="mpack.index",
        payload={
            "mpack_key": mpack_key,
            "pusher_id": "sha256:" + "0" * 64,
            "branch": "main",
            "head": head,
            "force": False,
            "declared_objects_count": n_objects,
        },
        status="pending",
        created_at=now,
        attempt=0,
    ))
    await db_session.flush()
    return job_id
216
217
218 # ── Phase 4 tests ───────────────────────────────────────────────────────────
219
@pytest.mark.asyncio
async def test_return_dict_has_per_phase_timing_keys(
    client: AsyncClient, repo: str, tmp_path: pathlib.Path, db_session: AsyncSession,
) -> None:
    """The job result exposes every key listed in ``_PHASE_TIMING_KEYS``.

    Per-phase keys let monitoring dashboards alert on one phase
    (e.g. 'object_puts_ms > 5000') without parsing log lines.
    """
    _repo_root, tip_commit, packed = _make_repo(tmp_path / "repo")
    job_id = await _push_mpack(client, repo, packed, tip_commit, db_session)

    from musehub.services.musehub_wire import process_mpack_index_job
    result = await process_mpack_index_job(db_session, job_id)
    await db_session.commit()

    present_keys = set(result)
    missing = _PHASE_TIMING_KEYS - present_keys
    assert not missing, (
        f"process_mpack_index_job return dict is missing phase timing keys: {missing}\n"
        f"Got keys: {sorted(result.keys())}"
    )
241
242
@pytest.mark.asyncio
async def test_all_phase_timings_are_non_negative(
    client: AsyncClient, repo: str, tmp_path: pathlib.Path, db_session: AsyncSession,
) -> None:
    """Every phase timing is a non-negative number (NaN is rejected).

    A negative timing would indicate a clock anomaly or a bug in the
    monotonic timer checkpoints. Zero is allowed (empty mpack phase).
    """
    _, head, mpack = _make_repo(tmp_path / "repo")
    job_id = await _push_mpack(client, repo, mpack, head, db_session)

    from musehub.services.musehub_wire import process_mpack_index_job
    result = await process_mpack_index_job(db_session, job_id)
    await db_session.commit()

    # Report absent keys explicitly instead of failing on a bare KeyError.
    missing = _PHASE_TIMING_KEYS - set(result)
    assert not missing, f"Result is missing phase timing keys: {sorted(missing)}"

    # bool is an int subclass; exclude it so True/False can't pass as timings.
    non_numeric = {
        k: result[k]
        for k in _PHASE_TIMING_KEYS
        if isinstance(result[k], bool) or not isinstance(result[k], (int, float))
    }
    assert not non_numeric, f"Non-numeric timing values in result: {non_numeric}"

    # ``not v >= 0`` rather than ``v < 0``: NaN compares false against
    # everything and would otherwise slip through unnoticed.
    bad = {k: result[k] for k in _PHASE_TIMING_KEYS if not result[k] >= 0}
    assert not bad, (
        f"Negative or NaN timing values in result: {bad}\n"
        "Monotonic clock should never go backwards."
    )
264
265
@pytest.mark.asyncio
async def test_mpack_size_bytes_in_return_dict(
    client: AsyncClient, repo: str, tmp_path: pathlib.Path, db_session: AsyncSession,
) -> None:
    """Return dict contains mpack_size_bytes — the raw wire size as an int.

    mpack_size_bytes lets dashboards correlate job timing against mpack size
    (e.g., MB/s throughput for object_puts, overall fetch rate). The expected
    value is the length of the same msgpack encoding ``_push_mpack`` uploads.
    """
    _, head, mpack = _make_repo(tmp_path / "repo")
    wire_bytes = msgpack.packb(mpack, use_bin_type=True)
    expected_size = len(wire_bytes)

    job_id = await _push_mpack(client, repo, mpack, head, db_session)

    from musehub.services.musehub_wire import process_mpack_index_job
    result = await process_mpack_index_job(db_session, job_id)
    await db_session.commit()

    assert "mpack_size_bytes" in result, (
        f"mpack_size_bytes missing from result dict. Got: {sorted(result.keys())}"
    )
    size = result["mpack_size_bytes"]
    # Invariant 2 requires an int; ``==`` alone would also accept 640.0 or True.
    assert isinstance(size, int) and not isinstance(size, bool), (
        f"mpack_size_bytes must be an int, got {type(size).__name__}: {size!r}"
    )
    assert size == expected_size, (
        f"mpack_size_bytes {size} != expected {expected_size}"
    )
291
292
@pytest.mark.asyncio
async def test_phase_timings_sum_within_elapsed(
    client: AsyncClient, repo: str, tmp_path: pathlib.Path, db_session: AsyncSession,
) -> None:
    """Phase timings add up to no more than ``elapsed_ms`` plus 50 ms of slack.

    A timer checkpoint in the wrong place (e.g. ``t_commits`` read before
    the commit loop instead of after) can make phases overlap and push their
    sum past the wall-clock total. The 50 ms slack absorbs timer calls,
    logging and context switches.
    """
    _repo_root, tip_commit, packed = _make_repo(tmp_path / "repo")
    job_id = await _push_mpack(client, repo, packed, tip_commit, db_session)

    from musehub.services.musehub_wire import process_mpack_index_job
    result = await process_mpack_index_job(db_session, job_id)
    await db_session.commit()

    phase_values = [result[key] for key in _PHASE_TIMING_KEYS]
    phase_sum = sum(phase_values)
    elapsed = result["elapsed_ms"]
    overhead_budget_ms = 50.0
    ceiling = elapsed + overhead_budget_ms

    assert phase_sum <= ceiling, (
        f"Sum of phase timings ({phase_sum:.1f} ms) exceeds elapsed_ms "
        f"({elapsed:.1f} ms) by more than {overhead_budget_ms} ms — "
        "a timer checkpoint is likely misplaced."
    )
319
320
@pytest.mark.asyncio
async def test_summary_log_contains_phase_breakdown(
    client: AsyncClient, repo: str, tmp_path: pathlib.Path, db_session: AsyncSession,
    caplog: pytest.LogCaptureFixture,
) -> None:
    """The structured summary log line contains all phase timing labels.

    Ops teams need to grep a single log line per job and see the full
    timing breakdown — not reconstruct it from seven separate lines.
    The expected labels are derived from ``_PHASE_TIMING_KEYS`` (minus the
    ``_ms`` suffix), so log-based alerts and the return-dict checks cannot
    drift apart.
    """
    _, head, mpack = _make_repo(tmp_path / "repo")
    job_id = await _push_mpack(client, repo, mpack, head, db_session)

    from musehub.services.musehub_wire import process_mpack_index_job
    with caplog.at_level(logging.INFO, logger="musehub.services.musehub_wire"):
        await process_mpack_index_job(db_session, job_id)
    await db_session.commit()

    # getMessage() renders each record directly instead of depending on the
    # ``message`` attribute a handler's formatter happened to set.
    index_lines = [
        msg for msg in (r.getMessage() for r in caplog.records) if "mpack.index" in msg
    ]
    # Find the summary log line (the ✅ done line)
    summary_lines = [msg for msg in index_lines if "done" in msg]
    assert summary_lines, (
        "No '✅ [mpack.index] done' summary log line found. "
        f"All mpack.index log lines: {index_lines}"
    )

    summary = summary_lines[-1]
    phase_labels = sorted(
        key[: -len("_ms")] if key.endswith("_ms") else key
        for key in _PHASE_TIMING_KEYS
    )
    missing_labels = [label for label in phase_labels if label not in summary]
    assert not missing_labels, (
        f"Summary log line missing phase labels: {missing_labels}\n"
        f"Summary: {summary!r}"
    )
File History 1 commit
sha256:ef10830ce231e0a20efcb0e2586cb879471247e916616e6fdd0d51df459e2595 fix: typing audit — 0 violations, 0 untyped defs across all… Sonnet 4.6 minor 21 days ago