"""TDD — Phase 3 of issue #40: DB-query BFS sites migrated to walk_dag_async. Structural tests confirm each function uses walk_dag_async and has no inline BFS/DFS loop. Behavioural tests confirm the migrated functions produce correct output when given a fake session. P3-1 Structural — _walk_new_commits uses walk_dag_async; no inline while stack P3-2 Structural — _resolve_ancestor_manifest uses walk_dag_async; no inline while frontier P3-3 Structural — _resolve_ancestor_manifest uses walk_dag_async; no inline while fid P3-4 Structural — _is_ancestor_db uses walk_dag_async; no inline while frontier P3-5 Behavioural — _walk_new_commits with stop_at returns commits up to boundary P3-6 Behavioural — _walk_new_commits without stop_at returns all repo commits P3-7 Behavioural — _resolve_ancestor_manifest returns manifest at LCA of two branches P3-8 Behavioural — _is_ancestor_db: True when ancestor is reachable P3-9 Behavioural — _is_ancestor_db: False when max_hops cap exceeded before finding ancestor """ from __future__ import annotations import datetime import inspect from types import SimpleNamespace from unittest.mock import AsyncMock, MagicMock, patch import pytest # --------------------------------------------------------------------------- # P3-1 Structural — _walk_new_commits # --------------------------------------------------------------------------- def test_p3_1_walk_new_commits_uses_walk_dag_async() -> None: """_walk_new_commits must use walk_dag_async; no inline while stack.""" from musehub.services import musehub_symbol_indexer as mod src = inspect.getsource(mod._walk_new_commits) assert "walk_dag_async" in src, ( "_walk_new_commits must delegate DFS to walk_dag_async." ) assert "while stack" not in src, ( "_walk_new_commits still has an inline while-stack DFS." ) # --------------------------------------------------------------------------- # P3-2 Structural — _resolve_ancestor_manifest (walk 1: while frontier) # --------------------------------------------------------------------------- def test_p3_2_resolve_ancestor_manifest_uses_walk_dag_async() -> None: """_resolve_ancestor_manifest must use walk_dag_async; no inline while frontier.""" from musehub.services import musehub_proposals as mod src = inspect.getsource(mod._resolve_ancestor_manifest) assert "walk_dag_async" in src, ( "_resolve_ancestor_manifest must delegate BFS to walk_dag_async." ) assert "while frontier" not in src, ( "_resolve_ancestor_manifest still has an inline while-frontier BFS." ) # --------------------------------------------------------------------------- # P3-3 Structural — _resolve_ancestor_manifest (walk 2: while fid) # --------------------------------------------------------------------------- def test_p3_3_resolve_ancestor_manifest_no_while_fid() -> None: """_resolve_ancestor_manifest must not have an inline while-fid first-parent walk.""" from musehub.services import musehub_proposals as mod src = inspect.getsource(mod._resolve_ancestor_manifest) assert "while fid" not in src, ( "_resolve_ancestor_manifest still has an inline while-fid first-parent walk." ) # --------------------------------------------------------------------------- # P3-4 Structural — _is_ancestor_db # --------------------------------------------------------------------------- def test_p3_4_is_ancestor_db_uses_walk_dag_async() -> None: """_is_ancestor_db must use walk_dag_async; no inline while frontier.""" from musehub.services import musehub_wire as mod src = inspect.getsource(mod._is_ancestor_db) assert "walk_dag_async" in src, ( "_is_ancestor_db must delegate BFS to walk_dag_async." ) assert "while frontier" not in src, ( "_is_ancestor_db still has an inline while-frontier BFS." ) # --------------------------------------------------------------------------- # Helpers for behavioural tests # --------------------------------------------------------------------------- _TZ = datetime.timezone.utc def _ts(n: int) -> datetime.datetime: return datetime.datetime(2026, 1, n, tzinfo=_TZ) def _commit(cid: str, parents: list[str], n: int, repo: str = "repo1", snapshot_id: str | None = None) -> SimpleNamespace: return SimpleNamespace( commit_id=cid, parent_ids=parents, repo_id=repo, timestamp=_ts(n), snapshot_id=snapshot_id, ) def _exec_result(*, scalar: SimpleNamespace | None = None, scalars: list[SimpleNamespace] | None = None, one: SimpleNamespace | None = None) -> MagicMock: """Build a MagicMock mimicking an AsyncSession execute result.""" r = MagicMock() r.scalar_one_or_none.return_value = scalar r.scalars.return_value = scalars or [] r.one_or_none.return_value = one return r # --------------------------------------------------------------------------- # P3-5 Behavioural — _walk_new_commits with stop_at # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_p3_5_walk_new_commits_with_stop_at() -> None: """_walk_new_commits with stop_at=C1 returns only C3 and C2 (C1 excluded). Chain: C3 → C2 → C1. stop_at=C1. Expected: sorted by timestamp → [C2, C3]. """ from musehub.services.musehub_symbol_indexer import _walk_new_commits c1 = _commit("C1", [], 1) c2 = _commit("C2", ["C1"], 2) c3 = _commit("C3", ["C2"], 3) session = MagicMock() # One bulk execute returns all repo commits; walk then excludes stop_at=C1 in memory session.execute = AsyncMock(return_value=_exec_result(scalars=[c1, c2, c3])) result = await _walk_new_commits(session, "repo1", "C3", "C1") assert [c.commit_id for c in result] == ["C2", "C3"], ( f"Expected [C2, C3] sorted oldest→newest, got {[c.commit_id for c in result]}" ) # --------------------------------------------------------------------------- # P3-6 Behavioural — _walk_new_commits without stop_at # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_p3_6_walk_new_commits_no_stop_at() -> None: """_walk_new_commits without stop_at returns all commits sorted by timestamp. Chain: C3 → C2 → C1. no stop_at. Expected: sorted by timestamp → [C1, C2, C3]. """ from musehub.services.musehub_symbol_indexer import _walk_new_commits c1 = _commit("C1", [], 1) c2 = _commit("C2", ["C1"], 2) c3 = _commit("C3", ["C2"], 3) session = MagicMock() # One bulk execute to fetch all commits for the repo session.execute = AsyncMock(return_value=_exec_result(scalars=[c3, c1, c2])) result = await _walk_new_commits(session, "repo1", "C3", None) assert [c.commit_id for c in result] == ["C1", "C2", "C3"], ( f"Expected [C1, C2, C3] sorted oldest→newest, got {[c.commit_id for c in result]}" ) # --------------------------------------------------------------------------- # P3-7 Behavioural — _resolve_ancestor_manifest # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_p3_7_resolve_ancestor_manifest_finds_lca() -> None: """_resolve_ancestor_manifest returns the ancestor's snapshot manifest. Diamond: BASE ← L1 ← L2 (from_branch tip = L2) ← R1 (to_branch tip = R1) LCA = BASE. Expect the manifest at BASE's snapshot. """ from musehub.services.musehub_proposals import _resolve_ancestor_manifest base = _commit("BASE", [], 1, snapshot_id="snap-BASE") r1 = _commit("R1", ["BASE"], 2, snapshot_id="snap-R1") l1 = _commit("L1", ["BASE"], 2, snapshot_id="snap-L1") l2 = _commit("L2", ["L1"], 3, snapshot_id="snap-L2") commits = {"BASE": base, "R1": r1, "L1": l1, "L2": l2} to_branch_row = SimpleNamespace(head_commit_id="R1") from_branch_row = SimpleNamespace(head_commit_id="L2") session = MagicMock() session.get = AsyncMock(side_effect=lambda model, pk: commits.get(pk)) expected_manifest = {"files": {"base_file.py": "sha256:abc"}} with patch( "musehub.services.musehub_proposals._get_branch", new=AsyncMock(side_effect=[to_branch_row, from_branch_row]), ), patch( "musehub.services.musehub_snapshot.get_snapshot_manifest", new=AsyncMock(return_value=expected_manifest), ): result = await _resolve_ancestor_manifest( session, "repo1", "from_branch", "to_branch" ) assert result == expected_manifest, ( f"Expected manifest at BASE, got {result}" ) # --------------------------------------------------------------------------- # P3-8 Behavioural — _is_ancestor_db: True case # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_p3_8_is_ancestor_db_true() -> None: """_is_ancestor_db returns True when ancestor is reachable. Chain: C3 → C2 → C1. ancestor=C1, descendant=C3 → True. """ from musehub.services.musehub_wire import _is_ancestor_db c1 = _commit("C1", [], 1) c2 = _commit("C2", ["C1"], 2) c3 = _commit("C3", ["C2"], 3) session = MagicMock() # DFS from C3: adj("C3"), adj("C2"), adj("C1") → C1 yielded → match session.execute = AsyncMock(side_effect=[ _exec_result(scalar=c3), _exec_result(scalar=c2), _exec_result(scalar=c1), ]) assert await _is_ancestor_db(session, "C1", "C3", "repo1") is True # --------------------------------------------------------------------------- # P3-9 Behavioural — _is_ancestor_db: max_hops cap → False # --------------------------------------------------------------------------- @pytest.mark.asyncio async def test_p3_9_is_ancestor_db_max_hops_cap() -> None: """_is_ancestor_db returns False when max_hops is reached before finding ancestor. Chain: C5 → C4 → C3 → C2 → C1. ancestor=C1, descendant=C5, max_hops=3 → walk stops at C3 → False. """ from musehub.services.musehub_wire import _is_ancestor_db c1 = _commit("C1", [], 1) c2 = _commit("C2", ["C1"], 2) c3 = _commit("C3", ["C2"], 3) c4 = _commit("C4", ["C3"], 4) c5 = _commit("C5", ["C4"], 5) session = MagicMock() # DFS from C5, max 3 nodes: adj("C5"), adj("C4"), adj("C3") → stop session.execute = AsyncMock(side_effect=[ _exec_result(scalar=c5), _exec_result(scalar=c4), _exec_result(scalar=c3), ]) assert await _is_ancestor_db(session, "C1", "C5", "repo1", max_hops=3) is False