gabriel / muse public
test_trust_supercharge.py python
488 lines 19.9 KB
Raw
1 """Supercharge tests for ``muse trust``.
2
3 Covers gaps identified in the review:
4 - duration_ms + exit_code in all five subcommand JSON outputs
5 - JSON errors emitted to stdout in JSON mode (IO errors, bad state)
6 - count field in list / hub-list JSON (agent convenience)
7 - OSError handling (permission denied writing config)
8 - Data integrity: path normalisation, idempotent add
9 - Performance: duration_ms is a float and within bounds
10 - Security: relative paths always normalised to absolute
11 - Concurrency: isolated concurrent trust add / list
12
13 Unit
14 ----
15 U1-U10 duration_ms and exit_code present in every JSON success path
16
17 Error JSON to stdout
18 ---------------------
19 E1 IO error in add → {"error":..., "message":..., "duration_ms":..., "exit_code":3} on stdout
20 E2 IO error in remove → same pattern
21
22 Error JSON schema
23 -----------------
24 S1 IO-error response has "error", "message", "duration_ms", "exit_code"
25
26 Schema completeness
27 -------------------
28 SC1 trust list --json has "count" == len(trusted_dirs)
29 SC2 trust hub-list --json has "count" == len(fingerprints)
30
31 Data integrity
32 --------------
33 D1 Relative path is expanded to absolute in add JSON output
34 D2 trust add is idempotent: second add still returns added=true, no duplicate
35 D3 trust remove on absent path: removed=false, exit_code=0
36 D4 trust hub-reset on absent hostname: reset=false, exit_code=0
37 D5 duration_ms reflects real elapsed (≥ 0, not null)
38
39 Performance
40 -----------
41 P1 duration_ms is a float (not int)
42 P2 duration_ms is under 5000 ms for all five subcommands
43
44 Security
45 --------
46 Sec1 Relative path normalised: args.path="." → abs path in output
47 Sec2 Path with .. collapses: /tmp/x/../y → /tmp/y
48 Sec3 trust add with ANSI path still writes abs path (sanitise_display safe)
49
50 Concurrency
51 -----------
52 C1 8 parallel trust add on isolated config files do not corrupt each other
53 C2 8 parallel TOML reads of isolated config files return consistent results
54 """
55
56 from __future__ import annotations
57
58 import json
59 import os
60 import pathlib
61 import threading
62 import time
63 import unittest.mock
64
65 import pytest
66
67 from tests.cli_test_helper import CliRunner
68
69 runner = CliRunner()
70 _CHDIR_LOCK = threading.Lock()
71
72
73 # ---------------------------------------------------------------------------
74 # Shared helper: same isolation pattern as test_cmd_trust_json.py
75 # ---------------------------------------------------------------------------
76
def _invoke_home(args: list[str], muse_home: pathlib.Path) -> tuple[int, str, str]:
    """Invoke the CLI with both trust-related config files living under *muse_home*.

    ``muse.cli.config._GLOBAL_CONFIG_FILE`` and
    ``muse.core.hub_trust._HUB_TRUST_FILE`` are patched only for the duration
    of the single invocation.

    Args:
        args: Command-line arguments handed to the runner.
        muse_home: Directory that holds ``config.toml`` and ``hub_trust.toml``.

    Returns:
        ``(exit_code, stdout, stderr)`` taken from the runner's result.
    """
    import muse.cli.config as cfg_mod
    import muse.core.hub_trust as ht_mod

    redirect_config = unittest.mock.patch.object(
        cfg_mod, "_GLOBAL_CONFIG_FILE", muse_home / "config.toml"
    )
    redirect_hub_trust = unittest.mock.patch.object(
        ht_mod, "_HUB_TRUST_FILE", muse_home / "hub_trust.toml"
    )
    with redirect_config, redirect_hub_trust:
        outcome = runner.invoke(None, args, env={})
    return outcome.exit_code, outcome.stdout, outcome.stderr
91
92
def _add(home: pathlib.Path, path: str) -> None:
    """Trust *path* in the config rooted at *home*, as setup for a test.

    Args:
        home: Directory holding the redirected config files.
        path: Directory path to pass to ``trust add``.

    Raises:
        AssertionError: If the ``trust add`` invocation exits non-zero, so a
            broken fixture fails here rather than as a confusing assertion
            later in the calling test.
    """
    rc, _out, err = _invoke_home(["trust", "add", path], home)
    assert rc == 0, f"setup 'trust add {path}' exited with {rc}: {err}"
95
96
97 # ---------------------------------------------------------------------------
98 # U1-U10 — duration_ms + exit_code in every JSON success path
99 # ---------------------------------------------------------------------------
100
class TestElapsedMsExitCode:
    """U1-U10: each JSON success payload carries ``duration_ms`` and ``exit_code``."""

    def test_u1_add_has_duration_ms(self, tmp_path: pathlib.Path) -> None:
        """U1 — ``trust add --json`` reports duration_ms."""
        status, stdout, _ = _invoke_home(["trust", "add", "/some/path", "--json"], tmp_path)
        assert status == 0
        payload = json.loads(stdout)
        assert "duration_ms" in payload, f"Missing duration_ms in: {payload}"

    def test_u2_add_has_exit_code(self, tmp_path: pathlib.Path) -> None:
        """U2 — ``trust add --json`` reports exit_code == 0."""
        _, stdout, _ = _invoke_home(["trust", "add", "/some/path", "--json"], tmp_path)
        assert json.loads(stdout)["exit_code"] == 0

    def test_u3_remove_has_duration_ms(self, tmp_path: pathlib.Path) -> None:
        """U3 — ``trust remove --json`` reports duration_ms."""
        _add(tmp_path, "/some/path")
        status, stdout, _ = _invoke_home(["trust", "remove", "/some/path", "--json"], tmp_path)
        assert status == 0
        assert "duration_ms" in json.loads(stdout)

    def test_u4_remove_has_exit_code(self, tmp_path: pathlib.Path) -> None:
        """U4 — ``trust remove --json`` reports exit_code == 0."""
        _add(tmp_path, "/some/path")
        _, stdout, _ = _invoke_home(["trust", "remove", "/some/path", "--json"], tmp_path)
        assert json.loads(stdout)["exit_code"] == 0

    def test_u5_list_has_duration_ms(self, tmp_path: pathlib.Path) -> None:
        """U5 — ``trust list --json`` reports duration_ms."""
        status, stdout, _ = _invoke_home(["trust", "list", "--json"], tmp_path)
        assert status == 0
        assert "duration_ms" in json.loads(stdout)

    def test_u6_list_has_exit_code(self, tmp_path: pathlib.Path) -> None:
        """U6 — ``trust list --json`` reports exit_code == 0."""
        _, stdout, _ = _invoke_home(["trust", "list", "--json"], tmp_path)
        assert json.loads(stdout)["exit_code"] == 0

    def test_u7_hub_list_has_duration_ms(self, tmp_path: pathlib.Path) -> None:
        """U7 — ``trust hub-list --json`` reports duration_ms."""
        status, stdout, _ = _invoke_home(["trust", "hub-list", "--json"], tmp_path)
        assert status == 0
        assert "duration_ms" in json.loads(stdout)

    def test_u8_hub_list_has_exit_code(self, tmp_path: pathlib.Path) -> None:
        """U8 — ``trust hub-list --json`` reports exit_code == 0."""
        _, stdout, _ = _invoke_home(["trust", "hub-list", "--json"], tmp_path)
        assert json.loads(stdout)["exit_code"] == 0

    def test_u9_hub_reset_has_duration_ms(self, tmp_path: pathlib.Path) -> None:
        """U9 — ``trust hub-reset --json`` reports duration_ms."""
        status, stdout, _ = _invoke_home(["trust", "hub-reset", "h.test", "--json"], tmp_path)
        assert status == 0
        assert "duration_ms" in json.loads(stdout)

    def test_u10_hub_reset_has_exit_code(self, tmp_path: pathlib.Path) -> None:
        """U10 — ``trust hub-reset --json`` reports exit_code == 0."""
        _, stdout, _ = _invoke_home(["trust", "hub-reset", "h.test", "--json"], tmp_path)
        assert json.loads(stdout)["exit_code"] == 0
168
169
170 # ---------------------------------------------------------------------------
171 # E1-E2 — IO errors → JSON to stdout
172 # ---------------------------------------------------------------------------
173
class TestIoErrorJsonToStdout:
    """E1-E2: with ``--json``, an ``OSError`` must show up as JSON on stdout."""

    @staticmethod
    def _run_with_oserror(home: pathlib.Path, failing_attr: str, cli_args: list[str]):
        """Invoke the CLI while ``muse.cli.config.<failing_attr>`` raises ``OSError``.

        Both config-file locations are redirected under *home* for the call.
        Returns the runner's result object.
        """
        import muse.cli.config as cfg_mod
        import muse.core.hub_trust as ht_mod

        redirect_config = unittest.mock.patch.object(
            cfg_mod, "_GLOBAL_CONFIG_FILE", home / "config.toml"
        )
        redirect_hub_trust = unittest.mock.patch.object(
            ht_mod, "_HUB_TRUST_FILE", home / "hub_trust.toml"
        )
        inject_failure = unittest.mock.patch.object(
            cfg_mod, failing_attr, side_effect=OSError("permission denied")
        )
        with redirect_config, redirect_hub_trust, inject_failure:
            return runner.invoke(None, cli_args, env={})

    def test_e1_add_io_error_json_on_stdout(self, tmp_path: pathlib.Path) -> None:
        """E1 — OSError in trust add --json → JSON error on stdout."""
        outcome = self._run_with_oserror(
            tmp_path, "add_global_safe_dir", ["trust", "add", "/some/path", "--json"]
        )

        assert outcome.exit_code != 0
        # stdout has to parse as a JSON error document
        payload = json.loads(outcome.stdout)
        assert "error" in payload

    def test_e2_remove_io_error_json_on_stdout(self, tmp_path: pathlib.Path) -> None:
        """E2 — OSError in trust remove --json → JSON error on stdout."""
        outcome = self._run_with_oserror(
            tmp_path, "get_global_safe_dirs", ["trust", "remove", "/some/path", "--json"]
        )

        assert outcome.exit_code != 0
        payload = json.loads(outcome.stdout)
        assert "error" in payload
219
220
221 # ---------------------------------------------------------------------------
222 # S1 — error JSON schema
223 # ---------------------------------------------------------------------------
224
class TestErrorJsonSchema:
    """S1: shape of the JSON document emitted for an IO error."""

    # Keys every IO-error payload is expected to contain.
    _REQUIRED = {"error", "message", "duration_ms", "exit_code"}

    def test_s1_io_error_schema_complete(self, tmp_path: pathlib.Path) -> None:
        """S1 — the IO-error JSON carries every required key and a non-zero exit_code."""
        import muse.cli.config as cfg_mod
        import muse.core.hub_trust as ht_mod

        redirect_config = unittest.mock.patch.object(
            cfg_mod, "_GLOBAL_CONFIG_FILE", tmp_path / "config.toml"
        )
        redirect_hub_trust = unittest.mock.patch.object(
            ht_mod, "_HUB_TRUST_FILE", tmp_path / "hub_trust.toml"
        )
        inject_failure = unittest.mock.patch.object(
            cfg_mod, "add_global_safe_dir", side_effect=OSError("disk full")
        )
        with redirect_config, redirect_hub_trust, inject_failure:
            outcome = runner.invoke(None, ["trust", "add", "/some/path", "--json"], env={})

        payload = json.loads(outcome.stdout)
        absent = self._REQUIRED - payload.keys()
        assert not absent, f"IO error JSON missing keys: {absent}"
        assert payload["exit_code"] != 0
250
251
252 # ---------------------------------------------------------------------------
253 # SC1-SC2 — count field in list / hub-list
254 # ---------------------------------------------------------------------------
255
class TestSchemaCompleteness:
    """SC1-SC2: the ``count`` convenience field in list / hub-list JSON."""

    def test_sc1_list_has_count(self, tmp_path: pathlib.Path) -> None:
        """SC1 — ``trust list --json`` exposes count matching trusted_dirs."""
        for trusted in ("/p/a", "/p/b"):
            _add(tmp_path, trusted)
        _, stdout, _ = _invoke_home(["trust", "list", "--json"], tmp_path)
        payload = json.loads(stdout)
        assert "count" in payload, f"Missing 'count' in: {payload}"
        assert payload["count"] == len(payload["trusted_dirs"])

    def test_sc1_list_count_zero_when_empty(self, tmp_path: pathlib.Path) -> None:
        """SC1b — ``trust list --json`` reports count == 0 with nothing trusted."""
        _, stdout, _ = _invoke_home(["trust", "list", "--json"], tmp_path)
        assert json.loads(stdout)["count"] == 0

    def test_sc2_hub_list_has_count(self, tmp_path: pathlib.Path) -> None:
        """SC2 — ``trust hub-list --json`` exposes count matching fingerprints."""
        _, stdout, _ = _invoke_home(["trust", "hub-list", "--json"], tmp_path)
        payload = json.loads(stdout)
        assert "count" in payload, f"Missing 'count' in: {payload}"
        assert payload["count"] == len(payload["fingerprints"])

    def test_sc2_hub_list_count_zero_when_empty(self, tmp_path: pathlib.Path) -> None:
        """SC2b — ``trust hub-list --json`` reports count == 0 with no fingerprints."""
        _, stdout, _ = _invoke_home(["trust", "hub-list", "--json"], tmp_path)
        assert json.loads(stdout)["count"] == 0
284
285
286 # ---------------------------------------------------------------------------
287 # D1-D5 — Data integrity
288 # ---------------------------------------------------------------------------
289
class TestDataIntegrity:
    """D1-D5: path normalisation, idempotence and no-op semantics."""

    def test_d1_relative_path_expanded_in_json(self, tmp_path: pathlib.Path) -> None:
        """D1 — a relative path given to add comes back absolute in the JSON."""
        _, stdout, _ = _invoke_home(["trust", "add", ".", "--json"], tmp_path)
        payload = json.loads(stdout)
        assert os.path.isabs(payload["path"]), f"path not absolute: {payload['path']}"

    def test_d2_add_idempotent(self, tmp_path: pathlib.Path) -> None:
        """D2 — trusting the same path twice leaves a single entry."""
        for _ in range(2):
            _add(tmp_path, "/p/x")
        _, stdout, _ = _invoke_home(["trust", "list", "--json"], tmp_path)
        copies = json.loads(stdout)["trusted_dirs"].count("/p/x")
        assert copies == 1, f"Expected 1 copy, got {copies}"

    def test_d3_remove_absent_is_exit_zero(self, tmp_path: pathlib.Path) -> None:
        """D3 — removing an untrusted path: removed=false, exit_code=0."""
        command = ["trust", "remove", "/not/trusted", "--json"]
        status, stdout, _ = _invoke_home(command, tmp_path)
        assert status == 0
        payload = json.loads(stdout)
        assert payload["removed"] is False
        assert payload["exit_code"] == 0

    def test_d4_hub_reset_absent_is_exit_zero(self, tmp_path: pathlib.Path) -> None:
        """D4 — hub-reset for an unknown hostname: reset=false, exit_code=0."""
        command = ["trust", "hub-reset", "neverheard.example", "--json"]
        status, stdout, _ = _invoke_home(command, tmp_path)
        assert status == 0
        payload = json.loads(stdout)
        assert payload["reset"] is False
        assert payload["exit_code"] == 0

    def test_d5_duration_ms_non_negative(self, tmp_path: pathlib.Path) -> None:
        """D5 — duration_ms is never negative, for any of the five subcommands."""
        commands = [
            ["trust", "add", "/p", "--json"],
            ["trust", "list", "--json"],
            ["trust", "remove", "/p", "--json"],
            ["trust", "hub-list", "--json"],
            ["trust", "hub-reset", "h.test", "--json"],
        ]
        for args in commands:
            _, stdout, _ = _invoke_home(args, tmp_path)
            elapsed = json.loads(stdout)["duration_ms"]
            assert elapsed >= 0.0, f"{args}: duration_ms < 0"
338
339
340 # ---------------------------------------------------------------------------
341 # P1-P2 — Performance
342 # ---------------------------------------------------------------------------
343
class TestPerformance:
    """P1-P2: type and upper bound of the reported ``duration_ms``."""

    def test_p1_duration_ms_is_float(self, tmp_path: pathlib.Path) -> None:
        """P1 — duration_ms is decoded as a float (not int, not None)."""
        _, stdout, _ = _invoke_home(["trust", "add", "/p", "--json"], tmp_path)
        payload = json.loads(stdout)
        kind = type(payload["duration_ms"]).__name__
        assert isinstance(payload["duration_ms"], float), \
            f"duration_ms is {kind}, expected float"

    def test_p2_all_subcommands_under_5000ms(self, tmp_path: pathlib.Path) -> None:
        """P2 — each of the five subcommands reports duration_ms < 5000."""
        commands = [
            ["trust", "add", "/p", "--json"],
            ["trust", "list", "--json"],
            ["trust", "remove", "/p", "--json"],
            ["trust", "hub-list", "--json"],
            ["trust", "hub-reset", "h.test", "--json"],
        ]
        for args in commands:
            _, stdout, _ = _invoke_home(args, tmp_path)
            payload = json.loads(stdout)
            assert payload["duration_ms"] < 5000.0, \
                f"{args}: duration_ms={payload['duration_ms']} ≥ 5000"
366
367
368 # ---------------------------------------------------------------------------
369 # Sec1-Sec3 — Security
370 # ---------------------------------------------------------------------------
371
class TestSecurity:
    """Sec1-Sec3: paths are normalised and hostile characters cannot break the JSON."""

    def test_sec1_relative_path_becomes_absolute(self, tmp_path: pathlib.Path) -> None:
        """Sec1 — '.' is resolved to an absolute path before storing."""
        _, out, _ = _invoke_home(["trust", "add", ".", "--json"], tmp_path)
        data = json.loads(out)
        assert data["path"] == os.path.abspath(".")
        assert data["path"].startswith("/")

    def test_sec2_dotdot_collapsed(self, tmp_path: pathlib.Path) -> None:
        """Sec2 — /tmp/x/../y is collapsed to /tmp/y in output."""
        _, out, _ = _invoke_home(["trust", "add", "/tmp/x/../y", "--json"], tmp_path)
        data = json.loads(out)
        assert data["path"] == "/tmp/y", f"Expected /tmp/y, got {data['path']}"

    def test_sec3_ansi_in_path_stored_as_literal(self, tmp_path: pathlib.Path) -> None:
        """Sec3 — a path carrying ANSI escapes cannot corrupt the JSON output.

        The plain-path check from the earlier revision is kept. It is followed
        by a path that really contains ESC sequences, which the earlier
        revision never supplied despite the test's name.
        """
        # Baseline: an ordinary absolute path round-trips unchanged.
        _, out, _ = _invoke_home(["trust", "add", "/valid/path", "--json"], tmp_path)
        assert json.loads(out)["path"] == "/valid/path"

        # Actual ANSI case: a red-colour escape embedded in the last component.
        ansi_path = "/valid/\x1b[31mpath\x1b[0m"
        _, out, _ = _invoke_home(["trust", "add", ansi_path, "--json"], tmp_path)
        # json.loads is strict by default and rejects raw control characters
        # inside strings, so a successful parse means ESC was escaped or removed.
        data = json.loads(out)
        assert "\x1b" not in out, "raw ESC byte leaked into JSON stdout"
        # NOTE(review): whether the escape is kept literally or stripped by the
        # CLI is not visible from this file, so only absoluteness is asserted.
        assert os.path.isabs(data["path"]), f"path not absolute: {data['path']!r}"
394
395
396 # ---------------------------------------------------------------------------
397 # C1-C2 — Concurrency
398 # ---------------------------------------------------------------------------
399
class TestConcurrency:
    """C1-C2: behaviour when several threads work on separate config files."""

    def test_c1_parallel_add_isolated_configs(self, tmp_path: pathlib.Path) -> None:
        """C1 — 8 threads each add 5 paths to their own config without cross-talk.

        ``mock.patch.object`` swaps a module-global attribute, so two threads
        patching at the same time would see each other's paths. Interleaved
        exits could also restore the wrong "original" and leave a temporary
        path installed after the test. Each worker therefore holds
        ``patch_lock`` for its whole patched section, which makes the workers
        run one after another. Afterwards every worker checks that its config
        lists exactly the five paths it added.
        """
        import muse.cli.config as cfg_mod
        import muse.core.hub_trust as ht_mod

        errors: list[str] = []
        errors_lock = threading.Lock()
        patch_lock = threading.Lock()

        def _record(message: str) -> None:
            # Collect failures for the main thread; asserting in a worker
            # thread would not fail the test.
            with errors_lock:
                errors.append(message)

        def _worker(idx: int) -> None:
            expected = [f"/path/{idx}/{i}" for i in range(5)]
            try:
                home = tmp_path / f"home_{idx}"
                home.mkdir()
                with (
                    patch_lock,
                    unittest.mock.patch.object(
                        cfg_mod, "_GLOBAL_CONFIG_FILE", home / "config.toml"
                    ),
                    unittest.mock.patch.object(
                        ht_mod, "_HUB_TRUST_FILE", home / "hub_trust.toml"
                    ),
                ):
                    for i, path in enumerate(expected):
                        result = runner.invoke(
                            None, ["trust", "add", path, "--json"], env={}
                        )
                        data = json.loads(result.stdout)
                        if not data.get("added"):
                            _record(f"worker {idx} add {i}: {data}")

                    # The patches are still active here, so the listing
                    # reflects this worker's own config file.
                    listed = runner.invoke(None, ["trust", "list", "--json"], env={})
                    trusted = json.loads(listed.stdout).get("trusted_dirs", [])

                missing = [p for p in expected if p not in trusted]
                if missing or len(trusted) != len(expected):
                    _record(
                        f"worker {idx} list mismatch: "
                        f"expected {expected}, got {trusted}"
                    )
            except Exception as exc:  # reported via `errors`, asserted below
                _record(f"worker {idx}: {exc}")

        threads = [threading.Thread(target=_worker, args=(i,)) for i in range(8)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()
        assert not errors, f"Concurrent errors: {errors}"

    def test_c2_parallel_list_reads_consistent(self, tmp_path: pathlib.Path) -> None:
        """C2 — 8 parallel TOML reads on isolated files are consistent.

        ``mock.patch.object`` on a shared module attribute is not thread-safe
        when all 8 threads patch simultaneously. Instead each reader calls
        ``tomllib.load()`` directly on its own pre-written config file, so the
        file-parsing layer runs under concurrent load without any shared
        mutable state.
        NOTE(review): the earlier docstring says ``get_global_safe_dirs()``
        uses this same code path; that function is not visible in this file.
        """
        import tomllib

        # Write 8 isolated config files serially before spawning threads.
        config_files: list[pathlib.Path] = []
        for idx in range(8):
            cfg = tmp_path / f"cfg_{idx}.toml"
            cfg.write_text(
                '[security]\nsafe_dirs = ["/p/a", "/p/b", "/p/c"]\n',
                encoding="utf-8",
            )
            config_files.append(cfg)

        errors: list[str] = []
        r_lock = threading.Lock()

        def _reader(idx: int) -> None:
            try:
                with config_files[idx].open("rb") as fh:
                    raw = tomllib.load(fh)
                dirs = raw.get("security", {}).get("safe_dirs", [])
                count = len(dirs)
                if count != 3:
                    with r_lock:
                        errors.append(f"reader {idx}: expected 3, got {count}: {dirs}")
            except Exception as exc:  # reported via `errors`, asserted below
                with r_lock:
                    errors.append(f"reader {idx}: {exc}")

        threads = [threading.Thread(target=_reader, args=(i,)) for i in range(8)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()
        assert not errors, f"Reader errors: {errors}"
File History 1 commit