"""Phase 6 — Watch mode tests for ``muse bridge git-export --watch``. Tests verify _watch_loop behaviour using mocks so no real sleep or git subprocess is needed. NOTE: git subprocess calls in helper fixtures are INTENTIONAL — they create real git repositories used as export targets. """ from __future__ import annotations import argparse import io import json import os import pathlib import subprocess import sys from unittest.mock import MagicMock, call, patch import pytest from muse.core.paths import head_path, heads_dir, init_repo_dirs from muse.core.types import long_id from tests.cli_test_helper import CliRunner runner = CliRunner() # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _make_muse_repo(path: pathlib.Path) -> pathlib.Path: path.mkdir(parents=True, exist_ok=True) result = runner.invoke(None, ["init"], cwd=path) assert result.exit_code == 0, f"muse init failed: {result.stderr}" return path def _make_git_repo(path: pathlib.Path) -> pathlib.Path: path.mkdir(parents=True, exist_ok=True) subprocess.run(["git", "init", str(path)], check=True, capture_output=True) subprocess.run( ["git", "-C", str(path), "config", "user.email", "test@test.com"], check=True, capture_output=True, ) subprocess.run( ["git", "-C", str(path), "config", "user.name", "Test"], check=True, capture_output=True, ) (path / "README.md").write_text("init") subprocess.run(["git", "-C", str(path), "add", "."], check=True, capture_output=True) subprocess.run( ["git", "-C", str(path), "commit", "-m", "init"], check=True, capture_output=True, ) return path def _fake_args( git_branch: str = "muse-mirror", git_remote: str = "origin", muse_ref: str | None = None, json_out: bool = True, excludes: list[str] | None = None, strip_muse_metadata: bool = True, fix_modes: bool = False, allow_empty: bool = False, no_push: bool = True, force_push: bool = False, commit_message: str = "mirror: muse {commit_id}", ) -> argparse.Namespace: """Build a minimal argparse.Namespace suitable for _watch_loop.""" return argparse.Namespace( git_branch=git_branch, git_remote=git_remote, muse_ref=muse_ref, json_out=json_out, excludes=excludes or [], strip_muse_metadata=strip_muse_metadata, fix_modes=fix_modes, allow_empty=allow_empty, no_push=no_push, force_push=force_push, commit_message=commit_message, ) # =========================================================================== # Tests # =========================================================================== class TestWatchEmitsPollEvent: """_watch_loop emits JSON poll events on each tick.""" def test_watch_emits_poll_event(self, tmp_path: pathlib.Path) -> None: """_watch_loop emits {'event': 'poll', ...} JSON on each sleep cycle.""" from muse.core.bridge.exporter import _watch_loop muse_root = _make_muse_repo(tmp_path / "muse") git_dir = _make_git_repo(tmp_path / "git") # HEAD will stay the same on both reads → changed=False static_commit_id = long_id("a" * 64) call_count = [0] def fake_sleep(n: float) -> None: call_count[0] += 1 if call_count[0] >= 2: raise KeyboardInterrupt # exit the loop after 1 poll args = _fake_args(json_out=True) captured: list[str] = [] with patch("time.sleep", side_effect=fake_sleep), \ patch("muse.cli.commands.bridge._watch_loop.__globals__", {}) if False else \ patch("builtins.print", side_effect=lambda *a, **kw: captured.append(str(a[0]))): # Patch HEAD to always return static_commit_id hp = head_path(muse_root) hp.write_text("ref: refs/heads/main") ref_path = heads_dir(muse_root) / "main" ref_path.parent.mkdir(parents=True, exist_ok=True) ref_path.write_text(static_commit_id) try: _watch_loop(args, muse_root, git_dir, interval=1) except KeyboardInterrupt: pass # At least one poll event was emitted poll_events = [ line for line in captured if line.startswith("{") and '"event": "poll"' in line ] assert len(poll_events) >= 1 def test_watch_poll_event_has_required_keys(self, tmp_path: pathlib.Path) -> None: """Poll event JSON has 'event', 'muse_commit_id', and 'changed' keys.""" from muse.core.bridge.exporter import _watch_loop muse_root = _make_muse_repo(tmp_path / "muse") git_dir = _make_git_repo(tmp_path / "git") static_commit_id = long_id("b" * 64) hp = head_path(muse_root) hp.write_text("ref: refs/heads/main") ref_path = heads_dir(muse_root) / "main" ref_path.parent.mkdir(parents=True, exist_ok=True) ref_path.write_text(static_commit_id) call_count = [0] def fake_sleep(n: float) -> None: call_count[0] += 1 if call_count[0] >= 2: raise KeyboardInterrupt args = _fake_args(json_out=True) captured: list[str] = [] with patch("time.sleep", side_effect=fake_sleep), \ patch("builtins.print", side_effect=lambda *a, **kw: captured.append(str(a[0]))): try: _watch_loop(args, muse_root, git_dir, interval=1) except KeyboardInterrupt: pass poll_events = [ json.loads(line) for line in captured if line.startswith("{") and '"event": "poll"' in line ] assert len(poll_events) >= 1 evt = poll_events[0] assert evt["event"] == "poll" assert "muse_commit_id" in evt assert "changed" in evt class TestWatchDetectsHeadChange: """_watch_loop detects when Muse HEAD advances.""" def test_watch_detects_head_change(self, tmp_path: pathlib.Path) -> None: """When HEAD changes between polls, changed=True is emitted.""" from muse.core.bridge.exporter import _watch_loop muse_root = _make_muse_repo(tmp_path / "muse") git_dir = _make_git_repo(tmp_path / "git") hp = head_path(muse_root) hp.write_text("ref: refs/heads/main") ref_path = heads_dir(muse_root) / "main" ref_path.parent.mkdir(parents=True, exist_ok=True) commit_a = long_id("a" * 64) commit_b = long_id("b" * 64) ref_path.write_text(commit_a) call_count = [0] def fake_sleep(n: float) -> None: call_count[0] += 1 if call_count[0] == 1: # Advance HEAD between first and second poll ref_path.write_text(commit_b) elif call_count[0] >= 2: raise KeyboardInterrupt args = _fake_args(json_out=True) captured: list[str] = [] # Patch the export path so we don't actually try to run GitExporter mock_exporter = MagicMock() mock_exporter.resolve_muse_ref.return_value = (commit_b, long_id("c" * 64)) mock_exporter.read_snapshot.return_value = {} mock_exporter.sync_to_git.return_value = 0 mock_exporter.git_commit.return_value = None with patch("time.sleep", side_effect=fake_sleep), \ patch("muse.core.bridge.exporter.GitExporter", return_value=mock_exporter), \ patch("muse.core.bridge.exporter._ensure_git_branch"), \ patch("builtins.print", side_effect=lambda *a, **kw: captured.append(str(a[0]))): try: _watch_loop(args, muse_root, git_dir, interval=1) except KeyboardInterrupt: pass poll_events = [ json.loads(line) for line in captured if line.startswith("{") and '"event": "poll"' in line ] # There should be at least one poll with changed=True changed_events = [e for e in poll_events if e.get("changed") is True] assert len(changed_events) >= 1 class TestWatchCallsExportOnChange: """_watch_loop triggers export when HEAD changes.""" def test_watch_calls_export_on_change(self, tmp_path: pathlib.Path) -> None: """When HEAD changes, GitExporter.sync_to_git is called.""" from muse.core.bridge.exporter import _watch_loop muse_root = _make_muse_repo(tmp_path / "muse") git_dir = _make_git_repo(tmp_path / "git") hp = head_path(muse_root) hp.write_text("ref: refs/heads/main") ref_path = heads_dir(muse_root) / "main" ref_path.parent.mkdir(parents=True, exist_ok=True) commit_a = long_id("a" * 64) commit_b = long_id("b" * 64) ref_path.write_text(commit_a) call_count = [0] export_calls: list[int] = [] def fake_sleep(n: float) -> None: call_count[0] += 1 if call_count[0] == 1: ref_path.write_text(commit_b) elif call_count[0] >= 2: raise KeyboardInterrupt mock_exporter = MagicMock() mock_exporter.resolve_muse_ref.return_value = (commit_b, long_id("c" * 64)) mock_exporter.read_snapshot.return_value = {} mock_exporter.sync_to_git.side_effect = lambda *a, **kw: export_calls.append(1) or 0 mock_exporter.git_commit.return_value = None args = _fake_args(json_out=False) with patch("time.sleep", side_effect=fake_sleep), \ patch("muse.core.bridge.exporter.GitExporter", return_value=mock_exporter), \ patch("muse.core.bridge.exporter._ensure_git_branch"), \ patch("builtins.print"): try: _watch_loop(args, muse_root, git_dir, interval=1) except KeyboardInterrupt: pass assert len(export_calls) >= 1, "sync_to_git should have been called when HEAD changed" def test_watch_no_export_when_head_unchanged(self, tmp_path: pathlib.Path) -> None: """When HEAD does not change, no export is triggered.""" from muse.core.bridge.exporter import _watch_loop muse_root = _make_muse_repo(tmp_path / "muse") git_dir = _make_git_repo(tmp_path / "git") hp = head_path(muse_root) hp.write_text("ref: refs/heads/main") ref_path = heads_dir(muse_root) / "main" ref_path.parent.mkdir(parents=True, exist_ok=True) ref_path.write_text(long_id("a" * 64)) call_count = [0] def fake_sleep(n: float) -> None: call_count[0] += 1 if call_count[0] >= 2: raise KeyboardInterrupt export_calls: list[int] = [] mock_exporter = MagicMock() mock_exporter.sync_to_git.side_effect = lambda *a, **kw: export_calls.append(1) or 0 args = _fake_args(json_out=False) with patch("time.sleep", side_effect=fake_sleep), \ patch("muse.core.bridge.exporter.GitExporter", return_value=mock_exporter), \ patch("muse.core.bridge.exporter._ensure_git_branch"), \ patch("builtins.print"): try: _watch_loop(args, muse_root, git_dir, interval=1) except KeyboardInterrupt: pass assert len(export_calls) == 0, "sync_to_git should NOT be called when HEAD is unchanged"