test_bridge_watch.py
python
sha256:2eaa5d95f9d9383498e76947410a26e5a3ba23d182f339910c424cf88fad412b
fix: try fetch/presign before fetch/mpack to avoid Cloudfla…
Sonnet 4.6
patch
6 days ago
| 1 | """Phase 6 — Watch mode tests for ``muse bridge git-export --watch``. |
| 2 | |
| 3 | Tests verify _watch_loop behaviour using mocks so no real sleep or git |
| 4 | subprocess is needed. |
| 5 | |
| 6 | NOTE: git subprocess calls in helper fixtures are INTENTIONAL — they create |
| 7 | real git repositories used as export targets. |
| 8 | """ |
| 9 | |
| 10 | from __future__ import annotations |
| 11 | |
| 12 | import argparse |
| 13 | import io |
| 14 | import json |
| 15 | import os |
| 16 | import pathlib |
| 17 | import subprocess |
| 18 | import sys |
| 19 | from unittest.mock import MagicMock, call, patch |
| 20 | |
| 21 | import pytest |
| 22 | |
| 23 | from muse.core.paths import head_path, heads_dir, init_repo_dirs |
| 24 | from muse.core.types import long_id |
| 25 | from tests.cli_test_helper import CliRunner |
| 26 | |
# Shared CliRunner instance for this module; _make_muse_repo uses it to run ``init``.
runner = CliRunner()
| 28 | |
| 29 | |
| 30 | # --------------------------------------------------------------------------- |
| 31 | # Helpers |
| 32 | # --------------------------------------------------------------------------- |
| 33 | |
def _make_muse_repo(path: pathlib.Path) -> pathlib.Path:
    """Create *path* if needed, run ``init`` in it via the shared runner, return it.

    The directory (and any missing parents) is created first; an existing
    directory is accepted. The assertion fails, quoting the command's stderr,
    when the ``init`` invocation reports a non-zero exit code.
    """
    path.mkdir(parents=True, exist_ok=True)
    outcome = runner.invoke(None, ["init"], cwd=path)
    assert outcome.exit_code == 0, f"muse init failed: {outcome.stderr}"
    return path
| 39 | |
| 40 | |
def _make_git_repo(path: pathlib.Path) -> pathlib.Path:
    """Create a real git repository at *path* holding a single commit.

    The repository is given its own ``user.email`` / ``user.name`` settings,
    a ``README.md`` whose content is ``init``, and one commit with the
    message ``init``.

    Args:
        path: Directory to create (missing parents included) and initialise.

    Returns:
        The same *path* that was passed in.

    Raises:
        subprocess.CalledProcessError: If any git command exits non-zero.
    """
    path.mkdir(parents=True, exist_ok=True)
    repo = str(path)

    def git(*argv: str) -> None:
        # check=True turns a failing git command into an immediate error;
        # stdout/stderr are captured instead of being written to the console.
        subprocess.run(["git", *argv], check=True, capture_output=True)

    git("init", repo)
    # The address literal had been replaced by an e-mail-obfuscation
    # placeholder; example.com is a domain reserved for tests and docs.
    git("-C", repo, "config", "user.email", "test@example.com")
    git("-C", repo, "config", "user.name", "Test")
    (path / "README.md").write_text("init", encoding="utf-8")
    git("-C", repo, "add", ".")
    git("-C", repo, "commit", "-m", "init")
    return path
| 59 | |
| 60 | |
def _fake_args(
    git_branch: str = "muse-mirror",
    git_remote: str = "origin",
    muse_ref: str | None = None,
    json_out: bool = True,
    excludes: list[str] | None = None,
    strip_muse_metadata: bool = True,
    fix_modes: bool = False,
    allow_empty: bool = False,
    no_push: bool = True,
    force_push: bool = False,
    commit_message: str = "mirror: muse {commit_id}",
) -> argparse.Namespace:
    """Return the ``argparse.Namespace`` these tests hand to ``_watch_loop``.

    Each keyword becomes an attribute of the same name. A falsy ``excludes``
    (``None`` or an empty list) is replaced by a new empty list; any other
    value is stored as given.
    """
    options = {
        "git_branch": git_branch,
        "git_remote": git_remote,
        "muse_ref": muse_ref,
        "json_out": json_out,
        "excludes": excludes if excludes else [],
        "strip_muse_metadata": strip_muse_metadata,
        "fix_modes": fix_modes,
        "allow_empty": allow_empty,
        "no_push": no_push,
        "force_push": force_push,
        "commit_message": commit_message,
    }
    return argparse.Namespace(**options)
| 88 | |
| 89 | |
| 90 | # =========================================================================== |
| 91 | # Tests |
| 92 | # =========================================================================== |
| 93 | |
class TestWatchEmitsPollEvent:
    """_watch_loop emits JSON poll events on each tick."""

    def test_watch_emits_poll_event(self, tmp_path: pathlib.Path) -> None:
        """_watch_loop emits {'event': 'poll', ...} JSON on each sleep cycle."""
        from muse.core.bridge.exporter import _watch_loop

        muse_root = _make_muse_repo(tmp_path / "muse")
        git_dir = _make_git_repo(tmp_path / "git")

        # HEAD -> refs/heads/main -> one fixed commit id, written before the
        # loop starts so HEAD stays the same on every read (changed=False).
        static_commit_id = long_id("a" * 64)
        head_path(muse_root).write_text("ref: refs/heads/main")
        ref_path = heads_dir(muse_root) / "main"
        ref_path.parent.mkdir(parents=True, exist_ok=True)
        ref_path.write_text(static_commit_id)

        call_count = [0]

        def fake_sleep(n: float) -> None:
            call_count[0] += 1
            if call_count[0] >= 2:
                raise KeyboardInterrupt  # exit the loop after 1 poll

        captured: list[str] = []

        def capture_print(*values: object, **_kwargs: object) -> None:
            # Record the first positional argument only; a bare print()
            # is recorded as an empty line instead of raising IndexError.
            captured.append(str(values[0]) if values else "")

        args = _fake_args(json_out=True)

        with patch("time.sleep", side_effect=fake_sleep), \
                patch("builtins.print", side_effect=capture_print):
            try:
                _watch_loop(args, muse_root, git_dir, interval=1)
            except KeyboardInterrupt:
                # Raised on purpose by fake_sleep to end the loop.
                pass

        # At least one poll event was emitted
        poll_events = [
            line for line in captured
            if line.startswith("{") and '"event": "poll"' in line
        ]
        assert len(poll_events) >= 1

    def test_watch_poll_event_has_required_keys(self, tmp_path: pathlib.Path) -> None:
        """Poll event JSON has 'event', 'muse_commit_id', and 'changed' keys."""
        from muse.core.bridge.exporter import _watch_loop

        muse_root = _make_muse_repo(tmp_path / "muse")
        git_dir = _make_git_repo(tmp_path / "git")

        # HEAD -> refs/heads/main -> one fixed commit id.
        static_commit_id = long_id("b" * 64)
        head_path(muse_root).write_text("ref: refs/heads/main")
        ref_path = heads_dir(muse_root) / "main"
        ref_path.parent.mkdir(parents=True, exist_ok=True)
        ref_path.write_text(static_commit_id)

        call_count = [0]

        def fake_sleep(n: float) -> None:
            call_count[0] += 1
            if call_count[0] >= 2:
                raise KeyboardInterrupt

        captured: list[str] = []

        def capture_print(*values: object, **_kwargs: object) -> None:
            # Record the first positional argument only; a bare print()
            # is recorded as an empty line instead of raising IndexError.
            captured.append(str(values[0]) if values else "")

        args = _fake_args(json_out=True)

        with patch("time.sleep", side_effect=fake_sleep), \
                patch("builtins.print", side_effect=capture_print):
            try:
                _watch_loop(args, muse_root, git_dir, interval=1)
            except KeyboardInterrupt:
                # Raised on purpose by fake_sleep to end the loop.
                pass

        poll_events = [
            json.loads(line)
            for line in captured
            if line.startswith("{") and '"event": "poll"' in line
        ]
        assert len(poll_events) >= 1
        evt = poll_events[0]
        assert evt["event"] == "poll"
        assert "muse_commit_id" in evt
        assert "changed" in evt
| 179 | |
| 180 | |
class TestWatchDetectsHeadChange:
    """_watch_loop detects when Muse HEAD advances."""

    def test_watch_detects_head_change(self, tmp_path: pathlib.Path) -> None:
        """When HEAD changes between polls, changed=True is emitted."""
        from muse.core.bridge.exporter import _watch_loop

        muse_root = _make_muse_repo(tmp_path / "muse")
        git_dir = _make_git_repo(tmp_path / "git")

        commit_a = long_id("a" * 64)
        commit_b = long_id("b" * 64)

        # HEAD -> refs/heads/main, which initially holds commit_a.
        head_path(muse_root).write_text("ref: refs/heads/main")
        main_ref = heads_dir(muse_root) / "main"
        main_ref.parent.mkdir(parents=True, exist_ok=True)
        main_ref.write_text(commit_a)

        sleep_calls = 0

        def fake_sleep(_seconds: float) -> None:
            nonlocal sleep_calls
            sleep_calls += 1
            if sleep_calls == 1:
                # Advance HEAD between first and second poll
                main_ref.write_text(commit_b)
                return
            raise KeyboardInterrupt

        printed: list[str] = []

        def record_print(*values: object, **_options: object) -> None:
            printed.append(str(values[0]))

        # Stand-in exporter so no real GitExporter work is attempted.
        fake_exporter = MagicMock()
        fake_exporter.configure_mock(**{
            "resolve_muse_ref.return_value": (commit_b, long_id("c" * 64)),
            "read_snapshot.return_value": {},
            "sync_to_git.return_value": 0,
            "git_commit.return_value": None,
        })

        loop_args = _fake_args(json_out=True)

        with patch("time.sleep", side_effect=fake_sleep):
            with patch("muse.core.bridge.exporter.GitExporter", return_value=fake_exporter):
                with patch("muse.core.bridge.exporter._ensure_git_branch"):
                    with patch("builtins.print", side_effect=record_print):
                        try:
                            _watch_loop(loop_args, muse_root, git_dir, interval=1)
                        except KeyboardInterrupt:
                            pass

        # Collect every poll event that reports changed=True.
        changed_events = []
        for line in printed:
            if not (line.startswith("{") and '"event": "poll"' in line):
                continue
            event = json.loads(line)
            if event.get("changed") is True:
                changed_events.append(event)

        # There should be at least one poll with changed=True
        assert len(changed_events) >= 1
| 237 | |
| 238 | |
class TestWatchCallsExportOnChange:
    """_watch_loop triggers export when HEAD changes."""

    def test_watch_calls_export_on_change(self, tmp_path: pathlib.Path) -> None:
        """When HEAD changes, GitExporter.sync_to_git is called."""
        from muse.core.bridge.exporter import _watch_loop

        muse_root = _make_muse_repo(tmp_path / "muse")
        git_dir = _make_git_repo(tmp_path / "git")

        # HEAD -> refs/heads/main, which initially holds commit_a.
        head_path(muse_root).write_text("ref: refs/heads/main")
        ref_path = heads_dir(muse_root) / "main"
        ref_path.parent.mkdir(parents=True, exist_ok=True)

        commit_a = long_id("a" * 64)
        commit_b = long_id("b" * 64)
        ref_path.write_text(commit_a)

        call_count = [0]

        def fake_sleep(n: float) -> None:
            call_count[0] += 1
            if call_count[0] == 1:
                # Advance HEAD during the first sleep.
                ref_path.write_text(commit_b)
            elif call_count[0] >= 2:
                raise KeyboardInterrupt

        # The mock records its own calls, so no side-effect counter is needed.
        mock_exporter = MagicMock()
        mock_exporter.resolve_muse_ref.return_value = (commit_b, long_id("c" * 64))
        mock_exporter.read_snapshot.return_value = {}
        mock_exporter.sync_to_git.return_value = 0
        mock_exporter.git_commit.return_value = None

        args = _fake_args(json_out=False)

        with patch("time.sleep", side_effect=fake_sleep), \
                patch("muse.core.bridge.exporter.GitExporter", return_value=mock_exporter), \
                patch("muse.core.bridge.exporter._ensure_git_branch"), \
                patch("builtins.print"):
            try:
                _watch_loop(args, muse_root, git_dir, interval=1)
            except KeyboardInterrupt:
                # Raised on purpose by fake_sleep to end the loop.
                pass

        assert mock_exporter.sync_to_git.call_count >= 1, \
            "sync_to_git should have been called when HEAD changed"

    def test_watch_no_export_when_head_unchanged(self, tmp_path: pathlib.Path) -> None:
        """When HEAD does not change, no export is triggered."""
        from muse.core.bridge.exporter import _watch_loop

        muse_root = _make_muse_repo(tmp_path / "muse")
        git_dir = _make_git_repo(tmp_path / "git")

        # HEAD -> refs/heads/main -> one commit id that is never rewritten.
        head_path(muse_root).write_text("ref: refs/heads/main")
        ref_path = heads_dir(muse_root) / "main"
        ref_path.parent.mkdir(parents=True, exist_ok=True)
        ref_path.write_text(long_id("a" * 64))

        call_count = [0]

        def fake_sleep(n: float) -> None:
            call_count[0] += 1
            if call_count[0] >= 2:
                raise KeyboardInterrupt

        mock_exporter = MagicMock()
        mock_exporter.sync_to_git.return_value = 0

        args = _fake_args(json_out=False)

        with patch("time.sleep", side_effect=fake_sleep), \
                patch("muse.core.bridge.exporter.GitExporter", return_value=mock_exporter), \
                patch("muse.core.bridge.exporter._ensure_git_branch"), \
                patch("builtins.print"):
            try:
                _watch_loop(args, muse_root, git_dir, interval=1)
            except KeyboardInterrupt:
                # Raised on purpose by fake_sleep to end the loop.
                pass

        assert mock_exporter.sync_to_git.call_count == 0, \
            "sync_to_git should NOT be called when HEAD is unchanged"
File History
1 commit
sha256:2eaa5d95f9d9383498e76947410a26e5a3ba23d182f339910c424cf88fad412b
fix: try fetch/presign before fetch/mpack to avoid Cloudfla…
Sonnet 4.6
patch
6 days ago