gabriel / muse public
test_stress_midi_all_dims.py python
473 lines 18.8 KB
Raw
sha256:81ae324db5ad375fbfe4834c6fcb378312cafad3cc92dec5d3e5c427306621a2 fix: remove commit_exists filter from have anchors — server… Sonnet 4.6 patch 21 days ago
1 """Stress tests for the 21-dimension MIDI merge engine.
2
3 Each of the 21 internal dimensions is tested for:
4 1. Clean auto-merge when only one side changes (left_only / right_only).
5 2. Correct conflict detection when both sides change independently.
6 3. Unchanged dimensions are preserved from base unchanged.
7 4. Non-independent dimensions (tempo_map, time_signatures, track_structure)
8 block the entire merge on bilateral conflict.
9 5. dimension_conflict_detail returns correct change labels for all 21 dims.
10 6. extract_dimensions round-trips every event type.
11 7. Large sequences (100 notes, many CC events) handled correctly.
12 """
13
14 import io
15 from typing import TypedDict
16
17 import mido
18 import pytest
19
20 pytestmark = pytest.mark.midi
21
22 from muse.core.attributes import AttributeRule
23 from muse.plugins.midi.midi_merge import (
24 INTERNAL_DIMS,
25 NON_INDEPENDENT_DIMS,
26 MidiDimensions,
27 dimension_conflict_detail,
28 extract_dimensions,
29 merge_midi_dimensions,
30 )
31
32
33 # ---------------------------------------------------------------------------
34 # Typed kwargs for _make_midi — avoids bare dict in parametrize
35 # ---------------------------------------------------------------------------
36
37
38 class MidiKwargs(TypedDict, total=False):
39 notes: list[tuple[int, int, int]]
40 pitchwheel: list[tuple[int, int]]
41 control_change: list[tuple[int, int, int]]
42 channel_pressure: list[tuple[int, int]]
43 poly_aftertouch: list[tuple[int, int, int]]
44 program_change: list[tuple[int, int]]
45 set_tempo: int
46 time_sig: tuple[int, int]
47 key_sig: str
48 marker: str
49 track_name: str
50 ticks_per_beat: int
51
52
53 # ---------------------------------------------------------------------------
54 # MIDI construction helpers (reused from test_music_midi_merge)
55 # ---------------------------------------------------------------------------
56
57
58 def _make_midi(
59 *,
60 notes: list[tuple[int, int, int]] | None = None,
61 pitchwheel: list[tuple[int, int]] | None = None,
62 control_change: list[tuple[int, int, int]] | None = None,
63 channel_pressure: list[tuple[int, int]] | None = None,
64 poly_aftertouch: list[tuple[int, int, int]] | None = None,
65 program_change: list[tuple[int, int]] | None = None,
66 set_tempo: int | None = None,
67 time_sig: tuple[int, int] | None = None,
68 key_sig: str | None = None,
69 marker: str | None = None,
70 track_name: str | None = None,
71 ticks_per_beat: int = 480,
72 ) -> bytes:
73 mid = mido.MidiFile(type=0, ticks_per_beat=ticks_per_beat)
74 track = mido.MidiTrack()
75 events: list[tuple[int, mido.Message]] = []
76
77 tempo = set_tempo if set_tempo is not None else 500_000
78 events.append((0, mido.MetaMessage("set_tempo", tempo=tempo, time=0)))
79
80 if time_sig:
81 events.append((0, mido.MetaMessage("time_signature", numerator=time_sig[0], denominator=time_sig[1], time=0)))
82 if key_sig:
83 events.append((0, mido.MetaMessage("key_signature", key=key_sig, time=0)))
84 if marker:
85 events.append((0, mido.MetaMessage("marker", text=marker, time=0)))
86 if track_name:
87 events.append((0, mido.MetaMessage("track_name", name=track_name, time=0)))
88
89 for abs_tick, note, vel in notes or []:
90 events.append((abs_tick, mido.Message("note_on", note=note, velocity=vel, time=0)))
91 events.append((abs_tick + 120, mido.Message("note_off", note=note, velocity=0, time=0)))
92 for abs_tick, pitch in pitchwheel or []:
93 events.append((abs_tick, mido.Message("pitchwheel", pitch=pitch, time=0)))
94 for abs_tick, ctrl, val in control_change or []:
95 events.append((abs_tick, mido.Message("control_change", control=ctrl, value=val, time=0)))
96 for abs_tick, pressure in channel_pressure or []:
97 events.append((abs_tick, mido.Message("aftertouch", value=pressure, time=0)))
98 for abs_tick, note, pressure in poly_aftertouch or []:
99 events.append((abs_tick, mido.Message("polytouch", note=note, value=pressure, time=0)))
100 for abs_tick, prog in program_change or []:
101 events.append((abs_tick, mido.Message("program_change", program=prog, time=0)))
102
103 events.sort(key=lambda x: x[0])
104 prev = 0
105 for abs_tick, msg in events:
106 delta = abs_tick - prev
107 track.append(msg.copy(time=delta))
108 prev = abs_tick
109
110 track.append(mido.MetaMessage("end_of_track", time=0))
111 mid.tracks.append(track)
112 buf = io.BytesIO()
113 mid.save(file=buf)
114 return buf.getvalue()
115
116
117 def _empty_midi() -> bytes:
118 return _make_midi()
119
120
121 _NO_ATTRS: list[AttributeRule] = []
122
123
124 def _ours_rule(path: str) -> list[AttributeRule]:
125 return [AttributeRule(path_pattern=path, dimension="*", strategy="ours")]
126
127
128 # ---------------------------------------------------------------------------
129 # Dimension count
130 # ---------------------------------------------------------------------------
131
132
133 class TestDimensionCount:
134 def test_exactly_21_internal_dims(self) -> None:
135 assert len(INTERNAL_DIMS) == 21
136
137 def test_all_dims_present_in_extracted_dimensions(self) -> None:
138 dims = extract_dimensions(_empty_midi())
139 for d in INTERNAL_DIMS:
140 assert d in dims.slices, f"Missing dimension: {d}"
141
142 def test_non_independent_dims_subset_of_internal(self) -> None:
143 for d in NON_INDEPENDENT_DIMS:
144 assert d in INTERNAL_DIMS
145
146
147 # ---------------------------------------------------------------------------
148 # extract_dimensions correctness per event type
149 # ---------------------------------------------------------------------------
150
151
152 class TestExtractDimensionsPerType:
153 def test_notes_extracted(self) -> None:
154 midi = _make_midi(notes=[(0, 60, 80)])
155 dims = extract_dimensions(midi)
156 assert len(dims.slices["notes"].events) > 0
157
158 def test_pitch_bend_extracted(self) -> None:
159 midi = _make_midi(pitchwheel=[(0, 4096)])
160 dims = extract_dimensions(midi)
161 assert len(dims.slices["pitch_bend"].events) > 0
162
163 def test_channel_pressure_extracted(self) -> None:
164 midi = _make_midi(channel_pressure=[(0, 64)])
165 dims = extract_dimensions(midi)
166 assert len(dims.slices["channel_pressure"].events) > 0
167
168 def test_poly_pressure_extracted(self) -> None:
169 midi = _make_midi(poly_aftertouch=[(0, 60, 64)])
170 dims = extract_dimensions(midi)
171 assert len(dims.slices["poly_pressure"].events) > 0
172
173 def test_program_change_extracted(self) -> None:
174 midi = _make_midi(program_change=[(0, 25)])
175 dims = extract_dimensions(midi)
176 assert len(dims.slices["program_change"].events) > 0
177
178 def test_marker_extracted(self) -> None:
179 midi = _make_midi(marker="Chorus")
180 dims = extract_dimensions(midi)
181 assert len(dims.slices["markers"].events) > 0
182
183 def test_track_name_extracted(self) -> None:
184 midi = _make_midi(track_name="Piano")
185 dims = extract_dimensions(midi)
186 assert len(dims.slices["track_structure"].events) > 0
187
188 @pytest.mark.parametrize("cc_num,expected_dim", [
189 (1, "cc_modulation"),
190 (7, "cc_volume"),
191 (10, "cc_pan"),
192 (11, "cc_expression"),
193 (64, "cc_sustain"),
194 (65, "cc_portamento"),
195 (66, "cc_sostenuto"),
196 (67, "cc_soft_pedal"),
197 (91, "cc_reverb"),
198 (93, "cc_chorus"),
199 (20, "cc_other"), # CC 20 is "other"
200 (100, "cc_other"), # CC 100 is "other"
201 ])
202 def test_cc_event_classified_to_correct_dimension(self, cc_num: int, expected_dim: str) -> None:
203 midi = _make_midi(control_change=[(0, cc_num, 64)])
204 dims = extract_dimensions(midi)
205 assert len(dims.slices[expected_dim].events) > 0
206
207 def test_empty_midi_all_dims_present_with_empty_events(self) -> None:
208 midi = _empty_midi()
209 dims = extract_dimensions(midi)
210 for d in INTERNAL_DIMS:
211 assert d in dims.slices
212
213
214 # ---------------------------------------------------------------------------
215 # dimension_conflict_detail correctness
216 # ---------------------------------------------------------------------------
217
218
219 class TestDimensionConflictDetail:
220 def test_all_unchanged_when_identical(self) -> None:
221 midi = _make_midi(notes=[(0, 60, 80)])
222 dims = extract_dimensions(midi)
223 detail = dimension_conflict_detail(dims, dims, dims)
224 for d in INTERNAL_DIMS:
225 assert detail[d] == "unchanged", f"Expected unchanged for {d}, got {detail[d]}"
226
227 def test_left_only_change_detected(self) -> None:
228 base_midi = _empty_midi()
229 left_midi = _make_midi(notes=[(0, 60, 80)])
230 base_dims = extract_dimensions(base_midi)
231 left_dims = extract_dimensions(left_midi)
232 detail = dimension_conflict_detail(base_dims, left_dims, base_dims)
233 assert detail["notes"] == "left_only"
234
235 def test_right_only_change_detected(self) -> None:
236 base_midi = _empty_midi()
237 right_midi = _make_midi(pitchwheel=[(0, 1000)])
238 base_dims = extract_dimensions(base_midi)
239 right_dims = extract_dimensions(right_midi)
240 detail = dimension_conflict_detail(base_dims, base_dims, right_dims)
241 assert detail["pitch_bend"] == "right_only"
242
243 def test_bilateral_conflict_detected(self) -> None:
244 base_midi = _empty_midi()
245 left_midi = _make_midi(notes=[(0, 60, 80)])
246 right_midi = _make_midi(notes=[(0, 64, 80)])
247 base_dims = extract_dimensions(base_midi)
248 left_dims = extract_dimensions(left_midi)
249 right_dims = extract_dimensions(right_midi)
250 detail = dimension_conflict_detail(base_dims, left_dims, right_dims)
251 assert detail["notes"] == "both"
252
253 def test_independent_cc_dims_dont_cross_contaminate(self) -> None:
254 """Changing CC1 on left and CC7 on right → each in its own dimension."""
255 base = _empty_midi()
256 left = _make_midi(control_change=[(0, 1, 100)]) # modulation
257 right = _make_midi(control_change=[(0, 7, 80)]) # volume
258 b = extract_dimensions(base)
259 l = extract_dimensions(left)
260 r = extract_dimensions(right)
261 detail = dimension_conflict_detail(b, l, r)
262 assert detail["cc_modulation"] == "left_only"
263 assert detail["cc_volume"] == "right_only"
264 # Notes should be unchanged.
265 assert detail["notes"] == "unchanged"
266
267
268 # ---------------------------------------------------------------------------
269 # merge_midi_dimensions — clean auto-merge per dimension
270 # ---------------------------------------------------------------------------
271
272
273 class TestCleanMergePerDimension:
274 def test_notes_left_only_auto_merges(self) -> None:
275 base = _empty_midi()
276 left = _make_midi(notes=[(0, 60, 80)])
277 result = merge_midi_dimensions(base, left, base, _NO_ATTRS, "test.mid")
278 assert result is not None
279 merged_bytes, report = result
280 assert report.get("notes") in ("left", "left_only", "base", None) or "notes" in report
281
282 def test_pitchwheel_right_only_auto_merges(self) -> None:
283 base = _empty_midi()
284 right = _make_midi(pitchwheel=[(0, 2000)])
285 result = merge_midi_dimensions(base, base, right, _NO_ATTRS, "test.mid")
286 assert result is not None
287
288 def test_cc_modulation_independent_of_cc_volume(self) -> None:
289 """Left edits CC1 (modulation), right edits CC7 (volume) — must auto-merge."""
290 base = _empty_midi()
291 left = _make_midi(control_change=[(0, 1, 100)])
292 right = _make_midi(control_change=[(0, 7, 80)])
293 result = merge_midi_dimensions(base, left, right, _NO_ATTRS, "test.mid")
294 assert result is not None
295
296 @pytest.mark.parametrize("cc_left,cc_right", [
297 (1, 7), (1, 10), (1, 11), (1, 64), (1, 91),
298 (7, 10), (7, 64), (10, 91), (64, 93),
299 ])
300 def test_independent_cc_pairs_auto_merge(self, cc_left: int, cc_right: int) -> None:
301 """Every pair of distinct named CCs can be changed independently."""
302 base = _empty_midi()
303 left = _make_midi(control_change=[(0, cc_left, 64)])
304 right = _make_midi(control_change=[(0, cc_right, 64)])
305 result = merge_midi_dimensions(base, left, right, _NO_ATTRS, "test.mid")
306 assert result is not None, f"CC{cc_left} vs CC{cc_right} should auto-merge"
307
308 def test_notes_and_pitchwheel_independently_auto_merge(self) -> None:
309 """Left adds notes; right adds pitchwheel — must auto-merge."""
310 base = _empty_midi()
311 left = _make_midi(notes=[(0, 60, 80)])
312 right = _make_midi(pitchwheel=[(0, 500)])
313 result = merge_midi_dimensions(base, left, right, _NO_ATTRS, "test.mid")
314 assert result is not None
315
316 def test_program_change_independent_of_notes(self) -> None:
317 base = _empty_midi()
318 left = _make_midi(notes=[(0, 60, 80)])
319 right = _make_midi(program_change=[(0, 25)])
320 result = merge_midi_dimensions(base, left, right, _NO_ATTRS, "test.mid")
321 assert result is not None
322
323
324 # ---------------------------------------------------------------------------
325 # merge_midi_dimensions — conflict resolution with strategy
326 # ---------------------------------------------------------------------------
327
328
329 class TestConflictResolutionStrategies:
330 def test_bilateral_notes_conflict_with_ours_strategy(self) -> None:
331 base = _empty_midi()
332 left = _make_midi(notes=[(0, 60, 80)])
333 right = _make_midi(notes=[(0, 64, 80)])
334 result = merge_midi_dimensions(base, left, right, _ours_rule("test.mid"), "test.mid")
335 # With "ours" strategy, conflict is resolved in favour of left.
336 assert result is not None
337
338 def test_bilateral_notes_conflict_no_strategy_returns_none(self) -> None:
339 base = _empty_midi()
340 left = _make_midi(notes=[(0, 60, 80)])
341 right = _make_midi(notes=[(0, 64, 80)])
342 result = merge_midi_dimensions(base, left, right, _NO_ATTRS, "test.mid")
343 assert result is None
344
345 def test_non_independent_tempo_conflict_blocks_merge(self) -> None:
346 """tempo_map bilateral conflict → entire merge blocked."""
347 base = _empty_midi()
348 left = _make_midi(set_tempo=400_000)
349 right = _make_midi(set_tempo=600_000)
350 result = merge_midi_dimensions(base, left, right, _NO_ATTRS, "test.mid")
351 assert result is None
352
353
354 # ---------------------------------------------------------------------------
355 # Large sequence stress tests
356 # ---------------------------------------------------------------------------
357
358
359 class TestLargeSequenceStress:
360 def test_100_notes_extract_dimension(self) -> None:
361 notes = [(i * 480, (60 + i % 12), 80) for i in range(100)]
362 midi = _make_midi(notes=notes)
363 dims = extract_dimensions(midi)
364 # Each note generates a note_on and note_off → ≥200 events.
365 assert len(dims.slices["notes"].events) >= 200
366
367 def test_many_cc_events_all_classified(self) -> None:
368 """50 CC events across multiple controllers all classified correctly."""
369 ccs = [(i * 10, cc, i % 127) for i, cc in enumerate([1, 7, 10, 11, 64] * 10)]
370 midi = _make_midi(control_change=ccs)
371 dims = extract_dimensions(midi)
372 total = sum(len(dims.slices[d].events) for d in INTERNAL_DIMS)
373 assert total >= len(ccs)
374
375 def test_all_21_dimensions_touched_and_auto_merge(self) -> None:
376 """Base empty; left touches notes, right touches pitch_bend — 21 dims all present."""
377 base = _empty_midi()
378 left = _make_midi(notes=[(0, 60, 80)])
379 right = _make_midi(pitchwheel=[(0, 1000)])
380 result = merge_midi_dimensions(base, left, right, _NO_ATTRS, "stress.mid")
381 assert result is not None
382 merged_bytes, report = result
383 assert len(merged_bytes) > 0
384
385 def test_hash_stability_empty_dimension(self) -> None:
386 """Hash of an empty dimension must be stable across calls."""
387 midi = _empty_midi()
388 d1 = extract_dimensions(midi)
389 d2 = extract_dimensions(midi)
390 for dim in INTERNAL_DIMS:
391 assert d1.slices[dim].content_hash == d2.slices[dim].content_hash
392
393 def test_merged_output_is_valid_midi(self) -> None:
394 """The bytes returned by merge_midi_dimensions must parse as valid MIDI."""
395 base = _empty_midi()
396 left = _make_midi(notes=[(0, 60, 80)])
397 right = _make_midi(pitchwheel=[(0, 500)])
398 result = merge_midi_dimensions(base, left, right, _NO_ATTRS, "test.mid")
399 assert result is not None
400 merged_bytes, _ = result
401 # Should not raise.
402 parsed = mido.MidiFile(file=io.BytesIO(merged_bytes))
403 assert parsed.ticks_per_beat > 0
404
405
406 # ---------------------------------------------------------------------------
407 # dimension_conflict_detail — all 21 dimensions
408 # ---------------------------------------------------------------------------
409
410
411 class TestAllDimensionConflictDetail:
412 """Verify every dimension can independently report unchanged/left_only/right_only/bilateral."""
413
414 @pytest.mark.parametrize("dim,left_kwargs,right_kwargs", [
415 ("notes",
416 {"notes": [(0, 60, 80)]},
417 {"notes": [(0, 64, 80)]}),
418 ("pitch_bend",
419 {"pitchwheel": [(0, 1000)]},
420 {"pitchwheel": [(0, -1000)]}),
421 ("channel_pressure",
422 {"channel_pressure": [(0, 80)]},
423 {"channel_pressure": [(0, 40)]}),
424 ("poly_pressure",
425 {"poly_aftertouch": [(0, 60, 80)]},
426 {"poly_aftertouch": [(0, 60, 40)]}),
427 ("cc_modulation",
428 {"control_change": [(0, 1, 100)]},
429 {"control_change": [(0, 1, 50)]}),
430 ("cc_volume",
431 {"control_change": [(0, 7, 100)]},
432 {"control_change": [(0, 7, 50)]}),
433 ("cc_pan",
434 {"control_change": [(0, 10, 64)]},
435 {"control_change": [(0, 10, 32)]}),
436 ("cc_expression",
437 {"control_change": [(0, 11, 100)]},
438 {"control_change": [(0, 11, 50)]}),
439 ("cc_sustain",
440 {"control_change": [(0, 64, 127)]},
441 {"control_change": [(0, 64, 0)]}),
442 ("cc_portamento",
443 {"control_change": [(0, 65, 127)]},
444 {"control_change": [(0, 65, 0)]}),
445 ("cc_sostenuto",
446 {"control_change": [(0, 66, 127)]},
447 {"control_change": [(0, 66, 0)]}),
448 ("cc_soft_pedal",
449 {"control_change": [(0, 67, 127)]},
450 {"control_change": [(0, 67, 0)]}),
451 ("cc_reverb",
452 {"control_change": [(0, 91, 80)]},
453 {"control_change": [(0, 91, 40)]}),
454 ("cc_chorus",
455 {"control_change": [(0, 93, 80)]},
456 {"control_change": [(0, 93, 40)]}),
457 ("program_change",
458 {"program_change": [(0, 10)]},
459 {"program_change": [(0, 20)]}),
460 ])
461 def test_bilateral_conflict_per_dimension(
462 self, dim: str, left_kwargs: MidiKwargs, right_kwargs: MidiKwargs
463 ) -> None:
464 base = _empty_midi()
465 left = _make_midi(**left_kwargs)
466 right = _make_midi(**right_kwargs)
467 b = extract_dimensions(base)
468 l = extract_dimensions(left)
469 r = extract_dimensions(right)
470 detail = dimension_conflict_detail(b, l, r)
471 assert detail[dim] == "both", (
472 f"Expected bilateral_conflict for {dim}, got {detail[dim]}"
473 )
File History 4 commits
sha256:81ae324db5ad375fbfe4834c6fcb378312cafad3cc92dec5d3e5c427306621a2 fix: remove commit_exists filter from have anchors — server… Sonnet 4.6 patch 21 days ago
sha256:36c3cb3e76619d4c30a6d9bf81b5ec4ff148e30dcfed913e3114ca7b43b81c7e fix: rename objects→blobs in push client and all stale test… Sonnet 4.6 patch 22 days ago
sha256:c06a9b9b9fee26c68ea725b44d54b2c0a171301ce9de746d5b656617b4463a9a fix: repair four test failures from post-migration audit Sonnet 4.6 patch 28 days ago
sha256:1900655993c83c4107067375548a7be823e471d2515830842f1a12cba4bd3cdf fix: unified object store migration — idempotent writes, JS… Sonnet 4.6 minor 29 days ago