gabriel / muse public
test_core_coverage_gaps.py python
509 lines 19.9 KB
Raw
sha256:81ae324db5ad375fbfe4834c6fcb378312cafad3cc92dec5d3e5c427306621a2 fix: remove commit_exists filter from have anchors — server… Sonnet 4.6 patch 21 days ago
1 """Tests targeting coverage gaps in core modules: object_store, repo, store, merge_engine."""
2
3 import json
4 import os
5 import pathlib
6
7 import pytest
8
9 from muse.core.types import NULL_LONG_ID, blob_id, fake_id, long_id
10 from muse.core.object_store import (
11 has_object,
12 object_path,
13 objects_dir,
14 read_object,
15 restore_object,
16 write_object,
17 write_object_from_path,
18 )
19 from muse.core.repo import find_repo_root, require_repo
20 from muse.core.refs import get_head_commit_id
21 from muse.core.commits import (
22 CommitRecord,
23 get_commits_for_branch,
24 get_head_snapshot_id,
25 read_commit,
26 resolve_commit_ref,
27 update_commit_metadata,
28 write_commit,
29 )
30 from muse.core.snapshots import (
31 SnapshotRecord,
32 get_head_snapshot_manifest,
33 read_snapshot,
34 write_snapshot,
35 )
36 from muse.core.tags import get_tags_for_commit
37 from muse.core.merge_engine import apply_resolution, clear_merge_state, read_merge_state, write_merge_state
38 from muse.core.ids import hash_commit as compute_commit_id, hash_snapshot as compute_snapshot_id
39 from muse.core.types import Manifest
40 from muse.core.paths import heads_dir, merge_state_path, muse_dir, objects_dir
41
42 import datetime
43
44
45 # ---------------------------------------------------------------------------
46 # object_store
47 # ---------------------------------------------------------------------------
48
49
50 class TestObjectStore:
51 def test_objects_dir_path(self, tmp_path: pathlib.Path) -> None:
52 d = objects_dir(tmp_path)
53 assert d == objects_dir(tmp_path)
54
55 def test_object_path_sharding(self, tmp_path: pathlib.Path) -> None:
56 oid = long_id(f"ab{'c' * 62}")
57 p = object_path(tmp_path, oid)
58 assert p.parent.name == "ab"
59 assert p.name == "c" * 62
60
61 def test_has_object_false_when_absent(self, tmp_path: pathlib.Path) -> None:
62 assert not has_object(tmp_path, long_id("a" * 64))
63
64 def test_has_object_true_after_write(self, tmp_path: pathlib.Path) -> None:
65 content = b"hello"
66 oid = blob_id(content)
67 write_object(tmp_path, oid, content)
68 assert has_object(tmp_path, oid)
69
70 def test_write_object_idempotent_returns_false(self, tmp_path: pathlib.Path) -> None:
71 content = b"first"
72 oid = blob_id(content)
73 assert write_object(tmp_path, oid, content) is True
74 # Second write with correct hash but same ID — idempotent
75 assert write_object(tmp_path, oid, content) is False
76 # content should not change
77 assert read_object(tmp_path, oid) == content
78
79 def test_write_object_from_path_idempotent(self, tmp_path: pathlib.Path) -> None:
80 content = b"content"
81 src = tmp_path / "src.bin"
82 src.write_bytes(content)
83 oid = blob_id(content)
84 assert write_object_from_path(tmp_path, oid, src) is True
85 assert write_object_from_path(tmp_path, oid, src) is False
86
87 def test_write_object_from_path_stores_content(self, tmp_path: pathlib.Path) -> None:
88 content = b"my bytes"
89 src = tmp_path / "file.bin"
90 src.write_bytes(content)
91 oid = blob_id(content)
92 write_object_from_path(tmp_path, oid, src)
93 assert read_object(tmp_path, oid) == content
94
95 def test_read_object_returns_none_when_absent(self, tmp_path: pathlib.Path) -> None:
96 assert read_object(tmp_path, long_id("e" * 64)) is None
97
98 def test_read_object_returns_bytes(self, tmp_path: pathlib.Path) -> None:
99 content = b"data"
100 oid = blob_id(content)
101 write_object(tmp_path, oid, content)
102 assert read_object(tmp_path, oid) == content
103
104 def test_restore_object_returns_false_when_absent(self, tmp_path: pathlib.Path) -> None:
105 dest = tmp_path / "out.bin"
106 result = restore_object(tmp_path, NULL_LONG_ID, dest)
107 assert result is False
108 assert not dest.exists()
109
110 def test_restore_object_creates_dest(self, tmp_path: pathlib.Path) -> None:
111 content = b"restored"
112 oid = blob_id(content)
113 write_object(tmp_path, oid, content)
114 dest = tmp_path / "sub" / "out.bin"
115 result = restore_object(tmp_path, oid, dest)
116 assert result is True
117 assert dest.read_bytes() == content
118
119 def test_restore_object_creates_parent_dirs(self, tmp_path: pathlib.Path) -> None:
120 content = b"nested"
121 oid = blob_id(content)
122 write_object(tmp_path, oid, content)
123 dest = tmp_path / "a" / "b" / "c" / "file.bin"
124 restore_object(tmp_path, oid, dest)
125 assert dest.exists()
126
127
128 class TestRestoreObjectIdempotency:
129 """restore_object must preserve the destination inode when content matches.
130
131 The ``os.replace`` rename syscall always produces a new inode. Editors
132 (Cursor, VS Code, Vim, …) use inode-based filesystem-event watchers; a
133 spurious rename blinds them to subsequent changes, leaving permanently stale
134 buffers. The fix: hash-check dest before writing — if bytes already match
135 the requested object_id, return without touching the file.
136
137 These tests are the regression gate for that fix. They prove:
138
139 1. When dest already has the correct content the inode is preserved.
140 2. When dest has *different* content the file is replaced (inode changes).
141 3. When dest does not yet exist the write proceeds normally.
142 4. A checkout-style simulation: many files, only changed ones get new inodes.
143 """
144
145 def test_inode_preserved_when_content_matches(self, tmp_path: pathlib.Path) -> None:
146 """Core regression: restore_object must NOT rename when content is correct."""
147 content = b"editor-watching-this-file"
148 oid = blob_id(content)
149 write_object(tmp_path, oid, content)
150
151 dest = tmp_path / "file.txt"
152 dest.write_bytes(content)
153 inode_before = dest.stat().st_ino
154
155 result = restore_object(tmp_path, oid, dest)
156
157 assert result is True
158 assert dest.read_bytes() == content
159 # The inode must not change — a rename would produce a new inode and
160 # blind any editor that was watching the original file descriptor.
161 assert dest.stat().st_ino == inode_before, (
162 "restore_object issued a spurious rename even though dest already "
163 "contained the correct content — this blinds inode-watching editors"
164 )
165
166 def test_mtime_preserved_when_content_matches(self, tmp_path: pathlib.Path) -> None:
167 """mtime stability: no spurious write means no mtime bump."""
168 content = b"stable-mtime-check"
169 oid = blob_id(content)
170 write_object(tmp_path, oid, content)
171
172 dest = tmp_path / "file.txt"
173 dest.write_bytes(content)
174 mtime_ns_before = dest.stat().st_mtime_ns
175
176 restore_object(tmp_path, oid, dest)
177
178 assert dest.stat().st_mtime_ns == mtime_ns_before, (
179 "restore_object bumped mtime even though content was already correct"
180 )
181
182 def test_inode_changes_when_content_differs(self, tmp_path: pathlib.Path) -> None:
183 """When dest has wrong content the file must be replaced."""
184 correct = b"correct-content"
185 wrong = b"wrong-content-different-bytes"
186 oid = blob_id(correct)
187 write_object(tmp_path, oid, correct)
188
189 dest = tmp_path / "file.txt"
190 dest.write_bytes(wrong)
191 inode_before = dest.stat().st_ino
192
193 result = restore_object(tmp_path, oid, dest)
194
195 assert result is True
196 assert dest.read_bytes() == correct
197 # Content changed — a rename is expected; the inode must be different.
198 assert dest.stat().st_ino != inode_before
199
200 def test_idempotent_on_fresh_file(self, tmp_path: pathlib.Path) -> None:
201 """When dest does not yet exist the write proceeds normally."""
202 content = b"brand-new-file"
203 oid = blob_id(content)
204 write_object(tmp_path, oid, content)
205
206 dest = tmp_path / "new.txt"
207 assert not dest.exists()
208
209 result = restore_object(tmp_path, oid, dest)
210
211 assert result is True
212 assert dest.read_bytes() == content
213
214 def test_second_restore_is_truly_noop(self, tmp_path: pathlib.Path) -> None:
215 """Calling restore_object twice leaves the file and inode unchanged."""
216 content = b"idempotent-restore"
217 oid = blob_id(content)
218 write_object(tmp_path, oid, content)
219
220 dest = tmp_path / "file.txt"
221 restore_object(tmp_path, oid, dest) # first call — writes the file
222 inode_first = dest.stat().st_ino
223 mtime_first = dest.stat().st_mtime_ns
224
225 restore_object(tmp_path, oid, dest) # second call — must be a no-op
226
227 assert dest.stat().st_ino == inode_first
228 assert dest.stat().st_mtime_ns == mtime_first
229 assert dest.read_bytes() == content
230
231 def test_checkout_simulation_only_changed_files_renamed(
232 self, tmp_path: pathlib.Path
233 ) -> None:
234 """Simulate a branch checkout: only files that changed get new inodes.
235
236 This is the end-to-end scenario that caused the Cursor stale-buffer bug:
237 a ``muse checkout`` that touches N files would rename ALL of them even
238 when most were identical on both branches. After the fix, only the
239 genuinely changed file gets a new inode.
240 """
241 unchanged_content = b"I am the same on both branches"
242 changed_old = b"old branch content"
243 changed_new = b"new branch content"
244
245 unchanged_oid = blob_id(unchanged_content)
246 changed_oid = blob_id(changed_new)
247
248 write_object(tmp_path, unchanged_oid, unchanged_content)
249 write_object(tmp_path, changed_oid, changed_new)
250
251 unchanged_dest = tmp_path / "unchanged.py"
252 changed_dest = tmp_path / "changed.py"
253
254 # Simulate working tree before checkout
255 unchanged_dest.write_bytes(unchanged_content)
256 changed_dest.write_bytes(changed_old)
257
258 inode_unchanged_before = unchanged_dest.stat().st_ino
259 inode_changed_before = changed_dest.stat().st_ino
260
261 # Simulate _checkout_snapshot restoring both files
262 restore_object(tmp_path, unchanged_oid, unchanged_dest)
263 restore_object(tmp_path, changed_oid, changed_dest)
264
265 # unchanged file: inode must be preserved (no rename)
266 assert unchanged_dest.stat().st_ino == inode_unchanged_before, (
267 "unchanged file got a new inode — editor watching it would go blind"
268 )
269 # changed file: inode should differ (content replaced)
270 assert changed_dest.stat().st_ino != inode_changed_before
271 assert changed_dest.read_bytes() == changed_new
272
273
274 # ---------------------------------------------------------------------------
275 # repo
276 # ---------------------------------------------------------------------------
277
278
279 class TestFindRepoRoot:
280 def test_finds_muse_dir_in_cwd(self, tmp_path: pathlib.Path) -> None:
281 muse_dir(tmp_path).mkdir()
282 result = find_repo_root(tmp_path)
283 assert result == tmp_path
284
285 def test_finds_muse_dir_in_parent(self, tmp_path: pathlib.Path) -> None:
286 muse_dir(tmp_path).mkdir()
287 subdir = tmp_path / "a" / "b"
288 subdir.mkdir(parents=True)
289 result = find_repo_root(subdir)
290 assert result == tmp_path
291
292 def test_returns_none_when_no_repo(self, tmp_path: pathlib.Path) -> None:
293 result = find_repo_root(tmp_path)
294 assert result is None
295
296 def test_env_override_returns_path(self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> None:
297 muse_dir(tmp_path).mkdir()
298 monkeypatch.setenv("MUSE_REPO_ROOT", str(tmp_path))
299 result = find_repo_root()
300 assert result == tmp_path
301
302 def test_env_override_returns_none_when_not_repo(self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> None:
303 # tmp_path exists but has no .muse/
304 monkeypatch.setenv("MUSE_REPO_ROOT", str(tmp_path))
305 result = find_repo_root()
306 assert result is None
307
308 def test_require_repo_exits_when_no_repo(self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> None:
309 monkeypatch.delenv("MUSE_REPO_ROOT", raising=False)
310 monkeypatch.chdir(tmp_path)
311 with pytest.raises(SystemExit):
312 require_repo()
313
314
315 # ---------------------------------------------------------------------------
316 # store coverage gaps
317 # ---------------------------------------------------------------------------
318
319
320 class TestStoreGaps:
321 def _make_repo(self, tmp_path: pathlib.Path) -> pathlib.Path:
322 muse = muse_dir(tmp_path)
323 for d in ("commits", "snapshots", "objects", "refs/heads"):
324 (muse / d).mkdir(parents=True)
325 (muse / "HEAD").write_text("ref: refs/heads/main\n")
326 (muse / "repo.json").write_text(json.dumps({"repo_id": "test-repo"}))
327 (muse / "refs" / "heads" / "main").write_text("")
328 return tmp_path
329
330 def test_get_head_commit_id_empty_branch(self, tmp_path: pathlib.Path) -> None:
331 root = self._make_repo(tmp_path)
332 assert get_head_commit_id(root, "main") is None
333
334 def test_get_head_snapshot_id_no_commits(self, tmp_path: pathlib.Path) -> None:
335 root = self._make_repo(tmp_path)
336 assert get_head_snapshot_id(root, "main") is None
337
338 def test_get_head_snapshot_manifest_no_commits(self, tmp_path: pathlib.Path) -> None:
339 root = self._make_repo(tmp_path)
340 assert get_head_snapshot_manifest(root, "main") is None
341
342 def test_get_commits_for_branch_empty(self, tmp_path: pathlib.Path) -> None:
343 root = self._make_repo(tmp_path)
344 commits = get_commits_for_branch(root, "main")
345 assert commits == []
346
347 def _seed_chain(self, root: pathlib.Path, n: int) -> list[str]:
348 """Write a linear chain of *n* commits on ``main`` and return their IDs (newest first)."""
349 ids: list[str] = []
350 parent_id: str | None = None
351 manifest: Manifest = {}
352 snap_id = compute_snapshot_id(manifest)
353 write_snapshot(root, SnapshotRecord(snapshot_id=snap_id, manifest=manifest))
354 for i in range(n):
355 committed_at = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc) + datetime.timedelta(hours=i)
356 message = f"commit {i}"
357 parent_ids = [parent_id] if parent_id else []
358 commit_id = compute_commit_id(
359 parent_ids=parent_ids,
360 snapshot_id=snap_id,
361 message=message,
362 committed_at_iso=committed_at.isoformat(),
363 )
364 commit = CommitRecord(
365 commit_id=commit_id,
366 branch="main",
367 snapshot_id=snap_id,
368 message=message,
369 committed_at=committed_at,
370 parent_commit_id=parent_id,
371 )
372 write_commit(root, commit)
373 ids.append(commit_id)
374 parent_id = commit_id
375 # HEAD points at the last (newest) commit
376 (heads_dir(root) / "main").write_text(ids[-1])
377 ids.reverse() # newest first, matching get_commits_for_branch order
378 return ids
379
380 def test_get_commits_for_branch_max_count_stops_early(
381 self, tmp_path: pathlib.Path
382 ) -> None:
383 """max_count caps the walk — only that many commits are returned."""
384 root = self._make_repo(tmp_path)
385 all_ids = self._seed_chain(root, 5)
386
387 result = get_commits_for_branch(root, "main", max_count=2)
388 assert len(result) == 2
389 assert result[0].commit_id == all_ids[0]
390 assert result[1].commit_id == all_ids[1]
391
392 def test_get_commits_for_branch_max_count_zero_returns_all(
393 self, tmp_path: pathlib.Path
394 ) -> None:
395 """max_count=0 (the default) returns the full chain."""
396 root = self._make_repo(tmp_path)
397 all_ids = self._seed_chain(root, 5)
398
399 result = get_commits_for_branch(root, "main", max_count=0)
400 assert len(result) == 5
401 assert [c.commit_id for c in result] == all_ids
402
403 def test_get_commits_for_branch_max_count_larger_than_chain(
404 self, tmp_path: pathlib.Path
405 ) -> None:
406 """max_count larger than the chain length returns every commit without error."""
407 root = self._make_repo(tmp_path)
408 all_ids = self._seed_chain(root, 3)
409
410 result = get_commits_for_branch(root, "main", max_count=100)
411 assert len(result) == 3
412 assert [c.commit_id for c in result] == all_ids
413
414 def test_resolve_commit_ref_with_none_returns_head(self, tmp_path: pathlib.Path) -> None:
415 root = self._make_repo(tmp_path)
416 manifest: Manifest = {"a.mid": fake_id("a.mid-content")}
417 snap_id = compute_snapshot_id(manifest)
418 snap = SnapshotRecord(snapshot_id=snap_id, manifest=manifest)
419 write_snapshot(root, snap)
420 committed_at = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc)
421 commit_id = compute_commit_id(
422 parent_ids=[],
423 snapshot_id=snap_id,
424 message="test",
425 committed_at_iso=committed_at.isoformat(),
426 )
427 commit = CommitRecord(
428 commit_id=commit_id,
429 branch="main",
430 snapshot_id=snap_id,
431 message="test",
432 committed_at=committed_at,
433 )
434 write_commit(root, commit)
435 (heads_dir(root) / "main").write_text(commit_id)
436
437 result = resolve_commit_ref(root, "main", None)
438 assert result is not None
439 assert result.commit_id == commit_id
440
441 def test_read_commit_returns_none_for_unknown(self, tmp_path: pathlib.Path) -> None:
442 root = self._make_repo(tmp_path)
443 assert read_commit(root, long_id("a" * 64)) is None
444
445 def test_read_snapshot_returns_none_for_unknown(self, tmp_path: pathlib.Path) -> None:
446 root = self._make_repo(tmp_path)
447 assert read_snapshot(root, long_id("b" * 64)) is None
448
449 def test_update_commit_metadata_false_for_unknown(self, tmp_path: pathlib.Path) -> None:
450 root = self._make_repo(tmp_path)
451 assert update_commit_metadata(root, long_id("c" * 64), "key", "val") is False
452
453 def test_get_tags_for_commit_empty(self, tmp_path: pathlib.Path) -> None:
454 root = self._make_repo(tmp_path)
455 tags = get_tags_for_commit(root, long_id("d" * 64), long_id("c" * 64))
456 assert tags == []
457
458
459 # ---------------------------------------------------------------------------
460 # merge_engine coverage gaps
461 # ---------------------------------------------------------------------------
462
463
464 class TestMergeEngineCoverageGaps:
465 def _make_repo(self, tmp_path: pathlib.Path) -> pathlib.Path:
466 muse = muse_dir(tmp_path)
467 muse.mkdir(parents=True)
468 return tmp_path
469
470 def test_clear_merge_state_no_file(self, tmp_path: pathlib.Path) -> None:
471 root = self._make_repo(tmp_path)
472 # Should not raise even if MERGE_STATE.json is absent
473 clear_merge_state(root)
474
475 def test_apply_resolution_copies_object(self, tmp_path: pathlib.Path) -> None:
476 root = self._make_repo(tmp_path)
477 # Write a real object to the store — oid must be the SHA-256 of the content.
478 content = b"resolved content"
479 oid = blob_id(content)
480 write_object(root, oid, content)
481
482 apply_resolution(root, "track.mid", oid)
483 dest = root / "track.mid"
484 assert dest.exists()
485 assert dest.read_bytes() == b"resolved content"
486
487 def test_apply_resolution_raises_when_object_absent(self, tmp_path: pathlib.Path) -> None:
488 root = self._make_repo(tmp_path)
489 with pytest.raises(FileNotFoundError):
490 apply_resolution(root, "track.mid", NULL_LONG_ID)
491
492 def test_read_merge_state_invalid_json_returns_none(self, tmp_path: pathlib.Path) -> None:
493 root = self._make_repo(tmp_path)
494 (merge_state_path(root)).write_text("not json {{")
495 result = read_merge_state(root)
496 assert result is None
497
498 def test_write_then_clear_merge_state(self, tmp_path: pathlib.Path) -> None:
499 root = self._make_repo(tmp_path)
500 write_merge_state(
501 root,
502 base_commit="b" * 64,
503 ours_commit="o" * 64,
504 theirs_commit="t" * 64,
505 conflict_paths=["a.mid"],
506 )
507 assert (merge_state_path(root)).exists()
508 clear_merge_state(root)
509 assert not (merge_state_path(root)).exists()
File History 4 commits
sha256:81ae324db5ad375fbfe4834c6fcb378312cafad3cc92dec5d3e5c427306621a2 fix: remove commit_exists filter from have anchors — server… Sonnet 4.6 patch 21 days ago
sha256:36c3cb3e76619d4c30a6d9bf81b5ec4ff148e30dcfed913e3114ca7b43b81c7e fix: rename objects→blobs in push client and all stale test… Sonnet 4.6 patch 23 days ago
sha256:c06a9b9b9fee26c68ea725b44d54b2c0a171301ce9de746d5b656617b4463a9a fix: repair four test failures from post-migration audit Sonnet 4.6 patch 29 days ago
sha256:1900655993c83c4107067375548a7be823e471d2515830842f1a12cba4bd3cdf fix: unified object store migration — idempotent writes, JS… Sonnet 4.6 minor 29 days ago