"""
Tier 8 — Security tests for enrich_repo_cards().

Security tests verify that malicious inputs cannot cause SQL injection,
data leakage across repo boundaries, or information disclosure via error
messages.

Test IDs
--------
T800 — SQL injection in repo_id does not execute arbitrary SQL
T801 — intel data seeded for one repo is never returned when enriching a different repo
T802 — symbol address containing SQL metacharacters is stored and returned safely
T803 — extreme churn_30d / blast values do not overflow or corrupt results
T804 — NULL / empty-string agent_id is never counted as an agent commit
T805 — enrichment of a non-existent repo_id returns zero-value, not a 500
"""
from __future__ import annotations

from datetime import datetime, timezone

import pytest
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession

from musehub.db.musehub_intel_models import MusehubSymbolIntel
from musehub.services.repo_card_enrichment import enrich_repo_cards
from tests.factories import create_commit, create_repo


def _utc_now() -> datetime:
    """Return the current time as a timezone-aware UTC datetime."""
    return datetime.now(tz=timezone.utc)


# ---------------------------------------------------------------------------
# T800 — SQL injection in repo_id
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_t800_sql_injection_in_repo_id(db_session: AsyncSession) -> None:
    """T800: a crafted repo_id containing SQL does not execute arbitrary SQL."""
    # Seed a decoy repo with real rows first.  Without any data in the tables
    # the zero-value assertions below would pass even if a payload such as
    # "' OR '1'='1" matched every row, making the test vacuous.
    decoy = await create_repo(db_session, visibility="public")
    decoy_commit = await create_commit(db_session, decoy.repo_id, timestamp=_utc_now())
    # Read the id before committing so the later check does not depend on
    # attribute state after the commit.
    decoy_commit_id = decoy_commit.commit_id
    db_session.add(MusehubSymbolIntel(
        repo_id=decoy.repo_id,
        address="src/decoy.py::decoy_fn",
        churn_30d=7,
        blast=7,
    ))
    await db_session.commit()

    # These IDs will never match any real row — the important thing is that
    # they do not raise an error or corrupt query results.
    injection_ids = [
        "sha256:' OR '1'='1",
        "sha256:; DROP TABLE musehub_commits; --",
        "sha256:\" UNION SELECT 1,2,3 --",
        "sha256:" + "a" * 64,  # valid-looking but nonexistent
    ]

    # Must not raise; must return zero-value enrichments for all IDs
    result = await enrich_repo_cards(db_session, injection_ids)

    assert len(result) == len(injection_ids)
    for enc in result.values():
        assert enc.autonomy_pct == 0
        assert enc.dead_count == 0
        # Would be populated with the decoy symbol if a payload widened the
        # WHERE clause to match other repos' rows.
        assert enc.hottest_symbol is None

    # The DROP TABLE payload must not have executed: the decoy commit row is
    # still present and the table is still queryable.
    surviving = await db_session.execute(
        text("SELECT COUNT(*) FROM musehub_commits WHERE commit_id = :cid"),
        {"cid": decoy_commit_id},
    )
    assert surviving.scalar_one() == 1, "musehub_commits rows must survive injection attempts"


# ---------------------------------------------------------------------------
# T801 — cross-repo isolation
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_t801_cross_repo_data_isolation(db_session: AsyncSession) -> None:
    """T801: enriching repo A cannot read intel data seeded for repo B."""
    repo_a = await create_repo(db_session, visibility="public")
    repo_b = await create_repo(db_session, visibility="public")

    # Seed symbol intel exclusively on repo_b
    db_session.add(MusehubSymbolIntel(
        repo_id=repo_b.repo_id,
        address="src/secret.py::private_fn",
        churn_30d=999,
        blast=999,
    ))
    await db_session.commit()

    # Enrich only repo_a
    result = await enrich_repo_cards(db_session, [repo_a.repo_id])
    enc_a = result[repo_a.repo_id]

    assert enc_a.hottest_symbol is None, "repo_a should not see repo_b's symbols"
    assert enc_a.blast_leader is None
    assert enc_a.dead_count == 0
    assert enc_a.autonomy_pct == 0

    # Positive control: enrich both repos in one batch.  repo_b must see its
    # own symbol (so the isolation assertions above are not passing merely
    # because the seeded row is invisible to enrichment), and repo_a must
    # still see nothing when it shares a batch with repo_b.
    batched = await enrich_repo_cards(db_session, [repo_a.repo_id, repo_b.repo_id])
    assert batched[repo_a.repo_id].hottest_symbol is None, (
        "repo_a should not see repo_b's symbols when enriched in the same batch"
    )
    enc_b = batched[repo_b.repo_id]
    assert enc_b.hottest_symbol is not None
    assert enc_b.hottest_symbol.address == "src/secret.py::private_fn"


# ---------------------------------------------------------------------------
# T802 — SQL metacharacters in symbol address
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_t802_metacharacters_in_symbol_address(db_session: AsyncSession) -> None:
    """T802: symbol addresses containing SQL metacharacters round-trip safely."""
    repo = await create_repo(db_session, visibility="public")

    # Single and double quotes are the characters most likely to break a
    # string-built query; the address must come back unchanged.
    dangerous_address = "src/a.py::fn_with_'quotes'_and_\"doubles\""
    db_session.add(MusehubSymbolIntel(
        repo_id=repo.repo_id,
        address=dangerous_address,
        churn_30d=5,
        blast=0,
    ))
    await db_session.commit()

    result = await enrich_repo_cards(db_session, [repo.repo_id])
    enc = result[repo.repo_id]

    assert enc.hottest_symbol is not None
    assert enc.hottest_symbol.address == dangerous_address


# ---------------------------------------------------------------------------
# T803 — extreme numeric values do not overflow
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_t803_extreme_numeric_values_safe(db_session: AsyncSession) -> None:
    """T803: very large churn_30d / blast values do not corrupt enrichment results."""
    repo = await create_repo(db_session, visibility="public")

    db_session.add(MusehubSymbolIntel(
        repo_id=repo.repo_id,
        address="src/big.py::giant_fn",
        churn_30d=2_147_483_647,  # INT32_MAX
        blast=2_147_483_647,
    ))
    await db_session.commit()

    result = await enrich_repo_cards(db_session, [repo.repo_id])
    enc = result[repo.repo_id]

    assert enc.hottest_symbol is not None
    assert enc.hottest_symbol.churn_30d == 2_147_483_647
    assert enc.blast_leader is not None
    assert enc.blast_leader.blast == 2_147_483_647
    # autonomy_pct must still be in bounds
    assert 0 <= enc.autonomy_pct <= 100


# ---------------------------------------------------------------------------
# T804 — NULL / empty agent_id never counted as agent
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_t804_null_and_empty_agent_id_not_counted(db_session: AsyncSession) -> None:
    """T804: commits with agent_id = NULL or '' are never counted as agent commits."""
    repo = await create_repo(db_session, visibility="public")

    # Insert commits with NULL agent_id (default from factory)
    for _ in range(3):
        await create_commit(db_session, repo.repo_id, timestamp=_utc_now())

    # Insert a commit with explicit empty string.  The value is written with
    # a raw UPDATE after the factory has created the row.
    commit = await create_commit(db_session, repo.repo_id, timestamp=_utc_now())
    await db_session.execute(
        text("UPDATE musehub_commits SET agent_id = '' WHERE commit_id = :cid"),
        {"cid": commit.commit_id},
    )
    await db_session.commit()

    result = await enrich_repo_cards(db_session, [repo.repo_id])

    assert result[repo.repo_id].autonomy_pct == 0, (
        "NULL and empty agent_id should contribute 0 to autonomy_pct"
    )


# ---------------------------------------------------------------------------
# T805 — non-existent repo_id returns zero-value, not a 500
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_t805_nonexistent_repo_id_returns_zero_value(db_session: AsyncSession) -> None:
    """T805: a repo_id with no matching rows returns a zero-value enrichment without raising."""
    ghost_id = "sha256:" + "0" * 64

    result = await enrich_repo_cards(db_session, [ghost_id])

    assert ghost_id in result
    enc = result[ghost_id]
    assert enc.autonomy_pct == 0
    assert enc.hottest_symbol is None
    assert enc.blast_leader is None
    assert enc.dead_count == 0
    assert enc.error_count == 0
    assert enc.warning_count == 0
    assert enc.health_status == "clean"
    assert len(enc.pulse_buckets) == 30