gabriel / musehub public
Closed #40 Enhancement
filed by gabriel human · 42 days ago

Generic DAG walker: consolidate inline commit-graph BFS across MuseHub services

Background

During muse-core issue #6 (generic DAG walker), every standalone `deque + seen: set + read_commit` BFS pattern in the Muse engine was replaced with `walk_dag` / `iter_ancestors` / `ancestor_ids` from `muse.core.graph`.

The same audit applied to MuseHub reveals 12 inline commit-graph BFS/DFS loops across 7 service files. Each independently re-implements the same pattern — a frontier (deque or list), a visited set, a parent-lookup step, and a while loop. None share any utility.

The sites that can be migrated fall into two categories:

  1. In-memory BFS — the parent map is pre-loaded from the DB into a dict before the loop. The loop itself is pure Python. These can use `walk_dag` from `muse.core.graph` directly (synchronous, adjacency closure over the dict).

  2. DB-query BFS — each step reads commits from the DB (SQLAlchemy `AsyncSession`). These need a new `walk_dag_async` utility local to MuseHub that accepts an async adjacency function.

A third category — batched frontier walks (`wire_fetch_stream`, `wire_negotiate`) — issues one DB query per BFS level to fetch all frontier commits at once. These cannot be naively delegated to a per-node async walker without losing the batching optimization. They are documented exceptions.


Inventory of inline walk sites

Category A — In-memory BFS (use `muse.core.graph.walk_dag`)

`musehub/services/musehub_intel_providers.py`

Three providers share an identical BFS body (copy-pasted). Each pre-loads a `commit_parents: dict[str, list[str]]` from the DB, then runs:

walk_order: list[str] = []
visited: set[str] = set()
queue: deque[str] = deque([ref])
while queue and len(walk_order) < self._MAX_WALK:
    cid = queue.popleft()
    if cid in visited or cid not in commit_parents:
        continue
    visited.add(cid)
    walk_order.append(cid)
    for parent in commit_parents[cid]:
        if parent and parent not in visited:
            queue.append(parent)

| Provider | Approx. line |
| --- | --- |
| `_FileCouplingProvider.compute()` (BFS inside `CouplingProvider`) | 511 |
| `_EntangleProvider.compute()` | 642 |
| `_VelocityProvider.compute()` | 1134 |

`musehub/services/musehub_gc.py` — `run_gc()` (line ~124)

Pre-loads `all_commits: dict[str, list[str]]` then:

reachable: set[str] = set()
queue = list(heads)
while queue:
    cid = queue.pop()
    if cid in reachable or cid not in all_commits:
        continue
    reachable.add(cid)
    queue.extend(all_commits[cid])

`musehub/services/musehub_sync.py` — `_is_fast_forward()` (line ~99)

Builds `parent_map` from the push bundle then:

visited: set[str] = set()
frontier = [head_commit_id]
while frontier:
    current = frontier.pop()
    if current in visited:
        continue
    visited.add(current)
    if current == remote_head:
        return True
    for parent in parent_map.get(current, []):
        if parent not in visited:
            frontier.append(parent)

`musehub/services/musehub_wire.py` — `_is_ancestor_in_bundle()` (line ~941)

Builds `commit_by_id` from bundle list then:

visited: set[str] = set()
frontier: list[str] = [c.commit_id for c in commits]
while frontier:
    cid = frontier.pop()
    if cid in visited:
        continue
    visited.add(cid)
    if cid == head_id:
        return True
    row = commit_by_id.get(cid)
    if row is None:
        continue
    for pid in filter(None, [row.parent_commit_id, row.parent2_commit_id]):
        if pid not in visited:
            frontier.append(pid)

`musehub/services/musehub_divergence.py` — `find_common_ancestor()` (line ~324)

Builds `all_commits` dict from two commit lists then:

a_ancestry: set[str] = set()
frontier = list(a_ids)
while frontier:
    cid = frontier.pop()
    if cid in a_ancestry:
        continue
    a_ancestry.add(cid)
    commit = all_commits.get(cid)
    if commit:
        frontier.extend(commit.parent_ids)
for commit in sorted(b_commits, key=lambda c: c.timestamp, reverse=True):
    if commit.commit_id in a_ancestry:
        return commit.commit_id

Category B — DB-query BFS (need `walk_dag_async`)

`musehub/services/musehub_symbol_indexer.py` — `_walk_new_commits()` (line ~274)

Two DFS variants. When `stop_at` is given (the incremental case):

new_commits: list[db.MusehubCommit] = []
visited: set[str] = {stop_at}
stack = [new_head]
while stack:
    cid = stack.pop()
    if cid in visited:
        continue
    visited.add(cid)
    row = (await session.execute(
        select(db.MusehubCommit).where(db.MusehubCommit.commit_id == cid)
    )).scalar_one_or_none()
    if row is None or row.repo_id != repo_id:
        continue
    new_commits.append(row)
    stack.extend(row.parent_ids or [])

`musehub/services/musehub_proposals.py` — `_resolve_ancestor_manifest()` (line ~569)

Two sequential walks (BFS to collect reachable commits, then first-parent walk):

# Walk 1 — collect to_branch ancestry
to_commit_ids: set[str] = set()
frontier = [to_b.head_commit_id] if to_b.head_commit_id else []
depth = 0
while frontier and depth < 200:
    nxt: list[str] = []
    for cid in frontier:
        if cid in to_commit_ids:
            continue
        to_commit_ids.add(cid)
        commit = await session.get(db.MusehubCommit, cid)
        if commit and commit.parent_ids:
            nxt.extend(commit.parent_ids)
    frontier = nxt
    depth += 1

# Walk 2 — first-parent walk on from_branch
fid = from_b.head_commit_id
depth = 0
while fid and depth < 200:
    commit = await session.get(db.MusehubCommit, fid)
    ...
    fid = (commit.parent_ids or [None])[0]
    depth += 1

`musehub/services/musehub_wire.py` — `_is_ancestor_db()` (line ~986)

Batched BFS with `max_hops` cap (the cap counts visited commits, not BFS levels, because `hops` grows by `len(batch)`):

frontier: list[str] = [descendant_id]
visited: set[str] = set()
hops = 0
while frontier and hops < max_hops:
    batch = list(set(frontier) - visited)
    if not batch:
        break
    visited.update(batch)
    frontier = []
    hops += len(batch)
    rows = (await session.execute(
        select(db.MusehubCommit.commit_id, db.MusehubCommit.parent_ids)
        .where(db.MusehubCommit.commit_id.in_(batch), ...)
    )).all()
    for row in rows:
        for pid in (row.parent_ids or []):
            if pid == ancestor_id:
                return True
            if pid not in visited:
                frontier.append(pid)

Category C — Batched frontier walks (documented exceptions)

`musehub/services/musehub_wire.py` — `wire_fetch_stream()` (lines ~562, ~588)

Two BFS variants (unbounded and depth-limited). Both issue one batch DB query per BFS level (`commit_id.in_(batch)`), fetching the entire frontier in a single round trip. A per-node async walker would degrade this to one query per commit, i.e. N queries for a level with N frontier commits. Keep as-is.

`musehub/services/musehub_wire.py` — `wire_negotiate()` (line ~854)

Same batch-per-level pattern — one query per BFS frontier against `db.MusehubCommit`. Keep as-is.
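
To make the batching argument concrete, here is a minimal sketch that counts round trips for both styles. `FakeDB`, `COMMITS`, `walk_batched` and `walk_per_node` are hypothetical stand-ins written for this illustration; they are not the MuseHub code, and the real sites query `db.MusehubCommit` through an `AsyncSession`.

```python
import asyncio

# Hypothetical in-memory stand-in for the commit table: commit_id -> parent_ids.
COMMITS: dict[str, list[str]] = {
    "h": ["a1", "b1"],
    "a1": ["a2"],
    "b1": ["b2"],
    "a2": ["r"],
    "b2": ["r"],
    "r": [],
}


class FakeDB:
    """Counts round trips; stands in for one SELECT ... WHERE commit_id IN (...)."""

    def __init__(self) -> None:
        self.queries = 0

    async def fetch_parents(self, ids: list[str]) -> dict[str, list[str]]:
        self.queries += 1
        return {cid: COMMITS[cid] for cid in ids if cid in COMMITS}


async def walk_batched(db: FakeDB, head: str) -> set[str]:
    """Category C style: one query per BFS level."""
    visited: set[str] = set()
    frontier = [head]
    while frontier:
        # Deduplicate the level and drop anything already visited.
        batch = [c for c in dict.fromkeys(frontier) if c not in visited]
        if not batch:
            break
        visited.update(batch)
        rows = await db.fetch_parents(batch)
        frontier = [p for parents in rows.values() for p in parents]
    return visited


async def walk_per_node(db: FakeDB, head: str) -> set[str]:
    """What a naive per-node adjacency would do: one query per commit."""
    visited: set[str] = set()
    frontier = [head]
    while frontier:
        cid = frontier.pop()
        if cid in visited:
            continue
        visited.add(cid)
        rows = await db.fetch_parents([cid])
        frontier.extend(rows.get(cid, []))
    return visited
```

On this six-commit graph both walks visit the same set, but the batched walk issues four queries (one per level) while the per-node walk issues six (one per commit). The gap widens with the width of each level.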


Proposed utility: `walk_dag_async`

For Category B, add a MuseHub-local async counterpart to `muse.core.graph.walk_dag`:

# musehub/graph/walk.py

from collections.abc import AsyncIterator, Callable, Coroutine
from typing import Literal, TypeVar

T = TypeVar("T")

async def walk_dag_async(
    starts: "T | list[T]",
    adjacency: "Callable[[T], Coroutine[None, None, list[T]]]",
    *,
    order: "Literal['bfs', 'dfs']" = "bfs",
    exclude: "set[T] | None" = None,
    max_nodes: "int | None" = None,
) -> "AsyncIterator[T]":
    """Async BFS/DFS over any DAG with an async adjacency function.

    Mirrors muse.core.graph.walk_dag but supports async adjacency
    (e.g. SQLAlchemy AsyncSession queries).
    """
    ...

Category A uses `muse.core.graph.walk_dag` directly (sync, in-memory dict adjacency).


Implementation plan

All phases follow TDD: write failing tests → implement → verify green → commit.

Phase 1 — `walk_dag_async` utility + tests

File: `musehub/graph/walk.py`

Tests to write first (`tests/test_walk_dag_async.py`):

  • W1 Structural — function exists and is an async generator
  • W2 Behavioural — BFS order: linear chain A→B→C yields A, B, C
  • W3 Behavioural — `exclude` set prevents visiting excluded nodes and their subtrees
  • W4 Behavioural — `max_nodes` cap stops the walk after N nodes
  • W5 Behavioural — DFS order: same chain yields depth-first
  • W6 Behavioural — multi-root start seeds from all roots simultaneously
  • W7 Behavioural — cycles handled (each node yielded at most once)
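
As an illustration of the structural check in W1, the standard library's `inspect.isasyncgenfunction` distinguishes an async generator function from a plain coroutine function. `walk_stub` and `not_a_generator` below are hypothetical stand-ins; a real test would import `walk_dag_async` instead.

```python
import inspect
from collections.abc import AsyncIterator


async def walk_stub(start: str) -> AsyncIterator[str]:
    """Hypothetical stand-in for walk_dag_async: an async generator."""
    yield start


async def not_a_generator(start: str) -> list[str]:
    """A plain coroutine function; it returns a list instead of yielding."""
    return [start]


def is_async_generator_function(fn: object) -> bool:
    # True only for `async def` functions whose body contains `yield`.
    return inspect.isasyncgenfunction(fn)
```

Checking the function itself avoids having to create and drive the generator just to confirm its kind.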

Phase 2 — In-memory BFS migration (`walk_dag` from muse)

Migrate all Category A sites. Each migration follows the pattern:

# Before
walk_order: list[str] = []
visited: set[str] = set()
queue: deque[str] = deque([ref])
while queue and len(walk_order) < self._MAX_WALK:
    ...

# After
from muse.core.graph import walk_dag
walk_order = list(walk_dag(
    ref,
    lambda cid: [p for p in commit_parents.get(cid, []) if p],
    max_nodes=self._MAX_WALK,
))
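
The same pattern could plausibly carry over to the non-provider sites. The sketch below shows a `run_gc`-style reachability set and an `_is_fast_forward`-style early return. The `walk_dag` defined here is a hypothetical BFS-only stand-in so the example runs on its own; the real `muse.core.graph.walk_dag` may differ in signature and behaviour.

```python
from collections import deque
from collections.abc import Callable, Iterable, Iterator


def walk_dag(
    starts: "str | Iterable[str]",
    adjacency: Callable[[str], Iterable[str]],
    *,
    max_nodes: "int | None" = None,
) -> Iterator[str]:
    """Hypothetical stand-in for muse.core.graph.walk_dag (BFS only)."""
    roots = [starts] if isinstance(starts, str) else list(starts)
    seen: set[str] = set()
    queue = deque(roots)
    count = 0
    while queue and (max_nodes is None or count < max_nodes):
        cid = queue.popleft()
        if cid in seen:
            continue
        seen.add(cid)
        count += 1
        yield cid
        queue.extend(p for p in adjacency(cid) if p not in seen)


def reachable_commits(heads: Iterable[str], all_commits: dict[str, list[str]]) -> set[str]:
    """run_gc-style reachability: every known commit reachable from heads."""
    # The inline loop skipped ids missing from all_commits, so filter them
    # out of both the start set and the adjacency to keep that behaviour.
    known_heads = [h for h in heads if h in all_commits]
    return set(
        walk_dag(known_heads, lambda cid: [p for p in all_commits[cid] if p in all_commits])
    )


def is_fast_forward(head_commit_id: str, remote_head: str, parent_map: dict[str, list[str]]) -> bool:
    """_is_fast_forward-style check: stop as soon as remote_head is reached."""
    # Ids missing from parent_map are still visited (remote_head is usually
    # outside the push bundle), so the adjacency falls back to an empty list.
    for cid in walk_dag(head_commit_id, lambda c: parent_map.get(c, [])):
        if cid == remote_head:
            return True
    return False
```

The two sites treat unknown commit IDs differently, so the adjacency closure is where that difference has to be preserved. A plain `.get(cid, [])` would yield IDs that the original provider and GC loops skipped.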

Tests to write first:

| ID | Target | Type | Assertion |
| --- | --- | --- | --- |
| P2-1 | `CouplingProvider.compute` BFS | Structural | source contains `walk_dag`; no inline `deque` |
| P2-2 | `EntangleProvider.compute` BFS | Structural | same |
| P2-3 | `VelocityProvider.compute` BFS | Structural | same |
| P2-4 | `run_gc` reachability | Structural | source contains `walk_dag`; no inline `while queue` |
| P2-5 | `_is_fast_forward` | Structural | source contains `walk_dag`; no inline `while frontier` |
| P2-6 | `_is_ancestor_in_bundle` | Structural | source contains `walk_dag`; no inline `while frontier` |
| P2-7 | `find_common_ancestor` | Structural | source contains `walk_dag`; no inline `while frontier` |
| P2-8 | Provider coupling BFS | Behavioural | walk_order is correct for a 3-commit chain |
| P2-9 | `run_gc` reachability | Behavioural | reachable set matches expected for a forked graph |
| P2-10 | `_is_fast_forward` | Behavioural | returns True when remote head is ancestor; False otherwise |
| P2-11 | `_is_ancestor_in_bundle` | Behavioural | returns True/False for reachable/unreachable head |
| P2-12 | `find_common_ancestor` | Behavioural | returns correct LCA for a diamond graph |
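
The structural assertions above read like substring checks on function source. As an alternative sketch, not necessarily what the test suite does, the same intent can be expressed with the standard `ast` module, which ignores comments and string contents. The helper names and the `LEGACY` / `MIGRATED` snippets are hypothetical. In a real test the source text would presumably come from `inspect.getsource(fn)`, dedented with `textwrap.dedent` for methods.

```python
import ast


def uses_call(source: str, func_name: str) -> bool:
    """Return True if the source contains a call to func_name (bare or attribute)."""
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.Call):
            func = node.func
            # Handles both `walk_dag(...)` and `graph.walk_dag(...)`.
            name = func.id if isinstance(func, ast.Name) else getattr(func, "attr", None)
            if name == func_name:
                return True
    return False


def has_while_loop(source: str) -> bool:
    """Return True if any `while` statement appears in the source."""
    return any(isinstance(node, ast.While) for node in ast.walk(ast.parse(source)))


# Hypothetical before/after snippets standing in for real function sources.
LEGACY = '''
def reach(heads, all_commits):
    reachable = set()
    queue = list(heads)
    while queue:
        cid = queue.pop()
        reachable.add(cid)
    return reachable
'''

MIGRATED = '''
def reach(heads, all_commits):
    return set(walk_dag(heads, lambda cid: all_commits.get(cid, [])))
'''
```

A check like `uses_call(src, "walk_dag") and not has_while_loop(src)` would fail on `LEGACY` and pass on `MIGRATED`. It is stricter than a substring test, since any `while` loop in the function fails it, which may or may not be desirable for a given site.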

Phase 3 — DB-query BFS migration (`walk_dag_async`)

Migrate all Category B sites. Each migration replaces the inline loop with `walk_dag_async` and an async adjacency closure over `session`.

Tests to write first:

| ID | Target | Type | Assertion |
| --- | --- | --- | --- |
| P3-1 | `_walk_new_commits` | Structural | source contains `walk_dag_async`; no inline `while stack` |
| P3-2 | `_resolve_ancestor_manifest` walk 1 | Structural | source contains `walk_dag_async`; no inline `while frontier` |
| P3-3 | `_resolve_ancestor_manifest` walk 2 | Structural | source contains `walk_dag_async` with `first_parent_only`; no inline `while fid` |
| P3-4 | `_is_ancestor_db` | Structural | source contains `walk_dag_async`; no inline `while frontier` |
| P3-5 | `_walk_new_commits` | Behavioural | returns commits from new_head down to stop_at (exclusive) |
| P3-6 | `_walk_new_commits` no stop_at | Behavioural | returns all commits from new_head |
| P3-7 | `_resolve_ancestor_manifest` | Behavioural | returns snapshot_id at LCA of two branches |
| P3-8 | `_is_ancestor_db` | Behavioural | returns True when ancestor is reachable; False otherwise |
| P3-9 | `_is_ancestor_db` | Behavioural | respects `max_hops` cap |
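
A rough sketch of what "an async adjacency closure over `session`" might look like for two of these sites. Everything here is a stand-in for illustration: `Commit` and `FakeSession` replace `db.MusehubCommit` and `AsyncSession`, the compact `walk_dag_async` is BFS-only, and the first-parent walk is modelled as an adjacency that returns only the first parent, which is just one reading of `first_parent_only`.

```python
import asyncio
from collections import deque
from dataclasses import dataclass, field


@dataclass
class Commit:
    """Hypothetical stand-in for a db.MusehubCommit row."""
    commit_id: str
    repo_id: str
    parent_ids: list[str] = field(default_factory=list)


class FakeSession:
    """Hypothetical stand-in for a primary-key lookup via AsyncSession."""

    def __init__(self, commits: list[Commit]) -> None:
        self._rows = {c.commit_id: c for c in commits}

    async def get(self, cid: str) -> "Commit | None":
        return self._rows.get(cid)


async def walk_dag_async(start, adjacency, *, exclude=None, max_nodes=None):
    """Minimal BFS-only stand-in for musehub.graph.walk.walk_dag_async."""
    seen = set(exclude or ())
    queue = deque([start])
    count = 0
    while queue and (max_nodes is None or count < max_nodes):
        node = queue.popleft()
        if node in seen:
            continue
        seen.add(node)
        count += 1
        yield node
        queue.extend(await adjacency(node))


async def walk_new_commits(session, repo_id, new_head, stop_at=None) -> list[Commit]:
    """Collect rows reachable from new_head, stopping at stop_at (exclusive)."""
    rows: dict[str, Commit] = {}

    async def parents(cid: str) -> list[str]:
        row = await session.get(cid)
        if row is None or row.repo_id != repo_id:
            return []  # unknown or foreign commits are not expanded
        rows[cid] = row  # keep the row so it is not fetched twice
        return list(row.parent_ids)

    exclude = {stop_at} if stop_at else None
    async for _ in walk_dag_async(new_head, parents, exclude=exclude):
        pass
    return list(rows.values())


async def first_parent_chain(session, head: str, max_nodes: int = 200) -> list[str]:
    """First-parent walk: the adjacency exposes only parent_ids[0]."""

    async def first_parent(cid: str) -> list[str]:
        row = await session.get(cid)
        return list(row.parent_ids[:1]) if row else []

    return [cid async for cid in walk_dag_async(head, first_parent, max_nodes=max_nodes)]
```

Collecting rows inside the adjacency closure keeps the walk at one lookup per visited commit, matching the original loop, at the cost of a side effect in the closure.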

Phase 4 — Dedup intel provider BFS

The three providers (Coupling, Entangle, Velocity) share an identical 10-line BFS body and an identical pre-load query. Extract both into a shared helper:

async def _load_commit_walk(
    session: AsyncSession,
    repo_id: str,
    ref: str,
    max_walk: int,
) -> list[str]:
    """Return commit IDs in BFS order from ref, up to max_walk."""
    ...

Tests to write first:

| ID | Type | Assertion |
| --- | --- | --- |
| P4-1 | Structural | each provider delegates to `_load_commit_walk`; no inline BFS body |
| P4-2 | Behavioural | `_load_commit_walk` returns correct walk order for a 4-commit chain |
| P4-3 | Behavioural | `_load_commit_walk` respects `max_walk` cap |

Success criteria

  • Zero standalone `deque + visited: set + while` commit-graph BFS patterns in Category A files
  • Zero standalone `visited: set + while stack/frontier + await session` commit-graph BFS patterns in Category B files
  • `walk_dag_async` has ≥ 7 unit tests (W1–W7), all green
  • All migrated functions have at least one structural test (assert uses the new utility) and one behavioural test (assert correct output)
  • All existing tests pass at each phase boundary

Documented exceptions (do not migrate)

  • `wire_fetch_stream` (two BFS variants) — batched frontier queries; per-node async walker would break batching
  • `wire_negotiate` — same batch-per-level pattern
  • `_topological_sort` in `musehub_wire.py` — Kahn's algorithm, specific semantics
  • `build_dag` in `musehub_repository.py` — Kahn's topological sort
  • `topological_sort` in `proposal_dag.py` — proposal dependency DAG, not commit graph
  • `RootDistanceIndex.build` in `graph/depth.py` — identity hierarchy with distance tracking
  • `CycleDetector.assert_no_cycle` in `graph/cycle.py` — identity DAG cycle detection

Activity (4)
gabriel opened this issue 42 days ago
gabriel 42 days ago

Phase 1 done — walk_dag_async shipped

Commit: sha256:2a9a09d2bf95

What was built

New file: musehub/graph/walk.py — async BFS/DFS DAG walker.

async def walk_dag_async(
    starts: T | list[T] | Iterable[T],
    adjacency: Callable[[T], Coroutine[None, None, list[T]]],
    *,
    order: Literal['bfs', 'dfs'] = 'bfs',
    exclude: set[T] | frozenset[T] | None = None,
    max_nodes: int | None = None,
) -> AsyncIterator[T]:

Test coverage (all green)

| Test | Assertion |
| --- | --- |
| W1 | walk_dag_async is an async generator (__aiter__, __anext__) |
| W2 | BFS on linear chain A→B→C yields A, B, C in order |
| W3 | exclude set prevents visiting excluded nodes and their subtrees |
| W4 | max_nodes cap stops the walk after exactly N nodes |
| W5 | DFS on diamond graph yields different order than BFS |
| W6 | Multi-root seeds the walk from all starts simultaneously |
| W7 | Cycles: each node yielded at most once (walk terminates) |

Infrastructure

Added tests/unit/ directory with its own conftest.py that overrides the session-scoped _db_schema fixture — unit tests run without a live DB connection.

Phase 2 (in-memory BFS migration via muse.core.graph.walk_dag) is next.

gabriel 42 days ago

Phase 2 complete ✓

Commit: sha256:a053ab7ec888

Migrated all 7 in-memory BFS/DFS sites to muse.core.graph.walk_dag:

| Site | File | Pattern replaced |
| --- | --- | --- |
| CouplingProvider.compute | musehub_intel_providers.py | deque loop → walk_dag list comprehension |
| EntangleProvider.compute | musehub_intel_providers.py | deque loop → walk_dag list comprehension |
| VelocityProvider.compute | musehub_intel_providers.py | deque loop → walk_dag list comprehension |
| run_gc reachability | musehub_gc.py | while-queue → walk_dag set comprehension |
| _is_fast_forward | musehub_sync.py | while-frontier → walk_dag early-return loop |
| _is_ancestor_in_bundle | musehub_wire.py | while-frontier → walk_dag with adjacency closure |
| find_common_ancestor | musehub_divergence.py | while-frontier → walk_dag set construction |

Tests: 12 tests added (tests/unit/test_phase2_inMemory_walk_dag.py): 7 structural (assert walk_dag present, no inline deque) + 5 behavioural. All 12 green.

Phase 3 (DB-query BFS → walk_dag_async) is next.

gabriel 42 days ago

Phase 3 complete ✓

Commit: sha256:3f47c35a696c

Migrated all 3 Category B (async DB-query) inline walk sites to walk_dag_async:

| Site | File | Pattern replaced |
| --- | --- | --- |
| _walk_new_commits (both branches) | musehub_symbol_indexer.py | Two inline while-stack DFS → walk_dag_async with async adjacency closures |
| _resolve_ancestor_manifest (walk 1 + walk 2) | musehub_proposals.py | while-frontier + while-fid → two walk_dag_async calls (max_nodes=200) |
| _is_ancestor_db | musehub_wire.py | Batched while-frontier BFS → walk_dag_async + scalar adjacency (max_nodes=max_hops) |

Tests: 9 tests added (tests/unit/test_phase3_db_walk_dag_async.py): 4 structural + 5 behavioural. All 28 unit tests across phases 1–3 green.

Documented exceptions kept intact (Category C — batch-per-frontier, cannot delegate to per-node walker):

  • wire_fetch_stream (two BFS variants)
  • wire_negotiate

Phase 4 (dedup intel provider BFS body → shared _load_commit_walk helper) is next.

gabriel 42 days ago

Phase 4 complete ✓ — issue #40 fully done

Commit: sha256:14b7e0ec1180

Extracted the shared DB-fetch + BFS body from all three intel providers into a single helper:

async def _load_commit_walk(
    session: AsyncSession,
    repo_id: str,
    ref: str,
    max_walk: int,
) -> list[str]:

Each provider's compute now calls await _load_commit_walk(...) in place of the 15-line repeated fetch+walk block. Zero duplication remains.

Tests: 5 tests added (tests/unit/test_phase4_load_commit_walk.py): 3 structural (one per provider) + 2 behavioural (BFS order on diamond graph, max_walk cap). All 33 unit tests across all four phases green.

P2-1 through P2-3 structural tests updated to reflect the new architecture (walk now lives in the helper, not in compute directly).


Issue #40 complete

All phases delivered:

| Phase | Description | Tests | Commits |
| --- | --- | --- | --- |
| 1 | walk_dag_async utility | W1–W7 (7) | sha256:2a9a09d2bf95 |
| 2 | In-memory BFS → walk_dag (7 sites) | P2-1–P2-12 (12) | sha256:a053ab7ec888 |
| 3 | DB-query BFS → walk_dag_async (3 sites) | P3-1–P3-9 (9) | sha256:3f47c35a696c |
| 4 | Dedup intel provider BFS → _load_commit_walk | P4-1–P4-3 (5) | sha256:14b7e0ec1180 |

Total: 33 unit tests, zero inline commit-graph BFS/DFS loops remaining in Category A or B files.