gabriel / muse public
test_cmd_reset_hardening.py python
629 lines 27.6 KB
Raw
sha256:3788deb0ce8d6f235a23185f7f5cd65785ecdb41b0a7f2b0107adebf45c09221 update tests/test_cmd_reset_hardening.py and tests/test_cmd… Human 14 days ago
1 """Hardening tests for ``muse reset`` — security, schema, error routing, ordering.
2
3 These tests cover the issues fixed in the security/correctness/agent-UX audit.
4 They are intentionally distinct from the existing test_cmd_reset_revert.py and
5 test_cli_reset_revert.py suites, which cover the core reset algorithm.
6
7 Coverage tiers
8 --------------
9 Unit — parser flags, dead-code removal.
10 Integration — error routing to stderr, JSON schema, --dry-run, ordering safety.
11 End-to-end — full CLI: security, branch-name sanitization.
12 Security — ANSI injection, ref sanitization, exc sanitization.
13 Stress — large repos, concurrent repos, reset-and-verify cycles.
14 """
15
16 from __future__ import annotations
17
18 import json
19 import os
20 import pathlib
21 import subprocess
22 import threading
23 import time
24
25 import pytest
26
27 from tests.cli_test_helper import CliRunner, InvokeResult
28 from muse.core.refs import (
29 get_head_commit_id,
30 read_current_branch,
31 )
32 from muse.core.object_store import object_path as snapshot_path
33 from muse.core.types import split_id
34
# Shared CLI runner used by the in-process invocations in this module
# (see ``_invoke`` and the ``repo`` fixture).
runner = CliRunner()
36
37 # ──────────────────────────────────────────────────────────────────────────────
38 # Helpers
39 # ──────────────────────────────────────────────────────────────────────────────
40
# Keys the schema tests in this module expect to find in the JSON emitted
# by ``reset --json``.
JSON_REQUIRED_KEYS = {
    "branch",
    "ref",
    "old_commit_id",
    "new_commit_id",
    "snapshot_id",
    "mode",
    "dry_run",
}
44
45
def _invoke(repo: pathlib.Path, args: list[str]) -> InvokeResult:
    """Run the CLI with *args* while the process cwd is *repo*.

    The previous working directory is restored afterwards, even if the
    invocation raises.
    """
    original_cwd = os.getcwd()
    try:
        os.chdir(repo)
        outcome = runner.invoke(None, args)
    finally:
        os.chdir(original_cwd)
    return outcome
53
54
def _reset(repo: pathlib.Path, *extra: str) -> InvokeResult:
    """Invoke the ``reset`` subcommand in *repo*, appending *extra* arguments."""
    argv = ["reset"]
    argv.extend(extra)
    return _invoke(repo, argv)
57
58
def _commit(repo: pathlib.Path, message: str) -> str:
    """Commit the working tree of *repo* and return the commit ID (or a prefix).

    The ID is scraped from the ``[<branch> <hex>`` fragment of the command
    output; an empty string is returned when no such fragment is found.
    """
    import re

    commit_result = _invoke(repo, ["commit", "-m", message])
    match = re.search(r'\[(?:main|[^ ]+) ([0-9a-f]{8,})', commit_result.output)
    if match is None:
        return ""
    return match.group(1)
66
67
@pytest.fixture()
def repo(tmp_path: pathlib.Path) -> pathlib.Path:
    """Initialised repo with two commits on ``main``.

    The first commit adds ``a.py``; the second adds ``b.py``.  Returns the
    repository root (pytest's ``tmp_path``).
    """
    # Reuse the shared helper so the chdir/restore logic lives in one place.
    _invoke(tmp_path, ["init"])
    (tmp_path / "a.py").write_text("x = 1\n")
    _commit(tmp_path, "initial")
    (tmp_path / "b.py").write_text("y = 2\n")
    _commit(tmp_path, "add b")
    return tmp_path
82
83
@pytest.fixture()
def c1_id(repo: pathlib.Path) -> str:
    """Full commit ID of the first commit (HEAD~1).

    Resolves to an empty string when the head commit cannot be read or
    has no parent.
    """
    from muse.core.commits import read_commit

    tip_id = get_head_commit_id(repo, "main") or ""
    tip_commit = read_commit(repo, tip_id)
    if not tip_commit:
        return ""
    return tip_commit.parent_commit_id or ""
92
93
94 # ──────────────────────────────────────────────────────────────────────────────
95 # Unit — parser flags
96 # ──────────────────────────────────────────────────────────────────────────────
97
98
class TestRegisterFlags:
    """Parser-level checks for the flags that ``register`` adds to ``reset``."""

    def _parse(self, *args: str) -> "object":
        """Parse ``reset <args>`` with a freshly built parser.

        Returns the ``argparse`` namespace produced by ``parse_args``.
        """
        import argparse
        from muse.cli.commands.reset import register

        p = argparse.ArgumentParser()
        sub = p.add_subparsers()
        register(sub)
        return p.parse_args(["reset", *args])

    def test_default_json_out_is_false(self) -> None:
        ns = self._parse("HEAD~1")
        assert ns.json_out is False

    def test_json_flag_sets_json_out(self) -> None:
        ns = self._parse("HEAD~1", "--json")
        assert ns.json_out is True

    def test_j_shorthand_sets_json_out(self) -> None:
        ns = self._parse("HEAD~1", "-j")
        assert ns.json_out is True

    # The remaining tests previously rebuilt the parser inline; they now go
    # through ``_parse`` so the construction logic exists exactly once.

    def test_dry_run_default_false(self) -> None:
        ns = self._parse("HEAD~1")
        assert ns.dry_run is False

    def test_dry_run_flag(self) -> None:
        ns = self._parse("HEAD~1", "--dry-run")
        assert ns.dry_run is True

    def test_hard_default_false(self) -> None:
        ns = self._parse("HEAD~1")
        assert ns.hard is False

    def test_hard_flag(self) -> None:
        ns = self._parse("HEAD~1", "--hard")
        assert ns.hard is True

    def test_force_default_false(self) -> None:
        ns = self._parse("HEAD~1")
        assert ns.force is False
170
171
172 # ──────────────────────────────────────────────────────────────────────────────
173 # Unit — dead-code removal
174 # ──────────────────────────────────────────────────────────────────────────────
175
176
class TestDeadCodeRemoved:
    """Guards against removed helpers reappearing in the reset command module."""

    def test_read_branch_wrapper_removed(self) -> None:
        """The reset command module must not expose a ``_read_branch`` attribute."""
        import muse.cli.commands.reset as reset_module

        wrapper_present = hasattr(reset_module, "_read_branch")
        assert not wrapper_present, (
            "_read_branch was a dead one-liner wrapper and must be deleted"
        )
184
185
186 # ──────────────────────────────────────────────────────────────────────────────
187 # Integration — error routing to stderr
188 # ──────────────────────────────────────────────────────────────────────────────
189
190
class TestErrorRouting:
    """Error paths of ``reset``: stderr routing and ref-write ordering."""

    @staticmethod
    def _drop_snapshot_of(repo: pathlib.Path, commit_id: str) -> None:
        """Delete the stored snapshot object of *commit_id*.

        Skips the calling test when the commit itself cannot be read.
        """
        from muse.core.commits import read_commit

        target = read_commit(repo, commit_id)
        if target is None:
            pytest.skip("Could not read c1 commit")
        snapshot_path(repo, target.snapshot_id).unlink(missing_ok=True)

    def test_unknown_ref_error_to_stderr(self, repo: pathlib.Path) -> None:
        result = _reset(repo, "bogus-ref")
        stderr_text = result.stderr or ""
        assert result.exit_code == 1
        assert "not found" in stderr_text.lower()
        # Whatever remains of the output once stderr is stripped must not
        # carry the error message.
        stdout_only = result.output.replace(stderr_text, "")
        assert "not found" not in stdout_only

    def test_unknown_flag_exits_nonzero(self, repo: pathlib.Path) -> None:
        result = _reset(repo, "HEAD~1", "--format", "xml")
        assert result.exit_code != 0

    def test_missing_snapshot_error_to_stderr(self, repo: pathlib.Path, c1_id: str) -> None:
        """When --hard reset target snapshot is missing, error goes to stderr."""
        self._drop_snapshot_of(repo, c1_id)

        result = _reset(repo, c1_id, "--hard")
        assert result.exit_code != 0
        stderr_lower = (result.stderr or "").lower()
        assert "not found" in stderr_lower or "snapshot" in stderr_lower

    def test_snapshot_pre_validated_before_branch_ref_written(
        self, repo: pathlib.Path, c1_id: str
    ) -> None:
        """Critical ordering fix: branch ref must NOT advance when snapshot is missing.

        Before the fix, write_branch_ref() was called BEFORE read_snapshot(),
        so a missing snapshot would leave the branch pointer at the new commit
        with an unrestored working tree — an inconsistent, unrecoverable state.
        """
        self._drop_snapshot_of(repo, c1_id)

        before_head = get_head_commit_id(repo, "main")
        _reset(repo, c1_id, "--hard")
        after_head = get_head_commit_id(repo, "main")

        # Branch ref must remain at the original commit — not advanced to c1.
        assert before_head == after_head, (
            "Branch ref was advanced even though snapshot was missing — "
            "this is the pre-fix ordering bug"
        )

    def test_snapshot_source_in_run_before_write_branch_ref(self) -> None:
        """Source inspection: read_snapshot must appear before write_branch_ref.

        Comment lines (those starting with #) are skipped to avoid false
        matches from documentation comments that reference function names.
        """
        import inspect
        from muse.cli.commands.reset import run

        source_text = inspect.getsource(run)
        # Keep (index, text) pairs for every line that is not a pure comment.
        executable = [
            (index, text)
            for index, text in enumerate(source_text.split("\n"))
            if not text.lstrip().startswith("#")
        ]

        def first_line_with(needle: str) -> int:
            """Index of the first kept line containing *needle*, or -1."""
            for index, text in executable:
                if needle in text:
                    return index
            return -1

        snap_lineno = first_line_with("read_snapshot(")
        write_lineno = first_line_with("write_branch_ref(")
        assert snap_lineno != -1, "read_snapshot not found in run()"
        assert write_lineno != -1, "write_branch_ref not found in run()"
        assert snap_lineno < write_lineno, (
            f"read_snapshot (line {snap_lineno}) must appear before "
            f"write_branch_ref (line {write_lineno}) in run() — "
            "this is the critical ordering fix that prevents orphaned branch refs"
        )
269
270
271 # ──────────────────────────────────────────────────────────────────────────────
272 # Integration — JSON schema stability
273 # ──────────────────────────────────────────────────────────────────────────────
274
275
class TestJsonSchema:
    """Shape and field values of the JSON emitted by ``reset --json``."""

    @staticmethod
    def _reset_json(repo: pathlib.Path, ref: str, *flags: str) -> dict:
        """Run ``reset <ref> <flags> --json`` and return the decoded output."""
        return json.loads(_reset(repo, ref, *flags, "--json").output)

    def test_soft_reset_has_all_keys(self, repo: pathlib.Path, c1_id: str) -> None:
        result = _reset(repo, c1_id, "--json")
        assert result.exit_code == 0
        missing = JSON_REQUIRED_KEYS - set(json.loads(result.output))
        assert not missing, f"Missing keys in soft reset JSON: {missing}"

    def test_hard_reset_has_all_keys(self, repo: pathlib.Path, c1_id: str) -> None:
        result = _reset(repo, c1_id, "--hard", "--json")
        assert result.exit_code == 0
        missing = JSON_REQUIRED_KEYS - set(json.loads(result.output))
        assert not missing, f"Missing keys in hard reset JSON: {missing}"

    def test_dry_run_has_all_keys(self, repo: pathlib.Path, c1_id: str) -> None:
        result = _reset(repo, c1_id, "--dry-run", "--json")
        assert result.exit_code == 0
        missing = JSON_REQUIRED_KEYS - set(json.loads(result.output))
        assert not missing, f"Missing keys in dry-run JSON: {missing}"

    def test_soft_mode_is_soft(self, repo: pathlib.Path, c1_id: str) -> None:
        payload = self._reset_json(repo, c1_id)
        assert payload["mode"] == "soft"

    def test_hard_mode_is_hard(self, repo: pathlib.Path, c1_id: str) -> None:
        payload = self._reset_json(repo, c1_id, "--hard")
        assert payload["mode"] == "hard"

    def test_dry_run_flag_is_true(self, repo: pathlib.Path, c1_id: str) -> None:
        payload = self._reset_json(repo, c1_id, "--dry-run")
        assert payload["dry_run"] is True

    def test_live_reset_dry_run_flag_is_false(self, repo: pathlib.Path, c1_id: str) -> None:
        payload = self._reset_json(repo, c1_id)
        assert payload["dry_run"] is False

    def test_ref_field_matches_input(self, repo: pathlib.Path, c1_id: str) -> None:
        payload = self._reset_json(repo, c1_id)
        assert payload["ref"] == c1_id

    def test_snapshot_id_is_sha256(self, repo: pathlib.Path, c1_id: str) -> None:
        sid = self._reset_json(repo, c1_id)["snapshot_id"]
        # split_id yields a pair; only the second element (the hex digest) is checked.
        hex_part = split_id(sid)[1]
        assert len(hex_part) == 64, f"Expected 64-char hex after prefix, got {len(hex_part)}: {sid!r}"
        assert set(hex_part) <= set("0123456789abcdef")

    def test_new_commit_id_is_sha256(self, repo: pathlib.Path, c1_id: str) -> None:
        nid = self._reset_json(repo, c1_id)["new_commit_id"]
        hex_part = split_id(nid)[1]
        assert len(hex_part) == 64, f"Expected 64-char hex after prefix, got {len(hex_part)}: {nid!r}"

    def test_old_commit_id_was_head(self, repo: pathlib.Path, c1_id: str) -> None:
        # Capture HEAD first: the live reset below moves it.
        head_before = get_head_commit_id(repo, "main")
        payload = self._reset_json(repo, c1_id)
        assert payload["old_commit_id"] == head_before

    def test_branch_field_is_current_branch(self, repo: pathlib.Path, c1_id: str) -> None:
        payload = self._reset_json(repo, c1_id)
        assert payload["branch"] == "main"
348
349
350 # ──────────────────────────────────────────────────────────────────────────────
351 # Integration — --dry-run
352 # ──────────────────────────────────────────────────────────────────────────────
353
354
class TestDryRun:
    """``reset --dry-run`` must report without changing refs, files or reflog."""

    def test_dry_run_does_not_advance_branch(self, repo: pathlib.Path, c1_id: str) -> None:
        head_before = get_head_commit_id(repo, "main")
        _reset(repo, c1_id, "--dry-run")
        assert get_head_commit_id(repo, "main") == head_before

    def test_dry_run_does_not_modify_workdir(self, repo: pathlib.Path, c1_id: str) -> None:
        tracked_file = repo / "b.py"
        original_text = tracked_file.read_text()
        _reset(repo, c1_id, "--dry-run", "--hard")
        assert tracked_file.read_text() == original_text

    def test_dry_run_exit_code_zero(self, repo: pathlib.Path, c1_id: str) -> None:
        outcome = _reset(repo, c1_id, "--dry-run")
        assert outcome.exit_code == 0

    def test_dry_run_invalid_ref_exits_1(self, repo: pathlib.Path) -> None:
        outcome = _reset(repo, "nonexistent-ref", "--dry-run")
        assert outcome.exit_code == 1

    def test_dry_run_json_shows_would_be_commit(self, repo: pathlib.Path, c1_id: str) -> None:
        outcome = _reset(repo, c1_id, "--dry-run", "--json")
        reported_id = json.loads(outcome.output)["new_commit_id"]
        assert reported_id == c1_id or reported_id.startswith(c1_id)

    def test_dry_run_text_mentions_would(self, repo: pathlib.Path, c1_id: str) -> None:
        outcome = _reset(repo, c1_id, "--dry-run")
        lowered = outcome.output.lower()
        assert "dry-run" in lowered or "would" in lowered

    def test_dry_run_does_not_write_reflog(self, repo: pathlib.Path, c1_id: str) -> None:
        from muse.core.reflog import read_reflog

        entries_before = len(read_reflog(repo, "main"))
        _reset(repo, c1_id, "--dry-run")
        entries_after = len(read_reflog(repo, "main"))
        assert entries_before == entries_after
391
392
393 # ──────────────────────────────────────────────────────────────────────────────
394 # Integration — soft reset
395 # ──────────────────────────────────────────────────────────────────────────────
396
397
class TestSoftReset:
    """Default (soft) reset: the branch moves, the working tree stays."""

    def test_soft_advances_branch_to_target(self, repo: pathlib.Path, c1_id: str) -> None:
        _reset(repo, c1_id)
        tip = get_head_commit_id(repo, "main")
        assert tip is not None
        assert tip.startswith(c1_id)

    def test_soft_preserves_working_tree(self, repo: pathlib.Path, c1_id: str) -> None:
        kept_file = repo / "b.py"
        text_before = kept_file.read_text()
        _reset(repo, c1_id)
        assert kept_file.read_text() == text_before

    def test_soft_reflog_entry_written(self, repo: pathlib.Path, c1_id: str) -> None:
        from muse.core.reflog import read_reflog

        entries_before = len(read_reflog(repo, "main"))
        _reset(repo, c1_id)
        entries_after = len(read_reflog(repo, "main"))
        assert entries_after > entries_before

    def test_soft_text_output_has_commit_id(self, repo: pathlib.Path, c1_id: str) -> None:
        outcome = _reset(repo, c1_id)
        short_id = c1_id[:8]
        assert short_id in outcome.output
420
421
422 # ──────────────────────────────────────────────────────────────────────────────
423 # Integration — hard reset
424 # ──────────────────────────────────────────────────────────────────────────────
425
426
class TestHardReset:
    """``reset --hard``: the branch moves and the working tree is restored."""

    def test_hard_advances_branch_to_target(self, repo: pathlib.Path, c1_id: str) -> None:
        outcome = _reset(repo, c1_id, "--hard")
        assert outcome.exit_code == 0
        tip = get_head_commit_id(repo, "main")
        assert tip is not None
        assert tip.startswith(c1_id)

    def test_hard_restores_workdir(self, repo: pathlib.Path, c1_id: str) -> None:
        second_commit_file = repo / "b.py"
        assert second_commit_file.exists()
        outcome = _reset(repo, c1_id, "--hard")
        assert outcome.exit_code == 0
        # b.py was added in the second commit; resetting to c1 removes it
        assert not second_commit_file.exists()

    def test_hard_text_output_shows_head_is_now(self, repo: pathlib.Path, c1_id: str) -> None:
        text = _reset(repo, c1_id, "--hard").output
        assert "HEAD is now at" in text or c1_id[:8] in text

    def test_hard_uses_message_first_line(self, repo: pathlib.Path, c1_id: str) -> None:
        """Text output shows only the first line of a multiline commit message."""
        (repo / "x.py").write_text("x=1\n")
        multiline_id = _commit(repo, "first line\n\nmore detail here")
        # NOTE(review): _commit returns "" when it cannot parse the commit
        # output, and the exit code of the final reset is not checked, so this
        # test can pass without the reset having succeeded — confirm intent.
        # Go back to c1 so we can reset to the multiline commit
        _reset(repo, c1_id)
        outcome = _reset(repo, multiline_id, "--hard")
        assert "more detail here" not in outcome.output
453
454
455 # ──────────────────────────────────────────────────────────────────────────────
456 # Security — ANSI injection
457 # ──────────────────────────────────────────────────────────────────────────────
458
459
class TestSecurityAnsi:
    """ANSI escape sequences must never reach the terminal via ``reset`` output."""

    # Control Sequence Introducer: the prefix of an ANSI escape sequence.
    ESC = "\x1b["

    def test_unknown_ref_sanitized_in_stderr(self, repo: pathlib.Path) -> None:
        malicious_ref = f"{self.ESC}31mmalicious{self.ESC}0m"
        outcome = _reset(repo, malicious_ref)
        stderr_text = outcome.stderr or ""
        assert self.ESC not in stderr_text

    def test_unknown_flag_with_ansi_exits_nonzero(self, repo: pathlib.Path) -> None:
        malicious_fmt = f"{self.ESC}31mxml{self.ESC}0m"
        outcome = _reset(repo, "HEAD~1", "--format", malicious_fmt)
        assert outcome.exit_code != 0

    def test_no_ansi_in_stdout_on_error(self, repo: pathlib.Path) -> None:
        malicious_ref = f"{self.ESC}31mmalicious{self.ESC}0m"
        outcome = _reset(repo, malicious_ref)
        # stdout must be clean — errors go to stderr
        stderr_text = outcome.stderr or ""
        stdout_only = outcome.output.replace(stderr_text, "")
        assert self.ESC not in stdout_only

    def test_exc_sanitized_in_branch_validation(self, repo: pathlib.Path) -> None:
        """sanitize_display(str(exc)) must be used, not bare f'{exc}'."""
        import inspect
        from muse.cli.commands.reset import run

        # Source-level check: the sanitising call must appear verbatim in run().
        run_source = inspect.getsource(run)
        assert "sanitize_display(str(exc))" in run_source

    def test_ref_sanitized_in_not_found_message(self) -> None:
        """sanitize_display(ref) must be used in the not-found error message."""
        import inspect
        from muse.cli.commands.reset import run

        run_source = inspect.getsource(run)
        assert "sanitize_display(ref)" in run_source

    def test_soft_text_output_sanitizes_branch(self, repo: pathlib.Path, c1_id: str) -> None:
        text = _reset(repo, c1_id).output
        assert self.ESC not in text

    def test_hard_text_output_sanitizes_message(self, repo: pathlib.Path, c1_id: str) -> None:
        text = _reset(repo, c1_id, "--hard").output
        assert self.ESC not in text
504
505
506 # ──────────────────────────────────────────────────────────────────────────────
507 # Integration — get_head_commit_id replaces ref_file.read_text()
508 # ──────────────────────────────────────────────────────────────────────────────
509
510
class TestRefAbstraction:
    """``run()`` must resolve HEAD through ``get_head_commit_id``."""

    def test_no_direct_ref_file_read(self) -> None:
        """run() must use get_head_commit_id(), not read ref_file directly."""
        import inspect
        from muse.cli.commands.reset import run

        run_source = inspect.getsource(run)
        reads_ref_file = "ref_file.read_text" in run_source
        assert not reads_ref_file, (
            "Direct ref_file.read_text() bypasses the get_head_commit_id "
            "abstraction layer and is a TOCTOU vulnerability"
        )
        assert "get_head_commit_id" in run_source

    def test_old_commit_id_correct_on_first_commit(self, repo: pathlib.Path) -> None:
        """old_commit_id in JSON must match HEAD before the reset."""
        # Record HEAD before the reset moves it.
        head_before = get_head_commit_id(repo, "main")
        outcome = _reset(repo, "HEAD~1", "--json")
        payload = json.loads(outcome.output)
        assert payload["old_commit_id"] == head_before
530
531
532 # ──────────────────────────────────────────────────────────────────────────────
533 # Stress
534 # ──────────────────────────────────────────────────────────────────────────────
535
536
@pytest.mark.slow
class TestStress:
    """Timing and concurrency checks for ``reset`` (marked ``slow``)."""

    def test_soft_reset_across_50_commits(self, repo: pathlib.Path) -> None:
        """Soft-reset across 50 commits must complete under 5s."""
        # Add 48 more commits (we already have 2)
        for i in range(48):
            (repo / f"f{i:03d}.py").write_text(f"x={i}\n")
            _commit(repo, f"commit {i}")

        from muse.core.commits import read_commit

        # Walk to the 10th commit from the end
        target_id = get_head_commit_id(repo, "main") or ""
        for _ in range(10):
            c = read_commit(repo, target_id)
            if c and c.parent_commit_id:
                target_id = c.parent_commit_id

        t0 = time.perf_counter()
        result = _reset(repo, target_id)
        elapsed = (time.perf_counter() - t0) * 1000  # milliseconds
        assert result.exit_code == 0
        assert elapsed < 5000, f"50-commit soft reset took {elapsed:.0f}ms (limit 5s)"

    def test_hard_reset_with_100_files(self, repo: pathlib.Path, c1_id: str) -> None:
        """Hard-reset restoring 100 files must complete under 5s."""
        for i in range(100):
            (repo / f"g{i:03d}.py").write_text(f"y={i}\n")
        _commit(repo, "add 100 files")
        head_id = get_head_commit_id(repo, "main") or ""

        # Reset back to c1 (removes 101 files) then back to head (restores them)
        t0 = time.perf_counter()
        r1 = _reset(repo, c1_id, "--hard")
        r2 = _reset(repo, head_id, "--hard")
        elapsed = (time.perf_counter() - t0) * 1000  # milliseconds
        assert r1.exit_code == 0
        assert r2.exit_code == 0
        assert elapsed < 5000, f"100-file hard reset cycle took {elapsed:.0f}ms"

    def test_dry_run_50_commits_fast(self, repo: pathlib.Path) -> None:
        """Dry-run across 50 commits must complete under 2s."""
        for i in range(48):
            (repo / f"h{i:03d}.py").write_text(f"z={i}\n")
            _commit(repo, f"commit {i}")

        t0 = time.perf_counter()
        result = _reset(repo, "HEAD~1", "--dry-run")
        elapsed = (time.perf_counter() - t0) * 1000  # milliseconds
        assert result.exit_code == 0
        assert elapsed < 2000, f"dry-run took {elapsed:.0f}ms (limit 2s)"

    def test_concurrent_resets_separate_repos(self, tmp_path: pathlib.Path) -> None:
        """Multiple repos resetting concurrently must not interfere."""
        errors: list[str] = []

        def run_one(idx: int) -> None:
            """Build a two-commit repo via the ``muse`` executable and soft-reset it."""
            repo_dir = tmp_path / f"repo_{idx}"
            repo_dir.mkdir()
            subprocess.run(["muse", "init"], cwd=str(repo_dir), capture_output=True)
            (repo_dir / "a.py").write_text(f"x={idx}\n")
            subprocess.run(
                ["muse", "commit", "-m", f"c1_{idx}"],
                cwd=str(repo_dir), capture_output=True,
            )
            (repo_dir / "b.py").write_text(f"y={idx}\n")
            subprocess.run(
                ["muse", "commit", "-m", f"c2_{idx}"],
                cwd=str(repo_dir), capture_output=True,
            )
            r = subprocess.run(
                ["muse", "reset", "HEAD~1", "--json"],
                cwd=str(repo_dir), capture_output=True, text=True,
            )
            if r.returncode != 0:
                errors.append(f"repo_{idx}: exit={r.returncode}, err={r.stderr[:60]}")
                return
            try:
                data = json.loads(r.stdout)
                if data["mode"] != "soft":
                    errors.append(f"repo_{idx}: unexpected mode {data['mode']}")
                if data["dry_run"] is not False:
                    errors.append(f"repo_{idx}: dry_run not False")
            except Exception as e:
                errors.append(f"repo_{idx}: parse error {e}")

        def do_reset(idx: int) -> None:
            # An exception escaping a thread target never reaches the main
            # thread, so the test would pass with an empty ``errors`` list.
            # Deliberately broad: record every failure so the final assert
            # sees it.
            try:
                run_one(idx)
            except Exception as exc:
                errors.append(f"repo_{idx}: unexpected {type(exc).__name__}: {exc}")

        threads = [threading.Thread(target=do_reset, args=(i,)) for i in range(6)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()
        # Join outside the f-string: a backslash inside an f-string
        # replacement field is a SyntaxError before Python 3.12.
        assert not errors, "Concurrent reset errors:\n" + "\n".join(errors)
File History 1 commit
sha256:3788deb0ce8d6f235a23185f7f5cd65785ecdb41b0a7f2b0107adebf45c09221 update tests/test_cmd_reset_hardening.py and tests/test_cmd… Human 14 days ago