"""hash-object: canonical sha256: prefix and agent-ready JSON schema. Every object ID emitted by ``muse hash-object`` must carry the ``sha256:`` prefix. Bare hex is only acceptable at the disk boundary (the filename on disk). This test suite enforces that invariant and covers the new agent-ready JSON fields. Test categories --------------- TestCanonicalPrefix — object_id always starts with 'sha256:' TestStdinWriteFixed — stdin + --write was broken (bare hex bug); now fixed TestAgentFields — duration_ms, exit_code, size_bytes in JSON output TestTextOutputPrefix — text format also carries the prefix TestCrossCheck — file and stdin produce identical canonical IDs """ from __future__ import annotations import argparse import json import pathlib from muse.core.errors import ExitCode from tests.cli_test_helper import CliRunner, InvokeResult from muse.core.types import blob_id, split_id from muse.core.paths import muse_dir from muse.core.object_store import object_path, read_object runner = CliRunner() # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _run(*args: str, stdin: bytes | None = None, repo: pathlib.Path | None = None) -> InvokeResult: from muse.cli.app import main as cli env = {"MUSE_REPO_ROOT": str(repo)} if repo else {} return runner.invoke(cli, ["hash-object", *args], input=stdin, env=env) def _make_repo(tmp_path: pathlib.Path) -> pathlib.Path: repo = tmp_path / "repo" dot_muse = muse_dir(repo) for sub in ("objects", "commits", "snapshots", "refs/heads"): (dot_muse / sub).mkdir(parents=True) (dot_muse / "HEAD").write_text("ref: refs/heads/main") (dot_muse / "repo.json").write_text( json.dumps({"repo_id": "test", "domain": "code"}) ) return repo # --------------------------------------------------------------------------- # TestCanonicalPrefix # --------------------------------------------------------------------------- class TestCanonicalPrefix: """object_id in JSON output must always start with 'sha256:'.""" def test_file_json_object_id_has_prefix(self, tmp_path: pathlib.Path) -> None: f = tmp_path / "f.txt" f.write_bytes(b"hello") result = _run("--json", str(f)) assert result.exit_code == 0 data = json.loads(result.output) assert data["object_id"].startswith("sha256:"), ( f"object_id must start with 'sha256:' — got {data['object_id']!r}" ) def test_file_json_object_id_correct_length(self, tmp_path: pathlib.Path) -> None: """sha256: (7 chars) + 64 hex = 71 total.""" f = tmp_path / "f.txt" f.write_bytes(b"hello") result = _run("--json", str(f)) data = json.loads(result.output) assert len(data["object_id"]) == 71 def test_file_json_object_id_matches_canonical(self, tmp_path: pathlib.Path) -> None: content = b"canonical check" f = tmp_path / "f.txt" f.write_bytes(content) result = _run("--json", str(f)) data = json.loads(result.output) assert data["object_id"] == blob_id(content) def test_stdin_json_object_id_has_prefix(self, tmp_path: pathlib.Path) -> None: result = _run("--json", "--stdin", stdin=b"from stdin") assert result.exit_code == 0 data = json.loads(result.output) assert data["object_id"].startswith("sha256:") def test_stdin_json_object_id_matches_canonical(self, tmp_path: pathlib.Path) -> None: content = b"piped data" result = _run("--json", "--stdin", stdin=content) data = json.loads(result.output) assert data["object_id"] == blob_id(content) def test_empty_file_has_prefix(self, tmp_path: pathlib.Path) -> None: f = tmp_path / "empty.txt" f.write_bytes(b"") result = _run("--json", str(f)) data = json.loads(result.output) assert data["object_id"] == blob_id(b"") def test_empty_stdin_has_prefix(self, tmp_path: pathlib.Path) -> None: result = _run("--json", "--stdin", stdin=b"") data = json.loads(result.output) assert data["object_id"] == blob_id(b"") def test_no_bare_hex_in_json_output(self, tmp_path: pathlib.Path) -> None: """The raw 64-char hex without prefix must not appear as object_id.""" content = b"no bare hex" f = tmp_path / "f.txt" f.write_bytes(content) result = _run("--json", str(f)) data = json.loads(result.output) bare_hex = split_id(blob_id(content))[1] assert data["object_id"] != bare_hex, ( "object_id must be 'sha256:', not bare hex" ) # --------------------------------------------------------------------------- # TestStdinWriteFixed # --------------------------------------------------------------------------- class TestStdinWriteFixed: """stdin + --write was broken (passed bare hex to write_object). Now fixed.""" def test_stdin_write_exits_zero(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) result = _run("--stdin", "--write", stdin=b"store me", repo=repo) assert result.exit_code == 0, f"exit {result.exit_code}: {result.output}" def test_stdin_write_stored_true(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) result = _run("--json", "--stdin", "--write", stdin=b"store me", repo=repo) assert json.loads(result.output)["stored"] is True def test_stdin_write_object_file_exists(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) content = b"stdin stored content" result = _run("--json", "--stdin", "--write", stdin=content, repo=repo) oid = json.loads(result.output)["object_id"] obj_file = object_path(repo, oid) assert obj_file.exists(), f"object file not found at {obj_file}" assert read_object(repo, oid) == content def test_stdin_write_object_id_canonical(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) content = b"canonical write" result = _run("--json", "--stdin", "--write", stdin=content, repo=repo) data = json.loads(result.output) assert data["object_id"] == blob_id(content) def test_stdin_write_idempotent(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) content = b"write twice" _run("--stdin", "--write", stdin=content, repo=repo) result2 = _run("--json", "--stdin", "--write", stdin=content, repo=repo) assert result2.exit_code == 0 assert json.loads(result2.output)["stored"] is False # --------------------------------------------------------------------------- # TestAgentFields # --------------------------------------------------------------------------- class TestAgentFields: """JSON output must include duration_ms, exit_code, size_bytes.""" def test_duration_ms_present(self, tmp_path: pathlib.Path) -> None: f = tmp_path / "f.txt" f.write_bytes(b"timing") data = json.loads(_run("--json", str(f)).output) assert "duration_ms" in data, "JSON must include duration_ms" def test_duration_ms_non_negative(self, tmp_path: pathlib.Path) -> None: f = tmp_path / "f.txt" f.write_bytes(b"timing") data = json.loads(_run("--json", str(f)).output) assert data["duration_ms"] >= 0 def test_exit_code_present(self, tmp_path: pathlib.Path) -> None: f = tmp_path / "f.txt" f.write_bytes(b"x") data = json.loads(_run("--json", str(f)).output) assert "exit_code" in data, "JSON must include exit_code" def test_exit_code_zero_on_success(self, tmp_path: pathlib.Path) -> None: f = tmp_path / "f.txt" f.write_bytes(b"x") data = json.loads(_run("--json", str(f)).output) assert data["exit_code"] == 0 def test_size_bytes_present(self, tmp_path: pathlib.Path) -> None: f = tmp_path / "f.txt" f.write_bytes(b"twelve bytes") data = json.loads(_run("--json", str(f)).output) assert "size_bytes" in data, "JSON must include size_bytes" def test_size_bytes_correct_for_file(self, tmp_path: pathlib.Path) -> None: content = b"twelve bytes" f = tmp_path / "f.txt" f.write_bytes(content) data = json.loads(_run("--json", str(f)).output) assert data["size_bytes"] == len(content) def test_size_bytes_correct_for_stdin(self, tmp_path: pathlib.Path) -> None: content = b"stdin payload" data = json.loads(_run("--json", "--stdin", stdin=content).output) assert data["size_bytes"] == len(content) def test_size_bytes_zero_for_empty(self, tmp_path: pathlib.Path) -> None: f = tmp_path / "empty.txt" f.write_bytes(b"") data = json.loads(_run("--json", str(f)).output) assert data["size_bytes"] == 0 def test_stdin_duration_ms_present(self, tmp_path: pathlib.Path) -> None: data = json.loads(_run("--json", "--stdin", stdin=b"x").output) assert "duration_ms" in data def test_stdin_exit_code_present(self, tmp_path: pathlib.Path) -> None: data = json.loads(_run("--json", "--stdin", stdin=b"x").output) assert "exit_code" in data # --------------------------------------------------------------------------- # TestTextOutputPrefix # --------------------------------------------------------------------------- class TestTextOutputPrefix: """Text format must also emit the sha256: prefix.""" def test_text_file_has_prefix(self, tmp_path: pathlib.Path) -> None: f = tmp_path / "f.txt" f.write_bytes(b"text output") result = _run(str(f)) assert result.exit_code == 0 assert result.output.strip().startswith("sha256:") def test_text_stdin_has_prefix(self, tmp_path: pathlib.Path) -> None: result = _run("--stdin", stdin=b"text stdin") assert result.output.strip().startswith("sha256:") def test_text_output_is_correct_canonical_id(self, tmp_path: pathlib.Path) -> None: content = b"text canonical" f = tmp_path / "f.txt" f.write_bytes(content) result = _run(str(f)) assert result.output.strip() == blob_id(content) def test_text_length_is_71(self, tmp_path: pathlib.Path) -> None: """sha256: (7) + 64 hex = 71 characters.""" f = tmp_path / "f.txt" f.write_bytes(b"length check") result = _run(str(f)) assert len(result.output.strip()) == 71 # --------------------------------------------------------------------------- # TestCrossCheck # --------------------------------------------------------------------------- class TestCrossCheck: """File and stdin paths produce identical canonical IDs for the same bytes.""" def test_file_and_stdin_same_id(self, tmp_path: pathlib.Path) -> None: content = b"cross check content" f = tmp_path / "f.txt" f.write_bytes(content) file_id = json.loads(_run("--json", str(f)).output)["object_id"] stdin_id = json.loads(_run("--json", "--stdin", stdin=content).output)["object_id"] assert file_id == stdin_id def test_write_file_and_stdin_same_id(self, tmp_path: pathlib.Path) -> None: repo = _make_repo(tmp_path) content = b"write cross check" f = repo / "f.txt" f.write_bytes(content) file_id = json.loads(_run("--json", "--write", str(f), repo=repo).output)["object_id"] stdin_id = json.loads( _run("--json", "--stdin", "--write", stdin=content, repo=repo).output )["object_id"] assert file_id == stdin_id def test_hash_bytes_returns_canonical(self) -> None: """_hash_bytes must return sha256:-prefixed ID, not bare hex.""" from muse.cli.commands.hash_object import _hash_bytes result = _hash_bytes(b"test data") assert result.startswith("sha256:"), ( f"_hash_bytes must return 'sha256:', got {result!r}" ) assert len(result) == 71 # --------------------------------------------------------------------------- # TestRegisterFlags — argparse-level verification # --------------------------------------------------------------------------- class TestRegisterFlags: """Verify that register() wires --json / -j correctly.""" def _make_parser(self) -> "argparse.ArgumentParser": import argparse from muse.cli.commands.hash_object import register ap = argparse.ArgumentParser() subs = ap.add_subparsers() register(subs) return ap def test_json_flag_long(self) -> None: ns = self._make_parser().parse_args(["hash-object", "--stdin", "--json"]) assert ns.json_out is True def test_j_alias(self) -> None: ns = self._make_parser().parse_args(["hash-object", "--stdin", "-j"]) assert ns.json_out is True def test_default_is_text(self) -> None: ns = self._make_parser().parse_args(["hash-object", "--stdin"]) assert ns.json_out is False def test_dest_is_json_out(self) -> None: ns = self._make_parser().parse_args(["hash-object", "--stdin", "-j"]) assert hasattr(ns, "json_out") assert not hasattr(ns, "fmt")