gabriel / musehub public
test_snapshot_symbol_indexer.py python
2,543 lines 107.4 KB
Raw
sha256:3ff9c9863a9891bdcde71b4a43228f66d0493e38b7cc1d09fe9eb7de774046b2 feat: add repair-commit wire endpoint (API parity with repa… Opus 4.8 minor ⚠ breaking 1 day ago
1 """Tests for the Snapshot & Symbol Indexer — Section 5 of test-coverage-checklist.md.
2
3 Complements test_snapshot_entries.py (14 tests on the snapshot write/read path).
4 This file focuses on the symbol indexer and the gaps not covered there.
5
6 Coverage layers
7 ───────────────
8 Unit — _extract_ops (flat/nested child_ops, missing address, non-dict delta);
9 _op_to_muse_op (all mapping keys, unknown passthrough).
10 Integration — build_symbol_index: empty list when no structured_delta; returns results
11 for repos with structured_delta; correct symbol_history/hash_occurrence
12 content; upsert semantics (only one row per repo/intel_type); BFS
13 excludes orphaned commits.
14 load_symbol_history: empty when no index; with/without file_path filter.
15 load_hash_occurrence: empty when no index; correct content.
16 get_index_meta: None/present states.
17 load_intel_snapshot: None/present states.
18 get_snapshot_manifests_batch: empty list, single, multi-snapshot.
19 Data — upsert_snapshot_entries atomic replace (stale entries removed);
20 build_symbol_index + persist_intel_results upserts on rebuild;
21 BFS reachability excludes orphaned branches.
22 Security — Corrupt JSON blob returns {} not exception;
23 build_symbol_index with unknown head_commit_id returns empty list.
24 Stress — upsert_snapshot_entries with 1 000-file manifest;
25 get_snapshot_manifests_batch with 50 snapshots in one query;
26 build_symbol_index with 100 commits (10 ops each);
27 load_symbol_history file_path filter on large index.
28 Performance — _extract_ops 1 000 calls < 100 ms;
29 build_symbol_index 100 commits < 3 s.
30 E2E — Full pipeline: commits with structured_delta → build_symbol_index →
31 persist_intel_results → get_index_meta returns correct ref;
32 rebuild replaces previous result; symbol list HTTP page returns 200.
33 """
34 from __future__ import annotations
35
36 import json
37 import secrets
38 import time
39 from datetime import datetime, timezone
40
41 import pytest
42 from sqlalchemy import select
43 from sqlalchemy.ext.asyncio import AsyncSession
44
45 from musehub.db.musehub_intel_models import MusehubIntelResult, MusehubSymbolHistoryEntry
46 from musehub.db.musehub_repo_models import MusehubCommit, MusehubCommitRef, MusehubSnapshot, MusehubSnapshotRef
47 from tests.factories import create_repo
48 from musehub.types.json_types import JSONObject
49 from muse.core.types import long_id, blob_id
50
51
52 # ─────────────────────────────────────────────────────────────────────────────
53 # Helpers
54 # ─────────────────────────────────────────────────────────────────────────────
55
56 def _now() -> datetime:
57 return datetime.now(tz=timezone.utc)
58
59
async def _commit_with_delta(
    session: AsyncSession,
    repo_id: str,
    commit_id: str,
    ops: list[JSONObject],
    parent_ids: list[str] | None = None,
    branch: str = "main",
    author: str = "gabriel",
) -> MusehubCommit:
    """Stage a commit whose structured_delta is ``{"ops": ops}``.

    A ``MusehubCommitRef`` linking *commit_id* to *repo_id* is staged
    alongside the commit, then the session is flushed (not committed).

    Args:
        session: Async session the rows are added to.
        repo_id: Repository the commit ref points at.
        commit_id: Identifier stored on both the commit and the ref.
        ops: Op dicts placed under the ``"ops"`` key of the delta.
        parent_ids: Parent commit ids; ``None`` or empty becomes ``[]``.
        branch: Branch name stored on the commit.
        author: Author stored on the commit.

    Returns:
        The newly created ``MusehubCommit`` instance.
    """
    new_commit = MusehubCommit(
        commit_id=commit_id,
        branch=branch,
        parent_ids=parent_ids if parent_ids else [],
        message="feat: test commit",
        author=author,
        timestamp=_now(),
        structured_delta={"ops": ops},
    )
    repo_link = MusehubCommitRef(repo_id=repo_id, commit_id=commit_id)
    # Same staging order as two consecutive session.add() calls.
    session.add_all([new_commit, repo_link])
    await session.flush()
    return new_commit
83
84
85 def _insert_op(address: str, content_id: str = "sha256:abc") -> JSONObject:
86 return {"address": address, "op": "insert", "content_id": content_id}
87
88
89 def _move_op(address: str, from_address: str, content_id: str = "sha256:abc") -> JSONObject:
90 return {"address": address, "op": "move", "from_address": from_address, "content_id": content_id}
91
92
93 def _patch_op(file_addr: str, children: list[JSONObject]) -> JSONObject:
94 return {"address": file_addr, "op": "patch", "child_ops": children}
95
96
async def _build_and_persist(
    session: AsyncSession,
    repo_id: str,
    commit_id: str,
) -> list[tuple[str, dict]]:
    """Run ``build_symbol_index`` and persist whatever it produced.

    ``persist_intel_results`` is only called when the build returned a
    truthy result list; the build output is returned either way.
    """
    # Service imports stay function-local, as everywhere else in this file.
    from musehub.services.musehub_symbol_indexer import build_symbol_index
    from musehub.services.musehub_intel_providers import persist_intel_results

    built = await build_symbol_index(session, repo_id, commit_id)
    if not built:
        return built
    await persist_intel_results(session, repo_id, commit_id, built)
    return built
109
110
111 def _get_result_data(results: list[tuple[str, JSONObject]], intel_type: str) -> JSONObject:
112 """Extract data dict for a specific intel_type from the results list."""
113 for t, data in results:
114 if t == intel_type:
115 return data
116 return {}
117
118
119 # ─────────────────────────────────────────────────────────────────────────────
120 # Layer 1 — Unit: pure functions
121 # ─────────────────────────────────────────────────────────────────────────────
122
class TestExtractOps:
    """Unit tests: ``_extract_ops`` yields one flat list of ops, child_ops included."""

    def _run(self, structured_delta: JSONObject | None) -> list[JSONObject]:
        """Call ``_extract_ops`` (imported lazily) on *structured_delta*."""
        from musehub.services.musehub_symbol_indexer import _extract_ops

        flattened = _extract_ops(structured_delta)
        return flattened

    def test_no_structured_delta_returns_empty(self) -> None:
        # NOTE(review): identical input to test_none_delta_returns_empty below.
        extracted = self._run(None)
        assert extracted == []

    def test_none_delta_returns_empty(self) -> None:
        extracted = self._run(None)
        assert extracted == []

    def test_non_dict_delta_returns_empty(self) -> None:
        extracted = self._run("bad")  # type: ignore[arg-type]
        assert extracted == []

    def test_flat_ops_without_child_ops(self) -> None:
        foo_op = {"address": "main.py::Foo", "op": "insert"}
        bar_op = {"address": "main.py::Bar", "op": "delete"}

        extracted = self._run({"ops": [foo_op, bar_op]})

        # Exactly two ops, in input order.
        assert [op["address"] for op in extracted] == ["main.py::Foo", "main.py::Bar"]

    def test_patch_op_with_child_ops_flattened(self) -> None:
        children = [
            {"address": "src/app.py::MyClass", "op": "insert"},
            {"address": "src/app.py::MyClass.run", "op": "insert"},
        ]
        patch = {"address": "src/app.py", "op": "patch", "child_ops": children}

        extracted = self._run({"ops": [patch]})

        # One top-level op plus its two children.
        assert len(extracted) == 3
        seen = [op["address"] for op in extracted]
        for expected in ("src/app.py", "src/app.py::MyClass", "src/app.py::MyClass.run"):
            assert expected in seen

    def test_op_without_address_skipped(self) -> None:
        addressless = {"op": "insert"}  # no address
        addressed = {"address": "ok.py", "op": "insert"}

        extracted = self._run({"ops": [addressless, addressed]})

        assert [op["address"] for op in extracted] == ["ok.py"]

    def test_child_op_without_address_skipped(self) -> None:
        patch = {
            "address": "file.py",
            "op": "patch",
            "child_ops": [
                {"op": "insert"},  # no address — must be skipped
                {"address": "file.py::Good", "op": "insert"},
            ],
        }

        extracted = self._run({"ops": [patch]})

        seen = [op["address"] for op in extracted]
        assert "file.py::Good" in seen
        assert all("address" in op for op in extracted)

    def test_non_dict_op_skipped(self) -> None:
        mixed_ops = ["not-a-dict", {"address": "f.py", "op": "add"}]

        extracted = self._run({"ops": mixed_ops})

        assert len(extracted) == 1
206
207
class TestRawOpStorage:
    """Raw DomainOp types are stored verbatim in op; full payload in op_payload."""

    @staticmethod
    async def _index_ops(
        session: AsyncSession,
        slug: str,
        commit_id: str,
        ops: list[JSONObject],
    ) -> str:
        """Create a repo, commit *ops* to it, run the indexer; return the repo id."""
        from musehub.services.musehub_symbol_indexer import build_symbol_index

        repo = await create_repo(session, slug=slug)
        commit = await _commit_with_delta(session, repo.repo_id, commit_id, ops=ops)
        await build_symbol_index(session, repo.repo_id, commit.commit_id)
        return repo.repo_id

    @staticmethod
    async def _rows(
        session: AsyncSession,
        repo_id: str,
        address: str | None = None,
    ) -> list[MusehubSymbolHistoryEntry]:
        """Fetch history rows for *repo_id*, optionally narrowed to one *address*."""
        stmt = select(MusehubSymbolHistoryEntry).where(
            MusehubSymbolHistoryEntry.repo_id == repo_id
        )
        if address is not None:
            stmt = stmt.where(MusehubSymbolHistoryEntry.address == address)
        return list((await session.execute(stmt)).scalars().all())

    @pytest.mark.asyncio
    async def test_insert_op_stored_raw(self, db_session: AsyncSession) -> None:
        insert = {
            "address": "main.py::Foo",
            "op": "insert",
            "content_id": "sha256:aaa",
            "content_summary": "added function Foo",
            "position": 0,
        }
        repo_id = await self._index_ops(db_session, "raw-insert", "raw-c001", [insert])

        # Unpacking fails unless exactly one row matched.
        (entry,) = await self._rows(db_session, repo_id, "main.py::Foo")

        assert entry.op == "insert"
        payload = entry.op_payload
        assert payload is not None
        assert payload["content_summary"] == "added function Foo"
        assert payload["position"] == 0
        assert "op" not in payload
        assert "address" not in payload

    @pytest.mark.asyncio
    async def test_replace_op_stored_raw(self, db_session: AsyncSession) -> None:
        replace = {
            "address": "main.py::Foo",
            "op": "replace",
            "old_content_id": "sha256:old",
            "new_content_id": "sha256:new",
            "old_summary": "function Foo v1",
            "new_summary": "function Foo v2",
            "position": None,
        }
        repo_id = await self._index_ops(db_session, "raw-replace", "raw-c002", [replace])

        (entry,) = await self._rows(db_session, repo_id, "main.py::Foo")

        assert entry.op == "replace"
        assert entry.content_id == "sha256:new"
        assert entry.op_payload["old_content_id"] == "sha256:old"
        assert entry.op_payload["new_content_id"] == "sha256:new"
        assert entry.op_payload["old_summary"] == "function Foo v1"
        assert entry.op_payload["new_summary"] == "function Foo v2"

    @pytest.mark.asyncio
    async def test_patch_op_stored_raw_with_child_summary(
        self, db_session: AsyncSession
    ) -> None:
        child = {
            "address": "src/app.py::MyClass",
            "op": "insert",
            "content_id": "sha256:cls",
            "content_summary": "added class",
            "position": 0,
        }
        patch = {
            "address": "src/app.py",
            "op": "patch",
            "child_domain": "python",
            "child_summary": "2 symbols changed",
            "child_ops": [child],
        }
        repo_id = await self._index_ops(db_session, "raw-patch", "raw-c003", [patch])

        entries = {row.address: row for row in await self._rows(db_session, repo_id)}

        # Parent patch entry
        parent = entries["src/app.py"]
        assert parent.op == "patch"
        assert parent.op_payload["child_summary"] == "2 symbols changed"
        assert parent.op_payload["child_domain"] == "python"
        assert "child_ops" not in parent.op_payload  # stripped — those are separate rows

        # Child entry
        assert entries["src/app.py::MyClass"].op == "insert"

    @pytest.mark.asyncio
    async def test_mutate_op_stored_raw_with_fields(
        self, db_session: AsyncSession
    ) -> None:
        mutate = {
            "address": "track.mid::note@bar4",
            "op": "mutate",
            "entity_id": "test-note-42",
            "old_content_id": "sha256:old",
            "new_content_id": "sha256:new",
            "fields": {"velocity": {"old": "80", "new": "100"}},
            "old_summary": "velocity 80",
            "new_summary": "velocity 100",
            "position": 3,
        }
        repo_id = await self._index_ops(db_session, "raw-mutate", "raw-c004", [mutate])

        (entry,) = await self._rows(db_session, repo_id)

        assert entry.op == "mutate"
        assert entry.op_payload["entity_id"] == "test-note-42"
        assert entry.op_payload["fields"] == {"velocity": {"old": "80", "new": "100"}}
        assert entry.op_payload["new_summary"] == "velocity 100"

    @pytest.mark.asyncio
    async def test_patch_with_from_address_is_rename(
        self, db_session: AsyncSession
    ) -> None:
        """PatchOp with from_address is a file rename+modify; from_address in payload."""
        rename = {
            "address": "src/new.py",
            "op": "patch",
            "from_address": "src/old.py",
            "child_domain": "python",
            "child_summary": "file renamed",
            "child_ops": [],
        }
        repo_id = await self._index_ops(db_session, "raw-rename", "raw-c005", [rename])

        (entry,) = await self._rows(db_session, repo_id, "src/new.py")

        assert entry.op == "patch"
        assert entry.op_payload["from_address"] == "src/old.py"

    @pytest.mark.asyncio
    async def test_op_payload_excludes_op_and_address_keys(
        self, db_session: AsyncSession
    ) -> None:
        insert = {
            "address": "util.py::helper",
            "op": "insert",
            "content_id": "sha256:ccc",
            "content_summary": "added helper",
            "position": 1,
        }
        repo_id = await self._index_ops(db_session, "raw-exclude", "raw-c006", [insert])

        (entry,) = await self._rows(db_session, repo_id)

        for stripped_key in ("op", "address"):
            assert stripped_key not in entry.op_payload
407
408
409 # ─────────────────────────────────────────────────────────────────────────────
410 # Layer 2 — Integration: build_symbol_index + read functions
411 # ─────────────────────────────────────────────────────────────────────────────
412
class TestBuildSymbolIndex:
    """Integration tests for ``build_symbol_index`` and the rows/blobs it produces."""

    @pytest.mark.asyncio
    async def test_returns_empty_when_no_structured_delta(
        self, db_session: AsyncSession
    ) -> None:
        """Indexing a factory-made commit yields an empty result list."""
        from tests.factories import create_commit
        from musehub.services.musehub_symbol_indexer import build_symbol_index

        repo = await create_repo(db_session, slug="idx-nodelta")
        plain_commit = await create_commit(db_session, repo.repo_id, branch="main")

        outcome = await build_symbol_index(db_session, repo.repo_id, plain_commit.commit_id)

        assert outcome == []

    @pytest.mark.asyncio
    async def test_returns_results_for_structured_delta(
        self, db_session: AsyncSession
    ) -> None:
        from musehub.services.musehub_symbol_indexer import load_symbol_history

        repo = await create_repo(db_session, slug="idx-creates")
        head = await _commit_with_delta(
            db_session,
            repo.repo_id,
            "c001",
            ops=[_insert_op("main.py::Foo", "sha256:aaa")],
        )

        outcome = await _build_and_persist(db_session, repo.repo_id, head.commit_id)
        await db_session.commit()

        assert outcome
        produced = {kind for kind, _ in outcome}
        # Aggregate blobs are still produced.
        for blob_type in ("code.intel_summary", "code.intel_snapshot"):
            assert blob_type in produced
        # Per-symbol data now lives in normalized tables, not in blobs.
        for retired_type in (
            "code.symbol_history",
            "code.hash_occurrence",
            "code.per_symbol_intel",
        ):
            assert retired_type not in produced
        # Confirm normalized rows were written.
        indexed = await load_symbol_history(db_session, repo.repo_id)
        assert "main.py::Foo" in indexed

    @pytest.mark.asyncio
    async def test_symbol_history_contains_correct_entries(
        self, db_session: AsyncSession
    ) -> None:
        from musehub.services.musehub_symbol_indexer import load_symbol_history

        repo = await create_repo(db_session, slug="idx-symhist")
        inserted = [
            _insert_op("src/app.py::MyClass", "sha256:class"),
            _insert_op("src/app.py::my_func", "sha256:func"),
        ]
        head = await _commit_with_delta(db_session, repo.repo_id, "c002", ops=inserted)

        await _build_and_persist(db_session, repo.repo_id, head.commit_id)
        await db_session.commit()

        indexed = await load_symbol_history(db_session, repo.repo_id)
        assert "src/app.py::MyClass" in indexed
        assert "src/app.py::my_func" in indexed
        first_event = indexed["src/app.py::MyClass"][0]
        assert first_event["op"] == "insert"

    @pytest.mark.asyncio
    async def test_hash_occurrence_tracks_shared_content(
        self, db_session: AsyncSession
    ) -> None:
        """Two addresses inserted with one content id are both listed under it."""
        from musehub.services.musehub_symbol_indexer import load_hash_occurrence

        common_id = "sha256:shared"
        repo = await create_repo(db_session, slug="idx-hashoc")
        head = await _commit_with_delta(
            db_session,
            repo.repo_id,
            "c003",
            ops=[_insert_op("a.py::Foo", common_id), _insert_op("b.py::Bar", common_id)],
        )

        await _build_and_persist(db_session, repo.repo_id, head.commit_id)
        await db_session.commit()

        occurrences = await load_hash_occurrence(db_session, repo.repo_id)
        assert common_id in occurrences
        assert set(occurrences[common_id]) == {"a.py::Foo", "b.py::Bar"}

    @pytest.mark.asyncio
    async def test_rebuild_upserts_one_row_per_intel_type(
        self, db_session: AsyncSession
    ) -> None:
        from sqlalchemy import func, select

        repo = await create_repo(db_session, slug="idx-prune")
        # Build, persist and commit once per commit — two full rounds.
        for commit_id, address in (("c100", "f.py::A"), ("c101", "f.py::B")):
            built_from = await _commit_with_delta(
                db_session, repo.repo_id, commit_id, ops=[_insert_op(address)]
            )
            await _build_and_persist(db_session, repo.repo_id, built_from.commit_id)
            await db_session.commit()

        # code.symbol_history is no longer a blob — it lives in normalized rows.
        # intel_summary/intel_snapshot are the only blobs, each upserted once.
        count_stmt = (
            select(func.count())
            .select_from(MusehubIntelResult)
            .where(
                MusehubIntelResult.repo_id == repo.repo_id,
                MusehubIntelResult.intel_type == "code.intel_summary",
            )
        )
        summary_rows = (await db_session.execute(count_stmt)).scalar_one()
        assert summary_rows == 1

    @pytest.mark.asyncio
    async def test_bfs_excludes_orphaned_commits(
        self, db_session: AsyncSession
    ) -> None:
        """Commits not reachable from head must not appear in the symbol index."""
        from musehub.services.musehub_symbol_indexer import load_symbol_history

        repo = await create_repo(db_session, slug="idx-bfs")
        # Neither commit names the other as a parent, so "orphan" is
        # unreachable from "head".
        await _commit_with_delta(
            db_session,
            repo.repo_id,
            "orphan",
            ops=[_insert_op("orphan.py::OrphanSym", "sha256:orphan")],
            parent_ids=[],
        )
        head = await _commit_with_delta(
            db_session,
            repo.repo_id,
            "head",
            ops=[_insert_op("main.py::RealSym", "sha256:real")],
            parent_ids=[],
        )

        await _build_and_persist(db_session, repo.repo_id, head.commit_id)
        await db_session.commit()

        indexed = await load_symbol_history(db_session, repo.repo_id)
        assert "main.py::RealSym" in indexed
        assert "orphan.py::OrphanSym" not in indexed
550
551
class TestLoadFunctions:
    """Read-side helpers of the symbol indexer, with and without a built index."""

    @pytest.mark.asyncio
    async def test_load_symbol_history_empty_when_no_index(
        self, db_session: AsyncSession
    ) -> None:
        from musehub.services.musehub_symbol_indexer import load_symbol_history

        repo = await create_repo(db_session, slug="load-noindex")

        loaded = await load_symbol_history(db_session, repo.repo_id)

        assert loaded == {}

    @pytest.mark.asyncio
    async def test_load_symbol_history_with_file_path_filter(
        self, db_session: AsyncSession
    ) -> None:
        """Filtering on ``a.py`` keeps the file and its symbol, drops ``b.py``'s."""
        from musehub.services.musehub_symbol_indexer import load_symbol_history

        repo = await create_repo(db_session, slug="load-filter")
        inserted = [
            _insert_op("a.py::Foo", "sha256:x"),
            _insert_op("a.py", "sha256:file"),
            _insert_op("b.py::Bar", "sha256:y"),
        ]
        head = await _commit_with_delta(db_session, repo.repo_id, "cF01", ops=inserted)
        await _build_and_persist(db_session, repo.repo_id, head.commit_id)
        await db_session.commit()

        filtered = await load_symbol_history(db_session, repo.repo_id, file_path="a.py")

        assert "a.py::Foo" in filtered
        assert "a.py" in filtered
        assert "b.py::Bar" not in filtered

    @pytest.mark.asyncio
    async def test_load_hash_occurrence_empty_when_no_index(
        self, db_session: AsyncSession
    ) -> None:
        from musehub.services.musehub_symbol_indexer import load_hash_occurrence

        repo = await create_repo(db_session, slug="hash-noindex")

        loaded = await load_hash_occurrence(db_session, repo.repo_id)

        assert loaded == {}

    @pytest.mark.asyncio
    async def test_load_hash_occurrence_returns_correct_entries(
        self, db_session: AsyncSession
    ) -> None:
        from musehub.services.musehub_symbol_indexer import load_hash_occurrence

        repo = await create_repo(db_session, slug="hash-entries")
        inserted = [
            _insert_op("x.py::X", "sha256:hash1"),
            _insert_op("y.py::Y", "sha256:hash1"),
        ]
        head = await _commit_with_delta(db_session, repo.repo_id, "cH01", ops=inserted)
        await _build_and_persist(db_session, repo.repo_id, head.commit_id)
        await db_session.commit()

        occurrences = await load_hash_occurrence(db_session, repo.repo_id)

        assert "sha256:hash1" in occurrences
        assert set(occurrences["sha256:hash1"]) == {"x.py::X", "y.py::Y"}

    @pytest.mark.asyncio
    async def test_get_index_meta_none_when_no_index(
        self, db_session: AsyncSession
    ) -> None:
        from musehub.services.musehub_symbol_indexer import get_index_meta

        repo = await create_repo(db_session, slug="meta-none")

        meta = await get_index_meta(db_session, repo.repo_id)

        assert meta is None

    @pytest.mark.asyncio
    async def test_get_index_meta_returns_ref_and_symbol_count(
        self, db_session: AsyncSession
    ) -> None:
        from musehub.services.musehub_symbol_indexer import get_index_meta

        repo = await create_repo(db_session, slug="meta-ok")
        head = await _commit_with_delta(
            db_session,
            repo.repo_id,
            "cM01",
            ops=[_insert_op("f.py::A"), _insert_op("f.py::B")],
        )
        await _build_and_persist(db_session, repo.repo_id, head.commit_id)
        await db_session.commit()

        meta = await get_index_meta(db_session, repo.repo_id)

        assert meta is not None
        assert meta["ref"] == head.commit_id
        assert meta["built_at"] is not None
        # Two symbols were inserted; only a lower bound is checked.
        assert meta["symbol_count"] >= 2

    @pytest.mark.asyncio
    async def test_load_intel_snapshot_none_when_no_index(
        self, db_session: AsyncSession
    ) -> None:
        from musehub.services.musehub_symbol_indexer import load_intel_snapshot

        repo = await create_repo(db_session, slug="intel-none")

        snapshot = await load_intel_snapshot(db_session, repo.repo_id)

        assert snapshot is None

    @pytest.mark.asyncio
    async def test_load_intel_snapshot_returns_snapshot_when_built(
        self, db_session: AsyncSession
    ) -> None:
        from musehub.services.musehub_symbol_indexer import load_intel_snapshot

        repo = await create_repo(db_session, slug="intel-ok")
        head = await _commit_with_delta(
            db_session,
            repo.repo_id,
            "cI01",
            ops=[_insert_op("app.py::Handler", "sha256:h1")],
        )
        outcome = await _build_and_persist(db_session, repo.repo_id, head.commit_id)
        await db_session.commit()

        assert outcome, "build_symbol_index returned empty results"
        produced = [kind for kind, _ in outcome]
        assert "code.intel_snapshot" in produced, "code.intel_snapshot not in results"
        snapshot = await load_intel_snapshot(db_session, repo.repo_id)
        assert snapshot is not None
665
666
class TestGetSnapshotManifestsBatch:
    """``get_snapshot_manifests_batch`` for zero, one, several and unknown snapshot ids."""

    @pytest.mark.asyncio
    async def test_empty_list_returns_empty_dict(
        self, db_session: AsyncSession
    ) -> None:
        from musehub.services.musehub_snapshot import get_snapshot_manifests_batch

        manifests = await get_snapshot_manifests_batch(db_session, [])

        assert manifests == {}

    @pytest.mark.asyncio
    async def test_single_snapshot_manifest(
        self, db_session: AsyncSession
    ) -> None:
        from musehub.services.musehub_snapshot import (
            get_snapshot_manifests_batch,
            upsert_snapshot_entries,
        )

        snap_id = "snap-batch-01"
        stored = {"a.py": "sha256:a", "b.py": "sha256:b"}
        repo = await create_repo(db_session, slug="batch-single")
        await upsert_snapshot_entries(db_session, repo.repo_id, snap_id, stored)
        await db_session.commit()

        manifests = await get_snapshot_manifests_batch(db_session, [snap_id])

        assert snap_id in manifests
        assert manifests[snap_id]["a.py"] == "sha256:a"
        assert manifests[snap_id]["b.py"] == "sha256:b"

    @pytest.mark.asyncio
    async def test_multiple_snapshots_grouped_correctly(
        self, db_session: AsyncSession
    ) -> None:
        """Five snapshots with one file each come back keyed by their own id."""
        from musehub.services.musehub_snapshot import (
            get_snapshot_manifests_batch,
            upsert_snapshot_entries,
        )

        repo = await create_repo(db_session, slug="batch-multi")
        snap_ids = [f"snap-multi-{i:02d}" for i in range(5)]
        for index, snap_id in enumerate(snap_ids):
            await upsert_snapshot_entries(
                db_session,
                repo.repo_id,
                snap_id,
                {f"file{index}.py": long_id(f"{index}")},
            )
        await db_session.commit()

        manifests = await get_snapshot_manifests_batch(db_session, snap_ids)

        assert len(manifests) == 5
        for index, snap_id in enumerate(snap_ids):
            assert f"file{index}.py" in manifests[snap_id]

    @pytest.mark.asyncio
    async def test_unknown_snapshot_id_returns_empty_manifest(
        self, db_session: AsyncSession
    ) -> None:
        from musehub.services.musehub_snapshot import get_snapshot_manifests_batch

        manifests = await get_snapshot_manifests_batch(db_session, ["ghost-snap"])

        # The unknown id is still present as a key, mapped to an empty manifest.
        assert manifests == {"ghost-snap": {}}
725
726
727 # ─────────────────────────────────────────────────────────────────────────────
728 # Layer 3 — E2E: full pipeline via direct service calls with real DB
729 # ─────────────────────────────────────────────────────────────────────────────
730
class TestSymbolIndexPipeline:
    """End-to-end: commit → build → persist → read back through the service functions."""

    @pytest.mark.asyncio
    async def test_build_then_meta_reflects_head_commit(
        self, db_session: AsyncSession
    ) -> None:
        from musehub.services.musehub_symbol_indexer import get_index_meta

        repo = await create_repo(db_session, slug="e2e-pipeline")
        head = await _commit_with_delta(
            db_session,
            repo.repo_id,
            "pipe-c001",
            ops=[_insert_op("service.py::APIHandler", "sha256:h1")],
        )
        await _build_and_persist(db_session, repo.repo_id, head.commit_id)
        await db_session.commit()

        meta = await get_index_meta(db_session, repo.repo_id)

        assert meta is not None
        assert meta["ref"] == head.commit_id

    @pytest.mark.asyncio
    async def test_rebuild_updates_ref_to_latest_commit(
        self, db_session: AsyncSession
    ) -> None:
        """After a second build on a child commit, meta's ref is the child."""
        from musehub.services.musehub_symbol_indexer import get_index_meta

        repo = await create_repo(db_session, slug="e2e-rebuild")

        first = await _commit_with_delta(
            db_session, repo.repo_id, "rb-c001", ops=[_insert_op("a.py::Old")]
        )
        await _build_and_persist(db_session, repo.repo_id, first.commit_id)
        await db_session.commit()

        second = await _commit_with_delta(
            db_session,
            repo.repo_id,
            "rb-c002",
            ops=[_insert_op("b.py::New")],
            parent_ids=[first.commit_id],
        )
        await _build_and_persist(db_session, repo.repo_id, second.commit_id)
        await db_session.commit()

        meta = await get_index_meta(db_session, repo.repo_id)

        assert meta is not None
        assert meta["ref"] == second.commit_id

    @pytest.mark.asyncio
    async def test_multi_commit_chain_all_symbols_indexed(
        self, db_session: AsyncSession
    ) -> None:
        """3-commit chain — every symbol from every commit must appear in the index."""
        from musehub.services.musehub_symbol_indexer import load_symbol_history

        repo = await create_repo(db_session, slug="e2e-chain")
        chain = (
            ("chain-c001", "a.py::A1"),
            ("chain-c002", "b.py::B1"),
            ("chain-c003", "c.py::C1"),
        )
        # Each commit's parent is the one created just before it.
        parents: list[str] = []
        for commit_id, address in chain:
            tip = await _commit_with_delta(
                db_session,
                repo.repo_id,
                commit_id,
                ops=[_insert_op(address)],
                parent_ids=parents,
            )
            parents = [tip.commit_id]

        # Only the tip of the chain is indexed.
        await _build_and_persist(db_session, repo.repo_id, tip.commit_id)
        await db_session.commit()

        indexed = await load_symbol_history(db_session, repo.repo_id)
        for _, address in chain:
            assert address in indexed
795
796
797 # ─────────────────────────────────────────────────────────────────────────────
798 # Layer 4 — Data Integrity
799 # ─────────────────────────────────────────────────────────────────────────────
800
class TestDataIntegrity:
    """Data-integrity checks on snapshot manifests and persisted index results."""

    @pytest.mark.asyncio
    async def test_upsert_atomic_replace_removes_stale_entries(
        self, db_session: AsyncSession
    ) -> None:
        """Different snap_ids store different manifests independently.

        NOTE(review): despite the name, no snapshot id is written twice here,
        so replacement of an existing manifest is not exercised — confirm
        whether that is covered elsewhere.
        """
        from musehub.services.musehub_snapshot import (
            get_snapshot_manifest,
            upsert_snapshot_entries,
        )

        repo = await create_repo(db_session, slug="di-atomic")
        first_id, second_id = "snap-atomic-a", "snap-atomic-b"

        await upsert_snapshot_entries(
            db_session,
            repo.repo_id,
            first_id,
            {"old_file.py": "sha256:old", "shared.py": "sha256:shared"},
        )
        await db_session.commit()

        await upsert_snapshot_entries(
            db_session,
            repo.repo_id,
            second_id,
            {"new_file.py": "sha256:new"},
        )
        await db_session.commit()

        second_manifest = await get_snapshot_manifest(db_session, second_id)
        assert "new_file.py" in second_manifest
        assert "old_file.py" not in second_manifest

        first_manifest = await get_snapshot_manifest(db_session, first_id)
        assert "old_file.py" in first_manifest

    @pytest.mark.asyncio
    async def test_only_one_result_per_intel_type_after_multiple_builds(
        self, db_session: AsyncSession
    ) -> None:
        from sqlalchemy import func, select

        repo = await create_repo(db_session, slug="di-onerow")
        for round_no in range(3):
            built_from = await _commit_with_delta(
                db_session,
                repo.repo_id,
                f"di-c{round_no:03d}",
                ops=[_insert_op(f"f{round_no}.py::Sym")],
            )
            await _build_and_persist(db_session, repo.repo_id, built_from.commit_id)
            await db_session.commit()

        # code.symbol_history is no longer a blob.
        # intel_summary must exist with exactly one row (upserted each push).
        count_stmt = (
            select(func.count())
            .select_from(MusehubIntelResult)
            .where(
                MusehubIntelResult.repo_id == repo.repo_id,
                MusehubIntelResult.intel_type == "code.intel_summary",
            )
        )
        summary_rows = (await db_session.execute(count_stmt)).scalar_one()
        assert summary_rows == 1

    @pytest.mark.asyncio
    async def test_symbol_history_includes_commit_id_and_timestamp(
        self, db_session: AsyncSession
    ) -> None:
        from musehub.services.musehub_symbol_indexer import load_symbol_history

        repo = await create_repo(db_session, slug="di-fields")
        head = await _commit_with_delta(
            db_session,
            repo.repo_id,
            "di-field-001",
            ops=[_insert_op("service.py::MyFn", "sha256:myfn")],
        )
        await _build_and_persist(db_session, repo.repo_id, head.commit_id)
        await db_session.commit()

        indexed = await load_symbol_history(db_session, repo.repo_id)
        first_event = indexed["service.py::MyFn"][0]

        assert first_event["commit_id"] == head.commit_id
        assert first_event["committed_at"] != ""
        assert first_event["op"] == "insert"
        assert first_event["content_id"] == "sha256:myfn"
877
878
879 # ─────────────────────────────────────────────────────────────────────────────
880 # Layer 5 — Security
881 # ─────────────────────────────────────────────────────────────────────────────
882
class TestSecurity:
    """Corrupt stored payloads and unknown commits must yield empty results, not exceptions."""

    @staticmethod
    async def _plant_unparseable_result(
        session: AsyncSession, repo_id: str, intel_type: str, payload: str
    ) -> None:
        """Insert and commit one ``MusehubIntelResult`` row whose ``data_json`` is *payload*.

        The row is keyed on the fixed ref ``"bad-ref"``; an existing row with
        the same key is left untouched (``ON CONFLICT DO NOTHING``).
        """
        from musehub.core.genesis import compute_intel_result_id
        from sqlalchemy.dialects.postgresql import insert as pg_insert

        bad_ref = "bad-ref"
        statement = pg_insert(MusehubIntelResult).values(
            result_id=compute_intel_result_id(repo_id, intel_type, bad_ref),
            repo_id=repo_id,
            intel_type=intel_type,
            domain="code",
            ref=bad_ref,
            data_json=payload,
            schema_version=1,
            computed_at=_now(),
        ).on_conflict_do_nothing()
        await session.execute(statement)
        await session.commit()

    @pytest.mark.asyncio
    async def test_corrupt_json_returns_empty_not_exception(
        self, db_session: AsyncSession
    ) -> None:
        """A corrupt code.symbol_history data_json must return {} — not raise."""
        from musehub.services.musehub_symbol_indexer import load_symbol_history

        repo = await create_repo(db_session, slug="sec-corrupt")
        # Hand-written row carrying garbage instead of JSON.
        await self._plant_unparseable_result(
            db_session, repo.repo_id, "code.symbol_history", "not valid json {{{{"
        )

        loaded = await load_symbol_history(db_session, repo.repo_id)
        assert loaded == {}

    @pytest.mark.asyncio
    async def test_build_with_unknown_head_commit_returns_empty(
        self, db_session: AsyncSession
    ) -> None:
        """Unknown head_commit_id must return [], not raise."""
        from musehub.services.musehub_symbol_indexer import build_symbol_index

        repo = await create_repo(db_session, slug="sec-unknown-head")
        built = await build_symbol_index(
            db_session, repo.repo_id, "nonexistent-commit-id"
        )
        assert built == []

    @pytest.mark.asyncio
    async def test_corrupt_hash_occurrence_returns_empty(
        self, db_session: AsyncSession
    ) -> None:
        """A corrupt code.hash_occurrence data_json must return {} — not raise."""
        from musehub.services.musehub_symbol_indexer import load_hash_occurrence

        repo = await create_repo(db_session, slug="sec-corrupt-hash")
        await self._plant_unparseable_result(
            db_session, repo.repo_id, "code.hash_occurrence", "} invalid {"
        )

        loaded = await load_hash_occurrence(db_session, repo.repo_id)
        assert loaded == {}
952
953
954 # ─────────────────────────────────────────────────────────────────────────────
955 # Layer 5B — Per-symbol intel
956 # ─────────────────────────────────────────────────────────────────────────────
957
class TestPerSymbolIntel:
    """Per-symbol intel produced by the indexer and read via ``lookup_symbol_intel``."""

    @pytest.mark.asyncio
    async def test_early_return_when_already_current(
        self, db_session: AsyncSession
    ) -> None:
        """When the index is current and code.per_symbol_intel exists,
        build_symbol_index must return [] (early exit, no recompute)."""
        from musehub.services.musehub_symbol_indexer import build_symbol_index

        repo = await create_repo(db_session, slug="bfil-current")
        head = await _commit_with_delta(
            db_session,
            repo.repo_id,
            "bfil-c001",
            ops=[_insert_op("svc.py::Handler", "sha256:h1")],
        )
        await _build_and_persist(db_session, repo.repo_id, head.commit_id)
        await db_session.commit()

        # Re-running against the same head has nothing new to compute.
        rebuilt = await build_symbol_index(db_session, repo.repo_id, head.commit_id)
        assert rebuilt == [], (
            "build_symbol_index must return [] when index is current "
            "and per_symbol_intel result exists."
        )

    @pytest.mark.asyncio
    async def test_per_symbol_intel_populated_on_first_build(
        self, db_session: AsyncSession
    ) -> None:
        """The very first build must already expose intel for the inserted symbol."""
        from musehub.services.musehub_symbol_indexer import lookup_symbol_intel

        symbol = "api.py::Router"
        repo = await create_repo(db_session, slug="bfil-fresh")
        head = await _commit_with_delta(
            db_session,
            repo.repo_id,
            "bfil-fresh-c001",
            ops=[_insert_op(symbol, "sha256:r1")],
        )
        await _build_and_persist(db_session, repo.repo_id, head.commit_id)
        await db_session.commit()

        intel = await lookup_symbol_intel(db_session, repo.repo_id, [symbol])
        assert symbol in intel

    @pytest.mark.asyncio
    async def test_per_symbol_intel_contains_expected_fields(
        self, db_session: AsyncSession
    ) -> None:
        """Every per-symbol entry must carry the full set of metric keys."""
        from musehub.services.musehub_symbol_indexer import lookup_symbol_intel

        symbol = "lib.py::Parser"
        required_fields = (
            "churn", "churn_30d", "churn_90d",
            "blast", "blast_direct", "blast_cross", "blast_top",
            "last_changed", "last_author", "author_count",
            "gravity", "weekly",
        )

        repo = await create_repo(db_session, slug="bfil-fields")
        head = await _commit_with_delta(
            db_session,
            repo.repo_id,
            "bfil-fields-c001",
            ops=[_insert_op(symbol, "sha256:p1")],
        )
        await _build_and_persist(db_session, repo.repo_id, head.commit_id)
        await db_session.commit()

        intel = await lookup_symbol_intel(db_session, repo.repo_id, [symbol])
        entry = intel[symbol]
        for field in required_fields:
            assert field in entry, f"Missing field '{field}' in per_symbol intel entry."

    @pytest.mark.asyncio
    async def test_author_count_reflects_unique_authors(
        self, db_session: AsyncSession
    ) -> None:
        """Three commits by two distinct authors give author_count 2 and churn 3."""
        from musehub.services.musehub_symbol_indexer import lookup_symbol_intel

        symbol = "lib.py::Widget"
        repo = await create_repo(db_session, slug="bfil-authors")
        authored_commits = [
            ("alice", "bfil-authors-c001"),
            ("bob", "bfil-authors-c002"),
            ("alice", "bfil-authors-c003"),
        ]

        # Linear chain: each commit's only parent is the previous commit id.
        parents: list[str] = []
        for seq, (author, commit_id) in enumerate(authored_commits, start=1):
            stored = await _commit_with_delta(
                db_session,
                repo.repo_id,
                commit_id,
                ops=[_insert_op(symbol, f"sha256:w{seq}")],
                parent_ids=parents,
                author=author,
            )
            parents = [commit_id]
            await _build_and_persist(db_session, repo.repo_id, stored.commit_id)
            await db_session.commit()

        intel = await lookup_symbol_intel(db_session, repo.repo_id, [symbol])
        entry = intel[symbol]
        assert entry["author_count"] == 2, (
            f"Expected 2 unique authors (alice, bob), got {entry['author_count']}"
        )
        assert entry["churn"] == 3

    @pytest.mark.asyncio
    async def test_lookup_symbol_intel_returns_matching_addresses(
        self, db_session: AsyncSession
    ) -> None:
        """Only the requested addresses are returned, not every indexed symbol."""
        from musehub.services.musehub_symbol_indexer import lookup_symbol_intel

        repo = await create_repo(db_session, slug="bfil-lookup")
        indexed_ops = [
            _insert_op("a.py::Foo", "sha256:f1"),
            _insert_op("b.py::Bar", "sha256:b1"),
            _insert_op("c.py::Baz", "sha256:z1"),
        ]
        head = await _commit_with_delta(
            db_session, repo.repo_id, "bfil-lookup-c001", ops=indexed_ops
        )
        await _build_and_persist(db_session, repo.repo_id, head.commit_id)
        await db_session.commit()

        wanted = ["a.py::Foo", "c.py::Baz"]
        found = await lookup_symbol_intel(db_session, repo.repo_id, wanted)
        assert set(found.keys()) == {"a.py::Foo", "c.py::Baz"}
        assert "b.py::Bar" not in found

    @pytest.mark.asyncio
    async def test_lookup_symbol_intel_returns_empty_when_no_index(
        self, db_session: AsyncSession
    ) -> None:
        """A repo that was never indexed yields an empty mapping."""
        from musehub.services.musehub_symbol_indexer import lookup_symbol_intel

        repo = await create_repo(db_session, slug="bfil-lookup-null")
        found = await lookup_symbol_intel(db_session, repo.repo_id, ["core.py::Engine"])
        assert found == {}
1075
1076
1077 # ─────────────────────────────────────────────────────────────────────────────
1078 # Layer 6 — Stress
1079 # ─────────────────────────────────────────────────────────────────────────────
1080
class TestStress:
    """Large-input runs of the snapshot store and the symbol indexer."""

    @pytest.mark.asyncio
    async def test_upsert_1000_file_manifest(self, db_session: AsyncSession) -> None:
        """A 1 000-entry manifest must survive an upsert/read round trip."""
        from musehub.services.musehub_snapshot import (
            get_snapshot_manifest,
            upsert_snapshot_entries,
        )

        repo = await create_repo(db_session, slug="stress-1k-snap")
        snap_id = "snap-1k"
        manifest = {f"src/file_{i:04d}.py": long_id(f"{i:04d}") for i in range(1000)}

        await upsert_snapshot_entries(db_session, repo.repo_id, snap_id, manifest)
        await db_session.commit()

        result = await get_snapshot_manifest(db_session, snap_id)
        assert len(result) == 1000
        # Compare with the value that was actually written instead of a
        # hard-coded literal, so the check does not depend on the exact
        # string format produced by long_id().
        probe = "src/file_0500.py"
        assert result[probe] == manifest[probe]

    @pytest.mark.asyncio
    async def test_batch_manifest_50_snapshots(self, db_session: AsyncSession) -> None:
        """Fifty single-file snapshots must all come back from one batch read."""
        from musehub.services.musehub_snapshot import (
            get_snapshot_manifests_batch,
            upsert_snapshot_entries,
        )

        repo = await create_repo(db_session, slug="stress-batch-50")
        ids: list[str] = []
        for i in range(50):
            sid = f"stress-snap-{i:02d}"
            ids.append(sid)
            await upsert_snapshot_entries(
                db_session, repo.repo_id, sid,
                {f"f{i}.py": long_id(f"{i}")},
            )
            await db_session.commit()

        result = await get_snapshot_manifests_batch(db_session, ids)
        assert len(result) == 50
        for i, sid in enumerate(ids):
            assert f"f{i}.py" in result[sid]

    @pytest.mark.asyncio
    async def test_build_symbol_index_100_commits(
        self, db_session: AsyncSession
    ) -> None:
        """100-commit chain with 5 ops each — indexer must complete successfully."""
        from musehub.services.musehub_symbol_indexer import load_symbol_history

        repo = await create_repo(db_session, slug="stress-100-commits")
        prev_id: str | None = None
        head_id = "stress-head"
        for i in range(100):
            # The final commit of the chain is stored under the head id.
            cid = f"stress-{i:04d}" if i < 99 else head_id
            ops = [_insert_op(f"file{i}.py::Sym{j}", long_id(f"{i}{j}")) for j in range(5)]
            commit = await _commit_with_delta(
                db_session, repo.repo_id, cid, ops=ops,
                parent_ids=[prev_id] if prev_id else [],
            )
            prev_id = commit.commit_id

        # Build from the id of the commit that was actually stored (as the
        # other tests in this file do) rather than from the requested id
        # string; after the loop prev_id is the id of the chain's last commit.
        assert prev_id is not None
        await _build_and_persist(db_session, repo.repo_id, prev_id)
        await db_session.commit()

        history = await load_symbol_history(db_session, repo.repo_id)
        # 100 files × 5 symbols each = 500 top-level symbol entries
        assert len(history) == 500

    @pytest.mark.asyncio
    async def test_load_symbol_history_file_filter_on_large_index(
        self, db_session: AsyncSession
    ) -> None:
        """Filter on large index returns only matching addresses."""
        from musehub.services.musehub_symbol_indexer import load_symbol_history

        repo = await create_repo(db_session, slug="stress-filter-large")
        # 50 modules × 10 symbols, all inserted by a single commit.
        ops = [
            _insert_op(f"src/module_{i:02d}.py::Sym{j}", long_id(f"{i}{j}"))
            for i in range(50)
            for j in range(10)
        ]

        commit = await _commit_with_delta(db_session, repo.repo_id, "stress-fl-head", ops=ops)
        await _build_and_persist(db_session, repo.repo_id, commit.commit_id)
        await db_session.commit()

        result = await load_symbol_history(db_session, repo.repo_id, file_path="src/module_05.py")
        assert len(result) == 10
        for key in result:
            assert key.startswith("src/module_05.py")
1168
1169
1170 # ─────────────────────────────────────────────────────────────────────────────
1171 # Layer: backfill_genesis_ops
1172 # ─────────────────────────────────────────────────────────────────────────────
1173
class TestBackfillGenesisOps:
    """backfill_genesis_ops corrects birth entries that were indexed as
    op='modify' because the genesis commit had no structured_delta."""

    async def _seed_bad_birth(
        self,
        session: AsyncSession,
        repo_id: str,
        address: str = "src/a.py::my_fn",
        op: str = "modify",
    ) -> MusehubSymbolHistoryEntry:
        """Add and flush a ten-day-old history row with the given *op*, then return it."""
        from datetime import timedelta

        seeded = MusehubSymbolHistoryEntry(
            repo_id=repo_id,
            address=address,
            commit_id=blob_id(secrets.token_bytes(16)),
            committed_at=_now() - timedelta(days=10),
            author="gabriel",
            op=op,
            content_id=blob_id(secrets.token_bytes(16)),
        )
        session.add(seeded)
        await session.flush()
        return seeded

    @pytest.mark.asyncio
    async def test_dry_run_returns_count_without_writing(
        self, db_session: AsyncSession
    ) -> None:
        """dry_run reports how many rows would change but leaves them as they are."""
        from sqlalchemy import select
        from musehub.services.musehub_symbol_indexer import backfill_genesis_ops

        repo = await create_repo(db_session, slug="bf-dry-run")
        await self._seed_bad_birth(db_session, repo.repo_id, op="modify")
        await db_session.flush()

        would_fix = await backfill_genesis_ops(
            db_session, repo_id=repo.repo_id, dry_run=True
        )
        assert would_fix == 1

        # Nothing written: every row of the repo still has op='modify'.
        repo_rows_query = select(MusehubSymbolHistoryEntry).where(
            MusehubSymbolHistoryEntry.repo_id == repo.repo_id,
        )
        stored = (await db_session.execute(repo_rows_query)).scalars().all()
        assert all(row.op == "modify" for row in stored)

    @pytest.mark.asyncio
    async def test_corrects_modify_to_add(self, db_session: AsyncSession) -> None:
        """A birth row stored as 'modify' is rewritten to 'add'."""
        from sqlalchemy import select
        from musehub.services.musehub_symbol_indexer import backfill_genesis_ops

        repo = await create_repo(db_session, slug="bf-modify")
        seeded = await self._seed_bad_birth(db_session, repo.repo_id, op="modify")
        await db_session.flush()

        fixed = await backfill_genesis_ops(db_session, repo_id=repo.repo_id)
        assert fixed == 1

        same_row_query = select(MusehubSymbolHistoryEntry).where(
            MusehubSymbolHistoryEntry.repo_id == repo.repo_id,
            MusehubSymbolHistoryEntry.address == seeded.address,
            MusehubSymbolHistoryEntry.commit_id == seeded.commit_id,
        )
        reloaded = (await db_session.execute(same_row_query)).scalar_one()
        assert reloaded.op == "add"

    @pytest.mark.asyncio
    async def test_skips_entries_already_add(self, db_session: AsyncSession) -> None:
        """A birth row that is already 'add' is not counted as updated."""
        from musehub.services.musehub_symbol_indexer import backfill_genesis_ops

        repo = await create_repo(db_session, slug="bf-already-add")
        await self._seed_bad_birth(db_session, repo.repo_id, op="add")
        await db_session.flush()

        fixed = await backfill_genesis_ops(db_session, repo_id=repo.repo_id)
        assert fixed == 0

    @pytest.mark.asyncio
    async def test_only_corrects_oldest_entry_not_later_modifies(
        self, db_session: AsyncSession
    ) -> None:
        """A subsequent modify on the same symbol must not be changed."""
        from datetime import timedelta
        from sqlalchemy import select
        from musehub.services.musehub_symbol_indexer import backfill_genesis_ops

        repo = await create_repo(db_session, slug="bf-oldest-only")
        address = "src/b.py::helper"

        # Two 'modify' rows for one symbol: the birth (5 days old) first,
        # then a genuine later change (1 day old).
        for age_days in (5, 1):
            db_session.add(
                MusehubSymbolHistoryEntry(
                    repo_id=repo.repo_id,
                    address=address,
                    commit_id=blob_id(secrets.token_bytes(16)),
                    committed_at=_now() - timedelta(days=age_days),
                    author="gabriel",
                    op="modify",
                    content_id=blob_id(secrets.token_bytes(16)),
                )
            )
        await db_session.flush()

        fixed = await backfill_genesis_ops(db_session, repo_id=repo.repo_id)
        assert fixed == 1

        chronological_query = (
            select(MusehubSymbolHistoryEntry)
            .where(
                MusehubSymbolHistoryEntry.repo_id == repo.repo_id,
                MusehubSymbolHistoryEntry.address == address,
            )
            .order_by(MusehubSymbolHistoryEntry.committed_at.asc())
        )
        oldest_first = (await db_session.execute(chronological_query)).scalars().all()
        assert oldest_first[0].op == "add"  # birth corrected
        assert oldest_first[1].op == "modify"  # later change untouched

    @pytest.mark.asyncio
    async def test_repo_id_none_corrects_all_repos(
        self, db_session: AsyncSession
    ) -> None:
        """With repo_id=None the backfill covers the rows of both seeded repos."""
        from musehub.services.musehub_symbol_indexer import backfill_genesis_ops

        repo_a = await create_repo(db_session, slug="bf-all-a")
        repo_b = await create_repo(db_session, slug="bf-all-b")
        for seeded_repo in (repo_a, repo_b):
            await self._seed_bad_birth(db_session, seeded_repo.repo_id, op="modify")
        await db_session.flush()

        fixed = await backfill_genesis_ops(db_session, repo_id=None)
        # Lower bound only: the two rows seeded here must be among the fixed ones.
        assert fixed >= 2

    @pytest.mark.asyncio
    async def test_idempotent(self, db_session: AsyncSession) -> None:
        """Running twice returns 0 on the second pass."""
        from musehub.services.musehub_symbol_indexer import backfill_genesis_ops

        repo = await create_repo(db_session, slug="bf-idempotent")
        await self._seed_bad_birth(db_session, repo.repo_id, op="modify")
        await db_session.flush()

        first_pass = await backfill_genesis_ops(db_session, repo_id=repo.repo_id)
        assert first_pass == 1
        second_pass = await backfill_genesis_ops(db_session, repo_id=repo.repo_id)
        assert second_pass == 0
1331
1332
1333 # ─────────────────────────────────────────────────────────────────────────────
1334 # Layer: backfill_content_ids_from_snapshots
1335 # ─────────────────────────────────────────────────────────────────────────────
1336
class TestBackfillContentIdsFromSnapshots:
    """backfill_content_ids_from_snapshots fills missing content_id values
    on file-level history entries by reading snapshot manifests from the DAG."""

    async def _seed_snapshot_and_commit(
        self,
        session: AsyncSession,
        repo_id: str,
        manifest: dict[str, str],
        commit_id: str | None = None,
    ) -> tuple[MusehubSnapshot, MusehubCommit]:
        """Insert a snapshot (msgpack manifest) and a commit pointing to it.

        Both rows are linked to *repo_id* through their ref tables and
        flushed.  A random commit id is generated when *commit_id* is falsy.
        """
        import msgpack

        resolved_commit_id = commit_id or blob_id(secrets.token_bytes(16))
        snapshot_id = blob_id(secrets.token_bytes(16))

        snapshot_row = MusehubSnapshot(
            snapshot_id=snapshot_id,
            directories=[],
            manifest_blob=msgpack.packb(manifest, use_bin_type=True),
            entry_count=len(manifest),
        )
        session.add(snapshot_row)
        session.add(MusehubSnapshotRef(repo_id=repo_id, snapshot_id=snapshot_id))

        commit_row = MusehubCommit(
            commit_id=resolved_commit_id,
            branch="main",
            parent_ids=[],
            message="test commit",
            author="gabriel",
            timestamp=_now(),
            snapshot_id=snapshot_id,
        )
        session.add(commit_row)
        session.add(MusehubCommitRef(repo_id=repo_id, commit_id=resolved_commit_id))

        await session.flush()
        return snapshot_row, commit_row

    async def _seed_missing_entry(
        self,
        session: AsyncSession,
        repo_id: str,
        address: str,
        commit_id: str,
    ) -> MusehubSymbolHistoryEntry:
        """Insert a file-level history entry with content_id=None."""
        blank_entry = MusehubSymbolHistoryEntry(
            repo_id=repo_id,
            address=address,
            commit_id=commit_id,
            committed_at=_now(),
            author="gabriel",
            op="add",
            content_id=None,
        )
        session.add(blank_entry)
        await session.flush()
        return blank_entry

    @pytest.mark.asyncio
    async def test_dry_run_returns_count_without_writing(
        self, db_session: AsyncSession
    ) -> None:
        """dry_run reports the fillable rows but leaves content_id unset."""
        from sqlalchemy import select
        from musehub.services.musehub_symbol_indexer import backfill_content_ids_from_snapshots

        repo = await create_repo(db_session, slug="bcid-dry")
        path = "src/app.ts"
        manifest_cid = blob_id(secrets.token_bytes(16))
        _, commit = await self._seed_snapshot_and_commit(
            db_session, repo.repo_id, {path: manifest_cid}
        )
        await self._seed_missing_entry(db_session, repo.repo_id, path, commit.commit_id)
        await db_session.flush()

        would_fill = await backfill_content_ids_from_snapshots(
            db_session, repo_id=repo.repo_id, dry_run=True
        )
        assert would_fill == 1

        # Nothing written: content_id is still None on every row of the repo.
        repo_rows_query = select(MusehubSymbolHistoryEntry).where(
            MusehubSymbolHistoryEntry.repo_id == repo.repo_id,
        )
        stored = (await db_session.execute(repo_rows_query)).scalars().all()
        assert all(row.content_id is None for row in stored)

    @pytest.mark.asyncio
    async def test_fills_content_id_from_manifest(
        self, db_session: AsyncSession
    ) -> None:
        """A missing content_id is taken from the commit's snapshot manifest."""
        from sqlalchemy import select
        from musehub.services.musehub_symbol_indexer import backfill_content_ids_from_snapshots

        repo = await create_repo(db_session, slug="bcid-fill")
        path = "src/app.ts"
        expected_cid = blob_id(secrets.token_bytes(16))
        _, commit = await self._seed_snapshot_and_commit(
            db_session, repo.repo_id, {path: expected_cid}
        )
        await self._seed_missing_entry(db_session, repo.repo_id, path, commit.commit_id)
        await db_session.flush()

        filled = await backfill_content_ids_from_snapshots(db_session, repo_id=repo.repo_id)
        assert filled == 1

        same_row_query = select(MusehubSymbolHistoryEntry).where(
            MusehubSymbolHistoryEntry.repo_id == repo.repo_id,
            MusehubSymbolHistoryEntry.address == path,
            MusehubSymbolHistoryEntry.commit_id == commit.commit_id,
        )
        reloaded = (await db_session.execute(same_row_query)).scalar_one()
        assert reloaded.content_id == expected_cid

    @pytest.mark.asyncio
    async def test_skips_symbol_level_addresses(
        self, db_session: AsyncSession
    ) -> None:
        """Entries with '::' in the address are symbol-level and must be skipped."""
        from musehub.services.musehub_symbol_indexer import backfill_content_ids_from_snapshots

        repo = await create_repo(db_session, slug="bcid-sym")
        symbol_address = "src/app.ts::MyClass"
        file_cid = blob_id(secrets.token_bytes(16))
        # The manifest only knows the file path, not the symbol address.
        _, commit = await self._seed_snapshot_and_commit(
            db_session, repo.repo_id, {"src/app.ts": file_cid}
        )
        db_session.add(
            MusehubSymbolHistoryEntry(
                repo_id=repo.repo_id,
                address=symbol_address,
                commit_id=commit.commit_id,
                committed_at=_now(),
                author="gabriel",
                op="add",
                content_id=None,
            )
        )
        await db_session.flush()

        filled = await backfill_content_ids_from_snapshots(db_session, repo_id=repo.repo_id)
        assert filled == 0

    @pytest.mark.asyncio
    async def test_skips_entries_already_with_content_id(
        self, db_session: AsyncSession
    ) -> None:
        """A row that already has a content_id keeps it, even if the manifest differs."""
        from sqlalchemy import select
        from musehub.services.musehub_symbol_indexer import backfill_content_ids_from_snapshots

        repo = await create_repo(db_session, slug="bcid-skip")
        path = "src/keep.py"
        existing_cid = blob_id(secrets.token_bytes(16))
        manifest_cid = blob_id(secrets.token_bytes(16))
        _, commit = await self._seed_snapshot_and_commit(
            db_session, repo.repo_id, {path: manifest_cid}
        )
        db_session.add(
            MusehubSymbolHistoryEntry(
                repo_id=repo.repo_id,
                address=path,
                commit_id=commit.commit_id,
                committed_at=_now(),
                author="gabriel",
                op="add",
                content_id=existing_cid,
            )
        )
        await db_session.flush()

        filled = await backfill_content_ids_from_snapshots(db_session, repo_id=repo.repo_id)
        assert filled == 0

        # Original content_id preserved
        repo_rows_query = select(MusehubSymbolHistoryEntry).where(
            MusehubSymbolHistoryEntry.repo_id == repo.repo_id,
        )
        reloaded = (await db_session.execute(repo_rows_query)).scalar_one()
        assert reloaded.content_id == existing_cid

    @pytest.mark.asyncio
    async def test_skips_entry_when_path_absent_from_manifest(
        self, db_session: AsyncSession
    ) -> None:
        """If the manifest doesn't contain the address, the entry is left alone."""
        from sqlalchemy import select
        from musehub.services.musehub_symbol_indexer import backfill_content_ids_from_snapshots

        repo = await create_repo(db_session, slug="bcid-absent")
        missing_path = "src/ghost.py"
        unrelated_manifest = {"src/other.py": blob_id(secrets.token_bytes(16))}
        _, commit = await self._seed_snapshot_and_commit(
            db_session, repo.repo_id, unrelated_manifest
        )
        await self._seed_missing_entry(
            db_session, repo.repo_id, missing_path, commit.commit_id
        )
        await db_session.flush()

        filled = await backfill_content_ids_from_snapshots(db_session, repo_id=repo.repo_id)
        assert filled == 0

        repo_rows_query = select(MusehubSymbolHistoryEntry).where(
            MusehubSymbolHistoryEntry.repo_id == repo.repo_id,
        )
        reloaded = (await db_session.execute(repo_rows_query)).scalar_one()
        assert reloaded.content_id is None

    @pytest.mark.asyncio
    async def test_repo_id_none_fills_all_repos(
        self, db_session: AsyncSession
    ) -> None:
        """With repo_id=None the backfill covers the rows of both seeded repos."""
        from musehub.services.musehub_symbol_indexer import backfill_content_ids_from_snapshots

        repo_a = await create_repo(db_session, slug="bcid-all-a")
        repo_b = await create_repo(db_session, slug="bcid-all-b")
        cid_a = blob_id(secrets.token_bytes(16))
        cid_b = blob_id(secrets.token_bytes(16))

        _, commit_a = await self._seed_snapshot_and_commit(
            db_session, repo_a.repo_id, {"src/a.py": cid_a}
        )
        _, commit_b = await self._seed_snapshot_and_commit(
            db_session, repo_b.repo_id, {"src/b.py": cid_b}
        )
        pending = (
            (repo_a.repo_id, "src/a.py", commit_a.commit_id),
            (repo_b.repo_id, "src/b.py", commit_b.commit_id),
        )
        for owner_id, path, commit_id in pending:
            await self._seed_missing_entry(db_session, owner_id, path, commit_id)
        await db_session.flush()

        filled = await backfill_content_ids_from_snapshots(db_session, repo_id=None)
        # Lower bound only: the two rows seeded here must be among the filled ones.
        assert filled >= 2

    @pytest.mark.asyncio
    async def test_idempotent(self, db_session: AsyncSession) -> None:
        """Running twice returns 0 on the second pass."""
        from musehub.services.musehub_symbol_indexer import backfill_content_ids_from_snapshots

        repo = await create_repo(db_session, slug="bcid-idem")
        path = "src/main.py"
        manifest_cid = blob_id(secrets.token_bytes(16))
        _, commit = await self._seed_snapshot_and_commit(
            db_session, repo.repo_id, {path: manifest_cid}
        )
        await self._seed_missing_entry(db_session, repo.repo_id, path, commit.commit_id)
        await db_session.flush()

        first_pass = await backfill_content_ids_from_snapshots(
            db_session, repo_id=repo.repo_id
        )
        assert first_pass == 1
        second_pass = await backfill_content_ids_from_snapshots(
            db_session, repo_id=repo.repo_id
        )
        assert second_pass == 0

    @pytest.mark.asyncio
    async def test_corrupt_manifest_blob_is_skipped_gracefully(
        self, db_session: AsyncSession
    ) -> None:
        """A corrupt manifest blob must not raise — entry is left with content_id=None."""
        from sqlalchemy import select
        from musehub.services.musehub_symbol_indexer import backfill_content_ids_from_snapshots

        repo = await create_repo(db_session, slug="bcid-corrupt")
        snap_id = blob_id(secrets.token_bytes(16))
        commit_id = blob_id(secrets.token_bytes(16))
        path = "src/broken.py"

        # Snapshot whose manifest bytes are deliberately not valid msgpack.
        db_session.add(
            MusehubSnapshot(
                snapshot_id=snap_id,
                directories=[],
                manifest_blob=b"\xff\xfe not msgpack",
                entry_count=0,
            )
        )
        db_session.add(MusehubSnapshotRef(repo_id=repo.repo_id, snapshot_id=snap_id))

        db_session.add(
            MusehubCommit(
                commit_id=commit_id,
                branch="main",
                parent_ids=[],
                message="corrupt test",
                author="gabriel",
                timestamp=_now(),
                snapshot_id=snap_id,
            )
        )
        db_session.add(MusehubCommitRef(repo_id=repo.repo_id, commit_id=commit_id))
        await self._seed_missing_entry(db_session, repo.repo_id, path, commit_id)
        await db_session.flush()

        # Must not raise
        filled = await backfill_content_ids_from_snapshots(db_session, repo_id=repo.repo_id)
        assert filled == 0

        repo_rows_query = select(MusehubSymbolHistoryEntry).where(
            MusehubSymbolHistoryEntry.repo_id == repo.repo_id,
        )
        reloaded = (await db_session.execute(repo_rows_query)).scalar_one()
        assert reloaded.content_id is None
1635
1636
1637 # ─────────────────────────────────────────────────────────────────────────────
1638 # Layer: backfill_raw_ops_from_commits
1639 # ─────────────────────────────────────────────────────────────────────────────
1640
1641 class TestBackfillRawOpsFromCommits:
1642 """backfill_raw_ops_from_commits re-indexes stale coarse-op rows by reading
1643 the original structured_delta from commit_meta."""
1644
async def _seed_commit_with_meta(
    self,
    session: AsyncSession,
    repo_id: str,
    ops: list[dict],
    commit_id: str | None = None,
) -> MusehubCommit:
    """Add and flush a parentless commit whose structured_delta is ``{"ops": ops}``.

    The commit is linked to *repo_id* through a ``MusehubCommitRef``; a
    random commit id is generated when *commit_id* is falsy.
    """
    resolved_commit_id = commit_id or blob_id(secrets.token_bytes(16))
    commit_row = MusehubCommit(
        commit_id=resolved_commit_id,
        branch="main",
        parent_ids=[],
        message="test",
        author="gabriel",
        timestamp=_now(),
        structured_delta={"ops": ops},
    )
    session.add(commit_row)
    session.add(MusehubCommitRef(repo_id=repo_id, commit_id=resolved_commit_id))
    await session.flush()
    return commit_row
1666
async def _seed_stale_entry(
    self,
    session: AsyncSession,
    repo_id: str,
    address: str,
    commit_id: str,
    coarse_op: str,
    content_id: str | None = None,
) -> MusehubSymbolHistoryEntry:
    """Add and flush a history row with op *coarse_op* and no op_payload, then return it."""
    stale_row = MusehubSymbolHistoryEntry(
        repo_id=repo_id,
        address=address,
        commit_id=commit_id,
        committed_at=_now(),
        author="gabriel",
        op=coarse_op,
        op_payload=None,
        content_id=content_id,
    )
    session.add(stale_row)
    await session.flush()
    return stale_row
1689
@pytest.mark.asyncio
async def test_dry_run_returns_count_without_writing(
    self, db_session: AsyncSession
) -> None:
    """dry_run reports the stale row but leaves its op and op_payload unchanged."""
    from sqlalchemy import select
    from musehub.services.musehub_symbol_indexer import backfill_raw_ops_from_commits

    symbol = "src/a.py::Fn"
    raw_op = {
        "address": symbol,
        "op": "insert",
        "content_id": "sha256:aaa",
        "content_summary": "added Fn",
        "position": 0,
    }

    repo = await create_repo(db_session, slug="bro-dry")
    commit = await self._seed_commit_with_meta(db_session, repo.repo_id, [raw_op])
    await self._seed_stale_entry(
        db_session, repo.repo_id, symbol, commit.commit_id, "add"
    )
    await db_session.flush()

    would_update = await backfill_raw_ops_from_commits(
        db_session, repo_id=repo.repo_id, dry_run=True
    )
    assert would_update == 1

    repo_rows_query = select(MusehubSymbolHistoryEntry).where(
        MusehubSymbolHistoryEntry.repo_id == repo.repo_id,
    )
    stored = (await db_session.execute(repo_rows_query)).scalar_one()
    assert stored.op == "add"
    assert stored.op_payload is None
1716
@pytest.mark.asyncio
async def test_add_becomes_insert_with_payload(
    self, db_session: AsyncSession
) -> None:
    """A coarse 'add' row becomes 'insert' and gains a payload without op/address keys."""
    from musehub.services.musehub_symbol_indexer import backfill_raw_ops_from_commits

    symbol = "src/a.py::Fn"
    raw_op = {
        "address": symbol,
        "op": "insert",
        "content_id": "sha256:aaa",
        "content_summary": "added function Fn",
        "position": 0,
    }

    repo = await create_repo(db_session, slug="bro-insert")
    commit = await self._seed_commit_with_meta(db_session, repo.repo_id, [raw_op])
    await self._seed_stale_entry(
        db_session, repo.repo_id, symbol, commit.commit_id, "add", "sha256:aaa"
    )
    await db_session.flush()

    updated = await backfill_raw_ops_from_commits(db_session, repo_id=repo.repo_id)
    assert updated == 1

    repo_rows_query = select(MusehubSymbolHistoryEntry).where(
        MusehubSymbolHistoryEntry.repo_id == repo.repo_id,
    )
    stored = (await db_session.execute(repo_rows_query)).scalar_one()
    payload = stored.op_payload

    assert stored.op == "insert"
    assert payload["content_summary"] == "added function Fn"
    assert payload["position"] == 0
    # The op name and address are not repeated inside the payload.
    assert "op" not in payload
    assert "address" not in payload
1745
@pytest.mark.asyncio
async def test_modify_becomes_replace_with_payload(
    self, db_session: AsyncSession
) -> None:
    """A stale 'modify' row whose delta op is 'replace' becomes 'replace' with its payload."""
    from musehub.services.musehub_symbol_indexer import backfill_raw_ops_from_commits

    repo = await create_repo(db_session, slug="bro-replace")
    replace_op = {
        "address": "src/b.py::Bar",
        "op": "replace",
        "old_content_id": "sha256:old",
        "new_content_id": "sha256:new",
        "old_summary": "Bar v1",
        "new_summary": "Bar v2",
        "position": None,
    }
    commit = await self._seed_commit_with_meta(db_session, repo.repo_id, [replace_op])
    await self._seed_stale_entry(
        db_session, repo.repo_id, "src/b.py::Bar", commit.commit_id, "modify", "sha256:new"
    )
    await db_session.flush()

    n_updated = await backfill_raw_ops_from_commits(db_session, repo_id=repo.repo_id)
    assert n_updated == 1

    stmt = select(MusehubSymbolHistoryEntry).where(
        MusehubSymbolHistoryEntry.repo_id == repo.repo_id,
    )
    entry = (await db_session.execute(stmt)).scalar_one()
    assert entry.op == "replace"

    payload = entry.op_payload
    assert payload["old_content_id"] == "sha256:old"
    assert payload["new_content_id"] == "sha256:new"
    assert payload["old_summary"] == "Bar v1"
1774
@pytest.mark.asyncio
async def test_modify_becomes_patch_for_file_level(
    self, db_session: AsyncSession
) -> None:
    """A file-level 'modify' becomes 'patch'; the nested symbol-level row becomes 'replace'."""
    from musehub.services.musehub_symbol_indexer import backfill_raw_ops_from_commits

    repo = await create_repo(db_session, slug="bro-patch")
    symbol_op = {
        "address": "src/c.py::Cls",
        "op": "replace",
        "old_content_id": "sha256:o",
        "new_content_id": "sha256:n",
        "old_summary": "Cls v1",
        "new_summary": "Cls v2",
        "position": 0,
    }
    file_op = {
        "address": "src/c.py",
        "op": "patch",
        "child_domain": "python",
        "child_summary": "3 symbols changed",
        "child_ops": [symbol_op],
    }
    commit = await self._seed_commit_with_meta(db_session, repo.repo_id, [file_op])

    # One stale row per level: the file itself and the symbol nested under it.
    await self._seed_stale_entry(
        db_session, repo.repo_id, "src/c.py", commit.commit_id, "modify"
    )
    await self._seed_stale_entry(
        db_session, repo.repo_id, "src/c.py::Cls", commit.commit_id, "modify", "sha256:n"
    )
    await db_session.flush()

    n_updated = await backfill_raw_ops_from_commits(db_session, repo_id=repo.repo_id)
    assert n_updated == 2

    stmt = select(MusehubSymbolHistoryEntry).where(
        MusehubSymbolHistoryEntry.repo_id == repo.repo_id,
    )
    result = await db_session.execute(stmt)
    by_address = {entry.address: entry for entry in result.scalars().all()}

    file_row = by_address["src/c.py"]
    assert file_row.op == "patch"
    assert file_row.op_payload["child_summary"] == "3 symbols changed"
    # The nested ops get their own rows, so they must not be copied into the payload.
    assert "child_ops" not in file_row.op_payload

    symbol_row = by_address["src/c.py::Cls"]
    assert symbol_row.op == "replace"
    assert symbol_row.op_payload["old_content_id"] == "sha256:o"
1813
@pytest.mark.asyncio
async def test_already_correct_ops_not_touched(
    self, db_session: AsyncSession
) -> None:
    """Rows whose op is already a raw value (delete, move) must be skipped.

    This test exercises the ``delete`` case.  It checks the reported count
    and the stored row, so a backfill that rewrote the row while still
    returning 0 would be caught.
    """
    from musehub.services.musehub_symbol_indexer import backfill_raw_ops_from_commits

    repo = await create_repo(db_session, slug="bro-skip")
    commit = await self._seed_commit_with_meta(db_session, repo.repo_id, [
        {
            "address": "src/d.py::Gone",
            "op": "delete",
            "content_id": "sha256:gone",
            "content_summary": "removed Gone",
            "position": 0,
        },
    ])
    await self._seed_stale_entry(
        db_session, repo.repo_id, "src/d.py::Gone", commit.commit_id, "delete", "sha256:gone"
    )
    await db_session.flush()

    updated = await backfill_raw_ops_from_commits(db_session, repo_id=repo.repo_id)
    assert updated == 0

    # "Not touched" means the row itself is unchanged, not merely uncounted.
    row = (await db_session.execute(
        select(MusehubSymbolHistoryEntry).where(
            MusehubSymbolHistoryEntry.repo_id == repo.repo_id,
        )
    )).scalar_one()
    assert row.op == "delete"
    assert row.op_payload is None
1832
@pytest.mark.asyncio
async def test_entry_missing_from_delta_left_alone(
    self, db_session: AsyncSession
) -> None:
    """A stale row whose address is absent from the commit's delta keeps its op."""
    from musehub.services.musehub_symbol_indexer import backfill_raw_ops_from_commits

    repo = await create_repo(db_session, slug="bro-missing")
    unrelated_op = {
        "address": "src/other.py::X",
        "op": "insert",
        "content_id": "sha256:x",
        "content_summary": "added X",
        "position": 0,
    }
    commit = await self._seed_commit_with_meta(db_session, repo.repo_id, [unrelated_op])
    # The stale row points at an address the delta never mentions.
    await self._seed_stale_entry(
        db_session, repo.repo_id, "src/ghost.py::Y", commit.commit_id, "add"
    )
    await db_session.flush()

    n_updated = await backfill_raw_ops_from_commits(db_session, repo_id=repo.repo_id)
    assert n_updated == 0

    stmt = select(MusehubSymbolHistoryEntry).where(
        MusehubSymbolHistoryEntry.repo_id == repo.repo_id,
    )
    entry = (await db_session.execute(stmt)).scalar_one()
    assert entry.op == "add"
1858
@pytest.mark.asyncio
async def test_repo_id_none_fixes_all_repos(
    self, db_session: AsyncSession
) -> None:
    """repo_id=None backfills every repo, not just one.

    The count is only bounded from below (other repos in the database may
    also contribute), so each seeded repo's row is additionally checked to
    confirm that both were actually rewritten.
    """
    from musehub.services.musehub_symbol_indexer import backfill_raw_ops_from_commits

    repo_a = await create_repo(db_session, slug="bro-all-a")
    repo_b = await create_repo(db_session, slug="bro-all-b")
    repos = (repo_a, repo_b)
    for repo in repos:
        commit = await self._seed_commit_with_meta(db_session, repo.repo_id, [
            {
                "address": "f.py::Fn",
                "op": "insert",
                "content_id": "sha256:x",
                "content_summary": "added Fn",
                "position": 0,
            },
        ])
        await self._seed_stale_entry(
            db_session, repo.repo_id, "f.py::Fn", commit.commit_id, "add"
        )
    await db_session.flush()

    updated = await backfill_raw_ops_from_commits(db_session, repo_id=None)
    assert updated >= 2

    # A lower bound on the count alone could be met by unrelated rows.
    for repo in repos:
        row = (await db_session.execute(
            select(MusehubSymbolHistoryEntry).where(
                MusehubSymbolHistoryEntry.repo_id == repo.repo_id,
            )
        )).scalar_one()
        assert row.op == "insert"
1878
@pytest.mark.asyncio
async def test_idempotent(self, db_session: AsyncSession) -> None:
    """The first backfill pass updates the stale row; a second pass updates nothing."""
    from musehub.services.musehub_symbol_indexer import backfill_raw_ops_from_commits

    repo = await create_repo(db_session, slug="bro-idem")
    insert_op = {
        "address": "src/e.py::E",
        "op": "insert",
        "content_id": "sha256:e",
        "content_summary": "added E",
        "position": 0,
    }
    commit = await self._seed_commit_with_meta(db_session, repo.repo_id, [insert_op])
    await self._seed_stale_entry(
        db_session, repo.repo_id, "src/e.py::E", commit.commit_id, "add"
    )
    await db_session.flush()

    first_pass = await backfill_raw_ops_from_commits(db_session, repo_id=repo.repo_id)
    assert first_pass == 1

    second_pass = await backfill_raw_ops_from_commits(db_session, repo_id=repo.repo_id)
    assert second_pass == 0
1896
1897
1898 # ─────────────────────────────────────────────────────────────────────────────
1899 # Layer 2 — Snapshot-diff backfill
1900 # ─────────────────────────────────────────────────────────────────────────────
1901
1902
1903 import msgpack # type: ignore[import]
1904
1905
async def _seed_commit_with_snapshot(
    session: AsyncSession,
    repo_id: str,
    commit_id: str,
    manifest: dict[str, str],
    parent_ids: list[str] | None = None,
    branch: str = "main",
    timestamp: datetime | None = None,
) -> MusehubCommit:
    """Seed a commit plus its snapshot row and the per-repo ref rows.

    ``manifest`` maps path → object_id.  The snapshot id is derived from the
    sorted manifest items, so two commits with identical manifests share one
    snapshot row (INSERT ... ON CONFLICT DO NOTHING).

    Args:
        session: Session the rows are written to; flushed before returning.
        repo_id: Repo the snapshot ref and commit ref are attached to.
        commit_id: Id given to the new commit.
        manifest: Path → object_id mapping stored as the snapshot manifest.
        parent_ids: Parent commit ids; ``None`` is stored as an empty list.
        branch: Branch name recorded on the commit.
        timestamp: Used for both the snapshot ``created_at`` and the commit
            ``timestamp``; defaults to ``_now()``.

    Returns:
        The newly added ``MusehubCommit``.
    """
    from sqlalchemy.dialects.postgresql import insert as pg_insert

    # Resolve the default once so the snapshot and the commit carry the same
    # instant instead of two slightly different `_now()` readings.
    seeded_at = timestamp or _now()

    snap_id = blob_id(msgpack.packb(sorted(manifest.items()), use_bin_type=True))
    await session.execute(
        pg_insert(MusehubSnapshot).values(
            snapshot_id=snap_id,
            directories=[],
            manifest_blob=msgpack.packb(manifest, use_bin_type=True),
            entry_count=len(manifest),
            created_at=seeded_at,
        ).on_conflict_do_nothing(index_elements=["snapshot_id"])
    )
    await session.execute(
        pg_insert(MusehubSnapshotRef).values(
            repo_id=repo_id,
            snapshot_id=snap_id,
        ).on_conflict_do_nothing()
    )

    commit = MusehubCommit(
        commit_id=commit_id,
        branch=branch,
        parent_ids=parent_ids or [],
        message="test",
        author="gabriel",
        timestamp=seeded_at,
        snapshot_id=snap_id,
    )
    session.add(commit)
    session.add(MusehubCommitRef(repo_id=repo_id, commit_id=commit_id))
    await session.flush()
    return commit
1950
1951
class TestBackfillHistoryFromSnapshots:
    """Tests for backfill_history_from_snapshots.

    The function under test walks the commit graph, diffs adjacent snapshot
    manifests, and creates history entries for any address/commit pair not
    already covered by structured_delta indexing.
    """

    # Fixed timestamps for the parent ("c1") and child ("c2") commits.
    _PARENT_TS = datetime(2026, 1, 1, tzinfo=timezone.utc)
    _CHILD_TS = datetime(2026, 1, 2, tzinfo=timezone.utc)

    @staticmethod
    async def _history_rows(
        session: AsyncSession,
        repo_id: str,
        *,
        commit_id: str | None = None,
        address: str | None = None,
    ) -> list[MusehubSymbolHistoryEntry]:
        """Return the repo's history rows, optionally narrowed by address and/or commit."""
        conditions = [MusehubSymbolHistoryEntry.repo_id == repo_id]
        if address is not None:
            conditions.append(MusehubSymbolHistoryEntry.address == address)
        if commit_id is not None:
            conditions.append(MusehubSymbolHistoryEntry.commit_id == commit_id)
        result = await session.execute(
            select(MusehubSymbolHistoryEntry).where(*conditions)
        )
        return list(result.scalars().all())

    async def _seed_parent_and_child(
        self,
        session: AsyncSession,
        repo_id: str,
        parent_manifest: dict[str, str],
        child_manifest: dict[str, str],
    ) -> None:
        """Seed commit "c1" and its child "c2" with the given manifests, then commit."""
        await _seed_commit_with_snapshot(
            session, repo_id, "c1", parent_manifest, timestamp=self._PARENT_TS
        )
        await _seed_commit_with_snapshot(
            session, repo_id, "c2", child_manifest,
            parent_ids=["c1"], timestamp=self._CHILD_TS,
        )
        await session.commit()

    @pytest.mark.asyncio
    async def test_genesis_commit_all_inserts(self, db_session: AsyncSession) -> None:
        """Every file in the first commit (no parent) is recorded as insert."""
        from musehub.services.musehub_symbol_indexer import backfill_history_from_snapshots

        repo = await create_repo(db_session, slug="sdb-genesis")
        manifest = {"a.py": "sha256:aaa", "b.py": "sha256:bbb"}
        await _seed_commit_with_snapshot(db_session, repo.repo_id, "c1", manifest)
        await db_session.commit()

        created = await backfill_history_from_snapshots(db_session, repo_id=repo.repo_id)
        assert created == 2

        entries = await self._history_rows(db_session, repo.repo_id)
        op_by_address = {entry.address: entry.op for entry in entries}
        assert op_by_address == {"a.py": "insert", "b.py": "insert"}

    @pytest.mark.asyncio
    async def test_new_file_in_child_is_insert(self, db_session: AsyncSession) -> None:
        """A file present in commit N but absent from commit N-1 is an insert."""
        from musehub.services.musehub_symbol_indexer import backfill_history_from_snapshots

        repo = await create_repo(db_session, slug="sdb-insert")
        await self._seed_parent_and_child(
            db_session,
            repo.repo_id,
            {"a.py": "sha256:aaa"},
            {"a.py": "sha256:aaa", "b.py": "sha256:bbb"},
        )

        await backfill_history_from_snapshots(db_session, repo_id=repo.repo_id)

        entries = await self._history_rows(db_session, repo.repo_id, commit_id="c2")
        op_by_address = {entry.address: entry.op for entry in entries}
        assert "b.py" in op_by_address
        assert op_by_address["b.py"] == "insert"
        # a.py is byte-identical in both commits, so c2 needs no entry for it.
        assert "a.py" not in op_by_address

    @pytest.mark.asyncio
    async def test_changed_content_is_replace(self, db_session: AsyncSession) -> None:
        """A file with a different object_id in the child commit is a replace."""
        from musehub.services.musehub_symbol_indexer import backfill_history_from_snapshots

        repo = await create_repo(db_session, slug="sdb-replace")
        await self._seed_parent_and_child(
            db_session,
            repo.repo_id,
            {"a.py": "sha256:v1"},
            {"a.py": "sha256:v2"},
        )

        await backfill_history_from_snapshots(db_session, repo_id=repo.repo_id)

        entries = await self._history_rows(db_session, repo.repo_id, commit_id="c2")
        assert len(entries) == 1
        entry = entries[0]
        assert entry.address == "a.py"
        assert entry.op == "replace"
        assert entry.content_id == "sha256:v2"

    @pytest.mark.asyncio
    async def test_removed_file_is_delete(self, db_session: AsyncSession) -> None:
        """A file absent from the child but present in the parent is a delete."""
        from musehub.services.musehub_symbol_indexer import backfill_history_from_snapshots

        repo = await create_repo(db_session, slug="sdb-delete")
        await self._seed_parent_and_child(
            db_session,
            repo.repo_id,
            {"a.py": "sha256:v1", "b.py": "sha256:vb"},
            {"a.py": "sha256:v1"},
        )

        await backfill_history_from_snapshots(db_session, repo_id=repo.repo_id)

        entries = await self._history_rows(db_session, repo.repo_id, commit_id="c2")
        op_by_address = {entry.address: entry.op for entry in entries}
        assert op_by_address.get("b.py") == "delete"
        assert "a.py" not in op_by_address  # unchanged

    @pytest.mark.asyncio
    async def test_unambiguous_rename_is_move(self, db_session: AsyncSession) -> None:
        """When exactly one file disappears and one appears with the same object_id,
        the appearance is recorded as move with from_address in op_payload."""
        from musehub.services.musehub_symbol_indexer import backfill_history_from_snapshots

        repo = await create_repo(db_session, slug="sdb-move")
        await self._seed_parent_and_child(
            db_session,
            repo.repo_id,
            {"old.py": "sha256:content"},
            {"new.py": "sha256:content"},
        )

        await backfill_history_from_snapshots(db_session, repo_id=repo.repo_id)

        entries = await self._history_rows(db_session, repo.repo_id, commit_id="c2")
        entry_by_address = {entry.address: entry for entry in entries}

        assert "new.py" in entry_by_address
        destination = entry_by_address["new.py"]
        assert destination.op == "move"
        assert (destination.op_payload or {}).get("from_address") == "old.py"

        # The source path is recorded as a delete that points at the new location.
        assert "old.py" in entry_by_address
        source = entry_by_address["old.py"]
        assert source.op == "delete"
        assert (source.op_payload or {}).get("to_address") == "new.py"

    @pytest.mark.asyncio
    async def test_ambiguous_rename_falls_back_to_insert_delete(
        self, db_session: AsyncSession
    ) -> None:
        """Same object_id disappears from two paths → ambiguous rename.
        Fall back: record inserts for new paths, deletes for old paths."""
        from musehub.services.musehub_symbol_indexer import backfill_history_from_snapshots

        repo = await create_repo(db_session, slug="sdb-ambig")
        shared = "sha256:shared"
        await self._seed_parent_and_child(
            db_session,
            repo.repo_id,
            {"a.py": shared, "b.py": shared},
            {"c.py": shared},
        )

        await backfill_history_from_snapshots(db_session, repo_id=repo.repo_id)

        entries = await self._history_rows(db_session, repo.repo_id, commit_id="c2")
        op_by_address = {entry.address: entry.op for entry in entries}
        # Two candidate origins exist for c.py, so it cannot be a move.
        assert op_by_address.get("c.py") == "insert"
        assert op_by_address.get("a.py") == "delete"
        assert op_by_address.get("b.py") == "delete"

    def test_diff_manifests_move_emits_delete_with_to_address(self) -> None:
        """_diff_manifests includes a delete tuple with to_address for move sources."""
        from musehub.services.musehub_symbol_indexer import _diff_manifests

        before = {"old.py": "sha256:content"}
        after = {"new.py": "sha256:content"}

        diff = {}
        for address, op, extra in _diff_manifests(before, after):
            diff[address] = (op, extra)

        # The move destination carries from_address.
        assert diff["new.py"] == ("move", "old.py")
        # The move source carries to_address rather than None.
        assert "old.py" in diff
        source_op, source_extra = diff["old.py"]
        assert source_op == "delete"
        assert source_extra == "new.py"  # to_address

    def test_diff_manifests_ambiguous_delete_has_no_to_address(self) -> None:
        """Ambiguous renames fall back to plain delete (no to_address)."""
        from musehub.services.musehub_symbol_indexer import _diff_manifests

        shared = "sha256:shared"
        before = {"a.py": shared, "b.py": shared}
        after = {"c.py": shared}

        diff = {}
        for address, op, extra in _diff_manifests(before, after):
            diff[address] = (op, extra)

        # c.py has two possible sources, so it is reported as an insert.
        assert diff["c.py"] == ("insert", None)
        # Plain deletes carry no to_address.
        assert diff["a.py"] == ("delete", None)
        assert diff["b.py"] == ("delete", None)

    @pytest.mark.asyncio
    async def test_move_delete_op_payload_has_to_address(
        self, db_session: AsyncSession
    ) -> None:
        """DELETE entry for a move-source path carries to_address in op_payload."""
        from musehub.services.musehub_symbol_indexer import backfill_history_from_snapshots

        repo = await create_repo(db_session, slug="sdb-move-payload")
        await self._seed_parent_and_child(
            db_session,
            repo.repo_id,
            {"old.py": "sha256:content"},
            {"new.py": "sha256:content"},
        )

        await backfill_history_from_snapshots(db_session, repo_id=repo.repo_id)

        entries = await self._history_rows(
            db_session, repo.repo_id, address="old.py", commit_id="c2"
        )
        assert len(entries) == 1
        entry = entries[0]
        assert entry.op == "delete"
        payload = entry.op_payload or {}
        assert payload.get("to_address") == "new.py"
        assert payload.get("inferred_from") == "snapshot_diff"

    @pytest.mark.asyncio
    async def test_skips_addresses_already_covered_by_structured_delta(
        self, db_session: AsyncSession
    ) -> None:
        """Addresses that already have a history entry for the commit are not overwritten."""
        from musehub.services.musehub_symbol_indexer import backfill_history_from_snapshots

        repo = await create_repo(db_session, slug="sdb-skip")
        seeded_at = self._PARENT_TS
        await _seed_commit_with_snapshot(
            db_session, repo.repo_id, "c1", {"a.py": "sha256:v1"}, timestamp=seeded_at
        )
        # Entry that already exists for (a.py, c1), as structured_delta indexing
        # would have written it (e.g. 'patch' — richer semantics).
        existing = MusehubSymbolHistoryEntry(
            repo_id=repo.repo_id,
            address="a.py",
            commit_id="c1",
            op="patch",
            op_payload={"from_address": "old/a.py"},
            content_id="sha256:v1",
            committed_at=seeded_at,
            author="gabriel",
        )
        db_session.add(existing)
        await db_session.commit()

        created = await backfill_history_from_snapshots(db_session, repo_id=repo.repo_id)

        assert created == 0  # nothing to do
        entries = await self._history_rows(db_session, repo.repo_id)
        assert len(entries) == 1
        assert entries[0].op == "patch"  # original preserved

    @pytest.mark.asyncio
    async def test_unchanged_files_produce_no_entries(
        self, db_session: AsyncSession
    ) -> None:
        """Files with identical object_ids across parent and child produce no entry."""
        from musehub.services.musehub_symbol_indexer import backfill_history_from_snapshots

        repo = await create_repo(db_session, slug="sdb-nochange")
        await self._seed_parent_and_child(
            db_session,
            repo.repo_id,
            {"a.py": "sha256:same", "b.py": "sha256:same2"},
            {"a.py": "sha256:same", "b.py": "sha256:same2"},
        )

        await backfill_history_from_snapshots(db_session, repo_id=repo.repo_id)

        child_entries = await self._history_rows(db_session, repo.repo_id, commit_id="c2")
        assert child_entries == []

    @pytest.mark.asyncio
    async def test_dry_run_returns_count_without_writing(
        self, db_session: AsyncSession
    ) -> None:
        """dry_run=True reports the would-be entry count but writes no rows."""
        from musehub.services.musehub_symbol_indexer import backfill_history_from_snapshots

        repo = await create_repo(db_session, slug="sdb-dry")
        manifest = {"a.py": "sha256:v1", "b.py": "sha256:v2"}
        await _seed_commit_with_snapshot(db_session, repo.repo_id, "c1", manifest)
        await db_session.commit()

        reported = await backfill_history_from_snapshots(
            db_session, repo_id=repo.repo_id, dry_run=True
        )
        assert reported == 2

        stored = await self._history_rows(db_session, repo.repo_id)
        assert stored == []

    @pytest.mark.asyncio
    async def test_idempotent(self, db_session: AsyncSession) -> None:
        """A second backfill run over the same data creates no further entries."""
        from musehub.services.musehub_symbol_indexer import backfill_history_from_snapshots

        repo = await create_repo(db_session, slug="sdb-idem")
        await _seed_commit_with_snapshot(
            db_session, repo.repo_id, "c1", {"a.py": "sha256:v1"}
        )
        await db_session.commit()

        first_pass = await backfill_history_from_snapshots(db_session, repo_id=repo.repo_id)
        await db_session.commit()
        second_pass = await backfill_history_from_snapshots(db_session, repo_id=repo.repo_id)
        assert first_pass == 1
        assert second_pass == 0

    @pytest.mark.asyncio
    async def test_repo_id_filter(self, db_session: AsyncSession) -> None:
        """Passing repo_id restricts the backfill to that repo's commits."""
        from musehub.services.musehub_symbol_indexer import backfill_history_from_snapshots

        repo_a = await create_repo(db_session, slug="sdb-filter-a")
        repo_b = await create_repo(db_session, slug="sdb-filter-b")
        await _seed_commit_with_snapshot(
            db_session, repo_a.repo_id, "ca1", {"a.py": "sha256:a"}
        )
        await _seed_commit_with_snapshot(
            db_session, repo_b.repo_id, "cb1", {"b.py": "sha256:b"}
        )
        await db_session.commit()

        created = await backfill_history_from_snapshots(db_session, repo_id=repo_a.repo_id)
        assert created == 1

        entries_a = await self._history_rows(db_session, repo_a.repo_id)
        entries_b = await self._history_rows(db_session, repo_b.repo_id)
        assert len(entries_a) == 1
        assert len(entries_b) == 0

    @pytest.mark.asyncio
    async def test_inferred_op_payload_marks_source(
        self, db_session: AsyncSession
    ) -> None:
        """Entries created by snapshot-diff carry inferred_from='snapshot_diff'
        in op_payload so callers can distinguish them from structured_delta entries."""
        from musehub.services.musehub_symbol_indexer import backfill_history_from_snapshots

        repo = await create_repo(db_session, slug="sdb-mark")
        await _seed_commit_with_snapshot(
            db_session, repo.repo_id, "c1", {"a.py": "sha256:v1"}
        )
        await db_session.commit()

        await backfill_history_from_snapshots(db_session, repo_id=repo.repo_id)

        entries = await self._history_rows(db_session, repo.repo_id)
        assert len(entries) == 1
        payload = entries[0].op_payload or {}
        assert payload.get("inferred_from") == "snapshot_diff"
2330
2331
2332 # ─────────────────────────────────────────────────────────────────────────────
2333 # Layer 3 — Lineage walk: load_symbol_history follows from_address chains
2334 # ─────────────────────────────────────────────────────────────────────────────
2335
2336
async def _seed_history_entry(
    session: AsyncSession,
    repo_id: str,
    address: str,
    commit_id: str,
    op: str,
    op_payload: JSONObject | None = None,
    content_id: str | None = None,
    committed_at: datetime | None = None,
) -> MusehubSymbolHistoryEntry:
    """Insert one history row directly, without going through the indexer.

    A missing ``op_payload`` is stored as an empty dict and a missing
    ``committed_at`` falls back to ``_now()``.  The author is always
    "gabriel".  The session is flushed before the new row is returned.
    """
    payload = op_payload or {}
    when = committed_at or _now()
    entry = MusehubSymbolHistoryEntry(
        repo_id=repo_id,
        address=address,
        commit_id=commit_id,
        op=op,
        op_payload=payload,
        content_id=content_id,
        committed_at=when,
        author="gabriel",
    )
    session.add(entry)
    await session.flush()
    return entry
2361
2362
class TestLoadSymbolHistoryLineage:
    """load_symbol_history follows from_address chains in op_payload to build
    full symbol lineage across renames and moves.

    Several tests assert the order of the merged entries.  Every seeded row
    therefore gets an explicit, strictly increasing ``committed_at`` instead
    of the ``_now()`` default, so the expected order does not depend on
    successive clock readings being distinct.
    NOTE(review): assumes load_symbol_history orders entries chronologically
    by committed_at — confirm against the indexer.
    """

    @staticmethod
    def _at(day: int) -> datetime:
        """Return midnight UTC on day ``day`` of January 2026."""
        return datetime(2026, 1, day, tzinfo=timezone.utc)

    @pytest.mark.asyncio
    async def test_no_from_address_unchanged(self, db_session: AsyncSession) -> None:
        """A symbol with no move history is returned as-is."""
        from musehub.services.musehub_symbol_indexer import load_symbol_history

        repo = await create_repo(db_session, slug="lin-noop")
        await _seed_history_entry(
            db_session, repo.repo_id, "src/a.py::Foo", "c1", "insert",
            content_id="sha256:v1", committed_at=self._at(1),
        )
        await _seed_history_entry(
            db_session, repo.repo_id, "src/a.py::Foo", "c2", "replace",
            content_id="sha256:v2", committed_at=self._at(2),
        )
        await db_session.commit()

        history = await load_symbol_history(db_session, repo.repo_id)
        assert "src/a.py::Foo" in history
        assert len(history["src/a.py::Foo"]) == 2
        assert history["src/a.py::Foo"][0]["op"] == "insert"

    @pytest.mark.asyncio
    async def test_single_rename_prepends_origin_history(
        self, db_session: AsyncSession
    ) -> None:
        """History for new.py::Foo should include the insert at old.py::Foo."""
        from musehub.services.musehub_symbol_indexer import load_symbol_history

        repo = await create_repo(db_session, slug="lin-single")
        # old.py::Foo was inserted, then modified.
        await _seed_history_entry(
            db_session, repo.repo_id, "old.py::Foo", "c1", "insert",
            content_id="sha256:v1", committed_at=self._at(1),
        )
        await _seed_history_entry(
            db_session, repo.repo_id, "old.py::Foo", "c2", "replace",
            content_id="sha256:v2", committed_at=self._at(2),
        )
        # new.py::Foo was born via a move from old.py::Foo.
        await _seed_history_entry(
            db_session, repo.repo_id, "new.py::Foo", "c3", "move",
            op_payload={"from_address": "old.py::Foo"},
            content_id="sha256:v2", committed_at=self._at(3),
        )
        await db_session.commit()

        history = await load_symbol_history(db_session, repo.repo_id)

        # The new address should have the full chain: insert → replace → move.
        assert "new.py::Foo" in history
        ops = [e["op"] for e in history["new.py::Foo"]]
        assert ops[0] == "insert", f"Expected insert first, got: {ops}"
        assert ops[-1] == "move", f"Expected move last, got: {ops}"
        assert len(ops) == 3

    @pytest.mark.asyncio
    async def test_origin_address_excluded_from_top_level_keys(
        self, db_session: AsyncSession
    ) -> None:
        """After a rename, the old address should not appear as a top-level key."""
        from musehub.services.musehub_symbol_indexer import load_symbol_history

        repo = await create_repo(db_session, slug="lin-noold")
        await _seed_history_entry(
            db_session, repo.repo_id, "old.py::Foo", "c1", "insert",
            content_id="sha256:v1", committed_at=self._at(1),
        )
        await _seed_history_entry(
            db_session, repo.repo_id, "new.py::Foo", "c2", "move",
            op_payload={"from_address": "old.py::Foo"},
            content_id="sha256:v1", committed_at=self._at(2),
        )
        await db_session.commit()

        history = await load_symbol_history(db_session, repo.repo_id)

        assert "old.py::Foo" not in history, (
            "Origin address should be folded into new.py::Foo's lineage, "
            "not kept as a separate top-level key"
        )

    @pytest.mark.asyncio
    async def test_multi_hop_rename_walks_full_chain(
        self, db_session: AsyncSession
    ) -> None:
        """A→B→C chain: history for C includes all entries from A, B, and C."""
        from musehub.services.musehub_symbol_indexer import load_symbol_history

        repo = await create_repo(db_session, slug="lin-multi")
        await _seed_history_entry(
            db_session, repo.repo_id, "a.py::Fn", "c1", "insert",
            content_id="sha256:v1", committed_at=self._at(1),
        )
        await _seed_history_entry(
            db_session, repo.repo_id, "b.py::Fn", "c2", "move",
            op_payload={"from_address": "a.py::Fn"},
            content_id="sha256:v1", committed_at=self._at(2),
        )
        await _seed_history_entry(
            db_session, repo.repo_id, "c.py::Fn", "c3", "move",
            op_payload={"from_address": "b.py::Fn"},
            content_id="sha256:v1", committed_at=self._at(3),
        )
        await db_session.commit()

        history = await load_symbol_history(db_session, repo.repo_id)

        assert "c.py::Fn" in history
        assert "b.py::Fn" not in history
        assert "a.py::Fn" not in history
        ops = [e["op"] for e in history["c.py::Fn"]]
        assert ops[0] == "insert"
        assert ops[-1] == "move"
        assert len(ops) == 3

    @pytest.mark.asyncio
    async def test_lineage_walk_is_bounded_on_missing_origin(
        self, db_session: AsyncSession
    ) -> None:
        """If from_address has no rows, lineage walk stops gracefully."""
        from musehub.services.musehub_symbol_indexer import load_symbol_history

        repo = await create_repo(db_session, slug="lin-bound")
        # new.py::Foo claims to have come from ghost.py::Foo, which has no rows.
        await _seed_history_entry(
            db_session, repo.repo_id, "new.py::Foo", "c1", "move",
            op_payload={"from_address": "ghost.py::Foo"},
            content_id="sha256:v1", committed_at=self._at(1),
        )
        await db_session.commit()

        # Must not raise, must not loop.
        history = await load_symbol_history(db_session, repo.repo_id)
        assert "new.py::Foo" in history
        assert len(history["new.py::Foo"]) == 1

    @pytest.mark.asyncio
    async def test_file_path_filter_includes_lineage(
        self, db_session: AsyncSession
    ) -> None:
        """file_path filter on new.py returns the full lineage including old.py origin."""
        from musehub.services.musehub_symbol_indexer import load_symbol_history

        repo = await create_repo(db_session, slug="lin-filter")
        await _seed_history_entry(
            db_session, repo.repo_id, "old.py::Foo", "c1", "insert",
            content_id="sha256:v1", committed_at=self._at(1),
        )
        await _seed_history_entry(
            db_session, repo.repo_id, "new.py::Foo", "c2", "move",
            op_payload={"from_address": "old.py::Foo"},
            content_id="sha256:v1", committed_at=self._at(2),
        )
        # Unrelated symbol in another file; the filter must leave it out.
        await _seed_history_entry(
            db_session, repo.repo_id, "other.py::Bar", "c3", "insert",
            content_id="sha256:vx", committed_at=self._at(3),
        )
        await db_session.commit()

        history = await load_symbol_history(db_session, repo.repo_id, file_path="new.py")

        assert "new.py::Foo" in history
        assert "other.py::Bar" not in history
        ops = [e["op"] for e in history["new.py::Foo"]]
        assert ops[0] == "insert"

    @pytest.mark.asyncio
    async def test_lineage_entries_carry_original_address(
        self, db_session: AsyncSession
    ) -> None:
        """Each entry in the merged lineage carries its original address so the
        UI can show where the symbol lived at that point in time."""
        from musehub.services.musehub_symbol_indexer import load_symbol_history

        repo = await create_repo(db_session, slug="lin-addr")
        await _seed_history_entry(
            db_session, repo.repo_id, "old.py::Foo", "c1", "insert",
            content_id="sha256:v1", committed_at=self._at(1),
        )
        await _seed_history_entry(
            db_session, repo.repo_id, "new.py::Foo", "c2", "move",
            op_payload={"from_address": "old.py::Foo"},
            content_id="sha256:v1", committed_at=self._at(2),
        )
        await db_session.commit()

        history = await load_symbol_history(db_session, repo.repo_id)
        entries = history["new.py::Foo"]

        insert_entries = [e for e in entries if e["op"] == "insert"]
        assert insert_entries, "Expected at least one insert entry in lineage"
        assert insert_entries[0].get("address") == "old.py::Foo", (
            "Lineage entries must carry their original address for UI rendering"
        )
File History 1 commit
sha256:3ff9c9863a9891bdcde71b4a43228f66d0493e38b7cc1d09fe9eb7de774046b2 feat: add repair-commit wire endpoint (API parity with repa… Opus 4.8 minor 1 day ago