"""Property-based tests for the three-way merge engine invariants. These tests use Hypothesis to generate random manifests and DAG shapes and verify mathematical invariants that must hold for ANY input — not just the specific inputs we thought to write as unit tests. Why these tests? ---------------- The silent-drop bug (fixed in 73427a30) survived all existing tests because the existing tests only exercised SPECIFIC manifests. A property test with the invariant "theirs-only files must survive merge" would have caught it immediately: Hypothesis would have generated a manifest where theirs added a file that ours didn't touch, run apply_merge, and found the file absent. Invariants tested ----------------- M1 Theirs-only additions survive in merged manifest. M2 Ours-only additions survive in merged manifest. M3 Theirs-only deletions are applied in merged manifest. M4 Ours-only deletions are applied in merged manifest. M5 Conflict paths are exactly the intersection of ours_changed and theirs_changed. M6 Conflict paths are absent from apply_merge output (caller resolves them). M7 Non-conflicting paths from both sides are correct in output. M8 Identical content on both sides is never a conflict in diff_snapshots. M9 apply_merge is idempotent when ours == theirs (convergence). M10 merged result contains no paths from conflict_paths when they exist. LCA invariants -------------- L1 LCA(A, A) == A. L2 LCA(A, B) is an ancestor of A (reachable from A). L3 LCA(A, B) is an ancestor of B (reachable from B). L4 LCA(A, B) == LCA(B, A) — commutativity. L5 If A is ancestor of B, LCA(A, B) == A. Snapshot integrity invariant ---------------------------- SI1 Every object_id referenced in a merged manifest exists in the object store. 
""" from __future__ import annotations import datetime import json import pathlib import pytest from hypothesis import HealthCheck, given, settings from hypothesis import strategies as st from muse.core.merge_engine import ( apply_merge, detect_conflicts, diff_snapshots, find_merge_base, ) from muse.core.types import Manifest, fake_id, blob_id from muse.core.paths import head_path, heads_dir, muse_dir # --------------------------------------------------------------------------- # Strategies — building blocks for random manifests and DAGs # --------------------------------------------------------------------------- def _h(label: str) -> str: return fake_id(label) # A path looks like "src/module_3.py" or "README.md". _path_strategy = st.text( alphabet=st.characters(whitelist_categories=("Ll", "Lu", "Nd"), whitelist_characters="/_-."), min_size=1, max_size=32, ).filter(lambda s: "/" not in s or not s.startswith("/")) # A content hash is a 64-char hex string (we use sha256 of a label for uniqueness). _hash_strategy = st.text( alphabet="0123456789abcdef", min_size=64, max_size=64, ) # A manifest is a dict of path → hash with at most 40 entries. 
_manifest_strategy = st.dictionaries(
    keys=_path_strategy,
    values=_hash_strategy,
    min_size=0,
    max_size=40,
)


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------


def _linear_repo(
    tmp_path: pathlib.Path,
) -> tuple[pathlib.Path, str]:
    """Create a minimal code-domain Muse repo for LCA tests.

    Writes ``repo.json`` and creates the ``refs/heads``, ``snapshots``,
    ``commits`` and ``objects`` directories under ``muse_dir(tmp_path)``.

    Returns:
        ``(tmp_path, repo_id)`` — the repo root and the fake repo id written
        into ``repo.json``.
    """
    dot_muse = muse_dir(tmp_path)
    dot_muse.mkdir(exist_ok=True)
    repo_id = fake_id("repo")
    (dot_muse / "repo.json").write_text(
        json.dumps({"repo_id": repo_id, "domain": "code", "default_branch": "main"}),
        encoding="utf-8",
    )
    (dot_muse / "refs" / "heads").mkdir(parents=True, exist_ok=True)
    (dot_muse / "snapshots").mkdir(exist_ok=True)
    (dot_muse / "commits").mkdir(exist_ok=True)
    (dot_muse / "objects").mkdir(exist_ok=True)
    return tmp_path, repo_id


def _make_commit_lca(
    root: pathlib.Path,
    repo_id: str,
    manifest: Manifest,
    message: str = "c",
    parent_id: str | None = None,
    parent2_id: str | None = None,
) -> str:
    """Write a snapshot and a commit for *manifest* and return the commit id.

    Args:
        root: Repository root the records are written under.
        repo_id: Accepted for call-site symmetry; not used by this helper.
        manifest: Path → object-id mapping stored as the commit's snapshot.
        message: Commit message (also an input to the commit id).
        parent_id: First parent commit id, or ``None`` for a root commit.
        parent2_id: Second parent commit id (merge commits), or ``None``.

    Returns:
        The id of the commit that was written.  The commit is always recorded
        on branch ``"main"`` and timestamped with the current UTC time.
    """
    from muse.core.ids import hash_commit as compute_commit_id, hash_snapshot as compute_snapshot_id
    from muse.core.commits import (
        CommitRecord,
        write_commit,
    )
    from muse.core.snapshots import (
        SnapshotRecord,
        write_snapshot,
    )

    snap_id = compute_snapshot_id(manifest)
    committed_at = datetime.datetime.now(datetime.timezone.utc)
    parent_ids: list[str] = []
    if parent_id:
        parent_ids.append(parent_id)
    if parent2_id:
        parent_ids.append(parent2_id)
    commit_id = compute_commit_id(
        parent_ids=parent_ids,
        snapshot_id=snap_id,
        message=message,
        committed_at_iso=committed_at.isoformat(),
    )
    write_snapshot(root, SnapshotRecord(snapshot_id=snap_id, manifest=manifest))
    write_commit(root, CommitRecord(
        commit_id=commit_id,
        branch="main",
        snapshot_id=snap_id,
        message=message,
        committed_at=committed_at,
        parent_commit_id=parent_id,
        parent2_commit_id=parent2_id,
    ))
    return commit_id


# ===========================================================================
# M — apply_merge / diff_snapshots / detect_conflicts invariants
# ===========================================================================


class TestMergeEngineInvariantsM:
    """Property-based invariants for the pure merge functions."""

    @given(
        base=_manifest_strategy,
        extra_theirs=_manifest_strategy,
    )
    @settings(max_examples=200, suppress_health_check=[HealthCheck.too_slow])
    def test_M1_theirs_only_additions_survive(
        self, base: Manifest, extra_theirs: Manifest
    ) -> None:
        """M1: every file theirs adds (not in base, not touched by ours) is in merged."""
        # ours: no changes from base
        ours = dict(base)
        # theirs: base + extra_theirs (disjoint new keys only)
        theirs = dict(base)
        new_keys = {k: v for k, v in extra_theirs.items() if k not in base}
        theirs.update(new_keys)

        ours_changed = diff_snapshots(base, ours)
        theirs_changed = diff_snapshots(base, theirs)
        conflicts = detect_conflicts(ours_changed, theirs_changed, ours, theirs)
        merged = apply_merge(base, ours, theirs, ours_changed, theirs_changed, conflicts)

        for path, obj_id in new_keys.items():
            assert path in merged, (
                f"M1 VIOLATED: theirs-only addition '{path}' absent from merged.\n"
                f"base keys: {set(base)}\n"
                f"extra_theirs keys: {set(new_keys)}\n"
                f"merged keys: {set(merged)}"
            )
            assert merged[path] == obj_id, (
                f"M1 VIOLATED: theirs-only addition '{path}' has wrong hash in merged."
            )

    @given(
        base=_manifest_strategy,
        extra_ours=_manifest_strategy,
    )
    @settings(max_examples=200, suppress_health_check=[HealthCheck.too_slow])
    def test_M2_ours_only_additions_survive(
        self, base: Manifest, extra_ours: Manifest
    ) -> None:
        """M2: every file ours adds (not in base, not touched by theirs) is in merged."""
        ours = dict(base)
        new_keys = {k: v for k, v in extra_ours.items() if k not in base}
        ours.update(new_keys)
        theirs = dict(base)

        ours_changed = diff_snapshots(base, ours)
        theirs_changed = diff_snapshots(base, theirs)
        conflicts = detect_conflicts(ours_changed, theirs_changed, ours, theirs)
        merged = apply_merge(base, ours, theirs, ours_changed, theirs_changed, conflicts)

        for path, obj_id in new_keys.items():
            assert path in merged, f"M2 VIOLATED: ours-only addition '{path}' absent from merged."
            assert merged[path] == obj_id

    @given(base=_manifest_strategy, data=st.data())
    @settings(max_examples=200, suppress_health_check=[HealthCheck.too_slow])
    def test_M3_theirs_only_deletions_applied(
        self, base: Manifest, data: st.DataObject
    ) -> None:
        """M3: files theirs deletes (and ours does not touch) are absent from merged.

        The deleted paths are drawn from ``base`` itself.  Generating them
        independently (arbitrary text) almost never hits a key of ``base``,
        which would make the test return early and exercise nothing.
        """
        if not base:
            return  # an empty base has nothing to delete
        to_delete = data.draw(
            st.sets(st.sampled_from(sorted(base)), min_size=1), label="to_delete"
        )

        ours = dict(base)
        theirs = {k: v for k, v in base.items() if k not in to_delete}

        ours_changed = diff_snapshots(base, ours)
        theirs_changed = diff_snapshots(base, theirs)
        conflicts = detect_conflicts(ours_changed, theirs_changed, ours, theirs)
        merged = apply_merge(base, ours, theirs, ours_changed, theirs_changed, conflicts)

        for path in to_delete:
            assert path not in merged, (
                f"M3 VIOLATED: theirs-only deletion '{path}' still present in merged."
            )

    @given(base=_manifest_strategy, data=st.data())
    @settings(max_examples=200, suppress_health_check=[HealthCheck.too_slow])
    def test_M4_ours_only_deletions_applied(
        self, base: Manifest, data: st.DataObject
    ) -> None:
        """M4: files ours deletes (and theirs does not touch) are absent from merged.

        As in M3, the deleted paths are sampled from ``base`` so that every
        non-empty base actually exercises the invariant.
        """
        if not base:
            return  # an empty base has nothing to delete
        to_delete = data.draw(
            st.sets(st.sampled_from(sorted(base)), min_size=1), label="to_delete"
        )

        theirs = dict(base)
        ours = {k: v for k, v in base.items() if k not in to_delete}

        ours_changed = diff_snapshots(base, ours)
        theirs_changed = diff_snapshots(base, theirs)
        conflicts = detect_conflicts(ours_changed, theirs_changed, ours, theirs)
        merged = apply_merge(base, ours, theirs, ours_changed, theirs_changed, conflicts)

        for path in to_delete:
            assert path not in merged, (
                f"M4 VIOLATED: ours-only deletion '{path}' still present in merged."
            )

    @given(base=_manifest_strategy, ours=_manifest_strategy, theirs=_manifest_strategy)
    @settings(max_examples=300, suppress_health_check=[HealthCheck.too_slow])
    def test_M5_conflicts_are_divergent_intersection(
        self,
        base: Manifest,
        ours: Manifest,
        theirs: Manifest,
    ) -> None:
        """M5: detect_conflicts returns exactly paths where both changed AND disagree.

        Formally: conflicts = {p ∈ ours_changed ∩ theirs_changed | ours.get(p) ≠ theirs.get(p)}.

        Convergent changes — both deleted the same file (both .get(p) == None),
        or both added/modified to the same hash — are NOT conflicts.  The old
        invariant that conflicts == ours_changed ∩ theirs_changed was incorrect
        for these cases.
        """
        ours_changed = diff_snapshots(base, ours)
        theirs_changed = diff_snapshots(base, theirs)
        conflicts = detect_conflicts(ours_changed, theirs_changed, ours, theirs)
        expected = {
            p for p in ours_changed & theirs_changed
            if ours.get(p) != theirs.get(p)
        }
        assert conflicts == expected, (
            f"M5 VIOLATED: conflict set {conflicts!r} != expected {expected!r}.\n"
            f" ours_changed={ours_changed!r}\n"
            f" theirs_changed={theirs_changed!r}"
        )

    @given(base=_manifest_strategy, ours=_manifest_strategy, theirs=_manifest_strategy)
    @settings(max_examples=200, suppress_health_check=[HealthCheck.too_slow])
    def test_M6_conflict_paths_absent_from_apply_merge(
        self,
        base: Manifest,
        ours: Manifest,
        theirs: Manifest,
    ) -> None:
        """M6: apply_merge never writes conflict paths — callers resolve them."""
        ours_changed = diff_snapshots(base, ours)
        theirs_changed = diff_snapshots(base, theirs)
        conflicts = detect_conflicts(ours_changed, theirs_changed, ours, theirs)
        merged = apply_merge(base, ours, theirs, ours_changed, theirs_changed, conflicts)

        # Conflict paths must not be in merged at a value taken from ours or theirs
        # (they stay at base or absent).  The strict invariant: a conflict path
        # must have been either absent from base (so it should be absent in merged)
        # or present at base's value.
        for path in conflicts:
            if path in merged:
                # If present, it must be at base's value (not ours or theirs override).
                assert merged[path] == base.get(path), (
                    f"M6 VIOLATED: conflict path '{path}' in merged at non-base value."
                )

    @given(base=_manifest_strategy, ours=_manifest_strategy, theirs=_manifest_strategy)
    @settings(max_examples=200, suppress_health_check=[HealthCheck.too_slow])
    def test_M7_non_conflicting_paths_correct_in_output(
        self,
        base: Manifest,
        ours: Manifest,
        theirs: Manifest,
    ) -> None:
        """M7: non-conflicting ours-only changes are at ours value; theirs-only at theirs value."""
        ours_changed = diff_snapshots(base, ours)
        theirs_changed = diff_snapshots(base, theirs)
        conflicts = detect_conflicts(ours_changed, theirs_changed, ours, theirs)
        merged = apply_merge(base, ours, theirs, ours_changed, theirs_changed, conflicts)

        for path in ours_changed - conflicts:
            if path in ours:
                assert merged.get(path) == ours[path], (
                    f"M7 VIOLATED: ours-only change '{path}' not at ours value in merged."
                )
            else:
                assert path not in merged, (
                    f"M7 VIOLATED: ours-only deletion '{path}' still present in merged."
                )

        for path in theirs_changed - conflicts:
            if path in theirs:
                assert merged.get(path) == theirs[path], (
                    f"M7 VIOLATED: theirs-only change '{path}' not at theirs value in merged."
                )
            else:
                assert path not in merged, (
                    f"M7 VIOLATED: theirs-only deletion '{path}' still present in merged."
                )

    @given(base=_manifest_strategy, common_changes=_manifest_strategy)
    @settings(max_examples=200, suppress_health_check=[HealthCheck.too_slow])
    def test_M8_convergent_modify_auto_resolves(
        self, base: Manifest, common_changes: Manifest
    ) -> None:
        """M8: when both branches independently arrive at the same hash, no conflict
        fires and the merged manifest contains the path at the agreed hash.

        This is the canonical convergent-change invariant: identical outcomes are
        not conflicts regardless of whether both sides changed the path.
        """
        # NOTE: ``common_changes`` is generated but not consumed below; only
        # ``base`` varies the surrounding manifest.
        shared_hash = _h("shared-content")
        path = "shared.py"

        base_with = dict(base)
        base_with[path] = _h("original")

        ours = dict(base_with)
        ours[path] = shared_hash

        theirs = dict(base_with)
        theirs[path] = shared_hash

        ours_changed = diff_snapshots(base_with, ours)
        theirs_changed = diff_snapshots(base_with, theirs)
        conflicts = detect_conflicts(ours_changed, theirs_changed, ours, theirs)
        merged = apply_merge(base_with, ours, theirs, ours_changed, theirs_changed, conflicts)

        # Convergent change: detect_conflicts must NOT flag 'path'.
        assert path not in conflicts, (
            f"M8 VIOLATED: convergent path '{path}' (same hash on both sides) "
            f"wrongly reported as conflict."
        )
        # apply_merge must include 'path' at the agreed hash.
        assert merged.get(path) == shared_hash, (
            f"M8 VIOLATED: merged['{path}'] = {merged.get(path)!r}, "
            f"expected {shared_hash!r}."
        )

    @given(base=_manifest_strategy, changes=_manifest_strategy)
    @settings(max_examples=200, suppress_health_check=[HealthCheck.too_slow])
    def test_M9_merge_ours_equals_theirs_is_convergent(
        self, base: Manifest, changes: Manifest
    ) -> None:
        """M9: when ours == theirs, apply_merge produces ours exactly (no-conflict convergence).

        Checked in three parts, each implied by M5, M7 and M10:
        no conflicts are reported, every path in ours is at ours' value, and
        every base path that both sides deleted is absent from merged.
        """
        ours = dict(changes)
        theirs = dict(changes)

        ours_changed = diff_snapshots(base, ours)
        theirs_changed = diff_snapshots(base, theirs)
        conflicts = detect_conflicts(ours_changed, theirs_changed, ours, theirs)
        merged = apply_merge(base, ours, theirs, ours_changed, theirs_changed, conflicts)

        # Identical sides can never disagree on any path.
        assert not conflicts, (
            f"M9 VIOLATED: ours == theirs but conflicts were reported: {conflicts!r}."
        )

        for path in set(base) | set(ours):
            if path in ours:
                assert merged.get(path) == ours[path], (
                    f"M9 VIOLATED: ours == theirs but merged[{path!r}] != ours[{path!r}]."
                )
            else:
                # In base, deleted identically on both sides.
                assert path not in merged, (
                    f"M9 VIOLATED: {path!r} deleted on both sides but still present in merged."
                )

    @given(base=_manifest_strategy, ours=_manifest_strategy, theirs=_manifest_strategy)
    @settings(max_examples=200, suppress_health_check=[HealthCheck.too_slow])
    def test_M10_untouched_base_paths_preserved(
        self, base: Manifest, ours: Manifest, theirs: Manifest
    ) -> None:
        """M10: files neither branch touched remain in merged at base value."""
        ours_changed = diff_snapshots(base, ours)
        theirs_changed = diff_snapshots(base, theirs)
        conflicts = detect_conflicts(ours_changed, theirs_changed, ours, theirs)
        merged = apply_merge(base, ours, theirs, ours_changed, theirs_changed, conflicts)

        untouched = set(base) - ours_changed - theirs_changed
        for path in untouched:
            assert merged.get(path) == base[path], (
                f"M10 VIOLATED: untouched path '{path}' changed value in merged."
            )


# ===========================================================================
# L — LCA / find_merge_base invariants
# ===========================================================================


class TestLCAInvariantsL:
    """Invariants for find_merge_base (Lowest Common Ancestor).

    These are example-based: each test builds a small, fixed commit DAG with
    ``_make_commit_lca`` and checks one LCA law on it.
    """

    @pytest.fixture
    def repo(self, tmp_path: pathlib.Path) -> tuple[pathlib.Path, str]:
        return _linear_repo(tmp_path)

    def test_L1_lca_of_same_commit_is_itself(
        self, tmp_path: pathlib.Path
    ) -> None:
        """L1: LCA(X, X) == X for any commit X."""
        root, repo_id = _linear_repo(tmp_path)
        c = _make_commit_lca(root, repo_id, {}, "root")
        lca = find_merge_base(root, c, c)
        assert lca == c, f"L1: LCA({c[:8]}, {c[:8]}) should be {c[:8]}, got {lca}"

    def test_L2_L3_lca_is_ancestor_of_both(self, tmp_path: pathlib.Path) -> None:
        """L2+L3: LCA(A,B) must be in the ancestry of both A and B."""
        root, repo_id = _linear_repo(tmp_path)
        c0 = _make_commit_lca(root, repo_id, {}, "base")
        c1 = _make_commit_lca(root, repo_id, {"a.py": _h("a1")}, "ours", parent_id=c0)
        c2 = _make_commit_lca(root, repo_id, {"b.py": _h("b1")}, "theirs", parent_id=c0)

        lca = find_merge_base(root, c1, c2)
        assert lca is not None, "LCA of two commits with common ancestor must exist"

        from muse.core.commits import read_commit

        def _ancestors(start: str) -> set[str]:
            # Walk both parent links; the start commit counts as its own ancestor.
            visited: set[str] = set()
            q = [start]
            while q:
                cid = q.pop()
                if cid in visited:
                    continue
                visited.add(cid)
                commit = read_commit(root, cid)
                if commit is None:
                    continue
                if commit.parent_commit_id:
                    q.append(commit.parent_commit_id)
                if commit.parent2_commit_id:
                    q.append(commit.parent2_commit_id)
            return visited

        assert lca in _ancestors(c1), f"L2: LCA {lca[:8]} not in ancestry of A {c1[:8]}"
        assert lca in _ancestors(c2), f"L3: LCA {lca[:8]} not in ancestry of B {c2[:8]}"

    def test_L4_lca_commutativity(self, tmp_path: pathlib.Path) -> None:
        """L4: LCA(A, B) == LCA(B, A)."""
        root, repo_id = _linear_repo(tmp_path)
        c0 = _make_commit_lca(root, repo_id, {}, "base")
        c1 = _make_commit_lca(root, repo_id, {"a.py": _h("a1")}, "c1", parent_id=c0)
        c2 = _make_commit_lca(root, repo_id, {"b.py": _h("b1")}, "c2", parent_id=c0)

        lca_ab = find_merge_base(root, c1, c2)
        lca_ba = find_merge_base(root, c2, c1)
        assert lca_ab == lca_ba, (
            f"L4: LCA({c1[:8]}, {c2[:8]}) = {lca_ab}, "
            f"LCA({c2[:8]}, {c1[:8]}) = {lca_ba} — not commutative"
        )

    def test_L5_if_a_is_ancestor_of_b_lca_is_a(self, tmp_path: pathlib.Path) -> None:
        """L5: if A is ancestor of B, LCA(A, B) == A."""
        root, repo_id = _linear_repo(tmp_path)
        c0 = _make_commit_lca(root, repo_id, {}, "base")
        c1 = _make_commit_lca(root, repo_id, {"a.py": _h("a1")}, "c1", parent_id=c0)
        c2 = _make_commit_lca(root, repo_id, {"b.py": _h("b1")}, "c2", parent_id=c1)

        lca = find_merge_base(root, c0, c2)
        assert lca == c0, f"L5: c0 is ancestor of c2 so LCA should be c0, got {lca}"

        lca2 = find_merge_base(root, c1, c2)
        assert lca2 == c1, f"L5: c1 is ancestor of c2 so LCA should be c1, got {lca2}"

    def test_L6_lca_for_merge_commit_topology(self, tmp_path: pathlib.Path) -> None:
        """L6: diamond topology — LCA of the two branches is the fork point, not deeper."""
        root, repo_id = _linear_repo(tmp_path)
        base = _make_commit_lca(root, repo_id, {}, "base")
        a = _make_commit_lca(root, repo_id, {"a.py": _h("a")}, "a", parent_id=base)
        b = _make_commit_lca(root, repo_id, {"b.py": _h("b")}, "b", parent_id=base)
        # Merge commit of a and b
        m = _make_commit_lca(
            root, repo_id, {"a.py": _h("a"), "b.py": _h("b")}, "merge",
            parent_id=a, parent2_id=b
        )
        # LCA of a and m should be a (m is a descendant of a).
        assert find_merge_base(root, a, m) == a, "L6a"
        # LCA of b and m should be b.
        assert find_merge_base(root, b, m) == b, "L6b"
        # LCA of base and m should be base.
        assert find_merge_base(root, base, m) == base, "L6c"


# ===========================================================================
# SI — Snapshot integrity invariants
# ===========================================================================


class TestSnapshotIntegritySI:
    """After every merge, every object_id in the snapshot must be in the store."""

    def _object_exists(self, root: pathlib.Path, obj_id: str) -> bool:
        """Return True if obj_id is readable from the object store."""
        from muse.core.object_store import read_object
        return read_object(root, obj_id) is not None

    def test_SI1_fast_forward_snapshot_objects_all_present(
        self, tmp_path: pathlib.Path
    ) -> None:
        """SI1: after fast-forward, every object in the new snapshot is in the store."""
        from muse.core.object_store import write_object
        from muse.core.commits import read_commit
        from muse.core.snapshots import read_snapshot

        root, repo_id = _linear_repo(tmp_path)
        (head_path(root)).write_text("ref: refs/heads/main", encoding="utf-8")

        # Write real objects to the store.
        contents = {f"file_{i}.py": f"x = {i}\n".encode() for i in range(10)}
        manifest: Manifest = {}
        for path, data in contents.items():
            obj_id = blob_id(data)
            write_object(root, obj_id, data)
            manifest[path] = obj_id

        base_c = _make_commit_lca(root, repo_id, {}, "base")
        # Write feat branch with the manifest.
        (heads_dir(root) / "main").write_text(base_c, encoding="utf-8")
        feat_c = _make_commit_lca(root, repo_id, manifest, "feat", parent_id=base_c)
        (heads_dir(root) / "feat").write_text(feat_c, encoding="utf-8")

        # Run merge (fast-forward).
        from tests.cli_test_helper import CliRunner
        runner = CliRunner()
        env = {"MUSE_REPO_ROOT": str(root)}
        runner.invoke(None, ["merge", "--force", "feat"], env=env, catch_exceptions=False)

        # Verify every object in main's snapshot is in the store.
        main_head = (heads_dir(root) / "main").read_text(encoding="utf-8").strip()
        commit = read_commit(root, main_head)
        assert commit is not None
        snap = read_snapshot(root, commit.snapshot_id)
        assert snap is not None
        # Guard against a vacuous pass: if the merge did nothing, main still
        # points at the empty base snapshot and the loop below checks nothing.
        assert dict(snap.manifest) == manifest, (
            "SI1 precondition failed: main's snapshot after merge does not carry "
            "the feat manifest — the merge did not take effect."
        )
        for path, obj_id in snap.manifest.items():
            assert self._object_exists(root, obj_id), (
                f"SI1 VIOLATED: blob {obj_id[:8]} for '{path}' missing from store "
                f"after fast-forward merge."
            )

    def test_SI2_apply_merge_output_all_hashes_from_inputs(
        self, tmp_path: pathlib.Path
    ) -> None:
        """SI2: every hash in apply_merge output came from base, ours, or theirs.

        This ensures apply_merge never invents a hash.  Any invented hash would
        reference a non-existent object and corrupt the repo on checkout.
        """
        base = {"a.py": _h("a-base"), "b.py": _h("b-base"), "c.py": _h("c-base")}
        ours = {"a.py": _h("a-ours"), "b.py": _h("b-base"), "d.py": _h("d-ours")}
        theirs = {"a.py": _h("a-theirs"), "c.py": _h("c-theirs"), "e.py": _h("e-theirs")}

        all_known_hashes = (
            set(base.values()) | set(ours.values()) | set(theirs.values())
        )

        ours_changed = diff_snapshots(base, ours)
        theirs_changed = diff_snapshots(base, theirs)
        conflicts = detect_conflicts(ours_changed, theirs_changed, ours, theirs)
        merged = apply_merge(base, ours, theirs, ours_changed, theirs_changed, conflicts)

        for path, obj_id in merged.items():
            assert obj_id in all_known_hashes, (
                f"SI2 VIOLATED: apply_merge produced unknown hash {obj_id[:8]} for '{path}'. "
                f"Invented hashes corrupt the object store."
            )

    @given(base=_manifest_strategy, ours=_manifest_strategy, theirs=_manifest_strategy)
    @settings(max_examples=300, suppress_health_check=[HealthCheck.too_slow])
    def test_SI3_all_merged_hashes_come_from_known_inputs(
        self, base: Manifest, ours: Manifest, theirs: Manifest
    ) -> None:
        """SI3: property version of SI2 — apply_merge never generates new hashes."""
        all_known = set(base.values()) | set(ours.values()) | set(theirs.values())

        ours_changed = diff_snapshots(base, ours)
        theirs_changed = diff_snapshots(base, theirs)
        conflicts = detect_conflicts(ours_changed, theirs_changed, ours, theirs)
        merged = apply_merge(base, ours, theirs, ours_changed, theirs_changed, conflicts)

        for path, obj_id in merged.items():
            assert obj_id in all_known, (
                f"SI3 VIOLATED: merged[{path!r}] = {obj_id[:8]} was not in any input manifest."
            )


# ===========================================================================
# DT — Determinism tests
# ===========================================================================


class TestDeterminismDT:
    """Snapshot IDs and commit IDs must be deterministic across calls and dict orderings."""

    def test_DT1_snapshot_id_independent_of_dict_insertion_order(self) -> None:
        """DT1: compute_snapshot_id produces the same ID regardless of dict key order."""
        from muse.core.ids import hash_snapshot as compute_snapshot_id

        manifest = {f"file_{i:03d}.py": _h(f"content-{i}") for i in range(50)}

        # Build the same dict in reverse insertion order.
        manifest_reversed: Manifest = {}
        for k in reversed(list(manifest.keys())):
            manifest_reversed[k] = manifest[k]

        id_forward = compute_snapshot_id(manifest)
        id_reversed = compute_snapshot_id(manifest_reversed)
        assert id_forward == id_reversed, (
            "DT1 VIOLATED: compute_snapshot_id is sensitive to dict insertion order. "
            "Two repos with the same content would get different snapshot IDs."
        )

    def test_DT2_snapshot_id_is_stable_across_calls(self) -> None:
        """DT2: the same manifest always produces the same snapshot_id."""
        from muse.core.ids import hash_snapshot as compute_snapshot_id

        manifest = {"app.py": _h("app"), "README.md": _h("readme")}
        ids = {compute_snapshot_id(manifest) for _ in range(100)}
        assert len(ids) == 1, "DT2 VIOLATED: compute_snapshot_id is not deterministic."

    def test_DT3_empty_manifest_has_stable_id(self) -> None:
        """DT3: the empty manifest always has the same snapshot_id."""
        from muse.core.ids import hash_snapshot as compute_snapshot_id

        ids = {compute_snapshot_id({}) for _ in range(50)}
        assert len(ids) == 1, "DT3: empty manifest snapshot_id is not stable."

    def test_DT4_commit_id_stable_for_same_inputs(self) -> None:
        """DT4: compute_commit_id with the same inputs always returns the same ID."""
        from muse.core.ids import hash_commit as compute_commit_id

        kwargs = dict(
            parent_ids=["abc" * 21 + "a"],
            snapshot_id=_h("snap"),
            message="test commit",
            committed_at_iso="2026-01-01T00:00:00+00:00",
        )
        ids = {compute_commit_id(**kwargs) for _ in range(50)}
        assert len(ids) == 1, "DT4: compute_commit_id is not deterministic."

    @given(manifest=_manifest_strategy)
    @settings(max_examples=200)
    def test_DT5_snapshot_id_property_stable(self, manifest: Manifest) -> None:
        """DT5: for any random manifest, snapshot_id is the same when called twice."""
        from muse.core.ids import hash_snapshot as compute_snapshot_id

        assert compute_snapshot_id(manifest) == compute_snapshot_id(manifest)

    @given(manifest=_manifest_strategy)
    @settings(max_examples=200)
    def test_DT6_snapshot_id_order_independent_property(self, manifest: Manifest) -> None:
        """DT6: for any manifest, shuffling key insertion order doesn't change the ID."""
        from muse.core.ids import hash_snapshot as compute_snapshot_id

        keys = list(manifest.keys())
        # Build a copy with reversed key order
        shuffled = {k: manifest[k] for k in reversed(keys)}
        assert compute_snapshot_id(manifest) == compute_snapshot_id(shuffled), (
            "DT6 VIOLATED: snapshot_id changed when dict key order changed."
        )