Generic DAG walker: consolidate inline commit-graph BFS across MuseHub services
Background
As part of muse-core issue #6 (generic DAG walker), every standalone `deque + seen: set + read_commit` BFS pattern in the Muse engine was replaced with `walk_dag` / `iter_ancestors` / `ancestor_ids` from `muse.core.graph`.
The same audit applied to MuseHub reveals 12 inline commit-graph BFS/DFS loops across 7 service files. Each independently re-implements the same pattern — a frontier (deque or list), a visited set, a parent-lookup step, and a while loop. None share any utility.
The sites to migrate fall into two categories:
In-memory BFS — the parent map is pre-loaded from the DB into a dict before the loop. The loop itself is pure Python. These can use `walk_dag` from `muse.core.graph` directly (synchronous, adjacency closure over the dict).
DB-query BFS — each step reads commits from the DB (SQLAlchemy `AsyncSession`). These need a new `walk_dag_async` utility local to MuseHub that accepts an async adjacency function.
A third category — batched frontier walks (`wire_fetch_stream`, `wire_negotiate`) — issues one DB query per BFS level to fetch all frontier commits at once. These cannot be naively delegated to a per-node async walker without losing the batching optimization. They are documented exceptions.
Inventory of inline walk sites
Category A — In-memory BFS (use `muse.core.graph.walk_dag`)
`musehub/services/musehub_intel_providers.py`
Three providers share an identical BFS body (copy-pasted). Each pre-loads a `commit_parents: dict[str, list[str]]` from the DB, then runs:
    walk_order: list[str] = []
    visited: set[str] = set()
    queue: deque[str] = deque([ref])
    while queue and len(walk_order) < self._MAX_WALK:
        cid = queue.popleft()
        if cid in visited or cid not in commit_parents:
            continue
        visited.add(cid)
        walk_order.append(cid)
        for parent in commit_parents[cid]:
            if parent and parent not in visited:
                queue.append(parent)
| Provider | Approx line |
|---|---|
| `_FileCouplingProvider.compute()` (BFS inside `CouplingProvider`) | 511 |
| `_EntangleProvider.compute()` | 642 |
| `_VelocityProvider.compute()` | 1134 |
`musehub/services/musehub_gc.py` — `run_gc()` (line ~124)
Pre-loads `all_commits: dict[str, list[str]]` then:
    reachable: set[str] = set()
    queue = list(heads)
    while queue:
        cid = queue.pop()
        if cid in reachable or cid not in all_commits:
            continue
        reachable.add(cid)
        queue.extend(all_commits[cid])
`musehub/services/musehub_sync.py` — `_is_fast_forward()` (line ~99)
Builds `parent_map` from the push bundle then:
    visited: set[str] = set()
    frontier = [head_commit_id]
    while frontier:
        current = frontier.pop()
        if current in visited:
            continue
        visited.add(current)
        if current == remote_head:
            return True
        for parent in parent_map.get(current, []):
            if parent not in visited:
                frontier.append(parent)
`musehub/services/musehub_wire.py` — `_is_ancestor_in_bundle()` (line ~941)
Builds `commit_by_id` from bundle list then:
    visited: set[str] = set()
    frontier: list[str] = [c.commit_id for c in commits]
    while frontier:
        cid = frontier.pop()
        if cid in visited:
            continue
        visited.add(cid)
        if cid == head_id:
            return True
        row = commit_by_id.get(cid)
        if row is None:
            continue
        for pid in filter(None, [row.parent_commit_id, row.parent2_commit_id]):
            if pid not in visited:
                frontier.append(pid)
`musehub/services/musehub_divergence.py` — `find_common_ancestor()` (line ~324)
Builds `all_commits` dict from two commit lists then:
    a_ancestry: set[str] = set()
    frontier = list(a_ids)
    while frontier:
        cid = frontier.pop()
        if cid in a_ancestry:
            continue
        a_ancestry.add(cid)
        commit = all_commits.get(cid)
        if commit:
            frontier.extend(commit.parent_ids)
    for commit in sorted(b_commits, key=lambda c: c.timestamp, reverse=True):
        if commit.commit_id in a_ancestry:
            return commit.commit_id
Category B — DB-query BFS (need `walk_dag_async`)
`musehub/services/musehub_symbol_indexer.py` — `_walk_new_commits()` (line ~274)
Two DFS variants. When `stop_at` is given (the incremental case):
    new_commits: list[db.MusehubCommit] = []
    visited: set[str] = {stop_at}
    stack = [new_head]
    while stack:
        cid = stack.pop()
        if cid in visited:
            continue
        visited.add(cid)
        row = (await session.execute(
            select(db.MusehubCommit).where(db.MusehubCommit.commit_id == cid)
        )).scalar_one_or_none()
        if row is None or row.repo_id != repo_id:
            continue
        new_commits.append(row)
        stack.extend(row.parent_ids or [])
`musehub/services/musehub_proposals.py` — `_resolve_ancestor_manifest()` (line ~569)
Two sequential walks (BFS to collect reachable commits, then first-parent walk):
    # Walk 1 — collect to_branch ancestry
    to_commit_ids: set[str] = set()
    frontier = [to_b.head_commit_id] if to_b.head_commit_id else []
    depth = 0
    while frontier and depth < 200:
        nxt: list[str] = []
        for cid in frontier:
            if cid in to_commit_ids:
                continue
            to_commit_ids.add(cid)
            commit = await session.get(db.MusehubCommit, cid)
            if commit and commit.parent_ids:
                nxt.extend(commit.parent_ids)
        frontier = nxt
        depth += 1
    # Walk 2 — first-parent walk on from_branch
    fid = from_b.head_commit_id
    depth = 0
    while fid and depth < 200:
        commit = await session.get(db.MusehubCommit, fid)
        ...
        fid = (commit.parent_ids or [None])[0]
        depth += 1
`musehub/services/musehub_wire.py` — `_is_ancestor_db()` (line ~986)
Batched BFS with `max_hops` cap:
    frontier: list[str] = [descendant_id]
    visited: set[str] = set()
    hops = 0
    while frontier and hops < max_hops:
        batch = list(set(frontier) - visited)
        if not batch:
            break
        visited.update(batch)
        frontier = []
        hops += len(batch)
        rows = (await session.execute(
            select(db.MusehubCommit.commit_id, db.MusehubCommit.parent_ids)
            .where(db.MusehubCommit.commit_id.in_(batch), ...)
        )).all()
        for row in rows:
            for pid in (row.parent_ids or []):
                if pid == ancestor_id:
                    return True
                if pid not in visited:
                    frontier.append(pid)
Category C — Batched frontier walks (documented exceptions)
`musehub/services/musehub_wire.py` — `wire_fetch_stream()` (lines ~562, ~588)
Two BFS variants (unbounded and depth-limited). Both issue one batch DB query per BFS level (`commit_id.in_(batch)`), fetching the entire frontier in a single round trip. A per-node async walker would degrade this to N queries per level. Keep as-is.
`musehub/services/musehub_wire.py` — `wire_negotiate()` (line ~854)
Same batch-per-level pattern — one query per BFS frontier against `db.MusehubCommit`. Keep as-is.
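To make the Category C trade-off concrete, here is a rough sketch that counts round trips for a batch-per-level walk versus a per-node walk on a small diamond graph. `FakeDB`, `fetch_many`, and the `COMMITS` dict are hypothetical stand-ins for the real `AsyncSession` queries, not MuseHub code.

```python
import asyncio

# Hypothetical stand-in for the commits table: commit_id -> parent_ids.
COMMITS = {
    "d": ["b", "c"],
    "b": ["a"],
    "c": ["a"],
    "a": [],
}


class FakeDB:
    """Counts round trips so the two strategies can be compared."""

    def __init__(self):
        self.queries = 0

    async def fetch_many(self, ids):
        # One round trip for a whole batch (like `commit_id.in_(batch)`).
        self.queries += 1
        return {cid: COMMITS[cid] for cid in ids if cid in COMMITS}


async def walk_batched(db, head):
    """Level-by-level BFS: one query per frontier."""
    visited = set()
    frontier = [head]
    while frontier:
        # Deduplicate the frontier while keeping order, then drop visited IDs.
        batch = [cid for cid in dict.fromkeys(frontier) if cid not in visited]
        if not batch:
            break
        visited.update(batch)
        rows = await db.fetch_many(batch)
        frontier = [p for parents in rows.values() for p in parents]
    return visited


async def walk_per_node(db, head):
    """Per-node BFS: one query per visited commit."""
    visited = set()
    queue = [head]
    while queue:
        cid = queue.pop(0)
        if cid in visited:
            continue
        visited.add(cid)
        rows = await db.fetch_many([cid])
        queue.extend(rows.get(cid, []))
    return visited


async def compare():
    batched_db, per_node_db = FakeDB(), FakeDB()
    batched = await walk_batched(batched_db, "d")
    per_node = await walk_per_node(per_node_db, "d")
    return batched, per_node, batched_db.queries, per_node_db.queries
```

Both walks reach the same four commits, but the batched walk issues one query per level while the per-node walk issues one per commit. The gap widens on wide histories, which is the reason these sites stay as they are.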
Proposed utility: `walk_dag_async`
For Category B, add a MuseHub-local async counterpart to `muse.core.graph.walk_dag`:
    # musehub/graph/walk.py
    from collections.abc import AsyncIterator, Callable, Coroutine
    from typing import Literal, TypeVar  # Literal is needed for the `order` annotation
    T = TypeVar("T")
    async def walk_dag_async(
        starts: "T | list[T]",
        adjacency: "Callable[[T], Coroutine[None, None, list[T]]]",
        *,
        order: "Literal['bfs', 'dfs']" = "bfs",
        exclude: "set[T] | None" = None,
        max_nodes: "int | None" = None,
    ) -> "AsyncIterator[T]":
        """Async BFS/DFS over any DAG with an async adjacency function.
        Mirrors muse.core.graph.walk_dag but supports async adjacency
        (e.g. SQLAlchemy AsyncSession queries).
        """
        ...
Category A uses `muse.core.graph.walk_dag` directly (sync, in-memory dict adjacency).
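One way the `walk_dag_async` signature above could be filled in is sketched below. It is named `walk_dag_async_sketch` to make clear it is an illustration, not the shipped implementation. Two choices here are assumptions: a bare `str` start is treated as a single node ID, and excluded nodes are simply pre-marked as seen. `PARENTS`, `parents_of`, and `collect` are hypothetical helpers for the usage example.

```python
import asyncio
from collections import deque
from collections.abc import AsyncIterator, Awaitable, Callable, Hashable, Iterable
from typing import Literal, TypeVar

T = TypeVar("T", bound=Hashable)


async def walk_dag_async_sketch(
    starts: "T | Iterable[T]",
    adjacency: Callable[[T], Awaitable[list[T]]],
    *,
    order: Literal["bfs", "dfs"] = "bfs",
    exclude: "set[T] | None" = None,
    max_nodes: "int | None" = None,
) -> AsyncIterator[T]:
    """Yield each reachable node once, awaiting `adjacency` per visited node."""
    # Assumption: a bare str/bytes is one node ID, not an iterable of characters.
    if isinstance(starts, (str, bytes)) or not isinstance(starts, Iterable):
        roots = [starts]
    else:
        roots = list(starts)

    # Excluded nodes are pre-marked as seen, so they are never yielded or
    # expanded; their ancestors stay unvisited unless reachable another way.
    seen: set[T] = set(exclude or ())
    frontier: deque[T] = deque(roots)
    yielded = 0
    while frontier and (max_nodes is None or yielded < max_nodes):
        # BFS takes from the left (FIFO); DFS takes from the right (LIFO).
        node = frontier.popleft() if order == "bfs" else frontier.pop()
        if node in seen:
            continue
        seen.add(node)
        yield node
        yielded += 1
        frontier.extend(await adjacency(node))


# Usage on a diamond graph: d -> (b, c) -> a
PARENTS = {"d": ["b", "c"], "b": ["a"], "c": ["a"], "a": []}


async def parents_of(cid: str) -> list[str]:
    return PARENTS.get(cid, [])


async def collect(**kwargs) -> list[str]:
    return [cid async for cid in walk_dag_async_sketch("d", parents_of, **kwargs)]
```

Because the `seen` check happens when a node is popped rather than when it is queued, a node can sit in the frontier more than once but is yielded only once. That same check is what keeps the walk finite on cyclic input.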
Implementation plan
All phases follow TDD: write failing tests → implement → verify green → commit.
Phase 1 — `walk_dag_async` utility + tests
File: `musehub/graph/walk.py`
Tests to write first (`tests/test_walk_dag_async.py`):
- W1 Structural — function exists and is an async generator
- W2 Behavioural — BFS order: linear chain A→B→C yields A, B, C
- W3 Behavioural — `exclude` set prevents visiting excluded nodes and their subtrees
- W4 Behavioural — `max_nodes` cap stops the walk after N nodes
- W5 Behavioural — DFS order: same chain yields depth-first
- W6 Behavioural — multi-root start seeds from all roots simultaneously
- W7 Behavioural — cycles handled (each node yielded at most once)
Phase 2 — In-memory BFS migration (`walk_dag` from muse)
Migrate all Category A sites. Each migration follows the pattern:
    # Before
    walk_order: list[str] = []
    visited: set[str] = set()
    queue: deque[str] = deque([ref])
    while queue and len(walk_order) < self._MAX_WALK:
        ...
    # After
    from muse.core.graph import walk_dag
    walk_order = list(walk_dag(
        ref,
        lambda cid: [p for p in commit_parents.get(cid, []) if p],
        max_nodes=self._MAX_WALK,
    ))
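One detail worth checking during the migration: the inline loop skips any ID that is missing from `commit_parents` (for example, a parent outside the loaded window), whereas an adjacency built on `commit_parents.get(cid, [])` alone would hand such IDs to the walker. The sketch below compares the two shapes. `walk_dag_standin` is a hypothetical stand-in that only mirrors the call shape shown above; the real `muse.core.graph.walk_dag` may behave differently.

```python
from collections import deque
from collections.abc import Callable, Iterator


def walk_dag_standin(
    start: str,
    adjacency: Callable[[str], list[str]],
    *,
    max_nodes: "int | None" = None,
) -> Iterator[str]:
    """Hypothetical stand-in for muse.core.graph.walk_dag (BFS, yield-once)."""
    seen: set[str] = set()
    queue: deque[str] = deque([start])
    yielded = 0
    while queue and (max_nodes is None or yielded < max_nodes):
        cid = queue.popleft()
        if cid in seen:
            continue
        seen.add(cid)
        yield cid
        yielded += 1
        queue.extend(adjacency(cid))


def inline_walk(commit_parents: dict[str, list[str]], ref: str, max_walk: int) -> list[str]:
    """The pre-migration provider loop, reproduced for comparison."""
    walk_order: list[str] = []
    visited: set[str] = set()
    queue: deque[str] = deque([ref])
    while queue and len(walk_order) < max_walk:
        cid = queue.popleft()
        if cid in visited or cid not in commit_parents:
            continue
        visited.add(cid)
        walk_order.append(cid)
        for parent in commit_parents[cid]:
            if parent and parent not in visited:
                queue.append(parent)
    return walk_order


def migrated_walk(commit_parents: dict[str, list[str]], ref: str, max_walk: int) -> list[str]:
    """Walker-based version that keeps the 'skip unknown commits' guard."""
    def parents(cid: str) -> list[str]:
        # Filter unknown IDs here so they are never yielded by the walker.
        return [p for p in commit_parents.get(cid, []) if p and p in commit_parents]

    if ref not in commit_parents:
        return []
    return list(walk_dag_standin(ref, parents, max_nodes=max_walk))
```

With `commit_parents = {"c3": ["c2", ""], "c2": ["c1"], "c1": ["c0"]}`, where `c0` is deliberately absent, both functions return `["c3", "c2", "c1"]`. Dropping the `p in commit_parents` filter would add `c0` to the walker-based result.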
Tests to write first:
| ID | Target | Type | Assertion |
|---|---|---|---|
| P2-1 | `CouplingProvider.compute` BFS | Structural | source contains `walk_dag`; no inline `deque` |
| P2-2 | `EntangleProvider.compute` BFS | Structural | same |
| P2-3 | `VelocityProvider.compute` BFS | Structural | same |
| P2-4 | `run_gc` reachability | Structural | source contains `walk_dag`; no inline `while queue` |
| P2-5 | `_is_fast_forward` | Structural | source contains `walk_dag`; no inline `while frontier` |
| P2-6 | `_is_ancestor_in_bundle` | Structural | source contains `walk_dag`; no inline `while frontier` |
| P2-7 | `find_common_ancestor` | Structural | source contains `walk_dag`; no inline `while frontier` |
| P2-8 | Provider coupling BFS | Behavioural | walk_order is correct for a 3-commit chain |
| P2-9 | `run_gc` reachability | Behavioural | reachable set matches expected for a forked graph |
| P2-10 | `_is_fast_forward` | Behavioural | returns True when remote head is ancestor; False otherwise |
| P2-11 | `_is_ancestor_in_bundle` | Behavioural | returns True/False for reachable/unreachable head |
| P2-12 | `find_common_ancestor` | Behavioural | returns correct LCA for a diamond graph |
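The structural assertions in the table ("source contains `walk_dag`; no inline `deque`") could be checked on the syntax tree rather than by substring match, which avoids false hits from comments or docstrings. The sketch below runs on two illustrative source strings. In a real test the source would more likely come from `inspect.getsource` on the target function, and the helper names here are hypothetical.

```python
import ast

MIGRATED_SOURCE = '''
def compute(self, ref, commit_parents):
    return list(walk_dag(
        ref,
        lambda cid: [p for p in commit_parents.get(cid, []) if p],
        max_nodes=self._MAX_WALK,
    ))
'''

LEGACY_SOURCE = '''
def compute(self, ref, commit_parents):
    walk_order = []
    visited = set()
    queue = deque([ref])
    while queue and len(walk_order) < self._MAX_WALK:
        cid = queue.popleft()
        if cid in visited:
            continue
        visited.add(cid)
        walk_order.append(cid)
        queue.extend(commit_parents.get(cid, []))
    return walk_order
'''


def called_names(tree: ast.AST) -> set[str]:
    """Names of plain-function calls, e.g. `walk_dag(...)` or `deque(...)`."""
    return {
        node.func.id
        for node in ast.walk(tree)
        if isinstance(node, ast.Call) and isinstance(node.func, ast.Name)
    }


def uses_walker_only(source: str) -> bool:
    """True if the source calls walk_dag and has no deque call or while loop."""
    tree = ast.parse(source)
    names = called_names(tree)
    has_while = any(isinstance(node, ast.While) for node in ast.walk(tree))
    return "walk_dag" in names and "deque" not in names and not has_while
```

Rejecting every `while` loop is stricter than the table requires and would need relaxing for functions that legitimately loop for other reasons.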
Phase 3 — DB-query BFS migration (`walk_dag_async`)
Migrate all Category B sites. Each migration replaces the inline loop with `walk_dag_async` and an async adjacency closure over `session`.
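As a rough illustration of "an async adjacency closure over `session`", the sketch below expresses both walks from `_resolve_ancestor_manifest` through one walker by changing only the closure: all parents for the ancestry set, the first parent only for the linear chain. `FakeSession` and the compact `walk` generator are hypothetical stand-ins for `AsyncSession` and `walk_dag_async`.

```python
import asyncio
from collections import deque


class FakeSession:
    """Hypothetical stand-in for AsyncSession: `get` returns parent IDs or None."""

    def __init__(self, commits: dict[str, list[str]]):
        self._commits = commits

    async def get(self, commit_id: str) -> "list[str] | None":
        return self._commits.get(commit_id)


async def walk(start, adjacency, *, max_nodes=None):
    """Compact BFS stand-in for walk_dag_async (yield-once, optional cap)."""
    seen, queue, count = set(), deque([start]), 0
    while queue and (max_nodes is None or count < max_nodes):
        node = queue.popleft()
        if node in seen:
            continue
        seen.add(node)
        yield node
        count += 1
        queue.extend(await adjacency(node))


async def ancestry(session: FakeSession, head: str) -> set[str]:
    """Walk 1: every commit reachable from head."""
    async def all_parents(cid: str) -> list[str]:
        return await session.get(cid) or []

    return {cid async for cid in walk(head, all_parents, max_nodes=200)}


async def first_parent_chain(session: FakeSession, head: str) -> list[str]:
    """Walk 2: follow only the first parent of each commit."""
    async def first_parent(cid: str) -> list[str]:
        parents = await session.get(cid) or []
        return parents[:1]

    return [cid async for cid in walk(head, first_parent, max_nodes=200)]


# Merge commit m has parents f2 (first) and t1; both lines meet at base.
SESSION = FakeSession(
    {"m": ["f2", "t1"], "f2": ["f1"], "f1": ["base"], "t1": ["base"], "base": []}
)
```

Note that the original loops cap on `depth < 200`, which counts BFS levels in walk 1, whereas a `max_nodes=200` cap counts commits. The two limits coincide on linear history but not on wide merge graphs, so the behavioural tests may want a case that distinguishes them.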
Tests to write first:
| ID | Target | Type | Assertion |
|---|---|---|---|
| P3-1 | `_walk_new_commits` | Structural | source contains `walk_dag_async`; no inline `while stack` |
| P3-2 | `_resolve_ancestor_manifest` walk 1 | Structural | source contains `walk_dag_async`; no inline `while frontier` |
| P3-3 | `_resolve_ancestor_manifest` walk 2 | Structural | source contains `walk_dag_async` with `first_parent_only`; no inline `while fid` |
| P3-4 | `_is_ancestor_db` | Structural | source contains `walk_dag_async`; no inline `while frontier` |
| P3-5 | `_walk_new_commits` | Behavioural | returns commits from new_head down to stop_at (exclusive) |
| P3-6 | `_walk_new_commits` no stop_at | Behavioural | returns all commits from new_head |
| P3-7 | `_resolve_ancestor_manifest` | Behavioural | returns snapshot_id at LCA of two branches |
| P3-8 | `_is_ancestor_db` | Behavioural | returns True when ancestor is reachable; False otherwise |
| P3-9 | `_is_ancestor_db` | Behavioural | respects `max_hops` cap |
Phase 4 — Dedup intel provider BFS
The three providers (Coupling, Entangle, Velocity) share an identical 10-line BFS body and an identical pre-load query. Extract both into a shared helper:
    async def _load_commit_walk(
        session: AsyncSession,
        repo_id: str,
        ref: str,
        max_walk: int,
    ) -> list[str]:
        """Return commit IDs in BFS order from ref, up to max_walk."""
        ...
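A minimal sketch of how such a helper might be structured: one awaited pre-load, then a purely in-memory walk. The pre-load query is not shown in this issue, so it is abstracted behind a hypothetical `load_parent_map` callable. `bfs_order` stands in for the `walk_dag` call, and `load_commit_walk_sketch` is not the real `_load_commit_walk`.

```python
import asyncio
from collections import deque
from collections.abc import Awaitable, Callable

ParentMap = dict[str, list[str]]


def bfs_order(parent_map: ParentMap, ref: str, max_walk: int) -> list[str]:
    """Stand-in for the walk_dag call; skips IDs missing from parent_map."""
    order: list[str] = []
    seen: set[str] = set()
    queue: deque[str] = deque([ref])
    while queue and len(order) < max_walk:
        cid = queue.popleft()
        if cid in seen or cid not in parent_map:
            continue
        seen.add(cid)
        order.append(cid)
        queue.extend(p for p in parent_map[cid] if p)
    return order


async def load_commit_walk_sketch(
    load_parent_map: Callable[[str], Awaitable[ParentMap]],
    repo_id: str,
    ref: str,
    max_walk: int,
) -> list[str]:
    """Await the pre-load once, then walk the resulting dict synchronously."""
    parent_map = await load_parent_map(repo_id)
    return bfs_order(parent_map, ref, max_walk)


async def fake_load(repo_id: str) -> ParentMap:
    # Hypothetical pre-load result for a diamond: d -> (b, c) -> a
    return {"d": ["b", "c"], "b": ["a"], "c": ["a"], "a": []}
```

Injecting the loader keeps the behavioural tests (P4-2, P4-3) free of a live DB connection; the real helper takes an `AsyncSession` directly, as in the signature above.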
Tests to write first:
| ID | Type | Assertion |
|---|---|---|
| P4-1 | Structural | each provider delegates to `_load_commit_walk`; no inline BFS body |
| P4-2 | Behavioural | `_load_commit_walk` returns correct walk order for a 4-commit chain |
| P4-3 | Behavioural | `_load_commit_walk` respects `max_walk` cap |
Success criteria
- Zero standalone `deque + visited: set + while` commit-graph BFS patterns in Category A files
- Zero standalone `visited: set + while stack/frontier + await session` commit-graph BFS patterns in Category B files
- `walk_dag_async` has ≥ 7 unit tests (W1–W7), all green
- All migrated functions have at least one structural test (assert uses the new utility) and one behavioural test (assert correct output)
- All existing tests pass at each phase boundary
Documented exceptions (do not migrate)
- `wire_fetch_stream` (two BFS variants) — batched frontier queries; per-node async walker would break batching
- `wire_negotiate` — same batch-per-level pattern
- `_topological_sort` in `musehub_wire.py` — Kahn's algorithm, specific semantics
- `build_dag` in `musehub_repository.py` — Kahn's topological sort
- `topological_sort` in `proposal_dag.py` — proposal dependency DAG, not commit graph
- `RootDistanceIndex.build` in `graph/depth.py` — identity hierarchy with distance tracking
- `CycleDetector.assert_no_cycle` in `graph/cycle.py` — identity DAG cycle detection
Phase 2 complete ✓
Commit: sha256:a053ab7ec888
Migrated all 7 in-memory BFS/DFS sites to muse.core.graph.walk_dag:
| Site | File | Pattern replaced |
|---|---|---|
| `CouplingProvider.compute` | `musehub_intel_providers.py` | deque loop → `walk_dag` list comprehension |
| `EntangleProvider.compute` | `musehub_intel_providers.py` | deque loop → `walk_dag` list comprehension |
| `VelocityProvider.compute` | `musehub_intel_providers.py` | deque loop → `walk_dag` list comprehension |
| `run_gc` reachability | `musehub_gc.py` | while-queue → `walk_dag` set comprehension |
| `_is_fast_forward` | `musehub_sync.py` | while-frontier → `walk_dag` early-return loop |
| `_is_ancestor_in_bundle` | `musehub_wire.py` | while-frontier → `walk_dag` with adjacency closure |
| `find_common_ancestor` | `musehub_divergence.py` | while-frontier → `walk_dag` set construction |
Tests: 12 tests added (tests/unit/test_phase2_inMemory_walk_dag.py): 7 structural (assert walk_dag present, no inline deque) + 5 behavioural. All 12 green.
Phase 3 (DB-query BFS → walk_dag_async) is next.
Phase 3 complete ✓
Commit: sha256:3f47c35a696c
Migrated all 3 Category B (async DB-query) inline walk sites to walk_dag_async:
| Site | File | Pattern replaced |
|---|---|---|
| `_walk_new_commits` (both branches) | `musehub_symbol_indexer.py` | Two inline while-stack DFS → `walk_dag_async` with async adjacency closures |
| `_resolve_ancestor_manifest` (walk 1 + walk 2) | `musehub_proposals.py` | while-frontier + while-fid → two `walk_dag_async` calls (`max_nodes=200`) |
| `_is_ancestor_db` | `musehub_wire.py` | Batched while-frontier BFS → `walk_dag_async` + scalar adjacency (`max_nodes=max_hops`) |
Tests: 9 tests added (tests/unit/test_phase3_db_walk_dag_async.py): 4 structural + 5 behavioural. All 28 unit tests across phases 1–3 green.
Documented exceptions kept intact (Category C: batch-per-frontier, cannot delegate to a per-node walker):
- `wire_fetch_stream` (two BFS variants)
- `wire_negotiate`
Phase 4 (dedup intel provider BFS body → shared _load_commit_walk helper) is next.
Phase 4 complete ✓ — issue #40 fully done
Commit: sha256:14b7e0ec1180
Extracted the shared DB-fetch + BFS body from all three intel providers into a single helper:
    async def _load_commit_walk(
        session: AsyncSession,
        repo_id: str,
        ref: str,
        max_walk: int,
    ) -> list[str]:
Each provider's `compute` now calls `await _load_commit_walk(...)` in place of the 15-line repeated fetch+walk block. Zero duplication remains.
Tests: 5 tests added (tests/unit/test_phase4_load_commit_walk.py): 3 structural (one per provider) + 2 behavioural (BFS order on diamond graph, max_walk cap). All 33 unit tests across all four phases green.
P2-1 through P2-3 structural tests updated to reflect the new architecture (walk now lives in the helper, not in compute directly).
Issue #40 complete
All phases delivered:
| Phase | Description | Tests | Commits |
|---|---|---|---|
| 1 | `walk_dag_async` utility | W1–W7 (7) | sha256:2a9a09d2bf95 |
| 2 | In-memory BFS → `walk_dag` (7 sites) | P2-1–P2-12 (12) | sha256:a053ab7ec888 |
| 3 | DB-query BFS → `walk_dag_async` (3 sites) | P3-1–P3-9 (9) | sha256:3f47c35a696c |
| 4 | Dedup intel provider BFS → `_load_commit_walk` | P4-1–P4-3 (5) | sha256:14b7e0ec1180 |
Total: 33 unit tests, zero inline commit-graph BFS/DFS loops remaining in Category A or B files.
Phase 1 done: `walk_dag_async` shipped
Commit: sha256:2a9a09d2bf95
What was built
New file: `musehub/graph/walk.py`, an async BFS/DFS DAG walker.
Test coverage (all green)
- `walk_dag_async` is an async generator (`__aiter__`, `__anext__`)
- `exclude` set prevents visiting excluded nodes and their subtrees
- `max_nodes` cap stops the walk after exactly N nodes
Infrastructure
Added `tests/unit/` directory with its own `conftest.py` that overrides the session-scoped `_db_schema` fixture, so unit tests run without a live DB connection.
Phase 2 (in-memory BFS migration via `muse.core.graph.walk_dag`) is next.