gabriel / muse public
test_cmd_reset_hardening.py python
632 lines 27.9 KB
Raw
sha256:3788deb0ce8d6f235a23185f7f5cd65785ecdb41b0a7f2b0107adebf45c09221 update tests/test_cmd_reset_hardening.py and tests/test_cmd… Human 4 days ago
1 """Hardening tests for ``muse reset`` — security, schema, error routing, ordering.
2
3 These tests cover the issues fixed in the security/correctness/agent-UX audit.
4 They are intentionally distinct from the existing test_cmd_reset_revert.py and
5 test_cli_reset_revert.py suites, which cover the core reset algorithm.
6
7 Coverage tiers
8 --------------
9 Unit — parser flags, dead-code removal.
10 Integration — error routing to stderr, JSON schema, --dry-run, ordering safety.
11 End-to-end — full CLI: security, branch-name sanitization.
12 Security — ANSI injection, ref sanitization, exc sanitization.
13 Stress — large repos, concurrent repos, reset-and-verify cycles.
14 """
15
16 from __future__ import annotations
17
18 import json
19 import os
20 import pathlib
21 import subprocess
22 import threading
23 import time
24
25 import pytest
26
27 from tests.cli_test_helper import CliRunner, InvokeResult
28 from muse.core.refs import (
29 get_head_commit_id,
30 read_current_branch,
31 )
32 from muse.core.object_store import object_path as snapshot_path
33 from muse.core.types import split_id
34
# Module-wide CLI runner; the helpers and fixtures below invoke commands through it.
runner = CliRunner()
36
37 # ──────────────────────────────────────────────────────────────────────────────
38 # Helpers
39 # ──────────────────────────────────────────────────────────────────────────────
40
# Keys the JSON-schema tests require in every ``muse reset --json`` payload.
JSON_REQUIRED_KEYS = {
    "branch",
    "ref",
    "old_commit_id",
    "new_commit_id",
    "snapshot_id",
    "mode",
    "dry_run",
}
44
45
def _invoke(repo: pathlib.Path, args: list[str]) -> InvokeResult:
    """Run the CLI with *args* while *repo* is the current working directory.

    The working directory that was active before the call is restored
    afterwards, including when the invocation raises.
    """
    original_cwd = os.getcwd()
    os.chdir(repo)
    try:
        outcome = runner.invoke(None, args)
    finally:
        os.chdir(original_cwd)
    return outcome
53
54
def _reset(repo: pathlib.Path, *extra: str) -> InvokeResult:
    """Invoke the ``reset`` command inside *repo*, forwarding *extra* as its arguments."""
    argv = ["reset"]
    argv.extend(extra)
    return _invoke(repo, argv)
57
58
def _commit(repo: pathlib.Path, message: str) -> str:
    """Stage everything in *repo*, commit with *message*, and return the commit ID.

    The ID is parsed from a ``[<branch> sha256:<64 hex chars>`` marker in the
    commit command's output.  An empty string is returned when the output
    contains no such marker.
    """
    import re

    _invoke(repo, ["code", "add", "."])
    commit_result = _invoke(repo, ["commit", "-m", message])
    found = re.search(r'\[(?:main|[^ ]+) (sha256:[0-9a-f]{64})', commit_result.output)
    if found is None:
        return ""
    return found.group(1)
67
68
@pytest.fixture()
def repo(tmp_path: pathlib.Path) -> pathlib.Path:
    """Initialised repo with two commits on ``main``.

    The first commit adds ``a.py`` and the second adds ``b.py``.  Returns the
    repository root, which is pytest's ``tmp_path``.
    """
    # Reuse the shared helper so the chdir/restore logic lives in one place.
    _invoke(tmp_path, ["init"])
    (tmp_path / "a.py").write_text("x = 1\n")
    _commit(tmp_path, "initial")
    (tmp_path / "b.py").write_text("y = 2\n")
    _commit(tmp_path, "add b")
    return tmp_path
83
84
@pytest.fixture()
def c1_id(repo: pathlib.Path) -> str:
    """Full commit ID of the first commit (HEAD~1).

    Resolved by reading the commit at the tip of ``main`` and returning its
    parent ID.  Yields an empty string when the tip commit cannot be read or
    has no parent.
    """
    from muse.core.commits import read_commit

    tip_id = get_head_commit_id(repo, "main") or ""
    tip = read_commit(repo, tip_id)
    if not tip:
        return ""
    return tip.parent_commit_id or ""
93
94
95 # ──────────────────────────────────────────────────────────────────────────────
96 # Unit — parser flags
97 # ──────────────────────────────────────────────────────────────────────────────
98
99
class TestRegisterFlags:
    """Parser-level checks for the flags that ``register`` adds to ``reset``."""

    def _parse(self, *args: str) -> "object":
        """Parse ``reset <args>`` with a freshly built parser and return the namespace."""
        import argparse
        from muse.cli.commands.reset import register

        parser = argparse.ArgumentParser()
        subparsers = parser.add_subparsers()
        register(subparsers)
        return parser.parse_args(["reset", *args])

    def test_default_json_out_is_false(self) -> None:
        ns = self._parse("HEAD~1")
        assert ns.json_out is False

    def test_json_flag_sets_json_out(self) -> None:
        ns = self._parse("HEAD~1", "--json")
        assert ns.json_out is True

    def test_j_shorthand_sets_json_out(self) -> None:
        ns = self._parse("HEAD~1", "-j")
        assert ns.json_out is True

    # The remaining tests go through ``_parse`` as well instead of rebuilding
    # the parser by hand, so parser construction lives in exactly one place.

    def test_dry_run_default_false(self) -> None:
        ns = self._parse("HEAD~1")
        assert ns.dry_run is False

    def test_dry_run_flag(self) -> None:
        ns = self._parse("HEAD~1", "--dry-run")
        assert ns.dry_run is True

    def test_hard_default_false(self) -> None:
        ns = self._parse("HEAD~1")
        assert ns.hard is False

    def test_hard_flag(self) -> None:
        ns = self._parse("HEAD~1", "--hard")
        assert ns.hard is True

    def test_force_default_false(self) -> None:
        ns = self._parse("HEAD~1")
        assert ns.force is False
171
172
173 # ──────────────────────────────────────────────────────────────────────────────
174 # Unit — dead-code removal
175 # ──────────────────────────────────────────────────────────────────────────────
176
177
class TestDeadCodeRemoved:
    """Guards against the deleted ``_read_branch`` wrapper reappearing."""

    def test_read_branch_wrapper_removed(self) -> None:
        import muse.cli.commands.reset as reset_module

        still_present = hasattr(reset_module, "_read_branch")
        assert not still_present, (
            "_read_branch was a dead one-liner wrapper and must be deleted"
        )
185
186
187 # ──────────────────────────────────────────────────────────────────────────────
188 # Integration — error routing to stderr
189 # ──────────────────────────────────────────────────────────────────────────────
190
191
class TestErrorRouting:
    """Error messages go to stderr, and a failed hard reset never moves the branch."""

    @staticmethod
    def _remove_snapshot_of(repo: pathlib.Path, commit_id: str) -> None:
        """Delete the stored snapshot referenced by *commit_id*.

        Skips the calling test when the commit itself cannot be read.
        """
        from muse.core.commits import read_commit

        commit = read_commit(repo, commit_id)
        if commit is None:
            pytest.skip("Could not read c1 commit")
        snapshot_path(repo, commit.snapshot_id).unlink(missing_ok=True)

    def test_unknown_ref_error_to_stderr(self, repo: pathlib.Path) -> None:
        result = _reset(repo, "bogus-ref")
        stderr = result.stderr or ""
        assert result.exit_code == 1
        assert "not found" in stderr.lower()
        # NOTE(review): presumably ``output`` can include the stderr text, hence
        # it is stripped out before checking what reached stdout — confirm.
        assert "not found" not in result.output.replace(stderr, "")

    def test_unknown_flag_exits_nonzero(self, repo: pathlib.Path) -> None:
        result = _reset(repo, "HEAD~1", "--format", "xml")
        assert result.exit_code != 0

    def test_missing_snapshot_error_to_stderr(self, repo: pathlib.Path, c1_id: str) -> None:
        """When --hard reset target snapshot is missing, error goes to stderr."""
        self._remove_snapshot_of(repo, c1_id)

        result = _reset(repo, c1_id, "--hard")
        stderr = (result.stderr or "").lower()
        assert result.exit_code != 0
        assert "not found" in stderr or "snapshot" in stderr

    def test_snapshot_pre_validated_before_branch_ref_written(
        self, repo: pathlib.Path, c1_id: str
    ) -> None:
        """Critical ordering fix: branch ref must NOT advance when snapshot is missing.

        Before the fix, write_branch_ref() was called BEFORE read_snapshot(),
        so a missing snapshot would leave the branch pointer at the new commit
        with an unrestored working tree — an inconsistent, unrecoverable state.
        """
        self._remove_snapshot_of(repo, c1_id)

        before_head = get_head_commit_id(repo, "main")
        result = _reset(repo, c1_id, "--hard")
        after_head = get_head_commit_id(repo, "main")

        # Make sure the failure path was really exercised; otherwise an
        # unchanged head would prove nothing about ordering.
        assert result.exit_code != 0
        # Branch ref must remain at the original commit — not advanced to c1.
        assert before_head == after_head, (
            "Branch ref was advanced even though snapshot was missing — "
            "this is the pre-fix ordering bug"
        )

    def test_snapshot_source_in_run_before_write_branch_ref(self) -> None:
        """Source inspection: read_snapshot must appear before write_branch_ref.

        We skip comment lines (those starting with #) to avoid false matches
        from documentation comments that reference function names.
        """
        import inspect
        from muse.cli.commands.reset import run

        src = inspect.getsource(run)
        # Only consider non-comment lines, keeping their original indices.
        code_lines = [
            (index, line)
            for index, line in enumerate(src.split("\n"))
            if not line.lstrip().startswith("#")
        ]
        snap_lineno = next(
            (index for index, line in code_lines if "read_snapshot(" in line), -1
        )
        write_lineno = next(
            (index for index, line in code_lines if "write_branch_ref(" in line), -1
        )
        assert snap_lineno != -1, "read_snapshot not found in run()"
        assert write_lineno != -1, "write_branch_ref not found in run()"
        assert snap_lineno < write_lineno, (
            f"read_snapshot (line {snap_lineno}) must appear before "
            f"write_branch_ref (line {write_lineno}) in run() — "
            "this is the critical ordering fix that prevents orphaned branch refs"
        )
270
271
272 # ──────────────────────────────────────────────────────────────────────────────
273 # Integration — JSON schema stability
274 # ──────────────────────────────────────────────────────────────────────────────
275
276
class TestJsonSchema:
    """Stability checks for the payload printed by ``reset --json``."""

    @staticmethod
    def _assert_required_keys(result: InvokeResult, label: str) -> None:
        """Assert *result* succeeded and its JSON payload has every required key."""
        assert result.exit_code == 0
        data = json.loads(result.output)
        missing = JSON_REQUIRED_KEYS - set(data)
        assert not missing, f"Missing keys in {label} JSON: {missing}"

    @staticmethod
    def _assert_sha256_id(value: str) -> None:
        """Assert the part of *value* after its prefix is 64 lowercase hex characters."""
        _, hex_part = split_id(value)
        assert len(hex_part) == 64, (
            f"Expected 64-char hex after prefix, got {len(hex_part)}: {value!r}"
        )
        assert all(c in "0123456789abcdef" for c in hex_part), (
            f"Non-hex character in ID: {value!r}"
        )

    def test_soft_reset_has_all_keys(self, repo: pathlib.Path, c1_id: str) -> None:
        self._assert_required_keys(_reset(repo, c1_id, "--json"), "soft reset")

    def test_hard_reset_has_all_keys(self, repo: pathlib.Path, c1_id: str) -> None:
        self._assert_required_keys(_reset(repo, c1_id, "--hard", "--json"), "hard reset")

    def test_dry_run_has_all_keys(self, repo: pathlib.Path, c1_id: str) -> None:
        self._assert_required_keys(_reset(repo, c1_id, "--dry-run", "--json"), "dry-run")

    def test_soft_mode_is_soft(self, repo: pathlib.Path, c1_id: str) -> None:
        result = _reset(repo, c1_id, "--json")
        data = json.loads(result.output)
        assert data["mode"] == "soft"

    def test_hard_mode_is_hard(self, repo: pathlib.Path, c1_id: str) -> None:
        result = _reset(repo, c1_id, "--hard", "--json")
        data = json.loads(result.output)
        assert data["mode"] == "hard"

    def test_dry_run_flag_is_true(self, repo: pathlib.Path, c1_id: str) -> None:
        result = _reset(repo, c1_id, "--dry-run", "--json")
        data = json.loads(result.output)
        assert data["dry_run"] is True

    def test_live_reset_dry_run_flag_is_false(self, repo: pathlib.Path, c1_id: str) -> None:
        result = _reset(repo, c1_id, "--json")
        data = json.loads(result.output)
        assert data["dry_run"] is False

    def test_ref_field_matches_input(self, repo: pathlib.Path, c1_id: str) -> None:
        result = _reset(repo, c1_id, "--json")
        data = json.loads(result.output)
        assert data["ref"] == c1_id

    def test_snapshot_id_is_sha256(self, repo: pathlib.Path, c1_id: str) -> None:
        result = _reset(repo, c1_id, "--json")
        data = json.loads(result.output)
        self._assert_sha256_id(data["snapshot_id"])

    def test_new_commit_id_is_sha256(self, repo: pathlib.Path, c1_id: str) -> None:
        result = _reset(repo, c1_id, "--json")
        data = json.loads(result.output)
        # Same validator as the snapshot ID: length AND hex alphabet.
        self._assert_sha256_id(data["new_commit_id"])

    def test_old_commit_id_was_head(self, repo: pathlib.Path, c1_id: str) -> None:
        head_before = get_head_commit_id(repo, "main")
        result = _reset(repo, c1_id, "--json")
        data = json.loads(result.output)
        assert data["old_commit_id"] == head_before

    def test_branch_field_is_current_branch(self, repo: pathlib.Path, c1_id: str) -> None:
        result = _reset(repo, c1_id, "--json")
        data = json.loads(result.output)
        assert data["branch"] == "main"
349
350
351 # ──────────────────────────────────────────────────────────────────────────────
352 # Integration — --dry-run
353 # ──────────────────────────────────────────────────────────────────────────────
354
355
class TestDryRun:
    """``--dry-run`` must report what would happen without changing anything."""

    def test_dry_run_does_not_advance_branch(self, repo: pathlib.Path, c1_id: str) -> None:
        head_before = get_head_commit_id(repo, "main")
        _reset(repo, c1_id, "--dry-run")
        head_after = get_head_commit_id(repo, "main")
        assert head_before == head_after

    def test_dry_run_does_not_modify_workdir(self, repo: pathlib.Path, c1_id: str) -> None:
        tracked_file = repo / "b.py"
        original_text = tracked_file.read_text()
        _reset(repo, c1_id, "--dry-run", "--hard")
        assert tracked_file.read_text() == original_text

    def test_dry_run_exit_code_zero(self, repo: pathlib.Path, c1_id: str) -> None:
        outcome = _reset(repo, c1_id, "--dry-run")
        assert outcome.exit_code == 0

    def test_dry_run_invalid_ref_exits_1(self, repo: pathlib.Path) -> None:
        outcome = _reset(repo, "nonexistent-ref", "--dry-run")
        assert outcome.exit_code == 1

    def test_dry_run_json_shows_would_be_commit(self, repo: pathlib.Path, c1_id: str) -> None:
        outcome = _reset(repo, c1_id, "--dry-run", "--json")
        payload = json.loads(outcome.output)
        reported = payload["new_commit_id"]
        assert reported == c1_id or reported.startswith(c1_id)

    def test_dry_run_text_mentions_would(self, repo: pathlib.Path, c1_id: str) -> None:
        outcome = _reset(repo, c1_id, "--dry-run")
        lowered = outcome.output.lower()
        assert "dry-run" in lowered or "would" in lowered

    def test_dry_run_does_not_write_reflog(self, repo: pathlib.Path, c1_id: str) -> None:
        from muse.core.reflog import read_reflog

        entries_before = len(read_reflog(repo, "main"))
        _reset(repo, c1_id, "--dry-run")
        entries_after = len(read_reflog(repo, "main"))
        assert entries_before == entries_after
392
393
394 # ──────────────────────────────────────────────────────────────────────────────
395 # Integration — soft reset
396 # ──────────────────────────────────────────────────────────────────────────────
397
398
class TestSoftReset:
    """A reset without ``--hard`` moves the branch and leaves files alone."""

    def test_soft_advances_branch_to_target(self, repo: pathlib.Path, c1_id: str) -> None:
        _reset(repo, c1_id)
        new_head = get_head_commit_id(repo, "main")
        assert new_head is not None
        assert new_head.startswith(c1_id)

    def test_soft_preserves_working_tree(self, repo: pathlib.Path, c1_id: str) -> None:
        tracked_file = repo / "b.py"
        original_text = tracked_file.read_text()
        _reset(repo, c1_id)
        assert tracked_file.read_text() == original_text

    def test_soft_reflog_entry_written(self, repo: pathlib.Path, c1_id: str) -> None:
        from muse.core.reflog import read_reflog

        entries_before = len(read_reflog(repo, "main"))
        _reset(repo, c1_id)
        entries_after = len(read_reflog(repo, "main"))
        assert entries_after > entries_before

    def test_soft_text_output_has_commit_id(self, repo: pathlib.Path, c1_id: str) -> None:
        outcome = _reset(repo, c1_id)
        short_id = c1_id[:8]
        assert short_id in outcome.output
421
422
423 # ──────────────────────────────────────────────────────────────────────────────
424 # Integration — hard reset
425 # ──────────────────────────────────────────────────────────────────────────────
426
427
class TestHardReset:
    """``--hard`` moves the branch and rewrites the working tree to match."""

    def test_hard_advances_branch_to_target(self, repo: pathlib.Path, c1_id: str) -> None:
        outcome = _reset(repo, c1_id, "--hard")
        assert outcome.exit_code == 0
        new_head = get_head_commit_id(repo, "main")
        assert new_head is not None
        assert new_head.startswith(c1_id)

    def test_hard_restores_workdir(self, repo: pathlib.Path, c1_id: str) -> None:
        second_commit_file = repo / "b.py"
        assert second_commit_file.exists()
        outcome = _reset(repo, c1_id, "--hard")
        assert outcome.exit_code == 0
        # b.py was added in the second commit; resetting to c1 removes it
        assert not second_commit_file.exists()

    def test_hard_text_output_shows_head_is_now(self, repo: pathlib.Path, c1_id: str) -> None:
        outcome = _reset(repo, c1_id, "--hard")
        text = outcome.output
        assert "HEAD is now at" in text or c1_id[:8] in text

    def test_hard_uses_message_first_line(self, repo: pathlib.Path, c1_id: str) -> None:
        """Text output shows only the first line of a multiline commit message."""
        (repo / "x.py").write_text("x=1\n")
        multiline_id = _commit(repo, "first line\n\nmore detail here")
        # Go back to c1 so we can reset to the multiline commit
        _reset(repo, c1_id)
        outcome = _reset(repo, multiline_id, "--hard")
        # NOTE(review): neither ``multiline_id`` nor the exit code is checked, so
        # this would also pass if the commit or the reset failed — confirm intent.
        assert "more detail here" not in outcome.output
454
455
456 # ──────────────────────────────────────────────────────────────────────────────
457 # Security — ANSI injection
458 # ──────────────────────────────────────────────────────────────────────────────
459
460
class TestSecurityAnsi:
    """ANSI escape sequences must not appear in anything ``reset`` prints."""

    # Control Sequence Introducer: the prefix of every ANSI colour/cursor code.
    ESC = "\x1b["

    def test_unknown_ref_sanitized_in_stderr(self, repo: pathlib.Path) -> None:
        malicious_ref = f"{self.ESC}31mmalicious{self.ESC}0m"
        result = _reset(repo, malicious_ref)
        assert self.ESC not in (result.stderr or "")

    def test_unknown_flag_with_ansi_exits_nonzero(self, repo: pathlib.Path) -> None:
        malicious_fmt = f"{self.ESC}31mxml{self.ESC}0m"
        result = _reset(repo, "HEAD~1", "--format", malicious_fmt)
        assert result.exit_code != 0

    def test_no_ansi_in_stdout_on_error(self, repo: pathlib.Path) -> None:
        malicious_ref = f"{self.ESC}31mmalicious{self.ESC}0m"
        result = _reset(repo, malicious_ref)
        # stdout must be clean — errors go to stderr
        stdout_only = result.output.replace(result.stderr or "", "")
        assert self.ESC not in stdout_only

    def test_exc_sanitized_in_branch_validation(self) -> None:
        """sanitize_display(str(exc)) must be used, not bare f'{exc}'."""
        # Pure source inspection: no repository is needed, so the ``repo``
        # fixture is not requested (same as the sibling inspection test).
        import inspect
        from muse.cli.commands.reset import run

        src = inspect.getsource(run)
        # Confirm the pattern sanitize_display(str(exc)) is used, not bare {exc}
        assert "sanitize_display(str(exc))" in src

    def test_ref_sanitized_in_not_found_message(self) -> None:
        """sanitize_display(ref) must be used in the not-found error message."""
        import inspect
        from muse.cli.commands.reset import run

        src = inspect.getsource(run)
        assert "sanitize_display(ref)" in src

    def test_soft_text_output_sanitizes_branch(self, repo: pathlib.Path, c1_id: str) -> None:
        result = _reset(repo, c1_id)
        assert self.ESC not in result.output

    def test_hard_text_output_sanitizes_message(self, repo: pathlib.Path, c1_id: str) -> None:
        result = _reset(repo, c1_id, "--hard")
        assert self.ESC not in result.output
505
506
507 # ──────────────────────────────────────────────────────────────────────────────
508 # Integration — get_head_commit_id replaces ref_file.read_text()
509 # ──────────────────────────────────────────────────────────────────────────────
510
511
class TestRefAbstraction:
    """``run`` must resolve HEAD through ``get_head_commit_id``."""

    def test_no_direct_ref_file_read(self) -> None:
        """run() must use get_head_commit_id(), not read ref_file directly."""
        import inspect
        from muse.cli.commands.reset import run

        run_source = inspect.getsource(run)
        reads_ref_directly = "ref_file.read_text" in run_source
        assert not reads_ref_directly, (
            "Direct ref_file.read_text() bypasses the get_head_commit_id "
            "abstraction layer and is a TOCTOU vulnerability"
        )
        assert "get_head_commit_id" in run_source

    def test_old_commit_id_correct_on_first_commit(self, repo: pathlib.Path) -> None:
        """old_commit_id in JSON must match HEAD before the reset."""
        head_before = get_head_commit_id(repo, "main")
        outcome = _reset(repo, "HEAD~1", "--json")
        payload = json.loads(outcome.output)
        assert payload["old_commit_id"] == head_before
531
532
533 # ──────────────────────────────────────────────────────────────────────────────
534 # Stress
535 # ──────────────────────────────────────────────────────────────────────────────
536
537
@pytest.mark.slow
class TestStress:
    """Timing and concurrency checks for ``reset`` on larger repositories."""

    def test_soft_reset_across_50_commits(self, repo: pathlib.Path) -> None:
        """Soft-reset across 50 commits must complete under 5s."""
        # Add 48 more commits (we already have 2)
        for i in range(48):
            (repo / f"f{i:03d}.py").write_text(f"x={i}\n")
            _commit(repo, f"commit {i}")

        from muse.core.commits import read_commit

        # Walk back up to 10 parents from HEAD to pick the reset target.
        target_id = get_head_commit_id(repo, "main") or ""
        for _ in range(10):
            commit = read_commit(repo, target_id)
            if commit and commit.parent_commit_id:
                target_id = commit.parent_commit_id

        t0 = time.perf_counter()
        result = _reset(repo, target_id)
        elapsed = (time.perf_counter() - t0) * 1000
        assert result.exit_code == 0
        assert elapsed < 5000, f"50-commit soft reset took {elapsed:.0f}ms (limit 5s)"

    def test_hard_reset_with_100_files(self, repo: pathlib.Path, c1_id: str) -> None:
        """Hard-reset restoring 100 files must complete under 5s."""
        for i in range(100):
            (repo / f"g{i:03d}.py").write_text(f"y={i}\n")
        _commit(repo, "add 100 files")
        head_id = get_head_commit_id(repo, "main") or ""

        # Reset back to c1 (removes 101 files) then back to head (restores them)
        t0 = time.perf_counter()
        r1 = _reset(repo, c1_id, "--hard")
        r2 = _reset(repo, head_id, "--hard")
        elapsed = (time.perf_counter() - t0) * 1000
        assert r1.exit_code == 0
        assert r2.exit_code == 0
        assert elapsed < 5000, f"100-file hard reset cycle took {elapsed:.0f}ms"

    def test_dry_run_50_commits_fast(self, repo: pathlib.Path) -> None:
        """Dry-run across 50 commits must complete under 2s."""
        for i in range(48):
            (repo / f"h{i:03d}.py").write_text(f"z={i}\n")
            _commit(repo, f"commit {i}")

        t0 = time.perf_counter()
        result = _reset(repo, "HEAD~1", "--dry-run")
        elapsed = (time.perf_counter() - t0) * 1000
        assert result.exit_code == 0
        assert elapsed < 2000, f"dry-run took {elapsed:.0f}ms (limit 2s)"

    def test_concurrent_resets_separate_repos(self, tmp_path: pathlib.Path) -> None:
        """Multiple repos resetting concurrently must not interfere."""
        errors: list[str] = []

        def muse(repo_dir: pathlib.Path, *args: str) -> "subprocess.CompletedProcess[str]":
            """Run the ``muse`` executable with *args* inside *repo_dir*."""
            return subprocess.run(
                ["muse", *args],
                cwd=str(repo_dir), capture_output=True, text=True,
            )

        def build_and_reset(idx: int) -> None:
            """Create repo ``idx`` with two commits, soft-reset it, record problems."""
            repo_dir = tmp_path / f"repo_{idx}"
            repo_dir.mkdir()
            muse(repo_dir, "init")
            steps = (
                ("a.py", f"x={idx}\n", f"c1_{idx}"),
                ("b.py", f"y={idx}\n", f"c2_{idx}"),
            )
            for filename, content, message in steps:
                (repo_dir / filename).write_text(content)
                muse(repo_dir, "code", "add", ".")
                muse(repo_dir, "commit", "-m", message)

            r = muse(repo_dir, "reset", "HEAD~1", "--json")
            if r.returncode != 0:
                errors.append(f"repo_{idx}: exit={r.returncode}, err={r.stderr[:60]}")
                return
            try:
                data = json.loads(r.stdout)
                if data["mode"] != "soft":
                    errors.append(f"repo_{idx}: unexpected mode {data['mode']}")
                if data["dry_run"] is not False:
                    errors.append(f"repo_{idx}: dry_run not False")
            except Exception as e:
                # Deliberately broad: any payload problem must be recorded,
                # not raised inside the worker thread.
                errors.append(f"repo_{idx}: parse error {e}")

        def do_reset(idx: int) -> None:
            # An exception escaping a Thread target is only printed, never
            # re-raised in the test; record it so the test fails instead of
            # passing vacuously (e.g. when ``muse`` is not on PATH).
            try:
                build_and_reset(idx)
            except Exception as exc:
                errors.append(f"repo_{idx}: worker crashed: {exc!r}")

        threads = [threading.Thread(target=do_reset, args=(i,)) for i in range(6)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()
        # Join outside the f-string: a backslash inside a replacement field is
        # a SyntaxError before Python 3.12.
        report = "\n".join(errors)
        assert not errors, f"Concurrent reset errors:\n{report}"
File History 1 commit
sha256:3788deb0ce8d6f235a23185f7f5cd65785ecdb41b0a7f2b0107adebf45c09221 update tests/test_cmd_reset_hardening.py and tests/test_cmd… Human 4 days ago