"""Seven-tier tests for ``muse/cli/commands/velocity.py``.

Tiers
-----
Unit           — _module_of; _bar; _WindowStats.net; _compute_predictions;
                 _print_table (empty, with modules, with predictions, truncated).
Integration    — _VelocityJson TypedDict fields; -j alias; register() docstring;
                 run() docstring envelope fields.
End-to-end     — --json emits schema/mode/exit_code/duration_ms; -j alias;
                 --predict -j; human output unchanged; empty repo.
Stress         — 1 000 _module_of; 500 _bar; _print_table with 200 modules.
Data integrity — schema int; exit_code int; duration_ms float; modules list;
                 predictions list; window_size preserved.
Security       — hostile address in predictions survives JSON; SQL injection in
                 since; very long module path; unicode addresses.
Performance    — 1 000 _module_of under 200 ms; velocity JSON completes quickly.
"""

from __future__ import annotations

from collections.abc import Mapping
import json
import os
import pathlib
import textwrap
import threading
import time
from typing import TYPE_CHECKING, get_type_hints

import pytest

from tests.cli_test_helper import CliRunner, InvokeResult

if TYPE_CHECKING:
    from muse.cli.commands.velocity import _WindowStats, _ModuleAccumulator, _SymbolFreq

runner = CliRunner()


# ──────────────────────────────────────────────────────────────────────────────
# Shared helpers
# ──────────────────────────────────────────────────────────────────────────────


def _make_window(**kw: int) -> "_WindowStats":
    """Build a default ``_WindowStats`` and overwrite the given attributes.

    Each keyword is applied with ``setattr`` after default construction, so
    callers name only the counters they care about (``added``, ``removed``,
    ``modified`` are the ones used in this file).
    """
    from muse.cli.commands.velocity import _WindowStats

    w = _WindowStats()
    for k, v in kw.items():
        setattr(w, k, v)
    return w


def _make_accumulator(
    current: "_WindowStats | None" = None,
    prior: "_WindowStats | None" = None,
    last_active_rank: int = -1,
    stagnant_commits: int = 0,
) -> "_ModuleAccumulator":
    """Build a ``_ModuleAccumulator`` with optional window overrides.

    ``current`` / ``prior`` replace the accumulator's own windows only when
    given; ``last_active_rank`` and ``stagnant_commits`` are always assigned.
    """
    from muse.cli.commands.velocity import _ModuleAccumulator

    acc = _ModuleAccumulator()
    if current is not None:
        acc.current = current
    if prior is not None:
        acc.prior = prior
    acc.last_active_rank = last_active_rank
    acc.stagnant_commits = stagnant_commits
    return acc


def _make_sym_freq(frequency: int = 3, last_rank: int = 0, module: str = "src/") -> "_SymbolFreq":
    """Build a ``_SymbolFreq`` record with test-friendly defaults."""
    from muse.cli.commands.velocity import _SymbolFreq

    return _SymbolFreq(frequency=frequency, last_rank=last_rank, module=module)


def _commit(repo: pathlib.Path, files: Mapping[str, str], message: str) -> None:
    """Write *files* under *repo*, then stage and commit them via the CLI.

    Parent directories are created as needed and content is written as UTF-8.
    The CLI calls go through ``_invoke`` so the working directory is always
    restored.  Their results are not inspected here.
    """
    for name, content in files.items():
        path = repo / name
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_text(content, encoding="utf-8")
    _invoke(repo, ["code", "add", "."])
    _invoke(repo, ["commit", "-m", message])


def _invoke(repo: pathlib.Path, args: list[str]) -> InvokeResult:
    """Run the CLI with *args* from inside *repo*, restoring the previous cwd."""
    saved = os.getcwd()
    try:
        os.chdir(repo)
        return runner.invoke(None, args)
    finally:
        os.chdir(saved)


@pytest.fixture()
def vel_repo(tmp_path: pathlib.Path) -> pathlib.Path:
    """Minimal repo with two commits so velocity has commit history to walk."""
    _invoke(tmp_path, ["init"])

    _commit(
        tmp_path,
        {
            "src/calc.py": textwrap.dedent("""\
                def add(a: int, b: int) -> int:
                    return a + b

                def subtract(a: int, b: int) -> int:
                    return a - b
            """),
            "src/utils.py": textwrap.dedent("""\
                def helper(x: str) -> str:
                    return x.upper()
            """),
        },
        "feat: initial symbols",
    )
    _commit(
        tmp_path,
        {
            "src/calc.py": textwrap.dedent("""\
                def add(a: int, b: int) -> int:
                    return a + b

                def subtract(a: int, b: int) -> int:
                    return a - b

                def multiply(a: int, b: int) -> int:
                    return a * b
            """),
            "src/engine.py": textwrap.dedent("""\
                def process(data: list) -> list:
                    return sorted(data)

                def validate(item) -> bool:
                    return item is not None
            """),
        },
        "feat: add multiply and engine module",
    )
    return tmp_path


# ──────────────────────────────────────────────────────────────────────────────
# Unit — _module_of
# ──────────────────────────────────────────────────────────────────────────────


class TestModuleOf:
    """``_module_of`` maps a file path to its directory-style module key."""

    def test_nested_path_returns_dir_with_slash(self) -> None:
        from muse.cli.commands.velocity import _module_of

        assert _module_of("muse/core/store.py") == "muse/core/"

    def test_single_level_path(self) -> None:
        from muse.cli.commands.velocity import _module_of

        assert _module_of("tests/test_foo.py") == "tests/"

    def test_root_file_returns_root_sentinel(self) -> None:
        from muse.cli.commands.velocity import _module_of

        assert _module_of("billing.py") == "(root)"

    def test_windows_backslash_normalised(self) -> None:
        from muse.cli.commands.velocity import _module_of

        assert _module_of("muse\\core\\store.py") == "muse/core/"

    def test_deeply_nested_path(self) -> None:
        from muse.cli.commands.velocity import _module_of

        assert _module_of("a/b/c/d/e.py") == "a/b/c/d/"

    def test_returns_str(self) -> None:
        from muse.cli.commands.velocity import _module_of

        assert isinstance(_module_of("src/foo.py"), str)


# ──────────────────────────────────────────────────────────────────────────────
# Unit — _bar
# ──────────────────────────────────────────────────────────────────────────────


class TestBar:
    """``_bar(net, max)`` renders the textual bar for a module row."""

    def test_zero_max_returns_empty(self) -> None:
        from muse.cli.commands.velocity import _bar

        assert _bar(5, 0) == ""

    def test_positive_net_returns_filled_bar(self) -> None:
        from muse.cli.commands.velocity import _bar

        result = _bar(10, 10)
        assert "█" in result

    def test_negative_net_includes_negative_label(self) -> None:
        from muse.cli.commands.velocity import _bar

        result = _bar(-5, 10)
        assert "net negative" in result

    def test_zero_net_returns_minimal_bar(self) -> None:
        from muse.cli.commands.velocity import _bar

        result = _bar(0, 10)
        # Zero net with positive max: filled=0 → returns "▏"
        assert result == "▏"

    def test_full_bar_has_max_blocks(self) -> None:
        from muse.cli.commands.velocity import _bar, _BAR_WIDTH

        result = _bar(100, 100)
        assert result.count("█") == _BAR_WIDTH

    def test_partial_bar_proportional(self) -> None:
        from muse.cli.commands.velocity import _bar

        full = _bar(10, 10)
        half = _bar(5, 10)
        assert len(half) <= len(full)
# ──────────────────────────────────────────────────────────────────────────────
# Unit — _WindowStats
# ──────────────────────────────────────────────────────────────────────────────


class TestWindowStats:
    """``_WindowStats.net`` for sample counters, plus zeroed defaults."""

    def test_net_positive(self) -> None:
        from muse.cli.commands.velocity import _WindowStats

        stats = _WindowStats(added=10, removed=3)
        assert stats.net == 7

    def test_net_negative(self) -> None:
        from muse.cli.commands.velocity import _WindowStats

        stats = _WindowStats(added=2, removed=8)
        assert stats.net == -6

    def test_net_zero(self) -> None:
        from muse.cli.commands.velocity import _WindowStats

        stats = _WindowStats(added=5, removed=5)
        assert stats.net == 0

    def test_defaults_are_zero(self) -> None:
        from muse.cli.commands.velocity import _WindowStats

        fresh = _WindowStats()
        # Same counters, same order as a field-by-field check.
        for counter in ("added", "removed", "modified", "active_commits", "net"):
            assert getattr(fresh, counter) == 0


# ──────────────────────────────────────────────────────────────────────────────
# Unit — _compute_predictions
# ──────────────────────────────────────────────────────────────────────────────


class TestComputePredictions:
    """Ranking, truncation and output shape of ``_compute_predictions``."""

    def test_empty_symbol_freq_returns_empty(self) -> None:
        from muse.cli.commands.velocity import _compute_predictions

        assert _compute_predictions({}, {}, window_size=20, top_k=5) == []

    def test_top_k_zero_returns_empty(self) -> None:
        from muse.cli.commands.velocity import _compute_predictions

        freqs = {"src/foo.py::bar": _make_sym_freq(frequency=5)}
        assert _compute_predictions(freqs, {}, window_size=20, top_k=0) == []

    def test_top_k_limits_results(self) -> None:
        from muse.cli.commands.velocity import _compute_predictions

        freqs = {}
        for idx in range(10):
            freqs[f"src/f{idx}.py::fn"] = _make_sym_freq(frequency=idx + 1)

        predictions = _compute_predictions(freqs, {}, window_size=20, top_k=3)
        assert len(predictions) == 3

    def test_higher_frequency_ranks_first(self) -> None:
        from muse.cli.commands.velocity import _compute_predictions

        freqs = {
            "a.py::low": _make_sym_freq(frequency=1, last_rank=0),
            "b.py::high": _make_sym_freq(frequency=10, last_rank=0),
        }
        top, *_ = _compute_predictions(freqs, {}, window_size=20, top_k=2)
        assert top["address"] == "b.py::high"

    def test_result_has_required_fields(self) -> None:
        from muse.cli.commands.velocity import _compute_predictions

        freqs = {"src/foo.py::bar": _make_sym_freq(frequency=3)}
        predictions = _compute_predictions(freqs, {}, window_size=20, top_k=1)
        assert len(predictions) == 1

        entry = predictions[0]
        required = ("address", "module", "score", "frequency", "last_commit_rank")
        for field in required:
            assert field in entry, f"missing: {field}"

    def test_score_is_positive(self) -> None:
        from muse.cli.commands.velocity import _compute_predictions

        freqs = {"src/foo.py::bar": _make_sym_freq(frequency=5, last_rank=1)}
        predictions = _compute_predictions(freqs, {}, window_size=20, top_k=1)
        assert predictions[0]["score"] > 0.0

    def test_recent_symbol_scores_higher_than_old(self) -> None:
        from muse.cli.commands.velocity import _compute_predictions

        # Equal frequency; only the last-touched rank differs.
        freqs = {
            "a.py::recent": _make_sym_freq(frequency=2, last_rank=0),
            "b.py::old": _make_sym_freq(frequency=2, last_rank=15),
        }
        predictions = _compute_predictions(freqs, {}, window_size=20, top_k=2)
        assert predictions[0]["address"] == "a.py::recent"

    def test_module_velocity_boost_applied(self) -> None:
        from muse.cli.commands.velocity import _compute_predictions, _ModuleAccumulator, _WindowStats

        busy = _ModuleAccumulator()
        busy.current = _WindowStats(added=10, removed=0)
        idle = _ModuleAccumulator()
        idle.current = _WindowStats(added=0, removed=0)

        # Symbols are identical except for which module they belong to.
        freqs = {
            "fast/a.py::fn": _make_sym_freq(frequency=3, last_rank=0, module="fast/"),
            "slow/b.py::fn": _make_sym_freq(frequency=3, last_rank=0, module="slow/"),
        }
        predictions = _compute_predictions(
            freqs,
            {"fast/": busy, "slow/": idle},
            window_size=20,
            top_k=2,
        )
        # fast/ module gets velocity boost
        assert predictions[0]["module"] == "fast/"


# ──────────────────────────────────────────────────────────────────────────────
# Unit — _print_table
# ──────────────────────────────────────────────────────────────────────────────


class TestPrintTable:
    """Human-readable output of ``_print_table``, captured through ``capsys``."""

    def test_empty_ranked_prints_no_changes_message(self, capsys: pytest.CaptureFixture[str]) -> None:
        from muse.cli.commands.velocity import _print_table

        _print_table([], [], ref="dev", commits_analysed=10, window_size=5, truncated=False, since=None)
        assert "no modules" in capsys.readouterr().out.lower()

    def test_header_shows_ref(self, capsys: pytest.CaptureFixture[str]) -> None:
        from muse.cli.commands.velocity import _print_table

        _print_table([], [], ref="my-branch", commits_analysed=0, window_size=5, truncated=False, since=None)
        assert "my-branch" in capsys.readouterr().out

    def test_truncated_shows_warning(self, capsys: pytest.CaptureFixture[str]) -> None:
        from muse.cli.commands.velocity import _print_table

        _print_table([], [], ref="dev", commits_analysed=100, window_size=5, truncated=True, since=None)
        assert "truncated" in capsys.readouterr().out

    def test_since_shown_in_scope(self, capsys: pytest.CaptureFixture[str]) -> None:
        from muse.cli.commands.velocity import _print_table

        _print_table([], [], ref="dev", commits_analysed=10, window_size=5, truncated=False, since="v1.0")
        assert "v1.0" in capsys.readouterr().out

    def test_module_row_shown(self, capsys: pytest.CaptureFixture[str]) -> None:
        from muse.cli.commands.velocity import _print_table

        acc = _make_accumulator(
            current=_make_window(added=5, removed=1, modified=3),
        )
        _print_table([("src/core/", acc)], [], ref="dev", commits_analysed=20, window_size=10, truncated=False, since=None)
        assert "src/core/" in capsys.readouterr().out

    def test_stagnant_module_shows_note(self, capsys: pytest.CaptureFixture[str]) -> None:
        from muse.cli.commands.velocity import _print_table

        acc = _make_accumulator(stagnant_commits=8)
        _print_table([("docs/", acc)], [], ref="dev", commits_analysed=20, window_size=10, truncated=False, since=None)
        assert "stagnant" in capsys.readouterr().out

    def test_predictions_shown(self, capsys: pytest.CaptureFixture[str]) -> None:
        from muse.cli.commands.velocity import _print_table, _PredictionOut

        pred = _PredictionOut(
            address="src/core/store.py::read",
            module="src/core/",
            score=0.91,
            frequency=5,
            last_commit_rank=0,
        )
        acc = _make_accumulator(current=_make_window(added=1))
        _print_table([("src/core/", acc)], [pred], ref="dev", commits_analysed=20, window_size=10, truncated=False, since=None)
        assert "src/core/store.py::read" in capsys.readouterr().out

    def test_acceleration_leader_shown(self, capsys: pytest.CaptureFixture[str]) -> None:
        from muse.cli.commands.velocity import _print_table

        current = _make_window(added=10, removed=0, modified=5)
        prior = _make_window(added=2, removed=0, modified=1)
        acc = _make_accumulator(current=current, prior=prior)
        _print_table([("src/hot/", acc)], [], ref="dev", commits_analysed=40, window_size=20, truncated=False, since=None)
        out = capsys.readouterr().out
        # NOTE(review): the second disjunct is presumably satisfied by the plain
        # module row (see test_module_row_shown), so this assertion likely does
        # not prove an acceleration line was printed — confirm the exact label
        # emitted by _print_table and tighten.
        assert "Acceleration" in out or "src/hot/" in out


# ──────────────────────────────────────────────────────────────────────────────
# Integration — TypedDict, alias, docstrings
# ──────────────────────────────────────────────────────────────────────────────


class TestTypedDict:
    """``_VelocityJson`` declares every envelope key the JSON output relies on."""

    def test_velocity_json_has_schema_version(self) -> None:
        """Name is historical: the key checked is ``schema``."""
        from muse.cli.commands.velocity import _VelocityJson

        assert "schema" in get_type_hints(_VelocityJson)

    def test_velocity_json_has_exit_code(self) -> None:
        from muse.cli.commands.velocity import _VelocityJson

        assert "exit_code" in get_type_hints(_VelocityJson)

    def test_velocity_json_has_duration_ms(self) -> None:
        from muse.cli.commands.velocity import _VelocityJson

        assert "duration_ms" in get_type_hints(_VelocityJson)

    def test_velocity_json_has_mode(self) -> None:
        from muse.cli.commands.velocity import _VelocityJson

        assert "mode" in get_type_hints(_VelocityJson)

    def test_velocity_json_has_modules(self) -> None:
        from muse.cli.commands.velocity import _VelocityJson

        assert "modules" in get_type_hints(_VelocityJson)

    def test_velocity_json_has_predictions(self) -> None:
        from muse.cli.commands.velocity import _VelocityJson

        assert "predictions" in get_type_hints(_VelocityJson)

    def test_velocity_json_has_ref(self) -> None:
        from muse.cli.commands.velocity import _VelocityJson

        assert "ref" in get_type_hints(_VelocityJson)

    def test_velocity_json_has_window_size(self) -> None:
        from muse.cli.commands.velocity import _VelocityJson

        assert "window_size" in get_type_hints(_VelocityJson)

    def test_velocity_json_has_truncated(self) -> None:
        from muse.cli.commands.velocity import _VelocityJson

        assert "truncated" in get_type_hints(_VelocityJson)


class TestAliasRegistration:
    """``register()`` wires ``-j`` / ``--json`` onto the ``velocity`` subparser."""

    def test_j_alias_registered(self) -> None:
        from muse.cli.commands.velocity import register
        import argparse

        p = argparse.ArgumentParser()
        sub = p.add_subparsers()
        register(sub)
        ns = p.parse_args(["velocity", "-j"])
        assert ns.json_out is True

    def test_json_long_form_works(self) -> None:
        from muse.cli.commands.velocity import register
        import argparse

        p = argparse.ArgumentParser()
        sub = p.add_subparsers()
        register(sub)
        ns = p.parse_args(["velocity", "--json"])
        assert ns.json_out is True

    def test_j_and_predict_parse_together(self) -> None:
        from muse.cli.commands.velocity import register
        import argparse

        p = argparse.ArgumentParser()
        sub = p.add_subparsers()
        register(sub)
        ns = p.parse_args(["velocity", "--predict", "5", "-j"])
        assert ns.json_out is True
        assert ns.predict == 5


class TestDocstrings:
    """Docstrings of ``register`` and ``run`` mention the JSON envelope."""

    def test_register_mentions_j_alias(self) -> None:
        from muse.cli.commands.velocity import register

        doc = register.__doc__ or ""
        assert "-j" in doc or "--json" in doc

    def test_run_mentions_schema_version(self) -> None:
        """Name is historical: the substring checked is ``schema``."""
        from muse.cli.commands.velocity import run

        assert "schema" in (run.__doc__ or "")

    def test_run_mentions_exit_code(self) -> None:
        from muse.cli.commands.velocity import run

        assert "exit_code" in (run.__doc__ or "")

    def test_run_mentions_duration_ms(self) -> None:
        from muse.cli.commands.velocity import run

        assert "duration_ms" in (run.__doc__ or "")

    def test_run_mentions_mode(self) -> None:
        from muse.cli.commands.velocity import run

        assert "mode" in (run.__doc__ or "")


# ──────────────────────────────────────────────────────────────────────────────
# End-to-end
# ──────────────────────────────────────────────────────────────────────────────


class TestEndToEnd:
    """Run ``code velocity`` through the CLI runner against ``vel_repo``."""

    def test_json_exits_zero(self, vel_repo: pathlib.Path) -> None:
        r = _invoke(vel_repo, ["code", "velocity", "--json"])
        assert r.exit_code == 0

    def test_json_emits_schema_version(self, vel_repo: pathlib.Path) -> None:
        """Name is historical: the key checked is ``schema``."""
        r = _invoke(vel_repo, ["code", "velocity", "--json"])
        assert r.exit_code == 0
        assert "schema" in json.loads(r.output)

    def test_json_emits_mode(self, vel_repo: pathlib.Path) -> None:
        r = _invoke(vel_repo, ["code", "velocity", "--json"])
        assert r.exit_code == 0
        assert json.loads(r.output)["mode"] == "velocity"

    def test_json_emits_exit_code(self, vel_repo: pathlib.Path) -> None:
        r = _invoke(vel_repo, ["code", "velocity", "--json"])
        assert r.exit_code == 0
        assert isinstance(json.loads(r.output)["exit_code"], int)

    def test_json_emits_duration_ms(self, vel_repo: pathlib.Path) -> None:
        r = _invoke(vel_repo, ["code", "velocity", "--json"])
        assert r.exit_code == 0
        d = json.loads(r.output)
        assert isinstance(d["duration_ms"], float)
        assert d["duration_ms"] >= 0.0

    def test_json_emits_modules_list(self, vel_repo: pathlib.Path) -> None:
        r = _invoke(vel_repo, ["code", "velocity", "--json"])
        assert r.exit_code == 0
        assert isinstance(json.loads(r.output)["modules"], list)

    def test_json_emits_predictions_list(self, vel_repo: pathlib.Path) -> None:
        r = _invoke(vel_repo, ["code", "velocity", "--json"])
        assert r.exit_code == 0
        assert isinstance(json.loads(r.output)["predictions"], list)

    def test_json_emits_window_size(self, vel_repo: pathlib.Path) -> None:
        r = _invoke(vel_repo, ["code", "velocity", "--window", "5", "--json"])
        assert r.exit_code == 0
        d = json.loads(r.output)
        assert d["window_size"] == 5

    def test_json_emits_ref(self, vel_repo: pathlib.Path) -> None:
        r = _invoke(vel_repo, ["code", "velocity", "--json"])
        assert r.exit_code == 0
        assert isinstance(json.loads(r.output)["ref"], str)

    def test_json_emits_truncated(self, vel_repo: pathlib.Path) -> None:
        r = _invoke(vel_repo, ["code", "velocity", "--json"])
        assert r.exit_code == 0
        assert isinstance(json.loads(r.output)["truncated"], bool)

    def test_j_alias_produces_json(self, vel_repo: pathlib.Path) -> None:
        r = _invoke(vel_repo, ["code", "velocity", "-j"])
        assert r.exit_code == 0
        d = json.loads(r.output)
        assert d["mode"] == "velocity"

    def test_predict_j_includes_predictions(self, vel_repo: pathlib.Path) -> None:
        r = _invoke(vel_repo, ["code", "velocity", "--predict", "3", "-j"])
        assert r.exit_code == 0
        d = json.loads(r.output)
        assert "predictions" in d
        # predictions may be empty if no current-window hits, but key must exist
        assert isinstance(d["predictions"], list)

    def test_human_output_still_works(self, vel_repo: pathlib.Path) -> None:
        r = _invoke(vel_repo, ["code", "velocity"])
        assert r.exit_code == 0
        assert "Symbol velocity" in r.output

    def test_schema_version_matches_muse_version(self, vel_repo: pathlib.Path) -> None:
        """Despite the name, only checks that ``schema`` is an int.

        No comparison against the package version is made, so nothing from
        ``muse`` is imported here.
        """
        r = _invoke(vel_repo, ["code", "velocity", "--json"])
        assert r.exit_code == 0
        assert isinstance(json.loads(r.output)["schema"], int)


# ──────────────────────────────────────────────────────────────────────────────
# Stress
# ──────────────────────────────────────────────────────────────────────────────


class TestStress:
    """Repeated and concurrent calls into the pure helpers."""

    def test_1000_module_of(self) -> None:
        from muse.cli.commands.velocity import _module_of

        for i in range(1_000):
            r = _module_of(f"src/pkg_{i}/file_{i}.py")
            assert r == f"src/pkg_{i}/"

    def test_500_bar_calls(self) -> None:
        from muse.cli.commands.velocity import _bar

        for i in range(500):
            r = _bar(i % 20, 20)
            assert isinstance(r, str)

    def test_print_table_200_modules(self, capsys: pytest.CaptureFixture[str]) -> None:
        from muse.cli.commands.velocity import _print_table

        ranked = [
            (f"mod_{i}/", _make_accumulator(
                current=_make_window(added=i, removed=0, modified=i),
            ))
            for i in range(200)
        ]
        _print_table(ranked, [], ref="dev", commits_analysed=400, window_size=20, truncated=False, since=None)
        out = capsys.readouterr().out
        assert "mod_0/" in out

    def test_concurrent_module_of(self) -> None:
        from muse.cli.commands.velocity import _module_of

        results: list[str] = []
        lock = threading.Lock()

        def _run() -> None:
            r = _module_of("src/core/store.py")
            with lock:
                results.append(r)

        threads = [threading.Thread(target=_run) for _ in range(50)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()
        assert all(r == "src/core/" for r in results)
        # A worker that raised would leave no entry, so the count check is
        # what catches failures inside the threads.
        assert len(results) == 50


# ──────────────────────────────────────────────────────────────────────────────
# Data integrity
# ──────────────────────────────────────────────────────────────────────────────


class TestDataIntegrity:
    """Types and values of the JSON envelope fields.

    Each test first asserts a zero exit code, matching ``TestEndToEnd``, so a
    failing command is reported as such and not as a JSON decode error.
    """

    def test_schema_version_is_str(self, vel_repo: pathlib.Path) -> None:
        """Name is historical: ``schema`` is asserted to be an int."""
        r = _invoke(vel_repo, ["code", "velocity", "--json"])
        assert r.exit_code == 0
        assert isinstance(json.loads(r.output)["schema"], int)

    def test_schema_version_nonempty(self, vel_repo: pathlib.Path) -> None:
        """``schema`` must be a positive number."""
        r = _invoke(vel_repo, ["code", "velocity", "--json"])
        assert r.exit_code == 0
        assert json.loads(r.output)["schema"] > 0

    def test_exit_code_is_int(self, vel_repo: pathlib.Path) -> None:
        r = _invoke(vel_repo, ["code", "velocity", "--json"])
        assert r.exit_code == 0
        assert isinstance(json.loads(r.output)["exit_code"], int)

    def test_duration_ms_is_float(self, vel_repo: pathlib.Path) -> None:
        r = _invoke(vel_repo, ["code", "velocity", "--json"])
        assert r.exit_code == 0
        d = json.loads(r.output)
        assert isinstance(d["duration_ms"], float)

    def test_duration_ms_non_negative(self, vel_repo: pathlib.Path) -> None:
        r = _invoke(vel_repo, ["code", "velocity", "--json"])
        assert r.exit_code == 0
        assert json.loads(r.output)["duration_ms"] >= 0.0

    def test_window_size_preserved(self, vel_repo: pathlib.Path) -> None:
        r = _invoke(vel_repo, ["code", "velocity", "--window", "7", "--json"])
        assert r.exit_code == 0
        assert json.loads(r.output)["window_size"] == 7
test_modules_entries_have_required_fields(self, vel_repo: pathlib.Path) -> None: r = _invoke(vel_repo, ["code", "velocity", "--json"]) d = json.loads(r.output) for m in d["modules"]: for field in ("module", "current", "prior", "acceleration", "stagnant_commits"): assert field in m, f"missing: {field}" def test_json_serialisable(self, vel_repo: pathlib.Path) -> None: r = _invoke(vel_repo, ["code", "velocity", "--json"]) json.loads(r.output) # must not raise def test_mode_is_velocity(self, vel_repo: pathlib.Path) -> None: r = _invoke(vel_repo, ["code", "velocity", "--json"]) assert json.loads(r.output)["mode"] == "velocity" def test_predictions_entries_have_required_fields(self, vel_repo: pathlib.Path) -> None: r = _invoke(vel_repo, ["code", "velocity", "--predict", "5", "--json"]) d = json.loads(r.output) for p in d["predictions"]: for field in ("address", "score"): assert field in p, f"missing: {field}" # ────────────────────────────────────────────────────────────────────────────── # Security # ────────────────────────────────────────────────────────────────────────────── class TestSecurity: def test_hostile_address_in_predictions_survives_json(self) -> None: from muse.cli.commands.velocity import _compute_predictions, _SymbolFreq malicious = '"; DROP TABLE commits; --' syms = {malicious: _make_sym_freq(frequency=5)} result = _compute_predictions(syms, {}, window_size=20, top_k=1) assert len(result) == 1 serialised = json.dumps(result[0]) assert json.loads(serialised)["address"] == malicious def test_xss_in_module_name_does_not_crash(self, capsys: pytest.CaptureFixture[str]) -> None: from muse.cli.commands.velocity import _print_table acc = _make_accumulator(current=_make_window(added=1)) _print_table([("/", acc)], [], ref="dev", commits_analysed=5, window_size=2, truncated=False, since=None) assert "