"""Section 23 — Agent Context API: 7-layer test suite. Covers musehub/services/musehub_context.py and musehub/models/musehub_context.py. The 15 existing tests in test_musehub_context.py cover E2E + integration basics; this suite adds unit, stress, data-integrity, security, and performance layers. Layer map --------- 1. Unit — pure functions, constants, Pydantic models 2. Integration — service functions against real PostgreSQL DB 3. E2E — HTTP client against the full app 4. Stress — large datasets, concurrent requests 5. Data Integrity — ordering, filtering, exclusion rules 6. Security — auth enforcement, private repo visibility 7. Performance — timing budgets """ from __future__ import annotations import asyncio import secrets import time from datetime import datetime, timezone import pytest from httpx import AsyncClient from muse.core.types import fake_id from musehub.core.genesis import compute_identity_id, compute_repo_id from sqlalchemy.ext.asyncio import AsyncSession from musehub.types.json_types import StrDict from musehub.db.musehub_repo_models import MusehubBranch, MusehubCommit, MusehubCommitRef, MusehubRepo from musehub.db.musehub_social_models import MusehubIssue, MusehubProposal from musehub.models.musehub_context import ( ActiveProposalContext, AgentContextResponse, AnalysisSummaryContext, ContextDepth, ContextFormat, HistoryEntryContext, MusicalStateContext, OpenIssueContext, ) from musehub.services.musehub_context import ( _HISTORY_LIMIT, _INCLUDE_ISSUE_BODY, _INCLUDE_PROPOSAL_BODY, _extract_tracks_from_snapshot, _generate_suggestions, _get_latest_commit, _get_open_issues, _get_open_proposals, _resolve_ref_to_commit, _utc_iso, build_agent_context, ) # --------------------------------------------------------------------------- # DB helpers # --------------------------------------------------------------------------- def _uid() -> str: return secrets.token_hex(16) async def _db_repo(session: AsyncSession, *, visibility: str = "private") -> str: slug = f"test-repo-{_uid()[:8]}" owner_id = compute_identity_id(b"testuser") created_at = datetime.now(tz=timezone.utc) repo = MusehubRepo( repo_id=compute_repo_id(owner_id, slug, "code", created_at.isoformat()), name=slug, slug=slug, owner="testuser", owner_user_id=owner_id, visibility=visibility, created_at=created_at, updated_at=created_at, ) session.add(repo) await session.flush() return repo.repo_id async def _db_commit( session: AsyncSession, repo_id: str, *, branch: str = "main", message: str = "add groove", ts: datetime | None = None, parent_id: str | None = None, ) -> str: commit_id = _uid().replace("-", "") c = MusehubCommit( commit_id=commit_id, branch=branch, parent_ids=[parent_id] if parent_id else [], message=message, author="agent", timestamp=ts or datetime.now(timezone.utc), ) session.add(c) session.add(MusehubCommitRef(repo_id=repo_id, commit_id=commit_id)) await session.flush() return commit_id async def _db_branch(session: AsyncSession, repo_id: str, name: str, head: str) -> None: session.add(MusehubBranch(branch_id=fake_id(f"{repo_id}-branch-{name}"), repo_id=repo_id, name=name, head_commit_id=head)) await session.flush() async def _db_issue( session: AsyncSession, repo_id: str, *, number: int = 1, title: str = "fix harmony", body: str = "needs fixing", state: str = "open", labels: list[str] | None = None, ) -> str: issue = MusehubIssue( issue_id=fake_id(f"{repo_id}-issue-{number}"), repo_id=repo_id, number=number, title=title, body=body, state=state, labels=labels or [], ) session.add(issue) await session.flush() return issue.issue_id async def _db_proposal_ctx( session: AsyncSession, repo_id: str, *, proposal_number: int = 1, title: str = "add tritone sub", body: str = "see description", state: str = "open", from_branch: str = "feat/x", to_branch: str = "main", ) -> str: proposal = MusehubProposal( proposal_id=fake_id(f"{repo_id}-proposal-{proposal_number}"), repo_id=repo_id, proposal_number=proposal_number, title=title, body=body, state=state, from_branch=from_branch, to_branch=to_branch, ) session.add(proposal) await session.flush() return proposal.proposal_id async def _api_repo( client: AsyncClient, auth_headers: StrDict, *, name: str | None = None, visibility: str = "private", ) -> str: name = name or f"repo-{_uid()[:8]}" r = await client.post( "/api/repos", json={"name": name, "owner": "testuser", "visibility": visibility}, headers=auth_headers, ) assert r.status_code == 201, r.text return r.json()["repoId"] # =========================================================================== # Layer 1 — Unit # =========================================================================== class TestUnitUtcIso: def test_naive_datetime_gets_utc(self) -> None: dt = datetime(2026, 1, 15, 12, 0, 0) result = _utc_iso(dt) assert "+00:00" in result or "Z" in result.upper() or "UTC" in result assert "2026-01-15" in result def test_aware_datetime_preserved(self) -> None: dt = datetime(2026, 6, 1, 0, 0, 0, tzinfo=timezone.utc) result = _utc_iso(dt) assert "2026-06-01" in result def test_returns_string(self) -> None: assert isinstance(_utc_iso(datetime.now(timezone.utc)), str) def test_iso_format_parseable(self) -> None: dt = datetime(2025, 3, 14, 9, 26, 53, tzinfo=timezone.utc) result = _utc_iso(dt) parsed = datetime.fromisoformat(result) assert parsed.year == 2025 assert parsed.month == 3 assert parsed.day == 14 class TestUnitExtractTracks: def test_none_snapshot_returns_empty(self) -> None: assert _extract_tracks_from_snapshot(None) == [] def test_any_object_returns_empty(self) -> None: # stub returns [] regardless — just verifies the contract assert _extract_tracks_from_snapshot(object()) == [] def test_returns_list(self) -> None: result = _extract_tracks_from_snapshot(None) assert isinstance(result, list) class TestUnitHistoryLimit: def test_brief_is_three(self) -> None: assert _HISTORY_LIMIT[ContextDepth.brief] == 3 def test_standard_is_ten(self) -> None: assert _HISTORY_LIMIT[ContextDepth.standard] == 10 def test_verbose_is_fifty(self) -> None: assert _HISTORY_LIMIT[ContextDepth.verbose] == 50 def test_all_depths_covered(self) -> None: for depth in ContextDepth: assert depth in _HISTORY_LIMIT class TestUnitIncludeFlags: def test_proposal_body_brief_false(self) -> None: assert _INCLUDE_PROPOSAL_BODY[ContextDepth.brief] is False def test_proposal_body_standard_true(self) -> None: assert _INCLUDE_PROPOSAL_BODY[ContextDepth.standard] is True def test_proposal_body_verbose_true(self) -> None: assert _INCLUDE_PROPOSAL_BODY[ContextDepth.verbose] is True def test_issue_body_brief_false(self) -> None: assert _INCLUDE_ISSUE_BODY[ContextDepth.brief] is False def test_issue_body_standard_false(self) -> None: assert _INCLUDE_ISSUE_BODY[ContextDepth.standard] is False def test_issue_body_verbose_true(self) -> None: assert _INCLUDE_ISSUE_BODY[ContextDepth.verbose] is True class TestUnitGenerateSuggestions: def _empty_state(self) -> MusicalStateContext: return MusicalStateContext(active_tracks=[]) def _state_with_tracks(self) -> MusicalStateContext: return MusicalStateContext(active_tracks=["drums", "bass"]) def _issue(self, n: int = 1) -> OpenIssueContext: return OpenIssueContext( issue_id=_uid(), number=n, title=f"issue {n}", labels=[], body="" ) def _proposal_ctx(self) -> ActiveProposalContext: return ActiveProposalContext( proposal_id=_uid(), title="add swing feel", from_branch="feat/swing", to_branch="main", state="open", body="", ) def test_no_tracks_generates_suggestion(self) -> None: s = _generate_suggestions(self._empty_state(), [], [], ContextDepth.standard) assert len(s) >= 1 assert any("No files" in x for x in s) def test_with_tracks_no_no_files_suggestion(self) -> None: s = _generate_suggestions( self._state_with_tracks(), [], [], ContextDepth.standard ) assert not any("No files" in x for x in s) def test_open_issue_generates_suggestion(self) -> None: s = _generate_suggestions( self._state_with_tracks(), [self._issue(5)], [], ContextDepth.standard ) assert any("#5" in x for x in s) def test_open_pr_generates_suggestion(self) -> None: s = _generate_suggestions( self._state_with_tracks(), [], [self._proposal_ctx()], ContextDepth.standard ) assert any("add swing feel" in x for x in s) def test_brief_caps_at_two(self) -> None: # force 3 suggestions: no tracks + issue + proposal s = _generate_suggestions( self._empty_state(), [self._issue()], [self._proposal_ctx()], ContextDepth.brief ) assert len(s) <= 2 def test_standard_caps_at_four(self) -> None: issues = [self._issue(i) for i in range(1, 5)] proposals_ctx = [self._proposal_ctx()] # empty state + 4 issues + 1 proposal = 6 raw suggestions; capped at 4 s = _generate_suggestions(self._empty_state(), issues, proposals_ctx, ContextDepth.standard) assert len(s) <= 4 def test_verbose_uncapped(self) -> None: issues = [self._issue(i) for i in range(1, 5)] proposals_ctx = [self._proposal_ctx()] s = _generate_suggestions(self._empty_state(), issues, proposals_ctx, ContextDepth.verbose) # 1 (no tracks) + 1 (first issue) + 1 (first proposal) = 3 — all returned assert len(s) == 3 def test_returns_strings(self) -> None: s = _generate_suggestions(self._empty_state(), [], [], ContextDepth.brief) assert all(isinstance(x, str) for x in s) def test_deterministic(self) -> None: state = self._empty_state() issues = [self._issue()] proposals_ctx = [self._proposal_ctx()] s1 = _generate_suggestions(state, issues, proposals_ctx, ContextDepth.standard) s2 = _generate_suggestions(state, issues, proposals_ctx, ContextDepth.standard) assert s1 == s2 class TestUnitModels: def test_context_depth_values(self) -> None: assert ContextDepth.brief == "brief" assert ContextDepth.standard == "standard" assert ContextDepth.verbose == "verbose" def test_context_format_values(self) -> None: assert ContextFormat.json == "json" assert ContextFormat.yaml == "yaml" def test_musical_state_default_empty_tracks(self) -> None: m = MusicalStateContext() assert m.active_tracks == [] def test_history_entry_context_fields(self) -> None: h = HistoryEntryContext( commit_id="abc123", message="add bass", author="agent", timestamp="2026-01-01T00:00:00+00:00", ) assert h.commit_id == "abc123" assert h.active_tracks == [] def test_analysis_all_none_by_default(self) -> None: a = AnalysisSummaryContext() assert a.key_finding is None assert a.chord_progression is None assert a.groove_score is None assert a.emotion is None assert a.harmonic_tension is None assert a.melodic_contour is None def test_open_issue_context_defaults(self) -> None: i = OpenIssueContext(issue_id=_uid(), number=1, title="fix") assert i.labels == [] assert i.body == "" def test_active_pr_context_defaults(self) -> None: p = ActiveProposalContext( proposal_id=_uid(), title="Proposal", from_branch="a", to_branch="b", state="open", ) assert p.body == "" def test_agent_context_response_camel_fields(self) -> None: resp = AgentContextResponse( repo_id="r1", ref="main", depth="standard", musical_state=MusicalStateContext(), analysis=AnalysisSummaryContext(), ) d = resp.model_dump(by_alias=True) assert "repoId" in d assert "musicalState" in d assert "activeProposals" in d assert "openIssues" in d # =========================================================================== # Layer 2 — Integration # =========================================================================== class TestIntegrationResolveRef: async def test_resolve_branch_name(self, db_session: AsyncSession) -> None: repo_id = await _db_repo(db_session) commit_id = await _db_commit(db_session, repo_id) await _db_branch(db_session, repo_id, "main", commit_id) await db_session.flush() result = await _resolve_ref_to_commit(db_session, repo_id, "main") assert result is not None assert result.commit_id == commit_id async def test_resolve_commit_id_directly(self, db_session: AsyncSession) -> None: repo_id = await _db_repo(db_session) commit_id = await _db_commit(db_session, repo_id) await db_session.flush() result = await _resolve_ref_to_commit(db_session, repo_id, commit_id) assert result is not None assert result.commit_id == commit_id async def test_resolve_nonexistent_returns_none(self, db_session: AsyncSession) -> None: repo_id = await _db_repo(db_session) await db_session.flush() result = await _resolve_ref_to_commit(db_session, repo_id, "nonexistent-ref") assert result is None async def test_branch_takes_priority_over_commit_id(self, db_session: AsyncSession) -> None: """If a branch name happens to equal a commit ID substring, branch wins.""" repo_id = await _db_repo(db_session) commit_id = await _db_commit(db_session, repo_id) branch_commit_id = await _db_commit(db_session, repo_id, message="branch head") await _db_branch(db_session, repo_id, "main", branch_commit_id) await db_session.flush() # Resolving "main" returns the branch head, not commit_id result = await _resolve_ref_to_commit(db_session, repo_id, "main") assert result is not None assert result.commit_id == branch_commit_id class TestIntegrationGetLatestCommit: async def test_returns_most_recent(self, db_session: AsyncSession) -> None: repo_id = await _db_repo(db_session) ts_old = datetime(2026, 1, 1, tzinfo=timezone.utc) ts_new = datetime(2026, 6, 1, tzinfo=timezone.utc) await _db_commit(db_session, repo_id, ts=ts_old, message="old") new_id = await _db_commit(db_session, repo_id, ts=ts_new, message="new") await db_session.flush() result = await _get_latest_commit(db_session, repo_id) assert result is not None assert result.commit_id == new_id async def test_no_commits_returns_none(self, db_session: AsyncSession) -> None: repo_id = await _db_repo(db_session) await db_session.flush() result = await _get_latest_commit(db_session, repo_id) assert result is None class TestIntegrationGetOpenProposals: async def test_include_body_true(self, db_session: AsyncSession) -> None: repo_id = await _db_repo(db_session) await _db_proposal_ctx(db_session, repo_id, body="detailed body text") await db_session.flush() results = await _get_open_proposals(db_session, repo_id, include_body=True) assert len(results) == 1 assert results[0].body == "detailed body text" async def test_include_body_false(self, db_session: AsyncSession) -> None: repo_id = await _db_repo(db_session) await _db_proposal_ctx(db_session, repo_id, body="detailed body text") await db_session.flush() results = await _get_open_proposals(db_session, repo_id, include_body=False) assert len(results) == 1 assert results[0].body == "" async def test_closed_prs_excluded(self, db_session: AsyncSession) -> None: repo_id = await _db_repo(db_session) await _db_proposal_ctx(db_session, repo_id, state="closed") await db_session.flush() results = await _get_open_proposals(db_session, repo_id, include_body=False) assert results == [] class TestIntegrationGetOpenIssues: async def test_include_body_verbose(self, db_session: AsyncSession) -> None: repo_id = await _db_repo(db_session) await _db_issue(db_session, repo_id, body="full body text") await db_session.flush() results = await _get_open_issues(db_session, repo_id, include_body=True) assert len(results) == 1 assert results[0].body == "full body text" async def test_include_body_false_empty_string(self, db_session: AsyncSession) -> None: repo_id = await _db_repo(db_session) await _db_issue(db_session, repo_id, body="full body text") await db_session.flush() results = await _get_open_issues(db_session, repo_id, include_body=False) assert results[0].body == "" async def test_closed_issues_excluded(self, db_session: AsyncSession) -> None: repo_id = await _db_repo(db_session) await _db_issue(db_session, repo_id, state="closed") await db_session.flush() results = await _get_open_issues(db_session, repo_id, include_body=False) assert results == [] async def test_ordered_by_number(self, db_session: AsyncSession) -> None: repo_id = await _db_repo(db_session) await _db_issue(db_session, repo_id, number=5, title="five") await _db_issue(db_session, repo_id, number=2, title="two") await _db_issue(db_session, repo_id, number=8, title="eight") await db_session.flush() results = await _get_open_issues(db_session, repo_id, include_body=False) assert [r.number for r in results] == [2, 5, 8] class TestIntegrationBuildAgentContext: async def test_repo_not_found_returns_none(self, db_session: AsyncSession) -> None: result = await build_agent_context( db_session, repo_id="nonexistent-repo", ref="main" ) assert result is None async def test_no_commits_returns_none(self, db_session: AsyncSession) -> None: repo_id = await _db_repo(db_session) await db_session.flush() result = await build_agent_context(db_session, repo_id=repo_id, ref="HEAD") assert result is None async def test_head_resolves_to_latest(self, db_session: AsyncSession) -> None: repo_id = await _db_repo(db_session) ts_old = datetime(2026, 1, 1, tzinfo=timezone.utc) ts_new = datetime(2026, 6, 1, tzinfo=timezone.utc) await _db_commit(db_session, repo_id, ts=ts_old, branch="main", message="old") new_id = await _db_commit( db_session, repo_id, ts=ts_new, branch="main", message="new" ) await _db_branch(db_session, repo_id, "main", new_id) await db_session.flush() result = await build_agent_context(db_session, repo_id=repo_id, ref="HEAD") assert result is not None assert result.repo_id == repo_id # History excludes the head commit; head is the new one history_ids = [h.commit_id for h in result.history] assert new_id not in history_ids async def test_branch_ref_resolution(self, db_session: AsyncSession) -> None: repo_id = await _db_repo(db_session) commit_id = await _db_commit(db_session, repo_id, branch="feature") await _db_branch(db_session, repo_id, "feature", commit_id) await db_session.flush() result = await build_agent_context( db_session, repo_id=repo_id, ref="feature" ) assert result is not None assert result.ref == "feature" async def test_brief_depth_history_limit(self, db_session: AsyncSession) -> None: repo_id = await _db_repo(db_session) for i in range(8): ts = datetime(2026, 1, i + 1, tzinfo=timezone.utc) await _db_commit( db_session, repo_id, ts=ts, branch="main", message=f"commit {i}" ) await db_session.flush() result = await build_agent_context( db_session, repo_id=repo_id, ref="HEAD", depth=ContextDepth.brief ) assert result is not None assert len(result.history) <= 3 async def test_verbose_depth_issue_body_included( self, db_session: AsyncSession ) -> None: repo_id = await _db_repo(db_session) await _db_commit(db_session, repo_id) await _db_issue(db_session, repo_id, body="verbose body") await db_session.flush() result = await build_agent_context( db_session, repo_id=repo_id, ref="HEAD", depth=ContextDepth.verbose ) assert result is not None assert len(result.open_issues) == 1 assert result.open_issues[0].body == "verbose body" async def test_standard_depth_issue_body_empty( self, db_session: AsyncSession ) -> None: repo_id = await _db_repo(db_session) await _db_commit(db_session, repo_id) await _db_issue(db_session, repo_id, body="hidden") await db_session.flush() result = await build_agent_context( db_session, repo_id=repo_id, ref="HEAD", depth=ContextDepth.standard ) assert result is not None assert result.open_issues[0].body == "" # =========================================================================== # Layer 3 — E2E # =========================================================================== class TestE2EContextEndpoint: async def test_200_with_all_sections( self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession, ) -> None: repo_id = await _api_repo(client, auth_headers) await _db_commit(db_session, repo_id) await db_session.commit() r = await client.get(f"/api/repos/{repo_id}/context", headers=auth_headers) assert r.status_code == 200 body = r.json() for key in ("repoId", "ref", "depth", "musicalState", "history", "analysis", "activeProposals", "openIssues", "suggestions"): assert key in body async def test_depth_brief_param( self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession, ) -> None: repo_id = await _api_repo(client, auth_headers) for i in range(6): await _db_commit(db_session, repo_id, message=f"c{i}") await db_session.commit() r = await client.get( f"/api/repos/{repo_id}/context?depth=brief", headers=auth_headers ) assert r.status_code == 200 body = r.json() assert body["depth"] == "brief" assert len(body["history"]) <= 3 async def test_depth_verbose_param( self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession, ) -> None: repo_id = await _api_repo(client, auth_headers) await _db_commit(db_session, repo_id) await _db_issue(db_session, repo_id, body="full body verbose") await db_session.commit() r = await client.get( f"/api/repos/{repo_id}/context?depth=verbose", headers=auth_headers ) assert r.status_code == 200 body = r.json() assert body["depth"] == "verbose" assert body["openIssues"][0]["body"] == "full body verbose" async def test_invalid_depth_422( self, client: AsyncClient, auth_headers: StrDict, ) -> None: r = await client.get( "/api/repos/any-id/context?depth=ultra", headers=auth_headers ) assert r.status_code == 422 async def test_unknown_repo_404( self, client: AsyncClient, auth_headers: StrDict, ) -> None: r = await client.get( "/api/repos/no-such-repo/context", headers=auth_headers ) assert r.status_code == 404 async def test_nonexistent_ref_404( self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession, ) -> None: repo_id = await _api_repo(client, auth_headers) await db_session.commit() r = await client.get( f"/api/repos/{repo_id}/context?ref=no-such-branch", headers=auth_headers ) assert r.status_code == 404 async def test_yaml_format_returns_yaml_content_type( self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession, ) -> None: import yaml repo_id = await _api_repo(client, auth_headers) await _db_commit(db_session, repo_id) await db_session.commit() r = await client.get( f"/api/repos/{repo_id}/context?format=yaml", headers=auth_headers ) assert r.status_code == 200 assert "yaml" in r.headers.get("content-type", "") parsed = yaml.safe_load(r.text) assert isinstance(parsed, dict) assert "repoId" in parsed # =========================================================================== # Layer 4 — Stress # =========================================================================== class TestStress: async def test_verbose_depth_50_commit_history( self, db_session: AsyncSession, ) -> None: """build_agent_context handles 60 commits; verbose history capped at 50.""" repo_id = await _db_repo(db_session) for i in range(60): ts = datetime(2026, 1, 1, 0, i, 0, tzinfo=timezone.utc) await _db_commit(db_session, repo_id, ts=ts, message=f"commit {i}") await db_session.flush() result = await build_agent_context( db_session, repo_id=repo_id, ref="HEAD", depth=ContextDepth.verbose ) assert result is not None assert len(result.history) <= 50 async def test_concurrent_context_builds( self, db_session: AsyncSession, ) -> None: """5 concurrent build_agent_context calls on the same repo all succeed.""" repo_id = await _db_repo(db_session) for i in range(5): await _db_commit(db_session, repo_id, message=f"c{i}") await db_session.flush() results = await asyncio.gather( *[ build_agent_context( db_session, repo_id=repo_id, ref="HEAD" ) for _ in range(5) ] ) assert all(r is not None for r in results) async def test_many_open_issues_all_returned_verbose( self, db_session: AsyncSession, ) -> None: repo_id = await _db_repo(db_session) await _db_commit(db_session, repo_id) for i in range(20): await _db_issue(db_session, repo_id, number=i + 1, title=f"issue {i}") await db_session.flush() result = await build_agent_context( db_session, repo_id=repo_id, ref="HEAD", depth=ContextDepth.verbose ) assert result is not None assert len(result.open_issues) == 20 # =========================================================================== # Layer 5 — Data Integrity # =========================================================================== class TestDataIntegrity: async def test_history_newest_first(self, db_session: AsyncSession) -> None: repo_id = await _db_repo(db_session) for i in range(5): ts = datetime(2026, 1, i + 1, tzinfo=timezone.utc) await _db_commit(db_session, repo_id, ts=ts, message=f"c{i}") await db_session.flush() result = await build_agent_context( db_session, repo_id=repo_id, ref="HEAD", depth=ContextDepth.verbose ) assert result is not None timestamps = [h.timestamp for h in result.history] assert timestamps == sorted(timestamps, reverse=True) async def test_head_commit_excluded_from_history( self, db_session: AsyncSession ) -> None: repo_id = await _db_repo(db_session) ts_old = datetime(2026, 1, 1, tzinfo=timezone.utc) ts_new = datetime(2026, 6, 1, tzinfo=timezone.utc) await _db_commit(db_session, repo_id, ts=ts_old, message="old") new_id = await _db_commit(db_session, repo_id, ts=ts_new, message="new") await db_session.flush() # ref=HEAD resolves to new_id; it must NOT appear in history result = await build_agent_context( db_session, repo_id=repo_id, ref="HEAD", depth=ContextDepth.verbose ) assert result is not None history_ids = [h.commit_id for h in result.history] assert new_id not in history_ids async def test_closed_proposals_not_in_active_proposals( self, db_session: AsyncSession ) -> None: repo_id = await _db_repo(db_session) await _db_commit(db_session, repo_id) await _db_proposal_ctx(db_session, repo_id, proposal_number=1, state="closed") await _db_proposal_ctx(db_session, repo_id, proposal_number=2, state="merged", title="merged") await db_session.flush() result = await build_agent_context( db_session, repo_id=repo_id, ref="HEAD", depth=ContextDepth.verbose ) assert result is not None assert result.active_proposals == [] async def test_closed_issues_not_in_open_issues( self, db_session: AsyncSession ) -> None: repo_id = await _db_repo(db_session) await _db_commit(db_session, repo_id) await _db_issue(db_session, repo_id, state="closed") await db_session.flush() result = await build_agent_context( db_session, repo_id=repo_id, ref="HEAD", depth=ContextDepth.verbose ) assert result is not None assert result.open_issues == [] async def test_proposal_body_empty_at_brief_depth( self, db_session: AsyncSession ) -> None: repo_id = await _db_repo(db_session) await _db_commit(db_session, repo_id) await _db_proposal_ctx(db_session, repo_id, body="secret details") await db_session.flush() result = await build_agent_context( db_session, repo_id=repo_id, ref="HEAD", depth=ContextDepth.brief ) assert result is not None assert result.active_proposals[0].body == "" async def test_analysis_fields_all_none(self, db_session: AsyncSession) -> None: repo_id = await _db_repo(db_session) await _db_commit(db_session, repo_id) await db_session.flush() result = await build_agent_context( db_session, repo_id=repo_id, ref="HEAD" ) assert result is not None a = result.analysis assert a.key_finding is None assert a.chord_progression is None assert a.groove_score is None assert a.emotion is None async def test_repo_id_echoed_in_response(self, db_session: AsyncSession) -> None: repo_id = await _db_repo(db_session) await _db_commit(db_session, repo_id) await db_session.flush() result = await build_agent_context(db_session, repo_id=repo_id, ref="HEAD") assert result is not None assert result.repo_id == repo_id # =========================================================================== # Layer 6 — Security # =========================================================================== class TestSecurity: async def test_private_repo_requires_auth( self, client: AsyncClient, db_session: AsyncSession, ) -> None: """Context endpoint returns 403/401/404 for private repos without token.""" # Create repo and commit directly in DB (no auth_headers to avoid fixture override) repo_id = await _db_repo(db_session, visibility="private") await _db_commit(db_session, repo_id) await db_session.commit() r = await client.get(f"/api/repos/{repo_id}/context") # private repo without auth → 403 or 401 (implementation may 404 for privacy) assert r.status_code in (401, 403, 404) async def test_public_repo_context_accessible_without_auth( self, client: AsyncClient, db_session: AsyncSession, ) -> None: """Public repo context is readable without authentication.""" repo_id = await _db_repo(db_session, visibility="public") await _db_commit(db_session, repo_id) await db_session.commit() r = await client.get(f"/api/repos/{repo_id}/context") assert r.status_code == 200 async def test_sql_injection_in_ref_param_safe( self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession, ) -> None: """SQL injection in ?ref param is handled safely (returns 404, not 500).""" repo_id = await _api_repo(client, auth_headers) await db_session.commit() malicious_ref = "'; DROP TABLE musehub_commits; --" r = await client.get( f"/api/repos/{repo_id}/context", params={"ref": malicious_ref}, headers=auth_headers, ) assert r.status_code in (404, 422) async def test_xss_in_ref_not_echoed_as_html( self, client: AsyncClient, auth_headers: StrDict, db_session: AsyncSession, ) -> None: """XSS attempt in ?ref is not reflected as raw HTML in a 200 response.""" repo_id = await _api_repo(client, auth_headers) await db_session.commit() r = await client.get( f"/api/repos/{repo_id}/context", params={"ref": ""}, headers=auth_headers, ) # Either rejected (404/422) or if echoed, must be JSON-escaped if r.status_code == 200: assert "