"""Tests for ``muse mv`` — tracked-file move with staging. Coverage tiers: - Unit: _resolve_source, _resolve_dest, _get_source_object_id helpers - Integration: basic move (disk + stage), --dry-run, --force, move-into-dir, directory move, staged-only-file move, --json output - End-to-end: full CLI via CliRunner - Security: path traversal in source/dest rejected, outside-repo paths - Edge cases: source not tracked, dest already tracked, dest exists on disk - Stress: 200-file repo, move half """ from __future__ import annotations from collections.abc import Mapping import datetime import json import pathlib import pytest from tests.cli_test_helper import CliRunner from muse.core.object_store import write_object from muse.core.ids import hash_commit, hash_snapshot from muse.core.commits import ( CommitRecord, write_commit, ) from muse.core.snapshots import ( SnapshotRecord, write_snapshot, ) from muse.core.types import Manifest, blob_id from muse.plugins.code.stage import StagedFileMap, make_entry, read_stage, write_stage from muse.core.paths import heads_dir, muse_dir runner = CliRunner() _REPO_ID = "mv-test" # --------------------------------------------------------------------------- # Bootstrap helpers (same pattern as test_cmd_rm) # --------------------------------------------------------------------------- _counter = 0 def _init_repo(path: pathlib.Path) -> pathlib.Path: dot_muse = muse_dir(path) for d in ("commits", "snapshots", "objects", "refs/heads", "code"): (dot_muse / d).mkdir(parents=True, exist_ok=True) (dot_muse / "HEAD").write_text("ref: refs/heads/main", encoding="utf-8") (dot_muse / "repo.json").write_text( json.dumps({"repo_id": _REPO_ID, "domain": "code"}), encoding="utf-8" ) return path def _env(repo: pathlib.Path) -> Mapping[str, str]: return {"MUSE_REPO_ROOT": str(repo)} def _commit_files(root: pathlib.Path, files: Mapping[str, bytes]) -> str: """Write *files* to disk + object store; create and record a commit.""" global _counter _counter += 1 manifest: Manifest = {} for rel_path, content in files.items(): obj_id = blob_id(content) write_object(root, obj_id, content) manifest[rel_path] = obj_id abs_path = root / rel_path abs_path.parent.mkdir(parents=True, exist_ok=True) abs_path.write_bytes(content) snap_id = hash_snapshot(manifest) write_snapshot(root, SnapshotRecord(snapshot_id=snap_id, manifest=manifest)) committed_at = datetime.datetime.now(datetime.timezone.utc) commit_id = hash_commit( parent_ids=[], snapshot_id=snap_id, message=f"commit {_counter}", committed_at_iso=committed_at.isoformat(), ) write_commit( root, CommitRecord( commit_id=commit_id, branch="main", snapshot_id=snap_id, message=f"commit {_counter}", committed_at=committed_at, ), ) (heads_dir(root) / "main").write_text(commit_id, encoding="utf-8") return commit_id def _invoke(repo: pathlib.Path, *args: str) -> "InvokeResult": from muse.cli.app import main as cli return runner.invoke(cli, ["mv", *args], env=_env(repo)) # --------------------------------------------------------------------------- # help # --------------------------------------------------------------------------- def test_mv_help() -> None: from muse.cli.app import main as cli result = runner.invoke(cli, ["mv", "--help"]) assert result.exit_code == 0 assert "mv" in result.output # --------------------------------------------------------------------------- # Unit — internal helpers # --------------------------------------------------------------------------- def test_resolve_source_returns_relative_posix(tmp_path: pathlib.Path) -> None: from muse.cli.commands.mv import _resolve_path root = _init_repo(tmp_path) _commit_files(root, {"alpha.py": b"# a\n"}) rel = _resolve_path(root, "alpha.py") assert rel == "alpha.py" def test_resolve_path_rejects_traversal(tmp_path: pathlib.Path) -> None: from muse.cli.commands.mv import _resolve_path import sys root = _init_repo(tmp_path) with pytest.raises(SystemExit) as exc_info: _resolve_path(root, "../../../etc/passwd") assert exc_info.value.code != 0 def test_get_source_object_id_from_head(tmp_path: pathlib.Path) -> None: from muse.cli.commands.mv import _get_source_object_id root = _init_repo(tmp_path) content = b"hello world\n" _commit_files(root, {"src.py": content}) stage: StagedFileMap = {} head_manifest: Manifest = {"src.py": blob_id(content)} obj_id = _get_source_object_id("src.py", head_manifest, stage) assert obj_id == blob_id(content) def test_get_source_object_id_prefers_stage(tmp_path: pathlib.Path) -> None: """Staged object_id takes precedence over HEAD manifest.""" from muse.cli.commands.mv import _get_source_object_id root = _init_repo(tmp_path) old_content = b"old\n" new_content = b"new\n" head_manifest: Manifest = {"src.py": blob_id(old_content)} stage: StagedFileMap = {"src.py": make_entry(blob_id(new_content), "M")} obj_id = _get_source_object_id("src.py", head_manifest, stage) assert obj_id == blob_id(new_content) # --------------------------------------------------------------------------- # Integration — basic move # --------------------------------------------------------------------------- def test_mv_renames_file_on_disk(tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) _commit_files(root, {"alpha.py": b"# alpha\n"}) result = _invoke(root, "alpha.py", "beta.py") assert result.exit_code == 0 assert not (root / "alpha.py").exists() assert (root / "beta.py").exists() assert (root / "beta.py").read_bytes() == b"# alpha\n" def test_mv_stages_source_as_deleted(tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) _commit_files(root, {"alpha.py": b"# a\n"}) _invoke(root, "alpha.py", "beta.py") stage = read_stage(root) assert "alpha.py" in stage assert stage["alpha.py"]["mode"] == "D" def test_mv_stages_dest_as_added(tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) content = b"# a\n" _commit_files(root, {"alpha.py": content}) _invoke(root, "alpha.py", "beta.py") stage = read_stage(root) assert "beta.py" in stage assert stage["beta.py"]["mode"] == "A" def test_mv_object_id_preserved(tmp_path: pathlib.Path) -> None: """The dest entry must share the source's object_id — content unchanged.""" root = _init_repo(tmp_path) content = b"# source content\n" obj_id = blob_id(content) _commit_files(root, {"alpha.py": content}) _invoke(root, "alpha.py", "beta.py") stage = read_stage(root) assert stage["beta.py"]["object_id"] == obj_id def test_mv_exit_code_zero_on_success(tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) _commit_files(root, {"a.py": b"# a\n"}) result = _invoke(root, "a.py", "b.py") assert result.exit_code == 0 def test_mv_prints_rename_line(tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) _commit_files(root, {"a.py": b"# a\n"}) result = _invoke(root, "a.py", "b.py") assert "a.py" in result.output assert "b.py" in result.output # --------------------------------------------------------------------------- # Integration — --json # --------------------------------------------------------------------------- def test_mv_json_output_structure(tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) content = b"# j\n" _commit_files(root, {"j.py": content}) result = _invoke(root, "--json", "j.py", "k.py") assert result.exit_code == 0 data = json.loads(result.stdout) assert data["source"] == "j.py" assert data["dest"] == "k.py" assert data["status"] in ("moved", "dry_run") assert data["dry_run"] is False assert data["object_id"] == blob_id(content) # --------------------------------------------------------------------------- # Integration — --dry-run # --------------------------------------------------------------------------- def test_mv_dry_run_no_disk_change(tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) _commit_files(root, {"alpha.py": b"# a\n"}) result = _invoke(root, "--dry-run", "alpha.py", "beta.py") assert result.exit_code == 0 assert (root / "alpha.py").exists() assert not (root / "beta.py").exists() def test_mv_dry_run_no_stage_change(tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) _commit_files(root, {"alpha.py": b"# a\n"}) _invoke(root, "--dry-run", "alpha.py", "beta.py") stage = read_stage(root) assert "alpha.py" not in stage assert "beta.py" not in stage def test_mv_dry_run_json(tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) _commit_files(root, {"alpha.py": b"# a\n"}) result = _invoke(root, "--dry-run", "--json", "alpha.py", "beta.py") assert result.exit_code == 0 data = json.loads(result.stdout) assert data["dry_run"] is True assert data["status"] == "dry_run" # --------------------------------------------------------------------------- # Integration — error conditions # --------------------------------------------------------------------------- def test_mv_source_not_tracked_exits_nonzero(tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) _commit_files(root, {"other.py": b"# o\n"}) (root / "ghost.py").write_text("# untracked\n") result = _invoke(root, "ghost.py", "dest.py") assert result.exit_code != 0 def test_mv_source_not_on_disk_exits_nonzero(tmp_path: pathlib.Path) -> None: """Source tracked in HEAD but deleted from disk → error unless --force.""" root = _init_repo(tmp_path) _commit_files(root, {"missing.py": b"# m\n"}) (root / "missing.py").unlink() result = _invoke(root, "missing.py", "dest.py") assert result.exit_code != 0 def test_mv_dest_already_tracked_exits_nonzero(tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) _commit_files(root, {"src.py": b"# src\n", "dst.py": b"# dst\n"}) result = _invoke(root, "src.py", "dst.py") assert result.exit_code != 0 def test_mv_dest_exists_on_disk_exits_nonzero(tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) _commit_files(root, {"src.py": b"# src\n"}) (root / "dst.py").write_text("# untracked but on disk\n") result = _invoke(root, "src.py", "dst.py") assert result.exit_code != 0 # --------------------------------------------------------------------------- # Integration — --force # --------------------------------------------------------------------------- def test_mv_force_allows_tracked_dest(tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) _commit_files(root, {"src.py": b"# src\n", "dst.py": b"# dst\n"}) result = _invoke(root, "--force", "src.py", "dst.py") assert result.exit_code == 0 stage = read_stage(root) assert "src.py" in stage and stage["src.py"]["mode"] == "D" assert "dst.py" in stage and stage["dst.py"]["mode"] in ("A", "M") def test_mv_force_allows_untracked_dest_on_disk(tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) _commit_files(root, {"src.py": b"# src\n"}) (root / "dst.py").write_text("# untracked\n") result = _invoke(root, "--force", "src.py", "dst.py") assert result.exit_code == 0 assert not (root / "src.py").exists() assert (root / "dst.py").read_bytes() == b"# src\n" # --------------------------------------------------------------------------- # Integration — move into directory # --------------------------------------------------------------------------- def test_mv_into_existing_directory(tmp_path: pathlib.Path) -> None: """mv file.py dir/ moves to dir/file.py when dir/ exists.""" root = _init_repo(tmp_path) _commit_files(root, {"alpha.py": b"# a\n"}) (root / "subdir").mkdir() result = _invoke(root, "alpha.py", "subdir/") assert result.exit_code == 0 assert (root / "subdir" / "alpha.py").exists() stage = read_stage(root) assert "subdir/alpha.py" in stage assert stage["subdir/alpha.py"]["mode"] == "A" assert stage["alpha.py"]["mode"] == "D" def test_mv_into_directory_preserves_object_id(tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) content = b"# content\n" _commit_files(root, {"f.py": content}) (root / "pkg").mkdir() _invoke(root, "f.py", "pkg/") stage = read_stage(root) assert stage["pkg/f.py"]["object_id"] == blob_id(content) # --------------------------------------------------------------------------- # Integration — staged-only file (never committed) # --------------------------------------------------------------------------- def test_mv_staged_only_source_updates_stage(tmp_path: pathlib.Path) -> None: """A file staged as 'A' (never committed) is moved: stage entry replaced.""" root = _init_repo(tmp_path) # Repo has at least one commit so HEAD exists _commit_files(root, {"anchor.py": b"# anchor\n"}) # Stage a new file that has never been committed content = b"# new file\n" obj_id = blob_id(content) write_object(root, obj_id, content) (root / "new.py").write_bytes(content) stage = read_stage(root) stage["new.py"] = make_entry(obj_id, "A") write_stage(root, stage) result = _invoke(root, "new.py", "renamed.py") assert result.exit_code == 0 stage_after = read_stage(root) # "new.py" must be gone from stage (was never committed → no "D" entry) assert "new.py" not in stage_after assert "renamed.py" in stage_after assert stage_after["renamed.py"]["mode"] == "A" assert stage_after["renamed.py"]["object_id"] == obj_id # --------------------------------------------------------------------------- # Integration — subdirectory paths # --------------------------------------------------------------------------- def test_mv_across_subdirectories(tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) (root / "src").mkdir() (root / "lib").mkdir() _commit_files(root, {"src/module.py": b"# m\n"}) result = _invoke(root, "src/module.py", "lib/module.py") assert result.exit_code == 0 assert not (root / "src" / "module.py").exists() assert (root / "lib" / "module.py").exists() stage = read_stage(root) assert stage["src/module.py"]["mode"] == "D" assert stage["lib/module.py"]["mode"] == "A" # --------------------------------------------------------------------------- # Security # --------------------------------------------------------------------------- def test_mv_source_path_traversal_rejected(tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) _commit_files(root, {"anchor.py": b"# a\n"}) result = _invoke(root, "../../../etc/passwd", "dest.py") assert result.exit_code != 0 def test_mv_dest_path_traversal_rejected(tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) _commit_files(root, {"src.py": b"# s\n"}) result = _invoke(root, "src.py", "../../../tmp/malicious.py") assert result.exit_code != 0 def test_mv_outside_repo_source_rejected(tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path / "repo") _commit_files(root, {"anchor.py": b"# a\n"}) outside = tmp_path / "outside.py" outside.write_text("# outside\n") result = _invoke(root, str(outside), "dest.py") assert result.exit_code != 0 # --------------------------------------------------------------------------- # Stress # --------------------------------------------------------------------------- def test_mv_stress_200_files_move_half(tmp_path: pathlib.Path) -> None: """Move 100 of 200 tracked files; all stage entries must be consistent.""" root = _init_repo(tmp_path) files = {f"file_{i}.py": f"# {i}\n".encode() for i in range(200)} _commit_files(root, files) (root / "dest").mkdir() for i in range(0, 200, 2): # move even-numbered files result = _invoke(root, f"file_{i}.py", f"dest/file_{i}.py") assert result.exit_code == 0, f"Move failed for file_{i}.py: {result.output}" stage = read_stage(root) for i in range(0, 200, 2): assert stage[f"file_{i}.py"]["mode"] == "D" assert stage[f"dest/file_{i}.py"]["mode"] == "A" # Odd-numbered files untouched for i in range(1, 200, 2): assert f"file_{i}.py" not in stage class TestRegisterFlags: def test_default_json_out_is_false(self) -> None: import argparse from muse.cli.commands.mv import register p = argparse.ArgumentParser() subs = p.add_subparsers() register(subs) args = p.parse_args(["mv", "a.py", "b.py"]) assert args.json_out is False def test_json_flag_sets_json_out(self) -> None: import argparse from muse.cli.commands.mv import register p = argparse.ArgumentParser() subs = p.add_subparsers() register(subs) args = p.parse_args(["mv", "a.py", "b.py", "--json"]) assert args.json_out is True def test_j_shorthand_sets_json_out(self) -> None: import argparse from muse.cli.commands.mv import register p = argparse.ArgumentParser() subs = p.add_subparsers() register(subs) args = p.parse_args(["mv", "a.py", "b.py", "-j"]) assert args.json_out is True