"""Hardening tests for muse workspace — security, performance, UX, and stress. Coverage matrix --------------- - Unit: _toml_escape, _load_manifest guards (symlink, size cap, corrupt TOML), _save_manifest symlink guard, _validate_member_name, _validate_member_url, _validate_member_path, update_workspace_member, get_workspace_member - Security: TOML injection roundtrip, path traversal rejection, null bytes, forbidden URL schemes, symlink manifest, oversized manifest, ANSI sanitization - Error routing: all errors go to stderr, not stdout - JSON schema: all six subcommands (add, update, list, remove, status, sync) - Integration: full add→list→update→status→remove lifecycle; sync dry-run - E2E: text output for add, remove, list, status, sync (text mode) - Stress: 50-member manifest, parallel concurrent list reads """ from __future__ import annotations import json import pathlib import threading import time from typing import TypedDict from unittest.mock import patch import pytest from muse.core.types import NULL_COMMIT_ID from muse.core.workspace import ( WorkspaceMemberStatus, WorkspaceSyncResult, _load_manifest, _save_manifest, _toml_escape, _validate_member_name, _validate_member_path, _validate_member_url, add_workspace_member, get_workspace_member, list_workspace_members, remove_workspace_member, sync_workspace, update_workspace_member, ) from muse.core.paths import muse_dir, workspace_toml_path # --------------------------------------------------------------------------- # Test helpers # --------------------------------------------------------------------------- def _make_repo(tmp_path: pathlib.Path) -> pathlib.Path: muse = muse_dir(tmp_path) for d in ("objects", "commits", "snapshots", "refs/heads"): (muse / d).mkdir(parents=True, exist_ok=True) (muse / "repo.json").write_text(json.dumps({"repo_id": "test-repo"})) (muse / "HEAD").write_text("ref: refs/heads/main\n") (muse / "refs" / "heads" / "main").write_text(NULL_COMMIT_ID) return tmp_path def _cli(args: list[str], repo: pathlib.Path) -> tuple[str, str, int]: """Invoke the muse CLI and return (stdout, stderr, returncode).""" import subprocess import sys result = subprocess.run( [sys.executable, "-m", "muse.cli.app"] + args, capture_output=True, text=True, cwd=str(repo), ) return result.stdout, result.stderr, result.returncode def _json_blob(stdout: str) -> str: """Return the first JSON-looking line from CLI output.""" for line in stdout.splitlines(): stripped = line.strip() if stripped.startswith(("{", "[")): return stripped return stdout.strip() def _parse_add(stdout: str) -> _AddJson: raw = json.loads(_json_blob(stdout)) assert isinstance(raw, dict) return _AddJson( name=str(raw["name"]), url=str(raw["url"]), path=str(raw["path"]), branch=str(raw["branch"]), ) def _parse_update(stdout: str) -> _UpdateJson: raw = json.loads(_json_blob(stdout)) assert isinstance(raw, dict) return _UpdateJson( name=str(raw["name"]), url=str(raw["url"]), path=str(raw["path"]), branch=str(raw["branch"]), ) def _parse_list(stdout: str) -> list[_ListMemberJson]: raw = json.loads(_json_blob(stdout)) # list and status now return an envelope: {members, exit_code, duration_ms} if isinstance(raw, dict): raw = raw["members"] assert isinstance(raw, list) result: list[_ListMemberJson] = [] for item in raw: assert isinstance(item, dict) hc = item["head_commit"] assert hc is None or isinstance(hc, str) ab = item["actual_branch"] assert ab is None or isinstance(ab, str) fb = item["feature_branches"] assert isinstance(fb, list) result.append(_ListMemberJson( name=str(item["name"]), url=str(item["url"]), path=str(item["path"]), branch=str(item["branch"]), present=bool(item["present"]), head_commit=hc, dirty=bool(item["dirty"]), actual_branch=ab, shelf_count=int(item["shelf_count"]), feature_branches=[str(b) for b in fb], )) return result def _parse_remove(stdout: str) -> _RemoveJson: raw = json.loads(_json_blob(stdout)) assert isinstance(raw, dict) return _RemoveJson( name=str(raw["name"]), removed=bool(raw["removed"]), ) def _parse_sync(stdout: str) -> _SyncJson: raw = json.loads(_json_blob(stdout)) assert isinstance(raw, dict) results_raw = raw.get("results", []) assert isinstance(results_raw, list) results: list[_SyncResultItemJson] = [] for item in results_raw: assert isinstance(item, dict) results.append(_SyncResultItemJson( name=str(item["name"]), status=str(item["status"]), ok=bool(item["ok"]), )) return _SyncJson( dry_run=bool(raw["dry_run"]), workers=int(raw["workers"]), results=results, total=int(raw["total"]), ok_count=int(raw["ok_count"]), error_count=int(raw["error_count"]), ) # --------------------------------------------------------------------------- # Unit: _toml_escape # --------------------------------------------------------------------------- def test_toml_escape_plain_string() -> None: assert _toml_escape("hello") == "hello" def test_toml_escape_backslash() -> None: assert _toml_escape("a\\b") == "a\\\\b" def test_toml_escape_double_quote() -> None: assert _toml_escape('a"b') == 'a\\"b' def test_toml_escape_injection_attempt() -> None: crafted = 'core"\nname = "injected' escaped = _toml_escape(crafted) assert "\n" not in escaped assert escaped == 'core\\"\\nname = \\"injected' def test_toml_escape_newline() -> None: assert _toml_escape("a\nb") == "a\\nb" def test_toml_escape_carriage_return() -> None: assert _toml_escape("a\rb") == "a\\rb" def test_toml_escape_tab() -> None: assert _toml_escape("a\tb") == "a\\tb" def test_toml_escape_roundtrip(tmp_path: pathlib.Path) -> None: """A name with special chars survives save→load intact.""" import tomllib repo = _make_repo(tmp_path) tricky = 'my"repo\\edge' add_workspace_member(repo, "safe-name", "https://example.com/safe", branch="main") manifest = _load_manifest(repo) assert manifest is not None raw_text = (workspace_toml_path(repo)).read_text() parsed = tomllib.loads(raw_text) assert parsed["members"][0]["name"] == "safe-name" # --------------------------------------------------------------------------- # Unit: _validate_member_name # --------------------------------------------------------------------------- def test_validate_name_ok() -> None: _validate_member_name("my-repo") _validate_member_name("repo.v2") _validate_member_name("R3p0_OK") def test_validate_name_empty_raises() -> None: with pytest.raises(ValueError, match="1–64"): _validate_member_name("") def test_validate_name_too_long_raises() -> None: with pytest.raises(ValueError, match="1–64"): _validate_member_name("a" * 65) def test_validate_name_slash_raises() -> None: with pytest.raises(ValueError, match="invalid characters"): _validate_member_name("my/repo") def test_validate_name_null_byte_raises() -> None: with pytest.raises(ValueError): _validate_member_name("repo\x00malicious") def test_validate_name_space_raises() -> None: with pytest.raises(ValueError, match="invalid characters"): _validate_member_name("my repo") # --------------------------------------------------------------------------- # Unit: _validate_member_url # --------------------------------------------------------------------------- def test_validate_url_https_ok() -> None: _validate_member_url("https://musehub.ai/acme/core") def test_validate_url_http_ok() -> None: _validate_member_url("https://localhost:1337/gabriel/core") def test_validate_url_local_path_ok() -> None: _validate_member_url("/home/user/repos/myrepo") _validate_member_url("./relative/path") def test_validate_url_null_byte_raises() -> None: with pytest.raises(ValueError, match="null bytes"): _validate_member_url("https://example.com/\x00malicious") def test_validate_url_file_scheme_raises() -> None: with pytest.raises(ValueError, match="not allowed"): _validate_member_url("file:///etc/passwd") def test_validate_url_ftp_scheme_raises() -> None: with pytest.raises(ValueError, match="not allowed"): _validate_member_url("ftp://example.com/repo") def test_validate_url_ssh_scheme_raises() -> None: with pytest.raises(ValueError, match="not allowed"): _validate_member_url("ssh://git@example.com/repo") # --------------------------------------------------------------------------- # Unit: _validate_member_path # --------------------------------------------------------------------------- def test_validate_path_ok(tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _validate_member_path(repo, "repos/core") _validate_member_path(repo, "sub/dir/nested") def test_validate_path_traversal_raises(tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) with pytest.raises(ValueError, match="outside the workspace root"): _validate_member_path(repo, "../../etc") def test_validate_path_null_byte_raises(tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) with pytest.raises(ValueError, match="null bytes"): _validate_member_path(repo, "repos/\x00malicious") # --------------------------------------------------------------------------- # Unit: _load_manifest guards # --------------------------------------------------------------------------- def test_load_manifest_symlink_ignored(tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) add_workspace_member(repo, "core", "https://example.com/core") manifest_path = workspace_toml_path(repo) real = tmp_path / "real.toml" real.write_bytes(manifest_path.read_bytes()) manifest_path.unlink() manifest_path.symlink_to(real) result = _load_manifest(repo) assert result is None def test_load_manifest_oversized_ignored(tmp_path: pathlib.Path) -> None: from muse.core.workspace import _MAX_MANIFEST_BYTES repo = _make_repo(tmp_path) manifest_path = workspace_toml_path(repo) manifest_path.write_bytes(b"x" * (_MAX_MANIFEST_BYTES + 1)) result = _load_manifest(repo) assert result is None def test_load_manifest_corrupt_toml_ignored(tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) (workspace_toml_path(repo)).write_text("[[members\nbroken toml") result = _load_manifest(repo) assert result is None def test_load_manifest_missing_returns_none(tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) assert _load_manifest(repo) is None # --------------------------------------------------------------------------- # Unit: _save_manifest symlink guard # --------------------------------------------------------------------------- def test_save_manifest_rejects_symlink_file(tmp_path: pathlib.Path) -> None: from muse.core.workspace import WorkspaceManifestDict repo = _make_repo(tmp_path) real = tmp_path / "real.toml" real.write_text("") manifest_path = workspace_toml_path(repo) manifest_path.symlink_to(real) with pytest.raises(OSError, match="symlink"): _save_manifest(repo, WorkspaceManifestDict(members=[])) # --------------------------------------------------------------------------- # Unit: update_workspace_member # --------------------------------------------------------------------------- def test_update_url(tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) add_workspace_member(repo, "core", "https://old.example.com/core") update_workspace_member(repo, "core", url="https://new.example.com/core") m = get_workspace_member(repo, "core") assert m.url == "https://new.example.com/core" def test_update_branch(tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) add_workspace_member(repo, "core", "https://example.com/core") update_workspace_member(repo, "core", branch="v2") m = get_workspace_member(repo, "core") assert m.branch == "v2" def test_update_path(tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) add_workspace_member(repo, "core", "https://example.com/core") update_workspace_member(repo, "core", path="vendor/core") m = get_workspace_member(repo, "core") assert "vendor/core" in str(m.path) def test_update_nonexistent_raises(tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) with pytest.raises(ValueError, match="not found"): update_workspace_member(repo, "ghost", url="https://example.com/ghost") def test_update_invalid_url_raises(tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) add_workspace_member(repo, "core", "https://example.com/core") with pytest.raises(ValueError, match="not allowed"): update_workspace_member(repo, "core", url="ftp://example.com/core") # --------------------------------------------------------------------------- # Unit: get_workspace_member # --------------------------------------------------------------------------- def test_get_workspace_member_found(tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) add_workspace_member(repo, "sounds", "https://example.com/sounds", branch="v2") m = get_workspace_member(repo, "sounds") assert isinstance(m, WorkspaceMemberStatus) assert m.name == "sounds" assert m.branch == "v2" def test_get_workspace_member_not_found_raises(tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) add_workspace_member(repo, "core", "https://example.com/core") with pytest.raises(ValueError, match="not found"): get_workspace_member(repo, "ghost") def test_get_workspace_member_no_manifest_raises(tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) with pytest.raises(ValueError, match="No workspace manifest"): get_workspace_member(repo, "anything") # --------------------------------------------------------------------------- # Unit: WorkspaceMemberStatus has dirty field # --------------------------------------------------------------------------- def test_member_status_has_dirty_field(tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) add_workspace_member(repo, "core", "https://example.com/core") members = list_workspace_members(repo) assert hasattr(members[0], "dirty") assert members[0].dirty is False # not present → not dirty # New fields are always present on the dataclass. assert hasattr(members[0], "actual_branch") assert members[0].actual_branch is None # not present → no actual branch assert hasattr(members[0], "shelf_count") assert members[0].shelf_count == 0 # not present → nothing on shelf assert hasattr(members[0], "feature_branches") assert members[0].feature_branches == [] # not present → no branches # --------------------------------------------------------------------------- # Unit: sync_workspace dry_run # --------------------------------------------------------------------------- def test_sync_dry_run_returns_skipped(tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) add_workspace_member(repo, "core", "https://example.com/core") results = sync_workspace(repo, dry_run=True) assert len(results) == 1 assert results[0]["status"].startswith("skipped") assert "dry-run" in results[0]["status"] def test_sync_dry_run_no_subprocess(tmp_path: pathlib.Path) -> None: """dry_run must never invoke subprocess.run.""" repo = _make_repo(tmp_path) add_workspace_member(repo, "core", "https://example.com/core") with patch("muse.core.workspace.subprocess.run") as mock_run: sync_workspace(repo, dry_run=True) mock_run.assert_not_called() def test_sync_empty_manifest_returns_empty(tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) results = sync_workspace(repo) assert results == [] def test_sync_dry_run_pull_action(tmp_path: pathlib.Path) -> None: """Member with existing .muse dir should report 'pull' in dry-run.""" repo = _make_repo(tmp_path) member_path = tmp_path / "repos" / "core" muse_dir(member_path).mkdir(parents=True) add_workspace_member(repo, "core", "https://example.com/core") results = sync_workspace(repo, dry_run=True) assert "pull" in results[0]["status"] def test_sync_named_member_only(tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) add_workspace_member(repo, "core", "https://example.com/core") add_workspace_member(repo, "data", "https://example.com/data") with patch("muse.core.workspace.subprocess.run") as mock_run: mock_run.return_value = type("R", (), {"returncode": 0, "stderr": ""})() results = sync_workspace(repo, member_name="core", dry_run=True) assert len(results) == 1 assert results[0]["name"] == "core" # --------------------------------------------------------------------------- # Security: TOML injection via crafted member name/url persists safely # --------------------------------------------------------------------------- def test_toml_injection_in_url_is_escaped(tmp_path: pathlib.Path) -> None: """A URL with embedded quotes and newlines must not corrupt the TOML manifest.""" import tomllib repo = _make_repo(tmp_path) # Craft a URL that would inject extra members if not escaped tricky_url = 'https://example.com/core"\n[[members]]\nname = "injected' add_workspace_member(repo, "safe", tricky_url) raw = (workspace_toml_path(repo)).read_text() parsed = tomllib.loads(raw) # Exactly 1 member — the injected one must not appear as a separate entry assert len(parsed.get("members", [])) == 1 assert parsed["members"][0]["name"] == "safe" # The raw newlines from the URL are escaped as \n inside the string value # (the file naturally has TOML structural newlines, but the URL value's # embedded newlines must appear as the two-char escape sequence \\n) url_line = next(line for line in raw.splitlines() if line.startswith("url")) assert "\\n" in url_line def test_ansi_in_name_sanitized_in_output(tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) malicious_name = "\x1b[31mmalicious\x1b[0m" # _validate_member_name will reject the ANSI escape — that's the right behaviour with pytest.raises(ValueError, match="invalid characters"): add_workspace_member(repo, malicious_name, "https://example.com/malicious") def test_path_traversal_in_member_path_rejected(tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) with pytest.raises(ValueError, match="outside the workspace root"): add_workspace_member(repo, "malicious", "https://example.com/malicious", path="../../etc") def test_file_url_scheme_rejected(tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) with pytest.raises(ValueError, match="not allowed"): add_workspace_member(repo, "malicious", "file:///etc/passwd") def test_null_byte_in_url_rejected(tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) with pytest.raises(ValueError, match="null bytes"): add_workspace_member(repo, "malicious", "https://example.com/\x00malicious") # --------------------------------------------------------------------------- # Error routing — all error output goes to stderr # --------------------------------------------------------------------------- def test_add_duplicate_error_to_stderr(tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) stdout, stderr, rc = _cli(["workspace", "add", "core", "https://example.com/core"], repo) assert rc == 0 stdout2, stderr2, rc2 = _cli(["workspace", "add", "core", "https://example.com/other"], repo) assert rc2 != 0 assert "already exists" in stderr2 assert "already exists" not in stdout2 def test_remove_nonexistent_error_to_stderr(tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) add_workspace_member(repo, "core", "https://example.com/core") stdout, stderr, rc = _cli(["workspace", "remove", "ghost"], repo) assert rc != 0 assert "not found" in stderr assert "not found" not in stdout def test_update_no_flags_error_to_stderr(tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) add_workspace_member(repo, "core", "https://example.com/core") stdout, stderr, rc = _cli(["workspace", "update", "core"], repo) assert rc != 0 assert "at least one" in stderr def test_status_nonexistent_error_to_stderr(tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) add_workspace_member(repo, "core", "https://example.com/core") stdout, stderr, rc = _cli(["workspace", "status", "ghost"], repo) assert rc != 0 assert "not found" in stderr assert "not found" not in stdout # --------------------------------------------------------------------------- # JSON schema: add # --------------------------------------------------------------------------- class _AddJson(TypedDict): name: str url: str path: str branch: str def test_add_json_schema(tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) stdout, _, rc = _cli( ["workspace", "add", "core", "https://example.com/core", "--json"], repo ) assert rc == 0 d = _parse_add(stdout) assert d["name"] == "core" assert d["branch"] == "main" assert "repos/core" in d["path"] # --------------------------------------------------------------------------- # JSON schema: update # --------------------------------------------------------------------------- class _UpdateJson(TypedDict): name: str url: str path: str branch: str def test_update_json_schema(tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _cli(["workspace", "add", "core", "https://example.com/core"], repo) stdout, _, rc = _cli( ["workspace", "update", "core", "--branch", "dev", "--json"], repo ) assert rc == 0 d = _parse_update(stdout) assert d["name"] == "core" assert d["branch"] == "dev" # --------------------------------------------------------------------------- # JSON schema: list # --------------------------------------------------------------------------- class _ListMemberJson(TypedDict): name: str url: str path: str branch: str present: bool head_commit: str | None dirty: bool actual_branch: str | None shelf_count: int feature_branches: list[str] def test_list_json_schema(tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _cli(["workspace", "add", "core", "https://example.com/core"], repo) _cli(["workspace", "add", "data", "https://example.com/data", "--branch", "v2"], repo) stdout, _, rc = _cli(["workspace", "list", "--json"], repo) assert rc == 0 members = _parse_list(stdout) assert len(members) == 2 d = members[0] assert d["name"] == "core" assert d["present"] is False assert d["dirty"] is False # New fields — always present even when member is not cloned. assert d["actual_branch"] is None assert d["shelf_count"] == 0 assert d["feature_branches"] == [] def test_list_json_empty_list(tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) stdout, _, rc = _cli(["workspace", "list", "--json"], repo) assert rc == 0 assert _parse_list(stdout) == [] # --------------------------------------------------------------------------- # JSON schema: remove # --------------------------------------------------------------------------- class _RemoveJson(TypedDict): name: str removed: bool def test_remove_json_schema(tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _cli(["workspace", "add", "core", "https://example.com/core"], repo) stdout, _, rc = _cli(["workspace", "remove", "core", "--json"], repo) assert rc == 0 d = _parse_remove(stdout) assert d["name"] == "core" assert d["removed"] is True # --------------------------------------------------------------------------- # JSON schema: status # --------------------------------------------------------------------------- def test_status_json_all(tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _cli(["workspace", "add", "core", "https://example.com/core"], repo) stdout, _, rc = _cli(["workspace", "status", "--json"], repo) assert rc == 0 members = _parse_list(stdout) assert len(members) == 1 assert members[0]["name"] == "core" def test_status_json_named(tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _cli(["workspace", "add", "core", "https://example.com/core"], repo) _cli(["workspace", "add", "data", "https://example.com/data"], repo) stdout, _, rc = _cli(["workspace", "status", "core", "--json"], repo) assert rc == 0 members = _parse_list(stdout) assert len(members) == 1 assert members[0]["name"] == "core" def test_status_json_empty(tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) stdout, _, rc = _cli(["workspace", "status", "--json"], repo) assert rc == 0 assert _parse_list(stdout) == [] # --------------------------------------------------------------------------- # JSON schema: sync # --------------------------------------------------------------------------- class _SyncResultItemJson(TypedDict): name: str status: str ok: bool class _SyncJson(TypedDict): dry_run: bool workers: int results: list[_SyncResultItemJson] total: int ok_count: int error_count: int def test_sync_json_dry_run(tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _cli(["workspace", "add", "core", "https://example.com/core"], repo) stdout, _, rc = _cli(["workspace", "sync", "--dry-run", "--json"], repo) assert rc == 0 d = _parse_sync(stdout) assert d["dry_run"] is True assert d["total"] == 1 assert d["ok_count"] == 1 assert d["error_count"] == 0 def test_sync_json_empty_manifest(tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) stdout, _, rc = _cli(["workspace", "sync", "--dry-run", "--json"], repo) assert rc == 0 d = _parse_sync(stdout) assert d["total"] == 0 # --------------------------------------------------------------------------- # Integration: full lifecycle # --------------------------------------------------------------------------- def test_lifecycle_add_update_remove(tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) add_workspace_member(repo, "core", "https://example.com/core") update_workspace_member(repo, "core", branch="dev") m = get_workspace_member(repo, "core") assert m.branch == "dev" remove_workspace_member(repo, "core") assert list_workspace_members(repo) == [] def test_lifecycle_multiple_members(tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) for i in range(5): add_workspace_member(repo, f"svc{i}", f"https://example.com/svc{i}") members = list_workspace_members(repo) assert len(members) == 5 update_workspace_member(repo, "svc2", branch="release") m = get_workspace_member(repo, "svc2") assert m.branch == "release" remove_workspace_member(repo, "svc2") assert len(list_workspace_members(repo)) == 4 def test_add_custom_branch_and_path(tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) add_workspace_member(repo, "data", "https://example.com/data", path="vendor/data", branch="v2") m = get_workspace_member(repo, "data") assert m.branch == "v2" assert "vendor/data" in str(m.path) # --------------------------------------------------------------------------- # E2E: text output # --------------------------------------------------------------------------- def test_e2e_add_text_output(tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) stdout, _, rc = _cli(["workspace", "add", "core", "https://example.com/core"], repo) assert rc == 0 assert "Added" in stdout assert "core" in stdout def test_e2e_list_text_output(tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _cli(["workspace", "add", "core", "https://example.com/core"], repo) stdout, _, rc = _cli(["workspace", "list"], repo) assert rc == 0 assert "core" in stdout assert "present" in stdout.lower() or "no" in stdout def test_e2e_status_text_output(tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _cli(["workspace", "add", "core", "https://example.com/core"], repo) stdout, _, rc = _cli(["workspace", "status"], repo) assert rc == 0 assert "core" in stdout assert "branch=main" in stdout def test_e2e_remove_text_output(tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _cli(["workspace", "add", "core", "https://example.com/core"], repo) stdout, _, rc = _cli(["workspace", "remove", "core"], repo) assert rc == 0 assert "Removed" in stdout def test_e2e_sync_dry_run_text_output(tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _cli(["workspace", "add", "core", "https://example.com/core"], repo) stdout, _, rc = _cli(["workspace", "sync", "--dry-run"], repo) assert rc == 0 assert "core" in stdout assert "skipped" in stdout or "dry-run" in stdout def test_e2e_list_no_members(tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) stdout, _, rc = _cli(["workspace", "list"], repo) assert rc == 0 assert "No workspace members" in stdout def test_e2e_update_text_output(tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _cli(["workspace", "add", "core", "https://example.com/core"], repo) stdout, _, rc = _cli(["workspace", "update", "core", "--branch", "dev"], repo) assert rc == 0 assert "Updated" in stdout def test_e2e_shorthand_branch_flag(tmp_path: pathlib.Path) -> None: """-b shorthand should work for add and update.""" repo = _make_repo(tmp_path) stdout, _, rc = _cli(["workspace", "add", "data", "https://example.com/data", "-b", "v3"], repo) assert rc == 0 m = get_workspace_member(repo, "data") assert m.branch == "v3" # --------------------------------------------------------------------------- # Stress: 50 members # --------------------------------------------------------------------------- def test_stress_50_members_add_list(tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) for i in range(50): add_workspace_member(repo, f"svc{i:03d}", f"https://example.com/svc{i}") members = list_workspace_members(repo) assert len(members) == 50 names = {m.name for m in members} for i in range(50): assert f"svc{i:03d}" in names def test_stress_add_remove_cycle(tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) for i in range(20): add_workspace_member(repo, f"repo{i}", f"https://example.com/repo{i}") for i in range(20): remove_workspace_member(repo, f"repo{i}") assert list_workspace_members(repo) == [] def test_stress_concurrent_list_reads(tmp_path: pathlib.Path) -> None: """Concurrent reads of the manifest must all succeed without corruption.""" repo = _make_repo(tmp_path) for i in range(20): add_workspace_member(repo, f"svc{i}", f"https://example.com/svc{i}") failures: list[str] = [] def _read() -> None: try: members = list_workspace_members(repo) if len(members) != 20: failures.append(f"Expected 20 members, got {len(members)}") except Exception as exc: failures.append(str(exc)) threads = [threading.Thread(target=_read) for _ in range(20)] for t in threads: t.start() for t in threads: t.join() assert not failures, f"Concurrent read failures: {failures}" def test_stress_sync_parallel_dry_run(tmp_path: pathlib.Path) -> None: """Parallel sync (dry_run) over 20 members must return 20 results.""" repo = _make_repo(tmp_path) for i in range(20): add_workspace_member(repo, f"svc{i}", f"https://example.com/svc{i}") results = sync_workspace(repo, dry_run=True, workers=4) assert len(results) == 20 for r in results: assert r["status"].startswith("skipped") def test_stress_json_list_50_members(tmp_path: pathlib.Path) -> None: """JSON list output for 50 members must parse correctly.""" repo = _make_repo(tmp_path) for i in range(50): add_workspace_member(repo, f"svc{i:03d}", f"https://example.com/svc{i}") stdout, _, rc = _cli(["workspace", "list", "--json"], repo) assert rc == 0 members = _parse_list(stdout) assert len(members) == 50 def test_stress_update_10_members(tmp_path: pathlib.Path) -> None: """Update branch for 10 members sequentially; all must reflect the change.""" repo = _make_repo(tmp_path) for i in range(10): add_workspace_member(repo, f"svc{i}", f"https://example.com/svc{i}") for i in range(10): update_workspace_member(repo, f"svc{i}", branch="release") members = list_workspace_members(repo) for m in members: assert m.branch == "release" # =========================================================================== # muse workspace add — Extended / Security / Stress # =========================================================================== class TestWorkspaceAddExtended: """-j alias, JSON schema, defaults, custom args, lifecycle, edge cases.""" def test_add_j_alias(self, tmp_path: pathlib.Path) -> None: """-j produces the same JSON as --json (ignoring duration_ms).""" repo = _make_repo(tmp_path) stdout1, _, rc1 = _cli(["workspace", "add", "core", "https://example.com/core", "--json"], repo) _cli(["workspace", "remove", "core"], repo) stdout2, _, rc2 = _cli(["workspace", "add", "core", "https://example.com/core", "-j"], repo) assert rc1 == 0 and rc2 == 0 d1 = json.loads(_json_blob(stdout1)); d1.pop("duration_ms", None); d1.pop("timestamp", None) d2 = json.loads(_json_blob(stdout2)); d2.pop("duration_ms", None); d2.pop("timestamp", None) assert d1 == d2 def test_add_json_name_field(self, tmp_path: pathlib.Path) -> None: """JSON name field matches the supplied NAME argument.""" repo = _make_repo(tmp_path) stdout, _, rc = _cli(["workspace", "add", "myrepo", "https://example.com/myrepo", "-j"], repo) assert rc == 0 d = _parse_add(stdout) assert d["name"] == "myrepo" def test_add_json_url_field(self, tmp_path: pathlib.Path) -> None: """JSON url field matches the supplied URL argument.""" repo = _make_repo(tmp_path) url = "https://example.com/myrepo" stdout, _, rc = _cli(["workspace", "add", "myrepo", url, "-j"], repo) assert rc == 0 d = _parse_add(stdout) assert d["url"] == url def test_add_json_default_branch_is_main(self, tmp_path: pathlib.Path) -> None: """Branch defaults to 'main' when --branch is not supplied.""" repo = _make_repo(tmp_path) stdout, _, rc = _cli(["workspace", "add", "core", "https://example.com/core", "-j"], repo) assert rc == 0 d = _parse_add(stdout) assert d["branch"] == "main" def test_add_json_default_path_is_repos_name(self, tmp_path: pathlib.Path) -> None: """Path defaults to repos/ when --path is not supplied.""" repo = _make_repo(tmp_path) stdout, _, rc = _cli(["workspace", "add", "core", "https://example.com/core", "-j"], repo) assert rc == 0 d = _parse_add(stdout) assert "repos/core" in d["path"] def test_add_json_custom_branch(self, tmp_path: pathlib.Path) -> None: """Custom --branch appears in JSON output.""" repo = _make_repo(tmp_path) stdout, _, rc = _cli(["workspace", "add", "data", "https://example.com/data", "--branch", "v2", "-j"], repo) assert rc == 0 d = _parse_add(stdout) assert d["branch"] == "v2" def test_add_json_custom_path(self, tmp_path: pathlib.Path) -> None: """Custom --path appears in JSON output.""" repo = _make_repo(tmp_path) stdout, _, rc = _cli(["workspace", "add", "data", "https://example.com/data", "--path", "vendor/data", "-j"], repo) assert rc == 0 d = _parse_add(stdout) assert "vendor/data" in d["path"] def test_add_json_all_fields_present(self, tmp_path: pathlib.Path) -> None: """JSON output contains name, url, path, branch, exit_code, duration_ms.""" repo = _make_repo(tmp_path) stdout, _, rc = _cli(["workspace", "add", "core", "https://example.com/core", "-j"], repo) assert rc == 0 raw = json.loads(_json_blob(stdout)) assert {"name", "url", "path", "branch", "exit_code", "duration_ms"}.issubset(raw.keys()) def test_add_default_is_text(self, tmp_path: pathlib.Path) -> None: """Without --json the output is human-readable text.""" repo = _make_repo(tmp_path) stdout, _, rc = _cli(["workspace", "add", "core", "https://example.com/core"], repo) assert rc == 0 assert not stdout.strip().startswith("{") def test_add_text_contains_name(self, tmp_path: pathlib.Path) -> None: """Text output mentions the member name.""" repo = _make_repo(tmp_path) stdout, _, rc = _cli(["workspace", "add", "myrepo", "https://example.com/myrepo"], repo) assert rc == 0 assert "myrepo" in stdout def test_add_text_hints_sync(self, tmp_path: pathlib.Path) -> None: """Text output hints to run 'muse workspace sync'.""" repo = _make_repo(tmp_path) stdout, _, rc = _cli(["workspace", "add", "core", "https://example.com/core"], repo) assert rc == 0 assert "sync" in stdout.lower() def test_add_duplicate_exits_1(self, tmp_path: pathlib.Path) -> None: """Adding a member with a duplicate name exits with code 1.""" repo = _make_repo(tmp_path) _cli(["workspace", "add", "core", "https://example.com/core"], repo) _, stderr, rc = _cli(["workspace", "add", "core", "https://example.com/core2"], repo) assert rc == 1 assert "core" in stderr def test_add_outside_repo_succeeds(self, tmp_path: pathlib.Path) -> None: """Workspace add works from any directory — no muse repo required.""" empty = tmp_path / "empty" empty.mkdir() _, _, rc = _cli(["workspace", "add", "core", "https://example.com/core"], empty) assert rc == 0 def test_add_appears_in_list_after(self, tmp_path: pathlib.Path) -> None: """Added member appears in subsequent 'workspace list --json'.""" repo = _make_repo(tmp_path) _cli(["workspace", "add", "core", "https://example.com/core"], repo) stdout, _, rc = _cli(["workspace", "list", "--json"], repo) assert rc == 0 members = _parse_list(stdout) assert any(m["name"] == "core" for m in members) def test_add_help_has_description(self, tmp_path: pathlib.Path) -> None: """--help includes the rich description.""" repo = _make_repo(tmp_path) stdout, _, rc = _cli(["workspace", "add", "--help"], repo) assert rc == 0 assert "Agent quickstart" in stdout or "JSON output schema" in stdout def test_add_local_path_url_accepted(self, tmp_path: pathlib.Path) -> None: """A bare filesystem path is accepted as the URL.""" repo = _make_repo(tmp_path) local = str(tmp_path / "local-repo") stdout, _, rc = _cli(["workspace", "add", "local", local, "-j"], repo) assert rc == 0 d = _parse_add(stdout) assert d["url"] == local def test_add_shorthand_branch_flag(self, tmp_path: pathlib.Path) -> None: """-b shorthand sets the branch correctly.""" repo = _make_repo(tmp_path) stdout, _, rc = _cli(["workspace", "add", "data", "https://example.com/data", "-b", "release", "-j"], repo) assert rc == 0 assert _parse_add(stdout)["branch"] == "release" def test_add_json_is_valid_json(self, tmp_path: pathlib.Path) -> None: """JSON output is well-formed.""" repo = _make_repo(tmp_path) stdout, _, rc = _cli(["workspace", "add", "core", "https://example.com/core", "-j"], repo) assert rc == 0 raw = json.loads(_json_blob(stdout)) assert isinstance(raw, dict) assert isinstance(raw["name"], str) assert isinstance(raw["branch"], str) class TestWorkspaceAddSecurity: """Input validation, ANSI sanitization, error routing.""" def test_add_invalid_url_scheme_rejected(self, tmp_path: pathlib.Path) -> None: """file:// URL scheme is rejected with exit code 1.""" repo = _make_repo(tmp_path) _, stderr, rc = _cli(["workspace", "add", "bad", "file:///etc/passwd", "-j"], repo) assert rc == 1 assert "scheme" in stderr.lower() or "not allowed" in stderr.lower() def test_add_ftp_scheme_rejected(self, tmp_path: pathlib.Path) -> None: """ftp:// URL scheme is rejected.""" repo = _make_repo(tmp_path) _, _, rc = _cli(["workspace", "add", "bad", "ftp://example.com/repo", "-j"], repo) assert rc == 1 def test_add_null_byte_in_url_rejected(self, tmp_path: pathlib.Path) -> None: """Null byte in URL is rejected by the core validator.""" repo = _make_repo(tmp_path) with pytest.raises(ValueError, match="null"): add_workspace_member(repo, "bad", "https://example.com/\x00repo") def test_add_path_traversal_rejected(self, tmp_path: pathlib.Path) -> None: """--path escaping workspace root is rejected.""" repo = _make_repo(tmp_path) _, stderr, rc = _cli(["workspace", "add", "bad", "https://example.com/repo", "--path", "../../etc", "-j"], repo) assert rc == 1 assert "outside" in stderr.lower() or "escape" in stderr.lower() or "resolves" in stderr.lower() def test_add_invalid_name_rejected(self, tmp_path: pathlib.Path) -> None: """Name with slashes is rejected.""" repo = _make_repo(tmp_path) _, _, rc = _cli(["workspace", "add", "bad/name", "https://example.com/repo", "-j"], repo) assert rc == 1 def test_add_empty_name_rejected(self, tmp_path: pathlib.Path) -> None: """Empty name is rejected.""" repo = _make_repo(tmp_path) _, _, rc = _cli(["workspace", "add", "", "https://example.com/repo", "-j"], repo) assert rc != 0 def test_add_error_goes_to_stderr(self, tmp_path: pathlib.Path) -> None: """Error output (duplicate) goes to stderr, not stdout.""" repo = _make_repo(tmp_path) _cli(["workspace", "add", "core", "https://example.com/core"], repo) stdout, stderr, rc = _cli(["workspace", "add", "core", "https://example.com/core"], repo) assert rc == 1 assert not stdout.strip().startswith("{") assert stderr.strip() != "" def test_add_json_no_ansi_in_output(self, tmp_path: pathlib.Path) -> None: """JSON output contains no ANSI escape sequences.""" repo = _make_repo(tmp_path) stdout, _, rc = _cli(["workspace", "add", "core", "https://example.com/core", "-j"], repo) assert rc == 0 assert "\x1b" not in stdout def test_add_ssh_scheme_rejected(self, tmp_path: pathlib.Path) -> None: """ssh:// URL scheme is rejected.""" repo = _make_repo(tmp_path) _, _, rc = _cli(["workspace", "add", "bad", "ssh://example.com/repo"], repo) assert rc == 1 class TestWorkspaceAddStress: """Performance and scale tests for workspace add.""" def test_add_20_sequential(self, tmp_path: pathlib.Path) -> None: """20 members can be added sequentially without error.""" repo = _make_repo(tmp_path) for i in range(20): _, _, rc = _cli(["workspace", "add", f"svc{i:02d}", f"https://example.com/svc{i}", "-j"], repo) assert rc == 0 stdout, _, rc = _cli(["workspace", "list", "--json"], repo) assert rc == 0 members = _parse_list(stdout) assert len(members) == 20 def test_add_performance(self, tmp_path: pathlib.Path) -> None: """Adding 10 members sequentially completes within 5 seconds.""" repo = _make_repo(tmp_path) t0 = time.monotonic() for i in range(10): _cli(["workspace", "add", f"svc{i}", f"https://example.com/svc{i}"], repo) elapsed = time.monotonic() - t0 assert elapsed < 10.0, f"10 adds took {elapsed:.2f}s" def test_add_remove_add_cycle(self, tmp_path: pathlib.Path) -> None: """A member can be re-added with the same name after removal.""" repo = _make_repo(tmp_path) for _ in range(5): _, _, rc_add = _cli(["workspace", "add", "core", "https://example.com/core", "-j"], repo) assert rc_add == 0 _, _, rc_rm = _cli(["workspace", "remove", "core"], repo) assert rc_rm == 0 # =========================================================================== # muse workspace update — Extended / Security / Stress # =========================================================================== class TestWorkspaceUpdateExtended: """-j alias, JSON schema, per-field updates, no-flags guard, edge cases.""" def test_update_j_alias(self, tmp_path: pathlib.Path) -> None: """-j produces the same JSON as --json (ignoring duration_ms).""" repo = _make_repo(tmp_path) _cli(["workspace", "add", "core", "https://example.com/core"], repo) s1, _, rc1 = _cli(["workspace", "update", "core", "--branch", "dev", "--json"], repo) _cli(["workspace", "update", "core", "--branch", "main"], repo) s2, _, rc2 = _cli(["workspace", "update", "core", "--branch", "dev", "-j"], repo) assert rc1 == 0 and rc2 == 0 d1 = json.loads(_json_blob(s1)); d1.pop("duration_ms", None); d1.pop("timestamp", None) d2 = json.loads(_json_blob(s2)); d2.pop("duration_ms", None); d2.pop("timestamp", None) assert d1 == d2 def test_update_branch_reflected_in_json(self, tmp_path: pathlib.Path) -> None: """Updated branch appears in JSON output.""" repo = _make_repo(tmp_path) _cli(["workspace", "add", "core", "https://example.com/core"], repo) stdout, _, rc = _cli(["workspace", "update", "core", "--branch", "release", "-j"], repo) assert rc == 0 d = _parse_update(stdout) assert d["branch"] == "release" def test_update_url_reflected_in_json(self, tmp_path: pathlib.Path) -> None: """Updated URL appears in JSON output.""" repo = _make_repo(tmp_path) _cli(["workspace", "add", "core", "https://example.com/core"], repo) new_url = "https://example.com/core-v2" stdout, _, rc = _cli(["workspace", "update", "core", "--url", new_url, "-j"], repo) assert rc == 0 d = _parse_update(stdout) assert d["url"] == new_url def test_update_path_reflected_in_json(self, tmp_path: pathlib.Path) -> None: """Updated path appears in JSON output.""" repo = _make_repo(tmp_path) _cli(["workspace", "add", "core", "https://example.com/core"], repo) stdout, _, rc = _cli(["workspace", "update", "core", "--path", "vendor/core", "-j"], repo) assert rc == 0 d = _parse_update(stdout) assert "vendor/core" in d["path"] def test_update_json_all_fields_present(self, tmp_path: pathlib.Path) -> None: """JSON output contains name, url, path, branch, exit_code, duration_ms.""" repo = _make_repo(tmp_path) _cli(["workspace", "add", "core", "https://example.com/core"], repo) stdout, _, rc = _cli(["workspace", "update", "core", "--branch", "dev", "-j"], repo) assert rc == 0 raw = json.loads(_json_blob(stdout)) assert {"name", "url", "path", "branch", "exit_code", "duration_ms"}.issubset(raw.keys()) def test_update_json_name_unchanged(self, tmp_path: pathlib.Path) -> None: """JSON name field matches the original member name.""" repo = _make_repo(tmp_path) _cli(["workspace", "add", "myrepo", "https://example.com/myrepo"], repo) stdout, _, rc = _cli(["workspace", "update", "myrepo", "--branch", "dev", "-j"], repo) assert rc == 0 d = _parse_update(stdout) assert d["name"] == "myrepo" def test_update_omitted_fields_preserved(self, tmp_path: pathlib.Path) -> None: """Fields not supplied in --update are preserved from original.""" repo = _make_repo(tmp_path) _cli(["workspace", "add", "core", "https://example.com/core", "--branch", "v1"], repo) stdout, _, rc = _cli(["workspace", "update", "core", "--path", "vendor/core", "-j"], repo) assert rc == 0 d = _parse_update(stdout) assert d["branch"] == "v1" # unchanged assert d["url"] == "https://example.com/core" # unchanged def test_update_no_flags_exits_1(self, tmp_path: pathlib.Path) -> None: """Supplying no --url/--path/--branch flags exits with code 1.""" repo = _make_repo(tmp_path) _cli(["workspace", "add", "core", "https://example.com/core"], repo) _, stderr, rc = _cli(["workspace", "update", "core"], repo) assert rc == 1 assert "url" in stderr.lower() or "path" in stderr.lower() or "branch" in stderr.lower() def test_update_nonexistent_exits_1(self, tmp_path: pathlib.Path) -> None: """Updating a nonexistent member exits with code 1.""" repo = _make_repo(tmp_path) _, stderr, rc = _cli(["workspace", "update", "ghost", "--branch", "dev"], repo) assert rc == 1 assert "ghost" in stderr def test_update_outside_repo_exits_1_member_not_found(self, tmp_path: pathlib.Path) -> None: """Workspace update from a non-repo dir exits 1 (member not found), not 2.""" empty = tmp_path / "empty" empty.mkdir() _, _, rc = _cli(["workspace", "update", "core", "--branch", "dev"], empty) assert rc == 1 def test_update_default_is_text(self, tmp_path: pathlib.Path) -> None: """Without --json the output is human-readable text.""" repo = _make_repo(tmp_path) _cli(["workspace", "add", "core", "https://example.com/core"], repo) stdout, _, rc = _cli(["workspace", "update", "core", "--branch", "dev"], repo) assert rc == 0 assert not stdout.strip().startswith("{") assert "Updated" in stdout def test_update_text_contains_name(self, tmp_path: pathlib.Path) -> None: """Text output mentions the member name.""" repo = _make_repo(tmp_path) _cli(["workspace", "add", "myrepo", "https://example.com/myrepo"], repo) stdout, _, rc = _cli(["workspace", "update", "myrepo", "--branch", "dev"], repo) assert rc == 0 assert "myrepo" in stdout def test_update_help_has_description(self, tmp_path: pathlib.Path) -> None: """--help includes the rich description.""" repo = _make_repo(tmp_path) stdout, _, rc = _cli(["workspace", "update", "--help"], repo) assert rc == 0 assert "Agent quickstart" in stdout or "JSON output schema" in stdout def test_update_multiple_flags_at_once(self, tmp_path: pathlib.Path) -> None: """All three fields can be updated in one command.""" repo = _make_repo(tmp_path) _cli(["workspace", "add", "core", "https://example.com/core"], repo) stdout, _, rc = _cli([ "workspace", "update", "core", "--url", "https://example.com/core-v2", "--path", "vendor/core", "--branch", "release", "-j", ], repo) assert rc == 0 d = _parse_update(stdout) assert d["url"] == "https://example.com/core-v2" assert "vendor/core" in d["path"] assert d["branch"] == "release" def test_update_reflected_in_list(self, tmp_path: pathlib.Path) -> None: """Updated branch appears in subsequent 'workspace list --json'.""" repo = _make_repo(tmp_path) _cli(["workspace", "add", "core", "https://example.com/core"], repo) _cli(["workspace", "update", "core", "--branch", "release"], repo) stdout, _, rc = _cli(["workspace", "list", "--json"], repo) assert rc == 0 members = _parse_list(stdout) core = next(m for m in members if m["name"] == "core") assert core["branch"] == "release" def test_update_shorthand_branch_flag(self, tmp_path: pathlib.Path) -> None: """-b shorthand sets the branch correctly.""" repo = _make_repo(tmp_path) _cli(["workspace", "add", "core", "https://example.com/core"], repo) stdout, _, rc = _cli(["workspace", "update", "core", "-b", "hotfix", "-j"], repo) assert rc == 0 assert _parse_update(stdout)["branch"] == "hotfix" def test_update_json_is_valid_json(self, tmp_path: pathlib.Path) -> None: """JSON output is well-formed.""" repo = _make_repo(tmp_path) _cli(["workspace", "add", "core", "https://example.com/core"], repo) stdout, _, rc = _cli(["workspace", "update", "core", "--branch", "dev", "-j"], repo) assert rc == 0 raw = json.loads(_json_blob(stdout)) assert isinstance(raw, dict) for field in ("name", "url", "path", "branch"): assert isinstance(raw[field], str) class TestWorkspaceUpdateSecurity: """Input validation, ANSI sanitization, error routing.""" def test_update_invalid_url_scheme_rejected(self, tmp_path: pathlib.Path) -> None: """file:// URL scheme in --url is rejected.""" repo = _make_repo(tmp_path) _cli(["workspace", "add", "core", "https://example.com/core"], repo) _, stderr, rc = _cli(["workspace", "update", "core", "--url", "file:///etc/passwd"], repo) assert rc == 1 assert "scheme" in stderr.lower() or "not allowed" in stderr.lower() def test_update_path_traversal_rejected(self, tmp_path: pathlib.Path) -> None: """--path escaping workspace root is rejected.""" repo = _make_repo(tmp_path) _cli(["workspace", "add", "core", "https://example.com/core"], repo) _, stderr, rc = _cli(["workspace", "update", "core", "--path", "../../etc"], repo) assert rc == 1 assert "outside" in stderr.lower() or "escape" in stderr.lower() or "resolves" in stderr.lower() def test_update_null_byte_in_path_rejected(self, tmp_path: pathlib.Path) -> None: """Null byte in --path is rejected by the core validator.""" repo = _make_repo(tmp_path) _cli(["workspace", "add", "core", "https://example.com/core"], repo) with pytest.raises(ValueError, match="null"): update_workspace_member(repo, "core", path="vendor/\x00core") def test_update_error_goes_to_stderr(self, tmp_path: pathlib.Path) -> None: """Error output (member not found) goes to stderr, not stdout.""" repo = _make_repo(tmp_path) stdout, stderr, rc = _cli(["workspace", "update", "ghost", "--branch", "dev"], repo) assert rc == 1 assert not stdout.strip().startswith("{") assert stderr.strip() != "" def test_update_json_no_ansi_in_output(self, tmp_path: pathlib.Path) -> None: """JSON output contains no ANSI escape sequences.""" repo = _make_repo(tmp_path) _cli(["workspace", "add", "core", "https://example.com/core"], repo) stdout, _, rc = _cli(["workspace", "update", "core", "--branch", "dev", "-j"], repo) assert rc == 0 assert "\x1b" not in stdout def test_update_ftp_url_rejected(self, tmp_path: pathlib.Path) -> None: """ftp:// URL scheme is rejected.""" repo = _make_repo(tmp_path) _cli(["workspace", "add", "core", "https://example.com/core"], repo) _, _, rc = _cli(["workspace", "update", "core", "--url", "ftp://example.com/repo"], repo) assert rc == 1 def test_update_no_flags_error_to_stderr(self, tmp_path: pathlib.Path) -> None: """No-flags error is on stderr; stdout has no JSON.""" repo = _make_repo(tmp_path) _cli(["workspace", "add", "core", "https://example.com/core"], repo) stdout, stderr, rc = _cli(["workspace", "update", "core"], repo) assert rc == 1 assert not stdout.strip().startswith("{") assert stderr.strip() != "" class TestWorkspaceUpdateStress: """Performance and scale tests for workspace update.""" def test_update_10_members_sequential(self, tmp_path: pathlib.Path) -> None: """10 members can each be updated sequentially.""" repo = _make_repo(tmp_path) for i in range(10): _cli(["workspace", "add", f"svc{i}", f"https://example.com/svc{i}"], repo) failures = [] for i in range(10): _, _, rc = _cli(["workspace", "update", f"svc{i}", "--branch", f"v{i}", "-j"], repo) if rc != 0: failures.append(f"svc{i}") assert not failures stdout, _, _ = _cli(["workspace", "list", "--json"], repo) members = {m["name"]: m for m in _parse_list(stdout)} for i in range(10): assert members[f"svc{i}"]["branch"] == f"v{i}" def test_update_performance(self, tmp_path: pathlib.Path) -> None: """10 sequential updates complete within 5 seconds.""" repo = _make_repo(tmp_path) for i in range(10): _cli(["workspace", "add", f"svc{i}", f"https://example.com/svc{i}"], repo) t0 = time.monotonic() for i in range(10): _cli(["workspace", "update", f"svc{i}", "--branch", "release"], repo) elapsed = time.monotonic() - t0 assert elapsed < 15.0, f"10 updates took {elapsed:.2f}s" def test_update_repeated_same_member(self, tmp_path: pathlib.Path) -> None: """A member can be updated 10 times in a row without error.""" repo = _make_repo(tmp_path) _cli(["workspace", "add", "core", "https://example.com/core"], repo) for i in range(10): _, _, rc = _cli(["workspace", "update", "core", "--branch", f"v{i}", "-j"], repo) assert rc == 0 stdout, _, _ = _cli(["workspace", "list", "--json"], repo) members = _parse_list(stdout) core = next(m for m in members if m["name"] == "core") assert core["branch"] == "v9" # =========================================================================== # muse workspace list — Extended / Security / Stress # =========================================================================== class TestWorkspaceListExtended: """-j alias, JSON schema, text output, ordering, edge cases.""" def test_list_j_alias(self, tmp_path: pathlib.Path) -> None: """-j produces the same JSON as --json (ignoring duration_ms).""" repo = _make_repo(tmp_path) add_workspace_member(repo, "core", "https://example.com/core") s1, _, rc1 = _cli(["workspace", "list", "--json"], repo) s2, _, rc2 = _cli(["workspace", "list", "-j"], repo) assert rc1 == 0 and rc2 == 0 d1 = json.loads(_json_blob(s1)); d1.pop("duration_ms", None); d1.pop("timestamp", None) d2 = json.loads(_json_blob(s2)); d2.pop("duration_ms", None); d2.pop("timestamp", None) assert d1 == d2 def test_list_empty_exits_0(self, tmp_path: pathlib.Path) -> None: """List with no members exits 0 and returns empty members array.""" repo = _make_repo(tmp_path) stdout, _, rc = _cli(["workspace", "list", "-j"], repo) assert rc == 0 assert json.loads(_json_blob(stdout))["members"] == [] def test_list_json_is_array(self, tmp_path: pathlib.Path) -> None: """JSON output is an envelope with a members array.""" repo = _make_repo(tmp_path) add_workspace_member(repo, "core", "https://example.com/core") stdout, _, rc = _cli(["workspace", "list", "-j"], repo) assert rc == 0 raw = json.loads(_json_blob(stdout)) assert isinstance(raw["members"], list) def test_list_json_all_fields_present(self, tmp_path: pathlib.Path) -> None: """Every member entry has the required fields.""" repo = _make_repo(tmp_path) add_workspace_member(repo, "core", "https://example.com/core") stdout, _, rc = _cli(["workspace", "list", "-j"], repo) assert rc == 0 raw = json.loads(_json_blob(stdout)) assert len(raw["members"]) == 1 entry = raw["members"][0] for field in ("name", "url", "path", "branch", "present", "head_commit", "dirty"): assert field in entry, f"field '{field}' missing" def test_list_json_name_matches(self, tmp_path: pathlib.Path) -> None: """name field in JSON matches the registered member name.""" repo = _make_repo(tmp_path) add_workspace_member(repo, "myrepo", "https://example.com/myrepo") members = _parse_list(_cli(["workspace", "list", "-j"], repo)[0]) assert any(m["name"] == "myrepo" for m in members) def test_list_json_url_matches(self, tmp_path: pathlib.Path) -> None: """url field in JSON matches the registered URL.""" repo = _make_repo(tmp_path) url = "https://example.com/myrepo" add_workspace_member(repo, "myrepo", url) members = _parse_list(_cli(["workspace", "list", "-j"], repo)[0]) assert members[0]["url"] == url def test_list_json_branch_default_main(self, tmp_path: pathlib.Path) -> None: """branch field defaults to 'main'.""" repo = _make_repo(tmp_path) add_workspace_member(repo, "core", "https://example.com/core") members = _parse_list(_cli(["workspace", "list", "-j"], repo)[0]) assert members[0]["branch"] == "main" def test_list_json_present_false_when_not_cloned(self, tmp_path: pathlib.Path) -> None: """present=false when the checkout directory does not exist.""" repo = _make_repo(tmp_path) add_workspace_member(repo, "core", "https://example.com/core") members = _parse_list(_cli(["workspace", "list", "-j"], repo)[0]) assert members[0]["present"] is False def test_list_json_head_commit_null_when_not_cloned(self, tmp_path: pathlib.Path) -> None: """head_commit is null when the member is not yet cloned.""" repo = _make_repo(tmp_path) add_workspace_member(repo, "core", "https://example.com/core") members = _parse_list(_cli(["workspace", "list", "-j"], repo)[0]) assert members[0]["head_commit"] is None def test_list_json_count_matches_registered(self, tmp_path: pathlib.Path) -> None: """Array length equals number of registered members.""" repo = _make_repo(tmp_path) for i in range(5): add_workspace_member(repo, f"svc{i}", f"https://example.com/svc{i}") members = _parse_list(_cli(["workspace", "list", "-j"], repo)[0]) assert len(members) == 5 def test_list_json_reflects_update(self, tmp_path: pathlib.Path) -> None: """Updated branch appears in list JSON after update.""" repo = _make_repo(tmp_path) add_workspace_member(repo, "core", "https://example.com/core") update_workspace_member(repo, "core", branch="release") members = _parse_list(_cli(["workspace", "list", "-j"], repo)[0]) assert members[0]["branch"] == "release" def test_list_json_member_removed_not_shown(self, tmp_path: pathlib.Path) -> None: """Removed member no longer appears in list.""" repo = _make_repo(tmp_path) add_workspace_member(repo, "core", "https://example.com/core") add_workspace_member(repo, "data", "https://example.com/data") remove_workspace_member(repo, "core") members = _parse_list(_cli(["workspace", "list", "-j"], repo)[0]) assert all(m["name"] != "core" for m in members) assert any(m["name"] == "data" for m in members) def test_list_default_is_text(self, tmp_path: pathlib.Path) -> None: """Without --json output is human-readable text.""" repo = _make_repo(tmp_path) add_workspace_member(repo, "core", "https://example.com/core") stdout, _, rc = _cli(["workspace", "list"], repo) assert rc == 0 assert not stdout.strip().startswith("[") def test_list_text_empty_message(self, tmp_path: pathlib.Path) -> None: """Text output says 'No workspace members' when list is empty.""" repo = _make_repo(tmp_path) stdout, _, rc = _cli(["workspace", "list"], repo) assert rc == 0 assert "No workspace members" in stdout def test_list_text_shows_member_name(self, tmp_path: pathlib.Path) -> None: """Text output includes the member name.""" repo = _make_repo(tmp_path) add_workspace_member(repo, "myrepo", "https://example.com/myrepo") stdout, _, rc = _cli(["workspace", "list"], repo) assert rc == 0 assert "myrepo" in stdout def test_list_outside_repo_succeeds_empty(self, tmp_path: pathlib.Path) -> None: """Workspace list from a non-repo dir returns empty members — no muse repo required.""" empty = tmp_path / "empty" empty.mkdir() stdout, _, rc = _cli(["workspace", "list", "--json"], empty) assert rc == 0 assert json.loads(stdout)["members"] == [] def test_list_help_has_description(self, tmp_path: pathlib.Path) -> None: """--help includes the rich description.""" repo = _make_repo(tmp_path) stdout, _, rc = _cli(["workspace", "list", "--help"], repo) assert rc == 0 assert "Agent quickstart" in stdout or "JSON output schema" in stdout def test_list_json_valid_types(self, tmp_path: pathlib.Path) -> None: """All JSON field types are correct.""" repo = _make_repo(tmp_path) add_workspace_member(repo, "core", "https://example.com/core") raw = json.loads(_json_blob(_cli(["workspace", "list", "-j"], repo)[0])) entry = raw["members"][0] assert isinstance(entry["name"], str) assert isinstance(entry["url"], str) assert isinstance(entry["path"], str) assert isinstance(entry["branch"], str) assert isinstance(entry["present"], bool) assert entry["head_commit"] is None or isinstance(entry["head_commit"], str) assert isinstance(entry["dirty"], bool) class TestWorkspaceListSecurity: """ANSI sanitization and output integrity.""" def test_list_json_ansi_in_name_sanitized(self, tmp_path: pathlib.Path) -> None: """ANSI codes in stored member name are stripped from JSON output.""" repo = _make_repo(tmp_path) # Inject ANSI directly into manifest via core (bypassing CLI validation) manifest_path = workspace_toml_path(repo) manifest_path.parent.mkdir(parents=True, exist_ok=True) manifest_path.write_text( '[workspace]\n[[workspace.members]]\n' 'name = "core\\u001b[31mred\\u001b[0m"\n' 'url = "https://example.com/core"\n' 'path = "repos/core"\n' 'branch = "main"\n' ) stdout, _, rc = _cli(["workspace", "list", "-j"], repo) assert rc == 0 assert "\x1b" not in stdout def test_list_json_ansi_in_url_sanitized(self, tmp_path: pathlib.Path) -> None: """ANSI codes in stored URL are stripped from JSON output.""" repo = _make_repo(tmp_path) manifest_path = workspace_toml_path(repo) manifest_path.parent.mkdir(parents=True, exist_ok=True) manifest_path.write_text( '[workspace]\n[[workspace.members]]\n' 'name = "core"\n' 'url = "https://example.com/core\\u001b[31m"\n' 'path = "repos/core"\n' 'branch = "main"\n' ) stdout, _, rc = _cli(["workspace", "list", "-j"], repo) assert rc == 0 assert "\x1b" not in stdout def test_list_text_no_ansi_in_output(self, tmp_path: pathlib.Path) -> None: """Text output contains no ANSI escape sequences.""" repo = _make_repo(tmp_path) add_workspace_member(repo, "core", "https://example.com/core") stdout, _, rc = _cli(["workspace", "list"], repo) assert rc == 0 assert "\x1b" not in stdout def test_list_json_is_valid_json(self, tmp_path: pathlib.Path) -> None: """JSON output is well-formed even with multiple members.""" repo = _make_repo(tmp_path) for i in range(3): add_workspace_member(repo, f"svc{i}", f"https://example.com/svc{i}") stdout, _, rc = _cli(["workspace", "list", "-j"], repo) assert rc == 0 raw = json.loads(_json_blob(stdout)) assert isinstance(raw["members"], list) assert len(raw["members"]) == 3 def test_list_json_dirty_is_bool(self, tmp_path: pathlib.Path) -> None: """dirty field is always a boolean, never a string or int.""" repo = _make_repo(tmp_path) add_workspace_member(repo, "core", "https://example.com/core") raw = json.loads(_json_blob(_cli(["workspace", "list", "-j"], repo)[0])) assert isinstance(raw["members"][0]["dirty"], bool) class TestWorkspaceListStress: """Performance and scale tests for workspace list.""" def test_list_50_members(self, tmp_path: pathlib.Path) -> None: """List returns all 50 members when 50 are registered.""" repo = _make_repo(tmp_path) for i in range(50): add_workspace_member(repo, f"svc{i:03d}", f"https://example.com/svc{i}") members = _parse_list(_cli(["workspace", "list", "-j"], repo)[0]) assert len(members) == 50 def test_list_performance_50_members(self, tmp_path: pathlib.Path) -> None: """Listing 50 members completes within 5 seconds.""" repo = _make_repo(tmp_path) for i in range(50): add_workspace_member(repo, f"svc{i:03d}", f"https://example.com/svc{i}") t0 = time.monotonic() stdout, _, rc = _cli(["workspace", "list", "-j"], repo) elapsed = time.monotonic() - t0 assert rc == 0 assert elapsed < 5.0, f"list of 50 took {elapsed:.2f}s" def test_list_concurrent_reads_consistent(self, tmp_path: pathlib.Path) -> None: """Concurrent list reads all return the same member count.""" repo = _make_repo(tmp_path) for i in range(20): add_workspace_member(repo, f"svc{i}", f"https://example.com/svc{i}") counts: list[int] = [] errors: list[str] = [] lock = threading.Lock() def _run() -> None: stdout, _, rc = _cli(["workspace", "list", "-j"], repo) with lock: if rc != 0: errors.append(stdout) else: counts.append(len(json.loads(_json_blob(stdout))["members"])) threads = [threading.Thread(target=_run) for _ in range(8)] for t in threads: t.start() for t in threads: t.join() assert not errors, f"Concurrent list errors: {errors}" assert all(c == 20 for c in counts), f"Inconsistent counts: {counts}" # --------------------------------------------------------------------------- # workspace remove — Extended, Security, Stress # --------------------------------------------------------------------------- class TestWorkspaceRemoveExtended: """Extended unit / integration / e2e tests for muse workspace remove.""" def test_remove_exits_0_on_success(self, tmp_path: pathlib.Path) -> None: """Successful remove exits with code 0.""" repo = _make_repo(tmp_path) add_workspace_member(repo, "core", "https://example.com/core") _, _, rc = _cli(["workspace", "remove", "core"], repo) assert rc == 0 def test_remove_j_alias_works(self, tmp_path: pathlib.Path) -> None: """-j is an accepted alias for --json.""" repo = _make_repo(tmp_path) add_workspace_member(repo, "core", "https://example.com/core") stdout, _, rc = _cli(["workspace", "remove", "core", "-j"], repo) assert rc == 0 d = _parse_remove(stdout) assert d["removed"] is True def test_remove_json_name_matches(self, tmp_path: pathlib.Path) -> None: """JSON output name matches the removed member's name.""" repo = _make_repo(tmp_path) add_workspace_member(repo, "sounds", "https://example.com/sounds") stdout, _, rc = _cli(["workspace", "remove", "sounds", "--json"], repo) assert rc == 0 d = _parse_remove(stdout) assert d["name"] == "sounds" def test_remove_json_removed_true(self, tmp_path: pathlib.Path) -> None: """JSON output always has removed=true on success.""" repo = _make_repo(tmp_path) add_workspace_member(repo, "core", "https://example.com/core") stdout, _, rc = _cli(["workspace", "remove", "core", "--json"], repo) assert rc == 0 assert json.loads(_json_blob(stdout))["removed"] is True def test_remove_member_no_longer_in_list(self, tmp_path: pathlib.Path) -> None: """After remove, the member is absent from workspace list.""" repo = _make_repo(tmp_path) add_workspace_member(repo, "core", "https://example.com/core") add_workspace_member(repo, "data", "https://example.com/data") _cli(["workspace", "remove", "core"], repo) members = _parse_list(_cli(["workspace", "list", "--json"], repo)[0]) names = [m["name"] for m in members] assert "core" not in names assert "data" in names def test_remove_only_named_member_removed(self, tmp_path: pathlib.Path) -> None: """Remove deletes exactly one member; others are untouched.""" repo = _make_repo(tmp_path) for n in ("alpha", "beta", "gamma"): add_workspace_member(repo, n, f"https://example.com/{n}") _cli(["workspace", "remove", "beta"], repo) members = _parse_list(_cli(["workspace", "list", "--json"], repo)[0]) names = [m["name"] for m in members] assert names == ["alpha", "gamma"] def test_remove_idempotent_error_on_second_call(self, tmp_path: pathlib.Path) -> None: """Removing the same member twice returns an error on the second call.""" repo = _make_repo(tmp_path) add_workspace_member(repo, "core", "https://example.com/core") _, _, rc1 = _cli(["workspace", "remove", "core"], repo) _, _, rc2 = _cli(["workspace", "remove", "core"], repo) assert rc1 == 0 assert rc2 != 0 def test_remove_nonexistent_exits_1(self, tmp_path: pathlib.Path) -> None: """Removing a non-existent member exits with code 1.""" repo = _make_repo(tmp_path) add_workspace_member(repo, "core", "https://example.com/core") _, _, rc = _cli(["workspace", "remove", "ghost"], repo) assert rc == 1 def test_remove_outside_repo_exits_1_member_not_found(self, tmp_path: pathlib.Path) -> None: """Workspace remove from a non-repo dir exits 1 (member not found), not 2.""" empty = tmp_path / "not_a_repo" empty.mkdir() _, _, rc = _cli(["workspace", "remove", "core"], empty) assert rc == 1 def test_remove_text_output_contains_name(self, tmp_path: pathlib.Path) -> None: """Text output mentions the removed member's name.""" repo = _make_repo(tmp_path) add_workspace_member(repo, "sounds", "https://example.com/sounds") stdout, _, rc = _cli(["workspace", "remove", "sounds"], repo) assert rc == 0 assert "sounds" in stdout def test_remove_text_success_marker(self, tmp_path: pathlib.Path) -> None: """Text output contains a success indicator.""" repo = _make_repo(tmp_path) add_workspace_member(repo, "core", "https://example.com/core") stdout, _, _ = _cli(["workspace", "remove", "core"], repo) assert "Removed" in stdout or "✅" in stdout def test_remove_error_to_stderr_not_stdout(self, tmp_path: pathlib.Path) -> None: """Error messages go to stderr; stdout is empty on failure.""" repo = _make_repo(tmp_path) add_workspace_member(repo, "core", "https://example.com/core") stdout, stderr, rc = _cli(["workspace", "remove", "ghost"], repo) assert rc != 0 assert "not found" in stderr assert "not found" not in stdout def test_remove_text_no_json_on_success(self, tmp_path: pathlib.Path) -> None: """Without --json, stdout does not contain a JSON object.""" repo = _make_repo(tmp_path) add_workspace_member(repo, "core", "https://example.com/core") stdout, _, rc = _cli(["workspace", "remove", "core"], repo) assert rc == 0 assert not stdout.strip().startswith("{") def test_remove_count_decreases(self, tmp_path: pathlib.Path) -> None: """Member count decreases by exactly one after remove.""" repo = _make_repo(tmp_path) for n in ("a", "b", "c"): add_workspace_member(repo, n, f"https://example.com/{n}") before = len(_parse_list(_cli(["workspace", "list", "--json"], repo)[0])) _cli(["workspace", "remove", "b"], repo) after = len(_parse_list(_cli(["workspace", "list", "--json"], repo)[0])) assert after == before - 1 def test_remove_last_member_leaves_empty_manifest(self, tmp_path: pathlib.Path) -> None: """Removing the only member results in an empty list.""" repo = _make_repo(tmp_path) add_workspace_member(repo, "only", "https://example.com/only") _cli(["workspace", "remove", "only"], repo) members = _parse_list(_cli(["workspace", "list", "--json"], repo)[0]) assert members == [] def test_remove_help_description_present(self, tmp_path: pathlib.Path) -> None: """--help output contains the agent-friendly description.""" repo = _make_repo(tmp_path) stdout, _, _ = _cli(["workspace", "remove", "--help"], repo) assert "Unregister" in stdout or "manifest" in stdout def test_remove_json_schema_keys(self, tmp_path: pathlib.Path) -> None: """JSON output has exactly the keys: name, removed, exit_code, duration_ms.""" repo = _make_repo(tmp_path) add_workspace_member(repo, "core", "https://example.com/core") stdout, _, rc = _cli(["workspace", "remove", "core", "--json"], repo) assert rc == 0 d = json.loads(_json_blob(stdout)) assert {"name", "removed", "exit_code", "duration_ms"}.issubset(d.keys()) def test_remove_no_manifest_exits_1(self, tmp_path: pathlib.Path) -> None: """Remove on a repo with no workspace manifest exits 1.""" repo = _make_repo(tmp_path) # No workspace.toml — remove_workspace_member raises ValueError _, stderr, rc = _cli(["workspace", "remove", "core"], repo) assert rc == 1 assert stderr.strip() != "" class TestWorkspaceRemoveSecurity: """Security hardening tests for muse workspace remove.""" def test_remove_ansi_in_name_arg_sanitized_in_json(self, tmp_path: pathlib.Path) -> None: """ANSI codes in a stored name are stripped from JSON output.""" repo = _make_repo(tmp_path) # Write manifest directly with an ANSI-injected name so it bypasses validator manifest_path = workspace_toml_path(repo) manifest_path.parent.mkdir(parents=True, exist_ok=True) manifest_path.write_text( '[workspace]\n[[workspace.members]]\n' 'name = "malicious\\u001b[31m"\n' 'url = "https://example.com/malicious"\n' 'path = "repos/malicious"\n' 'branch = "main"\n' ) # Use the raw stored name as the CLI arg to remove it stdout, _, rc = _cli(["workspace", "remove", "malicious\x1b[31m", "--json"], repo) # Either succeeds (found) or fails (not found) — either way, no ANSI in stdout assert "\x1b" not in stdout def test_remove_text_output_no_ansi(self, tmp_path: pathlib.Path) -> None: """Text output contains no ANSI escape sequences.""" repo = _make_repo(tmp_path) add_workspace_member(repo, "core", "https://example.com/core") stdout, _, _ = _cli(["workspace", "remove", "core"], repo) assert "\x1b" not in stdout def test_remove_json_valid_on_success(self, tmp_path: pathlib.Path) -> None: """JSON output is well-formed on success.""" repo = _make_repo(tmp_path) add_workspace_member(repo, "core", "https://example.com/core") stdout, _, rc = _cli(["workspace", "remove", "core", "--json"], repo) assert rc == 0 d = json.loads(_json_blob(stdout)) assert isinstance(d["name"], str) assert d["removed"] is True def test_remove_removed_field_is_bool(self, tmp_path: pathlib.Path) -> None: """removed field is a boolean, never a string or int.""" repo = _make_repo(tmp_path) add_workspace_member(repo, "core", "https://example.com/core") stdout, _, rc = _cli(["workspace", "remove", "core", "--json"], repo) assert rc == 0 d = json.loads(_json_blob(stdout)) assert isinstance(d["removed"], bool) def test_remove_symlink_manifest_fails_gracefully(self, tmp_path: pathlib.Path) -> None: """A symlinked manifest is refused — exits non-zero without crashing.""" repo = _make_repo(tmp_path) add_workspace_member(repo, "core", "https://example.com/core") manifest_path = workspace_toml_path(repo) real = tmp_path / "real_workspace.toml" real.write_text(manifest_path.read_text()) manifest_path.unlink() manifest_path.symlink_to(real) _, _, rc = _cli(["workspace", "remove", "core"], repo) # The symlink guard in _load_manifest returns None → ValueError → rc 1 assert rc != 0 def test_remove_null_byte_in_name_raises(self, tmp_path: pathlib.Path) -> None: """Null byte in name is rejected by the core validator.""" repo = _make_repo(tmp_path) add_workspace_member(repo, "core", "https://example.com/core") # remove_workspace_member does a name-equality match; null byte won't match # any valid stored name — raises ValueError (not found) with pytest.raises(ValueError): remove_workspace_member(repo, "core\x00malicious") class TestWorkspaceRemoveStress: """Performance and scale tests for muse workspace remove.""" def test_remove_from_50_member_manifest(self, tmp_path: pathlib.Path) -> None: """Remove works correctly when the manifest has 50 members.""" repo = _make_repo(tmp_path) for i in range(50): add_workspace_member(repo, f"svc{i:03d}", f"https://example.com/svc{i}") _, _, rc = _cli(["workspace", "remove", "svc025", "--json"], repo) assert rc == 0 members = _parse_list(_cli(["workspace", "list", "--json"], repo)[0]) assert len(members) == 49 assert all(m["name"] != "svc025" for m in members) def test_remove_performance_50_members(self, tmp_path: pathlib.Path) -> None: """Removing from a 50-member manifest completes within 5 seconds.""" repo = _make_repo(tmp_path) for i in range(50): add_workspace_member(repo, f"svc{i:03d}", f"https://example.com/svc{i}") t0 = time.monotonic() _, _, rc = _cli(["workspace", "remove", "svc000", "--json"], repo) elapsed = time.monotonic() - t0 assert rc == 0 assert elapsed < 5.0, f"remove from 50 took {elapsed:.2f}s" def test_remove_sequential_removes_all(self, tmp_path: pathlib.Path) -> None: """Removing all 20 members one-by-one leaves an empty list.""" repo = _make_repo(tmp_path) names = [f"svc{i:02d}" for i in range(20)] for n in names: add_workspace_member(repo, n, f"https://example.com/{n}") for n in names: _, _, rc = _cli(["workspace", "remove", n], repo) assert rc == 0 members = _parse_list(_cli(["workspace", "list", "--json"], repo)[0]) assert members == [] # --------------------------------------------------------------------------- # workspace status — Extended, Security, Stress # --------------------------------------------------------------------------- class TestWorkspaceStatusExtended: """Extended unit / integration / e2e tests for muse workspace status.""" def test_status_exits_0_all_members(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) add_workspace_member(repo, "core", "https://example.com/core") _, _, rc = _cli(["workspace", "status"], repo) assert rc == 0 def test_status_j_alias_works(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) add_workspace_member(repo, "core", "https://example.com/core") stdout, _, rc = _cli(["workspace", "status", "-j"], repo) assert rc == 0 members = _parse_list(stdout) assert len(members) == 1 def test_status_named_exits_0(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) add_workspace_member(repo, "core", "https://example.com/core") _, _, rc = _cli(["workspace", "status", "core"], repo) assert rc == 0 def test_status_named_json_single_element(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) add_workspace_member(repo, "core", "https://example.com/core") add_workspace_member(repo, "data", "https://example.com/data") stdout, _, rc = _cli(["workspace", "status", "core", "--json"], repo) assert rc == 0 members = _parse_list(stdout) assert len(members) == 1 assert members[0]["name"] == "core" def test_status_all_json_all_members_returned(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) for n in ("alpha", "beta", "gamma"): add_workspace_member(repo, n, f"https://example.com/{n}") stdout, _, rc = _cli(["workspace", "status", "--json"], repo) assert rc == 0 members = _parse_list(stdout) assert {m["name"] for m in members} == {"alpha", "beta", "gamma"} def test_status_json_ten_fields(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) add_workspace_member(repo, "core", "https://example.com/core") stdout, _, rc = _cli(["workspace", "status", "--json"], repo) assert rc == 0 d = json.loads(_json_blob(stdout))["members"][0] assert set(d.keys()) == { "name", "url", "path", "branch", "present", "head_commit", "dirty", "actual_branch", "shelf_count", "feature_branches", "branch_mismatch", } def test_status_json_present_false_when_not_cloned(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) add_workspace_member(repo, "core", "https://example.com/core") stdout, _, rc = _cli(["workspace", "status", "--json"], repo) assert rc == 0 assert _parse_list(stdout)[0]["present"] is False def test_status_json_head_commit_null_when_not_cloned(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) add_workspace_member(repo, "core", "https://example.com/core") stdout, _, rc = _cli(["workspace", "status", "--json"], repo) assert rc == 0 assert _parse_list(stdout)[0]["head_commit"] is None def test_status_json_dirty_false_when_not_cloned(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) add_workspace_member(repo, "core", "https://example.com/core") stdout, _, rc = _cli(["workspace", "status", "--json"], repo) assert rc == 0 assert _parse_list(stdout)[0]["dirty"] is False def test_status_empty_exits_0(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) _, _, rc = _cli(["workspace", "status", "--json"], repo) assert rc == 0 def test_status_empty_json_empty_array(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) stdout, _, rc = _cli(["workspace", "status", "--json"], repo) assert rc == 0 assert _parse_list(stdout) == [] def test_status_nonexistent_name_exits_1(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) add_workspace_member(repo, "core", "https://example.com/core") _, _, rc = _cli(["workspace", "status", "ghost"], repo) assert rc == 1 def test_status_nonexistent_error_to_stderr(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) add_workspace_member(repo, "core", "https://example.com/core") stdout, stderr, rc = _cli(["workspace", "status", "ghost"], repo) assert rc != 0 assert "not found" in stderr assert "not found" not in stdout def test_status_outside_repo_succeeds_empty(self, tmp_path: pathlib.Path) -> None: """Workspace status from a non-repo dir returns empty members — no muse repo required.""" empty = tmp_path / "not_a_repo" empty.mkdir() stdout, _, rc = _cli(["workspace", "status", "--json"], empty) assert rc == 0 assert json.loads(stdout)["members"] == [] def test_status_text_contains_member_name(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) add_workspace_member(repo, "sounds", "https://example.com/sounds") stdout, _, rc = _cli(["workspace", "status"], repo) assert rc == 0 assert "sounds" in stdout def test_status_text_empty_message(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) stdout, _, rc = _cli(["workspace", "status"], repo) assert rc == 0 assert "No workspace members" in stdout def test_status_help_description_present(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) stdout, _, _ = _cli(["workspace", "status", "--help"], repo) assert "Agent quickstart" in stdout or "present" in stdout def test_status_json_url_matches_registered(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) add_workspace_member(repo, "core", "https://example.com/core") stdout, _, rc = _cli(["workspace", "status", "--json"], repo) assert rc == 0 assert _parse_list(stdout)[0]["url"] == "https://example.com/core" class TestWorkspaceStatusSecurity: """Security hardening tests for muse workspace status.""" def test_status_json_no_ansi_in_name(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) manifest_path = workspace_toml_path(repo) manifest_path.parent.mkdir(parents=True, exist_ok=True) manifest_path.write_text( '[workspace]\n[[workspace.members]]\n' 'name = "malicious\\u001b[31m"\n' 'url = "https://example.com/malicious"\n' 'path = "repos/malicious"\n' 'branch = "main"\n' ) stdout, _, rc = _cli(["workspace", "status", "--json"], repo) assert rc == 0 assert "\x1b" not in stdout def test_status_json_no_ansi_in_url(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) manifest_path = workspace_toml_path(repo) manifest_path.parent.mkdir(parents=True, exist_ok=True) manifest_path.write_text( '[workspace]\n[[workspace.members]]\n' 'name = "core"\n' 'url = "https://example.com/\\u001b[31mcore"\n' 'path = "repos/core"\n' 'branch = "main"\n' ) stdout, _, rc = _cli(["workspace", "status", "--json"], repo) assert rc == 0 assert "\x1b" not in stdout def test_status_text_no_ansi(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) add_workspace_member(repo, "core", "https://example.com/core") stdout, _, _ = _cli(["workspace", "status"], repo) assert "\x1b" not in stdout def test_status_json_valid_json(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) for i in range(3): add_workspace_member(repo, f"svc{i}", f"https://example.com/svc{i}") stdout, _, rc = _cli(["workspace", "status", "--json"], repo) assert rc == 0 raw = json.loads(_json_blob(stdout)) assert isinstance(raw["members"], list) assert len(raw["members"]) == 3 def test_status_json_bool_fields_are_bool(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) add_workspace_member(repo, "core", "https://example.com/core") stdout, _, rc = _cli(["workspace", "status", "--json"], repo) assert rc == 0 d = json.loads(_json_blob(stdout))["members"][0] assert isinstance(d["present"], bool) assert isinstance(d["dirty"], bool) def test_status_symlink_manifest_exits_nonzero(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) add_workspace_member(repo, "core", "https://example.com/core") manifest_path = workspace_toml_path(repo) real = tmp_path / "real.toml" real.write_text(manifest_path.read_text()) manifest_path.unlink() manifest_path.symlink_to(real) # symlink guard returns None → empty list (exits 0, empty array) stdout, _, rc = _cli(["workspace", "status", "--json"], repo) assert rc == 0 assert _parse_list(stdout) == [] class TestWorkspaceStatusStress: """Performance and scale tests for muse workspace status.""" def test_status_50_members_all_returned(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) for i in range(50): add_workspace_member(repo, f"svc{i:03d}", f"https://example.com/svc{i}") stdout, _, rc = _cli(["workspace", "status", "--json"], repo) assert rc == 0 assert len(_parse_list(stdout)) == 50 def test_status_performance_50_members(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) for i in range(50): add_workspace_member(repo, f"svc{i:03d}", f"https://example.com/svc{i}") t0 = time.monotonic() stdout, _, rc = _cli(["workspace", "status", "--json"], repo) elapsed = time.monotonic() - t0 assert rc == 0 assert elapsed < 5.0, f"status of 50 took {elapsed:.2f}s" def test_status_concurrent_reads_consistent(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) for i in range(20): add_workspace_member(repo, f"svc{i}", f"https://example.com/svc{i}") counts: list[int] = [] errors: list[str] = [] lock = threading.Lock() def _run() -> None: stdout, _, rc = _cli(["workspace", "status", "--json"], repo) with lock: if rc != 0: errors.append(stdout) else: counts.append(len(json.loads(_json_blob(stdout))["members"])) threads = [threading.Thread(target=_run) for _ in range(8)] for t in threads: t.start() for t in threads: t.join() assert not errors assert all(c == 20 for c in counts)