"""TDD spec for CouplingProvider — issue #15, Phase 5. Verifies that CouplingProvider reproduces the same file co-change analysis as ``muse code coupling``: file derivation from symbol addresses, bare-path handling, mass-commit exclusion, canonical pair ordering, MAX_PAIRS cap, and strict repo isolation. Seven test tiers (49 cases) ---------------------------- Unit CP_01 – CP_08 file derivation, heat modifier, pair canonicalisation Integration CP_09 – CP_18 provider upserts, re-runs, counts E2E CP_19 – CP_25 full seeded scenarios Performance CP_26 – CP_32 timing bounds State CP_33 – CP_38 idempotency, stale-row purge, incremental updates Security CP_39 – CP_44 injection strings, repo isolation Stress CP_45 – CP_49 MAX_PAIRS cap, mass-commit exclusion, BFS cap """ from __future__ import annotations import secrets import time from collections import defaultdict from datetime import datetime, timezone import pytest import pytest_asyncio import sqlalchemy as sa from sqlalchemy.dialects.postgresql import insert as pg_insert from sqlalchemy.ext.asyncio import AsyncSession from muse.core.types import fake_id, long_id from musehub.db.musehub_intel_models import MusehubIntelCoupling, MusehubSymbolHistoryEntry from musehub.db.musehub_repo_models import MusehubCommit, MusehubCommitRef, MusehubRepo from musehub.services.musehub_intel_providers import CouplingProvider from musehub.types.json_types import JSONObject from musehub.api.routes.musehub.ui_intel import _cp_heat, _cp_short from tests.factories import create_repo # ───────────────────────────────────────────────────────────────────────────── # Helpers # ───────────────────────────────────────────────────────────────────────────── def _cid() -> str: return long_id(secrets.token_hex(32)) async def _seed_commit( session: AsyncSession, repo_id: str, commit_id: str, parent_ids: list[str] | None = None, ) -> None: stmt = ( pg_insert(MusehubCommit) .values( commit_id=commit_id, message="test commit", author="test", branch="dev", parent_ids=parent_ids or [], snapshot_id=None, timestamp=datetime.now(timezone.utc), ) .on_conflict_do_nothing() ) await session.execute(stmt) ref_stmt = ( pg_insert(MusehubCommitRef) .values(repo_id=repo_id, commit_id=commit_id) .on_conflict_do_nothing() ) await session.execute(ref_stmt) async def _seed_history( session: AsyncSession, repo_id: str, commit_id: str, addresses: list[str], ) -> None: for addr in addresses: stmt = ( pg_insert(MusehubSymbolHistoryEntry) .values( repo_id=repo_id, address=addr, commit_id=commit_id, committed_at=datetime.now(timezone.utc), op="modify", ) .on_conflict_do_nothing() ) await session.execute(stmt) async def _run(session: AsyncSession, repo_id: str, ref: str) -> list[tuple[str, JSONObject]]: return await CouplingProvider().compute(session, repo_id, ref, {}) async def _fetch(session: AsyncSession, repo_id: str) -> list[MusehubIntelCoupling]: result = await session.execute( sa.select(MusehubIntelCoupling) .where(MusehubIntelCoupling.repo_id == repo_id) .order_by(sa.desc(MusehubIntelCoupling.co_changes)) ) return list(result.scalars().all()) # ───────────────────────────────────────────────────────────────────────────── # Fixtures # ───────────────────────────────────────────────────────────────────────────── @pytest_asyncio.fixture async def repo(db_session: AsyncSession) -> MusehubRepo: return await create_repo(db_session, owner="testuser", slug="couplingprovider") @pytest_asyncio.fixture async def two_repos(db_session: AsyncSession) -> tuple[MusehubRepo, MusehubRepo]: r1 = await create_repo(db_session, owner="testuser", slug="cp-repo-1") r2 = await create_repo(db_session, owner="testuser", slug="cp-repo-2") return r1, r2 # ───────────────────────────────────────────────────────────────────────────── # Tier 1 — Unit: file derivation, heat modifier, pair canonicalisation # ───────────────────────────────────────────────────────────────────────────── class TestCouplingUnit: """Pure-function tests — no database required.""" def test_CP_01_file_from_symbol_address(self) -> None: """File extracted correctly from symbol address.""" addr = "src/billing.py::charge" file = addr.split("::")[0] if "::" in addr else addr assert file == "src/billing.py" def test_CP_02_bare_path_is_file(self) -> None: """Bare path (no '::') treated directly as filename.""" addr = "cloudflare" file = addr.split("::")[0] if "::" in addr else addr assert file == "cloudflare" def test_CP_03_pair_key_canonical_a_lt_b(self) -> None: """Pair key is always (a, b) where a < b lexicographically.""" files = ["src/z.py", "src/a.py"] canonical = tuple(sorted(files)) assert canonical == ("src/a.py", "src/z.py") def test_CP_04_same_file_pair_excluded(self) -> None: """Two symbols from the same file produce no file pair.""" addr_a = "src/billing.py::charge" addr_b = "src/billing.py::refund" file_a = addr_a.split("::")[0] file_b = addr_b.split("::")[0] assert file_a == file_b def test_CP_05_heat_low(self) -> None: """co_changes < 10 → empty modifier (accent fill).""" assert _cp_heat(1) == "" assert _cp_heat(9) == "" def test_CP_06_heat_medium(self) -> None: """co_changes 10–19 → 'medium' modifier (warning fill).""" assert _cp_heat(10) == "medium" assert _cp_heat(19) == "medium" def test_CP_07_heat_high(self) -> None: """co_changes >= 20 → 'high' modifier (danger fill).""" assert _cp_heat(20) == "high" assert _cp_heat(99) == "high" def test_CP_08_min_co_changes_constant(self) -> None: """_MIN_CO_CHANGES is 2 — pairs below this are noise.""" assert CouplingProvider._MIN_CO_CHANGES == 2 # ───────────────────────────────────────────────────────────────────────────── # Tier 2 — Integration: provider upserts, counts, re-runs # ───────────────────────────────────────────────────────────────────────────── class TestCouplingIntegration: @pytest.mark.asyncio async def test_CP_09_empty_repo_returns_empty( self, db_session: AsyncSession, repo: MusehubRepo ) -> None: """Provider on a repo with no commits returns [] and stores no rows.""" result = await _run(db_session, repo.repo_id, _cid()) assert result == [] assert await _fetch(db_session, repo.repo_id) == [] @pytest.mark.asyncio async def test_CP_10_no_history_entries_returns_empty( self, db_session: AsyncSession, repo: MusehubRepo ) -> None: """Commits exist but no history entries → no pairs stored.""" c1 = _cid() await _seed_commit(db_session, repo.repo_id, c1) await db_session.commit() result = await _run(db_session, repo.repo_id, c1) assert result == [] @pytest.mark.asyncio async def test_CP_11_single_co_change_below_threshold( self, db_session: AsyncSession, repo: MusehubRepo ) -> None: """One co-change commit → co_changes=1, below _MIN_CO_CHANGES=2, no row.""" c1 = _cid() await _seed_commit(db_session, repo.repo_id, c1) await _seed_history(db_session, repo.repo_id, c1, ["src/a.py::fn_a", "src/b.py::fn_b"]) await db_session.commit() await _run(db_session, repo.repo_id, c1) assert await _fetch(db_session, repo.repo_id) == [] @pytest.mark.asyncio async def test_CP_12_two_co_changes_produces_one_pair( self, db_session: AsyncSession, repo: MusehubRepo ) -> None: """Exactly 2 co-change commits → 1 pair with co_changes=2.""" c1, c2 = _cid(), _cid() await _seed_commit(db_session, repo.repo_id, c1) await _seed_commit(db_session, repo.repo_id, c2, [c1]) for cid in [c1, c2]: await _seed_history(db_session, repo.repo_id, cid, ["src/a.py::fn_a", "src/b.py::fn_b"]) await db_session.commit() await _run(db_session, repo.repo_id, c2) pairs = await _fetch(db_session, repo.repo_id) assert len(pairs) == 1 assert pairs[0].co_changes == 2 @pytest.mark.asyncio async def test_CP_13_three_files_produces_three_pairs( self, db_session: AsyncSession, repo: MusehubRepo ) -> None: """Three files in a commit → 3 cross-file pairs (A↔B, A↔C, B↔C).""" c1, c2 = _cid(), _cid() await _seed_commit(db_session, repo.repo_id, c1) await _seed_commit(db_session, repo.repo_id, c2, [c1]) for cid in [c1, c2]: await _seed_history(db_session, repo.repo_id, cid, [ "src/a.py::fn", "src/b.py::fn", "src/c.py::fn", ]) await db_session.commit() await _run(db_session, repo.repo_id, c2) pairs = await _fetch(db_session, repo.repo_id) assert len(pairs) == 3 @pytest.mark.asyncio async def test_CP_14_same_file_symbols_no_pair( self, db_session: AsyncSession, repo: MusehubRepo ) -> None: """Two symbols from the same file never produce a pair.""" c1, c2 = _cid(), _cid() await _seed_commit(db_session, repo.repo_id, c1) await _seed_commit(db_session, repo.repo_id, c2, [c1]) for cid in [c1, c2]: await _seed_history(db_session, repo.repo_id, cid, [ "src/billing.py::charge", "src/billing.py::refund", ]) await db_session.commit() await _run(db_session, repo.repo_id, c2) assert await _fetch(db_session, repo.repo_id) == [] @pytest.mark.asyncio async def test_CP_15_pair_stored_canonical_a_lt_b( self, db_session: AsyncSession, repo: MusehubRepo ) -> None: """Stored pair always has file_a <= file_b lexicographically.""" c1, c2 = _cid(), _cid() await _seed_commit(db_session, repo.repo_id, c1) await _seed_commit(db_session, repo.repo_id, c2, [c1]) for cid in [c1, c2]: await _seed_history(db_session, repo.repo_id, cid, ["src/z.py::zfn", "src/a.py::afn"]) await db_session.commit() await _run(db_session, repo.repo_id, c2) pairs = await _fetch(db_session, repo.repo_id) assert len(pairs) == 1 assert pairs[0].file_a <= pairs[0].file_b @pytest.mark.asyncio async def test_CP_16_ref_column_populated( self, db_session: AsyncSession, repo: MusehubRepo ) -> None: """ref column on each row matches the HEAD ref passed to compute().""" c1, c2 = _cid(), _cid() await _seed_commit(db_session, repo.repo_id, c1) await _seed_commit(db_session, repo.repo_id, c2, [c1]) for cid in [c1, c2]: await _seed_history(db_session, repo.repo_id, cid, ["src/a.py::fn", "src/b.py::fn"]) await db_session.commit() await _run(db_session, repo.repo_id, c2) pairs = await _fetch(db_session, repo.repo_id) assert all(p.ref == c2 for p in pairs) @pytest.mark.asyncio async def test_CP_17_co_changes_count_exact( self, db_session: AsyncSession, repo: MusehubRepo ) -> None: """co_changes is the exact number of commits where both files appeared.""" commits = [_cid() for _ in range(4)] prev = None for cid in commits: await _seed_commit(db_session, repo.repo_id, cid, [prev] if prev else []) prev = cid for cid in commits: await _seed_history(db_session, repo.repo_id, cid, ["src/a.py::fn", "src/b.py::fn"]) await db_session.commit() await _run(db_session, repo.repo_id, commits[-1]) pairs = await _fetch(db_session, repo.repo_id) assert pairs[0].co_changes == 4 @pytest.mark.asyncio async def test_CP_18_result_key_correct( self, db_session: AsyncSession, repo: MusehubRepo ) -> None: """Provider returns result tuple with key 'intel.code.coupling'.""" c1, c2 = _cid(), _cid() await _seed_commit(db_session, repo.repo_id, c1) await _seed_commit(db_session, repo.repo_id, c2, [c1]) for cid in [c1, c2]: await _seed_history(db_session, repo.repo_id, cid, ["src/a.py::fn", "src/b.py::fn"]) await db_session.commit() result = await _run(db_session, repo.repo_id, c2) assert len(result) == 1 key, payload = result[0] assert key == "intel.code.coupling" assert "count" in payload assert "commits_analysed" in payload assert "truncated" in payload # ───────────────────────────────────────────────────────────────────────────── # Tier 3 — E2E: full seeded scenarios # ───────────────────────────────────────────────────────────────────────────── class TestCouplingE2E: @pytest.mark.asyncio async def test_CP_19_three_files_correct_ranking( self, db_session: AsyncSession, repo: MusehubRepo ) -> None: """A↔B co-changes more than A↔C → A↔B ranked first.""" commits = [_cid() for _ in range(5)] prev = None for cid in commits: await _seed_commit(db_session, repo.repo_id, cid, [prev] if prev else []) prev = cid # A and B in all 5 commits for cid in commits: await _seed_history(db_session, repo.repo_id, cid, ["src/a.py::fn", "src/b.py::fn"]) # A and C only in first 2 for cid in commits[:2]: await _seed_history(db_session, repo.repo_id, cid, ["src/c.py::fn"]) await db_session.commit() await _run(db_session, repo.repo_id, commits[-1]) pairs = await _fetch(db_session, repo.repo_id) assert pairs[0].co_changes == 5 assert pairs[0].file_a in ("src/a.py", "src/b.py") assert pairs[0].file_b in ("src/a.py", "src/b.py") @pytest.mark.asyncio async def test_CP_20_result_count_matches_stored_rows( self, db_session: AsyncSession, repo: MusehubRepo ) -> None: """metadata 'count' equals the number of rows actually stored.""" c1, c2, c3 = _cid(), _cid(), _cid() await _seed_commit(db_session, repo.repo_id, c1) await _seed_commit(db_session, repo.repo_id, c2, [c1]) await _seed_commit(db_session, repo.repo_id, c3, [c2]) for cid in [c1, c2, c3]: await _seed_history(db_session, repo.repo_id, cid, ["src/a.py::fn", "src/b.py::fn", "src/c.py::fn"]) await db_session.commit() result = await _run(db_session, repo.repo_id, c3) key, payload = result[0] pairs = await _fetch(db_session, repo.repo_id) assert payload["count"] == len(pairs) @pytest.mark.asyncio async def test_CP_21_truncated_true_over_max_pairs( self, db_session: AsyncSession, repo: MusehubRepo ) -> None: """truncated=True when raw pair count exceeds MAX_PAIRS.""" provider = CouplingProvider() commits = [_cid() for _ in range(3)] prev = None for cid in commits: await _seed_commit(db_session, repo.repo_id, cid, [prev] if prev else []) prev = cid # 21 files → 210 pairs, exceeds MAX_PAIRS=200 addrs = [f"src/file_{i}.py::fn" for i in range(21)] for cid in commits: await _seed_history(db_session, repo.repo_id, cid, addrs) await db_session.commit() result = await _run(db_session, repo.repo_id, commits[-1]) key, payload = result[0] assert payload["truncated"] is True @pytest.mark.asyncio async def test_CP_22_min_co_filter_in_route_helpers( self, db_session: AsyncSession, repo: MusehubRepo ) -> None: """Pairs with co_changes below min_co are excluded from route results.""" # Build: A↔B = 5, A↔C = 2 → with min_co=3 only A↔B appears commits_ab = [_cid() for _ in range(5)] commits_ac = [_cid() for _ in range(2)] all_commits = commits_ab + commits_ac prev = None for cid in all_commits: await _seed_commit(db_session, repo.repo_id, cid, [prev] if prev else []) prev = cid for cid in commits_ab: await _seed_history(db_session, repo.repo_id, cid, ["src/a.py::fn", "src/b.py::fn"]) for cid in commits_ac: await _seed_history(db_session, repo.repo_id, cid, ["src/a.py::fn", "src/c.py::fn"]) await db_session.commit() await _run(db_session, repo.repo_id, all_commits[-1]) # Simulate route min_co=3 filter repo_id = repo.repo_id result = await db_session.execute( sa.select(MusehubIntelCoupling) .where( MusehubIntelCoupling.repo_id == repo_id, MusehubIntelCoupling.co_changes >= 3, ) .order_by(sa.desc(MusehubIntelCoupling.co_changes)) ) filtered = result.scalars().all() assert all(p.co_changes >= 3 for p in filtered) assert len(filtered) == 1 assert filtered[0].co_changes == 5 @pytest.mark.asyncio async def test_CP_23_top_limit_respected( self, db_session: AsyncSession, repo: MusehubRepo ) -> None: """SQL LIMIT top correctly caps the number of rows returned.""" commits = [_cid() for _ in range(3)] prev = None for cid in commits: await _seed_commit(db_session, repo.repo_id, cid, [prev] if prev else []) prev = cid # 10 files → 45 pairs addrs = [f"src/f{i}.py::fn" for i in range(10)] for cid in commits: await _seed_history(db_session, repo.repo_id, cid, addrs) await db_session.commit() await _run(db_session, repo.repo_id, commits[-1]) result = await db_session.execute( sa.select(MusehubIntelCoupling) .where(MusehubIntelCoupling.repo_id == repo.repo_id) .order_by(sa.desc(MusehubIntelCoupling.co_changes)) .limit(5) ) assert len(result.scalars().all()) <= 5 @pytest.mark.asyncio async def test_CP_24_heat_high_on_stored_pairs( self, db_session: AsyncSession, repo: MusehubRepo ) -> None: """_cp_heat returns 'high' for pairs with co_changes >= 20.""" commits = [_cid() for _ in range(22)] prev = None for cid in commits: await _seed_commit(db_session, repo.repo_id, cid, [prev] if prev else []) prev = cid for cid in commits: await _seed_history(db_session, repo.repo_id, cid, ["src/a.py::fn", "src/b.py::fn"]) await db_session.commit() await _run(db_session, repo.repo_id, commits[-1]) pairs = await _fetch(db_session, repo.repo_id) assert pairs[0].co_changes >= 20 assert _cp_heat(pairs[0].co_changes) == "high" @pytest.mark.asyncio async def test_CP_25_bar_pct_100_for_top_pair( self, db_session: AsyncSession, repo: MusehubRepo ) -> None: """Top pair always gets bar_pct=100 (it is the normalisation anchor).""" commits = [_cid() for _ in range(5)] prev = None for cid in commits: await _seed_commit(db_session, repo.repo_id, cid, [prev] if prev else []) prev = cid for cid in commits: await _seed_history(db_session, repo.repo_id, cid, ["src/a.py::fn", "src/b.py::fn"]) await db_session.commit() await _run(db_session, repo.repo_id, commits[-1]) pairs = await _fetch(db_session, repo.repo_id) max_co = pairs[0].co_changes bar_pct = round((pairs[0].co_changes / max_co) * 100) assert bar_pct == 100 # ───────────────────────────────────────────────────────────────────────────── # Tier 4 — Performance: timing bounds # ───────────────────────────────────────────────────────────────────────────── class TestCouplingPerformance: @pytest.mark.asyncio async def test_CP_26_ten_commits_ten_files_under_500ms( self, db_session: AsyncSession, repo: MusehubRepo ) -> None: """10 commits × 10 files completes in under 500 ms.""" commits = [_cid() for _ in range(10)] prev = None for cid in commits: await _seed_commit(db_session, repo.repo_id, cid, [prev] if prev else []) prev = cid addrs = [f"src/f{i}.py::fn" for i in range(10)] for cid in commits: await _seed_history(db_session, repo.repo_id, cid, addrs) await db_session.commit() t0 = time.monotonic() await _run(db_session, repo.repo_id, commits[-1]) assert time.monotonic() - t0 < 0.5 @pytest.mark.asyncio async def test_CP_27_100_commits_20_files_under_2s( self, db_session: AsyncSession, repo: MusehubRepo ) -> None: """100 commits × 20 files completes in under 2 s.""" commits = [_cid() for _ in range(100)] prev = None for cid in commits: await _seed_commit(db_session, repo.repo_id, cid, [prev] if prev else []) prev = cid addrs = [f"src/f{i}.py::fn" for i in range(20)] for cid in commits: await _seed_history(db_session, repo.repo_id, cid, addrs) await db_session.commit() t0 = time.monotonic() await _run(db_session, repo.repo_id, commits[-1]) assert time.monotonic() - t0 < 2.0 @pytest.mark.asyncio async def test_CP_28_empty_repo_fast_path_under_50ms( self, db_session: AsyncSession, repo: MusehubRepo ) -> None: """Empty repo fast-path exits under 50 ms.""" t0 = time.monotonic() await _run(db_session, repo.repo_id, _cid()) assert time.monotonic() - t0 < 0.05 @pytest.mark.asyncio async def test_CP_29_rerun_not_5x_slower( self, db_session: AsyncSession, repo: MusehubRepo ) -> None: """Second run is not more than 5× slower than the first.""" c1, c2 = _cid(), _cid() await _seed_commit(db_session, repo.repo_id, c1) await _seed_commit(db_session, repo.repo_id, c2, [c1]) for cid in [c1, c2]: await _seed_history(db_session, repo.repo_id, cid, ["src/a.py::fn", "src/b.py::fn"]) await db_session.commit() t1 = time.monotonic(); await _run(db_session, repo.repo_id, c2); d1 = time.monotonic() - t1 t2 = time.monotonic(); await _run(db_session, repo.repo_id, c2); d2 = time.monotonic() - t2 assert d2 < max(d1 * 5, 0.5) @pytest.mark.asyncio async def test_CP_30_point_lookup_under_10ms( self, db_session: AsyncSession, repo: MusehubRepo ) -> None: """Fetching pairs for a repo is sub-10 ms after the provider run.""" c1, c2 = _cid(), _cid() await _seed_commit(db_session, repo.repo_id, c1) await _seed_commit(db_session, repo.repo_id, c2, [c1]) for cid in [c1, c2]: await _seed_history(db_session, repo.repo_id, cid, ["src/a.py::fn", "src/b.py::fn"]) await db_session.commit() await _run(db_session, repo.repo_id, c2) t0 = time.monotonic() await _fetch(db_session, repo.repo_id) assert time.monotonic() - t0 < 0.01 @pytest.mark.asyncio async def test_CP_31_200_pairs_query_fast( self, db_session: AsyncSession, repo: MusehubRepo ) -> None: """Fetching full 200-pair leaderboard is sub-50 ms.""" commits = [_cid() for _ in range(3)] prev = None for cid in commits: await _seed_commit(db_session, repo.repo_id, cid, [prev] if prev else []) prev = cid # 21 files → 210 pairs → stored as 200 (MAX_PAIRS) addrs = [f"src/f{i}.py::fn" for i in range(21)] for cid in commits: await _seed_history(db_session, repo.repo_id, cid, addrs) await db_session.commit() await _run(db_session, repo.repo_id, commits[-1]) t0 = time.monotonic() await _fetch(db_session, repo.repo_id) assert time.monotonic() - t0 < 0.05 @pytest.mark.asyncio async def test_CP_32_dashboard_preview_query_fast( self, db_session: AsyncSession, repo: MusehubRepo ) -> None: """Dashboard preview (top 3, LIMIT query) completes under 20 ms.""" commits = [_cid() for _ in range(3)] prev = None for cid in commits: await _seed_commit(db_session, repo.repo_id, cid, [prev] if prev else []) prev = cid addrs = [f"src/f{i}.py::fn" for i in range(6)] for cid in commits: await _seed_history(db_session, repo.repo_id, cid, addrs) await db_session.commit() await _run(db_session, repo.repo_id, commits[-1]) t0 = time.monotonic() await db_session.execute( sa.select(MusehubIntelCoupling) .where(MusehubIntelCoupling.repo_id == repo.repo_id) .order_by(sa.desc(MusehubIntelCoupling.co_changes)) .limit(3) ) assert time.monotonic() - t0 < 0.02 # ───────────────────────────────────────────────────────────────────────────── # Tier 5 — State: idempotency, stale-row purge, incremental updates # ───────────────────────────────────────────────────────────────────────────── class TestCouplingState: @pytest.mark.asyncio async def test_CP_33_idempotent_two_runs( self, db_session: AsyncSession, repo: MusehubRepo ) -> None: """Running the provider twice produces identical rows.""" c1, c2 = _cid(), _cid() await _seed_commit(db_session, repo.repo_id, c1) await _seed_commit(db_session, repo.repo_id, c2, [c1]) for cid in [c1, c2]: await _seed_history(db_session, repo.repo_id, cid, ["src/a.py::fn", "src/b.py::fn"]) await db_session.commit() await _run(db_session, repo.repo_id, c2) first = {(p.file_a, p.file_b, p.co_changes) for p in await _fetch(db_session, repo.repo_id)} await _run(db_session, repo.repo_id, c2) second = {(p.file_a, p.file_b, p.co_changes) for p in await _fetch(db_session, repo.repo_id)} assert first == second @pytest.mark.asyncio async def test_CP_34_stale_rows_purged_on_rerun( self, db_session: AsyncSession, repo: MusehubRepo ) -> None: """Re-run deletes all old rows before inserting fresh set.""" c1, c2 = _cid(), _cid() await _seed_commit(db_session, repo.repo_id, c1) await _seed_commit(db_session, repo.repo_id, c2, [c1]) for cid in [c1, c2]: await _seed_history(db_session, repo.repo_id, cid, ["src/a.py::fn", "src/b.py::fn"]) await db_session.commit() await _run(db_session, repo.repo_id, c2) count_after_first = (await db_session.execute( sa.select(sa.func.count()).select_from(MusehubIntelCoupling) .where(MusehubIntelCoupling.repo_id == repo.repo_id) )).scalar_one() await _run(db_session, repo.repo_id, c2) count_after_second = (await db_session.execute( sa.select(sa.func.count()).select_from(MusehubIntelCoupling) .where(MusehubIntelCoupling.repo_id == repo.repo_id) )).scalar_one() assert count_after_first == count_after_second @pytest.mark.asyncio async def test_CP_35_incremental_new_pair_appears( self, db_session: AsyncSession, repo: MusehubRepo ) -> None: """After adding commits, a new pair materialises on re-run.""" c1, c2 = _cid(), _cid() await _seed_commit(db_session, repo.repo_id, c1) await _seed_commit(db_session, repo.repo_id, c2, [c1]) for cid in [c1, c2]: await _seed_history(db_session, repo.repo_id, cid, ["src/a.py::fn", "src/b.py::fn"]) await db_session.commit() await _run(db_session, repo.repo_id, c2) before = len(await _fetch(db_session, repo.repo_id)) c3, c4 = _cid(), _cid() await _seed_commit(db_session, repo.repo_id, c3, [c2]) await _seed_commit(db_session, repo.repo_id, c4, [c3]) for cid in [c3, c4]: await _seed_history(db_session, repo.repo_id, cid, ["src/c.py::fn", "src/d.py::fn"]) await db_session.commit() await _run(db_session, repo.repo_id, c4) after = len(await _fetch(db_session, repo.repo_id)) assert after > before @pytest.mark.asyncio async def test_CP_36_no_duplicate_pairs_after_3_runs( self, db_session: AsyncSession, repo: MusehubRepo ) -> None: """No duplicate (file_a, file_b) rows after 3 consecutive runs.""" c1, c2 = _cid(), _cid() await _seed_commit(db_session, repo.repo_id, c1) await _seed_commit(db_session, repo.repo_id, c2, [c1]) for cid in [c1, c2]: await _seed_history(db_session, repo.repo_id, cid, ["src/a.py::fn", "src/b.py::fn"]) await db_session.commit() for _ in range(3): await _run(db_session, repo.repo_id, c2) pairs = await _fetch(db_session, repo.repo_id) keys = [(p.file_a, p.file_b) for p in pairs] assert len(keys) == len(set(keys)) @pytest.mark.asyncio async def test_CP_37_co_changes_increases_with_new_commits( self, db_session: AsyncSession, repo: MusehubRepo ) -> None: """co_changes increases when more co-change commits are added.""" c1, c2 = _cid(), _cid() await _seed_commit(db_session, repo.repo_id, c1) await _seed_commit(db_session, repo.repo_id, c2, [c1]) for cid in [c1, c2]: await _seed_history(db_session, repo.repo_id, cid, ["src/a.py::fn", "src/b.py::fn"]) await db_session.commit() await _run(db_session, repo.repo_id, c2) before = (await _fetch(db_session, repo.repo_id))[0].co_changes c3 = _cid() await _seed_commit(db_session, repo.repo_id, c3, [c2]) await _seed_history(db_session, repo.repo_id, c3, ["src/a.py::fn", "src/b.py::fn"]) await db_session.commit() await _run(db_session, repo.repo_id, c3) after = (await _fetch(db_session, repo.repo_id))[0].co_changes assert after > before @pytest.mark.asyncio async def test_CP_38_truncated_false_when_under_cap( self, db_session: AsyncSession, repo: MusehubRepo ) -> None: """truncated=False when pair count is within MAX_PAIRS.""" c1, c2 = _cid(), _cid() await _seed_commit(db_session, repo.repo_id, c1) await _seed_commit(db_session, repo.repo_id, c2, [c1]) for cid in [c1, c2]: await _seed_history(db_session, repo.repo_id, cid, ["src/a.py::fn", "src/b.py::fn"]) await db_session.commit() result = await _run(db_session, repo.repo_id, c2) key, payload = result[0] assert payload["truncated"] is False # ───────────────────────────────────────────────────────────────────────────── # Tier 6 — Security: injection, isolation, unicode # ───────────────────────────────────────────────────────────────────────────── class TestCouplingSecurity: @pytest.mark.asyncio async def test_CP_39_sql_injection_stored_verbatim( self, db_session: AsyncSession, repo: MusehubRepo ) -> None: """SQL injection in file path stored as-is; table survives.""" inject = "src/a.py::fn'; DROP TABLE musehub_intel_coupling; --" c1, c2 = _cid(), _cid() await _seed_commit(db_session, repo.repo_id, c1) await _seed_commit(db_session, repo.repo_id, c2, [c1]) for cid in [c1, c2]: await _seed_history(db_session, repo.repo_id, cid, [inject, "src/b.py::fn"]) await db_session.commit() await _run(db_session, repo.repo_id, c2) pairs = await _fetch(db_session, repo.repo_id) assert isinstance(pairs, list) @pytest.mark.asyncio async def test_CP_40_xss_payload_stored_safely( self, db_session: AsyncSession, repo: MusehubRepo ) -> None: """XSS payload in file path stored without execution.""" xss = "src/.py::fn" c1, c2 = _cid(), _cid() await _seed_commit(db_session, repo.repo_id, c1) await _seed_commit(db_session, repo.repo_id, c2, [c1]) for cid in [c1, c2]: await _seed_history(db_session, repo.repo_id, cid, [xss, "src/b.py::fn"]) await db_session.commit() await _run(db_session, repo.repo_id, c2) pairs = await _fetch(db_session, repo.repo_id) assert isinstance(pairs, list) @pytest.mark.asyncio async def test_CP_41_repo_isolation_strict( self, db_session: AsyncSession, two_repos: tuple[MusehubRepo, MusehubRepo] ) -> None: """Pairs from repo A are never visible when querying repo B.""" r1, r2 = two_repos c1, c2 = _cid(), _cid() await _seed_commit(db_session, r1.repo_id, c1) await _seed_commit(db_session, r1.repo_id, c2, [c1]) for cid in [c1, c2]: await _seed_history(db_session, r1.repo_id, cid, ["src/a.py::fn", "src/b.py::fn"]) await db_session.commit() await _run(db_session, r1.repo_id, c2) assert await _fetch(db_session, r2.repo_id) == [] @pytest.mark.asyncio async def test_CP_42_two_repos_independent_pairs( self, db_session: AsyncSession, two_repos: tuple[MusehubRepo, MusehubRepo] ) -> None: """Two repos each produce their own independent pair sets.""" r1, r2 = two_repos for repo in [r1, r2]: c1, c2 = _cid(), _cid() await _seed_commit(db_session, repo.repo_id, c1) await _seed_commit(db_session, repo.repo_id, c2, [c1]) for cid in [c1, c2]: await _seed_history(db_session, repo.repo_id, cid, ["src/a.py::fn", "src/b.py::fn"]) await db_session.commit() await _run(db_session, repo.repo_id, c2) p1 = await _fetch(db_session, r1.repo_id) p2 = await _fetch(db_session, r2.repo_id) assert len(p1) == 1 and p1[0].repo_id == r1.repo_id assert len(p2) == 1 and p2[0].repo_id == r2.repo_id @pytest.mark.asyncio async def test_CP_43_rerun_updates_ref_column( self, db_session: AsyncSession, repo: MusehubRepo ) -> None: """Re-run for a new ref updates the ref column on all rows.""" c1, c2, c3 = _cid(), _cid(), _cid() await _seed_commit(db_session, repo.repo_id, c1) await _seed_commit(db_session, repo.repo_id, c2, [c1]) await _seed_commit(db_session, repo.repo_id, c3, [c2]) for cid in [c1, c2, c3]: await _seed_history(db_session, repo.repo_id, cid, ["src/a.py::fn", "src/b.py::fn"]) await db_session.commit() await _run(db_session, repo.repo_id, c2) await _run(db_session, repo.repo_id, c3) pairs = await _fetch(db_session, repo.repo_id) assert all(p.ref == c3 for p in pairs) @pytest.mark.asyncio async def test_CP_44_unicode_in_path_handled( self, db_session: AsyncSession, repo: MusehubRepo ) -> None: """Unicode characters in file paths do not crash the provider.""" c1, c2 = _cid(), _cid() await _seed_commit(db_session, repo.repo_id, c1) await _seed_commit(db_session, repo.repo_id, c2, [c1]) for cid in [c1, c2]: await _seed_history(db_session, repo.repo_id, cid, ["src/música.py::canción", "src/b.py::fn"]) await db_session.commit() await _run(db_session, repo.repo_id, c2) assert isinstance(await _fetch(db_session, repo.repo_id), list) # ───────────────────────────────────────────────────────────────────────────── # Tier 7 — Stress: MAX_PAIRS cap, mass-commit exclusion, BFS cap # ───────────────────────────────────────────────────────────────────────────── class TestCouplingStress: @pytest.mark.asyncio async def test_CP_45_max_pairs_cap_respected( self, db_session: AsyncSession, repo: MusehubRepo ) -> None: """Stored pair count never exceeds MAX_PAIRS.""" provider = CouplingProvider() commits = [_cid() for _ in range(3)] prev = None for cid in commits: await _seed_commit(db_session, repo.repo_id, cid, [prev] if prev else []) prev = cid # 21 files → 210 pairs; exceeds MAX_PAIRS=200 addrs = [f"src/file_{i}.py::fn" for i in range(21)] for cid in commits: await _seed_history(db_session, repo.repo_id, cid, addrs) await db_session.commit() await _run(db_session, repo.repo_id, commits[-1]) pairs = await _fetch(db_session, repo.repo_id) assert len(pairs) <= provider._MAX_PAIRS @pytest.mark.asyncio async def test_CP_46_mass_commit_excluded( self, db_session: AsyncSession, repo: MusehubRepo ) -> None: """Commits touching > MAX_FILES_PER_COMMIT files are skipped.""" provider = CouplingProvider() c_good1, c_good2, c_mass = _cid(), _cid(), _cid() await _seed_commit(db_session, repo.repo_id, c_good1) await _seed_commit(db_session, repo.repo_id, c_good2, [c_good1]) await _seed_commit(db_session, repo.repo_id, c_mass, [c_good2]) for cid in [c_good1, c_good2]: await _seed_history(db_session, repo.repo_id, cid, ["src/a.py::fn", "src/b.py::fn"]) # Mass commit: 250 distinct files big_addrs = [f"src/gen_{i}.py::fn" for i in range(provider._MAX_FILES_PER_COMMIT + 50)] await _seed_history(db_session, repo.repo_id, c_mass, big_addrs) await db_session.commit() await _run(db_session, repo.repo_id, c_mass) pairs = await _fetch(db_session, repo.repo_id) # The A↔B pair from good commits must still be present assert any( "src/a.py" in (p.file_a, p.file_b) for p in pairs ) @pytest.mark.asyncio async def test_CP_47_500_commits_completes( self, db_session: AsyncSession, repo: MusehubRepo ) -> None: """500 commits × 5 files completes without error.""" commits = [_cid() for _ in range(500)] prev = None for cid in commits: await _seed_commit(db_session, repo.repo_id, cid, [prev] if prev else []) prev = cid addrs = [f"src/f{i}.py::fn" for i in range(5)] for cid in commits: await _seed_history(db_session, repo.repo_id, cid, addrs) await db_session.commit() result = await _run(db_session, repo.repo_id, commits[-1]) assert result @pytest.mark.asyncio async def test_CP_48_result_count_matches_stored( self, db_session: AsyncSession, repo: MusehubRepo ) -> None: """metadata 'count' always equals len(stored rows).""" commits = [_cid() for _ in range(4)] prev = None for cid in commits: await _seed_commit(db_session, repo.repo_id, cid, [prev] if prev else []) prev = cid addrs = [f"src/f{i}.py::fn" for i in range(6)] for cid in commits: await _seed_history(db_session, repo.repo_id, cid, addrs) await db_session.commit() result = await _run(db_session, repo.repo_id, commits[-1]) key, payload = result[0] stored = await _fetch(db_session, repo.repo_id) assert payload["count"] == len(stored) @pytest.mark.asyncio async def test_CP_49_bfs_walk_cap( self, db_session: AsyncSession, repo: MusehubRepo ) -> None: """commits_analysed never exceeds MAX_WALK.""" provider = CouplingProvider() commits = [_cid() for _ in range(50)] prev = None for cid in commits: await _seed_commit(db_session, repo.repo_id, cid, [prev] if prev else []) prev = cid await _seed_history(db_session, repo.repo_id, commits[0], ["src/a.py::fn", "src/b.py::fn"]) await db_session.commit() result = await _run(db_session, repo.repo_id, commits[-1]) if result: key, payload = result[0] assert payload["commits_analysed"] <= provider._MAX_WALK # ───────────────────────────────────────────────────────────────────────────── # Helpers — _cp_short correctness # ───────────────────────────────────────────────────────────────────────────── class TestCpShort: """Unit tests for the _cp_short display helper.""" def test_deep_path_truncated_to_two_parts(self) -> None: assert _cp_short("musehub/services/musehub_wire.py") == "services/musehub_wire.py" def test_single_component_unchanged(self) -> None: assert _cp_short("musehub_wire.py") == "musehub_wire.py" def test_two_components_unchanged(self) -> None: assert _cp_short("services/musehub_wire.py") == "services/musehub_wire.py" def test_very_deep_path(self) -> None: assert _cp_short("a/b/c/d/e.py") == "d/e.py"