"""Persistent test-run history indexed by pytest node ID. Every time ``muse code test`` executes a test suite it appends a :class:`RunRecord` to the history. The history is stored in ``.muse/cache/test_history.json`` — a plain JSON list of run records, one per ``run_tests`` invocation. What the history enables ------------------------ * **Flaky-test detection** — a test that sometimes passes and sometimes fails across the last N runs is flagged as flaky. * **Failure-streak tracking** — how many consecutive runs ended in failure for a given test? High streaks signal systemic breakage. * **Duration trend** — is a test getting slower over time? * **Smart test ordering** — sort slowest/most-recently-failed tests to run first so failures surface as early as possible in a parallel run. Security -------- The history file is written atomically (rename-after-write) to prevent partial writes from corrupting the index. All data originates from pytest subprocess output (JSON report); no user-supplied data is executed. """ import json import logging import os import pathlib import time from collections.abc import Sequence from typing import Literal, NotRequired, TypedDict import json as _json from muse.core.types import MsgpackDict, content_hash from muse.core.paths import test_history_path as _test_history_path from muse.core.types import MsgpackValue from muse.core.record_helpers import ( _int_val, _str_list, _str_or_none, _str_val, ) logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Public type definitions # --------------------------------------------------------------------------- Outcome = Literal["passed", "failed", "error", "skipped"] class CaseRecord(TypedDict): """Result of a single test function within a run.""" node_id: str """Pytest node ID, e.g. ``"tests/test_foo.py::TestBar::test_baz"``.""" outcome: Outcome """Test outcome as reported by pytest.""" duration_ms: float """Wall-clock execution time in milliseconds.""" symbol_addresses: list[str] """Production symbol addresses this test is known to cover (may be empty when selection did not produce coverage data).""" longrepr: NotRequired[str] """Short failure representation from pytest (omitted when passing).""" class RunRecord(TypedDict): """A single ``muse code test`` invocation.""" run_id: str """content-addressed sha256: ID identifying this specific run.""" timestamp: str """ISO 8601 UTC timestamp of the run start, e.g. ``"2026-03-26T14:05:00Z"``.""" commit_id: str | None """HEAD commit ID at the time of the run, or ``None`` if repo has no commits.""" branch: str | None """Current branch name at run time, or ``None`` for detached HEAD.""" results: list[CaseRecord] """Individual test-case outcomes within this run.""" total: int """Total number of test cases.""" passed: int """Number of passing test cases.""" failed: int """Number of failing test cases.""" errored: int """Number of test cases that raised an unexpected error.""" skipped: int """Number of skipped test cases.""" class HistorySummary(TypedDict): """Per-test-function aggregated history summary.""" node_id: str """Pytest node ID.""" total_runs: int """Number of times this test has been seen across all recorded runs.""" pass_count: int """Runs where the test passed.""" fail_count: int """Runs where the test failed or errored.""" skip_count: int """Runs where the test was skipped.""" flaky: bool """True when pass_count > 0 **and** fail_count > 0 across recorded runs.""" avg_duration_ms: float """Mean execution time across all non-skipped runs, in milliseconds.""" last_outcome: Outcome | None """Most recent outcome for this test, or ``None`` if never recorded.""" last_run_timestamp: str | None """ISO 8601 timestamp of the most recent run that included this test.""" fail_streak: int """Number of consecutive most-recent runs in which the test failed/errored.""" # --------------------------------------------------------------------------- # Storage path # --------------------------------------------------------------------------- _HISTORY_VERSION = 2 type _SummaryMap = dict[str, "HistorySummary"] type _MutableSummaryMap = dict[str, "_MutableSummary"] # --------------------------------------------------------------------------- # Internal serialisation TypedDicts # --------------------------------------------------------------------------- class _TestCaseDoc(TypedDict): """Msgpack document shape for a single test-case result.""" node_id: str outcome: str duration_ms: float symbol_addresses: list[str] longrepr: str class _RunDoc(TypedDict): """Msgpack document shape for a single run record.""" run_id: str timestamp: str commit_id: str | None branch: str | None total: int passed: int failed: int errored: int skipped: int results: list[_TestCaseDoc] class _HistoryDoc(TypedDict): """Top-level JSON document shape for the history file.""" version: int runs: list[_RunDoc] def _history_path(root: pathlib.Path) -> pathlib.Path: """Return the path to the test-history JSON file inside ``.muse/``.""" return _test_history_path(root) # --------------------------------------------------------------------------- # Serialisation helpers # --------------------------------------------------------------------------- def _record_to_dict(record: RunRecord) -> _RunDoc: """Serialise a :class:`RunRecord` to a :class:`_RunDoc` for JSON encoding.""" return _RunDoc( run_id=record["run_id"], timestamp=record["timestamp"], commit_id=record.get("commit_id"), branch=record.get("branch"), total=record["total"], passed=record["passed"], failed=record["failed"], errored=record["errored"], skipped=record["skipped"], results=[ _TestCaseDoc( node_id=r["node_id"], outcome=r["outcome"], duration_ms=r["duration_ms"], symbol_addresses=r["symbol_addresses"], longrepr=r.get("longrepr", ""), ) for r in record["results"] ], ) def _record_from_dict(raw: MsgpackValue) -> RunRecord | None: """Deserialise a dict value into a :class:`RunRecord`. Returns ``None`` on any structural mismatch so a single corrupt entry does not abort the entire history load. """ if not isinstance(raw, dict): logger.debug("test_history: skipping non-dict run record") return None try: results: list[CaseRecord] = [] raw_results = raw.get("results", []) if not isinstance(raw_results, list): return None for r in raw_results: if not isinstance(r, dict): continue r_dict: MsgpackDict = r node_id = _str_val(r_dict, "node_id", "") raw_outcome = _str_val(r_dict, "outcome", "error") if not node_id: continue if raw_outcome == "passed": outcome: Outcome = "passed" elif raw_outcome == "failed": outcome = "failed" elif raw_outcome == "skipped": outcome = "skipped" else: outcome = "error" longrepr = _str_val(r_dict, "longrepr", "") duration_raw = r_dict.get("duration_ms", 0.0) duration_ms = float(duration_raw) if isinstance(duration_raw, (int, float)) else 0.0 rec = CaseRecord( node_id=node_id, outcome=outcome, duration_ms=duration_ms, symbol_addresses=_str_list(r_dict, "symbol_addresses"), ) if longrepr: rec["longrepr"] = longrepr results.append(rec) raw_dict: MsgpackDict = raw timestamp_str = _str_val(raw_dict, "timestamp", "") _raw_run_id = _str_val(raw_dict, "run_id", "") if _raw_run_id: run_id_str = _raw_run_id else: # Derive a deterministic fallback from the record's timestamp. run_id_str = content_hash({"fallback": True, "timestamp": timestamp_str or time.time_ns()}) commit_id_str = _str_or_none(raw_dict, "commit_id") branch_str = _str_or_none(raw_dict, "branch") return RunRecord( run_id=run_id_str, timestamp=timestamp_str, commit_id=commit_id_str, branch=branch_str, results=results, total=_int_val(raw_dict, "total", len(results)), passed=_int_val(raw_dict, "passed", 0), failed=_int_val(raw_dict, "failed", 0), errored=_int_val(raw_dict, "errored", 0), skipped=_int_val(raw_dict, "skipped", 0), ) except (KeyError, TypeError, ValueError) as exc: logger.debug("test_history: failed to deserialise run record: %s", exc) return None # --------------------------------------------------------------------------- # Public I/O # --------------------------------------------------------------------------- def load_history(root: pathlib.Path) -> list[RunRecord]: """Load and return all run records from ``.muse/cache/test_history.json``. Returns an empty list if the file does not exist or cannot be parsed. Individual corrupt records are silently skipped so one bad entry never prevents history from loading. """ path = _history_path(root) if not path.exists(): return [] try: raw = path.read_bytes() if raw and raw[0] > 0x7F: logger.warning("⚠️ test_history: %s is old binary format — ignoring", path) return [] doc = _json.loads(raw.decode("utf-8")) except Exception as exc: logger.warning("⚠️ test_history: could not load %s: %s", path, exc) return [] if not isinstance(doc, dict): return [] entries = doc.get("runs", []) if not isinstance(entries, list): return [] records: list[RunRecord] = [] for entry in entries: parsed = _record_from_dict(entry) if parsed is not None: records.append(parsed) return records def save_history(root: pathlib.Path, records: list[RunRecord]) -> None: """Atomically overwrite ``.muse/cache/test_history.json`` with *records*. Uses rename-after-write to guarantee the file is never left in a partially written state. """ path = _history_path(root) path.parent.mkdir(parents=True, exist_ok=True) doc = _HistoryDoc( version=_HISTORY_VERSION, runs=[_record_to_dict(r) for r in records], ) encoded = _json.dumps(doc, ensure_ascii=False, separators=(",", ":")).encode("utf-8") tmp = path.with_suffix(".tmp") try: tmp.write_bytes(encoded) os.replace(tmp, path) except OSError as exc: logger.error("❌ test_history: failed to write %s: %s", path, exc) tmp.unlink(missing_ok=True) raise def append_run(root: pathlib.Path, record: RunRecord) -> None: """Append a single :class:`RunRecord` to the history. Loads the existing history, appends *record*, and saves atomically. Concurrent appends from parallel workers may interleave; the history is not a CRDT but the worst-case outcome is a duplicate entry which is harmless for the analytics use-cases. """ records = load_history(root) records.append(record) save_history(root, records) def make_run_id() -> str: """Return a content-addressed sha256: ID for a new test run. Uses nanosecond wall-clock time as genesis, giving uniqueness at sub-microsecond granularity without requiring random input. """ return content_hash({"started_at_ns": time.time_ns()}) def iso_now() -> str: """Return the current UTC time as an ISO 8601 string (seconds precision).""" t = time.gmtime() return ( f"{t.tm_year:04d}-{t.tm_mon:02d}-{t.tm_mday:02d}T" f"{t.tm_hour:02d}:{t.tm_min:02d}:{t.tm_sec:02d}Z" ) # --------------------------------------------------------------------------- # Analytics # --------------------------------------------------------------------------- def summarize(records: Sequence[RunRecord]) -> _SummaryMap: """Aggregate *records* into a per-test summary map. Args: records: Run records as returned by :func:`load_history`. Returns: Dict mapping pytest node ID → :class:`HistorySummary`. """ summaries: _MutableSummaryMap = {} for run in records: for result in run["results"]: nid = result["node_id"] if nid not in summaries: summaries[nid] = _MutableSummary( node_id=nid, outcomes=[], durations=[], timestamps=[], ) summaries[nid]["outcomes"].append(result["outcome"]) summaries[nid]["durations"].append(result["duration_ms"]) summaries[nid]["timestamps"].append(run["timestamp"]) out: _SummaryMap = {} for nid, ms in summaries.items(): outcomes = ms["outcomes"] durations = ms["durations"] timestamps = ms["timestamps"] pass_count = sum(1 for o in outcomes if o == "passed") fail_count = sum(1 for o in outcomes if o in {"failed", "error"}) skip_count = sum(1 for o in outcomes if o == "skipped") non_skip_durations = [ d for d, o in zip(durations, outcomes) if o != "skipped" ] avg_ms = ( sum(non_skip_durations) / len(non_skip_durations) if non_skip_durations else 0.0 ) # Failure streak: count consecutive failures from the most recent run. streak = 0 for o in reversed(outcomes): if o in {"failed", "error"}: streak += 1 else: break last_timestamp = timestamps[-1] if timestamps else None last_outcome: Outcome | None = outcomes[-1] if outcomes else None out[nid] = HistorySummary( node_id=nid, total_runs=len(outcomes), pass_count=pass_count, fail_count=fail_count, skip_count=skip_count, flaky=pass_count > 0 and fail_count > 0, avg_duration_ms=avg_ms, last_outcome=last_outcome, last_run_timestamp=last_timestamp, fail_streak=streak, ) return out def flaky_tests(records: Sequence[RunRecord]) -> list[HistorySummary]: """Return :class:`HistorySummary` entries for tests that are flaky. A test is flaky when it has both at least one pass and at least one failure across the recorded history. Results are sorted by ``fail_count`` descending so the most problematic tests appear first. """ sums = summarize(records) flaky = [s for s in sums.values() if s["flaky"]] flaky.sort(key=lambda s: s["fail_count"], reverse=True) return flaky def prioritize_targets( node_ids: list[str], records: Sequence[RunRecord], ) -> list[str]: """Re-order *node_ids* so highest-risk tests run first. Risk ordering (highest first): 1. Tests with a failure streak > 0 (currently broken). 2. Tests that are flaky (historically unreliable). 3. Tests that have never been recorded (unknown risk — run early). 4. Tests sorted by average duration descending (slow tests surface failures earlier in a parallel run). Returns the same node IDs in a new order. """ sums = summarize(records) def _sort_key(nid: str) -> tuple[int, int, float]: s = sums.get(nid) if s is None: # Unknown: moderate priority between streaky and healthy. return (1, 0, 0.0) streak_score = 0 if s["fail_streak"] == 0 else 2 flaky_score = 1 if s["flaky"] else 0 # Negate duration so slower tests come first (we sort ascending). return (-(streak_score + flaky_score), 0, -s["avg_duration_ms"]) return sorted(node_ids, key=_sort_key) # --------------------------------------------------------------------------- # Internal mutable accumulation type (not exported) # --------------------------------------------------------------------------- class _MutableSummary(TypedDict): """Temporary accumulator used inside :func:`summarize`.""" node_id: str outcomes: list[Outcome] durations: list[float] timestamps: list[str]