gabriel / muse public
test_plugin_apply_and_checkout.py python
333 lines 12.0 KB
Raw
sha256:1d3f5470f45db58e32047678debc9438fdded1b2c7332cc743d2b8be32fdafc8 fixing more broken tests Human patch 3 days ago
1 """Tests for plugin.apply() and the incremental checkout that wires it in.
2
3 Covers:
4 - MidiPlugin.apply() with a workdir path (files already updated on disk)
5 - MidiPlugin.apply() with a snapshot dict (in-memory removals)
6 - checkout incremental delta: only changed files are touched
7 - revert reuses parent snapshot_id (no re-scan)
8 - cherry-pick uses merged_manifest directly (no re-scan)
9 """
10
11 import pathlib
12
13 import pytest
14 from tests.cli_test_helper import CliRunner
15
# Passed as the first argument to every runner.invoke() call in this module.
cli = None  # argparse migration — CliRunner ignores this arg
17 from muse.core.refs import get_head_commit_id
18 from muse.core.commits import read_commit
19 from muse.core.snapshots import read_snapshot
20 from muse.domain import DeleteOp, SnapshotManifest, StructuredDelta
21 from muse.plugins.midi.plugin import MidiPlugin
22
# Shared CLI runner used by the fixture, the helpers and the CLI-driven tests.
runner = CliRunner()
# Shared plugin instance exercised by the apply() unit tests.
plugin = MidiPlugin()
25
26
27 # ---------------------------------------------------------------------------
28 # Fixtures
29 # ---------------------------------------------------------------------------
30
31
@pytest.fixture
def repo(tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> pathlib.Path:
    """Initialise a repository in *tmp_path* and return that path.

    The process working directory is switched to *tmp_path* and
    ``MUSE_REPO_ROOT`` is pointed at it before ``init --domain code`` runs.
    The fixture fails if the init command exits non-zero.
    """
    monkeypatch.chdir(tmp_path)
    monkeypatch.setenv("MUSE_REPO_ROOT", str(tmp_path))

    init_args = ["init", "--domain", "code"]
    outcome = runner.invoke(cli, init_args)
    assert outcome.exit_code == 0, outcome.output

    return tmp_path
39
40
def _write(repo: pathlib.Path, filename: str, content: str = "data") -> None:
    """Write *content* as text to *filename* inside *repo*, replacing any existing file."""
    target = repo / filename
    target.write_text(content)
43
44
def _add_all() -> None:
    """Stage everything under the current directory via ``code add .``.

    Raises:
        AssertionError: if the command exits non-zero. The CLI output is used
            as the failure message so a staging problem is reported here
            rather than surfacing later as an unrelated commit failure.
    """
    result = runner.invoke(cli, ["code", "add", "."])
    assert result.exit_code == 0, result.output
47
48
def _commit(msg: str = "commit") -> None:
    """Stage the whole working tree, then commit it with message *msg*.

    Fails with the CLI output if the commit command exits non-zero.
    """
    _add_all()
    commit_args = ["commit", "-m", msg]
    outcome = runner.invoke(cli, commit_args)
    assert outcome.exit_code == 0, outcome.output
53
54
def _head_id(repo: pathlib.Path) -> str:
    """Return the head commit id of the ``main`` branch in *repo*.

    Fails if the lookup returns ``None``.
    """
    head = get_head_commit_id(repo, "main")
    assert head is not None
    return head
59
60
61 # ---------------------------------------------------------------------------
62 # MidiPlugin.apply() — unit tests
63 # ---------------------------------------------------------------------------
64
65
def _empty_delta() -> StructuredDelta:
    """Build a fresh ``midi`` delta that carries no operations."""
    no_op_delta = StructuredDelta(domain="midi", ops=[], summary="no changes")
    return no_op_delta
68
69
class TestMidiPluginApplyPath:
    """apply() with a workdir path rescans the directory for ground truth.

    Every test here hands apply() an empty delta together with a
    pathlib.Path and expects the returned snapshot to mirror the files
    physically present in that directory.
    """

    @staticmethod
    def _make_workdir(tmp_path: pathlib.Path) -> pathlib.Path:
        """Create and return an empty ``work`` directory under *tmp_path*."""
        root = tmp_path / "work"
        root.mkdir()
        return root

    def test_apply_returns_snapshot_of_workdir(self, tmp_path: pathlib.Path) -> None:
        """A file removed from disk before apply() is absent from the result."""
        root = self._make_workdir(tmp_path)
        kept = root / "a.py"
        removed = root / "b.py"
        kept.write_bytes(b"midi-a")
        removed.write_bytes(b"midi-b")

        # Physically remove b.py before calling apply().
        removed.unlink()

        scanned = plugin.apply(_empty_delta(), root)

        assert "b.py" not in scanned["files"]
        assert "a.py" in scanned["files"]

    def test_apply_picks_up_added_file(self, tmp_path: pathlib.Path) -> None:
        """A file created on disk before apply() shows up in the result."""
        root = self._make_workdir(tmp_path)
        (root / "a.py").write_bytes(b"original")

        # The new file lands on disk before apply() runs.
        (root / "new.mid").write_bytes(b"new content")
        scanned = plugin.apply(_empty_delta(), root)

        assert "new.mid" in scanned["files"]

    def test_apply_picks_up_modified_content(self, tmp_path: pathlib.Path) -> None:
        """Rewriting a file changes the entry apply() reports for it."""
        root = self._make_workdir(tmp_path)
        target = root / "a.py"
        target.write_bytes(b"v1")

        before = plugin.snapshot(root)

        # Overwrite on disk first; apply() then rescans.
        target.write_bytes(b"v2")
        after = plugin.apply(_empty_delta(), root)

        assert after["files"]["a.py"] != before["files"]["a.py"]

    def test_apply_empty_delta_returns_current_state(self, tmp_path: pathlib.Path) -> None:
        """With nothing changed, apply() and snapshot() report the same files."""
        root = self._make_workdir(tmp_path)
        (root / "a.py").write_bytes(b"data")

        applied = plugin.apply(_empty_delta(), root)
        scanned = plugin.snapshot(root)
        assert applied["files"] == scanned["files"]
123
124
class TestMidiPluginApplyDict:
    """apply() with a snapshot manifest applies the delta's ops in memory."""

    def _delete(self, address: str, content_id: str = "x") -> DeleteOp:
        """Build a delete op for *address* carrying *content_id*."""
        summary = f"deleted: {address}"
        return DeleteOp(
            op="delete",
            address=address,
            position=None,
            content_id=content_id,
            content_summary=summary,
        )

    def test_apply_removes_deleted_paths(self) -> None:
        """One delete op drops exactly the addressed path."""
        before = SnapshotManifest(files={"a.py": "aaa", "b.py": "bbb"}, domain="midi")
        removal = StructuredDelta(
            domain="midi",
            ops=[self._delete("b.py", "bbb")],
            summary="1 file removed",
        )

        after = plugin.apply(removal, before)

        assert "b.py" not in after["files"]
        assert "a.py" in after["files"]

    def test_apply_removes_multiple_paths(self) -> None:
        """Several delete ops in one delta are all applied."""
        before = SnapshotManifest(
            files={"a.py": "aaa", "b.py": "bbb", "c.mid": "ccc"},
            domain="midi",
        )
        delete_ops = [self._delete("a.py", "aaa"), self._delete("c.mid", "ccc")]
        removal = StructuredDelta(domain="midi", ops=delete_ops, summary="2 files removed")

        after = plugin.apply(removal, before)

        assert after["files"] == {"b.py": "bbb"}

    def test_apply_nonexistent_remove_is_noop(self) -> None:
        """Deleting a path that is not in the manifest leaves it unchanged."""
        before = SnapshotManifest(files={"a.py": "aaa"}, domain="midi")
        removal = StructuredDelta(
            domain="midi",
            ops=[self._delete("ghost.mid")],
            summary="1 file removed",
        )

        after = plugin.apply(removal, before)

        assert after["files"] == {"a.py": "aaa"}

    def test_apply_preserves_domain(self) -> None:
        """The result keeps the ``midi`` domain of the input manifest."""
        empty_manifest = SnapshotManifest(files={}, domain="midi")
        after = plugin.apply(_empty_delta(), empty_manifest)
        assert after["domain"] == "midi"
170
171
172 # ---------------------------------------------------------------------------
173 # Incremental checkout via plugin.apply()
174 # ---------------------------------------------------------------------------
175
176
class TestIncrementalCheckout:
    """End-to-end checks that checkout leaves the working tree matching the target."""

    @staticmethod
    def _run(*args: str) -> None:
        """Invoke the CLI with *args* and fail immediately on a non-zero exit.

        Without this check a failed ``branch`` or ``checkout`` would go
        unnoticed and the filesystem assertions below could pass vacuously
        (for example, a file "kept" only because nothing happened at all).
        """
        result = runner.invoke(cli, list(args))
        assert result.exit_code == 0, result.output

    def test_checkout_branch_only_changes_delta(self, repo: pathlib.Path) -> None:
        """Switching back to main keeps main's files and drops the feature-only file."""
        _write(repo, "shared.py", "shared")
        _write(repo, "main_only.py", "main")
        _commit("main initial")

        self._run("branch", "feature")
        self._run("checkout", "feature")
        _write(repo, "feature_only.py", "feature")
        _commit("feature commit")

        self._run("checkout", "main")
        assert (repo / "shared.py").exists()
        assert (repo / "main_only.py").exists()
        assert not (repo / "feature_only.py").exists()

    def test_checkout_restores_correct_content(self, repo: pathlib.Path) -> None:
        """Modified files get the correct content after checkout."""
        _write(repo, "beat.py", "v1")
        _commit("v1")

        self._run("branch", "feature")
        self._run("checkout", "feature")
        _write(repo, "beat.py", "v2")
        _commit("v2 on feature")

        self._run("checkout", "main")
        assert (repo / "beat.py").read_text() == "v1"

    def test_checkout_commit_id_incremental(self, repo: pathlib.Path) -> None:
        """Checking out an earlier commit id removes files added after it."""
        _write(repo, "a.py", "original")
        _commit("first")
        first_id = _head_id(repo)

        _write(repo, "b.py", "new")
        _commit("second")

        self._run("checkout", first_id)
        assert (repo / "a.py").exists()
        assert not (repo / "b.py").exists()

    def test_checkout_to_new_branch_keeps_workdir(self, repo: pathlib.Path) -> None:
        """Switching to a freshly created branch keeps the current workdir intact."""
        _write(repo, "beat.py")
        _commit("some work")

        # Both commands must succeed; otherwise the existence check below
        # would hold trivially because the workdir was never touched.
        self._run("branch", "empty-branch")
        self._run("checkout", "empty-branch")
        # workdir is preserved — new branch inherits from where we branched
        assert (repo / "beat.py").exists()
229
230
231 # ---------------------------------------------------------------------------
232 # Revert reuses parent snapshot (no re-scan)
233 # ---------------------------------------------------------------------------
234
235
class TestRevertSnapshotReuse:
    """Reverting the latest commit lands on the snapshot of the commit before it."""

    def test_revert_commit_points_to_parent_snapshot(self, repo: pathlib.Path) -> None:
        """The revert commit's snapshot_id equals that of the reverted commit's parent."""
        _write(repo, "beat.py", "base")
        _commit("base")
        base_commit = read_commit(repo, _head_id(repo))
        assert base_commit is not None
        expected_snapshot_id = base_commit.snapshot_id

        _write(repo, "lead.py", "new")
        _commit("add lead")
        reverted_id = _head_id(repo)

        runner.invoke(cli, ["revert", reverted_id])
        revert_commit = read_commit(repo, _head_id(repo))
        assert revert_commit is not None
        # Same snapshot as the "base" commit: the revert reused it.
        assert revert_commit.snapshot_id == expected_snapshot_id

    def test_revert_snapshot_already_in_store(self, repo: pathlib.Path) -> None:
        """After revert, the referenced snapshot is readable and matches the base tree."""
        _write(repo, "beat.py", "base")
        _commit("base")
        base_commit = read_commit(repo, _head_id(repo))
        assert base_commit is not None

        _write(repo, "lead.py", "new")
        _commit("add lead")
        reverted_id = _head_id(repo)

        runner.invoke(cli, ["revert", reverted_id])
        revert_commit = read_commit(repo, _head_id(repo))
        assert revert_commit is not None

        stored = read_snapshot(repo, revert_commit.snapshot_id)
        assert stored is not None
        assert "beat.py" in stored.manifest
        assert "lead.py" not in stored.manifest
278
279
280 # ---------------------------------------------------------------------------
281 # Cherry-pick uses merged_manifest (no re-scan)
282 # ---------------------------------------------------------------------------
283
284
class TestCherryPickManifestReuse:
    """Cherry-picking a feature commit onto main yields a complete, stored snapshot."""

    def test_cherry_pick_commit_has_correct_snapshot(self, repo: pathlib.Path) -> None:
        """The cherry-picked commit's snapshot holds both the base and the picked file."""
        _write(repo, "base.py", "base")
        _commit("base")

        runner.invoke(cli, ["branch", "feature"])
        runner.invoke(cli, ["checkout", "feature"])
        _write(repo, "feature.py", "feature content")
        _commit("feature addition")
        feature_id = get_head_commit_id(repo, "feature")
        assert feature_id is not None

        runner.invoke(cli, ["checkout", "main"])
        result = runner.invoke(cli, ["cherry-pick", feature_id])
        assert result.exit_code == 0, result.output

        picked_id = _head_id(repo)
        picked_commit = read_commit(repo, picked_id)
        assert picked_commit is not None
        snap = read_snapshot(repo, picked_commit.snapshot_id)
        assert snap is not None
        assert "feature.py" in snap.manifest
        assert "base.py" in snap.manifest

    def test_cherry_pick_snapshot_objects_in_store(self, repo: pathlib.Path) -> None:
        """Every object referenced by the cherry-pick snapshot exists in the store."""
        _write(repo, "base.py", "base")
        _commit("base")

        runner.invoke(cli, ["branch", "feature"])
        runner.invoke(cli, ["checkout", "feature"])
        _write(repo, "extra.py", "extra")
        _commit("feature extra")
        feature_id = get_head_commit_id(repo, "feature")
        assert feature_id is not None

        runner.invoke(cli, ["checkout", "main"])
        # Check the exit code, as the sibling test does: if the cherry-pick
        # failed, HEAD would still be the base commit and the object-store
        # loop below would pass without testing anything.
        result = runner.invoke(cli, ["cherry-pick", feature_id])
        assert result.exit_code == 0, result.output

        picked_id = _head_id(repo)
        picked_commit = read_commit(repo, picked_id)
        assert picked_commit is not None
        snap = read_snapshot(repo, picked_commit.snapshot_id)
        assert snap is not None
        # Make sure the snapshot under inspection really contains the picked file.
        assert "extra.py" in snap.manifest

        from muse.core.object_store import has_object
        for oid in snap.manifest.values():
            assert has_object(repo, oid), f"Object {oid[:8]} missing from store"
File History 1 commit
sha256:1d3f5470f45db58e32047678debc9438fdded1b2c7332cc743d2b8be32fdafc8 fixing more broken tests Human patch 3 days ago