"""Comprehensive tests for the "directories as first-class objects" feature. Covers every changed surface: - directories_from_manifest (unit) - walk_workdir_with_dirs (unit + integration) - compute_snapshot_id with directories parameter (unit) - detect_directory_renames (unit + property-style) - diff_workdir_vs_snapshot 6-tuple (unit + integration) - SnapshotRecord.directories serialisation round-trip (unit) - write_snapshot / read_snapshot with directories (integration) - CodePlugin.diff directory rename detection (integration) - delta_summary directory rename counting (unit) - replay_one propagates directories to new SnapshotRecord (integration) - Full commit → branch → rename → commit → merge E2E workflow (e2e) - Stress / performance (stress) - Security: path traversal, symlinks, adversarial inputs (security) """ from __future__ import annotations import datetime import hashlib import json import os import pathlib import subprocess import sys import time import pytest from muse.core.ids import hash_commit as compute_commit_id, hash_snapshot as compute_snapshot_id from muse.core.snapshot import ( detect_directory_renames, diff_workdir_vs_snapshot, directories_from_manifest, hash_file, walk_workdir_with_dirs, ) from muse.core.commits import ( CommitRecord, read_commit, write_commit, ) from muse.core.snapshots import ( SnapshotRecord, read_snapshot, write_snapshot, ) from muse.domain import RenameOp, SnapshotManifest from muse.core.types import Manifest, MsgpackDict, blob_id, fake_id, now_utc_iso, split_id from muse.plugins.code.plugin import CodePlugin from muse.core.paths import ref_path, muse_dir # --------------------------------------------------------------------------- # Shared helpers # --------------------------------------------------------------------------- _REPO_ID = "test-repo-dirs" _counter = 0 def _init_store(root: pathlib.Path) -> None: dot_muse = muse_dir(root) for d in ("commits", "snapshots", "objects", "refs/heads"): (dot_muse / 
d).mkdir(parents=True, exist_ok=True) (dot_muse / "HEAD").write_text("ref: refs/heads/main", encoding="utf-8") (dot_muse / "repo.json").write_text( json.dumps({"repo_id": _REPO_ID, "domain": "code"}), encoding="utf-8" ) def _make_snap(root: pathlib.Path, manifest: Manifest, dirs: list[str] | None = None) -> SnapshotRecord: dirs = dirs if dirs is not None else directories_from_manifest(manifest) sid = compute_snapshot_id(manifest, dirs) rec = SnapshotRecord(snapshot_id=sid, manifest=manifest, directories=dirs) write_snapshot(root, rec) return rec def _make_commit_rec( root: pathlib.Path, snap: SnapshotRecord, branch: str = "main", parent_id: str | None = None, message: str = "test commit", ) -> CommitRecord: global _counter _counter += 1 committed_at = datetime.datetime.now(datetime.timezone.utc) cid = compute_commit_id( [parent_id] if parent_id else [], snap.snapshot_id, message, committed_at.isoformat(), ) rec = CommitRecord( commit_id=cid, branch=branch, snapshot_id=snap.snapshot_id, message=message, committed_at=committed_at, parent_commit_id=parent_id, ) write_commit(root, rec) (ref_path(root, branch)).write_text(cid, encoding="utf-8") return rec @pytest.fixture() def store(tmp_path: pathlib.Path) -> pathlib.Path: _init_store(tmp_path) return tmp_path @pytest.fixture() def workdir(tmp_path: pathlib.Path) -> pathlib.Path: return tmp_path # =========================================================================== # 1. 
# directories_from_manifest — unit
# ===========================================================================


class TestDirectoriesFromManifest:
    """Unit tests for deriving the directory list from a file manifest."""

    def test_empty_manifest_returns_empty(self) -> None:
        assert directories_from_manifest({}) == []

    def test_flat_files_no_dirs(self) -> None:
        flat = {"a.py": "h1", "b.py": "h2"}
        assert directories_from_manifest(flat) == []

    def test_single_nested_file(self) -> None:
        dirs = directories_from_manifest({"src/main.py": "h1"})
        assert dirs == ["src"]

    def test_deeply_nested(self) -> None:
        # Every ancestor of the file is expected, shallowest first.
        dirs = directories_from_manifest({"a/b/c/d.py": "h1"})
        assert dirs == ["a", "a/b", "a/b/c"]

    def test_multiple_files_same_dir_deduped(self) -> None:
        manifest = {"src/a.py": "h1", "src/b.py": "h2"}
        assert directories_from_manifest(manifest) == ["src"]

    def test_sibling_dirs(self) -> None:
        manifest = {
            "src/foo.py": "h1",
            "tests/bar.py": "h2",
        }
        assert directories_from_manifest(manifest) == ["src", "tests"]

    def test_mixed_flat_and_nested(self) -> None:
        manifest = {
            "root.py": "h0",
            "src/main.py": "h1",
            "src/lib/util.py": "h2",
        }
        assert directories_from_manifest(manifest) == ["src", "src/lib"]

    def test_result_is_sorted(self) -> None:
        dirs = directories_from_manifest({
            "z/file.py": "h1",
            "a/file.py": "h2",
            "m/sub/file.py": "h3",
        })
        assert dirs == sorted(dirs)

    def test_result_is_deduplicated(self) -> None:
        manifest = {
            "pkg/a.py": "h1",
            "pkg/b.py": "h2",
            "pkg/c.py": "h3",
        }
        dirs = directories_from_manifest(manifest)
        assert dirs.count("pkg") == 1

    def test_large_flat_tree_no_dirs(self) -> None:
        manifest = {f"file_{i}.txt": f"hash{i}" for i in range(200)}
        assert directories_from_manifest(manifest) == []

    def test_preserves_posix_separators(self) -> None:
        dirs = directories_from_manifest({"foo/bar/baz.py": "h"})
        # Only the top-level "foo" may lack a "/" separator.
        assert all("/" in d or d == "foo" for d in dirs)
        joined = "".join(dirs)
        assert "\\" not in joined


# ===========================================================================
# 2.
# compute_snapshot_id with directories — unit
# ===========================================================================


class TestComputeSnapshotIdWithDirectories:
    """Unit tests for how the ``directories`` argument feeds the snapshot id."""

    def test_no_dirs_matches_legacy_behaviour(self) -> None:
        manifest = {"a.py": fake_id("h1")}
        # Omitting the argument, passing None and passing [] must all agree.
        assert compute_snapshot_id(manifest) == compute_snapshot_id(manifest, None)
        assert compute_snapshot_id(manifest) == compute_snapshot_id(manifest, [])

    def test_dirs_change_the_id(self) -> None:
        manifest = {"a.py": fake_id("h1")}
        id_without = compute_snapshot_id(manifest, [])
        id_with = compute_snapshot_id(manifest, ["src"])
        assert id_without != id_with

    def test_different_dirs_different_id(self) -> None:
        manifest = {"a.py": fake_id("h1")}
        assert compute_snapshot_id(manifest, ["src"]) != compute_snapshot_id(manifest, ["lib"])

    def test_same_files_same_dirs_deterministic(self) -> None:
        manifest = {"a/b.py": fake_id("h1"), "c/d.py": fake_id("h2")}
        dirs = ["a", "c"]
        first = compute_snapshot_id(manifest, dirs)
        second = compute_snapshot_id(manifest, dirs)
        assert first == second

    def test_dir_order_independent(self) -> None:
        manifest = {"a.py": fake_id("h1")}
        forward = compute_snapshot_id(manifest, ["src", "lib"])
        backward = compute_snapshot_id(manifest, ["lib", "src"])
        assert forward == backward

    def test_file_rename_changes_id_even_with_same_dirs(self) -> None:
        dirs = ["src"]
        id_a = compute_snapshot_id({"src/a.py": fake_id("h1")}, dirs)
        id_b = compute_snapshot_id({"src/b.py": fake_id("h1")}, dirs)
        assert id_a != id_b

    def test_dir_rename_changes_id_same_file_content(self) -> None:
        manifest = {"f.py": fake_id("h1")}
        before = compute_snapshot_id(manifest, ["old_name"])
        after = compute_snapshot_id(manifest, ["new_name"])
        assert before != after

    def test_result_is_64_hex_chars(self) -> None:
        sid = compute_snapshot_id({"a.py": fake_id("h")}, ["src"])
        # NOTE(review): 71 presumably = algorithm prefix + 64 hex digits — confirm
        # against the id format in muse.core.types.
        assert len(sid) == 71
        digest = split_id(sid)[1]
        assert all(ch in "0123456789abcdef" for ch in digest)


# ===========================================================================
# 3.
# detect_directory_renames — unit
# ===========================================================================


class TestDetectDirectoryRenames:
    """Unit tests for pairing deleted and added directories as renames."""

    def test_clean_single_rename(self) -> None:
        before = {"old/a.py": "h1", "old/b.py": "h2"}
        after = {"new/a.py": "h1", "new/b.py": "h2"}
        assert detect_directory_renames({"old"}, {"new"}, before, after) == [("old", "new")]

    def test_no_rename_content_changed(self) -> None:
        before = {"old/a.py": "h1"}
        after = {"new/a.py": "DIFFERENT"}
        assert detect_directory_renames({"old"}, {"new"}, before, after) == []

    def test_no_rename_empty_old_dir(self) -> None:
        # old dir has no files in last_manifest → can't match
        before: Manifest = {}
        after = {"new/a.py": "h1"}
        assert detect_directory_renames({"old"}, {"new"}, before, after) == []

    def test_multiple_independent_renames(self) -> None:
        before = {"foo/x.py": "h1", "bar/y.py": "h2"}
        after = {"baz/x.py": "h1", "qux/y.py": "h2"}
        pairs = detect_directory_renames({"foo", "bar"}, {"baz", "qux"}, before, after)
        # Order of the returned pairs is not asserted.
        assert set(pairs) == {("foo", "baz"), ("bar", "qux")}

    def test_ambiguous_candidates_not_renamed(self) -> None:
        # Two added dirs carry identical file sets, so the single deleted dir
        # has two equally good candidates.  Despite the test name, what is
        # pinned here is that exactly one pairing is reported (the original
        # note says the first sorted candidate wins; which one is not asserted).
        before = {"old/f.py": "h1"}
        after = {"new1/f.py": "h1", "new2/f.py": "h1"}
        pairs = detect_directory_renames({"old"}, {"new1", "new2"}, before, after)
        assert len(pairs) == 1

    def test_partial_match_not_renamed(self) -> None:
        before = {"old/a.py": "h1", "old/b.py": "h2"}
        after = {"new/a.py": "h1"}  # b.py missing
        assert detect_directory_renames({"old"}, {"new"}, before, after) == []

    def test_extra_file_in_new_dir_not_renamed(self) -> None:
        before = {"old/a.py": "h1"}
        after = {"new/a.py": "h1", "new/extra.py": "h2"}
        assert detect_directory_renames({"old"}, {"new"}, before, after) == []

    def test_returns_list_of_tuples(self) -> None:
        before = {"src/main.py": "abc"}
        after = {"lib/main.py": "abc"}
        pairs = detect_directory_renames({"src"}, {"lib"}, before, after)
        assert isinstance(pairs, list)
        assert all(isinstance(pair, tuple) and len(pair) == 2 for pair in pairs)

    def test_empty_sets_returns_empty(self) -> None:
        assert detect_directory_renames(set(), set(), {}, {}) == []

    def test_single_file_dir_rename(self) -> None:
        before = {"pkg/module.py": "cafebabe"}
        after = {"renamed_pkg/module.py": "cafebabe"}
        pairs = detect_directory_renames({"pkg"}, {"renamed_pkg"}, before, after)
        assert pairs == [("pkg", "renamed_pkg")]


# ===========================================================================
# 4. diff_workdir_vs_snapshot — 6-tuple (fix broken existing + new)
# ===========================================================================


class TestDiffWorkdirVsSnapshot6Tuple:
    """Tests for the 6-tuple returned by ``diff_workdir_vs_snapshot``.

    The tuple is unpacked throughout as
    ``(added, modified, deleted, untracked, added_dirs, deleted_dirs)``;
    slots a test does not inspect are bound to ``_``.
    """

    def test_returns_6_tuple(self, workdir: pathlib.Path) -> None:
        assert len(diff_workdir_vs_snapshot(workdir, {})) == 6

    def test_untracked_first_commit(self, workdir: pathlib.Path) -> None:
        (workdir / "f.py").write_bytes(b"x")
        added, _, _, untracked, _, _ = diff_workdir_vs_snapshot(workdir, {})
        assert added == set()
        assert "f.py" in untracked

    def test_added_file_detected(self, workdir: pathlib.Path) -> None:
        (workdir / "f.py").write_bytes(b"x")
        added, _, deleted, _, _, _ = diff_workdir_vs_snapshot(workdir, {"other.py": "abc"})
        assert "f.py" in added
        assert "other.py" in deleted

    def test_modified_file_detected(self, workdir: pathlib.Path) -> None:
        (workdir / "f.py").write_bytes(b"new content")
        _, modified, _, _, _, _ = diff_workdir_vs_snapshot(workdir, {"f.py": "oldhash"})
        assert "f.py" in modified

    def test_clean_workdir_all_empty(self, workdir: pathlib.Path) -> None:
        target = workdir / "f.py"
        target.write_bytes(b"content")
        digest = hash_file(target)
        added, modified, deleted, untracked, _, _ = diff_workdir_vs_snapshot(
            workdir, {"f.py": digest}
        )
        assert not (added or modified or deleted or untracked)

    def test_added_dir_detected(self, workdir: pathlib.Path) -> None:
        (workdir / "src").mkdir()
        (workdir / "src" / "main.py").write_bytes(b"x")
        _, _, _, _, added_dirs, deleted_dirs = diff_workdir_vs_snapshot(
            workdir, {"root.py": "abc"}, last_directories=["lib"]
        )
        assert "src" in added_dirs
        assert "lib" in deleted_dirs

    def test_deleted_dir_detected(self, workdir: pathlib.Path) -> None:
        (workdir / "f.py").write_bytes(b"x")
        digest = hash_file(workdir / "f.py")
        _, _, _, _, _, deleted_dirs = diff_workdir_vs_snapshot(
            workdir, {"f.py": digest}, last_directories=["old_dir"]
        )
        assert "old_dir" in deleted_dirs

    def test_unchanged_dirs_not_in_delta(self, workdir: pathlib.Path) -> None:
        (workdir / "src").mkdir()
        (workdir / "src" / "main.py").write_bytes(b"x")
        digest = hash_file(workdir / "src" / "main.py")
        _, _, _, _, added_dirs, deleted_dirs = diff_workdir_vs_snapshot(
            workdir, {"src/main.py": digest}, last_directories=["src"]
        )
        assert "src" not in added_dirs
        assert "src" not in deleted_dirs

    def test_nonexistent_workdir_returns_all_deleted(self, tmp_path: pathlib.Path) -> None:
        missing = tmp_path / "gone"
        added, _, deleted, _, _, deleted_dirs = diff_workdir_vs_snapshot(
            missing, {"f.py": "h"}, last_directories=["src"]
        )
        assert "f.py" in deleted
        assert "src" in deleted_dirs
        assert not added

    def test_pruned_dirs_not_tracked(self, workdir: pathlib.Path) -> None:
        (workdir / "node_modules").mkdir()
        (workdir / "node_modules" / "pkg.js").write_bytes(b"x")
        (workdir / "src").mkdir()
        (workdir / "src" / "app.py").write_bytes(b"y")
        _, _, _, untracked, added_dirs, _ = diff_workdir_vs_snapshot(workdir, {})
        assert "node_modules" not in added_dirs
        # Either the directory or its file must be visible in the result.
        assert "src" in added_dirs or "src/app.py" in untracked


# ===========================================================================
# 5.
# walk_workdir_with_dirs — unit
# ===========================================================================


class TestWalkWorkdirWithDirs:
    """Unit tests for the ``(files, dirs)`` pair returned by the workdir walk."""

    def test_empty_dir_returns_empty(self, workdir: pathlib.Path) -> None:
        files, dirs = walk_workdir_with_dirs(workdir)
        assert files == {}
        assert dirs == []

    def test_flat_files_no_dirs(self, workdir: pathlib.Path) -> None:
        (workdir / "a.py").write_bytes(b"x")
        files, dirs = walk_workdir_with_dirs(workdir)
        assert "a.py" in files
        assert dirs == []

    def test_nested_file_dir_tracked(self, workdir: pathlib.Path) -> None:
        src = workdir / "src"
        src.mkdir()
        (src / "main.py").write_bytes(b"x")
        files, dirs = walk_workdir_with_dirs(workdir)
        assert "src/main.py" in files
        assert "src" in dirs

    def test_deeply_nested_dirs_all_tracked(self, workdir: pathlib.Path) -> None:
        leaf_dir = workdir / "a" / "b" / "c"
        leaf_dir.mkdir(parents=True)
        (leaf_dir / "f.py").write_bytes(b"x")
        _, dirs = walk_workdir_with_dirs(workdir)
        assert "a" in dirs
        assert "a/b" in dirs
        assert "a/b/c" in dirs

    def test_dirs_sorted(self, workdir: pathlib.Path) -> None:
        # Created deliberately out of alphabetical order.
        for name in ("zzz", "aaa", "mmm"):
            subdir = workdir / name
            subdir.mkdir()
            (subdir / "f.py").write_bytes(b"x")
        _, dirs = walk_workdir_with_dirs(workdir)
        assert dirs == sorted(dirs)

    def test_pruned_dirs_excluded(self, workdir: pathlib.Path) -> None:
        (workdir / "node_modules").mkdir()
        (workdir / "node_modules" / "lib.js").write_bytes(b"x")
        (workdir / "__pycache__").mkdir()
        (workdir / "__pycache__" / "mod.pyc").write_bytes(b"x")
        _, dirs = walk_workdir_with_dirs(workdir)
        assert "node_modules" not in dirs
        assert "__pycache__" not in dirs

    def test_symlinks_not_followed(self, workdir: pathlib.Path) -> None:
        real_dir = workdir / "real_dir"
        real_dir.mkdir()
        (real_dir / "secret.py").write_bytes(b"secret")
        (workdir / "link_dir").symlink_to(real_dir)
        files, dirs = walk_workdir_with_dirs(workdir)
        # symlink directory should not be descended (followlinks=False)
        assert "link_dir/secret.py" not in files
# ===========================================================================
# 6. SnapshotRecord.directories serialisation — unit
# ===========================================================================


class TestSnapshotRecordDirectories:
    """Unit tests for the ``directories`` field of ``SnapshotRecord``."""

    def test_default_directories_is_empty_list(self) -> None:
        rec = SnapshotRecord(snapshot_id="abc", manifest={})
        assert rec.directories == []

    def test_to_dict_includes_directories(self) -> None:
        rec = SnapshotRecord(snapshot_id="abc", manifest={}, directories=["src", "lib"])
        d = rec.to_dict()
        assert d["directories"] == ["src", "lib"]

    def test_from_dict_roundtrip(self) -> None:
        rec = SnapshotRecord(snapshot_id="abc", manifest={"f.py": "h"}, directories=["pkg"])
        loaded = SnapshotRecord.from_dict(rec.to_dict())
        assert loaded.directories == ["pkg"]

    def test_from_dict_roundtrip_via_plain_dict(self) -> None:
        # Previously also named ``test_from_dict_roundtrip``: the second
        # definition replaced the first in the class body, so the test above
        # was never collected.  A distinct name lets both run.
        rec = SnapshotRecord(snapshot_id="xyz", manifest={}, directories=["a", "b"])
        d: MsgpackDict = dict(rec.to_dict())
        loaded = SnapshotRecord.from_dict(d)
        assert loaded.directories == ["a", "b"]

    def test_from_dict_missing_field_defaults_empty(self) -> None:
        d: MsgpackDict = {
            "snapshot_id": "abc",
            "manifest": {},
            "created_at": now_utc_iso(),
            "note": "",
        }
        rec = SnapshotRecord.from_dict(d)
        assert rec.directories == []

    def test_from_dict_filters_non_string_items(self) -> None:
        d: MsgpackDict = {
            "snapshot_id": "abc",
            "manifest": {},
            "directories": ["valid", 42, None, "also_valid"],
            "created_at": now_utc_iso(),
            "note": "",
        }
        rec = SnapshotRecord.from_dict(d)
        assert rec.directories == ["valid", "also_valid"]

    def test_from_dict_non_list_directories_defaults_empty(self) -> None:
        d: MsgpackDict = {
            "snapshot_id": "abc",
            "manifest": {},
            "directories": "not-a-list",
            "created_at": now_utc_iso(),
            "note": "",
        }
        rec = SnapshotRecord.from_dict(d)
        assert rec.directories == []

    def test_to_dict_returns_copy_not_reference(self) -> None:
        dirs = ["src"]
        rec = SnapshotRecord(snapshot_id="abc", manifest={}, directories=dirs)
        d = rec.to_dict()
        # Mutating the serialised form must not leak back into the record.
        d["directories"].append("mutated")
        assert rec.directories == ["src"]


# ===========================================================================
# 7. write_snapshot / read_snapshot roundtrip with directories — integration
# ===========================================================================


class TestWriteReadSnapshotWithDirectories:
    """Integration tests: snapshots written to the store keep their directories."""

    def test_roundtrip_preserves_directories(self, store: pathlib.Path) -> None:
        manifest = {"src/main.py": fake_id("h1"), "src/util.py": fake_id("h2")}
        dirs = ["src"]
        sid = compute_snapshot_id(manifest, dirs)
        rec = SnapshotRecord(snapshot_id=sid, manifest=manifest, directories=dirs)
        write_snapshot(store, rec)
        loaded = read_snapshot(store, sid)
        assert loaded is not None
        assert loaded.directories == ["src"]

    def test_roundtrip_empty_directories(self, store: pathlib.Path) -> None:
        manifest = {"f.py": fake_id("h1")}
        sid = compute_snapshot_id(manifest, [])
        rec = SnapshotRecord(snapshot_id=sid, manifest=manifest, directories=[])
        write_snapshot(store, rec)
        loaded = read_snapshot(store, sid)
        assert loaded is not None
        assert loaded.directories == []

    def test_roundtrip_deeply_nested_dirs(self, store: pathlib.Path) -> None:
        manifest = {"a/b/c/d.py": fake_id("h1")}
        dirs = directories_from_manifest(manifest)
        sid = compute_snapshot_id(manifest, dirs)
        rec = SnapshotRecord(snapshot_id=sid, manifest=manifest, directories=dirs)
        write_snapshot(store, rec)
        loaded = read_snapshot(store, sid)
        assert loaded is not None
        assert loaded.directories == ["a", "a/b", "a/b/c"]

    def test_snapshot_id_includes_dirs_in_verification(self, store: pathlib.Path) -> None:
        """read_snapshot verifies the stored ID — tampering with dirs must fail."""
        manifest = {"f.py": fake_id("h1")}
        dirs = ["src"]
        sid = compute_snapshot_id(manifest, dirs)
        rec = SnapshotRecord(snapshot_id=sid, manifest=manifest, directories=dirs)
        write_snapshot(store, rec)
        # Compute ID without dirs — must be different
        sid_no_dirs = compute_snapshot_id(manifest, [])
        assert sid != sid_no_dirs

    def test_directory_rename_produces_different_snapshot_id(self, store: pathlib.Path) -> None:
        manifest = {"f.py": fake_id("h1")}
        id_old = compute_snapshot_id(manifest, ["old_name"])
        id_new = compute_snapshot_id(manifest, ["new_name"])
        assert id_old != id_new


# ===========================================================================
# 8. RenameOp TypedDict — unit
# ===========================================================================


class TestDirectoryRenameOp:
    """Unit tests for the shape of ``RenameOp``."""

    def test_construct_fields(self) -> None:
        op = RenameOp(
            op="rename",
            address="new/path",
            from_address="old/path",
        )
        assert op["op"] == "rename"
        assert op["address"] == "new/path"
        assert op["from_address"] == "old/path"

    def test_rename_op_has_no_file_count(self) -> None:
        import typing

        hints = typing.get_type_hints(RenameOp)
        assert "file_count" not in hints


# ===========================================================================
# 9. CodePlugin.diff directory rename detection — integration
# ===========================================================================


class TestCodePluginDiffDirectories:
    """Integration tests for directory-level ops emitted by ``CodePlugin.diff``."""

    @pytest.fixture()
    def plugin(self) -> CodePlugin:
        # CodePlugin is already imported at module level; no local import needed.
        return CodePlugin()

    def _snap(self, files: Manifest, dirs: list[str] | None = None) -> SnapshotManifest:
        """Build a ``code`` SnapshotManifest; derive dirs from *files* when None."""
        d = dirs if dirs is not None else directories_from_manifest(files)
        return SnapshotManifest(files=files, domain="code", directories=d)

    def test_directory_rename_emits_rename_op(self, plugin: CodePlugin) -> None:
        base = self._snap({"src/a.py": "h1", "src/b.py": "h2"}, ["src"])
        target = self._snap({"lib/a.py": "h1", "lib/b.py": "h2"}, ["lib"])
        delta = plugin.diff(base, target)
        ops = delta["ops"]
        dir_rename_ops = [o for o in ops if o["op"] == "rename"]
        assert len(dir_rename_ops) == 1
        # Directory addresses carry a trailing slash.
        assert dir_rename_ops[0]["from_address"] == "src/"
        assert dir_rename_ops[0]["address"] == "lib/"

    def test_directory_rename_suppresses_file_level_ops(self, plugin: CodePlugin) -> None:
        base = self._snap({"src/a.py": "h1"}, ["src"])
        target = self._snap({"lib/a.py": "h1"}, ["lib"])
        delta = plugin.diff(base, target)
        ops = delta["ops"]
        # No plain insert/delete for the covered file paths
        file_ops = [o for o in ops if o["op"] in ("insert", "delete") and "/" in o["address"]]
        assert not any(o["address"] in ("src/a.py", "lib/a.py") for o in file_ops)

    def test_plain_added_dir_emits_insert_op(self, plugin: CodePlugin) -> None:
        base = self._snap({}, [])
        target = self._snap({"new/f.py": "h1"}, ["new"])
        delta = plugin.diff(base, target)
        ops = delta["ops"]
        insert_dir_ops = [o for o in ops if o["op"] == "insert" and o["address"] == "new/"]
        assert len(insert_dir_ops) == 1

    def test_plain_deleted_dir_emits_delete_op(self, plugin: CodePlugin) -> None:
        base = self._snap({"old/f.py": "h1"}, ["old"])
        target = self._snap({}, [])
        delta = plugin.diff(base, target)
        ops = delta["ops"]
        delete_dir_ops = [o for o in ops if o["op"] == "delete" and o["address"] == "old/"]
        assert len(delete_dir_ops) == 1

    def test_no_dir_changes_no_dir_ops(self, plugin: CodePlugin) -> None:
        base = self._snap({"src/a.py": "h1"}, ["src"])
        target = self._snap({"src/a.py": "h2"}, ["src"])
        delta = plugin.diff(base, target)
        ops = delta["ops"]
        # The original built a filtered list and then only looked for rename
        # ops in it; every rename op passed that filter, so checking ``ops``
        # directly is the same assertion without the intermediate list.
        assert not any(o["op"] == "rename" for o in ops)

    def test_no_directories_field_no_crash(self, plugin: CodePlugin) -> None:
        # Snapshots with an empty directories list should not crash
        base = SnapshotManifest(files={"a.py": "h1"}, domain="code", directories=[])
        target = SnapshotManifest(files={"b.py": "h1"}, domain="code", directories=[])
        delta = plugin.diff(base, target)
        assert "ops" in delta


# ===========================================================================
# 10.
# delta_summary directory rename counting — unit
# ===========================================================================


class TestDeltaSummaryDirectories:
    """Unit tests for how ``delta_summary`` words directory rename ops."""

    def _make_dir_rename_op(self, old: str, new: str) -> RenameOp:
        """Return a ``RenameOp`` moving *old* to *new*."""
        return RenameOp(op="rename", address=new, from_address=old)

    def test_no_changes_returns_no_changes(self) -> None:
        from muse.plugins.code.symbol_diff import delta_summary

        assert delta_summary([]) == "no changes"

    def test_single_directory_rename(self) -> None:
        from muse.plugins.code.symbol_diff import delta_summary

        summary = delta_summary([self._make_dir_rename_op("old", "new")])
        assert "renamed" in summary

    def test_two_directory_renames_plural(self) -> None:
        from muse.plugins.code.symbol_diff import delta_summary

        first = self._make_dir_rename_op("a", "x")
        second = self._make_dir_rename_op("b", "y")
        summary = delta_summary([first, second])
        assert "2" in summary
        assert "renamed" in summary

    def test_directory_rename_combined_with_file_ops(self) -> None:
        from muse.plugins.code.symbol_diff import delta_summary
        from muse.domain import InsertOp

        insert_op = InsertOp(
            op="insert",
            address="new_file.py",
            position=None,
            content_id="h1",
            content_summary="",
        )
        rename_op = self._make_dir_rename_op("src", "lib")
        summary = delta_summary([insert_op, rename_op])
        assert "added" in summary
        assert "renamed" in summary

    def test_directory_rename_not_counted_as_file(self) -> None:
        from muse.plugins.code.symbol_diff import delta_summary

        summary = delta_summary([self._make_dir_rename_op("old", "new")])
        # A lone directory rename must not show up as a file add or remove.
        assert "1 added" not in summary
        assert "1 removed" not in summary


# ===========================================================================
# 11.
# replay_one propagates directories — integration
# ===========================================================================


class TestReplayOneWithDirectories:
    """Integration tests: ``replay_one`` keeps directories on the new snapshot."""

    def _write_obj(self, store: pathlib.Path, content: bytes) -> str:
        """Write *content* to the object store and return its blob id."""
        from muse.core.object_store import write_object

        oid = blob_id(content)
        write_object(store, oid, content)
        return oid

    def test_clean_merge_produces_snapshot_with_dirs(self, store: pathlib.Path) -> None:
        from muse.core.rebase import replay_one

        plugin = CodePlugin()
        domain = "code"

        # Write actual file objects so apply_manifest can restore them
        oid_a = self._write_obj(store, b"# a.py content\n")
        oid_b = self._write_obj(store, b"# b.py content\n")

        # Create a base commit: one file in src/
        base_manifest = {"src/a.py": oid_a}
        base_dirs = directories_from_manifest(base_manifest)
        base_snap = _make_snap(store, base_manifest, base_dirs)
        base_commit = _make_commit_rec(store, base_snap, message="base")

        # Create "theirs" commit: adds src/b.py (same parent = base)
        theirs_manifest = {"src/a.py": oid_a, "src/b.py": oid_b}
        theirs_dirs = directories_from_manifest(theirs_manifest)
        theirs_snap = _make_snap(store, theirs_manifest, theirs_dirs)
        theirs_commit = _make_commit_rec(
            store, theirs_snap, parent_id=base_commit.commit_id, message="theirs"
        )

        # replay theirs_commit on top of base_commit (onto = base)
        result = replay_one(
            root=store,
            commit=theirs_commit,
            parent_id=base_commit.commit_id,
            plugin=plugin,
            domain=domain,
            branch="main",
        )

        assert isinstance(result, CommitRecord), f"Expected CommitRecord, got: {result}"
        replayed_snap = read_snapshot(store, result.snapshot_id)
        assert replayed_snap is not None
        assert replayed_snap.directories == ["src"]

    def test_conflict_returns_path_list_not_commit(self, store: pathlib.Path) -> None:
        from muse.core.rebase import replay_one

        plugin = CodePlugin()

        oid_v1 = self._write_obj(store, b"version 1\n")
        oid_v2 = self._write_obj(store, b"version 2\n")
        oid_v3 = self._write_obj(store, b"version 3\n")

        # base: file a.py = v1
        base_manifest = {"a.py": oid_v1}
        base_snap = _make_snap(store, base_manifest)
        base_commit = _make_commit_rec(store, base_snap, message="base")

        # "theirs" modifies a.py from v1 → v2
        theirs_manifest = {"a.py": oid_v2}
        theirs_snap = _make_snap(store, theirs_manifest)
        theirs_commit = _make_commit_rec(
            store, theirs_snap, parent_id=base_commit.commit_id, message="theirs"
        )

        # "ours" (parent_id in replay) also modified a.py from v1 → v3 (conflict)
        ours_manifest = {"a.py": oid_v3}
        ours_snap = _make_snap(store, ours_manifest)
        ours_commit = _make_commit_rec(store, ours_snap, message="ours")

        result = replay_one(
            root=store,
            commit=theirs_commit,
            parent_id=ours_commit.commit_id,
            plugin=plugin,
            domain="code",
            branch="main",
        )

        # Should return conflict paths, not a CommitRecord
        assert isinstance(result, list)


# ===========================================================================
# 12. E2E workflow — full CLI commit/branch/rename/merge cycle
# ===========================================================================


def _muse(repo: pathlib.Path, *args: str) -> subprocess.CompletedProcess[str]:
    """Run the ``muse`` CLI with *args* inside *repo* and return the result.

    stdout/stderr are captured as text.  The exit status is NOT checked here;
    callers assert on ``returncode`` where it matters.
    """
    import shutil

    # Prefer a .venv installation (development mode), fall back to the
    # interpreter's sibling or the PATH-resolved binary.
    venv_muse = pathlib.Path(__file__).parent.parent / ".venv" / "bin" / "muse"
    if venv_muse.exists():
        muse_bin = str(venv_muse)
    else:
        sibling = pathlib.Path(sys.executable).parent / "muse"
        muse_bin = str(sibling) if sibling.exists() else (shutil.which("muse") or "muse")
    return subprocess.run(
        [muse_bin, *args],
        cwd=str(repo),
        capture_output=True,
        text=True,
    )


class TestDirectoriesE2EWorkflow:
    """End-to-end tests driving the real ``muse`` CLI in a temporary repo."""

    @pytest.fixture()
    def repo(self, tmp_path: pathlib.Path) -> pathlib.Path:
        """Temporary directory initialised with ``muse init``."""
        result = _muse(tmp_path, "init")
        assert result.returncode == 0, result.stderr
        return tmp_path

    def test_commit_records_directories(self, repo: pathlib.Path) -> None:
        (repo / "src").mkdir()
        (repo / "src" / "main.py").write_text("x = 1\n")
        r = _muse(repo, "commit", "-m", "add src/main.py")
        assert r.returncode == 0, r.stderr

        # Read the snapshot from store and confirm directories is populated
        from muse.core.commits import get_head_snapshot_id
        from muse.core.refs import read_current_branch

        branch = read_current_branch(repo)
        snap_id = get_head_snapshot_id(repo, branch)
        assert snap_id is not None
        snap = read_snapshot(repo, snap_id)
        assert snap is not None
        assert "src" in snap.directories

    def test_snapshot_id_changes_on_dir_rename(self, repo: pathlib.Path) -> None:
        import shutil

        from muse.core.refs import read_current_branch
        from muse.core.commits import get_head_snapshot_id

        (repo / "old_name").mkdir()
        (repo / "old_name" / "f.py").write_text("x = 1\n")
        # Fail here, with the CLI's stderr, rather than at the final
        # comparison if the setup commit did not go through.
        r = _muse(repo, "commit", "-m", "add old_name/")
        assert r.returncode == 0, r.stderr

        branch = read_current_branch(repo)
        sid_before = get_head_snapshot_id(repo, branch)

        # Simulate rename: remove old dir, create new dir with same content
        shutil.move(str(repo / "old_name"), str(repo / "new_name"))
        # The exit status of ``code add`` is deliberately left unchecked, as in
        # the original; other tests in this class commit without staging.
        _muse(repo, "code", "add", ".")
        r = _muse(repo, "commit", "-m", "rename dir")
        assert r.returncode == 0, r.stderr

        sid_after = get_head_snapshot_id(repo, branch)
        assert sid_before != sid_after

    def test_status_handles_directory_rename_op(self, repo: pathlib.Path) -> None:
        import shutil

        (repo / "src").mkdir()
        (repo / "src" / "app.py").write_text("app = True\n")
        r = _muse(repo, "commit", "-m", "initial")
        assert r.returncode == 0, r.stderr

        shutil.move(str(repo / "src"), str(repo / "lib"))

        r = _muse(repo, "status")
        assert r.returncode == 0, r.stderr

    def test_nested_directories_tracked_through_commit(self, repo: pathlib.Path) -> None:
        deep = repo / "a" / "b" / "c"
        deep.mkdir(parents=True)
        (deep / "f.py").write_text("pass\n")
        r = _muse(repo, "commit", "-m", "deep nest")
        assert r.returncode == 0, r.stderr

        from muse.core.refs import read_current_branch
        from muse.core.commits import get_head_snapshot_id

        branch = read_current_branch(repo)
        snap_id = get_head_snapshot_id(repo, branch)
        # Guard the lookup so a missing HEAD snapshot is reported as such
        # instead of being passed on to read_snapshot as None.
        assert snap_id is not None
        snap = read_snapshot(repo, snap_id)
        assert snap is not None
        assert "a" in snap.directories
        assert "a/b" in snap.directories
        assert "a/b/c" in snap.directories


# ===========================================================================
# 13. Empty directory ghost bug
#
# Regression tests for: empty directories left on disk after their files are
# deleted and committed must NOT appear in `muse status --json` `added`.
#
# Root cause: CodePlugin.snapshot() recorded every directory visited by
# os.walk() into `dirs`, including empty ones.  These empty dirs had no
# counterpart in HEAD, so diff() produced InsertOp entries for them,
# and status --json reported them as `added`.
# ===========================================================================


class TestEmptyDirectoryGhost:
    """Empty orphan directories must not appear as 'added' in muse status."""

    @pytest.fixture()
    def repo(self, tmp_path: pathlib.Path) -> pathlib.Path:
        """Temporary directory initialised with ``muse init``."""
        result = _muse(tmp_path, "init")
        assert result.returncode == 0, result.stderr
        return tmp_path

    # ── Unit: snapshot() must not include empty dirs ──────────────────────────

    def test_snapshot_excludes_empty_directory(self, tmp_path: pathlib.Path) -> None:
        """CodePlugin.snapshot() must not list a directory that has no files."""
        # Check init the same way the ``repo`` fixture does, so a broken CLI
        # is reported here and not as a confusing snapshot failure.
        init = _muse(tmp_path, "init")
        assert init.returncode == 0, init.stderr
        plugin = CodePlugin()

        # Empty nested directory — no files, no .musekeep
        (tmp_path / "empty_pkg" / "sub").mkdir(parents=True)

        snap = plugin.snapshot(tmp_path)
        assert "empty_pkg" not in snap["directories"], (
            "Empty directory 'empty_pkg' must not appear in snapshot directories"
        )
        assert "empty_pkg/sub" not in snap["directories"], (
            "Empty nested directory 'empty_pkg/sub' must not appear in snapshot directories"
        )

    def test_snapshot_includes_dir_with_files(self, tmp_path: pathlib.Path) -> None:
        """Directories containing files must still appear in the snapshot."""
        init = _muse(tmp_path, "init")
        assert init.returncode == 0, init.stderr
        plugin = CodePlugin()

        (tmp_path / "pkg").mkdir()
        (tmp_path / "pkg" / "mod.py").write_text("x = 1\n")

        snap = plugin.snapshot(tmp_path)
        assert "pkg" in snap["directories"]

    # ── Integration: status --json must not list orphan empty dirs ────────────

    def test_status_does_not_report_never_committed_empty_dir(self, repo: pathlib.Path) -> None:
        """An empty directory that was never committed must not appear in added."""
        (repo / "orphan" / "nested").mkdir(parents=True)
        # No files, never committed

        r = _muse(repo, "status", "--json")
        assert r.returncode == 0, r.stderr
        data = json.loads(r.stdout)

        assert "orphan" not in data["added"], (
            "Untracked empty directory 'orphan' must not appear as added"
        )
        assert "orphan/nested" not in data["added"], (
            "Untracked empty nested directory must not appear as added"
        )

    def test_status_reports_added_for_dir_with_new_file(self, repo: pathlib.Path) -> None:
        """A new directory containing a real file must still appear as added."""
        (repo / "new_pkg").mkdir()
        (repo / "new_pkg" / "api.py").write_text("pass\n")

        r = _muse(repo, "status", "--json")
        assert r.returncode == 0, r.stderr
        data = json.loads(r.stdout)

        # The file should be added (the directory entry itself may or may not be
        # in added — what matters is the file is visible and dirs without files are not)
        all_visible = data["added"] + data["untracked"]
        assert any("new_pkg" in p for p in all_visible), (
            "New directory with a file should be reflected in added or untracked"
        )


# ===========================================================================
# 13. Stress / performance
# ===========================================================================


class TestDirectoriesStress:
    """Coarse timing guards for the directory helpers on larger inputs."""

    def test_directories_from_manifest_1000_files(self) -> None:
        manifest = {
            f"pkg_{i}/sub_{j}/file_{k}.py": f"hash{i}{j}{k}"
            for i in range(10)
            for j in range(10)
            for k in range(10)
        }
        assert len(manifest) == 1000
        start = time.monotonic()
        dirs = directories_from_manifest(manifest)
        elapsed = time.monotonic() - start
        # Should complete in under 1 second for 1000 files
        assert elapsed < 1.0, f"directories_from_manifest took {elapsed:.3f}s for 1000 files"
        # 10 top-level dirs (pkg_0..9) + 100 second-level dirs (pkg_N/sub_M) = 110
        assert len(dirs) == 110

    def test_detect_directory_renames_50_dirs(self) -> None:
        # 50 dirs each with 5 files, all renamed old_N → new_N
        last: Manifest = {}
        current: Manifest = {}
        for i in range(50):
            for j in range(5):
                h = blob_id(f"content_{i}_{j}".encode())
                last[f"old_{i}/file_{j}.py"] = h
                current[f"new_{i}/file_{j}.py"] = h

        deleted = {f"old_{i}" for i in range(50)}
        added = {f"new_{i}" for i in range(50)}

        start = time.monotonic()
        renames = detect_directory_renames(deleted, added, last, current)
        elapsed = time.monotonic() - start

        assert elapsed < 2.0, f"detect_directory_renames took {elapsed:.3f}s for 50 dirs"
        assert len(renames) == 50

    def test_compute_snapshot_id_large_dir_list(self) -> None:
        manifest = {f"f_{i}.py": fake_id(f"h{i}") for i in range(500)}
        dirs = [f"dir_{i}" for i in range(500)]
        start = time.monotonic()
        sid = compute_snapshot_id(manifest, dirs)
        elapsed = time.monotonic() - start
        assert elapsed < 1.0, f"compute_snapshot_id took {elapsed:.3f}s for 500 dirs"
        assert len(sid) == 71

    def test_walk_workdir_with_dirs_deep_tree(self, tmp_path: pathlib.Path) -> None:
        # 20 levels of nesting
        deep = tmp_path
        for level in range(20):
            deep = deep / f"level_{level}"
            deep.mkdir()
        (deep / "leaf.py").write_bytes(b"x")

        start = time.monotonic()
        files, dirs = walk_workdir_with_dirs(tmp_path)
        elapsed = time.monotonic() - start

        assert elapsed < 2.0, f"walk_workdir_with_dirs took {elapsed:.3f}s on 20-level tree"
        # Check each path on its own: a substring search over the concatenated
        # keys could match text spanning two adjacent keys.
        assert any(path.endswith("/leaf.py") for path in files)
        assert len(dirs) == 20


# ===========================================================================
# 14. Security
# ===========================================================================


class TestDirectoriesSecurity:
    def test_path_traversal_in_directory_address_not_resolved(self) -> None:
        # directories_from_manifest should treat path components literally
        manifest = {"../../etc/shadow": "h1"}
        dirs = directories_from_manifest(manifest)
        # The result should contain "../.."
and "../../etc" literally, not resolve them # The important thing: no OS path resolution happens for d in dirs: assert not pathlib.Path(d).is_absolute() def test_null_byte_in_directory_path_handled(self) -> None: # Null bytes in paths are unusual but should not crash manifest = {"src\x00/malicious.py": fake_id("h1")} try: dirs = directories_from_manifest(manifest) sid = compute_snapshot_id(manifest, dirs) assert len(sid) == 71 except (ValueError, TypeError): pass # rejecting is also acceptable def test_very_long_directory_path(self) -> None: long_name = "a" * 4096 manifest = {f"{long_name}/f.py": fake_id("h1")} dirs = directories_from_manifest(manifest) assert dirs == [long_name] sid = compute_snapshot_id(manifest, dirs) assert len(sid) == 71 def test_symlinked_dir_not_followed_during_walk(self, tmp_path: pathlib.Path) -> None: sensitive = tmp_path / "sensitive" sensitive.mkdir() (sensitive / "secret.txt").write_bytes(b"SECRET") repo_root = tmp_path / "repo" repo_root.mkdir() link = repo_root / "malicious_link" link.symlink_to(sensitive) files, dirs = walk_workdir_with_dirs(repo_root) assert "malicious_link/secret.txt" not in files def test_snapshot_record_with_adversarial_dirs_survives_roundtrip(self, store: pathlib.Path) -> None: # Adversarial: dirs containing special characters dirs = ["src", "src/sub dir", "a-b_c.d"] manifest = {"src/f.py": fake_id("h1")} sid = compute_snapshot_id(manifest, dirs) rec = SnapshotRecord(snapshot_id=sid, manifest=manifest, directories=dirs) # to_dict / from_dict roundtrip loaded = SnapshotRecord.from_dict(rec.to_dict()) assert loaded.directories == dirs def test_detect_directory_renames_no_prefix_confusion(self) -> None: # "a" should not confuse files under "ab/" as being under "a/" # because the prefix check uses "a/" (with trailing slash) last = {"a/f.py": "h1"} current = {"ab/f.py": "h1"} # "ab/f.py" does NOT start with "a/" so old_files under "a/" = {"f.py": "h1"} # but new_files under "ab/" = {"f.py": "h1"} — these DO match, 
so rename is detected # (which is correct: the file genuinely moved from a/ to ab/) renames = detect_directory_renames({"a"}, {"ab"}, last, current) assert renames == [("a", "ab")] def test_detect_directory_renames_prefix_does_not_bleed_across_siblings(self) -> None: # "models" should never absorb files from "models_v2" in the source manifest # when looking at what files belong to "models/" last = {"models/user.py": "h1", "models_v2/user.py": "h2"} # Both dirs deleted, one new dir added with only models_v2's content current = {"new_home/user.py": "h2"} renames = detect_directory_renames({"models", "models_v2"}, {"new_home"}, last, current) # "new_home" has {"user.py": "h2"} which matches "models_v2/" not "models/" assert ("models_v2", "new_home") in renames assert ("models", "new_home") not in renames