test_cmd_init.py
python
sha256:81ae324db5ad375fbfe4834c6fcb378312cafad3cc92dec5d3e5c427306621a2
fix: remove commit_exists filter from have anchors — server…
Sonnet 4.6
patch
22 days ago
| 1 | """Comprehensive tests for ``muse init``. |
| 2 | |
| 3 | Coverage tiers: |
| 4 | - Unit: template generators, _copy_template, module constants |
| 5 | - CLI unit: argument validation (bad branch, bad domain, edge cases) |
| 6 | - Integration: every flag, file format, lifecycle scenario, file preservation |
| 7 | - End-to-end: muse init followed by subsequent muse commands |
| 8 | - Security: ANSI injection, symlink attacks, TOCTOU, corrupt inputs |
| 9 | - Stress: sequential, concurrent, large-scale, adversarial inputs |
| 10 | """ |
| 11 | from __future__ import annotations |
| 12 | |
| 13 | import json |
| 14 | import os |
| 15 | import pathlib |
| 16 | import threading |
| 17 | import tomllib |
| 18 | from collections.abc import Mapping |
| 19 | |
| 20 | import pytest |
| 21 | |
| 22 | from muse.core.paths import config_toml_path, head_path, muse_dir, objects_dir, ref_path, repo_json_path, tags_dir |
| 23 | from tests.cli_test_helper import CliRunner, InvokeResult |
| 24 | |
# Module-level CLI runner; the ``_init`` helper routes its ``muse init`` invocations through it.
runner = CliRunner()
| 26 | |
| 27 | |
def _init(repo: pathlib.Path, *extra_args: str) -> InvokeResult:
    """Run ``muse init`` with *repo* as the current working directory.

    *repo* is created (parents included) when it does not exist yet.  Any
    *extra_args* are appended after the ``init`` sub-command.  The caller's
    working directory is restored before the result is returned, even when
    the invocation raises.
    """
    from muse.cli.app import main as cli

    repo.mkdir(parents=True, exist_ok=True)
    argv = ["init", *extra_args]
    original_cwd = os.getcwd()
    try:
        os.chdir(repo)
        outcome = runner.invoke(cli, argv)
    finally:
        # Always undo the chdir so the next caller starts from the same place.
        os.chdir(original_cwd)
    return outcome
| 39 | |
| 40 | |
| 41 | # --------------------------------------------------------------------------- |
| 42 | # Unit — template generators |
| 43 | # --------------------------------------------------------------------------- |
| 44 | |
| 45 | |
class TestMuseignoreTemplate:
    """Unit tests for ``_museignore_template``, the ``.museignore`` generator."""

    def test_code_domain_has_pyc_pattern(self) -> None:
        """The ``code`` template mentions the ``*.pyc`` pattern."""
        from muse.cli.commands.init import _museignore_template

        result = _museignore_template("code")
        assert "*.pyc" in result

    def test_midi_domain_has_renders_pattern(self) -> None:
        """The ``midi`` template mentions the ``/renders/`` pattern."""
        from muse.cli.commands.init import _museignore_template

        result = _museignore_template("midi")
        assert "/renders/" in result

    def test_unknown_domain_produces_commented_stub(self) -> None:
        """An unknown domain yields its own section with a commented patterns stub."""
        from muse.cli.commands.init import _museignore_template

        result = _museignore_template("genomics")
        assert "[domain.genomics]" in result
        assert "# patterns" in result

    def test_result_is_valid_toml(self) -> None:
        """The generated text parses as TOML for several domains."""
        from muse.cli.commands.init import _museignore_template

        for domain in ("code", "midi", "genomics"):
            parsed = tomllib.loads(_museignore_template(domain))
            assert isinstance(parsed, dict)

    def test_global_section_present(self) -> None:
        """The template has a ``[global]`` table whose patterns list includes ``.DS_Store``."""
        from muse.cli.commands.init import _museignore_template

        parsed = tomllib.loads(_museignore_template("code"))
        assert "global" in parsed
        assert isinstance(parsed["global"]["patterns"], list)
        assert ".DS_Store" in parsed["global"]["patterns"]

    def test_tls_secrets_not_in_template_patterns(self) -> None:
        """*.key / *.pem / *.crt must NOT appear in template patterns.

        The engine's built-in blocklist already covers *.key and *.pem.
        *.crt (public cert) is intentionally trackable.  Duplicating these in
        the user-facing template is misleading and was removed.
        """
        from muse.cli.commands.init import _museignore_template

        parsed = tomllib.loads(_museignore_template("code"))
        code_patterns = parsed.get("domain", {}).get("code", {}).get("patterns", [])
        assert "*.key" not in code_patterns
        assert "*.crt" not in code_patterns
        assert "*.pem" not in code_patterns

    def test_force_track_section_documented_in_template(self) -> None:
        """Template must document [force_track] so devs know how to whitelist certs."""
        from muse.cli.commands.init import _museignore_template

        result = _museignore_template("code")
        assert "force_track" in result

    def test_template_is_valid_toml(self) -> None:
        """Template must parse as valid TOML regardless of domain.

        For every domain checked, the domain's ``patterns`` entry (when
        present) must be a list.
        """
        from muse.cli.commands.init import _museignore_template

        # Exercise more than one domain so the "regardless of domain" promise
        # is actually tested; a missing patterns key falls back to [].
        for domain in ("code", "midi", "genomics"):
            parsed = tomllib.loads(_museignore_template(domain))
            patterns = parsed.get("domain", {}).get(domain, {}).get("patterns", [])
            assert isinstance(patterns, list), (
                f"patterns is not a list for domain {domain!r}"
            )
| 110 | |
| 111 | |
class TestMuseattributesTemplate:
    """Unit tests for ``_museattributes_template``, the ``.museattributes`` generator."""

    @staticmethod
    def _render(domain: str) -> str:
        """Return the generated ``.museattributes`` text for *domain*."""
        from muse.cli.commands.init import _museattributes_template

        return _museattributes_template(domain)

    def test_domain_embedded_in_meta(self) -> None:
        """``meta.domain`` carries the domain the template was rendered for."""
        attributes = tomllib.loads(self._render("code"))
        assert attributes["meta"]["domain"] == "code"

    def test_custom_domain_embedded(self) -> None:
        """A non-default domain name is embedded in ``meta.domain`` as well."""
        attributes = tomllib.loads(self._render("genomics"))
        assert attributes["meta"]["domain"] == "genomics"

    def test_result_is_valid_toml(self) -> None:
        """The generated text parses as TOML for several domains."""
        for domain in ("code", "midi", "genomics"):
            attributes = tomllib.loads(self._render(domain))
            assert isinstance(attributes, dict)
| 133 | |
| 134 | |
| 135 | # --------------------------------------------------------------------------- |
| 136 | # CLI unit — argument validation |
| 137 | # --------------------------------------------------------------------------- |
| 138 | |
| 139 | |
class TestArgValidation:
    """Argument validation: bad input must fail, and fail as JSON under ``--json``."""

    @staticmethod
    def _assert_json_error(outcome: InvokeResult) -> None:
        """Assert *outcome* failed and printed a JSON document with an ``error`` key."""
        assert outcome.exit_code != 0
        payload = json.loads(outcome.output)
        assert "error" in payload

    def test_invalid_branch_name_rejected(self, tmp_path: pathlib.Path) -> None:
        # Null byte is explicitly forbidden by validate_branch_name
        outcome = _init(tmp_path, "--default-branch", "branch\x00null")
        assert outcome.exit_code != 0

    def test_invalid_branch_name_json_error(self, tmp_path: pathlib.Path) -> None:
        # Consecutive dots are forbidden (path traversal prevention)
        outcome = _init(tmp_path, "--default-branch", "../traversal", "--json")
        self._assert_json_error(outcome)

    def test_invalid_domain_name_rejected(self, tmp_path: pathlib.Path) -> None:
        # Domain names must match [a-z][a-z0-9_-]* — spaces and ! are banned
        outcome = _init(tmp_path, "--domain", "Bad-Domain!")
        assert outcome.exit_code != 0

    def test_invalid_domain_name_json_error(self, tmp_path: pathlib.Path) -> None:
        outcome = _init(tmp_path, "--domain", "Bad-Domain!", "--json")
        self._assert_json_error(outcome)

    def test_missing_template_dir_rejected(self, tmp_path: pathlib.Path) -> None:
        missing = tmp_path / "nonexistent"
        outcome = _init(tmp_path, "--template", str(missing))
        assert outcome.exit_code != 0

    def test_missing_template_dir_json_error(self, tmp_path: pathlib.Path) -> None:
        missing = tmp_path / "nonexistent"
        outcome = _init(tmp_path, "--template", str(missing), "--json")
        self._assert_json_error(outcome)

    def test_template_pointing_to_file_rejected(self, tmp_path: pathlib.Path) -> None:
        # --template must name a directory; a regular file is refused.
        regular_file = tmp_path / "not_a_dir.txt"
        regular_file.write_text("hello")
        outcome = _init(tmp_path / "repo", "--template", str(regular_file))
        assert outcome.exit_code != 0

    def test_reinit_without_force_rejected(self, tmp_path: pathlib.Path) -> None:
        # The first init sets the repo up; the second one, without --force, must fail.
        _init(tmp_path)
        second = _init(tmp_path)
        assert second.exit_code != 0

    def test_reinit_without_force_json_error(self, tmp_path: pathlib.Path) -> None:
        _init(tmp_path)
        second = _init(tmp_path, "--json")
        self._assert_json_error(second)
| 191 | |
| 192 | |
| 193 | # --------------------------------------------------------------------------- |
| 194 | # Integration — filesystem layout |
| 195 | # --------------------------------------------------------------------------- |
| 196 | |
| 197 | |
class TestFilesystemLayout:
    """On-disk layout produced by a plain ``muse init``."""

    def test_muse_dir_created(self, tmp_path: pathlib.Path) -> None:
        outcome = _init(tmp_path)
        assert outcome.exit_code == 0
        assert muse_dir(tmp_path).is_dir()

    def test_required_subdirs_exist(self, tmp_path: pathlib.Path) -> None:
        _init(tmp_path)
        root = muse_dir(tmp_path)
        for subdir in ("objects", "refs", "refs/heads"):
            assert (root / subdir).is_dir(), f"missing: {subdir}"

    def test_repo_json_created(self, tmp_path: pathlib.Path) -> None:
        _init(tmp_path)
        assert repo_json_path(tmp_path).exists()

    def test_repo_json_fields(self, tmp_path: pathlib.Path) -> None:
        _init(tmp_path)
        data = json.loads(repo_json_path(tmp_path).read_text())
        for key in ("repo_id", "schema_version", "created_at", "domain"):
            assert key in data
        assert data["domain"] == "code"

    def test_repo_json_repo_id_is_sha256(self, tmp_path: pathlib.Path) -> None:
        _init(tmp_path)
        stored = json.loads(repo_json_path(tmp_path).read_text())
        raw = stored["repo_id"]
        assert raw.startswith("sha256:"), f"expected sha256: prefix, got {raw!r}"
        # 7-character "sha256:" prefix followed by 64 further characters.
        assert len(raw) == len("sha256:") + 64

    def test_head_points_to_default_branch(self, tmp_path: pathlib.Path) -> None:
        _init(tmp_path)
        head_text = head_path(tmp_path).read_text()
        assert "main" in head_text

    def test_custom_default_branch_in_head(self, tmp_path: pathlib.Path) -> None:
        _init(tmp_path, "--default-branch", "dev")
        head_text = head_path(tmp_path).read_text()
        assert "dev" in head_text
        assert ref_path(tmp_path, "dev").exists()

    def test_config_toml_created(self, tmp_path: pathlib.Path) -> None:
        _init(tmp_path)
        assert config_toml_path(tmp_path).exists()

    def test_museignore_created(self, tmp_path: pathlib.Path) -> None:
        _init(tmp_path)
        ignore_file = tmp_path / ".museignore"
        assert ignore_file.exists()

    def test_museattributes_created(self, tmp_path: pathlib.Path) -> None:
        _init(tmp_path)
        attributes_file = tmp_path / ".museattributes"
        assert attributes_file.exists()

    def test_museattributes_has_correct_domain(self, tmp_path: pathlib.Path) -> None:
        _init(tmp_path, "--domain", "code")
        attributes_file = tmp_path / ".museattributes"
        attributes = tomllib.loads(attributes_file.read_text())
        assert attributes["meta"]["domain"] == "code"

    def test_museignore_valid_toml(self, tmp_path: pathlib.Path) -> None:
        _init(tmp_path)
        ignore_file = tmp_path / ".museignore"
        assert isinstance(tomllib.loads(ignore_file.read_text()), dict)
| 260 | |
| 261 | |
class TestBareRepo:
    """Behaviour of ``muse init --bare`` compared with a normal init."""

    def test_bare_creates_muse_dir(self, tmp_path: pathlib.Path) -> None:
        outcome = _init(tmp_path, "--bare")
        assert outcome.exit_code == 0
        assert muse_dir(tmp_path).is_dir()

    def test_bare_does_not_create_museignore(self, tmp_path: pathlib.Path) -> None:
        _init(tmp_path, "--bare")
        ignore_file = tmp_path / ".museignore"
        assert not ignore_file.exists()

    def test_bare_does_not_create_museattributes(self, tmp_path: pathlib.Path) -> None:
        _init(tmp_path, "--bare")
        attributes_file = tmp_path / ".museattributes"
        assert not attributes_file.exists()

    def test_bare_repo_json_has_bare_flag(self, tmp_path: pathlib.Path) -> None:
        _init(tmp_path, "--bare")
        stored = json.loads(repo_json_path(tmp_path).read_text())
        assert stored.get("bare") is True

    def test_non_bare_repo_json_has_bare_false(self, tmp_path: pathlib.Path) -> None:
        _init(tmp_path)
        stored = json.loads(repo_json_path(tmp_path).read_text())
        assert stored["bare"] is False
| 284 | |
| 285 | |
class TestForceReinit:
    """Re-running ``muse init --force`` on an existing (or fresh) directory."""

    @staticmethod
    def _stored_repo_id(repo: pathlib.Path) -> str:
        """Return the ``repo_id`` currently recorded in *repo*'s repo.json."""
        return json.loads(repo_json_path(repo).read_text())["repo_id"]

    def test_force_reinit_succeeds(self, tmp_path: pathlib.Path) -> None:
        _init(tmp_path)
        second = _init(tmp_path, "--force")
        assert second.exit_code == 0

    def test_force_preserves_repo_id(self, tmp_path: pathlib.Path) -> None:
        _init(tmp_path)
        before = self._stored_repo_id(tmp_path)
        _init(tmp_path, "--force")
        after = self._stored_repo_id(tmp_path)
        assert before == after

    def test_force_does_not_overwrite_museignore(self, tmp_path: pathlib.Path) -> None:
        _init(tmp_path)
        ignore_file = tmp_path / ".museignore"
        user_content = '[global]\npatterns = ["custom.txt"]\n'
        ignore_file.write_text(user_content)
        _init(tmp_path, "--force")
        assert ignore_file.read_text() == user_content

    def test_force_does_not_overwrite_museattributes(self, tmp_path: pathlib.Path) -> None:
        _init(tmp_path)
        attributes_file = tmp_path / ".museattributes"
        user_content = '[meta]\ndomain = "custom"\n'
        attributes_file.write_text(user_content)
        _init(tmp_path, "--force")
        assert attributes_file.read_text() == user_content

    def test_force_on_fresh_dir_works(self, tmp_path: pathlib.Path) -> None:
        # --force on a directory that was never a repo should work as fresh init
        outcome = _init(tmp_path, "--force")
        assert outcome.exit_code == 0
        assert repo_json_path(tmp_path).exists()
| 322 | |
| 323 | |
class TestTemplate:
    """Integration tests for ``muse init --template``."""

    def test_template_files_copied(self, tmp_path: pathlib.Path) -> None:
        """Files and sub-directories of the template end up in the new repo."""
        tmpl = tmp_path / "tmpl"
        tmpl.mkdir()
        (tmpl / "README.md").write_text("# hello")
        (tmpl / "scripts").mkdir()
        (tmpl / "scripts" / "run.sh").write_text("#!/bin/sh\necho hi")

        repo = tmp_path / "repo"
        repo.mkdir()
        result = _init(repo, "--template", str(tmpl))

        assert result.exit_code == 0
        assert (repo / "README.md").read_text() == "# hello"
        assert (repo / "scripts" / "run.sh").exists()

    def test_template_does_not_overwrite_muse_dir(self, tmp_path: pathlib.Path) -> None:
        """A template carrying its own ``.muse`` directory must not corrupt repo.json."""
        tmpl = tmp_path / "tmpl"
        tmpl.mkdir()
        # A template that tries to write a .muse directory
        muse_dir(tmpl).mkdir()
        (muse_dir(tmpl) / "injected").write_text("bad")

        repo = tmp_path / "repo"
        repo.mkdir()
        result = _init(repo, "--template", str(tmpl))
        assert result.exit_code == 0

        # The injected file may land but should not corrupt the real repo.json:
        # the file must still exist, parse as JSON and carry a repo_id.
        assert repo_json_path(repo).exists()
        stored = json.loads(repo_json_path(repo).read_text())
        assert "repo_id" in stored

    def test_template_ignored_for_bare_repo(self, tmp_path: pathlib.Path) -> None:
        """With ``--bare`` the template content is not copied into the repo."""
        tmpl = tmp_path / "tmpl"
        tmpl.mkdir()
        (tmpl / "README.md").write_text("hello")

        repo = tmp_path / "repo"
        repo.mkdir()
        result = _init(repo, "--bare", "--template", str(tmpl))

        # Guard the negative assertion below: without this check the test
        # would also pass when init simply failed and copied nothing.
        assert result.exit_code == 0
        # --bare suppresses template copy
        assert not (repo / "README.md").exists()
| 364 | |
| 365 | |
| 366 | # --------------------------------------------------------------------------- |
| 367 | # JSON output — agent UX |
| 368 | # --------------------------------------------------------------------------- |
| 369 | |
| 370 | |
class TestJsonOutput:
    """Shape and content of the document printed by ``muse init --json``."""

    def test_json_exit_zero(self, tmp_path: pathlib.Path) -> None:
        outcome = _init(tmp_path, "--json")
        assert outcome.exit_code == 0

    def test_json_is_valid(self, tmp_path: pathlib.Path) -> None:
        outcome = _init(tmp_path, "--json")
        assert isinstance(json.loads(outcome.output), dict)

    def test_json_has_required_fields(self, tmp_path: pathlib.Path) -> None:
        payload = json.loads(_init(tmp_path, "--json").output)
        required = ("repo_id", "branch", "domain", "path", "reinitialized", "bare")
        for field in required:
            assert field in payload, f"missing field: {field}"

    def test_json_repo_id_matches_repo_json(self, tmp_path: pathlib.Path) -> None:
        payload = json.loads(_init(tmp_path, "--json").output)
        on_disk = json.loads(repo_json_path(tmp_path).read_text())
        assert payload["repo_id"] == on_disk["repo_id"]

    def test_json_branch_matches_default_branch(self, tmp_path: pathlib.Path) -> None:
        outcome = _init(tmp_path, "--default-branch", "dev", "--json")
        assert json.loads(outcome.output)["branch"] == "dev"

    def test_json_domain_matches_domain_arg(self, tmp_path: pathlib.Path) -> None:
        outcome = _init(tmp_path, "--domain", "code", "--json")
        assert json.loads(outcome.output)["domain"] == "code"

    def test_json_reinitialized_false_on_fresh(self, tmp_path: pathlib.Path) -> None:
        payload = json.loads(_init(tmp_path, "--json").output)
        assert payload["reinitialized"] is False

    def test_json_reinitialized_true_on_force(self, tmp_path: pathlib.Path) -> None:
        _init(tmp_path)
        payload = json.loads(_init(tmp_path, "--force", "--json").output)
        assert payload["reinitialized"] is True

    def test_json_force_preserves_repo_id(self, tmp_path: pathlib.Path) -> None:
        fresh = json.loads(_init(tmp_path, "--json").output)
        forced = json.loads(_init(tmp_path, "--force", "--json").output)
        assert fresh["repo_id"] == forced["repo_id"]

    def test_json_bare_flag_reflects_bare_arg(self, tmp_path: pathlib.Path) -> None:
        outcome = _init(tmp_path, "--bare", "--json")
        assert json.loads(outcome.output)["bare"] is True

    def test_json_non_bare_flag(self, tmp_path: pathlib.Path) -> None:
        outcome = _init(tmp_path, "--json")
        assert json.loads(outcome.output)["bare"] is False

    def test_json_path_is_muse_dir(self, tmp_path: pathlib.Path) -> None:
        payload = json.loads(_init(tmp_path, "--json").output)
        reported = payload["path"]
        assert reported.endswith(".muse")
        assert pathlib.Path(reported).is_dir()

    def test_json_no_human_text_on_stdout(self, tmp_path: pathlib.Path) -> None:
        outcome = _init(tmp_path, "--json")
        # Must be parseable JSON with no extra prose
        stripped = outcome.output.strip()
        assert isinstance(json.loads(stripped), dict)
| 435 | |
| 436 | |
| 437 | # --------------------------------------------------------------------------- |
| 438 | # Security |
| 439 | # --------------------------------------------------------------------------- |
| 440 | |
| 441 | |
class TestSecurity:
    """Security-focused checks and repo.json metadata invariants for ``muse init``."""

    def test_ansi_in_branch_error_not_on_stdout(self, tmp_path: pathlib.Path) -> None:
        """Crafted branch names must not inject ANSI into output."""
        malicious = "\x1b[31mmalicious\x1b[0m"
        result = _init(tmp_path, "--default-branch", malicious)
        assert "\x1b" not in result.output

    def test_ansi_in_domain_error_not_on_stdout(self, tmp_path: pathlib.Path) -> None:
        """Crafted domain names must not inject ANSI into output."""
        # Domain validator rejects uppercase/special chars including ANSI sequences
        malicious = "EVIL\x1b[31mred\x1b[0m"
        result = _init(tmp_path, "--domain", malicious)
        assert "\x1b" not in result.output

    def test_control_chars_in_branch_rejected(self, tmp_path: pathlib.Path) -> None:
        """A branch name containing a null byte makes init fail."""
        result = _init(tmp_path, "--default-branch", "branch\x00null")
        assert result.exit_code != 0

    def test_json_errors_are_clean_json(self, tmp_path: pathlib.Path) -> None:
        """Every error path with --json must emit valid JSON, not a traceback."""
        cases = [
            ["--domain", "Bad-Domain!", "--json"],
            ["--default-branch", "../traversal", "--json"],
            ["--template", str(tmp_path / "missing"), "--json"],
        ]
        for extra in cases:
            result = _init(tmp_path / "fresh", *extra)
            # These are error paths, so the command must also report failure.
            assert result.exit_code != 0, f"expected failure for args: {extra}"
            data = json.loads(result.output)
            assert "error" in data, f"missing 'error' key for args: {extra}"

    def test_reinit_without_force_json_has_error(self, tmp_path: pathlib.Path) -> None:
        """Re-init without ``--force`` fails with a JSON ``error`` document."""
        _init(tmp_path)
        result = _init(tmp_path, "--json")
        assert result.exit_code != 0
        data = json.loads(result.output)
        assert "error" in data

    def test_template_symlink_skipped(self, tmp_path: pathlib.Path) -> None:
        """Symlinks inside a template directory are silently skipped.

        A template with a symlink to ``/etc/passwd`` (or any path outside the
        template root) must not be followed.  The symlink is dropped and the
        rest of the template is copied normally.
        """
        tmpl = tmp_path / "tmpl"
        tmpl.mkdir()
        (tmpl / "legit.txt").write_text("ok")
        (tmpl / "malicious_link").symlink_to("/etc/passwd")

        repo = tmp_path / "repo"
        repo.mkdir()
        result = _init(repo, "--template", str(tmpl))
        assert result.exit_code == 0
        assert (repo / "legit.txt").exists()
        # lexists() does not follow the link, so a symlink that was copied but
        # dangles (e.g. on a host without /etc/passwd) is still detected;
        # Path.exists() would report False for it and hide the problem.
        assert not os.path.lexists(repo / "malicious_link")

    def test_template_muse_dir_skipped(self, tmp_path: pathlib.Path) -> None:
        """A ``.muse/`` directory inside a template is never copied.

        Without this guard, a malicious template could overwrite the freshly
        created VCS state directory with an attacker-controlled repo_id.
        """
        tmpl = tmp_path / "tmpl"
        tmpl.mkdir()
        malicious_muse = muse_dir(tmpl)
        malicious_muse.mkdir()
        (malicious_muse / "repo.json").write_text('{"repo_id": "attacker-id"}')
        (tmpl / "safe.txt").write_text("safe")

        repo = tmp_path / "repo"
        repo.mkdir()
        result = _init(repo, "--template", str(tmpl), "--json")
        assert result.exit_code == 0

        data = json.loads(result.output)
        stored = json.loads(repo_json_path(repo).read_text())
        # The repo_id must come from init, not from the malicious template
        assert stored["repo_id"] == data["repo_id"]
        assert stored["repo_id"] != "attacker-id"
        # Only .muse/ is expected to be dropped; the ordinary template file
        # created above should still be copied.
        assert (repo / "safe.txt").exists()

    def test_template_symlink_path_rejected(self, tmp_path: pathlib.Path) -> None:
        """A ``--template`` path that is itself a symlink is rejected."""
        real_dir = tmp_path / "real"
        real_dir.mkdir()
        link = tmp_path / "link"
        link.symlink_to(real_dir)

        repo = tmp_path / "repo"
        repo.mkdir()
        result = _init(repo, "--template", str(link))
        assert result.exit_code != 0

    def test_tags_dir_created_at_init(self, tmp_path: pathlib.Path) -> None:
        """init must create ``.muse/tags/`` so muse tag works immediately."""
        result = _init(tmp_path)
        assert result.exit_code == 0
        assert tags_dir(tmp_path).is_dir()

    def test_schema_version_is_integer(self, tmp_path: pathlib.Path) -> None:
        """schema_version in repo.json must be an integer, not a package version string."""
        _init(tmp_path)
        data = json.loads(repo_json_path(tmp_path).read_text())
        assert isinstance(data["schema_version"], int), (
            f"schema_version should be int, got {type(data['schema_version'])}"
        )

    def test_json_output_includes_schema_version(self, tmp_path: pathlib.Path) -> None:
        """--json output must include schema_version as an integer."""
        result = _init(tmp_path, "--json")
        data = json.loads(result.output)
        assert "schema_version" in data
        assert isinstance(data["schema_version"], int)

    def test_muse_version_written_to_repo_json(self, tmp_path: pathlib.Path) -> None:
        """repo.json must record the Muse version that created the repo."""
        from muse import __version__

        _init(tmp_path)
        data = json.loads(repo_json_path(tmp_path).read_text())
        assert "muse_version" in data
        assert data["muse_version"] == __version__

    def test_bare_always_written_to_repo_json(self, tmp_path: pathlib.Path) -> None:
        """bare is always present in repo.json (False for normal repos)."""
        _init(tmp_path)
        data = json.loads(repo_json_path(tmp_path).read_text())
        assert "bare" in data
        assert data["bare"] is False
| 570 | |
| 571 | |
| 572 | # --------------------------------------------------------------------------- |
| 573 | # Stress |
| 574 | # --------------------------------------------------------------------------- |
| 575 | |
| 576 | |
class TestStress:
    """Repetition and volume scenarios for ``muse init``."""

    def test_rapid_sequential_inits(self, tmp_path: pathlib.Path) -> None:
        """50 sequential inits in different directories must all succeed."""
        for index in range(50):
            target = tmp_path / f"repo_{index:03d}"
            target.mkdir()
            outcome = _init(target, "--json")
            assert outcome.exit_code == 0, f"init failed for repo_{index:03d}"
            assert "repo_id" in json.loads(outcome.output)

    def test_large_template_dir(self, tmp_path: pathlib.Path) -> None:
        """Template with 200 files copies without error."""
        template_root = tmp_path / "big_tmpl"
        template_root.mkdir()
        for index in range(200):
            member = template_root / f"file_{index:03d}.txt"
            member.write_text(f"content {index}")

        target = tmp_path / "repo"
        target.mkdir()
        outcome = _init(target, "--template", str(template_root))
        assert outcome.exit_code == 0
        # Spot-check the first and the last generated file.
        assert (target / "file_000.txt").exists()
        assert (target / "file_199.txt").exists()

    def test_reinit_cycle_preserves_repo_id(self, tmp_path: pathlib.Path) -> None:
        """20 successive --force inits must all return the same repo_id."""
        baseline = json.loads(_init(tmp_path, "--json").output)["repo_id"]
        for _ in range(20):
            outcome = _init(tmp_path, "--force", "--json")
            current = json.loads(outcome.output)["repo_id"]
            assert current == baseline, "repo_id changed across reinit"

    def test_all_domains_produce_valid_ignore(self) -> None:
        """Every domain in _MUSEIGNORE_DOMAIN_BLOCKS produces valid TOML."""
        from muse.cli.commands.init import (
            _MUSEIGNORE_DOMAIN_BLOCKS,
            _museignore_template,
        )

        # All registered domains plus one name that has no dedicated block.
        domains = [*_MUSEIGNORE_DOMAIN_BLOCKS, "custom_domain"]
        for domain in domains:
            parsed = tomllib.loads(_museignore_template(domain))
            assert isinstance(parsed, dict), f"invalid TOML for domain {domain!r}"
| 620 | |
| 621 | |
| 622 | # --------------------------------------------------------------------------- |
| 623 | # Unit — module constants |
| 624 | # --------------------------------------------------------------------------- |
| 625 | |
| 626 | |
class TestConstants:
    """Structural invariants on module-level constants in init.py."""

    def test_repo_schema_version_is_positive_int(self) -> None:
        from muse.cli.commands.init import _REPO_SCHEMA_VERSION as version

        assert isinstance(version, int)
        assert version >= 1

    def test_default_config_is_valid_toml(self) -> None:
        from muse.cli.commands.init import _DEFAULT_CONFIG

        assert isinstance(tomllib.loads(_DEFAULT_CONFIG), dict)

    def test_default_config_has_user_section(self) -> None:
        from muse.cli.commands.init import _DEFAULT_CONFIG

        config = tomllib.loads(_DEFAULT_CONFIG)
        assert "user" in config

    def test_default_config_has_remotes_section(self) -> None:
        from muse.cli.commands.init import _DEFAULT_CONFIG

        config = tomllib.loads(_DEFAULT_CONFIG)
        assert "remotes" in config

    def test_bare_config_is_valid_toml(self) -> None:
        from muse.cli.commands.init import _BARE_CONFIG

        assert isinstance(tomllib.loads(_BARE_CONFIG), dict)

    def test_bare_config_has_core_bare_true(self) -> None:
        from muse.cli.commands.init import _BARE_CONFIG

        core_section = tomllib.loads(_BARE_CONFIG).get("core", {})
        assert core_section.get("bare") is True

    def test_init_subdirs_is_superset_of_critical_muse_dirs(self) -> None:
        """Every directory in _CRITICAL_MUSE_DIRS must appear in _INIT_SUBDIRS.

        If this fails, require_repo()'s _verify_muse_dir_integrity() will
        never see a freshly-init'd critical directory and cannot protect it.
        """
        from muse.cli.commands.init import _INIT_SUBDIRS
        from muse.core.repo import _CRITICAL_MUSE_DIRS

        created_by_init = set(_INIT_SUBDIRS)
        for critical in _CRITICAL_MUSE_DIRS:
            assert critical in created_by_init, (
                f".muse/{critical}/ is in _CRITICAL_MUSE_DIRS "
                f"but missing from _INIT_SUBDIRS — init will not create it"
            )

    def test_init_subdirs_contains_no_duplicates(self) -> None:
        from muse.cli.commands.init import _INIT_SUBDIRS

        # Collapsing into a set must not drop any entry.
        distinct = set(_INIT_SUBDIRS)
        assert len(_INIT_SUBDIRS) == len(distinct)

    def test_init_subdirs_no_absolute_paths(self) -> None:
        from muse.cli.commands.init import _INIT_SUBDIRS

        for entry in _INIT_SUBDIRS:
            assert not entry.startswith("/"), f"{entry!r} is absolute — must be relative"

    def test_museignore_global_patterns_is_list(self) -> None:
        from muse.cli.commands.init import _MUSEIGNORE_GLOBAL

        patterns = tomllib.loads(_MUSEIGNORE_GLOBAL)["global"]["patterns"]
        assert isinstance(patterns, list)
        assert len(patterns) > 0
| 699 | |
| 700 | |
| 701 | # --------------------------------------------------------------------------- |
| 702 | # Unit — _copy_template directly |
| 703 | # --------------------------------------------------------------------------- |
| 704 | |
| 705 | |
| 706 | class TestCopyTemplate: |
| 707 | """Direct unit tests for _copy_template — not mediated by the CLI.""" |
| 708 | |
def test_empty_template_dir_is_a_no_op(self, tmp_path: pathlib.Path) -> None:
    """Copying an empty template leaves the destination directory empty."""
    from muse.cli.commands.init import _copy_template

    template_root = tmp_path / "src"
    target = tmp_path / "dst"
    for directory in (template_root, target):
        directory.mkdir()
    _copy_template(template_root, target, [])
    assert not list(target.iterdir())
| 718 | |
| 719 | def test_files_are_copied(self, tmp_path: pathlib.Path) -> None: |
| 720 | from muse.cli.commands.init import _copy_template |
| 721 | |
| 722 | src = tmp_path / "src" |
| 723 | dst = tmp_path / "dst" |
| 724 | src.mkdir() |
| 725 | dst.mkdir() |
| 726 | (src / "hello.txt").write_text("hi") |
| 727 | _copy_template(src, dst, []) |
| 728 | assert (dst / "hello.txt").read_text() == "hi" |
| 729 | |
| 730 | def test_subdirectory_is_copied_recursively(self, tmp_path: pathlib.Path) -> None: |
| 731 | from muse.cli.commands.init import _copy_template |
| 732 | |
| 733 | src = tmp_path / "src" |
| 734 | dst = tmp_path / "dst" |
| 735 | src.mkdir() |
| 736 | dst.mkdir() |
| 737 | (src / "sub").mkdir() |
| 738 | (src / "sub" / "nested.txt").write_text("deep") |
| 739 | _copy_template(src, dst, []) |
| 740 | assert (dst / "sub" / "nested.txt").read_text() == "deep" |
| 741 | |
| 742 | def test_deeply_nested_structure_copied(self, tmp_path: pathlib.Path) -> None: |
| 743 | from muse.cli.commands.init import _copy_template |
| 744 | |
| 745 | src = tmp_path / "src" |
| 746 | dst = tmp_path / "dst" |
| 747 | src.mkdir() |
| 748 | dst.mkdir() |
| 749 | deep = src / "a" / "b" / "c" |
| 750 | deep.mkdir(parents=True) |
| 751 | (deep / "file.txt").write_text("leaf") |
| 752 | _copy_template(src, dst, []) |
| 753 | assert (dst / "a" / "b" / "c" / "file.txt").read_text() == "leaf" |
| 754 | |
| 755 | def test_symlinks_are_skipped(self, tmp_path: pathlib.Path) -> None: |
| 756 | from muse.cli.commands.init import _copy_template |
| 757 | |
| 758 | src = tmp_path / "src" |
| 759 | dst = tmp_path / "dst" |
| 760 | src.mkdir() |
| 761 | dst.mkdir() |
| 762 | (src / "legit.txt").write_text("ok") |
| 763 | (src / "malicious").symlink_to("/etc/passwd") |
| 764 | _copy_template(src, dst, []) |
| 765 | assert (dst / "legit.txt").exists() |
| 766 | assert not (dst / "malicious").exists() |
| 767 | |
| 768 | def test_all_symlinks_dir_copies_nothing(self, tmp_path: pathlib.Path) -> None: |
| 769 | from muse.cli.commands.init import _copy_template |
| 770 | |
| 771 | src = tmp_path / "src" |
| 772 | dst = tmp_path / "dst" |
| 773 | src.mkdir() |
| 774 | dst.mkdir() |
| 775 | for i in range(5): |
| 776 | (src / f"link{i}").symlink_to("/etc/passwd") |
| 777 | _copy_template(src, dst, []) |
| 778 | assert list(dst.iterdir()) == [] |
| 779 | |
| 780 | def test_muse_dir_skipped(self, tmp_path: pathlib.Path) -> None: |
| 781 | from muse.cli.commands.init import _copy_template |
| 782 | |
| 783 | src = tmp_path / "src" |
| 784 | dst = tmp_path / "dst" |
| 785 | src.mkdir() |
| 786 | dst.mkdir() |
| 787 | malicious = muse_dir(src) |
| 788 | malicious.mkdir() |
| 789 | (malicious / "repo.json").write_text('{"repo_id": "attacker"}') |
| 790 | (src / "safe.txt").write_text("ok") |
| 791 | _copy_template(src, dst, []) |
| 792 | assert not (muse_dir(dst)).exists() |
| 793 | assert (dst / "safe.txt").exists() |
| 794 | |
| 795 | def test_muse_file_also_skipped(self, tmp_path: pathlib.Path) -> None: |
| 796 | """A file named .muse (not a dir) is also skipped for safety.""" |
| 797 | from muse.cli.commands.init import _copy_template |
| 798 | |
| 799 | src = tmp_path / "src" |
| 800 | dst = tmp_path / "dst" |
| 801 | src.mkdir() |
| 802 | dst.mkdir() |
| 803 | (muse_dir(src)).write_text("malicious") |
| 804 | (src / "ok.txt").write_text("ok") |
| 805 | _copy_template(src, dst, []) |
| 806 | assert not (muse_dir(dst)).exists() |
| 807 | assert (dst / "ok.txt").exists() |
| 808 | |
| 809 | def test_existing_file_overwritten(self, tmp_path: pathlib.Path) -> None: |
| 810 | """shutil.copy2 overwrites existing files in the destination.""" |
| 811 | from muse.cli.commands.init import _copy_template |
| 812 | |
| 813 | src = tmp_path / "src" |
| 814 | dst = tmp_path / "dst" |
| 815 | src.mkdir() |
| 816 | dst.mkdir() |
| 817 | (src / "file.txt").write_text("from template") |
| 818 | (dst / "file.txt").write_text("original") |
| 819 | _copy_template(src, dst, []) |
| 820 | assert (dst / "file.txt").read_text() == "from template" |
| 821 | |
| 822 | def test_multiple_symlinks_all_skipped(self, tmp_path: pathlib.Path) -> None: |
| 823 | from muse.cli.commands.init import _copy_template |
| 824 | |
| 825 | src = tmp_path / "src" |
| 826 | dst = tmp_path / "dst" |
| 827 | src.mkdir() |
| 828 | dst.mkdir() |
| 829 | (src / "real.txt").write_text("real") |
| 830 | for name in ("link1", "link2", "link3"): |
| 831 | (src / name).symlink_to(str(src / "real.txt")) |
| 832 | _copy_template(src, dst, []) |
| 833 | assert (dst / "real.txt").exists() |
| 834 | for name in ("link1", "link2", "link3"): |
| 835 | assert not (dst / name).exists() |
| 836 | |
| 837 | def test_binary_file_copied_correctly(self, tmp_path: pathlib.Path) -> None: |
| 838 | from muse.cli.commands.init import _copy_template |
| 839 | |
| 840 | src = tmp_path / "src" |
| 841 | dst = tmp_path / "dst" |
| 842 | src.mkdir() |
| 843 | dst.mkdir() |
| 844 | data = bytes(range(256)) |
| 845 | (src / "binary.bin").write_bytes(data) |
| 846 | _copy_template(src, dst, []) |
| 847 | assert (dst / "binary.bin").read_bytes() == data |
| 848 | |
| 849 | |
| 850 | # --------------------------------------------------------------------------- |
| 851 | # Integration — HEAD and ref file format |
| 852 | # --------------------------------------------------------------------------- |
| 853 | |
| 854 | |
class TestHeadAndRefFile:
    """Check the on-disk format of ``.muse/HEAD`` and of the branch ref files."""

    def test_head_exact_format(self, tmp_path: pathlib.Path) -> None:
        """HEAD must be exactly 'ref: refs/heads/main\\n'."""
        _init(tmp_path)
        assert head_path(tmp_path).read_text() == "ref: refs/heads/main\n"

    def test_head_exact_format_custom_branch(self, tmp_path: pathlib.Path) -> None:
        """With ``--default-branch dev`` HEAD points at ``refs/heads/dev``."""
        _init(tmp_path, "--default-branch", "dev")
        assert head_path(tmp_path).read_text() == "ref: refs/heads/dev\n"

    def test_head_updated_on_force_with_different_branch(self, tmp_path: pathlib.Path) -> None:
        """--force with a new --default-branch updates HEAD."""
        _init(tmp_path, "--default-branch", "main")
        _init(tmp_path, "--force", "--default-branch", "dev")
        assert head_path(tmp_path).read_text() == "ref: refs/heads/dev\n"

    def test_branch_ref_file_exists_after_init(self, tmp_path: pathlib.Path) -> None:
        """The ref file for ``main`` is present after a default init."""
        _init(tmp_path)
        assert ref_path(tmp_path, "main").exists()

    def test_branch_ref_file_is_empty_on_fresh_init(self, tmp_path: pathlib.Path) -> None:
        """A fresh repo has no commits — branch ref file must be empty."""
        _init(tmp_path)
        main_ref_text = ref_path(tmp_path, "main").read_text()
        assert main_ref_text == ""

    def test_custom_branch_ref_file_exists(self, tmp_path: pathlib.Path) -> None:
        """A slash-separated branch name yields a nested ref file."""
        _init(tmp_path, "--default-branch", "feat/new")
        nested_ref = ref_path(tmp_path, "feat") / "new"
        assert nested_ref.exists()

    def test_refs_dir_created(self, tmp_path: pathlib.Path) -> None:
        """The .muse/refs/ directory itself must exist (not just refs/heads/)."""
        _init(tmp_path)
        refs_root = muse_dir(tmp_path) / "refs"
        assert refs_root.is_dir()

    def test_no_directory_is_a_symlink_after_init(self, tmp_path: pathlib.Path) -> None:
        """Post-init integrity check: every _INIT_SUBDIRS entry must be a real dir."""
        from muse.cli.commands.init import _INIT_SUBDIRS

        _init(tmp_path)
        dot_muse = muse_dir(tmp_path)
        for subdir in _INIT_SUBDIRS:
            entry = dot_muse / subdir
            assert entry.is_dir(), f".muse/{subdir} is not a directory"
            assert not entry.is_symlink(), f".muse/{subdir} is a symlink"
| 907 | |
| 908 | |
| 909 | # --------------------------------------------------------------------------- |
| 910 | # Integration — config file content |
| 911 | # --------------------------------------------------------------------------- |
| 912 | |
| 913 | |
class TestConfigFiles:
    """Check that the files written by init parse and hold the expected keys."""

    @staticmethod
    def _toml(path: pathlib.Path) -> dict:
        """Read *path* as text and parse it as TOML."""
        return tomllib.loads(path.read_text())

    @staticmethod
    def _repo_json(root: pathlib.Path) -> dict:
        """Read and decode the repo.json belonging to the repo at *root*."""
        return json.loads(repo_json_path(root).read_text())

    def test_config_toml_is_valid_toml(self, tmp_path: pathlib.Path) -> None:
        """config.toml parses and yields a dict."""
        _init(tmp_path)
        assert isinstance(self._toml(config_toml_path(tmp_path)), dict)

    def test_config_toml_has_user_section(self, tmp_path: pathlib.Path) -> None:
        """config.toml has a top-level ``user`` key."""
        _init(tmp_path)
        assert "user" in self._toml(config_toml_path(tmp_path))

    def test_config_toml_has_remotes_section(self, tmp_path: pathlib.Path) -> None:
        """config.toml has a top-level ``remotes`` key."""
        _init(tmp_path)
        assert "remotes" in self._toml(config_toml_path(tmp_path))

    def test_bare_config_toml_has_core_bare(self, tmp_path: pathlib.Path) -> None:
        """``--bare`` records ``core.bare = true`` in config.toml."""
        _init(tmp_path, "--bare")
        config = self._toml(config_toml_path(tmp_path))
        core_section = config.get("core", {})
        assert core_section.get("bare") is True

    def test_config_toml_not_overwritten_by_force(self, tmp_path: pathlib.Path) -> None:
        """--force must not overwrite an existing config.toml."""
        _init(tmp_path)
        target = config_toml_path(tmp_path)
        target.write_text('[custom]\nkey = "value"\n')
        _init(tmp_path, "--force")
        custom_section = self._toml(target).get("custom", {})
        assert custom_section.get("key") == "value"

    def test_museignore_has_correct_domain_section(self, tmp_path: pathlib.Path) -> None:
        """The [domain.<name>] section must match the --domain flag."""
        _init(tmp_path, "--domain", "midi")
        ignore_cfg = self._toml(tmp_path / ".museignore")
        assert "domain" in ignore_cfg
        assert "midi" in ignore_cfg["domain"]

    def test_museignore_code_domain_section_present(self, tmp_path: pathlib.Path) -> None:
        """``--domain code`` produces a ``code`` entry under ``domain``."""
        _init(tmp_path, "--domain", "code")
        domains = self._toml(tmp_path / ".museignore").get("domain", {})
        assert "code" in domains

    def test_museignore_does_not_contain_other_domain_sections(
        self, tmp_path: pathlib.Path
    ) -> None:
        """A code-domain repo must not have a [domain.midi] section."""
        _init(tmp_path, "--domain", "code")
        raw_text = (tmp_path / ".museignore").read_text()
        assert "domain.midi" not in raw_text

    def test_repo_json_is_not_zero_bytes(self, tmp_path: pathlib.Path) -> None:
        """Atomic write guard: repo.json must not be empty after init."""
        _init(tmp_path)
        assert repo_json_path(tmp_path).stat().st_size > 0

    def test_repo_json_created_at_is_utc_iso(self, tmp_path: pathlib.Path) -> None:
        """``created_at`` parses as an ISO timestamp that carries a timezone."""
        import datetime

        _init(tmp_path)
        stamp = self._repo_json(tmp_path)["created_at"]
        parsed = datetime.datetime.fromisoformat(stamp)
        assert parsed.tzinfo is not None  # must be timezone-aware

    def test_repo_json_domain_matches_arg(self, tmp_path: pathlib.Path) -> None:
        """repo.json records the domain passed via ``--domain``."""
        _init(tmp_path, "--domain", "midi")
        assert self._repo_json(tmp_path)["domain"] == "midi"
| 984 | |
| 985 | |
| 986 | # --------------------------------------------------------------------------- |
| 987 | # Integration — --force edge cases |
| 988 | # --------------------------------------------------------------------------- |
| 989 | |
| 990 | |
class TestForceEdgeCases:
    """Corner cases of re-running init with ``--force`` on an existing repo."""

    def test_force_with_corrupt_repo_json_assigns_new_id(
        self, tmp_path: pathlib.Path
    ) -> None:
        """A corrupt repo.json must not crash --force; init assigns a fresh repo_id."""
        _init(tmp_path)
        repo_json_path(tmp_path).write_text("{ NOT VALID JSON !!!")
        outcome = _init(tmp_path, "--force", "--json")
        assert outcome.exit_code == 0
        payload = json.loads(outcome.output)
        assert "repo_id" in payload
        repo_id = payload["repo_id"]
        # repo_id must be a sha256: content-addressed ID
        assert repo_id.startswith("sha256:")
        assert len(repo_id) == 71

    def test_force_with_empty_repo_json_assigns_new_id(
        self, tmp_path: pathlib.Path
    ) -> None:
        """A zero-byte repo.json is tolerated by --force and a sha256 ID is emitted."""
        _init(tmp_path)
        repo_json_path(tmp_path).write_bytes(b"")
        outcome = _init(tmp_path, "--force", "--json")
        assert outcome.exit_code == 0
        repo_id = json.loads(outcome.output)["repo_id"]
        assert repo_id.startswith("sha256:")
        assert len(repo_id) == 71

    def test_force_with_repo_json_missing_repo_id_assigns_new_id(
        self, tmp_path: pathlib.Path
    ) -> None:
        """A repo.json lacking ``repo_id`` is tolerated by --force and a sha256 ID is emitted."""
        _init(tmp_path)
        repo_json_path(tmp_path).write_text('{"schema_version": 1}')
        outcome = _init(tmp_path, "--force", "--json")
        assert outcome.exit_code == 0
        repo_id = json.loads(outcome.output)["repo_id"]
        assert repo_id.startswith("sha256:")
        assert len(repo_id) == 71

    def test_force_with_non_string_repo_id_assigns_new_id(
        self, tmp_path: pathlib.Path
    ) -> None:
        """repo_id must be a string — if it isn't, force must assign a new one."""
        _init(tmp_path)
        repo_json_path(tmp_path).write_text('{"repo_id": 42}')
        outcome = _init(tmp_path, "--force", "--json")
        assert outcome.exit_code == 0
        repo_id = json.loads(outcome.output)["repo_id"]
        assert isinstance(repo_id, str)
        assert repo_id != "42"

    def test_force_preserves_custom_museignore(self, tmp_path: pathlib.Path) -> None:
        """--force must not overwrite an existing .museignore."""
        _init(tmp_path)
        ignore_file = tmp_path / ".museignore"
        user_content = '[global]\npatterns = ["custom_file.log"]\n'
        ignore_file.write_text(user_content)
        _init(tmp_path, "--force")
        assert ignore_file.read_text() == user_content

    def test_force_preserves_custom_museattributes(self, tmp_path: pathlib.Path) -> None:
        """--force leaves a user-edited .museattributes byte-for-byte intact."""
        _init(tmp_path)
        attributes_file = tmp_path / ".museattributes"
        user_content = '[meta]\ndomain = "spacetime"\n'
        attributes_file.write_text(user_content)
        _init(tmp_path, "--force")
        assert attributes_file.read_text() == user_content

    def test_force_reinitialized_flag_in_json(self, tmp_path: pathlib.Path) -> None:
        """The JSON output of a --force run has ``reinitialized`` set to true."""
        _init(tmp_path)
        outcome = _init(tmp_path, "--force", "--json")
        payload = json.loads(outcome.output)
        assert payload["reinitialized"] is True

    def test_force_updates_schema_version_in_repo_json(
        self, tmp_path: pathlib.Path
    ) -> None:
        """After --force, repo.json must have the current schema_version."""
        from muse.cli.commands.init import _REPO_SCHEMA_VERSION

        _init(tmp_path)
        _init(tmp_path, "--force")
        stored = json.loads(repo_json_path(tmp_path).read_text())
        assert stored["schema_version"] == _REPO_SCHEMA_VERSION

    def test_force_on_partially_missing_muse_dir(self, tmp_path: pathlib.Path) -> None:
        """--force on a .muse/ with some subdirs missing re-creates them."""
        import shutil

        _init(tmp_path)
        shutil.rmtree(objects_dir(tmp_path))
        outcome = _init(tmp_path, "--force")
        assert outcome.exit_code == 0
        assert objects_dir(tmp_path).is_dir()
| 1082 | |
| 1083 | |
| 1084 | # --------------------------------------------------------------------------- |
| 1085 | # Integration — two-repo isolation |
| 1086 | # --------------------------------------------------------------------------- |
| 1087 | |
| 1088 | |
class TestRepoIsolation:
    """Repos created side by side must not share identity or state."""

    def test_two_repos_have_different_repo_ids(self, tmp_path: pathlib.Path) -> None:
        """Two sibling repos report different ``repo_id`` values."""
        first, second = tmp_path / "a", tmp_path / "b"
        first.mkdir()
        second.mkdir()
        first_id = json.loads(_init(first, "--json").output)["repo_id"]
        second_id = json.loads(_init(second, "--json").output)["repo_id"]
        assert first_id != second_id

    def test_one_hundred_repos_have_unique_repo_ids(self, tmp_path: pathlib.Path) -> None:
        """One hundred freshly initialised repos yield one hundred distinct IDs."""

        def fresh_repo_id(index: int) -> str:
            # Create the directory, init it, and hand back the reported ID.
            target = tmp_path / f"r{index:03d}"
            target.mkdir()
            return json.loads(_init(target, "--json").output)["repo_id"]

        seen: set[str] = {fresh_repo_id(index) for index in range(100)}
        assert len(seen) == 100, "repo_ids collided across 100 repos"

    def test_init_in_child_does_not_affect_parent(self, tmp_path: pathlib.Path) -> None:
        """Initialising a subdirectory must not create .muse/ in the parent."""
        nested = tmp_path / "child"
        nested.mkdir()
        _init(nested)
        assert not muse_dir(tmp_path).exists()

    def test_sibling_repos_independent_after_reinit(self, tmp_path: pathlib.Path) -> None:
        """Force-reinitialising one repo leaves its sibling's ``repo_id`` unchanged."""
        first, second = tmp_path / "a", tmp_path / "b"
        first.mkdir()
        second.mkdir()
        _init(first)
        _init(second)
        first_json = repo_json_path(first)
        before = json.loads(first_json.read_text())["repo_id"]
        _init(second, "--force")
        after = json.loads(first_json.read_text())["repo_id"]
        assert before == after  # reinit of b must not touch a
| 1128 | |
| 1129 | |
| 1130 | # --------------------------------------------------------------------------- |
| 1131 | # End-to-end — muse commands immediately after init |
| 1132 | # --------------------------------------------------------------------------- |
| 1133 | |
| 1134 | |
class TestEndToEnd:
    """After muse init, every core command must work without error.

    Most tests here run the real ``muse`` executable in a subprocess with
    the freshly initialised repo as its working directory.
    """

    @staticmethod
    def _run_muse(cwd: pathlib.Path, *args: str) -> tuple[int, str, str]:
        """Run ``muse <args>`` in *cwd* and capture its result.

        Args:
            cwd: Working directory for the child process.
            *args: Arguments placed after the ``muse`` executable name.

        Returns:
            ``(returncode, stdout, stderr)`` with both streams decoded as text.
        """
        import subprocess

        proc = subprocess.run(
            ["muse", *args],
            capture_output=True, text=True, cwd=str(cwd),
        )
        return proc.returncode, proc.stdout, proc.stderr

    def test_status_works_on_fresh_repo(self, tmp_path: pathlib.Path) -> None:
        """``muse status --json`` exits 0 and reports branch ``main``."""
        _init(tmp_path)
        code, out, err = self._run_muse(tmp_path, "status", "--json")
        assert code == 0, f"muse status failed: {err}"
        data = json.loads(out)
        assert data.get("branch") == "main"

    def test_log_works_on_fresh_repo(self, tmp_path: pathlib.Path) -> None:
        """muse log on an empty repo must exit 0 (no commits is not an error)."""
        _init(tmp_path)
        code, _, err = self._run_muse(tmp_path, "log", "--json")
        assert code == 0, f"muse log failed: {err}"

    def test_branch_shows_initial_branch(self, tmp_path: pathlib.Path) -> None:
        """``muse branch`` exits 0 and mentions ``main``."""
        _init(tmp_path)
        code, out, err = self._run_muse(tmp_path, "branch")
        assert code == 0, f"muse branch failed: {err}"
        assert "main" in out

    def test_branch_shows_custom_initial_branch(self, tmp_path: pathlib.Path) -> None:
        """``muse branch`` exits 0 and mentions the custom default branch."""
        _init(tmp_path, "--default-branch", "dev")
        code, out, err = self._run_muse(tmp_path, "branch")
        assert code == 0, f"muse branch failed: {err}"
        assert "dev" in out

    def test_first_commit_succeeds(self, tmp_path: pathlib.Path) -> None:
        """Full workflow: init → add file → commit."""
        _init(tmp_path)
        (tmp_path / "hello.py").write_text("print('hello')\n")
        add_code, _, add_err = self._run_muse(tmp_path, "code", "add", ".")
        assert add_code == 0, f"muse code add failed: {add_err}"
        commit_code, _, commit_err = self._run_muse(
            tmp_path, "commit", "-m", "first commit"
        )
        assert commit_code == 0, f"muse commit failed: {commit_err}"

    def test_tag_command_works_after_init(self, tmp_path: pathlib.Path) -> None:
        """tags/ is pre-created at init — muse tag must not fail with 'no dir'."""
        _init(tmp_path)
        # muse tag list should work even on an empty repo
        code, _, err = self._run_muse(tmp_path, "tag", "list")
        # exit 0 expected — no tags is not an error
        assert code == 0, f"muse tag list failed: {err}"

    def test_require_repo_succeeds_immediately_after_init(
        self, tmp_path: pathlib.Path
    ) -> None:
        """require_repo() called from Python must not raise after muse init."""
        _init(tmp_path)
        saved = os.getcwd()
        try:
            # The call is made with the repo as the process working directory.
            os.chdir(tmp_path)
            from muse.core.repo import require_repo

            ctx = require_repo()
            assert ctx == tmp_path
        finally:
            # Restore the working directory so later tests are unaffected.
            os.chdir(saved)

    def test_status_on_custom_domain_repo(self, tmp_path: pathlib.Path) -> None:
        """muse status works on a repo with a non-default but registered domain."""
        _init(tmp_path, "--domain", "scaffold")
        code, _, err = self._run_muse(tmp_path, "status", "--json")
        assert code == 0, f"muse status failed: {err}"

    def test_status_correctly_identifies_branch(self, tmp_path: pathlib.Path) -> None:
        """``muse status --json`` reports a slash-separated default branch verbatim."""
        _init(tmp_path, "--default-branch", "feat/new-world")
        code, out, err = self._run_muse(tmp_path, "status", "--json")
        # Check the exit code first: parsing the stdout of a failed command
        # would otherwise surface as an opaque JSON decode error.
        assert code == 0, f"muse status failed: {err}"
        data = json.loads(out)
        assert data.get("branch") == "feat/new-world"
| 1249 | |
| 1250 | |
| 1251 | # --------------------------------------------------------------------------- |
| 1252 | # Security — deeper |
| 1253 | # --------------------------------------------------------------------------- |
| 1254 | |
| 1255 | |
class TestSecurityDeep:
    """Additional security scenarios beyond the basic TestSecurity class."""

    def test_validation_happens_before_filesystem_is_touched(
        self, tmp_path: pathlib.Path
    ) -> None:
        """If the branch name is invalid, no .muse/ directory must be created."""
        result = _init(tmp_path, "--default-branch", "bad branch name!")
        assert result.exit_code != 0
        assert not muse_dir(tmp_path).exists()

    def test_domain_validation_before_filesystem_touched(
        self, tmp_path: pathlib.Path
    ) -> None:
        """If the domain is invalid, no .muse/ directory must be created."""
        result = _init(tmp_path, "--domain", "BAD_DOMAIN!")
        assert result.exit_code != 0
        assert not muse_dir(tmp_path).exists()

    def test_template_json_error_path_emits_clean_json(
        self, tmp_path: pathlib.Path
    ) -> None:
        """--template with symlink path + --json must emit valid JSON error."""
        real = tmp_path / "real"
        real.mkdir()
        link = tmp_path / "link"
        link.symlink_to(real)
        result = _init(tmp_path / "repo", "--template", str(link), "--json")
        # json.loads itself fails the test if the output is not clean JSON.
        data = json.loads(result.output)
        assert "error" in data

    def test_template_symlink_inside_subdir_not_followed(
        self, tmp_path: pathlib.Path
    ) -> None:
        """A real file nested in a template subdirectory is copied.

        Despite its name, this test does not create a symlink: it only
        checks that ``sub/real.txt`` from the template ends up in the repo.

        TODO(review): add a nested symlink plus an assertion once the
        intended behaviour for symlinks below the template's top level is
        confirmed.
        """
        tmpl = tmp_path / "tmpl"
        (tmpl / "sub").mkdir(parents=True)
        # A real file inside a subdirectory
        (tmpl / "sub" / "real.txt").write_text("ok")
        repo = tmp_path / "repo"
        repo.mkdir()
        result = _init(repo, "--template", str(tmpl))
        assert result.exit_code == 0
        # The subdirectory with the real file is copied
        assert (repo / "sub" / "real.txt").exists()

    def test_all_error_paths_return_nonzero(self, tmp_path: pathlib.Path) -> None:
        """Every known error path must return a non-zero exit code."""
        cases = [
            ["--default-branch", "bad branch"],       # bad branch
            ["--domain", "Bad!"],                     # bad domain
            ["--template", str(tmp_path / "nope")],   # missing template
        ]
        for index, args in enumerate(cases):
            # One directory per case: anything a failed init leaves behind
            # must not be able to make a later case fail for another reason.
            result = _init(tmp_path / f"fresh_{index}", *args)
            assert result.exit_code != 0, f"expected failure for args {args}"

    def test_very_long_branch_name_rejected(self, tmp_path: pathlib.Path) -> None:
        """Branch names longer than the allowed max must be rejected."""
        long_name = "a" * 300
        result = _init(tmp_path, "--default-branch", long_name)
        assert result.exit_code != 0

    def test_dot_only_branch_name_rejected(self, tmp_path: pathlib.Path) -> None:
        """A branch named ``.`` is rejected."""
        result = _init(tmp_path, "--default-branch", ".")
        assert result.exit_code != 0

    def test_dotdot_branch_name_rejected(self, tmp_path: pathlib.Path) -> None:
        """A branch named ``..`` is rejected."""
        result = _init(tmp_path, "--default-branch", "..")
        assert result.exit_code != 0

    def test_reinit_twice_without_force_second_fails(self, tmp_path: pathlib.Path) -> None:
        """Two consecutive inits without --force: second must always fail."""
        assert _init(tmp_path).exit_code == 0
        assert _init(tmp_path).exit_code != 0
        assert _init(tmp_path).exit_code != 0
| 1335 | |
| 1336 | |
| 1337 | # --------------------------------------------------------------------------- |
| 1338 | # Stress — deep and concurrent |
| 1339 | # --------------------------------------------------------------------------- |
| 1340 | |
| 1341 | |
class TestStressDeep:
    """Stress scenarios: many repos, many threads, big and hostile templates."""

    @pytest.mark.slow
    def test_concurrent_inits_to_different_dirs(self, tmp_path: pathlib.Path) -> None:
        """50 threads each init their own directory; all succeed with unique IDs.

        Each thread launches ``muse init --json`` through subprocess.run
        rather than the _init helper, because os.chdir() is process-global
        and not thread-safe. The child gets its working directory from the
        ``cwd`` argument instead.
        """
        import subprocess

        successes: list[tuple[int, str]] = []
        failures: list[str] = []
        guard = threading.Lock()

        def worker(index: int) -> None:
            target = tmp_path / f"concurrent_{index:03d}"
            target.mkdir()
            proc = subprocess.run(
                ["muse", "init", "--json"],
                capture_output=True, text=True, cwd=str(target),
            )
            # Both lists are shared between threads; update them under the lock.
            with guard:
                if proc.returncode == 0:
                    try:
                        payload = json.loads(proc.stdout)
                        successes.append((index, payload["repo_id"]))
                    except Exception as exc:
                        failures.append(f"repo_{index}: parse error {exc}")
                else:
                    failures.append(
                        f"repo_{index}: exit={proc.returncode} out={proc.stdout[:100]}"
                    )

        pool = [threading.Thread(target=worker, args=(index,)) for index in range(50)]
        for thread in pool:
            thread.start()
        for thread in pool:
            thread.join()

        assert not failures, f"init errors: {failures}"
        assert len(successes) == 50

        # All repo_ids must be unique
        repo_ids = [repo_id for _, repo_id in successes]
        assert len(set(repo_ids)) == len(repo_ids), "repo_id collision among concurrent inits"

    @pytest.mark.slow
    def test_large_deeply_nested_template(self, tmp_path: pathlib.Path) -> None:
        """Template with 10 directories × 50 files each (500 total) copies cleanly."""
        template_root = tmp_path / "big"
        for dir_no in range(10):
            bucket = template_root / f"dir_{dir_no:02d}"
            bucket.mkdir(parents=True)
            for file_no in range(50):
                (bucket / f"file_{file_no:03d}.txt").write_text(f"d={dir_no} f={file_no}")

        target = tmp_path / "repo"
        target.mkdir()
        outcome = _init(target, "--template", str(template_root))
        assert outcome.exit_code == 0
        # Spot-check the first and last file of every directory.
        for dir_no in range(10):
            copied_dir = target / f"dir_{dir_no:02d}"
            assert (copied_dir / "file_000.txt").exists()
            assert (copied_dir / "file_049.txt").exists()

    @pytest.mark.slow
    def test_force_reinit_100_times_preserves_repo_id(
        self, tmp_path: pathlib.Path
    ) -> None:
        """100 consecutive --force inits must all return the identical repo_id."""
        original_id = json.loads(_init(tmp_path, "--json").output)["repo_id"]
        for attempt in range(100):
            outcome = _init(tmp_path, "--force", "--json")
            assert outcome.exit_code == 0, f"failed on iteration {attempt}"
            current_id = json.loads(outcome.output)["repo_id"]
            assert current_id == original_id, f"repo_id changed on iteration {attempt}"

    @pytest.mark.slow
    def test_mixed_template_stress(self, tmp_path: pathlib.Path) -> None:
        """Template mixing real files, symlinks, a .muse dir and a subdirectory.

        Real files and the subdirectory must be copied; symlinks must not
        appear in the repo, and the template's repo.json must not become
        the repo's own."""
        template_root = tmp_path / "mixed"
        template_root.mkdir()
        # Legitimate files
        for index in range(100):
            (template_root / f"real_{index:03d}.txt").write_text(f"content {index}")
        # Symlinks — should be skipped
        for index in range(20):
            (template_root / f"malicious_{index:02d}").symlink_to("/etc/passwd")
        # .muse dir — should be skipped
        muse_dir(template_root).mkdir()
        repo_json_path(template_root).write_text('{"repo_id": "malicious"}')
        # Legitimate subdirectory
        nested_dir = template_root / "legit_sub"
        nested_dir.mkdir()
        (nested_dir / "nested.txt").write_text("nested content")

        target = tmp_path / "repo"
        target.mkdir()
        outcome = _init(target, "--template", str(template_root))
        assert outcome.exit_code == 0

        # Real files copied
        assert (target / "real_000.txt").read_text() == "content 0"
        assert (target / "real_099.txt").read_text() == "content 99"
        assert (target / "legit_sub" / "nested.txt").read_text() == "nested content"

        # Symlinks not copied
        for index in range(20):
            assert not (target / f"malicious_{index:02d}").exists()

        # .muse not overwritten
        repo_meta = json.loads(repo_json_path(target).read_text())
        assert repo_meta.get("repo_id") != "malicious"

    def test_all_required_subdirs_created_consistently(
        self, tmp_path: pathlib.Path
    ) -> None:
        """Every ``_INIT_SUBDIRS`` entry exists in each of 20 freshly initialised repos."""
        from muse.cli.commands.init import _INIT_SUBDIRS

        for index in range(20):
            target = tmp_path / f"repo_{index:02d}"
            target.mkdir()
            _init(target)
            dot_muse = muse_dir(target)
            for subdir in _INIT_SUBDIRS:
                assert (dot_muse / subdir).is_dir(), (
                    f"repo_{index}: .muse/{subdir} missing"
                )
| 1473 | |
| 1474 | |
| 1475 | # --------------------------------------------------------------------------- |
| 1476 | # Directory argument — muse init <dir> (ergonomics parity with git init <dir>) |
| 1477 | # --------------------------------------------------------------------------- |
| 1478 | |
| 1479 | |
def _init_from(cwd: pathlib.Path, *args: str) -> "InvokeResult":
    """Run ``muse init`` from *cwd*, forwarding *args* untouched.

    Only *cwd* itself is created up front. A target directory named in *args*
    is deliberately left alone, because creating it is the job of the
    directory-argument feature under test. The process working directory is
    restored before returning, whether or not the invocation raises.
    """
    from muse.cli.app import main as cli

    cwd.mkdir(parents=True, exist_ok=True)
    argv = ["init", *args]
    previous_dir = os.getcwd()
    os.chdir(cwd)
    try:
        return runner.invoke(cli, argv)
    finally:
        os.chdir(previous_dir)
| 1495 | |
| 1496 | |
class TestDirectoryArgument:
    """muse init <dir> must create the directory (if needed) and init inside it.

    Every test runs the command through ``_init_from`` with the working
    directory set to ``tmp_path`` and the target passed as a positional
    argument, then inspects the target's ``muse_dir`` or the command output.
    """

    def test_absolute_path_creates_dir_and_inits(self, tmp_path: pathlib.Path) -> None:
        """An absolute path to a missing directory is created and initialised."""
        target = tmp_path / "new_repo"
        # target does not exist yet — the command must create it
        result = _init_from(tmp_path, str(target))
        assert result.exit_code == 0, result.output
        assert muse_dir(target).is_dir()

    def test_relative_path_creates_dir_and_inits(self, tmp_path: pathlib.Path) -> None:
        """A relative path is resolved against the working directory."""
        result = _init_from(tmp_path, "sub_repo")
        assert result.exit_code == 0, result.output
        assert muse_dir(tmp_path / "sub_repo").is_dir()

    def test_dot_is_equivalent_to_no_arg(self, tmp_path: pathlib.Path) -> None:
        """muse init . should initialise in CWD, same as muse init."""
        result = _init_from(tmp_path, ".")
        assert result.exit_code == 0, result.output
        assert muse_dir(tmp_path).is_dir()

    def test_existing_empty_dir_is_used(self, tmp_path: pathlib.Path) -> None:
        """A pre-existing empty target directory is initialised in place."""
        target = tmp_path / "existing"
        target.mkdir()
        result = _init_from(tmp_path, str(target))
        assert result.exit_code == 0, result.output
        assert muse_dir(target).is_dir()

    def test_cwd_is_not_initialised_when_dir_arg_given(self, tmp_path: pathlib.Path) -> None:
        """When a directory argument is provided, CWD must NOT get a .muse/."""
        target = tmp_path / "target"
        result = _init_from(tmp_path, str(target))
        assert result.exit_code == 0, result.output
        assert not muse_dir(tmp_path).exists()

    def test_deeply_nested_nonexistent_path_created(self, tmp_path: pathlib.Path) -> None:
        """Missing intermediate directories of the target are created too."""
        target = tmp_path / "a" / "b" / "c"
        result = _init_from(tmp_path, str(target))
        assert result.exit_code == 0, result.output
        assert muse_dir(target).is_dir()

    def test_json_path_reflects_target_dir(self, tmp_path: pathlib.Path) -> None:
        """The JSON ``path`` field points at ``.muse`` inside the target."""
        target = tmp_path / "my_repo"
        result = _init_from(tmp_path, str(target), "--json")
        assert result.exit_code == 0, result.output
        data = json.loads(result.output)
        assert data["path"].endswith(".muse")
        # Compare resolved paths so symlinked temp directories do not matter.
        assert pathlib.Path(data["path"]).parent.resolve() == target.resolve()

    def test_dir_arg_with_json_output(self, tmp_path: pathlib.Path) -> None:
        """``--json`` with a directory argument reports status and repo_id."""
        target = tmp_path / "json_repo"
        result = _init_from(tmp_path, str(target), "--json")
        assert result.exit_code == 0, result.output
        data = json.loads(result.output)
        assert data["status"] == "ok"
        assert "repo_id" in data

    def test_dir_arg_with_all_flags(self, tmp_path: pathlib.Path) -> None:
        """Branch and domain flags are honoured alongside a directory argument."""
        target = tmp_path / "full_flags"
        result = _init_from(
            tmp_path, str(target),
            "--default-branch", "dev",
            "--domain", "code",
            "--json",
        )
        assert result.exit_code == 0, result.output
        data = json.loads(result.output)
        assert data["branch"] == "dev"
        assert data["domain"] == "code"

    def test_dir_arg_human_output_shows_target_muse_dir(self, tmp_path: pathlib.Path) -> None:
        """Human-readable output mentions the target's muse dir.

        Either the path as given or its resolved form is accepted.
        """
        target = tmp_path / "human_repo"
        result = _init_from(tmp_path, str(target))
        assert result.exit_code == 0, result.output
        accepted = (str(muse_dir(target)), str(muse_dir(target.resolve())))
        assert any(candidate in result.output for candidate in accepted)

    def test_force_flag_with_dir_arg(self, tmp_path: pathlib.Path) -> None:
        """``--force`` on an already-initialised target keeps its repo_id."""
        target = tmp_path / "force_repo"
        first = _init_from(tmp_path, str(target))
        # Fail here, with the command output, if the baseline init is broken.
        assert first.exit_code == 0, first.output
        first_id = json.loads(repo_json_path(target).read_text())["repo_id"]

        result = _init_from(tmp_path, str(target), "--force")
        assert result.exit_code == 0, result.output
        second_id = json.loads(repo_json_path(target).read_text())["repo_id"]
        assert first_id == second_id

    def test_bare_flag_with_dir_arg(self, tmp_path: pathlib.Path) -> None:
        """``--bare`` initialises the target without writing ``.museignore``."""
        target = tmp_path / "bare_repo"
        result = _init_from(tmp_path, str(target), "--bare")
        assert result.exit_code == 0, result.output
        assert muse_dir(target).is_dir()
        assert not (target / ".museignore").exists()

    def test_status_works_after_dir_arg_init(self, tmp_path: pathlib.Path) -> None:
        """Full round-trip: init with dir arg, then muse status inside it."""
        import subprocess

        target = tmp_path / "roundtrip"
        result = _init_from(tmp_path, str(target))
        assert result.exit_code == 0, result.output
        # Runs the ``muse`` executable found on PATH, with the target as CWD.
        r = subprocess.run(
            ["muse", "status", "--json"],
            capture_output=True, text=True, cwd=str(target),
        )
        assert r.returncode == 0, r.stderr
        data = json.loads(r.stdout)
        assert data["branch"] == "main"
| 1608 | |
| 1609 | |
| 1610 | # --------------------------------------------------------------------------- |
| 1611 | # Default hub remotes |
| 1612 | # --------------------------------------------------------------------------- |
| 1613 | |
# Hub base URLs the tests below expect ``muse init`` to write into config.toml:
# ``_HUB_LOCAL`` as ``hub.url`` and as the prefix of the ``local`` remote,
# the other two as prefixes of the ``staging`` and ``production`` remotes.
_HUB_LOCAL = "https://localhost:1337"
_HUB_STAGING = "https://staging.musehub.ai"
_HUB_PRODUCTION = "https://musehub.ai"
| 1617 | |
| 1618 | |
class TestDefaultHubRemotes:
    """muse init pre-wires local/staging/production remotes when a handle is
    available in ~/.muse/identity.toml, and sets hub.url to localhost.

    The tests never read a real identity: each one patches
    ``muse.cli.commands.init.resolve_default_handle`` to return the handle
    (or ``None``) it needs, then inspects the generated ``config.toml`` or the
    ``--json`` output.
    """

    def _read_config(self, repo: pathlib.Path) -> Mapping[str, object]:
        """Parse and return the ``config.toml`` of *repo*."""
        with config_toml_path(repo).open("rb") as fh:
            return tomllib.load(fh)

    @staticmethod
    def _init_patched(
        repo: pathlib.Path, handle: str | None, *extra_args: str
    ) -> "InvokeResult":
        """Run ``muse init`` in *repo* with the default handle forced to *handle*.

        *extra_args* are forwarded to the command line. Pass ``None`` as
        *handle* to simulate a machine with no identity configured.
        """
        from unittest.mock import patch

        with patch(
            "muse.cli.commands.init.resolve_default_handle",
            return_value=handle,
        ):
            return _init(repo, *extra_args)

    def _init_with_handle(
        self,
        tmp_path: pathlib.Path,
        handle: str,
        repo_name: str = "myrepo",
    ) -> pathlib.Path:
        """Init a repo whose directory is named *repo_name*, with *handle* in identity.

        Returns the repo directory. Fails immediately, showing the command
        output, if the init itself does not exit 0.
        """
        repo = tmp_path / repo_name
        repo.mkdir()
        result = self._init_patched(repo, handle)
        assert result.exit_code == 0, result.output
        return repo

    # hub.url defaults ---------------------------------------------------

    def test_hub_url_defaults_to_localhost(self, tmp_path: pathlib.Path) -> None:
        """hub.url must be set to localhost:1337 by default."""
        repo = self._init_with_handle(tmp_path, "gabriel")
        cfg = self._read_config(repo)
        assert cfg["hub"]["url"] == _HUB_LOCAL

    def test_hub_url_default_is_not_commented_out(self, tmp_path: pathlib.Path) -> None:
        """hub.url must be an active key, not a comment."""
        repo = self._init_with_handle(tmp_path, "gabriel")
        raw = config_toml_path(repo).read_text(encoding="utf-8")
        assert "# url" not in raw

    # local remote -------------------------------------------------------

    def test_local_remote_url_contains_handle_and_slug(self, tmp_path: pathlib.Path) -> None:
        """'local' remote URL must embed the handle and directory name."""
        repo = self._init_with_handle(tmp_path, "gabriel", repo_name="myrepo")
        cfg = self._read_config(repo)
        assert cfg["remotes"]["local"]["url"] == f"{_HUB_LOCAL}/gabriel/myrepo"

    def test_local_remote_uses_localhost_base(self, tmp_path: pathlib.Path) -> None:
        """'local' remote URL must start with the localhost hub base."""
        repo = self._init_with_handle(tmp_path, "gabriel")
        cfg = self._read_config(repo)
        assert cfg["remotes"]["local"]["url"].startswith(_HUB_LOCAL)

    # staging remote -----------------------------------------------------

    def test_staging_remote_url_contains_handle_and_slug(self, tmp_path: pathlib.Path) -> None:
        """'staging' remote URL must embed the handle and directory name."""
        repo = self._init_with_handle(tmp_path, "gabriel", repo_name="myrepo")
        cfg = self._read_config(repo)
        assert cfg["remotes"]["staging"]["url"] == f"{_HUB_STAGING}/gabriel/myrepo"

    def test_staging_remote_uses_staging_base(self, tmp_path: pathlib.Path) -> None:
        """'staging' remote URL must start with the staging hub base."""
        repo = self._init_with_handle(tmp_path, "gabriel")
        cfg = self._read_config(repo)
        assert cfg["remotes"]["staging"]["url"].startswith(_HUB_STAGING)

    # production remote --------------------------------------------------

    def test_production_remote_url_contains_handle_and_slug(self, tmp_path: pathlib.Path) -> None:
        """'production' remote URL must embed the handle and directory name."""
        repo = self._init_with_handle(tmp_path, "gabriel", repo_name="myrepo")
        cfg = self._read_config(repo)
        assert cfg["remotes"]["production"]["url"] == f"{_HUB_PRODUCTION}/gabriel/myrepo"

    def test_production_remote_uses_production_base(self, tmp_path: pathlib.Path) -> None:
        """'production' remote URL must start with the production hub base."""
        repo = self._init_with_handle(tmp_path, "gabriel")
        cfg = self._read_config(repo)
        assert cfg["remotes"]["production"]["url"].startswith(_HUB_PRODUCTION)

    # slug comes from directory name ------------------------------------

    def test_slug_uses_directory_name(self, tmp_path: pathlib.Path) -> None:
        """The repo slug in remote URLs must be the directory name, not a hash."""
        repo = self._init_with_handle(tmp_path, "gabriel", repo_name="cool-project")
        remotes = self._read_config(repo)["remotes"]
        for name in ("local", "staging", "production"):
            assert remotes[name]["url"].endswith("/gabriel/cool-project"), name

    def test_different_handles_produce_different_urls(self, tmp_path: pathlib.Path) -> None:
        """Each handle shows up in its own repo's URL, and the URLs differ."""
        urls: dict[str, str] = {}
        for handle in ("alice", "bob"):
            repo = tmp_path / handle / "repo"
            repo.mkdir(parents=True)
            result = self._init_patched(repo, handle)
            assert result.exit_code == 0, result.output
            url = self._read_config(repo)["remotes"]["local"]["url"]
            assert f"/{handle}/" in url
            urls[handle] = url
        # Same slug in both repos, so only the handle can make them differ.
        assert urls["alice"] != urls["bob"]

    # no handle — graceful degradation ----------------------------------

    def test_no_handle_hub_url_still_set_to_localhost(self, tmp_path: pathlib.Path) -> None:
        """Without a handle, hub.url is still wired to localhost."""
        repo = tmp_path / "noidrepo"
        repo.mkdir()
        result = self._init_patched(repo, None)
        assert result.exit_code == 0, result.output
        cfg = self._read_config(repo)
        assert cfg["hub"]["url"] == _HUB_LOCAL

    def test_no_handle_no_remotes_written(self, tmp_path: pathlib.Path) -> None:
        """Without a handle we cannot construct remote URLs — remotes stay empty."""
        repo = tmp_path / "noidrepo"
        repo.mkdir()
        result = self._init_patched(repo, None)
        assert result.exit_code == 0, result.output
        cfg = self._read_config(repo)
        # Accepts both a missing ``remotes`` table and an empty one.
        assert cfg.get("remotes", {}) == {}

    # JSON output reflects remotes -------------------------------------

    def test_json_output_includes_remotes_key(self, tmp_path: pathlib.Path) -> None:
        """``--json`` output carries a ``remotes`` key when a handle is known."""
        repo = tmp_path / "jrepo"
        repo.mkdir()
        result = self._init_patched(repo, "gabriel", "--json")
        assert result.exit_code == 0, result.output
        data = json.loads(result.output)
        assert "remotes" in data

    def test_json_remotes_lists_all_three(self, tmp_path: pathlib.Path) -> None:
        """``--json`` ``remotes`` names exactly local, staging and production."""
        repo = tmp_path / "jrepo"
        repo.mkdir()
        result = self._init_patched(repo, "gabriel", "--json")
        assert result.exit_code == 0, result.output
        data = json.loads(result.output)
        assert set(data["remotes"]) == {"local", "staging", "production"}

    # reinit preserves existing remotes --------------------------------

    def test_reinit_with_force_preserves_existing_config(self, tmp_path: pathlib.Path) -> None:
        """--force reinit must not overwrite an existing config.toml.

        Checked through the ``local`` remote URL, which must be unchanged
        after the forced re-init.
        """
        repo = self._init_with_handle(tmp_path, "gabriel")
        original_url = self._read_config(repo)["remotes"]["local"]["url"]
        result = self._init_patched(repo, "gabriel", "--force")
        assert result.exit_code == 0, result.output
        assert self._read_config(repo)["remotes"]["local"]["url"] == original_url
File History
4 commits
sha256:81ae324db5ad375fbfe4834c6fcb378312cafad3cc92dec5d3e5c427306621a2
fix: remove commit_exists filter from have anchors — server…
Sonnet 4.6
patch
22 days ago
sha256:36c3cb3e76619d4c30a6d9bf81b5ec4ff148e30dcfed913e3114ca7b43b81c7e
fix: rename objects→blobs in push client and all stale test…
Sonnet 4.6
patch
23 days ago
sha256:c06a9b9b9fee26c68ea725b44d54b2c0a171301ce9de746d5b656617b4463a9a
fix: repair four test failures from post-migration audit
Sonnet 4.6
patch
30 days ago
sha256:1900655993c83c4107067375548a7be823e471d2515830842f1a12cba4bd3cdf
fix: unified object store migration — idempotent writes, JS…
Sonnet 4.6
minor
⚠
30 days ago