gabriel / muse public

test_core_query_engine.py file-level

at sha256:c · View file ↗ · Intel ↗

History
1 files
1 commits
0 hotspots
0 🧊 dead
0 💥 blast risk
sha256:4 Merge branch 'dev' into main · gabriel · Jun 17, 2026
1 """Tests for the generic query engine in muse/core/query_engine.py.
2
3 Also contains regression tests that prove the two dead walkers in
4 ``muse.plugins.code._query`` (``walk_commits`` and ``walk_commits_range``) are
5 fully covered by the live walkers (``walk_commits_bfs`` and
6 ``store.walk_commits_between``) before those dead functions are deleted.
7 """
8
9 import datetime
10 import pathlib
11 import tempfile
12
13 import pytest
14
15 from muse.core.query_engine import QueryMatch, format_matches, walk_history
16 from muse.core.ids import hash_commit as compute_commit_id, hash_snapshot as compute_snapshot_id
17 from muse.core.commits import (
18 CommitRecord,
19 walk_commits_between,
20 write_commit,
21 )
22 from muse.plugins.code._query import walk_commits_bfs
23 from muse.core.types import Manifest
24 from muse.core.paths import heads_dir, muse_dir
25
26
27 # ---------------------------------------------------------------------------
28 # Helpers
29 # ---------------------------------------------------------------------------
30
31
def _make_repo(tmp_path: pathlib.Path) -> pathlib.Path:
    """Create the bare on-disk layout the query-engine tests need.

    Inside the directory returned by ``muse_dir(tmp_path)`` this writes
    ``repo.json`` and ``HEAD`` and creates the ``commits``, ``snapshots`` and
    ``refs/heads`` directories.  Returns *tmp_path* unchanged so the call can
    be used directly as the repo root.
    """
    dot_muse = muse_dir(tmp_path)
    dot_muse.mkdir()

    seed_files = {
        "repo.json": '{"repo_id":"test-repo"}',
        "HEAD": "ref: refs/heads/main",
    }
    for filename, text in seed_files.items():
        (dot_muse / filename).write_text(text)

    for store_name in ("commits", "snapshots"):
        (dot_muse / store_name).mkdir()
    # "refs" does not exist yet, so the intermediate directory must be created.
    (dot_muse / "refs" / "heads").mkdir(parents=True)
    return tmp_path
42
43
def _write_commit(root: pathlib.Path, label: str, parent_id: str | None = None) -> CommitRecord:
    """Persist a root or single-parent commit on ``main`` and return its record.

    The commit id comes from ``compute_commit_id`` over the parent list, the
    snapshot id of an empty mapping, the message, the current UTC time and a
    fixed author.  *label* only feeds the commit message.
    """
    snapshot = compute_snapshot_id({})
    now_utc = datetime.datetime.now(datetime.timezone.utc)
    message = f"commit {label}"
    author = "test-author"
    # A falsy parent (None or "") is hashed as "no parents".
    ancestry = [] if not parent_id else [parent_id]

    record = CommitRecord(
        commit_id=compute_commit_id(
            parent_ids=ancestry,
            snapshot_id=snapshot,
            message=message,
            committed_at_iso=now_utc.isoformat(),
            author=author,
        ),
        branch="main",
        snapshot_id=snapshot,
        message=message,
        committed_at=now_utc,
        parent_commit_id=parent_id,
        author=author,
    )
    write_commit(root, record)
    return record
67
68
69 # ---------------------------------------------------------------------------
70 # walk_history
71 # ---------------------------------------------------------------------------
72
73
class TestWalkHistory:
    """Exercise ``walk_history`` over linear histories in a scratch repo.

    Each test builds its repo under pytest's ``tmp_path`` fixture, the same
    mechanism the other test classes in this module use.
    """

    def test_empty_branch_returns_empty(self, tmp_path: pathlib.Path) -> None:
        """No head ref has been written for ``main``, so nothing is returned."""
        root = _make_repo(tmp_path)
        results = walk_history(root, "main", lambda c, m, r: [])
        assert results == []

    def test_single_commit_visited(self, tmp_path: pathlib.Path) -> None:
        """A one-commit branch hands exactly that commit to the evaluator."""
        root = _make_repo(tmp_path)
        c = _write_commit(root, "aaa111")
        (heads_dir(root) / "main").write_text(c.commit_id)

        visited: list[str] = []

        def evaluator(commit: CommitRecord, manifest: Manifest, r: pathlib.Path) -> list[QueryMatch]:
            visited.append(commit.commit_id)
            return []

        walk_history(root, "main", evaluator, load_manifest=False)
        assert visited == [c.commit_id]

    def test_chain_walked_newest_first(self, tmp_path: pathlib.Path) -> None:
        """A two-commit chain is visited child first, then parent."""
        root = _make_repo(tmp_path)
        c_aaa = _write_commit(root, "aaa111")
        c_bbb = _write_commit(root, "bbb222", parent_id=c_aaa.commit_id)
        (heads_dir(root) / "main").write_text(c_bbb.commit_id)

        visited: list[str] = []

        def evaluator(commit: CommitRecord, manifest: Manifest, r: pathlib.Path) -> list[QueryMatch]:
            visited.append(commit.commit_id)
            return []

        walk_history(root, "main", evaluator, load_manifest=False)
        assert visited == [c_bbb.commit_id, c_aaa.commit_id]

    def test_matches_collected(self, tmp_path: pathlib.Path) -> None:
        """Matches returned by the evaluator appear in the walk's result."""
        root = _make_repo(tmp_path)
        c = _write_commit(root, "ccc333")
        (heads_dir(root) / "main").write_text(c.commit_id)

        def evaluator(commit: CommitRecord, manifest: Manifest, r: pathlib.Path) -> list[QueryMatch]:
            return [QueryMatch(
                commit_id=commit.commit_id,
                author=commit.author,
                committed_at=commit.committed_at.isoformat(),
                branch=commit.branch,
                detail="test match",
                extra={},
            )]

        results = walk_history(root, "main", evaluator, load_manifest=False)
        assert len(results) == 1
        assert results[0]["detail"] == "test match"

    def test_max_commits_limits_walk(self, tmp_path: pathlib.Path) -> None:
        """``max_commits=3`` stops a ten-commit chain after three evaluations."""
        root = _make_repo(tmp_path)
        records: list[CommitRecord] = []
        for i in range(10):
            # Chain each commit onto the previously written one, if any.
            parent_id = records[-1].commit_id if records else None
            records.append(_write_commit(root, f"commit{i:03d}", parent_id=parent_id))
        (heads_dir(root) / "main").write_text(records[-1].commit_id)

        visited: list[str] = []

        def evaluator(commit: CommitRecord, manifest: Manifest, r: pathlib.Path) -> list[QueryMatch]:
            visited.append(commit.commit_id)
            return []

        walk_history(root, "main", evaluator, max_commits=3, load_manifest=False)
        assert len(visited) == 3

    def test_head_commit_id_override(self, tmp_path: pathlib.Path) -> None:
        """``head_commit_id`` takes precedence over the branch's head ref."""
        root = _make_repo(tmp_path)
        c_aaa = _write_commit(root, "aaa111")
        c_bbb = _write_commit(root, "bbb222", parent_id=c_aaa.commit_id)
        # The branch ref points to bbb222, but the walk is told to start at aaa111.
        (heads_dir(root) / "main").write_text(c_bbb.commit_id)

        visited: list[str] = []

        def evaluator(commit: CommitRecord, manifest: Manifest, r: pathlib.Path) -> list[QueryMatch]:
            visited.append(commit.commit_id)
            return []

        walk_history(root, "main", evaluator, head_commit_id=c_aaa.commit_id, load_manifest=False)
        assert visited == [c_aaa.commit_id]
166
167
168 # ---------------------------------------------------------------------------
169 # format_matches
170 # ---------------------------------------------------------------------------
171
172
class TestFormatMatches:
    """Checks on the text rendered by ``format_matches``."""

    @staticmethod
    def _numbered_matches(count: int) -> list[QueryMatch]:
        """Build *count* matches whose commit id and detail embed their index."""
        batch: list[QueryMatch] = []
        for index in range(count):
            batch.append(
                QueryMatch(
                    commit_id=f"commit{index:04d}",
                    author="x",
                    committed_at="2026-01-01T00:00:00+00:00",
                    branch="main",
                    detail=f"match {index}",
                    extra={},
                )
            )
        return batch

    def test_empty_returns_no_matches(self) -> None:
        """An empty match list renders a 'No matches' notice."""
        rendered = format_matches([])
        assert "No matches" in rendered

    def test_single_match_formatted(self) -> None:
        """The 8-char id prefix, author and detail all appear in the output."""
        full_id = "a" * 64
        match = QueryMatch(
            commit_id=full_id,
            author="gabriel",
            committed_at="2026-03-18T12:00:00+00:00",
            branch="main",
            detail="my_function (added)",
            extra={},
        )
        rendered = format_matches([match])
        assert full_id[:8] in rendered
        assert "gabriel" in rendered
        assert "my_function (added)" in rendered

    def test_agent_id_shown_when_present(self) -> None:
        """A match carrying ``agent_id`` has that id rendered."""
        match = QueryMatch(
            commit_id="a" * 64,
            author="bot",
            committed_at="2026-03-18T12:00:00+00:00",
            branch="main",
            detail="something",
            extra={},
            agent_id="claude-v4",
        )
        rendered = format_matches([match])
        assert "claude-v4" in rendered

    def test_max_results_truncation_message_updated(self) -> None:
        """format_matches uses '--limit' in the truncation hint (not '--max')."""
        rendered = format_matches(self._numbered_matches(10), max_results=5)
        assert "--limit" in rendered

    def test_max_results_capped(self) -> None:
        """With 100 matches and ``max_results=5`` the output reports 95 more."""
        rendered = format_matches(self._numbered_matches(100), max_results=5)
        assert "95 more" in rendered
234
235
236 # ---------------------------------------------------------------------------
237 # Regression tests: dead walkers covered by live walkers
238 #
239 # These tests prove that walk_commits_bfs and store.walk_commits_between
240 # fully cover the use-cases of the dead walk_commits and walk_commits_range
241 # before those functions are deleted. If these tests pass, deletion is safe.
242 # ---------------------------------------------------------------------------
243
244
def _make_repo_for_walker(tmp_path: pathlib.Path) -> pathlib.Path:
    """Create the on-disk layout used by the walker regression tests.

    Writes ``repo.json`` and ``HEAD`` into the directory returned by
    ``muse_dir(tmp_path)`` and creates the ``commits``, ``snapshots`` and
    ``refs/heads`` directories.  Returns *tmp_path* as the repo root.
    """
    dot_muse = muse_dir(tmp_path)
    dot_muse.mkdir()

    seed_files = (
        ("repo.json", '{"repo_id":"walker-test"}'),
        ("HEAD", "main"),
    )
    for filename, text in seed_files:
        (dot_muse / filename).write_text(text)

    for store_name in ("commits", "snapshots"):
        (dot_muse / store_name).mkdir()
    # "refs" does not exist yet, so the intermediate directory must be created.
    (dot_muse / "refs" / "heads").mkdir(parents=True)
    return tmp_path
254
255
def _commit(
    root: pathlib.Path,
    label: str,
    parent: str | None = None,
    parent2: str | None = None,
) -> CommitRecord:
    """Persist a commit with up to two parents on ``main`` and return its record.

    The timestamp is pinned to 2026-01-01 UTC, so the id produced by
    ``compute_commit_id`` varies only with the parents and *label* (which
    feeds the message).  Passing *parent2* records the commit as a merge.
    """
    snapshot = compute_snapshot_id({})
    stamp = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc)
    message = f"msg {label}"
    author = "tester"

    # Only parents that were actually supplied take part in the hash.
    lineage: list[str] = []
    for candidate in (parent, parent2):
        if candidate is not None:
            lineage.append(candidate)

    record = CommitRecord(
        commit_id=compute_commit_id(
            parent_ids=lineage,
            snapshot_id=snapshot,
            message=message,
            committed_at_iso=stamp.isoformat(),
            author=author,
        ),
        branch="main",
        snapshot_id=snapshot,
        message=message,
        committed_at=stamp,
        parent_commit_id=parent,
        parent2_commit_id=parent2,
        author=author,
    )
    write_commit(root, record)
    return record
285
286
class TestWalkHistoryFollowMerges:
    """Belt-and-suspenders coverage of ``walk_history``'s ``follow_merges`` flag."""

    def test_follow_merges_false_skips_parent2(
        self, tmp_path: pathlib.Path
    ) -> None:
        """With follow_merges=False the merge's second parent is never visited."""
        repo = _make_repo_for_walker(tmp_path)
        trunk = _commit(repo, "main1")
        side = _commit(repo, "feat1", parent=trunk.commit_id)
        merged = _commit(repo, "merge_c", parent=trunk.commit_id, parent2=side.commit_id)
        (heads_dir(repo) / "main").write_text(merged.commit_id)

        seen: list[str] = []

        def record(commit: CommitRecord, manifest: Manifest, repo_root: pathlib.Path) -> list[QueryMatch]:
            seen.append(commit.commit_id)
            return []

        walk_history(repo, "main", record, follow_merges=False, load_manifest=False)
        assert side.commit_id not in seen
        assert merged.commit_id in seen
        assert trunk.commit_id in seen

    def test_follow_merges_true_visits_parent2(
        self, tmp_path: pathlib.Path
    ) -> None:
        """With follow_merges=True both parents of a merge commit are visited."""
        repo = _make_repo_for_walker(tmp_path)
        base = _commit(repo, "base")
        feature = _commit(repo, "feature", parent=base.commit_id)
        merged = _commit(repo, "merge_c", parent=base.commit_id, parent2=feature.commit_id)
        (heads_dir(repo) / "main").write_text(merged.commit_id)

        seen: list[str] = []

        def record(commit: CommitRecord, manifest: Manifest, repo_root: pathlib.Path) -> list[QueryMatch]:
            seen.append(commit.commit_id)
            return []

        walk_history(repo, "main", record, follow_merges=True, load_manifest=False)
        expected = {merged.commit_id, base.commit_id, feature.commit_id}
        assert set(seen) == expected

    def test_follow_merges_true_linear_chain(
        self, tmp_path: pathlib.Path
    ) -> None:
        """On a linear chain both flag values visit the same set of commits."""
        repo = _make_repo_for_walker(tmp_path)
        first = _commit(repo, "a")
        second = _commit(repo, "b", parent=first.commit_id)
        third = _commit(repo, "c", parent=second.commit_id)
        (heads_dir(repo) / "main").write_text(third.commit_id)

        seen_without: list[str] = []
        seen_with: list[str] = []

        def record_without(commit: CommitRecord, manifest: Manifest, repo_root: pathlib.Path) -> list[QueryMatch]:
            seen_without.append(commit.commit_id)
            return []

        def record_with(commit: CommitRecord, manifest: Manifest, repo_root: pathlib.Path) -> list[QueryMatch]:
            seen_with.append(commit.commit_id)
            return []

        walk_history(repo, "main", record_without, follow_merges=False, load_manifest=False)
        walk_history(repo, "main", record_with, follow_merges=True, load_manifest=False)
        whole_chain = {first.commit_id, second.commit_id, third.commit_id}
        assert set(seen_without) == set(seen_with) == whole_chain

    def test_follow_merges_since_filter_applies(
        self, tmp_path: pathlib.Path
    ) -> None:
        """The ``since`` cutoff still excludes older commits with follow_merges=True."""
        repo = _make_repo_for_walker(tmp_path)
        # Explicit timestamps on either side of the cutoff keep the test deterministic.
        utc = datetime.timezone.utc
        stamp_old = datetime.datetime(2020, 1, 1, tzinfo=utc)
        stamp_new = datetime.datetime(2026, 1, 1, tzinfo=utc)

        snapshot = compute_snapshot_id({})
        old_id = compute_commit_id(
            parent_ids=[],
            snapshot_id=snapshot,
            message="old",
            committed_at_iso=stamp_old.isoformat(),
            author="tester",
        )
        write_commit(
            repo,
            CommitRecord(
                commit_id=old_id,
                branch="main",
                snapshot_id=snapshot,
                message="old",
                committed_at=stamp_old,
                author="tester",
            ),
        )

        new_id = compute_commit_id(
            parent_ids=[old_id],
            snapshot_id=snapshot,
            message="new",
            committed_at_iso=stamp_new.isoformat(),
            author="tester",
        )
        write_commit(
            repo,
            CommitRecord(
                commit_id=new_id,
                branch="main",
                snapshot_id=snapshot,
                message="new",
                committed_at=stamp_new,
                parent_commit_id=old_id,
                author="tester",
            ),
        )
        (heads_dir(repo) / "main").write_text(new_id)

        seen: list[str] = []

        def record(commit: CommitRecord, manifest: Manifest, repo_root: pathlib.Path) -> list[QueryMatch]:
            seen.append(commit.commit_id)
            return []

        cutoff = datetime.datetime(2025, 1, 1, tzinfo=utc)
        walk_history(repo, "main", record, follow_merges=True, since=cutoff, load_manifest=False)
        assert new_id in seen
        assert old_id not in seen

    def test_follow_merges_true_diamond_dag_no_duplicates(
        self, tmp_path: pathlib.Path
    ) -> None:
        """A commit reachable through both merge parents is visited only once."""
        repo = _make_repo_for_walker(tmp_path)
        # Diamond: base ← left ← merge, base ← right ← merge
        base = _commit(repo, "base")
        left = _commit(repo, "left", parent=base.commit_id)
        right = _commit(repo, "right", parent=base.commit_id)
        merged = _commit(repo, "merge_c", parent=left.commit_id, parent2=right.commit_id)
        (heads_dir(repo) / "main").write_text(merged.commit_id)

        seen: list[str] = []

        def record(commit: CommitRecord, manifest: Manifest, repo_root: pathlib.Path) -> list[QueryMatch]:
            seen.append(commit.commit_id)
            return []

        walk_history(repo, "main", record, follow_merges=True, load_manifest=False)
        unique_seen = set(seen)
        # No id may appear twice in the visit log.
        assert len(seen) == len(unique_seen)
        assert unique_seen == {base.commit_id, left.commit_id, right.commit_id, merged.commit_id}

    def test_follow_merges_max_commits_respected(
        self, tmp_path: pathlib.Path
    ) -> None:
        """``max_commits`` still caps the walk when follow_merges=True."""
        repo = _make_repo_for_walker(tmp_path)
        first = _commit(repo, "c1")
        second = _commit(repo, "c2", parent=first.commit_id)
        third = _commit(repo, "c3", parent=second.commit_id)
        fourth = _commit(repo, "c4", parent=third.commit_id)
        (heads_dir(repo) / "main").write_text(fourth.commit_id)

        seen: list[str] = []

        def record(commit: CommitRecord, manifest: Manifest, repo_root: pathlib.Path) -> list[QueryMatch]:
            seen.append(commit.commit_id)
            return []

        walk_history(repo, "main", record, follow_merges=True, max_commits=2, load_manifest=False)
        assert len(seen) == 2

    def test_follow_merges_evaluator_sees_match(
        self, tmp_path: pathlib.Path
    ) -> None:
        """A match produced on a second-parent commit appears in the results."""
        repo = _make_repo_for_walker(tmp_path)
        base = _commit(repo, "base")
        feature = _commit(repo, "feature", parent=base.commit_id)
        merged = _commit(repo, "merge_c", parent=base.commit_id, parent2=feature.commit_id)
        (heads_dir(repo) / "main").write_text(merged.commit_id)

        def only_feature(commit: CommitRecord, manifest: Manifest, repo_root: pathlib.Path) -> list[QueryMatch]:
            # Report a match for the feature commit and nothing else.
            if commit.commit_id != feature.commit_id:
                return []
            return [QueryMatch(
                commit_id=commit.commit_id,
                author=commit.author,
                committed_at=commit.committed_at.isoformat(),
                branch=commit.branch,
                detail="feature found",
                extra={},
            )]

        results = walk_history(repo, "main", only_feature, follow_merges=True, load_manifest=False)
        assert len(results) == 1
        assert results[0]["detail"] == "feature found"

    def test_follow_merges_false_misses_parent2_commit(
        self, tmp_path: pathlib.Path
    ) -> None:
        """With follow_merges=False a second-parent commit yields no match."""
        repo = _make_repo_for_walker(tmp_path)
        base = _commit(repo, "base")
        feature = _commit(repo, "feature", parent=base.commit_id)
        merged = _commit(repo, "merge_c", parent=base.commit_id, parent2=feature.commit_id)
        (heads_dir(repo) / "main").write_text(merged.commit_id)

        def only_feature(commit: CommitRecord, manifest: Manifest, repo_root: pathlib.Path) -> list[QueryMatch]:
            # Report a match for the feature commit and nothing else.
            if commit.commit_id != feature.commit_id:
                return []
            return [QueryMatch(
                commit_id=commit.commit_id,
                author=commit.author,
                committed_at=commit.committed_at.isoformat(),
                branch=commit.branch,
                detail="feature found",
                extra={},
            )]

        results = walk_history(repo, "main", only_feature, follow_merges=False, load_manifest=False)
        assert results == []  # the feature commit is never evaluated
506
507
class TestLiveWalkersContracts:
    """Regression: walk_commits_bfs and walk_commits_between cover deleted walkers.

    These tests lock down the contracts of the surviving walkers, proving
    the deleted walk_commits and walk_commits_range are fully superseded.
    """

    def test_walk_commits_bfs_linear_chain(self, tmp_path: pathlib.Path) -> None:
        """walk_commits_bfs on a linear chain returns every commit, untruncated.

        Only membership is asserted here, not ordering.
        """
        root = _make_repo_for_walker(tmp_path)
        c_aaa = _commit(root, "aaa")
        c_bbb = _commit(root, "bbb", parent=c_aaa.commit_id)
        c_ccc = _commit(root, "ccc", parent=c_bbb.commit_id)

        live_commits, truncated = walk_commits_bfs(root, c_ccc.commit_id)
        live_ids = {c.commit_id for c in live_commits}

        assert truncated is False
        assert live_ids == {c_aaa.commit_id, c_bbb.commit_id, c_ccc.commit_id}

    def test_walk_commits_bfs_follows_parent2(self, tmp_path: pathlib.Path) -> None:
        """walk_commits_bfs reaches parent2 branches — supersedes dead linear walker."""
        root = _make_repo_for_walker(tmp_path)
        c_base = _commit(root, "base")
        c_feature = _commit(root, "feature", parent=c_base.commit_id)
        c_merge = _commit(root, "merge_commit", parent=c_base.commit_id, parent2=c_feature.commit_id)

        live_commits, _ = walk_commits_bfs(root, c_merge.commit_id)
        live_ids = {c.commit_id for c in live_commits}

        assert c_feature.commit_id in live_ids
        assert c_base.commit_id in live_ids
        assert c_merge.commit_id in live_ids

    def test_walk_commits_between_range(self, tmp_path: pathlib.Path) -> None:
        """walk_commits_between excludes from_commit_id — supersedes walk_commits_range."""
        root = _make_repo_for_walker(tmp_path)
        c1 = _commit(root, "c1")
        c2 = _commit(root, "c2", parent=c1.commit_id)
        c3 = _commit(root, "c3", parent=c2.commit_id)
        c4 = _commit(root, "c4", parent=c3.commit_id)

        result = walk_commits_between(root, to_commit_id=c4.commit_id, from_commit_id=c1.commit_id)
        ids = [c.commit_id for c in result]

        # Child-to-parent order, stopping before the excluded lower bound.
        assert ids == [c4.commit_id, c3.commit_id, c2.commit_id]
        assert c1.commit_id not in ids

    def test_walk_commits_between_none_from(self, tmp_path: pathlib.Path) -> None:
        """walk_commits_between with from_commit_id=None returns entire chain."""
        root = _make_repo_for_walker(tmp_path)
        c_x1 = _commit(root, "x1")
        c_x2 = _commit(root, "x2", parent=c_x1.commit_id)

        ids = [c.commit_id for c in walk_commits_between(root, c_x2.commit_id, None)]
        assert ids == [c_x2.commit_id, c_x1.commit_id]

    def test_walk_commits_bfs_stop_at_excludes_boundary(
        self, tmp_path: pathlib.Path
    ) -> None:
        """walk_commits_bfs stop_at_commit_id excludes the boundary — same contract as walk_commits_between."""
        root = _make_repo_for_walker(tmp_path)
        c_p1 = _commit(root, "p1")
        c_p2 = _commit(root, "p2", parent=c_p1.commit_id)
        c_p3 = _commit(root, "p3", parent=c_p2.commit_id)

        bfs_commits, _ = walk_commits_bfs(root, c_p3.commit_id, stop_at_commit_id=c_p1.commit_id)
        bfs_ids = [c.commit_id for c in bfs_commits]

        assert c_p1.commit_id not in bfs_ids
        assert set(bfs_ids) == {c_p3.commit_id, c_p2.commit_id}