"""Phase 2.2 — Symlink attack tests. Covers every identified attack vector: 1. ``.muse/`` itself replaced by a symlink → ``find_repo_root`` rejects it. 2. Critical subdirectories (``.muse/objects/``, ``.muse/commits/``, etc.) replaced by symlinks → ``require_repo`` detects and exits. 3. ``write_object`` / ``write_object_from_path`` detect a symlinked shard dir or objects directory and raise before writing. 4. ``write_text_atomic`` / ``_write_msgpack_atomic`` detect a symlinked parent directory and raise before writing. 5. ``cleanup_stale_object_temps`` skips symlinked shard directories safely. 6. ``_cleanup_muse_dir_temps`` skips symlinked subdirectories safely. 7. Tracked-file symlinks are silently skipped by the workdir walker (``os.lstat`` + ``S_ISREG`` filter). 8. Stress: 50 concurrent symlink-swap attempts during an object write do not corrupt or redirect any data. Each test creates its own isolated temporary directory — no shared state. """ from __future__ import annotations import os import pathlib import tempfile import threading import time import pytest from muse.core.types import DEFAULT_HASH_ALGO, blob_id, fake_id, split_id from muse.core.object_store import ( cleanup_stale_object_temps, objects_algo_dir, write_object, write_object_from_path, ) from muse.core.repo import _cleanup_muse_dir_temps, _verify_muse_dir_integrity, find_repo_root from muse.core.store import CommitDict, _write_msgpack_atomic, write_text_atomic from muse.core.validation import assert_not_symlink, assert_write_inside_repo from muse.core.paths import commits_dir, config_toml_path, head_path, heads_dir, muse_dir, objects_dir from tests.cli_test_helper import CliRunner # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _make_real_repo(tmp_path: pathlib.Path) -> pathlib.Path: """Initialise a minimal real (non-symlinked) ``.muse/`` repo layout.""" repo = tmp_path / 
"repo" repo.mkdir() muse = muse_dir(repo) for sub in ("objects", "commits", "snapshots", "refs", "refs/heads", "tags"): (muse / sub).mkdir(parents=True) (muse / "HEAD").write_text("ref: refs/heads/main\n") (muse / "repo.json").write_text('{"repo_id": "test-repo"}') return repo # --------------------------------------------------------------------------- # Unit tests — assert_not_symlink / assert_write_inside_repo # --------------------------------------------------------------------------- class TestAssertNotSymlink: def test_real_dir_passes(self, tmp_path: pathlib.Path) -> None: real = tmp_path / "real" real.mkdir() assert_not_symlink(real, "real dir") # should not raise def test_real_file_passes(self, tmp_path: pathlib.Path) -> None: f = tmp_path / "file.txt" f.write_text("hello") assert_not_symlink(f, "file") # should not raise def test_nonexistent_passes(self, tmp_path: pathlib.Path) -> None: # A path that does not yet exist is not a symlink. assert_not_symlink(tmp_path / "no-such-path", "ghost") def test_symlink_to_dir_raises(self, tmp_path: pathlib.Path) -> None: target = tmp_path / "target" target.mkdir() link = tmp_path / "link" link.symlink_to(target) with pytest.raises(ValueError, match="symbolic link"): assert_not_symlink(link, "test link") def test_symlink_to_file_raises(self, tmp_path: pathlib.Path) -> None: target = tmp_path / "target.txt" target.write_text("data") link = tmp_path / "link.txt" link.symlink_to(target) with pytest.raises(ValueError, match="symbolic link"): assert_not_symlink(link) def test_dangling_symlink_raises(self, tmp_path: pathlib.Path) -> None: link = tmp_path / "dangling" link.symlink_to(tmp_path / "nonexistent") with pytest.raises(ValueError, match="symbolic link"): assert_not_symlink(link, "dangling link") def test_error_message_contains_label(self, tmp_path: pathlib.Path) -> None: link = tmp_path / "malicious" link.symlink_to(tmp_path) with pytest.raises(ValueError, match="malicious-label"): assert_not_symlink(link, 
"malicious-label") class TestAssertWriteInsideRepo: def test_path_inside_passes(self, tmp_path: pathlib.Path) -> None: repo = tmp_path / "repo" repo.mkdir() target = commits_dir(repo) / "abc.msgpack" assert_write_inside_repo(repo, target) # should not raise def test_path_outside_raises(self, tmp_path: pathlib.Path) -> None: repo = tmp_path / "repo" repo.mkdir() outside = tmp_path / "other" / "malicious.txt" with pytest.raises(ValueError, match="outside the repository root"): assert_write_inside_repo(repo, outside) def test_symlink_escaping_raises(self, tmp_path: pathlib.Path) -> None: """If dest resolves outside repo via symlink, the check catches it.""" repo = tmp_path / "repo" repo.mkdir() muse = muse_dir(repo) muse.mkdir() attacker = tmp_path / "attacker" attacker.mkdir() # Symlink .muse/objects → /tmp/attacker malicious_link = muse / "objects" malicious_link.symlink_to(attacker) # The destination inside the objects dir resolves to attacker/... # Parenthesise to avoid PosixPath * int precedence error. 
dest = malicious_link / "ab" / ("cd" * 31) with pytest.raises(ValueError, match="outside the repository root"): assert_write_inside_repo(repo, dest) # --------------------------------------------------------------------------- # find_repo_root — symlinked .muse/ is rejected # --------------------------------------------------------------------------- class TestFindRepoRootSymlink: def test_real_muse_dir_found(self, tmp_path: pathlib.Path) -> None: repo = _make_real_repo(tmp_path) found = find_repo_root(start=repo) assert found == repo def test_symlinked_muse_dir_not_found(self, tmp_path: pathlib.Path) -> None: """If .muse/ is a symlink, find_repo_root must not return that directory.""" real_muse = tmp_path / "real_muse_dir" real_muse.mkdir() repo = tmp_path / "repo" repo.mkdir() muse_dir(repo).symlink_to(real_muse) result = find_repo_root(start=repo) assert result is None, ( f"find_repo_root should return None for symlinked .muse/, got {result}" ) def test_dangling_symlink_muse_dir_not_found(self, tmp_path: pathlib.Path) -> None: repo = tmp_path / "repo" repo.mkdir() muse_dir(repo).symlink_to(tmp_path / "nonexistent") assert find_repo_root(start=repo) is None def test_symlink_to_symlink_muse_dir_rejected(self, tmp_path: pathlib.Path) -> None: real_muse = tmp_path / "real_muse" real_muse.mkdir() intermediate = tmp_path / "intermediate" intermediate.symlink_to(real_muse) repo = tmp_path / "repo" repo.mkdir() muse_dir(repo).symlink_to(intermediate) assert find_repo_root(start=repo) is None def test_env_override_still_requires_real_muse(self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch) -> None: """MUSE_REPO_ROOT override: returns None if .muse/ is a symlink.""" real_muse = tmp_path / "real_muse" real_muse.mkdir() repo = tmp_path / "repo" repo.mkdir() muse_dir(repo).symlink_to(real_muse) monkeypatch.setenv("MUSE_REPO_ROOT", str(repo)) result = find_repo_root() assert result is None # 
# ---------------------------------------------------------------------------
# Shared helper for the symlink-replacement tests below
# ---------------------------------------------------------------------------


def _replace_with_symlink(path: pathlib.Path, target: pathlib.Path) -> None:
    """Make *path* a symlink to *target*, removing a real directory first.

    If *path* is currently a real (non-symlink) directory its whole tree is
    deleted.  If *path* does not exist the symlink is simply created.  If
    *path* is already a symlink, ``symlink_to`` raises ``FileExistsError``.
    """
    import shutil  # function-scope: keeps this helper self-contained

    if path.is_dir() and not path.is_symlink():
        shutil.rmtree(path)
    path.symlink_to(target)


# ---------------------------------------------------------------------------
# _verify_muse_dir_integrity — critical subdirs must not be symlinks
# ---------------------------------------------------------------------------


class TestVerifyMuseDirIntegrity:
    """``_verify_muse_dir_integrity`` exits when a critical subdir is a symlink."""

    def test_clean_repo_passes(self, tmp_path: pathlib.Path) -> None:
        """A layout made only of real directories passes the check."""
        repo = _make_real_repo(tmp_path)
        _verify_muse_dir_integrity(muse_dir(repo))  # must not raise

    @pytest.mark.parametrize("subdir", [
        "objects",
        "commits",
        "snapshots",
        "refs",
        "refs/heads",
        "tags",
    ])
    def test_symlinked_subdir_causes_exit(
        self, tmp_path: pathlib.Path, subdir: str
    ) -> None:
        """Replacing any one critical subdir with a symlink raises SystemExit."""
        repo = _make_real_repo(tmp_path)
        muse = muse_dir(repo)
        attacker = tmp_path / "attacker"
        attacker.mkdir(parents=True, exist_ok=True)

        # The real tree may be non-empty (e.g. refs/ contains heads/).
        _replace_with_symlink(muse / subdir, attacker)

        with pytest.raises(SystemExit):
            _verify_muse_dir_integrity(muse)

    def test_missing_subdirs_pass(self, tmp_path: pathlib.Path) -> None:
        """Newly-initialised repos may not have all dirs yet — that's fine."""
        repo = tmp_path / "fresh"
        repo.mkdir()
        muse = muse_dir(repo)
        muse.mkdir()
        _verify_muse_dir_integrity(muse)  # no dirs present yet — must not raise


# ---------------------------------------------------------------------------
# write_object — symlinked shard directory is rejected
# ---------------------------------------------------------------------------


class TestWriteObjectSymlink:
    """``write_object`` / ``write_object_from_path`` refuse symlinked dirs."""

    def test_normal_write_succeeds(self, tmp_path: pathlib.Path) -> None:
        """Writing a new object into a real repo returns ``True``."""
        repo = _make_real_repo(tmp_path)
        content = b"hello world"
        oid = blob_id(content)
        result = write_object(repo, oid, content)
        assert result is True

    def test_symlinked_objects_dir_raises(self, tmp_path: pathlib.Path) -> None:
        """If .muse/objects/ is a symlink, write_object must raise ValueError."""
        repo = _make_real_repo(tmp_path)
        attacker = tmp_path / "attacker"
        attacker.mkdir()
        _replace_with_symlink(objects_dir(repo), attacker)

        content = b"malicious payload"
        oid = blob_id(content)
        # Either failure mode is accepted: a ValueError or a SystemExit.
        with pytest.raises((ValueError, SystemExit)):
            write_object(repo, oid, content)

        # Verify nothing was written to the attacker dir
        assert not any(attacker.rglob("*")), "Data must not be written to symlink target"

    def test_symlinked_shard_dir_raises(self, tmp_path: pathlib.Path) -> None:
        """A symlinked shard dir (e.g. objects/ab/ → /tmp/malicious/) is rejected."""
        repo = _make_real_repo(tmp_path)
        content = b"shard attack"
        oid = blob_id(content)
        prefix = split_id(oid)[1][:2]

        attacker = tmp_path / "attacker_shard"
        attacker.mkdir()

        shard = objects_algo_dir(repo) / prefix
        shard.mkdir(parents=True, exist_ok=True)
        # Replace real shard dir with symlink
        _replace_with_symlink(shard, attacker)

        with pytest.raises((ValueError, SystemExit)):
            write_object(repo, oid, content)

        assert not any(attacker.rglob("*")), "No data must reach symlink target"

    def test_write_object_from_path_symlinked_objects_dir_raises(
        self, tmp_path: pathlib.Path
    ) -> None:
        """``write_object_from_path`` also refuses a symlinked objects dir."""
        repo = _make_real_repo(tmp_path)
        attacker = tmp_path / "attacker"
        attacker.mkdir()
        _replace_with_symlink(objects_dir(repo), attacker)

        src = tmp_path / "source.bin"
        src.write_bytes(b"from path content")
        oid = blob_id(src.read_bytes())

        with pytest.raises((ValueError, SystemExit)):
            write_object_from_path(repo, oid, src)

        assert not any(attacker.rglob("*")), "Data must not be written to symlink target"


# ---------------------------------------------------------------------------
# write_text_atomic — symlinked parent directory is rejected
# ---------------------------------------------------------------------------


class TestWriteTextAtomicSymlink:
    """``write_text_atomic`` refuses a symlinked parent directory."""

    def test_normal_write_succeeds(self, tmp_path: pathlib.Path) -> None:
        """Writing into a real directory stores the exact text."""
        target = tmp_path / "HEAD"
        write_text_atomic(target, "ref: refs/heads/main\n")
        assert target.read_text() == "ref: refs/heads/main\n"

    def test_symlinked_parent_raises(self, tmp_path: pathlib.Path) -> None:
        """If the parent directory is a symlink, write_text_atomic must raise."""
        attacker = tmp_path / "attacker"
        attacker.mkdir()
        link_dir = tmp_path / "link_dir"
        link_dir.symlink_to(attacker)

        target = link_dir / "HEAD"
        with pytest.raises(ValueError, match="symbolic link"):
            write_text_atomic(target, "ref: refs/heads/main\n")

        # Verify attacker dir untouched
        assert not any(attacker.iterdir()), "No data must reach symlink target"

    def test_symlink_at_destination_is_replaced(self, tmp_path: pathlib.Path) -> None:
        """POSIX os.replace on a symlink replaces the symlink entry itself.

        This is the SAFE case: writing HEAD when HEAD is a symlink replaces
        the symlink with a real file — data goes to .muse/HEAD, not to the
        symlink target.  This test documents that behaviour is preserved.
        """
        real_parent = tmp_path / "muse_dir"
        real_parent.mkdir()
        elsewhere = tmp_path / "elsewhere.txt"
        elsewhere.write_text("original")

        head = real_parent / "HEAD"
        head.symlink_to(elsewhere)
        assert head.is_symlink()

        write_text_atomic(head, "new content\n")

        # The symlink should be gone — HEAD is now a real file
        assert not head.is_symlink(), "symlink at destination must be replaced by real file"
        assert head.read_text() == "new content\n"
        # The symlink target is untouched
        assert elsewhere.read_text() == "original"


# ---------------------------------------------------------------------------
# _write_msgpack_atomic — symlinked parent directory is rejected
# ---------------------------------------------------------------------------


class TestWriteMsgpackAtomicSymlink:
    """``_write_msgpack_atomic`` refuses a symlinked parent directory."""

    def _minimal_commit_dict(self) -> CommitDict:
        """Return a small, fully-populated ``CommitDict`` to use as payload."""
        return CommitDict(
            commit_id="a" * 64,
            repo_id=fake_id("repo"),
            branch="main",
            parent_commit_id=None,
            parent2_commit_id=None,
            snapshot_id="b" * 64,
            message="test commit",
            author="test",
            committed_at="2026-01-01T00:00:00+00:00",
            metadata={},
        )

    def test_normal_write_succeeds(self, tmp_path: pathlib.Path) -> None:
        """Writing into a real directory creates the target file."""
        real_dir = tmp_path / "commits"
        real_dir.mkdir()
        target = real_dir / "abc.msgpack"
        _write_msgpack_atomic(target, self._minimal_commit_dict())
        assert target.exists()

    def test_symlinked_parent_raises(self, tmp_path: pathlib.Path) -> None:
        """A symlinked parent directory raises and leaves the target empty."""
        attacker = tmp_path / "attacker_commits"
        attacker.mkdir()
        link_dir = tmp_path / "commits_link"
        link_dir.symlink_to(attacker)

        target = link_dir / "abc.msgpack"
        with pytest.raises(ValueError, match="symbolic link"):
            _write_msgpack_atomic(target, self._minimal_commit_dict())

        assert not any(attacker.iterdir()), "No data must reach symlink target"


# ---------------------------------------------------------------------------
# cleanup_stale_object_temps — symlinked shards are skipped
# ---------------------------------------------------------------------------


class TestCleanupSkipsSymlinks:
    """``cleanup_stale_object_temps`` cleans real shards, skips symlinked ones."""

    def test_symlinked_shard_not_entered(self, tmp_path: pathlib.Path) -> None:
        """cleanup_stale_object_temps must skip symlinked shard directories."""
        repo = _make_real_repo(tmp_path)
        attacker = tmp_path / "attacker"
        attacker.mkdir()
        # Place a "stale temp" file inside the attacker directory
        victim = attacker / ".obj-tmp-should-not-be-deleted"
        victim.write_bytes(b"important attacker data")

        # Replace a shard with a symlink → attacker
        shard = objects_algo_dir(repo) / "ab"
        shard.mkdir(parents=True, exist_ok=True)
        _replace_with_symlink(shard, attacker)

        removed = cleanup_stale_object_temps(repo)
        assert removed == 0, "Symlinked shard must not be entered"
        assert victim.exists(), "File in symlink target must not be deleted"

    def test_real_shards_are_cleaned(self, tmp_path: pathlib.Path) -> None:
        """A stale temp file in a real shard is removed and counted."""
        repo = _make_real_repo(tmp_path)
        shard = objects_algo_dir(repo) / "cd"
        shard.mkdir(parents=True)
        stale = shard / ".obj-tmp-stale123"
        stale.write_bytes(b"stale data")
        # Backdate mtime so the 60-second age gate treats this file as stale.
        os.utime(stale, (0, 0))

        removed = cleanup_stale_object_temps(repo)
        assert removed == 1
        assert not stale.exists()


class TestCleanupMuseDirSkipsSymlinks:
    """``_cleanup_muse_dir_temps`` must not descend into symlinked subdirs."""

    def test_symlinked_subdir_not_entered(self, tmp_path: pathlib.Path) -> None:
        """_cleanup_muse_dir_temps must skip symlinked subdirectories."""
        repo = _make_real_repo(tmp_path)
        attacker = tmp_path / "attacker_commits"
        attacker.mkdir()
        victim = attacker / ".muse-tmp-should-not-be-deleted"
        victim.write_bytes(b"important data")

        muse = muse_dir(repo)
        _replace_with_symlink(muse / "commits", attacker)

        removed = _cleanup_muse_dir_temps(muse)
        assert removed == 0, "Symlinked subdir must not be entered"
        assert victim.exists(), "File in symlink target must not be deleted"


# ---------------------------------------------------------------------------
# Tracked-file symlinks — workdir walker skips them
# ---------------------------------------------------------------------------


class TestTrackedFileSymlinks:
    """Symlinks in the working directory never appear in the manifest."""

    def test_symlink_to_sensitive_file_not_staged(self, tmp_path: pathlib.Path) -> None:
        """A tracked file that is a symlink is silently excluded from the manifest.

        The workdir walker uses os.lstat + S_ISREG, so symlinks are never
        hashed or stored — even if they point to /etc/passwd.
        """
        from muse.core.snapshot import build_snapshot_manifest

        repo = _make_real_repo(tmp_path)
        workdir = repo

        # Create a real file (should be tracked)
        real_file = workdir / "song.mid"
        real_file.write_bytes(b"\x4d\x54\x68\x64" + b"\x00" * 10)

        # Create a symlink to a sensitive target
        sensitive = tmp_path / "sensitive.txt"
        sensitive.write_text("secret data")
        malicious_link = workdir / "malicious.txt"
        malicious_link.symlink_to(sensitive)

        manifest = build_snapshot_manifest(workdir)

        assert "song.mid" in manifest, "real file must be tracked"
        assert "malicious.txt" not in manifest, "symlink must NOT be in manifest"

    def test_symlink_to_nonexistent_target_not_staged(self, tmp_path: pathlib.Path) -> None:
        """A dangling symlink in the workdir is likewise left out."""
        from muse.core.snapshot import build_snapshot_manifest

        repo = _make_real_repo(tmp_path)
        workdir = repo
        dangling = workdir / "dangling.txt"
        dangling.symlink_to(tmp_path / "nonexistent")

        manifest = build_snapshot_manifest(workdir)
        assert "dangling.txt" not in manifest


# ---------------------------------------------------------------------------
# Stress: concurrent symlink-swap during write_object
# ---------------------------------------------------------------------------


class TestConcurrentSymlinkSwapStress:
    """Race ``write_object`` against a thread that keeps swapping the shard."""

    def test_concurrent_symlink_swap_does_not_corrupt(
        self, tmp_path: pathlib.Path
    ) -> None:
        """50 sequential writes race one background symlink-swapping thread.

        The swapper repeatedly replaces the object's shard directory with a
        symlink to an attacker directory and then restores a real directory.
        Each ``write_object`` call may succeed or raise ``ValueError`` /
        ``OSError`` / ``SystemExit``; the one thing that must never happen is
        data showing up inside the attacker directory.
        """
        attempts = 50
        repo = _make_real_repo(tmp_path)
        attacker = tmp_path / "attacker_stress"
        attacker.mkdir()

        content = b"stress test object symlink-check"
        oid = blob_id(content)
        # Locate the shard the same way the other shard tests in this file
        # do, so the swapper toggles the directory the write is aimed at.
        # NOTE(review): assumes shards live at objects_algo_dir/<2-char hex
        # prefix>, as the sibling tests do — confirm against object_store.
        shard_dir = objects_algo_dir(repo) / split_id(oid)[1][:2]
        shard_dir.mkdir(parents=True, exist_ok=True)

        swap_active = threading.Event()
        stop_swapping = threading.Event()

        def swap_shard() -> None:
            """Repeatedly swap shard dir between real and symlink."""
            while not stop_swapping.is_set():
                swap_active.set()
                try:
                    _replace_with_symlink(shard_dir, attacker)
                    time.sleep(0.0005)
                except OSError:
                    # Expected while racing the writer (e.g. directory not
                    # empty, entry already exists); just try again.
                    pass
                # Restore in its own guard so a failed swap above can never
                # leave the shard permanently pointing at the attacker.
                try:
                    if shard_dir.is_symlink():
                        shard_dir.unlink()
                    shard_dir.mkdir(parents=True, exist_ok=True)
                except OSError:
                    pass

        swapper = threading.Thread(target=swap_shard, daemon=True)
        swapper.start()
        assert swap_active.wait(timeout=1.0), "swapper thread never started"

        write_errors = 0
        write_successes = 0
        try:
            for _ in range(attempts):
                try:
                    write_object(repo, oid, content)
                except (ValueError, OSError, SystemExit):
                    write_errors += 1
                else:
                    write_successes += 1
        finally:
            # Always stop the swapper, even if a write raised unexpectedly.
            stop_swapping.set()
            swapper.join(timeout=2.0)

        assert not swapper.is_alive(), "swapper thread did not stop"

        # The attacker directory must remain empty regardless of outcome.
        attacker_files = [str(f) for f in attacker.rglob("*")]
        assert not attacker_files, f"Data leaked to attacker dir: {attacker_files}"

        # Sanity: every attempt either succeeded or was blocked.
        assert write_successes + write_errors == attempts


# ---------------------------------------------------------------------------
# Integration: end-to-end CLI commands with symlinked .muse/
# ---------------------------------------------------------------------------


class TestCLIWithSymlinkedMuse:
    """End-to-end ``muse status`` runs against real and symlinked ``.muse/``."""

    def test_muse_status_rejects_symlinked_muse(self, tmp_path: pathlib.Path) -> None:
        """muse status must fail when .muse/ is a symlink."""
        real_muse = tmp_path / "real_muse"
        real_muse.mkdir()
        for sub in ("objects", "commits", "snapshots", "refs/heads", "tags"):
            (real_muse / sub).mkdir(parents=True)
        (real_muse / "HEAD").write_text("ref: refs/heads/main\n")
        (real_muse / "repo.json").write_text('{"repo_id": "test"}')

        repo = tmp_path / "repo"
        repo.mkdir()
        muse_dir(repo).symlink_to(real_muse)

        runner = CliRunner()
        # find_repo_root won't find a real .muse/ → should exit non-zero
        result = runner.invoke(None, ["status"], env={"MUSE_REPO_ROOT": str(repo)})
        assert result.exit_code != 0

    def test_muse_status_accepts_real_muse(self, tmp_path: pathlib.Path) -> None:
        """muse status does not reject a real .muse/ directory as a symlink."""
        repo = _make_real_repo(tmp_path)
        config_toml_path(repo).write_text(
            "[core]\nauthor = \"test\"\n"
        )
        (heads_dir(repo) / "main").write_text("")
        head_path(repo).write_text("ref: refs/heads/main\n")

        runner = CliRunner()
        result = runner.invoke(
            None,
            ["status"],
            env={"MUSE_REPO_ROOT": str(repo)},
        )
        # Must not complain about symlinks on a real .muse/.
        assert "symbolic link" not in result.output.lower()