gabriel / musehub public

typing_audit.py file-level

at sha256:7 · View file ↗ · Intel ↗

History
1 files
1 commits
0 hotspots
0 🧊 dead
0 💥 blast risk
sha256:0 fix: fall back to any indexed mpack in read_object_bytes when push mpac… · gabriel · Jun 17, 2026
"""Typing audit — zero-tolerance type-safety enforcement for mission-critical code.

Every banned pattern maps to a future Rust port liability: if Python cannot
name a type, ``rustc`` cannot either. The ratchet keeps the rule enforced
continuously so violations never accumulate.

Patterns checked
----------------
*Any-as-type* — ``dict[str, Any]``, ``list[Any]``, ``type[Any]``,
    ``Any | X``, ``X | Any``, ``Mapping[str, Any]``, etc.

*object-as-type* — same severity as Any; erases all structural information.

*cast()* — all usage banned; it conceals a broken callee return type.

*# type: ignore* — every suppressed error is an unaudited assumption.

*Bare collections* — ``list``, ``dict``, ``set``, ``tuple`` without ``[T]``.

*Optional[X]* and *Union[X, Y]* — use ``X | None`` and ``X | Y`` (PEP 604).

*Legacy typing imports* — ``List``, ``Dict``, ``Set``, ``Tuple``.

*Bare Callable / Callable returning Any* — must carry a full signature.

*Untyped varargs* — ``*args: Any``, ``**kwargs: Any``, and unannotated
    ``*args`` / ``**kwargs`` (annotation absent entirely).

*Untyped function definitions* — missing return or parameter annotation.

*Unconstrained TypeVar* — ``TypeVar(...)`` with no ``bound=`` and no
    constraint arguments; behaves identically to ``Any`` in practice.

*Naked dict at boundary* — ``dict[str, X]`` as a parameter or return type
    is banned at function/method boundaries. Every dict with known keys must
    be a ``TypedDict``; every dict with dynamic keys must justify its key space.
    The only valid ``dict[str, ...]`` at a boundary is an explicitly named
    ``TypedDict`` subclass. This rule exists because ``rustc`` cannot infer
    struct fields from a ``HashMap<String, X>`` — named fields must be declared.
    Pattern ``boundary_dict`` fires on ``: dict[str,`` and ``-> dict[str,``.

*Anonymous dict in collection* — ``list[dict[str, X]]``, ``dict[str, dict[str, X]]``,
    ``tuple[dict[str, X], ...]``. An anonymous dict nested inside a collection is
    always a named struct waiting to be declared. Use a ``TypedDict`` subclass or a
    named type alias (e.g. ``list[JSONObject]``, ``list[SymbolHistoryEntry]``).

    Named type aliases do NOT trigger this rule — only the literal expansion does.
    This is by design: ``list[JSONObject]`` is fine; ``list[dict[str, JSONValue]]`` is not.
    Rust requires every struct field to be named; ``Vec<HashMap<String, Value>>`` is
    never the right answer when ``Vec<SymbolEntry>`` is possible.

    ``concrete_dict_in_list`` — fires on ``list[dict[str,``, ``tuple[dict[str,``,
    ``set[dict[str,``
    ``dict_of_dict`` — fires on ``dict[str, dict[str,``

Usage::

    python tools/typing_audit.py # musehub/ + tests/
    python tools/typing_audit.py --dirs musehub/ tests/
    python tools/typing_audit.py --dirs musehub/ --max-any 0 --max-untyped 0
    python tools/typing_audit.py --json artifacts/typing_audit.json
"""
63
64 from __future__ import annotations
65
66 import argparse
67 import ast
68 import io
69 import json
70 import operator
71 import re
72 import sys
73 import tokenize
74 from collections import defaultdict
75 from pathlib import Path
76 from typing import TypedDict
77
78 # ---------------------------------------------------------------------------
79 # Type aliases β€” avoid dict[str, X] at function/class-field boundaries.
80 # ---------------------------------------------------------------------------
81
82 type PatternCounts = dict[str, int]
83 type PatternLines = dict[str, list[int]]
84 type PatternMap = dict[str, re.Pattern[str]]
85 type PerFileViolations = dict[str, PatternCounts]
86
87
class Violation(TypedDict):
    """One pattern hit, pinned to a single source location."""

    # Path of the file in which the pattern matched.
    file: str
    # 1-based line number of the match.
    line: int
    # Identifier of the pattern that matched.
    kind: str
94
# ---------------------------------------------------------------------------
# Data shapes — TypedDicts replace every dict[str, Any] in the old script.
# All shapes mirror the Rust struct that will eventually own them.
# ---------------------------------------------------------------------------


class UntypedDef(TypedDict):
    """Record of one definition that lacks a required type annotation.

    The ``issue`` field takes one of these values:

    - ``"missing_return_type"``   — the function has no return annotation.
    - ``"missing_param_type"``    — a parameter other than ``self``/``cls``
      has no annotation.
    - ``"untyped_args"``          — ``*args`` is annotated ``Any`` or is not
      annotated at all.
    - ``"untyped_kwargs"``        — ``**kwargs`` is annotated ``Any`` or is
      not annotated at all.
    - ``"unconstrained_typevar"`` — a ``TypeVar`` has neither ``bound=`` nor
      positional constraints.
    """

    # Path of the file containing the definition.
    file: str
    # Line number of the offending definition or parameter.
    line: int
    # Function name, ``function.parameter``, or TypeVar target name.
    name: str
    # One of the issue identifiers listed in the class docstring.
    issue: str
120
121
class FileResult(TypedDict):
    """Everything the scanner found in one Python source file."""

    # Path of the scanned file, as a string.
    file: str
    # Whether the file imports ``Any`` from ``typing``/``typing_extensions``.
    imports_any: bool
    # Match count for each pattern that fired at least once.
    patterns: PatternCounts
    # Line numbers on which each pattern fired.
    pattern_lines: PatternLines
    # Count of ``# type: ignore`` comments, keyed by variant label.
    type_ignore_variants: PatternCounts
    # Annotation problems found by the AST walk.
    untyped_defs: list[UntypedDef]
131
132
class Offender(TypedDict):
    """One file that has violations, carrying the total used for ranking."""

    # Path of the offending file.
    file: str
    # Sum of all pattern match counts in the file.
    total: int
    # Per-pattern breakdown of that total.
    patterns: PatternCounts
139
140
class ReportSummary(TypedDict):
    """Headline totals covering the whole scan."""

    # Number of files that were scanned successfully.
    total_files_scanned: int
    # Number of scanned files that import ``Any``.
    files_importing_any: int
    # Sum of the match counts of every pattern (not only the Any patterns).
    total_any_patterns: int
    # Number of untyped-definition records.
    untyped_defs: int
148
149
class Report(TypedDict):
    """Complete audit result, as built by :func:`generate_report`."""

    # Headline totals.
    summary: ReportSummary
    # Match count per pattern, summed over all files.
    pattern_totals: PatternCounts
    # ``# type: ignore`` variant counts, summed over all files.
    type_ignore_variants: PatternCounts
    # Every file with at least one violation, highest total first.
    top_offenders: list[Offender]
    # Per-pattern counts for each file that has violations.
    per_file: PerFileViolations
    # One entry per (file, line, pattern) hit, sorted by file then line.
    violations: list[Violation]
    # Every untyped-definition record from every file.
    untyped_defs: list[UntypedDef]
160
161
162 # ---------------------------------------------------------------------------
163 # String-literal masking
164 # ---------------------------------------------------------------------------
165
166
def _mask_string_literals(source: str) -> str:
    """Replace string-literal content with spaces, preserving newlines.

    Pattern matching runs on the masked source so that raw regex strings,
    docstrings, and string constants never produce false positives. The
    result has exactly the same length as *source* and every newline stays
    where it was, so line numbers and column offsets remain valid.

    Masking is best effort: if the tokenizer rejects the source part-way
    through (unterminated string, inconsistent dedent, ...), the string
    tokens seen before the error stay masked and the rest of the text is
    returned as-is, so the caller still gets output for the file.

    Args:
        source: Full source text of a Python file.

    Returns:
        A copy of *source* with the content of every string token replaced
        by space characters (newlines within multi-line strings preserved).
    """
    chars = list(source)

    # Tokenizer rows count "\n"-terminated lines, because that is what
    # ``StringIO.readline`` hands to it. ``str.splitlines`` would also break
    # on form feeds, "\x1c", "\u2028" and similar, which shifts every later
    # row and makes the mask land on the wrong characters.
    line_starts: list[int] = [0]
    for physical_line in source.split("\n"):
        line_starts.append(line_starts[-1] + len(physical_line) + 1)

    # Token types that carry literal string content. The *_MIDDLE types only
    # exist on newer interpreters, hence the getattr probe.
    string_types = {tokenize.STRING}
    for optional_name in ("FSTRING_MIDDLE", "TSTRING_MIDDLE"):
        optional_type = getattr(tokenize, optional_name, None)
        if optional_type is not None:
            string_types.add(optional_type)

    try:
        for tok in tokenize.generate_tokens(io.StringIO(source).readline):
            if tok.type not in string_types:
                continue
            start = line_starts[tok.start[0] - 1] + tok.start[1]
            end = line_starts[tok.end[0] - 1] + tok.end[1]
            # Clamp defensively so an odd end position can never index past
            # the buffer.
            for i in range(start, min(end, len(chars))):
                if chars[i] not in "\r\n":
                    chars[i] = " "
    except (tokenize.TokenError, SyntaxError):
        # SyntaxError covers IndentationError, which the tokenizer raises
        # directly for an inconsistent dedent. Keep whatever was masked.
        pass

    return "".join(chars)
217
218
219 # ---------------------------------------------------------------------------
220 # Pattern registry
221 # ---------------------------------------------------------------------------
222
#: Registry of every pattern that counts toward the violation total.
#: Keys are stable identifiers used in JSON output and tests.
#:
#: NOTE: do NOT use re.IGNORECASE — Python type annotations are case-sensitive.
#: ``List`` and ``list`` are distinct identifiers; matching ``list[any]``
#: (where ``any`` is the built-in function) would be a false positive.
_PATTERNS: PatternMap = {
    name: re.compile(expression)
    for name, expression in (
        # Any-as-type ─────────────────────────────────────────────────────
        ("dict_str_any", r"\bdict\[str,\s*Any\]|\bDict\[str,\s*Any\]"),
        ("list_any", r"\blist\[Any\]|\bList\[Any\]"),
        ("type_any", r"\btype\[Any\]"),
        ("any_in_union", r"\bAny\s*\||\|\s*Any\b"),
        ("return_any", r"->\s*Any\b"),
        ("param_any", r":\s*Any\b"),
        ("mapping_any", r"\bMapping\[str,\s*Any\]"),
        ("optional_any", r"\bOptional\[Any\]"),
        ("sequence_any", r"\bSequence\[Any\]|\bIterable\[Any\]"),
        (
            "tuple_any",
            r"\btuple\[[^\n]*Any[^\n]*\]|\bTuple\[[^\n]*Any[^\n]*\]",
        ),
        # object-as-type ──────────────────────────────────────────────────
        ("param_object", r":\s*object\b"),
        ("return_object", r"->\s*object\b"),
        # collection_object copes with one level of bracket nesting, e.g.
        # dict[str, list[object]].
        # NOTE: Mapping is deliberately left out — Mapping[str, object] is
        # the correct type for read-only, covariant mappings at framework
        # boundaries (e.g. Jinja2 template contexts). Mapping[str, Any] is
        # caught separately by mapping_any. Only mutable collection types
        # need this guard.
        (
            "collection_object",
            r"\b(?:dict|list|set|tuple|Sequence)"
            r"\[[^\n\[\]]*(?:\[[^\n\[\]]*\][^\n\[\]]*)*\bobject\b",
        ),
        # cast() — banned ─────────────────────────────────────────────────
        ("cast_usage", r"(?<![.\w])cast\("),
        # type: ignore — only blanket suppressions (no error code) are
        # flagged. ``# type: ignore[some-code]`` is acceptable when the exact
        # issue is known; a bare ``# type: ignore`` is a blind suppression
        # and always banned.
        ("type_ignore", r"#\s*type:\s*ignore(?!\s*\[)"),
        # Bare collections (no type parameters) ───────────────────────────
        # The negative lookaheads exclude parameterised forms and prose.
        ("bare_list", r"(?::\s*|->\s*)list\b(?!\[|\(|\s+[a-z])"),
        ("bare_dict", r"(?::\s*|->\s*)dict\b(?!\[|\(|\s+[a-z])"),
        ("bare_set", r"(?::\s*|->\s*)set\b(?!\[|\(|\s+[a-z])"),
        ("bare_tuple", r"(?::\s*|->\s*)tuple\b(?!\[|\(|\s+[a-z])"),
        # Optional[X] — use X | None (PEP 604) ────────────────────────────
        ("optional_usage", r"\bOptional\[(?!Any\b)"),
        # Union[X, Y] — use X | Y (PEP 604) ───────────────────────────────
        ("union_usage", r"\bUnion\["),
        # Legacy typing imports (use lowercase builtins) ──────────────────
        ("legacy_List", r"\bList\["),
        ("legacy_Dict", r"\bDict\["),
        ("legacy_Set", r"\bSet\["),
        ("legacy_Tuple", r"\bTuple\["),
        # Callable — must carry a full signature ──────────────────────────
        ("bare_callable", r"(?::\s*|->\s*)Callable\b(?!\[)"),
        ("callable_any", r"\bCallable\[[^\n]*,\s*Any\s*\]"),
        # Untyped varargs — *args: Any / **kwargs: Any ────────────────────
        # Unannotated *args/**kwargs are caught by the AST walker instead.
        ("varargs_any", r"\*{1,2}\w+:\s*Any\b"),
        # Naked dict at boundary — dict[str, X] as a parameter or return
        # type is banned; every structured boundary must use a TypedDict
        # (or dataclass/enum). Matches ": dict[str," and "-> dict[str,",
        # the two annotation positions.
        #
        # APPROVED alternatives at boundaries:
        #   - ReadOnlyJSONObject (= Mapping[str, JSONValue]) for read-only
        #     JSON params
        #   - A named TypedDict subclass for any dict with statically known
        #     keys
        #
        # Mapping[str, JSONValue] is covariant, so any dict[str, T] with
        # T ⊆ JSONValue is assignable to it. boundary_dict does NOT fire on
        # Mapping[...]; mapping_any does NOT fire on Mapping[str, JSONValue].
        # Therefore Mapping[str, JSONValue] is the safe boundary form for
        # JSON dicts.
        ("boundary_dict", r"(?::\s*|->\s*)dict\[str\s*,"),
        # Anonymous dict in collection — list[dict[str, X]] and
        # dict[str, dict[str, X]]. A dict nested inside a collection is
        # always a named struct opportunity: use a TypedDict subclass or a
        # named type alias (e.g. list[JSONObject]). Named aliases do NOT
        # trigger this rule — only the literal expansion does. This is
        # intentional: list[JSONObject] is fine; list[dict[str, JSONValue]]
        # is not.
        ("concrete_dict_in_list", r"\b(?:list|tuple|set)\[dict\[str,"),
        ("dict_of_dict", r"\bdict\[str,\s*dict\[str,"),
    )
}
305
# Human-readable report sections, in display order. Each entry pairs a
# section heading with the pattern identifiers listed under it.
_CATEGORY_ORDER: list[tuple[str, list[str]]] = [
    (
        "Any-as-type",
        [
            "dict_str_any",
            "list_any",
            "type_any",
            "any_in_union",
            "return_any",
            "param_any",
            "mapping_any",
            "optional_any",
            "sequence_any",
            "tuple_any",
        ],
    ),
    (
        "object-as-type",
        ["param_object", "return_object", "collection_object"],
    ),
    ("cast() usage", ["cast_usage"]),
    ("type: ignore", ["type_ignore"]),
    (
        "Bare collections",
        ["bare_list", "bare_dict", "bare_set", "bare_tuple"],
    ),
    ("Optional (use X | None)", ["optional_usage"]),
    ("Union (use X | Y)", ["union_usage"]),
    (
        "Legacy typing imports",
        ["legacy_List", "legacy_Dict", "legacy_Set", "legacy_Tuple"],
    ),
    (
        "Callable (must carry full signature)",
        ["bare_callable", "callable_any"],
    ),
    ("Untyped varargs", ["varargs_any"]),
    ("Naked dict at boundary (use TypedDict)", ["boundary_dict"]),
    (
        "Anonymous dict in collection (use TypedDict or named alias)",
        ["concrete_dict_in_list", "dict_of_dict"],
    ),
]
327
# Directory names that never hold project source code; any path passing
# through one of them is left out of the scan.
_SKIP_DIRS: frozenset[str] = frozenset(
    (
        # virtual environments
        "venv",
        ".venv",
        "env",
        ".env",
        # bytecode cache
        "__pycache__",
        # VCS and tool metadata
        ".git",
        ".muse",
        ".mypy_cache",
        ".ruff_cache",
        ".pytest_cache",
        ".tox",
        # build artefacts and installed packages
        "dist",
        "build",
        "site-packages",
        "__pypackages__",
    )
)
335
336
337 # ---------------------------------------------------------------------------
338 # Pattern helpers
339 # ---------------------------------------------------------------------------
340
341
def _count_pattern_in_line(line: str, pattern: re.Pattern[str]) -> int:
    """Count how many non-overlapping times *pattern* matches inside *line*."""
    hits = 0
    for _match in pattern.finditer(line):
        hits += 1
    return hits
345
346
def _imports_any(source: str) -> bool:
    """Return ``True`` if the source imports ``Any`` from ``typing``
    or ``typing_extensions``.

    The check walks the parsed AST, so parenthesised multi-line imports and
    backslash continuations are recognised, while commented-out imports,
    trailing comments, and import-looking text inside string literals are
    not. Relative imports (``from .typing import Any``) do not count.

    When the source cannot be parsed, a single-line regular expression is
    used instead so that a broken file still gets a best-effort answer.

    Args:
        source: Full source text of a Python file.

    Returns:
        ``True`` when an import of ``Any`` from one of the two modules is
        found, ``False`` otherwise.
    """
    try:
        tree = ast.parse(source)
    except (SyntaxError, ValueError):
        # Unparseable source: fall back to line-based matching. The leading
        # ``^[ \t]*from`` keeps commented-out imports from matching.
        return bool(re.search(
            r"^[ \t]*from\s+typing(?:_extensions)?\s+import\s+.*\bAny\b",
            source,
            re.MULTILINE,
        ))

    return any(
        isinstance(node, ast.ImportFrom)
        and node.level == 0  # absolute import only
        and node.module in {"typing", "typing_extensions"}
        and any(alias.name == "Any" for alias in node.names)
        for node in ast.walk(tree)
    )
359
360
def _classify_type_ignore(line: str) -> str:
    """Label the flavour of ``# type: ignore`` comment found on *line*.

    Args:
        line: A single source line that contains ``# type: ignore``.

    Returns:
        ``"type_ignore[<code>]"`` when the comment names an error code in
        square brackets, otherwise ``"type_ignore[blanket]"``.
    """
    coded = re.search(r"#\s*type:\s*ignore\[([^\]]+)\]", line)
    if coded is None:
        return "type_ignore[blanket]"
    return "type_ignore[" + coded.group(1) + "]"
377
378
379 # ---------------------------------------------------------------------------
380 # AST-based detection
381 # ---------------------------------------------------------------------------
382
383
384 def _is_any_annotation(node: ast.expr | None) -> bool:
385 """Return ``True`` if *node* is the bare ``Any`` name."""
386 return isinstance(node, ast.Name) and node.id == "Any"
387
388
def _find_untyped_defs(source: str, filepath: str) -> list[UntypedDef]:
    """Collect every annotation gap the AST reveals in *source*.

    For each ``def`` / ``async def`` the walk reports:

    - a missing return annotation;
    - each parameter without an annotation (``self`` and ``cls`` are exempt);
    - ``*args`` that is unannotated or annotated as ``Any``;
    - ``**kwargs`` that is unannotated or annotated as ``Any``.

    Unconstrained ``TypeVar(...)`` assignments are appended after the
    function findings.

    Parameter findings carry the parameter's own line number rather than
    the line of the ``def``, so the report points straight at the culprit.

    Args:
        source: Full source text of the file.
        filepath: Path string copied into every returned record.

    Returns:
        One :class:`UntypedDef` per finding; an empty list when the source
        does not parse.
    """
    try:
        module = ast.parse(source)
    except SyntaxError:
        return []

    found: list[UntypedDef] = []

    def record(line: int, name: str, issue: str) -> None:
        found.append(UntypedDef(file=filepath, line=line, name=name, issue=issue))

    for fn in ast.walk(module):
        if not isinstance(fn, (ast.FunctionDef, ast.AsyncFunctionDef)):
            continue

        if fn.returns is None:
            record(fn.lineno, fn.name, "missing_return_type")

        signature = fn.args
        named_params = [
            *signature.args,
            *signature.posonlyargs,
            *signature.kwonlyargs,
        ]
        for param in named_params:
            if param.arg not in {"self", "cls"} and param.annotation is None:
                record(param.lineno, f"{fn.name}.{param.arg}", "missing_param_type")

        # *args first, then **kwargs; both follow the same rule.
        variadics = (
            ("*", signature.vararg, "untyped_args"),
            ("**", signature.kwarg, "untyped_kwargs"),
        )
        for stars, param, issue in variadics:
            if param is None:
                continue
            if param.annotation is None or _is_any_annotation(param.annotation):
                record(param.lineno, f"{fn.name}.{stars}{param.arg}", issue)

    # A TypeVar without constraints or bound behaves identically to Any.
    found.extend(_find_unconstrained_typevars(module, filepath))
    return found
471
472
def _find_unconstrained_typevars(tree: ast.Module, filepath: str) -> list[UntypedDef]:
    """Return a record for every ``TypeVar(...)`` with no bound or constraints.

    A bare ``T = TypeVar("T")`` is semantically equivalent to ``T: Any``.
    The Rust port requires every generic to carry an explicit trait bound.

    Only plain assignments are inspected. The callee may be written as the
    bare name (``TypeVar(...)``) or module-qualified
    (``typing.TypeVar(...)``); both spellings are checked.

    Args:
        tree: Parsed AST of the file.
        filepath: Path string used in the returned records.

    Returns:
        A list of :class:`UntypedDef` records for unconstrained ``TypeVar``
        definitions.
    """
    results: list[UntypedDef] = []
    for node in ast.walk(tree):
        if not isinstance(node, ast.Assign):
            continue
        call = node.value
        if not isinstance(call, ast.Call):
            continue

        callee = call.func
        is_typevar_call = (
            (isinstance(callee, ast.Name) and callee.id == "TypeVar")
            or (isinstance(callee, ast.Attribute) and callee.attr == "TypeVar")
        )
        if not is_typevar_call:
            continue

        # Constrained means: positional arguments beyond the name string
        # (constraint types), or a ``bound=`` keyword.
        has_constraints = len(call.args) > 1
        has_bound = any(kw.arg == "bound" for kw in call.keywords)
        if has_constraints or has_bound:
            continue

        target = node.targets[0]
        target_name = target.id if isinstance(target, ast.Name) else "<TypeVar>"
        results.append(UntypedDef(
            file=filepath,
            line=node.lineno,
            name=target_name,
            issue="unconstrained_typevar",
        ))
    return results
518
519
520 # ---------------------------------------------------------------------------
521 # File and directory scanner
522 # ---------------------------------------------------------------------------
523
524
def scan_file(filepath: Path) -> FileResult | None:
    """Scan a single Python file and return its violation summary.

    String literals are masked before pattern matching so that raw regex
    strings and docstring prose never produce false positives. Comments are
    not string tokens and stay visible, so ``# type: ignore`` on a code line
    is still detected; its variant is classified from the original line.

    Lines are split on ``"\\n"`` only, which is how the tokenizer and the
    AST count them, so reported line numbers stay correct in files that
    contain form feeds or other characters ``str.splitlines`` treats as
    line breaks.

    Known limitation: the ``boundary_dict`` check looks for ``def`` on the
    same line, so a ``: dict[str, ...]`` parameter on a continuation line of
    a multi-line signature is treated as a local variable and skipped.

    Args:
        filepath: Absolute or relative path to the Python file.

    Returns:
        A :class:`FileResult` on success, ``None`` when the file cannot be
        read (I/O or encoding error).
    """
    try:
        source = filepath.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError):
        return None

    masked = _mask_string_literals(source)

    # Masking never touches "\n", so both lists always have the same length.
    original_lines = source.split("\n")
    masked_lines = masked.split("\n")

    patterns: defaultdict[str, int] = defaultdict(int)
    pattern_lines: defaultdict[str, list[int]] = defaultdict(list)
    type_ignore_variants: defaultdict[str, int] = defaultdict(int)

    for lineno, (orig_line, masked_line) in enumerate(
        zip(original_lines, masked_lines), 1
    ):
        stripped = masked_line.strip()
        if not stripped or stripped.startswith("#"):
            continue

        # Both facts depend only on the line, so compute them once instead
        # of once per pattern.
        #
        # Dunder methods legitimately use `: object` (e.g.
        # `__eq__(self, other: object)`), so the object patterns are not
        # applied to their signatures.
        is_dunder_def = re.search(r"def\s+__\w+__\s*\(", masked_line) is not None
        # `: dict[str,` on a line without `def` is a local variable
        # annotation, not a function boundary.
        is_local_dict_annotation = (
            ": dict[str," in masked_line
            and re.search(r"\bdef\b", masked_line) is None
        )

        for name, pattern in _PATTERNS.items():
            if is_dunder_def and name in {"param_object", "return_object"}:
                continue
            if name == "boundary_dict" and is_local_dict_annotation:
                continue

            count = _count_pattern_in_line(masked_line, pattern)
            if count > 0:
                patterns[name] += count
                pattern_lines[name].append(lineno)

                if name == "type_ignore":
                    # Classify against the original line so blanket ignores
                    # can be told apart from code-specific ones.
                    variant = _classify_type_ignore(orig_line)
                    type_ignore_variants[variant] += 1

    return FileResult(
        file=str(filepath),
        imports_any=_imports_any(source),
        patterns=dict(patterns),
        pattern_lines=dict(pattern_lines),
        type_ignore_variants=dict(type_ignore_variants),
        untyped_defs=_find_untyped_defs(source, str(filepath)),
    )
600
601
def scan_directory(directory: Path) -> list[FileResult]:
    """Recursively scan all Python files in *directory*.

    Skips virtual environments, caches, build artefacts, and VCS/tool
    metadata directories (see ``_SKIP_DIRS``). Only path components *below*
    *directory* are tested against the skip list, so a scan root that itself
    lives under a directory named e.g. ``build`` or ``env`` is still scanned.

    Args:
        directory: Root of the directory tree to scan.

    Returns:
        A list of :class:`FileResult` objects, one per successfully scanned
        file, in sorted path order. Unreadable files are left out.
    """
    results: list[FileResult] = []
    for py_file in sorted(directory.rglob("*.py")):
        # rglob results always live under `directory`, so relative_to is safe.
        if _SKIP_DIRS.intersection(py_file.relative_to(directory).parts):
            continue
        file_result = scan_file(py_file)
        if file_result is not None:
            results.append(file_result)
    return results
622
623
624 # ---------------------------------------------------------------------------
625 # Report generation
626 # ---------------------------------------------------------------------------
627
628
def _offender_sort_key(entry: Offender) -> int:
    """Sort key for an :class:`Offender`: its total violation count."""
    total: int = entry["total"]
    return total
632
633
def generate_report(results: list[FileResult]) -> Report:
    """Fold per-file scan results into one :class:`Report`.

    Args:
        results: :class:`FileResult` objects as produced by
            :func:`scan_file` or :func:`scan_directory`.

    Returns:
        A :class:`Report` ready for human display or JSON serialisation.
        Violations are sorted by file then line; offenders are sorted by
        descending total.
    """
    pattern_totals: defaultdict[str, int] = defaultdict(int)
    ignore_variants: defaultdict[str, int] = defaultdict(int)
    per_file: PerFileViolations = {}
    offenders: list[Offender] = []
    violations: list[Violation] = []
    untyped: list[UntypedDef] = []
    any_importers = 0

    for result in results:
        path = result["file"]
        if result["imports_any"]:
            any_importers += 1

        counts: PatternCounts = dict(result["patterns"])
        for kind, count in counts.items():
            pattern_totals[kind] += count
            violations.extend(
                Violation(file=path, line=hit_line, kind=kind)
                for hit_line in result["pattern_lines"].get(kind, [])
            )

        file_total = sum(counts.values())
        if file_total > 0:
            per_file[path] = counts
            offenders.append(Offender(file=path, total=file_total, patterns=counts))

        for variant, count in result["type_ignore_variants"].items():
            ignore_variants[variant] += count

        untyped.extend(result["untyped_defs"])

    violations.sort(key=operator.itemgetter("file", "line"))
    # The sort is stable, so files with equal totals keep their scan order.
    offenders.sort(key=operator.itemgetter("total"), reverse=True)

    summary = ReportSummary(
        total_files_scanned=len(results),
        files_importing_any=any_importers,
        total_any_patterns=sum(pattern_totals.values()),
        untyped_defs=len(untyped),
    )
    return Report(
        summary=summary,
        pattern_totals=dict(pattern_totals),
        type_ignore_variants=dict(ignore_variants),
        # Every offender is stored; the human-readable printer applies its
        # own display cap.
        top_offenders=offenders,
        per_file=per_file,
        violations=violations,
        # The full list is stored so --json consumers see every record.
        untyped_defs=untyped,
    )
699
700
701 # ---------------------------------------------------------------------------
702 # Human-readable report printer
703 # ---------------------------------------------------------------------------
704
705
def print_human_summary(report: Report, top_n: int = 15) -> None:
    """Print a formatted, human-readable summary of *report* to stdout.

    The output consists of the headline counts, a per-category pattern
    breakdown (categories with no hits are omitted), the ``# type: ignore``
    variants ordered by descending count, every violation as
    ``file:line [kind]``, and the first *top_n* offenders.

    Args:
        report: A :class:`Report` produced by :func:`generate_report`.
        top_n: How many offenders to display in the top-offenders list.
    """
    s = report["summary"]
    totals = report["pattern_totals"]
    rule = "=" * 70

    print("\n" + rule)
    print(" TYPING AUDIT — Violation Report")
    print(rule)
    print(f" Files scanned: {s['total_files_scanned']}")
    print(f" Files importing Any: {s['files_importing_any']}")
    print(f" Total violations: {s['total_any_patterns']}")
    print(f" Untyped defs: {s['untyped_defs']}")
    print()

    has_violations = False
    for category, pattern_names in _CATEGORY_ORDER:
        category_total = sum(totals.get(p, 0) for p in pattern_names)
        if category_total == 0:
            continue
        has_violations = True
        print(f" {category}:")
        for p in pattern_names:
            count = totals.get(p, 0)
            if count > 0:
                print(f" {p:38s} {count:5d}")
        print()

    if not has_violations:
        print(" Pattern breakdown: (none)")
        print()

    if report["type_ignore_variants"]:
        print(" # type: ignore variants:")
        for variant, count in sorted(
            report["type_ignore_variants"].items(),
            key=operator.itemgetter(1),
            reverse=True,
        ):
            print(f" {variant:44s} {count:5d}")
        print()

    if report["violations"]:
        print(" Violations (file:line [kind]):")
        for v in report["violations"]:
            print(f" {v['file']}:{v['line']} [{v['kind']}]")
        print()

    print(f" Top {top_n} offenders:")
    for entry in report["top_offenders"][:top_n]:
        print(f" {entry['total']:4d} {entry['file']}")
    print(rule + "\n")
762
763
764 # ---------------------------------------------------------------------------
765 # CLI
766 # ---------------------------------------------------------------------------
767
768
def main() -> None:
    """Entry point: parse CLI flags, run the scan, and enforce the ratchet.

    Scans the specified directories (or individual ``.py`` files), prints a
    human summary, optionally writes a JSON report, and exits with status 1
    when either the pattern violation count exceeds ``--max-any`` or the
    untyped-def count exceeds ``--max-untyped``. Paths that are missing, or
    that are neither a ``.py`` file nor a directory, are reported on stderr
    and skipped.
    """
    parser = argparse.ArgumentParser(
        description=(
            "Audit typing violations: Any, object, cast, bare collections, "
            "Optional/Union (legacy), Callable without signature, untyped "
            "varargs, type: ignore, untyped defs, unconstrained TypeVars."
        ),
    )
    parser.add_argument(
        "--dirs",
        nargs="+",
        default=["muse/", "tests/"],
        help="Directories or individual .py files to scan. Default: muse/ tests/",
    )
    parser.add_argument(
        "--json",
        type=str,
        metavar="PATH",
        help="Write the JSON report to PATH.",
    )
    parser.add_argument(
        "--max-any",
        type=int,
        default=None,
        metavar="N",
        help="Exit non-zero if total pattern violations exceed N (ratchet mode).",
    )
    parser.add_argument(
        "--max-untyped",
        type=int,
        default=None,
        metavar="N",
        help="Exit non-zero if total untyped-def count exceeds N (ratchet mode).",
    )
    parser.add_argument(
        "--top-n",
        type=int,
        default=15,
        metavar="N",
        help="Number of offenders to display in the human summary. Default: 15.",
    )
    args = parser.parse_args()

    all_results: list[FileResult] = []
    for d in args.dirs:
        p = Path(d)
        if p.is_file() and p.suffix == ".py":
            result = scan_file(p)
            if result is not None:
                all_results.append(result)
        elif p.is_dir():
            all_results.extend(scan_directory(p))
        else:
            # Say why the path was skipped: an existing non-.py file is not
            # the same problem as a path that is missing altogether.
            reason = (
                "is not a .py file or directory"
                if p.exists()
                else "does not exist"
            )
            print(f"WARNING: {d} {reason}, skipping", file=sys.stderr)

    report = generate_report(all_results)
    print_human_summary(report, top_n=args.top_n)

    if args.json:
        out = Path(args.json)
        out.parent.mkdir(parents=True, exist_ok=True)
        out.write_text(json.dumps(report, indent=2), encoding="utf-8")
        print(f" JSON report written to {args.json}")

    failed = False

    if args.max_any is not None:
        total = report["summary"]["total_any_patterns"]
        if total > args.max_any:
            print(
                f"\n❌ RATCHET FAILED (patterns): {total} violations exceed "
                f"threshold of {args.max_any}",
                file=sys.stderr,
            )
            failed = True
        else:
            print(
                f"\n✅ RATCHET OK (patterns): {total} violations within "
                f"threshold of {args.max_any}",
            )

    if args.max_untyped is not None:
        untyped = report["summary"]["untyped_defs"]
        if untyped > args.max_untyped:
            print(
                f"\n❌ RATCHET FAILED (untyped defs): {untyped} exceed "
                f"threshold of {args.max_untyped}",
                file=sys.stderr,
            )
            failed = True
        else:
            print(
                f"\n✅ RATCHET OK (untyped defs): {untyped} within "
                f"threshold of {args.max_untyped}",
            )

    if failed:
        sys.exit(1)


if __name__ == "__main__":
    main()