"""TDD: genesis-addressed IDs for every first-class semantic entity. CONTRACT (issue #10): Every first-class semantic entity in the Muse ecosystem has an identity derived from the minimal immutable facts about the moment it was declared to exist — its genesis context. The formula is universal: entity_id = ":" + hash(NUL.join(genesis_fields)).hexdigest() The algorithm prefix is intentionally not hardcoded to "sha256". Any canonical : form is valid. This future-proofs the system for hash algorithm upgrades without breaking the validator contract. Tier 1 — canonical form All compute_* functions return strings matching ^[a-z][a-z0-9]*:[0-9a-f]{32,}$. Tier 2 — determinism Same inputs always produce the same ID. Different inputs always produce different IDs (collision resistance tested with minimal single-field diffs). Tier 3 — separator injection safety Field values containing NUL bytes, pipe chars, colons, or path separators do not break the hash or allow crafted collisions. Tier 3b — Pydantic boundary enforcement Wire-protocol and response models reject non-canonical IDs at the Pydantic validation boundary. Both request models (untrusted input) and response models (service layer bug detection) are covered. Optional ID fields accept None and valid canonical IDs, reject malformed strings. Tier 4 — cross-verification (CLI ↔ hub) Functions that exist in both muse.core.genesis and musehub.core.genesis produce identical output for the same inputs. Tier 5 — derivation chain Entity IDs that take other entity IDs as genesis fields (e.g. issue_id takes repo_id) form a verifiable chain: changing the parent ID changes all descendant IDs. 
""" from __future__ import annotations import re import sys from datetime import datetime, timezone from pathlib import Path import pytest from muse.core.types import fake_id from pydantic import ValidationError from musehub.models.musehub import ( CreateRepoRequest, IssueCommentCreate, ProposalCommentCreate, ProposalReviewResponse, ReleaseAssetResponse, UserForkedRepoEntry, WebhookResponse, WireTagInput, ) from musehub.api.routes.musehub.collaborators import CollaboratorResponse from musehub.api.routes.musehub.labels import LabelResponse from musehub.models.wire import ( WireCommit, WireFetchRequest, WireObject, WireSnapshot, ) # Hub-side genesis functions from musehub.core.genesis import ( compute_asset_id, compute_bridge_mirror_id, compute_collaborator_id, compute_comment_id, compute_domain_id, compute_domain_install_id, compute_fork_id, compute_identity_id, compute_issue_event_id, compute_issue_id, compute_job_id, compute_key_id, compute_label_id, compute_mist_id, compute_proposal_id, compute_release_id, compute_repo_id, compute_reservation_id, compute_review_id, compute_session_id, compute_tag_id, compute_task_id, compute_webhook_delivery_id, compute_webhook_id, mist_short_id, ) # CLI-side genesis functions (cross-verification) — skip gracefully if not yet present sys.path.insert(0, str(Path.home() / "ecosystem" / "muse")) try: from muse.core.genesis import ( compute_release_id as cli_compute_release_id, compute_tag_id as cli_compute_tag_id, ) _CLI_GENESIS_AVAILABLE = True except ModuleNotFoundError: _CLI_GENESIS_AVAILABLE = False cli_compute_release_id = None # type: ignore[assignment] cli_compute_tag_id = None # type: ignore[assignment] # Algo-agnostic canonical pattern: : # Do NOT tighten to "sha256" only — this pattern must survive hash algorithm upgrades. 
_CANONICAL_RE = re.compile(r"^[a-z][a-z0-9]*:[0-9a-f]{32,}$") # --------------------------------------------------------------------------- # Shared deterministic inputs # --------------------------------------------------------------------------- _PUBKEY = bytes(range(32)) # 32 deterministic bytes _IDENTITY_ID = compute_identity_id(_PUBKEY) _REPO_ID = compute_repo_id(_IDENTITY_ID, "my-repo", "code", "2026-01-01T00:00:00Z") _ISSUE_ID = compute_issue_id(_REPO_ID, _IDENTITY_ID, "2026-01-02T00:00:00Z") _PROPOSAL_ID = compute_proposal_id(_REPO_ID, _IDENTITY_ID, "feat/x", "main", "2026-01-03T00:00:00Z") _RELEASE_ID = compute_release_id(_REPO_ID, "v1.0.0", "2026-01-04T00:00:00Z") _COMMIT_ID = fake_id("commit-stub") # canonical stub; real commits use compute_commit_id _TAG_ID = compute_tag_id(_REPO_ID, _COMMIT_ID, "emotion:joyful", "2026-01-05T00:00:00Z") _SESSION_ID = compute_session_id(_REPO_ID, _IDENTITY_ID, "2026-01-06T00:00:00Z") _MIST_ID = compute_mist_id(b"hello muse") _COMMENT_ID = compute_comment_id(_ISSUE_ID, _IDENTITY_ID, "2026-01-07T00:00:00Z") _REVIEW_ID = compute_review_id(_PROPOSAL_ID, _IDENTITY_ID, "2026-01-08T00:00:00Z") # Phase 2 genesis IDs _LABEL_ID = compute_label_id(_REPO_ID, "bug", "2026-01-09T00:00:00Z") _ASSET_ID = compute_asset_id(_RELEASE_ID, "v1.0.0-linux-amd64.tar.gz", "2026-01-10T00:00:00Z") _WEBHOOK_ID = compute_webhook_id(_REPO_ID, "https://ci.example.com/hook", "2026-01-11T00:00:00Z") _FORK_REPO_ID = compute_repo_id(_IDENTITY_ID, "my-fork", "code", "2026-01-12T00:00:00Z") _FORK_ID = compute_fork_id(_REPO_ID, _FORK_REPO_ID, "2026-01-13T00:00:00Z") _COLLAB_IDENTITY_ID = compute_identity_id(bytes(range(1, 33))) _COLLABORATOR_ID = compute_collaborator_id(_REPO_ID, _COLLAB_IDENTITY_ID, "2026-01-14T00:00:00Z") _KEY_ID = compute_key_id(_IDENTITY_ID, "ed25519:AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA") _DOMAIN_ID = compute_domain_id("gabriel", "code", "2026-01-15T00:00:00Z") # Previously random-ID entities — now genesis-addressed 
_BRIDGE_MIRROR_ID = compute_bridge_mirror_id(_REPO_ID, "https://github.com/gabriel/my-repo.git") _DOMAIN_INSTALL_ID = compute_domain_install_id(_IDENTITY_ID, _DOMAIN_ID) _ISSUE_EVENT_ID = compute_issue_event_id(_ISSUE_ID, "opened", "gabriel", "2026-01-16T00:00:00Z") _JOB_ID = compute_job_id(_REPO_ID, "intel.code", "2026-01-17T00:00:00Z") _WEBHOOK_DELIVERY_ID = compute_webhook_delivery_id(_WEBHOOK_ID, "push", 1, "2026-01-18T00:00:00Z") # Coord entities — genesis-addressed (content-addressed, not random) _TASK_ID = compute_task_id(_REPO_ID, "default", "agent-1", "2026-01-19T00:00:00Z") _RESERVATION_ID = compute_reservation_id( _REPO_ID, "agent-1", ",".join(sorted(["src/engine.py::AudioEngine", "src/mixer.py::Mixer"])), "2026-01-20T00:00:00Z", ) # =========================================================================== # Tier 1 — canonical form # =========================================================================== class TestCanonicalForm: def test_identity_id_canonical(self) -> None: assert _CANONICAL_RE.match(_IDENTITY_ID) def test_repo_id_canonical(self) -> None: assert _CANONICAL_RE.match(_REPO_ID) def test_issue_id_canonical(self) -> None: assert _CANONICAL_RE.match(_ISSUE_ID) def test_proposal_id_canonical(self) -> None: assert _CANONICAL_RE.match(_PROPOSAL_ID) def test_release_id_canonical(self) -> None: assert _CANONICAL_RE.match(_RELEASE_ID) def test_tag_id_canonical(self) -> None: assert _CANONICAL_RE.match(_TAG_ID) def test_session_id_canonical(self) -> None: assert _CANONICAL_RE.match(_SESSION_ID) def test_mist_id_canonical(self) -> None: assert _CANONICAL_RE.match(_MIST_ID) def test_comment_id_canonical(self) -> None: assert _CANONICAL_RE.match(_COMMENT_ID) def test_review_id_canonical(self) -> None: assert _CANONICAL_RE.match(_REVIEW_ID) def test_mist_short_id_is_12_chars(self) -> None: short = mist_short_id(_MIST_ID) assert len(short) == 12 assert re.match(r"^[0-9a-f]{12}$", short) def test_mist_short_id_is_prefix_of_digest(self) -> None: 
digest = _MIST_ID.removeprefix("sha256:") assert mist_short_id(_MIST_ID) == digest[:12] def test_bridge_mirror_id_canonical(self) -> None: assert _CANONICAL_RE.match(_BRIDGE_MIRROR_ID) def test_domain_install_id_canonical(self) -> None: assert _CANONICAL_RE.match(_DOMAIN_INSTALL_ID) def test_issue_event_id_canonical(self) -> None: assert _CANONICAL_RE.match(_ISSUE_EVENT_ID) def test_job_id_canonical(self) -> None: assert _CANONICAL_RE.match(_JOB_ID) def test_webhook_delivery_id_canonical(self) -> None: assert _CANONICAL_RE.match(_WEBHOOK_DELIVERY_ID) def test_task_id_canonical(self) -> None: assert _CANONICAL_RE.match(_TASK_ID) def test_reservation_id_canonical(self) -> None: assert _CANONICAL_RE.match(_RESERVATION_ID) # =========================================================================== # Tier 2 — determinism and collision resistance # =========================================================================== class TestDeterminism: def test_identity_id_deterministic(self) -> None: assert compute_identity_id(_PUBKEY) == compute_identity_id(_PUBKEY) def test_repo_id_deterministic(self) -> None: args = (_IDENTITY_ID, "repo", "code", "2026-01-01T00:00:00Z") assert compute_repo_id(*args) == compute_repo_id(*args) def test_issue_id_deterministic(self) -> None: args = (_REPO_ID, _IDENTITY_ID, "2026-01-02T00:00:00Z") assert compute_issue_id(*args) == compute_issue_id(*args) def test_different_pubkeys_yield_different_identity_ids(self) -> None: pk_a = bytes(range(32)) pk_b = bytes(range(1, 33)) assert compute_identity_id(pk_a) != compute_identity_id(pk_b) def test_different_slugs_yield_different_repo_ids(self) -> None: a = compute_repo_id(_IDENTITY_ID, "repo-a", "code", "2026-01-01T00:00:00Z") b = compute_repo_id(_IDENTITY_ID, "repo-b", "code", "2026-01-01T00:00:00Z") assert a != b def test_different_domains_yield_different_repo_ids(self) -> None: a = compute_repo_id(_IDENTITY_ID, "repo", "code", "2026-01-01T00:00:00Z") b = compute_repo_id(_IDENTITY_ID, "repo", 
"music", "2026-01-01T00:00:00Z") assert a != b def test_different_timestamps_yield_different_issue_ids(self) -> None: a = compute_issue_id(_REPO_ID, _IDENTITY_ID, "2026-01-01T00:00:00Z") b = compute_issue_id(_REPO_ID, _IDENTITY_ID, "2026-01-02T00:00:00Z") assert a != b def test_different_branches_yield_different_proposal_ids(self) -> None: a = compute_proposal_id(_REPO_ID, _IDENTITY_ID, "feat/a", "main", "2026-01-01T00:00:00Z") b = compute_proposal_id(_REPO_ID, _IDENTITY_ID, "feat/b", "main", "2026-01-01T00:00:00Z") assert a != b def test_different_tags_yield_different_release_ids(self) -> None: a = compute_release_id(_REPO_ID, "v1.0.0", "2026-01-01T00:00:00Z") b = compute_release_id(_REPO_ID, "v2.0.0", "2026-01-01T00:00:00Z") assert a != b def test_different_labels_yield_different_tag_ids(self) -> None: a = compute_tag_id(_REPO_ID, _COMMIT_ID, "emotion:joyful", "2026-01-01T00:00:00Z") b = compute_tag_id(_REPO_ID, _COMMIT_ID, "emotion:melancholic", "2026-01-01T00:00:00Z") assert a != b def test_different_content_yields_different_mist_ids(self) -> None: assert compute_mist_id(b"hello") != compute_mist_id(b"world") def test_all_entity_ids_are_distinct(self) -> None: """All entity IDs computed from their respective genesis contexts are unique.""" ids = [ _IDENTITY_ID, _REPO_ID, _ISSUE_ID, _PROPOSAL_ID, _RELEASE_ID, _TAG_ID, _SESSION_ID, _MIST_ID, _COMMENT_ID, _REVIEW_ID, ] assert len(ids) == len(set(ids)), "two entity IDs collided" # =========================================================================== # Tier 3 — separator injection safety # =========================================================================== class TestSeparatorInjection: def test_nul_byte_in_slug_does_not_collide(self) -> None: """A slug containing NUL + domain cannot be crafted to match a different (slug, domain) pair.""" # If the separator were not NUL, "a|b" + "|" + "c" could equal "a" + "|" + "b|c". 
# With NUL separator, NUL inside a field value is structurally impossible in normal usage, # but we verify the function still returns a valid ID for unusual inputs. exotic = compute_repo_id(_IDENTITY_ID, "repo\x00extra", "code", "2026-01-01T00:00:00Z") normal = compute_repo_id(_IDENTITY_ID, "repo", "code\x00extra", "2026-01-01T00:00:00Z") assert _CANONICAL_RE.match(exotic) assert exotic != normal, "NUL in field value must not produce collisions across fields" def test_pipe_in_label_is_safe(self) -> None: a = compute_tag_id(_REPO_ID, _COMMIT_ID, "section|verse", "2026-01-01T00:00:00Z") assert _CANONICAL_RE.match(a) def test_colon_in_label_is_safe(self) -> None: a = compute_tag_id(_REPO_ID, _COMMIT_ID, "emotion:joyful:extra", "2026-01-01T00:00:00Z") assert _CANONICAL_RE.match(a) def test_path_separator_in_branch_is_safe(self) -> None: a = compute_proposal_id(_REPO_ID, _IDENTITY_ID, "feat/nested/branch", "main", "2026-01-01T00:00:00Z") assert _CANONICAL_RE.match(a) def test_sha256_prefix_in_field_is_safe(self) -> None: """Entity IDs used as genesis fields (which start with sha256:) are handled correctly.""" # repo_id and identity_id both start with "sha256:" — verify no double-prefix or truncation. 
repo = compute_repo_id(_IDENTITY_ID, "test", "code", "2026-01-01T00:00:00Z") issue = compute_issue_id(repo, _IDENTITY_ID, "2026-01-01T00:00:00Z") assert _CANONICAL_RE.match(issue) # =========================================================================== # Tier 3b — Pydantic boundary enforcement # =========================================================================== _VALID_ID = fake_id("valid-id-stub") _FUTURE_ID = "blake3:" + "b" * 64 # algo-agnostic: must also be accepted _BAD_IDS = [ "not-a-sha", "a1b2c3d4-e5f6-7890-abcd-ef1234567890", # plain string, not sha256: f"SHA256:{'a' * 64}", # uppercase algo f"sha256:{'A' * 64}", # uppercase hex "", "sha256:tooshort", ] _DT = datetime(2026, 1, 1, tzinfo=timezone.utc) class TestPydanticBoundaryWireModels: """Wire models reject non-canonical IDs; accept valid canonical forms.""" def test_wire_commit_rejects_bad_commit_id(self) -> None: for bad in _BAD_IDS: with pytest.raises(ValidationError): WireCommit(commit_id=bad) def test_wire_commit_accepts_future_algo(self) -> None: c = WireCommit(commit_id=_FUTURE_ID) assert c.commit_id == _FUTURE_ID def test_wire_snapshot_rejects_bad_id(self) -> None: with pytest.raises(ValidationError): WireSnapshot(snapshot_id="bad-id") def test_wire_snapshot_accepts_future_algo(self) -> None: s = WireSnapshot(snapshot_id=_FUTURE_ID) assert s.snapshot_id == _FUTURE_ID def test_wire_object_rejects_bad_object_id(self) -> None: with pytest.raises(ValidationError): WireObject(object_id="bad", content=b"x") def test_wire_object_accepts_future_algo(self) -> None: o = WireObject(object_id=_FUTURE_ID, content=b"x") assert o.object_id == _FUTURE_ID def test_fetch_request_rejects_bad_want(self) -> None: with pytest.raises(ValidationError): WireFetchRequest(want=["not-valid"], have=[]) def test_fetch_request_rejects_bad_have(self) -> None: with pytest.raises(ValidationError): WireFetchRequest(want=[], have=["not-a-content-id"]) def test_fetch_request_accepts_valid_and_future_ids(self) -> None: r 
= WireFetchRequest(want=[_VALID_ID, _FUTURE_ID], have=[_VALID_ID]) assert len(r.want) == 2 class TestPydanticBoundaryResponseModels: """Response models reject non-canonical IDs (catches service layer bugs).""" def test_proposal_review_rejects_bad_id(self) -> None: with pytest.raises(ValidationError): ProposalReviewResponse( id="bad-id", proposal_id=_VALID_ID, reviewer_username="gabriel", state="approved", created_at=_DT, ) def test_proposal_review_rejects_bad_proposal_id(self) -> None: with pytest.raises(ValidationError): ProposalReviewResponse( id=_VALID_ID, proposal_id="not-canonical", reviewer_username="gabriel", state="approved", created_at=_DT, ) def test_proposal_review_accepts_future_algo(self) -> None: r = ProposalReviewResponse( id=_FUTURE_ID, proposal_id=_FUTURE_ID, reviewer_username="gabriel", state="approved", created_at=_DT, ) assert r.id == _FUTURE_ID def test_release_asset_rejects_bad_asset_id(self) -> None: with pytest.raises(ValidationError): ReleaseAssetResponse( asset_id="bad", release_id=_VALID_ID, name="f.tar.gz", download_url="https://x.com/f", created_at=_DT, ) def test_release_asset_rejects_bad_release_id(self) -> None: with pytest.raises(ValidationError): ReleaseAssetResponse( asset_id=_VALID_ID, release_id="not-an-id", name="f.tar.gz", download_url="https://x.com/f", created_at=_DT, ) def test_release_asset_accepts_future_algo(self) -> None: a = ReleaseAssetResponse( asset_id=_FUTURE_ID, release_id=_FUTURE_ID, name="f.tar.gz", download_url="https://x.com/f", created_at=_DT, ) assert a.release_id == _FUTURE_ID def test_wire_tag_rejects_bad_tag_id(self) -> None: with pytest.raises(ValidationError): WireTagInput(tag_id="bad", commit_id=_VALID_ID, tag="section:verse") def test_wire_tag_rejects_bad_commit_id(self) -> None: with pytest.raises(ValidationError): WireTagInput(tag_id=_VALID_ID, commit_id="bad", tag="section:verse") def test_wire_tag_accepts_future_algo(self) -> None: t = WireTagInput(tag_id=_FUTURE_ID, commit_id=_FUTURE_ID, 
tag="section:verse") assert t.tag_id == _FUTURE_ID class TestPydanticBoundaryOptionalFields: """Optional genesis ID fields accept None, valid IDs, reject bad strings.""" def test_create_repo_template_none(self) -> None: r = CreateRepoRequest(name="muse", owner="gabriel") assert r.template_repo_id is None def test_create_repo_template_valid(self) -> None: r = CreateRepoRequest(name="muse", owner="gabriel", template_repo_id=_VALID_ID) assert r.template_repo_id == _VALID_ID def test_create_repo_template_future_algo(self) -> None: r = CreateRepoRequest(name="muse", owner="gabriel", template_repo_id=_FUTURE_ID) assert r.template_repo_id == _FUTURE_ID def test_create_repo_template_bad(self) -> None: with pytest.raises(ValidationError): CreateRepoRequest(name="muse", owner="gabriel", template_repo_id="bad-format") def test_issue_comment_parent_none(self) -> None: assert IssueCommentCreate(body="hi").parent_id is None def test_issue_comment_parent_valid(self) -> None: c = IssueCommentCreate(body="reply", parent_id=_VALID_ID) assert c.parent_id == _VALID_ID def test_issue_comment_parent_future_algo(self) -> None: c = IssueCommentCreate(body="reply", parent_id=_FUTURE_ID) assert c.parent_id == _FUTURE_ID def test_issue_comment_parent_bad(self) -> None: with pytest.raises(ValidationError): IssueCommentCreate(body="reply", parent_id="a1b2c3d4-bad") def test_proposal_comment_parent_none(self) -> None: assert ProposalCommentCreate(body="hi").parent_comment_id is None def test_proposal_comment_parent_valid(self) -> None: c = ProposalCommentCreate(body="reply", parent_comment_id=_VALID_ID) assert c.parent_comment_id == _VALID_ID def test_proposal_comment_parent_future_algo(self) -> None: c = ProposalCommentCreate(body="reply", parent_comment_id=_FUTURE_ID) assert c.parent_comment_id == _FUTURE_ID def test_proposal_comment_parent_bad_format(self) -> None: with pytest.raises(ValidationError): ProposalCommentCreate( body="reply", 
parent_comment_id="a1b2c3d4-e5f6-7890-abcd-ef1234567890", ) # =========================================================================== # Tier 4 — cross-verification (CLI ↔ hub) # =========================================================================== @pytest.mark.skipif(not _CLI_GENESIS_AVAILABLE, reason="muse.core.genesis not yet implemented in CLI") class TestCrossVerification: def test_tag_id_cli_and_hub_agree(self) -> None: args = (_REPO_ID, _COMMIT_ID, "emotion:joyful", "2026-01-01T00:00:00Z") assert compute_tag_id(*args) == cli_compute_tag_id(*args) def test_release_id_cli_and_hub_agree(self) -> None: args = (_REPO_ID, "v1.0.0", "2026-01-01T00:00:00Z") assert compute_release_id(*args) == cli_compute_release_id(*args) def test_tag_id_cli_returns_canonical(self) -> None: result = cli_compute_tag_id(_REPO_ID, _COMMIT_ID, "v1.0-wip", "2026-01-01T00:00:00Z") assert _CANONICAL_RE.match(result) def test_release_id_cli_returns_canonical(self) -> None: result = cli_compute_release_id(_REPO_ID, "v2.0.0", "2026-01-01T00:00:00Z") assert _CANONICAL_RE.match(result) # =========================================================================== # Tier 5 — derivation chain # =========================================================================== class TestDerivationChain: def test_changing_owner_changes_repo_id(self) -> None: id_a = compute_identity_id(bytes(range(32))) id_b = compute_identity_id(bytes(range(1, 33))) repo_a = compute_repo_id(id_a, "repo", "code", "2026-01-01T00:00:00Z") repo_b = compute_repo_id(id_b, "repo", "code", "2026-01-01T00:00:00Z") assert repo_a != repo_b def test_changing_repo_changes_issue_id(self) -> None: repo_a = compute_repo_id(_IDENTITY_ID, "repo-a", "code", "2026-01-01T00:00:00Z") repo_b = compute_repo_id(_IDENTITY_ID, "repo-b", "code", "2026-01-01T00:00:00Z") issue_a = compute_issue_id(repo_a, _IDENTITY_ID, "2026-01-02T00:00:00Z") issue_b = compute_issue_id(repo_b, _IDENTITY_ID, "2026-01-02T00:00:00Z") assert issue_a != issue_b def 
test_changing_issue_changes_comment_id(self) -> None: issue_a = compute_issue_id(_REPO_ID, _IDENTITY_ID, "2026-01-01T00:00:00Z") issue_b = compute_issue_id(_REPO_ID, _IDENTITY_ID, "2026-01-02T00:00:00Z") comment_a = compute_comment_id(issue_a, _IDENTITY_ID, "2026-01-03T00:00:00Z") comment_b = compute_comment_id(issue_b, _IDENTITY_ID, "2026-01-03T00:00:00Z") assert comment_a != comment_b def test_changing_proposal_changes_review_id(self) -> None: prop_a = compute_proposal_id(_REPO_ID, _IDENTITY_ID, "feat/a", "main", "2026-01-01T00:00:00Z") prop_b = compute_proposal_id(_REPO_ID, _IDENTITY_ID, "feat/b", "main", "2026-01-01T00:00:00Z") review_a = compute_review_id(prop_a, _IDENTITY_ID, "2026-01-02T00:00:00Z") review_b = compute_review_id(prop_b, _IDENTITY_ID, "2026-01-02T00:00:00Z") assert review_a != review_b def test_identity_repo_issue_comment_chain_is_fully_verifiable(self) -> None: """Full four-level chain: identity → repo → issue → comment.""" pk = bytes(range(32)) identity_id = compute_identity_id(pk) repo_id = compute_repo_id(identity_id, "chain-test", "code", "2026-01-01T00:00:00Z") issue_id = compute_issue_id(repo_id, identity_id, "2026-01-02T00:00:00Z") comment_id = compute_comment_id(issue_id, identity_id, "2026-01-03T00:00:00Z") # Every level is canonical for entity_id in (identity_id, repo_id, issue_id, comment_id): assert _CANONICAL_RE.match(entity_id), f"non-canonical: {entity_id}" # Mutating the pubkey propagates through the entire chain pk2 = bytes(range(1, 33)) identity_id2 = compute_identity_id(pk2) repo_id2 = compute_repo_id(identity_id2, "chain-test", "code", "2026-01-01T00:00:00Z") issue_id2 = compute_issue_id(repo_id2, identity_id2, "2026-01-02T00:00:00Z") comment_id2 = compute_comment_id(issue_id2, identity_id2, "2026-01-03T00:00:00Z") assert identity_id != identity_id2 assert repo_id != repo_id2 assert issue_id != issue_id2 assert comment_id != comment_id2 # =========================================================================== # Tier 6 — 
# service-layer contract (unit, mocked DB)
#
# Every service function that creates a first-class entity must compute its ID
# from genesis context — not randomly generated. Tests here are RED until Phase 4
# is implemented and will stay GREEN thereafter.
# ===========================================================================


class TestServiceLayerGenesisIds:
    """Service creation functions assign genesis-addressed IDs, never random IDs."""

    # ------------------------------------------------------------------
    # Helpers shared across tests
    # ------------------------------------------------------------------

    def _async_session(self, *, execute_returns: MagicMock | None = None) -> "AsyncMock":
        """Return a minimal AsyncMock DB session.

        Args:
            execute_returns: value handed back by
                ``(await session.execute(...)).scalar_one_or_none()``.

        Returns:
            An ``AsyncMock`` whose ``execute`` resolves to a ``MagicMock`` result
            and whose ``refresh`` fills in timestamp attributes on the object
            it is given.
        """
        from unittest.mock import AsyncMock, MagicMock

        session = AsyncMock()
        scalar = MagicMock()
        scalar.scalar_one_or_none.return_value = execute_returns
        scalar.scalar_one.return_value = None
        session.execute.return_value = scalar
        session.flush = AsyncMock()
        session.commit = AsyncMock()
        session.delete = AsyncMock()

        async def _refresh(obj: MagicMock) -> None:
            # Stand-in for a DB refresh: populate the timestamp attributes when
            # they are missing or falsy.
            for attr in ("created_at", "updated_at"):
                if not getattr(obj, attr, None):
                    try:
                        setattr(obj, attr, datetime.now(timezone.utc))
                    except Exception:
                        # Deliberately best-effort: an object that refuses the
                        # attribute is simply left as it is.
                        pass
            for attr in ("last_used_at",):
                if not hasattr(obj, attr):
                    try:
                        setattr(obj, attr, None)
                    except Exception:
                        pass

        session.refresh = _refresh
        return session

    def _keypair(self) -> tuple[object, bytes]:
        """Generate a fresh Ed25519 key pair.

        Returns:
            ``(private_key, public_bytes)`` — the ``Ed25519PrivateKey`` object and
            the raw-encoded public key bytes. A new random key is generated on
            every call.
        """
        from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
        from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat

        priv = Ed25519PrivateKey.generate()
        pub = priv.public_key().public_bytes(Encoding.Raw, PublicFormat.Raw)
        return priv, pub

    # ------------------------------------------------------------------
    # 6.1 Identity — register_agent_identity
    # ------------------------------------------------------------------

    @pytest.mark.asyncio
    async def test_register_agent_identity_uses_compute_identity_id(self) -> None:
        """register_agent_identity assigns identity_id = compute_identity_id(pub_bytes)."""
        from muse.core.types import encode_pubkey, public_key_fingerprint
        from musehub.services.musehub_auth import register_agent_identity

        _, pub = self._keypair()
        pub_b64 = encode_pubkey("ed25519", pub)
        fp = public_key_fingerprint(pub)
        expected = compute_identity_id(pub)

        session = self._async_session()
        await register_agent_identity(
            session=session,
            handle="test-agent",
            public_key_b64=pub_b64,
            fingerprint=fp,
            algorithm="ed25519",
            spawned_by="gabriel",
        )

        # First add() is the MusehubIdentity row.
        identity = session.add.call_args_list[0].args[0]
        assert identity.identity_id == expected, (
            f"expected genesis ID {expected!r}, got {identity.identity_id!r}"
        )
        assert _CANONICAL_RE.match(identity.identity_id)

    # ------------------------------------------------------------------
    # 6.2 Session — upsert_session
    # ------------------------------------------------------------------

    @pytest.mark.asyncio
    async def test_upsert_session_uses_compute_session_id(self) -> None:
        """upsert_session assigns session_id = compute_session_id(repo_id, author_identity_id, started_at)."""
        from unittest.mock import MagicMock, patch
        from musehub.models.musehub import SessionCreate
        from musehub.services.musehub_sessions import upsert_session

        repo_id = _REPO_ID
        author_identity_id = _IDENTITY_ID
        started_at = datetime(2026, 1, 6, 0, 0, 0, tzinfo=timezone.utc)
        # The expected ID is computed from the ISO-8601 rendering of started_at.
        expected = compute_session_id(repo_id, author_identity_id, started_at.isoformat())

        data = SessionCreate(started_at=started_at, participants=[], intent="", location="")
        session = self._async_session()

        # _to_response is patched out so only the row handed to session.add() matters.
        with patch("musehub.services.musehub_sessions._to_response", return_value=MagicMock()):
            await upsert_session(
                session,
                repo_id=repo_id,
                author_identity_id=author_identity_id,
                data=data,
            )

        added = session.add.call_args_list[0].args[0]
        assert added.session_id == expected
assert _CANONICAL_RE.match(added.session_id) # ------------------------------------------------------------------ # 6.3 Issue — create_issue # ------------------------------------------------------------------ @pytest.mark.asyncio async def test_create_issue_uses_compute_issue_id(self) -> None: """create_issue assigns issue_id = compute_issue_id(repo_id, author_identity_id, created_at).""" from unittest.mock import AsyncMock, MagicMock, patch from datetime import datetime, timezone from musehub.services import musehub_issues repo_id = _REPO_ID author_identity_id = _IDENTITY_ID fixed_now = datetime(2026, 1, 2, 0, 0, 0, tzinfo=timezone.utc) expected = compute_issue_id(repo_id, author_identity_id, fixed_now.isoformat()) session = self._async_session() # _next_issue_number calls session.execute(...).scalar_one_or_none() # Return None so next number = 1. session.execute.return_value.scalar_one_or_none.return_value = None with patch("musehub.services.musehub_issues.datetime") as mock_dt: mock_dt.now.return_value = fixed_now mock_dt.side_effect = lambda *a, **kw: datetime(*a, **kw) await musehub_issues.create_issue( session, repo_id=repo_id, title="Test issue", body="body", labels=[], author="gabriel", author_identity_id=author_identity_id, ) added = session.add.call_args_list[0][0][0] assert added.issue_id == expected assert _CANONICAL_RE.match(added.issue_id) # ------------------------------------------------------------------ # 6.4 Proposal — create_proposal # ------------------------------------------------------------------ @pytest.mark.asyncio async def test_create_proposal_uses_compute_proposal_id(self) -> None: """create_proposal assigns proposal_id = compute_proposal_id(...).""" from unittest.mock import AsyncMock, MagicMock, patch from datetime import datetime, timezone from musehub.services import musehub_proposals repo_id = _REPO_ID author_identity_id = _IDENTITY_ID from_branch = "feat/x" to_branch = "main" fixed_now = datetime(2026, 1, 3, 0, 0, 0, 
tzinfo=timezone.utc) expected = compute_proposal_id(repo_id, author_identity_id, from_branch, to_branch, fixed_now.isoformat()) # _get_branch makes a DB call — return a fake branch row. from unittest.mock import MagicMock fake_branch = MagicMock() fake_branch.head_commit_id = fake_id("branch-head-stub") session = self._async_session() # First execute: _get_branch → returns branch # Second execute: max proposal_number → returns None # Third execute: _touched_symbols → returns [] from unittest.mock import AsyncMock results = [ MagicMock(**{"scalar_one_or_none.return_value": fake_branch}), # _get_branch(from_branch) MagicMock(**{"scalar_one_or_none.return_value": None}), # _get_branch(to_branch) MagicMock(**{"scalar_one_or_none.return_value": None}), # max proposal_number MagicMock(**{"scalars.return_value.all.return_value": []}), # _touched_symbols ] session.execute.side_effect = results with patch("musehub.services.musehub_proposals._utc_now", return_value=fixed_now): await musehub_proposals.create_proposal( session, repo_id=repo_id, title="Test proposal", from_branch=from_branch, to_branch=to_branch, body="", author="gabriel", author_identity_id=author_identity_id, ) added = session.add.call_args_list[0][0][0] assert added.proposal_id == expected assert _CANONICAL_RE.match(added.proposal_id) # ------------------------------------------------------------------ # 6.5 Repo — create_repo # ------------------------------------------------------------------ @pytest.mark.asyncio async def test_create_repo_uses_compute_repo_id(self) -> None: """create_repo assigns repo_id = compute_repo_id(owner_user_id, slug, domain, created_at).""" from unittest.mock import patch from datetime import datetime, timezone from musehub.services import musehub_repository owner_identity_id = _IDENTITY_ID slug = "my-repo" domain = "code" fixed_now = datetime(2026, 1, 1, 0, 0, 0, tzinfo=timezone.utc) expected = compute_repo_id(owner_identity_id, slug, domain, fixed_now.isoformat()) from 
unittest.mock import AsyncMock session = self._async_session() # template lookup returns None; no other DB reads needed. session.get = AsyncMock(return_value=None) from unittest.mock import MagicMock with patch("musehub.services.musehub_repository.datetime") as mock_dt, \ patch("musehub.services.musehub_repository._to_repo_response", return_value=MagicMock()): mock_dt.now.return_value = fixed_now mock_dt.side_effect = lambda *a, **kw: datetime(*a, **kw) await musehub_repository.create_repo( session, name="My Repo", owner="gabriel", visibility="public", owner_user_id=compute_identity_id(b"gabriel"), owner_identity_id=owner_identity_id, domain=domain, ) added = session.add.call_args_list[0][0][0] assert added.repo_id == expected assert _CANONICAL_RE.match(added.repo_id) # =========================================================================== # Phase 2 — Tier 1: canonical form for new genesis functions # =========================================================================== class TestPhase2CanonicalForm: def test_label_id_canonical(self) -> None: assert _CANONICAL_RE.match(_LABEL_ID) def test_asset_id_canonical(self) -> None: assert _CANONICAL_RE.match(_ASSET_ID) def test_webhook_id_canonical(self) -> None: assert _CANONICAL_RE.match(_WEBHOOK_ID) def test_fork_id_canonical(self) -> None: assert _CANONICAL_RE.match(_FORK_ID) def test_collaborator_id_canonical(self) -> None: assert _CANONICAL_RE.match(_COLLABORATOR_ID) def test_key_id_canonical(self) -> None: assert _CANONICAL_RE.match(_KEY_ID) def test_domain_id_canonical(self) -> None: assert _CANONICAL_RE.match(_DOMAIN_ID) # =========================================================================== # Phase 2 — Tier 2: determinism and collision resistance # =========================================================================== class TestPhase2Determinism: def test_label_id_deterministic(self) -> None: args = (_REPO_ID, "bug", "2026-01-09T00:00:00Z") assert compute_label_id(*args) == 
def test_collaborator_different_identities_differ(self) -> None:
    """Changing only the invitee identity yields a different collaborator ID."""
    invited_at = "2026-01-14T00:00:00Z"
    other_identity = compute_identity_id(bytes(range(2, 34)))
    for_fixture_identity = compute_collaborator_id(_REPO_ID, _COLLAB_IDENTITY_ID, invited_at)
    for_other_identity = compute_collaborator_id(_REPO_ID, other_identity, invited_at)
    assert for_fixture_identity != for_other_identity
class TestPhase2SeparatorInjection:
    """Phase 2 genesis functions still emit canonical IDs for separator-like field content."""

    def test_label_nul_in_name_is_safe_canonical(self) -> None:
        # A NUL inside a field value is structurally unusual; even for such
        # exotic input the function must still hand back a canonical ID.
        label_id = compute_label_id(_REPO_ID, "bug\x00feature", "2026-01-09T00:00:00Z")
        assert _CANONICAL_RE.match(label_id)

    def test_webhook_url_with_colons_is_safe(self) -> None:
        # Userinfo and port each add a colon to the URL.
        hook_url = "https://user:pass@ci.example.com:8080/hook"
        assert _CANONICAL_RE.match(
            compute_webhook_id(_REPO_ID, hook_url, "2026-01-11T00:00:00Z")
        )

    def test_domain_slug_with_path_separator_is_safe(self) -> None:
        domain_id = compute_domain_id("gabriel", "audio/midi", "2026-01-15T00:00:00Z")
        assert _CANONICAL_RE.match(domain_id)

    def test_key_id_with_base64_padding_chars_is_safe(self) -> None:
        # '+', '/' and '=' all appear in the key material.
        padded_pubkey = "ed25519:ABC+/DEF==padded=="
        assert _CANONICAL_RE.match(compute_key_id(_IDENTITY_ID, padded_pubkey))

    def test_fork_nul_in_repo_id_does_not_collide(self) -> None:
        # Same extra "\x00x" fragment, attached to the end of the source ID in
        # one call and to the start of the fork ID in the other.
        forked_at = "2026-01-13T00:00:00Z"
        nul_on_source = compute_fork_id(_REPO_ID + "\x00x", _FORK_REPO_ID, forked_at)
        nul_on_fork = compute_fork_id(_REPO_ID, "\x00x" + _FORK_REPO_ID, forked_at)
        assert _CANONICAL_RE.match(nul_on_source)
        assert nul_on_source != nul_on_fork
def test_label_response_accepts_future_algo(self) -> None:
    """LabelResponse validates with ``_FUTURE_ID`` in both ID fields and keeps label_id unchanged."""
    fields = {
        "label_id": _FUTURE_ID,
        "repo_id": _FUTURE_ID,
        "name": "bug",
        "color": "#d73a4a",
        "description": None,
        "created_at": _DT,
    }
    response = LabelResponse(**fields)
    assert response.label_id == _FUTURE_ID
def _make_fork_repo_response(self, repo_id: str = _VALID_ID) -> "RepoResponse":
    """Build the RepoResponse used as the nested ``fork_repo`` in UserForkedRepoEntry tests.

    Args:
        repo_id: Value for the nested ``repo_id`` field. Defaults to
            ``_VALID_ID``; tests pass a malformed value to exercise validation
            of the nested model.

    Returns:
        The constructed ``RepoResponse``. The annotation was previously
        ``None`` even though callers pass the result as ``fork_repo=``.
    """
    # Imported lazily, so the return annotation is a forward reference that is
    # never evaluated at runtime.
    from musehub.models.musehub import RepoResponse

    return RepoResponse(
        repo_id=repo_id,
        name="my-fork",
        owner="alice",
        slug="my-fork",
        visibility="public",
        owner_user_id=_VALID_ID,
        clone_url="https://musehub.ai/api/repos/x",
        created_at=_DT,
        updated_at=_DT,
    )
class TestServiceLayerPhase2GenesisIds:
    """Phase 2 service creation functions assign genesis-addressed IDs, never random IDs."""

    def _async_session(self, *, execute_returns: MagicMock | None = None) -> "AsyncMock":
        """Return an ``AsyncMock`` standing in for an async DB session.

        ``execute()`` resolves to a result whose ``scalar_one_or_none()`` yields
        *execute_returns*. ``commit`` and ``flush`` are awaitable mocks.
        ``refresh`` fills any falsy ``created_at`` / ``updated_at`` attribute
        with the current UTC time.
        """
        from unittest.mock import AsyncMock, MagicMock

        session = AsyncMock()
        scalar = MagicMock()
        scalar.scalar_one_or_none.return_value = execute_returns
        session.execute.return_value = scalar
        session.commit = AsyncMock()
        session.flush = AsyncMock()

        async def _refresh(obj: MagicMock) -> None:
            for attr in ("created_at", "updated_at"):
                if not getattr(obj, attr, None):
                    try:
                        setattr(obj, attr, datetime.now(timezone.utc))
                    except Exception:
                        # Best-effort: objects that refuse the assignment are left as-is.
                        pass

        session.refresh = _refresh
        return session

    # 6.6 Label — create_label ─────────────────────────────────────────────

    @pytest.mark.asyncio
    async def test_create_label_uses_compute_label_id(self) -> None:
        """create_label calls compute_label_id(repo_id, name, iso_ts) — not a random ID."""
        from unittest.mock import AsyncMock, MagicMock, patch

        import musehub.api.routes.musehub.labels as labels_module

        repo_id = _REPO_ID
        name = "bug"
        captured: list[tuple[str, str, str, str]] = []
        _real = compute_label_id

        def _spy(r: str, n: str, t: str) -> str:
            # Delegate to the real function, recording the arguments and the result.
            result = _real(r, n, t)
            captured.append((r, n, t, result))
            return result

        db = self._async_session()
        # _guard_repo_owner → get_repo + check_write_access; uniqueness check → no duplicate
        db.execute.return_value.scalar_one_or_none.return_value = None

        with patch("musehub.api.routes.musehub.labels.musehub_repository") as mock_svc, \
             patch("musehub.api.routes.musehub.labels.compute_label_id", side_effect=_spy):
            fake_repo = MagicMock()
            mock_svc.get_repo = AsyncMock(return_value=fake_repo)
            mock_svc.check_write_access = AsyncMock(return_value=True)
            try:
                await labels_module.create_label(
                    repo_id=repo_id,
                    body=labels_module.LabelCreate(name=name, color="#d73a4a"),
                    db=db,
                    token=MagicMock(handle="gabriel"),
                )
            except Exception:
                # Deliberately ignored: only the spy capture below is asserted.
                pass

        assert captured, "compute_label_id was never called — label creation is broken"
        call_repo_id, call_name, _call_ts, call_result = captured[0]
        assert call_repo_id == repo_id
        assert call_name == name
        assert _CANONICAL_RE.match(call_result)

    # 6.7 Webhook — create_webhook ─────────────────────────────────────────

    @pytest.mark.asyncio
    async def test_create_webhook_uses_compute_webhook_id(self) -> None:
        """Webhook rows are assigned compute_webhook_id(repo_id, url, created_at_iso)."""
        from unittest.mock import patch

        from musehub.services.musehub_webhook_dispatcher import create_webhook

        repo_id = _REPO_ID
        url = "https://ci.example.com/hook"
        captured: list[tuple[str, str, str, str]] = []
        _real = compute_webhook_id

        def _spy(r: str, u: str, t: str) -> str:
            # Delegate to the real function, recording the arguments and the result.
            result = _real(r, u, t)
            captured.append((r, u, t, result))
            return result

        db = self._async_session()

        with patch("musehub.services.musehub_webhook_dispatcher.compute_webhook_id", side_effect=_spy):
            try:
                await create_webhook(db, repo_id=repo_id, url=url, events=["push"], secret="")
            except Exception:
                # Deliberately ignored: only the spy capture below is asserted.
                pass

        assert captured, "compute_webhook_id was never called — webhook creation is broken"
        call_repo_id, call_url, _call_ts, call_result = captured[0]
        assert call_repo_id == repo_id
        assert call_url == url
        assert _CANONICAL_RE.match(call_result)
        # Verify the row passed to session.add has that ID
        if db.add.called:
            row = db.add.call_args_list[0][0][0]
            assert row.webhook_id == call_result

    # 6.8 Fork — compute_fork_id called at fork creation ───────────────────

    @pytest.mark.asyncio
    async def test_fork_repo_uses_compute_fork_id(self) -> None:
        """fork_repo calls compute_fork_id(source_repo_id, fork_repo_id, created_at_iso)."""
        from unittest.mock import MagicMock, patch

        from musehub.models.musehub import ForkRepoRequest
        from musehub.services.musehub_repository import fork_repo

        source_repo_id = _REPO_ID
        captured: list[tuple[str, str, str, str]] = []
        _real = compute_fork_id

        def _spy(src: str, frk: str, t: str) -> str:
            # Delegate to the real function, recording the arguments and the result.
            result = _real(src, frk, t)
            captured.append((src, frk, t, result))
            return result

        fake_source_repo = MagicMock()
        fake_source_repo.repo_id = source_repo_id
        fake_source_repo.name = "my-repo"
        fake_source_repo.slug = "my-repo"
        fake_source_repo.visibility = "public"
        fake_source_repo.description = "A test repo"
        fake_source_repo.domain_id = None
        fake_source_repo.owner = "gabriel"
        fake_source_repo.tags = []

        db = self._async_session()
        call_count = 0

        def _execute(*a: object, **kw: object) -> MagicMock:
            # First execute() yields the source repo; every later one yields "not found".
            nonlocal call_count
            call_count += 1
            m = MagicMock()
            if call_count == 1:
                m.scalar_one_or_none.return_value = fake_source_repo  # source repo
            else:
                m.scalar_one_or_none.return_value = None  # no duplicate / no slug collision
            return m

        db.execute.side_effect = _execute

        # After flush()+refresh(), the fork_repo_row needs a repo_id so compute_fork_id can use it.
        # The DB would normally auto-assign it; we supply a canonical stub.
        _fork_repo_id_stub = fake_id("fork-repo-stub")
        refresh_count = 0

        async def _refresh_with_repo_id(obj: MagicMock) -> None:
            nonlocal refresh_count
            refresh_count += 1
            now = datetime.now(timezone.utc)
            for attr in ("created_at", "updated_at"):
                if not getattr(obj, attr, None):
                    try:
                        setattr(obj, attr, now)
                    except Exception:
                        # Best-effort: objects that refuse the assignment are left as-is.
                        pass
            # First refresh is for the fork repo row — give it a genesis-style repo_id.
            if refresh_count == 1 and hasattr(obj, "repo_id") and not getattr(obj, "repo_id", None):
                try:
                    obj.repo_id = _fork_repo_id_stub
                except Exception:
                    pass

        db.refresh = _refresh_with_repo_id

        with patch("musehub.services.musehub_repository.compute_fork_id", side_effect=_spy):
            try:
                await fork_repo(
                    db,
                    source_repo_id=source_repo_id,
                    forked_by_handle="alice",
                    request=ForkRepoRequest(name=None),
                )
            except Exception:
                # Deliberately ignored: only the spy capture below is asserted.
                pass

        assert captured, "compute_fork_id was never called — fork creation is broken"
        call_src, _call_frk, _call_ts, call_result = captured[0]
        assert call_src == source_repo_id
        assert _CANONICAL_RE.match(call_result)

    # 6.9 Auth key — register_identity key_id ──────────────────────────────

    @pytest.mark.asyncio
    async def test_register_key_uses_compute_key_id(self) -> None:
        """MusehubAuthKey rows use compute_key_id(identity_id, public_key_b64) — no random IDs."""
        from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
        from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat
        from muse.core.types import encode_pubkey, public_key_fingerprint

        from musehub.services.musehub_auth import register_agent_identity

        # Fresh keypair per run; the expected IDs are derived from its raw public bytes.
        priv = Ed25519PrivateKey.generate()
        pub = priv.public_key().public_bytes(Encoding.Raw, PublicFormat.Raw)
        pub_b64 = encode_pubkey("ed25519", pub)
        fp = public_key_fingerprint(pub)

        expected_identity_id = compute_identity_id(pub)
        expected_key_id = compute_key_id(expected_identity_id, pub_b64)

        db = self._async_session()

        await register_agent_identity(
            session=db,
            handle="key-test-agent",
            public_key_b64=pub_b64,
            fingerprint=fp,
            algorithm="ed25519",
            spawned_by="gabriel",
        )

        # add() call list: [0] = identity, [1] = key
        assert len(db.add.call_args_list) >= 2
        key_row = db.add.call_args_list[1][0][0]
        assert key_row.key_id == expected_key_id
        assert _CANONICAL_RE.match(key_row.key_id)

    # 6.10 Collaborator invite — invite_collaborator ────────────────────────

    @pytest.mark.asyncio
    async def test_invite_collaborator_uses_compute_collaborator_id(self) -> None:
        """Collaborator rows use compute_collaborator_id(repo_id, identity_id, invited_at_iso)."""
        from unittest.mock import AsyncMock, MagicMock, patch

        import musehub.api.routes.musehub.collaborators as collabs_module

        repo_id = _REPO_ID
        fixed_now = datetime(2026, 1, 14, 0, 0, 0, tzinfo=timezone.utc)
        invitee_identity_id = _COLLAB_IDENTITY_ID
        expected = compute_collaborator_id(repo_id, invitee_identity_id, fixed_now.isoformat())

        # Repo owner must match the actor so the 403 guard passes.
        fake_repo = MagicMock()
        fake_repo.owner = "gabriel"

        fake_invitee_identity = MagicMock()
        fake_invitee_identity.identity_id = invitee_identity_id

        db = self._async_session()
        call_count = 0

        def _execute(*a: object, **kw: object) -> MagicMock:
            # Results are keyed by call order: actor permission, duplicate check, invitee lookup.
            nonlocal call_count
            call_count += 1
            m = MagicMock()
            if call_count == 1:
                m.scalar_one_or_none.return_value = MagicMock(permission="owner")  # actor perm
            elif call_count == 2:
                m.scalar_one_or_none.return_value = None  # no duplicate
            else:
                m.scalar_one_or_none.return_value = fake_invitee_identity  # invitee lookup
            return m

        db.execute.side_effect = _execute

        with patch("musehub.api.routes.musehub.collaborators.musehub_repository") as mock_svc, \
             patch("musehub.api.routes.musehub.collaborators.datetime") as mock_dt:
            mock_svc.get_repo = AsyncMock(return_value=fake_repo)
            mock_dt.now.return_value = fixed_now
            mock_dt.timezone = timezone
            try:
                await collabs_module.invite_collaborator(
                    repo_id=repo_id,
                    body=collabs_module.CollaboratorInviteRequest(handle="alice"),
                    db=db,
                    token=MagicMock(handle="gabriel"),
                )
            except Exception:
                # Deliberately ignored: the row passed to db.add() is asserted below.
                pass

        assert db.add.called, "db.add was never called — collaborator row was not created"
        row = db.add.call_args_list[0][0][0]
        assert row.id == expected, f"expected {expected!r}, got {row.id!r}"
        assert _CANONICAL_RE.match(row.id)