gabriel / musehub public
test_genesis_ids.py python
1,455 lines 60.9 KB
Raw
sha256:4992098130166d191cefed0a2821d19cd3cdd3cf50867a4e715c2b30636826c7 fix: repair syntax errors from typing annotation cleanup Sonnet 4.6 20 days ago
1 """TDD: genesis-addressed IDs for every first-class semantic entity.
2
3 CONTRACT (issue #10):
4
5 Every first-class semantic entity in the Muse ecosystem has an identity
6 derived from the minimal immutable facts about the moment it was declared
7 to exist — its genesis context. The formula is universal:
8
9 entity_id = "<algo>:" + hash(NUL.join(genesis_fields)).hexdigest()
10
11 The algorithm prefix is intentionally not hardcoded to "sha256". Any
12 canonical <algo>:<hex> form is valid. This future-proofs the system for
13 hash algorithm upgrades without breaking the validator contract.
14
15 Tier 1 — canonical form
16 All compute_* functions return strings matching ^[a-z][a-z0-9]*:[0-9a-f]{32,}$.
17
18 Tier 2 — determinism
19 Same inputs always produce the same ID. Different inputs always produce
20 different IDs (collision resistance tested with minimal single-field diffs).
21
22 Tier 3 — separator injection safety
23 Field values containing NUL bytes, pipe chars, colons, or path separators
24 do not break the hash or allow crafted collisions.
25
26 Tier 3b — Pydantic boundary enforcement
27 Wire-protocol and response models reject non-canonical IDs at the
28 Pydantic validation boundary. Both request models (untrusted input) and
29 response models (service layer bug detection) are covered. Optional ID
30 fields accept None and valid canonical IDs, reject malformed strings.
31
32 Tier 4 — cross-verification (CLI ↔ hub)
33 Functions that exist in both muse.core.genesis and musehub.core.genesis
34 produce identical output for the same inputs.
35
36 Tier 5 — derivation chain
37 Entity IDs that take other entity IDs as genesis fields (e.g. issue_id
38 takes repo_id) form a verifiable chain: changing the parent ID changes
39 all descendant IDs.
40 """
41
42 from __future__ import annotations
43
44 import re
45 import sys
46 from datetime import datetime, timezone
47 from pathlib import Path
48
49 import pytest
50 from muse.core.types import fake_id
51 from pydantic import ValidationError
52
53 from musehub.models.musehub import (
54 CreateRepoRequest,
55 IssueCommentCreate,
56 ProposalCommentCreate,
57 ProposalReviewResponse,
58 ReleaseAssetResponse,
59 UserForkedRepoEntry,
60 WebhookResponse,
61 WireTagInput,
62 )
63 from musehub.api.routes.musehub.collaborators import CollaboratorResponse
64 from musehub.api.routes.musehub.labels import LabelResponse
65 from musehub.models.wire import (
66 WireCommit,
67 WireFetchRequest,
68 WireObject,
69 WireSnapshot,
70 )
71
72 # Hub-side genesis functions
73 from musehub.core.genesis import (
74 compute_asset_id,
75 compute_bridge_mirror_id,
76 compute_collaborator_id,
77 compute_comment_id,
78 compute_domain_id,
79 compute_domain_install_id,
80 compute_fork_id,
81 compute_identity_id,
82 compute_issue_event_id,
83 compute_issue_id,
84 compute_job_id,
85 compute_key_id,
86 compute_label_id,
87 compute_mist_id,
88 compute_proposal_id,
89 compute_release_id,
90 compute_repo_id,
91 compute_reservation_id,
92 compute_review_id,
93 compute_session_id,
94 compute_tag_id,
95 compute_task_id,
96 compute_webhook_delivery_id,
97 compute_webhook_id,
98 mist_short_id,
99 )
100
# CLI-side genesis functions (cross-verification) — skip gracefully if not yet present.
#
# ImportError is caught rather than its subclass ModuleNotFoundError: when
# muse.core.genesis exists but does not (yet) export one of the names below,
# Python raises a plain ImportError, and the Tier 4 tests should be skipped in
# that case too instead of breaking collection of this whole module.
#
# NOTE(review): `muse.core.types` is imported earlier in this file, so the `muse`
# package is presumably already resolved before this path entry is added —
# confirm whether the sys.path insertion is still needed.
sys.path.insert(0, str(Path.home() / "ecosystem" / "muse"))
try:
    from muse.core.genesis import (
        compute_release_id as cli_compute_release_id,
        compute_tag_id as cli_compute_tag_id,
    )
except ImportError:
    _CLI_GENESIS_AVAILABLE = False
    cli_compute_release_id = None  # type: ignore[assignment]
    cli_compute_tag_id = None  # type: ignore[assignment]
else:
    _CLI_GENESIS_AVAILABLE = True
113
# Algo-agnostic canonical pattern: <lowercase-algo>:<lowercase-hex, ≥32 chars>
# Do NOT tighten to "sha256" only — this pattern must survive hash algorithm upgrades.
# The algo token starts with a lowercase letter and continues with lowercase
# letters or digits; uppercase characters on either side of the colon do not match.
_CANONICAL_RE = re.compile(r"^[a-z][a-z0-9]*:[0-9a-f]{32,}$")
117
118 # ---------------------------------------------------------------------------
119 # Shared deterministic inputs
120 # ---------------------------------------------------------------------------
121
# Root of the fixture chain: a fixed public key and the identity derived from it.
_PUBKEY = bytes(range(32))  # 32 deterministic bytes
_IDENTITY_ID = compute_identity_id(_PUBKEY)

# Each ID below is computed once at import time from literal inputs plus the
# IDs above it, so every test in this module sees the same values.
_REPO_ID = compute_repo_id(_IDENTITY_ID, "my-repo", "code", "2026-01-01T00:00:00Z")
_ISSUE_ID = compute_issue_id(_REPO_ID, _IDENTITY_ID, "2026-01-02T00:00:00Z")
_PROPOSAL_ID = compute_proposal_id(_REPO_ID, _IDENTITY_ID, "feat/x", "main", "2026-01-03T00:00:00Z")
_RELEASE_ID = compute_release_id(_REPO_ID, "v1.0.0", "2026-01-04T00:00:00Z")
_COMMIT_ID = fake_id("commit-stub")  # canonical stub; real commits use compute_commit_id
_TAG_ID = compute_tag_id(_REPO_ID, _COMMIT_ID, "emotion:joyful", "2026-01-05T00:00:00Z")
_SESSION_ID = compute_session_id(_REPO_ID, _IDENTITY_ID, "2026-01-06T00:00:00Z")
_MIST_ID = compute_mist_id(b"hello muse")  # derived from content bytes only
_COMMENT_ID = compute_comment_id(_ISSUE_ID, _IDENTITY_ID, "2026-01-07T00:00:00Z")
_REVIEW_ID = compute_review_id(_PROPOSAL_ID, _IDENTITY_ID, "2026-01-08T00:00:00Z")

# Phase 2 genesis IDs
_LABEL_ID = compute_label_id(_REPO_ID, "bug", "2026-01-09T00:00:00Z")
_ASSET_ID = compute_asset_id(_RELEASE_ID, "v1.0.0-linux-amd64.tar.gz", "2026-01-10T00:00:00Z")
_WEBHOOK_ID = compute_webhook_id(_REPO_ID, "https://ci.example.com/hook", "2026-01-11T00:00:00Z")
# A second repo owned by the same identity, used as the fork target.
_FORK_REPO_ID = compute_repo_id(_IDENTITY_ID, "my-fork", "code", "2026-01-12T00:00:00Z")
_FORK_ID = compute_fork_id(_REPO_ID, _FORK_REPO_ID, "2026-01-13T00:00:00Z")
# A second identity (different public key bytes) acting as collaborator.
_COLLAB_IDENTITY_ID = compute_identity_id(bytes(range(1, 33)))
_COLLABORATOR_ID = compute_collaborator_id(_REPO_ID, _COLLAB_IDENTITY_ID, "2026-01-14T00:00:00Z")
_KEY_ID = compute_key_id(_IDENTITY_ID, "ed25519:AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA")
_DOMAIN_ID = compute_domain_id("gabriel", "code", "2026-01-15T00:00:00Z")

# Previously random-ID entities — now genesis-addressed
_BRIDGE_MIRROR_ID = compute_bridge_mirror_id(_REPO_ID, "https://github.com/gabriel/my-repo.git")
_DOMAIN_INSTALL_ID = compute_domain_install_id(_IDENTITY_ID, _DOMAIN_ID)
_ISSUE_EVENT_ID = compute_issue_event_id(_ISSUE_ID, "opened", "gabriel", "2026-01-16T00:00:00Z")
_JOB_ID = compute_job_id(_REPO_ID, "intel.code", "2026-01-17T00:00:00Z")
_WEBHOOK_DELIVERY_ID = compute_webhook_delivery_id(_WEBHOOK_ID, "push", 1, "2026-01-18T00:00:00Z")

# Coord entities — genesis-addressed (content-addressed, not random)
_TASK_ID = compute_task_id(_REPO_ID, "default", "agent-1", "2026-01-19T00:00:00Z")
_RESERVATION_ID = compute_reservation_id(
    _REPO_ID, "agent-1",
    # Addresses are sorted before joining so the joined field does not depend on listing order.
    ",".join(sorted(["src/engine.py::AudioEngine", "src/mixer.py::Mixer"])),
    "2026-01-20T00:00:00Z",
)
161
162
163 # ===========================================================================
164 # Tier 1 — canonical form
165 # ===========================================================================
166
class TestCanonicalForm:
    """Tier 1: fixture IDs match the algo-agnostic ``<algo>:<hex>`` pattern.

    Each test checks one module-level fixture ID against ``_CANONICAL_RE``.
    The two ``mist_short_id`` tests check the shortened form of ``_MIST_ID``.
    """

    def test_identity_id_canonical(self) -> None:
        assert _CANONICAL_RE.match(_IDENTITY_ID)

    def test_repo_id_canonical(self) -> None:
        assert _CANONICAL_RE.match(_REPO_ID)

    def test_issue_id_canonical(self) -> None:
        assert _CANONICAL_RE.match(_ISSUE_ID)

    def test_proposal_id_canonical(self) -> None:
        assert _CANONICAL_RE.match(_PROPOSAL_ID)

    def test_release_id_canonical(self) -> None:
        assert _CANONICAL_RE.match(_RELEASE_ID)

    def test_tag_id_canonical(self) -> None:
        assert _CANONICAL_RE.match(_TAG_ID)

    def test_session_id_canonical(self) -> None:
        assert _CANONICAL_RE.match(_SESSION_ID)

    def test_mist_id_canonical(self) -> None:
        assert _CANONICAL_RE.match(_MIST_ID)

    def test_comment_id_canonical(self) -> None:
        assert _CANONICAL_RE.match(_COMMENT_ID)

    def test_review_id_canonical(self) -> None:
        assert _CANONICAL_RE.match(_REVIEW_ID)

    def test_mist_short_id_is_12_chars(self) -> None:
        """The short form is exactly twelve lowercase hex characters."""
        short = mist_short_id(_MIST_ID)
        assert len(short) == 12
        assert re.match(r"^[0-9a-f]{12}$", short)

    def test_mist_short_id_is_prefix_of_digest(self) -> None:
        """The short form equals the first twelve characters of the hex digest."""
        # Split on the first ":" instead of stripping a literal "sha256:" so this
        # test keeps passing if the hash algorithm prefix ever changes.
        _algo, _sep, digest = _MIST_ID.partition(":")
        assert mist_short_id(_MIST_ID) == digest[:12]

    def test_bridge_mirror_id_canonical(self) -> None:
        assert _CANONICAL_RE.match(_BRIDGE_MIRROR_ID)

    def test_domain_install_id_canonical(self) -> None:
        assert _CANONICAL_RE.match(_DOMAIN_INSTALL_ID)

    def test_issue_event_id_canonical(self) -> None:
        assert _CANONICAL_RE.match(_ISSUE_EVENT_ID)

    def test_job_id_canonical(self) -> None:
        assert _CANONICAL_RE.match(_JOB_ID)

    def test_webhook_delivery_id_canonical(self) -> None:
        assert _CANONICAL_RE.match(_WEBHOOK_DELIVERY_ID)

    def test_task_id_canonical(self) -> None:
        assert _CANONICAL_RE.match(_TASK_ID)

    def test_reservation_id_canonical(self) -> None:
        assert _CANONICAL_RE.match(_RESERVATION_ID)
228
229
230 # ===========================================================================
231 # Tier 2 — determinism and collision resistance
232 # ===========================================================================
233
class TestDeterminism:
    """Tier 2: same inputs give the same ID; a single-field change gives a different ID."""

    def test_identity_id_deterministic(self) -> None:
        assert compute_identity_id(_PUBKEY) == compute_identity_id(_PUBKEY)

    def test_repo_id_deterministic(self) -> None:
        args = (_IDENTITY_ID, "repo", "code", "2026-01-01T00:00:00Z")
        assert compute_repo_id(*args) == compute_repo_id(*args)

    def test_issue_id_deterministic(self) -> None:
        args = (_REPO_ID, _IDENTITY_ID, "2026-01-02T00:00:00Z")
        assert compute_issue_id(*args) == compute_issue_id(*args)

    def test_different_pubkeys_yield_different_identity_ids(self) -> None:
        pk_a = bytes(range(32))
        pk_b = bytes(range(1, 33))
        assert compute_identity_id(pk_a) != compute_identity_id(pk_b)

    def test_different_slugs_yield_different_repo_ids(self) -> None:
        a = compute_repo_id(_IDENTITY_ID, "repo-a", "code", "2026-01-01T00:00:00Z")
        b = compute_repo_id(_IDENTITY_ID, "repo-b", "code", "2026-01-01T00:00:00Z")
        assert a != b

    def test_different_domains_yield_different_repo_ids(self) -> None:
        a = compute_repo_id(_IDENTITY_ID, "repo", "code", "2026-01-01T00:00:00Z")
        b = compute_repo_id(_IDENTITY_ID, "repo", "music", "2026-01-01T00:00:00Z")
        assert a != b

    def test_different_timestamps_yield_different_issue_ids(self) -> None:
        a = compute_issue_id(_REPO_ID, _IDENTITY_ID, "2026-01-01T00:00:00Z")
        b = compute_issue_id(_REPO_ID, _IDENTITY_ID, "2026-01-02T00:00:00Z")
        assert a != b

    def test_different_branches_yield_different_proposal_ids(self) -> None:
        a = compute_proposal_id(_REPO_ID, _IDENTITY_ID, "feat/a", "main", "2026-01-01T00:00:00Z")
        b = compute_proposal_id(_REPO_ID, _IDENTITY_ID, "feat/b", "main", "2026-01-01T00:00:00Z")
        assert a != b

    def test_different_tags_yield_different_release_ids(self) -> None:
        a = compute_release_id(_REPO_ID, "v1.0.0", "2026-01-01T00:00:00Z")
        b = compute_release_id(_REPO_ID, "v2.0.0", "2026-01-01T00:00:00Z")
        assert a != b

    def test_different_labels_yield_different_tag_ids(self) -> None:
        a = compute_tag_id(_REPO_ID, _COMMIT_ID, "emotion:joyful", "2026-01-01T00:00:00Z")
        b = compute_tag_id(_REPO_ID, _COMMIT_ID, "emotion:melancholic", "2026-01-01T00:00:00Z")
        assert a != b

    def test_different_content_yields_different_mist_ids(self) -> None:
        assert compute_mist_id(b"hello") != compute_mist_id(b"world")

    def test_all_entity_ids_are_distinct(self) -> None:
        """All entity IDs computed from their respective genesis contexts are unique.

        Covers every genesis-addressed fixture ID defined at module level.
        ``_COMMIT_ID`` is left out because it is a ``fake_id`` stub rather than
        the output of a ``compute_*`` function.
        """
        named_ids = {
            "identity": _IDENTITY_ID,
            "repo": _REPO_ID,
            "issue": _ISSUE_ID,
            "proposal": _PROPOSAL_ID,
            "release": _RELEASE_ID,
            "tag": _TAG_ID,
            "session": _SESSION_ID,
            "mist": _MIST_ID,
            "comment": _COMMENT_ID,
            "review": _REVIEW_ID,
            "label": _LABEL_ID,
            "asset": _ASSET_ID,
            "webhook": _WEBHOOK_ID,
            "fork_repo": _FORK_REPO_ID,
            "fork": _FORK_ID,
            "collab_identity": _COLLAB_IDENTITY_ID,
            "collaborator": _COLLABORATOR_ID,
            "key": _KEY_ID,
            "domain": _DOMAIN_ID,
            "bridge_mirror": _BRIDGE_MIRROR_ID,
            "domain_install": _DOMAIN_INSTALL_ID,
            "issue_event": _ISSUE_EVENT_ID,
            "job": _JOB_ID,
            "webhook_delivery": _WEBHOOK_DELIVERY_ID,
            "task": _TASK_ID,
            "reservation": _RESERVATION_ID,
        }
        # Track which fixture first produced each ID so a failure names both colliding entities.
        first_owner: dict[str, str] = {}
        for name, entity_id in named_ids.items():
            assert entity_id not in first_owner, (
                f"two entity IDs collided: {first_owner.get(entity_id)} and {name}"
            )
            first_owner[entity_id] = name
293
294
295 # ===========================================================================
296 # Tier 3 — separator injection safety
297 # ===========================================================================
298
class TestSeparatorInjection:
    """Tier 3: field values containing separator-like characters still hash to valid IDs."""

    def test_nul_byte_in_slug_does_not_collide(self) -> None:
        """A slug containing NUL + domain cannot be crafted to match a different (slug, domain) pair."""
        # With a visible separator such as "|", the fields ("a|b", "c") and
        # ("a", "b|c") would join to the same string. NUL is not expected inside
        # real field values, but the function must still return a valid ID when
        # one shows up.
        # NOTE(review): the two inputs below place "extra" on different sides of
        # "code", so they would differ even under naive joining; a boundary-shift
        # pair such as ("repo\x00extra", "code") vs ("repo", "extra\x00code")
        # would be a stricter probe — confirm intent.
        created_at = "2026-01-01T00:00:00Z"
        nul_in_slug = compute_repo_id(_IDENTITY_ID, "repo\x00extra", "code", created_at)
        nul_in_domain = compute_repo_id(_IDENTITY_ID, "repo", "code\x00extra", created_at)
        assert _CANONICAL_RE.match(nul_in_slug)
        assert nul_in_slug != nul_in_domain, "NUL in field value must not produce collisions across fields"

    def test_pipe_in_label_is_safe(self) -> None:
        """A pipe character inside a tag label yields a canonical ID."""
        tag_id = compute_tag_id(_REPO_ID, _COMMIT_ID, "section|verse", "2026-01-01T00:00:00Z")
        assert _CANONICAL_RE.match(tag_id)

    def test_colon_in_label_is_safe(self) -> None:
        """Extra colons inside a tag label yield a canonical ID."""
        tag_id = compute_tag_id(_REPO_ID, _COMMIT_ID, "emotion:joyful:extra", "2026-01-01T00:00:00Z")
        assert _CANONICAL_RE.match(tag_id)

    def test_path_separator_in_branch_is_safe(self) -> None:
        """Slashes inside a branch name yield a canonical ID."""
        proposal_id = compute_proposal_id(
            _REPO_ID, _IDENTITY_ID, "feat/nested/branch", "main", "2026-01-01T00:00:00Z",
        )
        assert _CANONICAL_RE.match(proposal_id)

    def test_sha256_prefix_in_field_is_safe(self) -> None:
        """Entity IDs used as genesis fields (which start with sha256:) are handled correctly."""
        # Both parent IDs passed to compute_issue_id already carry an "<algo>:"
        # prefix; the derived ID must still come out in canonical form.
        parent_repo = compute_repo_id(_IDENTITY_ID, "test", "code", "2026-01-01T00:00:00Z")
        derived_issue = compute_issue_id(parent_repo, _IDENTITY_ID, "2026-01-01T00:00:00Z")
        assert _CANONICAL_RE.match(derived_issue)
329
330
331 # ===========================================================================
332 # Tier 3b — Pydantic boundary enforcement
333 # ===========================================================================
334
# Inputs shared by the Pydantic boundary tests below.
_VALID_ID = fake_id("valid-id-stub")
_FUTURE_ID = "blake3:" + "b" * 64  # algo-agnostic: must also be accepted
# Strings that must be rejected wherever a canonical ID is required.
_BAD_IDS = [
    "not-a-sha",  # no "<algo>:" separator at all
    "a1b2c3d4-e5f6-7890-abcd-ef1234567890",  # UUID-shaped string, no "<algo>:" prefix
    f"SHA256:{'a' * 64}",  # uppercase algo
    f"sha256:{'A' * 64}",  # uppercase hex
    "",  # empty string
    "sha256:tooshort",  # fewer than 32 characters after the colon, and not hex
]
# Fixed timezone-aware timestamp for response models that require created_at.
_DT = datetime(2026, 1, 1, tzinfo=timezone.utc)
346
347
class TestPydanticBoundaryWireModels:
    """Wire models reject non-canonical IDs; accept valid canonical forms."""

    def test_wire_commit_rejects_bad_commit_id(self) -> None:
        """Every entry of _BAD_IDS is refused as a commit_id."""
        for malformed in _BAD_IDS:
            with pytest.raises(ValidationError):
                WireCommit(commit_id=malformed)

    def test_wire_commit_accepts_future_algo(self) -> None:
        """A non-sha256 algo prefix is accepted and stored unchanged."""
        commit = WireCommit(commit_id=_FUTURE_ID)
        assert commit.commit_id == _FUTURE_ID

    def test_wire_snapshot_rejects_bad_id(self) -> None:
        """A prefix-less string is refused as a snapshot_id."""
        with pytest.raises(ValidationError):
            WireSnapshot(snapshot_id="bad-id")

    def test_wire_snapshot_accepts_future_algo(self) -> None:
        """A non-sha256 algo prefix is accepted and stored unchanged."""
        snapshot = WireSnapshot(snapshot_id=_FUTURE_ID)
        assert snapshot.snapshot_id == _FUTURE_ID

    def test_wire_object_rejects_bad_object_id(self) -> None:
        """A prefix-less string is refused as an object_id."""
        with pytest.raises(ValidationError):
            WireObject(object_id="bad", content=b"x")

    def test_wire_object_accepts_future_algo(self) -> None:
        """A non-sha256 algo prefix is accepted and stored unchanged."""
        wire_object = WireObject(object_id=_FUTURE_ID, content=b"x")
        assert wire_object.object_id == _FUTURE_ID

    def test_fetch_request_rejects_bad_want(self) -> None:
        """One malformed entry in `want` invalidates the request."""
        with pytest.raises(ValidationError):
            WireFetchRequest(want=["not-valid"], have=[])

    def test_fetch_request_rejects_bad_have(self) -> None:
        """One malformed entry in `have` invalidates the request."""
        with pytest.raises(ValidationError):
            WireFetchRequest(want=[], have=["not-a-content-id"])

    def test_fetch_request_accepts_valid_and_future_ids(self) -> None:
        """Canonical IDs of either algo are kept in `want`."""
        request = WireFetchRequest(want=[_VALID_ID, _FUTURE_ID], have=[_VALID_ID])
        assert len(request.want) == 2
387
class TestPydanticBoundaryResponseModels:
    """Response models reject non-canonical IDs (catches service layer bugs)."""

    @staticmethod
    def _review_fields(review_id: str, proposal_id: str) -> dict:
        """Keyword arguments for ProposalReviewResponse with the two ID fields filled in."""
        return {
            "id": review_id,
            "proposal_id": proposal_id,
            "reviewer_username": "gabriel",
            "state": "approved",
            "created_at": _DT,
        }

    @staticmethod
    def _asset_fields(asset_id: str, release_id: str) -> dict:
        """Keyword arguments for ReleaseAssetResponse with the two ID fields filled in."""
        return {
            "asset_id": asset_id,
            "release_id": release_id,
            "name": "f.tar.gz",
            "download_url": "https://x.com/f",
            "created_at": _DT,
        }

    def test_proposal_review_rejects_bad_id(self) -> None:
        with pytest.raises(ValidationError):
            ProposalReviewResponse(**self._review_fields("bad-id", _VALID_ID))

    def test_proposal_review_rejects_bad_proposal_id(self) -> None:
        with pytest.raises(ValidationError):
            ProposalReviewResponse(**self._review_fields(_VALID_ID, "not-canonical"))

    def test_proposal_review_accepts_future_algo(self) -> None:
        review = ProposalReviewResponse(**self._review_fields(_FUTURE_ID, _FUTURE_ID))
        assert review.id == _FUTURE_ID

    def test_release_asset_rejects_bad_asset_id(self) -> None:
        with pytest.raises(ValidationError):
            ReleaseAssetResponse(**self._asset_fields("bad", _VALID_ID))

    def test_release_asset_rejects_bad_release_id(self) -> None:
        with pytest.raises(ValidationError):
            ReleaseAssetResponse(**self._asset_fields(_VALID_ID, "not-an-id"))

    def test_release_asset_accepts_future_algo(self) -> None:
        asset = ReleaseAssetResponse(**self._asset_fields(_FUTURE_ID, _FUTURE_ID))
        assert asset.release_id == _FUTURE_ID

    def test_wire_tag_rejects_bad_tag_id(self) -> None:
        with pytest.raises(ValidationError):
            WireTagInput(tag_id="bad", commit_id=_VALID_ID, tag="section:verse")

    def test_wire_tag_rejects_bad_commit_id(self) -> None:
        with pytest.raises(ValidationError):
            WireTagInput(tag_id=_VALID_ID, commit_id="bad", tag="section:verse")

    def test_wire_tag_accepts_future_algo(self) -> None:
        wire_tag = WireTagInput(tag_id=_FUTURE_ID, commit_id=_FUTURE_ID, tag="section:verse")
        assert wire_tag.tag_id == _FUTURE_ID
444
445
class TestPydanticBoundaryOptionalFields:
    """Optional genesis ID fields accept None, valid IDs, reject bad strings."""

    # -- CreateRepoRequest.template_repo_id ---------------------------------

    def test_create_repo_template_none(self) -> None:
        request = CreateRepoRequest(name="muse", owner="gabriel")
        assert request.template_repo_id is None

    def test_create_repo_template_valid(self) -> None:
        request = CreateRepoRequest(name="muse", owner="gabriel", template_repo_id=_VALID_ID)
        assert request.template_repo_id == _VALID_ID

    def test_create_repo_template_future_algo(self) -> None:
        request = CreateRepoRequest(name="muse", owner="gabriel", template_repo_id=_FUTURE_ID)
        assert request.template_repo_id == _FUTURE_ID

    def test_create_repo_template_bad(self) -> None:
        with pytest.raises(ValidationError):
            CreateRepoRequest(name="muse", owner="gabriel", template_repo_id="bad-format")

    # -- IssueCommentCreate.parent_id ---------------------------------------

    def test_issue_comment_parent_none(self) -> None:
        comment = IssueCommentCreate(body="hi")
        assert comment.parent_id is None

    def test_issue_comment_parent_valid(self) -> None:
        reply = IssueCommentCreate(body="reply", parent_id=_VALID_ID)
        assert reply.parent_id == _VALID_ID

    def test_issue_comment_parent_future_algo(self) -> None:
        reply = IssueCommentCreate(body="reply", parent_id=_FUTURE_ID)
        assert reply.parent_id == _FUTURE_ID

    def test_issue_comment_parent_bad(self) -> None:
        with pytest.raises(ValidationError):
            IssueCommentCreate(body="reply", parent_id="a1b2c3d4-bad")

    # -- ProposalCommentCreate.parent_comment_id ----------------------------

    def test_proposal_comment_parent_none(self) -> None:
        comment = ProposalCommentCreate(body="hi")
        assert comment.parent_comment_id is None

    def test_proposal_comment_parent_valid(self) -> None:
        reply = ProposalCommentCreate(body="reply", parent_comment_id=_VALID_ID)
        assert reply.parent_comment_id == _VALID_ID

    def test_proposal_comment_parent_future_algo(self) -> None:
        reply = ProposalCommentCreate(body="reply", parent_comment_id=_FUTURE_ID)
        assert reply.parent_comment_id == _FUTURE_ID

    def test_proposal_comment_parent_bad_format(self) -> None:
        uuid_shaped = "a1b2c3d4-e5f6-7890-abcd-ef1234567890"
        with pytest.raises(ValidationError):
            ProposalCommentCreate(body="reply", parent_comment_id=uuid_shaped)
497
498
499 # ===========================================================================
500 # Tier 4 — cross-verification (CLI ↔ hub)
501 # ===========================================================================
502
@pytest.mark.skipif(not _CLI_GENESIS_AVAILABLE, reason="muse.core.genesis not yet implemented in CLI")
class TestCrossVerification:
    """Tier 4: hub and CLI genesis functions agree for identical inputs."""

    def test_tag_id_cli_and_hub_agree(self) -> None:
        genesis_fields = (_REPO_ID, _COMMIT_ID, "emotion:joyful", "2026-01-01T00:00:00Z")
        hub_id = compute_tag_id(*genesis_fields)
        cli_id = cli_compute_tag_id(*genesis_fields)
        assert hub_id == cli_id

    def test_release_id_cli_and_hub_agree(self) -> None:
        genesis_fields = (_REPO_ID, "v1.0.0", "2026-01-01T00:00:00Z")
        hub_id = compute_release_id(*genesis_fields)
        cli_id = cli_compute_release_id(*genesis_fields)
        assert hub_id == cli_id

    def test_tag_id_cli_returns_canonical(self) -> None:
        cli_id = cli_compute_tag_id(_REPO_ID, _COMMIT_ID, "v1.0-wip", "2026-01-01T00:00:00Z")
        assert _CANONICAL_RE.match(cli_id)

    def test_release_id_cli_returns_canonical(self) -> None:
        cli_id = cli_compute_release_id(_REPO_ID, "v2.0.0", "2026-01-01T00:00:00Z")
        assert _CANONICAL_RE.match(cli_id)
521
522
523 # ===========================================================================
524 # Tier 5 — derivation chain
525 # ===========================================================================
526
class TestDerivationChain:
    """Tier 5: changing a parent ID changes every ID derived from it."""

    def test_changing_owner_changes_repo_id(self) -> None:
        owner_a = compute_identity_id(bytes(range(32)))
        owner_b = compute_identity_id(bytes(range(1, 33)))
        created_at = "2026-01-01T00:00:00Z"
        assert (
            compute_repo_id(owner_a, "repo", "code", created_at)
            != compute_repo_id(owner_b, "repo", "code", created_at)
        )

    def test_changing_repo_changes_issue_id(self) -> None:
        first_repo = compute_repo_id(_IDENTITY_ID, "repo-a", "code", "2026-01-01T00:00:00Z")
        second_repo = compute_repo_id(_IDENTITY_ID, "repo-b", "code", "2026-01-01T00:00:00Z")
        opened_at = "2026-01-02T00:00:00Z"
        assert (
            compute_issue_id(first_repo, _IDENTITY_ID, opened_at)
            != compute_issue_id(second_repo, _IDENTITY_ID, opened_at)
        )

    def test_changing_issue_changes_comment_id(self) -> None:
        first_issue = compute_issue_id(_REPO_ID, _IDENTITY_ID, "2026-01-01T00:00:00Z")
        second_issue = compute_issue_id(_REPO_ID, _IDENTITY_ID, "2026-01-02T00:00:00Z")
        commented_at = "2026-01-03T00:00:00Z"
        assert (
            compute_comment_id(first_issue, _IDENTITY_ID, commented_at)
            != compute_comment_id(second_issue, _IDENTITY_ID, commented_at)
        )

    def test_changing_proposal_changes_review_id(self) -> None:
        first_proposal = compute_proposal_id(_REPO_ID, _IDENTITY_ID, "feat/a", "main", "2026-01-01T00:00:00Z")
        second_proposal = compute_proposal_id(_REPO_ID, _IDENTITY_ID, "feat/b", "main", "2026-01-01T00:00:00Z")
        reviewed_at = "2026-01-02T00:00:00Z"
        assert (
            compute_review_id(first_proposal, _IDENTITY_ID, reviewed_at)
            != compute_review_id(second_proposal, _IDENTITY_ID, reviewed_at)
        )

    def test_identity_repo_issue_comment_chain_is_fully_verifiable(self) -> None:
        """Full four-level chain: identity → repo → issue → comment."""

        def derive_chain(pubkey: bytes) -> tuple[str, str, str, str]:
            # Each level feeds the ID from the level above into the next compute_* call.
            identity = compute_identity_id(pubkey)
            repo = compute_repo_id(identity, "chain-test", "code", "2026-01-01T00:00:00Z")
            issue = compute_issue_id(repo, identity, "2026-01-02T00:00:00Z")
            comment = compute_comment_id(issue, identity, "2026-01-03T00:00:00Z")
            return identity, repo, issue, comment

        baseline = derive_chain(bytes(range(32)))

        # Every level is canonical
        for entity_id in baseline:
            assert _CANONICAL_RE.match(entity_id), f"non-canonical: {entity_id}"

        # Mutating the pubkey propagates through the entire chain
        mutated = derive_chain(bytes(range(1, 33)))
        for before, after in zip(baseline, mutated):
            assert before != after
580
581
582 # ===========================================================================
583 # Tier 6 — service-layer contract (unit, mocked DB)
584 #
585 # Every service function that creates a first-class entity must compute its ID
586 # from genesis context — not randomly generated. Tests here are RED until Phase 4
587 # is implemented and will stay GREEN thereafter.
588 # ===========================================================================
589
590
591 class TestServiceLayerGenesisIds:
592 """Service creation functions assign genesis-addressed IDs, never random IDs."""
593
594 # ------------------------------------------------------------------
595 # Helpers shared across tests
596 # ------------------------------------------------------------------
597
def _async_session(self, *, execute_returns: MagicMock | None = None) -> "AsyncMock":
    """Return a minimal AsyncMock DB session.

    Awaiting ``execute(...)`` yields a MagicMock whose ``scalar_one_or_none()``
    returns *execute_returns* and whose ``scalar_one()`` returns ``None``.
    ``flush``, ``commit`` and ``delete`` are AsyncMocks. ``refresh`` is a real
    coroutine that fills in missing timestamp attributes on the object it is given.
    """
    from unittest.mock import AsyncMock, MagicMock

    query_result = MagicMock()
    query_result.scalar_one_or_none.return_value = execute_returns
    query_result.scalar_one.return_value = None

    db = AsyncMock()
    db.execute.return_value = query_result
    db.flush = AsyncMock()
    db.commit = AsyncMock()
    db.delete = AsyncMock()

    async def _refresh(obj: MagicMock) -> None:
        from datetime import datetime, timezone

        # Falsy or missing timestamps are set to the current UTC time.
        for name in ("created_at", "updated_at"):
            if getattr(obj, name, None):
                continue
            try:
                setattr(obj, name, datetime.now(timezone.utc))
            except Exception:
                # Best effort: objects that refuse the assignment are left as they are.
                pass

        # last_used_at is only created when absent; an existing value is kept.
        if not hasattr(obj, "last_used_at"):
            try:
                setattr(obj, "last_used_at", None)
            except Exception:
                pass

    db.refresh = _refresh
    return db
628
def _keypair(self) -> "tuple[Ed25519PrivateKey, bytes]":
    """Generate a fresh Ed25519 keypair.

    Returns:
        ``(private_key, public_bytes)`` where *public_bytes* is the public key
        serialized with ``Encoding.Raw`` / ``PublicFormat.Raw``.
    """
    # The return annotation is a string because Ed25519PrivateKey is only
    # imported inside this method.
    from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
    from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat

    private_key = Ed25519PrivateKey.generate()
    public_bytes = private_key.public_key().public_bytes(Encoding.Raw, PublicFormat.Raw)
    return private_key, public_bytes
635
636 # ------------------------------------------------------------------
637 # 6.1 Identity — register_agent_identity
638 # ------------------------------------------------------------------
639
@pytest.mark.asyncio
async def test_register_agent_identity_uses_compute_identity_id(self) -> None:
    """register_agent_identity assigns identity_id = compute_identity_id(pub_bytes)."""
    from muse.core.types import encode_pubkey, public_key_fingerprint
    from musehub.services.musehub_auth import register_agent_identity

    _private_key, public_bytes = self._keypair()
    expected_id = compute_identity_id(public_bytes)
    encoded_key = encode_pubkey("ed25519", public_bytes)
    fingerprint = public_key_fingerprint(public_bytes)

    db = self._async_session()
    await register_agent_identity(
        session=db,
        handle="test-agent",
        public_key_b64=encoded_key,
        fingerprint=fingerprint,
        algorithm="ed25519",
        spawned_by="gabriel",
    )

    # First add() is the MusehubIdentity row.
    identity_row = db.add.call_args_list[0].args[0]
    assert identity_row.identity_id == expected_id, (
        f"expected genesis ID {expected_id!r}, got {identity_row.identity_id!r}"
    )
    assert _CANONICAL_RE.match(identity_row.identity_id)
667
668 # ------------------------------------------------------------------
669 # 6.2 Session — upsert_session
670 # ------------------------------------------------------------------
671
@pytest.mark.asyncio
async def test_upsert_session_uses_compute_session_id(self) -> None:
    """upsert_session assigns session_id = compute_session_id(repo_id, author_identity_id, started_at)."""
    from unittest.mock import MagicMock, patch
    from datetime import datetime, timezone
    from musehub.models.musehub import SessionCreate
    from musehub.services.musehub_sessions import upsert_session

    began_at = datetime(2026, 1, 6, 0, 0, 0, tzinfo=timezone.utc)
    # The expected ID hashes the ISO-8601 rendering of the start time.
    expected_id = compute_session_id(_REPO_ID, _IDENTITY_ID, began_at.isoformat())

    payload = SessionCreate(started_at=began_at, participants=[], intent="", location="")
    db = self._async_session()

    # _to_response is stubbed so only the row handed to add() is under test.
    with patch("musehub.services.musehub_sessions._to_response", return_value=MagicMock()):
        await upsert_session(
            db,
            repo_id=_REPO_ID,
            author_identity_id=_IDENTITY_ID,
            data=payload,
        )

    persisted = db.add.call_args_list[0].args[0]
    assert persisted.session_id == expected_id
    assert _CANONICAL_RE.match(persisted.session_id)
699
700 # ------------------------------------------------------------------
701 # 6.3 Issue — create_issue
702 # ------------------------------------------------------------------
703
@pytest.mark.asyncio
async def test_create_issue_uses_compute_issue_id(self) -> None:
    """create_issue assigns issue_id = compute_issue_id(repo_id, author_identity_id, created_at)."""
    from unittest.mock import patch
    from datetime import datetime, timezone
    from musehub.services import musehub_issues

    frozen_now = datetime(2026, 1, 2, 0, 0, 0, tzinfo=timezone.utc)
    expected_id = compute_issue_id(_REPO_ID, _IDENTITY_ID, frozen_now.isoformat())

    db = self._async_session()
    # _next_issue_number calls session.execute(...).scalar_one_or_none()
    # Return None so next number = 1.
    db.execute.return_value.scalar_one_or_none.return_value = None

    with patch("musehub.services.musehub_issues.datetime") as datetime_stub:
        # now() is pinned; calling the stub like the class still builds real datetimes.
        datetime_stub.now.return_value = frozen_now
        datetime_stub.side_effect = lambda *a, **kw: datetime(*a, **kw)

        await musehub_issues.create_issue(
            db,
            repo_id=_REPO_ID,
            title="Test issue",
            body="body",
            labels=[],
            author="gabriel",
            author_identity_id=_IDENTITY_ID,
        )

    persisted = db.add.call_args_list[0].args[0]
    assert persisted.issue_id == expected_id
    assert _CANONICAL_RE.match(persisted.issue_id)
738
739 # ------------------------------------------------------------------
740 # 6.4 Proposal — create_proposal
741 # ------------------------------------------------------------------
742
@pytest.mark.asyncio
async def test_create_proposal_uses_compute_proposal_id(self) -> None:
    """create_proposal assigns proposal_id = compute_proposal_id(...)."""
    from unittest.mock import MagicMock, patch
    from datetime import datetime, timezone
    from musehub.services import musehub_proposals

    repo_id = _REPO_ID
    author_identity_id = _IDENTITY_ID
    from_branch = "feat/x"
    to_branch = "main"
    fixed_now = datetime(2026, 1, 3, 0, 0, 0, tzinfo=timezone.utc)
    expected = compute_proposal_id(
        repo_id, author_identity_id, from_branch, to_branch, fixed_now.isoformat(),
    )

    # _get_branch makes a DB call — return a fake branch row.
    fake_branch = MagicMock()
    fake_branch.head_commit_id = fake_id("branch-head-stub")

    session = self._async_session()
    # One queued result per awaited session.execute(), consumed in order.
    # NOTE(review): the order is presumed to mirror create_proposal's query
    # sequence — verify against the service if this test starts failing.
    session.execute.side_effect = [
        MagicMock(**{"scalar_one_or_none.return_value": fake_branch}),  # _get_branch(from_branch)
        MagicMock(**{"scalar_one_or_none.return_value": None}),  # _get_branch(to_branch)
        MagicMock(**{"scalar_one_or_none.return_value": None}),  # max proposal_number
        MagicMock(**{"scalars.return_value.all.return_value": []}),  # _touched_symbols
    ]

    # Pin the service's clock so the created_at genesis field is known in advance.
    with patch("musehub.services.musehub_proposals._utc_now", return_value=fixed_now):
        await musehub_proposals.create_proposal(
            session,
            repo_id=repo_id,
            title="Test proposal",
            from_branch=from_branch,
            to_branch=to_branch,
            body="",
            author="gabriel",
            author_identity_id=author_identity_id,
        )

    added = session.add.call_args_list[0][0][0]
    assert added.proposal_id == expected
    assert _CANONICAL_RE.match(added.proposal_id)
790
791 # ------------------------------------------------------------------
792 # 6.5 Repo — create_repo
793 # ------------------------------------------------------------------
794
@pytest.mark.asyncio
async def test_create_repo_uses_compute_repo_id(self) -> None:
    """create_repo assigns repo_id = compute_repo_id(owner_identity_id, slug, domain, created_at).

    The expected ID is derived from ``owner_identity_id`` (not the separate
    ``owner_user_id`` argument that is also passed to ``create_repo``), the
    slug ``"my-repo"``, the domain, and a frozen timestamp.  The first
    object handed to ``session.add`` must carry exactly that ID.
    """
    from datetime import datetime, timezone
    from unittest.mock import AsyncMock, MagicMock, patch

    from musehub.services import musehub_repository

    owner_identity_id = _IDENTITY_ID
    # Presumably the slug create_repo derives from name="My Repo" — the
    # test passes no explicit slug; verify against the service.
    slug = "my-repo"
    domain = "code"
    fixed_now = datetime(2026, 1, 1, 0, 0, 0, tzinfo=timezone.utc)
    expected = compute_repo_id(owner_identity_id, slug, domain, fixed_now.isoformat())

    session = self._async_session()
    # template lookup returns None; no other DB reads needed.
    session.get = AsyncMock(return_value=None)

    with patch("musehub.services.musehub_repository.datetime") as mock_dt, \
         patch("musehub.services.musehub_repository._to_repo_response", return_value=MagicMock()):
        # Freeze "now", but let constructor-style calls on the patched name
        # still build real datetime objects.
        mock_dt.now.return_value = fixed_now
        mock_dt.side_effect = lambda *a, **kw: datetime(*a, **kw)

        await musehub_repository.create_repo(
            session,
            name="My Repo",
            owner="gabriel",
            visibility="public",
            owner_user_id=compute_identity_id(b"gabriel"),
            owner_identity_id=owner_identity_id,
            domain=domain,
        )

    # call_args_list[0] is the first add() call; [0] selects its positional
    # arguments; the final [0] is the object that was added.
    added = session.add.call_args_list[0][0][0]
    assert added.repo_id == expected
    assert _CANONICAL_RE.match(added.repo_id)
833
834
835 # ===========================================================================
836 # Phase 2 — Tier 1: canonical form for new genesis functions
837 # ===========================================================================
838
839
class TestPhase2CanonicalForm:
    """Tier 1 checks: every Phase 2 fixture ID matches ``_CANONICAL_RE``."""

    def test_label_id_canonical(self) -> None:
        """The label fixture ID is in canonical form."""
        matched = _CANONICAL_RE.match(_LABEL_ID)
        assert matched

    def test_asset_id_canonical(self) -> None:
        """The asset fixture ID is in canonical form."""
        matched = _CANONICAL_RE.match(_ASSET_ID)
        assert matched

    def test_webhook_id_canonical(self) -> None:
        """The webhook fixture ID is in canonical form."""
        matched = _CANONICAL_RE.match(_WEBHOOK_ID)
        assert matched

    def test_fork_id_canonical(self) -> None:
        """The fork fixture ID is in canonical form."""
        matched = _CANONICAL_RE.match(_FORK_ID)
        assert matched

    def test_collaborator_id_canonical(self) -> None:
        """The collaborator fixture ID is in canonical form."""
        matched = _CANONICAL_RE.match(_COLLABORATOR_ID)
        assert matched

    def test_key_id_canonical(self) -> None:
        """The key fixture ID is in canonical form."""
        matched = _CANONICAL_RE.match(_KEY_ID)
        assert matched

    def test_domain_id_canonical(self) -> None:
        """The domain fixture ID is in canonical form."""
        matched = _CANONICAL_RE.match(_DOMAIN_ID)
        assert matched
862
863
864 # ===========================================================================
865 # Phase 2 — Tier 2: determinism and collision resistance
866 # ===========================================================================
867
868
class TestPhase2Determinism:
    """Tier 2 checks for the Phase 2 ``compute_*`` functions.

    Each entity gets a repeatability test (same genesis fields, same ID) and
    at least one sensitivity test (changing a single field changes the ID).
    The last two tests check that the module-level fixture IDs do not
    collide with each other or with the Phase 1 fixture IDs.
    """

    def test_label_id_deterministic(self) -> None:
        """Hashing the same label genesis fields twice gives the same ID."""
        genesis = (_REPO_ID, "bug", "2026-01-09T00:00:00Z")
        first = compute_label_id(*genesis)
        second = compute_label_id(*genesis)
        assert first == second

    def test_label_different_names_differ(self) -> None:
        """Changing only the label name changes the ID."""
        bug_id = compute_label_id(_REPO_ID, "bug", "2026-01-09T00:00:00Z")
        enhancement_id = compute_label_id(_REPO_ID, "enhancement", "2026-01-09T00:00:00Z")
        assert bug_id != enhancement_id

    def test_label_different_repos_differ(self) -> None:
        """Changing only the parent repo ID changes the label ID."""
        other_repo = compute_repo_id(_IDENTITY_ID, "other-repo", "code", "2026-01-01T00:00:00Z")
        in_fixture_repo = compute_label_id(_REPO_ID, "bug", "2026-01-09T00:00:00Z")
        in_other_repo = compute_label_id(other_repo, "bug", "2026-01-09T00:00:00Z")
        assert in_fixture_repo != in_other_repo

    def test_asset_id_deterministic(self) -> None:
        """Hashing the same asset genesis fields twice gives the same ID."""
        genesis = (_RELEASE_ID, "v1.0.0-linux-amd64.tar.gz", "2026-01-10T00:00:00Z")
        first = compute_asset_id(*genesis)
        second = compute_asset_id(*genesis)
        assert first == second

    def test_asset_different_filenames_differ(self) -> None:
        """Changing only the asset filename changes the ID."""
        linux_id = compute_asset_id(_RELEASE_ID, "linux.tar.gz", "2026-01-10T00:00:00Z")
        darwin_id = compute_asset_id(_RELEASE_ID, "darwin.tar.gz", "2026-01-10T00:00:00Z")
        assert linux_id != darwin_id

    def test_webhook_id_deterministic(self) -> None:
        """Hashing the same webhook genesis fields twice gives the same ID."""
        genesis = (_REPO_ID, "https://ci.example.com/hook", "2026-01-11T00:00:00Z")
        first = compute_webhook_id(*genesis)
        second = compute_webhook_id(*genesis)
        assert first == second

    def test_webhook_different_urls_differ(self) -> None:
        """Changing only the webhook URL changes the ID."""
        hook_a = compute_webhook_id(_REPO_ID, "https://ci.example.com/a", "2026-01-11T00:00:00Z")
        hook_b = compute_webhook_id(_REPO_ID, "https://ci.example.com/b", "2026-01-11T00:00:00Z")
        assert hook_a != hook_b

    def test_fork_id_deterministic(self) -> None:
        """Hashing the same fork genesis fields twice gives the same ID."""
        genesis = (_REPO_ID, _FORK_REPO_ID, "2026-01-13T00:00:00Z")
        first = compute_fork_id(*genesis)
        second = compute_fork_id(*genesis)
        assert first == second

    def test_fork_different_source_repos_differ(self) -> None:
        """Changing only the source repo ID changes the fork ID."""
        other_repo = compute_repo_id(_IDENTITY_ID, "other-repo", "code", "2026-01-01T00:00:00Z")
        from_fixture_repo = compute_fork_id(_REPO_ID, _FORK_REPO_ID, "2026-01-13T00:00:00Z")
        from_other_repo = compute_fork_id(other_repo, _FORK_REPO_ID, "2026-01-13T00:00:00Z")
        assert from_fixture_repo != from_other_repo

    def test_collaborator_id_deterministic(self) -> None:
        """Hashing the same collaborator genesis fields twice gives the same ID."""
        genesis = (_REPO_ID, _COLLAB_IDENTITY_ID, "2026-01-14T00:00:00Z")
        first = compute_collaborator_id(*genesis)
        second = compute_collaborator_id(*genesis)
        assert first == second

    def test_collaborator_different_identities_differ(self) -> None:
        """Changing only the collaborator's identity ID changes the ID."""
        other_identity = compute_identity_id(bytes(range(2, 34)))
        for_fixture_identity = compute_collaborator_id(_REPO_ID, _COLLAB_IDENTITY_ID, "2026-01-14T00:00:00Z")
        for_other_identity = compute_collaborator_id(_REPO_ID, other_identity, "2026-01-14T00:00:00Z")
        assert for_fixture_identity != for_other_identity

    def test_key_id_deterministic(self) -> None:
        """Hashing the same (identity, public key) pair twice gives the same ID."""
        genesis = (_IDENTITY_ID, "ed25519:AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA")
        first = compute_key_id(*genesis)
        second = compute_key_id(*genesis)
        assert first == second

    def test_key_different_pubkeys_differ(self) -> None:
        """Changing only the public key changes the key ID."""
        key_a = compute_key_id(_IDENTITY_ID, "ed25519:AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA")
        key_b = compute_key_id(_IDENTITY_ID, "ed25519:BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB")
        assert key_a != key_b

    def test_key_no_timestamp_by_design(self) -> None:
        """compute_key_id takes only (identity, pubkey): no timestamp argument is passed.

        With no time component, registering the same key for the same
        identity always maps to one ID (idempotent registration).
        """
        pubkey = "ed25519:AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
        first_registration = compute_key_id(_IDENTITY_ID, pubkey)
        second_registration = compute_key_id(_IDENTITY_ID, pubkey)
        assert first_registration == second_registration

    def test_domain_id_deterministic(self) -> None:
        """Hashing the same domain genesis fields twice gives the same ID."""
        genesis = ("gabriel", "code", "2026-01-15T00:00:00Z")
        first = compute_domain_id(*genesis)
        second = compute_domain_id(*genesis)
        assert first == second

    def test_domain_different_slugs_differ(self) -> None:
        """Changing only the domain slug changes the ID."""
        code_id = compute_domain_id("gabriel", "code", "2026-01-15T00:00:00Z")
        music_id = compute_domain_id("gabriel", "music", "2026-01-15T00:00:00Z")
        assert code_id != music_id

    def test_all_phase2_ids_are_distinct(self) -> None:
        """No two Phase 2 fixture IDs are equal."""
        fixture_ids = [_LABEL_ID, _ASSET_ID, _WEBHOOK_ID, _FORK_ID, _COLLABORATOR_ID, _KEY_ID, _DOMAIN_ID]
        unique_ids = set(fixture_ids)
        assert len(fixture_ids) == len(unique_ids), "two Phase 2 entity IDs collided"

    def test_phase2_ids_distinct_from_phase1_ids(self) -> None:
        """No Phase 2 fixture ID equals any Phase 1 fixture ID."""
        phase1 = {
            _IDENTITY_ID,
            _REPO_ID,
            _ISSUE_ID,
            _PROPOSAL_ID,
            _RELEASE_ID,
            _TAG_ID,
            _SESSION_ID,
            _MIST_ID,
            _COMMENT_ID,
            _REVIEW_ID,
        }
        phase2 = {_LABEL_ID, _ASSET_ID, _WEBHOOK_ID, _FORK_ID, _COLLABORATOR_ID, _KEY_ID, _DOMAIN_ID}
        overlap = phase1 & phase2
        assert not overlap, "a Phase 2 ID collided with a Phase 1 ID"
957
958
959 # ===========================================================================
960 # Phase 2 — Tier 3: separator injection safety
961 # ===========================================================================
962
963
class TestPhase2SeparatorInjection:
    """Tier 3 checks: Phase 2 ``compute_*`` functions tolerate separator-like characters.

    Most tests only require that an exotic input still yields a canonical
    ID; the fork test additionally requires two NUL-bearing inputs to hash
    differently.
    """

    def test_label_nul_in_name_is_safe_canonical(self) -> None:
        """A label name containing a NUL byte still yields a canonical ID."""
        # NUL inside a field value is structurally unusual, but the result
        # must keep the canonical shape regardless.
        label_id = compute_label_id(_REPO_ID, "bug\x00feature", "2026-01-09T00:00:00Z")
        assert _CANONICAL_RE.match(label_id)

    def test_webhook_url_with_colons_is_safe(self) -> None:
        """A webhook URL containing several colons still yields a canonical ID."""
        # NOTE(review): "[email protected]" looks like an email-obfuscation
        # artifact rather than the intended userinfo/host — confirm against
        # the repository copy of this file.
        hook_url = "https://user:[email protected]:8080/hook"
        webhook_id = compute_webhook_id(_REPO_ID, hook_url, "2026-01-11T00:00:00Z")
        assert _CANONICAL_RE.match(webhook_id)

    def test_domain_slug_with_path_separator_is_safe(self) -> None:
        """A domain slug containing ``/`` still yields a canonical ID."""
        domain_id = compute_domain_id("gabriel", "audio/midi", "2026-01-15T00:00:00Z")
        assert _CANONICAL_RE.match(domain_id)

    def test_key_id_with_base64_padding_chars_is_safe(self) -> None:
        """A public key string containing ``+``, ``/`` and ``=`` still yields a canonical ID."""
        padded_key = "ed25519:ABC+/DEF==padded=="
        key_id = compute_key_id(_IDENTITY_ID, padded_key)
        assert _CANONICAL_RE.match(key_id)

    def test_fork_nul_in_repo_id_does_not_collide(self) -> None:
        """Attaching the same NUL-bearing fragment to different fields gives different fork IDs."""
        # The fragment "\x00x" is appended to the source repo ID in one call
        # and prepended to the fork repo ID in the other.
        # NOTE(review): joined with a plain NUL separator these two inputs
        # already produce different byte strings ("...\x00x\x00..." vs
        # "...\x00\x00x..."), so this does not exercise a boundary-shifting
        # collision — confirm that is the intended coverage.
        fragment_on_source = compute_fork_id(_REPO_ID + "\x00x", _FORK_REPO_ID, "2026-01-13T00:00:00Z")
        fragment_on_fork = compute_fork_id(_REPO_ID, "\x00x" + _FORK_REPO_ID, "2026-01-13T00:00:00Z")
        assert _CANONICAL_RE.match(fragment_on_source)
        assert fragment_on_source != fragment_on_fork
992
993
994 # ===========================================================================
995 # Phase 2 — Tier 3b: Pydantic boundary enforcement for Phase 2 models
996 # ===========================================================================
997
998
class TestPydanticBoundaryPhase2:
    """Tier 3b checks for the Phase 2 response models.

    For each model the tests construct it with a malformed primary ID, with
    a malformed ``repo_id`` (or nested repo ID), and with the ``_FUTURE_ID``
    fixture.  The first two must raise ``ValidationError``; the last must be
    accepted (per the test names, ``_FUTURE_ID`` stands for an ID with a
    not-yet-used algorithm prefix).
    """

    # LabelResponse ──────────────────────────────────────────────────────────

    def test_label_response_rejects_bad_label_id(self) -> None:
        """A non-canonical ``label_id`` is rejected."""
        with pytest.raises(ValidationError):
            LabelResponse(
                label_id="not-canonical",
                repo_id=_VALID_ID,
                name="bug",
                color="#d73a4a",
                description=None,
                created_at=_DT,
            )

    def test_label_response_rejects_bad_repo_id(self) -> None:
        """A non-canonical ``repo_id`` is rejected."""
        with pytest.raises(ValidationError):
            LabelResponse(
                label_id=_VALID_ID,
                repo_id="not-a-content-id",
                name="bug",
                color="#d73a4a",
                description=None,
                created_at=_DT,
            )

    def test_label_response_accepts_future_algo(self) -> None:
        """``_FUTURE_ID`` is accepted for both ID fields and stored unchanged."""
        r = LabelResponse(
            label_id=_FUTURE_ID,
            repo_id=_FUTURE_ID,
            name="bug",
            color="#d73a4a",
            description=None,
            created_at=_DT,
        )
        assert r.label_id == _FUTURE_ID

    # CollaboratorResponse ───────────────────────────────────────────────────

    def test_collaborator_response_rejects_bad_collaborator_id(self) -> None:
        """A non-canonical ``collaborator_id`` is rejected."""
        with pytest.raises(ValidationError):
            CollaboratorResponse(
                collaborator_id="bad-id",
                repo_id=_VALID_ID,
                handle="alice",
                permission="write",
                invited_by=None,
            )

    def test_collaborator_response_rejects_bad_repo_id(self) -> None:
        """A non-canonical ``repo_id`` is rejected."""
        with pytest.raises(ValidationError):
            CollaboratorResponse(
                collaborator_id=_VALID_ID,
                repo_id="not-genesis",
                handle="alice",
                permission="write",
                invited_by=None,
            )

    def test_collaborator_response_accepts_future_algo(self) -> None:
        """``_FUTURE_ID`` is accepted for both ID fields and stored unchanged."""
        r = CollaboratorResponse(
            collaborator_id=_FUTURE_ID,
            repo_id=_FUTURE_ID,
            handle="alice",
            permission="write",
            invited_by=None,
        )
        assert r.collaborator_id == _FUTURE_ID

    # WebhookResponse ────────────────────────────────────────────────────────

    def test_webhook_response_rejects_bad_webhook_id(self) -> None:
        """A non-canonical ``webhook_id`` is rejected."""
        with pytest.raises(ValidationError):
            WebhookResponse(
                webhook_id="not-an-id",
                repo_id=_VALID_ID,
                url="https://ci.example.com/hook",
                events=["push"],
                active=True,
                created_at=_DT,
                updated_at=_DT,
            )

    def test_webhook_response_rejects_bad_repo_id(self) -> None:
        """A non-canonical ``repo_id`` is rejected."""
        with pytest.raises(ValidationError):
            WebhookResponse(
                webhook_id=_VALID_ID,
                repo_id="bad",
                url="https://ci.example.com/hook",
                events=["push"],
                active=True,
                created_at=_DT,
                updated_at=_DT,
            )

    def test_webhook_response_accepts_future_algo(self) -> None:
        """``_FUTURE_ID`` is accepted for both ID fields and stored unchanged."""
        r = WebhookResponse(
            webhook_id=_FUTURE_ID,
            repo_id=_FUTURE_ID,
            url="https://ci.example.com/hook",
            events=["push"],
            active=True,
            created_at=_DT,
            updated_at=_DT,
        )
        assert r.webhook_id == _FUTURE_ID

    # UserForkedRepoEntry ────────────────────────────────────────────────────

    def _make_fork_repo_response(
        self,
        repo_id: str = _VALID_ID,
        owner_user_id: str = _VALID_ID,
    ) -> "RepoResponse":
        """Build the ``RepoResponse`` used as the nested ``fork_repo`` value.

        Args:
            repo_id: Value for ``RepoResponse.repo_id``; pass a malformed
                string to make construction itself raise ``ValidationError``.
            owner_user_id: Value for ``RepoResponse.owner_user_id``.

        Returns:
            A ``RepoResponse`` with fixed name/owner/slug/visibility fields.
        """
        # The return annotation is quoted because RepoResponse is only
        # imported inside this function.
        from musehub.models.musehub import RepoResponse

        return RepoResponse(
            repo_id=repo_id,
            name="my-fork",
            owner="alice",
            slug="my-fork",
            visibility="public",
            owner_user_id=owner_user_id,
            clone_url="https://musehub.ai/api/repos/x",
            created_at=_DT,
            updated_at=_DT,
        )

    def test_forked_repo_entry_rejects_bad_fork_id(self) -> None:
        """A non-canonical ``fork_id`` is rejected."""
        with pytest.raises(ValidationError):
            UserForkedRepoEntry(
                fork_id="not-a-content-id",
                fork_repo=self._make_fork_repo_response(),
                source_owner="gabriel",
                source_slug="original-repo",
                forked_at=_DT,
            )

    def test_forked_repo_entry_rejects_bad_source_repo_id(self) -> None:
        """A second malformed ``fork_id`` value is rejected.

        UserForkedRepoEntry doesn't hold source_repo_id directly — fork_id is
        the only genesis field on the entry itself, so this exercises the
        same validation as the previous test with a different bad value.
        """
        with pytest.raises(ValidationError):
            UserForkedRepoEntry(
                fork_id="bad-id",
                fork_repo=self._make_fork_repo_response(),
                source_owner="gabriel",
                source_slug="original-repo",
                forked_at=_DT,
            )

    def test_forked_repo_entry_rejects_bad_fork_repo_id(self) -> None:
        """A malformed ID on the nested ``RepoResponse`` is rejected."""
        # The nested RepoResponse.repo_id is also genesis-validated.  The
        # helper call sits inside the raises-block, so a ValidationError
        # from building the RepoResponse itself satisfies this test.
        with pytest.raises(ValidationError):
            UserForkedRepoEntry(
                fork_id=_VALID_ID,
                fork_repo=self._make_fork_repo_response(repo_id="bad-id"),
                source_owner="gabriel",
                source_slug="original-repo",
                forked_at=_DT,
            )

    def test_forked_repo_entry_accepts_future_algo(self) -> None:
        """``_FUTURE_ID`` is accepted on the entry and on the nested repo."""
        fake_repo = self._make_fork_repo_response(
            repo_id=_FUTURE_ID,
            owner_user_id=_FUTURE_ID,
        )
        r = UserForkedRepoEntry(
            fork_id=_FUTURE_ID,
            fork_repo=fake_repo,
            source_owner="gabriel",
            source_slug="original-repo",
            forked_at=_DT,
        )
        assert r.fork_id == _FUTURE_ID
1176
1177
1178 # ===========================================================================
1179 # Phase 2 — Tier 6: service-layer genesis contract tests
1180 # ===========================================================================
1181
1182
class TestServiceLayerPhase2GenesisIds:
    """Phase 2 service creation functions assign genesis-addressed IDs, never random IDs.

    Two techniques are used.  Where the creation timestamp can be frozen,
    the expected ID is computed up front and compared with the row passed to
    ``session.add``.  Otherwise the ``compute_*`` function is patched with a
    recording spy that delegates to the real implementation, and the test
    checks the spy was called with the expected genesis fields.
    """

    def _async_session(self, *, execute_returns: MagicMock | None = None) -> "AsyncMock":
        """Build an ``AsyncMock`` standing in for the async DB session.

        Args:
            execute_returns: Value that ``scalar_one_or_none()`` yields on
                the result of ``session.execute(...)``.

        Returns:
            The mock session.  ``commit`` and ``flush`` are ``AsyncMock``s;
            ``refresh`` is a coroutine that fills in falsy ``created_at`` /
            ``updated_at`` attributes with the current UTC time.
        """
        from unittest.mock import AsyncMock, MagicMock

        session = AsyncMock()
        scalar = MagicMock()
        scalar.scalar_one_or_none.return_value = execute_returns
        session.execute.return_value = scalar
        session.commit = AsyncMock()
        session.flush = AsyncMock()

        async def _refresh(obj: object) -> None:
            from datetime import datetime, timezone
            for attr in ("created_at", "updated_at"):
                if not getattr(obj, attr, None):
                    try:
                        setattr(obj, attr, datetime.now(timezone.utc))
                    except Exception:
                        # Best-effort: presumably some objects reject
                        # attribute assignment; leave those untouched.
                        pass

        session.refresh = _refresh
        return session

    # 6.6 Label — create_label ─────────────────────────────────────────────

    @pytest.mark.asyncio
    async def test_create_label_uses_compute_label_id(self) -> None:
        """create_label calls compute_label_id(repo_id, name, iso_ts) — not a random ID."""
        from unittest.mock import patch, AsyncMock, MagicMock
        import musehub.api.routes.musehub.labels as labels_module

        repo_id = _REPO_ID
        name = "bug"

        # Each spy call is recorded as (repo_id, name, timestamp, result).
        captured: list[tuple] = []
        _real = compute_label_id

        def _spy(r: str, n: str, t: str) -> str:
            result = _real(r, n, t)
            captured.append((r, n, t, result))
            return result

        db = self._async_session()
        # _guard_repo_owner → get_repo + check_write_access; uniqueness check → no duplicate
        db.execute.return_value.scalar_one_or_none.return_value = None

        with patch("musehub.api.routes.musehub.labels.musehub_repository") as mock_svc, \
             patch("musehub.api.routes.musehub.labels.compute_label_id", side_effect=_spy):
            fake_repo = MagicMock()
            mock_svc.get_repo = AsyncMock(return_value=fake_repo)
            mock_svc.check_write_access = AsyncMock(return_value=True)

            try:
                await labels_module.create_label(
                    repo_id=repo_id,
                    body=labels_module.LabelCreate(name=name, color="#d73a4a"),
                    db=db,
                    token=MagicMock(handle="gabriel"),
                )
            except Exception:
                # Deliberately best-effort: only the spy recording matters
                # here, so failures elsewhere in the handler are ignored.
                pass

        assert captured, "compute_label_id was never called — label creation is broken"
        call_repo_id, call_name, _call_ts, call_result = captured[0]
        assert call_repo_id == repo_id
        assert call_name == name
        assert _CANONICAL_RE.match(call_result)

    # 6.7 Webhook — create_webhook ─────────────────────────────────────────

    @pytest.mark.asyncio
    async def test_create_webhook_uses_compute_webhook_id(self) -> None:
        """Webhook rows are assigned compute_webhook_id(repo_id, url, created_at_iso)."""
        from unittest.mock import patch
        from musehub.services.musehub_webhook_dispatcher import create_webhook

        repo_id = _REPO_ID
        url = "https://ci.example.com/hook"

        # Each spy call is recorded as (repo_id, url, timestamp, result).
        captured: list[tuple] = []
        _real = compute_webhook_id

        def _spy(r: str, u: str, t: str) -> str:
            result = _real(r, u, t)
            captured.append((r, u, t, result))
            return result

        db = self._async_session()

        with patch("musehub.services.musehub_webhook_dispatcher.compute_webhook_id", side_effect=_spy):
            try:
                await create_webhook(db, repo_id=repo_id, url=url, events=["push"], secret="")
            except Exception:
                # Deliberately best-effort: only the spy recording matters
                # here, so failures elsewhere in the service are ignored.
                pass

        assert captured, "compute_webhook_id was never called — webhook creation is broken"
        call_repo_id, call_url, _call_ts, call_result = captured[0]
        assert call_repo_id == repo_id
        assert call_url == url
        assert _CANONICAL_RE.match(call_result)
        # Verify the row passed to session.add has that ID.  This check is
        # skipped when add() was never reached (e.g. the call above raised).
        if db.add.called:
            row = db.add.call_args_list[0][0][0]
            assert row.webhook_id == call_result

    # 6.8 Fork — compute_fork_id called at fork creation ───────────────────

    @pytest.mark.asyncio
    async def test_fork_repo_uses_compute_fork_id(self) -> None:
        """fork_repo calls compute_fork_id(source_repo_id, fork_repo_id, created_at_iso).

        Only the source repo ID argument and the canonical form of the
        result are asserted; the fork repo ID and timestamp arguments are
        recorded but not checked.
        """
        from unittest.mock import patch, MagicMock
        from musehub.services.musehub_repository import fork_repo
        from musehub.models.musehub import ForkRepoRequest
        from datetime import datetime, timezone

        source_repo_id = _REPO_ID
        # Each spy call is recorded as (source_id, fork_id, timestamp, result).
        captured: list[tuple] = []
        _real = compute_fork_id

        def _spy(src: str, frk: str, t: str) -> str:
            result = _real(src, frk, t)
            captured.append((src, frk, t, result))
            return result

        fake_source_repo = MagicMock()
        fake_source_repo.repo_id = source_repo_id
        fake_source_repo.name = "my-repo"
        fake_source_repo.slug = "my-repo"
        fake_source_repo.visibility = "public"
        fake_source_repo.description = "A test repo"
        fake_source_repo.domain_id = None
        fake_source_repo.owner = "gabriel"
        fake_source_repo.tags = []

        db = self._async_session()
        call_count = 0

        def _execute(*a: object, **kw: object) -> MagicMock:
            # The first execute() yields the source repo; every later one
            # yields "no row".
            nonlocal call_count
            call_count += 1
            m = MagicMock()
            if call_count == 1:
                m.scalar_one_or_none.return_value = fake_source_repo  # source repo
            else:
                m.scalar_one_or_none.return_value = None  # no duplicate / no slug collision
            return m

        db.execute.side_effect = _execute

        # After flush()+refresh(), the fork_repo_row needs a repo_id so compute_fork_id can use it.
        # The DB would normally auto-assign it; we supply a canonical stub.
        _fork_repo_id_stub = fake_id("fork-repo-stub")
        refresh_count = 0

        async def _refresh_with_repo_id(obj: object) -> None:
            nonlocal refresh_count
            refresh_count += 1
            now = datetime.now(timezone.utc)
            for attr in ("created_at", "updated_at"):
                if not getattr(obj, attr, None):
                    try:
                        setattr(obj, attr, now)
                    except Exception:
                        # Best-effort: leave objects that reject assignment untouched.
                        pass
            # First refresh is for the fork repo row — give it a genesis-style repo_id.
            if refresh_count == 1 and hasattr(obj, "repo_id") and not getattr(obj, "repo_id", None):
                try:
                    obj.repo_id = _fork_repo_id_stub
                except Exception:
                    pass

        db.refresh = _refresh_with_repo_id

        with patch("musehub.services.musehub_repository.compute_fork_id", side_effect=_spy):
            try:
                await fork_repo(
                    db,
                    source_repo_id=source_repo_id,
                    forked_by_handle="alice",
                    request=ForkRepoRequest(name=None),
                )
            except Exception:
                # Deliberately best-effort: only the spy recording matters
                # here, so failures elsewhere in the service are ignored.
                pass

        assert captured, "compute_fork_id was never called — fork creation is broken"
        call_src, _call_frk, _call_ts, call_result = captured[0]
        assert call_src == source_repo_id
        assert _CANONICAL_RE.match(call_result)

    # 6.9 Auth key — register_identity key_id ──────────────────────────────

    @pytest.mark.asyncio
    async def test_register_key_uses_compute_key_id(self) -> None:
        """MusehubAuthKey rows use compute_key_id(identity_id, public_key_b64) — no random IDs.

        A fresh Ed25519 key pair is generated, the expected identity and key
        IDs are computed from the raw public key, and the second object
        passed to ``session.add`` must carry the expected key ID.
        """
        from muse.core.types import encode_pubkey, public_key_fingerprint
        from musehub.services.musehub_auth import register_agent_identity

        from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
        from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat
        priv = Ed25519PrivateKey.generate()
        pub = priv.public_key().public_bytes(Encoding.Raw, PublicFormat.Raw)
        pub_b64 = encode_pubkey("ed25519", pub)
        fp = public_key_fingerprint(pub)
        expected_identity_id = compute_identity_id(pub)
        expected_key_id = compute_key_id(expected_identity_id, pub_b64)

        db = self._async_session()
        await register_agent_identity(
            session=db,
            handle="key-test-agent",
            public_key_b64=pub_b64,
            fingerprint=fp,
            algorithm="ed25519",
            spawned_by="gabriel",
        )

        # add() call list: [0] = identity, [1] = key
        assert len(db.add.call_args_list) >= 2
        key_row = db.add.call_args_list[1][0][0]
        assert key_row.key_id == expected_key_id
        assert _CANONICAL_RE.match(key_row.key_id)

    # 6.10 Collaborator invite — invite_collaborator ────────────────────────

    @pytest.mark.asyncio
    async def test_invite_collaborator_uses_compute_collaborator_id(self) -> None:
        """Collaborator rows use compute_collaborator_id(repo_id, identity_id, invited_at_iso).

        The route module's ``datetime`` name is patched so ``now()`` returns
        a fixed instant; the first row passed to ``db.add`` must then carry
        the ID computed from the repo, the invitee's identity ID, and that
        instant.
        """
        from unittest.mock import patch, AsyncMock, MagicMock
        from datetime import datetime, timezone
        import musehub.api.routes.musehub.collaborators as collabs_module

        repo_id = _REPO_ID
        fixed_now = datetime(2026, 1, 14, 0, 0, 0, tzinfo=timezone.utc)
        invitee_identity_id = _COLLAB_IDENTITY_ID
        expected = compute_collaborator_id(repo_id, invitee_identity_id, fixed_now.isoformat())

        # Repo owner must match the actor so the 403 guard passes.
        fake_repo = MagicMock()
        fake_repo.owner = "gabriel"

        fake_invitee_identity = MagicMock()
        fake_invitee_identity.identity_id = invitee_identity_id

        db = self._async_session()
        call_count = 0

        def _execute(*a: object, **kw: object) -> MagicMock:
            # Canned results by call position: actor permission, duplicate
            # check, then the invitee identity for every later call.
            nonlocal call_count
            call_count += 1
            m = MagicMock()
            if call_count == 1:
                m.scalar_one_or_none.return_value = MagicMock(permission="owner")  # actor perm
            elif call_count == 2:
                m.scalar_one_or_none.return_value = None  # no duplicate
            else:
                m.scalar_one_or_none.return_value = fake_invitee_identity  # invitee lookup
            return m

        db.execute.side_effect = _execute

        with patch("musehub.api.routes.musehub.collaborators.musehub_repository") as mock_svc, \
             patch("musehub.api.routes.musehub.collaborators.datetime") as mock_dt:
            mock_svc.get_repo = AsyncMock(return_value=fake_repo)
            mock_dt.now.return_value = fixed_now
            mock_dt.timezone = timezone

            try:
                await collabs_module.invite_collaborator(
                    repo_id=repo_id,
                    body=collabs_module.CollaboratorInviteRequest(handle="alice"),
                    db=db,
                    token=MagicMock(handle="gabriel"),
                )
            except Exception:
                # Deliberately best-effort: failures after the row is added
                # are ignored; the assertions below catch a missing add().
                pass

        assert db.add.called, "db.add was never called — collaborator row was not created"
        row = db.add.call_args_list[0][0][0]
        assert row.id == expected, f"expected {expected!r}, got {row.id!r}"
        assert _CANONICAL_RE.match(row.id)
File History 2 commits
sha256:4992098130166d191cefed0a2821d19cd3cdd3cf50867a4e715c2b30636826c7 fix: repair syntax errors from typing annotation cleanup Sonnet 4.6 20 days ago
sha256:ef10830ce231e0a20efcb0e2586cb879471247e916616e6fdd0d51df459e2595 fix: typing audit — 0 violations, 0 untyped defs across all… Sonnet 4.6 minor 20 days ago