gabriel / muse public
test_core_query_engine.py python
574 lines 22.2 KB
Raw
sha256:c06a9b9b9fee26c68ea725b44d54b2c0a171301ce9de746d5b656617b4463a9a fix: repair four test failures from post-migration audit Sonnet 4.6 patch 29 days ago
1 """Tests for the generic query engine in muse/core/query_engine.py.
2
3 Also contains regression tests that prove the two dead walkers in
4 ``muse.plugins.code._query`` (``walk_commits`` and ``walk_commits_range``) are
5 fully covered by the live walkers (``walk_commits_bfs`` and
6 ``store.walk_commits_between``) before those dead functions are deleted.
7 """
8
9 import datetime
10 import pathlib
11 import tempfile
12
13 import pytest
14
15 from muse.core.query_engine import QueryMatch, format_matches, walk_history
16 from muse.core.ids import hash_commit as compute_commit_id, hash_snapshot as compute_snapshot_id
17 from muse.core.store import CommitRecord, write_commit, walk_commits_between
18 from muse.plugins.code._query import walk_commits_bfs
19 from muse.core.types import Manifest
20 from muse.core.paths import heads_dir, muse_dir
21
22
23 # ---------------------------------------------------------------------------
24 # Helpers
25 # ---------------------------------------------------------------------------
26
27
def _make_repo(tmp_path: pathlib.Path) -> pathlib.Path:
    """Build the minimal on-disk repo layout used by the query-engine tests.

    Creates the directory returned by ``muse_dir(tmp_path)``, writes
    ``repo.json`` and ``HEAD`` into it, and adds empty ``commits``,
    ``snapshots`` and ``refs/heads`` directories.

    Returns:
        *tmp_path* unchanged, so callers can use it as the repo root.
    """
    dot_muse = muse_dir(tmp_path)
    dot_muse.mkdir()

    seed_files = (
        ("repo.json", '{"repo_id":"test-repo"}'),
        ("HEAD", "ref: refs/heads/main"),
    )
    for filename, content in seed_files:
        (dot_muse / filename).write_text(content)

    for subdir in ("commits", "snapshots"):
        (dot_muse / subdir).mkdir()
    (dot_muse / "refs" / "heads").mkdir(parents=True)
    return tmp_path
38
39
def _write_commit(root: pathlib.Path, label: str, parent_id: str | None = None) -> CommitRecord:
    """Store a commit on branch ``main`` under *root* and return its record.

    The commit id is produced by ``compute_commit_id`` from the parent list,
    the snapshot id of an empty mapping, the message, the current UTC time
    and a fixed author. *label* only ends up in the commit message.

    Args:
        root: Repository root passed through to ``write_commit``.
        label: Text embedded in the message as ``"commit <label>"``.
        parent_id: Optional id of the single parent commit.
    """
    message = f"commit {label}"
    author = "test-author"
    empty_snapshot_id = compute_snapshot_id({})
    now_utc = datetime.datetime.now(datetime.timezone.utc)

    record = CommitRecord(
        commit_id=compute_commit_id(
            # A falsy parent_id means "root commit": hash with no parents.
            parent_ids=[parent_id] if parent_id else [],
            snapshot_id=empty_snapshot_id,
            message=message,
            committed_at_iso=now_utc.isoformat(),
            author=author,
        ),
        branch="main",
        snapshot_id=empty_snapshot_id,
        message=message,
        committed_at=now_utc,
        parent_commit_id=parent_id,
        author=author,
    )
    write_commit(root, record)
    return record
63
64
65 # ---------------------------------------------------------------------------
66 # walk_history
67 # ---------------------------------------------------------------------------
68
69
class TestWalkHistory:
    """Exercise ``walk_history`` against small hand-built commit chains."""

    def test_empty_branch_returns_empty(self) -> None:
        """With no head ref written for ``main`` the walk yields no matches."""
        with tempfile.TemporaryDirectory() as scratch:
            repo = _make_repo(pathlib.Path(scratch))
            found = walk_history(repo, "main", lambda c, m, r: [])
            assert found == []

    def test_single_commit_visited(self) -> None:
        """A one-commit branch results in exactly one evaluator call."""
        seen: list[str] = []

        def evaluator(commit: CommitRecord, manifest: Manifest, r: pathlib.Path) -> list[QueryMatch]:
            seen.append(commit.commit_id)
            return []

        with tempfile.TemporaryDirectory() as scratch:
            repo = _make_repo(pathlib.Path(scratch))
            only = _write_commit(repo, "aaa111")
            (heads_dir(repo) / "main").write_text(only.commit_id)

            walk_history(repo, "main", evaluator, load_manifest=False)
            assert seen == [only.commit_id]

    def test_chain_walked_newest_first(self) -> None:
        """On a two-commit chain the head is evaluated before its parent."""
        seen: list[str] = []

        def evaluator(commit: CommitRecord, manifest: Manifest, r: pathlib.Path) -> list[QueryMatch]:
            seen.append(commit.commit_id)
            return []

        with tempfile.TemporaryDirectory() as scratch:
            repo = _make_repo(pathlib.Path(scratch))
            older = _write_commit(repo, "aaa111")
            newer = _write_commit(repo, "bbb222", parent_id=older.commit_id)
            (heads_dir(repo) / "main").write_text(newer.commit_id)

            walk_history(repo, "main", evaluator, load_manifest=False)
            assert seen == [newer.commit_id, older.commit_id]

    def test_matches_collected(self) -> None:
        """Matches returned by the evaluator come back from ``walk_history``."""

        def evaluator(commit: CommitRecord, manifest: Manifest, r: pathlib.Path) -> list[QueryMatch]:
            hit = QueryMatch(
                commit_id=commit.commit_id,
                author=commit.author,
                committed_at=commit.committed_at.isoformat(),
                branch=commit.branch,
                detail="test match",
                extra={},
            )
            return [hit]

        with tempfile.TemporaryDirectory() as scratch:
            repo = _make_repo(pathlib.Path(scratch))
            head = _write_commit(repo, "ccc333")
            (heads_dir(repo) / "main").write_text(head.commit_id)

            found = walk_history(repo, "main", evaluator, load_manifest=False)
            assert len(found) == 1
            assert found[0]["detail"] == "test match"

    def test_max_commits_limits_walk(self) -> None:
        """``max_commits=3`` stops a ten-commit walk after three evaluations."""
        seen: list[str] = []

        def evaluator(commit: CommitRecord, manifest: Manifest, r: pathlib.Path) -> list[QueryMatch]:
            seen.append(commit.commit_id)
            return []

        with tempfile.TemporaryDirectory() as scratch:
            repo = _make_repo(pathlib.Path(scratch))
            chain: list[CommitRecord] = []
            for index in range(10):
                # Each commit's parent is the one written just before it.
                previous = chain[-1].commit_id if chain else None
                chain.append(_write_commit(repo, f"commit{index:03d}", parent_id=previous))
            (heads_dir(repo) / "main").write_text(chain[-1].commit_id)

            walk_history(repo, "main", evaluator, max_commits=3, load_manifest=False)
            assert len(seen) == 3

    def test_head_commit_id_override(self) -> None:
        """``head_commit_id`` makes the walk start there instead of at the branch ref."""
        seen: list[str] = []

        def evaluator(commit: CommitRecord, manifest: Manifest, r: pathlib.Path) -> list[QueryMatch]:
            seen.append(commit.commit_id)
            return []

        with tempfile.TemporaryDirectory() as scratch:
            repo = _make_repo(pathlib.Path(scratch))
            older = _write_commit(repo, "aaa111")
            newer = _write_commit(repo, "bbb222", parent_id=older.commit_id)
            # The branch ref names the newer commit; the walk is told to start at the older one.
            (heads_dir(repo) / "main").write_text(newer.commit_id)

            walk_history(repo, "main", evaluator, head_commit_id=older.commit_id, load_manifest=False)
            assert seen == [older.commit_id]
162
163
164 # ---------------------------------------------------------------------------
165 # format_matches
166 # ---------------------------------------------------------------------------
167
168
class TestFormatMatches:
    """Check the text rendered by ``format_matches``."""

    @staticmethod
    def _numbered_matches(count: int) -> list[QueryMatch]:
        """Build *count* matches with ids ``commit0000``, ``commit0001``, and so on."""
        return [
            QueryMatch(
                commit_id=f"commit{index:04d}",
                author="x",
                committed_at="2026-01-01T00:00:00+00:00",
                branch="main",
                detail=f"match {index}",
                extra={},
            )
            for index in range(count)
        ]

    def test_empty_returns_no_matches(self) -> None:
        """An empty match list renders text containing ``No matches``."""
        rendered = format_matches([])
        assert "No matches" in rendered

    def test_single_match_formatted(self) -> None:
        """Output includes the first 8 id characters, the author and the detail."""
        full_id = "a" * 64
        match = QueryMatch(
            commit_id=full_id,
            author="gabriel",
            committed_at="2026-03-18T12:00:00+00:00",
            branch="main",
            detail="my_function (added)",
            extra={},
        )
        rendered = format_matches([match])
        assert full_id[:8] in rendered
        assert "gabriel" in rendered
        assert "my_function (added)" in rendered

    def test_agent_id_shown_when_present(self) -> None:
        """A match carrying ``agent_id`` has that id in the rendered output."""
        match = QueryMatch(
            commit_id="a" * 64,
            author="bot",
            committed_at="2026-03-18T12:00:00+00:00",
            branch="main",
            detail="something",
            extra={},
            agent_id="claude-v4",
        )
        rendered = format_matches([match])
        assert "claude-v4" in rendered

    def test_max_results_truncation_message_updated(self) -> None:
        """format_matches uses '--limit' in the truncation hint (not '--max')."""
        rendered = format_matches(self._numbered_matches(10), max_results=5)
        assert "--limit" in rendered

    def test_max_results_capped(self) -> None:
        """With 100 matches and ``max_results=5`` the output reports ``95 more``."""
        rendered = format_matches(self._numbered_matches(100), max_results=5)
        assert "95 more" in rendered
230
231
232 # ---------------------------------------------------------------------------
233 # Regression tests: dead walkers covered by live walkers
234 #
235 # These tests prove that walk_commits_bfs and store.walk_commits_between
236 # fully cover the use-cases of the dead walk_commits and walk_commits_range
237 # before those functions are deleted. If these tests pass, deletion is safe.
238 # ---------------------------------------------------------------------------
239
240
def _make_repo_for_walker(tmp_path: pathlib.Path) -> pathlib.Path:
    """Build the minimal on-disk repo layout used by the walker tests.

    Creates the directory returned by ``muse_dir(tmp_path)``, writes
    ``repo.json`` and a ``HEAD`` file containing ``main``, and adds empty
    ``commits``, ``snapshots`` and ``refs/heads`` directories.

    Returns:
        *tmp_path* unchanged, so callers can use it as the repo root.
    """
    dot_muse = muse_dir(tmp_path)
    dot_muse.mkdir()

    seed_files = (
        ("repo.json", '{"repo_id":"walker-test"}'),
        ("HEAD", "main"),
    )
    for filename, content in seed_files:
        (dot_muse / filename).write_text(content)

    for subdir in ("commits", "snapshots"):
        (dot_muse / subdir).mkdir()
    (dot_muse / "refs" / "heads").mkdir(parents=True)
    return tmp_path
250
251
def _commit(
    root: pathlib.Path,
    label: str,
    parent: str | None = None,
    parent2: str | None = None,
) -> CommitRecord:
    """Store a commit with up to two parents on branch ``main`` and return it.

    The timestamp is pinned to 2026-01-01 UTC, so the commit id produced by
    ``compute_commit_id`` varies only with the parents and *label* (which is
    embedded in the message as ``"msg <label>"``).

    Args:
        root: Repository root passed through to ``write_commit``.
        label: Text embedded in the commit message.
        parent: Optional first-parent commit id.
        parent2: Optional second-parent commit id.
    """
    message = f"msg {label}"
    author = "tester"
    pinned_time = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc)
    empty_snapshot_id = compute_snapshot_id({})

    # Only the parents that were actually supplied take part in the id.
    present_parents = [pid for pid in (parent, parent2) if pid is not None]

    record = CommitRecord(
        commit_id=compute_commit_id(
            parent_ids=present_parents,
            snapshot_id=empty_snapshot_id,
            message=message,
            committed_at_iso=pinned_time.isoformat(),
            author=author,
        ),
        branch="main",
        snapshot_id=empty_snapshot_id,
        message=message,
        committed_at=pinned_time,
        parent_commit_id=parent,
        parent2_commit_id=parent2,
        author=author,
    )
    write_commit(root, record)
    return record
281
282
class TestWalkHistoryFollowMerges:
    """Cover ``walk_history`` with ``follow_merges`` switched on and off."""

    def test_follow_merges_false_skips_parent2(
        self, tmp_path: pathlib.Path
    ) -> None:
        """follow_merges=False leaves the commit reachable only via parent2 unvisited."""
        repo = _make_repo_for_walker(tmp_path)
        trunk = _commit(repo, "main1")
        side = _commit(repo, "feat1", parent=trunk.commit_id)
        merge = _commit(repo, "merge_c", parent=trunk.commit_id, parent2=side.commit_id)
        (heads_dir(repo) / "main").write_text(merge.commit_id)

        seen: list[str] = []

        def ev(c: CommitRecord, m: Manifest, r: pathlib.Path) -> list[QueryMatch]:
            seen.append(c.commit_id)
            return []

        walk_history(repo, "main", ev, follow_merges=False, load_manifest=False)
        assert side.commit_id not in seen
        assert merge.commit_id in seen
        assert trunk.commit_id in seen

    def test_follow_merges_true_visits_parent2(
        self, tmp_path: pathlib.Path
    ) -> None:
        """follow_merges=True evaluates the merge commit and both of its parents."""
        repo = _make_repo_for_walker(tmp_path)
        base = _commit(repo, "base")
        feature = _commit(repo, "feature", parent=base.commit_id)
        merge = _commit(repo, "merge_c", parent=base.commit_id, parent2=feature.commit_id)
        (heads_dir(repo) / "main").write_text(merge.commit_id)

        seen: list[str] = []

        def ev(c: CommitRecord, m: Manifest, r: pathlib.Path) -> list[QueryMatch]:
            seen.append(c.commit_id)
            return []

        walk_history(repo, "main", ev, follow_merges=True, load_manifest=False)
        assert set(seen) == {merge.commit_id, base.commit_id, feature.commit_id}

    def test_follow_merges_true_linear_chain(
        self, tmp_path: pathlib.Path
    ) -> None:
        """On a merge-free chain both settings visit the same set of commits."""
        repo = _make_repo_for_walker(tmp_path)
        first = _commit(repo, "a")
        second = _commit(repo, "b", parent=first.commit_id)
        third = _commit(repo, "c", parent=second.commit_id)
        (heads_dir(repo) / "main").write_text(third.commit_id)

        seen_without: list[str] = []
        seen_with: list[str] = []

        def ev_ff(c: CommitRecord, m: Manifest, r: pathlib.Path) -> list[QueryMatch]:
            seen_without.append(c.commit_id)
            return []

        def ev_ft(c: CommitRecord, m: Manifest, r: pathlib.Path) -> list[QueryMatch]:
            seen_with.append(c.commit_id)
            return []

        walk_history(repo, "main", ev_ff, follow_merges=False, load_manifest=False)
        walk_history(repo, "main", ev_ft, follow_merges=True, load_manifest=False)
        assert set(seen_without) == set(seen_with) == {first.commit_id, second.commit_id, third.commit_id}

    def test_follow_merges_since_filter_applies(
        self, tmp_path: pathlib.Path
    ) -> None:
        """The ``since`` cutoff still excludes older commits when follow_merges=True."""
        repo = _make_repo_for_walker(tmp_path)
        # Fixed timestamps on either side of the cutoff keep the filter deterministic.
        old_time = datetime.datetime(2020, 1, 1, tzinfo=datetime.timezone.utc)
        new_time = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc)
        empty_snapshot_id = compute_snapshot_id({})

        old_id = compute_commit_id(
            parent_ids=[],
            snapshot_id=empty_snapshot_id,
            message="old",
            committed_at_iso=old_time.isoformat(),
            author="tester",
        )
        write_commit(
            repo,
            CommitRecord(
                commit_id=old_id,
                branch="main",
                snapshot_id=empty_snapshot_id,
                message="old",
                committed_at=old_time,
                author="tester",
            ),
        )

        new_id = compute_commit_id(
            parent_ids=[old_id],
            snapshot_id=empty_snapshot_id,
            message="new",
            committed_at_iso=new_time.isoformat(),
            author="tester",
        )
        write_commit(
            repo,
            CommitRecord(
                commit_id=new_id,
                branch="main",
                snapshot_id=empty_snapshot_id,
                message="new",
                committed_at=new_time,
                parent_commit_id=old_id,
                author="tester",
            ),
        )
        (heads_dir(repo) / "main").write_text(new_id)

        seen: list[str] = []

        def ev(c: CommitRecord, m: Manifest, r: pathlib.Path) -> list[QueryMatch]:
            seen.append(c.commit_id)
            return []

        cutoff = datetime.datetime(2025, 1, 1, tzinfo=datetime.timezone.utc)
        walk_history(repo, "main", ev, follow_merges=True, since=cutoff, load_manifest=False)
        assert new_id in seen
        assert old_id not in seen

    def test_follow_merges_true_diamond_dag_no_duplicates(
        self, tmp_path: pathlib.Path
    ) -> None:
        """A commit reachable through both merge parents is evaluated only once."""
        repo = _make_repo_for_walker(tmp_path)
        # Diamond shape: base is the parent of both left and right, which are merged.
        base = _commit(repo, "base")
        left = _commit(repo, "left", parent=base.commit_id)
        right = _commit(repo, "right", parent=base.commit_id)
        merge = _commit(repo, "merge_c", parent=left.commit_id, parent2=right.commit_id)
        (heads_dir(repo) / "main").write_text(merge.commit_id)

        seen: list[str] = []

        def ev(c: CommitRecord, m: Manifest, r: pathlib.Path) -> list[QueryMatch]:
            seen.append(c.commit_id)
            return []

        walk_history(repo, "main", ev, follow_merges=True, load_manifest=False)
        # No id may appear twice in the visit log.
        assert len(seen) == len(set(seen))
        assert set(seen) == {base.commit_id, left.commit_id, right.commit_id, merge.commit_id}

    def test_follow_merges_max_commits_respected(
        self, tmp_path: pathlib.Path
    ) -> None:
        """``max_commits=2`` limits a four-commit walk to two evaluations."""
        repo = _make_repo_for_walker(tmp_path)
        first = _commit(repo, "c1")
        second = _commit(repo, "c2", parent=first.commit_id)
        third = _commit(repo, "c3", parent=second.commit_id)
        fourth = _commit(repo, "c4", parent=third.commit_id)
        (heads_dir(repo) / "main").write_text(fourth.commit_id)

        seen: list[str] = []

        def ev(c: CommitRecord, m: Manifest, r: pathlib.Path) -> list[QueryMatch]:
            seen.append(c.commit_id)
            return []

        walk_history(repo, "main", ev, follow_merges=True, max_commits=2, load_manifest=False)
        assert len(seen) == 2

    def test_follow_merges_evaluator_sees_match(
        self, tmp_path: pathlib.Path
    ) -> None:
        """A match produced for the parent2 commit is part of the returned results."""
        repo = _make_repo_for_walker(tmp_path)
        base = _commit(repo, "base")
        feature = _commit(repo, "feature", parent=base.commit_id)
        merge = _commit(repo, "merge_c", parent=base.commit_id, parent2=feature.commit_id)
        (heads_dir(repo) / "main").write_text(merge.commit_id)
        feature_id = feature.commit_id

        def ev(c: CommitRecord, m: Manifest, r: pathlib.Path) -> list[QueryMatch]:
            # Only the feature commit produces a match.
            if c.commit_id != feature_id:
                return []
            hit = QueryMatch(
                commit_id=c.commit_id,
                author=c.author,
                committed_at=c.committed_at.isoformat(),
                branch=c.branch,
                detail="feature found",
                extra={},
            )
            return [hit]

        found = walk_history(repo, "main", ev, follow_merges=True, load_manifest=False)
        assert len(found) == 1
        assert found[0]["detail"] == "feature found"

    def test_follow_merges_false_misses_parent2_commit(
        self, tmp_path: pathlib.Path
    ) -> None:
        """With follow_merges=False the parent2-only commit yields no match."""
        repo = _make_repo_for_walker(tmp_path)
        base = _commit(repo, "base")
        feature = _commit(repo, "feature", parent=base.commit_id)
        merge = _commit(repo, "merge_c", parent=base.commit_id, parent2=feature.commit_id)
        (heads_dir(repo) / "main").write_text(merge.commit_id)
        feature_id = feature.commit_id

        def ev(c: CommitRecord, m: Manifest, r: pathlib.Path) -> list[QueryMatch]:
            # Only the feature commit would produce a match.
            if c.commit_id != feature_id:
                return []
            hit = QueryMatch(
                commit_id=c.commit_id,
                author=c.author,
                committed_at=c.committed_at.isoformat(),
                branch=c.branch,
                detail="feature found",
                extra={},
            )
            return [hit]

        found = walk_history(repo, "main", ev, follow_merges=False, load_manifest=False)
        assert found == []  # the feature commit is never handed to the evaluator
502
503
class TestLiveWalkersContracts:
    """Regression: pin the contracts of ``walk_commits_bfs`` and ``walk_commits_between``.

    These are the walkers meant to supersede ``walk_commits`` and
    ``walk_commits_range``; the tests below lock down the behaviour that
    replacement relies on.
    """

    def test_walk_commits_bfs_linear_chain(self, tmp_path: pathlib.Path) -> None:
        """walk_commits_bfs returns every commit of a linear chain, untruncated."""
        repo = _make_repo_for_walker(tmp_path)
        oldest = _commit(repo, "aaa")
        middle = _commit(repo, "bbb", parent=oldest.commit_id)
        newest = _commit(repo, "ccc", parent=middle.commit_id)

        walked, truncated = walk_commits_bfs(repo, newest.commit_id)
        walked_ids = [entry.commit_id for entry in walked]

        assert truncated is False
        assert set(walked_ids) == {oldest.commit_id, middle.commit_id, newest.commit_id}

    def test_walk_commits_bfs_follows_parent2(self, tmp_path: pathlib.Path) -> None:
        """walk_commits_bfs also reaches the commit linked only through parent2."""
        repo = _make_repo_for_walker(tmp_path)
        base = _commit(repo, "base")
        feature = _commit(repo, "feature", parent=base.commit_id)
        merge = _commit(repo, "merge_commit", parent=base.commit_id, parent2=feature.commit_id)

        walked, _ = walk_commits_bfs(repo, merge.commit_id)
        walked_ids = {entry.commit_id for entry in walked}

        assert feature.commit_id in walked_ids
        assert base.commit_id in walked_ids
        assert merge.commit_id in walked_ids

    def test_walk_commits_between_range(self, tmp_path: pathlib.Path) -> None:
        """walk_commits_between lists newest-to-oldest and omits from_commit_id."""
        repo = _make_repo_for_walker(tmp_path)
        first = _commit(repo, "c1")
        second = _commit(repo, "c2", parent=first.commit_id)
        third = _commit(repo, "c3", parent=second.commit_id)
        fourth = _commit(repo, "c4", parent=third.commit_id)

        walked = walk_commits_between(repo, to_commit_id=fourth.commit_id, from_commit_id=first.commit_id)
        walked_ids = [entry.commit_id for entry in walked]

        assert walked_ids == [fourth.commit_id, third.commit_id, second.commit_id]
        assert first.commit_id not in walked_ids

    def test_walk_commits_between_none_from(self, tmp_path: pathlib.Path) -> None:
        """walk_commits_between with from_commit_id=None walks the whole chain."""
        repo = _make_repo_for_walker(tmp_path)
        older = _commit(repo, "x1")
        newer = _commit(repo, "x2", parent=older.commit_id)

        walked = walk_commits_between(repo, newer.commit_id, None)
        walked_ids = [entry.commit_id for entry in walked]
        assert walked_ids == [newer.commit_id, older.commit_id]

    def test_walk_commits_bfs_stop_at_excludes_boundary(
        self, tmp_path: pathlib.Path
    ) -> None:
        """walk_commits_bfs leaves the stop_at_commit_id commit out of its result."""
        repo = _make_repo_for_walker(tmp_path)
        boundary = _commit(repo, "p1")
        middle = _commit(repo, "p2", parent=boundary.commit_id)
        tip = _commit(repo, "p3", parent=middle.commit_id)

        walked, _ = walk_commits_bfs(repo, tip.commit_id, stop_at_commit_id=boundary.commit_id)
        walked_ids = [entry.commit_id for entry in walked]

        assert boundary.commit_id not in walked_ids
        assert set(walked_ids) == {tip.commit_id, middle.commit_id}
File History 2 commits
sha256:c06a9b9b9fee26c68ea725b44d54b2c0a171301ce9de746d5b656617b4463a9a fix: repair four test failures from post-migration audit Sonnet 4.6 patch 29 days ago
sha256:1900655993c83c4107067375548a7be823e471d2515830842f1a12cba4bd3cdf fix: unified object store migration — idempotent writes, JS… Sonnet 4.6 minor 30 days ago