"""Section 23 — Agent Context API: 7-layer test suite.
Covers musehub/services/musehub_context.py and musehub/models/musehub_context.py.
The 15 existing tests in test_musehub_context.py cover E2E + integration basics;
this suite adds unit, stress, data-integrity, security, and performance layers.
Layer map
---------
1. Unit — pure functions, constants, Pydantic models
2. Integration — service functions against real PostgreSQL DB
3. E2E — HTTP client against the full app
4. Stress — large datasets, concurrent requests
5. Data Integrity — ordering, filtering, exclusion rules
6. Security — auth enforcement, private repo visibility
7. Performance — timing budgets
"""
from __future__ import annotations
import asyncio
import secrets
import time
from datetime import datetime, timezone
import pytest
from httpx import AsyncClient
from muse.core.types import fake_id
from musehub.core.genesis import compute_identity_id, compute_repo_id
from sqlalchemy.ext.asyncio import AsyncSession
from musehub.types.json_types import StrDict
from musehub.db.musehub_repo_models import MusehubBranch, MusehubCommit, MusehubCommitRef, MusehubRepo
from musehub.db.musehub_social_models import MusehubIssue, MusehubProposal
from musehub.models.musehub_context import (
ActiveProposalContext,
AgentContextResponse,
AnalysisSummaryContext,
ContextDepth,
ContextFormat,
HistoryEntryContext,
MusicalStateContext,
OpenIssueContext,
)
from musehub.services.musehub_context import (
_HISTORY_LIMIT,
_INCLUDE_ISSUE_BODY,
_INCLUDE_PROPOSAL_BODY,
_extract_tracks_from_snapshot,
_generate_suggestions,
_get_latest_commit,
_get_open_issues,
_get_open_proposals,
_resolve_ref_to_commit,
_utc_iso,
build_agent_context,
)
# ---------------------------------------------------------------------------
# DB helpers
# ---------------------------------------------------------------------------
def _uid() -> str:
return secrets.token_hex(16)
async def _db_repo(session: AsyncSession, *, visibility: str = "private") -> str:
    """Insert a uniquely named repo owned by ``testuser`` and return its ID.

    Args:
        session: Session the new ``MusehubRepo`` row is added to and flushed on.
        visibility: Value stored in the repo's ``visibility`` column.

    Returns:
        The ``repo_id`` of the flushed row.
    """
    repo_slug = f"test-repo-{_uid()[:8]}"
    owner_identity = compute_identity_id(b"testuser")
    now = datetime.now(tz=timezone.utc)
    # The repo ID is derived from owner, slug, domain and creation timestamp.
    derived_repo_id = compute_repo_id(owner_identity, repo_slug, "code", now.isoformat())
    record = MusehubRepo(
        repo_id=derived_repo_id,
        name=repo_slug,
        slug=repo_slug,
        owner="testuser",
        owner_user_id=owner_identity,
        visibility=visibility,
        created_at=now,
        updated_at=now,
    )
    session.add(record)
    await session.flush()
    return record.repo_id
async def _db_commit(
    session: AsyncSession,
    repo_id: str,
    *,
    branch: str = "main",
    message: str = "add groove",
    ts: datetime | None = None,
    parent_id: str | None = None,
) -> str:
    """Insert a commit plus its repo link row and return the new commit ID.

    Args:
        session: Session the rows are added to and flushed on.
        repo_id: Repo the commit is linked to through a ``MusehubCommitRef``.
        branch: Branch name stored on the commit.
        message: Commit message.
        ts: Commit timestamp; defaults to the current UTC time when omitted.
        parent_id: Optional single parent; stored as a one-element list,
            or an empty list when falsy.

    Returns:
        The randomly generated commit ID.
    """
    # _uid() already yields pure hex (no dashes), so no normalisation is needed.
    commit_id = _uid()
    commit = MusehubCommit(
        commit_id=commit_id,
        branch=branch,
        parent_ids=[parent_id] if parent_id else [],
        message=message,
        author="agent",
        timestamp=ts or datetime.now(timezone.utc),
    )
    session.add(commit)
    session.add(MusehubCommitRef(repo_id=repo_id, commit_id=commit_id))
    await session.flush()
    return commit_id
async def _db_branch(session: AsyncSession, repo_id: str, name: str, head: str) -> None:
    """Insert a branch row named ``name`` whose head is commit ``head``, then flush."""
    branch_row = MusehubBranch(
        branch_id=fake_id(f"{repo_id}-branch-{name}"),
        repo_id=repo_id,
        name=name,
        head_commit_id=head,
    )
    session.add(branch_row)
    await session.flush()
async def _db_issue(
    session: AsyncSession,
    repo_id: str,
    *,
    number: int = 1,
    title: str = "fix harmony",
    body: str = "needs fixing",
    state: str = "open",
    labels: list[str] | None = None,
) -> str:
    """Insert an issue for ``repo_id`` and return its ID.

    The issue ID is produced by ``fake_id`` from the repo ID and issue number.
    A falsy ``labels`` value is stored as an empty list.
    """
    stored_labels = labels if labels else []
    record = MusehubIssue(
        issue_id=fake_id(f"{repo_id}-issue-{number}"),
        repo_id=repo_id,
        number=number,
        title=title,
        body=body,
        state=state,
        labels=stored_labels,
    )
    session.add(record)
    await session.flush()
    return record.issue_id
async def _db_proposal_ctx(
    session: AsyncSession,
    repo_id: str,
    *,
    proposal_number: int = 1,
    title: str = "add tritone sub",
    body: str = "see description",
    state: str = "open",
    from_branch: str = "feat/x",
    to_branch: str = "main",
) -> str:
    """Insert a proposal for ``repo_id`` and return its ID.

    The proposal ID is produced by ``fake_id`` from the repo ID and
    proposal number.
    """
    record = MusehubProposal(
        proposal_id=fake_id(f"{repo_id}-proposal-{proposal_number}"),
        repo_id=repo_id,
        proposal_number=proposal_number,
        title=title,
        body=body,
        state=state,
        from_branch=from_branch,
        to_branch=to_branch,
    )
    session.add(record)
    await session.flush()
    return record.proposal_id
async def _api_repo(
    client: AsyncClient,
    auth_headers: StrDict,
    *,
    name: str | None = None,
    visibility: str = "private",
) -> str:
    """Create a repo through ``POST /api/repos`` and return its ``repoId``.

    A random ``repo-<8 hex chars>`` name is used when ``name`` is falsy.
    Fails the calling test unless the endpoint answers 201.
    """
    name = name or f"repo-{_uid()[:8]}"
    payload = {"name": name, "owner": "testuser", "visibility": visibility}
    response = await client.post("/api/repos", json=payload, headers=auth_headers)
    assert response.status_code == 201, response.text
    return response.json()["repoId"]
# ===========================================================================
# Layer 1 — Unit
# ===========================================================================
class TestUnitUtcIso:
    """Unit checks for the ``_utc_iso`` timestamp formatter."""

    def test_naive_datetime_gets_utc(self) -> None:
        """A naive datetime is rendered with a UTC marker and keeps its date."""
        rendered = _utc_iso(datetime(2026, 1, 15, 12, 0, 0))
        utc_markers = (
            "+00:00" in rendered,
            "Z" in rendered.upper(),
            "UTC" in rendered,
        )
        assert any(utc_markers)
        assert "2026-01-15" in rendered

    def test_aware_datetime_preserved(self) -> None:
        """A UTC-aware datetime keeps its calendar date in the output."""
        aware = datetime(2026, 6, 1, 0, 0, 0, tzinfo=timezone.utc)
        assert "2026-06-01" in _utc_iso(aware)

    def test_returns_string(self) -> None:
        """The formatter returns a ``str``."""
        rendered = _utc_iso(datetime.now(timezone.utc))
        assert isinstance(rendered, str)

    def test_iso_format_parseable(self) -> None:
        """The output round-trips through ``datetime.fromisoformat``."""
        original = datetime(2025, 3, 14, 9, 26, 53, tzinfo=timezone.utc)
        reparsed = datetime.fromisoformat(_utc_iso(original))
        assert (reparsed.year, reparsed.month, reparsed.day) == (2025, 3, 14)
class TestUnitExtractTracks:
    """Unit checks for ``_extract_tracks_from_snapshot``."""

    def test_none_snapshot_returns_empty(self) -> None:
        """A ``None`` snapshot yields an empty list."""
        tracks = _extract_tracks_from_snapshot(None)
        assert tracks == []

    def test_any_object_returns_empty(self) -> None:
        """An arbitrary object also yields an empty list."""
        # The function is currently a stub returning []; this pins the contract.
        arbitrary_snapshot = object()
        assert _extract_tracks_from_snapshot(arbitrary_snapshot) == []

    def test_returns_list(self) -> None:
        """The return value is a ``list``."""
        assert isinstance(_extract_tracks_from_snapshot(None), list)
class TestUnitHistoryLimit:
    """Pins the per-depth history limits in ``_HISTORY_LIMIT``."""

    def test_brief_is_three(self) -> None:
        """Brief depth allows three history entries."""
        limit = _HISTORY_LIMIT[ContextDepth.brief]
        assert limit == 3

    def test_standard_is_ten(self) -> None:
        """Standard depth allows ten history entries."""
        limit = _HISTORY_LIMIT[ContextDepth.standard]
        assert limit == 10

    def test_verbose_is_fifty(self) -> None:
        """Verbose depth allows fifty history entries."""
        limit = _HISTORY_LIMIT[ContextDepth.verbose]
        assert limit == 50

    def test_all_depths_covered(self) -> None:
        """Every ``ContextDepth`` member has a history limit."""
        uncovered = [depth for depth in ContextDepth if depth not in _HISTORY_LIMIT]
        assert not uncovered
class TestUnitIncludeFlags:
    """Pins the per-depth body-inclusion flags for proposals and issues."""

    def test_proposal_body_brief_false(self) -> None:
        """Proposal bodies are omitted at brief depth."""
        flag = _INCLUDE_PROPOSAL_BODY[ContextDepth.brief]
        assert flag is False

    def test_proposal_body_standard_true(self) -> None:
        """Proposal bodies are included at standard depth."""
        flag = _INCLUDE_PROPOSAL_BODY[ContextDepth.standard]
        assert flag is True

    def test_proposal_body_verbose_true(self) -> None:
        """Proposal bodies are included at verbose depth."""
        flag = _INCLUDE_PROPOSAL_BODY[ContextDepth.verbose]
        assert flag is True

    def test_issue_body_brief_false(self) -> None:
        """Issue bodies are omitted at brief depth."""
        flag = _INCLUDE_ISSUE_BODY[ContextDepth.brief]
        assert flag is False

    def test_issue_body_standard_false(self) -> None:
        """Issue bodies are omitted at standard depth."""
        flag = _INCLUDE_ISSUE_BODY[ContextDepth.standard]
        assert flag is False

    def test_issue_body_verbose_true(self) -> None:
        """Issue bodies are included at verbose depth."""
        flag = _INCLUDE_ISSUE_BODY[ContextDepth.verbose]
        assert flag is True
class TestUnitGenerateSuggestions:
    """Unit checks for ``_generate_suggestions`` across states and depths."""

    def _empty_state(self) -> MusicalStateContext:
        """Build a musical state with no active tracks."""
        return MusicalStateContext(active_tracks=[])

    def _state_with_tracks(self) -> MusicalStateContext:
        """Build a musical state with two active tracks."""
        return MusicalStateContext(active_tracks=["drums", "bass"])

    def _issue(self, n: int = 1) -> OpenIssueContext:
        """Build an open-issue context numbered ``n`` with a random ID."""
        return OpenIssueContext(
            issue_id=_uid(), number=n, title=f"issue {n}", labels=[], body=""
        )

    def _proposal_ctx(self) -> ActiveProposalContext:
        """Build an open proposal context titled ``add swing feel``."""
        return ActiveProposalContext(
            proposal_id=_uid(),
            title="add swing feel",
            from_branch="feat/swing",
            to_branch="main",
            state="open",
            body="",
        )

    def test_no_tracks_generates_suggestion(self) -> None:
        """An empty state produces a "No files" suggestion."""
        s = _generate_suggestions(self._empty_state(), [], [], ContextDepth.standard)
        assert len(s) >= 1
        assert any("No files" in x for x in s)

    def test_with_tracks_no_no_files_suggestion(self) -> None:
        """A state with tracks produces no "No files" suggestion."""
        s = _generate_suggestions(
            self._state_with_tracks(), [], [], ContextDepth.standard
        )
        assert not any("No files" in x for x in s)

    def test_open_issue_generates_suggestion(self) -> None:
        """An open issue is referenced by its ``#<number>``."""
        s = _generate_suggestions(
            self._state_with_tracks(), [self._issue(5)], [], ContextDepth.standard
        )
        assert any("#5" in x for x in s)

    def test_open_pr_generates_suggestion(self) -> None:
        """An open proposal is referenced by its title."""
        s = _generate_suggestions(
            self._state_with_tracks(), [], [self._proposal_ctx()], ContextDepth.standard
        )
        assert any("add swing feel" in x for x in s)

    def test_brief_caps_at_two(self) -> None:
        """Brief depth returns at most two suggestions."""
        # force 3 suggestions: no tracks + issue + proposal
        s = _generate_suggestions(
            self._empty_state(), [self._issue()], [self._proposal_ctx()], ContextDepth.brief
        )
        assert len(s) <= 2

    def test_standard_caps_at_four(self) -> None:
        """Standard depth never returns more than four suggestions."""
        issues = [self._issue(i) for i in range(1, 5)]
        proposals_ctx = [self._proposal_ctx()]
        # NOTE(review): test_verbose_uncapped asserts these same inputs produce
        # exactly 3 suggestions (no tracks + first issue + first proposal), so
        # this input stays below the cap of 4 and only checks the upper bound.
        s = _generate_suggestions(self._empty_state(), issues, proposals_ctx, ContextDepth.standard)
        assert len(s) <= 4

    def test_verbose_uncapped(self) -> None:
        """Verbose depth returns every generated suggestion."""
        issues = [self._issue(i) for i in range(1, 5)]
        proposals_ctx = [self._proposal_ctx()]
        s = _generate_suggestions(self._empty_state(), issues, proposals_ctx, ContextDepth.verbose)
        # 1 (no tracks) + 1 (first issue) + 1 (first proposal) = 3 — all returned
        assert len(s) == 3

    def test_returns_strings(self) -> None:
        """Every suggestion is a ``str``."""
        s = _generate_suggestions(self._empty_state(), [], [], ContextDepth.brief)
        assert all(isinstance(x, str) for x in s)

    def test_deterministic(self) -> None:
        """Identical inputs produce identical suggestions."""
        state = self._empty_state()
        issues = [self._issue()]
        proposals_ctx = [self._proposal_ctx()]
        s1 = _generate_suggestions(state, issues, proposals_ctx, ContextDepth.standard)
        s2 = _generate_suggestions(state, issues, proposals_ctx, ContextDepth.standard)
        assert s1 == s2
class TestUnitModels:
    """Unit checks for the context models: enum values, defaults, aliases."""

    def test_context_depth_values(self) -> None:
        """``ContextDepth`` members compare equal to their lowercase names."""
        assert ContextDepth.brief == "brief"
        assert ContextDepth.standard == "standard"
        assert ContextDepth.verbose == "verbose"

    def test_context_format_values(self) -> None:
        """``ContextFormat`` members compare equal to their lowercase names."""
        assert ContextFormat.json == "json"
        assert ContextFormat.yaml == "yaml"

    def test_musical_state_default_empty_tracks(self) -> None:
        """``MusicalStateContext`` defaults to no active tracks."""
        state = MusicalStateContext()
        assert state.active_tracks == []

    def test_history_entry_context_fields(self) -> None:
        """A history entry keeps its commit ID and defaults to no tracks."""
        entry = HistoryEntryContext(
            commit_id="abc123",
            message="add bass",
            author="agent",
            timestamp="2026-01-01T00:00:00+00:00",
        )
        assert entry.commit_id == "abc123"
        assert entry.active_tracks == []

    def test_analysis_all_none_by_default(self) -> None:
        """All six analysis fields default to ``None``."""
        summary = AnalysisSummaryContext()
        assert summary.key_finding is None
        assert summary.chord_progression is None
        assert summary.groove_score is None
        assert summary.emotion is None
        assert summary.harmonic_tension is None
        assert summary.melodic_contour is None

    def test_open_issue_context_defaults(self) -> None:
        """Labels default to an empty list and body to an empty string."""
        issue_ctx = OpenIssueContext(issue_id=_uid(), number=1, title="fix")
        assert issue_ctx.labels == []
        assert issue_ctx.body == ""

    def test_active_pr_context_defaults(self) -> None:
        """The proposal body defaults to an empty string."""
        proposal_ctx = ActiveProposalContext(
            proposal_id=_uid(),
            title="Proposal",
            from_branch="a",
            to_branch="b",
            state="open",
        )
        assert proposal_ctx.body == ""

    def test_agent_context_response_camel_fields(self) -> None:
        """Dumping by alias exposes camelCase keys."""
        response = AgentContextResponse(
            repo_id="r1",
            ref="main",
            depth="standard",
            musical_state=MusicalStateContext(),
            analysis=AnalysisSummaryContext(),
        )
        dumped = response.model_dump(by_alias=True)
        assert "repoId" in dumped
        assert "musicalState" in dumped
        assert "activeProposals" in dumped
        assert "openIssues" in dumped
# ===========================================================================
# Layer 2 — Integration
# ===========================================================================
class TestIntegrationResolveRef:
    """Integration checks for ``_resolve_ref_to_commit`` against the test DB."""

    async def test_resolve_branch_name(self, db_session: AsyncSession) -> None:
        """A branch name resolves to that branch's head commit."""
        repo_id = await _db_repo(db_session)
        commit_id = await _db_commit(db_session, repo_id)
        await _db_branch(db_session, repo_id, "main", commit_id)
        await db_session.flush()
        result = await _resolve_ref_to_commit(db_session, repo_id, "main")
        assert result is not None
        assert result.commit_id == commit_id

    async def test_resolve_commit_id_directly(self, db_session: AsyncSession) -> None:
        """A full commit ID resolves to that commit without any branch row."""
        repo_id = await _db_repo(db_session)
        commit_id = await _db_commit(db_session, repo_id)
        await db_session.flush()
        result = await _resolve_ref_to_commit(db_session, repo_id, commit_id)
        assert result is not None
        assert result.commit_id == commit_id

    async def test_resolve_nonexistent_returns_none(self, db_session: AsyncSession) -> None:
        """An unknown ref resolves to ``None``."""
        repo_id = await _db_repo(db_session)
        await db_session.flush()
        result = await _resolve_ref_to_commit(db_session, repo_id, "nonexistent-ref")
        assert result is None

    async def test_branch_takes_priority_over_commit_id(self, db_session: AsyncSession) -> None:
        """With two commits in the repo, ``main`` resolves to the branch head.

        NOTE(review): despite the test name, no ref is created that collides
        with a commit ID; this only proves the branch head is returned rather
        than the other commit in the repo.
        """
        repo_id = await _db_repo(db_session)
        # An additional commit that the branch does NOT point at.
        await _db_commit(db_session, repo_id)
        branch_commit_id = await _db_commit(db_session, repo_id, message="branch head")
        await _db_branch(db_session, repo_id, "main", branch_commit_id)
        await db_session.flush()
        result = await _resolve_ref_to_commit(db_session, repo_id, "main")
        assert result is not None
        assert result.commit_id == branch_commit_id
class TestIntegrationGetLatestCommit:
    """Integration checks for ``_get_latest_commit``."""

    async def test_returns_most_recent(self, db_session: AsyncSession) -> None:
        """The commit with the newest timestamp is returned."""
        repo_id = await _db_repo(db_session)
        earlier = datetime(2026, 1, 1, tzinfo=timezone.utc)
        later = datetime(2026, 6, 1, tzinfo=timezone.utc)
        await _db_commit(db_session, repo_id, ts=earlier, message="old")
        newest_id = await _db_commit(db_session, repo_id, ts=later, message="new")
        await db_session.flush()
        latest = await _get_latest_commit(db_session, repo_id)
        assert latest is not None
        assert latest.commit_id == newest_id

    async def test_no_commits_returns_none(self, db_session: AsyncSession) -> None:
        """A repo without commits yields ``None``."""
        repo_id = await _db_repo(db_session)
        await db_session.flush()
        latest = await _get_latest_commit(db_session, repo_id)
        assert latest is None
class TestIntegrationGetOpenProposals:
    """Integration checks for ``_get_open_proposals``."""

    async def test_include_body_true(self, db_session: AsyncSession) -> None:
        """``include_body=True`` returns the stored proposal body."""
        repo_id = await _db_repo(db_session)
        await _db_proposal_ctx(db_session, repo_id, body="detailed body text")
        await db_session.flush()
        proposals = await _get_open_proposals(db_session, repo_id, include_body=True)
        assert len(proposals) == 1
        only = proposals[0]
        assert only.body == "detailed body text"

    async def test_include_body_false(self, db_session: AsyncSession) -> None:
        """``include_body=False`` blanks the proposal body."""
        repo_id = await _db_repo(db_session)
        await _db_proposal_ctx(db_session, repo_id, body="detailed body text")
        await db_session.flush()
        proposals = await _get_open_proposals(db_session, repo_id, include_body=False)
        assert len(proposals) == 1
        only = proposals[0]
        assert only.body == ""

    async def test_closed_prs_excluded(self, db_session: AsyncSession) -> None:
        """Closed proposals are not returned."""
        repo_id = await _db_repo(db_session)
        await _db_proposal_ctx(db_session, repo_id, state="closed")
        await db_session.flush()
        proposals = await _get_open_proposals(db_session, repo_id, include_body=False)
        assert proposals == []
class TestIntegrationGetOpenIssues:
    """Integration checks for ``_get_open_issues``."""

    async def test_include_body_verbose(self, db_session: AsyncSession) -> None:
        """``include_body=True`` returns the stored issue body."""
        repo_id = await _db_repo(db_session)
        await _db_issue(db_session, repo_id, body="full body text")
        await db_session.flush()
        results = await _get_open_issues(db_session, repo_id, include_body=True)
        assert len(results) == 1
        assert results[0].body == "full body text"

    async def test_include_body_false_empty_string(self, db_session: AsyncSession) -> None:
        """``include_body=False`` blanks the issue body."""
        repo_id = await _db_repo(db_session)
        await _db_issue(db_session, repo_id, body="full body text")
        await db_session.flush()
        results = await _get_open_issues(db_session, repo_id, include_body=False)
        # Guard first so a missing issue fails as an assertion, not an IndexError.
        assert len(results) == 1
        assert results[0].body == ""

    async def test_closed_issues_excluded(self, db_session: AsyncSession) -> None:
        """Closed issues are not returned."""
        repo_id = await _db_repo(db_session)
        await _db_issue(db_session, repo_id, state="closed")
        await db_session.flush()
        results = await _get_open_issues(db_session, repo_id, include_body=False)
        assert results == []

    async def test_ordered_by_number(self, db_session: AsyncSession) -> None:
        """Issues come back in ascending issue-number order."""
        repo_id = await _db_repo(db_session)
        # Insert out of order so the ordering must come from the query.
        await _db_issue(db_session, repo_id, number=5, title="five")
        await _db_issue(db_session, repo_id, number=2, title="two")
        await _db_issue(db_session, repo_id, number=8, title="eight")
        await db_session.flush()
        results = await _get_open_issues(db_session, repo_id, include_body=False)
        assert [r.number for r in results] == [2, 5, 8]
class TestIntegrationBuildAgentContext:
    """Integration checks for ``build_agent_context`` against the test DB."""

    async def test_repo_not_found_returns_none(self, db_session: AsyncSession) -> None:
        """An unknown repo ID yields ``None``."""
        result = await build_agent_context(
            db_session, repo_id="nonexistent-repo", ref="main"
        )
        assert result is None

    async def test_no_commits_returns_none(self, db_session: AsyncSession) -> None:
        """``HEAD`` on a repo without commits yields ``None``."""
        repo_id = await _db_repo(db_session)
        await db_session.flush()
        result = await build_agent_context(db_session, repo_id=repo_id, ref="HEAD")
        assert result is None

    async def test_head_resolves_to_latest(self, db_session: AsyncSession) -> None:
        """``HEAD`` resolves to the newest commit, which is left out of history."""
        repo_id = await _db_repo(db_session)
        ts_old = datetime(2026, 1, 1, tzinfo=timezone.utc)
        ts_new = datetime(2026, 6, 1, tzinfo=timezone.utc)
        await _db_commit(db_session, repo_id, ts=ts_old, branch="main", message="old")
        new_id = await _db_commit(
            db_session, repo_id, ts=ts_new, branch="main", message="new"
        )
        await _db_branch(db_session, repo_id, "main", new_id)
        await db_session.flush()
        result = await build_agent_context(db_session, repo_id=repo_id, ref="HEAD")
        assert result is not None
        assert result.repo_id == repo_id
        # History excludes the head commit; head is the new one
        history_ids = [h.commit_id for h in result.history]
        assert new_id not in history_ids

    async def test_branch_ref_resolution(self, db_session: AsyncSession) -> None:
        """A branch ref is accepted and echoed back in ``ref``."""
        repo_id = await _db_repo(db_session)
        commit_id = await _db_commit(db_session, repo_id, branch="feature")
        await _db_branch(db_session, repo_id, "feature", commit_id)
        await db_session.flush()
        result = await build_agent_context(
            db_session, repo_id=repo_id, ref="feature"
        )
        assert result is not None
        assert result.ref == "feature"

    async def test_brief_depth_history_limit(self, db_session: AsyncSession) -> None:
        """Brief depth returns at most three history entries out of eight commits."""
        repo_id = await _db_repo(db_session)
        for i in range(8):
            ts = datetime(2026, 1, i + 1, tzinfo=timezone.utc)
            await _db_commit(
                db_session, repo_id, ts=ts, branch="main", message=f"commit {i}"
            )
        await db_session.flush()
        result = await build_agent_context(
            db_session, repo_id=repo_id, ref="HEAD", depth=ContextDepth.brief
        )
        assert result is not None
        assert len(result.history) <= 3

    async def test_verbose_depth_issue_body_included(
        self, db_session: AsyncSession
    ) -> None:
        """Verbose depth includes the issue body."""
        repo_id = await _db_repo(db_session)
        await _db_commit(db_session, repo_id)
        await _db_issue(db_session, repo_id, body="verbose body")
        await db_session.flush()
        result = await build_agent_context(
            db_session, repo_id=repo_id, ref="HEAD", depth=ContextDepth.verbose
        )
        assert result is not None
        assert len(result.open_issues) == 1
        assert result.open_issues[0].body == "verbose body"

    async def test_standard_depth_issue_body_empty(
        self, db_session: AsyncSession
    ) -> None:
        """Standard depth lists the issue but blanks its body."""
        repo_id = await _db_repo(db_session)
        await _db_commit(db_session, repo_id)
        await _db_issue(db_session, repo_id, body="hidden")
        await db_session.flush()
        result = await build_agent_context(
            db_session, repo_id=repo_id, ref="HEAD", depth=ContextDepth.standard
        )
        assert result is not None
        # Guard first so a missing issue fails as an assertion, not an IndexError.
        assert len(result.open_issues) == 1
        assert result.open_issues[0].body == ""
# ===========================================================================
# Layer 3 — E2E
# ===========================================================================
class TestE2EContextEndpoint:
    """End-to-end checks for ``GET /api/repos/{repo_id}/context``."""

    async def test_200_with_all_sections(
        self,
        client: AsyncClient,
        auth_headers: StrDict,
        db_session: AsyncSession,
    ) -> None:
        """A repo with one commit returns 200 with every top-level section."""
        repo_id = await _api_repo(client, auth_headers)
        await _db_commit(db_session, repo_id)
        await db_session.commit()
        response = await client.get(f"/api/repos/{repo_id}/context", headers=auth_headers)
        assert response.status_code == 200
        payload = response.json()
        expected_keys = (
            "repoId",
            "ref",
            "depth",
            "musicalState",
            "history",
            "analysis",
            "activeProposals",
            "openIssues",
            "suggestions",
        )
        absent = [key for key in expected_keys if key not in payload]
        assert not absent

    async def test_depth_brief_param(
        self,
        client: AsyncClient,
        auth_headers: StrDict,
        db_session: AsyncSession,
    ) -> None:
        """``depth=brief`` is echoed and limits history to three entries."""
        repo_id = await _api_repo(client, auth_headers)
        for index in range(6):
            await _db_commit(db_session, repo_id, message=f"c{index}")
        await db_session.commit()
        response = await client.get(
            f"/api/repos/{repo_id}/context?depth=brief", headers=auth_headers
        )
        assert response.status_code == 200
        payload = response.json()
        assert payload["depth"] == "brief"
        assert len(payload["history"]) <= 3

    async def test_depth_verbose_param(
        self,
        client: AsyncClient,
        auth_headers: StrDict,
        db_session: AsyncSession,
    ) -> None:
        """``depth=verbose`` is echoed and includes the issue body."""
        repo_id = await _api_repo(client, auth_headers)
        await _db_commit(db_session, repo_id)
        await _db_issue(db_session, repo_id, body="full body verbose")
        await db_session.commit()
        response = await client.get(
            f"/api/repos/{repo_id}/context?depth=verbose", headers=auth_headers
        )
        assert response.status_code == 200
        payload = response.json()
        assert payload["depth"] == "verbose"
        first_issue = payload["openIssues"][0]
        assert first_issue["body"] == "full body verbose"

    async def test_invalid_depth_422(
        self,
        client: AsyncClient,
        auth_headers: StrDict,
    ) -> None:
        """An unsupported ``depth`` value is rejected with 422."""
        response = await client.get(
            "/api/repos/any-id/context?depth=ultra", headers=auth_headers
        )
        assert response.status_code == 422

    async def test_unknown_repo_404(
        self,
        client: AsyncClient,
        auth_headers: StrDict,
    ) -> None:
        """An unknown repo ID returns 404."""
        response = await client.get(
            "/api/repos/no-such-repo/context", headers=auth_headers
        )
        assert response.status_code == 404

    async def test_nonexistent_ref_404(
        self,
        client: AsyncClient,
        auth_headers: StrDict,
        db_session: AsyncSession,
    ) -> None:
        """A ref that matches nothing in an existing repo returns 404."""
        repo_id = await _api_repo(client, auth_headers)
        await db_session.commit()
        response = await client.get(
            f"/api/repos/{repo_id}/context?ref=no-such-branch", headers=auth_headers
        )
        assert response.status_code == 404

    async def test_yaml_format_returns_yaml_content_type(
        self,
        client: AsyncClient,
        auth_headers: StrDict,
        db_session: AsyncSession,
    ) -> None:
        """``format=yaml`` returns a YAML content type and a parseable mapping."""
        import yaml

        repo_id = await _api_repo(client, auth_headers)
        await _db_commit(db_session, repo_id)
        await db_session.commit()
        response = await client.get(
            f"/api/repos/{repo_id}/context?format=yaml", headers=auth_headers
        )
        assert response.status_code == 200
        content_type = response.headers.get("content-type", "")
        assert "yaml" in content_type
        document = yaml.safe_load(response.text)
        assert isinstance(document, dict)
        assert "repoId" in document
# ===========================================================================
# Layer 4 — Stress
# ===========================================================================
class TestStress:
    """Stress checks: long histories, overlapping builds, many issues."""

    async def test_verbose_depth_50_commit_history(
        self,
        db_session: AsyncSession,
    ) -> None:
        """build_agent_context handles 60 commits; verbose history capped at 50."""
        repo_id = await _db_repo(db_session)
        for minute in range(60):
            # One commit per minute gives every commit a distinct timestamp.
            stamp = datetime(2026, 1, 1, 0, minute, 0, tzinfo=timezone.utc)
            await _db_commit(db_session, repo_id, ts=stamp, message=f"commit {minute}")
        await db_session.flush()
        context = await build_agent_context(
            db_session, repo_id=repo_id, ref="HEAD", depth=ContextDepth.verbose
        )
        assert context is not None
        assert len(context.history) <= 50

    async def test_concurrent_context_builds(
        self,
        db_session: AsyncSession,
    ) -> None:
        """5 concurrent build_agent_context calls on the same repo all succeed."""
        repo_id = await _db_repo(db_session)
        for index in range(5):
            await _db_commit(db_session, repo_id, message=f"c{index}")
        await db_session.flush()
        # NOTE(review): all five coroutines share one AsyncSession; SQLAlchemy
        # documents a session per task for concurrent use — confirm this is intended.
        pending = (
            build_agent_context(db_session, repo_id=repo_id, ref="HEAD")
            for _ in range(5)
        )
        contexts = await asyncio.gather(*pending)
        assert all(context is not None for context in contexts)

    async def test_many_open_issues_all_returned_verbose(
        self,
        db_session: AsyncSession,
    ) -> None:
        """All 20 open issues are returned at verbose depth."""
        repo_id = await _db_repo(db_session)
        await _db_commit(db_session, repo_id)
        for index in range(20):
            await _db_issue(db_session, repo_id, number=index + 1, title=f"issue {index}")
        await db_session.flush()
        context = await build_agent_context(
            db_session, repo_id=repo_id, ref="HEAD", depth=ContextDepth.verbose
        )
        assert context is not None
        assert len(context.open_issues) == 20
# ===========================================================================
# Layer 5 — Data Integrity
# ===========================================================================
class TestDataIntegrity:
    """Data-integrity checks: ordering, filtering and exclusion rules."""

    async def test_history_newest_first(self, db_session: AsyncSession) -> None:
        """History timestamps are in descending order."""
        repo_id = await _db_repo(db_session)
        for i in range(5):
            ts = datetime(2026, 1, i + 1, tzinfo=timezone.utc)
            await _db_commit(db_session, repo_id, ts=ts, message=f"c{i}")
        await db_session.flush()
        result = await build_agent_context(
            db_session, repo_id=repo_id, ref="HEAD", depth=ContextDepth.verbose
        )
        assert result is not None
        timestamps = [h.timestamp for h in result.history]
        assert timestamps == sorted(timestamps, reverse=True)

    async def test_head_commit_excluded_from_history(
        self, db_session: AsyncSession
    ) -> None:
        """The commit that ``HEAD`` resolves to is absent from history."""
        repo_id = await _db_repo(db_session)
        ts_old = datetime(2026, 1, 1, tzinfo=timezone.utc)
        ts_new = datetime(2026, 6, 1, tzinfo=timezone.utc)
        await _db_commit(db_session, repo_id, ts=ts_old, message="old")
        new_id = await _db_commit(db_session, repo_id, ts=ts_new, message="new")
        await db_session.flush()
        # ref=HEAD resolves to new_id; it must NOT appear in history
        result = await build_agent_context(
            db_session, repo_id=repo_id, ref="HEAD", depth=ContextDepth.verbose
        )
        assert result is not None
        history_ids = [h.commit_id for h in result.history]
        assert new_id not in history_ids

    async def test_closed_proposals_not_in_active_proposals(
        self, db_session: AsyncSession
    ) -> None:
        """Closed and merged proposals are left out of ``active_proposals``."""
        repo_id = await _db_repo(db_session)
        await _db_commit(db_session, repo_id)
        await _db_proposal_ctx(db_session, repo_id, proposal_number=1, state="closed")
        await _db_proposal_ctx(db_session, repo_id, proposal_number=2, state="merged", title="merged")
        await db_session.flush()
        result = await build_agent_context(
            db_session, repo_id=repo_id, ref="HEAD", depth=ContextDepth.verbose
        )
        assert result is not None
        assert result.active_proposals == []

    async def test_closed_issues_not_in_open_issues(
        self, db_session: AsyncSession
    ) -> None:
        """Closed issues are left out of ``open_issues``."""
        repo_id = await _db_repo(db_session)
        await _db_commit(db_session, repo_id)
        await _db_issue(db_session, repo_id, state="closed")
        await db_session.flush()
        result = await build_agent_context(
            db_session, repo_id=repo_id, ref="HEAD", depth=ContextDepth.verbose
        )
        assert result is not None
        assert result.open_issues == []

    async def test_proposal_body_empty_at_brief_depth(
        self, db_session: AsyncSession
    ) -> None:
        """Brief depth lists the open proposal but blanks its body."""
        repo_id = await _db_repo(db_session)
        await _db_commit(db_session, repo_id)
        await _db_proposal_ctx(db_session, repo_id, body="secret details")
        await db_session.flush()
        result = await build_agent_context(
            db_session, repo_id=repo_id, ref="HEAD", depth=ContextDepth.brief
        )
        assert result is not None
        # Guard first so a missing proposal fails as an assertion, not an IndexError.
        assert len(result.active_proposals) == 1
        assert result.active_proposals[0].body == ""

    async def test_analysis_fields_all_none(self, db_session: AsyncSession) -> None:
        """The checked analysis fields are ``None`` in a built context."""
        repo_id = await _db_repo(db_session)
        await _db_commit(db_session, repo_id)
        await db_session.flush()
        result = await build_agent_context(
            db_session, repo_id=repo_id, ref="HEAD"
        )
        assert result is not None
        a = result.analysis
        # NOTE(review): harmonic_tension and melodic_contour are not checked
        # here — confirm whether that omission is deliberate.
        assert a.key_finding is None
        assert a.chord_progression is None
        assert a.groove_score is None
        assert a.emotion is None

    async def test_repo_id_echoed_in_response(self, db_session: AsyncSession) -> None:
        """The response carries the requested repo ID."""
        repo_id = await _db_repo(db_session)
        await _db_commit(db_session, repo_id)
        await db_session.flush()
        result = await build_agent_context(db_session, repo_id=repo_id, ref="HEAD")
        assert result is not None
        assert result.repo_id == repo_id
# ===========================================================================
# Layer 6 — Security
# ===========================================================================
class TestSecurity:
async def test_private_repo_requires_auth(
    self,
    client: AsyncClient,
    db_session: AsyncSession,
) -> None:
    """Context endpoint returns 403/401/404 for private repos without token."""
    # Seed the repo and commit directly in the DB so no auth_headers are involved.
    private_repo_id = await _db_repo(db_session, visibility="private")
    await _db_commit(db_session, private_repo_id)
    await db_session.commit()
    response = await client.get(f"/api/repos/{private_repo_id}/context")
    # 401/403 deny access outright; 404 is accepted as hiding the repo's existence.
    denied_statuses = (401, 403, 404)
    assert response.status_code in denied_statuses
async def test_public_repo_context_accessible_without_auth(
    self,
    client: AsyncClient,
    db_session: AsyncSession,
) -> None:
    """Public repo context is readable without authentication."""
    public_repo_id = await _db_repo(db_session, visibility="public")
    await _db_commit(db_session, public_repo_id)
    await db_session.commit()
    # No headers are sent: the request is unauthenticated.
    response = await client.get(f"/api/repos/{public_repo_id}/context")
    assert response.status_code == 200
async def test_sql_injection_in_ref_param_safe(
    self,
    client: AsyncClient,
    auth_headers: StrDict,
    db_session: AsyncSession,
) -> None:
    """SQL injection in ?ref param is handled safely (404 or 422, not 500)."""
    repo_id = await _api_repo(client, auth_headers)
    await db_session.commit()
    injected_ref = "'; DROP TABLE musehub_commits; --"
    # Passed through ``params`` so the client URL-encodes the payload.
    response = await client.get(
        f"/api/repos/{repo_id}/context",
        params={"ref": injected_ref},
        headers=auth_headers,
    )
    safe_statuses = (404, 422)
    assert response.status_code in safe_statuses
async def test_xss_in_ref_not_echoed_as_html(
self,
client: AsyncClient,
auth_headers: StrDict,
db_session: AsyncSession,
) -> None:
"""XSS attempt in ?ref is not reflected as raw HTML in a 200 response."""
repo_id = await _api_repo(client, auth_headers)
await db_session.commit()
r = await client.get(
f"/api/repos/{repo_id}/context",
params={"ref": ""},
headers=auth_headers,
)
# Either rejected (404/422) or if echoed, must be JSON-escaped
if r.status_code == 200:
assert "