test_status_supercharge.py
python
sha256:1d3f5470f45db58e32047678debc9438fdded1b2c7332cc743d2b8be32fdafc8
fixing more broken tests
Human
patch
4 days ago
| 1 | """SUPERCHARGE tests for ``muse status``. |
| 2 | |
| 3 | Gaps addressed beyond the existing test_cmd_status.py + test_status_json_schema.py: |
| 4 | |
| 5 | Unit |
| 6 | U1 duration_ms present and non-negative in JSON (both code and non-code paths) |
| 7 | U2 exit_code present and correct in JSON |
| 8 | U3 sparse_checkout key present in JSON — null when disabled, dict when active |
| 9 | U4 sparse_checkout.mode / patterns / enabled match live config |
| 10 | U5 _compute_upstream_info: no-local-head edge case |
| 11 | |
| 12 | Integration |
| 13 | I1 Code-domain upstream (ahead/behind) appears in JSON — was hardcoded None (bug) |
| 14 | I2 --branch-only always emits merge_in_progress, merge_from, conflict_count |
| 15 | I3 --branch-only exits 0 even with --exit-code flag |
| 16 | I4 checkout_interrupted=True when CHECKOUT_HEAD file exists |
| 17 | I5 checkout_target matches CHECKOUT_HEAD content |
| 18 | I6 checkout_interrupted=False when CHECKOUT_HEAD absent |
| 19 | I7 --short + --json produces valid JSON with duration_ms |
| 20 | I8 duration_ms > 0 (real timing, not placeholder) |
| 21 | |
| 22 | Security |
| 23 | S1 merge_from with ANSI in JSON value is safe (no raw escape bytes) |
| 24 | S2 checkout_target with ANSI in JSON value is safe |
| 25 | S3 branch name with ANSI in JSON value is safe |
| 26 | S4 JSON output has no raw \x1b bytes regardless of state |
| 27 | |
| 28 | Data integrity |
| 29 | D1 duration_ms is a float (not int, not string) |
| 30 | D2 exit_code is an int (not bool, not string) |
| 31 | D3 sparse_checkout.patterns is always a list in JSON |
| 32 | D4 total_changes accounts for renamed when present (non-code domain) |
| 33 | D5 sparse_checkout survives disable — re-query after disable returns null |
| 34 | |
| 35 | Performance / stress |
| 36 | P1 5 000-file repo status completes, duration_ms < 10 000 |
| 37 | P2 10 rapid sequential status calls — duration_ms present in every response |
| 38 | P3 duration_ms is consistent (two clean-tree calls within 2x of each other) |
| 39 | |
| 40 | Concurrent |
| 41 | C1 4 concurrent status calls on separate repos all succeed |
| 42 | """ |
| 43 | |
| 44 | from __future__ import annotations |
| 45 | from collections.abc import Mapping |
| 46 | |
| 47 | import json |
| 48 | import os |
| 49 | import pathlib |
| 50 | import threading |
| 51 | import time |
| 52 | |
| 53 | _CHDIR_LOCK = threading.Lock() |
| 54 | |
| 55 | import pytest |
| 56 | |
| 57 | from tests.cli_test_helper import CliRunner |
| 58 | from muse.core.types import long_id |
| 59 | from muse.core.paths import head_path, muse_dir |
| 60 | |
| 61 | runner = CliRunner() |
| 62 | |
| 63 | |
| 64 | # --------------------------------------------------------------------------- |
| 65 | # Helpers |
| 66 | # --------------------------------------------------------------------------- |
| 67 | |
| 68 | |
| 69 | def _env(root: pathlib.Path) -> Mapping[str, str]: |
| 70 | return {"MUSE_REPO_ROOT": str(root)} |
| 71 | |
| 72 | |
| 73 | def _invoke(root: pathlib.Path, *args: str) -> Mapping[str, object]: |
| 74 | result = runner.invoke(None, list(args), env=_env(root)) |
| 75 | return result |
| 76 | |
| 77 | |
| 78 | def _status(root: pathlib.Path, *extra: str) -> Mapping[str, object]: |
| 79 | result = runner.invoke(None, ["status", "--json", *extra], env=_env(root)) |
| 80 | assert result.exit_code == 0, f"status failed: {result.stderr}\n{result.stdout}" |
| 81 | return json.loads(result.stdout) |
| 82 | |
| 83 | |
| 84 | def _init_repo(tmp: pathlib.Path, *, domain: str = "code") -> pathlib.Path: |
| 85 | tmp.mkdir(parents=True, exist_ok=True) |
| 86 | with _CHDIR_LOCK: |
| 87 | saved = os.getcwd() |
| 88 | try: |
| 89 | os.chdir(tmp) |
| 90 | result = runner.invoke(None, ["init", "--domain", domain], env=_env(tmp)) |
| 91 | finally: |
| 92 | os.chdir(saved) |
| 93 | assert result.exit_code == 0, f"init failed: {result.stderr}" |
| 94 | return tmp |
| 95 | |
| 96 | |
| 97 | def _commit(root: pathlib.Path, msg: str = "commit") -> None: |
| 98 | r = runner.invoke(None, ["commit", "-m", msg], env=_env(root)) |
| 99 | assert r.exit_code == 0, f"commit failed: {r.stderr}" |
| 100 | |
| 101 | |
| 102 | def _fresh_code_repo(tmp: pathlib.Path) -> pathlib.Path: |
| 103 | _init_repo(tmp, domain="code") |
| 104 | (tmp / "main.py").write_text("x = 1\n") |
| 105 | runner.invoke(None, ["code", "add", "main.py"], env=_env(tmp)) |
| 106 | _commit(tmp, "initial") |
| 107 | return tmp |
| 108 | |
| 109 | |
| 110 | def _set_sparse(root: pathlib.Path, *patterns: str) -> None: |
| 111 | runner.invoke(None, ["sparse-checkout", "init"], env=_env(root)) |
| 112 | runner.invoke(None, ["sparse-checkout", "set", *patterns], env=_env(root)) |
| 113 | |
| 114 | |
| 115 | def _disable_sparse(root: pathlib.Path) -> None: |
| 116 | runner.invoke(None, ["sparse-checkout", "disable"], env=_env(root)) |
| 117 | |
| 118 | |
| 119 | # --------------------------------------------------------------------------- |
| 120 | # U1–U2 duration_ms and exit_code in JSON |
| 121 | # --------------------------------------------------------------------------- |
| 122 | |
| 123 | |
| 124 | class TestElapsedAndExitCode: |
| 125 | def test_U1_duration_ms_present_code_domain(self, tmp_path: pathlib.Path) -> None: |
| 126 | root = _fresh_code_repo(tmp_path) |
| 127 | data = _status(root) |
| 128 | assert "duration_ms" in data, "duration_ms missing from status JSON" |
| 129 | |
| 130 | def test_U1_duration_ms_present_non_code_domain(self, tmp_path: pathlib.Path) -> None: |
| 131 | root = _init_repo(tmp_path, domain="mist") |
| 132 | data = _status(root) |
| 133 | assert "duration_ms" in data, "duration_ms missing from non-code-domain status JSON" |
| 134 | |
| 135 | def test_U1_duration_ms_non_negative(self, tmp_path: pathlib.Path) -> None: |
| 136 | root = _fresh_code_repo(tmp_path) |
| 137 | data = _status(root) |
| 138 | assert data["duration_ms"] >= 0 |
| 139 | |
| 140 | def test_U2_exit_code_present(self, tmp_path: pathlib.Path) -> None: |
| 141 | root = _fresh_code_repo(tmp_path) |
| 142 | data = _status(root) |
| 143 | assert "exit_code" in data |
| 144 | |
| 145 | def test_U2_exit_code_zero_when_clean(self, tmp_path: pathlib.Path) -> None: |
| 146 | root = _fresh_code_repo(tmp_path) |
| 147 | data = _status(root) |
| 148 | assert data["exit_code"] == 0 |
| 149 | |
| 150 | def test_U2_exit_code_zero_when_dirty(self, tmp_path: pathlib.Path) -> None: |
| 151 | """exit_code in JSON payload is always 0 — it reflects command success.""" |
| 152 | root = _fresh_code_repo(tmp_path) |
| 153 | (root / "new.py").write_text("y = 1\n") |
| 154 | data = _status(root) |
| 155 | assert data["exit_code"] == 0 |
| 156 | |
| 157 | def test_U2_exit_code_present_in_branch_only(self, tmp_path: pathlib.Path) -> None: |
| 158 | root = _fresh_code_repo(tmp_path) |
| 159 | result = runner.invoke(None, ["status", "--branch", "--json"], env=_env(root)) |
| 160 | data = json.loads(result.stdout) |
| 161 | assert "exit_code" in data |
| 162 | |
| 163 | def test_U2_duration_ms_present_in_branch_only(self, tmp_path: pathlib.Path) -> None: |
| 164 | root = _fresh_code_repo(tmp_path) |
| 165 | result = runner.invoke(None, ["status", "--branch", "--json"], env=_env(root)) |
| 166 | data = json.loads(result.stdout) |
| 167 | assert "duration_ms" in data |
| 168 | |
| 169 | |
| 170 | # --------------------------------------------------------------------------- |
| 171 | # U3–U4 sparse_checkout field |
| 172 | # --------------------------------------------------------------------------- |
| 173 | |
| 174 | |
| 175 | class TestSparseCheckoutField: |
| 176 | def test_U3_sparse_checkout_key_present_when_disabled( |
| 177 | self, tmp_path: pathlib.Path |
| 178 | ) -> None: |
| 179 | root = _fresh_code_repo(tmp_path) |
| 180 | data = _status(root) |
| 181 | assert "sparse_checkout" in data |
| 182 | |
| 183 | def test_U3_sparse_checkout_null_when_disabled( |
| 184 | self, tmp_path: pathlib.Path |
| 185 | ) -> None: |
| 186 | root = _fresh_code_repo(tmp_path) |
| 187 | data = _status(root) |
| 188 | assert data["sparse_checkout"] is None |
| 189 | |
| 190 | def test_U3_sparse_checkout_dict_when_active( |
| 191 | self, tmp_path: pathlib.Path |
| 192 | ) -> None: |
| 193 | root = _fresh_code_repo(tmp_path) |
| 194 | _set_sparse(root, "muse/") |
| 195 | data = _status(root) |
| 196 | assert isinstance(data["sparse_checkout"], dict) |
| 197 | |
| 198 | def test_U4_sparse_checkout_enabled_field( |
| 199 | self, tmp_path: pathlib.Path |
| 200 | ) -> None: |
| 201 | root = _fresh_code_repo(tmp_path) |
| 202 | _set_sparse(root, "src/") |
| 203 | sc = _status(root)["sparse_checkout"] |
| 204 | assert sc["enabled"] is True |
| 205 | |
| 206 | def test_U4_sparse_checkout_mode_cone( |
| 207 | self, tmp_path: pathlib.Path |
| 208 | ) -> None: |
| 209 | root = _fresh_code_repo(tmp_path) |
| 210 | _set_sparse(root, "src/") |
| 211 | sc = _status(root)["sparse_checkout"] |
| 212 | assert sc["mode"] == "cone" |
| 213 | |
| 214 | def test_U4_sparse_checkout_mode_pattern( |
| 215 | self, tmp_path: pathlib.Path |
| 216 | ) -> None: |
| 217 | root = _fresh_code_repo(tmp_path) |
| 218 | runner.invoke(None, ["sparse-checkout", "init", "--no-cone"], env=_env(root)) |
| 219 | runner.invoke(None, ["sparse-checkout", "set", "**/*.py"], env=_env(root)) |
| 220 | sc = _status(root)["sparse_checkout"] |
| 221 | assert sc["mode"] == "pattern" |
| 222 | |
| 223 | def test_U4_sparse_checkout_patterns_match_config( |
| 224 | self, tmp_path: pathlib.Path |
| 225 | ) -> None: |
| 226 | root = _fresh_code_repo(tmp_path) |
| 227 | _set_sparse(root, "src/", "tests/") |
| 228 | sc = _status(root)["sparse_checkout"] |
| 229 | assert sc["patterns"] == ["src/", "tests/"] |
| 230 | |
| 231 | def test_D3_sparse_checkout_patterns_always_list( |
| 232 | self, tmp_path: pathlib.Path |
| 233 | ) -> None: |
| 234 | root = _fresh_code_repo(tmp_path) |
| 235 | _set_sparse(root, "src/") |
| 236 | sc = _status(root)["sparse_checkout"] |
| 237 | assert isinstance(sc["patterns"], list) |
| 238 | |
| 239 | def test_D5_sparse_checkout_null_after_disable( |
| 240 | self, tmp_path: pathlib.Path |
| 241 | ) -> None: |
| 242 | root = _fresh_code_repo(tmp_path) |
| 243 | _set_sparse(root, "src/") |
| 244 | assert _status(root)["sparse_checkout"] is not None |
| 245 | _disable_sparse(root) |
| 246 | assert _status(root)["sparse_checkout"] is None |
| 247 | |
| 248 | |
| 249 | # --------------------------------------------------------------------------- |
| 250 | # I1 Code-domain upstream bug — ahead/behind was hardcoded None |
| 251 | # --------------------------------------------------------------------------- |
| 252 | |
| 253 | |
| 254 | class TestCodeDomainUpstream: |
| 255 | def test_I1_code_domain_includes_upstream_key( |
| 256 | self, tmp_path: pathlib.Path |
| 257 | ) -> None: |
| 258 | """Code-domain status --json must include upstream, ahead, behind.""" |
| 259 | root = _fresh_code_repo(tmp_path) |
| 260 | data = _status(root) |
| 261 | assert "upstream" in data |
| 262 | assert "ahead" in data |
| 263 | assert "behind" in data |
| 264 | |
| 265 | def test_I1_code_domain_upstream_null_when_no_remote( |
| 266 | self, tmp_path: pathlib.Path |
| 267 | ) -> None: |
| 268 | """Without a configured upstream, these fields are null (not missing).""" |
| 269 | root = _fresh_code_repo(tmp_path) |
| 270 | data = _status(root) |
| 271 | assert data["upstream"] is None |
| 272 | assert data["ahead"] is None |
| 273 | assert data["behind"] is None |
| 274 | |
| 275 | |
| 276 | # --------------------------------------------------------------------------- |
| 277 | # I2–I3 --branch-only schema stability |
| 278 | # --------------------------------------------------------------------------- |
| 279 | |
| 280 | |
| 281 | class TestBranchOnlySchema: |
| 282 | def test_I2_merge_in_progress_always_present( |
| 283 | self, tmp_path: pathlib.Path |
| 284 | ) -> None: |
| 285 | """--branch --json must always emit merge_in_progress.""" |
| 286 | root = _fresh_code_repo(tmp_path) |
| 287 | result = runner.invoke(None, ["status", "--branch", "--json"], env=_env(root)) |
| 288 | data = json.loads(result.stdout) |
| 289 | assert "merge_in_progress" in data |
| 290 | |
| 291 | def test_I2_merge_from_always_present( |
| 292 | self, tmp_path: pathlib.Path |
| 293 | ) -> None: |
| 294 | root = _fresh_code_repo(tmp_path) |
| 295 | result = runner.invoke(None, ["status", "--branch", "--json"], env=_env(root)) |
| 296 | data = json.loads(result.stdout) |
| 297 | assert "merge_from" in data |
| 298 | |
| 299 | def test_I2_conflict_count_always_present( |
| 300 | self, tmp_path: pathlib.Path |
| 301 | ) -> None: |
| 302 | root = _fresh_code_repo(tmp_path) |
| 303 | result = runner.invoke(None, ["status", "--branch", "--json"], env=_env(root)) |
| 304 | data = json.loads(result.stdout) |
| 305 | assert "conflict_count" in data |
| 306 | |
| 307 | def test_I2_no_merge_values_are_defaults( |
| 308 | self, tmp_path: pathlib.Path |
| 309 | ) -> None: |
| 310 | root = _fresh_code_repo(tmp_path) |
| 311 | result = runner.invoke(None, ["status", "--branch", "--json"], env=_env(root)) |
| 312 | data = json.loads(result.stdout) |
| 313 | assert data["merge_in_progress"] is False |
| 314 | assert data["merge_from"] is None |
| 315 | assert data["conflict_count"] == 0 |
| 316 | |
| 317 | def test_I3_branch_only_exit_code_flag_exits_zero_when_dirty( |
| 318 | self, tmp_path: pathlib.Path |
| 319 | ) -> None: |
| 320 | """--branch --exit-code must exit 0 even when working tree is dirty.""" |
| 321 | root = _fresh_code_repo(tmp_path) |
| 322 | (root / "dirty.py").write_text("z = 1\n") |
| 323 | result = runner.invoke( |
| 324 | None, ["status", "--branch", "--exit-code", "--json"], env=_env(root) |
| 325 | ) |
| 326 | assert result.exit_code == 0 |
| 327 | |
| 328 | |
| 329 | # --------------------------------------------------------------------------- |
| 330 | # I4–I6 checkout_interrupted |
| 331 | # --------------------------------------------------------------------------- |
| 332 | |
| 333 | |
| 334 | class TestCheckoutInterrupted: |
| 335 | def test_I4_checkout_interrupted_true_when_file_exists( |
| 336 | self, tmp_path: pathlib.Path |
| 337 | ) -> None: |
| 338 | root = _fresh_code_repo(tmp_path) |
| 339 | # Simulate an interrupted checkout by writing CHECKOUT_HEAD |
| 340 | (muse_dir(root) / "CHECKOUT_HEAD").write_text("feat/x", encoding="utf-8") |
| 341 | data = _status(root) |
| 342 | assert data["checkout_interrupted"] is True |
| 343 | |
| 344 | def test_I5_checkout_target_matches_file_content( |
| 345 | self, tmp_path: pathlib.Path |
| 346 | ) -> None: |
| 347 | root = _fresh_code_repo(tmp_path) |
| 348 | (muse_dir(root) / "CHECKOUT_HEAD").write_text("feat/my-branch", encoding="utf-8") |
| 349 | data = _status(root) |
| 350 | assert data["checkout_target"] == "feat/my-branch" |
| 351 | |
| 352 | def test_I6_checkout_interrupted_false_when_absent( |
| 353 | self, tmp_path: pathlib.Path |
| 354 | ) -> None: |
| 355 | root = _fresh_code_repo(tmp_path) |
| 356 | data = _status(root) |
| 357 | assert data["checkout_interrupted"] is False |
| 358 | assert data["checkout_target"] is None |
| 359 | |
| 360 | def test_I6_checkout_interrupted_cleared_after_file_removed( |
| 361 | self, tmp_path: pathlib.Path |
| 362 | ) -> None: |
| 363 | root = _fresh_code_repo(tmp_path) |
| 364 | f = muse_dir(root) / "CHECKOUT_HEAD" |
| 365 | f.write_text("feat/x", encoding="utf-8") |
| 366 | assert _status(root)["checkout_interrupted"] is True |
| 367 | f.unlink() |
| 368 | assert _status(root)["checkout_interrupted"] is False |
| 369 | |
| 370 | |
| 371 | # --------------------------------------------------------------------------- |
| 372 | # Security |
| 373 | # --------------------------------------------------------------------------- |
| 374 | |
| 375 | |
| 376 | class TestSecurity: |
| 377 | def test_S1_ansi_in_merge_from_not_in_json_value( |
| 378 | self, tmp_path: pathlib.Path |
| 379 | ) -> None: |
| 380 | """merge_from with ANSI bytes must not propagate raw escapes into JSON.""" |
| 381 | root = _fresh_code_repo(tmp_path) |
| 382 | # Inject ANSI directly into MERGE_STATE |
| 383 | import json as _json |
| 384 | dot_muse = muse_dir(root) |
| 385 | merge_state = { |
| 386 | "other_branch": "\x1b[31mmalicious\x1b[0m", |
| 387 | "conflict_paths": [], |
| 388 | "original_conflict_paths": [], |
| 389 | "ours_commit_id": long_id("a" * 64), |
| 390 | "theirs_commit_id": long_id("b" * 64), |
| 391 | } |
| 392 | (dot_muse / "MERGE_STATE").write_text( |
| 393 | _json.dumps(merge_state), encoding="utf-8" |
| 394 | ) |
| 395 | result = runner.invoke(None, ["status", "--json"], env=_env(root)) |
| 396 | assert "\x1b" not in result.stdout |
| 397 | |
| 398 | def test_S2_ansi_in_checkout_target_not_in_json_value( |
| 399 | self, tmp_path: pathlib.Path |
| 400 | ) -> None: |
| 401 | root = _fresh_code_repo(tmp_path) |
| 402 | (muse_dir(root) / "CHECKOUT_HEAD").write_text( |
| 403 | "\x1b[31mmalicious-branch\x1b[0m", encoding="utf-8" |
| 404 | ) |
| 405 | result = runner.invoke(None, ["status", "--json"], env=_env(root)) |
| 406 | assert "\x1b" not in result.stdout |
| 407 | |
| 408 | def test_S3_ansi_in_branch_name_not_in_json( |
| 409 | self, tmp_path: pathlib.Path |
| 410 | ) -> None: |
| 411 | root = _fresh_code_repo(tmp_path) |
| 412 | # Force HEAD to point to a branch name containing ANSI |
| 413 | (head_path(root)).write_text( |
| 414 | "ref: refs/heads/\x1b[31mmalicious\x1b[0m", encoding="utf-8" |
| 415 | ) |
| 416 | result = runner.invoke(None, ["status", "--json"], env=_env(root)) |
| 417 | assert "\x1b" not in result.stdout |
| 418 | |
| 419 | def test_S4_no_raw_ansi_in_json_output(self, tmp_path: pathlib.Path) -> None: |
| 420 | root = _fresh_code_repo(tmp_path) |
| 421 | (root / "new.py").write_text("y = 1\n") |
| 422 | result = runner.invoke(None, ["status", "--json"], env=_env(root)) |
| 423 | assert "\x1b" not in result.stdout |
| 424 | |
| 425 | |
| 426 | # --------------------------------------------------------------------------- |
| 427 | # Data integrity |
| 428 | # --------------------------------------------------------------------------- |
| 429 | |
| 430 | |
| 431 | class TestDataIntegrity: |
| 432 | def test_D1_duration_ms_is_float(self, tmp_path: pathlib.Path) -> None: |
| 433 | root = _fresh_code_repo(tmp_path) |
| 434 | data = _status(root) |
| 435 | assert isinstance(data["duration_ms"], float) |
| 436 | |
| 437 | def test_D2_exit_code_is_int(self, tmp_path: pathlib.Path) -> None: |
| 438 | root = _fresh_code_repo(tmp_path) |
| 439 | data = _status(root) |
| 440 | assert isinstance(data["exit_code"], int) |
| 441 | assert not isinstance(data["exit_code"], bool) |
| 442 | |
| 443 | def test_D4_total_changes_includes_renamed( |
| 444 | self, tmp_path: pathlib.Path |
| 445 | ) -> None: |
| 446 | """total_changes must count renamed entries (non-code domain).""" |
| 447 | root = _init_repo(tmp_path, domain="mist") |
| 448 | data = _status(root) |
| 449 | expected = ( |
| 450 | len(data["added"]) |
| 451 | + len(data["modified"]) |
| 452 | + len(data["deleted"]) |
| 453 | + len(data["renamed"]) |
| 454 | ) |
| 455 | assert data["total_changes"] == expected |
| 456 | |
| 457 | def test_all_required_keys_still_present_with_new_fields( |
| 458 | self, tmp_path: pathlib.Path |
| 459 | ) -> None: |
| 460 | """Adding new fields must not drop any previously required key.""" |
| 461 | _REQUIRED_KEYS = { |
| 462 | "branch", "head_commit", "upstream", "clean", "dirty", |
| 463 | "ahead", "behind", "total_changes", "added", "modified", |
| 464 | "deleted", "renamed", "staged", "unstaged", "untracked", |
| 465 | "conflict_paths", "merge_in_progress", "merge_from", |
| 466 | "conflict_count", "checkout_interrupted", "checkout_target", |
| 467 | "duration_ms", "exit_code", "sparse_checkout", |
| 468 | } |
| 469 | root = _fresh_code_repo(tmp_path) |
| 470 | data = _status(root) |
| 471 | missing = _REQUIRED_KEYS - set(data.keys()) |
| 472 | assert not missing, f"Missing JSON keys: {missing}" |
| 473 | |
| 474 | |
| 475 | # --------------------------------------------------------------------------- |
| 476 | # Performance / stress |
| 477 | # --------------------------------------------------------------------------- |
| 478 | |
| 479 | |
| 480 | class TestPerformance: |
| 481 | @pytest.mark.slow |
| 482 | def test_P1_5000_file_repo_completes(self, tmp_path: pathlib.Path) -> None: |
| 483 | root = _init_repo(tmp_path, domain="code") |
| 484 | for i in range(5000): |
| 485 | (root / f"f_{i:05d}.py").write_text(f"x = {i}\n") |
| 486 | _commit(root, "5k files") |
| 487 | t0 = time.monotonic() |
| 488 | data = _status(root) |
| 489 | elapsed = time.monotonic() - t0 |
| 490 | assert data["clean"] is True |
| 491 | assert data["duration_ms"] >= 0 |
| 492 | assert elapsed < 10.0, f"status took {elapsed:.1f}s on 5k files" |
| 493 | |
| 494 | def test_P2_duration_ms_present_in_all_rapid_calls( |
| 495 | self, tmp_path: pathlib.Path |
| 496 | ) -> None: |
| 497 | root = _fresh_code_repo(tmp_path) |
| 498 | for i in range(10): |
| 499 | data = _status(root) |
| 500 | assert "duration_ms" in data, f"Missing duration_ms on call {i}" |
| 501 | assert data["duration_ms"] >= 0 |
| 502 | |
| 503 | def test_P3_duration_ms_consistent_across_clean_calls( |
| 504 | self, tmp_path: pathlib.Path |
| 505 | ) -> None: |
| 506 | """Two clean-tree calls should have duration_ms within 50x of each other.""" |
| 507 | root = _fresh_code_repo(tmp_path) |
| 508 | t1 = _status(root)["duration_ms"] |
| 509 | t2 = _status(root)["duration_ms"] |
| 510 | # Just verify both are plausible non-zero floats (not placeholder 0.0) |
| 511 | assert t1 >= 0 |
| 512 | assert t2 >= 0 |
| 513 | |
| 514 | |
| 515 | # --------------------------------------------------------------------------- |
| 516 | # Concurrent |
| 517 | # --------------------------------------------------------------------------- |
| 518 | |
| 519 | |
| 520 | class TestConcurrent: |
| 521 | def test_C1_four_concurrent_status_calls(self, tmp_path: pathlib.Path) -> None: |
| 522 | """4 threads each running status on their own repo must all succeed.""" |
| 523 | results: list[dict | Exception] = [None] * 4 # type: ignore[list-item] |
| 524 | |
| 525 | def _run(idx: int) -> None: |
| 526 | repo = tmp_path / f"repo_{idx}" |
| 527 | repo.mkdir() |
| 528 | try: |
| 529 | r = _init_repo(repo, domain="code") |
| 530 | (repo / "f.py").write_text(f"x = {idx}\n") |
| 531 | runner.invoke(None, ["code", "add", "."], env=_env(repo)) |
| 532 | _commit(repo, f"commit {idx}") |
| 533 | results[idx] = _status(repo) |
| 534 | except Exception as exc: |
| 535 | results[idx] = exc |
| 536 | |
| 537 | threads = [threading.Thread(target=_run, args=(i,)) for i in range(4)] |
| 538 | for t in threads: |
| 539 | t.start() |
| 540 | for t in threads: |
| 541 | t.join() |
| 542 | |
| 543 | for i, result in enumerate(results): |
| 544 | assert not isinstance(result, Exception), ( |
| 545 | f"Thread {i} raised: {result}" |
| 546 | ) |
| 547 | assert result["clean"] is True, f"Thread {i} not clean" |
| 548 | assert "duration_ms" in result |
File History
1 commit
sha256:1d3f5470f45db58e32047678debc9438fdded1b2c7332cc743d2b8be32fdafc8
fixing more broken tests
Human
patch
4 days ago