"""TDD — Phase 4: observability timing breakdown.

process_mpack_index_job must return a result dict that a monitoring dashboard
or alerting rule can consume without re-parsing log lines.

Phase 4 invariants:
  1. Return dict contains per-phase timing keys (ms float, non-negative).
  2. Return dict contains mpack_size_bytes (int, positive).
  3. Sum of individual phase timings ≤ elapsed_ms + measurement overhead.
  4. The structured summary log line is emitted with all phase timings.

Phase timing keys:
  fetch_mpack_ms      — MinIO GET (one round-trip, critical path)
  unpack_ms           — msgpack.unpackb (O(mpack_size), CPU-bound)
  snapshot_insert_ms  — bulk INSERT musehub_snapshots
  commit_insert_ms    — bulk INSERT musehub_commits
  object_puts_ms      — parallel MinIO PUTs (network-bound, batched)
  object_insert_ms    — bulk INSERT / ON CONFLICT DO UPDATE musehub_objects
  object_refs_ms      — upsert musehub_object_refs
"""
from __future__ import annotations

import datetime
import hashlib
import logging
import pathlib
from collections.abc import AsyncIterator

import msgpack
import pytest
import pytest_asyncio
from httpx import AsyncClient, ASGITransport
from sqlalchemy.ext.asyncio import AsyncSession

from musehub.auth.request_signing import MSignContext, require_signed_request, optional_signed_request
from musehub.db.database import get_db
from musehub.main import app
from muse.core.object_store import write_object
from muse.core.mpack import build_mpack
from muse.core.paths import muse_dir
from muse.core.snapshot import compute_commit_id, compute_snapshot_id
from muse.core.commits import CommitRecord, write_commit
from muse.core.refs import write_branch_ref
from muse.core.snapshots import SnapshotRecord, write_snapshot
from muse.core.types import blob_id

pytestmark = pytest.mark.skip(reason="muse wire protocol in flux")

_AUTH_CTX = MSignContext(
    handle="gabriel",
    identity_id="sha256:" + "0" * 64,
    is_agent=False,
    is_admin=True,
)

_N_FILES = 8
_N_COMMITS = 4
_FILES_CHANGED = 2
_BLOB_SIZE = 128

# Canonical set of per-phase timing keys Phase 4 requires.
_PHASE_TIMING_KEYS: frozenset[str] = frozenset({
    "fetch_mpack_ms",
    "unpack_ms",
    "snapshot_insert_ms",
    "commit_insert_ms",
    "object_puts_ms",
    "object_insert_ms",
    "object_refs_ms",
})


# ── fixtures ────────────────────────────────────────────────────────────────


@pytest_asyncio.fixture()
async def client(db_session: AsyncSession) -> AsyncIterator[AsyncClient]:
    """Yield an in-process HTTP client bound to ``app`` and the test DB session.

    ``get_db`` is overridden to yield the shared ``db_session``. Both
    request-signing dependencies are overridden to return ``_AUTH_CTX``, so
    every request runs as the admin identity ``gabriel``.

    All ``app.dependency_overrides`` are cleared on teardown, even if closing
    the client raises.
    """

    async def _override_get_db() -> AsyncIterator[AsyncSession]:
        yield db_session

    app.dependency_overrides[get_db] = _override_get_db
    app.dependency_overrides[require_signed_request] = lambda: _AUTH_CTX
    app.dependency_overrides[optional_signed_request] = lambda: _AUTH_CTX
    try:
        async with AsyncClient(
            transport=ASGITransport(app=app),
            base_url="https://localhost:1337",
        ) as c:
            yield c
    finally:
        app.dependency_overrides.clear()


@pytest_asyncio.fixture()
async def repo(client: AsyncClient) -> AsyncIterator[str]:
    """Create the public, uninitialised repo ``gabriel/phase4-obs-test`` and yield its slug.

    The repo is deleted by its ``repoId`` on teardown.
    """
    resp = await client.post(
        "/api/repos",
        json={"owner": "gabriel", "name": "phase4-obs-test", "visibility": "public", "initialize": False},
    )
    assert resp.status_code in (200, 201), resp.text
    data = resp.json()
    yield data["slug"]
    await client.delete(f"/api/repos/{data['repoId']}")


# ── helpers ─────────────────────────────────────────────────────────────────


def _make_repo(tmp: pathlib.Path) -> tuple[pathlib.Path, str, dict]:
    """Build a local Muse repo with a linear ``main`` history and pack it.

    Under ``muse_dir(tmp)`` this writes:
      - ``repo.json``;
      - ``HEAD`` pointing at ``refs/heads/main``;
      - an empty ``config.toml``;
      - empty ``commits``/``snapshots``/``objects`` directories.

    History construction:
      1. Write ``_N_FILES`` base blobs.
      2. Write ``_N_COMMITS`` chained commits. Each commit starts from the
         base manifest and overwrites ``_FILES_CHANGED`` files with fresh
         blobs.
      3. Commit timestamps start at 2026-01-01T00:00:00Z and advance 60 s
         per commit, so every commit-id input is fixed.

    Returns:
        ``(tmp, tip_commit_id, mpack)``, where ``mpack`` is
        ``build_mpack(tmp, [tip], have=[])``.
    """
    tmp.mkdir(parents=True, exist_ok=True)
    dot = muse_dir(tmp)
    dot.mkdir()
    (dot / "repo.json").write_text('{"repo_id":"phase4-test","owner":"gabriel"}')
    for sub in ("commits", "snapshots", "objects"):
        (dot / sub).mkdir()
    (dot / "refs" / "heads").mkdir(parents=True)
    (dot / "HEAD").write_text("ref: refs/heads/main\n")
    (dot / "config.toml").write_text("")

    # Base tree: one blob per file, each padded with _BLOB_SIZE filler bytes.
    blob_ids: list[str] = []
    for i in range(_N_FILES):
        data = f"base-{i:04d}".encode() + b"x" * _BLOB_SIZE
        oid = blob_id(data)
        write_object(tmp, oid, data)
        blob_ids.append(oid)
    base_manifest = {f"src/file_{i:04d}.py": oid for i, oid in enumerate(blob_ids)}

    parent: str | None = None
    tip = ""
    ts = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc)
    for i in range(_N_COMMITS):
        # Each commit starts from the *base* manifest (changes do not
        # accumulate) and replaces _FILES_CHANGED files, round-robin over
        # the file set.
        manifest = dict(base_manifest)
        for j in range(_FILES_CHANGED):
            idx = (i * _FILES_CHANGED + j) % _N_FILES
            raw = f"c{i:04d}-f{j}".encode() + b"y" * _BLOB_SIZE
            oid = blob_id(raw)
            write_object(tmp, oid, raw)
            manifest[f"src/file_{idx:04d}.py"] = oid

        sid = compute_snapshot_id(manifest)
        write_snapshot(tmp, SnapshotRecord(snapshot_id=sid, manifest=manifest))

        msg = f"commit-{i:05d}"
        cid = compute_commit_id(
            parent_ids=[parent] if parent else [],
            snapshot_id=sid,
            message=msg,
            committed_at_iso=ts.isoformat(),
            author="gabriel",
        )
        write_commit(tmp, CommitRecord(
            commit_id=cid,
            branch="main",
            snapshot_id=sid,
            message=msg,
            committed_at=ts,
            parent_commit_id=parent,
            parent2_commit_id=None,
            author="gabriel",
            metadata={},
            structured_delta=None,
            sem_ver_bump="none",
            breaking_changes=[],
            agent_id="",
            model_id="",
            toolchain_id="",
            prompt_hash="",
            signature="",
            signer_key_id="",
        ))
        parent = cid
        tip = cid
        ts += datetime.timedelta(seconds=60)

    write_branch_ref(tmp, "main", tip)
    mpack = build_mpack(tmp, [tip], have=[])
    return tmp, tip, mpack


def _wire_bytes(mpack: dict) -> bytes:
    """Encode ``mpack`` exactly as it is uploaded: msgpack with ``use_bin_type=True``."""
    return msgpack.packb(mpack, use_bin_type=True)


async def _push_mpack(
    client: AsyncClient,
    repo_slug: str,
    mpack: dict,
    head: str,
    db_session: AsyncSession,
) -> str:
    """Upload ``mpack`` through a presigned URL and enqueue an ``mpack.index`` job.

    Steps:
      1. Resolve ``repo_slug`` to its ``repo_id``.
      2. POST ``/gabriel/{repo_slug}/push/mpack-presign``. The mpack is keyed
         by the sha256 of its wire bytes.
      3. PUT the wire bytes to the returned upload URL. This uses a separate
         plain ``httpx`` client; presumably the URL targets MinIO rather than
         the ASGI app.
      4. Add a ``pending`` ``MusehubBackgroundJob`` row and flush it (no
         commit).

    Returns:
        The new job's ``job_id``.
    """
    import httpx as _httpx
    from sqlalchemy import select as _select

    from musehub.core.genesis import compute_job_id as _compute_job_id
    from musehub.db.musehub_jobs_models import MusehubBackgroundJob
    from musehub.db.musehub_repo_models import MusehubRepo as _Repo

    repo_row = (await db_session.execute(
        _select(_Repo).where(_Repo.slug == repo_slug)
    )).scalar_one()
    repo_id = repo_row.repo_id

    wire_bytes = _wire_bytes(mpack)
    mpack_key = "sha256:" + hashlib.sha256(wire_bytes).hexdigest()
    n_objects = len(mpack.get("objects") or [])

    pr = await client.post(
        f"/gabriel/{repo_slug}/push/mpack-presign",
        content=msgpack.packb({"mpack_key": mpack_key, "size_bytes": len(wire_bytes)}, use_bin_type=True),
        headers={"Content-Type": "application/x-msgpack"},
    )
    assert pr.status_code == 200, pr.text
    presign = pr.json()
    # Accept either snake_case or camelCase naming from the endpoint.
    upload_url = presign.get("upload_url") or presign.get("uploadUrl")
    assert upload_url, f"presign response has no upload URL: {pr.text}"

    async with _httpx.AsyncClient() as raw:
        put = await raw.put(upload_url, content=wire_bytes)
        assert put.status_code in (200, 204), put.text

    now = datetime.datetime.now(tz=datetime.timezone.utc)
    job_id = _compute_job_id(repo_id, "mpack.index", now.isoformat())
    db_session.add(MusehubBackgroundJob(
        job_id=job_id,
        repo_id=repo_id,
        job_type="mpack.index",
        payload={
            "mpack_key": mpack_key,
            "pusher_id": "sha256:" + "0" * 64,  # same identity as _AUTH_CTX
            "branch": "main",
            "head": head,
            "force": False,
            "declared_objects_count": n_objects,
        },
        status="pending",
        created_at=now,
        attempt=0,
    ))
    await db_session.flush()
    return job_id


async def _enqueue_fresh_job(
    client: AsyncClient,
    repo: str,
    tmp_path: pathlib.Path,
    db_session: AsyncSession,
) -> tuple[str, dict]:
    """Build a repo under ``tmp_path / "repo"``, push its mpack, and return ``(job_id, mpack)``."""
    _, head, mpack = _make_repo(tmp_path / "repo")
    job_id = await _push_mpack(client, repo, mpack, head, db_session)
    return job_id, mpack


async def _run_index_job(db_session: AsyncSession, job_id: str) -> dict:
    """Run ``process_mpack_index_job`` for ``job_id``, commit the session, and return its result."""
    from musehub.services.musehub_wire import process_mpack_index_job

    result = await process_mpack_index_job(db_session, job_id)
    await db_session.commit()
    return result


# ── Phase 4 tests ───────────────────────────────────────────────────────────


@pytest.mark.asyncio
async def test_return_dict_has_per_phase_timing_keys(
    client: AsyncClient,
    repo: str,
    tmp_path: pathlib.Path,
    db_session: AsyncSession,
) -> None:
    """Return dict contains all seven per-phase timing keys.

    These keys let monitoring dashboards alert on individual phases
    (e.g., 'object_puts_ms > 5000') without parsing log lines.
    """
    job_id, _ = await _enqueue_fresh_job(client, repo, tmp_path, db_session)
    result = await _run_index_job(db_session, job_id)

    missing = _PHASE_TIMING_KEYS - set(result.keys())
    assert not missing, (
        f"process_mpack_index_job return dict is missing phase timing keys: {missing}\n"
        f"Got keys: {sorted(result.keys())}"
    )


@pytest.mark.asyncio
async def test_all_phase_timings_are_non_negative(
    client: AsyncClient,
    repo: str,
    tmp_path: pathlib.Path,
    db_session: AsyncSession,
) -> None:
    """Every timing value is present, numeric, and non-negative.

    A negative timing would indicate a clock anomaly or a bug in the monotonic
    timer checkpoints. Zero is allowed (empty mpack phase).

    Missing or non-numeric values are reported as an assertion failure
    rather than a ``KeyError``/``TypeError``.
    """
    job_id, _ = await _enqueue_fresh_job(client, repo, tmp_path, db_session)
    result = await _run_index_job(db_session, job_id)

    timings = {k: result.get(k) for k in sorted(_PHASE_TIMING_KEYS)}
    non_numeric = {k: v for k, v in timings.items() if not isinstance(v, (int, float))}
    assert not non_numeric, (
        f"Phase timings missing or not numeric: {non_numeric}\n"
        f"Got keys: {sorted(result.keys())}"
    )

    bad = {k: v for k, v in timings.items() if v < 0}
    assert not bad, (
        f"Negative timing values in result: {bad}\n"
        "Monotonic clock should never go backwards."
    )


@pytest.mark.asyncio
async def test_mpack_size_bytes_in_return_dict(
    client: AsyncClient,
    repo: str,
    tmp_path: pathlib.Path,
    db_session: AsyncSession,
) -> None:
    """Return dict contains mpack_size_bytes — the raw wire size in bytes, as an int.

    mpack_size_bytes lets dashboards correlate job timing against mpack size
    (e.g., MB/s throughput for object_puts, overall fetch rate).
    """
    job_id, mpack = await _enqueue_fresh_job(client, repo, tmp_path, db_session)
    # Same encoding _push_mpack uploads, so the expected size is exact.
    expected_size = len(_wire_bytes(mpack))
    result = await _run_index_job(db_session, job_id)

    assert "mpack_size_bytes" in result, (
        f"mpack_size_bytes missing from result dict. Got: {sorted(result.keys())}"
    )
    size = result["mpack_size_bytes"]
    assert isinstance(size, int), f"mpack_size_bytes should be int, got {type(size).__name__}"
    assert size == expected_size, (
        f"mpack_size_bytes {size} != expected {expected_size}"
    )


@pytest.mark.asyncio
async def test_phase_timings_sum_within_elapsed(
    client: AsyncClient,
    repo: str,
    tmp_path: pathlib.Path,
    db_session: AsyncSession,
) -> None:
    """Sum of per-phase timings is at most elapsed_ms + 50 ms measurement overhead.

    This catches bugs where a phase timer checkpoint is in the wrong place
    (e.g., t_commits measured before the commit loop instead of after).

    The 50 ms overhead budget covers timer calls, logging, and context
    switches.
    """
    job_id, _ = await _enqueue_fresh_job(client, repo, tmp_path, db_session)
    result = await _run_index_job(db_session, job_id)

    assert "elapsed_ms" in result, (
        f"elapsed_ms missing from result dict. Got: {sorted(result.keys())}"
    )
    phase_sum = sum(result[k] for k in _PHASE_TIMING_KEYS)
    elapsed = result["elapsed_ms"]
    overhead_budget_ms = 50.0
    assert phase_sum <= elapsed + overhead_budget_ms, (
        f"Sum of phase timings ({phase_sum:.1f} ms) exceeds elapsed_ms "
        f"({elapsed:.1f} ms) by more than {overhead_budget_ms} ms — "
        "a timer checkpoint is likely misplaced."
    )


@pytest.mark.asyncio
async def test_summary_log_contains_phase_breakdown(
    client: AsyncClient,
    repo: str,
    tmp_path: pathlib.Path,
    db_session: AsyncSession,
    caplog: pytest.LogCaptureFixture,
) -> None:
    """The structured summary log line contains all phase timing labels.

    Ops teams need to grep a single log line per job and see the full
    timing breakdown, not reconstruct it from seven separate lines.

    The summary log must contain every phase key that appears in the return
    dict, so log-based alerts and dashboards stay in sync.
    """
    job_id, _ = await _enqueue_fresh_job(client, repo, tmp_path, db_session)

    with caplog.at_level(logging.INFO, logger="musehub.services.musehub_wire"):
        await _run_index_job(db_session, job_id)

    index_lines = [r.getMessage() for r in caplog.records if "mpack.index" in r.getMessage()]
    # Find the summary log line (the ✅ done line).
    summary_lines = [m for m in index_lines if "done" in m]
    assert summary_lines, (
        "No '✅ [mpack.index] done' summary log line found. "
        f"All mpack.index log lines: {index_lines}"
    )
    summary = summary_lines[-1]

    # Labels are the return-dict keys minus their "_ms" suffix, derived from
    # _PHASE_TIMING_KEYS so the log check and the dict check cannot drift.
    phase_labels = sorted(key[: -len("_ms")] for key in _PHASE_TIMING_KEYS)
    missing_labels = [label for label in phase_labels if label not in summary]
    assert not missing_labels, (
        f"Summary log line missing phase labels: {missing_labels}\n"
        f"Summary: {summary!r}"
    )