test_proposal_reimagination_phase3.py
python
sha256:94ef169c149a452bff7c604ded8b280b19bd477c2dabcb56972780b0b784c7aa
Merge 'fix/assignee-sigil-inline' into 'dev' — proposal: As…
Human
1 day ago
| 1 | """Phase 3 — Merge Strategy Engine tests (issue #37). |
| 2 | |
| 3 | Tier 1 — Unit (pure, no DB) |
| 4 | Domain classifier: |
| 5 | - Extension-based classification for code/midi/stem/payment |
| 6 | - Path-prefix overrides beat extension rules |
| 7 | - Unknown extensions fall back to "code" |
| 8 | - paths_for_domains filters correctly |
| 9 | |
| 10 | OVERLAY: |
| 11 | - from_branch wins all conflicts |
| 12 | - to_branch-only files preserved |
| 13 | - files_added / files_modified / files_removed counts correct |
| 14 | - conflicts populated when ancestor provided and both sides changed same file |
| 15 | - no conflicts when ancestor not provided |
| 16 | |
| 17 | WEAVE: |
| 18 | - only-from changes applied cleanly |
| 19 | - only-to changes preserved |
| 20 | - unchanged files from ancestor preserved |
| 21 | - true conflicts recorded and resolved from_wins |
| 22 | - from_branch deletions honoured |
| 23 | - to_branch deletions honoured when from didn't touch |
| 24 | |
| 25 | REPLAY: |
| 26 | - applies only from_branch delta (vs ancestor) onto to_branch |
| 27 | - to_branch-only changes preserved (unlike overlay) |
| 28 | - conflicts recorded when both sides changed same file |
| 29 | - from_branch removals applied |
| 30 | |
| 31 | SELECTIVE: |
| 32 | - only files in selected domains taken from from_branch |
| 33 | - files in other domains unchanged from to_branch |
| 34 | - files_skipped counts non-selected files |
| 35 | - ValueError on empty selective_domains |
| 36 | |
| 37 | PHASED: |
| 38 | - fallback to overlay when no phase_manifests provided |
| 39 | - strategy name set to "phased" |
| 40 | - phase_results populated with at least one entry |
| 41 | - full phased execution applies deltas in order |
| 42 | |
| 43 | execute_merge_strategy router: |
| 44 | - routes all 5 strategies correctly |
| 45 | - raises ValueError on unknown strategy |
| 46 | - weave falls back to overlay when no ancestor |
| 47 | |
| 48 | Tier 5 — Integration (DB) |
| 49 | - merge_proposal uses OVERLAY by default |
| 50 | - merge_proposal uses SELECTIVE and only applies selected domains |
| 51 | - merge_proposal logs strategy info (smoke test on execution path) |
| 52 | """ |
| 53 | |
| 54 | from __future__ import annotations |
| 55 | |
| 56 | import os |
| 57 | from datetime import datetime, timezone |
| 58 | from typing import Any |
| 59 | |
| 60 | import pytest |
| 61 | from muse.core.types import blob_id, fake_id, short_id |
| 62 | from sqlalchemy.ext.asyncio import AsyncSession |
| 63 | |
| 64 | from musehub.types.json_types import StrDict |
| 65 | from musehub.services.proposal_merge_strategies import ( |
| 66 | ConflictEntry, |
| 67 | MergeResult, |
| 68 | classify_domain, |
| 69 | execute_merge_strategy, |
| 70 | merge_selective, |
| 71 | merge_phased, |
| 72 | merge_overlay, |
| 73 | merge_replay, |
| 74 | merge_weave, |
| 75 | paths_for_domains, |
| 76 | ) |
| 77 | |
| 78 | |
| 79 | # ───────────────────────────────────────────────────────────────────────────── |
| 80 | # Helpers |
| 81 | # ───────────────────────────────────────────────────────────────────────────── |
| 82 | |
| 83 | |
| 84 | def _now() -> datetime: |
| 85 | return datetime.now(tz=timezone.utc) |
| 86 | |
| 87 | |
| 88 | def _uid() -> str: |
| 89 | return short_id(blob_id(os.urandom(16)), strip=True) |
| 90 | |
| 91 | |
| 92 | def _oid(label: int | str) -> str: |
| 93 | """Deterministic test object ID — delegates to the canonical fake_id utility.""" |
| 94 | return fake_id(str(label)) |
| 95 | |
| 96 | |
| 97 | # ───────────────────────────────────────────────────────────────────────────── |
| 98 | # Tier 1 — Domain classifier |
| 99 | # ───────────────────────────────────────────────────────────────────────────── |
| 100 | |
| 101 | |
| 102 | class TestDomainClassifier: |
| 103 | def test_python_file_is_code(self) -> None: |
| 104 | assert classify_domain("src/auth.py") == "code" |
| 105 | |
| 106 | def test_typescript_file_is_code(self) -> None: |
| 107 | assert classify_domain("src/components/App.tsx") == "code" |
| 108 | |
| 109 | def test_midi_extension(self) -> None: |
| 110 | assert classify_domain("tracks/main.mid") == "midi" |
| 111 | |
| 112 | def test_midi_dot_midi_extension(self) -> None: |
| 113 | assert classify_domain("sequences/intro.midi") == "midi" |
| 114 | |
| 115 | def test_wav_is_stem(self) -> None: |
| 116 | assert classify_domain("samples/kick.wav") == "stem" |
| 117 | |
| 118 | def test_flac_is_stem(self) -> None: |
| 119 | assert classify_domain("recordings/session.flac") == "stem" |
| 120 | |
| 121 | def test_unknown_extension_fallback_to_code(self) -> None: |
| 122 | assert classify_domain("assets/mystery.xyz") == "code" |
| 123 | |
| 124 | def test_path_prefix_midi_beats_extension(self) -> None: |
| 125 | # .py file under midi/ path should be classified as midi |
| 126 | assert classify_domain("midi/generator.py") == "midi" |
| 127 | |
| 128 | def test_path_prefix_stem(self) -> None: |
| 129 | assert classify_domain("stems/vocal.wav") == "stem" |
| 130 | |
| 131 | def test_path_prefix_payment(self) -> None: |
| 132 | assert classify_domain("payments/claim_001.json") == "payment" |
| 133 | |
| 134 | def test_path_prefix_identity(self) -> None: |
| 135 | assert classify_domain("identity/keys.toml") == "identity" |
| 136 | |
| 137 | def test_paths_for_domains_filter(self) -> None: |
| 138 | manifest = { |
| 139 | "src/main.py": _oid(1), |
| 140 | "tracks/beat.mid": _oid(2), |
| 141 | "stems/vocal.wav": _oid(3), |
| 142 | } |
| 143 | code_paths = paths_for_domains(manifest, ["code"]) |
| 144 | assert "src/main.py" in code_paths |
| 145 | assert "tracks/beat.mid" not in code_paths |
| 146 | |
| 147 | midi_stem = paths_for_domains(manifest, ["midi", "stem"]) |
| 148 | assert "tracks/beat.mid" in midi_stem |
| 149 | assert "stems/vocal.wav" in midi_stem |
| 150 | assert "src/main.py" not in midi_stem |
| 151 | |
| 152 | |
| 153 | # ───────────────────────────────────────────────────────────────────────────── |
| 154 | # Tier 1 — STATE_OVERLAY |
| 155 | # ───────────────────────────────────────────────────────────────────────────── |
| 156 | |
| 157 | |
| 158 | class TestStateOverlay: |
| 159 | def _to(self) -> StrDict: |
| 160 | return {"shared.py": _oid(1), "only_to.py": _oid(2)} |
| 161 | |
| 162 | def _from(self) -> StrDict: |
| 163 | return {"shared.py": _oid(3), "only_from.py": _oid(4)} |
| 164 | |
| 165 | def test_from_wins_shared_file(self) -> None: |
| 166 | to, frm = self._to(), self._from() |
| 167 | result = merge_overlay(to, frm) |
| 168 | assert result.manifest["shared.py"] == frm["shared.py"] |
| 169 | |
| 170 | def test_to_only_files_preserved(self) -> None: |
| 171 | to, frm = self._to(), self._from() |
| 172 | result = merge_overlay(to, frm) |
| 173 | assert "only_to.py" in result.manifest |
| 174 | |
| 175 | def test_from_only_files_added(self) -> None: |
| 176 | to, frm = self._to(), self._from() |
| 177 | result = merge_overlay(to, frm) |
| 178 | assert result.manifest["only_from.py"] == frm["only_from.py"] |
| 179 | |
| 180 | def test_counts(self) -> None: |
| 181 | to = {"a.py": _oid(1), "b.py": _oid(2)} |
| 182 | frm = {"b.py": _oid(3), "c.py": _oid(4)} # b modified, c added |
| 183 | result = merge_overlay(to, frm) |
| 184 | assert result.files_added == 1 # c |
| 185 | assert result.files_modified == 1 # b |
| 186 | |
| 187 | def test_no_conflicts_without_ancestor(self) -> None: |
| 188 | to, frm = self._to(), self._from() |
| 189 | result = merge_overlay(to, frm) |
| 190 | assert result.conflicts == [] |
| 191 | |
| 192 | def test_conflicts_when_ancestor_provided(self) -> None: |
| 193 | ancestor = {"shared.py": _oid(0)} |
| 194 | to = {"shared.py": _oid(1)} # to changed it |
| 195 | frm = {"shared.py": _oid(2)} # from changed it differently |
| 196 | result = merge_overlay(to, frm, ancestor_manifest=ancestor) |
| 197 | assert len(result.conflicts) == 1 |
| 198 | assert result.conflicts[0].path == "shared.py" |
| 199 | assert result.conflicts[0].resolution == "from_wins" |
| 200 | |
| 201 | def test_no_conflict_when_only_one_side_changed(self) -> None: |
| 202 | ancestor = {"shared.py": _oid(0), "to_only.py": _oid(1)} |
| 203 | to = {"shared.py": _oid(0), "to_only.py": _oid(2)} # only to_only changed |
| 204 | frm = {"shared.py": _oid(3)} # from changed shared |
| 205 | result = merge_overlay(to, frm, ancestor_manifest=ancestor) |
| 206 | assert result.conflicts == [] |
| 207 | |
| 208 | def test_strategy_name(self) -> None: |
| 209 | result = merge_overlay({}, {}) |
| 210 | assert result.strategy == "overlay" |
| 211 | |
| 212 | def test_domains_merged_populated(self) -> None: |
| 213 | to = {} |
| 214 | frm = {"tracks/beat.mid": _oid(1), "src/main.py": _oid(2)} |
| 215 | result = merge_overlay(to, frm) |
| 216 | assert "midi" in result.domains_merged |
| 217 | assert "code" in result.domains_merged |
| 218 | |
| 219 | |
| 220 | # ───────────────────────────────────────────────────────────────────────────── |
| 221 | # Tier 1 — STATE_WEAVE |
| 222 | # ───────────────────────────────────────────────────────────────────────────── |
| 223 | |
| 224 | |
| 225 | class TestStateWeave: |
| 226 | def test_clean_from_change_applied(self) -> None: |
| 227 | ancestor = {"a.py": _oid(1), "b.py": _oid(2)} |
| 228 | to = {"a.py": _oid(1), "b.py": _oid(2)} # unchanged |
| 229 | frm = {"a.py": _oid(3), "b.py": _oid(2)} # a changed |
| 230 | result = merge_weave(to, frm, ancestor_manifest=ancestor) |
| 231 | assert result.manifest["a.py"] == _oid(3) |
| 232 | assert result.conflicts == [] |
| 233 | |
| 234 | def test_clean_to_change_preserved(self) -> None: |
| 235 | ancestor = {"a.py": _oid(1)} |
| 236 | to = {"a.py": _oid(2)} # to changed it |
| 237 | frm = {"a.py": _oid(1)} # from unchanged |
| 238 | result = merge_weave(to, frm, ancestor_manifest=ancestor) |
| 239 | assert result.manifest["a.py"] == _oid(2) |
| 240 | assert result.conflicts == [] |
| 241 | |
| 242 | def test_unchanged_file_kept(self) -> None: |
| 243 | ancestor = {"stable.py": _oid(9)} |
| 244 | to = {"stable.py": _oid(9)} |
| 245 | frm = {"stable.py": _oid(9)} |
| 246 | result = merge_weave(to, frm, ancestor_manifest=ancestor) |
| 247 | assert result.manifest["stable.py"] == _oid(9) |
| 248 | |
| 249 | def test_true_conflict_recorded_from_wins(self) -> None: |
| 250 | ancestor = {"conflict.py": _oid(0)} |
| 251 | to = {"conflict.py": _oid(1)} |
| 252 | frm = {"conflict.py": _oid(2)} |
| 253 | result = merge_weave(to, frm, ancestor_manifest=ancestor) |
| 254 | assert result.manifest["conflict.py"] == _oid(2) |
| 255 | assert len(result.conflicts) == 1 |
| 256 | assert result.conflicts[0].resolution == "from_wins" |
| 257 | |
| 258 | def test_from_deletion_honoured(self) -> None: |
| 259 | ancestor = {"gone.py": _oid(1)} |
| 260 | to = {"gone.py": _oid(1)} # unchanged in to |
| 261 | frm = {} # deleted in from |
| 262 | result = merge_weave(to, frm, ancestor_manifest=ancestor) |
| 263 | assert "gone.py" not in result.manifest |
| 264 | |
| 265 | def test_to_deletion_honoured_when_from_untouched(self) -> None: |
| 266 | ancestor = {"bye.py": _oid(1)} |
| 267 | to = {} # deleted in to |
| 268 | frm = {"bye.py": _oid(1)} # unchanged in from |
| 269 | result = merge_weave(to, frm, ancestor_manifest=ancestor) |
| 270 | assert "bye.py" not in result.manifest |
| 271 | |
| 272 | def test_new_file_from_only_added(self) -> None: |
| 273 | ancestor: StrDict = {} |
| 274 | to: StrDict = {} |
| 275 | frm = {"new.py": _oid(5)} |
| 276 | result = merge_weave(to, frm, ancestor_manifest=ancestor) |
| 277 | assert result.manifest["new.py"] == _oid(5) |
| 278 | |
| 279 | def test_strategy_name(self) -> None: |
| 280 | result = merge_weave({}, {}, ancestor_manifest={}) |
| 281 | assert result.strategy == "weave" |
| 282 | |
| 283 | |
| 284 | # ───────────────────────────────────────────────────────────────────────────── |
| 285 | # Tier 1 — STATE_REBASE |
| 286 | # ───────────────────────────────────────────────────────────────────────────── |
| 287 | |
| 288 | |
| 289 | class TestStateRebase: |
| 290 | def test_from_delta_applied_onto_to(self) -> None: |
| 291 | ancestor = {"a.py": _oid(1), "b.py": _oid(2)} |
| 292 | to = {"a.py": _oid(1), "b.py": _oid(3)} # to changed b |
| 293 | frm = {"a.py": _oid(4), "b.py": _oid(2)} # from changed a |
| 294 | result = merge_replay(to, frm, ancestor_manifest=ancestor) |
| 295 | # a gets from's version, b keeps to's change |
| 296 | assert result.manifest["a.py"] == _oid(4) |
| 297 | assert result.manifest["b.py"] == _oid(3) |
| 298 | |
| 299 | def test_to_only_changes_preserved(self) -> None: |
| 300 | ancestor = {"a.py": _oid(1)} |
| 301 | to = {"a.py": _oid(1), "to_added.py": _oid(10)} # to added a file |
| 302 | frm = {"a.py": _oid(1)} # from didn't touch it |
| 303 | result = merge_replay(to, frm, ancestor_manifest=ancestor) |
| 304 | assert "to_added.py" in result.manifest |
| 305 | |
| 306 | def test_conflict_when_both_modified_same_file(self) -> None: |
| 307 | ancestor = {"x.py": _oid(0)} |
| 308 | to = {"x.py": _oid(1)} |
| 309 | frm = {"x.py": _oid(2)} |
| 310 | result = merge_replay(to, frm, ancestor_manifest=ancestor) |
| 311 | assert result.manifest["x.py"] == _oid(2) |
| 312 | assert len(result.conflicts) == 1 |
| 313 | |
| 314 | def test_from_removal_applied(self) -> None: |
| 315 | ancestor = {"old.py": _oid(1)} |
| 316 | to = {"old.py": _oid(1)} |
| 317 | frm: StrDict = {} |
| 318 | result = merge_replay(to, frm, ancestor_manifest=ancestor) |
| 319 | assert "old.py" not in result.manifest |
| 320 | assert result.files_removed == 1 |
| 321 | |
| 322 | def test_strategy_name(self) -> None: |
| 323 | result = merge_replay({}, {}, ancestor_manifest={}) |
| 324 | assert result.strategy == "replay" |
| 325 | |
| 326 | |
| 327 | # ───────────────────────────────────────────────────────────────────────────── |
| 328 | # Tier 1 — DOMAIN_SELECTIVE |
| 329 | # ───────────────────────────────────────────────────────────────────────────── |
| 330 | |
| 331 | |
| 332 | class TestDomainSelective: |
| 333 | def test_selected_domain_applied(self) -> None: |
| 334 | to = {"src/main.py": _oid(1), "tracks/beat.mid": _oid(2)} |
| 335 | frm = {"src/main.py": _oid(3), "tracks/beat.mid": _oid(4)} |
| 336 | result = merge_selective(to, frm, selective_domains=["code"]) |
| 337 | assert result.manifest["src/main.py"] == _oid(3) # code → from wins |
| 338 | assert result.manifest["tracks/beat.mid"] == _oid(2) # midi → untouched |
| 339 | |
| 340 | def test_non_selected_files_unchanged(self) -> None: |
| 341 | to = {"tracks/beat.mid": _oid(1)} |
| 342 | frm = {"tracks/beat.mid": _oid(2), "src/new.py": _oid(3)} |
| 343 | result = merge_selective(to, frm, selective_domains=["code"]) |
| 344 | assert result.manifest["tracks/beat.mid"] == _oid(1) # midi not selected |
| 345 | |
| 346 | def test_files_skipped_counted(self) -> None: |
| 347 | to = {"a.py": _oid(1)} |
| 348 | frm = {"a.py": _oid(2), "beat.mid": _oid(3), "vocal.wav": _oid(4)} |
| 349 | result = merge_selective(to, frm, selective_domains=["code"]) |
| 350 | assert result.files_skipped >= 2 # mid + wav not in selected |
| 351 | |
| 352 | def test_empty_selective_domains_raises(self) -> None: |
| 353 | with pytest.raises(ValueError, match="selective_domains"): |
| 354 | merge_selective({}, {}, selective_domains=[]) |
| 355 | |
| 356 | def test_domains_merged_lists_selected(self) -> None: |
| 357 | to = {} |
| 358 | frm = {"src/x.py": _oid(1)} |
| 359 | result = merge_selective(to, frm, selective_domains=["code", "midi"]) |
| 360 | assert "code" in result.domains_merged |
| 361 | |
| 362 | def test_strategy_name(self) -> None: |
| 363 | result = merge_selective({}, {}, selective_domains=["code"]) |
| 364 | assert result.strategy == "selective" |
| 365 | |
| 366 | def test_conflict_recorded_when_ancestor_provided(self) -> None: |
| 367 | ancestor = {"src/x.py": _oid(0)} |
| 368 | to = {"src/x.py": _oid(1)} |
| 369 | frm = {"src/x.py": _oid(2)} |
| 370 | result = merge_selective( |
| 371 | to, frm, selective_domains=["code"], ancestor_manifest=ancestor |
| 372 | ) |
| 373 | assert len(result.conflicts) == 1 |
| 374 | |
| 375 | |
| 376 | # ───────────────────────────────────────────────────────────────────────────── |
| 377 | # Tier 1 — PHASED |
| 378 | # ───────────────────────────────────────────────────────────────────────────── |
| 379 | |
| 380 | |
| 381 | class TestMergePhased: |
| 382 | def test_fallback_overlay_when_no_phase_manifests(self) -> None: |
| 383 | to = {"a.py": _oid(1)} |
| 384 | frm = {"a.py": _oid(2), "b.py": _oid(3)} |
| 385 | result = merge_phased(to, frm) |
| 386 | assert result.strategy == "phased" |
| 387 | assert result.manifest["a.py"] == _oid(2) |
| 388 | assert result.manifest["b.py"] == _oid(3) |
| 389 | |
| 390 | def test_fallback_has_one_phase_result(self) -> None: |
| 391 | result = merge_phased({}, {"new.py": _oid(1)}) |
| 392 | assert len(result.phase_results) == 1 |
| 393 | assert result.phase_results[0].dependency_proposal_id == "self" |
| 394 | |
| 395 | def test_full_phased_applies_in_order(self) -> None: |
| 396 | ancestor = {"a.py": _oid(0)} |
| 397 | to = {"a.py": _oid(0)} |
| 398 | frm = {"a.py": _oid(3), "c.py": _oid(4)} |
| 399 | |
| 400 | dep1_id = "dep1" |
| 401 | dep1_manifest = {"a.py": _oid(1)} # dep1 changed a.py |
| 402 | |
| 403 | dep2_id = "dep2" |
| 404 | dep2_manifest = {"a.py": _oid(2), "b.py": _oid(5)} # dep2 changed a.py + added b.py |
| 405 | |
| 406 | result = merge_phased( |
| 407 | to, frm, |
| 408 | ancestor_manifest=ancestor, |
| 409 | dependency_order=[dep1_id, dep2_id], |
| 410 | phase_manifests={dep1_id: dep1_manifest, dep2_id: dep2_manifest}, |
| 411 | ) |
| 412 | assert result.strategy == "phased" |
| 413 | # Final merge: from_branch (a=3, c=4) applied on top of dep2 result |
| 414 | assert result.manifest["c.py"] == _oid(4) |
| 415 | assert len(result.phase_results) == 3 # dep1, dep2, self |
| 416 | |
| 417 | def test_phase_results_have_correct_indices(self) -> None: |
| 418 | dep_id = "dep_x" |
| 419 | result = merge_phased( |
| 420 | {"a.py": _oid(1)}, {"a.py": _oid(2)}, |
| 421 | dependency_order=[dep_id], |
| 422 | phase_manifests={dep_id: {"a.py": _oid(1)}}, |
| 423 | ) |
| 424 | indices = [pr.phase_index for pr in result.phase_results] |
| 425 | assert indices == sorted(indices) |
| 426 | |
| 427 | def test_strategy_name(self) -> None: |
| 428 | result = merge_phased({}, {}) |
| 429 | assert result.strategy == "phased" |
| 430 | |
| 431 | |
| 432 | # ───────────────────────────────────────────────────────────────────────────── |
| 433 | # Tier 1 — execute_merge_strategy router |
| 434 | # ───────────────────────────────────────────────────────────────────────────── |
| 435 | |
| 436 | |
| 437 | class TestStrategyRouter: |
| 438 | def test_routes_overlay(self) -> None: |
| 439 | result = execute_merge_strategy("overlay", {}, {"x.py": _oid(1)}) |
| 440 | assert result.strategy == "overlay" |
| 441 | |
| 442 | def test_routes_weave(self) -> None: |
| 443 | result = execute_merge_strategy( |
| 444 | "weave", {}, {}, ancestor_manifest={} |
| 445 | ) |
| 446 | assert result.strategy == "weave" |
| 447 | |
| 448 | def test_routes_replay(self) -> None: |
| 449 | result = execute_merge_strategy( |
| 450 | "replay", {}, {}, ancestor_manifest={} |
| 451 | ) |
| 452 | assert result.strategy == "replay" |
| 453 | |
| 454 | def test_routes_selective(self) -> None: |
| 455 | result = execute_merge_strategy( |
| 456 | "selective", {}, {}, selective_domains=["code"] |
| 457 | ) |
| 458 | assert result.strategy == "selective" |
| 459 | |
| 460 | def test_routes_phased(self) -> None: |
| 461 | result = execute_merge_strategy("phased", {}, {}) |
| 462 | assert result.strategy == "phased" |
| 463 | |
| 464 | def test_unknown_strategy_raises(self) -> None: |
| 465 | with pytest.raises(ValueError, match="Unknown merge strategy"): |
| 466 | execute_merge_strategy("magic_merge", {}, {}) |
| 467 | |
| 468 | def test_weave_fallback_without_ancestor(self) -> None: |
| 469 | result = execute_merge_strategy("weave", {}, {"a.py": _oid(1)}) |
| 470 | assert result.strategy == "weave" |
| 471 | assert result.manifest["a.py"] == _oid(1) |
| 472 | |
| 473 | def test_replay_fallback_without_ancestor(self) -> None: |
| 474 | result = execute_merge_strategy("replay", {}, {"a.py": _oid(1)}) |
| 475 | assert result.strategy == "replay" |
| 476 | |
| 477 | |
| 478 | # ───────────────────────────────────────────────────────────────────────────── |
| 479 | # Tier 5 — Integration: strategy wired into merge_proposal |
| 480 | # ───────────────────────────────────────────────────────────────────────────── |
| 481 | |
| 482 | |
| 483 | async def _make_repo(session: AsyncSession) -> str: |
| 484 | from musehub.core.genesis import compute_identity_id, compute_repo_id |
| 485 | from musehub.db.musehub_repo_models import MusehubBranch, MusehubCommit, MusehubRepo |
| 486 | |
| 487 | owner = "strattest" |
| 488 | slug = f"repo-{_uid()}" |
| 489 | owner_id = compute_identity_id(owner.encode()) |
| 490 | created_at = _now() |
| 491 | repo = MusehubRepo( |
| 492 | repo_id=compute_repo_id(owner_id, slug, "code", created_at.isoformat()), |
| 493 | name=slug, |
| 494 | owner=owner, |
| 495 | slug=slug, |
| 496 | visibility="public", |
| 497 | owner_user_id=owner_id, |
| 498 | description="", |
| 499 | tags=[], |
| 500 | created_at=created_at, |
| 501 | ) |
| 502 | session.add(repo) |
| 503 | await session.flush() |
| 504 | return repo.repo_id |
| 505 | |
| 506 | |
| 507 | async def _make_branch_with_commit( |
| 508 | session: AsyncSession, |
| 509 | repo_id: str, |
| 510 | branch_name: str, |
| 511 | manifest: StrDict, |
| 512 | ) -> None: |
| 513 | """Create a branch + one commit + snapshot with the given manifest.""" |
| 514 | from musehub.core.genesis import compute_identity_id |
| 515 | from musehub.db.musehub_repo_models import MusehubBranch, MusehubCommit, MusehubCommitRef, MusehubRepo |
| 516 | from musehub.muse_cli.snapshot import compute_commit_id, compute_snapshot_id |
| 517 | from musehub.services.musehub_snapshot import upsert_snapshot_entries |
| 518 | from musehub.core.genesis import compute_branch_id |
| 519 | |
| 520 | created_at = _now() |
| 521 | snapshot_id = compute_snapshot_id(manifest) |
| 522 | await upsert_snapshot_entries(session, repo_id, snapshot_id, manifest) |
| 523 | |
| 524 | commit_id = compute_commit_id( |
| 525 | [], snapshot_id, f"init {branch_name}", created_at.isoformat(), |
| 526 | author="strattest", signer_public_key="", |
| 527 | ) |
| 528 | commit = MusehubCommit( |
| 529 | commit_id=commit_id, |
| 530 | branch=branch_name, |
| 531 | parent_ids=[], |
| 532 | message=f"init {branch_name}", |
| 533 | author="strattest", |
| 534 | timestamp=created_at, |
| 535 | snapshot_id=snapshot_id, |
| 536 | ) |
| 537 | session.add(commit) |
| 538 | session.add(MusehubCommitRef(repo_id=repo_id, commit_id=commit_id)) |
| 539 | |
| 540 | branch = MusehubBranch( |
| 541 | branch_id=compute_branch_id(repo_id, branch_name), |
| 542 | repo_id=repo_id, |
| 543 | name=branch_name, |
| 544 | head_commit_id=commit_id, |
| 545 | ) |
| 546 | session.add(branch) |
| 547 | await session.flush() |
| 548 | |
| 549 | |
| 550 | class TestMergeProposalStrategyIntegration: |
| 551 | @pytest.mark.asyncio |
| 552 | async def test_default_overlay_merges_manifests( |
| 553 | self, db_session: AsyncSession |
| 554 | ) -> None: |
| 555 | from musehub.services.musehub_proposals import create_proposal, merge_proposal |
| 556 | from musehub.services.musehub_snapshot import get_snapshot_manifest |
| 557 | from musehub.db.musehub_repo_models import MusehubBranch, MusehubCommit, MusehubRepo |
| 558 | from sqlalchemy import select |
| 559 | |
| 560 | repo_id = await _make_repo(db_session) |
| 561 | |
| 562 | to_manifest = {"shared.py": _oid(1), "to_only.py": _oid(2)} |
| 563 | from_manifest = {"shared.py": _oid(3), "from_only.py": _oid(4)} |
| 564 | |
| 565 | await _make_branch_with_commit(db_session, repo_id, "dev", to_manifest) |
| 566 | await _make_branch_with_commit(db_session, repo_id, "feat/overlay", from_manifest) |
| 567 | |
| 568 | proposal_resp = await create_proposal( |
| 569 | db_session, |
| 570 | repo_id=repo_id, |
| 571 | title="overlay merge", |
| 572 | from_branch="feat/overlay", |
| 573 | to_branch="dev", |
| 574 | author="strattest", |
| 575 | ) |
| 576 | merged_resp = await merge_proposal(db_session, repo_id, proposal_resp.proposal_id) |
| 577 | |
| 578 | # Find the merge commit and read its snapshot |
| 579 | merge_commit = await db_session.get(MusehubCommit, merged_resp.merge_commit_id) |
| 580 | assert merge_commit is not None |
| 581 | merged = await get_snapshot_manifest(db_session, merge_commit.snapshot_id) |
| 582 | |
| 583 | assert merged["shared.py"] == _oid(3) # from_branch won |
| 584 | assert merged["to_only.py"] == _oid(2) # preserved from to |
| 585 | assert merged["from_only.py"] == _oid(4) # added from from |
| 586 | |
| 587 | @pytest.mark.asyncio |
| 588 | async def test_domain_selective_only_applies_selected_domain( |
| 589 | self, db_session: AsyncSession |
| 590 | ) -> None: |
| 591 | from musehub.services.musehub_proposals import create_proposal, merge_proposal |
| 592 | from musehub.services.musehub_snapshot import get_snapshot_manifest |
| 593 | from musehub.db.musehub_repo_models import MusehubBranch, MusehubCommit, MusehubRepo |
| 594 | |
| 595 | repo_id = await _make_repo(db_session) |
| 596 | |
| 597 | to_manifest = {"src/main.py": _oid(1), "tracks/beat.mid": _oid(2)} |
| 598 | from_manifest = {"src/main.py": _oid(3), "tracks/beat.mid": _oid(4)} |
| 599 | |
| 600 | await _make_branch_with_commit(db_session, repo_id, "dev", to_manifest) |
| 601 | await _make_branch_with_commit(db_session, repo_id, "feat/selective", from_manifest) |
| 602 | |
| 603 | proposal_resp = await create_proposal( |
| 604 | db_session, |
| 605 | repo_id=repo_id, |
| 606 | title="selective merge", |
| 607 | from_branch="feat/selective", |
| 608 | to_branch="dev", |
| 609 | author="strattest", |
| 610 | merge_strategy="selective", |
| 611 | selective_domains=["code"], |
| 612 | ) |
| 613 | merged_resp = await merge_proposal(db_session, repo_id, proposal_resp.proposal_id) |
| 614 | |
| 615 | merge_commit = await db_session.get(MusehubCommit, merged_resp.merge_commit_id) |
| 616 | assert merge_commit is not None |
| 617 | merged = await get_snapshot_manifest(db_session, merge_commit.snapshot_id) |
| 618 | |
| 619 | assert merged["src/main.py"] == _oid(3) # code domain applied |
| 620 | assert merged["tracks/beat.mid"] == _oid(2) # midi not selected — untouched |
File History
3 commits
sha256:94ef169c149a452bff7c604ded8b280b19bd477c2dabcb56972780b0b784c7aa
Merge 'fix/assignee-sigil-inline' into 'dev' — proposal: As…
Human
1 day ago
sha256:6b1949fc2797ca4c1936a637a4cbfec828ef56cf52398a2e74ca3c4f494e728f
fix: use wire_bytes not mpack_bytes_raw in compute_object_b…
Sonnet 4.6
patch
9 days ago
sha256:4aed3d8601c8dd3ed37074de35f11f4a9699a0a4b99d43727048fd3f8e6fd13d
chore: doc sweep, ignore wrangler build state, misc fixes
Sonnet 4.6
minor
⚠
12 days ago