"""Hardening tests for ``muse clone``. Coverage -------- Unit - _infer_dir_name: normal URL, trailing slash, query/fragment stripped, path-traversal blocked, bare host, dot/double-dot fallback - _init_muse_dir: all _CLONE_SUBDIRS created, repo.json written, HEAD set, config.toml written, tags/ regression, superset of init subdirs - _restore_working_tree: commit None warns, snapshot None warns, happy path Integration (mocked transport — uses fetch_mpack / on_object callback) - already-exists guard raises USER_ERROR - signing identity forwarded to fetch_remote_info and fetch_mpack - domain fallback to "code" (not "midi") - branch fallback to first available when requested branch is missing - commits_received comes from apply_result["commits_written"], not mpack - --no-checkout skips apply_manifest - branch refs written for every remote branch - target directory removed on fetch_mpack failure - target directory removed on _init_muse_dir failure Security - ANSI injection in URL stripped in stderr output - ANSI injection in branch name stripped - Path-traversal URL blocked by _infer_dir_name - All progress/error messages go to stderr, not stdout - already-exists error on stderr, stdout empty E2E (via CliRunner) - --dry-run exits 0, no filesystem changes - --dry-run --json correct schema - --json cloned schema correct (all keys present, blobs_written from on_object) - --format json equivalent to --json - --no-checkout flag accepted - transport error exits non-zero - empty repository exits non-zero Performance / Stress - 8 concurrent clones into isolated directories do not interfere """ from __future__ import annotations import json import pathlib import threading from collections.abc import Callable from typing import TYPE_CHECKING from unittest.mock import MagicMock, patch import pytest from tests.cli_test_helper import CliRunner, InvokeResult from muse.core.types import Manifest, blob_id if TYPE_CHECKING: from muse.cli.commands.clone import _CloneJson from muse.core.mpack import ApplyResult, RemoteInfo from muse.core.transport import FetchMPackResult, SigningIdentity cli = None runner = CliRunner() from muse.core.types import long_id COMMIT_ID = long_id("a" * 64) SNAP_ID = long_id("b" * 64) REPO_ID = "test-repo-id" # ── typed helpers ───────────────────────────────────────────────────────────── def _make_apply_result( commits_written: int = 5, blobs_written: int = 11, ) -> "ApplyResult": from muse.core.mpack import ApplyResult return ApplyResult( commits_written=commits_written, snapshots_written=commits_written, blobs_written=blobs_written, blobs_skipped=0, tags_written=0, failed_blobs=[], skipped_snapshots=[], ) def _make_remote_info( branch_heads: Manifest | None = None, domain: str = "code", default_branch: str = "main", repo_id: str = REPO_ID, ) -> "RemoteInfo": effective_heads: Manifest = ( {"main": COMMIT_ID} if branch_heads is None else branch_heads ) return { "repo_id": repo_id, "domain": domain, "default_branch": default_branch, "branch_heads": effective_heads, } def _make_fetch_mpack_result( commits: list[str] | None = None, snapshots: list[str] | None = None, blobs_received: int = 0, ) -> "FetchMPackResult": return { "commits": commits or [], "snapshots": snapshots or [], "blobs_received": blobs_received, } def _make_transport_mock( branch_heads: Manifest | None = None, domain: str = "code", objects_count: int = 11, ) -> MagicMock: """Return a transport mock whose fetch_mpack dispatches objects via on_object.""" t = MagicMock() t.fetch_remote_info.return_value = _make_remote_info(branch_heads, domain=domain) def _fetch_mpack(url: str, signing: "SigningIdentity | None", want: list[str], have: list[str], on_object: Callable[..., None] | None = None, **kwargs: str) -> "FetchMPackResult": if callable(on_object): for i in range(objects_count): content = f"fake-blob-{i}".encode() oid = blob_id(content) on_object({"object_id": oid, "content": content, "path": f"file{i}.txt"}) return _make_fetch_mpack_result(blobs_received=objects_count) t.fetch_mpack.side_effect = _fetch_mpack return t def _json_line(result: InvokeResult) -> "_CloneJson": for line in result.output.splitlines(): stripped = line.strip() if stripped.startswith("{"): parsed: _CloneJson = json.loads(stripped) return parsed raise ValueError(f"No JSON line in output:\n{result.output!r}") # ── Unit: _infer_dir_name ───────────────────────────────────────────────────── class TestInferDirName: def test_last_url_segment(self) -> None: from muse.cli.commands.clone import _infer_dir_name assert _infer_dir_name("http://hub.muse.ai/gabriel/my-repo") == "my-repo" def test_trailing_slash_stripped(self) -> None: from muse.cli.commands.clone import _infer_dir_name assert _infer_dir_name("http://hub.muse.ai/gabriel/my-repo/") == "my-repo" def test_query_string_stripped(self) -> None: from muse.cli.commands.clone import _infer_dir_name assert _infer_dir_name("http://hub.muse.ai/repo?token=abc") == "repo" def test_fragment_stripped(self) -> None: from muse.cli.commands.clone import _infer_dir_name assert _infer_dir_name("http://hub.muse.ai/repo#section") == "repo" def test_bare_host_uses_hostname(self) -> None: from muse.cli.commands.clone import _infer_dir_name result = _infer_dir_name("http://hub.muse.ai/") assert result == "hub.muse.ai" assert ".." not in result def test_path_traversal_blocked(self) -> None: from muse.cli.commands.clone import _infer_dir_name result = _infer_dir_name("http://attacker.example.com/../../../../etc/passwd") assert ".." not in result assert "/" not in result assert result != "" def test_dot_only_falls_back(self) -> None: from muse.cli.commands.clone import _infer_dir_name result = _infer_dir_name("http://example.com/.") assert result not in (".", "..") def test_double_dot_falls_back(self) -> None: from muse.cli.commands.clone import _infer_dir_name result = _infer_dir_name("http://example.com/..") assert result not in (".", "..") assert result == "muse-repo" # ── Unit: _init_muse_dir ────────────────────────────────────────────────────── class TestInitMuseDir: def test_all_clone_subdirs_created(self, tmp_path: pathlib.Path) -> None: from muse.cli.commands.clone import _CLONE_SUBDIRS, _init_muse_dir _init_muse_dir(tmp_path, REPO_ID, "code", "main") dot_muse = muse_dir(tmp_path) for subdir in _CLONE_SUBDIRS: assert (dot_muse / subdir).is_dir(), f"Missing subdir: {subdir}" def test_tags_dir_created(self, tmp_path: pathlib.Path) -> None: from muse.cli.commands.clone import _init_muse_dir _init_muse_dir(tmp_path, REPO_ID, "code", "main") assert (tags_dir(tmp_path)).is_dir() def test_repo_json_written(self, tmp_path: pathlib.Path) -> None: from muse.cli.commands.clone import _init_muse_dir _init_muse_dir(tmp_path, "my-repo-id", "code", "main") meta = json.loads((repo_json_path(tmp_path)).read_text()) assert meta["repo_id"] == "my-repo-id" assert meta["domain"] == "code" assert "schema_version" in meta assert "created_at" in meta def test_head_file_written(self, tmp_path: pathlib.Path) -> None: from muse.cli.commands.clone import _init_muse_dir _init_muse_dir(tmp_path, REPO_ID, "code", "dev") head = (head_path(tmp_path)).read_text() assert "dev" in head def test_default_branch_ref_created(self, tmp_path: pathlib.Path) -> None: from muse.cli.commands.clone import _init_muse_dir _init_muse_dir(tmp_path, REPO_ID, "code", "main") assert (heads_dir(tmp_path) / "main").exists() def test_config_toml_written(self, tmp_path: pathlib.Path) -> None: from muse.cli.commands.clone import _init_muse_dir _init_muse_dir(tmp_path, REPO_ID, "code", "main") config = (config_toml_path(tmp_path)).read_text() assert "[remotes]" in config def test_superset_of_init_subdirs(self, tmp_path: pathlib.Path) -> None: from muse.cli.commands.clone import _CLONE_SUBDIRS from muse.cli.commands.init import _INIT_SUBDIRS clone_set = set(_CLONE_SUBDIRS) for subdir in _INIT_SUBDIRS: assert subdir in clone_set, ( f"_INIT_SUBDIRS has '{subdir}' but _CLONE_SUBDIRS does not" ) # ── Unit: _restore_working_tree ─────────────────────────────────────────────── class TestRestoreWorkingTree: def test_warns_when_commit_not_found( self, tmp_path: pathlib.Path, caplog: pytest.LogCaptureFixture ) -> None: from muse.cli.commands.clone import _restore_working_tree with patch("muse.cli.commands.clone.read_commit", return_value=None): with patch("muse.cli.commands.clone.apply_manifest") as am: _restore_working_tree(tmp_path, long_id("a" * 64)) am.assert_not_called() def test_warns_when_snapshot_not_found( self, tmp_path: pathlib.Path, caplog: pytest.LogCaptureFixture ) -> None: from muse.cli.commands.clone import _restore_working_tree fake_commit = MagicMock() fake_commit.snapshot_id = SNAP_ID with ( patch("muse.cli.commands.clone.read_commit", return_value=fake_commit), patch("muse.cli.commands.clone.read_snapshot", return_value=None), ): with patch("muse.cli.commands.clone.apply_manifest") as am: _restore_working_tree(tmp_path, COMMIT_ID) am.assert_not_called() def test_happy_path_calls_apply_manifest(self, tmp_path: pathlib.Path) -> None: from muse.cli.commands.clone import _restore_working_tree fake_commit = MagicMock() fake_commit.snapshot_id = SNAP_ID fake_snap = MagicMock() fake_snap.manifest = {"file.txt": "aabbcc"} with ( patch("muse.cli.commands.clone.read_commit", return_value=fake_commit), patch("muse.cli.commands.clone.read_snapshot", return_value=fake_snap), ): with patch("muse.cli.commands.clone.apply_manifest") as am: _restore_working_tree(tmp_path, COMMIT_ID) am.assert_called_once_with(tmp_path, {}, {"file.txt": "aabbcc"}) # ── Integration: run() with mocked transport ────────────────────────────────── def _invoke_run( tmp_path: pathlib.Path, url: str = "http://localhost:19999/gabriel/repo", branch: str | None = None, dry_run: bool = False, no_checkout: bool = False, json_out: bool = False, transport: MagicMock | None = None, apply_result: "ApplyResult | None" = None, signing: "SigningIdentity | None" = None, objects_count: int = 11, ) -> None: import argparse from muse.cli.commands.clone import run t = transport or _make_transport_mock(objects_count=objects_count) ar = apply_result or _make_apply_result() args = argparse.Namespace( url=url, directory=str(tmp_path / "cloned"), branch=branch, dry_run=dry_run, no_checkout=no_checkout, json_out=json_out, ) with ( patch("muse.cli.commands.clone.get_signing_identity", return_value=signing), patch("muse.cli.commands.clone.make_transport", return_value=t), patch("muse.cli.commands.clone.apply_mpack", return_value=ar), patch("muse.cli.commands.clone.set_remote"), patch("muse.cli.commands.clone.set_remote_head"), patch("muse.cli.commands.clone.set_upstream"), patch("muse.cli.commands.clone.apply_manifest"), patch("muse.cli.commands.clone.read_commit", return_value=MagicMock(snapshot_id=SNAP_ID)), patch("muse.cli.commands.clone.read_snapshot", return_value=MagicMock(manifest={})), ): run(args) class TestRunIntegration: def test_already_exists_raises_user_error(self, tmp_path: pathlib.Path) -> None: import argparse from muse.cli.commands.clone import run from muse.core.errors import ExitCode target = tmp_path / "existing" muse_dir(target).mkdir(parents=True) args = argparse.Namespace( url="http://localhost:19999/repo", directory=str(target), branch=None, dry_run=False, no_checkout=False, json_out=False, ) with pytest.raises(SystemExit) as exc: with patch("muse.cli.commands.clone.get_signing_identity", return_value=None): run(args) assert exc.value.code == ExitCode.USER_ERROR def test_signing_forwarded_to_transport(self, tmp_path: pathlib.Path) -> None: """Signing identity must reach both fetch_remote_info and fetch_mpack.""" from muse.core.transport import SigningIdentity from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey signing = SigningIdentity("alice", Ed25519PrivateKey.generate()) t = _make_transport_mock() _invoke_run(tmp_path, transport=t, signing=signing) t.fetch_remote_info.assert_called_once_with( "http://localhost:19999/gabriel/repo", signing=signing ) t.fetch_mpack.assert_called_once() call_kwargs = t.fetch_mpack.call_args # signing is the second positional arg assert call_kwargs.args[1] is signing or call_kwargs.kwargs.get("signing") is signing def test_domain_defaults_to_code_not_midi(self, tmp_path: pathlib.Path) -> None: t = _make_transport_mock() t.fetch_remote_info.return_value = _make_remote_info(domain="") target = tmp_path / "cloned" import argparse from muse.cli.commands.clone import run args = argparse.Namespace( url="http://localhost:19999/repo", directory=str(target), branch=None, dry_run=False, no_checkout=False, json_out=False, ) with ( patch("muse.cli.commands.clone.get_signing_identity", return_value=None), patch("muse.cli.commands.clone.make_transport", return_value=t), patch("muse.cli.commands.clone.apply_mpack", return_value=_make_apply_result()), patch("muse.cli.commands.clone.set_remote"), patch("muse.cli.commands.clone.set_remote_head"), patch("muse.cli.commands.clone.set_upstream"), patch("muse.cli.commands.clone.apply_manifest"), patch("muse.cli.commands.clone.read_commit", return_value=MagicMock(snapshot_id=SNAP_ID)), patch("muse.cli.commands.clone.read_snapshot", return_value=MagicMock(manifest={})), ): run(args) meta = json.loads((repo_json_path(target)).read_text()) assert meta["domain"] == "code", f"Expected 'code', got '{meta['domain']}'" def test_commits_received_from_apply_result_not_bundle( self, tmp_path: pathlib.Path ) -> None: """commits_received in JSON output must come from apply_result['commits_written'].""" output_lines: list[str] = [] import argparse from muse.cli.commands.clone import run ar = _make_apply_result(commits_written=7, blobs_written=23) t = _make_transport_mock(objects_count=0) args = argparse.Namespace( url="http://localhost:19999/repo", directory=str(tmp_path / "cloned"), branch=None, dry_run=False, no_checkout=False, json_out=True, ) with ( patch("muse.cli.commands.clone.get_signing_identity", return_value=None), patch("muse.cli.commands.clone.make_transport", return_value=t), patch("muse.cli.commands.clone.apply_mpack", return_value=ar), patch("muse.cli.commands.clone.set_remote"), patch("muse.cli.commands.clone.set_remote_head"), patch("muse.cli.commands.clone.set_upstream"), patch("muse.cli.commands.clone.apply_manifest"), patch("muse.cli.commands.clone.read_commit", return_value=MagicMock(snapshot_id=SNAP_ID)), patch("muse.cli.commands.clone.read_snapshot", return_value=MagicMock(manifest={})), patch("builtins.print", side_effect=lambda *a, **kw: output_lines.append(str(a[0]) if a else "")), ): run(args) json_out = next((l for l in output_lines if l.strip().startswith("{")), None) assert json_out is not None data: _CloneJson = json.loads(json_out) assert data["commits_received"] == 7 def test_branch_fallback_to_first_available( self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str] ) -> None: import argparse from muse.cli.commands.clone import run t = _make_transport_mock(branch_heads={"dev": COMMIT_ID}) args = argparse.Namespace( url="http://localhost:19999/repo", directory=str(tmp_path / "cloned"), branch="nonexistent", dry_run=False, no_checkout=True, json_out=False, ) with ( patch("muse.cli.commands.clone.get_signing_identity", return_value=None), patch("muse.cli.commands.clone.make_transport", return_value=t), patch("muse.cli.commands.clone.apply_mpack", return_value=_make_apply_result()), patch("muse.cli.commands.clone.set_remote"), patch("muse.cli.commands.clone.set_remote_head"), patch("muse.cli.commands.clone.set_upstream"), ): run(args) assert "nonexistent" in capsys.readouterr().err def test_target_dir_removed_on_fetch_failure( self, tmp_path: pathlib.Path ) -> None: """If fetch_mpack raises TransportError, the target directory is removed.""" from muse.core.transport import TransportError import argparse from muse.cli.commands.clone import run t = _make_transport_mock() t.fetch_mpack.side_effect = TransportError("connection refused", 503) target = tmp_path / "cloned" args = argparse.Namespace( url="http://localhost:19999/repo", directory=str(target), branch=None, dry_run=False, no_checkout=False, json_out=False, ) with ( patch("muse.cli.commands.clone.get_signing_identity", return_value=None), patch("muse.cli.commands.clone.make_transport", return_value=t), pytest.raises(SystemExit), ): run(args) assert not target.exists(), "Target directory must be removed on failure" def test_target_dir_removed_on_init_failure( self, tmp_path: pathlib.Path ) -> None: import argparse from muse.cli.commands.clone import run args = argparse.Namespace( url="http://localhost:19999/repo", directory=str(tmp_path / "cloned"), branch=None, dry_run=False, no_checkout=False, json_out=False, ) with ( patch("muse.cli.commands.clone.get_signing_identity", return_value=None), patch("muse.cli.commands.clone.make_transport", return_value=_make_transport_mock()), patch("muse.cli.commands.clone._init_muse_dir", side_effect=OSError("disk full")), pytest.raises(SystemExit), ): run(args) assert not (tmp_path / "cloned").exists() def test_no_checkout_skips_apply_manifest(self, tmp_path: pathlib.Path) -> None: t = _make_transport_mock() apply_mock = MagicMock() import argparse from muse.cli.commands.clone import run args = argparse.Namespace( url="http://localhost:19999/repo", directory=str(tmp_path / "cloned"), branch=None, dry_run=False, no_checkout=True, json_out=False, ) with ( patch("muse.cli.commands.clone.get_signing_identity", return_value=None), patch("muse.cli.commands.clone.make_transport", return_value=t), patch("muse.cli.commands.clone.apply_mpack", return_value=_make_apply_result()), patch("muse.cli.commands.clone.set_remote"), patch("muse.cli.commands.clone.set_remote_head"), patch("muse.cli.commands.clone.set_upstream"), patch("muse.cli.commands.clone.apply_manifest", apply_mock), ): run(args) apply_mock.assert_not_called() def test_branch_refs_written_for_every_remote_branch( self, tmp_path: pathlib.Path ) -> None: branches = {"main": long_id("a" * 64), "dev": long_id("b" * 64), "feat/x": long_id("c" * 64)} t = _make_transport_mock(branch_heads=branches) import argparse from muse.cli.commands.clone import run target = tmp_path / "cloned" args = argparse.Namespace( url="http://localhost:19999/repo", directory=str(target), branch=None, dry_run=False, no_checkout=True, json_out=False, ) with ( patch("muse.cli.commands.clone.get_signing_identity", return_value=None), patch("muse.cli.commands.clone.make_transport", return_value=t), patch("muse.cli.commands.clone.apply_mpack", return_value=_make_apply_result()), patch("muse.cli.commands.clone.set_remote"), patch("muse.cli.commands.clone.set_remote_head"), patch("muse.cli.commands.clone.set_upstream"), ): run(args) for branch, cid in branches.items(): ref_file = ref_path(target, branch) assert ref_file.exists(), f"Missing ref file for branch {branch}" assert ref_file.read_text() == cid # ── Security ────────────────────────────────────────────────────────────────── class TestSecurity: def test_ansi_in_url_stripped_in_stderr( self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str] ) -> None: malicious_url = "\x1b[31mhttp://attacker.example.com/repo\x1b[0m" import argparse from muse.cli.commands.clone import run from muse.core.transport import TransportError t = MagicMock() t.fetch_remote_info.side_effect = TransportError("refused", 503) args = argparse.Namespace( url=malicious_url, directory=str(tmp_path / "cloned"), branch=None, dry_run=False, no_checkout=False, json_out=False, ) with ( patch("muse.cli.commands.clone.get_signing_identity", return_value=None), patch("muse.cli.commands.clone.make_transport", return_value=t), pytest.raises(SystemExit), ): run(args) assert "\x1b[" not in capsys.readouterr().err def test_ansi_in_branch_name_stripped( self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str] ) -> None: malicious_branch = "\x1b[32mHACKED\x1b[0m" t = _make_transport_mock(branch_heads={"main": COMMIT_ID}) import argparse from muse.cli.commands.clone import run args = argparse.Namespace( url="http://localhost:19999/repo", directory=str(tmp_path / "cloned"), branch=malicious_branch, dry_run=False, no_checkout=True, json_out=False, ) with ( patch("muse.cli.commands.clone.get_signing_identity", return_value=None), patch("muse.cli.commands.clone.make_transport", return_value=t), patch("muse.cli.commands.clone.apply_mpack", return_value=_make_apply_result()), patch("muse.cli.commands.clone.set_remote"), patch("muse.cli.commands.clone.set_remote_head"), patch("muse.cli.commands.clone.set_upstream"), ): run(args) assert "\x1b[" not in capsys.readouterr().err def test_path_traversal_url_blocked(self) -> None: from muse.cli.commands.clone import _infer_dir_name result = _infer_dir_name("http://attacker.example.com/../../../etc/passwd") assert ".." not in result assert "etc" not in result or result == "etc" def test_all_diagnostics_go_to_stderr_not_stdout( self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str] ) -> None: t = _make_transport_mock() import argparse from muse.cli.commands.clone import run args = argparse.Namespace( url="http://localhost:19999/repo", directory=str(tmp_path / "cloned"), branch=None, dry_run=False, no_checkout=True, json_out=False, ) with ( patch("muse.cli.commands.clone.get_signing_identity", return_value=None), patch("muse.cli.commands.clone.make_transport", return_value=t), patch("muse.cli.commands.clone.apply_mpack", return_value=_make_apply_result()), patch("muse.cli.commands.clone.set_remote"), patch("muse.cli.commands.clone.set_remote_head"), patch("muse.cli.commands.clone.set_upstream"), ): run(args) assert capsys.readouterr().out == "" def test_already_exists_message_on_stderr( self, tmp_path: pathlib.Path, capsys: pytest.CaptureFixture[str] ) -> None: import argparse from muse.cli.commands.clone import run target = tmp_path / "existing" muse_dir(target).mkdir(parents=True) args = argparse.Namespace( url="http://localhost:19999/repo", directory=str(target), branch=None, dry_run=False, no_checkout=False, json_out=False, ) with ( patch("muse.cli.commands.clone.get_signing_identity", return_value=None), pytest.raises(SystemExit), ): run(args) cap = capsys.readouterr() assert "already" in cap.err.lower() assert cap.out == "" # ── E2E: via CliRunner ──────────────────────────────────────────────────────── def _invoke(*args: str, transport: MagicMock | None = None, tmp_path: pathlib.Path | None = None) -> InvokeResult: t = transport or _make_transport_mock() with ( patch("muse.cli.commands.clone.get_signing_identity", return_value=None), patch("muse.cli.commands.clone.make_transport", return_value=t), patch("muse.cli.commands.clone.apply_mpack", return_value=_make_apply_result()), patch("muse.cli.commands.clone.set_remote"), patch("muse.cli.commands.clone.set_remote_head"), patch("muse.cli.commands.clone.set_upstream"), patch("muse.cli.commands.clone.apply_manifest"), patch("muse.cli.commands.clone.read_commit", return_value=MagicMock(snapshot_id=SNAP_ID)), patch("muse.cli.commands.clone.read_snapshot", return_value=MagicMock(manifest={})), ): return runner.invoke( cli, ["clone", "http://localhost:19999/gabriel/repo", *args], ) class TestCLIClone: def test_dry_run_exits_zero_no_fs_changes(self, tmp_path: pathlib.Path) -> None: target = tmp_path / "should-not-exist" result = runner.invoke( cli, ["clone", "http://localhost:19999/repo", str(target), "--dry-run"], ) assert not target.exists() def test_dry_run_json_schema(self, tmp_path: pathlib.Path) -> None: with ( patch("muse.cli.commands.clone.get_signing_identity", return_value=None), patch("muse.cli.commands.clone.make_transport", return_value=_make_transport_mock()), ): result = runner.invoke( cli, ["clone", "http://localhost:19999/repo", str(tmp_path / "out"), "--dry-run", "--json"], ) assert result.exit_code == 0, result.output data = _json_line(result) assert data["status"] == "dry_run" assert data["dry_run"] is True assert data["head"] == COMMIT_ID assert data["domain"] == "code" def test_json_cloned_schema(self, tmp_path: pathlib.Path) -> None: """All expected keys present; blobs_written counted via on_object callback.""" with ( patch("muse.cli.commands.clone.get_signing_identity", return_value=None), patch("muse.cli.commands.clone.make_transport", return_value=_make_transport_mock(objects_count=11)), patch("muse.cli.commands.clone.apply_mpack", return_value=_make_apply_result(commits_written=5)), patch("muse.cli.commands.clone.set_remote"), patch("muse.cli.commands.clone.set_remote_head"), patch("muse.cli.commands.clone.set_upstream"), patch("muse.cli.commands.clone.apply_manifest"), patch("muse.cli.commands.clone.read_commit", return_value=MagicMock(snapshot_id=SNAP_ID)), patch("muse.cli.commands.clone.read_snapshot", return_value=MagicMock(manifest={})), ): result = runner.invoke( cli, ["clone", "http://localhost:19999/repo", str(tmp_path / "out"), "--json"], ) assert result.exit_code == 0, result.output data = _json_line(result) for key in ("status", "url", "directory", "branch", "commits_received", "blobs_written", "head", "domain", "dry_run"): assert key in data, f"Missing key: {key}" assert data["status"] == "cloned" assert data["dry_run"] is False assert data["commits_received"] == 5 assert data["blobs_written"] == 11 def test_no_checkout_flag_accepted(self, tmp_path: pathlib.Path) -> None: with ( patch("muse.cli.commands.clone.get_signing_identity", return_value=None), patch("muse.cli.commands.clone.make_transport", return_value=_make_transport_mock()), patch("muse.cli.commands.clone.apply_mpack", return_value=_make_apply_result()), patch("muse.cli.commands.clone.set_remote"), patch("muse.cli.commands.clone.set_remote_head"), patch("muse.cli.commands.clone.set_upstream"), ): result = runner.invoke( cli, ["clone", "http://localhost:19999/repo", str(tmp_path / "out"), "--no-checkout"], ) assert result.exit_code == 0, result.output def test_transport_error_exits_nonzero(self) -> None: from muse.core.transport import TransportError t = MagicMock() t.fetch_remote_info.side_effect = TransportError("refused", 503) with ( patch("muse.cli.commands.clone.get_signing_identity", return_value=None), patch("muse.cli.commands.clone.make_transport", return_value=t), ): result = runner.invoke(cli, ["clone", "http://localhost:19999/repo", "/tmp/muse-test-clone-xxx"]) assert result.exit_code != 0 def test_empty_repository_exits_nonzero(self) -> None: t = MagicMock() t.fetch_remote_info.return_value = _make_remote_info(branch_heads={}) with ( patch("muse.cli.commands.clone.get_signing_identity", return_value=None), patch("muse.cli.commands.clone.make_transport", return_value=t), ): result = runner.invoke(cli, ["clone", "http://localhost:19999/repo", "/tmp/muse-test-clone-empty"]) assert result.exit_code != 0 def test_json_on_stdout_parseable(self, tmp_path: pathlib.Path) -> None: with ( patch("muse.cli.commands.clone.get_signing_identity", return_value=None), patch("muse.cli.commands.clone.make_transport", return_value=_make_transport_mock()), ): result = runner.invoke( cli, ["clone", "http://localhost:19999/repo", str(tmp_path / "out"), "--dry-run", "--json"], ) assert result.exit_code == 0 data = _json_line(result) assert "status" in data # ── Stress: concurrent clones into isolated directories ─────────────────────── class TestStressConcurrent: def test_8_concurrent_clones_isolated(self, tmp_path: pathlib.Path) -> None: """8 concurrent clone calls into separate directories must not interfere. Patches are applied once at the test level (not per-thread) to avoid unittest.mock.patch thread-safety issues: concurrent patch/unpatch of the same attribute causes stale mocks to leak into subsequent tests. """ import argparse from muse.cli.commands.clone import run errors: list[str] = [] # Per-thread transport mocks are safe because they are independent objects. transports = [_make_transport_mock() for _ in range(8)] def _do(idx: int) -> None: try: target = tmp_path / f"repo{idx}" args = argparse.Namespace( url=f"http://localhost:19999/repo{idx}", directory=str(target), branch=None, dry_run=False, no_checkout=True, json_out=False, ) run(args) assert muse_dir(target).is_dir() assert (tags_dir(target)).is_dir() meta = json.loads((repo_json_path(target)).read_text()) assert meta["domain"] == "code" except Exception as exc: errors.append(f"Thread {idx}: {exc}") with ( patch("muse.cli.commands.clone.get_signing_identity", return_value=None), patch("muse.cli.commands.clone.make_transport", side_effect=transports), patch("muse.cli.commands.clone.apply_mpack", return_value=_make_apply_result()), patch("muse.cli.commands.clone.set_remote"), patch("muse.cli.commands.clone.set_remote_head"), patch("muse.cli.commands.clone.set_upstream"), ): threads = [threading.Thread(target=_do, args=(i,)) for i in range(8)] for th in threads: th.start() for th in threads: th.join() assert errors == [], f"Concurrent clone failures:\n{'\n'.join(errors)}" # --------------------------------------------------------------------------- # Flag registration tests # --------------------------------------------------------------------------- import argparse as _argparse from muse.cli.commands.clone import register as _register_clone from muse.core.paths import config_toml_path, head_path, heads_dir, muse_dir, ref_path, repo_json_path, tags_dir def _parse_clone(*args: str) -> _argparse.Namespace: root_p = _argparse.ArgumentParser() subs = root_p.add_subparsers(dest="cmd") _register_clone(subs) return root_p.parse_args(["clone", *args]) class TestRegisterFlags: def test_default_json_out_is_false(self) -> None: ns = _parse_clone("https://example.com/repo") assert ns.json_out is False def test_json_flag_sets_json_out(self) -> None: ns = _parse_clone("https://example.com/repo", "--json") assert ns.json_out is True def test_j_shorthand_sets_json_out(self) -> None: ns = _parse_clone("https://example.com/repo", "-j") assert ns.json_out is True def test_dry_run_flag(self) -> None: ns = _parse_clone("https://example.com/repo", "--dry-run") assert ns.dry_run is True def test_n_shorthand_for_dry_run(self) -> None: ns = _parse_clone("https://example.com/repo", "-n") assert ns.dry_run is True def test_format_flag_no_longer_exists(self) -> None: import pytest with pytest.raises(SystemExit): _parse_clone("https://example.com/repo", "--format", "json")