gabriel / musehub public
test_mpack_content_scanning_phase3.py python
536 lines 20.1 KB
Raw
sha256:ef10830ce231e0a20efcb0e2586cb879471247e916616e6fdd0d51df459e2595 fix: typing audit — 0 violations, 0 untyped defs across all… Sonnet 4.6 minor ⚠ breaking 20 days ago
1 """TDD — Phase 3: content scanning and DMCA takedown (issue #49).
2
3 Phase 3 invariants:
4 3a. Known-hash blocklist — mpack.index quarantines any mpack whose objects
5 appear in musehub_blocked_hashes, before any MinIO writes.
6 3b. content.scan job infrastructure — after indexing, binary objects above
7 the scan threshold get a content.scan job enqueued.
8 3c. DMCA takedown — POST /api/admin/takedown adds hashes to the blocklist,
9 moves existing MinIO objects to quarantine, marks repos with dmca_hold.
10 Requires is_admin=True; non-admins receive 403.
11 """
from __future__ import annotations

import datetime
import hashlib
import pathlib
from collections.abc import AsyncIterator
from unittest.mock import patch

import msgpack
import pytest
import pytest_asyncio
from httpx import AsyncClient, ASGITransport
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from musehub.auth.request_signing import MSignContext, require_signed_request, optional_signed_request
from musehub.db.musehub_abuse_models import MusehubBlockedHash
from musehub.db.musehub_jobs_models import MusehubBackgroundJob
from musehub.db.musehub_repo_models import MusehubRepo
from musehub.db.database import get_db
from musehub.main import app

from muse.core.object_store import write_object
from muse.core.mpack import build_mpack
from muse.core.paths import muse_dir
from muse.core.snapshot import compute_commit_id, compute_snapshot_id
from muse.core.commits import CommitRecord, write_commit
from muse.core.refs import write_branch_ref
from muse.core.snapshots import SnapshotRecord, write_snapshot
from muse.core.types import blob_id
from musehub.types.json_types import JSONObject
42
43
# Signed-request context for an admin caller ("gabriel"); substituted for the
# real request-signing dependencies when a test needs admin rights.
_AUTH_CTX = MSignContext(
    handle="gabriel",
    identity_id=f"sha256:{'0' * 64}",
    is_agent=False,
    is_admin=True,
)

# Signed-request context for a regular, non-admin caller ("carol").
_NON_ADMIN_CTX = MSignContext(
    handle="carol",
    identity_id=f"sha256:{'1' * 64}",
    is_agent=False,
    is_admin=False,
)

# Shape of the synthetic repository produced by _make_repo.
_N_FILES: int = 8        # files (and base blobs) in every snapshot manifest
_N_COMMITS: int = 4      # commits written on the "main" branch
_FILES_CHANGED: int = 2  # manifest entries replaced in each commit
_BLOB_SIZE: int = 128    # padding bytes appended to every blob payload
62
63
64 # ── fixtures ────────────────────────────────────────────────────────────────
65
@pytest_asyncio.fixture()
async def client(db_session: AsyncSession) -> AsyncIterator[AsyncClient]:
    """Yield an in-process HTTP client that acts as the admin identity.

    The app's ``get_db`` dependency is pointed at the test's ``db_session`` and
    both request-signing dependencies are replaced so that every request is
    handled as ``_AUTH_CTX`` (``is_admin=True``).  All dependency overrides are
    cleared on teardown.
    """
    async def _override_get_db() -> AsyncIterator[AsyncSession]:
        yield db_session

    app.dependency_overrides[get_db] = _override_get_db
    app.dependency_overrides[require_signed_request] = lambda: _AUTH_CTX
    app.dependency_overrides[optional_signed_request] = lambda: _AUTH_CTX

    try:
        async with AsyncClient(
            transport=ASGITransport(app=app),
            base_url="https://localhost:1337",
        ) as c:
            yield c
    finally:
        # The overrides live on the shared ``app`` object; always remove them
        # so a failure while closing the client cannot leak the admin identity
        # into later tests.
        app.dependency_overrides.clear()
82
83
@pytest_asyncio.fixture()
async def non_admin_client(db_session: AsyncSession) -> AsyncIterator[AsyncClient]:
    """Yield an in-process HTTP client that acts as a non-admin identity.

    The app's ``get_db`` dependency is pointed at the test's ``db_session`` and
    both request-signing dependencies are replaced so that every request is
    handled as ``_NON_ADMIN_CTX`` (``is_admin=False``).  All dependency
    overrides are cleared on teardown.
    """
    async def _override_get_db() -> AsyncIterator[AsyncSession]:
        yield db_session

    app.dependency_overrides[get_db] = _override_get_db
    app.dependency_overrides[require_signed_request] = lambda: _NON_ADMIN_CTX
    app.dependency_overrides[optional_signed_request] = lambda: _NON_ADMIN_CTX

    try:
        async with AsyncClient(
            transport=ASGITransport(app=app),
            base_url="https://localhost:1337",
        ) as c:
            yield c
    finally:
        # The overrides live on the shared ``app`` object; always remove them
        # so a failure while closing the client cannot leak this identity
        # into later tests.
        app.dependency_overrides.clear()
100
101
@pytest_asyncio.fixture()
async def repo(client: AsyncClient) -> AsyncIterator[JSONObject]:
    """Create the ``gabriel/phase3-scan-test`` repo and yield its JSON body.

    The repo is created through ``POST /api/repos`` (public, not initialized)
    and deleted again via ``DELETE /api/repos/{repoId}`` once the test is done.
    """
    resp = await client.post(
        "/api/repos",
        json={"owner": "gabriel", "name": "phase3-scan-test", "visibility": "public", "initialize": False},
    )
    assert resp.status_code in (200, 201), resp.text
    data = resp.json()
    yield data
    await client.delete(f"/api/repos/{data['repoId']}")
112
113
def _make_repo(tmp: pathlib.Path) -> tuple[pathlib.Path, str, dict]:
    """Create a small on-disk muse repository under *tmp* and pack it.

    The repo gets ``_N_FILES`` base blobs and ``_N_COMMITS`` linear commits on
    ``main``.  Every commit starts from the base manifest and swaps in
    ``_FILES_CHANGED`` freshly written blobs, with commit timestamps spaced
    60 seconds apart starting at 2026-01-01 UTC.

    Returns:
        ``(tmp, tip, mpack)`` — the repo root, the ID of the last commit, and
        the result of ``build_mpack(tmp, [tip], have=[])``.
    """
    # Repository skeleton.
    tmp.mkdir(parents=True, exist_ok=True)
    meta_dir = muse_dir(tmp)
    meta_dir.mkdir()
    (meta_dir / "repo.json").write_text('{"repo_id":"phase3-test","owner":"gabriel"}')
    for sub in ("commits", "snapshots", "objects"):
        (meta_dir / sub).mkdir()
    (meta_dir / "refs" / "heads").mkdir(parents=True)
    (meta_dir / "HEAD").write_text("ref: refs/heads/main\n")
    (meta_dir / "config.toml").write_text("")

    # Base blobs shared by every snapshot.
    base_manifest: dict[str, str] = {}
    for file_no in range(_N_FILES):
        payload = f"phase3-base-{file_no:04d}".encode() + b"x" * _BLOB_SIZE
        object_id = blob_id(payload)
        write_object(tmp, object_id, payload)
        base_manifest[f"src/file_{file_no:04d}.py"] = object_id

    parent: str | None = None
    tip = ""
    stamp = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc)
    one_minute = datetime.timedelta(seconds=60)

    for commit_no in range(_N_COMMITS):
        # Each commit diverges from the *base* manifest, not from its parent.
        manifest = dict(base_manifest)
        for change_no in range(_FILES_CHANGED):
            slot = (commit_no * _FILES_CHANGED + change_no) % _N_FILES
            payload = f"phase3-c{commit_no:04d}-f{change_no}".encode() + b"y" * _BLOB_SIZE
            object_id = blob_id(payload)
            write_object(tmp, object_id, payload)
            manifest[f"src/file_{slot:04d}.py"] = object_id

        snapshot_id = compute_snapshot_id(manifest)
        write_snapshot(tmp, SnapshotRecord(snapshot_id=snapshot_id, manifest=manifest))

        message = f"commit-{commit_no:05d}"
        commit_id = compute_commit_id(
            parent_ids=[parent] if parent else [],
            snapshot_id=snapshot_id,
            message=message,
            committed_at_iso=stamp.isoformat(),
            author="gabriel",
        )
        record = CommitRecord(
            commit_id=commit_id,
            branch="main",
            snapshot_id=snapshot_id,
            message=message,
            committed_at=stamp,
            parent_commit_id=parent,
            parent2_commit_id=None,
            author="gabriel",
            metadata={},
            structured_delta=None,
            sem_ver_bump="none",
            breaking_changes=[],
            agent_id="",
            model_id="",
            toolchain_id="",
            prompt_hash="",
            signature="",
            signer_key_id="",
        )
        write_commit(tmp, record)

        parent = tip = commit_id
        stamp += one_minute

    write_branch_ref(tmp, "main", tip)
    packed = build_mpack(tmp, [tip], have=[])
    return tmp, tip, packed
172
173
async def _push_mpack(client: AsyncClient, repo_slug: str, mpack: dict, head: str, db_session: AsyncSession, repo_id: str) -> str:
    """Upload mpack to MinIO via presign, then create a mpack.index job row directly.

    The push route is synchronous — it doesn't return a job_id. Tests that need
    to call process_mpack_index_job directly must create the job row themselves.

    Args:
        client: HTTP client used for the presign request.
        repo_slug: Slug of the target repo under the ``gabriel`` owner.
        mpack: The mpack mapping (as returned by ``build_mpack``); it is
            msgpack-encoded here before upload.
        head: Commit ID recorded as ``head`` in the job payload.
        db_session: Session the pending job row is added to (flushed, not
            committed).
        repo_id: ID of the repo the job belongs to.

    Returns:
        The ``job_id`` of the newly created ``mpack.index`` job.
    """
    import httpx as _httpx
    from datetime import datetime, timezone
    from musehub.core.genesis import compute_job_id as _compute_job_id

    wire_bytes = msgpack.packb(mpack, use_bin_type=True)
    # The storage key is the sha256 of the encoded bytes.
    mpack_key = "sha256:" + hashlib.sha256(wire_bytes).hexdigest()

    pr = await client.post(
        f"/gabriel/{repo_slug}/push/mpack-presign",
        content=msgpack.packb({"mpack_key": mpack_key, "size_bytes": len(wire_bytes)}, use_bin_type=True),
        headers={"Content-Type": "application/x-msgpack"},
    )
    assert pr.status_code == 200, pr.text
    presign_body = pr.json()
    # Accept either snake_case or camelCase for the URL field.
    upload_url = presign_body.get("upload_url") or presign_body.get("uploadUrl")
    assert upload_url, f"presign response has no upload URL: {presign_body!r}"

    # The presigned URL is uploaded to with a plain client rather than the
    # ASGI-bound test client.
    async with _httpx.AsyncClient() as raw:
        put = await raw.put(upload_url, content=wire_bytes)
    assert put.status_code in (200, 204), put.text

    _now = datetime.now(tz=timezone.utc)
    job_id = _compute_job_id(repo_id, "mpack.index", _now.isoformat())
    db_session.add(MusehubBackgroundJob(
        job_id=job_id,
        repo_id=repo_id,
        job_type="mpack.index",
        payload={
            "mpack_key": mpack_key,
            "pusher_id": _AUTH_CTX.identity_id,
            "branch": "main",
            "head": head,
            "force": False,
            "declared_objects_count": None,
        },
        status="pending",
    ))
    await db_session.flush()
    return job_id
217
218
219 # ── 3a: known-hash blocklist ─────────────────────────────────────────────────
220
@pytest.mark.skip(reason="muse wire protocol in flux")
@pytest.mark.asyncio
async def test_blocked_object_quarantines_mpack(
    client: AsyncClient, repo: JSONObject, tmp_path: pathlib.Path, db_session: AsyncSession,
) -> None:
    """A blocklisted object in the mpack must quarantine the index job.

    One of the mpack's own object IDs is inserted into musehub_blocked_hashes
    before the job runs.  Running the job is expected to raise
    MPackValidationError (matching "blocked") and leave the job row with
    status='quarantined'.
    """
    _, head, mpack = _make_repo(tmp_path / "repo")
    job_id = await _push_mpack(client, repo["slug"], mpack, head, db_session, repo["repoId"])

    # Blocklist the first object carried by this mpack.
    packed_objects = mpack.get("objects") or []
    assert packed_objects, "mpack must have objects"
    first_object = packed_objects[0]
    db_session.add(MusehubBlockedHash(object_id=first_object["object_id"], reason="test NCMEC block"))
    await db_session.flush()

    from musehub.services.musehub_wire import process_mpack_index_job, MPackValidationError
    with pytest.raises(MPackValidationError, match="blocked"):
        await process_mpack_index_job(db_session, job_id)

    # Persist and drop cached state so the job row is re-read from the DB.
    await db_session.commit()
    db_session.expire_all()

    job_query = select(MusehubBackgroundJob).where(MusehubBackgroundJob.job_id == job_id)
    job_row = (await db_session.execute(job_query)).scalar_one()
    assert job_row.status == "quarantined", (
        f"expected 'quarantined' after blocked-hash detection, got '{job_row.status}'"
    )
259
260
@pytest.mark.skip(reason="muse wire protocol in flux")
@pytest.mark.asyncio
async def test_blocked_check_fires_before_minio_puts(
    client: AsyncClient, repo: JSONObject, tmp_path: pathlib.Path, db_session: AsyncSession,
) -> None:
    """No storage writes may happen when the mpack contains a blocked hash.

    ``backend.put`` is wrapped with a recording spy while the index job runs
    against an mpack whose first object is blocklisted.  The job must raise
    MPackValidationError and the spy must have recorded zero calls.
    """
    _, head, mpack = _make_repo(tmp_path / "repo")
    job_id = await _push_mpack(client, repo["slug"], mpack, head, db_session, repo["repoId"])

    packed_objects = mpack.get("objects") or []
    first_oid = packed_objects[0]["object_id"]
    db_session.add(MusehubBlockedHash(object_id=first_oid, reason="test"))
    await db_session.flush()

    from musehub.storage.backends import get_backend as _get_backend
    # NOTE(review): this assumes get_backend() hands back the same instance the
    # index job uses; otherwise the spy would never observe its writes — confirm.
    backend = _get_backend()
    put_calls: list[str] = []
    original_put = backend.put

    async def _recording_put(oid: str, data: bytes) -> None:
        # Record the attempted write, then delegate to the real backend.
        put_calls.append(oid)
        return await original_put(oid, data)

    from musehub.services.musehub_wire import process_mpack_index_job, MPackValidationError
    with patch.object(backend, "put", side_effect=_recording_put), pytest.raises(MPackValidationError):
        await process_mpack_index_job(db_session, job_id)

    assert not put_calls, (
        f"backend.put was called {len(put_calls)} time(s) despite blocked hash — "
        f"blocklist check must run before MinIO writes"
    )
297
298
@pytest.mark.skip(reason="muse wire protocol in flux")
@pytest.mark.asyncio
async def test_multiple_blocked_objects_all_reported(
    client: AsyncClient, repo: JSONObject, tmp_path: pathlib.Path, db_session: AsyncSession,
) -> None:
    """With two blocked objects in one mpack, the error must mention the blocking.

    The first two objects of the mpack are blocklisted; the index job is then
    expected to raise MPackValidationError whose message mentions the count
    or the word "blocked", so an operator can see the extent of the quarantine.
    """
    _, head, mpack = _make_repo(tmp_path / "repo")
    job_id = await _push_mpack(client, repo["slug"], mpack, head, db_session, repo["repoId"])

    packed_objects = mpack.get("objects") or []
    assert len(packed_objects) >= 2, "need at least 2 objects to test multi-blocked"
    blocked_oids = [packed_objects[0]["object_id"], packed_objects[1]["object_id"]]
    for blocked in blocked_oids:
        db_session.add(MusehubBlockedHash(object_id=blocked, reason="test"))
    await db_session.flush()

    from musehub.services.musehub_wire import process_mpack_index_job, MPackValidationError
    with pytest.raises(MPackValidationError) as raised:
        await process_mpack_index_job(db_session, job_id)

    err_msg = str(raised.value)
    # NOTE(review): a bare "2" can also match a digit inside an object ID, so
    # this check is very permissive — consider asserting on the IDs themselves
    # once the error format is settled.
    mentions_blocking = "2" in err_msg or "blocked" in err_msg.lower()
    assert mentions_blocking, (
        f"expected error to mention count/blocked objects, got: {err_msg!r}"
    )
327
328
@pytest.mark.skip(reason="muse wire protocol in flux")
@pytest.mark.asyncio
async def test_clean_mpack_bypasses_blocklist_unimpeded(
    client: AsyncClient, repo: JSONObject, tmp_path: pathlib.Path, db_session: AsyncSession,
) -> None:
    """Regression: unrelated blocklist entries must not affect a clean mpack.

    The blocklist holds a single hash that does not occur in the pushed
    mpack; the index job must run to completion and report written commits.
    """
    _, head, mpack = _make_repo(tmp_path / "repo")
    job_id = await _push_mpack(client, repo["slug"], mpack, head, db_session, repo["repoId"])

    # A blocklist entry for an object that is not part of this mpack.
    unrelated = MusehubBlockedHash(object_id="sha256:" + "f" * 64, reason="unrelated block")
    db_session.add(unrelated)
    await db_session.flush()

    from musehub.services.musehub_wire import process_mpack_index_job
    outcome = await process_mpack_index_job(db_session, job_id)
    await db_session.commit()

    assert outcome["commits_written"] > 0, "clean mpack must index normally despite non-matching blocklist entries"
354
355
356 # ── 3c: DMCA takedown endpoint ───────────────────────────────────────────────
357
@pytest.mark.asyncio
async def test_dmca_takedown_adds_hashes_to_blocklist(
    client: AsyncClient, tmp_path: pathlib.Path, db_session: AsyncSession,
) -> None:
    """POST /api/admin/takedown must persist every supplied object_id.

    Three synthetic object IDs are submitted; afterwards each one must have a
    row in musehub_blocked_hashes whose reason contains "DMCA".
    """
    oids = []
    for i in range(3):
        digest = hashlib.sha256(f"dmca-object-{i}".encode()).hexdigest()
        oids.append("sha256:" + digest)

    request_body = {"object_ids": oids, "reason": "DMCA request #12345"}
    resp = await client.post("/api/admin/takedown", json=request_body)
    assert resp.status_code == 200, resp.text

    # Drop cached ORM state so the query reflects what the endpoint wrote.
    db_session.expire_all()
    blocked_query = select(MusehubBlockedHash).where(MusehubBlockedHash.object_id.in_(oids))
    rows = (await db_session.execute(blocked_query)).scalars().all()
    assert len(rows) == len(oids), (
        f"expected {len(oids)} blocked_hash rows, got {len(rows)}"
    )
    for row in rows:
        stored_reason = row.reason or ""
        assert "DMCA" in stored_reason, (
            f"expected reason to contain 'DMCA', got {row.reason!r}"
        )
388
389
@pytest.mark.asyncio
async def test_dmca_takedown_marks_repos_dmca_hold(
    client: AsyncClient, repo: JSONObject, tmp_path: pathlib.Path, db_session: AsyncSession,
) -> None:
    """A takedown that names repo_ids must flag those repos with dmca_hold.

    The response is expected to report ``repos_held == 1`` and the repo row
    must have ``dmca_hold`` persisted as True, so that push gates and serve
    paths can enforce the hold.
    """
    repo_id = repo["repoId"]
    request_body = {
        "object_ids": ["sha256:" + hashlib.sha256(b"dmca-hold-test").hexdigest()],
        "reason": "DMCA #hold-test",
        "repo_ids": [repo_id],
    }

    resp = await client.post("/api/admin/takedown", json=request_body)
    assert resp.status_code == 200, resp.text
    data = resp.json()
    assert data.get("repos_held", 0) == 1, (
        f"expected repos_held=1 in response, got: {data}"
    )

    # Re-read the repo from the database rather than from cached ORM state.
    db_session.expire_all()
    repo_query = select(MusehubRepo).where(MusehubRepo.repo_id == repo_id)
    repo_row = (await db_session.execute(repo_query)).scalar_one()
    assert repo_row.dmca_hold is True, (
        f"expected repo.dmca_hold=True after takedown with repo_ids, got {repo_row.dmca_hold}"
    )
419
420
@pytest.mark.asyncio
async def test_dmca_takedown_quarantines_existing_minio_objects(
    client: AsyncClient, repo: JSONObject, tmp_path: pathlib.Path, db_session: AsyncSession,
) -> None:
    """A takedown must remove already-stored objects from the main bucket.

    An object is written through the storage backend first (standing in for
    a previously indexed object).  After the takedown the response must
    report ``quarantined_count >= 1`` and ``backend.get`` must return None
    for that object.
    """
    from musehub.storage.backends import get_backend as _get_backend
    backend = _get_backend()

    # Seed storage with the object that will be taken down.
    payload = b"sensitive-content-to-quarantine-" + b"x" * 100
    oid = "sha256:" + hashlib.sha256(payload).hexdigest()
    await backend.put(oid, payload)

    stored_before = await backend.get(oid)
    assert stored_before is not None, "object must be in MinIO before takedown"

    resp = await client.post(
        "/api/admin/takedown",
        json={"object_ids": [oid], "reason": "DMCA quarantine test"},
    )
    assert resp.status_code == 200, resp.text
    data = resp.json()
    assert data.get("quarantined_count", 0) >= 1, (
        f"expected quarantined_count >= 1 in response, got: {data}"
    )

    # The main bucket must no longer serve the object.
    stored_after = await backend.get(oid)
    assert stored_after is None, (
        "object must be removed from main MinIO bucket after DMCA takedown"
    )
455
456
@pytest.mark.asyncio
async def test_dmca_takedown_requires_admin(
    non_admin_client: AsyncClient, db_session: AsyncSession,
) -> None:
    """A non-admin caller must be refused with 403 on POST /api/admin/takedown.

    The caller is authenticated but lacks admin rights, so the expected
    answer is 403 Forbidden rather than 401.
    """
    request_body = {"object_ids": ["sha256:" + "a" * 64], "reason": "test"}
    resp = await non_admin_client.post("/api/admin/takedown", json=request_body)
    assert resp.status_code == 403, (
        f"expected 403 for non-admin takedown request, got {resp.status_code}: {resp.text}"
    )
473
474
@pytest.mark.asyncio
async def test_dmca_takedown_idempotent(
    client: AsyncClient, db_session: AsyncSession,
) -> None:
    """Submitting the same takedown twice must succeed and not duplicate rows.

    Both requests must return 200 and exactly one musehub_blocked_hashes row
    may exist for the object afterwards.
    """
    oid = "sha256:" + hashlib.sha256(b"idempotent-takedown").hexdigest()
    payload = {"object_ids": [oid], "reason": "idempotent test"}

    first = await client.post("/api/admin/takedown", json=payload)
    assert first.status_code == 200, first.text

    second = await client.post("/api/admin/takedown", json=payload)
    assert second.status_code == 200, second.text

    # Count the rows as stored, not as cached in the session.
    db_session.expire_all()
    blocked_query = select(MusehubBlockedHash).where(MusehubBlockedHash.object_id == oid)
    rows = (await db_session.execute(blocked_query)).scalars().all()
    assert len(rows) == 1, f"expected exactly 1 blocked_hash row after idempotent calls, got {len(rows)}"
498
499
500 # ── 3b: content.scan job infrastructure ──────────────────────────────────────
501
@pytest.mark.skip(reason="muse wire protocol in flux")
@pytest.mark.asyncio
async def test_content_scan_jobs_enqueued_after_indexing(
    client: AsyncClient, repo: JSONObject, tmp_path: pathlib.Path, db_session: AsyncSession,
) -> None:
    """After mpack.index, content.scan jobs are enqueued for indexed objects.

    Each indexed object gets a content.scan job so CSAM APIs can be integrated
    as a drop-in when legal review completes. The job type is 'content.scan'
    and the payload contains the object_id and repo_id.

    The test checks that at least one content.scan job exists for the repo and
    that every job's object_id belongs to the pushed mpack.
    """
    _, head, mpack = _make_repo(tmp_path / "repo")
    job_id = await _push_mpack(client, repo["slug"], mpack, head, db_session, repo["repoId"])

    from musehub.services.musehub_wire import process_mpack_index_job
    # Only the side effect (enqueued jobs) matters here; the return value is unused.
    await process_mpack_index_job(db_session, job_id)
    await db_session.commit()

    raw_objects = mpack.get("objects") or []
    all_oids = {obj["object_id"] for obj in raw_objects}

    scan_jobs = (await db_session.execute(
        select(MusehubBackgroundJob)
        .where(MusehubBackgroundJob.job_type == "content.scan")
        .where(MusehubBackgroundJob.repo_id == repo["repoId"])
    )).scalars().all()

    assert scan_jobs, (
        "expected content.scan jobs to be enqueued after mpack.index, got none — "
        "3b job infrastructure missing"
    )
    # Scan jobs may cover a subset of the objects, but never foreign ones.
    scan_oids = {j.payload.get("object_id") for j in scan_jobs}
    assert scan_oids <= all_oids, (
        f"content.scan jobs reference object_ids not in mpack: {scan_oids - all_oids}"
    )
File History 1 commit
sha256:ef10830ce231e0a20efcb0e2586cb879471247e916616e6fdd0d51df459e2595 fix: typing audit — 0 violations, 0 untyped defs across all… Sonnet 4.6 minor 20 days ago