"""Section 6 — Symbol Intelligence (Intel): 7-layer test suite.
Covers:
- musehub/services/musehub_intel.py (compute_intel, _parse_ts, _health_label,
_health_color_class, IntelSnapshot, as_dict/from_dict)
- musehub/api/routes/musehub/blame.py (_build_real_symbol_blame, GET /repos/{repo_id}/blame/{ref})
- musehub/services/musehub_cross_repo.py (search_symbol_across_repos, cross_repo_impact,
workspace_blast_risk_top_n, build_deps_graph,
_module_prefix, _short_label)
Layers:
1. Unit — pure function tests, no DB, no I/O
2. Integration — real DB (PostgreSQL), service calls, no HTTP layer
3. End-to-End — full HTTP via AsyncClient, real DB
4. Stress — large data sets, volume correctness
5. Data Integrity — stored data correctness, field validation, round-trip
6. Security — auth guards, private repo access, injection safety
7. Performance — latency budgets for critical paths
"""
from __future__ import annotations
import json
import secrets
import time
from datetime import datetime, timedelta, timezone
import msgpack
# Maps a symbol address (e.g. "file.py::Fn") to that symbol's list of history
# entries. The value of a `type` statement is evaluated lazily, so JSONObject
# (imported further down) does not have to be in scope at this point.
type SymbolHistory = dict[str, list[JSONObject]]
import pytest
import pytest_asyncio
from httpx import AsyncClient
from sqlalchemy.ext.asyncio import AsyncSession
from musehub.services.musehub_intel import (
IntelSnapshot,
BlastRiskEntry,
CouplingPair,
DeadEntry,
HotspotEntry,
VelocityWindow,
_health_color_class,
_health_label,
_parse_ts,
compute_intel,
)
from musehub.types.json_types import JSONObject, StrDict
from tests.factories import create_repo
# ---------------------------------------------------------------------------
# Local helpers
# ---------------------------------------------------------------------------
def _now() -> datetime:
    """Return the current moment as a timezone-aware UTC datetime."""
    utc = timezone.utc
    return datetime.now(tz=utc)
def _ago(days: int = 0, **kwargs: int) -> datetime:
    """Return the UTC datetime lying the given offset before the current time.

    ``days`` and any extra keyword arguments are forwarded to ``timedelta``.
    """
    offset = timedelta(days=days, **kwargs)
    return _now() - offset
def _ts(dt: datetime) -> str:
    """Render *dt* as an ISO 8601 string via ``datetime.isoformat``."""
    iso_text = dt.isoformat()
    return iso_text
def _entry(
    commit_id: str,
    op: str = "add",
    ts: datetime | None = None,
    content_id: str = "sha256:abc",
) -> JSONObject:
    """Build one symbol-history entry dict.

    Args:
        commit_id: Value stored under ``"commit_id"``.
        op: Value stored under ``"op"``.
        ts: Moment recorded for the entry; defaults to the current UTC time.
        content_id: Value stored under ``"content_id"``.

    Returns:
        A dict with the keys ``commit_id``, ``op``, ``timestamp``,
        ``committed_at`` and ``content_id``; both time fields carry the same
        ISO 8601 string.
    """
    # Resolve the default exactly once: calling _now() separately for each
    # field would give "timestamp" and "committed_at" two different instants.
    stamp = _ts(ts if ts is not None else _now())
    return {
        "commit_id": commit_id,
        "op": op,
        "timestamp": stamp,
        "committed_at": stamp,
        "content_id": content_id,
    }
def _history(**kwargs: list[JSONObject]) -> SymbolHistory:
    """Assemble a symbol_history mapping from keyword arguments (addr=entries)."""
    return {address: entries for address, entries in kwargs.items()}
async def _build_index(
    session: AsyncSession,
    repo_id: str,
    head_id: str,
    ops: list[JSONObject],
) -> "types.SimpleNamespace":
    """Insert a single commit, index its symbols, persist the intel results.

    Returns a namespace exposing ``intel_full_json`` and ``intel_summary``:
    the JSON-encoded payloads of the "code.intel_snapshot" and
    "code.intel_summary" results (``None`` when a result type is absent).
    """
    import types as _types
    from musehub.db.musehub_repo_models import MusehubCommit, MusehubCommitRef
    from musehub.services.musehub_symbol_indexer import build_symbol_index
    from musehub.services.musehub_intel_providers import persist_intel_results

    head_commit = MusehubCommit(
        commit_id=head_id,
        branch="main",
        parent_ids=[],
        message="test commit",
        author="gabriel",
        timestamp=_now(),
        structured_delta={"ops": ops},
    )
    session.add(head_commit)
    commit_ref = MusehubCommitRef(repo_id=repo_id, commit_id=head_id)
    session.add(commit_ref)
    await session.flush()

    indexed = await build_symbol_index(session, repo_id, head_id)
    await persist_intel_results(session, repo_id, head_id, indexed)
    await session.commit()

    # Each result is a (type, payload) pair; keep the payloads as JSON text.
    encoded = {}
    for result_type, payload in indexed:
        encoded[result_type] = json.dumps(payload)

    return _types.SimpleNamespace(
        intel_full_json=encoded.get("code.intel_snapshot"),
        intel_summary=encoded.get("code.intel_summary"),
    )
def _insert_op(address: str, content_id: str = "sha256:abc") -> JSONObject:
    """Build an "insert" delta op for the symbol at *address*."""
    op: JSONObject = {"address": address, "op": "insert"}
    op["content_id"] = content_id
    return op
# ===========================================================================
# Layer 1 — Unit tests (pure functions, no DB, no I/O)
# ===========================================================================
class TestParseTs:
    """Unit checks for ``_parse_ts`` with ISO strings and Unix numbers."""

    def test_iso_string_utc(self) -> None:
        """An ISO string with an explicit +00:00 offset yields an aware datetime."""
        parsed = _parse_ts("2025-01-15T10:30:00+00:00")
        assert (parsed.year, parsed.month) == (2025, 1)
        assert parsed.tzinfo is not None

    def test_iso_string_z_suffix(self) -> None:
        """A trailing ``Z`` is accepted and produces an aware datetime."""
        parsed = _parse_ts("2025-06-01T00:00:00Z")
        assert parsed.tzinfo is not None
        assert parsed.year == 2025

    def test_unix_int(self) -> None:
        """Integer 0 is read as the Unix epoch."""
        epoch = _parse_ts(0)
        assert epoch.year == 1970
        assert epoch.tzinfo is not None

    def test_unix_float(self) -> None:
        """Fractional Unix timestamps are accepted."""
        parsed = _parse_ts(1_700_000_000.5)
        assert parsed.year == 2023

    def test_invalid_string_raises(self) -> None:
        """Unparseable text is expected to raise."""
        garbage = "not-a-date"
        with pytest.raises(Exception):
            _parse_ts(garbage)
class TestHealthLabel:
    """Boundary checks for ``_health_label`` at the edges of each band."""

    def test_excellent(self) -> None:
        for score in (100, 90):
            assert _health_label(score) == "Excellent"

    def test_good(self) -> None:
        for score in (89, 75):
            assert _health_label(score) == "Good"

    def test_fair(self) -> None:
        for score in (74, 55):
            assert _health_label(score) == "Fair"

    def test_poor(self) -> None:
        for score in (54, 35):
            assert _health_label(score) == "Poor"

    def test_critical(self) -> None:
        for score in (34, 0):
            assert _health_label(score) == "Critical"
class TestHealthColorClass:
    """Checks the CSS class ``_health_color_class`` returns for each band."""

    def test_excellent(self) -> None:
        css_class = _health_color_class(90)
        assert css_class == "intel-health--excellent"

    def test_good(self) -> None:
        css_class = _health_color_class(75)
        assert css_class == "intel-health--good"

    def test_fair(self) -> None:
        css_class = _health_color_class(55)
        assert css_class == "intel-health--fair"

    def test_poor(self) -> None:
        css_class = _health_color_class(35)
        assert css_class == "intel-health--poor"

    def test_critical(self) -> None:
        css_class = _health_color_class(0)
        assert css_class == "intel-health--critical"
class TestComputeIntelUnit:
    """Unit tests for ``compute_intel`` driven by in-memory symbol histories."""

    def test_empty_history_returns_zero_score(self) -> None:
        """Empty input yields zero counts and an unpenalised score of 100."""
        snap = compute_intel({}, [], now_utc=_now())
        assert snap.total_symbols == 0
        assert snap.total_commits_indexed == 0
        assert snap.health_score == 100  # no penalties = 100
        assert snap.health_label == "Excellent"

    def test_single_symbol_no_ts(self) -> None:
        """An entry without any timestamp field is still counted."""
        history = {"file.py::Foo": [{"commit_id": "c1", "op": "add"}]}
        snap = compute_intel(history, [], now_utc=_now())
        assert snap.total_symbols == 1
        assert snap.total_commits_indexed == 1

    def test_hotspot_detection(self) -> None:
        """A symbol with many changes is reported as a hotspot."""
        # 12 changes on one symbol — exceeds _HOTSPOT_THRESHOLD (10)
        entries = [_entry(f"c{i}") for i in range(12)]
        history = {"file.py::HotFn": entries}
        snap = compute_intel(history, [], now_utc=_now())
        assert snap.alert_hotspot_count >= 1
        assert any(h.address == "file.py::HotFn" for h in snap.hotspots)

    def test_dead_code_detection(self) -> None:
        """A symbol untouched for 100 days is reported as a dead candidate."""
        # One old entry, last touched 100 days ago
        old_ts = _ago(100)
        history = {"file.py::Stale": [_entry("c1", ts=old_ts)]}
        snap = compute_intel(history, [], now_utc=_now())
        assert snap.alert_dead_count >= 1
        assert any(d.address == "file.py::Stale" for d in snap.dead_candidates)

    def test_recent_symbol_not_dead(self) -> None:
        """A symbol touched five days ago is not a dead candidate."""
        recent_ts = _ago(5)
        history = {"file.py::Fresh": [_entry("c1", ts=recent_ts)]}
        snap = compute_intel(history, [], now_utc=_now())
        assert snap.alert_dead_count == 0

    def test_blast_risk_co_change(self) -> None:
        """Symbols that always change together produce blast-risk entries."""
        # Two symbols always change together → blast risk for both
        entries_a = [_entry("c1"), _entry("c2")]
        entries_b = [_entry("c1"), _entry("c2")]
        history = {
            "file.py::Alpha": entries_a,
            "file.py::Beta": entries_b,
        }
        snap = compute_intel(history, [], now_utc=_now())
        # Both are co-changed — blast risk entries should include at least one
        assert len(snap.blast_risk) >= 1

    def test_coupling_pairs_detected(self) -> None:
        """Symbols sharing a commit are reported as a coupling pair."""
        # Symbols sharing same commit → coupling pair
        entries_a = [_entry("shared-commit")]
        entries_b = [_entry("shared-commit")]
        history = {
            "file.py::A": entries_a,
            "file.py::B": entries_b,
        }
        snap = compute_intel(history, [], now_utc=_now())
        assert len(snap.coupling_pairs) >= 1
        pair = snap.coupling_pairs[0]
        assert pair.shared_commits >= 1

    def test_breaking_changes_reduce_score(self) -> None:
        """Breaking changes lower the score and are counted in the alerts."""
        snap_no_breaks = compute_intel({}, [], now_utc=_now())
        snap_with_breaks = compute_intel({}, ["break1", "break2", "break3"], now_utc=_now())
        assert snap_with_breaks.health_score < snap_no_breaks.health_score
        assert snap_with_breaks.alert_breaking_count == 3

    def test_velocity_buckets_populated(self) -> None:
        """A change from yesterday lands in the first of the 12 week buckets."""
        recent = _ago(days=1)
        history = {"file.py::Fn": [_entry("c1", ts=recent)]}
        snap = compute_intel(history, [], now_utc=_now())
        assert len(snap.velocity.weeks) == 12
        assert snap.velocity.weeks[0] >= 1  # most recent week bucket

    def test_health_score_capped_at_100(self) -> None:
        """The score stays within the 0..100 range."""
        snap = compute_intel({}, [], now_utc=_now())
        assert 0 <= snap.health_score <= 100

    def test_top_n_hotspots_limit(self) -> None:
        """No more than ten hotspots are returned even when 20 qualify."""
        # 20 symbols each changed 15 times → _TOP_N=10 returned
        history: SymbolHistory = {}
        for i in range(20):
            history[f"file.py::Fn{i}"] = [_entry(f"c{i}_{j}") for j in range(15)]
        snap = compute_intel(history, [], now_utc=_now())
        assert len(snap.hotspots) <= 10

    def test_dead_candidates_sorted_by_coldest_first(self) -> None:
        """Dead candidates are ordered with the longest-cold symbol first."""
        h = {
            "file.py::Old": [_entry("c1", ts=_ago(200))],
            "file.py::Older": [_entry("c2", ts=_ago(300))],
        }
        snap = compute_intel(h, [], now_utc=_now())
        # Both symbols are far colder than the 100-day case that
        # test_dead_code_detection expects to be flagged, so require them
        # explicitly; guarding with ``if`` would let the test pass vacuously.
        assert len(snap.dead_candidates) >= 2
        assert snap.dead_candidates[0].days_cold >= snap.dead_candidates[1].days_cold

    def test_timestamp_invalid_gracefully_ignored(self) -> None:
        """An unparseable timestamp must not make ``compute_intel`` raise."""
        history = {
            "file.py::BadTs": [{"commit_id": "c1", "op": "add", "timestamp": "NOT_A_DATE"}]
        }
        snap = compute_intel(history, [], now_utc=_now())
        # Should not raise; symbol counted but ts ignored
        assert snap.total_symbols == 1
class TestIntelSnapshotSerialisation:
    """Serialisation checks for ``IntelSnapshot.as_dict`` / ``from_dict``."""

    def _make_snap(self) -> IntelSnapshot:
        """Build a snapshot with one entry in every list-valued field."""
        hotspot = HotspotEntry(address="a.py::Fn", change_count=15, last_changed=None)
        dead = DeadEntry(address="b.py::Old", days_cold=120, blast_radius=0, added_at=None)
        risk = BlastRiskEntry(address="c.py::Risk", co_change_count=25, top_co_symbols=["d.py::X"])
        pair = CouplingPair(address_a="a.py::F", address_b="b.py::G", shared_commits=5)
        weekly = VelocityWindow(weeks=[1, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0])
        return IntelSnapshot(
            health_score=80,
            health_label="Good",
            alert_hotspot_count=2,
            alert_dead_count=1,
            alert_blast_risk_count=3,
            alert_breaking_count=0,
            hotspots=[hotspot],
            dead_candidates=[dead],
            blast_risk=[risk],
            coupling_pairs=[pair],
            velocity=weekly,
            total_symbols=50,
            total_commits_indexed=10,
        )

    def test_as_dict_round_trip(self) -> None:
        """Values survive an ``as_dict`` → ``from_dict`` round trip."""
        payload = self._make_snap().as_dict()
        restored = IntelSnapshot.from_dict(payload)
        assert restored.health_score == 80
        assert restored.health_label == "Good"
        assert restored.total_symbols == 50
        assert restored.hotspots[0].address == "a.py::Fn"
        assert restored.dead_candidates[0].days_cold == 120
        assert restored.blast_risk[0].co_change_count == 25
        assert restored.coupling_pairs[0].shared_commits == 5
        assert restored.velocity.weeks[0] == 1

    def test_as_dict_json_serialisable(self) -> None:
        """``as_dict`` output can be passed straight to ``json.dumps``."""
        payload = self._make_snap().as_dict()
        # Must be JSON-serialisable (no datetimes, no custom objects)
        encoded = json.dumps(payload)
        assert "health_score" in encoded

    def test_from_dict_missing_optional_fields(self) -> None:
        """List fields and velocity default to empty when absent from the dict."""
        scalars_only = dict.fromkeys(
            (
                "alert_hotspot_count",
                "alert_dead_count",
                "alert_blast_risk_count",
                "alert_breaking_count",
                "total_symbols",
                "total_commits_indexed",
            ),
            0,
        )
        minimal = {"health_score": 70, "health_label": "Fair", **scalars_only}
        restored = IntelSnapshot.from_dict(minimal)
        assert restored.hotspots == []
        assert restored.dead_candidates == []
        assert restored.coupling_pairs == []
        assert restored.velocity.weeks == []
class TestModulePrefix:
    """Unit checks for ``_module_prefix`` on dotted addresses."""

    def test_three_segments(self) -> None:
        from musehub.services.musehub_cross_repo import _module_prefix

        prefix = _module_prefix("musehub.services.musehub_ci.enqueue_run")
        assert prefix == "musehub.services.musehub_ci"

    def test_fewer_than_depth(self) -> None:
        from musehub.services.musehub_cross_repo import _module_prefix

        # shorter than depth=3, returns as-is
        prefix = _module_prefix("a.b")
        assert prefix == "a.b"

    def test_exactly_depth(self) -> None:
        from musehub.services.musehub_cross_repo import _module_prefix

        prefix = _module_prefix("a.b.c")
        assert prefix == "a.b.c"

    def test_custom_depth(self) -> None:
        from musehub.services.musehub_cross_repo import _module_prefix

        prefix = _module_prefix("a.b.c.d.e", depth=2)
        assert prefix == "a.b"
class TestShortLabel:
    """Unit checks for ``_short_label``."""

    def test_two_segments(self) -> None:
        from musehub.services.musehub_cross_repo import _short_label

        label = _short_label("a.b.c")
        assert label == "b.c"

    def test_single_segment(self) -> None:
        from musehub.services.musehub_cross_repo import _short_label

        label = _short_label("single")
        assert label == "single"
class TestBuildRealSymbolBlame:
    """Unit checks for ``_build_real_symbol_blame`` using in-memory inputs."""

    def test_filters_to_path(self) -> None:
        """Only symbols of the requested file are expected in the result."""
        from musehub.api.routes.musehub.blame import _build_real_symbol_blame

        symbol_history = {
            "musehub/api.py::Foo": [_entry("c1")],
            "other/file.py::Bar": [_entry("c2")],
        }
        commits = {
            "c1": {"message": "add Foo", "author": "gabriel", "timestamp": _now()},
        }
        blame = _build_real_symbol_blame(symbol_history, "musehub/api.py", commits)
        assert len(blame) == 1
        assert blame[0].symbol_name == "Foo"

    def test_excludes_import_declarations(self) -> None:
        """``import::`` pseudo-symbols are expected to be left out."""
        from musehub.api.routes.musehub.blame import _build_real_symbol_blame

        symbol_history = {
            "file.py::import::os": [_entry("c1")],
            "file.py::MyFn": [_entry("c1")],
        }
        commits = {"c1": {"message": "m", "author": "g", "timestamp": _now()}}
        blame = _build_real_symbol_blame(symbol_history, "file.py", commits)
        seen = [row.symbol_name for row in blame]
        assert "MyFn" in seen
        assert "import::os" not in seen

    def test_excludes_deleted_symbols(self) -> None:
        """A symbol whose only entry is a delete is expected to be left out."""
        from musehub.api.routes.musehub.blame import _build_real_symbol_blame

        symbol_history = {
            "file.py::Gone": [_entry("c1", op="delete")],
            "file.py::Here": [_entry("c2", op="add")],
        }
        commits = {
            "c1": {"message": "del", "author": "g", "timestamp": _now()},
            "c2": {"message": "add", "author": "g", "timestamp": _now()},
        }
        blame = _build_real_symbol_blame(symbol_history, "file.py", commits)
        seen = [row.symbol_name for row in blame]
        assert "Gone" not in seen
        assert "Here" in seen

    def test_intel_signals_populated(self) -> None:
        """Passing an intel snapshot marks a hotspot symbol as such."""
        from musehub.api.routes.musehub.blame import _build_real_symbol_blame

        symbol_history = {
            "file.py::HotFn": [_entry("c1")],
        }
        commits = {"c1": {"message": "m", "author": "g", "timestamp": _now()}}
        hot_history = {"file.py::HotFn": [_entry(f"c{i}") for i in range(15)]}
        snapshot = compute_intel(hot_history, [], now_utc=_now())
        blame = _build_real_symbol_blame(symbol_history, "file.py", commits, intel=snapshot)
        assert len(blame) == 1
        assert blame[0].is_hotspot is True

    def test_change_count_reflects_history_length(self) -> None:
        """``change_count`` equals the number of history entries."""
        from musehub.api.routes.musehub.blame import _build_real_symbol_blame

        commit_ids = ("c1", "c2", "c3")
        symbol_history = {
            "file.py::Changed": [_entry(cid) for cid in commit_ids],
        }
        commits = {
            cid: {"message": "m", "author": "g", "timestamp": _now()}
            for cid in commit_ids
        }
        blame = _build_real_symbol_blame(symbol_history, "file.py", commits)
        assert blame[0].change_count == 3

    def test_empty_history_returns_empty_list(self) -> None:
        """No history means no blame rows."""
        from musehub.api.routes.musehub.blame import _build_real_symbol_blame

        blame = _build_real_symbol_blame({}, "file.py", {})
        assert blame == []

    def test_unknown_commit_id_falls_back_gracefully(self) -> None:
        """A commit missing from the commit map yields empty author/message."""
        from musehub.api.routes.musehub.blame import _build_real_symbol_blame

        symbol_history = {"file.py::Fn": [_entry("unknown-commit")]}
        blame = _build_real_symbol_blame(symbol_history, "file.py", {})
        assert len(blame) == 1
        only_row = blame[0]
        assert only_row.author == ""
        assert only_row.commit_message == ""
# ===========================================================================
# Layer 2 — Integration tests (real DB, service layer, no HTTP)
# ===========================================================================
class TestComputeIntelIntegration:
    """Integration checks: index build plus ``load_intel_snapshot`` on a real DB."""

    @pytest.mark.asyncio
    async def test_load_intel_snapshot_none_when_no_index(
        self, db_session: AsyncSession
    ) -> None:
        """A repo that was never indexed has no snapshot."""
        from musehub.services.musehub_symbol_indexer import load_intel_snapshot

        repo = await create_repo(db_session, slug="intel-no-index")
        snapshot = await load_intel_snapshot(db_session, repo.repo_id)
        assert snapshot is None

    @pytest.mark.asyncio
    async def test_build_index_populates_intel_full_json(
        self, db_session: AsyncSession
    ) -> None:
        """Indexing two inserted symbols yields a snapshot counting both."""
        from musehub.services.musehub_symbol_indexer import load_intel_snapshot

        repo = await create_repo(db_session, slug="intel-populated")
        inserted = [_insert_op("src/main.py::run"), _insert_op("src/main.py::setup")]
        built = await _build_index(db_session, repo.repo_id, "head-intel-1", inserted)
        assert built is not None
        assert built.intel_full_json is not None
        snapshot = await load_intel_snapshot(db_session, repo.repo_id)
        assert snapshot is not None
        assert snapshot.total_symbols == 2

    @pytest.mark.asyncio
    async def test_intel_health_score_range(
        self, db_session: AsyncSession
    ) -> None:
        """The persisted health score lies within 0..100."""
        from musehub.services.musehub_symbol_indexer import load_intel_snapshot

        repo = await create_repo(db_session, slug="intel-health-range")
        inserted = [_insert_op(f"src/f.py::Fn{n}") for n in range(5)]
        await _build_index(db_session, repo.repo_id, "head-hr", inserted)
        snapshot = await load_intel_snapshot(db_session, repo.repo_id)
        assert snapshot is not None
        assert 0 <= snapshot.health_score <= 100

    @pytest.mark.asyncio
    async def test_intel_summary_json_fields(
        self, db_session: AsyncSession
    ) -> None:
        """The summary payload carries the four headline keys."""
        repo = await create_repo(db_session, slug="intel-summary-fields")
        inserted = [_insert_op("api.py::endpoint")]
        built = await _build_index(db_session, repo.repo_id, "head-summ", inserted)
        assert built is not None
        assert built.intel_summary is not None
        summary = json.loads(built.intel_summary)
        assert "health_score" in summary
        assert "symbol_count" in summary
        assert "hotspot_count" in summary
        assert "dead_symbol_count" in summary
class TestBlameIntegration:
    """Integration checks: ``load_symbol_history`` feeding the blame builder."""

    @pytest.mark.asyncio
    async def test_blame_returns_empty_when_no_index(
        self, db_session: AsyncSession
    ) -> None:
        """Without an index build there is nothing to blame."""
        from musehub.services.musehub_symbol_indexer import load_symbol_history
        from musehub.api.routes.musehub.blame import _build_real_symbol_blame

        repo = await create_repo(db_session, slug="blame-no-idx")
        stored = await load_symbol_history(db_session, repo.repo_id, file_path="file.py")
        blame = _build_real_symbol_blame(stored, "file.py", {})
        assert blame == []

    @pytest.mark.asyncio
    async def test_blame_entries_after_index_build(
        self, db_session: AsyncSession
    ) -> None:
        """Symbols inserted by the indexed commit show up in the blame rows."""
        from musehub.services.musehub_symbol_indexer import load_symbol_history
        from musehub.api.routes.musehub.blame import _build_real_symbol_blame

        repo = await create_repo(db_session, slug="blame-with-idx")
        inserted = [
            _insert_op("src/api.py::handle_request"),
            _insert_op("src/api.py::parse_args"),
        ]
        await _build_index(db_session, repo.repo_id, "head-blame", inserted)
        stored = await load_symbol_history(
            db_session, repo.repo_id, file_path="src/api.py"
        )
        blame = _build_real_symbol_blame(stored, "src/api.py", {})
        seen = [row.symbol_name for row in blame]
        assert "handle_request" in seen
        assert "parse_args" in seen
class TestCrossRepoIntegration:
    """Integration checks for the cross-repo service functions on a real DB."""

    @pytest.mark.asyncio
    async def test_search_symbol_no_repos(
        self, db_session: AsyncSession
    ) -> None:
        """Searching for an owner without repos returns an empty list."""
        from musehub.services.musehub_cross_repo import search_symbol_across_repos

        hits = await search_symbol_across_repos(
            db_session, "ghost-owner", "Fn", visible_to_user="ghost-owner"
        )
        assert hits == []

    @pytest.mark.asyncio
    async def test_search_symbol_finds_match(
        self, db_session: AsyncSession
    ) -> None:
        """An indexed symbol is found by its exact name."""
        from musehub.services.musehub_cross_repo import search_symbol_across_repos

        owner = f"owner-{secrets.token_hex(4)}"
        repo = await create_repo(
            db_session, slug="search-sym-repo", owner=owner, visibility="public"
        )
        inserted = [_insert_op("api.py::compute_intel")]
        await _build_index(db_session, repo.repo_id, "head-search", inserted)
        hits = await search_symbol_across_repos(
            db_session, owner, "compute_intel", visible_to_user=owner
        )
        assert len(hits) >= 1
        assert any("compute_intel" in hit.address for hit in hits)

    @pytest.mark.asyncio
    async def test_search_symbol_case_insensitive(
        self, db_session: AsyncSession
    ) -> None:
        """A lower-cased query still matches a mixed-case symbol."""
        from musehub.services.musehub_cross_repo import search_symbol_across_repos

        owner = f"owner-{secrets.token_hex(4)}"
        repo = await create_repo(
            db_session, slug="search-case-repo", owner=owner, visibility="public"
        )
        inserted = [_insert_op("api.py::MyFunction")]
        await _build_index(db_session, repo.repo_id, "head-case", inserted)
        hits = await search_symbol_across_repos(
            db_session, owner, "myfunction", visible_to_user=owner
        )
        assert any("MyFunction" in hit.address for hit in hits)

    @pytest.mark.asyncio
    async def test_search_symbol_private_repo_excluded_without_auth(
        self, db_session: AsyncSession
    ) -> None:
        """Symbols of a private repo are hidden from anonymous searches."""
        from musehub.services.musehub_cross_repo import search_symbol_across_repos

        owner = f"owner-{secrets.token_hex(4)}"
        repo = await create_repo(
            db_session, slug="search-private-repo", owner=owner, visibility="private"
        )
        inserted = [_insert_op("api.py::SecretFn")]
        await _build_index(db_session, repo.repo_id, "head-priv", inserted)
        # visible_to_user=None → only public repos
        hits = await search_symbol_across_repos(
            db_session, owner, "SecretFn", visible_to_user=None
        )
        assert not any("SecretFn" in hit.address for hit in hits)

    @pytest.mark.asyncio
    async def test_workspace_blast_risk_empty(
        self, db_session: AsyncSession
    ) -> None:
        """An owner without repos has no blast-risk entries."""
        from musehub.services.musehub_cross_repo import workspace_blast_risk_top_n

        ranked = await workspace_blast_risk_top_n(
            db_session, "nonexistent-owner", visible_to_user="nonexistent-owner"
        )
        assert ranked == []

    @pytest.mark.asyncio
    async def test_workspace_blast_risk_populated(
        self, db_session: AsyncSession
    ) -> None:
        """Entries come back ordered by descending ``co_change_count``."""
        from musehub.services.musehub_cross_repo import workspace_blast_risk_top_n

        owner = f"owner-{secrets.token_hex(4)}"
        repo = await create_repo(
            db_session, slug="wbr-repo", owner=owner, visibility="public"
        )
        inserted = [_insert_op("a.py::Fn"), _insert_op("b.py::Gn")]
        await _build_index(db_session, repo.repo_id, "head-wbr", inserted)
        ranked = await workspace_blast_risk_top_n(
            db_session, owner, visible_to_user=owner
        )
        assert len(ranked) >= 2
        # Sorted by co_change_count descending
        for current, following in zip(ranked, ranked[1:]):
            assert current.co_change_count >= following.co_change_count

    @pytest.mark.asyncio
    async def test_cross_repo_impact_no_source_repo(
        self, db_session: AsyncSession
    ) -> None:
        """An unknown source repo id yields ``None``."""
        from musehub.services.musehub_cross_repo import cross_repo_impact

        missing_repo_id = secrets.token_hex(16)
        impact = await cross_repo_impact(
            db_session, "ghost-owner", missing_repo_id, "file.py::Fn",
            visible_to_user="ghost-owner",
        )
        assert impact is None

    @pytest.mark.asyncio
    async def test_cross_repo_impact_unknown_address(
        self, db_session: AsyncSession
    ) -> None:
        """An address absent from the indexed repo yields ``None``."""
        from musehub.services.musehub_cross_repo import cross_repo_impact

        owner = f"owner-{secrets.token_hex(4)}"
        repo = await create_repo(
            db_session, slug="cri-unknown", owner=owner, visibility="public"
        )
        inserted = [_insert_op("a.py::KnownFn")]
        await _build_index(db_session, repo.repo_id, "head-cri", inserted)
        impact = await cross_repo_impact(
            db_session, owner, repo.repo_id, "a.py::NonExistent",
            visible_to_user=owner,
        )
        assert impact is None

    @pytest.mark.asyncio
    async def test_build_deps_graph_single_repo(
        self, db_session: AsyncSession
    ) -> None:
        """The graph built for one repo exposes ``nodes`` and ``edges``."""
        from musehub.services.musehub_cross_repo import build_deps_graph

        owner = f"owner-{secrets.token_hex(4)}"
        repo = await create_repo(
            db_session, slug="deps-single", owner=owner, visibility="public"
        )
        inserted = [
            _insert_op("a.b.c.Fn"),
            _insert_op("a.b.d.Gn"),
        ]
        await _build_index(db_session, repo.repo_id, "head-deps", inserted)
        deps = await build_deps_graph(
            db_session, owner, repo.repo_id, visible_to_user=owner
        )
        assert hasattr(deps, "nodes")
        assert hasattr(deps, "edges")

    @pytest.mark.asyncio
    async def test_build_deps_graph_no_source_repo_returns_empty(
        self, db_session: AsyncSession
    ) -> None:
        """An unknown repo id still yields a ``DepsGraph`` instance."""
        from musehub.services.musehub_cross_repo import build_deps_graph, DepsGraph

        owner = f"owner-{secrets.token_hex(4)}"
        missing_repo_id = secrets.token_hex(16)
        deps = await build_deps_graph(
            db_session, owner, missing_repo_id, visible_to_user=owner
        )
        assert isinstance(deps, DepsGraph)
# ===========================================================================
# Layer 3 — End-to-End tests (full HTTP via AsyncClient, real DB)
# ===========================================================================
class TestBlameEndToEnd:
    """HTTP-level checks for the blame and symbol-index rebuild endpoints."""

    @pytest.mark.asyncio
    async def test_blame_404_unknown_repo(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """Blame on a random repo id answers 404."""
        url = f"/api/repos/{secrets.token_hex(16)}/blame/HEAD"
        response = await client.get(url, params={"path": "file.py"})
        assert response.status_code == 404

    @pytest.mark.asyncio
    async def test_blame_public_repo_no_auth(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """A public repo can be blamed anonymously; the body has the envelope keys."""
        repo = await create_repo(db_session, slug="blame-e2e-pub", visibility="public")
        await db_session.commit()
        url = f"/api/repos/{repo.repo_id}/blame/HEAD"
        response = await client.get(url, params={"path": "file.py"})
        assert response.status_code == 200
        body = response.json()
        assert "entries" in body
        assert "totalEntries" in body
        assert "path" in body

    @pytest.mark.asyncio
    async def test_blame_private_repo_requires_auth(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """Anonymous blame on a private repo answers 401."""
        repo = await create_repo(db_session, slug="blame-e2e-priv", visibility="private")
        await db_session.commit()
        url = f"/api/repos/{repo.repo_id}/blame/HEAD"
        response = await client.get(url, params={"path": "file.py"})
        assert response.status_code == 401

    @pytest.mark.asyncio
    async def test_blame_returns_entries_after_index_build(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """Indexed symbols of the requested file appear in the response."""
        repo = await create_repo(db_session, slug="blame-e2e-entries", visibility="public")
        inserted = [_insert_op("api/routes.py::dispatch"), _insert_op("api/routes.py::validate")]
        await _build_index(db_session, repo.repo_id, "head-blame-e2e", inserted)
        url = f"/api/repos/{repo.repo_id}/blame/HEAD"
        response = await client.get(url, params={"path": "api/routes.py"})
        assert response.status_code == 200
        body = response.json()
        seen = [item["symbolName"] for item in body["entries"]]
        assert "dispatch" in seen
        assert "validate" in seen

    @pytest.mark.asyncio
    async def test_blame_path_filter_respected(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """Symbols from other files are excluded by the ``path`` parameter."""
        repo = await create_repo(db_session, slug="blame-e2e-filter", visibility="public")
        inserted = [
            _insert_op("path/a.py::FnA"),
            _insert_op("path/b.py::FnB"),
        ]
        await _build_index(db_session, repo.repo_id, "head-filter", inserted)
        url = f"/api/repos/{repo.repo_id}/blame/HEAD"
        response = await client.get(url, params={"path": "path/a.py"})
        assert response.status_code == 200
        body = response.json()
        seen = [item["symbolName"] for item in body["entries"]]
        assert "FnA" in seen
        assert "FnB" not in seen

    @pytest.mark.asyncio
    async def test_symbol_index_rebuild_endpoint(
        self, client: AsyncClient, db_session: AsyncSession, auth_headers: StrDict
    ) -> None:
        """An authenticated rebuild request is accepted with 200 or 202."""
        from musehub.db.musehub_repo_models import MusehubBranch as _Branch, MusehubCommit as _Commit, MusehubCommitRef as _CommitRef

        repo = await create_repo(db_session, slug="rebuild-e2e")
        # Create a head commit on "main"
        head_commit = _Commit(
            commit_id="rebuild-head",
            branch="main",
            parent_ids=[],
            message="initial",
            author="gabriel",
            timestamp=_now(),
            structured_delta={"ops": [_insert_op("x.py::Fn")]},
        )
        main_branch = _Branch(
            branch_id=secrets.token_hex(16),
            repo_id=repo.repo_id,
            name="main",
            head_commit_id="rebuild-head",
        )
        db_session.add(head_commit)
        db_session.add(_CommitRef(repo_id=repo.repo_id, commit_id="rebuild-head"))
        db_session.add(main_branch)
        await db_session.commit()
        url = f"/api/repos/{repo.repo_id}/symbol-index/rebuild"
        response = await client.post(url, headers=auth_headers)
        assert response.status_code in (200, 202)

    @pytest.mark.asyncio
    async def test_symbol_index_rebuild_requires_auth(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """A rebuild request without credentials answers 401."""
        repo = await create_repo(db_session, slug="rebuild-noauth")
        await db_session.commit()
        url = f"/api/repos/{repo.repo_id}/symbol-index/rebuild"
        response = await client.post(url)
        assert response.status_code == 401
# ===========================================================================
# Layer 4 — Stress tests
# ===========================================================================
class TestStress:
    """Volume checks: large histories and several repos per owner."""

    def test_compute_intel_1000_symbols(self) -> None:
        """compute_intel on 1000 symbols completes without error."""
        history: SymbolHistory = {
            f"module/file_{i % 20}.py::Fn{i}": [_entry(f"c{i}", ts=_ago(days=i % 200))]
            for i in range(1000)
        }
        snap = compute_intel(history, [], now_utc=_now())
        assert snap.total_symbols == 1000
        assert 0 <= snap.health_score <= 100

    def test_compute_intel_many_co_changing_symbols(self) -> None:
        """50 symbols all sharing the same commit — coupling matrix stays bounded."""
        shared_commit = "shared"
        history = {}
        for i in range(50):
            history[f"file.py::Fn{i}"] = [_entry(shared_commit)]
        snap = compute_intel(history, [], now_utc=_now())
        # _TOP_COUPLING=5 cap must be respected
        assert len(snap.coupling_pairs) <= 5

    @pytest.mark.asyncio
    async def test_search_symbol_across_10_repos(
        self, db_session: AsyncSession
    ) -> None:
        """Search across 10 repos each with 20 symbols."""
        from musehub.services.musehub_cross_repo import search_symbol_across_repos

        owner = f"stress-owner-{secrets.token_hex(3)}"
        for repo_no in range(10):
            repo = await create_repo(
                db_session, slug=f"stress-repo-{repo_no}", owner=owner, visibility="public"
            )
            inserted = [_insert_op(f"mod{j}.py::TargetFn{j}") for j in range(20)]
            await _build_index(db_session, repo.repo_id, f"head-stress-{repo_no}", inserted)
        hits = await search_symbol_across_repos(
            db_session, owner, "TargetFn", visible_to_user=owner, limit=50
        )
        assert len(hits) >= 1

    @pytest.mark.asyncio
    async def test_workspace_blast_risk_across_5_repos(
        self, db_session: AsyncSession
    ) -> None:
        """Blast-risk ranking over 5 repos honours the ``top_n`` cap."""
        from musehub.services.musehub_cross_repo import workspace_blast_risk_top_n

        owner = f"wbr-owner-{secrets.token_hex(3)}"
        for repo_no in range(5):
            repo = await create_repo(
                db_session, slug=f"wbr-sr-{repo_no}", owner=owner, visibility="public"
            )
            inserted = [_insert_op(f"f{j}.py::Fn{j}") for j in range(10)]
            await _build_index(db_session, repo.repo_id, f"head-wbr-{repo_no}", inserted)
        ranked = await workspace_blast_risk_top_n(
            db_session, owner, top_n=20, visible_to_user=owner
        )
        # 5 repos × 10 symbols each = 50 entries, capped at top_n=20
        assert len(ranked) <= 20
        assert len(ranked) >= 1

    def test_blame_build_500_symbols(self) -> None:
        """_build_real_symbol_blame returns one row per symbol for 500 symbols."""
        from musehub.api.routes.musehub.blame import _build_real_symbol_blame

        history = {}
        for i in range(500):
            history[f"big/file.py::Fn{i}"] = [_entry(f"c{i}")]
        commit_map = {}
        for i in range(500):
            commit_map[f"c{i}"] = {"message": "m", "author": "g", "timestamp": _now()}
        blame = _build_real_symbol_blame(history, "big/file.py", commit_map)
        assert len(blame) == 500
# ===========================================================================
# Layer 5 — Data Integrity tests
# ===========================================================================
class TestDataIntegrity:
    """Checks that computed and stored intel data keeps its shape and values."""

    def test_intel_snapshot_as_dict_from_dict_identity(self) -> None:
        """Round-trip through as_dict/from_dict keeps the score, the alert
        counters and the number of hotspots."""
        snap = compute_intel(
            {
                "file.py::Fn": [_entry(f"c{i}") for i in range(15)],
                "file.py::Old": [_entry("co", ts=_ago(150))],
            },
            ["breaking1"],
            now_utc=_now(),
        )
        d = snap.as_dict()
        reconstructed = IntelSnapshot.from_dict(d)
        assert reconstructed.health_score == snap.health_score
        assert reconstructed.alert_hotspot_count == snap.alert_hotspot_count
        assert reconstructed.alert_dead_count == snap.alert_dead_count
        assert reconstructed.alert_breaking_count == snap.alert_breaking_count
        assert len(reconstructed.hotspots) == len(snap.hotspots)

    @pytest.mark.asyncio
    async def test_intel_full_json_stored_and_retrievable(
        self, db_session: AsyncSession
    ) -> None:
        """The persisted snapshot and the returned JSON agree on the symbol count."""
        from musehub.services.musehub_symbol_indexer import load_intel_snapshot

        repo = await create_repo(db_session, slug="di-intel-json")
        ops = [_insert_op("svc.py::do_work", "sha256:beef")]
        row = await _build_index(db_session, repo.repo_id, "head-di", ops)
        assert row.intel_full_json is not None
        snap = await load_intel_snapshot(db_session, repo.repo_id)
        assert snap is not None
        assert snap.total_symbols == 1
        # The JSON returned by the index build must report the same count as
        # the snapshot loaded back from the database.
        all_in_dict = json.loads(row.intel_full_json)
        assert all_in_dict["total_symbols"] == 1

    def test_velocity_week_buckets_count(self) -> None:
        """Velocity must always have exactly 12 buckets."""
        history = {
            "f.py::Fn": [_entry("c1", ts=_ago(days=1))],
        }
        snap = compute_intel(history, [], now_utc=_now())
        assert len(snap.velocity.weeks) == 12

    def test_hotspot_entries_have_required_fields(self) -> None:
        """Every hotspot has a string address and a positive integer count."""
        history = {
            "f.py::Fn": [_entry(f"c{i}") for i in range(12)],
        }
        snap = compute_intel(history, [], now_utc=_now())
        for h in snap.hotspots:
            assert isinstance(h.address, str)
            assert isinstance(h.change_count, int)
            assert h.change_count > 0

    def test_dead_entry_days_cold_matches_expected(self) -> None:
        """``days_cold`` of a symbol last touched 120 days ago is about 120."""
        old_ts = _ago(120)
        history = {"f.py::Old": [_entry("c1", ts=old_ts)]}
        snap = compute_intel(history, [], now_utc=_now())
        # Require the candidate explicitly: guarding with ``if`` would let the
        # test pass without checking anything when detection fails.
        assert snap.dead_candidates
        entry = snap.dead_candidates[0]
        assert 110 <= entry.days_cold <= 130  # allow ±10 days rounding

    @pytest.mark.asyncio
    async def test_blame_entry_fields_complete(
        self, db_session: AsyncSession
    ) -> None:
        """A blame row built from stored history carries name, address and op."""
        from musehub.services.musehub_symbol_indexer import load_symbol_history
        from musehub.api.routes.musehub.blame import _build_real_symbol_blame

        repo = await create_repo(db_session, slug="di-blame-fields")
        ops = [_insert_op("f.py::Fn", "sha256:data1")]
        await _build_index(db_session, repo.repo_id, "head-di-blame", ops)
        history = await load_symbol_history(db_session, repo.repo_id, file_path="f.py")
        commit_map = {
            "head-di-blame": {
                "message": "feat: add fn",
                "author": "gabriel",
                "timestamp": _now(),
            }
        }
        results = _build_real_symbol_blame(history, "f.py", commit_map)
        assert len(results) == 1
        entry = results[0]
        assert entry.symbol_name == "Fn"
        assert entry.symbol_address == "f.py::Fn"
        assert entry.op in ("add", "modify", "delete", "insert", "replace", "patch", "rename")
# ===========================================================================
# Layer 6 — Security tests
# ===========================================================================
class TestSecurity:
    """Layer 6 — auth guards, private-repo visibility and hostile-input handling."""

    @pytest.mark.asyncio
    async def test_blame_private_repo_401_no_token(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """A blame request without credentials on a private repo is answered with 401."""
        repo = await create_repo(db_session, slug="sec-blame-priv", visibility="private")
        await db_session.commit()
        resp = await client.get(
            f"/api/repos/{repo.repo_id}/blame/HEAD",
            params={"path": "file.py"},
        )
        assert resp.status_code == 401

    @pytest.mark.asyncio
    async def test_blame_404_for_deleted_repo(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """A blame request for a repo that has been deleted is answered with 404."""
        repo = await create_repo(db_session, slug="sec-blame-deleted", visibility="public")
        await db_session.delete(repo)
        await db_session.commit()
        resp = await client.get(
            f"/api/repos/{repo.repo_id}/blame/HEAD",
            params={"path": "file.py"},
        )
        assert resp.status_code == 404

    @pytest.mark.asyncio
    async def test_search_private_repo_not_visible_to_other_user(
        self, db_session: AsyncSession
    ) -> None:
        """Symbols of a private repo do not show up in another user's search results."""
        from musehub.services.musehub_cross_repo import search_symbol_across_repos

        owner = f"sec-owner-{secrets.token_hex(3)}"
        repo = await create_repo(
            db_session, slug="sec-priv-search", owner=owner, visibility="private"
        )
        ops = [_insert_op("secret.py::TopSecretFn")]
        await _build_index(db_session, repo.repo_id, "head-sec-priv", ops)
        # Different user can't see private repo
        results = await search_symbol_across_repos(
            db_session, owner, "TopSecretFn", visible_to_user="other-user"
        )
        assert not any("TopSecretFn" in r.address for r in results)

    @pytest.mark.asyncio
    async def test_blame_path_with_traversal_chars_no_crash(
        self, client: AsyncClient, db_session: AsyncSession
    ) -> None:
        """A path-traversal style ``path`` parameter yields 200 with no entries."""
        repo = await create_repo(db_session, slug="sec-traversal", visibility="public")
        await db_session.commit()
        # Path with traversal attempt — server should return 200 with empty entries
        resp = await client.get(
            f"/api/repos/{repo.repo_id}/blame/HEAD",
            params={"path": "../../../etc/passwd"},
        )
        assert resp.status_code == 200
        data = resp.json()
        assert data["entries"] == []

    def test_compute_intel_with_injected_commit_ids(self) -> None:
        """Malformed commit IDs in history do not cause exceptions."""
        history = {
            "f.py::Fn": [
                {"commit_id": "'; DROP TABLE commits; --", "op": "add"},
                {"commit_id": "", "op": "modify"},
                {"commit_id": None, "op": "add"},
            ]
        }
        snap = compute_intel(history, [], now_utc=_now())
        assert snap.total_symbols == 1

    def test_blame_build_with_xss_in_commit_message(self) -> None:
        """XSS in commit messages is returned verbatim, not executed."""
        from musehub.api.routes.musehub.blame import _build_real_symbol_blame

        history = {"f.py::Fn": [_entry("c1")]}
        # Real markup payloads: an empty message would not exercise the
        # "returned verbatim" contract this test exists to pin down.
        xss_msg = "<script>alert('xss')</script>"
        commit_map = {
            "c1": {
                "message": xss_msg,
                "author": "<img src=x onerror=alert(1)>",
                "timestamp": _now(),
            }
        }
        results = _build_real_symbol_blame(history, "f.py", commit_map)
        assert results[0].commit_message == xss_msg  # stored as-is (escaping is UI's job)
# ===========================================================================
# Layer 7 — Performance tests
# ===========================================================================
class TestPerformance:
    """Layer 7 — wall-clock budgets for the intel, blame and cross-repo search paths."""

    def test_compute_intel_500_symbols_under_200ms(self) -> None:
        """compute_intel over 100 symbols x 5 entries each (500 entries) stays under 200 ms."""
        history = {}
        for sym_idx in range(100):
            history[f"pkg/mod_{sym_idx}.py::Symbol{sym_idx}"] = [
                _entry(f"c{sym_idx}_{entry_idx}", ts=_ago(entry_idx % 300))
                for entry_idx in range(5)
            ]

        # Only the compute_intel call itself is inside the timed window.
        started = time.perf_counter()
        snap = compute_intel(history, [], now_utc=_now())
        stopped = time.perf_counter()

        elapsed_ms = (stopped - started) * 1000
        assert elapsed_ms < 200, f"compute_intel took {elapsed_ms:.1f}ms"
        assert snap.total_symbols == 100

    def test_intel_as_dict_from_dict_1000_entries_under_50ms(self) -> None:
        """Serialising and rebuilding a 1000-symbol snapshot stays under 50 ms."""
        history = {}
        for idx in range(1000):
            history[f"f.py::Fn{idx}"] = [_entry(f"c{idx}")]
        snap = compute_intel(history, [], now_utc=_now())

        # Time the as_dict -> from_dict round trip, not the snapshot computation.
        started = time.perf_counter()
        IntelSnapshot.from_dict(snap.as_dict())
        stopped = time.perf_counter()

        elapsed_ms = (stopped - started) * 1000
        assert elapsed_ms < 50, f"as_dict/from_dict took {elapsed_ms:.1f}ms"

    def test_blame_build_1000_symbols_under_200ms(self) -> None:
        """Building blame for 1000 symbols of one file stays under 200 ms."""
        from musehub.api.routes.musehub.blame import _build_real_symbol_blame

        history = {}
        for idx in range(1000):
            history[f"big/file.py::Fn{idx}"] = [_entry(f"c{idx}")]

        commit_map = {}
        for idx in range(1000):
            commit_map[f"c{idx}"] = {"message": "m", "author": "g", "timestamp": _now()}

        started = time.perf_counter()
        results = _build_real_symbol_blame(history, "big/file.py", commit_map)
        stopped = time.perf_counter()

        elapsed_ms = (stopped - started) * 1000
        assert elapsed_ms < 200, f"_build_real_symbol_blame took {elapsed_ms:.1f}ms"
        assert len(results) == 1000

    @pytest.mark.asyncio
    async def test_search_across_5_repos_under_1s(
        self, db_session: AsyncSession
    ) -> None:
        """Searching one owner's 5 public repos (30 symbols each) stays under 1 s."""
        from musehub.services.musehub_cross_repo import search_symbol_across_repos

        owner = f"perf-owner-{secrets.token_hex(3)}"
        for repo_idx in range(5):
            repo = await create_repo(
                db_session,
                slug=f"perf-repo-{repo_idx}",
                owner=owner,
                visibility="public",
            )
            ops = []
            for sym_idx in range(30):
                ops.append(_insert_op(f"m{sym_idx}.py::Fn{sym_idx}"))
            await _build_index(db_session, repo.repo_id, f"head-perf-{repo_idx}", ops)

        # Repo creation and indexing are setup; only the search call is timed.
        started = time.perf_counter()
        results = await search_symbol_across_repos(
            db_session, owner, "Fn", visible_to_user=owner
        )
        stopped = time.perf_counter()

        elapsed_ms = (stopped - started) * 1000
        assert elapsed_ms < 1000, f"search_symbol_across_repos took {elapsed_ms:.1f}ms"
        assert len(results) >= 1