"""Phase 1.7 — Linux-kernel scale: commit history walk. Tests cover: - 15k commit chain: walk_commits_between_result truncation flag - 15k commit chain: muse log --json emits "truncated" in JSON - 60k-deep branches: find_merge_base raises a clear error (not wrong answer) - commit_graph on 15k: emits "truncated": true (JSON + text + count-only) - Configurable caps via [limits] in config.toml - O(n²) regression: _collect_all_commits uses deque (benchmarked) - Streaming JSON: muse log --json doesn't hold all commits in memory - walk_commits_between_result returns WalkResult TypedDict - Truncation NOT flagged when walk naturally completes under cap - find_merge_base raises on BOTH A-side and B-side cap hit - get_commits_for_branch respects configurable cap via walk_limit """ from __future__ import annotations from collections.abc import Mapping type _ConfigMap = dict[str, int] import collections import datetime import json import pathlib import sys import time import tomllib import unittest.mock as mock from typing import TypedDict import pytest from tests.cli_test_helper import CliRunner class _LogOutput(TypedDict, total=False): """Shape of muse log --json output.""" truncated: bool commits: list[Mapping[str, str]] class _CommitJson(TypedDict, total=False): """Shape of a single commit entry in muse log --json.""" commit_id: str branch: str message: str author: str committed_at: str parent_commit_id: str snapshot_id: str metadata: Manifest sem_ver_bump: str from muse.core.merge_engine import find_merge_base from muse.core.ids import hash_commit as compute_commit_id from muse.core.types import Manifest, fake_id from muse.core.commits import ( CommitRecord, WalkResult, get_commits_for_branch, walk_commits_between, walk_commits_between_result, write_commit, ) from muse.core.paths import config_toml_path, heads_dir, muse_dir # --------------------------------------------------------------------------- # Helpers # 
# ---------------------------------------------------------------------------

# Fixed timestamp shared by every commit written by these tests.
_DT = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc)


def _repo(tmp_path: pathlib.Path) -> pathlib.Path:
    """Lay out a bare-bones ``.muse/`` tree under *tmp_path* and return *tmp_path*.

    Creates the ``commits``, ``snapshots`` and ``refs/heads`` directories,
    a ``repo.json`` with a fixed repo id, a ``HEAD`` pointing at ``main``
    and an empty ``main`` ref file.
    """
    base = muse_dir(tmp_path)
    for subdir in (
        base / "commits",
        base / "snapshots",
        base / "refs" / "heads",
    ):
        subdir.mkdir(parents=True)

    repo_meta = json.dumps({"repo_id": "test-repo"})
    (base / "repo.json").write_text(repo_meta)
    (base / "HEAD").write_text("ref: refs/heads/main\n")
    (base / "refs" / "heads" / "main").write_text("")
    return tmp_path


def _make_commit(
    root: pathlib.Path,
    label: str = "",
    message: str = "msg",
    parent: str | None = None,
    parent2: str | None = None,
    branch: str = "main",
) -> CommitRecord:
    """Persist one commit whose ID is computed from its own inputs.

    *label* seeds the snapshot id through ``fake_id`` (an empty label falls
    back to ``"default"``); no snapshot object is written by this helper.
    The commit id comes from ``compute_commit_id`` over the non-``None``
    parents, the snapshot id, the message and the fixed ``_DT`` timestamp.

    Returns the ``CommitRecord`` that was handed to ``write_commit``.
    """
    snap = fake_id(label or "default")
    parents = [pid for pid in (parent, parent2) if pid is not None]
    cid = compute_commit_id(
        parent_ids=parents,
        snapshot_id=snap,
        message=message,
        committed_at_iso=_DT.isoformat(),
    )
    record = CommitRecord(
        commit_id=cid,
        branch=branch,
        snapshot_id=snap,
        message=message,
        committed_at=_DT,
        parent_commit_id=parent,
        parent2_commit_id=parent2,
    )
    write_commit(root, record)
    return record


def _build_linear_chain(root: pathlib.Path, n: int) -> list[str]:
    """Write *n* commits in a single-parent chain and point ``main`` at the tip.

    Returns the commit ids ordered newest-first.
    """
    oldest_first: list[str] = []
    parent_id: str | None = None
    for index in range(n):
        parent_id = _make_commit(
            root, f"commit_{index:08d}", parent=parent_id
        ).commit_id
        oldest_first.append(parent_id)

    # The branch ref must name the most recently written commit.
    newest = oldest_first[-1]
    (heads_dir(root) / "main").write_text(newest)
    return oldest_first[::-1]


def _write_config(root: pathlib.Path, limits: _ConfigMap) -> None:
    """Write a ``[limits]`` table with one ``key = value`` line per entry."""
    body = "".join(
        ["[limits]\n", *(f"{key} = {value}\n" for key, value in limits.items())]
    )
    config_toml_path(root).write_text(body)


# ---------------------------------------------------------------------------
# 1.
# WalkResult TypedDict
# ---------------------------------------------------------------------------


class TestWalkResultType:
    """Shape and truncation semantics of ``walk_commits_between_result``."""

    def test_walk_result_is_typed_dict(self, tmp_path: pathlib.Path) -> None:
        """The result is a dict exposing ``commits``, ``truncated`` and ``count``."""
        root = _repo(tmp_path)
        chain = _build_linear_chain(root, 5)
        result = walk_commits_between_result(root, chain[0], max_commits=100)
        assert isinstance(result, dict)
        for key in ("commits", "truncated", "count"):
            assert key in result

    def test_walk_result_not_truncated_when_chain_fits(
        self, tmp_path: pathlib.Path
    ) -> None:
        """A 10-commit chain under a cap of 100 is returned whole."""
        root = _repo(tmp_path)
        tip = _build_linear_chain(root, 10)[0]
        walk = walk_commits_between_result(root, tip, max_commits=100)
        assert walk["truncated"] is False
        assert walk["count"] == 10
        assert len(walk["commits"]) == 10

    def test_walk_result_truncated_at_cap(self, tmp_path: pathlib.Path) -> None:
        """A 50-commit chain under a cap of 20 is cut at exactly 20."""
        root = _repo(tmp_path)
        tip = _build_linear_chain(root, 50)[0]
        walk = walk_commits_between_result(root, tip, max_commits=20)
        assert walk["truncated"] is True
        assert walk["count"] == 20
        assert len(walk["commits"]) == 20

    def test_walk_commits_between_backward_compat(
        self, tmp_path: pathlib.Path
    ) -> None:
        """walk_commits_between still returns list[CommitRecord]."""
        root = _repo(tmp_path)
        tip = _build_linear_chain(root, 5)[0]
        commits = walk_commits_between(root, tip, max_commits=100)
        assert isinstance(commits, list)
        assert len(commits) == 5

    def test_count_matches_len_commits(self, tmp_path: pathlib.Path) -> None:
        """``count`` equals ``len(commits)`` below, at and above the chain length."""
        root = _repo(tmp_path)
        tip = _build_linear_chain(root, 30)[0]
        for limit in (5, 10, 30, 100):
            walk = walk_commits_between_result(root, tip, max_commits=limit)
            assert walk["count"] == len(walk["commits"])


# ---------------------------------------------------------------------------
# 2.
# 15k commit chain — truncation and correctness
# ---------------------------------------------------------------------------


@pytest.mark.slow
class TestLinearChainScale:
    """Walks over long linear histories (marked slow)."""

    def test_15k_chain_walk_truncates_at_default_cap(
        self, tmp_path: pathlib.Path
    ) -> None:
        """15k chain with default cap (10k) → truncated=True, 10k commits."""
        root = _repo(tmp_path)
        tip = _build_linear_chain(root, 15_000)[0]
        # No max_commits passed: the walk falls back to its default cap.
        walk = walk_commits_between_result(root, tip)
        assert walk["truncated"] is True
        assert walk["count"] == 10_000

    def test_15k_chain_walk_completes_with_raised_cap(
        self, tmp_path: pathlib.Path
    ) -> None:
        """15k chain with cap=20k → truncated=False, 15k commits."""
        root = _repo(tmp_path)
        tip = _build_linear_chain(root, 15_000)[0]
        walk = walk_commits_between_result(root, tip, max_commits=20_000)
        assert walk["truncated"] is False
        assert walk["count"] == 15_000

    def test_15k_chain_order_newest_first(self, tmp_path: pathlib.Path) -> None:
        """First commit returned must be the tip (newest).

        Uses a 100-commit chain; ordering does not need the full 15k.
        """
        root = _repo(tmp_path)
        chain = _build_linear_chain(root, 100)
        tip = chain[0]
        walk = walk_commits_between_result(root, tip, max_commits=200)
        first = walk["commits"][0]
        assert first.commit_id == tip  # newest first


# ---------------------------------------------------------------------------
# 3.
# Configurable caps via [limits] in config.toml
# ---------------------------------------------------------------------------


class TestConfigurableCaps:
    """``get_limit`` / ``get_config_value`` against a ``[limits]`` table."""

    def test_walk_cap_from_config(self, tmp_path: pathlib.Path) -> None:
        """``max_walk_commits`` written to config is what ``get_limit`` returns."""
        root = _repo(tmp_path)
        _build_linear_chain(root, 200)
        _write_config(root, {"max_walk_commits": 50})
        from muse.cli.config import get_limit

        assert get_limit("max_walk_commits", root) == 50

    def test_graph_cap_from_config(self, tmp_path: pathlib.Path) -> None:
        """``max_graph_commits`` written to config is what ``get_limit`` returns."""
        root = _repo(tmp_path)
        _write_config(root, {"max_graph_commits": 1000})
        from muse.cli.config import get_limit

        configured = get_limit("max_graph_commits", root)
        assert configured == 1000

    def test_ancestors_cap_from_config(self, tmp_path: pathlib.Path) -> None:
        """``max_ancestors`` written to config is what ``get_limit`` returns."""
        root = _repo(tmp_path)
        _write_config(root, {"max_ancestors": 999})
        from muse.cli.config import get_limit

        configured = get_limit("max_ancestors", root)
        assert configured == 999

    def test_default_cap_when_config_absent(self, tmp_path: pathlib.Path) -> None:
        """With no config file written, the module defaults are returned."""
        root = _repo(tmp_path)
        from muse.cli.config import (
            _DEFAULT_MAX_ANCESTORS,
            _DEFAULT_MAX_WALK_COMMITS,
            get_limit,
        )

        walk_cap = get_limit("max_walk_commits", root)
        assert walk_cap == _DEFAULT_MAX_WALK_COMMITS
        ancestors_cap = get_limit("max_ancestors", root)
        assert ancestors_cap == _DEFAULT_MAX_ANCESTORS

    def test_invalid_cap_ignored_uses_default(self, tmp_path: pathlib.Path) -> None:
        """Negative or zero limits in config must be ignored — use default."""
        root = _repo(tmp_path)
        _write_config(root, {"max_walk_commits": -1})  # invalid
        from muse.cli.config import _DEFAULT_MAX_WALK_COMMITS, get_limit

        effective = get_limit("max_walk_commits", root)
        assert effective == _DEFAULT_MAX_WALK_COMMITS

    def test_get_config_value_reads_limits(self, tmp_path: pathlib.Path) -> None:
        """The dotted key ``limits.max_walk_commits`` reads back as a string."""
        root = _repo(tmp_path)
        _write_config(root, {"max_walk_commits": 777})
        from muse.cli.config import get_config_value

        raw = get_config_value("limits.max_walk_commits", root)
        assert raw == "777"

    def test_limits_config_parsed_correctly(self, tmp_path: pathlib.Path) -> None:
        """All three limit keys are correctly parsed from config.toml."""
        root = _repo(tmp_path)
        expected = {
            "max_walk_commits": 111,
            "max_ancestors": 222,
            "max_graph_commits": 333,
        }
        _write_config(root, expected)
        from muse.cli.config import get_limit

        for key, value in expected.items():
            assert get_limit(key, root) == value


# ---------------------------------------------------------------------------
# 4. find_merge_base — consistency at cap (both sides raise)
# ---------------------------------------------------------------------------


class TestFindMergeBaseAtCap:
    """``find_merge_base`` must raise when the ancestor cap is hit."""

    def _make_diverging_branches(
        self, root: pathlib.Path, depth_a: int, depth_b: int
    ) -> tuple[str, str, str]:
        """Create a common root then two branches of given depths.

        Returns (tip_a, tip_b, common_id).
        """
        common_id = _make_commit(root, "common", "common root").commit_id

        tip_a = common_id
        for step in range(depth_a):
            tip_a = _make_commit(
                root, f"branch_a_{step}", parent=tip_a, branch="branch-a"
            ).commit_id

        tip_b = common_id
        for step in range(depth_b):
            tip_b = _make_commit(
                root, f"branch_b_{step}", parent=tip_b, branch="branch-b"
            ).commit_id

        return tip_a, tip_b, common_id

    def test_small_graph_finds_base_correctly(
        self, tmp_path: pathlib.Path
    ) -> None:
        """Two 5-deep branches resolve to their shared root commit."""
        root = _repo(tmp_path)
        tip_a, tip_b, common_id = self._make_diverging_branches(root, 5, 5)
        assert find_merge_base(root, tip_a, tip_b) == common_id

    def test_a_side_cap_raises_muse_cli_error(
        self, tmp_path: pathlib.Path
    ) -> None:
        """A-side exceeding cap must raise MuseCLIError (not silently truncate)."""
        from muse.core.errors import MuseCLIError

        root = _repo(tmp_path)
        # Two branches each 110 commits deep from a common ancestor — the BFS
        # cannot find the merge base within the 100-ancestor cap.
        tip_a, tip_b, _ = self._make_diverging_branches(root, 110, 110)
        with (
            mock.patch("muse.cli.config.get_limit", return_value=100),
            pytest.raises(MuseCLIError, match="Ancestor graph exceeds"),
        ):
            find_merge_base(root, tip_a, tip_b)

    def test_b_side_cap_also_raises_muse_cli_error(
        self, tmp_path: pathlib.Path
    ) -> None:
        """B-side exceeding cap must also raise MuseCLIError — consistent behavior."""
        from muse.core.errors import MuseCLIError

        root = _repo(tmp_path)
        # Two branches from a common ancestor deep in the history.
        # A-side is short (won't hit cap), B-side is long.
        common_id = _make_commit(root, "deep_common", "common").commit_id

        # B-side: 110 commits
        tip_b = common_id
        for step in range(110):
            tip_b = _make_commit(
                root, f"b_side_{step}", parent=tip_b, branch="b"
            ).commit_id

        # A-side: 5 commits (tiny — won't hit cap)
        tip_a = common_id
        for step in range(5):
            tip_a = _make_commit(
                root, f"a_side_{step}", parent=tip_a, branch="a"
            ).commit_id

        # With cap=100, B-side has 110 commits → raises
        with (
            mock.patch("muse.cli.config.get_limit", return_value=100),
            pytest.raises(MuseCLIError, match="Ancestor graph"),
        ):
            find_merge_base(root, tip_a, tip_b)

    def test_error_message_mentions_config_key(
        self, tmp_path: pathlib.Path
    ) -> None:
        """Error message must tell users how to raise the cap."""
        from muse.core.errors import MuseCLIError

        root = _repo(tmp_path)
        tip_a, tip_b, _ = self._make_diverging_branches(root, 110, 110)
        with (
            mock.patch("muse.cli.config.get_limit", return_value=100),
            pytest.raises(MuseCLIError) as exc_info,
        ):
            find_merge_base(root, tip_a, tip_b)

        message = str(exc_info.value)
        assert "max_ancestors" in message
        assert "config.toml" in message

    @pytest.mark.slow
    def test_60k_deep_branches_raise_not_wrong_answer(
        self, tmp_path: pathlib.Path
    ) -> None:
        """A very deep A-side must make find_merge_base raise, never truncate silently.

        Despite the "60k" in the name, the A-side built here is 52k commits
        deep and the B-side is 10; no cap is patched, so the configured
        default applies.
        """
        from muse.core.errors import MuseCLIError

        root = _repo(tmp_path)
        # Use cap=50k (default); build 52k branches — enough to exceed cap, no excess
        common_id = _make_commit(root, "root60k", "root").commit_id

        depth = 52_000
        tip_a = common_id
        for step in range(depth):
            tip_a = _make_commit(
                root, f"a60k_{step}", parent=tip_a, branch="a"
            ).commit_id

        # B-side: only 10 commits — A-side will hit the cap first
        tip_b = common_id
        for step in range(10):
            tip_b = _make_commit(
                root, f"b10_{step}", parent=tip_b, branch="b"
            ).commit_id

        # find_merge_base must raise, not silently return None/wrong answer
        with pytest.raises(MuseCLIError, match="Ancestor graph exceeds"):
            find_merge_base(root, tip_a, tip_b)


# ---------------------------------------------------------------------------
# 5. _collect_all_commits — delegates to iter_ancestors (O(1) popleft
#    guaranteed by graph.py; no O(n²) list.pop(0) pattern here)
# ---------------------------------------------------------------------------


class TestCollectAllCommitsPerformance:
    """Structure, return shape and timing of ``_collect_all_commits``."""

    def test_uses_deque_not_list_for_bfs(self) -> None:
        """_collect_all_commits must delegate to iter_ancestors; no inline BFS."""
        from muse.cli.commands import log as log_mod
        import inspect
        import ast

        source = inspect.getsource(log_mod._collect_all_commits)
        # Inspect the AST rather than the text so a docstring that merely
        # mentions pop(0) does not trip the check.
        pop0_calls: list[ast.Call] = [
            node
            for node in ast.walk(ast.parse(source))
            if isinstance(node, ast.Call)
            and isinstance(node.func, ast.Attribute)
            and node.func.attr == "pop"
            and node.args
            and isinstance(node.args[0], ast.Constant)
            and node.args[0].value == 0
        ]
        assert not pop0_calls, (
            "Found list.pop(0) in _collect_all_commits — this is the O(n²) bug. "
            "Replace with deque.popleft()."
        )
        assert "iter_ancestors" in source, (
            "_collect_all_commits must delegate to iter_ancestors. "
            "O(1) popleft is guaranteed by graph.py's walk_dag."
        )

    def test_collect_returns_tuple_with_truncated_flag(
        self, tmp_path: pathlib.Path
    ) -> None:
        """Under the cap: returns ``(dict, False)`` holding every commit."""
        root = _repo(tmp_path)
        tip = _build_linear_chain(root, 20)[0]
        from muse.cli.commands.log import _collect_all_commits

        collected, was_truncated = _collect_all_commits(
            root, [tip], max_commits=100
        )
        assert isinstance(collected, dict)
        assert isinstance(was_truncated, bool)
        assert was_truncated is False
        assert len(collected) == 20

    def test_collect_truncates_at_cap(self, tmp_path: pathlib.Path) -> None:
        """Over the cap: the flag is set and exactly ``max_commits`` are kept."""
        root = _repo(tmp_path)
        tip = _build_linear_chain(root, 50)[0]
        from muse.cli.commands.log import _collect_all_commits

        collected, was_truncated = _collect_all_commits(
            root, [tip], max_commits=10
        )
        assert was_truncated is True
        assert len(collected) == 10

    @pytest.mark.slow
    def test_10k_commits_completes_in_reasonable_time(
        self, tmp_path: pathlib.Path
    ) -> None:
        """10k BFS must complete in < 5s — proves O(n) not O(n²)."""
        root = _repo(tmp_path)
        tip = _build_linear_chain(root, 10_000)[0]
        from muse.cli.commands.log import _collect_all_commits

        # Only the collection itself is timed, not the chain construction.
        started = time.perf_counter()
        commits, _ = _collect_all_commits(root, [tip], max_commits=100_000)
        elapsed = time.perf_counter() - started

        assert len(commits) == 10_000
        assert elapsed < 5.0, (
            f"_collect_all_commits took {elapsed:.2f}s for 10k commits. "
            "Expected < 5s. O(n²) list.pop(0) would take ~50s."
        )


# ---------------------------------------------------------------------------
# 6.
# muse log --json streaming output with "truncated" field
# ---------------------------------------------------------------------------


class TestLogJsonOutput:
    """JSON document emitted by ``muse log --json``."""

    def _run_log(self, root: pathlib.Path, *extra_args: str) -> _LogOutput:
        """Run muse log --json via CliRunner and parse the output.

        Args:
            root: Repository root, passed to the CLI via ``MUSE_REPO_ROOT``.
            *extra_args: Additional CLI arguments appended after ``--json``.
                (These were previously accepted but silently dropped.)

        Returns:
            The parsed JSON document, or an empty ``_LogOutput`` when the
            command printed nothing on stdout.
        """
        runner = CliRunner()
        result = runner.invoke(
            None,
            ["log", "--json", *extra_args],
            env={"MUSE_REPO_ROOT": str(root)},
        )
        out = result.stdout.strip()
        if not out:
            return _LogOutput()
        parsed: _LogOutput = json.loads(out)
        return parsed

    def test_log_json_has_truncated_field(self, tmp_path: pathlib.Path) -> None:
        """muse log --json must include a 'truncated' key in output."""
        root = _repo(tmp_path)
        ids = _build_linear_chain(root, 5)
        (heads_dir(root) / "main").write_text(ids[0])
        output = self._run_log(root)
        assert "truncated" in output, (
            "muse log --json must include 'truncated' key. "
            "Agents rely on this to know whether to page."
        )

    def test_log_json_not_truncated_for_small_history(
        self, tmp_path: pathlib.Path
    ) -> None:
        """A 5-commit history is reported with ``truncated`` false."""
        root = _repo(tmp_path)
        ids = _build_linear_chain(root, 5)
        (heads_dir(root) / "main").write_text(ids[0])
        output = self._run_log(root)
        assert output["truncated"] is False

    def test_log_json_has_commits_array(self, tmp_path: pathlib.Path) -> None:
        """``commits`` is a list with one entry per commit in the chain."""
        root = _repo(tmp_path)
        ids = _build_linear_chain(root, 3)
        (heads_dir(root) / "main").write_text(ids[0])
        output = self._run_log(root)
        commits = output.get("commits")
        assert isinstance(commits, list)
        assert len(commits) == 3

    def test_log_json_commit_fields(self, tmp_path: pathlib.Path) -> None:
        """Each commit in JSON output has the required agent-facing fields."""
        root = _repo(tmp_path)
        ids = _build_linear_chain(root, 2)
        (heads_dir(root) / "main").write_text(ids[0])
        output = self._run_log(root)
        commits_raw = output.get("commits", [])
        assert isinstance(commits_raw, list)
        assert len(commits_raw) >= 1
        # Verify required fields are present in the raw dict
        first_raw = commits_raw[0]
        assert isinstance(first_raw, dict)
        required_fields = {
            "commit_id",
            "branch",
            "message",
            "author",
            "committed_at",
            "parent_commit_id",
            "snapshot_id",
            "metadata",
            "sem_ver_bump",
        }
        assert required_fields.issubset(set(first_raw.keys())), (
            f"Missing fields: {required_fields - set(first_raw.keys())}"
        )

    def test_log_json_empty_history(self, tmp_path: pathlib.Path) -> None:
        """Empty history emits a valid JSON response (not an exception)."""
        root = _repo(tmp_path)
        # No commits, HEAD is empty — must not crash
        output = self._run_log(root)
        # Valid outcomes: empty dict (branch not found) or
        # {"truncated":false,"commits":[]}. An empty dict is falsy, so only
        # the non-empty case needs checking.
        if output:
            assert "commits" in output


# ---------------------------------------------------------------------------
# 7. commit_graph — truncated in all output formats
# ---------------------------------------------------------------------------


class TestCommitGraphTruncation:
    """``muse commit-graph`` must report truncation in every output format."""

    def _run_commit_graph(
        self,
        root: pathlib.Path,
        tip: str,
        fmt: str = "json",
        max_commits: int = 10_000,
        count_only: bool = False,
    ) -> str:
        """Invoke ``commit-graph`` through CliRunner and return its raw stdout.

        Args:
            root: Repository root, passed via ``MUSE_REPO_ROOT``.
            tip: Commit id given to ``--tip``.
            fmt: ``"json"`` adds ``--json``; any other value adds no format flag.
            max_commits: Value given to ``--max``.
            count_only: When true, ``--count`` is appended.
        """
        args = ["commit-graph", "--tip", tip, "--max", str(max_commits)]
        if fmt == "json":
            args.append("--json")
        if count_only:
            args.append("--count")
        runner = CliRunner()
        result = runner.invoke(None, args, env={"MUSE_REPO_ROOT": str(root)})
        return result.stdout

    def test_json_has_truncated_false_when_under_cap(
        self, tmp_path: pathlib.Path
    ) -> None:
        """10 commits under a cap of 100: not truncated, count is 10."""
        root = _repo(tmp_path)
        ids = _build_linear_chain(root, 10)
        out = json.loads(self._run_commit_graph(root, ids[0], max_commits=100))
        assert out["truncated"] is False
        assert out["count"] == 10

    def test_json_has_truncated_true_when_over_cap(
        self, tmp_path: pathlib.Path
    ) -> None:
        """50 commits under a cap of 20: truncated, count is 20."""
        root = _repo(tmp_path)
        ids = _build_linear_chain(root, 50)
        out = json.loads(self._run_commit_graph(root, ids[0], max_commits=20))
        assert out["truncated"] is True
        assert out["count"] == 20

    def test_text_format_has_truncated_comment_when_over_cap(
        self, tmp_path: pathlib.Path
    ) -> None:
        """Text output mentions TRUNCATED when the cap is hit."""
        root = _repo(tmp_path)
        ids = _build_linear_chain(root, 50)
        out = self._run_commit_graph(root, ids[0], fmt="text", max_commits=10)
        assert "TRUNCATED" in out, (
            "text format must emit '# TRUNCATED' when cap is hit"
        )

    def test_text_format_no_truncated_when_under_cap(
        self, tmp_path: pathlib.Path
    ) -> None:
        """Text output does not mention TRUNCATED when the walk completes."""
        root = _repo(tmp_path)
        ids = _build_linear_chain(root, 5)
        out = self._run_commit_graph(root, ids[0], fmt="text", max_commits=100)
        assert "TRUNCATED" not in out

    def test_count_only_has_truncated_field(
        self, tmp_path: pathlib.Path
    ) -> None:
        """``--count`` output still carries ``truncated`` alongside ``count``."""
        root = _repo(tmp_path)
        ids = _build_linear_chain(root, 50)
        out = json.loads(
            self._run_commit_graph(
                root, ids[0], max_commits=20, count_only=True
            )
        )
        assert "truncated" in out, "count-only must include 'truncated'"
        assert out["truncated"] is True
        assert out["count"] == 20

    @pytest.mark.slow
    def test_15k_chain_commit_graph_completes_in_30s(
        self, tmp_path: pathlib.Path
    ) -> None:
        """commit-graph on 15k commits must complete in < 30s."""
        root = _repo(tmp_path)
        ids = _build_linear_chain(root, 15_000)
        # Only the CLI invocation and JSON parse are timed, not chain setup.
        t0 = time.perf_counter()
        out = json.loads(self._run_commit_graph(root, ids[0], max_commits=10_000))
        elapsed = time.perf_counter() - t0
        assert out["truncated"] is True
        assert elapsed < 30.0, (
            f"commit-graph on 15k commits took {elapsed:.1f}s — must be < 30s"
        )


# ---------------------------------------------------------------------------
# 8.
# Regression: B-side was returning None silently (old bug)
# ---------------------------------------------------------------------------


class TestMergeBaseConsistency:
    """Regression checks for ``find_merge_base`` cap handling and symmetry."""

    def test_a_and_b_cap_raise_same_type(self, tmp_path: pathlib.Path) -> None:
        """Both A-side and B-side cap must raise the same exception type."""
        from muse.core.errors import MuseCLIError

        root = _repo(tmp_path)
        # Build a 30-commit chain
        chain = _build_linear_chain(root, 30)
        newest, halfway = chain[0], chain[15]

        # With cap=20, A-side will be exhausted (30 > 20)
        with (
            mock.patch("muse.cli.config.get_limit", return_value=20),
            pytest.raises(MuseCLIError),
        ):
            find_merge_base(root, newest, halfway)

    def test_symmetric_result_for_small_graph(
        self, tmp_path: pathlib.Path
    ) -> None:
        """find_merge_base(a, b) == find_merge_base(b, a) for a small graph."""
        root = _repo(tmp_path)
        common_id = _make_commit(root, "sym_root").commit_id
        tip_a = _make_commit(root, "sym_a_1", parent=common_id).commit_id
        tip_b = _make_commit(root, "sym_b_1", parent=common_id).commit_id

        forward = find_merge_base(root, tip_a, tip_b)
        backward = find_merge_base(root, tip_b, tip_a)
        assert forward == backward == common_id


# ---------------------------------------------------------------------------
# 9.
# walk_commits_between_result — from_commit_id exclusion
# ---------------------------------------------------------------------------


class TestWalkFromCommitExclusion:
    """Behaviour of the ``from_commit_id`` stop marker."""

    def test_from_commit_excluded(self, tmp_path: pathlib.Path) -> None:
        """from_commit_id is exclusive — it must not appear in the result."""
        root = _repo(tmp_path)
        chain = _build_linear_chain(root, 10)  # newest first
        boundary = chain[5]  # stop before this one
        walk = walk_commits_between_result(
            root, chain[0], from_commit_id=boundary
        )
        seen = {commit.commit_id for commit in walk["commits"]}
        assert boundary not in seen
        assert walk["truncated"] is False
        assert walk["count"] == 5  # chain[0]..chain[4]

    def test_no_from_commit_walks_all(self, tmp_path: pathlib.Path) -> None:
        """Without a stop marker the whole 20-commit chain is returned."""
        root = _repo(tmp_path)
        tip = _build_linear_chain(root, 20)[0]
        walk = walk_commits_between_result(root, tip, max_commits=100)
        assert walk["count"] == 20
        assert walk["truncated"] is False