"""Tests for muse name-rev. Coverage tiers -------------- Unit — _build_name_map (flat, hierarchical, symlink skip, bad ref ID, branch_pattern filter, max_walk ceiling), _resolve_prefix (exact, short prefix, ambiguous, no-match), _NameRevEntry schema Integration — tip commit, parent chain, multiple IDs, undefined commit, text output, --name-only, --undefined string, --branches filter, --stdin, short-prefix resolution, --max-walk, --json shorthand, ambiguous prefix Security — ANSI in branch name sanitized, non-hex input rejected, error output to stderr (format, no-args, max-walk, non-hex), no traceback, symlinks skipped, --undefined value sanitized Stress — 50-commit chain, 10-branch repo, 200 sequential calls, stdin with 50 commit IDs """ from __future__ import annotations type _CommitInfoMap = dict[str, tuple[str, int]] import argparse import datetime import io import json import pathlib from tests.cli_test_helper import CliRunner, InvokeResult from muse.cli.commands.name_rev import ( _NameRevEntry, _build_name_map, _resolve_prefix, ) from muse.core.ids import hash_commit, hash_snapshot from muse.core.commits import ( CommitRecord, write_commit, ) from muse.core.snapshots import ( SnapshotRecord, write_snapshot, ) from muse.core.types import Manifest, long_id from muse.core.paths import muse_dir, heads_dir, ref_path cli = None # argparse-based CLI; CliRunner ignores this arg runner = CliRunner() # --------------------------------------------------------------------------- # Register flags # --------------------------------------------------------------------------- class TestRegisterFlags: def _parse(self, *args: str) -> argparse.Namespace: from muse.cli.commands.name_rev import register p = argparse.ArgumentParser() sub = p.add_subparsers() register(sub) return p.parse_args(["name-rev", *args]) def test_default_json_out_is_false(self) -> None: ns = self._parse("abc123") assert ns.json_out is False def test_json_flag_sets_json_out(self) -> None: ns = 
self._parse("--json", "abc123") assert ns.json_out is True def test_j_shorthand_sets_json_out(self) -> None: ns = self._parse("-j", "abc123") assert ns.json_out is True # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _init_repo(path: pathlib.Path) -> pathlib.Path: muse = muse_dir(path) (muse / "commits").mkdir(parents=True) (muse / "snapshots").mkdir(parents=True) (muse / "objects").mkdir(parents=True) (muse / "refs" / "heads").mkdir(parents=True) (muse / "HEAD").write_text("ref: refs/heads/main\n", encoding="utf-8") (muse / "repo.json").write_text( json.dumps({"repo_id": "test-repo", "domain": "midi"}), encoding="utf-8" ) return path def _env(repo: pathlib.Path) -> Manifest: return {"MUSE_REPO_ROOT": str(repo)} def _snap(repo: pathlib.Path, tag: str) -> str: sid = hash_snapshot({}) write_snapshot( repo, SnapshotRecord( snapshot_id=sid, manifest={}, created_at=datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc), ), ) return sid def _commit( repo: pathlib.Path, tag: str, branch: str = "main", parent: str | None = None, ) -> str: sid = _snap(repo, tag) committed_at = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc) parent_ids: list[str] = [parent] if parent else [] cid = hash_commit( parent_ids=parent_ids, snapshot_id=sid, message=tag, committed_at_iso=committed_at.isoformat(), author="tester", ) write_commit( repo, CommitRecord( commit_id=cid, branch=branch, snapshot_id=sid, message=tag, committed_at=committed_at, author="tester", parent_commit_id=parent, parent2_commit_id=None, ), ) branch_ref = ref_path(repo, branch) branch_ref.parent.mkdir(parents=True, exist_ok=True) branch_ref.write_text(cid, encoding="utf-8") return cid # Two messages whose content-addressed commit IDs share the 4-char prefix "c1d3". 
# Verified: hash_commit(parent_ids=[], snapshot_id=hash_snapshot({}), message=msg,
#           committed_at_iso="2026-01-01T00:00:00+00:00", author="tester")
_AMBIG_MSG_1 = "commit-search-143"  # -> c1d369...
_AMBIG_MSG_2 = "commit-search-346"  # -> c1d39f...
_AMBIG_PREFIX = "c1d3"


def _nr(repo: pathlib.Path, *args: str, stdin: str | None = None) -> InvokeResult:
    """Invoke name-rev in JSON mode (--json always included)."""
    argv = ["name-rev", "--json", *args]
    return runner.invoke(cli, argv, env=_env(repo), input=stdin)


def _nr_text(repo: pathlib.Path, *args: str, stdin: str | None = None) -> InvokeResult:
    """Invoke name-rev in text mode (no --json flag)."""
    argv = ["name-rev", *args]
    return runner.invoke(cli, argv, env=_env(repo), input=stdin)


# ---------------------------------------------------------------------------
# Unit — _NameRevEntry schema
# ---------------------------------------------------------------------------


class TestNameRevEntrySchema:
    """Checks that ``_NameRevEntry`` declares the expected annotated fields."""

    def test_required_fields(self) -> None:
        """Every documented field appears in the entry's annotations."""
        declared = _NameRevEntry.__annotations__
        expected = (
            "commit_id",
            "input",
            "name",
            "branch",
            "distance",
            "undefined",
            "ambiguous",
        )
        for field_name in expected:
            assert field_name in declared

    def test_input_field_present(self) -> None:
        """New 'input' field tracks the original caller-supplied value."""
        assert "input" in _NameRevEntry.__annotations__

    def test_ambiguous_field_present(self) -> None:
        """The 'ambiguous' field is declared on the entry."""
        assert "ambiguous" in _NameRevEntry.__annotations__


# ---------------------------------------------------------------------------
# Unit — _resolve_prefix
# ---------------------------------------------------------------------------


class TestResolvePrefix:
    """Unit tests for ``_resolve_prefix`` against a hand-built name map."""

    def _map(self) -> _CommitInfoMap:
        """Return a three-entry map; two keys share the hex prefix ``abcd``."""
        # Keys must be sha256:-prefixed — that is how the BFS name_map stores them.
        return {
            long_id(f"abcd1234{'0' * 56}"): ("main", 0),
            long_id(f"abcd5678{'0' * 56}"): ("dev", 1),
            long_id(f"ffff0000{'0' * 56}"): ("feat", 2),
        }

    def test_exact_match(self) -> None:
        """A full key resolves to itself and is not ambiguous."""
        name_map = self._map()
        key = long_id(f"abcd1234{'0' * 56}")
        full_id, ambiguous = _resolve_prefix(key, name_map)
        assert full_id == key
        assert not ambiguous

    def test_short_prefix_unique(self) -> None:
        """A bare hex prefix matching one key resolves to that key."""
        name_map = self._map()
        # Bare hex prefix — _resolve_prefix normalises to sha256: internally.
        full_id, ambiguous = _resolve_prefix("abcd1234", name_map)
        assert full_id == long_id(f"abcd1234{'0' * 56}")
        assert not ambiguous

    def test_short_prefix_ambiguous(self) -> None:
        """A prefix matching two keys yields ``(None, True)``."""
        name_map = self._map()
        # "abcd" matches both "sha256:abcd1234..." and "sha256:abcd5678..."
        full_id, ambiguous = _resolve_prefix("abcd", name_map)
        assert full_id is None
        assert ambiguous

    def test_no_match_returns_none(self) -> None:
        """A prefix matching nothing yields ``None`` and is not ambiguous."""
        name_map = self._map()
        full_id, ambiguous = _resolve_prefix("deadbeef", name_map)
        assert full_id is None
        assert not ambiguous

    def test_empty_map(self) -> None:
        """Resolving against an empty map yields ``None`` and is not ambiguous."""
        full_id, ambiguous = _resolve_prefix("abc123", {})
        assert full_id is None
        assert not ambiguous


# ---------------------------------------------------------------------------
# Unit — _build_name_map
# ---------------------------------------------------------------------------


class TestBuildNameMap:
    """Unit tests for ``_build_name_map`` on small on-disk repositories."""

    def test_empty_heads_dir(self, tmp_path: pathlib.Path) -> None:
        """A repo with no branch refs produces an empty map."""
        _init_repo(tmp_path)
        assert _build_name_map(tmp_path, set()) == {}

    def test_tip_commit_distance_zero(self, tmp_path: pathlib.Path) -> None:
        """A branch tip maps to ``(branch, 0)``."""
        _init_repo(tmp_path)
        cid = _commit(tmp_path, "c1")
        name_map = _build_name_map(tmp_path, {cid})
        assert cid in name_map
        assert name_map[cid] == ("main", 0)

    def test_parent_commit_distance_one(self, tmp_path: pathlib.Path) -> None:
        """The tip's parent is recorded at distance 1."""
        _init_repo(tmp_path)
        c1 = _commit(tmp_path, "c1")
        c2 = _commit(tmp_path, "c2", parent=c1)
        name_map = _build_name_map(tmp_path, {c1, c2})
        assert name_map[c1][1] == 1  # distance from tip (c2)
        assert name_map[c2][1] == 0  # tip itself

    def test_hierarchical_branch_discovered(self, tmp_path: pathlib.Path) -> None:
        """Branches stored in nested ref directories are found by full name."""
        _init_repo(tmp_path)
        _commit(tmp_path, "main-c", "main")
        cid = _commit(tmp_path, "feat-c", "feat/my-thing")
        name_map = _build_name_map(tmp_path, {cid})
        assert cid in name_map
        assert name_map[cid][0] == "feat/my-thing"

    def test_symlink_ref_skipped(self, tmp_path: pathlib.Path) -> None:
        """A symlinked ref does not name the commit; the real ref does."""
        _init_repo(tmp_path)
        cid = _commit(tmp_path, "c", "main")
        heads = heads_dir(tmp_path)
        (heads / "sym").symlink_to(heads / "main")
        name_map = _build_name_map(tmp_path, {cid})
        # The commit should be reachable via main, not via the symlink.
        assert cid in name_map
        assert name_map[cid][0] == "main"  # not "sym"

    def test_invalid_ref_id_skipped(self, tmp_path: pathlib.Path) -> None:
        """A ref file with malformed content does not stop valid refs being used."""
        _init_repo(tmp_path)
        cid = _commit(tmp_path, "c", "main")
        bad_ref = heads_dir(tmp_path) / "bad"
        bad_ref.write_text("not-a-sha\n", encoding="utf-8")
        name_map = _build_name_map(tmp_path, {cid})
        assert cid in name_map  # main still seeded

    def test_branch_pattern_filter(self, tmp_path: pathlib.Path) -> None:
        """``branch_pattern`` limits which branch tips are seeded."""
        _init_repo(tmp_path)
        c_main = _commit(tmp_path, "c-main", "main")
        c_feat = _commit(tmp_path, "c-feat", "feat/x")
        name_map = _build_name_map(tmp_path, {c_main, c_feat}, branch_pattern="main")
        # feat/x commit should be unreachable (only main seeded)
        assert c_main in name_map
        assert c_feat not in name_map

    def test_max_walk_ceiling(self, tmp_path: pathlib.Path) -> None:
        """BFS stops after max_walk steps; deep commits may remain unmapped."""
        _init_repo(tmp_path)
        root: str | None = None
        tip: str | None = None
        for i in range(10):
            tip = _commit(tmp_path, f"c{i}", parent=tip)
            if root is None:
                root = tip
        assert root is not None
        # ``root`` is the first commit of a 10-commit chain (nine parents below
        # the tip), so a walk capped at max_walk=1 is expected to stop long
        # before reaching it.
        name_map = _build_name_map(tmp_path, {root}, max_walk=1)
        assert root not in name_map  # too deep to reach

    def test_early_exit_when_all_targets_found(self, tmp_path: pathlib.Path) -> None:
        """When both targets are at tips, BFS should stop without full traversal."""
        _init_repo(tmp_path)
        c1 = _commit(tmp_path, "c1", "main")
        c2 = _commit(tmp_path, "c2", "dev")
        name_map = _build_name_map(tmp_path, {c1, c2})
        assert c1 in name_map
        assert c2 in name_map


# ---------------------------------------------------------------------------
# Integration — basic resolution
# ---------------------------------------------------------------------------


class TestResolution:
    """End-to-end JSON-mode resolution of full commit IDs."""

    def test_tip_commit_distance_zero(self, tmp_path: pathlib.Path) -> None:
        """A branch tip is named after its branch with distance 0."""
        _init_repo(tmp_path)
        cid = _commit(tmp_path, "c1")
        result = _nr(tmp_path, cid)
        assert result.exit_code == 0
        entry = json.loads(result.output)["results"][0]
        assert entry["commit_id"] == cid
        assert entry["distance"] == 0
        assert entry["undefined"] is False
        assert entry["ambiguous"] is False
        assert entry["name"] == "main"

    def test_parent_commit_named_tilde_one(self, tmp_path: pathlib.Path) -> None:
        """The tip's parent is named ``main~1``."""
        _init_repo(tmp_path)
        c1 = _commit(tmp_path, "c1")
        _commit(tmp_path, "c2", parent=c1)
        result = _nr(tmp_path, c1)
        entry = json.loads(result.output)["results"][0]
        assert entry["distance"] == 1
        assert entry["name"] == "main~1"

    def test_grandparent_named_tilde_two(self, tmp_path: pathlib.Path) -> None:
        """The tip's grandparent has distance 2 and ``~2`` in its name."""
        _init_repo(tmp_path)
        c1 = _commit(tmp_path, "grandparent")
        c2 = _commit(tmp_path, "parent", parent=c1)
        _commit(tmp_path, "tip", parent=c2)
        result = _nr(tmp_path, c1)
        entry = json.loads(result.output)["results"][0]
        assert entry["distance"] == 2
        assert "~2" in entry["name"]

    def test_undefined_commit(self, tmp_path: pathlib.Path) -> None:
        """An ID not present in the repo is reported as undefined."""
        _init_repo(tmp_path)
        _commit(tmp_path, "c1")
        fake_id = "a" * 64
        result = _nr(tmp_path, fake_id)
        entry = json.loads(result.output)["results"][0]
        assert entry["undefined"] is True
        assert entry["name"] is None
        assert entry["commit_id"] is None

    def test_multiple_commit_ids(self, tmp_path: pathlib.Path) -> None:
        """Two positional IDs yield two result entries."""
        _init_repo(tmp_path)
        c1 = _commit(tmp_path, "c1")
        c2 = _commit(tmp_path, "c2", parent=c1)
        result = _nr(tmp_path, c1, c2)
        payload = json.loads(result.output)
        assert len(payload["results"]) == 2

    def test_input_field_preserves_original(self, tmp_path: pathlib.Path) -> None:
        """``input`` echoes what the caller typed; ``commit_id`` is the full ID."""
        _init_repo(tmp_path)
        cid = _commit(tmp_path, "c1")
        # NOTE(review): if commit IDs carry the "sha256:" prefix (as the prefix
        # tests below assume), this slice holds only three hex digits — confirm
        # that is the intended input.
        short = cid[:10]
        result = _nr(tmp_path, short)
        entry = json.loads(result.output)["results"][0]
        assert entry["input"] == short
        assert entry["commit_id"] == cid  # resolved to full


# ---------------------------------------------------------------------------
# Integration — short prefix resolution
# ---------------------------------------------------------------------------


class TestPrefixResolution:
    """End-to-end resolution of abbreviated commit IDs."""

    def test_short_prefix_resolves(self, tmp_path: pathlib.Path) -> None:
        """A leading slice of the only commit's ID resolves to that commit."""
        _init_repo(tmp_path)
        cid = _commit(tmp_path, "c1")
        # NOTE(review): if commit IDs carry the "sha256:" prefix, ``cid[:8]``
        # contains a single hex digit — confirm that is intended.
        result = _nr(tmp_path, cid[:8])
        assert result.exit_code == 0
        entry = json.loads(result.output)["results"][0]
        assert entry["commit_id"] == cid
        assert entry["undefined"] is False

    def test_four_char_prefix_resolves(self, tmp_path: pathlib.Path) -> None:
        """A bare 4-char hex prefix is accepted and echoed back in ``input``."""
        _init_repo(tmp_path)
        cid = _commit(tmp_path, "c1")
        # Extract 4 hex chars from the sha256:-prefixed ID (skip the "sha256:" prefix).
        skip = len("sha256:")
        hex_prefix = cid[skip:skip + 4]
        result = _nr(tmp_path, hex_prefix)
        assert result.exit_code == 0
        entry = json.loads(result.output)["results"][0]
        # Might be undefined if prefix is too ambiguous in the BFS map
        # but the input field is always preserved
        assert entry["input"] == hex_prefix

    def test_ambiguous_prefix_marked(self, tmp_path: pathlib.Path) -> None:
        """Two commits sharing a 4-char hex prefix → ambiguous result, not undefined."""
        _init_repo(tmp_path)
        # _AMBIG_MSG_1 and _AMBIG_MSG_2 produce commit IDs whose hex portions
        # share _AMBIG_PREFIX.
        cid1 = _commit(tmp_path, _AMBIG_MSG_1, "main")
        cid2 = _commit(tmp_path, _AMBIG_MSG_2, "dev")
        skip = len("sha256:")
        hex1 = cid1[skip:skip + 4]
        hex2 = cid2[skip:skip + 4]
        assert hex1 == hex2 == _AMBIG_PREFIX, "pre-computed prefix mismatch"
        result = _nr(tmp_path, _AMBIG_PREFIX)
        assert result.exit_code == 0
        entry = json.loads(result.output)["results"][0]
        assert entry["ambiguous"] is True

    def test_non_hex_input_rejected(self, tmp_path: pathlib.Path) -> None:
        """Non-hex input fails with a JSON error payload and empty stderr."""
        # ``_nr`` always passes --json, so the error is expected as JSON in
        # ``output`` while stderr stays empty.
        _init_repo(tmp_path)
        result = _nr(tmp_path, "not-hex-at-all!")
        assert result.exit_code != 0
        assert not result.stderr.strip()
        assert json.loads(result.output)["status"] == "error"


# ---------------------------------------------------------------------------
# Integration — --branches filter
# ---------------------------------------------------------------------------


class TestBranchesFilter:
    """End-to-end behaviour of the ``--branches`` seed filter."""

    def test_branches_filter_restricts_bfs(self, tmp_path: pathlib.Path) -> None:
        """A commit only on ``dev`` is undefined when only ``main`` is seeded."""
        _init_repo(tmp_path)
        _commit(tmp_path, "c-main", "main")
        c_dev = _commit(tmp_path, "c-dev", "dev")
        # With --branches main, c_dev should be undefined
        result = _nr(tmp_path, c_dev, "--branches", "main")
        assert result.exit_code == 0
        entry = json.loads(result.output)["results"][0]
        assert entry["undefined"] is True

    def test_branches_filter_finds_matching(self, tmp_path: pathlib.Path) -> None:
        """A commit on the selected branch is still resolved."""
        _init_repo(tmp_path)
        c_main = _commit(tmp_path, "c-main", "main")
        _commit(tmp_path, "c-dev", "dev")
        result = _nr(tmp_path, c_main, "--branches", "main")
        assert result.exit_code == 0
        entry = json.loads(result.output)["results"][0]
        assert entry["undefined"] is False
        assert entry["branch"] == "main"

    def test_branches_glob_pattern(self, tmp_path: pathlib.Path) -> None:
        """A glob pattern seeds only matching branches."""
        _init_repo(tmp_path)
        _commit(tmp_path, "c-feat1", "feat/one")
        _commit(tmp_path, "c-feat2", "feat/two")
        c_main = _commit(tmp_path, "c-main", "main")
        # Only feat/* seeded; main commit should be undefined
        result = _nr(tmp_path, c_main, "--branches", "feat/*")
        entry = json.loads(result.output)["results"][0]
        assert entry["undefined"] is True

    def test_branches_filter_hierarchical(self, tmp_path: pathlib.Path) -> None:
        """A glob pattern matches a hierarchical branch name."""
        _init_repo(tmp_path)
        cid = _commit(tmp_path, "c-feat", "feat/my-thing")
        result = _nr(tmp_path, cid, "--branches", "feat/*")
        entry = json.loads(result.output)["results"][0]
        assert entry["undefined"] is False
        assert entry["branch"] == "feat/my-thing"
# ---------------------------------------------------------------------------
# Integration — --stdin
# ---------------------------------------------------------------------------


class TestStdinMode:
    """End-to-end behaviour of reading commit IDs from ``--stdin``."""

    def test_stdin_reads_commit_ids(self, tmp_path: pathlib.Path) -> None:
        """A single ID supplied on stdin yields one resolved entry."""
        _init_repo(tmp_path)
        cid = _commit(tmp_path, "c1")
        result = _nr(tmp_path, "--stdin", stdin=f"{cid}\n")
        assert result.exit_code == 0
        payload = json.loads(result.output)
        assert len(payload["results"]) == 1
        assert payload["results"][0]["commit_id"] == cid

    def test_stdin_skips_blank_lines(self, tmp_path: pathlib.Path) -> None:
        """Blank stdin lines do not produce result entries."""
        _init_repo(tmp_path)
        cid = _commit(tmp_path, "c1")
        result = _nr(tmp_path, "--stdin", stdin=f"\n{cid}\n\n")
        assert result.exit_code == 0
        assert len(json.loads(result.output)["results"]) == 1

    def test_stdin_skips_comments(self, tmp_path: pathlib.Path) -> None:
        """Stdin lines starting with ``#`` do not produce result entries."""
        _init_repo(tmp_path)
        cid = _commit(tmp_path, "c1")
        result = _nr(tmp_path, "--stdin", stdin=f"# this is a comment\n{cid}\n")
        assert result.exit_code == 0
        assert len(json.loads(result.output)["results"]) == 1

    def test_stdin_combined_with_positional(self, tmp_path: pathlib.Path) -> None:
        """Positional and stdin IDs are both resolved in one invocation."""
        _init_repo(tmp_path)
        c1 = _commit(tmp_path, "c1", "main")
        c2 = _commit(tmp_path, "c2", "dev")
        result = _nr(tmp_path, c1, "--stdin", stdin=f"{c2}\n")
        assert result.exit_code == 0
        payload = json.loads(result.output)
        assert len(payload["results"]) == 2

    def test_stdin_empty_with_no_positional_errors(self, tmp_path: pathlib.Path) -> None:
        """Empty stdin with no positional IDs fails and writes nothing to stdout."""
        _init_repo(tmp_path)
        result = _nr_text(tmp_path, "--stdin", stdin="")
        assert result.exit_code != 0
        assert result.stdout_bytes == b""


# ---------------------------------------------------------------------------
# Integration — text output, --name-only, --undefined
# ---------------------------------------------------------------------------


class TestTextOutput:
    """End-to-end checks of the plain-text output mode."""

    def test_text_format_contains_cid(self, tmp_path: pathlib.Path) -> None:
        """Default text output includes the commit ID."""
        _init_repo(tmp_path)
        cid = _commit(tmp_path, "c1")
        result = _nr_text(tmp_path, cid)
        assert result.exit_code == 0
        assert cid in result.output

    def test_name_only_omits_cid(self, tmp_path: pathlib.Path) -> None:
        """``--name-only`` prints something, but not the commit ID."""
        _init_repo(tmp_path)
        cid = _commit(tmp_path, "c1")
        result = _nr_text(tmp_path, "--name-only", cid)
        assert result.exit_code == 0
        assert cid not in result.output
        assert result.output.strip()

    def test_custom_undefined_string(self, tmp_path: pathlib.Path) -> None:
        """The ``--undefined`` value is printed for an unknown commit."""
        _init_repo(tmp_path)
        _commit(tmp_path, "c1")
        fake = "b" * 64
        result = _nr_text(tmp_path, "--undefined", "UNKNOWN", fake)
        assert result.exit_code == 0
        assert "UNKNOWN" in result.output

    def test_ambiguous_shown_in_text(self, tmp_path: pathlib.Path) -> None:
        """Ambiguous prefix emits '(ambiguous)' in text output mode."""
        _init_repo(tmp_path)
        cid1 = _commit(tmp_path, _AMBIG_MSG_1, "main")
        cid2 = _commit(tmp_path, _AMBIG_MSG_2, "dev")
        skip = len("sha256:")
        hex1 = cid1[skip:skip + 4]
        hex2 = cid2[skip:skip + 4]
        assert hex1 == hex2 == _AMBIG_PREFIX, "pre-computed prefix mismatch"
        result = _nr_text(tmp_path, _AMBIG_PREFIX)
        assert result.exit_code == 0
        assert "ambiguous" in result.output.lower()


# ---------------------------------------------------------------------------
# Integration — --max-walk
# ---------------------------------------------------------------------------


class TestMaxWalk:
    """End-to-end checks of ``--max-walk`` (plus the JSON short flag)."""

    def test_max_walk_limits_bfs(self, tmp_path: pathlib.Path) -> None:
        """The root of an 8-commit chain is undefined with ``--max-walk 1``."""
        _init_repo(tmp_path)
        root: str | None = None
        tip: str | None = None
        for i in range(8):
            tip = _commit(tmp_path, f"c{i}", parent=tip)
            if root is None:
                root = tip
        assert root is not None
        result = _nr(tmp_path, root, "--max-walk", "1")
        assert result.exit_code == 0
        entry = json.loads(result.output)["results"][0]
        # Deep commit unreachable with max_walk=1
        assert entry["undefined"] is True

    def test_max_walk_zero_errors(self, tmp_path: pathlib.Path) -> None:
        """``--max-walk 0`` fails and writes nothing to stdout."""
        _init_repo(tmp_path)
        cid = _commit(tmp_path, "c1")
        result = _nr_text(tmp_path, cid, "--max-walk", "0")
        assert result.exit_code != 0
        assert result.stdout_bytes == b""

    def test_json_shorthand(self, tmp_path: pathlib.Path) -> None:
        """The ``-j`` short flag selects JSON output end to end."""
        _init_repo(tmp_path)
        cid = _commit(tmp_path, "c1")
        # Invoke the runner directly: ``_nr`` always adds ``--json`` itself, so
        # going through it would never exercise the shorthand.
        result = runner.invoke(cli, ["name-rev", "-j", cid], env=_env(tmp_path))
        assert result.exit_code == 0
        payload = json.loads(result.output)
        assert "results" in payload
# ---------------------------------------------------------------------------
# Security
# ---------------------------------------------------------------------------


class TestSecurity:
    """Input validation, output sanitising and clean-failure checks."""

    def test_non_hex_input_rejected(self, tmp_path: pathlib.Path) -> None:
        """Non-hex input fails with a JSON error payload and empty stderr."""
        # ``_nr`` always passes --json, so the error is expected as JSON in
        # ``output`` while stderr stays empty.
        _init_repo(tmp_path)
        result = _nr(tmp_path, "not-hex!")
        assert result.exit_code != 0
        assert not result.stderr.strip()
        assert json.loads(result.output)["status"] == "error"

    def test_ansi_in_branch_name_sanitized_text(self, tmp_path: pathlib.Path) -> None:
        """ANSI escapes in the ``--undefined`` value never reach text output.

        Despite the test name, no branch name is injected here: a hostile
        branch name is hard to create through the helpers, so the check goes
        through an escape-laden ``--undefined`` value instead.
        """
        _init_repo(tmp_path)
        _commit(tmp_path, "c1")
        result = _nr_text(tmp_path, "--undefined", "\x1b[31mred\x1b[0m", "a" * 64)
        assert result.exit_code == 0
        assert "\x1b" not in result.output

    def test_unknown_flag_exits_nonzero(self, tmp_path: pathlib.Path) -> None:
        """An unrecognised flag fails and reports something on stderr."""
        # Unrecognized flags → argparse rejects and routes error to stderr.
        _init_repo(tmp_path)
        cid = _commit(tmp_path, "c1")
        result = _nr_text(tmp_path, "--format", "xml", cid)
        assert result.exit_code != 0
        assert result.stderr.strip()

    def test_no_args_exits_nonzero(self, tmp_path: pathlib.Path) -> None:
        """Invoking with no commit IDs fails and writes nothing to stdout."""
        _init_repo(tmp_path)
        result = _nr_text(tmp_path)
        assert result.exit_code != 0
        assert result.stdout_bytes == b""

    def test_no_traceback_on_unknown_flag(self, tmp_path: pathlib.Path) -> None:
        """An unrecognised flag never leaks a Python traceback."""
        _init_repo(tmp_path)
        cid = _commit(tmp_path, "c1")
        result = _nr_text(tmp_path, "--format", "bad", cid)
        assert "Traceback" not in result.output
        assert "Traceback" not in result.stderr

    def test_no_traceback_on_non_hex(self, tmp_path: pathlib.Path) -> None:
        """Non-hex input never leaks a Python traceback."""
        _init_repo(tmp_path)
        result = _nr(tmp_path, "xyz!!!")
        assert "Traceback" not in result.output
        assert "Traceback" not in result.stderr

    def test_symlink_ref_skipped(self, tmp_path: pathlib.Path) -> None:
        """A symlinked ref is not used as the reported branch name."""
        _init_repo(tmp_path)
        cid = _commit(tmp_path, "c", "main")
        heads = heads_dir(tmp_path)
        (heads / "sym").symlink_to(heads / "main")
        result = _nr(tmp_path, cid)
        assert result.exit_code == 0
        entry = json.loads(result.output)["results"][0]
        assert entry["branch"] == "main"  # not "sym"

    def test_no_repo_exits_cleanly(self, tmp_path: pathlib.Path) -> None:
        """Pointing at a missing repo root fails without a traceback."""
        missing_root = tmp_path / "norepo"
        result = runner.invoke(
            cli,
            ["name-rev", "a" * 64],
            env={"MUSE_REPO_ROOT": str(missing_root)},
        )
        assert result.exit_code != 0
        assert "Traceback" not in result.output
        assert "Traceback" not in result.stderr


# ---------------------------------------------------------------------------
# Stress
# ---------------------------------------------------------------------------


class TestStress:
    """Larger repositories and repeated invocations."""

    def test_50_commit_chain(self, tmp_path: pathlib.Path) -> None:
        """In a 50-commit chain the tip is at distance 0 and the root at 49."""
        _init_repo(tmp_path)
        chain: list[str] = []
        for i in range(50):
            previous = chain[-1] if chain else None
            chain.append(_commit(tmp_path, f"c{i}", parent=previous))
        # Tip should be at distance 0; first at distance 49.
        result = _nr(tmp_path, chain[-1], chain[0])
        assert result.exit_code == 0
        entries = json.loads(result.output)["results"]
        by_id = {entry["commit_id"]: entry for entry in entries}
        assert by_id[chain[-1]]["distance"] == 0
        assert by_id[chain[0]]["distance"] == 49

    def test_10_branch_repo(self, tmp_path: pathlib.Path) -> None:
        """Ten single-commit branches each resolve at distance 0."""
        _init_repo(tmp_path)
        tip_ids: list[str] = [
            _commit(tmp_path, f"c-branch{i}", f"branch-{i:02d}") for i in range(10)
        ]
        result = _nr(tmp_path, *tip_ids)
        assert result.exit_code == 0
        payload = json.loads(result.output)
        assert len(payload["results"]) == 10
        for entry in payload["results"]:
            assert entry["distance"] == 0

    def test_200_sequential_calls(self, tmp_path: pathlib.Path) -> None:
        """200 back-to-back invocations all succeed with the same answer."""
        _init_repo(tmp_path)
        cid = _commit(tmp_path, "c1")
        for _ in range(200):
            result = _nr(tmp_path, cid)
            assert result.exit_code == 0
            assert json.loads(result.output)["results"][0]["distance"] == 0

    def test_stdin_50_commit_ids(self, tmp_path: pathlib.Path) -> None:
        """Fifty IDs supplied on stdin yield fifty result entries."""
        _init_repo(tmp_path)
        chain: list[str] = []
        for i in range(50):
            previous = chain[-1] if chain else None
            chain.append(_commit(tmp_path, f"cs{i}", parent=previous))
        stdin_input = "\n".join(chain) + "\n"
        result = _nr(tmp_path, "--stdin", stdin=stdin_input)
        assert result.exit_code == 0
        payload = json.loads(result.output)
        assert len(payload["results"]) == 50