"""Muse Wire Protocol — end-to-end localhost integration tests. Requires ``https://localhost:1337`` to be running with gabriel's identity registered. All tests are auto-skipped when the hub is not reachable. The local hub uses a self-signed TLS cert (deploy/local-tls/) so all urllib calls use an unverified SSL context and all httpx calls pass verify=False. Coverage -------- T1 Hub health + auth — whoami round-trip T2 Repo lifecycle — create, list, delete hub repo T3 Push (cold) — initial push of local commits to a fresh hub repo T4 Clone — clone a pushed repo, verify snapshot equality T5 Incremental push — push new commits, only delta transferred T6 Pull — push from location A, pull from B, verify merge result T7 Fetch — fetch from remote, objects arrive, local HEAD unchanged T8 Force push — divergent history accepted with --force T9 Cross-repo — multi-file repo (contracts-style) full push/clone cycle T10 Idempotent re-push — re-push same commits, 0 new objects stored """ from __future__ import annotations import datetime import itertools import json import pathlib import time import urllib.error import urllib.request from collections.abc import Mapping from typing import TYPE_CHECKING, TypedDict if TYPE_CHECKING: from muse.core.transport import SigningIdentity import ssl import pytest # Unverified SSL context for the self-signed localhost cert. 
_SSL_NOVERIFY = ssl.create_default_context()
_SSL_NOVERIFY.check_hostname = False
_SSL_NOVERIFY.verify_mode = ssl.CERT_NONE


class _HubRepo(TypedDict):
    """Handle describing a throwaway hub repo used by a single test."""

    repo_id: str  # hub-assigned id (``repoId`` in the create-repo response)
    slug: str  # generated repo name
    url: str  # f"{HUB}/{OWNER}/{slug}"


from muse._version import __version__
from muse.cli.config import get_signing_identity
from muse.core.msign import build_msign_header
from muse.core.object_store import write_object
from muse.core.ids import hash_commit as compute_commit_id, hash_snapshot as compute_snapshot_id
from muse.core.refs import get_head_commit_id
from muse.core.commits import (
    CommitRecord,
    read_commit,
    write_commit,
)
from muse.core.snapshots import (
    SnapshotRecord,
    read_snapshot,
    write_snapshot,
)
from tests.cli_test_helper import CliRunner
from muse.core.types import blob_id, content_hash
from muse.core.paths import heads_dir, muse_dir, ref_path

# Seed the counter from the wall clock so ids (and therefore hub repo slugs)
# differ between test runs.
# NOTE(review): assumes content_hash() is a pure function of its argument; a
# counter starting at 0 would then reproduce the same slugs on every run and
# collide with any repo a previous, interrupted run left behind — confirm.
_id_seq = itertools.count(time.time_ns())


def _new_id() -> str:
    """Return a fresh id: the content hash of the next sequence number."""
    return content_hash({"seq": next(_id_seq)})


# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------

HUB = "https://localhost:1337"
OWNER = "gabriel"

runner = CliRunner()


# ---------------------------------------------------------------------------
# Hub availability guard — skip entire module if hub not reachable
# ---------------------------------------------------------------------------

def _hub_reachable() -> bool:
    """Return True if ``GET {HUB}/healthz`` answers with a success status."""
    try:
        # ``with`` closes the response; a bare urlopen() would leak the socket.
        with urllib.request.urlopen(
            f"{HUB}/healthz", timeout=2, context=_SSL_NOVERIFY
        ):
            return True
    except (urllib.error.URLError, OSError):
        return False


def _identity_registered() -> bool:
    """Return True only if hub is reachable AND gabriel's identity is registered.

    Sends a signed GET to ``/api/identities/{handle}``; any URL/HTTP/OS error
    is treated as "not registered".
    """
    if not _hub_reachable():
        return False
    signing = get_signing_identity(remote_url=HUB)
    if signing is None:
        return False
    url = f"{HUB}/api/identities/{signing.handle}"
    auth = build_msign_header(signing, "GET", url, None)
    req = urllib.request.Request(
        url, headers={"Authorization": auth, "Accept": "application/json"}
    )
    try:
        with urllib.request.urlopen(req, timeout=5, context=_SSL_NOVERIFY):
            return True
    except (urllib.error.URLError, urllib.error.HTTPError, OSError):
        return False


# Evaluated once at import/collection time: performs the network probe above.
pytestmark = pytest.mark.skipif(
    not _identity_registered(),
    reason="localhost hub not reachable or identity not registered — run: muse auth register",
)


# ---------------------------------------------------------------------------
# Auth helper
# ---------------------------------------------------------------------------

def _signing() -> "SigningIdentity":
    """Return gabriel's signing identity for localhost.

    Skips the calling test when no identity is configured for ``HUB``.
    """
    signing = get_signing_identity(remote_url=HUB)
    if signing is None:
        pytest.skip("No signing identity for localhost hub")
    return signing


def _hub_request(method: str, path: str, body: Mapping[str, object] | None = None) -> Mapping[str, object]:
    """Make a signed API request to the local hub. Returns parsed JSON.

    Args:
        method: HTTP method, e.g. ``"GET"``, ``"POST"``, ``"DELETE"``.
        path: Path appended to ``HUB`` (must start with ``/``).
        body: Optional JSON-serialisable payload; sent as ``application/json``.

    Returns:
        The decoded JSON response, or ``{}`` when the response body is blank.

    On an HTTP error the current test is skipped (401/403, or a 404 whose body
    contains "identity not found") or failed (anything else) via pytest.
    """
    signing = _signing()
    url = f"{HUB}{path}"
    data: bytes | None = None
    if body is not None:
        data = json.dumps(body).encode()
    auth = build_msign_header(signing, method, url, data)
    headers: dict[str, str] = {
        "Authorization": auth,
        "Accept": "application/json",
    }
    if data is not None:
        headers["Content-Type"] = "application/json"
    req = urllib.request.Request(url, data=data, headers=headers, method=method)
    try:
        with urllib.request.urlopen(req, timeout=15, context=_SSL_NOVERIFY) as resp:
            # Separate name: do not rebind the ``body`` parameter to bytes.
            payload = resp.read()
            return json.loads(payload) if payload.strip() else {}
    except urllib.error.HTTPError as exc:
        raw = exc.read().decode(errors="replace")
        if exc.code in (401, 403) or (exc.code == 404 and "identity not found" in raw):
            pytest.skip("Identity not registered on localhost hub — run: muse auth register")
        pytest.fail(f"Hub request failed: {method} {path} → {exc.code}: {raw}")


# ---------------------------------------------------------------------------
# Hub repo lifecycle fixture
# ---------------------------------------------------------------------------

if TYPE_CHECKING:
    # Annotation-only import; the module uses postponed annotations.
    from collections.abc import Iterator


@pytest.fixture
def hub_repo() -> Iterator[_HubRepo]:
    """Create a private test hub repo; delete it after the test.

    Teardown reads ``repo_id`` back from the yielded mapping instead of a
    local captured before the ``yield``, so a test that recreates the repo and
    stores the new id in the mapping gets the replacement deleted as well.
    """
    slug = f"test-wire-{_new_id()[7:15]}"
    resp = _hub_request("POST", "/api/repos", {
        "name": slug,
        "owner": OWNER,
        "visibility": "private",
        "domain": "code",
    })
    repo: _HubRepo = {
        "repo_id": str(resp["repoId"]),
        "slug": slug,
        "url": f"{HUB}/{OWNER}/{slug}",
    }
    yield repo
    # Cleanup — tolerate 404 if test already deleted it.  _hub_request reports
    # HTTP errors through pytest.fail/pytest.skip, whose exceptions do not
    # derive from Exception, so they are named explicitly.  KeyboardInterrupt
    # and SystemExit are deliberately NOT swallowed.
    try:
        _hub_request("DELETE", f"/api/repos/{repo['repo_id']}")
    except (Exception, pytest.fail.Exception, pytest.skip.Exception):
        pass


# ---------------------------------------------------------------------------
# Local repo builder
# ---------------------------------------------------------------------------

def _init_local_repo(
    root: pathlib.Path,
    hub_slug: str,
    *,
    branch: str = "main",
    n_commits: int = 1,
    file_tree: dict[str, bytes] | None = None,
) -> list[str]:
    """Initialise a .muse/ repo with N commits; return commit IDs oldest-first.

    ``file_tree`` maps path → content for the first commit.  Subsequent
    commits each add/update a single generated file.  A remote named
    ``local`` pointing at ``{HUB}/{OWNER}/{hub_slug}`` is written to
    ``config.toml``.  At least one commit is always created, even when
    ``n_commits`` is less than 1.
    """
    dot_muse = muse_dir(root)
    for sub in ("refs/heads", "objects", "commits", "snapshots"):
        (dot_muse / sub).mkdir(parents=True, exist_ok=True)
    (dot_muse / "repo.json").write_text(json.dumps({
        "repo_id": f"test-{hub_slug}",
        "schema_version": __version__,
        "domain": "code",
    }))
    (dot_muse / "HEAD").write_text(f"ref: refs/heads/{branch}\n")
    (dot_muse / "config.toml").write_text(
        f'[remotes.local]\nurl = "{HUB}/{OWNER}/{hub_slug}"\n'
    )

    if file_tree is None:
        file_tree = {f"file_{_new_id()[7:13]}.txt": b"initial content"}

    commit_ids: list[str] = []
    manifest: dict[str, str] = {}

    # First commit — full file tree
    for path, content in file_tree.items():
        oid = blob_id(content)
        write_object(root, oid, content)
        manifest[path] = oid

    snap_id = compute_snapshot_id(manifest)
    write_snapshot(root, SnapshotRecord(snapshot_id=snap_id, manifest=dict(manifest)))
    now = datetime.datetime.now(tz=datetime.timezone.utc)
    cid = compute_commit_id(
        parent_ids=[],
        snapshot_id=snap_id,
        message="initial commit",
        committed_at_iso=now.isoformat(),
        author=OWNER,
    )
    write_commit(root, CommitRecord(
        commit_id=cid,
        branch=branch,
        snapshot_id=snap_id,
        message="initial commit",
        committed_at=now,
        author=OWNER,
    ))
    commit_ids.append(cid)
    parent_cid = cid

    # Additional commits — linear history, each with exactly one parent
    for i in range(1, n_commits):
        extra = f"extra_{i}_{_new_id()[7:11]}.txt"
        content = f"commit {i} content".encode()
        oid = blob_id(content)
        write_object(root, oid, content)
        manifest = dict(manifest)
        manifest[extra] = oid
        snap_id = compute_snapshot_id(manifest)
        write_snapshot(root, SnapshotRecord(snapshot_id=snap_id, manifest=dict(manifest)))
        now = datetime.datetime.now(tz=datetime.timezone.utc)
        cid = compute_commit_id(
            parent_ids=[parent_cid],
            snapshot_id=snap_id,
            message=f"commit {i}",
            committed_at_iso=now.isoformat(),
            author=OWNER,
        )
        write_commit(root, CommitRecord(
            commit_id=cid,
            branch=branch,
            snapshot_id=snap_id,
            message=f"commit {i}",
            committed_at=now,
            author=OWNER,
            parent_commit_id=parent_cid,
            parent2_commit_id=None,
        ))
        commit_ids.append(cid)
        parent_cid = cid

    (dot_muse / "refs" / "heads" / branch).write_text(commit_ids[-1])
    return commit_ids


def _add_commit(root: pathlib.Path, hub_slug: str, branch: str = "main") -> str:
    """Append one more commit to an existing local repo. Returns new commit ID.

    The new commit adds one generated file on top of the current head
    snapshot of ``branch`` and moves the branch ref to it.  ``hub_slug`` is
    accepted for call-site symmetry with ``_init_local_repo`` and is unused.
    """
    parent_cid = get_head_commit_id(root, branch)
    assert parent_cid, f"branch {branch!r} has no head commit in {root}"
    parent_rec = read_commit(root, parent_cid)
    assert parent_rec is not None, f"head commit {parent_cid} is not readable"
    parent_snap = read_snapshot(root, parent_rec.snapshot_id)
    manifest = dict(parent_snap.manifest) if parent_snap else {}

    extra = f"extra_{_new_id()[7:13]}.txt"
    content = f"added: {extra}".encode()
    oid = blob_id(content)
    write_object(root, oid, content)
    manifest[extra] = oid

    snap_id = compute_snapshot_id(manifest)
    write_snapshot(root, SnapshotRecord(snapshot_id=snap_id, manifest=dict(manifest)))
    now = datetime.datetime.now(tz=datetime.timezone.utc)
    cid = compute_commit_id(
        parent_ids=[parent_cid],
        snapshot_id=snap_id,
        message=f"new commit {extra}",
        committed_at_iso=now.isoformat(),
        author=OWNER,
    )
    write_commit(root, CommitRecord(
        commit_id=cid,
        branch=branch,
        snapshot_id=snap_id,
        message=f"new commit {extra}",
        committed_at=now,
        author=OWNER,
        parent_commit_id=parent_cid,
    ))
    ref_path(root, branch).write_text(cid)
    return cid


# ---------------------------------------------------------------------------
# T1 — Hub health + auth
# ---------------------------------------------------------------------------

class TestT1Auth:
    """Tier 1: hub is up, gabriel's identity round-trips."""

    def test_healthz(self) -> None:
        """``/healthz`` reports status ok and a live database."""
        with urllib.request.urlopen(
            f"{HUB}/healthz", timeout=5, context=_SSL_NOVERIFY
        ) as resp:
            data = json.loads(resp.read())
        assert data["status"] == "ok"
        assert data["db"] is True

    def test_whoami(self) -> None:
        """Signed GET to /api/identities/{handle} returns gabriel's handle."""
        data = _hub_request("GET", f"/api/identities/{OWNER}")
        assert data.get("handle") == OWNER or data.get("owner") == OWNER or OWNER in str(data)


# ---------------------------------------------------------------------------
# T2 — Repo lifecycle
# ---------------------------------------------------------------------------

class TestT2RepoLifecycle:
    """Tier 2: create, verify presence, delete a hub repo."""

    def test_create_and_list(self, hub_repo: _HubRepo) -> None:
        """The refs endpoint of a freshly created repo answers."""
        slug = hub_repo["slug"]
        data = _hub_request("GET", f"/{OWNER}/{slug}/refs")
        # NOTE(review): the trailing ``is not None`` makes this pass for any
        # non-null payload; tighten once the refs response shape is pinned.
        assert "branch_heads" in data or "branches" in data or data is not None

    def test_repo_id_is_sha256(self, hub_repo: _HubRepo) -> None:
        """The hub-assigned repo id carries the ``sha256:`` prefix."""
        repo_id = hub_repo["repo_id"]
        assert repo_id.startswith("sha256:"), (
            f"repo_id should be sha256-addressed, got: {repo_id!r}"
        )

    def test_delete_returns_no_content(self, hub_repo: _HubRepo) -> None:
        """Explicit delete — fixture cleanup would also cover this, but verify 204."""
        signing = _signing()
        url = f"{HUB}/api/repos/{hub_repo['repo_id']}"
        auth = build_msign_header(signing, "DELETE", url, None)
        req = urllib.request.Request(
            url, headers={"Authorization": auth, "Accept": "application/json"}, method="DELETE"
        )
        try:
            with urllib.request.urlopen(req, timeout=10, context=_SSL_NOVERIFY) as resp:
                assert resp.status == 204
        except urllib.error.HTTPError as exc:
            # urllib only raises HTTPError for non-2xx statuses, so reaching
            # this handler always means the delete was rejected.
            pytest.fail(f"Delete failed: {exc.code} {exc.read()}")
        # Fixture cleanup will get a 404 — that's fine, it tolerates it


# ---------------------------------------------------------------------------
# T3 — Push (cold)
# ---------------------------------------------------------------------------

class TestT3ColdPush:
    """Tier 3: push a local repo to a fresh hub repo."""

    def test_initial_push_succeeds(
        self, tmp_path: pathlib.Path, hub_repo: _HubRepo, monkeypatch: pytest.MonkeyPatch
    ) -> None:
        """A three-commit repo pushes with exit code 0 and a success marker."""
        root = tmp_path / "local"
        root.mkdir()
        _init_local_repo(root, hub_repo["slug"], n_commits=3)
        monkeypatch.chdir(root)

        result = runner.invoke(None, ["push", "local", "main"])
        assert result.exit_code == 0, f"push failed:\n{result.output}\n{result.stderr}"
        assert "Pushed" in result.output or "✅" in result.output

    def test_push_reports_commit_count(
        self, tmp_path: pathlib.Path, hub_repo: _HubRepo, monkeypatch: pytest.MonkeyPatch
    ) -> None:
        """Push output contains at least one digit (commit/object counts)."""
        root = tmp_path / "local"
        root.mkdir()
        _init_local_repo(root, hub_repo["slug"], n_commits=5)
        monkeypatch.chdir(root)

        result = runner.invoke(None, ["push", "local", "main"])
        assert result.exit_code == 0
        # Output should mention commit count
        output = result.output + (result.stderr or "")
        assert any(c.isdigit() for c in output), "Expected numeric output (commit/object counts)"

    def test_push_empty_repo_succeeds(
        self, tmp_path: pathlib.Path, hub_repo: _HubRepo, monkeypatch: pytest.MonkeyPatch
    ) -> None:
        """Even a single-commit repo pushes cleanly."""
        root = tmp_path / "local"
        root.mkdir()
        _init_local_repo(root, hub_repo["slug"], n_commits=1)
        monkeypatch.chdir(root)

        result = runner.invoke(None, ["push", "local", "main"])
        assert result.exit_code == 0


# ---------------------------------------------------------------------------
# T4 — Clone
# ---------------------------------------------------------------------------

class TestT4Clone:
    """Tier 4: clone a pushed repo and verify snapshot equality."""

    def test_clone_restores_snapshot(
        self, tmp_path: pathlib.Path, hub_repo: _HubRepo, monkeypatch: pytest.MonkeyPatch
    ) -> None:
        """The clone's ``main`` head equals the last commit that was pushed."""
        # Push from location A
        src = tmp_path / "source"
        src.mkdir()
        commit_ids = _init_local_repo(
            src, hub_repo["slug"], n_commits=2,
            file_tree={
                "README.md": b"# Test repo",
                "src/main.py": b"print('hello')",
            }
        )
        monkeypatch.chdir(src)
        push_result = runner.invoke(None, ["push", "local", "main"])
        assert push_result.exit_code == 0, push_result.output

        # Clone to location B
        dst = tmp_path / "clone"
        result = runner.invoke(None, ["clone", hub_repo["url"], str(dst)])
        assert result.exit_code == 0, f"clone failed:\n{result.output}\n{result.stderr}"
        assert dst.exists(), "Clone directory not created"

        # Verify HEAD commit matches
        cloned_head = get_head_commit_id(dst, "main")
        assert cloned_head == commit_ids[-1], (
            f"Cloned HEAD {cloned_head} != expected {commit_ids[-1]}"
        )

    def test_clone_restores_file_objects(
        self, tmp_path: pathlib.Path, hub_repo: _HubRepo, monkeypatch: pytest.MonkeyPatch
    ) -> None:
        """A blob pushed from the source is readable, byte-for-byte, in the clone."""
        file_content = b"unique content for object verification wire-test"
        oid = blob_id(file_content)

        src = tmp_path / "source"
        src.mkdir()
        _init_local_repo(src, hub_repo["slug"], file_tree={"data.bin": file_content})
        monkeypatch.chdir(src)
        push_result = runner.invoke(None, ["push", "local", "main"])
        assert push_result.exit_code == 0, push_result.output

        dst = tmp_path / "clone"
        result = runner.invoke(None, ["clone", hub_repo["url"], str(dst)])
        assert result.exit_code == 0

        # Verify object content was transferred
        from muse.core.object_store import read_object
        cloned_obj = read_object(dst, oid)
        assert cloned_obj == file_content


# ---------------------------------------------------------------------------
# T5 — Incremental push
# ---------------------------------------------------------------------------

class TestT5IncrementalPush:
    """Tier 5: second push transfers only new objects."""

    def test_incremental_push_succeeds(
        self, tmp_path: pathlib.Path, hub_repo: _HubRepo, monkeypatch: pytest.MonkeyPatch
    ) -> None:
        """A second push after adding one commit succeeds."""
        root = tmp_path / "local"
        root.mkdir()
        _init_local_repo(root, hub_repo["slug"], n_commits=2)
        monkeypatch.chdir(root)

        # First push
        r1 = runner.invoke(None, ["push", "local", "main"])
        assert r1.exit_code == 0, r1.output

        # Add a commit
        _add_commit(root, hub_repo["slug"])

        # Second push — should succeed with fewer objects
        r2 = runner.invoke(None, ["push", "local", "main"])
        assert r2.exit_code == 0, r2.output
        output = r2.output + (r2.stderr or "")
        assert "Pushed" in output or "✅" in output

    def test_incremental_push_head_advances(
        self, tmp_path: pathlib.Path, hub_repo: _HubRepo, monkeypatch: pytest.MonkeyPatch
    ) -> None:
        """After the second push the hub's ``main`` head is the new commit."""
        root = tmp_path / "local"
        root.mkdir()
        _init_local_repo(root, hub_repo["slug"], n_commits=1)
        monkeypatch.chdir(root)

        r1 = runner.invoke(None, ["push", "local", "main"])
        assert r1.exit_code == 0, r1.output
        new_cid = _add_commit(root, hub_repo["slug"])
        r2 = runner.invoke(None, ["push", "local", "main"])
        assert r2.exit_code == 0

        # Verify hub refs report the new head
        refs_data = _hub_request("GET", f"/{OWNER}/{hub_repo['slug']}/refs")
        branch_heads = refs_data.get("branch_heads", refs_data.get("branches", {}))
        assert "main" in branch_heads
        # Head should now be the new commit (or its sha256: prefixed form)
        hub_head = branch_heads["main"]
        assert hub_head, "hub reported an empty head for main"
        # Drop the literal prefix only.  str.lstrip("sha256:") would strip a
        # *character set* and also eat leading hex digits a/2/5/6 of the hash.
        prefix = "sha256:"
        bare_cid = new_cid[len(prefix):] if new_cid.startswith(prefix) else new_cid
        assert bare_cid in hub_head or hub_head in new_cid


# ---------------------------------------------------------------------------
# T6 — Pull
# ---------------------------------------------------------------------------

class TestT6Pull:
    """Tier 6: push from A, pull from B, verify merge result."""

    def test_pull_updates_local_head(
        self, tmp_path: pathlib.Path, hub_repo: _HubRepo, monkeypatch: pytest.MonkeyPatch
    ) -> None:
        """Pulling in B after A pushed a new commit moves B's head to it."""
        # Push initial commits from location A
        src = tmp_path / "A"
        src.mkdir()
        _init_local_repo(src, hub_repo["slug"], n_commits=2)
        monkeypatch.chdir(src)
        r_push = runner.invoke(None, ["push", "local", "main"])
        assert r_push.exit_code == 0, r_push.output

        # Clone to location B
        dst = tmp_path / "B"
        r_clone = runner.invoke(None, ["clone", hub_repo["url"], str(dst)])
        assert r_clone.exit_code == 0, r_clone.output

        # Push a new commit from A
        monkeypatch.chdir(src)
        new_cid = _add_commit(src, hub_repo["slug"])
        r_push2 = runner.invoke(None, ["push", "local", "main"])
        assert r_push2.exit_code == 0, r_push2.output

        # Pull from B (clone creates remote named 'origin')
        monkeypatch.chdir(dst)
        r_pull = runner.invoke(None, ["pull", "origin", "main"])
        assert r_pull.exit_code == 0, f"pull failed:\n{r_pull.output}\n{r_pull.stderr}"

        # B's HEAD should now match A's HEAD
        b_head = get_head_commit_id(dst, "main")
        assert b_head == new_cid, f"Pull did not advance B's HEAD: {b_head} != {new_cid}"


# ---------------------------------------------------------------------------
# T7 — Fetch
# ---------------------------------------------------------------------------

class TestT7Fetch:
    """Tier 7: fetch downloads objects but does not move local HEAD."""

    def test_fetch_does_not_move_head(
        self, tmp_path: pathlib.Path, hub_repo: _HubRepo, monkeypatch: pytest.MonkeyPatch
    ) -> None:
        """B's ``main`` head is the same before and after ``fetch origin``."""
        # Push initial from A
        src = tmp_path / "A"
        src.mkdir()
        _init_local_repo(src, hub_repo["slug"], n_commits=1)
        monkeypatch.chdir(src)
        r_push = runner.invoke(None, ["push", "local", "main"])
        assert r_push.exit_code == 0, r_push.output

        # Clone to B — B now has the pushed commit as HEAD
        dst = tmp_path / "B"
        r_clone = runner.invoke(None, ["clone", hub_repo["url"], str(dst)])
        assert r_clone.exit_code == 0
        b_head_before = get_head_commit_id(dst, "main")

        # Push new commit from A
        monkeypatch.chdir(src)
        _add_commit(src, hub_repo["slug"])
        r_push2 = runner.invoke(None, ["push", "local", "main"])
        assert r_push2.exit_code == 0, r_push2.output

        # Fetch from B — should NOT move main HEAD (clone creates remote 'origin')
        monkeypatch.chdir(dst)
        r_fetch = runner.invoke(None, ["fetch", "origin"])
        assert r_fetch.exit_code == 0, f"fetch failed:\n{r_fetch.output}\n{r_fetch.stderr}"

        b_head_after = get_head_commit_id(dst, "main")
        assert b_head_after == b_head_before, (
            f"fetch must not advance HEAD: was {b_head_before}, now {b_head_after}"
        )


# ---------------------------------------------------------------------------
# T8 — Force push
# ---------------------------------------------------------------------------

class TestT8ForcePush:
    """Tier 8: divergent history rejected by default, accepted with --force."""

    def test_non_fast_forward_rejected(
        self, tmp_path: pathlib.Path, hub_repo: _HubRepo, monkeypatch: pytest.MonkeyPatch
    ) -> None:
        """Pushing a rewound ``main`` without --force is refused."""
        # Push 2 commits
        src = tmp_path / "local"
        src.mkdir()
        commit_ids = _init_local_repo(src, hub_repo["slug"], n_commits=2)
        monkeypatch.chdir(src)
        # The setup push must succeed, otherwise a later failure for an
        # unrelated reason would make this test pass spuriously.
        r_setup = runner.invoke(None, ["push", "local", "main"])
        assert r_setup.exit_code == 0, r_setup.output

        # Rewind HEAD to first commit (create divergent history)
        head_ref = heads_dir(src) / "main"
        head_ref.write_text(commit_ids[0])

        # Try normal push — should fail (not fast-forward)
        r = runner.invoke(None, ["push", "local", "main"])
        # Either exit code non-zero OR output contains rejection message
        assert r.exit_code != 0 or "not fast-forward" in (r.output + (r.stderr or "")).lower()

    def test_force_push_accepted(
        self, tmp_path: pathlib.Path, hub_repo: _HubRepo, monkeypatch: pytest.MonkeyPatch
    ) -> None:
        """A divergent commit on a rewound ``main`` is accepted with --force."""
        # Push 2 commits
        src = tmp_path / "local"
        src.mkdir()
        commit_ids = _init_local_repo(src, hub_repo["slug"], n_commits=2)
        monkeypatch.chdir(src)
        r_setup = runner.invoke(None, ["push", "local", "main"])
        assert r_setup.exit_code == 0, r_setup.output

        # Rewind to first commit and add a divergent commit
        head_ref = heads_dir(src) / "main"
        head_ref.write_text(commit_ids[0])
        _add_commit(src, hub_repo["slug"])

        # Force push
        r = runner.invoke(None, ["push", "local", "main", "--force"])
        assert r.exit_code == 0, f"force push failed:\n{r.output}\n{r.stderr}"


# ---------------------------------------------------------------------------
# T9 — Cross-repo (contracts-style multi-file repo)
# ---------------------------------------------------------------------------

class TestT9CrossRepo:
    """Tier 9: rich multi-file repo — push, clone, pull full cycle."""

    def test_multi_file_repo_round_trip(
        self, tmp_path: pathlib.Path, hub_repo: _HubRepo, monkeypatch: pytest.MonkeyPatch
    ) -> None:
        """Every file of a multi-file commit survives push + clone intact."""
        # Build a contracts-style repo with many files
        file_tree = {
            "README.md": b"# contracts\nShared type contracts.",
            "muse_contracts/__init__.py": b"",
            "muse_contracts/wire.py": b"from dataclasses import dataclass\n",
            "muse_contracts/issue.py": b"from typing import TypedDict\n",
            "docs/reference/type-contracts.md": b"# Type Contracts\n",
            "scripts/gen_type_contracts.py": b"#!/usr/bin/env python3\n",
            "pyproject.toml": b"[tool.poetry]\nname = 'muse-contracts'\n",
        }
        src = tmp_path / "contracts"
        src.mkdir()
        _init_local_repo(
            src, hub_repo["slug"], n_commits=1, file_tree=file_tree
        )
        monkeypatch.chdir(src)

        r_push = runner.invoke(None, ["push", "local", "main"])
        assert r_push.exit_code == 0, r_push.output

        dst = tmp_path / "contracts_clone"
        r_clone = runner.invoke(None, ["clone", hub_repo["url"], str(dst)])
        assert r_clone.exit_code == 0, r_clone.output

        # Verify all file objects transferred
        from muse.core.object_store import read_object
        cloned_head = get_head_commit_id(dst, "main")
        cloned_commit = read_commit(dst, cloned_head)
        cloned_snap = read_snapshot(dst, cloned_commit.snapshot_id)
        assert cloned_snap is not None

        for path, content in file_tree.items():
            expected_oid = blob_id(content)
            assert path in cloned_snap.manifest, f"Missing file in cloned snapshot: {path}"
            assert cloned_snap.manifest[path] == expected_oid
            cloned_bytes = read_object(dst, expected_oid)
            assert cloned_bytes == content, f"Content mismatch for {path}"

    def test_multi_commit_pull_from_cross_repo(
        self, tmp_path: pathlib.Path, hub_repo: _HubRepo, monkeypatch: pytest.MonkeyPatch
    ) -> None:
        """Push 3 commits, clone, push 2 more, pull — verify all 5 commits."""
        src = tmp_path / "src"
        src.mkdir()
        _init_local_repo(src, hub_repo["slug"], n_commits=3)
        monkeypatch.chdir(src)
        r_push = runner.invoke(None, ["push", "local", "main"])
        assert r_push.exit_code == 0, r_push.output

        dst = tmp_path / "dst"
        r_clone = runner.invoke(None, ["clone", hub_repo["url"], str(dst)])
        assert r_clone.exit_code == 0, r_clone.output

        monkeypatch.chdir(src)
        _add_commit(src, hub_repo["slug"])
        new2 = _add_commit(src, hub_repo["slug"])
        r_push2 = runner.invoke(None, ["push", "local", "main"])
        assert r_push2.exit_code == 0, r_push2.output

        monkeypatch.chdir(dst)
        r = runner.invoke(None, ["pull", "origin", "main"])
        assert r.exit_code == 0, r.output

        # Walk commit chain — should have 5 commits total
        head = get_head_commit_id(dst, "main")
        assert head == new2
        seen: list[str] = []
        cid = head
        while cid:
            rec = read_commit(dst, cid)
            seen.append(cid)
            cid = rec.parent_commit_id if rec else None
        assert len(seen) == 5, f"Expected 5 commits in chain, got {len(seen)}"


# ---------------------------------------------------------------------------
# T10 — Idempotent re-push
# ---------------------------------------------------------------------------

class TestT10IdempotentPush:
    """Tier 10: re-pushing the same commits transfers 0 new objects."""

    def test_idempotent_push_zero_new_objects(
        self, tmp_path: pathlib.Path, hub_repo: _HubRepo, monkeypatch: pytest.MonkeyPatch
    ) -> None:
        """A second push of identical history succeeds and says nothing is new."""
        root = tmp_path / "local"
        root.mkdir()
        _init_local_repo(root, hub_repo["slug"], n_commits=3)
        monkeypatch.chdir(root)

        # First push
        r1 = runner.invoke(None, ["push", "local", "main"])
        assert r1.exit_code == 0, r1.output

        # Second push — identical history, nothing new
        r2 = runner.invoke(None, ["push", "local", "main"])
        assert r2.exit_code == 0, r2.output
        output = r2.output + (r2.stderr or "")
        # Remote should report nothing new to push
        assert (
            "already present" in output
            or "0 commit" in output
            or "✅" in output
            or "up to date" in output.lower()
            or "already at" in output.lower()
        )

    def test_nuke_and_repush(
        self, tmp_path: pathlib.Path, hub_repo: _HubRepo, monkeypatch: pytest.MonkeyPatch
    ) -> None:
        """Delete the hub repo, recreate it, re-push — full idempotency check."""
        root = tmp_path / "local"
        root.mkdir()
        commit_ids = _init_local_repo(root, hub_repo["slug"], n_commits=4)
        monkeypatch.chdir(root)

        # First push
        r1 = runner.invoke(None, ["push", "local", "main"])
        assert r1.exit_code == 0

        # Nuke the hub repo
        _hub_request("DELETE", f"/api/repos/{hub_repo['repo_id']}")

        # Recreate with same slug
        resp = _hub_request("POST", "/api/repos", {
            "name": hub_repo["slug"],
            "owner": OWNER,
            "visibility": "private",
            "domain": "code",
        })
        hub_repo["repo_id"] = resp["repoId"]  # update for fixture cleanup

        # Re-push same local commits
        r2 = runner.invoke(None, ["push", "local", "main"])
        assert r2.exit_code == 0, f"re-push after nuke failed:\n{r2.output}\n{r2.stderr}"

        # Verify the local head still points at the last commit that was pushed
        head = get_head_commit_id(root, "main")
        assert head == commit_ids[-1]