"""TDD — Phase 3: content scanning and DMCA takedown (issue #49). Phase 3 invariants: 3a. Known-hash blocklist — mpack.index quarantines any mpack whose objects appear in musehub_blocked_hashes, before any MinIO writes. 3b. content.scan job infrastructure — after indexing, binary objects above the scan threshold get a content.scan job enqueued. 3c. DMCA takedown — POST /api/admin/takedown adds hashes to the blocklist, moves existing MinIO objects to quarantine, marks repos with dmca_hold. Requires is_admin=True; non-admins receive 403. """ from __future__ import annotations import datetime import hashlib import pathlib import msgpack import pytest import pytest_asyncio from httpx import AsyncClient, ASGITransport from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from unittest.mock import patch from musehub.auth.request_signing import MSignContext, require_signed_request, optional_signed_request from musehub.db.musehub_abuse_models import MusehubBlockedHash from musehub.db.musehub_jobs_models import MusehubBackgroundJob from musehub.db.musehub_repo_models import MusehubRepo from musehub.db.database import get_db from musehub.main import app from muse.core.object_store import write_object from muse.core.mpack import build_mpack from muse.core.paths import muse_dir from muse.core.snapshot import compute_commit_id, compute_snapshot_id from muse.core.commits import CommitRecord, write_commit from muse.core.refs import write_branch_ref from muse.core.snapshots import SnapshotRecord, write_snapshot from muse.core.types import blob_id from musehub.types.json_types import JSONObject _AUTH_CTX = MSignContext( handle="gabriel", identity_id="sha256:" + "0" * 64, is_agent=False, is_admin=True, ) _NON_ADMIN_CTX = MSignContext( handle="carol", identity_id="sha256:" + "1" * 64, is_agent=False, is_admin=False, ) _N_FILES = 8 _N_COMMITS = 4 _FILES_CHANGED = 2 _BLOB_SIZE = 128 # ── fixtures ──────────────────────────────────────────────────────────────── 
@contextlib.asynccontextmanager
async def _client_as(
    db_session: AsyncSession,
    ctx: MSignContext,
) -> AsyncIterator[AsyncClient]:
    """Yield an in-process ``AsyncClient`` whose requests authenticate as *ctx*.

    Installs dependency overrides on ``app`` so that ``get_db`` yields
    *db_session* and both signed-request dependencies return *ctx*. All
    overrides are cleared on exit, including when client setup fails.
    """
    async def _override_get_db() -> AsyncIterator[AsyncSession]:
        yield db_session

    app.dependency_overrides[get_db] = _override_get_db
    app.dependency_overrides[require_signed_request] = lambda: ctx
    app.dependency_overrides[optional_signed_request] = lambda: ctx
    try:
        async with AsyncClient(
            transport=ASGITransport(app=app),
            base_url="https://localhost:1337",
        ) as c:
            yield c
    finally:
        # clear() wipes every override on the shared app, not just ours.
        app.dependency_overrides.clear()


@pytest_asyncio.fixture()
async def client(db_session: AsyncSession) -> AsyncIterator[AsyncClient]:
    """HTTP client authenticated as the admin identity ``_AUTH_CTX``."""
    async with _client_as(db_session, _AUTH_CTX) as c:
        yield c


@pytest_asyncio.fixture()
async def non_admin_client(db_session: AsyncSession) -> AsyncIterator[AsyncClient]:
    """HTTP client authenticated as the non-admin identity ``_NON_ADMIN_CTX``."""
    async with _client_as(db_session, _NON_ADMIN_CTX) as c:
        yield c


@pytest_asyncio.fixture()
async def repo(client: AsyncClient) -> AsyncIterator[JSONObject]:
    """Create an uninitialized public repo via the API; delete it on teardown.

    Yields the JSON body returned by ``POST /api/repos``; tests in this
    module read its ``repoId`` and ``slug`` keys.
    """
    resp = await client.post(
        "/api/repos",
        json={"owner": "gabriel", "name": "phase3-scan-test", "visibility": "public", "initialize": False},
    )
    assert resp.status_code in (200, 201), resp.text
    data = resp.json()
    yield data
    await client.delete(f"/api/repos/{data['repoId']}")


def _make_repo(tmp: pathlib.Path) -> tuple[pathlib.Path, str, dict]:
    """Build a deterministic on-disk Muse repo under *tmp* and pack it.

    Writes ``_N_FILES`` base blobs, then ``_N_COMMITS`` linear commits on
    ``main``; each commit replaces ``_FILES_CHANGED`` files of the base
    manifest with fresh blobs. Timestamps start at 2026-01-01 UTC and advance
    60 seconds per commit, so the generated IDs depend only on the module
    constants.

    Returns:
        ``(tmp, tip_commit_id, mpack)`` where *mpack* is the result of
        ``build_mpack(tmp, [tip], have=[])``.
    """
    tmp.mkdir(parents=True, exist_ok=True)
    dot = muse_dir(tmp)
    dot.mkdir()
    (dot / "repo.json").write_text('{"repo_id":"phase3-test","owner":"gabriel"}')
    for d in ("commits", "snapshots", "objects"):
        (dot / d).mkdir()
    (dot / "refs" / "heads").mkdir(parents=True)
    (dot / "HEAD").write_text("ref: refs/heads/main\n")
    (dot / "config.toml").write_text("")

    blob_ids: list[str] = []
    for i in range(_N_FILES):
        data = f"phase3-base-{i:04d}".encode() + b"x" * _BLOB_SIZE
        oid = blob_id(data)
        write_object(tmp, oid, data)
        blob_ids.append(oid)

    base_manifest = {f"src/file_{i:04d}.py": oid for i, oid in enumerate(blob_ids)}
    parent: str | None = None
    tip = ""
    ts = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc)

    for i in range(_N_COMMITS):
        # Every commit starts from the base manifest, not the previous commit's.
        manifest = dict(base_manifest)
        for j in range(_FILES_CHANGED):
            idx = (i * _FILES_CHANGED + j) % _N_FILES
            raw = f"phase3-c{i:04d}-f{j}".encode() + b"y" * _BLOB_SIZE
            oid = blob_id(raw)
            write_object(tmp, oid, raw)
            manifest[f"src/file_{idx:04d}.py"] = oid

        sid = compute_snapshot_id(manifest)
        write_snapshot(tmp, SnapshotRecord(snapshot_id=sid, manifest=manifest))

        msg = f"commit-{i:05d}"
        cid = compute_commit_id(
            parent_ids=[parent] if parent else [],
            snapshot_id=sid,
            message=msg,
            committed_at_iso=ts.isoformat(),
            author="gabriel",
        )
        write_commit(tmp, CommitRecord(
            commit_id=cid,
            branch="main",
            snapshot_id=sid,
            message=msg,
            committed_at=ts,
            parent_commit_id=parent,
            parent2_commit_id=None,
            author="gabriel",
            metadata={},
            structured_delta=None,
            sem_ver_bump="none",
            breaking_changes=[],
            agent_id="",
            model_id="",
            toolchain_id="",
            prompt_hash="",
            signature="",
            signer_key_id="",
        ))
        parent = cid
        tip = cid
        ts += datetime.timedelta(seconds=60)

    write_branch_ref(tmp, "main", tip)
    mpack = build_mpack(tmp, [tip], have=[])
    return tmp, tip, mpack


async def _push_mpack(
    client: AsyncClient,
    repo_slug: str,
    mpack: dict,
    head: str,
    db_session: AsyncSession,
    repo_id: str,
) -> str:
    """Upload mpack to MinIO via presign, then create a mpack.index job row directly.

    The push route is synchronous — it doesn't return a job_id. Tests that
    need to call process_mpack_index_job directly must create the job row
    themselves.

    Args:
        client: API client used for the presign request.
        repo_slug: Slug of the target repo under owner ``gabriel``.
        mpack: The mpack mapping returned by ``build_mpack``; it is
            msgpack-encoded here before upload.
        head: Commit ID recorded as ``head`` in the job payload.
        db_session: Session the pending job row is added to (flushed, not
            committed).
        repo_id: Repo ID stored on the job row.

    Returns:
        The ``job_id`` of the newly added ``mpack.index`` job.
    """
    import httpx as _httpx
    from musehub.core.genesis import compute_job_id as _compute_job_id

    wire_bytes = msgpack.packb(mpack, use_bin_type=True)
    mpack_key = "sha256:" + hashlib.sha256(wire_bytes).hexdigest()

    pr = await client.post(
        f"/gabriel/{repo_slug}/push/mpack-presign",
        content=msgpack.packb({"mpack_key": mpack_key, "size_bytes": len(wire_bytes)}, use_bin_type=True),
        headers={"Content-Type": "application/x-msgpack"},
    )
    assert pr.status_code == 200, pr.text
    presign = pr.json()
    # The response may spell the key in snake_case or camelCase; accept both.
    upload_url = presign.get("upload_url") or presign.get("uploadUrl")
    assert upload_url, f"presign response carries no upload URL: {presign}"

    # The presigned URL is hit over a real HTTP connection, not the ASGI app.
    async with _httpx.AsyncClient() as raw:
        put = await raw.put(upload_url, content=wire_bytes)
    assert put.status_code in (200, 204), (
        f"presigned PUT failed with {put.status_code}: {put.text}"
    )

    now = datetime.datetime.now(tz=datetime.timezone.utc)
    job_id = _compute_job_id(repo_id, "mpack.index", now.isoformat())
    db_session.add(MusehubBackgroundJob(
        job_id=job_id,
        repo_id=repo_id,
        job_type="mpack.index",
        payload={
            "mpack_key": mpack_key,
            "pusher_id": _AUTH_CTX.identity_id,
            "branch": "main",
            "head": head,
            "force": False,
            "declared_objects_count": None,
        },
        status="pending",
    ))
    await db_session.flush()
    return job_id


# ── 3a: known-hash blocklist ─────────────────────────────────────────────────

@pytest.mark.skip(reason="muse wire protocol in flux")
@pytest.mark.asyncio
async def test_blocked_object_quarantines_mpack(
    client: AsyncClient,
    repo: JSONObject,
    tmp_path: pathlib.Path,
    db_session: AsyncSession,
) -> None:
    """An object whose sha256 ID is in musehub_blocked_hashes quarantines the mpack.

    The blocklist check runs after Phase 2 validation, before any MinIO PUTs.
    The job must raise MPackValidationError and set status='quarantined'.
    """
    _, head, mpack = _make_repo(tmp_path / "repo")
    repo_slug = repo["slug"]

    job_id = await _push_mpack(client, repo_slug, mpack, head, db_session, repo["repoId"])

    # Block one of the objects that will be in this mpack
    raw_objects = mpack.get("objects") or []
    assert raw_objects, "mpack must have objects"
    blocked_oid = raw_objects[0]["object_id"]

    db_session.add(MusehubBlockedHash(
        object_id=blocked_oid,
        reason="test NCMEC block",
    ))
    await db_session.flush()

    from musehub.services.musehub_wire import process_mpack_index_job, MPackValidationError

    with pytest.raises(MPackValidationError, match="blocked"):
        await process_mpack_index_job(db_session, job_id)

    await db_session.commit()
    # Drop cached ORM state so the status is re-read from the database.
    db_session.expire_all()

    job_row = (await db_session.execute(
        select(MusehubBackgroundJob).where(MusehubBackgroundJob.job_id == job_id)
    )).scalar_one()
    assert job_row.status == "quarantined", (
        f"expected 'quarantined' after blocked-hash detection, got '{job_row.status}'"
    )


@pytest.mark.skip(reason="muse wire protocol in flux")
@pytest.mark.asyncio
async def test_blocked_check_fires_before_minio_puts(
    client: AsyncClient,
    repo: JSONObject,
    tmp_path: pathlib.Path,
    db_session: AsyncSession,
) -> None:
    """Blocklist check fires before backend.put — zero MinIO writes on blocked mpack.

    Verifies the order: validate (Phase 2) → blocklist check → MinIO PUTs.
    When a blocked hash is detected, backend.put must never be called.
    """
    _, head, mpack = _make_repo(tmp_path / "repo")
    job_id = await _push_mpack(client, repo["slug"], mpack, head, db_session, repo["repoId"])

    raw_objects = mpack.get("objects") or []
    assert raw_objects, "mpack must have objects"
    blocked_oid = raw_objects[0]["object_id"]
    db_session.add(MusehubBlockedHash(object_id=blocked_oid, reason="test"))
    await db_session.flush()

    from musehub.storage.backends import get_backend as _get_backend
    # NOTE(review): the spy only observes writes if get_backend() hands the
    # indexing job this same backend object — confirm it is shared/cached.
    backend = _get_backend()
    put_calls: list[str] = []
    _real_put = backend.put

    async def _spy(oid: str, data: bytes) -> None:
        put_calls.append(oid)
        return await _real_put(oid, data)

    from musehub.services.musehub_wire import process_mpack_index_job, MPackValidationError

    with patch.object(backend, "put", side_effect=_spy):
        with pytest.raises(MPackValidationError):
            await process_mpack_index_job(db_session, job_id)

    assert not put_calls, (
        f"backend.put was called {len(put_calls)} time(s) despite blocked hash — "
        f"blocklist check must run before MinIO writes"
    )


@pytest.mark.skip(reason="muse wire protocol in flux")
@pytest.mark.asyncio
async def test_multiple_blocked_objects_all_reported(
    client: AsyncClient,
    repo: JSONObject,
    tmp_path: pathlib.Path,
    db_session: AsyncSession,
) -> None:
    """MPackValidationError message reports all blocked object IDs, not just the first.

    When a mpack contains multiple blocked objects, the error should list them
    so the operator knows the full extent of the quarantine.
    """
    _, head, mpack = _make_repo(tmp_path / "repo")
    job_id = await _push_mpack(client, repo["slug"], mpack, head, db_session, repo["repoId"])

    raw_objects = mpack.get("objects") or []
    assert len(raw_objects) >= 2, "need at least 2 objects to test multi-blocked"
    blocked_oids = [raw_objects[0]["object_id"], raw_objects[1]["object_id"]]

    for oid in blocked_oids:
        db_session.add(MusehubBlockedHash(object_id=oid, reason="test"))
    await db_session.flush()

    from musehub.services.musehub_wire import process_mpack_index_job, MPackValidationError

    with pytest.raises(MPackValidationError) as exc_info:
        await process_mpack_index_job(db_session, job_id)

    err_msg = str(exc_info.value)
    # NOTE(review): this check is weaker than the docstring — "2" can match a
    # digit inside a hex object ID and "blocked" appears for a single hit too.
    # Tighten it (e.g. require every blocked ID in the message) once the error
    # format is settled.
    assert "2" in err_msg or "blocked" in err_msg.lower(), (
        f"expected error to mention count/blocked objects, got: {err_msg!r}"
    )


@pytest.mark.skip(reason="muse wire protocol in flux")
@pytest.mark.asyncio
async def test_clean_mpack_bypasses_blocklist_unimpeded(
    client: AsyncClient,
    repo: JSONObject,
    tmp_path: pathlib.Path,
    db_session: AsyncSession,
) -> None:
    """Regression: a mpack with no blocked objects indexes normally.

    The blocklist table may have entries for other objects — none matching
    this mpack's content. process_mpack_index_job must complete normally.
    """
    _, head, mpack = _make_repo(tmp_path / "repo")
    job_id = await _push_mpack(client, repo["slug"], mpack, head, db_session, repo["repoId"])

    # Populate blocklist with a hash that's NOT in this mpack
    db_session.add(MusehubBlockedHash(
        object_id="sha256:" + "f" * 64,
        reason="unrelated block",
    ))
    await db_session.flush()

    from musehub.services.musehub_wire import process_mpack_index_job

    result = await process_mpack_index_job(db_session, job_id)
    await db_session.commit()

    assert result["commits_written"] > 0, "clean mpack must index normally despite non-matching blocklist entries"


# ── 3c: DMCA takedown endpoint ───────────────────────────────────────────────

@pytest.mark.asyncio
async def test_dmca_takedown_adds_hashes_to_blocklist(
    client: AsyncClient,
    tmp_path: pathlib.Path,
    db_session: AsyncSession,
) -> None:
    """POST /api/admin/takedown adds object_ids to musehub_blocked_hashes.

    After a successful takedown request, each supplied object_id must appear
    in musehub_blocked_hashes with the supplied reason.
    """
    oids = [
        "sha256:" + hashlib.sha256(f"dmca-object-{i}".encode()).hexdigest()
        for i in range(3)
    ]

    resp = await client.post(
        "/api/admin/takedown",
        json={"object_ids": oids, "reason": "DMCA request #12345"},
    )
    assert resp.status_code == 200, resp.text

    # Drop cached ORM state so rows written by the endpoint are re-read.
    db_session.expire_all()
    rows = (await db_session.execute(
        select(MusehubBlockedHash).where(MusehubBlockedHash.object_id.in_(oids))
    )).scalars().all()

    assert len(rows) == len(oids), (
        f"expected {len(oids)} blocked_hash rows, got {len(rows)}"
    )
    for row in rows:
        assert "DMCA" in (row.reason or ""), (
            f"expected reason to contain 'DMCA', got {row.reason!r}"
        )


@pytest.mark.asyncio
async def test_dmca_takedown_marks_repos_dmca_hold(
    client: AsyncClient,
    repo: JSONObject,
    tmp_path: pathlib.Path,
    db_session: AsyncSession,
) -> None:
    """POST /api/admin/takedown with repo_ids sets dmca_hold=True on each repo.

    A repo under dmca_hold must have its flag persisted so push gates and
    serve paths can enforce the hold.
    """
    repo_id = repo["repoId"]
    oids = ["sha256:" + hashlib.sha256(b"dmca-hold-test").hexdigest()]

    resp = await client.post(
        "/api/admin/takedown",
        json={"object_ids": oids, "reason": "DMCA #hold-test", "repo_ids": [repo_id]},
    )
    assert resp.status_code == 200, resp.text
    data = resp.json()
    assert data.get("repos_held", 0) == 1, (
        f"expected repos_held=1 in response, got: {data}"
    )

    db_session.expire_all()
    repo_row = (await db_session.execute(
        select(MusehubRepo).where(MusehubRepo.repo_id == repo_id)
    )).scalar_one()
    assert repo_row.dmca_hold is True, (
        f"expected repo.dmca_hold=True after takedown with repo_ids, got {repo_row.dmca_hold}"
    )


@pytest.mark.asyncio
async def test_dmca_takedown_quarantines_existing_minio_objects(
    client: AsyncClient,
    repo: JSONObject,
    tmp_path: pathlib.Path,
    db_session: AsyncSession,
) -> None:
    """POST /api/admin/takedown moves already-stored MinIO objects to quarantine.

    Objects that were written to MinIO before the takedown must be moved to
    the quarantine prefix so they are no longer publicly fetchable.
    """
    # Write a real object to MinIO first (simulates a previously indexed object)
    from musehub.storage.backends import get_backend as _get_backend
    backend = _get_backend()

    obj_data = b"sensitive-content-to-quarantine-" + b"x" * 100
    oid = "sha256:" + hashlib.sha256(obj_data).hexdigest()
    await backend.put(oid, obj_data)

    # Verify it exists in MinIO before takedown
    assert await backend.get(oid) is not None, "object must be in MinIO before takedown"

    resp = await client.post(
        "/api/admin/takedown",
        json={"object_ids": [oid], "reason": "DMCA quarantine test"},
    )
    assert resp.status_code == 200, resp.text
    data = resp.json()
    assert data.get("quarantined_count", 0) >= 1, (
        f"expected quarantined_count >= 1 in response, got: {data}"
    )

    # Object must no longer be accessible from the main bucket
    assert await backend.get(oid) is None, (
        "object must be removed from main MinIO bucket after DMCA takedown"
    )


@pytest.mark.asyncio
async def test_dmca_takedown_requires_admin(
    non_admin_client: AsyncClient,
    db_session: AsyncSession,
) -> None:
    """POST /api/admin/takedown returns 403 for non-admin callers.

    The takedown endpoint is admin-only — a non-admin identity must receive
    403 Forbidden, not 401 (auth header is valid, permissions are insufficient).
    """
    resp = await non_admin_client.post(
        "/api/admin/takedown",
        json={"object_ids": ["sha256:" + "a" * 64], "reason": "test"},
    )
    assert resp.status_code == 403, (
        f"expected 403 for non-admin takedown request, got {resp.status_code}: {resp.text}"
    )


@pytest.mark.asyncio
async def test_dmca_takedown_idempotent(
    client: AsyncClient,
    db_session: AsyncSession,
) -> None:
    """POST /api/admin/takedown is idempotent — re-blocking an already-blocked hash is safe.

    Calling takedown twice with the same object_ids must not raise an error
    or create duplicate rows.
    """
    oid = "sha256:" + hashlib.sha256(b"idempotent-takedown").hexdigest()
    payload = {"object_ids": [oid], "reason": "idempotent test"}

    resp1 = await client.post("/api/admin/takedown", json=payload)
    assert resp1.status_code == 200, resp1.text

    resp2 = await client.post("/api/admin/takedown", json=payload)
    assert resp2.status_code == 200, resp2.text

    db_session.expire_all()
    rows = (await db_session.execute(
        select(MusehubBlockedHash).where(MusehubBlockedHash.object_id == oid)
    )).scalars().all()
    assert len(rows) == 1, f"expected exactly 1 blocked_hash row after idempotent calls, got {len(rows)}"


# ── 3b: content.scan job infrastructure ──────────────────────────────────────

@pytest.mark.skip(reason="muse wire protocol in flux")
@pytest.mark.asyncio
async def test_content_scan_jobs_enqueued_after_indexing(
    client: AsyncClient,
    repo: JSONObject,
    tmp_path: pathlib.Path,
    db_session: AsyncSession,
) -> None:
    """After mpack.index, content.scan jobs are enqueued for indexed objects.

    Scan jobs exist so CSAM APIs can be integrated as a drop-in when legal
    review completes. The job type is 'content.scan' and the payload carries
    an object_id. This test requires at least one such job for the repo and
    that every job's object_id belongs to the pushed mpack; it does not
    require a job for every object.
    """
    _, head, mpack = _make_repo(tmp_path / "repo")
    job_id = await _push_mpack(client, repo["slug"], mpack, head, db_session, repo["repoId"])

    from musehub.services.musehub_wire import process_mpack_index_job

    await process_mpack_index_job(db_session, job_id)
    await db_session.commit()

    raw_objects = mpack.get("objects") or []
    all_oids = {obj["object_id"] for obj in raw_objects}

    scan_jobs = (await db_session.execute(
        select(MusehubBackgroundJob)
        .where(MusehubBackgroundJob.job_type == "content.scan")
        .where(MusehubBackgroundJob.repo_id == repo["repoId"])
    )).scalars().all()

    assert scan_jobs, (
        "expected content.scan jobs to be enqueued after mpack.index, got none — "
        "3b job infrastructure missing"
    )
    # A payload without object_id contributes None, which fails the subset check.
    scan_oids = {j.payload.get("object_id") for j in scan_jobs}
    assert scan_oids <= all_oids, (
        f"content.scan jobs reference object_ids not in mpack: {scan_oids - all_oids}"
    )