"""Comprehensive tests for ``muse code breakage``. Coverage: I. Unit — _build_head_names_set / _build_head_class_methods II. Unit — _check_file (all issue types, edge cases) III. Integration — run() with a real repo (init → commit → modify → breakage) IV. Integration — --language, --path, --commit, --strict flags V. Integration — exit-code contract VI. Integration — JSON output schema VII. Regression — bugs fixed in this review (O(1) lookup, removed_public_method, silent-fallback guard, working-tree-only files) VIII.Stress — 200-file repo with deliberate breakage scattered throughout """ from __future__ import annotations import json import pathlib import pytest from tests.cli_test_helper import CliRunner, InvokeResult from muse.cli.commands.breakage import ( _BreakageIssue, _build_head_class_methods, _build_head_names_set, _check_file, ) from typing import TypedDict from muse.plugins.code.ast_parser import SymbolKind, SymbolRecord, SymbolTree class _BreakageJson(TypedDict, total=False): schema_version: str commit: str branch: str language_filter: str | None path_filter: str | None strict: bool file_count: int issues: list[_BreakageIssue] total: int errors: int warnings: int cli = None runner = CliRunner() # --------------------------------------------------------------------------- # Test fixture helpers # --------------------------------------------------------------------------- @pytest.fixture def repo(tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> pathlib.Path: """Fresh Muse repo in tmp_path.""" monkeypatch.chdir(tmp_path) monkeypatch.setenv("MUSE_REPO_ROOT", str(tmp_path)) result = runner.invoke(cli, ["init"]) assert result.exit_code == 0, result.output return tmp_path def _write(repo: pathlib.Path, rel_path: str, content: str) -> None: p = repo / rel_path p.parent.mkdir(parents=True, exist_ok=True) p.write_text(content) def _commit(repo: pathlib.Path, msg: str = "snapshot") -> None: result = runner.invoke(cli, ["code", "add", "."]) assert result.exit_code == 0, result.output result = runner.invoke(cli, ["commit", "-m", msg]) assert result.exit_code == 0, result.output def _breakage( args: list[str] | None = None, ) -> InvokeResult: return runner.invoke(cli, ["code", "breakage"] + (args or [])) def _sym( kind: SymbolKind, name: str, qualified_name: str = "", content_id: str = "abc", ) -> SymbolRecord: return SymbolRecord( kind=kind, name=name, qualified_name=qualified_name or name, content_id=content_id, body_hash=content_id, signature_id=content_id, metadata_id="", canonical_key=f"f#{kind}#{name}#1", lineno=1, end_lineno=10, ) # --------------------------------------------------------------------------- # Section I — Unit: _build_head_names_set # --------------------------------------------------------------------------- class TestBuildHeadNamesSet: def test_excludes_imports(self) -> None: tree: SymbolTree = { "f.py::import::os": _sym("import", "os"), "f.py::Foo": _sym("class", "Foo"), } result = _build_head_names_set({"f.py": tree}) assert "Foo" in result assert "os" not in result def test_collects_across_files(self) -> None: t1: SymbolTree = {"a.py::foo": _sym("function", "foo")} t2: SymbolTree = {"b.py::bar": _sym("function", "bar")} result = _build_head_names_set({"a.py": t1, "b.py": t2}) assert result == {"foo", "bar"} def test_empty_map_returns_empty_set(self) -> None: assert _build_head_names_set({}) == set() def test_only_imports_returns_empty_set(self) -> None: tree: SymbolTree = {"f.py::import::x": _sym("import", "x")} assert _build_head_names_set({"f.py": tree}) == set() # --------------------------------------------------------------------------- # Section II — Unit: _build_head_class_methods # --------------------------------------------------------------------------- class TestBuildHeadClassMethods: def test_collects_methods(self) -> None: tree: SymbolTree = { "f.py::Foo": _sym("class", "Foo"), "f.py::Foo.bar": _sym("method", "bar", qualified_name="Foo.bar"), "f.py::Foo.baz": _sym("method", "baz", qualified_name="Foo.baz"), } result = _build_head_class_methods({"f.py": tree}) assert result == {"f.py::Foo": {"bar", "baz"}} def test_ignores_non_methods(self) -> None: tree: SymbolTree = { "f.py::Foo": _sym("class", "Foo"), "f.py::top_fn": _sym("function", "top_fn"), } result = _build_head_class_methods({"f.py": tree}) assert result == {} def test_multiple_classes(self) -> None: tree: SymbolTree = { "f.py::A.m1": _sym("method", "m1", qualified_name="A.m1"), "f.py::B.m2": _sym("method", "m2", qualified_name="B.m2"), } result = _build_head_class_methods({"f.py": tree}) assert result["f.py::A"] == {"m1"} assert result["f.py::B"] == {"m2"} def test_method_without_dot_in_qualified_name_is_skipped(self) -> None: tree: SymbolTree = {"f.py::orphan": _sym("method", "orphan", qualified_name="orphan")} assert _build_head_class_methods({"f.py": tree}) == {} # --------------------------------------------------------------------------- # Section III — Unit: _check_file # --------------------------------------------------------------------------- class TestCheckFile: # --- stale_import ------------------------------------------------------- def test_stale_import_flagged_as_warning(self) -> None: # Must use the new qualified_name format "import::::" # with a muse-internal module — only those are checked for staleness. working: SymbolTree = { "f.py::import::gone": _sym( "import", "gone", qualified_name="import::muse.core.foo::gone" ) } head: SymbolTree = {} issues = _check_file("f.py", working, head, set(), {}, frozenset()) assert len(issues) == 1 assert issues[0]["issue_type"] == "stale_import" assert issues[0]["severity"] == "warning" assert "gone" in issues[0]["description"] def test_import_present_in_head_is_clean(self) -> None: working: SymbolTree = {"f.py::import::alive": _sym("import", "alive")} head: SymbolTree = {} issues = _check_file("f.py", working, head, {"alive"}, {}, frozenset()) assert issues == [] def test_module_import_not_flagged_as_stale(self) -> None: # ``from muse.cli.commands import breakage`` — name is a module, not a # symbol. It should never appear in head_names_set, but it must not # be flagged as stale when the corresponding .py file exists in the # HEAD snapshot. working: SymbolTree = { "app.py::import::breakage": _sym( "import", "breakage", qualified_name="import::muse.cli.commands::breakage", ) } head_file_paths = frozenset({"muse/cli/commands/breakage.py"}) issues = _check_file("app.py", working, {}, set(), {}, head_file_paths) assert issues == [] def test_module_import_via_init_not_flagged(self) -> None: # ``from muse.plugins import code`` where ``muse/plugins/code/__init__.py`` # exists — package import, not stale. working: SymbolTree = { "app.py::import::code": _sym( "import", "code", qualified_name="import::muse.plugins::code", ) } head_file_paths = frozenset({"muse/plugins/code/__init__.py"}) issues = _check_file("app.py", working, {}, set(), {}, head_file_paths) assert issues == [] def test_truly_missing_import_still_flagged_with_file_paths(self) -> None: # head_file_paths is non-empty but doesn't contain the imported module — # the import is genuinely stale. working: SymbolTree = { "f.py::import::vanished": _sym( "import", "vanished", qualified_name="import::muse.core::vanished", ) } head_file_paths = frozenset({"muse/core/other.py"}) issues = _check_file("f.py", working, {}, set(), {}, head_file_paths) assert len(issues) == 1 assert issues[0]["issue_type"] == "stale_import" def test_import_defined_locally_is_clean(self) -> None: working: SymbolTree = { "f.py::import::local_fn": _sym("import", "local_fn"), "f.py::local_fn": _sym("function", "local_fn"), } issues = _check_file("f.py", working, {}, set(), {}, frozenset()) assert issues == [] def test_wildcard_import_not_flagged(self) -> None: working: SymbolTree = {"f.py::import::*:os": _sym("import", "*:os")} issues = _check_file("f.py", working, {}, set(), {}, frozenset()) assert issues == [] def test_empty_working_tree_returns_no_issues(self) -> None: issues = _check_file("f.py", {}, {}, {"something"}, {}, frozenset()) assert issues == [] def test_multiple_stale_imports(self) -> None: working: SymbolTree = { "f.py::import::a": _sym( "import", "a", qualified_name="import::muse.core.x::a" ), "f.py::import::b": _sym( "import", "b", qualified_name="import::muse.core.y::b" ), } issues = _check_file("f.py", working, {}, set(), {}, frozenset()) assert len(issues) == 2 types = {i["issue_type"] for i in issues} assert types == {"stale_import"} # --- removed_public_method ---------------------------------------------- def test_removed_public_method_flagged_as_error(self) -> None: head: SymbolTree = { "f.py::Foo": _sym("class", "Foo"), } head_class_methods = {"f.py::Foo": {"pay"}} working: SymbolTree = { "f.py::Foo": _sym("class", "Foo"), # pay() is missing from working tree } issues = _check_file("f.py", working, head, set(), head_class_methods, frozenset()) assert any(i["issue_type"] == "removed_public_method" for i in issues) assert any(i["severity"] == "error" for i in issues) assert any("pay" in i["description"] for i in issues) def test_private_method_removal_not_flagged(self) -> None: head: SymbolTree = {"f.py::Foo": _sym("class", "Foo")} head_class_methods = {"f.py::Foo": {"_internal"}} working: SymbolTree = {"f.py::Foo": _sym("class", "Foo")} issues = _check_file("f.py", working, head, set(), head_class_methods, frozenset()) assert all(i["issue_type"] != "removed_public_method" for i in issues) def test_added_method_not_flagged(self) -> None: head: SymbolTree = {"f.py::Foo": _sym("class", "Foo")} head_class_methods = {"f.py::Foo": {"pay"}} working: SymbolTree = { "f.py::Foo": _sym("class", "Foo"), "f.py::Foo.pay": _sym("method", "pay", qualified_name="Foo.pay"), "f.py::Foo.refund": _sym("method", "refund", qualified_name="Foo.refund"), } issues = _check_file("f.py", working, head, set(), head_class_methods, frozenset()) assert all(i["issue_type"] != "removed_public_method" for i in issues) def test_class_in_working_not_in_head_not_flagged(self) -> None: head: SymbolTree = {} working: SymbolTree = { "f.py::Brand": _sym("class", "Brand"), "f.py::Brand.name": _sym("method", "name", qualified_name="Brand.name"), } issues = _check_file("f.py", working, head, set(), {}, frozenset()) assert issues == [] def test_removed_method_check_only_applies_to_python(self) -> None: head_js: SymbolTree = {"f.js::Foo": _sym("class", "Foo")} head_class_methods = {"f.js::Foo": {"pay"}} working_js: SymbolTree = {"f.js::Foo": _sym("class", "Foo")} issues = _check_file("f.js", working_js, head_js, set(), head_class_methods, frozenset()) # Non-Python files should not trigger removed_public_method assert all(i["issue_type"] != "removed_public_method" for i in issues) # --------------------------------------------------------------------------- # Section IV — Integration: init → commit → modify → breakage # --------------------------------------------------------------------------- CLEAN_MODULE = """\ def compute(x: int) -> int: return x * 2 class Calculator: def add(self, a: int, b: int) -> int: return a + b def subtract(self, a: int, b: int) -> int: return a - b """ MODULE_WITH_STALE_IMPORT = """\ from muse.core.nonexistent import utterly_nonexistent_symbol # stale: not in HEAD def compute(x: int) -> int: return x * 2 class Calculator: def add(self, a: int, b: int) -> int: return a + b def subtract(self, a: int, b: int) -> int: return a - b """ MODULE_WITH_REMOVED_METHOD = """\ class Calculator: def add(self, a: int, b: int) -> int: return a + b # subtract() removed! """ class TestIntegrationBasic: def test_no_changes_is_clean(self, repo: pathlib.Path) -> None: _write(repo, "src/calc.py", CLEAN_MODULE) _commit(repo, "add calc") result = _breakage() assert result.exit_code == 0 assert "No structural breakage" in result.output def test_stale_import_detected(self, repo: pathlib.Path) -> None: _write(repo, "src/calc.py", CLEAN_MODULE) _commit(repo, "add calc") (repo / "src/calc.py").write_text(MODULE_WITH_STALE_IMPORT) result = _breakage() assert "stale_import" in result.output assert result.exit_code == 0 # warnings don't cause exit 1 without --strict def test_removed_public_method_detected(self, repo: pathlib.Path) -> None: _write(repo, "src/calc.py", CLEAN_MODULE) _commit(repo, "add calc") (repo / "src/calc.py").write_text(MODULE_WITH_REMOVED_METHOD) result = _breakage() assert "removed_public_method" in result.output assert "subtract" in result.output assert result.exit_code == 1 # errors always exit 1 def test_no_head_commit_exits_nonzero( self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch ) -> None: monkeypatch.chdir(tmp_path) runner.invoke(cli, ["init"]) result = _breakage() assert result.exit_code != 0 def test_output_shows_commit_hash(self, repo: pathlib.Path) -> None: _write(repo, "src/calc.py", CLEAN_MODULE) _commit(repo, "add calc") result = _breakage() assert result.exit_code == 0 # Commit hash appears in the header line assert "Breakage check" in result.output # --------------------------------------------------------------------------- # Section V — Integration: flags # --------------------------------------------------------------------------- class TestFlags: def test_language_filter_restricts_files(self, repo: pathlib.Path) -> None: _write(repo, "src/calc.py", CLEAN_MODULE) _commit(repo, "add calc") (repo / "src/calc.py").write_text(MODULE_WITH_STALE_IMPORT) # With --language Python the stale import should surface result_py = _breakage(["--language", "Python"]) assert "stale_import" in result_py.output # With --language MIDI nothing matches — should be clean result_midi = _breakage(["--language", "MIDI"]) assert "No structural breakage" in result_midi.output assert result_midi.exit_code == 0 def test_path_filter_restricts_files(self, repo: pathlib.Path) -> None: _write(repo, "src/calc.py", CLEAN_MODULE) _write(repo, "src/other.py", CLEAN_MODULE) _commit(repo, "two modules") (repo / "src/calc.py").write_text(MODULE_WITH_REMOVED_METHOD) (repo / "src/other.py").write_text(MODULE_WITH_REMOVED_METHOD) # Filter to only other.py — calc.py breakage should not appear result = _breakage(["--path", "src/other.py"]) assert "other.py" in result.output # calc.py should not appear in output assert "calc.py" not in result.output def test_strict_makes_warnings_exit_one(self, repo: pathlib.Path) -> None: _write(repo, "src/calc.py", CLEAN_MODULE) _commit(repo, "add calc") (repo / "src/calc.py").write_text(MODULE_WITH_STALE_IMPORT) # Without --strict: warning, exit 0 result = _breakage() assert result.exit_code == 0 # With --strict: warning becomes exit 1 result_strict = _breakage(["--strict"]) assert result_strict.exit_code == 1 def test_commit_flag_resolves_named_branch(self, repo: pathlib.Path) -> None: _write(repo, "src/calc.py", CLEAN_MODULE) _commit(repo, "v1") result = _breakage(["--commit", "main"]) assert result.exit_code == 0 def test_commit_flag_invalid_ref_exits_nonzero(self, repo: pathlib.Path) -> None: _write(repo, "src/calc.py", CLEAN_MODULE) _commit(repo, "v1") result = _breakage(["--commit", "no-such-ref-xyz"]) assert result.exit_code != 0 def test_path_filter_no_matches_is_clean(self, repo: pathlib.Path) -> None: _write(repo, "src/calc.py", CLEAN_MODULE) _commit(repo, "add calc") (repo / "src/calc.py").write_text(MODULE_WITH_REMOVED_METHOD) result = _breakage(["--path", "totally/nonexistent/*.py"]) assert result.exit_code == 0 def test_strict_with_no_issues_still_exits_zero(self, repo: pathlib.Path) -> None: _write(repo, "src/calc.py", CLEAN_MODULE) _commit(repo, "add calc") result = _breakage(["--strict"]) assert result.exit_code == 0 # --------------------------------------------------------------------------- # Section VI — Integration: JSON output # --------------------------------------------------------------------------- class TestJsonOutput: def _json(self, args: list[str] | None = None) -> _BreakageJson: result = _breakage((args or []) + ["--json"]) data: _BreakageJson = json.loads(result.output) return data def test_schema_keys_present(self, repo: pathlib.Path) -> None: _write(repo, "src/calc.py", CLEAN_MODULE) _commit(repo, "baseline") data = self._json() required = { "schema", "commit", "branch", "language_filter", "path_filter", "strict", "file_count", "issues", "total", "errors", "warning_count", } assert required <= data.keys() def test_clean_repo_zero_issues(self, repo: pathlib.Path) -> None: _write(repo, "src/calc.py", CLEAN_MODULE) _commit(repo, "baseline") data = self._json() assert data["total"] == 0 assert data["errors"] == 0 assert data["warning_count"] == 0 assert data["issues"] == [] def test_error_counted_correctly(self, repo: pathlib.Path) -> None: _write(repo, "src/calc.py", CLEAN_MODULE) _commit(repo, "baseline") (repo / "src/calc.py").write_text(MODULE_WITH_REMOVED_METHOD) data = self._json() assert data["errors"] >= 1 error_issues = [i for i in data["issues"] if i["severity"] == "error"] assert len(error_issues) == data["errors"] def test_warning_counted_correctly(self, repo: pathlib.Path) -> None: _write(repo, "src/calc.py", CLEAN_MODULE) _commit(repo, "baseline") (repo / "src/calc.py").write_text(MODULE_WITH_STALE_IMPORT) data = self._json() assert data["warning_count"] >= 1 warn_issues = [i for i in data["issues"] if i["severity"] == "warning"] assert len(warn_issues) == data["warning_count"] def test_issue_keys_present(self, repo: pathlib.Path) -> None: _write(repo, "src/calc.py", CLEAN_MODULE) _commit(repo, "baseline") (repo / "src/calc.py").write_text(MODULE_WITH_REMOVED_METHOD) data = self._json() assert data["issues"] issue = data["issues"][0] assert {"issue_type", "path", "description", "severity"} <= issue.keys() def test_file_count_reflects_manifest(self, repo: pathlib.Path) -> None: _write(repo, "src/a.py", CLEAN_MODULE) _write(repo, "src/b.py", CLEAN_MODULE) _commit(repo, "two files") data = self._json() assert data["file_count"] >= 2 def test_path_filter_reflected_in_json(self, repo: pathlib.Path) -> None: _write(repo, "src/calc.py", CLEAN_MODULE) _commit(repo, "baseline") data = self._json(["--path", "src/*.py"]) assert data["path_filter"] == "src/*.py" def test_strict_reflected_in_json(self, repo: pathlib.Path) -> None: _write(repo, "src/calc.py", CLEAN_MODULE) _commit(repo, "baseline") data = self._json(["--strict"]) assert data["strict"] is True def test_language_filter_reflected_in_json(self, repo: pathlib.Path) -> None: _write(repo, "src/calc.py", CLEAN_MODULE) _commit(repo, "baseline") data = self._json(["--language", "Python"]) assert data["language_filter"] == "Python" def test_branch_field_is_nonempty_string(self, repo: pathlib.Path) -> None: _write(repo, "src/calc.py", CLEAN_MODULE) _commit(repo, "baseline") data = self._json() assert isinstance(data["branch"], str) assert data["branch"] # --------------------------------------------------------------------------- # Section VII — Exit-code contract # --------------------------------------------------------------------------- class TestExitCode: def test_clean_exits_zero(self, repo: pathlib.Path) -> None: _write(repo, "src/calc.py", CLEAN_MODULE) _commit(repo, "baseline") assert _breakage().exit_code == 0 def test_errors_exit_one(self, repo: pathlib.Path) -> None: _write(repo, "src/calc.py", CLEAN_MODULE) _commit(repo, "baseline") (repo / "src/calc.py").write_text(MODULE_WITH_REMOVED_METHOD) assert _breakage().exit_code == 1 def test_warnings_exit_zero_without_strict(self, repo: pathlib.Path) -> None: _write(repo, "src/calc.py", CLEAN_MODULE) _commit(repo, "baseline") (repo / "src/calc.py").write_text(MODULE_WITH_STALE_IMPORT) assert _breakage().exit_code == 0 def test_warnings_exit_one_with_strict(self, repo: pathlib.Path) -> None: _write(repo, "src/calc.py", CLEAN_MODULE) _commit(repo, "baseline") (repo / "src/calc.py").write_text(MODULE_WITH_STALE_IMPORT) assert _breakage(["--strict"]).exit_code == 1 def test_json_exit_code_matches_human_output(self, repo: pathlib.Path) -> None: _write(repo, "src/calc.py", CLEAN_MODULE) _commit(repo, "baseline") (repo / "src/calc.py").write_text(MODULE_WITH_REMOVED_METHOD) human = _breakage() json_out = _breakage(["--json"]) assert human.exit_code == json_out.exit_code == 1 # --------------------------------------------------------------------------- # Section VIII — Regression: bugs fixed in this review # --------------------------------------------------------------------------- class TestRegressions: def test_check2_does_not_produce_false_positives_across_classes( self, repo: pathlib.Path ) -> None: """Old Check 2 would flag class A for missing methods of class B. The fixed version only checks each class against its own HEAD methods. """ mod = """\ class A: def foo(self) -> None: pass class B: def bar(self) -> None: pass """ _write(repo, "src/two_classes.py", mod) _commit(repo, "two classes") # Both classes still have their own methods — no breakage result = _breakage() assert result.exit_code == 0 assert "removed_public_method" not in result.output def test_stale_import_lookup_is_o1_not_linear_scan( self, repo: pathlib.Path ) -> None: """Verifies that `head_names_set` is used (set membership, not any() loop). We test this indirectly: if 1000 HEAD symbols exist, the check must still complete quickly and return the correct result. Correctness regression only — performance is measured separately in the stress tests. """ # Build a module with 50 functions big_mod = "\n".join(f"def fn_{i}() -> None:\n pass" for i in range(50)) _write(repo, "src/big.py", big_mod) _commit(repo, "50 fns") # Add a stale import to a separate file — must be a muse.* import # so the stale-import filter fires on it. _write(repo, "src/consumer.py", "from muse.big import does_not_exist\n") _commit(repo, "add consumer") (repo / "src/consumer.py").write_text( "from muse.big import does_not_exist\n" ) result = _breakage(["--path", "src/consumer.py"]) assert "stale_import" in result.output def test_silent_manifest_fallback_guard( self, repo: pathlib.Path ) -> None: """run() must not silently treat an unreadable snapshot as an empty manifest. We verify by checking that the command exits non-zero (error) rather than silently succeeding with zero issues when the commit cannot be read. """ _write(repo, "src/calc.py", CLEAN_MODULE) _commit(repo, "baseline") result = _breakage(["--commit", "nonexistent-ref-8675309"]) assert result.exit_code != 0 def test_removed_public_method_severity_is_error_not_warning( self, repo: pathlib.Path ) -> None: _write(repo, "src/calc.py", CLEAN_MODULE) _commit(repo, "baseline") (repo / "src/calc.py").write_text(MODULE_WITH_REMOVED_METHOD) data = json.loads(_breakage(["--json"]).output) removed = [i for i in data["issues"] if i["issue_type"] == "removed_public_method"] assert removed, "expected at least one removed_public_method issue" for issue in removed: assert issue["severity"] == "error" def test_stale_import_severity_is_warning_not_error( self, repo: pathlib.Path ) -> None: _write(repo, "src/calc.py", CLEAN_MODULE) _commit(repo, "baseline") (repo / "src/calc.py").write_text(MODULE_WITH_STALE_IMPORT) data = json.loads(_breakage(["--json"]).output) stale = [i for i in data["issues"] if i["issue_type"] == "stale_import"] assert stale, "expected at least one stale_import issue" for issue in stale: assert issue["severity"] == "warning" def test_non_python_file_no_removed_method_check( self, repo: pathlib.Path ) -> None: """removed_public_method is Python-only; non-Python files must not trigger it.""" _write(repo, "notes.md", "# Hello\n") _commit(repo, "markdown") result = _breakage() assert result.exit_code == 0 assert "removed_public_method" not in result.output def test_both_issues_in_same_file(self, repo: pathlib.Path) -> None: _write(repo, "src/calc.py", CLEAN_MODULE) _commit(repo, "baseline") # Introduce both a stale import AND a removed method. # Must use a muse.* source module so the stale-import filter fires. (repo / "src/calc.py").write_text( "from muse.core.vanished_module import vanished\n\n" "class Calculator:\n" " def add(self, a: int, b: int) -> int:\n" " return a + b\n" ) result = _breakage() assert "stale_import" in result.output assert "removed_public_method" in result.output assert result.exit_code == 1 # error from removed method # --------------------------------------------------------------------------- # Section IX — Stress tests # --------------------------------------------------------------------------- _CLEAN_FUNC_TEMPLATE = """\ def fn_{i}(x: int) -> int: return x + {i} class Class_{i}: def method_a(self) -> str: return "a_{i}" def method_b(self) -> str: return "b_{i}" """ _BROKEN_FUNC_TEMPLATE = """\ from muse.nowhere_special import ghost_{i} def fn_{i}(x: int) -> int: return x + {i} class Class_{i}: def method_a(self) -> str: return "a_{i}" # method_b removed """ class TestStress: @pytest.mark.slow def test_200_clean_files_no_false_positives( self, repo: pathlib.Path ) -> None: """200 files, all clean — breakage must report 0 issues.""" for i in range(200): _write(repo, f"src/mod_{i:03d}.py", _CLEAN_FUNC_TEMPLATE.format(i=i)) _commit(repo, "200 clean files") result = _breakage() assert result.exit_code == 0 assert "No structural breakage" in result.output @pytest.mark.slow def test_200_files_20_broken_detected_exactly( self, repo: pathlib.Path ) -> None: """200 files; the last 20 have both stale_import and removed_public_method.""" for i in range(180): _write(repo, f"src/mod_{i:03d}.py", _CLEAN_FUNC_TEMPLATE.format(i=i)) for i in range(180, 200): _write(repo, f"src/mod_{i:03d}.py", _CLEAN_FUNC_TEMPLATE.format(i=i)) _commit(repo, "200 files committed clean") # Now break the last 20 in the working tree (not committed) for i in range(180, 200): (repo / f"src/mod_{i:03d}.py").write_text( _BROKEN_FUNC_TEMPLATE.format(i=i) ) data = json.loads(_breakage(["--json"]).output) # Each broken file contributes: 1 stale_import warning + 1 error assert data["errors"] >= 20, f"Expected ≥20 errors, got {data['errors']}" assert data["warning_count"] >= 20, f"Expected ≥20 warnings, got {data['warnings']}" # No issues from the 180 clean files broken_files = {f"src/mod_{i:03d}.py" for i in range(180, 200)} for issue in data["issues"]: assert issue["path"] in broken_files, ( f"Unexpected issue in {issue['path']}: {issue['description']}" ) @pytest.mark.slow def test_path_filter_on_200_file_repo(self, repo: pathlib.Path) -> None: """Path filter must reduce the checked set, not scan everything.""" for i in range(200): _write(repo, f"src/mod_{i:03d}.py", _CLEAN_FUNC_TEMPLATE.format(i=i)) _commit(repo, "200 files") # Break one file outside the filter (repo / "src/mod_050.py").write_text(MODULE_WITH_REMOVED_METHOD) # Filter restricts to mod_100–mod_199 only — the broken file is excluded result = _breakage(["--path", "src/mod_1*.py"]) assert result.exit_code == 0 assert "removed_public_method" not in result.output @pytest.mark.slow def test_strict_on_200_files_with_one_warning(self, repo: pathlib.Path) -> None: for i in range(200): _write(repo, f"src/mod_{i:03d}.py", _CLEAN_FUNC_TEMPLATE.format(i=i)) _commit(repo, "200 files") # Introduce a single stale import in one file. # Must use a muse.* source module so the stale-import filter fires. content = _CLEAN_FUNC_TEMPLATE.format(i=0) (repo / "src/mod_000.py").write_text(f"from muse.ghost import gone\n{content}") # Without --strict: exit 0 (warning only) assert _breakage().exit_code == 0 # With --strict: exit 1 assert _breakage(["--strict"]).exit_code == 1 @pytest.mark.slow def test_multiple_commits_commit_flag(self, repo: pathlib.Path) -> None: """--commit lets you diff against any historical snapshot.""" _write(repo, "src/calc.py", CLEAN_MODULE) _commit(repo, "v1 — has subtract") # Break it in v2 (repo / "src/calc.py").write_text(MODULE_WITH_REMOVED_METHOD) runner.invoke(cli, ["code", "add", "."]) runner.invoke(cli, ["commit", "-m", "v2 — subtract removed"]) # Working tree matches v2 (committed); checking against main shows no # breakage since the removal is already committed. result_main = _breakage(["--commit", "main"]) assert result_main.exit_code == 0 @pytest.mark.slow def test_json_issues_each_have_all_required_fields_stress( self, repo: pathlib.Path ) -> None: for i in range(50): _write(repo, f"src/m{i}.py", _CLEAN_FUNC_TEMPLATE.format(i=i)) _commit(repo, "50 files") for i in range(25): (repo / f"src/m{i}.py").write_text(_BROKEN_FUNC_TEMPLATE.format(i=i)) data = json.loads(_breakage(["--json"]).output) required_keys = {"issue_type", "path", "description", "severity"} for issue in data["issues"]: missing = required_keys - issue.keys() assert not missing, f"Issue missing keys {missing}: {issue}" # --------------------------------------------------------------------------- # Flag registration tests # --------------------------------------------------------------------------- import argparse as _argparse from muse.cli.commands.breakage import register as _register_breakage def _parse_breakage(*args: str) -> _argparse.Namespace: """Build an argument parser via register() and parse args.""" root_p = _argparse.ArgumentParser() subs = root_p.add_subparsers(dest="cmd") _register_breakage(subs) return root_p.parse_args(["breakage", *args]) class TestRegisterFlags: def test_default_json_out_is_false(self) -> None: ns = _parse_breakage() assert ns.json_out is False def test_json_flag_sets_json_out(self) -> None: ns = _parse_breakage("--json") assert ns.json_out is True def test_j_shorthand_sets_json_out(self) -> None: ns = _parse_breakage("-j") assert ns.json_out is True def test_language_flag(self) -> None: ns = _parse_breakage("--language", "Python") assert ns.language == "Python" def test_strict_flag(self) -> None: ns = _parse_breakage("--strict") assert ns.strict is True