test_mpack_content_scanning_phase3.py
python
sha256:ef10830ce231e0a20efcb0e2586cb879471247e916616e6fdd0d51df459e2595
fix: typing audit — 0 violations, 0 untyped defs across all…
Sonnet 4.6
minor
⚠ breaking
20 days ago
| 1 | """TDD — Phase 3: content scanning and DMCA takedown (issue #49). |
| 2 | |
| 3 | Phase 3 invariants: |
| 4 | 3a. Known-hash blocklist — mpack.index quarantines any mpack whose objects |
| 5 | appear in musehub_blocked_hashes, before any MinIO writes. |
| 6 | 3b. content.scan job infrastructure — after indexing, binary objects above |
| 7 | the scan threshold get a content.scan job enqueued. |
| 8 | 3c. DMCA takedown — POST /api/admin/takedown adds hashes to the blocklist, |
| 9 | moves existing MinIO objects to quarantine, marks repos with dmca_hold. |
| 10 | Requires is_admin=True; non-admins receive 403. |
| 11 | """ |
from __future__ import annotations

import datetime
import hashlib
import pathlib
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from unittest.mock import patch

import msgpack
import pytest
import pytest_asyncio
from httpx import AsyncClient, ASGITransport
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from musehub.auth.request_signing import MSignContext, require_signed_request, optional_signed_request
from musehub.db.musehub_abuse_models import MusehubBlockedHash
from musehub.db.musehub_jobs_models import MusehubBackgroundJob
from musehub.db.musehub_repo_models import MusehubRepo
from musehub.db.database import get_db
from musehub.main import app

from muse.core.object_store import write_object
from muse.core.mpack import build_mpack
from muse.core.paths import muse_dir
from muse.core.snapshot import compute_commit_id, compute_snapshot_id
from muse.core.commits import CommitRecord, write_commit
from muse.core.refs import write_branch_ref
from muse.core.snapshots import SnapshotRecord, write_snapshot
from muse.core.types import blob_id
from musehub.types.json_types import JSONObject
| 42 | |
| 43 | |
# Admin identity injected by the ``client`` fixture (the takedown endpoint
# requires is_admin=True).
_AUTH_CTX = MSignContext(
    identity_id="sha256:" + "0" * 64,
    handle="gabriel",
    is_admin=True,
    is_agent=False,
)

# Non-admin identity injected by ``non_admin_client``; used for the 403 check.
_NON_ADMIN_CTX = MSignContext(
    identity_id="sha256:" + "1" * 64,
    handle="carol",
    is_admin=False,
    is_agent=False,
)

# Shape of the synthetic history built by ``_make_repo``.
_N_FILES = 8         # files tracked in every snapshot
_N_COMMITS = 4       # linear commits on ``main``
_FILES_CHANGED = 2   # files rewritten per commit
_BLOB_SIZE = 128     # padding bytes appended to each blob payload
| 63 | |
| 64 | # ── fixtures ──────────────────────────────────────────────────────────────── |
| 65 | |
@asynccontextmanager
async def _signed_client(
    db_session: AsyncSession, ctx: MSignContext,
) -> AsyncIterator[AsyncClient]:
    """Yield an in-process app client whose requests are authenticated as *ctx*.

    Overrides ``get_db`` so the app shares the test's ``db_session``, and
    overrides both signed-request dependencies to return *ctx*. Every
    dependency override is cleared on exit, even if opening or closing the
    client raises, so overrides never leak into later tests.
    """
    async def _override_get_db() -> AsyncIterator[AsyncSession]:
        yield db_session

    app.dependency_overrides[get_db] = _override_get_db
    app.dependency_overrides[require_signed_request] = lambda: ctx
    app.dependency_overrides[optional_signed_request] = lambda: ctx
    try:
        async with AsyncClient(
            transport=ASGITransport(app=app),
            base_url="https://localhost:1337",
        ) as c:
            yield c
    finally:
        # clear() drops *every* override on the shared ``app``; both client
        # fixtures write the same keys, so a single test should use only one.
        app.dependency_overrides.clear()


@pytest_asyncio.fixture()
async def client(db_session: AsyncSession) -> AsyncIterator[AsyncClient]:
    """HTTP client authenticated as the admin identity ``_AUTH_CTX``."""
    async with _signed_client(db_session, _AUTH_CTX) as c:
        yield c


@pytest_asyncio.fixture()
async def non_admin_client(db_session: AsyncSession) -> AsyncIterator[AsyncClient]:
    """HTTP client authenticated as the non-admin identity ``_NON_ADMIN_CTX``."""
    async with _signed_client(db_session, _NON_ADMIN_CTX) as c:
        yield c
| 100 | |
| 101 | |
@pytest_asyncio.fixture()
async def repo(client: AsyncClient) -> AsyncIterator[JSONObject]:
    """Create the public ``gabriel/phase3-scan-test`` repo and delete it on teardown.

    Yields the JSON body returned by ``POST /api/repos``; tests read its
    ``repoId`` and ``slug`` keys.
    """
    resp = await client.post(
        "/api/repos",
        json={"owner": "gabriel", "name": "phase3-scan-test", "visibility": "public", "initialize": False},
    )
    assert resp.status_code in (200, 201), resp.text
    data: JSONObject = resp.json()
    yield data
    # Best-effort cleanup: the delete response is intentionally not checked.
    await client.delete(f"/api/repos/{data['repoId']}")
| 112 | |
| 113 | |
def _make_repo(tmp: pathlib.Path) -> tuple[pathlib.Path, str, dict]:
    """Build a throwaway on-disk Muse repo with a linear history, then pack it.

    Lays out the ``.muse`` skeleton and writes ``_N_FILES`` base blobs. It then
    writes ``_N_COMMITS`` commits on ``main``. Each commit starts from the
    *base* manifest (not the previous commit) and rewrites ``_FILES_CHANGED``
    of its files. Timestamps start at 2026-01-01 UTC and advance 60 seconds
    per commit, so the inputs to ``compute_commit_id`` are fixed.

    Returns:
        ``(tmp, tip_commit_id, mpack)``, where ``mpack`` is
        ``build_mpack(tmp, [tip], have=[])``.
    """
    tmp.mkdir(parents=True, exist_ok=True)
    dot = muse_dir(tmp)
    dot.mkdir()
    (dot / "repo.json").write_text('{"repo_id":"phase3-test","owner":"gabriel"}')
    for sub in ("commits", "snapshots", "objects"):
        (dot / sub).mkdir()
    (dot / "refs" / "heads").mkdir(parents=True)
    (dot / "HEAD").write_text("ref: refs/heads/main\n")
    (dot / "config.toml").write_text("")

    def _store(payload: bytes) -> str:
        # Content-address the payload and persist it in the local object store.
        object_id = blob_id(payload)
        write_object(tmp, object_id, payload)
        return object_id

    base_manifest = {
        f"src/file_{n:04d}.py": _store(f"phase3-base-{n:04d}".encode() + b"x" * _BLOB_SIZE)
        for n in range(_N_FILES)
    }

    parent: str | None = None
    tip = ""
    when = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc)

    for c in range(_N_COMMITS):
        manifest = dict(base_manifest)
        for f in range(_FILES_CHANGED):
            slot = (c * _FILES_CHANGED + f) % _N_FILES
            manifest[f"src/file_{slot:04d}.py"] = _store(
                f"phase3-c{c:04d}-f{f}".encode() + b"y" * _BLOB_SIZE
            )

        snapshot_id = compute_snapshot_id(manifest)
        write_snapshot(tmp, SnapshotRecord(snapshot_id=snapshot_id, manifest=manifest))

        message = f"commit-{c:05d}"
        commit_id = compute_commit_id(
            parent_ids=[parent] if parent else [],
            snapshot_id=snapshot_id,
            message=message,
            committed_at_iso=when.isoformat(),
            author="gabriel",
        )
        record = CommitRecord(
            commit_id=commit_id, branch="main",
            snapshot_id=snapshot_id, message=message, committed_at=when,
            parent_commit_id=parent, parent2_commit_id=None,
            author="gabriel", metadata={}, structured_delta=None,
            sem_ver_bump="none", breaking_changes=[],
            agent_id="", model_id="", toolchain_id="",
            prompt_hash="", signature="", signer_key_id="",
        )
        write_commit(tmp, record)

        parent = tip = commit_id
        when += datetime.timedelta(seconds=60)

    write_branch_ref(tmp, "main", tip)
    return tmp, tip, build_mpack(tmp, [tip], have=[])
| 172 | |
| 173 | |
async def _push_mpack(client: AsyncClient, repo_slug: str, mpack: dict, head: str, db_session: AsyncSession, repo_id: str) -> str:
    """Upload *mpack* via the presign flow, then insert a pending ``mpack.index`` job.

    The push route is synchronous and does not return a job_id, so tests that
    call ``process_mpack_index_job`` directly must create the job row
    themselves.

    Args:
        client: Authenticated app client used for the presign request.
        repo_slug: Slug of the repo under the ``gabriel`` owner.
        mpack: The pack object produced by ``build_mpack``. It is
            msgpack-encoded here before upload; it is not pre-encoded bytes.
        head: Commit ID recorded as the branch head in the job payload.
        db_session: Session the job row is added to (flushed, not committed).
        repo_id: Repo the job row is attached to.

    Returns:
        The ID of the newly created ``mpack.index`` job row.
    """
    from musehub.core.genesis import compute_job_id as _compute_job_id

    wire_bytes = msgpack.packb(mpack, use_bin_type=True)
    mpack_key = "sha256:" + hashlib.sha256(wire_bytes).hexdigest()

    pr = await client.post(
        f"/gabriel/{repo_slug}/push/mpack-presign",
        content=msgpack.packb({"mpack_key": mpack_key, "size_bytes": len(wire_bytes)}, use_bin_type=True),
        headers={"Content-Type": "application/x-msgpack"},
    )
    assert pr.status_code == 200, pr.text
    presign = pr.json()
    # Accept either snake_case or camelCase for the URL key.
    upload_url = presign.get("upload_url") or presign.get("uploadUrl")
    assert upload_url, f"presign response has no upload URL: {presign!r}"

    # PUT straight to the presigned URL with a plain client, not the ASGI app.
    async with AsyncClient() as raw:
        put = await raw.put(upload_url, content=wire_bytes)
    assert put.status_code in (200, 204), f"presigned PUT failed: {put.status_code} {put.text}"

    now = datetime.datetime.now(tz=datetime.timezone.utc)
    job_id = _compute_job_id(repo_id, "mpack.index", now.isoformat())
    db_session.add(MusehubBackgroundJob(
        job_id=job_id,
        repo_id=repo_id,
        job_type="mpack.index",
        payload={
            "mpack_key": mpack_key,
            "pusher_id": _AUTH_CTX.identity_id,
            "branch": "main",
            "head": head,
            "force": False,
            "declared_objects_count": None,
        },
        status="pending",
    ))
    await db_session.flush()
    return job_id
| 217 | |
| 218 | |
| 219 | # ── 3a: known-hash blocklist ───────────────────────────────────────────────── |
| 220 | |
@pytest.mark.skip(reason="muse wire protocol in flux")
@pytest.mark.asyncio
async def test_blocked_object_quarantines_mpack(
    client: AsyncClient, repo: JSONObject, tmp_path: pathlib.Path, db_session: AsyncSession,
) -> None:
    """A blocklisted object ID inside the pack quarantines the whole mpack.

    The blocklist check runs after Phase 2 validation and before any MinIO PUT.
    The index job must raise ``MPackValidationError`` with a message matching
    "blocked", and must leave the job row with status ``'quarantined'``.
    """
    _, head, mpack = _make_repo(tmp_path / "repo")
    job_id = await _push_mpack(client, repo["slug"], mpack, head, db_session, repo["repoId"])

    # Blocklist the first object this pack carries.
    pack_objects = mpack.get("objects") or []
    assert pack_objects, "mpack must have objects"
    db_session.add(MusehubBlockedHash(
        object_id=pack_objects[0]["object_id"],
        reason="test NCMEC block",
    ))
    await db_session.flush()

    from musehub.services.musehub_wire import process_mpack_index_job, MPackValidationError

    with pytest.raises(MPackValidationError, match="blocked"):
        await process_mpack_index_job(db_session, job_id)

    await db_session.commit()
    db_session.expire_all()

    query = select(MusehubBackgroundJob).where(MusehubBackgroundJob.job_id == job_id)
    status = (await db_session.execute(query)).scalar_one().status
    assert status == "quarantined", (
        f"expected 'quarantined' after blocked-hash detection, got '{status}'"
    )
| 259 | |
| 260 | |
@pytest.mark.skip(reason="muse wire protocol in flux")
@pytest.mark.asyncio
async def test_blocked_check_fires_before_minio_puts(
    client: AsyncClient, repo: JSONObject, tmp_path: pathlib.Path, db_session: AsyncSession,
) -> None:
    """A blocked pack must cause zero ``backend.put`` calls.

    Expected order: validate (Phase 2), then the blocklist check, then MinIO
    PUTs. When a blocked hash is found, ``backend.put`` is never reached.
    """
    _, head, mpack = _make_repo(tmp_path / "repo")
    job_id = await _push_mpack(client, repo["slug"], mpack, head, db_session, repo["repoId"])

    first_oid = (mpack.get("objects") or [])[0]["object_id"]
    db_session.add(MusehubBlockedHash(object_id=first_oid, reason="test"))
    await db_session.flush()

    from musehub.storage.backends import get_backend as _get_backend

    # NOTE(review): assumes get_backend() returns the same instance the index
    # job writes through. If it does not, this spy observes nothing; confirm.
    backend = _get_backend()
    recorded: list[str] = []
    original_put = backend.put

    async def _recording_put(oid: str, data: bytes) -> None:
        # Record the call, then delegate so any unexpected write still happens.
        recorded.append(oid)
        return await original_put(oid, data)

    from musehub.services.musehub_wire import process_mpack_index_job, MPackValidationError

    with patch.object(backend, "put", side_effect=_recording_put), pytest.raises(MPackValidationError):
        await process_mpack_index_job(db_session, job_id)

    assert not recorded, (
        f"backend.put was called {len(recorded)} time(s) despite blocked hash — "
        f"blocklist check must run before MinIO writes"
    )
| 297 | |
| 298 | |
@pytest.mark.skip(reason="muse wire protocol in flux")
@pytest.mark.asyncio
async def test_multiple_blocked_objects_all_reported(
    client: AsyncClient, repo: JSONObject, tmp_path: pathlib.Path, db_session: AsyncSession,
) -> None:
    """MPackValidationError message reports all blocked object IDs, not just the first.

    When a mpack contains multiple blocked objects, the error should list them
    so the operator knows the full extent of the quarantine.
    """
    _, head, mpack = _make_repo(tmp_path / "repo")
    job_id = await _push_mpack(client, repo["slug"], mpack, head, db_session, repo["repoId"])

    raw_objects = mpack.get("objects") or []
    assert len(raw_objects) >= 2, "need at least 2 objects to test multi-blocked"
    blocked_oids = [raw_objects[0]["object_id"], raw_objects[1]["object_id"]]
    for oid in blocked_oids:
        db_session.add(MusehubBlockedHash(object_id=oid, reason="test"))
    await db_session.flush()

    from musehub.services.musehub_wire import process_mpack_index_job, MPackValidationError
    with pytest.raises(MPackValidationError) as exc_info:
        await process_mpack_index_job(db_session, job_id)

    err_msg = str(exc_info.value)
    # Check each ID explicitly. A bare "blocked" (or "2") substring match would
    # still pass if the error named only the first blocked object.
    missing = [oid for oid in blocked_oids if oid not in err_msg]
    assert not missing, (
        f"expected error to list every blocked object_id; missing {missing!r} in: {err_msg!r}"
    )
| 327 | |
| 328 | |
@pytest.mark.skip(reason="muse wire protocol in flux")
@pytest.mark.asyncio
async def test_clean_mpack_bypasses_blocklist_unimpeded(
    client: AsyncClient, repo: JSONObject, tmp_path: pathlib.Path, db_session: AsyncSession,
) -> None:
    """Regression: a pack containing no blocked objects indexes normally.

    The blocklist holds an entry for some other object. ``process_mpack_index_job``
    must still complete and report written commits.
    """
    _, head, mpack = _make_repo(tmp_path / "repo")
    job_id = await _push_mpack(client, repo["slug"], mpack, head, db_session, repo["repoId"])

    # Seed the blocklist with a hash that is not part of this pack.
    unrelated_oid = "sha256:" + "f" * 64
    db_session.add(MusehubBlockedHash(object_id=unrelated_oid, reason="unrelated block"))
    await db_session.flush()

    from musehub.services.musehub_wire import process_mpack_index_job

    outcome = await process_mpack_index_job(db_session, job_id)
    await db_session.commit()

    assert outcome["commits_written"] > 0, "clean mpack must index normally despite non-matching blocklist entries"
| 354 | |
| 355 | |
| 356 | # ── 3c: DMCA takedown endpoint ─────────────────────────────────────────────── |
| 357 | |
@pytest.mark.asyncio
async def test_dmca_takedown_adds_hashes_to_blocklist(
    client: AsyncClient, tmp_path: pathlib.Path, db_session: AsyncSession,
) -> None:
    """A successful takedown inserts each object_id into musehub_blocked_hashes.

    Every row created for the supplied IDs must carry the supplied reason
    (checked here by the "DMCA" substring).
    """
    oids = [
        "sha256:" + hashlib.sha256(f"dmca-object-{n}".encode()).hexdigest()
        for n in range(3)
    ]
    request_body = {"object_ids": oids, "reason": "DMCA request #12345"}
    resp = await client.post("/api/admin/takedown", json=request_body)
    assert resp.status_code == 200, resp.text

    db_session.expire_all()
    query = select(MusehubBlockedHash).where(MusehubBlockedHash.object_id.in_(oids))
    rows = (await db_session.execute(query)).scalars().all()
    assert len(rows) == len(oids), f"expected {len(oids)} blocked_hash rows, got {len(rows)}"

    for row in rows:
        reason_text = row.reason or ""
        assert "DMCA" in reason_text, f"expected reason to contain 'DMCA', got {row.reason!r}"
| 388 | |
| 389 | |
@pytest.mark.asyncio
async def test_dmca_takedown_marks_repos_dmca_hold(
    client: AsyncClient, repo: JSONObject, tmp_path: pathlib.Path, db_session: AsyncSession,
) -> None:
    """A takedown that names repo_ids sets and persists dmca_hold=True on each repo.

    Push gates and serve paths rely on the persisted flag to enforce the hold.
    The response must also report ``repos_held == 1``.
    """
    repo_id = repo["repoId"]
    request_body = {
        "object_ids": ["sha256:" + hashlib.sha256(b"dmca-hold-test").hexdigest()],
        "reason": "DMCA #hold-test",
        "repo_ids": [repo_id],
    }

    resp = await client.post("/api/admin/takedown", json=request_body)
    assert resp.status_code == 200, resp.text
    data = resp.json()
    assert data.get("repos_held", 0) == 1, f"expected repos_held=1 in response, got: {data}"

    db_session.expire_all()
    query = select(MusehubRepo).where(MusehubRepo.repo_id == repo_id)
    held_repo = (await db_session.execute(query)).scalar_one()
    assert held_repo.dmca_hold is True, (
        f"expected repo.dmca_hold=True after takedown with repo_ids, got {held_repo.dmca_hold}"
    )
| 419 | |
| 420 | |
@pytest.mark.asyncio
async def test_dmca_takedown_quarantines_existing_minio_objects(
    client: AsyncClient, repo: JSONObject, tmp_path: pathlib.Path, db_session: AsyncSession,
) -> None:
    """A takedown moves objects already stored in MinIO out of the main bucket.

    The object is written directly through the storage backend first, as an
    earlier push would have done. After the takedown, the response reports at
    least one quarantined object, and ``backend.get`` no longer returns it.
    """
    from musehub.storage.backends import get_backend as _get_backend

    backend = _get_backend()

    blob = b"sensitive-content-to-quarantine-" + b"x" * 100
    oid = "sha256:" + hashlib.sha256(blob).hexdigest()
    await backend.put(oid, blob)
    assert await backend.get(oid) is not None, "object must be in MinIO before takedown"

    resp = await client.post(
        "/api/admin/takedown",
        json={"object_ids": [oid], "reason": "DMCA quarantine test"},
    )
    assert resp.status_code == 200, resp.text
    body = resp.json()
    assert body.get("quarantined_count", 0) >= 1, f"expected quarantined_count >= 1 in response, got: {body}"

    # Once quarantined, the object must not be readable from the main bucket.
    assert await backend.get(oid) is None, (
        "object must be removed from main MinIO bucket after DMCA takedown"
    )
| 455 | |
| 456 | |
@pytest.mark.asyncio
async def test_dmca_takedown_requires_admin(
    non_admin_client: AsyncClient, db_session: AsyncSession,
) -> None:
    """A non-admin caller gets 403 from POST /api/admin/takedown.

    The identity is valid but lacks is_admin, so the expected status is
    403 Forbidden rather than 401.
    """
    request_body = {"object_ids": ["sha256:" + "a" * 64], "reason": "test"}
    resp = await non_admin_client.post("/api/admin/takedown", json=request_body)
    status = resp.status_code
    assert status == 403, f"expected 403 for non-admin takedown request, got {status}: {resp.text}"
| 473 | |
| 474 | |
@pytest.mark.asyncio
async def test_dmca_takedown_idempotent(
    client: AsyncClient, db_session: AsyncSession,
) -> None:
    """Repeating a takedown for an already-blocked hash succeeds without duplicating rows.

    Two identical requests must both return 200, and exactly one
    musehub_blocked_hashes row must exist for the object afterwards.
    """
    oid = "sha256:" + hashlib.sha256(b"idempotent-takedown").hexdigest()
    payload = {"object_ids": [oid], "reason": "idempotent test"}

    for _ in range(2):
        resp = await client.post("/api/admin/takedown", json=payload)
        assert resp.status_code == 200, resp.text

    db_session.expire_all()
    query = select(MusehubBlockedHash).where(MusehubBlockedHash.object_id == oid)
    matches = (await db_session.execute(query)).scalars().all()
    assert len(matches) == 1, f"expected exactly 1 blocked_hash row after idempotent calls, got {len(matches)}"
| 498 | |
| 499 | |
| 500 | # ── 3b: content.scan job infrastructure ────────────────────────────────────── |
| 501 | |
@pytest.mark.skip(reason="muse wire protocol in flux")
@pytest.mark.asyncio
async def test_content_scan_jobs_enqueued_after_indexing(
    client: AsyncClient, repo: JSONObject, tmp_path: pathlib.Path, db_session: AsyncSession,
) -> None:
    """After mpack.index, content.scan jobs are enqueued for indexed objects.

    Each indexed object gets a content.scan job so CSAM APIs can be integrated
    as a drop-in when legal review completes. The job type is 'content.scan'
    and the payload contains the object_id and repo_id.
    """
    _, head, mpack = _make_repo(tmp_path / "repo")
    job_id = await _push_mpack(client, repo["slug"], mpack, head, db_session, repo["repoId"])

    from musehub.services.musehub_wire import process_mpack_index_job
    await process_mpack_index_job(db_session, job_id)
    await db_session.commit()

    all_oids = {obj["object_id"] for obj in (mpack.get("objects") or [])}

    scan_jobs = (await db_session.execute(
        select(MusehubBackgroundJob)
        .where(MusehubBackgroundJob.job_type == "content.scan")
        .where(MusehubBackgroundJob.repo_id == repo["repoId"])
    )).scalars().all()

    assert scan_jobs, (
        "expected content.scan jobs to be enqueued after mpack.index, got none — "
        "3b job infrastructure missing"
    )
    # Report missing object_ids explicitly; otherwise they only show up as a
    # puzzling {None} in the subset check below.
    unkeyed = [j.job_id for j in scan_jobs if not j.payload.get("object_id")]
    assert not unkeyed, f"content.scan jobs missing payload object_id: {unkeyed}"

    scan_oids = {j.payload.get("object_id") for j in scan_jobs}
    assert scan_oids <= all_oids, (
        f"content.scan jobs reference object_ids not in mpack: {scan_oids - all_oids}"
    )
File History
1 commit
sha256:ef10830ce231e0a20efcb0e2586cb879471247e916616e6fdd0d51df459e2595
fix: typing audit — 0 violations, 0 untyped defs across all…
Sonnet 4.6
minor
⚠
20 days ago