gabriel / muse public
test_cmd_cat.py python
853 lines 33.7 KB
Raw
sha256:81ae324db5ad375fbfe4834c6fcb378312cafad3cc92dec5d3e5c427306621a2 fix: remove commit_exists filter from have anchors — server… Sonnet 4.6 patch 20 days ago
1 """Comprehensive tests for ``muse code cat``.
2
3 Coverage
4 --------
5 Unit
6 _extract_source — basic slice, context lines, unicode, binary-safe
7 _format_line_numbers — numbering, width padding, first-line offset
8 _resolve_symbol — qualified match, bare-name match, ambiguous, missing
9 _get_file_bytes — workdir read, object-store fallback, not-in-manifest
10
11 Integration
12 cat ADDRESS — found, missing, no "::", JSON output
13 cat --all — all symbols in file, kind filter
14 cat --at REF — historical snapshot
15 cat --context N — surrounding lines appear
16 cat --line-numbers — line numbers prefix
17 cat --json — schema, errors list, unicode
18 cat multiple addresses — batch lookup, partial errors
19 cat untracked file — exits 1, FILE_NOT_TRACKED (never leaks content)
20 cat staged file — staged blob readable from object store
21
22 Security
23 sanitize_display — control chars in address do not crash
24 missing repo — exits non-zero outside repo
25
26 Stress
27 file with 200 symbols — --all completes in < 5 s
28 50 addresses in one call — batch under 5 s
29 """
30
31 from __future__ import annotations
32
33 import json
34 import pathlib
35 import textwrap
36 import time
37
38 import pytest
39
40 from typing import Literal
41
42 from tests.cli_test_helper import CliRunner
43 from muse.cli.commands.cat import (
44 _FileError,
45 _extract_source,
46 _format_line_numbers,
47 _get_file_bytes,
48 _resolve_symbol,
49 )
50 from muse.plugins.code.ast_parser import SymbolRecord, SymbolKind
51 from muse.core.types import NULL_LONG_ID, long_id
52 from muse.core.object_store import write_object
53 from muse.core.types import long_id as _long_id, blob_id as _blob_id
54 from muse.plugins.code.stage import make_entry, write_stage
55
56 cli = None
57 runner = CliRunner()
58
59
60 # ---------------------------------------------------------------------------
61 # Helpers
62 # ---------------------------------------------------------------------------
63
64
def _make_record(
    qualified_name: str,
    name: str,
    lineno: int,
    end_lineno: int,
    kind: SymbolKind = "function",
) -> SymbolRecord:
    """Build a ``SymbolRecord`` for the resolver unit tests.

    Only the names, the kind and the line span vary between records; the
    id/hash fields are fixed placeholder strings and ``metadata_id`` is empty.
    """
    # 64-character placeholders standing in for the id/hash fields.
    content_id, body_hash, signature_id = "a" * 64, "b" * 64, "c" * 64
    canonical_key = f"mod.py###{kind}#{name}#{lineno}"
    return SymbolRecord(
        name=name,
        qualified_name=qualified_name,
        kind=kind,
        lineno=lineno,
        end_lineno=end_lineno,
        content_id=content_id,
        body_hash=body_hash,
        signature_id=signature_id,
        metadata_id="",
        canonical_key=canonical_key,
    )
84
85
86 # ---------------------------------------------------------------------------
87 # Shared repo fixture
88 # ---------------------------------------------------------------------------
89
90
@pytest.fixture
def repo(tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> pathlib.Path:
    """Create a code-domain repo in ``tmp_path`` with one committed ``billing.py``.

    The process working directory and ``MUSE_REPO_ROOT`` both point at
    ``tmp_path`` for the duration of the test. Returns the repo root.
    """
    monkeypatch.chdir(tmp_path)
    monkeypatch.setenv("MUSE_REPO_ROOT", str(tmp_path))

    init_result = runner.invoke(cli, ["init", "--domain", "code"])
    assert init_result.exit_code == 0, init_result.output

    # One class with two methods plus two top-level functions.
    billing_source = textwrap.dedent("""\
        class Invoice:
            def compute_total(self, items: list[int]) -> int:
                return sum(items)

            def apply_discount(self, total: float, pct: float) -> float:
                return total * (1 - pct)

        def validate_amount(amount: float) -> bool:
            return amount > 0

        def format_receipt(amount: float) -> str:
            return f"Total: {amount:.2f}"
    """)
    (tmp_path / "billing.py").write_text(billing_source)

    commit_result = runner.invoke(cli, ["commit", "-m", "initial"])
    assert commit_result.exit_code == 0, commit_result.output
    return tmp_path
116
117
118 # ---------------------------------------------------------------------------
119 # Unit — _extract_source
120 # ---------------------------------------------------------------------------
121
122
class TestExtractSource:
    """Unit tests for ``_extract_source`` line slicing and context handling."""

    # Five newline-terminated lines, "line1" through "line5".
    _SOURCE = b"line1\nline2\nline3\nline4\nline5\n"

    def test_basic_slice(self) -> None:
        """Lines 2..3 come back joined, without a trailing newline."""
        snippet = _extract_source(self._SOURCE, lineno=2, end_lineno=3)
        assert snippet == "line2\nline3"

    def test_single_line(self) -> None:
        """A one-line span yields exactly that line."""
        snippet = _extract_source(self._SOURCE, lineno=1, end_lineno=1)
        assert snippet == "line1"

    def test_context_before(self) -> None:
        """One line of context pulls in the preceding line."""
        snippet = _extract_source(self._SOURCE, lineno=3, end_lineno=3, context=1)
        for expected in ("line2", "line3"):
            assert expected in snippet

    def test_context_after(self) -> None:
        """One line of context pulls in the following line."""
        snippet = _extract_source(self._SOURCE, lineno=3, end_lineno=3, context=1)
        assert "line4" in snippet

    def test_context_clamps_at_start(self) -> None:
        """Context larger than the lines available before line 1 is tolerated."""
        # Asking for 10 lines of context before line 1 must not go negative.
        snippet = _extract_source(self._SOURCE, lineno=1, end_lineno=1, context=10)
        assert "line1" in snippet

    def test_context_clamps_at_end(self) -> None:
        """Context larger than the lines available after the last line is tolerated."""
        snippet = _extract_source(self._SOURCE, lineno=5, end_lineno=5, context=10)
        assert "line5" in snippet

    def test_unicode_round_trips(self) -> None:
        """Non-ASCII identifiers survive the bytes-to-text conversion."""
        encoded = "def café() -> str:\n return 'café'\n".encode()
        snippet = _extract_source(encoded, lineno=1, end_lineno=2)
        assert "café" in snippet

    def test_binary_errors_replaced(self) -> None:
        """Undecodable bytes must not make extraction raise."""
        raw = b"def foo():\n x = \xff\xfe\n"
        snippet = _extract_source(raw, lineno=1, end_lineno=2)
        assert "foo" in snippet  # must not raise
161
162
163 # ---------------------------------------------------------------------------
164 # Unit — _format_line_numbers
165 # ---------------------------------------------------------------------------
166
167
class TestFormatLineNumbers:
    """Unit tests for ``_format_line_numbers`` prefixing and column alignment."""

    def test_single_line_numbered(self) -> None:
        """A single line is prefixed with its starting line number."""
        result = _format_line_numbers("hello", start_lineno=5)
        assert result.startswith("5")
        assert "hello" in result

    def test_multiline_numbered(self) -> None:
        """Each input line gets its own consecutive number."""
        source = "a\nb\nc"
        result = _format_line_numbers(source, start_lineno=1)
        lines = result.splitlines()
        assert len(lines) == 3
        assert lines[0].startswith("1")
        assert lines[2].startswith("3")

    def test_width_pads_for_large_line_numbers(self) -> None:
        """The number column is padded so the separator sits at a fixed offset."""
        # 100 lines → width=3; the separator "  " appears at offset 3 for every line.
        source = "\n".join(f"line_{i}" for i in range(100))
        result = _format_line_numbers(source, start_lineno=1)
        expected_width = len(str(100))  # 3
        for line in result.splitlines():
            # The slice is two characters wide, so it must be compared with a
            # two-character separator; a one-character string can never match.
            sep = line[expected_width : expected_width + 2]
            assert sep == "  ", f"separator not at col {expected_width} in {line!r}"

    def test_offset_start_lineno(self) -> None:
        """Numbering starts at ``start_lineno`` rather than at 1."""
        result = _format_line_numbers("hello", start_lineno=42)
        assert result.startswith("42")
194
195
196 # ---------------------------------------------------------------------------
197 # Unit — _resolve_symbol
198 # ---------------------------------------------------------------------------
199
200
class TestResolveSymbol:
    """Unit tests for ``_resolve_symbol``, which returns a ``(record, error)`` pair."""

    def test_qualified_name_match(self) -> None:
        """A fully qualified name resolves with an empty error string."""
        symbols = {
            "mod.py::MyClass.my_method": _make_record(
                "MyClass.my_method", "my_method", 5, 7
            ),
        }
        found, message = _resolve_symbol(symbols, "MyClass.my_method", "mod.py")
        assert found is not None
        assert message == ""

    def test_bare_name_unambiguous(self) -> None:
        """A bare name resolves when only one symbol carries it."""
        symbols = {"mod.py::my_func": _make_record("my_func", "my_func", 1, 3)}
        found, message = _resolve_symbol(symbols, "my_func", "mod.py")
        assert found is not None

    def test_bare_name_ambiguous_returns_error(self) -> None:
        """Two symbols sharing a bare name yield no record and an explanatory error."""
        symbols = {
            "mod.py::A.validate": _make_record("A.validate", "validate", 1, 2),
            "mod.py::B.validate": _make_record("B.validate", "validate", 5, 6),
        }
        found, message = _resolve_symbol(symbols, "validate", "mod.py")
        assert found is None
        lowered = message.lower()
        assert "ambiguous" in lowered or "qualify" in lowered

    def test_not_found_returns_error_message(self) -> None:
        """An unknown name yields no record and an error that says so."""
        symbols = {"mod.py::existing": _make_record("existing", "existing", 1, 3)}
        found, message = _resolve_symbol(symbols, "missing_func", "mod.py")
        assert found is None
        assert "not found" in message.lower() or "missing_func" in message

    def test_empty_tree_returns_error(self) -> None:
        """Resolving against an empty tree produces a non-empty error."""
        found, message = _resolve_symbol({}, "anything", "mod.py")
        assert found is None
        assert len(message) > 0

    def test_import_symbols_excluded_from_suggestions(self) -> None:
        """Import pseudo-symbols must not show up as 'available' options."""
        symbols = {
            "mod.py::import::os": _make_record("import::os", "os", 1, 1, kind="import"),
        }
        found, message = _resolve_symbol(symbols, "missing", "mod.py")
        assert found is None
        assert "import::os" not in message
247
248
249 # ---------------------------------------------------------------------------
250 # Integration — basic address lookup
251 # ---------------------------------------------------------------------------
252
253
class TestCatBasic:
    """Integration tests for plain ``code cat ADDRESS`` lookups."""

    def test_finds_top_level_function(self, repo: pathlib.Path) -> None:
        """A top-level function address succeeds and prints the function."""
        argv = ["code", "cat", "billing.py::validate_amount"]
        outcome = runner.invoke(cli, argv)
        assert outcome.exit_code == 0, outcome.output
        assert "validate_amount" in outcome.output

    def test_shows_function_body(self, repo: pathlib.Path) -> None:
        """The printed source includes the function body, not just its name."""
        argv = ["code", "cat", "billing.py::validate_amount"]
        outcome = runner.invoke(cli, argv)
        assert outcome.exit_code == 0
        assert "amount > 0" in outcome.output

    def test_finds_method(self, repo: pathlib.Path) -> None:
        """A ``Class.method`` address resolves to the method."""
        argv = ["code", "cat", "billing.py::Invoice.compute_total"]
        outcome = runner.invoke(cli, argv)
        assert outcome.exit_code == 0
        assert "compute_total" in outcome.output

    def test_missing_symbol_exits_one(self, repo: pathlib.Path) -> None:
        """An unknown symbol in a tracked file exits with status 1."""
        argv = ["code", "cat", "billing.py::zzz_nonexistent"]
        outcome = runner.invoke(cli, argv)
        assert outcome.exit_code == 1

    def test_no_separator_exits_one(self, repo: pathlib.Path) -> None:
        """An address without ``::`` exits with status 1."""
        outcome = runner.invoke(cli, ["code", "cat", "billing.py"])
        assert outcome.exit_code == 1

    def test_untracked_file_exits_one(self, repo: pathlib.Path) -> None:
        """An address in a file the repo does not know exits with status 1."""
        outcome = runner.invoke(cli, ["code", "cat", "nowhere.py::foo"])
        assert outcome.exit_code == 1
281
282
283 # ---------------------------------------------------------------------------
284 # Integration — --all mode
285 # ---------------------------------------------------------------------------
286
287
class TestCatFileFlag:
    """--file <path> is a convenience alias for <path> --all."""

    def test_file_flag_prints_all_symbols(self, repo: pathlib.Path) -> None:
        """Every function and method of the file appears in the output."""
        outcome = runner.invoke(cli, ["code", "cat", "--file", "billing.py"])
        assert outcome.exit_code == 0
        for symbol in ("validate_amount", "format_receipt", "compute_total"):
            assert symbol in outcome.output

    def test_file_flag_accepts_kind_filter(self, repo: pathlib.Path) -> None:
        """``--kind function`` is accepted together with ``--file``."""
        argv = ["code", "cat", "--file", "billing.py", "--kind", "function"]
        outcome = runner.invoke(cli, argv)
        assert outcome.exit_code == 0
        assert "validate_amount" in outcome.output

    def test_file_flag_json_output(self, repo: pathlib.Path) -> None:
        """``--json`` output lists each symbol under ``results``."""
        argv = ["code", "cat", "--file", "billing.py", "--json"]
        outcome = runner.invoke(cli, argv)
        assert outcome.exit_code == 0
        payload = json.loads(outcome.output)
        assert "results" in payload
        symbol_names = [entry["symbol"] for entry in payload["results"]]
        assert "validate_amount" in symbol_names

    def test_file_flag_untracked_file_errors(self, repo: pathlib.Path) -> None:
        """A path that is not tracked makes the command fail."""
        outcome = runner.invoke(cli, ["code", "cat", "--file", "missing.py"])
        assert outcome.exit_code != 0
314
315
class TestCatAll:
    """Integration tests for ``code cat --all PATH``."""

    def test_all_prints_every_non_import_symbol(self, repo: pathlib.Path) -> None:
        """Every function and method of the file appears in the output."""
        outcome = runner.invoke(cli, ["code", "cat", "--all", "billing.py"])
        assert outcome.exit_code == 0
        for symbol in ("validate_amount", "format_receipt", "compute_total"):
            assert symbol in outcome.output

    def test_all_kind_filter_functions_only(self, repo: pathlib.Path) -> None:
        """``--kind function`` still prints the top-level functions."""
        argv = ["code", "cat", "--all", "--kind", "function", "billing.py"]
        outcome = runner.invoke(cli, argv)
        assert outcome.exit_code == 0
        assert "validate_amount" in outcome.output
        # Classes should not appear as their own block (only functions).

    def test_all_untracked_file_skips_gracefully(self, repo: pathlib.Path) -> None:
        """An untracked path either fails or is reported as not tracked."""
        outcome = runner.invoke(cli, ["code", "cat", "--all", "missing.py"])
        # Should exit with error since file not in manifest.
        failed = outcome.exit_code != 0
        reported = "not tracked" in outcome.output.lower()
        assert failed or reported
334
335
336 # ---------------------------------------------------------------------------
337 # Integration — --line-numbers and --context
338 # ---------------------------------------------------------------------------
339
340
class TestCatLineNumbers:
    """Integration tests for the ``--line-numbers`` and ``--context`` flags."""

    def test_line_numbers_flag(self, repo: pathlib.Path) -> None:
        """With ``--line-numbers`` at least one output line begins with a digit."""
        argv = ["code", "cat", "--line-numbers", "billing.py::validate_amount"]
        outcome = runner.invoke(cli, argv)
        assert outcome.exit_code == 0
        # Some line in the output should start with a digit.
        stripped_lines = (raw.strip() for raw in outcome.output.splitlines())
        # ``""[:1]`` is ``""`` and ``"".isdigit()`` is False, so blank lines are skipped.
        assert any(text[:1].isdigit() for text in stripped_lines)

    def test_context_includes_surrounding_lines(self, repo: pathlib.Path) -> None:
        """With ``--context 2`` the output spans more than two lines."""
        argv = ["code", "cat", "--context", "2", "billing.py::validate_amount"]
        outcome = runner.invoke(cli, argv)
        assert outcome.exit_code == 0
        # With context=2 the preceding lines of the file should appear.
        assert len(outcome.output.splitlines()) > 2
359
360
361 # ---------------------------------------------------------------------------
362 # Integration — --json
363 # ---------------------------------------------------------------------------
364
365
class TestCatJson:
    """Integration tests for the ``--json`` output of ``code cat``."""

    def test_json_schema(self, repo: pathlib.Path) -> None:
        """The payload has ``results`` and ``errors``; one address gives one result."""
        result = runner.invoke(cli, ["code", "cat", "--json", "billing.py::validate_amount"])
        assert result.exit_code == 0, result.output
        data = json.loads(result.output)
        assert "results" in data
        assert "errors" in data
        assert len(data["results"]) == 1

    def test_json_result_fields(self, repo: pathlib.Path) -> None:
        """Each result entry carries the address, source, kind and line span."""
        result = runner.invoke(cli, ["code", "cat", "--json", "billing.py::validate_amount"])
        data = json.loads(result.output)
        r = data["results"][0]
        for field in ("address", "source", "kind", "lineno", "end_lineno"):
            assert field in r, f"missing field {field!r}"

    def test_json_missing_symbol_in_errors(self, repo: pathlib.Path) -> None:
        """An unknown symbol is reported in ``errors`` and names the symbol."""
        result = runner.invoke(cli, ["code", "cat", "--json", "billing.py::zzz_missing"])
        data = json.loads(result.output)
        assert len(data["errors"]) == 1
        assert "zzz_missing" in data["errors"][0].get("error", "")

    def test_json_multiple_results(self, repo: pathlib.Path) -> None:
        """Two valid addresses give two result entries."""
        result = runner.invoke(cli, [
            "code", "cat", "--json",
            "billing.py::validate_amount",
            "billing.py::format_receipt",
        ])
        data = json.loads(result.output)
        assert len(data["results"]) == 2

    def test_json_partial_error(self, repo: pathlib.Path) -> None:
        """One valid and one invalid address give one result and one error."""
        result = runner.invoke(cli, [
            "code", "cat", "--json",
            "billing.py::validate_amount",
            "billing.py::zzz_missing",
        ])
        data = json.loads(result.output)
        assert len(data["results"]) == 1
        assert len(data["errors"]) == 1

    def test_json_has_duration_ms(self, repo: pathlib.Path) -> None:
        """``duration_ms`` is present and is a non-negative float."""
        result = runner.invoke(cli, ["code", "cat", "--json", "billing.py::validate_amount"])
        assert result.exit_code == 0
        data = json.loads(result.output)
        assert "duration_ms" in data
        assert isinstance(data["duration_ms"], float)
        assert data["duration_ms"] >= 0.0

    def test_json_has_source_ref(self, repo: pathlib.Path) -> None:
        """Without ``--at``/``--staged`` the source ref is ``"working tree"``."""
        result = runner.invoke(cli, ["code", "cat", "--json", "billing.py::validate_amount"])
        data = json.loads(result.output)
        assert "source_ref" in data
        assert data["source_ref"] == "working tree"

    def test_format_json_flag(self, repo: pathlib.Path) -> None:
        """--json flag produces JSON output."""
        result = runner.invoke(cli, [
            "code", "cat", "--json", "billing.py::validate_amount",
        ])
        assert result.exit_code == 0
        data = json.loads(result.output)
        assert "results" in data

    def test_format_text_is_default(self, repo: pathlib.Path) -> None:
        """Default (no flag) produces text output, not JSON."""
        result = runner.invoke(cli, ["code", "cat", "billing.py::validate_amount"])
        assert result.exit_code == 0
        # Text output starts with a '#' header, not a JSON brace.
        assert result.output.strip().startswith("#")

    def test_json_error_has_error_code(self, repo: pathlib.Path) -> None:
        """Every error entry in JSON output must have an error_code field."""
        result = runner.invoke(cli, [
            "code", "cat", "--json",
            "billing.py::zzz_missing",
            "nowhere.py::foo",
        ])
        data = json.loads(result.output)
        # Both addresses are invalid, so an empty list would make the loop
        # below pass without checking anything.
        assert data["errors"], "expected at least one error entry"
        for err in data["errors"]:
            assert "error_code" in err, f"error_code missing from {err}"
447
448
449 # ---------------------------------------------------------------------------
450 # Integration — --at historical snapshot
451 # ---------------------------------------------------------------------------
452
453
class TestCatAtRef:
    """Integration tests for reading a symbol at a given ref via ``--at``."""

    def test_at_head_works(self, repo: pathlib.Path) -> None:
        """``--at HEAD`` resolves a committed symbol."""
        argv = ["code", "cat", "--at", "HEAD", "billing.py::validate_amount"]
        outcome = runner.invoke(cli, argv)
        assert outcome.exit_code == 0, outcome.output
        assert "validate_amount" in outcome.output

    def test_at_bad_ref_exits_one(self, repo: pathlib.Path) -> None:
        """An unresolvable ref exits with status 1."""
        argv = [
            "code", "cat", "--at", "zzz_bad_ref_xyz",
            "billing.py::validate_amount",
        ]
        outcome = runner.invoke(cli, argv)
        assert outcome.exit_code == 1
468
469
470 # ---------------------------------------------------------------------------
471 # Integration — --staged flag
472 # ---------------------------------------------------------------------------
473
474
class TestCatStaged:
    """muse code cat --staged reads symbols from the staged version of a file."""

    def test_staged_reads_staged_blob(self, repo: pathlib.Path) -> None:
        """--staged returns staged content, not the working-tree disk version."""
        # The embedded source must be valid Python: method bodies are indented
        # one level deeper than their ``def`` so the symbols can be parsed.
        staged_content = (
            "class Invoice:\n"
            "    def compute_total(self, items: list[int]) -> int:\n"
            "        return sum(items) + 99  # staged addition\n\n"
            "    def apply_discount(self, total: float, pct: float) -> float:\n"
            "        return total * (1 - pct)\n\n"
            "def validate_amount(amount: float) -> bool:\n"
            "    return amount > 0\n\n"
            "def format_receipt(amount: float) -> str:\n"
            "    return f'Total: {amount:.2f}'\n"
        ).encode()
        obj_id = _blob_id(staged_content)
        long_oid = _long_id(obj_id)
        write_object(repo, long_oid, staged_content)
        write_stage(repo, {"billing.py": make_entry(long_oid, "M")})
        result = runner.invoke(
            cli, ["code", "cat", "--staged", "billing.py::Invoice.compute_total"]
        )
        assert result.exit_code == 0
        assert "99" in result.output  # staged addition visible

    def test_staged_json_source_ref_is_staged(self, repo: pathlib.Path) -> None:
        """With ``--staged`` the JSON ``source_ref`` is ``"staged"``."""
        result = runner.invoke(
            cli, ["code", "cat", "--staged", "--json", "billing.py::validate_amount"]
        )
        assert result.exit_code == 0
        data = json.loads(result.output)
        assert data.get("source_ref") == "staged"

    def test_staged_falls_back_to_committed_when_no_stage_entry(
        self, repo: pathlib.Path
    ) -> None:
        """File in HEAD but not in stage → --staged falls back to HEAD version."""
        result = runner.invoke(
            cli, ["code", "cat", "--staged", "billing.py::validate_amount"]
        )
        assert result.exit_code == 0
        assert "validate_amount" in result.output

    def test_staged_and_at_mutually_exclusive(self, repo: pathlib.Path) -> None:
        """Passing both ``--staged`` and ``--at`` makes the command fail."""
        result = runner.invoke(
            cli,
            ["code", "cat", "--staged", "--at", "HEAD", "billing.py::validate_amount"],
        )
        assert result.exit_code != 0

    def test_staged_and_at_json_has_error(self, repo: pathlib.Path) -> None:
        """In JSON mode the conflict is reported under a top-level ``error`` key."""
        result = runner.invoke(
            cli,
            [
                "code", "cat", "--staged", "--at", "HEAD",
                "--json", "billing.py::validate_amount",
            ],
        )
        assert result.exit_code != 0
        data = json.loads(result.output)
        assert "error" in data
        assert "mutually exclusive" in data["error"].lower()

    def test_staged_untracked_file_errors(self, repo: pathlib.Path) -> None:
        """``--staged`` on a file the repo does not know makes the command fail."""
        result = runner.invoke(
            cli, ["code", "cat", "--staged", "nowhere.py::foo"]
        )
        assert result.exit_code != 0

    def test_staged_flag_registered(self) -> None:
        """``--staged`` parses to ``args.staged is True``."""
        # Reuse the module-level parser helper instead of rebuilding the
        # argparse scaffolding in every registration test.
        args = _parse_cat("--staged", "f.py::fn")
        assert args.staged is True

    def test_staged_default_is_false(self) -> None:
        """Without the flag ``args.staged`` defaults to ``False``."""
        args = _parse_cat("f.py::fn")
        assert args.staged is False
562
563
564 # ---------------------------------------------------------------------------
565 # Security
566 # ---------------------------------------------------------------------------
567
568
class TestCatSecurity:
    """Security-oriented checks: repo requirement, hostile input, unsafe paths."""

    def test_requires_repo(
        self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch
    ) -> None:
        """Running outside a repo (no ``MUSE_REPO_ROOT``) exits non-zero."""
        monkeypatch.chdir(tmp_path)
        monkeypatch.delenv("MUSE_REPO_ROOT", raising=False)
        result = runner.invoke(cli, ["code", "cat", "billing.py::foo"])
        assert result.exit_code != 0

    def test_control_chars_in_address_do_not_crash(self, repo: pathlib.Path) -> None:
        """A control character in the address yields a normal exit code."""
        result = runner.invoke(cli, ["code", "cat", "billing.py::foo\x01bar"])
        assert result.exit_code in (0, 1)  # must not raise unhandled exception

    def test_symlink_workdir_rejected(self, repo: pathlib.Path) -> None:
        """A tracked file that is actually a symlink must be rejected."""
        # Point a new symlink at the real, committed file.
        real = repo / "billing.py"
        link = repo / "link.py"
        link.symlink_to(real)
        # Inject the symlink path into a fake manifest and call _get_file_bytes directly.
        fake_manifest = {"link.py": "a" * 64}
        with pytest.raises(_FileError) as exc_info:
            _get_file_bytes(repo, "link.py", fake_manifest, source_is_workdir=True)
        # Checked after the ``with`` block: code following the raising call
        # inside the block would never run.
        assert exc_info.value.code == "SYMLINK_REJECTED"

    def test_path_traversal_rejected(self, repo: pathlib.Path) -> None:
        """A manifest entry with '..' that escapes the repo must be rejected."""
        fake_manifest = {"../outside.py": "a" * 64}
        with pytest.raises(_FileError) as exc_info:
            _get_file_bytes(repo, "../outside.py", fake_manifest, source_is_workdir=True)
        assert exc_info.value.code in ("PATH_TRAVERSAL", "FILE_NOT_TRACKED")

    def test_blob_not_found_gives_precise_error_code(
        self, repo: pathlib.Path
    ) -> None:
        """Missing blob raises _FileError with BLOB_NOT_FOUND, not generic exit."""
        fake_manifest = {"billing.py": NULL_LONG_ID}  # blob that doesn't exist in store
        with pytest.raises(_FileError) as exc_info:
            _get_file_bytes(repo, "billing.py", fake_manifest, source_is_workdir=False)
        assert exc_info.value.code == "BLOB_NOT_FOUND"

    def test_symlink_in_json_gives_error_code(self, repo: pathlib.Path) -> None:
        """Symlink rejection surfaces as a JSON error, not a crash."""
        link = repo / "symlink_billing.py"
        link.symlink_to(repo / "billing.py")
        # The symlink is never committed, so this exercises the lookup of an
        # untracked path that happens to be a symlink.
        result = runner.invoke(cli, ["code", "cat", "--json", "symlink_billing.py::foo"])
        # Either exit 0 with an error in the errors list, or exit 1 — never a crash.
        assert result.exit_code in (0, 1)
        try:
            data = json.loads(result.output)
        except json.JSONDecodeError:
            return  # text-mode output is also acceptable here
        # If the output is JSON, it must be an object.
        assert isinstance(data, dict)
629
630
631 # ---------------------------------------------------------------------------
632 # Stress
633 # ---------------------------------------------------------------------------
634
635
class TestCatStress:
    """Timing and volume checks against a file with 200 generated functions."""

    @pytest.fixture
    def large_repo(
        self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch
    ) -> pathlib.Path:
        """Create a repo with one committed ``big.py`` holding 200 functions."""
        monkeypatch.chdir(tmp_path)
        monkeypatch.setenv("MUSE_REPO_ROOT", str(tmp_path))
        init = runner.invoke(cli, ["init", "--domain", "code"])
        # Fail here, with the init output, rather than later at commit time.
        assert init.exit_code == 0, init.output

        lines: list[str] = []
        for i in range(200):
            lines.append(f"def symbol_{i:04d}(x: int) -> int:")
            lines.append(f"    return x + {i}")
            lines.append("")
        (tmp_path / "big.py").write_text("\n".join(lines))

        r = runner.invoke(cli, ["commit", "-m", "big module"])
        assert r.exit_code == 0, r.output
        return tmp_path

    def test_all_200_symbols_under_5s(self, large_repo: pathlib.Path) -> None:
        """``--all`` prints the first and last symbol within five seconds."""
        start = time.monotonic()
        result = runner.invoke(cli, ["code", "cat", "--all", "big.py"])
        elapsed = time.monotonic() - start
        assert result.exit_code == 0, result.output
        assert elapsed < 5.0, f"--all on 200 symbols took {elapsed:.2f}s"
        assert "symbol_0000" in result.output
        assert "symbol_0199" in result.output

    def test_50_addresses_batch_under_5s(self, large_repo: pathlib.Path) -> None:
        """Fifty addresses in one call return fifty results within five seconds."""
        addresses = [f"big.py::symbol_{i:04d}" for i in range(50)]
        start = time.monotonic()
        result = runner.invoke(cli, ["code", "cat", "--json"] + addresses)
        elapsed = time.monotonic() - start
        assert result.exit_code == 0, result.output
        assert elapsed < 5.0, f"50-address batch took {elapsed:.2f}s"
        data = json.loads(result.output)
        assert len(data["results"]) == 50

    def test_all_json_200_symbols_schema_valid(self, large_repo: pathlib.Path) -> None:
        """``--all --json`` returns 200 entries, each with ``source`` and ``lineno``."""
        result = runner.invoke(cli, ["code", "cat", "--all", "--json", "big.py"])
        assert result.exit_code == 0, result.output
        data = json.loads(result.output)
        assert len(data["results"]) == 200
        for r in data["results"]:
            assert "source" in r
            assert "lineno" in r

    def test_file_cache_batch_same_file_under_2s(self, large_repo: pathlib.Path) -> None:
        """50 addresses to the same file should benefit from caching: one read, one parse."""
        addresses = [f"big.py::symbol_{i:04d}" for i in range(50)]
        start = time.monotonic()
        result = runner.invoke(cli, ["code", "cat", "--json"] + addresses)
        elapsed = time.monotonic() - start
        assert result.exit_code == 0, result.output
        # With caching, 50 same-file lookups should be fast.
        assert elapsed < 2.0, f"50 same-file addresses took {elapsed:.2f}s — cache may not be working"
        data = json.loads(result.output)
        assert len(data["results"]) == 50

    def test_duration_ms_in_large_batch(self, large_repo: pathlib.Path) -> None:
        """``duration_ms`` stays a non-negative float for a 20-address batch."""
        addresses = [f"big.py::symbol_{i:04d}" for i in range(20)]
        result = runner.invoke(cli, ["code", "cat", "--json"] + addresses)
        assert result.exit_code == 0
        data = json.loads(result.output)
        assert isinstance(data["duration_ms"], float)
        assert data["duration_ms"] >= 0.0
703
704
705 # ---------------------------------------------------------------------------
706 # Flag registration tests
707 # ---------------------------------------------------------------------------
708
709 import argparse as _argparse
710 from muse.cli.commands.cat import register as _register_cat
711
712
def _parse_cat(*args: str) -> _argparse.Namespace:
    """Parse ``cat`` followed by ``args`` with a parser built by ``register()``.

    A fresh root parser is created on every call, so tests cannot affect
    one another through shared parser state.
    """
    parser = _argparse.ArgumentParser()
    _register_cat(parser.add_subparsers(dest="cmd"))
    argv = ["cat", *args]
    return parser.parse_args(argv)
719
720
class TestRegisterFlags:
    """Checks on the flags that ``register()`` adds to the ``cat`` subparser."""

    def test_default_json_out_is_false(self) -> None:
        """Without a JSON flag ``json_out`` is ``False``."""
        ns = _parse_cat("src/foo.py::MyFn")
        assert ns.json_out is False

    def test_json_flag_sets_json_out(self) -> None:
        """``--json`` sets ``json_out``."""
        ns = _parse_cat("src/foo.py::MyFn", "--json")
        assert ns.json_out is True

    def test_j_shorthand_sets_json_out(self) -> None:
        """``-j`` is a shorthand for ``--json``."""
        ns = _parse_cat("src/foo.py::MyFn", "-j")
        assert ns.json_out is True

    def test_line_numbers_has_no_n_shorthand(self) -> None:
        """``-n`` is not a registered flag, so argparse exits."""
        # ``pytest`` is already imported at module level; no local import needed.
        with pytest.raises(SystemExit):
            _parse_cat("src/foo.py::MyFn", "-n")

    def test_all_flag(self) -> None:
        """``--all`` sets ``all_symbols``."""
        ns = _parse_cat("src/foo.py", "--all")
        assert ns.all_symbols is True
742
743
744 # ---------------------------------------------------------------------------
745 # Phase 3: untracked files error; staged files readable from object store
746 # ---------------------------------------------------------------------------
747
748
def _stage_py(repo: pathlib.Path, rel_path: str, content: bytes) -> None:
    """Write blob to object store and record a stage entry — simulates muse code add.

    The stage is written with a single entry for ``rel_path``, marked ``"A"``.
    """
    blob_oid = _long_id(_blob_id(content))
    write_object(repo, blob_oid, content)
    staged_entries = {rel_path: make_entry(blob_oid, "A")}
    write_stage(repo, staged_entries)
754
755
756 _STAGED_SRC = textwrap.dedent("""\
757 def staged_fn(x: int) -> int:
758 return x * 2
759
760 class StagedClass:
761 def method(self) -> None:
762 pass
763 """).encode()
764
765
class TestUntrackedFileErrors:
    """muse code cat must reject files that exist on disk but are not tracked."""

    def test_untracked_on_disk_address_exits_nonzero(
        self, repo: pathlib.Path
    ) -> None:
        """An address in an on-disk but untracked file fails."""
        untracked = repo / "untracked.py"
        untracked.write_text("def ghost(): pass\n")
        outcome = runner.invoke(cli, ["code", "cat", "untracked.py::ghost"])
        assert outcome.exit_code != 0

    def test_untracked_on_disk_all_mode_exits_nonzero(
        self, repo: pathlib.Path
    ) -> None:
        """``--all`` on an on-disk but untracked file fails."""
        untracked = repo / "untracked.py"
        untracked.write_text("def ghost(): pass\n")
        outcome = runner.invoke(cli, ["code", "cat", "--all", "untracked.py"])
        assert outcome.exit_code != 0

    def test_untracked_on_disk_json_error_code(self, repo: pathlib.Path) -> None:
        """In JSON mode the first error carries ``FILE_NOT_TRACKED``."""
        untracked = repo / "untracked.py"
        untracked.write_text("def ghost(): pass\n")
        argv = ["code", "cat", "untracked.py::ghost", "--json"]
        outcome = runner.invoke(cli, argv)
        assert outcome.exit_code != 0
        payload = json.loads(outcome.output)
        first_error = payload["errors"][0]
        assert first_error["error_code"] == "FILE_NOT_TRACKED"

    def test_untracked_content_not_leaked(self, repo: pathlib.Path) -> None:
        """The untracked file's contents never appear in the command output."""
        untracked = repo / "untracked.py"
        untracked.write_text("secret = 'xyzzy'\n")
        argv = ["code", "cat", "untracked.py::secret", "--json"]
        outcome = runner.invoke(cli, argv)
        assert "xyzzy" not in outcome.output

    def test_tracked_file_still_works(self, repo: pathlib.Path) -> None:
        """A tracked file is still readable."""
        outcome = runner.invoke(cli, ["code", "cat", "billing.py::validate_amount"])
        assert outcome.exit_code == 0
        assert "validate_amount" in outcome.output
803
804
class TestStagedFileReads:
    """muse code cat must read staged-but-uncommitted files from the object store."""

    def test_staged_symbol_readable(self, repo: pathlib.Path) -> None:
        """A function in a staged, uncommitted file can be printed."""
        _stage_py(repo, "new_mod.py", _STAGED_SRC)
        (repo / "new_mod.py").write_bytes(_STAGED_SRC)
        outcome = runner.invoke(cli, ["code", "cat", "new_mod.py::staged_fn"])
        assert outcome.exit_code == 0
        assert "staged_fn" in outcome.output

    def test_staged_all_mode_readable(self, repo: pathlib.Path) -> None:
        """``--all`` lists both the function and the class of the staged file."""
        _stage_py(repo, "new_mod.py", _STAGED_SRC)
        (repo / "new_mod.py").write_bytes(_STAGED_SRC)
        outcome = runner.invoke(cli, ["code", "cat", "--all", "new_mod.py"])
        assert outcome.exit_code == 0
        for symbol in ("staged_fn", "StagedClass"):
            assert symbol in outcome.output

    def test_staged_json_schema(self, repo: pathlib.Path) -> None:
        """JSON output for a staged symbol has exactly one result naming it."""
        _stage_py(repo, "new_mod.py", _STAGED_SRC)
        (repo / "new_mod.py").write_bytes(_STAGED_SRC)
        argv = ["code", "cat", "new_mod.py::staged_fn", "--json"]
        outcome = runner.invoke(cli, argv)
        assert outcome.exit_code == 0
        payload = json.loads(outcome.output)
        assert len(payload["results"]) == 1
        assert payload["results"][0]["symbol"] == "staged_fn"

    def test_staged_blob_deleted_from_disk_still_readable(
        self, repo: pathlib.Path
    ) -> None:
        """Staged blob readable even after the file is removed from disk."""
        on_disk = repo / "new_mod.py"
        on_disk.write_bytes(_STAGED_SRC)
        _stage_py(repo, "new_mod.py", _STAGED_SRC)
        on_disk.unlink()
        outcome = runner.invoke(cli, ["code", "cat", "new_mod.py::staged_fn"])
        assert outcome.exit_code == 0
        assert "staged_fn" in outcome.output

    def test_staged_content_matches_blob(self, repo: pathlib.Path) -> None:
        """The returned source contains the staged function body."""
        _stage_py(repo, "new_mod.py", _STAGED_SRC)
        (repo / "new_mod.py").write_bytes(_STAGED_SRC)
        argv = ["code", "cat", "new_mod.py::staged_fn", "--json"]
        outcome = runner.invoke(cli, argv)
        payload = json.loads(outcome.output)
        assert "x * 2" in payload["results"][0]["source"]
File History 4 commits
sha256:81ae324db5ad375fbfe4834c6fcb378312cafad3cc92dec5d3e5c427306621a2 fix: remove commit_exists filter from have anchors — server… Sonnet 4.6 patch 20 days ago
sha256:36c3cb3e76619d4c30a6d9bf81b5ec4ff148e30dcfed913e3114ca7b43b81c7e fix: rename objects→blobs in push client and all stale test… Sonnet 4.6 patch 22 days ago
sha256:c06a9b9b9fee26c68ea725b44d54b2c0a171301ce9de746d5b656617b4463a9a fix: repair four test failures from post-migration audit Sonnet 4.6 patch 28 days ago
sha256:1900655993c83c4107067375548a7be823e471d2515830842f1a12cba4bd3cdf fix: unified object store migration — idempotent writes, JS… Sonnet 4.6 minor 29 days ago