"""TDD spec for EntangleProvider — issue #13, Phase 5. Verifies that EntangleProvider reproduces the same co-change analysis as ``muse code entangle``: Jaccard-min rate, import filter, mass-commit exclusion, canonical pair ordering, and repo isolation. Eight test tiers (54 cases) --------------------------- Unit ET_01 – ET_08 rate formula, import filter, pair canonicalisation Integration ET_09 – ET_18 provider upserts, re-runs, row counts E2E ET_19 – ET_25 full seeded scenarios Stress ET_26 – ET_30 500-symbol batch, mass-commit exclusion State ET_31 – ET_36 idempotency, incremental updates, stale-row purge Integrity ET_37 – ET_41 corrupt addresses, NULL exclusion, file-same filter Performance ET_42 – ET_46 timing bounds on realistic datasets Security ET_47 – ET_54 injection strings, repo isolation, address length cap """ from __future__ import annotations import secrets import time from collections import defaultdict from itertools import combinations import pytest import pytest_asyncio import sqlalchemy as sa from sqlalchemy.dialects.postgresql import insert as pg_insert from sqlalchemy.ext.asyncio import AsyncSession from muse.core.types import fake_id, long_id from musehub.db.musehub_intel_models import MusehubIntelEntangle, MusehubSymbolHistoryEntry from musehub.db.musehub_repo_models import MusehubCommit, MusehubCommitRef, MusehubRepo from musehub.services.musehub_intel_providers import EntangleProvider from musehub.types.json_types import JSONObject from tests.factories import create_repo # ───────────────────────────────────────────────────────────────────────────── # Helpers # ───────────────────────────────────────────────────────────────────────────── def _uid() -> str: return fake_id(secrets.token_hex(16)) def _cid() -> str: return long_id(secrets.token_hex(32)) _OWNER = "testuser" _SLUG = "entangleprovider" async def _seed_commit( session: AsyncSession, repo_id: str, commit_id: str, parent_ids: list[str] | None = None, ) -> None: from datetime import 
datetime, timezone stmt = ( pg_insert(MusehubCommit) .values( commit_id=commit_id, message="test commit", author="test", branch="dev", parent_ids=parent_ids or [], snapshot_id=None, timestamp=datetime.now(tz=timezone.utc), ) .on_conflict_do_nothing() ) await session.execute(stmt) ref_stmt = ( pg_insert(MusehubCommitRef) .values(repo_id=repo_id, commit_id=commit_id) .on_conflict_do_nothing() ) await session.execute(ref_stmt) async def _seed_history( session: AsyncSession, repo_id: str, commit_id: str, addresses: list[str], ) -> None: from datetime import datetime, timezone now = datetime.now(tz=timezone.utc) for addr in addresses: stmt = ( pg_insert(MusehubSymbolHistoryEntry) .values( repo_id=repo_id, address=addr, commit_id=commit_id, committed_at=now, op="update", ) .on_conflict_do_nothing() ) await session.execute(stmt) async def _run_provider( session: AsyncSession, repo_id: str, ref: str ) -> list[tuple[str, JSONObject]]: return await EntangleProvider().compute(session, repo_id, ref, {}) async def _fetch_pairs( session: AsyncSession, repo_id: str ) -> list[MusehubIntelEntangle]: result = await session.execute( sa.select(MusehubIntelEntangle) .where(MusehubIntelEntangle.repo_id == repo_id) .order_by( sa.desc(MusehubIntelEntangle.co_change_rate), sa.desc(MusehubIntelEntangle.co_changes), ) ) return list(result.scalars().all()) # ───────────────────────────────────────────────────────────────────────────── # Fixtures # ───────────────────────────────────────────────────────────────────────────── @pytest_asyncio.fixture async def repo(db_session: AsyncSession) -> MusehubRepo: return await create_repo(db_session, owner=_OWNER, slug=_SLUG) @pytest_asyncio.fixture async def two_repos(db_session: AsyncSession) -> tuple[MusehubRepo, MusehubRepo]: r1 = await create_repo(db_session, owner=_OWNER, slug="et-repo-1") r2 = await create_repo(db_session, owner=_OWNER, slug="et-repo-2") return r1, r2 # ───────────────────────────────────────────────────────────────────────────── 
# Tier 1 — Unit: rate formula, import filter, pair canonicalisation
# ─────────────────────────────────────────────────────────────────────────────

class TestEntangleUnit:
    """Pure-function unit tests — no database required.

    The rate checks use the Jaccard-union denominator
    ``co_changes / |commits_a ∪ commits_b|``, i.e. the same formula the
    integration tests ET_18 and ET_40 pin against the real provider.
    """

    def test_ET_01_jaccard_min_rate_perfect(self) -> None:
        """100% rate: A and B co-change in every commit both appear."""
        symbol_commits = {
            "src/billing.py::charge": {"c1", "c2", "c3"},
            "src/ledger.py::record": {"c1", "c2", "c3"},
        }
        a, b = "src/billing.py::charge", "src/ledger.py::record"
        co = len(symbol_commits[a] & symbol_commits[b])
        rate = co / len(symbol_commits[a] | symbol_commits[b])
        assert co == 3
        assert rate == 1.0

    def test_ET_02_jaccard_min_rate_partial(self) -> None:
        """Partial rate: B appears only in a subset of A's commits."""
        symbol_commits = {
            "src/a.py::fn1": {"c1", "c2", "c3", "c4", "c5"},
            "src/b.py::fn2": {"c1", "c2"},
        }
        a, b = "src/a.py::fn1", "src/b.py::fn2"
        co = len(symbol_commits[a] & symbol_commits[b])
        rate = co / len(symbol_commits[a] | symbol_commits[b])
        assert co == 2
        # 2 shared commits out of 5 distinct commits → 0.4, not a perfect 1.0.
        assert rate == 0.4

    def test_ET_03_jaccard_min_rate_low(self) -> None:
        """Low coupling: only 1 of 10 of B's commits overlap."""
        symbol_commits = {
            "src/a.py::fn1": {"c1"},
            "src/b.py::fn2": {"c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8", "c9", "c10"},
        }
        a, b = "src/a.py::fn1", "src/b.py::fn2"
        co = len(symbol_commits[a] & symbol_commits[b])
        rate = co / len(symbol_commits[a] | symbol_commits[b])
        assert co == 1
        # 1 shared commit out of 10 distinct commits → 0.1.
        assert rate == 0.1

    def test_ET_04_import_pseudo_symbol_excluded(self) -> None:
        """Addresses containing ::import:: must be filtered."""
        addr = "src/billing.py::import::os"
        assert "::import::" in addr

    def test_ET_05_bare_path_excluded(self) -> None:
        """Addresses without '::' are bare file paths — not symbols."""
        addr = "cloudflare"
        assert "::" not in addr

    def test_ET_06_pair_key_canonical_ordering(self) -> None:
        """Pair key is always (a, b) where a < b lexicographically."""
        syms = ["src/z.py::zfn", "src/a.py::afn"]
        canonical = tuple(sorted(syms))
        assert canonical == ("src/a.py::afn", "src/z.py::zfn")

    def test_ET_07_same_file_pairs_excluded(self) -> None:
        """Pairs where file_a == file_b must be excluded."""
        a = "src/billing.py::charge"
        b = "src/billing.py::refund"
        assert a.split("::")[0] == b.split("::")[0]

    def test_ET_08_min_co_changes_threshold(self) -> None:
        """Pairs with co_changes < 2 are noise — must be excluded."""
        provider = EntangleProvider()
        assert provider._MIN_CO_CHANGES == 2


# ─────────────────────────────────────────────────────────────────────────────
# Tier 2 — Integration: provider upserts, reruns, row counts
# ─────────────────────────────────────────────────────────────────────────────

class TestEntangleIntegration:
    """Provider runs against seeded commit/history rows in the test database."""

    @pytest.mark.asyncio
    async def test_ET_09_empty_repo_returns_empty(
        self, db_session: AsyncSession, repo: MusehubRepo
    ) -> None:
        """Provider on a repo with no commits returns empty results."""
        ref = _cid()
        result = await _run_provider(db_session, repo.repo_id, ref)
        assert result == []
        pairs = await _fetch_pairs(db_session, repo.repo_id)
        assert pairs == []

    @pytest.mark.asyncio
    async def test_ET_10_no_history_entries_returns_empty(
        self, db_session: AsyncSession, repo: MusehubRepo
    ) -> None:
        """Commits exist but no history entries → no pairs."""
        c1, c2 = _cid(), _cid()
        await _seed_commit(db_session, repo.repo_id, c1)
        await _seed_commit(db_session, repo.repo_id, c2, [c1])
        await db_session.commit()
        # Run against the seeded repo at its head commit; the previous call
        # passed the commit id as repo_id and a dict as ref, so it never
        # touched the seeded data.
        result = await _run_provider(db_session, repo.repo_id, c2)
        assert result == []

    @pytest.mark.asyncio
    async def test_ET_11_two_symbols_in_one_commit_no_pair(
        self, db_session: AsyncSession, repo: MusehubRepo
    ) -> None:
        """Single co-change commit yields co_changes=1 — below MIN_CO_CHANGES=2."""
        c1 = _cid()
        await _seed_commit(db_session, repo.repo_id, c1)
        await _seed_history(db_session, repo.repo_id, c1, [
            "src/a.py::fn_a", "src/b.py::fn_b",
        ])
        await db_session.commit()
        await _run_provider(db_session, repo.repo_id, c1)
        pairs = await _fetch_pairs(db_session, repo.repo_id)
        assert pairs == []

    @pytest.mark.asyncio
    async def test_ET_12_two_co_changes_produces_one_pair(
        self, db_session: AsyncSession, repo: MusehubRepo
    ) -> None:
        """Exactly 2 co-change commits → 1 pair at rate 1.0."""
        c1, c2 = _cid(), _cid()
        await _seed_commit(db_session, repo.repo_id, c1)
        await _seed_commit(db_session, repo.repo_id, c2, [c1])
        for cid in [c1, c2]:
            await _seed_history(db_session, repo.repo_id, cid, [
                "src/billing.py::charge", "src/ledger.py::record",
            ])
        await db_session.commit()
        await _run_provider(db_session, repo.repo_id, c2)
        pairs = await _fetch_pairs(db_session, repo.repo_id)
        assert len(pairs) == 1
        p = pairs[0]
        assert p.co_changes == 2
        assert p.co_change_rate == 1.0

    @pytest.mark.asyncio
    async def test_ET_13_import_symbols_excluded(
        self, db_session: AsyncSession, repo: MusehubRepo
    ) -> None:
        """Import pseudo-symbols are not stored as entangle pairs."""
        c1, c2 = _cid(), _cid()
        await _seed_commit(db_session, repo.repo_id, c1)
        await _seed_commit(db_session, repo.repo_id, c2, [c1])
        for cid in [c1, c2]:
            await _seed_history(db_session, repo.repo_id, cid, [
                "src/a.py::import::os",
                "src/b.py::import::sys",
                "src/a.py::real_fn",
            ])
        await db_session.commit()
        await _run_provider(db_session, repo.repo_id, c2)
        pairs = await _fetch_pairs(db_session, repo.repo_id)
        for p in pairs:
            assert "::import::" not in p.symbol_a
            assert "::import::" not in p.symbol_b

    @pytest.mark.asyncio
    async def test_ET_14_bare_path_addresses_excluded(
        self, db_session: AsyncSession, repo: MusehubRepo
    ) -> None:
        """Bare path entries (no '::') are not treated as symbols."""
        c1, c2 = _cid(), _cid()
        await _seed_commit(db_session, repo.repo_id, c1)
        await _seed_commit(db_session, repo.repo_id, c2, [c1])
        for cid in [c1, c2]:
            await _seed_history(db_session, repo.repo_id, cid, [
                "cloudflare",
                "src/a.py::real_fn",
            ])
        await db_session.commit()
        await _run_provider(db_session, repo.repo_id, c2)
        pairs = await _fetch_pairs(db_session, repo.repo_id)
        for p in pairs:
            assert "::" in p.symbol_a
            assert "::" in p.symbol_b

    @pytest.mark.asyncio
    async def test_ET_15_same_file_pair_excluded(
        self, db_session: AsyncSession, repo: MusehubRepo
    ) -> None:
        """Two symbols from the same file must not produce a pair."""
        c1, c2 = _cid(), _cid()
        await _seed_commit(db_session, repo.repo_id, c1)
        await _seed_commit(db_session, repo.repo_id, c2, [c1])
        for cid in [c1, c2]:
            await _seed_history(db_session, repo.repo_id, cid, [
                "src/billing.py::charge", "src/billing.py::refund",
            ])
        await db_session.commit()
        await _run_provider(db_session, repo.repo_id, c2)
        pairs = await _fetch_pairs(db_session, repo.repo_id)
        assert pairs == []

    @pytest.mark.asyncio
    async def test_ET_16_pair_stored_canonical_a_lt_b(
        self, db_session: AsyncSession, repo: MusehubRepo
    ) -> None:
        """Stored pair always has symbol_a < symbol_b lexicographically."""
        c1, c2 = _cid(), _cid()
        await _seed_commit(db_session, repo.repo_id, c1)
        await _seed_commit(db_session, repo.repo_id, c2, [c1])
        for cid in [c1, c2]:
            await _seed_history(db_session, repo.repo_id, cid, [
                "src/z.py::zfn", "src/a.py::afn",
            ])
        await db_session.commit()
        await _run_provider(db_session, repo.repo_id, c2)
        pairs = await _fetch_pairs(db_session, repo.repo_id)
        assert len(pairs) == 1
        # The two seeded symbols are distinct, so the ordering must be strict.
        assert pairs[0].symbol_a < pairs[0].symbol_b

    @pytest.mark.asyncio
    async def test_ET_17_file_a_b_populated(
        self, db_session: AsyncSession, repo: MusehubRepo
    ) -> None:
        """file_a and file_b columns derive from the symbol address."""
        c1, c2 = _cid(), _cid()
        await _seed_commit(db_session, repo.repo_id, c1)
        await _seed_commit(db_session, repo.repo_id, c2, [c1])
        for cid in [c1, c2]:
            await _seed_history(db_session, repo.repo_id, cid, [
                "src/billing.py::charge", "src/ledger.py::record",
            ])
        await db_session.commit()
        await _run_provider(db_session, repo.repo_id, c2)
        pairs = await _fetch_pairs(db_session, repo.repo_id)
        assert len(pairs) == 1
        p = pairs[0]
        assert p.file_a is not None and "/" in p.file_a
        assert p.file_b is not None and "/" in p.file_b
        assert p.file_a != p.file_b

    @pytest.mark.asyncio
    async def test_ET_18_commits_both_active_is_min(
        self, db_session: AsyncSession, repo: MusehubRepo
    ) -> None:
        """commits_both_active equals |commits_a ∪ commits_b| (Jaccard union)."""
        # B appears in 2 commits; A in 4 commits; co_changes = 2
        # union = 4 + 2 - 2 = 4; rate = 2/4 = 0.5
        commits = [_cid() for _ in range(4)]
        prev = None
        for cid in commits:
            await _seed_commit(db_session, repo.repo_id, cid, [prev] if prev else [])
            prev = cid
        # A in all 4
        for cid in commits:
            await _seed_history(db_session, repo.repo_id, cid, ["src/a.py::fn_a"])
        # B only in first 2
        for cid in commits[:2]:
            await _seed_history(db_session, repo.repo_id, cid, ["src/b.py::fn_b"])
        await db_session.commit()
        await _run_provider(db_session, repo.repo_id, commits[-1])
        pairs = await _fetch_pairs(db_session, repo.repo_id)
        assert len(pairs) == 1
        assert pairs[0].commits_both_active == 4  # union: 4 + 2 - 2
        assert pairs[0].co_changes == 2
        assert pairs[0].co_change_rate == 0.5


# ─────────────────────────────────────────────────────────────────────────────
# Tier 3 — E2E: full seeded scenarios
# ─────────────────────────────────────────────────────────────────────────────

class TestEntangleE2E:
    """End-to-end scenarios: seed a history, run the provider, inspect stored rows."""

    @pytest.mark.asyncio
    async def test_ET_19_three_symbol_pair_ranking(
        self, db_session: AsyncSession, repo: MusehubRepo
    ) -> None:
        """Three symbols; AB pairs more than AC; AB ranked first."""
        commits = [_cid() for _ in range(5)]
        prev = None
        for cid in commits:
            await _seed_commit(db_session, repo.repo_id, cid, [prev] if prev else [])
            prev = cid
        # A+B co-change in all 5
        for cid in commits:
            await _seed_history(db_session, repo.repo_id, cid, [
                "src/a.py::fn_a", "src/b.py::fn_b",
            ])
        # A+C co-change in only 2
        for cid in commits[:2]:
            await _seed_history(db_session, repo.repo_id, cid, ["src/c.py::fn_c"])
        await db_session.commit()
        await _run_provider(db_session, repo.repo_id, commits[-1])
        pairs = await _fetch_pairs(db_session, repo.repo_id)
        assert len(pairs) == 3
        # AB at 1.0 should come first (most co_changes)
        assert pairs[0].co_change_rate == 1.0

    @pytest.mark.asyncio
    async def test_ET_20_a_in_test_flag_set_for_test_files(
        self, db_session: AsyncSession, repo: MusehubRepo
    ) -> None:
        """a_in_test / b_in_test flags set when file path contains 'test'."""
        c1, c2 = _cid(), _cid()
        await _seed_commit(db_session, repo.repo_id, c1)
        await _seed_commit(db_session, repo.repo_id, c2, [c1])
        for cid in [c1, c2]:
            await _seed_history(db_session, repo.repo_id, cid, [
                "tests/test_billing.py::test_charge",
                "src/ledger.py::record",
            ])
        await db_session.commit()
        await _run_provider(db_session, repo.repo_id, c2)
        pairs = await _fetch_pairs(db_session, repo.repo_id)
        assert len(pairs) == 1
        p = pairs[0]
        # one side is in test, the other is not
        assert p.a_in_test != p.b_in_test

    @pytest.mark.asyncio
    async def test_ET_21_result_metadata_keys(
        self, db_session: AsyncSession, repo: MusehubRepo
    ) -> None:
        """Provider returns (key, payload) tuples with expected metadata keys."""
        c1, c2 = _cid(), _cid()
        await _seed_commit(db_session, repo.repo_id, c1)
        await _seed_commit(db_session, repo.repo_id, c2, [c1])
        for cid in [c1, c2]:
            await _seed_history(db_session, repo.repo_id, cid, [
                "src/a.py::fn_a", "src/b.py::fn_b",
            ])
        await db_session.commit()
        result = await _run_provider(db_session, repo.repo_id, c2)
        assert len(result) == 1
        key, payload = result[0]
        assert key == "intel.code.entangle"
        assert "count" in payload
        assert "commits_analysed" in payload
        assert "truncated" in payload

    @pytest.mark.asyncio
    async def test_ET_22_ref_stored_on_pair_row(
        self, db_session: AsyncSession, repo: MusehubRepo
    ) -> None:
        """The ref used for the walk is stored on each pair row."""
        c1, c2 = _cid(), _cid()
        await _seed_commit(db_session, repo.repo_id, c1)
        await _seed_commit(db_session, repo.repo_id, c2, [c1])
        for cid in [c1, c2]:
            await _seed_history(db_session, repo.repo_id, cid, [
                "src/a.py::fn_a", "src/b.py::fn_b",
            ])
        await db_session.commit()
        await _run_provider(db_session, repo.repo_id, c2)
        pairs = await _fetch_pairs(db_session, repo.repo_id)
        assert len(pairs) == 1
        assert pairs[0].ref == c2

    @pytest.mark.asyncio
    async def test_ET_23_multiple_disconnected_pairs(
        self, db_session: AsyncSession, repo: MusehubRepo
    ) -> None:
        """Two independent high-rate pairs are both stored correctly."""
        c1, c2, c3 = _cid(), _cid(), _cid()
        await _seed_commit(db_session, repo.repo_id, c1)
        await _seed_commit(db_session, repo.repo_id, c2, [c1])
        await _seed_commit(db_session, repo.repo_id, c3, [c2])
        for cid in [c1, c2, c3]:
            await _seed_history(db_session, repo.repo_id, cid, [
                "src/alpha.py::a1", "src/beta.py::b1",  # pair 1
                "src/gamma.py::c1", "src/delta.py::d1",  # pair 2
            ])
        await db_session.commit()
        await _run_provider(db_session, repo.repo_id, c3)
        pairs = await _fetch_pairs(db_session, repo.repo_id)
        # At least 2 cross-file pairs
        assert len(pairs) >= 2

    @pytest.mark.asyncio
    async def test_ET_24_structurally_linked_defaults_false(
        self, db_session: AsyncSession, repo: MusehubRepo
    ) -> None:
        """structurally_linked is always False — not yet implemented."""
        c1, c2 = _cid(), _cid()
        await _seed_commit(db_session, repo.repo_id, c1)
        await _seed_commit(db_session, repo.repo_id, c2, [c1])
        for cid in [c1, c2]:
            await _seed_history(db_session, repo.repo_id, cid, [
                "src/a.py::fn_a", "src/b.py::fn_b",
            ])
        await db_session.commit()
        await _run_provider(db_session, repo.repo_id, c2)
        pairs = await _fetch_pairs(db_session, repo.repo_id)
        assert all(p.structurally_linked is False for p in pairs)

    @pytest.mark.asyncio
    async def test_ET_25_same_file_false_on_stored_pair(
        self, db_session: AsyncSession, repo: MusehubRepo
    ) -> None:
        """same_file is always False since same-file pairs are excluded."""
        c1, c2 = _cid(), _cid()
        await _seed_commit(db_session, repo.repo_id, c1)
        await _seed_commit(db_session, repo.repo_id, c2, [c1])
        for cid in [c1, c2]:
            await _seed_history(db_session, repo.repo_id, cid, [
                "src/a.py::fn_a", "src/b.py::fn_b",
            ])
        await db_session.commit()
        await _run_provider(db_session, repo.repo_id, c2)
        pairs = await _fetch_pairs(db_session, repo.repo_id)
        assert all(p.same_file is False for p in pairs)


# ─────────────────────────────────────────────────────────────────────────────
# Tier 4 — Stress: large datasets
───────────────────────────────────────────────────────────────────────────── class TestEntangleStress: @pytest.mark.asyncio async def test_ET_26_max_pairs_cap_respected( self, db_session: AsyncSession, repo: MusehubRepo ) -> None: """Provider stores at most MAX_PAIRS pairs even when more exist.""" provider = EntangleProvider() # Build enough distinct cross-file pairs by spreading symbols # across 35 files × 2 symbols = 70 symbols → 70*69/2 ≈ 2415 pairs before filter commits = [_cid() for _ in range(3)] prev = None for cid in commits: await _seed_commit(db_session, repo.repo_id, cid, [prev] if prev else []) prev = cid addrs = [f"src/file_{i}.py::fn_{j}" for i in range(35) for j in range(2)] for cid in commits: await _seed_history(db_session, repo.repo_id, cid, addrs) await db_session.commit() await _run_provider(db_session, repo.repo_id, commits[-1]) pairs = await _fetch_pairs(db_session, repo.repo_id) assert len(pairs) <= provider._MAX_PAIRS @pytest.mark.asyncio async def test_ET_27_mass_commit_excluded( self, db_session: AsyncSession, repo: MusehubRepo ) -> None: """Commits touching > MAX_SYMBOLS_PER_COMMIT symbols are skipped.""" provider = EntangleProvider() # Seed two legit commits and one mass commit c_legit1, c_legit2, c_mass = _cid(), _cid(), _cid() await _seed_commit(db_session, repo.repo_id, c_legit1) await _seed_commit(db_session, repo.repo_id, c_legit2, [c_legit1]) await _seed_commit(db_session, repo.repo_id, c_mass, [c_legit2]) # Legit commits: A and B co-change for cid in [c_legit1, c_legit2]: await _seed_history(db_session, repo.repo_id, cid, [ "src/a.py::fn_a", "src/b.py::fn_b", ]) # Mass commit: 600 symbols big_addrs = [f"src/gen_{i}.py::fn" for i in range(provider._MAX_SYMBOLS_PER_COMMIT + 100)] await _seed_history(db_session, repo.repo_id, c_mass, big_addrs) await db_session.commit() result = await _run_provider(db_session, repo.repo_id, c_mass) # Provider should still return the AB pair from legit commits pairs = await _fetch_pairs(db_session, 
repo.repo_id) assert any( ("src/a.py::fn_a" in (p.symbol_a, p.symbol_b)) for p in pairs ) @pytest.mark.asyncio async def test_ET_28_500_symbols_completes( self, db_session: AsyncSession, repo: MusehubRepo ) -> None: """500 symbols across 10 commits completes without error.""" commits = [_cid() for _ in range(10)] prev = None for cid in commits: await _seed_commit(db_session, repo.repo_id, cid, [prev] if prev else []) prev = cid # 250 files × 2 symbols = 500 symbols (all under mass-commit limit) addrs = [f"src/f{i}.py::fn_{j}" for i in range(250) for j in range(2)] for cid in commits: await _seed_history(db_session, repo.repo_id, cid, addrs) await db_session.commit() result = await _run_provider(db_session, repo.repo_id, commits[-1]) assert result # no exception @pytest.mark.asyncio async def test_ET_29_result_count_matches_stored_rows( self, db_session: AsyncSession, repo: MusehubRepo ) -> None: """metadata 'count' matches the actual number of rows stored.""" c1, c2, c3 = _cid(), _cid(), _cid() await _seed_commit(db_session, repo.repo_id, c1) await _seed_commit(db_session, repo.repo_id, c2, [c1]) await _seed_commit(db_session, repo.repo_id, c3, [c2]) for cid in [c1, c2, c3]: await _seed_history(db_session, repo.repo_id, cid, [ "src/a.py::fn_a", "src/b.py::fn_b", "src/c.py::fn_c", ]) await db_session.commit() result = await _run_provider(db_session, repo.repo_id, c3) key, payload = result[0] pairs = await _fetch_pairs(db_session, repo.repo_id) assert payload["count"] == len(pairs) @pytest.mark.asyncio async def test_ET_30_bfs_walk_cap_limits_commits_analysed( self, db_session: AsyncSession, repo: MusehubRepo ) -> None: """commits_analysed never exceeds MAX_WALK.""" provider = EntangleProvider() cap = provider._MAX_WALK commits = [] prev = None for _ in range(min(cap + 5, 50)): # keep it fast; just verify cap exists cid = _cid() await _seed_commit(db_session, repo.repo_id, cid, [prev] if prev else []) commits.append(cid) prev = cid await _seed_history(db_session, 
repo.repo_id, commits[0], [ "src/a.py::fn_a", "src/b.py::fn_b", ]) await db_session.commit() result = await _run_provider(db_session, repo.repo_id, commits[-1]) if result: key, payload = result[0] assert payload["commits_analysed"] <= cap # ───────────────────────────────────────────────────────────────────────────── # Tier 5 — State: idempotency, incremental updates, stale-row purge # ───────────────────────────────────────────────────────────────────────────── class TestEntangleState: @pytest.mark.asyncio async def test_ET_31_idempotent_rerun_same_rows( self, db_session: AsyncSession, repo: MusehubRepo ) -> None: """Running the provider twice produces the same set of rows.""" c1, c2 = _cid(), _cid() await _seed_commit(db_session, repo.repo_id, c1) await _seed_commit(db_session, repo.repo_id, c2, [c1]) for cid in [c1, c2]: await _seed_history(db_session, repo.repo_id, cid, [ "src/a.py::fn_a", "src/b.py::fn_b", ]) await db_session.commit() await _run_provider(db_session, repo.repo_id, c2) first_run = await _fetch_pairs(db_session, repo.repo_id) await _run_provider(db_session, repo.repo_id, c2) second_run = await _fetch_pairs(db_session, repo.repo_id) assert len(first_run) == len(second_run) assert {(p.symbol_a, p.symbol_b) for p in first_run} == { (p.symbol_a, p.symbol_b) for p in second_run } @pytest.mark.asyncio async def test_ET_32_stale_rows_purged_on_rerun( self, db_session: AsyncSession, repo: MusehubRepo ) -> None: """Re-run deletes stale pairs that no longer exist in fresh data.""" c1, c2 = _cid(), _cid() await _seed_commit(db_session, repo.repo_id, c1) await _seed_commit(db_session, repo.repo_id, c2, [c1]) for cid in [c1, c2]: await _seed_history(db_session, repo.repo_id, cid, [ "src/a.py::fn_a", "src/b.py::fn_b", ]) await db_session.commit() await _run_provider(db_session, repo.repo_id, c2) first_count_result = await db_session.execute( sa.select(sa.func.count()).select_from(MusehubIntelEntangle) .where(MusehubIntelEntangle.repo_id == repo.repo_id) ) 
assert first_count_result.scalar_one() == 1 # Add a new commit that breaks the entangle signal (different symbols) c3 = _cid() await _seed_commit(db_session, repo.repo_id, c3, [c2]) await _seed_history(db_session, repo.repo_id, c3, [ "src/x.py::fn_x", # completely different ]) # Re-run; AB pair should still exist (still valid from c1, c2) await db_session.commit() await _run_provider(db_session, repo.repo_id, c3) second_run = await _fetch_pairs(db_session, repo.repo_id) assert len(second_run) == 1 # AB still valid @pytest.mark.asyncio async def test_ET_33_incremental_new_pair_appears( self, db_session: AsyncSession, repo: MusehubRepo ) -> None: """After adding commits, a new pair materialises on re-run.""" c1, c2 = _cid(), _cid() await _seed_commit(db_session, repo.repo_id, c1) await _seed_commit(db_session, repo.repo_id, c2, [c1]) for cid in [c1, c2]: await _seed_history(db_session, repo.repo_id, cid, [ "src/a.py::fn_a", "src/b.py::fn_b", ]) await db_session.commit() await _run_provider(db_session, repo.repo_id, c2) before = await _fetch_pairs(db_session, repo.repo_id) # Two new commits introducing a CD pair c3, c4 = _cid(), _cid() await _seed_commit(db_session, repo.repo_id, c3, [c2]) await _seed_commit(db_session, repo.repo_id, c4, [c3]) for cid in [c3, c4]: await _seed_history(db_session, repo.repo_id, cid, [ "src/c.py::fn_c", "src/d.py::fn_d", ]) await db_session.commit() await _run_provider(db_session, repo.repo_id, c4) after = await _fetch_pairs(db_session, repo.repo_id) assert len(after) > len(before) @pytest.mark.asyncio async def test_ET_34_no_duplicate_pairs( self, db_session: AsyncSession, repo: MusehubRepo ) -> None: """No (symbol_a, symbol_b) duplicate rows for the same repo.""" c1, c2, c3 = _cid(), _cid(), _cid() await _seed_commit(db_session, repo.repo_id, c1) await _seed_commit(db_session, repo.repo_id, c2, [c1]) await _seed_commit(db_session, repo.repo_id, c3, [c2]) for cid in [c1, c2, c3]: await _seed_history(db_session, repo.repo_id, cid, [ 
"src/a.py::fn_a", "src/b.py::fn_b", ]) await db_session.commit() for _ in range(3): await _run_provider(db_session, repo.repo_id, c3) pairs = await _fetch_pairs(db_session, repo.repo_id) keys = [(p.symbol_a, p.symbol_b) for p in pairs] assert len(keys) == len(set(keys)) @pytest.mark.asyncio async def test_ET_35_rate_updates_on_new_commits( self, db_session: AsyncSession, repo: MusehubRepo ) -> None: """Rate increases when more co-change commits are added.""" # Initial: A in 3 commits, B in 3 commits, co=2 → rate=2/3 commits = [_cid() for _ in range(3)] prev = None for cid in commits: await _seed_commit(db_session, repo.repo_id, cid, [prev] if prev else []) prev = cid # A appears in all 3 for cid in commits: await _seed_history(db_session, repo.repo_id, cid, ["src/a.py::fn_a"]) # B co-changes only in first 2 for cid in commits[:2]: await _seed_history(db_session, repo.repo_id, cid, ["src/b.py::fn_b"]) await db_session.commit() await _run_provider(db_session, repo.repo_id, commits[-1]) pairs_before = await _fetch_pairs(db_session, repo.repo_id) rate_before = pairs_before[0].co_change_rate if pairs_before else 0.0 # Now add a commit where both co-change again c_new = _cid() await _seed_commit(db_session, repo.repo_id, c_new, [commits[-1]]) await _seed_history(db_session, repo.repo_id, c_new, [ "src/a.py::fn_a", "src/b.py::fn_b", ]) await db_session.commit() await _run_provider(db_session, repo.repo_id, c_new) pairs_after = await _fetch_pairs(db_session, repo.repo_id) rate_after = pairs_after[0].co_change_rate if pairs_after else 0.0 assert rate_after >= rate_before @pytest.mark.asyncio async def test_ET_36_truncated_flag_true_when_over_cap( self, db_session: AsyncSession, repo: MusehubRepo ) -> None: """truncated=True when more pairs were found than MAX_PAIRS.""" provider = EntangleProvider() commits = [_cid() for _ in range(3)] prev = None for cid in commits: await _seed_commit(db_session, repo.repo_id, cid, [prev] if prev else []) prev = cid # 35 files × 2 syms → 
~2415 cross-file pairs, exceeds MAX_PAIRS=500 addrs = [f"src/file_{i}.py::fn_{j}" for i in range(35) for j in range(2)] for cid in commits: await _seed_history(db_session, repo.repo_id, cid, addrs) await db_session.commit() result = await _run_provider(db_session, repo.repo_id, commits[-1]) key, payload = result[0] assert payload["truncated"] is True # ───────────────────────────────────────────────────────────────────────────── # Tier 6 — Integrity: edge cases and data quality # ───────────────────────────────────────────────────────────────────────────── class TestEntangleIntegrity: @pytest.mark.asyncio async def test_ET_37_address_with_only_import_produces_no_pair( self, db_session: AsyncSession, repo: MusehubRepo ) -> None: """A commit with only import pseudo-symbols generates no pair rows.""" c1, c2 = _cid(), _cid() await _seed_commit(db_session, repo.repo_id, c1) await _seed_commit(db_session, repo.repo_id, c2, [c1]) for cid in [c1, c2]: await _seed_history(db_session, repo.repo_id, cid, [ "src/a.py::import::os", "src/b.py::import::sys", "src/c.py::import::typing", ]) await db_session.commit() await _run_provider(db_session, repo.repo_id, c2) pairs = await _fetch_pairs(db_session, repo.repo_id) assert pairs == [] @pytest.mark.asyncio async def test_ET_38_mixed_valid_and_import_symbols( self, db_session: AsyncSession, repo: MusehubRepo ) -> None: """Import symbols in same commit as real symbols don't pair with real ones.""" c1, c2 = _cid(), _cid() await _seed_commit(db_session, repo.repo_id, c1) await _seed_commit(db_session, repo.repo_id, c2, [c1]) for cid in [c1, c2]: await _seed_history(db_session, repo.repo_id, cid, [ "src/a.py::real_fn", "src/b.py::import::os", # filtered "src/c.py::other_fn", ]) await db_session.commit() await _run_provider(db_session, repo.repo_id, c2) pairs = await _fetch_pairs(db_session, repo.repo_id) for p in pairs: assert "::import::" not in p.symbol_a assert "::import::" not in p.symbol_b @pytest.mark.asyncio async def 
test_ET_39_unknown_ref_in_bfs_returns_empty( self, db_session: AsyncSession, repo: MusehubRepo ) -> None: """BFS from unknown ref produces no pairs (ref not in commit table).""" unknown_ref = _cid() result = await _run_provider(db_session, repo.repo_id, unknown_ref) assert result == [] @pytest.mark.asyncio async def test_ET_40_co_changes_count_exact( self, db_session: AsyncSession, repo: MusehubRepo ) -> None: """co_changes is the exact number of commits where both symbols appeared.""" n_together = 4 n_solo_a = 2 commits_together = [_cid() for _ in range(n_together)] commits_a_only = [_cid() for _ in range(n_solo_a)] all_commits = commits_together + commits_a_only prev = None for cid in all_commits: await _seed_commit(db_session, repo.repo_id, cid, [prev] if prev else []) prev = cid for cid in commits_together: await _seed_history(db_session, repo.repo_id, cid, [ "src/a.py::fn_a", "src/b.py::fn_b", ]) for cid in commits_a_only: await _seed_history(db_session, repo.repo_id, cid, ["src/a.py::fn_a"]) await db_session.commit() await _run_provider(db_session, repo.repo_id, all_commits[-1]) pairs = await _fetch_pairs(db_session, repo.repo_id) assert len(pairs) == 1 # union = count_a + count_b - co_changes = (n_together + n_solo_a) + n_together - n_together union = n_together + n_solo_a # = 6 assert pairs[0].co_changes == n_together assert pairs[0].commits_both_active == union assert pairs[0].co_change_rate == round(n_together / union, 6) @pytest.mark.asyncio async def test_ET_41_rate_capped_at_one( self, db_session: AsyncSession, repo: MusehubRepo ) -> None: """co_change_rate is never > 1.0.""" commits = [_cid() for _ in range(5)] prev = None for cid in commits: await _seed_commit(db_session, repo.repo_id, cid, [prev] if prev else []) prev = cid for cid in commits: await _seed_history(db_session, repo.repo_id, cid, [ "src/a.py::fn_a", "src/b.py::fn_b", ]) await db_session.commit() await _run_provider(db_session, repo.repo_id, commits[-1]) pairs = await 
_fetch_pairs(db_session, repo.repo_id) for p in pairs: assert 0.0 <= p.co_change_rate <= 1.0 # ───────────────────────────────────────────────────────────────────────────── # Tier 7 — Performance: timing bounds # ───────────────────────────────────────────────────────────────────────────── class TestEntanglePerformance: @pytest.mark.asyncio async def test_ET_42_ten_commits_ten_symbols_under_500ms( self, db_session: AsyncSession, repo: MusehubRepo ) -> None: """10 commits × 10 symbols completes in under 500 ms.""" commits = [_cid() for _ in range(10)] prev = None for cid in commits: await _seed_commit(db_session, repo.repo_id, cid, [prev] if prev else []) prev = cid addrs = [f"src/file_{i}.py::fn" for i in range(10)] for cid in commits: await _seed_history(db_session, repo.repo_id, cid, addrs) await db_session.commit() t0 = time.monotonic() await _run_provider(db_session, repo.repo_id, commits[-1]) elapsed = time.monotonic() - t0 assert elapsed < 0.5, f"took {elapsed:.3f}s" @pytest.mark.asyncio async def test_ET_43_100_commits_20_symbols_under_2s( self, db_session: AsyncSession, repo: MusehubRepo ) -> None: """100 commits × 20 symbols completes in under 2 s.""" commits = [_cid() for _ in range(100)] prev = None for cid in commits: await _seed_commit(db_session, repo.repo_id, cid, [prev] if prev else []) prev = cid addrs = [f"src/f{i}.py::fn" for i in range(20)] for cid in commits: await _seed_history(db_session, repo.repo_id, cid, addrs) await db_session.commit() t0 = time.monotonic() await _run_provider(db_session, repo.repo_id, commits[-1]) elapsed = time.monotonic() - t0 assert elapsed < 2.0, f"took {elapsed:.3f}s" @pytest.mark.asyncio async def test_ET_44_empty_repo_under_50ms( self, db_session: AsyncSession, repo: MusehubRepo ) -> None: """Empty repo fast-path exits under 50 ms.""" t0 = time.monotonic() await _run_provider(db_session, repo.repo_id, _cid()) elapsed = time.monotonic() - t0 assert elapsed < 0.05, f"took {elapsed:.3f}s" @pytest.mark.asyncio async 
def test_ET_45_rerun_same_speed_as_first( self, db_session: AsyncSession, repo: MusehubRepo ) -> None: """Second run is not significantly slower than first run.""" c1, c2 = _cid(), _cid() await _seed_commit(db_session, repo.repo_id, c1) await _seed_commit(db_session, repo.repo_id, c2, [c1]) for cid in [c1, c2]: await _seed_history(db_session, repo.repo_id, cid, [ "src/a.py::fn_a", "src/b.py::fn_b", ]) await db_session.commit() t1 = time.monotonic() await _run_provider(db_session, repo.repo_id, c2) d1 = time.monotonic() - t1 t2 = time.monotonic() await _run_provider(db_session, repo.repo_id, c2) d2 = time.monotonic() - t2 # second run should not be more than 5× slower assert d2 < max(d1 * 5, 0.5) @pytest.mark.asyncio async def test_ET_46_point_lookup_fast( self, db_session: AsyncSession, repo: MusehubRepo ) -> None: """Fetching pairs for a specific repo is sub-10 ms after provider run.""" c1, c2 = _cid(), _cid() await _seed_commit(db_session, repo.repo_id, c1) await _seed_commit(db_session, repo.repo_id, c2, [c1]) for cid in [c1, c2]: await _seed_history(db_session, repo.repo_id, cid, [ "src/a.py::fn_a", "src/b.py::fn_b", ]) await db_session.commit() await _run_provider(db_session, repo.repo_id, c2) t0 = time.monotonic() await _fetch_pairs(db_session, repo.repo_id) elapsed = time.monotonic() - t0 assert elapsed < 0.01, f"took {elapsed:.3f}s" # ───────────────────────────────────────────────────────────────────────────── # Tier 8 — Security: injection, isolation, address length # ───────────────────────────────────────────────────────────────────────────── class TestEntangleSecurity: @pytest.mark.asyncio async def test_ET_47_sql_injection_in_address_stored_verbatim( self, db_session: AsyncSession, repo: MusehubRepo ) -> None: """SQL injection strings in symbol addresses are stored as-is (no execution).""" inject = "src/a.py::fn'; DROP TABLE musehub_intel_entangle; --" c1, c2 = _cid(), _cid() await _seed_commit(db_session, repo.repo_id, c1) await 
_seed_commit(db_session, repo.repo_id, c2, [c1]) for cid in [c1, c2]: await _seed_history(db_session, repo.repo_id, cid, [ inject, "src/b.py::fn_b", ]) await db_session.commit() await _run_provider(db_session, repo.repo_id, c2) # Table must still exist pairs = await _fetch_pairs(db_session, repo.repo_id) # The injection address should appear verbatim or be stored without issue assert isinstance(pairs, list) @pytest.mark.asyncio async def test_ET_48_xss_payload_in_address_stored_safely( self, db_session: AsyncSession, repo: MusehubRepo ) -> None: """XSS payloads in addresses are stored without execution.""" xss = "src/.py::fn" c1, c2 = _cid(), _cid() await _seed_commit(db_session, repo.repo_id, c1) await _seed_commit(db_session, repo.repo_id, c2, [c1]) for cid in [c1, c2]: await _seed_history(db_session, repo.repo_id, cid, [ xss, "src/b.py::fn_b", ]) await db_session.commit() await _run_provider(db_session, repo.repo_id, c2) pairs = await _fetch_pairs(db_session, repo.repo_id) assert isinstance(pairs, list) @pytest.mark.asyncio async def test_ET_49_repo_isolation_strict( self, db_session: AsyncSession, two_repos: tuple[MusehubRepo, MusehubRepo] ) -> None: """Pairs from repo A are never visible when querying repo B.""" r1, r2 = two_repos c1, c2 = _cid(), _cid() await _seed_commit(db_session, r1.repo_id, c1) await _seed_commit(db_session, r1.repo_id, c2, [c1]) for cid in [c1, c2]: await _seed_history(db_session, r1.repo_id, cid, [ "src/a.py::fn_a", "src/b.py::fn_b", ]) await db_session.commit() await _run_provider(db_session, r1.repo_id, c2) # Repo 2 has no data pairs_r2 = await _fetch_pairs(db_session, r2.repo_id) assert pairs_r2 == [] @pytest.mark.asyncio async def test_ET_50_repo_isolation_no_cross_contamination( self, db_session: AsyncSession, two_repos: tuple[MusehubRepo, MusehubRepo] ) -> None: """Two repos each get their own independent pair sets.""" r1, r2 = two_repos for repo in [r1, r2]: c1, c2 = _cid(), _cid() await _seed_commit(db_session, repo.repo_id, 
c1) await _seed_commit(db_session, repo.repo_id, c2, [c1]) for cid in [c1, c2]: await _seed_history(db_session, repo.repo_id, cid, [ "src/a.py::fn_a", "src/b.py::fn_b", ]) await db_session.commit() await _run_provider(db_session, repo.repo_id, c2) pairs_r1 = await _fetch_pairs(db_session, r1.repo_id) pairs_r2 = await _fetch_pairs(db_session, r2.repo_id) assert len(pairs_r1) == 1 assert len(pairs_r2) == 1 assert pairs_r1[0].repo_id == r1.repo_id assert pairs_r2[0].repo_id == r2.repo_id @pytest.mark.asyncio async def test_ET_51_delete_old_provider_run_on_rerun( self, db_session: AsyncSession, repo: MusehubRepo ) -> None: """Rerun for a different ref purges all previous rows for the repo.""" c1, c2, c3 = _cid(), _cid(), _cid() await _seed_commit(db_session, repo.repo_id, c1) await _seed_commit(db_session, repo.repo_id, c2, [c1]) await _seed_commit(db_session, repo.repo_id, c3, [c2]) for cid in [c1, c2, c3]: await _seed_history(db_session, repo.repo_id, cid, [ "src/a.py::fn_a", "src/b.py::fn_b", ]) await db_session.commit() await _run_provider(db_session, repo.repo_id, c2) await _run_provider(db_session, repo.repo_id, c3) pairs = await _fetch_pairs(db_session, repo.repo_id) # All stored rows must point to the latest ref for p in pairs: assert p.ref == c3 @pytest.mark.asyncio async def test_ET_52_unicode_in_address_handled( self, db_session: AsyncSession, repo: MusehubRepo ) -> None: """Unicode characters in addresses do not crash the provider.""" c1, c2 = _cid(), _cid() await _seed_commit(db_session, repo.repo_id, c1) await _seed_commit(db_session, repo.repo_id, c2, [c1]) for cid in [c1, c2]: await _seed_history(db_session, repo.repo_id, cid, [ "src/música.py::canción", "src/b.py::fn_b", ]) await db_session.commit() await _run_provider(db_session, repo.repo_id, c2) pairs = await _fetch_pairs(db_session, repo.repo_id) assert isinstance(pairs, list) @pytest.mark.asyncio async def test_ET_53_long_address_does_not_exceed_column_width( self, db_session: AsyncSession, repo: 
MusehubRepo ) -> None: """Addresses truncated to 512 chars by the route layer don't crash storage.""" long_addr_a = "src/" + "a" * 500 + ".py::fn" long_addr_b = "src/" + "b" * 500 + ".py::fn" # These exceed 512 chars — simulate what the route-layer would see # The provider itself stores verbatim; the model column is VARCHAR(512) # so the DB will reject anything longer. Just verify the provider # doesn't crash on realistic (under 512) addresses. addr_a = f"{long_addr_a[:100]}::fn_a" addr_b = f"{long_addr_b[:100]}::fn_b" c1, c2 = _cid(), _cid() await _seed_commit(db_session, repo.repo_id, c1) await _seed_commit(db_session, repo.repo_id, c2, [c1]) for cid in [c1, c2]: await _seed_history(db_session, repo.repo_id, cid, [addr_a, addr_b]) await db_session.commit() await _run_provider(db_session, repo.repo_id, c2) pairs = await _fetch_pairs(db_session, repo.repo_id) assert len(pairs) == 1 @pytest.mark.asyncio async def test_ET_54_newline_in_address_stored_verbatim( self, db_session: AsyncSession, repo: MusehubRepo ) -> None: """Newline characters in addresses don't trigger injections or errors.""" addr_a = "src/a.py::fn\n_a" addr_b = "src/b.py::fn_b" c1, c2 = _cid(), _cid() await _seed_commit(db_session, repo.repo_id, c1) await _seed_commit(db_session, repo.repo_id, c2, [c1]) for cid in [c1, c2]: await _seed_history(db_session, repo.repo_id, cid, [addr_a, addr_b]) await db_session.commit() await _run_provider(db_session, repo.repo_id, c2) pairs = await _fetch_pairs(db_session, repo.repo_id) assert isinstance(pairs, list)