"""Tests for ``muse merge-tree`` — three-way merge without working tree modification. Coverage tiers: - Unit: clean merge (no conflicts), conflicting merge, trivial merge (one branch unchanged), explicit --base override, JSON schema, working-tree isolation (no files written without --write-objects) - Integration: merge-tree result matches actual muse merge; --write-objects creates snapshot; text output; nonexistent branch exits nonzero; batch conflict reporting; same-base same-result determinism - Security: ANSI in branch name rejected; no working-tree mutation - Stress: 50-file merge with 30% conflicts; manifest cache (3 snapshot reads) """ from __future__ import annotations from collections.abc import Mapping import datetime import json import pathlib import pytest from tests.cli_test_helper import CliRunner, InvokeResult from muse.core.object_store import write_object from muse.core.ids import hash_commit, hash_snapshot from muse.core.commits import ( CommitRecord, write_commit, ) from muse.core.snapshots import ( SnapshotRecord, write_snapshot, ) from muse.core.types import Manifest, blob_id from muse.core.paths import merge_state_path, muse_dir, ref_path runner = CliRunner() _REPO_ID = "merge-tree-test" _counter = 0 # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _init_repo(path: pathlib.Path) -> pathlib.Path: dot_muse = muse_dir(path) for d in ("commits", "snapshots", "objects", "refs/heads", "code"): (dot_muse / d).mkdir(parents=True, exist_ok=True) (dot_muse / "HEAD").write_text("ref: refs/heads/main", encoding="utf-8") (dot_muse / "repo.json").write_text( json.dumps({"repo_id": _REPO_ID, "domain": "code"}), encoding="utf-8" ) return path def _env(repo: pathlib.Path) -> Mapping[str, str]: return {"MUSE_REPO_ROOT": str(repo)} def _write_files(root: pathlib.Path, files: Mapping[str, bytes]) -> Manifest: manifest: Manifest = {} for rel_path, content in files.items(): obj_id = blob_id(content) write_object(root, obj_id, content) manifest[rel_path] = obj_id abs_path = root / rel_path abs_path.parent.mkdir(parents=True, exist_ok=True) abs_path.write_bytes(content) return manifest def _commit( root: pathlib.Path, files: Mapping[str, bytes], branch: str = "main", parent_id: str | None = None, message: str | None = None, ) -> str: global _counter _counter += 1 manifest = _write_files(root, files) snap_id = hash_snapshot(manifest) write_snapshot(root, SnapshotRecord(snapshot_id=snap_id, manifest=manifest)) committed_at = datetime.datetime.now(datetime.timezone.utc) msg = message or f"commit {_counter}" commit_id = hash_commit( parent_ids=[parent_id] if parent_id else [], snapshot_id=snap_id, message=msg, committed_at_iso=committed_at.isoformat(), ) write_commit(root, CommitRecord( commit_id=commit_id, branch=branch, snapshot_id=snap_id, message=msg, committed_at=committed_at, parent_commit_id=parent_id, )) branch_ref = ref_path(root, branch) branch_ref.parent.mkdir(parents=True, exist_ok=True) branch_ref.write_text(commit_id, encoding="utf-8") return commit_id def _invoke(repo: pathlib.Path, *args: str) -> InvokeResult: from muse.cli.app import main as cli return runner.invoke(cli, ["merge-tree", *args], env=_env(repo)) def _setup_divergent_repo(root: pathlib.Path) -> tuple[str, str, str]: """Create a base commit then two branches with non-overlapping changes. Returns (base_id, branch_a_id, branch_b_id). """ base_id = _commit(root, { "shared.py": b"x = 0\n", "only_base.py": b"base\n", }, branch="main") a_id = _commit(root, { "shared.py": b"x = 0\n", "only_base.py": b"base\n", "a_new.py": b"a = 1\n", }, branch="branch-a", parent_id=base_id) b_id = _commit(root, { "shared.py": b"x = 0\n", "only_base.py": b"base\n", "b_new.py": b"b = 2\n", }, branch="branch-b", parent_id=base_id) return base_id, a_id, b_id def _setup_conflicting_repo(root: pathlib.Path) -> tuple[str, str, str]: """Create a base commit then two branches that both modify shared.py differently.""" base_id = _commit(root, {"shared.py": b"x = 0\n"}, branch="main") a_id = _commit(root, {"shared.py": b"x = 1\n"}, branch="branch-a", parent_id=base_id) b_id = _commit(root, {"shared.py": b"x = 2\n"}, branch="branch-b", parent_id=base_id) return base_id, a_id, b_id # --------------------------------------------------------------------------- # Unit — clean merge (no conflicts) # --------------------------------------------------------------------------- def test_clean_merge_exits_zero(tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) _setup_divergent_repo(root) result = _invoke(root, "branch-a", "branch-b", "--json") assert result.exit_code == 0 def test_clean_merge_json_schema(tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) _setup_divergent_repo(root) result = _invoke(root, "branch-a", "branch-b", "--json") assert result.exit_code == 0 data = json.loads(result.stdout) for key in ("base", "branch1", "branch2", "conflicts", "merged_manifest", "trivially_merged"): assert key in data, f"missing key: {key}" def test_clean_merge_no_conflicts(tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) _setup_divergent_repo(root) result = _invoke(root, "branch-a", "branch-b", "--json") data = json.loads(result.stdout) assert data["conflicts"] == [] assert data["trivially_merged"] is True def test_clean_merge_manifest_contains_both_changes(tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) _setup_divergent_repo(root) result = _invoke(root, "branch-a", "branch-b", "--json") data = json.loads(result.stdout) manifest = data["merged_manifest"] assert "a_new.py" in manifest assert "b_new.py" in manifest assert manifest["a_new.py"] is not None assert manifest["b_new.py"] is not None # --------------------------------------------------------------------------- # Unit — conflicting merge # --------------------------------------------------------------------------- def test_conflicting_merge_exits_nonzero(tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) _setup_conflicting_repo(root) result = _invoke(root, "branch-a", "branch-b", "--json") assert result.exit_code != 0 def test_conflicting_merge_reports_conflict_paths(tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) _setup_conflicting_repo(root) result = _invoke(root, "branch-a", "branch-b", "--json") data = json.loads(result.stdout) assert "shared.py" in data["conflicts"] assert data["trivially_merged"] is False def test_conflicting_merge_manifest_has_null_for_conflicts(tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) _setup_conflicting_repo(root) result = _invoke(root, "branch-a", "branch-b", "--json") data = json.loads(result.stdout) assert data["merged_manifest"]["shared.py"] is None # --------------------------------------------------------------------------- # Unit — trivial merge (one branch unchanged) # --------------------------------------------------------------------------- def test_trivial_merge_one_side_unchanged(tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) base_id = _commit(root, {"a.py": b"x=1\n"}, branch="main") # branch-a has a new file; branch-b is identical to base _commit(root, {"a.py": b"x=1\n", "new.py": b"y=2\n"}, branch="branch-a", parent_id=base_id) _commit(root, {"a.py": b"x=1\n"}, branch="branch-b", parent_id=base_id) result = _invoke(root, "branch-a", "branch-b", "--json") data = json.loads(result.stdout) assert data["conflicts"] == [] assert "new.py" in data["merged_manifest"] # --------------------------------------------------------------------------- # Unit — explicit --base override # --------------------------------------------------------------------------- def test_explicit_base_override(tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) base_id = _commit(root, {"a.py": b"v0\n"}, branch="main") a_id = _commit(root, {"a.py": b"v1\n"}, branch="branch-a", parent_id=base_id) b_id = _commit(root, {"a.py": b"v2\n"}, branch="branch-b", parent_id=base_id) # With explicit base the merge still uses the same base → same result result = _invoke(root, "branch-a", "branch-b", "--base", base_id, "--json") data = json.loads(result.stdout) assert data["base"] == base_id def test_explicit_base_nonexistent_exits_nonzero(tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) base_id = _commit(root, {"a.py": b"v0\n"}, branch="main") a_id = _commit(root, {"a.py": b"v1\n"}, branch="branch-a", parent_id=base_id) b_id = _commit(root, {"a.py": b"v2\n"}, branch="branch-b", parent_id=base_id) bad_base = "a" * 64 result = _invoke(root, "branch-a", "branch-b", "--base", bad_base, "--json") assert result.exit_code != 0 # --------------------------------------------------------------------------- # Unit — working-tree isolation # --------------------------------------------------------------------------- def test_no_working_tree_mutation(tmp_path: pathlib.Path) -> None: """merge-tree must never write to the working tree.""" root = _init_repo(tmp_path) _setup_divergent_repo(root) # Record working-tree state before before = {p.name for p in root.iterdir() if not p.name.startswith(".")} _invoke(root, "branch-a", "branch-b", "--json") after = {p.name for p in root.iterdir() if not p.name.startswith(".")} assert before == after def test_no_merge_state_written(tmp_path: pathlib.Path) -> None: """merge-tree must not write MERGE_STATE.json.""" root = _init_repo(tmp_path) _setup_conflicting_repo(root) _invoke(root, "branch-a", "branch-b", "--json") assert not (merge_state_path(root)).exists() # --------------------------------------------------------------------------- # Integration — --write-objects # --------------------------------------------------------------------------- def test_write_objects_creates_snapshot(tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) _setup_divergent_repo(root) result = _invoke(root, "branch-a", "branch-b", "--write-objects", "--json") assert result.exit_code == 0 data = json.loads(result.stdout) assert "snapshot_id" in data snap_id = data["snapshot_id"] # Snapshot must exist in the unified object store from muse.core.object_store import object_path assert object_path(root, snap_id).exists() def test_write_objects_snapshot_matches_manifest(tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) _setup_divergent_repo(root) result = _invoke(root, "branch-a", "branch-b", "--write-objects", "--json") data = json.loads(result.stdout) from muse.core.snapshots import read_snapshot snap = read_snapshot(root, data["snapshot_id"]) assert snap is not None # Manifest in snapshot matches non-null entries in merged_manifest for path, oid in data["merged_manifest"].items(): if oid is not None: assert snap.manifest.get(path) == oid def test_write_objects_not_default(tmp_path: pathlib.Path) -> None: """Without --write-objects no snapshot_id is returned.""" root = _init_repo(tmp_path) _setup_divergent_repo(root) result = _invoke(root, "branch-a", "branch-b", "--json") data = json.loads(result.stdout) assert "snapshot_id" not in data # --------------------------------------------------------------------------- # Integration — text output # --------------------------------------------------------------------------- def test_text_output_shows_branch_names(tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) _setup_divergent_repo(root) result = _invoke(root, "branch-a", "branch-b") assert result.exit_code == 0 assert "branch-a" in result.stdout or "a_new.py" in result.stdout def test_text_output_conflict_mentions_path(tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) _setup_conflicting_repo(root) result = _invoke(root, "branch-a", "branch-b") assert "shared.py" in result.stdout # --------------------------------------------------------------------------- # Integration — error cases # --------------------------------------------------------------------------- def test_nonexistent_branch_exits_nonzero(tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) _commit(root, {"a.py": b"x\n"}, branch="main") result = _invoke(root, "main", "no-such-branch", "--json") assert result.exit_code != 0 def test_both_branches_nonexistent_exits_nonzero(tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) result = _invoke(root, "ghost-a", "ghost-b", "--json") assert result.exit_code != 0 def test_no_common_ancestor_exits_nonzero(tmp_path: pathlib.Path) -> None: """Branches with no shared history cannot be merge-treed.""" root = _init_repo(tmp_path) # Two independent root commits — no shared ancestor _commit(root, {"x.py": b"x\n"}, branch="orphan-a") _commit(root, {"y.py": b"y\n"}, branch="orphan-b") result = _invoke(root, "orphan-a", "orphan-b", "--json") assert result.exit_code != 0 # --------------------------------------------------------------------------- # Integration — determinism # --------------------------------------------------------------------------- def test_same_inputs_same_output(tmp_path: pathlib.Path) -> None: """merge-tree is pure — same inputs must produce identical output (excluding timing).""" root = _init_repo(tmp_path) _setup_divergent_repo(root) r1 = _invoke(root, "branch-a", "branch-b", "--json") r2 = _invoke(root, "branch-a", "branch-b", "--json") _TIMING_KEYS = {"duration_ms", "timestamp"} d1 = {k: v for k, v in json.loads(r1.stdout).items() if k not in _TIMING_KEYS} d2 = {k: v for k, v in json.loads(r2.stdout).items() if k not in _TIMING_KEYS} assert d1 == d2 def test_branch_order_does_not_affect_conflict_detection(tmp_path: pathlib.Path) -> None: """Conflicts must be detected regardless of argument order.""" root = _init_repo(tmp_path) _setup_conflicting_repo(root) r1 = _invoke(root, "branch-a", "branch-b", "--json") r2 = _invoke(root, "branch-b", "branch-a", "--json") d1 = json.loads(r1.stdout) d2 = json.loads(r2.stdout) assert set(d1["conflicts"]) == set(d2["conflicts"]) # --------------------------------------------------------------------------- # Security # --------------------------------------------------------------------------- def test_ansi_in_branch_name_rejected(tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) result = _invoke(root, "\x1b[31mbad\x1b[0m", "main") assert result.exit_code != 0 def test_ansi_in_second_branch_name_rejected(tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) result = _invoke(root, "main", "\x1b[31mbad\x1b[0m") assert result.exit_code != 0 # --------------------------------------------------------------------------- # Stress — 50 files, 30% conflict rate # --------------------------------------------------------------------------- def test_stress_50_files_30_pct_conflicts(tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) n = 50 conflict_count = int(n * 0.3) # 15 conflict files base_files = {f"f{i}.py": f"v = {i}\n".encode() for i in range(n)} base_id = _commit(root, base_files, branch="main") # branch-a: modify first conflict_count files + add a_extra.py a_files = dict(base_files) for i in range(conflict_count): a_files[f"f{i}.py"] = f"v = {i}_a\n".encode() a_files["a_extra.py"] = b"a = 1\n" a_id = _commit(root, a_files, branch="stress-a", parent_id=base_id) # branch-b: modify same conflict_count files differently + add b_extra.py b_files = dict(base_files) for i in range(conflict_count): b_files[f"f{i}.py"] = f"v = {i}_b\n".encode() b_files["b_extra.py"] = b"b = 2\n" b_id = _commit(root, b_files, branch="stress-b", parent_id=base_id) result = _invoke(root, "stress-a", "stress-b", "--json") assert result.exit_code != 0 # has conflicts data = json.loads(result.stdout) assert len(data["conflicts"]) == conflict_count # Non-conflict extra files should be in merged manifest assert "a_extra.py" in data["merged_manifest"] assert "b_extra.py" in data["merged_manifest"] # Unchanged files should be in merged manifest for i in range(conflict_count, n): assert f"f{i}.py" in data["merged_manifest"] class TestRegisterFlags: def test_default_json_out_is_false(self) -> None: import argparse from muse.cli.commands.merge_tree import register p = argparse.ArgumentParser() subs = p.add_subparsers() register(subs) args = p.parse_args(["merge-tree", "main", "dev"]) assert args.json_out is False def test_json_flag_sets_json_out(self) -> None: import argparse from muse.cli.commands.merge_tree import register p = argparse.ArgumentParser() subs = p.add_subparsers() register(subs) args = p.parse_args(["merge-tree", "main", "dev", "--json"]) assert args.json_out is True def test_j_shorthand_sets_json_out(self) -> None: import argparse from muse.cli.commands.merge_tree import register p = argparse.ArgumentParser() subs = p.add_subparsers() register(subs) args = p.parse_args(["merge-tree", "main", "dev", "-j"]) assert args.json_out is True