"""Stress tests for CommitRecord, SnapshotRecord, TagRecord, and provenance fields. Covers: - CommitRecord round-trip through to_dict/from_dict for all format versions. - format_version evolution: missing fields default correctly when reading old records. - reviewed_by (ORSet semantics): list preserved, sorted, deduplicated via overwrite_commit. - test_runs (GCounter semantics): monotonically increases via overwrite_commit. - agent_id / model_id / toolchain_id / prompt_hash / signature fields. - SnapshotRecord round-trip with large manifests. - TagRecord round-trip. - get_head_commit_id on empty branch returns None. - write_commit is idempotent (won't overwrite). - overwrite_commit updates the persisted record correctly. - read_commit for absent commit returns None. - list_commits and list_branches. - list_tags returns all tags. """ import datetime import pathlib import pytest from muse.core.types import fake_id, long_id from muse.core.paths import ref_path, muse_dir from muse.core.crdts.or_set import ORSet from muse.core.ids import hash_commit as compute_commit_id, hash_snapshot as compute_snapshot_id from muse.domain import SemVerBump from muse.core.store import ( CommitDict, CommitRecord, SnapshotRecord, TagRecord, get_all_commits, get_all_tags, get_head_commit_id, overwrite_commit, read_commit, read_snapshot, write_commit, write_snapshot, write_tag, ) # --------------------------------------------------------------------------- # Fixtures # --------------------------------------------------------------------------- @pytest.fixture def repo(tmp_path: pathlib.Path) -> pathlib.Path: dot_muse = muse_dir(tmp_path) (dot_muse / "commits").mkdir(parents=True) (dot_muse / "snapshots").mkdir(parents=True) (dot_muse / "tags").mkdir(parents=True) (dot_muse / "refs" / "heads").mkdir(parents=True) return tmp_path def _now() -> datetime.datetime: return datetime.datetime.now(datetime.timezone.utc) _SNAP_ID: str = compute_snapshot_id({}) def _commit( label: str = "default", branch: str = "main", parent: str | None = None, ) -> CommitRecord: """Create a CommitRecord with a real content-addressed commit_id.""" committed_at = _now() cid = compute_commit_id( parent_ids=[parent] if parent else [], snapshot_id=_SNAP_ID, message=f"commit {label}", committed_at_iso=committed_at.isoformat(), ) return CommitRecord( commit_id=cid, branch=branch, snapshot_id=_SNAP_ID, message=f"commit {label}", committed_at=committed_at, parent_commit_id=parent, ) # =========================================================================== # CommitRecord round-trip # =========================================================================== class TestCommitRecordRoundTrip: def test_minimal_round_trip(self) -> None: c = _commit() restored = CommitRecord.from_dict(c.to_dict()) assert restored.commit_id == c.commit_id assert restored.branch == c.branch assert restored.message == c.message def test_all_provenance_fields_preserved(self) -> None: c = CommitRecord( commit_id="prov123", branch="main", snapshot_id="snap", message="provenance commit", committed_at=_now(), agent_id="claude-v4", model_id="claude-3-5-sonnet", toolchain_id="muse-cli-1.0", prompt_hash="abc" * 10 + "ab", signature=f"sig-{'x' * 60}", signer_key_id="key-001", ) d = c.to_dict() restored = CommitRecord.from_dict(d) assert restored.agent_id == "claude-v4" assert restored.model_id == "claude-3-5-sonnet" assert restored.toolchain_id == "muse-cli-1.0" assert restored.signature == c.signature assert restored.signer_key_id == "key-001" def test_crdt_fields_preserved(self) -> None: c = CommitRecord( commit_id="crdt123", branch="main", snapshot_id="snap", message="crdt", committed_at=_now(), reviewed_by=["alice", "bob", "charlie"], test_runs=42, ) d = c.to_dict() restored = CommitRecord.from_dict(d) assert sorted(restored.reviewed_by) == ["alice", "bob", "charlie"] assert restored.test_runs == 42 def test_sem_ver_bump_preserved(self) -> None: bumps: tuple[SemVerBump, ...] = ("none", "patch", "minor", "major") for bump in bumps: c = CommitRecord( commit_id="sv", branch="main", snapshot_id="s", message="m", committed_at=_now(), sem_ver_bump=bump, ) assert CommitRecord.from_dict(c.to_dict()).sem_ver_bump == bump def test_breaking_changes_preserved(self) -> None: c = CommitRecord( commit_id="bc", branch="main", snapshot_id="s", message="m", committed_at=_now(), breaking_changes=["removed `old_api`", "renamed `foo` → `bar`"], ) restored = CommitRecord.from_dict(c.to_dict()) assert restored.breaking_changes == ["removed `old_api`", "renamed `foo` → `bar`"] def test_parent_ids_preserved(self) -> None: c = CommitRecord( commit_id="merge", branch="main", snapshot_id="s", message="m", committed_at=_now(), parent_commit_id="parent-1", parent2_commit_id="parent-2", ) restored = CommitRecord.from_dict(c.to_dict()) assert restored.parent_commit_id == "parent-1" assert restored.parent2_commit_id == "parent-2" def test_missing_crdt_fields_default_correctly(self) -> None: """Simulates reading an older commit that lacks reviewed_by / test_runs.""" minimal: CommitDict = { "commit_id": "old", "repo_id": "r", "branch": "main", "snapshot_id": "snap", "message": "old commit", "committed_at": _now().isoformat(), } restored = CommitRecord.from_dict(minimal) assert restored.reviewed_by == [] assert restored.test_runs == 0 def test_committed_at_timezone_aware(self) -> None: c = _commit() restored = CommitRecord.from_dict(c.to_dict()) assert restored.committed_at.tzinfo is not None # =========================================================================== # CommitRecord persistence # =========================================================================== class TestCommitPersistence: def test_write_and_read_back(self, repo: pathlib.Path) -> None: c = _commit("id001") write_commit(repo, c) restored = read_commit(repo, c.commit_id) assert restored is not None assert restored.commit_id == c.commit_id def test_write_is_idempotent(self, repo: pathlib.Path) -> None: c = _commit("id002") write_commit(repo, c) # Write the same commit again — the store must not corrupt it. write_commit(repo, c) restored = read_commit(repo, c.commit_id) assert restored is not None assert restored.message == c.message def test_read_absent_commit_returns_none(self, repo: pathlib.Path) -> None: assert read_commit(repo, fake_id("does-not-exist")) is None def test_overwrite_commit_updates_reviewed_by(self, repo: pathlib.Path) -> None: c = _commit("id003") write_commit(repo, c) # Simulate ORSet merge: add reviewer. updated = read_commit(repo, c.commit_id) assert updated is not None updated.reviewed_by = ["agent-x", "human-bob"] overwrite_commit(repo, updated) restored = read_commit(repo, c.commit_id) assert restored is not None assert "agent-x" in restored.reviewed_by assert "human-bob" in restored.reviewed_by def test_overwrite_commit_updates_test_runs(self, repo: pathlib.Path) -> None: c = _commit("id004") write_commit(repo, c) for expected in range(1, 6): rec = read_commit(repo, c.commit_id) assert rec is not None rec.test_runs += 1 overwrite_commit(repo, rec) after = read_commit(repo, c.commit_id) assert after is not None assert after.test_runs == expected def test_list_commits_returns_all_written(self, repo: pathlib.Path) -> None: commits = [_commit(f"c{i:04d}") for i in range(20)] for c in commits: write_commit(repo, c) found = {c.commit_id for c in get_all_commits(repo)} for c in commits: assert c.commit_id in found def test_many_commits_all_retrievable(self, repo: pathlib.Path) -> None: real_ids: list[str] = [] prev: str | None = None base_ts = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc) for i in range(100): committed_at = base_ts + datetime.timedelta(seconds=i) cid = compute_commit_id( parent_ids=[prev] if prev else [], snapshot_id=_SNAP_ID, message=f"stress-{i:04d}", committed_at_iso=committed_at.isoformat(), ) write_commit(repo, CommitRecord( commit_id=cid, branch="main", snapshot_id=_SNAP_ID, message=f"stress-{i:04d}", committed_at=committed_at, parent_commit_id=prev, )) real_ids.append(cid) prev = cid for cid in real_ids: assert read_commit(repo, cid) is not None # =========================================================================== # SnapshotRecord # =========================================================================== class TestSnapshotRecordRoundTrip: def test_minimal_round_trip(self) -> None: s = SnapshotRecord(snapshot_id="snap-1", manifest={"f.mid": "hash1"}) restored = SnapshotRecord.from_dict(s.to_dict()) assert restored.snapshot_id == "snap-1" assert restored.manifest == {"f.mid": "hash1"} def test_large_manifest(self) -> None: manifest = {f"track_{i:04d}.mid": f"hash-{i:064d}" for i in range(500)} s = SnapshotRecord(snapshot_id="big-snap", manifest=manifest) restored = SnapshotRecord.from_dict(s.to_dict()) assert len(restored.manifest) == 500 assert restored.manifest["track_0000.mid"] == f"hash-{0:064d}" def test_write_and_read_back(self, repo: pathlib.Path) -> None: manifest = {"a.mid": fake_id("a.mid-content"), "b.mid": fake_id("b.mid-content")} snap_id = compute_snapshot_id(manifest) s = SnapshotRecord(snapshot_id=snap_id, manifest=manifest) write_snapshot(repo, s) restored = read_snapshot(repo, snap_id) assert restored is not None assert restored.manifest == manifest def test_empty_manifest_round_trip(self) -> None: s = SnapshotRecord(snapshot_id="empty-snap", manifest={}) restored = SnapshotRecord.from_dict(s.to_dict()) assert restored.manifest == {} # =========================================================================== # TagRecord # =========================================================================== class TestTagRecord: def test_round_trip(self) -> None: t = TagRecord( tag_id="tag-001", repo_id="test-repo", commit_id="abc123", tag="v1.0.0", ) restored = TagRecord.from_dict(t.to_dict()) assert restored.tag_id == "tag-001" assert restored.tag == "v1.0.0" assert restored.commit_id == "abc123" def test_write_and_list(self, repo: pathlib.Path) -> None: for i in range(10): write_tag(repo, TagRecord( tag_id=fake_id(f"tag-{i:04d}"), repo_id=fake_id("repo"), commit_id=fake_id(f"commit-{i:04d}"), tag=f"v{i}.0.0", )) tags = get_all_tags(repo, fake_id("repo")) assert len(tags) == 10 def test_created_at_preserved(self) -> None: ts = datetime.datetime(2025, 6, 15, 12, 0, 0, tzinfo=datetime.timezone.utc) t = TagRecord(tag_id="t", repo_id="r", commit_id="c", tag="v1", created_at=ts) restored = TagRecord.from_dict(t.to_dict()) assert abs((restored.created_at - ts).total_seconds()) < 1.0 # =========================================================================== # get_head_commit_id # =========================================================================== class TestGetHeadCommitId: def test_empty_branch_returns_none(self, repo: pathlib.Path) -> None: assert get_head_commit_id(repo, "nonexistent-branch") is None def test_returns_id_after_writing_head_ref(self, repo: pathlib.Path) -> None: cid = long_id("a" * 64) ref_path(repo, "main").write_text(f"{cid}\n") assert get_head_commit_id(repo, "main") == cid def test_strips_whitespace(self, repo: pathlib.Path) -> None: cid = long_id("b" * 64) ref_path(repo, "feature").write_text(f" {cid} \n") assert get_head_commit_id(repo, "feature") == cid # =========================================================================== # CRDT semantics on CommitRecord fields # =========================================================================== class TestCRDTAnnotationSemantics: def test_reviewed_by_orset_union_semantics(self, repo: pathlib.Path) -> None: """ORSet union: multiple overwrite_commit calls accumulate reviewers.""" c = _commit("crdt-or-001") write_commit(repo, c) # Agent 1 adds their name. rec = read_commit(repo, c.commit_id) assert rec is not None s, tok1 = ORSet().add("agent-alpha") rec.reviewed_by = list(s.elements()) overwrite_commit(repo, rec) # Agent 2 independently adds their name. rec2 = read_commit(repo, c.commit_id) assert rec2 is not None s2 = ORSet() for name in rec2.reviewed_by: s2, _ = s2.add(name) s2, tok2 = s2.add("agent-beta") rec2.reviewed_by = sorted(s2.elements()) overwrite_commit(repo, rec2) final = read_commit(repo, c.commit_id) assert final is not None assert "agent-alpha" in final.reviewed_by assert "agent-beta" in final.reviewed_by def test_test_runs_gcounter_monotone(self, repo: pathlib.Path) -> None: """GCounter: test_runs must never decrease.""" c = _commit("crdt-gc-001") write_commit(repo, c) prev = 0 for _ in range(50): rec = read_commit(repo, c.commit_id) assert rec is not None rec.test_runs += 1 overwrite_commit(repo, rec) current = read_commit(repo, c.commit_id) assert current is not None assert current.test_runs >= prev prev = current.test_runs assert prev == 50 def test_all_provenance_fields_default_to_empty_string(self) -> None: c = _commit() assert c.agent_id == "" assert c.model_id == "" assert c.toolchain_id == "" assert c.prompt_hash == "" assert c.signature == "" assert c.signer_key_id == ""