typing_audit.py
file-level
1
files
1
commits
0
hotspots
0
π§ dead
0
π₯ blast risk
"""Typing audit — zero-tolerance type-safety enforcement for mission-critical code.

Every banned pattern maps to a future Rust port liability: if Python cannot
name a type, ``rustc`` cannot either. The ratchet keeps the rule enforced
continuously so violations never accumulate.

Patterns checked
----------------
*Any-as-type* — ``dict[str, Any]``, ``list[Any]``, ``type[Any]``,
    ``Any | X``, ``X | Any``, ``Mapping[str, Any]``, etc.

*object-as-type* — same severity as Any; erases all structural information.

*cast()* — all usage banned; it conceals a broken callee return type.

*# type: ignore* — every suppressed error is an unaudited assumption.

*Bare collections* — ``list``, ``dict``, ``set``, ``tuple`` without ``[T]``.

*Optional[X]* and *Union[X, Y]* — use ``X | None`` and ``X | Y`` (PEP 604).

*Legacy typing imports* — ``List``, ``Dict``, ``Set``, ``Tuple``.

*Bare Callable / Callable returning Any* — must carry a full signature.

*Untyped varargs* — ``*args: Any``, ``**kwargs: Any``, and unannotated
    ``*args`` / ``**kwargs`` (annotation absent entirely).

*Untyped function definitions* — missing return or parameter annotation.

*Unconstrained TypeVar* — ``TypeVar(...)`` with no ``bound=`` and no
    constraint arguments; behaves identically to ``Any`` in practice.

*Naked dict at boundary* — ``dict[str, X]`` as a parameter or return type
    is banned at function/method boundaries. Every dict with known keys must
    be a ``TypedDict``; every dict with dynamic keys must justify its key space.
    The only valid ``dict[str, ...]`` at a boundary is an explicitly named
    ``TypedDict`` subclass. This rule exists because ``rustc`` cannot infer
    struct fields from a ``HashMap<String, X>`` — named fields must be declared.
    Pattern ``boundary_dict`` fires on ``: dict[str,`` and ``-> dict[str,``.

*Anonymous dict in collection* — ``list[dict[str, X]]``, ``dict[str, dict[str, X]]``,
    ``tuple[dict[str, X], ...]``. An anonymous dict nested inside a collection is
    always a named struct waiting to be declared. Use a ``TypedDict`` subclass or a
    named type alias (e.g. ``list[JSONObject]``, ``list[SymbolHistoryEntry]``).

    Named type aliases do NOT trigger this rule — only the literal expansion does.
    This is by design: ``list[JSONObject]`` is fine; ``list[dict[str, JSONValue]]`` is not.
    Rust requires every struct field to be named; ``Vec<HashMap<String, Value>>`` is
    never the right answer when ``Vec<SymbolEntry>`` is possible.

    ``concrete_dict_in_list`` — fires on ``list[dict[str,``, ``tuple[dict[str,``,
    ``set[dict[str,``
    ``dict_of_dict`` — fires on ``dict[str, dict[str,``

Usage::

    python tools/typing_audit.py                      # musehub/ + tests/
    python tools/typing_audit.py --dirs musehub/ tests/
    python tools/typing_audit.py --dirs musehub/ --max-any 0 --max-untyped 0
    python tools/typing_audit.py --json artifacts/typing_audit.json
"""
| 63 | |
| 64 | from __future__ import annotations |
| 65 | |
| 66 | import argparse |
| 67 | import ast |
| 68 | import io |
| 69 | import json |
| 70 | import operator |
| 71 | import re |
| 72 | import sys |
| 73 | import tokenize |
| 74 | from collections import defaultdict |
| 75 | from pathlib import Path |
| 76 | from typing import TypedDict |
| 77 | |
# ---------------------------------------------------------------------------
# Type aliases — avoid dict[str, X] at function/class-field boundaries.
# ---------------------------------------------------------------------------
| 81 | |
# Pattern identifier -> number of matches.
type PatternCounts = dict[str, int]
# Pattern identifier -> 1-based line numbers on which the pattern matched.
type PatternLines = dict[str, list[int]]
# Pattern identifier -> compiled regular expression that detects it.
type PatternMap = dict[str, re.Pattern[str]]
# File path -> that file's PatternCounts.
type PerFileViolations = dict[str, PatternCounts]
| 86 | |
| 87 | |
class Violation(TypedDict):
    """A single typed violation — one pattern match at one source location."""

    # Path of the file the match was found in.
    file: str
    # 1-based line number of the matching line.
    line: int
    # Identifier of the pattern that matched (a key of the pattern registry).
    kind: str
| 94 | |
# ---------------------------------------------------------------------------
# Data shapes — TypedDicts replace every dict[str, Any] in the old script.
# All shapes mirror the Rust struct that will eventually own them.
# ---------------------------------------------------------------------------
| 99 | |
| 100 | |
class UntypedDef(TypedDict):
    """A function or method that is missing a required type annotation.

    ``issue`` is one of:

    - ``"missing_return_type"``   — no return annotation.
    - ``"missing_param_type"``    — a non-self/cls parameter lacks annotation.
    - ``"untyped_args"``          — ``*args`` is annotated as ``Any`` or has
      no annotation at all.
    - ``"untyped_kwargs"``        — ``**kwargs`` is annotated as ``Any`` or has
      no annotation at all.
    - ``"unconstrained_typevar"`` — a ``TypeVar`` with no ``bound=`` and no
      positional constraints.
    """

    # Path of the file containing the definition.
    file: str
    # 1-based line number of the offending definition, parameter or assignment.
    line: int
    # Function name, ``function.parameter`` name, or TypeVar target name.
    name: str
    # One of the issue labels listed in the class docstring.
    issue: str
| 120 | |
| 121 | |
class FileResult(TypedDict):
    """Typing-violation summary for a single Python source file."""

    # Path of the scanned file.
    file: str
    # Whether the file imports ``Any`` from ``typing`` / ``typing_extensions``.
    imports_any: bool
    # Match count per pattern identifier.
    patterns: PatternCounts
    # Line numbers of the matching lines, per pattern identifier.
    pattern_lines: PatternLines
    # Count per ``# type: ignore`` variant label.
    type_ignore_variants: PatternCounts
    # Annotation problems found by the AST walker.
    untyped_defs: list[UntypedDef]
| 131 | |
| 132 | |
class Offender(TypedDict):
    """A file with at least one typing violation, ranked by total count."""

    # Path of the offending file.
    file: str
    # Sum of all pattern match counts in the file.
    total: int
    # Match count per pattern identifier for this file.
    patterns: PatternCounts
| 139 | |
| 140 | |
class ReportSummary(TypedDict):
    """High-level aggregate counts for the entire scan."""

    # Number of file results that went into the report.
    total_files_scanned: int
    # Number of files that import ``Any``.
    files_importing_any: int
    # Sum of match counts across *all* patterns (not only the Any-related ones).
    total_any_patterns: int
    # Number of untyped-definition records across all files.
    untyped_defs: int
| 148 | |
| 149 | |
class Report(TypedDict):
    """Full typing-audit report produced by :func:`generate_report`."""

    # Aggregate counts for the whole scan.
    summary: ReportSummary
    # Match count per pattern identifier, summed over all files.
    pattern_totals: PatternCounts
    # Count per ``# type: ignore`` variant label, summed over all files.
    type_ignore_variants: PatternCounts
    # Files with at least one match, sorted by descending total.
    top_offenders: list[Offender]
    # Per-pattern counts for every file with at least one match.
    per_file: PerFileViolations
    # One record per (file, line, pattern) match, sorted by file then line.
    violations: list[Violation]
    # Every untyped-definition record from every file.
    untyped_defs: list[UntypedDef]
| 160 | |
| 161 | |
| 162 | # --------------------------------------------------------------------------- |
| 163 | # String-literal masking |
| 164 | # --------------------------------------------------------------------------- |
| 165 | |
| 166 | |
def _mask_string_literals(source: str) -> str:
    """Replace string-literal content with spaces, preserving line structure.

    Pattern matching runs on the masked source so that raw regex strings,
    docstrings, and string constants never produce false positives.  Every
    character that ``str.splitlines`` treats as a line boundary is preserved,
    so the masked text always splits into the same number of lines as the
    original and line numbers stay accurate.

    Tokenisation errors (e.g. incomplete source snippets, inconsistent
    dedents) are tolerated: string tokens seen before the error stay masked
    and the remainder of the text is returned unchanged, so the caller still
    produces *some* output rather than dropping the file.

    Args:
        source: Full source text of a Python file.

    Returns:
        A copy of *source*, same length, with the content of every string
        token replaced by space characters.
    """
    # Characters str.splitlines() splits on.  Blanking any of them inside a
    # multi-line string would shift every later line number in the caller.
    line_breaks = frozenset("\n\r\x0b\x0c\x1c\x1d\x1e\x85\u2028\u2029")

    chars = list(source)

    # Cumulative offset of each physical line for O(1) (row, col) -> index.
    # Lines are read exactly the way tokenize reads them (io.StringIO splits
    # on "\n" only); str.splitlines() also splits on form feeds and friends,
    # which would desynchronise the rows reported by tokenize.
    offsets: list[int] = [0]
    for physical_line in io.StringIO(source):
        offsets.append(offsets[-1] + len(physical_line))

    def _abs(row: int, col: int) -> int:
        return offsets[row - 1] + col

    # Token types that carry string-literal content, including f-string
    # middle segments, which are FSTRING_MIDDLE (not STRING) on Python 3.12+.
    string_types = {tokenize.STRING}
    fstring_middle = getattr(tokenize, "FSTRING_MIDDLE", None)
    if fstring_middle is not None:
        string_types.add(fstring_middle)

    try:
        tokens = tokenize.generate_tokens(io.StringIO(source).readline)
        for tok_type, _tok_str, (srow, scol), (erow, ecol), _line in tokens:
            if tok_type not in string_types:
                continue
            start = _abs(srow, scol)
            end = min(_abs(erow, ecol), len(chars))
            for i in range(start, end):
                if chars[i] not in line_breaks:
                    chars[i] = " "
    except (tokenize.TokenError, SyntaxError):
        # tokenize raises TokenError for unterminated constructs and
        # IndentationError (a SyntaxError) for inconsistent dedents.  Either
        # way, keep whatever was masked so far (best effort).
        pass

    return "".join(chars)
| 217 | |
| 218 | |
| 219 | # --------------------------------------------------------------------------- |
| 220 | # Pattern registry |
| 221 | # --------------------------------------------------------------------------- |
| 222 | |
#: All patterns that count toward the violation total.
#: Keys are stable identifiers used in JSON output and tests.
#:
#: NOTE: do NOT use re.IGNORECASE — Python type annotations are case-sensitive.
#: ``List`` and ``list`` are distinct identifiers; matching ``list[any]``
#: (where ``any`` is the built-in function) would be a false positive.
_PATTERNS: PatternMap = {
    # --- Any-as-type ------------------------------------------------------
    "dict_str_any": re.compile(r"\bdict\[str,\s*Any\]|\bDict\[str,\s*Any\]"),
    "list_any": re.compile(r"\blist\[Any\]|\bList\[Any\]"),
    "type_any": re.compile(r"\btype\[Any\]"),
    "any_in_union": re.compile(r"\bAny\s*\||\|\s*Any\b"),
    "return_any": re.compile(r"->\s*Any\b"),
    "param_any": re.compile(r":\s*Any\b"),
    "mapping_any": re.compile(r"\bMapping\[str,\s*Any\]"),
    "optional_any": re.compile(r"\bOptional\[Any\]"),
    "sequence_any": re.compile(r"\bSequence\[Any\]|\bIterable\[Any\]"),
    # ``Any`` is matched as a whole word so that identifiers which merely
    # start with it (``AnyStr``, ``AnyUrl``) are not reported as ``Any``.
    "tuple_any": re.compile(
        r"\btuple\[[^\n]*\bAny\b[^\n]*\]|\bTuple\[[^\n]*\bAny\b[^\n]*\]"
    ),
    # --- object-as-type ---------------------------------------------------
    "param_object": re.compile(r":\s*object\b"),
    "return_object": re.compile(r"->\s*object\b"),
    # Handles one level of nesting, e.g. dict[str, list[object]].
    # NOTE: Mapping is intentionally excluded — Mapping[str, object] is the
    # correct type for read-only, covariant mappings at framework boundaries
    # (e.g. Jinja2 template contexts). Mapping[str, Any] is caught separately
    # by mapping_any. Only mutable collection types need this guard.
    "collection_object": re.compile(
        r"\b(?:dict|list|set|tuple|Sequence)"
        r"\[[^\n\[\]]*(?:\[[^\n\[\]]*\][^\n\[\]]*)*\bobject\b"
    ),
    # --- cast() — banned --------------------------------------------------
    "cast_usage": re.compile(r"(?<![.\w])cast\("),
    # type: ignore — only flag blanket suppresses (no specific error code).
    # ``# type: ignore[some-code]`` is acceptable when the exact issue is known;
    # ``# type: ignore`` with no code is a blind suppression and always banned.
    "type_ignore": re.compile(r"#\s*type:\s*ignore(?!\s*\[)"),
    # --- Bare collections (no type parameters) ----------------------------
    # Negative lookaheads exclude parameterised forms and prose.
    "bare_list": re.compile(r"(?::\s*|->\s*)list\b(?!\[|\(|\s+[a-z])"),
    "bare_dict": re.compile(r"(?::\s*|->\s*)dict\b(?!\[|\(|\s+[a-z])"),
    "bare_set": re.compile(r"(?::\s*|->\s*)set\b(?!\[|\(|\s+[a-z])"),
    "bare_tuple": re.compile(r"(?::\s*|->\s*)tuple\b(?!\[|\(|\s+[a-z])"),
    # --- Optional[X] — use X | None (PEP 604) -----------------------------
    "optional_usage": re.compile(r"\bOptional\[(?!Any\b)"),
    # --- Union[X, Y] — use X | Y (PEP 604) --------------------------------
    "union_usage": re.compile(r"\bUnion\["),
    # --- Legacy typing imports (use lowercase builtins) -------------------
    "legacy_List": re.compile(r"\bList\["),
    "legacy_Dict": re.compile(r"\bDict\["),
    "legacy_Set": re.compile(r"\bSet\["),
    "legacy_Tuple": re.compile(r"\bTuple\["),
    # --- Callable — must carry full signature -----------------------------
    "bare_callable": re.compile(r"(?::\s*|->\s*)Callable\b(?!\[)"),
    "callable_any": re.compile(r"\bCallable\[[^\n]*,\s*Any\s*\]"),
    # --- Untyped varargs — *args: Any / **kwargs: Any ---------------------
    # Unannotated *args/**kwargs are caught by the AST walker instead.
    "varargs_any": re.compile(r"\*{1,2}\w+:\s*Any\b"),
    # Naked dict at boundary — dict[str, X] as param/return type is banned.
    # Every structured boundary must use a TypedDict (or dataclass/enum).
    # Matches ": dict[str," and "-> dict[str," — the two annotation positions.
    #
    # APPROVED alternatives at boundaries:
    #   - ReadOnlyJSONObject (= Mapping[str, JSONValue]) for read-only JSON params
    #   - A named TypedDict subclass for any dict with statically known keys
    #
    # Mapping[str, JSONValue] is covariant, so any dict[str, T] whose T is a
    # subtype of JSONValue is assignable to it. This pattern (boundary_dict)
    # does NOT fire on Mapping[...]; mapping_any does NOT fire on
    # Mapping[str, JSONValue]. Therefore Mapping[str, JSONValue] is the safe
    # boundary form for JSON dicts.
    "boundary_dict": re.compile(r"(?::\s*|->\s*)dict\[str\s*,"),
    # Anonymous dict in collection — list[dict[str, X]] / dict[str, dict[str, X]].
    # A dict nested inside a collection is always a named struct opportunity.
    # Use a TypedDict subclass or a named type alias (e.g. list[JSONObject]).
    # Named aliases do NOT trigger this rule — only the literal expansion does.
    # This is intentional: list[JSONObject] is fine; list[dict[str, JSONValue]] is not.
    "concrete_dict_in_list": re.compile(
        r"\b(?:list|tuple|set)\[dict\[str,"
    ),
    "dict_of_dict": re.compile(
        r"\bdict\[str,\s*dict\[str,"
    ),
}
| 305 | |
# Category groupings for the human-readable report, in display order.
# Each entry is (category heading, pattern identifiers listed under it);
# the identifiers are keys of the ``_PATTERNS`` registry.
_CATEGORY_ORDER: list[tuple[str, list[str]]] = [
    ("Any-as-type", [
        "dict_str_any", "list_any", "type_any", "any_in_union",
        "return_any", "param_any",
        "mapping_any", "optional_any", "sequence_any", "tuple_any",
    ]),
    ("object-as-type", ["param_object", "return_object", "collection_object"]),
    ("cast() usage", ["cast_usage"]),
    ("type: ignore", ["type_ignore"]),
    ("Bare collections", ["bare_list", "bare_dict", "bare_set", "bare_tuple"]),
    ("Optional (use X | None)", ["optional_usage"]),
    ("Union (use X | Y)", ["union_usage"]),
    ("Legacy typing imports", ["legacy_List", "legacy_Dict", "legacy_Set", "legacy_Tuple"]),
    ("Callable (must carry full signature)", ["bare_callable", "callable_any"]),
    ("Untyped varargs", ["varargs_any"]),
    ("Naked dict at boundary (use TypedDict)", ["boundary_dict"]),
    ("Anonymous dict in collection (use TypedDict or named alias)", [
        "concrete_dict_in_list", "dict_of_dict",
    ]),
]
| 327 | |
# Directories that are never source code and must be skipped during scanning.
# Matched by exact path-component name (virtual environments, caches,
# VCS/tool metadata, build output, vendored packages).
_SKIP_DIRS: frozenset[str] = frozenset({
    "venv", ".venv", "env", ".env",
    "__pycache__",
    ".git", ".muse", ".mypy_cache", ".ruff_cache", ".pytest_cache", ".tox",
    "dist", "build", "site-packages", "__pypackages__",
})
| 335 | |
| 336 | |
| 337 | # --------------------------------------------------------------------------- |
| 338 | # Pattern helpers |
| 339 | # --------------------------------------------------------------------------- |
| 340 | |
| 341 | |
def _count_pattern_in_line(line: str, pattern: re.Pattern[str]) -> int:
    """Count how many times *pattern* matches in *line* (non-overlapping)."""
    # One tick per match object; no list of matched strings is built.
    return sum(1 for _match in pattern.finditer(line))
| 345 | |
| 346 | |
def _imports_any(source: str) -> bool:
    """Return ``True`` if the source file imports ``Any`` from ``typing``
    or ``typing_extensions``.

    The check walks the AST, so it:

    - finds ``Any`` inside parenthesised, multi-line ``from ... import (...)``
      statements,
    - ignores commented-out imports, trailing comments that merely mention
      ``Any``, and import-looking text inside string literals/docstrings.

    When the file cannot be parsed, a line-based regular expression is used
    instead so that broken files are still reported on a best-effort basis.

    Args:
        source: Full source text of the file.

    Returns:
        ``True`` when an ``Any`` import is present, ``False`` otherwise.
    """
    try:
        tree = ast.parse(source)
    except (SyntaxError, ValueError):
        # Unparsable source: fall back to the single-line textual heuristic.
        return bool(re.search(
            r"^[ \t]*from\s+typing(?:_extensions)?\s+import\s+.*\bAny\b",
            source,
            re.MULTILINE,
        ))

    return any(
        isinstance(node, ast.ImportFrom)
        and node.level == 0  # absolute import, not ``from .typing import ...``
        and node.module in {"typing", "typing_extensions"}
        and any(alias.name == "Any" for alias in node.names)
        for node in ast.walk(tree)
    )
| 359 | |
| 360 | |
def _classify_type_ignore(line: str) -> str:
    """Label the flavour of ``# type: ignore`` comment found on *line*.

    Args:
        line: A single source line that contains ``# type: ignore``.

    Returns:
        ``"type_ignore[<codes>]"`` when the comment carries bracketed error
        codes, otherwise ``"type_ignore[blanket]"``.
    """
    coded = re.search(r"#\s*type:\s*ignore\[([^\]]+)\]", line)
    detail = coded.group(1) if coded else "blanket"
    return f"type_ignore[{detail}]"
| 377 | |
| 378 | |
| 379 | # --------------------------------------------------------------------------- |
| 380 | # AST-based detection |
| 381 | # --------------------------------------------------------------------------- |
| 382 | |
| 383 | |
def _is_any_annotation(node: ast.expr | None) -> bool:
    """Tell whether *node* is exactly the bare identifier ``Any``.

    ``None`` (no annotation) and qualified forms such as ``typing.Any`` are
    not ``ast.Name`` nodes and therefore yield ``False``.
    """
    if not isinstance(node, ast.Name):
        return False
    return node.id == "Any"
| 387 | |
| 388 | |
def _find_untyped_defs(source: str, filepath: str) -> list[UntypedDef]:
    """Collect every annotation gap in the functions defined in *source*.

    For each ``def`` / ``async def`` found anywhere in the module, in this
    order:

    1. a missing return annotation (``missing_return_type``),
    2. every regular, positional-only and keyword-only parameter without an
       annotation, except those named ``self`` or ``cls``
       (``missing_param_type``),
    3. ``*args`` that is unannotated or annotated as bare ``Any``
       (``untyped_args``),
    4. ``**kwargs`` that is unannotated or annotated as bare ``Any``
       (``untyped_kwargs``).

    Unconstrained ``TypeVar`` assignments are appended after all function
    findings.  Parameter findings carry the parameter's own line number so
    the report points at the parameter rather than at the ``def`` line.

    Args:
        source: Full source text of the file.
        filepath: Path string used in the returned records.

    Returns:
        A list of :class:`UntypedDef` records, one per violation found; an
        empty list when *source* cannot be parsed.
    """
    try:
        module = ast.parse(source)
    except SyntaxError:
        return []

    found: list[UntypedDef] = []

    def _flag(line: int, name: str, issue: str) -> None:
        found.append({"file": filepath, "line": line, "name": name, "issue": issue})

    functions = (
        candidate
        for candidate in ast.walk(module)
        if isinstance(candidate, (ast.FunctionDef, ast.AsyncFunctionDef))
    )
    for func in functions:
        if func.returns is None:
            _flag(func.lineno, func.name, "missing_return_type")

        signature = func.args
        named_params = [
            *signature.args,
            *signature.posonlyargs,
            *signature.kwonlyargs,
        ]
        for param in named_params:
            if param.arg not in {"self", "cls"} and param.annotation is None:
                _flag(param.lineno, f"{func.name}.{param.arg}", "missing_param_type")

        # *args first, then **kwargs — both are judged by the same rule.
        for stars, variadic, issue in (
            ("*", signature.vararg, "untyped_args"),
            ("**", signature.kwarg, "untyped_kwargs"),
        ):
            if variadic is None:
                continue
            if variadic.annotation is None or _is_any_annotation(variadic.annotation):
                _flag(variadic.lineno, f"{func.name}.{stars}{variadic.arg}", issue)

    # TypeVar without constraints or bound — behaves identically to Any.
    found.extend(_find_unconstrained_typevars(module, filepath))
    return found
| 471 | |
| 472 | |
def _find_unconstrained_typevars(tree: ast.Module, filepath: str) -> list[UntypedDef]:
    """Return a record for every ``TypeVar(...)`` with no bound or constraints.

    A bare ``T = TypeVar("T")`` is semantically equivalent to ``T: Any``.
    The Rust port requires every generic to carry an explicit trait bound.

    Both spellings of the call are recognised: the imported name
    (``TypeVar("T")``) and the attribute form (``typing.TypeVar("T")`` or
    any ``<module>.TypeVar(...)``).  Only plain assignments are inspected.

    Args:
        tree: Parsed AST of the file.
        filepath: Path string used in the returned records.

    Returns:
        A list of :class:`UntypedDef` records for unconstrained ``TypeVar``
        definitions, in the order ``ast.walk`` visits them.
    """
    results: list[UntypedDef] = []
    for node in ast.walk(tree):
        if not isinstance(node, ast.Assign):
            continue
        call = node.value
        if not isinstance(call, ast.Call):
            continue

        # Resolve the called name for ``TypeVar(...)`` and ``mod.TypeVar(...)``.
        func = call.func
        if isinstance(func, ast.Name):
            callee = func.id
        elif isinstance(func, ast.Attribute):
            callee = func.attr
        else:
            continue
        if callee != "TypeVar":
            continue

        # A TypeVar is constrained when it has positional args beyond the
        # name string (constraint types) or a ``bound=`` keyword.
        has_constraints = len(call.args) > 1
        has_bound = any(kw.arg == "bound" for kw in call.keywords)
        if has_constraints or has_bound:
            continue

        target = node.targets[0]
        record: UntypedDef = {
            "file": filepath,
            "line": node.lineno,
            # Non-name targets (attribute, tuple) get a placeholder label.
            "name": target.id if isinstance(target, ast.Name) else "<TypeVar>",
            "issue": "unconstrained_typevar",
        }
        results.append(record)
    return results
| 518 | |
| 519 | |
| 520 | # --------------------------------------------------------------------------- |
| 521 | # File and directory scanner |
| 522 | # --------------------------------------------------------------------------- |
| 523 | |
| 524 | |
# Per-line guards used by scan_file, compiled once.
_DUNDER_DEF_RE = re.compile(r"def\s+__\w+__\s*\(")
_DEF_KEYWORD_RE = re.compile(r"\bdef\b")
# Same whitespace tolerance as the ``: dict[str,`` arm of ``boundary_dict``.
_COLON_DICT_RE = re.compile(r":\s*dict\[str\s*,")


def scan_file(filepath: Path) -> FileResult | None:
    """Scan a single Python file and return its violation summary.

    String literals are masked before pattern matching so that raw regex
    strings and docstring prose never produce false positives.  Comments are
    not string tokens, so ``# type: ignore`` on a code line is still seen in
    the masked text; its variant is classified from the *original* line.

    Blank lines and comment-only lines are skipped entirely.

    Two per-line exemptions apply:

    - ``param_object`` / ``return_object`` are not counted on a line that
      defines a dunder method (``__eq__(self, other: object)`` is legitimate).
    - ``boundary_dict`` is not counted when the line carries a
      ``: dict[str,`` annotation but no ``def`` keyword — such a line is
      treated as a local variable annotation, not a boundary.  The test
      tolerates the same optional whitespace as the pattern itself, so
      ``x:dict[str, int] = {}`` is exempt just like ``x: dict[str, int] = {}``.

    Args:
        filepath: Absolute or relative path to the Python file.

    Returns:
        A :class:`FileResult` on success, ``None`` when the file cannot be
        read (I/O or encoding error).
    """
    try:
        source = filepath.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError):
        return None

    masked = _mask_string_literals(source)

    patterns: defaultdict[str, int] = defaultdict(int)
    pattern_lines: defaultdict[str, list[int]] = defaultdict(list)
    type_ignore_variants: defaultdict[str, int] = defaultdict(int)

    line_pairs = zip(source.splitlines(), masked.splitlines())
    for lineno, (orig_line, masked_line) in enumerate(line_pairs, 1):
        stripped = masked_line.strip()
        if not stripped or stripped.startswith("#"):
            continue

        # Line-level facts, computed once instead of once per pattern.
        is_dunder_def = _DUNDER_DEF_RE.search(masked_line) is not None
        is_local_dict_annotation = (
            _COLON_DICT_RE.search(masked_line) is not None
            and _DEF_KEYWORD_RE.search(masked_line) is None
        )

        for name, pattern in _PATTERNS.items():
            if is_dunder_def and name in {"param_object", "return_object"}:
                continue
            if is_local_dict_annotation and name == "boundary_dict":
                continue

            count = _count_pattern_in_line(masked_line, pattern)
            if count == 0:
                continue

            patterns[name] += count
            pattern_lines[name].append(lineno)

            if name == "type_ignore":
                # Classify against the original line so blanket ignores can
                # be told apart from code-specific ones.
                variant = _classify_type_ignore(orig_line)
                type_ignore_variants[variant] += 1

    return FileResult(
        file=str(filepath),
        imports_any=_imports_any(source),
        patterns=dict(patterns),
        pattern_lines=dict(pattern_lines),
        type_ignore_variants=dict(type_ignore_variants),
        untyped_defs=_find_untyped_defs(source, str(filepath)),
    )
| 600 | |
| 601 | |
def scan_directory(directory: Path) -> list[FileResult]:
    """Recursively scan all Python files in *directory*.

    Skips virtual environments, caches, build artefacts, and VCS/tool
    metadata directories (see ``_SKIP_DIRS``).  Only path components *below*
    *directory* are tested against the skip list: a scan root that itself
    lives under a directory named e.g. ``build`` or ``env`` is still scanned
    instead of silently yielding zero files.

    Args:
        directory: Root of the directory tree to scan.

    Returns:
        A list of :class:`FileResult` objects, one per successfully scanned
        file, in sorted path order.
    """
    results: list[FileResult] = []
    for py_file in sorted(directory.rglob("*.py")):
        # rglob() yields paths prefixed with *directory*, so relative_to()
        # cannot fail; drop the final component (the file name itself).
        sub_dirs = py_file.relative_to(directory).parts[:-1]
        if any(part in _SKIP_DIRS for part in sub_dirs):
            continue
        file_result = scan_file(py_file)
        if file_result is not None:
            results.append(file_result)
    return results
| 622 | |
| 623 | |
| 624 | # --------------------------------------------------------------------------- |
| 625 | # Report generation |
| 626 | # --------------------------------------------------------------------------- |
| 627 | |
| 628 | |
def _offender_sort_key(entry: Offender) -> int:
    """Sort key for ranking offenders: the entry's total violation count."""
    total: int = entry["total"]
    return total
| 632 | |
| 633 | |
def generate_report(results: list[FileResult]) -> Report:
    """Fold per-file scan results into one :class:`Report`.

    Args:
        results: :class:`FileResult` objects from :func:`scan_file` or
            :func:`scan_directory`.

    Returns:
        A :class:`Report` ready for human display or JSON serialisation.
        ``violations`` is ordered by file then line; ``top_offenders`` holds
        every file with at least one match, highest total first (display
        truncation is the printer's job).
    """
    pattern_totals: defaultdict[str, int] = defaultdict(int)
    ignore_totals: defaultdict[str, int] = defaultdict(int)
    per_file: PerFileViolations = {}
    offenders: list[Offender] = []
    violations: list[Violation] = []
    untyped: list[UntypedDef] = []
    any_importers = 0

    for file_result in results:
        path = file_result["file"]
        if file_result["imports_any"]:
            any_importers += 1

        # Private copy of this file's counts, shared by per_file and Offender.
        counts: PatternCounts = dict(file_result["patterns"])
        for kind, count in counts.items():
            pattern_totals[kind] += count
            violations.extend(
                {"file": path, "line": lineno, "kind": kind}
                for lineno in file_result["pattern_lines"].get(kind, [])
            )

        file_total = sum(counts.values())
        if file_total > 0:
            per_file[path] = counts
            offenders.append({"file": path, "total": file_total, "patterns": counts})

        for variant, count in file_result["type_ignore_variants"].items():
            ignore_totals[variant] += count

        untyped.extend(file_result["untyped_defs"])

    violations.sort(key=lambda item: (item["file"], item["line"]))
    # Stable descending sort: files with equal totals keep their scan order.
    offenders.sort(key=lambda item: item["total"], reverse=True)

    return {
        "summary": {
            "total_files_scanned": len(results),
            "files_importing_any": any_importers,
            "total_any_patterns": sum(pattern_totals.values()),
            "untyped_defs": len(untyped),
        },
        "pattern_totals": dict(pattern_totals),
        "type_ignore_variants": dict(ignore_totals),
        # All offenders are stored; the human-readable printer caps the list.
        "top_offenders": offenders,
        "per_file": per_file,
        "violations": violations,
        # Full list — callers that need every record can use --json.
        "untyped_defs": untyped,
    }
| 699 | |
| 700 | |
| 701 | # --------------------------------------------------------------------------- |
| 702 | # Human-readable report printer |
| 703 | # --------------------------------------------------------------------------- |
| 704 | |
| 705 | |
def print_human_summary(report: Report, top_n: int = 15) -> None:
    """Print a formatted, human-readable summary of *report* to stdout.

    Sections, in order: a banner with the summary counts, the per-category
    pattern breakdown (only categories and patterns with a non-zero count),
    the ``# type: ignore`` variants sorted by descending count, every
    violation as ``file:line [kind]``, and the first *top_n* offenders.

    Args:
        report: A :class:`Report` produced by :func:`generate_report`.
        top_n: How many offenders to display in the top-offenders list.
    """
    s = report["summary"]
    totals = report["pattern_totals"]

    # NOTE(review): the separator character in the banner title and the
    # column padding inside these literals look altered by extraction —
    # confirm against the checked-in file before relying on exact output.
    print("\n" + "=" * 70)
    print(" TYPING AUDIT β Violation Report")
    print("=" * 70)
    print(f" Files scanned: {s['total_files_scanned']}")
    print(f" Files importing Any: {s['files_importing_any']}")
    print(f" Total violations: {s['total_any_patterns']}")
    print(f" Untyped defs: {s['untyped_defs']}")
    print()

    # Pattern breakdown, grouped and ordered by _CATEGORY_ORDER.
    has_violations = False
    for category, pattern_names in _CATEGORY_ORDER:
        category_total = sum(totals.get(p, 0) for p in pattern_names)
        if category_total == 0:
            continue
        has_violations = True
        print(f" {category}:")
        for p in pattern_names:
            count = totals.get(p, 0)
            if count > 0:
                print(f" {p:38s} {count:5d}")
        print()

    if not has_violations:
        print(" Pattern breakdown: (none)")
        print()

    if report["type_ignore_variants"]:
        print(" # type: ignore variants:")
        # Most frequent variant first.
        for variant, count in sorted(
            report["type_ignore_variants"].items(),
            key=operator.itemgetter(1),
            reverse=True,
        ):
            print(f" {variant:44s} {count:5d}")
        print()

    if report["violations"]:
        print(" Violations (file:line [kind]):")
        for v in report["violations"]:
            print(f" {v['file']}:{v['line']} [{v['kind']}]")
        print()

    # The heading is printed even when the offender list is empty.
    print(f" Top {top_n} offenders:")
    for entry in report["top_offenders"][:top_n]:
        print(f" {entry['total']:4d} {entry['file']}")
    print("=" * 70 + "\n")
| 762 | |
| 763 | |
| 764 | # --------------------------------------------------------------------------- |
| 765 | # CLI |
| 766 | # --------------------------------------------------------------------------- |
| 767 | |
| 768 | |
def main() -> None:
    """Entry point: parse CLI flags, run the scan, and enforce the ratchet.

    Scans the specified directories (or individual ``.py`` files), prints a
    human summary, optionally writes a JSON report, and exits non-zero when
    either the pattern violation count exceeds ``--max-any`` or the
    untyped-def count exceeds ``--max-untyped``. Both thresholds are checked
    before exiting, so a single run reports every failed ratchet.

    Paths given via ``--dirs`` that cannot be scanned are skipped with a
    warning on stderr. The warning distinguishes a path that does not exist
    from one that exists but is neither a directory nor a ``.py`` file.

    Raises:
        SystemExit: With status 1 when any enabled ratchet threshold is
            exceeded.
    """
    parser = argparse.ArgumentParser(
        description=(
            "Audit typing violations: Any, object, cast, bare collections, "
            "Optional/Union (legacy), Callable without signature, untyped "
            "varargs, type: ignore, untyped defs, unconstrained TypeVars."
        ),
    )
    parser.add_argument(
        "--dirs",
        nargs="+",
        default=["muse/", "tests/"],
        help="Directories or individual .py files to scan. Default: muse/ tests/",
    )
    parser.add_argument(
        "--json",
        type=str,
        metavar="PATH",
        help="Write the JSON report to PATH.",
    )
    parser.add_argument(
        "--max-any",
        type=int,
        default=None,
        metavar="N",
        help="Exit non-zero if total pattern violations exceed N (ratchet mode).",
    )
    parser.add_argument(
        "--max-untyped",
        type=int,
        default=None,
        metavar="N",
        help="Exit non-zero if total untyped-def count exceeds N (ratchet mode).",
    )
    parser.add_argument(
        "--top-n",
        type=int,
        default=15,
        metavar="N",
        help="Number of offenders to display in the human summary. Default: 15.",
    )
    args = parser.parse_args()

    all_results: list[FileResult] = []
    for d in args.dirs:
        p = Path(d)
        if p.is_file() and p.suffix == ".py":
            result = scan_file(p)
            # scan_file may return None; such files contribute no result.
            if result is not None:
                all_results.append(result)
        elif p.is_dir():
            all_results.extend(scan_directory(p))
        elif p.exists():
            # The path is present but unscannable (e.g. a non-.py file);
            # reporting it as missing would send the user looking for the
            # wrong problem.
            print(
                f"WARNING: {d} is not a directory or .py file, skipping",
                file=sys.stderr,
            )
        else:
            print(f"WARNING: {d} does not exist, skipping", file=sys.stderr)

    report = generate_report(all_results)
    print_human_summary(report, top_n=args.top_n)

    if args.json:
        out = Path(args.json)
        # Create missing parent directories so any PATH is writable.
        out.parent.mkdir(parents=True, exist_ok=True)
        out.write_text(json.dumps(report, indent=2), encoding="utf-8")
        print(f" JSON report written to {args.json}")

    # Evaluate every enabled ratchet before exiting so all failures are shown.
    failed = False

    if args.max_any is not None:
        total = report["summary"]["total_any_patterns"]
        if total > args.max_any:
            print(
                f"\nβ RATCHET FAILED (patterns): {total} violations exceed "
                f"threshold of {args.max_any}",
                file=sys.stderr,
            )
            failed = True
        else:
            print(
                f"\nβ RATCHET OK (patterns): {total} violations within "
                f"threshold of {args.max_any}",
            )

    if args.max_untyped is not None:
        untyped = report["summary"]["untyped_defs"]
        if untyped > args.max_untyped:
            print(
                f"\nβ RATCHET FAILED (untyped defs): {untyped} exceed "
                f"threshold of {args.max_untyped}",
                file=sys.stderr,
            )
            failed = True
        else:
            print(
                f"\nβ RATCHET OK (untyped defs): {untyped} within "
                f"threshold of {args.max_untyped}",
            )

    if failed:
        sys.exit(1)
| 875 | |
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()