gabriel / muse public

test_cmd_rev_parse.py file-level

at sha256:2 · View file ↗ · Intel ↗

History
1 files
1 commits
0 hotspots
0 🧊 dead
0 💥 blast risk
sha256:4 Merge branch 'dev' into main · gabriel · Jun 17, 2026
1 """Comprehensive tests for ``muse rev-parse``.
2
3 Coverage tiers
4 --------------
5 - Integration: branch, HEAD, SHA prefix, full SHA, --abbrev-ref, --format text
6 - Edge cases: empty repo (no commits), empty ref, ambiguous prefix, HEAD→branch
7 - Security: ANSI/control chars in ref → JSON-escaped, empty ref clean error
8 - Stress: 200 rapid resolves
9 """
10 from __future__ import annotations
11 from collections.abc import Mapping
12
13 import datetime
14 import json
15 import pathlib
16
17 import pytest
18 from muse.core.errors import ExitCode
19 from muse.core.object_store import write_object
20 from muse.core.ids import hash_commit, hash_snapshot
21 from muse.core.commits import (
22 CommitRecord,
23 write_commit,
24 )
25 from muse.core.snapshots import (
26 SnapshotRecord,
27 write_snapshot,
28 )
29 from muse.core.types import Manifest, long_id, split_id
30 from muse.core.paths import muse_dir, ref_path
31 from tests.cli_test_helper import CliRunner, InvokeResult
32
# Module-level CLI runner; ``_rev`` reuses it for every ``rev-parse`` invocation.
runner = CliRunner()
34
35 # ---------------------------------------------------------------------------
36 # Helpers
37 # ---------------------------------------------------------------------------
38
def _make_repo(tmp_path: pathlib.Path, branch: str = "main") -> pathlib.Path:
    """Create an empty repository skeleton under *tmp_path* and return its root.

    Inside the directory returned by ``muse_dir`` this creates the
    ``objects``, ``commits``, ``snapshots`` and ``refs/heads`` directories,
    a ``HEAD`` file holding a symbolic ref to *branch*, and a minimal
    ``repo.json``.  No branch ref file is written, so the repo has no
    commits until a test adds one.

    Args:
        tmp_path: Scratch directory (normally pytest's ``tmp_path`` fixture).
        branch: Branch name that ``HEAD`` should point at.

    Returns:
        The repository root, ``tmp_path / "repo"``.
    """
    repo = tmp_path / "repo"
    dot_muse = muse_dir(repo)
    for sub in ("objects", "commits", "snapshots", "refs/heads"):
        (dot_muse / sub).mkdir(parents=True)
    # Explicit encoding: the locale default is platform-dependent, and the
    # branch name is caller-supplied text that may not be ASCII.
    (dot_muse / "HEAD").write_text(f"ref: refs/heads/{branch}", encoding="utf-8")
    (dot_muse / "repo.json").write_text(
        json.dumps({"repo_id": "test", "domain": "code"}),
        encoding="utf-8",
    )
    return repo
47
48
# Fixed, timezone-aware timestamp shared by every snapshot and commit the
# helpers create, so the IDs hashed from it are the same on every run.
_TS = datetime.datetime(2026, 1, 1, tzinfo=datetime.timezone.utc)
50
51
def _store_snap(repo: pathlib.Path, manifest: Manifest | None = None) -> str:
    """Write a snapshot record for *manifest* into *repo* and return its ID.

    A missing (or empty) manifest is stored as an empty mapping.  The ID is
    whatever ``hash_snapshot`` yields for that manifest, and the record is
    stamped with the module-wide fixed timestamp.
    """
    effective_manifest = manifest or {}
    snapshot_id = hash_snapshot(effective_manifest)
    record = SnapshotRecord(
        snapshot_id=snapshot_id,
        manifest=effective_manifest,
        created_at=_TS,
    )
    write_snapshot(repo, record)
    return snapshot_id
60
61
def _make_commit(
    repo: pathlib.Path,
    snapshot_id: str,
    *,
    branch: str = "main",
    parent: str | None = None,
    message: str = "test",
) -> str:
    """Write a commit record into *repo* and return its content-derived ID.

    The ID comes from ``hash_commit`` over the parent list, snapshot ID,
    message, the fixed timestamp and the author ``"tester"``.  Only the
    commit record is written; no branch ref is updated here.

    Args:
        repo: Repository root.
        snapshot_id: ID of the snapshot this commit points at.
        branch: Branch name recorded on the commit.
        parent: Optional parent commit ID; a falsy value means no parents.
        message: Commit message (also part of the hashed identity).
    """
    author = "tester"
    parent_ids: list[str] = []
    if parent:
        parent_ids.append(parent)

    commit_id = hash_commit(
        parent_ids=parent_ids,
        snapshot_id=snapshot_id,
        message=message,
        committed_at_iso=_TS.isoformat(),
        author=author,
    )
    write_commit(
        repo,
        CommitRecord(
            commit_id=commit_id,
            branch=branch,
            snapshot_id=snapshot_id,
            message=message,
            committed_at=_TS,
            author=author,
            parent_commit_id=parent,
        ),
    )
    return commit_id
89
90
def _set_head(repo: pathlib.Path, branch: str, commit_id: str) -> None:
    """Point *branch* at *commit_id* by writing the branch's ref file.

    The target is the path returned by ``ref_path``; missing parent
    directories (e.g. for ``feat/x`` style names) are created first.
    Despite the name, the ``HEAD`` file itself is not modified.
    """
    branch_ref = ref_path(repo, branch)
    branch_ref.parent.mkdir(parents=True, exist_ok=True)
    branch_ref.write_text(commit_id)
95
96
def _rev(repo: pathlib.Path, *args: str) -> InvokeResult:
    """Invoke ``rev-parse`` with *args* against *repo* and return the result.

    The repository is selected through the ``MUSE_REPO_ROOT`` environment
    variable passed to the runner.
    """
    # The CLI entry point is imported at call time, not at module import time.
    from muse.cli.app import main as cli

    argv = ["rev-parse", *args]
    environment = {"MUSE_REPO_ROOT": str(repo)}
    return runner.invoke(cli, argv, env=environment)
104
105
def _populated_repo(tmp_path: pathlib.Path) -> tuple[pathlib.Path, str]:
    """Build a repo whose ``main`` branch holds one commit.

    Returns:
        ``(repo, commit_id)`` where *commit_id* is the real content-addressed
        ID of the single commit (empty-manifest snapshot, default message).
    """
    repo = _make_repo(tmp_path)
    commit_id = _make_commit(repo, _store_snap(repo))
    _set_head(repo, "main", commit_id)
    return repo, commit_id
113
114
115 # ---------------------------------------------------------------------------
116 # Integration — branch resolution
117 # ---------------------------------------------------------------------------
118
119
120 # ---------------------------------------------------------------------------
121 # New: default format is text, --json makes it meaningful
122 # ---------------------------------------------------------------------------
123
124
class TestDefaultFormat:
    """Default output is a bare commit ID; ``--json`` switches to a JSON object."""

    def test_default_output_is_text(self, tmp_path: pathlib.Path) -> None:
        """Without --json the output is a plain commit ID."""
        repo, cid = _populated_repo(tmp_path)
        result = _rev(repo, "main")
        assert result.exit_code == 0
        assert result.output.strip() == cid

    def test_no_flags_output_is_not_json(self, tmp_path: pathlib.Path) -> None:
        """Default plain-text output is not parseable as JSON."""
        repo, _ = _populated_repo(tmp_path)
        result = _rev(repo, "main")
        assert result.exit_code == 0
        # json.JSONDecodeError subclasses ValueError, so one type covers both.
        with pytest.raises(ValueError):
            json.loads(result.output)

    def test_json_flag_gives_dict_output(self, tmp_path: pathlib.Path) -> None:
        """With --json output is a dict."""
        repo, cid = _populated_repo(tmp_path)
        result = _rev(repo, "--json", "main")
        assert result.exit_code == 0
        data = json.loads(result.output)
        assert data["commit_id"] == cid
        assert data["ref"] == "main"

    def test_text_vs_json_differ(self, tmp_path: pathlib.Path) -> None:
        """Plain text and --json outputs differ in structure."""
        repo, cid = _populated_repo(tmp_path)
        text_result = _rev(repo, "main")
        json_result = _rev(repo, "--json", "main")
        # Both invocations must succeed before their outputs are compared.
        assert text_result.exit_code == 0
        assert json_result.exit_code == 0
        assert text_result.output.strip() == cid
        assert json.loads(json_result.output)["commit_id"] == cid
        # The property the test is named for: the two renderings are not equal.
        assert text_result.output.strip() != json_result.output.strip()
157
158
159 # ---------------------------------------------------------------------------
160 # New: sha256: prefix is required; bare hex is rejected
161 # ---------------------------------------------------------------------------
162
163
class TestSha256PrefixRequired:
    """Commit IDs must be written ``sha256:<hex>``; bare hex is a user error."""

    @staticmethod
    def _assert_rejected_for_missing_prefix(result: InvokeResult) -> None:
        """Assert a USER_ERROR exit whose JSON error text mentions ``sha256:``."""
        assert result.exit_code == ExitCode.USER_ERROR
        payload = json.loads(result.output)
        assert "sha256:" in payload["error"]

    def test_bare_full_hex_rejected(self, tmp_path: pathlib.Path) -> None:
        """64-char bare hex without sha256: prefix must be rejected."""
        repo, cid = _populated_repo(tmp_path)
        bare_hex = split_id(cid)[1]
        self._assert_rejected_for_missing_prefix(_rev(repo, bare_hex))

    def test_bare_short_hex_rejected(self, tmp_path: pathlib.Path) -> None:
        """Short bare hex without sha256: prefix must be rejected."""
        repo, cid = _populated_repo(tmp_path)
        short_hex = split_id(cid)[1][:8]  # 8 bare hex chars, no prefix
        self._assert_rejected_for_missing_prefix(_rev(repo, short_hex))

    def test_canonical_full_id_resolves(self, tmp_path: pathlib.Path) -> None:
        """sha256:<64hex> must resolve to the commit."""
        repo, cid = _populated_repo(tmp_path)
        outcome = _rev(repo, "--json", cid)
        assert outcome.exit_code == 0
        resolved = json.loads(outcome.output)["commit_id"]
        assert resolved == cid

    def test_canonical_prefix_resolves(self, tmp_path: pathlib.Path) -> None:
        """sha256:<8hex> prefix must resolve to the commit."""
        repo, cid = _populated_repo(tmp_path)
        hex_part = split_id(cid)[1]
        abbreviated = long_id(hex_part[:8])  # sha256: + 8 hex chars
        outcome = _rev(repo, "--json", abbreviated)
        assert outcome.exit_code == 0
        resolved = json.loads(outcome.output)["commit_id"]
        assert resolved == cid
195
196
197 # ---------------------------------------------------------------------------
198 # Integration — branch resolution
199 # ---------------------------------------------------------------------------
200
201
class TestBranchResolution:
    """Resolving a branch name to the commit its ref points at."""

    def test_resolve_branch_json(self, tmp_path: pathlib.Path) -> None:
        """``--json`` output carries both the resolved commit and the ref."""
        repo, cid = _populated_repo(tmp_path)
        result = _rev(repo, "--json", "main")
        assert result.exit_code == 0
        data = json.loads(result.output)
        assert data["commit_id"] == cid
        assert data["ref"] == "main"

    def test_resolve_branch_text(self, tmp_path: pathlib.Path) -> None:
        """Plain-text output is just the commit ID."""
        repo, cid = _populated_repo(tmp_path)
        result = _rev(repo, "main")
        assert result.exit_code == 0
        assert result.output.strip() == cid

    def test_json_flag_shorthand(self, tmp_path: pathlib.Path) -> None:
        """The ``-j`` shorthand behaves like ``--json``."""
        repo, cid = _populated_repo(tmp_path)
        # Use the short flag; with "--json" this test would merely duplicate
        # test_resolve_branch_json and leave the shorthand unexercised.
        result = _rev(repo, "-j", "main")
        assert result.exit_code == 0
        data = json.loads(result.output)
        assert data["commit_id"] == cid

    def test_unknown_branch_not_found(self, tmp_path: pathlib.Path) -> None:
        """An unknown branch is a user error reported as JSON ``not found``."""
        repo = _make_repo(tmp_path)
        result = _rev(repo, "nonexistent-branch")
        assert result.exit_code == ExitCode.USER_ERROR
        data = json.loads(result.output)
        assert data["commit_id"] is None
        assert data["error"] == "not found"
231
232
233 # ---------------------------------------------------------------------------
234 # Integration — HEAD resolution
235 # ---------------------------------------------------------------------------
236
237
class TestHeadResolution:
    """Resolving the symbolic ``HEAD`` ref."""

    def test_resolve_head(self, tmp_path: pathlib.Path) -> None:
        """``HEAD`` resolves to the commit on the current branch."""
        repo, expected = _populated_repo(tmp_path)
        outcome = _rev(repo, "--json", "HEAD")
        assert outcome.exit_code == 0
        payload = json.loads(outcome.output)
        assert payload["commit_id"] == expected

    def test_head_lowercase_also_resolves(self, tmp_path: pathlib.Path) -> None:
        """Lowercase ``head`` resolves exactly like ``HEAD``.

        NOTE(review): the original docstring said this matches git; whether
        git accepts ``head`` may depend on the filesystem. Confirm before
        relying on that parity claim.
        """
        repo, expected = _populated_repo(tmp_path)
        outcome = _rev(repo, "--json", "head")
        assert outcome.exit_code == 0
        payload = json.loads(outcome.output)
        assert payload["commit_id"] == expected

    def test_head_on_empty_repo_errors(self, tmp_path: pathlib.Path) -> None:
        """HEAD on a repo with no commits should error cleanly."""
        empty_repo = _make_repo(tmp_path)
        outcome = _rev(empty_repo, "HEAD")
        assert outcome.exit_code == ExitCode.USER_ERROR
        payload = json.loads(outcome.output)
        assert payload["commit_id"] is None
        assert "no commits" in payload["error"]
262
263
264 # ---------------------------------------------------------------------------
265 # Integration — SHA prefix resolution
266 # ---------------------------------------------------------------------------
267
268
class TestShaResolution:
    """Resolving full and abbreviated ``sha256:`` commit IDs."""

    def test_resolve_full_sha(self, tmp_path: pathlib.Path) -> None:
        """A full commit ID resolves to itself."""
        repo, expected = _populated_repo(tmp_path)
        outcome = _rev(repo, "--json", expected)
        assert outcome.exit_code == 0
        payload = json.loads(outcome.output)
        assert payload["commit_id"] == expected

    def test_resolve_8char_prefix(self, tmp_path: pathlib.Path) -> None:
        """An 8-hex-char abbreviation resolves to the full commit ID."""
        repo, expected = _populated_repo(tmp_path)
        hex_digest = split_id(expected)[1]
        abbreviated = long_id(hex_digest[:8])  # sha256: + first 8 hex chars
        outcome = _rev(repo, "--json", abbreviated)
        assert outcome.exit_code == 0
        payload = json.loads(outcome.output)
        assert payload["commit_id"] == expected

    def test_ambiguous_prefix_returns_candidates(self, tmp_path: pathlib.Path) -> None:
        """Two commits sharing a prefix → error with candidates list."""
        # These two messages were chosen because, with the same snapshot and
        # the same fixed timestamp, their commit IDs share the 4-char hex
        # prefix below.  The precondition is asserted before the CLI runs.
        first_message = "commit-search-143"
        second_message = "commit-search-346"
        shared_prefix = "c1d3"

        repo = _make_repo(tmp_path)
        snapshot_id = _store_snap(repo)
        on_main = _make_commit(repo, snapshot_id, branch="main", message=first_message)
        on_dev = _make_commit(repo, snapshot_id, branch="dev", message=second_message)

        # IDs are sha256:<hex>; only the hex portion is compared.
        main_hex = split_id(on_main)[1]
        dev_hex = split_id(on_dev)[1]
        assert main_hex[:4] == dev_hex[:4] == shared_prefix

        _set_head(repo, "main", on_main)
        _set_head(repo, "dev", on_dev)

        outcome = _rev(repo, long_id(shared_prefix))
        assert outcome.exit_code == ExitCode.USER_ERROR
        payload = json.loads(outcome.output)
        assert payload["error"] == "ambiguous"
        assert set(payload["candidates"]) == {on_main, on_dev}

    def test_nonexistent_full_sha_not_found(self, tmp_path: pathlib.Path) -> None:
        """A well-formed ID that matches no commit is reported as not found."""
        repo = _make_repo(tmp_path)
        missing_id = long_id("f" * 64)
        outcome = _rev(repo, missing_id)
        assert outcome.exit_code == ExitCode.USER_ERROR
        payload = json.loads(outcome.output)
        assert payload["error"] == "not found"
314
315
316 # ---------------------------------------------------------------------------
317 # Integration — --abbrev-ref
318 # ---------------------------------------------------------------------------
319
320
class TestAbbrevRef:
    """``--abbrev-ref HEAD`` reports the current branch name."""

    def test_abbrev_ref_head_returns_branch_name(self, tmp_path: pathlib.Path) -> None:
        """The canonical agent UX: what branch am I on?"""
        branch_name = "feat/my-feature"
        repo = _make_repo(tmp_path, branch=branch_name)
        outcome = _rev(repo, "--abbrev-ref", "--json", "HEAD")
        assert outcome.exit_code == 0
        payload = json.loads(outcome.output)
        assert payload["branch"] == "feat/my-feature"
        assert payload["ref"] == "HEAD"

    def test_abbrev_ref_text_format(self, tmp_path: pathlib.Path) -> None:
        """Without --json the branch name is printed on its own."""
        repo = _make_repo(tmp_path, branch="dev")
        outcome = _rev(repo, "--abbrev-ref", "HEAD")
        assert outcome.exit_code == 0
        printed = outcome.output.strip()
        assert printed == "dev"

    def test_abbrev_ref_main(self, tmp_path: pathlib.Path) -> None:
        """The default ``main`` branch is reported by name."""
        repo = _make_repo(tmp_path, branch="main")
        outcome = _rev(repo, "--abbrev-ref", "--json", "HEAD")
        assert outcome.exit_code == 0
        payload = json.loads(outcome.output)
        assert payload["branch"] == "main"
342
343
344 # ---------------------------------------------------------------------------
345 # Edge cases
346 # ---------------------------------------------------------------------------
347
348
class TestEdgeCases:
    """Boundary inputs: empty ref, unknown flag, slash-containing branch."""

    def test_empty_ref_clean_error(self, tmp_path: pathlib.Path) -> None:
        """Empty string ref must give a clear 'ref must not be empty' error."""
        repo = _make_repo(tmp_path)
        outcome = _rev(repo, "")
        assert outcome.exit_code == ExitCode.USER_ERROR
        payload = json.loads(outcome.output)
        assert "empty" in payload["error"]

    def test_unrecognized_flag_errors(self, tmp_path: pathlib.Path) -> None:
        """An unknown option makes the command exit non-zero."""
        repo, _unused_cid = _populated_repo(tmp_path)
        outcome = _rev(repo, "--no-such-flag", "main")
        assert outcome.exit_code != 0

    def test_branch_with_slash_resolves(self, tmp_path: pathlib.Path) -> None:
        """A branch name containing ``/`` resolves like any other branch."""
        branch_name = "feat/my-feature"
        repo = _make_repo(tmp_path, branch=branch_name)
        snapshot_id = _store_snap(repo)
        expected = _make_commit(repo, snapshot_id, branch=branch_name, message="feat-init")
        _set_head(repo, branch_name, expected)

        outcome = _rev(repo, "--json", branch_name)
        assert outcome.exit_code == 0
        payload = json.loads(outcome.output)
        assert payload["commit_id"] == expected
371
372
373 # ---------------------------------------------------------------------------
374 # Security
375 # ---------------------------------------------------------------------------
376
377
class TestSecurity:
    """Hostile ref strings must fail cleanly without leaking raw bytes."""

    def test_ansi_in_ref_is_json_escaped(self, tmp_path: pathlib.Path) -> None:
        """ANSI escape in ref is safely JSON-encoded, never echoed raw."""
        repo = _make_repo(tmp_path)
        hostile_ref = "\x1b[31mmalicious\x1b[0m"
        outcome = _rev(repo, hostile_ref)
        assert outcome.exit_code == ExitCode.USER_ERROR
        # Output is JSON — ANSI must be encoded as \u001b, not emitted raw
        assert "\x1b" not in outcome.output
        payload = json.loads(outcome.output)
        assert payload["error"] == "not found"

    def test_path_traversal_ref_gives_not_found(self, tmp_path: pathlib.Path) -> None:
        """A ``../`` style ref is rejected as a user error.

        Only the exit code is pinned here; the error text is not inspected.
        """
        repo = _make_repo(tmp_path)
        outcome = _rev(repo, "../../../etc/passwd")
        assert outcome.exit_code == ExitCode.USER_ERROR

    def test_null_byte_in_ref(self, tmp_path: pathlib.Path) -> None:
        """A ref containing a NUL byte is rejected as a user error."""
        repo = _make_repo(tmp_path)
        outcome = _rev(repo, "branch\x00null")
        assert outcome.exit_code == ExitCode.USER_ERROR

    def test_no_traceback_on_bad_input(self, tmp_path: pathlib.Path) -> None:
        """Bad input must not surface a Python traceback in the output."""
        repo = _make_repo(tmp_path)
        outcome = _rev(repo, "")
        captured = outcome.output
        assert "Traceback" not in captured
404
405
406 # ---------------------------------------------------------------------------
407 # JSON schema — duration_ms + exit_code on every output path
408 # ---------------------------------------------------------------------------
409
410
class TestJsonSchema:
    """Every JSON response must carry duration_ms (number ≥ 0) and exit_code (int)."""

    def _assert_schema(self, d: Mapping[str, object], expected_exit: int = 0) -> None:
        """Check the two envelope fields of a parsed JSON response.

        Args:
            d: The parsed JSON object.
            expected_exit: Value the ``exit_code`` field must equal.
        """
        assert "duration_ms" in d, f"duration_ms missing: {d}"
        duration = d["duration_ms"]
        # bool is an int subclass, so it is rejected explicitly: a JSON
        # ``true``/``false`` must not pass as a number.
        assert isinstance(duration, (int, float)) and not isinstance(duration, bool), (
            f"duration_ms is not a number: {duration!r}"
        )
        assert duration >= 0

        assert "exit_code" in d, f"exit_code missing: {d}"
        code = d["exit_code"]
        # The class contract says exit_code is an int; check the type as well
        # as the value (``False == 0`` would otherwise slip through).
        assert isinstance(code, int) and not isinstance(code, bool), (
            f"exit_code is not an int: {code!r}"
        )
        assert code == expected_exit

    def test_branch_resolution_has_schema(self, tmp_path: pathlib.Path) -> None:
        """Branch resolution output carries the envelope fields."""
        repo, _ = _populated_repo(tmp_path)
        result = _rev(repo, "--json", "main")
        assert result.exit_code == 0
        self._assert_schema(json.loads(result.output))

    def test_head_resolution_has_schema(self, tmp_path: pathlib.Path) -> None:
        """HEAD resolution output carries the envelope fields."""
        repo, _ = _populated_repo(tmp_path)
        result = _rev(repo, "--json", "HEAD")
        assert result.exit_code == 0
        self._assert_schema(json.loads(result.output))

    def test_sha_resolution_has_schema(self, tmp_path: pathlib.Path) -> None:
        """Full-ID resolution output carries the envelope fields."""
        repo, cid = _populated_repo(tmp_path)
        result = _rev(repo, "--json", cid)
        assert result.exit_code == 0
        self._assert_schema(json.loads(result.output))

    def test_abbrev_ref_has_schema(self, tmp_path: pathlib.Path) -> None:
        """``--abbrev-ref`` output carries the envelope fields."""
        repo = _make_repo(tmp_path, branch="feat/x")
        result = _rev(repo, "--abbrev-ref", "--json", "HEAD")
        assert result.exit_code == 0
        self._assert_schema(json.loads(result.output))

    def test_prefix_resolution_has_schema(self, tmp_path: pathlib.Path) -> None:
        """Abbreviated-ID resolution output carries the envelope fields."""
        repo, cid = _populated_repo(tmp_path)
        prefix = long_id(split_id(cid)[1][:8])
        result = _rev(repo, "--json", prefix)
        assert result.exit_code == 0
        self._assert_schema(json.loads(result.output))
446
447
448 # ---------------------------------------------------------------------------
449 # Error JSON — all error paths emit structured JSON to stdout
450 # ---------------------------------------------------------------------------
451
452
class TestErrorJson:
    """Every error path must emit a parseable JSON dict in ``result.output``."""

    def _assert_error(self, result: InvokeResult) -> Mapping[str, object]:
        """Assert the common error envelope and return the parsed payload.

        Requires a non-zero process exit, parseable JSON output, the keys
        ``error``, ``duration_ms`` and ``exit_code``, and a non-zero
        ``exit_code`` value.
        """
        assert result.exit_code != 0, "expected non-zero exit"
        d = json.loads(result.output)  # stdout, not stderr
        assert "error" in d
        assert "duration_ms" in d, f"duration_ms missing from error: {d}"
        assert "exit_code" in d
        assert d["exit_code"] != 0
        return d

    def test_empty_ref_emits_json_to_stdout(self, tmp_path: pathlib.Path) -> None:
        """Empty ref error must land on stdout as JSON, not stderr."""
        repo = _make_repo(tmp_path)
        payload = self._assert_error(_rev(repo, ""))
        assert "empty" in payload["error"]

    def test_not_found_emits_json_to_stdout(self, tmp_path: pathlib.Path) -> None:
        """Not-found error must land on stdout as JSON, not stderr."""
        repo = _make_repo(tmp_path)
        payload = self._assert_error(_rev(repo, "no-such-ref"))
        assert payload["error"] == "not found"

    def test_not_found_has_schema(self, tmp_path: pathlib.Path) -> None:
        """A missing branch yields the full error envelope."""
        repo = _make_repo(tmp_path)
        payload = self._assert_error(_rev(repo, "nonexistent-branch"))
        assert payload["error"] == "not found"

    def test_ambiguous_prefix_has_schema(self, tmp_path: pathlib.Path) -> None:
        """An ambiguous abbreviation yields the full error envelope."""
        # Messages picked so both commit IDs start with the same 4 hex chars;
        # the precondition is asserted below before the CLI is invoked.
        first_message = "commit-search-143"
        second_message = "commit-search-346"
        shared_prefix = "c1d3"

        repo = _make_repo(tmp_path)
        snapshot_id = _store_snap(repo)
        on_main = _make_commit(repo, snapshot_id, branch="main", message=first_message)
        on_dev = _make_commit(repo, snapshot_id, branch="dev", message=second_message)
        assert split_id(on_main)[1][:4] == split_id(on_dev)[1][:4] == shared_prefix
        _set_head(repo, "main", on_main)
        _set_head(repo, "dev", on_dev)

        payload = self._assert_error(_rev(repo, long_id(shared_prefix)))
        assert payload["error"] == "ambiguous"

    def test_head_no_commits_has_schema(self, tmp_path: pathlib.Path) -> None:
        """HEAD on a commit-less repo yields the full error envelope."""
        repo = _make_repo(tmp_path)
        payload = self._assert_error(_rev(repo, "HEAD"))
        assert "no commits" in payload["error"]

    def test_bare_hex_has_schema(self, tmp_path: pathlib.Path) -> None:
        """A bare-hex ID (no ``sha256:``) yields the full error envelope."""
        repo, cid = _populated_repo(tmp_path)
        bare_hex = split_id(cid)[1]
        payload = self._assert_error(_rev(repo, bare_hex))
        assert "sha256:" in payload["error"]

    def test_error_json_has_ref_key(self, tmp_path: pathlib.Path) -> None:
        """Every error dict must echo back the ref the caller passed."""
        repo = _make_repo(tmp_path)
        outcome = _rev(repo, "missing-branch")
        echoed_ref = json.loads(outcome.output)["ref"]
        assert echoed_ref == "missing-branch"
518
519
class TestRegisterFlags:
    """Argparse-level checks of the JSON flag registered for ``rev-parse``."""

    @staticmethod
    def _json_out(*argv: str) -> bool:
        """Parse ``rev-parse`` plus *argv* on a fresh parser; return ``json_out``."""
        import argparse
        from muse.cli.commands.rev_parse import register

        parser = argparse.ArgumentParser()
        register(parser.add_subparsers())
        namespace = parser.parse_args(["rev-parse", *argv])
        return namespace.json_out

    def test_default_json_out_is_false(self) -> None:
        """Without any flag, ``json_out`` defaults to ``False``."""
        assert self._json_out("HEAD") is False

    def test_json_flag_sets_json_out(self) -> None:
        """``--json`` turns ``json_out`` on."""
        assert self._json_out("HEAD", "--json") is True

    def test_j_shorthand_sets_json_out(self) -> None:
        """``-j`` turns ``json_out`` on."""
        assert self._json_out("HEAD", "-j") is True