gabriel / muse public
test_cmd_conflicts.py python
762 lines 32.6 KB
Raw
sha256:1c4b3e3a9a1f300774c3ee662b572a698d5fd405bf765a71e6011a2e9c3eaaaa feat: Muse — version control for the agent era Human 72 days ago
1 """Tests for ``muse conflicts``.
2
3 Coverage tiers
4 --------------
5 Unit — parser flags, _parse_conflict, _use_color, dead-code removal.
6 Integration — no-merge state, all filters, count, exit-code, JSON schema.
7 End-to-end — full CLI invocations: text and JSON output.
8 Security — ANSI injection in conflict paths and branch names.
9 Stress — 1 000 and 10 000 conflict entries, concurrent reads.
10 """
11
12 from __future__ import annotations
13
14 import json
15 import os
16 import pathlib
17 import subprocess
18 import threading
19 import time
20 from typing import TYPE_CHECKING
21
22 import pytest
23
24 from tests.cli_test_helper import CliRunner, InvokeResult
25 from muse.cli.commands.conflicts import _ConflictInfo
26 from muse.core.merge_engine import write_merge_state
27 from muse.core.store import get_head_commit_id
28
29 if TYPE_CHECKING:
30 import argparse
31
# Shared CLI runner: the ``_invoke`` helper and the ``repo`` fixture below
# execute every command through this single instance.
runner = CliRunner()
33
34 # ──────────────────────────────────────────────────────────────────────────────
35 # Helpers
36 # ──────────────────────────────────────────────────────────────────────────────
37
38
def _invoke(repo: pathlib.Path, args: list[str]) -> InvokeResult:
    """Run the CLI with *args* while the working directory is *repo*.

    The previous working directory is restored afterwards, whether or not
    the invocation raises.  Note that ``os.chdir`` affects the whole
    process, not just the calling thread.
    """
    previous_cwd = os.getcwd()
    try:
        os.chdir(repo)
        outcome = runner.invoke(None, args)
        return outcome
    finally:
        os.chdir(previous_cwd)
46
47
def _conflicts(repo: pathlib.Path, *extra: str) -> InvokeResult:
    """Invoke ``conflicts`` inside *repo*, appending any *extra* CLI arguments."""
    argv = ["conflicts"]
    argv.extend(extra)
    return _invoke(repo, argv)
50
51
def _commit(repo: pathlib.Path, *extra: str) -> InvokeResult:
    """Invoke ``commit`` inside *repo*, appending any *extra* CLI arguments."""
    argv = ["commit"]
    argv.extend(extra)
    return _invoke(repo, argv)
54
55
@pytest.fixture()
def repo(tmp_path: pathlib.Path) -> pathlib.Path:
    """Initialised repo with one commit on ``main``, no merge in progress."""
    # ``_invoke`` performs the same save-cwd / chdir / restore dance that the
    # ``init`` call needs, so reuse it rather than spelling it out again.
    _invoke(tmp_path, ["init"])

    tracked_file = tmp_path / "a.py"
    tracked_file.write_text("x = 1\n")
    _commit(tmp_path, "-m", "initial")
    return tmp_path
68
69
@pytest.fixture()
def conflict_repo(repo: pathlib.Path) -> pathlib.Path:
    """Repo with an active merge state containing 2 conflicts.

    One conflict is whole-file (``a.py``) and the other is symbol-level
    (``src/b.py::Foo.bar``); the merged-in branch is named ``feat``.
    """
    _write_state(
        repo,
        paths=["a.py", "src/b.py::Foo.bar"],
        branch="feat",
    )
    return repo
85
86
def _write_state(
    repo: pathlib.Path,
    *,
    paths: list[str],
    branch: str = "feat",
) -> None:
    """Write a merge state for *repo* listing *paths* as conflicts.

    The head commit of ``main`` (or ``""`` when there is none) is used for
    the base, ours and theirs commits alike; *branch* is recorded as the
    branch being merged in.
    """
    head_sha = get_head_commit_id(repo, "main") or ""
    commits = dict.fromkeys(
        ("base_commit", "ours_commit", "theirs_commit"), head_sha
    )
    write_merge_state(
        repo,
        conflict_paths=paths,
        other_branch=branch,
        **commits,
    )
102
103
104 # ──────────────────────────────────────────────────────────────────────────────
105 # Unit — parser flags
106 # ──────────────────────────────────────────────────────────────────────────────
107
108
class TestRegisterFlags:
    """Argument-parser wiring of the ``conflicts`` sub-command."""

    def _parse(self, *args: str) -> "argparse.Namespace":
        """Parse ``conflicts <args>`` with a throwaway parser that ``register`` was applied to."""
        import argparse

        from muse.cli.commands.conflicts import register

        parser = argparse.ArgumentParser()
        register(parser.add_subparsers())
        return parser.parse_args(["conflicts", *args])

    def test_default_fmt_is_text(self) -> None:
        assert self._parse().fmt == "text"

    def test_json_flag_sets_fmt(self) -> None:
        assert self._parse("--json").fmt == "json"

    def test_format_json_flag(self) -> None:
        assert self._parse("--format", "json").fmt == "json"

    def test_default_filter_is_all(self) -> None:
        assert self._parse().kind_filter == "all"

    def test_filter_symbol(self) -> None:
        assert self._parse("--filter", "symbol").kind_filter == "symbol"

    def test_filter_file(self) -> None:
        assert self._parse("--filter", "file").kind_filter == "file"

    def test_filter_deleted(self) -> None:
        assert self._parse("--filter", "deleted").kind_filter == "deleted"

    def test_filter_modified(self) -> None:
        assert self._parse("--filter", "modified").kind_filter == "modified"

    def test_count_flag(self) -> None:
        assert self._parse("--count").count is True

    def test_count_short_flag(self) -> None:
        assert self._parse("-n").count is True

    def test_exit_code_flag(self) -> None:
        assert self._parse("--exit-code").exit_code is True

    def test_exit_code_short_flag(self) -> None:
        assert self._parse("-z").exit_code is True

    def test_exit_code_default_false(self) -> None:
        assert self._parse().exit_code is False
171
172
173 # ──────────────────────────────────────────────────────────────────────────────
174 # Unit — _parse_conflict
175 # ──────────────────────────────────────────────────────────────────────────────
176
177
class TestParseConflict:
    """Unit checks for ``_parse_conflict`` on file and ``file::symbol`` paths."""

    def _parse(self, path: str) -> _ConflictInfo:
        """Call ``_parse_conflict`` on *path* (imported lazily)."""
        from muse.cli.commands.conflicts import _parse_conflict

        return _parse_conflict(path)

    def test_file_conflict_kind(self) -> None:
        raw = "src/billing.py"
        info = self._parse(raw)
        assert info["kind"] == "file"
        assert info["file"] == "src/billing.py"
        assert info["symbol"] is None
        assert info["path"] == "src/billing.py"

    def test_symbol_conflict_kind(self) -> None:
        raw = "src/billing.py::Invoice.charge"
        info = self._parse(raw)
        assert info["kind"] == "symbol"
        assert info["file"] == "src/billing.py"
        assert info["symbol"] == "Invoice.charge"
        assert info["path"] == "src/billing.py::Invoice.charge"

    def test_double_colon_in_symbol(self) -> None:
        """Only the first ``::`` splits file from symbol."""
        info = self._parse("src/foo.py::A::B")
        assert info["kind"] == "symbol"
        assert info["file"] == "src/foo.py"
        assert info["symbol"] == "A::B"

    def test_ansi_in_path_sanitized(self) -> None:
        info = self._parse("\x1b[31mevil/path\x1b[0m")
        for field in ("path", "file"):
            assert "\x1b" not in (info[field] or "")

    def test_ansi_in_symbol_sanitized(self) -> None:
        info = self._parse("src/a.py::\x1b[31mEvil\x1b[0m")
        symbol = info["symbol"] or ""
        assert "\x1b" not in symbol

    def test_empty_path_kind_file(self) -> None:
        info = self._parse("")
        assert info["kind"] == "file"
217
218
219 # ──────────────────────────────────────────────────────────────────────────────
220 # Unit — _use_color
221 # ──────────────────────────────────────────────────────────────────────────────
222
223
class TestUseColor:
    """Unit checks for ``_use_color`` under different environment settings.

    These tests only manipulate ``NO_COLOR`` and ``TERM``; they do not fake
    a TTY, so a ``True`` result is never asserted here.
    """

    def test_no_color_env_disables_color(self, monkeypatch: pytest.MonkeyPatch) -> None:
        """A non-empty ``NO_COLOR`` must switch colour off."""
        from muse.cli.commands.conflicts import _use_color

        monkeypatch.setenv("NO_COLOR", "1")
        assert _use_color() is False

    def test_term_dumb_disables_color(self, monkeypatch: pytest.MonkeyPatch) -> None:
        """``TERM=dumb`` must switch colour off."""
        from muse.cli.commands.conflicts import _use_color

        # Drop any NO_COLOR inherited from the developer's/CI environment so
        # that TERM is the only colour-related variable influencing the result.
        monkeypatch.delenv("NO_COLOR", raising=False)
        monkeypatch.setenv("TERM", "dumb")
        assert _use_color() is False

    def test_no_color_empty_string_disables(self, monkeypatch: pytest.MonkeyPatch) -> None:
        """Smoke test: with ``NO_COLOR`` and ``TERM`` unset, ``_use_color`` returns a bool.

        Despite its name, this test does not set ``NO_COLOR=""``; it removes
        both variables and only checks that the call succeeds.
        """
        from muse.cli.commands.conflicts import _use_color

        # NOTE(review): the NO_COLOR convention (no-color.org) is worded as
        # "present and not an empty string"; confirm how ``_use_color`` treats
        # NO_COLOR="" before adding an assertion for that case.
        monkeypatch.delenv("NO_COLOR", raising=False)
        monkeypatch.delenv("TERM", raising=False)
        # Cannot assert True here since stdout is not a TTY in tests, but
        # we can at least confirm the function doesn't crash.
        result = _use_color()
        assert isinstance(result, bool)
249
250
251 # ──────────────────────────────────────────────────────────────────────────────
252 # Integration — no merge in progress
253 # ──────────────────────────────────────────────────────────────────────────────
254
255
class TestNoMerge:
    """``muse conflicts`` in a repo that has no merge in progress."""

    def test_no_merge_exits_0(self, repo: pathlib.Path) -> None:
        assert _conflicts(repo).exit_code == 0

    def test_no_merge_text_message(self, repo: pathlib.Path) -> None:
        assert "No merge in progress" in _conflicts(repo).output

    def test_no_merge_json_schema(self, repo: pathlib.Path) -> None:
        payload = json.loads(_conflicts(repo, "--json").output)
        assert payload["merge_in_progress"] is False
        assert payload["conflict_count"] == 0
        assert payload["conflicts"] == []
        # Every merge-related field is null when nothing is being merged.
        for key in ("merge_from", "ours_commit", "theirs_commit", "base_commit"):
            assert payload[key] is None

    def test_no_merge_json_next_steps_empty(self, repo: pathlib.Path) -> None:
        payload = json.loads(_conflicts(repo, "--json").output)
        assert payload["next_steps"] == {}

    def test_no_merge_count_is_zero(self, repo: pathlib.Path) -> None:
        printed = _conflicts(repo, "--count").output
        assert printed.strip() == "0"

    def test_no_merge_exit_code_is_0(self, repo: pathlib.Path) -> None:
        assert _conflicts(repo, "--exit-code").exit_code == 0
288
289
290 # ──────────────────────────────────────────────────────────────────────────────
291 # Integration — active merge with conflicts
292 # ──────────────────────────────────────────────────────────────────────────────
293
294
class TestWithConflicts:
    """``muse conflicts`` against the two-conflict ``conflict_repo`` fixture."""

    def test_exits_0_by_default(self, conflict_repo: pathlib.Path) -> None:
        assert _conflicts(conflict_repo).exit_code == 0

    def test_text_shows_conflict_count(self, conflict_repo: pathlib.Path) -> None:
        assert "2" in _conflicts(conflict_repo).output

    def test_text_shows_merge_from(self, conflict_repo: pathlib.Path) -> None:
        assert "feat" in _conflicts(conflict_repo).output

    def test_text_shows_file_paths(self, conflict_repo: pathlib.Path) -> None:
        text = _conflicts(conflict_repo).output
        for expected in ("a.py", "src/b.py"):
            assert expected in text

    def test_json_schema_completeness(self, conflict_repo: pathlib.Path) -> None:
        payload = json.loads(_conflicts(conflict_repo, "--json").output)
        assert payload["merge_in_progress"] is True
        assert payload["merge_from"] == "feat"
        assert payload["conflict_count"] == 2
        assert payload["total_conflict_count"] == 2
        for key in ("ours_commit", "theirs_commit", "base_commit"):
            assert key in payload
        assert isinstance(payload["conflicts"], list)
        assert isinstance(payload["next_steps"], dict)

    def test_json_conflict_entry_schema(self, conflict_repo: pathlib.Path) -> None:
        payload = json.loads(_conflicts(conflict_repo, "--json").output)
        for entry in payload["conflicts"]:
            for key in ("path", "file", "symbol", "kind"):
                assert key in entry

    def test_json_file_conflict_has_null_symbol(self, conflict_repo: pathlib.Path) -> None:
        payload = json.loads(_conflicts(conflict_repo, "--json").output)
        whole_file = [c for c in payload["conflicts"] if c["kind"] == "file"]
        assert whole_file
        assert whole_file[0]["symbol"] is None

    def test_json_symbol_conflict_has_symbol(self, conflict_repo: pathlib.Path) -> None:
        payload = json.loads(_conflicts(conflict_repo, "--json").output)
        symbol_level = [c for c in payload["conflicts"] if c["kind"] == "symbol"]
        assert symbol_level
        assert symbol_level[0]["symbol"] == "Foo.bar"

    def test_json_next_steps_has_all_keys(self, conflict_repo: pathlib.Path) -> None:
        payload = json.loads(_conflicts(conflict_repo, "--json").output)
        steps = payload["next_steps"]
        expected_keys = (
            "resolve_ours",
            "resolve_theirs",
            "resolve_all_ours",
            "resolve_all_theirs",
            "commit",
            "abort",
        )
        for key in expected_keys:
            assert key in steps
359
360
361 # ──────────────────────────────────────────────────────────────────────────────
362 # Integration — --filter
363 # ──────────────────────────────────────────────────────────────────────────────
364
365
class TestFilter:
    """Behaviour of ``--filter``, alone and combined with ``--json`` / ``--count``."""

    def test_filter_all_returns_all(self, conflict_repo: pathlib.Path) -> None:
        out = _conflicts(conflict_repo, "--filter", "all", "--json").output
        assert json.loads(out)["conflict_count"] == 2

    def test_filter_symbol_returns_only_symbols(self, conflict_repo: pathlib.Path) -> None:
        out = _conflicts(conflict_repo, "--filter", "symbol", "--json").output
        payload = json.loads(out)
        assert payload["conflict_count"] == 1
        assert payload["conflicts"][0]["kind"] == "symbol"

    def test_filter_file_returns_only_files(self, conflict_repo: pathlib.Path) -> None:
        out = _conflicts(conflict_repo, "--filter", "file", "--json").output
        payload = json.loads(out)
        assert payload["conflict_count"] == 1
        assert payload["conflicts"][0]["kind"] == "file"

    def test_filter_deleted_returns_file_conflicts(self, conflict_repo: pathlib.Path) -> None:
        """'deleted' maps to whole-file conflicts (one side removed the file)."""
        out = _conflicts(conflict_repo, "--filter", "deleted", "--json").output
        payload = json.loads(out)
        assert payload["conflict_count"] == 1
        assert all(c["kind"] == "file" for c in payload["conflicts"])

    def test_filter_modified_returns_symbol_conflicts(self, conflict_repo: pathlib.Path) -> None:
        """'modified' maps to symbol-level edit conflicts."""
        out = _conflicts(conflict_repo, "--filter", "modified", "--json").output
        payload = json.loads(out)
        assert payload["conflict_count"] == 1
        assert all(c["kind"] == "symbol" for c in payload["conflicts"])

    def test_filter_total_count_reflects_all(self, conflict_repo: pathlib.Path) -> None:
        """total_conflict_count is always the unfiltered total."""
        out = _conflicts(conflict_repo, "--filter", "symbol", "--json").output
        payload = json.loads(out)
        assert payload["total_conflict_count"] == 2
        assert payload["conflict_count"] == 1

    def test_filter_no_match_exits_0(self, repo: pathlib.Path) -> None:
        # The state holds a single whole-file conflict, so ``symbol`` matches nothing.
        _write_state(repo, paths=["a.py"])
        outcome = _conflicts(repo, "--filter", "symbol")
        assert outcome.exit_code == 0
        assert "No conflicts match" in outcome.output

    def test_filter_symbol_empty_exits_0(self, repo: pathlib.Path) -> None:
        _write_state(repo, paths=["a.py"])
        out = _conflicts(repo, "--filter", "symbol", "--json").output
        assert json.loads(out)["conflict_count"] == 0

    def test_filter_symbol_count_respects_filter(self, conflict_repo: pathlib.Path) -> None:
        """--filter symbol --count must count only symbol conflicts."""
        printed = _conflicts(conflict_repo, "--filter", "symbol", "--count").output
        assert printed.strip() == "1"

    def test_filter_file_count_respects_filter(self, conflict_repo: pathlib.Path) -> None:
        printed = _conflicts(conflict_repo, "--filter", "file", "--count").output
        assert printed.strip() == "1"

    def test_filter_deleted_count_respects_filter(self, conflict_repo: pathlib.Path) -> None:
        printed = _conflicts(conflict_repo, "--filter", "deleted", "--count").output
        assert printed.strip() == "1"

    def test_filter_modified_count_respects_filter(self, conflict_repo: pathlib.Path) -> None:
        printed = _conflicts(conflict_repo, "--filter", "modified", "--count").output
        assert printed.strip() == "1"

    def test_filter_all_count(self, conflict_repo: pathlib.Path) -> None:
        printed = _conflicts(conflict_repo, "--count").output
        assert printed.strip() == "2"
437
438
439 # ──────────────────────────────────────────────────────────────────────────────
440 # Integration — --count
441 # ──────────────────────────────────────────────────────────────────────────────
442
443
class TestCount:
    """Behaviour of ``--count``, alone and combined with ``--exit-code`` / ``--filter``."""

    def test_count_is_numeric(self, conflict_repo: pathlib.Path) -> None:
        printed = _conflicts(conflict_repo, "--count").output.strip()
        assert printed.isdigit()

    def test_count_matches_json(self, conflict_repo: pathlib.Path) -> None:
        counted = _conflicts(conflict_repo, "--count")
        as_json = _conflicts(conflict_repo, "--json")
        payload = json.loads(as_json.output)
        assert int(counted.output.strip()) == payload["conflict_count"]

    def test_count_no_merge(self, repo: pathlib.Path) -> None:
        printed = _conflicts(repo, "--count").output
        assert printed.strip() == "0"

    def test_count_exit_code_zero_when_clean(self, repo: pathlib.Path) -> None:
        assert _conflicts(repo, "--count").exit_code == 0

    def test_count_exit_code_with_exit_code_flag(self, conflict_repo: pathlib.Path) -> None:
        outcome = _conflicts(conflict_repo, "--count", "--exit-code")
        assert outcome.exit_code == 1

    def test_count_exit_code_zero_when_filtered_to_zero(self, repo: pathlib.Path) -> None:
        # Only a whole-file conflict exists, so the ``symbol`` filter leaves nothing.
        _write_state(repo, paths=["a.py"])
        outcome = _conflicts(repo, "--filter", "symbol", "--count", "--exit-code")
        assert outcome.output.strip() == "0"
        assert outcome.exit_code == 0
472
473
474 # ──────────────────────────────────────────────────────────────────────────────
475 # Integration — --exit-code
476 # ──────────────────────────────────────────────────────────────────────────────
477
478
class TestExitCode:
    """Behaviour of ``--exit-code`` across merge states and output formats."""

    def test_exit_code_1_when_conflicts(self, conflict_repo: pathlib.Path) -> None:
        assert _conflicts(conflict_repo, "--exit-code").exit_code == 1

    def test_exit_code_0_when_no_merge(self, repo: pathlib.Path) -> None:
        assert _conflicts(repo, "--exit-code").exit_code == 0

    def test_exit_code_0_when_filter_matches_zero(self, repo: pathlib.Path) -> None:
        # Whole-file conflict only; the ``symbol`` filter matches nothing.
        _write_state(repo, paths=["a.py"])
        outcome = _conflicts(repo, "--filter", "symbol", "--exit-code")
        assert outcome.exit_code == 0

    def test_exit_code_1_with_json(self, conflict_repo: pathlib.Path) -> None:
        outcome = _conflicts(conflict_repo, "--exit-code", "--json")
        assert outcome.exit_code == 1
        payload = json.loads(outcome.output)
        assert payload["conflict_count"] > 0

    def test_exit_code_still_outputs_json(self, conflict_repo: pathlib.Path) -> None:
        outcome = _conflicts(conflict_repo, "--exit-code", "--json")
        assert "conflicts" in json.loads(outcome.output)

    def test_exit_code_still_outputs_text(self, conflict_repo: pathlib.Path) -> None:
        text = _conflicts(conflict_repo, "--exit-code").output
        assert "conflict" in text.lower()

    def test_exit_code_with_all_resolved(self, repo: pathlib.Path) -> None:
        # A merge state with an empty conflict list, i.e. everything resolved.
        _write_state(repo, paths=[], branch="feat")
        assert _conflicts(repo, "--exit-code").exit_code == 0
520
521
522 # ──────────────────────────────────────────────────────────────────────────────
523 # Integration — all-resolved state
524 # ──────────────────────────────────────────────────────────────────────────────
525
526
class TestAllResolved:
    """``muse conflicts`` when a merge state exists but lists no conflicts."""

    def test_all_resolved_exits_0(self, repo: pathlib.Path) -> None:
        _write_state(repo, paths=[], branch="feat")
        assert _conflicts(repo).exit_code == 0

    def test_all_resolved_text_message(self, repo: pathlib.Path) -> None:
        _write_state(repo, paths=[], branch="feat")
        assert "All conflicts resolved" in _conflicts(repo).output

    def test_all_resolved_json_count_zero(self, repo: pathlib.Path) -> None:
        _write_state(repo, paths=[], branch="feat")
        payload = json.loads(_conflicts(repo, "--json").output)
        assert payload["conflict_count"] == 0
        assert payload["merge_in_progress"] is True
568
569
570 # ──────────────────────────────────────────────────────────────────────────────
571 # Integration — JSON schema stable keys
572 # ──────────────────────────────────────────────────────────────────────────────
573
574
class TestJsonSchemastability:
    """The ``--json`` payload exposes the same top-level keys in every state."""

    # Top-level keys that must be present whether or not a merge is active.
    REQUIRED_KEYS = {
        "merge_in_progress",
        "merge_from",
        "ours_commit",
        "theirs_commit",
        "base_commit",
        "conflict_count",
        "total_conflict_count",
        "conflicts",
        "next_steps",
    }

    def test_no_merge_has_all_keys(self, repo: pathlib.Path) -> None:
        payload = json.loads(_conflicts(repo, "--json").output)
        missing = self.REQUIRED_KEYS - set(payload)
        assert not missing, f"Missing keys in no-merge JSON: {missing}"

    def test_with_conflicts_has_all_keys(self, conflict_repo: pathlib.Path) -> None:
        payload = json.loads(_conflicts(conflict_repo, "--json").output)
        missing = self.REQUIRED_KEYS - set(payload)
        assert not missing, f"Missing keys in conflict JSON: {missing}"

    def test_no_merge_all_nullable_fields_are_null(self, repo: pathlib.Path) -> None:
        payload = json.loads(_conflicts(repo, "--json").output)
        for key in ("merge_from", "ours_commit", "theirs_commit", "base_commit"):
            assert payload[key] is None
601
602
603 # ──────────────────────────────────────────────────────────────────────────────
604 # Security — ANSI injection
605 # ──────────────────────────────────────────────────────────────────────────────
606
607
class TestSecurityAnsi:
    """ANSI escape sequences in stored merge state must not reach the output."""

    # Control Sequence Introducer: the prefix every injected sequence starts with.
    ESC = "\x1b["

    def _setup_ansi_state(self, repo: pathlib.Path) -> None:
        """Write a merge state whose paths, symbol and branch name embed ANSI codes."""
        hostile_paths = [
            f"{self.ESC}31mevil/path{self.ESC}0m",
            f"src/ok.py::{self.ESC}31mEvilSymbol{self.ESC}0m",
        ]
        hostile_branch = f"{self.ESC}31mevil-branch{self.ESC}0m"
        _write_state(repo, paths=hostile_paths, branch=hostile_branch)

    def test_ansi_in_merge_from_sanitized_in_text(self, repo: pathlib.Path) -> None:
        self._setup_ansi_state(repo)
        text = _conflicts(repo).output
        assert self.ESC not in text

    def test_ansi_in_conflict_path_sanitized_in_text(self, repo: pathlib.Path) -> None:
        self._setup_ansi_state(repo)
        text = _conflicts(repo).output
        assert self.ESC not in text

    def test_ansi_in_merge_from_sanitized_in_json(self, repo: pathlib.Path) -> None:
        self._setup_ansi_state(repo)
        payload = json.loads(_conflicts(repo, "--json").output)
        merge_from = payload.get("merge_from") or ""
        assert self.ESC not in merge_from

    def test_ansi_in_conflict_paths_sanitized_in_json(self, repo: pathlib.Path) -> None:
        self._setup_ansi_state(repo)
        payload = json.loads(_conflicts(repo, "--json").output)
        for entry in payload["conflicts"]:
            for field in ("path", "file", "symbol"):
                assert self.ESC not in (entry.get(field) or "")

    def test_ansi_in_symbol_sanitized_in_json(self, repo: pathlib.Path) -> None:
        self._setup_ansi_state(repo)
        payload = json.loads(_conflicts(repo, "--json").output)
        symbol_level = [c for c in payload["conflicts"] if c["kind"] == "symbol"]
        for conflict in symbol_level:
            assert self.ESC not in (conflict.get("symbol") or "")

    def test_no_ansi_in_count_output(self, repo: pathlib.Path) -> None:
        self._setup_ansi_state(repo)
        printed = _conflicts(repo, "--count").output
        assert self.ESC not in printed
660
661
662 # ──────────────────────────────────────────────────────────────────────────────
663 # Stress
664 # ──────────────────────────────────────────────────────────────────────────────
665
666
@pytest.mark.slow
class TestStress:
    """Large-input and concurrency checks for ``muse conflicts``.

    The timing assertions are wall-clock budgets measured around a single
    CLI invocation, so they depend on the speed of the host machine.
    """

    def test_1000_conflicts_fast(self, repo: pathlib.Path) -> None:
        """Listing 1000 conflicts must complete in under 500ms."""
        paths = [f"src/module_{i:04d}.py::Func{i}" for i in range(1000)]
        _write_state(repo, paths=paths)

        t0 = time.perf_counter()
        result = _conflicts(repo, "--json")
        elapsed = (time.perf_counter() - t0) * 1000
        data = json.loads(result.output)
        assert data["conflict_count"] == 1000
        assert elapsed < 500, f"1000 conflicts took {elapsed:.0f}ms (limit 500ms)"

    def test_10000_conflicts_fast(self, repo: pathlib.Path) -> None:
        """Listing 10 000 conflicts must complete in under 2s."""
        paths = [f"src/m_{i:05d}.py::F{i}" for i in range(10000)]
        _write_state(repo, paths=paths)

        t0 = time.perf_counter()
        result = _conflicts(repo, "--count")
        elapsed = (time.perf_counter() - t0) * 1000
        assert result.output.strip() == "10000"
        assert elapsed < 2000, f"10000 conflicts took {elapsed:.0f}ms (limit 2s)"

    def test_1000_file_conflicts_filter_fast(self, repo: pathlib.Path) -> None:
        """Filtering a 1000-entry mix (500 file + 500 symbol) must take under 500ms."""
        file_paths = [f"file_{i:04d}.py" for i in range(500)]
        symbol_paths = [f"src/s_{i:04d}.py::Sym{i}" for i in range(500)]
        _write_state(repo, paths=file_paths + symbol_paths)

        t0 = time.perf_counter()
        result = _conflicts(repo, "--filter", "file", "--count")
        elapsed = (time.perf_counter() - t0) * 1000
        assert result.output.strip() == "500"
        assert elapsed < 500, f"Filtered count of 500 took {elapsed:.0f}ms"

    def test_concurrent_reads_do_not_corrupt(self, repo: pathlib.Path) -> None:
        """Multiple threads reading conflict state must not interfere."""
        paths = [f"src/c_{i}.py::F{i}" for i in range(200)]
        _write_state(repo, paths=paths)
        errors: list[str] = []

        def read_conflicts() -> None:
            try:
                # Call the runner directly instead of ``_conflicts``: that
                # helper chdir()s on every call, and the working directory is
                # process-wide, so concurrent callers would move it under
                # each other and could run the command outside the repo.
                result = runner.invoke(None, ["conflicts", "--json"])
                data = json.loads(result.output)
                if data["conflict_count"] != 200:
                    errors.append(f"Expected 200, got {data['conflict_count']}")
            except Exception as e:  # thread boundary: record, never lose, failures
                errors.append(str(e))

        # Enter the repo once, from the main thread, for the whole fan-out and
        # restore the original directory afterwards even if a join fails.
        # NOTE(review): assumes ``runner.invoke`` itself tolerates concurrent
        # calls — confirm against ``tests.cli_test_helper``.
        saved = os.getcwd()
        os.chdir(repo)
        try:
            threads = [threading.Thread(target=read_conflicts) for _ in range(10)]
            for t in threads:
                t.start()
            for t in threads:
                t.join()
        finally:
            os.chdir(saved)
        assert not errors, "Concurrent read errors:\n" + "\n".join(errors)

    def test_concurrent_reads_separate_repos(self, tmp_path: pathlib.Path) -> None:
        """Separate repos read concurrently must not interfere."""
        errors: list[str] = []

        def check_repo(idx: int) -> None:
            # The whole body is guarded: an exception escaping a worker thread
            # would end that thread without failing the test, so every failure
            # (missing ``muse`` binary, bad JSON, ...) is recorded in ``errors``.
            try:
                repo_dir = tmp_path / f"repo_{idx}"
                repo_dir.mkdir()
                subprocess.run(["muse", "init"], cwd=str(repo_dir), capture_output=True)
                (repo_dir / "x.py").write_text(f"x={idx}\n")
                subprocess.run(
                    ["muse", "commit", "-m", f"base{idx}"],
                    cwd=str(repo_dir), capture_output=True,
                )
                paths = [f"f{i}.py" for i in range(50)]
                sha = get_head_commit_id(repo_dir, "main") or ""
                write_merge_state(
                    repo_dir, base_commit=sha, ours_commit=sha, theirs_commit=sha,
                    conflict_paths=paths, other_branch="feat",
                )
                r = subprocess.run(
                    ["muse", "conflicts", "--json"],
                    cwd=str(repo_dir), capture_output=True, text=True,
                )
                data = json.loads(r.stdout)
                if data["conflict_count"] != 50:
                    errors.append(f"repo_{idx}: expected 50, got {data['conflict_count']}")
            except Exception as e:  # thread boundary: record, never lose, failures
                errors.append(f"repo_{idx}: {e}")

        threads = [threading.Thread(target=check_repo, args=(i,)) for i in range(5)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()
        assert not errors, "Concurrent multi-repo errors:\n" + "\n".join(errors)
File History 1 commit
sha256:1c4b3e3a9a1f300774c3ee662b572a698d5fd405bf765a71e6011a2e9c3eaaaa feat: Muse — version control for the agent era Human 72 days ago