"""Tests for ``muse range-diff`` — supercharged coverage. Coverage tiers -------------- - Unit: _parse_range, _resolve_ref (sha256: prefix), _compute_patch_id, _patch_id_for_commit, _pair_series pairing logic - Integration: identical series, changed/dropped/added, JSON schema, text output, creation-factor variants, error cases - End-to-end: full CLI via CliRunner - Data integrity: old_count/new_count match pair list; files_changed per commit; patch_id has sha256: prefix; duration_ms is numeric - Performance: 50-commit series completes under 2 seconds - Security: ANSI injection rejected; no control characters in output - Stress: 50-commit mixed series (equivalent + changed + added) Supercharged JSON schema (all ``--json`` outputs) -------------------------------------------------- :: { "old_range": "base..old", "new_range": "base..new", "trivially_equivalent": true, "old_count": 3, "new_count": 3, "stable": false, "creation_factor": 0.6, "pairs": [ { "old": { "commit_id": "sha256:...", "patch_id": "sha256:...", "subject": "feat: add foo", "files_changed": 2 }, "new": { ... 
}, "status": "equivalent" } ], "duration_ms": 12.3, "exit_code": 0 } """ from __future__ import annotations from collections.abc import Mapping import datetime import argparse import json import pathlib import re import time import pytest from tests.cli_test_helper import CliRunner, InvokeResult from muse.core.object_store import write_object from muse.core.ids import hash_commit as compute_commit_id, hash_snapshot as compute_snapshot_id from muse.core.commits import ( CommitRecord, write_commit, ) from muse.core.snapshots import ( SnapshotRecord, write_snapshot, ) from muse.core.types import Manifest, blob_id, long_id from muse.core.paths import ref_path, muse_dir runner = CliRunner() _REPO_ID = "range-diff-super" _counter = 0 # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _oid(content: bytes) -> str: """sha256:-prefixed object ID — correct for all Muse APIs.""" return blob_id(content) def _init_repo(path: pathlib.Path) -> pathlib.Path: dot_muse = muse_dir(path) for d in ("commits", "snapshots", "objects", "refs/heads", "code"): (dot_muse / d).mkdir(parents=True, exist_ok=True) (dot_muse / "HEAD").write_text("ref: refs/heads/main", encoding="utf-8") (dot_muse / "repo.json").write_text( json.dumps({"repo_id": _REPO_ID, "domain": "code"}), encoding="utf-8" ) return path def _env(repo: pathlib.Path) -> Mapping[str, str]: return {"MUSE_REPO_ROOT": str(repo)} def _write_files(root: pathlib.Path, files: Mapping[str, bytes]) -> Manifest: manifest: Manifest = {} for rel, content in files.items(): oid = _oid(content) write_object(root, oid, content) manifest[rel] = oid p = root / rel p.parent.mkdir(parents=True, exist_ok=True) p.write_bytes(content) return manifest def _commit( root: pathlib.Path, files: Mapping[str, bytes], branch: str = "main", parent_id: str | None = None, message: str | None = None, ) -> str: global _counter _counter += 1 manifest 
= _write_files(root, files) snap_id = compute_snapshot_id(manifest) write_snapshot(root, SnapshotRecord(snapshot_id=snap_id, manifest=manifest)) committed_at = datetime.datetime.now(datetime.timezone.utc) msg = message or f"commit {_counter}" commit_id = compute_commit_id( [parent_id] if parent_id else [], snap_id, msg, committed_at.isoformat(), ) write_commit(root, CommitRecord( commit_id=commit_id, branch=branch, snapshot_id=snap_id, message=msg, committed_at=committed_at, parent_commit_id=parent_id, )) branch_ref = ref_path(root, branch) branch_ref.parent.mkdir(parents=True, exist_ok=True) branch_ref.write_text(commit_id, encoding="utf-8") return commit_id def _invoke(repo: pathlib.Path, *args: str) -> InvokeResult: from muse.cli.app import main as cli return runner.invoke(cli, ["range-diff", *args], env=_env(repo)) # --------------------------------------------------------------------------- # Unit — _parse_range # --------------------------------------------------------------------------- class TestParseRange: def test_with_dotdot(self) -> None: from muse.cli.commands.range_diff import _parse_range base, tip = _parse_range("abc..def") assert base == "abc" assert tip == "def" def test_no_dotdot(self) -> None: from muse.cli.commands.range_diff import _parse_range base, tip = _parse_range("main") assert base is None assert tip == "main" def test_strips_whitespace(self) -> None: from muse.cli.commands.range_diff import _parse_range base, tip = _parse_range("base .. 
tip") assert base == "base" assert tip == "tip" def test_sha256_prefixed_base(self) -> None: from muse.cli.commands.range_diff import _parse_range sha = long_id("a" * 64) base, tip = _parse_range(f"{sha}..main") assert base == sha assert tip == "main" # --------------------------------------------------------------------------- # Unit — _resolve_ref with sha256: prefix # --------------------------------------------------------------------------- class TestResolveRef: def test_resolves_branch_name(self, tmp_path: pathlib.Path) -> None: from muse.cli.commands.range_diff import _resolve_ref root = _init_repo(tmp_path) cid = _commit(root, {"a.py": b"x\n"}, branch="main") resolved = _resolve_ref(root, "main") assert resolved == cid def test_resolves_sha256_prefixed_commit_id(self, tmp_path: pathlib.Path) -> None: """RED: _resolve_ref must accept sha256:-prefixed commit IDs.""" from muse.cli.commands.range_diff import _resolve_ref root = _init_repo(tmp_path) cid = _commit(root, {"a.py": b"x\n"}, branch="main") # cid from compute_commit_id is sha256:-prefixed assert cid.startswith("sha256:") resolved = _resolve_ref(root, cid) assert resolved is not None, ( f"_resolve_ref could not resolve sha256:-prefixed commit ID {cid[:20]}..." 
) def test_resolves_head(self, tmp_path: pathlib.Path) -> None: from muse.cli.commands.range_diff import _resolve_ref root = _init_repo(tmp_path) cid = _commit(root, {"a.py": b"x\n"}, branch="main") assert _resolve_ref(root, "HEAD") == cid def test_nonexistent_branch_returns_none(self, tmp_path: pathlib.Path) -> None: from muse.cli.commands.range_diff import _resolve_ref root = _init_repo(tmp_path) assert _resolve_ref(root, "no-such-branch") is None def test_nonexistent_sha_returns_none(self, tmp_path: pathlib.Path) -> None: from muse.cli.commands.range_diff import _resolve_ref root = _init_repo(tmp_path) fake = long_id("f" * 64) assert _resolve_ref(root, fake) is None # --------------------------------------------------------------------------- # Unit — _compute_patch_id / _patch_id_for_commit # --------------------------------------------------------------------------- class TestPatchId: def test_returns_sha256_prefixed_patch_id(self, tmp_path: pathlib.Path) -> None: """RED: patch_id must have sha256: prefix — consistent with muse patch-id --json.""" from muse.cli.commands.range_diff import _patch_id_for_commit root = _init_repo(tmp_path) base = _commit(root, {"base.py": b"base\n"}, branch="main") cid = _commit(root, {"base.py": b"base\n", "a.py": b"a=1\n"}, branch="feat", parent_id=base) pid, _ = _patch_id_for_commit(root, cid, stable=False) assert pid.startswith("sha256:"), ( f"patch_id should be sha256:-prefixed but got: {pid[:20]!r}" ) def test_returns_files_changed_count(self, tmp_path: pathlib.Path) -> None: """RED: _patch_id_for_commit must return (patch_id, files_changed) tuple.""" from muse.cli.commands.range_diff import _patch_id_for_commit root = _init_repo(tmp_path) base = _commit(root, {"base.py": b"base\n"}, branch="main") # commit adds 2 new files relative to parent cid = _commit(root, {"base.py": b"base\n", "a.py": b"a\n", "b.py": b"b\n"}, branch="feat", parent_id=base) pid, fc = _patch_id_for_commit(root, cid, stable=False) assert fc == 2, 
f"Expected 2 files_changed, got {fc}" def test_same_content_same_patch_id(self, tmp_path: pathlib.Path) -> None: from muse.cli.commands.range_diff import _patch_id_for_commit root = _init_repo(tmp_path) base = _commit(root, {"base.py": b"base\n"}, branch="main") c1 = _commit(root, {"base.py": b"base\n", "a.py": b"a=1\n"}, branch="b1", parent_id=base) c2 = _commit(root, {"base.py": b"base\n", "a.py": b"a=1\n"}, branch="b2", parent_id=base) pid1, _ = _patch_id_for_commit(root, c1, stable=False) pid2, _ = _patch_id_for_commit(root, c2, stable=False) assert pid1 == pid2 def test_different_content_different_patch_id(self, tmp_path: pathlib.Path) -> None: from muse.cli.commands.range_diff import _patch_id_for_commit root = _init_repo(tmp_path) base = _commit(root, {"base.py": b"base\n"}, branch="main") c1 = _commit(root, {"base.py": b"base\n", "a.py": b"v1\n"}, branch="b1", parent_id=base) c2 = _commit(root, {"base.py": b"base\n", "a.py": b"v2\n"}, branch="b2", parent_id=base) pid1, _ = _patch_id_for_commit(root, c1, stable=False) pid2, _ = _patch_id_for_commit(root, c2, stable=False) assert pid1 != pid2 def test_stable_ignores_trailing_whitespace(self, tmp_path: pathlib.Path) -> None: from muse.cli.commands.range_diff import _patch_id_for_commit root = _init_repo(tmp_path) base = _commit(root, {"base.py": b"base\n"}, branch="main") c1 = _commit(root, {"base.py": b"base\n", "a.py": b"a=1\n"}, branch="b1", parent_id=base) c2 = _commit(root, {"base.py": b"base\n", "a.py": b"a=1 \n"}, branch="b2", parent_id=base) pid1, _ = _patch_id_for_commit(root, c1, stable=True) pid2, _ = _patch_id_for_commit(root, c2, stable=True) assert pid1 == pid2 def test_first_commit_zero_files_changed(self, tmp_path: pathlib.Path) -> None: from muse.cli.commands.range_diff import _patch_id_for_commit root = _init_repo(tmp_path) # First commit has no parent — base_manifest is empty, so all files are "added" cid = _commit(root, {"a.py": b"x\n", "b.py": b"y\n"}, branch="main") _, fc = 
_patch_id_for_commit(root, cid, stable=False) assert fc == 2 # --------------------------------------------------------------------------- # Integration — trivially equivalent # --------------------------------------------------------------------------- class TestTriviallyEquivalent: def test_identical_series_exit_zero(self, tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) base = _commit(root, {"readme.txt": b"base\n"}, branch="main") c1 = _commit(root, {"readme.txt": b"base\n", "a.py": b"a=1\n"}, branch="old", parent_id=base) c2 = _commit(root, {"readme.txt": b"base\n", "a.py": b"a=1\n", "b.py": b"b=2\n"}, branch="old", parent_id=c1) n1 = _commit(root, {"readme.txt": b"base\n", "a.py": b"a=1\n"}, branch="new", parent_id=base) n2 = _commit(root, {"readme.txt": b"base\n", "a.py": b"a=1\n", "b.py": b"b=2\n"}, branch="new", parent_id=n1) result = _invoke(root, f"{base}..old", f"{base}..new", "--json") assert result.exit_code == 0 data = json.loads(result.stdout) assert data["trivially_equivalent"] is True assert all(p["status"] == "equivalent" for p in data["pairs"]) def test_both_empty_trivially_equivalent(self, tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) base = _commit(root, {"readme.txt": b"base\n"}, branch="main") result = _invoke(root, f"{base}..{base}", f"{base}..{base}", "--json") assert result.exit_code == 0 data = json.loads(result.stdout) assert data["trivially_equivalent"] is True assert data["pairs"] == [] def test_empty_old_all_added(self, tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) base = _commit(root, {"readme.txt": b"base\n"}, branch="main") n1 = _commit(root, {"readme.txt": b"base\n", "a.py": b"a=1\n"}, branch="new", parent_id=base) result = _invoke(root, f"{base}..{base}", f"{base}..new", "--json") data = json.loads(result.stdout) assert all(p["status"] == "added" for p in data["pairs"]) def test_empty_new_all_dropped(self, tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) base = _commit(root, 
{"readme.txt": b"base\n"}, branch="main") c1 = _commit(root, {"readme.txt": b"base\n", "a.py": b"a=1\n"}, branch="old", parent_id=base) result = _invoke(root, f"{base}..old", f"{base}..{base}", "--json") data = json.loads(result.stdout) assert all(p["status"] == "dropped" for p in data["pairs"]) # --------------------------------------------------------------------------- # Integration — differences # --------------------------------------------------------------------------- class TestDifferences: def test_single_commit_changed(self, tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) base = _commit(root, {"readme.txt": b"base\n"}, branch="main") c1 = _commit(root, {"readme.txt": b"base\n", "a.py": b"v1\n"}, branch="old", parent_id=base) n1 = _commit(root, {"readme.txt": b"base\n", "a.py": b"v2\n"}, branch="new", parent_id=base) result = _invoke(root, f"{base}..old", f"{base}..new", "--json") data = json.loads(result.stdout) assert data["trivially_equivalent"] is False assert len(data["pairs"]) == 1 assert data["pairs"][0]["status"] == "changed" def test_commit_added(self, tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) base = _commit(root, {"readme.txt": b"base\n"}, branch="main") c1 = _commit(root, {"readme.txt": b"base\n", "a.py": b"a=1\n"}, branch="old", parent_id=base) n1 = _commit(root, {"readme.txt": b"base\n", "a.py": b"a=1\n"}, branch="new", parent_id=base) n2 = _commit(root, {"readme.txt": b"base\n", "a.py": b"a=1\n", "extra.py": b"e=9\n"}, branch="new", parent_id=n1) result = _invoke(root, f"{base}..old", f"{base}..new", "--json") data = json.loads(result.stdout) statuses = [p["status"] for p in data["pairs"]] assert "added" in statuses assert "equivalent" in statuses def test_commit_dropped(self, tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) base = _commit(root, {"readme.txt": b"base\n"}, branch="main") c1 = _commit(root, {"readme.txt": b"base\n", "a.py": b"a=1\n"}, branch="old", parent_id=base, message="add a") c2 
= _commit(root, {"readme.txt": b"base\n", "a.py": b"a=1\n", "b.py": b"b=2\n"}, branch="old", parent_id=c1, message="add b") n1 = _commit(root, {"readme.txt": b"base\n", "a.py": b"a=1\n", "b.py": b"b=2\n"}, branch="new", parent_id=base, message="add a and b squashed") result = _invoke(root, f"{base}..old", f"{base}..new", "--json") data = json.loads(result.stdout) statuses = {p["status"] for p in data["pairs"]} assert "dropped" in statuses or "changed" in statuses def test_exit_zero_when_equivalent(self, tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) base = _commit(root, {"readme.txt": b"base\n"}, branch="main") c1 = _commit(root, {"readme.txt": b"base\n", "a.py": b"a=1\n"}, branch="old", parent_id=base) n1 = _commit(root, {"readme.txt": b"base\n", "a.py": b"a=1\n"}, branch="new", parent_id=base) result = _invoke(root, f"{base}..old", f"{base}..new", "--json") assert result.exit_code == 0 def test_exit_nonzero_when_differs(self, tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) base = _commit(root, {"readme.txt": b"base\n"}, branch="main") c1 = _commit(root, {"readme.txt": b"base\n", "a.py": b"v1\n"}, branch="old", parent_id=base) n1 = _commit(root, {"readme.txt": b"base\n", "a.py": b"v2\n"}, branch="new", parent_id=base) result = _invoke(root, f"{base}..old", f"{base}..new", "--json") assert result.exit_code != 0 # --------------------------------------------------------------------------- # Integration — JSON schema supercharge (RED tests) # --------------------------------------------------------------------------- class TestJsonSchema: def test_has_duration_ms(self, tmp_path: pathlib.Path) -> None: """RED: duration_ms must be present in JSON output.""" root = _init_repo(tmp_path) base = _commit(root, {"readme.txt": b"base\n"}, branch="main") c1 = _commit(root, {"readme.txt": b"base\n", "a.py": b"a=1\n"}, branch="old", parent_id=base) n1 = _commit(root, {"readme.txt": b"base\n", "a.py": b"a=1\n"}, branch="new", parent_id=base) result = 
_invoke(root, f"{base}..old", f"{base}..new", "--json") data = json.loads(result.stdout) assert "duration_ms" in data, "duration_ms missing from JSON" assert isinstance(data["duration_ms"], (int, float)) assert data["duration_ms"] >= 0 def test_has_exit_code(self, tmp_path: pathlib.Path) -> None: """RED: exit_code must be present in JSON output.""" root = _init_repo(tmp_path) base = _commit(root, {"readme.txt": b"base\n"}, branch="main") c1 = _commit(root, {"readme.txt": b"base\n", "a.py": b"a=1\n"}, branch="old", parent_id=base) n1 = _commit(root, {"readme.txt": b"base\n", "a.py": b"a=1\n"}, branch="new", parent_id=base) result = _invoke(root, f"{base}..old", f"{base}..new", "--json") data = json.loads(result.stdout) assert "exit_code" in data, "exit_code missing from JSON" assert data["exit_code"] == 0 def test_exit_code_reflects_differences(self, tmp_path: pathlib.Path) -> None: """exit_code in JSON must be 1 when series differ.""" root = _init_repo(tmp_path) base = _commit(root, {"readme.txt": b"base\n"}, branch="main") c1 = _commit(root, {"readme.txt": b"base\n", "a.py": b"v1\n"}, branch="old", parent_id=base) n1 = _commit(root, {"readme.txt": b"base\n", "a.py": b"v2\n"}, branch="new", parent_id=base) result = _invoke(root, f"{base}..old", f"{base}..new", "--json") data = json.loads(result.stdout) assert "exit_code" in data assert data["exit_code"] == 1 def test_has_old_count(self, tmp_path: pathlib.Path) -> None: """RED: old_count must reflect the number of commits in the old range.""" root = _init_repo(tmp_path) base = _commit(root, {"readme.txt": b"base\n"}, branch="main") c1 = _commit(root, {"readme.txt": b"base\n", "a.py": b"a=1\n"}, branch="old", parent_id=base) c2 = _commit(root, {"readme.txt": b"base\n", "a.py": b"a=1\n", "b.py": b"b=2\n"}, branch="old", parent_id=c1) n1 = _commit(root, {"readme.txt": b"base\n", "a.py": b"a=1\n"}, branch="new", parent_id=base) result = _invoke(root, f"{base}..old", f"{base}..new", "--json") data = 
json.loads(result.stdout) assert "old_count" in data, "old_count missing from JSON" assert data["old_count"] == 2 def test_has_new_count(self, tmp_path: pathlib.Path) -> None: """RED: new_count must reflect the number of commits in the new range.""" root = _init_repo(tmp_path) base = _commit(root, {"readme.txt": b"base\n"}, branch="main") c1 = _commit(root, {"readme.txt": b"base\n", "a.py": b"a=1\n"}, branch="old", parent_id=base) n1 = _commit(root, {"readme.txt": b"base\n", "a.py": b"a=1\n"}, branch="new", parent_id=base) n2 = _commit(root, {"readme.txt": b"base\n", "a.py": b"a=1\n", "c.py": b"c=3\n"}, branch="new", parent_id=n1) n3 = _commit(root, {"readme.txt": b"base\n", "a.py": b"a=1\n", "c.py": b"c=3\n", "d.py": b"d=4\n"}, branch="new", parent_id=n2) result = _invoke(root, f"{base}..old", f"{base}..new", "--json") data = json.loads(result.stdout) assert "new_count" in data, "new_count missing from JSON" assert data["new_count"] == 3 def test_has_stable_field(self, tmp_path: pathlib.Path) -> None: """RED: stable must appear in JSON output reflecting the --stable flag.""" root = _init_repo(tmp_path) base = _commit(root, {"readme.txt": b"base\n"}, branch="main") c1 = _commit(root, {"readme.txt": b"base\n", "a.py": b"a=1\n"}, branch="old", parent_id=base) n1 = _commit(root, {"readme.txt": b"base\n", "a.py": b"a=1\n"}, branch="new", parent_id=base) result = _invoke(root, f"{base}..old", f"{base}..new", "--json", "--stable") data = json.loads(result.stdout) assert "stable" in data, "stable missing from JSON" assert data["stable"] is True def test_stable_false_by_default(self, tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) base = _commit(root, {"readme.txt": b"base\n"}, branch="main") c1 = _commit(root, {"readme.txt": b"base\n", "a.py": b"a=1\n"}, branch="old", parent_id=base) n1 = _commit(root, {"readme.txt": b"base\n", "a.py": b"a=1\n"}, branch="new", parent_id=base) result = _invoke(root, f"{base}..old", f"{base}..new", "--json") data = 
json.loads(result.stdout) assert "stable" in data assert data["stable"] is False def test_has_creation_factor(self, tmp_path: pathlib.Path) -> None: """RED: creation_factor must appear in JSON, reflecting the value used.""" root = _init_repo(tmp_path) base = _commit(root, {"readme.txt": b"base\n"}, branch="main") c1 = _commit(root, {"readme.txt": b"base\n", "a.py": b"a=1\n"}, branch="old", parent_id=base) n1 = _commit(root, {"readme.txt": b"base\n", "a.py": b"a=1\n"}, branch="new", parent_id=base) result = _invoke(root, f"{base}..old", f"{base}..new", "--json", "--creation-factor", "0.3") data = json.loads(result.stdout) assert "creation_factor" in data, "creation_factor missing from JSON" assert abs(data["creation_factor"] - 0.3) < 0.01 def test_patch_id_has_sha256_prefix(self, tmp_path: pathlib.Path) -> None: """RED: patch_id in pair commit info must be sha256:-prefixed.""" root = _init_repo(tmp_path) base = _commit(root, {"readme.txt": b"base\n"}, branch="main") c1 = _commit(root, {"readme.txt": b"base\n", "a.py": b"a=1\n"}, branch="old", parent_id=base, message="add a") n1 = _commit(root, {"readme.txt": b"base\n", "a.py": b"a=1\n"}, branch="new", parent_id=base, message="add a") result = _invoke(root, f"{base}..old", f"{base}..new", "--json") data = json.loads(result.stdout) for pair in data["pairs"]: for side in ("old", "new"): info = pair.get(side) if info is not None: assert info["patch_id"].startswith("sha256:"), ( f"pair[{side}].patch_id lacks sha256: prefix: {info['patch_id'][:20]!r}" ) def test_pair_has_files_changed(self, tmp_path: pathlib.Path) -> None: """RED: each pair commit info must include files_changed count.""" root = _init_repo(tmp_path) base = _commit(root, {"readme.txt": b"base\n"}, branch="main") c1 = _commit(root, {"readme.txt": b"base\n", "a.py": b"a=1\n", "b.py": b"b=2\n"}, branch="old", parent_id=base) n1 = _commit(root, {"readme.txt": b"base\n", "a.py": b"a=1\n", "b.py": b"b=2\n"}, branch="new", parent_id=base) result = _invoke(root, 
f"{base}..old", f"{base}..new", "--json") data = json.loads(result.stdout) for pair in data["pairs"]: for side in ("old", "new"): info = pair.get(side) if info is not None: assert "files_changed" in info, ( f"pair[{side}] missing files_changed: {info}" ) assert isinstance(info["files_changed"], int) assert info["files_changed"] >= 0 def test_old_count_new_count_match_pairs(self, tmp_path: pathlib.Path) -> None: """old_count + new_count must be consistent with actual range walks.""" root = _init_repo(tmp_path) base = _commit(root, {"readme.txt": b"base\n"}, branch="main") c1 = _commit(root, {"readme.txt": b"base\n", "a.py": b"a=1\n"}, branch="old", parent_id=base) c2 = _commit(root, {"readme.txt": b"base\n", "a.py": b"a=1\n", "b.py": b"b=2\n"}, branch="old", parent_id=c1) c3 = _commit(root, {"readme.txt": b"base\n", "a.py": b"a=1\n", "b.py": b"b=2\n", "c.py": b"c=3\n"}, branch="old", parent_id=c2) n1 = _commit(root, {"readme.txt": b"base\n", "a.py": b"a=1\n"}, branch="new", parent_id=base) n2 = _commit(root, {"readme.txt": b"base\n", "a.py": b"a=1\n", "b.py": b"b=2\n"}, branch="new", parent_id=n1) result = _invoke(root, f"{base}..old", f"{base}..new", "--json") data = json.loads(result.stdout) assert data["old_count"] == 3 assert data["new_count"] == 2 def test_complete_schema_keys(self, tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) base = _commit(root, {"readme.txt": b"base\n"}, branch="main") c1 = _commit(root, {"readme.txt": b"base\n", "a.py": b"a=1\n"}, branch="old", parent_id=base) n1 = _commit(root, {"readme.txt": b"base\n", "a.py": b"a=1\n"}, branch="new", parent_id=base) result = _invoke(root, f"{base}..old", f"{base}..new", "--json") data = json.loads(result.stdout) for key in ("old_range", "new_range", "trivially_equivalent", "old_count", "new_count", "stable", "creation_factor", "pairs", "duration_ms", "exit_code"): assert key in data, f"key {key!r} missing from JSON output" # 
# ---------------------------------------------------------------------------
# Integration — creation-factor
# ---------------------------------------------------------------------------


class TestCreationFactor:
    """Effect of ``--creation-factor`` on pairing, observed through ``--json``.

    Every test builds a one-commit ``old`` series and a one-commit ``new``
    series on a shared base commit, then inspects the decoded JSON output.
    """

    def test_zero_no_fuzzy_pairing(self, tmp_path: pathlib.Path) -> None:
        """Factor 0.0: differing commits are never reported as ``changed``."""
        root = _init_repo(tmp_path)
        base = _commit(root, {"readme.txt": b"base\n"}, branch="main")
        _commit(root, {"readme.txt": b"base\n", "a.py": b"v1\n"}, branch="old", parent_id=base)
        _commit(root, {"readme.txt": b"base\n", "a.py": b"v2\n"}, branch="new", parent_id=base)
        result = _invoke(root, f"{base}..old", f"{base}..new", "--creation-factor", "0.0", "--json")
        data = json.loads(result.stdout)
        statuses = {p["status"] for p in data["pairs"]}
        assert "changed" not in statuses
        assert "dropped" in statuses or "added" in statuses

    def test_one_all_positionally_paired(self, tmp_path: pathlib.Path) -> None:
        """Factor 1.0: the differing commits are paired and reported ``changed``."""
        root = _init_repo(tmp_path)
        base = _commit(root, {"readme.txt": b"base\n"}, branch="main")
        _commit(root, {"readme.txt": b"base\n", "a.py": b"v1\n"}, branch="old", parent_id=base)
        _commit(root, {"readme.txt": b"base\n", "a.py": b"v2\n"}, branch="new", parent_id=base)
        result = _invoke(root, f"{base}..old", f"{base}..new", "--creation-factor", "1.0", "--json")
        data = json.loads(result.stdout)
        statuses = {p["status"] for p in data["pairs"]}
        assert "changed" in statuses

    def test_creation_factor_clamped_to_range(self, tmp_path: pathlib.Path) -> None:
        """creation_factor in JSON must be clamped to [0.0, 1.0]."""
        root = _init_repo(tmp_path)
        base = _commit(root, {"readme.txt": b"base\n"}, branch="main")
        _commit(root, {"readme.txt": b"base\n", "a.py": b"a=1\n"}, branch="old", parent_id=base)
        _commit(root, {"readme.txt": b"base\n", "a.py": b"a=1\n"}, branch="new", parent_id=base)
        result = _invoke(root, f"{base}..old", f"{base}..new", "--creation-factor", "99.0", "--json")
        data = json.loads(result.stdout)
        # Check both ends of the documented interval, not only the upper bound.
        assert 0.0 <= data["creation_factor"] <= 1.0
# ---------------------------------------------------------------------------
# Integration — text output
# ---------------------------------------------------------------------------


class TestTextOutput:
    """Plain-text (non-``--json``) output: status symbols and short commit IDs."""

    # File trees reused by the tests below; ``_commit`` only reads them.
    _BASE = {"readme.txt": b"base\n"}
    _WITH_A = {"readme.txt": b"base\n", "a.py": b"a=1\n"}
    _WITH_A_B = {"readme.txt": b"base\n", "a.py": b"a=1\n", "b.py": b"b=2\n"}
    _A_V1 = {"readme.txt": b"base\n", "a.py": b"v1\n"}
    _A_V2 = {"readme.txt": b"base\n", "a.py": b"v2\n"}

    # A whitespace-delimited token that is exactly ``sha256:`` + 12 hex digits.
    _SHORT_ID = re.compile(r"^sha256:[0-9a-f]{12}$")

    def test_equivalent_shows_equals_symbol(self, tmp_path: pathlib.Path) -> None:
        """Identical one-commit series exit 0 and the output contains ``=``."""
        repo = _init_repo(tmp_path)
        fork = _commit(repo, self._BASE, branch="main")
        _commit(repo, self._WITH_A, branch="old", parent_id=fork, message="add a")
        _commit(repo, self._WITH_A, branch="new", parent_id=fork, message="add a")

        outcome = _invoke(repo, f"{fork}..old", f"{fork}..new")

        assert outcome.exit_code == 0
        assert "=" in outcome.stdout

    def test_text_short_id_has_sha256_prefix(self, tmp_path: pathlib.Path) -> None:
        """Short IDs in pair lines must be ``sha256:<12 hex chars>`` — prefix is canonical."""
        repo = _init_repo(tmp_path)
        fork = _commit(repo, self._BASE, branch="main")
        _commit(repo, self._WITH_A, branch="old", parent_id=fork, message="add a")
        _commit(repo, self._WITH_A, branch="new", parent_id=fork, message="add a")

        outcome = _invoke(repo, f"{fork}..old", f"{fork}..new")

        # Only inspect pair lines (start with =, !, <, >)
        pair_lines = [
            text
            for text in outcome.stdout.splitlines()
            if text.startswith(("=", "!", "<", ">"))
        ]
        assert pair_lines, "No pair lines in text output"

        found = []
        for text in pair_lines:
            found.extend(word for word in text.split() if self._SHORT_ID.match(word))
        assert found, (
            f"No sha256:<12-hex> short IDs found in pair lines.\n"
            f"Pair lines: {pair_lines}"
        )

    def test_text_short_ids_are_sha256_plus_8_hex(self, tmp_path: pathlib.Path) -> None:
        """Short IDs must be sha256: + exactly 12 lowercase hex chars (19 total).

        The method name says "8 hex", but the pattern and length checked are 12.
        """
        repo = _init_repo(tmp_path)
        fork = _commit(repo, self._BASE, branch="main")
        _commit(repo, self._A_V1, branch="old", parent_id=fork, message="changed commit")
        _commit(repo, self._A_V2, branch="new", parent_id=fork, message="changed commit")

        result = _invoke(repo, f"{fork}..old", f"{fork}..new")

        found = [
            word
            for text in result.stdout.splitlines()
            for word in text.split()
            if self._SHORT_ID.match(word)
        ]
        assert found, f"No sha256:<12-hex> tokens found in output:\n{result.stdout}"
        for tok in found:
            assert len(tok) == 19, f"Expected 19 chars (sha256: + 12 hex), got {len(tok)}: {tok!r}"

    def test_changed_shows_exclamation_symbol(self, tmp_path: pathlib.Path) -> None:
        """Differing one-commit series exit non-zero and the output contains ``!``."""
        repo = _init_repo(tmp_path)
        fork = _commit(repo, self._BASE, branch="main")
        _commit(repo, self._A_V1, branch="old", parent_id=fork, message="add a v1")
        _commit(repo, self._A_V2, branch="new", parent_id=fork, message="add a v2")

        outcome = _invoke(repo, f"{fork}..old", f"{fork}..new")

        assert outcome.exit_code != 0
        assert "!" in outcome.stdout

    def test_added_shows_gt_symbol(self, tmp_path: pathlib.Path) -> None:
        """An extra commit on the new side makes the output contain ``>``."""
        repo = _init_repo(tmp_path)
        fork = _commit(repo, self._BASE, branch="main")
        _commit(repo, self._WITH_A, branch="old", parent_id=fork)
        new_first = _commit(repo, self._WITH_A, branch="new", parent_id=fork)
        _commit(
            repo,
            {"readme.txt": b"base\n", "a.py": b"a=1\n", "x.py": b"x\n"},
            branch="new",
            parent_id=new_first,
        )

        outcome = _invoke(repo, f"{fork}..old", f"{fork}..new")

        assert ">" in outcome.stdout

    def test_dropped_shows_lt_symbol(self, tmp_path: pathlib.Path) -> None:
        """Two old commits vs. one squashed new commit at factor 0.0 prints ``<``."""
        repo = _init_repo(tmp_path)
        fork = _commit(repo, self._BASE, branch="main")
        old_first = _commit(repo, self._WITH_A, branch="old", parent_id=fork)
        _commit(repo, self._WITH_A_B, branch="old", parent_id=old_first)
        _commit(repo, self._WITH_A_B, branch="new", parent_id=fork, message="squashed")

        outcome = _invoke(repo, f"{fork}..old", f"{fork}..new", "--creation-factor", "0.0")

        assert "<" in outcome.stdout


# ---------------------------------------------------------------------------
# Integration — error cases
# ---------------------------------------------------------------------------


class TestErrors:
    """Ranges naming refs that do not exist must make the command fail."""

    def test_nonexistent_old_ref(self, tmp_path: pathlib.Path) -> None:
        """An unresolvable old range produces a non-zero exit code."""
        repo = _init_repo(tmp_path)
        _commit(repo, {"a.py": b"x\n"}, branch="main")

        assert _invoke(repo, "ghost..no-such", "main..main").exit_code != 0

    def test_nonexistent_new_ref(self, tmp_path: pathlib.Path) -> None:
        """An unresolvable new range produces a non-zero exit code."""
        repo = _init_repo(tmp_path)
        tip = _commit(repo, {"a.py": b"x\n"}, branch="main")

        assert _invoke(repo, f"{tip}..main", "ghost..no-such").exit_code != 0


# ---------------------------------------------------------------------------
# Data integrity
# ---------------------------------------------------------------------------


class TestDataIntegrity:
    """Cross-checks that values in the JSON output match the commits written."""

    def test_files_changed_accurate(self, tmp_path: pathlib.Path) -> None:
        """files_changed on a pair entry must match the actual diff size."""
        root = _init_repo(tmp_path)
        base = _commit(root, {"readme.txt": b"base\n"}, branch="main")
        # commit changes exactly 3 files relative to parent
        c1 = _commit(root, {
            "readme.txt": b"base\n",
            "a.py": b"a=1\n",
            "b.py": b"b=2\n",
            "c.py": b"c=3\n"
        }, branch="old", parent_id=base)
        n1 = _commit(root, {
            "readme.txt": b"base\n",
            "a.py": b"a=1\n",
            "b.py": b"b=2\n",
            "c.py": b"c=3\n"
        }, branch="new", parent_id=base)
        result = _invoke(root, f"{base}..old", f"{base}..new", "--json")
        data = json.loads(result.stdout)
        pair = data["pairs"][0]
        # 3 files added relative to base
        assert pair["old"]["files_changed"] == 3
        assert pair["new"]["files_changed"] == 3

    def test_old_count_zero_when_old_range_empty(self, tmp_path: pathlib.Path) -> None:
        """An empty old range reports ``old_count == 0`` and ``new_count == 1``."""
        root = _init_repo(tmp_path)
        base = _commit(root, {"readme.txt": b"base\n"}, branch="main")
        n1 = _commit(root, {"readme.txt": b"base\n", "a.py": b"a=1\n"}, branch="new", parent_id=base)
        result = _invoke(root, f"{base}..{base}", f"{base}..new", "--json")
        data = json.loads(result.stdout)
        assert data["old_count"] == 0
        assert data["new_count"] == 1

    def test_commit_ids_are_sha256_prefixed(self, tmp_path: pathlib.Path) -> None:
        """Every non-null pair side carries a ``commit_id`` starting with ``sha256:``."""
        root = _init_repo(tmp_path)
        base = _commit(root, {"readme.txt": b"base\n"}, branch="main")
        c1 = _commit(root, {"readme.txt": b"base\n", "a.py": b"a=1\n"}, branch="old", parent_id=base, message="add a")
        n1 = _commit(root, {"readme.txt": b"base\n", "a.py": b"a=1\n"}, branch="new", parent_id=base, message="add a")
        result = _invoke(root, f"{base}..old", f"{base}..new", "--json")
        data = json.loads(result.stdout)
        for pair in data["pairs"]:
            for side in ("old", "new"):
                # A side that is absent or None is skipped.
                info = pair.get(side)
                if info is not None:
                    assert info["commit_id"].startswith("sha256:"), (
                        f"commit_id lacks sha256: prefix: {info['commit_id']!r}"
                    )

    def test_pair_subject_matches_commit_message(self, tmp_path: pathlib.Path) -> None:
        """The ``subject`` of each side of the first pair equals the commit message."""
        root = _init_repo(tmp_path)
        base = _commit(root, {"readme.txt": b"base\n"}, branch="main")
        c1 = _commit(root, {"readme.txt": b"base\n", "a.py": b"a=1\n"}, branch="old", parent_id=base, message="feat: add alpha")
        n1 = _commit(root, {"readme.txt": b"base\n", "a.py": b"a=1\n"}, branch="new", parent_id=base, message="feat: add alpha")
        result = _invoke(root, f"{base}..old", f"{base}..new", "--json")
        data = json.loads(result.stdout)
        pair = data["pairs"][0]
        assert pair["old"]["subject"] == "feat: add alpha"
        assert pair["new"]["subject"] == "feat: add alpha"

    def test_duration_ms_is_plausible(self, tmp_path: pathlib.Path) -> None:
        """``duration_ms`` is non-negative and below 10 000."""
        root = _init_repo(tmp_path)
        base = _commit(root, {"readme.txt": b"base\n"}, branch="main")
        c1 = _commit(root, {"readme.txt": b"base\n", "a.py": b"a=1\n"}, branch="old", parent_id=base)
        n1 = _commit(root, {"readme.txt": b"base\n", "a.py": b"a=1\n"}, branch="new", parent_id=base)
        result = _invoke(root, f"{base}..old", f"{base}..new", "--json")
        data = json.loads(result.stdout)
        assert 0 <= data["duration_ms"] < 10_000


# ---------------------------------------------------------------------------
# Security
# ---------------------------------------------------------------------------


class TestSecurity:
    def test_ansi_in_old_range_rejected(self, tmp_path: pathlib.Path) -> None:
        """An old range containing ANSI escape sequences exits non-zero."""
        root = _init_repo(tmp_path)
        result = _invoke(root, "\x1b[31mbad\x1b[0m..main", "main..main")
        assert result.exit_code != 0

    def test_ansi_in_new_range_rejected(self, tmp_path: pathlib.Path) -> None:
        """A new range containing ANSI escape sequences exits non-zero."""
        root = _init_repo(tmp_path)
        result = _invoke(root, "main..main", "\x1b[31mbad\x1b[0m..main")
        assert result.exit_code != 0

    def test_control_char_in_range_rejected(self, tmp_path: pathlib.Path) -> None:
        """A range containing a NUL character exits non-zero."""
        root = _init_repo(tmp_path)
        result = _invoke(root, "main\x00trick..main", "main..main")
        assert result.exit_code != 0

    def test_no_ansi_in_json_output(self, tmp_path: pathlib.Path) -> 
None: root = _init_repo(tmp_path) base = _commit(root, {"readme.txt": b"base\n"}, branch="main") c1 = _commit(root, {"readme.txt": b"base\n", "a.py": b"a=1\n"}, branch="old", parent_id=base) n1 = _commit(root, {"readme.txt": b"base\n", "a.py": b"a=1\n"}, branch="new", parent_id=base) result = _invoke(root, f"{base}..old", f"{base}..new", "--json") assert "\x1b[" not in result.stdout def test_no_ansi_in_text_output(self, tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) base = _commit(root, {"readme.txt": b"base\n"}, branch="main") c1 = _commit(root, {"readme.txt": b"base\n", "a.py": b"a=1\n"}, branch="old", parent_id=base) n1 = _commit(root, {"readme.txt": b"base\n", "a.py": b"a=1\n"}, branch="new", parent_id=base) result = _invoke(root, f"{base}..old", f"{base}..new") assert "\x1b[" not in result.stdout # --------------------------------------------------------------------------- # Performance # --------------------------------------------------------------------------- class TestPerformance: def test_50_commit_equivalent_series_under_2_seconds(self, tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) base = _commit(root, {"base.py": b"base\n"}, branch="main") old_id = base new_id = base for i in range(50): content = f"v = {i}\n".encode() old_id = _commit(root, {f"f{i}.py": content}, branch="old", parent_id=old_id, message=f"add f{i}") new_id = _commit(root, {f"f{i}.py": content}, branch="new", parent_id=new_id, message=f"add f{i}") t0 = time.monotonic() result = _invoke(root, f"{base}..old", f"{base}..new", "--json") elapsed = time.monotonic() - t0 assert result.exit_code == 0 assert elapsed < 2.0, f"range-diff took {elapsed:.2f}s — expected < 2s" data = json.loads(result.stdout) assert data["trivially_equivalent"] is True assert len(data["pairs"]) == 50 def test_duration_ms_under_threshold(self, tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) base = _commit(root, {"readme.txt": b"base\n"}, branch="main") c1 = _commit(root, 
{"readme.txt": b"base\n", "a.py": b"a=1\n"}, branch="old", parent_id=base) n1 = _commit(root, {"readme.txt": b"base\n", "a.py": b"a=1\n"}, branch="new", parent_id=base) result = _invoke(root, f"{base}..old", f"{base}..new", "--json") data = json.loads(result.stdout) assert data["duration_ms"] < 2_000 # --------------------------------------------------------------------------- # Stress # --------------------------------------------------------------------------- class TestStress: def test_50_commits_all_equivalent(self, tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) base = _commit(root, {"base.py": b"base\n"}, branch="main") old_id = base new_id = base for i in range(50): content = f"v = {i}\n".encode() old_id = _commit(root, {f"f{i}.py": content}, branch="old", parent_id=old_id, message=f"add f{i}") new_id = _commit(root, {f"f{i}.py": content}, branch="new", parent_id=new_id, message=f"add f{i}") result = _invoke(root, f"{base}..old", f"{base}..new", "--json") assert result.exit_code == 0 data = json.loads(result.stdout) assert data["trivially_equivalent"] is True assert len(data["pairs"]) == 50 assert data["old_count"] == 50 assert data["new_count"] == 50 def test_50_commits_mixed(self, tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) base = _commit(root, {"base.py": b"base\n"}, branch="main") old_id = base new_id = base # First 25: identical for i in range(25): content = f"v = {i}\n".encode() old_id = _commit(root, {f"f{i}.py": content}, branch="old", parent_id=old_id) new_id = _commit(root, {f"f{i}.py": content}, branch="new", parent_id=new_id) # Next 25: different content for i in range(25, 50): old_id = _commit(root, {f"f{i}.py": f"old_{i}\n".encode()}, branch="old", parent_id=old_id) new_id = _commit(root, {f"f{i}.py": f"new_{i}\n".encode()}, branch="new", parent_id=new_id) # New has 10 extra commits for i in range(50, 60): new_id = _commit(root, {f"extra{i}.py": b"extra\n"}, branch="new", parent_id=new_id) result = _invoke(root, 
f"{base}..old", f"{base}..new", "--json") assert result.exit_code != 0 data = json.loads(result.stdout) equivalent = [p for p in data["pairs"] if p["status"] == "equivalent"] assert len(equivalent) == 25 added = [p for p in data["pairs"] if p["status"] == "added"] assert len(added) == 10 assert data["old_count"] == 50 assert data["new_count"] == 60 # --------------------------------------------------------------------------- # TestRegisterFlags — argparse-level verification # --------------------------------------------------------------------------- class TestRegisterFlags: """Verify that register() wires --json / -j correctly.""" def _make_parser(self) -> "argparse.ArgumentParser": import argparse from muse.cli.commands.range_diff import register ap = argparse.ArgumentParser() subs = ap.add_subparsers() register(subs) return ap def test_json_flag_long(self) -> None: ns = self._make_parser().parse_args(["range-diff", "main..feat/x", "main..feat/y", "--json"]) assert ns.json_out is True def test_j_alias(self) -> None: ns = self._make_parser().parse_args(["range-diff", "main..feat/x", "main..feat/y", "-j"]) assert ns.json_out is True def test_default_is_text(self) -> None: ns = self._make_parser().parse_args(["range-diff", "main..feat/x", "main..feat/y"]) assert ns.json_out is False def test_dest_is_json_out(self) -> None: ns = self._make_parser().parse_args(["range-diff", "main..feat/x", "main..feat/y", "-j"]) assert hasattr(ns, "json_out") assert not hasattr(ns, "fmt")