"""Tests for ``muse count-objects`` — object store diagnostics. Coverage tiers: - Unit: _count_loose_objects, _collect_reachable_ids helpers - Integration: empty store, single object, multi-shard, verbose breakdown, --unreachable counts GC candidates, JSON schema, text format, objects match expected after N commits - End-to-end: full CLI via CliRunner - Security: read-only — no mutations; no content reads (stat only) - Stress: store with many objects; --unreachable on multi-commit repo """ from __future__ import annotations from collections.abc import Mapping import datetime import json import os import pathlib import pytest from tests.cli_test_helper import CliRunner from muse.core.object_store import write_object from muse.core.ids import hash_commit, hash_snapshot from muse.core.commits import ( CommitRecord, write_commit, ) from muse.core.snapshots import ( SnapshotRecord, write_snapshot, ) from muse.core.types import Manifest, blob_id from muse.core.paths import muse_dir, objects_dir, ref_path runner = CliRunner() _REPO_ID = "count-objects-test" _counter = 0 # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _init_repo(path: pathlib.Path) -> pathlib.Path: dot_muse = muse_dir(path) for d in ("commits", "snapshots", "objects", "refs/heads", "code"): (dot_muse / d).mkdir(parents=True, exist_ok=True) (dot_muse / "HEAD").write_text("ref: refs/heads/main", encoding="utf-8") (dot_muse / "repo.json").write_text( json.dumps({"repo_id": _REPO_ID, "domain": "code"}), encoding="utf-8" ) return path def _env(repo: pathlib.Path) -> Mapping[str, str]: return {"MUSE_REPO_ROOT": str(repo)} def _commit_files( root: pathlib.Path, files: Mapping[str, bytes], branch: str = "main", ) -> str: global _counter _counter += 1 manifest: Manifest = {} for rel_path, content in files.items(): obj_id = blob_id(content) write_object(root, obj_id, content) manifest[rel_path] = obj_id abs_path = root / rel_path abs_path.parent.mkdir(parents=True, exist_ok=True) abs_path.write_bytes(content) snap_id = hash_snapshot(manifest) write_snapshot(root, SnapshotRecord(snapshot_id=snap_id, manifest=manifest)) committed_at = datetime.datetime.now(datetime.timezone.utc) # Read the current tip to use as parent (proper chain for reachability BFS). branch_ref = ref_path(root, branch) parent_id = branch_ref.read_text(encoding="utf-8").strip() if branch_ref.exists() else None parents = [parent_id] if parent_id else [] commit_id = hash_commit( parent_ids=parents, snapshot_id=snap_id, message=f"commit {_counter}", committed_at_iso=committed_at.isoformat(), ) write_commit( root, CommitRecord( commit_id=commit_id, branch=branch, snapshot_id=snap_id, message=f"commit {_counter}", committed_at=committed_at, parent_commit_id=parent_id, ), ) branch_ref.write_text(commit_id, encoding="utf-8") return commit_id def _invoke(repo: pathlib.Path, *args: str) -> "InvokeResult": from muse.cli.app import main as cli return runner.invoke(cli, ["count-objects", *args], env=_env(repo)) # --------------------------------------------------------------------------- # Unit — _count_loose_objects # --------------------------------------------------------------------------- def test_count_loose_objects_empty_store(tmp_path: pathlib.Path) -> None: from muse.cli.commands.count_objects import _count_loose_objects root = _init_repo(tmp_path) count, size = _count_loose_objects(root) assert count == 0 assert size == 0 def test_count_loose_objects_single_object(tmp_path: pathlib.Path) -> None: from muse.cli.commands.count_objects import _count_loose_objects root = _init_repo(tmp_path) content = b"hello world" obj_id = blob_id(content) write_object(root, obj_id, content) count, size = _count_loose_objects(root) assert count == 1 assert size > 0 def test_count_loose_objects_multiple_shards(tmp_path: pathlib.Path) -> None: from muse.cli.commands.count_objects import _count_loose_objects root = _init_repo(tmp_path) # Write 5 distinct objects (may land in different shards) for i in range(5): content = f"object {i}".encode() write_object(root, blob_id(content), content) count, _ = _count_loose_objects(root) assert count == 5 # --------------------------------------------------------------------------- # Unit — _collect_reachable_ids # --------------------------------------------------------------------------- def test_collect_reachable_ids_empty_repo(tmp_path: pathlib.Path) -> None: from muse.cli.commands.count_objects import _collect_reachable_ids root = _init_repo(tmp_path) ids = _collect_reachable_ids(root) assert isinstance(ids, set) assert len(ids) == 0 def test_collect_reachable_ids_after_commit(tmp_path: pathlib.Path) -> None: from muse.cli.commands.count_objects import _collect_reachable_ids root = _init_repo(tmp_path) _commit_files(root, {"a.py": b"# a\n"}) ids = _collect_reachable_ids(root) # At minimum: the blob object for a.py assert len(ids) >= 1 def test_collect_reachable_ids_includes_all_blobs(tmp_path: pathlib.Path) -> None: from muse.cli.commands.count_objects import _collect_reachable_ids root = _init_repo(tmp_path) files = {"a.py": b"# a\n", "b.py": b"# b\n", "c.py": b"# c\n"} _commit_files(root, files) ids = _collect_reachable_ids(root) # All three blob IDs must be reachable for content in files.values(): assert blob_id(content) in ids # --------------------------------------------------------------------------- # Integration — JSON output schema # --------------------------------------------------------------------------- def test_count_objects_json_schema_keys(tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) _commit_files(root, {"a.py": b"# a\n"}) result = _invoke(root, "--json") assert result.exit_code == 0 data = json.loads(result.stdout) for key in ("loose_objects", "loose_size_kb", "total_objects", "total_size_kb", "object_store_path", "duration_ms", "exit_code"): assert key in data, f"Missing key: {key}" def test_count_objects_json_count_matches_written(tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) # Write 3 unique blobs directly (no commit overhead) for i in range(3): content = f"direct blob {i}".encode() write_object(root, blob_id(content), content) result = _invoke(root, "--json") data = json.loads(result.stdout) assert data["loose_objects"] >= 3 def test_count_objects_json_empty_store(tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) result = _invoke(root, "--json") assert result.exit_code == 0 data = json.loads(result.stdout) assert data["loose_objects"] == 0 assert data["total_objects"] == 0 def test_count_objects_json_size_nonzero_after_write(tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) write_object(root, blob_id(b"x" * 1000), b"x" * 1000) result = _invoke(root, "--json") data = json.loads(result.stdout) assert data["loose_size_kb"] > 0 or data["total_size_kb"] > 0 def test_count_objects_json_object_store_path_present(tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) result = _invoke(root, "--json") data = json.loads(result.stdout) assert "objects" in data["object_store_path"] # --------------------------------------------------------------------------- # Integration — text output format # --------------------------------------------------------------------------- def test_count_objects_text_output_nonempty(tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) _commit_files(root, {"a.py": b"# a\n"}) result = _invoke(root) assert result.exit_code == 0 assert result.stdout.strip() def test_count_objects_text_mentions_count(tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) for i in range(5): write_object(root, blob_id(f"obj{i}".encode()), f"obj{i}".encode()) result = _invoke(root) # The count should appear somewhere in the output assert any(char.isdigit() for char in result.stdout) # --------------------------------------------------------------------------- # Integration — --verbose shard breakdown # --------------------------------------------------------------------------- def test_count_objects_verbose_json_has_shards(tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) for i in range(4): content = f"shard content {i}".encode() write_object(root, blob_id(content), content) result = _invoke(root, "--verbose", "--json") assert result.exit_code == 0 data = json.loads(result.stdout) assert "shards" in data assert isinstance(data["shards"], list) def test_count_objects_verbose_shards_sum_to_total(tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) for i in range(6): content = f"v content {i}".encode() write_object(root, blob_id(content), content) result = _invoke(root, "--verbose", "--json") data = json.loads(result.stdout) shard_total = sum(s["count"] for s in data["shards"]) assert shard_total == data["loose_objects"] # --------------------------------------------------------------------------- # Integration — --unreachable # --------------------------------------------------------------------------- def test_count_objects_unreachable_zero_after_clean_commit(tmp_path: pathlib.Path) -> None: """After a commit where all blobs are referenced, unreachable should be 0.""" root = _init_repo(tmp_path) _commit_files(root, {"a.py": b"# a\n", "b.py": b"# b\n"}) result = _invoke(root, "--unreachable", "--json") assert result.exit_code == 0 data = json.loads(result.stdout) assert "unreachable_objects" in data assert data["unreachable_objects"] == 0 def test_count_objects_unreachable_detects_orphan_blobs(tmp_path: pathlib.Path) -> None: """Blobs written but not referenced by any commit are unreachable.""" root = _init_repo(tmp_path) _commit_files(root, {"a.py": b"# a\n"}) # Write an extra blob that is NOT referenced by any commit orphan = b"i am an orphan blob" write_object(root, blob_id(orphan), orphan) result = _invoke(root, "--unreachable", "--json") data = json.loads(result.stdout) assert data["unreachable_objects"] >= 1 def test_count_objects_unreachable_empty_repo(tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) result = _invoke(root, "--unreachable", "--json") assert result.exit_code == 0 data = json.loads(result.stdout) assert data["unreachable_objects"] == 0 # --------------------------------------------------------------------------- # Security — read-only, no mutations # --------------------------------------------------------------------------- def test_count_objects_does_not_modify_store(tmp_path: pathlib.Path) -> None: """count-objects must not write, delete, or move any object files.""" root = _init_repo(tmp_path) _commit_files(root, {"a.py": b"# a\n"}) obj_dir = objects_dir(root) # Collect (path, mtime) before before = { str(p): p.stat().st_mtime for p in obj_dir.rglob("*") if p.is_file() } _invoke(root, "--json") _invoke(root, "--unreachable", "--json") # Collect after after = { str(p): p.stat().st_mtime for p in obj_dir.rglob("*") if p.is_file() } assert before == after, "count-objects modified the object store" # --------------------------------------------------------------------------- # Stress # --------------------------------------------------------------------------- def test_count_objects_large_store(tmp_path: pathlib.Path) -> None: """Store with 200 objects — count should be accurate.""" root = _init_repo(tmp_path) for i in range(200): content = f"stress object {i:04d}".encode() write_object(root, blob_id(content), content) result = _invoke(root, "--json") assert result.exit_code == 0 data = json.loads(result.stdout) assert data["loose_objects"] == 200 def test_count_objects_unreachable_large_repo(tmp_path: pathlib.Path) -> None: """10 commits with 10 files each — all referenced, unreachable = 0.""" root = _init_repo(tmp_path) for i in range(10): files = {f"pkg/file_{i}_{j}.py": f"# {i},{j}\n".encode() for j in range(10)} _commit_files(root, files) result = _invoke(root, "--unreachable", "--json") assert result.exit_code == 0 data = json.loads(result.stdout) assert data["unreachable_objects"] == 0 # --------------------------------------------------------------------------- # TestJsonSchemaComplete # --------------------------------------------------------------------------- _REQUIRED_KEYS = frozenset({ "loose_objects", "loose_size_kb", "total_objects", "total_size_kb", "object_store_path", "duration_ms", "exit_code", }) _REQUIRED_KEYS_UNREACHABLE = _REQUIRED_KEYS | {"unreachable_objects"} _REQUIRED_KEYS_VERBOSE = _REQUIRED_KEYS | {"shards"} class TestJsonSchemaComplete: """Every required key must appear in every JSON output path.""" def test_base_keys_present(self, tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) result = _invoke(root, "--json") assert result.exit_code == 0 data = json.loads(result.stdout) missing = _REQUIRED_KEYS - data.keys() assert not missing, f"Missing keys: {missing}" def test_unreachable_keys_present(self, tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) result = _invoke(root, "--unreachable", "--json") assert result.exit_code == 0 data = json.loads(result.stdout) missing = _REQUIRED_KEYS_UNREACHABLE - data.keys() assert not missing, f"Missing keys: {missing}" def test_verbose_keys_present(self, tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) result = _invoke(root, "--verbose", "--json") assert result.exit_code == 0 data = json.loads(result.stdout) missing = _REQUIRED_KEYS_VERBOSE - data.keys() assert not missing, f"Missing keys: {missing}" def test_all_flags_keys_present(self, tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) result = _invoke(root, "--unreachable", "--verbose", "--json") assert result.exit_code == 0 data = json.loads(result.stdout) missing = (_REQUIRED_KEYS_UNREACHABLE | _REQUIRED_KEYS_VERBOSE) - data.keys() assert not missing, f"Missing keys: {missing}" def test_exit_code_field_is_zero_on_success(self, tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) result = _invoke(root, "--json") assert result.exit_code == 0 assert json.loads(result.stdout)["exit_code"] == 0 def test_exit_code_is_integer(self, tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) result = _invoke(root, "--json") assert isinstance(json.loads(result.stdout)["exit_code"], int) def test_json_is_compact(self, tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) result = _invoke(root, "--json") lines = [ln for ln in result.stdout.splitlines() if ln.strip()] assert len(lines) == 1, "JSON output must be a single line" def test_exit_code_in_json_matches_process_exit(self, tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) result = _invoke(root, "--json") assert json.loads(result.stdout)["exit_code"] == result.exit_code # --------------------------------------------------------------------------- # TestElapsedSeconds # --------------------------------------------------------------------------- class TestElapsedSeconds: """``duration_ms`` must be a non-negative float in every JSON path.""" def _assert_elapsed(self, data: Mapping[str, object]) -> None: # type: ignore[type-arg] assert "duration_ms" in data assert isinstance(data["duration_ms"], float) assert data["duration_ms"] >= 0.0 def test_elapsed_base(self, tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) result = _invoke(root, "--json") self._assert_elapsed(json.loads(result.stdout)) def test_elapsed_with_unreachable(self, tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) _commit_files(root, {"a.py": b"# a\n"}) result = _invoke(root, "--unreachable", "--json") self._assert_elapsed(json.loads(result.stdout)) def test_elapsed_with_verbose(self, tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) _commit_files(root, {"a.py": b"# a\n"}) result = _invoke(root, "--verbose", "--json") self._assert_elapsed(json.loads(result.stdout)) def test_elapsed_all_flags(self, tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) _commit_files(root, {"a.py": b"# a\n"}) result = _invoke(root, "--unreachable", "--verbose", "--json") self._assert_elapsed(json.loads(result.stdout)) def test_elapsed_is_float_not_int(self, tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) result = _invoke(root, "--json") data = json.loads(result.stdout) assert isinstance(data["duration_ms"], float) def test_elapsed_reasonable_upper_bound(self, tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) result = _invoke(root, "--json") assert json.loads(result.stdout)["duration_ms"] < 5.0 def test_elapsed_six_decimal_places(self, tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) result = _invoke(root, "--json") elapsed = json.loads(result.stdout)["duration_ms"] assert round(elapsed, 6) == elapsed # --------------------------------------------------------------------------- # TestExitCode # --------------------------------------------------------------------------- class TestExitCode: """``exit_code`` in JSON must mirror the process exit code.""" def test_exit_code_zero_empty_store(self, tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) result = _invoke(root, "--json") assert result.exit_code == 0 assert json.loads(result.stdout)["exit_code"] == 0 def test_exit_code_zero_with_objects(self, tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) _commit_files(root, {"a.py": b"# a\n"}) result = _invoke(root, "--json") assert result.exit_code == 0 assert json.loads(result.stdout)["exit_code"] == 0 def test_exit_code_zero_unreachable_flag(self, tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) _commit_files(root, {"a.py": b"# a\n"}) result = _invoke(root, "--unreachable", "--json") assert result.exit_code == 0 assert json.loads(result.stdout)["exit_code"] == 0 def test_exit_code_zero_verbose_flag(self, tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) result = _invoke(root, "--verbose", "--json") assert result.exit_code == 0 assert json.loads(result.stdout)["exit_code"] == 0 def test_exit_code_matches_process_exit(self, tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) result = _invoke(root, "--json") assert json.loads(result.stdout)["exit_code"] == result.exit_code # --------------------------------------------------------------------------- # Data integrity — unreachable detection correctness with sha256: prefix # --------------------------------------------------------------------------- class TestUnreachableDetection: """Verify unreachable detection correctly handles sha256:-prefixed IDs.""" def test_all_committed_blobs_reachable(self, tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) _commit_files(root, {"x.py": b"x = 1\n", "y.py": b"y = 2\n"}) result = _invoke(root, "--unreachable", "--json") assert result.exit_code == 0 assert json.loads(result.stdout)["unreachable_objects"] == 0 def test_orphan_blob_detected(self, tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) _commit_files(root, {"a.py": b"# committed\n"}) write_object(root, blob_id(b"orphan"), b"orphan") result = _invoke(root, "--unreachable", "--json") assert json.loads(result.stdout)["unreachable_objects"] >= 1 def test_multiple_orphans_all_counted(self, tmp_path: pathlib.Path) -> None: root = _init_repo(tmp_path) _commit_files(root, {"a.py": b"# committed\n"}) for i in range(5): content = f"orphan {i}".encode() write_object(root, blob_id(content), content) result = _invoke(root, "--unreachable", "--json") assert json.loads(result.stdout)["unreachable_objects"] >= 5 def test_reachable_set_uses_sha256_prefix(self, tmp_path: pathlib.Path) -> None: """_collect_reachable_ids must return sha256:-prefixed IDs.""" from muse.cli.commands.count_objects import _collect_reachable_ids root = _init_repo(tmp_path) content = b"# test\n" _commit_files(root, {"a.py": content}) ids = _collect_reachable_ids(root) assert len(ids) > 0 for obj_id in ids: assert obj_id.startswith("sha256:"), f"ID missing sha256: prefix: {obj_id!r}" class TestRegisterFlags: def test_default_json_out_is_false(self) -> None: import argparse from muse.cli.commands.count_objects import register p = argparse.ArgumentParser() subs = p.add_subparsers() register(subs) args = p.parse_args(["count-objects"]) assert args.json_out is False def test_json_flag_sets_json_out(self) -> None: import argparse from muse.cli.commands.count_objects import register p = argparse.ArgumentParser() subs = p.add_subparsers() register(subs) args = p.parse_args(["count-objects", "--json"]) assert args.json_out is True def test_j_shorthand_sets_json_out(self) -> None: import argparse from muse.cli.commands.count_objects import register p = argparse.ArgumentParser() subs = p.add_subparsers() register(subs) args = p.parse_args(["count-objects", "-j"]) assert args.json_out is True