gabriel / muse public
test_status_supercharge.py python
548 lines 20.3 KB
Raw
sha256:1d3f5470f45db58e32047678debc9438fdded1b2c7332cc743d2b8be32fdafc8 fixing more broken tests Human patch 4 days ago
1 """SUPERCHARGE tests for ``muse status``.
2
3 Gaps addressed beyond the existing test_cmd_status.py + test_status_json_schema.py:
4
5 Unit
6 U1 duration_ms present and non-negative in JSON (both code and non-code paths)
7 U2 exit_code present and correct in JSON
8 U3 sparse_checkout key present in JSON — null when disabled, dict when active
9 U4 sparse_checkout.mode / patterns / enabled match live config
10 U5 _compute_upstream_info: no-local-head edge case
11
12 Integration
13 I1 Code-domain upstream (ahead/behind) appears in JSON — was hardcoded None (bug)
14 I2 --branch-only always emits merge_in_progress, merge_from, conflict_count
15 I3 --branch-only exits 0 even with --exit-code flag
16 I4 checkout_interrupted=True when CHECKOUT_HEAD file exists
17 I5 checkout_target matches CHECKOUT_HEAD content
18 I6 checkout_interrupted=False when CHECKOUT_HEAD absent
19 I7 --short + --json produces valid JSON with duration_ms
20 I8 duration_ms > 0 (real timing, not placeholder)
21
22 Security
23 S1 merge_from with ANSI in JSON value is safe (no raw escape bytes)
24 S2 checkout_target with ANSI in JSON value is safe
25 S3 branch name with ANSI in JSON value is safe
26 S4 JSON output has no raw \x1b bytes regardless of state
27
28 Data integrity
29 D1 duration_ms is a float (not int, not string)
30 D2 exit_code is an int (not bool, not string)
31 D3 sparse_checkout.patterns is always a list in JSON
32 D4 total_changes accounts for renamed when present (non-code domain)
33 D5 sparse_checkout survives disable — re-query after disable returns null
34
35 Performance / stress
36 P1 5 000-file repo status completes in under 10 s wall-clock; duration_ms is non-negative
37 P2 10 rapid sequential status calls — duration_ms present in every response
38 P3 duration_ms is non-negative on two consecutive clean-tree calls (no ratio is enforced)
39
40 Concurrent
41 C1 4 concurrent status calls on separate repos all succeed
42 """
43
44 from __future__ import annotations
45 from collections.abc import Mapping
46
47 import json
48 import os
49 import pathlib
50 import threading
51 import time
52
# The working directory is process-global; _init_repo holds this lock around
# its chdir / init / restore sequence so parallel callers do not interleave.
_CHDIR_LOCK = threading.Lock()
54
55 import pytest
56
57 from tests.cli_test_helper import CliRunner
58 from muse.core.types import long_id
59 from muse.core.paths import head_path, muse_dir
60
# Single CLI runner instance shared by every helper and test in this module.
runner = CliRunner()
62
63
64 # ---------------------------------------------------------------------------
65 # Helpers
66 # ---------------------------------------------------------------------------
67
68
def _env(root: pathlib.Path) -> Mapping[str, str]:
    """Build the environment overrides that point the CLI at *root*."""
    return dict(MUSE_REPO_ROOT=str(root))
71
72
def _invoke(root: pathlib.Path, *args: str) -> Mapping[str, object]:
    """Run the CLI with *args* against the repo at *root* and return the runner's result.

    NOTE(review): the return annotation says ``Mapping``, but the same kind of
    object is read elsewhere in this file via ``.exit_code`` / ``.stdout`` —
    confirm the intended type.
    """
    return runner.invoke(None, [*args], env=_env(root))
76
77
def _status(root: pathlib.Path, *extra: str) -> Mapping[str, object]:
    """Run ``status --json`` (plus any *extra* flags) on *root* and decode stdout.

    The calling test fails if the command exits non-zero.
    """
    argv = ["status", "--json"]
    argv.extend(extra)
    outcome = runner.invoke(None, argv, env=_env(root))
    assert outcome.exit_code == 0, f"status failed: {outcome.stderr}\n{outcome.stdout}"
    payload = json.loads(outcome.stdout)
    return payload
82
83
def _init_repo(tmp: pathlib.Path, *, domain: str = "code") -> pathlib.Path:
    """Create *tmp* if needed, run ``init --domain <domain>`` from inside it, return *tmp*.

    The working directory is shared by the whole process, so the
    chdir / init / restore sequence runs while holding ``_CHDIR_LOCK``.
    """
    tmp.mkdir(parents=True, exist_ok=True)
    init_argv = ["init", "--domain", domain]
    with _CHDIR_LOCK:
        previous_cwd = os.getcwd()
        try:
            os.chdir(tmp)
            outcome = runner.invoke(None, init_argv, env=_env(tmp))
        finally:
            # Put the caller's cwd back even when the init invocation raised.
            os.chdir(previous_cwd)
    assert outcome.exit_code == 0, f"init failed: {outcome.stderr}"
    return tmp
95
96
def _commit(root: pathlib.Path, msg: str = "commit") -> None:
    """Run ``commit -m <msg>`` in *root*; the calling test fails on a non-zero exit."""
    outcome = runner.invoke(None, ["commit", "-m", msg], env=_env(root))
    assert outcome.exit_code == 0, f"commit failed: {outcome.stderr}"
100
101
def _fresh_code_repo(tmp: pathlib.Path) -> pathlib.Path:
    """Turn *tmp* into a code-domain repo with ``main.py`` committed, and return *tmp*."""
    _init_repo(tmp, domain="code")
    seed_file = tmp / "main.py"
    seed_file.write_text("x = 1\n")
    # The outcome of ``code add`` is not inspected; only the commit is asserted.
    runner.invoke(None, ["code", "add", "main.py"], env=_env(tmp))
    _commit(tmp, "initial")
    return tmp
108
109
def _set_sparse(root: pathlib.Path, *patterns: str) -> None:
    """Run ``sparse-checkout init`` and then ``sparse-checkout set <patterns>`` in *root*.

    Neither invocation's result is checked.
    """
    steps = (
        ["sparse-checkout", "init"],
        ["sparse-checkout", "set", *patterns],
    )
    for argv in steps:
        runner.invoke(None, argv, env=_env(root))
113
114
def _disable_sparse(root: pathlib.Path) -> None:
    """Run ``sparse-checkout disable`` in *root*; the result is not checked."""
    argv = ["sparse-checkout", "disable"]
    runner.invoke(None, argv, env=_env(root))
117
118
119 # ---------------------------------------------------------------------------
120 # U1–U2 duration_ms and exit_code in JSON
121 # ---------------------------------------------------------------------------
122
123
class TestElapsedAndExitCode:
    """U1/U2 — ``duration_ms`` and ``exit_code`` appear in the status JSON payload."""

    def test_U1_duration_ms_present_code_domain(self, tmp_path: pathlib.Path) -> None:
        payload = _status(_fresh_code_repo(tmp_path))
        assert "duration_ms" in payload, "duration_ms missing from status JSON"

    def test_U1_duration_ms_present_non_code_domain(self, tmp_path: pathlib.Path) -> None:
        payload = _status(_init_repo(tmp_path, domain="mist"))
        assert "duration_ms" in payload, "duration_ms missing from non-code-domain status JSON"

    def test_U1_duration_ms_non_negative(self, tmp_path: pathlib.Path) -> None:
        payload = _status(_fresh_code_repo(tmp_path))
        assert payload["duration_ms"] >= 0

    def test_U2_exit_code_present(self, tmp_path: pathlib.Path) -> None:
        payload = _status(_fresh_code_repo(tmp_path))
        assert "exit_code" in payload

    def test_U2_exit_code_zero_when_clean(self, tmp_path: pathlib.Path) -> None:
        payload = _status(_fresh_code_repo(tmp_path))
        assert payload["exit_code"] == 0

    def test_U2_exit_code_zero_when_dirty(self, tmp_path: pathlib.Path) -> None:
        """The payload's exit_code stays 0 with an extra, uncommitted file present."""
        repo = _fresh_code_repo(tmp_path)
        untracked = repo / "new.py"
        untracked.write_text("y = 1\n")
        payload = _status(repo)
        assert payload["exit_code"] == 0

    def test_U2_exit_code_present_in_branch_only(self, tmp_path: pathlib.Path) -> None:
        repo = _fresh_code_repo(tmp_path)
        outcome = runner.invoke(None, ["status", "--branch", "--json"], env=_env(repo))
        assert "exit_code" in json.loads(outcome.stdout)

    def test_U2_duration_ms_present_in_branch_only(self, tmp_path: pathlib.Path) -> None:
        repo = _fresh_code_repo(tmp_path)
        outcome = runner.invoke(None, ["status", "--branch", "--json"], env=_env(repo))
        assert "duration_ms" in json.loads(outcome.stdout)
168
169
170 # ---------------------------------------------------------------------------
171 # U3–U4 sparse_checkout field
172 # ---------------------------------------------------------------------------
173
174
class TestSparseCheckoutField:
    """U3/U4/D3/D5 — shape and contents of the ``sparse_checkout`` JSON field."""

    def test_U3_sparse_checkout_key_present_when_disabled(
        self, tmp_path: pathlib.Path
    ) -> None:
        payload = _status(_fresh_code_repo(tmp_path))
        assert "sparse_checkout" in payload

    def test_U3_sparse_checkout_null_when_disabled(
        self, tmp_path: pathlib.Path
    ) -> None:
        payload = _status(_fresh_code_repo(tmp_path))
        assert payload["sparse_checkout"] is None

    def test_U3_sparse_checkout_dict_when_active(
        self, tmp_path: pathlib.Path
    ) -> None:
        repo = _fresh_code_repo(tmp_path)
        _set_sparse(repo, "muse/")
        payload = _status(repo)
        assert isinstance(payload["sparse_checkout"], dict)

    def test_U4_sparse_checkout_enabled_field(
        self, tmp_path: pathlib.Path
    ) -> None:
        repo = _fresh_code_repo(tmp_path)
        _set_sparse(repo, "src/")
        sparse = _status(repo)["sparse_checkout"]
        assert sparse["enabled"] is True

    def test_U4_sparse_checkout_mode_cone(
        self, tmp_path: pathlib.Path
    ) -> None:
        repo = _fresh_code_repo(tmp_path)
        _set_sparse(repo, "src/")
        sparse = _status(repo)["sparse_checkout"]
        assert sparse["mode"] == "cone"

    def test_U4_sparse_checkout_mode_pattern(
        self, tmp_path: pathlib.Path
    ) -> None:
        repo = _fresh_code_repo(tmp_path)
        # Initialised with --no-cone here, so _set_sparse (plain init) is not used.
        for argv in (
            ["sparse-checkout", "init", "--no-cone"],
            ["sparse-checkout", "set", "**/*.py"],
        ):
            runner.invoke(None, argv, env=_env(repo))
        sparse = _status(repo)["sparse_checkout"]
        assert sparse["mode"] == "pattern"

    def test_U4_sparse_checkout_patterns_match_config(
        self, tmp_path: pathlib.Path
    ) -> None:
        repo = _fresh_code_repo(tmp_path)
        _set_sparse(repo, "src/", "tests/")
        sparse = _status(repo)["sparse_checkout"]
        assert sparse["patterns"] == ["src/", "tests/"]

    def test_D3_sparse_checkout_patterns_always_list(
        self, tmp_path: pathlib.Path
    ) -> None:
        repo = _fresh_code_repo(tmp_path)
        _set_sparse(repo, "src/")
        sparse = _status(repo)["sparse_checkout"]
        assert isinstance(sparse["patterns"], list)

    def test_D5_sparse_checkout_null_after_disable(
        self, tmp_path: pathlib.Path
    ) -> None:
        repo = _fresh_code_repo(tmp_path)
        _set_sparse(repo, "src/")
        before = _status(repo)
        assert before["sparse_checkout"] is not None
        _disable_sparse(repo)
        after = _status(repo)
        assert after["sparse_checkout"] is None
247
248
249 # ---------------------------------------------------------------------------
250 # I1 Code-domain upstream bug — ahead/behind was hardcoded None
251 # ---------------------------------------------------------------------------
252
253
class TestCodeDomainUpstream:
    """I1 — upstream / ahead / behind are reported for code-domain repos."""

    def test_I1_code_domain_includes_upstream_key(
        self, tmp_path: pathlib.Path
    ) -> None:
        """The code-domain status JSON carries all three tracking keys."""
        payload = _status(_fresh_code_repo(tmp_path))
        for key in ("upstream", "ahead", "behind"):
            assert key in payload

    def test_I1_code_domain_upstream_null_when_no_remote(
        self, tmp_path: pathlib.Path
    ) -> None:
        """In a freshly created repo the three tracking fields are present and null."""
        payload = _status(_fresh_code_repo(tmp_path))
        for key in ("upstream", "ahead", "behind"):
            assert payload[key] is None
274
275
276 # ---------------------------------------------------------------------------
277 # I2–I3 --branch-only schema stability
278 # ---------------------------------------------------------------------------
279
280
class TestBranchOnlySchema:
    """I2/I3 — ``status --branch --json`` keeps a stable set of merge-related keys."""

    @staticmethod
    def _branch_payload(root: pathlib.Path) -> Mapping[str, object]:
        """Invoke ``status --branch --json`` on *root* and decode its stdout."""
        outcome = runner.invoke(None, ["status", "--branch", "--json"], env=_env(root))
        return json.loads(outcome.stdout)

    def test_I2_merge_in_progress_always_present(
        self, tmp_path: pathlib.Path
    ) -> None:
        """The branch-only payload includes merge_in_progress."""
        payload = self._branch_payload(_fresh_code_repo(tmp_path))
        assert "merge_in_progress" in payload

    def test_I2_merge_from_always_present(
        self, tmp_path: pathlib.Path
    ) -> None:
        payload = self._branch_payload(_fresh_code_repo(tmp_path))
        assert "merge_from" in payload

    def test_I2_conflict_count_always_present(
        self, tmp_path: pathlib.Path
    ) -> None:
        payload = self._branch_payload(_fresh_code_repo(tmp_path))
        assert "conflict_count" in payload

    def test_I2_no_merge_values_are_defaults(
        self, tmp_path: pathlib.Path
    ) -> None:
        payload = self._branch_payload(_fresh_code_repo(tmp_path))
        assert payload["merge_in_progress"] is False
        assert payload["merge_from"] is None
        assert payload["conflict_count"] == 0

    def test_I3_branch_only_exit_code_flag_exits_zero_when_dirty(
        self, tmp_path: pathlib.Path
    ) -> None:
        """With an extra uncommitted file, --branch --exit-code still exits 0."""
        repo = _fresh_code_repo(tmp_path)
        (repo / "dirty.py").write_text("z = 1\n")
        argv = ["status", "--branch", "--exit-code", "--json"]
        outcome = runner.invoke(None, argv, env=_env(repo))
        assert outcome.exit_code == 0
327
328
329 # ---------------------------------------------------------------------------
330 # I4–I6 checkout_interrupted
331 # ---------------------------------------------------------------------------
332
333
class TestCheckoutInterrupted:
    """I4–I6 — a CHECKOUT_HEAD file in the muse dir drives checkout_interrupted / checkout_target."""

    def test_I4_checkout_interrupted_true_when_file_exists(
        self, tmp_path: pathlib.Path
    ) -> None:
        repo = _fresh_code_repo(tmp_path)
        # Writing CHECKOUT_HEAD by hand stands in for an interrupted checkout.
        marker = muse_dir(repo) / "CHECKOUT_HEAD"
        marker.write_text("feat/x", encoding="utf-8")
        payload = _status(repo)
        assert payload["checkout_interrupted"] is True

    def test_I5_checkout_target_matches_file_content(
        self, tmp_path: pathlib.Path
    ) -> None:
        repo = _fresh_code_repo(tmp_path)
        marker = muse_dir(repo) / "CHECKOUT_HEAD"
        marker.write_text("feat/my-branch", encoding="utf-8")
        payload = _status(repo)
        assert payload["checkout_target"] == "feat/my-branch"

    def test_I6_checkout_interrupted_false_when_absent(
        self, tmp_path: pathlib.Path
    ) -> None:
        payload = _status(_fresh_code_repo(tmp_path))
        assert payload["checkout_interrupted"] is False
        assert payload["checkout_target"] is None

    def test_I6_checkout_interrupted_cleared_after_file_removed(
        self, tmp_path: pathlib.Path
    ) -> None:
        repo = _fresh_code_repo(tmp_path)
        marker = muse_dir(repo) / "CHECKOUT_HEAD"
        marker.write_text("feat/x", encoding="utf-8")
        with_marker = _status(repo)
        assert with_marker["checkout_interrupted"] is True
        marker.unlink()
        without_marker = _status(repo)
        assert without_marker["checkout_interrupted"] is False
369
370
371 # ---------------------------------------------------------------------------
372 # Security
373 # ---------------------------------------------------------------------------
374
375
class TestSecurity:
    """S1–S4 — raw ESC (``\\x1b``) bytes must never reach ``status --json`` stdout.

    Each test plants an ANSI escape sequence in repo state that status reads
    (or just dirties the tree) and then checks the captured stdout.

    NOTE(review): only stdout is inspected and the exit code is not asserted,
    so an invocation that prints nothing to stdout would also pass — confirm
    this is intended.
    """

    def test_S1_ansi_in_merge_from_not_in_json_value(
        self, tmp_path: pathlib.Path
    ) -> None:
        """merge_from with ANSI bytes must not propagate raw escapes into JSON."""
        root = _fresh_code_repo(tmp_path)
        # Plant the escape sequence directly in MERGE_STATE's other_branch.
        merge_state = {
            "other_branch": "\x1b[31mmalicious\x1b[0m",
            "conflict_paths": [],
            "original_conflict_paths": [],
            "ours_commit_id": long_id("a" * 64),
            "theirs_commit_id": long_id("b" * 64),
        }
        # The module-level ``json`` import is used; no function-local alias needed.
        (muse_dir(root) / "MERGE_STATE").write_text(
            json.dumps(merge_state), encoding="utf-8"
        )
        result = runner.invoke(None, ["status", "--json"], env=_env(root))
        assert "\x1b" not in result.stdout

    def test_S2_ansi_in_checkout_target_not_in_json_value(
        self, tmp_path: pathlib.Path
    ) -> None:
        """CHECKOUT_HEAD content with ANSI bytes must not leak raw escapes."""
        root = _fresh_code_repo(tmp_path)
        (muse_dir(root) / "CHECKOUT_HEAD").write_text(
            "\x1b[31mmalicious-branch\x1b[0m", encoding="utf-8"
        )
        result = runner.invoke(None, ["status", "--json"], env=_env(root))
        assert "\x1b" not in result.stdout

    def test_S3_ansi_in_branch_name_not_in_json(
        self, tmp_path: pathlib.Path
    ) -> None:
        """A HEAD ref whose branch name holds ANSI bytes must not leak raw escapes."""
        root = _fresh_code_repo(tmp_path)
        # Force HEAD to point to a branch name containing ANSI
        head_path(root).write_text(
            "ref: refs/heads/\x1b[31mmalicious\x1b[0m", encoding="utf-8"
        )
        result = runner.invoke(None, ["status", "--json"], env=_env(root))
        assert "\x1b" not in result.stdout

    def test_S4_no_raw_ansi_in_json_output(self, tmp_path: pathlib.Path) -> None:
        """A tree with an extra uncommitted file yields stdout free of ESC bytes."""
        root = _fresh_code_repo(tmp_path)
        (root / "new.py").write_text("y = 1\n")
        result = runner.invoke(None, ["status", "--json"], env=_env(root))
        assert "\x1b" not in result.stdout
424
425
426 # ---------------------------------------------------------------------------
427 # Data integrity
428 # ---------------------------------------------------------------------------
429
430
class TestDataIntegrity:
    """D1/D2/D4 plus a key-set regression check on the status JSON payload."""

    def test_D1_duration_ms_is_float(self, tmp_path: pathlib.Path) -> None:
        payload = _status(_fresh_code_repo(tmp_path))
        assert isinstance(payload["duration_ms"], float)

    def test_D2_exit_code_is_int(self, tmp_path: pathlib.Path) -> None:
        payload = _status(_fresh_code_repo(tmp_path))
        assert isinstance(payload["exit_code"], int)
        # bool is a subclass of int, so it has to be ruled out separately.
        assert not isinstance(payload["exit_code"], bool)

    def test_D4_total_changes_includes_renamed(
        self, tmp_path: pathlib.Path
    ) -> None:
        """total_changes equals added + modified + deleted + renamed (mist domain)."""
        payload = _status(_init_repo(tmp_path, domain="mist"))
        buckets = ("added", "modified", "deleted", "renamed")
        expected = sum(len(payload[name]) for name in buckets)
        assert payload["total_changes"] == expected

    def test_all_required_keys_still_present_with_new_fields(
        self, tmp_path: pathlib.Path
    ) -> None:
        """Every key in the required set below is present in the payload."""
        required = {
            "branch", "head_commit", "upstream", "clean", "dirty",
            "ahead", "behind", "total_changes", "added", "modified",
            "deleted", "renamed", "staged", "unstaged", "untracked",
            "conflict_paths", "merge_in_progress", "merge_from",
            "conflict_count", "checkout_interrupted", "checkout_target",
            "duration_ms", "exit_code", "sparse_checkout",
        }
        payload = _status(_fresh_code_repo(tmp_path))
        missing = required.difference(payload.keys())
        assert not missing, f"Missing JSON keys: {missing}"
473
474
475 # ---------------------------------------------------------------------------
476 # Performance / stress
477 # ---------------------------------------------------------------------------
478
479
class TestPerformance:
    """P1–P3 — timing-related checks on ``status --json``."""

    @pytest.mark.slow
    def test_P1_5000_file_repo_completes(self, tmp_path: pathlib.Path) -> None:
        """Status on a 5 000-file committed repo is clean and returns in under 10 s wall-clock."""
        repo = _init_repo(tmp_path, domain="code")
        for n in range(5000):
            (repo / f"f_{n:05d}.py").write_text(f"x = {n}\n")
        _commit(repo, "5k files")
        started = time.monotonic()
        payload = _status(repo)
        elapsed = time.monotonic() - started
        assert payload["clean"] is True
        assert payload["duration_ms"] >= 0
        assert elapsed < 10.0, f"status took {elapsed:.1f}s on 5k files"

    def test_P2_duration_ms_present_in_all_rapid_calls(
        self, tmp_path: pathlib.Path
    ) -> None:
        """Ten back-to-back calls each report a non-negative duration_ms."""
        repo = _fresh_code_repo(tmp_path)
        for i in range(10):
            payload = _status(repo)
            assert "duration_ms" in payload, f"Missing duration_ms on call {i}"
            assert payload["duration_ms"] >= 0

    def test_P3_duration_ms_consistent_across_clean_calls(
        self, tmp_path: pathlib.Path
    ) -> None:
        """Two consecutive clean-tree calls both report a non-negative duration_ms.

        No ratio between the two timings is enforced.
        """
        repo = _fresh_code_repo(tmp_path)
        first = _status(repo)["duration_ms"]
        second = _status(repo)["duration_ms"]
        # Only the sign is checked; both calls run before either assertion.
        assert first >= 0
        assert second >= 0
513
514
515 # ---------------------------------------------------------------------------
516 # Concurrent
517 # ---------------------------------------------------------------------------
518
519
class TestConcurrent:
    """C1 — status works when separate repos are exercised from parallel threads."""

    def test_C1_four_concurrent_status_calls(self, tmp_path: pathlib.Path) -> None:
        """4 threads each running status on their own repo must all succeed."""
        worker_count = 4
        # One slot per worker. A slot still holding None after join() means the
        # worker never reported back (e.g. it died on something that is not an
        # ``Exception`` subclass).
        results: list[dict | Exception | None] = [None] * worker_count

        def _run(idx: int) -> None:
            """Build repo ``repo_<idx>``, commit one file, and store the status payload."""
            repo = tmp_path / f"repo_{idx}"
            try:
                # mkdir sits inside the try so a failure here is recorded in
                # ``results`` instead of silently killing the thread.
                repo.mkdir()
                _init_repo(repo, domain="code")
                (repo / "f.py").write_text(f"x = {idx}\n")
                runner.invoke(None, ["code", "add", "."], env=_env(repo))
                _commit(repo, f"commit {idx}")
                results[idx] = _status(repo)
            except Exception as exc:
                # Deliberately broad: assertion failures raised by the helpers
                # must be carried back to the main thread, which reports them.
                results[idx] = exc

        threads = [
            threading.Thread(target=_run, args=(i,)) for i in range(worker_count)
        ]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

        for i, result in enumerate(results):
            assert result is not None, f"Thread {i} produced no result"
            assert not isinstance(result, Exception), (
                f"Thread {i} raised: {result}"
            )
            assert result["clean"] is True, f"Thread {i} not clean"
            assert "duration_ms" in result
File History 1 commit
sha256:1d3f5470f45db58e32047678debc9438fdded1b2c7332cc743d2b8be32fdafc8 fixing more broken tests Human patch 4 days ago