gabriel / muse public
test_lineage.py python
579 lines 22.1 KB
Raw
sha256:b89fa4fd9ca0d692fc66f6b9aef4c3a0c13c8e9b439faf42da8e91e09f048d4f tests/test_cmd_revert_hardening.py, tests/test_cmd_semantic… Human 3 days ago
1 """Comprehensive tests for muse code lineage.
2
3 Test layers
4 -----------
5 Unit
6 ``build_lineage`` exercised directly with synthetic ``CommitRecord`` objects
7 carrying hand-crafted ``structured_delta`` data. No repo, no disk I/O.
8
9 Integration
10 CLI invocations via ``CliRunner`` against a real tmp-path repo with two
11 Python commits (the shared ``code_repo`` fixture).
12
13 Edge-case
14 Empty history, deleted-then-re-created, address without ``::`` guard,
15 unknown branch / ref, ``--filter`` narrowing, ``--since``/``--until``
16 date bounds, ``--count`` output, ``--stability`` output.
17
18 Stress
19 Programmatically generate N commits each carrying an InsertOp, ReplaceOp,
20 or DeleteOp and verify ``build_lineage`` produces the expected event count
21 and kind sequence without error.
22 """
23
24 from __future__ import annotations
25
26 import datetime
27 import json
28 import pathlib
29 import textwrap
30
31 import pytest
32
33 from tests.cli_test_helper import CliRunner
34 from muse.cli.commands.lineage import _LineageEvent, _classify_replace, _stability, build_lineage
35 from muse.core.commits import CommitRecord
36 from muse.domain import DeleteOp, DomainOp, InsertOp, PatchOp, ReplaceOp
37
# argparse migration — CliRunner ignores this arg
cli = None
runner = CliRunner()

# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

_REPO_ID = "test-repo-id"
# One-element list used as a mutable counter: ``_commit`` bumps ``_SEQ[0]``
# on every call and derives a default commit id from it.
_SEQ = [0]
47
48
49 def _cid(tag: str) -> str:
50 """Return a deterministic 64-char hex content_id from a short tag."""
51 return tag.ljust(64, "0")[:64]
52
53
54 def _ts(offset_days: int = 0) -> datetime.datetime:
55 base = datetime.datetime(2026, 1, 1, 12, 0, 0, tzinfo=datetime.timezone.utc)
56 return base + datetime.timedelta(days=offset_days)
57
58
def _commit(
    *,
    message: str = "commit",
    ops: list[DomainOp] | None = None,
    day: int = 0,
    commit_id: str | None = None,
) -> CommitRecord:
    """Create a synthetic ``CommitRecord`` on ``main`` carrying *ops*.

    The module counter ``_SEQ`` is bumped on every call, whether or not an
    explicit *commit_id* is given. When *commit_id* is falsy, the id is
    ``"c"`` followed by the counter zero-padded to 63 digits. *day* is the
    offset passed to ``_ts`` for the commit timestamp.
    """
    _SEQ[0] += 1
    if commit_id:
        resolved_id = commit_id
    else:
        resolved_id = f"c{_SEQ[0]:063d}"

    delta = {"ops": ops or [], "domain": "code", "summary": message}
    return CommitRecord(
        commit_id=resolved_id,
        branch="main",
        snapshot_id=f"snap-{resolved_id}",
        message=message,
        committed_at=_ts(day),
        structured_delta=delta,
    )
77
78
def _insert(address: str, content_id: str) -> InsertOp:
    """Build an ``InsertOp`` at *address* whose content id is ``_cid(content_id)``.

    The summary is ``"function <name>"`` where ``<name>`` is the text after
    the last ``::`` in *address* (the whole address when there is no ``::``).
    """
    symbol_name = address.rpartition("::")[2]
    return InsertOp(
        op="insert",
        address=address,
        position=None,
        content_id=_cid(content_id),
        content_summary=f"function {symbol_name}",
    )
87
88
def _delete(address: str, content_id: str) -> DeleteOp:
    """Build a ``DeleteOp`` at *address* whose content id is ``_cid(content_id)``.

    The summary is ``"function <name>"`` where ``<name>`` is the text after
    the last ``::`` in *address* (the whole address when there is no ``::``).
    """
    symbol_name = address.rpartition("::")[2]
    return DeleteOp(
        op="delete",
        address=address,
        position=None,
        content_id=_cid(content_id),
        content_summary=f"function {symbol_name}",
    )
97
98
def _replace(
    address: str,
    old_cid: str,
    new_cid: str,
    old_sum: str = "",
    new_sum: str = "",
) -> ReplaceOp:
    """Build a ``ReplaceOp`` at *address* going from *old_cid* to *new_cid*.

    Both content-id tags are expanded with ``_cid``; the summaries are
    passed through unchanged and default to empty strings.
    """
    before = _cid(old_cid)
    after = _cid(new_cid)
    return ReplaceOp(
        op="replace",
        address=address,
        position=None,
        old_content_id=before,
        new_content_id=after,
        old_summary=old_sum,
        new_summary=new_sum,
    )
109
110
def _patch(*child_ops: DomainOp, file: str = "billing.py") -> PatchOp:
    """Group symbol-level ops under one file-level ``PatchOp`` addressed at *file*.

    This mirrors how Muse emits file changes: the symbol ops become the
    patch's ``child_ops`` in the ``code`` child domain.
    """
    children = [*child_ops]
    return PatchOp(
        op="patch",
        address=file,
        child_ops=children,
        child_domain="code",
        child_summary="",
    )
120
121
# Symbol whose lineage most tests trace.
ADDR = "billing.py::compute_total"
# A different symbol in the same file as ADDR (rename / copy source).
OTHER = f"{ADDR}_v2"
# A symbol with ADDR's name living in a different file (move source).
OTHER_FILE = "utils.py::compute_total"
125
126
127 # ---------------------------------------------------------------------------
128 # Unit: _classify_replace
129 # ---------------------------------------------------------------------------
130
131
class TestClassifyReplace:
    """``_classify_replace`` turns an (old, new) summary pair into a detail label."""

    def test_signature_change_detected_in_old(self) -> None:
        """A 'signature' mention in the old summary is enough."""
        label = _classify_replace("signature changed", "")
        assert label == "signature_change"

    def test_signature_change_detected_in_new(self) -> None:
        """A 'signature' mention in the new summary is enough."""
        label = _classify_replace("", "new signature")
        assert label == "signature_change"

    def test_full_rewrite_when_no_signature(self) -> None:
        """Summaries that never mention a signature classify as a full rewrite."""
        label = _classify_replace("impl updated", "impl updated v2")
        assert label == "full_rewrite"

    def test_empty_summaries_full_rewrite(self) -> None:
        """Two empty summaries fall back to a full rewrite."""
        label = _classify_replace("", "")
        assert label == "full_rewrite"
144
145
146 # ---------------------------------------------------------------------------
147 # Unit: _stability
148 # ---------------------------------------------------------------------------
149
150
class TestStability:
    """``_stability`` is expected to return ``(modified_count, total_events)``."""

    def test_no_events(self) -> None:
        """An empty history yields zero for both counts."""
        result = _stability([])
        assert result == (0, 0)

    def test_all_created(self) -> None:
        """A lone 'created' event counts towards the total only."""
        history = [_LineageEvent("c1", "2026-01-01", "init", "created")]
        result = _stability(history)
        assert result == (0, 1)

    def test_mixed(self) -> None:
        """One creation followed by two modifications gives (2, 3)."""
        created = _LineageEvent("c1", "2026-01-01", "init", "created")
        first_fix = _LineageEvent("c2", "2026-01-02", "fix", "modified", detail="impl_only")
        second_fix = _LineageEvent("c3", "2026-01-03", "fix2", "modified", detail="full_rewrite")
        result = _stability([created, first_fix, second_fix])
        assert result == (2, 3)
166
167
168 # ---------------------------------------------------------------------------
169 # Unit: build_lineage — core event kinds
170 # ---------------------------------------------------------------------------
171
172
class TestBuildLineageCreated:
    """``build_lineage`` behaviour for empty input and plain insertions."""

    def test_no_commits(self) -> None:
        """No commits means no events."""
        events = build_lineage(ADDR, [])
        assert events == []

    def test_no_structured_delta(self) -> None:
        """A commit whose ``structured_delta`` is ``None`` contributes nothing."""
        bare_commit = CommitRecord(
            commit_id="c" * 64,
            branch="main",
            snapshot_id="snap",
            message="empty",
            committed_at=_ts(),
            structured_delta=None,
        )
        events = build_lineage(ADDR, [bare_commit])
        assert events == []

    def test_single_insert_emits_created(self) -> None:
        """One insert at ADDR yields one 'created' event carrying the commit data."""
        insert_commit = _commit(ops=[_insert(ADDR, "aaa")], message="add fn", day=0)
        events = build_lineage(ADDR, [insert_commit])
        assert len(events) == 1
        first = events[0]
        assert first.kind == "created"
        assert first.message == "add fn"
        assert first.new_content_id == _cid("aaa")

    def test_insert_inside_patch_op(self) -> None:
        """flat_symbol_ops must recurse into PatchOp.child_ops."""
        wrapped = _patch(_insert(ADDR, "bbb"))
        patch_commit = _commit(ops=[wrapped], message="patch add")
        events = build_lineage(ADDR, [patch_commit])
        assert len(events) == 1
        assert events[0].kind == "created"

    def test_unrelated_insert_ignored(self) -> None:
        """An insert at a different symbol produces no events for ADDR."""
        other_commit = _commit(ops=[_insert("billing.py::other_fn", "ccc")])
        events = build_lineage(ADDR, [other_commit])
        assert events == []
206
207
class TestBuildLineageModified:
    """``ReplaceOp`` at the target address must surface as 'modified' events."""

    def test_replace_emits_modified(self) -> None:
        """Insert then replace gives created → modified with a full_rewrite detail."""
        history = [
            _commit(ops=[_insert(ADDR, "v1")], day=0),
            _commit(ops=[_replace(ADDR, "v1", "v2")], day=1, message="fix"),
        ]
        events = build_lineage(ADDR, history)
        assert [ev.kind for ev in events] == ["created", "modified"]
        modification = events[1]
        assert modification.detail == "full_rewrite"
        assert modification.message == "fix"

    def test_replace_with_signature_detail(self) -> None:
        """A 'signature' summary on the replace is reported as signature_change."""
        history = [
            _commit(ops=[_insert(ADDR, "v1")], day=0),
            _commit(ops=[_replace(ADDR, "v1", "v2", old_sum="signature changed")], day=1),
        ]
        events = build_lineage(ADDR, history)
        modification = events[1]
        assert modification.kind == "modified"
        assert modification.detail == "signature_change"

    def test_multiple_modifications_in_sequence(self) -> None:
        """Three successive replaces after the insert give four events in total."""
        history = [_commit(ops=[_insert(ADDR, "v1")], day=0)]
        history.append(_commit(ops=[_replace(ADDR, "v1", "v2")], day=1))
        history.append(_commit(ops=[_replace(ADDR, "v2", "v3")], day=2))
        history.append(_commit(ops=[_replace(ADDR, "v3", "v4")], day=3))
        events = build_lineage(ADDR, history)
        assert len(events) == 4
        assert events[0].kind == "created"
        assert not any(ev.kind != "modified" for ev in events[1:])
236
237
class TestBuildLineageDeleted:
    """``DeleteOp`` at the target address must surface as 'deleted' events."""

    def test_delete_emits_deleted(self) -> None:
        """The final event after insert + delete is 'deleted' with the commit message."""
        history = [
            _commit(ops=[_insert(ADDR, "v1")], day=0),
            _commit(ops=[_delete(ADDR, "v1")], day=1, message="remove fn"),
        ]
        events = build_lineage(ADDR, history)
        last = events[-1]
        assert last.kind == "deleted"
        assert last.message == "remove fn"

    def test_delete_marks_address_not_live(self) -> None:
        """After delete, re-inserting the same content should emit 'created', not 'copied_from'."""
        history = [
            _commit(ops=[_insert(ADDR, "v1")], day=0),
            _commit(ops=[_delete(ADDR, "v1")], day=1),
            _commit(ops=[_insert(ADDR, "v1")], day=2),
        ]
        events = build_lineage(ADDR, history)
        assert [ev.kind for ev in events] == ["created", "deleted", "created"]
254
255
class TestBuildLineageRenamedMoved:
    """Insert + delete of the same content in one commit is a rename or a move."""

    def test_rename_within_same_file(self) -> None:
        """InsertOp at ADDR + DeleteOp at OTHER (same file, same content_id) → renamed_from."""
        seed = _commit(ops=[_insert(OTHER, "v1")], day=0)
        rename = _commit(
            ops=[_insert(ADDR, "v1"), _delete(OTHER, "v1")],
            day=1,
            message="rename",
        )
        events = build_lineage(ADDR, [seed, rename])
        renamed = next(ev for ev in events if ev.kind == "renamed_from")
        assert renamed.detail == OTHER
        assert renamed.message == "rename"

    def test_move_across_files(self) -> None:
        """InsertOp at ADDR + DeleteOp at OTHER_FILE (different file) → moved_from."""
        seed = _commit(ops=[_insert(OTHER_FILE, "v1")], day=0)
        move = _commit(
            ops=[_insert(ADDR, "v1"), _delete(OTHER_FILE, "v1")],
            day=1,
            message="move",
        )
        events = build_lineage(ADDR, [seed, move])
        moved = next(ev for ev in events if ev.kind == "moved_from")
        assert moved.detail == OTHER_FILE
        assert moved.message == "move"

    def test_rename_file_correctly_classified(self) -> None:
        """Same file → renamed_from, not moved_from."""
        seed = _commit(ops=[_insert(OTHER, "v1")], day=0)
        rename = _commit(ops=[_insert(ADDR, "v1"), _delete(OTHER, "v1")], day=1)
        events = build_lineage(ADDR, [seed, rename])
        seen_kinds = {ev.kind for ev in events}
        assert "renamed_from" in seen_kinds
        assert "moved_from" not in seen_kinds
282
283
class TestBuildLineageCopied:
    """Inserting content that is already live elsewhere counts as a copy."""

    def test_copied_from_living_symbol(self) -> None:
        """Insert at ADDR with content_id already live at OTHER → copied_from."""
        source = _commit(ops=[_insert(OTHER, "v1")], day=0)
        duplicate = _commit(ops=[_insert(ADDR, "v1")], day=1, message="copy fn")
        events = build_lineage(ADDR, [source, duplicate])
        first = events[0]
        assert first.kind == "copied_from"
        assert first.detail == OTHER
        assert first.message == "copy fn"

    def test_not_copied_when_no_living_symbol(self) -> None:
        """Insert with unique content_id → created, not copied_from."""
        only_commit = _commit(ops=[_insert(ADDR, "unique_content")], day=0)
        events = build_lineage(ADDR, [only_commit])
        assert events[0].kind == "created"
299
300
301 # ---------------------------------------------------------------------------
302 # Unit: build_lineage — complex lifecycle
303 # ---------------------------------------------------------------------------
304
305
class TestBuildLineageLifecycle:
    """Multi-phase histories and the data carried on each emitted event."""

    def test_full_lifecycle(self) -> None:
        """create → modify → rename_away → recreate → delete."""
        renamed_addr = "billing.py::compute_total_renamed"
        history = [
            # Phase 1: created at ADDR
            _commit(ops=[_insert(ADDR, "v1")], day=0, message="create"),
            # Phase 2: modified
            _commit(ops=[_replace(ADDR, "v1", "v2")], day=1, message="modify"),
            # Phase 3: renamed away — ADDR is deleted, the new address is inserted
            _commit(
                ops=[_insert(renamed_addr, "v2"), _delete(ADDR, "v2")],
                day=2,
                message="rename away",
            ),
            # Phase 4: ADDR re-created with fresh content
            _commit(ops=[_insert(ADDR, "v3")], day=3, message="recreate"),
            # Phase 5: deleted
            _commit(ops=[_delete(ADDR, "v3")], day=4, message="delete"),
        ]

        events = build_lineage(ADDR, history)
        observed = [ev.kind for ev in events]
        expected = ["created", "modified", "deleted", "created", "deleted"]
        assert observed == expected

    def test_commit_message_propagated(self) -> None:
        """The event carries the originating commit's message."""
        initial = _commit(ops=[_insert(ADDR, "v1")], message="Initial commit")
        events = build_lineage(ADDR, [initial])
        assert events[0].message == "Initial commit"

    def test_to_dict_has_full_commit_id(self) -> None:
        """``to_dict`` must expose the untruncated commit id."""
        full_id = "a" * 64
        initial = _commit(ops=[_insert(ADDR, "v1")], commit_id=full_id)
        events = build_lineage(ADDR, [initial])
        payload = events[0].to_dict()
        assert payload["commit_id"] == "a" * 64  # not truncated

    def test_to_dict_has_message(self) -> None:
        """``to_dict`` must include the commit message."""
        initial = _commit(ops=[_insert(ADDR, "v1")], message="My message")
        events = build_lineage(ADDR, [initial])
        payload = events[0].to_dict()
        assert payload["message"] == "My message"

    def test_commits_without_symbol_ops_skipped(self) -> None:
        """File-level ops (no '::') must not generate any events."""
        # ``_replace`` defaults both summaries to "" and position to None,
        # so this is a plain file-level ReplaceOp addressed at the file itself.
        file_level_op = _replace("billing.py", "old", "new")
        file_commit = _commit(ops=[file_level_op])
        events = build_lineage(ADDR, [file_commit])
        assert events == []
354
355
356 # ---------------------------------------------------------------------------
357 # Unit: build_lineage — date filtering via _gather_commits (tested indirectly
358 # through the CLI --since/--until flags in integration tests below)
359 # ---------------------------------------------------------------------------
360
361
362 # ---------------------------------------------------------------------------
363 # Stress: large commit sequence
364 # ---------------------------------------------------------------------------
365
366
class TestBuildLineageStress:
    """Larger generated histories: event counts and kind sequences must hold up."""

    def test_many_modifications(self) -> None:
        """500 sequential modifications produce 501 events without error."""
        n = 500
        history: list[CommitRecord] = [_commit(ops=[_insert(ADDR, "v0")], day=0)]
        history.extend(
            _commit(ops=[_replace(ADDR, f"v{step-1}", f"v{step}")], day=step)
            for step in range(1, n + 1)
        )
        events = build_lineage(ADDR, history)
        assert len(events) == n + 1
        assert events[0].kind == "created"
        assert not any(ev.kind != "modified" for ev in events[1:])

    def test_many_unrelated_commits_skipped_efficiently(self) -> None:
        """1000 commits touching only unrelated symbols → 0 events for ADDR."""
        history: list[CommitRecord] = []
        for idx in range(1000):
            noise_op = _insert(f"billing.py::other_{idx}", f"uid_{idx}")
            history.append(_commit(ops=[noise_op], day=idx))
        events = build_lineage(ADDR, history)
        assert events == []

    def test_interleaved_symbol_and_unrelated_ops(self) -> None:
        """Mix of target-symbol ops and noise — only target events emitted."""
        # Commit index → the single op that touches ADDR in that commit.
        target_ops: dict[int, DomainOp] = {
            50: _insert(ADDR, "start"),
            100: _replace(ADDR, "start", "mid"),
            150: _delete(ADDR, "mid"),
        }
        history: list[CommitRecord] = []
        for idx in range(200):
            ops: list[DomainOp] = [_insert(f"billing.py::noise_{idx}", f"n{idx}")]
            if idx in target_ops:
                ops.append(target_ops[idx])
            history.append(_commit(ops=ops, day=idx))

        events = build_lineage(ADDR, history)
        assert [ev.kind for ev in events] == ["created", "modified", "deleted"]

    def test_delete_recreate_cycle(self) -> None:
        """Symbol deleted and recreated 10 times → 10 deletes + 11 creates."""
        history: list[CommitRecord] = [_commit(ops=[_insert(ADDR, "v0")], day=0)]
        for cycle in range(10):
            delete_day = cycle * 2 + 1
            history.append(_commit(ops=[_delete(ADDR, f"v{cycle}")], day=delete_day))
            history.append(_commit(ops=[_insert(ADDR, f"v{cycle+1}")], day=delete_day + 1))

        events = build_lineage(ADDR, history)
        created_total = sum(1 for ev in events if ev.kind == "created")
        deleted_total = sum(1 for ev in events if ev.kind == "deleted")
        assert created_total == 11
        assert deleted_total == 10
420
421
422 # ---------------------------------------------------------------------------
423 # Integration: CLI
424 # ---------------------------------------------------------------------------
425
426
@pytest.fixture
def repo(tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> pathlib.Path:
    """Initialise an empty ``code``-domain repo in ``tmp_path`` and return its path.

    The working directory and ``MUSE_REPO_ROOT`` both point at ``tmp_path``
    for the duration of the test.
    """
    monkeypatch.setenv("MUSE_REPO_ROOT", str(tmp_path))
    monkeypatch.chdir(tmp_path)
    init_result = runner.invoke(cli, ["init", "--domain", "code"])
    assert init_result.exit_code == 0, init_result.output
    return tmp_path
434
435
def _commit_billing(repo: pathlib.Path, source: str, message: str) -> None:
    """Write *source* (dedented) to ``billing.py``, stage it, and commit it.

    Every CLI step is asserted so that a staging failure is reported where
    it happens rather than as a confusing failure in a later test.
    """
    (repo / "billing.py").write_text(textwrap.dedent(source), encoding="utf-8")
    staged = runner.invoke(cli, ["code", "add", "billing.py"])
    assert staged.exit_code == 0, staged.output
    committed = runner.invoke(cli, ["commit", "-m", message])
    assert committed.exit_code == 0, committed.output


@pytest.fixture
def code_repo(repo: pathlib.Path) -> pathlib.Path:
    """Two-commit repo: billing.py created then function renamed.

    Commit 1 defines ``compute_total`` and ``process_order``; commit 2
    renames ``compute_total`` to ``compute_invoice_total`` and updates the
    call inside ``process_order``. Returns the repo root.
    """
    _commit_billing(
        repo,
        """\
        def compute_total(items):
            return sum(items)

        def process_order(invoice, items):
            return compute_total(items)
        """,
        "Initial billing module",
    )
    _commit_billing(
        repo,
        """\
        def compute_invoice_total(items):
            return sum(items)

        def process_order(invoice, items):
            return compute_invoice_total(items)
        """,
        "Rename compute_total",
    )
    return repo
461
462
class TestLineageCLI:
    """End-to-end checks of ``code lineage`` through ``CliRunner``.

    All tests except ``test_requires_repo`` run against the two-commit
    ``code_repo`` fixture.
    """

    def test_exits_zero_for_existing_symbol(self, code_repo: pathlib.Path) -> None:
        """Lineage for a symbol present in the repo succeeds."""
        result = runner.invoke(cli, ["code", "lineage", "billing.py::process_order"])
        assert result.exit_code == 0, result.output

    def test_json_schema(self, code_repo: pathlib.Path) -> None:
        """``--json`` output has the top-level keys and per-event keys."""
        result = runner.invoke(cli, ["code", "lineage", "--json", "billing.py::process_order"])
        assert result.exit_code == 0, result.output
        data = json.loads(result.output)
        assert "events" in data
        assert "total" in data
        assert "stability_pct" in data
        assert "modified_count" in data
        assert isinstance(data["events"], list)
        for ev in data["events"]:
            assert "commit_id" in ev
            assert "committed_at" in ev
            assert "event" in ev
            assert "message" in ev
            # Full sha256:<64-hex> format — not truncated
            assert ev["commit_id"].startswith("sha256:")
            assert len(ev["commit_id"]) == 71

    def test_no_address_separator_rejected(self, code_repo: pathlib.Path) -> None:
        """An address without ``::`` exits with status 1."""
        result = runner.invoke(cli, ["code", "lineage", "billing.py"])
        assert result.exit_code == 1

    def test_missing_symbol_returns_zero_events(self, code_repo: pathlib.Path) -> None:
        """An unknown symbol is not an error; the output says no events were found."""
        result = runner.invoke(cli, ["code", "lineage", "billing.py::nonexistent_xyz"])
        assert result.exit_code == 0
        assert "no events found" in result.output

    def test_count_only_outputs_integer(self, code_repo: pathlib.Path) -> None:
        """``--count`` prints only a non-negative integer."""
        result = runner.invoke(cli, ["code", "lineage", "--count", "billing.py::process_order"])
        assert result.exit_code == 0
        assert result.output.strip().isdigit()

    def test_filter_created_subset(self, code_repo: pathlib.Path) -> None:
        """``--filter created`` leaves only 'created' events."""
        result = runner.invoke(cli, [
            "code", "lineage", "--filter", "created",
            "--json", "billing.py::process_order",
        ])
        assert result.exit_code == 0
        data = json.loads(result.output)
        for ev in data["events"]:
            assert ev["event"] == "created"

    def test_filter_modified_subset(self, code_repo: pathlib.Path) -> None:
        """``--filter modified`` leaves only 'modified' events."""
        result = runner.invoke(cli, [
            "code", "lineage", "--filter", "modified",
            "--json", "billing.py::process_order",
        ])
        assert result.exit_code == 0
        data = json.loads(result.output)
        for ev in data["events"]:
            assert ev["event"] == "modified"

    def test_since_future_returns_empty(self, code_repo: pathlib.Path) -> None:
        """A ``--since`` bound far in the future excludes every commit."""
        result = runner.invoke(cli, [
            "code", "lineage", "--since", "2099-01-01",
            "billing.py::process_order",
        ])
        assert result.exit_code == 0
        assert "no events found" in result.output

    def test_since_invalid_date_rejected(self, code_repo: pathlib.Path) -> None:
        """An unparseable ``--since`` value exits with status 1."""
        result = runner.invoke(cli, [
            "code", "lineage", "--since", "not-a-date",
            "billing.py::process_order",
        ])
        assert result.exit_code == 1

    def test_until_invalid_date_rejected(self, code_repo: pathlib.Path) -> None:
        """An unparseable ``--until`` value exits with status 1."""
        result = runner.invoke(cli, [
            "code", "lineage", "--until", "99/99/99",
            "billing.py::process_order",
        ])
        assert result.exit_code == 1

    def test_commit_flag_accepted(self, code_repo: pathlib.Path) -> None:
        """``--commit HEAD`` is accepted."""
        result = runner.invoke(cli, [
            "code", "lineage", "--commit", "HEAD",
            "billing.py::process_order",
        ])
        assert result.exit_code == 0

    def test_stability_flag_present_in_output(self, code_repo: pathlib.Path) -> None:
        """``--stability`` is accepted without crashing."""
        result = runner.invoke(cli, [
            "code", "lineage", "--stability", "billing.py::process_order",
        ])
        assert result.exit_code == 0
        # Stability line only appears when there are events.
        # Just check no crash.

    def test_unknown_branch_rejected(self, code_repo: pathlib.Path) -> None:
        """A ``--branch`` that does not exist exits with status 1."""
        result = runner.invoke(cli, [
            "code", "lineage", "--branch", "nonexistent-branch-xyz",
            "billing.py::process_order",
        ])
        assert result.exit_code == 1

    def test_requires_repo(self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> None:
        """Running in a directory that is not a repo must fail."""
        monkeypatch.chdir(tmp_path)
        # Other tests point MUSE_REPO_ROOT at a repo; drop any ambient value
        # (developer shell, CI) so it cannot supply one here.
        monkeypatch.delenv("MUSE_REPO_ROOT", raising=False)
        result = runner.invoke(cli, ["code", "lineage", "src/a.py::f"])
        assert result.exit_code != 0

    def test_json_events_have_full_commit_id(self, code_repo: pathlib.Path) -> None:
        """Every JSON event's commit id uses the full ``sha256:<64-hex>`` form."""
        result = runner.invoke(cli, ["code", "lineage", "--json", "billing.py::process_order"])
        assert result.exit_code == 0
        data = json.loads(result.output)
        for ev in data["events"]:
            assert ev["commit_id"].startswith("sha256:"), (
                "commit_id must use sha256: prefix format"
            )
            assert len(ev["commit_id"]) == 71, (
                f"sha256:<64-hex> should be 71 chars, got {len(ev['commit_id'])}"
            )
File History 1 commit
sha256:b89fa4fd9ca0d692fc66f6b9aef4c3a0c13c8e9b439faf42da8e91e09f048d4f tests/test_cmd_revert_hardening.py, tests/test_cmd_semantic… Human 3 days ago