"""Tests targeting coverage gaps in core modules: object_store, repo, store, merge_engine.""" import json import os import pathlib import pytest from muse.core.types import NULL_LONG_ID, blob_id, fake_id, long_id from muse.core.object_store import ( has_object, object_path, objects_dir, read_object, restore_object, write_object, write_object_from_path, ) from muse.core.repo import find_repo_root, require_repo from muse.core.refs import get_head_commit_id from muse.core.commits import ( CommitRecord, get_commits_for_branch, get_head_snapshot_id, read_commit, resolve_commit_ref, update_commit_metadata, write_commit, ) from muse.core.snapshots import ( SnapshotRecord, get_head_snapshot_manifest, read_snapshot, write_snapshot, ) from muse.core.tags import get_tags_for_commit from muse.core.merge_engine import apply_resolution, clear_merge_state, read_merge_state, write_merge_state from muse.core.ids import hash_commit as compute_commit_id, hash_snapshot as compute_snapshot_id from muse.core.types import Manifest from muse.core.paths import heads_dir, merge_state_path, muse_dir, objects_dir import datetime # --------------------------------------------------------------------------- # object_store # --------------------------------------------------------------------------- class TestObjectStore: def test_objects_dir_path(self, tmp_path: pathlib.Path) -> None: d = objects_dir(tmp_path) assert d == objects_dir(tmp_path) def test_object_path_sharding(self, tmp_path: pathlib.Path) -> None: oid = long_id(f"ab{'c' * 62}") p = object_path(tmp_path, oid) assert p.parent.name == "ab" assert p.name == "c" * 62 def test_has_object_false_when_absent(self, tmp_path: pathlib.Path) -> None: assert not has_object(tmp_path, long_id("a" * 64)) def test_has_object_true_after_write(self, tmp_path: pathlib.Path) -> None: content = b"hello" oid = blob_id(content) write_object(tmp_path, oid, content) assert has_object(tmp_path, oid) def test_write_object_idempotent_returns_false(self, tmp_path: pathlib.Path) -> None: content = b"first" oid = blob_id(content) assert write_object(tmp_path, oid, content) is True # Second write with correct hash but same ID — idempotent assert write_object(tmp_path, oid, content) is False # content should not change assert read_object(tmp_path, oid) == content def test_write_object_from_path_idempotent(self, tmp_path: pathlib.Path) -> None: content = b"content" src = tmp_path / "src.bin" src.write_bytes(content) oid = blob_id(content) assert write_object_from_path(tmp_path, oid, src) is True assert write_object_from_path(tmp_path, oid, src) is False def test_write_object_from_path_stores_content(self, tmp_path: pathlib.Path) -> None: content = b"my bytes" src = tmp_path / "file.bin" src.write_bytes(content) oid = blob_id(content) write_object_from_path(tmp_path, oid, src) assert read_object(tmp_path, oid) == content def test_read_object_returns_none_when_absent(self, tmp_path: pathlib.Path) -> None: assert read_object(tmp_path, long_id("e" * 64)) is None def test_read_object_returns_bytes(self, tmp_path: pathlib.Path) -> None: content = b"data" oid = blob_id(content) write_object(tmp_path, oid, content) assert read_object(tmp_path, oid) == content def test_restore_object_returns_false_when_absent(self, tmp_path: pathlib.Path) -> None: dest = tmp_path / "out.bin" result = restore_object(tmp_path, NULL_LONG_ID, dest) assert result is False assert not dest.exists() def test_restore_object_creates_dest(self, tmp_path: pathlib.Path) -> None: content = b"restored" oid = blob_id(content) write_object(tmp_path, oid, content) dest = tmp_path / "sub" / "out.bin" result = restore_object(tmp_path, oid, dest) assert result is True assert dest.read_bytes() == content def test_restore_object_creates_parent_dirs(self, tmp_path: pathlib.Path) -> None: content = b"nested" oid = blob_id(content) write_object(tmp_path, oid, content) dest = tmp_path / "a" / "b" / "c" / "file.bin" restore_object(tmp_path, oid, dest) assert dest.exists() class TestRestoreObjectIdempotency: """restore_object must preserve the destination inode when content matches. The ``os.replace`` rename syscall always produces a new inode. Editors (Cursor, VS Code, Vim, …) use inode-based filesystem-event watchers; a spurious rename blinds them to subsequent changes, leaving permanently stale buffers. The fix: hash-check dest before writing — if bytes already match the requested object_id, return without touching the file. These tests are the regression gate for that fix. They prove: 1. When dest already has the correct content the inode is preserved. 2. When dest has *different* content the file is replaced (inode changes). 3. When dest does not yet exist the write proceeds normally. 4. A checkout-style simulation: many files, only changed ones get new inodes. """ def test_inode_preserved_when_content_matches(self, tmp_path: pathlib.Path) -> None: """Core regression: restore_object must NOT rename when content is correct.""" content = b"editor-watching-this-file" oid = blob_id(content) write_object(tmp_path, oid, content) dest = tmp_path / "file.txt" dest.write_bytes(content) inode_before = dest.stat().st_ino result = restore_object(tmp_path, oid, dest) assert result is True assert dest.read_bytes() == content # The inode must not change — a rename would produce a new inode and # blind any editor that was watching the original file descriptor. assert dest.stat().st_ino == inode_before, ( "restore_object issued a spurious rename even though dest already " "contained the correct content — this blinds inode-watching editors" ) def test_mtime_preserved_when_content_matches(self, tmp_path: pathlib.Path) -> None: """mtime stability: no spurious write means no mtime bump.""" content = b"stable-mtime-check" oid = blob_id(content) write_object(tmp_path, oid, content) dest = tmp_path / "file.txt" dest.write_bytes(content) mtime_ns_before = dest.stat().st_mtime_ns restore_object(tmp_path, oid, dest) assert dest.stat().st_mtime_ns == mtime_ns_before, ( "restore_object bumped mtime even though content was already correct" ) def test_inode_changes_when_content_differs(self, tmp_path: pathlib.Path) -> None: """When dest has wrong content the file must be replaced.""" correct = b"correct-content" wrong = b"wrong-content-different-bytes" oid = blob_id(correct) write_object(tmp_path, oid, correct) dest = tmp_path / "file.txt" dest.write_bytes(wrong) inode_before = dest.stat().st_ino result = restore_object(tmp_path, oid, dest) assert result is True assert dest.read_bytes() == correct # Content changed — a rename is expected; the inode must be different. assert dest.stat().st_ino != inode_before def test_idempotent_on_fresh_file(self, tmp_path: pathlib.Path) -> None: """When dest does not yet exist the write proceeds normally.""" content = b"brand-new-file" oid = blob_id(content) write_object(tmp_path, oid, content) dest = tmp_path / "new.txt" assert not dest.exists() result = restore_object(tmp_path, oid, dest) assert result is True assert dest.read_bytes() == content def test_second_restore_is_truly_noop(self, tmp_path: pathlib.Path) -> None: """Calling restore_object twice leaves the file and inode unchanged.""" content = b"idempotent-restore" oid = blob_id(content) write_object(tmp_path, oid, content) dest = tmp_path / "file.txt" restore_object(tmp_path, oid, dest) # first call — writes the file inode_first = dest.stat().st_ino mtime_first = dest.stat().st_mtime_ns restore_object(tmp_path, oid, dest) # second call — must be a no-op assert dest.stat().st_ino == inode_first assert dest.stat().st_mtime_ns == mtime_first assert dest.read_bytes() == content def test_checkout_simulation_only_changed_files_renamed( self, tmp_path: pathlib.Path ) -> None: """Simulate a branch checkout: only files that changed get new inodes. This is the end-to-end scenario that caused the Cursor stale-buffer bug: a ``muse checkout`` that touches N files would rename ALL of them even when most were identical on both branches. After the fix, only the genuinely changed file gets a new inode. """ unchanged_content = b"I am the same on both branches" changed_old = b"old branch content" changed_new = b"new branch content" unchanged_oid = blob_id(unchanged_content) changed_oid = blob_id(changed_new) write_object(tmp_path, unchanged_oid, unchanged_content) write_object(tmp_path, changed_oid, changed_new) unchanged_dest = tmp_path / "unchanged.py" changed_dest = tmp_path / "changed.py" # Simulate working tree before checkout unchanged_dest.write_bytes(unchanged_content) changed_dest.write_bytes(changed_old) inode_unchanged_before = unchanged_dest.stat().st_ino inode_changed_before = changed_dest.stat().st_ino # Simulate _checkout_snapshot restoring both files restore_object(tmp_path, unchanged_oid, unchanged_dest) restore_object(tmp_path, changed_oid, changed_dest) # unchanged file: inode must be preserved (no rename) assert unchanged_dest.stat().st_ino == inode_unchanged_before, ( "unchanged file got a new inode — editor watching it would go blind" ) # changed file: inode should differ (content replaced) assert changed_dest.stat().st_ino != inode_changed_before assert changed_dest.read_bytes() == changed_new # --------------------------------------------------------------------------- # repo # --------------------------------------------------------------------------- class TestFindRepoRoot: def test_finds_muse_dir_in_cwd(self, tmp_path: pathlib.Path) -> None: muse_dir(tmp_path).mkdir() result = find_repo_root(tmp_path) assert result == tmp_path def test_finds_muse_dir_in_parent(self, tmp_path: pathlib.Path) -> None: muse_dir(tmp_path).mkdir() subdir = tmp_path / "a" / "b" subdir.mkdir(parents=True) result = find_repo_root(subdir) assert result == tmp_path def test_returns_none_when_no_repo(self, tmp_path: pathlib.Path) -> None: result = find_repo_root(tmp_path) assert result is None def test_env_override_returns_path(self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> None: muse_dir(tmp_path).mkdir() monkeypatch.setenv("MUSE_REPO_ROOT", str(tmp_path)) result = find_repo_root() assert result == tmp_path def test_env_override_returns_none_when_not_repo(self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> None: # tmp_path exists but has no .muse/ monkeypatch.setenv("MUSE_REPO_ROOT", str(tmp_path)) result = find_repo_root() assert result is None def test_require_repo_exits_when_no_repo(self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> None: monkeypatch.delenv("MUSE_REPO_ROOT", raising=False) monkeypatch.chdir(tmp_path) with pytest.raises(SystemExit): require_repo() # --------------------------------------------------------------------------- # store coverage gaps # --------------------------------------------------------------------------- class TestStoreGaps: def _make_repo(self, tmp_path: pathlib.Path) -> pathlib.Path: muse = muse_dir(tmp_path) for d in ("commits", "snapshots", "objects", "refs/heads"): (muse / d).mkdir(parents=True) (muse / "HEAD").write_text("ref: refs/heads/main\n") (muse / "repo.json").write_text(json.dumps({"repo_id": "test-repo"})) (muse / "refs" / "heads" / "main").write_text("") return tmp_path def test_get_head_commit_id_empty_branch(self, tmp_path: pathlib.Path) -> None: root = self._make_repo(tmp_path) assert get_head_commit_id(root, "main") is None def test_get_head_snapshot_id_no_commits(self, tmp_path: pathlib.Path) -> None: root = self._make_repo(tmp_path) assert get_head_snapshot_id(root, "main") is None def test_get_head_snapshot_manifest_no_commits(self, tmp_path: pathlib.Path) -> None: root = self._make_repo(tmp_path) assert get_head_snapshot_manifest(root, "main") is None def test_get_commits_for_branch_empty(self, tmp_path: pathlib.Path) -> None: root = self._make_repo(tmp_path) commits = get_commits_for_branch(root, "main") assert commits == [] def _seed_chain(self, root: pathlib.Path, n: int) -> list[str]: """Write a linear chain of *n* commits on ``main`` and return their IDs (newest first).""" ids: list[str] = [] parent_id: str | None = None manifest: Manifest = {} snap_id = compute_snapshot_id(manifest) write_snapshot(root, SnapshotRecord(snapshot_id=snap_id, manifest=manifest)) for i in range(n): committed_at = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc) + datetime.timedelta(hours=i) message = f"commit {i}" parent_ids = [parent_id] if parent_id else [] commit_id = compute_commit_id( parent_ids=parent_ids, snapshot_id=snap_id, message=message, committed_at_iso=committed_at.isoformat(), ) commit = CommitRecord( commit_id=commit_id, branch="main", snapshot_id=snap_id, message=message, committed_at=committed_at, parent_commit_id=parent_id, ) write_commit(root, commit) ids.append(commit_id) parent_id = commit_id # HEAD points at the last (newest) commit (heads_dir(root) / "main").write_text(ids[-1]) ids.reverse() # newest first, matching get_commits_for_branch order return ids def test_get_commits_for_branch_max_count_stops_early( self, tmp_path: pathlib.Path ) -> None: """max_count caps the walk — only that many commits are returned.""" root = self._make_repo(tmp_path) all_ids = self._seed_chain(root, 5) result = get_commits_for_branch(root, "main", max_count=2) assert len(result) == 2 assert result[0].commit_id == all_ids[0] assert result[1].commit_id == all_ids[1] def test_get_commits_for_branch_max_count_zero_returns_all( self, tmp_path: pathlib.Path ) -> None: """max_count=0 (the default) returns the full chain.""" root = self._make_repo(tmp_path) all_ids = self._seed_chain(root, 5) result = get_commits_for_branch(root, "main", max_count=0) assert len(result) == 5 assert [c.commit_id for c in result] == all_ids def test_get_commits_for_branch_max_count_larger_than_chain( self, tmp_path: pathlib.Path ) -> None: """max_count larger than the chain length returns every commit without error.""" root = self._make_repo(tmp_path) all_ids = self._seed_chain(root, 3) result = get_commits_for_branch(root, "main", max_count=100) assert len(result) == 3 assert [c.commit_id for c in result] == all_ids def test_resolve_commit_ref_with_none_returns_head(self, tmp_path: pathlib.Path) -> None: root = self._make_repo(tmp_path) manifest: Manifest = {"a.mid": fake_id("a.mid-content")} snap_id = compute_snapshot_id(manifest) snap = SnapshotRecord(snapshot_id=snap_id, manifest=manifest) write_snapshot(root, snap) committed_at = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc) commit_id = compute_commit_id( parent_ids=[], snapshot_id=snap_id, message="test", committed_at_iso=committed_at.isoformat(), ) commit = CommitRecord( commit_id=commit_id, branch="main", snapshot_id=snap_id, message="test", committed_at=committed_at, ) write_commit(root, commit) (heads_dir(root) / "main").write_text(commit_id) result = resolve_commit_ref(root, "main", None) assert result is not None assert result.commit_id == commit_id def test_read_commit_returns_none_for_unknown(self, tmp_path: pathlib.Path) -> None: root = self._make_repo(tmp_path) assert read_commit(root, long_id("a" * 64)) is None def test_read_snapshot_returns_none_for_unknown(self, tmp_path: pathlib.Path) -> None: root = self._make_repo(tmp_path) assert read_snapshot(root, long_id("b" * 64)) is None def test_update_commit_metadata_false_for_unknown(self, tmp_path: pathlib.Path) -> None: root = self._make_repo(tmp_path) assert update_commit_metadata(root, long_id("c" * 64), "key", "val") is False def test_get_tags_for_commit_empty(self, tmp_path: pathlib.Path) -> None: root = self._make_repo(tmp_path) tags = get_tags_for_commit(root, long_id("d" * 64), long_id("c" * 64)) assert tags == [] # --------------------------------------------------------------------------- # merge_engine coverage gaps # --------------------------------------------------------------------------- class TestMergeEngineCoverageGaps: def _make_repo(self, tmp_path: pathlib.Path) -> pathlib.Path: muse = muse_dir(tmp_path) muse.mkdir(parents=True) return tmp_path def test_clear_merge_state_no_file(self, tmp_path: pathlib.Path) -> None: root = self._make_repo(tmp_path) # Should not raise even if MERGE_STATE.json is absent clear_merge_state(root) def test_apply_resolution_copies_object(self, tmp_path: pathlib.Path) -> None: root = self._make_repo(tmp_path) # Write a real object to the store — oid must be the SHA-256 of the content. content = b"resolved content" oid = blob_id(content) write_object(root, oid, content) apply_resolution(root, "track.mid", oid) dest = root / "track.mid" assert dest.exists() assert dest.read_bytes() == b"resolved content" def test_apply_resolution_raises_when_object_absent(self, tmp_path: pathlib.Path) -> None: root = self._make_repo(tmp_path) with pytest.raises(FileNotFoundError): apply_resolution(root, "track.mid", NULL_LONG_ID) def test_read_merge_state_invalid_json_returns_none(self, tmp_path: pathlib.Path) -> None: root = self._make_repo(tmp_path) (merge_state_path(root)).write_text("not json {{") result = read_merge_state(root) assert result is None def test_write_then_clear_merge_state(self, tmp_path: pathlib.Path) -> None: root = self._make_repo(tmp_path) write_merge_state( root, base_commit="b" * 64, ours_commit="o" * 64, theirs_commit="t" * 64, conflict_paths=["a.mid"], ) assert (merge_state_path(root)).exists() clear_merge_state(root) assert not (merge_state_path(root)).exists()