gabriel / muse public
test_plugin_apply_and_checkout.py python
328 lines 12.0 KB
Raw
sha256:1d3f5470f45db58e32047678debc9438fdded1b2c7332cc743d2b8be32fdafc8 fixing more broken tests Human patch 14 days ago
1 """Tests for plugin.apply() and the incremental checkout that wires it in.
2
3 Covers:
4 - MidiPlugin.apply() with a workdir path (files already updated on disk)
5 - MidiPlugin.apply() with a snapshot dict (in-memory removals)
6 - checkout incremental delta: only changed files are touched
7 - revert reuses parent snapshot_id (no re-scan)
8 - cherry-pick uses merged_manifest directly (no re-scan)
9 """
10
11 import pathlib
12
13 import pytest
14 from tests.cli_test_helper import CliRunner
15
16 cli = None # argparse migration — CliRunner ignores this arg
17 from muse.core.refs import get_head_commit_id
18 from muse.core.commits import read_commit
19 from muse.core.snapshots import read_snapshot
20 from muse.domain import DeleteOp, SnapshotManifest, StructuredDelta
21 from muse.plugins.midi.plugin import MidiPlugin
22
23 runner = CliRunner()
24 plugin = MidiPlugin()
25
26
27 # ---------------------------------------------------------------------------
28 # Fixtures
29 # ---------------------------------------------------------------------------
30
31
32 @pytest.fixture
33 def repo(tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> pathlib.Path:
34 monkeypatch.chdir(tmp_path)
35 monkeypatch.setenv("MUSE_REPO_ROOT", str(tmp_path))
36 result = runner.invoke(cli, ["init"])
37 assert result.exit_code == 0, result.output
38 return tmp_path
39
40
41 def _write(repo: pathlib.Path, filename: str, content: str = "data") -> None:
42 (repo / filename).write_text(content)
43
44
45 def _commit(msg: str = "commit") -> None:
46 result = runner.invoke(cli, ["commit", "-m", msg])
47 assert result.exit_code == 0, result.output
48
49
50 def _head_id(repo: pathlib.Path) -> str:
51 cid = get_head_commit_id(repo, "main")
52 assert cid is not None
53 return cid
54
55
56 # ---------------------------------------------------------------------------
57 # MidiPlugin.apply() — unit tests
58 # ---------------------------------------------------------------------------
59
60
61 def _empty_delta() -> StructuredDelta:
62 return StructuredDelta(domain="midi", ops=[], summary="no changes")
63
64
65 class TestMidiPluginApplyPath:
66 """apply() with a workdir path rescans the directory for ground truth.
67
68 When live_state is a pathlib.Path, apply() ignores the delta and simply
69 rescans the directory — the physical filesystem is the source of truth.
70 """
71
72 def test_apply_returns_snapshot_of_workdir(self, tmp_path: pathlib.Path) -> None:
73 workdir = tmp_path / "work"
74 workdir.mkdir()
75 (workdir / "a.mid").write_bytes(b"midi-a")
76 (workdir / "b.mid").write_bytes(b"midi-b")
77
78 # Simulate remove b.mid physically before calling apply.
79 (workdir / "b.mid").unlink()
80
81 result = plugin.apply(_empty_delta(), workdir)
82
83 assert "b.mid" not in result["files"]
84 assert "a.mid" in result["files"]
85
86 def test_apply_picks_up_added_file(self, tmp_path: pathlib.Path) -> None:
87 workdir = tmp_path / "work"
88 workdir.mkdir()
89 (workdir / "a.mid").write_bytes(b"original")
90
91 # Add file physically before calling apply.
92 (workdir / "new.mid").write_bytes(b"new content")
93 result = plugin.apply(_empty_delta(), workdir)
94
95 assert "new.mid" in result["files"]
96
97 def test_apply_picks_up_modified_content(self, tmp_path: pathlib.Path) -> None:
98 workdir = tmp_path / "work"
99 workdir.mkdir()
100 (workdir / "a.mid").write_bytes(b"v1")
101
102 snap_v1 = plugin.snapshot(workdir)
103
104 # Modify physically, then apply rescans.
105 (workdir / "a.mid").write_bytes(b"v2")
106 result = plugin.apply(_empty_delta(), workdir)
107
108 assert result["files"]["a.mid"] != snap_v1["files"]["a.mid"]
109
110 def test_apply_empty_delta_returns_current_state(self, tmp_path: pathlib.Path) -> None:
111 workdir = tmp_path / "work"
112 workdir.mkdir()
113 (workdir / "a.mid").write_bytes(b"data")
114
115 result = plugin.apply(_empty_delta(), workdir)
116 expected = plugin.snapshot(workdir)
117 assert result["files"] == expected["files"]
118
119
120 class TestMidiPluginApplyDict:
121 """apply() with a snapshot dict applies ops in-memory."""
122
123 def _delete(self, address: str, content_id: str = "x") -> DeleteOp:
124 return DeleteOp(
125 op="delete", address=address, position=None,
126 content_id=content_id, content_summary=f"deleted: {address}",
127 )
128
129 def test_apply_removes_deleted_paths(self) -> None:
130 snap = SnapshotManifest(files={"a.mid": "aaa", "b.mid": "bbb"}, domain="midi")
131 delta = StructuredDelta(
132 domain="midi",
133 ops=[self._delete("b.mid", "bbb")],
134 summary="1 file removed",
135 )
136 result = plugin.apply(delta, snap)
137 assert "b.mid" not in result["files"]
138 assert "a.mid" in result["files"]
139
140 def test_apply_removes_multiple_paths(self) -> None:
141 snap = SnapshotManifest(files={"a.mid": "aaa", "b.mid": "bbb", "c.mid": "ccc"}, domain="midi")
142 delta = StructuredDelta(
143 domain="midi",
144 ops=[self._delete("a.mid", "aaa"), self._delete("c.mid", "ccc")],
145 summary="2 files removed",
146 )
147 result = plugin.apply(delta, snap)
148 assert result["files"] == {"b.mid": "bbb"}
149
150 def test_apply_nonexistent_remove_is_noop(self) -> None:
151 snap = SnapshotManifest(files={"a.mid": "aaa"}, domain="midi")
152 delta = StructuredDelta(
153 domain="midi",
154 ops=[self._delete("ghost.mid")],
155 summary="1 file removed",
156 )
157 result = plugin.apply(delta, snap)
158 assert result["files"] == {"a.mid": "aaa"}
159
160 def test_apply_preserves_domain(self) -> None:
161 snap = SnapshotManifest(files={}, domain="midi")
162 delta = _empty_delta()
163 result = plugin.apply(delta, snap)
164 assert result["domain"] == "midi"
165
166
167 # ---------------------------------------------------------------------------
168 # Incremental checkout via plugin.apply()
169 # ---------------------------------------------------------------------------
170
171
172 class TestIncrementalCheckout:
173 def test_checkout_branch_only_changes_delta(self, repo: pathlib.Path) -> None:
174 """Files unchanged between branches are not touched on disk."""
175 _write(repo, "shared.mid", "shared")
176 _write(repo, "main-only.mid", "main")
177 _commit("main initial")
178
179 runner.invoke(cli, ["branch", "feature"])
180 runner.invoke(cli, ["checkout", "feature"])
181 _write(repo, "feature-only.mid", "feature")
182 _commit("feature commit")
183
184 runner.invoke(cli, ["checkout", "main"])
185 assert (repo / "shared.mid").exists()
186 assert (repo / "main-only.mid").exists()
187 assert not (repo / "feature-only.mid").exists()
188
189 def test_checkout_restores_correct_content(self, repo: pathlib.Path) -> None:
190 """Modified files get the correct content after checkout."""
191 _write(repo, "beat.mid", "v1")
192 _commit("v1")
193
194 runner.invoke(cli, ["branch", "feature"])
195 runner.invoke(cli, ["checkout", "feature"])
196 _write(repo, "beat.mid", "v2")
197 _commit("v2 on feature")
198
199 runner.invoke(cli, ["checkout", "main"])
200 assert (repo / "beat.mid").read_text() == "v1"
201
202 def test_checkout_commit_id_incremental(self, repo: pathlib.Path) -> None:
203 """Detached HEAD checkout also uses incremental apply."""
204 _write(repo, "a.mid", "original")
205 _commit("first")
206 first_id = _head_id(repo)
207
208 _write(repo, "b.mid", "new")
209 _commit("second")
210
211 runner.invoke(cli, ["checkout", first_id])
212 assert (repo / "a.mid").exists()
213 assert not (repo / "b.mid").exists()
214
215 def test_checkout_to_new_branch_keeps_workdir(self, repo: pathlib.Path) -> None:
216 """Switching to a brand-new (no-commit) branch keeps the current workdir intact."""
217 _write(repo, "beat.mid")
218 _commit("some work")
219
220 runner.invoke(cli, ["branch", "empty-branch"])
221 runner.invoke(cli, ["checkout", "empty-branch"])
222 # workdir is preserved — new branch inherits from where we branched
223 assert (repo / "beat.mid").exists()
224
225
226 # ---------------------------------------------------------------------------
227 # Revert reuses parent snapshot (no re-scan)
228 # ---------------------------------------------------------------------------
229
230
231 class TestRevertSnapshotReuse:
232 def test_revert_commit_points_to_parent_snapshot(self, repo: pathlib.Path) -> None:
233 """The revert commit's snapshot_id is the same as the reverted commit's parent."""
234 _write(repo, "beat.mid", "base")
235 _commit("base")
236 base_id = _head_id(repo)
237 base_commit = read_commit(repo, base_id)
238 assert base_commit is not None
239 parent_snapshot_id = base_commit.snapshot_id
240
241 _write(repo, "lead.mid", "new")
242 _commit("add lead")
243 lead_id = _head_id(repo)
244
245 runner.invoke(cli, ["revert", lead_id])
246 revert_id = _head_id(repo)
247 revert_commit = read_commit(repo, revert_id)
248 assert revert_commit is not None
249 # The revert commit points to the same snapshot as "base" — no re-scan.
250 assert revert_commit.snapshot_id == parent_snapshot_id
251
252 def test_revert_snapshot_already_in_store(self, repo: pathlib.Path) -> None:
253 """After revert, the referenced snapshot is already in the store (not re-created)."""
254 _write(repo, "beat.mid", "base")
255 _commit("base")
256 base_id = _head_id(repo)
257 base_commit = read_commit(repo, base_id)
258 assert base_commit is not None
259
260 _write(repo, "lead.mid", "new")
261 _commit("add lead")
262 lead_id = _head_id(repo)
263
264 runner.invoke(cli, ["revert", lead_id])
265 revert_id = _head_id(repo)
266 revert_commit = read_commit(repo, revert_id)
267 assert revert_commit is not None
268
269 snap = read_snapshot(repo, revert_commit.snapshot_id)
270 assert snap is not None
271 assert "beat.mid" in snap.manifest
272 assert "lead.mid" not in snap.manifest
273
274
275 # ---------------------------------------------------------------------------
276 # Cherry-pick uses merged_manifest (no re-scan)
277 # ---------------------------------------------------------------------------
278
279
280 class TestCherryPickManifestReuse:
281 def test_cherry_pick_commit_has_correct_snapshot(self, repo: pathlib.Path) -> None:
282 """Cherry-picked commit's snapshot contains only the right files."""
283 _write(repo, "base.mid", "base")
284 _commit("base")
285
286 runner.invoke(cli, ["branch", "feature"])
287 runner.invoke(cli, ["checkout", "feature"])
288 _write(repo, "feature.mid", "feature content")
289 _commit("feature addition")
290 feature_id = get_head_commit_id(repo, "feature")
291 assert feature_id is not None
292
293 runner.invoke(cli, ["checkout", "main"])
294 result = runner.invoke(cli, ["cherry-pick", feature_id])
295 assert result.exit_code == 0, result.output
296
297 picked_id = _head_id(repo)
298 picked_commit = read_commit(repo, picked_id)
299 assert picked_commit is not None
300 snap = read_snapshot(repo, picked_commit.snapshot_id)
301 assert snap is not None
302 assert "feature.mid" in snap.manifest
303 assert "base.mid" in snap.manifest
304
305 def test_cherry_pick_snapshot_objects_in_store(self, repo: pathlib.Path) -> None:
306 """Objects in the cherry-pick snapshot are already in the store."""
307 _write(repo, "base.mid", "base")
308 _commit("base")
309
310 runner.invoke(cli, ["branch", "feature"])
311 runner.invoke(cli, ["checkout", "feature"])
312 _write(repo, "extra.mid", "extra")
313 _commit("feature extra")
314 feature_id = get_head_commit_id(repo, "feature")
315 assert feature_id is not None
316
317 runner.invoke(cli, ["checkout", "main"])
318 runner.invoke(cli, ["cherry-pick", feature_id])
319
320 picked_id = _head_id(repo)
321 picked_commit = read_commit(repo, picked_id)
322 assert picked_commit is not None
323 snap = read_snapshot(repo, picked_commit.snapshot_id)
324 assert snap is not None
325
326 from muse.core.object_store import has_object
327 for oid in snap.manifest.values():
328 assert has_object(repo, oid), f"Object {oid[:8]} missing from store"
File History 1 commit
sha256:1d3f5470f45db58e32047678debc9438fdded1b2c7332cc743d2b8be32fdafc8 fixing more broken tests Human patch 14 days ago