test_proposal_list_phase7.py
file-level
1
files
1
commits
0
hotspots
0
π§ dead
0
π₯ blast risk
| 1 | """Phase 7 — Index optimization test harness for issue #35. |
| 2 | |
| 3 | Tier 6 — Performance |
| 4 | Verifies that the 4 composite indexes added in migration 0044 are actually |
| 5 | used by the PostgreSQL query planner for the proposal list query patterns. |
| 6 | |
| 7 | Each test runs EXPLAIN (FORMAT JSON, ANALYZE FALSE) — the query itself is |
| 8 | never executed, so there are no timing side-effects. We assert that the plan |
| 9 | contains an "Index Scan" or "Bitmap Index Scan" node on the expected index |
| 10 | name, proving that the planner will choose the index rather than a seq scan. |
| 11 | |
| 12 | Not a correctness test — correctness is covered by Phase 1–3 tests. |
| 13 | Not a latency test — CI hardware is too variable for absolute thresholds. |
| 14 | """ |
| 15 | |
| 16 | from __future__ import annotations |
| 17 | |
| 18 | import json |
| 19 | import uuid |
| 20 | from datetime import datetime, timezone |
| 21 | |
| 22 | import pytest |
| 23 | from sqlalchemy import text |
| 24 | from sqlalchemy.ext.asyncio import AsyncSession |
| 25 | |
| 26 | from musehub.types.json_types import JSONValue, StrDict |
| 27 | |
| 28 | |
| 29 | # ───────────────────────────────────────────────────────────────────────────── |
| 30 | # Helpers |
| 31 | # ───────────────────────────────────────────────────────────────────────────── |
| 32 | |
| 33 | |
| 34 | def _now() -> datetime: |
| 35 | return datetime.now(tz=timezone.utc) |
| 36 | |
| 37 | |
async def _make_repo(session: AsyncSession) -> str:
    """Insert a throwaway public repository row and return its ``repo_id``.

    The repo is owned by the fixed handle ``p7user`` and gets a slug with a
    random UUID fragment, so repeated calls produce distinct rows. The row is
    committed before the function returns.

    Args:
        session: Async SQLAlchemy session used to add and commit the row.

    Returns:
        The id produced by ``compute_repo_id`` for the newly inserted repo.
    """
    # Project imports are kept function-local, as in the rest of this helper.
    from musehub.core.genesis import compute_identity_id, compute_repo_id
    from musehub.db.musehub_repo_models import MusehubRepo

    owner = "p7user"
    slug = f"repo-{uuid.uuid4().hex[:8]}"
    owner_id = compute_identity_id(owner.encode())
    created_at = _now()
    repo_id = compute_repo_id(owner_id, slug, "code", created_at.isoformat())

    session.add(
        MusehubRepo(
            repo_id=repo_id,
            name=slug,
            owner=owner,
            slug=slug,
            visibility="public",
            owner_user_id=owner_id,
            description="",
            tags=[],
            created_at=created_at,
        )
    )
    await session.commit()
    # Return the locally computed id rather than reading ``repo.repo_id``:
    # after commit() ORM attributes are expired by default, and touching one
    # on an AsyncSession would need implicit I/O unless the session was
    # created with expire_on_commit=False.
    return repo_id
| 59 | |
| 60 | |
| 61 | def _plan_nodes(plan: JSONValue) -> list[str]: |
| 62 | """Recursively collect all 'Node Type' values from a EXPLAIN JSON plan.""" |
| 63 | nodes: list[str] = [] |
| 64 | if isinstance(plan, dict): |
| 65 | if "Node Type" in plan: |
| 66 | nodes.append(plan["Node Type"]) |
| 67 | for v in plan.values(): |
| 68 | nodes.extend(_plan_nodes(v)) |
| 69 | elif isinstance(plan, list): |
| 70 | for item in plan: |
| 71 | nodes.extend(_plan_nodes(item)) |
| 72 | return nodes |
| 73 | |
| 74 | |
| 75 | def _plan_indexes(plan: JSONValue) -> list[str]: |
| 76 | """Recursively collect all 'Index Name' values from a EXPLAIN JSON plan.""" |
| 77 | indexes: list[str] = [] |
| 78 | if isinstance(plan, dict): |
| 79 | if "Index Name" in plan: |
| 80 | indexes.append(plan["Index Name"]) |
| 81 | for v in plan.values(): |
| 82 | indexes.extend(_plan_indexes(v)) |
| 83 | elif isinstance(plan, list): |
| 84 | for item in plan: |
| 85 | indexes.extend(_plan_indexes(item)) |
| 86 | return indexes |
| 87 | |
| 88 | |
async def _explain(session: AsyncSession, sql: str, params: StrDict) -> JSONValue:
    """Ask the database for the JSON query plan of ``sql`` without running it.

    The statement is prefixed with ``EXPLAIN (FORMAT JSON, ANALYZE FALSE)``
    and executed on ``session`` with ``params`` as bind values.

    Args:
        session: Async SQLAlchemy session to execute on.
        sql: The statement to explain; it is interpolated after the prefix.
        params: Bind parameters referenced by ``sql``.

    Returns:
        The single scalar value of the EXPLAIN result, i.e. the plan document.
    """
    statement = text(f"EXPLAIN (FORMAT JSON, ANALYZE FALSE) {sql}")
    outcome = await session.execute(statement, params)
    return outcome.scalar_one()
| 94 | |
| 95 | |
def _has_index_scan(plan: JSONValue, index_name: str) -> bool:
    """Report whether any node of ``plan`` references the index ``index_name``.

    The check looks only at ``"Index Name"`` entries, so any node type that
    carries that key counts as a match.

    Args:
        plan: A decoded EXPLAIN (FORMAT JSON) document.
        index_name: Exact name of the index to look for.

    Returns:
        True when ``index_name`` appears among the plan's index names.
    """
    referenced = _plan_indexes(plan)
    return index_name in referenced
| 99 | |
| 100 | |
| 101 | # ───────────────────────────────────────────────────────────────────────────── |
| 102 | # Tests |
| 103 | # ───────────────────────────────────────────────────────────────────────────── |
| 104 | |
| 105 | |
class TestProposalListIndexes:
    """EXPLAIN harness checking that the planner picks the Phase 7 indexes.

    Every plan is obtained with ANALYZE FALSE, so the queries under test are
    planned but never executed.
    """

    @pytest.mark.asyncio
    async def test_ix_repo_state_created_used_for_list_sort_newest(
        self, db_session: AsyncSession
    ) -> None:
        """Newest-first list query should be served by a repo/state index."""
        repo_id = await _make_repo(db_session)
        query = """
            SELECT proposal_id FROM musehub_proposals
            WHERE repo_id = :repo_id AND state = 'open'
            ORDER BY created_at DESC
            LIMIT 50
        """
        plan = await _explain(db_session, query, {"repo_id": repo_id})
        # Either the date-aware composite index or the plain repo/state index
        # is an acceptable choice for the planner.
        acceptable = (
            "ix_musehub_proposals_repo_state_created",
            "ix_musehub_proposals_repo_state",
        )
        assert any(_has_index_scan(plan, name) for name in acceptable), (
            f"Expected index scan on repo_state_created, got plan:\n{json.dumps(plan, indent=2)[:800]}"
        )

    @pytest.mark.asyncio
    async def test_ix_repo_state_risk_used_for_risk_sort(
        self, db_session: AsyncSession
    ) -> None:
        """Risk-sorted list query should use index-based access."""
        repo_id = await _make_repo(db_session)
        query = """
            SELECT proposal_id FROM musehub_proposals
            WHERE repo_id = :repo_id AND state = 'open'
            ORDER BY risk_score DESC NULLS LAST
            LIMIT 50
        """
        plan = await _explain(db_session, query, {"repo_id": repo_id})
        # Which proposal index gets picked depends on table statistics, so
        # the test only requires that some index-based node is in the plan.
        index_access = ("Index Scan", "Bitmap Index Scan", "Index Only Scan")
        node_types = _plan_nodes(plan)
        assert any(kind in node_types for kind in index_access), (
            f"Expected index-based access for risk sort, got plan:\n{json.dumps(plan, indent=2)[:800]}"
        )

    @pytest.mark.asyncio
    async def test_ix_review_proposal_state_used_for_approval_prefetch(
        self, db_session: AsyncSession
    ) -> None:
        """Approval prefetch should use index-based access on proposal reviews."""
        fake_pid = f"sha256:{'a' * 64}"
        query = """
            SELECT proposal_id, COUNT(*) as approved_ct
            FROM musehub_proposal_reviews
            WHERE proposal_id IN (:pid) AND state = 'approved'
            GROUP BY proposal_id
        """
        plan = await _explain(db_session, query, {"pid": fake_pid})
        # The existing proposal_id index and the new composite index are both
        # fine; the test only requires an index-based node type in the plan.
        index_access = ("Index Scan", "Bitmap Index Scan", "Index Only Scan")
        node_types = _plan_nodes(plan)
        assert any(kind in node_types for kind in index_access), (
            f"Expected index-based access on proposal_reviews, got nodes={node_types}\nplan:\n{json.dumps(plan, indent=2)[:600]}"
        )

    @pytest.mark.asyncio
    async def test_ix_identity_handle_type_used_for_author_type_filter(
        self, db_session: AsyncSession
    ) -> None:
        """Author-type filter should use index-based access on identities."""
        query = """
            SELECT handle FROM musehub_identities
            WHERE handle IN (:h) AND identity_type = 'agent'
        """
        plan = await _explain(db_session, query, {"h": "gabriel"})
        # The unique handle constraint and the new composite index are both
        # fine; the test only requires an index-based node type in the plan.
        index_access = ("Index Scan", "Bitmap Index Scan", "Index Only Scan")
        node_types = _plan_nodes(plan)
        assert any(kind in node_types for kind in index_access), (
            f"Expected index-based access on musehub_identities, got nodes={node_types}\nplan:\n{json.dumps(plan, indent=2)[:600]}"
        )

    @pytest.mark.asyncio
    async def test_all_four_indexes_exist_in_pg_catalog(
        self, db_session: AsyncSession
    ) -> None:
        """All four Phase 7 indexes must be listed in ``pg_indexes``."""
        expected = {
            "ix_musehub_proposals_repo_state_created",
            "ix_musehub_proposals_repo_state_risk",
            "ix_musehub_proposal_reviews_proposal_state",
            "ix_musehub_identities_handle_type",
        }
        lookup = text(
            "SELECT indexname FROM pg_indexes "
            "WHERE schemaname = 'public' AND indexname = ANY(:names)"
        )
        result = await db_session.execute(lookup, {"names": list(expected)})
        found = {row[0] for row in result.fetchall()}
        missing = expected - found
        assert not missing, f"Phase 7 indexes missing from pg_indexes: {missing}"