"""Tests for ``muse rev-list`` — raw commit ID stream with filters. Coverage tiers: - Unit: _walk_from, _parse_range, _parse_date, filter predicates - Integration: --count, --max-count, --first-parent, --no-merges, --merges, --author, --after, --before, --touches, --reverse, --json, A..B range syntax - End-to-end: full CLI invocation via CliRunner - Security: ref injection, --touches path traversal, --author regex injection - Stress: 500-commit chain with --count (flat memory), --touches on large repo """ from __future__ import annotations from collections.abc import Callable, Mapping import json import os import pathlib import pytest from muse.core.types import split_id from tests.cli_test_helper import CliRunner, InvokeResult runner = CliRunner() # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _invoke(repo: pathlib.Path, *args: str) -> InvokeResult: from muse.cli.app import main as cli saved = os.getcwd() try: os.chdir(repo) return runner.invoke(cli, ["rev-list", *args]) finally: os.chdir(saved) def _init(repo: pathlib.Path) -> None: from muse.cli.app import main as cli repo.mkdir(parents=True, exist_ok=True) saved = os.getcwd() try: os.chdir(repo) runner.invoke(cli, ["init"]) finally: os.chdir(saved) def _commit( repo: pathlib.Path, msg: str = "commit", filename: str | None = None, author: str | None = None, ) -> str: """Commit one file and return the commit_id.""" from muse.cli.app import main as cli fname = filename or f"f_{abs(hash(msg)) % 99999}.py" (repo / fname).write_text(f"# {msg}\n") saved = os.getcwd() try: os.chdir(repo) runner.invoke(cli, ["code", "add", fname]) extra = ["--author", author] if author else [] result = runner.invoke(cli, ["commit", "-m", msg, "--json", *extra]) data = json.loads(result.stdout) return data["commit_id"] finally: os.chdir(saved) def _fresh_repo(tmp: pathlib.Path, n: int = 3) -> tuple[pathlib.Path, list[str]]: """Create a repo with n commits, return (repo_path, [commit_ids oldest→newest]).""" repo = tmp / "repo" _init(repo) ids: list[str] = [] for i in range(n): cid = _commit(repo, f"commit {i}", filename=f"file_{i}.py") ids.append(cid) return repo, ids # --------------------------------------------------------------------------- # Unit — internal helpers # --------------------------------------------------------------------------- def test_walk_from_uses_deque() -> None: """_walk_from must use collections.deque; no variable.pop(0) calls in code.""" import inspect, ast from muse.cli.commands import rev_list as mod src = inspect.getsource(mod._walk_from) assert "deque" in src, "_walk_from must use collections.deque" # Parse the AST to check for list.pop(0) calls — this skips docstrings. tree = ast.parse(src) for node in ast.walk(tree): if ( isinstance(node, ast.Call) and isinstance(node.func, ast.Attribute) and node.func.attr == "pop" and len(node.args) == 1 and isinstance(node.args[0], ast.Constant) and node.args[0].value == 0 ): raise AssertionError("_walk_from must not call .pop(0) — use deque.popleft()") def test_parse_range_dotdot() -> None: """'A..B' must be parsed into (exclude='A', include='B').""" from muse.cli.commands.rev_list import _parse_range exc, inc = _parse_range("abc..def") assert exc == "abc" assert inc == "def" def test_parse_range_single() -> None: """A plain ref with no '..' must parse to (exclude=None, include=ref).""" from muse.cli.commands.rev_list import _parse_range exc, inc = _parse_range("HEAD") assert exc is None assert inc == "HEAD" def test_parse_date_valid() -> None: from muse.cli.commands.rev_list import _parse_date import datetime dt = _parse_date("2026-01-15") assert dt.year == 2026 assert dt.month == 1 assert dt.day == 15 assert dt.tzinfo == datetime.timezone.utc def test_parse_date_invalid_raises() -> None: from muse.cli.commands.rev_list import _parse_date with pytest.raises(ValueError, match="date"): _parse_date("not-a-date") # --------------------------------------------------------------------------- # Integration — basic output # --------------------------------------------------------------------------- def test_rev_list_emits_one_id_per_line(tmp_path: pathlib.Path) -> None: repo, ids = _fresh_repo(tmp_path, n=3) result = _invoke(repo, "HEAD") assert result.exit_code == 0 lines = [l for l in result.stdout.strip().splitlines() if l] assert len(lines) == 3 # Each line must be a sha256:-prefixed commit ID (7 prefix + 64 hex chars = 71) for line in lines: assert line.startswith("sha256:"), f"expected sha256: prefix, got {line!r}" _, hex_part = split_id(line) assert len(hex_part) == 64, f"expected 64-char hex after prefix, got {len(hex_part)}" int(hex_part, 16) def test_rev_list_newest_first(tmp_path: pathlib.Path) -> None: repo, ids = _fresh_repo(tmp_path, n=3) result = _invoke(repo, "HEAD") lines = [l for l in result.stdout.strip().splitlines() if l] # ids list is oldest→newest; rev-list default is newest→oldest assert lines[0] == ids[-1] assert lines[-1] == ids[0] def test_rev_list_count(tmp_path: pathlib.Path) -> None: repo, ids = _fresh_repo(tmp_path, n=5) result = _invoke(repo, "--count", "HEAD") assert result.exit_code == 0 assert result.stdout.strip() == "5" def test_rev_list_max_count(tmp_path: pathlib.Path) -> None: repo, ids = _fresh_repo(tmp_path, n=5) result = _invoke(repo, "--max-count", "2", "HEAD") lines = [l for l in result.stdout.strip().splitlines() if l] assert len(lines) == 2 assert lines[0] == ids[-1] # most recent def test_rev_list_reverse(tmp_path: pathlib.Path) -> None: repo, ids = _fresh_repo(tmp_path, n=3) result = _invoke(repo, "--reverse", "HEAD") lines = [l for l in result.stdout.strip().splitlines() if l] assert lines[0] == ids[0] # oldest first assert lines[-1] == ids[-1] # newest last def test_rev_list_json(tmp_path: pathlib.Path) -> None: repo, ids = _fresh_repo(tmp_path, n=3) result = _invoke(repo, "--json", "HEAD") assert result.exit_code == 0 data = json.loads(result.stdout) assert "commit_ids" in data assert len(data["commit_ids"]) == 3 assert data["commit_ids"][0] == ids[-1] # --------------------------------------------------------------------------- # Integration — filter flags # --------------------------------------------------------------------------- def _make_merge_commit(repo: pathlib.Path) -> None: """Create a divergent history and merge it, producing a real merge commit.""" from muse.cli.app import main as cli saved = os.getcwd() try: os.chdir(repo) runner.invoke(cli, ["checkout", "-b", "feat"]) _commit(repo, "feat work", filename="feat_only.py") runner.invoke(cli, ["checkout", "main"]) # Commit on main so histories diverge → true merge commit (not FF) _commit(repo, "main diverge", filename="main_only.py") runner.invoke(cli, ["merge", "feat"]) finally: os.chdir(saved) def test_rev_list_no_merges(tmp_path: pathlib.Path) -> None: """--no-merges excludes commits that have two parents.""" repo, ids = _fresh_repo(tmp_path, n=2) _make_merge_commit(repo) result_all = _invoke(repo, "--count", "HEAD") result_no_merges = _invoke(repo, "--no-merges", "--count", "HEAD") total = int(result_all.stdout.strip()) no_merge_count = int(result_no_merges.stdout.strip()) assert no_merge_count < total def test_rev_list_merges_only(tmp_path: pathlib.Path) -> None: """--merges emits only merge commits.""" repo, ids = _fresh_repo(tmp_path, n=2) _make_merge_commit(repo) result = _invoke(repo, "--merges", "--count", "HEAD") assert int(result.stdout.strip()) >= 1 def test_rev_list_author_filter(tmp_path: pathlib.Path) -> None: repo = tmp_path / "repo" _init(repo) _commit(repo, "alice commit", author="Alice") _commit(repo, "bob commit", author="Bob") _commit(repo, "alice again", author="Alice") result = _invoke(repo, "--author", "Alice", "--count", "HEAD") assert result.exit_code == 0 assert result.stdout.strip() == "2" def test_rev_list_after_filter(tmp_path: pathlib.Path) -> None: """--after excludes commits before the date.""" repo, ids = _fresh_repo(tmp_path, n=3) # All commits are in the future (2026) so --after 2020-01-01 keeps all result_all = _invoke(repo, "--count", "HEAD") result_after = _invoke(repo, "--after", "2020-01-01", "--count", "HEAD") assert result_all.stdout.strip() == result_after.stdout.strip() # --after 2099-01-01 should keep nothing result_future = _invoke(repo, "--after", "2099-01-01", "--count", "HEAD") assert result_future.stdout.strip() == "0" def test_rev_list_before_filter(tmp_path: pathlib.Path) -> None: """--before excludes commits after the date.""" repo, ids = _fresh_repo(tmp_path, n=3) result_before = _invoke(repo, "--before", "2099-01-01", "--count", "HEAD") assert int(result_before.stdout.strip()) == 3 result_past = _invoke(repo, "--before", "2020-01-01", "--count", "HEAD") assert result_past.stdout.strip() == "0" def test_rev_list_touches_filter(tmp_path: pathlib.Path) -> None: """--touches only emits commits that changed the specified path.""" repo = tmp_path / "repo" _init(repo) _commit(repo, "add alpha", filename="alpha.py") _commit(repo, "add beta", filename="beta.py") _commit(repo, "modify alpha", filename="alpha.py") result = _invoke(repo, "--touches", "alpha.py", "--count", "HEAD") assert result.exit_code == 0 assert result.stdout.strip() == "2" def test_rev_list_touches_directory_prefix(tmp_path: pathlib.Path) -> None: """--touches src/ matches all files under src/.""" repo = tmp_path / "repo" _init(repo) (repo / "src").mkdir() _commit(repo, "src file", filename="src/main.py") _commit(repo, "root file", filename="root.py") _commit(repo, "src again", filename="src/utils.py") result = _invoke(repo, "--touches", "src/", "--count", "HEAD") assert result.exit_code == 0 assert result.stdout.strip() == "2" # --------------------------------------------------------------------------- # Integration — range syntax # --------------------------------------------------------------------------- def test_rev_list_range_syntax(tmp_path: pathlib.Path) -> None: """A..B emits commits reachable from B but not from A.""" from muse.cli.app import main as cli repo, base_ids = _fresh_repo(tmp_path, n=2) saved = os.getcwd() try: os.chdir(repo) runner.invoke(cli, ["checkout", "-b", "feat"]) finally: os.chdir(saved) feat_id1 = _commit(repo, "feat 1", filename="feat1.py") feat_id2 = _commit(repo, "feat 2", filename="feat2.py") result = _invoke(repo, "main..feat") lines = [l for l in result.stdout.strip().splitlines() if l] assert len(lines) == 2 assert feat_id2 in lines assert feat_id1 in lines # Base commits must NOT appear for base_id in base_ids: assert base_id not in lines def test_rev_list_range_count(tmp_path: pathlib.Path) -> None: """--count with range counts only the range, not the full history.""" from muse.cli.app import main as cli repo, _ = _fresh_repo(tmp_path, n=3) saved = os.getcwd() try: os.chdir(repo) runner.invoke(cli, ["checkout", "-b", "feat"]) finally: os.chdir(saved) _commit(repo, "feat A", filename="fa.py") _commit(repo, "feat B", filename="fb.py") result = _invoke(repo, "--count", "main..feat") assert result.stdout.strip() == "2" # --------------------------------------------------------------------------- # Integration — first-parent # --------------------------------------------------------------------------- def test_rev_list_first_parent(tmp_path: pathlib.Path) -> None: """--first-parent only follows the first-parent chain.""" from muse.cli.app import main as cli repo, base_ids = _fresh_repo(tmp_path, n=2) saved = os.getcwd() try: os.chdir(repo) runner.invoke(cli, ["checkout", "-b", "feat"]) _commit(repo, "feat work", filename="feat.py") runner.invoke(cli, ["checkout", "main"]) runner.invoke(cli, ["merge", "feat"]) finally: os.chdir(saved) result_all = _invoke(repo, "--count", "HEAD") result_fp = _invoke(repo, "--first-parent", "--count", "HEAD") assert int(result_fp.stdout.strip()) <= int(result_all.stdout.strip()) # --------------------------------------------------------------------------- # Security # --------------------------------------------------------------------------- def test_rev_list_ref_not_found_exits_nonzero(tmp_path: pathlib.Path) -> None: repo, _ = _fresh_repo(tmp_path, n=1) result = _invoke(repo, "nonexistent-branch") assert result.exit_code != 0 def test_rev_list_author_regex_special_chars_handled(tmp_path: pathlib.Path) -> None: """Malformed regex in --author must produce a clean error, not a crash.""" repo, _ = _fresh_repo(tmp_path, n=1) result = _invoke(repo, "--author", "[invalid-regex", "--count", "HEAD") # Should either work (literal match fallback) or exit with a clean error code assert result.exit_code in (0, 1, 2) def test_rev_list_touches_path_traversal_rejected(tmp_path: pathlib.Path) -> None: """--touches with path traversal sequences must be rejected.""" repo, _ = _fresh_repo(tmp_path, n=1) result = _invoke(repo, "--touches", "../../../etc/passwd", "--count", "HEAD") assert result.exit_code != 0 # --------------------------------------------------------------------------- # Stress # --------------------------------------------------------------------------- def test_rev_list_count_flat_memory_large_chain(tmp_path: pathlib.Path) -> None: """--count on a 500-commit chain must complete without building a list.""" import tracemalloc repo = tmp_path / "repo" _init(repo) for i in range(500): (repo / f"f{i}.py").write_text(f"# {i}\n") saved = os.getcwd() try: os.chdir(repo) runner.invoke(cli_main(), ["code", "add", f"f{i}.py"]) runner.invoke(cli_main(), ["commit", "-m", f"c{i}"]) finally: os.chdir(saved) tracemalloc.start() result = _invoke(repo, "--count", "HEAD") _, peak = tracemalloc.get_traced_memory() tracemalloc.stop() assert result.stdout.strip() == "500" # Peak memory for --count should stay well under 50 MB assert peak < 50 * 1024 * 1024, f"Peak memory {peak // 1024} KB exceeds limit" def cli_main() -> "Callable[..., None]": from muse.cli.app import main return main def test_rev_list_stress_touches_large_repo(tmp_path: pathlib.Path) -> None: """--touches on a 100-file, 50-commit repo completes without error.""" repo = tmp_path / "repo" _init(repo) for i in range(50): fname = f"file_{i % 10}.py" # 10 files, cycling (repo / fname).write_text(f"# iteration {i}\n") saved = os.getcwd() try: os.chdir(repo) runner.invoke(cli_main(), ["code", "add", fname]) runner.invoke(cli_main(), ["commit", "-m", f"c{i}"]) finally: os.chdir(saved) result = _invoke(repo, "--touches", "file_0.py", "--count", "HEAD") assert result.exit_code == 0 count = int(result.stdout.strip()) assert count >= 5 # file_0 touched at commits 0, 10, 20, 30, 40 # --------------------------------------------------------------------------- # JSON schema — duration_ms + exit_code on all output paths # --------------------------------------------------------------------------- class TestJsonSchema: """--json output must carry duration_ms and exit_code on every path.""" def test_success_has_duration_ms(self, tmp_path: pathlib.Path) -> None: repo, _ = _fresh_repo(tmp_path, n=2) result = _invoke(repo, "--json", "HEAD") assert result.exit_code == 0 d = json.loads(result.stdout) assert "duration_ms" in d, "duration_ms missing from --json output" assert isinstance(d["duration_ms"], (int, float)) assert d["duration_ms"] >= 0 def test_success_has_exit_code_zero(self, tmp_path: pathlib.Path) -> None: repo, _ = _fresh_repo(tmp_path, n=2) result = _invoke(repo, "--json", "HEAD") d = json.loads(result.stdout) assert "exit_code" in d, "exit_code missing from --json output" assert d["exit_code"] == 0 def test_empty_result_json_has_schema(self, tmp_path: pathlib.Path) -> None: """When --after filters everything out, --json still emits a full envelope.""" repo, _ = _fresh_repo(tmp_path, n=2) result = _invoke(repo, "--json", "--after", "2099-01-01", "HEAD") d = json.loads(result.stdout) assert d["commit_ids"] == [] assert "duration_ms" in d assert "exit_code" in d def test_reverse_json_has_schema(self, tmp_path: pathlib.Path) -> None: repo, ids = _fresh_repo(tmp_path, n=3) result = _invoke(repo, "--json", "--reverse", "HEAD") d = json.loads(result.stdout) assert "duration_ms" in d assert d["exit_code"] == 0 assert d["commit_ids"][0] == ids[0] # oldest-first # --------------------------------------------------------------------------- # --count --json — structured output instead of bare integer # --------------------------------------------------------------------------- class TestCountJson: """--count --json must emit a JSON dict, not a bare integer.""" def test_count_json_is_valid_json(self, tmp_path: pathlib.Path) -> None: repo, _ = _fresh_repo(tmp_path, n=3) result = _invoke(repo, "--count", "--json", "HEAD") assert result.exit_code == 0 # Must parse as JSON — not a bare integer d = json.loads(result.stdout) assert isinstance(d, dict) def test_count_json_has_count_key(self, tmp_path: pathlib.Path) -> None: repo, _ = _fresh_repo(tmp_path, n=4) result = _invoke(repo, "--count", "--json", "HEAD") d = json.loads(result.stdout) assert "count" in d assert d["count"] == 4 def test_count_json_has_duration_ms(self, tmp_path: pathlib.Path) -> None: repo, _ = _fresh_repo(tmp_path, n=2) result = _invoke(repo, "--count", "--json", "HEAD") d = json.loads(result.stdout) assert "duration_ms" in d assert isinstance(d["duration_ms"], (int, float)) def test_count_json_has_exit_code_zero(self, tmp_path: pathlib.Path) -> None: repo, _ = _fresh_repo(tmp_path, n=2) result = _invoke(repo, "--count", "--json", "HEAD") d = json.loads(result.stdout) assert d["exit_code"] == 0 def test_count_without_json_still_plain_int(self, tmp_path: pathlib.Path) -> None: """--count alone (no --json) must emit a bare integer, not a JSON dict.""" repo, _ = _fresh_repo(tmp_path, n=3) result = _invoke(repo, "--count", "HEAD") assert result.exit_code == 0 assert result.stdout.strip() == "3" # Confirm it is NOT a structured JSON dict (bare ints are valid JSON but # agents relying on --count should not get a dict without --json). parsed = json.loads(result.stdout) assert not isinstance(parsed, dict), "plain --count must not emit a JSON dict" def test_count_json_with_filter(self, tmp_path: pathlib.Path) -> None: """--count --json works correctly with a filter applied.""" repo = tmp_path / "repo" _init(repo) _commit(repo, "alice A", author="Alice") _commit(repo, "bob B", author="Bob") _commit(repo, "alice C", author="Alice") result = _invoke(repo, "--count", "--json", "--author", "Alice", "HEAD") d = json.loads(result.stdout) assert d["count"] == 2 # --------------------------------------------------------------------------- # Error JSON — all error exit paths emit structured JSON when --json is set # --------------------------------------------------------------------------- class TestErrorJson: """Every error path must emit a parseable JSON envelope when --json is passed.""" def _assert_error_json(self, result: InvokeResult) -> Mapping[str, object]: assert result.exit_code != 0, "expected non-zero exit on error" d = json.loads(result.stdout) assert "error" in d, f"error key missing: {d}" assert "exit_code" in d assert d["exit_code"] != 0 assert "duration_ms" in d return d def test_bad_ref_json(self, tmp_path: pathlib.Path) -> None: repo, _ = _fresh_repo(tmp_path, n=1) result = _invoke(repo, "--json", "nonexistent-ref") self._assert_error_json(result) def test_mutual_exclusion_json(self, tmp_path: pathlib.Path) -> None: repo, _ = _fresh_repo(tmp_path, n=1) result = _invoke(repo, "--json", "--no-merges", "--merges", "HEAD") self._assert_error_json(result) def test_touches_traversal_json(self, tmp_path: pathlib.Path) -> None: repo, _ = _fresh_repo(tmp_path, n=1) result = _invoke(repo, "--json", "--touches", "../etc/passwd", "HEAD") self._assert_error_json(result) def test_bad_after_date_json(self, tmp_path: pathlib.Path) -> None: repo, _ = _fresh_repo(tmp_path, n=1) result = _invoke(repo, "--json", "--after", "not-a-date", "HEAD") self._assert_error_json(result) def test_bad_before_date_json(self, tmp_path: pathlib.Path) -> None: repo, _ = _fresh_repo(tmp_path, n=1) result = _invoke(repo, "--json", "--before", "not-a-date", "HEAD") self._assert_error_json(result) def test_bad_exclude_ref_json(self, tmp_path: pathlib.Path) -> None: """A..B where A is invalid must also emit structured JSON error.""" repo, _ = _fresh_repo(tmp_path, n=1) result = _invoke(repo, "--json", "nonexistent..HEAD") self._assert_error_json(result) def test_error_json_has_message(self, tmp_path: pathlib.Path) -> None: repo, _ = _fresh_repo(tmp_path, n=1) result = _invoke(repo, "--json", "nonexistent-ref") d = json.loads(result.stdout) assert "message" in d assert isinstance(d["message"], str) assert len(d["message"]) > 0 # --------------------------------------------------------------------------- # --after predicate precedence — latent bug guard # --------------------------------------------------------------------------- class TestAfterPredicate: """Guard against regression in --after predicate operator precedence.""" def test_after_far_future_excludes_all(self, tmp_path: pathlib.Path) -> None: """--after 2099-01-01 must match zero commits regardless of tzinfo state.""" repo, _ = _fresh_repo(tmp_path, n=3) result = _invoke(repo, "--after", "2099-01-01", "--count", "HEAD") assert result.exit_code == 0 assert result.stdout.strip() == "0" def test_after_far_past_keeps_all(self, tmp_path: pathlib.Path) -> None: repo, _ = _fresh_repo(tmp_path, n=3) result = _invoke(repo, "--after", "2000-01-01", "--count", "HEAD") assert result.exit_code == 0 assert result.stdout.strip() == "3" def test_after_json_far_future_zero(self, tmp_path: pathlib.Path) -> None: """--after --json with no matches emits commit_ids:[] not a crash.""" repo, _ = _fresh_repo(tmp_path, n=2) result = _invoke(repo, "--json", "--after", "2099-01-01", "HEAD") assert result.exit_code == 0 d = json.loads(result.stdout) assert d["commit_ids"] == [] class TestRegisterFlags: def test_default_json_out_is_false(self) -> None: import argparse from muse.cli.commands.rev_list import register p = argparse.ArgumentParser() subs = p.add_subparsers() register(subs) args = p.parse_args(["rev-list"]) assert args.json_out is False def test_json_flag_sets_json_out(self) -> None: import argparse from muse.cli.commands.rev_list import register p = argparse.ArgumentParser() subs = p.add_subparsers() register(subs) args = p.parse_args(["rev-list", "--json"]) assert args.json_out is True def test_j_shorthand_sets_json_out(self) -> None: import argparse from muse.cli.commands.rev_list import register p = argparse.ArgumentParser() subs = p.add_subparsers() register(subs) args = p.parse_args(["rev-list", "-j"]) assert args.json_out is True