"""Hardening tests for ``muse release`` and ``muse/cli/commands/release.py``. Covers: - Security: ANSI-safe body rendering via sanitize_display - Security: TTY guard on ``muse release delete`` without --yes - Error routing: all user-visible errors go to stderr - JSON schema: add, list, show, push dry-run, delete dry-run, delete aborted - --dry-run push: no network call, structured output - --dry-run delete: no deletion, structured output - --json flag: push, delete, show, list, add - --commit alias for --ref on add - Integration: full lifecycle add → show → list → delete - Integration: channel filtering - Stress: 50 releases, concurrent list reads """ from __future__ import annotations import datetime import json import pathlib import threading from typing import TypedDict from unittest.mock import MagicMock, patch import pytest from tests.cli_test_helper import CliRunner, InvokeResult from muse.core.types import Manifest, fake_id from muse.core.semver import SemVerTag from muse.core.releases import ( ReleaseRecord, delete_release, get_release_for_tag, list_releases, write_release, ) from muse.core.paths import muse_dir, ref_path runner = CliRunner() # --------------------------------------------------------------------------- # TypedDicts for JSON schema validation # --------------------------------------------------------------------------- class _PushJson(TypedDict): status: str tag: str remote: str release_id: str dry_run: bool class _DeleteJson(TypedDict): status: str tag: str was_draft: bool remote_retracted: bool dry_run: bool class _ShowJson(TypedDict): tag: str channel: str commit_id: str snapshot_id: str release_id: str is_draft: bool # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _env(root: pathlib.Path) -> Manifest: return {"MUSE_REPO_ROOT": str(root)} def _init_repo(tmp_path: pathlib.Path, domain: str = "code") -> tuple[pathlib.Path, str]: dot_muse = muse_dir(tmp_path) dot_muse.mkdir() repo_id = fake_id("repo") (dot_muse / "repo.json").write_text( json.dumps({"repo_id": repo_id, "domain": domain, "default_branch": "main"}), encoding="utf-8", ) (dot_muse / "HEAD").write_text("ref: refs/heads/main\n", encoding="utf-8") (dot_muse / "refs" / "heads").mkdir(parents=True) (dot_muse / "snapshots").mkdir() (dot_muse / "commits").mkdir() (dot_muse / "objects").mkdir() return tmp_path, repo_id def _make_commit( root: pathlib.Path, repo_id: str, branch: str = "main", message: str = "feat: add", sem_ver_bump: str = "minor", ) -> str: from muse.core.ids import hash_snapshot, hash_commit from muse.core.commits import ( CommitRecord, write_commit, ) from muse.core.snapshots import ( SnapshotRecord, write_snapshot, ) from muse.domain import SemVerBump ref_file = ref_path(root, branch) raw_parent = ref_file.read_text().strip() if ref_file.exists() else "" parent_id: str | None = raw_parent if raw_parent else None manifest: Manifest = {} snap_id = hash_snapshot(manifest) write_snapshot(root, SnapshotRecord(snapshot_id=snap_id, manifest=manifest)) now = datetime.datetime.now(datetime.timezone.utc) parent_ids: list[str] = [parent_id] if parent_id else [] commit_id = hash_commit( parent_ids=parent_ids, snapshot_id=snap_id, message=message, committed_at_iso=now.isoformat(), ) _bump_map = { "major": "major", "minor": "minor", "patch": "patch", "none": "none" } bump_val: SemVerBump = _bump_map.get(sem_ver_bump, "none") write_commit(root, CommitRecord( commit_id=commit_id, branch=branch, snapshot_id=snap_id, message=message, committed_at=now, parent_commit_id=parent_id, sem_ver_bump=bump_val, )) ref_file.write_text(commit_id, encoding="utf-8") return commit_id def _write_release(root: pathlib.Path, repo_id: str, tag: str, is_draft: bool = False) -> ReleaseRecord: from muse.core.semver import ReleaseChannel sv_raw = tag.lstrip("v").split("-") parts = sv_raw[0].split(".") major, minor, patch_num = int(parts[0]), int(parts[1]), int(parts[2]) pre = sv_raw[1] if len(sv_raw) > 1 else "" semver = SemVerTag(major=major, minor=minor, patch=patch_num, pre=pre, build="") _channel_map = { "beta": "beta", "alpha": "alpha", "nightly": "nightly", } channel: ReleaseChannel = _channel_map.get( next((k for k in _channel_map if k in pre), ""), "stable" ) rec = ReleaseRecord( repo_id=repo_id, release_id=fake_id(f"release-{tag}"), tag=tag, semver=semver, channel=channel, commit_id="a" * 64, snapshot_id="b" * 64, title=f"Release {tag}", body="", changelog=[], is_draft=is_draft, ) write_release(root, rec) return rec def _invoke(args: list[str], repo: pathlib.Path) -> InvokeResult: return runner.invoke(None, args, env=_env(repo)) def _json_blob(output: str) -> str: for line in output.splitlines(): line = line.strip() if line.startswith("{") or line.startswith("["): return line return output.strip() def _parse_push(output: str) -> _PushJson: raw = json.loads(_json_blob(output)) assert isinstance(raw, dict) status = raw["status"] tag = raw["tag"] remote = raw["remote"] release_id = raw["release_id"] dry_run = raw["dry_run"] assert isinstance(status, str) assert isinstance(tag, str) assert isinstance(remote, str) assert isinstance(release_id, str) assert isinstance(dry_run, bool) return _PushJson(status=status, tag=tag, remote=remote, release_id=release_id, dry_run=dry_run) def _parse_delete(output: str) -> _DeleteJson: raw = json.loads(_json_blob(output)) assert isinstance(raw, dict) status = raw["status"] tag = raw["tag"] was_draft = raw["was_draft"] remote_retracted = raw["remote_retracted"] dry_run = raw["dry_run"] assert isinstance(status, str) assert isinstance(tag, str) assert isinstance(was_draft, bool) assert isinstance(remote_retracted, bool) assert isinstance(dry_run, bool) return _DeleteJson( status=status, tag=tag, was_draft=was_draft, remote_retracted=remote_retracted, dry_run=dry_run, ) def _parse_show(output: str) -> _ShowJson: raw = json.loads(_json_blob(output)) assert isinstance(raw, dict) tag = raw["tag"] channel = raw["channel"] commit_id = raw["commit_id"] snapshot_id = raw["snapshot_id"] release_id = raw["release_id"] is_draft = raw["is_draft"] assert isinstance(tag, str) assert isinstance(channel, str) assert isinstance(commit_id, str) assert isinstance(snapshot_id, str) assert isinstance(release_id, str) assert isinstance(is_draft, bool) return _ShowJson( tag=tag, channel=channel, commit_id=commit_id, snapshot_id=snapshot_id, release_id=release_id, is_draft=is_draft, ) # --------------------------------------------------------------------------- # Security: ANSI injection in body text # --------------------------------------------------------------------------- def test_body_ansi_stripped_in_text_output(tmp_path: pathlib.Path) -> None: """ANSI codes in release.body must be stripped before terminal output.""" root, repo_id = _init_repo(tmp_path) _make_commit(root, repo_id) ansi_body = "\x1b[31mDanger\x1b[0m" result = _invoke( ["release", "add", "v1.0.0", "--body", ansi_body], root ) assert result.exit_code == 0 show = _invoke(["release", "read", "v1.0.0"], root) assert result.exit_code == 0 # ANSI escape sequences must not appear in the text output. assert "\x1b[" not in show.output def test_title_ansi_stripped_in_text_output(tmp_path: pathlib.Path) -> None: root, repo_id = _init_repo(tmp_path) _make_commit(root, repo_id) ansi_title = "\x1b[1mBold\x1b[0m Release" result = _invoke(["release", "add", "v1.0.0", "--title", ansi_title], root) assert result.exit_code == 0 show = _invoke(["release", "read", "v1.0.0"], root) assert "\x1b[" not in show.output # --------------------------------------------------------------------------- # Security: TTY guard on delete without --yes # --------------------------------------------------------------------------- def test_delete_published_non_tty_without_yes_fails(tmp_path: pathlib.Path) -> None: """Non-TTY delete without --yes must exit USER_ERROR, never block.""" root, repo_id = _init_repo(tmp_path) _write_release(root, repo_id, "v1.0.0", is_draft=False) result = _invoke(["release", "delete", "v1.0.0"], root) assert result.exit_code != 0 assert "TTY" in result.stderr or "--yes" in result.stderr def test_delete_draft_non_tty_without_yes_fails(tmp_path: pathlib.Path) -> None: """Even draft deletes require --yes in non-TTY contexts.""" root, repo_id = _init_repo(tmp_path) _write_release(root, repo_id, "v1.0.0-alpha.1", is_draft=True) result = _invoke(["release", "delete", "v1.0.0-alpha.1"], root) assert result.exit_code != 0 assert "TTY" in result.stderr or "--yes" in result.stderr # --------------------------------------------------------------------------- # Error routing: errors go to stderr # --------------------------------------------------------------------------- def test_add_invalid_semver_error_to_stderr(tmp_path: pathlib.Path) -> None: root, repo_id = _init_repo(tmp_path) _make_commit(root, repo_id) result = _invoke(["release", "add", "not-semver"], root) assert result.exit_code != 0 def test_add_duplicate_error_to_stderr(tmp_path: pathlib.Path) -> None: root, repo_id = _init_repo(tmp_path) _make_commit(root, repo_id) _invoke(["release", "add", "v1.0.0"], root) result = _invoke(["release", "add", "v1.0.0"], root) assert result.exit_code != 0 assert "already exists" in result.stderr.lower() def test_show_not_found_error(tmp_path: pathlib.Path) -> None: root, _ = _init_repo(tmp_path) result = _invoke(["release", "read", "v99.0.0"], root) assert result.exit_code != 0 assert "not found" in result.stderr.lower() def test_push_not_found_locally_error(tmp_path: pathlib.Path) -> None: root, _ = _init_repo(tmp_path) result = _invoke(["release", "push", "v99.0.0", "--remote", "origin"], root) assert result.exit_code != 0 assert "not found" in result.stderr.lower() def test_delete_not_found_error(tmp_path: pathlib.Path) -> None: root, _ = _init_repo(tmp_path) result = _invoke(["release", "delete", "v99.0.0", "--yes"], root) assert result.exit_code != 0 assert "not found" in result.stderr.lower() # --------------------------------------------------------------------------- # JSON schema: --json on add # --------------------------------------------------------------------------- def test_add_json_output_schema(tmp_path: pathlib.Path) -> None: root, repo_id = _init_repo(tmp_path) _make_commit(root, repo_id, message="feat: new", sem_ver_bump="minor") result = _invoke(["release", "add", "v1.0.0", "--title", "First", "--json"], root) assert result.exit_code == 0, result.output data = json.loads(result.output) assert data["tag"] == "v1.0.0" assert data["channel"] == "stable" assert isinstance(data["release_id"], str) assert isinstance(data["changelog"], list) assert data["is_draft"] is False def test_add_draft_json_output(tmp_path: pathlib.Path) -> None: root, repo_id = _init_repo(tmp_path) _make_commit(root, repo_id) result = _invoke( ["release", "add", "v1.0.0-alpha.1", "--draft", "--json"], root ) assert result.exit_code == 0, result.output data = json.loads(result.output) assert data["is_draft"] is True assert data["channel"] == "alpha" # --------------------------------------------------------------------------- # JSON schema: --json on show # --------------------------------------------------------------------------- def test_show_json_schema(tmp_path: pathlib.Path) -> None: root, repo_id = _init_repo(tmp_path) _write_release(root, repo_id, "v2.0.0") result = _invoke(["release", "read", "v2.0.0", "--json"], root) assert result.exit_code == 0, result.output parsed = _parse_show(result.output) assert parsed["tag"] == "v2.0.0" assert parsed["channel"] == "stable" assert parsed["is_draft"] is False # --------------------------------------------------------------------------- # JSON schema: --json on list # --------------------------------------------------------------------------- def test_list_json_schema(tmp_path: pathlib.Path) -> None: root, repo_id = _init_repo(tmp_path) _write_release(root, repo_id, "v1.0.0") _write_release(root, repo_id, "v1.1.0-beta.1") result = _invoke(["release", "list", "--include-drafts", "--json"], root) assert result.exit_code == 0, result.output data = json.loads(result.output) releases = data["releases"] assert isinstance(releases, list) assert len(releases) >= 1 tags = {r["tag"] for r in releases} assert "v1.0.0" in tags def test_list_empty_json(tmp_path: pathlib.Path) -> None: root, _ = _init_repo(tmp_path) result = _invoke(["release", "list", "--json"], root) assert result.exit_code == 0 data = json.loads(result.output) assert data["releases"] == [] # --------------------------------------------------------------------------- # JSON schema: --dry-run push # --------------------------------------------------------------------------- def test_push_dry_run_json_schema(tmp_path: pathlib.Path) -> None: root, repo_id = _init_repo(tmp_path) _write_release(root, repo_id, "v1.0.0") result = _invoke( ["release", "push", "v1.0.0", "--remote", "origin", "--dry-run", "--json"], root ) assert result.exit_code == 0, result.output parsed = _parse_push(result.output) assert parsed["status"] == "dry_run" assert parsed["tag"] == "v1.0.0" assert parsed["remote"] == "origin" assert parsed["dry_run"] is True def test_push_dry_run_no_network_call(tmp_path: pathlib.Path) -> None: """--dry-run push must not call transport.create_release.""" root, repo_id = _init_repo(tmp_path) _write_release(root, repo_id, "v1.0.0") with patch("muse.cli.commands.release.make_transport") as mock_transport: result = _invoke( ["release", "push", "v1.0.0", "--remote", "origin", "--dry-run"], root ) assert result.exit_code == 0 # make_transport should not be called at all in dry-run mode. mock_transport.assert_not_called() def test_push_dry_run_text_output(tmp_path: pathlib.Path) -> None: root, repo_id = _init_repo(tmp_path) _write_release(root, repo_id, "v1.0.0") result = _invoke(["release", "push", "v1.0.0", "--remote", "origin", "--dry-run"], root) assert result.exit_code == 0 assert "dry-run" in result.output.lower() or "would push" in result.output.lower() assert "v1.0.0" in result.output # --------------------------------------------------------------------------- # JSON schema: --dry-run delete # --------------------------------------------------------------------------- def test_delete_dry_run_json_schema(tmp_path: pathlib.Path) -> None: root, repo_id = _init_repo(tmp_path) _write_release(root, repo_id, "v1.0.0", is_draft=False) result = _invoke( ["release", "delete", "v1.0.0", "--dry-run", "--json"], root ) assert result.exit_code == 0, result.output parsed = _parse_delete(result.output) assert parsed["status"] == "dry_run" assert parsed["tag"] == "v1.0.0" assert parsed["was_draft"] is False assert parsed["remote_retracted"] is False assert parsed["dry_run"] is True def test_delete_dry_run_no_deletion(tmp_path: pathlib.Path) -> None: """--dry-run delete must not remove the release record.""" root, repo_id = _init_repo(tmp_path) _write_release(root, repo_id, "v1.0.0", is_draft=False) result = _invoke(["release", "delete", "v1.0.0", "--dry-run"], root) assert result.exit_code == 0 # Release must still exist. assert get_release_for_tag(root, repo_id, "v1.0.0") is not None def test_delete_dry_run_text_output(tmp_path: pathlib.Path) -> None: root, repo_id = _init_repo(tmp_path) _write_release(root, repo_id, "v1.0.0-alpha.1", is_draft=True) result = _invoke(["release", "delete", "v1.0.0-alpha.1", "--dry-run"], root) assert result.exit_code == 0 assert "v1.0.0-alpha.1" in result.output assert "dry-run" in result.output.lower() or "would delete" in result.output.lower() def test_delete_draft_dry_run_schema(tmp_path: pathlib.Path) -> None: root, repo_id = _init_repo(tmp_path) _write_release(root, repo_id, "v1.0.0-beta.1", is_draft=True) result = _invoke( ["release", "delete", "v1.0.0-beta.1", "--dry-run", "--json"], root ) assert result.exit_code == 0 parsed = _parse_delete(result.output) assert parsed["was_draft"] is True # --------------------------------------------------------------------------- # JSON schema: delete --yes --json # --------------------------------------------------------------------------- def test_delete_yes_json_schema(tmp_path: pathlib.Path) -> None: root, repo_id = _init_repo(tmp_path) _write_release(root, repo_id, "v1.0.0", is_draft=False) result = _invoke(["release", "delete", "v1.0.0", "--yes", "--json"], root) assert result.exit_code == 0, result.output parsed = _parse_delete(result.output) assert parsed["status"] == "deleted" assert parsed["tag"] == "v1.0.0" assert parsed["was_draft"] is False assert parsed["remote_retracted"] is False assert parsed["dry_run"] is False def test_delete_draft_yes_json_schema(tmp_path: pathlib.Path) -> None: root, repo_id = _init_repo(tmp_path) _write_release(root, repo_id, "v1.0.0-alpha.1", is_draft=True) result = _invoke( ["release", "delete", "v1.0.0-alpha.1", "--yes", "--json"], root ) assert result.exit_code == 0, result.output parsed = _parse_delete(result.output) assert parsed["status"] == "deleted" assert parsed["was_draft"] is True # --------------------------------------------------------------------------- # --commit alias for --ref on add # --------------------------------------------------------------------------- def test_add_commit_alias_for_ref(tmp_path: pathlib.Path) -> None: root, repo_id = _init_repo(tmp_path) commit_id = _make_commit(root, repo_id, message="chore: setup") result = _invoke( ["release", "add", "v1.0.0", "--commit", commit_id], root ) assert result.exit_code == 0, result.output # --------------------------------------------------------------------------- # Integration: full lifecycle # --------------------------------------------------------------------------- def test_full_lifecycle_add_show_list_delete(tmp_path: pathlib.Path) -> None: root, repo_id = _init_repo(tmp_path) _make_commit(root, repo_id, message="feat: init") # Add add_result = _invoke( ["release", "add", "v1.0.0", "--title", "First release", "--json"], root ) assert add_result.exit_code == 0, add_result.output add_data = json.loads(add_result.output) assert add_data["tag"] == "v1.0.0" # Show show_result = _invoke(["release", "read", "v1.0.0", "--json"], root) assert show_result.exit_code == 0 show_data = _parse_show(show_result.output) assert show_data["tag"] == "v1.0.0" # List list_result = _invoke(["release", "list", "--json"], root) assert list_result.exit_code == 0 list_data = json.loads(list_result.output)["releases"] assert any(r["tag"] == "v1.0.0" for r in list_data) # Delete del_result = _invoke(["release", "delete", "v1.0.0", "--yes", "--json"], root) assert del_result.exit_code == 0 del_data = _parse_delete(del_result.output) assert del_data["status"] == "deleted" # Confirm gone list_after = _invoke(["release", "list", "--json"], root) assert list_after.exit_code == 0 assert json.loads(list_after.output)["releases"] == [] def test_lifecycle_draft_to_promoted(tmp_path: pathlib.Path) -> None: """Create a draft, verify it's excluded from list by default, then delete it.""" root, repo_id = _init_repo(tmp_path) _make_commit(root, repo_id) _invoke(["release", "add", "v1.0.0-rc.1", "--draft"], root) # Not in default list (no --include-drafts). no_draft = _invoke(["release", "list", "--json"], root) data = json.loads(no_draft.output)["releases"] assert all(r["tag"] != "v1.0.0-rc.1" for r in data) # Visible with --include-drafts. with_drafts = _invoke(["release", "list", "--include-drafts", "--json"], root) data2 = json.loads(with_drafts.output)["releases"] assert any(r["tag"] == "v1.0.0-rc.1" for r in data2) # Delete draft. del_result = _invoke(["release", "delete", "v1.0.0-rc.1", "--yes"], root) assert del_result.exit_code == 0 def test_channel_filter_integration(tmp_path: pathlib.Path) -> None: root, repo_id = _init_repo(tmp_path) _write_release(root, repo_id, "v1.0.0") _write_release(root, repo_id, "v1.1.0-beta.1") stable = _invoke(["release", "list", "--channel", "stable", "--json"], root) assert stable.exit_code == 0 stable_data = json.loads(stable.output)["releases"] assert all(r["channel"] == "stable" for r in stable_data) assert any(r["tag"] == "v1.0.0" for r in stable_data) beta = _invoke(["release", "list", "--channel", "beta", "--json"], root) assert beta.exit_code == 0 beta_data = json.loads(beta.output)["releases"] assert all(r["channel"] == "beta" for r in beta_data) # --------------------------------------------------------------------------- # E2E: help output # --------------------------------------------------------------------------- def test_release_help() -> None: result = runner.invoke(None, ["release", "--help"]) assert result.exit_code == 0 def test_add_help() -> None: result = runner.invoke(None, ["release", "add", "--help"]) assert result.exit_code == 0 assert "--json" in result.output assert "--draft" in result.output assert "--channel" in result.output def test_push_help() -> None: result = runner.invoke(None, ["release", "push", "--help"]) assert result.exit_code == 0 assert "--json" in result.output assert "--dry-run" in result.output def test_delete_help() -> None: result = runner.invoke(None, ["release", "delete", "--help"]) assert result.exit_code == 0 assert "--json" in result.output assert "--dry-run" in result.output assert "--yes" in result.output # --------------------------------------------------------------------------- # E2E: text output correctness # --------------------------------------------------------------------------- def test_add_text_output(tmp_path: pathlib.Path) -> None: root, repo_id = _init_repo(tmp_path) _make_commit(root, repo_id) result = _invoke(["release", "add", "v1.2.3", "--title", "Summer drop"], root) assert result.exit_code == 0 assert "v1.2.3" in result.output def test_delete_text_output(tmp_path: pathlib.Path) -> None: root, repo_id = _init_repo(tmp_path) _write_release(root, repo_id, "v1.0.0") result = _invoke(["release", "delete", "v1.0.0", "--yes"], root) assert result.exit_code == 0 assert "deleted" in result.output.lower() def test_push_dry_run_text_mentions_tag(tmp_path: pathlib.Path) -> None: root, repo_id = _init_repo(tmp_path) _write_release(root, repo_id, "v1.0.0") result = _invoke(["release", "push", "v1.0.0", "--remote", "origin", "--dry-run"], root) assert result.exit_code == 0 assert "v1.0.0" in result.output # --------------------------------------------------------------------------- # Stress: 50 releases, list all, concurrent reads # --------------------------------------------------------------------------- def test_stress_50_releases_list(tmp_path: pathlib.Path) -> None: root, repo_id = _init_repo(tmp_path) for i in range(50): _write_release(root, repo_id, f"v1.{i}.0") releases = list_releases(root, repo_id) assert len(releases) == 50 def test_stress_list_json_50(tmp_path: pathlib.Path) -> None: root, repo_id = _init_repo(tmp_path) for i in range(50): _write_release(root, repo_id, f"v2.{i}.0") result = _invoke(["release", "list", "--json"], root) assert result.exit_code == 0 data = json.loads(result.output) assert len(data["releases"]) == 50 def test_stress_concurrent_list_reads(tmp_path: pathlib.Path) -> None: """Concurrent list_releases calls on the same repo must not crash.""" root, repo_id = _init_repo(tmp_path) for i in range(20): _write_release(root, repo_id, f"v3.{i}.0") errors: list[str] = [] def _read() -> None: try: releases = list_releases(root, repo_id) assert len(releases) == 20 except Exception as exc: # noqa: BLE001 errors.append(str(exc)) threads = [threading.Thread(target=_read) for _ in range(10)] for t in threads: t.start() for t in threads: t.join() assert not errors, f"Concurrent failures: {errors}" def test_stress_add_delete_cycle(tmp_path: pathlib.Path) -> None: """Add and delete 20 releases in sequence; list must be empty at end.""" root, repo_id = _init_repo(tmp_path) _make_commit(root, repo_id) for i in range(20): tag = f"v4.{i}.0" add = _invoke(["release", "add", tag, "--json"], root) assert add.exit_code == 0, add.output rel_id = json.loads(add.output)["release_id"] deleted = delete_release(root, repo_id, rel_id) assert deleted remaining = list_releases(root, repo_id) assert remaining == [] # =========================================================================== # TestReleaseAddExtended — 18 tests # =========================================================================== class TestReleaseAddExtended: def test_exit_code_zero_on_success(self, tmp_path: pathlib.Path) -> None: root, repo_id = _init_repo(tmp_path) _make_commit(root, repo_id) result = _invoke(["release", "add", "v1.0.0"], root) assert result.exit_code == 0 def test_outside_repo_exits_2(self, tmp_path: pathlib.Path) -> None: result = _invoke(["release", "add", "v1.0.0"], tmp_path) assert result.exit_code == 2 def test_invalid_semver_exits_1(self, tmp_path: pathlib.Path) -> None: root, repo_id = _init_repo(tmp_path) _make_commit(root, repo_id) result = _invoke(["release", "add", "not-semver"], root) assert result.exit_code == 1 def test_duplicate_tag_exits_1(self, tmp_path: pathlib.Path) -> None: root, repo_id = _init_repo(tmp_path) _make_commit(root, repo_id) _invoke(["release", "add", "v1.0.0"], root) result = _invoke(["release", "add", "v1.0.0"], root) assert result.exit_code == 1 def test_j_alias(self, tmp_path: pathlib.Path) -> None: """-j must produce identical JSON output to --json.""" root, repo_id = _init_repo(tmp_path) _make_commit(root, repo_id) r1 = _invoke(["release", "add", "v1.0.0", "--json"], root) assert r1.exit_code == 0 root2 = tmp_path / "r2" root2.mkdir() root2b, repo_id2 = _init_repo(root2) _make_commit(root2b, repo_id2) r2 = _invoke(["release", "add", "v1.0.0", "-j"], root2b) assert r2.exit_code == 0 d1, d2 = json.loads(r1.output), json.loads(r2.output) assert d1.keys() == d2.keys() def test_json_compact_no_indent(self, tmp_path: pathlib.Path) -> None: root, repo_id = _init_repo(tmp_path) _make_commit(root, repo_id) result = _invoke(["release", "add", "v1.0.0", "--json"], root) assert result.exit_code == 0 assert "\n" not in result.output.strip() def test_json_schema_all_key_fields(self, tmp_path: pathlib.Path) -> None: root, repo_id = _init_repo(tmp_path) _make_commit(root, repo_id) result = _invoke(["release", "add", "v1.0.0", "--json"], root) assert result.exit_code == 0 data = json.loads(result.output) for field in ("tag", "channel", "commit_id", "snapshot_id", "release_id", "is_draft", "changelog"): assert field in data, f"Missing field: {field}" def test_channel_inferred_stable_no_pre(self, tmp_path: pathlib.Path) -> None: root, repo_id = _init_repo(tmp_path) _make_commit(root, repo_id) data = json.loads(_invoke(["release", "add", "v1.0.0", "--json"], root).output) assert data["channel"] == "stable" def test_channel_inferred_beta(self, tmp_path: pathlib.Path) -> None: root, repo_id = _init_repo(tmp_path) _make_commit(root, repo_id) data = json.loads(_invoke(["release", "add", "v1.0.0-beta.1", "--json"], root).output) assert data["channel"] == "beta" def test_channel_inferred_alpha(self, tmp_path: pathlib.Path) -> None: root, repo_id = _init_repo(tmp_path) _make_commit(root, repo_id) data = json.loads(_invoke(["release", "add", "v1.0.0-alpha.1", "--json"], root).output) assert data["channel"] == "alpha" def test_channel_override(self, tmp_path: pathlib.Path) -> None: """Explicit --channel overrides semver inference.""" root, repo_id = _init_repo(tmp_path) _make_commit(root, repo_id) data = json.loads( _invoke(["release", "add", "v1.0.0", "--channel", "beta", "--json"], root).output ) assert data["channel"] == "beta" def test_unknown_channel_exits_1(self, tmp_path: pathlib.Path) -> None: root, repo_id = _init_repo(tmp_path) _make_commit(root, repo_id) result = _invoke(["release", "add", "v1.0.0", "--channel", "canary"], root) assert result.exit_code != 0 def test_draft_flag_in_json(self, tmp_path: pathlib.Path) -> None: root, repo_id = _init_repo(tmp_path) _make_commit(root, repo_id) data = json.loads( _invoke(["release", "add", "v1.0.0-alpha.1", "--draft", "--json"], root).output ) assert data["is_draft"] is True def test_no_draft_by_default(self, tmp_path: pathlib.Path) -> None: root, repo_id = _init_repo(tmp_path) _make_commit(root, repo_id) data = json.loads(_invoke(["release", "add", "v1.0.0", "--json"], root).output) assert data["is_draft"] is False def test_changelog_list_in_json(self, tmp_path: pathlib.Path) -> None: root, repo_id = _init_repo(tmp_path) _make_commit(root, repo_id, message="feat: one", sem_ver_bump="minor") _make_commit(root, repo_id, message="fix: two", sem_ver_bump="patch") data = json.loads(_invoke(["release", "add", "v1.0.0", "--json"], root).output) assert isinstance(data["changelog"], list) assert len(data["changelog"]) == 2 def test_ref_not_found_exits_1(self, tmp_path: pathlib.Path) -> None: root, repo_id = _init_repo(tmp_path) _make_commit(root, repo_id) result = _invoke(["release", "add", "v1.0.0", "--ref", "nonexistent"], root) assert result.exit_code == 1 def test_help_mentions_agent_quickstart(self) -> None: result = runner.invoke(None, ["release", "add", "--help"]) assert "Agent quickstart" in result.output def test_help_mentions_exit_codes(self) -> None: result = runner.invoke(None, ["release", "add", "--help"]) assert "Exit codes" in result.output # =========================================================================== # TestReleaseAddSecurity — 6 tests # =========================================================================== class TestReleaseAddSecurity: def test_ansi_in_title_stripped_text(self, tmp_path: pathlib.Path) -> None: root, repo_id = _init_repo(tmp_path) _make_commit(root, repo_id) _invoke(["release", "add", "v1.0.0", "--title", "\x1b[1mBold\x1b[0m"], root) show = _invoke(["release", "read", "v1.0.0"], root) assert "\x1b" not in show.output def test_ansi_in_body_stripped_text(self, tmp_path: pathlib.Path) -> None: root, repo_id = _init_repo(tmp_path) _make_commit(root, repo_id) _invoke(["release", "add", "v1.0.0", "--body", "\x1b[31mDanger\x1b[0m"], root) show = _invoke(["release", "read", "v1.0.0"], root) assert "\x1b" not in show.output def test_control_char_in_title_stripped_text(self, tmp_path: pathlib.Path) -> None: root, repo_id = _init_repo(tmp_path) _make_commit(root, repo_id) _invoke(["release", "add", "v1.0.0", "--title", "Evil\x07Bell"], root) show = _invoke(["release", "read", "v1.0.0"], root) assert "\x07" not in show.output def test_no_json_outside_repo(self, tmp_path: pathlib.Path) -> None: result = _invoke(["release", "add", "v1.0.0", "--json"], tmp_path) assert result.exit_code == 2 assert not result.output.strip().startswith("{") def test_no_traceback_invalid_semver(self, tmp_path: pathlib.Path) -> None: root, repo_id = _init_repo(tmp_path) _make_commit(root, repo_id) result = _invoke(["release", "add", "not-valid"], root) assert "Traceback" not in result.output def test_no_traceback_outside_repo(self, tmp_path: pathlib.Path) -> None: result = _invoke(["release", "add", "v1.0.0"], tmp_path) assert result.exit_code == 2 assert "Traceback" not in result.output # =========================================================================== # TestReleaseAddStress — 3 tests # =========================================================================== class TestReleaseAddStress: def test_20_sequential_patch_releases(self, tmp_path: pathlib.Path) -> None: """20 sequentially added patch releases all succeed.""" root, repo_id = _init_repo(tmp_path) _make_commit(root, repo_id) for i in range(20): result = _invoke(["release", "add", f"v1.0.{i}", "--json"], root) assert result.exit_code == 0, f"v1.0.{i} failed: {result.output}" assert len(list_releases(root, repo_id)) == 20 def test_20_releases_across_channels(self, tmp_path: pathlib.Path) -> None: """Releases spanning all four channels are created correctly.""" root, repo_id = _init_repo(tmp_path) _make_commit(root, repo_id) tags = ( [f"v1.{i}.0" for i in range(5)] + [f"v2.{i}.0-beta.1" for i in range(5)] + [f"v3.{i}.0-alpha.1" for i in range(5)] + [f"v4.{i}.0-nightly.1" for i in range(5)] ) for tag in tags: r = _invoke(["release", "add", tag, "--json"], root) assert r.exit_code == 0, f"{tag}: {r.output}" releases = list_releases(root, repo_id, include_drafts=True) assert len(releases) == 20 def test_changelog_grows_with_commits(self, tmp_path: pathlib.Path) -> None: """Changelog for each successive release only includes commits since prior.""" root, repo_id = _init_repo(tmp_path) for i in range(5): _make_commit(root, repo_id, message=f"feat: step {i}", sem_ver_bump="minor") d1 = json.loads(_invoke(["release", "add", "v1.0.0", "--json"], root).output) assert len(d1["changelog"]) == 5 for i in range(3): _make_commit(root, repo_id, message=f"fix: patch {i}", sem_ver_bump="patch") d2 = json.loads(_invoke(["release", "add", "v1.0.1", "--json"], root).output) assert len(d2["changelog"]) == 3 # =========================================================================== # TestReleaseListExtended — 18 tests # =========================================================================== class TestReleaseListExtended: def test_exit_code_zero_empty(self, tmp_path: pathlib.Path) -> None: root, _ = _init_repo(tmp_path) assert _invoke(["release", "list"], root).exit_code == 0 def test_exit_code_zero_with_releases(self, tmp_path: pathlib.Path) -> None: root, repo_id = _init_repo(tmp_path) _write_release(root, repo_id, "v1.0.0") assert _invoke(["release", "list"], root).exit_code == 0 def test_outside_repo_exits_2(self, tmp_path: pathlib.Path) -> None: assert _invoke(["release", "list"], tmp_path).exit_code == 2 def test_j_alias(self, tmp_path: pathlib.Path) -> None: """-j must produce the same schema as --json (duration_ms varies between runs).""" root, repo_id = _init_repo(tmp_path) _write_release(root, repo_id, "v1.0.0") r1 = _invoke(["release", "list", "--json"], root) r2 = _invoke(["release", "list", "-j"], root) assert r1.exit_code == 0 and r2.exit_code == 0 d1 = {k: v for k, v in json.loads(r1.output).items() if k not in {"duration_ms", "timestamp"}} d2 = {k: v for k, v in json.loads(r2.output).items() if k not in {"duration_ms", "timestamp"}} assert d1 == d2 def test_json_compact_no_indent(self, tmp_path: pathlib.Path) -> None: root, repo_id = _init_repo(tmp_path) _write_release(root, repo_id, "v1.0.0") result = _invoke(["release", "list", "--json"], root) assert result.exit_code == 0 assert "\n" not in result.output.strip() def test_json_empty_is_array(self, tmp_path: pathlib.Path) -> None: root, _ = _init_repo(tmp_path) data = json.loads(_invoke(["release", "list", "--json"], root).output) assert data["releases"] == [] def test_json_contains_all_key_fields(self, tmp_path: pathlib.Path) -> None: root, repo_id = _init_repo(tmp_path) _write_release(root, repo_id, "v1.0.0") data = json.loads(_invoke(["release", "list", "--json"], root).output) releases = data["releases"] assert len(releases) == 1 rec = releases[0] for field in ("tag", "channel", "commit_id", "snapshot_id", "release_id", "is_draft"): assert field in rec, f"Missing field: {field}" def test_drafts_excluded_by_default(self, tmp_path: pathlib.Path) -> None: root, repo_id = _init_repo(tmp_path) _write_release(root, repo_id, "v1.0.0", is_draft=True) data = json.loads(_invoke(["release", "list", "--json"], root).output) assert data["releases"] == [] def test_drafts_included_with_flag(self, tmp_path: pathlib.Path) -> None: root, repo_id = _init_repo(tmp_path) _write_release(root, repo_id, "v1.0.0", is_draft=True) data = json.loads( _invoke(["release", "list", "--include-drafts", "--json"], root).output ) releases = data["releases"] assert len(releases) == 1 assert releases[0]["is_draft"] is True def test_channel_filter_stable(self, tmp_path: pathlib.Path) -> None: root, repo_id = _init_repo(tmp_path) _write_release(root, repo_id, "v1.0.0") _write_release(root, repo_id, "v1.1.0-beta.1") data = json.loads( _invoke(["release", "list", "--channel", "stable", "--json"], root).output ) releases = data["releases"] assert len(releases) == 1 assert releases[0]["channel"] == "stable" def test_channel_filter_beta(self, tmp_path: pathlib.Path) -> None: root, repo_id = _init_repo(tmp_path) _write_release(root, repo_id, "v1.0.0") _write_release(root, repo_id, "v1.1.0-beta.1") data = json.loads( _invoke(["release", "list", "--channel", "beta", "--json"], root).output ) releases = data["releases"] assert len(releases) == 1 assert releases[0]["channel"] == "beta" def test_channel_filter_empty_returns_all(self, tmp_path: pathlib.Path) -> None: """No --channel flag returns all channels.""" root, repo_id = _init_repo(tmp_path) _write_release(root, repo_id, "v1.0.0") _write_release(root, repo_id, "v1.1.0-beta.1") data = json.loads(_invoke(["release", "list", "--json"], root).output) assert len(data["releases"]) == 2 def test_text_shows_tag_and_channel(self, tmp_path: pathlib.Path) -> None: root, repo_id = _init_repo(tmp_path) _write_release(root, repo_id, "v2.0.0") result = _invoke(["release", "list"], root) assert result.exit_code == 0 assert "v2.0.0" in result.output assert "stable" in result.output def test_text_empty_message(self, tmp_path: pathlib.Path) -> None: root, _ = _init_repo(tmp_path) result = _invoke(["release", "list"], root) assert "No releases" in result.output def test_remote_not_configured_exits_1(self, tmp_path: pathlib.Path) -> None: root, _ = _init_repo(tmp_path) result = _invoke(["release", "list", "--remote", "nosuchremote"], root) assert result.exit_code == 1 def test_multiple_releases_all_returned(self, tmp_path: pathlib.Path) -> None: root, repo_id = _init_repo(tmp_path) for i in range(5): _write_release(root, repo_id, f"v1.{i}.0") data = json.loads(_invoke(["release", "list", "--json"], root).output) assert len(data["releases"]) == 5 def test_help_mentions_agent_quickstart(self) -> None: result = runner.invoke(None, ["release", "list", "--help"]) assert "Agent quickstart" in result.output def test_help_mentions_exit_codes(self) -> None: result = runner.invoke(None, ["release", "list", "--help"]) assert "Exit codes" in result.output # =========================================================================== # TestReleaseListSecurity — 6 tests # =========================================================================== class TestReleaseListSecurity: def test_ansi_in_title_stripped_text(self, tmp_path: pathlib.Path) -> None: root, repo_id = _init_repo(tmp_path) from muse.core.semver import ReleaseChannel sv = SemVerTag(major=1, minor=0, patch=0, pre="", build="") rec = ReleaseRecord( repo_id=repo_id, release_id=fake_id("release"), tag="v1.0.0", semver=sv, channel="stable", commit_id="a" * 64, snapshot_id="b" * 64, title="\x1b[31mEvil\x1b[0m", body="", changelog=[], ) write_release(root, rec) result = _invoke(["release", "list"], root) assert result.exit_code == 0 assert "\x1b" not in result.output def test_control_char_in_title_stripped_text(self, tmp_path: pathlib.Path) -> None: root, repo_id = _init_repo(tmp_path) sv = SemVerTag(major=1, minor=0, patch=0, pre="", build="") rec = ReleaseRecord( repo_id=repo_id, release_id=fake_id("release"), tag="v1.0.0", semver=sv, channel="stable", commit_id="a" * 64, snapshot_id="b" * 64, title="Evil\x07Bell", body="", changelog=[], ) write_release(root, rec) result = _invoke(["release", "list"], root) assert "\x07" not in result.output def test_no_json_outside_repo(self, tmp_path: pathlib.Path) -> None: result = _invoke(["release", "list", "--json"], tmp_path) assert result.exit_code == 2 assert not result.output.strip().startswith("[") def test_no_traceback_outside_repo(self, tmp_path: pathlib.Path) -> None: result = _invoke(["release", "list"], tmp_path) assert result.exit_code == 2 assert "Traceback" not in result.output def test_no_traceback_unknown_remote(self, tmp_path: pathlib.Path) -> None: root, _ = _init_repo(tmp_path) result = _invoke(["release", "list", "--remote", "badremote"], root) assert "Traceback" not in result.output def test_json_output_on_stdout(self, tmp_path: pathlib.Path) -> None: """JSON object goes to stdout on success.""" root, repo_id = _init_repo(tmp_path) _write_release(root, repo_id, "v1.0.0") result = _invoke(["release", "list", "--json"], root) assert result.output.strip().startswith("{") # =========================================================================== # TestReleaseListStress — 3 tests # =========================================================================== class TestReleaseListStress: def test_100_releases_json(self, tmp_path: pathlib.Path) -> None: """100 releases returned correctly in JSON mode.""" root, repo_id = _init_repo(tmp_path) for i in range(100): _write_release(root, repo_id, f"v1.{i}.0") data = json.loads(_invoke(["release", "list", "--json"], root).output) assert len(data["releases"]) == 100 def test_100_releases_text(self, tmp_path: pathlib.Path) -> None: """100 releases listed in text mode without error.""" root, repo_id = _init_repo(tmp_path) for i in range(100): _write_release(root, repo_id, f"v2.{i}.0") result = _invoke(["release", "list"], root) assert result.exit_code == 0 assert "v2.0.0" in result.output def test_channel_filter_25_each(self, tmp_path: pathlib.Path) -> None: """25 releases per channel — filter returns exactly 25 each.""" import hashlib as _hl root, repo_id = _init_repo(tmp_path) channels = [("stable", "v1.{}.0"), ("beta", "v2.{}.0-beta.1"), ("alpha", "v3.{}.0-alpha.1"), ("nightly", "v4.{}.0-nightly.1")] for _ch, tmpl in channels: for i in range(25): _write_release(root, repo_id, tmpl.format(i)) for ch, _ in channels: data = json.loads( _invoke(["release", "list", "--channel", ch, "--json"], root).output ) assert len(data["releases"]) == 25, f"Expected 25 for channel {ch}, got {len(data['releases'])}" # =========================================================================== # TestReleaseShowExtended — 18 tests # =========================================================================== class TestReleaseShowExtended: def test_exit_code_zero(self, tmp_path: pathlib.Path) -> None: root, repo_id = _init_repo(tmp_path) _write_release(root, repo_id, "v1.0.0") assert _invoke(["release", "read", "v1.0.0"], root).exit_code == 0 def test_not_found_exits_4(self, tmp_path: pathlib.Path) -> None: root, _ = _init_repo(tmp_path) assert _invoke(["release", "read", "v99.0.0"], root).exit_code == 4 def test_outside_repo_exits_2(self, tmp_path: pathlib.Path) -> None: assert _invoke(["release", "read", "v1.0.0"], tmp_path).exit_code == 2 def test_j_alias(self, tmp_path: pathlib.Path) -> None: root, repo_id = _init_repo(tmp_path) _write_release(root, repo_id, "v1.0.0") r1 = _invoke(["release", "read", "v1.0.0", "--json"], root) r2 = _invoke(["release", "read", "v1.0.0", "-j"], root) assert r1.exit_code == 0 and r2.exit_code == 0 d1 = {k: v for k, v in json.loads(r1.output).items() if k not in {"duration_ms", "timestamp"}} d2 = {k: v for k, v in json.loads(r2.output).items() if k not in {"duration_ms", "timestamp"}} assert d1 == d2 def test_json_compact_no_indent(self, tmp_path: pathlib.Path) -> None: root, repo_id = _init_repo(tmp_path) _write_release(root, repo_id, "v1.0.0") result = _invoke(["release", "read", "v1.0.0", "--json"], root) assert result.exit_code == 0 assert "\n" not in result.output.strip() def test_json_is_object_not_array(self, tmp_path: pathlib.Path) -> None: root, repo_id = _init_repo(tmp_path) _write_release(root, repo_id, "v1.0.0") result = _invoke(["release", "read", "v1.0.0", "--json"], root) assert result.output.strip().startswith("{") def test_json_all_key_fields(self, tmp_path: pathlib.Path) -> None: root, repo_id = _init_repo(tmp_path) _write_release(root, repo_id, "v1.0.0") data = json.loads(_invoke(["release", "read", "v1.0.0", "--json"], root).output) for field in ("tag", "channel", "commit_id", "snapshot_id", "release_id", "is_draft", "changelog", "semver", "title", "body", "created_at"): assert field in data, f"Missing field: {field}" def test_text_shows_tag(self, tmp_path: pathlib.Path) -> None: root, repo_id = _init_repo(tmp_path) _write_release(root, repo_id, "v2.3.4") assert "v2.3.4" in _invoke(["release", "read", "v2.3.4"], root).output def test_text_shows_channel(self, tmp_path: pathlib.Path) -> None: root, repo_id = _init_repo(tmp_path) _write_release(root, repo_id, "v1.0.0") assert "stable" in _invoke(["release", "read", "v1.0.0"], root).output def test_text_shows_commit(self, tmp_path: pathlib.Path) -> None: root, repo_id = _init_repo(tmp_path) _write_release(root, repo_id, "v1.0.0") result = _invoke(["release", "read", "v1.0.0"], root) assert "Commit" in result.output def test_text_shows_created_at(self, tmp_path: pathlib.Path) -> None: root, repo_id = _init_repo(tmp_path) _write_release(root, repo_id, "v1.0.0") assert "Created" in _invoke(["release", "read", "v1.0.0"], root).output def test_text_shows_title_when_set(self, tmp_path: pathlib.Path) -> None: root, repo_id = _init_repo(tmp_path) _make_commit(root, repo_id) _invoke(["release", "add", "v1.0.0", "--title", "Summer Drop"], root) assert "Summer Drop" in _invoke(["release", "read", "v1.0.0"], root).output def test_text_draft_label(self, tmp_path: pathlib.Path) -> None: root, repo_id = _init_repo(tmp_path) _write_release(root, repo_id, "v1.0.0", is_draft=True) assert "[DRAFT]" in _invoke(["release", "read", "v1.0.0"], root).output def test_text_no_draft_label_for_non_draft(self, tmp_path: pathlib.Path) -> None: root, repo_id = _init_repo(tmp_path) _write_release(root, repo_id, "v1.0.0", is_draft=False) assert "[DRAFT]" not in _invoke(["release", "read", "v1.0.0"], root).output def test_text_changelog_shows_commit_count(self, tmp_path: pathlib.Path) -> None: root, repo_id = _init_repo(tmp_path) _make_commit(root, repo_id, message="feat: a", sem_ver_bump="minor") _make_commit(root, repo_id, message="fix: b", sem_ver_bump="patch") _invoke(["release", "add", "v1.0.0"], root) result = _invoke(["release", "read", "v1.0.0"], root) assert "2 commits" in result.output def test_changelog_truncated_at_20_text(self, tmp_path: pathlib.Path) -> None: """Changelogs > 20 entries show a '… and N more' footer.""" root, repo_id = _init_repo(tmp_path) for i in range(25): _make_commit(root, repo_id, message=f"feat: step {i}", sem_ver_bump="minor") _invoke(["release", "add", "v1.0.0"], root) result = _invoke(["release", "read", "v1.0.0"], root) assert "more" in result.output def test_help_mentions_agent_quickstart(self) -> None: assert "Agent quickstart" in runner.invoke(None, ["release", "read", "--help"]).output def test_help_mentions_exit_codes(self) -> None: assert "Exit codes" in runner.invoke(None, ["release", "read", "--help"]).output # =========================================================================== # TestReleaseShowSecurity — 6 tests # =========================================================================== class TestReleaseShowSecurity: def _write_crafted( self, root: pathlib.Path, repo_id: str, tag: str = "v1.0.0", title: str = "", body: str = "", channel: str = "stable", ) -> None: sv = SemVerTag(major=1, minor=0, patch=0, pre="", build="") rec = ReleaseRecord( repo_id=repo_id, release_id=fake_id("release"), tag=tag, semver=sv, channel=channel, commit_id="a" * 64, snapshot_id="b" * 64, title=title, body=body, changelog=[], ) write_release(root, rec) def test_ansi_in_title_stripped(self, tmp_path: pathlib.Path) -> None: root, repo_id = _init_repo(tmp_path) self._write_crafted(root, repo_id, title="\x1b[31mEvil\x1b[0m") assert "\x1b" not in _invoke(["release", "read", "v1.0.0"], root).output def test_ansi_in_body_stripped(self, tmp_path: pathlib.Path) -> None: root, repo_id = _init_repo(tmp_path) self._write_crafted(root, repo_id, body="\x1b[32mInjected\x1b[0m") assert "\x1b" not in _invoke(["release", "read", "v1.0.0"], root).output def test_control_char_in_title_stripped(self, tmp_path: pathlib.Path) -> None: root, repo_id = _init_repo(tmp_path) self._write_crafted(root, repo_id, title="Evil\x07Bell") assert "\x07" not in _invoke(["release", "read", "v1.0.0"], root).output def test_no_json_outside_repo(self, tmp_path: pathlib.Path) -> None: result = _invoke(["release", "read", "v1.0.0", "--json"], tmp_path) assert result.exit_code == 2 assert not result.output.strip().startswith("{") def test_no_traceback_not_found(self, tmp_path: pathlib.Path) -> None: root, _ = _init_repo(tmp_path) result = _invoke(["release", "read", "v99.0.0"], root) assert "Traceback" not in result.output def test_no_traceback_outside_repo(self, tmp_path: pathlib.Path) -> None: result = _invoke(["release", "read", "v1.0.0"], tmp_path) assert "Traceback" not in result.output # =========================================================================== # TestReleaseShowStress — 3 tests # =========================================================================== class TestReleaseShowStress: def test_show_release_with_25_changelog_entries(self, tmp_path: pathlib.Path) -> None: """Show handles a 25-entry changelog — truncation footer appears.""" root, repo_id = _init_repo(tmp_path) for i in range(25): _make_commit(root, repo_id, message=f"feat: item {i}", sem_ver_bump="minor") _invoke(["release", "add", "v1.0.0"], root) result = _invoke(["release", "read", "v1.0.0"], root) assert result.exit_code == 0 assert "25 commits" in result.output assert "more" in result.output def test_show_20_distinct_releases(self, tmp_path: pathlib.Path) -> None: """show on each of 20 distinct releases all exit 0.""" root, repo_id = _init_repo(tmp_path) for i in range(20): _write_release(root, repo_id, f"v1.{i}.0") for i in range(20): r = _invoke(["release", "read", f"v1.{i}.0", "--json"], root) assert r.exit_code == 0, f"v1.{i}.0 failed: {r.output}" assert json.loads(r.output)["tag"] == f"v1.{i}.0" def test_concurrent_show_reads(self, tmp_path: pathlib.Path) -> None: """Concurrent get_release_for_tag calls on the same release must not crash.""" root, repo_id = _init_repo(tmp_path) _write_release(root, repo_id, "v1.0.0") errors: list[str] = [] def _do_read() -> None: try: rec = get_release_for_tag(root, repo_id, "v1.0.0") assert rec is not None assert rec.tag == "v1.0.0" except Exception as exc: # noqa: BLE001 errors.append(str(exc)) threads = [threading.Thread(target=_do_read) for _ in range(10)] for t in threads: t.start() for t in threads: t.join() assert not errors, f"Concurrent failures: {errors}" # --------------------------------------------------------------------------- # Extended / Security / Stress tests for ``muse release push`` # --------------------------------------------------------------------------- class TestReleasePushExtended: """Unit, integration, and edge-case tests for ``muse release push``.""" def test_push_help_contains_agent_quickstart(self) -> None: result = runner.invoke(None, ["release", "push", "--help"]) assert result.exit_code == 0 assert "quickstart" in result.output.lower() or "muse release push v" in result.output def test_push_help_contains_json_schema(self) -> None: result = runner.invoke(None, ["release", "push", "--help"]) assert result.exit_code == 0 assert "release_id" in result.output def test_push_help_contains_exit_codes(self) -> None: result = runner.invoke(None, ["release", "push", "--help"]) assert result.exit_code == 0 assert "exit code" in result.output.lower() or "0 —" in result.output def test_push_j_alias_dry_run(self, tmp_path: pathlib.Path) -> None: """-j is an alias for --json.""" root, repo_id = _init_repo(tmp_path) _write_release(root, repo_id, "v1.0.0") result = _invoke(["release", "push", "v1.0.0", "--remote", "origin", "--dry-run", "-j"], root) assert result.exit_code == 0 parsed = _parse_push(result.output) assert parsed["status"] == "dry_run" assert parsed["dry_run"] is True def test_push_dry_run_json_release_id_is_local(self, tmp_path: pathlib.Path) -> None: """dry-run JSON includes the local release_id (no network call).""" root, repo_id = _init_repo(tmp_path) rec = _write_release(root, repo_id, "v1.0.0") result = _invoke( ["release", "push", "v1.0.0", "--remote", "origin", "--dry-run", "--json"], root ) assert result.exit_code == 0 parsed = _parse_push(result.output) assert parsed["release_id"] == rec.release_id def test_push_dry_run_remote_default_is_origin(self, tmp_path: pathlib.Path) -> None: """--remote defaults to 'origin'.""" root, repo_id = _init_repo(tmp_path) _write_release(root, repo_id, "v1.0.0") result = _invoke(["release", "push", "v1.0.0", "--dry-run", "--json"], root) assert result.exit_code == 0 parsed = _parse_push(result.output) assert parsed["remote"] == "origin" def test_push_dry_run_custom_remote_in_json(self, tmp_path: pathlib.Path) -> None: """Custom --remote name is reflected in JSON output.""" root, repo_id = _init_repo(tmp_path) _write_release(root, repo_id, "v1.0.0") result = _invoke( ["release", "push", "v1.0.0", "--remote", "staging", "--dry-run", "--json"], root ) assert result.exit_code == 0 parsed = _parse_push(result.output) assert parsed["remote"] == "staging" def test_push_not_found_exits_4(self, tmp_path: pathlib.Path) -> None: """Missing local release exits with code 4.""" root, _ = _init_repo(tmp_path) result = _invoke(["release", "push", "v99.0.0", "--remote", "origin"], root) assert result.exit_code == 4 def test_push_not_found_error_to_stderr(self, tmp_path: pathlib.Path) -> None: """'not found' message goes to stderr, stdout is empty.""" root, _ = _init_repo(tmp_path) result = _invoke(["release", "push", "v99.0.0", "--remote", "origin"], root) assert result.exit_code != 0 assert "not found" in result.stderr.lower() def test_push_dry_run_no_transport_call(self, tmp_path: pathlib.Path) -> None: """--dry-run must not invoke transport.create_release.""" root, repo_id = _init_repo(tmp_path) _write_release(root, repo_id, "v1.0.0") with patch("muse.cli.commands.release.make_transport") as mock_transport: result = _invoke( ["release", "push", "v1.0.0", "--remote", "origin", "--dry-run"], root ) assert result.exit_code == 0 mock_transport.assert_not_called() def test_push_text_output_mentions_tag(self, tmp_path: pathlib.Path) -> None: """Text dry-run output contains the tag.""" root, repo_id = _init_repo(tmp_path) _write_release(root, repo_id, "v2.3.4") result = _invoke( ["release", "push", "v2.3.4", "--remote", "origin", "--dry-run"], root ) assert result.exit_code == 0 assert "v2.3.4" in result.output def test_push_text_output_mentions_remote(self, tmp_path: pathlib.Path) -> None: """Text dry-run output mentions the remote.""" root, repo_id = _init_repo(tmp_path) _write_release(root, repo_id, "v1.0.0") result = _invoke( ["release", "push", "v1.0.0", "--remote", "myremote", "--dry-run"], root ) assert result.exit_code == 0 assert "myremote" in result.output def test_push_remote_not_configured_exits_1(self, tmp_path: pathlib.Path) -> None: """Missing remote config exits with code 1.""" root, repo_id = _init_repo(tmp_path) _write_release(root, repo_id, "v1.0.0") result = _invoke(["release", "push", "v1.0.0", "--remote", "nonexistent"], root) assert result.exit_code == 1 def test_push_remote_error_exits_5(self, tmp_path: pathlib.Path) -> None: """TransportError from create_release exits with code 5.""" from muse.core.transport import TransportError root, repo_id = _init_repo(tmp_path) _write_release(root, repo_id, "v1.0.0") mock_t = MagicMock() mock_t.create_release.side_effect = TransportError("server error", 500) with patch("muse.cli.commands.release.make_transport", return_value=mock_t): with patch("muse.cli.commands.release.get_signing_identity", return_value="tok"): with patch("muse.cli.commands.release._resolve_remote_url", return_value="http://hub"): result = _invoke(["release", "push", "v1.0.0", "--remote", "origin"], root) assert result.exit_code == 5 def test_push_remote_error_message_to_stderr(self, tmp_path: pathlib.Path) -> None: """Transport error message appears in output.""" from muse.core.transport import TransportError root, repo_id = _init_repo(tmp_path) _write_release(root, repo_id, "v1.0.0") mock_t = MagicMock() mock_t.create_release.side_effect = TransportError("timeout reached", 0) with patch("muse.cli.commands.release.make_transport", return_value=mock_t): with patch("muse.cli.commands.release.get_signing_identity", return_value="tok"): with patch("muse.cli.commands.release._resolve_remote_url", return_value="http://hub"): result = _invoke(["release", "push", "v1.0.0", "--remote", "origin"], root) assert result.exit_code == 5 assert "push failed" in result.stderr.lower() def test_push_success_json_schema(self, tmp_path: pathlib.Path) -> None: """Successful push JSON has all required fields.""" remote_id = fake_id("remote-release") root, repo_id = _init_repo(tmp_path) _write_release(root, repo_id, "v1.0.0") mock_t = MagicMock() mock_t.create_release.return_value = remote_id with patch("muse.cli.commands.release.make_transport", return_value=mock_t): with patch("muse.cli.commands.release.get_signing_identity", return_value="tok"): with patch("muse.cli.commands.release._resolve_remote_url", return_value="http://hub"): result = _invoke( ["release", "push", "v1.0.0", "--remote", "origin", "--json"], root ) assert result.exit_code == 0 parsed = _parse_push(result.output) assert parsed["status"] == "pushed" assert parsed["tag"] == "v1.0.0" assert parsed["remote"] == "origin" assert parsed["release_id"] == remote_id assert parsed["dry_run"] is False def test_push_success_text_output(self, tmp_path: pathlib.Path) -> None: """Successful push text output mentions tag and remote.""" root, repo_id = _init_repo(tmp_path) _write_release(root, repo_id, "v1.2.3") mock_t = MagicMock() mock_t.create_release.return_value = fake_id("remote-release-123") with patch("muse.cli.commands.release.make_transport", return_value=mock_t): with patch("muse.cli.commands.release.get_signing_identity", return_value="tok"): with patch("muse.cli.commands.release._resolve_remote_url", return_value="http://hub"): result = _invoke(["release", "push", "v1.2.3", "--remote", "origin"], root) assert result.exit_code == 0 assert "v1.2.3" in result.output assert "origin" in result.output def test_push_dry_run_tag_in_json(self, tmp_path: pathlib.Path) -> None: """dry-run JSON tag field matches the requested tag.""" root, repo_id = _init_repo(tmp_path) _write_release(root, repo_id, "v3.1.4") result = _invoke( ["release", "push", "v3.1.4", "--remote", "origin", "--dry-run", "--json"], root ) assert result.exit_code == 0 assert json.loads(_json_blob(result.output))["tag"] == "v3.1.4" class TestReleasePushSecurity: """Security tests for ``muse release push``.""" def test_push_ansi_tag_stripped_in_dry_run_text(self, tmp_path: pathlib.Path) -> None: """ANSI escape in tag is stripped from dry-run text output.""" malicious_tag = "\x1b[31mv1.0.0\x1b[0m" root, repo_id = _init_repo(tmp_path) rec = ReleaseRecord( repo_id=repo_id, release_id=fake_id("release"), tag=malicious_tag, semver=SemVerTag(major=1, minor=0, patch=0, pre="", build=""), channel="stable", commit_id="a" * 64, snapshot_id="b" * 64, title="malicious", body="", changelog=[], is_draft=False, ) write_release(root, rec) result = _invoke( ["release", "push", malicious_tag, "--remote", "origin", "--dry-run"], root ) assert result.exit_code == 0 assert "\x1b[31m" not in result.output def test_push_ansi_remote_stripped_in_dry_run_text(self, tmp_path: pathlib.Path) -> None: """ANSI escape in remote name is stripped from dry-run text output.""" root, repo_id = _init_repo(tmp_path) _write_release(root, repo_id, "v1.0.0") # Remote with ANSI — will hit "remote not configured" path but still # sanitize_display must strip control chars from error message output. malicious_remote = "\x1b[32morigin\x1b[0m" result = _invoke( ["release", "push", "v1.0.0", "--remote", malicious_remote, "--dry-run"], root ) # dry-run skips remote lookup; the remote name appears in output assert result.exit_code == 0 assert "\x1b[32m" not in result.output def test_push_control_char_tag_stripped_in_text(self, tmp_path: pathlib.Path) -> None: """Control characters in tag are stripped from dry-run text output.""" malicious_tag = "v1.0.0\r\ninjected" root, repo_id = _init_repo(tmp_path) rec = ReleaseRecord( repo_id=repo_id, release_id=fake_id("release"), tag=malicious_tag, semver=SemVerTag(major=1, minor=0, patch=0, pre="", build=""), channel="stable", commit_id="a" * 64, snapshot_id="b" * 64, title="ctrl", body="", changelog=[], is_draft=False, ) write_release(root, rec) result = _invoke( ["release", "push", malicious_tag, "--remote", "origin", "--dry-run"], root ) assert result.exit_code == 0 assert "\r" not in result.output def test_push_ansi_tag_preserved_in_json(self, tmp_path: pathlib.Path) -> None: """ANSI in tag is NOT stripped from JSON output (raw data for agents).""" malicious_tag = "\x1b[31mv1.0.0\x1b[0m" root, repo_id = _init_repo(tmp_path) rec = ReleaseRecord( repo_id=repo_id, release_id=fake_id("release"), tag=malicious_tag, semver=SemVerTag(major=1, minor=0, patch=0, pre="", build=""), channel="stable", commit_id="a" * 64, snapshot_id="b" * 64, title="malicious", body="", changelog=[], is_draft=False, ) write_release(root, rec) result = _invoke( ["release", "push", malicious_tag, "--remote", "origin", "--dry-run", "--json"], root ) assert result.exit_code == 0 data = json.loads(_json_blob(result.output)) # JSON carries raw tag; sanitization only applies to human-readable text assert data["tag"] == malicious_tag def test_push_remote_error_ansi_stripped(self, tmp_path: pathlib.Path) -> None: """ANSI in TransportError message is stripped from error output.""" from muse.core.transport import TransportError root, repo_id = _init_repo(tmp_path) _write_release(root, repo_id, "v1.0.0") mock_t = MagicMock() mock_t.create_release.side_effect = TransportError("\x1b[31mfailed\x1b[0m", 503) with patch("muse.cli.commands.release.make_transport", return_value=mock_t): with patch("muse.cli.commands.release.get_signing_identity", return_value="tok"): with patch("muse.cli.commands.release._resolve_remote_url", return_value="http://hub"): result = _invoke(["release", "push", "v1.0.0", "--remote", "origin"], root) assert result.exit_code == 5 assert "\x1b[31m" not in result.output def test_push_not_found_message_sanitized(self, tmp_path: pathlib.Path) -> None: """ANSI in 'not found' tag path is stripped from error message.""" root, _ = _init_repo(tmp_path) malicious_tag = "\x1b[31mv99.0.0\x1b[0m" result = _invoke(["release", "push", malicious_tag, "--remote", "origin"], root) assert result.exit_code != 0 assert "\x1b[31m" not in result.output class TestReleasePushStress: """Stress tests for ``muse release push``.""" def test_push_dry_run_50_different_tags(self, tmp_path: pathlib.Path) -> None: """50 different tags each dry-run push successfully.""" root, repo_id = _init_repo(tmp_path) for i in range(50): _write_release(root, repo_id, f"v1.{i}.0") for i in range(50): r = _invoke( ["release", "push", f"v1.{i}.0", "--remote", "origin", "--dry-run", "--json"], root, ) assert r.exit_code == 0, f"v1.{i}.0 failed: {r.output}" assert json.loads(_json_blob(r.output))["status"] == "dry_run" def test_push_concurrent_dry_run_reads(self, tmp_path: pathlib.Path) -> None: """Concurrent get_release_for_tag calls (push lookup path) must not crash.""" root, repo_id = _init_repo(tmp_path) for i in range(20): _write_release(root, repo_id, f"v2.{i}.0") errors: list[str] = [] def _do_lookup(tag: str) -> None: try: rec = get_release_for_tag(root, repo_id, tag) assert rec is not None assert rec.tag == tag except Exception as exc: # noqa: BLE001 errors.append(str(exc)) threads = [threading.Thread(target=_do_lookup, args=(f"v2.{i}.0",)) for i in range(20)] for t in threads: t.start() for t in threads: t.join() assert not errors, f"Concurrent failures: {errors}" def test_push_dry_run_json_compact_no_indent(self, tmp_path: pathlib.Path) -> None: """JSON output is compact (no indentation), consistent with other commands.""" root, repo_id = _init_repo(tmp_path) _write_release(root, repo_id, "v1.0.0") result = _invoke( ["release", "push", "v1.0.0", "--remote", "origin", "--dry-run", "--json"], root ) assert result.exit_code == 0 raw = _json_blob(result.output) # Compact JSON has no leading spaces on keys assert "\n " not in raw # --------------------------------------------------------------------------- # Extended / Security / Stress tests for ``muse release delete`` # --------------------------------------------------------------------------- class TestReleaseDeleteExtended: """Unit, integration, and edge-case tests for ``muse release delete``.""" def test_delete_help_contains_agent_quickstart(self) -> None: result = runner.invoke(None, ["release", "delete", "--help"]) assert result.exit_code == 0 assert "quickstart" in result.output.lower() or "muse release delete v" in result.output def test_delete_help_contains_json_schema(self) -> None: result = runner.invoke(None, ["release", "delete", "--help"]) assert result.exit_code == 0 assert "was_draft" in result.output def test_delete_help_contains_exit_codes(self) -> None: result = runner.invoke(None, ["release", "delete", "--help"]) assert result.exit_code == 0 assert "exit code" in result.output.lower() or "0 —" in result.output def test_delete_j_alias_dry_run(self, tmp_path: pathlib.Path) -> None: """-j is an alias for --json.""" root, repo_id = _init_repo(tmp_path) _write_release(root, repo_id, "v1.0.0") result = _invoke(["release", "delete", "v1.0.0", "--dry-run", "-j"], root) assert result.exit_code == 0 parsed = _parse_delete(result.output) assert parsed["status"] == "dry_run" assert parsed["dry_run"] is True def test_delete_not_found_exits_4(self, tmp_path: pathlib.Path) -> None: """Missing local tag exits code 4.""" root, _ = _init_repo(tmp_path) result = _invoke(["release", "delete", "v99.0.0", "--yes"], root) assert result.exit_code == 4 def test_delete_yes_skips_confirmation(self, tmp_path: pathlib.Path) -> None: """--yes deletes without prompting in non-TTY context.""" root, repo_id = _init_repo(tmp_path) _write_release(root, repo_id, "v1.0.0") result = _invoke(["release", "delete", "v1.0.0", "--yes"], root) assert result.exit_code == 0 assert get_release_for_tag(root, repo_id, "v1.0.0") is None def test_delete_yes_json_was_draft_false(self, tmp_path: pathlib.Path) -> None: """JSON was_draft reflects false for a published release.""" root, repo_id = _init_repo(tmp_path) _write_release(root, repo_id, "v1.0.0", is_draft=False) result = _invoke(["release", "delete", "v1.0.0", "--yes", "--json"], root) assert result.exit_code == 0 parsed = _parse_delete(result.output) assert parsed["was_draft"] is False def test_delete_yes_json_was_draft_true(self, tmp_path: pathlib.Path) -> None: """JSON was_draft reflects true for a draft release.""" root, repo_id = _init_repo(tmp_path) _write_release(root, repo_id, "v1.0.0-alpha.1", is_draft=True) result = _invoke(["release", "delete", "v1.0.0-alpha.1", "--yes", "--json"], root) assert result.exit_code == 0 parsed = _parse_delete(result.output) assert parsed["was_draft"] is True def test_delete_dry_run_preserves_release(self, tmp_path: pathlib.Path) -> None: """--dry-run must not remove the release record.""" root, repo_id = _init_repo(tmp_path) _write_release(root, repo_id, "v1.0.0") result = _invoke(["release", "delete", "v1.0.0", "--dry-run"], root) assert result.exit_code == 0 assert get_release_for_tag(root, repo_id, "v1.0.0") is not None def test_delete_dry_run_json_remote_retracted_false(self, tmp_path: pathlib.Path) -> None: """dry-run JSON always has remote_retracted=false.""" root, repo_id = _init_repo(tmp_path) _write_release(root, repo_id, "v1.0.0") result = _invoke( ["release", "delete", "v1.0.0", "--dry-run", "--remote", "origin", "--json"], root ) assert result.exit_code == 0 parsed = _parse_delete(result.output) assert parsed["remote_retracted"] is False assert parsed["dry_run"] is True def test_delete_dry_run_text_mentions_tag(self, tmp_path: pathlib.Path) -> None: """dry-run text output contains the tag.""" root, repo_id = _init_repo(tmp_path) _write_release(root, repo_id, "v2.3.4") result = _invoke(["release", "delete", "v2.3.4", "--dry-run"], root) assert result.exit_code == 0 assert "v2.3.4" in result.output def test_delete_dry_run_text_mentions_remote(self, tmp_path: pathlib.Path) -> None: """dry-run text output mentions the remote when --remote is supplied.""" root, repo_id = _init_repo(tmp_path) _write_release(root, repo_id, "v1.0.0") result = _invoke( ["release", "delete", "v1.0.0", "--dry-run", "--remote", "staging"], root ) assert result.exit_code == 0 assert "staging" in result.output def test_delete_remote_error_exits_5(self, tmp_path: pathlib.Path) -> None: """TransportError from delete_release_remote exits with code 5.""" from muse.core.transport import TransportError root, repo_id = _init_repo(tmp_path) _write_release(root, repo_id, "v1.0.0") mock_t = MagicMock() mock_t.delete_release_remote.side_effect = TransportError("gone", 404) with patch("muse.cli.commands.release.make_transport", return_value=mock_t): with patch("muse.cli.commands.release.get_signing_identity", return_value="tok"): with patch("muse.cli.commands.release._resolve_remote_url", return_value="http://hub"): result = _invoke( ["release", "delete", "v1.0.0", "--yes", "--remote", "origin"], root ) assert result.exit_code == 5 def test_delete_remote_success_sets_remote_retracted(self, tmp_path: pathlib.Path) -> None: """Successful remote retraction sets remote_retracted=true in JSON.""" root, repo_id = _init_repo(tmp_path) _write_release(root, repo_id, "v1.0.0") mock_t = MagicMock() mock_t.delete_release_remote.return_value = None with patch("muse.cli.commands.release.make_transport", return_value=mock_t): with patch("muse.cli.commands.release.get_signing_identity", return_value="tok"): with patch("muse.cli.commands.release._resolve_remote_url", return_value="http://hub"): result = _invoke( ["release", "delete", "v1.0.0", "--yes", "--remote", "origin", "--json"], root, ) assert result.exit_code == 0 parsed = _parse_delete(result.output) assert parsed["status"] == "deleted" assert parsed["remote_retracted"] is True def test_delete_remote_not_configured_exits_1(self, tmp_path: pathlib.Path) -> None: """Unconfigured --remote exits with code 1.""" root, repo_id = _init_repo(tmp_path) _write_release(root, repo_id, "v1.0.0") result = _invoke( ["release", "delete", "v1.0.0", "--yes", "--remote", "nonexistent"], root ) assert result.exit_code == 1 def test_delete_json_compact_no_indent(self, tmp_path: pathlib.Path) -> None: """JSON output is compact (no indentation).""" root, repo_id = _init_repo(tmp_path) _write_release(root, repo_id, "v1.0.0") result = _invoke(["release", "delete", "v1.0.0", "--dry-run", "--json"], root) assert result.exit_code == 0 raw = _json_blob(result.output) assert "\n " not in raw def test_delete_success_text_mentions_deleted(self, tmp_path: pathlib.Path) -> None: """Success text output says 'deleted'.""" root, repo_id = _init_repo(tmp_path) _write_release(root, repo_id, "v1.0.0") result = _invoke(["release", "delete", "v1.0.0", "--yes"], root) assert result.exit_code == 0 assert "deleted" in result.output.lower() def test_delete_non_tty_without_yes_exits_1(self, tmp_path: pathlib.Path) -> None: """Non-TTY delete without --yes exits USER_ERROR (1), never blocks.""" root, repo_id = _init_repo(tmp_path) _write_release(root, repo_id, "v1.0.0") # CliRunner runs without a TTY by default. result = _invoke(["release", "delete", "v1.0.0"], root) assert result.exit_code == 1 class TestReleaseDeleteSecurity: """Security tests for ``muse release delete``.""" def test_delete_ansi_tag_stripped_in_dry_run_text(self, tmp_path: pathlib.Path) -> None: """ANSI escape in tag is stripped from dry-run text output.""" malicious_tag = "\x1b[31mv1.0.0\x1b[0m" root, repo_id = _init_repo(tmp_path) rec = ReleaseRecord( repo_id=repo_id, release_id=fake_id("release"), tag=malicious_tag, semver=SemVerTag(major=1, minor=0, patch=0, pre="", build=""), channel="stable", commit_id="a" * 64, snapshot_id="b" * 64, title="malicious", body="", changelog=[], is_draft=False, ) write_release(root, rec) result = _invoke(["release", "delete", malicious_tag, "--dry-run"], root) assert result.exit_code == 0 assert "\x1b[31m" not in result.output def test_delete_ansi_tag_stripped_in_success_text(self, tmp_path: pathlib.Path) -> None: """ANSI escape in tag is stripped from delete success text output.""" malicious_tag = "\x1b[32mv1.0.0\x1b[0m" root, repo_id = _init_repo(tmp_path) rec = ReleaseRecord( repo_id=repo_id, release_id=fake_id("release"), tag=malicious_tag, semver=SemVerTag(major=1, minor=0, patch=0, pre="", build=""), channel="stable", commit_id="a" * 64, snapshot_id="b" * 64, title="malicious", body="", changelog=[], is_draft=False, ) write_release(root, rec) result = _invoke(["release", "delete", malicious_tag, "--yes"], root) assert result.exit_code == 0 assert "\x1b[32m" not in result.output def test_delete_control_char_tag_stripped_in_dry_run(self, tmp_path: pathlib.Path) -> None: """Control characters in tag are stripped from dry-run text output.""" malicious_tag = "v1.0.0\r\ninjected" root, repo_id = _init_repo(tmp_path) rec = ReleaseRecord( repo_id=repo_id, release_id=fake_id("release"), tag=malicious_tag, semver=SemVerTag(major=1, minor=0, patch=0, pre="", build=""), channel="stable", commit_id="a" * 64, snapshot_id="b" * 64, title="ctrl", body="", changelog=[], is_draft=False, ) write_release(root, rec) result = _invoke(["release", "delete", malicious_tag, "--dry-run"], root) assert result.exit_code == 0 assert "\r" not in result.output def test_delete_ansi_tag_preserved_in_json(self, tmp_path: pathlib.Path) -> None: """ANSI in tag is NOT stripped from JSON output (raw data for agents).""" malicious_tag = "\x1b[31mv1.0.0\x1b[0m" root, repo_id = _init_repo(tmp_path) rec = ReleaseRecord( repo_id=repo_id, release_id=fake_id("release"), tag=malicious_tag, semver=SemVerTag(major=1, minor=0, patch=0, pre="", build=""), channel="stable", commit_id="a" * 64, snapshot_id="b" * 64, title="malicious", body="", changelog=[], is_draft=False, ) write_release(root, rec) result = _invoke(["release", "delete", malicious_tag, "--dry-run", "--json"], root) assert result.exit_code == 0 data = json.loads(_json_blob(result.output)) # JSON carries raw tag; sanitization only applies to human-readable text assert data["tag"] == malicious_tag def test_delete_remote_error_ansi_stripped(self, tmp_path: pathlib.Path) -> None: """ANSI in TransportError message is stripped from error output.""" from muse.core.transport import TransportError root, repo_id = _init_repo(tmp_path) _write_release(root, repo_id, "v1.0.0") mock_t = MagicMock() mock_t.delete_release_remote.side_effect = TransportError("\x1b[31mfailed\x1b[0m", 503) with patch("muse.cli.commands.release.make_transport", return_value=mock_t): with patch("muse.cli.commands.release.get_signing_identity", return_value="tok"): with patch("muse.cli.commands.release._resolve_remote_url", return_value="http://hub"): result = _invoke( ["release", "delete", "v1.0.0", "--yes", "--remote", "origin"], root ) assert result.exit_code == 5 assert "\x1b[31m" not in result.output def test_delete_not_found_message_sanitized(self, tmp_path: pathlib.Path) -> None: """ANSI in tag is stripped from 'not found' error message.""" root, _ = _init_repo(tmp_path) malicious_tag = "\x1b[31mv99.0.0\x1b[0m" result = _invoke(["release", "delete", malicious_tag, "--yes"], root) assert result.exit_code != 0 assert "\x1b[31m" not in result.output class TestReleaseDeleteStress: """Stress tests for ``muse release delete``.""" def test_delete_50_releases_sequential(self, tmp_path: pathlib.Path) -> None: """Add 50 releases then delete all; list must be empty.""" root, repo_id = _init_repo(tmp_path) for i in range(50): _write_release(root, repo_id, f"v1.{i}.0") assert len(list_releases(root, repo_id)) == 50 for i in range(50): r = _invoke(["release", "delete", f"v1.{i}.0", "--yes", "--json"], root) assert r.exit_code == 0, f"v1.{i}.0 failed: {r.output}" assert json.loads(_json_blob(r.output))["status"] == "deleted" assert list_releases(root, repo_id) == [] def test_delete_concurrent_different_tags(self, tmp_path: pathlib.Path) -> None: """Concurrent delete_release calls on distinct tags must not crash or corrupt.""" root, repo_id = _init_repo(tmp_path) recs = [_write_release(root, repo_id, f"v3.{i}.0") for i in range(20)] errors: list[str] = [] def _do_delete(rec_id: str) -> None: try: result = delete_release(root, repo_id, rec_id) assert result is True except Exception as exc: # noqa: BLE001 errors.append(str(exc)) threads = [threading.Thread(target=_do_delete, args=(r.release_id,)) for r in recs] for t in threads: t.start() for t in threads: t.join() assert not errors, f"Concurrent failures: {errors}" assert list_releases(root, repo_id) == [] def test_delete_dry_run_json_compact_50(self, tmp_path: pathlib.Path) -> None: """50 dry-run delete JSON outputs are all compact (no indent).""" root, repo_id = _init_repo(tmp_path) for i in range(50): _write_release(root, repo_id, f"v4.{i}.0") for i in range(50): r = _invoke( ["release", "delete", f"v4.{i}.0", "--dry-run", "--json"], root ) assert r.exit_code == 0, f"v4.{i}.0 failed: {r.output}" raw = _json_blob(r.output) assert "\n " not in raw