test_repo_card_security.py
file-level
1
files
1
commits
0
hotspots
0
π§ dead
0
π₯ blast risk
| 1 | """ |
| 2 | Tier 8 — Security tests for enrich_repo_cards(). |
| 3 | |
| 4 | Security tests verify that malicious inputs cannot cause SQL injection, data |
| 5 | leakage across repo boundaries, or information disclosure via error messages. |
| 6 | |
| 7 | Test IDs |
| 8 | -------- |
| 9 | T800 — SQL injection in repo_id does not execute arbitrary SQL |
| 10 | T801 — enriching one repo never returns intel data seeded for a different repo |
| 11 | T802 — symbol address containing SQL metacharacters is stored and returned safely |
| 12 | T803 — extreme churn_30d / blast values do not overflow or corrupt results |
| 13 | T804 — NULL / empty-string agent_id is never counted as an agent commit |
| 14 | T805 — enrichment of a non-existent repo_id returns zero-value, not a 500 |
| 15 | """ |
| 16 | from __future__ import annotations |
| 17 | |
| 18 | from datetime import datetime, timezone |
| 19 | |
| 20 | import pytest |
| 21 | from sqlalchemy.ext.asyncio import AsyncSession |
| 22 | from sqlalchemy import text |
| 23 | |
| 24 | from musehub.db.musehub_intel_models import MusehubSymbolIntel |
| 25 | from musehub.services.repo_card_enrichment import enrich_repo_cards |
| 26 | from tests.factories import create_commit, create_repo |
| 27 | |
| 28 | |
def _utc_now() -> datetime:
    """Return the current moment as a timezone-aware ``datetime`` in UTC."""
    return datetime.now(timezone.utc)
| 31 | |
| 32 | |
| 33 | # --------------------------------------------------------------------------- |
| 34 | # T800 — SQL injection in repo_id |
| 35 | # --------------------------------------------------------------------------- |
| 36 | |
@pytest.mark.asyncio
async def test_t800_sql_injection_in_repo_id(db_session: AsyncSession) -> None:
    """T800: a crafted repo_id containing SQL does not execute arbitrary SQL.

    A real "bystander" repo with one symbol-intel row is seeded first.  Without
    it the tables are empty, so an injection that widened the WHERE clause
    (``OR '1'='1'``) or unioned in foreign rows would still produce zero-value
    results and the test would pass vacuously.

    After the call the test also confirms that the bystander's data is still
    readable and that the ``musehub_commits`` table named in the DROP TABLE
    payload can still be queried.
    """
    bystander = await create_repo(db_session, visibility="public")
    bystander_address = "src/bystander.py::real_fn"
    db_session.add(MusehubSymbolIntel(
        repo_id=bystander.repo_id,
        address=bystander_address,
        churn_30d=7,
        blast=3,
    ))
    await db_session.commit()

    # None of these IDs matches a real row; each must be treated as an opaque
    # string value, never as SQL.
    injection_ids = [
        "sha256:' OR '1'='1",
        "sha256:; DROP TABLE musehub_commits; --",
        "sha256:\" UNION SELECT 1,2,3 --",
        "sha256:" + "a" * 64,  # valid-looking but nonexistent
    ]

    # Must not raise; must return one zero-value enrichment per requested ID.
    result = await enrich_repo_cards(db_session, injection_ids)
    assert len(result) == len(injection_ids)
    for repo_id in injection_ids:
        assert repo_id in result, f"missing enrichment for {repo_id!r}"
        enc = result[repo_id]
        assert enc.autonomy_pct == 0
        assert enc.dead_count == 0
        # A non-None symbol here would mean the bystander's row leaked through.
        assert enc.hottest_symbol is None
        assert enc.blast_leader is None

    # The bystander's intel must be untouched by the malicious IDs.
    after = await enrich_repo_cards(db_session, [bystander.repo_id])
    survivor = after[bystander.repo_id]
    assert survivor.hottest_symbol is not None
    assert survivor.hottest_symbol.address == bystander_address

    # Raises if the DROP TABLE payload was actually executed.
    await db_session.execute(text("SELECT COUNT(*) FROM musehub_commits"))
| 56 | |
| 57 | |
| 58 | # --------------------------------------------------------------------------- |
| 59 | # T801 — cross-repo isolation |
| 60 | # --------------------------------------------------------------------------- |
| 61 | |
@pytest.mark.asyncio
async def test_t801_cross_repo_data_isolation(db_session: AsyncSession) -> None:
    """T801: intel rows stored under repo B never surface when enriching repo A."""
    repo_a = await create_repo(db_session, visibility="public")
    repo_b = await create_repo(db_session, visibility="public")

    # Only repo_b receives a symbol-intel row; repo_a is left without any.
    foreign_intel = MusehubSymbolIntel(
        repo_id=repo_b.repo_id,
        address="src/secret.py::private_fn",
        churn_30d=999,
        blast=999,
    )
    db_session.add(foreign_intel)
    await db_session.commit()

    # repo_b is deliberately absent from the request.
    enrichments = await enrich_repo_cards(db_session, [repo_a.repo_id])
    card_a = enrichments[repo_a.repo_id]

    assert card_a.hottest_symbol is None, "repo_a should not see repo_b's symbols"
    assert card_a.blast_leader is None
    assert card_a.dead_count == 0
    assert card_a.autonomy_pct == 0
| 85 | |
| 86 | |
| 87 | # --------------------------------------------------------------------------- |
| 88 | # T802 — SQL metacharacters in symbol address |
| 89 | # --------------------------------------------------------------------------- |
| 90 | |
@pytest.mark.asyncio
async def test_t802_metacharacters_in_symbol_address(db_session: AsyncSession) -> None:
    """T802: an address holding single and double quotes comes back unchanged."""
    dangerous_address = "src/a.py::fn_with_'quotes'_and_\"doubles\""

    repo = await create_repo(db_session, visibility="public")
    quoted_symbol = MusehubSymbolIntel(
        repo_id=repo.repo_id,
        address=dangerous_address,
        churn_30d=5,
        blast=0,
    )
    db_session.add(quoted_symbol)
    await db_session.commit()

    enrichments = await enrich_repo_cards(db_session, [repo.repo_id])
    hottest = enrichments[repo.repo_id].hottest_symbol

    # The stored address must round-trip exactly, quotes included.
    assert hottest is not None
    assert hottest.address == dangerous_address
| 109 | |
| 110 | |
| 111 | # --------------------------------------------------------------------------- |
| 112 | # T803 — extreme numeric values do not overflow |
| 113 | # --------------------------------------------------------------------------- |
| 114 | |
@pytest.mark.asyncio
async def test_t803_extreme_numeric_values_safe(db_session: AsyncSession) -> None:
    """T803: INT32_MAX churn_30d / blast values come back intact from enrichment."""
    int32_max = 2_147_483_647

    repo = await create_repo(db_session, visibility="public")
    giant_symbol = MusehubSymbolIntel(
        repo_id=repo.repo_id,
        address="src/big.py::giant_fn",
        churn_30d=int32_max,
        blast=int32_max,
    )
    db_session.add(giant_symbol)
    await db_session.commit()

    enrichments = await enrich_repo_cards(db_session, [repo.repo_id])
    card = enrichments[repo.repo_id]

    # Both leaders must report the stored value exactly.
    assert card.hottest_symbol is not None
    assert card.hottest_symbol.churn_30d == int32_max
    assert card.blast_leader is not None
    assert card.blast_leader.blast == int32_max
    # The percentage must remain a valid 0-100 value.
    assert 0 <= card.autonomy_pct <= 100
| 136 | |
| 137 | |
| 138 | # --------------------------------------------------------------------------- |
| 139 | # T804 — NULL / empty agent_id never counted as agent |
| 140 | # --------------------------------------------------------------------------- |
| 141 | |
@pytest.mark.asyncio
async def test_t804_null_and_empty_agent_id_not_counted(db_session: AsyncSession) -> None:
    """T804: commits whose agent_id is NULL or '' add nothing to autonomy_pct."""
    repo = await create_repo(db_session, visibility="public")

    # Three commits created without an agent_id argument
    # (presumably the factory leaves the column NULL — verify against tests.factories).
    for _ in range(3):
        await create_commit(db_session, repo.repo_id, timestamp=_utc_now())

    # A fourth commit whose agent_id is forced to the empty string via raw SQL.
    blank_agent_commit = await create_commit(
        db_session, repo.repo_id, timestamp=_utc_now()
    )
    blank_out_agent = text(
        "UPDATE musehub_commits SET agent_id = '' WHERE commit_id = :cid"
    )
    await db_session.execute(blank_out_agent, {"cid": blank_agent_commit.commit_id})
    await db_session.commit()

    enrichments = await enrich_repo_cards(db_session, [repo.repo_id])
    card = enrichments[repo.repo_id]
    assert card.autonomy_pct == 0, (
        "NULL and empty agent_id should contribute 0 to autonomy_pct"
    )
| 163 | |
| 164 | |
| 165 | # --------------------------------------------------------------------------- |
| 166 | # T805 — non-existent repo_id returns zero-value, not a 500 |
| 167 | # --------------------------------------------------------------------------- |
| 168 | |
@pytest.mark.asyncio
async def test_t805_nonexistent_repo_id_returns_zero_value(db_session: AsyncSession) -> None:
    """T805: an unknown repo_id yields a zero-value enrichment instead of raising."""
    ghost_id = f"sha256:{'0' * 64}"

    enrichments = await enrich_repo_cards(db_session, [ghost_id])

    # The unknown ID must still get an entry of its own.
    assert ghost_id in enrichments
    card = enrichments[ghost_id]

    # Every numeric signal is zero.
    assert card.autonomy_pct == 0
    assert card.dead_count == 0
    assert card.error_count == 0
    assert card.warning_count == 0

    # No symbol is singled out.
    assert card.hottest_symbol is None
    assert card.blast_leader is None

    # Status and pulse fall back to their empty-repo values.
    assert card.health_status == "clean"
    assert len(card.pulse_buckets) == 30
| 185 | assert len(enc.pulse_buckets) == 30 |