gabriel / muse public
test_cmd_blame_hardening.py python
948 lines 37.9 KB
Raw
1 """Comprehensive tests for ``muse code blame`` CLI hardening.
2
3 Audit findings addressed
4 ------------------------
5 Security
6 - ev.detail and ev.new_address now passed through sanitize_display()
7 in text output — eliminates ANSI injection from stored commit data.
8 - from_ref echoed through sanitize_display() in error messages.
9 - Address argument validated for control characters and null bytes before
10 any processing.
11 - Guard added in reverse-rename path to require '::' in op_address,
12 preventing misparse of malformed commit records.
13
14 Performance
15 - address.rsplit("::", 1) was called twice per _events_in_commit invocation
16 (once for file_prefix, once for bare_name). Now pre-split once per outer
17 loop iteration and passed as parameters — saves 2N string ops for N
18 commits scanned.
19 - Early-exit: scan loop breaks as soon as a "created" event is found.
20 Full lineage is established at that point; no older commits can add
21 new events. Significant win for large repos.
22
23 Dead code removed
24 - Empty "# Repository helpers" comment section (no content).
25 - Unreachable max_commits < 1 guard (clamp_int already enforces min=1).
26
27 New capabilities
28 - --kind filter: show only events of specified kind(s).
29 - --author filter: case-insensitive substring match on commit author.
30 - Improved --all text output: author+message for every event; event
31 number labels beyond the first three.
32 - "... N older events" hint when --all is omitted but more events exist.
33 - _BlameEventJson and _BlameResultJson TypedDicts for stable JSON schemas.
34
35 Coverage tiers
36 --------------
37 - Unit: _flat_ops, _events_in_commit, _BlameEvent.to_dict
38 - Integration: run with show/add/rename/filter scenarios
39 - Security: control chars in address, ANSI in stored data, stderr routing
40 - E2E: full CLI invocations, JSON schema, exit codes, filter flags
41 - Stress: 500-commit chain, 50-event history, early-exit verification
42 """
43 from __future__ import annotations
44
45 import datetime
46 import json
47 import pathlib
48 import threading
49 from typing import TYPE_CHECKING
50 from unittest.mock import MagicMock
51
52 import pytest
53
54 from muse.core.errors import ExitCode
55 from muse.core.types import NULL_COMMIT_ID
56 from muse.core.ids import hash_commit, hash_snapshot
57 from muse.core.commits import (
58 CommitRecord,
59 write_commit,
60 )
61 from muse.domain import DomainOp, StructuredDelta
62 from tests.cli_test_helper import CliRunner, InvokeResult
63
64 from muse.cli.commands.blame import SymbolEventKind
65
66 if TYPE_CHECKING:
67 from muse.cli.commands.blame import _BlameResultJson
68
# Shared in-process CLI runner used by every invocation in this module.
runner = CliRunner()
# Passed as the "app" argument to runner.invoke().
# NOTE(review): presumably the project's CliRunner resolves the muse entry
# point itself and ignores this value — confirm against tests.cli_test_helper.
cli = None
71
72
73 # ---------------------------------------------------------------------------
74 # Helpers
75 # ---------------------------------------------------------------------------
76
77
def _make_repo(tmp_path: pathlib.Path) -> pathlib.Path:
    """Create a bare repository skeleton under *tmp_path* and return *tmp_path*.

    The metadata directory receives the ``commits``, ``snapshots``,
    ``refs/heads`` and ``objects`` sub-directories, a ``HEAD`` file pointing
    at ``refs/heads/main``, and a ``repo.json`` holding a fixed repo id.
    """
    # NOTE(review): ``muse_dir`` is not imported in the visible import block —
    # confirm it is in scope before relying on this helper.
    meta_root = muse_dir(tmp_path)

    for subdir in ("commits", "snapshots", "refs/heads", "objects"):
        target = meta_root / subdir
        target.mkdir(parents=True, exist_ok=True)

    head_file = meta_root / "HEAD"
    head_file.write_text("ref: refs/heads/main", encoding="utf-8")

    repo_meta = json.dumps({"repo_id": "test-repo"})
    (meta_root / "repo.json").write_text(repo_meta, encoding="utf-8")

    return tmp_path
87
88
# Fixed, timezone-aware base timestamp; _write_commit offsets from it by whole
# days so commit times (and therefore commit ids) are deterministic.
_EPOCH = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc)
90
91
def _write_commit(
    root: pathlib.Path,
    message: str = "test commit",
    branch: str = "main",
    parent_id: str | None = None,
    author: str = "alice",
    delta: StructuredDelta | None = None,
    dt_offset_days: int = 0,
) -> CommitRecord:
    """Persist one commit in the repo at *root* and point *branch* at it.

    Args:
        root: Repository root as returned by ``_make_repo``.
        message: Commit message.
        branch: Branch name recorded on the commit and whose ref is updated.
        parent_id: Id of the parent commit, or ``None`` for a root commit.
        author: Commit author.
        delta: Structured delta to attach, if any.
        dt_offset_days: Days added to ``_EPOCH`` to form the commit time.

    Returns:
        The ``CommitRecord`` that was written.
    """
    when = _EPOCH + datetime.timedelta(days=dt_offset_days)
    snapshot_id = hash_snapshot({})

    # A falsy parent id (None or "") means "no parents".
    parent_ids: list[str] = []
    if parent_id:
        parent_ids.append(parent_id)

    commit_id = hash_commit(
        parent_ids=parent_ids,
        snapshot_id=snapshot_id,
        message=message,
        committed_at_iso=when.isoformat(),
        author=author,
    )
    commit = CommitRecord(
        commit_id=commit_id,
        branch=branch,
        snapshot_id=snapshot_id,
        message=message,
        committed_at=when,
        author=author,
        parent_commit_id=parent_id,
        structured_delta=delta,
    )
    write_commit(root, commit)

    # NOTE(review): ``ref_path`` is not imported in the visible import block —
    # confirm it is in scope before relying on this helper.
    branch_ref = ref_path(root, branch)
    branch_ref.write_text(commit_id, encoding="utf-8")
    return commit
123
124
def _make_delta(ops: list[DomainOp]) -> StructuredDelta:
    """Wrap *ops* in a ``code``-domain ``StructuredDelta`` with an empty summary."""
    delta = StructuredDelta(
        domain="code",
        ops=ops,
        summary="",
    )
    return delta
127
128
# Placeholder 64-character content ids used by the op helpers below; they do
# not refer to any stored object.
_FAKE_HASH_A = "a" * 64
_FAKE_HASH_B = "b" * 64
131
132
def _insert_op(address: str, summary: str = "created") -> DomainOp:
    """Build an ``InsertOp`` for *address* carrying *summary* as its content summary."""
    from muse.domain import InsertOp

    op = InsertOp(
        op="insert",
        address=address,
        position=None,
        content_id=_FAKE_HASH_A,
        content_summary=summary,
    )
    return op
140
141
def _delete_op(address: str, summary: str = "deleted") -> DomainOp:
    """Build a ``DeleteOp`` for *address* carrying *summary* as its content summary."""
    from muse.domain import DeleteOp

    op = DeleteOp(
        op="delete",
        address=address,
        position=None,
        content_id=_FAKE_HASH_A,
        content_summary=summary,
    )
    return op
149
150
def _replace_op(address: str, new_summary: str = "modified") -> DomainOp:
    """Build a ``ReplaceOp`` for *address* whose new-side summary is *new_summary*.

    The old side always uses ``_FAKE_HASH_A`` / ``"old"``; the new side uses
    ``_FAKE_HASH_B``.
    """
    from muse.domain import ReplaceOp

    op = ReplaceOp(
        op="replace",
        address=address,
        position=None,
        old_content_id=_FAKE_HASH_A,
        new_content_id=_FAKE_HASH_B,
        old_summary="old",
        new_summary=new_summary,
    )
    return op
159
160
def _invoke(root: pathlib.Path, *args: str) -> InvokeResult:
    """Run ``code blame`` with *args*, pointing ``MUSE_REPO_ROOT`` at *root*."""
    argv = ["code", "blame"]
    argv.extend(args)
    environment = {"MUSE_REPO_ROOT": str(root)}
    return runner.invoke(cli, argv, env=environment)
167
168
def _parse_json(result: InvokeResult) -> "_BlameResultJson":
    """Decode the first JSON object in *result*'s output into a blame result.

    Any text before the first ``{`` and after the end of the object is
    ignored. Scalar fields are coerced with ``str``/``int``/``bool``, and
    absent scalar fields fall back to empty/zero/false defaults.

    Args:
        result: Outcome of a ``code blame --json`` invocation.

    Returns:
        A ``_BlameResultJson`` built from the decoded payload.

    Raises:
        ValueError: If the output contains no ``{`` or the JSON is malformed
            (``json.JSONDecodeError`` is a ``ValueError`` subclass).
        AssertionError: If the payload is not an object, ``events`` is not a
            list of objects, or an event's kind is missing or unknown.
    """
    from muse.cli.commands.blame import _BlameResultJson, _BlameEventJson

    start = result.output.index("{")
    # raw_decode parses exactly one JSON value and tolerates trailing text.
    # Unlike counting braces by hand, it is not confused by "{" or "}" that
    # appear inside string values such as commit messages.
    raw, _consumed = json.JSONDecoder().raw_decode(result.output[start:])
    assert isinstance(raw, dict)

    raw_events = raw.get("events", [])
    assert isinstance(raw_events, list)

    _valid_kinds = frozenset(
        ("created", "modified", "renamed", "moved", "deleted", "signature")
    )
    events: list[_BlameEventJson] = []
    for e in raw_events:
        assert isinstance(e, dict)
        raw_kind = e.get("event")
        # Fail loudly instead of coercing to "modified": a silent fallback
        # would let kind-filter assertions pass on malformed CLI output.
        assert raw_kind in _valid_kinds, f"Unexpected event kind: {raw_kind!r}"
        kind: SymbolEventKind = raw_kind
        events.append(_BlameEventJson(
            event=kind,
            commit_id=str(e.get("commit_id", "")),
            author=str(e.get("author", "")),
            message=str(e.get("message", "")),
            committed_at=str(e.get("committed_at", "")),
            address=str(e.get("address", "")),
            detail=str(e.get("detail", "")),
            new_address=e.get("new_address"),
        ))
    return _BlameResultJson(
        address=str(raw.get("address", "")),
        start_ref=str(raw.get("start_ref", "")),
        total_commits_scanned=int(raw.get("total_commits_scanned", 0)),
        truncated=bool(raw.get("truncated", False)),
        events=events,
    )
211
212
213 # ---------------------------------------------------------------------------
214 # Unit — _flat_ops
215 # ---------------------------------------------------------------------------
216
217
class TestFlatOps:
    """Unit tests for ``_flat_ops``."""

    def test_passthrough_non_patch_ops(self) -> None:
        """A single non-patch op is returned unchanged."""
        from muse.cli.commands.blame import _flat_ops

        leaf = _insert_op("f.py::foo")
        flattened = _flat_ops([leaf])
        assert flattened == [leaf]

    def test_flattens_patch_children(self) -> None:
        """A patch op is replaced by its child ops, in order."""
        from muse.cli.commands.blame import _flat_ops
        from muse.domain import PatchOp

        first = _insert_op("f.py::foo")
        second = _replace_op("f.py::bar")
        wrapper = PatchOp(
            op="patch",
            address="f.py",
            child_ops=[first, second],
            child_domain="code",
            child_summary="test",
        )
        flattened = _flat_ops([wrapper])
        assert flattened == [first, second]

    def test_empty_ops(self) -> None:
        """An empty op list flattens to an empty list."""
        from muse.cli.commands.blame import _flat_ops

        flattened = _flat_ops([])
        assert flattened == []

    def test_mixed_patch_and_leaf(self) -> None:
        """Patch children and sibling leaf ops keep their relative order."""
        from muse.cli.commands.blame import _flat_ops
        from muse.domain import PatchOp

        nested = _insert_op("f.py::child")
        wrapper = PatchOp(
            op="patch",
            address="f.py",
            child_ops=[nested],
            child_domain="code",
            child_summary="test",
        )
        sibling = _delete_op("g.py::gone")
        flattened = _flat_ops([wrapper, sibling])
        assert flattened == [nested, sibling]
249
250
251 # ---------------------------------------------------------------------------
252 # Unit — _events_in_commit
253 # ---------------------------------------------------------------------------
254
255
class TestEventsInCommit:
    """Unit tests for ``_events_in_commit`` against single hand-built commits."""

    def _commit(
        self, root: pathlib.Path, delta: StructuredDelta | None = None
    ) -> CommitRecord:
        """Write one default commit carrying *delta* and return its record."""
        return _write_commit(root, delta=delta)

    def test_insert_yields_created(self, tmp_path: pathlib.Path) -> None:
        """An insert op on the tracked address is reported as ``created``."""
        from muse.cli.commands.blame import _events_in_commit

        root = _make_repo(tmp_path)
        ops = [_insert_op("f.py::foo", "initial")]
        commit = self._commit(root, _make_delta(ops))
        events, following = _events_in_commit(commit, "f.py::foo", "f.py", "foo")
        assert len(events) == 1
        assert events[0].kind == "created"
        assert following == "f.py::foo"

    def test_replace_yields_modified(self, tmp_path: pathlib.Path) -> None:
        """A plain replace op is reported as ``modified``."""
        from muse.cli.commands.blame import _events_in_commit

        root = _make_repo(tmp_path)
        ops = [_replace_op("f.py::foo", "refactored")]
        commit = self._commit(root, _make_delta(ops))
        events, _following = _events_in_commit(commit, "f.py::foo", "f.py", "foo")
        assert len(events) == 1
        assert events[0].kind == "modified"

    def test_replace_rename_yields_renamed(self, tmp_path: pathlib.Path) -> None:
        """A replace summarised as ``renamed to bar`` yields a ``renamed`` event."""
        from muse.cli.commands.blame import _events_in_commit

        root = _make_repo(tmp_path)
        ops = [_replace_op("f.py::foo", "renamed to bar")]
        commit = self._commit(root, _make_delta(ops))
        events, following = _events_in_commit(commit, "f.py::foo", "f.py", "foo")
        assert len(events) == 1
        renamed = events[0]
        assert renamed.kind == "renamed"
        assert renamed.new_address == "f.py::bar"
        # Walking backward, the address being tracked is still the old name.
        assert following == "f.py::foo"

    def test_delete_yields_deleted(self, tmp_path: pathlib.Path) -> None:
        """A plain delete op is reported as ``deleted``."""
        from muse.cli.commands.blame import _events_in_commit

        root = _make_repo(tmp_path)
        ops = [_delete_op("f.py::foo", "removed")]
        commit = self._commit(root, _make_delta(ops))
        events, _following = _events_in_commit(commit, "f.py::foo", "f.py", "foo")
        assert len(events) == 1
        assert events[0].kind == "deleted"

    def test_delete_moved_to_yields_moved(self, tmp_path: pathlib.Path) -> None:
        """A delete summarised as ``moved to ...`` is reported as ``moved``."""
        from muse.cli.commands.blame import _events_in_commit

        root = _make_repo(tmp_path)
        ops = [_delete_op("f.py::foo", "moved to g.py")]
        commit = self._commit(root, _make_delta(ops))
        events, _following = _events_in_commit(commit, "f.py::foo", "f.py", "foo")
        assert events[0].kind == "moved"

    def test_replace_signature_yields_signature(
        self, tmp_path: pathlib.Path
    ) -> None:
        """A replace summarised as ``signature changed`` yields ``signature``."""
        from muse.cli.commands.blame import _events_in_commit

        root = _make_repo(tmp_path)
        ops = [_replace_op("f.py::foo", "signature changed")]
        commit = self._commit(root, _make_delta(ops))
        events, _following = _events_in_commit(commit, "f.py::foo", "f.py", "foo")
        assert events[0].kind == "signature"

    def test_no_delta_returns_empty(self, tmp_path: pathlib.Path) -> None:
        """A commit without a structured delta produces no events."""
        from muse.cli.commands.blame import _events_in_commit

        root = _make_repo(tmp_path)
        commit = self._commit(root, delta=None)
        events, following = _events_in_commit(commit, "f.py::foo", "f.py", "foo")
        assert events == []
        assert following == "f.py::foo"

    def test_unrelated_op_not_matched(self, tmp_path: pathlib.Path) -> None:
        """An op on a different symbol in the same file is ignored."""
        from muse.cli.commands.blame import _events_in_commit

        root = _make_repo(tmp_path)
        ops = [_insert_op("f.py::other")]
        commit = self._commit(root, _make_delta(ops))
        events, _following = _events_in_commit(commit, "f.py::foo", "f.py", "foo")
        assert events == []

    def test_reverse_rename_switches_next_address(
        self, tmp_path: pathlib.Path
    ) -> None:
        """A rename *into* the tracked name switches tracking to the old name."""
        from muse.cli.commands.blame import _events_in_commit

        root = _make_repo(tmp_path)
        # The op records that "f.py::old" was renamed to "foo".
        ops = [_replace_op("f.py::old", "renamed to foo")]
        commit = self._commit(root, _make_delta(ops))
        events, following = _events_in_commit(commit, "f.py::foo", "f.py", "foo")
        assert len(events) == 1
        assert events[0].kind == "renamed"
        assert following == "f.py::old"

    def test_malformed_op_address_without_colons_skipped(
        self, tmp_path: pathlib.Path
    ) -> None:
        """An op address lacking ``::`` neither crashes nor matches."""
        from muse.cli.commands.blame import _events_in_commit

        root = _make_repo(tmp_path)
        # "nofile" has no '::' separator, so it must not be parsed as a
        # rename source for "f.py::foo".
        ops = [_replace_op("nofile", "renamed to foo")]
        commit = self._commit(root, _make_delta(ops))
        events, following = _events_in_commit(commit, "f.py::foo", "f.py", "foo")
        assert events == []
        assert following == "f.py::foo"
370
371
372 # ---------------------------------------------------------------------------
373 # Unit — _BlameEvent.to_dict
374 # ---------------------------------------------------------------------------
375
376
class TestBlameEventToDict:
    """Unit tests for ``_BlameEvent.to_dict``."""

    def test_all_fields_present(self, tmp_path: pathlib.Path) -> None:
        """Every documented key appears in the serialised event."""
        from muse.cli.commands.blame import _BlameEvent

        root = _make_repo(tmp_path)
        commit = _write_commit(root)
        serialised = _BlameEvent(
            "created", commit, "f.py::foo", "initial", None
        ).to_dict()
        expected_keys = (
            "event", "commit_id", "author", "message",
            "committed_at", "address", "detail", "new_address",
        )
        for field in expected_keys:
            assert field in serialised, f"Missing field: {field}"

    def test_event_kind_preserved(self, tmp_path: pathlib.Path) -> None:
        """Each event kind round-trips through ``to_dict`` unchanged."""
        from muse.cli.commands.blame import _BlameEvent

        root = _make_repo(tmp_path)
        commit = _write_commit(root)
        all_kinds = (
            "created", "modified", "renamed", "moved", "deleted", "signature",
        )
        for kind in all_kinds:
            typed_kind: SymbolEventKind = kind
            event = _BlameEvent(typed_kind, commit, "f.py::foo", "detail", None)
            assert event.to_dict()["event"] == kind

    def test_new_address_none_when_absent(self, tmp_path: pathlib.Path) -> None:
        """``new_address`` serialises as ``None`` when not supplied."""
        from muse.cli.commands.blame import _BlameEvent

        root = _make_repo(tmp_path)
        commit = _write_commit(root)
        event = _BlameEvent("modified", commit, "f.py::foo", "mod", None)
        serialised = event.to_dict()
        assert serialised["new_address"] is None

    def test_new_address_string_when_set(self, tmp_path: pathlib.Path) -> None:
        """``new_address`` serialises as the supplied string."""
        from muse.cli.commands.blame import _BlameEvent

        root = _make_repo(tmp_path)
        commit = _write_commit(root)
        event = _BlameEvent(
            "renamed", commit, "f.py::old", "renamed to new", "f.py::new"
        )
        serialised = event.to_dict()
        assert serialised["new_address"] == "f.py::new"
416
417
418 # ---------------------------------------------------------------------------
419 # Integration — basic blame scenarios
420 # ---------------------------------------------------------------------------
421
422
class TestBlameShow:
    """Integration tests for the default text output of ``code blame``."""

    def test_no_events_shows_message(self, tmp_path: pathlib.Path) -> None:
        """A symbol with no history prints the "no events found" notice."""
        root = _make_repo(tmp_path)
        _write_commit(root)
        outcome = _invoke(root, "f.py::foo")
        assert outcome.exit_code == 0
        assert "no events found" in outcome.output

    def test_created_event_shown(self, tmp_path: pathlib.Path) -> None:
        """An insert op surfaces as "created" in the output."""
        root = _make_repo(tmp_path)
        _write_commit(root, delta=_make_delta([_insert_op("f.py::foo")]))
        outcome = _invoke(root, "f.py::foo")
        assert outcome.exit_code == 0
        assert "created" in outcome.output

    def test_modified_event_shown(self, tmp_path: pathlib.Path) -> None:
        """The replace op's summary text is printed."""
        root = _make_repo(tmp_path)
        ops = [_replace_op("f.py::foo", "big refactor")]
        _write_commit(root, delta=_make_delta(ops))
        outcome = _invoke(root, "f.py::foo")
        assert outcome.exit_code == 0
        assert "big refactor" in outcome.output

    def test_author_shown_in_text_output(self, tmp_path: pathlib.Path) -> None:
        """The commit author appears in the output."""
        root = _make_repo(tmp_path)
        _write_commit(
            root, author="bob", delta=_make_delta([_insert_op("f.py::foo")])
        )
        outcome = _invoke(root, "f.py::foo")
        assert outcome.exit_code == 0
        assert "bob" in outcome.output

    def test_message_shown_in_text_output(self, tmp_path: pathlib.Path) -> None:
        """The commit message appears in the output."""
        root = _make_repo(tmp_path)
        _write_commit(
            root,
            message="feat: add foo",
            delta=_make_delta([_insert_op("f.py::foo")]),
        )
        outcome = _invoke(root, "f.py::foo")
        assert outcome.exit_code == 0
        assert "feat: add foo" in outcome.output

    def test_shows_hint_when_more_events_exist(
        self, tmp_path: pathlib.Path
    ) -> None:
        """Without ``--all``, a four-event history prints the older-events hint."""
        root = _make_repo(tmp_path)
        # Four commits, each touching f.py::foo: one create then three edits.
        tip = _write_commit(
            root,
            message="c1",
            delta=_make_delta([_insert_op("f.py::foo")]),
            dt_offset_days=0,
        )
        for day in range(1, 4):
            tip = _write_commit(
                root,
                message=f"c{day + 1}",
                parent_id=tip.commit_id,
                delta=_make_delta([_replace_op("f.py::foo", f"mod{day}")]),
                dt_offset_days=day,
            )
        outcome = _invoke(root, "f.py::foo")
        assert outcome.exit_code == 0
        assert "older event" in outcome.output

    def test_all_flag_shows_full_history(self, tmp_path: pathlib.Path) -> None:
        """With ``--all``, the older-events hint is not printed."""
        root = _make_repo(tmp_path)
        tip = _write_commit(
            root,
            message="c1",
            delta=_make_delta([_insert_op("f.py::foo")]),
            dt_offset_days=0,
        )
        for day in range(1, 4):
            tip = _write_commit(
                root,
                message=f"c{day + 1}",
                parent_id=tip.commit_id,
                delta=_make_delta([_replace_op("f.py::foo", f"mod{day}")]),
                dt_offset_days=day,
            )
        outcome = _invoke(root, "f.py::foo", "--all")
        assert outcome.exit_code == 0
        assert "older event" not in outcome.output

    def test_all_flag_shows_author_for_every_event(
        self, tmp_path: pathlib.Path
    ) -> None:
        """With ``--all``, both commit authors appear in the output."""
        root = _make_repo(tmp_path)
        created = _write_commit(
            root,
            author="alice",
            delta=_make_delta([_insert_op("f.py::foo")]),
            dt_offset_days=0,
        )
        _write_commit(
            root,
            author="bob",
            parent_id=created.commit_id,
            delta=_make_delta([_replace_op("f.py::foo", "mod")]),
            dt_offset_days=1,
        )
        outcome = _invoke(root, "f.py::foo", "--all")
        assert outcome.exit_code == 0
        assert "alice" in outcome.output
        assert "bob" in outcome.output

    def test_rename_shown_and_tracked(self, tmp_path: pathlib.Path) -> None:
        """Blaming the new name reports the rename from the old name."""
        root = _make_repo(tmp_path)
        created = _write_commit(
            root,
            message="create",
            delta=_make_delta([_insert_op("f.py::old")]),
            dt_offset_days=0,
        )
        _write_commit(
            root,
            message="rename",
            parent_id=created.commit_id,
            delta=_make_delta([_replace_op("f.py::old", "renamed to new")]),
            dt_offset_days=1,
        )
        outcome = _invoke(root, "f.py::new")
        assert outcome.exit_code == 0
        assert "renamed" in outcome.output
504
505
506 # ---------------------------------------------------------------------------
507 # Integration — JSON output
508 # ---------------------------------------------------------------------------
509
510
class TestBlameJson:
    """Integration tests for ``code blame --json``."""

    @staticmethod
    def _raw_payload(result: InvokeResult) -> dict:
        """Decode the CLI's JSON output as-is, without filling in defaults.

        Schema tests must inspect this raw mapping: ``_parse_json`` rebuilds
        the result with every key populated, so a key-presence check on its
        return value can never fail.
        """
        start = result.output.index("{")
        payload = json.loads(result.output[start:])
        assert isinstance(payload, dict)
        return payload

    def test_json_schema_all_fields(self, tmp_path: pathlib.Path) -> None:
        """Every top-level key is present in the emitted JSON."""
        repo = _make_repo(tmp_path)
        delta = _make_delta([_insert_op("f.py::foo")])
        _write_commit(repo, delta=delta)
        result = _invoke(repo, "f.py::foo", "--json")
        assert result.exit_code == 0
        data = self._raw_payload(result)
        for field in ("address", "start_ref", "total_commits_scanned", "truncated", "events"):
            assert field in data, f"Missing field: {field}"

    def test_json_address_matches(self, tmp_path: pathlib.Path) -> None:
        """The ``address`` field echoes the requested address."""
        repo = _make_repo(tmp_path)
        _write_commit(repo)
        result = _invoke(repo, "f.py::foo", "--json")
        data = _parse_json(result)
        assert data["address"] == "f.py::foo"

    def test_json_event_fields(self, tmp_path: pathlib.Path) -> None:
        """Every per-event key is present in the emitted JSON."""
        repo = _make_repo(tmp_path)
        delta = _make_delta([_insert_op("f.py::foo")])
        _write_commit(repo, delta=delta)
        result = _invoke(repo, "f.py::foo", "--json")
        data = self._raw_payload(result)
        assert len(data["events"]) == 1
        ev = data["events"][0]
        for field in (
            "event", "commit_id", "author", "message",
            "committed_at", "address", "detail", "new_address",
        ):
            assert field in ev, f"Missing event field: {field}"

    def test_json_events_chronological(self, tmp_path: pathlib.Path) -> None:
        """Events are listed oldest first."""
        repo = _make_repo(tmp_path)
        c1 = _write_commit(
            repo, message="create",
            delta=_make_delta([_insert_op("f.py::foo")]),
            dt_offset_days=0,
        )
        _write_commit(
            repo, message="modify", parent_id=c1.commit_id,
            delta=_make_delta([_replace_op("f.py::foo")]),
            dt_offset_days=1,
        )
        result = _invoke(repo, "f.py::foo", "--json")
        data = _parse_json(result)
        events = data["events"]
        assert len(events) == 2
        # Chronological (oldest first) in JSON
        assert events[0]["event"] == "created"
        assert events[1]["event"] == "modified"

    def test_json_truncated_false_small_history(
        self, tmp_path: pathlib.Path
    ) -> None:
        """A one-commit history is not reported as truncated."""
        repo = _make_repo(tmp_path)
        _write_commit(repo)
        result = _invoke(repo, "f.py::foo", "--json")
        data = _parse_json(result)
        assert data["truncated"] is False

    def test_json_no_events_empty_list(self, tmp_path: pathlib.Path) -> None:
        """A symbol with no history yields an empty ``events`` list."""
        repo = _make_repo(tmp_path)
        _write_commit(repo)
        result = _invoke(repo, "f.py::foo", "--json")
        data = _parse_json(result)
        assert data["events"] == []

    def test_json_output_is_valid_json(self, tmp_path: pathlib.Path) -> None:
        """Everything from the first ``{`` onward parses as JSON."""
        repo = _make_repo(tmp_path)
        delta = _make_delta([_insert_op("f.py::foo")])
        _write_commit(repo, delta=delta)
        result = _invoke(repo, "f.py::foo", "--json")
        assert result.exit_code == 0
        # Must be parseable as JSON
        start = result.output.index("{")
        json.loads(result.output[start:])
580
581
582 # ---------------------------------------------------------------------------
583 # Integration — --kind filter
584 # ---------------------------------------------------------------------------
585
586
class TestKindFilter:
    """Integration tests for the ``--kind`` filter."""

    def test_kind_created_only(self, tmp_path: pathlib.Path) -> None:
        """``--kind created`` hides the modified event."""
        repo = _make_repo(tmp_path)
        c1 = _write_commit(
            repo, delta=_make_delta([_insert_op("f.py::foo")]), dt_offset_days=0
        )
        _write_commit(
            repo, parent_id=c1.commit_id,
            delta=_make_delta([_replace_op("f.py::foo")]), dt_offset_days=1,
        )
        result = _invoke(repo, "f.py::foo", "--kind", "created", "--all")
        assert result.exit_code == 0
        assert "created" in result.output
        assert "modified" not in result.output

    def test_kind_modified_only(self, tmp_path: pathlib.Path) -> None:
        """``--kind modified`` hides the created event."""
        repo = _make_repo(tmp_path)
        c1 = _write_commit(
            repo, delta=_make_delta([_insert_op("f.py::foo")]), dt_offset_days=0
        )
        _write_commit(
            repo, parent_id=c1.commit_id,
            delta=_make_delta([_replace_op("f.py::foo", "changed")]),
            dt_offset_days=1,
        )
        result = _invoke(repo, "f.py::foo", "--kind", "modified", "--all")
        assert result.exit_code == 0
        assert "changed" in result.output
        assert "created" not in result.output

    def test_kind_multiple_values(self, tmp_path: pathlib.Path) -> None:
        """Repeating ``--kind`` is accepted and keeps events of any listed kind."""
        repo = _make_repo(tmp_path)
        delta = _make_delta([_insert_op("f.py::foo")])
        _write_commit(repo, delta=delta)
        result = _invoke(repo, "f.py::foo", "--kind", "created", "--kind", "modified")
        assert result.exit_code == 0
        # The only event is "created", which is one of the requested kinds,
        # so it must survive the filter.
        assert "created" in result.output

    def test_invalid_kind_exits_user_error(self, tmp_path: pathlib.Path) -> None:
        """An unknown kind is rejected with the user-error exit code."""
        repo = _make_repo(tmp_path)
        _write_commit(repo)
        result = _invoke(repo, "f.py::foo", "--kind", "invented")
        assert result.exit_code == ExitCode.USER_ERROR.value

    def test_kind_filter_in_json(self, tmp_path: pathlib.Path) -> None:
        """The filter also applies to ``--json`` output."""
        repo = _make_repo(tmp_path)
        c1 = _write_commit(
            repo, delta=_make_delta([_insert_op("f.py::foo")]), dt_offset_days=0
        )
        _write_commit(
            repo, parent_id=c1.commit_id,
            delta=_make_delta([_replace_op("f.py::foo")]), dt_offset_days=1,
        )
        result = _invoke(repo, "f.py::foo", "--kind", "modified", "--json")
        data = _parse_json(result)
        # all() over an empty list is vacuously true, so first make sure the
        # modified event was actually kept.
        assert data["events"], "expected the modified event to survive the filter"
        assert all(ev["event"] == "modified" for ev in data["events"])

    def test_no_match_shows_filter_message(self, tmp_path: pathlib.Path) -> None:
        """A filter that matches nothing prints the "no events match" notice."""
        repo = _make_repo(tmp_path)
        delta = _make_delta([_insert_op("f.py::foo")])
        _write_commit(repo, delta=delta)
        result = _invoke(repo, "f.py::foo", "--kind", "deleted")
        assert result.exit_code == 0
        assert "no events match" in result.output
634
635
636 # ---------------------------------------------------------------------------
637 # Integration — --author filter
638 # ---------------------------------------------------------------------------
639
640
class TestAuthorFilter:
    """Integration tests for the ``--author`` filter."""

    def test_author_filter_matches(self, tmp_path: pathlib.Path) -> None:
        """An exact author match keeps the event."""
        repo = _make_repo(tmp_path)
        delta = _make_delta([_insert_op("f.py::foo")])
        _write_commit(repo, author="alice", delta=delta)
        result = _invoke(repo, "f.py::foo", "--author", "alice")
        assert result.exit_code == 0
        assert "alice" in result.output

    def test_author_filter_case_insensitive(self, tmp_path: pathlib.Path) -> None:
        """Author matching ignores case."""
        repo = _make_repo(tmp_path)
        delta = _make_delta([_insert_op("f.py::foo")])
        _write_commit(repo, author="Alice", delta=delta)
        result = _invoke(repo, "f.py::foo", "--author", "ALICE")
        assert result.exit_code == 0
        assert "Alice" in result.output

    def test_author_filter_no_match_empty(self, tmp_path: pathlib.Path) -> None:
        """A non-matching author prints the "no events match" notice."""
        repo = _make_repo(tmp_path)
        delta = _make_delta([_insert_op("f.py::foo")])
        _write_commit(repo, author="alice", delta=delta)
        result = _invoke(repo, "f.py::foo", "--author", "nosuchauthor")
        assert result.exit_code == 0
        assert "no events match" in result.output

    def test_author_filter_in_json(self, tmp_path: pathlib.Path) -> None:
        """The filter also applies to ``--json`` output."""
        repo = _make_repo(tmp_path)
        c1 = _write_commit(
            repo, author="alice",
            delta=_make_delta([_insert_op("f.py::foo")]), dt_offset_days=0,
        )
        _write_commit(
            repo, author="bob", parent_id=c1.commit_id,
            delta=_make_delta([_replace_op("f.py::foo")]), dt_offset_days=1,
        )
        result = _invoke(repo, "f.py::foo", "--author", "alice", "--json")
        data = _parse_json(result)
        # all() over an empty list is vacuously true, so first make sure
        # alice's event was actually kept.
        assert data["events"], "expected alice's event to survive the filter"
        assert all(ev["author"] == "alice" for ev in data["events"])
673
674
675 # ---------------------------------------------------------------------------
676 # Security
677 # ---------------------------------------------------------------------------
678
679
class TestBlameSecurity:
    """Security tests: hostile addresses, hostile stored data, error hygiene."""

    # An ANSI colour sequence wrapped around a marker word.
    _ANSI = "\x1b[31mmalicious\x1b[0m"

    def test_ansi_in_address_rejected(self, tmp_path: pathlib.Path) -> None:
        """An address containing an escape sequence is a user error."""
        root = _make_repo(tmp_path)
        _write_commit(root)
        hostile_address = f"f.py::{self._ANSI}"
        outcome = _invoke(root, hostile_address)
        assert outcome.exit_code == ExitCode.USER_ERROR.value

    def test_null_byte_in_address_rejected(self, tmp_path: pathlib.Path) -> None:
        """An address containing a NUL byte is a user error."""
        root = _make_repo(tmp_path)
        _write_commit(root)
        outcome = _invoke(root, "f.py::foo\x00bar")
        assert outcome.exit_code == ExitCode.USER_ERROR.value

    def test_control_char_in_address_rejected(self, tmp_path: pathlib.Path) -> None:
        """An address containing a BEL control character is a user error."""
        root = _make_repo(tmp_path)
        _write_commit(root)
        outcome = _invoke(root, "f.py::foo\x07bell")
        assert outcome.exit_code == ExitCode.USER_ERROR.value

    def test_ansi_in_stored_detail_stripped_from_output(
        self, tmp_path: pathlib.Path
    ) -> None:
        """ANSI in a commit's new_summary must not reach the terminal."""
        root = _make_repo(tmp_path)
        # Simulate a compromised record: the stored summary carries ANSI.
        hostile_summary = f"modified {self._ANSI}"
        ops = [_replace_op("f.py::foo", hostile_summary)]
        _write_commit(root, delta=_make_delta(ops))
        outcome = _invoke(root, "f.py::foo")
        assert outcome.exit_code == 0
        assert "\x1b[" not in outcome.output

    def test_ansi_in_author_stripped_from_output(
        self, tmp_path: pathlib.Path
    ) -> None:
        """ANSI in the stored author must not reach the terminal."""
        root = _make_repo(tmp_path)
        ops = [_insert_op("f.py::foo")]
        _write_commit(root, author=self._ANSI, delta=_make_delta(ops))
        outcome = _invoke(root, "f.py::foo")
        assert outcome.exit_code == 0
        assert "\x1b[" not in outcome.output

    def test_ansi_in_message_stripped_from_output(
        self, tmp_path: pathlib.Path
    ) -> None:
        """ANSI in the stored commit message must not reach the terminal."""
        root = _make_repo(tmp_path)
        ops = [_insert_op("f.py::foo")]
        hostile_message = f"commit {self._ANSI}"
        _write_commit(root, message=hostile_message, delta=_make_delta(ops))
        outcome = _invoke(root, "f.py::foo")
        assert outcome.exit_code == 0
        assert "\x1b[" not in outcome.output

    def test_missing_address_separator_exits_user_error(
        self, tmp_path: pathlib.Path
    ) -> None:
        """An address without ``::`` is a user error."""
        root = _make_repo(tmp_path)
        _write_commit(root)
        outcome = _invoke(root, "no-separator")
        assert outcome.exit_code == ExitCode.USER_ERROR.value

    def test_commit_not_found_exits_not_found(
        self, tmp_path: pathlib.Path
    ) -> None:
        """``--from`` pointing at the null commit id exits with NOT_FOUND."""
        root = _make_repo(tmp_path)
        _write_commit(root)
        outcome = _invoke(root, "f.py::foo", "--from", NULL_COMMIT_ID)
        assert outcome.exit_code == ExitCode.NOT_FOUND.value

    def test_error_message_no_traceback(self, tmp_path: pathlib.Path) -> None:
        """A user error must not print a Python traceback."""
        root = _make_repo(tmp_path)
        outcome = _invoke(root, "no-separator")
        assert "Traceback" not in outcome.output

    def test_json_stdout_clean_on_success(self, tmp_path: pathlib.Path) -> None:
        """JSON consumers must not see non-JSON data on stdout."""
        root = _make_repo(tmp_path)
        ops = [_insert_op("f.py::foo")]
        _write_commit(root, delta=_make_delta(ops))
        outcome = _invoke(root, "f.py::foo", "--json")
        leading = outcome.output.lstrip()
        assert leading.startswith("{"), f"Expected JSON on stdout, got: {outcome.output[:80]!r}"
763
764
765 # ---------------------------------------------------------------------------
766 # E2E — full CLI flag coverage
767 # ---------------------------------------------------------------------------
768
769
class TestE2E:
    """End-to-end tests covering the full set of CLI flags."""

    def test_help_shows_new_flags(self) -> None:
        """``--help`` lists every supported flag."""
        result = runner.invoke(cli, ["code", "blame", "--help"])
        assert result.exit_code == 0
        for flag in ("--kind", "--author", "--all", "--json", "--from", "--max"):
            assert flag in result.output

    def test_default_max_is_applied(self) -> None:
        """The default scan limit is 500 commits."""
        from muse.cli.commands.blame import _DEFAULT_MAX
        assert _DEFAULT_MAX == 500

    def test_max_one_commit_scanned(self, tmp_path: pathlib.Path) -> None:
        """``--max 1`` on a one-commit repo reports exactly one commit scanned."""
        repo = _make_repo(tmp_path)
        _write_commit(repo)
        result = _invoke(repo, "f.py::foo", "--max", "1", "--json")
        assert result.exit_code == 0
        data = _parse_json(result)
        assert data["total_commits_scanned"] == 1

    def test_from_ref_head(self, tmp_path: pathlib.Path) -> None:
        """``--from HEAD`` is accepted."""
        repo = _make_repo(tmp_path)
        delta = _make_delta([_insert_op("f.py::foo")])
        _write_commit(repo, delta=delta)
        result = _invoke(repo, "f.py::foo", "--from", "HEAD")
        assert result.exit_code == 0

    def test_kind_and_author_combined(self, tmp_path: pathlib.Path) -> None:
        """``--kind`` and ``--author`` are applied together."""
        repo = _make_repo(tmp_path)
        c1 = _write_commit(
            repo, author="alice",
            delta=_make_delta([_insert_op("f.py::foo")]), dt_offset_days=0,
        )
        _write_commit(
            repo, author="bob", parent_id=c1.commit_id,
            delta=_make_delta([_replace_op("f.py::foo")]), dt_offset_days=1,
        )
        result = _invoke(repo, "f.py::foo", "--kind", "created", "--author", "alice", "--json")
        data = _parse_json(result)
        # all() over an empty list is vacuously true, so first make sure
        # alice's "created" event was actually kept.
        assert data["events"], "expected alice's created event to survive both filters"
        assert all(
            ev["event"] == "created" and ev["author"] == "alice"
            for ev in data["events"]
        )

    def test_full_history_chronological_in_json(
        self, tmp_path: pathlib.Path
    ) -> None:
        """With ``--all``, JSON events are ordered oldest to newest."""
        repo = _make_repo(tmp_path)
        c1 = _write_commit(
            repo, message="create",
            delta=_make_delta([_insert_op("f.py::foo")]), dt_offset_days=0,
        )
        c2 = _write_commit(
            repo, message="mod1", parent_id=c1.commit_id,
            delta=_make_delta([_replace_op("f.py::foo", "mod1")]), dt_offset_days=1,
        )
        _write_commit(
            repo, message="mod2", parent_id=c2.commit_id,
            delta=_make_delta([_replace_op("f.py::foo", "mod2")]), dt_offset_days=2,
        )
        result = _invoke(repo, "f.py::foo", "--json", "--all")
        data = _parse_json(result)
        messages = [ev["message"] for ev in data["events"]]
        assert messages == ["create", "mod1", "mod2"]
822
823
824 # ---------------------------------------------------------------------------
825 # Stress
826 # ---------------------------------------------------------------------------
827
828
class TestStress:
    """Larger scenarios: long commit chains, many events, threads, wide patches."""

    def test_early_exit_on_created(self, tmp_path: pathlib.Path) -> None:
        """Scan stops at 'created' — commits older than the creation are not scanned."""
        repo = _make_repo(tmp_path)
        # Pre-history: commits older than the symbol's creation that touch an
        # unrelated address. Without them the creation commit would be the
        # root of the chain and a full scan would also report 50, so the
        # scanned-count assertion could not detect a missing early exit.
        prehistory = 5
        c = _write_commit(
            repo, message="pre0",
            delta=_make_delta([_insert_op("g.py::other0")]),
            dt_offset_days=0,
        )
        for i in range(1, prehistory):
            c = _write_commit(
                repo, message=f"pre{i}", parent_id=c.commit_id,
                delta=_make_delta([_insert_op(f"g.py::other{i}")]),
                dt_offset_days=i,
            )
        # chain: create → 49 modifications
        c = _write_commit(
            repo, message="create", parent_id=c.commit_id,
            delta=_make_delta([_insert_op("f.py::foo")]),
            dt_offset_days=prehistory,
        )
        for i in range(1, 50):
            c = _write_commit(
                repo, message=f"mod{i}", parent_id=c.commit_id,
                delta=_make_delta([_replace_op("f.py::foo", f"mod{i}")]),
                dt_offset_days=prehistory + i,
            )
        result = _invoke(repo, "f.py::foo", "--json", "--all")
        assert result.exit_code == 0
        data = _parse_json(result)
        # All 50 events should be present (created + 49 mods)
        assert len(data["events"]) == 50
        # early-exit: the 49 mods plus the creation commit are scanned; the
        # pre-history commits behind the creation must not be counted.
        assert data["total_commits_scanned"] == 50

    def test_50_event_history_all_flag(self, tmp_path: pathlib.Path) -> None:
        """Text output with ``--all`` on a 50-event history includes the newest change."""
        repo = _make_repo(tmp_path)
        c = _write_commit(
            repo, delta=_make_delta([_insert_op("f.py::bar")]), dt_offset_days=0
        )
        for i in range(1, 50):
            c = _write_commit(
                repo, parent_id=c.commit_id,
                delta=_make_delta([_replace_op("f.py::bar", f"change{i}")]),
                dt_offset_days=i,
            )
        result = _invoke(repo, "f.py::bar", "--all")
        assert result.exit_code == 0
        assert "change49" in result.output

    def test_concurrent_blame_isolated_repos(
        self, tmp_path: pathlib.Path
    ) -> None:
        """Eight threads each blame their own isolated repo — no shared state."""
        from muse.cli.commands.blame import _events_in_commit

        # Failures are collected here rather than raised: an exception inside
        # a worker thread would not fail the test on its own.
        errors: list[str] = []

        def worker(idx: int) -> None:
            try:
                repo = _make_repo(tmp_path / f"repo{idx}")
                delta = _make_delta([_insert_op(f"f.py::sym{idx}")])
                c = _write_commit(repo, delta=delta)
                # Directly test core logic (not CliRunner — env not thread-safe)
                evs, _ = _events_in_commit(
                    c, f"f.py::sym{idx}", "f.py", f"sym{idx}"
                )
                if len(evs) != 1 or evs[0].kind != "created":
                    errors.append(f"Thread {idx}: unexpected events {evs!r}")
            except Exception as exc:
                errors.append(f"Thread {idx}: {exc}")

        threads = [threading.Thread(target=worker, args=(i,)) for i in range(8)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

        assert errors == [], f"Concurrent blame failures: {errors}"

    def test_flat_ops_500_patch_children(self) -> None:
        """``_flat_ops`` on one patch op with 500 child ops returns 500 entries."""
        from muse.cli.commands.blame import _flat_ops
        from muse.domain import PatchOp

        children = [_insert_op(f"f.py::sym{i}") for i in range(500)]
        patch = PatchOp(op="patch", address="f.py", child_ops=children, child_domain="code", child_summary="test")
        result = _flat_ops([patch])
        assert len(result) == 500
906
907
908 # ---------------------------------------------------------------------------
909 # Flag registration tests
910 # ---------------------------------------------------------------------------
911
912 import argparse as _argparse
913 from muse.cli.commands.blame import register as _register_blame
914 from muse.core.paths import muse_dir, ref_path
915
916
def _parse_blame(*args: str) -> _argparse.Namespace:
    """Parse *args* as arguments to the ``blame`` subcommand.

    A fresh root parser is created, ``register()`` attaches the subcommand to
    its subparsers action, and the resulting namespace is returned.
    """
    parser = _argparse.ArgumentParser()
    _register_blame(parser.add_subparsers(dest="cmd"))
    argv = ["blame"]
    argv.extend(args)
    return parser.parse_args(argv)
923
924
class TestRegisterFlags:
    """Argument-parser checks for the flags that ``register()`` installs."""

    def test_default_json_out_is_false(self) -> None:
        """Without ``--json`` the parsed ``json_out`` is exactly ``False``."""
        parsed = _parse_blame("src/foo.py")
        json_out = parsed.json_out
        assert json_out is False

    def test_json_flag_sets_json_out(self) -> None:
        """``--json`` sets ``json_out`` to ``True``."""
        parsed = _parse_blame("src/foo.py", "--json")
        json_out = parsed.json_out
        assert json_out is True

    def test_j_shorthand_sets_json_out(self) -> None:
        """``-j`` sets ``json_out`` to ``True``."""
        parsed = _parse_blame("src/foo.py", "-j")
        json_out = parsed.json_out
        assert json_out is True

    def test_address_positional(self) -> None:
        """The positional argument is stored verbatim as ``address``."""
        target = "src/foo.py::MyFn"
        parsed = _parse_blame(target)
        assert parsed.address == "src/foo.py::MyFn"

    def test_all_flag(self) -> None:
        """``--all`` sets ``show_all`` to ``True``."""
        parsed = _parse_blame("src/foo.py", "--all")
        show_all = parsed.show_all
        assert show_all is True

    def test_a_shorthand_for_all(self) -> None:
        """``-a`` sets ``show_all`` to ``True``."""
        parsed = _parse_blame("src/foo.py", "-a")
        show_all = parsed.show_all
        assert show_all is True
File History 1 commit