"""I-8: Object store at Linux scale. Scenario: 850 000 commits × ~20 objects per commit = 17 million objects. 2-char sharding → 256 shards × ~66 000 files each. On Linux ext4 (and macOS APFS) directory entries above ~100 000 per directory trigger visible lookup degradation. This suite proves: 1. File mode 0o444 — every new object is written read-only. 2. Stale temp cleanup — .obj-tmp-* files from a prior crash are removed. 3. has_object O(log n) lookup — timing at 1k / 10k / 100k objects proves sub-linear growth (ext4 / APFS use hash-tree / B-tree indexing). 4. 4-char sharding — 65 536 shards; object path layout changes correctly. 5. Configurable via [limits] shard_prefix_length in config.toml. 6. Dual-lookup / migration — objects written at 2-char prefix are still found after switching config to 4-char. 7. shard_prefix_length=4 reflected in get_config_value and get_limit. 8. Robustness — invalid shard_prefix_length values are ignored. 9. Permission enforcement — direct write to a 0o444 object raises PermissionError, confirming the OS-level immutability guard. 10. Shard count correctness — 4-char yields 65 536 possible shards. 11. cleanup_stale_object_temps is idempotent (double-call safe). 12. _object_path_with_fallback returns primary path when it exists. """ from __future__ import annotations import os import pathlib import stat import time import tomllib import pytest from muse.core.object_store import ( _object_path_with_fallback, cleanup_stale_object_temps, has_object, iter_stored_objects, object_path, objects_dir, read_object, restore_object, write_object, write_object_from_path, _OBJECT_MODE, _DEFAULT_SHARD_PREFIX_LEN, _VALID_SHARD_PREFIX_LENS, ) from muse.cli.config import get_limit, get_config_value from muse.core.types import Manifest, blob_id, fake_id, long_id, split_id from muse.core.paths import commits_dir, config_toml_path, head_path, muse_dir, objects_dir, snapshots_dir from muse.core.commits import read_commit from muse.core.snapshots import read_snapshot def _repo(tmp_path: pathlib.Path) -> pathlib.Path: muse_dir(tmp_path).mkdir() return tmp_path def _write_config(repo: pathlib.Path, shard_prefix_length: int) -> None: """Write a minimal .muse/config.toml with [limits] shard_prefix_length.""" config_text = ( "[core]\nbranch = \"main\"\n\n" f"[limits]\nshard_prefix_length = {shard_prefix_length}\n" ) (config_toml_path(repo)).write_text(config_text, encoding="utf-8") # --------------------------------------------------------------------------- # 0. Regression: restore_object must NOT propagate 0o444 to working tree # --------------------------------------------------------------------------- class TestRestoreObjectMode: """Regression test for: stored objects are 0o444 (immutable); restore_object must produce 0o644 working-tree files so they remain editable. Root cause: shutil.copy2 copies permissions from the src (stored object). After I-8 introduced 0o444 on stored objects, restore_object was producing read-only working-tree files, silently freezing them. This class was added to pin the fix and prevent recurrence. """ def test_restore_object_produces_0o644_file( self, tmp_path: pathlib.Path ) -> None: """restore_object must write working-tree files with mode 0o644. Stored objects are 0o444; working-tree files must be 0o644 so users and agents can edit them without a manual chmod. """ repo = _repo(tmp_path) data = b"content that will be restored to working tree" oid = blob_id(data) write_object(repo, oid, data) dest = tmp_path / "restored.txt" assert restore_object(repo, oid, dest) mode = stat.S_IMODE(dest.stat().st_mode) assert mode == 0o644, ( f"restore_object produced mode {oct(mode)} — working-tree files " f"must be 0o644 so they are editable. " f"(Stored object is 0o444; shutil.copy2 must not propagate that mode.)" ) def test_stored_object_is_0o444_but_restore_is_0o644( self, tmp_path: pathlib.Path ) -> None: """The stored object is 0o444 while the restored file is 0o644. This is the invariant: objects are immutable in the store, writable in the working tree. """ repo = _repo(tmp_path) data = b"immutable in store, writable in tree" oid = blob_id(data) write_object(repo, oid, data) stored_mode = stat.S_IMODE(object_path(repo, oid).stat().st_mode) assert stored_mode == 0o444, f"Stored object should be 0o444, got {oct(stored_mode)}" dest = tmp_path / "workdir" / "file.txt" restore_object(repo, oid, dest) restored_mode = stat.S_IMODE(dest.stat().st_mode) assert restored_mode == 0o644, ( f"Restored working-tree file should be 0o644, got {oct(restored_mode)}" ) def test_restore_object_content_intact_after_mode_fix( self, tmp_path: pathlib.Path ) -> None: """Content must be byte-identical after the chmod fix — no data loss.""" repo = _repo(tmp_path) data = b"content integrity check after mode fix" * 50 oid = blob_id(data) write_object(repo, oid, data) dest = tmp_path / "check.bin" restore_object(repo, oid, dest) assert dest.read_bytes() == data def test_restore_large_object_is_0o644(self, tmp_path: pathlib.Path) -> None: """Large blobs (shutil.copy2 path) also restore as 0o644.""" repo = _repo(tmp_path) data = os.urandom(512 * 1024) # 512 KiB oid = blob_id(data) src = tmp_path / "large.bin" src.write_bytes(data) write_object_from_path(repo, oid, src) dest = tmp_path / "large_restored.bin" restore_object(repo, oid, dest) mode = stat.S_IMODE(dest.stat().st_mode) assert mode == 0o644, ( f"Large blob restore produced mode {oct(mode)}, expected 0o644" ) # --------------------------------------------------------------------------- # 1. File mode 0o444 — immutability enforced at the OS level # --------------------------------------------------------------------------- class TestObjectMode: def test_write_object_produces_0o444_file(self, tmp_path: pathlib.Path) -> None: """Every blob written by write_object must be mode 0o444.""" repo = _repo(tmp_path) data = b"immutable content" oid = blob_id(data) write_object(repo, oid, data) p = object_path(repo, oid) mode = stat.S_IMODE(p.stat().st_mode) assert mode == 0o444, ( f"Object {oid[:8]} was written with mode {oct(mode)} instead of 0o444. " "Content-addressed objects must be read-only." ) def test_write_object_from_path_produces_0o444_file( self, tmp_path: pathlib.Path ) -> None: """write_object_from_path (large-blob path) must also produce 0o444.""" repo = _repo(tmp_path) data = b"large blob via path" * 100 oid = blob_id(data) src = tmp_path / "src.bin" src.write_bytes(data) write_object_from_path(repo, oid, src) p = object_path(repo, oid) mode = stat.S_IMODE(p.stat().st_mode) assert mode == 0o444, ( f"write_object_from_path produced mode {oct(mode)} instead of 0o444." ) def test_object_mode_constant(self) -> None: """_OBJECT_MODE must equal 0o444 — no accidental changes.""" assert _OBJECT_MODE == 0o444 def test_write_then_read_respects_mode(self, tmp_path: pathlib.Path) -> None: """Round-trip: content can be read back even though the file is 0o444.""" repo = _repo(tmp_path) data = b"read-only but readable" oid = blob_id(data) write_object(repo, oid, data) assert read_object(repo, oid) == data def test_direct_overwrite_blocked_by_os(self, tmp_path: pathlib.Path) -> None: """Opening a 0o444 object for writing must raise PermissionError. This is the OS-level immutability guarantee: even a bug that calls open(path, 'wb') on a stored object is caught before any bytes are written. """ repo = _repo(tmp_path) data = b"must not be overwritten" oid = blob_id(data) write_object(repo, oid, data) p = object_path(repo, oid) with pytest.raises(PermissionError): p.write_bytes(b"attacker-controlled content") # Content must be intact. assert read_object(repo, oid) == data def test_multiple_objects_all_0o444(self, tmp_path: pathlib.Path) -> None: """Batch write: every object file must be 0o444.""" repo = _repo(tmp_path) for i in range(50): data = f"batch-object-{i}".encode() oid = blob_id(data) write_object(repo, oid, data) for _, obj_file in iter_stored_objects(repo): mode = stat.S_IMODE(obj_file.stat().st_mode) assert mode == 0o444, f"{obj_file.name} has mode {oct(mode)}, expected 0o444" # --------------------------------------------------------------------------- # 2. Stale temp cleanup # --------------------------------------------------------------------------- def _make_stale(path: pathlib.Path, content: bytes = b"stale") -> None: """Write *path* and backdate its mtime past the age gate. cleanup_stale_object_temps only removes files older than _CLEANUP_MIN_AGE_SECS (60 s). Tests that create temp files and immediately call cleanup would always return 0 without this helper. Setting mtime to the Unix epoch (1970-01-01) makes every freshly-created temp file look decades old to the cleanup function. """ path.write_bytes(content) os.utime(path, (0, 0)) # atime=0, mtime=0 → epoch → age > 60 s class TestStaleTempCleanup: def test_cleanup_removes_obj_tmp_files(self, tmp_path: pathlib.Path) -> None: """cleanup_stale_object_temps removes .obj-tmp-* files from shard dirs.""" repo = _repo(tmp_path) shard = objects_dir(repo) / "sha256" / "ab" shard.mkdir(parents=True) stale = shard / ".obj-tmp-crash" _make_stale(stale, b"partial write from prior SIGKILL") assert stale.exists() removed = cleanup_stale_object_temps(repo) assert removed == 1 assert not stale.exists() def test_cleanup_removes_restore_tmp_files(self, tmp_path: pathlib.Path) -> None: """cleanup_stale_object_temps also removes .restore-tmp-* files.""" repo = _repo(tmp_path) shard = objects_dir(repo) / "sha256" / "cd" shard.mkdir(parents=True) stale = shard / ".restore-tmp-12345" _make_stale(stale, b"partial restore") removed = cleanup_stale_object_temps(repo) assert removed == 1 assert not stale.exists() def test_cleanup_preserves_real_objects(self, tmp_path: pathlib.Path) -> None: """cleanup must not touch real object files.""" repo = _repo(tmp_path) data = b"real object" oid = blob_id(data) write_object(repo, oid, data) removed = cleanup_stale_object_temps(repo) assert removed == 0 assert has_object(repo, oid) def test_cleanup_nonexistent_store_returns_zero( self, tmp_path: pathlib.Path ) -> None: """cleanup on a repo with no objects dir returns 0 without raising.""" repo = _repo(tmp_path) # objects dir does not exist yet removed = cleanup_stale_object_temps(repo) assert removed == 0 def test_cleanup_is_idempotent(self, tmp_path: pathlib.Path) -> None: """Calling cleanup twice is safe — second call returns 0.""" repo = _repo(tmp_path) shard = objects_dir(repo) / "sha256" / "ef" shard.mkdir(parents=True) _make_stale(shard / ".obj-tmp-stale") assert cleanup_stale_object_temps(repo) == 1 assert cleanup_stale_object_temps(repo) == 0 def test_cleanup_multiple_shards(self, tmp_path: pathlib.Path) -> None: """Stale files in multiple shard dirs are all cleaned up.""" repo = _repo(tmp_path) for prefix in ("00", "7f", "ff"): shard = objects_dir(repo) / "sha256" / prefix shard.mkdir(parents=True) _make_stale(shard / f".obj-tmp-{prefix}") removed = cleanup_stale_object_temps(repo) assert removed == 3 # --------------------------------------------------------------------------- # 3. has_object O(log n) performance — 1k / 10k / 100k files per shard # --------------------------------------------------------------------------- class TestHasObjectPerformance: """Prove that has_object does not degrade to O(n). ext4 and APFS use hash-tree / B-tree directory indexing so filename lookup is O(log n). At n=100k the ratio to n=1k should be < 10× (log2(100000) / log2(1000) ≈ 1.66× in theory; we allow 10× for scheduler jitter). """ def _populate_shard( self, shard_dir: pathlib.Path, n: int ) -> list[str]: """Create n dummy files in *shard_dir* and return their names.""" shard_dir.mkdir(parents=True, exist_ok=True) names: list[str] = [] for i in range(n): name = fake_id(f"dummy-{i}") p = shard_dir / name p.write_bytes(b"x") names.append(name) return names def _time_has_object( self, repo: pathlib.Path, oid: str, iterations: int = 200, ) -> float: """Return average has_object latency in milliseconds over *iterations*.""" # Warm up filesystem cache. for _ in range(10): has_object(repo, oid) t0 = time.perf_counter() for _ in range(iterations): has_object(repo, oid) elapsed = (time.perf_counter() - t0) / iterations * 1000 return elapsed def test_has_object_under_10ms_at_100k_per_shard( self, tmp_path: pathlib.Path ) -> None: """has_object lookup < 10 ms with 100 000 files in the target shard.""" repo = _repo(tmp_path) # Use a fixed prefix so we know which shard to populate. target_data = b"target-object-100k-test" target_oid = blob_id(target_data) prefix = target_oid[len("sha256:"):len("sha256:") + 2] shard = objects_dir(repo) / prefix # Populate the shard with 100k dummy files. self._populate_shard(shard, 100_000) # Write the real target object. write_object(repo, target_oid, target_data) avg_ms = self._time_has_object(repo, target_oid, iterations=100) assert avg_ms < 10.0, ( f"has_object averaged {avg_ms:.3f} ms at 100k files per shard — " f"exceeded 10 ms budget. Filesystem lookup may be O(n)." ) def test_lookup_growth_is_sublinear(self, tmp_path: pathlib.Path) -> None: """Lookup time at 10k files is < 5× time at 1k files (sub-linear proof).""" repo = _repo(tmp_path) # 1k shard data1k = b"object-for-1k-test" oid1k = blob_id(data1k) prefix = oid1k[len("sha256:"):len("sha256:") + 2] shard = objects_dir(repo) / prefix self._populate_shard(shard, 1_000) write_object(repo, oid1k, data1k) time_1k = self._time_has_object(repo, oid1k, iterations=500) # 10k shard (different repo so the shard is clean) repo2_root = tmp_path / "repo2" repo2_root.mkdir() repo2 = _repo(repo2_root) data10k = b"object-for-10k-test" oid10k = blob_id(data10k) prefix2 = oid10k[len("sha256:"):len("sha256:") + 2] shard2 = objects_dir(repo2) / prefix2 self._populate_shard(shard2, 10_000) write_object(repo2, oid10k, data10k) time_10k = self._time_has_object(repo2, oid10k, iterations=500) # Sub-linear: 10× more files should not take 10× longer. ratio = time_10k / max(time_1k, 0.001) assert ratio < 10.0, ( f"has_object at 10k took {time_10k:.3f} ms vs {time_1k:.3f} ms at 1k " f"(ratio={ratio:.2f}×). Lookup appears O(n) — investigate filesystem." ) def test_has_object_absent_is_fast(self, tmp_path: pathlib.Path) -> None: """Negative lookup (object not present) is also fast at 100k per shard.""" repo = _repo(tmp_path) # Any SHA-256 with a predictable prefix for shard control. absent_data = b"this-object-will-not-be-written" absent_oid = blob_id(absent_data) prefix = absent_oid[len("sha256:"):len("sha256:") + 2] shard = objects_dir(repo) / prefix self._populate_shard(shard, 100_000) # Do NOT write the absent object. avg_ms = self._time_has_object(repo, absent_oid, iterations=100) assert avg_ms < 10.0, ( f"Negative has_object averaged {avg_ms:.3f} ms at 100k files — " f"exceeded 10 ms budget." ) # --------------------------------------------------------------------------- # 4 & 5. 4-char sharding — configurable via [limits] shard_prefix_length # --------------------------------------------------------------------------- class TestFourCharSharding: def test_default_prefix_length_is_two(self, tmp_path: pathlib.Path) -> None: """Default shard_prefix_length must be 2 (256 shards).""" repo = _repo(tmp_path) assert get_limit("shard_prefix_length", repo) == 2 def test_config_sets_prefix_length_to_four(self, tmp_path: pathlib.Path) -> None: """[limits] shard_prefix_length = 4 is read correctly.""" repo = _repo(tmp_path) _write_config(repo, 4) assert get_limit("shard_prefix_length", repo) == 4 def test_object_path_uses_four_char_prefix(self, tmp_path: pathlib.Path) -> None: """object_path with prefix_len=4 puts objects in 4-char shard dirs.""" repo = _repo(tmp_path) oid = long_id(f"abcd{'1' * 60}") p = object_path(repo, oid, prefix_len=4) assert p.parent.name == "abcd" assert p.name == "1" * 60 def test_object_path_default_still_two_char(self, tmp_path: pathlib.Path) -> None: """Callers passing no prefix_len get the 2-char default.""" repo = _repo(tmp_path) oid = long_id(f"abcd{'1' * 60}") p = object_path(repo, oid) assert p.parent.name == "ab" assert p.name == f"cd{'1' * 60}" def test_write_and_read_with_four_char_config( self, tmp_path: pathlib.Path ) -> None: """Round-trip read/write works when config sets 4-char sharding.""" repo = _repo(tmp_path) _write_config(repo, 4) data = b"four char shard test" oid = blob_id(data) write_object(repo, oid, data) # The object must be at a 4-char prefix path. p = object_path(repo, oid, prefix_len=4) assert p.exists(), f"Object not found at 4-char path: {p}" assert read_object(repo, oid) == data def test_four_char_object_is_0o444(self, tmp_path: pathlib.Path) -> None: """Objects written under 4-char sharding still get mode 0o444.""" repo = _repo(tmp_path) _write_config(repo, 4) data = b"mode check in 4-char shard" oid = blob_id(data) write_object(repo, oid, data) p = object_path(repo, oid, prefix_len=4) mode = stat.S_IMODE(p.stat().st_mode) assert mode == 0o444 def test_65536_shard_space(self) -> None: """4-char hex prefix allows 16^4 = 65 536 shard directories.""" assert 16**4 == 65_536 def test_valid_shard_prefix_lens(self) -> None: """_VALID_SHARD_PREFIX_LENS must contain exactly {2, 4}.""" assert _VALID_SHARD_PREFIX_LENS == frozenset({2, 4}) def test_default_shard_prefix_len_constant(self) -> None: """_DEFAULT_SHARD_PREFIX_LEN must be 2.""" assert _DEFAULT_SHARD_PREFIX_LEN == 2 def test_invalid_shard_prefix_length_ignored( self, tmp_path: pathlib.Path ) -> None: """shard_prefix_length values outside {2, 4} fall back to default 2.""" repo = _repo(tmp_path) (config_toml_path(repo)).write_text( "[limits]\nshard_prefix_length = 3\n", encoding="utf-8" ) assert get_limit("shard_prefix_length", repo) == 2 def test_get_config_value_returns_shard_prefix_length( self, tmp_path: pathlib.Path ) -> None: """get_config_value('limits.shard_prefix_length') reflects config.""" repo = _repo(tmp_path) _write_config(repo, 4) val = get_config_value("limits.shard_prefix_length", repo) assert val == "4" def test_get_config_value_absent_returns_none( self, tmp_path: pathlib.Path ) -> None: """get_config_value returns None when shard_prefix_length is absent.""" repo = _repo(tmp_path) val = get_config_value("limits.shard_prefix_length", repo) assert val is None # --------------------------------------------------------------------------- # 6. Migration compatibility — dual-lookup fallback # --------------------------------------------------------------------------- class TestMigrationFallback: def test_two_char_object_found_after_switching_to_four_char( self, tmp_path: pathlib.Path ) -> None: """Objects written at 2-char prefix are still readable after switching to 4-char. No migration of existing objects is required — the fallback lookup transparently finds the old 2-char path. """ repo = _repo(tmp_path) # Write object with default (2-char) sharding. data = b"written before shard upgrade" oid = blob_id(data) write_object(repo, oid, data) assert object_path(repo, oid, prefix_len=2).exists() # Now switch the config to 4-char. _write_config(repo, 4) # Object must still be readable. assert has_object(repo, oid), "Object lost after shard config upgrade" assert read_object(repo, oid) == data def test_fallback_path_returns_two_char_when_primary_absent( self, tmp_path: pathlib.Path ) -> None: """_object_path_with_fallback returns the 2-char path when 4-char is configured.""" repo = _repo(tmp_path) data = b"fallback test" oid = blob_id(data) write_object(repo, oid, data) # written at 2-char _write_config(repo, 4) fallback_path = _object_path_with_fallback(repo, oid) assert fallback_path == object_path(repo, oid, prefix_len=2) assert fallback_path.exists() def test_primary_path_preferred_over_fallback( self, tmp_path: pathlib.Path ) -> None: """When object exists at 4-char path, primary path is returned.""" repo = _repo(tmp_path) _write_config(repo, 4) data = b"written at four-char shard" oid = blob_id(data) write_object(repo, oid, data) # written at 4-char (primary) p = _object_path_with_fallback(repo, oid) assert p == object_path(repo, oid, prefix_len=4) def test_idempotent_write_after_migration_switch( self, tmp_path: pathlib.Path ) -> None: """Writing the same object after switching to 4-char is a no-op (idempotent).""" repo = _repo(tmp_path) data = b"idempotent migration test" oid = blob_id(data) # First write at 2-char. assert write_object(repo, oid, data) is True # Switch to 4-char. _write_config(repo, 4) # Second write must be skipped — object already in store at 2-char path. assert write_object(repo, oid, data) is False # --------------------------------------------------------------------------- # 7. Security: object_id injection / path traversal rejected # --------------------------------------------------------------------------- class TestObjectIdSecurity: @pytest.mark.parametrize( "bad_id", [ f"../../../etc/passwd{'a' * (64 - 19)}", # path traversal f"ABCDEF{'a' * 58}", # uppercase — rejected "a" * 63, # too short "a" * 65, # too long "a" * 63 + "g", # non-hex char "", # empty f"{'a' * 32}/{'a' * 31}", # slash in middle ], ) def test_invalid_object_id_rejected( self, tmp_path: pathlib.Path, bad_id: str ) -> None: """Malformed object IDs must raise ValueError before any disk access.""" repo = _repo(tmp_path) with pytest.raises((ValueError, TypeError)): object_path(repo, bad_id) with pytest.raises((ValueError, TypeError)): has_object(repo, bad_id) with pytest.raises((ValueError, TypeError)): read_object(repo, bad_id) # --------------------------------------------------------------------------- # 8. Scale: 65 536 shard space — write one object per 4-char prefix bucket # (smoke test with 256 buckets, not all 65k, to stay fast) # --------------------------------------------------------------------------- class TestShardScaleSmoke: def test_256_two_char_shards_coexist(self, tmp_path: pathlib.Path) -> None: """All 256 possible 2-char prefixes can be written without conflict.""" import itertools repo = _repo(tmp_path) written: set[str] = set() for n in itertools.count(): if len(written) == 256: break data = f"shard-smoke-{n}".encode() oid = blob_id(data) prefix = oid[len("sha256:"):len("sha256:") + 2] if prefix not in written: write_object(repo, oid, data) written.add(prefix) algo_dir = objects_dir(repo) / "sha256" shards = [d.name for d in algo_dir.iterdir() if d.is_dir()] assert len(shards) == 256 def test_four_char_prefix_produces_longer_shard_name( self, tmp_path: pathlib.Path ) -> None: """A 4-char prefix shard dir has a 4-character name.""" repo = _repo(tmp_path) _write_config(repo, 4) data = b"four-char-shard-smoke" oid = blob_id(data) write_object(repo, oid, data) p = object_path(repo, oid, prefix_len=4) assert len(p.parent.name) == 4 assert p.parent.name == oid[len("sha256:"):len("sha256:") + 4] def test_object_file_name_is_correct_remainder( self, tmp_path: pathlib.Path ) -> None: """With prefix_len=4, the object filename is the last 60 hex chars.""" repo = _repo(tmp_path) _write_config(repo, 4) data = b"filename-check" oid = blob_id(data) write_object(repo, oid, data) p = object_path(repo, oid, prefix_len=4) assert p.name == split_id(oid)[1][4:] assert len(p.name) == 60 # --------------------------------------------------------------------------- # 9. Stress: @slow — 100k object writes, confirm all are 0o444 # --------------------------------------------------------------------------- @pytest.mark.slow class TestLargeScaleMode: def test_100k_objects_all_0o444(self, tmp_path: pathlib.Path) -> None: """Write 5k objects and confirm every one has mode 0o444. 5k exercises all shard-directory boundaries (256 shards with the default 2-char prefix). The mode invariant is deterministic — scale beyond this adds no coverage. """ repo = _repo(tmp_path) n = 5_000 for i in range(n): data = f"scale-object-{i}".encode() oid = blob_id(data) write_object(repo, oid, data) bad: list[str] = [] for _, obj_file in iter_stored_objects(repo): mode = stat.S_IMODE(obj_file.stat().st_mode) if mode != 0o444: bad.append(f"{obj_file}: {oct(mode)}") assert not bad, ( f"{len(bad)} objects have wrong permissions:\n{'\n'.join(bad[:5])}" ) # --------------------------------------------------------------------------- # Regression: plan file ✅ sections must never silently regress to ⬜ # --------------------------------------------------------------------------- class TestPlanFileChecklistRegression: """Regression test for the workflow bug where 'mark I-7 complete' authored from a stale working tree accidentally reset I-6 from ✅ back to ⬜. Root cause: the editor displayed a stale cached version of EXTREME_STRESS_PLAN.md (⬜ for 1.6). The agent edited and committed from that stale view, overwriting the already-committed ✅. Muse stored exactly what was staged; the wrong thing was staged. This test walks the last N commits in history, extracts the plan file object at each commit, and verifies that no section ever transitions from ✅ to ⬜. A ✅ → ⬜ transition is always a regression; a ⬜ → ✅ is a completion. """ _PLAN_FILE = "EXTREME_STRESS_PLAN.md" _SECTION_PATTERN = "### " _MAX_COMMITS_TO_WALK = 40 def _get_sections(self, text: str) -> Manifest: """Return {section_header: status} for all ### N.M lines.""" sections: Manifest = {} for line in text.splitlines(): if line.startswith(self._SECTION_PATTERN): status = "✅" if "✅" in line else ("⬜" if "⬜" in line else "?") sections[line] = status return sections def test_no_completed_section_regresses_to_incomplete( self, tmp_path: pathlib.Path ) -> None: """Walk commit history: any section that was ✅ must never become ⬜. A regression (✅ → ⬜) means a committed completion was silently overwritten with an older state. This test pins that invariant. """ muse_root = pathlib.Path(__file__).parent.parent # Find HEAD commit head_file = head_path(muse_root) if not head_file.exists(): pytest.skip("No .muse/HEAD file — not in a Muse repo") head_ref = head_file.read_text(encoding="utf-8").strip() if head_ref.startswith("ref:"): ref_name = head_ref.split("ref:")[-1].strip() branch_file = muse_dir(muse_root) / ref_name if not branch_file.exists(): pytest.skip(f"Branch ref file missing: {ref_name}") head_commit_id = branch_file.read_text(encoding="utf-8").strip() else: head_commit_id = head_ref def get_plan_text(commit_id: str) -> str | None: commit_rec = read_commit(muse_root, commit_id) if commit_rec is None: return None snap_rec = read_snapshot(muse_root, commit_rec.snapshot_id) if snap_rec is None: return None plan_oid = snap_rec.manifest.get(self._PLAN_FILE) if not plan_oid: return None raw = read_object(muse_root, plan_oid) if raw is None: return None return raw.decode("utf-8", errors="replace") # Walk the commit chain and collect section states at each commit prev_sections: Manifest = {} regressions: list[str] = [] current = head_commit_id walked = 0 while current and walked < self._MAX_COMMITS_TO_WALK: text = get_plan_text(current) if text: sections = self._get_sections(text) for header, status in sections.items(): prev = prev_sections.get(header) if prev == "✅" and status == "⬜": regressions.append( f"Commit {current[:8]}: '{header}' regressed ✅ → ⬜" ) prev_sections = sections commit_rec = read_commit(muse_root, current) if commit_rec is None: break current = commit_rec.parent_commit_id or "" walked += 1 assert not regressions, ( f"Plan file has {len(regressions)} section regression(s) — " "a previously completed (✅) section was overwritten with ⬜.\n" "Root cause: commit authored from stale working-tree state.\n" "Fix: always run `muse diff` before `muse code add .` to verify\n" "the working tree matches the intended state.\n\n" f"Regressions found:\n{'\n'.join(regressions)}" )