gabriel / muse public
test_history.py python
499 lines 16.5 KB
Raw
sha256:e6465e8a9b7fa8e6223ed4a3576e96c568c913ae2caeb9c31f15e7a81b250b40 docs: add | jq convention to --json section of agent-guide Sonnet 4.6 1 day ago
1 """Persistent test-run history indexed by pytest node ID.
2
3 Every time ``muse code test`` executes a test suite it appends a
4 :class:`RunRecord` to the history. The history is stored in
5 ``.muse/cache/test_history.json`` — a JSON object holding a ``version``
6 number and a ``runs`` list with one run record per ``run_tests`` invocation.
7
8 What the history enables
9 ------------------------
10 * **Flaky-test detection** — a test that sometimes passes and sometimes
11 fails across the last N runs is flagged as flaky.
12 * **Failure-streak tracking** — how many consecutive runs ended in failure
13 for a given test? High streaks signal systemic breakage.
14 * **Duration trend** — is a test getting slower over time?
15 * **Smart test ordering** — sort slowest/most-recently-failed tests to run
16 first so failures surface as early as possible in a parallel run.
17
18 Security
19 --------
20 The history file is written atomically (rename-after-write) to prevent
21 partial writes from corrupting the index. All data originates from pytest
22 subprocess output (JSON report); no user-supplied data is executed.
23 """
24
25 import json
26 import logging
27 import os
28 import pathlib
29 import time
30 from collections.abc import Sequence
31 from typing import Literal, NotRequired, TypedDict
32
33 import json as _json
34
35 from muse.core.types import MsgpackDict, content_hash
36 from muse.core.paths import test_history_path as _test_history_path
37 from muse.core.types import MsgpackValue
38 from muse.core.record_helpers import (
39 _int_val,
40 _str_list,
41 _str_or_none,
42 _str_val,
43 )
44
# Module-scoped logger; every diagnostic emitted by this file goes through it.
logger = logging.getLogger(__name__)
46
47 # ---------------------------------------------------------------------------
48 # Public type definitions
49 # ---------------------------------------------------------------------------
50
51 Outcome = Literal["passed", "failed", "error", "skipped"]
52
53 class CaseRecord(TypedDict):
54 """Result of a single test function within a run."""
55
56 node_id: str
57 """Pytest node ID, e.g. ``"tests/test_foo.py::TestBar::test_baz"``."""
58
59 outcome: Outcome
60 """Test outcome as reported by pytest."""
61
62 duration_ms: float
63 """Wall-clock execution time in milliseconds."""
64
65 symbol_addresses: list[str]
66 """Production symbol addresses this test is known to cover (may be empty
67 when selection did not produce coverage data)."""
68
69 longrepr: NotRequired[str]
70 """Short failure representation from pytest (omitted when passing)."""
71
class RunRecord(TypedDict):
    """Everything recorded about one ``muse code test`` invocation."""

    run_id: str
    """Content-addressed ``sha256:`` identifier of this particular run."""

    timestamp: str
    """Run start as an ISO 8601 UTC timestamp, e.g. ``"2026-03-26T14:05:00Z"``."""

    commit_id: str | None
    """HEAD commit ID when the run started; ``None`` if the repo has no commits."""

    branch: str | None
    """Branch checked out when the run started; ``None`` on a detached HEAD."""

    results: list[CaseRecord]
    """Per-test-case outcomes collected during the run."""

    total: int
    """How many test cases the run contained."""

    passed: int
    """How many test cases passed."""

    failed: int
    """How many test cases failed."""

    errored: int
    """How many test cases raised an unexpected error."""

    skipped: int
    """How many test cases were skipped."""
104
class HistorySummary(TypedDict):
    """History of a single test function, aggregated over the recorded runs."""

    node_id: str
    """Pytest node ID of the test."""

    total_runs: int
    """How many recorded results exist for this test."""

    pass_count: int
    """How many of those results were ``passed``."""

    fail_count: int
    """How many of those results were ``failed`` or ``error``."""

    skip_count: int
    """How many of those results were ``skipped``."""

    flaky: bool
    """True when the test has at least one pass **and** at least one failure."""

    avg_duration_ms: float
    """Mean duration in milliseconds over the non-skipped results."""

    last_outcome: Outcome | None
    """Outcome of the most recent result, or ``None`` if there is none."""

    last_run_timestamp: str | None
    """ISO 8601 timestamp of the most recent run that contained this test."""

    fail_streak: int
    """Length of the unbroken run of failed/errored results ending at the most recent one."""
137
138 # ---------------------------------------------------------------------------
139 # Storage path
140 # ---------------------------------------------------------------------------
141
142 _HISTORY_VERSION = 2
143
144 type _SummaryMap = dict[str, "HistorySummary"]
145 type _MutableSummaryMap = dict[str, "_MutableSummary"]
146
147 # ---------------------------------------------------------------------------
148 # Internal serialisation TypedDicts
149 # ---------------------------------------------------------------------------
150
class _TestCaseDoc(TypedDict):
    """On-disk (JSON) shape of one test-case result inside a run document."""

    # Pytest node ID of the test.
    node_id: str
    # Outcome stored as a plain string rather than the ``Outcome`` literal type.
    outcome: str
    # Duration of the test in milliseconds.
    duration_ms: float
    # Production symbol addresses associated with the test.
    symbol_addresses: list[str]
    # Always present in the document; an empty string means "no failure text".
    longrepr: str
159
class _RunDoc(TypedDict):
    """On-disk (JSON) shape of one run record."""

    # Identity and provenance of the run.
    run_id: str
    timestamp: str
    commit_id: str | None
    branch: str | None
    # Aggregate counters for the run.
    total: int
    passed: int
    failed: int
    errored: int
    skipped: int
    # One entry per test case executed in the run.
    results: list[_TestCaseDoc]
173
class _HistoryDoc(TypedDict):
    """Root object of the history file as it is encoded to JSON."""

    # Schema version of the document.
    version: int
    # Serialised run records, in the order they are stored.
    runs: list[_RunDoc]
179
def _history_path(root: pathlib.Path) -> pathlib.Path:
    """Locate the test-history JSON file for *root*.

    Path resolution is delegated to ``muse.core.paths.test_history_path``.
    """
    history_file = _test_history_path(root)
    return history_file
183
184 # ---------------------------------------------------------------------------
185 # Serialisation helpers
186 # ---------------------------------------------------------------------------
187
def _record_to_dict(record: RunRecord) -> _RunDoc:
    """Convert a :class:`RunRecord` into the :class:`_RunDoc` shape that is JSON-encoded.

    Every serialised case carries a ``longrepr`` key; cases without one get
    an empty string.
    """
    doc: _RunDoc = {
        "run_id": record["run_id"],
        "timestamp": record["timestamp"],
        "commit_id": record.get("commit_id"),
        "branch": record.get("branch"),
        "total": record["total"],
        "passed": record["passed"],
        "failed": record["failed"],
        "errored": record["errored"],
        "skipped": record["skipped"],
        "results": [],
    }
    for case in record["results"]:
        doc["results"].append(
            {
                "node_id": case["node_id"],
                "outcome": case["outcome"],
                "duration_ms": case["duration_ms"],
                "symbol_addresses": case["symbol_addresses"],
                "longrepr": case.get("longrepr", ""),
            }
        )
    return doc
211
212 def _record_from_dict(raw: MsgpackValue) -> RunRecord | None:
213 """Deserialise a dict value into a :class:`RunRecord`.
214
215 Returns ``None`` on any structural mismatch so a single corrupt entry
216 does not abort the entire history load.
217 """
218 if not isinstance(raw, dict):
219 logger.debug("test_history: skipping non-dict run record")
220 return None
221 try:
222 results: list[CaseRecord] = []
223 raw_results = raw.get("results", [])
224 if not isinstance(raw_results, list):
225 return None
226 for r in raw_results:
227 if not isinstance(r, dict):
228 continue
229 r_dict: MsgpackDict = r
230 node_id = _str_val(r_dict, "node_id", "")
231 raw_outcome = _str_val(r_dict, "outcome", "error")
232 if not node_id:
233 continue
234 if raw_outcome == "passed":
235 outcome: Outcome = "passed"
236 elif raw_outcome == "failed":
237 outcome = "failed"
238 elif raw_outcome == "skipped":
239 outcome = "skipped"
240 else:
241 outcome = "error"
242 longrepr = _str_val(r_dict, "longrepr", "")
243 duration_raw = r_dict.get("duration_ms", 0.0)
244 duration_ms = float(duration_raw) if isinstance(duration_raw, (int, float)) else 0.0
245 rec = CaseRecord(
246 node_id=node_id,
247 outcome=outcome,
248 duration_ms=duration_ms,
249 symbol_addresses=_str_list(r_dict, "symbol_addresses"),
250 )
251 if longrepr:
252 rec["longrepr"] = longrepr
253 results.append(rec)
254
255 raw_dict: MsgpackDict = raw
256
257 timestamp_str = _str_val(raw_dict, "timestamp", "")
258 _raw_run_id = _str_val(raw_dict, "run_id", "")
259 if _raw_run_id:
260 run_id_str = _raw_run_id
261 else:
262 # Derive a deterministic fallback from the record's timestamp.
263 run_id_str = content_hash({"fallback": True, "timestamp": timestamp_str or time.time_ns()})
264 commit_id_str = _str_or_none(raw_dict, "commit_id")
265 branch_str = _str_or_none(raw_dict, "branch")
266
267 return RunRecord(
268 run_id=run_id_str,
269 timestamp=timestamp_str,
270 commit_id=commit_id_str,
271 branch=branch_str,
272 results=results,
273 total=_int_val(raw_dict, "total", len(results)),
274 passed=_int_val(raw_dict, "passed", 0),
275 failed=_int_val(raw_dict, "failed", 0),
276 errored=_int_val(raw_dict, "errored", 0),
277 skipped=_int_val(raw_dict, "skipped", 0),
278 )
279 except (KeyError, TypeError, ValueError) as exc:
280 logger.debug("test_history: failed to deserialise run record: %s", exc)
281 return None
282
283 # ---------------------------------------------------------------------------
284 # Public I/O
285 # ---------------------------------------------------------------------------
286
def load_history(root: pathlib.Path) -> list[RunRecord]:
    """Read every usable run record from ``.muse/cache/test_history.json``.

    An empty list is returned when the file is missing, unreadable, not valid
    UTF-8 JSON, starts with a byte above ``0x7F`` (treated as the old binary
    format), or does not have the expected ``{"runs": [...]}`` layout.
    Entries that fail to deserialise are dropped individually, so a single
    bad record never hides the rest of the history.
    """
    path = _history_path(root)
    if not path.exists():
        return []
    try:
        payload = path.read_bytes()
        # JSON text begins with an ASCII byte; a high first byte means this is
        # not a JSON document.
        if payload and payload[0] > 0x7F:
            logger.warning("⚠️ test_history: %s is old binary format — ignoring", path)
            return []
        doc = _json.loads(payload.decode("utf-8"))
    except Exception as exc:
        logger.warning("⚠️ test_history: could not load %s: %s", path, exc)
        return []

    entries = doc.get("runs", []) if isinstance(doc, dict) else None
    if not isinstance(entries, list):
        return []

    return [rec for rec in map(_record_from_dict, entries) if rec is not None]
320
def save_history(root: pathlib.Path, records: list[RunRecord]) -> None:
    """Atomically overwrite ``.muse/cache/test_history.json`` with *records*.

    The encoded document is written to a temporary file next to the target
    and then moved over it with :func:`os.replace`, so a reader sees either
    the previous file or the complete new one.

    The temporary file name is unique per call (process ID plus the
    nanosecond clock).  A single shared temp name would let two concurrent
    writers truncate each other's temp file, and one of them could then
    rename a half-written file into place.

    Args:
        root: Root path handed to ``_history_path`` to locate the history file.
        records: Complete list of run records to persist.

    Raises:
        OSError: If the temporary file cannot be written or moved into place.
            The temporary file is removed before the error propagates.
    """
    path = _history_path(root)
    path.parent.mkdir(parents=True, exist_ok=True)

    doc = _HistoryDoc(
        version=_HISTORY_VERSION,
        runs=[_record_to_dict(r) for r in records],
    )
    encoded = _json.dumps(doc, ensure_ascii=False, separators=(",", ":")).encode("utf-8")

    # Same directory as the target so os.replace stays on one filesystem.
    tmp = path.with_name(f"{path.name}.{os.getpid()}.{time.time_ns()}.tmp")
    try:
        tmp.write_bytes(encoded)
        os.replace(tmp, path)
    except OSError as exc:
        logger.error("❌ test_history: failed to write %s: %s", path, exc)
        raise
    finally:
        # After a successful replace the temp name no longer exists and this
        # is a no-op; on any failure it removes the leftover so uniquely named
        # temp files do not pile up.  Cleanup is best-effort and must not mask
        # the original error.
        try:
            tmp.unlink(missing_ok=True)
        except OSError:
            pass
344
def append_run(root: pathlib.Path, record: RunRecord) -> None:
    """Add one :class:`RunRecord` to the end of the stored history.

    The existing history is loaded, *record* is appended, and the whole list
    is written back with :func:`save_history`.

    The load-modify-save sequence is not locked.  If two appends overlap,
    both start from the same loaded list and the later save overwrites the
    earlier one, so a run record can be lost.
    """
    save_history(root, [*load_history(root), record])
356
def make_run_id() -> str:
    """Create the content-addressed ID for a run that is starting now.

    The hashed payload is the current wall-clock time in nanoseconds, so no
    random input is needed; two IDs differ whenever the clock readings differ.
    """
    genesis = {"started_at_ns": time.time_ns()}
    return content_hash(genesis)
364
def iso_now() -> str:
    """Format the current UTC time as ``YYYY-MM-DDTHH:MM:SSZ`` (ISO 8601, whole seconds)."""
    return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
372
373 # ---------------------------------------------------------------------------
374 # Analytics
375 # ---------------------------------------------------------------------------
376
def summarize(records: Sequence[RunRecord]) -> _SummaryMap:
    """Collapse *records* into one :class:`HistorySummary` per test.

    Results are consumed in the order given, so "last" outcome, "last"
    timestamp and the failure streak all refer to the final entries of
    *records* that contain the test.

    Args:
        records: Run records as returned by :func:`load_history`.

    Returns:
        Dict mapping pytest node ID → :class:`HistorySummary`, keyed in order
        of first appearance.
    """
    failing = ("failed", "error")

    # Pass 1: collect each test's outcomes, durations and run timestamps.
    gathered: _MutableSummaryMap = {}
    for run in records:
        for case in run["results"]:
            nid = case["node_id"]
            acc = gathered.get(nid)
            if acc is None:
                acc = _MutableSummary(
                    node_id=nid,
                    outcomes=[],
                    durations=[],
                    timestamps=[],
                )
                gathered[nid] = acc
            acc["outcomes"].append(case["outcome"])
            acc["durations"].append(case["duration_ms"])
            acc["timestamps"].append(run["timestamp"])

    # Pass 2: reduce every accumulator to its final summary.
    summary_by_node: _SummaryMap = {}
    for nid, acc in gathered.items():
        outcomes = acc["outcomes"]
        timestamps = acc["timestamps"]

        passes = outcomes.count("passed")
        failures = outcomes.count("failed") + outcomes.count("error")
        skips = outcomes.count("skipped")

        # Skipped results do not contribute to the mean duration.
        timed = [d for d, o in zip(acc["durations"], outcomes) if o != "skipped"]
        mean_ms = sum(timed) / len(timed) if timed else 0.0

        # Walk backwards from the newest result while it is a failure.
        streak = 0
        while streak < len(outcomes) and outcomes[-1 - streak] in failing:
            streak += 1

        newest_outcome: Outcome | None = outcomes[-1] if outcomes else None
        newest_timestamp = timestamps[-1] if timestamps else None

        summary_by_node[nid] = HistorySummary(
            node_id=nid,
            total_runs=len(outcomes),
            pass_count=passes,
            fail_count=failures,
            skip_count=skips,
            flaky=passes > 0 and failures > 0,
            avg_duration_ms=mean_ms,
            last_outcome=newest_outcome,
            last_run_timestamp=newest_timestamp,
            fail_streak=streak,
        )

    return summary_by_node
446
def flaky_tests(records: Sequence[RunRecord]) -> list[HistorySummary]:
    """List the :class:`HistorySummary` of every flaky test in *records*.

    Flaky means the test has at least one pass and at least one failure in
    the recorded history.  The list is ordered by ``fail_count``, highest
    first, so the worst offenders lead.
    """
    flagged = (s for s in summarize(records).values() if s["flaky"])
    return sorted(flagged, key=lambda s: s["fail_count"], reverse=True)
458
def prioritize_targets(
    node_ids: list[str],
    records: Sequence[RunRecord],
) -> list[str]:
    """Re-order *node_ids* so highest-risk tests run first.

    Risk ordering (highest first):

    1. Tests with a failure streak > 0 (currently broken); those that are
       also flaky come before those that are not.
    2. Tests that are flaky (historically unreliable) but not currently
       failing.
    3. Tests that have never been recorded (unknown risk — run early).
    4. Remaining healthy tests.

    Within groups 1, 2 and 4, ties are broken by average duration descending
    (slow tests surface failures earlier in a parallel run).  Never-recorded
    tests keep their relative input order because the sort is stable.

    Args:
        node_ids: Pytest node IDs to order.
        records: Run history used to assess each test's risk.

    Returns:
        The same node IDs in a new list; *node_ids* is not modified.
    """
    sums = summarize(records)

    def _sort_key(nid: str) -> tuple[int, int, float]:
        # Key layout: (tier, -risk, -avg_duration); sorted ascending.
        #   tier 0 = known and risky, tier 1 = never recorded, tier 2 = healthy.
        s = sums.get(nid)
        if s is None:
            # Unknown tests sit between risky and healthy ones.  The previous
            # key gave them the largest first element, which sorted them last.
            return (1, 0, 0.0)
        streak_score = 2 if s["fail_streak"] > 0 else 0
        flaky_score = 1 if s["flaky"] else 0
        risk = streak_score + flaky_score
        tier = 0 if risk else 2
        # Negate risk and duration so larger values come first.
        return (tier, -risk, -s["avg_duration_ms"])

    return sorted(node_ids, key=_sort_key)
488
489 # ---------------------------------------------------------------------------
490 # Internal mutable accumulation type (not exported)
491 # ---------------------------------------------------------------------------
492
class _MutableSummary(TypedDict):
    """Scratch accumulator that :func:`summarize` fills for each test."""

    # Pytest node ID the accumulator belongs to.
    node_id: str
    # The three lists below are appended to together, so index i in each one
    # describes the same recorded result.
    outcomes: list[Outcome]
    durations: list[float]
    timestamps: list[str]
File History 1 commit
sha256:e6465e8a9b7fa8e6223ed4a3576e96c568c913ae2caeb9c31f15e7a81b250b40 docs: add | jq convention to --json section of agent-guide Sonnet 4.6 1 day ago