test_mpack_index_job_phase4.py
python
sha256:ef10830ce231e0a20efcb0e2586cb879471247e916616e6fdd0d51df459e2595
fix: typing audit — 0 violations, 0 untyped defs across all…
Sonnet 4.6
minor
⚠ breaking
21 days ago
| 1 | """TDD — Phase 4: observability timing breakdown. |
| 2 | |
| 3 | process_mpack_index_job must return a result dict that a monitoring dashboard |
| 4 | or alerting rule can consume without re-parsing log lines. |
| 5 | |
| 6 | Phase 4 invariants: |
| 7 | 1. Return dict contains per-phase timing keys (ms float, non-negative). |
| 8 | 2. Return dict contains mpack_size_bytes (int, positive). |
| 9 | 3. Sum of individual phase timings ≤ elapsed_ms + measurement overhead. |
| 10 | 4. The structured summary log line is emitted with all phase timings. |
| 11 | |
| 12 | Phase timing keys: |
| 13 | fetch_mpack_ms — MinIO GET (one round-trip, critical path) |
| 14 | unpack_ms — msgpack.unpackb (O(mpack_size), CPU-bound) |
| 15 | snapshot_insert_ms — bulk INSERT musehub_snapshots |
| 16 | commit_insert_ms — bulk INSERT musehub_commits |
| 17 | object_puts_ms — parallel MinIO PUTs (network-bound, batched) |
| 18 | object_insert_ms — bulk INSERT / ON CONFLICT DO UPDATE musehub_objects |
| 19 | object_refs_ms — upsert musehub_object_refs |
| 20 | """ |
from __future__ import annotations

import datetime
import hashlib
import logging
import pathlib
from collections.abc import AsyncIterator

import msgpack
import pytest
import pytest_asyncio
from httpx import AsyncClient, ASGITransport
from sqlalchemy.ext.asyncio import AsyncSession

from musehub.auth.request_signing import MSignContext, require_signed_request, optional_signed_request
from musehub.db.database import get_db
from musehub.main import app

from muse.core.object_store import write_object
from muse.core.mpack import build_mpack
from muse.core.paths import muse_dir
from muse.core.snapshot import compute_commit_id, compute_snapshot_id
from muse.core.commits import CommitRecord, write_commit
from muse.core.refs import write_branch_ref
from muse.core.snapshots import SnapshotRecord, write_snapshot
from muse.core.types import blob_id

pytestmark = pytest.mark.skip(reason="muse wire protocol in flux")
| 48 | |
| 49 | |
# Fixed caller identity handed out by the overridden require_signed_request /
# optional_signed_request dependencies in the ``client`` fixture: handle
# "gabriel", an all-zero sha256 identity id, a human (not an agent) admin.
_AUTH_CTX = MSignContext(
    is_admin=True,
    is_agent=False,
    identity_id=f"sha256:{'0' * 64}",
    handle="gabriel",
)
| 56 | |
| 57 | _N_FILES = 8 |
| 58 | _N_COMMITS = 4 |
| 59 | _FILES_CHANGED = 2 |
| 60 | _BLOB_SIZE = 128 |
| 61 | |
| 62 | # Canonical set of per-phase timing keys Phase 4 requires. |
| 63 | _PHASE_TIMING_KEYS: frozenset[str] = frozenset({ |
| 64 | "fetch_mpack_ms", |
| 65 | "unpack_ms", |
| 66 | "snapshot_insert_ms", |
| 67 | "commit_insert_ms", |
| 68 | "object_puts_ms", |
| 69 | "object_insert_ms", |
| 70 | "object_refs_ms", |
| 71 | }) |
| 72 | |
| 73 | |
| 74 | # ── fixtures ──────────────────────────────────────────────────────────────── |
| 75 | |
@pytest_asyncio.fixture()
async def client(db_session: AsyncSession) -> AsyncIterator[AsyncClient]:
    """Yield an in-process HTTP client wired to the test's DB session.

    ``get_db`` is overridden to yield the test's ``db_session``, so requests
    routed through this client and direct ORM calls made by the test share
    that session. Both signed-request dependencies are overridden to return
    ``_AUTH_CTX``, so no real request signing is performed.

    On teardown the app's dependency overrides are restored to whatever they
    were before this fixture ran, rather than wiped wholesale.
    """
    async def _override_get_db() -> AsyncIterator[AsyncSession]:
        yield db_session

    # Snapshot first: a blanket clear() on teardown would also drop overrides
    # that other fixtures may have installed on the shared ``app``.
    saved_overrides = dict(app.dependency_overrides)
    app.dependency_overrides[get_db] = _override_get_db
    app.dependency_overrides[require_signed_request] = lambda: _AUTH_CTX
    app.dependency_overrides[optional_signed_request] = lambda: _AUTH_CTX

    try:
        async with AsyncClient(
            transport=ASGITransport(app=app),
            base_url="https://localhost:1337",
        ) as c:
            yield c
    finally:
        # Restore even if closing the client raises.
        app.dependency_overrides.clear()
        app.dependency_overrides.update(saved_overrides)
| 92 | |
| 93 | |
@pytest_asyncio.fixture()
async def repo(client: AsyncClient) -> AsyncIterator[str]:
    """Create the ``gabriel/phase4-obs-test`` repo via the API and yield its slug.

    The repo is created public with ``"initialize": False`` (presumably no
    initial commit — the server-side meaning is not visible here). On
    teardown it is deleted by ``repoId``; the delete response is not
    checked, so cleanup is best effort.
    """
    resp = await client.post(
        "/api/repos",
        json={"owner": "gabriel", "name": "phase4-obs-test", "visibility": "public", "initialize": False},
    )
    assert resp.status_code in (200, 201), resp.text
    data = resp.json()
    try:
        yield data["slug"]
    finally:
        # Also runs if the fixture generator is closed instead of resumed.
        await client.delete(f"/api/repos/{data['repoId']}")
| 104 | |
| 105 | |
def _make_repo(tmp: pathlib.Path) -> tuple[pathlib.Path, str, dict]:
    """Build a throwaway on-disk muse repo under *tmp* and pack its history.

    Layout, inside the directory returned by ``muse_dir(tmp)``:
      * ``repo.json`` with a fixed repo_id/owner,
      * empty ``commits/``, ``snapshots/``, ``objects/`` and ``refs/heads/``,
      * ``HEAD`` pointing at ``refs/heads/main`` and an empty ``config.toml``.

    Content: ``_N_FILES`` base blobs, then a linear chain of ``_N_COMMITS``
    commits on ``main``. Each commit starts from the *base* manifest (not
    the previous commit's) and swaps ``_FILES_CHANGED`` entries for fresh
    blobs. Commit timestamps start at 2026-01-01T00:00Z (UTC) and advance by
    60 s per commit.

    Returns:
        ``(tmp, tip_commit_id, mpack)`` where ``mpack`` is the value of
        ``build_mpack(tmp, [tip], have=[])``.
    """
    tmp.mkdir(parents=True, exist_ok=True)
    meta = muse_dir(tmp)
    meta.mkdir()
    (meta / "repo.json").write_text('{"repo_id":"phase4-test","owner":"gabriel"}')
    for sub in ("commits", "snapshots", "objects"):
        (meta / sub).mkdir()
    heads = meta / "refs" / "heads"
    heads.mkdir(parents=True)
    (meta / "HEAD").write_text("ref: refs/heads/main\n")
    (meta / "config.toml").write_text("")

    def _store(payload: bytes) -> str:
        # Content-address the payload, persist it, and hand back its id.
        object_id = blob_id(payload)
        write_object(tmp, object_id, payload)
        return object_id

    base_manifest: dict[str, str] = {}
    for n in range(_N_FILES):
        base_manifest[f"src/file_{n:04d}.py"] = _store(
            f"base-{n:04d}".encode() + b"x" * _BLOB_SIZE
        )

    parent_id: str | None = None
    tip = ""
    stamp = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc)
    one_minute = datetime.timedelta(seconds=60)

    for n in range(_N_COMMITS):
        manifest = dict(base_manifest)
        for k in range(_FILES_CHANGED):
            slot = (n * _FILES_CHANGED + k) % _N_FILES
            manifest[f"src/file_{slot:04d}.py"] = _store(
                f"c{n:04d}-f{k}".encode() + b"y" * _BLOB_SIZE
            )

        snapshot_id = compute_snapshot_id(manifest)
        write_snapshot(tmp, SnapshotRecord(snapshot_id=snapshot_id, manifest=manifest))

        message = f"commit-{n:05d}"
        commit_id = compute_commit_id(
            parent_ids=[parent_id] if parent_id else [],
            snapshot_id=snapshot_id,
            message=message,
            committed_at_iso=stamp.isoformat(),
            author="gabriel",
        )
        write_commit(
            tmp,
            CommitRecord(
                commit_id=commit_id,
                branch="main",
                snapshot_id=snapshot_id,
                message=message,
                committed_at=stamp,
                parent_commit_id=parent_id,
                parent2_commit_id=None,
                author="gabriel",
                metadata={},
                structured_delta=None,
                sem_ver_bump="none",
                breaking_changes=[],
                agent_id="",
                model_id="",
                toolchain_id="",
                prompt_hash="",
                signature="",
                signer_key_id="",
            ),
        )
        parent_id = tip = commit_id
        stamp += one_minute

    write_branch_ref(tmp, "main", tip)
    return tmp, tip, build_mpack(tmp, [tip], have=[])
| 164 | |
| 165 | |
async def _push_mpack(client: AsyncClient, repo_slug: str, mpack: dict, head: str, db_session: AsyncSession) -> str:
    """Upload *mpack* to MinIO and create an ``mpack.index`` job row.

    Steps:
      1. Look up the repo row by slug to get its ``repo_id``.
      2. msgpack-encode *mpack*; its key is ``"sha256:" + sha256(wire bytes)``.
      3. Request a presigned upload URL from
         ``POST /gabriel/{repo_slug}/push/mpack-presign`` and PUT the wire
         bytes to it.
      4. Add a ``pending`` ``MusehubBackgroundJob`` whose payload references
         the uploaded key, then flush (the caller commits).

    Args:
        client: ASGI test client (auth dependencies already overridden).
        repo_slug: Slug of the target repo.
        mpack: The mpack mapping produced by ``build_mpack`` (it is encoded
            here, so this must be the mapping, not pre-encoded bytes).
        head: Tip commit id, recorded in the job payload for branch ``main``.
        db_session: Session the job row is added to.

    Returns:
        The id of the newly created job.
    """
    from sqlalchemy import select as _select
    from musehub.db.musehub_repo_models import MusehubRepo as _Repo
    from musehub.core.genesis import compute_job_id as _compute_job_id
    from musehub.db.musehub_jobs_models import MusehubBackgroundJob

    repo_row = (await db_session.execute(
        _select(_Repo).where(_Repo.slug == repo_slug)
    )).scalar_one()
    repo_id = repo_row.repo_id

    wire_bytes = msgpack.packb(mpack, use_bin_type=True)
    mpack_key = "sha256:" + hashlib.sha256(wire_bytes).hexdigest()
    n_objects = len(mpack.get("objects") or [])

    pr = await client.post(
        f"/gabriel/{repo_slug}/push/mpack-presign",
        content=msgpack.packb({"mpack_key": mpack_key, "size_bytes": len(wire_bytes)}, use_bin_type=True),
        headers={"Content-Type": "application/x-msgpack"},
    )
    assert pr.status_code == 200, pr.text
    presign = pr.json()
    # Response casing is not pinned by this test; accept snake_case or camelCase.
    upload_url = presign.get("upload_url") or presign.get("uploadUrl")
    assert upload_url, f"presign response has no upload URL: {presign!r}"

    # A plain network client, not the ASGI one: the presigned URL targets
    # object storage rather than the app under test.
    async with AsyncClient() as raw:
        put = await raw.put(upload_url, content=wire_bytes)
    assert put.status_code in (200, 204), (
        f"mpack upload failed: {put.status_code} {put.text}"
    )

    now = datetime.datetime.now(tz=datetime.timezone.utc)
    job_id = _compute_job_id(repo_id, "mpack.index", now.isoformat())
    db_session.add(MusehubBackgroundJob(
        job_id=job_id,
        repo_id=repo_id,
        job_type="mpack.index",
        payload={
            "mpack_key": mpack_key,
            # Same identity the overridden auth dependencies report.
            "pusher_id": _AUTH_CTX.identity_id,
            "branch": "main",
            "head": head,
            "force": False,
            "declared_objects_count": n_objects,
        },
        status="pending",
        created_at=now,
        attempt=0,
    ))
    await db_session.flush()
    return job_id
| 216 | |
| 217 | |
| 218 | # ── Phase 4 tests ─────────────────────────────────────────────────────────── |
| 219 | |
@pytest.mark.asyncio
async def test_return_dict_has_per_phase_timing_keys(
    client: AsyncClient, repo: str, tmp_path: pathlib.Path, db_session: AsyncSession,
) -> None:
    """Phase 4 invariant 1 (presence): every per-phase timing key is returned.

    With these keys in the job result, a dashboard can alert on one phase
    (for instance ``object_puts_ms > 5000``) directly, instead of scraping
    log lines.
    """
    from musehub.services.musehub_wire import process_mpack_index_job

    _repo_dir, head, mpack = _make_repo(tmp_path / "repo")
    job_id = await _push_mpack(client, repo, mpack, head, db_session)

    result = await process_mpack_index_job(db_session, job_id)
    await db_session.commit()

    missing = _PHASE_TIMING_KEYS.difference(result)
    assert not missing, (
        f"process_mpack_index_job return dict is missing phase timing keys: {missing}\n"
        f"Got keys: {sorted(result.keys())}"
    )
| 241 | |
| 242 | |
@pytest.mark.asyncio
async def test_all_phase_timings_are_non_negative(
    client: AsyncClient, repo: str, tmp_path: pathlib.Path, db_session: AsyncSession,
) -> None:
    """Every timing value is a finite, non-negative number.

    A negative timing would indicate a clock anomaly or a bug in the
    monotonic timer checkpoints. Zero is allowed (empty mpack phase).
    NaN and infinity are rejected explicitly: ``nan < 0`` is False, so a
    bare ``< 0`` comparison would let a NaN timing pass. A missing or
    non-numeric value is reported here too, instead of surfacing as a bare
    ``KeyError``/``TypeError``.
    """
    import math

    _, head, mpack = _make_repo(tmp_path / "repo")
    job_id = await _push_mpack(client, repo, mpack, head, db_session)

    from musehub.services.musehub_wire import process_mpack_index_job
    result = await process_mpack_index_job(db_session, job_id)
    await db_session.commit()

    def _is_valid_timing(value: object) -> bool:
        # bool is an int subclass; a True/False "timing" is a bug, not 1/0 ms.
        if isinstance(value, bool) or not isinstance(value, (int, float)):
            return False
        return math.isfinite(value) and value >= 0

    bad = {
        k: result.get(k)
        for k in sorted(_PHASE_TIMING_KEYS)
        if not _is_valid_timing(result.get(k))
    }
    assert not bad, (
        f"Invalid timing values in result (must be finite numbers >= 0): {bad}\n"
        "Monotonic clock should never go backwards."
    )
| 264 | |
| 265 | |
@pytest.mark.asyncio
async def test_mpack_size_bytes_in_return_dict(
    client: AsyncClient, repo: str, tmp_path: pathlib.Path, db_session: AsyncSession,
) -> None:
    """Return dict contains mpack_size_bytes — the raw wire size in bytes, as an int.

    mpack_size_bytes lets dashboards correlate job timing against mpack size
    (e.g., MB/s throughput for object_puts, overall fetch rate). The expected
    value is the length of the same ``msgpack.packb(mpack, use_bin_type=True)``
    encoding that ``_push_mpack`` uploads. The type is asserted separately
    because ``128.0 == 128`` would otherwise let a float pass the equality check.
    """
    _, head, mpack = _make_repo(tmp_path / "repo")
    wire_bytes = msgpack.packb(mpack, use_bin_type=True)
    expected_size = len(wire_bytes)

    job_id = await _push_mpack(client, repo, mpack, head, db_session)

    from musehub.services.musehub_wire import process_mpack_index_job
    result = await process_mpack_index_job(db_session, job_id)
    await db_session.commit()

    assert "mpack_size_bytes" in result, (
        f"mpack_size_bytes missing from result dict. Got: {sorted(result.keys())}"
    )
    size = result["mpack_size_bytes"]
    # bool is an int subclass; exclude it so True cannot pose as a size of 1.
    assert isinstance(size, int) and not isinstance(size, bool), (
        f"mpack_size_bytes must be an int, got {type(size).__name__}: {size!r}"
    )
    assert size == expected_size, (
        f"mpack_size_bytes {size} != expected {expected_size}"
    )
| 291 | |
| 292 | |
@pytest.mark.asyncio
async def test_phase_timings_sum_within_elapsed(
    client: AsyncClient, repo: str, tmp_path: pathlib.Path, db_session: AsyncSession,
) -> None:
    """Phase 4 invariant 3: the phases cannot add up to more than the whole job.

    The per-phase timings, summed, may exceed ``elapsed_ms`` by at most a
    50 ms allowance for timer calls, logging and context switches. A larger
    overshoot points at a misplaced timer checkpoint (e.g. ``t_commits``
    taken before the commit loop instead of after it).
    """
    _repo_dir, head, mpack = _make_repo(tmp_path / "repo")
    job_id = await _push_mpack(client, repo, mpack, head, db_session)

    from musehub.services.musehub_wire import process_mpack_index_job
    result = await process_mpack_index_job(db_session, job_id)
    await db_session.commit()

    total_phase_ms = 0
    for key in _PHASE_TIMING_KEYS:
        total_phase_ms += result[key]
    elapsed_ms = result["elapsed_ms"]
    allowance_ms = 50.0

    within_budget = total_phase_ms <= elapsed_ms + allowance_ms
    assert within_budget, (
        f"Sum of phase timings ({total_phase_ms:.1f} ms) exceeds elapsed_ms "
        f"({elapsed_ms:.1f} ms) by more than {allowance_ms} ms — "
        "a timer checkpoint is likely misplaced."
    )
| 319 | |
| 320 | |
@pytest.mark.asyncio
async def test_summary_log_contains_phase_breakdown(
    client: AsyncClient, repo: str, tmp_path: pathlib.Path, db_session: AsyncSession,
    caplog: pytest.LogCaptureFixture,
) -> None:
    """The structured summary log line contains all phase timing labels.

    Ops teams need to grep a single log line per job and see the full
    timing breakdown — not reconstruct it from seven separate lines.
    The summary log must contain every phase key that appears in the
    return dict so log-based alerts and dashboards stay in sync. The
    expected labels are therefore derived from ``_PHASE_TIMING_KEYS``
    (each key without its ``_ms`` suffix) rather than kept as a second,
    hand-maintained list that could drift.
    """
    _, head, mpack = _make_repo(tmp_path / "repo")
    job_id = await _push_mpack(client, repo, mpack, head, db_session)

    from musehub.services.musehub_wire import process_mpack_index_job
    with caplog.at_level(logging.INFO, logger="musehub.services.musehub_wire"):
        # Only the log output matters here; the return dict is covered by
        # the other tests.
        await process_mpack_index_job(db_session, job_id)
        await db_session.commit()

    # getMessage() renders %-style args itself instead of relying on a
    # handler having populated ``record.message``.
    messages = [record.getMessage() for record in caplog.records]

    # Find the summary log line (the ✅ done line)
    summary_lines = [m for m in messages if "mpack.index" in m and "done" in m]
    assert summary_lines, (
        "No '✅ [mpack.index] done' summary log line found. "
        f"All mpack.index log lines: {[m for m in messages if 'mpack.index' in m]}"
    )

    summary = summary_lines[-1]
    phase_labels = sorted(
        key[: -len("_ms")] if key.endswith("_ms") else key
        for key in _PHASE_TIMING_KEYS
    )
    missing_labels = [label for label in phase_labels if label not in summary]
    assert not missing_labels, (
        f"Summary log line missing phase labels: {missing_labels}\n"
        f"Summary: {summary!r}"
    )
File History
1 commit
sha256:ef10830ce231e0a20efcb0e2586cb879471247e916616e6fdd0d51df459e2595
fix: typing audit — 0 violations, 0 untyped defs across all…
Sonnet 4.6
minor
⚠
21 days ago