gabriel / muse public
test_cmd_merge_tree.py python
486 lines 17.9 KB
Raw
sha256:81ae324db5ad375fbfe4834c6fcb378312cafad3cc92dec5d3e5c427306621a2 fix: remove commit_exists filter from have anchors — server… Sonnet 4.6 patch 20 days ago
1 """Tests for ``muse merge-tree`` — three-way merge without working tree modification.
2
Coverage tiers:
- Unit: clean merge (no conflicts), conflicting merge, trivial merge
  (one branch unchanged), explicit --base override, JSON schema,
  working-tree isolation (no working-tree changes, no merge state file)
- Integration: --write-objects creates a snapshot that matches the merged
  manifest; text output; nonexistent branch, unknown --base, and unrelated
  histories exit nonzero; same-input determinism; conflict detection is
  independent of argument order
- Security: ANSI escape sequences in either branch name are rejected
- Stress: 50-file merge with 30% conflicts
- CLI flags: --json / -j registration on the argparse sub-command
12 """
13
14 from __future__ import annotations
15 from collections.abc import Mapping
16
17 import datetime
18 import json
19 import pathlib
20
21 import pytest
22
23 from tests.cli_test_helper import CliRunner, InvokeResult
24 from muse.core.object_store import write_object
25 from muse.core.ids import hash_commit, hash_snapshot
26 from muse.core.commits import (
27 CommitRecord,
28 write_commit,
29 )
30 from muse.core.snapshots import (
31 SnapshotRecord,
32 write_snapshot,
33 )
34 from muse.core.types import Manifest, blob_id
35 from muse.core.paths import merge_state_path, muse_dir, ref_path
36
# Single CLI runner shared by every test in this module (see _invoke).
runner = CliRunner()

# Written as "repo_id" into repo.json by _init_repo.
_REPO_ID: str = "merge-tree-test"
# Bumped by _commit on every call; used to number default commit messages.
_counter: int = 0
41
42
43 # ---------------------------------------------------------------------------
44 # Helpers
45 # ---------------------------------------------------------------------------
46
47
48
49
def _init_repo(path: pathlib.Path) -> pathlib.Path:
    """Create a minimal repository skeleton under *path* and return *path*.

    Creates the metadata sub-directories, a ``HEAD`` file pointing at
    ``refs/heads/main`` and a ``repo.json`` carrying the test repo ID.
    """
    meta_root = muse_dir(path)
    subdirs = ("commits", "snapshots", "objects", "refs/heads", "code")
    for name in subdirs:
        (meta_root / name).mkdir(parents=True, exist_ok=True)

    head_file = meta_root / "HEAD"
    head_file.write_text("ref: refs/heads/main", encoding="utf-8")

    repo_config = json.dumps({"repo_id": _REPO_ID, "domain": "code"})
    (meta_root / "repo.json").write_text(repo_config, encoding="utf-8")
    return path
59
60
def _env(repo: pathlib.Path) -> Mapping[str, str]:
    """Return the environment mapping handed to the CLI runner.

    The mapping has a single entry, ``MUSE_REPO_ROOT``, set to ``str(repo)``.
    """
    overrides: dict[str, str] = {}
    overrides["MUSE_REPO_ROOT"] = str(repo)
    return overrides
63
64
def _write_files(root: pathlib.Path, files: Mapping[str, bytes]) -> Manifest:
    """Store each file as an object and on disk under *root*.

    For every ``(relative path, content)`` pair the content is written to the
    object store under its blob ID and to ``root / relative path`` (creating
    parent directories as needed).

    Returns:
        A manifest mapping each relative path to its blob ID.
    """
    manifest: Manifest = {}
    for relative, payload in files.items():
        object_id = blob_id(payload)
        write_object(root, object_id, payload)
        manifest[relative] = object_id

        target = root / relative
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_bytes(payload)
    return manifest
75
76
def _commit(
    root: pathlib.Path,
    files: Mapping[str, bytes],
    branch: str = "main",
    parent_id: str | None = None,
    message: str | None = None,
) -> str:
    """Write *files* as a snapshot plus commit and point *branch* at it.

    Args:
        root: Repository root.
        files: Full file set of the commit (relative path -> content).
        branch: Branch whose ref file is overwritten with the new commit ID.
        parent_id: Optional single parent commit ID; falsy means a root commit.
        message: Commit message; when falsy, ``"commit <counter>"`` is used.

    Returns:
        The ID of the commit that was written.
    """
    global _counter
    _counter += 1

    manifest = _write_files(root, files)
    snapshot_id = hash_snapshot(manifest)
    snapshot = SnapshotRecord(snapshot_id=snapshot_id, manifest=manifest)
    write_snapshot(root, snapshot)

    now = datetime.datetime.now(datetime.timezone.utc)
    text = message if message else f"commit {_counter}"
    parents = [parent_id] if parent_id else []
    commit_id = hash_commit(
        parent_ids=parents,
        snapshot_id=snapshot_id,
        message=text,
        committed_at_iso=now.isoformat(),
    )

    record = CommitRecord(
        commit_id=commit_id,
        branch=branch,
        snapshot_id=snapshot_id,
        message=text,
        committed_at=now,
        parent_commit_id=parent_id,
    )
    write_commit(root, record)

    # Move the branch tip to the commit just written.
    ref_file = ref_path(root, branch)
    ref_file.parent.mkdir(parents=True, exist_ok=True)
    ref_file.write_text(commit_id, encoding="utf-8")
    return commit_id
105
106
def _invoke(repo: pathlib.Path, *args: str) -> InvokeResult:
    """Run ``merge-tree`` with *args* through the shared runner against *repo*."""
    from muse.cli.app import main as cli

    argv = ["merge-tree", *args]
    return runner.invoke(cli, argv, env=_env(repo))
110
111
def _setup_divergent_repo(root: pathlib.Path) -> tuple[str, str, str]:
    """Build a base commit and two branches that each add a different file.

    ``branch-a`` adds ``a_new.py`` and ``branch-b`` adds ``b_new.py``; both
    keep the base files unchanged, so their changes do not overlap.

    Returns (base_id, branch_a_id, branch_b_id).
    """
    common = {
        "shared.py": b"x = 0\n",
        "only_base.py": b"base\n",
    }
    base_id = _commit(root, dict(common), branch="main")

    side_a = {**common, "a_new.py": b"a = 1\n"}
    a_id = _commit(root, side_a, branch="branch-a", parent_id=base_id)

    side_b = {**common, "b_new.py": b"b = 2\n"}
    b_id = _commit(root, side_b, branch="branch-b", parent_id=base_id)

    return base_id, a_id, b_id
135
136
def _setup_conflicting_repo(root: pathlib.Path) -> tuple[str, str, str]:
    """Build a base commit and two branches that change ``shared.py`` differently.

    Returns (base_id, branch_a_id, branch_b_id).
    """
    base_id = _commit(root, {"shared.py": b"x = 0\n"}, branch="main")

    tips: list[str] = []
    for branch_name, content in (("branch-a", b"x = 1\n"), ("branch-b", b"x = 2\n")):
        tips.append(
            _commit(root, {"shared.py": content}, branch=branch_name, parent_id=base_id)
        )

    a_id, b_id = tips
    return base_id, a_id, b_id
145
146
147 # ---------------------------------------------------------------------------
148 # Unit — clean merge (no conflicts)
149 # ---------------------------------------------------------------------------
150
151
def test_clean_merge_exits_zero(tmp_path: pathlib.Path) -> None:
    """A merge of non-overlapping branches exits with status 0."""
    repo = _init_repo(tmp_path)
    _setup_divergent_repo(repo)

    outcome = _invoke(repo, "branch-a", "branch-b", "--json")

    assert outcome.exit_code == 0
157
158
def test_clean_merge_json_schema(tmp_path: pathlib.Path) -> None:
    """The ``--json`` payload of a clean merge carries every expected top-level key."""
    repo = _init_repo(tmp_path)
    _setup_divergent_repo(repo)

    outcome = _invoke(repo, "branch-a", "branch-b", "--json")
    assert outcome.exit_code == 0

    payload = json.loads(outcome.stdout)
    required = (
        "base",
        "branch1",
        "branch2",
        "conflicts",
        "merged_manifest",
        "trivially_merged",
    )
    for key in required:
        assert key in payload, f"missing key: {key}"
167
168
def test_clean_merge_no_conflicts(tmp_path: pathlib.Path) -> None:
    """A clean merge reports an empty conflict list and ``trivially_merged`` true."""
    repo = _init_repo(tmp_path)
    _setup_divergent_repo(repo)

    payload = json.loads(_invoke(repo, "branch-a", "branch-b", "--json").stdout)

    assert payload["conflicts"] == []
    assert payload["trivially_merged"] is True
176
177
def test_clean_merge_manifest_contains_both_changes(tmp_path: pathlib.Path) -> None:
    """The merged manifest lists the file added on each side with a non-null ID."""
    repo = _init_repo(tmp_path)
    _setup_divergent_repo(repo)

    payload = json.loads(_invoke(repo, "branch-a", "branch-b", "--json").stdout)
    merged = payload["merged_manifest"]

    # Presence first, then non-null object IDs, in the same order as before.
    assert "a_new.py" in merged
    assert "b_new.py" in merged
    for added in ("a_new.py", "b_new.py"):
        assert merged[added] is not None
188
189
190 # ---------------------------------------------------------------------------
191 # Unit — conflicting merge
192 # ---------------------------------------------------------------------------
193
194
def test_conflicting_merge_exits_nonzero(tmp_path: pathlib.Path) -> None:
    """A merge with a conflicting file exits with a nonzero status."""
    repo = _init_repo(tmp_path)
    _setup_conflicting_repo(repo)

    outcome = _invoke(repo, "branch-a", "branch-b", "--json")

    assert outcome.exit_code != 0
200
201
def test_conflicting_merge_reports_conflict_paths(tmp_path: pathlib.Path) -> None:
    """The conflicting path is listed and ``trivially_merged`` is false."""
    repo = _init_repo(tmp_path)
    _setup_conflicting_repo(repo)

    payload = json.loads(_invoke(repo, "branch-a", "branch-b", "--json").stdout)

    assert "shared.py" in payload["conflicts"]
    assert payload["trivially_merged"] is False
209
210
def test_conflicting_merge_manifest_has_null_for_conflicts(tmp_path: pathlib.Path) -> None:
    """A conflicting path appears in the merged manifest with a null object ID."""
    repo = _init_repo(tmp_path)
    _setup_conflicting_repo(repo)

    payload = json.loads(_invoke(repo, "branch-a", "branch-b", "--json").stdout)
    merged = payload["merged_manifest"]

    assert merged["shared.py"] is None
217
218
219 # ---------------------------------------------------------------------------
220 # Unit — trivial merge (one branch unchanged)
221 # ---------------------------------------------------------------------------
222
223
def test_trivial_merge_one_side_unchanged(tmp_path: pathlib.Path) -> None:
    """When only one branch changed, its new file is merged without conflicts."""
    repo = _init_repo(tmp_path)
    unchanged = {"a.py": b"x=1\n"}
    base_id = _commit(repo, dict(unchanged), branch="main")

    # branch-a gains new.py; branch-b carries exactly the base file set.
    _commit(repo, {**unchanged, "new.py": b"y=2\n"}, branch="branch-a", parent_id=base_id)
    _commit(repo, dict(unchanged), branch="branch-b", parent_id=base_id)

    payload = json.loads(_invoke(repo, "branch-a", "branch-b", "--json").stdout)

    assert payload["conflicts"] == []
    assert "new.py" in payload["merged_manifest"]
234
235
236 # ---------------------------------------------------------------------------
237 # Unit — explicit --base override
238 # ---------------------------------------------------------------------------
239
240
def test_explicit_base_override(tmp_path: pathlib.Path) -> None:
    """An explicit ``--base`` commit ID is reported back as the JSON ``base``.

    Both branches change ``a.py`` differently on top of the same parent, and
    that parent's commit ID is passed via ``--base``. Only the echoed ``base``
    field is checked here; the exit status is deliberately not asserted.
    """
    root = _init_repo(tmp_path)
    base_id = _commit(root, {"a.py": b"v0\n"}, branch="main")
    # The branch-tip IDs are not needed: branches are addressed by name below.
    _commit(root, {"a.py": b"v1\n"}, branch="branch-a", parent_id=base_id)
    _commit(root, {"a.py": b"v2\n"}, branch="branch-b", parent_id=base_id)

    result = _invoke(root, "branch-a", "branch-b", "--base", base_id, "--json")
    data = json.loads(result.stdout)

    assert data["base"] == base_id
250
251
def test_explicit_base_nonexistent_exits_nonzero(tmp_path: pathlib.Path) -> None:
    """A ``--base`` value that names no written commit makes the command fail."""
    root = _init_repo(tmp_path)
    base_id = _commit(root, {"a.py": b"v0\n"}, branch="main")
    # The branch-tip IDs are not needed: branches are addressed by name below.
    _commit(root, {"a.py": b"v1\n"}, branch="branch-a", parent_id=base_id)
    _commit(root, {"a.py": b"v2\n"}, branch="branch-b", parent_id=base_id)

    # 64 characters long, but never written to the repository by this test.
    bad_base = "a" * 64
    result = _invoke(root, "branch-a", "branch-b", "--base", bad_base, "--json")

    assert result.exit_code != 0
260
261
262 # ---------------------------------------------------------------------------
263 # Unit — working-tree isolation
264 # ---------------------------------------------------------------------------
265
266
def test_no_working_tree_mutation(tmp_path: pathlib.Path) -> None:
    """merge-tree must never write to the working tree.

    The working tree is captured recursively, including file contents, before
    and after the command runs, so added, removed, and overwritten files at
    any depth are all detected. Top-level dot-entries and the repository
    metadata directory are excluded from the comparison.
    """
    root = _init_repo(tmp_path)
    _setup_divergent_repo(root)
    meta_root = muse_dir(root)

    def _snapshot_tree() -> dict[str, bytes | None]:
        """Map each working-tree path to its bytes (``None`` for non-files)."""
        state: dict[str, bytes | None] = {}
        for entry in sorted(root.rglob("*")):
            rel = entry.relative_to(root)
            if rel.parts[0].startswith("."):
                continue
            if entry == meta_root or meta_root in entry.parents:
                continue
            state[rel.as_posix()] = entry.read_bytes() if entry.is_file() else None
        return state

    before = _snapshot_tree()
    _invoke(root, "branch-a", "branch-b", "--json")
    after = _snapshot_tree()

    assert before == after
276
277
def test_no_merge_state_written(tmp_path: pathlib.Path) -> None:
    """merge-tree must not leave a merge-state file behind, even on conflicts."""
    repo = _init_repo(tmp_path)
    _setup_conflicting_repo(repo)

    _invoke(repo, "branch-a", "branch-b", "--json")

    state_file = merge_state_path(repo)
    assert not state_file.exists()
284
285
286 # ---------------------------------------------------------------------------
287 # Integration — --write-objects
288 # ---------------------------------------------------------------------------
289
290
def test_write_objects_creates_snapshot(tmp_path: pathlib.Path) -> None:
    """``--write-objects`` returns a ``snapshot_id`` whose object exists on disk."""
    from muse.core.object_store import object_path

    repo = _init_repo(tmp_path)
    _setup_divergent_repo(repo)

    outcome = _invoke(repo, "branch-a", "branch-b", "--write-objects", "--json")
    assert outcome.exit_code == 0

    payload = json.loads(outcome.stdout)
    assert "snapshot_id" in payload

    # The reported snapshot must be present in the object store.
    stored_at = object_path(repo, payload["snapshot_id"])
    assert stored_at.exists()
302
303
def test_write_objects_snapshot_matches_manifest(tmp_path: pathlib.Path) -> None:
    """The written snapshot agrees with every non-null merged-manifest entry."""
    from muse.core.snapshots import read_snapshot

    repo = _init_repo(tmp_path)
    _setup_divergent_repo(repo)

    outcome = _invoke(repo, "branch-a", "branch-b", "--write-objects", "--json")
    payload = json.loads(outcome.stdout)

    stored = read_snapshot(repo, payload["snapshot_id"])
    assert stored is not None

    # Null entries are skipped; every other path must map to the same ID.
    resolved = {
        path: oid
        for path, oid in payload["merged_manifest"].items()
        if oid is not None
    }
    for path, oid in resolved.items():
        assert stored.manifest.get(path) == oid
316
317
def test_write_objects_not_default(tmp_path: pathlib.Path) -> None:
    """Without ``--write-objects`` the JSON payload has no ``snapshot_id`` key."""
    repo = _init_repo(tmp_path)
    _setup_divergent_repo(repo)

    payload = json.loads(_invoke(repo, "branch-a", "branch-b", "--json").stdout)

    assert "snapshot_id" not in payload
325
326
327 # ---------------------------------------------------------------------------
328 # Integration — text output
329 # ---------------------------------------------------------------------------
330
331
def test_text_output_shows_branch_names(tmp_path: pathlib.Path) -> None:
    """Plain-text output of a clean merge mentions ``branch-a`` or ``a_new.py``."""
    repo = _init_repo(tmp_path)
    _setup_divergent_repo(repo)

    outcome = _invoke(repo, "branch-a", "branch-b")
    assert outcome.exit_code == 0

    text = outcome.stdout
    assert any(marker in text for marker in ("branch-a", "a_new.py"))
338
339
def test_text_output_conflict_mentions_path(tmp_path: pathlib.Path) -> None:
    """Plain-text output of a conflicting merge names the conflicting file."""
    repo = _init_repo(tmp_path)
    _setup_conflicting_repo(repo)

    text = _invoke(repo, "branch-a", "branch-b").stdout

    assert "shared.py" in text
345
346
347 # ---------------------------------------------------------------------------
348 # Integration — error cases
349 # ---------------------------------------------------------------------------
350
351
def test_nonexistent_branch_exits_nonzero(tmp_path: pathlib.Path) -> None:
    """Naming a branch with no ref as the second argument makes the command fail."""
    repo = _init_repo(tmp_path)
    _commit(repo, {"a.py": b"x\n"}, branch="main")

    outcome = _invoke(repo, "main", "no-such-branch", "--json")

    assert outcome.exit_code != 0
357
358
def test_both_branches_nonexistent_exits_nonzero(tmp_path: pathlib.Path) -> None:
    """In a repository with no commits, two unknown branch names fail."""
    repo = _init_repo(tmp_path)

    outcome = _invoke(repo, "ghost-a", "ghost-b", "--json")

    assert outcome.exit_code != 0
363
364
def test_no_common_ancestor_exits_nonzero(tmp_path: pathlib.Path) -> None:
    """Branches with no shared history cannot be merge-treed."""
    repo = _init_repo(tmp_path)

    # Each branch gets its own parentless commit, so no ancestor is shared.
    for branch_name, files in (
        ("orphan-a", {"x.py": b"x\n"}),
        ("orphan-b", {"y.py": b"y\n"}),
    ):
        _commit(repo, files, branch=branch_name)

    outcome = _invoke(repo, "orphan-a", "orphan-b", "--json")

    assert outcome.exit_code != 0
373
374
375 # ---------------------------------------------------------------------------
376 # Integration — determinism
377 # ---------------------------------------------------------------------------
378
379
def test_same_inputs_same_output(tmp_path: pathlib.Path) -> None:
    """Two identical invocations yield equal JSON once timing fields are dropped."""
    repo = _init_repo(tmp_path)
    _setup_divergent_repo(repo)

    first = _invoke(repo, "branch-a", "branch-b", "--json")
    second = _invoke(repo, "branch-a", "branch-b", "--json")

    volatile = {"duration_ms", "timestamp"}

    def _stable(raw: str) -> dict:
        """Parse *raw* JSON and drop the keys allowed to differ between runs."""
        return {k: v for k, v in json.loads(raw).items() if k not in volatile}

    assert _stable(first.stdout) == _stable(second.stdout)
390
391
def test_branch_order_does_not_affect_conflict_detection(tmp_path: pathlib.Path) -> None:
    """Swapping the two branch arguments reports the same set of conflicts."""
    repo = _init_repo(tmp_path)
    _setup_conflicting_repo(repo)

    forward = _invoke(repo, "branch-a", "branch-b", "--json")
    backward = _invoke(repo, "branch-b", "branch-a", "--json")

    forward_conflicts = set(json.loads(forward.stdout)["conflicts"])
    backward_conflicts = set(json.loads(backward.stdout)["conflicts"])
    assert forward_conflicts == backward_conflicts
401
402
403 # ---------------------------------------------------------------------------
404 # Security
405 # ---------------------------------------------------------------------------
406
407
def test_ansi_in_branch_name_rejected(tmp_path: pathlib.Path) -> None:
    """A first branch argument containing ANSI escape codes makes the command fail."""
    repo = _init_repo(tmp_path)
    hostile_name = "\x1b[31mbad\x1b[0m"

    outcome = _invoke(repo, hostile_name, "main")

    assert outcome.exit_code != 0
412
413
def test_ansi_in_second_branch_name_rejected(tmp_path: pathlib.Path) -> None:
    """A second branch argument containing ANSI escape codes makes the command fail."""
    repo = _init_repo(tmp_path)
    hostile_name = "\x1b[31mbad\x1b[0m"

    outcome = _invoke(repo, "main", hostile_name)

    assert outcome.exit_code != 0
418
419
420 # ---------------------------------------------------------------------------
421 # Stress — 50 files, 30% conflict rate
422 # ---------------------------------------------------------------------------
423
424
def test_stress_50_files_30_pct_conflicts(tmp_path: pathlib.Path) -> None:
    """Merge two 50-file branches in which 30% of the files conflict.

    Both branches rewrite the first 15 base files with different content and
    each adds one extra file of its own. The command must exit nonzero,
    report exactly 15 conflicts, and still list both extra files and all
    untouched base files in the merged manifest.
    """
    root = _init_repo(tmp_path)
    n = 50
    # Integer arithmetic avoids relying on float rounding of n * 0.3.
    conflict_count = n * 3 // 10  # 15 conflict files

    base_files = {f"f{i}.py": f"v = {i}\n".encode() for i in range(n)}
    base_id = _commit(root, base_files, branch="main")

    # stress-a: modify the first conflict_count files and add a_extra.py.
    a_files = dict(base_files)
    for i in range(conflict_count):
        a_files[f"f{i}.py"] = f"v = {i}_a\n".encode()
    a_files["a_extra.py"] = b"a = 1\n"
    _commit(root, a_files, branch="stress-a", parent_id=base_id)

    # stress-b: modify the same files differently and add b_extra.py.
    b_files = dict(base_files)
    for i in range(conflict_count):
        b_files[f"f{i}.py"] = f"v = {i}_b\n".encode()
    b_files["b_extra.py"] = b"b = 2\n"
    _commit(root, b_files, branch="stress-b", parent_id=base_id)

    result = _invoke(root, "stress-a", "stress-b", "--json")
    assert result.exit_code != 0  # has conflicts
    data = json.loads(result.stdout)

    assert len(data["conflicts"]) == conflict_count
    # Files added on only one side should be in the merged manifest.
    assert "a_extra.py" in data["merged_manifest"]
    assert "b_extra.py" in data["merged_manifest"]
    # Files untouched by both sides should be in the merged manifest.
    for i in range(conflict_count, n):
        assert f"f{i}.py" in data["merged_manifest"]
458
459
class TestRegisterFlags:
    """Check how ``register`` wires the JSON flag into an argparse parser."""

    @staticmethod
    def _parse(*extra: str) -> "argparse.Namespace":
        """Register ``merge-tree`` on a fresh parser and parse ``main dev`` plus *extra*."""
        import argparse
        from muse.cli.commands.merge_tree import register

        parser = argparse.ArgumentParser()
        subparsers = parser.add_subparsers()
        register(subparsers)
        return parser.parse_args(["merge-tree", "main", "dev", *extra])

    def test_default_json_out_is_false(self) -> None:
        """Without any flag, ``json_out`` is ``False``."""
        parsed = self._parse()
        assert parsed.json_out is False

    def test_json_flag_sets_json_out(self) -> None:
        """``--json`` sets ``json_out`` to ``True``."""
        parsed = self._parse("--json")
        assert parsed.json_out is True

    def test_j_shorthand_sets_json_out(self) -> None:
        """``-j`` sets ``json_out`` to ``True``."""
        parsed = self._parse("-j")
        assert parsed.json_out is True
File History 4 commits
sha256:81ae324db5ad375fbfe4834c6fcb378312cafad3cc92dec5d3e5c427306621a2 fix: remove commit_exists filter from have anchors — server… Sonnet 4.6 patch 20 days ago
sha256:36c3cb3e76619d4c30a6d9bf81b5ec4ff148e30dcfed913e3114ca7b43b81c7e fix: rename objects→blobs in push client and all stale test… Sonnet 4.6 patch 22 days ago
sha256:c06a9b9b9fee26c68ea725b44d54b2c0a171301ce9de746d5b656617b4463a9a fix: repair four test failures from post-migration audit Sonnet 4.6 patch 28 days ago
sha256:1900655993c83c4107067375548a7be823e471d2515830842f1a12cba4bd3cdf fix: unified object store migration — idempotent writes, JS… Sonnet 4.6 minor 29 days ago