gabriel / muse public
test_integrity_I7_history_walk.py python
737 lines 27.5 KB
Raw
1 """Phase 1.7 — Linux-kernel scale: commit history walk.
2
3 Tests cover:
4 - 15k commit chain: walk_commits_between_result truncation flag
5 - 15k commit chain: muse log --json emits "truncated" in JSON
6 - 60k-deep branches: find_merge_base raises a clear error (not wrong answer)
7 - commit_graph on 15k: emits "truncated": true (JSON + text + count-only)
8 - Configurable caps via [limits] in config.toml
9 - O(n²) regression: _collect_all_commits uses deque (benchmarked)
10 - Streaming JSON: muse log --json doesn't hold all commits in memory
11 - walk_commits_between_result returns WalkResult TypedDict
12 - Truncation NOT flagged when walk naturally completes under cap
13 - find_merge_base raises on BOTH A-side and B-side cap hit
14 - get_commits_for_branch respects configurable cap via walk_limit
15 """
16
17 from __future__ import annotations
18 from collections.abc import Mapping
19
20 type _ConfigMap = dict[str, int]
21
22 import collections
23 import datetime
24 import json
25 import pathlib
26 import sys
27 import time
28 import tomllib
29 import unittest.mock as mock
30 from typing import TypedDict
31
32 import pytest
33
34 from tests.cli_test_helper import CliRunner
35
36
class _LogOutput(TypedDict, total=False):
    """Shape of muse log --json output.

    Declared with ``total=False`` so every key is optional; ``_run_log``
    returns an empty instance when the command prints nothing.
    """

    # Flag the tests below require to be present in the JSON output.
    truncated: bool
    # Commit entries, one mapping per commit.
    commits: list[Mapping[str, str]]
42
43
class _CommitJson(TypedDict, total=False):
    """Shape of a single commit entry in muse log --json.

    All keys are optional (``total=False``).  The key set is the same as the
    ``required_fields`` set checked in ``test_log_json_commit_fields``.
    """

    commit_id: str
    branch: str
    message: str
    author: str
    committed_at: str
    parent_commit_id: str
    snapshot_id: str
    # ``Manifest`` is imported further down the module; this works because
    # ``from __future__ import annotations`` keeps annotations unevaluated.
    metadata: Manifest
    sem_ver_bump: str
56
57 from muse.core.merge_engine import find_merge_base
58 from muse.core.ids import hash_commit as compute_commit_id
59
60 from muse.core.types import Manifest, fake_id
61 from muse.core.commits import (
62 CommitRecord,
63 WalkResult,
64 get_commits_for_branch,
65 walk_commits_between,
66 walk_commits_between_result,
67 write_commit,
68 )
69 from muse.core.paths import config_toml_path, heads_dir, muse_dir
70
71 # ---------------------------------------------------------------------------
72 # Helpers
73 # ---------------------------------------------------------------------------
74
75
76 _DT = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc)
77
78
def _repo(tmp_path: pathlib.Path) -> pathlib.Path:
    """Lay out a minimal ``.muse/`` skeleton under *tmp_path*.

    Creates the ``commits``, ``snapshots`` and ``refs/heads`` directories,
    a ``repo.json`` with a fixed repo id, a ``HEAD`` pointing at
    ``refs/heads/main`` and an empty ``main`` ref.  Returns *tmp_path* so
    the call can be used directly as the repository root.
    """
    store = muse_dir(tmp_path)
    for parts in (("commits",), ("snapshots",), ("refs", "heads")):
        store.joinpath(*parts).mkdir(parents=True)

    repo_meta = json.dumps({"repo_id": "test-repo"})
    (store / "repo.json").write_text(repo_meta)
    (store / "HEAD").write_text("ref: refs/heads/main\n")
    store.joinpath("refs", "heads", "main").write_text("")
    return tmp_path
89
90
def _make_commit(
    root: pathlib.Path,
    label: str = "",
    message: str = "msg",
    parent: str | None = None,
    parent2: str | None = None,
    branch: str = "main",
) -> CommitRecord:
    """Persist one commit whose ID is derived from its own inputs.

    The snapshot ID comes from ``fake_id(label)`` (or ``fake_id("default")``
    when *label* is empty), so no real snapshot has to exist.  The commit ID
    is produced by ``compute_commit_id`` from the non-``None`` parents, the
    snapshot ID, *message* and the fixed module timestamp ``_DT``.  The
    record is written with ``write_commit`` and returned.
    """
    snapshot_id = fake_id(label or "default")
    # Only the parents that were actually supplied take part in the hash.
    parent_ids = [pid for pid in (parent, parent2) if pid is not None]
    record = CommitRecord(
        commit_id=compute_commit_id(
            parent_ids=parent_ids,
            snapshot_id=snapshot_id,
            message=message,
            committed_at_iso=_DT.isoformat(),
        ),
        branch=branch,
        snapshot_id=snapshot_id,
        message=message,
        committed_at=_DT,
        parent_commit_id=parent,
        parent2_commit_id=parent2,
    )
    write_commit(root, record)
    return record
124
125
def _build_linear_chain(root: pathlib.Path, n: int) -> list[str]:
    """Write a linear chain of *n* commits; return list of IDs newest-first.

    Each commit is created with ``_make_commit`` using the previous commit as
    its parent, and the ``main`` head ref is pointed at the last commit
    written (the tip).

    When *n* is zero or negative no commit is written, the head ref is left
    untouched and an empty list is returned.
    """
    chain: list[str] = []
    parent: str | None = None
    for i in range(n):
        parent = _make_commit(root, f"commit_{i:08d}", parent=parent).commit_id
        chain.append(parent)

    if not chain:
        # Nothing was written: there is no tip to point the ref at, so bail
        # out instead of failing on chain[-1].
        return []

    # The last commit written is the newest; make it the branch tip.
    (heads_dir(root) / "main").write_text(chain[-1])
    chain.reverse()  # oldest-first -> newest-first
    return chain
138
139
def _write_config(root: pathlib.Path, limits: _ConfigMap) -> None:
    """Write a ``[limits]`` table to the repo's config.toml.

    The file at ``config_toml_path(root)`` is overwritten with a ``[limits]``
    header followed by one ``key = value`` line per entry of *limits*, in
    the mapping's iteration order.
    """
    entries = "".join(f"{key} = {value}\n" for key, value in limits.items())
    config_toml_path(root).write_text("[limits]\n" + entries)
146
147
148 # ---------------------------------------------------------------------------
149 # 1. WalkResult TypedDict
150 # ---------------------------------------------------------------------------
151
152
class TestWalkResultType:
    """Shape and truncation semantics of ``walk_commits_between_result``."""

    def test_walk_result_is_typed_dict(self, tmp_path: pathlib.Path) -> None:
        """The result is a plain dict exposing commits, truncated and count."""
        repo = _repo(tmp_path)
        tip = _build_linear_chain(repo, 5)[0]
        walked = walk_commits_between_result(repo, tip, max_commits=100)
        assert isinstance(walked, dict)
        for key in ("commits", "truncated", "count"):
            assert key in walked

    def test_walk_result_not_truncated_when_chain_fits(
        self, tmp_path: pathlib.Path
    ) -> None:
        """A 10-commit chain under a cap of 100 is returned whole."""
        repo = _repo(tmp_path)
        tip = _build_linear_chain(repo, 10)[0]
        walked = walk_commits_between_result(repo, tip, max_commits=100)
        assert walked["truncated"] is False
        assert walked["count"] == 10
        assert len(walked["commits"]) == 10

    def test_walk_result_truncated_at_cap(self, tmp_path: pathlib.Path) -> None:
        """A 50-commit chain under a cap of 20 stops at 20 and is flagged."""
        repo = _repo(tmp_path)
        tip = _build_linear_chain(repo, 50)[0]
        walked = walk_commits_between_result(repo, tip, max_commits=20)
        assert walked["truncated"] is True
        assert walked["count"] == 20
        assert len(walked["commits"]) == 20

    def test_walk_commits_between_backward_compat(
        self, tmp_path: pathlib.Path
    ) -> None:
        """walk_commits_between still returns a list (here of 5 records)."""
        repo = _repo(tmp_path)
        tip = _build_linear_chain(repo, 5)[0]
        records = walk_commits_between(repo, tip, max_commits=100)
        assert isinstance(records, list)
        assert len(records) == 5

    def test_count_matches_len_commits(self, tmp_path: pathlib.Path) -> None:
        """``count`` equals ``len(commits)`` below, at and above the chain length."""
        repo = _repo(tmp_path)
        tip = _build_linear_chain(repo, 30)[0]
        for limit in (5, 10, 30, 100):
            walked = walk_commits_between_result(repo, tip, max_commits=limit)
            assert walked["count"] == len(walked["commits"])
197
198
199 # ---------------------------------------------------------------------------
200 # 2. 15k commit chain — truncation and correctness
201 # ---------------------------------------------------------------------------
202
203
@pytest.mark.slow
class TestLinearChainScale:
    """Truncation and ordering of ``walk_commits_between_result`` on long chains."""

    def test_15k_chain_walk_truncates_at_default_cap(
        self, tmp_path: pathlib.Path
    ) -> None:
        """15k chain, no explicit cap → truncated=True with 10k commits."""
        repo = _repo(tmp_path)
        tip = _build_linear_chain(repo, 15_000)[0]
        # max_commits is deliberately omitted so the function's default applies.
        walked = walk_commits_between_result(repo, tip)
        assert walked["truncated"] is True
        assert walked["count"] == 10_000

    def test_15k_chain_walk_completes_with_raised_cap(
        self, tmp_path: pathlib.Path
    ) -> None:
        """15k chain, cap=20k → truncated=False with all 15k commits."""
        repo = _repo(tmp_path)
        tip = _build_linear_chain(repo, 15_000)[0]
        walked = walk_commits_between_result(repo, tip, max_commits=20_000)
        assert walked["truncated"] is False
        assert walked["count"] == 15_000

    def test_15k_chain_order_newest_first(self, tmp_path: pathlib.Path) -> None:
        """The first commit returned is the tip (checked on a 100-commit chain)."""
        repo = _repo(tmp_path)
        chain = _build_linear_chain(repo, 100)
        newest = chain[0]
        walked = walk_commits_between_result(repo, newest, max_commits=200)
        first = walked["commits"][0]
        assert first.commit_id == newest  # newest first
232
233
234 # ---------------------------------------------------------------------------
235 # 3. Configurable caps via [limits] in config.toml
236 # ---------------------------------------------------------------------------
237
238
class TestConfigurableCaps:
    """``get_limit`` / ``get_config_value`` honour the ``[limits]`` table."""

    def test_walk_cap_from_config(self, tmp_path: pathlib.Path) -> None:
        """max_walk_commits written to config.toml is what get_limit returns."""
        repo = _repo(tmp_path)
        _build_linear_chain(repo, 200)
        _write_config(repo, {"max_walk_commits": 50})

        from muse.cli.config import get_limit
        assert get_limit("max_walk_commits", repo) == 50

    def test_graph_cap_from_config(self, tmp_path: pathlib.Path) -> None:
        """max_graph_commits is read back from config.toml."""
        repo = _repo(tmp_path)
        _write_config(repo, {"max_graph_commits": 1000})

        from muse.cli.config import get_limit
        configured = get_limit("max_graph_commits", repo)
        assert configured == 1000

    def test_ancestors_cap_from_config(self, tmp_path: pathlib.Path) -> None:
        """max_ancestors is read back from config.toml."""
        repo = _repo(tmp_path)
        _write_config(repo, {"max_ancestors": 999})

        from muse.cli.config import get_limit
        configured = get_limit("max_ancestors", repo)
        assert configured == 999

    def test_default_cap_when_config_absent(self, tmp_path: pathlib.Path) -> None:
        """With no config file written, the module defaults are returned."""
        repo = _repo(tmp_path)
        from muse.cli.config import (
            _DEFAULT_MAX_ANCESTORS,
            _DEFAULT_MAX_WALK_COMMITS,
            get_limit,
        )
        walk_cap = get_limit("max_walk_commits", repo)
        assert walk_cap == _DEFAULT_MAX_WALK_COMMITS
        ancestor_cap = get_limit("max_ancestors", repo)
        assert ancestor_cap == _DEFAULT_MAX_ANCESTORS

    def test_invalid_cap_ignored_uses_default(self, tmp_path: pathlib.Path) -> None:
        """An invalid limit (here -1) in config is ignored in favour of the default."""
        repo = _repo(tmp_path)
        _write_config(repo, {"max_walk_commits": -1})  # invalid

        from muse.cli.config import _DEFAULT_MAX_WALK_COMMITS, get_limit
        effective = get_limit("max_walk_commits", repo)
        assert effective == _DEFAULT_MAX_WALK_COMMITS

    def test_get_config_value_reads_limits(self, tmp_path: pathlib.Path) -> None:
        """The dotted key ``limits.max_walk_commits`` yields the value as a string."""
        repo = _repo(tmp_path)
        _write_config(repo, {"max_walk_commits": 777})

        from muse.cli.config import get_config_value
        raw = get_config_value("limits.max_walk_commits", repo)
        assert raw == "777"

    def test_limits_config_parsed_correctly(self, tmp_path: pathlib.Path) -> None:
        """All three limit keys are correctly parsed from config.toml."""
        repo = _repo(tmp_path)
        configured = {
            "max_walk_commits": 111,
            "max_ancestors": 222,
            "max_graph_commits": 333,
        }
        _write_config(repo, configured)

        from muse.cli.config import get_limit
        for key, expected in configured.items():
            assert get_limit(key, repo) == expected
301
302
303 # ---------------------------------------------------------------------------
304 # 4. find_merge_base — consistency at cap (both sides raise)
305 # ---------------------------------------------------------------------------
306
307
class TestFindMergeBaseAtCap:
    """``find_merge_base`` must raise ``MuseCLIError`` when the ancestor cap is hit.

    Every test builds the same graph shape — one common root with two
    diverging linear branches — through ``_make_diverging_branches``.
    """

    @staticmethod
    def _grow_branch(
        root: pathlib.Path,
        start: str,
        depth: int,
        label_prefix: str,
        branch: str,
    ) -> str:
        """Append *depth* linear commits on top of *start*; return the new tip.

        With ``depth == 0`` nothing is written and *start* is returned.
        """
        tip = start
        for i in range(depth):
            tip = _make_commit(
                root, f"{label_prefix}_{i}", parent=tip, branch=branch
            ).commit_id
        return tip

    def _make_diverging_branches(
        self, root: pathlib.Path, depth_a: int, depth_b: int
    ) -> tuple[str, str, str]:
        """Create a common root then two branches of given depths.

        Returns (tip_a, tip_b, common_id).
        """
        common_id = _make_commit(root, "common", "common root").commit_id
        tip_a = self._grow_branch(root, common_id, depth_a, "branch_a", "branch-a")
        tip_b = self._grow_branch(root, common_id, depth_b, "branch_b", "branch-b")
        return tip_a, tip_b, common_id

    def test_small_graph_finds_base_correctly(
        self, tmp_path: pathlib.Path
    ) -> None:
        """Two 5-deep branches resolve to their common root."""
        root = _repo(tmp_path)
        tip_a, tip_b, common_id = self._make_diverging_branches(root, 5, 5)
        assert find_merge_base(root, tip_a, tip_b) == common_id

    def test_a_side_cap_raises_muse_cli_error(
        self, tmp_path: pathlib.Path
    ) -> None:
        """A-side exceeding cap must raise MuseCLIError (not silently truncate)."""
        from muse.core.errors import MuseCLIError
        root = _repo(tmp_path)
        # Two branches each 110 commits deep from a common ancestor — the
        # merge base lies beyond the patched 100-ancestor cap.
        tip_a, tip_b, _ = self._make_diverging_branches(root, 110, 110)

        with mock.patch("muse.cli.config.get_limit", return_value=100):
            with pytest.raises(MuseCLIError, match="Ancestor graph exceeds"):
                find_merge_base(root, tip_a, tip_b)

    def test_b_side_cap_also_raises_muse_cli_error(
        self, tmp_path: pathlib.Path
    ) -> None:
        """B-side exceeding cap must also raise MuseCLIError — consistent behavior."""
        from muse.core.errors import MuseCLIError
        root = _repo(tmp_path)
        # A-side is short (5 commits, won't hit the cap); B-side is 110 deep.
        tip_a, tip_b, _ = self._make_diverging_branches(root, 5, 110)

        # With cap=100, B-side has 110 commits → raises
        with mock.patch("muse.cli.config.get_limit", return_value=100):
            with pytest.raises(MuseCLIError, match="Ancestor graph"):
                find_merge_base(root, tip_a, tip_b)

    def test_error_message_mentions_config_key(
        self, tmp_path: pathlib.Path
    ) -> None:
        """Error message must tell users how to raise the cap."""
        from muse.core.errors import MuseCLIError
        root = _repo(tmp_path)
        tip_a, tip_b, _ = self._make_diverging_branches(root, 110, 110)

        with mock.patch("muse.cli.config.get_limit", return_value=100):
            with pytest.raises(MuseCLIError) as exc_info:
                find_merge_base(root, tip_a, tip_b)

        message = str(exc_info.value)
        assert "max_ancestors" in message
        assert "config.toml" in message

    @pytest.mark.slow
    def test_60k_deep_branches_raise_not_wrong_answer(
        self, tmp_path: pathlib.Path
    ) -> None:
        """A 52k-deep branch under the unpatched cap: raises, never silently truncates.

        The test name says 60k, but 52k commits are built; this test was
        sized on the assumption that the default cap is 50k (TODO confirm
        against ``muse.cli.config``), so 52k exceeds it without excess.
        """
        from muse.core.errors import MuseCLIError
        root = _repo(tmp_path)
        # A-side: 52k commits; B-side: only 10 — A-side will hit the cap first.
        tip_a, tip_b, _ = self._make_diverging_branches(root, 52_000, 10)

        # find_merge_base must raise, not silently return None/wrong answer
        with pytest.raises(MuseCLIError, match="Ancestor graph exceeds"):
            find_merge_base(root, tip_a, tip_b)
428
429
430 # ---------------------------------------------------------------------------
431 # 5. _collect_all_commits — delegates to iter_ancestors (O(1) popleft
432 # guaranteed by graph.py; no O(n²) list.pop(0) pattern here)
433 # ---------------------------------------------------------------------------
434
435
class TestCollectAllCommitsPerformance:
    """Structure, return shape and timing of ``log._collect_all_commits``."""

    def test_uses_deque_not_list_for_bfs(self) -> None:
        """_collect_all_commits must delegate to iter_ancestors; no inline BFS."""
        from muse.cli.commands import log as log_mod
        import inspect
        import ast

        def is_pop_zero(node: ast.AST) -> bool:
            """True for a call of the form ``<expr>.pop(0)``."""
            if not isinstance(node, ast.Call):
                return False
            if not isinstance(node.func, ast.Attribute):
                return False
            if node.func.attr != "pop" or not node.args:
                return False
            first_arg = node.args[0]
            return isinstance(first_arg, ast.Constant) and first_arg.value == 0

        source = inspect.getsource(log_mod._collect_all_commits)
        # Walk the AST so that only real code (not the docstring) is inspected.
        pop0_calls: list[ast.AST] = [
            node for node in ast.walk(ast.parse(source)) if is_pop_zero(node)
        ]
        assert not pop0_calls, (
            "Found list.pop(0) in _collect_all_commits — this is the O(n²) bug. "
            "Replace with deque.popleft()."
        )
        assert "iter_ancestors" in source, (
            "_collect_all_commits must delegate to iter_ancestors. "
            "O(1) popleft is guaranteed by graph.py's walk_dag."
        )

    def test_collect_returns_tuple_with_truncated_flag(
        self, tmp_path: pathlib.Path
    ) -> None:
        """Returns ``(dict, bool)``; 20 commits under a cap of 100 are not truncated."""
        repo = _repo(tmp_path)
        tip = _build_linear_chain(repo, 20)[0]

        from muse.cli.commands.log import _collect_all_commits
        collected, was_truncated = _collect_all_commits(
            repo, [tip], max_commits=100
        )
        assert isinstance(collected, dict)
        assert isinstance(was_truncated, bool)
        assert was_truncated is False
        assert len(collected) == 20

    def test_collect_truncates_at_cap(self, tmp_path: pathlib.Path) -> None:
        """50 commits under a cap of 10 → truncated with exactly 10 collected."""
        repo = _repo(tmp_path)
        tip = _build_linear_chain(repo, 50)[0]

        from muse.cli.commands.log import _collect_all_commits
        collected, was_truncated = _collect_all_commits(
            repo, [tip], max_commits=10
        )
        assert was_truncated is True
        assert len(collected) == 10

    @pytest.mark.slow
    def test_10k_commits_completes_in_reasonable_time(
        self, tmp_path: pathlib.Path
    ) -> None:
        """10k BFS must complete in < 5s — proves O(n) not O(n²)."""
        repo = _repo(tmp_path)
        tip = _build_linear_chain(repo, 10_000)[0]

        from muse.cli.commands.log import _collect_all_commits
        started = time.perf_counter()
        collected, _ = _collect_all_commits(repo, [tip], max_commits=100_000)
        elapsed = time.perf_counter() - started

        assert len(collected) == 10_000
        assert elapsed < 5.0, (
            f"_collect_all_commits took {elapsed:.2f}s for 10k commits. "
            "Expected < 5s. O(n²) list.pop(0) would take ~50s."
        )
505
506
507 # ---------------------------------------------------------------------------
508 # 6. muse log --json streaming output with "truncated" field
509 # ---------------------------------------------------------------------------
510
511
class TestLogJsonOutput:
    """``muse log --json`` output: the ``truncated`` flag and the ``commits`` array."""

    def _run_log(self, root: pathlib.Path, *extra_args: str) -> _LogOutput:
        """Run muse log --json via CliRunner and parse the output.

        Any *extra_args* are appended to the command line after ``--json``.
        Returns an empty ``_LogOutput`` when the command prints nothing to
        stdout; otherwise stdout is parsed as a single JSON document.
        """
        runner = CliRunner()
        result = runner.invoke(
            None,
            ["log", "--json", *extra_args],
            env={"MUSE_REPO_ROOT": str(root)},
        )
        out = result.stdout.strip()
        if not out:
            return _LogOutput()
        parsed: _LogOutput = json.loads(out)
        return parsed

    def test_log_json_has_truncated_field(self, tmp_path: pathlib.Path) -> None:
        """muse log --json must include a 'truncated' key in output."""
        root = _repo(tmp_path)
        ids = _build_linear_chain(root, 5)
        (heads_dir(root) / "main").write_text(ids[0])

        output = self._run_log(root)
        assert "truncated" in output, (
            "muse log --json must include 'truncated' key. "
            "Agents rely on this to know whether to page."
        )

    def test_log_json_not_truncated_for_small_history(
        self, tmp_path: pathlib.Path
    ) -> None:
        """A 5-commit history is reported with ``truncated`` false."""
        root = _repo(tmp_path)
        ids = _build_linear_chain(root, 5)
        (heads_dir(root) / "main").write_text(ids[0])

        output = self._run_log(root)
        assert output["truncated"] is False

    def test_log_json_has_commits_array(self, tmp_path: pathlib.Path) -> None:
        """``commits`` is a list with one entry per commit in the chain."""
        root = _repo(tmp_path)
        ids = _build_linear_chain(root, 3)
        (heads_dir(root) / "main").write_text(ids[0])

        output = self._run_log(root)
        commits = output.get("commits")
        assert isinstance(commits, list)
        assert len(commits) == 3

    def test_log_json_commit_fields(self, tmp_path: pathlib.Path) -> None:
        """Each commit in JSON output has the required agent-facing fields."""
        root = _repo(tmp_path)
        ids = _build_linear_chain(root, 2)
        (heads_dir(root) / "main").write_text(ids[0])

        output = self._run_log(root)
        commits_raw = output.get("commits", [])
        assert isinstance(commits_raw, list)
        assert len(commits_raw) >= 1
        # Verify required fields are present in the raw dict
        first_raw = commits_raw[0]
        assert isinstance(first_raw, dict)
        required_fields = {
            "commit_id", "branch", "message", "author",
            "committed_at", "parent_commit_id", "snapshot_id",
            "metadata", "sem_ver_bump",
        }
        missing = required_fields - set(first_raw.keys())
        assert not missing, f"Missing fields: {missing}"

    def test_log_json_empty_history(self, tmp_path: pathlib.Path) -> None:
        """Empty history emits a valid JSON response (not an exception)."""
        root = _repo(tmp_path)
        # No commits, HEAD is empty — must not crash
        output = self._run_log(root)
        # Valid outcomes: nothing printed (empty dict) or a document that
        # carries a "commits" key, e.g. {"truncated":false,"commits":[]}.
        if output:
            assert "commits" in output
585
586
587 # ---------------------------------------------------------------------------
588 # 7. commit_graph — truncated in all output formats
589 # ---------------------------------------------------------------------------
590
591
class TestCommitGraphTruncation:
    """``commit-graph`` reports truncation in JSON, text and count-only output."""

    def _run_commit_graph(
        self,
        root: pathlib.Path,
        tip: str,
        fmt: str = "json",
        max_commits: int = 10_000,
        count_only: bool = False,
    ) -> str:
        """Invoke ``commit-graph`` through CliRunner and return its stdout.

        ``--json`` is added only when *fmt* is ``"json"`` and ``--count`` only
        when *count_only* is true; ``--tip`` and ``--max`` are always passed.
        """
        argv = ["commit-graph", "--tip", tip, "--max", str(max_commits)]
        optional_flags = (("--json", fmt == "json"), ("--count", count_only))
        argv.extend(flag for flag, enabled in optional_flags if enabled)
        invocation = CliRunner().invoke(
            None, argv, env={"MUSE_REPO_ROOT": str(root)}
        )
        return invocation.stdout

    def test_json_has_truncated_false_when_under_cap(
        self, tmp_path: pathlib.Path
    ) -> None:
        """10 commits under --max 100 → truncated false, count 10."""
        repo = _repo(tmp_path)
        tip = _build_linear_chain(repo, 10)[0]
        payload = json.loads(self._run_commit_graph(repo, tip, max_commits=100))
        assert payload["truncated"] is False
        assert payload["count"] == 10

    def test_json_has_truncated_true_when_over_cap(
        self, tmp_path: pathlib.Path
    ) -> None:
        """50 commits under --max 20 → truncated true, count 20."""
        repo = _repo(tmp_path)
        tip = _build_linear_chain(repo, 50)[0]
        payload = json.loads(self._run_commit_graph(repo, tip, max_commits=20))
        assert payload["truncated"] is True
        assert payload["count"] == 20

    def test_text_format_has_truncated_comment_when_over_cap(
        self, tmp_path: pathlib.Path
    ) -> None:
        """Text output mentions TRUNCATED when the cap is hit."""
        repo = _repo(tmp_path)
        tip = _build_linear_chain(repo, 50)[0]
        text = self._run_commit_graph(repo, tip, fmt="text", max_commits=10)
        assert "TRUNCATED" in text, (
            "text format must emit '# TRUNCATED' when cap is hit"
        )

    def test_text_format_no_truncated_when_under_cap(
        self, tmp_path: pathlib.Path
    ) -> None:
        """Text output stays free of TRUNCATED when the walk completes."""
        repo = _repo(tmp_path)
        tip = _build_linear_chain(repo, 5)[0]
        text = self._run_commit_graph(repo, tip, fmt="text", max_commits=100)
        assert "TRUNCATED" not in text

    def test_count_only_has_truncated_field(
        self, tmp_path: pathlib.Path
    ) -> None:
        """--count output still carries the truncated flag next to the count."""
        repo = _repo(tmp_path)
        tip = _build_linear_chain(repo, 50)[0]
        raw = self._run_commit_graph(repo, tip, max_commits=20, count_only=True)
        payload = json.loads(raw)
        assert "truncated" in payload, "count-only must include 'truncated'"
        assert payload["truncated"] is True
        assert payload["count"] == 20

    @pytest.mark.slow
    def test_15k_chain_commit_graph_completes_in_30s(
        self, tmp_path: pathlib.Path
    ) -> None:
        """commit-graph on 15k commits must complete in < 30s."""
        repo = _repo(tmp_path)
        tip = _build_linear_chain(repo, 15_000)[0]

        # Time the CLI invocation together with JSON parsing of its output.
        started = time.perf_counter()
        payload = json.loads(self._run_commit_graph(repo, tip, max_commits=10_000))
        elapsed = time.perf_counter() - started

        assert payload["truncated"] is True
        assert elapsed < 30.0, (
            f"commit-graph on 15k commits took {elapsed:.1f}s — must be < 30s"
        )
674
675
676 # ---------------------------------------------------------------------------
677 # 8. Regression: B-side was returning None silently (old bug)
678 # ---------------------------------------------------------------------------
679
680
class TestMergeBaseConsistency:
    """Cap handling and argument symmetry of ``find_merge_base``."""

    def test_a_and_b_cap_raise_same_type(self, tmp_path: pathlib.Path) -> None:
        """A walk that exceeds the ancestor cap must surface as MuseCLIError.

        Only the (newest, halfway) argument order is exercised here.
        """
        from muse.core.errors import MuseCLIError
        repo = _repo(tmp_path)

        # 30-commit linear chain: the newest commit as A, the midpoint as B.
        chain = _build_linear_chain(repo, 30)
        newest, halfway = chain[0], chain[15]

        # Patched cap of 20 is below the 30 commits reachable from the tip.
        with mock.patch("muse.cli.config.get_limit", return_value=20), \
                pytest.raises(MuseCLIError):
            find_merge_base(repo, newest, halfway)

    def test_symmetric_result_for_small_graph(
        self, tmp_path: pathlib.Path
    ) -> None:
        """find_merge_base(a, b) == find_merge_base(b, a) for a small graph."""
        repo = _repo(tmp_path)
        common_id = _make_commit(repo, "sym_root").commit_id

        # Two single-commit branches forking from the same root.
        tip_a = _make_commit(repo, "sym_a_1", parent=common_id).commit_id
        tip_b = _make_commit(repo, "sym_b_1", parent=common_id).commit_id

        forward = find_merge_base(repo, tip_a, tip_b)
        backward = find_merge_base(repo, tip_b, tip_a)
        assert forward == backward == common_id
713
714
715 # ---------------------------------------------------------------------------
716 # 9. walk_commits_between_result — from_commit_id exclusion
717 # ---------------------------------------------------------------------------
718
719
class TestWalkFromCommitExclusion:
    """``from_commit_id`` bounds the walk and is itself left out of the result."""

    def test_from_commit_excluded(self, tmp_path: pathlib.Path) -> None:
        """from_commit_id is exclusive — it must not appear in the result."""
        repo = _repo(tmp_path)
        chain = _build_linear_chain(repo, 10)  # newest first
        boundary = chain[5]  # the walk must stop before this commit
        walked = walk_commits_between_result(
            repo, chain[0], from_commit_id=boundary
        )
        seen = {commit.commit_id for commit in walked["commits"]}
        assert boundary not in seen
        assert walked["truncated"] is False
        assert walked["count"] == 5  # chain[0]..chain[4]

    def test_no_from_commit_walks_all(self, tmp_path: pathlib.Path) -> None:
        """Without a boundary, all 20 commits are returned untruncated."""
        repo = _repo(tmp_path)
        tip = _build_linear_chain(repo, 20)[0]
        walked = walk_commits_between_result(repo, tip, max_commits=100)
        assert walked["count"] == 20
        assert walked["truncated"] is False
File History 1 commit